summaryrefslogtreecommitdiffstats
path: root/database/engine
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--database/engine/Makefile.am12
-rw-r--r--database/engine/README.md259
-rw-r--r--database/engine/datafile.c460
-rw-r--r--database/engine/datafile.h67
-rw-r--r--database/engine/journalfile.c515
-rw-r--r--database/engine/journalfile.h49
-rw-r--r--database/engine/metadata_log/Makefile.am8
-rw-r--r--database/engine/metadata_log/README.md0
-rw-r--r--database/engine/metadata_log/compaction.c86
-rw-r--r--database/engine/metadata_log/compaction.h14
-rw-r--r--database/engine/metadata_log/logfile.c453
-rw-r--r--database/engine/metadata_log/logfile.h39
-rw-r--r--database/engine/metadata_log/metadatalog.h28
-rwxr-xr-xdatabase/engine/metadata_log/metadatalogapi.c39
-rw-r--r--database/engine/metadata_log/metadatalogapi.h12
-rw-r--r--database/engine/metadata_log/metadatalogprotocol.h53
-rwxr-xr-xdatabase/engine/metadata_log/metalogpluginsd.c140
-rw-r--r--database/engine/metadata_log/metalogpluginsd.h33
-rw-r--r--database/engine/pagecache.c1178
-rw-r--r--database/engine/pagecache.h231
-rw-r--r--database/engine/rrddiskprotocol.h119
-rw-r--r--database/engine/rrdengine.c1266
-rw-r--r--database/engine/rrdengine.h233
-rwxr-xr-xdatabase/engine/rrdengineapi.c999
-rw-r--r--database/engine/rrdengineapi.h63
-rw-r--r--database/engine/rrdenginelib.c294
-rw-r--r--database/engine/rrdenginelib.h144
-rw-r--r--database/engine/rrdenglocking.c241
-rw-r--r--database/engine/rrdenglocking.h17
29 files changed, 7052 insertions, 0 deletions
diff --git a/database/engine/Makefile.am b/database/engine/Makefile.am
new file mode 100644
index 0000000..4340500
--- /dev/null
+++ b/database/engine/Makefile.am
@@ -0,0 +1,12 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+SUBDIRS = \
+ metadata_log \
+ $(NULL)
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/database/engine/README.md b/database/engine/README.md
new file mode 100644
index 0000000..191366a
--- /dev/null
+++ b/database/engine/README.md
@@ -0,0 +1,259 @@
+<!--
+title: "Database engine"
+description: "Netdata's highly-efficient database engine uses both RAM and disk for distributed, long-term storage of per-second metrics."
+custom_edit_url: https://github.com/netdata/netdata/edit/master/database/engine/README.md
+-->
+
+# Database engine
+
+The Database Engine works like a traditional database. It dedicates a certain amount of RAM to data caching and
+indexing, while the rest of the data resides compressed on disk. Unlike other [memory modes](/database/README.md), the
+amount of historical metrics stored is based on the amount of disk space you allocate and the effective compression
+ratio, not a fixed number of metrics collected.
+
+By using both RAM and disk space, the database engine allows for long-term storage of per-second metrics inside of the
+Agent itself.
+
+In addition, the database engine is the only memory mode that supports changing the data collection update frequency
+(`update_every`) without losing the metrics your Agent already gathered and stored.
+
+## Configuration
+
+To use the database engine, open `netdata.conf` and set `memory mode` to `dbengine`.
+
+```conf
+[global]
+ memory mode = dbengine
+```
+
+To configure the database engine, look for the `page cache size` and `dbengine multihost disk space` settings in the
+`[global]` section of your `netdata.conf`. The Agent ignores the `history` setting when using the database engine.
+
+```conf
+[global]
+ page cache size = 32
+ dbengine multihost disk space = 256
+```
+
+The above values are the default values for Page Cache size and DB engine disk space quota. Both numbers are
+in **MiB**.
+
+The `page cache size` option determines the amount of RAM in **MiB** dedicated to caching Netdata metric values. The
+actual page cache size will be slightly larger than this figure—see the [memory requirements](#memory-requirements)
+section for details.
+
+The `dbengine multihost disk space` option determines the amount of disk space in **MiB** that is dedicated to storing
+Netdata metric values and all related metadata describing them. You can use the [**database engine
+calculator**](/docs/store/change-metrics-storage.md#calculate-the-system-resources-RAM-disk-space-needed-to-store-metrics)
+to correctly set `dbengine multihost disk space` based on your metrics retention policy. The calculator gives an
+accurate estimate based on how many child nodes you have, how many metrics your Agent collects, and more.
+
+### Legacy configuration
+
+The deprecated `dbengine disk space` option determines the amount of disk space in **MiB** that is dedicated to storing
+Netdata metric values per legacy database engine instance (see [details on the legacy mode](#legacy-mode) below).
+
+```conf
+[global]
+ dbengine disk space = 256
+```
+
+### Streaming metrics to the database engine
+
+When using the multihost database engine, all parent and child nodes share the same `page cache size` and `dbengine
+multihost disk space` in a single dbengine instance. The [**database engine
+calculator**](/docs/store/change-metrics-storage.md#calculate-the-system-resources-RAM-disk-space-needed-to-store-metrics)
+helps you properly set `page cache size` and `dbengine multihost disk space` on your parent node to allocate enough
+resources based on your metrics retention policy and how many child nodes you have.
+
+#### Legacy mode
+
+_For Netdata Agents earlier than v1.23.2_, the Agent on the parent node uses one dbengine instance for itself, and
+another instance for every child node it receives metrics from. If you had four streaming nodes, you would have five
+instances in total (`1 parent + 4 child nodes = 5 instances`).
+
+The Agent allocates resources for each instance separately using the `dbengine disk space` (**deprecated**) setting. If
+`dbengine disk space` (**deprecated**) is set to the default `256`, each instance is given 256 MiB in disk space, which
+means the total disk space required to store all instances is, roughly, `256 MiB * (1 parent + 4 child nodes) = 1280 MiB`.
+
+#### Backward compatibility
+
+All existing metrics belonging to child nodes are automatically converted to legacy dbengine instances and the localhost
+metrics are transferred to the multihost dbengine instance.
+
+All new child nodes are automatically transferred to the multihost dbengine instance and share its page cache and disk
+space. If you want to migrate a child node from its legacy dbengine instance to the multihost dbengine instance, you
+must delete the instance's directory, which is located in `/var/cache/netdata/MACHINE_GUID/dbengine`, after stopping the
+Agent.
+
+##### Information
+
+For more information about setting `memory mode` on your nodes, in addition to other streaming configurations, see
+[streaming](/streaming/README.md).
+
+### Memory requirements
+
+Using memory mode `dbengine` we can overcome most memory restrictions and store a dataset that is much larger than the
+available memory.
+
+There are explicit memory requirements **per** DB engine **instance**:
+
+- The total page cache memory footprint will be an additional `#dimensions-being-collected x 4096 x 2` bytes over what
+ the user configured with `page cache size`.
+
+- an additional `#pages-on-disk x 4096 x 0.03` bytes of RAM are allocated for metadata.
+
+ - roughly speaking this is 3% of the uncompressed disk space taken by the DB files.
+
+ - for very highly compressible data (compression ratio > 90%) this RAM overhead is comparable to the disk space
+ footprint.
+
+An important observation is that RAM usage depends on both the `page cache size` and the `dbengine multihost disk space`
+options.
+
+You can use our [database engine
+calculator](/docs/store/change-metrics-storage.md#calculate-the-system-resources-RAM-disk-space-needed-to-store-metrics)
+to validate the memory requirements for your particular system(s) and configuration (**out-of-date**).
+
+### Disk space requirements
+
+There are explicit disk space requirements **per** DB engine **instance**:
+
+- The total disk space footprint will be the maximum of `#dimensions-being-collected x 4096 x 2` bytes and what
+ the user configured with `dbengine multihost disk space` or `dbengine disk space`.
+
+### File descriptor requirements
+
+The Database Engine may keep a **significant** number of files open per instance (e.g. per streaming child or
+parent server). When configuring your system you should make sure there are at least 50 file descriptors available per
+`dbengine` instance.
+
+Netdata allocates 25% of the available file descriptors to its Database Engine instances. This means that only 25% of
+the file descriptors that are available to the Netdata service are accessible by dbengine instances. You should take
+that into account when configuring your service or system-wide file descriptor limits. You can roughly estimate that the
+Netdata service needs 2048 file descriptors for every 10 streaming child hosts when streaming is configured to use
+`memory mode = dbengine`.
+
+If for example one wants to allocate 65536 file descriptors to the Netdata service on a systemd system one needs to
+override the Netdata service by running `sudo systemctl edit netdata` and creating a file with contents:
+
+```sh
+[Service]
+LimitNOFILE=65536
+```
+
+For other types of services one can add the line:
+
+```sh
+ulimit -n 65536
+```
+
+at the beginning of the service file. Alternatively you can change the system-wide limits of the kernel by changing
+ `/etc/sysctl.conf`. For Linux that would be:
+
+```conf
+fs.file-max = 65536
+```
+
+In FreeBSD and OS X you change the lines like this:
+
+```conf
+kern.maxfilesperproc=65536
+kern.maxfiles=65536
+```
+
+You can apply the settings by running `sysctl -p` or by rebooting.
+
+## Files
+
+With the DB engine memory mode the metric data are stored in database files. These files are organized in pairs, the
+datafiles and their corresponding journalfiles, e.g.:
+
+```sh
+datafile-1-0000000001.ndf
+journalfile-1-0000000001.njf
+datafile-1-0000000002.ndf
+journalfile-1-0000000002.njf
+datafile-1-0000000003.ndf
+journalfile-1-0000000003.njf
+...
+```
+
+They are located under their host's cache directory in the directory `./dbengine` (e.g. for localhost the default
+location is `/var/cache/netdata/dbengine/*`). The higher numbered filenames contain more recent metric data. The user
+can safely delete some pairs of files when Netdata is stopped to manually free up some space.
+
+_Users should_ **back up** _their `./dbengine` folders if they consider this data to be important._ You can also set up
+one or more [exporting connectors](/exporting/README.md) to send your Netdata metrics to other databases for long-term
+storage at lower granularity.
+
+## Operation
+
+The DB engine stores chart metric values in 4096-byte pages in memory. Each chart dimension gets its own page to store
+consecutive values generated from the data collectors. Those pages comprise the **Page Cache**.
+
+When those pages fill up they are slowly compressed and flushed to disk. It can take `4096 / 4 = 1024 seconds = 17
+minutes`, for a chart dimension that is being collected every 1 second, to fill a page. Pages can be cut short when we
+stop Netdata or the DB engine instance so as to not lose the data. When we query the DB engine for data we trigger disk
+read I/O requests that fill the Page Cache with the requested pages and potentially evict cold (not recently used)
+pages.
+
+When the disk quota is exceeded the oldest values are removed from the DB engine in real time, by automatically deleting
+the oldest datafile and journalfile pair. Any corresponding pages residing in the Page Cache will also be invalidated
+and removed. The DB engine logic will try to maintain between 10 and 20 file pairs at any point in time.
+
+The Database Engine uses direct I/O to avoid polluting the OS filesystem caches and does not generate excessive I/O
+traffic so as to create the minimum possible interference with other applications.
+
+## Evaluation
+
+We have evaluated the performance of the `dbengine` API that the netdata daemon uses internally. This is **not** the web
+API of netdata. Our benchmarks ran on a **single** `dbengine` instance, multiple of which can be running in a Netdata
+parent node. We used a server with an AMD Ryzen Threadripper 2950X 16-Core Processor and 2 disk drives, a Seagate
+Constellation ES.3 2TB magnetic HDD and a SAMSUNG MZQLB960HAJR-00007 960GB NAND Flash SSD.
+
+For our workload, we defined 32 charts with 128 metrics each, giving us a total of 4096 metrics. We defined 1 worker
+thread per chart (32 threads) that generates new data points with a data generation interval of 1 second. The time axis
+of the time-series is emulated and accelerated so that the worker threads can generate as many data points as possible
+without delays.
+
+We also defined 32 worker threads that perform queries on random metrics with semi-random time ranges. The
+starting time of the query is randomly selected between the beginning of the time-series and the time of the latest data
+point. The ending time is randomly selected between 1 second and 1 hour after the starting time. The pseudo-random
+numbers are generated with a uniform distribution.
+
+The data are written to the database at the same time as they are read from it. This is a concurrent read/write mixed
+workload with a duration of 60 seconds. The faster `dbengine` runs, the bigger the dataset size becomes since more
+data points will be generated. We set a page cache size of 64MiB for the two disk-bound scenarios. This way, the dataset
+size of the metric data is much bigger than the RAM that is being used for caching so as to trigger I/O requests most
+of the time. In our final scenario, we set the page cache size to 16 GiB. That way, the dataset fits in the page cache
+so as to avoid all disk bottlenecks.
+
+The reported numbers are the following:
+
+| device | page cache | dataset | reads/sec | writes/sec |
+| :----: | :--------: | ------: | --------: | ---------: |
+| HDD | 64 MiB | 4.1 GiB | 813K | 18.0M |
+| SSD | 64 MiB | 9.8 GiB | 1.7M | 43.0M |
+| N/A | 16 GiB | 6.8 GiB | 118.2M | 30.2M |
+
+where "reads/sec" is the number of metric data points being read from the database via its API per second and
+"writes/sec" is the number of metric data points being written to the database per second.
+
+Notice that the HDD numbers are pretty high and not much slower than the SSD numbers. This is thanks to the database
+engine design being optimized for rotating media. In the database engine disk I/O requests are:
+
+- asynchronous to mask the high I/O latency of HDDs.
+- mostly large to reduce the amount of HDD seeking time.
+- mostly sequential to reduce the amount of HDD seeking time.
+- compressed to reduce the amount of required throughput.
+
+As a result, the HDD is not thousands of times slower than the SSD, which is typical for other workloads.
+
+An interesting observation to make is that the CPU-bound run (16 GiB page cache) generates less data than the SSD run
+(6.8 GiB vs 9.8 GiB). The reason is that the 32 reader threads in the SSD scenario are more frequently blocked by I/O,
+and generate a read load of 1.7M/sec, whereas in the CPU-bound scenario the read load is 70 times higher at 118M/sec.
+Consequently, there is a significant degree of interference by the reader threads, which slows down the writer threads.
+This is also possible because the interference effects are greater than the SSD impact on data generation throughput.
+
+[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fdatabase%2Fengine%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>)
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
new file mode 100644
index 0000000..7a052f9
--- /dev/null
+++ b/database/engine/datafile.c
@@ -0,0 +1,460 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+/*
+ * Appends an extent to the tail of the extent list of the datafile it
+ * points to (extent->datafile). The extent's own `next` link is not touched.
+ */
+void df_extent_insert(struct extent_info *extent)
+{
+    struct rrdengine_datafile *owner = extent->datafile;
+    struct extent_info *tail = owner->extents.last;
+
+    /* an empty list gets the new extent as its head */
+    if (unlikely(NULL == owner->extents.first)) {
+        owner->extents.first = extent;
+    }
+    /* otherwise chain it after the current tail */
+    if (likely(NULL != tail)) {
+        tail->next = extent;
+    }
+    owner->extents.last = extent;
+}
+
+/*
+ * Appends a datafile to the tail of the instance's datafile list.
+ * The datafile's own `next` link is not touched.
+ */
+void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+{
+    struct rrdengine_datafile *tail = ctx->datafiles.last;
+
+    /* an empty list gets the new datafile as its head */
+    if (unlikely(NULL == ctx->datafiles.first)) {
+        ctx->datafiles.first = datafile;
+    }
+    /* otherwise chain it after the current tail */
+    if (likely(NULL != tail)) {
+        tail->next = datafile;
+    }
+    ctx->datafiles.last = datafile;
+}
+
+/*
+ * Unlinks a datafile from the head of the instance's datafile list.
+ * Only the first list element may be removed and it must not be the only
+ * one; anything else trips the fatal assertion. The datafile is not freed.
+ */
+void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+{
+    struct rrdengine_datafile *next = datafile->next;
+
+    fatal_assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile));
+    /* the successor becomes the new head of the list */
+    ctx->datafiles.first = next;
+}
+
+
+/*
+ * Puts a datafile descriptor in its initial state: no open file, zero
+ * write position, empty extent list, no journal file, not linked in any list.
+ * Only tier 1 is accepted; any other tier trips the fatal assertion.
+ */
+static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_instance *ctx,
+                          unsigned tier, unsigned fileno)
+{
+    fatal_assert(tier == 1);
+
+    /* identity */
+    datafile->ctx = ctx;
+    datafile->tier = tier;
+    datafile->fileno = fileno;
+
+    /* file state */
+    datafile->file = (uv_file)0;
+    datafile->pos = 0;
+
+    /* links; the extent list will be populated by journalfile */
+    datafile->extents.first = NULL;
+    datafile->extents.last = NULL;
+    datafile->journalfile = NULL;
+    datafile->next = NULL;
+}
+
+/*
+ * Formats the on-disk path of a datafile into str (at most maxlen bytes,
+ * including the terminator): the instance's dbfiles_path, a slash, then the
+ * datafile prefix, tier, file number and extension.
+ * The snprintf() result is discarded, so truncation is not reported.
+ */
+void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+{
+    struct rrdengine_instance *instance = datafile->ctx;
+
+    (void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION,
+                    instance->dbfiles_path, datafile->tier, datafile->fileno);
+}
+
+/*
+ * Closes the datafile's descriptor with a libuv request issued without a
+ * callback. A failure is logged and counted in both the per-instance and the
+ * global filesystem error statistics.
+ * Returns the uv_fs_close() result (negative on failure).
+ */
+int close_data_file(struct rrdengine_datafile *datafile)
+{
+    char filename[RRDENG_PATH_MAX];
+    uv_fs_t close_req;
+    int rc;
+
+    /* the path is only needed for the diagnostic message */
+    generate_datafilepath(datafile, filename, sizeof(filename));
+
+    rc = uv_fs_close(NULL, &close_req, datafile->file, NULL);
+    if (rc < 0) {
+        struct rrdengine_instance *instance = datafile->ctx;
+
+        error("uv_fs_close(%s): %s", filename, uv_strerror(rc));
+        ++instance->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+    }
+    uv_fs_req_cleanup(&close_req);
+
+    return rc;
+}
+
+/*
+ * Removes the datafile's path from the filesystem. The file descriptor is
+ * not touched; callers close it separately.
+ * A failure is logged and counted in the per-instance and global filesystem
+ * error statistics. The datafile_deletions counter is incremented whether or
+ * not the unlink succeeded.
+ * Returns the uv_fs_unlink() result (negative on failure).
+ */
+int unlink_data_file(struct rrdengine_datafile *datafile)
+{
+    struct rrdengine_instance *ctx = datafile->ctx;
+    uv_fs_t req;
+    int ret;
+    char path[RRDENG_PATH_MAX];
+
+    generate_datafilepath(datafile, path, sizeof(path));
+
+    ret = uv_fs_unlink(NULL, &req, path, NULL);
+    if (ret < 0) {
+        /* name the call that actually failed (was misspelled "uv_fs_fsunlink") */
+        error("uv_fs_unlink(%s): %s", path, uv_strerror(ret));
+        ++ctx->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+    }
+    uv_fs_req_cleanup(&req);
+
+    ++ctx->stats.datafile_deletions;
+
+    return ret;
+}
+
+/*
+ * Disposes of an open datafile in three steps: truncate it to zero length,
+ * close its descriptor, then unlink its path. Every step is attempted even
+ * if an earlier one failed; each failure is logged and counted in the
+ * per-instance and global filesystem error statistics.
+ * The datafile_deletions counter is always incremented.
+ * Returns the result of the final uv_fs_unlink() only (negative on failure);
+ * truncate/close failures are not reflected in the return value.
+ */
+int destroy_data_file(struct rrdengine_datafile *datafile)
+{
+    struct rrdengine_instance *ctx = datafile->ctx;
+    uv_fs_t req;
+    int ret;
+    char path[RRDENG_PATH_MAX];
+
+    generate_datafilepath(datafile, path, sizeof(path));
+
+    ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL);
+    if (ret < 0) {
+        error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+        ++ctx->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+    }
+    uv_fs_req_cleanup(&req);
+
+    ret = uv_fs_close(NULL, &req, datafile->file, NULL);
+    if (ret < 0) {
+        error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+        ++ctx->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+    }
+    uv_fs_req_cleanup(&req);
+
+    ret = uv_fs_unlink(NULL, &req, path, NULL);
+    if (ret < 0) {
+        /* name the call that actually failed (was misspelled "uv_fs_fsunlink") */
+        error("uv_fs_unlink(%s): %s", path, uv_strerror(ret));
+        ++ctx->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+    }
+    uv_fs_req_cleanup(&req);
+
+    ++ctx->stats.datafile_deletions;
+
+    return ret;
+}
+
+/*
+ * Creates (or truncates) the datafile on disk and writes its superblock at
+ * offset 0.
+ *
+ * On success datafile->file holds the open descriptor, datafile->pos points
+ * just past the superblock, the I/O statistics are updated and 0 is returned.
+ * If the file cannot be opened the negative open result is returned.
+ * If the superblock write fails the file is destroyed (truncated, closed and
+ * unlinked) and the negative write result is returned.
+ * Failure to allocate the aligned superblock buffer is fatal.
+ */
+int create_data_file(struct rrdengine_datafile *datafile)
+{
+    struct rrdengine_instance *ctx = datafile->ctx;
+    uv_fs_t req;
+    uv_file file;
+    int ret, fd;
+    struct rrdeng_df_sb *superblock;
+    uv_buf_t iov;
+    char path[RRDENG_PATH_MAX];
+
+    generate_datafilepath(datafile, path, sizeof(path));
+    fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
+    if (fd < 0) {
+        ++ctx->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+        return fd;
+    }
+    datafile->file = file;
+    ++ctx->stats.datafile_creations;
+
+    ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+    if (unlikely(ret)) {
+        fatal("posix_memalign:%s", strerror(ret));
+    }
+    /*
+     * posix_memalign() returns uninitialised memory and the whole structure
+     * is written out below: clear it so no stale heap bytes reach the disk.
+     */
+    memset(superblock, 0, sizeof(*superblock));
+    /* fixed-width on-disk fields: strncpy() zero-pads, a terminator is not required */
+    (void) strncpy(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ);
+    (void) strncpy(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ);
+    superblock->tier = 1;
+
+    iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+    ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
+    if (ret < 0) {
+        fatal_assert(req.result < 0);
+        error("uv_fs_write: %s", uv_strerror(ret));
+        ++ctx->stats.io_errors;
+        rrd_stat_atomic_add(&global_io_errors, 1);
+    }
+    uv_fs_req_cleanup(&req);
+    free(superblock);
+    if (ret < 0) {
+        /* do not leave a datafile without a valid superblock behind */
+        destroy_data_file(datafile);
+        return ret;
+    }
+
+    datafile->pos = sizeof(*superblock);
+    ctx->stats.io_write_bytes += sizeof(*superblock);
+    ++ctx->stats.io_write_requests;
+
+    return 0;
+}
+
+/*
+ * Reads the superblock from offset 0 of an open datafile and validates its
+ * magic number, version and tier (which must be 1).
+ * Returns 0 when the superblock is valid, UV_EINVAL when it is not, or the
+ * negative uv_fs_read() result when the read fails.
+ * Failure to allocate the aligned read buffer is fatal.
+ */
+static int check_data_file_superblock(uv_file file)
+{
+    struct rrdeng_df_sb *sb;
+    uv_fs_t req;
+    uv_buf_t buf;
+    int rc;
+
+    rc = posix_memalign((void *)&sb, RRDFILE_ALIGNMENT, sizeof(*sb));
+    if (unlikely(rc)) {
+        fatal("posix_memalign:%s", strerror(rc));
+    }
+    buf = uv_buf_init((void *)sb, sizeof(*sb));
+
+    rc = uv_fs_read(NULL, &req, file, &buf, 1, 0, NULL);
+    if (rc < 0) {
+        error("uv_fs_read: %s", uv_strerror(rc));
+        uv_fs_req_cleanup(&req);
+        free(sb);
+        return rc;
+    }
+    fatal_assert(req.result >= 0);
+    uv_fs_req_cleanup(&req);
+
+    if (0 == strncmp(sb->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) &&
+        0 == strncmp(sb->version, RRDENG_DF_VER, RRDENG_VER_SZ) &&
+        1 == sb->tier) {
+        rc = 0;
+    } else {
+        error("File has invalid superblock.");
+        rc = UV_EINVAL;
+    }
+
+    free(sb);
+    return rc;
+}
+
+/*
+ * Opens an existing datafile and validates it: file properties first
+ * (with the superblock size passed as a parameter), then the superblock.
+ * On success datafile->file holds the open descriptor, datafile->pos is
+ * set to the file size rounded up by ALIGN_BYTES_CEILING(), and 0 is returned.
+ * On a validation failure the file is closed again and the validation error
+ * is returned; if the file cannot be opened the negative open result is
+ * returned.
+ */
+static int load_data_file(struct rrdengine_datafile *datafile)
+{
+    struct rrdengine_instance *instance = datafile->ctx;
+    char filename[RRDENG_PATH_MAX];
+    uint64_t size;
+    uv_file handle;
+    uv_fs_t close_req;
+    int rc, close_rc;
+
+    generate_datafilepath(datafile, filename, sizeof(filename));
+    rc = open_file_direct_io(filename, O_RDWR, &handle);
+    if (rc < 0) {
+        ++instance->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+        return rc;
+    }
+    info("Initializing data file \"%s\".", filename);
+
+    rc = check_file_properties(handle, &size, sizeof(struct rrdeng_df_sb));
+    if (rc)
+        goto close_and_fail;
+    size = ALIGN_BYTES_CEILING(size);
+
+    rc = check_data_file_superblock(handle);
+    if (rc)
+        goto close_and_fail;
+    /* account for the superblock read performed by the check above */
+    instance->stats.io_read_bytes += sizeof(struct rrdeng_df_sb);
+    ++instance->stats.io_read_requests;
+
+    datafile->file = handle;
+    datafile->pos = size;
+
+    info("Data file \"%s\" initialized (size:%"PRIu64").", filename, size);
+    return 0;
+
+close_and_fail:
+    /* report the validation error, not the outcome of the close */
+    close_rc = uv_fs_close(NULL, &close_req, handle, NULL);
+    if (close_rc < 0) {
+        error("uv_fs_close(%s): %s", filename, uv_strerror(close_rc));
+        ++instance->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+    }
+    uv_fs_req_cleanup(&close_req);
+    return rc;
+}
+
+/*
+ * qsort() comparator over an array of datafile pointers: orders the
+ * datafiles by comparing their generated paths with strcmp().
+ */
+static int scan_data_files_cmp(const void *a, const void *b)
+{
+    struct rrdengine_datafile *const *lhs = a;
+    struct rrdengine_datafile *const *rhs = b;
+    char lhs_path[RRDENG_PATH_MAX];
+    char rhs_path[RRDENG_PATH_MAX];
+
+    generate_datafilepath(*lhs, lhs_path, sizeof(lhs_path));
+    generate_datafilepath(*rhs, rhs_path, sizeof(rhs_path));
+
+    return strcmp(lhs_path, rhs_path);
+}
+
+/*
+ * Scans ctx->dbfiles_path for datafiles, loads every datafile/journalfile
+ * pair found and appends the valid ones to the instance's datafile list.
+ *
+ * - Directory entries are matched against the datafile name template; at
+ *   most MAX_DATAFILES matches are considered.
+ * - Matches are sorted by path and ctx->last_fileno is taken from the last
+ *   one (before any pair is discarded).
+ * - A pair whose datafile or journalfile fails to load is unlinked from the
+ *   filesystem and dropped.
+ * - ctx->disk_space grows by the sizes of every pair that was kept.
+ *
+ * Returns number of datafiles that were loaded or < 0 on error
+ */
+static int scan_data_files(struct rrdengine_instance *ctx)
+{
+    int ret;
+    unsigned tier, no, matched_files, i, failed_to_load;
+    /*
+     * Automatic storage on purpose: the request is only used by the
+     * scandir / scandir_next / cleanup calls below, and a function-local
+     * static would be shared by every instance being scanned.
+     */
+    uv_fs_t req;
+    uv_dirent_t dent;
+    struct rrdengine_datafile **datafiles, *datafile;
+    struct rrdengine_journalfile *journalfile;
+
+    ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL);
+    if (ret < 0) {
+        fatal_assert(req.result < 0);
+        uv_fs_req_cleanup(&req);
+        error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret));
+        ++ctx->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+        return ret;
+    }
+    info("Found %d files in path %s", ret, ctx->dbfiles_path);
+
+    /* first pass: collect every directory entry that looks like a datafile */
+    datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
+    for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
+        info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name);
+        ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
+        if (2 == ret) {
+            info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name);
+            datafile = mallocz(sizeof(*datafile));
+            datafile_init(datafile, ctx, tier, no);
+            datafiles[matched_files++] = datafile;
+        }
+    }
+    uv_fs_req_cleanup(&req);
+
+    if (0 == matched_files) {
+        freez(datafiles);
+        return 0;
+    }
+    if (matched_files == MAX_DATAFILES) {
+        error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
+    }
+    qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
+    /* TODO: change this when tiering is implemented */
+    ctx->last_fileno = datafiles[matched_files - 1]->fileno;
+
+    /* second pass: load each pair in sorted order, discarding invalid ones */
+    for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
+        uint8_t must_delete_pair = 0;
+
+        datafile = datafiles[i];
+        ret = load_data_file(datafile);
+        if (0 != ret) {
+            must_delete_pair = 1;
+        }
+        /* the journal is loaded even when the datafile failed to load */
+        journalfile = mallocz(sizeof(*journalfile));
+        datafile->journalfile = journalfile;
+        journalfile_init(journalfile, datafile);
+        ret = load_journal_file(ctx, journalfile, datafile);
+        if (0 != ret) {
+            if (!must_delete_pair) /* If datafile is still open close it */
+                close_data_file(datafile);
+            must_delete_pair = 1;
+        }
+        if (must_delete_pair) {
+            char path[RRDENG_PATH_MAX];
+
+            error("Deleting invalid data and journal file pair.");
+            ret = unlink_journal_file(journalfile);
+            if (!ret) {
+                generate_journalfilepath(datafile, path, sizeof(path));
+                info("Deleted journal file \"%s\".", path);
+            }
+            ret = unlink_data_file(datafile);
+            if (!ret) {
+                generate_datafilepath(datafile, path, sizeof(path));
+                info("Deleted data file \"%s\".", path);
+            }
+            freez(journalfile);
+            freez(datafile);
+            ++failed_to_load;
+            continue;
+        }
+
+        datafile_list_insert(ctx, datafile);
+        ctx->disk_space += datafile->pos + journalfile->pos;
+    }
+    matched_files -= failed_to_load;
+    freez(datafiles);
+
+    return matched_files;
+}
+
+/*
+ * Creates a datafile and a journalfile pair.
+ * On success the datafile is appended to the instance's datafile list, the
+ * instance's disk_space grows by the size of both files and 0 is returned.
+ * On failure nothing is added to the list, the allocated descriptors are
+ * freed (a datafile that was already created is destroyed) and the error of
+ * the failing step is returned.
+ */
+int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
+{
+    char filename[RRDENG_PATH_MAX];
+    struct rrdengine_datafile *df;
+    struct rrdengine_journalfile *jf;
+    int rc;
+
+    info("Creating new data and journal files in path %s", ctx->dbfiles_path);
+
+    df = mallocz(sizeof(*df));
+    datafile_init(df, ctx, tier, fileno);
+    rc = create_data_file(df);
+    if (rc) {
+        freez(df);
+        return rc;
+    }
+    generate_datafilepath(df, filename, sizeof(filename));
+    info("Created data file \"%s\".", filename);
+
+    jf = mallocz(sizeof(*jf));
+    df->journalfile = jf;
+    journalfile_init(jf, df);
+    rc = create_journal_file(jf, df);
+    if (rc) {
+        /* roll back the datafile that was just created */
+        destroy_data_file(df);
+        freez(jf);
+        freez(df);
+        return rc;
+    }
+    generate_journalfilepath(df, filename, sizeof(filename));
+    info("Created journal file \"%s\".", filename);
+
+    datafile_list_insert(ctx, df);
+    ctx->disk_space += df->pos + jf->pos;
+
+    return 0;
+}
+
+/*
+ * Page cache must already be initialized.
+ * Loads the existing datafile pairs of the instance; when none could be
+ * loaded, creates the first pair (tier 1, file number 1).
+ * Return 0 on success.
+ */
+int init_data_files(struct rrdengine_instance *ctx)
+{
+    int loaded, rc;
+
+    loaded = scan_data_files(ctx);
+    if (loaded < 0) {
+        error("Failed to scan path \"%s\".", ctx->dbfiles_path);
+        return loaded;
+    }
+    if (loaded > 0) {
+        /* at least one valid pair exists, nothing to create */
+        return 0;
+    }
+
+    info("Data files not found, creating in path \"%s\".", ctx->dbfiles_path);
+    rc = create_new_datafile_pair(ctx, 1, 1);
+    if (rc) {
+        error("Failed to create data and journal files in path \"%s\".", ctx->dbfiles_path);
+        return rc;
+    }
+    ctx->last_fileno = 1;
+
+    return 0;
+}
+
+/*
+ * Tears down every datafile of the instance, oldest first: frees its
+ * extents, closes the journal file and the data file, then frees both
+ * descriptors. The list head/tail pointers in ctx are left untouched.
+ */
+void finalize_data_files(struct rrdengine_instance *ctx)
+{
+    struct rrdengine_datafile *df = ctx->datafiles.first;
+
+    while (df != NULL) {
+        /* fetch everything needed from df before it is freed */
+        struct rrdengine_datafile *following = df->next;
+        struct rrdengine_journalfile *jf = df->journalfile;
+        struct extent_info *ext = df->extents.first;
+
+        while (ext != NULL) {
+            struct extent_info *following_ext = ext->next;
+
+            freez(ext);
+            ext = following_ext;
+        }
+        close_journal_file(jf, df);
+        close_data_file(df);
+        freez(jf);
+        freez(df);
+
+        df = following;
+    }
+} \ No newline at end of file
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
new file mode 100644
index 0000000..ae94bfd
--- /dev/null
+++ b/database/engine/datafile.h
@@ -0,0 +1,67 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_DATAFILE_H
+#define NETDATA_DATAFILE_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct rrdengine_datafile;
+struct rrdengine_journalfile;
+struct rrdengine_instance;
+
+#define DATAFILE_PREFIX "datafile-" /* leading part of every datafile name */
+#define DATAFILE_EXTENSION ".ndf" /* trailing part of every datafile name */
+
+#define MAX_DATAFILE_SIZE (1073741824LU) /* 1 GiB */
+#define MIN_DATAFILE_SIZE (4194304LU) /* 4 MiB */
+#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */
+#define TARGET_DATAFILES (20) /* NOTE(review): presumably the number of file pairs the engine aims for - confirm against users */
+
+#define DATAFILE_IDEAL_IO_SIZE (1048576U) /* 1 MiB */
+
+struct extent_info { /* one node of a datafile's singly linked extent list */
+ uint64_t offset; /* NOTE(review): presumably the extent's byte offset inside the datafile - confirm */
+ uint32_t size; /* NOTE(review): presumably the extent's size in bytes - confirm */
+ uint8_t number_of_pages; /* NOTE(review): presumably the number of entries in pages[] - confirm */
+ struct rrdengine_datafile *datafile; /* datafile whose extent list df_extent_insert() appends this node to */
+ struct extent_info *next; /* next extent in the list, followed until NULL when the list is freed */
+ struct rrdeng_page_descr *pages[]; /* flexible array member of page descriptor pointers */
+};
+
+struct rrdengine_df_extents { /* head and tail of a datafile's extent list */
+ /* the extent list is sorted based on disk offset */
+ struct extent_info *first; /* head; NULL while the list is empty */
+ struct extent_info *last; /* tail; new extents are appended after it */
+};
+
+/* only one event loop is supported for now */
+struct rrdengine_datafile { /* in-memory descriptor of one datafile */
+ unsigned tier; /* datafile_init() only accepts tier 1 */
+ unsigned fileno; /* file number, part of the generated file name */
+ uv_file file; /* libuv file handle; set when the file is created or loaded */
+ uint64_t pos; /* size of the superblock after creation, aligned file size after loading */
+ struct rrdengine_instance *ctx; /* instance this datafile belongs to */
+ struct rrdengine_df_extents extents; /* extents stored in this datafile */
+ struct rrdengine_journalfile *journalfile; /* journal file paired with this datafile */
+ struct rrdengine_datafile *next; /* next datafile in the instance's list */
+};
+
+struct rrdengine_datafile_list { /* head and tail of an instance's datafile list; insertion appends at the tail */
+ struct rrdengine_datafile *first; /* oldest */
+ struct rrdengine_datafile *last; /* newest */
+};
+
+extern void df_extent_insert(struct extent_info *extent); /* append extent to the list of extent->datafile */
+extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); /* append datafile to ctx's list */
+extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); /* unlink the first (and not the only) datafile */
+extern void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); /* format the datafile's path into str */
+extern int close_data_file(struct rrdengine_datafile *datafile); /* close the file handle; negative on failure */
+extern int unlink_data_file(struct rrdengine_datafile *datafile); /* remove the path; negative on failure */
+extern int destroy_data_file(struct rrdengine_datafile *datafile); /* truncate, close and unlink; returns the unlink result */
+extern int create_data_file(struct rrdengine_datafile *datafile); /* create the file and write its superblock; 0 on success */
+extern int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); /* create datafile + journalfile; 0 on success */
+extern int init_data_files(struct rrdengine_instance *ctx); /* load existing pairs or create the first one; 0 on success */
+extern void finalize_data_files(struct rrdengine_instance *ctx); /* close and free every datafile of ctx */
+
+#endif /* NETDATA_DATAFILE_H */ \ No newline at end of file
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
new file mode 100644
index 0000000..9fecc48
--- /dev/null
+++ b/database/engine/journalfile.c
@@ -0,0 +1,515 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+/*
+ * libuv completion callback for the journal block write issued by
+ * wal_flush_transaction_buffer().
+ *
+ * Accounts for I/O errors, then releases the written buffer and its I/O descriptor.
+ */
+static void flush_transaction_buffer_cb(uv_fs_t* req)
+{
+    struct generic_io_descriptor *io_descr = req->data;
+    struct rrdengine_worker_config* wc = req->loop->data;
+    struct rrdengine_instance *ctx = wc->ctx;
+
+    /* Report success only after the result was inspected; a failed write must not be logged as written. */
+    if (req->result < 0) {
+        ++ctx->stats.io_errors;
+        rrd_stat_atomic_add(&global_io_errors, 1);
+        error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
+    } else {
+        debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
+    }
+
+    uv_fs_req_cleanup(req);
+    /* the buffer was obtained with posix_memalign(), hence plain free() */
+    free(io_descr->buf);
+    freez(io_descr);
+}
+
+/*
+ * Submits the outstanding transaction buffer as an asynchronous write to the
+ * journal of the newest data file and detaches it from the commit log.
+ * Does nothing when there is no buffer or nothing was placed in it.
+ *
+ * Careful to always call this before creating a new journal file.
+ */
+void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc)
+{
+    struct rrdengine_instance *ctx = wc->ctx;
+    struct rrdengine_journalfile *journalfile;
+    struct generic_io_descriptor *io_descr;
+    unsigned used_bytes, total_bytes;
+    uint64_t file_offset;
+    int ret;
+
+    if (unlikely(NULL == ctx->commit_log.buf)) {
+        return;
+    }
+    if (unlikely(0 == ctx->commit_log.buf_pos)) {
+        return;
+    }
+
+    /* care with outstanding transactions when switching journal files */
+    journalfile = ctx->datafiles.last->journalfile;
+    file_offset = journalfile->pos;
+
+    io_descr = mallocz(sizeof(*io_descr));
+    used_bytes = ctx->commit_log.buf_pos;
+    total_bytes = ctx->commit_log.buf_size;
+    if (used_bytes < total_bytes) {
+        /* simulate an empty transaction to skip the rest of the block */
+        ((uint8_t *)ctx->commit_log.buf)[used_bytes] = STORE_PADDING;
+    }
+
+    io_descr->buf = ctx->commit_log.buf;
+    io_descr->bytes = total_bytes;
+    io_descr->pos = file_offset;
+    io_descr->req.data = io_descr;
+    io_descr->completion = NULL;
+    io_descr->iov = uv_buf_init((void *)io_descr->buf, total_bytes);
+
+    ret = uv_fs_write(wc->loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
+                      file_offset, flush_transaction_buffer_cb);
+    fatal_assert(-1 != ret);
+
+    /* the completion callback owns the buffer from here on */
+    ctx->commit_log.buf = NULL;
+
+    journalfile->pos += RRDENG_BLOCK_SIZE;
+    ctx->disk_space += RRDENG_BLOCK_SIZE;
+    ctx->stats.io_write_bytes += RRDENG_BLOCK_SIZE;
+    ++ctx->stats.io_write_requests;
+}
+
+/*
+ * Reserves size bytes in the outstanding transaction buffer and returns a
+ * pointer to the reserved region. When the current buffer cannot hold the
+ * request it is flushed and a fresh aligned buffer is allocated.
+ * size must be non-zero.
+ */
+void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size)
+{
+    struct rrdengine_instance *ctx = wc->ctx;
+    unsigned offset = 0;
+
+    fatal_assert(size);
+
+    if (NULL != ctx->commit_log.buf) {
+        offset = ctx->commit_log.buf_pos;
+        if (size > ctx->commit_log.buf_size - offset) {
+            /* we need a new buffer */
+            wal_flush_transaction_buffer(wc);
+        }
+    }
+
+    if (NULL == ctx->commit_log.buf) {
+        unsigned alloc_size = ALIGN_BYTES_CEILING(size);
+        int rc = posix_memalign((void *)&ctx->commit_log.buf, RRDFILE_ALIGNMENT, alloc_size);
+
+        if (unlikely(rc)) {
+            fatal("posix_memalign:%s", strerror(rc));
+        }
+        offset = 0;
+        ctx->commit_log.buf_pos = 0;
+        ctx->commit_log.buf_size = alloc_size;
+    }
+
+    ctx->commit_log.buf_pos += size;
+    return ctx->commit_log.buf + offset;
+}
+
+/*
+ * Formats the path of the journal file paired with datafile into str
+ * (at most maxlen bytes, including the terminator).
+ */
+void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+{
+    const char *dir = datafile->ctx->dbfiles_path;
+    unsigned tier = datafile->tier;
+    unsigned fileno = datafile->fileno;
+
+    (void) snprintf(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
+                    dir, tier, fileno);
+}
+
+/* Resets journalfile to an empty, unopened state and binds it to datafile. */
+void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+    journalfile->datafile = datafile;
+    journalfile->pos = 0;
+    journalfile->file = (uv_file)0;
+}
+
+/*
+ * Closes the journal file handle synchronously.
+ * Returns the libuv result of the close; failures are logged and counted.
+ */
+int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+    char path[RRDENG_PATH_MAX];
+    uv_fs_t req;
+    int ret;
+
+    /* the path is only needed for the error message */
+    generate_journalfilepath(datafile, path, sizeof(path));
+
+    ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+    if (ret < 0) {
+        error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+        ++datafile->ctx->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+    }
+    uv_fs_req_cleanup(&req);
+
+    return ret;
+}
+
+/*
+ * Removes the journal file from the file system synchronously.
+ * Returns the libuv result of the unlink; failures are logged and counted.
+ * The deletion counter is incremented regardless of the outcome.
+ */
+int unlink_journal_file(struct rrdengine_journalfile *journalfile)
+{
+    struct rrdengine_instance *ctx = journalfile->datafile->ctx;
+    char path[RRDENG_PATH_MAX];
+    uv_fs_t req;
+    int ret;
+
+    generate_journalfilepath(journalfile->datafile, path, sizeof(path));
+
+    ret = uv_fs_unlink(NULL, &req, path, NULL);
+    if (ret < 0) {
+        error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+        ++ctx->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+    }
+    uv_fs_req_cleanup(&req);
+
+    ++ctx->stats.journalfile_deletions;
+
+    return ret;
+}
+
+/*
+ * Truncates, closes and unlinks the journal file, in that order.
+ * Each failing step is logged and counted but does not stop the next one.
+ * Returns the libuv result of the final unlink.
+ */
+int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+    struct rrdengine_instance *ctx = datafile->ctx;
+    char path[RRDENG_PATH_MAX];
+    uv_fs_t req;
+    int ret;
+
+    generate_journalfilepath(datafile, path, sizeof(path));
+
+    /* step 1: drop the contents */
+    ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
+    if (ret < 0) {
+        error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+        ++ctx->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+    }
+    uv_fs_req_cleanup(&req);
+
+    /* step 2: release the handle */
+    ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+    if (ret < 0) {
+        error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+        ++ctx->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+    }
+    uv_fs_req_cleanup(&req);
+
+    /* step 3: remove the directory entry */
+    ret = uv_fs_unlink(NULL, &req, path, NULL);
+    if (ret < 0) {
+        error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+        ++ctx->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+    }
+    uv_fs_req_cleanup(&req);
+
+    ++ctx->stats.journalfile_deletions;
+
+    return ret;
+}
+
+/*
+ * Creates (or truncates) the journal file paired with datafile and writes its
+ * superblock at offset 0.
+ *
+ * Returns 0 on success or a negative value on failure. When the superblock
+ * write fails the partially created file is destroyed.
+ */
+int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+    struct rrdengine_instance *ctx = datafile->ctx;
+    uv_fs_t req;
+    uv_file file;
+    int ret, fd;
+    struct rrdeng_jf_sb *superblock;
+    uv_buf_t iov;
+    char path[RRDENG_PATH_MAX];
+
+    generate_journalfilepath(datafile, path, sizeof(path));
+    fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
+    if (fd < 0) {
+        ++ctx->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+        return fd;
+    }
+    journalfile->file = file;
+    ++ctx->stats.journalfile_creations;
+
+    ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+    if (unlikely(ret)) {
+        fatal("posix_memalign:%s", strerror(ret));
+    }
+    /* posix_memalign() returns uninitialized memory; zero it so no stale heap contents reach the disk */
+    memset(superblock, 0, sizeof(*superblock));
+    /* fixed-width on-disk fields: strncpy() is used for its padding behaviour, not for termination */
+    (void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ);
+    (void) strncpy(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ);
+
+    iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+    ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
+    if (ret < 0) {
+        fatal_assert(req.result < 0);
+        error("uv_fs_write: %s", uv_strerror(ret));
+        ++ctx->stats.io_errors;
+        rrd_stat_atomic_add(&global_io_errors, 1);
+    }
+    uv_fs_req_cleanup(&req);
+    free(superblock);
+    if (ret < 0) {
+        destroy_journal_file(journalfile, datafile);
+        return ret;
+    }
+
+    /* transactions are appended right after the superblock */
+    journalfile->pos = sizeof(*superblock);
+    ctx->stats.io_write_bytes += sizeof(*superblock);
+    ++ctx->stats.io_write_requests;
+
+    return 0;
+}
+
+/*
+ * Reads the superblock at offset 0 of file and validates its magic number and
+ * version. Returns 0 when both match, UV_EINVAL when they do not, or the
+ * negative libuv error of a failed read.
+ */
+static int check_journal_file_superblock(uv_file file)
+{
+    struct rrdeng_jf_sb *superblock;
+    uv_buf_t iov;
+    uv_fs_t req;
+    int ret;
+
+    ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+    if (unlikely(ret)) {
+        fatal("posix_memalign:%s", strerror(ret));
+    }
+    iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+    ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
+    if (ret < 0) {
+        error("uv_fs_read: %s", uv_strerror(ret));
+        uv_fs_req_cleanup(&req);
+        free(superblock);
+        return ret;
+    }
+    fatal_assert(req.result >= 0);
+    uv_fs_req_cleanup(&req);
+
+    if (0 == strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) &&
+        0 == strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) {
+        ret = 0;
+    } else {
+        error("File has invalid superblock.");
+        ret = UV_EINVAL;
+    }
+
+    free(superblock);
+    return ret;
+}
+
+/*
+ * Rebuilds the in-memory metadata of one extent from a STORE_DATA journal payload.
+ *
+ * buf points to the payload (a struct rrdeng_jf_store_data followed by its page
+ * descriptors) and max_size is the payload length available. For every page of a
+ * known type a page descriptor is created, attached to the extent and inserted
+ * into the page cache; the metric's page index is created on first sight of its UUID.
+ * The extent is linked to the journal's data file when at least one page was valid.
+ */
+static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ void *buf, unsigned max_size)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned i, count, payload_length, descr_size, valid_pages;
+ struct rrdeng_page_descr *descr;
+ struct extent_info *extent;
+ /* persistent structures */
+ struct rrdeng_jf_store_data *jf_metric_data;
+
+ jf_metric_data = buf;
+ count = jf_metric_data->number_of_pages;
+ descr_size = sizeof(*jf_metric_data->descr) * count;
+ payload_length = sizeof(*jf_metric_data) + descr_size;
+ /* the descriptors announced by number_of_pages must fit in the payload we were given */
+ if (payload_length > max_size) {
+ error("Corrupted transaction payload.");
+ return;
+ }
+
+ /* room for the worst case: every announced page turns out to be valid */
+ extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0]));
+ extent->offset = jf_metric_data->extent_offset;
+ extent->size = jf_metric_data->extent_size;
+ extent->datafile = journalfile->datafile;
+ extent->next = NULL;
+
+ for (i = 0, valid_pages = 0 ; i < count ; ++i) {
+ uuid_t *temp_id;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index = NULL;
+
+ if (PAGE_METRICS != jf_metric_data->descr[i].type) {
+ error("Unknown page type encountered.");
+ continue;
+ }
+ temp_id = (uuid_t *)jf_metric_data->descr[i].uuid;
+
+ /* look the metric up under the read lock first */
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ /* First time we see the UUID */
+ /* the lock was dropped in between, so the assert below assumes nobody else inserted this UUID meanwhile */
+ uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
+ fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
+ *PValue = page_index = create_page_index(temp_id);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
+ uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
+ }
+
+ descr = pg_cache_create_descr();
+ descr->page_length = jf_metric_data->descr[i].page_length;
+ descr->start_time = jf_metric_data->descr[i].start_time;
+ descr->end_time = jf_metric_data->descr[i].end_time;
+ descr->id = &page_index->id;
+ descr->extent = extent;
+ extent->pages[valid_pages++] = descr;
+ pg_cache_insert(ctx, page_index, descr);
+ }
+
+ /* skipped pages leave no gaps: pages[] holds exactly valid_pages entries */
+ extent->number_of_pages = valid_pages;
+
+ if (likely(valid_pages))
+ df_extent_insert(extent);
+ else
+ freez(extent);
+}
+
+/*
+ * Replays transaction by interpreting up to max_size bytes from buf.
+ * Sets id to the current transaction id or to 0 if unknown.
+ * Returns size of transaction record or 0 for unknown size.
+ *
+ * A record whose CRC32 does not match is skipped but its size is still
+ * returned, so the caller can continue with the next record.
+ */
+static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+                                   void *buf, uint64_t *id, unsigned max_size)
+{
+    unsigned payload_length, size_bytes;
+    int ret;
+    /* persistent structures */
+    struct rrdeng_jf_transaction_header *jf_header;
+    struct rrdeng_jf_transaction_trailer *jf_trailer;
+    uLong crc;
+
+    *id = 0;
+    jf_header = buf;
+    if (STORE_PADDING == jf_header->type) {
+        debug(D_RRDENGINE, "Skipping padding.");
+        return 0;
+    }
+    if (sizeof(*jf_header) > max_size) {
+        error("Corrupted transaction record, skipping.");
+        return 0;
+    }
+    *id = jf_header->id;
+    payload_length = jf_header->payload_length;
+    /*
+     * Bound the payload BEFORE adding the sizes up: a corrupt payload_length
+     * close to the type's maximum would otherwise wrap the sum to a small
+     * value, pass the check and send the trailer/CRC reads out of bounds.
+     */
+    if (sizeof(*jf_header) + sizeof(*jf_trailer) > max_size ||
+        payload_length > max_size - sizeof(*jf_header) - sizeof(*jf_trailer)) {
+        error("Corrupted transaction record, skipping.");
+        return 0;
+    }
+    size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
+
+    /* the checksum covers the header and the payload, not the trailer itself */
+    jf_trailer = buf + sizeof(*jf_header) + payload_length;
+    crc = crc32(0L, Z_NULL, 0);
+    crc = crc32(crc, buf, sizeof(*jf_header) + payload_length);
+    ret = crc32cmp(jf_trailer->checksum, crc);
+    debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED");
+    if (unlikely(ret)) {
+        error("Transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id);
+        return size_bytes;
+    }
+    switch (jf_header->type) {
+    case STORE_DATA:
+        debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
+        restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
+        break;
+    default:
+        error("Unknown transaction type. Skipping record.");
+        break;
+    }
+
+    return size_bytes;
+}
+
+
+#define READAHEAD_BYTES (RRDENG_BLOCK_SIZE * 256)
+/*
+ * Iterates journal file transactions and populates the page cache.
+ * Page cache must already be initialized.
+ * Returns the maximum transaction id it discovered.
+ *
+ * The file is read in chunks of up to READAHEAD_BYTES, starting right after
+ * the superblock; journalfile->pos is taken as the size of the file.
+ */
+static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile)
+{
+    uv_file file;
+    uint64_t file_size;
+    int ret;
+    uint64_t pos, pos_i, max_id, id;
+    unsigned size_bytes;
+    void *buf;
+    uv_buf_t iov;
+    uv_fs_t req;
+
+    file = journalfile->file;
+    file_size = journalfile->pos;
+
+    max_id = 1;
+    ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES);
+    if (unlikely(ret)) {
+        fatal("posix_memalign:%s", strerror(ret));
+    }
+
+    for (pos = sizeof(struct rrdeng_jf_sb) ; pos < file_size ; pos += READAHEAD_BYTES) {
+        size_bytes = MIN(READAHEAD_BYTES, file_size - pos);
+        iov = uv_buf_init(buf, size_bytes);
+        ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
+        if (ret < 0) {
+            fatal("uv_fs_read: %s", uv_strerror(ret));
+        }
+        fatal_assert(req.result >= 0);
+        uv_fs_req_cleanup(&req);
+        ctx->stats.io_read_bytes += size_bytes;
+        ++ctx->stats.io_read_requests;
+
+        /* pos_i is an offset into buf (not into the file) */
+        for (pos_i = 0 ; pos_i < size_bytes ; ) {
+            unsigned max_size, record_size;
+
+            /*
+             * Only size_bytes - pos_i bytes of buf remain readable. Adding the
+             * file offset here would overstate the limit and let a corrupt
+             * record be read past the end of buf.
+             */
+            max_size = size_bytes - pos_i;
+            record_size = replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size);
+            if (!record_size) /* TODO: support transactions bigger than 4K */
+                /* unknown transaction size, move on to the next block */
+                pos_i = ALIGN_BYTES_FLOOR(pos_i + RRDENG_BLOCK_SIZE);
+            else
+                pos_i += record_size;
+            max_id = MAX(max_id, id);
+        }
+    }
+
+    free(buf);
+    return max_id;
+}
+
+/*
+ * Opens the journal file paired with datafile, validates it and replays its
+ * transactions into the page cache.
+ *
+ * On success the open file handle and the block-aligned file size are stored in
+ * journalfile, the commit log's next transaction id is advanced past the
+ * highest id found, and 0 is returned. On failure the file is closed and the
+ * error of the failing check (or of the open) is returned.
+ */
+int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+                      struct rrdengine_datafile *datafile)
+{
+    uv_fs_t req;
+    uv_file file;
+    int ret, fd, error;
+    uint64_t file_size, max_id;
+    char path[RRDENG_PATH_MAX];
+
+    generate_journalfilepath(datafile, path, sizeof(path));
+    fd = open_file_direct_io(path, O_RDWR, &file);
+    if (fd < 0) {
+        ++ctx->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+        return fd;
+    }
+    info("Loading journal file \"%s\".", path);
+
+    /* a journal file is checked against the journal superblock, not the data file one */
+    ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_jf_sb));
+    if (ret)
+        goto error;
+    file_size = ALIGN_BYTES_FLOOR(file_size);
+
+    ret = check_journal_file_superblock(file);
+    if (ret)
+        goto error;
+    ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb);
+    ++ctx->stats.io_read_requests;
+
+    journalfile->file = file;
+    journalfile->pos = file_size;
+
+    max_id = iterate_transactions(ctx, journalfile);
+
+    ctx->commit_log.transaction_id = MAX(ctx->commit_log.transaction_id, max_id + 1);
+
+    info("Journal file \"%s\" loaded (size:%"PRIu64").", path, file_size);
+    return 0;
+
+    error:
+    /* keep the original failure; the close result is only logged */
+    error = ret;
+    ret = uv_fs_close(NULL, &req, file, NULL);
+    if (ret < 0) {
+        error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+        ++ctx->stats.fs_errors;
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+    }
+    uv_fs_req_cleanup(&req);
+    return error;
+}
+
+/* Puts the commit log of ctx in its initial state: no buffer, first transaction id 1. */
+void init_commit_log(struct rrdengine_instance *ctx)
+{
+    ctx->commit_log.transaction_id = 1;
+    ctx->commit_log.buf_pos = 0;
+    ctx->commit_log.buf = NULL;
+} \ No newline at end of file
diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h
new file mode 100644
index 0000000..f6c43cd
--- /dev/null
+++ b/database/engine/journalfile.h
@@ -0,0 +1,49 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_JOURNALFILE_H
+#define NETDATA_JOURNALFILE_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct rrdengine_instance;
+struct rrdengine_worker_config;
+struct rrdengine_datafile;
+struct rrdengine_journalfile;
+
+/* File name prefix and extension used when building journal file paths. */
+#define WALFILE_PREFIX "journalfile-"
+#define WALFILE_EXTENSION ".njf"
+
+
+/* State of one journal file; every journal file is paired with a data file. */
+/* only one event loop is supported for now */
+struct rrdengine_journalfile {
+ uv_file file; /* libuv file handle */
+ uint64_t pos; /* offset of the next write; set to the file size when a journal is loaded */
+
+ struct rrdengine_datafile *datafile; /* the paired data file */
+};
+
+/* Commit log state: the transaction id counter and the buffer being filled with transactions. */
+/* only one event loop is supported for now */
+struct transaction_commit_log {
+ uint64_t transaction_id; /* starts at 1; raised past the highest id seen when journals are loaded */
+
+ /* outstanding transaction buffer */
+ void *buf; /* NULL when no buffer is outstanding */
+ unsigned buf_pos; /* bytes of buf already handed out */
+ unsigned buf_size; /* allocated size of buf in bytes */
+};
+
+/* Writes the path of the journal file paired with datafile into str (maxlen bytes). */
+extern void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
+/* Resets journalfile and binds it to datafile. */
+extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+/* Reserves size bytes in the outstanding transaction buffer, flushing/allocating as needed. */
+extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size);
+/* Submits the outstanding transaction buffer for writing; must be called before creating a new journal file. */
+extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc);
+/* The functions below return the libuv result of their last file system operation. */
+extern int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+extern int unlink_journal_file(struct rrdengine_journalfile *journalfile);
+extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+/* Returns 0 on success, a negative value otherwise. */
+extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+/* Validates the journal file and replays its transactions; returns 0 on success. */
+extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ struct rrdengine_datafile *datafile);
+/* Initializes the commit log of ctx. */
+extern void init_commit_log(struct rrdengine_instance *ctx);
+
+
+#endif /* NETDATA_JOURNALFILE_H */ \ No newline at end of file
diff --git a/database/engine/metadata_log/Makefile.am b/database/engine/metadata_log/Makefile.am
new file mode 100644
index 0000000..161784b
--- /dev/null
+++ b/database/engine/metadata_log/Makefile.am
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/database/engine/metadata_log/README.md b/database/engine/metadata_log/README.md
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/database/engine/metadata_log/README.md
diff --git a/database/engine/metadata_log/compaction.c b/database/engine/metadata_log/compaction.c
new file mode 100644
index 0000000..ba19e1e
--- /dev/null
+++ b/database/engine/metadata_log/compaction.c
@@ -0,0 +1,86 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "metadatalog.h"
+
+/*
+ * Recovers from an interrupted metadata log compaction.
+ *
+ * metalogfiles holds *matched_files entries; standard log files have
+ * starting_fileno == 0 and a temporary compaction file has a non-zero one.
+ * NOTE(review): the loops assume standard files precede the compaction file and
+ * are ordered by fileno -- presumably guaranteed by the caller's sort; confirm.
+ *
+ * Outcomes:
+ *  - no compaction file: nothing to do.
+ *  - more than one entry from the compaction file onwards: invalid state, failure.
+ *  - compaction file judged invalid: it is deleted and dropped from the array.
+ *  - otherwise: standard files with a fileno below the compaction file's fileno are
+ *    deleted, and the compaction file is renamed to a standard file.
+ * metalogfiles and *matched_files are updated to describe the surviving files.
+ *
+ * Return 0 on success.
+ */
+int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles,
+ unsigned *matched_files)
+{
+ int ret;
+ unsigned starting_fileno, fileno, i, j, recovered_files;
+ struct metadata_logfile *metalogfile = NULL, *compactionfile = NULL, **tmp_metalogfiles;
+ char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path;
+
+ /* find the first temporary compaction file, if any */
+ for (i = 0 ; i < *matched_files ; ++i) {
+ metalogfile = metalogfiles[i];
+ if (0 == metalogfile->starting_fileno)
+ continue; /* skip standard metadata log files */
+ break; /* this is a compaction temporary file */
+ }
+ if (i == *matched_files) /* no recovery needed */
+ return 0;
+ info("Starting metadata log file failure recovery procedure in \"%s\".", dbfiles_path);
+
+ if (*matched_files - i > 1) { /* Can't have more than 1 temporary compaction files */
+ error("Metadata log files are in an invalid state. Cannot proceed.");
+ return 1;
+ }
+ compactionfile = metalogfile;
+ starting_fileno = compactionfile->starting_fileno;
+ fileno = compactionfile->fileno;
+ /* scratchpad space to move file pointers around */
+ tmp_metalogfiles = callocz(*matched_files, sizeof(*tmp_metalogfiles));
+
+ /* keep the standard files that come before the range covered by the compaction file */
+ for (j = 0, recovered_files = 0 ; j < i ; ++j) {
+ metalogfile = metalogfiles[j];
+ fatal_assert(0 == metalogfile->starting_fileno);
+ if (metalogfile->fileno < starting_fileno) {
+ tmp_metalogfiles[recovered_files++] = metalogfile;
+ continue;
+ }
+ break; /* reached compaction file serial number */
+ }
+
+ if ((j == i) /* Shouldn't be possible, invalid compaction temporary file */ ||
+ (metalogfile->fileno == starting_fileno && metalogfile->fileno == fileno)) {
+ error("Deleting invalid compaction temporary file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL
+ METALOG_EXTENSION"\"", dbfiles_path, starting_fileno, fileno);
+ unlink_metadata_logfile(compactionfile);
+ freez(compactionfile);
+ freez(tmp_metalogfiles);
+ --*matched_files; /* delete the last one */
+
+ info("Finished metadata log file failure recovery procedure in \"%s\".", dbfiles_path);
+ return 0;
+ }
+
+ for ( ; j < i ; ++j) { /* continue iterating through normal metadata log files */
+ metalogfile = metalogfiles[j];
+ fatal_assert(0 == metalogfile->starting_fileno);
+ if (metalogfile->fileno < fileno) { /* It has already been compacted */
+ error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL
+ METALOG_EXTENSION"\"", dbfiles_path, 0U, metalogfile->fileno);
+ unlink_metadata_logfile(metalogfile);
+ freez(metalogfile);
+ continue;
+ }
+ tmp_metalogfiles[recovered_files++] = metalogfile;
+ }
+
+ /* compaction temporary file is valid */
+ tmp_metalogfiles[recovered_files++] = compactionfile;
+ ret = rename_metadata_logfile(compactionfile, 0, starting_fileno);
+ if (ret < 0) {
+ /*
+ * NOTE(review): files freed by the loop above are still referenced by
+ * metalogfiles and *matched_files is unchanged on this path -- verify the
+ * caller's cleanup does not touch those entries again.
+ */
+ error("Cannot rename temporary compaction files. Cannot proceed.");
+ freez(tmp_metalogfiles);
+ return 1;
+ }
+
+ /* publish the surviving files back to the caller's array */
+ memcpy(metalogfiles, tmp_metalogfiles, recovered_files * sizeof(*metalogfiles));
+ *matched_files = recovered_files;
+ freez(tmp_metalogfiles);
+
+ info("Finished metadata log file failure recovery procedure in \"%s\".", dbfiles_path);
+ return 0;
+}
diff --git a/database/engine/metadata_log/compaction.h b/database/engine/metadata_log/compaction.h
new file mode 100644
index 0000000..d046134
--- /dev/null
+++ b/database/engine/metadata_log/compaction.h
@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_COMPACTION_H
+#define NETDATA_COMPACTION_H
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include "../rrdengine.h"
+
+/*
+ * Recovers from an interrupted metadata log compaction; updates metalogfiles and
+ * *matched_files to the surviving files. Returns 0 on success.
+ */
+extern int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles,
+ unsigned *matched_files);
+
+#endif /* NETDATA_COMPACTION_H */
diff --git a/database/engine/metadata_log/logfile.c b/database/engine/metadata_log/logfile.c
new file mode 100644
index 0000000..b7c5c06
--- /dev/null
+++ b/database/engine/metadata_log/logfile.c
@@ -0,0 +1,453 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include <database/sqlite/sqlite_functions.h>
+#include "metadatalog.h"
+#include "metalogpluginsd.h"
+
+
+/*
+ * Formats the path of metalogfile into str (at most maxlen bytes, including
+ * the terminator) from the database directory and the file's two numbers.
+ */
+void generate_metadata_logfile_path(struct metadata_logfile *metalogfile, char *str, size_t maxlen)
+{
+    const char *dir = metalogfile->ctx->rrdeng_ctx->dbfiles_path;
+
+    (void) snprintf(str, maxlen, "%s/" METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL METALOG_EXTENSION,
+                    dir, metalogfile->starting_fileno, metalogfile->fileno);
+}
+
+/* Initializes metalogfile as an unopened, unlinked file with the given numbers, owned by ctx. */
+void metadata_logfile_init(struct metadata_logfile *metalogfile, struct metalog_instance *ctx, unsigned starting_fileno,
+                           unsigned fileno)
+{
+    /* identity */
+    metalogfile->ctx = ctx;
+    metalogfile->starting_fileno = starting_fileno;
+    metalogfile->fileno = fileno;
+
+    /* runtime state */
+    metalogfile->file = (uv_file)0;
+    metalogfile->pos = 0;
+    metalogfile->next = NULL;
+}
+
+/*
+ * Renames the metadata log file on disk to the name derived from
+ * new_starting_fileno / new_fileno and updates metalogfile accordingly.
+ * On failure the previous numbers are restored in metalogfile.
+ * Returns the libuv result of the rename.
+ */
+int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno, unsigned new_fileno)
+{
+    char oldpath[RRDENG_PATH_MAX], newpath[RRDENG_PATH_MAX];
+    const unsigned old_starting_fileno = metalogfile->starting_fileno;
+    const unsigned old_fileno = metalogfile->fileno;
+    uv_fs_t req;
+    int ret;
+
+    /* the path generator reads the numbers from the structure, so switch them between the two calls */
+    generate_metadata_logfile_path(metalogfile, oldpath, sizeof(oldpath));
+    metalogfile->starting_fileno = new_starting_fileno;
+    metalogfile->fileno = new_fileno;
+    generate_metadata_logfile_path(metalogfile, newpath, sizeof(newpath));
+
+    info("Renaming metadata log file \"%s\" to \"%s\".", oldpath, newpath);
+    ret = uv_fs_rename(NULL, &req, oldpath, newpath, NULL);
+    if (ret < 0) {
+        error("uv_fs_rename(%s): %s", oldpath, uv_strerror(ret));
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+        /* roll back to the name that is still on disk */
+        metalogfile->starting_fileno = old_starting_fileno;
+        metalogfile->fileno = old_fileno;
+    }
+    uv_fs_req_cleanup(&req);
+
+    return ret;
+}
+
+/*
+ * Removes the metadata log file from the file system synchronously.
+ * Returns the libuv result of the unlink; a failure is logged and counted globally.
+ */
+int unlink_metadata_logfile(struct metadata_logfile *metalogfile)
+{
+    char path[RRDENG_PATH_MAX];
+    uv_fs_t req;
+    int ret;
+
+    generate_metadata_logfile_path(metalogfile, path, sizeof(path));
+
+    ret = uv_fs_unlink(NULL, &req, path, NULL);
+    if (ret < 0) {
+        error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+    }
+    uv_fs_req_cleanup(&req);
+
+    return ret;
+}
+
+/*
+ * Reads the superblock at offset 0 of file and validates its magic number.
+ * A version newer than RRDENG_METALOG_VER is only reported, not rejected.
+ * Returns 0 when the magic matches, UV_EINVAL when it does not, or the
+ * negative libuv error of a failed read.
+ */
+static int check_metadata_logfile_superblock(uv_file file)
+{
+    struct rrdeng_metalog_sb *superblock;
+    uv_buf_t iov;
+    uv_fs_t req;
+    int ret;
+
+    ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+    if (unlikely(ret)) {
+        fatal("posix_memalign:%s", strerror(ret));
+    }
+    iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+    ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
+    if (ret < 0) {
+        error("uv_fs_read: %s", uv_strerror(ret));
+        uv_fs_req_cleanup(&req);
+        free(superblock);
+        return ret;
+    }
+    fatal_assert(req.result >= 0);
+    uv_fs_req_cleanup(&req);
+
+    ret = strncmp(superblock->magic_number, RRDENG_METALOG_MAGIC, RRDENG_MAGIC_SZ) ? UV_EINVAL : 0;
+    if (ret) {
+        error("File has invalid superblock.");
+    }
+    if (superblock->version > RRDENG_METALOG_VER) {
+        error("File has unknown version %"PRIu16". Compatibility is not guaranteed.", superblock->version);
+    }
+
+    free(superblock);
+    return ret;
+}
+
+/*
+ * Replays one metadata log record by feeding its payload, line by line, to the
+ * metadata log parser.
+ *
+ * payload is modified in place: its last byte is overwritten with a terminator
+ * and every newline is replaced by one. Processing of the record stops at the
+ * first line the parser rejects. An empty payload is ignored.
+ */
+void replay_record(struct metadata_logfile *metalogfile, struct rrdeng_metalog_record_header *header, void *payload)
+{
+    struct metalog_instance *ctx = metalogfile->ctx;
+    char *line, *nextline, *record_end;
+    int ret;
+
+    debug(D_METADATALOG, "RECORD contents: %.*s", (int)header->payload_length, (char *)payload);
+    if (unlikely(0 == header->payload_length))
+        return; /* nothing to replay; the terminator below would land one byte before the payload */
+
+    /* terminate the record so that strchr() cannot run past its end */
+    record_end = (char *)payload + header->payload_length - 1;
+    *record_end = '\0';
+
+    for (line = payload ; line ; line = nextline) {
+        nextline = strchr(line, '\n');
+        if (nextline) {
+            *nextline++ = '\0';
+        }
+        ret = parser_action(ctx->metalog_parser_object->parser, line);
+        debug(D_METADATALOG, "parser_action ret:%d", ret);
+        if (ret)
+            return; /* skip record due to error */
+    }
+}
+
+/*
+ * Synchronously reads len bytes at offset of the metadata log file into buf.
+ * Returns the libuv result of the read; a failed read is logged and counted
+ * globally.
+ *
+ * This function only works with buffered I/O.
+ */
+static inline int metalogfile_read(struct metadata_logfile *metalogfile, void *buf, size_t len, uint64_t offset)
+{
+    uv_fs_t req;
+    uv_buf_t iov = uv_buf_init(buf, len);
+    int ret = uv_fs_read(NULL, &req, metalogfile->file, &iov, 1, offset, NULL);
+
+    /* a negative return that is not mirrored in req.result is treated as fatal */
+    if (unlikely(ret < 0 && ret != req.result)) {
+        fatal("uv_fs_read: %s", uv_strerror(ret));
+    }
+    if (req.result < 0) {
+        rrd_stat_atomic_add(&global_io_errors, 1);
+        error("%s: uv_fs_read - %s - record at offset %"PRIu64"(%u) in metadata logfile %u-%u.", __func__,
+              uv_strerror((int)req.result), offset, (unsigned)len, metalogfile->starting_fileno, metalogfile->fileno);
+    }
+    uv_fs_req_cleanup(&req);
+
+    return ret;
+}
+
+/*
+ * Verifies the CRC32 stored in the trailer of record against the checksum of
+ * its header and payload. The caller must have validated the record lengths.
+ * Return 0 on success.
+ */
+static int metadata_record_integrity_check(void *record)
+{
+    struct rrdeng_metalog_record_header *header = record;
+    uint32_t data_size = header->header_length + header->payload_length;
+    struct rrdeng_metalog_record_trailer *trailer = record + data_size;
+    uLong crc = crc32(crc32(0L, Z_NULL, 0), record, data_size);
+
+    return crc32cmp(trailer->checksum, crc);
+}
+
+#define MAX_READ_BYTES (RRDENG_BLOCK_SIZE * 32) /* no record should be over 128KiB in this version */
+
+/*
+ * Iterates metadata log file records and creates database objects (host/chart/dimension)
+ *
+ * Records are read one at a time, starting right after the superblock;
+ * metalogfile->pos is taken as the size of the file. Padding skips to the next
+ * block, over-long records and records failing the CRC32 check are skipped,
+ * and a structurally corrupt record or a read error ends the iteration.
+ */
+static void iterate_records(struct metadata_logfile *metalogfile)
+{
+    uint32_t file_size, pos, bytes_remaining, record_size;
+    uint64_t full_size;
+    void *buf;
+    struct rrdeng_metalog_record_header *header;
+    struct metalog_instance *ctx = metalogfile->ctx;
+    struct metalog_pluginsd_state *state = ctx->metalog_parser_object->private;
+    /* smallest prefix of the header that must be readable: everything up to and including header_length */
+    const size_t min_header_size = offsetof(struct rrdeng_metalog_record_header, header_length) +
+                                   sizeof(header->header_length);
+
+    file_size = metalogfile->pos;
+    state->metalogfile = metalogfile;
+
+    buf = mallocz(MAX_READ_BYTES);
+
+    for (pos = sizeof(struct rrdeng_metalog_sb) ; pos < file_size ; pos += record_size) {
+        bytes_remaining = file_size - pos;
+        if (bytes_remaining < min_header_size) {
+            error("%s: unexpected end of file in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno,
+                  metalogfile->fileno);
+            break;
+        }
+        if (metalogfile_read(metalogfile, buf, min_header_size, pos) < 0)
+            break;
+        header = (struct rrdeng_metalog_record_header *)buf;
+        if (METALOG_STORE_PADDING == header->type) {
+            info("%s: Skipping padding in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno,
+                 metalogfile->fileno);
+            /* padding runs to the end of the current block */
+            record_size = ALIGN_BYTES_FLOOR(pos + RRDENG_BLOCK_SIZE) - pos;
+            continue;
+        }
+        if (metalogfile_read(metalogfile, buf + min_header_size, sizeof(*header) - min_header_size,
+                             pos + min_header_size) < 0)
+            break;
+        /*
+         * Add the lengths up in 64 bits so corrupt values cannot wrap the sum,
+         * and reject records shorter than the header already interpreted:
+         * the remaining-bytes computation below would otherwise underflow.
+         */
+        full_size = (uint64_t)header->header_length + header->payload_length +
+                    sizeof(struct rrdeng_metalog_record_trailer);
+        if (header->header_length < min_header_size || full_size < sizeof(*header) ||
+            full_size > bytes_remaining) {
+            error("%s: Corrupted record in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno,
+                  metalogfile->fileno);
+            break;
+        }
+        record_size = (uint32_t)full_size; /* fits: full_size <= bytes_remaining */
+        if (record_size > MAX_READ_BYTES) {
+            error("%s: Record is too long (%u bytes) in metadata logfile %u-%u.", __func__, record_size,
+                  metalogfile->starting_fileno, metalogfile->fileno);
+            continue;
+        }
+        if (metalogfile_read(metalogfile, buf + sizeof(*header), record_size - sizeof(*header),
+                             pos + sizeof(*header)) < 0)
+            break;
+        if (metadata_record_integrity_check(buf)) {
+            error("%s: Record at offset %"PRIu32" was read from disk. CRC32 check: FAILED", __func__, pos);
+            continue;
+        }
+        debug(D_METADATALOG, "%s: Record at offset %"PRIu32" was read from disk. CRC32 check: SUCCEEDED", __func__,
+              pos);
+
+        /* the payload starts after header_length bytes, which may exceed sizeof(*header) */
+        replay_record(metalogfile, header, buf + header->header_length);
+    }
+
+    freez(buf);
+}
+
+/*
+ * Loads one metadata log file: unless it was already migrated, the file is
+ * opened, validated, replayed record by record and then registered as migrated.
+ *
+ * Returns 0 on success (including the already-migrated case). On failure the
+ * file is closed and the error of the failing check (or of the open) is returned.
+ */
+int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *metalogfile)
+{
+    UNUSED(ctx);
+    char path[RRDENG_PATH_MAX];
+    uint64_t file_size;
+    uv_file file;
+    uv_fs_t req;
+    int fd, status, close_ret;
+
+    generate_metadata_logfile_path(metalogfile, path, sizeof(path));
+    if (file_is_migrated(path))
+        return 0;
+
+    fd = open_file_buffered_io(path, O_RDWR, &file);
+    if (fd < 0) {
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+        return fd;
+    }
+    info("Loading metadata log \"%s\".", path);
+
+    status = check_file_properties(file, &file_size, sizeof(struct rrdeng_metalog_sb));
+    if (!status)
+        status = check_metadata_logfile_superblock(file);
+
+    if (!status) {
+        metalogfile->file = file;
+        metalogfile->pos = file_size;
+
+        iterate_records(metalogfile);
+
+        info("Metadata log \"%s\" migrated to the database (size:%"PRIu64").", path, file_size);
+        add_migrated_file(path, file_size);
+        return 0;
+    }
+
+    /* validation failed: close the file and report the validation error, not the close result */
+    close_ret = uv_fs_close(NULL, &req, file, NULL);
+    if (close_ret < 0) {
+        error("uv_fs_close(%s): %s", path, uv_strerror(close_ret));
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+    }
+    uv_fs_req_cleanup(&req);
+    return status;
+}
+
+/*
+ * qsort() comparator for arrays of struct metadata_logfile pointers:
+ * orders the files by comparing their generated paths as strings.
+ */
+static int scan_metalog_files_cmp(const void *a, const void *b)
+{
+    struct metadata_logfile *lhs = *(struct metadata_logfile **)a;
+    struct metadata_logfile *rhs = *(struct metadata_logfile **)b;
+    char lhs_path[RRDENG_PATH_MAX], rhs_path[RRDENG_PATH_MAX];
+
+    generate_metadata_logfile_path(lhs, lhs_path, sizeof(lhs_path));
+    generate_metadata_logfile_path(rhs, rhs_path, sizeof(rhs_path));
+
+    return strcmp(lhs_path, rhs_path);
+}
+
+/*
+ * Finds the metadata log files in the dbengine directory, sorts them by path,
+ * runs compaction failure recovery and replays each file into the database
+ * through a plugins.d parser. Every file is replayed inside its own database
+ * transaction; a file that fails to load is unlinked and its transaction is
+ * rolled back.
+ *
+ * Fixes compared to the previous version:
+ *  - when the parser cannot be created, the per-file structures are freed
+ *    (only the array was freed before) and 0 is returned, since no file was
+ *    loaded;
+ *  - the parser user object lives on this function's stack, so the pointer
+ *    published in ctx->metalog_parser_object is cleared before returning
+ *    instead of being left dangling;
+ *  - the parser user object is zero-initialized, so fields this function does
+ *    not set (e.g. count, which is read below) have a defined value.
+ *
+ * Returns number of metadata logfiles that were loaded or < 0 on error
+ */
+static int scan_metalog_files(struct metalog_instance *ctx)
+{
+    int ret;
+    unsigned starting_no, no, matched_files, i, failed_to_load;
+    static uv_fs_t req;
+    uv_dirent_t dent;
+    struct metadata_logfile **metalogfiles, *metalogfile;
+    char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path;
+
+    ret = uv_fs_scandir(NULL, &req, dbfiles_path, 0, NULL);
+    if (ret < 0) {
+        fatal_assert(req.result < 0);
+        uv_fs_req_cleanup(&req);
+        error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret));
+        rrd_stat_atomic_add(&global_fs_errors, 1);
+        return ret;
+    }
+    info("Found %d files in path %s", ret, dbfiles_path);
+
+    /* at most MAX_DATAFILES entries are ever collected, see the loop condition */
+    metalogfiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*metalogfiles));
+    for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
+        info("Scanning file \"%s/%s\"", dbfiles_path, dent.name);
+        ret = sscanf(dent.name, METALOG_PREFIX METALOG_FILE_NUMBER_SCAN_TMPL METALOG_EXTENSION, &starting_no, &no);
+        if (2 == ret) {
+            info("Matched file \"%s/%s\"", dbfiles_path, dent.name);
+            metalogfile = mallocz(sizeof(*metalogfile));
+            metadata_logfile_init(metalogfile, ctx, starting_no, no);
+            metalogfiles[matched_files++] = metalogfile;
+        }
+    }
+    uv_fs_req_cleanup(&req);
+
+    if (0 == matched_files) {
+        freez(metalogfiles);
+        return 0;
+    }
+    if (matched_files == MAX_DATAFILES) {
+        error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
+    }
+    qsort(metalogfiles, matched_files, sizeof(*metalogfiles), scan_metalog_files_cmp);
+    ret = compaction_failure_recovery(ctx, metalogfiles, &matched_files);
+    if (ret) { /* If the files are corrupted fail */
+        for (i = 0 ; i < matched_files ; ++i) {
+            freez(metalogfiles[i]);
+        }
+        freez(metalogfiles);
+        return UV_EINVAL;
+    }
+
+    struct plugind cd = {
+        .enabled = 1,
+        .update_every = 0,
+        .pid = 0,
+        .serial_failures = 0,
+        .successful_collections = 0,
+        .obsolete = 0,
+        .started_t = INVALID_TIME,
+        .next = NULL,
+        .version = 0,
+    };
+
+    struct metalog_pluginsd_state metalog_parser_state;
+    metalog_pluginsd_state_init(&metalog_parser_state, ctx);
+
+    PARSER_USER_OBJECT metalog_parser_object = {0};
+    metalog_parser_object.enabled = cd.enabled;
+    metalog_parser_object.host = ctx->rrdeng_ctx->host;
+    metalog_parser_object.cd = &cd;
+    metalog_parser_object.trust_durations = 0;
+    metalog_parser_object.private = &metalog_parser_state;
+
+    PARSER *parser = parser_init(metalog_parser_object.host, &metalog_parser_object, NULL, PARSER_INPUT_SPLIT);
+    if (unlikely(!parser)) {
+        error("Failed to initialize metadata log parser.");
+        /* nothing can be replayed: release every descriptor, not only the array */
+        for (i = 0 ; i < matched_files ; ++i) {
+            freez(metalogfiles[i]);
+        }
+        freez(metalogfiles);
+        return 0;
+    }
+    parser_add_keyword(parser, PLUGINSD_KEYWORD_HOST, metalog_pluginsd_host);
+    parser_add_keyword(parser, PLUGINSD_KEYWORD_GUID, pluginsd_guid);
+    parser_add_keyword(parser, PLUGINSD_KEYWORD_CONTEXT, pluginsd_context);
+    parser_add_keyword(parser, PLUGINSD_KEYWORD_TOMBSTONE, pluginsd_tombstone);
+    parser->plugins_action->dimension_action = &metalog_pluginsd_dimension_action;
+    parser->plugins_action->chart_action = &metalog_pluginsd_chart_action;
+    parser->plugins_action->guid_action = &metalog_pluginsd_guid_action;
+    parser->plugins_action->context_action = &metalog_pluginsd_context_action;
+    parser->plugins_action->tombstone_action = &metalog_pluginsd_tombstone_action;
+    parser->plugins_action->host_action = &metalog_pluginsd_host_action;
+
+    metalog_parser_object.parser = parser;
+    ctx->metalog_parser_object = &metalog_parser_object;
+
+    for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
+        metalogfile = metalogfiles[i];
+        db_lock();
+        db_execute("BEGIN TRANSACTION;");
+        ret = load_metadata_logfile(ctx, metalogfile);
+        if (0 != ret) {
+            error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL
+                  METALOG_EXTENSION"\"", dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno);
+            unlink_metadata_logfile(metalogfile);
+            ++failed_to_load;
+            db_execute("ROLLBACK TRANSACTION;");
+        }
+        else
+            db_execute("COMMIT TRANSACTION;");
+        db_unlock();
+        freez(metalogfile);
+    }
+    matched_files -= failed_to_load;
+    debug(D_METADATALOG, "PARSER ended");
+
+    parser_destroy(parser);
+    /* metalog_parser_object goes out of scope on return; do not leave ctx pointing at it */
+    ctx->metalog_parser_object = NULL;
+
+    size_t count __maybe_unused = metalog_parser_object.count;
+
+    debug(D_METADATALOG, "Parsing count=%u", (unsigned)count);
+
+    freez(metalogfiles);
+
+    return matched_files;
+}
+
+/*
+ * Scans the dbengine directory for metadata log files and replays them.
+ * Returns 0 on success, or the negative value reported by scan_metalog_files().
+ */
+int init_metalog_files(struct metalog_instance *ctx)
+{
+    char *scan_path = ctx->rrdeng_ctx->dbfiles_path;
+    int loaded = scan_metalog_files(ctx);
+
+    if (loaded >= 0)
+        return 0;
+
+    error("Failed to scan path \"%s\".", scan_path);
+    return loaded;
+}
diff --git a/database/engine/metadata_log/logfile.h b/database/engine/metadata_log/logfile.h
new file mode 100644
index 0000000..df12ac7
--- /dev/null
+++ b/database/engine/metadata_log/logfile.h
@@ -0,0 +1,39 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_LOGFILE_H
+#define NETDATA_LOGFILE_H
+
+#include "metadatalogprotocol.h"
+#include "../rrdengine.h"
+
+/* Forward declarations */
+struct metadata_logfile;
+struct metalog_worker_config;
+
+#define METALOG_PREFIX "metadatalog-" /* leading part of a metadata log file name */
+#define METALOG_EXTENSION ".mlf" /* trailing part of a metadata log file name */
+
+/* only one event loop is supported for now */
+struct metadata_logfile {
+ unsigned fileno; /* Starts at 1 */
+ unsigned starting_fileno; /* 0 for normal files, starting number during compaction */
+ uv_file file; /* stored by load_metadata_logfile() once the superblock check has passed */
+ uint64_t pos; /* set to the file size by load_metadata_logfile() */
+ struct metalog_instance *ctx;
+ struct metadata_logfile *next;
+};
+
+struct metadata_logfile_list { /* presumably chained through metadata_logfile.next -- TODO confirm */
+ struct metadata_logfile *first; /* oldest */
+ struct metadata_logfile *last; /* newest */
+};
+
+extern void generate_metadata_logfile_path(struct metadata_logfile *metadatalog, char *str, size_t maxlen); /* callers pass a path buffer and its size */
+extern int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno,
+ unsigned new_fileno);
+extern int unlink_metadata_logfile(struct metadata_logfile *metalogfile); /* used to delete a log file that failed to load */
+extern int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *logfile); /* returns 0 on success */
+extern int init_metalog_files(struct metalog_instance *ctx); /* returns 0 on success */
+
+
+#endif /* NETDATA_LOGFILE_H */
diff --git a/database/engine/metadata_log/metadatalog.h b/database/engine/metadata_log/metadatalog.h
new file mode 100644
index 0000000..b484686
--- /dev/null
+++ b/database/engine/metadata_log/metadatalog.h
@@ -0,0 +1,28 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_METADATALOG_H
+#define NETDATA_METADATALOG_H
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include "../rrdengine.h"
+#include "metadatalogprotocol.h"
+#include "logfile.h"
+#include "metadatalogapi.h"
+#include "compaction.h"
+
+/* Forward declarations */
+struct metalog_instance;
+struct parser_user_object;
+
+#define METALOG_FILE_NUMBER_SCAN_TMPL "%5u-%5u" /* sscanf() pattern for the two file numbers in a log file name */
+#define METALOG_FILE_NUMBER_PRINT_TMPL "%5.5u-%5.5u" /* printf() pattern for the same two numbers, zero padded to 5 digits */
+
+struct metalog_instance {
+ struct rrdengine_instance *rrdeng_ctx; /* parent dbengine instance, set by metalog_init() */
+ struct parser_user_object *metalog_parser_object; /* parser user object published while log files are replayed */
+ uint8_t initialized; /* set to 1 to mark context initialized */
+};
+
+#endif /* NETDATA_METADATALOG_H */
diff --git a/database/engine/metadata_log/metadatalogapi.c b/database/engine/metadata_log/metadatalogapi.c
new file mode 100755
index 0000000..b206cca
--- /dev/null
+++ b/database/engine/metadata_log/metadatalogapi.c
@@ -0,0 +1,39 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "metadatalog.h"
+
+/*
+ * Creates the metadata log context of a dbengine instance, links it with its
+ * parent in both directions and loads the metadata log files.
+ *
+ * On failure the context is freed and the parent's metalog_ctx pointer is
+ * reset to NULL; it used to keep pointing at the freed context.
+ *
+ * Returns 0 on success, negative on error
+ */
+int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx)
+{
+    struct metalog_instance *ctx;
+
+    /* callocz() zeroes the context, so initialized stays 0 until loading is done */
+    ctx = callocz(1, sizeof(*ctx));
+    ctx->rrdeng_ctx = rrdeng_parent_ctx;
+    rrdeng_parent_ctx->metalog_ctx = ctx;
+
+    if (init_metalog_files(ctx)) {
+        /* do not leave the parent pointing at freed memory */
+        rrdeng_parent_ctx->metalog_ctx = NULL;
+        freez(ctx);
+        return UV_EIO;
+    }
+
+    ctx->initialized = 1; /* notify dbengine that the metadata log has finished initializing */
+    return 0;
+}
+
+/*
+ * Called by the dbengine rotation logic when the metric has no writers.
+ * Deletes the dimension under the UUID as given and then under the UUID
+ * produced by rrdeng_convert_legacy_uuid_to_multihost() for this instance's
+ * machine GUID.
+ */
+void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid)
+{
+    uuid_t converted_uuid;
+    struct rrdengine_instance *engine = ctx->rrdeng_ctx;
+
+    delete_dimension_uuid(metric_uuid);
+
+    rrdeng_convert_legacy_uuid_to_multihost(engine->machine_guid, metric_uuid, &converted_uuid);
+    delete_dimension_uuid(&converted_uuid);
+}
diff --git a/database/engine/metadata_log/metadatalogapi.h b/database/engine/metadata_log/metadatalogapi.h
new file mode 100644
index 0000000..d558b93
--- /dev/null
+++ b/database/engine/metadata_log/metadatalogapi.h
@@ -0,0 +1,12 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_METADATALOGAPI_H
+#define NETDATA_METADATALOGAPI_H
+
+extern void metalog_commit_delete_chart(RRDSET *st);
+extern void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid); /* called by dbengine rotation when the metric has no writers */
+
+/* must call once before using anything */
+extern int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx); /* returns 0 on success, negative on error */
+
+#endif /* NETDATA_METADATALOGAPI_H */
diff --git a/database/engine/metadata_log/metadatalogprotocol.h b/database/engine/metadata_log/metadatalogprotocol.h
new file mode 100644
index 0000000..1017213
--- /dev/null
+++ b/database/engine/metadata_log/metadatalogprotocol.h
@@ -0,0 +1,53 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_METADATALOGPROTOCOL_H
+#define NETDATA_METADATALOGPROTOCOL_H
+
+#include "../rrddiskprotocol.h"
+
+#define RRDENG_METALOG_MAGIC "netdata-metadata-log" /* presumably stored in the super-block magic_number -- TODO confirm */
+
+#define RRDENG_METALOG_VER (1)
+
+#define RRDENG_METALOG_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + sizeof(uint16_t))) /* block size minus the magic and version fields */
+/*
+ * Metadata log persistent super-block
+ */
+struct rrdeng_metalog_sb {
+ char magic_number[RRDENG_MAGIC_SZ];
+ uint16_t version;
+ uint8_t padding[RRDENG_METALOG_SB_PADDING_SZ]; /* brings the packed struct up to RRDENG_BLOCK_SIZE bytes */
+} __attribute__ ((packed));
+
+/*
+ * Metadata log record types
+ */
+#define METALOG_STORE_PADDING (0) /* per the record header comment: jump to the start of the next block */
+#define METALOG_CREATE_OBJECT (1)
+#define METALOG_DELETE_OBJECT (2)
+#define METALOG_OTHER (3) /* reserved */
+
+/*
+ * Metadata log record header
+ */
+struct rrdeng_metalog_record_header {
+ /* when set to METALOG_STORE_PADDING jump to start of next block */
+ uint8_t type;
+
+ uint16_t header_length; /* the replay code reads the payload at record start + header_length */
+ uint32_t payload_length;
+ /******************************************************
+ * No fields above this point can ever change. *
+ ******************************************************
+ * All fields below this point are subject to change. *
+ ******************************************************/
+} __attribute__ ((packed));
+
+/*
+ * Metadata log record trailer
+ */
+struct rrdeng_metalog_record_trailer { /* presumably follows the record payload -- TODO confirm */
+ uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */
+} __attribute__ ((packed));
+
+#endif /* NETDATA_METADATALOGPROTOCOL_H */
diff --git a/database/engine/metadata_log/metalogpluginsd.c b/database/engine/metadata_log/metalogpluginsd.c
new file mode 100755
index 0000000..88c1453
--- /dev/null
+++ b/database/engine/metadata_log/metalogpluginsd.c
@@ -0,0 +1,140 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "metadatalog.h"
+#include "metalogpluginsd.h"
+
+extern struct config stream_config;
+
+/*
+ * Parser action for a host record found in a metadata log.
+ *
+ *  - Host already known: it becomes the parser's current host, unless its
+ *    memory mode is not dbengine, in which case the current host is set to
+ *    NULL. Previously that NULL was overwritten by the unconditional
+ *    assignment that followed, so the archived state was never ignored.
+ *  - Host unknown but the GUID is this machine's: the current host is set to
+ *    NULL (the lookup result) and nothing is stored.
+ *  - Any other host: the GUID is parsed into state->host_uuid and the host is
+ *    stored with sql_store_host(); failures are only logged.
+ *
+ * Always returns PARSER_RC_OK.
+ */
+PARSER_RC metalog_pluginsd_host_action(
+    void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone,
+    char *tags)
+{
+    PARSER_USER_OBJECT *parser_user = user;
+    struct metalog_pluginsd_state *state = parser_user->private;
+
+    RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0);
+    if (host) {
+        if (unlikely(host->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)) {
+            error("Archived host '%s' has memory mode '%s', but the archived one is '%s'. Ignoring archived state.",
+                  host->hostname, rrd_memory_mode_name(host->rrd_memory_mode),
+                  rrd_memory_mode_name(RRD_MEMORY_MODE_DBENGINE));
+            parser_user->host = NULL; /* Ignore objects if memory mode is not dbengine */
+            return PARSER_RC_OK;
+        }
+        parser_user->host = host;
+        return PARSER_RC_OK;
+    }
+
+    if (strcmp(machine_guid, registry_get_this_machine_guid()) == 0) {
+        parser_user->host = host; /* host is NULL on this path */
+        return PARSER_RC_OK;
+    }
+
+    if (likely(!uuid_parse(machine_guid, state->host_uuid))) {
+        int rc = sql_store_host(&state->host_uuid, hostname, registry_hostname, update_every, os, timezone, tags);
+        if (unlikely(rc)) {
+            errno = 0;
+            error("Failed to store host %s with UUID %s in the database", hostname, machine_guid);
+        }
+    }
+    else {
+        errno = 0;
+        error("Host machine GUID %s is not valid", machine_guid);
+    }
+
+    return PARSER_RC_OK;
+}
+
+/*
+ * Parser action for a chart record. The pending GUID in state->uuid becomes
+ * the chart UUID (and is cleared), the chart is stored under
+ * state->host_uuid and st_exists is set. Charts are skipped while
+ * state->host_uuid is null. Always returns PARSER_RC_OK.
+ */
+PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, char *context,
+                                        char *title, char *units, char *plugin, char *module, int priority,
+                                        int update_every, RRDSET_TYPE chart_type, char *options)
+{
+    PARSER_USER_OBJECT *parser_user = (PARSER_USER_OBJECT *)user;
+    struct metalog_pluginsd_state *state = parser_user->private;
+    RRDHOST *host = parser_user->host;
+
+    UNUSED(options);
+
+    if (unlikely(uuid_is_null(state->host_uuid))) {
+        debug(D_METADATALOG, "Ignoring chart belonging to missing or ignored host.");
+        return PARSER_RC_OK;
+    }
+
+    /* Consume UUID: move the pending GUID into the chart slot */
+    uuid_copy(state->chart_uuid, state->uuid);
+    uuid_clear(state->uuid);
+
+    (void) sql_store_chart(&state->chart_uuid, &state->host_uuid,
+                           type, id, name, family, context, title, units,
+                           plugin, module, priority, update_every,
+                           chart_type, RRD_MEMORY_MODE_DBENGINE, host ? host->rrd_history_entries : 1);
+    parser_user->st_exists = 1;
+
+    return PARSER_RC_OK;
+}
+
+/*
+ * Parser action for a dimension record. Stores the dimension under
+ * state->chart_uuid using the pending GUID in state->uuid, then clears that
+ * GUID. The record is skipped (with a log line) when either UUID is null.
+ * Always returns PARSER_RC_OK.
+ */
+PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm,
+                                            long multiplier, long divisor, char *options, RRD_ALGORITHM algorithm_type)
+{
+    PARSER_USER_OBJECT *parser_user = (PARSER_USER_OBJECT *)user;
+    struct metalog_pluginsd_state *state = parser_user->private;
+
+    UNUSED(st);
+    UNUSED(algorithm);
+    UNUSED(options);
+
+    if (unlikely(uuid_is_null(state->chart_uuid))) {
+        debug(D_METADATALOG, "Ignoring dimension belonging to missing or ignored chart.");
+        info("Ignoring dimension belonging to missing or ignored chart.");
+        return PARSER_RC_OK;
+    }
+    if (unlikely(uuid_is_null(state->uuid))) {
+        debug(D_METADATALOG, "Ignoring dimension without unknown UUID");
+        info("Ignoring dimension without unknown UUID");
+        return PARSER_RC_OK;
+    }
+
+    (void) sql_store_dimension(&state->uuid, &state->chart_uuid, id, name, multiplier, divisor, algorithm_type);
+
+    /* Consume UUID */
+    uuid_clear(state->uuid);
+
+    return PARSER_RC_OK;
+}
+
+/* Parser action for a GUID record: remembers the UUID as the pending one in the replay state. */
+PARSER_RC metalog_pluginsd_guid_action(void *user, uuid_t *uuid)
+{
+    PARSER_USER_OBJECT *parser_user = (PARSER_USER_OBJECT *)user;
+    struct metalog_pluginsd_state *replay_state = parser_user->private;
+
+    uuid_copy(replay_state->uuid, *uuid);
+    return PARSER_RC_OK;
+}
+
+/*
+ * Parser action for a context record. The result of find_uuid_type() decides
+ * where the UUID is kept: 1 selects the host slot (and marks the host as
+ * existing with no chart yet), 2 selects the chart slot, anything else makes
+ * it the pending UUID. Always returns PARSER_RC_OK.
+ */
+PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid)
+{
+    PARSER_USER_OBJECT *parser_user = (PARSER_USER_OBJECT *)user;
+    struct metalog_pluginsd_state *state = parser_user->private;
+
+    switch (find_uuid_type(uuid)) {
+        case 1:
+            uuid_copy(state->host_uuid, *uuid);
+            parser_user->st_exists = 0;
+            parser_user->host_exists = 1;
+            break;
+        case 2:
+            uuid_copy(state->chart_uuid, *uuid);
+            parser_user->st_exists = 1;
+            break;
+        default:
+            uuid_copy(state->uuid, *uuid);
+            break;
+    }
+
+    return PARSER_RC_OK;
+}
+
+/* Parser action for a tombstone record: intentionally does nothing and reports success. */
+PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid)
+{
+    UNUSED(uuid);
+    UNUSED(user);
+    return PARSER_RC_OK;
+}
+
+/*
+ * Puts a replay state into its initial condition.
+ *
+ * All three UUID slots are cleared. host_uuid and chart_uuid used to be left
+ * untouched, although the chart and dimension actions test them with
+ * uuid_is_null() and the state is an uninitialized stack object in
+ * scan_metalog_files().
+ */
+void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct metalog_instance *ctx)
+{
+    state->ctx = ctx;
+    state->skip_record = 0;
+    uuid_clear(state->uuid);
+    uuid_clear(state->host_uuid);
+    uuid_clear(state->chart_uuid);
+    state->metalogfile = NULL;
+}
diff --git a/database/engine/metadata_log/metalogpluginsd.h b/database/engine/metadata_log/metalogpluginsd.h
new file mode 100644
index 0000000..96808aa
--- /dev/null
+++ b/database/engine/metadata_log/metalogpluginsd.h
@@ -0,0 +1,33 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_METALOGPLUGINSD_H
+#define NETDATA_METALOGPLUGINSD_H
+
+#include "../../../collectors/plugins.d/pluginsd_parser.h"
+#include "../../../collectors/plugins.d/plugins_d.h"
+#include "../../../parser/parser.h"
+
+struct metalog_pluginsd_state {
+ struct metalog_instance *ctx;
+ uuid_t uuid; /* pending UUID from the last GUID/context record; cleared when a chart or dimension consumes it */
+ uuid_t host_uuid; /* host that charts are stored under; charts are skipped while it is null */
+ uuid_t chart_uuid; /* chart that dimensions are stored under; dimensions are skipped while it is null */
+ uint8_t skip_record; /* skip this record due to errors in parsing */
+ struct metadata_logfile *metalogfile; /* current metadata log file being replayed */
+};
+
+extern void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct metalog_instance *ctx); /* call before handing the state to the parser */
+
+extern PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family,
+ char *context, char *title, char *units, char *plugin, char *module,
+ int priority, int update_every, RRDSET_TYPE chart_type, char *options);
+extern PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm,
+ long multiplier, long divisor, char *options,
+ RRD_ALGORITHM algorithm_type);
+extern PARSER_RC metalog_pluginsd_guid_action(void *user, uuid_t *uuid);
+extern PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid);
+extern PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid);
+extern PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION *plugins_action); /* registered as the PLUGINSD_KEYWORD_HOST handler when log files are scanned */
+extern PARSER_RC metalog_pluginsd_host_action(void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone, char *tags);
+
+#endif /* NETDATA_METALOGPLUGINSD_H */
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
new file mode 100644
index 0000000..a182071
--- /dev/null
+++ b/database/engine/pagecache.c
@@ -0,0 +1,1178 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx);
+
+/*
+ * Appends the descriptor's cache entry at the tail of the replacement queue.
+ * No lock is taken here; pg_cache_replaceQ_insert() is the locked wrapper.
+ */
+static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx,
+                                                   struct rrdeng_page_descr *descr)
+{
+    struct page_cache_descr *entry = descr->pg_cache_descr;
+    struct page_cache_descr *old_tail = ctx->pg_cache.replaceQ.tail;
+
+    if (likely(old_tail != NULL)) {
+        /* link behind the current tail */
+        entry->prev = old_tail;
+        old_tail->next = entry;
+    }
+    if (unlikely(ctx->pg_cache.replaceQ.head == NULL)) {
+        /* queue was empty: the entry is also the head */
+        ctx->pg_cache.replaceQ.head = entry;
+    }
+    ctx->pg_cache.replaceQ.tail = entry;
+}
+
+/*
+ * Unlinks the descriptor's cache entry from the replacement queue and resets
+ * its links. No lock is taken here; pg_cache_replaceQ_delete() is the locked
+ * wrapper.
+ */
+static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx,
+                                                   struct rrdeng_page_descr *descr)
+{
+    struct page_cache *pg_cache = &ctx->pg_cache;
+    struct page_cache_descr *entry = descr->pg_cache_descr;
+    struct page_cache_descr *before = entry->prev;
+    struct page_cache_descr *after = entry->next;
+
+    /* bridge the neighbours over the removed entry */
+    if (likely(before != NULL))
+        before->next = after;
+    if (likely(after != NULL))
+        after->prev = before;
+
+    /* move the queue ends if the entry was one of them */
+    if (unlikely(pg_cache->replaceQ.head == entry))
+        pg_cache->replaceQ.head = after;
+    if (unlikely(pg_cache->replaceQ.tail == entry))
+        pg_cache->replaceQ.tail = before;
+
+    entry->next = NULL;
+    entry->prev = NULL;
+}
+
+/* Appends the descriptor to the replacement queue while holding the queue's write lock. */
+void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
+                              struct rrdeng_page_descr *descr)
+{
+    uv_rwlock_t *queue_lock = &ctx->pg_cache.replaceQ.lock;
+
+    uv_rwlock_wrlock(queue_lock);
+    pg_cache_replaceQ_insert_unsafe(ctx, descr);
+    uv_rwlock_wrunlock(queue_lock);
+}
+
+/* Removes the descriptor from the replacement queue while holding the queue's write lock. */
+void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
+                              struct rrdeng_page_descr *descr)
+{
+    uv_rwlock_t *queue_lock = &ctx->pg_cache.replaceQ.lock;
+
+    uv_rwlock_wrlock(queue_lock);
+    pg_cache_replaceQ_delete_unsafe(ctx, descr);
+    uv_rwlock_wrunlock(queue_lock);
+}
+/*
+ * Moves the descriptor to the tail of the replacement queue: it is unlinked
+ * and appended again under a single hold of the queue's write lock.
+ */
+void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
+                               struct rrdeng_page_descr *descr)
+{
+    uv_rwlock_t *queue_lock = &ctx->pg_cache.replaceQ.lock;
+
+    uv_rwlock_wrlock(queue_lock);
+    pg_cache_replaceQ_delete_unsafe(ctx, descr);
+    pg_cache_replaceQ_insert_unsafe(ctx, descr);
+    uv_rwlock_wrunlock(queue_lock);
+}
+
+/*
+ * Allocates a page descriptor in its empty state: zero length, invalid start
+ * and end times, no metric id, no extent and no page cache descriptor.
+ */
+struct rrdeng_page_descr *pg_cache_create_descr(void)
+{
+    struct rrdeng_page_descr *fresh = mallocz(sizeof(*fresh));
+
+    fresh->id = NULL;
+    fresh->extent = NULL;
+    fresh->page_length = 0;
+    fresh->start_time = INVALID_TIME;
+    fresh->end_time = INVALID_TIME;
+    fresh->pg_cache_descr = NULL;
+    fresh->pg_cache_descr_state = 0;
+
+    return fresh;
+}
+
+/*
+ * Broadcasts on the descriptor's condition variable when at least one waiter
+ * is registered. The caller must hold page descriptor lock.
+ */
+void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
+{
+    struct page_cache_descr *entry = descr->pg_cache_descr;
+
+    if (!entry->waiters)
+        return;
+
+    uv_cond_broadcast(&entry->cond);
+}
+
+/* Locked variant of pg_cache_wake_up_waiters_unsafe(): takes and releases the page descriptor lock itself. */
+void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+    rrdeng_page_descr_mutex_lock(ctx, descr);
+    {
+        pg_cache_wake_up_waiters_unsafe(descr);
+    }
+    rrdeng_page_descr_mutex_unlock(ctx, descr);
+}
+
+/*
+ * Waits on the descriptor's condition variable, counting itself as a waiter
+ * for the duration of the wait.
+ * The caller must hold page descriptor lock; the wait releases and
+ * re-acquires it. The descriptor is not guaranteed to exist after this
+ * function returns.
+ */
+void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr)
+{
+    struct page_cache_descr *entry = descr->pg_cache_descr;
+
+    entry->waiters++;
+    uv_cond_wait(&entry->cond, &entry->mutex);
+    entry->waiters--;
+}
+
+/*
+ * Like pg_cache_wait_event_unsafe() but gives up after timeout_sec seconds.
+ * The caller must hold page descriptor lock; the wait releases and
+ * re-acquires it. The descriptor is not guaranteed to exist after this
+ * function returns.
+ * Returns the result of uv_cond_timedwait(), i.e. UV_ETIMEDOUT if
+ * timeout_sec seconds pass.
+ */
+int pg_cache_timedwait_event_unsafe(struct rrdeng_page_descr *descr, uint64_t timeout_sec)
+{
+    struct page_cache_descr *entry = descr->pg_cache_descr;
+    int wait_result;
+
+    entry->waiters++;
+    wait_result = uv_cond_timedwait(&entry->cond, &entry->mutex, timeout_sec * NSEC_PER_SEC);
+    entry->waiters--;
+
+    return wait_result;
+}
+
+/*
+ * Takes the page descriptor lock, waits for one event on the descriptor and
+ * returns the page flags observed right after the wake-up.
+ * The lock is released and re-acquired during the wait. The descriptor is
+ * not guaranteed to exist after this function returns.
+ *
+ * descr->pg_cache_descr is read only while the descriptor lock is held, the
+ * same order pg_cache_punch_hole() in this file uses; the pointer used to be
+ * cached before locking.
+ * NOTE(review): presumably the lock call can attach or replace the page cache
+ * descriptor -- confirm against rrdeng_page_descr_mutex_lock().
+ */
+unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+    struct page_cache_descr *pg_cache_descr;
+    unsigned long flags;
+
+    rrdeng_page_descr_mutex_lock(ctx, descr);
+    pg_cache_wait_event_unsafe(descr);
+    pg_cache_descr = descr->pg_cache_descr;
+    flags = pg_cache_descr->flags;
+    rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+    return flags;
+}
+
+/*
+ * Tells whether a reference to the page could be taken right now.
+ * A page that is locked or has a read pending cannot be taken; exclusive
+ * access additionally requires that nobody holds a reference.
+ * The caller must hold page descriptor lock.
+ * Returns 1 if the reference can be taken and 0 otherwise.
+ */
+int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
+{
+    struct page_cache_descr *entry = descr->pg_cache_descr;
+
+    if (entry->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING))
+        return 0;
+    if (exclusive_access && entry->refcnt)
+        return 0;
+
+    return 1;
+}
+
+/*
+ * Gets a reference to the page descriptor if pg_cache_can_get_unsafe()
+ * allows it. An exclusive reference also marks the page as locked.
+ * The caller must hold page descriptor lock.
+ * Returns 1 on success and 0 on failure.
+ */
+int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
+{
+    struct page_cache_descr *entry;
+
+    if (!pg_cache_can_get_unsafe(descr, exclusive_access))
+        return 0;
+
+    entry = descr->pg_cache_descr;
+    if (exclusive_access)
+        entry->flags |= RRD_PAGE_LOCKED;
+    entry->refcnt++;
+
+    return 1;
+}
+
+/*
+ * Drops one reference to the page: clears the locked flag, decrements the
+ * reference count and wakes up the waiters when the count reaches zero.
+ * The caller must hold the page descriptor lock.
+ */
+void pg_cache_put_unsafe(struct rrdeng_page_descr *descr)
+{
+    struct page_cache_descr *entry = descr->pg_cache_descr;
+
+    entry->flags &= ~RRD_PAGE_LOCKED;
+    --entry->refcnt;
+    if (entry->refcnt == 0)
+        pg_cache_wake_up_waiters_unsafe(descr);
+}
+
+/* Locked variant of pg_cache_put_unsafe(): takes and releases the page descriptor lock itself. */
+void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+    rrdeng_page_descr_mutex_lock(ctx, descr);
+    {
+        pg_cache_put_unsafe(descr);
+    }
+    rrdeng_page_descr_mutex_unlock(ctx, descr);
+}
+
+/*
+ * Gives back #number populated pages to the page cache accounting.
+ * The caller must hold the page cache lock.
+ */
+static void pg_cache_release_pages_unsafe(struct rrdengine_instance *ctx, unsigned number)
+{
+    ctx->pg_cache.populated_pages -= number;
+}
+
+/* Locked variant of pg_cache_release_pages_unsafe(): holds the page cache write lock around the update. */
+static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned number)
+{
+    uv_rwlock_t *cache_lock = &ctx->pg_cache.pg_cache_rwlock;
+
+    uv_rwlock_wrlock(cache_lock);
+    pg_cache_release_pages_unsafe(ctx, number);
+    uv_rwlock_wrunlock(cache_lock);
+}
+
+/*
+ * Maximum number of pages allowed in the page cache: the configured maximum
+ * plus the pages pinned by producers.
+ */
+unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
+{
+    /* 2 pages are pinned per producer */
+    unsigned long pinned_pages = 2 * (unsigned long)ctx->metric_API_max_producers;
+
+    return ctx->max_cache_pages + pinned_pages;
+}
+
+/*
+ * Low watermark of the page cache: the configured low watermark plus the
+ * pages pinned by producers. The page cache should strive to keep the number
+ * of pages below that number.
+ */
+unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
+{
+    /* 2 pages are pinned per producer */
+    unsigned long pinned_pages = 2 * (unsigned long)ctx->metric_API_max_producers;
+
+    return ctx->cache_pages_low_watermark + pinned_pages;
+}
+
+/*
+ * Maximum number of dirty pages, committed to be written to disk, that are
+ * allowed in the page cache: the configured low watermark plus one page per
+ * producer.
+ */
+unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx)
+{
+    /* the active pages of the producers are left out; only the extra pinned page of each one counts */
+    unsigned long extra_pinned_pages = (unsigned long)ctx->metric_API_max_producers;
+
+    return ctx->cache_pages_low_watermark + extra_pinned_pages;
+}
+
+/*
+ * Blocks until #number populated pages have been reserved.
+ * While the reservation would exceed pg_cache_hard_limit() one page is
+ * evicted per iteration. When nothing can be evicted, the page cache lock is
+ * dropped, a RRDENG_FLUSH_PAGES command is enqueued and waited for, and from
+ * the second consecutive failure on the thread also sleeps for a random
+ * number of back-off slots before retrying.
+ */
+static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
+{
+    struct page_cache *pg_cache = &ctx->pg_cache;
+    const unsigned FAILURES_CEILING = 10; /* caps the exponent of the back-off */
+    unsigned long exp_backoff_slot_usec = USEC_PER_MS * 10;
+    unsigned failures = 0;
+
+    assert(number < ctx->max_cache_pages);
+
+    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+    if (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1)
+        debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==",
+              number);
+
+    while (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) {
+        struct completion compl;
+        struct rrdeng_cmd cmd;
+
+        if (pg_cache_try_evict_one_page_unsafe(ctx))
+            continue; /* made room for one page, re-check the limit */
+
+        /* failed to evict: ask for a flush without holding the page cache lock */
+        ++failures;
+        uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+
+        init_completion(&compl);
+        cmd.opcode = RRDENG_FLUSH_PAGES;
+        cmd.completion = &compl;
+        rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+        /* wait for some pages to be flushed */
+        debug(D_RRDENGINE, "%s: waiting for pages to be written to disk before evicting.", __func__);
+        wait_for_completion(&compl);
+        destroy_completion(&compl);
+
+        if (unlikely(failures > 1)) {
+            /* exponential backoff */
+            unsigned long slots = random() % (2LU << MIN(failures, FAILURES_CEILING));
+
+            (void)sleep_usec(slots * exp_backoff_slot_usec);
+        }
+        uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+    }
+
+    pg_cache->populated_pages += number;
+    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+}
+
+/*
+ * Attempts to reserve #number populated pages without blocking on flushes.
+ * When the reservation would exceed pg_cache_soft_limit(), pages are evicted
+ * until it fits or nothing more can be evicted. The reservation itself
+ * succeeds as long as it stays within pg_cache_hard_limit().
+ * Returns 0 on failure and 1 on success.
+ */
+static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
+{
+    struct page_cache *pg_cache = &ctx->pg_cache;
+    unsigned evicted = 0;
+    int reserved = 0;
+
+    assert(number < ctx->max_cache_pages);
+
+    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+
+    if (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1) {
+        debug(D_RRDENGINE,
+              "==Page cache full. Trying to reserve %u pages.==",
+              number);
+        while (pg_cache_try_evict_one_page_unsafe(ctx)) {
+            ++evicted;
+            if (pg_cache->populated_pages + number < pg_cache_soft_limit(ctx) + 1)
+                break; /* back under the soft limit */
+        }
+        debug(D_RRDENGINE, "Evicted %u pages.", evicted);
+    }
+
+    if (pg_cache->populated_pages + number < pg_cache_hard_limit(ctx) + 1) {
+        pg_cache->populated_pages += number;
+        reserved = 1; /* success */
+    }
+
+    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+
+    return reserved;
+}
+
+/*
+ * Frees the page's memory, marks the page as not populated, gives the page
+ * back to the cache accounting and counts the eviction.
+ * The caller must hold the page cache and the page descriptor locks in that order.
+ */
+static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+    struct page_cache_descr *entry = descr->pg_cache_descr;
+
+    freez(entry->page);
+    entry->page = NULL;
+    entry->flags &= ~RRD_PAGE_POPULATED;
+
+    pg_cache_release_pages_unsafe(ctx, 1);
+    ctx->stats.pg_cache_evictions++;
+}
+
+/*
+ * The caller must hold the page cache lock.
+ * Lock order: page cache -> replaceQ -> page descriptor
+ * Walks the replacement queue from its head and evicts the first page that is
+ * populated, not dirty and for which an exclusive reference can be taken.
+ *
+ * Returns 1 on success and 0 on failure.
+ */
+static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
+{
+    struct page_cache *pg_cache = &ctx->pg_cache;
+    struct page_cache_descr *entry;
+
+    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
+
+    for (entry = pg_cache->replaceQ.head ; entry != NULL ; entry = entry->next) {
+        struct rrdeng_page_descr *descr = entry->descr;
+        unsigned long old_flags;
+
+        rrdeng_page_descr_mutex_lock(ctx, descr);
+        old_flags = entry->flags;
+        if (!(old_flags & RRD_PAGE_POPULATED) || (old_flags & RRD_PAGE_DIRTY) ||
+            !pg_cache_try_get_unsafe(descr, 1)) {
+            /* not a candidate, try the next entry */
+            rrdeng_page_descr_mutex_unlock(ctx, descr);
+            continue;
+        }
+
+        /* must evict */
+        pg_cache_evict_unsafe(ctx, descr);
+        pg_cache_put_unsafe(descr);
+        pg_cache_replaceQ_delete_unsafe(ctx, descr);
+
+        rrdeng_page_descr_mutex_unlock(ctx, descr);
+        uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+
+        rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
+
+        return 1;
+    }
+
+    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+
+    /* failed to evict */
+    return 0;
+}
+
+/**
+ * Deletes a page from the database.
+ * Callers of this function need to make sure they're not deleting the same descriptor concurrently.
+ * The descriptor is removed from the metric's page index, evicted from the page cache if it is populated, and
+ * finally released with freez(), so descr must not be used after this call returns.
+ * @param ctx is the database instance.
+ * @param descr is the page descriptor.
+ * @param remove_dirty must be non-zero if the page to be deleted is dirty.
+ * @param is_exclusive_holder must be non-zero if the caller holds an exclusive page reference.
+ * @param metric_id is set to the metric the page belongs to, if it's safe to delete the metric and metric_id is not
+ * NULL. Otherwise, metric_id is not set.
+ * @return 1 if it's safe to delete the metric, 0 otherwise.
+ */
+uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty,
+ uint8_t is_exclusive_holder, uuid_t *metric_id)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct page_cache_descr *pg_cache_descr = NULL;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index = NULL;
+ int ret;
+ uint8_t can_delete_metric = 0;
+ /* find the page index of the metric this page belongs to */
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
+ fatal_assert(NULL != PValue);
+ page_index = *PValue;
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ /* unlink the page from the time index; a return value of 0 is treated as "page was not indexed" */
+ uv_rwlock_wrlock(&page_index->lock);
+ ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
+ if (unlikely(0 == ret)) {
+ uv_rwlock_wrunlock(&page_index->lock);
+ error("Page under deletion was not in index.");
+ if (unlikely(debug_flags & D_RRDENGINE)) {
+ print_page_descr(descr);
+ }
+ goto destroy;
+ }
+ --page_index->page_count;
+ if (!page_index->writers && !page_index->page_count) { /* no writers and no pages left in this metric */
+ can_delete_metric = 1;
+ if (metric_id) {
+ memcpy(metric_id, page_index->id, sizeof(uuid_t));
+ }
+ }
+ uv_rwlock_wrunlock(&page_index->lock);
+ fatal_assert(1 == ret);
+ /* global page cache accounting */
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ ++ctx->stats.pg_cache_deletions;
+ --pg_cache->page_descriptors;
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+ /* the *_unsafe helpers below are called while the page descriptor mutex is held */
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ if (!is_exclusive_holder) {
+ /* If we don't hold an exclusive page reference get one */
+ while (!pg_cache_try_get_unsafe(descr, 1)) {
+ debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_wait_event_unsafe(descr);
+ }
+ }
+ if (remove_dirty) {
+ pg_cache_descr->flags &= ~RRD_PAGE_DIRTY; /* the caller asked for the dirty data to be discarded */
+ } else {
+ /* even a locked page could be dirty */
+ while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
+ debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_wait_event_unsafe(descr);
+ }
+ }
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ /* only a populated page is removed from the replacement queue and evicted */
+ if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
+ /* only after locking can it be safely deleted from LRU */
+ pg_cache_replaceQ_delete(ctx, descr);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ pg_cache_evict_unsafe(ctx, descr);
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+ }
+ pg_cache_put(ctx, descr);
+ rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
+ while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) { /* retry until the ALLOCATED bit is cleared */
+ rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */
+ (void)sleep_usec(1000); /* 1 msec */
+ }
+destroy: /* also reached directly when the page was not found in the index */
+ freez(descr);
+ pg_cache_update_metric_times(page_index); /* recompute oldest/latest times from the remaining pages */
+
+ return can_delete_metric;
+}
+
+/*
+ * Tells whether a page overlaps the inclusive time range [start_time, end_time].
+ * Returns non-zero when it does, 0 otherwise.
+ */
+static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time)
+{
+    const usec_t first = descr->start_time;
+    const usec_t last = descr->end_time;
+
+    if (first < start_time) {
+        /* the page begins before the range, so it overlaps only if it reaches into it */
+        return last >= start_time;
+    }
+    /* the page begins at or after the range start, so it must begin no later than the range end */
+    return first <= end_time;
+}
+
+/* Tells whether point_in_time lies inside the page's inclusive [start_time, end_time] interval. */
+static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time)
+{
+    if (point_in_time < descr->start_time) {
+        return 0;
+    }
+    return point_in_time <= descr->end_time;
+}
+
+/*
+ * Returns the first page of the metric that overlaps [start_time, end_time], or NULL if no page does.
+ * The caller must hold the page index lock.
+ */
+static inline struct rrdeng_page_descr *
+find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time)
+{
+    const Word_t start_sec = (Word_t)(start_time / USEC_PER_SEC);
+    struct rrdeng_page_descr *candidate;
+    Pvoid_t *slot;
+    Word_t key;
+
+    /* nearest page indexed at or before start_time: it may extend into the range */
+    key = start_sec;
+    slot = JudyLLast(page_index->JudyL_array, &key, PJE0);
+    if (likely(NULL != slot)) {
+        candidate = *slot;
+        if (is_page_in_time_range(candidate, start_time, end_time))
+            return candidate;
+    }
+
+    /* otherwise the nearest page indexed at or after start_time (the key is reset, the search above may move it) */
+    key = start_sec;
+    slot = JudyLFirst(page_index->JudyL_array, &key, PJE0);
+    if (likely(NULL != slot)) {
+        candidate = *slot;
+        if (is_page_in_time_range(candidate, start_time, end_time))
+            return candidate;
+    }
+
+    return NULL;
+}
+
+/*
+ * Widens the metric's [oldest_time, latest_time] interval so that it covers the given page.
+ * Cheap path used when pages are added: only the new page is examined, the index is not scanned.
+ */
+void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr)
+{
+    const usec_t known_oldest = page_index->oldest_time;
+    const usec_t known_latest = page_index->latest_time;
+
+    /* an INVALID_TIME bound means "not set yet" and is always replaced */
+    if (unlikely(INVALID_TIME == known_oldest || descr->start_time < known_oldest))
+        page_index->oldest_time = descr->start_time;
+
+    if (likely(INVALID_TIME == known_latest || descr->end_time > known_latest))
+        page_index->latest_time = descr->end_time;
+}
+
+/*
+ * Recomputes the metric's oldest and latest timestamps from the first and last page left in its index.
+ * Used after pages are removed. Both timestamps become INVALID_TIME when the index is empty.
+ */
+void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
+{
+    struct rrdeng_page_descr *page;
+    Pvoid_t *oldest_slot, *newest_slot;
+    Word_t key;
+    usec_t oldest = INVALID_TIME;
+    usec_t latest = INVALID_TIME;
+
+    uv_rwlock_rdlock(&page_index->lock);
+
+    /* lowest index present in the array */
+    key = (Word_t)0;
+    oldest_slot = JudyLFirst(page_index->JudyL_array, &key, PJE0);
+    if (likely(NULL != oldest_slot)) {
+        page = *oldest_slot;
+        oldest = page->start_time;
+    }
+
+    /* highest index present in the array */
+    key = (Word_t)-1;
+    newest_slot = JudyLLast(page_index->JudyL_array, &key, PJE0);
+    if (likely(NULL != newest_slot)) {
+        page = *newest_slot;
+        latest = page->end_time;
+    }
+
+    uv_rwlock_rdunlock(&page_index->lock);
+
+    if (likely(NULL != oldest_slot)) {
+        page_index->oldest_time = oldest;
+        page_index->latest_time = latest;
+    } else {
+        /* an empty index has neither a first nor a last page */
+        fatal_assert(NULL == newest_slot);
+        page_index->oldest_time = page_index->latest_time = INVALID_TIME;
+    }
+}
+
+/*
+ * Inserts a page descriptor into the page index of its metric and updates the page cache accounting.
+ * If index is NULL the page index is looked up by UUID (descr->id) and must exist.
+ */
+void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
+                     struct rrdeng_page_descr *descr)
+{
+    struct page_cache *cache = &ctx->pg_cache;
+    struct pg_cache_page_index *target = index;
+    unsigned long descr_state = descr->pg_cache_descr_state;
+    Pvoid_t *slot;
+
+    if (descr_state) {
+        /* the descriptor arrives with pre-allocated page cache state */
+        struct page_cache_descr *cached = descr->pg_cache_descr;
+
+        fatal_assert(descr_state & PG_CACHE_DESCR_ALLOCATED);
+        if (cached->flags & RRD_PAGE_POPULATED) {
+            pg_cache_reserve_pages(ctx, 1);
+            /* only pages that are not dirty go to the replacement queue */
+            if (!(cached->flags & RRD_PAGE_DIRTY))
+                pg_cache_replaceQ_insert(ctx, descr);
+        }
+    }
+
+    if (unlikely(NULL == target)) {
+        uv_rwlock_rdlock(&cache->metrics_index.lock);
+        slot = JudyHSGet(cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
+        fatal_assert(NULL != slot);
+        target = *slot;
+        uv_rwlock_rdunlock(&cache->metrics_index.lock);
+    }
+
+    /* index the page by its start time in seconds */
+    uv_rwlock_wrlock(&target->lock);
+    slot = JudyLIns(&target->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
+    *slot = descr;
+    ++target->page_count;
+    pg_cache_add_new_metric_time(target, descr);
+    uv_rwlock_wrunlock(&target->lock);
+
+    uv_rwlock_wrlock(&cache->pg_cache_rwlock);
+    ++ctx->stats.pg_cache_insertions;
+    ++cache->page_descriptors;
+    uv_rwlock_wrunlock(&cache->pg_cache_rwlock);
+}
+
+/**
+ * Returns the start time of the first page of a metric that overlaps the time range [start_time,end_time].
+ * @param ctx DB context
+ * @param id UUID of the metric
+ * @param start_time inclusive starting time in usec
+ * @param end_time inclusive ending time in usec
+ * @return the start time of that page in usec, or INVALID_TIME if the UUID is not in the metrics index or no page
+ *         overlaps the time range.
+ */
+usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
+{
+    struct page_cache *pg_cache = &ctx->pg_cache;
+    struct rrdeng_page_descr *descr = NULL;
+    Pvoid_t *PValue;
+    struct pg_cache_page_index *page_index = NULL;
+    usec_t oldest_time = INVALID_TIME;
+
+    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+    if (likely(NULL != PValue)) {
+        page_index = *PValue;
+    }
+    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+    if (NULL == PValue) {
+        return INVALID_TIME;
+    }
+
+    uv_rwlock_rdlock(&page_index->lock);
+    descr = find_first_page_in_time_range(page_index, start_time, end_time);
+    if (NULL != descr) {
+        /*
+         * Copy the value while the index lock is held. pg_cache_punch_hole() unlinks the descriptor under the
+         * write lock and frees it afterwards, so descr must not be dereferenced once the lock is dropped.
+         */
+        oldest_time = descr->start_time;
+    }
+    uv_rwlock_rdunlock(&page_index->lock);
+
+    return oldest_time;
+}
+
+/**
+ * Walks the page index backwards from point_in_time and reports the first page accepted by the filter.
+ * @param ctx DB context
+ * @param page_index page index of a metric, must not be NULL
+ * @param point_in_time only pages indexed before this timestamp (in usec) are considered
+ * @param filter called for each candidate page, a non-zero return value accepts the page
+ * @param page_info receives the length and times of the accepted page. If no page is accepted page_length is set
+ *        to 0 and both times are set to INVALID_TIME.
+ */
+void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
+                                     usec_t point_in_time, pg_cache_page_info_filter_t *filter,
+                                     struct rrdeng_page_info *page_info)
+{
+    struct rrdeng_page_descr *found;
+    Pvoid_t *slot;
+    Word_t key;
+
+    (void)ctx;
+    fatal_assert(NULL != page_index);
+
+    key = (Word_t)(point_in_time / USEC_PER_SEC);
+    uv_rwlock_rdlock(&page_index->lock);
+    for (;;) {
+        slot = JudyLPrev(page_index->JudyL_array, &key, PJE0);
+        if (unlikely(NULL == slot)) {
+            /* ran out of older pages */
+            found = NULL;
+            break;
+        }
+        found = *slot;
+        if (NULL == found || filter(found))
+            break;
+    }
+    if (likely(NULL != found)) {
+        page_info->page_length = found->page_length;
+        page_info->start_time = found->start_time;
+        page_info->end_time = found->end_time;
+    } else {
+        page_info->page_length = 0;
+        page_info->start_time = INVALID_TIME;
+        page_info->end_time = INVALID_TIME;
+    }
+    uv_rwlock_rdunlock(&page_index->lock);
+}
+/**
+ * Searches for pages in a time range and triggers disk I/O if necessary and possible.
+ * Does not get a reference.
+ * @param ctx DB context
+ * @param id UUID
+ * @param start_time inclusive starting time in usec
+ * @param end_time inclusive ending time in usec
+ * @param page_info_arrayp May be NULL. Otherwise it allocates (*page_info_arrayp) and populates it with information
+ *        of pages that overlap with the time range [start_time,end_time]. The caller must free
+ *        (*page_info_arrayp) with freez(). If (*page_info_arrayp) is set to NULL nothing was allocated, which is
+ *        always the case when 0 is returned.
+ * @param ret_page_indexp Must not be NULL. Sets the page index pointer (*ret_page_indexp) for the given UUID. It is
+ *        set to NULL when the UUID is not in the metrics index or no page overlaps the time range.
+ * @return the number of non-empty pages that overlap with the time range [start_time,end_time]. The scan stops
+ *         early once PAGE_CACHE_MAX_PRELOAD_PAGES pages have been picked for loading.
+ */
+unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time,
+                          struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp)
+{
+    struct page_cache *pg_cache = &ctx->pg_cache;
+    struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
+    struct page_cache_descr *pg_cache_descr = NULL;
+    unsigned i, j, k, preload_count, count, page_info_array_max_size = 0;
+    unsigned long flags;
+    Pvoid_t *PValue;
+    struct pg_cache_page_index *page_index = NULL;
+    Word_t Index;
+    uint8_t failed_to_reserve;
+
+    fatal_assert(NULL != ret_page_indexp);
+    if (page_info_arrayp) {
+        /* nothing is allocated yet: keeps the "NULL means nothing to free" contract on the early returns below */
+        *page_info_arrayp = NULL;
+    }
+
+    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+    if (likely(NULL != PValue)) {
+        *ret_page_indexp = page_index = *PValue;
+    }
+    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+    if (NULL == PValue) {
+        debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
+        *ret_page_indexp = NULL;
+        return 0;
+    }
+
+    uv_rwlock_rdlock(&page_index->lock);
+    descr = find_first_page_in_time_range(page_index, start_time, end_time);
+    if (NULL == descr) {
+        uv_rwlock_rdunlock(&page_index->lock);
+        debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
+        *ret_page_indexp = NULL;
+        return 0;
+    } else {
+        Index = (Word_t)(descr->start_time / USEC_PER_SEC);
+    }
+    if (page_info_arrayp) {
+        page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
+        *page_info_arrayp = mallocz(page_info_array_max_size);
+    }
+
+    for (count = 0, preload_count = 0 ;
+         descr != NULL && is_page_in_time_range(descr, start_time, end_time) ;
+         PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
+         descr = unlikely(NULL == PValue) ? NULL : *PValue) {
+        /* Iterate all pages in range */
+
+        if (unlikely(0 == descr->page_length))
+            continue;
+        if (page_info_arrayp) {
+            if (unlikely(count >= page_info_array_max_size / sizeof(struct rrdeng_page_info))) {
+                /* the array is full, grow it by another batch */
+                page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
+                *page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size);
+            }
+            (*page_info_arrayp)[count].start_time = descr->start_time;
+            (*page_info_arrayp)[count].end_time = descr->end_time;
+            (*page_info_arrayp)[count].page_length = descr->page_length;
+        }
+        ++count;
+
+        rrdeng_page_descr_mutex_lock(ctx, descr);
+        pg_cache_descr = descr->pg_cache_descr;
+        flags = pg_cache_descr->flags;
+        if (pg_cache_can_get_unsafe(descr, 0)) {
+            if (flags & RRD_PAGE_POPULATED) {
+                /* success */
+                rrdeng_page_descr_mutex_unlock(ctx, descr);
+                debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
+                continue;
+            }
+        }
+        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
+            /* got an exclusive reference to a page that is not in memory: schedule it for loading */
+            preload_array[preload_count++] = descr;
+            if (PAGE_CACHE_MAX_PRELOAD_PAGES == preload_count) {
+                rrdeng_page_descr_mutex_unlock(ctx, descr);
+                break;
+            }
+        }
+        rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+    }
+    uv_rwlock_rdunlock(&page_index->lock);
+
+    failed_to_reserve = 0;
+    for (i = 0 ; i < preload_count && !failed_to_reserve ; ++i) {
+        struct rrdeng_cmd cmd;
+        struct rrdeng_page_descr *next;
+
+        descr = preload_array[i];
+        if (NULL == descr) {
+            continue;
+        }
+        if (!pg_cache_try_reserve_pages(ctx, 1)) {
+            failed_to_reserve = 1;
+            break;
+        }
+        cmd.opcode = RRDENG_READ_EXTENT;
+        cmd.read_extent.page_cache_descr[0] = descr;
+        /* don't use this page again */
+        preload_array[i] = NULL;
+        for (j = 0, k = 1 ; j < preload_count ; ++j) {
+            next = preload_array[j];
+            if (NULL == next) {
+                continue;
+            }
+            if (descr->extent == next->extent) {
+                /* same extent, consolidate */
+                if (!pg_cache_try_reserve_pages(ctx, 1)) {
+                    failed_to_reserve = 1;
+                    break;
+                }
+                cmd.read_extent.page_cache_descr[k++] = next;
+                /* don't use this page again */
+                preload_array[j] = NULL;
+            }
+        }
+        cmd.read_extent.page_count = k;
+        rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+    }
+    if (failed_to_reserve) {
+        debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__);
+        /* drop the exclusive references of the pages that were not handed to a read command */
+        for (i = 0 ; i < preload_count ; ++i) {
+            descr = preload_array[i];
+            if (NULL == descr) {
+                continue;
+            }
+            pg_cache_put(ctx, descr);
+        }
+    }
+    if (!preload_count) {
+        /* no such page */
+        debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__);
+    }
+    if (unlikely(0 == count && page_info_arrayp)) {
+        freez(*page_info_arrayp);
+        *page_info_arrayp = NULL;
+    }
+    return count;
+}
+
+/*
+ * Searches for a page and gets a reference.
+ * When point_in_time is INVALID_TIME get any page.
+ * NOTE(review): INVALID_TIME is 0, so in that case the JudyLLast() search below starts at index 0; confirm that
+ * this really yields "any page".
+ * If index is NULL lookup by UUID (id).
+ * If the page found is not populated, a RRDENG_READ_PAGE command is enqueued and the call waits until the page
+ * becomes populated.
+ * Returns the page descriptor, or NULL if the UUID is not in the metrics index or no non-empty page matches.
+ */
+struct rrdeng_page_descr *
+ pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
+ usec_t point_in_time)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_descr *descr = NULL;
+ struct page_cache_descr *pg_cache_descr = NULL;
+ unsigned long flags;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index = NULL;
+ Word_t Index;
+ uint8_t page_not_in_cache;
+
+ if (unlikely(NULL == index)) {
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ return NULL;
+ }
+ } else {
+ page_index = index;
+ }
+ pg_cache_reserve_pages(ctx, 1); /* released again on every path except the disk read path below */
+
+ page_not_in_cache = 0;
+ uv_rwlock_rdlock(&page_index->lock);
+ while (1) {
+ Index = (Word_t)(point_in_time / USEC_PER_SEC);
+ PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ }
+ if (NULL == PValue ||
+ 0 == descr->page_length ||
+ (INVALID_TIME != point_in_time &&
+ !is_point_in_time_in_page(descr, point_in_time))) {
+ /* non-empty page not found */
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ pg_cache_release_pages(ctx, 1);
+ return NULL;
+ }
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ flags = pg_cache_descr->flags;
+ if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) { /* non-exclusive reference */
+ /* success */
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
+ break;
+ }
+ if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { /* exclusive reference */
+ struct rrdeng_cmd cmd;
+
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ cmd.opcode = RRDENG_READ_PAGE;
+ cmd.read_page.page_cache_descr = descr;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+
+ debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) { /* the descriptor mutex taken above is still held */
+ pg_cache_wait_event_unsafe(descr);
+ }
+ /* success */
+ /* Downgrade exclusive reference to allow other readers */
+ pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
+ pg_cache_wake_up_waiters_unsafe(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
+ return descr;
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+ debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ if (!(flags & RRD_PAGE_POPULATED))
+ page_not_in_cache = 1; /* remembered so that the eventual success is counted as a miss */
+ pg_cache_wait_event_unsafe(descr); /* no reference could be taken: wait for an event, then search again */
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ /* reset scan to find again */
+ uv_rwlock_rdlock(&page_index->lock);
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+ /* flags is the snapshot taken under the descriptor mutex in the last loop iteration */
+ if (!(flags & RRD_PAGE_DIRTY))
+ pg_cache_replaceQ_set_hot(ctx, descr);
+ pg_cache_release_pages(ctx, 1);
+ if (page_not_in_cache)
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
+ else
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
+ return descr;
+}
+
+/*
+ * Searches for the first page between start_time and end_time and gets a reference.
+ * start_time and end_time are inclusive.
+ * If index is NULL lookup by UUID (id).
+ * If the page found is not populated, a RRDENG_READ_PAGE command is enqueued and the call waits until the page
+ * becomes populated.
+ * Returns the page descriptor, or NULL if the UUID is not in the metrics index or the first page in the range is
+ * missing or empty.
+ */
+struct rrdeng_page_descr *
+pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
+ usec_t start_time, usec_t end_time)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_descr *descr = NULL;
+ struct page_cache_descr *pg_cache_descr = NULL;
+ unsigned long flags;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index = NULL;
+ uint8_t page_not_in_cache;
+
+ if (unlikely(NULL == index)) {
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ return NULL;
+ }
+ } else {
+ page_index = index;
+ }
+ pg_cache_reserve_pages(ctx, 1); /* released again on every path except the disk read path below */
+
+ page_not_in_cache = 0;
+ uv_rwlock_rdlock(&page_index->lock);
+ while (1) {
+ descr = find_first_page_in_time_range(page_index, start_time, end_time);
+ if (NULL == descr || 0 == descr->page_length) {
+ /* non-empty page not found */
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ pg_cache_release_pages(ctx, 1);
+ return NULL;
+ }
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ flags = pg_cache_descr->flags;
+ if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) { /* non-exclusive reference */
+ /* success */
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
+ break;
+ }
+ if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { /* exclusive reference */
+ struct rrdeng_cmd cmd;
+
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ cmd.opcode = RRDENG_READ_PAGE;
+ cmd.read_page.page_cache_descr = descr;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+
+ debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) { /* the descriptor mutex taken above is still held */
+ pg_cache_wait_event_unsafe(descr);
+ }
+ /* success */
+ /* Downgrade exclusive reference to allow other readers */
+ pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
+ pg_cache_wake_up_waiters_unsafe(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
+ return descr;
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+ debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ if (!(flags & RRD_PAGE_POPULATED))
+ page_not_in_cache = 1; /* remembered so that the eventual success is counted as a miss */
+ pg_cache_wait_event_unsafe(descr); /* no reference could be taken: wait for an event, then search again */
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ /* reset scan to find again */
+ uv_rwlock_rdlock(&page_index->lock);
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+ /* flags is the snapshot taken under the descriptor mutex in the last loop iteration */
+ if (!(flags & RRD_PAGE_DIRTY))
+ pg_cache_replaceQ_set_hot(ctx, descr);
+ pg_cache_release_pages(ctx, 1);
+ if (page_not_in_cache)
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
+ else
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
+ return descr;
+}
+
+/*
+ * Allocates and initialises an empty page index for the metric with the given UUID.
+ * The new index has no pages, no writers, no timestamps and is not linked to any other index.
+ */
+struct pg_cache_page_index *create_page_index(uuid_t *id)
+{
+    struct pg_cache_page_index *fresh = mallocz(sizeof(*fresh));
+
+    uuid_copy(fresh->id, *id);
+    fresh->JudyL_array = (Pvoid_t) NULL;
+    fresh->page_count = 0;
+    fresh->writers = 0;
+    fresh->oldest_time = INVALID_TIME;
+    fresh->latest_time = INVALID_TIME;
+    fresh->prev = NULL;
+    fatal_assert(0 == uv_rwlock_init(&fresh->lock));
+
+    return fresh;
+}
+
+/* Resets the UUID -> page index map to the empty state and creates its lock. */
+static void init_metrics_index(struct rrdengine_instance *ctx)
+{
+    struct pg_cache_metrics_index *metrics = &ctx->pg_cache.metrics_index;
+
+    metrics->JudyHS_array = (Pvoid_t) NULL;
+    metrics->last_page_index = NULL;
+    fatal_assert(0 == uv_rwlock_init(&metrics->lock));
+}
+
+/* Resets the replacement queue to the empty state and creates its lock. */
+static void init_replaceQ(struct rrdengine_instance *ctx)
+{
+    struct pg_cache_replaceQ *queue = &ctx->pg_cache.replaceQ;
+
+    queue->head = NULL;
+    queue->tail = NULL;
+    fatal_assert(0 == uv_rwlock_init(&queue->lock));
+}
+
+/* Resets the committed (dirty) page index to the empty state and creates its lock. */
+static void init_committed_page_index(struct rrdengine_instance *ctx)
+{
+    struct pg_cache_committed_page_index *committed = &ctx->pg_cache.committed_page_index;
+
+    committed->JudyL_array = (Pvoid_t) NULL;
+    fatal_assert(0 == uv_rwlock_init(&committed->lock));
+    committed->latest_corr_id = 0;
+    committed->nr_committed_pages = 0;
+}
+
+/* Initialises the page cache of a database instance: counters, cache lock and the three sub-indexes. */
+void init_page_cache(struct rrdengine_instance *ctx)
+{
+    struct page_cache *cache = &ctx->pg_cache;
+
+    cache->page_descriptors = 0;
+    cache->populated_pages = 0;
+    fatal_assert(0 == uv_rwlock_init(&cache->pg_cache_rwlock));
+
+    init_metrics_index(ctx);
+    init_replaceQ(ctx);
+    init_committed_page_index(ctx);
+}
+
+/*
+ * Releases all the memory of the page cache: the committed page index, every page index together with its page
+ * descriptors and populated pages, and the metrics index. Logs the number of bytes that were freed.
+ */
+void free_page_cache(struct rrdengine_instance *ctx)
+{
+    struct page_cache *cache = &ctx->pg_cache;
+    struct pg_cache_page_index *metric, *older_metric;
+    Word_t judy_bytes, bytes_freed = 0;
+
+    /* Free committed page index */
+    judy_bytes = JudyLFreeArray(&cache->committed_page_index.JudyL_array, PJE0);
+    fatal_assert(NULL == cache->committed_page_index.JudyL_array);
+    bytes_freed += judy_bytes;
+
+    metric = cache->metrics_index.last_page_index;
+    while (NULL != metric) {
+        Pvoid_t *slot;
+        Word_t key = (Word_t) 0;
+
+        /* remember the link now, the index itself is freed at the end of this iteration */
+        older_metric = metric->prev;
+
+        /* Iterate all page descriptors of this metric */
+        for (slot = JudyLFirst(metric->JudyL_array, &key, PJE0) ;
+             NULL != slot ;
+             slot = JudyLNext(metric->JudyL_array, &key, PJE0)) {
+            struct rrdeng_page_descr *page = *slot;
+
+            if (NULL == page)
+                break;
+
+            if (page->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
+                /* Check rrdenglocking.c */
+                struct page_cache_descr *cached = page->pg_cache_descr;
+
+                if (cached->flags & RRD_PAGE_POPULATED) {
+                    freez(cached->page);
+                    bytes_freed += RRDENG_BLOCK_SIZE;
+                }
+                rrdeng_destroy_pg_cache_descr(ctx, cached);
+                bytes_freed += sizeof(*cached);
+            }
+            freez(page);
+            bytes_freed += sizeof(*page);
+        }
+
+        /* Free page index */
+        judy_bytes = JudyLFreeArray(&metric->JudyL_array, PJE0);
+        fatal_assert(NULL == metric->JudyL_array);
+        bytes_freed += judy_bytes;
+        freez(metric);
+        bytes_freed += sizeof(*metric);
+
+        metric = older_metric;
+    }
+
+    /* Free metrics index */
+    judy_bytes = JudyHSFreeArray(&cache->metrics_index.JudyHS_array, PJE0);
+    fatal_assert(NULL == cache->metrics_index.JudyHS_array);
+    bytes_freed += judy_bytes;
+
+    info("Freed %lu bytes of memory from page cache.", bytes_freed);
+} \ No newline at end of file
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
new file mode 100644
index 0000000..31e9739
--- /dev/null
+++ b/database/engine/pagecache.h
@@ -0,0 +1,231 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_PAGECACHE_H
+#define NETDATA_PAGECACHE_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct rrdengine_instance;
+struct extent_info;
+struct rrdeng_page_descr;
+
+#define INVALID_TIME (0) /* sentinel timestamp: "no time" / "no page found" */
+
+/* Page flags, stored in page_cache_descr.flags */
+#define RRD_PAGE_DIRTY (1LU << 0) /* the page still has to be flushed to disk */
+#define RRD_PAGE_LOCKED (1LU << 1) /* set while an exclusive reference is held; cleared to downgrade it */
+#define RRD_PAGE_READ_PENDING (1LU << 2) /* NOTE(review): presumably a disk read is in flight - confirm */
+#define RRD_PAGE_WRITE_PENDING (1LU << 3) /* NOTE(review): presumably a disk write is in flight - confirm */
+#define RRD_PAGE_POPULATED (1LU << 4) /* the page data is in memory (page_cache_descr.page) */
+/*
+ * Page cache state of a page. It is reachable through rrdeng_page_descr.pg_cache_descr and exists only while
+ * PG_CACHE_DESCR_ALLOCATED is set in rrdeng_page_descr.pg_cache_descr_state.
+ */
+struct page_cache_descr {
+ struct rrdeng_page_descr *descr; /* parent descriptor */
+ void *page; /* page data; freed with freez() when the cache is destroyed and RRD_PAGE_POPULATED is set */
+ unsigned long flags; /* RRD_PAGE_* bits */
+ struct page_cache_descr *prev; /* LRU */
+ struct page_cache_descr *next; /* LRU */
+
+ unsigned refcnt; /* NOTE(review): presumably the number of page references currently held - confirm */
+ uv_mutex_t mutex; /* always take it after the page cache lock or after the commit lock */
+ uv_cond_t cond; /* NOTE(review): presumably what pg_cache_wait_event*() sleeps on - confirm */
+ unsigned waiters; /* NOTE(review): presumably the number of threads waiting on cond - confirm */
+};
+
+/* Page cache descriptor flags (bits of rrdeng_page_descr.pg_cache_descr_state), state = 0 means no descriptor */
+#define PG_CACHE_DESCR_ALLOCATED (1LU << 0)
+#define PG_CACHE_DESCR_DESTROY (1LU << 1)
+#define PG_CACHE_DESCR_LOCKED (1LU << 2)
+#define PG_CACHE_DESCR_SHIFT (3) /* number of flag bits; the user count starts at this bit */
+#define PG_CACHE_DESCR_USERS_MASK (((unsigned long)-1) << PG_CACHE_DESCR_SHIFT) /* all bits above the flags */
+#define PG_CACHE_DESCR_FLAGS_MASK (((unsigned long)-1) >> (BITS_PER_ULONG - PG_CACHE_DESCR_SHIFT)) /* the flag bits */
+
+/*
+ * Page cache descriptor state bits (works for both 32-bit and 64-bit architectures).
+ * The layout follows the PG_CACHE_DESCR_* definitions: ALLOCATED is bit 0, DESTROY is bit 1, LOCKED is bit 2 and
+ * the number of users occupies the bits from PG_CACHE_DESCR_SHIFT upwards.
+ *
+ * 63    ...    31    ...    3 |     2      |     1      |     0     |
+ * ----------------------------+------------+------------+-----------|
+ * number of descriptor users  |   LOCKED   |  DESTROY   | ALLOCATED |
+ *
+ * The state word is rrdeng_page_descr.pg_cache_descr_state.
+ */
+struct rrdeng_page_descr {
+ uuid_t *id; /* never changes */
+ struct extent_info *extent; /* NULL while the page is still being modified, see pg_cache_atomic_get_pg_info() */
+
+ /* points to ephemeral page cache descriptor if the page resides in the cache */
+ struct page_cache_descr *pg_cache_descr;
+
+ /* Compare-And-Swap target for page cache descriptor allocation algorithm */
+ volatile unsigned long pg_cache_descr_state;
+
+ /* page information */
+ usec_t start_time; /* in usec; the page index key is start_time / USEC_PER_SEC */
+ usec_t end_time; /* in usec; the lowest bit is set temporarily by pg_cache_atomic_set_pg_info() */
+ uint32_t page_length; /* 0 means the page is empty */
+};
+
+#define PAGE_INFO_SCRATCH_SZ (8) /* size in bytes of rrdeng_page_info.scratch */
+struct rrdeng_page_info { /* copy of a page's length and times, handed out by the page cache query functions */
+ uint8_t scratch[PAGE_INFO_SCRATCH_SZ]; /* scratch area to be used by page-cache users */
+
+ usec_t start_time; /* copied from rrdeng_page_descr.start_time, or INVALID_TIME if no page was found */
+ usec_t end_time; /* copied from rrdeng_page_descr.end_time, or INVALID_TIME if no page was found */
+ uint32_t page_length; /* copied from rrdeng_page_descr.page_length, or 0 if no page was found */
+};
+
+/* Page filter callback: returns non-zero if the page satisfies the caller's criteria, 0 if it must be skipped */
+typedef int pg_cache_page_info_filter_t(struct rrdeng_page_descr *);
+
+#define PAGE_CACHE_MAX_PRELOAD_PAGES (256) /* pg_cache_preload(): max pages queued per call, page info growth step */
+
+/* maps time ranges to pages, one instance per metric */
+struct pg_cache_page_index {
+ uuid_t id; /* UUID of the metric */
+ /*
+ * care: JudyL_array indices are converted from useconds to seconds to fit in one word in 32-bit architectures
+ * TODO: examine if we want to support better granularity than seconds
+ */
+ Pvoid_t JudyL_array; /* page start time in seconds -> struct rrdeng_page_descr * */
+ Word_t page_count; /* number of pages currently in JudyL_array */
+ unsigned short writers; /* the metric may be deleted only when this and page_count are both 0 */
+ uv_rwlock_t lock; /* taken around accesses to JudyL_array and page_count */
+
+ /*
+ * Only one effective writer, data deletion workqueue.
+ * It's also written during the DB loading phase.
+ */
+ usec_t oldest_time; /* INVALID_TIME when the index has no pages */
+
+ /*
+ * Only one effective writer, data collection thread.
+ * It's also written by the data deletion workqueue when data collection is disabled for this metric.
+ */
+ usec_t latest_time; /* INVALID_TIME when the index has no pages */
+
+ struct pg_cache_page_index *prev; /* link followed from pg_cache_metrics_index.last_page_index on teardown */
+};
+
+/* maps UUIDs to page indices */
+struct pg_cache_metrics_index {
+ uv_rwlock_t lock; /* taken around JudyHS_array lookups */
+ Pvoid_t JudyHS_array; /* metric UUID (sizeof(uuid_t) bytes) -> struct pg_cache_page_index * */
+ struct pg_cache_page_index *last_page_index; /* head of the list linked through pg_cache_page_index.prev */
+};
+
+/* gathers dirty pages to be written on disk */
+struct pg_cache_committed_page_index {
+ uv_rwlock_t lock;
+
+ Pvoid_t JudyL_array; /* NOTE(review): presumably keyed by the dirty page correlation ID - confirm */
+
+ /*
+ * Dirty page correlation ID is a hint. Dirty pages that are correlated should have
+ * a small correlation ID difference. Dirty pages in memory should never have the
+ * same ID at the same time for correctness.
+ */
+ Word_t latest_corr_id; /* starts at 0 */
+
+ unsigned nr_committed_pages; /* starts at 0 */
+};
+
+/*
+ * Gathers populated pages to be evicted.
+ * Relies on page cache descriptors being there as it uses their memory.
+ * The queue is linked through page_cache_descr.prev and page_cache_descr.next.
+ */
+struct pg_cache_replaceQ {
+ uv_rwlock_t lock; /* LRU lock */
+
+ struct page_cache_descr *head; /* LRU */
+ struct page_cache_descr *tail; /* MRU */
+};
+
+struct page_cache { /* TODO: add statistics */
+ uv_rwlock_t pg_cache_rwlock; /* page cache lock */
+
+ struct pg_cache_metrics_index metrics_index; /* UUID -> page index */
+ struct pg_cache_committed_page_index committed_page_index; /* dirty pages waiting to be written */
+ struct pg_cache_replaceQ replaceQ; /* LRU queue of populated pages */
+
+ unsigned page_descriptors; /* incremented by pg_cache_insert(), decremented by pg_cache_punch_hole() */
+ unsigned populated_pages; /* NOTE(review): presumably the number of pages holding data in memory - confirm */
+};
+/*
+ * Page cache API.
+ * pagecache.c calls the *_unsafe variants while it holds the page descriptor mutex.
+ */
+extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr);
+extern void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+extern void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr);
+extern unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+extern void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
+ struct rrdeng_page_descr *descr);
+extern void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
+ struct rrdeng_page_descr *descr);
+extern void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
+ struct rrdeng_page_descr *descr);
+extern struct rrdeng_page_descr *pg_cache_create_descr(void);
+extern int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access); /* non-zero on success */
+extern void pg_cache_put_unsafe(struct rrdeng_page_descr *descr);
+extern void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
+ struct rrdeng_page_descr *descr); /* index may be NULL: it is then looked up by descr->id */
+extern uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
+ uint8_t remove_dirty, uint8_t is_exclusive_holder, uuid_t *metric_id); /* frees descr */
+extern usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id,
+ usec_t start_time, usec_t end_time); /* INVALID_TIME if nothing found */
+extern void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
+ usec_t point_in_time, pg_cache_page_info_filter_t *filter,
+ struct rrdeng_page_info *page_info);
+extern unsigned
+ pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time,
+ struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp);
+extern struct rrdeng_page_descr *
+ pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
+ usec_t point_in_time); /* NULL if no page was found */
+extern struct rrdeng_page_descr *
+ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
+ usec_t start_time, usec_t end_time); /* NULL if no page was found */
+extern struct pg_cache_page_index *create_page_index(uuid_t *id);
+extern void init_page_cache(struct rrdengine_instance *ctx);
+extern void free_page_cache(struct rrdengine_instance *ctx);
+extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr);
+extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);
+extern unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx);
+extern unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx);
+extern unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx);
+/*
+ * Reads a consistent (end_time, page_length) pair of a page without taking a lock.
+ * While descr->extent is NULL the pair is re-read until end_time is unchanged across the read of page_length and
+ * its lowest bit is clear; pg_cache_atomic_set_pg_info() sets that bit for the duration of an update.
+ * When descr->extent is not NULL both fields are read directly.
+ */
+static inline void
+ pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_timep, uint32_t *page_lengthp)
+{
+ usec_t end_time, old_end_time;
+ uint32_t page_length;
+
+ if (NULL == descr->extent) {
+ /* this page is currently being modified, get consistent info locklessly */
+ do {
+ end_time = descr->end_time;
+ __sync_synchronize();
+ old_end_time = end_time;
+ page_length = descr->page_length;
+ __sync_synchronize();
+ end_time = descr->end_time;
+ __sync_synchronize();
+ } while ((end_time != old_end_time || (end_time & 1) != 0)); /* retry if an update overlapped the reads */
+
+ *end_timep = end_time;
+ *page_lengthp = page_length;
+ } else {
+ *end_timep = descr->end_time;
+ *page_lengthp = descr->page_length;
+ }
+}
+
+/*
+ * Publishes a new (end_time, page_length) pair for lockless readers, see pg_cache_atomic_get_pg_info().
+ * The caller must hold a reference to the page and must have already set the new data.
+ * end_time must be even: its lowest bit is reserved to mark an update in progress.
+ */
+static inline void pg_cache_atomic_set_pg_info(struct rrdeng_page_descr *descr, usec_t end_time, uint32_t page_length)
+{
+ fatal_assert(!(end_time & 1));
+ __sync_synchronize();
+ descr->end_time |= 1; /* mark start of uncertainty period by adding 1 microsecond */
+ __sync_synchronize();
+ descr->page_length = page_length;
+ __sync_synchronize();
+ descr->end_time = end_time; /* mark end of uncertainty period */
+}
+
+#endif /* NETDATA_PAGECACHE_H */
diff --git a/database/engine/rrddiskprotocol.h b/database/engine/rrddiskprotocol.h
new file mode 100644
index 0000000..db47af5
--- /dev/null
+++ b/database/engine/rrddiskprotocol.h
@@ -0,0 +1,119 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDDISKPROTOCOL_H
+#define NETDATA_RRDDISKPROTOCOL_H
+
+/* Size in bytes of one block; the super-block padding macros below pad the super-blocks up to this size */
+#define RRDENG_BLOCK_SIZE (4096)
+/* Alignment used for file I/O buffers, same as the block size */
+#define RRDFILE_ALIGNMENT RRDENG_BLOCK_SIZE
+
+/* Size in bytes of the magic_number field of the super-blocks, and the magic strings stored in it */
+#define RRDENG_MAGIC_SZ (32)
+#define RRDENG_DF_MAGIC "netdata-data-file"
+#define RRDENG_JF_MAGIC "netdata-journal-file"
+
+/* Size in bytes of the version field of the super-blocks, and the version strings stored in it */
+#define RRDENG_VER_SZ (16)
+#define RRDENG_DF_VER "1.0"
+#define RRDENG_JF_VER "1.0"
+
+/* Sizes in bytes of the uuid and checksum byte arrays of the persistent structures */
+#define UUID_SZ (16)
+#define CHECKSUM_SZ (4) /* CRC32 */
+
+/* Values of the compression_algorithm field of the data file extent header */
+#define RRD_NO_COMPRESSION (0)
+#define RRD_LZ4 (1)
+
+#define RRDENG_DF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ + sizeof(uint8_t)))
+/*
+ * Data file persistent super-block
+ */
+struct rrdeng_df_sb {
+ char magic_number[RRDENG_MAGIC_SZ]; /* presumably holds RRDENG_DF_MAGIC; verify against the data file code */
+ char version[RRDENG_VER_SZ]; /* presumably holds RRDENG_DF_VER; verify against the data file code */
+ uint8_t tier;
+ uint8_t padding[RRDENG_DF_SB_PADDING_SZ]; /* pads the packed structure up to RRDENG_BLOCK_SIZE bytes */
+} __attribute__ ((packed));
+
+/*
+ * Page types
+ */
+#define PAGE_METRICS (0)
+#define PAGE_LOGS (1) /* reserved */
+
+/*
+ * Data file page descriptor
+ */
+struct rrdeng_extent_page_descr {
+ uint8_t type; /* one of the page types above, e.g. PAGE_METRICS */
+
+ uint8_t uuid[UUID_SZ]; /* raw uuid bytes identifying the page */
+ uint32_t page_length; /* length in bytes of the page data inside the uncompressed payload */
+ uint64_t start_time; /* presumably microseconds (usec_t in the page cache) - TODO confirm */
+ uint64_t end_time; /* same unit as start_time */
+} __attribute__ ((packed));
+
+/*
+ * Data file extent header
+ */
+struct rrdeng_df_extent_header {
+ uint32_t payload_length; /* bytes of payload stored after the page descriptors (compressed size when compressed) */
+ uint8_t compression_algorithm; /* RRD_NO_COMPRESSION or RRD_LZ4 */
+ uint8_t number_of_pages; /* number of entries in descr[] */
+ /* #number_of_pages page descriptors follow */
+ struct rrdeng_extent_page_descr descr[];
+} __attribute__ ((packed));
+
+/*
+ * Data file extent trailer
+ */
+struct rrdeng_df_extent_trailer {
+ /* checksum of all extent bytes that precede the trailer (header, page descriptors and payload) */
+ uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */
+} __attribute__ ((packed));
+
+#define RRDENG_JF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ))
+/*
+ * Journal file super-block
+ */
+struct rrdeng_jf_sb {
+ char magic_number[RRDENG_MAGIC_SZ]; /* presumably holds RRDENG_JF_MAGIC; verify against the journal file code */
+ char version[RRDENG_VER_SZ]; /* presumably holds RRDENG_JF_VER; verify against the journal file code */
+ uint8_t padding[RRDENG_JF_SB_PADDING_SZ]; /* pads the packed structure up to RRDENG_BLOCK_SIZE bytes */
+} __attribute__ ((packed));
+
+/*
+ * Transaction record types
+ */
+#define STORE_PADDING (0)
+#define STORE_DATA (1)
+#define STORE_LOGS (2) /* reserved */
+
+/*
+ * Journal file transaction record header
+ */
+struct rrdeng_jf_transaction_header {
+ /* when set to STORE_PADDING jump to start of next block */
+ uint8_t type;
+
+ uint32_t reserved; /* reserved for future use */
+ uint64_t id; /* transaction id */
+ uint16_t payload_length; /* bytes of record payload between this header and the trailer */
+} __attribute__ ((packed));
+
+/*
+ * Journal file transaction record trailer
+ */
+struct rrdeng_jf_transaction_trailer {
+ /* checksum of the transaction header and payload that precede the trailer */
+ uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */
+} __attribute__ ((packed));
+
+/*
+ * Journal file STORE_DATA action
+ */
+struct rrdeng_jf_store_data {
+ /* data file extent information */
+ uint64_t extent_offset; /* byte offset of the extent inside its data file */
+ uint32_t extent_size; /* size of the extent in bytes */
+
+ uint8_t number_of_pages; /* number of entries in descr[] */
+ /* #number_of_pages page descriptors follow */
+ struct rrdeng_extent_page_descr descr[];
+} __attribute__ ((packed));
+
+#endif /* NETDATA_RRDDISKPROTOCOL_H */ \ No newline at end of file
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
new file mode 100644
index 0000000..43135ff
--- /dev/null
+++ b/database/engine/rrdengine.c
@@ -0,0 +1,1266 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "rrdengine.h"
+
+/* Statistics counters that are not tied to a single rrdengine_instance */
+/* incremented by the extent read and write callbacks on I/O or CRC32 failures */
+rrdeng_stats_t global_io_errors = 0;
+rrdeng_stats_t global_fs_errors = 0;
+rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
+rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0;
+/* incremented for every dirty page deleted by invalidate_oldest_committed() */
+rrdeng_stats_t global_flushing_pressure_page_deletions = 0;
+
+/*
+ * Validates the constants of the on-disk format and the in-memory limits of the engine.
+ * Every statement is a BUILD_BUG_ON() check of a condition that must never hold.
+ */
+static void sanity_check(void)
+{
+    /* Magic numbers must fit in the super-blocks */
+    BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ);
+    BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ);
+
+    /* Version strings must fit in the super-blocks */
+    BUILD_BUG_ON(strlen(RRDENG_DF_VER) > RRDENG_VER_SZ);
+    BUILD_BUG_ON(strlen(RRDENG_JF_VER) > RRDENG_VER_SZ);
+
+    /*
+     * Data file super-block cannot be larger than RRDENG_BLOCK_SIZE.
+     * RRDENG_DF_SB_PADDING_SZ is computed in unsigned (size_t) arithmetic, so testing it for
+     * "< 0" can never fire; compare the size of the fixed fields with the block size instead.
+     */
+    BUILD_BUG_ON(RRDENG_MAGIC_SZ + RRDENG_VER_SZ + sizeof(uint8_t) > RRDENG_BLOCK_SIZE);
+
+    BUILD_BUG_ON(sizeof(uuid_t) != UUID_SZ); /* check UUID size */
+
+    /* page count must fit in 8 bits */
+    BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255);
+
+    /* the extent cache bitmaps need one bit per cached extent (presumably 32-bit bitmaps - TODO confirm) */
+    BUILD_BUG_ON(MAX_CACHED_EXTENTS > 32);
+
+    /* page info scratch space must be able to hold 2 32-bit integers */
+    BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t));
+}
+
+/* Appends xt_cache_elem at the tail of the extent cache replacement queue. */
+static inline void xt_cache_replaceQ_insert(struct rrdengine_worker_config* wc,
+                                            struct extent_cache_element *xt_cache_elem)
+{
+    struct extent_cache *cache = &wc->xt_cache;
+    struct extent_cache_element *old_tail = cache->replaceQ_tail;
+
+    xt_cache_elem->next = NULL;
+    xt_cache_elem->prev = old_tail; /* stays NULL when the queue was empty */
+    if (likely(old_tail != NULL)) {
+        old_tail->next = xt_cache_elem;
+    }
+    if (unlikely(cache->replaceQ_head == NULL)) {
+        /* first element of the queue is also its head */
+        cache->replaceQ_head = xt_cache_elem;
+    }
+    cache->replaceQ_tail = xt_cache_elem;
+}
+
+/* Unlinks xt_cache_elem from the extent cache replacement queue and clears its links. */
+static inline void xt_cache_replaceQ_delete(struct rrdengine_worker_config* wc,
+                                            struct extent_cache_element *xt_cache_elem)
+{
+    struct extent_cache *cache = &wc->xt_cache;
+    struct extent_cache_element *before = xt_cache_elem->prev;
+    struct extent_cache_element *after = xt_cache_elem->next;
+
+    /* bridge the neighbours over the removed element */
+    if (likely(before != NULL)) {
+        before->next = after;
+    }
+    if (likely(after != NULL)) {
+        after->prev = before;
+    }
+    /* fix up the queue ends if the element was one of them */
+    if (unlikely(cache->replaceQ_head == xt_cache_elem)) {
+        cache->replaceQ_head = after;
+    }
+    if (unlikely(cache->replaceQ_tail == xt_cache_elem)) {
+        cache->replaceQ_tail = before;
+    }
+    xt_cache_elem->next = NULL;
+    xt_cache_elem->prev = NULL;
+}
+
+/*
+ * Moves xt_cache_elem to the tail of the replacement queue. Eviction in
+ * try_insert_into_xt_cache() scans from the head, so the tail is evicted last.
+ */
+static inline void xt_cache_replaceQ_set_hot(struct rrdengine_worker_config* wc,
+                                             struct extent_cache_element *xt_cache_elem)
+{
+    /* unlink from the current position, then re-append */
+    xt_cache_replaceQ_delete(wc, xt_cache_elem);
+    xt_cache_replaceQ_insert(wc, xt_cache_elem);
+}
+
+/*
+ * Reserves a slot of the extent cache for extent and appends it to the replacement queue.
+ * A free slot is used when the allocation bitmap has one; otherwise the first queued slot
+ * (scanning from the head) that has no read in flight is recycled.
+ * Returns the index of the slot, or -1 when no slot is free and all queued slots are in flight.
+ */
+static int try_insert_into_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent)
+{
+    struct extent_cache *cache = &wc->xt_cache;
+    struct extent_cache_element *slot;
+    unsigned idx;
+    int free_bit = find_first_zero(cache->allocation_bitmap);
+
+    if (-1 != free_bit && free_bit < MAX_CACHED_EXTENTS) {
+        idx = (unsigned)free_bit;
+        slot = &cache->extent_array[idx];
+    } else {
+        /* no free slot: look for a victim that is not being read right now */
+        for (slot = cache->replaceQ_head ; NULL != slot ; slot = slot->next) {
+            idx = slot - cache->extent_array;
+            if (!check_bit(cache->inflight_bitmap, idx)) {
+                break;
+            }
+        }
+        if (NULL == slot) {
+            return -1;
+        }
+        xt_cache_replaceQ_delete(wc, slot);
+    }
+
+    slot->extent = extent;
+    slot->fileno = extent->datafile->fileno;
+    slot->inflight_io_descr = NULL;
+    xt_cache_replaceQ_insert(wc, slot);
+    modify_bit(&cache->allocation_bitmap, idx, 1);
+
+    return (int)idx;
+}
+
+/**
+ * Searches the allocated slots of the extent cache for extent.
+ * A slot matches when it stores the same extent pointer and the same datafile fileno.
+ * Returns 0 and stores the slot position in *idx on a match; returns 1 (leaving *idx
+ * untouched) when the extent is not cached.
+ **/
+static uint8_t lookup_in_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent, unsigned *idx)
+{
+    struct extent_cache *cache = &wc->xt_cache;
+    struct extent_cache_element *slots = cache->extent_array;
+    unsigned pos;
+
+    for (pos = 0 ; pos < MAX_CACHED_EXTENTS ; ++pos) {
+        if (!check_bit(cache->allocation_bitmap, pos)) {
+            continue; /* slot not in use */
+        }
+        if (slots[pos].extent != extent) {
+            continue;
+        }
+        if (slots[pos].fileno != extent->datafile->fileno) {
+            continue;
+        }
+        *idx = pos;
+        return 0;
+    }
+    return 1;
+}
+
+#if 0 /* disabled code */
+static void delete_from_xt_cache(struct rrdengine_worker_config* wc, unsigned idx)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *xt_cache_elem;
+
+ xt_cache_elem = &xt_cache->extent_array[idx];
+ xt_cache_replaceQ_delete(wc, xt_cache_elem);
+ xt_cache_elem->extent = NULL;
+ modify_bit(&wc->xt_cache.allocation_bitmap, idx, 0); /* invalidate it */
+ modify_bit(&wc->xt_cache.inflight_bitmap, idx, 0); /* not in-flight anymore */
+}
+#endif
+
+/*
+ * Links xt_io_descr right after the in-flight I/O descriptor of cache slot idx, so that it
+ * is completed by read_extent_cb() when the pending read of that extent finishes.
+ * The slot must have an in-flight descriptor.
+ */
+void enqueue_inflight_read_to_xt_cache(struct rrdengine_worker_config* wc, unsigned idx,
+                                       struct extent_io_descriptor *xt_io_descr)
+{
+    struct extent_io_descriptor *inflight = wc->xt_cache.extent_array[idx].inflight_io_descr;
+    struct extent_io_descriptor *successor = inflight->next;
+
+    inflight->next = xt_io_descr;
+    xt_io_descr->next = successor;
+}
+
+/*
+ * Completes a page read request from the copy of the extent stored in slot idx of the extent cache.
+ *
+ * For every descriptor of xt_io_descr a RRDENG_BLOCK_SIZE page is allocated, filled from the cached
+ * extent data and published in the page cache descriptor. The descriptor reference is then released
+ * (release_descr set) or the waiters of the page are woken up. xt_io_descr is freed before returning.
+ */
+void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, struct extent_io_descriptor *xt_io_descr)
+{
+ unsigned i, j, page_offset;
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+ void *page;
+ struct extent_info *extent = xt_io_descr->descr_array[0]->extent;
+
+ for (i = 0 ; i < xt_io_descr->descr_count; ++i) {
+ page = mallocz(RRDENG_BLOCK_SIZE);
+ descr = xt_io_descr->descr_array[i];
+ /* locate the page inside the extent: its offset is the sum of the lengths of the pages before it */
+ for (j = 0, page_offset = 0 ; j < extent->number_of_pages ; ++j) {
+ /* care, we don't hold the descriptor mutex */
+ if (!uuid_compare(*extent->pages[j]->id, *descr->id) &&
+ extent->pages[j]->page_length == descr->page_length &&
+ extent->pages[j]->start_time == descr->start_time &&
+ extent->pages[j]->end_time == descr->end_time) {
+ break;
+ }
+ page_offset += extent->pages[j]->page_length;
+
+ }
+ /* NOTE(review): when no page matches, page_offset is the sum of all page lengths - confirm this cannot happen */
+ /* care, we don't hold the descriptor mutex */
+ (void) memcpy(page, wc->xt_cache.extent_array[idx].pages + page_offset, descr->page_length);
+
+ /* publish the page and update the flags under the descriptor mutex */
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ pg_cache_descr->page = page;
+ pg_cache_descr->flags |= RRD_PAGE_POPULATED;
+ pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ pg_cache_replaceQ_insert(ctx, descr);
+ if (xt_io_descr->release_descr) {
+ pg_cache_put(ctx, descr);
+ } else {
+ debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
+ pg_cache_wake_up_waiters(ctx, descr);
+ }
+ }
+ if (xt_io_descr->completion)
+ complete(xt_io_descr->completion);
+ freez(xt_io_descr);
+}
+
+/*
+ * libuv completion callback of the extent read that was submitted by do_read_extent().
+ *
+ * Steps:
+ *  1. Check the result of the read and the CRC32 of the extent.
+ *  2. Decompress the payload when the extent is compressed.
+ *  3. If the extent owns an in-flight extent cache slot, store the uncompressed pages in the
+ *     slot and complete every read request that was queued behind this one.
+ *  4. Populate the pages requested by this descriptor and release them or wake up their waiters.
+ *
+ * On any read, checksum or decompression failure the requested pages (and the cache slot) are
+ * filled with zeroes instead of data. The extent header is only interpreted after a successful
+ * read, so contents of a buffer that was never filled are not used as lengths or counts.
+ */
+void read_extent_cb(uv_fs_t* req)
+{
+    struct rrdengine_worker_config* wc = req->loop->data;
+    struct rrdengine_instance *ctx = wc->ctx;
+    struct extent_io_descriptor *xt_io_descr;
+    struct rrdeng_page_descr *descr;
+    struct page_cache_descr *pg_cache_descr;
+    int ret;
+    unsigned i, j, count = 0;
+    void *page, *uncompressed_buf = NULL;
+    uint32_t payload_length = 0, payload_offset = 0, page_offset, uncompressed_payload_length = 0;
+    uint8_t have_read_error = 0;
+    /* persistent structures */
+    struct rrdeng_df_extent_header *header;
+    struct rrdeng_df_extent_trailer *trailer;
+    uLong crc;
+
+    xt_io_descr = req->data;
+    header = xt_io_descr->buf;
+    trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer);
+
+    if (req->result < 0) {
+        struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+
+        ++ctx->stats.io_errors;
+        rrd_stat_atomic_add(&global_io_errors, 1);
+        have_read_error = 1;
+        error("%s: uv_fs_read - %s - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__,
+              uv_strerror((int)req->result), xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
+        goto after_crc_check;
+    }
+    /* the buffer holds data from the disk, it is now meaningful to parse the header */
+    payload_length = header->payload_length;
+    count = header->number_of_pages;
+    payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count;
+
+    crc = crc32(0L, Z_NULL, 0);
+    crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer));
+    ret = crc32cmp(trailer->checksum, crc);
+#ifdef NETDATA_INTERNAL_CHECKS
+    {
+        struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+        debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__,
+              xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED");
+    }
+#endif
+    if (unlikely(ret)) {
+        struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+
+        ++ctx->stats.io_errors;
+        rrd_stat_atomic_add(&global_io_errors, 1);
+        have_read_error = 1;
+        error("%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: FAILED", __func__,
+              xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
+    }
+
+after_crc_check:
+    if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) {
+        uncompressed_payload_length = 0;
+        for (i = 0 ; i < count ; ++i) {
+            uncompressed_payload_length += header->descr[i].page_length;
+        }
+        uncompressed_buf = mallocz(uncompressed_payload_length);
+        ret = LZ4_decompress_safe(xt_io_descr->buf + payload_offset, uncompressed_buf,
+                                  payload_length, uncompressed_payload_length);
+        ctx->stats.before_decompress_bytes += payload_length;
+        if (unlikely(ret < 0)) {
+            /* malformed compressed payload: handle it like any other read error */
+            struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+
+            ++ctx->stats.io_errors;
+            rrd_stat_atomic_add(&global_io_errors, 1);
+            have_read_error = 1;
+            error("%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. LZ4 decompression: FAILED",
+                  __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
+        } else {
+            ctx->stats.after_decompress_bytes += ret;
+            debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret);
+        }
+        /* care, we don't hold the descriptor mutex */
+    }
+    {
+        uint8_t xt_is_cached = 0;
+        unsigned xt_idx;
+        struct extent_info *extent = xt_io_descr->descr_array[0]->extent;
+
+        xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx);
+        if (xt_is_cached && check_bit(wc->xt_cache.inflight_bitmap, xt_idx)) {
+            struct extent_cache *xt_cache = &wc->xt_cache;
+            struct extent_cache_element *xt_cache_elem = &xt_cache->extent_array[xt_idx];
+            struct extent_io_descriptor *curr, *next;
+
+            /* fill the cache slot with the uncompressed pages of the extent */
+            if (have_read_error) {
+                memset(xt_cache_elem->pages, 0, sizeof(xt_cache_elem->pages));
+            } else if (RRD_NO_COMPRESSION == header->compression_algorithm) {
+                (void)memcpy(xt_cache_elem->pages, xt_io_descr->buf + payload_offset, payload_length);
+            } else {
+                (void)memcpy(xt_cache_elem->pages, uncompressed_buf, uncompressed_payload_length);
+            }
+            /* complete all connected in-flight read requests */
+            for (curr = xt_cache_elem->inflight_io_descr->next ; curr ; curr = next) {
+                next = curr->next; /* read_cached_extent_cb() frees curr */
+                read_cached_extent_cb(wc, xt_idx, curr);
+            }
+            xt_cache_elem->inflight_io_descr = NULL;
+            modify_bit(&xt_cache->inflight_bitmap, xt_idx, 0); /* not in-flight anymore */
+        }
+    }
+
+    for (i = 0 ; i < xt_io_descr->descr_count; ++i) {
+        page = mallocz(RRDENG_BLOCK_SIZE);
+        descr = xt_io_descr->descr_array[i];
+        page_offset = 0;
+        if (!have_read_error) {
+            /* the page offset is the sum of the lengths of the pages stored before it */
+            for (j = 0 ; j < count ; ++j) {
+                /* care, we don't hold the descriptor mutex */
+                if (!uuid_compare(*(uuid_t *) header->descr[j].uuid, *descr->id) &&
+                    header->descr[j].page_length == descr->page_length &&
+                    header->descr[j].start_time == descr->start_time &&
+                    header->descr[j].end_time == descr->end_time) {
+                    break;
+                }
+                page_offset += header->descr[j].page_length;
+            }
+        }
+        /* care, we don't hold the descriptor mutex */
+        if (have_read_error) {
+            /* Applications should make sure NULL values match 0 as does SN_EMPTY_SLOT */
+            memset(page, 0, descr->page_length);
+        } else if (RRD_NO_COMPRESSION == header->compression_algorithm) {
+            (void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length);
+        } else {
+            (void) memcpy(page, uncompressed_buf + page_offset, descr->page_length);
+        }
+        /* publish the page and update the flags under the descriptor mutex */
+        rrdeng_page_descr_mutex_lock(ctx, descr);
+        pg_cache_descr = descr->pg_cache_descr;
+        pg_cache_descr->page = page;
+        pg_cache_descr->flags |= RRD_PAGE_POPULATED;
+        pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
+        rrdeng_page_descr_mutex_unlock(ctx, descr);
+        pg_cache_replaceQ_insert(ctx, descr);
+        if (xt_io_descr->release_descr) {
+            pg_cache_put(ctx, descr);
+        } else {
+            debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
+            pg_cache_wake_up_waiters(ctx, descr);
+        }
+    }
+    /* allocated only when decompression was attempted, also when it failed */
+    if (NULL != uncompressed_buf) {
+        freez(uncompressed_buf);
+    }
+    if (xt_io_descr->completion)
+        complete(xt_io_descr->completion);
+    uv_fs_req_cleanup(req);
+    free(xt_io_descr->buf); /* allocated with posix_memalign() */
+    freez(xt_io_descr);
+}
+
+
+/*
+ * Starts reading the extent that contains the given pages.
+ *
+ * descr         - array of count page descriptors; the extent is taken from descr[0]
+ * count         - number of entries in descr
+ * release_descr - when non-zero the page reference is released after the page is populated,
+ *                 otherwise the waiters of the page are woken up
+ *
+ * The pages are flagged RRD_PAGE_READ_PENDING. If the extent is already in the extent cache the
+ * request is served from it, or queued behind the read that is still in flight. Otherwise an
+ * asynchronous read is submitted and completed by read_extent_cb().
+ */
+static void do_read_extent(struct rrdengine_worker_config* wc,
+                           struct rrdeng_page_descr **descr,
+                           unsigned count,
+                           uint8_t release_descr)
+{
+    struct rrdengine_instance *ctx = wc->ctx;
+    struct page_cache_descr *pg_cache_descr;
+    int ret;
+    unsigned i, size_bytes, pos, real_io_size;
+    struct extent_io_descriptor *xt_io_descr;
+    struct rrdengine_datafile *datafile;
+    struct extent_info *extent = descr[0]->extent;
+    uint8_t xt_is_cached = 0, xt_is_inflight = 0;
+    unsigned xt_idx;
+
+    datafile = extent->datafile;
+    pos = extent->offset;
+    size_bytes = extent->size;
+
+    xt_io_descr = callocz(1, sizeof(*xt_io_descr));
+    for (i = 0 ; i < count; ++i) {
+        rrdeng_page_descr_mutex_lock(ctx, descr[i]);
+        pg_cache_descr = descr[i]->pg_cache_descr;
+        pg_cache_descr->flags |= RRD_PAGE_READ_PENDING;
+        rrdeng_page_descr_mutex_unlock(ctx, descr[i]);
+
+        xt_io_descr->descr_array[i] = descr[i];
+    }
+    xt_io_descr->descr_count = count;
+    xt_io_descr->bytes = size_bytes;
+    xt_io_descr->pos = pos;
+    xt_io_descr->req.data = xt_io_descr;
+    xt_io_descr->completion = NULL;
+    /* xt_io_descr->descr_commit_idx_array[0] */
+    xt_io_descr->release_descr = release_descr;
+
+    xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx);
+    if (xt_is_cached) {
+        xt_cache_replaceQ_set_hot(wc, &wc->xt_cache.extent_array[xt_idx]);
+        xt_is_inflight = check_bit(wc->xt_cache.inflight_bitmap, xt_idx);
+        if (xt_is_inflight) {
+            /* the extent is being read right now, piggyback on that read */
+            enqueue_inflight_read_to_xt_cache(wc, xt_idx, xt_io_descr);
+            return;
+        }
+        /* the extent data are already in the cache, no disk I/O is needed */
+        read_cached_extent_cb(wc, xt_idx, xt_io_descr);
+        return;
+    } else {
+        ret = try_insert_into_xt_cache(wc, extent);
+        if (-1 != ret) {
+            /* this request owns the cache slot until read_extent_cb() fills it */
+            xt_idx = (unsigned)ret;
+            modify_bit(&wc->xt_cache.inflight_bitmap, xt_idx, 1);
+            wc->xt_cache.extent_array[xt_idx].inflight_io_descr = xt_io_descr;
+        }
+    }
+
+    real_io_size = ALIGN_BYTES_CEILING(size_bytes);
+    ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, real_io_size);
+    if (unlikely(ret)) {
+        fatal("posix_memalign:%s", strerror(ret));
+    }
+    xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
+    ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb);
+    /* NOTE(review): libuv reports failures as negative error codes, only -1 is caught here - confirm intent */
+    fatal_assert(-1 != ret);
+    ctx->stats.io_read_bytes += real_io_size;
+    ++ctx->stats.io_read_requests;
+    ctx->stats.io_read_extent_bytes += real_io_size;
+    ++ctx->stats.io_read_extents;
+    ctx->stats.pg_cache_backfills += count;
+}
+
+/*
+ * Builds the journal STORE_DATA transaction record of the extent described by xt_io_descr
+ * inside a buffer obtained from wal_get_transaction_buffer().
+ *
+ * Record layout: transaction header, rrdeng_jf_store_data followed by a copy of the page
+ * descriptors of the extent, and a trailer holding the CRC32 of everything before it.
+ */
+static void commit_data_extent(struct rrdengine_worker_config* wc, struct extent_io_descriptor *xt_io_descr)
+{
+    struct rrdengine_instance *ctx = wc->ctx;
+    /* persistent structures */
+    struct rrdeng_df_extent_header *df_header = xt_io_descr->buf;
+    struct rrdeng_jf_transaction_header *jf_header;
+    struct rrdeng_jf_store_data *jf_metric_data;
+    struct rrdeng_jf_transaction_trailer *jf_trailer;
+    unsigned nr_pages = df_header->number_of_pages;
+    unsigned descr_bytes = sizeof(*jf_metric_data->descr) * nr_pages;
+    unsigned payload_bytes = sizeof(*jf_metric_data) + descr_bytes;
+    unsigned checksummed_bytes = sizeof(*jf_header) + payload_bytes; /* header + payload */
+    unsigned record_bytes = checksummed_bytes + sizeof(*jf_trailer);
+    void *buf;
+    uLong crc;
+
+    buf = wal_get_transaction_buffer(wc, record_bytes);
+
+    /* transaction header */
+    jf_header = buf;
+    jf_header->type = STORE_DATA;
+    jf_header->reserved = 0;
+    jf_header->id = ctx->commit_log.transaction_id++;
+    jf_header->payload_length = payload_bytes;
+
+    /* payload: extent location and page descriptors */
+    jf_metric_data = buf + sizeof(*jf_header);
+    jf_metric_data->extent_offset = xt_io_descr->pos;
+    jf_metric_data->extent_size = xt_io_descr->bytes;
+    jf_metric_data->number_of_pages = nr_pages;
+    memcpy(jf_metric_data->descr, df_header->descr, descr_bytes);
+
+    /* trailer: checksum of header and payload */
+    jf_trailer = buf + checksummed_bytes;
+    crc = crc32(0L, Z_NULL, 0);
+    crc = crc32(crc, buf, checksummed_bytes);
+    crc32set(jf_trailer->checksum, crc);
+}
+
+/*
+ * Dispatches a journal transaction by record type. Only STORE_DATA is supported, in which
+ * case data must point to the extent_io_descriptor of the extent; any other type trips
+ * the assertion.
+ */
+static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t type, void *data)
+{
+    if (STORE_DATA == type) {
+        commit_data_extent(wc, (struct extent_io_descriptor *)data);
+        return;
+    }
+    /* unsupported record type: the assertion is known to fail at this point */
+    fatal_assert(type == STORE_DATA);
+}
+
+/*
+ * Joins the thread that was invalidating dirty pages, frees its handle and clears the
+ * state that marks an invalidation as running or waiting for cleanup.
+ */
+static void after_invalidate_oldest_committed(struct rrdengine_worker_config* wc)
+{
+    int rc = uv_thread_join(wc->now_invalidating_dirty_pages);
+
+    if (rc) {
+        error("uv_thread_join(): %s", uv_strerror(rc));
+    }
+    freez(wc->now_invalidating_dirty_pages);
+    /* a NULL handle allows rrdeng_invalidate_oldest_committed() to start a new thread */
+    wc->now_invalidating_dirty_pages = NULL;
+    wc->cleanup_thread_invalidating_dirty_pages = 0;
+}
+
+/*
+ * Thread entry point started by rrdeng_invalidate_oldest_committed(); arg is the rrdengine_instance.
+ *
+ * Repeatedly removes one page from the committed (dirty) page index and punches it out of the
+ * page cache, until the number of committed pages drops below pg_cache_committed_hard_limit()
+ * or no page can be taken. Before returning it raises cleanup_thread_invalidating_dirty_pages
+ * and signals the event loop through the async handle.
+ */
+static void invalidate_oldest_committed(void *arg)
+{
+ struct rrdengine_instance *ctx = arg;
+ struct rrdengine_worker_config *wc = &ctx->worker_config;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ int ret;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+ Pvoid_t *PValue;
+ Word_t Index;
+ unsigned nr_committed_pages;
+
+ do {
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ /* scan the index from its first entry for a page we are able to take */
+ for (Index = 0,
+ PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue;
+
+ descr != NULL;
+
+ PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue) {
+ fatal_assert(0 != descr->page_length);
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ /* skip pages that are being written; the page is taken only if pg_cache_try_get_unsafe() succeeds */
+ if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING) && pg_cache_try_get_unsafe(descr, 1)) {
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0);
+ fatal_assert(1 == ret);
+ break;
+ }
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ }
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+
+ /* descr is NULL when the scan reached the end of the index without taking a page */
+ if (!descr) {
+ info("Failed to invalidate any dirty pages to relieve page cache pressure.");
+
+ goto out;
+ }
+ pg_cache_punch_hole(ctx, descr, 1, 1, NULL);
+
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ nr_committed_pages = --pg_cache->committed_page_index.nr_committed_pages;
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+ rrd_stat_atomic_add(&ctx->stats.flushing_pressure_page_deletions, 1);
+ rrd_stat_atomic_add(&global_flushing_pressure_page_deletions, 1);
+
+ } while (nr_committed_pages >= pg_cache_committed_hard_limit(ctx));
+out:
+ /* request cleanup of this thread (see after_invalidate_oldest_committed()) */
+ wc->cleanup_thread_invalidating_dirty_pages = 1;
+ /* wake up event loop */
+ fatal_assert(0 == uv_async_send(&wc->async));
+}
+
+/*
+ * Starts a thread that deletes dirty pages from memory when the number of committed pages
+ * has reached pg_cache_committed_hard_limit(). Does nothing while the instance is quiescing,
+ * while the limit has not been reached, or while such a thread is already running.
+ */
+void rrdeng_invalidate_oldest_committed(struct rrdengine_worker_config* wc)
+{
+    struct rrdengine_instance *ctx = wc->ctx;
+    struct page_cache *pg_cache = &ctx->pg_cache;
+    unsigned nr_committed_pages;
+    int rc;
+
+    if (unlikely(ctx->quiesce != NO_QUIESCE)) { /* Shutting down */
+        return;
+    }
+
+    uv_rwlock_rdlock(&pg_cache->committed_page_index.lock);
+    nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
+    uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock);
+
+    if (nr_committed_pages < pg_cache_committed_hard_limit(ctx)) {
+        return; /* no pressure */
+    }
+    if (wc->now_invalidating_dirty_pages) {
+        return; /* already deleting a page */
+    }
+
+    /* delete the oldest page in memory */
+    errno = 0;
+    error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". "
+          "Metric data are being deleted, please reduce disk load or use a faster disk.", ctx->dbfiles_path);
+
+    wc->now_invalidating_dirty_pages = mallocz(sizeof(*wc->now_invalidating_dirty_pages));
+    wc->cleanup_thread_invalidating_dirty_pages = 0;
+
+    rc = uv_thread_create(wc->now_invalidating_dirty_pages, invalidate_oldest_committed, ctx);
+    if (rc) {
+        error("uv_thread_create(): %s", uv_strerror(rc));
+        /* no thread is running: drop the handle so that a later call can retry */
+        freez(wc->now_invalidating_dirty_pages);
+        wc->now_invalidating_dirty_pages = NULL;
+    }
+}
+
+/*
+ * libuv completion callback of the extent write that was submitted by do_flush_pages().
+ *
+ * Counts and logs a write error, inserts every page of the extent in the page cache replacement
+ * queue, clears its RRD_PAGE_DIRTY and RRD_PAGE_WRITE_PENDING flags and wakes up its waiters.
+ * It then frees the I/O descriptor and its buffer and subtracts the pages from the committed
+ * and in-flight dirty page counters.
+ */
+void flush_pages_cb(uv_fs_t* req)
+{
+ struct rrdengine_worker_config* wc = req->loop->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct extent_io_descriptor *xt_io_descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+ unsigned i, count;
+
+ xt_io_descr = req->data;
+ /* a failed write is only counted and logged; the pages are still marked clean below */
+ if (req->result < 0) {
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
+ error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
+ }
+#ifdef NETDATA_INTERNAL_CHECKS
+ {
+ struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+ debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.",
+ __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
+ }
+#endif
+ count = xt_io_descr->descr_count;
+ for (i = 0 ; i < count ; ++i) {
+ /* care, we don't hold the descriptor mutex */
+ descr = xt_io_descr->descr_array[i];
+
+ pg_cache_replaceQ_insert(ctx, descr);
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ pg_cache_descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING);
+ /* wake up waiters, care no reference being held */
+ pg_cache_wake_up_waiters_unsafe(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ }
+ if (xt_io_descr->completion)
+ complete(xt_io_descr->completion);
+ uv_fs_req_cleanup(req);
+ free(xt_io_descr->buf);
+ freez(xt_io_descr);
+
+ /* count was saved above because xt_io_descr has just been freed */
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ pg_cache->committed_page_index.nr_committed_pages -= count;
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+ wc->inflight_dirty_pages -= count;
+}
+
+/*
+ * completion must be NULL or valid.
+ * Returns 0 when no flushing can take place.
+ * Returns datafile bytes to be written on successful flushing initiation.
+ */
+/*
+ * Collects up to MAX_PAGES_PER_EXTENT pages from the committed page index, packs them into one
+ * extent (header, page descriptors, optionally LZ4-compressed payload, CRC32 trailer), submits
+ * the asynchronous write to the last data file and records the extent in the journal.
+ * force only selects a debug message. completion, when not NULL, is completed immediately if
+ * nothing can be flushed, otherwise it is stored in the I/O descriptor of the write.
+ */
+static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct completion *completion)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ int ret;
+ int compressed_size, max_compressed_size = 0;
+ unsigned i, count, size_bytes, pos, real_io_size;
+ uint32_t uncompressed_payload_length, payload_offset;
+ struct rrdeng_page_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT];
+ struct page_cache_descr *pg_cache_descr;
+ struct extent_io_descriptor *xt_io_descr;
+ void *compressed_buf = NULL;
+ Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
+ Pvoid_t *PValue;
+ Word_t Index;
+ uint8_t compression_algorithm = ctx->global_compress_alg;
+ struct extent_info *extent;
+ struct rrdengine_datafile *datafile;
+ /* persistent structures */
+ struct rrdeng_df_extent_header *header;
+ struct rrdeng_df_extent_trailer *trailer;
+ uLong crc;
+
+ if (force) {
+ debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure.");
+ }
+ /* step 1: pick the pages that are not already being written and remove them from the index */
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ for (Index = 0, count = 0, uncompressed_payload_length = 0,
+ PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue ;
+
+ descr != NULL && count != MAX_PAGES_PER_EXTENT ;
+
+ PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue) {
+ uint8_t page_write_pending;
+
+ fatal_assert(0 != descr->page_length);
+ page_write_pending = 0;
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING)) {
+ page_write_pending = 1;
+ /* care, no reference being held */
+ pg_cache_descr->flags |= RRD_PAGE_WRITE_PENDING;
+ uncompressed_payload_length += descr->page_length;
+ descr_commit_idx_array[count] = Index;
+ eligible_pages[count++] = descr;
+ }
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ if (page_write_pending) {
+ ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0);
+ fatal_assert(1 == ret);
+ }
+ }
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+
+ if (!count) {
+ debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__);
+ if (completion)
+ complete(completion);
+ return 0;
+ }
+ wc->inflight_dirty_pages += count;
+
+ /* step 2: allocate the I/O descriptor and an aligned buffer large enough for the worst case */
+ /* NOTE(review): mallocz() is used, so fields not assigned below are presumably left uninitialized */
+ xt_io_descr = mallocz(sizeof(*xt_io_descr));
+ payload_offset = sizeof(*header) + count * sizeof(header->descr[0]);
+ switch (compression_algorithm) {
+ case RRD_NO_COMPRESSION:
+ size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer);
+ break;
+ default: /* Compress */
+ fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE);
+ max_compressed_size = LZ4_compressBound(uncompressed_payload_length);
+ compressed_buf = mallocz(max_compressed_size);
+ size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer);
+ break;
+ }
+ ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ /* freez(xt_io_descr);*/
+ }
+ (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count);
+ xt_io_descr->descr_count = count;
+
+ /* step 3: fill in the extent header and the in-memory extent_info */
+ pos = 0;
+ header = xt_io_descr->buf;
+ header->compression_algorithm = compression_algorithm;
+ header->number_of_pages = count;
+ pos += sizeof(*header);
+
+ extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0]));
+ datafile = ctx->datafiles.last; /* TODO: check for exceeded size quota */
+ extent->offset = datafile->pos;
+ extent->number_of_pages = count;
+ extent->datafile = datafile;
+ extent->next = NULL;
+
+ for (i = 0 ; i < count ; ++i) {
+ /* This is here for performance reasons */
+ xt_io_descr->descr_commit_idx_array[i] = descr_commit_idx_array[i];
+
+ descr = xt_io_descr->descr_array[i];
+ header->descr[i].type = PAGE_METRICS;
+ uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id);
+ header->descr[i].page_length = descr->page_length;
+ header->descr[i].start_time = descr->start_time;
+ header->descr[i].end_time = descr->end_time;
+ pos += sizeof(header->descr[i]);
+ }
+ /* step 4: copy the page data right after the descriptors and attach the pages to the extent */
+ for (i = 0 ; i < count ; ++i) {
+ descr = xt_io_descr->descr_array[i];
+ /* care, we don't hold the descriptor mutex */
+ (void) memcpy(xt_io_descr->buf + pos, descr->pg_cache_descr->page, descr->page_length);
+ descr->extent = extent;
+ extent->pages[i] = descr;
+
+ pos += descr->page_length;
+ }
+ df_extent_insert(extent);
+
+ /* step 5: compress the payload in place when compression is enabled; size_bytes becomes the final size */
+ switch (compression_algorithm) {
+ case RRD_NO_COMPRESSION:
+ header->payload_length = uncompressed_payload_length;
+ break;
+ default: /* Compress */
+ /* NOTE(review): the return value of LZ4_compress_default() is not checked for failure (0) - confirm */
+ compressed_size = LZ4_compress_default(xt_io_descr->buf + payload_offset, compressed_buf,
+ uncompressed_payload_length, max_compressed_size);
+ ctx->stats.before_compress_bytes += uncompressed_payload_length;
+ ctx->stats.after_compress_bytes += compressed_size;
+ debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size);
+ (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size);
+ freez(compressed_buf);
+ size_bytes = payload_offset + compressed_size + sizeof(*trailer);
+ header->payload_length = compressed_size;
+ break;
+ }
+ extent->size = size_bytes;
+ xt_io_descr->bytes = size_bytes;
+ xt_io_descr->pos = datafile->pos;
+ xt_io_descr->req.data = xt_io_descr;
+ xt_io_descr->completion = completion;
+
+ /* step 6: the trailer holds the CRC32 of all the extent bytes that precede it */
+ trailer = xt_io_descr->buf + size_bytes - sizeof(*trailer);
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, xt_io_descr->buf, size_bytes - sizeof(*trailer));
+ crc32set(trailer->checksum, crc);
+
+ /* step 7: submit the write (completed by flush_pages_cb()), journal the extent and advance the file position */
+ real_io_size = ALIGN_BYTES_CEILING(size_bytes);
+ xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
+ ret = uv_fs_write(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, datafile->pos, flush_pages_cb);
+ fatal_assert(-1 != ret);
+ ctx->stats.io_write_bytes += real_io_size;
+ ++ctx->stats.io_write_requests;
+ ctx->stats.io_write_extent_bytes += real_io_size;
+ ++ctx->stats.io_write_extents;
+ do_commit_transaction(wc, STORE_DATA, xt_io_descr);
+ datafile->pos += ALIGN_BYTES_CEILING(size_bytes);
+ ctx->disk_space += ALIGN_BYTES_CEILING(size_bytes);
+ rrdeng_test_quota(wc);
+
+ return ALIGN_BYTES_CEILING(size_bytes);
+}
+
+/*
+ * Removes the first data file of the instance together with its journal file, updates the
+ * disk space accounting, joins the file deletion thread and stops the event loop.
+ */
+static void after_delete_old_data(struct rrdengine_worker_config* wc)
+{
+    struct rrdengine_instance *ctx = wc->ctx;
+    struct rrdengine_datafile *datafile = ctx->datafiles.first;
+    struct rrdengine_journalfile *journalfile = datafile->journalfile;
+    /* sizes are sampled before the files are destroyed */
+    unsigned datafile_bytes = datafile->pos;
+    unsigned journalfile_bytes = journalfile->pos;
+    unsigned deleted_bytes = 0;
+    char path[RRDENG_PATH_MAX];
+    int rc;
+
+    info("Deleting data and journal file pair.");
+    datafile_list_delete(ctx, datafile);
+
+    rc = destroy_journal_file(journalfile, datafile);
+    if (!rc) {
+        generate_journalfilepath(datafile, path, sizeof(path));
+        info("Deleted journal file \"%s\".", path);
+        deleted_bytes += journalfile_bytes;
+    }
+
+    rc = destroy_data_file(datafile);
+    if (!rc) {
+        generate_datafilepath(datafile, path, sizeof(path));
+        info("Deleted data file \"%s\".", path);
+        deleted_bytes += datafile_bytes;
+    }
+    freez(journalfile);
+    freez(datafile);
+
+    /* only the files that were actually destroyed are accounted as reclaimed */
+    ctx->disk_space -= deleted_bytes;
+    info("Reclaimed %u bytes of disk space.", deleted_bytes);
+
+    rc = uv_thread_join(wc->now_deleting_files);
+    if (rc) {
+        error("uv_thread_join(): %s", uv_strerror(rc));
+    }
+    freez(wc->now_deleting_files);
+    /* unfreeze command processing */
+    wc->now_deleting_files = NULL;
+
+    wc->cleanup_thread_deleting_files = 0;
+
+    /* interrupt event loop */
+    uv_stop(wc->loop);
+}
+
+/*
+ * Thread entry point (started by rrdeng_test_quota). Walks every extent of the oldest datafile,
+ * punches a hole in the page cache for each of its pages and frees the extent. When finished it
+ * raises cleanup_thread_deleting_files and wakes up the event loop.
+ */
+static void delete_old_data(void *arg)
+{
+    struct rrdengine_instance *ctx = arg;
+    struct rrdengine_worker_config* wc = &ctx->worker_config;
+    struct rrdengine_datafile *oldest_df;
+    struct extent_info *cur_extent;
+    uuid_t metric_id;
+
+    /* Safe to use since it will be deleted after we are done */
+    oldest_df = ctx->datafiles.first;
+
+    cur_extent = oldest_df->extents.first;
+    while (cur_extent != NULL) {
+        struct extent_info *following;
+        unsigned nr_pages = cur_extent->number_of_pages;
+        unsigned idx;
+
+        for (idx = 0 ; idx < nr_pages ; ++idx) {
+            uint8_t metric_is_deletable;
+
+            metric_is_deletable = pg_cache_punch_hole(ctx, cur_extent->pages[idx], 0, 0, &metric_id);
+            if (unlikely(metric_is_deletable && ctx->metalog_ctx->initialized)) {
+                /*
+                 * If the metric is empty, has no active writers and if the metadata log has been initialized then
+                 * attempt to delete the corresponding netdata dimension.
+                 */
+                metalog_delete_dimension_by_uuid(ctx->metalog_ctx, &metric_id);
+            }
+        }
+        /* fetch the successor before the extent is released */
+        following = cur_extent->next;
+        freez(cur_extent);
+        cur_extent = following;
+    }
+    wc->cleanup_thread_deleting_files = 1;
+    /* wake up event loop */
+    fatal_assert(0 == uv_async_send(&wc->async));
+}
+
+/*
+ * Checks the disk usage of the instance against its quota.
+ *  - Rotates to a new data/journal file pair when the newest datafile reached its target size,
+ *    or when the instance is over quota and only one datafile exists.
+ *  - When over quota and the instance is not quiescing, spawns a thread running delete_old_data()
+ *    for the oldest datafile, unless such a thread is already active or no other pair is left.
+ */
+void rrdeng_test_quota(struct rrdengine_worker_config* wc)
+{
+    struct rrdengine_instance *ctx = wc->ctx;
+    struct rrdengine_datafile *newest_df;
+    unsigned newest_size, rotate_size;
+    uint8_t over_quota, single_datafile;
+    int rc, thread_rc;
+
+    /* Do not allow the pinned pages to exceed the disk space quota to avoid deadlocks */
+    if (unlikely(ctx->disk_space > MAX(ctx->max_disk_space, 2 * ctx->metric_API_max_producers * RRDENG_BLOCK_SIZE))) {
+        over_quota = 1;
+    } else {
+        over_quota = 0;
+    }
+
+    newest_df = ctx->datafiles.last;
+    newest_size = newest_df->pos;
+    /* per-file target size, clamped to [MIN_DATAFILE_SIZE, MAX_DATAFILE_SIZE] */
+    rotate_size = ctx->max_disk_space / TARGET_DATAFILES;
+    rotate_size = MIN(rotate_size, MAX_DATAFILE_SIZE);
+    rotate_size = MAX(rotate_size, MIN_DATAFILE_SIZE);
+    single_datafile = (newest_df == ctx->datafiles.first) ? 1 : 0;
+
+    if (unlikely(newest_size >= rotate_size || (over_quota && single_datafile))) {
+        /* Finalize data and journal file and create a new pair */
+        wal_flush_transaction_buffer(wc);
+        rc = create_new_datafile_pair(ctx, 1, ctx->last_fileno + 1);
+        if (likely(!rc)) {
+            ++ctx->last_fileno;
+        }
+    }
+
+    /* nothing to delete unless we are over quota and operating normally */
+    if (likely(!over_quota || NO_QUIESCE != ctx->quiesce)) {
+        return;
+    }
+    if (wc->now_deleting_files) {
+        /* already deleting data */
+        return;
+    }
+    if (NULL == ctx->datafiles.first->next) {
+        error("Cannot delete data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\""
+              " to reclaim space, there are no other file pairs left.",
+              ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
+        return;
+    }
+
+    info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
+         ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
+    wc->now_deleting_files = mallocz(sizeof(*wc->now_deleting_files));
+    wc->cleanup_thread_deleting_files = 0;
+
+    thread_rc = uv_thread_create(wc->now_deleting_files, delete_old_data, ctx);
+    if (thread_rc) {
+        error("uv_thread_create(): %s", uv_strerror(thread_rc));
+        freez(wc->now_deleting_files);
+        wc->now_deleting_files = NULL;
+    }
+}
+
+/* Returns 1 while a dirty-page invalidation thread or a file deletion thread is registered, 0 otherwise. */
+static inline int rrdeng_threads_alive(struct rrdengine_worker_config* wc)
+{
+    return (wc->now_invalidating_dirty_pages || wc->now_deleting_files) ? 1 : 0;
+}
+
+/*
+ * Runs the completion handlers of helper threads that have raised their cleanup flag.
+ * If a quiesce was requested and no helper thread remains, marks the instance QUIESCED
+ * and signals rrdengine_completion.
+ */
+static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc)
+{
+    struct rrdengine_instance *ctx = wc->ctx;
+
+    if (unlikely(wc->cleanup_thread_invalidating_dirty_pages))
+        after_invalidate_oldest_committed(wc);
+
+    if (unlikely(wc->cleanup_thread_deleting_files))
+        after_delete_old_data(wc);
+
+    if (likely(SET_QUIESCE != ctx->quiesce))
+        return;
+    if (rrdeng_threads_alive(wc))
+        return;
+
+    ctx->quiesce = QUIESCED;
+    complete(&ctx->rrdengine_completion);
+}
+
+/* Initializes the data files of the instance via init_data_files(); returns 0 on success. */
+int init_rrd_files(struct rrdengine_instance *ctx)
+{
+    int rc = init_data_files(ctx);
+
+    return rc;
+}
+
+/* Finalizes the data files of the instance by delegating to finalize_data_files(). */
+void finalize_rrd_files(struct rrdengine_instance *ctx)
+{
+    /* plain call: ISO C does not allow `return <expression>;` in a function returning void */
+    finalize_data_files(ctx);
+}
+
+/* Resets the command FIFO to empty and initializes its condition variable and mutex. */
+void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc)
+{
+    struct rrdeng_cmdqueue *queue = &wc->cmd_queue;
+
+    queue->tail = 0;
+    queue->head = 0;
+    wc->queue_size = 0;
+    fatal_assert(0 == uv_cond_init(&wc->cmd_cond));
+    fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
+}
+
+/*
+ * Appends a copy of *cmd to the worker's command FIFO, blocking on cmd_cond while the queue
+ * holds RRDENG_CMD_Q_MAX_SIZE entries, then wakes up the event loop.
+ */
+void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd)
+{
+    unsigned pending, slot;
+
+    uv_mutex_lock(&wc->cmd_mutex);
+    /* wait for free space in queue */
+    for (;;) {
+        pending = wc->queue_size;
+        if (pending != RRDENG_CMD_Q_MAX_SIZE)
+            break;
+        uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
+    }
+    fatal_assert(pending < RRDENG_CMD_Q_MAX_SIZE);
+
+    /* store the command at the tail and advance the tail, wrapping at the end of the array */
+    slot = wc->cmd_queue.tail;
+    wc->cmd_queue.cmd_array[slot] = *cmd;
+    if (slot == RRDENG_CMD_Q_MAX_SIZE - 1)
+        wc->cmd_queue.tail = 0;
+    else
+        wc->cmd_queue.tail = slot + 1;
+    wc->queue_size = pending + 1;
+    uv_mutex_unlock(&wc->cmd_mutex);
+
+    /* wake up event loop */
+    fatal_assert(0 == uv_async_send(&wc->async));
+}
+
+/*
+ * Removes and returns the command at the head of the worker's FIFO.
+ * When the queue is empty only the opcode of the returned command is set, to RRDENG_NOOP.
+ * A successful dequeue signals cmd_cond to wake up a producer waiting for free space.
+ */
+struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc)
+{
+    struct rrdeng_cmd ret;
+    unsigned pending;
+
+    uv_mutex_lock(&wc->cmd_mutex);
+    pending = wc->queue_size;
+    if (pending != 0) {
+        unsigned slot = wc->cmd_queue.head;
+
+        /* dequeue command */
+        ret = wc->cmd_queue.cmd_array[slot];
+        if (pending == 1) {
+            /* queue becomes empty: rewind both indices */
+            wc->cmd_queue.head = wc->cmd_queue.tail = 0;
+        } else if (slot == RRDENG_CMD_Q_MAX_SIZE - 1) {
+            wc->cmd_queue.head = 0;
+        } else {
+            wc->cmd_queue.head = slot + 1;
+        }
+        wc->queue_size = pending - 1;
+
+        /* wake up producers */
+        uv_cond_signal(&wc->cmd_cond);
+    } else {
+        ret.opcode = RRDENG_NOOP;
+    }
+    uv_mutex_unlock(&wc->cmd_mutex);
+
+    return ret;
+}
+
+/* Async wake-up callback: stops the owning loop so that uv_run() returns, and refreshes the loop time. */
+void async_cb(uv_async_t *handle)
+{
+    uv_loop_t *owner_loop = handle->loop;
+
+    uv_stop(owner_loop);
+    uv_update_time(owner_loop);
+    debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
+}
+
+/* Flushes dirty pages when timer expires */
+#define TIMER_PERIOD_MS (1000)
+
+/*
+ * Periodic timer callback (started by rrdeng_worker with a period of TIMER_PERIOD_MS).
+ * It stops the event loop so that the worker can serve queued commands, tests the disk quota
+ * and, unless a deletion or invalidation thread is active, flushes committed pages to disk.
+ * handle->data is expected to point to the worker configuration.
+ */
+void timer_cb(uv_timer_t* handle)
+{
+ struct rrdengine_worker_config* wc = handle->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+
+ uv_stop(handle->loop);
+ uv_update_time(handle->loop);
+ if (unlikely(!ctx->metalog_ctx->initialized))
+ return; /* Wait for the metadata log to initialize */
+ rrdeng_test_quota(wc);
+ debug(D_RRDENGINE, "%s: timeout reached.", __func__);
+ if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) {
+ /* There is free space so we can write to disk and we are not actively deleting dirty buffers */
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark,
+ high_watermark;
+
+ /* sample the number of committed pages under the index read lock */
+ uv_rwlock_rdlock(&pg_cache->committed_page_index.lock);
+ nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
+ uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock);
+ producers = ctx->metric_API_max_producers;
+ /* are flushable pages more than 25% of the maximum page cache size */
+ high_watermark = (ctx->max_cache_pages * 25LLU) / 100;
+ low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */
+
+ /* Flush more pages only if disk can keep up */
+ if (wc->inflight_dirty_pages < high_watermark + producers) {
+ if (nr_committed_pages > producers &&
+ /* committed to be written pages are more than the produced number */
+ nr_committed_pages - producers > high_watermark) {
+ /* Flushing speed must increase to stop page cache from filling with dirty pages */
+ bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE;
+ }
+ /* always attempt to write at least DATAFILE_IDEAL_IO_SIZE bytes */
+ bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write);
+
+ debug(D_RRDENGINE, "Flushing pages to disk.");
+ /* keep flushing until a flush writes nothing or the byte budget has been reached */
+ for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL);
+ bytes_written && (total_bytes < bytes_to_write);
+ total_bytes += bytes_written) {
+ bytes_written = do_flush_pages(wc, 0, NULL);
+ }
+ }
+ }
+#ifdef NETDATA_INTERNAL_CHECKS
+ {
+ /* dump the engine statistics to the debug log on every tick */
+ char buf[4096];
+ debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf)));
+ }
+#endif
+}
+
+/* Maximum number of commands served per event loop interruption */
+#define MAX_CMD_BATCH_SIZE (256)
+
+/*
+ * Entry point of the DB engine worker thread. arg is the struct rrdengine_worker_config of the instance.
+ *
+ * Sets up the command queue, the libuv loop, the async wake-up handle and the page flushing timer,
+ * then signals rrdengine_completion to the initializing thread (wc->error is 0 on success and
+ * UV_EAGAIN if the setup failed). Afterwards it alternates between running the event loop and
+ * serving batches of queued commands until RRDENG_SHUTDOWN has been received and no helper thread
+ * is alive. On the way out it flushes all committed pages and the transaction buffer.
+ */
+void rrdeng_worker(void* arg)
+{
+    struct rrdengine_worker_config* wc = arg;
+    struct rrdengine_instance *ctx = wc->ctx;
+    uv_loop_t* loop;
+    int shutdown, ret;
+    enum rrdeng_opcode opcode;
+    uv_timer_t timer_req;
+    struct rrdeng_cmd cmd;
+    unsigned cmd_batch_size;
+
+    rrdeng_init_cmd_queue(wc);
+
+    loop = wc->loop = mallocz(sizeof(uv_loop_t));
+    ret = uv_loop_init(loop);
+    if (ret) {
+        error("uv_loop_init(): %s", uv_strerror(ret));
+        goto error_after_loop_init;
+    }
+    loop->data = wc;
+
+    ret = uv_async_init(wc->loop, &wc->async, async_cb);
+    if (ret) {
+        error("uv_async_init(): %s", uv_strerror(ret));
+        goto error_after_async_init;
+    }
+    wc->async.data = wc;
+
+    wc->now_deleting_files = NULL;
+    wc->cleanup_thread_deleting_files = 0;
+
+    wc->now_invalidating_dirty_pages = NULL;
+    wc->cleanup_thread_invalidating_dirty_pages = 0;
+    wc->inflight_dirty_pages = 0;
+
+    /* dirty page flushing timer */
+    ret = uv_timer_init(loop, &timer_req);
+    if (ret) {
+        error("uv_timer_init(): %s", uv_strerror(ret));
+        goto error_after_timer_init;
+    }
+    timer_req.data = wc;
+
+    wc->error = 0;
+    /* wake up initialization thread */
+    complete(&ctx->rrdengine_completion);
+
+    fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
+    shutdown = 0;
+    while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) {
+        /* returns when async_cb or timer_cb stop the loop */
+        uv_run(loop, UV_RUN_DEFAULT);
+        rrdeng_cleanup_finished_threads(wc);
+
+        /* wait for commands */
+        cmd_batch_size = 0;
+        do {
+            /*
+             * Avoid starving the loop when there are too many commands coming in.
+             * timer_cb will interrupt the loop again to allow serving more commands.
+             */
+            if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE))
+                break;
+
+            cmd = rrdeng_deq_cmd(wc);
+            opcode = cmd.opcode;
+            ++cmd_batch_size;
+
+            switch (opcode) {
+            case RRDENG_NOOP:
+                /* the command queue was empty, do nothing */
+                break;
+            case RRDENG_SHUTDOWN:
+                shutdown = 1;
+                break;
+            case RRDENG_QUIESCE:
+                ctx->drop_metrics_under_page_cache_pressure = 0;
+                ctx->quiesce = SET_QUIESCE;
+                fatal_assert(0 == uv_timer_stop(&timer_req));
+                uv_close((uv_handle_t *)&timer_req, NULL);
+                while (do_flush_pages(wc, 1, NULL)) {
+                    ; /* Force flushing of all committed pages. */
+                }
+                wal_flush_transaction_buffer(wc);
+                if (!rrdeng_threads_alive(wc)) {
+                    ctx->quiesce = QUIESCED;
+                    complete(&ctx->rrdengine_completion);
+                }
+                break;
+            case RRDENG_READ_PAGE:
+                do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0);
+                break;
+            case RRDENG_READ_EXTENT:
+                do_read_extent(wc, cmd.read_extent.page_cache_descr, cmd.read_extent.page_count, 1);
+                break;
+            case RRDENG_COMMIT_PAGE:
+                do_commit_transaction(wc, STORE_DATA, NULL);
+                break;
+            case RRDENG_FLUSH_PAGES:
+                if (wc->now_invalidating_dirty_pages) {
+                    /* Do not flush if the disk cannot keep up */
+                    complete(cmd.completion);
+                } else {
+                    (void)do_flush_pages(wc, 1, cmd.completion);
+                }
+                break;
+            case RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE:
+                rrdeng_invalidate_oldest_committed(wc);
+                break;
+            default:
+                debug(D_RRDENGINE, "%s: default.", __func__);
+                break;
+            }
+        } while (opcode != RRDENG_NOOP);
+    }
+
+    /* cleanup operations of the event loop */
+    info("Shutting down RRD engine event loop.");
+
+    /*
+     * uv_async_send after uv_close does not seem to crash in linux at the moment,
+     * it is however undocumented behaviour and we need to be aware if this becomes
+     * an issue in the future.
+     */
+    uv_close((uv_handle_t *)&wc->async, NULL);
+
+    while (do_flush_pages(wc, 1, NULL)) {
+        ; /* Force flushing of all committed pages. */
+    }
+    wal_flush_transaction_buffer(wc);
+    /* let the loop finish the outstanding work, including the pending handle close */
+    uv_run(loop, UV_RUN_DEFAULT);
+
+    info("Shutting down RRD engine event loop complete.");
+    /* TODO: don't let the API block by waiting to enqueue commands */
+    uv_cond_destroy(&wc->cmd_cond);
+/*  uv_mutex_destroy(&wc->cmd_mutex); */
+    fatal_assert(0 == uv_loop_close(loop));
+    freez(loop);
+
+    return;
+
+error_after_timer_init:
+    uv_close((uv_handle_t *)&wc->async, NULL);
+    /*
+     * uv_close() only requests the close, it completes inside the loop. Run the loop once so that
+     * the handle is really closed, otherwise uv_loop_close() below fails with UV_EBUSY.
+     */
+    uv_run(loop, UV_RUN_DEFAULT);
+error_after_async_init:
+    fatal_assert(0 == uv_loop_close(loop));
+error_after_loop_init:
+    freez(loop);
+
+    wc->error = UV_EAGAIN;
+    /* wake up initialization thread */
+    complete(&ctx->rrdengine_completion);
+}
+
+/*
+ * Stand-alone entry point for development purposes; link with
+ * make "LDFLAGS=-errdengine_main"
+ * It initializes a DB engine instance under /tmp with the minimum page cache and disk space
+ * sizes, tears it down again and exits. A failing rrdeng_init() becomes the exit status.
+ */
+void rrdengine_main(void)
+{
+    struct rrdengine_instance *ctx;
+    int rc;
+
+    sanity_check();
+    rc = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB);
+    if (rc)
+        exit(rc);
+
+    rrdeng_exit(ctx);
+    fprintf(stderr, "Hello world!");
+    exit(0);
+}
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
new file mode 100644
index 0000000..87af04b
--- /dev/null
+++ b/database/engine/rrdengine.h
@@ -0,0 +1,233 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDENGINE_H
+#define NETDATA_RRDENGINE_H
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include <fcntl.h>
+#include <lz4.h>
+#include <Judy.h>
+#include <openssl/sha.h>
+#include <openssl/evp.h>
+#include "../../daemon/common.h"
+#include "../rrd.h"
+#include "rrddiskprotocol.h"
+#include "rrdenginelib.h"
+#include "datafile.h"
+#include "journalfile.h"
+#include "metadata_log/metadatalog.h"
+#include "rrdengineapi.h"
+#include "pagecache.h"
+#include "rrdenglocking.h"
+
+#ifdef NETDATA_RRD_INTERNALS
+
+#endif /* NETDATA_RRD_INTERNALS */
+
+/* Forward declarations */
+struct rrdengine_instance;
+
+#define MAX_PAGES_PER_EXTENT (64) /* TODO: can go higher only when journal supports bigger than 4KiB transactions */
+
+#define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u"
+#define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u"
+
+
+/*
+ * Initialization status values.
+ * NOTE(review): the users of this type are not visible in this part of the code; judging by the
+ * names the values form the sequence uninitialized -> initializing -> initialized. Confirm.
+ */
+typedef enum {
+ RRDENGINE_STATUS_UNINITIALIZED = 0,
+ RRDENGINE_STATUS_INITIALIZING,
+ RRDENGINE_STATUS_INITIALIZED
+} rrdengine_state_t;
+
+/* Opcodes of the commands that are queued to the worker thread and dispatched in rrdeng_worker() */
+enum rrdeng_opcode {
+ /* can be used to return empty status or flush the command queue */
+ RRDENG_NOOP = 0,
+
+ /* read one page; uses the read_page member of struct rrdeng_cmd */
+ RRDENG_READ_PAGE,
+ /* read several pages in one request; uses the read_extent member of struct rrdeng_cmd */
+ RRDENG_READ_EXTENT,
+ /* the worker calls do_commit_transaction() with STORE_DATA */
+ RRDENG_COMMIT_PAGE,
+ /* forced flush of committed pages; uses the completion member of struct rrdeng_cmd */
+ RRDENG_FLUSH_PAGES,
+ /* makes the worker leave its main loop once no helper thread is alive */
+ RRDENG_SHUTDOWN,
+ /* the worker calls rrdeng_invalidate_oldest_committed() */
+ RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE,
+ /* stops the flushing timer, flushes everything and moves the instance towards QUIESCED */
+ RRDENG_QUIESCE,
+
+ /* not a command; follows the last valid opcode */
+ RRDENG_MAX_OPCODE
+};
+
+/*
+ * A command for the worker thread. The opcode selects which member of the anonymous union
+ * is meaningful; commands are copied by value into and out of the command queue.
+ */
+struct rrdeng_cmd {
+ enum rrdeng_opcode opcode;
+ union {
+ /* RRDENG_READ_PAGE: the single page descriptor to read */
+ struct rrdeng_read_page {
+ struct rrdeng_page_descr *page_cache_descr;
+ } read_page;
+ /* RRDENG_READ_EXTENT: page_count descriptors, at most MAX_PAGES_PER_EXTENT */
+ struct rrdeng_read_extent {
+ struct rrdeng_page_descr *page_cache_descr[MAX_PAGES_PER_EXTENT];
+ int page_count;
+ } read_extent;
+ /* RRDENG_FLUSH_PAGES: completed by the worker or handed to do_flush_pages() */
+ struct completion *completion;
+ };
+};
+
+/* Capacity of the worker command queue; producers block in rrdeng_enq_cmd() while it is full */
+#define RRDENG_CMD_Q_MAX_SIZE (2048)
+
+/*
+ * Fixed-size circular buffer of commands.
+ * head is the index of the next command to dequeue, tail the index of the next free slot.
+ * Both wrap to 0 after RRDENG_CMD_Q_MAX_SIZE - 1. The number of queued commands is kept
+ * separately in rrdengine_worker_config.queue_size.
+ */
+struct rrdeng_cmdqueue {
+ unsigned head, tail;
+ struct rrdeng_cmd cmd_array[RRDENG_CMD_Q_MAX_SIZE];
+};
+
+/* State of one asynchronous extent I/O request */
+struct extent_io_descriptor {
+ uv_fs_t req; /* libuv file system request; the flush path sets req.data to this descriptor */
+ uv_buf_t iov; /* buffer descriptor passed to libuv, built over buf */
+ void *buf; /* the I/O buffer holding the extent */
+ uint64_t pos; /* file offset of the extent */
+ unsigned bytes; /* size of the extent in bytes */
+ struct completion *completion; /* caller supplied completion, may be NULL */
+ /* NOTE(review): presumably the number of valid entries in the two arrays below - confirm */
+ unsigned descr_count;
+ int release_descr;
+ struct rrdeng_page_descr *descr_array[MAX_PAGES_PER_EXTENT];
+ Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
+ struct extent_io_descriptor *next; /* multiple requests to be served by the same cached extent */
+};
+
+/*
+ * State of an asynchronous I/O request that is not tied to page descriptors.
+ * It has the same leading members as struct extent_io_descriptor.
+ */
+struct generic_io_descriptor {
+ uv_fs_t req;
+ uv_buf_t iov;
+ void *buf;
+ uint64_t pos;
+ unsigned bytes;
+ struct completion *completion;
+};
+
+/* One slot of the extent cache; the slots are linked into an LRU list through prev/next */
+struct extent_cache_element {
+ struct extent_info *extent; /* The ABA problem is avoided with the help of fileno below */
+ unsigned fileno;
+ struct extent_cache_element *prev; /* LRU */
+ struct extent_cache_element *next; /* LRU */
+ struct extent_io_descriptor *inflight_io_descr; /* I/O descriptor for in-flight extent */
+ /* storage for up to MAX_PAGES_PER_EXTENT pages of RRDENG_BLOCK_SIZE bytes each */
+ uint8_t pages[MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE];
+};
+
+#define MAX_CACHED_EXTENTS 16 /* cannot be over 32 to fit in 32-bit architectures */
+
+/*
+ * Cache of up to MAX_CACHED_EXTENTS extents.
+ * Each bitmap uses one bit per position of extent_array, which is why MAX_CACHED_EXTENTS
+ * is limited as noted above.
+ */
+/* Initialize by setting the structure to zero */
+struct extent_cache {
+ struct extent_cache_element extent_array[MAX_CACHED_EXTENTS];
+ unsigned allocation_bitmap; /* 1 if the corresponding position in the extent_array is allocated */
+ unsigned inflight_bitmap; /* 1 if the corresponding position in the extent_array is waiting for I/O */
+
+ struct extent_cache_element *replaceQ_head; /* LRU */
+ struct extent_cache_element *replaceQ_tail; /* MRU */
+};
+
+/* State of the worker (event loop) thread of a DB engine instance; see rrdeng_worker() */
+struct rrdengine_worker_config {
+ struct rrdengine_instance *ctx;
+
+ uv_thread_t thread;
+ uv_loop_t* loop; /* allocated and initialized by rrdeng_worker() */
+ uv_async_t async; /* used with uv_async_send() to wake up the event loop */
+
+ /* file deletion thread */
+ /* non-NULL while a deletion thread exists; also suppresses flushing in timer_cb() */
+ uv_thread_t *now_deleting_files;
+ unsigned long cleanup_thread_deleting_files; /* set to 0 when now_deleting_files is still running */
+
+ /* dirty page deletion thread */
+ uv_thread_t *now_invalidating_dirty_pages;
+ /* set to 0 when now_invalidating_dirty_pages is still running */
+ unsigned long cleanup_thread_invalidating_dirty_pages;
+ /* compared against the flushing watermark in timer_cb() to throttle flushing */
+ unsigned inflight_dirty_pages;
+
+ /* FIFO command queue */
+ uv_mutex_t cmd_mutex; /* protects cmd_queue and queue_size */
+ uv_cond_t cmd_cond; /* signalled on dequeue, producers wait on it while the queue is full */
+ volatile unsigned queue_size; /* number of commands currently queued */
+ struct rrdeng_cmdqueue cmd_queue;
+
+ struct extent_cache xt_cache;
+
+ /* 0 after successful worker start-up, UV_EAGAIN if rrdeng_worker() failed to initialize */
+ int error;
+};
+
+/*
+ * Debug statistics not used by code logic.
+ * They only describe operations since DB engine instance load time.
+ */
+struct rrdengine_statistics {
+ /* raised when a page gets its first point, lowered when a non-empty page is flushed by its collector */
+ rrdeng_stats_t metric_API_producers;
+ rrdeng_stats_t metric_API_consumers;
+ rrdeng_stats_t pg_cache_insertions;
+ rrdeng_stats_t pg_cache_deletions;
+ rrdeng_stats_t pg_cache_hits;
+ rrdeng_stats_t pg_cache_misses;
+ rrdeng_stats_t pg_cache_backfills;
+ rrdeng_stats_t pg_cache_evictions;
+ rrdeng_stats_t before_decompress_bytes;
+ rrdeng_stats_t after_decompress_bytes;
+ /* extent payload bytes before and after LZ4 compression on the flush path */
+ rrdeng_stats_t before_compress_bytes;
+ rrdeng_stats_t after_compress_bytes;
+ /* the write counters are increased by the aligned I/O size of every flushed extent */
+ rrdeng_stats_t io_write_bytes;
+ rrdeng_stats_t io_write_requests;
+ rrdeng_stats_t io_read_bytes;
+ rrdeng_stats_t io_read_requests;
+ rrdeng_stats_t io_write_extent_bytes;
+ rrdeng_stats_t io_write_extents;
+ rrdeng_stats_t io_read_extent_bytes;
+ rrdeng_stats_t io_read_extents;
+ rrdeng_stats_t datafile_creations;
+ rrdeng_stats_t datafile_deletions;
+ rrdeng_stats_t journalfile_creations;
+ rrdeng_stats_t journalfile_deletions;
+ rrdeng_stats_t page_cache_descriptors;
+ rrdeng_stats_t io_errors;
+ rrdeng_stats_t fs_errors;
+ rrdeng_stats_t pg_cache_over_half_dirty_events;
+ rrdeng_stats_t flushing_pressure_page_deletions;
+};
+
+/* I/O errors global counter */
+extern rrdeng_stats_t global_io_errors;
+/* File-System errors global counter */
+extern rrdeng_stats_t global_fs_errors;
+/* number of File-Descriptors that have been reserved by dbengine */
+extern rrdeng_stats_t rrdeng_reserved_file_descriptors;
+/* inability to flush global counters */
+extern rrdeng_stats_t global_pg_cache_over_half_dirty_events;
+extern rrdeng_stats_t global_flushing_pressure_page_deletions; /* number of deleted pages */
+
+#define NO_QUIESCE (0) /* initial state when all operations function normally */
+#define SET_QUIESCE (1) /* set it before shutting down the instance, quiesce long running operations */
+#define QUIESCED (2) /* is set after all threads have finished running */
+
+/* A DB engine instance: worker thread state, page cache, file lists and quota accounting */
+struct rrdengine_instance {
+ struct metalog_instance *metalog_ctx;
+ struct rrdengine_worker_config worker_config;
+ /* signalled by the worker after start-up and when the instance becomes QUIESCED */
+ struct completion rrdengine_completion;
+ struct page_cache pg_cache;
+ uint8_t drop_metrics_under_page_cache_pressure; /* boolean */
+ uint8_t global_compress_alg;
+ struct transaction_commit_log commit_log;
+ struct rrdengine_datafile_list datafiles;
+ RRDHOST *host; /* the legacy host, or NULL for multi-host DB */
+ char dbfiles_path[FILENAME_MAX + 1];
+ char machine_guid[GUID_LEN + 1]; /* the unique ID of the corresponding host, or localhost for multihost DB */
+ /* bytes in use: grows with every written extent, shrinks when a file pair is deleted */
+ uint64_t disk_space;
+ /* quota that rrdeng_test_quota() compares disk_space against */
+ uint64_t max_disk_space;
+ unsigned last_fileno; /* newest index of datafile and journalfile */
+ unsigned long max_cache_pages;
+ unsigned long cache_pages_low_watermark;
+ /* highest value of stats.metric_API_producers seen so far, raised with compare-and-swap */
+ unsigned long metric_API_max_producers;
+
+ uint8_t quiesce; /* set to SET_QUIESCE before shutdown of the engine */
+
+ struct rrdengine_statistics stats;
+};
+
+extern int init_rrd_files(struct rrdengine_instance *ctx);
+extern void finalize_rrd_files(struct rrdengine_instance *ctx);
+extern void rrdeng_test_quota(struct rrdengine_worker_config* wc);
+extern void rrdeng_worker(void* arg);
+extern void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd);
+extern struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc);
+
+#endif /* NETDATA_RRDENGINE_H */ \ No newline at end of file
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
new file mode 100755
index 0000000..7b2ff5b
--- /dev/null
+++ b/database/engine/rrdengineapi.c
@@ -0,0 +1,999 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+/* Default global database instance */
+struct rrdengine_instance multidb_ctx;
+
+int default_rrdeng_page_cache_mb = 32;
+int default_rrdeng_disk_quota_mb = 256;
+int default_multidb_disk_quota_mb = 256;
+/* Default behaviour is to unblock data collection if the page cache is full of dirty pages by dropping metrics */
+uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1;
+
+/* Returns the DB engine instance that the host stores its metrics in (host->rrdeng_ctx). */
+static inline struct rrdengine_instance *get_rrdeng_ctx_from_host(RRDHOST *host)
+{
+    struct rrdengine_instance *instance = host->rrdeng_ctx;
+
+    return instance;
+}
+
+/*
+ * Derives the legacy metric UUID from the dimension and chart ids: the first sizeof(uuid_t)
+ * bytes of SHA-256(dim_id || chart_id). This UUID is not unique across hosts.
+ */
+void rrdeng_generate_legacy_uuid(const char *dim_id, char *chart_id, uuid_t *ret_uuid)
+{
+    unsigned char digest[EVP_MAX_MD_SIZE];
+    unsigned int digest_len;
+    EVP_MD_CTX *md_ctx = EVP_MD_CTX_create();
+
+    EVP_DigestInit_ex(md_ctx, EVP_sha256(), NULL);
+    EVP_DigestUpdate(md_ctx, dim_id, strlen(dim_id));
+    EVP_DigestUpdate(md_ctx, chart_id, strlen(chart_id));
+    EVP_DigestFinal_ex(md_ctx, digest, &digest_len);
+    EVP_MD_CTX_destroy(md_ctx);
+
+    /* the digest must be longer than a UUID so that it can be truncated */
+    fatal_assert(digest_len > sizeof(uuid_t));
+    memcpy(ret_uuid, digest, sizeof(uuid_t));
+}
+
+/*
+ * Deterministically transforms a legacy UUID into one that is unique across hosts:
+ * the first sizeof(uuid_t) bytes of SHA-256(machine_guid || legacy_uuid).
+ */
+void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid, uuid_t *ret_uuid)
+{
+    unsigned char digest[EVP_MAX_MD_SIZE];
+    unsigned int digest_len;
+    EVP_MD_CTX *md_ctx = EVP_MD_CTX_create();
+
+    EVP_DigestInit_ex(md_ctx, EVP_sha256(), NULL);
+    /* only the GUID_LEN characters of the guid are hashed, not its terminator */
+    EVP_DigestUpdate(md_ctx, machine_guid, GUID_LEN);
+    EVP_DigestUpdate(md_ctx, *legacy_uuid, sizeof(uuid_t));
+    EVP_DigestFinal_ex(md_ctx, digest, &digest_len);
+    EVP_MD_CTX_destroy(md_ctx);
+
+    fatal_assert(digest_len > sizeof(uuid_t));
+    memcpy(ret_uuid, digest, sizeof(uuid_t));
+}
+
+/*
+ * Binds a dimension to its page index in the DB engine instance of its host.
+ *
+ * @param rd        the dimension to initialize.
+ * @param dim_uuid  the UUID to use for the dimension, or NULL to have one created/derived here.
+ *
+ * The metrics index is first searched with the legacy UUID (hash of dimension and chart id).
+ * If nothing is found, or the host is a child stored in the multi-host DB, the dimension UUID
+ * is used and a new page index is created when none exists. Otherwise the legacy page index
+ * is kept and the dimension UUID is set to the multi-host form of the legacy UUID.
+ * On return rd->state->page_index and rd->state->rrdeng_uuid refer to the selected page index.
+ */
+void rrdeng_metric_init(RRDDIM *rd, uuid_t *dim_uuid)
+{
+ struct page_cache *pg_cache;
+ struct rrdengine_instance *ctx;
+ uuid_t legacy_uuid;
+ uuid_t multihost_legacy_uuid;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index = NULL;
+ int is_multihost_child = 0;
+ RRDHOST *host = rd->rrdset->rrdhost;
+
+ ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost);
+ if (unlikely(!ctx)) {
+ error("Failed to fetch multidb context");
+ return;
+ }
+ pg_cache = &ctx->pg_cache;
+
+ rrdeng_generate_legacy_uuid(rd->id, rd->rrdset->id, &legacy_uuid);
+ rd->state->metric_uuid = dim_uuid;
+ /* a host other than localhost that stores into the shared multi-host instance */
+ if (host != localhost && host->rrdeng_ctx == &multidb_ctx)
+ is_multihost_child = 1;
+
+ /* look the metric up by its legacy UUID */
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &legacy_uuid, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (is_multihost_child || NULL == PValue) {
+ /* First time we see the legacy UUID or metric belongs to child host in multi-host DB.
+ * Drop legacy support, normal path */
+
+ if (unlikely(!rd->state->metric_uuid))
+ rd->state->metric_uuid = create_dimension_uuid(rd->rrdset, rd);
+
+ /* second lookup, this time by the dimension UUID */
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, rd->state->metric_uuid, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ /* unknown metric: insert a new page index and make it the last one of the index list */
+ /* the lock is dropped between lookup and insert, the assertion below catches a concurrent insert */
+ uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, rd->state->metric_uuid, sizeof(uuid_t), PJE0);
+ fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
+ *PValue = page_index = create_page_index(rd->state->metric_uuid);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
+ uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
+ }
+ } else {
+ /* There are legacy UUIDs in the database, implement backward compatibility */
+
+ rrdeng_convert_legacy_uuid_to_multihost(rd->rrdset->rrdhost->machine_guid, &legacy_uuid,
+ &multihost_legacy_uuid);
+
+ if (unlikely(!rd->state->metric_uuid))
+ rd->state->metric_uuid = mallocz(sizeof(uuid_t));
+
+ /* store when no UUID was supplied (freshly allocated, not compared) or when it differs */
+ int need_to_store = (dim_uuid == NULL || uuid_compare(*rd->state->metric_uuid, multihost_legacy_uuid));
+
+ uuid_copy(*rd->state->metric_uuid, multihost_legacy_uuid);
+
+ if (unlikely(need_to_store))
+ (void)sql_store_dimension(rd->state->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor,
+ rd->algorithm);
+
+ }
+ /* the engine-side UUID is the id stored in the page index, which may be the legacy one */
+ rd->state->rrdeng_uuid = &page_index->id;
+ rd->state->page_index = page_index;
+}
+
+/*
+ * Prepares the collection handle of a dimension for storing metrics to the database and
+ * registers the dimension as a writer of its page index.
+ * The handle must be released with rrdeng_store_metric_finalize().
+ */
+void rrdeng_store_metric_init(RRDDIM *rd)
+{
+    struct rrdeng_collect_handle *handle = &rd->state->handle.rrdeng;
+    struct pg_cache_page_index *page_index;
+
+    handle->ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost);
+
+    /* no page is being filled yet */
+    handle->descr = NULL;
+    handle->prev_descr = NULL;
+    handle->unaligned_page = 0;
+
+    page_index = rd->state->page_index;
+    uv_rwlock_wrlock(&page_index->lock);
+    page_index->writers += 1;
+    uv_rwlock_wrunlock(&page_index->lock);
+}
+
+/*
+ * Returns 1 if every slot of the page holds SN_EMPTY_SLOT (also for a page without slots),
+ * 0 as soon as any other value is found. The page must be populated and referenced.
+ */
+static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr)
+{
+    storage_number *slots = descr->pg_cache_descr->page;
+    unsigned idx;
+
+    for (idx = 0 ; idx < descr->page_length / sizeof(storage_number); ++idx) {
+        if (SN_EMPTY_SLOT != slots[idx])
+            return 0;
+    }
+    return 1;
+}
+
+/*
+ * Hands the page currently being filled by the dimension over to the engine and detaches it
+ * from the collection handle.
+ *  - A page holding data is committed for flushing, unless all of its slots are empty in which
+ *    case it is removed from the page cache instead.
+ *  - A page that never received a point (page_length == 0) is simply freed.
+ * Does nothing when the handle has no instance or no current page.
+ */
+void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
+{
+ struct rrdeng_collect_handle *handle;
+ struct rrdengine_instance *ctx;
+ struct rrdeng_page_descr *descr;
+
+ handle = &rd->state->handle.rrdeng;
+ ctx = handle->ctx;
+ if (unlikely(!ctx))
+ return;
+ descr = handle->descr;
+ if (unlikely(NULL == descr)) {
+ return;
+ }
+ if (likely(descr->page_length)) {
+ int page_is_empty;
+
+ /* this page stops being actively produced */
+ rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
+
+ if (handle->prev_descr) {
+ /* unpin old second page */
+ pg_cache_put(ctx, handle->prev_descr);
+ }
+ page_is_empty = page_has_only_empty_metrics(descr);
+ if (page_is_empty) {
+ debug(D_RRDENGINE, "Page has empty metrics only, deleting:");
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ /* drop our reference first, then remove the page from the page cache */
+ pg_cache_put(ctx, descr);
+ pg_cache_punch_hole(ctx, descr, 1, 0, NULL);
+ handle->prev_descr = NULL;
+ } else {
+ /*
+ * Disable pinning for now as it leads to deadlocks. When a collector stops collecting the extra pinned page
+ * eventually gets rotated but it cannot be destroyed due to the extra reference.
+ */
+ /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
+/* rrdeng_page_descr_mutex_lock(ctx, descr);
+ ret = pg_cache_try_get_unsafe(descr, 0);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ fatal_assert(1 == ret);*/
+
+ rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
+ /* handle->prev_descr = descr;*/
+ }
+ } else {
+ /* the page never received a point: release the page buffer and both descriptors */
+ freez(descr->pg_cache_descr->page);
+ rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
+ freez(descr);
+ }
+ handle->descr = NULL;
+}
+
+/*
+ * Appends one collected value to the current page of the dimension.
+ *
+ * @param rd             the dimension being collected.
+ * @param point_in_time  timestamp of the value; it becomes the start time of a fresh page and is
+ *                       passed to pg_cache_atomic_set_pg_info() together with the new page length.
+ * @param number         the value to store.
+ *
+ * A new page is started when there is no current page, when the current page cannot hold one more
+ * value, or when the page fell out of alignment with the other dimensions of the chart. The first
+ * value of a page also inserts the page into the page cache and updates the producer counters.
+ */
+void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number)
+{
+ struct rrdeng_collect_handle *handle;
+ struct rrdengine_instance *ctx;
+ struct page_cache *pg_cache;
+ struct rrdeng_page_descr *descr;
+ storage_number *page;
+ uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0;
+
+ handle = &rd->state->handle.rrdeng;
+ ctx = handle->ctx;
+ pg_cache = &ctx->pg_cache;
+ descr = handle->descr;
+
+ if (descr) {
+ /* Make alignment decisions */
+
+ if (descr->page_length == rd->rrdset->rrddim_page_alignment) {
+ /* this is the leading dimension that defines chart alignment */
+ perfect_page_alignment = 1;
+ }
+ /* is the metric far enough out of alignment with the others? */
+ if (unlikely(descr->page_length + sizeof(number) < rd->rrdset->rrddim_page_alignment)) {
+ handle->unaligned_page = 1;
+ debug(D_RRDENGINE, "Metric page is not aligned with chart:");
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ }
+ if (unlikely(handle->unaligned_page &&
+ /* did the other metrics change page? */
+ rd->rrdset->rrddim_page_alignment <= sizeof(number))) {
+ debug(D_RRDENGINE, "Flushing unaligned metric page.");
+ must_flush_unaligned_page = 1;
+ handle->unaligned_page = 0;
+ }
+ }
+ if (unlikely(NULL == descr ||
+ descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE ||
+ must_flush_unaligned_page)) {
+ /* hand over the old page (if any) and start a fresh one */
+ rrdeng_store_metric_flush_current_page(rd);
+
+ page = rrdeng_create_page(ctx, &rd->state->page_index->id, &descr);
+ fatal_assert(page);
+
+ handle->descr = descr;
+
+ /* take the next correlation id for the new page */
+ handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1);
+
+ if (0 == rd->rrdset->rrddim_page_alignment) {
+ /* this is the leading dimension that defines chart alignment */
+ perfect_page_alignment = 1;
+ }
+ }
+ /* store the value in the next free slot and publish the new end time and page length */
+ page = descr->pg_cache_descr->page;
+ page[descr->page_length / sizeof(number)] = number;
+ pg_cache_atomic_set_pg_info(descr, point_in_time, descr->page_length + sizeof(number));
+
+ if (perfect_page_alignment)
+ rd->rrdset->rrddim_page_alignment = descr->page_length;
+ if (unlikely(INVALID_TIME == descr->start_time)) {
+ /* first value of this page */
+ unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers;
+ descr->start_time = point_in_time;
+
+ new_metric_API_producers = rrd_atomic_add_fetch(&ctx->stats.metric_API_producers, 1);
+ /* lock-free update of the maximum: retry the compare-and-swap until it succeeds or the maximum is no longer exceeded */
+ while (unlikely(new_metric_API_producers > (old_metric_API_max_producers = ctx->metric_API_max_producers))) {
+ /* Increase ctx->metric_API_max_producers */
+ ret_metric_API_max_producers = ulong_compare_and_swap(&ctx->metric_API_max_producers,
+ old_metric_API_max_producers,
+ new_metric_API_producers);
+ if (old_metric_API_max_producers == ret_metric_API_max_producers) {
+ /* success */
+ break;
+ }
+ }
+
+ pg_cache_insert(ctx, rd->state->page_index, descr);
+ } else {
+ pg_cache_add_new_metric_time(rd->state->page_index, descr);
+ }
+}
+
+/*
+ * Finishes metric collection for rd: flushes the page being filled, unpins the previous page if one
+ * is still held and unregisters this writer from the page index.
+ * Returns 1 if it's safe to delete the dimension, 0 otherwise.
+ */
+int rrdeng_store_metric_finalize(RRDDIM *rd)
+{
+    struct rrdeng_collect_handle *collect = &rd->state->handle.rrdeng;
+    struct rrdengine_instance *instance = collect->ctx;
+    struct pg_cache_page_index *metric_index = rd->state->page_index;
+    int deletable = 0;
+
+    rrdeng_store_metric_flush_current_page(rd);
+    if (NULL != collect->prev_descr) {
+        pg_cache_put(instance, collect->prev_descr); /* unpin old second page */
+    }
+
+    uv_rwlock_wrlock(&metric_index->lock);
+    metric_index->writers -= 1;
+    /* deletable only when no writer and no page remains */
+    if (0 == metric_index->writers && 0 == metric_index->page_count) {
+        deletable = 1;
+    }
+    uv_rwlock_wrunlock(&metric_index->lock);
+
+    return deletable;
+}
+
+/* Page filter: 1 when both timestamps are valid and the page holds more than one entry, 0 otherwise */
+static int metrics_with_known_interval(struct rrdeng_page_descr *descr)
+{
+    unsigned entries;
+    int timestamps_valid;
+
+    timestamps_valid = (INVALID_TIME != descr->start_time && INVALID_TIME != descr->end_time);
+    if (unlikely(!timestamps_valid))
+        return 0;
+
+    entries = descr->page_length / sizeof(storage_number);
+    return likely(entries > 1) ? 1 : 0;
+}
+
+static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info)
+{
+    return (uint32_t *)(page_info->scratch + 0); /* start of scratch: the page's collection interval */
+}
+
+static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info)
+{
+    return (uint32_t *)(page_info->scratch + sizeof(uint32_t)); /* second slot: the page's point count */
+}
+
+/**
+ * Calculates the regions of different data collection intervals in a netdata chart in the time range
+ * [start_time,end_time]. This call takes the netdata chart read lock.
+ * @param st the netdata chart whose data collection interval boundaries are calculated.
+ * @param start_time inclusive starting time in seconds (it is multiplied by USEC_PER_SEC for page cache lookups)
+ * @param end_time inclusive ending time in seconds
+ * @param region_info_arrayp It allocates (*region_info_arrayp) and populates it with the regions of a reference
+ * dimension that have different data collection intervals and overlap with [start_time,end_time]. The caller must
+ * free (*region_info_arrayp) with freez(). If (*region_info_arrayp) is set to NULL nothing was allocated.
+ * @param max_intervalp is dereferenced and set to be the largest data collection interval of all regions.
+ * @param context_param_list when not NULL, its rd list is scanned for the reference dimension instead of st->dimensions.
+ * @return number of regions with different data collection intervals (1 when no dimension has data in the range).
+ */
+unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time,
+ struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp, struct context_param *context_param_list)
+{
+ struct pg_cache_page_index *page_index;
+ struct rrdengine_instance *ctx;
+ unsigned pages_nr;
+ RRDDIM *rd_iter, *rd;
+ struct rrdeng_page_info *page_info_array, *curr, *prev, *old_prev;
+ unsigned i, j, page_entries, region_points, page_points, regions, max_interval;
+ time_t now;
+ usec_t dt, current_position_time, max_time = 0, min_time, curr_time, first_valid_time_in_page;
+ struct rrdeng_region_info *region_info_array;
+ uint8_t is_first_region_initialized;
+
+ ctx = get_rrdeng_ctx_from_host(st->rrdhost);
+ regions = 1;
+ *max_intervalp = max_interval = 0;
+ region_info_array = NULL;
+ *region_info_arrayp = NULL;
+ page_info_array = NULL;
+
+ RRDDIM *temp_rd = context_param_list ? context_param_list->rd : NULL;
+ rrdset_rdlock(st);
+ for(rd_iter = temp_rd?temp_rd:st->dimensions, rd = NULL, min_time = (usec_t)-1 ; rd_iter ; rd_iter = rd_iter->next) {
+ /*
+ * Choose oldest dimension as reference. This is not equivalent to the union of all dimensions
+ * but it is a best effort approximation with a bias towards older metrics in a chart. It
+ * matches netdata behaviour in the sense that dimensions are generally aligned in a chart
+ * and older dimensions contain more information about the time range. It does not work well
+ * for metrics that have recently stopped being collected.
+ */
+ curr_time = pg_cache_oldest_time_in_range(ctx, rd_iter->state->rrdeng_uuid,
+ start_time * USEC_PER_SEC, end_time * USEC_PER_SEC);
+ if (INVALID_TIME != curr_time && curr_time < min_time) {
+ rd = rd_iter;
+ min_time = curr_time;
+ }
+ }
+ rrdset_unlock(st);
+ if (NULL == rd) { /* no dimension has data in the requested time range */
+ return 1;
+ }
+ pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC,
+ &page_info_array, &page_index);
+ if (pages_nr) {
+ /* conservative allocation, will reduce the size later if necessary */
+ region_info_array = mallocz(sizeof(*region_info_array) * pages_nr);
+ }
+ is_first_region_initialized = 0;
+ region_points = 0;
+
+ /* pages loop */
+ for (i = 0, curr = NULL, prev = NULL ; i < pages_nr ; ++i) {
+ old_prev = prev;
+ prev = curr;
+ curr = &page_info_array[i];
+ *pginfo_to_points(curr) = 0; /* initialize to invalid page */
+ *pginfo_to_dt(curr) = 0; /* no known data collection interval yet */
+ if (unlikely(INVALID_TIME == curr->start_time || INVALID_TIME == curr->end_time ||
+ curr->end_time < curr->start_time)) {
+ info("Ignoring page with invalid timestamps.");
+ prev = old_prev;
+ continue;
+ }
+ page_entries = curr->page_length / sizeof(storage_number);
+ fatal_assert(0 != page_entries);
+ if (likely(1 != page_entries)) {
+ dt = (curr->end_time - curr->start_time) / (page_entries - 1);
+ *pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(dt);
+ if (unlikely(0 == *pginfo_to_dt(curr)))
+ *pginfo_to_dt(curr) = 1;
+ } else {
+ dt = 0;
+ }
+ for (j = 0, page_points = 0 ; j < page_entries ; ++j) {
+ uint8_t is_metric_out_of_order, is_metric_earlier_than_range;
+
+ is_metric_earlier_than_range = 0;
+ is_metric_out_of_order = 0;
+
+ current_position_time = curr->start_time + j * dt;
+ now = current_position_time / USEC_PER_SEC;
+ if (now > end_time) { /* there will be no more pages in the time range */
+ break;
+ }
+ if (now < start_time)
+ is_metric_earlier_than_range = 1;
+ if (unlikely(current_position_time < max_time)) /* just went back in time */
+ is_metric_out_of_order = 1;
+ if (is_metric_earlier_than_range || unlikely(is_metric_out_of_order)) {
+ if (unlikely(is_metric_out_of_order))
+ info("Ignoring metric with out of order timestamp.");
+ continue; /* next entry */
+ }
+ /* here is a valid metric */
+ ++page_points;
+ region_info_array[regions - 1].points = ++region_points;
+ max_time = current_position_time;
+ if (1 == page_points)
+ first_valid_time_in_page = current_position_time;
+ if (unlikely(!is_first_region_initialized)) {
+ fatal_assert(1 == regions);
+ /* this is the first region */
+ region_info_array[0].start_time = current_position_time;
+ is_first_region_initialized = 1;
+ }
+ }
+ *pginfo_to_points(curr) = page_points;
+ if (0 == page_points) { /* the page contributed no valid metric, keep the previous page as reference */
+ prev = old_prev;
+ continue;
+ }
+
+ if (unlikely(0 == *pginfo_to_dt(curr))) { /* unknown data collection interval */
+ fatal_assert(1 == page_points);
+
+ if (likely(NULL != prev)) { /* get interval from previous page */
+ *pginfo_to_dt(curr) = *pginfo_to_dt(prev);
+ } else { /* there is no previous page in the query */
+ struct rrdeng_page_info db_page_info;
+
+ /* go to database */
+ pg_cache_get_filtered_info_prev(ctx, page_index, curr->start_time,
+ metrics_with_known_interval, &db_page_info);
+ if (unlikely(db_page_info.start_time == INVALID_TIME || db_page_info.end_time == INVALID_TIME ||
+ 0 == db_page_info.page_length)) { /* nothing in the database, default to update_every */
+ *pginfo_to_dt(curr) = rd->update_every;
+ } else {
+ unsigned db_entries;
+ usec_t db_dt;
+
+ db_entries = db_page_info.page_length / sizeof(storage_number);
+ db_dt = (db_page_info.end_time - db_page_info.start_time) / (db_entries - 1);
+ *pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(db_dt);
+ if (unlikely(0 == *pginfo_to_dt(curr)))
+ *pginfo_to_dt(curr) = 1;
+
+ }
+ }
+ }
+ if (likely(prev) && unlikely(*pginfo_to_dt(curr) != *pginfo_to_dt(prev))) {
+ info("Data collection interval change detected in query: %"PRIu32" -> %"PRIu32,
+ *pginfo_to_dt(prev), *pginfo_to_dt(curr));
+ region_info_array[regions++ - 1].points -= page_points; /* this page's points move to the new region */
+ region_info_array[regions - 1].points = region_points = page_points;
+ region_info_array[regions - 1].start_time = first_valid_time_in_page;
+ }
+ if (*pginfo_to_dt(curr) > max_interval)
+ max_interval = *pginfo_to_dt(curr);
+ region_info_array[regions - 1].update_every = *pginfo_to_dt(curr);
+ }
+ if (page_info_array)
+ freez(page_info_array);
+ if (region_info_array) {
+ if (likely(is_first_region_initialized)) {
+ /* free unnecessary memory */
+ region_info_array = reallocz(region_info_array, sizeof(*region_info_array) * regions);
+ *region_info_arrayp = region_info_array;
+ *max_intervalp = max_interval;
+ } else {
+ /* empty result */
+ freez(region_info_array);
+ }
+ }
+ return regions;
+}
+
+/*
+ * Prepares rrdimm_handle for reading the metrics of rd in [start_time, end_time] (seconds) and calls
+ * pg_cache_preload() for that range. The handle must be released with rrdeng_load_metric_finalize().
+ */
+void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time)
+{
+    struct rrdengine_instance *instance = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost);
+    struct rrdeng_query_handle *query = &rrdimm_handle->rrdeng;
+    unsigned preloaded;
+
+    rrdimm_handle->start_time = start_time;
+    rrdimm_handle->end_time = end_time;
+
+    query->ctx = instance;
+    query->descr = NULL;
+    query->position = 0;
+    query->now = start_time;
+    query->next_page_time = start_time;
+
+    preloaded = pg_cache_preload(instance, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC,
+                                 end_time * USEC_PER_SEC, NULL, &query->page_index);
+    if (unlikely(0 == preloaded || NULL == query->page_index)) {
+        query->next_page_time = INVALID_TIME; /* there are no metrics to load */
+    }
+}
+
+/* Returns the next metric and sets its timestamp (seconds) into *current_time; returns SN_EMPTY_SLOT, leaving *current_time untouched, when there is nothing more to load */
+storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time)
+{
+ struct rrdeng_query_handle *handle;
+ struct rrdengine_instance *ctx;
+ struct rrdeng_page_descr *descr;
+ storage_number *page, ret;
+ unsigned position, entries;
+ usec_t next_page_time = 0, current_position_time, page_end_time = 0;
+ uint32_t page_length;
+
+ handle = &rrdimm_handle->rrdeng;
+ if (unlikely(INVALID_TIME == handle->next_page_time)) {
+ return SN_EMPTY_SLOT;
+ }
+ ctx = handle->ctx;
+ if (unlikely(NULL == (descr = handle->descr))) {
+ /* it's the first call */
+ next_page_time = handle->next_page_time * USEC_PER_SEC;
+ } else {
+ pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
+ }
+ position = handle->position + 1; /* candidate entry: the one after the last position used */
+
+ if (unlikely(NULL == descr ||
+ position >= (page_length / sizeof(storage_number)))) {
+ /* We need to get a new page */
+ if (descr) {
+ /* Drop old page's reference */
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
+#endif
+ pg_cache_put(ctx, descr);
+ handle->descr = NULL;
+ handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1; /* continue one second after the old page's end */
+ if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
+ goto no_more_metrics;
+ }
+ next_page_time = handle->next_page_time * USEC_PER_SEC;
+ }
+
+ descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id,
+ next_page_time, rrdimm_handle->end_time * USEC_PER_SEC);
+ if (NULL == descr) {
+ goto no_more_metrics;
+ }
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
+#endif
+ handle->descr = descr;
+ pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
+ if (unlikely(INVALID_TIME == descr->start_time ||
+ INVALID_TIME == page_end_time)) {
+ goto no_more_metrics;
+ }
+ if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) {
+ /* we're in the middle of the page somewhere */
+ entries = page_length / sizeof(storage_number);
+ position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) /
+ (page_end_time - descr->start_time);
+ } else {
+ position = 0;
+ }
+ }
+ page = descr->pg_cache_descr->page;
+ ret = page[position];
+ entries = page_length / sizeof(storage_number);
+ if (entries > 1) {
+ usec_t dt;
+
+ dt = (page_end_time - descr->start_time) / (entries - 1); /* time distance between consecutive entries */
+ current_position_time = descr->start_time + position * dt;
+ } else {
+ current_position_time = descr->start_time;
+ }
+ handle->position = position;
+ handle->now = current_position_time / USEC_PER_SEC;
+/* fatal_assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time);
+ The above assertion is an approximation and needs to take update_every into account */
+ if (unlikely(handle->now >= rrdimm_handle->end_time)) {
+ /* next calls will not load any more metrics */
+ handle->next_page_time = INVALID_TIME;
+ }
+ *current_time = handle->now;
+ return ret;
+
+no_more_metrics:
+ handle->next_page_time = INVALID_TIME;
+ return SN_EMPTY_SLOT;
+}
+
+int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
+{
+    /* the query is over once next_page_time has been set to INVALID_TIME */
+    if (INVALID_TIME == rrdimm_handle->rrdeng.next_page_time)
+        return 1;
+    return 0;
+}
+
+/*
+ * Ends a query: releases the reference on the page the handle still holds, if there is one.
+ */
+void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
+{
+    struct rrdeng_query_handle *query = &rrdimm_handle->rrdeng;
+    struct rrdengine_instance *instance = query->ctx;
+    struct rrdeng_page_descr *held_page = query->descr;
+
+    if (NULL == held_page) {
+        /* no page is currently referenced by the handle */
+        return;
+    }
+
+#ifdef NETDATA_INTERNAL_CHECKS
+    rrd_stat_atomic_add(&instance->stats.metric_API_consumers, -1);
+#endif
+    pg_cache_put(instance, held_page);
+}
+
+time_t rrdeng_metric_latest_time(RRDDIM *rd)
+{
+    /* the division by USEC_PER_SEC converts the page index timestamp to seconds */
+    const struct pg_cache_page_index *metric_index = rd->state->page_index;
+    time_t latest = metric_index->latest_time / USEC_PER_SEC;
+
+    return latest;
+}
+time_t rrdeng_metric_oldest_time(RRDDIM *rd)
+{
+    /* the division by USEC_PER_SEC converts the page index timestamp to seconds */
+    const struct pg_cache_page_index *metric_index = rd->state->page_index;
+    time_t oldest = metric_index->oldest_time / USEC_PER_SEC;
+
+    return oldest;
+}
+
+/* Allocates a dirty, populated page of RRDENG_BLOCK_SIZE bytes plus its descriptor. Also gets a reference for the page */
+void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr)
+{
+    struct rrdeng_page_descr *new_descr = pg_cache_create_descr();
+    struct page_cache_descr *cache_state;
+    void *buffer;
+    /* TODO: check maximum number of pages in page cache limit */
+
+    new_descr->id = id; /* TODO: add page type: metric, log, something? */
+    buffer = mallocz(RRDENG_BLOCK_SIZE); /* TODO: add page size */
+
+    rrdeng_page_descr_mutex_lock(ctx, new_descr);
+    cache_state = new_descr->pg_cache_descr; /* fetched only after the descriptor mutex is taken */
+    cache_state->refcnt = 1;
+    cache_state->flags = RRD_PAGE_DIRTY | RRD_PAGE_POPULATED;
+    cache_state->page = buffer;
+    debug(D_RRDENGINE, "Created new page:");
+    if (unlikely(debug_flags & D_RRDENGINE)) {
+        print_page_cache_descr(new_descr);
+    }
+    rrdeng_page_descr_mutex_unlock(ctx, new_descr);
+    *ret_descr = new_descr;
+    return buffer;
+}
+
+/* Inserts descr into the committed page index under page_correlation_id, then releases the caller's reference. The page must not be empty */
+void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
+ Word_t page_correlation_id)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ Pvoid_t *PValue;
+ unsigned nr_committed_pages;
+
+ if (unlikely(NULL == descr)) {
+ debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-committed.", __func__);
+ return;
+ }
+ fatal_assert(descr->page_length);
+
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ PValue = JudyLIns(&pg_cache->committed_page_index.JudyL_array, page_correlation_id, PJE0);
+ *PValue = descr;
+ nr_committed_pages = ++pg_cache->committed_page_index.nr_committed_pages;
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+
+ if (nr_committed_pages >= pg_cache_hard_limit(ctx) / 2) {
+ /* the committed pages have reached at least half of the page cache hard limit */
+
+ if (ctx->drop_metrics_under_page_cache_pressure &&
+ nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) {
+ /* 100% of pages are dirty */
+ struct rrdeng_cmd cmd;
+
+ cmd.opcode = RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+ } else {
+ if (0 == (unsigned long) ctx->stats.pg_cache_over_half_dirty_events) {
+ /* only print the first time */
+ errno = 0;
+ error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". "
+ "Metric data at risk of not being stored in the database, "
+ "please reduce disk load or use a faster disk.", ctx->dbfiles_path);
+ }
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_over_half_dirty_events, 1);
+ rrd_stat_atomic_add(&global_pg_cache_over_half_dirty_events, 1);
+ }
+ }
+
+ pg_cache_put(ctx, descr);
+}
+
+/*
+ * Gets a reference for the page: looks up metric id with INVALID_TIME as the point in time and
+ * returns the page data, or NULL if the lookup fails. (*handle) receives the page descriptor.
+ */
+void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle)
+{
+    struct rrdeng_page_descr *found;
+    void *data = NULL;
+
+    debug(D_RRDENGINE, "Reading existing page:");
+    found = pg_cache_lookup(ctx, NULL, id, INVALID_TIME);
+    *handle = found; /* NULL when the lookup fails */
+    if (NULL != found) {
+        data = found->pg_cache_descr->page;
+    }
+
+    return data;
+}
+
+/*
+ * Gets a reference for the page: looks up metric id at point_in_time and returns the page data,
+ * or NULL if the lookup fails. (*handle) receives the page descriptor.
+ */
+void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle)
+{
+    struct rrdeng_page_descr *found;
+    void *data = NULL;
+
+    debug(D_RRDENGINE, "Reading existing page:");
+    found = pg_cache_lookup(ctx, NULL, id, point_in_time);
+    *handle = found; /* NULL when the lookup fails */
+    if (NULL != found) {
+        data = found->pg_cache_descr->page;
+    }
+
+    return data;
+}
+
+/*
+ * Gathers Database Engine statistics into array[0..36]; returns without writing anything when ctx is NULL.
+ * Careful when modifying this function.
+ * You must not change the indices of the statistics or user code will break.
+ * You must not exceed RRDENG_NR_STATS or it will crash.
+ */
+void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
+{
+ if (ctx == NULL)
+ return;
+
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ array[0] = (uint64_t)ctx->stats.metric_API_producers;
+ array[1] = (uint64_t)ctx->stats.metric_API_consumers;
+ array[2] = (uint64_t)pg_cache->page_descriptors;
+ array[3] = (uint64_t)pg_cache->populated_pages;
+ array[4] = (uint64_t)pg_cache->committed_page_index.nr_committed_pages;
+ array[5] = (uint64_t)ctx->stats.pg_cache_insertions;
+ array[6] = (uint64_t)ctx->stats.pg_cache_deletions;
+ array[7] = (uint64_t)ctx->stats.pg_cache_hits;
+ array[8] = (uint64_t)ctx->stats.pg_cache_misses;
+ array[9] = (uint64_t)ctx->stats.pg_cache_backfills;
+ array[10] = (uint64_t)ctx->stats.pg_cache_evictions;
+ array[11] = (uint64_t)ctx->stats.before_compress_bytes;
+ array[12] = (uint64_t)ctx->stats.after_compress_bytes;
+ array[13] = (uint64_t)ctx->stats.before_decompress_bytes;
+ array[14] = (uint64_t)ctx->stats.after_decompress_bytes;
+ array[15] = (uint64_t)ctx->stats.io_write_bytes;
+ array[16] = (uint64_t)ctx->stats.io_write_requests;
+ array[17] = (uint64_t)ctx->stats.io_read_bytes;
+ array[18] = (uint64_t)ctx->stats.io_read_requests;
+ array[19] = (uint64_t)ctx->stats.io_write_extent_bytes;
+ array[20] = (uint64_t)ctx->stats.io_write_extents;
+ array[21] = (uint64_t)ctx->stats.io_read_extent_bytes;
+ array[22] = (uint64_t)ctx->stats.io_read_extents;
+ array[23] = (uint64_t)ctx->stats.datafile_creations;
+ array[24] = (uint64_t)ctx->stats.datafile_deletions;
+ array[25] = (uint64_t)ctx->stats.journalfile_creations;
+ array[26] = (uint64_t)ctx->stats.journalfile_deletions;
+ array[27] = (uint64_t)ctx->stats.page_cache_descriptors;
+ array[28] = (uint64_t)ctx->stats.io_errors;
+ array[29] = (uint64_t)ctx->stats.fs_errors;
+ array[30] = (uint64_t)global_io_errors;
+ array[31] = (uint64_t)global_fs_errors;
+ array[32] = (uint64_t)rrdeng_reserved_file_descriptors;
+ array[33] = (uint64_t)ctx->stats.pg_cache_over_half_dirty_events;
+ array[34] = (uint64_t)global_pg_cache_over_half_dirty_events;
+ array[35] = (uint64_t)ctx->stats.flushing_pressure_page_deletions;
+ array[36] = (uint64_t)global_flushing_pressure_page_deletions;
+ fatal_assert(RRDENG_NR_STATS == 37); /* guards the hard-coded index range used above */
+}
+
+/* Releases reference to page; handle is the page descriptor obtained when the page was looked up */
+void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle)
+{
+    struct rrdeng_page_descr *descr = handle;
+    pg_cache_put(ctx, descr);
+}
+
+/*
+ * Creates a dbengine instance (the global multidb_ctx when ctxp is NULL). Returns 0 on success, negative on error
+ */
+int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
+ unsigned disk_space_mb)
+{
+ struct rrdengine_instance *ctx;
+ int error;
+ uint32_t max_open_files;
+
+ max_open_files = rlimit_nofile.rlim_cur / 4; /* all instances together may reserve at most a quarter of rlim_cur */
+
+ /* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE);
+ if (rrdeng_reserved_file_descriptors > max_open_files) {
+ error(
+ "Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.",
+ (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files);
+
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
+ return UV_EMFILE;
+ }
+
+ if (NULL == ctxp) {
+ ctx = &multidb_ctx;
+ memset(ctx, 0, sizeof(*ctx));
+ } else {
+ *ctxp = ctx = callocz(1, sizeof(*ctx));
+ }
+ ctx->global_compress_alg = RRD_LZ4;
+ if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB)
+ page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
+ ctx->max_cache_pages = page_cache_mb * (1048576LU / RRDENG_BLOCK_SIZE);
+ /* try to keep 5% of the page cache free */
+ ctx->cache_pages_low_watermark = (ctx->max_cache_pages * 95LLU) / 100;
+ if (disk_space_mb < RRDENG_MIN_DISK_SPACE_MB)
+ disk_space_mb = RRDENG_MIN_DISK_SPACE_MB;
+ ctx->max_disk_space = disk_space_mb * 1048576LLU;
+ strncpyz(ctx->dbfiles_path, dbfiles_path, sizeof(ctx->dbfiles_path) - 1);
+ ctx->dbfiles_path[sizeof(ctx->dbfiles_path) - 1] = '\0';
+ if (NULL == host)
+ strncpyz(ctx->machine_guid, registry_get_this_machine_guid(), GUID_LEN);
+ else
+ strncpyz(ctx->machine_guid, host->machine_guid, GUID_LEN);
+
+ ctx->drop_metrics_under_page_cache_pressure = rrdeng_drop_metrics_under_page_cache_pressure;
+ ctx->metric_API_max_producers = 0;
+ ctx->quiesce = NO_QUIESCE;
+ ctx->metalog_ctx = NULL; /* only set this after the metadata log has finished initializing */
+ ctx->host = host;
+
+ memset(&ctx->worker_config, 0, sizeof(ctx->worker_config));
+ ctx->worker_config.ctx = ctx;
+ init_page_cache(ctx);
+ init_commit_log(ctx);
+ error = init_rrd_files(ctx);
+ if (error) {
+ goto error_after_init_rrd_files;
+ }
+
+ init_completion(&ctx->rrdengine_completion);
+ fatal_assert(0 == uv_thread_create(&ctx->worker_config.thread, rrdeng_worker, &ctx->worker_config));
+ /* wait for worker thread to initialize */
+ wait_for_completion(&ctx->rrdengine_completion);
+ destroy_completion(&ctx->rrdengine_completion);
+ uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE");
+ if (ctx->worker_config.error) {
+ goto error_after_rrdeng_worker;
+ }
+ error = metalog_init(ctx);
+ if (error) {
+ error("Failed to initialize metadata log file event loop.");
+ goto error_after_rrdeng_worker;
+ }
+
+ return 0;
+
+error_after_rrdeng_worker: /* NOTE(review): the worker thread is not joined on this path - confirm it has already exited */
+ finalize_rrd_files(ctx);
+error_after_init_rrd_files:
+ free_page_cache(ctx);
+ if (ctx != &multidb_ctx) {
+ freez(ctx);
+ *ctxp = NULL;
+ }
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); /* give the fd budget back */
+ return UV_EIO;
+}
+
+/*
+ * Shuts down the worker thread of ctx and frees the instance. Returns 0 on success, 1 on error (ctx is NULL)
+ */
+int rrdeng_exit(struct rrdengine_instance *ctx)
+{
+ struct rrdeng_cmd cmd;
+
+ if (NULL == ctx) {
+ return 1;
+ }
+
+ /* TODO: add page to page cache */
+ cmd.opcode = RRDENG_SHUTDOWN;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+
+ fatal_assert(0 == uv_thread_join(&ctx->worker_config.thread)); /* wait until the worker thread has terminated */
+
+ finalize_rrd_files(ctx);
+ //metalog_exit(ctx->metalog_ctx);
+ free_page_cache(ctx);
+
+ if (ctx != &multidb_ctx) { /* multidb_ctx is a global object, it is never freed */
+ freez(ctx);
+ }
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); /* give the fd budget back */
+ return 0;
+}
+
+void rrdeng_prepare_exit(struct rrdengine_instance *ctx)
+{
+ struct rrdeng_cmd cmd;
+
+ if (NULL == ctx) { /* no instance, nothing to quiesce */
+ return;
+ }
+
+ init_completion(&ctx->rrdengine_completion);
+ cmd.opcode = RRDENG_QUIESCE; /* sent to the worker; this call then blocks on the completion below */
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+
+ /* wait for dbengine to quiesce */
+ wait_for_completion(&ctx->rrdengine_completion);
+ destroy_completion(&ctx->rrdengine_completion);
+
+ //metalog_prepare_exit(ctx->metalog_ctx);
+}
+
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
new file mode 100644
index 0000000..41375b9
--- /dev/null
+++ b/database/engine/rrdengineapi.h
@@ -0,0 +1,63 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDENGINEAPI_H
+#define NETDATA_RRDENGINEAPI_H
+
+#include "rrdengine.h"
+
+#define RRDENG_MIN_PAGE_CACHE_SIZE_MB (8) /* rrdeng_init() raises smaller page cache sizes to this value */
+#define RRDENG_MIN_DISK_SPACE_MB (64) /* rrdeng_init() raises smaller disk space sizes to this value */
+
+#define RRDENG_NR_STATS (37) /* number of array entries filled by rrdeng_get_37_statistics() */
+
+#define RRDENG_FD_BUDGET_PER_INSTANCE (50) /* file descriptors rrdeng_init() reserves for each instance */
+
+extern int default_rrdeng_page_cache_mb;
+extern int default_rrdeng_disk_quota_mb;
+extern int default_multidb_disk_quota_mb;
+extern uint8_t rrdeng_drop_metrics_under_page_cache_pressure;
+extern struct rrdengine_instance multidb_ctx;
+
+struct rrdeng_region_info { /* one region of constant data collection interval, see rrdeng_variable_step_boundaries() */
+ time_t start_time; /* time of the first valid point of the region; NOTE(review): assigned from a usec_t value - confirm unit */
+ int update_every; /* data collection interval of the region (presumably seconds, derived via ROUND_USEC_TO_SEC) */
+ unsigned points; /* number of valid points counted in the region */
+};
+
+extern void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr);
+extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
+ Word_t page_correlation_id);
+extern void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle);
+extern void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle);
+extern void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle);
+
+extern void rrdeng_generate_legacy_uuid(const char *dim_id, char *chart_id, uuid_t *ret_uuid);
+extern void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid,
+ uuid_t *ret_uuid);
+
+
+extern void rrdeng_metric_init(RRDDIM *rd, uuid_t *dim_uuid);
+extern void rrdeng_store_metric_init(RRDDIM *rd);
+extern void rrdeng_store_metric_flush_current_page(RRDDIM *rd);
+extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number);
+extern int rrdeng_store_metric_finalize(RRDDIM *rd);
+extern unsigned
+ rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time,
+ struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp, struct context_param *context_param_list);
+extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle,
+ time_t start_time, time_t end_time);
+extern storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time);
+extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle);
+extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle);
+extern time_t rrdeng_metric_latest_time(RRDDIM *rd);
+extern time_t rrdeng_metric_oldest_time(RRDDIM *rd);
+extern void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
+
+/* must call once before using anything */
+extern int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
+ unsigned disk_space_mb);
+
+extern int rrdeng_exit(struct rrdengine_instance *ctx);
+extern void rrdeng_prepare_exit(struct rrdengine_instance *ctx);
+
+#endif /* NETDATA_RRDENGINEAPI_H */
diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c
new file mode 100644
index 0000000..287b86b
--- /dev/null
+++ b/database/engine/rrdenginelib.c
@@ -0,0 +1,294 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+#define BUFSIZE (512)
+
+/* Debug-logs one page descriptor together with its page cache state. Caller must hold descriptor lock */
+void print_page_cache_descr(struct rrdeng_page_descr *descr)
+{
+    struct page_cache_descr *cache_state = descr->pg_cache_descr;
+    char id_text[UUID_STR_LEN];
+    char line[BUFSIZE + 1];
+    int used;
+
+    uuid_unparse_lower(*descr->id, id_text);
+    used = snprintfz(line, BUFSIZE, "page(%p) id=%s\n"
+                     "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:",
+                     cache_state->page, id_text, descr->page_length,
+                     (uint64_t)descr->start_time, (uint64_t)descr->end_time);
+
+    /* a descriptor without an extent prints N/A instead of an offset */
+    if (descr->extent)
+        used += snprintfz(line + used, BUFSIZE - used, "%"PRIu64, descr->extent->offset);
+    else
+        used += snprintfz(line + used, BUFSIZE - used, "N/A");
+
+    snprintfz(line + used, BUFSIZE - used, " flags:0x%2.2lX refcnt:%u\n\n",
+              cache_state->flags, cache_state->refcnt);
+    debug(D_RRDENGINE, "%s", line);
+}
+
+void print_page_descr(struct rrdeng_page_descr *descr)
+{
+    char id_text[UUID_STR_LEN];
+    char line[BUFSIZE + 1];
+    int used;
+
+    /* writes the metric id, page length, time range and extent offset to stderr */
+    uuid_unparse_lower(*descr->id, id_text);
+    used = snprintfz(line, BUFSIZE, "id=%s\n"
+                     "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:",
+                     id_text, descr->page_length,
+                     (uint64_t)descr->start_time, (uint64_t)descr->end_time);
+
+    if (descr->extent)
+        used += snprintfz(line + used, BUFSIZE - used, "%"PRIu64, descr->extent->offset);
+    else
+        used += snprintfz(line + used, BUFSIZE - used, "N/A"); /* no extent attached */
+
+    snprintfz(line + used, BUFSIZE - used, "\n\n");
+    fputs(line, stderr);
+}
+
+/* Checks that file is a regular file of at least min_size bytes and stores its size in (*file_size).
+ * Returns 0 on success, UV_EINVAL otherwise ((*file_size) is then left untouched). An fstat failure is fatal. */
+int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size)
+{
+    int ret;
+    uv_fs_t req;
+    uv_stat_t *s;
+
+    ret = uv_fs_fstat(NULL, &req, file, NULL);
+    if (ret < 0) {
+        fatal("uv_fs_fstat: %s\n", uv_strerror(ret));
+    }
+    fatal_assert(req.result == 0);
+    s = req.ptr;
+    ret = UV_EINVAL;
+    if (!S_ISREG(s->st_mode)) { /* a bare "& S_IFREG" bit test would also accept sockets and symlinks */
+        error("Not a regular file.\n");
+    } else if (s->st_size < min_size) {
+        error("File length is too short.\n");
+    } else {
+        *file_size = s->st_size;
+        ret = 0;
+    }
+    uv_fs_req_cleanup(&req); /* single cleanup point for every outcome */
+
+    return ret;
+}
+
+/**
+ * Open file for I/O.
+ *
+ * @param path The full path of the file.
+ * @param flags Same flags as the open() system call uses.
+ * @param file On success sets (*file) to be the uv_file that was opened.
+ * @param direct Tries to open a file in direct I/O mode when direct=1, falls back to buffered mode if not possible.
+ * @return Returns UV error number that is < 0 on failure. On success returns the uv_fs_open() result, which is >= 0.
+ */
+int open_file_for_io(char *path, int flags, uv_file *file, int direct)
+{
+ uv_fs_t req;
+ int fd = -1, current_flags;
+
+ fatal_assert(0 == direct || 1 == direct);
+ for ( ; direct >= 0 ; --direct) { /* at most two attempts: direct I/O first (if requested), then buffered */
+#ifdef __APPLE__
+ /* Apple OS does not support O_DIRECT */
+ direct = 0;
+#endif
+ current_flags = flags;
+ if (direct) {
+ current_flags |= O_DIRECT;
+ }
+ fd = uv_fs_open(NULL, &req, path, current_flags, S_IRUSR | S_IWUSR, NULL);
+ if (fd < 0) {
+ if ((direct) && (UV_EINVAL == fd)) {
+ error("File \"%s\" does not support direct I/O, falling back to buffered I/O.", path);
+ } else {
+ error("Failed to open file \"%s\".", path);
+ --direct; /* break the loop */
+ }
+ } else {
+ fatal_assert(req.result >= 0);
+ *file = req.result;
+#ifdef __APPLE__
+ info("Disabling OS X caching for file \"%s\".", path);
+ fcntl(fd, F_NOCACHE, 1);
+#endif
+ --direct; /* break the loop */
+ }
+ uv_fs_req_cleanup(&req);
+ }
+
+ return fd;
+}
+
+/*
+ * Formats the statistics of the engine instance ctx, plus a few global counters, into str as
+ * newline-terminated "name: value" lines using snprintfz() with the given buffer size.
+ *
+ * Every value is cast to long and printed with %ld. The format lines and the arguments are matched
+ * purely by position, so they must be kept in the same order.
+ * Note that "page_cache_total_pages" is taken from pg_cache->page_descriptors.
+ *
+ * Returns str.
+ */
+char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size)
+{
+ struct page_cache *pg_cache;
+
+ pg_cache = &ctx->pg_cache;
+ snprintfz(str, size,
+ "metric_API_producers: %ld\n"
+ "metric_API_consumers: %ld\n"
+ "page_cache_total_pages: %ld\n"
+ "page_cache_descriptors: %ld\n"
+ "page_cache_populated_pages: %ld\n"
+ "page_cache_committed_pages: %ld\n"
+ "page_cache_insertions: %ld\n"
+ "page_cache_deletions: %ld\n"
+ "page_cache_hits: %ld\n"
+ "page_cache_misses: %ld\n"
+ "page_cache_backfills: %ld\n"
+ "page_cache_evictions: %ld\n"
+ "compress_before_bytes: %ld\n"
+ "compress_after_bytes: %ld\n"
+ "decompress_before_bytes: %ld\n"
+ "decompress_after_bytes: %ld\n"
+ "io_write_bytes: %ld\n"
+ "io_write_requests: %ld\n"
+ "io_read_bytes: %ld\n"
+ "io_read_requests: %ld\n"
+ "io_write_extent_bytes: %ld\n"
+ "io_write_extents: %ld\n"
+ "io_read_extent_bytes: %ld\n"
+ "io_read_extents: %ld\n"
+ "datafile_creations: %ld\n"
+ "datafile_deletions: %ld\n"
+ "journalfile_creations: %ld\n"
+ "journalfile_deletions: %ld\n"
+ "io_errors: %ld\n"
+ "fs_errors: %ld\n"
+ "global_io_errors: %ld\n"
+ "global_fs_errors: %ld\n"
+ "rrdeng_reserved_file_descriptors: %ld\n"
+ "pg_cache_over_half_dirty_events: %ld\n"
+ "global_pg_cache_over_half_dirty_events: %ld\n"
+ "flushing_pressure_page_deletions: %ld\n"
+ "global_flushing_pressure_page_deletions: %ld\n",
+ (long)ctx->stats.metric_API_producers,
+ (long)ctx->stats.metric_API_consumers,
+ (long)pg_cache->page_descriptors,
+ (long)ctx->stats.page_cache_descriptors,
+ (long)pg_cache->populated_pages,
+ (long)pg_cache->committed_page_index.nr_committed_pages,
+ (long)ctx->stats.pg_cache_insertions,
+ (long)ctx->stats.pg_cache_deletions,
+ (long)ctx->stats.pg_cache_hits,
+ (long)ctx->stats.pg_cache_misses,
+ (long)ctx->stats.pg_cache_backfills,
+ (long)ctx->stats.pg_cache_evictions,
+ (long)ctx->stats.before_compress_bytes,
+ (long)ctx->stats.after_compress_bytes,
+ (long)ctx->stats.before_decompress_bytes,
+ (long)ctx->stats.after_decompress_bytes,
+ (long)ctx->stats.io_write_bytes,
+ (long)ctx->stats.io_write_requests,
+ (long)ctx->stats.io_read_bytes,
+ (long)ctx->stats.io_read_requests,
+ (long)ctx->stats.io_write_extent_bytes,
+ (long)ctx->stats.io_write_extents,
+ (long)ctx->stats.io_read_extent_bytes,
+ (long)ctx->stats.io_read_extents,
+ (long)ctx->stats.datafile_creations,
+ (long)ctx->stats.datafile_deletions,
+ (long)ctx->stats.journalfile_creations,
+ (long)ctx->stats.journalfile_deletions,
+ (long)ctx->stats.io_errors,
+ (long)ctx->stats.fs_errors,
+ (long)global_io_errors,
+ (long)global_fs_errors,
+ (long)rrdeng_reserved_file_descriptors,
+ (long)ctx->stats.pg_cache_over_half_dirty_events,
+ (long)global_pg_cache_over_half_dirty_events,
+ (long)ctx->stats.flushing_pressure_page_deletions,
+ (long)global_flushing_pressure_page_deletions
+ );
+ return str;
+}
+
+/*
+ * Tells whether machine_guid names a host with its own ("legacy") dbengine directory.
+ *
+ * Returns 1 when machine_guid is one of the three built-in test names, or when it parses as a UUID
+ * and "<netdata_configured_cache_dir>/<machine_guid>/dbengine" exists and is a directory.
+ * Returns 0 otherwise.
+ */
+int is_legacy_child(const char *machine_guid)
+{
+    uuid_t uuid;
+    char dbengine_file[FILENAME_MAX+1];
+    int is_legacy = 0;
+
+    if (unlikely(!strcmp(machine_guid, "unittest-dbengine") || !strcmp(machine_guid, "dbengine-dataset") ||
+                 !strcmp(machine_guid, "dbengine-stress-test"))) {
+        return 1;
+    }
+    if (!uuid_parse(machine_guid, uuid)) {
+        uv_fs_t stat_req;
+
+        snprintfz(dbengine_file, FILENAME_MAX, "%s/%s/dbengine", netdata_configured_cache_dir, machine_guid);
+        int rc = uv_fs_stat(NULL, &stat_req, dbengine_file, NULL);
+        if (likely(rc == 0 && ((stat_req.statbuf.st_mode & S_IFMT) == S_IFDIR))) {
+            is_legacy = 1;
+        }
+        /* every finished libuv fs request must be cleaned up, also after a synchronous call */
+        uv_fs_req_cleanup(&stat_req);
+    }
+    return is_legacy;
+}
+
+/*
+ * Scans dbfiles_path and counts the entries reported as directories for which is_legacy_child()
+ * returns non-zero.
+ *
+ * Returns that count, or the negative libuv error code when uv_fs_scandir() fails.
+ */
+int count_legacy_children(char *dbfiles_path)
+{
+    uv_fs_t scan_req;
+    uv_dirent_t entry;
+    int found = 0;
+    int rc = uv_fs_scandir(NULL, &scan_req, dbfiles_path, 0, NULL);
+
+    if (rc < 0) {
+        uv_fs_req_cleanup(&scan_req);
+        error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(rc));
+        return rc;
+    }
+
+    for (;;) {
+        if (UV_EOF == uv_fs_scandir_next(&scan_req, &entry))
+            break;
+        if (entry.type != UV_DIRENT_DIR)
+            continue;
+        if (is_legacy_child(entry.name))
+            ++found;
+    }
+    uv_fs_req_cleanup(&scan_req);
+
+    return found;
+}
+
+/*
+ * Returns the disk quota, in MB, to use for the multihost dbengine instance.
+ *
+ * The value is read from "<netdata_configured_varlib_dir>/dbengine_multihost_size" when that file
+ * holds an integer that is at least RRDENG_MIN_DISK_SPACE_MB. Otherwise it is computed as
+ * (number of legacy children + 1) * default_rrdeng_disk_quota_mb and stored in that file.
+ * If the legacy children cannot be counted, default_rrdeng_disk_quota_mb is returned and nothing
+ * is stored.
+ */
+int compute_multidb_diskspace(void)
+{
+    char multidb_disk_space_file[FILENAME_MAX + 1];
+    FILE *fp;
+    int computed_multidb_disk_quota_mb = -1;
+
+    snprintfz(multidb_disk_space_file, FILENAME_MAX, "%s/dbengine_multihost_size", netdata_configured_varlib_dir);
+    fp = fopen(multidb_disk_space_file, "r");
+    if (likely(fp)) {
+        int rc = fscanf(fp, "%d", &computed_multidb_disk_quota_mb);
+        fclose(fp);
+        if (unlikely(rc != 1 || computed_multidb_disk_quota_mb < RRDENG_MIN_DISK_SPACE_MB)) {
+            errno = 0;
+            error("File '%s' contains invalid input, it will be rebuild", multidb_disk_space_file);
+            computed_multidb_disk_quota_mb = -1;
+        }
+    }
+
+    if (computed_multidb_disk_quota_mb != -1)
+        return computed_multidb_disk_quota_mb;
+
+    int legacy_children = count_legacy_children(netdata_configured_cache_dir);
+    if (unlikely(legacy_children < 0))
+        return default_rrdeng_disk_quota_mb;
+
+    computed_multidb_disk_quota_mb = (legacy_children + 1) * default_rrdeng_disk_quota_mb;
+    info("Found %d legacy dbengines, setting multidb diskspace to %dMB", legacy_children,
+         computed_multidb_disk_quota_mb);
+
+    fp = fopen(multidb_disk_space_file, "w");
+    if (likely(fp)) {
+        int stored = (fprintf(fp, "%d", computed_multidb_disk_quota_mb) > 0);
+
+        /* fclose() flushes the stream, so a failure here also means the value was not stored */
+        if (fclose(fp) != 0)
+            stored = 0;
+
+        if (likely(stored))
+            info("Created file '%s' to store the computed value", multidb_disk_space_file);
+        else
+            error("Failed to store the default multidb disk quota size on '%s'", multidb_disk_space_file);
+    } else {
+        error("Failed to store the default multidb disk quota size on '%s'", multidb_disk_space_file);
+    }
+
+    return computed_multidb_disk_quota_mb;
+}
diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h
new file mode 100644
index 0000000..ebab93c
--- /dev/null
+++ b/database/engine/rrdenginelib.h
@@ -0,0 +1,144 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDENGINELIB_H
+#define NETDATA_RRDENGINELIB_H
+
+/* Forward declarations */
+struct rrdeng_page_descr;
+struct rrdengine_instance;
+
+/* Two-step stringification: STR(x) macro-expands x first, then turns the result into a string literal. */
+#define STR_HELPER(x) #x
+#define STR(x) STR_HELPER(x)
+
+/* Number of bits in an unsigned long (counts 8 bits per byte). */
+#define BITS_PER_ULONG (sizeof(unsigned long) * 8)
+
+/* Fallback for build environments whose headers do not define UUID_STR_LEN. */
+#ifndef UUID_STR_LEN
+#define UUID_STR_LEN (37)
+#endif
+
+/* Taken from linux kernel */
+/* Compile-time check: a true condition produces a negative array size, which fails the build. */
+#define BUILD_BUG_ON(condition) ((void)sizeof(char[1 - 2*!!(condition)]))
+
+/* Round x down / up to a multiple of RRDENG_BLOCK_SIZE (defined outside this header). */
+#define ALIGN_BYTES_FLOOR(x) (((x) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE)
+#define ALIGN_BYTES_CEILING(x) ((((x) + RRDENG_BLOCK_SIZE - 1) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE)
+
+/* Convert microseconds to seconds, rounding to the nearest second; an exact half second rounds down. */
+#define ROUND_USEC_TO_SEC(x) (((x) + USEC_PER_SEC / 2 - 1) / USEC_PER_SEC)
+
+/* Type of a statistics counter: an unsigned, pointer-sized integer. */
+typedef uintptr_t rrdeng_stats_t;
+
+/*
+ * Atomic add helpers. fetch_add yields the value before the addition, add_fetch the value after it.
+ * The __atomic builtins with relaxed ordering are used when available, the __sync builtins otherwise.
+ */
+#ifdef __ATOMIC_RELAXED
+#define rrd_atomic_fetch_add(p, n) __atomic_fetch_add(p, n, __ATOMIC_RELAXED)
+#define rrd_atomic_add_fetch(p, n) __atomic_add_fetch(p, n, __ATOMIC_RELAXED)
+#else
+#define rrd_atomic_fetch_add(p, n) __sync_fetch_and_add(p, n)
+#define rrd_atomic_add_fetch(p, n) __sync_add_and_fetch(p, n)
+#endif
+
+/* Atomically adds n to the counter at p; evaluates to the counter's previous value. */
+#define rrd_stat_atomic_add(p, n) rrd_atomic_fetch_add(p, n)
+
+/* Returns the position (0 = LSB) of the lowest cleared bit of x, or -1 when every bit of x is set. */
+static inline int find_first_zero(unsigned x)
+{
+    const int inverted = (int)(~x);
+
+    /* ffs() counts from 1 and returns 0 when no bit is set, hence the -1 adjustment */
+    return ffs(inverted) - 1;
+}
+
+/*
+ * Returns 1 if bit pos of x is set, 0 otherwise. Starts from LSB.
+ * pos must be smaller than the bit width of unsigned.
+ */
+static inline uint8_t check_bit(unsigned x, size_t pos)
+{
+    /* unsigned shift: shifting a signed 1 into the sign bit would be undefined behaviour */
+    return !!(x & (1U << pos));
+}
+
+/*
+ * Clears (val == 0) or sets (val == 1) bit pos of *x. Starts from LSB.
+ * Any other val leaves *x untouched and logs an error.
+ */
+static inline void modify_bit(unsigned *x, unsigned pos, uint8_t val)
+{
+    if (0 == val) {
+        *x &= ~(1U << pos);
+    } else if (1 == val) {
+        *x |= 1U << pos;
+    } else {
+        error("modify_bit() called with invalid argument.");
+    }
+}
+
+/* NOTE(review): presumably the size of the buffers that hold engine file paths; confirm at the use sites */
+#define RRDENG_PATH_MAX (4096)
+
+/*
+ * Atomically stores newval into *ptr if *ptr currently equals oldval.
+ * Returns the value *ptr held before the operation; the swap took place iff that value equals oldval.
+ */
+static inline unsigned long ulong_compare_and_swap(volatile unsigned long *ptr,
+                                                   unsigned long oldval, unsigned long newval)
+{
+    unsigned long previous = __sync_val_compare_and_swap(ptr, oldval, newval);
+
+    return previous;
+}
+
+#ifndef O_DIRECT
+/* Workaround for OS X: where O_DIRECT is not defined, make it 0 so that OR-ing it into open flags changes nothing */
+#define O_DIRECT (0)
+#endif
+
+/* One-shot completion: a thread blocks in wait_for_completion() until another thread calls complete(). */
+struct completion {
+ uv_mutex_t mutex; /* protects completed and is used together with cond */
+ uv_cond_t cond; /* signalled by complete() */
+ volatile unsigned completed; /* 0 until complete() sets it to 1 */
+};
+
+/* Puts p in the "not completed" state and initializes its condition variable and mutex; any failure is fatal. */
+static inline void init_completion(struct completion *p)
+{
+ p->completed = 0;
+ fatal_assert(0 == uv_cond_init(&p->cond));
+ fatal_assert(0 == uv_mutex_init(&p->mutex));
+}
+
+/* Destroys the condition variable and mutex set up by init_completion(). */
+static inline void destroy_completion(struct completion *p)
+{
+ uv_cond_destroy(&p->cond);
+ uv_mutex_destroy(&p->mutex);
+}
+
+/* Blocks the calling thread until p->completed becomes non-zero. */
+static inline void wait_for_completion(struct completion *p)
+{
+ uv_mutex_lock(&p->mutex);
+ /* re-check the flag after every wake-up: uv_cond_wait() may return before complete() was called */
+ while (0 == p->completed) {
+ uv_cond_wait(&p->cond, &p->mutex);
+ }
+ fatal_assert(1 == p->completed);
+ uv_mutex_unlock(&p->mutex);
+}
+
+/*
+ * Marks p as completed and wakes every thread blocked in wait_for_completion().
+ *
+ * The broadcast is issued while the mutex is still held. Once the mutex is released, a waiter can
+ * observe completed == 1, return and destroy the completion, so touching p->cond after the unlock
+ * could operate on an already destroyed condition variable.
+ */
+static inline void complete(struct completion *p)
+{
+    uv_mutex_lock(&p->mutex);
+    p->completed = 1;
+    uv_cond_broadcast(&p->cond);
+    uv_mutex_unlock(&p->mutex);
+}
+
+/* Compares the 32-bit checksum stored at crcp with crc. Returns 0 when they are equal, 1 otherwise. */
+static inline int crc32cmp(void *crcp, uLong crc)
+{
+    /* NOTE(review): read through a uint32_t pointer, so crcp is assumed suitably aligned; confirm at call sites */
+    const uint32_t *stored = (const uint32_t *)crcp;
+
+    return *stored != crc;
+}
+
+/* Stores crc, converted to 32 bits, at crcp. */
+static inline void crc32set(void *crcp, uLong crc)
+{
+    /* NOTE(review): written through a uint32_t pointer, so crcp is assumed suitably aligned; confirm at call sites */
+    uint32_t *target = (uint32_t *)crcp;
+
+    *target = crc;
+}
+
+/* Helpers defined outside this header. */
+extern void print_page_cache_descr(struct rrdeng_page_descr *page_cache_descr);
+extern void print_page_descr(struct rrdeng_page_descr *descr);
+extern int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size);
+/* Opens path for I/O: direct=1 requests direct I/O, direct=0 buffered I/O. A return value < 0 means failure. */
+extern int open_file_for_io(char *path, int flags, uv_file *file, int direct);
+/* Convenience wrapper: open_file_for_io() with direct I/O requested. */
+static inline int open_file_direct_io(char *path, int flags, uv_file *file)
+{
+    const int want_direct = 1;
+
+    return open_file_for_io(path, flags, file, want_direct);
+}
+/* Convenience wrapper: open_file_for_io() with buffered (non-direct) I/O. */
+static inline int open_file_buffered_io(char *path, int flags, uv_file *file)
+{
+    const int want_direct = 0;
+
+    return open_file_for_io(path, flags, file, want_direct);
+}
+/* Writes a textual statistics report for ctx into str (at most size bytes) and returns str. */
+extern char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size);
+/* Returns the disk quota in MB for the multihost dbengine instance. */
+extern int compute_multidb_diskspace();
+/* Returns 1 if machine_guid identifies a host with a legacy dbengine directory, 0 otherwise. */
+extern int is_legacy_child(const char *machine_guid);
+
+#endif /* NETDATA_RRDENGINELIB_H */ \ No newline at end of file
diff --git a/database/engine/rrdenglocking.c b/database/engine/rrdenglocking.c
new file mode 100644
index 0000000..a23abf3
--- /dev/null
+++ b/database/engine/rrdenglocking.c
@@ -0,0 +1,241 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+/*
+ * Allocates a page cache descriptor, accounts for it in ctx->stats.page_cache_descriptors and
+ * returns it with no page attached, no flags, no list links, no references and no waiters.
+ * Failure to initialize its condition variable or mutex is fatal.
+ */
+struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx)
+{
+    struct page_cache_descr *pg_cache_descr = mallocz(sizeof(*pg_cache_descr));
+
+    rrd_stat_atomic_add(&ctx->stats.page_cache_descriptors, 1);
+
+    pg_cache_descr->page = NULL;
+    pg_cache_descr->flags = 0;
+    pg_cache_descr->next = NULL;
+    pg_cache_descr->prev = NULL;
+    pg_cache_descr->refcnt = 0;
+    pg_cache_descr->waiters = 0;
+
+    fatal_assert(0 == uv_cond_init(&pg_cache_descr->cond));
+    fatal_assert(0 == uv_mutex_init(&pg_cache_descr->mutex));
+
+    return pg_cache_descr;
+}
+
+/*
+ * Destroys the condition variable and mutex of pg_cache_descr, frees the descriptor and
+ * decrements ctx->stats.page_cache_descriptors.
+ */
+void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr)
+{
+ uv_cond_destroy(&pg_cache_descr->cond);
+ uv_mutex_destroy(&pg_cache_descr->mutex);
+ freez(pg_cache_descr);
+ /* adding -1 decrements the counter */
+ rrd_stat_atomic_add(&ctx->stats.page_cache_descriptors, -1);
+}
+
+/*
+ * Registers the caller as a user of descr's page cache descriptor and locks that descriptor's mutex.
+ * Also allocates the page cache descriptor if it is missing.
+ *
+ * descr->pg_cache_descr_state holds a user count in the bits from PG_CACHE_DESCR_SHIFT upwards and
+ * flag bits below it. This function changes it only through compare-and-swap and retries (spins)
+ * whenever the state changed underneath it.
+ */
+void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+ unsigned long old_state, old_users, new_state, ret_state;
+ struct page_cache_descr *pg_cache_descr = NULL;
+ uint8_t we_locked;
+
+ we_locked = 0;
+ while (1) { /* spin */
+ old_state = descr->pg_cache_descr_state;
+ old_users = old_state >> PG_CACHE_DESCR_SHIFT;
+
+ if (unlikely(we_locked)) {
+ /* we installed the new descriptor under PG_CACHE_DESCR_LOCKED: publish it as allocated with one user (us) */
+ fatal_assert(old_state & PG_CACHE_DESCR_LOCKED);
+ new_state = (1 << PG_CACHE_DESCR_SHIFT) | PG_CACHE_DESCR_ALLOCATED;
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ /* success */
+ break;
+ }
+ continue; /* spin */
+ }
+ if (old_state & PG_CACHE_DESCR_LOCKED) {
+ /* another context holds the state lock; wait for it to be released */
+ fatal_assert(0 == old_users);
+ continue; /* spin */
+ }
+ if (0 == old_state) {
+ /* no page cache descriptor has been allocated */
+
+ /* allocate at most once, even if the compare-and-swap below has to be retried */
+ if (NULL == pg_cache_descr) {
+ pg_cache_descr = rrdeng_create_pg_cache_descr(ctx);
+ }
+ new_state = PG_CACHE_DESCR_LOCKED;
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, 0, new_state);
+ if (0 == ret_state) {
+ we_locked = 1;
+ descr->pg_cache_descr = pg_cache_descr;
+ pg_cache_descr->descr = descr;
+ pg_cache_descr = NULL; /* make sure we don't free pg_cache_descr */
+ /* retry */
+ continue;
+ }
+ continue; /* spin */
+ }
+ /* page cache descriptor is already allocated */
+ if (unlikely(!(old_state & PG_CACHE_DESCR_ALLOCATED))) {
+ fatal("Invalid page cache descriptor locking state:%#lX", old_state);
+ }
+ /* add ourselves to the user count while keeping the flag bits unchanged */
+ new_state = (old_users + 1) << PG_CACHE_DESCR_SHIFT;
+ new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK;
+
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ /* success */
+ break;
+ }
+ /* spin */
+ }
+
+ /* a descriptor that was allocated above but never installed is released here */
+ if (pg_cache_descr) {
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+ }
+ pg_cache_descr = descr->pg_cache_descr;
+ uv_mutex_lock(&pg_cache_descr->mutex);
+}
+
+/*
+ * Unlocks the mutex of descr's page cache descriptor and drops the caller's user reference.
+ *
+ * If PG_CACHE_DESCR_DESTROY is set, the caller is the only registered user and the descriptor has
+ * neither flags nor references, the descriptor is freed here and the state is reset to 0.
+ */
+void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+ unsigned long old_state, new_state, ret_state, old_users;
+ struct page_cache_descr *pg_cache_descr, *delete_pg_cache_descr = NULL;
+ uint8_t we_locked;
+
+ uv_mutex_unlock(&descr->pg_cache_descr->mutex);
+
+ we_locked = 0;
+ while (1) { /* spin */
+ old_state = descr->pg_cache_descr_state;
+ old_users = old_state >> PG_CACHE_DESCR_SHIFT;
+
+ if (unlikely(we_locked)) {
+ /* we hold the state lock and have detached the descriptor: clear the state, then free it */
+ fatal_assert(0 == old_users);
+
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, 0);
+ if (old_state == ret_state) {
+ /* success */
+ rrdeng_destroy_pg_cache_descr(ctx, delete_pg_cache_descr);
+ return;
+ }
+ continue; /* spin */
+ }
+ if (old_state & PG_CACHE_DESCR_LOCKED) {
+ /* another context holds the state lock; wait for it to be released */
+ fatal_assert(0 == old_users);
+ continue; /* spin */
+ }
+ fatal_assert(old_state & PG_CACHE_DESCR_ALLOCATED);
+ pg_cache_descr = descr->pg_cache_descr;
+ /* caller is the only page cache descriptor user and there are no pending references on the page */
+ if ((old_state & PG_CACHE_DESCR_DESTROY) && (1 == old_users) &&
+ !pg_cache_descr->flags && !pg_cache_descr->refcnt) {
+ fatal_assert(!pg_cache_descr->waiters);
+
+ /* take the state lock (this also zeroes the user count) before detaching the descriptor */
+ new_state = PG_CACHE_DESCR_LOCKED;
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ we_locked = 1;
+ delete_pg_cache_descr = pg_cache_descr;
+ descr->pg_cache_descr = NULL;
+ /* retry */
+ continue;
+ }
+ continue; /* spin */
+ }
+ /* ordinary case: remove ourselves from the user count while keeping the flag bits unchanged */
+ fatal_assert(old_users > 0);
+ new_state = (old_users - 1) << PG_CACHE_DESCR_SHIFT;
+ new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK;
+
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ /* success */
+ break;
+ }
+ /* spin */
+ }
+}
+
+/*
+ * Tries to deallocate page cache descriptor. If it fails, it postpones deallocation by setting the
+ * PG_CACHE_DESCR_DESTROY flag which will be eventually cleared by a different context after doing
+ * the deallocation.
+ *
+ * Does nothing when no descriptor is allocated. When the descriptor has registered users, only the
+ * PG_CACHE_DESCR_DESTROY flag is planted (unless it is already set).
+ */
+void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+ unsigned long old_state, new_state, ret_state, old_users;
+ struct page_cache_descr *pg_cache_descr = NULL;
+ uint8_t just_locked, can_free, must_unlock;
+
+ just_locked = 0;
+ can_free = 0;
+ must_unlock = 0;
+ while (1) { /* spin */
+ old_state = descr->pg_cache_descr_state;
+ old_users = old_state >> PG_CACHE_DESCR_SHIFT;
+
+ if (unlikely(just_locked)) {
+ /* first pass after taking the state lock: decide whether the descriptor can be freed */
+ fatal_assert(0 == old_users);
+
+ must_unlock = 1;
+ just_locked = 0;
+ /* Try deallocate if there are no pending references on the page */
+ if (!pg_cache_descr->flags && !pg_cache_descr->refcnt) {
+ fatal_assert(!pg_cache_descr->waiters);
+
+ descr->pg_cache_descr = NULL;
+ can_free = 1;
+ /* success */
+ continue;
+ }
+ continue; /* spin */
+ }
+ if (unlikely(must_unlock)) {
+ /* release the state lock: clear the state entirely when freeing, otherwise leave the DESTROY flag behind */
+ fatal_assert(0 == old_users);
+
+ if (can_free) {
+ /* success */
+ new_state = 0;
+ } else {
+ new_state = old_state | PG_CACHE_DESCR_DESTROY;
+ new_state &= ~PG_CACHE_DESCR_LOCKED;
+ }
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ /* unlocked */
+ if (can_free)
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+ return;
+ }
+ continue; /* spin */
+ }
+ if (!(old_state & PG_CACHE_DESCR_ALLOCATED)) {
+ /* don't do anything */
+ return;
+ }
+ if (old_state & PG_CACHE_DESCR_LOCKED) {
+ /* another context holds the state lock; wait for it to be released */
+ fatal_assert(0 == old_users);
+ continue; /* spin */
+ }
+ /* caller is the only page cache descriptor user */
+ if (0 == old_users) {
+ new_state = old_state | PG_CACHE_DESCR_LOCKED;
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ just_locked = 1;
+ pg_cache_descr = descr->pg_cache_descr;
+ /* retry */
+ continue;
+ }
+ continue; /* spin */
+ }
+ if (old_state & PG_CACHE_DESCR_DESTROY) {
+ /* don't do anything */
+ return;
+ }
+ /* plant PG_CACHE_DESCR_DESTROY so that other contexts eventually free the page cache descriptor */
+ new_state = old_state | PG_CACHE_DESCR_DESTROY;
+
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ /* success */
+ return;
+ }
+ /* spin */
+ }
+} \ No newline at end of file
diff --git a/database/engine/rrdenglocking.h b/database/engine/rrdenglocking.h
new file mode 100644
index 0000000..127ddc9
--- /dev/null
+++ b/database/engine/rrdenglocking.h
@@ -0,0 +1,17 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDENGLOCKING_H
+#define NETDATA_RRDENGLOCKING_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct page_cache_descr;
+
+/* Allocates and initializes a page cache descriptor and increments ctx's page_cache_descriptors statistic. */
+extern struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx);
+/* Destroys the descriptor's condition variable and mutex, frees it and decrements the statistic. */
+extern void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr);
+/* Registers the caller as a user of descr's page cache descriptor (allocating it if missing) and locks its mutex. */
+extern void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+/* Unlocks the descriptor's mutex and drops the caller's user reference; may free a descriptor flagged for destruction. */
+extern void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+/* Frees an allocated descriptor if it is unused, otherwise flags it with PG_CACHE_DESCR_DESTROY for later deallocation. */
+extern void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+
+#endif /* NETDATA_RRDENGLOCKING_H */ \ No newline at end of file