diff options
Diffstat (limited to 'database/engine')
-rw-r--r-- | database/engine/README.md | 40 | ||||
-rw-r--r-- | database/engine/datafile.c | 224 | ||||
-rw-r--r-- | database/engine/datafile.h | 7 | ||||
-rw-r--r-- | database/engine/journalfile.c | 117 | ||||
-rw-r--r-- | database/engine/journalfile.h | 2 | ||||
-rw-r--r-- | database/engine/pagecache.c | 317 | ||||
-rw-r--r-- | database/engine/pagecache.h | 89 | ||||
-rw-r--r-- | database/engine/rrdengine.c | 234 | ||||
-rw-r--r-- | database/engine/rrdengine.h | 22 | ||||
-rw-r--r-- | database/engine/rrdengineapi.c | 277 | ||||
-rw-r--r-- | database/engine/rrdengineapi.h | 12 | ||||
-rw-r--r-- | database/engine/rrdenginelib.c | 97 | ||||
-rw-r--r-- | database/engine/rrdenginelib.h | 21 | ||||
-rw-r--r-- | database/engine/rrdenglocking.c | 233 | ||||
-rw-r--r-- | database/engine/rrdenglocking.h | 17 |
15 files changed, 1254 insertions, 455 deletions
diff --git a/database/engine/README.md b/database/engine/README.md index 28a2528c..adc69ffd 100644 --- a/database/engine/README.md +++ b/database/engine/README.md @@ -96,9 +96,9 @@ There are explicit memory requirements **per** DB engine **instance**, meaning * - `page cache size` must be at least `#dimensions-being-collected x 4096 x 2` bytes. -- an additional `#pages-on-disk x 4096 x 0.06` bytes of RAM are allocated for metadata. +- an additional `#pages-on-disk x 4096 x 0.03` bytes of RAM are allocated for metadata. - - roughly speaking this is 6% of the uncompressed disk space taken by the DB files. + - roughly speaking this is 3% of the uncompressed disk space taken by the DB files. - for very highly compressible data (compression ratio > 90%) this RAM overhead is comparable to the disk space footprint. @@ -106,4 +106,40 @@ There are explicit memory requirements **per** DB engine **instance**, meaning * An important observation is that RAM usage depends on both the `page cache size` and the `dbengine disk space` options. +## File descriptor requirements + +The Database Engine may keep a **significant** amount of files open per instance (e.g. per streaming +slave or master server). When configuring your system you should make sure there are at least 50 +file descriptors available per `dbengine` instance. + +Netdata allocates 25% of the available file descriptors to its Database Engine instances. This means that only 25% +of the file descriptors that are available to the Netdata service are accessible by dbengine instances. +You should take that into account when configuring your service +or system-wide file descriptor limits. You can roughly estimate that the netdata service needs 2048 file +descriptors for every 10 streaming slave hosts when streaming is configured to use `memory mode = dbengine`. + +If for example one wants to allocate 65536 file descriptors to the netdata service on a systemd system +one needs to override the netdata service by running `sudo systemctl edit netdata` and creating a +file with contents: + +``` +[Service] +LimitNOFILE=65536 +``` + +For other types of services one can add the line: +``` +ulimit -n 65536 +``` +at the beginning of the service file. Alternatively you can change the system-wide limits of the kernel by changing `/etc/sysctl.conf`. For linux that would be: +``` +fs.file-max = 65536 +``` +In FreeBSD and OS X you change the lines like this: +``` +kern.maxfilesperproc=65536 +kern.maxfiles=65536 +``` +You can apply the settings by running `sysctl -p` or by rebooting. + [![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fdatabase%2Fengine%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)]() diff --git a/database/engine/datafile.c b/database/engine/datafile.c index 2d17d05e..8ef4ed59 100644 --- a/database/engine/datafile.c +++ b/database/engine/datafile.c @@ -49,44 +49,69 @@ static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_ datafile->ctx = ctx; } -static void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) { (void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION, datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); } +int close_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + + ret = uv_fs_close(NULL, &req, datafile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + return ret; +} + + int destroy_data_file(struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; - int ret, fd; - char path[1024]; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL); if (ret < 0) { - fatal("uv_fs_ftruncate: %s", uv_strerror(ret)); + error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); ret = uv_fs_close(NULL, &req, datafile->file, NULL); if (ret < 0) { - fatal("uv_fs_close: %s", uv_strerror(ret)); + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); - generate_datafilepath(datafile, path, sizeof(path)); - fd = uv_fs_unlink(NULL, &req, path, NULL); - if (fd < 0) { - fatal("uv_fs_fsunlink: %s", uv_strerror(fd)); + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); ++ctx->stats.datafile_deletions; - return 0; + return ret; } int create_data_file(struct rrdengine_datafile *datafile) @@ -97,21 +122,17 @@ int create_data_file(struct rrdengine_datafile *datafile) int ret, fd; struct rrdeng_df_sb *superblock; uv_buf_t iov; - char path[1024]; + char path[RRDENG_PATH_MAX]; generate_datafilepath(datafile, path, sizeof(path)); - fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC, - S_IRUSR | S_IWUSR, NULL); + fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file); if (fd < 0) { - fatal("uv_fs_fsopen: %s", uv_strerror(fd)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; } - assert(req.result >= 0); - file = req.result; - uv_fs_req_cleanup(&req); -#ifdef __APPLE__ - info("Disabling OS X caching for file \"%s\".", path); - fcntl(fd, F_NOCACHE, 1); -#endif + datafile->file = file; + ++ctx->stats.datafile_creations; ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); if (unlikely(ret)) { @@ -125,19 +146,21 @@ int create_data_file(struct rrdengine_datafile *datafile) ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { - fatal("uv_fs_write: %s", uv_strerror(ret)); - } - if (req.result < 0) { - fatal("uv_fs_write: %s", uv_strerror((int)req.result)); + assert(req.result < 0); + error("uv_fs_write: %s", uv_strerror(ret)); + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); } uv_fs_req_cleanup(&req); free(superblock); + if (ret < 0) { + destroy_data_file(datafile); + return ret; + } - datafile->file = file; datafile->pos = sizeof(*superblock); ctx->stats.io_write_bytes += sizeof(*superblock); ++ctx->stats.io_write_requests; - ++ctx->stats.datafile_creations; return 0; } @@ -182,25 +205,17 @@ static int load_data_file(struct rrdengine_datafile *datafile) struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; uv_file file; - int ret, fd; + int ret, fd, error; uint64_t file_size; - char path[1024]; + char path[RRDENG_PATH_MAX]; generate_datafilepath(datafile, path, sizeof(path)); - fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL); + fd = open_file_direct_io(path, O_RDWR, &file); if (fd < 0) { - /* if (UV_ENOENT != fd) */ - error("uv_fs_fsopen: %s", uv_strerror(fd)); - uv_fs_req_cleanup(&req); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); return fd; } - assert(req.result >= 0); - file = req.result; - uv_fs_req_cleanup(&req); -#ifdef __APPLE__ - info("Disabling OS X caching for file \"%s\".", path); - fcntl(fd, F_NOCACHE, 1); -#endif info("Initializing data file \"%s\".", path); ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); @@ -221,15 +236,21 @@ static int load_data_file(struct rrdengine_datafile *datafile) return 0; error: - (void) uv_fs_close(NULL, &req, file, NULL); + error = ret; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } uv_fs_req_cleanup(&req); - return ret; + return error; } static int scan_data_files_cmp(const void *a, const void *b) { struct rrdengine_datafile *file1, *file2; - char path1[1024], path2[1024]; + char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX]; file1 = *(struct rrdengine_datafile **)a; file2 = *(struct rrdengine_datafile **)b; @@ -238,7 +259,7 @@ static int scan_data_files_cmp(const void *a, const void *b) return strcmp(path1, path2); } -/* Returns number of datafiles that were loaded */ +/* Returns number of datafiles that were loaded or < 0 on error */ static int scan_data_files(struct rrdengine_instance *ctx) { int ret; @@ -249,16 +270,22 @@ static int scan_data_files(struct rrdengine_instance *ctx) struct rrdengine_journalfile *journalfile; ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL); - assert(ret >= 0); - assert(req.result >= 0); + if (ret < 0) { + assert(req.result < 0); + uv_fs_req_cleanup(&req); + error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return ret; + } info("Found %d files in path %s", ret, ctx->dbfiles_path); datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles)); for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { - info("Scanning file \"%s\"", dent.name); + info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name); ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no); if (2 == ret) { - info("Matched file \"%s\"", dent.name); + info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name); datafile = mallocz(sizeof(*datafile)); datafile_init(datafile, ctx, tier, no); datafiles[matched_files++] = datafile; @@ -266,70 +293,133 @@ static int scan_data_files(struct rrdengine_instance *ctx) } uv_fs_req_cleanup(&req); + if (0 == matched_files) { + freez(datafiles); + return 0; + } if (matched_files == MAX_DATAFILES) { error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); } qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp); + /* TODO: change this when tiering is implemented */ + ctx->last_fileno = datafiles[matched_files - 1]->fileno; + for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { datafile = datafiles[i]; ret = load_data_file(datafile); if (0 != ret) { - free(datafile); + freez(datafile); ++failed_to_load; - continue; + break; } journalfile = mallocz(sizeof(*journalfile)); datafile->journalfile = journalfile; journalfile_init(journalfile, datafile); ret = load_journal_file(ctx, journalfile, datafile); if (0 != ret) { - free(datafile); - free(journalfile); + close_data_file(datafile); + freez(datafile); + freez(journalfile); ++failed_to_load; - continue; + break; } datafile_list_insert(ctx, datafile); ctx->disk_space += datafile->pos + journalfile->pos; } + freez(datafiles); if (failed_to_load) { - error("%u files failed to load.", failed_to_load); + error("%u datafiles failed to load.", failed_to_load); + finalize_data_files(ctx); + return UV_EIO; } - free(datafiles); - return matched_files - failed_to_load; + return matched_files; } /* Creates a datafile and a journalfile pair */ -void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) +int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) { struct rrdengine_datafile *datafile; struct rrdengine_journalfile *journalfile; int ret; + char path[RRDENG_PATH_MAX]; - info("Creating new data and journal files."); + info("Creating new data and journal files in path %s", ctx->dbfiles_path); datafile = mallocz(sizeof(*datafile)); datafile_init(datafile, ctx, tier, fileno); ret = create_data_file(datafile); - assert(!ret); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + info("Created data file \"%s\".", path); + } else { + goto error_after_datafile; + } journalfile = mallocz(sizeof(*journalfile)); datafile->journalfile = journalfile; journalfile_init(journalfile, datafile); ret = create_journal_file(journalfile, datafile); - assert(!ret); + if (!ret) { + generate_journalfilepath(datafile, path, sizeof(path)); + info("Created journal file \"%s\".", path); + } else { + goto error_after_journalfile; + } datafile_list_insert(ctx, datafile); ctx->disk_space += datafile->pos + journalfile->pos; + + return 0; + +error_after_journalfile: + destroy_data_file(datafile); + freez(journalfile); +error_after_datafile: + freez(datafile); + return ret; } -/* Page cache must already be initialized. */ +/* Page cache must already be initialized. + * Return 0 on success. + */ int init_data_files(struct rrdengine_instance *ctx) { int ret; ret = scan_data_files(ctx); - if (0 == ret) { - info("Data files not found, creating."); - create_new_datafile_pair(ctx, 1, 1); + if (ret < 0) { + error("Failed to scan path \"%s\".", ctx->dbfiles_path); + return ret; + } else if (0 == ret) { + info("Data files not found, creating in path \"%s\".", ctx->dbfiles_path); + ret = create_new_datafile_pair(ctx, 1, 1); + if (ret) { + error("Failed to create data and journal files in path \"%s\".", ctx->dbfiles_path); + return ret; + } + ctx->last_fileno = 1; } + return 0; +} + +void finalize_data_files(struct rrdengine_instance *ctx) +{ + struct rrdengine_datafile *datafile, *next_datafile; + struct rrdengine_journalfile *journalfile; + struct extent_info *extent, *next_extent; + + for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) { + journalfile = datafile->journalfile; + next_datafile = datafile->next; + + for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) { + next_extent = extent->next; + freez(extent); + } + close_journal_file(journalfile, datafile); + close_data_file(datafile); + freez(journalfile); + freez(datafile); + + } }
\ No newline at end of file diff --git a/database/engine/datafile.h b/database/engine/datafile.h index c5c8f31f..eeb11310 100644 --- a/database/engine/datafile.h +++ b/database/engine/datafile.h @@ -26,7 +26,7 @@ struct extent_info { uint8_t number_of_pages; struct rrdengine_datafile *datafile; struct extent_info *next; - struct rrdeng_page_cache_descr *pages[]; + struct rrdeng_page_descr *pages[]; }; struct rrdengine_df_extents { @@ -55,9 +55,12 @@ struct rrdengine_datafile_list { extern void df_extent_insert(struct extent_info *extent); extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +extern void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); +extern int close_data_file(struct rrdengine_datafile *datafile); extern int destroy_data_file(struct rrdengine_datafile *datafile); extern int create_data_file(struct rrdengine_datafile *datafile); -extern void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); +extern int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); extern int init_data_files(struct rrdengine_instance *ctx); +extern void finalize_data_files(struct rrdengine_instance *ctx); #endif /* NETDATA_DATAFILE_H */
\ No newline at end of file diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 44d8461d..30eaa0ec 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -13,7 +13,7 @@ static void flush_transaction_buffer_cb(uv_fs_t* req) uv_fs_req_cleanup(req); free(io_descr->buf); - free(io_descr); + freez(io_descr); } /* Careful to always call this before creating a new journal file */ @@ -87,7 +87,7 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s return ctx->commit_log.buf + buf_pos; } -static void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) { (void) snprintf(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION, datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); @@ -100,39 +100,62 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin journalfile->datafile = datafile; } +int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + + ret = uv_fs_close(NULL, &req, journalfile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + return ret; +} + int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; - int ret, fd; - char path[1024]; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL); if (ret < 0) { - fatal("uv_fs_ftruncate: %s", uv_strerror(ret)); + error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); ret = uv_fs_close(NULL, &req, journalfile->file, NULL); if (ret < 0) { - fatal("uv_fs_close: %s", uv_strerror(ret)); - exit(ret); + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); - generate_journalfilepath(datafile, path, sizeof(path)); - fd = uv_fs_unlink(NULL, &req, path, NULL); - if (fd < 0) { - fatal("uv_fs_fsunlink: %s", uv_strerror(fd)); + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); ++ctx->stats.journalfile_deletions; - return 0; + return ret; } int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) @@ -143,21 +166,17 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng int ret, fd; struct rrdeng_jf_sb *superblock; uv_buf_t iov; - char path[1024]; + char path[RRDENG_PATH_MAX]; generate_journalfilepath(datafile, path, sizeof(path)); - fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC, - S_IRUSR | S_IWUSR, NULL); + fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file); if (fd < 0) { - fatal("uv_fs_fsopen: %s", uv_strerror(fd)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; } - assert(req.result >= 0); - file = req.result; - uv_fs_req_cleanup(&req); -#ifdef __APPLE__ - info("Disabling OS X caching for file \"%s\".", path); - fcntl(fd, F_NOCACHE, 1); -#endif + journalfile->file = file; + ++ctx->stats.journalfile_creations; ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); if (unlikely(ret)) { @@ -170,19 +189,21 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { - fatal("uv_fs_write: %s", uv_strerror(ret)); - } - if (req.result < 0) { - fatal("uv_fs_write: %s", uv_strerror((int)req.result)); + assert(req.result < 0); + error("uv_fs_write: %s", uv_strerror(ret)); + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); } uv_fs_req_cleanup(&req); free(superblock); + if (ret < 0) { + destroy_journal_file(journalfile, datafile); + return ret; + } - journalfile->file = file; journalfile->pos = sizeof(*superblock); ctx->stats.io_write_bytes += sizeof(*superblock); ++ctx->stats.io_write_requests; - ++ctx->stats.journalfile_creations; return 0; } @@ -226,7 +247,7 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden { struct page_cache *pg_cache = &ctx->pg_cache; unsigned i, count, payload_length, descr_size, valid_pages; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; struct extent_info *extent; /* persistent structures */ struct rrdeng_jf_store_data *jf_metric_data; @@ -271,6 +292,8 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0); assert(NULL == *PValue); /* TODO: figure out concurrency model */ *PValue = page_index = create_page_index(temp_id); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); } @@ -406,25 +429,17 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi { uv_fs_t req; uv_file file; - int ret, fd; + int ret, fd, error; uint64_t file_size, max_id; - char path[1024]; + char path[RRDENG_PATH_MAX]; generate_journalfilepath(datafile, path, sizeof(path)); - fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL); + fd = open_file_direct_io(path, O_RDWR, &file); if (fd < 0) { - /* if (UV_ENOENT != fd) */ - error("uv_fs_fsopen: %s", uv_strerror(fd)); - uv_fs_req_cleanup(&req); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); return fd; } - assert(req.result >= 0); - file = req.result; - uv_fs_req_cleanup(&req); -#ifdef __APPLE__ - info("Disabling OS X caching for file \"%s\".", path); - fcntl(fd, F_NOCACHE, 1); -#endif info("Loading journal file \"%s\".", path); ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); @@ -449,9 +464,15 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi return 0; error: - (void) uv_fs_close(NULL, &req, file, NULL); + error = ret; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } uv_fs_req_cleanup(&req); - return ret; + return error; } void init_commit_log(struct rrdengine_instance *ctx) diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h index 50489aee..0df66304 100644 --- a/database/engine/journalfile.h +++ b/database/engine/journalfile.h @@ -33,9 +33,11 @@ struct transaction_commit_log { unsigned buf_size; }; +extern void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size); extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc); +extern int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index c90947a6..124f2448 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -8,28 +8,29 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx); /* always inserts into tail */ static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx, - struct rrdeng_page_cache_descr *descr) + struct rrdeng_page_descr *descr) { struct page_cache *pg_cache = &ctx->pg_cache; + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; if (likely(NULL != pg_cache->replaceQ.tail)) { - descr->prev = pg_cache->replaceQ.tail; - pg_cache->replaceQ.tail->next = descr; + pg_cache_descr->prev = pg_cache->replaceQ.tail; + pg_cache->replaceQ.tail->next = pg_cache_descr; } if (unlikely(NULL == pg_cache->replaceQ.head)) { - pg_cache->replaceQ.head = descr; + pg_cache->replaceQ.head = pg_cache_descr; } - pg_cache->replaceQ.tail = descr; + pg_cache->replaceQ.tail = pg_cache_descr; } static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx, - struct rrdeng_page_cache_descr *descr) + struct rrdeng_page_descr *descr) { struct page_cache *pg_cache = &ctx->pg_cache; - struct rrdeng_page_cache_descr *prev, *next; + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr, *prev, *next; - prev = descr->prev; - next = descr->next; + prev = pg_cache_descr->prev; + next = pg_cache_descr->next; if (likely(NULL != prev)) { prev->next = next; @@ -37,17 +38,17 @@ static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ct if (likely(NULL != next)) { next->prev = prev; } - if (unlikely(descr == pg_cache->replaceQ.head)) { + if (unlikely(pg_cache_descr == pg_cache->replaceQ.head)) { pg_cache->replaceQ.head = next; } - if (unlikely(descr == pg_cache->replaceQ.tail)) { + if (unlikely(pg_cache_descr == pg_cache->replaceQ.tail)) { pg_cache->replaceQ.tail = prev; } - descr->prev = descr->next = NULL; + pg_cache_descr->prev = pg_cache_descr->next = NULL; } void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx, - struct rrdeng_page_cache_descr *descr) + struct rrdeng_page_descr *descr) { struct page_cache *pg_cache = &ctx->pg_cache; @@ -57,7 +58,7 @@ void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx, } void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx, - struct rrdeng_page_cache_descr *descr) + struct rrdeng_page_descr *descr) { struct page_cache *pg_cache = &ctx->pg_cache; @@ -66,7 +67,7 @@ void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx, uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); } void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx, - struct rrdeng_page_cache_descr *descr) + struct rrdeng_page_descr *descr) { struct page_cache *pg_cache = &ctx->pg_cache; @@ -76,40 +77,28 @@ void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx, uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); } -struct rrdeng_page_cache_descr *pg_cache_create_descr(void) +struct rrdeng_page_descr *pg_cache_create_descr(void) { - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; descr = mallocz(sizeof(*descr)); - descr->page = NULL; descr->page_length = 0; descr->start_time = INVALID_TIME; descr->end_time = INVALID_TIME; descr->id = NULL; descr->extent = NULL; - descr->flags = 0; - descr->prev = descr->next = descr->private = NULL; - descr->refcnt = 0; - descr->waiters = 0; - descr->handle = NULL; - assert(0 == uv_cond_init(&descr->cond)); - assert(0 == uv_mutex_init(&descr->mutex)); + descr->pg_cache_descr_state = 0; + descr->pg_cache_descr = NULL; return descr; } -void pg_cache_destroy_descr(struct rrdeng_page_cache_descr *descr) -{ - uv_cond_destroy(&descr->cond); - uv_mutex_destroy(&descr->mutex); - free(descr); -} - /* The caller must hold page descriptor lock. */ -void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr) +void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr) { - if (descr->waiters) - uv_cond_broadcast(&descr->cond); + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + if (pg_cache_descr->waiters) + uv_cond_broadcast(&pg_cache_descr->cond); } /* @@ -117,11 +106,13 @@ void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr) * The lock will be released and re-acquired. The descriptor is not guaranteed * to exist after this function returns. */ -void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr) +void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr) { - ++descr->waiters; - uv_cond_wait(&descr->cond, &descr->mutex); - --descr->waiters; + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + ++pg_cache_descr->waiters; + uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex); + --pg_cache_descr->waiters; } /* @@ -129,14 +120,15 @@ void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr) * The lock will be released and re-acquired. The descriptor is not guaranteed * to exist after this function returns. */ -unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr) +unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) { + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; unsigned long flags; - uv_mutex_lock(&descr->mutex); + rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_wait_event_unsafe(descr); - flags = descr->flags; - uv_mutex_unlock(&descr->mutex); + flags = pg_cache_descr->flags; + rrdeng_page_descr_mutex_unlock(ctx, descr); return flags; } @@ -146,15 +138,17 @@ unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr) * Gets a reference to the page descriptor. * Returns 1 on success and 0 on failure. */ -int pg_cache_try_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access) +int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access) { - if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) || - (exclusive_access && descr->refcnt)) { + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) || + (exclusive_access && pg_cache_descr->refcnt)) { return 0; } if (exclusive_access) - descr->flags |= RRD_PAGE_LOCKED; - ++descr->refcnt; + pg_cache_descr->flags |= RRD_PAGE_LOCKED; + ++pg_cache_descr->refcnt; return 1; } @@ -163,10 +157,12 @@ int pg_cache_try_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive * The caller must hold page descriptor lock. * Same return values as pg_cache_try_get_unsafe() without doing anything. */ -int pg_cache_can_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access) +int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access) { - if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) || - (exclusive_access && descr->refcnt)) { + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) || + (exclusive_access && pg_cache_descr->refcnt)) { return 0; } @@ -177,23 +173,24 @@ int pg_cache_can_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive * The caller must hold the page descriptor lock. * This function may block doing cleanup. */ -void pg_cache_put_unsafe(struct rrdeng_page_cache_descr *descr) +void pg_cache_put_unsafe(struct rrdeng_page_descr *descr) { - descr->flags &= ~RRD_PAGE_LOCKED; - if (0 == --descr->refcnt) { + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + pg_cache_descr->flags &= ~RRD_PAGE_LOCKED; + if (0 == --pg_cache_descr->refcnt) { pg_cache_wake_up_waiters_unsafe(descr); } - /* TODO: perform cleanup */ } /* * This function may block doing cleanup. */ -void pg_cache_put(struct rrdeng_page_cache_descr *descr) +void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) { - uv_mutex_lock(&descr->mutex); + rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_put_unsafe(descr); - uv_mutex_unlock(&descr->mutex); + rrdeng_page_descr_mutex_unlock(ctx, descr); } /* The caller must hold the page cache lock */ @@ -224,7 +221,7 @@ static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned numb uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); if (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1) - debug(D_RRDENGINE, "=================================\nPage cache full. Reserving %u pages.\n=================================", + debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==", number); while (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1) { if (!pg_cache_try_evict_one_page_unsafe(ctx)) { @@ -266,7 +263,7 @@ static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned n uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); if (pg_cache->populated_pages + number >= ctx->cache_pages_low_watermark + 1) { debug(D_RRDENGINE, - "=================================\nPage cache full. Trying to reserve %u pages.\n=================================", + "==Page cache full. Trying to reserve %u pages.==", number); do { if (!pg_cache_try_evict_one_page_unsafe(ctx)) @@ -286,18 +283,20 @@ static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned n } /* The caller must hold the page cache and the page descriptor locks in that order */ -static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr) +static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) { - free(descr->page); - descr->page = NULL; - descr->flags &= ~RRD_PAGE_POPULATED; + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + freez(pg_cache_descr->page); + pg_cache_descr->page = NULL; + pg_cache_descr->flags &= ~RRD_PAGE_POPULATED; pg_cache_release_pages_unsafe(ctx, 1); ++ctx->stats.pg_cache_evictions; } /* * The caller must hold the page cache lock. - * Lock order: page cache -> replaceQ -> descriptor + * Lock order: page cache -> replaceQ -> page descriptor * This function iterates all pages and tries to evict one. * If it fails it sets in_flight_descr to the oldest descriptor that has write-back in progress, * or it sets it to NULL if no write-back is in progress. @@ -308,36 +307,40 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx) { struct page_cache *pg_cache = &ctx->pg_cache; unsigned long old_flags; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr = NULL; uv_rwlock_wrlock(&pg_cache->replaceQ.lock); - for (descr = pg_cache->replaceQ.head ; NULL != descr ; descr = descr->next) { - uv_mutex_lock(&descr->mutex); - old_flags = descr->flags; + for (pg_cache_descr = pg_cache->replaceQ.head ; NULL != pg_cache_descr ; pg_cache_descr = pg_cache_descr->next) { + descr = pg_cache_descr->descr; + + rrdeng_page_descr_mutex_lock(ctx, descr); + old_flags = pg_cache_descr->flags; if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) { /* must evict */ pg_cache_evict_unsafe(ctx, descr); pg_cache_put_unsafe(descr); - uv_mutex_unlock(&descr->mutex); pg_cache_replaceQ_delete_unsafe(ctx, descr); + + rrdeng_page_descr_mutex_unlock(ctx, descr); uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); + rrdeng_try_deallocate_pg_cache_descr(ctx, descr); + return 1; } - uv_mutex_unlock(&descr->mutex); - }; + rrdeng_page_descr_mutex_unlock(ctx, descr); + } uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); /* failed to evict */ return 0; } -/* - * TODO: last waiter frees descriptor ? - */ -void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr) +void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty) { struct page_cache *pg_cache = &ctx->pg_cache; + struct page_cache_descr *pg_cache_descr = NULL; Pvoid_t *PValue; struct pg_cache_page_index *page_index; int ret; @@ -353,8 +356,9 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cach uv_rwlock_wrunlock(&page_index->lock); if (unlikely(0 == ret)) { error("Page under deletion was not in index."); - if (unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); + if (unlikely(debug_flags & D_RRDENGINE)) { + print_page_descr(descr); + } goto destroy; } assert(1 == ret); @@ -364,23 +368,26 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cach --pg_cache->page_descriptors; uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); - uv_mutex_lock(&descr->mutex); + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; while (!pg_cache_try_get_unsafe(descr, 1)) { debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__); - if(unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); - pg_cache_wait_event_unsafe(descr); - } - /* even a locked page could be dirty */ - while (unlikely(descr->flags & RRD_PAGE_DIRTY)) { - debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__); if (unlikely(debug_flags & D_RRDENGINE)) print_page_cache_descr(descr); pg_cache_wait_event_unsafe(descr); } - uv_mutex_unlock(&descr->mutex); + if (!remove_dirty) { + /* even a locked page could be dirty */ + while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) { + debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + pg_cache_wait_event_unsafe(descr); + } + } + rrdeng_page_descr_mutex_unlock(ctx, descr); - if (descr->flags & RRD_PAGE_POPULATED) { + if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { /* only after locking can it be safely deleted from LRU */ pg_cache_replaceQ_delete(ctx, descr); @@ -388,13 +395,15 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cach pg_cache_evict_unsafe(ctx, descr); uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); } - pg_cache_put(descr); + pg_cache_put(ctx, descr); + + rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); destroy: - pg_cache_destroy_descr(descr); + freez(descr); pg_cache_update_metric_times(page_index); } -static inline int is_page_in_time_range(struct rrdeng_page_cache_descr *descr, usec_t start_time, usec_t end_time) +static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time) { usec_t pg_start, pg_end; @@ -405,13 +414,13 @@ static inline int is_page_in_time_range(struct rrdeng_page_cache_descr *descr, u (pg_start >= start_time && pg_start <= end_time); } -static inline int is_point_in_time_in_page(struct rrdeng_page_cache_descr *descr, usec_t point_in_time) +static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time) { return (point_in_time >= descr->start_time && point_in_time <= descr->end_time); } /* Update metric oldest and latest timestamps efficiently when adding new values */ -void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_cache_descr *descr) +void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr) { usec_t oldest_time = page_index->oldest_time; usec_t latest_time = page_index->latest_time; @@ -429,7 +438,7 @@ void pg_cache_update_metric_times(struct pg_cache_page_index *page_index) { Pvoid_t *firstPValue, *lastPValue; Word_t firstIndex, lastIndex; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; usec_t oldest_time = INVALID_TIME; usec_t latest_time = INVALID_TIME; @@ -460,16 +469,23 @@ void pg_cache_update_metric_times(struct pg_cache_page_index *page_index) /* If index is NULL lookup by UUID (descr->id) */ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, - struct rrdeng_page_cache_descr *descr) + struct rrdeng_page_descr *descr) { struct page_cache *pg_cache = &ctx->pg_cache; Pvoid_t *PValue; struct pg_cache_page_index *page_index; + unsigned long pg_cache_descr_state = descr->pg_cache_descr_state; + + if (0 != pg_cache_descr_state) { + /* there is page cache descriptor pre-allocated state */ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; - if (descr->flags & RRD_PAGE_POPULATED) { - pg_cache_reserve_pages(ctx, 1); - if (!(descr->flags & RRD_PAGE_DIRTY)) - pg_cache_replaceQ_insert(ctx, descr); + assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED); + if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { + pg_cache_reserve_pages(ctx, 1); + if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY)) + pg_cache_replaceQ_insert(ctx, descr); + } } if (unlikely(NULL == index)) { @@ -503,7 +519,8 @@ struct pg_cache_page_index * pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time) { struct page_cache *pg_cache = &ctx->pg_cache; - struct rrdeng_page_cache_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES]; + struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES]; + struct page_cache_descr *pg_cache_descr = NULL; int i, j, k, count, found; unsigned long flags; Pvoid_t *PValue; @@ -557,12 +574,13 @@ struct pg_cache_page_index * if (unlikely(0 == descr->page_length)) continue; - uv_mutex_lock(&descr->mutex); - flags = descr->flags; + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + flags = pg_cache_descr->flags; if (pg_cache_can_get_unsafe(descr, 0)) { if (flags & RRD_PAGE_POPULATED) { /* success */ - uv_mutex_unlock(&descr->mutex); + rrdeng_page_descr_mutex_unlock(ctx, descr); debug(D_RRDENGINE, "%s: Page was found in memory.", __func__); continue; } @@ -570,19 +588,19 @@ struct pg_cache_page_index * if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { preload_array[count++] = descr; if (PAGE_CACHE_MAX_PRELOAD_PAGES == count) { - uv_mutex_unlock(&descr->mutex); + rrdeng_page_descr_mutex_unlock(ctx, descr); break; } } - uv_mutex_unlock(&descr->mutex); + rrdeng_page_descr_mutex_unlock(ctx, descr); - }; + } uv_rwlock_rdunlock(&page_index->lock); failed_to_reserve = 0; for (i = 0 ; i < count && !failed_to_reserve ; ++i) { struct rrdeng_cmd cmd; - struct rrdeng_page_cache_descr *next; + struct rrdeng_page_descr *next; descr = preload_array[i]; if (NULL == descr) { @@ -622,7 +640,7 @@ struct pg_cache_page_index * if (NULL == descr) { continue; } - pg_cache_put(descr); + pg_cache_put(ctx, descr); } } if (!count) { @@ -637,12 +655,13 @@ struct pg_cache_page_index * * When point_in_time is INVALID_TIME get any page. * If index is NULL lookup by UUID (id). */ -struct rrdeng_page_cache_descr * +struct rrdeng_page_descr * pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, usec_t point_in_time) { struct page_cache *pg_cache = &ctx->pg_cache; - struct rrdeng_page_cache_descr *descr = NULL; + struct rrdeng_page_descr *descr = NULL; + struct page_cache_descr *pg_cache_descr = NULL; unsigned long flags; Pvoid_t *PValue; struct pg_cache_page_index *page_index; @@ -682,11 +701,12 @@ struct rrdeng_page_cache_descr * pg_cache_release_pages(ctx, 1); return NULL; } - uv_mutex_lock(&descr->mutex); - flags = descr->flags; + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + flags = pg_cache_descr->flags; if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) { /* success */ - uv_mutex_unlock(&descr->mutex); + rrdeng_page_descr_mutex_unlock(ctx, descr); debug(D_RRDENGINE, "%s: Page was found in memory.", __func__); break; } @@ -702,14 +722,14 @@ struct rrdeng_page_cache_descr * debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__); if(unlikely(debug_flags & D_RRDENGINE)) print_page_cache_descr(descr); - while (!(descr->flags & RRD_PAGE_POPULATED)) { + while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) { pg_cache_wait_event_unsafe(descr); } /* success */ /* Downgrade exclusive reference to allow other readers */ - descr->flags &= ~RRD_PAGE_LOCKED; + pg_cache_descr->flags &= ~RRD_PAGE_LOCKED; pg_cache_wake_up_waiters_unsafe(descr); - uv_mutex_unlock(&descr->mutex); + rrdeng_page_descr_mutex_unlock(ctx, descr); rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); return descr; } @@ -720,7 +740,7 @@ struct rrdeng_page_cache_descr * if (!(flags & RRD_PAGE_POPULATED)) page_not_in_cache = 1; pg_cache_wait_event_unsafe(descr); - uv_mutex_unlock(&descr->mutex); + rrdeng_page_descr_mutex_unlock(ctx, descr); /* reset scan to find again */ uv_rwlock_rdlock(&page_index->lock); @@ -747,6 +767,7 @@ struct pg_cache_page_index *create_page_index(uuid_t *id) assert(0 == uv_rwlock_init(&page_index->lock)); page_index->oldest_time = INVALID_TIME; page_index->latest_time = INVALID_TIME; + page_index->prev = NULL; return page_index; } @@ -756,6 +777,7 @@ static void init_metrics_index(struct rrdengine_instance *ctx) struct page_cache *pg_cache = &ctx->pg_cache; pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL; + pg_cache->metrics_index.last_page_index = NULL; assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock)); } @@ -789,4 +811,65 @@ void init_page_cache(struct rrdengine_instance *ctx) init_metrics_index(ctx); init_replaceQ(ctx); init_commited_page_index(ctx); +} + +void free_page_cache(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + Word_t ret_Judy, bytes_freed = 0; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index, *prev_page_index; + Word_t Index; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + + /* Free commited page index */ + ret_Judy = JudyLFreeArray(&pg_cache->commited_page_index.JudyL_array, PJE0); + assert(NULL == pg_cache->commited_page_index.JudyL_array); + bytes_freed += ret_Judy; + + for (page_index = pg_cache->metrics_index.last_page_index ; + page_index != NULL ; + page_index = prev_page_index) { + prev_page_index = page_index->prev; + + /* Find first page in range */ + Index = (Word_t) 0; + PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + } + while (descr != NULL) { + /* Iterate all page descriptors of this metric */ + + if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) { + /* Check rrdenglocking.c */ + pg_cache_descr = descr->pg_cache_descr; + if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { + freez(pg_cache_descr->page); + bytes_freed += RRDENG_BLOCK_SIZE; + } + rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); + bytes_freed += sizeof(*pg_cache_descr); + } + freez(descr); + bytes_freed += sizeof(*descr); + + PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0); + descr = unlikely(NULL == PValue) ? NULL : *PValue; + } + + /* Free page index */ + ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0); + assert(NULL == page_index->JudyL_array); + bytes_freed += ret_Judy; + freez(page_index); + bytes_freed += sizeof(*page_index); + } + /* Free metrics index */ + ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0); + assert(NULL == pg_cache->metrics_index.JudyHS_array); + bytes_freed += ret_Judy; + + info("Freed %lu bytes of memory from page cache.", bytes_freed); }
\ No newline at end of file diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h index d1e29aaa..b5670f82 100644 --- a/database/engine/pagecache.h +++ b/database/engine/pagecache.h @@ -5,9 +5,10 @@ #include "rrdengine.h" -/* Forward declerations */ +/* Forward declarations */ struct rrdengine_instance; struct extent_info; +struct rrdeng_page_descr; #define INVALID_TIME (0) @@ -18,24 +19,46 @@ struct extent_info; #define RRD_PAGE_WRITE_PENDING (1LU << 3) #define RRD_PAGE_POPULATED (1LU << 4) -struct rrdeng_page_cache_descr { +struct page_cache_descr { + struct rrdeng_page_descr *descr; /* parent descriptor */ void *page; - uint32_t page_length; - usec_t start_time; - usec_t end_time; - uuid_t *id; /* never changes */ - struct extent_info *extent; unsigned long flags; - void *private; - struct rrdeng_page_cache_descr *prev; - struct rrdeng_page_cache_descr *next; + struct page_cache_descr *prev; /* LRU */ + struct page_cache_descr *next; /* LRU */ - /* TODO: move waiter logic to concurrency table */ unsigned refcnt; uv_mutex_t mutex; /* always take it after the page cache lock or after the commit lock */ uv_cond_t cond; unsigned waiters; - struct rrdeng_collect_handle *handle; /* API user */ +}; + +/* Page cache descriptor flags, state = 0 means no descriptor */ +#define PG_CACHE_DESCR_ALLOCATED (1LU << 0) +#define PG_CACHE_DESCR_DESTROY (1LU << 1) +#define PG_CACHE_DESCR_LOCKED (1LU << 2) +#define PG_CACHE_DESCR_SHIFT (3) +#define PG_CACHE_DESCR_USERS_MASK (((unsigned long)-1) << PG_CACHE_DESCR_SHIFT) +#define PG_CACHE_DESCR_FLAGS_MASK (((unsigned long)-1) >> (BITS_PER_ULONG - PG_CACHE_DESCR_SHIFT)) + +/* + * Page cache descriptor state bits (works for both 32-bit and 64-bit architectures): + * + * 63 ... 31 ... 3 | 2 | 1 | 0| + * -----------------------------+------------+------------+-----------| + * number of descriptor users | DESTROY | LOCKED | ALLOCATED | + */ +struct rrdeng_page_descr { + uint32_t page_length; + usec_t start_time; + usec_t end_time; + uuid_t *id; /* never changes */ + struct extent_info *extent; + + /* points to ephemeral page cache descriptor if the page resides in the cache */ + struct page_cache_descr *pg_cache_descr; + + /* Compare-And-Swap target for page cache descriptor allocation algorithm */ + volatile unsigned long pg_cache_descr_state; }; #define PAGE_CACHE_MAX_PRELOAD_PAGES (256) @@ -61,12 +84,15 @@ struct pg_cache_page_index { * It's also written by the data deletion workqueue when data collection is disabled for this metric. */ usec_t latest_time; + + struct pg_cache_page_index *prev; }; /* maps UUIDs to page indices */ struct pg_cache_metrics_index { uv_rwlock_t lock; Pvoid_t JudyHS_array; + struct pg_cache_page_index *last_page_index; }; /* gathers dirty pages to be written on disk */ @@ -85,12 +111,15 @@ struct pg_cache_commited_page_index { unsigned nr_commited_pages; }; -/* gathers populated pages to be evicted */ +/* + * Gathers populated pages to be evicted. + * Relies on page cache descriptors being there as it uses their memory. + */ struct pg_cache_replaceQ { uv_rwlock_t lock; /* LRU lock */ - struct rrdeng_page_cache_descr *head; /* LRU */ - struct rrdeng_page_cache_descr *tail; /* MRU */ + struct page_cache_descr *head; /* LRU */ + struct page_cache_descr *tail; /* MRU */ }; struct page_cache { /* TODO: add statistics */ @@ -104,29 +133,31 @@ struct page_cache { /* TODO: add statistics */ unsigned populated_pages; }; -extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr); -extern void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr); -extern unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr); +extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr); +extern void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr); +extern unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); extern void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx, - struct rrdeng_page_cache_descr *descr); + struct rrdeng_page_descr *descr); extern void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx, - struct rrdeng_page_cache_descr *descr); + struct rrdeng_page_descr *descr); extern void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx, - struct rrdeng_page_cache_descr *descr); -extern struct rrdeng_page_cache_descr *pg_cache_create_descr(void); -extern void pg_cache_put_unsafe(struct rrdeng_page_cache_descr *descr); -extern void pg_cache_put(struct rrdeng_page_cache_descr *descr); + struct rrdeng_page_descr *descr); +extern struct rrdeng_page_descr *pg_cache_create_descr(void); +extern int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access); +extern void pg_cache_put_unsafe(struct rrdeng_page_descr *descr); +extern void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, - struct rrdeng_page_cache_descr *descr); -extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr); + struct rrdeng_page_descr *descr); +extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty); extern struct pg_cache_page_index * pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time); -extern struct rrdeng_page_cache_descr * +extern struct rrdeng_page_descr * pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, usec_t point_in_time); extern struct pg_cache_page_index *create_page_index(uuid_t *id); extern void init_page_cache(struct rrdengine_instance *ctx); -extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_cache_descr *descr); +extern void free_page_cache(struct rrdengine_instance *ctx); +extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr); extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index); -#endif /* NETDATA_PAGECACHE_H */
\ No newline at end of file +#endif /* NETDATA_PAGECACHE_H */ diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index b8e4eba0..0f2dceaa 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -3,6 +3,10 @@ #include "rrdengine.h" +rrdeng_stats_t global_io_errors = 0; +rrdeng_stats_t global_fs_errors = 0; +rrdeng_stats_t rrdeng_reserved_file_descriptors = 0; + void sanity_check(void) { /* Magic numbers must fit in the super-blocks */ @@ -27,12 +31,12 @@ void read_extent_cb(uv_fs_t* req) struct rrdengine_worker_config* wc = req->loop->data; struct rrdengine_instance *ctx = wc->ctx; struct extent_io_descriptor *xt_io_descr; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; int ret; unsigned i, j, count; void *page, *uncompressed_buf = NULL; uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length; - struct rrdengine_datafile *datafile; /* persistent structures */ struct rrdeng_df_extent_header *header; struct rrdeng_df_extent_trailer *trailer; @@ -54,9 +58,13 @@ void read_extent_cb(uv_fs_t* req) crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer)); ret = crc32cmp(trailer->checksum, crc); - datafile = xt_io_descr->descr_array[0]->extent->datafile; - debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__, - xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED"); +#ifdef NETDATA_INTERNAL_CHECKS + { + struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; + debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__, + xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED"); + } +#endif if (unlikely(ret)) { /* TODO: handle errors */ exit(UV_EIO); @@ -97,36 +105,38 @@ void read_extent_cb(uv_fs_t* req) (void) memcpy(page, uncompressed_buf + page_offset, descr->page_length); } pg_cache_replaceQ_insert(ctx, descr); - uv_mutex_lock(&descr->mutex); - descr->page = page; - descr->flags |= RRD_PAGE_POPULATED; - descr->flags &= ~RRD_PAGE_READ_PENDING; + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->page = page; + pg_cache_descr->flags |= RRD_PAGE_POPULATED; + pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING; debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); if (xt_io_descr->release_descr) { pg_cache_put_unsafe(descr); } else { pg_cache_wake_up_waiters_unsafe(descr); } - uv_mutex_unlock(&descr->mutex); + rrdeng_page_descr_mutex_unlock(ctx, descr); } if (RRD_NO_COMPRESSION != header->compression_algorithm) { - free(uncompressed_buf); + freez(uncompressed_buf); } if (xt_io_descr->completion) complete(xt_io_descr->completion); cleanup: uv_fs_req_cleanup(req); free(xt_io_descr->buf); - free(xt_io_descr); + freez(xt_io_descr); } static void do_read_extent(struct rrdengine_worker_config* wc, - struct rrdeng_page_cache_descr **descr, + struct rrdeng_page_descr **descr, unsigned count, uint8_t release_descr) { struct rrdengine_instance *ctx = wc->ctx; + struct page_cache_descr *pg_cache_descr; int ret; unsigned i, size_bytes, pos, real_io_size; // uint32_t payload_length; @@ -141,14 +151,15 @@ static void do_read_extent(struct rrdengine_worker_config* wc, ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); if (unlikely(ret)) { fatal("posix_memalign:%s", strerror(ret)); - /* free(xt_io_descr); + /* freez(xt_io_descr); return;*/ } for (i = 0 ; i < count; ++i) { - uv_mutex_lock(&descr[i]->mutex); - descr[i]->flags |= RRD_PAGE_READ_PENDING; + rrdeng_page_descr_mutex_lock(ctx, descr[i]); + pg_cache_descr = descr[i]->pg_cache_descr; + pg_cache_descr->flags |= RRD_PAGE_READ_PENDING; // payload_length = descr[i]->page_length; - uv_mutex_unlock(&descr[i]->mutex); + rrdeng_page_descr_mutex_unlock(ctx, descr[i]); xt_io_descr->descr_array[i] = descr[i]; } @@ -227,8 +238,8 @@ void flush_pages_cb(uv_fs_t* req) struct rrdengine_instance *ctx = wc->ctx; struct page_cache *pg_cache = &ctx->pg_cache; struct extent_io_descriptor *xt_io_descr; - struct rrdeng_page_cache_descr *descr; - struct rrdengine_datafile *datafile; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; int ret; unsigned i, count; Word_t commit_id; @@ -238,10 +249,13 @@ void flush_pages_cb(uv_fs_t* req) error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); goto cleanup; } - datafile = xt_io_descr->descr_array[0]->extent->datafile; - debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.", - __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); - +#ifdef NETDATA_INTERNAL_CHECKS + { + struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; + debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.", + __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); + } +#endif count = xt_io_descr->descr_count; for (i = 0 ; i < count ; ++i) { /* care, we don't hold the descriptor mutex */ @@ -256,18 +270,19 @@ void flush_pages_cb(uv_fs_t* req) pg_cache_replaceQ_insert(ctx, descr); - uv_mutex_lock(&descr->mutex); - descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING); + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING); /* wake up waiters, care no reference being held */ pg_cache_wake_up_waiters_unsafe(descr); - uv_mutex_unlock(&descr->mutex); + rrdeng_page_descr_mutex_unlock(ctx, descr); } if (xt_io_descr->completion) complete(xt_io_descr->completion); cleanup: uv_fs_req_cleanup(req); free(xt_io_descr->buf); - free(xt_io_descr); + freez(xt_io_descr); } /* @@ -283,7 +298,8 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct int compressed_size, max_compressed_size = 0; unsigned i, count, size_bytes, pos, real_io_size; uint32_t uncompressed_payload_length, payload_offset; - struct rrdeng_page_cache_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT]; + struct rrdeng_page_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT]; + struct page_cache_descr *pg_cache_descr; struct extent_io_descriptor *xt_io_descr; void *compressed_buf = NULL; Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT]; @@ -311,15 +327,16 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct descr = unlikely(NULL == PValue) ? NULL : *PValue) { assert(0 != descr->page_length); - uv_mutex_lock(&descr->mutex); - if (!(descr->flags & RRD_PAGE_WRITE_PENDING)) { + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING)) { /* care, no reference being held */ - descr->flags |= RRD_PAGE_WRITE_PENDING; + pg_cache_descr->flags |= RRD_PAGE_WRITE_PENDING; uncompressed_payload_length += descr->page_length; descr_commit_idx_array[count] = Index; eligible_pages[count++] = descr; } - uv_mutex_unlock(&descr->mutex); + rrdeng_page_descr_mutex_unlock(ctx, descr); } uv_rwlock_rdunlock(&pg_cache->commited_page_index.lock); @@ -345,9 +362,9 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); if (unlikely(ret)) { fatal("posix_memalign:%s", strerror(ret)); - /* free(xt_io_descr);*/ + /* freez(xt_io_descr);*/ } - (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_cache_descr *) * count); + (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count); xt_io_descr->descr_count = count; pos = 0; @@ -378,7 +395,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct for (i = 0 ; i < count ; ++i) { descr = xt_io_descr->descr_array[i]; /* care, we don't hold the descriptor mutex */ - (void) memcpy(xt_io_descr->buf + pos, descr->page, descr->page_length); + (void) memcpy(xt_io_descr->buf + pos, descr->pg_cache_descr->page, descr->page_length); descr->extent = extent; extent->pages[i] = descr; @@ -397,7 +414,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct ctx->stats.after_compress_bytes += compressed_size; debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size); (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size); - free(compressed_buf); + freez(compressed_buf); size_bytes = payload_offset + compressed_size + sizeof(*trailer); header->payload_length = compressed_size; break; @@ -435,23 +452,36 @@ static void after_delete_old_data(uv_work_t *req, int status) struct rrdengine_worker_config* wc = &ctx->worker_config; struct rrdengine_datafile *datafile; struct rrdengine_journalfile *journalfile; - unsigned bytes; + unsigned deleted_bytes, journalfile_bytes, datafile_bytes; + int ret; + char path[RRDENG_PATH_MAX]; (void)status; datafile = ctx->datafiles.first; journalfile = datafile->journalfile; - bytes = datafile->pos + journalfile->pos; + datafile_bytes = datafile->pos; + journalfile_bytes = journalfile->pos; + deleted_bytes = 0; + info("Deleting data and journal file pair."); datafile_list_delete(ctx, datafile); - destroy_journal_file(journalfile, datafile); - destroy_data_file(datafile); - info("Deleted data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", - datafile->tier, datafile->fileno); - free(journalfile); - free(datafile); + ret = destroy_journal_file(journalfile, datafile); + if (!ret) { + generate_journalfilepath(datafile, path, sizeof(path)); + info("Deleted journal file \"%s\".", path); + deleted_bytes += journalfile_bytes; + } + ret = destroy_data_file(datafile); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + info("Deleted data file \"%s\".", path); + deleted_bytes += datafile_bytes; + } + freez(journalfile); + freez(datafile); - ctx->disk_space -= bytes; - info("Reclaimed %u bytes of disk space.", bytes); + ctx->disk_space -= deleted_bytes; + info("Reclaimed %u bytes of disk space.", deleted_bytes); /* unfreeze command processing */ wc->now_deleting.data = NULL; @@ -464,7 +494,7 @@ static void delete_old_data(uv_work_t *req) struct rrdengine_instance *ctx = req->data; struct rrdengine_datafile *datafile; struct extent_info *extent, *next; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; unsigned count, i; /* Safe to use since it will be deleted after we are done */ @@ -474,10 +504,10 @@ static void delete_old_data(uv_work_t *req) count = extent->number_of_pages; for (i = 0 ; i < count ; ++i) { descr = extent->pages[i]; - pg_cache_punch_hole(ctx, descr); + pg_cache_punch_hole(ctx, descr, 0); } next = extent->next; - free(extent); + freez(extent); } } @@ -487,6 +517,7 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc) struct rrdengine_datafile *datafile; unsigned current_size, target_size; uint8_t out_of_space, only_one_datafile; + int ret; out_of_space = 0; if (unlikely(ctx->disk_space > ctx->max_disk_space)) { @@ -501,7 +532,10 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc) if (unlikely(current_size >= target_size || (out_of_space && only_one_datafile))) { /* Finalize data and journal file and create a new pair */ wal_flush_transaction_buffer(wc); - create_new_datafile_pair(ctx, 1, datafile->fileno + 1); + ret = create_new_datafile_pair(ctx, 1, ctx->last_fileno + 1); + if (likely(!ret)) { + ++ctx->last_fileno; + } } if (unlikely(out_of_space)) { /* delete old data */ @@ -509,18 +543,30 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc) /* already deleting data */ return; } - info("Deleting data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", - ctx->datafiles.first->tier, ctx->datafiles.first->fileno); + if (NULL == ctx->datafiles.first->next) { + error("Cannot delete data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\"" + " to reclaim space, there are no other file pairs left.", + ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); + return; + } + info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", + ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); wc->now_deleting.data = ctx; - uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data); + assert(0 == uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data)); } } +/* return 0 on success */ int init_rrd_files(struct rrdengine_instance *ctx) { return init_data_files(ctx); } +void finalize_rrd_files(struct rrdengine_instance *ctx) +{ + return finalize_data_files(ctx); +} + void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc) { wc->cmd_queue.head = wc->cmd_queue.tail = 0; @@ -588,7 +634,6 @@ void async_cb(uv_async_t *handle) void timer_cb(uv_timer_t* handle) { struct rrdengine_worker_config* wc = handle->data; - struct rrdengine_instance *ctx = wc->ctx; uv_stop(handle->loop); uv_update_time(handle->loop); @@ -608,7 +653,7 @@ void timer_cb(uv_timer_t* handle) #ifdef NETDATA_INTERNAL_CHECKS { char buf[4096]; - debug(D_RRDENGINE, "%s", get_rrdeng_statistics(ctx, buf, sizeof(buf))); + debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf))); } #endif } @@ -623,7 +668,7 @@ void rrdeng_worker(void* arg) struct rrdengine_worker_config* wc = arg; struct rrdengine_instance *ctx = wc->ctx; uv_loop_t* loop; - int shutdown; + int shutdown, ret; enum rrdeng_opcode opcode; uv_timer_t timer_req; struct rrdeng_cmd cmd; @@ -631,22 +676,35 @@ void rrdeng_worker(void* arg) rrdeng_init_cmd_queue(wc); loop = wc->loop = mallocz(sizeof(uv_loop_t)); - uv_loop_init(loop); + ret = uv_loop_init(loop); + if (ret) { + error("uv_loop_init(): %s", uv_strerror(ret)); + goto error_after_loop_init; + } loop->data = wc; - uv_async_init(wc->loop, &wc->async, async_cb); + ret = uv_async_init(wc->loop, &wc->async, async_cb); + if (ret) { + error("uv_async_init(): %s", uv_strerror(ret)); + goto error_after_async_init; + } wc->async.data = wc; wc->now_deleting.data = NULL; /* dirty page flushing timer */ - uv_timer_init(loop, &timer_req); + ret = uv_timer_init(loop, &timer_req); + if (ret) { + error("uv_timer_init(): %s", uv_strerror(ret)); + goto error_after_timer_init; + } timer_req.data = wc; + wc->error = 0; /* wake up initialization thread */ complete(&ctx->rrdengine_completion); - uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS); + assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); shutdown = 0; while (shutdown == 0 || uv_loop_alive(loop)) { uv_run(loop, UV_RUN_DEFAULT); @@ -661,12 +719,6 @@ void rrdeng_worker(void* arg) break; case RRDENG_SHUTDOWN: shutdown = 1; - if (unlikely(wc->now_deleting.data)) { - /* postpone shutdown until after deletion */ - info("Postponing shutting RRD engine event loop down until after datafile deletion is finished."); - rrdeng_enq_cmd(wc, &cmd); - break; - } /* * uv_async_send after uv_close does not seem to crash in linux at the moment, * it is however undocumented behaviour and we need to be aware if this becomes @@ -675,10 +727,6 @@ void rrdeng_worker(void* arg) uv_close((uv_handle_t *)&wc->async, NULL); assert(0 == uv_timer_stop(&timer_req)); uv_close((uv_handle_t *)&timer_req, NULL); - info("Shutting down RRD engine event loop."); - while (do_flush_pages(wc, 1, NULL)) { - ; /* Force flushing of all commited pages. */ - } break; case RRDENG_READ_PAGE: do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0); @@ -690,14 +738,14 @@ void rrdeng_worker(void* arg) do_commit_transaction(wc, STORE_DATA, NULL); break; case RRDENG_FLUSH_PAGES: { - unsigned total_bytes, bytes_written; + unsigned bytes_written; /* First I/O should be enough to call completion */ bytes_written = do_flush_pages(wc, 1, cmd.completion); - for (total_bytes = bytes_written ; - bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ; - total_bytes += bytes_written) { - bytes_written = do_flush_pages(wc, 1, NULL); + if (bytes_written) { + while (do_flush_pages(wc, 1, NULL)) { + ; /* Force flushing of all commited pages. */ + } } break; } @@ -708,6 +756,13 @@ void rrdeng_worker(void* arg) } while (opcode != RRDENG_NOOP); } /* cleanup operations of the event loop */ + if (unlikely(wc->now_deleting.data)) { + info("Postponing shutting RRD engine event loop down until after datafile deletion is finished."); + } + info("Shutting down RRD engine event loop."); + while (do_flush_pages(wc, 1, NULL)) { + ; /* Force flushing of all commited pages. */ + } wal_flush_transaction_buffer(wc); uv_run(loop, UV_RUN_DEFAULT); @@ -716,7 +771,20 @@ void rrdeng_worker(void* arg) uv_cond_destroy(&wc->cmd_cond); /* uv_mutex_destroy(&wc->cmd_mutex); */ assert(0 == uv_loop_close(loop)); - free(loop); + freez(loop); + + return; + +error_after_timer_init: + uv_close((uv_handle_t *)&wc->async, NULL); +error_after_async_init: + assert(0 == uv_loop_close(loop)); +error_after_loop_init: + freez(loop); + + wc->error = UV_EAGAIN; + /* wake up initialization thread */ + complete(&ctx->rrdengine_completion); } @@ -726,19 +794,19 @@ static void basic_functional_test(struct rrdengine_instance *ctx) int i, j, failed_validations; uuid_t uuid[NR_PAGES]; void *buf; - struct rrdeng_page_cache_descr *handle[NR_PAGES]; - char uuid_str[37]; - char backup[NR_PAGES][37 * 100]; /* backup storage for page data verification */ + struct rrdeng_page_descr *handle[NR_PAGES]; + char uuid_str[UUID_STR_LEN]; + char backup[NR_PAGES][UUID_STR_LEN * 100]; /* backup storage for page data verification */ for (i = 0 ; i < NR_PAGES ; ++i) { uuid_generate(uuid[i]); uuid_unparse_lower(uuid[i], uuid_str); // fprintf(stderr, "Generated uuid[%d]=%s\n", i, uuid_str); - buf = rrdeng_create_page(&uuid[i], &handle[i]); + buf = rrdeng_create_page(ctx, &uuid[i], &handle[i]); /* Each page contains 10 times its own UUID stringified */ for (j = 0 ; j < 100 ; ++j) { - strcpy(buf + 37 * j, uuid_str); - strcpy(backup[i] + 37 * j, uuid_str); + strcpy(buf + UUID_STR_LEN * j, uuid_str); + strcpy(backup[i] + UUID_STR_LEN * j, uuid_str); } rrdeng_commit_page(ctx, handle[i], (Word_t)i); } @@ -750,7 +818,7 @@ static void basic_functional_test(struct rrdengine_instance *ctx) ++failed_validations; fprintf(stderr, "Page %d was LOST.\n", i); } - if (memcmp(backup[i], buf, 37 * 100)) { + if (memcmp(backup[i], buf, UUID_STR_LEN * 100)) { ++failed_validations; fprintf(stderr, "Page %d data comparison with backup FAILED validation.\n", i); } diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h index 141bb9c6..6f6a6f8f 100644 --- a/database/engine/rrdengine.h +++ b/database/engine/rrdengine.h @@ -22,6 +22,7 @@ #include "journalfile.h" #include "rrdengineapi.h" #include "pagecache.h" +#include "rrdenglocking.h" #ifdef NETDATA_RRD_INTERNALS @@ -59,10 +60,10 @@ struct rrdeng_cmd { enum rrdeng_opcode opcode; union { struct rrdeng_read_page { - struct rrdeng_page_cache_descr *page_cache_descr; + struct rrdeng_page_descr *page_cache_descr; } read_page; struct rrdeng_read_extent { - struct rrdeng_page_cache_descr *page_cache_descr[MAX_PAGES_PER_EXTENT]; + struct rrdeng_page_descr *page_cache_descr[MAX_PAGES_PER_EXTENT]; int page_count; } read_extent; struct completion *completion; @@ -85,7 +86,7 @@ struct extent_io_descriptor { struct completion *completion; unsigned descr_count; int release_descr; - struct rrdeng_page_cache_descr *descr_array[MAX_PAGES_PER_EXTENT]; + struct rrdeng_page_descr *descr_array[MAX_PAGES_PER_EXTENT]; Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT]; }; @@ -111,6 +112,8 @@ struct rrdengine_worker_config { uv_cond_t cmd_cond; volatile unsigned queue_size; struct rrdeng_cmdqueue cmd_queue; + + int error; }; /* @@ -142,10 +145,19 @@ struct rrdengine_statistics { rrdeng_stats_t datafile_deletions; rrdeng_stats_t journalfile_creations; rrdeng_stats_t journalfile_deletions; + rrdeng_stats_t page_cache_descriptors; + rrdeng_stats_t io_errors; + rrdeng_stats_t fs_errors; }; +/* I/O errors global counter */ +extern rrdeng_stats_t global_io_errors; +/* File-System errors global counter */ +extern rrdeng_stats_t global_fs_errors; +/* number of File-Descriptors that have been reserved by dbengine */ +extern rrdeng_stats_t rrdeng_reserved_file_descriptors; + struct rrdengine_instance { - rrdengine_state_t rrdengine_state; struct rrdengine_worker_config worker_config; struct completion rrdengine_completion; struct page_cache pg_cache; @@ -155,6 +167,7 @@ struct rrdengine_instance { char dbfiles_path[FILENAME_MAX+1]; uint64_t disk_space; uint64_t max_disk_space; + unsigned last_fileno; /* newest index of datafile and journalfile */ unsigned long max_cache_pages; unsigned long cache_pages_low_watermark; @@ -163,6 +176,7 @@ struct rrdengine_instance { extern void sanity_check(void); extern int init_rrd_files(struct rrdengine_instance *ctx); +extern void finalize_rrd_files(struct rrdengine_instance *ctx); extern void rrdeng_test_quota(struct rrdengine_worker_config* wc); extern void rrdeng_worker(void* arg); extern void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd); diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index a4e71155..a87ce6d6 100644 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -41,6 +41,7 @@ void rrdeng_store_metric_init(RRDDIM *rd) handle->descr = NULL; handle->prev_descr = NULL; + handle->unaligned_page = 0; uv_rwlock_rdlock(&pg_cache->metrics_index.lock); PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t)); @@ -54,59 +55,140 @@ void rrdeng_store_metric_init(RRDDIM *rd) PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t), PJE0); assert(NULL == *PValue); /* TODO: figure out concurrency model */ *PValue = page_index = create_page_index(&temp_id); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); } rd->state->rrdeng_uuid = &page_index->id; handle->page_index = page_index; } +/* The page must be populated and referenced */ +static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr) +{ + unsigned i; + uint8_t has_only_empty_metrics = 1; + storage_number *page; + + page = descr->pg_cache_descr->page; + for (i = 0 ; i < descr->page_length / sizeof(storage_number); ++i) { + if (SN_EMPTY_SLOT != page[i]) { + has_only_empty_metrics = 0; + break; + } + } + return has_only_empty_metrics; +} + +void rrdeng_store_metric_flush_current_page(RRDDIM *rd) +{ + struct rrdeng_collect_handle *handle; + struct rrdengine_instance *ctx; + struct rrdeng_page_descr *descr; + + handle = &rd->state->handle.rrdeng; + ctx = handle->ctx; + descr = handle->descr; + if (unlikely(NULL == descr)) { + return; + } + if (likely(descr->page_length)) { + int ret, page_is_empty; + +#ifdef NETDATA_INTERNAL_CHECKS + rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); +#endif + if (handle->prev_descr) { + /* unpin old second page */ + pg_cache_put(ctx, handle->prev_descr); + } + page_is_empty = page_has_only_empty_metrics(descr); + if (page_is_empty) { + debug(D_RRDENGINE, "Page has empty metrics only, deleting:"); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + pg_cache_put(ctx, descr); + pg_cache_punch_hole(ctx, descr, 1); + handle->prev_descr = NULL; + } else { + /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */ + rrdeng_page_descr_mutex_lock(ctx, descr); + ret = pg_cache_try_get_unsafe(descr, 0); + rrdeng_page_descr_mutex_unlock(ctx, descr); + assert (1 == ret); + + rrdeng_commit_page(ctx, descr, handle->page_correlation_id); + handle->prev_descr = descr; + } + } else { + freez(descr->pg_cache_descr->page); + rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr); + freez(descr); + } + handle->descr = NULL; +} + void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number) { struct rrdeng_collect_handle *handle; struct rrdengine_instance *ctx; struct page_cache *pg_cache; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; storage_number *page; + uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0; handle = &rd->state->handle.rrdeng; ctx = handle->ctx; pg_cache = &ctx->pg_cache; descr = handle->descr; - if (unlikely(NULL == descr || descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE)) { - if (descr) { - descr->handle = NULL; - if (descr->page_length) { -#ifdef NETDATA_INTERNAL_CHECKS - rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); -#endif - /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */ - ++descr->refcnt; - rrdeng_commit_page(ctx, descr, handle->page_correlation_id); - if (handle->prev_descr) { - /* unpin old second page */ - pg_cache_put(handle->prev_descr); - } - handle->prev_descr = descr; - } else { - free(descr->page); - free(descr); - handle->descr = NULL; - } + + if (descr) { + /* Make alignment decisions */ + + if (descr->page_length == rd->rrdset->rrddim_page_alignment) { + /* this is the leading dimension that defines chart alignment */ + perfect_page_alignment = 1; + } + /* is the metric far enough out of alignment with the others? */ + if (unlikely(descr->page_length + sizeof(number) < rd->rrdset->rrddim_page_alignment)) { + handle->unaligned_page = 1; + debug(D_RRDENGINE, "Metric page is not aligned with chart:"); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); } - page = rrdeng_create_page(&handle->page_index->id, &descr); + if (unlikely(handle->unaligned_page && + /* did the other metrics change page? */ + rd->rrdset->rrddim_page_alignment <= sizeof(number))) { + debug(D_RRDENGINE, "Flushing unaligned metric page."); + must_flush_unaligned_page = 1; + handle->unaligned_page = 0; + } + } + if (unlikely(NULL == descr || + descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE || + must_flush_unaligned_page)) { + rrdeng_store_metric_flush_current_page(rd); + + page = rrdeng_create_page(ctx, &handle->page_index->id, &descr); assert(page); - handle->prev_descr = handle->descr; + handle->descr = descr; - descr->handle = handle; + uv_rwlock_wrlock(&pg_cache->commited_page_index.lock); handle->page_correlation_id = pg_cache->commited_page_index.latest_corr_id++; uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock); - } - page = descr->page; + if (0 == rd->rrdset->rrddim_page_alignment) { + /* this is the leading dimension that defines chart alignment */ + perfect_page_alignment = 1; + } + } + page = descr->pg_cache_descr->page; page[descr->page_length / sizeof(number)] = number; descr->end_time = point_in_time; descr->page_length += sizeof(number); + if (perfect_page_alignment) + rd->rrdset->rrddim_page_alignment = descr->page_length; if (unlikely(INVALID_TIME == descr->start_time)) { descr->start_time = point_in_time; @@ -126,26 +208,13 @@ void rrdeng_store_metric_finalize(RRDDIM *rd) { struct rrdeng_collect_handle *handle; struct rrdengine_instance *ctx; - struct rrdeng_page_cache_descr *descr; handle = &rd->state->handle.rrdeng; ctx = handle->ctx; - descr = handle->descr; - if (descr) { - descr->handle = NULL; - if (descr->page_length) { -#ifdef NETDATA_INTERNAL_CHECKS - rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); -#endif - rrdeng_commit_page(ctx, descr, handle->page_correlation_id); - if (handle->prev_descr) { - /* unpin old second page */ - pg_cache_put(handle->prev_descr); - } - } else { - free(descr->page); - free(descr); - } + rrdeng_store_metric_flush_current_page(rd); + if (handle->prev_descr) { + /* unpin old second page */ + pg_cache_put(ctx, handle->prev_descr); } } @@ -174,7 +243,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle { struct rrdeng_query_handle *handle; struct rrdengine_instance *ctx; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; storage_number *page, ret; unsigned position; usec_t point_in_time; @@ -198,7 +267,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle #ifdef NETDATA_INTERNAL_CHECKS rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); #endif - pg_cache_put(descr); + pg_cache_put(ctx, descr); handle->descr = NULL; } descr = pg_cache_lookup(ctx, handle->page_index, &handle->page_index->id, point_in_time); @@ -216,7 +285,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle ret = SN_EMPTY_SLOT; goto out; } - page = descr->page; + page = descr->pg_cache_descr->page; if (unlikely(descr->start_time == descr->end_time)) { ret = page[0]; goto out; @@ -248,7 +317,7 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) { struct rrdeng_query_handle *handle; struct rrdengine_instance *ctx; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; handle = &rrdimm_handle->rrdeng; ctx = handle->ctx; @@ -257,7 +326,7 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) #ifdef NETDATA_INTERNAL_CHECKS rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); #endif - pg_cache_put(descr); + pg_cache_put(ctx, descr); } } @@ -283,30 +352,32 @@ time_t rrdeng_metric_oldest_time(RRDDIM *rd) } /* Also gets a reference for the page */ -void *rrdeng_create_page(uuid_t *id, struct rrdeng_page_cache_descr **ret_descr) +void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr) { - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; void *page; - int ret; - /* TODO: check maximum number of pages in page cache limit */ - page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */ descr = pg_cache_create_descr(); - descr->page = page; descr->id = id; /* TODO: add page type: metric, log, something? */ - descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */; - descr->refcnt = 1; - - debug(D_RRDENGINE, "-----------------\nCreated new page:\n-----------------"); - if(unlikely(debug_flags & D_RRDENGINE)) + page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */ + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->page = page; + pg_cache_descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */; + pg_cache_descr->refcnt = 1; + + debug(D_RRDENGINE, "Created new page:"); + if (unlikely(debug_flags & D_RRDENGINE)) print_page_cache_descr(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); *ret_descr = descr; return page; } /* The page must not be empty */ -void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr, +void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, Word_t page_correlation_id) { struct page_cache *pg_cache = &ctx->pg_cache; @@ -324,15 +395,16 @@ void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache ++pg_cache->commited_page_index.nr_commited_pages; uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock); - pg_cache_put(descr); + pg_cache_put(ctx, descr); } /* Gets a reference for the page */ void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle) { - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; - debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------"); + debug(D_RRDENGINE, "Reading existing page:"); descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME); if (NULL == descr) { *handle = NULL; @@ -340,16 +412,18 @@ void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void ** return NULL; } *handle = descr; + pg_cache_descr = descr->pg_cache_descr; - return descr->page; + return pg_cache_descr->page; } /* Gets a reference for the page */ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle) { - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; - debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------"); + debug(D_RRDENGINE, "Reading existing page:"); descr = pg_cache_lookup(ctx, NULL, id, point_in_time); if (NULL == descr) { *handle = NULL; @@ -357,11 +431,18 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i return NULL; } *handle = descr; + pg_cache_descr = descr->pg_cache_descr; - return descr->page; + return pg_cache_descr->page; } -void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long *array) +/* + * Gathers Database Engine statistics. + * Careful when modifying this function. + * You must not change the indices of the statistics or user code will break. + * You must not exceed RRDENG_NR_STATS or it will crash. + */ +void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long *array) { struct page_cache *pg_cache = &ctx->pg_cache; @@ -392,24 +473,46 @@ void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long array[24] = (uint64_t)ctx->stats.datafile_deletions; array[25] = (uint64_t)ctx->stats.journalfile_creations; array[26] = (uint64_t)ctx->stats.journalfile_deletions; + array[27] = (uint64_t)ctx->stats.page_cache_descriptors; + array[28] = (uint64_t)ctx->stats.io_errors; + array[29] = (uint64_t)ctx->stats.fs_errors; + array[30] = (uint64_t)global_io_errors; + array[31] = (uint64_t)global_fs_errors; + array[32] = (uint64_t)rrdeng_reserved_file_descriptors; + assert(RRDENG_NR_STATS == 33); } /* Releases reference to page */ void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle) { (void)ctx; - pg_cache_put((struct rrdeng_page_cache_descr *)handle); + pg_cache_put(ctx, (struct rrdeng_page_descr *)handle); } /* - * Returns 0 on success, 1 on error + * Returns 0 on success, negative on error */ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, unsigned disk_space_mb) { struct rrdengine_instance *ctx; int error; + uint32_t max_open_files; sanity_check(); + + max_open_files = rlimit_nofile.rlim_cur / 4; + + /* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */ + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE); + if (rrdeng_reserved_file_descriptors > max_open_files) { + error("Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.", + (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files); + + rrd_stat_atomic_add(&global_fs_errors, 1); + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); + return UV_EMFILE; + } + if (NULL == ctxp) { /* for testing */ ctx = &default_global_ctx; @@ -417,10 +520,6 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p } else { *ctxp = ctx = callocz(1, sizeof(*ctx)); } - if (ctx->rrdengine_state != RRDENGINE_STATUS_UNINITIALIZED) { - return 1; - } - ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZING; ctx->global_compress_alg = RRD_LZ4; if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB) page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB; @@ -439,11 +538,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p init_commit_log(ctx); error = init_rrd_files(ctx); if (error) { - ctx->rrdengine_state = RRDENGINE_STATUS_UNINITIALIZED; - if (ctx != &default_global_ctx) { - freez(ctx); - } - return 1; + goto error_after_init_rrd_files; } init_completion(&ctx->rrdengine_completion); @@ -451,9 +546,21 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p /* wait for worker thread to initialize */ wait_for_completion(&ctx->rrdengine_completion); destroy_completion(&ctx->rrdengine_completion); - - ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZED; + if (ctx->worker_config.error) { + goto error_after_rrdeng_worker; + } return 0; + +error_after_rrdeng_worker: + finalize_rrd_files(ctx); +error_after_init_rrd_files: + free_page_cache(ctx); + if (ctx != &default_global_ctx) { + freez(ctx); + *ctxp = NULL; + } + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); + return UV_EIO; } /* @@ -464,10 +571,6 @@ int rrdeng_exit(struct rrdengine_instance *ctx) struct rrdeng_cmd cmd; if (NULL == ctx) { - /* TODO: move to per host basis */ - ctx = &default_global_ctx; - } - if (ctx->rrdengine_state != RRDENGINE_STATUS_INITIALIZED) { return 1; } @@ -477,8 +580,12 @@ int rrdeng_exit(struct rrdengine_instance *ctx) assert(0 == uv_thread_join(&ctx->worker_config.thread)); + finalize_rrd_files(ctx); + free_page_cache(ctx); + if (ctx != &default_global_ctx) { freez(ctx); } + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); return 0; }
\ No newline at end of file diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h index e76629a4..e52aabcb 100644 --- a/database/engine/rrdengineapi.h +++ b/database/engine/rrdengineapi.h @@ -7,16 +7,22 @@ #define RRDENG_MIN_PAGE_CACHE_SIZE_MB (32) #define RRDENG_MIN_DISK_SPACE_MB (256) + +#define RRDENG_NR_STATS (33) + +#define RRDENG_FD_BUDGET_PER_INSTANCE (50) + extern int default_rrdeng_page_cache_mb; extern int default_rrdeng_disk_quota_mb; -extern void *rrdeng_create_page(uuid_t *id, struct rrdeng_page_cache_descr **ret_descr); -extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr, +extern void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr); +extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, Word_t page_correlation_id); extern void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle); extern void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle); extern void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle); extern void rrdeng_store_metric_init(RRDDIM *rd); +extern void rrdeng_store_metric_flush_current_page(RRDDIM *rd); extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number); extern void rrdeng_store_metric_finalize(RRDDIM *rd); extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, @@ -26,7 +32,7 @@ extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_han extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle); extern time_t rrdeng_metric_latest_time(RRDDIM *rd); extern time_t rrdeng_metric_oldest_time(RRDDIM *rd); -extern void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long *array); +extern void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long *array); /* must call once before using anything */ extern int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c index 25f57ba1..96504b27 100644 --- a/database/engine/rrdenginelib.c +++ b/database/engine/rrdenginelib.c @@ -1,25 +1,52 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdengine.h" -void print_page_cache_descr(struct rrdeng_page_cache_descr *page_cache_descr) +#define BUFSIZE (512) + +/* Caller must hold descriptor lock */ +void print_page_cache_descr(struct rrdeng_page_descr *descr) { - char uuid_str[37]; - char str[512]; + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + char uuid_str[UUID_STR_LEN]; + char str[BUFSIZE]; int pos = 0; - uuid_unparse_lower(*page_cache_descr->id, uuid_str); - pos += snprintfz(str, 512 - pos, "page(%p) id=%s\n" + uuid_unparse_lower(*descr->id, uuid_str); + pos += snprintfz(str, BUFSIZE - pos, "page(%p) id=%s\n" "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:", - page_cache_descr->page, uuid_str, - page_cache_descr->page_length, - (uint64_t)page_cache_descr->start_time, - (uint64_t)page_cache_descr->end_time); - if (!page_cache_descr->extent) { - pos += snprintfz(str + pos, 512 - pos, "N/A"); + pg_cache_descr->page, uuid_str, + descr->page_length, + (uint64_t)descr->start_time, + (uint64_t)descr->end_time); + if (!descr->extent) { + pos += snprintfz(str + pos, BUFSIZE - pos, "N/A"); + } else { + pos += snprintfz(str + pos, BUFSIZE - pos, "%"PRIu64, descr->extent->offset); + } + + snprintfz(str + pos, BUFSIZE - pos, " flags:0x%2.2lX refcnt:%u\n\n", pg_cache_descr->flags, pg_cache_descr->refcnt); + debug(D_RRDENGINE, "%s", str); +} + +void print_page_descr(struct rrdeng_page_descr *descr) +{ + char uuid_str[UUID_STR_LEN]; + char str[BUFSIZE]; + int pos = 0; + + uuid_unparse_lower(*descr->id, uuid_str); + pos += snprintfz(str, BUFSIZE - pos, "id=%s\n" + "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:", + uuid_str, + descr->page_length, + (uint64_t)descr->start_time, + (uint64_t)descr->end_time); + if (!descr->extent) { + pos += snprintfz(str + pos, BUFSIZE - pos, "N/A"); } else { - pos += snprintfz(str + pos, 512 - pos, "%"PRIu64, page_cache_descr->extent->offset); + pos += snprintfz(str + pos, BUFSIZE - pos, "%"PRIu64, descr->extent->offset); } - snprintfz(str + pos, 512 - pos, " flags:0x%2.2lX refcnt:%u\n\n", page_cache_descr->flags, page_cache_descr->refcnt); + snprintfz(str + pos, BUFSIZE - pos, "\n\n"); fputs(str, stderr); } @@ -51,6 +78,48 @@ int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size) return 0; } +/* + * Tries to open a file in direct I/O mode, falls back to buffered mode if not possible. + * Returns UV error number that is < 0 on failure. + * On success sets (*file) to be the uv_file that was opened. + */ +int open_file_direct_io(char *path, int flags, uv_file *file) +{ + uv_fs_t req; + int fd, current_flags, direct; + + for (direct = 1 ; direct >= 0 ; --direct) { +#ifdef __APPLE__ + /* Apple OS does not support O_DIRECT */ + direct = 0; +#endif + current_flags = flags; + if (direct) { + current_flags |= O_DIRECT; + } + fd = uv_fs_open(NULL, &req, path, current_flags, S_IRUSR | S_IWUSR, NULL); + if (fd < 0) { + if ((direct) && (UV_EINVAL == fd)) { + error("File \"%s\" does not support direct I/O, falling back to buffered I/O.", path); + } else { + error("Failed to open file \"%s\".", path); + --direct; /* break the loop */ + } + } else { + assert(req.result >= 0); + *file = req.result; +#ifdef __APPLE__ + info("Disabling OS X caching for file \"%s\".", path); + fcntl(fd, F_NOCACHE, 1); +#endif + --direct; /* break the loop */ + } + uv_fs_req_cleanup(&req); + } + + return fd; +} + char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size) { struct page_cache *pg_cache; @@ -60,6 +129,7 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si "metric_API_producers: %ld\n" "metric_API_consumers: %ld\n" "page_cache_total_pages: %ld\n" + "page_cache_descriptors: %ld\n" "page_cache_populated_pages: %ld\n" "page_cache_commited_pages: %ld\n" "page_cache_insertions: %ld\n" @@ -87,6 +157,7 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si (long)ctx->stats.metric_API_producers, (long)ctx->stats.metric_API_consumers, (long)pg_cache->page_descriptors, + (long)ctx->stats.page_cache_descriptors, (long)pg_cache->populated_pages, (long)pg_cache->commited_page_index.nr_commited_pages, (long)ctx->stats.pg_cache_insertions, diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h index bb6f072b..36d414e8 100644 --- a/database/engine/rrdenginelib.h +++ b/database/engine/rrdenginelib.h @@ -6,11 +6,17 @@ #include "rrdengine.h" /* Forward declarations */ -struct rrdeng_page_cache_descr; +struct rrdeng_page_descr; #define STR_HELPER(x) #x #define STR(x) STR_HELPER(x) +#define BITS_PER_ULONG (sizeof(unsigned long) * 8) + +#ifndef UUID_STR_LEN +#define UUID_STR_LEN (37) +#endif + /* Taken from linux kernel */ #define BUILD_BUG_ON(condition) ((void)sizeof(char[1 - 2*!!(condition)])) @@ -25,6 +31,15 @@ typedef uintptr_t rrdeng_stats_t; #define rrd_stat_atomic_add(p, n) do {(void) __sync_fetch_and_add(p, n);} while(0) #endif +#define RRDENG_PATH_MAX (4096) + +/* returns old *ptr value */ +static inline unsigned long ulong_compare_and_swap(volatile unsigned long *ptr, + unsigned long oldval, unsigned long newval) +{ + return __sync_val_compare_and_swap(ptr, oldval, newval); +} + #ifndef O_DIRECT /* Workaround for OS X */ #define O_DIRECT (0) @@ -77,8 +92,10 @@ static inline void crc32set(void *crcp, uLong crc) *(uint32_t *)crcp = crc; } -extern void print_page_cache_descr(struct rrdeng_page_cache_descr *page_cache_descr); +extern void print_page_cache_descr(struct rrdeng_page_descr *page_cache_descr); +extern void print_page_descr(struct rrdeng_page_descr *descr); extern int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size); +extern int open_file_direct_io(char *path, int flags, uv_file *file); extern char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size); #endif /* NETDATA_RRDENGINELIB_H */
\ No newline at end of file diff --git a/database/engine/rrdenglocking.c b/database/engine/rrdenglocking.c new file mode 100644 index 00000000..0eb9019b --- /dev/null +++ b/database/engine/rrdenglocking.c @@ -0,0 +1,233 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx) +{ + struct page_cache_descr *pg_cache_descr; + + pg_cache_descr = mallocz(sizeof(*pg_cache_descr)); + rrd_stat_atomic_add(&ctx->stats.page_cache_descriptors, 1); + pg_cache_descr->page = NULL; + pg_cache_descr->flags = 0; + pg_cache_descr->prev = pg_cache_descr->next = NULL; + pg_cache_descr->refcnt = 0; + pg_cache_descr->waiters = 0; + assert(0 == uv_cond_init(&pg_cache_descr->cond)); + assert(0 == uv_mutex_init(&pg_cache_descr->mutex)); + + return pg_cache_descr; +} + +void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr) +{ + uv_cond_destroy(&pg_cache_descr->cond); + uv_mutex_destroy(&pg_cache_descr->mutex); + freez(pg_cache_descr); + rrd_stat_atomic_add(&ctx->stats.page_cache_descriptors, -1); +} + +/* also allocates page cache descriptor if missing */ +void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + unsigned long old_state, old_users, new_state, ret_state; + struct page_cache_descr *pg_cache_descr = NULL; + uint8_t we_locked; + + we_locked = 0; + while (1) { /* spin */ + old_state = descr->pg_cache_descr_state; + old_users = old_state >> PG_CACHE_DESCR_SHIFT; + + if (unlikely(we_locked)) { + assert(old_state & PG_CACHE_DESCR_LOCKED); + new_state = (1 << PG_CACHE_DESCR_SHIFT) | PG_CACHE_DESCR_ALLOCATED; + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + /* success */ + break; + } + continue; /* spin */ + } + if (old_state & PG_CACHE_DESCR_LOCKED) { + assert(0 == old_users); + continue; /* spin */ + } + if (0 == old_state) { + /* no page cache descriptor has been allocated */ + + if (NULL == pg_cache_descr) { + pg_cache_descr = rrdeng_create_pg_cache_descr(ctx); + } + new_state = PG_CACHE_DESCR_LOCKED; + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, 0, new_state); + if (0 == ret_state) { + we_locked = 1; + descr->pg_cache_descr = pg_cache_descr; + pg_cache_descr->descr = descr; + pg_cache_descr = NULL; /* make sure we don't free pg_cache_descr */ + /* retry */ + continue; + } + continue; /* spin */ + } + /* page cache descriptor is already allocated */ + assert(old_state & PG_CACHE_DESCR_ALLOCATED); + + new_state = (old_users + 1) << PG_CACHE_DESCR_SHIFT; + new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK; + + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + /* success */ + break; + } + /* spin */ + } + + if (pg_cache_descr) { + rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); + } + pg_cache_descr = descr->pg_cache_descr; + uv_mutex_lock(&pg_cache_descr->mutex); +} + +void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + unsigned long old_state, new_state, ret_state, old_users; + struct page_cache_descr *pg_cache_descr; + uint8_t we_locked; + + uv_mutex_unlock(&descr->pg_cache_descr->mutex); + + we_locked = 0; + while (1) { /* spin */ + old_state = descr->pg_cache_descr_state; + old_users = old_state >> PG_CACHE_DESCR_SHIFT; + + if (unlikely(we_locked)) { + assert(0 == old_users); + + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, 0); + if (old_state == ret_state) { + /* success */ + break; + } + continue; /* spin */ + } + if (old_state & PG_CACHE_DESCR_LOCKED) { + assert(0 == old_users); + continue; /* spin */ + } + assert(old_state & PG_CACHE_DESCR_ALLOCATED); + pg_cache_descr = descr->pg_cache_descr; + /* caller is the only page cache descriptor user and there are no pending references on the page */ + if ((old_state & PG_CACHE_DESCR_DESTROY) && (1 == old_users) && + !pg_cache_descr->flags && !pg_cache_descr->refcnt) { + new_state = PG_CACHE_DESCR_LOCKED; + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + we_locked = 1; + rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); + /* retry */ + continue; + } + continue; /* spin */ + } + assert(old_users > 0); + new_state = (old_users - 1) << PG_CACHE_DESCR_SHIFT; + new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK; + + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + /* success */ + break; + } + /* spin */ + } + +} + +/* + * Tries to deallocate page cache descriptor. If it fails, it postpones deallocation by setting the + * PG_CACHE_DESCR_DESTROY flag which will be eventually cleared by a different context after doing + * the deallocation. + */ +void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + unsigned long old_state, new_state, ret_state, old_users; + struct page_cache_descr *pg_cache_descr; + uint8_t just_locked, we_freed, must_unlock; + + just_locked = 0; + we_freed = 0; + must_unlock = 0; + while (1) { /* spin */ + old_state = descr->pg_cache_descr_state; + old_users = old_state >> PG_CACHE_DESCR_SHIFT; + + if (unlikely(just_locked)) { + assert(0 == old_users); + + must_unlock = 1; + just_locked = 0; + /* Try deallocate if there are no pending references on the page */ + if (!pg_cache_descr->flags && !pg_cache_descr->refcnt) { + rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); + we_freed = 1; + /* success */ + continue; + } + continue; /* spin */ + } + if (unlikely(must_unlock)) { + assert(0 == old_users); + + if (we_freed) { + /* success */ + new_state = 0; + } else { + new_state = old_state | PG_CACHE_DESCR_DESTROY; + new_state &= ~PG_CACHE_DESCR_LOCKED; + } + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + /* unlocked */ + return; + } + continue; /* spin */ + } + if (!(old_state & PG_CACHE_DESCR_ALLOCATED)) { + /* don't do anything */ + return; + } + if (old_state & PG_CACHE_DESCR_LOCKED) { + assert(0 == old_users); + continue; /* spin */ + } + pg_cache_descr = descr->pg_cache_descr; + /* caller is the only page cache descriptor user */ + if (0 == old_users) { + new_state = old_state | PG_CACHE_DESCR_LOCKED; + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + just_locked = 1; + /* retry */ + continue; + } + continue; /* spin */ + } + if (old_state & PG_CACHE_DESCR_DESTROY) { + /* don't do anything */ + return; + } + /* plant PG_CACHE_DESCR_DESTROY so that other contexts eventually free the page cache descriptor */ + new_state = old_state | PG_CACHE_DESCR_DESTROY; + + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + /* success */ + return; + } + /* spin */ + } +}
\ No newline at end of file diff --git a/database/engine/rrdenglocking.h b/database/engine/rrdenglocking.h new file mode 100644 index 00000000..127ddc90 --- /dev/null +++ b/database/engine/rrdenglocking.h @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDENGLOCKING_H +#define NETDATA_RRDENGLOCKING_H + +#include "rrdengine.h" + +/* Forward declarations */ +struct page_cache_descr; + +extern struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx); +extern void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr); +extern void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +extern void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +extern void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); + +#endif /* NETDATA_RRDENGLOCKING_H */
\ No newline at end of file |