summaryrefslogtreecommitdiffstats
path: root/database/engine
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine')
-rw-r--r--database/engine/Makefile.am8
-rw-r--r--database/engine/README.md109
-rw-r--r--database/engine/datafile.c335
-rw-r--r--database/engine/datafile.h63
-rw-r--r--database/engine/journalfile.c462
-rw-r--r--database/engine/journalfile.h46
-rw-r--r--database/engine/pagecache.c792
-rw-r--r--database/engine/pagecache.h132
-rw-r--r--database/engine/rrddiskprotocol.h119
-rw-r--r--database/engine/rrdengine.c780
-rw-r--r--database/engine/rrdengine.h171
-rw-r--r--database/engine/rrdengineapi.c484
-rw-r--r--database/engine/rrdengineapi.h37
-rw-r--r--database/engine/rrdenginelib.c116
-rw-r--r--database/engine/rrdenginelib.h84
15 files changed, 3738 insertions, 0 deletions
diff --git a/database/engine/Makefile.am b/database/engine/Makefile.am
new file mode 100644
index 000000000..19554bed8
--- /dev/null
+++ b/database/engine/Makefile.am
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/database/engine/README.md b/database/engine/README.md
new file mode 100644
index 000000000..28a2528cb
--- /dev/null
+++ b/database/engine/README.md
@@ -0,0 +1,109 @@
+# Database engine
+
+The Database Engine works like a traditional
+database. There is some amount of RAM dedicated to data caching and indexing and the rest of
+the data reside compressed on disk. The number of history entries is not fixed in this case,
+but depends on the configured disk space and the effective compression ratio of the data stored.
+
+## Files
+
+With the DB engine memory mode the metric data are stored in database files. These files are
+organized in pairs, the datafiles and their corresponding journalfiles, e.g.:
+
+```
+datafile-1-0000000001.ndf
+journalfile-1-0000000001.njf
+datafile-1-0000000002.ndf
+journalfile-1-0000000002.njf
+datafile-1-0000000003.ndf
+journalfile-1-0000000003.njf
+...
+```
+
+They are located under their host's cache directory in the directory `./dbengine`
+(e.g. for localhost the default location is `/var/cache/netdata/dbengine/*`). The higher
+numbered filenames contain more recent metric data. The user can safely delete some pairs
+of files when netdata is stopped to manually free up some space.
+
+*Users should* **back up** *their `./dbengine` folders if they consider this data to be important.*
+
+## Configuration
+
+There is one DB engine instance per netdata host/node. That is, there is one `./dbengine` folder
+per node, and all charts of `dbengine` memory mode in such a host share the same storage space
+and DB engine instance memory state. You can select the memory mode for localhost by editing
+netdata.conf and setting:
+
+```
+[global]
+ memory mode = dbengine
+```
+
+For setting the memory mode for the rest of the nodes you should look at
+[streaming](../../streaming/).
+
+The `history` configuration option is meaningless for `memory mode = dbengine` and is ignored
+for any metrics being stored in the DB engine.
+
+All DB engine instances, for localhost and all other streaming recipient nodes inherit their
+configuration from `netdata.conf`:
+
+```
+[global]
+ page cache size = 32
+ dbengine disk space = 256
+```
+
+The above values are the default and minimum values for Page Cache size and DB engine disk space
+quota. Both numbers are in **MiB**. All DB engine instances will allocate the configured resources
+separately.
+
+The `page cache size` option determines the amount of RAM in **MiB** that is dedicated to caching
+netdata metric values themselves.
+
+The `dbengine disk space` option determines the amount of disk space in **MiB** that is dedicated
+to storing netdata metric values and all related metadata describing them.
+
+## Operation
+
+The DB engine stores chart metric values in 4096-byte pages in memory. Each chart dimension gets
+its own page to store consecutive values generated from the data collectors. Those pages comprise
+the **Page Cache**.
+
+When those pages fill up they are slowly compressed and flushed to disk.
+It can take `4096 / 4 = 1024 seconds = 17 minutes`, for a chart dimension that is being collected
+every 1 second, to fill a page. Pages can be cut short when we stop netdata or the DB engine
+instance so as to not lose the data. When we query the DB engine for data we trigger disk read
+I/O requests that fill the Page Cache with the requested pages and potentially evict cold
+(not recently used) pages.
+
+When the disk quota is exceeded the oldest values are removed from the DB engine at real time, by
+automatically deleting the oldest datafile and journalfile pair. Any corresponding pages residing
+in the Page Cache will also be invalidated and removed. The DB engine logic will try to maintain
+between 10 and 20 file pairs at any point in time.
+
+The Database Engine uses direct I/O to avoid polluting the OS filesystem caches and does not
+generate excessive I/O traffic so as to create the minimum possible interference with other
+applications.
+
+## Memory requirements
+
+Using memory mode `dbengine` we can overcome most memory restrictions and store a dataset that
+is much larger than the available memory.
+
+There are explicit memory requirements **per** DB engine **instance**, meaning **per** netdata
+**node** (e.g. localhost and streaming recipient nodes):
+
+- `page cache size` must be at least `#dimensions-being-collected x 4096 x 2` bytes.
+
+- an additional `#pages-on-disk x 4096 x 0.06` bytes of RAM are allocated for metadata.
+
+ - roughly speaking this is 6% of the uncompressed disk space taken by the DB files.
+
+ - for very highly compressible data (compression ratio > 90%) this RAM overhead
+ is comparable to the disk space footprint.
+
+An important observation is that RAM usage depends on both the `page cache size` and the
+`dbengine disk space` options.
+
+[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fdatabase%2Fengine%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)]()
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
new file mode 100644
index 000000000..2d17d05e4
--- /dev/null
+++ b/database/engine/datafile.c
@@ -0,0 +1,335 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+void df_extent_insert(struct extent_info *extent)
+{
+ struct rrdengine_datafile *datafile = extent->datafile;
+
+ if (likely(NULL != datafile->extents.last)) {
+ datafile->extents.last->next = extent;
+ }
+ if (unlikely(NULL == datafile->extents.first)) {
+ datafile->extents.first = extent;
+ }
+ datafile->extents.last = extent;
+}
+
+void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+{
+ if (likely(NULL != ctx->datafiles.last)) {
+ ctx->datafiles.last->next = datafile;
+ }
+ if (unlikely(NULL == ctx->datafiles.first)) {
+ ctx->datafiles.first = datafile;
+ }
+ ctx->datafiles.last = datafile;
+}
+
+void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_datafile *next;
+
+ next = datafile->next;
+ assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile));
+ ctx->datafiles.first = next;
+}
+
+
+static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_instance *ctx,
+ unsigned tier, unsigned fileno)
+{
+ assert(tier == 1);
+ datafile->tier = tier;
+ datafile->fileno = fileno;
+ datafile->file = (uv_file)0;
+ datafile->pos = 0;
+ datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */
+ datafile->journalfile = NULL;
+ datafile->next = NULL;
+ datafile->ctx = ctx;
+}
+
+static void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+{
+ (void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION,
+ datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
+}
+
+int destroy_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret, fd;
+ char path[1024];
+
+ ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ ret = uv_fs_close(NULL, &req, datafile->file, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_close: %s", uv_strerror(ret));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ generate_datafilepath(datafile, path, sizeof(path));
+ fd = uv_fs_unlink(NULL, &req, path, NULL);
+ if (fd < 0) {
+ fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ ++ctx->stats.datafile_deletions;
+
+ return 0;
+}
+
+int create_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd;
+ struct rrdeng_df_sb *superblock;
+ uv_buf_t iov;
+ char path[1024];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+ fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC,
+ S_IRUSR | S_IWUSR, NULL);
+ if (fd < 0) {
+ fatal("uv_fs_fsopen: %s", uv_strerror(fd));
+ }
+ assert(req.result >= 0);
+ file = req.result;
+ uv_fs_req_cleanup(&req);
+#ifdef __APPLE__
+ info("Disabling OS X caching for file \"%s\".", path);
+ fcntl(fd, F_NOCACHE, 1);
+#endif
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ (void) strncpy(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ);
+ (void) strncpy(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ);
+ superblock->tier = 1;
+
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_write: %s", uv_strerror(ret));
+ }
+ if (req.result < 0) {
+ fatal("uv_fs_write: %s", uv_strerror((int)req.result));
+ }
+ uv_fs_req_cleanup(&req);
+ free(superblock);
+
+ datafile->file = file;
+ datafile->pos = sizeof(*superblock);
+ ctx->stats.io_write_bytes += sizeof(*superblock);
+ ++ctx->stats.io_write_requests;
+ ++ctx->stats.datafile_creations;
+
+ return 0;
+}
+
+static int check_data_file_superblock(uv_file file)
+{
+ int ret;
+ struct rrdeng_df_sb *superblock;
+ uv_buf_t iov;
+ uv_fs_t req;
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ error("uv_fs_read: %s", uv_strerror(ret));
+ uv_fs_req_cleanup(&req);
+ goto error;
+ }
+ assert(req.result >= 0);
+ uv_fs_req_cleanup(&req);
+
+ if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) ||
+ strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) ||
+ superblock->tier != 1) {
+ error("File has invalid superblock.");
+ ret = UV_EINVAL;
+ } else {
+ ret = 0;
+ }
+ error:
+ free(superblock);
+ return ret;
+}
+
+static int load_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd;
+ uint64_t file_size;
+ char path[1024];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+ fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL);
+ if (fd < 0) {
+ /* if (UV_ENOENT != fd) */
+ error("uv_fs_fsopen: %s", uv_strerror(fd));
+ uv_fs_req_cleanup(&req);
+ return fd;
+ }
+ assert(req.result >= 0);
+ file = req.result;
+ uv_fs_req_cleanup(&req);
+#ifdef __APPLE__
+ info("Disabling OS X caching for file \"%s\".", path);
+ fcntl(fd, F_NOCACHE, 1);
+#endif
+ info("Initializing data file \"%s\".", path);
+
+ ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
+ if (ret)
+ goto error;
+ file_size = ALIGN_BYTES_CEILING(file_size);
+
+ ret = check_data_file_superblock(file);
+ if (ret)
+ goto error;
+ ctx->stats.io_read_bytes += sizeof(struct rrdeng_df_sb);
+ ++ctx->stats.io_read_requests;
+
+ datafile->file = file;
+ datafile->pos = file_size;
+
+ info("Data file \"%s\" initialized (size:%"PRIu64").", path, file_size);
+ return 0;
+
+ error:
+ (void) uv_fs_close(NULL, &req, file, NULL);
+ uv_fs_req_cleanup(&req);
+ return ret;
+}
+
+static int scan_data_files_cmp(const void *a, const void *b)
+{
+ struct rrdengine_datafile *file1, *file2;
+ char path1[1024], path2[1024];
+
+ file1 = *(struct rrdengine_datafile **)a;
+ file2 = *(struct rrdengine_datafile **)b;
+ generate_datafilepath(file1, path1, sizeof(path1));
+ generate_datafilepath(file2, path2, sizeof(path2));
+ return strcmp(path1, path2);
+}
+
+/* Returns number of datafiles that were loaded */
+static int scan_data_files(struct rrdengine_instance *ctx)
+{
+ int ret;
+ unsigned tier, no, matched_files, i,failed_to_load;
+ static uv_fs_t req;
+ uv_dirent_t dent;
+ struct rrdengine_datafile **datafiles, *datafile;
+ struct rrdengine_journalfile *journalfile;
+
+ ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL);
+ assert(ret >= 0);
+ assert(req.result >= 0);
+ info("Found %d files in path %s", ret, ctx->dbfiles_path);
+
+ datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
+ for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
+ info("Scanning file \"%s\"", dent.name);
+ ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
+ if (2 == ret) {
+ info("Matched file \"%s\"", dent.name);
+ datafile = mallocz(sizeof(*datafile));
+ datafile_init(datafile, ctx, tier, no);
+ datafiles[matched_files++] = datafile;
+ }
+ }
+ uv_fs_req_cleanup(&req);
+
+ if (matched_files == MAX_DATAFILES) {
+ error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
+ }
+ qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
+ for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
+ datafile = datafiles[i];
+ ret = load_data_file(datafile);
+ if (0 != ret) {
+ free(datafile);
+ ++failed_to_load;
+ continue;
+ }
+ journalfile = mallocz(sizeof(*journalfile));
+ datafile->journalfile = journalfile;
+ journalfile_init(journalfile, datafile);
+ ret = load_journal_file(ctx, journalfile, datafile);
+ if (0 != ret) {
+ free(datafile);
+ free(journalfile);
+ ++failed_to_load;
+ continue;
+ }
+ datafile_list_insert(ctx, datafile);
+ ctx->disk_space += datafile->pos + journalfile->pos;
+ }
+ if (failed_to_load) {
+ error("%u files failed to load.", failed_to_load);
+ }
+ free(datafiles);
+
+ return matched_files - failed_to_load;
+}
+
+/* Creates a datafile and a journalfile pair */
+void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
+{
+ struct rrdengine_datafile *datafile;
+ struct rrdengine_journalfile *journalfile;
+ int ret;
+
+ info("Creating new data and journal files.");
+ datafile = mallocz(sizeof(*datafile));
+ datafile_init(datafile, ctx, tier, fileno);
+ ret = create_data_file(datafile);
+ assert(!ret);
+
+ journalfile = mallocz(sizeof(*journalfile));
+ datafile->journalfile = journalfile;
+ journalfile_init(journalfile, datafile);
+ ret = create_journal_file(journalfile, datafile);
+ assert(!ret);
+ datafile_list_insert(ctx, datafile);
+ ctx->disk_space += datafile->pos + journalfile->pos;
+}
+
+/* Page cache must already be initialized. */
+int init_data_files(struct rrdengine_instance *ctx)
+{
+ int ret;
+
+ ret = scan_data_files(ctx);
+ if (0 == ret) {
+ info("Data files not found, creating.");
+ create_new_datafile_pair(ctx, 1, 1);
+ }
+ return 0;
+} \ No newline at end of file
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
new file mode 100644
index 000000000..c5c8f31f3
--- /dev/null
+++ b/database/engine/datafile.h
@@ -0,0 +1,63 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_DATAFILE_H
+#define NETDATA_DATAFILE_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct rrdengine_datafile;
+struct rrdengine_journalfile;
+struct rrdengine_instance;
+
+#define DATAFILE_PREFIX "datafile-"
+#define DATAFILE_EXTENSION ".ndf"
+
+#define MAX_DATAFILE_SIZE (1073741824LU)
+#define MIN_DATAFILE_SIZE (16777216LU)
+#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */
+#define TARGET_DATAFILES (20)
+
+#define DATAFILE_IDEAL_IO_SIZE (1048576U)
+
+struct extent_info {
+ uint64_t offset;
+ uint32_t size;
+ uint8_t number_of_pages;
+ struct rrdengine_datafile *datafile;
+ struct extent_info *next;
+ struct rrdeng_page_cache_descr *pages[];
+};
+
+struct rrdengine_df_extents {
+ /* the extent list is sorted based on disk offset */
+ struct extent_info *first;
+ struct extent_info *last;
+};
+
+/* only one event loop is supported for now */
+struct rrdengine_datafile {
+ unsigned tier;
+ unsigned fileno;
+ uv_file file;
+ uint64_t pos;
+ struct rrdengine_instance *ctx;
+ struct rrdengine_df_extents extents;
+ struct rrdengine_journalfile *journalfile;
+ struct rrdengine_datafile *next;
+};
+
+struct rrdengine_datafile_list {
+ struct rrdengine_datafile *first; /* oldest */
+ struct rrdengine_datafile *last; /* newest */
+};
+
+extern void df_extent_insert(struct extent_info *extent);
+extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+extern int destroy_data_file(struct rrdengine_datafile *datafile);
+extern int create_data_file(struct rrdengine_datafile *datafile);
+extern void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
+extern int init_data_files(struct rrdengine_instance *ctx);
+
+#endif /* NETDATA_DATAFILE_H */ \ No newline at end of file
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
new file mode 100644
index 000000000..44d8461db
--- /dev/null
+++ b/database/engine/journalfile.c
@@ -0,0 +1,462 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+static void flush_transaction_buffer_cb(uv_fs_t* req)
+{
+ struct generic_io_descriptor *io_descr;
+
+ debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
+ if (req->result < 0) {
+ fatal("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
+ }
+ io_descr = req->data;
+
+ uv_fs_req_cleanup(req);
+ free(io_descr->buf);
+ free(io_descr);
+}
+
+/* Careful to always call this before creating a new journal file */
+void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ int ret;
+ struct generic_io_descriptor *io_descr;
+ unsigned pos, size;
+ struct rrdengine_journalfile *journalfile;
+
+ if (unlikely(NULL == ctx->commit_log.buf || 0 == ctx->commit_log.buf_pos)) {
+ return;
+ }
+ /* care with outstanding transactions when switching journal files */
+ journalfile = ctx->datafiles.last->journalfile;
+
+ io_descr = mallocz(sizeof(*io_descr));
+ pos = ctx->commit_log.buf_pos;
+ size = ctx->commit_log.buf_size;
+ if (pos < size) {
+ /* simulate an empty transaction to skip the rest of the block */
+ *(uint8_t *) (ctx->commit_log.buf + pos) = STORE_PADDING;
+ }
+ io_descr->buf = ctx->commit_log.buf;
+ io_descr->bytes = size;
+ io_descr->pos = journalfile->pos;
+ io_descr->req.data = io_descr;
+ io_descr->completion = NULL;
+
+ io_descr->iov = uv_buf_init((void *)io_descr->buf, size);
+ ret = uv_fs_write(wc->loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
+ journalfile->pos, flush_transaction_buffer_cb);
+ assert (-1 != ret);
+ journalfile->pos += RRDENG_BLOCK_SIZE;
+ ctx->disk_space += RRDENG_BLOCK_SIZE;
+ ctx->commit_log.buf = NULL;
+ ctx->stats.io_write_bytes += RRDENG_BLOCK_SIZE;
+ ++ctx->stats.io_write_requests;
+}
+
+void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ int ret;
+ unsigned buf_pos, buf_size;
+
+ assert(size);
+ if (ctx->commit_log.buf) {
+ unsigned remaining;
+
+ buf_pos = ctx->commit_log.buf_pos;
+ buf_size = ctx->commit_log.buf_size;
+ remaining = buf_size - buf_pos;
+ if (size > remaining) {
+ /* we need a new buffer */
+ wal_flush_transaction_buffer(wc);
+ }
+ }
+ if (NULL == ctx->commit_log.buf) {
+ buf_size = ALIGN_BYTES_CEILING(size);
+ ret = posix_memalign((void *)&ctx->commit_log.buf, RRDFILE_ALIGNMENT, buf_size);
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ buf_pos = ctx->commit_log.buf_pos = 0;
+ ctx->commit_log.buf_size = buf_size;
+ }
+ ctx->commit_log.buf_pos += size;
+
+ return ctx->commit_log.buf + buf_pos;
+}
+
+static void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+{
+ (void) snprintf(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
+ datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
+}
+
+void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ journalfile->file = (uv_file)0;
+ journalfile->pos = 0;
+ journalfile->datafile = datafile;
+}
+
+int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret, fd;
+ char path[1024];
+
+ ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_close: %s", uv_strerror(ret));
+ exit(ret);
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+ fd = uv_fs_unlink(NULL, &req, path, NULL);
+ if (fd < 0) {
+ fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ ++ctx->stats.journalfile_deletions;
+
+ return 0;
+}
+
+int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd;
+ struct rrdeng_jf_sb *superblock;
+ uv_buf_t iov;
+ char path[1024];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+ fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC,
+ S_IRUSR | S_IWUSR, NULL);
+ if (fd < 0) {
+ fatal("uv_fs_fsopen: %s", uv_strerror(fd));
+ }
+ assert(req.result >= 0);
+ file = req.result;
+ uv_fs_req_cleanup(&req);
+#ifdef __APPLE__
+ info("Disabling OS X caching for file \"%s\".", path);
+ fcntl(fd, F_NOCACHE, 1);
+#endif
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ (void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ);
+ (void) strncpy(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ);
+
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_write: %s", uv_strerror(ret));
+ }
+ if (req.result < 0) {
+ fatal("uv_fs_write: %s", uv_strerror((int)req.result));
+ }
+ uv_fs_req_cleanup(&req);
+ free(superblock);
+
+ journalfile->file = file;
+ journalfile->pos = sizeof(*superblock);
+ ctx->stats.io_write_bytes += sizeof(*superblock);
+ ++ctx->stats.io_write_requests;
+ ++ctx->stats.journalfile_creations;
+
+ return 0;
+}
+
+static int check_journal_file_superblock(uv_file file)
+{
+ int ret;
+ struct rrdeng_jf_sb *superblock;
+ uv_buf_t iov;
+ uv_fs_t req;
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ error("uv_fs_read: %s", uv_strerror(ret));
+ uv_fs_req_cleanup(&req);
+ goto error;
+ }
+ assert(req.result >= 0);
+ uv_fs_req_cleanup(&req);
+
+ if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) ||
+ strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) {
+ error("File has invalid superblock.");
+ ret = UV_EINVAL;
+ } else {
+ ret = 0;
+ }
+ error:
+ free(superblock);
+ return ret;
+}
+
+static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ void *buf, unsigned max_size)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned i, count, payload_length, descr_size, valid_pages;
+ struct rrdeng_page_cache_descr *descr;
+ struct extent_info *extent;
+ /* persistent structures */
+ struct rrdeng_jf_store_data *jf_metric_data;
+
+ jf_metric_data = buf;
+ count = jf_metric_data->number_of_pages;
+ descr_size = sizeof(*jf_metric_data->descr) * count;
+ payload_length = sizeof(*jf_metric_data) + descr_size;
+ if (payload_length > max_size) {
+ error("Corrupted transaction payload.");
+ return;
+ }
+
+ extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0]));
+ extent->offset = jf_metric_data->extent_offset;
+ extent->size = jf_metric_data->extent_size;
+ extent->number_of_pages = count;
+ extent->datafile = journalfile->datafile;
+ extent->next = NULL;
+
+ for (i = 0, valid_pages = 0 ; i < count ; ++i) {
+ uuid_t *temp_id;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+
+ if (PAGE_METRICS != jf_metric_data->descr[i].type) {
+ error("Unknown page type encountered.");
+ continue;
+ }
+ ++valid_pages;
+ temp_id = (uuid_t *)jf_metric_data->descr[i].uuid;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ /* First time we see the UUID */
+ uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
+ assert(NULL == *PValue); /* TODO: figure out concurrency model */
+ *PValue = page_index = create_page_index(temp_id);
+ uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
+ }
+
+ descr = pg_cache_create_descr();
+ descr->page_length = jf_metric_data->descr[i].page_length;
+ descr->start_time = jf_metric_data->descr[i].start_time;
+ descr->end_time = jf_metric_data->descr[i].end_time;
+ descr->id = &page_index->id;
+ descr->extent = extent;
+ extent->pages[i] = descr;
+ pg_cache_insert(ctx, page_index, descr);
+ }
+ if (likely(valid_pages))
+ df_extent_insert(extent);
+}
+
+/*
+ * Replays transaction by interpreting up to max_size bytes from buf.
+ * Sets id to the current transaction id or to 0 if unknown.
+ * Returns size of transaction record or 0 for unknown size.
+ */
+static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ void *buf, uint64_t *id, unsigned max_size)
+{
+ unsigned payload_length, size_bytes;
+ int ret;
+ /* persistent structures */
+ struct rrdeng_jf_transaction_header *jf_header;
+ struct rrdeng_jf_transaction_trailer *jf_trailer;
+ uLong crc;
+
+ *id = 0;
+ jf_header = buf;
+ if (STORE_PADDING == jf_header->type) {
+ debug(D_RRDENGINE, "Skipping padding.");
+ return 0;
+ }
+ if (sizeof(*jf_header) > max_size) {
+ error("Corrupted transaction record, skipping.");
+ return 0;
+ }
+ *id = jf_header->id;
+ payload_length = jf_header->payload_length;
+ size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
+ if (size_bytes > max_size) {
+ error("Corrupted transaction record, skipping.");
+ return 0;
+ }
+ jf_trailer = buf + sizeof(*jf_header) + payload_length;
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, buf, sizeof(*jf_header) + payload_length);
+ ret = crc32cmp(jf_trailer->checksum, crc);
+ debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED");
+ if (unlikely(ret)) {
+ return size_bytes;
+ }
+ switch (jf_header->type) {
+ case STORE_DATA:
+ debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
+ restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
+ break;
+ default:
+ error("Unknown transaction type. Skipping record.");
+ break;
+ }
+
+ return size_bytes;
+}
+
+
+#define READAHEAD_BYTES (RRDENG_BLOCK_SIZE * 256)
+/*
+ * Iterates journal file transactions and populates the page cache.
+ * Page cache must already be initialized.
+ * Returns the maximum transaction id it discovered.
+ */
+static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile)
+{
+ uv_file file;
+ uint64_t file_size;//, data_file_size;
+ int ret;
+ uint64_t pos, pos_i, max_id, id;
+ unsigned size_bytes;
+ void *buf;
+ uv_buf_t iov;
+ uv_fs_t req;
+
+ file = journalfile->file;
+ file_size = journalfile->pos;
+ //data_file_size = journalfile->datafile->pos; TODO: utilize this?
+
+ max_id = 1;
+ ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES);
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+
+ for (pos = sizeof(struct rrdeng_jf_sb) ; pos < file_size ; pos += READAHEAD_BYTES) {
+ size_bytes = MIN(READAHEAD_BYTES, file_size - pos);
+ iov = uv_buf_init(buf, size_bytes);
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_read: %s", uv_strerror(ret));
+ /*uv_fs_req_cleanup(&req);*/
+ }
+ assert(req.result >= 0);
+ uv_fs_req_cleanup(&req);
+ ctx->stats.io_read_bytes += size_bytes;
+ ++ctx->stats.io_read_requests;
+
+ //pos_i = pos;
+ //while (pos_i < pos + size_bytes) {
+ for (pos_i = 0 ; pos_i < size_bytes ; ) {
+ unsigned max_size;
+
+ max_size = pos + size_bytes - pos_i;
+ ret = replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size);
+ if (!ret) /* TODO: support transactions bigger than 4K */
+ /* unknown transaction size, move on to the next block */
+ pos_i = ALIGN_BYTES_FLOOR(pos_i + RRDENG_BLOCK_SIZE);
+ else
+ pos_i += ret;
+ max_id = MAX(max_id, id);
+ }
+ }
+
+ free(buf);
+ return max_id;
+}
+
+int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ struct rrdengine_datafile *datafile)
+{
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd;
+ uint64_t file_size, max_id;
+ char path[1024];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+ fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL);
+ if (fd < 0) {
+ /* if (UV_ENOENT != fd) */
+ error("uv_fs_fsopen: %s", uv_strerror(fd));
+ uv_fs_req_cleanup(&req);
+ return fd;
+ }
+ assert(req.result >= 0);
+ file = req.result;
+ uv_fs_req_cleanup(&req);
+#ifdef __APPLE__
+ info("Disabling OS X caching for file \"%s\".", path);
+ fcntl(fd, F_NOCACHE, 1);
+#endif
+ info("Loading journal file \"%s\".", path);
+
+ ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
+ if (ret)
+ goto error;
+ file_size = ALIGN_BYTES_FLOOR(file_size);
+
+ ret = check_journal_file_superblock(file);
+ if (ret)
+ goto error;
+ ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb);
+ ++ctx->stats.io_read_requests;
+
+ journalfile->file = file;
+ journalfile->pos = file_size;
+
+ max_id = iterate_transactions(ctx, journalfile);
+
+ ctx->commit_log.transaction_id = MAX(ctx->commit_log.transaction_id, max_id + 1);
+
+ info("Journal file \"%s\" loaded (size:%"PRIu64").", path, file_size);
+ return 0;
+
+ error:
+ (void) uv_fs_close(NULL, &req, file, NULL);
+ uv_fs_req_cleanup(&req);
+ return ret;
+}
+
+void init_commit_log(struct rrdengine_instance *ctx)
+{
+ ctx->commit_log.buf = NULL;
+ ctx->commit_log.buf_pos = 0;
+ ctx->commit_log.transaction_id = 1;
+} \ No newline at end of file
diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h
new file mode 100644
index 000000000..50489aeee
--- /dev/null
+++ b/database/engine/journalfile.h
@@ -0,0 +1,46 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_JOURNALFILE_H
+#define NETDATA_JOURNALFILE_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct rrdengine_instance;
+struct rrdengine_worker_config;
+struct rrdengine_datafile;
+struct rrdengine_journalfile;
+
+#define WALFILE_PREFIX "journalfile-"
+#define WALFILE_EXTENSION ".njf"
+
+
+/* only one event loop is supported for now */
+struct rrdengine_journalfile {
+ uv_file file;
+ uint64_t pos;
+
+ struct rrdengine_datafile *datafile;
+};
+
+/* only one event loop is supported for now */
+struct transaction_commit_log {
+ uint64_t transaction_id;
+
+ /* outstanding transaction buffer */
+ void *buf;
+ unsigned buf_pos;
+ unsigned buf_size;
+};
+
+extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size);
+extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc);
+extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ struct rrdengine_datafile *datafile);
+extern void init_commit_log(struct rrdengine_instance *ctx);
+
+
+#endif /* NETDATA_JOURNALFILE_H */ \ No newline at end of file
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
new file mode 100644
index 000000000..c90947a67
--- /dev/null
+++ b/database/engine/pagecache.c
@@ -0,0 +1,792 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "rrdengine.h"
+
+/* Forward declerations */
+static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx);
+
+/* always inserts into tail */
+static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx,
+ struct rrdeng_page_cache_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ if (likely(NULL != pg_cache->replaceQ.tail)) {
+ descr->prev = pg_cache->replaceQ.tail;
+ pg_cache->replaceQ.tail->next = descr;
+ }
+ if (unlikely(NULL == pg_cache->replaceQ.head)) {
+ pg_cache->replaceQ.head = descr;
+ }
+ pg_cache->replaceQ.tail = descr;
+}
+
+static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx,
+ struct rrdeng_page_cache_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_cache_descr *prev, *next;
+
+ prev = descr->prev;
+ next = descr->next;
+
+ if (likely(NULL != prev)) {
+ prev->next = next;
+ }
+ if (likely(NULL != next)) {
+ next->prev = prev;
+ }
+ if (unlikely(descr == pg_cache->replaceQ.head)) {
+ pg_cache->replaceQ.head = next;
+ }
+ if (unlikely(descr == pg_cache->replaceQ.tail)) {
+ pg_cache->replaceQ.tail = prev;
+ }
+ descr->prev = descr->next = NULL;
+}
+
+void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
+ struct rrdeng_page_cache_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
+ pg_cache_replaceQ_insert_unsafe(ctx, descr);
+ uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+}
+
+void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
+ struct rrdeng_page_cache_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
+ pg_cache_replaceQ_delete_unsafe(ctx, descr);
+ uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+}
+void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
+ struct rrdeng_page_cache_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
+ pg_cache_replaceQ_delete_unsafe(ctx, descr);
+ pg_cache_replaceQ_insert_unsafe(ctx, descr);
+ uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+}
+
+struct rrdeng_page_cache_descr *pg_cache_create_descr(void)
+{
+ struct rrdeng_page_cache_descr *descr;
+
+ descr = mallocz(sizeof(*descr));
+ descr->page = NULL;
+ descr->page_length = 0;
+ descr->start_time = INVALID_TIME;
+ descr->end_time = INVALID_TIME;
+ descr->id = NULL;
+ descr->extent = NULL;
+ descr->flags = 0;
+ descr->prev = descr->next = descr->private = NULL;
+ descr->refcnt = 0;
+ descr->waiters = 0;
+ descr->handle = NULL;
+ assert(0 == uv_cond_init(&descr->cond));
+ assert(0 == uv_mutex_init(&descr->mutex));
+
+ return descr;
+}
+
+void pg_cache_destroy_descr(struct rrdeng_page_cache_descr *descr)
+{
+ uv_cond_destroy(&descr->cond);
+ uv_mutex_destroy(&descr->mutex);
+ free(descr);
+}
+
+/* The caller must hold page descriptor lock. */
+void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr)
+{
+ if (descr->waiters)
+ uv_cond_broadcast(&descr->cond);
+}
+
+/*
+ * The caller must hold page descriptor lock.
+ * The lock will be released and re-acquired. The descriptor is not guaranteed
+ * to exist after this function returns.
+ */
+void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr)
+{
+ ++descr->waiters;
+ uv_cond_wait(&descr->cond, &descr->mutex);
+ --descr->waiters;
+}
+
+/*
+ * Returns page flags.
+ * The lock will be released and re-acquired. The descriptor is not guaranteed
+ * to exist after this function returns.
+ */
+unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr)
+{
+ unsigned long flags;
+
+ uv_mutex_lock(&descr->mutex);
+ pg_cache_wait_event_unsafe(descr);
+ flags = descr->flags;
+ uv_mutex_unlock(&descr->mutex);
+
+ return flags;
+}
+
+/*
+ * The caller must hold page descriptor lock.
+ * Gets a reference to the page descriptor.
+ * Returns 1 on success and 0 on failure.
+ */
+int pg_cache_try_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access)
+{
+ if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
+ (exclusive_access && descr->refcnt)) {
+ return 0;
+ }
+ if (exclusive_access)
+ descr->flags |= RRD_PAGE_LOCKED;
+ ++descr->refcnt;
+
+ return 1;
+}
+
+/*
+ * The caller must hold page descriptor lock.
+ * Same return values as pg_cache_try_get_unsafe() without doing anything.
+ */
+int pg_cache_can_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access)
+{
+ if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
+ (exclusive_access && descr->refcnt)) {
+ return 0;
+ }
+
+ return 1;
+}
+
+/*
+ * The caller must hold the page descriptor lock.
+ * This function may block doing cleanup.
+ */
+void pg_cache_put_unsafe(struct rrdeng_page_cache_descr *descr)
+{
+ descr->flags &= ~RRD_PAGE_LOCKED;
+ if (0 == --descr->refcnt) {
+ pg_cache_wake_up_waiters_unsafe(descr);
+ }
+ /* TODO: perform cleanup */
+}
+
+/*
+ * This function may block doing cleanup.
+ */
+void pg_cache_put(struct rrdeng_page_cache_descr *descr)
+{
+ uv_mutex_lock(&descr->mutex);
+ pg_cache_put_unsafe(descr);
+ uv_mutex_unlock(&descr->mutex);
+}
+
+/* The caller must hold the page cache lock */
+static void pg_cache_release_pages_unsafe(struct rrdengine_instance *ctx, unsigned number)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ pg_cache->populated_pages -= number;
+}
+
+static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned number)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ pg_cache_release_pages_unsafe(ctx, number);
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+}
+/*
+ * This function will block until it reserves #number populated pages.
+ * It will trigger evictions or dirty page flushing if the ctx->max_cache_pages limit is hit.
+ */
+static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ assert(number < ctx->max_cache_pages);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ if (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1)
+ debug(D_RRDENGINE, "=================================\nPage cache full. Reserving %u pages.\n=================================",
+ number);
+ while (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1) {
+ if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
+ /* failed to evict */
+ struct completion compl;
+ struct rrdeng_cmd cmd;
+
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+
+ init_completion(&compl);
+ cmd.opcode = RRDENG_FLUSH_PAGES;
+ cmd.completion = &compl;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+ /* wait for some pages to be flushed */
+ debug(D_RRDENGINE, "%s: waiting for pages to be written to disk before evicting.", __func__);
+ wait_for_completion(&compl);
+ destroy_completion(&compl);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ }
+ }
+ pg_cache->populated_pages += number;
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+}
+
+/*
+ * This function will attempt to reserve #number populated pages.
+ * It may trigger evictions if the ctx->cache_pages_low_watermark limit is hit.
+ * Returns 0 on failure and 1 on success.
+ */
+static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned count = 0;
+ int ret = 0;
+
+ assert(number < ctx->max_cache_pages);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ if (pg_cache->populated_pages + number >= ctx->cache_pages_low_watermark + 1) {
+ debug(D_RRDENGINE,
+ "=================================\nPage cache full. Trying to reserve %u pages.\n=================================",
+ number);
+ do {
+ if (!pg_cache_try_evict_one_page_unsafe(ctx))
+ break;
+ ++count;
+ } while (pg_cache->populated_pages + number >= ctx->cache_pages_low_watermark + 1);
+ debug(D_RRDENGINE, "Evicted %u pages.", count);
+ }
+
+ if (pg_cache->populated_pages + number < ctx->max_cache_pages + 1) {
+ pg_cache->populated_pages += number;
+ ret = 1; /* success */
+ }
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+
+ return ret;
+}
+
+/* The caller must hold the page cache and the page descriptor locks in that order */
+static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr)
+{
+ free(descr->page);
+ descr->page = NULL;
+ descr->flags &= ~RRD_PAGE_POPULATED;
+ pg_cache_release_pages_unsafe(ctx, 1);
+ ++ctx->stats.pg_cache_evictions;
+}
+
+/*
+ * The caller must hold the page cache lock.
+ * Lock order: page cache -> replaceQ -> descriptor
+ * This function iterates all pages and tries to evict one.
+ * If it fails it sets in_flight_descr to the oldest descriptor that has write-back in progress,
+ * or it sets it to NULL if no write-back is in progress.
+ *
+ * Returns 1 on success and 0 on failure.
+ */
+static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned long old_flags;
+ struct rrdeng_page_cache_descr *descr;
+
+ uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
+ for (descr = pg_cache->replaceQ.head ; NULL != descr ; descr = descr->next) {
+ uv_mutex_lock(&descr->mutex);
+ old_flags = descr->flags;
+ if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) {
+ /* must evict */
+ pg_cache_evict_unsafe(ctx, descr);
+ pg_cache_put_unsafe(descr);
+ uv_mutex_unlock(&descr->mutex);
+ pg_cache_replaceQ_delete_unsafe(ctx, descr);
+ uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+
+ return 1;
+ }
+ uv_mutex_unlock(&descr->mutex);
+ };
+ uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+
+ /* failed to evict */
+ return 0;
+}
+
+/*
+ * TODO: last waiter frees descriptor ?
+ */
+void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+ int ret;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
+ assert(NULL != PValue);
+ page_index = *PValue;
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+
+ uv_rwlock_wrlock(&page_index->lock);
+ ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
+ uv_rwlock_wrunlock(&page_index->lock);
+ if (unlikely(0 == ret)) {
+ error("Page under deletion was not in index.");
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ goto destroy;
+ }
+ assert(1 == ret);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ ++ctx->stats.pg_cache_deletions;
+ --pg_cache->page_descriptors;
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+
+ uv_mutex_lock(&descr->mutex);
+ while (!pg_cache_try_get_unsafe(descr, 1)) {
+ debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_wait_event_unsafe(descr);
+ }
+ /* even a locked page could be dirty */
+ while (unlikely(descr->flags & RRD_PAGE_DIRTY)) {
+ debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_wait_event_unsafe(descr);
+ }
+ uv_mutex_unlock(&descr->mutex);
+
+ if (descr->flags & RRD_PAGE_POPULATED) {
+ /* only after locking can it be safely deleted from LRU */
+ pg_cache_replaceQ_delete(ctx, descr);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ pg_cache_evict_unsafe(ctx, descr);
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+ }
+ pg_cache_put(descr);
+destroy:
+ pg_cache_destroy_descr(descr);
+ pg_cache_update_metric_times(page_index);
+}
+
+static inline int is_page_in_time_range(struct rrdeng_page_cache_descr *descr, usec_t start_time, usec_t end_time)
+{
+ usec_t pg_start, pg_end;
+
+ pg_start = descr->start_time;
+ pg_end = descr->end_time;
+
+ return (pg_start < start_time && pg_end >= start_time) ||
+ (pg_start >= start_time && pg_start <= end_time);
+}
+
+static inline int is_point_in_time_in_page(struct rrdeng_page_cache_descr *descr, usec_t point_in_time)
+{
+ return (point_in_time >= descr->start_time && point_in_time <= descr->end_time);
+}
+
+/* Update metric oldest and latest timestamps efficiently when adding new values */
+void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_cache_descr *descr)
+{
+ usec_t oldest_time = page_index->oldest_time;
+ usec_t latest_time = page_index->latest_time;
+
+ if (unlikely(oldest_time == INVALID_TIME || descr->start_time < oldest_time)) {
+ page_index->oldest_time = descr->start_time;
+ }
+ if (likely(descr->end_time > latest_time || latest_time == INVALID_TIME)) {
+ page_index->latest_time = descr->end_time;
+ }
+}
+
+/* Update metric oldest and latest timestamps when removing old values */
+void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
+{
+ Pvoid_t *firstPValue, *lastPValue;
+ Word_t firstIndex, lastIndex;
+ struct rrdeng_page_cache_descr *descr;
+ usec_t oldest_time = INVALID_TIME;
+ usec_t latest_time = INVALID_TIME;
+
+ uv_rwlock_rdlock(&page_index->lock);
+ /* Find first page in range */
+ firstIndex = (Word_t)0;
+ firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0);
+ if (likely(NULL != firstPValue)) {
+ descr = *firstPValue;
+ oldest_time = descr->start_time;
+ }
+ lastIndex = (Word_t)-1;
+ lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0);
+ if (likely(NULL != lastPValue)) {
+ descr = *lastPValue;
+ latest_time = descr->end_time;
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ if (unlikely(NULL == firstPValue)) {
+ assert(NULL == lastPValue);
+ page_index->oldest_time = page_index->latest_time = INVALID_TIME;
+ return;
+ }
+ page_index->oldest_time = oldest_time;
+ page_index->latest_time = latest_time;
+}
+
+/* If index is NULL lookup by UUID (descr->id) */
+void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
+ struct rrdeng_page_cache_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+
+ if (descr->flags & RRD_PAGE_POPULATED) {
+ pg_cache_reserve_pages(ctx, 1);
+ if (!(descr->flags & RRD_PAGE_DIRTY))
+ pg_cache_replaceQ_insert(ctx, descr);
+ }
+
+ if (unlikely(NULL == index)) {
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
+ assert(NULL != PValue);
+ page_index = *PValue;
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ } else {
+ page_index = index;
+ }
+
+ uv_rwlock_wrlock(&page_index->lock);
+ PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
+ *PValue = descr;
+ pg_cache_add_new_metric_time(page_index, descr);
+ uv_rwlock_wrunlock(&page_index->lock);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ ++ctx->stats.pg_cache_insertions;
+ ++pg_cache->page_descriptors;
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+}
+
+/*
+ * Searches for a page and triggers disk I/O if necessary and possible.
+ * Does not get a reference.
+ * Returns page index pointer for given metric UUID.
+ */
+struct pg_cache_page_index *
+ pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_cache_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
+ int i, j, k, count, found;
+ unsigned long flags;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+ Word_t Index;
+ uint8_t failed_to_reserve;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
+ return NULL;
+ }
+
+ uv_rwlock_rdlock(&page_index->lock);
+ /* Find first page in range */
+ found = 0;
+ Index = (Word_t)(start_time / USEC_PER_SEC);
+ PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ if (is_page_in_time_range(descr, start_time, end_time)) {
+ found = 1;
+ }
+ }
+ if (!found) {
+ Index = (Word_t)(start_time / USEC_PER_SEC);
+ PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ if (is_page_in_time_range(descr, start_time, end_time)) {
+ found = 1;
+ }
+ }
+ }
+ if (!found) {
+ uv_rwlock_rdunlock(&page_index->lock);
+ debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
+ return page_index;
+ }
+
+ for (count = 0 ;
+ descr != NULL && is_page_in_time_range(descr, start_time, end_time);
+ PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue) {
+ /* Iterate all pages in range */
+
+ if (unlikely(0 == descr->page_length))
+ continue;
+ uv_mutex_lock(&descr->mutex);
+ flags = descr->flags;
+ if (pg_cache_can_get_unsafe(descr, 0)) {
+ if (flags & RRD_PAGE_POPULATED) {
+ /* success */
+ uv_mutex_unlock(&descr->mutex);
+ debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
+ continue;
+ }
+ }
+ if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
+ preload_array[count++] = descr;
+ if (PAGE_CACHE_MAX_PRELOAD_PAGES == count) {
+ uv_mutex_unlock(&descr->mutex);
+ break;
+ }
+ }
+ uv_mutex_unlock(&descr->mutex);
+
+ };
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ failed_to_reserve = 0;
+ for (i = 0 ; i < count && !failed_to_reserve ; ++i) {
+ struct rrdeng_cmd cmd;
+ struct rrdeng_page_cache_descr *next;
+
+ descr = preload_array[i];
+ if (NULL == descr) {
+ continue;
+ }
+ if (!pg_cache_try_reserve_pages(ctx, 1)) {
+ failed_to_reserve = 1;
+ break;
+ }
+ cmd.opcode = RRDENG_READ_EXTENT;
+ cmd.read_extent.page_cache_descr[0] = descr;
+ /* don't use this page again */
+ preload_array[i] = NULL;
+ for (j = 0, k = 1 ; j < count ; ++j) {
+ next = preload_array[j];
+ if (NULL == next) {
+ continue;
+ }
+ if (descr->extent == next->extent) {
+ /* same extent, consolidate */
+ if (!pg_cache_try_reserve_pages(ctx, 1)) {
+ failed_to_reserve = 1;
+ break;
+ }
+ cmd.read_extent.page_cache_descr[k++] = next;
+ /* don't use this page again */
+ preload_array[j] = NULL;
+ }
+ }
+ cmd.read_extent.page_count = k;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+ }
+ if (failed_to_reserve) {
+ debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__);
+ for (i = 0 ; i < count ; ++i) {
+ descr = preload_array[i];
+ if (NULL == descr) {
+ continue;
+ }
+ pg_cache_put(descr);
+ }
+ }
+ if (!count) {
+ /* no such page */
+ debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__);
+ }
+ return page_index;
+}
+
+/*
+ * Searches for a page and gets a reference.
+ * When point_in_time is INVALID_TIME get any page.
+ * If index is NULL lookup by UUID (id).
+ */
+struct rrdeng_page_cache_descr *
+ pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
+ usec_t point_in_time)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_cache_descr *descr = NULL;
+ unsigned long flags;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+ Word_t Index;
+ uint8_t page_not_in_cache;
+
+ if (unlikely(NULL == index)) {
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ return NULL;
+ }
+ } else {
+ page_index = index;
+ }
+ pg_cache_reserve_pages(ctx, 1);
+
+ page_not_in_cache = 0;
+ uv_rwlock_rdlock(&page_index->lock);
+ while (1) {
+ Index = (Word_t)(point_in_time / USEC_PER_SEC);
+ PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ }
+ if (NULL == PValue ||
+ 0 == descr->page_length ||
+ (INVALID_TIME != point_in_time &&
+ !is_point_in_time_in_page(descr, point_in_time))) {
+ /* non-empty page not found */
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ pg_cache_release_pages(ctx, 1);
+ return NULL;
+ }
+ uv_mutex_lock(&descr->mutex);
+ flags = descr->flags;
+ if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
+ /* success */
+ uv_mutex_unlock(&descr->mutex);
+ debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
+ break;
+ }
+ if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
+ struct rrdeng_cmd cmd;
+
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ cmd.opcode = RRDENG_READ_PAGE;
+ cmd.read_page.page_cache_descr = descr;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+
+ debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ while (!(descr->flags & RRD_PAGE_POPULATED)) {
+ pg_cache_wait_event_unsafe(descr);
+ }
+ /* success */
+ /* Downgrade exclusive reference to allow other readers */
+ descr->flags &= ~RRD_PAGE_LOCKED;
+ pg_cache_wake_up_waiters_unsafe(descr);
+ uv_mutex_unlock(&descr->mutex);
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
+ return descr;
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+ debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ if (!(flags & RRD_PAGE_POPULATED))
+ page_not_in_cache = 1;
+ pg_cache_wait_event_unsafe(descr);
+ uv_mutex_unlock(&descr->mutex);
+
+ /* reset scan to find again */
+ uv_rwlock_rdlock(&page_index->lock);
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ if (!(flags & RRD_PAGE_DIRTY))
+ pg_cache_replaceQ_set_hot(ctx, descr);
+ pg_cache_release_pages(ctx, 1);
+ if (page_not_in_cache)
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
+ else
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
+ return descr;
+}
+
+struct pg_cache_page_index *create_page_index(uuid_t *id)
+{
+ struct pg_cache_page_index *page_index;
+
+ page_index = mallocz(sizeof(*page_index));
+ page_index->JudyL_array = (Pvoid_t) NULL;
+ uuid_copy(page_index->id, *id);
+ assert(0 == uv_rwlock_init(&page_index->lock));
+ page_index->oldest_time = INVALID_TIME;
+ page_index->latest_time = INVALID_TIME;
+
+ return page_index;
+}
+
+static void init_metrics_index(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
+ assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
+}
+
+static void init_replaceQ(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ pg_cache->replaceQ.head = NULL;
+ pg_cache->replaceQ.tail = NULL;
+ assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
+}
+
+static void init_commited_page_index(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ pg_cache->commited_page_index.JudyL_array = (Pvoid_t) NULL;
+ assert(0 == uv_rwlock_init(&pg_cache->commited_page_index.lock));
+ pg_cache->commited_page_index.latest_corr_id = 0;
+ pg_cache->commited_page_index.nr_commited_pages = 0;
+}
+
+void init_page_cache(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ pg_cache->page_descriptors = 0;
+ pg_cache->populated_pages = 0;
+ assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock));
+
+ init_metrics_index(ctx);
+ init_replaceQ(ctx);
+ init_commited_page_index(ctx);
+} \ No newline at end of file
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
new file mode 100644
index 000000000..d1e29aaab
--- /dev/null
+++ b/database/engine/pagecache.h
@@ -0,0 +1,132 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_PAGECACHE_H
+#define NETDATA_PAGECACHE_H
+
+#include "rrdengine.h"
+
+/* Forward declerations */
+struct rrdengine_instance;
+struct extent_info;
+
+#define INVALID_TIME (0)
+
+/* Page flags */
+#define RRD_PAGE_DIRTY (1LU << 0)
+#define RRD_PAGE_LOCKED (1LU << 1)
+#define RRD_PAGE_READ_PENDING (1LU << 2)
+#define RRD_PAGE_WRITE_PENDING (1LU << 3)
+#define RRD_PAGE_POPULATED (1LU << 4)
+
+struct rrdeng_page_cache_descr {
+ void *page;
+ uint32_t page_length;
+ usec_t start_time;
+ usec_t end_time;
+ uuid_t *id; /* never changes */
+ struct extent_info *extent;
+ unsigned long flags;
+ void *private;
+ struct rrdeng_page_cache_descr *prev;
+ struct rrdeng_page_cache_descr *next;
+
+ /* TODO: move waiter logic to concurrency table */
+ unsigned refcnt;
+ uv_mutex_t mutex; /* always take it after the page cache lock or after the commit lock */
+ uv_cond_t cond;
+ unsigned waiters;
+ struct rrdeng_collect_handle *handle; /* API user */
+};
+
+#define PAGE_CACHE_MAX_PRELOAD_PAGES (256)
+
+/* maps time ranges to pages */
+struct pg_cache_page_index {
+ uuid_t id;
+ /*
+ * care: JudyL_array indices are converted from useconds to seconds to fit in one word in 32-bit architectures
+ * TODO: examine if we want to support better granularity than seconds
+ */
+ Pvoid_t JudyL_array;
+ uv_rwlock_t lock;
+
+ /*
+ * Only one effective writer, data deletion workqueue.
+ * It's also written during the DB loading phase.
+ */
+ usec_t oldest_time;
+
+ /*
+ * Only one effective writer, data collection thread.
+ * It's also written by the data deletion workqueue when data collection is disabled for this metric.
+ */
+ usec_t latest_time;
+};
+
+/* maps UUIDs to page indices */
+struct pg_cache_metrics_index {
+ uv_rwlock_t lock;
+ Pvoid_t JudyHS_array;
+};
+
+/* gathers dirty pages to be written on disk */
+struct pg_cache_commited_page_index {
+ uv_rwlock_t lock;
+
+ Pvoid_t JudyL_array;
+
+ /*
+ * Dirty page correlation ID is a hint. Dirty pages that are correlated should have
+ * a small correlation ID difference. Dirty pages in memory should never have the
+ * same ID at the same time for correctness.
+ */
+ Word_t latest_corr_id;
+
+ unsigned nr_commited_pages;
+};
+
+/* gathers populated pages to be evicted */
+struct pg_cache_replaceQ {
+ uv_rwlock_t lock; /* LRU lock */
+
+ struct rrdeng_page_cache_descr *head; /* LRU */
+ struct rrdeng_page_cache_descr *tail; /* MRU */
+};
+
+struct page_cache { /* TODO: add statistics */
+ uv_rwlock_t pg_cache_rwlock; /* page cache lock */
+
+ struct pg_cache_metrics_index metrics_index;
+ struct pg_cache_commited_page_index commited_page_index;
+ struct pg_cache_replaceQ replaceQ;
+
+ unsigned page_descriptors;
+ unsigned populated_pages;
+};
+
+extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr);
+extern unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
+ struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
+ struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
+ struct rrdeng_page_cache_descr *descr);
+extern struct rrdeng_page_cache_descr *pg_cache_create_descr(void);
+extern void pg_cache_put_unsafe(struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_put(struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
+ struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr);
+extern struct pg_cache_page_index *
+ pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time);
+extern struct rrdeng_page_cache_descr *
+ pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
+ usec_t point_in_time);
+extern struct pg_cache_page_index *create_page_index(uuid_t *id);
+extern void init_page_cache(struct rrdengine_instance *ctx);
+extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);
+
+#endif /* NETDATA_PAGECACHE_H */ \ No newline at end of file
diff --git a/database/engine/rrddiskprotocol.h b/database/engine/rrddiskprotocol.h
new file mode 100644
index 000000000..db47af531
--- /dev/null
+++ b/database/engine/rrddiskprotocol.h
@@ -0,0 +1,119 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDDISKPROTOCOL_H
+#define NETDATA_RRDDISKPROTOCOL_H
+
+#define RRDENG_BLOCK_SIZE (4096)
+#define RRDFILE_ALIGNMENT RRDENG_BLOCK_SIZE
+
+#define RRDENG_MAGIC_SZ (32)
+#define RRDENG_DF_MAGIC "netdata-data-file"
+#define RRDENG_JF_MAGIC "netdata-journal-file"
+
+#define RRDENG_VER_SZ (16)
+#define RRDENG_DF_VER "1.0"
+#define RRDENG_JF_VER "1.0"
+
+#define UUID_SZ (16)
+#define CHECKSUM_SZ (4) /* CRC32 */
+
+#define RRD_NO_COMPRESSION (0)
+#define RRD_LZ4 (1)
+
+#define RRDENG_DF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ + sizeof(uint8_t)))
+/*
+ * Data file persistent super-block
+ */
+struct rrdeng_df_sb {
+ char magic_number[RRDENG_MAGIC_SZ];
+ char version[RRDENG_VER_SZ];
+ uint8_t tier;
+ uint8_t padding[RRDENG_DF_SB_PADDING_SZ];
+} __attribute__ ((packed));
+
+/*
+ * Page types
+ */
+#define PAGE_METRICS (0)
+#define PAGE_LOGS (1) /* reserved */
+
+/*
+ * Data file page descriptor
+ */
+struct rrdeng_extent_page_descr {
+ uint8_t type;
+
+ uint8_t uuid[UUID_SZ];
+ uint32_t page_length;
+ uint64_t start_time;
+ uint64_t end_time;
+} __attribute__ ((packed));
+
+/*
+ * Data file extent header
+ */
+struct rrdeng_df_extent_header {
+ uint32_t payload_length;
+ uint8_t compression_algorithm;
+ uint8_t number_of_pages;
+ /* #number_of_pages page descriptors follow */
+ struct rrdeng_extent_page_descr descr[];
+} __attribute__ ((packed));
+
+/*
+ * Data file extent trailer
+ */
+struct rrdeng_df_extent_trailer {
+ uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */
+} __attribute__ ((packed));
+
+#define RRDENG_JF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ))
+/*
+ * Journal file super-block
+ */
+struct rrdeng_jf_sb {
+ char magic_number[RRDENG_MAGIC_SZ];
+ char version[RRDENG_VER_SZ];
+ uint8_t padding[RRDENG_JF_SB_PADDING_SZ];
+} __attribute__ ((packed));
+
+/*
+ * Transaction record types
+ */
+#define STORE_PADDING (0)
+#define STORE_DATA (1)
+#define STORE_LOGS (2) /* reserved */
+
+/*
+ * Journal file transaction record header
+ */
+struct rrdeng_jf_transaction_header {
+ /* when set to STORE_PADDING jump to start of next block */
+ uint8_t type;
+
+ uint32_t reserved; /* reserved for future use */
+ uint64_t id;
+ uint16_t payload_length;
+} __attribute__ ((packed));
+
+/*
+ * Journal file transaction record trailer
+ */
+struct rrdeng_jf_transaction_trailer {
+ uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */
+} __attribute__ ((packed));
+
+/*
+ * Journal file STORE_DATA action
+ */
+struct rrdeng_jf_store_data {
+ /* data file extent information */
+ uint64_t extent_offset;
+ uint32_t extent_size;
+
+ uint8_t number_of_pages;
+ /* #number_of_pages page descriptors follow */
+ struct rrdeng_extent_page_descr descr[];
+} __attribute__ ((packed));
+
+#endif /* NETDATA_RRDDISKPROTOCOL_H */ \ No newline at end of file
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
new file mode 100644
index 000000000..b8e4eba01
--- /dev/null
+++ b/database/engine/rrdengine.c
@@ -0,0 +1,780 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "rrdengine.h"
+
+void sanity_check(void)
+{
+ /* Magic numbers must fit in the super-blocks */
+ BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ);
+ BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ);
+
+ /* Version strings must fit in the super-blocks */
+ BUILD_BUG_ON(strlen(RRDENG_DF_VER) > RRDENG_VER_SZ);
+ BUILD_BUG_ON(strlen(RRDENG_JF_VER) > RRDENG_VER_SZ);
+
+ /* Data file super-block cannot be larger than RRDENG_BLOCK_SIZE */
+ BUILD_BUG_ON(RRDENG_DF_SB_PADDING_SZ < 0);
+
+ BUILD_BUG_ON(sizeof(uuid_t) != UUID_SZ); /* check UUID size */
+
+ /* page count must fit in 8 bits */
+ BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255);
+}
+
+void read_extent_cb(uv_fs_t* req)
+{
+ struct rrdengine_worker_config* wc = req->loop->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct extent_io_descriptor *xt_io_descr;
+ struct rrdeng_page_cache_descr *descr;
+ int ret;
+ unsigned i, j, count;
+ void *page, *uncompressed_buf = NULL;
+ uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length;
+ struct rrdengine_datafile *datafile;
+ /* persistent structures */
+ struct rrdeng_df_extent_header *header;
+ struct rrdeng_df_extent_trailer *trailer;
+ uLong crc;
+
+ xt_io_descr = req->data;
+ if (req->result < 0) {
+ error("%s: uv_fs_read: %s", __func__, uv_strerror((int)req->result));
+ goto cleanup;
+ }
+
+ header = xt_io_descr->buf;
+ payload_length = header->payload_length;
+ count = header->number_of_pages;
+
+ payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count;
+
+ trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer);
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer));
+ ret = crc32cmp(trailer->checksum, crc);
+ datafile = xt_io_descr->descr_array[0]->extent->datafile;
+ debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__,
+ xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED");
+ if (unlikely(ret)) {
+ /* TODO: handle errors */
+ exit(UV_EIO);
+ goto cleanup;
+ }
+
+ if (RRD_NO_COMPRESSION != header->compression_algorithm) {
+ uncompressed_payload_length = 0;
+ for (i = 0 ; i < count ; ++i) {
+ uncompressed_payload_length += header->descr[i].page_length;
+ }
+ uncompressed_buf = mallocz(uncompressed_payload_length);
+ ret = LZ4_decompress_safe(xt_io_descr->buf + payload_offset, uncompressed_buf,
+ payload_length, uncompressed_payload_length);
+ ctx->stats.before_decompress_bytes += payload_length;
+ ctx->stats.after_decompress_bytes += ret;
+ debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret);
+ /* care, we don't hold the descriptor mutex */
+ }
+
+ for (i = 0 ; i < xt_io_descr->descr_count; ++i) {
+ page = mallocz(RRDENG_BLOCK_SIZE);
+ descr = xt_io_descr->descr_array[i];
+ for (j = 0, page_offset = 0; j < count; ++j) {
+ /* care, we don't hold the descriptor mutex */
+ if (!uuid_compare(*(uuid_t *) header->descr[j].uuid, *descr->id) &&
+ header->descr[j].page_length == descr->page_length &&
+ header->descr[j].start_time == descr->start_time &&
+ header->descr[j].end_time == descr->end_time) {
+ break;
+ }
+ page_offset += header->descr[j].page_length;
+ }
+ /* care, we don't hold the descriptor mutex */
+ if (RRD_NO_COMPRESSION == header->compression_algorithm) {
+ (void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length);
+ } else {
+ (void) memcpy(page, uncompressed_buf + page_offset, descr->page_length);
+ }
+ pg_cache_replaceQ_insert(ctx, descr);
+ uv_mutex_lock(&descr->mutex);
+ descr->page = page;
+ descr->flags |= RRD_PAGE_POPULATED;
+ descr->flags &= ~RRD_PAGE_READ_PENDING;
+ debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
+ if (xt_io_descr->release_descr) {
+ pg_cache_put_unsafe(descr);
+ } else {
+ pg_cache_wake_up_waiters_unsafe(descr);
+ }
+ uv_mutex_unlock(&descr->mutex);
+ }
+ if (RRD_NO_COMPRESSION != header->compression_algorithm) {
+ free(uncompressed_buf);
+ }
+ if (xt_io_descr->completion)
+ complete(xt_io_descr->completion);
+cleanup:
+ uv_fs_req_cleanup(req);
+ free(xt_io_descr->buf);
+ free(xt_io_descr);
+}
+
+
+static void do_read_extent(struct rrdengine_worker_config* wc,
+ struct rrdeng_page_cache_descr **descr,
+ unsigned count,
+ uint8_t release_descr)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ int ret;
+ unsigned i, size_bytes, pos, real_io_size;
+// uint32_t payload_length;
+ struct extent_io_descriptor *xt_io_descr;
+ struct rrdengine_datafile *datafile;
+
+ datafile = descr[0]->extent->datafile;
+ pos = descr[0]->extent->offset;
+ size_bytes = descr[0]->extent->size;
+
+ xt_io_descr = mallocz(sizeof(*xt_io_descr));
+ ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ /* free(xt_io_descr);
+ return;*/
+ }
+ for (i = 0 ; i < count; ++i) {
+ uv_mutex_lock(&descr[i]->mutex);
+ descr[i]->flags |= RRD_PAGE_READ_PENDING;
+// payload_length = descr[i]->page_length;
+ uv_mutex_unlock(&descr[i]->mutex);
+
+ xt_io_descr->descr_array[i] = descr[i];
+ }
+ xt_io_descr->descr_count = count;
+ xt_io_descr->bytes = size_bytes;
+ xt_io_descr->pos = pos;
+ xt_io_descr->req.data = xt_io_descr;
+ xt_io_descr->completion = NULL;
+ /* xt_io_descr->descr_commit_idx_array[0] */
+ xt_io_descr->release_descr = release_descr;
+
+ real_io_size = ALIGN_BYTES_CEILING(size_bytes);
+ xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
+ ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb);
+ assert (-1 != ret);
+ ctx->stats.io_read_bytes += real_io_size;
+ ++ctx->stats.io_read_requests;
+ ctx->stats.io_read_extent_bytes += real_io_size;
+ ++ctx->stats.io_read_extents;
+ ctx->stats.pg_cache_backfills += count;
+}
+
+static void commit_data_extent(struct rrdengine_worker_config* wc, struct extent_io_descriptor *xt_io_descr)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ unsigned count, payload_length, descr_size, size_bytes;
+ void *buf;
+ /* persistent structures */
+ struct rrdeng_df_extent_header *df_header;
+ struct rrdeng_jf_transaction_header *jf_header;
+ struct rrdeng_jf_store_data *jf_metric_data;
+ struct rrdeng_jf_transaction_trailer *jf_trailer;
+ uLong crc;
+
+ df_header = xt_io_descr->buf;
+ count = df_header->number_of_pages;
+ descr_size = sizeof(*jf_metric_data->descr) * count;
+ payload_length = sizeof(*jf_metric_data) + descr_size;
+ size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
+
+ buf = wal_get_transaction_buffer(wc, size_bytes);
+
+ jf_header = buf;
+ jf_header->type = STORE_DATA;
+ jf_header->reserved = 0;
+ jf_header->id = ctx->commit_log.transaction_id++;
+ jf_header->payload_length = payload_length;
+
+ jf_metric_data = buf + sizeof(*jf_header);
+ jf_metric_data->extent_offset = xt_io_descr->pos;
+ jf_metric_data->extent_size = xt_io_descr->bytes;
+ jf_metric_data->number_of_pages = count;
+ memcpy(jf_metric_data->descr, df_header->descr, descr_size);
+
+ jf_trailer = buf + sizeof(*jf_header) + payload_length;
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, buf, sizeof(*jf_header) + payload_length);
+ crc32set(jf_trailer->checksum, crc);
+}
+
+static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t type, void *data)
+{
+ switch (type) {
+ case STORE_DATA:
+ commit_data_extent(wc, (struct extent_io_descriptor *)data);
+ break;
+ default:
+ assert(type == STORE_DATA);
+ break;
+ }
+}
+
+void flush_pages_cb(uv_fs_t* req)
+{
+ struct rrdengine_worker_config* wc = req->loop->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct extent_io_descriptor *xt_io_descr;
+ struct rrdeng_page_cache_descr *descr;
+ struct rrdengine_datafile *datafile;
+ int ret;
+ unsigned i, count;
+ Word_t commit_id;
+
+ xt_io_descr = req->data;
+ if (req->result < 0) {
+ error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
+ goto cleanup;
+ }
+ datafile = xt_io_descr->descr_array[0]->extent->datafile;
+ debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.",
+ __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
+
+ count = xt_io_descr->descr_count;
+ for (i = 0 ; i < count ; ++i) {
+ /* care, we don't hold the descriptor mutex */
+ descr = xt_io_descr->descr_array[i];
+
+ uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
+ commit_id = xt_io_descr->descr_commit_idx_array[i];
+ ret = JudyLDel(&pg_cache->commited_page_index.JudyL_array, commit_id, PJE0);
+ assert(1 == ret);
+ --pg_cache->commited_page_index.nr_commited_pages;
+ uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
+
+ pg_cache_replaceQ_insert(ctx, descr);
+
+ uv_mutex_lock(&descr->mutex);
+ descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING);
+ /* wake up waiters, care no reference being held */
+ pg_cache_wake_up_waiters_unsafe(descr);
+ uv_mutex_unlock(&descr->mutex);
+ }
+ if (xt_io_descr->completion)
+ complete(xt_io_descr->completion);
+cleanup:
+ uv_fs_req_cleanup(req);
+ free(xt_io_descr->buf);
+ free(xt_io_descr);
+}
+
+/*
+ * completion must be NULL or valid.
+ * Returns 0 when no flushing can take place.
+ * Returns datafile bytes to be written on successful flushing initiation.
+ */
+static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct completion *completion)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ int ret;
+ int compressed_size, max_compressed_size = 0;
+ unsigned i, count, size_bytes, pos, real_io_size;
+ uint32_t uncompressed_payload_length, payload_offset;
+ struct rrdeng_page_cache_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT];
+ struct extent_io_descriptor *xt_io_descr;
+ void *compressed_buf = NULL;
+ Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
+ Pvoid_t *PValue;
+ Word_t Index;
+ uint8_t compression_algorithm = ctx->global_compress_alg;
+ struct extent_info *extent;
+ struct rrdengine_datafile *datafile;
+ /* persistent structures */
+ struct rrdeng_df_extent_header *header;
+ struct rrdeng_df_extent_trailer *trailer;
+ uLong crc;
+
+ if (force) {
+ debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure.");
+ }
+ uv_rwlock_rdlock(&pg_cache->commited_page_index.lock);
+ for (Index = 0, count = 0, uncompressed_payload_length = 0,
+ PValue = JudyLFirst(pg_cache->commited_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue ;
+
+ descr != NULL && count != MAX_PAGES_PER_EXTENT ;
+
+ PValue = JudyLNext(pg_cache->commited_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue) {
+ assert(0 != descr->page_length);
+
+ uv_mutex_lock(&descr->mutex);
+ if (!(descr->flags & RRD_PAGE_WRITE_PENDING)) {
+ /* care, no reference being held */
+ descr->flags |= RRD_PAGE_WRITE_PENDING;
+ uncompressed_payload_length += descr->page_length;
+ descr_commit_idx_array[count] = Index;
+ eligible_pages[count++] = descr;
+ }
+ uv_mutex_unlock(&descr->mutex);
+ }
+ uv_rwlock_rdunlock(&pg_cache->commited_page_index.lock);
+
+ if (!count) {
+ debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__);
+ if (completion)
+ complete(completion);
+ return 0;
+ }
+ xt_io_descr = mallocz(sizeof(*xt_io_descr));
+ payload_offset = sizeof(*header) + count * sizeof(header->descr[0]);
+ switch (compression_algorithm) {
+ case RRD_NO_COMPRESSION:
+ size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer);
+ break;
+ default: /* Compress */
+ assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE);
+ max_compressed_size = LZ4_compressBound(uncompressed_payload_length);
+ compressed_buf = mallocz(max_compressed_size);
+ size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer);
+ break;
+ }
+ ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ /* free(xt_io_descr);*/
+ }
+ (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_cache_descr *) * count);
+ xt_io_descr->descr_count = count;
+
+ pos = 0;
+ header = xt_io_descr->buf;
+ header->compression_algorithm = compression_algorithm;
+ header->number_of_pages = count;
+ pos += sizeof(*header);
+
+ extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0]));
+ datafile = ctx->datafiles.last; /* TODO: check for exceeded size quota */
+ extent->offset = datafile->pos;
+ extent->number_of_pages = count;
+ extent->datafile = datafile;
+ extent->next = NULL;
+
+ for (i = 0 ; i < count ; ++i) {
+ /* This is here for performance reasons */
+ xt_io_descr->descr_commit_idx_array[i] = descr_commit_idx_array[i];
+
+ descr = xt_io_descr->descr_array[i];
+ header->descr[i].type = PAGE_METRICS;
+ uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id);
+ header->descr[i].page_length = descr->page_length;
+ header->descr[i].start_time = descr->start_time;
+ header->descr[i].end_time = descr->end_time;
+ pos += sizeof(header->descr[i]);
+ }
+ for (i = 0 ; i < count ; ++i) {
+ descr = xt_io_descr->descr_array[i];
+ /* care, we don't hold the descriptor mutex */
+ (void) memcpy(xt_io_descr->buf + pos, descr->page, descr->page_length);
+ descr->extent = extent;
+ extent->pages[i] = descr;
+
+ pos += descr->page_length;
+ }
+ df_extent_insert(extent);
+
+ switch (compression_algorithm) {
+ case RRD_NO_COMPRESSION:
+ header->payload_length = uncompressed_payload_length;
+ break;
+ default: /* Compress */
+ compressed_size = LZ4_compress_default(xt_io_descr->buf + payload_offset, compressed_buf,
+ uncompressed_payload_length, max_compressed_size);
+ ctx->stats.before_compress_bytes += uncompressed_payload_length;
+ ctx->stats.after_compress_bytes += compressed_size;
+ debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size);
+ (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size);
+ free(compressed_buf);
+ size_bytes = payload_offset + compressed_size + sizeof(*trailer);
+ header->payload_length = compressed_size;
+ break;
+ }
+ extent->size = size_bytes;
+ xt_io_descr->bytes = size_bytes;
+ xt_io_descr->pos = datafile->pos;
+ xt_io_descr->req.data = xt_io_descr;
+ xt_io_descr->completion = completion;
+
+ trailer = xt_io_descr->buf + size_bytes - sizeof(*trailer);
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, xt_io_descr->buf, size_bytes - sizeof(*trailer));
+ crc32set(trailer->checksum, crc);
+
+ real_io_size = ALIGN_BYTES_CEILING(size_bytes);
+ xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
+ ret = uv_fs_write(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, datafile->pos, flush_pages_cb);
+ assert (-1 != ret);
+ ctx->stats.io_write_bytes += real_io_size;
+ ++ctx->stats.io_write_requests;
+ ctx->stats.io_write_extent_bytes += real_io_size;
+ ++ctx->stats.io_write_extents;
+ do_commit_transaction(wc, STORE_DATA, xt_io_descr);
+ datafile->pos += ALIGN_BYTES_CEILING(size_bytes);
+ ctx->disk_space += ALIGN_BYTES_CEILING(size_bytes);
+ rrdeng_test_quota(wc);
+
+ return ALIGN_BYTES_CEILING(size_bytes);
+}
+
+static void after_delete_old_data(uv_work_t *req, int status)
+{
+ struct rrdengine_instance *ctx = req->data;
+ struct rrdengine_worker_config* wc = &ctx->worker_config;
+ struct rrdengine_datafile *datafile;
+ struct rrdengine_journalfile *journalfile;
+ unsigned bytes;
+
+ (void)status;
+ datafile = ctx->datafiles.first;
+ journalfile = datafile->journalfile;
+ bytes = datafile->pos + journalfile->pos;
+
+ datafile_list_delete(ctx, datafile);
+ destroy_journal_file(journalfile, datafile);
+ destroy_data_file(datafile);
+ info("Deleted data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
+ datafile->tier, datafile->fileno);
+ free(journalfile);
+ free(datafile);
+
+ ctx->disk_space -= bytes;
+ info("Reclaimed %u bytes of disk space.", bytes);
+
+ /* unfreeze command processing */
+ wc->now_deleting.data = NULL;
+ /* wake up event loop */
+ assert(0 == uv_async_send(&wc->async));
+}
+
+static void delete_old_data(uv_work_t *req)
+{
+ struct rrdengine_instance *ctx = req->data;
+ struct rrdengine_datafile *datafile;
+ struct extent_info *extent, *next;
+ struct rrdeng_page_cache_descr *descr;
+ unsigned count, i;
+
+ /* Safe to use since it will be deleted after we are done */
+ datafile = ctx->datafiles.first;
+
+ for (extent = datafile->extents.first ; extent != NULL ; extent = next) {
+ count = extent->number_of_pages;
+ for (i = 0 ; i < count ; ++i) {
+ descr = extent->pages[i];
+ pg_cache_punch_hole(ctx, descr);
+ }
+ next = extent->next;
+ free(extent);
+ }
+}
+
+void rrdeng_test_quota(struct rrdengine_worker_config* wc)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct rrdengine_datafile *datafile;
+ unsigned current_size, target_size;
+ uint8_t out_of_space, only_one_datafile;
+
+ out_of_space = 0;
+ if (unlikely(ctx->disk_space > ctx->max_disk_space)) {
+ out_of_space = 1;
+ }
+ datafile = ctx->datafiles.last;
+ current_size = datafile->pos;
+ target_size = ctx->max_disk_space / TARGET_DATAFILES;
+ target_size = MIN(target_size, MAX_DATAFILE_SIZE);
+ target_size = MAX(target_size, MIN_DATAFILE_SIZE);
+ only_one_datafile = (datafile == ctx->datafiles.first) ? 1 : 0;
+ if (unlikely(current_size >= target_size || (out_of_space && only_one_datafile))) {
+ /* Finalize data and journal file and create a new pair */
+ wal_flush_transaction_buffer(wc);
+ create_new_datafile_pair(ctx, 1, datafile->fileno + 1);
+ }
+ if (unlikely(out_of_space)) {
+ /* delete old data */
+ if (wc->now_deleting.data) {
+ /* already deleting data */
+ return;
+ }
+ info("Deleting data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
+ ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
+ wc->now_deleting.data = ctx;
+ uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data);
+ }
+}
+
+int init_rrd_files(struct rrdengine_instance *ctx)
+{
+ return init_data_files(ctx);
+}
+
+void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc)
+{
+ wc->cmd_queue.head = wc->cmd_queue.tail = 0;
+ wc->queue_size = 0;
+ assert(0 == uv_cond_init(&wc->cmd_cond));
+ assert(0 == uv_mutex_init(&wc->cmd_mutex));
+}
+
+void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd)
+{
+ unsigned queue_size;
+
+ /* wait for free space in queue */
+ uv_mutex_lock(&wc->cmd_mutex);
+ while ((queue_size = wc->queue_size) == RRDENG_CMD_Q_MAX_SIZE) {
+ uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
+ }
+ assert(queue_size < RRDENG_CMD_Q_MAX_SIZE);
+ /* enqueue command */
+ wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
+ wc->cmd_queue.tail = wc->cmd_queue.tail != RRDENG_CMD_Q_MAX_SIZE - 1 ?
+ wc->cmd_queue.tail + 1 : 0;
+ wc->queue_size = queue_size + 1;
+ uv_mutex_unlock(&wc->cmd_mutex);
+
+ /* wake up event loop */
+ assert(0 == uv_async_send(&wc->async));
+}
+
+struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc)
+{
+ struct rrdeng_cmd ret;
+ unsigned queue_size;
+
+ uv_mutex_lock(&wc->cmd_mutex);
+ queue_size = wc->queue_size;
+ if (queue_size == 0) {
+ ret.opcode = RRDENG_NOOP;
+ } else {
+ /* dequeue command */
+ ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head];
+ if (queue_size == 1) {
+ wc->cmd_queue.head = wc->cmd_queue.tail = 0;
+ } else {
+ wc->cmd_queue.head = wc->cmd_queue.head != RRDENG_CMD_Q_MAX_SIZE - 1 ?
+ wc->cmd_queue.head + 1 : 0;
+ }
+ wc->queue_size = queue_size - 1;
+
+ /* wake up producers */
+ uv_cond_signal(&wc->cmd_cond);
+ }
+ uv_mutex_unlock(&wc->cmd_mutex);
+
+ return ret;
+}
+
+void async_cb(uv_async_t *handle)
+{
+ uv_stop(handle->loop);
+ uv_update_time(handle->loop);
+ debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
+}
+
+void timer_cb(uv_timer_t* handle)
+{
+ struct rrdengine_worker_config* wc = handle->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+
+ uv_stop(handle->loop);
+ uv_update_time(handle->loop);
+ rrdeng_test_quota(wc);
+ debug(D_RRDENGINE, "%s: timeout reached.", __func__);
+ if (likely(!wc->now_deleting.data)) {
+ unsigned total_bytes, bytes_written;
+
+ /* There is free space so we can write to disk */
+ debug(D_RRDENGINE, "Flushing pages to disk.");
+ for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL) ;
+ bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ;
+ total_bytes += bytes_written) {
+ bytes_written = do_flush_pages(wc, 0, NULL);
+ }
+ }
+#ifdef NETDATA_INTERNAL_CHECKS
+ {
+ char buf[4096];
+ debug(D_RRDENGINE, "%s", get_rrdeng_statistics(ctx, buf, sizeof(buf)));
+ }
+#endif
+}
+
+/* Flushes dirty pages when timer expires */
+#define TIMER_PERIOD_MS (1000)
+
+#define CMD_BATCH_SIZE (256)
+
+void rrdeng_worker(void* arg)
+{
+ struct rrdengine_worker_config* wc = arg;
+ struct rrdengine_instance *ctx = wc->ctx;
+ uv_loop_t* loop;
+ int shutdown;
+ enum rrdeng_opcode opcode;
+ uv_timer_t timer_req;
+ struct rrdeng_cmd cmd;
+
+ rrdeng_init_cmd_queue(wc);
+
+ loop = wc->loop = mallocz(sizeof(uv_loop_t));
+ uv_loop_init(loop);
+ loop->data = wc;
+
+ uv_async_init(wc->loop, &wc->async, async_cb);
+ wc->async.data = wc;
+
+ wc->now_deleting.data = NULL;
+
+ /* dirty page flushing timer */
+ uv_timer_init(loop, &timer_req);
+ timer_req.data = wc;
+
+ /* wake up initialization thread */
+ complete(&ctx->rrdengine_completion);
+
+ uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS);
+ shutdown = 0;
+ while (shutdown == 0 || uv_loop_alive(loop)) {
+ uv_run(loop, UV_RUN_DEFAULT);
+ /* wait for commands */
+ do {
+ cmd = rrdeng_deq_cmd(wc);
+ opcode = cmd.opcode;
+
+ switch (opcode) {
+ case RRDENG_NOOP:
+ /* the command queue was empty, do nothing */
+ break;
+ case RRDENG_SHUTDOWN:
+ shutdown = 1;
+ if (unlikely(wc->now_deleting.data)) {
+ /* postpone shutdown until after deletion */
+ info("Postponing shutting RRD engine event loop down until after datafile deletion is finished.");
+ rrdeng_enq_cmd(wc, &cmd);
+ break;
+ }
+ /*
+ * uv_async_send after uv_close does not seem to crash in linux at the moment,
+ * it is however undocumented behaviour and we need to be aware if this becomes
+ * an issue in the future.
+ */
+ uv_close((uv_handle_t *)&wc->async, NULL);
+ assert(0 == uv_timer_stop(&timer_req));
+ uv_close((uv_handle_t *)&timer_req, NULL);
+ info("Shutting down RRD engine event loop.");
+ while (do_flush_pages(wc, 1, NULL)) {
+ ; /* Force flushing of all commited pages. */
+ }
+ break;
+ case RRDENG_READ_PAGE:
+ do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0);
+ break;
+ case RRDENG_READ_EXTENT:
+ do_read_extent(wc, cmd.read_extent.page_cache_descr, cmd.read_extent.page_count, 1);
+ break;
+ case RRDENG_COMMIT_PAGE:
+ do_commit_transaction(wc, STORE_DATA, NULL);
+ break;
+ case RRDENG_FLUSH_PAGES: {
+ unsigned total_bytes, bytes_written;
+
+ /* First I/O should be enough to call completion */
+ bytes_written = do_flush_pages(wc, 1, cmd.completion);
+ for (total_bytes = bytes_written ;
+ bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ;
+ total_bytes += bytes_written) {
+ bytes_written = do_flush_pages(wc, 1, NULL);
+ }
+ break;
+ }
+ default:
+ debug(D_RRDENGINE, "%s: default.", __func__);
+ break;
+ }
+ } while (opcode != RRDENG_NOOP);
+ }
+ /* cleanup operations of the event loop */
+ wal_flush_transaction_buffer(wc);
+ uv_run(loop, UV_RUN_DEFAULT);
+
+ info("Shutting down RRD engine event loop complete.");
+ /* TODO: don't let the API block by waiting to enqueue commands */
+ uv_cond_destroy(&wc->cmd_cond);
+/* uv_mutex_destroy(&wc->cmd_mutex); */
+ assert(0 == uv_loop_close(loop));
+ free(loop);
+}
+
+
+#define NR_PAGES (256)
+static void basic_functional_test(struct rrdengine_instance *ctx)
+{
+ int i, j, failed_validations;
+ uuid_t uuid[NR_PAGES];
+ void *buf;
+ struct rrdeng_page_cache_descr *handle[NR_PAGES];
+ char uuid_str[37];
+ char backup[NR_PAGES][37 * 100]; /* backup storage for page data verification */
+
+ for (i = 0 ; i < NR_PAGES ; ++i) {
+ uuid_generate(uuid[i]);
+ uuid_unparse_lower(uuid[i], uuid_str);
+// fprintf(stderr, "Generated uuid[%d]=%s\n", i, uuid_str);
+ buf = rrdeng_create_page(&uuid[i], &handle[i]);
+ /* Each page contains 10 times its own UUID stringified */
+ for (j = 0 ; j < 100 ; ++j) {
+ strcpy(buf + 37 * j, uuid_str);
+ strcpy(backup[i] + 37 * j, uuid_str);
+ }
+ rrdeng_commit_page(ctx, handle[i], (Word_t)i);
+ }
+ fprintf(stderr, "\n********** CREATED %d METRIC PAGES ***********\n\n", NR_PAGES);
+ failed_validations = 0;
+ for (i = 0 ; i < NR_PAGES ; ++i) {
+ buf = rrdeng_get_latest_page(ctx, &uuid[i], (void **)&handle[i]);
+ if (NULL == buf) {
+ ++failed_validations;
+ fprintf(stderr, "Page %d was LOST.\n", i);
+ }
+ if (memcmp(backup[i], buf, 37 * 100)) {
+ ++failed_validations;
+ fprintf(stderr, "Page %d data comparison with backup FAILED validation.\n", i);
+ }
+ rrdeng_put_page(ctx, handle[i]);
+ }
+ fprintf(stderr, "\n********** CORRECTLY VALIDATED %d/%d METRIC PAGES ***********\n\n",
+ NR_PAGES - failed_validations, NR_PAGES);
+
+}
+/* C entry point for development purposes
+ * make "LDFLAGS=-errdengine_main"
+ */
+void rrdengine_main(void)
+{
+ int ret;
+ struct rrdengine_instance *ctx;
+
+ ret = rrdeng_init(&ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB);
+ if (ret) {
+ exit(ret);
+ }
+ basic_functional_test(ctx);
+
+ rrdeng_exit(ctx);
+ fprintf(stderr, "Hello world!");
+ exit(0);
+} \ No newline at end of file
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
new file mode 100644
index 000000000..141bb9c63
--- /dev/null
+++ b/database/engine/rrdengine.h
@@ -0,0 +1,171 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDENGINE_H
+#define NETDATA_RRDENGINE_H
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include <fcntl.h>
+#include <aio.h>
+#include <uv.h>
+#include <assert.h>
+#include <lz4.h>
+#include <Judy.h>
+#include <openssl/sha.h>
+#include <openssl/evp.h>
+#include <stdint.h>
+#include "../rrd.h"
+#include "rrddiskprotocol.h"
+#include "rrdenginelib.h"
+#include "datafile.h"
+#include "journalfile.h"
+#include "rrdengineapi.h"
+#include "pagecache.h"
+
+#ifdef NETDATA_RRD_INTERNALS
+
+#endif /* NETDATA_RRD_INTERNALS */
+
+/* Forward declerations */
+struct rrdengine_instance;
+
+#define MAX_PAGES_PER_EXTENT (64) /* TODO: can go higher only when journal supports bigger than 4KiB transactions */
+
+#define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u"
+#define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u"
+
+
+typedef enum {
+ RRDENGINE_STATUS_UNINITIALIZED = 0,
+ RRDENGINE_STATUS_INITIALIZING,
+ RRDENGINE_STATUS_INITIALIZED
+} rrdengine_state_t;
+
+enum rrdeng_opcode {
+ /* can be used to return empty status or flush the command queue */
+ RRDENG_NOOP = 0,
+
+ RRDENG_READ_PAGE,
+ RRDENG_READ_EXTENT,
+ RRDENG_COMMIT_PAGE,
+ RRDENG_FLUSH_PAGES,
+ RRDENG_SHUTDOWN,
+
+ RRDENG_MAX_OPCODE
+};
+
+struct rrdeng_cmd {
+ enum rrdeng_opcode opcode;
+ union {
+ struct rrdeng_read_page {
+ struct rrdeng_page_cache_descr *page_cache_descr;
+ } read_page;
+ struct rrdeng_read_extent {
+ struct rrdeng_page_cache_descr *page_cache_descr[MAX_PAGES_PER_EXTENT];
+ int page_count;
+ } read_extent;
+ struct completion *completion;
+ };
+};
+
+#define RRDENG_CMD_Q_MAX_SIZE (2048)
+
+struct rrdeng_cmdqueue {
+ unsigned head, tail;
+ struct rrdeng_cmd cmd_array[RRDENG_CMD_Q_MAX_SIZE];
+};
+
+struct extent_io_descriptor {
+ uv_fs_t req;
+ uv_buf_t iov;
+ void *buf;
+ uint64_t pos;
+ unsigned bytes;
+ struct completion *completion;
+ unsigned descr_count;
+ int release_descr;
+ struct rrdeng_page_cache_descr *descr_array[MAX_PAGES_PER_EXTENT];
+ Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
+};
+
+struct generic_io_descriptor {
+ uv_fs_t req;
+ uv_buf_t iov;
+ void *buf;
+ uint64_t pos;
+ unsigned bytes;
+ struct completion *completion;
+};
+
+struct rrdengine_worker_config {
+ struct rrdengine_instance *ctx;
+
+ uv_thread_t thread;
+ uv_loop_t* loop;
+ uv_async_t async;
+ uv_work_t now_deleting;
+
+ /* FIFO command queue */
+ uv_mutex_t cmd_mutex;
+ uv_cond_t cmd_cond;
+ volatile unsigned queue_size;
+ struct rrdeng_cmdqueue cmd_queue;
+};
+
+/*
+ * Debug statistics not used by code logic.
+ * They only describe operations since DB engine instance load time.
+ */
+struct rrdengine_statistics {
+ rrdeng_stats_t metric_API_producers;
+ rrdeng_stats_t metric_API_consumers;
+ rrdeng_stats_t pg_cache_insertions;
+ rrdeng_stats_t pg_cache_deletions;
+ rrdeng_stats_t pg_cache_hits;
+ rrdeng_stats_t pg_cache_misses;
+ rrdeng_stats_t pg_cache_backfills;
+ rrdeng_stats_t pg_cache_evictions;
+ rrdeng_stats_t before_decompress_bytes;
+ rrdeng_stats_t after_decompress_bytes;
+ rrdeng_stats_t before_compress_bytes;
+ rrdeng_stats_t after_compress_bytes;
+ rrdeng_stats_t io_write_bytes;
+ rrdeng_stats_t io_write_requests;
+ rrdeng_stats_t io_read_bytes;
+ rrdeng_stats_t io_read_requests;
+ rrdeng_stats_t io_write_extent_bytes;
+ rrdeng_stats_t io_write_extents;
+ rrdeng_stats_t io_read_extent_bytes;
+ rrdeng_stats_t io_read_extents;
+ rrdeng_stats_t datafile_creations;
+ rrdeng_stats_t datafile_deletions;
+ rrdeng_stats_t journalfile_creations;
+ rrdeng_stats_t journalfile_deletions;
+};
+
+struct rrdengine_instance {
+ rrdengine_state_t rrdengine_state;
+ struct rrdengine_worker_config worker_config;
+ struct completion rrdengine_completion;
+ struct page_cache pg_cache;
+ uint8_t global_compress_alg;
+ struct transaction_commit_log commit_log;
+ struct rrdengine_datafile_list datafiles;
+ char dbfiles_path[FILENAME_MAX+1];
+ uint64_t disk_space;
+ uint64_t max_disk_space;
+ unsigned long max_cache_pages;
+ unsigned long cache_pages_low_watermark;
+
+ struct rrdengine_statistics stats;
+};
+
+extern void sanity_check(void);
+extern int init_rrd_files(struct rrdengine_instance *ctx);
+extern void rrdeng_test_quota(struct rrdengine_worker_config* wc);
+extern void rrdeng_worker(void* arg);
+extern void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd);
+extern struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc);
+
+#endif /* NETDATA_RRDENGINE_H */ \ No newline at end of file
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
new file mode 100644
index 000000000..a4e711554
--- /dev/null
+++ b/database/engine/rrdengineapi.c
@@ -0,0 +1,484 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+/* Default global database instance */
+static struct rrdengine_instance default_global_ctx;
+
+int default_rrdeng_page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
+int default_rrdeng_disk_quota_mb = RRDENG_MIN_DISK_SPACE_MB;
+
+/*
+ * Gets a handle for storing metrics to the database.
+ * The handle must be released with rrdeng_store_metric_final().
+ */
+void rrdeng_store_metric_init(RRDDIM *rd)
+{
+ struct rrdeng_collect_handle *handle;
+ struct page_cache *pg_cache;
+ struct rrdengine_instance *ctx;
+ uuid_t temp_id;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+ EVP_MD_CTX *evpctx;
+ unsigned char hash_value[EVP_MAX_MD_SIZE];
+ unsigned int hash_len;
+
+ //&default_global_ctx; TODO: test this use case or remove it?
+
+ ctx = rd->rrdset->rrdhost->rrdeng_ctx;
+ pg_cache = &ctx->pg_cache;
+ handle = &rd->state->handle.rrdeng;
+ handle->ctx = ctx;
+
+ evpctx = EVP_MD_CTX_create();
+ EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL);
+ EVP_DigestUpdate(evpctx, rd->id, strlen(rd->id));
+ EVP_DigestUpdate(evpctx, rd->rrdset->id, strlen(rd->rrdset->id));
+ EVP_DigestFinal_ex(evpctx, hash_value, &hash_len);
+ EVP_MD_CTX_destroy(evpctx);
+ assert(hash_len > sizeof(temp_id));
+ memcpy(&temp_id, hash_value, sizeof(temp_id));
+
+ handle->descr = NULL;
+ handle->prev_descr = NULL;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ /* First time we see the UUID */
+ uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t), PJE0);
+ assert(NULL == *PValue); /* TODO: figure out concurrency model */
+ *PValue = page_index = create_page_index(&temp_id);
+ uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
+ }
+ rd->state->rrdeng_uuid = &page_index->id;
+ handle->page_index = page_index;
+}
+
+void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number)
+{
+ struct rrdeng_collect_handle *handle;
+ struct rrdengine_instance *ctx;
+ struct page_cache *pg_cache;
+ struct rrdeng_page_cache_descr *descr;
+ storage_number *page;
+
+ handle = &rd->state->handle.rrdeng;
+ ctx = handle->ctx;
+ pg_cache = &ctx->pg_cache;
+ descr = handle->descr;
+ if (unlikely(NULL == descr || descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE)) {
+ if (descr) {
+ descr->handle = NULL;
+ if (descr->page_length) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
+#endif
+ /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
+ ++descr->refcnt;
+ rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
+ if (handle->prev_descr) {
+ /* unpin old second page */
+ pg_cache_put(handle->prev_descr);
+ }
+ handle->prev_descr = descr;
+ } else {
+ free(descr->page);
+ free(descr);
+ handle->descr = NULL;
+ }
+ }
+ page = rrdeng_create_page(&handle->page_index->id, &descr);
+ assert(page);
+ handle->prev_descr = handle->descr;
+ handle->descr = descr;
+ descr->handle = handle;
+ uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
+ handle->page_correlation_id = pg_cache->commited_page_index.latest_corr_id++;
+ uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
+ }
+ page = descr->page;
+
+ page[descr->page_length / sizeof(number)] = number;
+ descr->end_time = point_in_time;
+ descr->page_length += sizeof(number);
+ if (unlikely(INVALID_TIME == descr->start_time)) {
+ descr->start_time = point_in_time;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_producers, 1);
+#endif
+ pg_cache_insert(ctx, handle->page_index, descr);
+ } else {
+ pg_cache_add_new_metric_time(handle->page_index, descr);
+ }
+}
+
+/*
+ * Releases the database reference from the handle for storing metrics.
+ */
+void rrdeng_store_metric_finalize(RRDDIM *rd)
+{
+ struct rrdeng_collect_handle *handle;
+ struct rrdengine_instance *ctx;
+ struct rrdeng_page_cache_descr *descr;
+
+ handle = &rd->state->handle.rrdeng;
+ ctx = handle->ctx;
+ descr = handle->descr;
+ if (descr) {
+ descr->handle = NULL;
+ if (descr->page_length) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
+#endif
+ rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
+ if (handle->prev_descr) {
+ /* unpin old second page */
+ pg_cache_put(handle->prev_descr);
+ }
+ } else {
+ free(descr->page);
+ free(descr);
+ }
+ }
+}
+
+/*
+ * Gets a handle for loading metrics from the database.
+ * The handle must be released with rrdeng_load_metric_final().
+ */
+void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time)
+{
+ struct rrdeng_query_handle *handle;
+ struct rrdengine_instance *ctx;
+
+ ctx = rd->rrdset->rrdhost->rrdeng_ctx;
+ rrdimm_handle->start_time = start_time;
+ rrdimm_handle->end_time = end_time;
+ handle = &rrdimm_handle->rrdeng;
+ handle->now = start_time;
+ handle->dt = rd->rrdset->update_every;
+ handle->ctx = ctx;
+ handle->descr = NULL;
+ handle->page_index = pg_cache_preload(ctx, rd->state->rrdeng_uuid,
+ start_time * USEC_PER_SEC, end_time * USEC_PER_SEC);
+}
+
+storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle)
+{
+ struct rrdeng_query_handle *handle;
+ struct rrdengine_instance *ctx;
+ struct rrdeng_page_cache_descr *descr;
+ storage_number *page, ret;
+ unsigned position;
+ usec_t point_in_time;
+
+ handle = &rrdimm_handle->rrdeng;
+ if (unlikely(INVALID_TIME == handle->now)) {
+ return SN_EMPTY_SLOT;
+ }
+ ctx = handle->ctx;
+ point_in_time = handle->now * USEC_PER_SEC;
+ descr = handle->descr;
+
+ if (unlikely(NULL == handle->page_index)) {
+ ret = SN_EMPTY_SLOT;
+ goto out;
+ }
+ if (unlikely(NULL == descr ||
+ point_in_time < descr->start_time ||
+ point_in_time > descr->end_time)) {
+ if (descr) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
+#endif
+ pg_cache_put(descr);
+ handle->descr = NULL;
+ }
+ descr = pg_cache_lookup(ctx, handle->page_index, &handle->page_index->id, point_in_time);
+ if (NULL == descr) {
+ ret = SN_EMPTY_SLOT;
+ goto out;
+ }
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
+#endif
+ handle->descr = descr;
+ }
+ if (unlikely(INVALID_TIME == descr->start_time ||
+ INVALID_TIME == descr->end_time)) {
+ ret = SN_EMPTY_SLOT;
+ goto out;
+ }
+ page = descr->page;
+ if (unlikely(descr->start_time == descr->end_time)) {
+ ret = page[0];
+ goto out;
+ }
+ position = ((uint64_t)(point_in_time - descr->start_time)) * (descr->page_length / sizeof(storage_number)) /
+ (descr->end_time - descr->start_time + 1);
+ ret = page[position];
+
+out:
+ handle->now += handle->dt;
+ if (unlikely(handle->now > rrdimm_handle->end_time)) {
+ handle->now = INVALID_TIME;
+ }
+ return ret;
+}
+
+int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
+{
+ struct rrdeng_query_handle *handle;
+
+ handle = &rrdimm_handle->rrdeng;
+ return (INVALID_TIME == handle->now);
+}
+
+/*
+ * Releases the database reference from the handle for loading metrics.
+ */
+void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
+{
+ struct rrdeng_query_handle *handle;
+ struct rrdengine_instance *ctx;
+ struct rrdeng_page_cache_descr *descr;
+
+ handle = &rrdimm_handle->rrdeng;
+ ctx = handle->ctx;
+ descr = handle->descr;
+ if (descr) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
+#endif
+ pg_cache_put(descr);
+ }
+}
+
+time_t rrdeng_metric_latest_time(RRDDIM *rd)
+{
+ struct rrdeng_collect_handle *handle;
+ struct pg_cache_page_index *page_index;
+
+ handle = &rd->state->handle.rrdeng;
+ page_index = handle->page_index;
+
+ return page_index->latest_time / USEC_PER_SEC;
+}
+time_t rrdeng_metric_oldest_time(RRDDIM *rd)
+{
+ struct rrdeng_collect_handle *handle;
+ struct pg_cache_page_index *page_index;
+
+ handle = &rd->state->handle.rrdeng;
+ page_index = handle->page_index;
+
+ return page_index->oldest_time / USEC_PER_SEC;
+}
+
+/* Also gets a reference for the page */
+void *rrdeng_create_page(uuid_t *id, struct rrdeng_page_cache_descr **ret_descr)
+{
+ struct rrdeng_page_cache_descr *descr;
+ void *page;
+ int ret;
+
+ /* TODO: check maximum number of pages in page cache limit */
+
+ page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */
+ descr = pg_cache_create_descr();
+ descr->page = page;
+ descr->id = id; /* TODO: add page type: metric, log, something? */
+ descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */;
+ descr->refcnt = 1;
+
+ debug(D_RRDENGINE, "-----------------\nCreated new page:\n-----------------");
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ *ret_descr = descr;
+ return page;
+}
+
+/* The page must not be empty */
+void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr,
+ Word_t page_correlation_id)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ Pvoid_t *PValue;
+
+ if (unlikely(NULL == descr)) {
+ debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-commited.", __func__);
+ return;
+ }
+ assert(descr->page_length);
+
+ uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
+ PValue = JudyLIns(&pg_cache->commited_page_index.JudyL_array, page_correlation_id, PJE0);
+ *PValue = descr;
+ ++pg_cache->commited_page_index.nr_commited_pages;
+ uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
+
+ pg_cache_put(descr);
+}
+
+/* Gets a reference for the page */
+void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle)
+{
+ struct rrdeng_page_cache_descr *descr;
+
+ debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------");
+ descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME);
+ if (NULL == descr) {
+ *handle = NULL;
+
+ return NULL;
+ }
+ *handle = descr;
+
+ return descr->page;
+}
+
+/* Gets a reference for the page */
+void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle)
+{
+ struct rrdeng_page_cache_descr *descr;
+
+ debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------");
+ descr = pg_cache_lookup(ctx, NULL, id, point_in_time);
+ if (NULL == descr) {
+ *handle = NULL;
+
+ return NULL;
+ }
+ *handle = descr;
+
+ return descr->page;
+}
+
+void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ array[0] = (uint64_t)ctx->stats.metric_API_producers;
+ array[1] = (uint64_t)ctx->stats.metric_API_consumers;
+ array[2] = (uint64_t)pg_cache->page_descriptors;
+ array[3] = (uint64_t)pg_cache->populated_pages;
+ array[4] = (uint64_t)pg_cache->commited_page_index.nr_commited_pages;
+ array[5] = (uint64_t)ctx->stats.pg_cache_insertions;
+ array[6] = (uint64_t)ctx->stats.pg_cache_deletions;
+ array[7] = (uint64_t)ctx->stats.pg_cache_hits;
+ array[8] = (uint64_t)ctx->stats.pg_cache_misses;
+ array[9] = (uint64_t)ctx->stats.pg_cache_backfills;
+ array[10] = (uint64_t)ctx->stats.pg_cache_evictions;
+ array[11] = (uint64_t)ctx->stats.before_compress_bytes;
+ array[12] = (uint64_t)ctx->stats.after_compress_bytes;
+ array[13] = (uint64_t)ctx->stats.before_decompress_bytes;
+ array[14] = (uint64_t)ctx->stats.after_decompress_bytes;
+ array[15] = (uint64_t)ctx->stats.io_write_bytes;
+ array[16] = (uint64_t)ctx->stats.io_write_requests;
+ array[17] = (uint64_t)ctx->stats.io_read_bytes;
+ array[18] = (uint64_t)ctx->stats.io_read_requests;
+ array[19] = (uint64_t)ctx->stats.io_write_extent_bytes;
+ array[20] = (uint64_t)ctx->stats.io_write_extents;
+ array[21] = (uint64_t)ctx->stats.io_read_extent_bytes;
+ array[22] = (uint64_t)ctx->stats.io_read_extents;
+ array[23] = (uint64_t)ctx->stats.datafile_creations;
+ array[24] = (uint64_t)ctx->stats.datafile_deletions;
+ array[25] = (uint64_t)ctx->stats.journalfile_creations;
+ array[26] = (uint64_t)ctx->stats.journalfile_deletions;
+}
+
+/* Releases reference to page */
+void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle)
+{
+ (void)ctx;
+ pg_cache_put((struct rrdeng_page_cache_descr *)handle);
+}
+
+/*
+ * Returns 0 on success, 1 on error
+ */
+int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, unsigned disk_space_mb)
+{
+ struct rrdengine_instance *ctx;
+ int error;
+
+ sanity_check();
+ if (NULL == ctxp) {
+ /* for testing */
+ ctx = &default_global_ctx;
+ memset(ctx, 0, sizeof(*ctx));
+ } else {
+ *ctxp = ctx = callocz(1, sizeof(*ctx));
+ }
+ if (ctx->rrdengine_state != RRDENGINE_STATUS_UNINITIALIZED) {
+ return 1;
+ }
+ ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZING;
+ ctx->global_compress_alg = RRD_LZ4;
+ if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB)
+ page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
+ ctx->max_cache_pages = page_cache_mb * (1048576LU / RRDENG_BLOCK_SIZE);
+ /* try to keep 5% of the page cache free */
+ ctx->cache_pages_low_watermark = (ctx->max_cache_pages * 95LLU) / 100;
+ if (disk_space_mb < RRDENG_MIN_DISK_SPACE_MB)
+ disk_space_mb = RRDENG_MIN_DISK_SPACE_MB;
+ ctx->max_disk_space = disk_space_mb * 1048576LLU;
+ strncpyz(ctx->dbfiles_path, dbfiles_path, sizeof(ctx->dbfiles_path) - 1);
+ ctx->dbfiles_path[sizeof(ctx->dbfiles_path) - 1] = '\0';
+
+ memset(&ctx->worker_config, 0, sizeof(ctx->worker_config));
+ ctx->worker_config.ctx = ctx;
+ init_page_cache(ctx);
+ init_commit_log(ctx);
+ error = init_rrd_files(ctx);
+ if (error) {
+ ctx->rrdengine_state = RRDENGINE_STATUS_UNINITIALIZED;
+ if (ctx != &default_global_ctx) {
+ freez(ctx);
+ }
+ return 1;
+ }
+
+ init_completion(&ctx->rrdengine_completion);
+ assert(0 == uv_thread_create(&ctx->worker_config.thread, rrdeng_worker, &ctx->worker_config));
+ /* wait for worker thread to initialize */
+ wait_for_completion(&ctx->rrdengine_completion);
+ destroy_completion(&ctx->rrdengine_completion);
+
+ ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZED;
+ return 0;
+}
+
+/*
+ * Returns 0 on success, 1 on error
+ */
+int rrdeng_exit(struct rrdengine_instance *ctx)
+{
+ struct rrdeng_cmd cmd;
+
+ if (NULL == ctx) {
+ /* TODO: move to per host basis */
+ ctx = &default_global_ctx;
+ }
+ if (ctx->rrdengine_state != RRDENGINE_STATUS_INITIALIZED) {
+ return 1;
+ }
+
+ /* TODO: add page to page cache */
+ cmd.opcode = RRDENG_SHUTDOWN;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+
+ assert(0 == uv_thread_join(&ctx->worker_config.thread));
+
+ if (ctx != &default_global_ctx) {
+ freez(ctx);
+ }
+ return 0;
+} \ No newline at end of file
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
new file mode 100644
index 000000000..e76629a4b
--- /dev/null
+++ b/database/engine/rrdengineapi.h
@@ -0,0 +1,37 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDENGINEAPI_H
+#define NETDATA_RRDENGINEAPI_H
+
+#include "rrdengine.h"
+
+#define RRDENG_MIN_PAGE_CACHE_SIZE_MB (32)
+#define RRDENG_MIN_DISK_SPACE_MB (256)
+extern int default_rrdeng_page_cache_mb;
+extern int default_rrdeng_disk_quota_mb;
+
+extern void *rrdeng_create_page(uuid_t *id, struct rrdeng_page_cache_descr **ret_descr);
+extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr,
+ Word_t page_correlation_id);
+extern void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle);
+extern void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle);
+extern void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle);
+extern void rrdeng_store_metric_init(RRDDIM *rd);
+extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number);
+extern void rrdeng_store_metric_finalize(RRDDIM *rd);
+extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle,
+ time_t start_time, time_t end_time);
+extern storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle);
+extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle);
+extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle);
+extern time_t rrdeng_metric_latest_time(RRDDIM *rd);
+extern time_t rrdeng_metric_oldest_time(RRDDIM *rd);
+extern void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
+
+/* must call once before using anything */
+extern int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
+ unsigned disk_space_mb);
+
+extern int rrdeng_exit(struct rrdengine_instance *ctx);
+
+#endif /* NETDATA_RRDENGINEAPI_H */ \ No newline at end of file
diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c
new file mode 100644
index 000000000..25f57ba1b
--- /dev/null
+++ b/database/engine/rrdenginelib.c
@@ -0,0 +1,116 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+void print_page_cache_descr(struct rrdeng_page_cache_descr *page_cache_descr)
+{
+ char uuid_str[37];
+ char str[512];
+ int pos = 0;
+
+ uuid_unparse_lower(*page_cache_descr->id, uuid_str);
+ pos += snprintfz(str, 512 - pos, "page(%p) id=%s\n"
+ "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:",
+ page_cache_descr->page, uuid_str,
+ page_cache_descr->page_length,
+ (uint64_t)page_cache_descr->start_time,
+ (uint64_t)page_cache_descr->end_time);
+ if (!page_cache_descr->extent) {
+ pos += snprintfz(str + pos, 512 - pos, "N/A");
+ } else {
+ pos += snprintfz(str + pos, 512 - pos, "%"PRIu64, page_cache_descr->extent->offset);
+ }
+ snprintfz(str + pos, 512 - pos, " flags:0x%2.2lX refcnt:%u\n\n", page_cache_descr->flags, page_cache_descr->refcnt);
+ fputs(str, stderr);
+}
+
+int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size)
+{
+ int ret;
+ uv_fs_t req;
+ uv_stat_t* s;
+
+ ret = uv_fs_fstat(NULL, &req, file, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_fstat: %s\n", uv_strerror(ret));
+ }
+ assert(req.result == 0);
+ s = req.ptr;
+ if (!(s->st_mode & S_IFREG)) {
+ error("Not a regular file.\n");
+ uv_fs_req_cleanup(&req);
+ return UV_EINVAL;
+ }
+ if (s->st_size < min_size) {
+ error("File length is too short.\n");
+ uv_fs_req_cleanup(&req);
+ return UV_EINVAL;
+ }
+ *file_size = s->st_size;
+ uv_fs_req_cleanup(&req);
+
+ return 0;
+}
+
+char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size)
+{
+ struct page_cache *pg_cache;
+
+ pg_cache = &ctx->pg_cache;
+ snprintfz(str, size,
+ "metric_API_producers: %ld\n"
+ "metric_API_consumers: %ld\n"
+ "page_cache_total_pages: %ld\n"
+ "page_cache_populated_pages: %ld\n"
+ "page_cache_commited_pages: %ld\n"
+ "page_cache_insertions: %ld\n"
+ "page_cache_deletions: %ld\n"
+ "page_cache_hits: %ld\n"
+ "page_cache_misses: %ld\n"
+ "page_cache_backfills: %ld\n"
+ "page_cache_evictions: %ld\n"
+ "compress_before_bytes: %ld\n"
+ "compress_after_bytes: %ld\n"
+ "decompress_before_bytes: %ld\n"
+ "decompress_after_bytes: %ld\n"
+ "io_write_bytes: %ld\n"
+ "io_write_requests: %ld\n"
+ "io_read_bytes: %ld\n"
+ "io_read_requests: %ld\n"
+ "io_write_extent_bytes: %ld\n"
+ "io_write_extents: %ld\n"
+ "io_read_extent_bytes: %ld\n"
+ "io_read_extents: %ld\n"
+ "datafile_creations: %ld\n"
+ "datafile_deletions: %ld\n"
+ "journalfile_creations: %ld\n"
+ "journalfile_deletions: %ld\n",
+ (long)ctx->stats.metric_API_producers,
+ (long)ctx->stats.metric_API_consumers,
+ (long)pg_cache->page_descriptors,
+ (long)pg_cache->populated_pages,
+ (long)pg_cache->commited_page_index.nr_commited_pages,
+ (long)ctx->stats.pg_cache_insertions,
+ (long)ctx->stats.pg_cache_deletions,
+ (long)ctx->stats.pg_cache_hits,
+ (long)ctx->stats.pg_cache_misses,
+ (long)ctx->stats.pg_cache_backfills,
+ (long)ctx->stats.pg_cache_evictions,
+ (long)ctx->stats.before_compress_bytes,
+ (long)ctx->stats.after_compress_bytes,
+ (long)ctx->stats.before_decompress_bytes,
+ (long)ctx->stats.after_decompress_bytes,
+ (long)ctx->stats.io_write_bytes,
+ (long)ctx->stats.io_write_requests,
+ (long)ctx->stats.io_read_bytes,
+ (long)ctx->stats.io_read_requests,
+ (long)ctx->stats.io_write_extent_bytes,
+ (long)ctx->stats.io_write_extents,
+ (long)ctx->stats.io_read_extent_bytes,
+ (long)ctx->stats.io_read_extents,
+ (long)ctx->stats.datafile_creations,
+ (long)ctx->stats.datafile_deletions,
+ (long)ctx->stats.journalfile_creations,
+ (long)ctx->stats.journalfile_deletions
+ );
+ return str;
+}
diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h
new file mode 100644
index 000000000..bb6f072bf
--- /dev/null
+++ b/database/engine/rrdenginelib.h
@@ -0,0 +1,84 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDENGINELIB_H
+#define NETDATA_RRDENGINELIB_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct rrdeng_page_cache_descr;
+
+#define STR_HELPER(x) #x
+#define STR(x) STR_HELPER(x)
+
+/* Taken from linux kernel */
+#define BUILD_BUG_ON(condition) ((void)sizeof(char[1 - 2*!!(condition)]))
+
+#define ALIGN_BYTES_FLOOR(x) (((x) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE)
+#define ALIGN_BYTES_CEILING(x) ((((x) + RRDENG_BLOCK_SIZE - 1) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE)
+
+typedef uintptr_t rrdeng_stats_t;
+
+#ifdef __ATOMIC_RELAXED
+#define rrd_stat_atomic_add(p, n) do {(void) __atomic_fetch_add(p, n, __ATOMIC_RELAXED);} while(0)
+#else
+#define rrd_stat_atomic_add(p, n) do {(void) __sync_fetch_and_add(p, n);} while(0)
+#endif
+
+#ifndef O_DIRECT
+/* Workaround for OS X */
+#define O_DIRECT (0)
+#endif
+
+struct completion {
+ uv_mutex_t mutex;
+ uv_cond_t cond;
+ volatile unsigned completed;
+};
+
+static inline void init_completion(struct completion *p)
+{
+ p->completed = 0;
+ assert(0 == uv_cond_init(&p->cond));
+ assert(0 == uv_mutex_init(&p->mutex));
+}
+
+static inline void destroy_completion(struct completion *p)
+{
+ uv_cond_destroy(&p->cond);
+ uv_mutex_destroy(&p->mutex);
+}
+
+static inline void wait_for_completion(struct completion *p)
+{
+ uv_mutex_lock(&p->mutex);
+ while (0 == p->completed) {
+ uv_cond_wait(&p->cond, &p->mutex);
+ }
+ assert(1 == p->completed);
+ uv_mutex_unlock(&p->mutex);
+}
+
+static inline void complete(struct completion *p)
+{
+ uv_mutex_lock(&p->mutex);
+ p->completed = 1;
+ uv_mutex_unlock(&p->mutex);
+ uv_cond_broadcast(&p->cond);
+}
+
+static inline int crc32cmp(void *crcp, uLong crc)
+{
+ return (*(uint32_t *)crcp != crc);
+}
+
+static inline void crc32set(void *crcp, uLong crc)
+{
+ *(uint32_t *)crcp = crc;
+}
+
+extern void print_page_cache_descr(struct rrdeng_page_cache_descr *page_cache_descr);
+extern int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size);
+extern char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size);
+
+#endif /* NETDATA_RRDENGINELIB_H */ \ No newline at end of file