summaryrefslogtreecommitdiffstats
path: root/database/engine
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine')
-rw-r--r--database/engine/Makefile.am11
-rw-r--r--database/engine/README.md302
-rw-r--r--database/engine/datafile.c460
-rw-r--r--database/engine/datafile.h67
-rw-r--r--database/engine/journalfile.c587
-rw-r--r--database/engine/journalfile.h49
-rw-r--r--database/engine/metadata_log/README.md0
-rw-r--r--database/engine/pagecache.c1313
-rw-r--r--database/engine/pagecache.h254
-rw-r--r--database/engine/rrddiskprotocol.h120
-rw-r--r--database/engine/rrdengine.c1508
-rw-r--r--database/engine/rrdengine.h283
-rwxr-xr-xdatabase/engine/rrdengineapi.c1272
-rw-r--r--database/engine/rrdengineapi.h144
-rw-r--r--database/engine/rrdenginelib.c311
-rw-r--r--database/engine/rrdenginelib.h102
-rw-r--r--database/engine/rrdenglocking.c241
-rw-r--r--database/engine/rrdenglocking.h17
18 files changed, 7041 insertions, 0 deletions
diff --git a/database/engine/Makefile.am b/database/engine/Makefile.am
new file mode 100644
index 0000000..59250a9
--- /dev/null
+++ b/database/engine/Makefile.am
@@ -0,0 +1,11 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+SUBDIRS = \
+ $(NULL)
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/database/engine/README.md b/database/engine/README.md
new file mode 100644
index 0000000..c67e400
--- /dev/null
+++ b/database/engine/README.md
@@ -0,0 +1,302 @@
+<!--
+title: "Database engine"
+description: "Netdata's highly-efficient database engine use both RAM and disk for distributed, long-term storage of per-second metrics."
+custom_edit_url: https://github.com/netdata/netdata/edit/master/database/engine/README.md
+-->
+
+# Database engine
+
+The Database Engine works like a traditional time series database. Unlike other [database modes](/database/README.md),
+the amount of historical metrics stored is based on the amount of disk space you allocate and the effective compression
+ratio, not a fixed number of metrics collected.
+
+## Tiering
+
+Tiering is a mechanism of providing multiple tiers of data with
+different [granularity on metrics](/docs/store/distributed-data-architecture.md#granularity-of-metrics).
+
+For Netdata Agents with version `netdata-1.35.0.138.nightly` and greater, `dbengine` supports Tiering, allowing almost
+unlimited retention of data.
+
+
+### Metric size
+
+Every Tier down samples the exact lower tier (lower tiers have greater resolution). You can have up to 5
+Tiers **[0. . 4]** of data (including the Tier 0, which has the highest resolution)
+
+Tier 0 is the default that was always available in `dbengine` mode. Tier 1 is the first level of aggregation, Tier 2 is
+the second, and so on.
+
+Metrics on all tiers except of the _Tier 0_ also store the following five additional values for every point for accurate
+representation:
+
+1. The `sum` of the points aggregated
+2. The `min` of the points aggregated
+3. The `max` of the points aggregated
+4. The `count` of the points aggregated (could be constant, but it may not be due to gaps in data collection)
+5. The `anomaly_count` of the points aggregated (how many of the aggregated points found anomalous)
+
+Among `min`, `max` and `sum`, the correct value is chosen based on the user query. `average` is calculated on the fly at
+query time.
+
+### Tiering in a nutshell
+
+The `dbengine` is capable of retaining metrics for years. To further understand the `dbengine` tiering mechanism let's
+explore the following configuration.
+
+```
+[db]
+ mode = dbengine
+
+ # per second data collection
+ update every = 1
+
+ # enables Tier 1 and Tier 2, Tier 0 is always enabled in dbengine mode
+ storage tiers = 3
+
+ # Tier 0, per second data for a week
+ dbengine multihost disk space MB = 1100
+
+ # Tier 1, per minute data for a month
+ dbengine tier 1 multihost disk space MB = 330
+
+ # Tier 2, per hour data for a year
+ dbengine tier 2 multihost disk space MB = 67
+```
+
+For 2000 metrics, collected every second and retained for a week, Tier 0 needs: 1 byte x 2000 metrics x 3600 secs per
+hour x 24 hours per day x 7 days per week = 1100MB.
+
+By setting `dbengine multihost disk space MB` to `1100`, this node will start maintaining about a week of data. But pay
+attention to the number of metrics. If you have more than 2000 metrics on a node, or you need more that a week of high
+resolution metrics, you may need to adjust this setting accordingly.
+
+Tier 1 is by default sampling the data every **60 points of Tier 0**. In our case, Tier 0 is per second, if we want to
+transform this information in terms of time then the Tier 1 "resolution" is per minute.
+
+Tier 1 needs four times more storage per point compared to Tier 0. So, for 2000 metrics, with per minute resolution,
+retained for a month, Tier 1 needs: 4 bytes x 2000 metrics x 60 minutes per hour x 24 hours per day x 30 days per month
+= 330MB.
+
+Tier 2 is by default sampling data every 3600 points of Tier 0 (60 of Tier 1, which is the previous exact Tier). Again
+in term of "time" (Tier 0 is per second), then Tier 2 is per hour.
+
+The storage requirements are the same to Tier 1.
+
+For 2000 metrics, with per hour resolution, retained for a year, Tier 2 needs: 4 bytes x 2000 metrics x 24 hours per day
+x 365 days per year = 67MB.
+
+## Legacy configuration
+
+### v1.35.1 and prior
+
+These versions of the Agent do not support [Tiering](#Tiering). You could change the metric retention for the parent and
+all of its children only with the `dbengine multihost disk space MB` setting. This setting accounts the space allocation
+for the parent node and all of its children.
+
+To configure the database engine, look for the `page cache size MB` and `dbengine multihost disk space MB` settings in
+the `[db]` section of your `netdata.conf`.
+
+```conf
+[db]
+ dbengine page cache size MB = 32
+ dbengine multihost disk space MB = 256
+```
+
+### v1.23.2 and prior
+
+_For Netdata Agents earlier than v1.23.2_, the Agent on the parent node uses one dbengine instance for itself, and
+another instance for every child node it receives metrics from. If you had four streaming nodes, you would have five
+instances in total (`1 parent + 4 child nodes = 5 instances`).
+
+The Agent allocates resources for each instance separately using the `dbengine disk space MB` (**deprecated**) setting.
+If
+`dbengine disk space MB`(**deprecated**) is set to the default `256`, each instance is given 256 MiB in disk space,
+which means the total disk space required to store all instances is,
+roughly, `256 MiB * 1 parent * 4 child nodes = 1280 MiB`.
+
+#### Backward compatibility
+
+All existing metrics belonging to child nodes are automatically converted to legacy dbengine instances and the localhost
+metrics are transferred to the multihost dbengine instance.
+
+All new child nodes are automatically transferred to the multihost dbengine instance and share its page cache and disk
+space. If you want to migrate a child node from its legacy dbengine instance to the multihost dbengine instance, you
+must delete the instance's directory, which is located in `/var/cache/netdata/MACHINE_GUID/dbengine`, after stopping the
+Agent.
+
+##### Information
+
+For more information about setting `[db].mode` on your nodes, in addition to other streaming configurations, see
+[streaming](/streaming/README.md).
+
+## Requirements & limitations
+
+### Memory
+
+Using database mode `dbengine` we can overcome most memory restrictions and store a dataset that is much larger than the
+available memory.
+
+There are explicit memory requirements **per** DB engine **instance**:
+
+- The total page cache memory footprint will be an additional `#dimensions-being-collected x 4096 x 2` bytes over what
+ the user configured with `dbengine page cache size MB`.
+
+
+- an additional `#pages-on-disk x 4096 x 0.03` bytes of RAM are allocated for metadata.
+
+ - roughly speaking this is 3% of the uncompressed disk space taken by the DB files.
+
+ - for very highly compressible data (compression ratio > 90%) this RAM overhead is comparable to the disk space
+ footprint.
+
+An important observation is that RAM usage depends on both the `page cache size` and the `dbengine multihost disk space`
+options.
+
+You can use
+our [database engine calculator](/docs/store/change-metrics-storage.md#calculate-the-system-resources-ram-disk-space-needed-to-store-metrics)
+to validate the memory requirements for your particular system(s) and configuration (**out-of-date**).
+
+### Disk space
+
+There are explicit disk space requirements **per** DB engine **instance**:
+
+- The total disk space footprint will be the maximum between `#dimensions-being-collected x 4096 x 2` bytes or what the
+ user configured with `dbengine multihost disk space` or `dbengine disk space`.
+
+### File descriptor
+
+The Database Engine may keep a **significant** amount of files open per instance (e.g. per streaming child or parent
+server). When configuring your system you should make sure there are at least 50 file descriptors available per
+`dbengine` instance.
+
+Netdata allocates 25% of the available file descriptors to its Database Engine instances. This means that only 25% of
+the file descriptors that are available to the Netdata service are accessible by dbengine instances. You should take
+that into account when configuring your service or system-wide file descriptor limits. You can roughly estimate that the
+Netdata service needs 2048 file descriptors for every 10 streaming child hosts when streaming is configured to use
+`[db].mode = dbengine`.
+
+If for example one wants to allocate 65536 file descriptors to the Netdata service on a systemd system one needs to
+override the Netdata service by running `sudo systemctl edit netdata` and creating a file with contents:
+
+```sh
+[Service]
+LimitNOFILE=65536
+```
+
+For other types of services one can add the line:
+
+```sh
+ulimit -n 65536
+```
+
+at the beginning of the service file. Alternatively you can change the system-wide limits of the kernel by changing
+`/etc/sysctl.conf`. For linux that would be:
+
+```conf
+fs.file-max = 65536
+```
+
+In FreeBSD and OS X you change the lines like this:
+
+```conf
+kern.maxfilesperproc=65536
+kern.maxfiles=65536
+```
+
+You can apply the settings by running `sysctl -p` or by rebooting.
+
+## Files
+
+With the DB engine mode the metric data are stored in database files. These files are organized in pairs, the datafiles
+and their corresponding journalfiles, e.g.:
+
+```sh
+datafile-1-0000000001.ndf
+journalfile-1-0000000001.njf
+datafile-1-0000000002.ndf
+journalfile-1-0000000002.njf
+datafile-1-0000000003.ndf
+journalfile-1-0000000003.njf
+...
+```
+
+They are located under their host's cache directory in the directory `./dbengine` (e.g. for localhost the default
+location is `/var/cache/netdata/dbengine/*`). The higher numbered filenames contain more recent metric data. The user
+can safely delete some pairs of files when Netdata is stopped to manually free up some space.
+
+_Users should_ **back up** _their `./dbengine` folders if they consider this data to be important._ You can also set up
+one or more [exporting connectors](/exporting/README.md) to send your Netdata metrics to other databases for long-term
+storage at lower granularity.
+
+## Operation
+
+The DB engine stores chart metric values in 4096-byte pages in memory. Each chart dimension gets its own page to store
+consecutive values generated from the data collectors. Those pages comprise the **Page Cache**.
+
+When those pages fill up, they are slowly compressed and flushed to disk. It can
+take `4096 / 4 = 1024 seconds = 17 minutes`, for a chart dimension that is being collected every 1 second, to fill a
+page. Pages can be cut short when we stop Netdata or the DB engine instance so as to not lose the data. When we query
+the DB engine for data we trigger disk read I/O requests that fill the Page Cache with the requested pages and
+potentially evict cold (not recently used)
+pages.
+
+When the disk quota is exceeded the oldest values are removed from the DB engine at real time, by automatically deleting
+the oldest datafile and journalfile pair. Any corresponding pages residing in the Page Cache will also be invalidated
+and removed. The DB engine logic will try to maintain between 10 and 20 file pairs at any point in time.
+
+The Database Engine uses direct I/O to avoid polluting the OS filesystem caches and does not generate excessive I/O
+traffic so as to create the minimum possible interference with other applications.
+
+## Evaluation
+
+We have evaluated the performance of the `dbengine` API that the netdata daemon uses internally. This is **not** the web
+API of netdata. Our benchmarks ran on a **single** `dbengine` instance, multiple of which can be running in a Netdata
+parent node. We used a server with an AMD Ryzen Threadripper 2950X 16-Core Processor and 2 disk drives, a Seagate
+Constellation ES.3 2TB magnetic HDD and a SAMSUNG MZQLB960HAJR-00007 960GB NAND Flash SSD.
+
+For our workload, we defined 32 charts with 128 metrics each, giving us a total of 4096 metrics. We defined 1 worker
+thread per chart (32 threads) that generates new data points with a data generation interval of 1 second. The time axis
+of the time-series is emulated and accelerated so that the worker threads can generate as many data points as possible
+without delays.
+
+We also defined 32 worker threads that perform queries on random metrics with semi-random time ranges. The starting time
+of the query is randomly selected between the beginning of the time-series and the time of the latest data point. The
+ending time is randomly selected between 1 second and 1 hour after the starting time. The pseudo-random numbers are
+generated with a uniform distribution.
+
+The data are written to the database at the same time as they are read from it. This is a concurrent read/write mixed
+workload with a duration of 60 seconds. The faster `dbengine` runs, the bigger the dataset size becomes since more data
+points will be generated. We set a page cache size of 64MiB for the two disk-bound scenarios. This way, the dataset size
+of the metric data is much bigger than the RAM that is being used for caching so as to trigger I/O requests most of the
+time. In our final scenario, we set the page cache size to 16 GiB. That way, the dataset fits in the page cache so as to
+avoid all disk bottlenecks.
+
+The reported numbers are the following:
+
+| device | page cache | dataset | reads/sec | writes/sec |
+|:------:|:----------:|--------:|----------:|-----------:|
+| HDD | 64 MiB | 4.1 GiB | 813K | 18.0M |
+| SSD | 64 MiB | 9.8 GiB | 1.7M | 43.0M |
+| N/A | 16 GiB | 6.8 GiB | 118.2M | 30.2M |
+
+where "reads/sec" is the number of metric data points being read from the database via its API per second and
+"writes/sec" is the number of metric data points being written to the database per second.
+
+Notice that the HDD numbers are pretty high and not much slower than the SSD numbers. This is thanks to the database
+engine design being optimized for rotating media. In the database engine disk I/O requests are:
+
+- asynchronous to mask the high I/O latency of HDDs.
+- mostly large to reduce the amount of HDD seeking time.
+- mostly sequential to reduce the amount of HDD seeking time.
+- compressed to reduce the amount of required throughput.
+
+As a result, the HDD is not thousands of times slower than the SSD, which is typical for other workloads.
+
+An interesting observation to make is that the CPU-bound run (16 GiB page cache) generates fewer data than the SSD run
+(6.8 GiB vs 9.8 GiB). The reason is that the 32 reader threads in the SSD scenario are more frequently blocked by I/O,
+and generate a read load of 1.7M/sec, whereas in the CPU-bound scenario the read load is 70 times higher at 118M/sec.
+Consequently, there is a significant degree of interference by the reader threads, that slow down the writer threads.
+This is also possible because the interference effects are greater than the SSD impact on data generation throughput.
+
+
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
new file mode 100644
index 0000000..9c70068
--- /dev/null
+++ b/database/engine/datafile.c
@@ -0,0 +1,460 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+void df_extent_insert(struct extent_info *extent)
+{
+ struct rrdengine_datafile *datafile = extent->datafile;
+
+ if (likely(NULL != datafile->extents.last)) {
+ datafile->extents.last->next = extent;
+ }
+ if (unlikely(NULL == datafile->extents.first)) {
+ datafile->extents.first = extent;
+ }
+ datafile->extents.last = extent;
+}
+
+void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+{
+ if (likely(NULL != ctx->datafiles.last)) {
+ ctx->datafiles.last->next = datafile;
+ }
+ if (unlikely(NULL == ctx->datafiles.first)) {
+ ctx->datafiles.first = datafile;
+ }
+ ctx->datafiles.last = datafile;
+}
+
+void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_datafile *next;
+
+ next = datafile->next;
+ fatal_assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile));
+ ctx->datafiles.first = next;
+}
+
+
+static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_instance *ctx,
+ unsigned tier, unsigned fileno)
+{
+ fatal_assert(tier == 1);
+ datafile->tier = tier;
+ datafile->fileno = fileno;
+ datafile->file = (uv_file)0;
+ datafile->pos = 0;
+ datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */
+ datafile->journalfile = NULL;
+ datafile->next = NULL;
+ datafile->ctx = ctx;
+}
+
+void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+{
+ (void) snprintfz(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION,
+ datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
+}
+
+int close_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_close(NULL, &req, datafile->file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ return ret;
+}
+
+int unlink_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ ++ctx->stats.datafile_deletions;
+
+ return ret;
+}
+
+int destroy_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL);
+ if (ret < 0) {
+ error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ ret = uv_fs_close(NULL, &req, datafile->file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ ++ctx->stats.datafile_deletions;
+
+ return ret;
+}
+
+int create_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd;
+ struct rrdeng_df_sb *superblock;
+ uv_buf_t iov;
+ char path[RRDENG_PATH_MAX];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+ fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
+ if (fd < 0) {
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return fd;
+ }
+ datafile->file = file;
+ ++ctx->stats.datafile_creations;
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ memset(superblock, 0, sizeof(*superblock));
+ (void) strncpy(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ);
+ (void) strncpy(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ);
+ superblock->tier = 1;
+
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ fatal_assert(req.result < 0);
+ error("uv_fs_write: %s", uv_strerror(ret));
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+ posix_memfree(superblock);
+ if (ret < 0) {
+ destroy_data_file(datafile);
+ return ret;
+ }
+
+ datafile->pos = sizeof(*superblock);
+ ctx->stats.io_write_bytes += sizeof(*superblock);
+ ++ctx->stats.io_write_requests;
+
+ return 0;
+}
+
+static int check_data_file_superblock(uv_file file)
+{
+ int ret;
+ struct rrdeng_df_sb *superblock;
+ uv_buf_t iov;
+ uv_fs_t req;
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ error("uv_fs_read: %s", uv_strerror(ret));
+ uv_fs_req_cleanup(&req);
+ goto error;
+ }
+ fatal_assert(req.result >= 0);
+ uv_fs_req_cleanup(&req);
+
+ if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) ||
+ strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) ||
+ superblock->tier != 1) {
+ error("File has invalid superblock.");
+ ret = UV_EINVAL;
+ } else {
+ ret = 0;
+ }
+ error:
+ posix_memfree(superblock);
+ return ret;
+}
+
+static int load_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd, error;
+ uint64_t file_size;
+ char path[RRDENG_PATH_MAX];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+ fd = open_file_direct_io(path, O_RDWR, &file);
+ if (fd < 0) {
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return fd;
+ }
+ info("Initializing data file \"%s\".", path);
+
+ ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
+ if (ret)
+ goto error;
+ file_size = ALIGN_BYTES_CEILING(file_size);
+
+ ret = check_data_file_superblock(file);
+ if (ret)
+ goto error;
+ ctx->stats.io_read_bytes += sizeof(struct rrdeng_df_sb);
+ ++ctx->stats.io_read_requests;
+
+ datafile->file = file;
+ datafile->pos = file_size;
+
+ info("Data file \"%s\" initialized (size:%"PRIu64").", path, file_size);
+ return 0;
+
+ error:
+ error = ret;
+ ret = uv_fs_close(NULL, &req, file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+ return error;
+}
+
+static int scan_data_files_cmp(const void *a, const void *b)
+{
+ struct rrdengine_datafile *file1, *file2;
+ char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX];
+
+ file1 = *(struct rrdengine_datafile **)a;
+ file2 = *(struct rrdengine_datafile **)b;
+ generate_datafilepath(file1, path1, sizeof(path1));
+ generate_datafilepath(file2, path2, sizeof(path2));
+ return strcmp(path1, path2);
+}
+
+/* Returns number of datafiles that were loaded or < 0 on error */
+static int scan_data_files(struct rrdengine_instance *ctx)
+{
+ int ret;
+ unsigned tier, no, matched_files, i,failed_to_load;
+ static uv_fs_t req;
+ uv_dirent_t dent;
+ struct rrdengine_datafile **datafiles, *datafile;
+ struct rrdengine_journalfile *journalfile;
+
+ ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL);
+ if (ret < 0) {
+ fatal_assert(req.result < 0);
+ uv_fs_req_cleanup(&req);
+ error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return ret;
+ }
+ info("Found %d files in path %s", ret, ctx->dbfiles_path);
+
+ datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
+ for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
+ info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name);
+ ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
+ if (2 == ret) {
+ info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name);
+ datafile = mallocz(sizeof(*datafile));
+ datafile_init(datafile, ctx, tier, no);
+ datafiles[matched_files++] = datafile;
+ }
+ }
+ uv_fs_req_cleanup(&req);
+
+ if (0 == matched_files) {
+ freez(datafiles);
+ return 0;
+ }
+ if (matched_files == MAX_DATAFILES) {
+ error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
+ }
+ qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
+ /* TODO: change this when tiering is implemented */
+ ctx->last_fileno = datafiles[matched_files - 1]->fileno;
+
+ for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
+ uint8_t must_delete_pair = 0;
+
+ datafile = datafiles[i];
+ ret = load_data_file(datafile);
+ if (0 != ret) {
+ must_delete_pair = 1;
+ }
+ journalfile = mallocz(sizeof(*journalfile));
+ datafile->journalfile = journalfile;
+ journalfile_init(journalfile, datafile);
+ ret = load_journal_file(ctx, journalfile, datafile);
+ if (0 != ret) {
+ if (!must_delete_pair) /* If datafile is still open close it */
+ close_data_file(datafile);
+ must_delete_pair = 1;
+ }
+ if (must_delete_pair) {
+ char path[RRDENG_PATH_MAX];
+
+ error("Deleting invalid data and journal file pair.");
+ ret = unlink_journal_file(journalfile);
+ if (!ret) {
+ generate_journalfilepath(datafile, path, sizeof(path));
+ info("Deleted journal file \"%s\".", path);
+ }
+ ret = unlink_data_file(datafile);
+ if (!ret) {
+ generate_datafilepath(datafile, path, sizeof(path));
+ info("Deleted data file \"%s\".", path);
+ }
+ freez(journalfile);
+ freez(datafile);
+ ++failed_to_load;
+ continue;
+ }
+
+ datafile_list_insert(ctx, datafile);
+ ctx->disk_space += datafile->pos + journalfile->pos;
+ }
+ matched_files -= failed_to_load;
+ freez(datafiles);
+
+ return matched_files;
+}
+
+/* Creates a datafile and a journalfile pair */
+int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
+{
+ struct rrdengine_datafile *datafile;
+ struct rrdengine_journalfile *journalfile;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ info("Creating new data and journal files in path %s", ctx->dbfiles_path);
+ datafile = mallocz(sizeof(*datafile));
+ datafile_init(datafile, ctx, tier, fileno);
+ ret = create_data_file(datafile);
+ if (!ret) {
+ generate_datafilepath(datafile, path, sizeof(path));
+ info("Created data file \"%s\".", path);
+ } else {
+ goto error_after_datafile;
+ }
+
+ journalfile = mallocz(sizeof(*journalfile));
+ datafile->journalfile = journalfile;
+ journalfile_init(journalfile, datafile);
+ ret = create_journal_file(journalfile, datafile);
+ if (!ret) {
+ generate_journalfilepath(datafile, path, sizeof(path));
+ info("Created journal file \"%s\".", path);
+ } else {
+ goto error_after_journalfile;
+ }
+ datafile_list_insert(ctx, datafile);
+ ctx->disk_space += datafile->pos + journalfile->pos;
+
+ return 0;
+
+error_after_journalfile:
+ destroy_data_file(datafile);
+ freez(journalfile);
+error_after_datafile:
+ freez(datafile);
+ return ret;
+}
+
+/* Page cache must already be initialized.
+ * Return 0 on success.
+ */
+int init_data_files(struct rrdengine_instance *ctx)
+{
+ int ret;
+
+ ret = scan_data_files(ctx);
+ if (ret < 0) {
+ error("Failed to scan path \"%s\".", ctx->dbfiles_path);
+ return ret;
+ } else if (0 == ret) {
+ info("Data files not found, creating in path \"%s\".", ctx->dbfiles_path);
+ ret = create_new_datafile_pair(ctx, 1, 1);
+ if (ret) {
+ error("Failed to create data and journal files in path \"%s\".", ctx->dbfiles_path);
+ return ret;
+ }
+ ctx->last_fileno = 1;
+ }
+
+ return 0;
+}
+
+void finalize_data_files(struct rrdengine_instance *ctx)
+{
+ struct rrdengine_datafile *datafile, *next_datafile;
+ struct rrdengine_journalfile *journalfile;
+ struct extent_info *extent, *next_extent;
+
+ for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) {
+ journalfile = datafile->journalfile;
+ next_datafile = datafile->next;
+
+ for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) {
+ next_extent = extent->next;
+ freez(extent);
+ }
+ close_journal_file(journalfile, datafile);
+ close_data_file(datafile);
+ freez(journalfile);
+ freez(datafile);
+ }
+}
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
new file mode 100644
index 0000000..1cf256a
--- /dev/null
+++ b/database/engine/datafile.h
@@ -0,0 +1,67 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_DATAFILE_H
+#define NETDATA_DATAFILE_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct rrdengine_datafile;
+struct rrdengine_journalfile;
+struct rrdengine_instance;
+
+#define DATAFILE_PREFIX "datafile-"
+#define DATAFILE_EXTENSION ".ndf"
+
+#define MAX_DATAFILE_SIZE (1073741824LU)
+#define MIN_DATAFILE_SIZE (4194304LU)
+#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */
+#define TARGET_DATAFILES (20)
+
+#define DATAFILE_IDEAL_IO_SIZE (1048576U)
+
+struct extent_info {
+ uint64_t offset;
+ uint32_t size;
+ uint8_t number_of_pages;
+ struct rrdengine_datafile *datafile;
+ struct extent_info *next;
+ struct rrdeng_page_descr *pages[];
+};
+
+struct rrdengine_df_extents {
+ /* the extent list is sorted based on disk offset */
+ struct extent_info *first;
+ struct extent_info *last;
+};
+
+/* only one event loop is supported for now */
+struct rrdengine_datafile {
+ unsigned tier;
+ unsigned fileno;
+ uv_file file;
+ uint64_t pos;
+ struct rrdengine_instance *ctx;
+ struct rrdengine_df_extents extents;
+ struct rrdengine_journalfile *journalfile;
+ struct rrdengine_datafile *next;
+};
+
+struct rrdengine_datafile_list {
+ struct rrdengine_datafile *first; /* oldest */
+ struct rrdengine_datafile *last; /* newest */
+};
+
+void df_extent_insert(struct extent_info *extent);
+void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
+int close_data_file(struct rrdengine_datafile *datafile);
+int unlink_data_file(struct rrdengine_datafile *datafile);
+int destroy_data_file(struct rrdengine_datafile *datafile);
+int create_data_file(struct rrdengine_datafile *datafile);
+int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
+int init_data_files(struct rrdengine_instance *ctx);
+void finalize_data_files(struct rrdengine_instance *ctx);
+
+#endif /* NETDATA_DATAFILE_H */ \ No newline at end of file
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
new file mode 100644
index 0000000..500dd78
--- /dev/null
+++ b/database/engine/journalfile.c
@@ -0,0 +1,587 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+static void flush_transaction_buffer_cb(uv_fs_t* req)
+{
+ struct generic_io_descriptor *io_descr = req->data;
+ struct rrdengine_worker_config* wc = req->loop->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+
+ debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
+ if (req->result < 0) {
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
+ error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
+ } else {
+ debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
+ }
+
+ uv_fs_req_cleanup(req);
+ posix_memfree(io_descr->buf);
+ freez(io_descr);
+}
+
+/* Careful to always call this before creating a new journal file */
+void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ int ret;
+ struct generic_io_descriptor *io_descr;
+ unsigned pos, size;
+ struct rrdengine_journalfile *journalfile;
+
+ if (unlikely(NULL == ctx->commit_log.buf || 0 == ctx->commit_log.buf_pos)) {
+ return;
+ }
+ /* care with outstanding transactions when switching journal files */
+ journalfile = ctx->datafiles.last->journalfile;
+
+ io_descr = mallocz(sizeof(*io_descr));
+ pos = ctx->commit_log.buf_pos;
+ size = ctx->commit_log.buf_size;
+ if (pos < size) {
+ /* simulate an empty transaction to skip the rest of the block */
+ *(uint8_t *) (ctx->commit_log.buf + pos) = STORE_PADDING;
+ }
+ io_descr->buf = ctx->commit_log.buf;
+ io_descr->bytes = size;
+ io_descr->pos = journalfile->pos;
+ io_descr->req.data = io_descr;
+ io_descr->completion = NULL;
+
+ io_descr->iov = uv_buf_init((void *)io_descr->buf, size);
+ ret = uv_fs_write(wc->loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
+ journalfile->pos, flush_transaction_buffer_cb);
+ fatal_assert(-1 != ret);
+ journalfile->pos += RRDENG_BLOCK_SIZE;
+ ctx->disk_space += RRDENG_BLOCK_SIZE;
+ ctx->commit_log.buf = NULL;
+ ctx->stats.io_write_bytes += RRDENG_BLOCK_SIZE;
+ ++ctx->stats.io_write_requests;
+}
+
+void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ int ret;
+ unsigned buf_pos = 0, buf_size;
+
+ fatal_assert(size);
+ if (ctx->commit_log.buf) {
+ unsigned remaining;
+
+ buf_pos = ctx->commit_log.buf_pos;
+ buf_size = ctx->commit_log.buf_size;
+ remaining = buf_size - buf_pos;
+ if (size > remaining) {
+ /* we need a new buffer */
+ wal_flush_transaction_buffer(wc);
+ }
+ }
+ if (NULL == ctx->commit_log.buf) {
+ buf_size = ALIGN_BYTES_CEILING(size);
+ ret = posix_memalign((void *)&ctx->commit_log.buf, RRDFILE_ALIGNMENT, buf_size);
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ memset(ctx->commit_log.buf, 0, buf_size);
+ buf_pos = ctx->commit_log.buf_pos = 0;
+ ctx->commit_log.buf_size = buf_size;
+ }
+ ctx->commit_log.buf_pos += size;
+
+ return ctx->commit_log.buf + buf_pos;
+}
+
+void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+{
+ (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
+ datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
+}
+
+void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ journalfile->file = (uv_file)0;
+ journalfile->pos = 0;
+ journalfile->datafile = datafile;
+}
+
+int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ return ret;
+}
+
+int unlink_journal_file(struct rrdengine_journalfile *journalfile)
+{
+ struct rrdengine_datafile *datafile = journalfile->datafile;
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ ++ctx->stats.journalfile_deletions;
+
+ return ret;
+}
+
+int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
+ if (ret < 0) {
+ error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ ++ctx->stats.journalfile_deletions;
+
+ return ret;
+}
+
+int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd;
+ struct rrdeng_jf_sb *superblock;
+ uv_buf_t iov;
+ char path[RRDENG_PATH_MAX];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+ fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
+ if (fd < 0) {
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return fd;
+ }
+ journalfile->file = file;
+ ++ctx->stats.journalfile_creations;
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ memset(superblock, 0, sizeof(*superblock));
+ (void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ);
+ (void) strncpy(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ);
+
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ fatal_assert(req.result < 0);
+ error("uv_fs_write: %s", uv_strerror(ret));
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+ posix_memfree(superblock);
+ if (ret < 0) {
+ destroy_journal_file(journalfile, datafile);
+ return ret;
+ }
+
+ journalfile->pos = sizeof(*superblock);
+ ctx->stats.io_write_bytes += sizeof(*superblock);
+ ++ctx->stats.io_write_requests;
+
+ return 0;
+}
+
+static int check_journal_file_superblock(uv_file file)
+{
+ int ret;
+ struct rrdeng_jf_sb *superblock;
+ uv_buf_t iov;
+ uv_fs_t req;
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ error("uv_fs_read: %s", uv_strerror(ret));
+ uv_fs_req_cleanup(&req);
+ goto error;
+ }
+ fatal_assert(req.result >= 0);
+ uv_fs_req_cleanup(&req);
+
+ if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) ||
+ strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) {
+ error("File has invalid superblock.");
+ ret = UV_EINVAL;
+ } else {
+ ret = 0;
+ }
+ error:
+ posix_memfree(superblock);
+ return ret;
+}
+
+static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ void *buf, unsigned max_size)
+{
+ static BITMAP256 page_error_map;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned i, count, payload_length, descr_size, valid_pages;
+ struct rrdeng_page_descr *descr;
+ struct extent_info *extent;
+ /* persistent structures */
+ struct rrdeng_jf_store_data *jf_metric_data;
+
+ jf_metric_data = buf;
+ count = jf_metric_data->number_of_pages;
+ descr_size = sizeof(*jf_metric_data->descr) * count;
+ payload_length = sizeof(*jf_metric_data) + descr_size;
+ if (payload_length > max_size) {
+ error("Corrupted transaction payload.");
+ return;
+ }
+
+ extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0]));
+ extent->offset = jf_metric_data->extent_offset;
+ extent->size = jf_metric_data->extent_size;
+ extent->datafile = journalfile->datafile;
+ extent->next = NULL;
+
+ for (i = 0, valid_pages = 0 ; i < count ; ++i) {
+ uuid_t *temp_id;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index = NULL;
+ uint8_t page_type = jf_metric_data->descr[i].type;
+
+ if (page_type > PAGE_TYPE_MAX) {
+ if (!bitmap256_get_bit(&page_error_map, page_type)) {
+ error("Unknown page type %d encountered.", page_type);
+ bitmap256_set_bit(&page_error_map, page_type, 1);
+ }
+ continue;
+ }
+ uint64_t start_time_ut = jf_metric_data->descr[i].start_time_ut;
+ uint64_t end_time_ut = jf_metric_data->descr[i].end_time_ut;
+ size_t entries = jf_metric_data->descr[i].page_length / page_type_size[page_type];
+ time_t update_every_s = (entries > 1) ? ((end_time_ut - start_time_ut) / USEC_PER_SEC / (entries - 1)) : 0;
+
+ if (unlikely(start_time_ut > end_time_ut)) {
+ ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter++;
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut < end_time_ut)
+ ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut = end_time_ut;
+ continue;
+ }
+
+ if (unlikely(start_time_ut == end_time_ut && entries != 1)) {
+ ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter++;
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut < end_time_ut)
+ ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut = end_time_ut;
+ continue;
+ }
+
+ if (unlikely(!entries)) {
+ ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter++;
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut < end_time_ut)
+ ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut = end_time_ut;
+ continue;
+ }
+
+ if(entries > 1 && update_every_s == 0) {
+ ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter++;
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut < end_time_ut)
+ ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut = end_time_ut;
+ continue;
+ }
+
+ if(start_time_ut + update_every_s * USEC_PER_SEC * (entries - 1) != end_time_ut) {
+ ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter++;
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut < end_time_ut)
+ ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut = end_time_ut;
+
+ // let this be
+ // end_time_ut = start_time_ut + update_every_s * USEC_PER_SEC * (entries - 1);
+ }
+
+ temp_id = (uuid_t *)jf_metric_data->descr[i].uuid;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ /* First time we see the UUID */
+ uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
+ fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
+ *PValue = page_index = create_page_index(temp_id, ctx);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
+ uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
+ }
+
+ descr = pg_cache_create_descr();
+ descr->page_length = jf_metric_data->descr[i].page_length;
+ descr->start_time_ut = start_time_ut;
+ descr->end_time_ut = end_time_ut;
+ descr->update_every_s = (update_every_s > 0) ? (uint32_t)update_every_s : (page_index->latest_update_every_s);
+ descr->id = &page_index->id;
+ descr->extent = extent;
+ descr->type = page_type;
+ extent->pages[valid_pages++] = descr;
+ pg_cache_insert(ctx, page_index, descr);
+
+ if(page_index->latest_time_ut == descr->end_time_ut)
+ page_index->latest_update_every_s = descr->update_every_s;
+
+ if(descr->update_every_s == 0)
+ fatal(
+ "DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu",
+ (unsigned long long)end_time_ut, (unsigned long long)start_time_ut, entries);
+ }
+
+ extent->number_of_pages = valid_pages;
+
+ if (likely(valid_pages))
+ df_extent_insert(extent);
+ else {
+ freez(extent);
+ ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter++;
+ }
+}
+
+/*
+ * Replays transaction by interpreting up to max_size bytes from buf.
+ * Sets id to the current transaction id or to 0 if unknown.
+ * Returns size of transaction record or 0 for unknown size.
+ */
+static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ void *buf, uint64_t *id, unsigned max_size)
+{
+ unsigned payload_length, size_bytes;
+ int ret;
+ /* persistent structures */
+ struct rrdeng_jf_transaction_header *jf_header;
+ struct rrdeng_jf_transaction_trailer *jf_trailer;
+ uLong crc;
+
+ *id = 0;
+ jf_header = buf;
+ if (STORE_PADDING == jf_header->type) {
+ debug(D_RRDENGINE, "Skipping padding.");
+ return 0;
+ }
+ if (sizeof(*jf_header) > max_size) {
+ error("Corrupted transaction record, skipping.");
+ return 0;
+ }
+ *id = jf_header->id;
+ payload_length = jf_header->payload_length;
+ size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
+ if (size_bytes > max_size) {
+ error("Corrupted transaction record, skipping.");
+ return 0;
+ }
+ jf_trailer = buf + sizeof(*jf_header) + payload_length;
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, buf, sizeof(*jf_header) + payload_length);
+ ret = crc32cmp(jf_trailer->checksum, crc);
+ debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED");
+ if (unlikely(ret)) {
+ error("Transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id);
+ return size_bytes;
+ }
+ switch (jf_header->type) {
+ case STORE_DATA:
+ debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
+ restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
+ break;
+ default:
+ error("Unknown transaction type. Skipping record.");
+ break;
+ }
+
+ return size_bytes;
+}
+
+
+#define READAHEAD_BYTES (RRDENG_BLOCK_SIZE * 256)
+/*
+ * Iterates journal file transactions and populates the page cache.
+ * Page cache must already be initialized.
+ * Returns the maximum transaction id it discovered.
+ */
+static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile)
+{
+ uv_file file;
+ uint64_t file_size;//, data_file_size;
+ int ret;
+ uint64_t pos, pos_i, max_id, id;
+ unsigned size_bytes;
+ void *buf;
+ uv_buf_t iov;
+ uv_fs_t req;
+
+ file = journalfile->file;
+ file_size = journalfile->pos;
+ //data_file_size = journalfile->datafile->pos; TODO: utilize this?
+
+ max_id = 1;
+ bool journal_is_mmapped = (journalfile->data != NULL);
+ if (unlikely(!journal_is_mmapped)) {
+ ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES);
+ if (unlikely(ret))
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ else
+ buf = journalfile->data + sizeof(struct rrdeng_jf_sb);
+ for (pos = sizeof(struct rrdeng_jf_sb) ; pos < file_size ; pos += READAHEAD_BYTES) {
+ size_bytes = MIN(READAHEAD_BYTES, file_size - pos);
+ if (unlikely(!journal_is_mmapped)) {
+ iov = uv_buf_init(buf, size_bytes);
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
+ if (ret < 0) {
+ error("uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret));
+ uv_fs_req_cleanup(&req);
+ goto skip_file;
+ }
+ fatal_assert(req.result >= 0);
+ uv_fs_req_cleanup(&req);
+ ++ctx->stats.io_read_requests;
+ ctx->stats.io_read_bytes += size_bytes;
+ }
+
+ for (pos_i = 0 ; pos_i < size_bytes ; ) {
+ unsigned max_size;
+
+ max_size = pos + size_bytes - pos_i;
+ ret = replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size);
+ if (!ret) /* TODO: support transactions bigger than 4K */
+ /* unknown transaction size, move on to the next block */
+ pos_i = ALIGN_BYTES_FLOOR(pos_i + RRDENG_BLOCK_SIZE);
+ else
+ pos_i += ret;
+ max_id = MAX(max_id, id);
+ }
+ if (likely(journal_is_mmapped))
+ buf += size_bytes;
+ }
+skip_file:
+ if (unlikely(!journal_is_mmapped))
+ posix_memfree(buf);
+ return max_id;
+}
+
+int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ struct rrdengine_datafile *datafile)
+{
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd, error;
+ uint64_t file_size, max_id;
+ char path[RRDENG_PATH_MAX];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+ fd = open_file_direct_io(path, O_RDWR, &file);
+ if (fd < 0) {
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return fd;
+ }
+ info("Loading journal file \"%s\".", path);
+
+ ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
+ if (ret)
+ goto error;
+ file_size = ALIGN_BYTES_FLOOR(file_size);
+
+ ret = check_journal_file_superblock(file);
+ if (ret)
+ goto error;
+ ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb);
+ ++ctx->stats.io_read_requests;
+
+ journalfile->file = file;
+ journalfile->pos = file_size;
+ journalfile->data = netdata_mmap(path, file_size, MAP_SHARED, 0);
+ info("Loading journal file \"%s\" using %s.", path, journalfile->data?"MMAP":"uv_fs_read");
+
+ max_id = iterate_transactions(ctx, journalfile);
+
+ ctx->commit_log.transaction_id = MAX(ctx->commit_log.transaction_id, max_id + 1);
+
+ info("Journal file \"%s\" loaded (size:%"PRIu64").", path, file_size);
+ if (likely(journalfile->data))
+ netdata_munmap(journalfile->data, file_size);
+ return 0;
+
+ error:
+ error = ret;
+ ret = uv_fs_close(NULL, &req, file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+ return error;
+}
+
+void init_commit_log(struct rrdengine_instance *ctx)
+{
+ ctx->commit_log.buf = NULL;
+ ctx->commit_log.buf_pos = 0;
+ ctx->commit_log.transaction_id = 1;
+}
diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h
new file mode 100644
index 0000000..011c506
--- /dev/null
+++ b/database/engine/journalfile.h
@@ -0,0 +1,49 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_JOURNALFILE_H
+#define NETDATA_JOURNALFILE_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct rrdengine_instance;
+struct rrdengine_worker_config;
+struct rrdengine_datafile;
+struct rrdengine_journalfile;
+
+#define WALFILE_PREFIX "journalfile-"
+#define WALFILE_EXTENSION ".njf"
+
+
+/* only one event loop is supported for now */
+struct rrdengine_journalfile {
+ uv_file file;
+ uint64_t pos;
+ void *data;
+ struct rrdengine_datafile *datafile;
+};
+
+/* only one event loop is supported for now */
+struct transaction_commit_log {
+ uint64_t transaction_id;
+
+ /* outstanding transaction buffer */
+ void *buf;
+ unsigned buf_pos;
+ unsigned buf_size;
+};
+
+void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
+void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size);
+void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc);
+int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+int unlink_journal_file(struct rrdengine_journalfile *journalfile);
+int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ struct rrdengine_datafile *datafile);
+void init_commit_log(struct rrdengine_instance *ctx);
+
+
+#endif /* NETDATA_JOURNALFILE_H */ \ No newline at end of file
diff --git a/database/engine/metadata_log/README.md b/database/engine/metadata_log/README.md
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/database/engine/metadata_log/README.md
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
new file mode 100644
index 0000000..4f5da70
--- /dev/null
+++ b/database/engine/pagecache.c
@@ -0,0 +1,1313 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "rrdengine.h"
+
+ARAL page_descr_aral = {
+ .requested_element_size = sizeof(struct rrdeng_page_descr),
+ .initial_elements = 20000,
+ .filename = "page_descriptors",
+ .cache_dir = &netdata_configured_cache_dir,
+ .use_mmap = false,
+ .internal.initialized = false
+};
+
+void rrdeng_page_descr_aral_go_singlethreaded(void) {
+ page_descr_aral.internal.lockless = true;
+}
+void rrdeng_page_descr_aral_go_multithreaded(void) {
+ page_descr_aral.internal.lockless = false;
+}
+
+struct rrdeng_page_descr *rrdeng_page_descr_mallocz(void) {
+ struct rrdeng_page_descr *descr;
+ descr = arrayalloc_mallocz(&page_descr_aral);
+ return descr;
+}
+
+void rrdeng_page_descr_freez(struct rrdeng_page_descr *descr) {
+ arrayalloc_freez(&page_descr_aral, descr);
+}
+
+void rrdeng_page_descr_use_malloc(void) {
+ if(page_descr_aral.internal.initialized)
+ error("DBENGINE: cannot change ARAL allocation policy after it has been initialized.");
+ else
+ page_descr_aral.use_mmap = false;
+}
+
+void rrdeng_page_descr_use_mmap(void) {
+ if(page_descr_aral.internal.initialized)
+ error("DBENGINE: cannot change ARAL allocation policy after it has been initialized.");
+ else
+ page_descr_aral.use_mmap = true;
+}
+
+bool rrdeng_page_descr_is_mmap(void) {
+ return page_descr_aral.use_mmap;
+}
+
+/* Forward declarations */
+static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx);
+
+/* always inserts into tail */
+static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx,
+ struct rrdeng_page_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ if (likely(NULL != pg_cache->replaceQ.tail)) {
+ pg_cache_descr->prev = pg_cache->replaceQ.tail;
+ pg_cache->replaceQ.tail->next = pg_cache_descr;
+ }
+ if (unlikely(NULL == pg_cache->replaceQ.head)) {
+ pg_cache->replaceQ.head = pg_cache_descr;
+ }
+ pg_cache->replaceQ.tail = pg_cache_descr;
+}
+
+static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx,
+ struct rrdeng_page_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr, *prev, *next;
+
+ prev = pg_cache_descr->prev;
+ next = pg_cache_descr->next;
+
+ if (likely(NULL != prev)) {
+ prev->next = next;
+ }
+ if (likely(NULL != next)) {
+ next->prev = prev;
+ }
+ if (unlikely(pg_cache_descr == pg_cache->replaceQ.head)) {
+ pg_cache->replaceQ.head = next;
+ }
+ if (unlikely(pg_cache_descr == pg_cache->replaceQ.tail)) {
+ pg_cache->replaceQ.tail = prev;
+ }
+ pg_cache_descr->prev = pg_cache_descr->next = NULL;
+}
+
+void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
+ struct rrdeng_page_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
+ pg_cache_replaceQ_insert_unsafe(ctx, descr);
+ uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+}
+
+void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
+ struct rrdeng_page_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
+ pg_cache_replaceQ_delete_unsafe(ctx, descr);
+ uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+}
+void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
+ struct rrdeng_page_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
+ pg_cache_replaceQ_delete_unsafe(ctx, descr);
+ pg_cache_replaceQ_insert_unsafe(ctx, descr);
+ uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+}
+
+struct rrdeng_page_descr *pg_cache_create_descr(void)
+{
+ struct rrdeng_page_descr *descr;
+
+ descr = rrdeng_page_descr_mallocz();
+ descr->page_length = 0;
+ descr->start_time_ut = INVALID_TIME;
+ descr->end_time_ut = INVALID_TIME;
+ descr->id = NULL;
+ descr->extent = NULL;
+ descr->pg_cache_descr_state = 0;
+ descr->pg_cache_descr = NULL;
+ descr->update_every_s = 0;
+
+ return descr;
+}
+
+/* The caller must hold page descriptor lock. */
+void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
+{
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+ if (pg_cache_descr->waiters)
+ uv_cond_broadcast(&pg_cache_descr->cond);
+}
+
+void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_wake_up_waiters_unsafe(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+}
+
+/*
+ * The caller must hold page descriptor lock.
+ * The lock will be released and re-acquired. The descriptor is not guaranteed
+ * to exist after this function returns.
+ */
+void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr)
+{
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ ++pg_cache_descr->waiters;
+ uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex);
+ --pg_cache_descr->waiters;
+}
+
+/*
+ * The caller must hold page descriptor lock.
+ * The lock will be released and re-acquired. The descriptor is not guaranteed
+ * to exist after this function returns.
+ * Returns UV_ETIMEDOUT if timeout_sec seconds pass.
+ */
+int pg_cache_timedwait_event_unsafe(struct rrdeng_page_descr *descr, uint64_t timeout_sec)
+{
+ int ret;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ ++pg_cache_descr->waiters;
+ ret = uv_cond_timedwait(&pg_cache_descr->cond, &pg_cache_descr->mutex, timeout_sec * NSEC_PER_SEC);
+ --pg_cache_descr->waiters;
+
+ return ret;
+}
+
+/*
+ * Returns page flags.
+ * The lock will be released and re-acquired. The descriptor is not guaranteed
+ * to exist after this function returns.
+ */
+unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+ unsigned long flags;
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_wait_event_unsafe(descr);
+ flags = pg_cache_descr->flags;
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ return flags;
+}
+
+/*
+ * The caller must hold page descriptor lock.
+ */
+int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
+{
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
+ (exclusive_access && pg_cache_descr->refcnt)) {
+ return 0;
+ }
+
+ return 1;
+}
+
+/*
+ * The caller must hold page descriptor lock.
+ * Gets a reference to the page descriptor.
+ * Returns 1 on success and 0 on failure.
+ */
+int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
+{
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ if (!pg_cache_can_get_unsafe(descr, exclusive_access))
+ return 0;
+
+ if (exclusive_access)
+ pg_cache_descr->flags |= RRD_PAGE_LOCKED;
+ ++pg_cache_descr->refcnt;
+
+ return 1;
+}
+
+/*
+ * The caller must hold the page descriptor lock.
+ * This function may block doing cleanup.
+ */
+void pg_cache_put_unsafe(struct rrdeng_page_descr *descr)
+{
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
+ if (0 == --pg_cache_descr->refcnt) {
+ pg_cache_wake_up_waiters_unsafe(descr);
+ }
+}
+
+/*
+ * This function may block doing cleanup.
+ */
+void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_put_unsafe(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+}
+
+/* The caller must hold the page cache lock */
+static void pg_cache_release_pages_unsafe(struct rrdengine_instance *ctx, unsigned number)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ pg_cache->populated_pages -= number;
+}
+
+static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned number)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ pg_cache_release_pages_unsafe(ctx, number);
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+}
+
+/*
+ * This function returns the maximum number of pages allowed in the page cache.
+ */
+unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
+{
+ return ctx->max_cache_pages + (unsigned long)ctx->metric_API_max_producers;
+}
+
+/*
+ * This function returns the low watermark number of pages in the page cache. The page cache should strive to keep the
+ * number of pages below that number.
+ */
+unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
+{
+ return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers;
+}
+
+/*
+ * This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page
+ * cache.
+ */
+unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx)
+{
+ /* We remove the active pages of the producers from the calculation and only allow the extra pinned pages */
+ return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers;
+}
+
+/*
+ * This function will block until it reserves #number populated pages.
+ * It will trigger evictions or dirty page flushing if the pg_cache_hard_limit() limit is hit.
+ */
+static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned failures = 0;
+ const unsigned FAILURES_CEILING = 10; /* truncates exponential backoff to (2^FAILURES_CEILING x slot) */
+ unsigned long exp_backoff_slot_usec = USEC_PER_MS * 10;
+
+ assert(number < ctx->max_cache_pages);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ if (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1)
+ debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==",
+ number);
+ while (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) {
+
+ if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
+ /* failed to evict */
+ struct completion compl;
+ struct rrdeng_cmd cmd;
+
+ ++failures;
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+
+ completion_init(&compl);
+ cmd.opcode = RRDENG_FLUSH_PAGES;
+ cmd.completion = &compl;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+ /* wait for some pages to be flushed */
+ debug(D_RRDENGINE, "%s: waiting for pages to be written to disk before evicting.", __func__);
+ completion_wait_for(&compl);
+ completion_destroy(&compl);
+
+ if (unlikely(failures > 1)) {
+ unsigned long slots, usecs_to_sleep;
+ /* exponential backoff */
+ slots = random() % (2LU << MIN(failures, FAILURES_CEILING));
+ usecs_to_sleep = slots * exp_backoff_slot_usec;
+
+ if (usecs_to_sleep >= USEC_PER_SEC)
+ error("Page cache is full. Sleeping for %llu second(s).", usecs_to_sleep / USEC_PER_SEC);
+
+ (void)sleep_usec(usecs_to_sleep);
+ }
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ }
+ }
+ pg_cache->populated_pages += number;
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+}
+
+/*
+ * This function will attempt to reserve #number populated pages.
+ * It may trigger evictions if the pg_cache_soft_limit() limit is hit.
+ * Returns 0 on failure and 1 on success.
+ */
+static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned count = 0;
+ int ret = 0;
+
+ assert(number < ctx->max_cache_pages);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ if (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1) {
+ debug(D_RRDENGINE,
+ "==Page cache full. Trying to reserve %u pages.==",
+ number);
+ do {
+ if (!pg_cache_try_evict_one_page_unsafe(ctx))
+ break;
+ ++count;
+ } while (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1);
+ debug(D_RRDENGINE, "Evicted %u pages.", count);
+ }
+
+ if (pg_cache->populated_pages + number < pg_cache_hard_limit(ctx) + 1) {
+ pg_cache->populated_pages += number;
+ ret = 1; /* success */
+ }
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+
+ return ret;
+}
+
+/* The caller must hold the page cache and the page descriptor locks in that order */
+static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ dbengine_page_free(pg_cache_descr->page);
+ pg_cache_descr->page = NULL;
+ pg_cache_descr->flags &= ~RRD_PAGE_POPULATED;
+ pg_cache_release_pages_unsafe(ctx, 1);
+ ++ctx->stats.pg_cache_evictions;
+}
+
+/*
+ * The caller must hold the page cache lock.
+ * Lock order: page cache -> replaceQ -> page descriptor
+ * This function iterates all pages and tries to evict one.
+ * If it fails it sets in_flight_descr to the oldest descriptor that has write-back in progress,
+ * or it sets it to NULL if no write-back is in progress.
+ *
+ * Returns 1 on success and 0 on failure.
+ */
+static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned long old_flags;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr = NULL;
+
+ uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
+ for (pg_cache_descr = pg_cache->replaceQ.head ; NULL != pg_cache_descr ; pg_cache_descr = pg_cache_descr->next) {
+ descr = pg_cache_descr->descr;
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ old_flags = pg_cache_descr->flags;
+ if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) {
+ /* must evict */
+ pg_cache_evict_unsafe(ctx, descr);
+ pg_cache_put_unsafe(descr);
+ pg_cache_replaceQ_delete_unsafe(ctx, descr);
+
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+
+ rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
+
+ return 1;
+ }
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ }
+ uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+
+ /* failed to evict */
+ return 0;
+}
+
+/**
+ * Deletes a page from the database.
+ * Callers of this function need to make sure they're not deleting the same descriptor concurrently.
+ * @param ctx is the database instance.
+ * @param descr is the page descriptor.
+ * @param remove_dirty must be non-zero if the page to be deleted is dirty.
+ * @param is_exclusive_holder must be non-zero if the caller holds an exclusive page reference.
+ * @param metric_id is set to the metric the page belongs to, if it's safe to delete the metric and metric_id is not
+ * NULL. Otherwise, metric_id is not set.
+ * @return 1 if it's safe to delete the metric, 0 otherwise.
+ */
+uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty,
+ uint8_t is_exclusive_holder, uuid_t *metric_id)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct page_cache_descr *pg_cache_descr = NULL;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index = NULL;
+ int ret;
+ uint8_t can_delete_metric = 0;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
+ fatal_assert(NULL != PValue);
+ page_index = *PValue;
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+
+ uv_rwlock_wrlock(&page_index->lock);
+ ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time_ut / USEC_PER_SEC), PJE0);
+ if (unlikely(0 == ret)) {
+ uv_rwlock_wrunlock(&page_index->lock);
+ if (unlikely(debug_flags & D_RRDENGINE)) {
+ print_page_descr(descr);
+ }
+ goto destroy;
+ }
+ --page_index->page_count;
+ if (!page_index->writers && !page_index->page_count) {
+ can_delete_metric = 1;
+ if (metric_id) {
+ memcpy(metric_id, page_index->id, sizeof(uuid_t));
+ }
+ }
+ uv_rwlock_wrunlock(&page_index->lock);
+ fatal_assert(1 == ret);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ ++ctx->stats.pg_cache_deletions;
+ --pg_cache->page_descriptors;
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ if (!is_exclusive_holder) {
+ /* If we don't hold an exclusive page reference get one */
+ while (!pg_cache_try_get_unsafe(descr, 1)) {
+ debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr, "", true);
+ pg_cache_wait_event_unsafe(descr);
+ }
+ }
+ if (remove_dirty) {
+ pg_cache_descr->flags &= ~RRD_PAGE_DIRTY;
+ } else {
+ /* even a locked page could be dirty */
+ while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
+ debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr, "", true);
+ pg_cache_wait_event_unsafe(descr);
+ }
+ }
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ while (unlikely(pg_cache_descr->flags & RRD_PAGE_READ_PENDING)) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "%s: Found page with READ PENDING, waiting for read to complete", __func__);
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr, "", true);
+ pg_cache_wait_event_unsafe(descr);
+ }
+
+ if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
+ /* only after locking can it be safely deleted from LRU */
+ pg_cache_replaceQ_delete(ctx, descr);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ pg_cache_evict_unsafe(ctx, descr);
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+ }
+ pg_cache_put(ctx, descr);
+ rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
+ while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
+ rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */
+ (void)sleep_usec(1000); /* 1 msec */
+ }
+destroy:
+ rrdeng_page_descr_freez(descr);
+ pg_cache_update_metric_times(page_index);
+
+ return can_delete_metric;
+}
+
+static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time)
+{
+ usec_t pg_start, pg_end;
+
+ pg_start = descr->start_time_ut;
+ pg_end = descr->end_time_ut;
+
+ return (pg_start < start_time && pg_end >= start_time) ||
+ (pg_start >= start_time && pg_start <= end_time);
+}
+
+static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time)
+{
+ return (point_in_time >= descr->start_time_ut && point_in_time <= descr->end_time_ut);
+}
+
+/* The caller must hold the page index lock */
+static inline struct rrdeng_page_descr *
+ find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time)
+{
+ struct rrdeng_page_descr *descr = NULL;
+ Pvoid_t *PValue;
+ Word_t Index;
+
+ Index = (Word_t)(start_time / USEC_PER_SEC);
+ PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ if (is_page_in_time_range(descr, start_time, end_time)) {
+ return descr;
+ }
+ }
+
+ Index = (Word_t)(start_time / USEC_PER_SEC);
+ PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ if (is_page_in_time_range(descr, start_time, end_time)) {
+ return descr;
+ }
+ }
+
+ return NULL;
+}
+
+/* Update metric oldest and latest timestamps efficiently when adding new values */
+void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr)
+{
+ usec_t oldest_time = page_index->oldest_time_ut;
+ usec_t latest_time = page_index->latest_time_ut;
+
+ if (unlikely(oldest_time == INVALID_TIME || descr->start_time_ut < oldest_time)) {
+ page_index->oldest_time_ut = descr->start_time_ut;
+ }
+ if (likely(descr->end_time_ut > latest_time || latest_time == INVALID_TIME)) {
+ page_index->latest_time_ut = descr->end_time_ut;
+ }
+}
+
+/* Update metric oldest and latest timestamps when removing old values */
+void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
+{
+ Pvoid_t *firstPValue, *lastPValue;
+ Word_t firstIndex, lastIndex;
+ struct rrdeng_page_descr *descr;
+ usec_t oldest_time = INVALID_TIME;
+ usec_t latest_time = INVALID_TIME;
+
+ uv_rwlock_rdlock(&page_index->lock);
+ /* Find first page in range */
+ firstIndex = (Word_t)0;
+ firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0);
+ if (likely(NULL != firstPValue)) {
+ descr = *firstPValue;
+ oldest_time = descr->start_time_ut;
+ }
+ lastIndex = (Word_t)-1;
+ lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0);
+ if (likely(NULL != lastPValue)) {
+ descr = *lastPValue;
+ latest_time = descr->end_time_ut;
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ if (unlikely(NULL == firstPValue)) {
+ fatal_assert(NULL == lastPValue);
+ page_index->oldest_time_ut = page_index->latest_time_ut = INVALID_TIME;
+ return;
+ }
+ page_index->oldest_time_ut = oldest_time;
+ page_index->latest_time_ut = latest_time;
+}
+
+/* If index is NULL lookup by UUID (descr->id) */
+void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
+ struct rrdeng_page_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+ unsigned long pg_cache_descr_state = descr->pg_cache_descr_state;
+
+ if (0 != pg_cache_descr_state) {
+ /* there is page cache descriptor pre-allocated state */
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ fatal_assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED);
+ if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
+ pg_cache_reserve_pages(ctx, 1);
+ if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY))
+ pg_cache_replaceQ_insert(ctx, descr);
+ }
+ }
+
+ if (unlikely(NULL == index)) {
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
+ fatal_assert(NULL != PValue);
+ page_index = *PValue;
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ } else {
+ page_index = index;
+ }
+
+ uv_rwlock_wrlock(&page_index->lock);
+ PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time_ut / USEC_PER_SEC), PJE0);
+ *PValue = descr;
+ ++page_index->page_count;
+ pg_cache_add_new_metric_time(page_index, descr);
+ uv_rwlock_wrunlock(&page_index->lock);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ ++ctx->stats.pg_cache_insertions;
+ ++pg_cache->page_descriptors;
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+}
+
+usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_descr *descr = NULL;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index = NULL;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ return INVALID_TIME;
+ }
+
+ uv_rwlock_rdlock(&page_index->lock);
+ descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut);
+ if (NULL == descr) {
+ uv_rwlock_rdunlock(&page_index->lock);
+ return INVALID_TIME;
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+ return descr->start_time_ut;
+}
+
+/**
+ * Return page information for the first page before point_in_time that satisfies the filter.
+ * @param ctx DB context
+ * @param page_index page index of a metric
+ * @param point_in_time_ut the pages that are searched must be older than this timestamp
+ * @param filter decides if the page satisfies the caller's criteria
+ * @param page_info the result of the search is set in this pointer
+ */
+void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
+ usec_t point_in_time_ut, pg_cache_page_info_filter_t *filter,
+ struct rrdeng_page_info *page_info)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_descr *descr = NULL;
+ Pvoid_t *PValue;
+ Word_t Index;
+
+ (void)pg_cache;
+ fatal_assert(NULL != page_index);
+
+ Index = (Word_t)(point_in_time_ut / USEC_PER_SEC);
+ uv_rwlock_rdlock(&page_index->lock);
+ do {
+ PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0);
+ descr = unlikely(NULL == PValue) ? NULL : *PValue;
+ } while (descr != NULL && !filter(descr));
+ if (unlikely(NULL == descr)) {
+ page_info->page_length = 0;
+ page_info->start_time_ut = INVALID_TIME;
+ page_info->end_time_ut = INVALID_TIME;
+ } else {
+ page_info->page_length = descr->page_length;
+ page_info->start_time_ut = descr->start_time_ut;
+ page_info->end_time_ut = descr->end_time_ut;
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+}
+
+/**
+ * Searches for an unallocated page without triggering disk I/O. Attempts to reserve the page and get a reference.
+ * @param ctx DB context
+ * @param id lookup by UUID
+ * @param start_time_ut exact starting time in usec
+ * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID.
+ * @return the page descriptor or NULL on failure. It can fail if:
+ * 1. The page is already allocated to the page cache.
+ * 2. It did not succeed to get a reference.
+ * 3. It did not succeed to reserve a spot in the page cache.
+ */
+struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id,
+ usec_t start_time_ut)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_descr *descr = NULL;
+ struct page_cache_descr *pg_cache_descr = NULL;
+ unsigned long flags;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index = NULL;
+ Word_t Index;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+
+ if ((NULL == PValue) || !pg_cache_try_reserve_pages(ctx, 1)) {
+ /* Failed to find page or failed to reserve a spot in the cache */
+ return NULL;
+ }
+
+ uv_rwlock_rdlock(&page_index->lock);
+ Index = (Word_t)(start_time_ut / USEC_PER_SEC);
+ PValue = JudyLGet(page_index->JudyL_array, Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ }
+ if (NULL == PValue || 0 == descr->page_length) {
+ /* Failed to find non-empty page */
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ pg_cache_release_pages(ctx, 1);
+ return NULL;
+ }
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ flags = pg_cache_descr->flags;
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ if ((flags & RRD_PAGE_POPULATED) || !pg_cache_try_get_unsafe(descr, 1)) {
+ /* Failed to get reference or page is already populated */
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ pg_cache_release_pages(ctx, 1);
+ return NULL;
+ }
+ /* success */
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
+
+ return descr;
+}
+
+/**
+ * Searches for pages in a time range and triggers disk I/O if necessary and possible.
+ * Does not get a reference.
+ * @param ctx DB context
+ * @param id UUID
+ * @param start_time_ut inclusive starting time in usec
+ * @param end_time_ut inclusive ending time in usec
+ * @param page_info_arrayp It allocates (*page_arrayp) and populates it with information of pages that overlap
+ * with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez().
+ * If page_info_arrayp is set to NULL nothing was allocated.
+ * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID.
+ * @return the number of pages that overlap with the time range [start_time,end_time].
+ */
+unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut,
+ struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
+ struct page_cache_descr *pg_cache_descr = NULL;
+ unsigned i, j, k, preload_count, count, page_info_array_max_size;
+ unsigned long flags;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index = NULL;
+ Word_t Index;
+ uint8_t failed_to_reserve;
+
+ fatal_assert(NULL != ret_page_indexp);
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ *ret_page_indexp = page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
+ *ret_page_indexp = NULL;
+ return 0;
+ }
+
+ uv_rwlock_rdlock(&page_index->lock);
+ descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut);
+ if (NULL == descr) {
+ uv_rwlock_rdunlock(&page_index->lock);
+ debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
+ *ret_page_indexp = NULL;
+ return 0;
+ } else {
+ Index = (Word_t)(descr->start_time_ut / USEC_PER_SEC);
+ }
+ if (page_info_arrayp) {
+ page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
+ *page_info_arrayp = mallocz(page_info_array_max_size);
+ }
+
+ for (count = 0, preload_count = 0 ;
+ descr != NULL && is_page_in_time_range(descr, start_time_ut, end_time_ut) ;
+ PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue) {
+ /* Iterate all pages in range */
+
+ if (unlikely(0 == descr->page_length))
+ continue;
+ if (page_info_arrayp) {
+ if (unlikely(count >= page_info_array_max_size / sizeof(struct rrdeng_page_info))) {
+ page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
+ *page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size);
+ }
+ (*page_info_arrayp)[count].start_time_ut = descr->start_time_ut;
+ (*page_info_arrayp)[count].end_time_ut = descr->end_time_ut;
+ (*page_info_arrayp)[count].page_length = descr->page_length;
+ }
+ ++count;
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ flags = pg_cache_descr->flags;
+ if (pg_cache_can_get_unsafe(descr, 0)) {
+ if (flags & RRD_PAGE_POPULATED) {
+ /* success */
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
+ continue;
+ }
+ }
+ if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
+ preload_array[preload_count++] = descr;
+ if (PAGE_CACHE_MAX_PRELOAD_PAGES == preload_count) {
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ break;
+ }
+ }
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ failed_to_reserve = 0;
+ for (i = 0 ; i < preload_count && !failed_to_reserve ; ++i) {
+ struct rrdeng_cmd cmd;
+ struct rrdeng_page_descr *next;
+
+ descr = preload_array[i];
+ if (NULL == descr) {
+ continue;
+ }
+ if (!pg_cache_try_reserve_pages(ctx, 1)) {
+ failed_to_reserve = 1;
+ break;
+ }
+ cmd.opcode = RRDENG_READ_EXTENT;
+ cmd.read_extent.page_cache_descr[0] = descr;
+ /* don't use this page again */
+ preload_array[i] = NULL;
+ for (j = 0, k = 1 ; j < preload_count ; ++j) {
+ next = preload_array[j];
+ if (NULL == next) {
+ continue;
+ }
+ if (descr->extent == next->extent) {
+ /* same extent, consolidate */
+ if (!pg_cache_try_reserve_pages(ctx, 1)) {
+ failed_to_reserve = 1;
+ break;
+ }
+ cmd.read_extent.page_cache_descr[k++] = next;
+ /* don't use this page again */
+ preload_array[j] = NULL;
+ }
+ }
+ cmd.read_extent.page_count = k;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+ }
+ if (failed_to_reserve) {
+ debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__);
+ for (i = 0 ; i < preload_count ; ++i) {
+ descr = preload_array[i];
+ if (NULL == descr) {
+ continue;
+ }
+ pg_cache_put(ctx, descr);
+ }
+ }
+ if (!preload_count) {
+ /* no such page */
+ debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__);
+ }
+ if (unlikely(0 == count && page_info_arrayp)) {
+ freez(*page_info_arrayp);
+ *page_info_arrayp = NULL;
+ }
+ return count;
+}
+
+/*
+ * Searches for a page and gets a reference.
+ * When point_in_time is INVALID_TIME get any page.
+ * If index is NULL lookup by UUID (id).
+ */
+struct rrdeng_page_descr *
+ pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
+ usec_t point_in_time_ut)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_descr *descr = NULL;
+ struct page_cache_descr *pg_cache_descr = NULL;
+ unsigned long flags;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index = NULL;
+ Word_t Index;
+ uint8_t page_not_in_cache;
+
+ if (unlikely(NULL == index)) {
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ return NULL;
+ }
+ } else {
+ page_index = index;
+ }
+ pg_cache_reserve_pages(ctx, 1);
+
+ page_not_in_cache = 0;
+ uv_rwlock_rdlock(&page_index->lock);
+ while (1) {
+ Index = (Word_t)(point_in_time_ut / USEC_PER_SEC);
+ PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ }
+ if (NULL == PValue ||
+ 0 == descr->page_length ||
+ (INVALID_TIME != point_in_time_ut &&
+ !is_point_in_time_in_page(descr, point_in_time_ut))) {
+ /* non-empty page not found */
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ pg_cache_release_pages(ctx, 1);
+ return NULL;
+ }
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ flags = pg_cache_descr->flags;
+ if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
+ /* success */
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
+ break;
+ }
+ if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
+ struct rrdeng_cmd cmd;
+
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ cmd.opcode = RRDENG_READ_PAGE;
+ cmd.read_page.page_cache_descr = descr;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+
+ debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr, "", true);
+ while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
+ pg_cache_wait_event_unsafe(descr);
+ }
+ /* success */
+ /* Downgrade exclusive reference to allow other readers */
+ pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
+ pg_cache_wake_up_waiters_unsafe(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
+ return descr;
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+ debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr, "", true);
+ if (!(flags & RRD_PAGE_POPULATED))
+ page_not_in_cache = 1;
+ pg_cache_wait_event_unsafe(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ /* reset scan to find again */
+ uv_rwlock_rdlock(&page_index->lock);
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ if (!(flags & RRD_PAGE_DIRTY))
+ pg_cache_replaceQ_set_hot(ctx, descr);
+ pg_cache_release_pages(ctx, 1);
+ if (page_not_in_cache)
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
+ else
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
+ return descr;
+}
+
+/*
+ * Searches for the first page between start_time and end_time and gets a reference.
+ * start_time and end_time are inclusive.
+ * If index is NULL lookup by UUID (id).
+ */
+struct rrdeng_page_descr *
+pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
+ usec_t start_time_ut, usec_t end_time_ut)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_descr *descr = NULL;
+ struct page_cache_descr *pg_cache_descr = NULL;
+ unsigned long flags;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index = NULL;
+ uint8_t page_not_in_cache;
+
+ if (unlikely(NULL == index)) {
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ return NULL;
+ }
+ } else {
+ page_index = index;
+ }
+ pg_cache_reserve_pages(ctx, 1);
+
+ page_not_in_cache = 0;
+ uv_rwlock_rdlock(&page_index->lock);
+ int retry_count = 0;
+ while (1) {
+ descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut);
+ if (NULL == descr || 0 == descr->page_length || retry_count == default_rrdeng_page_fetch_retries) {
+ /* non-empty page not found */
+ if (retry_count == default_rrdeng_page_fetch_retries)
+ error_report("Page cache timeout while waiting for page %p : returning FAIL", descr);
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ pg_cache_release_pages(ctx, 1);
+ return NULL;
+ }
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ flags = pg_cache_descr->flags;
+ if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
+ /* success */
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
+ break;
+ }
+ if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
+ struct rrdeng_cmd cmd;
+
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ cmd.opcode = RRDENG_READ_PAGE;
+ cmd.read_page.page_cache_descr = descr;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+
+ debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr, "", true);
+ while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
+ pg_cache_wait_event_unsafe(descr);
+ }
+ /* success */
+ /* Downgrade exclusive reference to allow other readers */
+ pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
+ pg_cache_wake_up_waiters_unsafe(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
+ return descr;
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+ debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr, "", true);
+ if (!(flags & RRD_PAGE_POPULATED))
+ page_not_in_cache = 1;
+
+ if (pg_cache_timedwait_event_unsafe(descr, default_rrdeng_page_fetch_timeout) == UV_ETIMEDOUT) {
+ error_report("Page cache timeout while waiting for page %p : retry count = %d", descr, retry_count);
+ ++retry_count;
+ }
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ /* reset scan to find again */
+ uv_rwlock_rdlock(&page_index->lock);
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ if (!(flags & RRD_PAGE_DIRTY))
+ pg_cache_replaceQ_set_hot(ctx, descr);
+ pg_cache_release_pages(ctx, 1);
+ if (page_not_in_cache)
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
+ else
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
+ return descr;
+}
+
+struct pg_cache_page_index *create_page_index(uuid_t *id, struct rrdengine_instance *ctx)
+{
+ struct pg_cache_page_index *page_index;
+
+ page_index = mallocz(sizeof(*page_index));
+ page_index->JudyL_array = (Pvoid_t) NULL;
+ uuid_copy(page_index->id, *id);
+ fatal_assert(0 == uv_rwlock_init(&page_index->lock));
+ page_index->oldest_time_ut = INVALID_TIME;
+ page_index->latest_time_ut = INVALID_TIME;
+ page_index->prev = NULL;
+ page_index->page_count = 0;
+ page_index->refcount = 0;
+ page_index->writers = 0;
+ page_index->ctx = ctx;
+ page_index->latest_update_every_s = default_rrd_update_every;
+
+ return page_index;
+}
+
+static void init_metrics_index(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
+ pg_cache->metrics_index.last_page_index = NULL;
+ fatal_assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
+}
+
+static void init_replaceQ(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ pg_cache->replaceQ.head = NULL;
+ pg_cache->replaceQ.tail = NULL;
+ fatal_assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
+}
+
+static void init_committed_page_index(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ pg_cache->committed_page_index.JudyL_array = (Pvoid_t) NULL;
+ fatal_assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock));
+ pg_cache->committed_page_index.latest_corr_id = 0;
+ pg_cache->committed_page_index.nr_committed_pages = 0;
+}
+
+void init_page_cache(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ pg_cache->page_descriptors = 0;
+ pg_cache->populated_pages = 0;
+ fatal_assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock));
+
+ init_metrics_index(ctx);
+ init_replaceQ(ctx);
+ init_committed_page_index(ctx);
+}
+
+void free_page_cache(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index, *prev_page_index;
+ Word_t Index;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+
+ // if we are exiting, the OS will recover all memory so do not slow down the shutdown process
+ // Do the cleanup if we are compiling with NETDATA_INTERNAL_CHECKS
+ // This affects the reporting of dbengine statistics which are available in real time
+ // via the /api/v1/dbengine_stats endpoint
+#ifndef NETDATA_DBENGINE_FREE
+ if (netdata_exit)
+ return;
+#endif
+ Word_t metrics_index_bytes = 0, pages_index_bytes = 0, pages_dirty_index_bytes = 0;
+
+ /* Free committed page index */
+ pages_dirty_index_bytes = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0);
+ fatal_assert(NULL == pg_cache->committed_page_index.JudyL_array);
+
+ for (page_index = pg_cache->metrics_index.last_page_index ;
+ page_index != NULL ;
+ page_index = prev_page_index) {
+
+ prev_page_index = page_index->prev;
+
+ /* Find first page in range */
+ Index = (Word_t) 0;
+ PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
+ descr = unlikely(NULL == PValue) ? NULL : *PValue;
+
+ while (descr != NULL) {
+ /* Iterate all page descriptors of this metric */
+
+ if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
+ /* Check rrdenglocking.c */
+ pg_cache_descr = descr->pg_cache_descr;
+ if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
+ dbengine_page_free(pg_cache_descr->page);
+ }
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+ }
+ rrdeng_page_descr_freez(descr);
+
+ PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0);
+ descr = unlikely(NULL == PValue) ? NULL : *PValue;
+ }
+
+ /* Free page index */
+ pages_index_bytes += JudyLFreeArray(&page_index->JudyL_array, PJE0);
+ fatal_assert(NULL == page_index->JudyL_array);
+ freez(page_index);
+ }
+ /* Free metrics index */
+ metrics_index_bytes = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0);
+ fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array);
+ info("Freed %lu bytes of memory from page cache.", pages_dirty_index_bytes + pages_index_bytes + metrics_index_bytes);
+}
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
new file mode 100644
index 0000000..635b021
--- /dev/null
+++ b/database/engine/pagecache.h
@@ -0,0 +1,254 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_PAGECACHE_H
+#define NETDATA_PAGECACHE_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct rrdengine_instance;
+struct extent_info;
+struct rrdeng_page_descr;
+
+#define INVALID_TIME (0)
+#define MAX_PAGE_CACHE_FETCH_RETRIES (3)
+#define PAGE_CACHE_FETCH_WAIT_TIMEOUT (3)
+
+/* Page flags */
+#define RRD_PAGE_DIRTY (1LU << 0)
+#define RRD_PAGE_LOCKED (1LU << 1)
+#define RRD_PAGE_READ_PENDING (1LU << 2)
+#define RRD_PAGE_WRITE_PENDING (1LU << 3)
+#define RRD_PAGE_POPULATED (1LU << 4)
+
+struct page_cache_descr {
+ struct rrdeng_page_descr *descr; /* parent descriptor */
+ void *page;
+ unsigned long flags;
+ struct page_cache_descr *prev; /* LRU */
+ struct page_cache_descr *next; /* LRU */
+
+ unsigned refcnt;
+ uv_mutex_t mutex; /* always take it after the page cache lock or after the commit lock */
+ uv_cond_t cond;
+ unsigned waiters;
+};
+
+/* Page cache descriptor flags, state = 0 means no descriptor */
+#define PG_CACHE_DESCR_ALLOCATED (1LU << 0)
+#define PG_CACHE_DESCR_DESTROY (1LU << 1)
+#define PG_CACHE_DESCR_LOCKED (1LU << 2)
+#define PG_CACHE_DESCR_SHIFT (3)
+#define PG_CACHE_DESCR_USERS_MASK (((unsigned long)-1) << PG_CACHE_DESCR_SHIFT)
+#define PG_CACHE_DESCR_FLAGS_MASK (((unsigned long)-1) >> (BITS_PER_ULONG - PG_CACHE_DESCR_SHIFT))
+
+/*
+ * Page cache descriptor state bits (works for both 32-bit and 64-bit architectures):
+ *
+ * 63 ... 31 ... 3 | 2 | 1 | 0|
+ * -----------------------------+------------+------------+-----------|
+ * number of descriptor users | DESTROY | LOCKED | ALLOCATED |
+ */
+struct rrdeng_page_descr {
+ uuid_t *id; /* never changes */
+ struct extent_info *extent;
+
+ /* points to ephemeral page cache descriptor if the page resides in the cache */
+ struct page_cache_descr *pg_cache_descr;
+
+ /* Compare-And-Swap target for page cache descriptor allocation algorithm */
+ volatile unsigned long pg_cache_descr_state;
+
+ /* page information */
+ usec_t start_time_ut;
+ usec_t end_time_ut;
+ uint32_t update_every_s:24;
+ uint8_t type;
+ uint32_t page_length;
+};
+
+#define PAGE_INFO_SCRATCH_SZ (8)
+struct rrdeng_page_info {
+ uint8_t scratch[PAGE_INFO_SCRATCH_SZ]; /* scratch area to be used by page-cache users */
+
+ usec_t start_time_ut;
+ usec_t end_time_ut;
+ uint32_t page_length;
+};
+
+/* returns 1 for success, 0 for failure */
+typedef int pg_cache_page_info_filter_t(struct rrdeng_page_descr *);
+
+#define PAGE_CACHE_MAX_PRELOAD_PAGES (256)
+
+struct pg_alignment {
+ uint32_t page_length;
+ uint32_t refcount;
+};
+
+/* maps time ranges to pages */
+struct pg_cache_page_index {
+ uuid_t id;
+ /*
+ * care: JudyL_array indices are converted from useconds to seconds to fit in one word in 32-bit architectures
+ * TODO: examine if we want to support better granularity than seconds
+ */
+ Pvoid_t JudyL_array;
+ Word_t page_count;
+ unsigned short refcount;
+ unsigned short writers;
+ uv_rwlock_t lock;
+
+ /*
+ * Only one effective writer, data deletion workqueue.
+ * It's also written during the DB loading phase.
+ */
+ usec_t oldest_time_ut;
+
+ /*
+ * Only one effective writer, data collection thread.
+ * It's also written by the data deletion workqueue when data collection is disabled for this metric.
+ */
+ usec_t latest_time_ut;
+
+ struct rrdengine_instance *ctx;
+ uint32_t latest_update_every_s;
+
+ struct pg_cache_page_index *prev;
+};
+
+/* maps UUIDs to page indices */
+struct pg_cache_metrics_index {
+ uv_rwlock_t lock;
+ Pvoid_t JudyHS_array;
+ struct pg_cache_page_index *last_page_index;
+};
+
+/* gathers dirty pages to be written on disk */
+struct pg_cache_committed_page_index {
+ uv_rwlock_t lock;
+
+ Pvoid_t JudyL_array;
+
+ /*
+ * Dirty page correlation ID is a hint. Dirty pages that are correlated should have
+ * a small correlation ID difference. Dirty pages in memory should never have the
+ * same ID at the same time for correctness.
+ */
+ Word_t latest_corr_id;
+
+ unsigned nr_committed_pages;
+};
+
+/*
+ * Gathers populated pages to be evicted.
+ * Relies on page cache descriptors being there as it uses their memory.
+ */
+struct pg_cache_replaceQ {
+ uv_rwlock_t lock; /* LRU lock */
+
+ struct page_cache_descr *head; /* LRU */
+ struct page_cache_descr *tail; /* MRU */
+};
+
+struct page_cache { /* TODO: add statistics */
+ uv_rwlock_t pg_cache_rwlock; /* page cache lock */
+
+ struct pg_cache_metrics_index metrics_index;
+ struct pg_cache_committed_page_index committed_page_index;
+ struct pg_cache_replaceQ replaceQ;
+
+ unsigned page_descriptors;
+ unsigned populated_pages;
+};
+
+void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr);
+void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr);
+unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
+ struct rrdeng_page_descr *descr);
+void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
+ struct rrdeng_page_descr *descr);
+void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
+ struct rrdeng_page_descr *descr);
+struct rrdeng_page_descr *pg_cache_create_descr(void);
+int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access);
+void pg_cache_put_unsafe(struct rrdeng_page_descr *descr);
+void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
+ struct rrdeng_page_descr *descr);
+uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
+ uint8_t remove_dirty, uint8_t is_exclusive_holder, uuid_t *metric_id);
+usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id,
+ usec_t start_time_ut, usec_t end_time_ut);
+void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
+ usec_t point_in_time_ut, pg_cache_page_info_filter_t *filter,
+ struct rrdeng_page_info *page_info);
+struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id,
+ usec_t start_time_ut);
+unsigned
+ pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut,
+ struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp);
+struct rrdeng_page_descr *
+ pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
+ usec_t point_in_time_ut);
+struct rrdeng_page_descr *
+ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
+ usec_t start_time_ut, usec_t end_time_ut);
+struct pg_cache_page_index *create_page_index(uuid_t *id, struct rrdengine_instance *ctx);
+void init_page_cache(struct rrdengine_instance *ctx);
+void free_page_cache(struct rrdengine_instance *ctx);
+void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr);
+void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);
+unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx);
+unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx);
+unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx);
+
+void rrdeng_page_descr_aral_go_singlethreaded(void);
+void rrdeng_page_descr_aral_go_multithreaded(void);
+void rrdeng_page_descr_use_malloc(void);
+void rrdeng_page_descr_use_mmap(void);
+bool rrdeng_page_descr_is_mmap(void);
+struct rrdeng_page_descr *rrdeng_page_descr_mallocz(void);
+void rrdeng_page_descr_freez(struct rrdeng_page_descr *descr);
+
+static inline void
+ pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_time_ut_p, uint32_t *page_lengthp)
+{
+ usec_t end_time_ut, old_end_time_ut;
+ uint32_t page_length;
+
+ if (NULL == descr->extent) {
+ /* this page is currently being modified, get consistent info locklessly */
+ do {
+ end_time_ut = descr->end_time_ut;
+ __sync_synchronize();
+ old_end_time_ut = end_time_ut;
+ page_length = descr->page_length;
+ __sync_synchronize();
+ end_time_ut = descr->end_time_ut;
+ __sync_synchronize();
+ } while ((end_time_ut != old_end_time_ut || (end_time_ut & 1) != 0));
+
+ *end_time_ut_p = end_time_ut;
+ *page_lengthp = page_length;
+ } else {
+ *end_time_ut_p = descr->end_time_ut;
+ *page_lengthp = descr->page_length;
+ }
+}
+
+/* The caller must hold a reference to the page and must have already set the new data */
+static inline void pg_cache_atomic_set_pg_info(struct rrdeng_page_descr *descr, usec_t end_time_ut, uint32_t page_length)
+{
+ fatal_assert(!(end_time_ut & 1));
+ __sync_synchronize();
+ descr->end_time_ut |= 1; /* mark start of uncertainty period by adding 1 microsecond */
+ __sync_synchronize();
+ descr->page_length = page_length;
+ __sync_synchronize();
+ descr->end_time_ut = end_time_ut; /* mark end of uncertainty period */
+}
+
+#endif /* NETDATA_PAGECACHE_H */
diff --git a/database/engine/rrddiskprotocol.h b/database/engine/rrddiskprotocol.h
new file mode 100644
index 0000000..5b4be94
--- /dev/null
+++ b/database/engine/rrddiskprotocol.h
@@ -0,0 +1,120 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDDISKPROTOCOL_H
+#define NETDATA_RRDDISKPROTOCOL_H
+
+#define RRDENG_BLOCK_SIZE (4096)
+#define RRDFILE_ALIGNMENT RRDENG_BLOCK_SIZE
+
+#define RRDENG_MAGIC_SZ (32)
+#define RRDENG_DF_MAGIC "netdata-data-file"
+#define RRDENG_JF_MAGIC "netdata-journal-file"
+
+#define RRDENG_VER_SZ (16)
+#define RRDENG_DF_VER "1.0"
+#define RRDENG_JF_VER "1.0"
+
+#define UUID_SZ (16)
+#define CHECKSUM_SZ (4) /* CRC32 */
+
+#define RRD_NO_COMPRESSION (0)
+#define RRD_LZ4 (1)
+
+#define RRDENG_DF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ + sizeof(uint8_t)))
+/*
+ * Data file persistent super-block
+ */
+struct rrdeng_df_sb {
+ char magic_number[RRDENG_MAGIC_SZ];
+ char version[RRDENG_VER_SZ];
+ uint8_t tier;
+ uint8_t padding[RRDENG_DF_SB_PADDING_SZ];
+} __attribute__ ((packed));
+
+/*
+ * Page types
+ */
+#define PAGE_METRICS (0)
+#define PAGE_TIER (1)
+#define PAGE_TYPE_MAX 1 // Maximum page type (inclusive)
+
+/*
+ * Data file page descriptor
+ */
+struct rrdeng_extent_page_descr {
+ uint8_t type;
+
+ uint8_t uuid[UUID_SZ];
+ uint32_t page_length;
+ uint64_t start_time_ut;
+ uint64_t end_time_ut;
+} __attribute__ ((packed));
+
+/*
+ * Data file extent header
+ */
+struct rrdeng_df_extent_header {
+ uint32_t payload_length;
+ uint8_t compression_algorithm;
+ uint8_t number_of_pages;
+ /* #number_of_pages page descriptors follow */
+ struct rrdeng_extent_page_descr descr[];
+} __attribute__ ((packed));
+
+/*
+ * Data file extent trailer
+ */
+struct rrdeng_df_extent_trailer {
+ uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */
+} __attribute__ ((packed));
+
+#define RRDENG_JF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ))
+/*
+ * Journal file super-block
+ */
+struct rrdeng_jf_sb {
+ char magic_number[RRDENG_MAGIC_SZ];
+ char version[RRDENG_VER_SZ];
+ uint8_t padding[RRDENG_JF_SB_PADDING_SZ];
+} __attribute__ ((packed));
+
+/*
+ * Transaction record types
+ */
+#define STORE_PADDING (0)
+#define STORE_DATA (1)
+#define STORE_LOGS (2) /* reserved */
+
+/*
+ * Journal file transaction record header
+ */
+struct rrdeng_jf_transaction_header {
+ /* when set to STORE_PADDING jump to start of next block */
+ uint8_t type;
+
+ uint32_t reserved; /* reserved for future use */
+ uint64_t id;
+ uint16_t payload_length;
+} __attribute__ ((packed));
+
+/*
+ * Journal file transaction record trailer
+ */
+struct rrdeng_jf_transaction_trailer {
+ uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */
+} __attribute__ ((packed));
+
+/*
+ * Journal file STORE_DATA action
+ */
+struct rrdeng_jf_store_data {
+ /* data file extent information */
+ uint64_t extent_offset;
+ uint32_t extent_size;
+
+ uint8_t number_of_pages;
+ /* #number_of_pages page descriptors follow */
+ struct rrdeng_extent_page_descr descr[];
+} __attribute__ ((packed));
+
+#endif /* NETDATA_RRDDISKPROTOCOL_H */ \ No newline at end of file
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
new file mode 100644
index 0000000..a6840f3
--- /dev/null
+++ b/database/engine/rrdengine.c
@@ -0,0 +1,1508 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "rrdengine.h"
+
+rrdeng_stats_t global_io_errors = 0;
+rrdeng_stats_t global_fs_errors = 0;
+rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
+rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0;
+rrdeng_stats_t global_flushing_pressure_page_deletions = 0;
+
+unsigned rrdeng_pages_per_extent = MAX_PAGES_PER_EXTENT;
+
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2)
+#error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2)
+#endif
+
+void *dbengine_page_alloc() {
+ void *page = NULL;
+ if (unlikely(db_engine_use_malloc))
+ page = mallocz(RRDENG_BLOCK_SIZE);
+ else {
+ page = netdata_mmap(NULL, RRDENG_BLOCK_SIZE, MAP_PRIVATE, enable_ksm);
+ if(!page) fatal("Cannot allocate dbengine page cache page, with mmap()");
+ }
+ return page;
+}
+
+void dbengine_page_free(void *page) {
+ if (unlikely(db_engine_use_malloc))
+ freez(page);
+ else
+ netdata_munmap(page, RRDENG_BLOCK_SIZE);
+}
+
+static void sanity_check(void)
+{
+ BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2));
+
+ /* Magic numbers must fit in the super-blocks */
+ BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ);
+ BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ);
+
+ /* Version strings must fit in the super-blocks */
+ BUILD_BUG_ON(strlen(RRDENG_DF_VER) > RRDENG_VER_SZ);
+ BUILD_BUG_ON(strlen(RRDENG_JF_VER) > RRDENG_VER_SZ);
+
+ /* Data file super-block cannot be larger than RRDENG_BLOCK_SIZE */
+ BUILD_BUG_ON(RRDENG_DF_SB_PADDING_SZ < 0);
+
+ BUILD_BUG_ON(sizeof(uuid_t) != UUID_SZ); /* check UUID size */
+
+ /* page count must fit in 8 bits */
+ BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255);
+
+ /* extent cache count must fit in 32 bits */
+ BUILD_BUG_ON(MAX_CACHED_EXTENTS > 32);
+
+ /* page info scratch space must be able to hold 2 32-bit integers */
+ BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t));
+}
+
+/* always inserts into tail */
+static inline void xt_cache_replaceQ_insert(struct rrdengine_worker_config* wc,
+ struct extent_cache_element *xt_cache_elem)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+
+ xt_cache_elem->prev = NULL;
+ xt_cache_elem->next = NULL;
+
+ if (likely(NULL != xt_cache->replaceQ_tail)) {
+ xt_cache_elem->prev = xt_cache->replaceQ_tail;
+ xt_cache->replaceQ_tail->next = xt_cache_elem;
+ }
+ if (unlikely(NULL == xt_cache->replaceQ_head)) {
+ xt_cache->replaceQ_head = xt_cache_elem;
+ }
+ xt_cache->replaceQ_tail = xt_cache_elem;
+}
+
+static inline void xt_cache_replaceQ_delete(struct rrdengine_worker_config* wc,
+ struct extent_cache_element *xt_cache_elem)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *prev, *next;
+
+ prev = xt_cache_elem->prev;
+ next = xt_cache_elem->next;
+
+ if (likely(NULL != prev)) {
+ prev->next = next;
+ }
+ if (likely(NULL != next)) {
+ next->prev = prev;
+ }
+ if (unlikely(xt_cache_elem == xt_cache->replaceQ_head)) {
+ xt_cache->replaceQ_head = next;
+ }
+ if (unlikely(xt_cache_elem == xt_cache->replaceQ_tail)) {
+ xt_cache->replaceQ_tail = prev;
+ }
+ xt_cache_elem->prev = xt_cache_elem->next = NULL;
+}
+
+static inline void xt_cache_replaceQ_set_hot(struct rrdengine_worker_config* wc,
+ struct extent_cache_element *xt_cache_elem)
+{
+ xt_cache_replaceQ_delete(wc, xt_cache_elem);
+ xt_cache_replaceQ_insert(wc, xt_cache_elem);
+}
+
+/* Returns the index of the cached extent if it was successfully inserted in the extent cache, otherwise -1 */
+static int try_insert_into_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *xt_cache_elem;
+ unsigned idx;
+ int ret;
+
+ ret = find_first_zero(xt_cache->allocation_bitmap);
+ if (-1 == ret || ret >= MAX_CACHED_EXTENTS) {
+ for (xt_cache_elem = xt_cache->replaceQ_head ; NULL != xt_cache_elem ; xt_cache_elem = xt_cache_elem->next) {
+ idx = xt_cache_elem - xt_cache->extent_array;
+ if (!check_bit(xt_cache->inflight_bitmap, idx)) {
+ xt_cache_replaceQ_delete(wc, xt_cache_elem);
+ break;
+ }
+ }
+ if (NULL == xt_cache_elem)
+ return -1;
+ } else {
+ idx = (unsigned)ret;
+ xt_cache_elem = &xt_cache->extent_array[idx];
+ }
+ xt_cache_elem->extent = extent;
+ xt_cache_elem->fileno = extent->datafile->fileno;
+ xt_cache_elem->inflight_io_descr = NULL;
+ xt_cache_replaceQ_insert(wc, xt_cache_elem);
+ modify_bit(&xt_cache->allocation_bitmap, idx, 1);
+
+ return (int)idx;
+}
+
+/**
+ * Returns 0 if the cached extent was found in the extent cache, 1 otherwise.
+ * Sets *idx to point to the position of the extent inside the cache.
+ **/
+static uint8_t lookup_in_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent, unsigned *idx)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *xt_cache_elem;
+ unsigned i;
+
+ for (i = 0 ; i < MAX_CACHED_EXTENTS ; ++i) {
+ xt_cache_elem = &xt_cache->extent_array[i];
+ if (check_bit(xt_cache->allocation_bitmap, i) && xt_cache_elem->extent == extent &&
+ xt_cache_elem->fileno == extent->datafile->fileno) {
+ *idx = i;
+ return 0;
+ }
+ }
+ return 1;
+}
+
+#if 0 /* disabled code */
+static void delete_from_xt_cache(struct rrdengine_worker_config* wc, unsigned idx)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *xt_cache_elem;
+
+ xt_cache_elem = &xt_cache->extent_array[idx];
+ xt_cache_replaceQ_delete(wc, xt_cache_elem);
+ xt_cache_elem->extent = NULL;
+ modify_bit(&wc->xt_cache.allocation_bitmap, idx, 0); /* invalidate it */
+ modify_bit(&wc->xt_cache.inflight_bitmap, idx, 0); /* not in-flight anymore */
+}
+#endif
+
+void enqueue_inflight_read_to_xt_cache(struct rrdengine_worker_config* wc, unsigned idx,
+ struct extent_io_descriptor *xt_io_descr)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *xt_cache_elem;
+ struct extent_io_descriptor *old_next;
+
+ xt_cache_elem = &xt_cache->extent_array[idx];
+ old_next = xt_cache_elem->inflight_io_descr->next;
+ xt_cache_elem->inflight_io_descr->next = xt_io_descr;
+ xt_io_descr->next = old_next;
+}
+
+void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, struct extent_io_descriptor *xt_io_descr)
+{
+ unsigned i, j, page_offset;
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+ void *page;
+ struct extent_info *extent = xt_io_descr->descr_array[0]->extent;
+
+ for (i = 0 ; i < xt_io_descr->descr_count; ++i) {
+ page = dbengine_page_alloc();
+ descr = xt_io_descr->descr_array[i];
+ for (j = 0, page_offset = 0 ; j < extent->number_of_pages ; ++j) {
+ /* care, we don't hold the descriptor mutex */
+ if (!uuid_compare(*extent->pages[j]->id, *descr->id) &&
+ extent->pages[j]->page_length == descr->page_length &&
+ extent->pages[j]->start_time_ut == descr->start_time_ut &&
+ extent->pages[j]->end_time_ut == descr->end_time_ut) {
+ break;
+ }
+ page_offset += extent->pages[j]->page_length;
+
+ }
+ /* care, we don't hold the descriptor mutex */
+ (void) memcpy(page, wc->xt_cache.extent_array[idx].pages + page_offset, descr->page_length);
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ pg_cache_descr->page = page;
+ pg_cache_descr->flags |= RRD_PAGE_POPULATED;
+ pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ pg_cache_replaceQ_insert(ctx, descr);
+ if (xt_io_descr->release_descr) {
+ pg_cache_put(ctx, descr);
+ } else {
+ debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
+ pg_cache_wake_up_waiters(ctx, descr);
+ }
+ }
+ if (xt_io_descr->completion)
+ completion_mark_complete(xt_io_descr->completion);
+ freez(xt_io_descr);
+}
+
+static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type) {
+ switch(type) {
+ case PAGE_METRICS: {
+ storage_number n = pack_storage_number(NAN, SN_FLAG_NONE);
+ storage_number *array = (storage_number *)page;
+ size_t slots = page_length / sizeof(n);
+ for(size_t i = 0; i < slots ; i++)
+ array[i] = n;
+ }
+ break;
+
+ case PAGE_TIER: {
+ storage_number_tier1_t n = {
+ .min_value = NAN,
+ .max_value = NAN,
+ .sum_value = NAN,
+ .count = 1,
+ .anomaly_count = 0,
+ };
+ storage_number_tier1_t *array = (storage_number_tier1_t *)page;
+ size_t slots = page_length / sizeof(n);
+ for(size_t i = 0; i < slots ; i++)
+ array[i] = n;
+ }
+ break;
+
+ default: {
+ static bool logged = false;
+ if(!logged) {
+ error("DBENGINE: cannot fill page with nulls on unknown page type id %d", type);
+ logged = true;
+ }
+ memset(page, 0, page_length);
+ }
+ }
+}
+
+struct rrdeng_page_descr *get_descriptor(struct pg_cache_page_index *page_index, time_t start_time_s)
+{
+ uv_rwlock_rdlock(&page_index->lock);
+ Pvoid_t *PValue = JudyLGet(page_index->JudyL_array, start_time_s, PJE0);
+ struct rrdeng_page_descr *descr = unlikely(NULL == PValue) ? NULL : *PValue;
+ uv_rwlock_rdunlock(&page_index->lock);
+ return descr;
+};
+
+static void do_extent_processing (struct rrdengine_worker_config *wc, struct extent_io_descriptor *xt_io_descr, bool read_failed)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+ int ret;
+ unsigned i, j, count;
+ void *page, *uncompressed_buf = NULL;
+ uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length = 0;
+ uint8_t have_read_error = 0;
+ /* persistent structures */
+ struct rrdeng_df_extent_header *header;
+ struct rrdeng_df_extent_trailer *trailer;
+ uLong crc;
+
+ header = xt_io_descr->buf;
+ payload_length = header->payload_length;
+ count = header->number_of_pages;
+ payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count;
+ trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer);
+
+ if (unlikely(read_failed)) {
+ struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
+ have_read_error = 1;
+ error("%s: uv_fs_read - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__, xt_io_descr->pos,
+ xt_io_descr->bytes, datafile->tier, datafile->fileno);
+ goto after_crc_check;
+ }
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer));
+ ret = crc32cmp(trailer->checksum, crc);
+#ifdef NETDATA_INTERNAL_CHECKS
+ {
+ struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+ debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__,
+ xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED");
+ }
+#endif
+ if (unlikely(ret)) {
+ struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
+ have_read_error = 1;
+ error("%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: FAILED", __func__,
+ xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
+ }
+
+after_crc_check:
+ if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) {
+ uncompressed_payload_length = 0;
+ for (i = 0 ; i < count ; ++i) {
+ uncompressed_payload_length += header->descr[i].page_length;
+ }
+ uncompressed_buf = mallocz(uncompressed_payload_length);
+ ret = LZ4_decompress_safe(xt_io_descr->buf + payload_offset, uncompressed_buf,
+ payload_length, uncompressed_payload_length);
+ ctx->stats.before_decompress_bytes += payload_length;
+ ctx->stats.after_decompress_bytes += ret;
+ debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret);
+ /* care, we don't hold the descriptor mutex */
+ }
+ {
+ uint8_t xt_is_cached = 0;
+ unsigned xt_idx;
+ struct extent_info *extent = xt_io_descr->descr_array[0]->extent;
+
+ xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx);
+ if (xt_is_cached && check_bit(wc->xt_cache.inflight_bitmap, xt_idx)) {
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *xt_cache_elem = &xt_cache->extent_array[xt_idx];
+ struct extent_io_descriptor *curr, *next;
+
+ if (have_read_error) {
+ memset(xt_cache_elem->pages, 0, sizeof(xt_cache_elem->pages));
+ } else if (RRD_NO_COMPRESSION == header->compression_algorithm) {
+ (void)memcpy(xt_cache_elem->pages, xt_io_descr->buf + payload_offset, payload_length);
+ } else {
+ (void)memcpy(xt_cache_elem->pages, uncompressed_buf, uncompressed_payload_length);
+ }
+ /* complete all connected in-flight read requests */
+ for (curr = xt_cache_elem->inflight_io_descr->next ; curr ; curr = next) {
+ next = curr->next;
+ read_cached_extent_cb(wc, xt_idx, curr);
+ }
+ xt_cache_elem->inflight_io_descr = NULL;
+ modify_bit(&xt_cache->inflight_bitmap, xt_idx, 0); /* not in-flight anymore */
+ }
+ }
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, xt_io_descr->descr_array[0]->id, sizeof(uuid_t));
+ struct pg_cache_page_index *page_index = likely( NULL != PValue) ? *PValue : NULL;
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+
+
+ for (i = 0, page_offset = 0; i < count; page_offset += header->descr[i++].page_length) {
+ uint8_t is_prefetched_page;
+ descr = NULL;
+ for (j = 0 ; j < xt_io_descr->descr_count; ++j) {
+ struct rrdeng_page_descr descrj;
+
+ descrj = xt_io_descr->descr_read_array[j];
+ /* care, we don't hold the descriptor mutex */
+ if (!uuid_compare(*(uuid_t *) header->descr[i].uuid, *descrj.id) &&
+ header->descr[i].page_length == descrj.page_length &&
+ header->descr[i].start_time_ut == descrj.start_time_ut &&
+ header->descr[i].end_time_ut == descrj.end_time_ut) {
+ //descr = descrj;
+ descr = get_descriptor(page_index, (time_t) (descrj.start_time_ut / USEC_PER_SEC));
+ if (unlikely(!descr)) {
+ error_limit_static_thread_var(erl, 1, 0);
+ error_limit(&erl, "%s: Required descriptor is not in the page index anymore", __FUNCTION__);
+ }
+ break;
+ }
+ }
+ is_prefetched_page = 0;
+ if (!descr) { /* This extent page has not been requested. Try populating it for locality (best effort). */
+ descr = pg_cache_lookup_unpopulated_and_lock(ctx, (uuid_t *)header->descr[i].uuid,
+ header->descr[i].start_time_ut);
+ if (!descr)
+ continue; /* Failed to reserve a suitable page */
+ is_prefetched_page = 1;
+ }
+ page = dbengine_page_alloc();
+
+ /* care, we don't hold the descriptor mutex */
+ if (have_read_error) {
+ fill_page_with_nulls(page, descr->page_length, descr->type);
+ } else if (RRD_NO_COMPRESSION == header->compression_algorithm) {
+ (void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length);
+ } else {
+ (void) memcpy(page, uncompressed_buf + page_offset, descr->page_length);
+ }
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ pg_cache_descr->page = page;
+ pg_cache_descr->flags |= RRD_PAGE_POPULATED;
+ pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ pg_cache_replaceQ_insert(ctx, descr);
+ if (xt_io_descr->release_descr || is_prefetched_page) {
+ pg_cache_put(ctx, descr);
+ } else {
+ debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
+ pg_cache_wake_up_waiters(ctx, descr);
+ }
+ }
+ if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) {
+ freez(uncompressed_buf);
+ }
+ if (xt_io_descr->completion)
+ completion_mark_complete(xt_io_descr->completion);
+}
+
+static void read_extent_cb(uv_fs_t *req)
+{
+ struct rrdengine_worker_config *wc = req->loop->data;
+ struct extent_io_descriptor *xt_io_descr;
+
+ xt_io_descr = req->data;
+ do_extent_processing(wc, xt_io_descr, req->result < 0);
+ uv_fs_req_cleanup(req);
+ posix_memfree(xt_io_descr->buf);
+ freez(xt_io_descr);
+}
+
+static void read_mmap_extent_cb(uv_work_t *req, int status __maybe_unused)
+{
+ struct rrdengine_worker_config *wc = req->loop->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct extent_io_descriptor *xt_io_descr;
+ xt_io_descr = req->data;
+
+ if (likely(xt_io_descr->map_base)) {
+ do_extent_processing(wc, xt_io_descr, false);
+ munmap(xt_io_descr->map_base, xt_io_descr->map_length);
+ freez(xt_io_descr);
+ return;
+ }
+
+ // MMAP failed, so do uv_fs_read
+ int ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(xt_io_descr->bytes));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ unsigned real_io_size = ALIGN_BYTES_CEILING( xt_io_descr->bytes);
+ xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
+ xt_io_descr->req.data = xt_io_descr;
+ ret = uv_fs_read(req->loop, &xt_io_descr->req, xt_io_descr->file, &xt_io_descr->iov, 1, (unsigned) xt_io_descr->pos, read_extent_cb);
+ fatal_assert(-1 != ret);
+ ctx->stats.io_read_bytes += real_io_size;
+ ctx->stats.io_read_extent_bytes += real_io_size;
+}
+
+static void do_mmap_read_extent(uv_work_t *req)
+{
+ struct extent_io_descriptor *xt_io_descr = (struct extent_io_descriptor * )req->data;
+ struct rrdengine_worker_config *wc = req->loop->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+
+ off_t map_start = ALIGN_BYTES_FLOOR(xt_io_descr->pos);
+ size_t length = ALIGN_BYTES_CEILING(xt_io_descr->pos + xt_io_descr->bytes) - map_start;
+ unsigned real_io_size = xt_io_descr->bytes;
+
+ void *data = mmap(NULL, length, PROT_READ, MAP_SHARED, xt_io_descr->file, map_start);
+ if (likely(data != MAP_FAILED)) {
+ xt_io_descr->map_base = data;
+ xt_io_descr->map_length = length;
+ xt_io_descr->buf = data + (xt_io_descr->pos - map_start);
+ ctx->stats.io_read_bytes += real_io_size;
+ ctx->stats.io_read_extent_bytes += real_io_size;
+ }
+}
+
+static void do_read_extent(struct rrdengine_worker_config* wc,
+ struct rrdeng_page_descr **descr,
+ unsigned count,
+ uint8_t release_descr)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache_descr *pg_cache_descr;
+ int ret;
+ unsigned i, size_bytes, pos;
+ struct extent_io_descriptor *xt_io_descr;
+ struct rrdengine_datafile *datafile;
+ struct extent_info *extent = descr[0]->extent;
+ uint8_t xt_is_cached = 0, xt_is_inflight = 0;
+ unsigned xt_idx;
+
+ datafile = extent->datafile;
+ pos = extent->offset;
+ size_bytes = extent->size;
+
+ xt_io_descr = callocz(1, sizeof(*xt_io_descr));
+ for (i = 0 ; i < count; ++i) {
+ rrdeng_page_descr_mutex_lock(ctx, descr[i]);
+ pg_cache_descr = descr[i]->pg_cache_descr;
+ pg_cache_descr->flags |= RRD_PAGE_READ_PENDING;
+ rrdeng_page_descr_mutex_unlock(ctx, descr[i]);
+ xt_io_descr->descr_array[i] = descr[i];
+ xt_io_descr->descr_read_array[i] = *(descr[i]);
+ }
+ xt_io_descr->descr_count = count;
+ xt_io_descr->file = datafile->file;
+ xt_io_descr->bytes = size_bytes;
+ xt_io_descr->pos = pos;
+ xt_io_descr->req_worker.data = xt_io_descr;
+ xt_io_descr->completion = NULL;
+ xt_io_descr->release_descr = release_descr;
+ xt_io_descr->buf = NULL;
+
+ xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx);
+ if (xt_is_cached) {
+ xt_cache_replaceQ_set_hot(wc, &wc->xt_cache.extent_array[xt_idx]);
+ xt_is_inflight = check_bit(wc->xt_cache.inflight_bitmap, xt_idx);
+ if (xt_is_inflight) {
+ enqueue_inflight_read_to_xt_cache(wc, xt_idx, xt_io_descr);
+ return;
+ }
+ return read_cached_extent_cb(wc, xt_idx, xt_io_descr);
+ } else {
+ ret = try_insert_into_xt_cache(wc, extent);
+ if (-1 != ret) {
+ xt_idx = (unsigned)ret;
+ modify_bit(&wc->xt_cache.inflight_bitmap, xt_idx, 1);
+ wc->xt_cache.extent_array[xt_idx].inflight_io_descr = xt_io_descr;
+ }
+ }
+
+ ret = uv_queue_work(wc->loop, &xt_io_descr->req_worker, do_mmap_read_extent, read_mmap_extent_cb);
+ fatal_assert(-1 != ret);
+
+ ++ctx->stats.io_read_requests;
+ ++ctx->stats.io_read_extents;
+ ctx->stats.pg_cache_backfills += count;
+}
+
+static void commit_data_extent(struct rrdengine_worker_config* wc, struct extent_io_descriptor *xt_io_descr)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ unsigned count, payload_length, descr_size, size_bytes;
+ void *buf;
+ /* persistent structures */
+ struct rrdeng_df_extent_header *df_header;
+ struct rrdeng_jf_transaction_header *jf_header;
+ struct rrdeng_jf_store_data *jf_metric_data;
+ struct rrdeng_jf_transaction_trailer *jf_trailer;
+ uLong crc;
+
+ df_header = xt_io_descr->buf;
+ count = df_header->number_of_pages;
+ descr_size = sizeof(*jf_metric_data->descr) * count;
+ payload_length = sizeof(*jf_metric_data) + descr_size;
+ size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
+
+ buf = wal_get_transaction_buffer(wc, size_bytes);
+
+ jf_header = buf;
+ jf_header->type = STORE_DATA;
+ jf_header->reserved = 0;
+ jf_header->id = ctx->commit_log.transaction_id++;
+ jf_header->payload_length = payload_length;
+
+ jf_metric_data = buf + sizeof(*jf_header);
+ jf_metric_data->extent_offset = xt_io_descr->pos;
+ jf_metric_data->extent_size = xt_io_descr->bytes;
+ jf_metric_data->number_of_pages = count;
+ memcpy(jf_metric_data->descr, df_header->descr, descr_size);
+
+ jf_trailer = buf + sizeof(*jf_header) + payload_length;
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, buf, sizeof(*jf_header) + payload_length);
+ crc32set(jf_trailer->checksum, crc);
+}
+
+static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t type, void *data)
+{
+ switch (type) {
+ case STORE_DATA:
+ commit_data_extent(wc, (struct extent_io_descriptor *)data);
+ break;
+ default:
+ fatal_assert(type == STORE_DATA);
+ break;
+ }
+}
+
+static void after_invalidate_oldest_committed(struct rrdengine_worker_config* wc)
+{
+ int error;
+
+ error = uv_thread_join(wc->now_invalidating_dirty_pages);
+ if (error) {
+ error("uv_thread_join(): %s", uv_strerror(error));
+ }
+ freez(wc->now_invalidating_dirty_pages);
+ wc->now_invalidating_dirty_pages = NULL;
+ wc->cleanup_thread_invalidating_dirty_pages = 0;
+}
+
+static void invalidate_oldest_committed(void *arg)
+{
+ struct rrdengine_instance *ctx = arg;
+ struct rrdengine_worker_config *wc = &ctx->worker_config;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ int ret;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+ Pvoid_t *PValue;
+ Word_t Index;
+ unsigned nr_committed_pages;
+
+ do {
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ for (Index = 0,
+ PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue;
+
+ descr != NULL;
+
+ PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue) {
+ fatal_assert(0 != descr->page_length);
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING) && pg_cache_try_get_unsafe(descr, 1)) {
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0);
+ fatal_assert(1 == ret);
+ break;
+ }
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ }
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+
+ if (!descr) {
+ info("Failed to invalidate any dirty pages to relieve page cache pressure.");
+
+ goto out;
+ }
+ pg_cache_punch_hole(ctx, descr, 1, 1, NULL);
+
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ nr_committed_pages = --pg_cache->committed_page_index.nr_committed_pages;
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+ rrd_stat_atomic_add(&ctx->stats.flushing_pressure_page_deletions, 1);
+ rrd_stat_atomic_add(&global_flushing_pressure_page_deletions, 1);
+
+ } while (nr_committed_pages >= pg_cache_committed_hard_limit(ctx));
+out:
+ wc->cleanup_thread_invalidating_dirty_pages = 1;
+ /* wake up event loop */
+ fatal_assert(0 == uv_async_send(&wc->async));
+}
+
+void rrdeng_invalidate_oldest_committed(struct rrdengine_worker_config* wc)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned nr_committed_pages;
+ int error;
+
+ if (unlikely(ctx->quiesce != NO_QUIESCE)) /* Shutting down */
+ return;
+
+ uv_rwlock_rdlock(&pg_cache->committed_page_index.lock);
+ nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
+ uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock);
+
+ if (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) {
+ /* delete the oldest page in memory */
+ if (wc->now_invalidating_dirty_pages) {
+ /* already deleting a page */
+ return;
+ }
+ errno = 0;
+ error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". "
+ "Metric data are being deleted, please reduce disk load or use a faster disk.", ctx->dbfiles_path);
+
+ wc->now_invalidating_dirty_pages = mallocz(sizeof(*wc->now_invalidating_dirty_pages));
+ wc->cleanup_thread_invalidating_dirty_pages = 0;
+
+ error = uv_thread_create(wc->now_invalidating_dirty_pages, invalidate_oldest_committed, ctx);
+ if (error) {
+ error("uv_thread_create(): %s", uv_strerror(error));
+ freez(wc->now_invalidating_dirty_pages);
+ wc->now_invalidating_dirty_pages = NULL;
+ }
+ }
+}
+
+void flush_pages_cb(uv_fs_t* req)
+{
+ struct rrdengine_worker_config* wc = req->loop->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct extent_io_descriptor *xt_io_descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+ unsigned i, count;
+
+ xt_io_descr = req->data;
+ if (req->result < 0) {
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
+ error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
+ }
+#ifdef NETDATA_INTERNAL_CHECKS
+ {
+ struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+ debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.",
+ __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
+ }
+#endif
+ count = xt_io_descr->descr_count;
+ for (i = 0 ; i < count ; ++i) {
+ /* care, we don't hold the descriptor mutex */
+ descr = xt_io_descr->descr_array[i];
+
+ pg_cache_replaceQ_insert(ctx, descr);
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ pg_cache_descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING);
+ /* wake up waiters, care no reference being held */
+ pg_cache_wake_up_waiters_unsafe(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ }
+ if (xt_io_descr->completion)
+ completion_mark_complete(xt_io_descr->completion);
+ uv_fs_req_cleanup(req);
+ posix_memfree(xt_io_descr->buf);
+ freez(xt_io_descr);
+
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ pg_cache->committed_page_index.nr_committed_pages -= count;
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+ wc->inflight_dirty_pages -= count;
+}
+
+/*
+ * completion must be NULL or valid.
+ * Returns 0 when no flushing can take place.
+ * Returns datafile bytes to be written on successful flushing initiation.
+ */
+static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct completion *completion)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ int ret;
+ int compressed_size, max_compressed_size = 0;
+ unsigned i, count, size_bytes, pos, real_io_size;
+ uint32_t uncompressed_payload_length, payload_offset;
+ struct rrdeng_page_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT];
+ struct page_cache_descr *pg_cache_descr;
+ struct extent_io_descriptor *xt_io_descr;
+ void *compressed_buf = NULL;
+ Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
+ Pvoid_t *PValue;
+ Word_t Index;
+ uint8_t compression_algorithm = ctx->global_compress_alg;
+ struct extent_info *extent;
+ struct rrdengine_datafile *datafile;
+ /* persistent structures */
+ struct rrdeng_df_extent_header *header;
+ struct rrdeng_df_extent_trailer *trailer;
+ uLong crc;
+
+ if (force) {
+ debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure.");
+ }
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ for (Index = 0, count = 0, uncompressed_payload_length = 0,
+ PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue ;
+
+ descr != NULL && count != rrdeng_pages_per_extent;
+
+ PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue) {
+ uint8_t page_write_pending;
+
+ fatal_assert(0 != descr->page_length);
+ page_write_pending = 0;
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING)) {
+ page_write_pending = 1;
+ /* care, no reference being held */
+ pg_cache_descr->flags |= RRD_PAGE_WRITE_PENDING;
+ uncompressed_payload_length += descr->page_length;
+ descr_commit_idx_array[count] = Index;
+ eligible_pages[count++] = descr;
+ }
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ if (page_write_pending) {
+ ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0);
+ fatal_assert(1 == ret);
+ }
+ }
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+
+ if (!count) {
+ debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__);
+ if (completion)
+ completion_mark_complete(completion);
+ return 0;
+ }
+ wc->inflight_dirty_pages += count;
+
+ xt_io_descr = mallocz(sizeof(*xt_io_descr));
+ payload_offset = sizeof(*header) + count * sizeof(header->descr[0]);
+ switch (compression_algorithm) {
+ case RRD_NO_COMPRESSION:
+ size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer);
+ break;
+ default: /* Compress */
+ fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE);
+ max_compressed_size = LZ4_compressBound(uncompressed_payload_length);
+ compressed_buf = mallocz(max_compressed_size);
+ size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer);
+ break;
+ }
+ ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ /* freez(xt_io_descr);*/
+ }
+ memset(xt_io_descr->buf, 0, ALIGN_BYTES_CEILING(size_bytes));
+ (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count);
+ xt_io_descr->descr_count = count;
+
+ pos = 0;
+ header = xt_io_descr->buf;
+ header->compression_algorithm = compression_algorithm;
+ header->number_of_pages = count;
+ pos += sizeof(*header);
+
+ extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0]));
+ datafile = ctx->datafiles.last; /* TODO: check for exceeded size quota */
+ extent->offset = datafile->pos;
+ extent->number_of_pages = count;
+ extent->datafile = datafile;
+ extent->next = NULL;
+
+ for (i = 0 ; i < count ; ++i) {
+ /* This is here for performance reasons */
+ xt_io_descr->descr_commit_idx_array[i] = descr_commit_idx_array[i];
+
+ descr = xt_io_descr->descr_array[i];
+ header->descr[i].type = descr->type;
+ uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id);
+ header->descr[i].page_length = descr->page_length;
+ header->descr[i].start_time_ut = descr->start_time_ut;
+ header->descr[i].end_time_ut = descr->end_time_ut;
+ pos += sizeof(header->descr[i]);
+ }
+ for (i = 0 ; i < count ; ++i) {
+ descr = xt_io_descr->descr_array[i];
+ /* care, we don't hold the descriptor mutex */
+ (void) memcpy(xt_io_descr->buf + pos, descr->pg_cache_descr->page, descr->page_length);
+ descr->extent = extent;
+ extent->pages[i] = descr;
+
+ pos += descr->page_length;
+ }
+ df_extent_insert(extent);
+
+ switch (compression_algorithm) {
+ case RRD_NO_COMPRESSION:
+ header->payload_length = uncompressed_payload_length;
+ break;
+ default: /* Compress */
+ compressed_size = LZ4_compress_default(xt_io_descr->buf + payload_offset, compressed_buf,
+ uncompressed_payload_length, max_compressed_size);
+ ctx->stats.before_compress_bytes += uncompressed_payload_length;
+ ctx->stats.after_compress_bytes += compressed_size;
+ debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size);
+ (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size);
+ freez(compressed_buf);
+ size_bytes = payload_offset + compressed_size + sizeof(*trailer);
+ header->payload_length = compressed_size;
+ break;
+ }
+ extent->size = size_bytes;
+ xt_io_descr->bytes = size_bytes;
+ xt_io_descr->pos = datafile->pos;
+ xt_io_descr->req.data = xt_io_descr;
+ xt_io_descr->completion = completion;
+
+ trailer = xt_io_descr->buf + size_bytes - sizeof(*trailer);
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, xt_io_descr->buf, size_bytes - sizeof(*trailer));
+ crc32set(trailer->checksum, crc);
+
+ real_io_size = ALIGN_BYTES_CEILING(size_bytes);
+ xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
+ ret = uv_fs_write(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, datafile->pos, flush_pages_cb);
+ fatal_assert(-1 != ret);
+ ctx->stats.io_write_bytes += real_io_size;
+ ++ctx->stats.io_write_requests;
+ ctx->stats.io_write_extent_bytes += real_io_size;
+ ++ctx->stats.io_write_extents;
+ do_commit_transaction(wc, STORE_DATA, xt_io_descr);
+ datafile->pos += ALIGN_BYTES_CEILING(size_bytes);
+ ctx->disk_space += ALIGN_BYTES_CEILING(size_bytes);
+ rrdeng_test_quota(wc);
+
+ return ALIGN_BYTES_CEILING(size_bytes);
+}
+
+static void after_delete_old_data(struct rrdengine_worker_config* wc)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct rrdengine_datafile *datafile;
+ struct rrdengine_journalfile *journalfile;
+ unsigned deleted_bytes, journalfile_bytes, datafile_bytes;
+ int ret, error;
+ char path[RRDENG_PATH_MAX];
+
+ datafile = ctx->datafiles.first;
+ journalfile = datafile->journalfile;
+ datafile_bytes = datafile->pos;
+ journalfile_bytes = journalfile->pos;
+ deleted_bytes = 0;
+
+ info("Deleting data and journal file pair.");
+ datafile_list_delete(ctx, datafile);
+ ret = destroy_journal_file(journalfile, datafile);
+ if (!ret) {
+ generate_journalfilepath(datafile, path, sizeof(path));
+ info("Deleted journal file \"%s\".", path);
+ deleted_bytes += journalfile_bytes;
+ }
+ ret = destroy_data_file(datafile);
+ if (!ret) {
+ generate_datafilepath(datafile, path, sizeof(path));
+ info("Deleted data file \"%s\".", path);
+ deleted_bytes += datafile_bytes;
+ }
+ freez(journalfile);
+ freez(datafile);
+
+ ctx->disk_space -= deleted_bytes;
+ info("Reclaimed %u bytes of disk space.", deleted_bytes);
+
+ error = uv_thread_join(wc->now_deleting_files);
+ if (error) {
+ error("uv_thread_join(): %s", uv_strerror(error));
+ }
+ freez(wc->now_deleting_files);
+ /* unfreeze command processing */
+ wc->now_deleting_files = NULL;
+
+ wc->cleanup_thread_deleting_files = 0;
+ rrdcontext_db_rotation();
+
+ /* interrupt event loop */
+ uv_stop(wc->loop);
+}
+
+static void delete_old_data(void *arg)
+{
+ struct rrdengine_instance *ctx = arg;
+ struct rrdengine_worker_config* wc = &ctx->worker_config;
+ struct rrdengine_datafile *datafile;
+ struct extent_info *extent, *next;
+ struct rrdeng_page_descr *descr;
+ unsigned count, i;
+ uint8_t can_delete_metric;
+ uuid_t metric_id;
+
+ /* Safe to use since it will be deleted after we are done */
+ datafile = ctx->datafiles.first;
+
+ for (extent = datafile->extents.first ; extent != NULL ; extent = next) {
+ count = extent->number_of_pages;
+ for (i = 0 ; i < count ; ++i) {
+ descr = extent->pages[i];
+ can_delete_metric = pg_cache_punch_hole(ctx, descr, 0, 0, &metric_id);
+ if (unlikely(can_delete_metric)) {
+ /*
+ * If the metric is empty, has no active writers and if the metadata log has been initialized then
+ * attempt to delete the corresponding netdata dimension.
+ */
+ metaqueue_delete_dimension_uuid(&metric_id);
+ }
+ }
+ next = extent->next;
+ freez(extent);
+ }
+ wc->cleanup_thread_deleting_files = 1;
+ /* wake up event loop */
+ fatal_assert(0 == uv_async_send(&wc->async));
+}
+
+void rrdeng_test_quota(struct rrdengine_worker_config* wc)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct rrdengine_datafile *datafile;
+ unsigned current_size, target_size;
+ uint8_t out_of_space, only_one_datafile;
+ int ret, error;
+
+ out_of_space = 0;
+ /* Do not allow the pinned pages to exceed the disk space quota to avoid deadlocks */
+ if (unlikely(ctx->disk_space > MAX(ctx->max_disk_space, 2 * ctx->metric_API_max_producers * RRDENG_BLOCK_SIZE))) {
+ out_of_space = 1;
+ }
+ datafile = ctx->datafiles.last;
+ current_size = datafile->pos;
+ target_size = ctx->max_disk_space / TARGET_DATAFILES;
+ target_size = MIN(target_size, MAX_DATAFILE_SIZE);
+ target_size = MAX(target_size, MIN_DATAFILE_SIZE);
+ only_one_datafile = (datafile == ctx->datafiles.first) ? 1 : 0;
+ if (unlikely(current_size >= target_size || (out_of_space && only_one_datafile))) {
+ /* Finalize data and journal file and create a new pair */
+ wal_flush_transaction_buffer(wc);
+ ret = create_new_datafile_pair(ctx, 1, ctx->last_fileno + 1);
+ if (likely(!ret)) {
+ ++ctx->last_fileno;
+ }
+ }
+ if (unlikely(out_of_space && NO_QUIESCE == ctx->quiesce)) {
+ /* delete old data */
+ if (wc->now_deleting_files) {
+ /* already deleting data */
+ return;
+ }
+ if (NULL == ctx->datafiles.first->next) {
+ error("Cannot delete data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\""
+ " to reclaim space, there are no other file pairs left.",
+ ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
+ return;
+ }
+ info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
+ ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
+ wc->now_deleting_files = mallocz(sizeof(*wc->now_deleting_files));
+ wc->cleanup_thread_deleting_files = 0;
+
+ error = uv_thread_create(wc->now_deleting_files, delete_old_data, ctx);
+ if (error) {
+ error("uv_thread_create(): %s", uv_strerror(error));
+ freez(wc->now_deleting_files);
+ wc->now_deleting_files = NULL;
+ }
+ }
+}
+
+static inline int rrdeng_threads_alive(struct rrdengine_worker_config* wc)
+{
+ if (wc->now_invalidating_dirty_pages || wc->now_deleting_files) {
+ return 1;
+ }
+ return 0;
+}
+
+static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+
+ if (unlikely(wc->cleanup_thread_invalidating_dirty_pages)) {
+ after_invalidate_oldest_committed(wc);
+ }
+ if (unlikely(wc->cleanup_thread_deleting_files)) {
+ after_delete_old_data(wc);
+ }
+ if (unlikely(SET_QUIESCE == ctx->quiesce && !rrdeng_threads_alive(wc))) {
+ ctx->quiesce = QUIESCED;
+ completion_mark_complete(&ctx->rrdengine_completion);
+ }
+}
+
+/* return 0 on success */
+int init_rrd_files(struct rrdengine_instance *ctx)
+{
+ int ret = init_data_files(ctx);
+
+ BUFFER *wb = buffer_create(1000);
+ size_t all_errors = 0;
+ usec_t now = now_realtime_usec();
+
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter) {
+ buffer_sprintf(wb, "%s%zu pages had start time > end time (latest: %llu secs ago)"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter
+ , (now - ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut) / USEC_PER_SEC
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter;
+ }
+
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter) {
+ buffer_sprintf(wb, "%s%zu pages had start time = end time with more than 1 entries (latest: %llu secs ago)"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter
+ , (now - ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut) / USEC_PER_SEC
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter;
+ }
+
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter) {
+ buffer_sprintf(wb, "%s%zu pages had zero points (latest: %llu secs ago)"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter
+ , (now - ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut) / USEC_PER_SEC
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter;
+ }
+
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter) {
+ buffer_sprintf(wb, "%s%zu pages had update every == 0 with entries > 1 (latest: %llu secs ago)"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter
+ , (now - ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut) / USEC_PER_SEC
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter;
+ }
+
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter) {
+ buffer_sprintf(wb, "%s%zu pages had a different number of points compared to their timestamps (latest: %llu secs ago; these page have been loaded)"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter
+ , (now - ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut) / USEC_PER_SEC
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter;
+ }
+
+ if(ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter) {
+ buffer_sprintf(wb, "%s%zu extents have been dropped because they didn't have any valid pages"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter;
+ }
+
+ if(all_errors)
+ info("DBENGINE: tier %d: %s", ctx->tier, buffer_tostring(wb));
+
+ buffer_free(wb);
+ return ret;
+}
+
+void finalize_rrd_files(struct rrdengine_instance *ctx)
+{
+ return finalize_data_files(ctx);
+}
+
+void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc)
+{
+ wc->cmd_queue.head = wc->cmd_queue.tail = 0;
+ wc->queue_size = 0;
+ fatal_assert(0 == uv_cond_init(&wc->cmd_cond));
+ fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
+}
+
+void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd)
+{
+ unsigned queue_size;
+
+ /* wait for free space in queue */
+ uv_mutex_lock(&wc->cmd_mutex);
+ while ((queue_size = wc->queue_size) == RRDENG_CMD_Q_MAX_SIZE) {
+ uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
+ }
+ fatal_assert(queue_size < RRDENG_CMD_Q_MAX_SIZE);
+ /* enqueue command */
+ wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
+ wc->cmd_queue.tail = wc->cmd_queue.tail != RRDENG_CMD_Q_MAX_SIZE - 1 ?
+ wc->cmd_queue.tail + 1 : 0;
+ wc->queue_size = queue_size + 1;
+ uv_mutex_unlock(&wc->cmd_mutex);
+
+ /* wake up event loop */
+ fatal_assert(0 == uv_async_send(&wc->async));
+}
+
+struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc)
+{
+ struct rrdeng_cmd ret;
+ unsigned queue_size;
+
+ uv_mutex_lock(&wc->cmd_mutex);
+ queue_size = wc->queue_size;
+ if (queue_size == 0) {
+ ret.opcode = RRDENG_NOOP;
+ } else {
+ /* dequeue command */
+ ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head];
+ if (queue_size == 1) {
+ wc->cmd_queue.head = wc->cmd_queue.tail = 0;
+ } else {
+ wc->cmd_queue.head = wc->cmd_queue.head != RRDENG_CMD_Q_MAX_SIZE - 1 ?
+ wc->cmd_queue.head + 1 : 0;
+ }
+ wc->queue_size = queue_size - 1;
+
+ /* wake up producers */
+ uv_cond_signal(&wc->cmd_cond);
+ }
+ uv_mutex_unlock(&wc->cmd_mutex);
+
+ return ret;
+}
+
+static void load_configuration_dynamic(void)
+{
+ unsigned read_num = (unsigned)config_get_number(CONFIG_SECTION_DB, "dbengine pages per extent", MAX_PAGES_PER_EXTENT);
+ if (read_num > 0 && read_num <= MAX_PAGES_PER_EXTENT)
+ rrdeng_pages_per_extent = read_num;
+ else {
+ error("Invalid dbengine pages per extent %u given. Using %u.", read_num, rrdeng_pages_per_extent);
+ config_set_number(CONFIG_SECTION_DB, "dbengine pages per extent", rrdeng_pages_per_extent);
+ }
+}
+
+void async_cb(uv_async_t *handle)
+{
+ uv_stop(handle->loop);
+ uv_update_time(handle->loop);
+ debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
+}
+
+/* Flushes dirty pages when timer expires */
+#define TIMER_PERIOD_MS (1000)
+
+void timer_cb(uv_timer_t* handle)
+{
+ worker_is_busy(RRDENG_MAX_OPCODE + 1);
+
+ struct rrdengine_worker_config* wc = handle->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+
+ uv_stop(handle->loop);
+ uv_update_time(handle->loop);
+ rrdeng_test_quota(wc);
+ debug(D_RRDENGINE, "%s: timeout reached.", __func__);
+ if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) {
+ /* There is free space so we can write to disk and we are not actively deleting dirty buffers */
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark,
+ high_watermark;
+
+ uv_rwlock_rdlock(&pg_cache->committed_page_index.lock);
+ nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
+ uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock);
+ producers = ctx->metric_API_max_producers;
+ /* are flushable pages more than 25% of the maximum page cache size */
+ high_watermark = (ctx->max_cache_pages * 25LLU) / 100;
+ low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */
+
+ /* Flush more pages only if disk can keep up */
+ if (wc->inflight_dirty_pages < high_watermark + producers) {
+ if (nr_committed_pages > producers &&
+ /* committed to be written pages are more than the produced number */
+ nr_committed_pages - producers > high_watermark) {
+ /* Flushing speed must increase to stop page cache from filling with dirty pages */
+ bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE;
+ }
+ bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write);
+
+ debug(D_RRDENGINE, "Flushing pages to disk.");
+ for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL);
+ bytes_written && (total_bytes < bytes_to_write);
+ total_bytes += bytes_written) {
+ bytes_written = do_flush_pages(wc, 0, NULL);
+ }
+ }
+ }
+ load_configuration_dynamic();
+#ifdef NETDATA_INTERNAL_CHECKS
+ {
+ char buf[4096];
+ debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf)));
+ }
+#endif
+
+ worker_is_idle();
+}
+
+#define MAX_CMD_BATCH_SIZE (256)
+
+void rrdeng_worker(void* arg)
+{
+ worker_register("DBENGINE");
+ worker_register_job_name(RRDENG_NOOP, "noop");
+ worker_register_job_name(RRDENG_READ_PAGE, "page read");
+ worker_register_job_name(RRDENG_READ_EXTENT, "extent read");
+ worker_register_job_name(RRDENG_COMMIT_PAGE, "commit");
+ worker_register_job_name(RRDENG_FLUSH_PAGES, "flush");
+ worker_register_job_name(RRDENG_SHUTDOWN, "shutdown");
+ worker_register_job_name(RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE, "page lru");
+ worker_register_job_name(RRDENG_QUIESCE, "quiesce");
+ worker_register_job_name(RRDENG_MAX_OPCODE, "cleanup");
+ worker_register_job_name(RRDENG_MAX_OPCODE + 1, "timer");
+
+ struct rrdengine_worker_config* wc = arg;
+ struct rrdengine_instance *ctx = wc->ctx;
+ uv_loop_t* loop;
+ int shutdown, ret;
+ enum rrdeng_opcode opcode;
+ uv_timer_t timer_req;
+ struct rrdeng_cmd cmd;
+ unsigned cmd_batch_size;
+
+ rrdeng_init_cmd_queue(wc);
+
+ loop = wc->loop = mallocz(sizeof(uv_loop_t));
+ ret = uv_loop_init(loop);
+ if (ret) {
+ error("uv_loop_init(): %s", uv_strerror(ret));
+ goto error_after_loop_init;
+ }
+ loop->data = wc;
+
+ ret = uv_async_init(wc->loop, &wc->async, async_cb);
+ if (ret) {
+ error("uv_async_init(): %s", uv_strerror(ret));
+ goto error_after_async_init;
+ }
+ wc->async.data = wc;
+
+ wc->now_deleting_files = NULL;
+ wc->cleanup_thread_deleting_files = 0;
+
+ wc->now_invalidating_dirty_pages = NULL;
+ wc->cleanup_thread_invalidating_dirty_pages = 0;
+ wc->inflight_dirty_pages = 0;
+
+ /* dirty page flushing timer */
+ ret = uv_timer_init(loop, &timer_req);
+ if (ret) {
+ error("uv_timer_init(): %s", uv_strerror(ret));
+ goto error_after_timer_init;
+ }
+ timer_req.data = wc;
+
+ wc->error = 0;
+ /* wake up initialization thread */
+ completion_mark_complete(&ctx->rrdengine_completion);
+
+ fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
+ shutdown = 0;
+ int set_name = 0;
+ while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) {
+ worker_is_idle();
+ uv_run(loop, UV_RUN_DEFAULT);
+ worker_is_busy(RRDENG_MAX_OPCODE);
+ rrdeng_cleanup_finished_threads(wc);
+
+ /* wait for commands */
+ cmd_batch_size = 0;
+ do {
+ /*
+ * Avoid starving the loop when there are too many commands coming in.
+ * timer_cb will interrupt the loop again to allow serving more commands.
+ */
+ if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE))
+ break;
+
+ cmd = rrdeng_deq_cmd(wc);
+ opcode = cmd.opcode;
+ ++cmd_batch_size;
+
+ if(likely(opcode != RRDENG_NOOP))
+ worker_is_busy(opcode);
+
+ switch (opcode) {
+ case RRDENG_NOOP:
+ /* the command queue was empty, do nothing */
+ break;
+ case RRDENG_SHUTDOWN:
+ shutdown = 1;
+ break;
+ case RRDENG_QUIESCE:
+ ctx->drop_metrics_under_page_cache_pressure = 0;
+ ctx->quiesce = SET_QUIESCE;
+ fatal_assert(0 == uv_timer_stop(&timer_req));
+ uv_close((uv_handle_t *)&timer_req, NULL);
+ while (do_flush_pages(wc, 1, NULL)) {
+ ; /* Force flushing of all committed pages. */
+ }
+ wal_flush_transaction_buffer(wc);
+ if (!rrdeng_threads_alive(wc)) {
+ ctx->quiesce = QUIESCED;
+ completion_mark_complete(&ctx->rrdengine_completion);
+ }
+ break;
+ case RRDENG_READ_PAGE:
+ do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0);
+ break;
+ case RRDENG_READ_EXTENT:
+ do_read_extent(wc, cmd.read_extent.page_cache_descr, cmd.read_extent.page_count, 1);
+ if (unlikely(!set_name)) {
+ set_name = 1;
+ uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE");
+ }
+ break;
+ case RRDENG_COMMIT_PAGE:
+ do_commit_transaction(wc, STORE_DATA, NULL);
+ break;
+ case RRDENG_FLUSH_PAGES: {
+ if (wc->now_invalidating_dirty_pages) {
+ /* Do not flush if the disk cannot keep up */
+ completion_mark_complete(cmd.completion);
+ } else {
+ (void)do_flush_pages(wc, 1, cmd.completion);
+ }
+ break;
+ case RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE:
+ rrdeng_invalidate_oldest_committed(wc);
+ break;
+ }
+ default:
+ debug(D_RRDENGINE, "%s: default.", __func__);
+ break;
+ }
+ } while (opcode != RRDENG_NOOP);
+ }
+
+ /* cleanup operations of the event loop */
+ info("Shutting down RRD engine event loop for tier %d", ctx->tier);
+
+ /*
+ * uv_async_send after uv_close does not seem to crash in linux at the moment,
+ * it is however undocumented behaviour and we need to be aware if this becomes
+ * an issue in the future.
+ */
+ uv_close((uv_handle_t *)&wc->async, NULL);
+
+ while (do_flush_pages(wc, 1, NULL)) {
+ ; /* Force flushing of all committed pages. */
+ }
+ wal_flush_transaction_buffer(wc);
+ uv_run(loop, UV_RUN_DEFAULT);
+
+ info("Shutting down RRD engine event loop for tier %d complete", ctx->tier);
+ /* TODO: don't let the API block by waiting to enqueue commands */
+ uv_cond_destroy(&wc->cmd_cond);
+/* uv_mutex_destroy(&wc->cmd_mutex); */
+ fatal_assert(0 == uv_loop_close(loop));
+ freez(loop);
+
+ worker_unregister();
+ return;
+
+error_after_timer_init:
+ uv_close((uv_handle_t *)&wc->async, NULL);
+error_after_async_init:
+ fatal_assert(0 == uv_loop_close(loop));
+error_after_loop_init:
+ freez(loop);
+
+ wc->error = UV_EAGAIN;
+ /* wake up initialization thread */
+ completion_mark_complete(&ctx->rrdengine_completion);
+ worker_unregister();
+}
+
+/* C entry point for development purposes
+ * make "LDFLAGS=-errdengine_main"
+ */
+void rrdengine_main(void)
+{
+ int ret;
+ struct rrdengine_instance *ctx;
+
+ sanity_check();
+ ret = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB, 0);
+ if (ret) {
+ exit(ret);
+ }
+ rrdeng_exit(ctx);
+ fprintf(stderr, "Hello world!");
+ exit(0);
+}
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
new file mode 100644
index 0000000..521d252
--- /dev/null
+++ b/database/engine/rrdengine.h
@@ -0,0 +1,283 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDENGINE_H
+#define NETDATA_RRDENGINE_H
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include <fcntl.h>
+#include <lz4.h>
+#include <Judy.h>
+#include <openssl/sha.h>
+#include <openssl/evp.h>
+#include "daemon/common.h"
+#include "../rrd.h"
+#include "rrddiskprotocol.h"
+#include "rrdenginelib.h"
+#include "datafile.h"
+#include "journalfile.h"
+#include "rrdengineapi.h"
+#include "pagecache.h"
+#include "rrdenglocking.h"
+
+#ifdef NETDATA_RRD_INTERNALS
+
+#endif /* NETDATA_RRD_INTERNALS */
+
+extern unsigned rrdeng_pages_per_extent;
+
+/* Forward declarations */
+struct rrdengine_instance;
+
+#define MAX_PAGES_PER_EXTENT (64) /* TODO: can go higher only when journal supports bigger than 4KiB transactions */
+
+#define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u"
+#define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u"
+
+struct rrdeng_collect_handle {
+ struct pg_cache_page_index *page_index;
+ struct rrdeng_page_descr *descr;
+ unsigned long page_correlation_id;
+ // set to 1 when this dimension is not page aligned with the other dimensions in the chart
+ uint8_t unaligned_page;
+ struct pg_alignment *alignment;
+};
+
+struct rrdeng_query_handle {
+ struct rrdeng_page_descr *descr;
+ struct rrdengine_instance *ctx;
+ struct pg_cache_page_index *page_index;
+ time_t wanted_start_time_s;
+ time_t now_s;
+ unsigned position;
+ unsigned entries;
+ storage_number *page;
+ usec_t page_end_time_ut;
+ uint32_t page_length;
+ time_t dt_s;
+};
+
+typedef enum {
+ RRDENGINE_STATUS_UNINITIALIZED = 0,
+ RRDENGINE_STATUS_INITIALIZING,
+ RRDENGINE_STATUS_INITIALIZED
+} rrdengine_state_t;
+
+enum rrdeng_opcode {
+ /* can be used to return empty status or flush the command queue */
+ RRDENG_NOOP = 0,
+
+ RRDENG_READ_PAGE,
+ RRDENG_READ_EXTENT,
+ RRDENG_COMMIT_PAGE,
+ RRDENG_FLUSH_PAGES,
+ RRDENG_SHUTDOWN,
+ RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE,
+ RRDENG_QUIESCE,
+
+ RRDENG_MAX_OPCODE
+};
+
+struct rrdeng_read_page {
+ struct rrdeng_page_descr *page_cache_descr;
+};
+
+struct rrdeng_read_extent {
+ struct rrdeng_page_descr *page_cache_descr[MAX_PAGES_PER_EXTENT];
+ int page_count;
+};
+
+struct rrdeng_cmd {
+ enum rrdeng_opcode opcode;
+ union {
+ struct rrdeng_read_page read_page;
+ struct rrdeng_read_extent read_extent;
+ struct completion *completion;
+ };
+};
+
+#define RRDENG_CMD_Q_MAX_SIZE (2048)
+
+struct rrdeng_cmdqueue {
+ unsigned head, tail;
+ struct rrdeng_cmd cmd_array[RRDENG_CMD_Q_MAX_SIZE];
+};
+
+struct extent_io_descriptor {
+ uv_fs_t req;
+ uv_work_t req_worker;
+ uv_buf_t iov;
+ uv_file file;
+ void *buf;
+ void *map_base;
+ size_t map_length;
+ uint64_t pos;
+ unsigned bytes;
+ struct completion *completion;
+ unsigned descr_count;
+ int release_descr;
+ struct rrdeng_page_descr *descr_array[MAX_PAGES_PER_EXTENT];
+ struct rrdeng_page_descr descr_read_array[MAX_PAGES_PER_EXTENT];
+ Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
+ struct extent_io_descriptor *next; /* multiple requests to be served by the same cached extent */
+};
+
+struct generic_io_descriptor {
+ uv_fs_t req;
+ uv_buf_t iov;
+ void *buf;
+ uint64_t pos;
+ unsigned bytes;
+ struct completion *completion;
+};
+
+struct extent_cache_element {
+ struct extent_info *extent; /* The ABA problem is avoided with the help of fileno below */
+ unsigned fileno;
+ struct extent_cache_element *prev; /* LRU */
+ struct extent_cache_element *next; /* LRU */
+ struct extent_io_descriptor *inflight_io_descr; /* I/O descriptor for in-flight extent */
+ uint8_t pages[MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE];
+};
+
+#define MAX_CACHED_EXTENTS 16 /* cannot be over 32 to fit in 32-bit architectures */
+
+/* Initialize by setting the structure to zero */
+struct extent_cache {
+ struct extent_cache_element extent_array[MAX_CACHED_EXTENTS];
+ unsigned allocation_bitmap; /* 1 if the corresponding position in the extent_array is allocated */
+ unsigned inflight_bitmap; /* 1 if the corresponding position in the extent_array is waiting for I/O */
+
+ struct extent_cache_element *replaceQ_head; /* LRU */
+ struct extent_cache_element *replaceQ_tail; /* MRU */
+};
+
+struct rrdengine_worker_config {
+ struct rrdengine_instance *ctx;
+
+ uv_thread_t thread;
+ uv_loop_t* loop;
+ uv_async_t async;
+
+ /* file deletion thread */
+ uv_thread_t *now_deleting_files;
+ unsigned long cleanup_thread_deleting_files; /* set to 0 when now_deleting_files is still running */
+
+ /* dirty page deletion thread */
+ uv_thread_t *now_invalidating_dirty_pages;
+ /* set to 0 when now_invalidating_dirty_pages is still running */
+ unsigned long cleanup_thread_invalidating_dirty_pages;
+ unsigned inflight_dirty_pages;
+
+ /* FIFO command queue */
+ uv_mutex_t cmd_mutex;
+ uv_cond_t cmd_cond;
+ volatile unsigned queue_size;
+ struct rrdeng_cmdqueue cmd_queue;
+
+ struct extent_cache xt_cache;
+
+ int error;
+};
+
+/*
+ * Debug statistics not used by code logic.
+ * They only describe operations since DB engine instance load time.
+ */
+struct rrdengine_statistics {
+ rrdeng_stats_t metric_API_producers;
+ rrdeng_stats_t metric_API_consumers;
+ rrdeng_stats_t pg_cache_insertions;
+ rrdeng_stats_t pg_cache_deletions;
+ rrdeng_stats_t pg_cache_hits;
+ rrdeng_stats_t pg_cache_misses;
+ rrdeng_stats_t pg_cache_backfills;
+ rrdeng_stats_t pg_cache_evictions;
+ rrdeng_stats_t before_decompress_bytes;
+ rrdeng_stats_t after_decompress_bytes;
+ rrdeng_stats_t before_compress_bytes;
+ rrdeng_stats_t after_compress_bytes;
+ rrdeng_stats_t io_write_bytes;
+ rrdeng_stats_t io_write_requests;
+ rrdeng_stats_t io_read_bytes;
+ rrdeng_stats_t io_read_requests;
+ rrdeng_stats_t io_write_extent_bytes;
+ rrdeng_stats_t io_write_extents;
+ rrdeng_stats_t io_read_extent_bytes;
+ rrdeng_stats_t io_read_extents;
+ rrdeng_stats_t datafile_creations;
+ rrdeng_stats_t datafile_deletions;
+ rrdeng_stats_t journalfile_creations;
+ rrdeng_stats_t journalfile_deletions;
+ rrdeng_stats_t page_cache_descriptors;
+ rrdeng_stats_t io_errors;
+ rrdeng_stats_t fs_errors;
+ rrdeng_stats_t pg_cache_over_half_dirty_events;
+ rrdeng_stats_t flushing_pressure_page_deletions;
+};
+
+/* I/O errors global counter */
+extern rrdeng_stats_t global_io_errors;
+/* File-System errors global counter */
+extern rrdeng_stats_t global_fs_errors;
+/* number of File-Descriptors that have been reserved by dbengine */
+extern rrdeng_stats_t rrdeng_reserved_file_descriptors;
+/* inability to flush global counters */
+extern rrdeng_stats_t global_pg_cache_over_half_dirty_events;
+extern rrdeng_stats_t global_flushing_pressure_page_deletions; /* number of deleted pages */
+
+#define NO_QUIESCE (0) /* initial state when all operations function normally */
+#define SET_QUIESCE (1) /* set it before shutting down the instance, quiesce long running operations */
+#define QUIESCED (2) /* is set after all threads have finished running */
+
+typedef enum {
+ LOAD_ERRORS_PAGE_FLIPPED_TIME = 0,
+ LOAD_ERRORS_PAGE_EQUAL_TIME = 1,
+ LOAD_ERRORS_PAGE_ZERO_ENTRIES = 2,
+ LOAD_ERRORS_PAGE_UPDATE_ZERO = 3,
+ LOAD_ERRORS_PAGE_FLEXY_TIME = 4,
+ LOAD_ERRORS_DROPPED_EXTENT = 5,
+} INVALID_PAGE_ID;
+
+struct rrdengine_instance {
+ struct rrdengine_worker_config worker_config;
+ struct completion rrdengine_completion;
+ struct page_cache pg_cache;
+ uint8_t drop_metrics_under_page_cache_pressure; /* boolean */
+ uint8_t global_compress_alg;
+ struct transaction_commit_log commit_log;
+ struct rrdengine_datafile_list datafiles;
+ RRDHOST *host; /* the legacy host, or NULL for multi-host DB */
+ char dbfiles_path[FILENAME_MAX + 1];
+ char machine_guid[GUID_LEN + 1]; /* the unique ID of the corresponding host, or localhost for multihost DB */
+ uint64_t disk_space;
+ uint64_t max_disk_space;
+ int tier;
+ unsigned last_fileno; /* newest index of datafile and journalfile */
+ unsigned long max_cache_pages;
+ unsigned long cache_pages_low_watermark;
+ unsigned long metric_API_max_producers;
+
+ uint8_t quiesce; /* set to SET_QUIESCE before shutdown of the engine */
+ uint8_t page_type; /* Default page type for this context */
+
+ struct rrdengine_statistics stats;
+
+ struct {
+ size_t counter;
+ usec_t latest_end_time_ut;
+ } load_errors[6];
+};
+
+void *dbengine_page_alloc(void);
+void dbengine_page_free(void *page);
+
+int init_rrd_files(struct rrdengine_instance *ctx);
+void finalize_rrd_files(struct rrdengine_instance *ctx);
+void rrdeng_test_quota(struct rrdengine_worker_config* wc);
+void rrdeng_worker(void* arg);
+void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd);
+struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc);
+
+#endif /* NETDATA_RRDENGINE_H */
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
new file mode 100755
index 0000000..4525b04
--- /dev/null
+++ b/database/engine/rrdengineapi.c
@@ -0,0 +1,1272 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+#include "../storage_engine.h"
+
+/* Default global database instance */
+struct rrdengine_instance multidb_ctx_storage_tier0;
+struct rrdengine_instance multidb_ctx_storage_tier1;
+struct rrdengine_instance multidb_ctx_storage_tier2;
+struct rrdengine_instance multidb_ctx_storage_tier3;
+struct rrdengine_instance multidb_ctx_storage_tier4;
+#if RRD_STORAGE_TIERS != 5
+#error RRD_STORAGE_TIERS is not 5 - you need to add allocations here
+#endif
+struct rrdengine_instance *multidb_ctx[RRD_STORAGE_TIERS];
+uint8_t tier_page_type[RRD_STORAGE_TIERS] = {PAGE_METRICS, PAGE_TIER, PAGE_TIER, PAGE_TIER, PAGE_TIER};
+
+#if PAGE_TYPE_MAX != 1
+#error PAGE_TYPE_MAX is not 1 - you need to add allocations here
+#endif
+size_t page_type_size[256] = {sizeof(storage_number), sizeof(storage_number_tier1_t)};
+
+__attribute__((constructor)) void initialize_multidb_ctx(void) {
+ multidb_ctx[0] = &multidb_ctx_storage_tier0;
+ multidb_ctx[1] = &multidb_ctx_storage_tier1;
+ multidb_ctx[2] = &multidb_ctx_storage_tier2;
+ multidb_ctx[3] = &multidb_ctx_storage_tier3;
+ multidb_ctx[4] = &multidb_ctx_storage_tier4;
+}
+
+int db_engine_use_malloc = 0;
+int default_rrdeng_page_fetch_timeout = 3;
+int default_rrdeng_page_fetch_retries = 3;
+int default_rrdeng_page_cache_mb = 32;
+int default_rrdeng_disk_quota_mb = 256;
+int default_multidb_disk_quota_mb = 256;
+/* Default behaviour is to unblock data collection if the page cache is full of dirty pages by dropping metrics */
+uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1;
+
+// ----------------------------------------------------------------------------
+// metrics groups
+
+static inline void rrdeng_page_alignment_acquire(struct pg_alignment *pa) {
+ if(unlikely(!pa)) return;
+ __atomic_add_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST);
+}
+
+static inline bool rrdeng_page_alignment_release(struct pg_alignment *pa) {
+ if(unlikely(!pa)) return true;
+
+ if(__atomic_sub_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST) == 0) {
+ freez(pa);
+ return true;
+ }
+
+ return false;
+}
+
+// charts call this
+STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid __maybe_unused) {
+ struct pg_alignment *pa = callocz(1, sizeof(struct pg_alignment));
+ rrdeng_page_alignment_acquire(pa);
+ return (STORAGE_METRICS_GROUP *)pa;
+}
+
+// charts call this
+void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance __maybe_unused, STORAGE_METRICS_GROUP *smg) {
+ if(unlikely(!smg)) return;
+
+ struct pg_alignment *pa = (struct pg_alignment *)smg;
+ rrdeng_page_alignment_release(pa);
+}
+
+// ----------------------------------------------------------------------------
+// metric handle for legacy dbs
+
+/* This UUID is not unique across hosts */
+void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_t *ret_uuid)
+{
+ EVP_MD_CTX *evpctx;
+ unsigned char hash_value[EVP_MAX_MD_SIZE];
+ unsigned int hash_len;
+
+ evpctx = EVP_MD_CTX_create();
+ EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL);
+ EVP_DigestUpdate(evpctx, dim_id, strlen(dim_id));
+ EVP_DigestUpdate(evpctx, chart_id, strlen(chart_id));
+ EVP_DigestFinal_ex(evpctx, hash_value, &hash_len);
+ EVP_MD_CTX_destroy(evpctx);
+ fatal_assert(hash_len > sizeof(uuid_t));
+ memcpy(ret_uuid, hash_value, sizeof(uuid_t));
+}
+
+/* Transform legacy UUID to be unique across hosts deterministically */
+void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid, uuid_t *ret_uuid)
+{
+ EVP_MD_CTX *evpctx;
+ unsigned char hash_value[EVP_MAX_MD_SIZE];
+ unsigned int hash_len;
+
+ evpctx = EVP_MD_CTX_create();
+ EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL);
+ EVP_DigestUpdate(evpctx, machine_guid, GUID_LEN);
+ EVP_DigestUpdate(evpctx, *legacy_uuid, sizeof(uuid_t));
+ EVP_DigestFinal_ex(evpctx, hash_value, &hash_len);
+ EVP_MD_CTX_destroy(evpctx);
+ fatal_assert(hash_len > sizeof(uuid_t));
+ memcpy(ret_uuid, hash_value, sizeof(uuid_t));
+}
+
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id) {
+ uuid_t legacy_uuid;
+ rrdeng_generate_legacy_uuid(rd_id, st_id, &legacy_uuid);
+ return rrdeng_metric_get(db_instance, &legacy_uuid);
+}
+
+// ----------------------------------------------------------------------------
+// metric handle
+
+void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle) {
+ struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
+
+ __atomic_sub_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
+}
+
+STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle) {
+ struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
+ __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
+ return db_metric_handle;
+}
+
+STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct pg_cache_page_index *page_index = NULL;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, uuid, sizeof(uuid_t));
+ if (likely(NULL != PValue))
+ page_index = *PValue;
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+
+ if (likely(page_index))
+ __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
+
+ return (STORAGE_METRIC_HANDLE *)page_index;
+}
+
+STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid) {
+ internal_fatal(!db_instance, "DBENGINE: db_instance is NULL");
+
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ struct pg_cache_page_index *page_index;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
+ Pvoid_t *PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, uuid, sizeof(uuid_t), PJE0);
+ fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
+ *PValue = page_index = create_page_index(uuid, ctx);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
+ page_index->refcount = 1;
+ uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
+
+ return (STORAGE_METRIC_HANDLE *)page_index;
+}
+
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance) {
+ STORAGE_METRIC_HANDLE *db_metric_handle;
+
+ db_metric_handle = rrdeng_metric_get(db_instance, &rd->metric_uuid);
+ if(!db_metric_handle) {
+ db_metric_handle = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset));
+ if(db_metric_handle) {
+ struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
+ uuid_copy(rd->metric_uuid, page_index->id);
+ }
+ }
+ if(!db_metric_handle)
+ db_metric_handle = rrdeng_metric_create(db_instance, &rd->metric_uuid);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
+ if(uuid_compare(rd->metric_uuid, page_index->id) != 0) {
+ char uuid1[UUID_STR_LEN + 1];
+ char uuid2[UUID_STR_LEN + 1];
+
+ uuid_unparse(rd->metric_uuid, uuid1);
+ uuid_unparse(page_index->id, uuid2);
+ fatal("DBENGINE: uuids do not match, asked for metric '%s', but got page_index of metric '%s'", uuid1, uuid2);
+ }
+
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ if(page_index->ctx != ctx)
+ fatal("DBENGINE: mixed up rrdengine instances, asked for metric from %p, got from %p", ctx, page_index->ctx);
+#endif
+
+ return db_metric_handle;
+}
+
+
+// ----------------------------------------------------------------------------
+// collect ops
+
+/*
+ * Gets a handle for storing metrics to the database.
+ * The handle must be released with rrdeng_store_metric_final().
+ */
+STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg) {
+ struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
+ struct rrdeng_collect_handle *handle;
+
+ handle = callocz(1, sizeof(struct rrdeng_collect_handle));
+ handle->page_index = page_index;
+ handle->descr = NULL;
+ handle->unaligned_page = 0;
+ page_index->latest_update_every_s = update_every;
+
+ handle->alignment = (struct pg_alignment *)smg;
+ rrdeng_page_alignment_acquire(handle->alignment);
+
+ uv_rwlock_wrlock(&page_index->lock);
+ ++page_index->writers;
+ uv_rwlock_wrunlock(&page_index->lock);
+
+ return (STORAGE_COLLECT_HANDLE *)handle;
+}
+
+/* The page must be populated and referenced */
+static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr)
+{
+ switch(descr->type) {
+ case PAGE_METRICS: {
+ size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr);
+ storage_number *array = (storage_number *)descr->pg_cache_descr->page;
+ for (size_t i = 0 ; i < slots; ++i) {
+ if(does_storage_number_exist(array[i]))
+ return 0;
+ }
+ }
+ break;
+
+ case PAGE_TIER: {
+ size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr);
+ storage_number_tier1_t *array = (storage_number_tier1_t *)descr->pg_cache_descr->page;
+ for (size_t i = 0 ; i < slots; ++i) {
+ if(fpclassify(array[i].sum_value) != FP_NAN)
+ return 0;
+ }
+ }
+ break;
+
+ default: {
+ static bool logged = false;
+ if(!logged) {
+ error("DBENGINE: cannot check page for nulls on unknown page type id %d", descr->type);
+ logged = true;
+ }
+ return 0;
+ }
+ }
+
+ return 1;
+}
+
+void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) {
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+ // struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle;
+ struct rrdengine_instance *ctx = handle->page_index->ctx;
+ struct rrdeng_page_descr *descr = handle->descr;
+
+ if (unlikely(!ctx)) return;
+ if (unlikely(!descr)) return;
+
+ if (likely(descr->page_length)) {
+ int page_is_empty;
+
+ rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
+
+ page_is_empty = page_has_only_empty_metrics(descr);
+ if (page_is_empty) {
+ print_page_cache_descr(descr, "Page has empty metrics only, deleting", true);
+ pg_cache_put(ctx, descr);
+ pg_cache_punch_hole(ctx, descr, 1, 0, NULL);
+ } else
+ rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
+ } else {
+ dbengine_page_free(descr->pg_cache_descr->page);
+ rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
+ rrdeng_page_descr_freez(descr);
+ }
+ handle->descr = NULL;
+}
+
+static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection_handle,
+ usec_t point_in_time_ut,
+ NETDATA_DOUBLE n,
+ NETDATA_DOUBLE min_value,
+ NETDATA_DOUBLE max_value,
+ uint16_t count,
+ uint16_t anomaly_count,
+ SN_FLAGS flags)
+{
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+ struct pg_cache_page_index *page_index = handle->page_index;
+ struct rrdengine_instance *ctx = handle->page_index->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_descr *descr = handle->descr;
+
+ void *page;
+ uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0;
+
+ if (descr) {
+ /* Make alignment decisions */
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(descr->end_time_ut + page_index->latest_update_every_s * USEC_PER_SEC != point_in_time_ut) {
+ char buffer[200 + 1];
+ snprintfz(buffer, 200,
+ "metrics collected are %s, end_time_ut = %llu, point_in_time_ut = %llu, update_every = %u, delta = %llu",
+ (point_in_time_ut / USEC_PER_SEC - descr->end_time_ut / USEC_PER_SEC > page_index->latest_update_every_s)?"far apart":"not aligned",
+ descr->end_time_ut / USEC_PER_SEC,
+ point_in_time_ut / USEC_PER_SEC,
+ page_index->latest_update_every_s,
+ point_in_time_ut / USEC_PER_SEC - descr->end_time_ut / USEC_PER_SEC);
+ print_page_cache_descr(descr, buffer, false);
+ }
+#endif
+
+ if (descr->page_length == handle->alignment->page_length) {
+ /* this is the leading dimension that defines chart alignment */
+ perfect_page_alignment = 1;
+ }
+ /* is the metric far enough out of alignment with the others? */
+ if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < handle->alignment->page_length)) {
+ handle->unaligned_page = 1;
+ print_page_cache_descr(descr, "Metric page is not aligned with chart", true);
+ }
+ if (unlikely(handle->unaligned_page &&
+ /* did the other metrics change page? */
+ handle->alignment->page_length <= PAGE_POINT_SIZE_BYTES(descr))) {
+ print_page_cache_descr(descr, "must_flush_unaligned_page = 1", true);
+ must_flush_unaligned_page = 1;
+ handle->unaligned_page = 0;
+ }
+ }
+ if (unlikely(NULL == descr ||
+ descr->page_length + PAGE_POINT_SIZE_BYTES(descr) > RRDENG_BLOCK_SIZE ||
+ must_flush_unaligned_page)) {
+
+ if(descr) {
+ print_page_cache_descr(descr, "flushing metric", true);
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ }
+
+ page = rrdeng_create_page(ctx, &page_index->id, &descr);
+ fatal_assert(page);
+
+ descr->update_every_s = page_index->latest_update_every_s;
+ handle->descr = descr;
+
+ handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1);
+
+ if (0 == handle->alignment->page_length) {
+ /* this is the leading dimension that defines chart alignment */
+ perfect_page_alignment = 1;
+ }
+ }
+
+ page = descr->pg_cache_descr->page;
+
+ switch (descr->type) {
+ case PAGE_METRICS: {
+ ((storage_number *)page)[descr->page_length / PAGE_POINT_SIZE_BYTES(descr)] = pack_storage_number(n, flags);
+ }
+ break;
+
+ case PAGE_TIER: {
+ storage_number_tier1_t number_tier1;
+ number_tier1.sum_value = (float)n;
+ number_tier1.min_value = (float)min_value;
+ number_tier1.max_value = (float)max_value;
+ number_tier1.anomaly_count = anomaly_count;
+ number_tier1.count = count;
+ ((storage_number_tier1_t *)page)[descr->page_length / PAGE_POINT_SIZE_BYTES(descr)] = number_tier1;
+ }
+ break;
+
+ default: {
+ static bool logged = false;
+ if(!logged) {
+ error("DBENGINE: cannot store metric on unknown page type id %d", descr->type);
+ logged = true;
+ }
+ }
+ break;
+ }
+
+ pg_cache_atomic_set_pg_info(descr, point_in_time_ut, descr->page_length + PAGE_POINT_SIZE_BYTES(descr));
+
+ if (perfect_page_alignment)
+ handle->alignment->page_length = descr->page_length;
+ if (unlikely(INVALID_TIME == descr->start_time_ut)) {
+ unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers;
+ descr->start_time_ut = point_in_time_ut;
+
+ new_metric_API_producers = rrd_atomic_add_fetch(&ctx->stats.metric_API_producers, 1);
+ while (unlikely(new_metric_API_producers > (old_metric_API_max_producers = ctx->metric_API_max_producers))) {
+ /* Increase ctx->metric_API_max_producers */
+ ret_metric_API_max_producers = ulong_compare_and_swap(&ctx->metric_API_max_producers,
+ old_metric_API_max_producers,
+ new_metric_API_producers);
+ if (old_metric_API_max_producers == ret_metric_API_max_producers) {
+ /* success */
+ break;
+ }
+ }
+
+ pg_cache_insert(ctx, page_index, descr);
+ } else {
+ pg_cache_add_new_metric_time(page_index, descr);
+ }
+
+// {
+// unsigned char u[16] = { 0x0C, 0x0A, 0x40, 0xD6, 0x2A, 0x43, 0x4A, 0x7C, 0x95, 0xF7, 0xD1, 0x1E, 0x0C, 0x9E, 0x8A, 0xE7 };
+// if(uuid_compare(u, page_index->id) == 0) {
+// char buffer[100];
+// snprintfz(buffer, 100, "store system.cpu, collect:%u, page_index first:%u, last:%u",
+// (uint32_t)(point_in_time / USEC_PER_SEC),
+// (uint32_t)(page_index->oldest_time / USEC_PER_SEC),
+// (uint32_t)(page_index->latest_time / USEC_PER_SEC));
+//
+// print_page_cache_descr(descr, buffer, false);
+// }
+// }
+}
+
+void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
+ usec_t point_in_time_ut,
+ NETDATA_DOUBLE n,
+ NETDATA_DOUBLE min_value,
+ NETDATA_DOUBLE max_value,
+ uint16_t count,
+ uint16_t anomaly_count,
+ SN_FLAGS flags)
+{
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+ struct pg_cache_page_index *page_index = handle->page_index;
+ struct rrdeng_page_descr *descr = handle->descr;
+
+ if(likely(descr)) {
+ usec_t last_point_in_time_ut = descr->end_time_ut;
+ usec_t update_every_ut = page_index->latest_update_every_s * USEC_PER_SEC;
+ size_t points_gap = (point_in_time_ut <= last_point_in_time_ut) ?
+ (size_t)0 :
+ (size_t)((point_in_time_ut - last_point_in_time_ut) / update_every_ut);
+
+ if(unlikely(points_gap != 1)) {
+ if (unlikely(points_gap <= 0)) {
+ time_t now = now_realtime_sec();
+ static __thread size_t counter = 0;
+ static __thread time_t last_time_logged = 0;
+ counter++;
+
+ if(now - last_time_logged > 600) {
+ error("DBENGINE: collected point is in the past (repeated %zu times in the last %zu secs). Ignoring these data collection points.",
+ counter, (size_t)(last_time_logged?(now - last_time_logged):0));
+
+ last_time_logged = now;
+ counter = 0;
+ }
+ return;
+ }
+
+ size_t point_size = PAGE_POINT_SIZE_BYTES(descr);
+ size_t page_size_in_points = RRDENG_BLOCK_SIZE / point_size;
+ size_t used_points = descr->page_length / point_size;
+ size_t remaining_points_in_page = page_size_in_points - used_points;
+
+ bool new_point_is_aligned = true;
+ if(unlikely((point_in_time_ut - last_point_in_time_ut) / points_gap != update_every_ut))
+ new_point_is_aligned = false;
+
+ if(unlikely(points_gap > remaining_points_in_page || !new_point_is_aligned)) {
+// char buffer[200];
+// snprintfz(buffer, 200, "data collection skipped %zu points, last stored point %llu, new point %llu, update every %d. Cutting page.",
+// points_gap, last_point_in_time_ut / USEC_PER_SEC, point_in_time_ut / USEC_PER_SEC, page_index->latest_update_every_s);
+// print_page_cache_descr(descr, buffer, false);
+
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ }
+ else {
+// char buffer[200];
+// snprintfz(buffer, 200, "data collection skipped %zu points, last stored point %llu, new point %llu, update every %d. Filling the gap.",
+// points_gap, last_point_in_time_ut / USEC_PER_SEC, point_in_time_ut / USEC_PER_SEC, page_index->latest_update_every_s);
+// print_page_cache_descr(descr, buffer, false);
+
+ // loop to fill the gap
+ usec_t step_ut = page_index->latest_update_every_s * USEC_PER_SEC;
+ usec_t last_point_filled_ut = last_point_in_time_ut + step_ut;
+
+ while (last_point_filled_ut < point_in_time_ut) {
+ rrdeng_store_metric_next_internal(
+ collection_handle, last_point_filled_ut, NAN, NAN, NAN,
+ 1, 0, SN_EMPTY_SLOT);
+
+ last_point_filled_ut += step_ut;
+ }
+ }
+ }
+ }
+
+ rrdeng_store_metric_next_internal(collection_handle, point_in_time_ut, n, min_value, max_value, count, anomaly_count, flags);
+}
+
+
+/*
+ * Releases the database reference from the handle for storing metrics.
+ * Returns 1 if it's safe to delete the dimension.
+ */
+int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+ struct pg_cache_page_index *page_index = handle->page_index;
+
+ uint8_t can_delete_metric = 0;
+
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ uv_rwlock_wrlock(&page_index->lock);
+
+ if (!--page_index->writers && !page_index->page_count)
+ can_delete_metric = 1;
+
+ uv_rwlock_wrunlock(&page_index->lock);
+
+ rrdeng_page_alignment_release(handle->alignment);
+ freez(handle);
+
+ return can_delete_metric;
+}
+
+void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every) {
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+ struct pg_cache_page_index *page_index = handle->page_index;
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ uv_rwlock_rdlock(&page_index->lock);
+ page_index->latest_update_every_s = update_every;
+ uv_rwlock_rdunlock(&page_index->lock);
+}
+
+// ----------------------------------------------------------------------------
+// query ops
+
+//static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info)
+//{
+// return (uint32_t *)&page_info->scratch[0];
+//}
+//
+//static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info)
+//{
+// return (uint32_t *)&page_info->scratch[sizeof(uint32_t)];
+//}
+//
+/*
+ * Gets a handle for loading metrics from the database.
+ * The handle must be released with rrdeng_load_metric_final().
+ */
+void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrdimm_handle, time_t start_time_s, time_t end_time_s)
+{
+ struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
+ struct rrdengine_instance *ctx = page_index->ctx;
+
+ // fprintf(stderr, "%s: %s/%s start time %ld, end time %ld\n", __FUNCTION__ , rd->rrdset->name, rd->name, start_time, end_time);
+
+ struct rrdeng_query_handle *handle;
+ unsigned pages_nr;
+
+ if(!page_index->latest_update_every_s)
+ page_index->latest_update_every_s = default_rrd_update_every;
+
+ rrdimm_handle->start_time_s = start_time_s;
+ rrdimm_handle->end_time_s = end_time_s;
+
+ handle = callocz(1, sizeof(struct rrdeng_query_handle));
+ handle->wanted_start_time_s = start_time_s;
+ handle->now_s = start_time_s;
+ handle->position = 0;
+ handle->ctx = ctx;
+ handle->descr = NULL;
+ handle->dt_s = page_index->latest_update_every_s;
+ rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle;
+ pages_nr = pg_cache_preload(ctx, &page_index->id, start_time_s * USEC_PER_SEC, end_time_s * USEC_PER_SEC,
+ NULL, &handle->page_index);
+ if (unlikely(NULL == handle->page_index || 0 == pages_nr))
+ // there are no metrics to load
+ handle->wanted_start_time_s = INVALID_TIME;
+}
+
+static int rrdeng_load_page_next(struct storage_engine_query_handle *rrdimm_handle, bool debug_this __maybe_unused) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
+
+ struct rrdengine_instance *ctx = handle->ctx;
+ struct rrdeng_page_descr *descr = handle->descr;
+
+ uint32_t page_length;
+ usec_t page_end_time_ut;
+ unsigned position;
+
+ if (likely(descr)) {
+ // Drop old page's reference
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
+#endif
+
+ pg_cache_put(ctx, descr);
+ handle->descr = NULL;
+ handle->wanted_start_time_s = (time_t)((handle->page_end_time_ut / USEC_PER_SEC) + handle->dt_s);
+
+ if (unlikely(handle->wanted_start_time_s > rrdimm_handle->end_time_s))
+ return 1;
+ }
+
+ usec_t wanted_start_time_ut = handle->wanted_start_time_s * USEC_PER_SEC;
+ descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id,
+ wanted_start_time_ut, rrdimm_handle->end_time_s * USEC_PER_SEC);
+ if (NULL == descr)
+ return 1;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
+#endif
+
+ handle->descr = descr;
+ pg_cache_atomic_get_pg_info(descr, &page_end_time_ut, &page_length);
+ if (unlikely(INVALID_TIME == descr->start_time_ut || INVALID_TIME == page_end_time_ut || 0 == descr->update_every_s)) {
+ error("DBENGINE: discarding invalid page descriptor (start_time = %llu, end_time = %llu, update_every_s = %d)",
+ descr->start_time_ut, page_end_time_ut, descr->update_every_s);
+ return 1;
+ }
+
+ if (unlikely(descr->start_time_ut != page_end_time_ut && wanted_start_time_ut > descr->start_time_ut)) {
+ // we're in the middle of the page somewhere
+ unsigned entries = page_length / PAGE_POINT_SIZE_BYTES(descr);
+ position = ((uint64_t)(wanted_start_time_ut - descr->start_time_ut)) * (entries - 1) /
+ (page_end_time_ut - descr->start_time_ut);
+ }
+ else
+ position = 0;
+
+ handle->page_end_time_ut = page_end_time_ut;
+ handle->page_length = page_length;
+ handle->entries = page_length / PAGE_POINT_SIZE_BYTES(descr);
+ handle->page = descr->pg_cache_descr->page;
+ handle->dt_s = descr->update_every_s;
+ handle->position = position;
+
+// if(debug_this)
+// info("DBENGINE: rrdeng_load_page_next(), "
+// "position:%d, "
+// "start_time_ut:%llu, "
+// "page_end_time_ut:%llu, "
+// "next_page_time_ut:%llu, "
+// "in_out:%s"
+// , position
+// , descr->start_time_ut
+// , page_end_time_ut
+// ,
+// wanted_start_time_ut, in_out?"true":"false"
+// );
+
+ return 0;
+}
+
+// Returns the metric and sets its timestamp into current_time
+// IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags)
+// IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES
+STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
+ // struct rrdeng_metric_handle *metric_handle = handle->metric_handle;
+
+ struct rrdeng_page_descr *descr = handle->descr;
+ time_t now = handle->now_s + handle->dt_s;
+
+// bool debug_this = false;
+// {
+// unsigned char u[16] = { 0x0C, 0x0A, 0x40, 0xD6, 0x2A, 0x43, 0x4A, 0x7C, 0x95, 0xF7, 0xD1, 0x1E, 0x0C, 0x9E, 0x8A, 0xE7 };
+// if(uuid_compare(u, handle->page_index->id) == 0) {
+// char buffer[100];
+// snprintfz(buffer, 100, "load system.cpu, now:%u, dt:%u, position:%u page_index first:%u, last:%u",
+// (uint32_t)(now),
+// (uint32_t)(handle->dt_s),
+// (uint32_t)(handle->position),
+// (uint32_t)(handle->page_index->oldest_time / USEC_PER_SEC),
+// (uint32_t)(handle->page_index->latest_time / USEC_PER_SEC));
+//
+// print_page_cache_descr(descr, buffer, false);
+// debug_this = true;
+// }
+// }
+
+ STORAGE_POINT sp;
+ unsigned position = handle->position + 1;
+ storage_number_tier1_t tier1_value;
+
+ if (unlikely(INVALID_TIME == handle->wanted_start_time_s)) {
+ handle->wanted_start_time_s = INVALID_TIME;
+ handle->now_s = now;
+ storage_point_empty(sp, now - handle->dt_s, now);
+ return sp;
+ }
+
+ if (unlikely(!descr || position >= handle->entries)) {
+ // We need to get a new page
+ if(rrdeng_load_page_next(rrddim_handle, false)) {
+ // next calls will not load any more metrics
+ handle->wanted_start_time_s = INVALID_TIME;
+ handle->now_s = now;
+ storage_point_empty(sp, now - handle->dt_s, now);
+ return sp;
+ }
+
+ descr = handle->descr;
+ position = handle->position;
+ now = (time_t)((descr->start_time_ut / USEC_PER_SEC) + position * descr->update_every_s);
+
+// if(debug_this) {
+// char buffer[100];
+// snprintfz(buffer, 100, "NEW PAGE system.cpu, now:%u, dt:%u, position:%u page_index first:%u, last:%u",
+// (uint32_t)(now),
+// (uint32_t)(handle->dt_s),
+// (uint32_t)(handle->position),
+// (uint32_t)(handle->page_index->oldest_time / USEC_PER_SEC),
+// (uint32_t)(handle->page_index->latest_time / USEC_PER_SEC));
+//
+// print_page_cache_descr(descr, buffer, false);
+// }
+ }
+
+ sp.start_time = now - handle->dt_s;
+ sp.end_time = now;
+
+ handle->position = position;
+ handle->now_s = now;
+
+ switch(descr->type) {
+ case PAGE_METRICS: {
+ storage_number n = handle->page[position];
+ sp.min = sp.max = sp.sum = unpack_storage_number(n);
+ sp.flags = n & SN_USER_FLAGS;
+ sp.count = 1;
+ sp.anomaly_count = is_storage_number_anomalous(n) ? 1 : 0;
+ }
+ break;
+
+ case PAGE_TIER: {
+ tier1_value = ((storage_number_tier1_t *)handle->page)[position];
+ sp.flags = tier1_value.anomaly_count ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS;
+ sp.count = tier1_value.count;
+ sp.anomaly_count = tier1_value.anomaly_count;
+ sp.min = tier1_value.min_value;
+ sp.max = tier1_value.max_value;
+ sp.sum = tier1_value.sum_value;
+ }
+ break;
+
+ // we don't know this page type
+ default: {
+ static bool logged = false;
+ if(!logged) {
+ error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", descr->type);
+ logged = true;
+ }
+ storage_point_empty(sp, sp.start_time, sp.end_time);
+ }
+ break;
+ }
+
+ if (unlikely(now >= rrddim_handle->end_time_s)) {
+ // next calls will not load any more metrics
+ handle->wanted_start_time_s = INVALID_TIME;
+ }
+
+// if(debug_this)
+// info("DBENGINE: returning point: "
+// "time from %ld to %ld // query from %ld to %ld // wanted_start_time_s %ld"
+// , sp.start_time, sp.end_time
+// , rrddim_handle->start_time_s, rrddim_handle->end_time_s
+// , handle->wanted_start_time_s
+// );
+
+ return sp;
+}
+
+int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrdimm_handle)
+{
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
+ return (INVALID_TIME == handle->wanted_start_time_s);
+}
+
+/*
+ * Releases the database reference from the handle for loading metrics.
+ */
+void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrdimm_handle)
+{
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
+ struct rrdengine_instance *ctx = handle->ctx;
+ struct rrdeng_page_descr *descr = handle->descr;
+
+ if (descr) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
+#endif
+ pg_cache_put(ctx, descr);
+ }
+
+ // whatever is allocated at rrdeng_load_metric_init() should be freed here
+ freez(handle);
+ rrdimm_handle->handle = NULL;
+}
+
+time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
+ struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
+ return (time_t)(page_index->latest_time_ut / USEC_PER_SEC);
+}
+time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
+ struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
+ return (time_t)(page_index->oldest_time_ut / USEC_PER_SEC);
+}
+
+int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t)
+{
+ struct page_cache *pg_cache;
+ struct rrdengine_instance *ctx;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index = NULL;
+
+ ctx = (struct rrdengine_instance *)si;
+ if (unlikely(!ctx)) {
+ error("DBENGINE: invalid STORAGE INSTANCE to %s()", __FUNCTION__);
+ return 1;
+ }
+ pg_cache = &ctx->pg_cache;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, dim_uuid, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+
+ if (likely(page_index)) {
+ *first_entry_t = page_index->oldest_time_ut / USEC_PER_SEC;
+ *last_entry_t = page_index->latest_time_ut / USEC_PER_SEC;
+ return 0;
+ }
+
+ return 1;
+}
+
+/* Also gets a reference for the page */
+void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr)
+{
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+ void *page;
+ /* TODO: check maximum number of pages in page cache limit */
+
+ descr = pg_cache_create_descr();
+ descr->id = id; /* TODO: add page type: metric, log, something? */
+ descr->type = ctx->page_type;
+ page = dbengine_page_alloc(); /*TODO: add page size */
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ pg_cache_descr->page = page;
+ pg_cache_descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */;
+ pg_cache_descr->refcnt = 1;
+
+ debug(D_RRDENGINE, "Created new page:");
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr, "", true);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ *ret_descr = descr;
+ return page;
+}
+
+/* The page must not be empty */
+void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
+ Word_t page_correlation_id)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ Pvoid_t *PValue;
+ unsigned nr_committed_pages;
+
+ if (unlikely(NULL == descr)) {
+ debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-committed.", __func__);
+ return;
+ }
+ fatal_assert(descr->page_length);
+
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ PValue = JudyLIns(&pg_cache->committed_page_index.JudyL_array, page_correlation_id, PJE0);
+ *PValue = descr;
+ nr_committed_pages = ++pg_cache->committed_page_index.nr_committed_pages;
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+
+ if (nr_committed_pages >= pg_cache_hard_limit(ctx) / 2) {
+ /* over 50% of pages have not been committed yet */
+
+ if (ctx->drop_metrics_under_page_cache_pressure &&
+ nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) {
+ /* 100% of pages are dirty */
+ struct rrdeng_cmd cmd;
+
+ cmd.opcode = RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+ } else {
+ if (0 == (unsigned long) ctx->stats.pg_cache_over_half_dirty_events) {
+ /* only print the first time */
+ errno = 0;
+ error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". "
+ "Metric data at risk of not being stored in the database, "
+ "please reduce disk load or use a faster disk.", ctx->dbfiles_path);
+ }
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_over_half_dirty_events, 1);
+ rrd_stat_atomic_add(&global_pg_cache_over_half_dirty_events, 1);
+ }
+ }
+
+ pg_cache_put(ctx, descr);
+}
+
+/* Gets a reference for the page */
+void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle)
+{
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+
+ debug(D_RRDENGINE, "Reading existing page:");
+ descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME);
+ if (NULL == descr) {
+ *handle = NULL;
+
+ return NULL;
+ }
+ *handle = descr;
+ pg_cache_descr = descr->pg_cache_descr;
+
+ return pg_cache_descr->page;
+}
+
+/* Gets a reference for the page */
+void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time_ut, void **handle)
+{
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+
+ debug(D_RRDENGINE, "Reading existing page:");
+ descr = pg_cache_lookup(ctx, NULL, id, point_in_time_ut);
+ if (NULL == descr) {
+ *handle = NULL;
+
+ return NULL;
+ }
+ *handle = descr;
+ pg_cache_descr = descr->pg_cache_descr;
+
+ return pg_cache_descr->page;
+}
+
+/*
+ * Gathers Database Engine statistics.
+ * Careful when modifying this function.
+ * You must not change the indices of the statistics or user code will break.
+ * You must not exceed RRDENG_NR_STATS or it will crash.
+ */
+void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
+{
+ if (ctx == NULL)
+ return;
+
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ array[0] = (uint64_t)ctx->stats.metric_API_producers;
+ array[1] = (uint64_t)ctx->stats.metric_API_consumers;
+ array[2] = (uint64_t)pg_cache->page_descriptors;
+ array[3] = (uint64_t)pg_cache->populated_pages;
+ array[4] = (uint64_t)pg_cache->committed_page_index.nr_committed_pages;
+ array[5] = (uint64_t)ctx->stats.pg_cache_insertions;
+ array[6] = (uint64_t)ctx->stats.pg_cache_deletions;
+ array[7] = (uint64_t)ctx->stats.pg_cache_hits;
+ array[8] = (uint64_t)ctx->stats.pg_cache_misses;
+ array[9] = (uint64_t)ctx->stats.pg_cache_backfills;
+ array[10] = (uint64_t)ctx->stats.pg_cache_evictions;
+ array[11] = (uint64_t)ctx->stats.before_compress_bytes;
+ array[12] = (uint64_t)ctx->stats.after_compress_bytes;
+ array[13] = (uint64_t)ctx->stats.before_decompress_bytes;
+ array[14] = (uint64_t)ctx->stats.after_decompress_bytes;
+ array[15] = (uint64_t)ctx->stats.io_write_bytes;
+ array[16] = (uint64_t)ctx->stats.io_write_requests;
+ array[17] = (uint64_t)ctx->stats.io_read_bytes;
+ array[18] = (uint64_t)ctx->stats.io_read_requests;
+ array[19] = (uint64_t)ctx->stats.io_write_extent_bytes;
+ array[20] = (uint64_t)ctx->stats.io_write_extents;
+ array[21] = (uint64_t)ctx->stats.io_read_extent_bytes;
+ array[22] = (uint64_t)ctx->stats.io_read_extents;
+ array[23] = (uint64_t)ctx->stats.datafile_creations;
+ array[24] = (uint64_t)ctx->stats.datafile_deletions;
+ array[25] = (uint64_t)ctx->stats.journalfile_creations;
+ array[26] = (uint64_t)ctx->stats.journalfile_deletions;
+ array[27] = (uint64_t)ctx->stats.page_cache_descriptors;
+ array[28] = (uint64_t)ctx->stats.io_errors;
+ array[29] = (uint64_t)ctx->stats.fs_errors;
+ array[30] = (uint64_t)global_io_errors;
+ array[31] = (uint64_t)global_fs_errors;
+ array[32] = (uint64_t)rrdeng_reserved_file_descriptors;
+ array[33] = (uint64_t)ctx->stats.pg_cache_over_half_dirty_events;
+ array[34] = (uint64_t)global_pg_cache_over_half_dirty_events;
+ array[35] = (uint64_t)ctx->stats.flushing_pressure_page_deletions;
+ array[36] = (uint64_t)global_flushing_pressure_page_deletions;
+ fatal_assert(RRDENG_NR_STATS == 37);
+}
+
+/* Releases reference to page */
+void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle)
+{
+ (void)ctx;
+ pg_cache_put(ctx, (struct rrdeng_page_descr *)handle);
+}
+
+/*
+ * Returns 0 on success, negative on error
+ */
+int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
+ unsigned disk_space_mb, size_t tier) {
+ struct rrdengine_instance *ctx;
+ int error;
+ uint32_t max_open_files;
+
+ max_open_files = rlimit_nofile.rlim_cur / 4;
+
+ /* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE);
+ if (rrdeng_reserved_file_descriptors > max_open_files) {
+ error(
+ "Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.",
+ (unsigned)rrdeng_reserved_file_descriptors,
+ (unsigned)max_open_files);
+
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
+ return UV_EMFILE;
+ }
+
+ if(NULL == ctxp) {
+ ctx = multidb_ctx[tier];
+ memset(ctx, 0, sizeof(*ctx));
+ }
+ else {
+ *ctxp = ctx = callocz(1, sizeof(*ctx));
+ }
+ ctx->tier = tier;
+ ctx->page_type = tier_page_type[tier];
+ ctx->global_compress_alg = RRD_LZ4;
+ if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB)
+ page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
+ ctx->max_cache_pages = page_cache_mb * (1048576LU / RRDENG_BLOCK_SIZE);
+ /* try to keep 5% of the page cache free */
+ ctx->cache_pages_low_watermark = (ctx->max_cache_pages * 95LLU) / 100;
+ if (disk_space_mb < RRDENG_MIN_DISK_SPACE_MB)
+ disk_space_mb = RRDENG_MIN_DISK_SPACE_MB;
+ ctx->max_disk_space = disk_space_mb * 1048576LLU;
+ strncpyz(ctx->dbfiles_path, dbfiles_path, sizeof(ctx->dbfiles_path) - 1);
+ ctx->dbfiles_path[sizeof(ctx->dbfiles_path) - 1] = '\0';
+ if (NULL == host)
+ strncpyz(ctx->machine_guid, registry_get_this_machine_guid(), GUID_LEN);
+ else
+ strncpyz(ctx->machine_guid, host->machine_guid, GUID_LEN);
+
+ ctx->drop_metrics_under_page_cache_pressure = rrdeng_drop_metrics_under_page_cache_pressure;
+ ctx->metric_API_max_producers = 0;
+ ctx->quiesce = NO_QUIESCE;
+ ctx->host = host;
+
+ memset(&ctx->worker_config, 0, sizeof(ctx->worker_config));
+ ctx->worker_config.ctx = ctx;
+ init_page_cache(ctx);
+ init_commit_log(ctx);
+ error = init_rrd_files(ctx);
+ if (error) {
+ goto error_after_init_rrd_files;
+ }
+
+ completion_init(&ctx->rrdengine_completion);
+ fatal_assert(0 == uv_thread_create(&ctx->worker_config.thread, rrdeng_worker, &ctx->worker_config));
+ /* wait for worker thread to initialize */
+ completion_wait_for(&ctx->rrdengine_completion);
+ completion_destroy(&ctx->rrdengine_completion);
+ uv_thread_set_name_np(ctx->worker_config.thread, "LIBUV_WORKER");
+ if (ctx->worker_config.error) {
+ goto error_after_rrdeng_worker;
+ }
+// error = metalog_init(ctx);
+// if (error) {
+// error("Failed to initialize metadata log file event loop.");
+// goto error_after_rrdeng_worker;
+// }
+
+ return 0;
+
+error_after_rrdeng_worker:
+ finalize_rrd_files(ctx);
+error_after_init_rrd_files:
+ free_page_cache(ctx);
+ if (!is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) {
+ freez(ctx);
+ if (ctxp)
+ *ctxp = NULL;
+ }
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
+ return UV_EIO;
+}
+
+/*
+ * Returns 0 on success, 1 on error
+ */
+int rrdeng_exit(struct rrdengine_instance *ctx)
+{
+ struct rrdeng_cmd cmd;
+
+ if (NULL == ctx) {
+ return 1;
+ }
+
+ /* TODO: add page to page cache */
+ cmd.opcode = RRDENG_SHUTDOWN;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+
+ fatal_assert(0 == uv_thread_join(&ctx->worker_config.thread));
+
+ finalize_rrd_files(ctx);
+ //metalog_exit(ctx->metalog_ctx);
+ free_page_cache(ctx);
+
+ if(!is_storage_engine_shared((STORAGE_INSTANCE *)ctx))
+ freez(ctx);
+
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
+ return 0;
+}
+
+void rrdeng_prepare_exit(struct rrdengine_instance *ctx)
+{
+ struct rrdeng_cmd cmd;
+
+ if (NULL == ctx) {
+ return;
+ }
+
+ completion_init(&ctx->rrdengine_completion);
+ cmd.opcode = RRDENG_QUIESCE;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+
+ /* wait for dbengine to quiesce */
+ completion_wait_for(&ctx->rrdengine_completion);
+ completion_destroy(&ctx->rrdengine_completion);
+
+ //metalog_prepare_exit(ctx->metalog_ctx);
+}
+
+RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) {
+ RRDENG_SIZE_STATS stats = { 0 };
+
+ for(struct pg_cache_page_index *page_index = ctx->pg_cache.metrics_index.last_page_index;
+ page_index != NULL ;page_index = page_index->prev) {
+ stats.metrics++;
+ stats.metrics_pages += page_index->page_count;
+ }
+
+ for(struct rrdengine_datafile *df = ctx->datafiles.first; df ;df = df->next) {
+ stats.datafiles++;
+
+ for(struct extent_info *ei = df->extents.first; ei ; ei = ei->next) {
+ stats.extents++;
+ stats.extents_compressed_bytes += ei->size;
+
+ for(int p = 0; p < ei->number_of_pages ;p++) {
+ struct rrdeng_page_descr *descr = ei->pages[p];
+
+ usec_t update_every_usec;
+
+ size_t points = descr->page_length / PAGE_POINT_SIZE_BYTES(descr);
+
+ if(likely(points > 1))
+ update_every_usec = (descr->end_time_ut - descr->start_time_ut) / (points - 1);
+ else {
+ update_every_usec = default_rrd_update_every * get_tier_grouping(ctx->tier) * USEC_PER_SEC;
+ stats.single_point_pages++;
+ }
+
+ time_t duration_secs = (time_t)((descr->end_time_ut - descr->start_time_ut + update_every_usec)/USEC_PER_SEC);
+
+ stats.extents_pages++;
+ stats.pages_uncompressed_bytes += descr->page_length;
+ stats.pages_duration_secs += duration_secs;
+ stats.points += points;
+
+ stats.page_types[descr->type].pages++;
+ stats.page_types[descr->type].pages_uncompressed_bytes += descr->page_length;
+ stats.page_types[descr->type].pages_duration_secs += duration_secs;
+ stats.page_types[descr->type].points += points;
+
+ if(!stats.first_t || (descr->start_time_ut - update_every_usec) < stats.first_t)
+ stats.first_t = (descr->start_time_ut - update_every_usec) / USEC_PER_SEC;
+
+ if(!stats.last_t || descr->end_time_ut > stats.last_t)
+ stats.last_t = descr->end_time_ut / USEC_PER_SEC;
+ }
+ }
+ }
+
+
+ stats.currently_collected_metrics = ctx->stats.metric_API_producers;
+ stats.max_concurrently_collected_metrics = ctx->metric_API_max_producers;
+
+ internal_error(stats.metrics_pages != stats.extents_pages + stats.currently_collected_metrics,
+ "DBENGINE: metrics pages is %zu, but extents pages is %zu and API consumers is %zu",
+ stats.metrics_pages, stats.extents_pages, stats.currently_collected_metrics);
+
+ stats.disk_space = ctx->disk_space;
+ stats.max_disk_space = ctx->max_disk_space;
+
+ stats.database_retention_secs = (time_t)(stats.last_t - stats.first_t);
+
+ if(stats.extents_pages)
+ stats.average_page_size_bytes = (double)stats.pages_uncompressed_bytes / (double)stats.extents_pages;
+
+ if(stats.pages_uncompressed_bytes > 0)
+ stats.average_compression_savings = 100.0 - ((double)stats.extents_compressed_bytes * 100.0 / (double)stats.pages_uncompressed_bytes);
+
+ if(stats.points)
+ stats.average_point_duration_secs = (double)stats.pages_duration_secs / (double)stats.points;
+
+ if(stats.metrics) {
+ stats.average_metric_retention_secs = (double)stats.pages_duration_secs / (double)stats.metrics;
+
+ if(stats.database_retention_secs) {
+ double metric_coverage = stats.average_metric_retention_secs / (double)stats.database_retention_secs;
+ double db_retention_days = (double)stats.database_retention_secs / 86400.0;
+
+ stats.estimated_concurrently_collected_metrics = stats.metrics * metric_coverage;
+
+ stats.ephemeral_metrics_per_day_percent = ((double)stats.metrics * 100.0 / (double)stats.estimated_concurrently_collected_metrics - 100.0) / (double)db_retention_days;
+ }
+ }
+
+ stats.sizeof_metric = struct_natural_alignment(sizeof(struct pg_cache_page_index) + sizeof(struct pg_alignment));
+ stats.sizeof_page = struct_natural_alignment(sizeof(struct rrdeng_page_descr));
+ stats.sizeof_datafile = struct_natural_alignment(sizeof(struct rrdengine_datafile)) + struct_natural_alignment(sizeof(struct rrdengine_journalfile));
+ stats.sizeof_page_in_cache = struct_natural_alignment(sizeof(struct page_cache_descr));
+ stats.sizeof_point_data = page_type_size[ctx->page_type];
+ stats.sizeof_page_data = RRDENG_BLOCK_SIZE;
+ stats.pages_per_extent = rrdeng_pages_per_extent;
+
+ stats.sizeof_extent = sizeof(struct extent_info);
+ stats.sizeof_page_in_extent = sizeof(struct rrdeng_page_descr *);
+
+ stats.sizeof_metric_in_index = 40;
+ stats.sizeof_page_in_index = 24;
+
+ stats.default_granularity_secs = (size_t)default_rrd_update_every * get_tier_grouping(ctx->tier);
+
+ return stats;
+}
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
new file mode 100644
index 0000000..3acee4e
--- /dev/null
+++ b/database/engine/rrdengineapi.h
@@ -0,0 +1,144 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDENGINEAPI_H
+#define NETDATA_RRDENGINEAPI_H
+
+#include "rrdengine.h"
+
+#define RRDENG_MIN_PAGE_CACHE_SIZE_MB (8)
+#define RRDENG_MIN_DISK_SPACE_MB (64)
+
+#define RRDENG_NR_STATS (37)
+
+#define RRDENG_FD_BUDGET_PER_INSTANCE (50)
+
+extern int db_engine_use_malloc;
+extern int default_rrdeng_page_fetch_timeout;
+extern int default_rrdeng_page_fetch_retries;
+extern int default_rrdeng_page_cache_mb;
+extern int default_rrdeng_disk_quota_mb;
+extern int default_multidb_disk_quota_mb;
+extern uint8_t rrdeng_drop_metrics_under_page_cache_pressure;
+extern struct rrdengine_instance *multidb_ctx[RRD_STORAGE_TIERS];
+extern size_t page_type_size[];
+
+#define PAGE_POINT_SIZE_BYTES(x) page_type_size[(x)->type]
+
+struct rrdeng_region_info {
+ time_t start_time_s;
+ int update_every;
+ unsigned points;
+};
+
+void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr);
+void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
+ Word_t page_correlation_id);
+void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle);
+void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time_ut, void **handle);
+void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle);
+
+void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_t *ret_uuid);
+void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid,
+ uuid_t *ret_uuid);
+
+
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance);
+STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
+STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id);
+void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle);
+STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle);
+
+STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg);
+void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle);
+void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every);
+void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, NETDATA_DOUBLE n,
+ NETDATA_DOUBLE min_value,
+ NETDATA_DOUBLE max_value,
+ uint16_t count,
+ uint16_t anomaly_count,
+ SN_FLAGS flags);
+int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle);
+
+void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrdimm_handle,
+ time_t start_time_s, time_t end_time_s);
+STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle);
+
+
+int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrdimm_handle);
+void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrdimm_handle);
+time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle);
+time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle);
+
+void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
+
+/* must call once before using anything */
+int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
+ unsigned disk_space_mb, size_t tier);
+
+int rrdeng_exit(struct rrdengine_instance *ctx);
+void rrdeng_prepare_exit(struct rrdengine_instance *ctx);
+int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t);
+
+extern STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
+extern void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg);
+
+typedef struct rrdengine_size_statistics {
+ size_t default_granularity_secs;
+
+ size_t sizeof_metric;
+ size_t sizeof_metric_in_index;
+ size_t sizeof_page;
+ size_t sizeof_page_in_index;
+ size_t sizeof_extent;
+ size_t sizeof_page_in_extent;
+ size_t sizeof_datafile;
+ size_t sizeof_page_in_cache;
+ size_t sizeof_point_data;
+ size_t sizeof_page_data;
+
+ size_t pages_per_extent;
+
+ size_t datafiles;
+ size_t extents;
+ size_t extents_pages;
+ size_t points;
+ size_t metrics;
+ size_t metrics_pages;
+
+ size_t extents_compressed_bytes;
+ size_t pages_uncompressed_bytes;
+ time_t pages_duration_secs;
+
+ struct {
+ size_t pages;
+ size_t pages_uncompressed_bytes;
+ time_t pages_duration_secs;
+ size_t points;
+ } page_types[256];
+
+ size_t single_point_pages;
+
+ usec_t first_t;
+ usec_t last_t;
+
+ size_t currently_collected_metrics;
+ size_t max_concurrently_collected_metrics;
+ size_t estimated_concurrently_collected_metrics;
+
+ size_t disk_space;
+ size_t max_disk_space;
+
+ time_t database_retention_secs;
+ double average_compression_savings;
+ double average_point_duration_secs;
+ double average_metric_retention_secs;
+
+ double ephemeral_metrics_per_day_percent;
+
+ double average_page_size_bytes;
+} RRDENG_SIZE_STATS;
+
+RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx);
+
+#endif /* NETDATA_RRDENGINEAPI_H */
diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c
new file mode 100644
index 0000000..58bd9c4
--- /dev/null
+++ b/database/engine/rrdenginelib.c
@@ -0,0 +1,311 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+#define BUFSIZE (512)
+
+/* Caller must hold descriptor lock */
+void print_page_cache_descr(struct rrdeng_page_descr *descr, const char *msg, bool log_debug)
+{
+ if(log_debug && !(debug_flags & D_RRDENGINE))
+ return;
+
+ BUFFER *wb = buffer_create(512);
+
+ if(!descr) {
+ buffer_sprintf(wb, "DBENGINE: %s : descr is NULL", msg);
+ }
+ else {
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+ char uuid_str[UUID_STR_LEN];
+
+ uuid_unparse_lower(*descr->id, uuid_str);
+ buffer_sprintf(wb, "DBENGINE: %s : page(%p) metric:%s, len:%"PRIu32", time:%"PRIu64"->%"PRIu64", update_every:%u, type:%u, xt_offset:",
+ msg,
+ pg_cache_descr->page, uuid_str,
+ descr->page_length,
+ (uint64_t)descr->start_time_ut,
+ (uint64_t)descr->end_time_ut,
+ (uint32_t)descr->update_every_s,
+ (uint32_t)descr->type
+ );
+ if (!descr->extent) {
+ buffer_strcat(wb, "N/A");
+ } else {
+ buffer_sprintf(wb, "%"PRIu64, descr->extent->offset);
+ }
+
+ buffer_sprintf(wb, ", flags:0x%2.2lX refcnt:%u", pg_cache_descr->flags, pg_cache_descr->refcnt);
+ }
+
+ if(log_debug)
+ debug(D_RRDENGINE, "%s", buffer_tostring(wb));
+ else
+ internal_error(true, "%s", buffer_tostring(wb));
+
+ buffer_free(wb);
+}
+
+void print_page_descr(struct rrdeng_page_descr *descr)
+{
+ char uuid_str[UUID_STR_LEN];
+ char str[BUFSIZE + 1];
+ int pos = 0;
+
+ uuid_unparse_lower(*descr->id, uuid_str);
+ pos += snprintfz(str, BUFSIZE - pos, "id=%s\n"
+ "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:",
+ uuid_str,
+ descr->page_length,
+ (uint64_t)descr->start_time_ut,
+ (uint64_t)descr->end_time_ut);
+ if (!descr->extent) {
+ pos += snprintfz(str + pos, BUFSIZE - pos, "N/A");
+ } else {
+ pos += snprintfz(str + pos, BUFSIZE - pos, "%"PRIu64, descr->extent->offset);
+ }
+ snprintfz(str + pos, BUFSIZE - pos, "\n\n");
+ fputs(str, stderr);
+}
+
+int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size)
+{
+ int ret;
+ uv_fs_t req;
+ uv_stat_t* s;
+
+ ret = uv_fs_fstat(NULL, &req, file, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_fstat: %s\n", uv_strerror(ret));
+ }
+ fatal_assert(req.result == 0);
+ s = req.ptr;
+ if (!(s->st_mode & S_IFREG)) {
+ error("Not a regular file.\n");
+ uv_fs_req_cleanup(&req);
+ return UV_EINVAL;
+ }
+ if (s->st_size < min_size) {
+ error("File length is too short.\n");
+ uv_fs_req_cleanup(&req);
+ return UV_EINVAL;
+ }
+ *file_size = s->st_size;
+ uv_fs_req_cleanup(&req);
+
+ return 0;
+}
+
+/**
+ * Open file for I/O.
+ *
+ * @param path The full path of the file.
+ * @param flags Same flags as the open() system call uses.
+ * @param file On success sets (*file) to be the uv_file that was opened.
+ * @param direct Tries to open a file in direct I/O mode when direct=1, falls back to buffered mode if not possible.
+ * @return Returns UV error number that is < 0 on failure. 0 on success.
+ */
+int open_file_for_io(char *path, int flags, uv_file *file, int direct)
+{
+ uv_fs_t req;
+ int fd = -1, current_flags;
+
+ fatal_assert(0 == direct || 1 == direct);
+ for ( ; direct >= 0 ; --direct) {
+#ifdef __APPLE__
+ /* Apple OS does not support O_DIRECT */
+ direct = 0;
+#endif
+ current_flags = flags;
+ if (direct) {
+ current_flags |= O_DIRECT;
+ }
+ fd = uv_fs_open(NULL, &req, path, current_flags, S_IRUSR | S_IWUSR, NULL);
+ if (fd < 0) {
+ if ((direct) && (UV_EINVAL == fd)) {
+ error("File \"%s\" does not support direct I/O, falling back to buffered I/O.", path);
+ } else {
+ error("Failed to open file \"%s\".", path);
+ --direct; /* break the loop */
+ }
+ } else {
+ fatal_assert(req.result >= 0);
+ *file = req.result;
+#ifdef __APPLE__
+ info("Disabling OS X caching for file \"%s\".", path);
+ fcntl(fd, F_NOCACHE, 1);
+#endif
+ --direct; /* break the loop */
+ }
+ uv_fs_req_cleanup(&req);
+ }
+
+ return fd;
+}
+
+char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size)
+{
+ struct page_cache *pg_cache;
+
+ pg_cache = &ctx->pg_cache;
+ snprintfz(str, size,
+ "metric_API_producers: %ld\n"
+ "metric_API_consumers: %ld\n"
+ "page_cache_total_pages: %ld\n"
+ "page_cache_descriptors: %ld\n"
+ "page_cache_populated_pages: %ld\n"
+ "page_cache_committed_pages: %ld\n"
+ "page_cache_insertions: %ld\n"
+ "page_cache_deletions: %ld\n"
+ "page_cache_hits: %ld\n"
+ "page_cache_misses: %ld\n"
+ "page_cache_backfills: %ld\n"
+ "page_cache_evictions: %ld\n"
+ "compress_before_bytes: %ld\n"
+ "compress_after_bytes: %ld\n"
+ "decompress_before_bytes: %ld\n"
+ "decompress_after_bytes: %ld\n"
+ "io_write_bytes: %ld\n"
+ "io_write_requests: %ld\n"
+ "io_read_bytes: %ld\n"
+ "io_read_requests: %ld\n"
+ "io_write_extent_bytes: %ld\n"
+ "io_write_extents: %ld\n"
+ "io_read_extent_bytes: %ld\n"
+ "io_read_extents: %ld\n"
+ "datafile_creations: %ld\n"
+ "datafile_deletions: %ld\n"
+ "journalfile_creations: %ld\n"
+ "journalfile_deletions: %ld\n"
+ "io_errors: %ld\n"
+ "fs_errors: %ld\n"
+ "global_io_errors: %ld\n"
+ "global_fs_errors: %ld\n"
+ "rrdeng_reserved_file_descriptors: %ld\n"
+ "pg_cache_over_half_dirty_events: %ld\n"
+ "global_pg_cache_over_half_dirty_events: %ld\n"
+ "flushing_pressure_page_deletions: %ld\n"
+ "global_flushing_pressure_page_deletions: %ld\n",
+ (long)ctx->stats.metric_API_producers,
+ (long)ctx->stats.metric_API_consumers,
+ (long)pg_cache->page_descriptors,
+ (long)ctx->stats.page_cache_descriptors,
+ (long)pg_cache->populated_pages,
+ (long)pg_cache->committed_page_index.nr_committed_pages,
+ (long)ctx->stats.pg_cache_insertions,
+ (long)ctx->stats.pg_cache_deletions,
+ (long)ctx->stats.pg_cache_hits,
+ (long)ctx->stats.pg_cache_misses,
+ (long)ctx->stats.pg_cache_backfills,
+ (long)ctx->stats.pg_cache_evictions,
+ (long)ctx->stats.before_compress_bytes,
+ (long)ctx->stats.after_compress_bytes,
+ (long)ctx->stats.before_decompress_bytes,
+ (long)ctx->stats.after_decompress_bytes,
+ (long)ctx->stats.io_write_bytes,
+ (long)ctx->stats.io_write_requests,
+ (long)ctx->stats.io_read_bytes,
+ (long)ctx->stats.io_read_requests,
+ (long)ctx->stats.io_write_extent_bytes,
+ (long)ctx->stats.io_write_extents,
+ (long)ctx->stats.io_read_extent_bytes,
+ (long)ctx->stats.io_read_extents,
+ (long)ctx->stats.datafile_creations,
+ (long)ctx->stats.datafile_deletions,
+ (long)ctx->stats.journalfile_creations,
+ (long)ctx->stats.journalfile_deletions,
+ (long)ctx->stats.io_errors,
+ (long)ctx->stats.fs_errors,
+ (long)global_io_errors,
+ (long)global_fs_errors,
+ (long)rrdeng_reserved_file_descriptors,
+ (long)ctx->stats.pg_cache_over_half_dirty_events,
+ (long)global_pg_cache_over_half_dirty_events,
+ (long)ctx->stats.flushing_pressure_page_deletions,
+ (long)global_flushing_pressure_page_deletions
+ );
+ return str;
+}
+
+int is_legacy_child(const char *machine_guid)
+{
+ uuid_t uuid;
+ char dbengine_file[FILENAME_MAX+1];
+
+ if (unlikely(!strcmp(machine_guid, "unittest-dbengine") || !strcmp(machine_guid, "dbengine-dataset") ||
+ !strcmp(machine_guid, "dbengine-stress-test"))) {
+ return 1;
+ }
+ if (!uuid_parse(machine_guid, uuid)) {
+ uv_fs_t stat_req;
+ snprintfz(dbengine_file, FILENAME_MAX, "%s/%s/dbengine", netdata_configured_cache_dir, machine_guid);
+ int rc = uv_fs_stat(NULL, &stat_req, dbengine_file, NULL);
+ if (likely(rc == 0 && ((stat_req.statbuf.st_mode & S_IFMT) == S_IFDIR))) {
+ //info("Found legacy engine folder \"%s\"", dbengine_file);
+ return 1;
+ }
+ }
+ return 0;
+}
+
+int count_legacy_children(char *dbfiles_path)
+{
+ int ret;
+ uv_fs_t req;
+ uv_dirent_t dent;
+ int legacy_engines = 0;
+
+ ret = uv_fs_scandir(NULL, &req, dbfiles_path, 0, NULL);
+ if (ret < 0) {
+ uv_fs_req_cleanup(&req);
+ error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret));
+ return ret;
+ }
+
+ while(UV_EOF != uv_fs_scandir_next(&req, &dent)) {
+ if (dent.type == UV_DIRENT_DIR) {
+ if (is_legacy_child(dent.name))
+ legacy_engines++;
+ }
+ }
+ uv_fs_req_cleanup(&req);
+ return legacy_engines;
+}
+
+int compute_multidb_diskspace()
+{
+ char multidb_disk_space_file[FILENAME_MAX + 1];
+ FILE *fp;
+ int computed_multidb_disk_quota_mb = -1;
+
+ snprintfz(multidb_disk_space_file, FILENAME_MAX, "%s/dbengine_multihost_size", netdata_configured_varlib_dir);
+ fp = fopen(multidb_disk_space_file, "r");
+ if (likely(fp)) {
+ int rc = fscanf(fp, "%d", &computed_multidb_disk_quota_mb);
+ fclose(fp);
+ if (unlikely(rc != 1 || computed_multidb_disk_quota_mb < RRDENG_MIN_DISK_SPACE_MB)) {
+ errno = 0;
+ error("File '%s' contains invalid input, it will be rebuild", multidb_disk_space_file);
+ computed_multidb_disk_quota_mb = -1;
+ }
+ }
+
+ if (computed_multidb_disk_quota_mb == -1) {
+ int rc = count_legacy_children(netdata_configured_cache_dir);
+ if (likely(rc >= 0)) {
+ computed_multidb_disk_quota_mb = (rc + 1) * default_rrdeng_disk_quota_mb;
+ info("Found %d legacy dbengines, setting multidb diskspace to %dMB", rc, computed_multidb_disk_quota_mb);
+
+ fp = fopen(multidb_disk_space_file, "w");
+ if (likely(fp)) {
+ fprintf(fp, "%d", computed_multidb_disk_quota_mb);
+ info("Created file '%s' to store the computed value", multidb_disk_space_file);
+ fclose(fp);
+ } else
+ error("Failed to store the default multidb disk quota size on '%s'", multidb_disk_space_file);
+ }
+ else
+ computed_multidb_disk_quota_mb = default_rrdeng_disk_quota_mb;
+ }
+
+ return computed_multidb_disk_quota_mb;
+}
diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h
new file mode 100644
index 0000000..6b1a15f
--- /dev/null
+++ b/database/engine/rrdenginelib.h
@@ -0,0 +1,102 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDENGINELIB_H
+#define NETDATA_RRDENGINELIB_H
+
+#include "libnetdata/libnetdata.h"
+
+/* Forward declarations */
+struct rrdeng_page_descr;
+struct rrdengine_instance;
+
+#define STR_HELPER(x) #x
+#define STR(x) STR_HELPER(x)
+
+#define BITS_PER_ULONG (sizeof(unsigned long) * 8)
+
+#define ALIGN_BYTES_FLOOR(x) (((x) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE)
+#define ALIGN_BYTES_CEILING(x) ((((x) + RRDENG_BLOCK_SIZE - 1) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE)
+
+#define ROUND_USEC_TO_SEC(x) (((x) + USEC_PER_SEC / 2 - 1) / USEC_PER_SEC)
+
+typedef uintptr_t rrdeng_stats_t;
+
+#ifdef __ATOMIC_RELAXED
+#define rrd_atomic_fetch_add(p, n) __atomic_fetch_add(p, n, __ATOMIC_RELAXED)
+#define rrd_atomic_add_fetch(p, n) __atomic_add_fetch(p, n, __ATOMIC_RELAXED)
+#else
+#define rrd_atomic_fetch_add(p, n) __sync_fetch_and_add(p, n)
+#define rrd_atomic_add_fetch(p, n) __sync_add_and_fetch(p, n)
+#endif
+
+#define rrd_stat_atomic_add(p, n) rrd_atomic_fetch_add(p, n)
+
+/* returns -1 if it didn't find the first cleared bit, the position otherwise. Starts from LSB. */
+static inline int find_first_zero(unsigned x)
+{
+ return ffs((int)(~x)) - 1;
+}
+
+/* Starts from LSB. */
+static inline uint8_t check_bit(unsigned x, size_t pos)
+{
+ return !!(x & (1 << pos));
+}
+
+/* Starts from LSB. val is 0 or 1 */
+static inline void modify_bit(unsigned *x, unsigned pos, uint8_t val)
+{
+ switch(val) {
+ case 0:
+ *x &= ~(1U << pos);
+ break;
+ case 1:
+ *x |= 1U << pos;
+ break;
+ default:
+ error("modify_bit() called with invalid argument.");
+ break;
+ }
+}
+
+#define RRDENG_PATH_MAX (4096)
+
+/* returns old *ptr value */
+static inline unsigned long ulong_compare_and_swap(volatile unsigned long *ptr,
+ unsigned long oldval, unsigned long newval)
+{
+ return __sync_val_compare_and_swap(ptr, oldval, newval);
+}
+
+#ifndef O_DIRECT
+/* Workaround for OS X */
+#define O_DIRECT (0)
+#endif
+
+static inline int crc32cmp(void *crcp, uLong crc)
+{
+ return (*(uint32_t *)crcp != crc);
+}
+
+static inline void crc32set(void *crcp, uLong crc)
+{
+ *(uint32_t *)crcp = crc;
+}
+
+void print_page_cache_descr(struct rrdeng_page_descr *descr, const char *msg, bool log_debug);
+void print_page_descr(struct rrdeng_page_descr *descr);
+int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size);
+int open_file_for_io(char *path, int flags, uv_file *file, int direct);
+static inline int open_file_direct_io(char *path, int flags, uv_file *file)
+{
+ return open_file_for_io(path, flags, file, 1);
+}
+static inline int open_file_buffered_io(char *path, int flags, uv_file *file)
+{
+ return open_file_for_io(path, flags, file, 0);
+}
+char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size);
+int compute_multidb_diskspace();
+int is_legacy_child(const char *machine_guid);
+
+#endif /* NETDATA_RRDENGINELIB_H */
diff --git a/database/engine/rrdenglocking.c b/database/engine/rrdenglocking.c
new file mode 100644
index 0000000..a23abf3
--- /dev/null
+++ b/database/engine/rrdenglocking.c
@@ -0,0 +1,241 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx)
+{
+ struct page_cache_descr *pg_cache_descr;
+
+ pg_cache_descr = mallocz(sizeof(*pg_cache_descr));
+ rrd_stat_atomic_add(&ctx->stats.page_cache_descriptors, 1);
+ pg_cache_descr->page = NULL;
+ pg_cache_descr->flags = 0;
+ pg_cache_descr->prev = pg_cache_descr->next = NULL;
+ pg_cache_descr->refcnt = 0;
+ pg_cache_descr->waiters = 0;
+ fatal_assert(0 == uv_cond_init(&pg_cache_descr->cond));
+ fatal_assert(0 == uv_mutex_init(&pg_cache_descr->mutex));
+
+ return pg_cache_descr;
+}
+
+void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr)
+{
+ uv_cond_destroy(&pg_cache_descr->cond);
+ uv_mutex_destroy(&pg_cache_descr->mutex);
+ freez(pg_cache_descr);
+ rrd_stat_atomic_add(&ctx->stats.page_cache_descriptors, -1);
+}
+
+/* also allocates page cache descriptor if missing */
+void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+ unsigned long old_state, old_users, new_state, ret_state;
+ struct page_cache_descr *pg_cache_descr = NULL;
+ uint8_t we_locked;
+
+ we_locked = 0;
+ while (1) { /* spin */
+ old_state = descr->pg_cache_descr_state;
+ old_users = old_state >> PG_CACHE_DESCR_SHIFT;
+
+ if (unlikely(we_locked)) {
+ fatal_assert(old_state & PG_CACHE_DESCR_LOCKED);
+ new_state = (1 << PG_CACHE_DESCR_SHIFT) | PG_CACHE_DESCR_ALLOCATED;
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ /* success */
+ break;
+ }
+ continue; /* spin */
+ }
+ if (old_state & PG_CACHE_DESCR_LOCKED) {
+ fatal_assert(0 == old_users);
+ continue; /* spin */
+ }
+ if (0 == old_state) {
+ /* no page cache descriptor has been allocated */
+
+ if (NULL == pg_cache_descr) {
+ pg_cache_descr = rrdeng_create_pg_cache_descr(ctx);
+ }
+ new_state = PG_CACHE_DESCR_LOCKED;
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, 0, new_state);
+ if (0 == ret_state) {
+ we_locked = 1;
+ descr->pg_cache_descr = pg_cache_descr;
+ pg_cache_descr->descr = descr;
+ pg_cache_descr = NULL; /* make sure we don't free pg_cache_descr */
+ /* retry */
+ continue;
+ }
+ continue; /* spin */
+ }
+ /* page cache descriptor is already allocated */
+ if (unlikely(!(old_state & PG_CACHE_DESCR_ALLOCATED))) {
+ fatal("Invalid page cache descriptor locking state:%#lX", old_state);
+ }
+ new_state = (old_users + 1) << PG_CACHE_DESCR_SHIFT;
+ new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK;
+
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ /* success */
+ break;
+ }
+ /* spin */
+ }
+
+ if (pg_cache_descr) {
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+ }
+ pg_cache_descr = descr->pg_cache_descr;
+ uv_mutex_lock(&pg_cache_descr->mutex);
+}
+
+void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+ unsigned long old_state, new_state, ret_state, old_users;
+ struct page_cache_descr *pg_cache_descr, *delete_pg_cache_descr = NULL;
+ uint8_t we_locked;
+
+ uv_mutex_unlock(&descr->pg_cache_descr->mutex);
+
+ we_locked = 0;
+ while (1) { /* spin */
+ old_state = descr->pg_cache_descr_state;
+ old_users = old_state >> PG_CACHE_DESCR_SHIFT;
+
+ if (unlikely(we_locked)) {
+ fatal_assert(0 == old_users);
+
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, 0);
+ if (old_state == ret_state) {
+ /* success */
+ rrdeng_destroy_pg_cache_descr(ctx, delete_pg_cache_descr);
+ return;
+ }
+ continue; /* spin */
+ }
+ if (old_state & PG_CACHE_DESCR_LOCKED) {
+ fatal_assert(0 == old_users);
+ continue; /* spin */
+ }
+ fatal_assert(old_state & PG_CACHE_DESCR_ALLOCATED);
+ pg_cache_descr = descr->pg_cache_descr;
+ /* caller is the only page cache descriptor user and there are no pending references on the page */
+ if ((old_state & PG_CACHE_DESCR_DESTROY) && (1 == old_users) &&
+ !pg_cache_descr->flags && !pg_cache_descr->refcnt) {
+ fatal_assert(!pg_cache_descr->waiters);
+
+ new_state = PG_CACHE_DESCR_LOCKED;
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ we_locked = 1;
+ delete_pg_cache_descr = pg_cache_descr;
+ descr->pg_cache_descr = NULL;
+ /* retry */
+ continue;
+ }
+ continue; /* spin */
+ }
+ fatal_assert(old_users > 0);
+ new_state = (old_users - 1) << PG_CACHE_DESCR_SHIFT;
+ new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK;
+
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ /* success */
+ break;
+ }
+ /* spin */
+ }
+}
+
+/*
+ * Tries to deallocate page cache descriptor. If it fails, it postpones deallocation by setting the
+ * PG_CACHE_DESCR_DESTROY flag which will be eventually cleared by a different context after doing
+ * the deallocation.
+ */
+void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+ unsigned long old_state, new_state, ret_state, old_users;
+ struct page_cache_descr *pg_cache_descr = NULL;
+ uint8_t just_locked, can_free, must_unlock;
+
+ just_locked = 0;
+ can_free = 0;
+ must_unlock = 0;
+ while (1) { /* spin */
+ old_state = descr->pg_cache_descr_state;
+ old_users = old_state >> PG_CACHE_DESCR_SHIFT;
+
+ if (unlikely(just_locked)) {
+ fatal_assert(0 == old_users);
+
+ must_unlock = 1;
+ just_locked = 0;
+ /* Try deallocate if there are no pending references on the page */
+ if (!pg_cache_descr->flags && !pg_cache_descr->refcnt) {
+ fatal_assert(!pg_cache_descr->waiters);
+
+ descr->pg_cache_descr = NULL;
+ can_free = 1;
+ /* success */
+ continue;
+ }
+ continue; /* spin */
+ }
+ if (unlikely(must_unlock)) {
+ fatal_assert(0 == old_users);
+
+ if (can_free) {
+ /* success */
+ new_state = 0;
+ } else {
+ new_state = old_state | PG_CACHE_DESCR_DESTROY;
+ new_state &= ~PG_CACHE_DESCR_LOCKED;
+ }
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ /* unlocked */
+ if (can_free)
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+ return;
+ }
+ continue; /* spin */
+ }
+ if (!(old_state & PG_CACHE_DESCR_ALLOCATED)) {
+ /* don't do anything */
+ return;
+ }
+ if (old_state & PG_CACHE_DESCR_LOCKED) {
+ fatal_assert(0 == old_users);
+ continue; /* spin */
+ }
+ /* caller is the only page cache descriptor user */
+ if (0 == old_users) {
+ new_state = old_state | PG_CACHE_DESCR_LOCKED;
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ just_locked = 1;
+ pg_cache_descr = descr->pg_cache_descr;
+ /* retry */
+ continue;
+ }
+ continue; /* spin */
+ }
+ if (old_state & PG_CACHE_DESCR_DESTROY) {
+ /* don't do anything */
+ return;
+ }
+ /* plant PG_CACHE_DESCR_DESTROY so that other contexts eventually free the page cache descriptor */
+ new_state = old_state | PG_CACHE_DESCR_DESTROY;
+
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ /* success */
+ return;
+ }
+ /* spin */
+ }
+} \ No newline at end of file
diff --git a/database/engine/rrdenglocking.h b/database/engine/rrdenglocking.h
new file mode 100644
index 0000000..078eab3
--- /dev/null
+++ b/database/engine/rrdenglocking.h
@@ -0,0 +1,17 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDENGLOCKING_H
+#define NETDATA_RRDENGLOCKING_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct page_cache_descr;
+
+struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx);
+void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr);
+void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+
+#endif /* NETDATA_RRDENGLOCKING_H */ \ No newline at end of file