diff options
Diffstat (limited to 'database/engine')
-rw-r--r-- | database/engine/Makefile.am | 8 | ||||
-rw-r--r-- | database/engine/README.md | 109 | ||||
-rw-r--r-- | database/engine/datafile.c | 335 | ||||
-rw-r--r-- | database/engine/datafile.h | 63 | ||||
-rw-r--r-- | database/engine/journalfile.c | 462 | ||||
-rw-r--r-- | database/engine/journalfile.h | 46 | ||||
-rw-r--r-- | database/engine/pagecache.c | 792 | ||||
-rw-r--r-- | database/engine/pagecache.h | 132 | ||||
-rw-r--r-- | database/engine/rrddiskprotocol.h | 119 | ||||
-rw-r--r-- | database/engine/rrdengine.c | 780 | ||||
-rw-r--r-- | database/engine/rrdengine.h | 171 | ||||
-rw-r--r-- | database/engine/rrdengineapi.c | 484 | ||||
-rw-r--r-- | database/engine/rrdengineapi.h | 37 | ||||
-rw-r--r-- | database/engine/rrdenginelib.c | 116 | ||||
-rw-r--r-- | database/engine/rrdenginelib.h | 84 |
15 files changed, 3738 insertions, 0 deletions
diff --git a/database/engine/Makefile.am b/database/engine/Makefile.am new file mode 100644 index 000000000..19554bed8 --- /dev/null +++ b/database/engine/Makefile.am @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) diff --git a/database/engine/README.md b/database/engine/README.md new file mode 100644 index 000000000..28a2528cb --- /dev/null +++ b/database/engine/README.md @@ -0,0 +1,109 @@ +# Database engine + +The Database Engine works like a traditional +database. There is some amount of RAM dedicated to data caching and indexing and the rest of +the data reside compressed on disk. The number of history entries is not fixed in this case, +but depends on the configured disk space and the effective compression ratio of the data stored. + +## Files + +With the DB engine memory mode the metric data are stored in database files. These files are +organized in pairs, the datafiles and their corresponding journalfiles, e.g.: + +``` +datafile-1-0000000001.ndf +journalfile-1-0000000001.njf +datafile-1-0000000002.ndf +journalfile-1-0000000002.njf +datafile-1-0000000003.ndf +journalfile-1-0000000003.njf +... +``` + +They are located under their host's cache directory in the directory `./dbengine` +(e.g. for localhost the default location is `/var/cache/netdata/dbengine/*`). The higher +numbered filenames contain more recent metric data. The user can safely delete some pairs +of files when netdata is stopped to manually free up some space. + +*Users should* **back up** *their `./dbengine` folders if they consider this data to be important.* + +## Configuration + +There is one DB engine instance per netdata host/node. That is, there is one `./dbengine` folder +per node, and all charts of `dbengine` memory mode in such a host share the same storage space +and DB engine instance memory state. You can select the memory mode for localhost by editing +netdata.conf and setting: + +``` +[global] + memory mode = dbengine +``` + +For setting the memory mode for the rest of the nodes you should look at +[streaming](../../streaming/). + +The `history` configuration option is meaningless for `memory mode = dbengine` and is ignored +for any metrics being stored in the DB engine. + +All DB engine instances, for localhost and all other streaming recipient nodes inherit their +configuration from `netdata.conf`: + +``` +[global] + page cache size = 32 + dbengine disk space = 256 +``` + +The above values are the default and minimum values for Page Cache size and DB engine disk space +quota. Both numbers are in **MiB**. All DB engine instances will allocate the configured resources +separately. + +The `page cache size` option determines the amount of RAM in **MiB** that is dedicated to caching +netdata metric values themselves. + +The `dbengine disk space` option determines the amount of disk space in **MiB** that is dedicated +to storing netdata metric values and all related metadata describing them. + +## Operation + +The DB engine stores chart metric values in 4096-byte pages in memory. Each chart dimension gets +its own page to store consecutive values generated from the data collectors. Those pages comprise +the **Page Cache**. + +When those pages fill up they are slowly compressed and flushed to disk. +It can take `4096 / 4 = 1024 seconds = 17 minutes`, for a chart dimension that is being collected +every 1 second, to fill a page. Pages can be cut short when we stop netdata or the DB engine +instance so as to not lose the data. When we query the DB engine for data we trigger disk read +I/O requests that fill the Page Cache with the requested pages and potentially evict cold +(not recently used) pages. + +When the disk quota is exceeded the oldest values are removed from the DB engine at real time, by +automatically deleting the oldest datafile and journalfile pair. Any corresponding pages residing +in the Page Cache will also be invalidated and removed. The DB engine logic will try to maintain +between 10 and 20 file pairs at any point in time. + +The Database Engine uses direct I/O to avoid polluting the OS filesystem caches and does not +generate excessive I/O traffic so as to create the minimum possible interference with other +applications. + +## Memory requirements + +Using memory mode `dbengine` we can overcome most memory restrictions and store a dataset that +is much larger than the available memory. + +There are explicit memory requirements **per** DB engine **instance**, meaning **per** netdata +**node** (e.g. localhost and streaming recipient nodes): + +- `page cache size` must be at least `#dimensions-being-collected x 4096 x 2` bytes. + +- an additional `#pages-on-disk x 4096 x 0.06` bytes of RAM are allocated for metadata. + + - roughly speaking this is 6% of the uncompressed disk space taken by the DB files. + + - for very highly compressible data (compression ratio > 90%) this RAM overhead + is comparable to the disk space footprint. + +An important observation is that RAM usage depends on both the `page cache size` and the +`dbengine disk space` options. + +[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fdatabase%2Fengine%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)]() diff --git a/database/engine/datafile.c b/database/engine/datafile.c new file mode 100644 index 000000000..2d17d05e4 --- /dev/null +++ b/database/engine/datafile.c @@ -0,0 +1,335 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +void df_extent_insert(struct extent_info *extent) +{ + struct rrdengine_datafile *datafile = extent->datafile; + + if (likely(NULL != datafile->extents.last)) { + datafile->extents.last->next = extent; + } + if (unlikely(NULL == datafile->extents.first)) { + datafile->extents.first = extent; + } + datafile->extents.last = extent; +} + +void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) +{ + if (likely(NULL != ctx->datafiles.last)) { + ctx->datafiles.last->next = datafile; + } + if (unlikely(NULL == ctx->datafiles.first)) { + ctx->datafiles.first = datafile; + } + ctx->datafiles.last = datafile; +} + +void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) +{ + struct rrdengine_datafile *next; + + next = datafile->next; + assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile)); + ctx->datafiles.first = next; +} + + +static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_instance *ctx, + unsigned tier, unsigned fileno) +{ + assert(tier == 1); + datafile->tier = tier; + datafile->fileno = fileno; + datafile->file = (uv_file)0; + datafile->pos = 0; + datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */ + datafile->journalfile = NULL; + datafile->next = NULL; + datafile->ctx = ctx; +} + +static void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +{ + (void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION, + datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); +} + +int destroy_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret, fd; + char path[1024]; + + ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL); + if (ret < 0) { + fatal("uv_fs_ftruncate: %s", uv_strerror(ret)); + } + assert(0 == req.result); + uv_fs_req_cleanup(&req); + + ret = uv_fs_close(NULL, &req, datafile->file, NULL); + if (ret < 0) { + fatal("uv_fs_close: %s", uv_strerror(ret)); + } + assert(0 == req.result); + uv_fs_req_cleanup(&req); + + generate_datafilepath(datafile, path, sizeof(path)); + fd = uv_fs_unlink(NULL, &req, path, NULL); + if (fd < 0) { + fatal("uv_fs_fsunlink: %s", uv_strerror(fd)); + } + assert(0 == req.result); + uv_fs_req_cleanup(&req); + + ++ctx->stats.datafile_deletions; + + return 0; +} + +int create_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + uv_file file; + int ret, fd; + struct rrdeng_df_sb *superblock; + uv_buf_t iov; + char path[1024]; + + generate_datafilepath(datafile, path, sizeof(path)); + fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC, + S_IRUSR | S_IWUSR, NULL); + if (fd < 0) { + fatal("uv_fs_fsopen: %s", uv_strerror(fd)); + } + assert(req.result >= 0); + file = req.result; + uv_fs_req_cleanup(&req); +#ifdef __APPLE__ + info("Disabling OS X caching for file \"%s\".", path); + fcntl(fd, F_NOCACHE, 1); +#endif + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + (void) strncpy(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ); + (void) strncpy(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ); + superblock->tier = 1; + + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + fatal("uv_fs_write: %s", uv_strerror(ret)); + } + if (req.result < 0) { + fatal("uv_fs_write: %s", uv_strerror((int)req.result)); + } + uv_fs_req_cleanup(&req); + free(superblock); + + datafile->file = file; + datafile->pos = sizeof(*superblock); + ctx->stats.io_write_bytes += sizeof(*superblock); + ++ctx->stats.io_write_requests; + ++ctx->stats.datafile_creations; + + return 0; +} + +static int check_data_file_superblock(uv_file file) +{ + int ret; + struct rrdeng_df_sb *superblock; + uv_buf_t iov; + uv_fs_t req; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + error("uv_fs_read: %s", uv_strerror(ret)); + uv_fs_req_cleanup(&req); + goto error; + } + assert(req.result >= 0); + uv_fs_req_cleanup(&req); + + if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) || + strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) || + superblock->tier != 1) { + error("File has invalid superblock."); + ret = UV_EINVAL; + } else { + ret = 0; + } + error: + free(superblock); + return ret; +} + +static int load_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + uv_file file; + int ret, fd; + uint64_t file_size; + char path[1024]; + + generate_datafilepath(datafile, path, sizeof(path)); + fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL); + if (fd < 0) { + /* if (UV_ENOENT != fd) */ + error("uv_fs_fsopen: %s", uv_strerror(fd)); + uv_fs_req_cleanup(&req); + return fd; + } + assert(req.result >= 0); + file = req.result; + uv_fs_req_cleanup(&req); +#ifdef __APPLE__ + info("Disabling OS X caching for file \"%s\".", path); + fcntl(fd, F_NOCACHE, 1); +#endif + info("Initializing data file \"%s\".", path); + + ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); + if (ret) + goto error; + file_size = ALIGN_BYTES_CEILING(file_size); + + ret = check_data_file_superblock(file); + if (ret) + goto error; + ctx->stats.io_read_bytes += sizeof(struct rrdeng_df_sb); + ++ctx->stats.io_read_requests; + + datafile->file = file; + datafile->pos = file_size; + + info("Data file \"%s\" initialized (size:%"PRIu64").", path, file_size); + return 0; + + error: + (void) uv_fs_close(NULL, &req, file, NULL); + uv_fs_req_cleanup(&req); + return ret; +} + +static int scan_data_files_cmp(const void *a, const void *b) +{ + struct rrdengine_datafile *file1, *file2; + char path1[1024], path2[1024]; + + file1 = *(struct rrdengine_datafile **)a; + file2 = *(struct rrdengine_datafile **)b; + generate_datafilepath(file1, path1, sizeof(path1)); + generate_datafilepath(file2, path2, sizeof(path2)); + return strcmp(path1, path2); +} + +/* Returns number of datafiles that were loaded */ +static int scan_data_files(struct rrdengine_instance *ctx) +{ + int ret; + unsigned tier, no, matched_files, i,failed_to_load; + static uv_fs_t req; + uv_dirent_t dent; + struct rrdengine_datafile **datafiles, *datafile; + struct rrdengine_journalfile *journalfile; + + ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL); + assert(ret >= 0); + assert(req.result >= 0); + info("Found %d files in path %s", ret, ctx->dbfiles_path); + + datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles)); + for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { + info("Scanning file \"%s\"", dent.name); + ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no); + if (2 == ret) { + info("Matched file \"%s\"", dent.name); + datafile = mallocz(sizeof(*datafile)); + datafile_init(datafile, ctx, tier, no); + datafiles[matched_files++] = datafile; + } + } + uv_fs_req_cleanup(&req); + + if (matched_files == MAX_DATAFILES) { + error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); + } + qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp); + for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { + datafile = datafiles[i]; + ret = load_data_file(datafile); + if (0 != ret) { + free(datafile); + ++failed_to_load; + continue; + } + journalfile = mallocz(sizeof(*journalfile)); + datafile->journalfile = journalfile; + journalfile_init(journalfile, datafile); + ret = load_journal_file(ctx, journalfile, datafile); + if (0 != ret) { + free(datafile); + free(journalfile); + ++failed_to_load; + continue; + } + datafile_list_insert(ctx, datafile); + ctx->disk_space += datafile->pos + journalfile->pos; + } + if (failed_to_load) { + error("%u files failed to load.", failed_to_load); + } + free(datafiles); + + return matched_files - failed_to_load; +} + +/* Creates a datafile and a journalfile pair */ +void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) +{ + struct rrdengine_datafile *datafile; + struct rrdengine_journalfile *journalfile; + int ret; + + info("Creating new data and journal files."); + datafile = mallocz(sizeof(*datafile)); + datafile_init(datafile, ctx, tier, fileno); + ret = create_data_file(datafile); + assert(!ret); + + journalfile = mallocz(sizeof(*journalfile)); + datafile->journalfile = journalfile; + journalfile_init(journalfile, datafile); + ret = create_journal_file(journalfile, datafile); + assert(!ret); + datafile_list_insert(ctx, datafile); + ctx->disk_space += datafile->pos + journalfile->pos; +} + +/* Page cache must already be initialized. */ +int init_data_files(struct rrdengine_instance *ctx) +{ + int ret; + + ret = scan_data_files(ctx); + if (0 == ret) { + info("Data files not found, creating."); + create_new_datafile_pair(ctx, 1, 1); + } + return 0; +}
\ No newline at end of file diff --git a/database/engine/datafile.h b/database/engine/datafile.h new file mode 100644 index 000000000..c5c8f31f3 --- /dev/null +++ b/database/engine/datafile.h @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_DATAFILE_H +#define NETDATA_DATAFILE_H + +#include "rrdengine.h" + +/* Forward declarations */ +struct rrdengine_datafile; +struct rrdengine_journalfile; +struct rrdengine_instance; + +#define DATAFILE_PREFIX "datafile-" +#define DATAFILE_EXTENSION ".ndf" + +#define MAX_DATAFILE_SIZE (1073741824LU) +#define MIN_DATAFILE_SIZE (16777216LU) +#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */ +#define TARGET_DATAFILES (20) + +#define DATAFILE_IDEAL_IO_SIZE (1048576U) + +struct extent_info { + uint64_t offset; + uint32_t size; + uint8_t number_of_pages; + struct rrdengine_datafile *datafile; + struct extent_info *next; + struct rrdeng_page_cache_descr *pages[]; +}; + +struct rrdengine_df_extents { + /* the extent list is sorted based on disk offset */ + struct extent_info *first; + struct extent_info *last; +}; + +/* only one event loop is supported for now */ +struct rrdengine_datafile { + unsigned tier; + unsigned fileno; + uv_file file; + uint64_t pos; + struct rrdengine_instance *ctx; + struct rrdengine_df_extents extents; + struct rrdengine_journalfile *journalfile; + struct rrdengine_datafile *next; +}; + +struct rrdengine_datafile_list { + struct rrdengine_datafile *first; /* oldest */ + struct rrdengine_datafile *last; /* newest */ +}; + +extern void df_extent_insert(struct extent_info *extent); +extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +extern int destroy_data_file(struct rrdengine_datafile *datafile); +extern int create_data_file(struct rrdengine_datafile *datafile); +extern void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); +extern int init_data_files(struct rrdengine_instance *ctx); + +#endif /* NETDATA_DATAFILE_H */
\ No newline at end of file diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c new file mode 100644 index 000000000..44d8461db --- /dev/null +++ b/database/engine/journalfile.c @@ -0,0 +1,462 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +static void flush_transaction_buffer_cb(uv_fs_t* req) +{ + struct generic_io_descriptor *io_descr; + + debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); + if (req->result < 0) { + fatal("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); + } + io_descr = req->data; + + uv_fs_req_cleanup(req); + free(io_descr->buf); + free(io_descr); +} + +/* Careful to always call this before creating a new journal file */ +void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + int ret; + struct generic_io_descriptor *io_descr; + unsigned pos, size; + struct rrdengine_journalfile *journalfile; + + if (unlikely(NULL == ctx->commit_log.buf || 0 == ctx->commit_log.buf_pos)) { + return; + } + /* care with outstanding transactions when switching journal files */ + journalfile = ctx->datafiles.last->journalfile; + + io_descr = mallocz(sizeof(*io_descr)); + pos = ctx->commit_log.buf_pos; + size = ctx->commit_log.buf_size; + if (pos < size) { + /* simulate an empty transaction to skip the rest of the block */ + *(uint8_t *) (ctx->commit_log.buf + pos) = STORE_PADDING; + } + io_descr->buf = ctx->commit_log.buf; + io_descr->bytes = size; + io_descr->pos = journalfile->pos; + io_descr->req.data = io_descr; + io_descr->completion = NULL; + + io_descr->iov = uv_buf_init((void *)io_descr->buf, size); + ret = uv_fs_write(wc->loop, &io_descr->req, journalfile->file, &io_descr->iov, 1, + journalfile->pos, flush_transaction_buffer_cb); + assert (-1 != ret); + journalfile->pos += RRDENG_BLOCK_SIZE; + ctx->disk_space += RRDENG_BLOCK_SIZE; + ctx->commit_log.buf = NULL; + ctx->stats.io_write_bytes += RRDENG_BLOCK_SIZE; + ++ctx->stats.io_write_requests; +} + +void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size) +{ + struct rrdengine_instance *ctx = wc->ctx; + int ret; + unsigned buf_pos, buf_size; + + assert(size); + if (ctx->commit_log.buf) { + unsigned remaining; + + buf_pos = ctx->commit_log.buf_pos; + buf_size = ctx->commit_log.buf_size; + remaining = buf_size - buf_pos; + if (size > remaining) { + /* we need a new buffer */ + wal_flush_transaction_buffer(wc); + } + } + if (NULL == ctx->commit_log.buf) { + buf_size = ALIGN_BYTES_CEILING(size); + ret = posix_memalign((void *)&ctx->commit_log.buf, RRDFILE_ALIGNMENT, buf_size); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + buf_pos = ctx->commit_log.buf_pos = 0; + ctx->commit_log.buf_size = buf_size; + } + ctx->commit_log.buf_pos += size; + + return ctx->commit_log.buf + buf_pos; +} + +static void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +{ + (void) snprintf(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION, + datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); +} + +void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + journalfile->file = (uv_file)0; + journalfile->pos = 0; + journalfile->datafile = datafile; +} + +int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret, fd; + char path[1024]; + + ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL); + if (ret < 0) { + fatal("uv_fs_ftruncate: %s", uv_strerror(ret)); + } + assert(0 == req.result); + uv_fs_req_cleanup(&req); + + ret = uv_fs_close(NULL, &req, journalfile->file, NULL); + if (ret < 0) { + fatal("uv_fs_close: %s", uv_strerror(ret)); + exit(ret); + } + assert(0 == req.result); + uv_fs_req_cleanup(&req); + + generate_journalfilepath(datafile, path, sizeof(path)); + fd = uv_fs_unlink(NULL, &req, path, NULL); + if (fd < 0) { + fatal("uv_fs_fsunlink: %s", uv_strerror(fd)); + } + assert(0 == req.result); + uv_fs_req_cleanup(&req); + + ++ctx->stats.journalfile_deletions; + + return 0; +} + +int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + uv_file file; + int ret, fd; + struct rrdeng_jf_sb *superblock; + uv_buf_t iov; + char path[1024]; + + generate_journalfilepath(datafile, path, sizeof(path)); + fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC, + S_IRUSR | S_IWUSR, NULL); + if (fd < 0) { + fatal("uv_fs_fsopen: %s", uv_strerror(fd)); + } + assert(req.result >= 0); + file = req.result; + uv_fs_req_cleanup(&req); +#ifdef __APPLE__ + info("Disabling OS X caching for file \"%s\".", path); + fcntl(fd, F_NOCACHE, 1); +#endif + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + (void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ); + (void) strncpy(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ); + + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + fatal("uv_fs_write: %s", uv_strerror(ret)); + } + if (req.result < 0) { + fatal("uv_fs_write: %s", uv_strerror((int)req.result)); + } + uv_fs_req_cleanup(&req); + free(superblock); + + journalfile->file = file; + journalfile->pos = sizeof(*superblock); + ctx->stats.io_write_bytes += sizeof(*superblock); + ++ctx->stats.io_write_requests; + ++ctx->stats.journalfile_creations; + + return 0; +} + +static int check_journal_file_superblock(uv_file file) +{ + int ret; + struct rrdeng_jf_sb *superblock; + uv_buf_t iov; + uv_fs_t req; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + error("uv_fs_read: %s", uv_strerror(ret)); + uv_fs_req_cleanup(&req); + goto error; + } + assert(req.result >= 0); + uv_fs_req_cleanup(&req); + + if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) || + strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) { + error("File has invalid superblock."); + ret = UV_EINVAL; + } else { + ret = 0; + } + error: + free(superblock); + return ret; +} + +static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + void *buf, unsigned max_size) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned i, count, payload_length, descr_size, valid_pages; + struct rrdeng_page_cache_descr *descr; + struct extent_info *extent; + /* persistent structures */ + struct rrdeng_jf_store_data *jf_metric_data; + + jf_metric_data = buf; + count = jf_metric_data->number_of_pages; + descr_size = sizeof(*jf_metric_data->descr) * count; + payload_length = sizeof(*jf_metric_data) + descr_size; + if (payload_length > max_size) { + error("Corrupted transaction payload."); + return; + } + + extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0])); + extent->offset = jf_metric_data->extent_offset; + extent->size = jf_metric_data->extent_size; + extent->number_of_pages = count; + extent->datafile = journalfile->datafile; + extent->next = NULL; + + for (i = 0, valid_pages = 0 ; i < count ; ++i) { + uuid_t *temp_id; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index; + + if (PAGE_METRICS != jf_metric_data->descr[i].type) { + error("Unknown page type encountered."); + continue; + } + ++valid_pages; + temp_id = (uuid_t *)jf_metric_data->descr[i].uuid; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + /* First time we see the UUID */ + uv_rwlock_wrlock(&pg_cache->metrics_index.lock); + PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0); + assert(NULL == *PValue); /* TODO: figure out concurrency model */ + *PValue = page_index = create_page_index(temp_id); + uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); + } + + descr = pg_cache_create_descr(); + descr->page_length = jf_metric_data->descr[i].page_length; + descr->start_time = jf_metric_data->descr[i].start_time; + descr->end_time = jf_metric_data->descr[i].end_time; + descr->id = &page_index->id; + descr->extent = extent; + extent->pages[i] = descr; + pg_cache_insert(ctx, page_index, descr); + } + if (likely(valid_pages)) + df_extent_insert(extent); +} + +/* + * Replays transaction by interpreting up to max_size bytes from buf. + * Sets id to the current transaction id or to 0 if unknown. + * Returns size of transaction record or 0 for unknown size. + */ +static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + void *buf, uint64_t *id, unsigned max_size) +{ + unsigned payload_length, size_bytes; + int ret; + /* persistent structures */ + struct rrdeng_jf_transaction_header *jf_header; + struct rrdeng_jf_transaction_trailer *jf_trailer; + uLong crc; + + *id = 0; + jf_header = buf; + if (STORE_PADDING == jf_header->type) { + debug(D_RRDENGINE, "Skipping padding."); + return 0; + } + if (sizeof(*jf_header) > max_size) { + error("Corrupted transaction record, skipping."); + return 0; + } + *id = jf_header->id; + payload_length = jf_header->payload_length; + size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer); + if (size_bytes > max_size) { + error("Corrupted transaction record, skipping."); + return 0; + } + jf_trailer = buf + sizeof(*jf_header) + payload_length; + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, buf, sizeof(*jf_header) + payload_length); + ret = crc32cmp(jf_trailer->checksum, crc); + debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED"); + if (unlikely(ret)) { + return size_bytes; + } + switch (jf_header->type) { + case STORE_DATA: + debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id); + restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length); + break; + default: + error("Unknown transaction type. Skipping record."); + break; + } + + return size_bytes; +} + + +#define READAHEAD_BYTES (RRDENG_BLOCK_SIZE * 256) +/* + * Iterates journal file transactions and populates the page cache. + * Page cache must already be initialized. + * Returns the maximum transaction id it discovered. + */ +static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile) +{ + uv_file file; + uint64_t file_size;//, data_file_size; + int ret; + uint64_t pos, pos_i, max_id, id; + unsigned size_bytes; + void *buf; + uv_buf_t iov; + uv_fs_t req; + + file = journalfile->file; + file_size = journalfile->pos; + //data_file_size = journalfile->datafile->pos; TODO: utilize this? + + max_id = 1; + ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + + for (pos = sizeof(struct rrdeng_jf_sb) ; pos < file_size ; pos += READAHEAD_BYTES) { + size_bytes = MIN(READAHEAD_BYTES, file_size - pos); + iov = uv_buf_init(buf, size_bytes); + ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL); + if (ret < 0) { + fatal("uv_fs_read: %s", uv_strerror(ret)); + /*uv_fs_req_cleanup(&req);*/ + } + assert(req.result >= 0); + uv_fs_req_cleanup(&req); + ctx->stats.io_read_bytes += size_bytes; + ++ctx->stats.io_read_requests; + + //pos_i = pos; + //while (pos_i < pos + size_bytes) { + for (pos_i = 0 ; pos_i < size_bytes ; ) { + unsigned max_size; + + max_size = pos + size_bytes - pos_i; + ret = replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size); + if (!ret) /* TODO: support transactions bigger than 4K */ + /* unknown transaction size, move on to the next block */ + pos_i = ALIGN_BYTES_FLOOR(pos_i + RRDENG_BLOCK_SIZE); + else + pos_i += ret; + max_id = MAX(max_id, id); + } + } + + free(buf); + return max_id; +} + +int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + struct rrdengine_datafile *datafile) +{ + uv_fs_t req; + uv_file file; + int ret, fd; + uint64_t file_size, max_id; + char path[1024]; + + generate_journalfilepath(datafile, path, sizeof(path)); + fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL); + if (fd < 0) { + /* if (UV_ENOENT != fd) */ + error("uv_fs_fsopen: %s", uv_strerror(fd)); + uv_fs_req_cleanup(&req); + return fd; + } + assert(req.result >= 0); + file = req.result; + uv_fs_req_cleanup(&req); +#ifdef __APPLE__ + info("Disabling OS X caching for file \"%s\".", path); + fcntl(fd, F_NOCACHE, 1); +#endif + info("Loading journal file \"%s\".", path); + + ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); + if (ret) + goto error; + file_size = ALIGN_BYTES_FLOOR(file_size); + + ret = check_journal_file_superblock(file); + if (ret) + goto error; + ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb); + ++ctx->stats.io_read_requests; + + journalfile->file = file; + journalfile->pos = file_size; + + max_id = iterate_transactions(ctx, journalfile); + + ctx->commit_log.transaction_id = MAX(ctx->commit_log.transaction_id, max_id + 1); + + info("Journal file \"%s\" loaded (size:%"PRIu64").", path, file_size); + return 0; + + error: + (void) uv_fs_close(NULL, &req, file, NULL); + uv_fs_req_cleanup(&req); + return ret; +} + +void init_commit_log(struct rrdengine_instance *ctx) +{ + ctx->commit_log.buf = NULL; + ctx->commit_log.buf_pos = 0; + ctx->commit_log.transaction_id = 1; +}
\ No newline at end of file diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h new file mode 100644 index 000000000..50489aeee --- /dev/null +++ b/database/engine/journalfile.h @@ -0,0 +1,46 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_JOURNALFILE_H +#define NETDATA_JOURNALFILE_H + +#include "rrdengine.h" + +/* Forward declarations */ +struct rrdengine_instance; +struct rrdengine_worker_config; +struct rrdengine_datafile; +struct rrdengine_journalfile; + +#define WALFILE_PREFIX "journalfile-" +#define WALFILE_EXTENSION ".njf" + + +/* only one event loop is supported for now */ +struct rrdengine_journalfile { + uv_file file; + uint64_t pos; + + struct rrdengine_datafile *datafile; +}; + +/* only one event loop is supported for now */ +struct transaction_commit_log { + uint64_t transaction_id; + + /* outstanding transaction buffer */ + void *buf; + unsigned buf_pos; + unsigned buf_size; +}; + +extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size); +extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc); +extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + struct rrdengine_datafile *datafile); +extern void init_commit_log(struct rrdengine_instance *ctx); + + +#endif /* NETDATA_JOURNALFILE_H */
\ No newline at end of file diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c new file mode 100644 index 000000000..c90947a67 --- /dev/null +++ b/database/engine/pagecache.c @@ -0,0 +1,792 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "rrdengine.h" + +/* Forward declerations */ +static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx); + +/* always inserts into tail */ +static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx, + struct rrdeng_page_cache_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + if (likely(NULL != pg_cache->replaceQ.tail)) { + descr->prev = pg_cache->replaceQ.tail; + pg_cache->replaceQ.tail->next = descr; + } + if (unlikely(NULL == pg_cache->replaceQ.head)) { + pg_cache->replaceQ.head = descr; + } + pg_cache->replaceQ.tail = descr; +} + +static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx, + struct rrdeng_page_cache_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_cache_descr *prev, *next; + + prev = descr->prev; + next = descr->next; + + if (likely(NULL != prev)) { + prev->next = next; + } + if (likely(NULL != next)) { + next->prev = prev; + } + if (unlikely(descr == pg_cache->replaceQ.head)) { + pg_cache->replaceQ.head = next; + } + if (unlikely(descr == pg_cache->replaceQ.tail)) { + pg_cache->replaceQ.tail = prev; + } + descr->prev = descr->next = NULL; +} + +void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx, + struct rrdeng_page_cache_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + uv_rwlock_wrlock(&pg_cache->replaceQ.lock); + pg_cache_replaceQ_insert_unsafe(ctx, descr); + uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); +} + +void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx, + struct rrdeng_page_cache_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + uv_rwlock_wrlock(&pg_cache->replaceQ.lock); + pg_cache_replaceQ_delete_unsafe(ctx, descr); + uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); +} +void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx, + struct rrdeng_page_cache_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + uv_rwlock_wrlock(&pg_cache->replaceQ.lock); + pg_cache_replaceQ_delete_unsafe(ctx, descr); + pg_cache_replaceQ_insert_unsafe(ctx, descr); + uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); +} + +struct rrdeng_page_cache_descr *pg_cache_create_descr(void) +{ + struct rrdeng_page_cache_descr *descr; + + descr = mallocz(sizeof(*descr)); + descr->page = NULL; + descr->page_length = 0; + descr->start_time = INVALID_TIME; + descr->end_time = INVALID_TIME; + descr->id = NULL; + descr->extent = NULL; + descr->flags = 0; + descr->prev = descr->next = descr->private = NULL; + descr->refcnt = 0; + descr->waiters = 0; + descr->handle = NULL; + assert(0 == uv_cond_init(&descr->cond)); + assert(0 == uv_mutex_init(&descr->mutex)); + + return descr; +} + +void pg_cache_destroy_descr(struct rrdeng_page_cache_descr *descr) +{ + uv_cond_destroy(&descr->cond); + uv_mutex_destroy(&descr->mutex); + free(descr); +} + +/* The caller must hold page descriptor lock. */ +void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr) +{ + if (descr->waiters) + uv_cond_broadcast(&descr->cond); +} + +/* + * The caller must hold page descriptor lock. + * The lock will be released and re-acquired. The descriptor is not guaranteed + * to exist after this function returns. + */ +void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr) +{ + ++descr->waiters; + uv_cond_wait(&descr->cond, &descr->mutex); + --descr->waiters; +} + +/* + * Returns page flags. + * The lock will be released and re-acquired. The descriptor is not guaranteed + * to exist after this function returns. + */ +unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr) +{ + unsigned long flags; + + uv_mutex_lock(&descr->mutex); + pg_cache_wait_event_unsafe(descr); + flags = descr->flags; + uv_mutex_unlock(&descr->mutex); + + return flags; +} + +/* + * The caller must hold page descriptor lock. + * Gets a reference to the page descriptor. + * Returns 1 on success and 0 on failure. + */ +int pg_cache_try_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access) +{ + if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) || + (exclusive_access && descr->refcnt)) { + return 0; + } + if (exclusive_access) + descr->flags |= RRD_PAGE_LOCKED; + ++descr->refcnt; + + return 1; +} + +/* + * The caller must hold page descriptor lock. + * Same return values as pg_cache_try_get_unsafe() without doing anything. + */ +int pg_cache_can_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access) +{ + if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) || + (exclusive_access && descr->refcnt)) { + return 0; + } + + return 1; +} + +/* + * The caller must hold the page descriptor lock. + * This function may block doing cleanup. + */ +void pg_cache_put_unsafe(struct rrdeng_page_cache_descr *descr) +{ + descr->flags &= ~RRD_PAGE_LOCKED; + if (0 == --descr->refcnt) { + pg_cache_wake_up_waiters_unsafe(descr); + } + /* TODO: perform cleanup */ +} + +/* + * This function may block doing cleanup. + */ +void pg_cache_put(struct rrdeng_page_cache_descr *descr) +{ + uv_mutex_lock(&descr->mutex); + pg_cache_put_unsafe(descr); + uv_mutex_unlock(&descr->mutex); +} + +/* The caller must hold the page cache lock */ +static void pg_cache_release_pages_unsafe(struct rrdengine_instance *ctx, unsigned number) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + pg_cache->populated_pages -= number; +} + +static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned number) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + pg_cache_release_pages_unsafe(ctx, number); + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); +} +/* + * This function will block until it reserves #number populated pages. + * It will trigger evictions or dirty page flushing if the ctx->max_cache_pages limit is hit. + */ +static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + assert(number < ctx->max_cache_pages); + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + if (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1) + debug(D_RRDENGINE, "=================================\nPage cache full. Reserving %u pages.\n=================================", + number); + while (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1) { + if (!pg_cache_try_evict_one_page_unsafe(ctx)) { + /* failed to evict */ + struct completion compl; + struct rrdeng_cmd cmd; + + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); + + init_completion(&compl); + cmd.opcode = RRDENG_FLUSH_PAGES; + cmd.completion = &compl; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + /* wait for some pages to be flushed */ + debug(D_RRDENGINE, "%s: waiting for pages to be written to disk before evicting.", __func__); + wait_for_completion(&compl); + destroy_completion(&compl); + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + } + } + pg_cache->populated_pages += number; + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); +} + +/* + * This function will attempt to reserve #number populated pages. + * It may trigger evictions if the ctx->cache_pages_low_watermark limit is hit. + * Returns 0 on failure and 1 on success. + */ +static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned number) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned count = 0; + int ret = 0; + + assert(number < ctx->max_cache_pages); + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + if (pg_cache->populated_pages + number >= ctx->cache_pages_low_watermark + 1) { + debug(D_RRDENGINE, + "=================================\nPage cache full. Trying to reserve %u pages.\n=================================", + number); + do { + if (!pg_cache_try_evict_one_page_unsafe(ctx)) + break; + ++count; + } while (pg_cache->populated_pages + number >= ctx->cache_pages_low_watermark + 1); + debug(D_RRDENGINE, "Evicted %u pages.", count); + } + + if (pg_cache->populated_pages + number < ctx->max_cache_pages + 1) { + pg_cache->populated_pages += number; + ret = 1; /* success */ + } + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); + + return ret; +} + +/* The caller must hold the page cache and the page descriptor locks in that order */ +static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr) +{ + free(descr->page); + descr->page = NULL; + descr->flags &= ~RRD_PAGE_POPULATED; + pg_cache_release_pages_unsafe(ctx, 1); + ++ctx->stats.pg_cache_evictions; +} + +/* + * The caller must hold the page cache lock. + * Lock order: page cache -> replaceQ -> descriptor + * This function iterates all pages and tries to evict one. + * If it fails it sets in_flight_descr to the oldest descriptor that has write-back in progress, + * or it sets it to NULL if no write-back is in progress. + * + * Returns 1 on success and 0 on failure. + */ +static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned long old_flags; + struct rrdeng_page_cache_descr *descr; + + uv_rwlock_wrlock(&pg_cache->replaceQ.lock); + for (descr = pg_cache->replaceQ.head ; NULL != descr ; descr = descr->next) { + uv_mutex_lock(&descr->mutex); + old_flags = descr->flags; + if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) { + /* must evict */ + pg_cache_evict_unsafe(ctx, descr); + pg_cache_put_unsafe(descr); + uv_mutex_unlock(&descr->mutex); + pg_cache_replaceQ_delete_unsafe(ctx, descr); + uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); + + return 1; + } + uv_mutex_unlock(&descr->mutex); + }; + uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); + + /* failed to evict */ + return 0; +} + +/* + * TODO: last waiter frees descriptor ? + */ +void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index; + int ret; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t)); + assert(NULL != PValue); + page_index = *PValue; + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + + uv_rwlock_wrlock(&page_index->lock); + ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0); + uv_rwlock_wrunlock(&page_index->lock); + if (unlikely(0 == ret)) { + error("Page under deletion was not in index."); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + goto destroy; + } + assert(1 == ret); + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + ++ctx->stats.pg_cache_deletions; + --pg_cache->page_descriptors; + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); + + uv_mutex_lock(&descr->mutex); + while (!pg_cache_try_get_unsafe(descr, 1)) { + debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + pg_cache_wait_event_unsafe(descr); + } + /* even a locked page could be dirty */ + while (unlikely(descr->flags & RRD_PAGE_DIRTY)) { + debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + pg_cache_wait_event_unsafe(descr); + } + uv_mutex_unlock(&descr->mutex); + + if (descr->flags & RRD_PAGE_POPULATED) { + /* only after locking can it be safely deleted from LRU */ + pg_cache_replaceQ_delete(ctx, descr); + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + pg_cache_evict_unsafe(ctx, descr); + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); + } + pg_cache_put(descr); +destroy: + pg_cache_destroy_descr(descr); + pg_cache_update_metric_times(page_index); +} + +static inline int is_page_in_time_range(struct rrdeng_page_cache_descr *descr, usec_t start_time, usec_t end_time) +{ + usec_t pg_start, pg_end; + + pg_start = descr->start_time; + pg_end = descr->end_time; + + return (pg_start < start_time && pg_end >= start_time) || + (pg_start >= start_time && pg_start <= end_time); +} + +static inline int is_point_in_time_in_page(struct rrdeng_page_cache_descr *descr, usec_t point_in_time) +{ + return (point_in_time >= descr->start_time && point_in_time <= descr->end_time); +} + +/* Update metric oldest and latest timestamps efficiently when adding new values */ +void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_cache_descr *descr) +{ + usec_t oldest_time = page_index->oldest_time; + usec_t latest_time = page_index->latest_time; + + if (unlikely(oldest_time == INVALID_TIME || descr->start_time < oldest_time)) { + page_index->oldest_time = descr->start_time; + } + if (likely(descr->end_time > latest_time || latest_time == INVALID_TIME)) { + page_index->latest_time = descr->end_time; + } +} + +/* Update metric oldest and latest timestamps when removing old values */ +void pg_cache_update_metric_times(struct pg_cache_page_index *page_index) +{ + Pvoid_t *firstPValue, *lastPValue; + Word_t firstIndex, lastIndex; + struct rrdeng_page_cache_descr *descr; + usec_t oldest_time = INVALID_TIME; + usec_t latest_time = INVALID_TIME; + + uv_rwlock_rdlock(&page_index->lock); + /* Find first page in range */ + firstIndex = (Word_t)0; + firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0); + if (likely(NULL != firstPValue)) { + descr = *firstPValue; + oldest_time = descr->start_time; + } + lastIndex = (Word_t)-1; + lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0); + if (likely(NULL != lastPValue)) { + descr = *lastPValue; + latest_time = descr->end_time; + } + uv_rwlock_rdunlock(&page_index->lock); + + if (unlikely(NULL == firstPValue)) { + assert(NULL == lastPValue); + page_index->oldest_time = page_index->latest_time = INVALID_TIME; + return; + } + page_index->oldest_time = oldest_time; + page_index->latest_time = latest_time; +} + +/* If index is NULL lookup by UUID (descr->id) */ +void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, + struct rrdeng_page_cache_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index; + + if (descr->flags & RRD_PAGE_POPULATED) { + pg_cache_reserve_pages(ctx, 1); + if (!(descr->flags & RRD_PAGE_DIRTY)) + pg_cache_replaceQ_insert(ctx, descr); + } + + if (unlikely(NULL == index)) { + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t)); + assert(NULL != PValue); + page_index = *PValue; + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + } else { + page_index = index; + } + + uv_rwlock_wrlock(&page_index->lock); + PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0); + *PValue = descr; + pg_cache_add_new_metric_time(page_index, descr); + uv_rwlock_wrunlock(&page_index->lock); + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + ++ctx->stats.pg_cache_insertions; + ++pg_cache->page_descriptors; + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); +} + +/* + * Searches for a page and triggers disk I/O if necessary and possible. + * Does not get a reference. + * Returns page index pointer for given metric UUID. + */ +struct pg_cache_page_index * + pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_cache_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES]; + int i, j, k, count, found; + unsigned long flags; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index; + Word_t Index; + uint8_t failed_to_reserve; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__); + return NULL; + } + + uv_rwlock_rdlock(&page_index->lock); + /* Find first page in range */ + found = 0; + Index = (Word_t)(start_time / USEC_PER_SEC); + PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + if (is_page_in_time_range(descr, start_time, end_time)) { + found = 1; + } + } + if (!found) { + Index = (Word_t)(start_time / USEC_PER_SEC); + PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + if (is_page_in_time_range(descr, start_time, end_time)) { + found = 1; + } + } + } + if (!found) { + uv_rwlock_rdunlock(&page_index->lock); + debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__); + return page_index; + } + + for (count = 0 ; + descr != NULL && is_page_in_time_range(descr, start_time, end_time); + PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue) { + /* Iterate all pages in range */ + + if (unlikely(0 == descr->page_length)) + continue; + uv_mutex_lock(&descr->mutex); + flags = descr->flags; + if (pg_cache_can_get_unsafe(descr, 0)) { + if (flags & RRD_PAGE_POPULATED) { + /* success */ + uv_mutex_unlock(&descr->mutex); + debug(D_RRDENGINE, "%s: Page was found in memory.", __func__); + continue; + } + } + if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { + preload_array[count++] = descr; + if (PAGE_CACHE_MAX_PRELOAD_PAGES == count) { + uv_mutex_unlock(&descr->mutex); + break; + } + } + uv_mutex_unlock(&descr->mutex); + + }; + uv_rwlock_rdunlock(&page_index->lock); + + failed_to_reserve = 0; + for (i = 0 ; i < count && !failed_to_reserve ; ++i) { + struct rrdeng_cmd cmd; + struct rrdeng_page_cache_descr *next; + + descr = preload_array[i]; + if (NULL == descr) { + continue; + } + if (!pg_cache_try_reserve_pages(ctx, 1)) { + failed_to_reserve = 1; + break; + } + cmd.opcode = RRDENG_READ_EXTENT; + cmd.read_extent.page_cache_descr[0] = descr; + /* don't use this page again */ + preload_array[i] = NULL; + for (j = 0, k = 1 ; j < count ; ++j) { + next = preload_array[j]; + if (NULL == next) { + continue; + } + if (descr->extent == next->extent) { + /* same extent, consolidate */ + if (!pg_cache_try_reserve_pages(ctx, 1)) { + failed_to_reserve = 1; + break; + } + cmd.read_extent.page_cache_descr[k++] = next; + /* don't use this page again */ + preload_array[j] = NULL; + } + } + cmd.read_extent.page_count = k; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + } + if (failed_to_reserve) { + debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__); + for (i = 0 ; i < count ; ++i) { + descr = preload_array[i]; + if (NULL == descr) { + continue; + } + pg_cache_put(descr); + } + } + if (!count) { + /* no such page */ + debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__); + } + return page_index; +} + +/* + * Searches for a page and gets a reference. + * When point_in_time is INVALID_TIME get any page. + * If index is NULL lookup by UUID (id). + */ +struct rrdeng_page_cache_descr * + pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, + usec_t point_in_time) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_cache_descr *descr = NULL; + unsigned long flags; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index; + Word_t Index; + uint8_t page_not_in_cache; + + if (unlikely(NULL == index)) { + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + return NULL; + } + } else { + page_index = index; + } + pg_cache_reserve_pages(ctx, 1); + + page_not_in_cache = 0; + uv_rwlock_rdlock(&page_index->lock); + while (1) { + Index = (Word_t)(point_in_time / USEC_PER_SEC); + PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + } + if (NULL == PValue || + 0 == descr->page_length || + (INVALID_TIME != point_in_time && + !is_point_in_time_in_page(descr, point_in_time))) { + /* non-empty page not found */ + uv_rwlock_rdunlock(&page_index->lock); + + pg_cache_release_pages(ctx, 1); + return NULL; + } + uv_mutex_lock(&descr->mutex); + flags = descr->flags; + if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) { + /* success */ + uv_mutex_unlock(&descr->mutex); + debug(D_RRDENGINE, "%s: Page was found in memory.", __func__); + break; + } + if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { + struct rrdeng_cmd cmd; + + uv_rwlock_rdunlock(&page_index->lock); + + cmd.opcode = RRDENG_READ_PAGE; + cmd.read_page.page_cache_descr = descr; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + + debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + while (!(descr->flags & RRD_PAGE_POPULATED)) { + pg_cache_wait_event_unsafe(descr); + } + /* success */ + /* Downgrade exclusive reference to allow other readers */ + descr->flags &= ~RRD_PAGE_LOCKED; + pg_cache_wake_up_waiters_unsafe(descr); + uv_mutex_unlock(&descr->mutex); + rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); + return descr; + } + uv_rwlock_rdunlock(&page_index->lock); + debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + if (!(flags & RRD_PAGE_POPULATED)) + page_not_in_cache = 1; + pg_cache_wait_event_unsafe(descr); + uv_mutex_unlock(&descr->mutex); + + /* reset scan to find again */ + uv_rwlock_rdlock(&page_index->lock); + } + uv_rwlock_rdunlock(&page_index->lock); + + if (!(flags & RRD_PAGE_DIRTY)) + pg_cache_replaceQ_set_hot(ctx, descr); + pg_cache_release_pages(ctx, 1); + if (page_not_in_cache) + rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); + else + rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1); + return descr; +} + +struct pg_cache_page_index *create_page_index(uuid_t *id) +{ + struct pg_cache_page_index *page_index; + + page_index = mallocz(sizeof(*page_index)); + page_index->JudyL_array = (Pvoid_t) NULL; + uuid_copy(page_index->id, *id); + assert(0 == uv_rwlock_init(&page_index->lock)); + page_index->oldest_time = INVALID_TIME; + page_index->latest_time = INVALID_TIME; + + return page_index; +} + +static void init_metrics_index(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL; + assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock)); +} + +static void init_replaceQ(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + pg_cache->replaceQ.head = NULL; + pg_cache->replaceQ.tail = NULL; + assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock)); +} + +static void init_commited_page_index(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + pg_cache->commited_page_index.JudyL_array = (Pvoid_t) NULL; + assert(0 == uv_rwlock_init(&pg_cache->commited_page_index.lock)); + pg_cache->commited_page_index.latest_corr_id = 0; + pg_cache->commited_page_index.nr_commited_pages = 0; +} + +void init_page_cache(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + pg_cache->page_descriptors = 0; + pg_cache->populated_pages = 0; + assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock)); + + init_metrics_index(ctx); + init_replaceQ(ctx); + init_commited_page_index(ctx); +}
\ No newline at end of file diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h new file mode 100644 index 000000000..d1e29aaab --- /dev/null +++ b/database/engine/pagecache.h @@ -0,0 +1,132 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_PAGECACHE_H +#define NETDATA_PAGECACHE_H + +#include "rrdengine.h" + +/* Forward declerations */ +struct rrdengine_instance; +struct extent_info; + +#define INVALID_TIME (0) + +/* Page flags */ +#define RRD_PAGE_DIRTY (1LU << 0) +#define RRD_PAGE_LOCKED (1LU << 1) +#define RRD_PAGE_READ_PENDING (1LU << 2) +#define RRD_PAGE_WRITE_PENDING (1LU << 3) +#define RRD_PAGE_POPULATED (1LU << 4) + +struct rrdeng_page_cache_descr { + void *page; + uint32_t page_length; + usec_t start_time; + usec_t end_time; + uuid_t *id; /* never changes */ + struct extent_info *extent; + unsigned long flags; + void *private; + struct rrdeng_page_cache_descr *prev; + struct rrdeng_page_cache_descr *next; + + /* TODO: move waiter logic to concurrency table */ + unsigned refcnt; + uv_mutex_t mutex; /* always take it after the page cache lock or after the commit lock */ + uv_cond_t cond; + unsigned waiters; + struct rrdeng_collect_handle *handle; /* API user */ +}; + +#define PAGE_CACHE_MAX_PRELOAD_PAGES (256) + +/* maps time ranges to pages */ +struct pg_cache_page_index { + uuid_t id; + /* + * care: JudyL_array indices are converted from useconds to seconds to fit in one word in 32-bit architectures + * TODO: examine if we want to support better granularity than seconds + */ + Pvoid_t JudyL_array; + uv_rwlock_t lock; + + /* + * Only one effective writer, data deletion workqueue. + * It's also written during the DB loading phase. + */ + usec_t oldest_time; + + /* + * Only one effective writer, data collection thread. + * It's also written by the data deletion workqueue when data collection is disabled for this metric. + */ + usec_t latest_time; +}; + +/* maps UUIDs to page indices */ +struct pg_cache_metrics_index { + uv_rwlock_t lock; + Pvoid_t JudyHS_array; +}; + +/* gathers dirty pages to be written on disk */ +struct pg_cache_commited_page_index { + uv_rwlock_t lock; + + Pvoid_t JudyL_array; + + /* + * Dirty page correlation ID is a hint. Dirty pages that are correlated should have + * a small correlation ID difference. Dirty pages in memory should never have the + * same ID at the same time for correctness. + */ + Word_t latest_corr_id; + + unsigned nr_commited_pages; +}; + +/* gathers populated pages to be evicted */ +struct pg_cache_replaceQ { + uv_rwlock_t lock; /* LRU lock */ + + struct rrdeng_page_cache_descr *head; /* LRU */ + struct rrdeng_page_cache_descr *tail; /* MRU */ +}; + +struct page_cache { /* TODO: add statistics */ + uv_rwlock_t pg_cache_rwlock; /* page cache lock */ + + struct pg_cache_metrics_index metrics_index; + struct pg_cache_commited_page_index commited_page_index; + struct pg_cache_replaceQ replaceQ; + + unsigned page_descriptors; + unsigned populated_pages; +}; + +extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr); +extern void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr); +extern unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr); +extern void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx, + struct rrdeng_page_cache_descr *descr); +extern void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx, + struct rrdeng_page_cache_descr *descr); +extern void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx, + struct rrdeng_page_cache_descr *descr); +extern struct rrdeng_page_cache_descr *pg_cache_create_descr(void); +extern void pg_cache_put_unsafe(struct rrdeng_page_cache_descr *descr); +extern void pg_cache_put(struct rrdeng_page_cache_descr *descr); +extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, + struct rrdeng_page_cache_descr *descr); +extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr); +extern struct pg_cache_page_index * + pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time); +extern struct rrdeng_page_cache_descr * + pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, + usec_t point_in_time); +extern struct pg_cache_page_index *create_page_index(uuid_t *id); +extern void init_page_cache(struct rrdengine_instance *ctx); +extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_cache_descr *descr); +extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index); + +#endif /* NETDATA_PAGECACHE_H */
\ No newline at end of file diff --git a/database/engine/rrddiskprotocol.h b/database/engine/rrddiskprotocol.h new file mode 100644 index 000000000..db47af531 --- /dev/null +++ b/database/engine/rrddiskprotocol.h @@ -0,0 +1,119 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDDISKPROTOCOL_H +#define NETDATA_RRDDISKPROTOCOL_H + +#define RRDENG_BLOCK_SIZE (4096) +#define RRDFILE_ALIGNMENT RRDENG_BLOCK_SIZE + +#define RRDENG_MAGIC_SZ (32) +#define RRDENG_DF_MAGIC "netdata-data-file" +#define RRDENG_JF_MAGIC "netdata-journal-file" + +#define RRDENG_VER_SZ (16) +#define RRDENG_DF_VER "1.0" +#define RRDENG_JF_VER "1.0" + +#define UUID_SZ (16) +#define CHECKSUM_SZ (4) /* CRC32 */ + +#define RRD_NO_COMPRESSION (0) +#define RRD_LZ4 (1) + +#define RRDENG_DF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ + sizeof(uint8_t))) +/* + * Data file persistent super-block + */ +struct rrdeng_df_sb { + char magic_number[RRDENG_MAGIC_SZ]; + char version[RRDENG_VER_SZ]; + uint8_t tier; + uint8_t padding[RRDENG_DF_SB_PADDING_SZ]; +} __attribute__ ((packed)); + +/* + * Page types + */ +#define PAGE_METRICS (0) +#define PAGE_LOGS (1) /* reserved */ + +/* + * Data file page descriptor + */ +struct rrdeng_extent_page_descr { + uint8_t type; + + uint8_t uuid[UUID_SZ]; + uint32_t page_length; + uint64_t start_time; + uint64_t end_time; +} __attribute__ ((packed)); + +/* + * Data file extent header + */ +struct rrdeng_df_extent_header { + uint32_t payload_length; + uint8_t compression_algorithm; + uint8_t number_of_pages; + /* #number_of_pages page descriptors follow */ + struct rrdeng_extent_page_descr descr[]; +} __attribute__ ((packed)); + +/* + * Data file extent trailer + */ +struct rrdeng_df_extent_trailer { + uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */ +} __attribute__ ((packed)); + +#define RRDENG_JF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ)) +/* + * Journal file super-block + */ +struct rrdeng_jf_sb { + char magic_number[RRDENG_MAGIC_SZ]; + char version[RRDENG_VER_SZ]; + uint8_t padding[RRDENG_JF_SB_PADDING_SZ]; +} __attribute__ ((packed)); + +/* + * Transaction record types + */ +#define STORE_PADDING (0) +#define STORE_DATA (1) +#define STORE_LOGS (2) /* reserved */ + +/* + * Journal file transaction record header + */ +struct rrdeng_jf_transaction_header { + /* when set to STORE_PADDING jump to start of next block */ + uint8_t type; + + uint32_t reserved; /* reserved for future use */ + uint64_t id; + uint16_t payload_length; +} __attribute__ ((packed)); + +/* + * Journal file transaction record trailer + */ +struct rrdeng_jf_transaction_trailer { + uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */ +} __attribute__ ((packed)); + +/* + * Journal file STORE_DATA action + */ +struct rrdeng_jf_store_data { + /* data file extent information */ + uint64_t extent_offset; + uint32_t extent_size; + + uint8_t number_of_pages; + /* #number_of_pages page descriptors follow */ + struct rrdeng_extent_page_descr descr[]; +} __attribute__ ((packed)); + +#endif /* NETDATA_RRDDISKPROTOCOL_H */
\ No newline at end of file diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c new file mode 100644 index 000000000..b8e4eba01 --- /dev/null +++ b/database/engine/rrdengine.c @@ -0,0 +1,780 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "rrdengine.h" + +void sanity_check(void) +{ + /* Magic numbers must fit in the super-blocks */ + BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ); + BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ); + + /* Version strings must fit in the super-blocks */ + BUILD_BUG_ON(strlen(RRDENG_DF_VER) > RRDENG_VER_SZ); + BUILD_BUG_ON(strlen(RRDENG_JF_VER) > RRDENG_VER_SZ); + + /* Data file super-block cannot be larger than RRDENG_BLOCK_SIZE */ + BUILD_BUG_ON(RRDENG_DF_SB_PADDING_SZ < 0); + + BUILD_BUG_ON(sizeof(uuid_t) != UUID_SZ); /* check UUID size */ + + /* page count must fit in 8 bits */ + BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255); +} + +void read_extent_cb(uv_fs_t* req) +{ + struct rrdengine_worker_config* wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + struct extent_io_descriptor *xt_io_descr; + struct rrdeng_page_cache_descr *descr; + int ret; + unsigned i, j, count; + void *page, *uncompressed_buf = NULL; + uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length; + struct rrdengine_datafile *datafile; + /* persistent structures */ + struct rrdeng_df_extent_header *header; + struct rrdeng_df_extent_trailer *trailer; + uLong crc; + + xt_io_descr = req->data; + if (req->result < 0) { + error("%s: uv_fs_read: %s", __func__, uv_strerror((int)req->result)); + goto cleanup; + } + + header = xt_io_descr->buf; + payload_length = header->payload_length; + count = header->number_of_pages; + + payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count; + + trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer); + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer)); + ret = crc32cmp(trailer->checksum, crc); + datafile = xt_io_descr->descr_array[0]->extent->datafile; + debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__, + xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED"); + if (unlikely(ret)) { + /* TODO: handle errors */ + exit(UV_EIO); + goto cleanup; + } + + if (RRD_NO_COMPRESSION != header->compression_algorithm) { + uncompressed_payload_length = 0; + for (i = 0 ; i < count ; ++i) { + uncompressed_payload_length += header->descr[i].page_length; + } + uncompressed_buf = mallocz(uncompressed_payload_length); + ret = LZ4_decompress_safe(xt_io_descr->buf + payload_offset, uncompressed_buf, + payload_length, uncompressed_payload_length); + ctx->stats.before_decompress_bytes += payload_length; + ctx->stats.after_decompress_bytes += ret; + debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret); + /* care, we don't hold the descriptor mutex */ + } + + for (i = 0 ; i < xt_io_descr->descr_count; ++i) { + page = mallocz(RRDENG_BLOCK_SIZE); + descr = xt_io_descr->descr_array[i]; + for (j = 0, page_offset = 0; j < count; ++j) { + /* care, we don't hold the descriptor mutex */ + if (!uuid_compare(*(uuid_t *) header->descr[j].uuid, *descr->id) && + header->descr[j].page_length == descr->page_length && + header->descr[j].start_time == descr->start_time && + header->descr[j].end_time == descr->end_time) { + break; + } + page_offset += header->descr[j].page_length; + } + /* care, we don't hold the descriptor mutex */ + if (RRD_NO_COMPRESSION == header->compression_algorithm) { + (void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length); + } else { + (void) memcpy(page, uncompressed_buf + page_offset, descr->page_length); + } + pg_cache_replaceQ_insert(ctx, descr); + uv_mutex_lock(&descr->mutex); + descr->page = page; + descr->flags |= RRD_PAGE_POPULATED; + descr->flags &= ~RRD_PAGE_READ_PENDING; + debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); + if (xt_io_descr->release_descr) { + pg_cache_put_unsafe(descr); + } else { + pg_cache_wake_up_waiters_unsafe(descr); + } + uv_mutex_unlock(&descr->mutex); + } + if (RRD_NO_COMPRESSION != header->compression_algorithm) { + free(uncompressed_buf); + } + if (xt_io_descr->completion) + complete(xt_io_descr->completion); +cleanup: + uv_fs_req_cleanup(req); + free(xt_io_descr->buf); + free(xt_io_descr); +} + + +static void do_read_extent(struct rrdengine_worker_config* wc, + struct rrdeng_page_cache_descr **descr, + unsigned count, + uint8_t release_descr) +{ + struct rrdengine_instance *ctx = wc->ctx; + int ret; + unsigned i, size_bytes, pos, real_io_size; +// uint32_t payload_length; + struct extent_io_descriptor *xt_io_descr; + struct rrdengine_datafile *datafile; + + datafile = descr[0]->extent->datafile; + pos = descr[0]->extent->offset; + size_bytes = descr[0]->extent->size; + + xt_io_descr = mallocz(sizeof(*xt_io_descr)); + ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + /* free(xt_io_descr); + return;*/ + } + for (i = 0 ; i < count; ++i) { + uv_mutex_lock(&descr[i]->mutex); + descr[i]->flags |= RRD_PAGE_READ_PENDING; +// payload_length = descr[i]->page_length; + uv_mutex_unlock(&descr[i]->mutex); + + xt_io_descr->descr_array[i] = descr[i]; + } + xt_io_descr->descr_count = count; + xt_io_descr->bytes = size_bytes; + xt_io_descr->pos = pos; + xt_io_descr->req.data = xt_io_descr; + xt_io_descr->completion = NULL; + /* xt_io_descr->descr_commit_idx_array[0] */ + xt_io_descr->release_descr = release_descr; + + real_io_size = ALIGN_BYTES_CEILING(size_bytes); + xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); + ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb); + assert (-1 != ret); + ctx->stats.io_read_bytes += real_io_size; + ++ctx->stats.io_read_requests; + ctx->stats.io_read_extent_bytes += real_io_size; + ++ctx->stats.io_read_extents; + ctx->stats.pg_cache_backfills += count; +} + +static void commit_data_extent(struct rrdengine_worker_config* wc, struct extent_io_descriptor *xt_io_descr) +{ + struct rrdengine_instance *ctx = wc->ctx; + unsigned count, payload_length, descr_size, size_bytes; + void *buf; + /* persistent structures */ + struct rrdeng_df_extent_header *df_header; + struct rrdeng_jf_transaction_header *jf_header; + struct rrdeng_jf_store_data *jf_metric_data; + struct rrdeng_jf_transaction_trailer *jf_trailer; + uLong crc; + + df_header = xt_io_descr->buf; + count = df_header->number_of_pages; + descr_size = sizeof(*jf_metric_data->descr) * count; + payload_length = sizeof(*jf_metric_data) + descr_size; + size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer); + + buf = wal_get_transaction_buffer(wc, size_bytes); + + jf_header = buf; + jf_header->type = STORE_DATA; + jf_header->reserved = 0; + jf_header->id = ctx->commit_log.transaction_id++; + jf_header->payload_length = payload_length; + + jf_metric_data = buf + sizeof(*jf_header); + jf_metric_data->extent_offset = xt_io_descr->pos; + jf_metric_data->extent_size = xt_io_descr->bytes; + jf_metric_data->number_of_pages = count; + memcpy(jf_metric_data->descr, df_header->descr, descr_size); + + jf_trailer = buf + sizeof(*jf_header) + payload_length; + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, buf, sizeof(*jf_header) + payload_length); + crc32set(jf_trailer->checksum, crc); +} + +static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t type, void *data) +{ + switch (type) { + case STORE_DATA: + commit_data_extent(wc, (struct extent_io_descriptor *)data); + break; + default: + assert(type == STORE_DATA); + break; + } +} + +void flush_pages_cb(uv_fs_t* req) +{ + struct rrdengine_worker_config* wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + struct extent_io_descriptor *xt_io_descr; + struct rrdeng_page_cache_descr *descr; + struct rrdengine_datafile *datafile; + int ret; + unsigned i, count; + Word_t commit_id; + + xt_io_descr = req->data; + if (req->result < 0) { + error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); + goto cleanup; + } + datafile = xt_io_descr->descr_array[0]->extent->datafile; + debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.", + __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); + + count = xt_io_descr->descr_count; + for (i = 0 ; i < count ; ++i) { + /* care, we don't hold the descriptor mutex */ + descr = xt_io_descr->descr_array[i]; + + uv_rwlock_wrlock(&pg_cache->commited_page_index.lock); + commit_id = xt_io_descr->descr_commit_idx_array[i]; + ret = JudyLDel(&pg_cache->commited_page_index.JudyL_array, commit_id, PJE0); + assert(1 == ret); + --pg_cache->commited_page_index.nr_commited_pages; + uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock); + + pg_cache_replaceQ_insert(ctx, descr); + + uv_mutex_lock(&descr->mutex); + descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING); + /* wake up waiters, care no reference being held */ + pg_cache_wake_up_waiters_unsafe(descr); + uv_mutex_unlock(&descr->mutex); + } + if (xt_io_descr->completion) + complete(xt_io_descr->completion); +cleanup: + uv_fs_req_cleanup(req); + free(xt_io_descr->buf); + free(xt_io_descr); +} + +/* + * completion must be NULL or valid. + * Returns 0 when no flushing can take place. + * Returns datafile bytes to be written on successful flushing initiation. + */ +static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct completion *completion) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + int ret; + int compressed_size, max_compressed_size = 0; + unsigned i, count, size_bytes, pos, real_io_size; + uint32_t uncompressed_payload_length, payload_offset; + struct rrdeng_page_cache_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT]; + struct extent_io_descriptor *xt_io_descr; + void *compressed_buf = NULL; + Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT]; + Pvoid_t *PValue; + Word_t Index; + uint8_t compression_algorithm = ctx->global_compress_alg; + struct extent_info *extent; + struct rrdengine_datafile *datafile; + /* persistent structures */ + struct rrdeng_df_extent_header *header; + struct rrdeng_df_extent_trailer *trailer; + uLong crc; + + if (force) { + debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure."); + } + uv_rwlock_rdlock(&pg_cache->commited_page_index.lock); + for (Index = 0, count = 0, uncompressed_payload_length = 0, + PValue = JudyLFirst(pg_cache->commited_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue ; + + descr != NULL && count != MAX_PAGES_PER_EXTENT ; + + PValue = JudyLNext(pg_cache->commited_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue) { + assert(0 != descr->page_length); + + uv_mutex_lock(&descr->mutex); + if (!(descr->flags & RRD_PAGE_WRITE_PENDING)) { + /* care, no reference being held */ + descr->flags |= RRD_PAGE_WRITE_PENDING; + uncompressed_payload_length += descr->page_length; + descr_commit_idx_array[count] = Index; + eligible_pages[count++] = descr; + } + uv_mutex_unlock(&descr->mutex); + } + uv_rwlock_rdunlock(&pg_cache->commited_page_index.lock); + + if (!count) { + debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__); + if (completion) + complete(completion); + return 0; + } + xt_io_descr = mallocz(sizeof(*xt_io_descr)); + payload_offset = sizeof(*header) + count * sizeof(header->descr[0]); + switch (compression_algorithm) { + case RRD_NO_COMPRESSION: + size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer); + break; + default: /* Compress */ + assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE); + max_compressed_size = LZ4_compressBound(uncompressed_payload_length); + compressed_buf = mallocz(max_compressed_size); + size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer); + break; + } + ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + /* free(xt_io_descr);*/ + } + (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_cache_descr *) * count); + xt_io_descr->descr_count = count; + + pos = 0; + header = xt_io_descr->buf; + header->compression_algorithm = compression_algorithm; + header->number_of_pages = count; + pos += sizeof(*header); + + extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0])); + datafile = ctx->datafiles.last; /* TODO: check for exceeded size quota */ + extent->offset = datafile->pos; + extent->number_of_pages = count; + extent->datafile = datafile; + extent->next = NULL; + + for (i = 0 ; i < count ; ++i) { + /* This is here for performance reasons */ + xt_io_descr->descr_commit_idx_array[i] = descr_commit_idx_array[i]; + + descr = xt_io_descr->descr_array[i]; + header->descr[i].type = PAGE_METRICS; + uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id); + header->descr[i].page_length = descr->page_length; + header->descr[i].start_time = descr->start_time; + header->descr[i].end_time = descr->end_time; + pos += sizeof(header->descr[i]); + } + for (i = 0 ; i < count ; ++i) { + descr = xt_io_descr->descr_array[i]; + /* care, we don't hold the descriptor mutex */ + (void) memcpy(xt_io_descr->buf + pos, descr->page, descr->page_length); + descr->extent = extent; + extent->pages[i] = descr; + + pos += descr->page_length; + } + df_extent_insert(extent); + + switch (compression_algorithm) { + case RRD_NO_COMPRESSION: + header->payload_length = uncompressed_payload_length; + break; + default: /* Compress */ + compressed_size = LZ4_compress_default(xt_io_descr->buf + payload_offset, compressed_buf, + uncompressed_payload_length, max_compressed_size); + ctx->stats.before_compress_bytes += uncompressed_payload_length; + ctx->stats.after_compress_bytes += compressed_size; + debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size); + (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size); + free(compressed_buf); + size_bytes = payload_offset + compressed_size + sizeof(*trailer); + header->payload_length = compressed_size; + break; + } + extent->size = size_bytes; + xt_io_descr->bytes = size_bytes; + xt_io_descr->pos = datafile->pos; + xt_io_descr->req.data = xt_io_descr; + xt_io_descr->completion = completion; + + trailer = xt_io_descr->buf + size_bytes - sizeof(*trailer); + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, xt_io_descr->buf, size_bytes - sizeof(*trailer)); + crc32set(trailer->checksum, crc); + + real_io_size = ALIGN_BYTES_CEILING(size_bytes); + xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); + ret = uv_fs_write(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, datafile->pos, flush_pages_cb); + assert (-1 != ret); + ctx->stats.io_write_bytes += real_io_size; + ++ctx->stats.io_write_requests; + ctx->stats.io_write_extent_bytes += real_io_size; + ++ctx->stats.io_write_extents; + do_commit_transaction(wc, STORE_DATA, xt_io_descr); + datafile->pos += ALIGN_BYTES_CEILING(size_bytes); + ctx->disk_space += ALIGN_BYTES_CEILING(size_bytes); + rrdeng_test_quota(wc); + + return ALIGN_BYTES_CEILING(size_bytes); +} + +static void after_delete_old_data(uv_work_t *req, int status) +{ + struct rrdengine_instance *ctx = req->data; + struct rrdengine_worker_config* wc = &ctx->worker_config; + struct rrdengine_datafile *datafile; + struct rrdengine_journalfile *journalfile; + unsigned bytes; + + (void)status; + datafile = ctx->datafiles.first; + journalfile = datafile->journalfile; + bytes = datafile->pos + journalfile->pos; + + datafile_list_delete(ctx, datafile); + destroy_journal_file(journalfile, datafile); + destroy_data_file(datafile); + info("Deleted data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", + datafile->tier, datafile->fileno); + free(journalfile); + free(datafile); + + ctx->disk_space -= bytes; + info("Reclaimed %u bytes of disk space.", bytes); + + /* unfreeze command processing */ + wc->now_deleting.data = NULL; + /* wake up event loop */ + assert(0 == uv_async_send(&wc->async)); +} + +static void delete_old_data(uv_work_t *req) +{ + struct rrdengine_instance *ctx = req->data; + struct rrdengine_datafile *datafile; + struct extent_info *extent, *next; + struct rrdeng_page_cache_descr *descr; + unsigned count, i; + + /* Safe to use since it will be deleted after we are done */ + datafile = ctx->datafiles.first; + + for (extent = datafile->extents.first ; extent != NULL ; extent = next) { + count = extent->number_of_pages; + for (i = 0 ; i < count ; ++i) { + descr = extent->pages[i]; + pg_cache_punch_hole(ctx, descr); + } + next = extent->next; + free(extent); + } +} + +void rrdeng_test_quota(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct rrdengine_datafile *datafile; + unsigned current_size, target_size; + uint8_t out_of_space, only_one_datafile; + + out_of_space = 0; + if (unlikely(ctx->disk_space > ctx->max_disk_space)) { + out_of_space = 1; + } + datafile = ctx->datafiles.last; + current_size = datafile->pos; + target_size = ctx->max_disk_space / TARGET_DATAFILES; + target_size = MIN(target_size, MAX_DATAFILE_SIZE); + target_size = MAX(target_size, MIN_DATAFILE_SIZE); + only_one_datafile = (datafile == ctx->datafiles.first) ? 1 : 0; + if (unlikely(current_size >= target_size || (out_of_space && only_one_datafile))) { + /* Finalize data and journal file and create a new pair */ + wal_flush_transaction_buffer(wc); + create_new_datafile_pair(ctx, 1, datafile->fileno + 1); + } + if (unlikely(out_of_space)) { + /* delete old data */ + if (wc->now_deleting.data) { + /* already deleting data */ + return; + } + info("Deleting data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", + ctx->datafiles.first->tier, ctx->datafiles.first->fileno); + wc->now_deleting.data = ctx; + uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data); + } +} + +int init_rrd_files(struct rrdengine_instance *ctx) +{ + return init_data_files(ctx); +} + +void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc) +{ + wc->cmd_queue.head = wc->cmd_queue.tail = 0; + wc->queue_size = 0; + assert(0 == uv_cond_init(&wc->cmd_cond)); + assert(0 == uv_mutex_init(&wc->cmd_mutex)); +} + +void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd) +{ + unsigned queue_size; + + /* wait for free space in queue */ + uv_mutex_lock(&wc->cmd_mutex); + while ((queue_size = wc->queue_size) == RRDENG_CMD_Q_MAX_SIZE) { + uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex); + } + assert(queue_size < RRDENG_CMD_Q_MAX_SIZE); + /* enqueue command */ + wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; + wc->cmd_queue.tail = wc->cmd_queue.tail != RRDENG_CMD_Q_MAX_SIZE - 1 ? + wc->cmd_queue.tail + 1 : 0; + wc->queue_size = queue_size + 1; + uv_mutex_unlock(&wc->cmd_mutex); + + /* wake up event loop */ + assert(0 == uv_async_send(&wc->async)); +} + +struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc) +{ + struct rrdeng_cmd ret; + unsigned queue_size; + + uv_mutex_lock(&wc->cmd_mutex); + queue_size = wc->queue_size; + if (queue_size == 0) { + ret.opcode = RRDENG_NOOP; + } else { + /* dequeue command */ + ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head]; + if (queue_size == 1) { + wc->cmd_queue.head = wc->cmd_queue.tail = 0; + } else { + wc->cmd_queue.head = wc->cmd_queue.head != RRDENG_CMD_Q_MAX_SIZE - 1 ? + wc->cmd_queue.head + 1 : 0; + } + wc->queue_size = queue_size - 1; + + /* wake up producers */ + uv_cond_signal(&wc->cmd_cond); + } + uv_mutex_unlock(&wc->cmd_mutex); + + return ret; +} + +void async_cb(uv_async_t *handle) +{ + uv_stop(handle->loop); + uv_update_time(handle->loop); + debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle)); +} + +void timer_cb(uv_timer_t* handle) +{ + struct rrdengine_worker_config* wc = handle->data; + struct rrdengine_instance *ctx = wc->ctx; + + uv_stop(handle->loop); + uv_update_time(handle->loop); + rrdeng_test_quota(wc); + debug(D_RRDENGINE, "%s: timeout reached.", __func__); + if (likely(!wc->now_deleting.data)) { + unsigned total_bytes, bytes_written; + + /* There is free space so we can write to disk */ + debug(D_RRDENGINE, "Flushing pages to disk."); + for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL) ; + bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ; + total_bytes += bytes_written) { + bytes_written = do_flush_pages(wc, 0, NULL); + } + } +#ifdef NETDATA_INTERNAL_CHECKS + { + char buf[4096]; + debug(D_RRDENGINE, "%s", get_rrdeng_statistics(ctx, buf, sizeof(buf))); + } +#endif +} + +/* Flushes dirty pages when timer expires */ +#define TIMER_PERIOD_MS (1000) + +#define CMD_BATCH_SIZE (256) + +void rrdeng_worker(void* arg) +{ + struct rrdengine_worker_config* wc = arg; + struct rrdengine_instance *ctx = wc->ctx; + uv_loop_t* loop; + int shutdown; + enum rrdeng_opcode opcode; + uv_timer_t timer_req; + struct rrdeng_cmd cmd; + + rrdeng_init_cmd_queue(wc); + + loop = wc->loop = mallocz(sizeof(uv_loop_t)); + uv_loop_init(loop); + loop->data = wc; + + uv_async_init(wc->loop, &wc->async, async_cb); + wc->async.data = wc; + + wc->now_deleting.data = NULL; + + /* dirty page flushing timer */ + uv_timer_init(loop, &timer_req); + timer_req.data = wc; + + /* wake up initialization thread */ + complete(&ctx->rrdengine_completion); + + uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS); + shutdown = 0; + while (shutdown == 0 || uv_loop_alive(loop)) { + uv_run(loop, UV_RUN_DEFAULT); + /* wait for commands */ + do { + cmd = rrdeng_deq_cmd(wc); + opcode = cmd.opcode; + + switch (opcode) { + case RRDENG_NOOP: + /* the command queue was empty, do nothing */ + break; + case RRDENG_SHUTDOWN: + shutdown = 1; + if (unlikely(wc->now_deleting.data)) { + /* postpone shutdown until after deletion */ + info("Postponing shutting RRD engine event loop down until after datafile deletion is finished."); + rrdeng_enq_cmd(wc, &cmd); + break; + } + /* + * uv_async_send after uv_close does not seem to crash in linux at the moment, + * it is however undocumented behaviour and we need to be aware if this becomes + * an issue in the future. + */ + uv_close((uv_handle_t *)&wc->async, NULL); + assert(0 == uv_timer_stop(&timer_req)); + uv_close((uv_handle_t *)&timer_req, NULL); + info("Shutting down RRD engine event loop."); + while (do_flush_pages(wc, 1, NULL)) { + ; /* Force flushing of all commited pages. */ + } + break; + case RRDENG_READ_PAGE: + do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0); + break; + case RRDENG_READ_EXTENT: + do_read_extent(wc, cmd.read_extent.page_cache_descr, cmd.read_extent.page_count, 1); + break; + case RRDENG_COMMIT_PAGE: + do_commit_transaction(wc, STORE_DATA, NULL); + break; + case RRDENG_FLUSH_PAGES: { + unsigned total_bytes, bytes_written; + + /* First I/O should be enough to call completion */ + bytes_written = do_flush_pages(wc, 1, cmd.completion); + for (total_bytes = bytes_written ; + bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ; + total_bytes += bytes_written) { + bytes_written = do_flush_pages(wc, 1, NULL); + } + break; + } + default: + debug(D_RRDENGINE, "%s: default.", __func__); + break; + } + } while (opcode != RRDENG_NOOP); + } + /* cleanup operations of the event loop */ + wal_flush_transaction_buffer(wc); + uv_run(loop, UV_RUN_DEFAULT); + + info("Shutting down RRD engine event loop complete."); + /* TODO: don't let the API block by waiting to enqueue commands */ + uv_cond_destroy(&wc->cmd_cond); +/* uv_mutex_destroy(&wc->cmd_mutex); */ + assert(0 == uv_loop_close(loop)); + free(loop); +} + + +#define NR_PAGES (256) +static void basic_functional_test(struct rrdengine_instance *ctx) +{ + int i, j, failed_validations; + uuid_t uuid[NR_PAGES]; + void *buf; + struct rrdeng_page_cache_descr *handle[NR_PAGES]; + char uuid_str[37]; + char backup[NR_PAGES][37 * 100]; /* backup storage for page data verification */ + + for (i = 0 ; i < NR_PAGES ; ++i) { + uuid_generate(uuid[i]); + uuid_unparse_lower(uuid[i], uuid_str); +// fprintf(stderr, "Generated uuid[%d]=%s\n", i, uuid_str); + buf = rrdeng_create_page(&uuid[i], &handle[i]); + /* Each page contains 10 times its own UUID stringified */ + for (j = 0 ; j < 100 ; ++j) { + strcpy(buf + 37 * j, uuid_str); + strcpy(backup[i] + 37 * j, uuid_str); + } + rrdeng_commit_page(ctx, handle[i], (Word_t)i); + } + fprintf(stderr, "\n********** CREATED %d METRIC PAGES ***********\n\n", NR_PAGES); + failed_validations = 0; + for (i = 0 ; i < NR_PAGES ; ++i) { + buf = rrdeng_get_latest_page(ctx, &uuid[i], (void **)&handle[i]); + if (NULL == buf) { + ++failed_validations; + fprintf(stderr, "Page %d was LOST.\n", i); + } + if (memcmp(backup[i], buf, 37 * 100)) { + ++failed_validations; + fprintf(stderr, "Page %d data comparison with backup FAILED validation.\n", i); + } + rrdeng_put_page(ctx, handle[i]); + } + fprintf(stderr, "\n********** CORRECTLY VALIDATED %d/%d METRIC PAGES ***********\n\n", + NR_PAGES - failed_validations, NR_PAGES); + +} +/* C entry point for development purposes + * make "LDFLAGS=-errdengine_main" + */ +void rrdengine_main(void) +{ + int ret; + struct rrdengine_instance *ctx; + + ret = rrdeng_init(&ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB); + if (ret) { + exit(ret); + } + basic_functional_test(ctx); + + rrdeng_exit(ctx); + fprintf(stderr, "Hello world!"); + exit(0); +}
\ No newline at end of file diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h new file mode 100644 index 000000000..141bb9c63 --- /dev/null +++ b/database/engine/rrdengine.h @@ -0,0 +1,171 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDENGINE_H +#define NETDATA_RRDENGINE_H + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include <fcntl.h> +#include <aio.h> +#include <uv.h> +#include <assert.h> +#include <lz4.h> +#include <Judy.h> +#include <openssl/sha.h> +#include <openssl/evp.h> +#include <stdint.h> +#include "../rrd.h" +#include "rrddiskprotocol.h" +#include "rrdenginelib.h" +#include "datafile.h" +#include "journalfile.h" +#include "rrdengineapi.h" +#include "pagecache.h" + +#ifdef NETDATA_RRD_INTERNALS + +#endif /* NETDATA_RRD_INTERNALS */ + +/* Forward declerations */ +struct rrdengine_instance; + +#define MAX_PAGES_PER_EXTENT (64) /* TODO: can go higher only when journal supports bigger than 4KiB transactions */ + +#define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u" +#define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u" + + +typedef enum { + RRDENGINE_STATUS_UNINITIALIZED = 0, + RRDENGINE_STATUS_INITIALIZING, + RRDENGINE_STATUS_INITIALIZED +} rrdengine_state_t; + +enum rrdeng_opcode { + /* can be used to return empty status or flush the command queue */ + RRDENG_NOOP = 0, + + RRDENG_READ_PAGE, + RRDENG_READ_EXTENT, + RRDENG_COMMIT_PAGE, + RRDENG_FLUSH_PAGES, + RRDENG_SHUTDOWN, + + RRDENG_MAX_OPCODE +}; + +struct rrdeng_cmd { + enum rrdeng_opcode opcode; + union { + struct rrdeng_read_page { + struct rrdeng_page_cache_descr *page_cache_descr; + } read_page; + struct rrdeng_read_extent { + struct rrdeng_page_cache_descr *page_cache_descr[MAX_PAGES_PER_EXTENT]; + int page_count; + } read_extent; + struct completion *completion; + }; +}; + +#define RRDENG_CMD_Q_MAX_SIZE (2048) + +struct rrdeng_cmdqueue { + unsigned head, tail; + struct rrdeng_cmd cmd_array[RRDENG_CMD_Q_MAX_SIZE]; +}; + +struct extent_io_descriptor { + uv_fs_t req; + uv_buf_t iov; + void *buf; + uint64_t pos; + unsigned bytes; + struct completion *completion; + unsigned descr_count; + int release_descr; + struct rrdeng_page_cache_descr *descr_array[MAX_PAGES_PER_EXTENT]; + Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT]; +}; + +struct generic_io_descriptor { + uv_fs_t req; + uv_buf_t iov; + void *buf; + uint64_t pos; + unsigned bytes; + struct completion *completion; +}; + +struct rrdengine_worker_config { + struct rrdengine_instance *ctx; + + uv_thread_t thread; + uv_loop_t* loop; + uv_async_t async; + uv_work_t now_deleting; + + /* FIFO command queue */ + uv_mutex_t cmd_mutex; + uv_cond_t cmd_cond; + volatile unsigned queue_size; + struct rrdeng_cmdqueue cmd_queue; +}; + +/* + * Debug statistics not used by code logic. + * They only describe operations since DB engine instance load time. + */ +struct rrdengine_statistics { + rrdeng_stats_t metric_API_producers; + rrdeng_stats_t metric_API_consumers; + rrdeng_stats_t pg_cache_insertions; + rrdeng_stats_t pg_cache_deletions; + rrdeng_stats_t pg_cache_hits; + rrdeng_stats_t pg_cache_misses; + rrdeng_stats_t pg_cache_backfills; + rrdeng_stats_t pg_cache_evictions; + rrdeng_stats_t before_decompress_bytes; + rrdeng_stats_t after_decompress_bytes; + rrdeng_stats_t before_compress_bytes; + rrdeng_stats_t after_compress_bytes; + rrdeng_stats_t io_write_bytes; + rrdeng_stats_t io_write_requests; + rrdeng_stats_t io_read_bytes; + rrdeng_stats_t io_read_requests; + rrdeng_stats_t io_write_extent_bytes; + rrdeng_stats_t io_write_extents; + rrdeng_stats_t io_read_extent_bytes; + rrdeng_stats_t io_read_extents; + rrdeng_stats_t datafile_creations; + rrdeng_stats_t datafile_deletions; + rrdeng_stats_t journalfile_creations; + rrdeng_stats_t journalfile_deletions; +}; + +struct rrdengine_instance { + rrdengine_state_t rrdengine_state; + struct rrdengine_worker_config worker_config; + struct completion rrdengine_completion; + struct page_cache pg_cache; + uint8_t global_compress_alg; + struct transaction_commit_log commit_log; + struct rrdengine_datafile_list datafiles; + char dbfiles_path[FILENAME_MAX+1]; + uint64_t disk_space; + uint64_t max_disk_space; + unsigned long max_cache_pages; + unsigned long cache_pages_low_watermark; + + struct rrdengine_statistics stats; +}; + +extern void sanity_check(void); +extern int init_rrd_files(struct rrdengine_instance *ctx); +extern void rrdeng_test_quota(struct rrdengine_worker_config* wc); +extern void rrdeng_worker(void* arg); +extern void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd); +extern struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc); + +#endif /* NETDATA_RRDENGINE_H */
\ No newline at end of file diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c new file mode 100644 index 000000000..a4e711554 --- /dev/null +++ b/database/engine/rrdengineapi.c @@ -0,0 +1,484 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +/* Default global database instance */ +static struct rrdengine_instance default_global_ctx; + +int default_rrdeng_page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB; +int default_rrdeng_disk_quota_mb = RRDENG_MIN_DISK_SPACE_MB; + +/* + * Gets a handle for storing metrics to the database. + * The handle must be released with rrdeng_store_metric_final(). + */ +void rrdeng_store_metric_init(RRDDIM *rd) +{ + struct rrdeng_collect_handle *handle; + struct page_cache *pg_cache; + struct rrdengine_instance *ctx; + uuid_t temp_id; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index; + EVP_MD_CTX *evpctx; + unsigned char hash_value[EVP_MAX_MD_SIZE]; + unsigned int hash_len; + + //&default_global_ctx; TODO: test this use case or remove it? + + ctx = rd->rrdset->rrdhost->rrdeng_ctx; + pg_cache = &ctx->pg_cache; + handle = &rd->state->handle.rrdeng; + handle->ctx = ctx; + + evpctx = EVP_MD_CTX_create(); + EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL); + EVP_DigestUpdate(evpctx, rd->id, strlen(rd->id)); + EVP_DigestUpdate(evpctx, rd->rrdset->id, strlen(rd->rrdset->id)); + EVP_DigestFinal_ex(evpctx, hash_value, &hash_len); + EVP_MD_CTX_destroy(evpctx); + assert(hash_len > sizeof(temp_id)); + memcpy(&temp_id, hash_value, sizeof(temp_id)); + + handle->descr = NULL; + handle->prev_descr = NULL; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + /* First time we see the UUID */ + uv_rwlock_wrlock(&pg_cache->metrics_index.lock); + PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t), PJE0); + assert(NULL == *PValue); /* TODO: figure out concurrency model */ + *PValue = page_index = create_page_index(&temp_id); + uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); + } + rd->state->rrdeng_uuid = &page_index->id; + handle->page_index = page_index; +} + +void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number) +{ + struct rrdeng_collect_handle *handle; + struct rrdengine_instance *ctx; + struct page_cache *pg_cache; + struct rrdeng_page_cache_descr *descr; + storage_number *page; + + handle = &rd->state->handle.rrdeng; + ctx = handle->ctx; + pg_cache = &ctx->pg_cache; + descr = handle->descr; + if (unlikely(NULL == descr || descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE)) { + if (descr) { + descr->handle = NULL; + if (descr->page_length) { +#ifdef NETDATA_INTERNAL_CHECKS + rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); +#endif + /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */ + ++descr->refcnt; + rrdeng_commit_page(ctx, descr, handle->page_correlation_id); + if (handle->prev_descr) { + /* unpin old second page */ + pg_cache_put(handle->prev_descr); + } + handle->prev_descr = descr; + } else { + free(descr->page); + free(descr); + handle->descr = NULL; + } + } + page = rrdeng_create_page(&handle->page_index->id, &descr); + assert(page); + handle->prev_descr = handle->descr; + handle->descr = descr; + descr->handle = handle; + uv_rwlock_wrlock(&pg_cache->commited_page_index.lock); + handle->page_correlation_id = pg_cache->commited_page_index.latest_corr_id++; + uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock); + } + page = descr->page; + + page[descr->page_length / sizeof(number)] = number; + descr->end_time = point_in_time; + descr->page_length += sizeof(number); + if (unlikely(INVALID_TIME == descr->start_time)) { + descr->start_time = point_in_time; + +#ifdef NETDATA_INTERNAL_CHECKS + rrd_stat_atomic_add(&ctx->stats.metric_API_producers, 1); +#endif + pg_cache_insert(ctx, handle->page_index, descr); + } else { + pg_cache_add_new_metric_time(handle->page_index, descr); + } +} + +/* + * Releases the database reference from the handle for storing metrics. + */ +void rrdeng_store_metric_finalize(RRDDIM *rd) +{ + struct rrdeng_collect_handle *handle; + struct rrdengine_instance *ctx; + struct rrdeng_page_cache_descr *descr; + + handle = &rd->state->handle.rrdeng; + ctx = handle->ctx; + descr = handle->descr; + if (descr) { + descr->handle = NULL; + if (descr->page_length) { +#ifdef NETDATA_INTERNAL_CHECKS + rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); +#endif + rrdeng_commit_page(ctx, descr, handle->page_correlation_id); + if (handle->prev_descr) { + /* unpin old second page */ + pg_cache_put(handle->prev_descr); + } + } else { + free(descr->page); + free(descr); + } + } +} + +/* + * Gets a handle for loading metrics from the database. + * The handle must be released with rrdeng_load_metric_final(). + */ +void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time) +{ + struct rrdeng_query_handle *handle; + struct rrdengine_instance *ctx; + + ctx = rd->rrdset->rrdhost->rrdeng_ctx; + rrdimm_handle->start_time = start_time; + rrdimm_handle->end_time = end_time; + handle = &rrdimm_handle->rrdeng; + handle->now = start_time; + handle->dt = rd->rrdset->update_every; + handle->ctx = ctx; + handle->descr = NULL; + handle->page_index = pg_cache_preload(ctx, rd->state->rrdeng_uuid, + start_time * USEC_PER_SEC, end_time * USEC_PER_SEC); +} + +storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle) +{ + struct rrdeng_query_handle *handle; + struct rrdengine_instance *ctx; + struct rrdeng_page_cache_descr *descr; + storage_number *page, ret; + unsigned position; + usec_t point_in_time; + + handle = &rrdimm_handle->rrdeng; + if (unlikely(INVALID_TIME == handle->now)) { + return SN_EMPTY_SLOT; + } + ctx = handle->ctx; + point_in_time = handle->now * USEC_PER_SEC; + descr = handle->descr; + + if (unlikely(NULL == handle->page_index)) { + ret = SN_EMPTY_SLOT; + goto out; + } + if (unlikely(NULL == descr || + point_in_time < descr->start_time || + point_in_time > descr->end_time)) { + if (descr) { +#ifdef NETDATA_INTERNAL_CHECKS + rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); +#endif + pg_cache_put(descr); + handle->descr = NULL; + } + descr = pg_cache_lookup(ctx, handle->page_index, &handle->page_index->id, point_in_time); + if (NULL == descr) { + ret = SN_EMPTY_SLOT; + goto out; + } +#ifdef NETDATA_INTERNAL_CHECKS + rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1); +#endif + handle->descr = descr; + } + if (unlikely(INVALID_TIME == descr->start_time || + INVALID_TIME == descr->end_time)) { + ret = SN_EMPTY_SLOT; + goto out; + } + page = descr->page; + if (unlikely(descr->start_time == descr->end_time)) { + ret = page[0]; + goto out; + } + position = ((uint64_t)(point_in_time - descr->start_time)) * (descr->page_length / sizeof(storage_number)) / + (descr->end_time - descr->start_time + 1); + ret = page[position]; + +out: + handle->now += handle->dt; + if (unlikely(handle->now > rrdimm_handle->end_time)) { + handle->now = INVALID_TIME; + } + return ret; +} + +int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) +{ + struct rrdeng_query_handle *handle; + + handle = &rrdimm_handle->rrdeng; + return (INVALID_TIME == handle->now); +} + +/* + * Releases the database reference from the handle for loading metrics. + */ +void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) +{ + struct rrdeng_query_handle *handle; + struct rrdengine_instance *ctx; + struct rrdeng_page_cache_descr *descr; + + handle = &rrdimm_handle->rrdeng; + ctx = handle->ctx; + descr = handle->descr; + if (descr) { +#ifdef NETDATA_INTERNAL_CHECKS + rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); +#endif + pg_cache_put(descr); + } +} + +time_t rrdeng_metric_latest_time(RRDDIM *rd) +{ + struct rrdeng_collect_handle *handle; + struct pg_cache_page_index *page_index; + + handle = &rd->state->handle.rrdeng; + page_index = handle->page_index; + + return page_index->latest_time / USEC_PER_SEC; +} +time_t rrdeng_metric_oldest_time(RRDDIM *rd) +{ + struct rrdeng_collect_handle *handle; + struct pg_cache_page_index *page_index; + + handle = &rd->state->handle.rrdeng; + page_index = handle->page_index; + + return page_index->oldest_time / USEC_PER_SEC; +} + +/* Also gets a reference for the page */ +void *rrdeng_create_page(uuid_t *id, struct rrdeng_page_cache_descr **ret_descr) +{ + struct rrdeng_page_cache_descr *descr; + void *page; + int ret; + + /* TODO: check maximum number of pages in page cache limit */ + + page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */ + descr = pg_cache_create_descr(); + descr->page = page; + descr->id = id; /* TODO: add page type: metric, log, something? */ + descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */; + descr->refcnt = 1; + + debug(D_RRDENGINE, "-----------------\nCreated new page:\n-----------------"); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + *ret_descr = descr; + return page; +} + +/* The page must not be empty */ +void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr, + Word_t page_correlation_id) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + Pvoid_t *PValue; + + if (unlikely(NULL == descr)) { + debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-commited.", __func__); + return; + } + assert(descr->page_length); + + uv_rwlock_wrlock(&pg_cache->commited_page_index.lock); + PValue = JudyLIns(&pg_cache->commited_page_index.JudyL_array, page_correlation_id, PJE0); + *PValue = descr; + ++pg_cache->commited_page_index.nr_commited_pages; + uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock); + + pg_cache_put(descr); +} + +/* Gets a reference for the page */ +void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle) +{ + struct rrdeng_page_cache_descr *descr; + + debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------"); + descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME); + if (NULL == descr) { + *handle = NULL; + + return NULL; + } + *handle = descr; + + return descr->page; +} + +/* Gets a reference for the page */ +void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle) +{ + struct rrdeng_page_cache_descr *descr; + + debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------"); + descr = pg_cache_lookup(ctx, NULL, id, point_in_time); + if (NULL == descr) { + *handle = NULL; + + return NULL; + } + *handle = descr; + + return descr->page; +} + +void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long *array) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + array[0] = (uint64_t)ctx->stats.metric_API_producers; + array[1] = (uint64_t)ctx->stats.metric_API_consumers; + array[2] = (uint64_t)pg_cache->page_descriptors; + array[3] = (uint64_t)pg_cache->populated_pages; + array[4] = (uint64_t)pg_cache->commited_page_index.nr_commited_pages; + array[5] = (uint64_t)ctx->stats.pg_cache_insertions; + array[6] = (uint64_t)ctx->stats.pg_cache_deletions; + array[7] = (uint64_t)ctx->stats.pg_cache_hits; + array[8] = (uint64_t)ctx->stats.pg_cache_misses; + array[9] = (uint64_t)ctx->stats.pg_cache_backfills; + array[10] = (uint64_t)ctx->stats.pg_cache_evictions; + array[11] = (uint64_t)ctx->stats.before_compress_bytes; + array[12] = (uint64_t)ctx->stats.after_compress_bytes; + array[13] = (uint64_t)ctx->stats.before_decompress_bytes; + array[14] = (uint64_t)ctx->stats.after_decompress_bytes; + array[15] = (uint64_t)ctx->stats.io_write_bytes; + array[16] = (uint64_t)ctx->stats.io_write_requests; + array[17] = (uint64_t)ctx->stats.io_read_bytes; + array[18] = (uint64_t)ctx->stats.io_read_requests; + array[19] = (uint64_t)ctx->stats.io_write_extent_bytes; + array[20] = (uint64_t)ctx->stats.io_write_extents; + array[21] = (uint64_t)ctx->stats.io_read_extent_bytes; + array[22] = (uint64_t)ctx->stats.io_read_extents; + array[23] = (uint64_t)ctx->stats.datafile_creations; + array[24] = (uint64_t)ctx->stats.datafile_deletions; + array[25] = (uint64_t)ctx->stats.journalfile_creations; + array[26] = (uint64_t)ctx->stats.journalfile_deletions; +} + +/* Releases reference to page */ +void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle) +{ + (void)ctx; + pg_cache_put((struct rrdeng_page_cache_descr *)handle); +} + +/* + * Returns 0 on success, 1 on error + */ +int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, unsigned disk_space_mb) +{ + struct rrdengine_instance *ctx; + int error; + + sanity_check(); + if (NULL == ctxp) { + /* for testing */ + ctx = &default_global_ctx; + memset(ctx, 0, sizeof(*ctx)); + } else { + *ctxp = ctx = callocz(1, sizeof(*ctx)); + } + if (ctx->rrdengine_state != RRDENGINE_STATUS_UNINITIALIZED) { + return 1; + } + ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZING; + ctx->global_compress_alg = RRD_LZ4; + if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB) + page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB; + ctx->max_cache_pages = page_cache_mb * (1048576LU / RRDENG_BLOCK_SIZE); + /* try to keep 5% of the page cache free */ + ctx->cache_pages_low_watermark = (ctx->max_cache_pages * 95LLU) / 100; + if (disk_space_mb < RRDENG_MIN_DISK_SPACE_MB) + disk_space_mb = RRDENG_MIN_DISK_SPACE_MB; + ctx->max_disk_space = disk_space_mb * 1048576LLU; + strncpyz(ctx->dbfiles_path, dbfiles_path, sizeof(ctx->dbfiles_path) - 1); + ctx->dbfiles_path[sizeof(ctx->dbfiles_path) - 1] = '\0'; + + memset(&ctx->worker_config, 0, sizeof(ctx->worker_config)); + ctx->worker_config.ctx = ctx; + init_page_cache(ctx); + init_commit_log(ctx); + error = init_rrd_files(ctx); + if (error) { + ctx->rrdengine_state = RRDENGINE_STATUS_UNINITIALIZED; + if (ctx != &default_global_ctx) { + freez(ctx); + } + return 1; + } + + init_completion(&ctx->rrdengine_completion); + assert(0 == uv_thread_create(&ctx->worker_config.thread, rrdeng_worker, &ctx->worker_config)); + /* wait for worker thread to initialize */ + wait_for_completion(&ctx->rrdengine_completion); + destroy_completion(&ctx->rrdengine_completion); + + ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZED; + return 0; +} + +/* + * Returns 0 on success, 1 on error + */ +int rrdeng_exit(struct rrdengine_instance *ctx) +{ + struct rrdeng_cmd cmd; + + if (NULL == ctx) { + /* TODO: move to per host basis */ + ctx = &default_global_ctx; + } + if (ctx->rrdengine_state != RRDENGINE_STATUS_INITIALIZED) { + return 1; + } + + /* TODO: add page to page cache */ + cmd.opcode = RRDENG_SHUTDOWN; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + + assert(0 == uv_thread_join(&ctx->worker_config.thread)); + + if (ctx != &default_global_ctx) { + freez(ctx); + } + return 0; +}
\ No newline at end of file diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h new file mode 100644 index 000000000..e76629a4b --- /dev/null +++ b/database/engine/rrdengineapi.h @@ -0,0 +1,37 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDENGINEAPI_H +#define NETDATA_RRDENGINEAPI_H + +#include "rrdengine.h" + +#define RRDENG_MIN_PAGE_CACHE_SIZE_MB (32) +#define RRDENG_MIN_DISK_SPACE_MB (256) +extern int default_rrdeng_page_cache_mb; +extern int default_rrdeng_disk_quota_mb; + +extern void *rrdeng_create_page(uuid_t *id, struct rrdeng_page_cache_descr **ret_descr); +extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr, + Word_t page_correlation_id); +extern void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle); +extern void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle); +extern void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle); +extern void rrdeng_store_metric_init(RRDDIM *rd); +extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number); +extern void rrdeng_store_metric_finalize(RRDDIM *rd); +extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, + time_t start_time, time_t end_time); +extern storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle); +extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle); +extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle); +extern time_t rrdeng_metric_latest_time(RRDDIM *rd); +extern time_t rrdeng_metric_oldest_time(RRDDIM *rd); +extern void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long *array); + +/* must call once before using anything */ +extern int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, + unsigned disk_space_mb); + +extern int rrdeng_exit(struct rrdengine_instance *ctx); + +#endif /* NETDATA_RRDENGINEAPI_H */
\ No newline at end of file diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c new file mode 100644 index 000000000..25f57ba1b --- /dev/null +++ b/database/engine/rrdenginelib.c @@ -0,0 +1,116 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +void print_page_cache_descr(struct rrdeng_page_cache_descr *page_cache_descr) +{ + char uuid_str[37]; + char str[512]; + int pos = 0; + + uuid_unparse_lower(*page_cache_descr->id, uuid_str); + pos += snprintfz(str, 512 - pos, "page(%p) id=%s\n" + "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:", + page_cache_descr->page, uuid_str, + page_cache_descr->page_length, + (uint64_t)page_cache_descr->start_time, + (uint64_t)page_cache_descr->end_time); + if (!page_cache_descr->extent) { + pos += snprintfz(str + pos, 512 - pos, "N/A"); + } else { + pos += snprintfz(str + pos, 512 - pos, "%"PRIu64, page_cache_descr->extent->offset); + } + snprintfz(str + pos, 512 - pos, " flags:0x%2.2lX refcnt:%u\n\n", page_cache_descr->flags, page_cache_descr->refcnt); + fputs(str, stderr); +} + +int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size) +{ + int ret; + uv_fs_t req; + uv_stat_t* s; + + ret = uv_fs_fstat(NULL, &req, file, NULL); + if (ret < 0) { + fatal("uv_fs_fstat: %s\n", uv_strerror(ret)); + } + assert(req.result == 0); + s = req.ptr; + if (!(s->st_mode & S_IFREG)) { + error("Not a regular file.\n"); + uv_fs_req_cleanup(&req); + return UV_EINVAL; + } + if (s->st_size < min_size) { + error("File length is too short.\n"); + uv_fs_req_cleanup(&req); + return UV_EINVAL; + } + *file_size = s->st_size; + uv_fs_req_cleanup(&req); + + return 0; +} + +char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size) +{ + struct page_cache *pg_cache; + + pg_cache = &ctx->pg_cache; + snprintfz(str, size, + "metric_API_producers: %ld\n" + "metric_API_consumers: %ld\n" + "page_cache_total_pages: %ld\n" + "page_cache_populated_pages: %ld\n" + "page_cache_commited_pages: %ld\n" + "page_cache_insertions: %ld\n" + "page_cache_deletions: %ld\n" + "page_cache_hits: %ld\n" + "page_cache_misses: %ld\n" + "page_cache_backfills: %ld\n" + "page_cache_evictions: %ld\n" + "compress_before_bytes: %ld\n" + "compress_after_bytes: %ld\n" + "decompress_before_bytes: %ld\n" + "decompress_after_bytes: %ld\n" + "io_write_bytes: %ld\n" + "io_write_requests: %ld\n" + "io_read_bytes: %ld\n" + "io_read_requests: %ld\n" + "io_write_extent_bytes: %ld\n" + "io_write_extents: %ld\n" + "io_read_extent_bytes: %ld\n" + "io_read_extents: %ld\n" + "datafile_creations: %ld\n" + "datafile_deletions: %ld\n" + "journalfile_creations: %ld\n" + "journalfile_deletions: %ld\n", + (long)ctx->stats.metric_API_producers, + (long)ctx->stats.metric_API_consumers, + (long)pg_cache->page_descriptors, + (long)pg_cache->populated_pages, + (long)pg_cache->commited_page_index.nr_commited_pages, + (long)ctx->stats.pg_cache_insertions, + (long)ctx->stats.pg_cache_deletions, + (long)ctx->stats.pg_cache_hits, + (long)ctx->stats.pg_cache_misses, + (long)ctx->stats.pg_cache_backfills, + (long)ctx->stats.pg_cache_evictions, + (long)ctx->stats.before_compress_bytes, + (long)ctx->stats.after_compress_bytes, + (long)ctx->stats.before_decompress_bytes, + (long)ctx->stats.after_decompress_bytes, + (long)ctx->stats.io_write_bytes, + (long)ctx->stats.io_write_requests, + (long)ctx->stats.io_read_bytes, + (long)ctx->stats.io_read_requests, + (long)ctx->stats.io_write_extent_bytes, + (long)ctx->stats.io_write_extents, + (long)ctx->stats.io_read_extent_bytes, + (long)ctx->stats.io_read_extents, + (long)ctx->stats.datafile_creations, + (long)ctx->stats.datafile_deletions, + (long)ctx->stats.journalfile_creations, + (long)ctx->stats.journalfile_deletions + ); + return str; +} diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h new file mode 100644 index 000000000..bb6f072bf --- /dev/null +++ b/database/engine/rrdenginelib.h @@ -0,0 +1,84 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDENGINELIB_H +#define NETDATA_RRDENGINELIB_H + +#include "rrdengine.h" + +/* Forward declarations */ +struct rrdeng_page_cache_descr; + +#define STR_HELPER(x) #x +#define STR(x) STR_HELPER(x) + +/* Taken from linux kernel */ +#define BUILD_BUG_ON(condition) ((void)sizeof(char[1 - 2*!!(condition)])) + +#define ALIGN_BYTES_FLOOR(x) (((x) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE) +#define ALIGN_BYTES_CEILING(x) ((((x) + RRDENG_BLOCK_SIZE - 1) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE) + +typedef uintptr_t rrdeng_stats_t; + +#ifdef __ATOMIC_RELAXED +#define rrd_stat_atomic_add(p, n) do {(void) __atomic_fetch_add(p, n, __ATOMIC_RELAXED);} while(0) +#else +#define rrd_stat_atomic_add(p, n) do {(void) __sync_fetch_and_add(p, n);} while(0) +#endif + +#ifndef O_DIRECT +/* Workaround for OS X */ +#define O_DIRECT (0) +#endif + +struct completion { + uv_mutex_t mutex; + uv_cond_t cond; + volatile unsigned completed; +}; + +static inline void init_completion(struct completion *p) +{ + p->completed = 0; + assert(0 == uv_cond_init(&p->cond)); + assert(0 == uv_mutex_init(&p->mutex)); +} + +static inline void destroy_completion(struct completion *p) +{ + uv_cond_destroy(&p->cond); + uv_mutex_destroy(&p->mutex); +} + +static inline void wait_for_completion(struct completion *p) +{ + uv_mutex_lock(&p->mutex); + while (0 == p->completed) { + uv_cond_wait(&p->cond, &p->mutex); + } + assert(1 == p->completed); + uv_mutex_unlock(&p->mutex); +} + +static inline void complete(struct completion *p) +{ + uv_mutex_lock(&p->mutex); + p->completed = 1; + uv_mutex_unlock(&p->mutex); + uv_cond_broadcast(&p->cond); +} + +static inline int crc32cmp(void *crcp, uLong crc) +{ + return (*(uint32_t *)crcp != crc); +} + +static inline void crc32set(void *crcp, uLong crc) +{ + *(uint32_t *)crcp = crc; +} + +extern void print_page_cache_descr(struct rrdeng_page_cache_descr *page_cache_descr); +extern int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size); +extern char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size); + +#endif /* NETDATA_RRDENGINELIB_H */
\ No newline at end of file |