From 8020f71afd34d7696d7933659df2d763ab05542f Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 4 May 2024 16:31:17 +0200 Subject: Adding upstream version 1.37.1. Signed-off-by: Daniel Baumann --- database/engine/Makefile.am | 11 + database/engine/README.md | 302 +++++++ database/engine/datafile.c | 460 ++++++++++ database/engine/datafile.h | 67 ++ database/engine/journalfile.c | 587 +++++++++++++ database/engine/journalfile.h | 49 ++ database/engine/metadata_log/README.md | 0 database/engine/pagecache.c | 1313 +++++++++++++++++++++++++++ database/engine/pagecache.h | 254 ++++++ database/engine/rrddiskprotocol.h | 120 +++ database/engine/rrdengine.c | 1508 ++++++++++++++++++++++++++++++++ database/engine/rrdengine.h | 283 ++++++ database/engine/rrdengineapi.c | 1272 +++++++++++++++++++++++++++ database/engine/rrdengineapi.h | 144 +++ database/engine/rrdenginelib.c | 311 +++++++ database/engine/rrdenginelib.h | 102 +++ database/engine/rrdenglocking.c | 241 +++++ database/engine/rrdenglocking.h | 17 + 18 files changed, 7041 insertions(+) create mode 100644 database/engine/Makefile.am create mode 100644 database/engine/README.md create mode 100644 database/engine/datafile.c create mode 100644 database/engine/datafile.h create mode 100644 database/engine/journalfile.c create mode 100644 database/engine/journalfile.h create mode 100644 database/engine/metadata_log/README.md create mode 100644 database/engine/pagecache.c create mode 100644 database/engine/pagecache.h create mode 100644 database/engine/rrddiskprotocol.h create mode 100644 database/engine/rrdengine.c create mode 100644 database/engine/rrdengine.h create mode 100755 database/engine/rrdengineapi.c create mode 100644 database/engine/rrdengineapi.h create mode 100644 database/engine/rrdenginelib.c create mode 100644 database/engine/rrdenginelib.h create mode 100644 database/engine/rrdenglocking.c create mode 100644 database/engine/rrdenglocking.h (limited to 'database/engine') diff --git a/database/engine/Makefile.am b/database/engine/Makefile.am new file mode 100644 index 0000000..59250a9 --- /dev/null +++ b/database/engine/Makefile.am @@ -0,0 +1,11 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +SUBDIRS = \ + $(NULL) + +dist_noinst_DATA = \ + README.md \ + $(NULL) diff --git a/database/engine/README.md b/database/engine/README.md new file mode 100644 index 0000000..c67e400 --- /dev/null +++ b/database/engine/README.md @@ -0,0 +1,302 @@ + + +# Database engine + +The Database Engine works like a traditional time series database. Unlike other [database modes](/database/README.md), +the amount of historical metrics stored is based on the amount of disk space you allocate and the effective compression +ratio, not a fixed number of metrics collected. + +## Tiering + +Tiering is a mechanism of providing multiple tiers of data with +different [granularity on metrics](/docs/store/distributed-data-architecture.md#granularity-of-metrics). + +For Netdata Agents with version `netdata-1.35.0.138.nightly` and greater, `dbengine` supports Tiering, allowing almost +unlimited retention of data. + + +### Metric size + +Every Tier down samples the exact lower tier (lower tiers have greater resolution). You can have up to 5 +Tiers **[0. . 4]** of data (including the Tier 0, which has the highest resolution) + +Tier 0 is the default that was always available in `dbengine` mode. Tier 1 is the first level of aggregation, Tier 2 is +the second, and so on. + +Metrics on all tiers except of the _Tier 0_ also store the following five additional values for every point for accurate +representation: + +1. The `sum` of the points aggregated +2. The `min` of the points aggregated +3. The `max` of the points aggregated +4. The `count` of the points aggregated (could be constant, but it may not be due to gaps in data collection) +5. The `anomaly_count` of the points aggregated (how many of the aggregated points found anomalous) + +Among `min`, `max` and `sum`, the correct value is chosen based on the user query. `average` is calculated on the fly at +query time. + +### Tiering in a nutshell + +The `dbengine` is capable of retaining metrics for years. To further understand the `dbengine` tiering mechanism let's +explore the following configuration. + +``` +[db] + mode = dbengine + + # per second data collection + update every = 1 + + # enables Tier 1 and Tier 2, Tier 0 is always enabled in dbengine mode + storage tiers = 3 + + # Tier 0, per second data for a week + dbengine multihost disk space MB = 1100 + + # Tier 1, per minute data for a month + dbengine tier 1 multihost disk space MB = 330 + + # Tier 2, per hour data for a year + dbengine tier 2 multihost disk space MB = 67 +``` + +For 2000 metrics, collected every second and retained for a week, Tier 0 needs: 1 byte x 2000 metrics x 3600 secs per +hour x 24 hours per day x 7 days per week = 1100MB. + +By setting `dbengine multihost disk space MB` to `1100`, this node will start maintaining about a week of data. But pay +attention to the number of metrics. If you have more than 2000 metrics on a node, or you need more that a week of high +resolution metrics, you may need to adjust this setting accordingly. + +Tier 1 is by default sampling the data every **60 points of Tier 0**. In our case, Tier 0 is per second, if we want to +transform this information in terms of time then the Tier 1 "resolution" is per minute. + +Tier 1 needs four times more storage per point compared to Tier 0. So, for 2000 metrics, with per minute resolution, +retained for a month, Tier 1 needs: 4 bytes x 2000 metrics x 60 minutes per hour x 24 hours per day x 30 days per month += 330MB. + +Tier 2 is by default sampling data every 3600 points of Tier 0 (60 of Tier 1, which is the previous exact Tier). Again +in term of "time" (Tier 0 is per second), then Tier 2 is per hour. + +The storage requirements are the same to Tier 1. + +For 2000 metrics, with per hour resolution, retained for a year, Tier 2 needs: 4 bytes x 2000 metrics x 24 hours per day +x 365 days per year = 67MB. + +## Legacy configuration + +### v1.35.1 and prior + +These versions of the Agent do not support [Tiering](#Tiering). You could change the metric retention for the parent and +all of its children only with the `dbengine multihost disk space MB` setting. This setting accounts the space allocation +for the parent node and all of its children. + +To configure the database engine, look for the `page cache size MB` and `dbengine multihost disk space MB` settings in +the `[db]` section of your `netdata.conf`. + +```conf +[db] + dbengine page cache size MB = 32 + dbengine multihost disk space MB = 256 +``` + +### v1.23.2 and prior + +_For Netdata Agents earlier than v1.23.2_, the Agent on the parent node uses one dbengine instance for itself, and +another instance for every child node it receives metrics from. If you had four streaming nodes, you would have five +instances in total (`1 parent + 4 child nodes = 5 instances`). + +The Agent allocates resources for each instance separately using the `dbengine disk space MB` (**deprecated**) setting. +If +`dbengine disk space MB`(**deprecated**) is set to the default `256`, each instance is given 256 MiB in disk space, +which means the total disk space required to store all instances is, +roughly, `256 MiB * 1 parent * 4 child nodes = 1280 MiB`. + +#### Backward compatibility + +All existing metrics belonging to child nodes are automatically converted to legacy dbengine instances and the localhost +metrics are transferred to the multihost dbengine instance. + +All new child nodes are automatically transferred to the multihost dbengine instance and share its page cache and disk +space. If you want to migrate a child node from its legacy dbengine instance to the multihost dbengine instance, you +must delete the instance's directory, which is located in `/var/cache/netdata/MACHINE_GUID/dbengine`, after stopping the +Agent. + +##### Information + +For more information about setting `[db].mode` on your nodes, in addition to other streaming configurations, see +[streaming](/streaming/README.md). + +## Requirements & limitations + +### Memory + +Using database mode `dbengine` we can overcome most memory restrictions and store a dataset that is much larger than the +available memory. + +There are explicit memory requirements **per** DB engine **instance**: + +- The total page cache memory footprint will be an additional `#dimensions-being-collected x 4096 x 2` bytes over what + the user configured with `dbengine page cache size MB`. + + +- an additional `#pages-on-disk x 4096 x 0.03` bytes of RAM are allocated for metadata. + + - roughly speaking this is 3% of the uncompressed disk space taken by the DB files. + + - for very highly compressible data (compression ratio > 90%) this RAM overhead is comparable to the disk space + footprint. + +An important observation is that RAM usage depends on both the `page cache size` and the `dbengine multihost disk space` +options. + +You can use +our [database engine calculator](/docs/store/change-metrics-storage.md#calculate-the-system-resources-ram-disk-space-needed-to-store-metrics) +to validate the memory requirements for your particular system(s) and configuration (**out-of-date**). + +### Disk space + +There are explicit disk space requirements **per** DB engine **instance**: + +- The total disk space footprint will be the maximum between `#dimensions-being-collected x 4096 x 2` bytes or what the + user configured with `dbengine multihost disk space` or `dbengine disk space`. + +### File descriptor + +The Database Engine may keep a **significant** amount of files open per instance (e.g. per streaming child or parent +server). When configuring your system you should make sure there are at least 50 file descriptors available per +`dbengine` instance. + +Netdata allocates 25% of the available file descriptors to its Database Engine instances. This means that only 25% of +the file descriptors that are available to the Netdata service are accessible by dbengine instances. You should take +that into account when configuring your service or system-wide file descriptor limits. You can roughly estimate that the +Netdata service needs 2048 file descriptors for every 10 streaming child hosts when streaming is configured to use +`[db].mode = dbengine`. + +If for example one wants to allocate 65536 file descriptors to the Netdata service on a systemd system one needs to +override the Netdata service by running `sudo systemctl edit netdata` and creating a file with contents: + +```sh +[Service] +LimitNOFILE=65536 +``` + +For other types of services one can add the line: + +```sh +ulimit -n 65536 +``` + +at the beginning of the service file. Alternatively you can change the system-wide limits of the kernel by changing +`/etc/sysctl.conf`. For linux that would be: + +```conf +fs.file-max = 65536 +``` + +In FreeBSD and OS X you change the lines like this: + +```conf +kern.maxfilesperproc=65536 +kern.maxfiles=65536 +``` + +You can apply the settings by running `sysctl -p` or by rebooting. + +## Files + +With the DB engine mode the metric data are stored in database files. These files are organized in pairs, the datafiles +and their corresponding journalfiles, e.g.: + +```sh +datafile-1-0000000001.ndf +journalfile-1-0000000001.njf +datafile-1-0000000002.ndf +journalfile-1-0000000002.njf +datafile-1-0000000003.ndf +journalfile-1-0000000003.njf +... +``` + +They are located under their host's cache directory in the directory `./dbengine` (e.g. for localhost the default +location is `/var/cache/netdata/dbengine/*`). The higher numbered filenames contain more recent metric data. The user +can safely delete some pairs of files when Netdata is stopped to manually free up some space. + +_Users should_ **back up** _their `./dbengine` folders if they consider this data to be important._ You can also set up +one or more [exporting connectors](/exporting/README.md) to send your Netdata metrics to other databases for long-term +storage at lower granularity. + +## Operation + +The DB engine stores chart metric values in 4096-byte pages in memory. Each chart dimension gets its own page to store +consecutive values generated from the data collectors. Those pages comprise the **Page Cache**. + +When those pages fill up, they are slowly compressed and flushed to disk. It can +take `4096 / 4 = 1024 seconds = 17 minutes`, for a chart dimension that is being collected every 1 second, to fill a +page. Pages can be cut short when we stop Netdata or the DB engine instance so as to not lose the data. When we query +the DB engine for data we trigger disk read I/O requests that fill the Page Cache with the requested pages and +potentially evict cold (not recently used) +pages. + +When the disk quota is exceeded the oldest values are removed from the DB engine at real time, by automatically deleting +the oldest datafile and journalfile pair. Any corresponding pages residing in the Page Cache will also be invalidated +and removed. The DB engine logic will try to maintain between 10 and 20 file pairs at any point in time. + +The Database Engine uses direct I/O to avoid polluting the OS filesystem caches and does not generate excessive I/O +traffic so as to create the minimum possible interference with other applications. + +## Evaluation + +We have evaluated the performance of the `dbengine` API that the netdata daemon uses internally. This is **not** the web +API of netdata. Our benchmarks ran on a **single** `dbengine` instance, multiple of which can be running in a Netdata +parent node. We used a server with an AMD Ryzen Threadripper 2950X 16-Core Processor and 2 disk drives, a Seagate +Constellation ES.3 2TB magnetic HDD and a SAMSUNG MZQLB960HAJR-00007 960GB NAND Flash SSD. + +For our workload, we defined 32 charts with 128 metrics each, giving us a total of 4096 metrics. We defined 1 worker +thread per chart (32 threads) that generates new data points with a data generation interval of 1 second. The time axis +of the time-series is emulated and accelerated so that the worker threads can generate as many data points as possible +without delays. + +We also defined 32 worker threads that perform queries on random metrics with semi-random time ranges. The starting time +of the query is randomly selected between the beginning of the time-series and the time of the latest data point. The +ending time is randomly selected between 1 second and 1 hour after the starting time. The pseudo-random numbers are +generated with a uniform distribution. + +The data are written to the database at the same time as they are read from it. This is a concurrent read/write mixed +workload with a duration of 60 seconds. The faster `dbengine` runs, the bigger the dataset size becomes since more data +points will be generated. We set a page cache size of 64MiB for the two disk-bound scenarios. This way, the dataset size +of the metric data is much bigger than the RAM that is being used for caching so as to trigger I/O requests most of the +time. In our final scenario, we set the page cache size to 16 GiB. That way, the dataset fits in the page cache so as to +avoid all disk bottlenecks. + +The reported numbers are the following: + +| device | page cache | dataset | reads/sec | writes/sec | +|:------:|:----------:|--------:|----------:|-----------:| +| HDD | 64 MiB | 4.1 GiB | 813K | 18.0M | +| SSD | 64 MiB | 9.8 GiB | 1.7M | 43.0M | +| N/A | 16 GiB | 6.8 GiB | 118.2M | 30.2M | + +where "reads/sec" is the number of metric data points being read from the database via its API per second and +"writes/sec" is the number of metric data points being written to the database per second. + +Notice that the HDD numbers are pretty high and not much slower than the SSD numbers. This is thanks to the database +engine design being optimized for rotating media. In the database engine disk I/O requests are: + +- asynchronous to mask the high I/O latency of HDDs. +- mostly large to reduce the amount of HDD seeking time. +- mostly sequential to reduce the amount of HDD seeking time. +- compressed to reduce the amount of required throughput. + +As a result, the HDD is not thousands of times slower than the SSD, which is typical for other workloads. + +An interesting observation to make is that the CPU-bound run (16 GiB page cache) generates fewer data than the SSD run +(6.8 GiB vs 9.8 GiB). The reason is that the 32 reader threads in the SSD scenario are more frequently blocked by I/O, +and generate a read load of 1.7M/sec, whereas in the CPU-bound scenario the read load is 70 times higher at 118M/sec. +Consequently, there is a significant degree of interference by the reader threads, that slow down the writer threads. +This is also possible because the interference effects are greater than the SSD impact on data generation throughput. + + diff --git a/database/engine/datafile.c b/database/engine/datafile.c new file mode 100644 index 0000000..9c70068 --- /dev/null +++ b/database/engine/datafile.c @@ -0,0 +1,460 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +void df_extent_insert(struct extent_info *extent) +{ + struct rrdengine_datafile *datafile = extent->datafile; + + if (likely(NULL != datafile->extents.last)) { + datafile->extents.last->next = extent; + } + if (unlikely(NULL == datafile->extents.first)) { + datafile->extents.first = extent; + } + datafile->extents.last = extent; +} + +void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) +{ + if (likely(NULL != ctx->datafiles.last)) { + ctx->datafiles.last->next = datafile; + } + if (unlikely(NULL == ctx->datafiles.first)) { + ctx->datafiles.first = datafile; + } + ctx->datafiles.last = datafile; +} + +void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) +{ + struct rrdengine_datafile *next; + + next = datafile->next; + fatal_assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile)); + ctx->datafiles.first = next; +} + + +static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_instance *ctx, + unsigned tier, unsigned fileno) +{ + fatal_assert(tier == 1); + datafile->tier = tier; + datafile->fileno = fileno; + datafile->file = (uv_file)0; + datafile->pos = 0; + datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */ + datafile->journalfile = NULL; + datafile->next = NULL; + datafile->ctx = ctx; +} + +void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +{ + (void) snprintfz(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION, + datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); +} + +int close_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + + ret = uv_fs_close(NULL, &req, datafile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + return ret; +} + +int unlink_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ++ctx->stats.datafile_deletions; + + return ret; +} + +int destroy_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + + ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL); + if (ret < 0) { + error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ret = uv_fs_close(NULL, &req, datafile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ++ctx->stats.datafile_deletions; + + return ret; +} + +int create_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + uv_file file; + int ret, fd; + struct rrdeng_df_sb *superblock; + uv_buf_t iov; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file); + if (fd < 0) { + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; + } + datafile->file = file; + ++ctx->stats.datafile_creations; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + memset(superblock, 0, sizeof(*superblock)); + (void) strncpy(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ); + (void) strncpy(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ); + superblock->tier = 1; + + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + fatal_assert(req.result < 0); + error("uv_fs_write: %s", uv_strerror(ret)); + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + } + uv_fs_req_cleanup(&req); + posix_memfree(superblock); + if (ret < 0) { + destroy_data_file(datafile); + return ret; + } + + datafile->pos = sizeof(*superblock); + ctx->stats.io_write_bytes += sizeof(*superblock); + ++ctx->stats.io_write_requests; + + return 0; +} + +static int check_data_file_superblock(uv_file file) +{ + int ret; + struct rrdeng_df_sb *superblock; + uv_buf_t iov; + uv_fs_t req; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + error("uv_fs_read: %s", uv_strerror(ret)); + uv_fs_req_cleanup(&req); + goto error; + } + fatal_assert(req.result >= 0); + uv_fs_req_cleanup(&req); + + if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) || + strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) || + superblock->tier != 1) { + error("File has invalid superblock."); + ret = UV_EINVAL; + } else { + ret = 0; + } + error: + posix_memfree(superblock); + return ret; +} + +static int load_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + uv_file file; + int ret, fd, error; + uint64_t file_size; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + fd = open_file_direct_io(path, O_RDWR, &file); + if (fd < 0) { + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; + } + info("Initializing data file \"%s\".", path); + + ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); + if (ret) + goto error; + file_size = ALIGN_BYTES_CEILING(file_size); + + ret = check_data_file_superblock(file); + if (ret) + goto error; + ctx->stats.io_read_bytes += sizeof(struct rrdeng_df_sb); + ++ctx->stats.io_read_requests; + + datafile->file = file; + datafile->pos = file_size; + + info("Data file \"%s\" initialized (size:%"PRIu64").", path, file_size); + return 0; + + error: + error = ret; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + return error; +} + +static int scan_data_files_cmp(const void *a, const void *b) +{ + struct rrdengine_datafile *file1, *file2; + char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX]; + + file1 = *(struct rrdengine_datafile **)a; + file2 = *(struct rrdengine_datafile **)b; + generate_datafilepath(file1, path1, sizeof(path1)); + generate_datafilepath(file2, path2, sizeof(path2)); + return strcmp(path1, path2); +} + +/* Returns number of datafiles that were loaded or < 0 on error */ +static int scan_data_files(struct rrdengine_instance *ctx) +{ + int ret; + unsigned tier, no, matched_files, i,failed_to_load; + static uv_fs_t req; + uv_dirent_t dent; + struct rrdengine_datafile **datafiles, *datafile; + struct rrdengine_journalfile *journalfile; + + ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL); + if (ret < 0) { + fatal_assert(req.result < 0); + uv_fs_req_cleanup(&req); + error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return ret; + } + info("Found %d files in path %s", ret, ctx->dbfiles_path); + + datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles)); + for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { + info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name); + ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no); + if (2 == ret) { + info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name); + datafile = mallocz(sizeof(*datafile)); + datafile_init(datafile, ctx, tier, no); + datafiles[matched_files++] = datafile; + } + } + uv_fs_req_cleanup(&req); + + if (0 == matched_files) { + freez(datafiles); + return 0; + } + if (matched_files == MAX_DATAFILES) { + error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); + } + qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp); + /* TODO: change this when tiering is implemented */ + ctx->last_fileno = datafiles[matched_files - 1]->fileno; + + for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { + uint8_t must_delete_pair = 0; + + datafile = datafiles[i]; + ret = load_data_file(datafile); + if (0 != ret) { + must_delete_pair = 1; + } + journalfile = mallocz(sizeof(*journalfile)); + datafile->journalfile = journalfile; + journalfile_init(journalfile, datafile); + ret = load_journal_file(ctx, journalfile, datafile); + if (0 != ret) { + if (!must_delete_pair) /* If datafile is still open close it */ + close_data_file(datafile); + must_delete_pair = 1; + } + if (must_delete_pair) { + char path[RRDENG_PATH_MAX]; + + error("Deleting invalid data and journal file pair."); + ret = unlink_journal_file(journalfile); + if (!ret) { + generate_journalfilepath(datafile, path, sizeof(path)); + info("Deleted journal file \"%s\".", path); + } + ret = unlink_data_file(datafile); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + info("Deleted data file \"%s\".", path); + } + freez(journalfile); + freez(datafile); + ++failed_to_load; + continue; + } + + datafile_list_insert(ctx, datafile); + ctx->disk_space += datafile->pos + journalfile->pos; + } + matched_files -= failed_to_load; + freez(datafiles); + + return matched_files; +} + +/* Creates a datafile and a journalfile pair */ +int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) +{ + struct rrdengine_datafile *datafile; + struct rrdengine_journalfile *journalfile; + int ret; + char path[RRDENG_PATH_MAX]; + + info("Creating new data and journal files in path %s", ctx->dbfiles_path); + datafile = mallocz(sizeof(*datafile)); + datafile_init(datafile, ctx, tier, fileno); + ret = create_data_file(datafile); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + info("Created data file \"%s\".", path); + } else { + goto error_after_datafile; + } + + journalfile = mallocz(sizeof(*journalfile)); + datafile->journalfile = journalfile; + journalfile_init(journalfile, datafile); + ret = create_journal_file(journalfile, datafile); + if (!ret) { + generate_journalfilepath(datafile, path, sizeof(path)); + info("Created journal file \"%s\".", path); + } else { + goto error_after_journalfile; + } + datafile_list_insert(ctx, datafile); + ctx->disk_space += datafile->pos + journalfile->pos; + + return 0; + +error_after_journalfile: + destroy_data_file(datafile); + freez(journalfile); +error_after_datafile: + freez(datafile); + return ret; +} + +/* Page cache must already be initialized. + * Return 0 on success. + */ +int init_data_files(struct rrdengine_instance *ctx) +{ + int ret; + + ret = scan_data_files(ctx); + if (ret < 0) { + error("Failed to scan path \"%s\".", ctx->dbfiles_path); + return ret; + } else if (0 == ret) { + info("Data files not found, creating in path \"%s\".", ctx->dbfiles_path); + ret = create_new_datafile_pair(ctx, 1, 1); + if (ret) { + error("Failed to create data and journal files in path \"%s\".", ctx->dbfiles_path); + return ret; + } + ctx->last_fileno = 1; + } + + return 0; +} + +void finalize_data_files(struct rrdengine_instance *ctx) +{ + struct rrdengine_datafile *datafile, *next_datafile; + struct rrdengine_journalfile *journalfile; + struct extent_info *extent, *next_extent; + + for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) { + journalfile = datafile->journalfile; + next_datafile = datafile->next; + + for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) { + next_extent = extent->next; + freez(extent); + } + close_journal_file(journalfile, datafile); + close_data_file(datafile); + freez(journalfile); + freez(datafile); + } +} diff --git a/database/engine/datafile.h b/database/engine/datafile.h new file mode 100644 index 0000000..1cf256a --- /dev/null +++ b/database/engine/datafile.h @@ -0,0 +1,67 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_DATAFILE_H +#define NETDATA_DATAFILE_H + +#include "rrdengine.h" + +/* Forward declarations */ +struct rrdengine_datafile; +struct rrdengine_journalfile; +struct rrdengine_instance; + +#define DATAFILE_PREFIX "datafile-" +#define DATAFILE_EXTENSION ".ndf" + +#define MAX_DATAFILE_SIZE (1073741824LU) +#define MIN_DATAFILE_SIZE (4194304LU) +#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */ +#define TARGET_DATAFILES (20) + +#define DATAFILE_IDEAL_IO_SIZE (1048576U) + +struct extent_info { + uint64_t offset; + uint32_t size; + uint8_t number_of_pages; + struct rrdengine_datafile *datafile; + struct extent_info *next; + struct rrdeng_page_descr *pages[]; +}; + +struct rrdengine_df_extents { + /* the extent list is sorted based on disk offset */ + struct extent_info *first; + struct extent_info *last; +}; + +/* only one event loop is supported for now */ +struct rrdengine_datafile { + unsigned tier; + unsigned fileno; + uv_file file; + uint64_t pos; + struct rrdengine_instance *ctx; + struct rrdengine_df_extents extents; + struct rrdengine_journalfile *journalfile; + struct rrdengine_datafile *next; +}; + +struct rrdengine_datafile_list { + struct rrdengine_datafile *first; /* oldest */ + struct rrdengine_datafile *last; /* newest */ +}; + +void df_extent_insert(struct extent_info *extent); +void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); +int close_data_file(struct rrdengine_datafile *datafile); +int unlink_data_file(struct rrdengine_datafile *datafile); +int destroy_data_file(struct rrdengine_datafile *datafile); +int create_data_file(struct rrdengine_datafile *datafile); +int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); +int init_data_files(struct rrdengine_instance *ctx); +void finalize_data_files(struct rrdengine_instance *ctx); + +#endif /* NETDATA_DATAFILE_H */ \ No newline at end of file diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c new file mode 100644 index 0000000..500dd78 --- /dev/null +++ b/database/engine/journalfile.c @@ -0,0 +1,587 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +static void flush_transaction_buffer_cb(uv_fs_t* req) +{ + struct generic_io_descriptor *io_descr = req->data; + struct rrdengine_worker_config* wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + + debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); + if (req->result < 0) { + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); + } else { + debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); + } + + uv_fs_req_cleanup(req); + posix_memfree(io_descr->buf); + freez(io_descr); +} + +/* Careful to always call this before creating a new journal file */ +void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + int ret; + struct generic_io_descriptor *io_descr; + unsigned pos, size; + struct rrdengine_journalfile *journalfile; + + if (unlikely(NULL == ctx->commit_log.buf || 0 == ctx->commit_log.buf_pos)) { + return; + } + /* care with outstanding transactions when switching journal files */ + journalfile = ctx->datafiles.last->journalfile; + + io_descr = mallocz(sizeof(*io_descr)); + pos = ctx->commit_log.buf_pos; + size = ctx->commit_log.buf_size; + if (pos < size) { + /* simulate an empty transaction to skip the rest of the block */ + *(uint8_t *) (ctx->commit_log.buf + pos) = STORE_PADDING; + } + io_descr->buf = ctx->commit_log.buf; + io_descr->bytes = size; + io_descr->pos = journalfile->pos; + io_descr->req.data = io_descr; + io_descr->completion = NULL; + + io_descr->iov = uv_buf_init((void *)io_descr->buf, size); + ret = uv_fs_write(wc->loop, &io_descr->req, journalfile->file, &io_descr->iov, 1, + journalfile->pos, flush_transaction_buffer_cb); + fatal_assert(-1 != ret); + journalfile->pos += RRDENG_BLOCK_SIZE; + ctx->disk_space += RRDENG_BLOCK_SIZE; + ctx->commit_log.buf = NULL; + ctx->stats.io_write_bytes += RRDENG_BLOCK_SIZE; + ++ctx->stats.io_write_requests; +} + +void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size) +{ + struct rrdengine_instance *ctx = wc->ctx; + int ret; + unsigned buf_pos = 0, buf_size; + + fatal_assert(size); + if (ctx->commit_log.buf) { + unsigned remaining; + + buf_pos = ctx->commit_log.buf_pos; + buf_size = ctx->commit_log.buf_size; + remaining = buf_size - buf_pos; + if (size > remaining) { + /* we need a new buffer */ + wal_flush_transaction_buffer(wc); + } + } + if (NULL == ctx->commit_log.buf) { + buf_size = ALIGN_BYTES_CEILING(size); + ret = posix_memalign((void *)&ctx->commit_log.buf, RRDFILE_ALIGNMENT, buf_size); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + memset(ctx->commit_log.buf, 0, buf_size); + buf_pos = ctx->commit_log.buf_pos = 0; + ctx->commit_log.buf_size = buf_size; + } + ctx->commit_log.buf_pos += size; + + return ctx->commit_log.buf + buf_pos; +} + +void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +{ + (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION, + datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); +} + +void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + journalfile->file = (uv_file)0; + journalfile->pos = 0; + journalfile->datafile = datafile; +} + +int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + + ret = uv_fs_close(NULL, &req, journalfile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + return ret; +} + +int unlink_journal_file(struct rrdengine_journalfile *journalfile) +{ + struct rrdengine_datafile *datafile = journalfile->datafile; + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ++ctx->stats.journalfile_deletions; + + return ret; +} + +int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + + ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL); + if (ret < 0) { + error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ret = uv_fs_close(NULL, &req, journalfile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ++ctx->stats.journalfile_deletions; + + return ret; +} + +int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + uv_file file; + int ret, fd; + struct rrdeng_jf_sb *superblock; + uv_buf_t iov; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file); + if (fd < 0) { + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; + } + journalfile->file = file; + ++ctx->stats.journalfile_creations; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + memset(superblock, 0, sizeof(*superblock)); + (void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ); + (void) strncpy(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ); + + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + fatal_assert(req.result < 0); + error("uv_fs_write: %s", uv_strerror(ret)); + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + } + uv_fs_req_cleanup(&req); + posix_memfree(superblock); + if (ret < 0) { + destroy_journal_file(journalfile, datafile); + return ret; + } + + journalfile->pos = sizeof(*superblock); + ctx->stats.io_write_bytes += sizeof(*superblock); + ++ctx->stats.io_write_requests; + + return 0; +} + +static int check_journal_file_superblock(uv_file file) +{ + int ret; + struct rrdeng_jf_sb *superblock; + uv_buf_t iov; + uv_fs_t req; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + error("uv_fs_read: %s", uv_strerror(ret)); + uv_fs_req_cleanup(&req); + goto error; + } + fatal_assert(req.result >= 0); + uv_fs_req_cleanup(&req); + + if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) || + strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) { + error("File has invalid superblock."); + ret = UV_EINVAL; + } else { + ret = 0; + } + error: + posix_memfree(superblock); + return ret; +} + +static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + void *buf, unsigned max_size) +{ + static BITMAP256 page_error_map; + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned i, count, payload_length, descr_size, valid_pages; + struct rrdeng_page_descr *descr; + struct extent_info *extent; + /* persistent structures */ + struct rrdeng_jf_store_data *jf_metric_data; + + jf_metric_data = buf; + count = jf_metric_data->number_of_pages; + descr_size = sizeof(*jf_metric_data->descr) * count; + payload_length = sizeof(*jf_metric_data) + descr_size; + if (payload_length > max_size) { + error("Corrupted transaction payload."); + return; + } + + extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0])); + extent->offset = jf_metric_data->extent_offset; + extent->size = jf_metric_data->extent_size; + extent->datafile = journalfile->datafile; + extent->next = NULL; + + for (i = 0, valid_pages = 0 ; i < count ; ++i) { + uuid_t *temp_id; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + uint8_t page_type = jf_metric_data->descr[i].type; + + if (page_type > PAGE_TYPE_MAX) { + if (!bitmap256_get_bit(&page_error_map, page_type)) { + error("Unknown page type %d encountered.", page_type); + bitmap256_set_bit(&page_error_map, page_type, 1); + } + continue; + } + uint64_t start_time_ut = jf_metric_data->descr[i].start_time_ut; + uint64_t end_time_ut = jf_metric_data->descr[i].end_time_ut; + size_t entries = jf_metric_data->descr[i].page_length / page_type_size[page_type]; + time_t update_every_s = (entries > 1) ? ((end_time_ut - start_time_ut) / USEC_PER_SEC / (entries - 1)) : 0; + + if (unlikely(start_time_ut > end_time_ut)) { + ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter++; + if(ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut < end_time_ut) + ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut = end_time_ut; + continue; + } + + if (unlikely(start_time_ut == end_time_ut && entries != 1)) { + ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter++; + if(ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut < end_time_ut) + ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut = end_time_ut; + continue; + } + + if (unlikely(!entries)) { + ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter++; + if(ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut < end_time_ut) + ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut = end_time_ut; + continue; + } + + if(entries > 1 && update_every_s == 0) { + ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter++; + if(ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut < end_time_ut) + ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut = end_time_ut; + continue; + } + + if(start_time_ut + update_every_s * USEC_PER_SEC * (entries - 1) != end_time_ut) { + ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter++; + if(ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut < end_time_ut) + ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut = end_time_ut; + + // let this be + // end_time_ut = start_time_ut + update_every_s * USEC_PER_SEC * (entries - 1); + } + + temp_id = (uuid_t *)jf_metric_data->descr[i].uuid; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + /* First time we see the UUID */ + uv_rwlock_wrlock(&pg_cache->metrics_index.lock); + PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0); + fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ + *PValue = page_index = create_page_index(temp_id, ctx); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; + uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); + } + + descr = pg_cache_create_descr(); + descr->page_length = jf_metric_data->descr[i].page_length; + descr->start_time_ut = start_time_ut; + descr->end_time_ut = end_time_ut; + descr->update_every_s = (update_every_s > 0) ? (uint32_t)update_every_s : (page_index->latest_update_every_s); + descr->id = &page_index->id; + descr->extent = extent; + descr->type = page_type; + extent->pages[valid_pages++] = descr; + pg_cache_insert(ctx, page_index, descr); + + if(page_index->latest_time_ut == descr->end_time_ut) + page_index->latest_update_every_s = descr->update_every_s; + + if(descr->update_every_s == 0) + fatal( + "DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu", + (unsigned long long)end_time_ut, (unsigned long long)start_time_ut, entries); + } + + extent->number_of_pages = valid_pages; + + if (likely(valid_pages)) + df_extent_insert(extent); + else { + freez(extent); + ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter++; + } +} + +/* + * Replays transaction by interpreting up to max_size bytes from buf. + * Sets id to the current transaction id or to 0 if unknown. + * Returns size of transaction record or 0 for unknown size. + */ +static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + void *buf, uint64_t *id, unsigned max_size) +{ + unsigned payload_length, size_bytes; + int ret; + /* persistent structures */ + struct rrdeng_jf_transaction_header *jf_header; + struct rrdeng_jf_transaction_trailer *jf_trailer; + uLong crc; + + *id = 0; + jf_header = buf; + if (STORE_PADDING == jf_header->type) { + debug(D_RRDENGINE, "Skipping padding."); + return 0; + } + if (sizeof(*jf_header) > max_size) { + error("Corrupted transaction record, skipping."); + return 0; + } + *id = jf_header->id; + payload_length = jf_header->payload_length; + size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer); + if (size_bytes > max_size) { + error("Corrupted transaction record, skipping."); + return 0; + } + jf_trailer = buf + sizeof(*jf_header) + payload_length; + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, buf, sizeof(*jf_header) + payload_length); + ret = crc32cmp(jf_trailer->checksum, crc); + debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED"); + if (unlikely(ret)) { + error("Transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id); + return size_bytes; + } + switch (jf_header->type) { + case STORE_DATA: + debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id); + restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length); + break; + default: + error("Unknown transaction type. Skipping record."); + break; + } + + return size_bytes; +} + + +#define READAHEAD_BYTES (RRDENG_BLOCK_SIZE * 256) +/* + * Iterates journal file transactions and populates the page cache. + * Page cache must already be initialized. + * Returns the maximum transaction id it discovered. + */ +static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile) +{ + uv_file file; + uint64_t file_size;//, data_file_size; + int ret; + uint64_t pos, pos_i, max_id, id; + unsigned size_bytes; + void *buf; + uv_buf_t iov; + uv_fs_t req; + + file = journalfile->file; + file_size = journalfile->pos; + //data_file_size = journalfile->datafile->pos; TODO: utilize this? + + max_id = 1; + bool journal_is_mmapped = (journalfile->data != NULL); + if (unlikely(!journal_is_mmapped)) { + ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES); + if (unlikely(ret)) + fatal("posix_memalign:%s", strerror(ret)); + } + else + buf = journalfile->data + sizeof(struct rrdeng_jf_sb); + for (pos = sizeof(struct rrdeng_jf_sb) ; pos < file_size ; pos += READAHEAD_BYTES) { + size_bytes = MIN(READAHEAD_BYTES, file_size - pos); + if (unlikely(!journal_is_mmapped)) { + iov = uv_buf_init(buf, size_bytes); + ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL); + if (ret < 0) { + error("uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret)); + uv_fs_req_cleanup(&req); + goto skip_file; + } + fatal_assert(req.result >= 0); + uv_fs_req_cleanup(&req); + ++ctx->stats.io_read_requests; + ctx->stats.io_read_bytes += size_bytes; + } + + for (pos_i = 0 ; pos_i < size_bytes ; ) { + unsigned max_size; + + max_size = pos + size_bytes - pos_i; + ret = replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size); + if (!ret) /* TODO: support transactions bigger than 4K */ + /* unknown transaction size, move on to the next block */ + pos_i = ALIGN_BYTES_FLOOR(pos_i + RRDENG_BLOCK_SIZE); + else + pos_i += ret; + max_id = MAX(max_id, id); + } + if (likely(journal_is_mmapped)) + buf += size_bytes; + } +skip_file: + if (unlikely(!journal_is_mmapped)) + posix_memfree(buf); + return max_id; +} + +int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + struct rrdengine_datafile *datafile) +{ + uv_fs_t req; + uv_file file; + int ret, fd, error; + uint64_t file_size, max_id; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + fd = open_file_direct_io(path, O_RDWR, &file); + if (fd < 0) { + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; + } + info("Loading journal file \"%s\".", path); + + ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); + if (ret) + goto error; + file_size = ALIGN_BYTES_FLOOR(file_size); + + ret = check_journal_file_superblock(file); + if (ret) + goto error; + ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb); + ++ctx->stats.io_read_requests; + + journalfile->file = file; + journalfile->pos = file_size; + journalfile->data = netdata_mmap(path, file_size, MAP_SHARED, 0); + info("Loading journal file \"%s\" using %s.", path, journalfile->data?"MMAP":"uv_fs_read"); + + max_id = iterate_transactions(ctx, journalfile); + + ctx->commit_log.transaction_id = MAX(ctx->commit_log.transaction_id, max_id + 1); + + info("Journal file \"%s\" loaded (size:%"PRIu64").", path, file_size); + if (likely(journalfile->data)) + netdata_munmap(journalfile->data, file_size); + return 0; + + error: + error = ret; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + return error; +} + +void init_commit_log(struct rrdengine_instance *ctx) +{ + ctx->commit_log.buf = NULL; + ctx->commit_log.buf_pos = 0; + ctx->commit_log.transaction_id = 1; +} diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h new file mode 100644 index 0000000..011c506 --- /dev/null +++ b/database/engine/journalfile.h @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_JOURNALFILE_H +#define NETDATA_JOURNALFILE_H + +#include "rrdengine.h" + +/* Forward declarations */ +struct rrdengine_instance; +struct rrdengine_worker_config; +struct rrdengine_datafile; +struct rrdengine_journalfile; + +#define WALFILE_PREFIX "journalfile-" +#define WALFILE_EXTENSION ".njf" + + +/* only one event loop is supported for now */ +struct rrdengine_journalfile { + uv_file file; + uint64_t pos; + void *data; + struct rrdengine_datafile *datafile; +}; + +/* only one event loop is supported for now */ +struct transaction_commit_log { + uint64_t transaction_id; + + /* outstanding transaction buffer */ + void *buf; + unsigned buf_pos; + unsigned buf_size; +}; + +void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); +void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size); +void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc); +int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +int unlink_journal_file(struct rrdengine_journalfile *journalfile); +int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + struct rrdengine_datafile *datafile); +void init_commit_log(struct rrdengine_instance *ctx); + + +#endif /* NETDATA_JOURNALFILE_H */ \ No newline at end of file diff --git a/database/engine/metadata_log/README.md b/database/engine/metadata_log/README.md new file mode 100644 index 0000000..e69de29 diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c new file mode 100644 index 0000000..4f5da70 --- /dev/null +++ b/database/engine/pagecache.c @@ -0,0 +1,1313 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "rrdengine.h" + +ARAL page_descr_aral = { + .requested_element_size = sizeof(struct rrdeng_page_descr), + .initial_elements = 20000, + .filename = "page_descriptors", + .cache_dir = &netdata_configured_cache_dir, + .use_mmap = false, + .internal.initialized = false +}; + +void rrdeng_page_descr_aral_go_singlethreaded(void) { + page_descr_aral.internal.lockless = true; +} +void rrdeng_page_descr_aral_go_multithreaded(void) { + page_descr_aral.internal.lockless = false; +} + +struct rrdeng_page_descr *rrdeng_page_descr_mallocz(void) { + struct rrdeng_page_descr *descr; + descr = arrayalloc_mallocz(&page_descr_aral); + return descr; +} + +void rrdeng_page_descr_freez(struct rrdeng_page_descr *descr) { + arrayalloc_freez(&page_descr_aral, descr); +} + +void rrdeng_page_descr_use_malloc(void) { + if(page_descr_aral.internal.initialized) + error("DBENGINE: cannot change ARAL allocation policy after it has been initialized."); + else + page_descr_aral.use_mmap = false; +} + +void rrdeng_page_descr_use_mmap(void) { + if(page_descr_aral.internal.initialized) + error("DBENGINE: cannot change ARAL allocation policy after it has been initialized."); + else + page_descr_aral.use_mmap = true; +} + +bool rrdeng_page_descr_is_mmap(void) { + return page_descr_aral.use_mmap; +} + +/* Forward declarations */ +static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx); + +/* always inserts into tail */ +static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + if (likely(NULL != pg_cache->replaceQ.tail)) { + pg_cache_descr->prev = pg_cache->replaceQ.tail; + pg_cache->replaceQ.tail->next = pg_cache_descr; + } + if (unlikely(NULL == pg_cache->replaceQ.head)) { + pg_cache->replaceQ.head = pg_cache_descr; + } + pg_cache->replaceQ.tail = pg_cache_descr; +} + +static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr, *prev, *next; + + prev = pg_cache_descr->prev; + next = pg_cache_descr->next; + + if (likely(NULL != prev)) { + prev->next = next; + } + if (likely(NULL != next)) { + next->prev = prev; + } + if (unlikely(pg_cache_descr == pg_cache->replaceQ.head)) { + pg_cache->replaceQ.head = next; + } + if (unlikely(pg_cache_descr == pg_cache->replaceQ.tail)) { + pg_cache->replaceQ.tail = prev; + } + pg_cache_descr->prev = pg_cache_descr->next = NULL; +} + +void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + uv_rwlock_wrlock(&pg_cache->replaceQ.lock); + pg_cache_replaceQ_insert_unsafe(ctx, descr); + uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); +} + +void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + uv_rwlock_wrlock(&pg_cache->replaceQ.lock); + pg_cache_replaceQ_delete_unsafe(ctx, descr); + uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); +} +void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + uv_rwlock_wrlock(&pg_cache->replaceQ.lock); + pg_cache_replaceQ_delete_unsafe(ctx, descr); + pg_cache_replaceQ_insert_unsafe(ctx, descr); + uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); +} + +struct rrdeng_page_descr *pg_cache_create_descr(void) +{ + struct rrdeng_page_descr *descr; + + descr = rrdeng_page_descr_mallocz(); + descr->page_length = 0; + descr->start_time_ut = INVALID_TIME; + descr->end_time_ut = INVALID_TIME; + descr->id = NULL; + descr->extent = NULL; + descr->pg_cache_descr_state = 0; + descr->pg_cache_descr = NULL; + descr->update_every_s = 0; + + return descr; +} + +/* The caller must hold page descriptor lock. */ +void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr) +{ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + if (pg_cache_descr->waiters) + uv_cond_broadcast(&pg_cache_descr->cond); +} + +void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_wake_up_waiters_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); +} + +/* + * The caller must hold page descriptor lock. + * The lock will be released and re-acquired. The descriptor is not guaranteed + * to exist after this function returns. + */ +void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr) +{ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + ++pg_cache_descr->waiters; + uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex); + --pg_cache_descr->waiters; +} + +/* + * The caller must hold page descriptor lock. + * The lock will be released and re-acquired. The descriptor is not guaranteed + * to exist after this function returns. + * Returns UV_ETIMEDOUT if timeout_sec seconds pass. + */ +int pg_cache_timedwait_event_unsafe(struct rrdeng_page_descr *descr, uint64_t timeout_sec) +{ + int ret; + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + ++pg_cache_descr->waiters; + ret = uv_cond_timedwait(&pg_cache_descr->cond, &pg_cache_descr->mutex, timeout_sec * NSEC_PER_SEC); + --pg_cache_descr->waiters; + + return ret; +} + +/* + * Returns page flags. + * The lock will be released and re-acquired. The descriptor is not guaranteed + * to exist after this function returns. + */ +unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + unsigned long flags; + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_wait_event_unsafe(descr); + flags = pg_cache_descr->flags; + rrdeng_page_descr_mutex_unlock(ctx, descr); + + return flags; +} + +/* + * The caller must hold page descriptor lock. + */ +int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access) +{ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) || + (exclusive_access && pg_cache_descr->refcnt)) { + return 0; + } + + return 1; +} + +/* + * The caller must hold page descriptor lock. + * Gets a reference to the page descriptor. + * Returns 1 on success and 0 on failure. + */ +int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access) +{ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + if (!pg_cache_can_get_unsafe(descr, exclusive_access)) + return 0; + + if (exclusive_access) + pg_cache_descr->flags |= RRD_PAGE_LOCKED; + ++pg_cache_descr->refcnt; + + return 1; +} + +/* + * The caller must hold the page descriptor lock. + * This function may block doing cleanup. + */ +void pg_cache_put_unsafe(struct rrdeng_page_descr *descr) +{ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + pg_cache_descr->flags &= ~RRD_PAGE_LOCKED; + if (0 == --pg_cache_descr->refcnt) { + pg_cache_wake_up_waiters_unsafe(descr); + } +} + +/* + * This function may block doing cleanup. + */ +void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_put_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); +} + +/* The caller must hold the page cache lock */ +static void pg_cache_release_pages_unsafe(struct rrdengine_instance *ctx, unsigned number) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + pg_cache->populated_pages -= number; +} + +static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned number) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + pg_cache_release_pages_unsafe(ctx, number); + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); +} + +/* + * This function returns the maximum number of pages allowed in the page cache. + */ +unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx) +{ + return ctx->max_cache_pages + (unsigned long)ctx->metric_API_max_producers; +} + +/* + * This function returns the low watermark number of pages in the page cache. The page cache should strive to keep the + * number of pages below that number. + */ +unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx) +{ + return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers; +} + +/* + * This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page + * cache. + */ +unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx) +{ + /* We remove the active pages of the producers from the calculation and only allow the extra pinned pages */ + return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers; +} + +/* + * This function will block until it reserves #number populated pages. + * It will trigger evictions or dirty page flushing if the pg_cache_hard_limit() limit is hit. + */ +static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned failures = 0; + const unsigned FAILURES_CEILING = 10; /* truncates exponential backoff to (2^FAILURES_CEILING x slot) */ + unsigned long exp_backoff_slot_usec = USEC_PER_MS * 10; + + assert(number < ctx->max_cache_pages); + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + if (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) + debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==", + number); + while (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) { + + if (!pg_cache_try_evict_one_page_unsafe(ctx)) { + /* failed to evict */ + struct completion compl; + struct rrdeng_cmd cmd; + + ++failures; + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); + + completion_init(&compl); + cmd.opcode = RRDENG_FLUSH_PAGES; + cmd.completion = &compl; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + /* wait for some pages to be flushed */ + debug(D_RRDENGINE, "%s: waiting for pages to be written to disk before evicting.", __func__); + completion_wait_for(&compl); + completion_destroy(&compl); + + if (unlikely(failures > 1)) { + unsigned long slots, usecs_to_sleep; + /* exponential backoff */ + slots = random() % (2LU << MIN(failures, FAILURES_CEILING)); + usecs_to_sleep = slots * exp_backoff_slot_usec; + + if (usecs_to_sleep >= USEC_PER_SEC) + error("Page cache is full. Sleeping for %llu second(s).", usecs_to_sleep / USEC_PER_SEC); + + (void)sleep_usec(usecs_to_sleep); + } + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + } + } + pg_cache->populated_pages += number; + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); +} + +/* + * This function will attempt to reserve #number populated pages. + * It may trigger evictions if the pg_cache_soft_limit() limit is hit. + * Returns 0 on failure and 1 on success. + */ +static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned number) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned count = 0; + int ret = 0; + + assert(number < ctx->max_cache_pages); + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + if (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1) { + debug(D_RRDENGINE, + "==Page cache full. Trying to reserve %u pages.==", + number); + do { + if (!pg_cache_try_evict_one_page_unsafe(ctx)) + break; + ++count; + } while (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1); + debug(D_RRDENGINE, "Evicted %u pages.", count); + } + + if (pg_cache->populated_pages + number < pg_cache_hard_limit(ctx) + 1) { + pg_cache->populated_pages += number; + ret = 1; /* success */ + } + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); + + return ret; +} + +/* The caller must hold the page cache and the page descriptor locks in that order */ +static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + dbengine_page_free(pg_cache_descr->page); + pg_cache_descr->page = NULL; + pg_cache_descr->flags &= ~RRD_PAGE_POPULATED; + pg_cache_release_pages_unsafe(ctx, 1); + ++ctx->stats.pg_cache_evictions; +} + +/* + * The caller must hold the page cache lock. + * Lock order: page cache -> replaceQ -> page descriptor + * This function iterates all pages and tries to evict one. + * If it fails it sets in_flight_descr to the oldest descriptor that has write-back in progress, + * or it sets it to NULL if no write-back is in progress. + * + * Returns 1 on success and 0 on failure. + */ +static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned long old_flags; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr = NULL; + + uv_rwlock_wrlock(&pg_cache->replaceQ.lock); + for (pg_cache_descr = pg_cache->replaceQ.head ; NULL != pg_cache_descr ; pg_cache_descr = pg_cache_descr->next) { + descr = pg_cache_descr->descr; + + rrdeng_page_descr_mutex_lock(ctx, descr); + old_flags = pg_cache_descr->flags; + if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) { + /* must evict */ + pg_cache_evict_unsafe(ctx, descr); + pg_cache_put_unsafe(descr); + pg_cache_replaceQ_delete_unsafe(ctx, descr); + + rrdeng_page_descr_mutex_unlock(ctx, descr); + uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); + + rrdeng_try_deallocate_pg_cache_descr(ctx, descr); + + return 1; + } + rrdeng_page_descr_mutex_unlock(ctx, descr); + } + uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); + + /* failed to evict */ + return 0; +} + +/** + * Deletes a page from the database. + * Callers of this function need to make sure they're not deleting the same descriptor concurrently. + * @param ctx is the database instance. + * @param descr is the page descriptor. + * @param remove_dirty must be non-zero if the page to be deleted is dirty. + * @param is_exclusive_holder must be non-zero if the caller holds an exclusive page reference. + * @param metric_id is set to the metric the page belongs to, if it's safe to delete the metric and metric_id is not + * NULL. Otherwise, metric_id is not set. + * @return 1 if it's safe to delete the metric, 0 otherwise. + */ +uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty, + uint8_t is_exclusive_holder, uuid_t *metric_id) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct page_cache_descr *pg_cache_descr = NULL; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + int ret; + uint8_t can_delete_metric = 0; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t)); + fatal_assert(NULL != PValue); + page_index = *PValue; + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + + uv_rwlock_wrlock(&page_index->lock); + ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time_ut / USEC_PER_SEC), PJE0); + if (unlikely(0 == ret)) { + uv_rwlock_wrunlock(&page_index->lock); + if (unlikely(debug_flags & D_RRDENGINE)) { + print_page_descr(descr); + } + goto destroy; + } + --page_index->page_count; + if (!page_index->writers && !page_index->page_count) { + can_delete_metric = 1; + if (metric_id) { + memcpy(metric_id, page_index->id, sizeof(uuid_t)); + } + } + uv_rwlock_wrunlock(&page_index->lock); + fatal_assert(1 == ret); + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + ++ctx->stats.pg_cache_deletions; + --pg_cache->page_descriptors; + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + if (!is_exclusive_holder) { + /* If we don't hold an exclusive page reference get one */ + while (!pg_cache_try_get_unsafe(descr, 1)) { + debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr, "", true); + pg_cache_wait_event_unsafe(descr); + } + } + if (remove_dirty) { + pg_cache_descr->flags &= ~RRD_PAGE_DIRTY; + } else { + /* even a locked page could be dirty */ + while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) { + debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr, "", true); + pg_cache_wait_event_unsafe(descr); + } + } + rrdeng_page_descr_mutex_unlock(ctx, descr); + + while (unlikely(pg_cache_descr->flags & RRD_PAGE_READ_PENDING)) { + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "%s: Found page with READ PENDING, waiting for read to complete", __func__); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr, "", true); + pg_cache_wait_event_unsafe(descr); + } + + if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { + /* only after locking can it be safely deleted from LRU */ + pg_cache_replaceQ_delete(ctx, descr); + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + pg_cache_evict_unsafe(ctx, descr); + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); + } + pg_cache_put(ctx, descr); + rrdeng_try_deallocate_pg_cache_descr(ctx, descr); + while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) { + rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */ + (void)sleep_usec(1000); /* 1 msec */ + } +destroy: + rrdeng_page_descr_freez(descr); + pg_cache_update_metric_times(page_index); + + return can_delete_metric; +} + +static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time) +{ + usec_t pg_start, pg_end; + + pg_start = descr->start_time_ut; + pg_end = descr->end_time_ut; + + return (pg_start < start_time && pg_end >= start_time) || + (pg_start >= start_time && pg_start <= end_time); +} + +static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time) +{ + return (point_in_time >= descr->start_time_ut && point_in_time <= descr->end_time_ut); +} + +/* The caller must hold the page index lock */ +static inline struct rrdeng_page_descr * + find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time) +{ + struct rrdeng_page_descr *descr = NULL; + Pvoid_t *PValue; + Word_t Index; + + Index = (Word_t)(start_time / USEC_PER_SEC); + PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + if (is_page_in_time_range(descr, start_time, end_time)) { + return descr; + } + } + + Index = (Word_t)(start_time / USEC_PER_SEC); + PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + if (is_page_in_time_range(descr, start_time, end_time)) { + return descr; + } + } + + return NULL; +} + +/* Update metric oldest and latest timestamps efficiently when adding new values */ +void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr) +{ + usec_t oldest_time = page_index->oldest_time_ut; + usec_t latest_time = page_index->latest_time_ut; + + if (unlikely(oldest_time == INVALID_TIME || descr->start_time_ut < oldest_time)) { + page_index->oldest_time_ut = descr->start_time_ut; + } + if (likely(descr->end_time_ut > latest_time || latest_time == INVALID_TIME)) { + page_index->latest_time_ut = descr->end_time_ut; + } +} + +/* Update metric oldest and latest timestamps when removing old values */ +void pg_cache_update_metric_times(struct pg_cache_page_index *page_index) +{ + Pvoid_t *firstPValue, *lastPValue; + Word_t firstIndex, lastIndex; + struct rrdeng_page_descr *descr; + usec_t oldest_time = INVALID_TIME; + usec_t latest_time = INVALID_TIME; + + uv_rwlock_rdlock(&page_index->lock); + /* Find first page in range */ + firstIndex = (Word_t)0; + firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0); + if (likely(NULL != firstPValue)) { + descr = *firstPValue; + oldest_time = descr->start_time_ut; + } + lastIndex = (Word_t)-1; + lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0); + if (likely(NULL != lastPValue)) { + descr = *lastPValue; + latest_time = descr->end_time_ut; + } + uv_rwlock_rdunlock(&page_index->lock); + + if (unlikely(NULL == firstPValue)) { + fatal_assert(NULL == lastPValue); + page_index->oldest_time_ut = page_index->latest_time_ut = INVALID_TIME; + return; + } + page_index->oldest_time_ut = oldest_time; + page_index->latest_time_ut = latest_time; +} + +/* If index is NULL lookup by UUID (descr->id) */ +void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, + struct rrdeng_page_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index; + unsigned long pg_cache_descr_state = descr->pg_cache_descr_state; + + if (0 != pg_cache_descr_state) { + /* there is page cache descriptor pre-allocated state */ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + fatal_assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED); + if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { + pg_cache_reserve_pages(ctx, 1); + if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY)) + pg_cache_replaceQ_insert(ctx, descr); + } + } + + if (unlikely(NULL == index)) { + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t)); + fatal_assert(NULL != PValue); + page_index = *PValue; + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + } else { + page_index = index; + } + + uv_rwlock_wrlock(&page_index->lock); + PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time_ut / USEC_PER_SEC), PJE0); + *PValue = descr; + ++page_index->page_count; + pg_cache_add_new_metric_time(page_index, descr); + uv_rwlock_wrunlock(&page_index->lock); + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + ++ctx->stats.pg_cache_insertions; + ++pg_cache->page_descriptors; + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); +} + +usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + return INVALID_TIME; + } + + uv_rwlock_rdlock(&page_index->lock); + descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut); + if (NULL == descr) { + uv_rwlock_rdunlock(&page_index->lock); + return INVALID_TIME; + } + uv_rwlock_rdunlock(&page_index->lock); + return descr->start_time_ut; +} + +/** + * Return page information for the first page before point_in_time that satisfies the filter. + * @param ctx DB context + * @param page_index page index of a metric + * @param point_in_time_ut the pages that are searched must be older than this timestamp + * @param filter decides if the page satisfies the caller's criteria + * @param page_info the result of the search is set in this pointer + */ +void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index, + usec_t point_in_time_ut, pg_cache_page_info_filter_t *filter, + struct rrdeng_page_info *page_info) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL; + Pvoid_t *PValue; + Word_t Index; + + (void)pg_cache; + fatal_assert(NULL != page_index); + + Index = (Word_t)(point_in_time_ut / USEC_PER_SEC); + uv_rwlock_rdlock(&page_index->lock); + do { + PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0); + descr = unlikely(NULL == PValue) ? NULL : *PValue; + } while (descr != NULL && !filter(descr)); + if (unlikely(NULL == descr)) { + page_info->page_length = 0; + page_info->start_time_ut = INVALID_TIME; + page_info->end_time_ut = INVALID_TIME; + } else { + page_info->page_length = descr->page_length; + page_info->start_time_ut = descr->start_time_ut; + page_info->end_time_ut = descr->end_time_ut; + } + uv_rwlock_rdunlock(&page_index->lock); +} + +/** + * Searches for an unallocated page without triggering disk I/O. Attempts to reserve the page and get a reference. + * @param ctx DB context + * @param id lookup by UUID + * @param start_time_ut exact starting time in usec + * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID. + * @return the page descriptor or NULL on failure. It can fail if: + * 1. The page is already allocated to the page cache. + * 2. It did not succeed to get a reference. + * 3. It did not succeed to reserve a spot in the page cache. + */ +struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id, + usec_t start_time_ut) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL; + struct page_cache_descr *pg_cache_descr = NULL; + unsigned long flags; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + Word_t Index; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + + if ((NULL == PValue) || !pg_cache_try_reserve_pages(ctx, 1)) { + /* Failed to find page or failed to reserve a spot in the cache */ + return NULL; + } + + uv_rwlock_rdlock(&page_index->lock); + Index = (Word_t)(start_time_ut / USEC_PER_SEC); + PValue = JudyLGet(page_index->JudyL_array, Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + } + if (NULL == PValue || 0 == descr->page_length) { + /* Failed to find non-empty page */ + uv_rwlock_rdunlock(&page_index->lock); + + pg_cache_release_pages(ctx, 1); + return NULL; + } + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + flags = pg_cache_descr->flags; + uv_rwlock_rdunlock(&page_index->lock); + + if ((flags & RRD_PAGE_POPULATED) || !pg_cache_try_get_unsafe(descr, 1)) { + /* Failed to get reference or page is already populated */ + rrdeng_page_descr_mutex_unlock(ctx, descr); + + pg_cache_release_pages(ctx, 1); + return NULL; + } + /* success */ + rrdeng_page_descr_mutex_unlock(ctx, descr); + rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); + + return descr; +} + +/** + * Searches for pages in a time range and triggers disk I/O if necessary and possible. + * Does not get a reference. + * @param ctx DB context + * @param id UUID + * @param start_time_ut inclusive starting time in usec + * @param end_time_ut inclusive ending time in usec + * @param page_info_arrayp It allocates (*page_arrayp) and populates it with information of pages that overlap + * with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez(). + * If page_info_arrayp is set to NULL nothing was allocated. + * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID. + * @return the number of pages that overlap with the time range [start_time,end_time]. + */ +unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut, + struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES]; + struct page_cache_descr *pg_cache_descr = NULL; + unsigned i, j, k, preload_count, count, page_info_array_max_size; + unsigned long flags; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + Word_t Index; + uint8_t failed_to_reserve; + + fatal_assert(NULL != ret_page_indexp); + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + *ret_page_indexp = page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__); + *ret_page_indexp = NULL; + return 0; + } + + uv_rwlock_rdlock(&page_index->lock); + descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut); + if (NULL == descr) { + uv_rwlock_rdunlock(&page_index->lock); + debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__); + *ret_page_indexp = NULL; + return 0; + } else { + Index = (Word_t)(descr->start_time_ut / USEC_PER_SEC); + } + if (page_info_arrayp) { + page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info); + *page_info_arrayp = mallocz(page_info_array_max_size); + } + + for (count = 0, preload_count = 0 ; + descr != NULL && is_page_in_time_range(descr, start_time_ut, end_time_ut) ; + PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue) { + /* Iterate all pages in range */ + + if (unlikely(0 == descr->page_length)) + continue; + if (page_info_arrayp) { + if (unlikely(count >= page_info_array_max_size / sizeof(struct rrdeng_page_info))) { + page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info); + *page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size); + } + (*page_info_arrayp)[count].start_time_ut = descr->start_time_ut; + (*page_info_arrayp)[count].end_time_ut = descr->end_time_ut; + (*page_info_arrayp)[count].page_length = descr->page_length; + } + ++count; + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + flags = pg_cache_descr->flags; + if (pg_cache_can_get_unsafe(descr, 0)) { + if (flags & RRD_PAGE_POPULATED) { + /* success */ + rrdeng_page_descr_mutex_unlock(ctx, descr); + debug(D_RRDENGINE, "%s: Page was found in memory.", __func__); + continue; + } + } + if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { + preload_array[preload_count++] = descr; + if (PAGE_CACHE_MAX_PRELOAD_PAGES == preload_count) { + rrdeng_page_descr_mutex_unlock(ctx, descr); + break; + } + } + rrdeng_page_descr_mutex_unlock(ctx, descr); + + } + uv_rwlock_rdunlock(&page_index->lock); + + failed_to_reserve = 0; + for (i = 0 ; i < preload_count && !failed_to_reserve ; ++i) { + struct rrdeng_cmd cmd; + struct rrdeng_page_descr *next; + + descr = preload_array[i]; + if (NULL == descr) { + continue; + } + if (!pg_cache_try_reserve_pages(ctx, 1)) { + failed_to_reserve = 1; + break; + } + cmd.opcode = RRDENG_READ_EXTENT; + cmd.read_extent.page_cache_descr[0] = descr; + /* don't use this page again */ + preload_array[i] = NULL; + for (j = 0, k = 1 ; j < preload_count ; ++j) { + next = preload_array[j]; + if (NULL == next) { + continue; + } + if (descr->extent == next->extent) { + /* same extent, consolidate */ + if (!pg_cache_try_reserve_pages(ctx, 1)) { + failed_to_reserve = 1; + break; + } + cmd.read_extent.page_cache_descr[k++] = next; + /* don't use this page again */ + preload_array[j] = NULL; + } + } + cmd.read_extent.page_count = k; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + } + if (failed_to_reserve) { + debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__); + for (i = 0 ; i < preload_count ; ++i) { + descr = preload_array[i]; + if (NULL == descr) { + continue; + } + pg_cache_put(ctx, descr); + } + } + if (!preload_count) { + /* no such page */ + debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__); + } + if (unlikely(0 == count && page_info_arrayp)) { + freez(*page_info_arrayp); + *page_info_arrayp = NULL; + } + return count; +} + +/* + * Searches for a page and gets a reference. + * When point_in_time is INVALID_TIME get any page. + * If index is NULL lookup by UUID (id). + */ +struct rrdeng_page_descr * + pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, + usec_t point_in_time_ut) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL; + struct page_cache_descr *pg_cache_descr = NULL; + unsigned long flags; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + Word_t Index; + uint8_t page_not_in_cache; + + if (unlikely(NULL == index)) { + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + return NULL; + } + } else { + page_index = index; + } + pg_cache_reserve_pages(ctx, 1); + + page_not_in_cache = 0; + uv_rwlock_rdlock(&page_index->lock); + while (1) { + Index = (Word_t)(point_in_time_ut / USEC_PER_SEC); + PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + } + if (NULL == PValue || + 0 == descr->page_length || + (INVALID_TIME != point_in_time_ut && + !is_point_in_time_in_page(descr, point_in_time_ut))) { + /* non-empty page not found */ + uv_rwlock_rdunlock(&page_index->lock); + + pg_cache_release_pages(ctx, 1); + return NULL; + } + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + flags = pg_cache_descr->flags; + if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) { + /* success */ + rrdeng_page_descr_mutex_unlock(ctx, descr); + debug(D_RRDENGINE, "%s: Page was found in memory.", __func__); + break; + } + if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { + struct rrdeng_cmd cmd; + + uv_rwlock_rdunlock(&page_index->lock); + + cmd.opcode = RRDENG_READ_PAGE; + cmd.read_page.page_cache_descr = descr; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + + debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr, "", true); + while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) { + pg_cache_wait_event_unsafe(descr); + } + /* success */ + /* Downgrade exclusive reference to allow other readers */ + pg_cache_descr->flags &= ~RRD_PAGE_LOCKED; + pg_cache_wake_up_waiters_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); + rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); + return descr; + } + uv_rwlock_rdunlock(&page_index->lock); + debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr, "", true); + if (!(flags & RRD_PAGE_POPULATED)) + page_not_in_cache = 1; + pg_cache_wait_event_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); + + /* reset scan to find again */ + uv_rwlock_rdlock(&page_index->lock); + } + uv_rwlock_rdunlock(&page_index->lock); + + if (!(flags & RRD_PAGE_DIRTY)) + pg_cache_replaceQ_set_hot(ctx, descr); + pg_cache_release_pages(ctx, 1); + if (page_not_in_cache) + rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); + else + rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1); + return descr; +} + +/* + * Searches for the first page between start_time and end_time and gets a reference. + * start_time and end_time are inclusive. + * If index is NULL lookup by UUID (id). + */ +struct rrdeng_page_descr * +pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, + usec_t start_time_ut, usec_t end_time_ut) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL; + struct page_cache_descr *pg_cache_descr = NULL; + unsigned long flags; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + uint8_t page_not_in_cache; + + if (unlikely(NULL == index)) { + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + return NULL; + } + } else { + page_index = index; + } + pg_cache_reserve_pages(ctx, 1); + + page_not_in_cache = 0; + uv_rwlock_rdlock(&page_index->lock); + int retry_count = 0; + while (1) { + descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut); + if (NULL == descr || 0 == descr->page_length || retry_count == default_rrdeng_page_fetch_retries) { + /* non-empty page not found */ + if (retry_count == default_rrdeng_page_fetch_retries) + error_report("Page cache timeout while waiting for page %p : returning FAIL", descr); + uv_rwlock_rdunlock(&page_index->lock); + + pg_cache_release_pages(ctx, 1); + return NULL; + } + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + flags = pg_cache_descr->flags; + if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) { + /* success */ + rrdeng_page_descr_mutex_unlock(ctx, descr); + debug(D_RRDENGINE, "%s: Page was found in memory.", __func__); + break; + } + if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { + struct rrdeng_cmd cmd; + + uv_rwlock_rdunlock(&page_index->lock); + + cmd.opcode = RRDENG_READ_PAGE; + cmd.read_page.page_cache_descr = descr; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + + debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr, "", true); + while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) { + pg_cache_wait_event_unsafe(descr); + } + /* success */ + /* Downgrade exclusive reference to allow other readers */ + pg_cache_descr->flags &= ~RRD_PAGE_LOCKED; + pg_cache_wake_up_waiters_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); + rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); + return descr; + } + uv_rwlock_rdunlock(&page_index->lock); + debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr, "", true); + if (!(flags & RRD_PAGE_POPULATED)) + page_not_in_cache = 1; + + if (pg_cache_timedwait_event_unsafe(descr, default_rrdeng_page_fetch_timeout) == UV_ETIMEDOUT) { + error_report("Page cache timeout while waiting for page %p : retry count = %d", descr, retry_count); + ++retry_count; + } + rrdeng_page_descr_mutex_unlock(ctx, descr); + + /* reset scan to find again */ + uv_rwlock_rdlock(&page_index->lock); + } + uv_rwlock_rdunlock(&page_index->lock); + + if (!(flags & RRD_PAGE_DIRTY)) + pg_cache_replaceQ_set_hot(ctx, descr); + pg_cache_release_pages(ctx, 1); + if (page_not_in_cache) + rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); + else + rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1); + return descr; +} + +struct pg_cache_page_index *create_page_index(uuid_t *id, struct rrdengine_instance *ctx) +{ + struct pg_cache_page_index *page_index; + + page_index = mallocz(sizeof(*page_index)); + page_index->JudyL_array = (Pvoid_t) NULL; + uuid_copy(page_index->id, *id); + fatal_assert(0 == uv_rwlock_init(&page_index->lock)); + page_index->oldest_time_ut = INVALID_TIME; + page_index->latest_time_ut = INVALID_TIME; + page_index->prev = NULL; + page_index->page_count = 0; + page_index->refcount = 0; + page_index->writers = 0; + page_index->ctx = ctx; + page_index->latest_update_every_s = default_rrd_update_every; + + return page_index; +} + +static void init_metrics_index(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL; + pg_cache->metrics_index.last_page_index = NULL; + fatal_assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock)); +} + +static void init_replaceQ(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + pg_cache->replaceQ.head = NULL; + pg_cache->replaceQ.tail = NULL; + fatal_assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock)); +} + +static void init_committed_page_index(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + pg_cache->committed_page_index.JudyL_array = (Pvoid_t) NULL; + fatal_assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock)); + pg_cache->committed_page_index.latest_corr_id = 0; + pg_cache->committed_page_index.nr_committed_pages = 0; +} + +void init_page_cache(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + pg_cache->page_descriptors = 0; + pg_cache->populated_pages = 0; + fatal_assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock)); + + init_metrics_index(ctx); + init_replaceQ(ctx); + init_committed_page_index(ctx); +} + +void free_page_cache(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index, *prev_page_index; + Word_t Index; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + + // if we are exiting, the OS will recover all memory so do not slow down the shutdown process + // Do the cleanup if we are compiling with NETDATA_INTERNAL_CHECKS + // This affects the reporting of dbengine statistics which are available in real time + // via the /api/v1/dbengine_stats endpoint +#ifndef NETDATA_DBENGINE_FREE + if (netdata_exit) + return; +#endif + Word_t metrics_index_bytes = 0, pages_index_bytes = 0, pages_dirty_index_bytes = 0; + + /* Free committed page index */ + pages_dirty_index_bytes = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0); + fatal_assert(NULL == pg_cache->committed_page_index.JudyL_array); + + for (page_index = pg_cache->metrics_index.last_page_index ; + page_index != NULL ; + page_index = prev_page_index) { + + prev_page_index = page_index->prev; + + /* Find first page in range */ + Index = (Word_t) 0; + PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); + descr = unlikely(NULL == PValue) ? NULL : *PValue; + + while (descr != NULL) { + /* Iterate all page descriptors of this metric */ + + if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) { + /* Check rrdenglocking.c */ + pg_cache_descr = descr->pg_cache_descr; + if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { + dbengine_page_free(pg_cache_descr->page); + } + rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); + } + rrdeng_page_descr_freez(descr); + + PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0); + descr = unlikely(NULL == PValue) ? NULL : *PValue; + } + + /* Free page index */ + pages_index_bytes += JudyLFreeArray(&page_index->JudyL_array, PJE0); + fatal_assert(NULL == page_index->JudyL_array); + freez(page_index); + } + /* Free metrics index */ + metrics_index_bytes = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0); + fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array); + info("Freed %lu bytes of memory from page cache.", pages_dirty_index_bytes + pages_index_bytes + metrics_index_bytes); +} diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h new file mode 100644 index 0000000..635b021 --- /dev/null +++ b/database/engine/pagecache.h @@ -0,0 +1,254 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_PAGECACHE_H +#define NETDATA_PAGECACHE_H + +#include "rrdengine.h" + +/* Forward declarations */ +struct rrdengine_instance; +struct extent_info; +struct rrdeng_page_descr; + +#define INVALID_TIME (0) +#define MAX_PAGE_CACHE_FETCH_RETRIES (3) +#define PAGE_CACHE_FETCH_WAIT_TIMEOUT (3) + +/* Page flags */ +#define RRD_PAGE_DIRTY (1LU << 0) +#define RRD_PAGE_LOCKED (1LU << 1) +#define RRD_PAGE_READ_PENDING (1LU << 2) +#define RRD_PAGE_WRITE_PENDING (1LU << 3) +#define RRD_PAGE_POPULATED (1LU << 4) + +struct page_cache_descr { + struct rrdeng_page_descr *descr; /* parent descriptor */ + void *page; + unsigned long flags; + struct page_cache_descr *prev; /* LRU */ + struct page_cache_descr *next; /* LRU */ + + unsigned refcnt; + uv_mutex_t mutex; /* always take it after the page cache lock or after the commit lock */ + uv_cond_t cond; + unsigned waiters; +}; + +/* Page cache descriptor flags, state = 0 means no descriptor */ +#define PG_CACHE_DESCR_ALLOCATED (1LU << 0) +#define PG_CACHE_DESCR_DESTROY (1LU << 1) +#define PG_CACHE_DESCR_LOCKED (1LU << 2) +#define PG_CACHE_DESCR_SHIFT (3) +#define PG_CACHE_DESCR_USERS_MASK (((unsigned long)-1) << PG_CACHE_DESCR_SHIFT) +#define PG_CACHE_DESCR_FLAGS_MASK (((unsigned long)-1) >> (BITS_PER_ULONG - PG_CACHE_DESCR_SHIFT)) + +/* + * Page cache descriptor state bits (works for both 32-bit and 64-bit architectures): + * + * 63 ... 31 ... 3 | 2 | 1 | 0| + * -----------------------------+------------+------------+-----------| + * number of descriptor users | DESTROY | LOCKED | ALLOCATED | + */ +struct rrdeng_page_descr { + uuid_t *id; /* never changes */ + struct extent_info *extent; + + /* points to ephemeral page cache descriptor if the page resides in the cache */ + struct page_cache_descr *pg_cache_descr; + + /* Compare-And-Swap target for page cache descriptor allocation algorithm */ + volatile unsigned long pg_cache_descr_state; + + /* page information */ + usec_t start_time_ut; + usec_t end_time_ut; + uint32_t update_every_s:24; + uint8_t type; + uint32_t page_length; +}; + +#define PAGE_INFO_SCRATCH_SZ (8) +struct rrdeng_page_info { + uint8_t scratch[PAGE_INFO_SCRATCH_SZ]; /* scratch area to be used by page-cache users */ + + usec_t start_time_ut; + usec_t end_time_ut; + uint32_t page_length; +}; + +/* returns 1 for success, 0 for failure */ +typedef int pg_cache_page_info_filter_t(struct rrdeng_page_descr *); + +#define PAGE_CACHE_MAX_PRELOAD_PAGES (256) + +struct pg_alignment { + uint32_t page_length; + uint32_t refcount; +}; + +/* maps time ranges to pages */ +struct pg_cache_page_index { + uuid_t id; + /* + * care: JudyL_array indices are converted from useconds to seconds to fit in one word in 32-bit architectures + * TODO: examine if we want to support better granularity than seconds + */ + Pvoid_t JudyL_array; + Word_t page_count; + unsigned short refcount; + unsigned short writers; + uv_rwlock_t lock; + + /* + * Only one effective writer, data deletion workqueue. + * It's also written during the DB loading phase. + */ + usec_t oldest_time_ut; + + /* + * Only one effective writer, data collection thread. + * It's also written by the data deletion workqueue when data collection is disabled for this metric. + */ + usec_t latest_time_ut; + + struct rrdengine_instance *ctx; + uint32_t latest_update_every_s; + + struct pg_cache_page_index *prev; +}; + +/* maps UUIDs to page indices */ +struct pg_cache_metrics_index { + uv_rwlock_t lock; + Pvoid_t JudyHS_array; + struct pg_cache_page_index *last_page_index; +}; + +/* gathers dirty pages to be written on disk */ +struct pg_cache_committed_page_index { + uv_rwlock_t lock; + + Pvoid_t JudyL_array; + + /* + * Dirty page correlation ID is a hint. Dirty pages that are correlated should have + * a small correlation ID difference. Dirty pages in memory should never have the + * same ID at the same time for correctness. + */ + Word_t latest_corr_id; + + unsigned nr_committed_pages; +}; + +/* + * Gathers populated pages to be evicted. + * Relies on page cache descriptors being there as it uses their memory. + */ +struct pg_cache_replaceQ { + uv_rwlock_t lock; /* LRU lock */ + + struct page_cache_descr *head; /* LRU */ + struct page_cache_descr *tail; /* MRU */ +}; + +struct page_cache { /* TODO: add statistics */ + uv_rwlock_t pg_cache_rwlock; /* page cache lock */ + + struct pg_cache_metrics_index metrics_index; + struct pg_cache_committed_page_index committed_page_index; + struct pg_cache_replaceQ replaceQ; + + unsigned page_descriptors; + unsigned populated_pages; +}; + +void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr); +void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr); +unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr); +void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr); +void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr); +struct rrdeng_page_descr *pg_cache_create_descr(void); +int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access); +void pg_cache_put_unsafe(struct rrdeng_page_descr *descr); +void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, + struct rrdeng_page_descr *descr); +uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, + uint8_t remove_dirty, uint8_t is_exclusive_holder, uuid_t *metric_id); +usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, + usec_t start_time_ut, usec_t end_time_ut); +void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index, + usec_t point_in_time_ut, pg_cache_page_info_filter_t *filter, + struct rrdeng_page_info *page_info); +struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id, + usec_t start_time_ut); +unsigned + pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut, + struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp); +struct rrdeng_page_descr * + pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, + usec_t point_in_time_ut); +struct rrdeng_page_descr * + pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, + usec_t start_time_ut, usec_t end_time_ut); +struct pg_cache_page_index *create_page_index(uuid_t *id, struct rrdengine_instance *ctx); +void init_page_cache(struct rrdengine_instance *ctx); +void free_page_cache(struct rrdengine_instance *ctx); +void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr); +void pg_cache_update_metric_times(struct pg_cache_page_index *page_index); +unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx); +unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx); +unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx); + +void rrdeng_page_descr_aral_go_singlethreaded(void); +void rrdeng_page_descr_aral_go_multithreaded(void); +void rrdeng_page_descr_use_malloc(void); +void rrdeng_page_descr_use_mmap(void); +bool rrdeng_page_descr_is_mmap(void); +struct rrdeng_page_descr *rrdeng_page_descr_mallocz(void); +void rrdeng_page_descr_freez(struct rrdeng_page_descr *descr); + +static inline void + pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_time_ut_p, uint32_t *page_lengthp) +{ + usec_t end_time_ut, old_end_time_ut; + uint32_t page_length; + + if (NULL == descr->extent) { + /* this page is currently being modified, get consistent info locklessly */ + do { + end_time_ut = descr->end_time_ut; + __sync_synchronize(); + old_end_time_ut = end_time_ut; + page_length = descr->page_length; + __sync_synchronize(); + end_time_ut = descr->end_time_ut; + __sync_synchronize(); + } while ((end_time_ut != old_end_time_ut || (end_time_ut & 1) != 0)); + + *end_time_ut_p = end_time_ut; + *page_lengthp = page_length; + } else { + *end_time_ut_p = descr->end_time_ut; + *page_lengthp = descr->page_length; + } +} + +/* The caller must hold a reference to the page and must have already set the new data */ +static inline void pg_cache_atomic_set_pg_info(struct rrdeng_page_descr *descr, usec_t end_time_ut, uint32_t page_length) +{ + fatal_assert(!(end_time_ut & 1)); + __sync_synchronize(); + descr->end_time_ut |= 1; /* mark start of uncertainty period by adding 1 microsecond */ + __sync_synchronize(); + descr->page_length = page_length; + __sync_synchronize(); + descr->end_time_ut = end_time_ut; /* mark end of uncertainty period */ +} + +#endif /* NETDATA_PAGECACHE_H */ diff --git a/database/engine/rrddiskprotocol.h b/database/engine/rrddiskprotocol.h new file mode 100644 index 0000000..5b4be94 --- /dev/null +++ b/database/engine/rrddiskprotocol.h @@ -0,0 +1,120 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDDISKPROTOCOL_H +#define NETDATA_RRDDISKPROTOCOL_H + +#define RRDENG_BLOCK_SIZE (4096) +#define RRDFILE_ALIGNMENT RRDENG_BLOCK_SIZE + +#define RRDENG_MAGIC_SZ (32) +#define RRDENG_DF_MAGIC "netdata-data-file" +#define RRDENG_JF_MAGIC "netdata-journal-file" + +#define RRDENG_VER_SZ (16) +#define RRDENG_DF_VER "1.0" +#define RRDENG_JF_VER "1.0" + +#define UUID_SZ (16) +#define CHECKSUM_SZ (4) /* CRC32 */ + +#define RRD_NO_COMPRESSION (0) +#define RRD_LZ4 (1) + +#define RRDENG_DF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ + sizeof(uint8_t))) +/* + * Data file persistent super-block + */ +struct rrdeng_df_sb { + char magic_number[RRDENG_MAGIC_SZ]; + char version[RRDENG_VER_SZ]; + uint8_t tier; + uint8_t padding[RRDENG_DF_SB_PADDING_SZ]; +} __attribute__ ((packed)); + +/* + * Page types + */ +#define PAGE_METRICS (0) +#define PAGE_TIER (1) +#define PAGE_TYPE_MAX 1 // Maximum page type (inclusive) + +/* + * Data file page descriptor + */ +struct rrdeng_extent_page_descr { + uint8_t type; + + uint8_t uuid[UUID_SZ]; + uint32_t page_length; + uint64_t start_time_ut; + uint64_t end_time_ut; +} __attribute__ ((packed)); + +/* + * Data file extent header + */ +struct rrdeng_df_extent_header { + uint32_t payload_length; + uint8_t compression_algorithm; + uint8_t number_of_pages; + /* #number_of_pages page descriptors follow */ + struct rrdeng_extent_page_descr descr[]; +} __attribute__ ((packed)); + +/* + * Data file extent trailer + */ +struct rrdeng_df_extent_trailer { + uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */ +} __attribute__ ((packed)); + +#define RRDENG_JF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ)) +/* + * Journal file super-block + */ +struct rrdeng_jf_sb { + char magic_number[RRDENG_MAGIC_SZ]; + char version[RRDENG_VER_SZ]; + uint8_t padding[RRDENG_JF_SB_PADDING_SZ]; +} __attribute__ ((packed)); + +/* + * Transaction record types + */ +#define STORE_PADDING (0) +#define STORE_DATA (1) +#define STORE_LOGS (2) /* reserved */ + +/* + * Journal file transaction record header + */ +struct rrdeng_jf_transaction_header { + /* when set to STORE_PADDING jump to start of next block */ + uint8_t type; + + uint32_t reserved; /* reserved for future use */ + uint64_t id; + uint16_t payload_length; +} __attribute__ ((packed)); + +/* + * Journal file transaction record trailer + */ +struct rrdeng_jf_transaction_trailer { + uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */ +} __attribute__ ((packed)); + +/* + * Journal file STORE_DATA action + */ +struct rrdeng_jf_store_data { + /* data file extent information */ + uint64_t extent_offset; + uint32_t extent_size; + + uint8_t number_of_pages; + /* #number_of_pages page descriptors follow */ + struct rrdeng_extent_page_descr descr[]; +} __attribute__ ((packed)); + +#endif /* NETDATA_RRDDISKPROTOCOL_H */ \ No newline at end of file diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c new file mode 100644 index 0000000..a6840f3 --- /dev/null +++ b/database/engine/rrdengine.c @@ -0,0 +1,1508 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "rrdengine.h" + +rrdeng_stats_t global_io_errors = 0; +rrdeng_stats_t global_fs_errors = 0; +rrdeng_stats_t rrdeng_reserved_file_descriptors = 0; +rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0; +rrdeng_stats_t global_flushing_pressure_page_deletions = 0; + +unsigned rrdeng_pages_per_extent = MAX_PAGES_PER_EXTENT; + +#if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2) +#error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2) +#endif + +void *dbengine_page_alloc() { + void *page = NULL; + if (unlikely(db_engine_use_malloc)) + page = mallocz(RRDENG_BLOCK_SIZE); + else { + page = netdata_mmap(NULL, RRDENG_BLOCK_SIZE, MAP_PRIVATE, enable_ksm); + if(!page) fatal("Cannot allocate dbengine page cache page, with mmap()"); + } + return page; +} + +void dbengine_page_free(void *page) { + if (unlikely(db_engine_use_malloc)) + freez(page); + else + netdata_munmap(page, RRDENG_BLOCK_SIZE); +} + +static void sanity_check(void) +{ + BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2)); + + /* Magic numbers must fit in the super-blocks */ + BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ); + BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ); + + /* Version strings must fit in the super-blocks */ + BUILD_BUG_ON(strlen(RRDENG_DF_VER) > RRDENG_VER_SZ); + BUILD_BUG_ON(strlen(RRDENG_JF_VER) > RRDENG_VER_SZ); + + /* Data file super-block cannot be larger than RRDENG_BLOCK_SIZE */ + BUILD_BUG_ON(RRDENG_DF_SB_PADDING_SZ < 0); + + BUILD_BUG_ON(sizeof(uuid_t) != UUID_SZ); /* check UUID size */ + + /* page count must fit in 8 bits */ + BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255); + + /* extent cache count must fit in 32 bits */ + BUILD_BUG_ON(MAX_CACHED_EXTENTS > 32); + + /* page info scratch space must be able to hold 2 32-bit integers */ + BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t)); +} + +/* always inserts into tail */ +static inline void xt_cache_replaceQ_insert(struct rrdengine_worker_config* wc, + struct extent_cache_element *xt_cache_elem) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + + xt_cache_elem->prev = NULL; + xt_cache_elem->next = NULL; + + if (likely(NULL != xt_cache->replaceQ_tail)) { + xt_cache_elem->prev = xt_cache->replaceQ_tail; + xt_cache->replaceQ_tail->next = xt_cache_elem; + } + if (unlikely(NULL == xt_cache->replaceQ_head)) { + xt_cache->replaceQ_head = xt_cache_elem; + } + xt_cache->replaceQ_tail = xt_cache_elem; +} + +static inline void xt_cache_replaceQ_delete(struct rrdengine_worker_config* wc, + struct extent_cache_element *xt_cache_elem) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *prev, *next; + + prev = xt_cache_elem->prev; + next = xt_cache_elem->next; + + if (likely(NULL != prev)) { + prev->next = next; + } + if (likely(NULL != next)) { + next->prev = prev; + } + if (unlikely(xt_cache_elem == xt_cache->replaceQ_head)) { + xt_cache->replaceQ_head = next; + } + if (unlikely(xt_cache_elem == xt_cache->replaceQ_tail)) { + xt_cache->replaceQ_tail = prev; + } + xt_cache_elem->prev = xt_cache_elem->next = NULL; +} + +static inline void xt_cache_replaceQ_set_hot(struct rrdengine_worker_config* wc, + struct extent_cache_element *xt_cache_elem) +{ + xt_cache_replaceQ_delete(wc, xt_cache_elem); + xt_cache_replaceQ_insert(wc, xt_cache_elem); +} + +/* Returns the index of the cached extent if it was successfully inserted in the extent cache, otherwise -1 */ +static int try_insert_into_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + unsigned idx; + int ret; + + ret = find_first_zero(xt_cache->allocation_bitmap); + if (-1 == ret || ret >= MAX_CACHED_EXTENTS) { + for (xt_cache_elem = xt_cache->replaceQ_head ; NULL != xt_cache_elem ; xt_cache_elem = xt_cache_elem->next) { + idx = xt_cache_elem - xt_cache->extent_array; + if (!check_bit(xt_cache->inflight_bitmap, idx)) { + xt_cache_replaceQ_delete(wc, xt_cache_elem); + break; + } + } + if (NULL == xt_cache_elem) + return -1; + } else { + idx = (unsigned)ret; + xt_cache_elem = &xt_cache->extent_array[idx]; + } + xt_cache_elem->extent = extent; + xt_cache_elem->fileno = extent->datafile->fileno; + xt_cache_elem->inflight_io_descr = NULL; + xt_cache_replaceQ_insert(wc, xt_cache_elem); + modify_bit(&xt_cache->allocation_bitmap, idx, 1); + + return (int)idx; +} + +/** + * Returns 0 if the cached extent was found in the extent cache, 1 otherwise. + * Sets *idx to point to the position of the extent inside the cache. + **/ +static uint8_t lookup_in_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent, unsigned *idx) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + unsigned i; + + for (i = 0 ; i < MAX_CACHED_EXTENTS ; ++i) { + xt_cache_elem = &xt_cache->extent_array[i]; + if (check_bit(xt_cache->allocation_bitmap, i) && xt_cache_elem->extent == extent && + xt_cache_elem->fileno == extent->datafile->fileno) { + *idx = i; + return 0; + } + } + return 1; +} + +#if 0 /* disabled code */ +static void delete_from_xt_cache(struct rrdengine_worker_config* wc, unsigned idx) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + + xt_cache_elem = &xt_cache->extent_array[idx]; + xt_cache_replaceQ_delete(wc, xt_cache_elem); + xt_cache_elem->extent = NULL; + modify_bit(&wc->xt_cache.allocation_bitmap, idx, 0); /* invalidate it */ + modify_bit(&wc->xt_cache.inflight_bitmap, idx, 0); /* not in-flight anymore */ +} +#endif + +void enqueue_inflight_read_to_xt_cache(struct rrdengine_worker_config* wc, unsigned idx, + struct extent_io_descriptor *xt_io_descr) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + struct extent_io_descriptor *old_next; + + xt_cache_elem = &xt_cache->extent_array[idx]; + old_next = xt_cache_elem->inflight_io_descr->next; + xt_cache_elem->inflight_io_descr->next = xt_io_descr; + xt_io_descr->next = old_next; +} + +void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, struct extent_io_descriptor *xt_io_descr) +{ + unsigned i, j, page_offset; + struct rrdengine_instance *ctx = wc->ctx; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + void *page; + struct extent_info *extent = xt_io_descr->descr_array[0]->extent; + + for (i = 0 ; i < xt_io_descr->descr_count; ++i) { + page = dbengine_page_alloc(); + descr = xt_io_descr->descr_array[i]; + for (j = 0, page_offset = 0 ; j < extent->number_of_pages ; ++j) { + /* care, we don't hold the descriptor mutex */ + if (!uuid_compare(*extent->pages[j]->id, *descr->id) && + extent->pages[j]->page_length == descr->page_length && + extent->pages[j]->start_time_ut == descr->start_time_ut && + extent->pages[j]->end_time_ut == descr->end_time_ut) { + break; + } + page_offset += extent->pages[j]->page_length; + + } + /* care, we don't hold the descriptor mutex */ + (void) memcpy(page, wc->xt_cache.extent_array[idx].pages + page_offset, descr->page_length); + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->page = page; + pg_cache_descr->flags |= RRD_PAGE_POPULATED; + pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING; + rrdeng_page_descr_mutex_unlock(ctx, descr); + pg_cache_replaceQ_insert(ctx, descr); + if (xt_io_descr->release_descr) { + pg_cache_put(ctx, descr); + } else { + debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); + pg_cache_wake_up_waiters(ctx, descr); + } + } + if (xt_io_descr->completion) + completion_mark_complete(xt_io_descr->completion); + freez(xt_io_descr); +} + +static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type) { + switch(type) { + case PAGE_METRICS: { + storage_number n = pack_storage_number(NAN, SN_FLAG_NONE); + storage_number *array = (storage_number *)page; + size_t slots = page_length / sizeof(n); + for(size_t i = 0; i < slots ; i++) + array[i] = n; + } + break; + + case PAGE_TIER: { + storage_number_tier1_t n = { + .min_value = NAN, + .max_value = NAN, + .sum_value = NAN, + .count = 1, + .anomaly_count = 0, + }; + storage_number_tier1_t *array = (storage_number_tier1_t *)page; + size_t slots = page_length / sizeof(n); + for(size_t i = 0; i < slots ; i++) + array[i] = n; + } + break; + + default: { + static bool logged = false; + if(!logged) { + error("DBENGINE: cannot fill page with nulls on unknown page type id %d", type); + logged = true; + } + memset(page, 0, page_length); + } + } +} + +struct rrdeng_page_descr *get_descriptor(struct pg_cache_page_index *page_index, time_t start_time_s) +{ + uv_rwlock_rdlock(&page_index->lock); + Pvoid_t *PValue = JudyLGet(page_index->JudyL_array, start_time_s, PJE0); + struct rrdeng_page_descr *descr = unlikely(NULL == PValue) ? NULL : *PValue; + uv_rwlock_rdunlock(&page_index->lock); + return descr; +}; + +static void do_extent_processing (struct rrdengine_worker_config *wc, struct extent_io_descriptor *xt_io_descr, bool read_failed) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + int ret; + unsigned i, j, count; + void *page, *uncompressed_buf = NULL; + uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length = 0; + uint8_t have_read_error = 0; + /* persistent structures */ + struct rrdeng_df_extent_header *header; + struct rrdeng_df_extent_trailer *trailer; + uLong crc; + + header = xt_io_descr->buf; + payload_length = header->payload_length; + count = header->number_of_pages; + payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count; + trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer); + + if (unlikely(read_failed)) { + struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; + + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + have_read_error = 1; + error("%s: uv_fs_read - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__, xt_io_descr->pos, + xt_io_descr->bytes, datafile->tier, datafile->fileno); + goto after_crc_check; + } + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer)); + ret = crc32cmp(trailer->checksum, crc); +#ifdef NETDATA_INTERNAL_CHECKS + { + struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; + debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__, + xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED"); + } +#endif + if (unlikely(ret)) { + struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; + + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + have_read_error = 1; + error("%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: FAILED", __func__, + xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); + } + +after_crc_check: + if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) { + uncompressed_payload_length = 0; + for (i = 0 ; i < count ; ++i) { + uncompressed_payload_length += header->descr[i].page_length; + } + uncompressed_buf = mallocz(uncompressed_payload_length); + ret = LZ4_decompress_safe(xt_io_descr->buf + payload_offset, uncompressed_buf, + payload_length, uncompressed_payload_length); + ctx->stats.before_decompress_bytes += payload_length; + ctx->stats.after_decompress_bytes += ret; + debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret); + /* care, we don't hold the descriptor mutex */ + } + { + uint8_t xt_is_cached = 0; + unsigned xt_idx; + struct extent_info *extent = xt_io_descr->descr_array[0]->extent; + + xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx); + if (xt_is_cached && check_bit(wc->xt_cache.inflight_bitmap, xt_idx)) { + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem = &xt_cache->extent_array[xt_idx]; + struct extent_io_descriptor *curr, *next; + + if (have_read_error) { + memset(xt_cache_elem->pages, 0, sizeof(xt_cache_elem->pages)); + } else if (RRD_NO_COMPRESSION == header->compression_algorithm) { + (void)memcpy(xt_cache_elem->pages, xt_io_descr->buf + payload_offset, payload_length); + } else { + (void)memcpy(xt_cache_elem->pages, uncompressed_buf, uncompressed_payload_length); + } + /* complete all connected in-flight read requests */ + for (curr = xt_cache_elem->inflight_io_descr->next ; curr ; curr = next) { + next = curr->next; + read_cached_extent_cb(wc, xt_idx, curr); + } + xt_cache_elem->inflight_io_descr = NULL; + modify_bit(&xt_cache->inflight_bitmap, xt_idx, 0); /* not in-flight anymore */ + } + } + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, xt_io_descr->descr_array[0]->id, sizeof(uuid_t)); + struct pg_cache_page_index *page_index = likely( NULL != PValue) ? *PValue : NULL; + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + + + for (i = 0, page_offset = 0; i < count; page_offset += header->descr[i++].page_length) { + uint8_t is_prefetched_page; + descr = NULL; + for (j = 0 ; j < xt_io_descr->descr_count; ++j) { + struct rrdeng_page_descr descrj; + + descrj = xt_io_descr->descr_read_array[j]; + /* care, we don't hold the descriptor mutex */ + if (!uuid_compare(*(uuid_t *) header->descr[i].uuid, *descrj.id) && + header->descr[i].page_length == descrj.page_length && + header->descr[i].start_time_ut == descrj.start_time_ut && + header->descr[i].end_time_ut == descrj.end_time_ut) { + //descr = descrj; + descr = get_descriptor(page_index, (time_t) (descrj.start_time_ut / USEC_PER_SEC)); + if (unlikely(!descr)) { + error_limit_static_thread_var(erl, 1, 0); + error_limit(&erl, "%s: Required descriptor is not in the page index anymore", __FUNCTION__); + } + break; + } + } + is_prefetched_page = 0; + if (!descr) { /* This extent page has not been requested. Try populating it for locality (best effort). */ + descr = pg_cache_lookup_unpopulated_and_lock(ctx, (uuid_t *)header->descr[i].uuid, + header->descr[i].start_time_ut); + if (!descr) + continue; /* Failed to reserve a suitable page */ + is_prefetched_page = 1; + } + page = dbengine_page_alloc(); + + /* care, we don't hold the descriptor mutex */ + if (have_read_error) { + fill_page_with_nulls(page, descr->page_length, descr->type); + } else if (RRD_NO_COMPRESSION == header->compression_algorithm) { + (void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length); + } else { + (void) memcpy(page, uncompressed_buf + page_offset, descr->page_length); + } + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->page = page; + pg_cache_descr->flags |= RRD_PAGE_POPULATED; + pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING; + rrdeng_page_descr_mutex_unlock(ctx, descr); + pg_cache_replaceQ_insert(ctx, descr); + if (xt_io_descr->release_descr || is_prefetched_page) { + pg_cache_put(ctx, descr); + } else { + debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); + pg_cache_wake_up_waiters(ctx, descr); + } + } + if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) { + freez(uncompressed_buf); + } + if (xt_io_descr->completion) + completion_mark_complete(xt_io_descr->completion); +} + +static void read_extent_cb(uv_fs_t *req) +{ + struct rrdengine_worker_config *wc = req->loop->data; + struct extent_io_descriptor *xt_io_descr; + + xt_io_descr = req->data; + do_extent_processing(wc, xt_io_descr, req->result < 0); + uv_fs_req_cleanup(req); + posix_memfree(xt_io_descr->buf); + freez(xt_io_descr); +} + +static void read_mmap_extent_cb(uv_work_t *req, int status __maybe_unused) +{ + struct rrdengine_worker_config *wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + struct extent_io_descriptor *xt_io_descr; + xt_io_descr = req->data; + + if (likely(xt_io_descr->map_base)) { + do_extent_processing(wc, xt_io_descr, false); + munmap(xt_io_descr->map_base, xt_io_descr->map_length); + freez(xt_io_descr); + return; + } + + // MMAP failed, so do uv_fs_read + int ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(xt_io_descr->bytes)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + unsigned real_io_size = ALIGN_BYTES_CEILING( xt_io_descr->bytes); + xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); + xt_io_descr->req.data = xt_io_descr; + ret = uv_fs_read(req->loop, &xt_io_descr->req, xt_io_descr->file, &xt_io_descr->iov, 1, (unsigned) xt_io_descr->pos, read_extent_cb); + fatal_assert(-1 != ret); + ctx->stats.io_read_bytes += real_io_size; + ctx->stats.io_read_extent_bytes += real_io_size; +} + +static void do_mmap_read_extent(uv_work_t *req) +{ + struct extent_io_descriptor *xt_io_descr = (struct extent_io_descriptor * )req->data; + struct rrdengine_worker_config *wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + + off_t map_start = ALIGN_BYTES_FLOOR(xt_io_descr->pos); + size_t length = ALIGN_BYTES_CEILING(xt_io_descr->pos + xt_io_descr->bytes) - map_start; + unsigned real_io_size = xt_io_descr->bytes; + + void *data = mmap(NULL, length, PROT_READ, MAP_SHARED, xt_io_descr->file, map_start); + if (likely(data != MAP_FAILED)) { + xt_io_descr->map_base = data; + xt_io_descr->map_length = length; + xt_io_descr->buf = data + (xt_io_descr->pos - map_start); + ctx->stats.io_read_bytes += real_io_size; + ctx->stats.io_read_extent_bytes += real_io_size; + } +} + +static void do_read_extent(struct rrdengine_worker_config* wc, + struct rrdeng_page_descr **descr, + unsigned count, + uint8_t release_descr) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache_descr *pg_cache_descr; + int ret; + unsigned i, size_bytes, pos; + struct extent_io_descriptor *xt_io_descr; + struct rrdengine_datafile *datafile; + struct extent_info *extent = descr[0]->extent; + uint8_t xt_is_cached = 0, xt_is_inflight = 0; + unsigned xt_idx; + + datafile = extent->datafile; + pos = extent->offset; + size_bytes = extent->size; + + xt_io_descr = callocz(1, sizeof(*xt_io_descr)); + for (i = 0 ; i < count; ++i) { + rrdeng_page_descr_mutex_lock(ctx, descr[i]); + pg_cache_descr = descr[i]->pg_cache_descr; + pg_cache_descr->flags |= RRD_PAGE_READ_PENDING; + rrdeng_page_descr_mutex_unlock(ctx, descr[i]); + xt_io_descr->descr_array[i] = descr[i]; + xt_io_descr->descr_read_array[i] = *(descr[i]); + } + xt_io_descr->descr_count = count; + xt_io_descr->file = datafile->file; + xt_io_descr->bytes = size_bytes; + xt_io_descr->pos = pos; + xt_io_descr->req_worker.data = xt_io_descr; + xt_io_descr->completion = NULL; + xt_io_descr->release_descr = release_descr; + xt_io_descr->buf = NULL; + + xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx); + if (xt_is_cached) { + xt_cache_replaceQ_set_hot(wc, &wc->xt_cache.extent_array[xt_idx]); + xt_is_inflight = check_bit(wc->xt_cache.inflight_bitmap, xt_idx); + if (xt_is_inflight) { + enqueue_inflight_read_to_xt_cache(wc, xt_idx, xt_io_descr); + return; + } + return read_cached_extent_cb(wc, xt_idx, xt_io_descr); + } else { + ret = try_insert_into_xt_cache(wc, extent); + if (-1 != ret) { + xt_idx = (unsigned)ret; + modify_bit(&wc->xt_cache.inflight_bitmap, xt_idx, 1); + wc->xt_cache.extent_array[xt_idx].inflight_io_descr = xt_io_descr; + } + } + + ret = uv_queue_work(wc->loop, &xt_io_descr->req_worker, do_mmap_read_extent, read_mmap_extent_cb); + fatal_assert(-1 != ret); + + ++ctx->stats.io_read_requests; + ++ctx->stats.io_read_extents; + ctx->stats.pg_cache_backfills += count; +} + +static void commit_data_extent(struct rrdengine_worker_config* wc, struct extent_io_descriptor *xt_io_descr) +{ + struct rrdengine_instance *ctx = wc->ctx; + unsigned count, payload_length, descr_size, size_bytes; + void *buf; + /* persistent structures */ + struct rrdeng_df_extent_header *df_header; + struct rrdeng_jf_transaction_header *jf_header; + struct rrdeng_jf_store_data *jf_metric_data; + struct rrdeng_jf_transaction_trailer *jf_trailer; + uLong crc; + + df_header = xt_io_descr->buf; + count = df_header->number_of_pages; + descr_size = sizeof(*jf_metric_data->descr) * count; + payload_length = sizeof(*jf_metric_data) + descr_size; + size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer); + + buf = wal_get_transaction_buffer(wc, size_bytes); + + jf_header = buf; + jf_header->type = STORE_DATA; + jf_header->reserved = 0; + jf_header->id = ctx->commit_log.transaction_id++; + jf_header->payload_length = payload_length; + + jf_metric_data = buf + sizeof(*jf_header); + jf_metric_data->extent_offset = xt_io_descr->pos; + jf_metric_data->extent_size = xt_io_descr->bytes; + jf_metric_data->number_of_pages = count; + memcpy(jf_metric_data->descr, df_header->descr, descr_size); + + jf_trailer = buf + sizeof(*jf_header) + payload_length; + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, buf, sizeof(*jf_header) + payload_length); + crc32set(jf_trailer->checksum, crc); +} + +static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t type, void *data) +{ + switch (type) { + case STORE_DATA: + commit_data_extent(wc, (struct extent_io_descriptor *)data); + break; + default: + fatal_assert(type == STORE_DATA); + break; + } +} + +static void after_invalidate_oldest_committed(struct rrdengine_worker_config* wc) +{ + int error; + + error = uv_thread_join(wc->now_invalidating_dirty_pages); + if (error) { + error("uv_thread_join(): %s", uv_strerror(error)); + } + freez(wc->now_invalidating_dirty_pages); + wc->now_invalidating_dirty_pages = NULL; + wc->cleanup_thread_invalidating_dirty_pages = 0; +} + +static void invalidate_oldest_committed(void *arg) +{ + struct rrdengine_instance *ctx = arg; + struct rrdengine_worker_config *wc = &ctx->worker_config; + struct page_cache *pg_cache = &ctx->pg_cache; + int ret; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + Pvoid_t *PValue; + Word_t Index; + unsigned nr_committed_pages; + + do { + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + for (Index = 0, + PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue; + + descr != NULL; + + PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue) { + fatal_assert(0 != descr->page_length); + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING) && pg_cache_try_get_unsafe(descr, 1)) { + rrdeng_page_descr_mutex_unlock(ctx, descr); + + ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0); + fatal_assert(1 == ret); + break; + } + rrdeng_page_descr_mutex_unlock(ctx, descr); + } + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + + if (!descr) { + info("Failed to invalidate any dirty pages to relieve page cache pressure."); + + goto out; + } + pg_cache_punch_hole(ctx, descr, 1, 1, NULL); + + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + nr_committed_pages = --pg_cache->committed_page_index.nr_committed_pages; + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + rrd_stat_atomic_add(&ctx->stats.flushing_pressure_page_deletions, 1); + rrd_stat_atomic_add(&global_flushing_pressure_page_deletions, 1); + + } while (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)); +out: + wc->cleanup_thread_invalidating_dirty_pages = 1; + /* wake up event loop */ + fatal_assert(0 == uv_async_send(&wc->async)); +} + +void rrdeng_invalidate_oldest_committed(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned nr_committed_pages; + int error; + + if (unlikely(ctx->quiesce != NO_QUIESCE)) /* Shutting down */ + return; + + uv_rwlock_rdlock(&pg_cache->committed_page_index.lock); + nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages; + uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock); + + if (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) { + /* delete the oldest page in memory */ + if (wc->now_invalidating_dirty_pages) { + /* already deleting a page */ + return; + } + errno = 0; + error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". " + "Metric data are being deleted, please reduce disk load or use a faster disk.", ctx->dbfiles_path); + + wc->now_invalidating_dirty_pages = mallocz(sizeof(*wc->now_invalidating_dirty_pages)); + wc->cleanup_thread_invalidating_dirty_pages = 0; + + error = uv_thread_create(wc->now_invalidating_dirty_pages, invalidate_oldest_committed, ctx); + if (error) { + error("uv_thread_create(): %s", uv_strerror(error)); + freez(wc->now_invalidating_dirty_pages); + wc->now_invalidating_dirty_pages = NULL; + } + } +} + +void flush_pages_cb(uv_fs_t* req) +{ + struct rrdengine_worker_config* wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + struct extent_io_descriptor *xt_io_descr; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + unsigned i, count; + + xt_io_descr = req->data; + if (req->result < 0) { + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); + } +#ifdef NETDATA_INTERNAL_CHECKS + { + struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; + debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.", + __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); + } +#endif + count = xt_io_descr->descr_count; + for (i = 0 ; i < count ; ++i) { + /* care, we don't hold the descriptor mutex */ + descr = xt_io_descr->descr_array[i]; + + pg_cache_replaceQ_insert(ctx, descr); + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING); + /* wake up waiters, care no reference being held */ + pg_cache_wake_up_waiters_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); + } + if (xt_io_descr->completion) + completion_mark_complete(xt_io_descr->completion); + uv_fs_req_cleanup(req); + posix_memfree(xt_io_descr->buf); + freez(xt_io_descr); + + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + pg_cache->committed_page_index.nr_committed_pages -= count; + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + wc->inflight_dirty_pages -= count; +} + +/* + * completion must be NULL or valid. + * Returns 0 when no flushing can take place. + * Returns datafile bytes to be written on successful flushing initiation. + */ +static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct completion *completion) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + int ret; + int compressed_size, max_compressed_size = 0; + unsigned i, count, size_bytes, pos, real_io_size; + uint32_t uncompressed_payload_length, payload_offset; + struct rrdeng_page_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT]; + struct page_cache_descr *pg_cache_descr; + struct extent_io_descriptor *xt_io_descr; + void *compressed_buf = NULL; + Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT]; + Pvoid_t *PValue; + Word_t Index; + uint8_t compression_algorithm = ctx->global_compress_alg; + struct extent_info *extent; + struct rrdengine_datafile *datafile; + /* persistent structures */ + struct rrdeng_df_extent_header *header; + struct rrdeng_df_extent_trailer *trailer; + uLong crc; + + if (force) { + debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure."); + } + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + for (Index = 0, count = 0, uncompressed_payload_length = 0, + PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue ; + + descr != NULL && count != rrdeng_pages_per_extent; + + PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue) { + uint8_t page_write_pending; + + fatal_assert(0 != descr->page_length); + page_write_pending = 0; + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING)) { + page_write_pending = 1; + /* care, no reference being held */ + pg_cache_descr->flags |= RRD_PAGE_WRITE_PENDING; + uncompressed_payload_length += descr->page_length; + descr_commit_idx_array[count] = Index; + eligible_pages[count++] = descr; + } + rrdeng_page_descr_mutex_unlock(ctx, descr); + + if (page_write_pending) { + ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0); + fatal_assert(1 == ret); + } + } + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + + if (!count) { + debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__); + if (completion) + completion_mark_complete(completion); + return 0; + } + wc->inflight_dirty_pages += count; + + xt_io_descr = mallocz(sizeof(*xt_io_descr)); + payload_offset = sizeof(*header) + count * sizeof(header->descr[0]); + switch (compression_algorithm) { + case RRD_NO_COMPRESSION: + size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer); + break; + default: /* Compress */ + fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE); + max_compressed_size = LZ4_compressBound(uncompressed_payload_length); + compressed_buf = mallocz(max_compressed_size); + size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer); + break; + } + ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + /* freez(xt_io_descr);*/ + } + memset(xt_io_descr->buf, 0, ALIGN_BYTES_CEILING(size_bytes)); + (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count); + xt_io_descr->descr_count = count; + + pos = 0; + header = xt_io_descr->buf; + header->compression_algorithm = compression_algorithm; + header->number_of_pages = count; + pos += sizeof(*header); + + extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0])); + datafile = ctx->datafiles.last; /* TODO: check for exceeded size quota */ + extent->offset = datafile->pos; + extent->number_of_pages = count; + extent->datafile = datafile; + extent->next = NULL; + + for (i = 0 ; i < count ; ++i) { + /* This is here for performance reasons */ + xt_io_descr->descr_commit_idx_array[i] = descr_commit_idx_array[i]; + + descr = xt_io_descr->descr_array[i]; + header->descr[i].type = descr->type; + uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id); + header->descr[i].page_length = descr->page_length; + header->descr[i].start_time_ut = descr->start_time_ut; + header->descr[i].end_time_ut = descr->end_time_ut; + pos += sizeof(header->descr[i]); + } + for (i = 0 ; i < count ; ++i) { + descr = xt_io_descr->descr_array[i]; + /* care, we don't hold the descriptor mutex */ + (void) memcpy(xt_io_descr->buf + pos, descr->pg_cache_descr->page, descr->page_length); + descr->extent = extent; + extent->pages[i] = descr; + + pos += descr->page_length; + } + df_extent_insert(extent); + + switch (compression_algorithm) { + case RRD_NO_COMPRESSION: + header->payload_length = uncompressed_payload_length; + break; + default: /* Compress */ + compressed_size = LZ4_compress_default(xt_io_descr->buf + payload_offset, compressed_buf, + uncompressed_payload_length, max_compressed_size); + ctx->stats.before_compress_bytes += uncompressed_payload_length; + ctx->stats.after_compress_bytes += compressed_size; + debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size); + (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size); + freez(compressed_buf); + size_bytes = payload_offset + compressed_size + sizeof(*trailer); + header->payload_length = compressed_size; + break; + } + extent->size = size_bytes; + xt_io_descr->bytes = size_bytes; + xt_io_descr->pos = datafile->pos; + xt_io_descr->req.data = xt_io_descr; + xt_io_descr->completion = completion; + + trailer = xt_io_descr->buf + size_bytes - sizeof(*trailer); + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, xt_io_descr->buf, size_bytes - sizeof(*trailer)); + crc32set(trailer->checksum, crc); + + real_io_size = ALIGN_BYTES_CEILING(size_bytes); + xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); + ret = uv_fs_write(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, datafile->pos, flush_pages_cb); + fatal_assert(-1 != ret); + ctx->stats.io_write_bytes += real_io_size; + ++ctx->stats.io_write_requests; + ctx->stats.io_write_extent_bytes += real_io_size; + ++ctx->stats.io_write_extents; + do_commit_transaction(wc, STORE_DATA, xt_io_descr); + datafile->pos += ALIGN_BYTES_CEILING(size_bytes); + ctx->disk_space += ALIGN_BYTES_CEILING(size_bytes); + rrdeng_test_quota(wc); + + return ALIGN_BYTES_CEILING(size_bytes); +} + +static void after_delete_old_data(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct rrdengine_datafile *datafile; + struct rrdengine_journalfile *journalfile; + unsigned deleted_bytes, journalfile_bytes, datafile_bytes; + int ret, error; + char path[RRDENG_PATH_MAX]; + + datafile = ctx->datafiles.first; + journalfile = datafile->journalfile; + datafile_bytes = datafile->pos; + journalfile_bytes = journalfile->pos; + deleted_bytes = 0; + + info("Deleting data and journal file pair."); + datafile_list_delete(ctx, datafile); + ret = destroy_journal_file(journalfile, datafile); + if (!ret) { + generate_journalfilepath(datafile, path, sizeof(path)); + info("Deleted journal file \"%s\".", path); + deleted_bytes += journalfile_bytes; + } + ret = destroy_data_file(datafile); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + info("Deleted data file \"%s\".", path); + deleted_bytes += datafile_bytes; + } + freez(journalfile); + freez(datafile); + + ctx->disk_space -= deleted_bytes; + info("Reclaimed %u bytes of disk space.", deleted_bytes); + + error = uv_thread_join(wc->now_deleting_files); + if (error) { + error("uv_thread_join(): %s", uv_strerror(error)); + } + freez(wc->now_deleting_files); + /* unfreeze command processing */ + wc->now_deleting_files = NULL; + + wc->cleanup_thread_deleting_files = 0; + rrdcontext_db_rotation(); + + /* interrupt event loop */ + uv_stop(wc->loop); +} + +static void delete_old_data(void *arg) +{ + struct rrdengine_instance *ctx = arg; + struct rrdengine_worker_config* wc = &ctx->worker_config; + struct rrdengine_datafile *datafile; + struct extent_info *extent, *next; + struct rrdeng_page_descr *descr; + unsigned count, i; + uint8_t can_delete_metric; + uuid_t metric_id; + + /* Safe to use since it will be deleted after we are done */ + datafile = ctx->datafiles.first; + + for (extent = datafile->extents.first ; extent != NULL ; extent = next) { + count = extent->number_of_pages; + for (i = 0 ; i < count ; ++i) { + descr = extent->pages[i]; + can_delete_metric = pg_cache_punch_hole(ctx, descr, 0, 0, &metric_id); + if (unlikely(can_delete_metric)) { + /* + * If the metric is empty, has no active writers and if the metadata log has been initialized then + * attempt to delete the corresponding netdata dimension. + */ + metaqueue_delete_dimension_uuid(&metric_id); + } + } + next = extent->next; + freez(extent); + } + wc->cleanup_thread_deleting_files = 1; + /* wake up event loop */ + fatal_assert(0 == uv_async_send(&wc->async)); +} + +void rrdeng_test_quota(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct rrdengine_datafile *datafile; + unsigned current_size, target_size; + uint8_t out_of_space, only_one_datafile; + int ret, error; + + out_of_space = 0; + /* Do not allow the pinned pages to exceed the disk space quota to avoid deadlocks */ + if (unlikely(ctx->disk_space > MAX(ctx->max_disk_space, 2 * ctx->metric_API_max_producers * RRDENG_BLOCK_SIZE))) { + out_of_space = 1; + } + datafile = ctx->datafiles.last; + current_size = datafile->pos; + target_size = ctx->max_disk_space / TARGET_DATAFILES; + target_size = MIN(target_size, MAX_DATAFILE_SIZE); + target_size = MAX(target_size, MIN_DATAFILE_SIZE); + only_one_datafile = (datafile == ctx->datafiles.first) ? 1 : 0; + if (unlikely(current_size >= target_size || (out_of_space && only_one_datafile))) { + /* Finalize data and journal file and create a new pair */ + wal_flush_transaction_buffer(wc); + ret = create_new_datafile_pair(ctx, 1, ctx->last_fileno + 1); + if (likely(!ret)) { + ++ctx->last_fileno; + } + } + if (unlikely(out_of_space && NO_QUIESCE == ctx->quiesce)) { + /* delete old data */ + if (wc->now_deleting_files) { + /* already deleting data */ + return; + } + if (NULL == ctx->datafiles.first->next) { + error("Cannot delete data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\"" + " to reclaim space, there are no other file pairs left.", + ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); + return; + } + info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", + ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); + wc->now_deleting_files = mallocz(sizeof(*wc->now_deleting_files)); + wc->cleanup_thread_deleting_files = 0; + + error = uv_thread_create(wc->now_deleting_files, delete_old_data, ctx); + if (error) { + error("uv_thread_create(): %s", uv_strerror(error)); + freez(wc->now_deleting_files); + wc->now_deleting_files = NULL; + } + } +} + +static inline int rrdeng_threads_alive(struct rrdengine_worker_config* wc) +{ + if (wc->now_invalidating_dirty_pages || wc->now_deleting_files) { + return 1; + } + return 0; +} + +static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + + if (unlikely(wc->cleanup_thread_invalidating_dirty_pages)) { + after_invalidate_oldest_committed(wc); + } + if (unlikely(wc->cleanup_thread_deleting_files)) { + after_delete_old_data(wc); + } + if (unlikely(SET_QUIESCE == ctx->quiesce && !rrdeng_threads_alive(wc))) { + ctx->quiesce = QUIESCED; + completion_mark_complete(&ctx->rrdengine_completion); + } +} + +/* return 0 on success */ +int init_rrd_files(struct rrdengine_instance *ctx) +{ + int ret = init_data_files(ctx); + + BUFFER *wb = buffer_create(1000); + size_t all_errors = 0; + usec_t now = now_realtime_usec(); + + if(ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter) { + buffer_sprintf(wb, "%s%zu pages had start time > end time (latest: %llu secs ago)" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter + , (now - ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut) / USEC_PER_SEC + ); + all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter; + } + + if(ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter) { + buffer_sprintf(wb, "%s%zu pages had start time = end time with more than 1 entries (latest: %llu secs ago)" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter + , (now - ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut) / USEC_PER_SEC + ); + all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter; + } + + if(ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter) { + buffer_sprintf(wb, "%s%zu pages had zero points (latest: %llu secs ago)" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter + , (now - ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut) / USEC_PER_SEC + ); + all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter; + } + + if(ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter) { + buffer_sprintf(wb, "%s%zu pages had update every == 0 with entries > 1 (latest: %llu secs ago)" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter + , (now - ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut) / USEC_PER_SEC + ); + all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter; + } + + if(ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter) { + buffer_sprintf(wb, "%s%zu pages had a different number of points compared to their timestamps (latest: %llu secs ago; these page have been loaded)" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter + , (now - ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut) / USEC_PER_SEC + ); + all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter; + } + + if(ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter) { + buffer_sprintf(wb, "%s%zu extents have been dropped because they didn't have any valid pages" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter + ); + all_errors += ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter; + } + + if(all_errors) + info("DBENGINE: tier %d: %s", ctx->tier, buffer_tostring(wb)); + + buffer_free(wb); + return ret; +} + +void finalize_rrd_files(struct rrdengine_instance *ctx) +{ + return finalize_data_files(ctx); +} + +void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc) +{ + wc->cmd_queue.head = wc->cmd_queue.tail = 0; + wc->queue_size = 0; + fatal_assert(0 == uv_cond_init(&wc->cmd_cond)); + fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex)); +} + +void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd) +{ + unsigned queue_size; + + /* wait for free space in queue */ + uv_mutex_lock(&wc->cmd_mutex); + while ((queue_size = wc->queue_size) == RRDENG_CMD_Q_MAX_SIZE) { + uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex); + } + fatal_assert(queue_size < RRDENG_CMD_Q_MAX_SIZE); + /* enqueue command */ + wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; + wc->cmd_queue.tail = wc->cmd_queue.tail != RRDENG_CMD_Q_MAX_SIZE - 1 ? + wc->cmd_queue.tail + 1 : 0; + wc->queue_size = queue_size + 1; + uv_mutex_unlock(&wc->cmd_mutex); + + /* wake up event loop */ + fatal_assert(0 == uv_async_send(&wc->async)); +} + +struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc) +{ + struct rrdeng_cmd ret; + unsigned queue_size; + + uv_mutex_lock(&wc->cmd_mutex); + queue_size = wc->queue_size; + if (queue_size == 0) { + ret.opcode = RRDENG_NOOP; + } else { + /* dequeue command */ + ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head]; + if (queue_size == 1) { + wc->cmd_queue.head = wc->cmd_queue.tail = 0; + } else { + wc->cmd_queue.head = wc->cmd_queue.head != RRDENG_CMD_Q_MAX_SIZE - 1 ? + wc->cmd_queue.head + 1 : 0; + } + wc->queue_size = queue_size - 1; + + /* wake up producers */ + uv_cond_signal(&wc->cmd_cond); + } + uv_mutex_unlock(&wc->cmd_mutex); + + return ret; +} + +static void load_configuration_dynamic(void) +{ + unsigned read_num = (unsigned)config_get_number(CONFIG_SECTION_DB, "dbengine pages per extent", MAX_PAGES_PER_EXTENT); + if (read_num > 0 && read_num <= MAX_PAGES_PER_EXTENT) + rrdeng_pages_per_extent = read_num; + else { + error("Invalid dbengine pages per extent %u given. Using %u.", read_num, rrdeng_pages_per_extent); + config_set_number(CONFIG_SECTION_DB, "dbengine pages per extent", rrdeng_pages_per_extent); + } +} + +void async_cb(uv_async_t *handle) +{ + uv_stop(handle->loop); + uv_update_time(handle->loop); + debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle)); +} + +/* Flushes dirty pages when timer expires */ +#define TIMER_PERIOD_MS (1000) + +void timer_cb(uv_timer_t* handle) +{ + worker_is_busy(RRDENG_MAX_OPCODE + 1); + + struct rrdengine_worker_config* wc = handle->data; + struct rrdengine_instance *ctx = wc->ctx; + + uv_stop(handle->loop); + uv_update_time(handle->loop); + rrdeng_test_quota(wc); + debug(D_RRDENGINE, "%s: timeout reached.", __func__); + if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) { + /* There is free space so we can write to disk and we are not actively deleting dirty buffers */ + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark, + high_watermark; + + uv_rwlock_rdlock(&pg_cache->committed_page_index.lock); + nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages; + uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock); + producers = ctx->metric_API_max_producers; + /* are flushable pages more than 25% of the maximum page cache size */ + high_watermark = (ctx->max_cache_pages * 25LLU) / 100; + low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */ + + /* Flush more pages only if disk can keep up */ + if (wc->inflight_dirty_pages < high_watermark + producers) { + if (nr_committed_pages > producers && + /* committed to be written pages are more than the produced number */ + nr_committed_pages - producers > high_watermark) { + /* Flushing speed must increase to stop page cache from filling with dirty pages */ + bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE; + } + bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write); + + debug(D_RRDENGINE, "Flushing pages to disk."); + for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL); + bytes_written && (total_bytes < bytes_to_write); + total_bytes += bytes_written) { + bytes_written = do_flush_pages(wc, 0, NULL); + } + } + } + load_configuration_dynamic(); +#ifdef NETDATA_INTERNAL_CHECKS + { + char buf[4096]; + debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf))); + } +#endif + + worker_is_idle(); +} + +#define MAX_CMD_BATCH_SIZE (256) + +void rrdeng_worker(void* arg) +{ + worker_register("DBENGINE"); + worker_register_job_name(RRDENG_NOOP, "noop"); + worker_register_job_name(RRDENG_READ_PAGE, "page read"); + worker_register_job_name(RRDENG_READ_EXTENT, "extent read"); + worker_register_job_name(RRDENG_COMMIT_PAGE, "commit"); + worker_register_job_name(RRDENG_FLUSH_PAGES, "flush"); + worker_register_job_name(RRDENG_SHUTDOWN, "shutdown"); + worker_register_job_name(RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE, "page lru"); + worker_register_job_name(RRDENG_QUIESCE, "quiesce"); + worker_register_job_name(RRDENG_MAX_OPCODE, "cleanup"); + worker_register_job_name(RRDENG_MAX_OPCODE + 1, "timer"); + + struct rrdengine_worker_config* wc = arg; + struct rrdengine_instance *ctx = wc->ctx; + uv_loop_t* loop; + int shutdown, ret; + enum rrdeng_opcode opcode; + uv_timer_t timer_req; + struct rrdeng_cmd cmd; + unsigned cmd_batch_size; + + rrdeng_init_cmd_queue(wc); + + loop = wc->loop = mallocz(sizeof(uv_loop_t)); + ret = uv_loop_init(loop); + if (ret) { + error("uv_loop_init(): %s", uv_strerror(ret)); + goto error_after_loop_init; + } + loop->data = wc; + + ret = uv_async_init(wc->loop, &wc->async, async_cb); + if (ret) { + error("uv_async_init(): %s", uv_strerror(ret)); + goto error_after_async_init; + } + wc->async.data = wc; + + wc->now_deleting_files = NULL; + wc->cleanup_thread_deleting_files = 0; + + wc->now_invalidating_dirty_pages = NULL; + wc->cleanup_thread_invalidating_dirty_pages = 0; + wc->inflight_dirty_pages = 0; + + /* dirty page flushing timer */ + ret = uv_timer_init(loop, &timer_req); + if (ret) { + error("uv_timer_init(): %s", uv_strerror(ret)); + goto error_after_timer_init; + } + timer_req.data = wc; + + wc->error = 0; + /* wake up initialization thread */ + completion_mark_complete(&ctx->rrdengine_completion); + + fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); + shutdown = 0; + int set_name = 0; + while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) { + worker_is_idle(); + uv_run(loop, UV_RUN_DEFAULT); + worker_is_busy(RRDENG_MAX_OPCODE); + rrdeng_cleanup_finished_threads(wc); + + /* wait for commands */ + cmd_batch_size = 0; + do { + /* + * Avoid starving the loop when there are too many commands coming in. + * timer_cb will interrupt the loop again to allow serving more commands. + */ + if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE)) + break; + + cmd = rrdeng_deq_cmd(wc); + opcode = cmd.opcode; + ++cmd_batch_size; + + if(likely(opcode != RRDENG_NOOP)) + worker_is_busy(opcode); + + switch (opcode) { + case RRDENG_NOOP: + /* the command queue was empty, do nothing */ + break; + case RRDENG_SHUTDOWN: + shutdown = 1; + break; + case RRDENG_QUIESCE: + ctx->drop_metrics_under_page_cache_pressure = 0; + ctx->quiesce = SET_QUIESCE; + fatal_assert(0 == uv_timer_stop(&timer_req)); + uv_close((uv_handle_t *)&timer_req, NULL); + while (do_flush_pages(wc, 1, NULL)) { + ; /* Force flushing of all committed pages. */ + } + wal_flush_transaction_buffer(wc); + if (!rrdeng_threads_alive(wc)) { + ctx->quiesce = QUIESCED; + completion_mark_complete(&ctx->rrdengine_completion); + } + break; + case RRDENG_READ_PAGE: + do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0); + break; + case RRDENG_READ_EXTENT: + do_read_extent(wc, cmd.read_extent.page_cache_descr, cmd.read_extent.page_count, 1); + if (unlikely(!set_name)) { + set_name = 1; + uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE"); + } + break; + case RRDENG_COMMIT_PAGE: + do_commit_transaction(wc, STORE_DATA, NULL); + break; + case RRDENG_FLUSH_PAGES: { + if (wc->now_invalidating_dirty_pages) { + /* Do not flush if the disk cannot keep up */ + completion_mark_complete(cmd.completion); + } else { + (void)do_flush_pages(wc, 1, cmd.completion); + } + break; + case RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE: + rrdeng_invalidate_oldest_committed(wc); + break; + } + default: + debug(D_RRDENGINE, "%s: default.", __func__); + break; + } + } while (opcode != RRDENG_NOOP); + } + + /* cleanup operations of the event loop */ + info("Shutting down RRD engine event loop for tier %d", ctx->tier); + + /* + * uv_async_send after uv_close does not seem to crash in linux at the moment, + * it is however undocumented behaviour and we need to be aware if this becomes + * an issue in the future. + */ + uv_close((uv_handle_t *)&wc->async, NULL); + + while (do_flush_pages(wc, 1, NULL)) { + ; /* Force flushing of all committed pages. */ + } + wal_flush_transaction_buffer(wc); + uv_run(loop, UV_RUN_DEFAULT); + + info("Shutting down RRD engine event loop for tier %d complete", ctx->tier); + /* TODO: don't let the API block by waiting to enqueue commands */ + uv_cond_destroy(&wc->cmd_cond); +/* uv_mutex_destroy(&wc->cmd_mutex); */ + fatal_assert(0 == uv_loop_close(loop)); + freez(loop); + + worker_unregister(); + return; + +error_after_timer_init: + uv_close((uv_handle_t *)&wc->async, NULL); +error_after_async_init: + fatal_assert(0 == uv_loop_close(loop)); +error_after_loop_init: + freez(loop); + + wc->error = UV_EAGAIN; + /* wake up initialization thread */ + completion_mark_complete(&ctx->rrdengine_completion); + worker_unregister(); +} + +/* C entry point for development purposes + * make "LDFLAGS=-errdengine_main" + */ +void rrdengine_main(void) +{ + int ret; + struct rrdengine_instance *ctx; + + sanity_check(); + ret = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB, 0); + if (ret) { + exit(ret); + } + rrdeng_exit(ctx); + fprintf(stderr, "Hello world!"); + exit(0); +} diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h new file mode 100644 index 0000000..521d252 --- /dev/null +++ b/database/engine/rrdengine.h @@ -0,0 +1,283 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDENGINE_H +#define NETDATA_RRDENGINE_H + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include +#include +#include +#include +#include +#include "daemon/common.h" +#include "../rrd.h" +#include "rrddiskprotocol.h" +#include "rrdenginelib.h" +#include "datafile.h" +#include "journalfile.h" +#include "rrdengineapi.h" +#include "pagecache.h" +#include "rrdenglocking.h" + +#ifdef NETDATA_RRD_INTERNALS + +#endif /* NETDATA_RRD_INTERNALS */ + +extern unsigned rrdeng_pages_per_extent; + +/* Forward declarations */ +struct rrdengine_instance; + +#define MAX_PAGES_PER_EXTENT (64) /* TODO: can go higher only when journal supports bigger than 4KiB transactions */ + +#define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u" +#define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u" + +struct rrdeng_collect_handle { + struct pg_cache_page_index *page_index; + struct rrdeng_page_descr *descr; + unsigned long page_correlation_id; + // set to 1 when this dimension is not page aligned with the other dimensions in the chart + uint8_t unaligned_page; + struct pg_alignment *alignment; +}; + +struct rrdeng_query_handle { + struct rrdeng_page_descr *descr; + struct rrdengine_instance *ctx; + struct pg_cache_page_index *page_index; + time_t wanted_start_time_s; + time_t now_s; + unsigned position; + unsigned entries; + storage_number *page; + usec_t page_end_time_ut; + uint32_t page_length; + time_t dt_s; +}; + +typedef enum { + RRDENGINE_STATUS_UNINITIALIZED = 0, + RRDENGINE_STATUS_INITIALIZING, + RRDENGINE_STATUS_INITIALIZED +} rrdengine_state_t; + +enum rrdeng_opcode { + /* can be used to return empty status or flush the command queue */ + RRDENG_NOOP = 0, + + RRDENG_READ_PAGE, + RRDENG_READ_EXTENT, + RRDENG_COMMIT_PAGE, + RRDENG_FLUSH_PAGES, + RRDENG_SHUTDOWN, + RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE, + RRDENG_QUIESCE, + + RRDENG_MAX_OPCODE +}; + +struct rrdeng_read_page { + struct rrdeng_page_descr *page_cache_descr; +}; + +struct rrdeng_read_extent { + struct rrdeng_page_descr *page_cache_descr[MAX_PAGES_PER_EXTENT]; + int page_count; +}; + +struct rrdeng_cmd { + enum rrdeng_opcode opcode; + union { + struct rrdeng_read_page read_page; + struct rrdeng_read_extent read_extent; + struct completion *completion; + }; +}; + +#define RRDENG_CMD_Q_MAX_SIZE (2048) + +struct rrdeng_cmdqueue { + unsigned head, tail; + struct rrdeng_cmd cmd_array[RRDENG_CMD_Q_MAX_SIZE]; +}; + +struct extent_io_descriptor { + uv_fs_t req; + uv_work_t req_worker; + uv_buf_t iov; + uv_file file; + void *buf; + void *map_base; + size_t map_length; + uint64_t pos; + unsigned bytes; + struct completion *completion; + unsigned descr_count; + int release_descr; + struct rrdeng_page_descr *descr_array[MAX_PAGES_PER_EXTENT]; + struct rrdeng_page_descr descr_read_array[MAX_PAGES_PER_EXTENT]; + Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT]; + struct extent_io_descriptor *next; /* multiple requests to be served by the same cached extent */ +}; + +struct generic_io_descriptor { + uv_fs_t req; + uv_buf_t iov; + void *buf; + uint64_t pos; + unsigned bytes; + struct completion *completion; +}; + +struct extent_cache_element { + struct extent_info *extent; /* The ABA problem is avoided with the help of fileno below */ + unsigned fileno; + struct extent_cache_element *prev; /* LRU */ + struct extent_cache_element *next; /* LRU */ + struct extent_io_descriptor *inflight_io_descr; /* I/O descriptor for in-flight extent */ + uint8_t pages[MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE]; +}; + +#define MAX_CACHED_EXTENTS 16 /* cannot be over 32 to fit in 32-bit architectures */ + +/* Initialize by setting the structure to zero */ +struct extent_cache { + struct extent_cache_element extent_array[MAX_CACHED_EXTENTS]; + unsigned allocation_bitmap; /* 1 if the corresponding position in the extent_array is allocated */ + unsigned inflight_bitmap; /* 1 if the corresponding position in the extent_array is waiting for I/O */ + + struct extent_cache_element *replaceQ_head; /* LRU */ + struct extent_cache_element *replaceQ_tail; /* MRU */ +}; + +struct rrdengine_worker_config { + struct rrdengine_instance *ctx; + + uv_thread_t thread; + uv_loop_t* loop; + uv_async_t async; + + /* file deletion thread */ + uv_thread_t *now_deleting_files; + unsigned long cleanup_thread_deleting_files; /* set to 0 when now_deleting_files is still running */ + + /* dirty page deletion thread */ + uv_thread_t *now_invalidating_dirty_pages; + /* set to 0 when now_invalidating_dirty_pages is still running */ + unsigned long cleanup_thread_invalidating_dirty_pages; + unsigned inflight_dirty_pages; + + /* FIFO command queue */ + uv_mutex_t cmd_mutex; + uv_cond_t cmd_cond; + volatile unsigned queue_size; + struct rrdeng_cmdqueue cmd_queue; + + struct extent_cache xt_cache; + + int error; +}; + +/* + * Debug statistics not used by code logic. + * They only describe operations since DB engine instance load time. + */ +struct rrdengine_statistics { + rrdeng_stats_t metric_API_producers; + rrdeng_stats_t metric_API_consumers; + rrdeng_stats_t pg_cache_insertions; + rrdeng_stats_t pg_cache_deletions; + rrdeng_stats_t pg_cache_hits; + rrdeng_stats_t pg_cache_misses; + rrdeng_stats_t pg_cache_backfills; + rrdeng_stats_t pg_cache_evictions; + rrdeng_stats_t before_decompress_bytes; + rrdeng_stats_t after_decompress_bytes; + rrdeng_stats_t before_compress_bytes; + rrdeng_stats_t after_compress_bytes; + rrdeng_stats_t io_write_bytes; + rrdeng_stats_t io_write_requests; + rrdeng_stats_t io_read_bytes; + rrdeng_stats_t io_read_requests; + rrdeng_stats_t io_write_extent_bytes; + rrdeng_stats_t io_write_extents; + rrdeng_stats_t io_read_extent_bytes; + rrdeng_stats_t io_read_extents; + rrdeng_stats_t datafile_creations; + rrdeng_stats_t datafile_deletions; + rrdeng_stats_t journalfile_creations; + rrdeng_stats_t journalfile_deletions; + rrdeng_stats_t page_cache_descriptors; + rrdeng_stats_t io_errors; + rrdeng_stats_t fs_errors; + rrdeng_stats_t pg_cache_over_half_dirty_events; + rrdeng_stats_t flushing_pressure_page_deletions; +}; + +/* I/O errors global counter */ +extern rrdeng_stats_t global_io_errors; +/* File-System errors global counter */ +extern rrdeng_stats_t global_fs_errors; +/* number of File-Descriptors that have been reserved by dbengine */ +extern rrdeng_stats_t rrdeng_reserved_file_descriptors; +/* inability to flush global counters */ +extern rrdeng_stats_t global_pg_cache_over_half_dirty_events; +extern rrdeng_stats_t global_flushing_pressure_page_deletions; /* number of deleted pages */ + +#define NO_QUIESCE (0) /* initial state when all operations function normally */ +#define SET_QUIESCE (1) /* set it before shutting down the instance, quiesce long running operations */ +#define QUIESCED (2) /* is set after all threads have finished running */ + +typedef enum { + LOAD_ERRORS_PAGE_FLIPPED_TIME = 0, + LOAD_ERRORS_PAGE_EQUAL_TIME = 1, + LOAD_ERRORS_PAGE_ZERO_ENTRIES = 2, + LOAD_ERRORS_PAGE_UPDATE_ZERO = 3, + LOAD_ERRORS_PAGE_FLEXY_TIME = 4, + LOAD_ERRORS_DROPPED_EXTENT = 5, +} INVALID_PAGE_ID; + +struct rrdengine_instance { + struct rrdengine_worker_config worker_config; + struct completion rrdengine_completion; + struct page_cache pg_cache; + uint8_t drop_metrics_under_page_cache_pressure; /* boolean */ + uint8_t global_compress_alg; + struct transaction_commit_log commit_log; + struct rrdengine_datafile_list datafiles; + RRDHOST *host; /* the legacy host, or NULL for multi-host DB */ + char dbfiles_path[FILENAME_MAX + 1]; + char machine_guid[GUID_LEN + 1]; /* the unique ID of the corresponding host, or localhost for multihost DB */ + uint64_t disk_space; + uint64_t max_disk_space; + int tier; + unsigned last_fileno; /* newest index of datafile and journalfile */ + unsigned long max_cache_pages; + unsigned long cache_pages_low_watermark; + unsigned long metric_API_max_producers; + + uint8_t quiesce; /* set to SET_QUIESCE before shutdown of the engine */ + uint8_t page_type; /* Default page type for this context */ + + struct rrdengine_statistics stats; + + struct { + size_t counter; + usec_t latest_end_time_ut; + } load_errors[6]; +}; + +void *dbengine_page_alloc(void); +void dbengine_page_free(void *page); + +int init_rrd_files(struct rrdengine_instance *ctx); +void finalize_rrd_files(struct rrdengine_instance *ctx); +void rrdeng_test_quota(struct rrdengine_worker_config* wc); +void rrdeng_worker(void* arg); +void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd); +struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc); + +#endif /* NETDATA_RRDENGINE_H */ diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c new file mode 100755 index 0000000..4525b04 --- /dev/null +++ b/database/engine/rrdengineapi.c @@ -0,0 +1,1272 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" +#include "../storage_engine.h" + +/* Default global database instance */ +struct rrdengine_instance multidb_ctx_storage_tier0; +struct rrdengine_instance multidb_ctx_storage_tier1; +struct rrdengine_instance multidb_ctx_storage_tier2; +struct rrdengine_instance multidb_ctx_storage_tier3; +struct rrdengine_instance multidb_ctx_storage_tier4; +#if RRD_STORAGE_TIERS != 5 +#error RRD_STORAGE_TIERS is not 5 - you need to add allocations here +#endif +struct rrdengine_instance *multidb_ctx[RRD_STORAGE_TIERS]; +uint8_t tier_page_type[RRD_STORAGE_TIERS] = {PAGE_METRICS, PAGE_TIER, PAGE_TIER, PAGE_TIER, PAGE_TIER}; + +#if PAGE_TYPE_MAX != 1 +#error PAGE_TYPE_MAX is not 1 - you need to add allocations here +#endif +size_t page_type_size[256] = {sizeof(storage_number), sizeof(storage_number_tier1_t)}; + +__attribute__((constructor)) void initialize_multidb_ctx(void) { + multidb_ctx[0] = &multidb_ctx_storage_tier0; + multidb_ctx[1] = &multidb_ctx_storage_tier1; + multidb_ctx[2] = &multidb_ctx_storage_tier2; + multidb_ctx[3] = &multidb_ctx_storage_tier3; + multidb_ctx[4] = &multidb_ctx_storage_tier4; +} + +int db_engine_use_malloc = 0; +int default_rrdeng_page_fetch_timeout = 3; +int default_rrdeng_page_fetch_retries = 3; +int default_rrdeng_page_cache_mb = 32; +int default_rrdeng_disk_quota_mb = 256; +int default_multidb_disk_quota_mb = 256; +/* Default behaviour is to unblock data collection if the page cache is full of dirty pages by dropping metrics */ +uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1; + +// ---------------------------------------------------------------------------- +// metrics groups + +static inline void rrdeng_page_alignment_acquire(struct pg_alignment *pa) { + if(unlikely(!pa)) return; + __atomic_add_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST); +} + +static inline bool rrdeng_page_alignment_release(struct pg_alignment *pa) { + if(unlikely(!pa)) return true; + + if(__atomic_sub_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST) == 0) { + freez(pa); + return true; + } + + return false; +} + +// charts call this +STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid __maybe_unused) { + struct pg_alignment *pa = callocz(1, sizeof(struct pg_alignment)); + rrdeng_page_alignment_acquire(pa); + return (STORAGE_METRICS_GROUP *)pa; +} + +// charts call this +void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance __maybe_unused, STORAGE_METRICS_GROUP *smg) { + if(unlikely(!smg)) return; + + struct pg_alignment *pa = (struct pg_alignment *)smg; + rrdeng_page_alignment_release(pa); +} + +// ---------------------------------------------------------------------------- +// metric handle for legacy dbs + +/* This UUID is not unique across hosts */ +void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_t *ret_uuid) +{ + EVP_MD_CTX *evpctx; + unsigned char hash_value[EVP_MAX_MD_SIZE]; + unsigned int hash_len; + + evpctx = EVP_MD_CTX_create(); + EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL); + EVP_DigestUpdate(evpctx, dim_id, strlen(dim_id)); + EVP_DigestUpdate(evpctx, chart_id, strlen(chart_id)); + EVP_DigestFinal_ex(evpctx, hash_value, &hash_len); + EVP_MD_CTX_destroy(evpctx); + fatal_assert(hash_len > sizeof(uuid_t)); + memcpy(ret_uuid, hash_value, sizeof(uuid_t)); +} + +/* Transform legacy UUID to be unique across hosts deterministically */ +void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid, uuid_t *ret_uuid) +{ + EVP_MD_CTX *evpctx; + unsigned char hash_value[EVP_MAX_MD_SIZE]; + unsigned int hash_len; + + evpctx = EVP_MD_CTX_create(); + EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL); + EVP_DigestUpdate(evpctx, machine_guid, GUID_LEN); + EVP_DigestUpdate(evpctx, *legacy_uuid, sizeof(uuid_t)); + EVP_DigestFinal_ex(evpctx, hash_value, &hash_len); + EVP_MD_CTX_destroy(evpctx); + fatal_assert(hash_len > sizeof(uuid_t)); + memcpy(ret_uuid, hash_value, sizeof(uuid_t)); +} + +STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id) { + uuid_t legacy_uuid; + rrdeng_generate_legacy_uuid(rd_id, st_id, &legacy_uuid); + return rrdeng_metric_get(db_instance, &legacy_uuid); +} + +// ---------------------------------------------------------------------------- +// metric handle + +void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle) { + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + + __atomic_sub_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST); +} + +STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle) { + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST); + return db_metric_handle; +} + +STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + struct page_cache *pg_cache = &ctx->pg_cache; + struct pg_cache_page_index *page_index = NULL; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, uuid, sizeof(uuid_t)); + if (likely(NULL != PValue)) + page_index = *PValue; + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + + if (likely(page_index)) + __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST); + + return (STORAGE_METRIC_HANDLE *)page_index; +} + +STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid) { + internal_fatal(!db_instance, "DBENGINE: db_instance is NULL"); + + struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + struct pg_cache_page_index *page_index; + struct page_cache *pg_cache = &ctx->pg_cache; + + uv_rwlock_wrlock(&pg_cache->metrics_index.lock); + Pvoid_t *PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, uuid, sizeof(uuid_t), PJE0); + fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ + *PValue = page_index = create_page_index(uuid, ctx); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; + page_index->refcount = 1; + uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); + + return (STORAGE_METRIC_HANDLE *)page_index; +} + +STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance) { + STORAGE_METRIC_HANDLE *db_metric_handle; + + db_metric_handle = rrdeng_metric_get(db_instance, &rd->metric_uuid); + if(!db_metric_handle) { + db_metric_handle = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset)); + if(db_metric_handle) { + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + uuid_copy(rd->metric_uuid, page_index->id); + } + } + if(!db_metric_handle) + db_metric_handle = rrdeng_metric_create(db_instance, &rd->metric_uuid); + +#ifdef NETDATA_INTERNAL_CHECKS + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + if(uuid_compare(rd->metric_uuid, page_index->id) != 0) { + char uuid1[UUID_STR_LEN + 1]; + char uuid2[UUID_STR_LEN + 1]; + + uuid_unparse(rd->metric_uuid, uuid1); + uuid_unparse(page_index->id, uuid2); + fatal("DBENGINE: uuids do not match, asked for metric '%s', but got page_index of metric '%s'", uuid1, uuid2); + } + + struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + if(page_index->ctx != ctx) + fatal("DBENGINE: mixed up rrdengine instances, asked for metric from %p, got from %p", ctx, page_index->ctx); +#endif + + return db_metric_handle; +} + + +// ---------------------------------------------------------------------------- +// collect ops + +/* + * Gets a handle for storing metrics to the database. + * The handle must be released with rrdeng_store_metric_final(). + */ +STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg) { + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + struct rrdeng_collect_handle *handle; + + handle = callocz(1, sizeof(struct rrdeng_collect_handle)); + handle->page_index = page_index; + handle->descr = NULL; + handle->unaligned_page = 0; + page_index->latest_update_every_s = update_every; + + handle->alignment = (struct pg_alignment *)smg; + rrdeng_page_alignment_acquire(handle->alignment); + + uv_rwlock_wrlock(&page_index->lock); + ++page_index->writers; + uv_rwlock_wrunlock(&page_index->lock); + + return (STORAGE_COLLECT_HANDLE *)handle; +} + +/* The page must be populated and referenced */ +static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr) +{ + switch(descr->type) { + case PAGE_METRICS: { + size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr); + storage_number *array = (storage_number *)descr->pg_cache_descr->page; + for (size_t i = 0 ; i < slots; ++i) { + if(does_storage_number_exist(array[i])) + return 0; + } + } + break; + + case PAGE_TIER: { + size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr); + storage_number_tier1_t *array = (storage_number_tier1_t *)descr->pg_cache_descr->page; + for (size_t i = 0 ; i < slots; ++i) { + if(fpclassify(array[i].sum_value) != FP_NAN) + return 0; + } + } + break; + + default: { + static bool logged = false; + if(!logged) { + error("DBENGINE: cannot check page for nulls on unknown page type id %d", descr->type); + logged = true; + } + return 0; + } + } + + return 1; +} + +void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) { + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + // struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle; + struct rrdengine_instance *ctx = handle->page_index->ctx; + struct rrdeng_page_descr *descr = handle->descr; + + if (unlikely(!ctx)) return; + if (unlikely(!descr)) return; + + if (likely(descr->page_length)) { + int page_is_empty; + + rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); + + page_is_empty = page_has_only_empty_metrics(descr); + if (page_is_empty) { + print_page_cache_descr(descr, "Page has empty metrics only, deleting", true); + pg_cache_put(ctx, descr); + pg_cache_punch_hole(ctx, descr, 1, 0, NULL); + } else + rrdeng_commit_page(ctx, descr, handle->page_correlation_id); + } else { + dbengine_page_free(descr->pg_cache_descr->page); + rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr); + rrdeng_page_descr_freez(descr); + } + handle->descr = NULL; +} + +static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection_handle, + usec_t point_in_time_ut, + NETDATA_DOUBLE n, + NETDATA_DOUBLE min_value, + NETDATA_DOUBLE max_value, + uint16_t count, + uint16_t anomaly_count, + SN_FLAGS flags) +{ + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + struct pg_cache_page_index *page_index = handle->page_index; + struct rrdengine_instance *ctx = handle->page_index->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = handle->descr; + + void *page; + uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0; + + if (descr) { + /* Make alignment decisions */ + +#ifdef NETDATA_INTERNAL_CHECKS + if(descr->end_time_ut + page_index->latest_update_every_s * USEC_PER_SEC != point_in_time_ut) { + char buffer[200 + 1]; + snprintfz(buffer, 200, + "metrics collected are %s, end_time_ut = %llu, point_in_time_ut = %llu, update_every = %u, delta = %llu", + (point_in_time_ut / USEC_PER_SEC - descr->end_time_ut / USEC_PER_SEC > page_index->latest_update_every_s)?"far apart":"not aligned", + descr->end_time_ut / USEC_PER_SEC, + point_in_time_ut / USEC_PER_SEC, + page_index->latest_update_every_s, + point_in_time_ut / USEC_PER_SEC - descr->end_time_ut / USEC_PER_SEC); + print_page_cache_descr(descr, buffer, false); + } +#endif + + if (descr->page_length == handle->alignment->page_length) { + /* this is the leading dimension that defines chart alignment */ + perfect_page_alignment = 1; + } + /* is the metric far enough out of alignment with the others? */ + if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < handle->alignment->page_length)) { + handle->unaligned_page = 1; + print_page_cache_descr(descr, "Metric page is not aligned with chart", true); + } + if (unlikely(handle->unaligned_page && + /* did the other metrics change page? */ + handle->alignment->page_length <= PAGE_POINT_SIZE_BYTES(descr))) { + print_page_cache_descr(descr, "must_flush_unaligned_page = 1", true); + must_flush_unaligned_page = 1; + handle->unaligned_page = 0; + } + } + if (unlikely(NULL == descr || + descr->page_length + PAGE_POINT_SIZE_BYTES(descr) > RRDENG_BLOCK_SIZE || + must_flush_unaligned_page)) { + + if(descr) { + print_page_cache_descr(descr, "flushing metric", true); + rrdeng_store_metric_flush_current_page(collection_handle); + } + + page = rrdeng_create_page(ctx, &page_index->id, &descr); + fatal_assert(page); + + descr->update_every_s = page_index->latest_update_every_s; + handle->descr = descr; + + handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1); + + if (0 == handle->alignment->page_length) { + /* this is the leading dimension that defines chart alignment */ + perfect_page_alignment = 1; + } + } + + page = descr->pg_cache_descr->page; + + switch (descr->type) { + case PAGE_METRICS: { + ((storage_number *)page)[descr->page_length / PAGE_POINT_SIZE_BYTES(descr)] = pack_storage_number(n, flags); + } + break; + + case PAGE_TIER: { + storage_number_tier1_t number_tier1; + number_tier1.sum_value = (float)n; + number_tier1.min_value = (float)min_value; + number_tier1.max_value = (float)max_value; + number_tier1.anomaly_count = anomaly_count; + number_tier1.count = count; + ((storage_number_tier1_t *)page)[descr->page_length / PAGE_POINT_SIZE_BYTES(descr)] = number_tier1; + } + break; + + default: { + static bool logged = false; + if(!logged) { + error("DBENGINE: cannot store metric on unknown page type id %d", descr->type); + logged = true; + } + } + break; + } + + pg_cache_atomic_set_pg_info(descr, point_in_time_ut, descr->page_length + PAGE_POINT_SIZE_BYTES(descr)); + + if (perfect_page_alignment) + handle->alignment->page_length = descr->page_length; + if (unlikely(INVALID_TIME == descr->start_time_ut)) { + unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers; + descr->start_time_ut = point_in_time_ut; + + new_metric_API_producers = rrd_atomic_add_fetch(&ctx->stats.metric_API_producers, 1); + while (unlikely(new_metric_API_producers > (old_metric_API_max_producers = ctx->metric_API_max_producers))) { + /* Increase ctx->metric_API_max_producers */ + ret_metric_API_max_producers = ulong_compare_and_swap(&ctx->metric_API_max_producers, + old_metric_API_max_producers, + new_metric_API_producers); + if (old_metric_API_max_producers == ret_metric_API_max_producers) { + /* success */ + break; + } + } + + pg_cache_insert(ctx, page_index, descr); + } else { + pg_cache_add_new_metric_time(page_index, descr); + } + +// { +// unsigned char u[16] = { 0x0C, 0x0A, 0x40, 0xD6, 0x2A, 0x43, 0x4A, 0x7C, 0x95, 0xF7, 0xD1, 0x1E, 0x0C, 0x9E, 0x8A, 0xE7 }; +// if(uuid_compare(u, page_index->id) == 0) { +// char buffer[100]; +// snprintfz(buffer, 100, "store system.cpu, collect:%u, page_index first:%u, last:%u", +// (uint32_t)(point_in_time / USEC_PER_SEC), +// (uint32_t)(page_index->oldest_time / USEC_PER_SEC), +// (uint32_t)(page_index->latest_time / USEC_PER_SEC)); +// +// print_page_cache_descr(descr, buffer, false); +// } +// } +} + +void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, + usec_t point_in_time_ut, + NETDATA_DOUBLE n, + NETDATA_DOUBLE min_value, + NETDATA_DOUBLE max_value, + uint16_t count, + uint16_t anomaly_count, + SN_FLAGS flags) +{ + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + struct pg_cache_page_index *page_index = handle->page_index; + struct rrdeng_page_descr *descr = handle->descr; + + if(likely(descr)) { + usec_t last_point_in_time_ut = descr->end_time_ut; + usec_t update_every_ut = page_index->latest_update_every_s * USEC_PER_SEC; + size_t points_gap = (point_in_time_ut <= last_point_in_time_ut) ? + (size_t)0 : + (size_t)((point_in_time_ut - last_point_in_time_ut) / update_every_ut); + + if(unlikely(points_gap != 1)) { + if (unlikely(points_gap <= 0)) { + time_t now = now_realtime_sec(); + static __thread size_t counter = 0; + static __thread time_t last_time_logged = 0; + counter++; + + if(now - last_time_logged > 600) { + error("DBENGINE: collected point is in the past (repeated %zu times in the last %zu secs). Ignoring these data collection points.", + counter, (size_t)(last_time_logged?(now - last_time_logged):0)); + + last_time_logged = now; + counter = 0; + } + return; + } + + size_t point_size = PAGE_POINT_SIZE_BYTES(descr); + size_t page_size_in_points = RRDENG_BLOCK_SIZE / point_size; + size_t used_points = descr->page_length / point_size; + size_t remaining_points_in_page = page_size_in_points - used_points; + + bool new_point_is_aligned = true; + if(unlikely((point_in_time_ut - last_point_in_time_ut) / points_gap != update_every_ut)) + new_point_is_aligned = false; + + if(unlikely(points_gap > remaining_points_in_page || !new_point_is_aligned)) { +// char buffer[200]; +// snprintfz(buffer, 200, "data collection skipped %zu points, last stored point %llu, new point %llu, update every %d. Cutting page.", +// points_gap, last_point_in_time_ut / USEC_PER_SEC, point_in_time_ut / USEC_PER_SEC, page_index->latest_update_every_s); +// print_page_cache_descr(descr, buffer, false); + + rrdeng_store_metric_flush_current_page(collection_handle); + } + else { +// char buffer[200]; +// snprintfz(buffer, 200, "data collection skipped %zu points, last stored point %llu, new point %llu, update every %d. Filling the gap.", +// points_gap, last_point_in_time_ut / USEC_PER_SEC, point_in_time_ut / USEC_PER_SEC, page_index->latest_update_every_s); +// print_page_cache_descr(descr, buffer, false); + + // loop to fill the gap + usec_t step_ut = page_index->latest_update_every_s * USEC_PER_SEC; + usec_t last_point_filled_ut = last_point_in_time_ut + step_ut; + + while (last_point_filled_ut < point_in_time_ut) { + rrdeng_store_metric_next_internal( + collection_handle, last_point_filled_ut, NAN, NAN, NAN, + 1, 0, SN_EMPTY_SLOT); + + last_point_filled_ut += step_ut; + } + } + } + } + + rrdeng_store_metric_next_internal(collection_handle, point_in_time_ut, n, min_value, max_value, count, anomaly_count, flags); +} + + +/* + * Releases the database reference from the handle for storing metrics. + * Returns 1 if it's safe to delete the dimension. + */ +int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + struct pg_cache_page_index *page_index = handle->page_index; + + uint8_t can_delete_metric = 0; + + rrdeng_store_metric_flush_current_page(collection_handle); + uv_rwlock_wrlock(&page_index->lock); + + if (!--page_index->writers && !page_index->page_count) + can_delete_metric = 1; + + uv_rwlock_wrunlock(&page_index->lock); + + rrdeng_page_alignment_release(handle->alignment); + freez(handle); + + return can_delete_metric; +} + +void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every) { + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + struct pg_cache_page_index *page_index = handle->page_index; + rrdeng_store_metric_flush_current_page(collection_handle); + uv_rwlock_rdlock(&page_index->lock); + page_index->latest_update_every_s = update_every; + uv_rwlock_rdunlock(&page_index->lock); +} + +// ---------------------------------------------------------------------------- +// query ops + +//static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info) +//{ +// return (uint32_t *)&page_info->scratch[0]; +//} +// +//static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info) +//{ +// return (uint32_t *)&page_info->scratch[sizeof(uint32_t)]; +//} +// +/* + * Gets a handle for loading metrics from the database. + * The handle must be released with rrdeng_load_metric_final(). + */ +void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrdimm_handle, time_t start_time_s, time_t end_time_s) +{ + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + struct rrdengine_instance *ctx = page_index->ctx; + + // fprintf(stderr, "%s: %s/%s start time %ld, end time %ld\n", __FUNCTION__ , rd->rrdset->name, rd->name, start_time, end_time); + + struct rrdeng_query_handle *handle; + unsigned pages_nr; + + if(!page_index->latest_update_every_s) + page_index->latest_update_every_s = default_rrd_update_every; + + rrdimm_handle->start_time_s = start_time_s; + rrdimm_handle->end_time_s = end_time_s; + + handle = callocz(1, sizeof(struct rrdeng_query_handle)); + handle->wanted_start_time_s = start_time_s; + handle->now_s = start_time_s; + handle->position = 0; + handle->ctx = ctx; + handle->descr = NULL; + handle->dt_s = page_index->latest_update_every_s; + rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle; + pages_nr = pg_cache_preload(ctx, &page_index->id, start_time_s * USEC_PER_SEC, end_time_s * USEC_PER_SEC, + NULL, &handle->page_index); + if (unlikely(NULL == handle->page_index || 0 == pages_nr)) + // there are no metrics to load + handle->wanted_start_time_s = INVALID_TIME; +} + +static int rrdeng_load_page_next(struct storage_engine_query_handle *rrdimm_handle, bool debug_this __maybe_unused) { + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; + + struct rrdengine_instance *ctx = handle->ctx; + struct rrdeng_page_descr *descr = handle->descr; + + uint32_t page_length; + usec_t page_end_time_ut; + unsigned position; + + if (likely(descr)) { + // Drop old page's reference + +#ifdef NETDATA_INTERNAL_CHECKS + rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); +#endif + + pg_cache_put(ctx, descr); + handle->descr = NULL; + handle->wanted_start_time_s = (time_t)((handle->page_end_time_ut / USEC_PER_SEC) + handle->dt_s); + + if (unlikely(handle->wanted_start_time_s > rrdimm_handle->end_time_s)) + return 1; + } + + usec_t wanted_start_time_ut = handle->wanted_start_time_s * USEC_PER_SEC; + descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, + wanted_start_time_ut, rrdimm_handle->end_time_s * USEC_PER_SEC); + if (NULL == descr) + return 1; + +#ifdef NETDATA_INTERNAL_CHECKS + rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1); +#endif + + handle->descr = descr; + pg_cache_atomic_get_pg_info(descr, &page_end_time_ut, &page_length); + if (unlikely(INVALID_TIME == descr->start_time_ut || INVALID_TIME == page_end_time_ut || 0 == descr->update_every_s)) { + error("DBENGINE: discarding invalid page descriptor (start_time = %llu, end_time = %llu, update_every_s = %d)", + descr->start_time_ut, page_end_time_ut, descr->update_every_s); + return 1; + } + + if (unlikely(descr->start_time_ut != page_end_time_ut && wanted_start_time_ut > descr->start_time_ut)) { + // we're in the middle of the page somewhere + unsigned entries = page_length / PAGE_POINT_SIZE_BYTES(descr); + position = ((uint64_t)(wanted_start_time_ut - descr->start_time_ut)) * (entries - 1) / + (page_end_time_ut - descr->start_time_ut); + } + else + position = 0; + + handle->page_end_time_ut = page_end_time_ut; + handle->page_length = page_length; + handle->entries = page_length / PAGE_POINT_SIZE_BYTES(descr); + handle->page = descr->pg_cache_descr->page; + handle->dt_s = descr->update_every_s; + handle->position = position; + +// if(debug_this) +// info("DBENGINE: rrdeng_load_page_next(), " +// "position:%d, " +// "start_time_ut:%llu, " +// "page_end_time_ut:%llu, " +// "next_page_time_ut:%llu, " +// "in_out:%s" +// , position +// , descr->start_time_ut +// , page_end_time_ut +// , +// wanted_start_time_ut, in_out?"true":"false" +// ); + + return 0; +} + +// Returns the metric and sets its timestamp into current_time +// IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags) +// IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES +STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle) { + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle; + // struct rrdeng_metric_handle *metric_handle = handle->metric_handle; + + struct rrdeng_page_descr *descr = handle->descr; + time_t now = handle->now_s + handle->dt_s; + +// bool debug_this = false; +// { +// unsigned char u[16] = { 0x0C, 0x0A, 0x40, 0xD6, 0x2A, 0x43, 0x4A, 0x7C, 0x95, 0xF7, 0xD1, 0x1E, 0x0C, 0x9E, 0x8A, 0xE7 }; +// if(uuid_compare(u, handle->page_index->id) == 0) { +// char buffer[100]; +// snprintfz(buffer, 100, "load system.cpu, now:%u, dt:%u, position:%u page_index first:%u, last:%u", +// (uint32_t)(now), +// (uint32_t)(handle->dt_s), +// (uint32_t)(handle->position), +// (uint32_t)(handle->page_index->oldest_time / USEC_PER_SEC), +// (uint32_t)(handle->page_index->latest_time / USEC_PER_SEC)); +// +// print_page_cache_descr(descr, buffer, false); +// debug_this = true; +// } +// } + + STORAGE_POINT sp; + unsigned position = handle->position + 1; + storage_number_tier1_t tier1_value; + + if (unlikely(INVALID_TIME == handle->wanted_start_time_s)) { + handle->wanted_start_time_s = INVALID_TIME; + handle->now_s = now; + storage_point_empty(sp, now - handle->dt_s, now); + return sp; + } + + if (unlikely(!descr || position >= handle->entries)) { + // We need to get a new page + if(rrdeng_load_page_next(rrddim_handle, false)) { + // next calls will not load any more metrics + handle->wanted_start_time_s = INVALID_TIME; + handle->now_s = now; + storage_point_empty(sp, now - handle->dt_s, now); + return sp; + } + + descr = handle->descr; + position = handle->position; + now = (time_t)((descr->start_time_ut / USEC_PER_SEC) + position * descr->update_every_s); + +// if(debug_this) { +// char buffer[100]; +// snprintfz(buffer, 100, "NEW PAGE system.cpu, now:%u, dt:%u, position:%u page_index first:%u, last:%u", +// (uint32_t)(now), +// (uint32_t)(handle->dt_s), +// (uint32_t)(handle->position), +// (uint32_t)(handle->page_index->oldest_time / USEC_PER_SEC), +// (uint32_t)(handle->page_index->latest_time / USEC_PER_SEC)); +// +// print_page_cache_descr(descr, buffer, false); +// } + } + + sp.start_time = now - handle->dt_s; + sp.end_time = now; + + handle->position = position; + handle->now_s = now; + + switch(descr->type) { + case PAGE_METRICS: { + storage_number n = handle->page[position]; + sp.min = sp.max = sp.sum = unpack_storage_number(n); + sp.flags = n & SN_USER_FLAGS; + sp.count = 1; + sp.anomaly_count = is_storage_number_anomalous(n) ? 1 : 0; + } + break; + + case PAGE_TIER: { + tier1_value = ((storage_number_tier1_t *)handle->page)[position]; + sp.flags = tier1_value.anomaly_count ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS; + sp.count = tier1_value.count; + sp.anomaly_count = tier1_value.anomaly_count; + sp.min = tier1_value.min_value; + sp.max = tier1_value.max_value; + sp.sum = tier1_value.sum_value; + } + break; + + // we don't know this page type + default: { + static bool logged = false; + if(!logged) { + error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", descr->type); + logged = true; + } + storage_point_empty(sp, sp.start_time, sp.end_time); + } + break; + } + + if (unlikely(now >= rrddim_handle->end_time_s)) { + // next calls will not load any more metrics + handle->wanted_start_time_s = INVALID_TIME; + } + +// if(debug_this) +// info("DBENGINE: returning point: " +// "time from %ld to %ld // query from %ld to %ld // wanted_start_time_s %ld" +// , sp.start_time, sp.end_time +// , rrddim_handle->start_time_s, rrddim_handle->end_time_s +// , handle->wanted_start_time_s +// ); + + return sp; +} + +int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrdimm_handle) +{ + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; + return (INVALID_TIME == handle->wanted_start_time_s); +} + +/* + * Releases the database reference from the handle for loading metrics. + */ +void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrdimm_handle) +{ + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; + struct rrdengine_instance *ctx = handle->ctx; + struct rrdeng_page_descr *descr = handle->descr; + + if (descr) { +#ifdef NETDATA_INTERNAL_CHECKS + rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); +#endif + pg_cache_put(ctx, descr); + } + + // whatever is allocated at rrdeng_load_metric_init() should be freed here + freez(handle); + rrdimm_handle->handle = NULL; +} + +time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) { + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + return (time_t)(page_index->latest_time_ut / USEC_PER_SEC); +} +time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) { + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + return (time_t)(page_index->oldest_time_ut / USEC_PER_SEC); +} + +int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t) +{ + struct page_cache *pg_cache; + struct rrdengine_instance *ctx; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + + ctx = (struct rrdengine_instance *)si; + if (unlikely(!ctx)) { + error("DBENGINE: invalid STORAGE INSTANCE to %s()", __FUNCTION__); + return 1; + } + pg_cache = &ctx->pg_cache; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, dim_uuid, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + + if (likely(page_index)) { + *first_entry_t = page_index->oldest_time_ut / USEC_PER_SEC; + *last_entry_t = page_index->latest_time_ut / USEC_PER_SEC; + return 0; + } + + return 1; +} + +/* Also gets a reference for the page */ +void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr) +{ + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + void *page; + /* TODO: check maximum number of pages in page cache limit */ + + descr = pg_cache_create_descr(); + descr->id = id; /* TODO: add page type: metric, log, something? */ + descr->type = ctx->page_type; + page = dbengine_page_alloc(); /*TODO: add page size */ + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->page = page; + pg_cache_descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */; + pg_cache_descr->refcnt = 1; + + debug(D_RRDENGINE, "Created new page:"); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr, "", true); + rrdeng_page_descr_mutex_unlock(ctx, descr); + *ret_descr = descr; + return page; +} + +/* The page must not be empty */ +void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, + Word_t page_correlation_id) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + Pvoid_t *PValue; + unsigned nr_committed_pages; + + if (unlikely(NULL == descr)) { + debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-committed.", __func__); + return; + } + fatal_assert(descr->page_length); + + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + PValue = JudyLIns(&pg_cache->committed_page_index.JudyL_array, page_correlation_id, PJE0); + *PValue = descr; + nr_committed_pages = ++pg_cache->committed_page_index.nr_committed_pages; + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + + if (nr_committed_pages >= pg_cache_hard_limit(ctx) / 2) { + /* over 50% of pages have not been committed yet */ + + if (ctx->drop_metrics_under_page_cache_pressure && + nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) { + /* 100% of pages are dirty */ + struct rrdeng_cmd cmd; + + cmd.opcode = RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + } else { + if (0 == (unsigned long) ctx->stats.pg_cache_over_half_dirty_events) { + /* only print the first time */ + errno = 0; + error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". " + "Metric data at risk of not being stored in the database, " + "please reduce disk load or use a faster disk.", ctx->dbfiles_path); + } + rrd_stat_atomic_add(&ctx->stats.pg_cache_over_half_dirty_events, 1); + rrd_stat_atomic_add(&global_pg_cache_over_half_dirty_events, 1); + } + } + + pg_cache_put(ctx, descr); +} + +/* Gets a reference for the page */ +void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle) +{ + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + + debug(D_RRDENGINE, "Reading existing page:"); + descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME); + if (NULL == descr) { + *handle = NULL; + + return NULL; + } + *handle = descr; + pg_cache_descr = descr->pg_cache_descr; + + return pg_cache_descr->page; +} + +/* Gets a reference for the page */ +void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time_ut, void **handle) +{ + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + + debug(D_RRDENGINE, "Reading existing page:"); + descr = pg_cache_lookup(ctx, NULL, id, point_in_time_ut); + if (NULL == descr) { + *handle = NULL; + + return NULL; + } + *handle = descr; + pg_cache_descr = descr->pg_cache_descr; + + return pg_cache_descr->page; +} + +/* + * Gathers Database Engine statistics. + * Careful when modifying this function. + * You must not change the indices of the statistics or user code will break. + * You must not exceed RRDENG_NR_STATS or it will crash. + */ +void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array) +{ + if (ctx == NULL) + return; + + struct page_cache *pg_cache = &ctx->pg_cache; + + array[0] = (uint64_t)ctx->stats.metric_API_producers; + array[1] = (uint64_t)ctx->stats.metric_API_consumers; + array[2] = (uint64_t)pg_cache->page_descriptors; + array[3] = (uint64_t)pg_cache->populated_pages; + array[4] = (uint64_t)pg_cache->committed_page_index.nr_committed_pages; + array[5] = (uint64_t)ctx->stats.pg_cache_insertions; + array[6] = (uint64_t)ctx->stats.pg_cache_deletions; + array[7] = (uint64_t)ctx->stats.pg_cache_hits; + array[8] = (uint64_t)ctx->stats.pg_cache_misses; + array[9] = (uint64_t)ctx->stats.pg_cache_backfills; + array[10] = (uint64_t)ctx->stats.pg_cache_evictions; + array[11] = (uint64_t)ctx->stats.before_compress_bytes; + array[12] = (uint64_t)ctx->stats.after_compress_bytes; + array[13] = (uint64_t)ctx->stats.before_decompress_bytes; + array[14] = (uint64_t)ctx->stats.after_decompress_bytes; + array[15] = (uint64_t)ctx->stats.io_write_bytes; + array[16] = (uint64_t)ctx->stats.io_write_requests; + array[17] = (uint64_t)ctx->stats.io_read_bytes; + array[18] = (uint64_t)ctx->stats.io_read_requests; + array[19] = (uint64_t)ctx->stats.io_write_extent_bytes; + array[20] = (uint64_t)ctx->stats.io_write_extents; + array[21] = (uint64_t)ctx->stats.io_read_extent_bytes; + array[22] = (uint64_t)ctx->stats.io_read_extents; + array[23] = (uint64_t)ctx->stats.datafile_creations; + array[24] = (uint64_t)ctx->stats.datafile_deletions; + array[25] = (uint64_t)ctx->stats.journalfile_creations; + array[26] = (uint64_t)ctx->stats.journalfile_deletions; + array[27] = (uint64_t)ctx->stats.page_cache_descriptors; + array[28] = (uint64_t)ctx->stats.io_errors; + array[29] = (uint64_t)ctx->stats.fs_errors; + array[30] = (uint64_t)global_io_errors; + array[31] = (uint64_t)global_fs_errors; + array[32] = (uint64_t)rrdeng_reserved_file_descriptors; + array[33] = (uint64_t)ctx->stats.pg_cache_over_half_dirty_events; + array[34] = (uint64_t)global_pg_cache_over_half_dirty_events; + array[35] = (uint64_t)ctx->stats.flushing_pressure_page_deletions; + array[36] = (uint64_t)global_flushing_pressure_page_deletions; + fatal_assert(RRDENG_NR_STATS == 37); +} + +/* Releases reference to page */ +void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle) +{ + (void)ctx; + pg_cache_put(ctx, (struct rrdeng_page_descr *)handle); +} + +/* + * Returns 0 on success, negative on error + */ +int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, + unsigned disk_space_mb, size_t tier) { + struct rrdengine_instance *ctx; + int error; + uint32_t max_open_files; + + max_open_files = rlimit_nofile.rlim_cur / 4; + + /* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */ + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE); + if (rrdeng_reserved_file_descriptors > max_open_files) { + error( + "Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.", + (unsigned)rrdeng_reserved_file_descriptors, + (unsigned)max_open_files); + + rrd_stat_atomic_add(&global_fs_errors, 1); + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); + return UV_EMFILE; + } + + if(NULL == ctxp) { + ctx = multidb_ctx[tier]; + memset(ctx, 0, sizeof(*ctx)); + } + else { + *ctxp = ctx = callocz(1, sizeof(*ctx)); + } + ctx->tier = tier; + ctx->page_type = tier_page_type[tier]; + ctx->global_compress_alg = RRD_LZ4; + if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB) + page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB; + ctx->max_cache_pages = page_cache_mb * (1048576LU / RRDENG_BLOCK_SIZE); + /* try to keep 5% of the page cache free */ + ctx->cache_pages_low_watermark = (ctx->max_cache_pages * 95LLU) / 100; + if (disk_space_mb < RRDENG_MIN_DISK_SPACE_MB) + disk_space_mb = RRDENG_MIN_DISK_SPACE_MB; + ctx->max_disk_space = disk_space_mb * 1048576LLU; + strncpyz(ctx->dbfiles_path, dbfiles_path, sizeof(ctx->dbfiles_path) - 1); + ctx->dbfiles_path[sizeof(ctx->dbfiles_path) - 1] = '\0'; + if (NULL == host) + strncpyz(ctx->machine_guid, registry_get_this_machine_guid(), GUID_LEN); + else + strncpyz(ctx->machine_guid, host->machine_guid, GUID_LEN); + + ctx->drop_metrics_under_page_cache_pressure = rrdeng_drop_metrics_under_page_cache_pressure; + ctx->metric_API_max_producers = 0; + ctx->quiesce = NO_QUIESCE; + ctx->host = host; + + memset(&ctx->worker_config, 0, sizeof(ctx->worker_config)); + ctx->worker_config.ctx = ctx; + init_page_cache(ctx); + init_commit_log(ctx); + error = init_rrd_files(ctx); + if (error) { + goto error_after_init_rrd_files; + } + + completion_init(&ctx->rrdengine_completion); + fatal_assert(0 == uv_thread_create(&ctx->worker_config.thread, rrdeng_worker, &ctx->worker_config)); + /* wait for worker thread to initialize */ + completion_wait_for(&ctx->rrdengine_completion); + completion_destroy(&ctx->rrdengine_completion); + uv_thread_set_name_np(ctx->worker_config.thread, "LIBUV_WORKER"); + if (ctx->worker_config.error) { + goto error_after_rrdeng_worker; + } +// error = metalog_init(ctx); +// if (error) { +// error("Failed to initialize metadata log file event loop."); +// goto error_after_rrdeng_worker; +// } + + return 0; + +error_after_rrdeng_worker: + finalize_rrd_files(ctx); +error_after_init_rrd_files: + free_page_cache(ctx); + if (!is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) { + freez(ctx); + if (ctxp) + *ctxp = NULL; + } + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); + return UV_EIO; +} + +/* + * Returns 0 on success, 1 on error + */ +int rrdeng_exit(struct rrdengine_instance *ctx) +{ + struct rrdeng_cmd cmd; + + if (NULL == ctx) { + return 1; + } + + /* TODO: add page to page cache */ + cmd.opcode = RRDENG_SHUTDOWN; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + + fatal_assert(0 == uv_thread_join(&ctx->worker_config.thread)); + + finalize_rrd_files(ctx); + //metalog_exit(ctx->metalog_ctx); + free_page_cache(ctx); + + if(!is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) + freez(ctx); + + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); + return 0; +} + +void rrdeng_prepare_exit(struct rrdengine_instance *ctx) +{ + struct rrdeng_cmd cmd; + + if (NULL == ctx) { + return; + } + + completion_init(&ctx->rrdengine_completion); + cmd.opcode = RRDENG_QUIESCE; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + + /* wait for dbengine to quiesce */ + completion_wait_for(&ctx->rrdengine_completion); + completion_destroy(&ctx->rrdengine_completion); + + //metalog_prepare_exit(ctx->metalog_ctx); +} + +RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) { + RRDENG_SIZE_STATS stats = { 0 }; + + for(struct pg_cache_page_index *page_index = ctx->pg_cache.metrics_index.last_page_index; + page_index != NULL ;page_index = page_index->prev) { + stats.metrics++; + stats.metrics_pages += page_index->page_count; + } + + for(struct rrdengine_datafile *df = ctx->datafiles.first; df ;df = df->next) { + stats.datafiles++; + + for(struct extent_info *ei = df->extents.first; ei ; ei = ei->next) { + stats.extents++; + stats.extents_compressed_bytes += ei->size; + + for(int p = 0; p < ei->number_of_pages ;p++) { + struct rrdeng_page_descr *descr = ei->pages[p]; + + usec_t update_every_usec; + + size_t points = descr->page_length / PAGE_POINT_SIZE_BYTES(descr); + + if(likely(points > 1)) + update_every_usec = (descr->end_time_ut - descr->start_time_ut) / (points - 1); + else { + update_every_usec = default_rrd_update_every * get_tier_grouping(ctx->tier) * USEC_PER_SEC; + stats.single_point_pages++; + } + + time_t duration_secs = (time_t)((descr->end_time_ut - descr->start_time_ut + update_every_usec)/USEC_PER_SEC); + + stats.extents_pages++; + stats.pages_uncompressed_bytes += descr->page_length; + stats.pages_duration_secs += duration_secs; + stats.points += points; + + stats.page_types[descr->type].pages++; + stats.page_types[descr->type].pages_uncompressed_bytes += descr->page_length; + stats.page_types[descr->type].pages_duration_secs += duration_secs; + stats.page_types[descr->type].points += points; + + if(!stats.first_t || (descr->start_time_ut - update_every_usec) < stats.first_t) + stats.first_t = (descr->start_time_ut - update_every_usec) / USEC_PER_SEC; + + if(!stats.last_t || descr->end_time_ut > stats.last_t) + stats.last_t = descr->end_time_ut / USEC_PER_SEC; + } + } + } + + + stats.currently_collected_metrics = ctx->stats.metric_API_producers; + stats.max_concurrently_collected_metrics = ctx->metric_API_max_producers; + + internal_error(stats.metrics_pages != stats.extents_pages + stats.currently_collected_metrics, + "DBENGINE: metrics pages is %zu, but extents pages is %zu and API consumers is %zu", + stats.metrics_pages, stats.extents_pages, stats.currently_collected_metrics); + + stats.disk_space = ctx->disk_space; + stats.max_disk_space = ctx->max_disk_space; + + stats.database_retention_secs = (time_t)(stats.last_t - stats.first_t); + + if(stats.extents_pages) + stats.average_page_size_bytes = (double)stats.pages_uncompressed_bytes / (double)stats.extents_pages; + + if(stats.pages_uncompressed_bytes > 0) + stats.average_compression_savings = 100.0 - ((double)stats.extents_compressed_bytes * 100.0 / (double)stats.pages_uncompressed_bytes); + + if(stats.points) + stats.average_point_duration_secs = (double)stats.pages_duration_secs / (double)stats.points; + + if(stats.metrics) { + stats.average_metric_retention_secs = (double)stats.pages_duration_secs / (double)stats.metrics; + + if(stats.database_retention_secs) { + double metric_coverage = stats.average_metric_retention_secs / (double)stats.database_retention_secs; + double db_retention_days = (double)stats.database_retention_secs / 86400.0; + + stats.estimated_concurrently_collected_metrics = stats.metrics * metric_coverage; + + stats.ephemeral_metrics_per_day_percent = ((double)stats.metrics * 100.0 / (double)stats.estimated_concurrently_collected_metrics - 100.0) / (double)db_retention_days; + } + } + + stats.sizeof_metric = struct_natural_alignment(sizeof(struct pg_cache_page_index) + sizeof(struct pg_alignment)); + stats.sizeof_page = struct_natural_alignment(sizeof(struct rrdeng_page_descr)); + stats.sizeof_datafile = struct_natural_alignment(sizeof(struct rrdengine_datafile)) + struct_natural_alignment(sizeof(struct rrdengine_journalfile)); + stats.sizeof_page_in_cache = struct_natural_alignment(sizeof(struct page_cache_descr)); + stats.sizeof_point_data = page_type_size[ctx->page_type]; + stats.sizeof_page_data = RRDENG_BLOCK_SIZE; + stats.pages_per_extent = rrdeng_pages_per_extent; + + stats.sizeof_extent = sizeof(struct extent_info); + stats.sizeof_page_in_extent = sizeof(struct rrdeng_page_descr *); + + stats.sizeof_metric_in_index = 40; + stats.sizeof_page_in_index = 24; + + stats.default_granularity_secs = (size_t)default_rrd_update_every * get_tier_grouping(ctx->tier); + + return stats; +} diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h new file mode 100644 index 0000000..3acee4e --- /dev/null +++ b/database/engine/rrdengineapi.h @@ -0,0 +1,144 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDENGINEAPI_H +#define NETDATA_RRDENGINEAPI_H + +#include "rrdengine.h" + +#define RRDENG_MIN_PAGE_CACHE_SIZE_MB (8) +#define RRDENG_MIN_DISK_SPACE_MB (64) + +#define RRDENG_NR_STATS (37) + +#define RRDENG_FD_BUDGET_PER_INSTANCE (50) + +extern int db_engine_use_malloc; +extern int default_rrdeng_page_fetch_timeout; +extern int default_rrdeng_page_fetch_retries; +extern int default_rrdeng_page_cache_mb; +extern int default_rrdeng_disk_quota_mb; +extern int default_multidb_disk_quota_mb; +extern uint8_t rrdeng_drop_metrics_under_page_cache_pressure; +extern struct rrdengine_instance *multidb_ctx[RRD_STORAGE_TIERS]; +extern size_t page_type_size[]; + +#define PAGE_POINT_SIZE_BYTES(x) page_type_size[(x)->type] + +struct rrdeng_region_info { + time_t start_time_s; + int update_every; + unsigned points; +}; + +void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr); +void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, + Word_t page_correlation_id); +void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle); +void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time_ut, void **handle); +void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle); + +void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_t *ret_uuid); +void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid, + uuid_t *ret_uuid); + + +STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance); +STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid); +STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid); +STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id); +void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle); +STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle); + +STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg); +void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle); +void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every); +void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, NETDATA_DOUBLE n, + NETDATA_DOUBLE min_value, + NETDATA_DOUBLE max_value, + uint16_t count, + uint16_t anomaly_count, + SN_FLAGS flags); +int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle); + +void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrdimm_handle, + time_t start_time_s, time_t end_time_s); +STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle); + + +int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrdimm_handle); +void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrdimm_handle); +time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle); +time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle); + +void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array); + +/* must call once before using anything */ +int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, + unsigned disk_space_mb, size_t tier); + +int rrdeng_exit(struct rrdengine_instance *ctx); +void rrdeng_prepare_exit(struct rrdengine_instance *ctx); +int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t); + +extern STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid); +extern void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg); + +typedef struct rrdengine_size_statistics { + size_t default_granularity_secs; + + size_t sizeof_metric; + size_t sizeof_metric_in_index; + size_t sizeof_page; + size_t sizeof_page_in_index; + size_t sizeof_extent; + size_t sizeof_page_in_extent; + size_t sizeof_datafile; + size_t sizeof_page_in_cache; + size_t sizeof_point_data; + size_t sizeof_page_data; + + size_t pages_per_extent; + + size_t datafiles; + size_t extents; + size_t extents_pages; + size_t points; + size_t metrics; + size_t metrics_pages; + + size_t extents_compressed_bytes; + size_t pages_uncompressed_bytes; + time_t pages_duration_secs; + + struct { + size_t pages; + size_t pages_uncompressed_bytes; + time_t pages_duration_secs; + size_t points; + } page_types[256]; + + size_t single_point_pages; + + usec_t first_t; + usec_t last_t; + + size_t currently_collected_metrics; + size_t max_concurrently_collected_metrics; + size_t estimated_concurrently_collected_metrics; + + size_t disk_space; + size_t max_disk_space; + + time_t database_retention_secs; + double average_compression_savings; + double average_point_duration_secs; + double average_metric_retention_secs; + + double ephemeral_metrics_per_day_percent; + + double average_page_size_bytes; +} RRDENG_SIZE_STATS; + +RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx); + +#endif /* NETDATA_RRDENGINEAPI_H */ diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c new file mode 100644 index 0000000..58bd9c4 --- /dev/null +++ b/database/engine/rrdenginelib.c @@ -0,0 +1,311 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +#define BUFSIZE (512) + +/* Caller must hold descriptor lock */ +void print_page_cache_descr(struct rrdeng_page_descr *descr, const char *msg, bool log_debug) +{ + if(log_debug && !(debug_flags & D_RRDENGINE)) + return; + + BUFFER *wb = buffer_create(512); + + if(!descr) { + buffer_sprintf(wb, "DBENGINE: %s : descr is NULL", msg); + } + else { + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + char uuid_str[UUID_STR_LEN]; + + uuid_unparse_lower(*descr->id, uuid_str); + buffer_sprintf(wb, "DBENGINE: %s : page(%p) metric:%s, len:%"PRIu32", time:%"PRIu64"->%"PRIu64", update_every:%u, type:%u, xt_offset:", + msg, + pg_cache_descr->page, uuid_str, + descr->page_length, + (uint64_t)descr->start_time_ut, + (uint64_t)descr->end_time_ut, + (uint32_t)descr->update_every_s, + (uint32_t)descr->type + ); + if (!descr->extent) { + buffer_strcat(wb, "N/A"); + } else { + buffer_sprintf(wb, "%"PRIu64, descr->extent->offset); + } + + buffer_sprintf(wb, ", flags:0x%2.2lX refcnt:%u", pg_cache_descr->flags, pg_cache_descr->refcnt); + } + + if(log_debug) + debug(D_RRDENGINE, "%s", buffer_tostring(wb)); + else + internal_error(true, "%s", buffer_tostring(wb)); + + buffer_free(wb); +} + +void print_page_descr(struct rrdeng_page_descr *descr) +{ + char uuid_str[UUID_STR_LEN]; + char str[BUFSIZE + 1]; + int pos = 0; + + uuid_unparse_lower(*descr->id, uuid_str); + pos += snprintfz(str, BUFSIZE - pos, "id=%s\n" + "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:", + uuid_str, + descr->page_length, + (uint64_t)descr->start_time_ut, + (uint64_t)descr->end_time_ut); + if (!descr->extent) { + pos += snprintfz(str + pos, BUFSIZE - pos, "N/A"); + } else { + pos += snprintfz(str + pos, BUFSIZE - pos, "%"PRIu64, descr->extent->offset); + } + snprintfz(str + pos, BUFSIZE - pos, "\n\n"); + fputs(str, stderr); +} + +int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size) +{ + int ret; + uv_fs_t req; + uv_stat_t* s; + + ret = uv_fs_fstat(NULL, &req, file, NULL); + if (ret < 0) { + fatal("uv_fs_fstat: %s\n", uv_strerror(ret)); + } + fatal_assert(req.result == 0); + s = req.ptr; + if (!(s->st_mode & S_IFREG)) { + error("Not a regular file.\n"); + uv_fs_req_cleanup(&req); + return UV_EINVAL; + } + if (s->st_size < min_size) { + error("File length is too short.\n"); + uv_fs_req_cleanup(&req); + return UV_EINVAL; + } + *file_size = s->st_size; + uv_fs_req_cleanup(&req); + + return 0; +} + +/** + * Open file for I/O. + * + * @param path The full path of the file. + * @param flags Same flags as the open() system call uses. + * @param file On success sets (*file) to be the uv_file that was opened. + * @param direct Tries to open a file in direct I/O mode when direct=1, falls back to buffered mode if not possible. + * @return Returns UV error number that is < 0 on failure. 0 on success. + */ +int open_file_for_io(char *path, int flags, uv_file *file, int direct) +{ + uv_fs_t req; + int fd = -1, current_flags; + + fatal_assert(0 == direct || 1 == direct); + for ( ; direct >= 0 ; --direct) { +#ifdef __APPLE__ + /* Apple OS does not support O_DIRECT */ + direct = 0; +#endif + current_flags = flags; + if (direct) { + current_flags |= O_DIRECT; + } + fd = uv_fs_open(NULL, &req, path, current_flags, S_IRUSR | S_IWUSR, NULL); + if (fd < 0) { + if ((direct) && (UV_EINVAL == fd)) { + error("File \"%s\" does not support direct I/O, falling back to buffered I/O.", path); + } else { + error("Failed to open file \"%s\".", path); + --direct; /* break the loop */ + } + } else { + fatal_assert(req.result >= 0); + *file = req.result; +#ifdef __APPLE__ + info("Disabling OS X caching for file \"%s\".", path); + fcntl(fd, F_NOCACHE, 1); +#endif + --direct; /* break the loop */ + } + uv_fs_req_cleanup(&req); + } + + return fd; +} + +char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size) +{ + struct page_cache *pg_cache; + + pg_cache = &ctx->pg_cache; + snprintfz(str, size, + "metric_API_producers: %ld\n" + "metric_API_consumers: %ld\n" + "page_cache_total_pages: %ld\n" + "page_cache_descriptors: %ld\n" + "page_cache_populated_pages: %ld\n" + "page_cache_committed_pages: %ld\n" + "page_cache_insertions: %ld\n" + "page_cache_deletions: %ld\n" + "page_cache_hits: %ld\n" + "page_cache_misses: %ld\n" + "page_cache_backfills: %ld\n" + "page_cache_evictions: %ld\n" + "compress_before_bytes: %ld\n" + "compress_after_bytes: %ld\n" + "decompress_before_bytes: %ld\n" + "decompress_after_bytes: %ld\n" + "io_write_bytes: %ld\n" + "io_write_requests: %ld\n" + "io_read_bytes: %ld\n" + "io_read_requests: %ld\n" + "io_write_extent_bytes: %ld\n" + "io_write_extents: %ld\n" + "io_read_extent_bytes: %ld\n" + "io_read_extents: %ld\n" + "datafile_creations: %ld\n" + "datafile_deletions: %ld\n" + "journalfile_creations: %ld\n" + "journalfile_deletions: %ld\n" + "io_errors: %ld\n" + "fs_errors: %ld\n" + "global_io_errors: %ld\n" + "global_fs_errors: %ld\n" + "rrdeng_reserved_file_descriptors: %ld\n" + "pg_cache_over_half_dirty_events: %ld\n" + "global_pg_cache_over_half_dirty_events: %ld\n" + "flushing_pressure_page_deletions: %ld\n" + "global_flushing_pressure_page_deletions: %ld\n", + (long)ctx->stats.metric_API_producers, + (long)ctx->stats.metric_API_consumers, + (long)pg_cache->page_descriptors, + (long)ctx->stats.page_cache_descriptors, + (long)pg_cache->populated_pages, + (long)pg_cache->committed_page_index.nr_committed_pages, + (long)ctx->stats.pg_cache_insertions, + (long)ctx->stats.pg_cache_deletions, + (long)ctx->stats.pg_cache_hits, + (long)ctx->stats.pg_cache_misses, + (long)ctx->stats.pg_cache_backfills, + (long)ctx->stats.pg_cache_evictions, + (long)ctx->stats.before_compress_bytes, + (long)ctx->stats.after_compress_bytes, + (long)ctx->stats.before_decompress_bytes, + (long)ctx->stats.after_decompress_bytes, + (long)ctx->stats.io_write_bytes, + (long)ctx->stats.io_write_requests, + (long)ctx->stats.io_read_bytes, + (long)ctx->stats.io_read_requests, + (long)ctx->stats.io_write_extent_bytes, + (long)ctx->stats.io_write_extents, + (long)ctx->stats.io_read_extent_bytes, + (long)ctx->stats.io_read_extents, + (long)ctx->stats.datafile_creations, + (long)ctx->stats.datafile_deletions, + (long)ctx->stats.journalfile_creations, + (long)ctx->stats.journalfile_deletions, + (long)ctx->stats.io_errors, + (long)ctx->stats.fs_errors, + (long)global_io_errors, + (long)global_fs_errors, + (long)rrdeng_reserved_file_descriptors, + (long)ctx->stats.pg_cache_over_half_dirty_events, + (long)global_pg_cache_over_half_dirty_events, + (long)ctx->stats.flushing_pressure_page_deletions, + (long)global_flushing_pressure_page_deletions + ); + return str; +} + +int is_legacy_child(const char *machine_guid) +{ + uuid_t uuid; + char dbengine_file[FILENAME_MAX+1]; + + if (unlikely(!strcmp(machine_guid, "unittest-dbengine") || !strcmp(machine_guid, "dbengine-dataset") || + !strcmp(machine_guid, "dbengine-stress-test"))) { + return 1; + } + if (!uuid_parse(machine_guid, uuid)) { + uv_fs_t stat_req; + snprintfz(dbengine_file, FILENAME_MAX, "%s/%s/dbengine", netdata_configured_cache_dir, machine_guid); + int rc = uv_fs_stat(NULL, &stat_req, dbengine_file, NULL); + if (likely(rc == 0 && ((stat_req.statbuf.st_mode & S_IFMT) == S_IFDIR))) { + //info("Found legacy engine folder \"%s\"", dbengine_file); + return 1; + } + } + return 0; +} + +int count_legacy_children(char *dbfiles_path) +{ + int ret; + uv_fs_t req; + uv_dirent_t dent; + int legacy_engines = 0; + + ret = uv_fs_scandir(NULL, &req, dbfiles_path, 0, NULL); + if (ret < 0) { + uv_fs_req_cleanup(&req); + error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret)); + return ret; + } + + while(UV_EOF != uv_fs_scandir_next(&req, &dent)) { + if (dent.type == UV_DIRENT_DIR) { + if (is_legacy_child(dent.name)) + legacy_engines++; + } + } + uv_fs_req_cleanup(&req); + return legacy_engines; +} + +int compute_multidb_diskspace() +{ + char multidb_disk_space_file[FILENAME_MAX + 1]; + FILE *fp; + int computed_multidb_disk_quota_mb = -1; + + snprintfz(multidb_disk_space_file, FILENAME_MAX, "%s/dbengine_multihost_size", netdata_configured_varlib_dir); + fp = fopen(multidb_disk_space_file, "r"); + if (likely(fp)) { + int rc = fscanf(fp, "%d", &computed_multidb_disk_quota_mb); + fclose(fp); + if (unlikely(rc != 1 || computed_multidb_disk_quota_mb < RRDENG_MIN_DISK_SPACE_MB)) { + errno = 0; + error("File '%s' contains invalid input, it will be rebuild", multidb_disk_space_file); + computed_multidb_disk_quota_mb = -1; + } + } + + if (computed_multidb_disk_quota_mb == -1) { + int rc = count_legacy_children(netdata_configured_cache_dir); + if (likely(rc >= 0)) { + computed_multidb_disk_quota_mb = (rc + 1) * default_rrdeng_disk_quota_mb; + info("Found %d legacy dbengines, setting multidb diskspace to %dMB", rc, computed_multidb_disk_quota_mb); + + fp = fopen(multidb_disk_space_file, "w"); + if (likely(fp)) { + fprintf(fp, "%d", computed_multidb_disk_quota_mb); + info("Created file '%s' to store the computed value", multidb_disk_space_file); + fclose(fp); + } else + error("Failed to store the default multidb disk quota size on '%s'", multidb_disk_space_file); + } + else + computed_multidb_disk_quota_mb = default_rrdeng_disk_quota_mb; + } + + return computed_multidb_disk_quota_mb; +} diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h new file mode 100644 index 0000000..6b1a15f --- /dev/null +++ b/database/engine/rrdenginelib.h @@ -0,0 +1,102 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDENGINELIB_H +#define NETDATA_RRDENGINELIB_H + +#include "libnetdata/libnetdata.h" + +/* Forward declarations */ +struct rrdeng_page_descr; +struct rrdengine_instance; + +#define STR_HELPER(x) #x +#define STR(x) STR_HELPER(x) + +#define BITS_PER_ULONG (sizeof(unsigned long) * 8) + +#define ALIGN_BYTES_FLOOR(x) (((x) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE) +#define ALIGN_BYTES_CEILING(x) ((((x) + RRDENG_BLOCK_SIZE - 1) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE) + +#define ROUND_USEC_TO_SEC(x) (((x) + USEC_PER_SEC / 2 - 1) / USEC_PER_SEC) + +typedef uintptr_t rrdeng_stats_t; + +#ifdef __ATOMIC_RELAXED +#define rrd_atomic_fetch_add(p, n) __atomic_fetch_add(p, n, __ATOMIC_RELAXED) +#define rrd_atomic_add_fetch(p, n) __atomic_add_fetch(p, n, __ATOMIC_RELAXED) +#else +#define rrd_atomic_fetch_add(p, n) __sync_fetch_and_add(p, n) +#define rrd_atomic_add_fetch(p, n) __sync_add_and_fetch(p, n) +#endif + +#define rrd_stat_atomic_add(p, n) rrd_atomic_fetch_add(p, n) + +/* returns -1 if it didn't find the first cleared bit, the position otherwise. Starts from LSB. */ +static inline int find_first_zero(unsigned x) +{ + return ffs((int)(~x)) - 1; +} + +/* Starts from LSB. */ +static inline uint8_t check_bit(unsigned x, size_t pos) +{ + return !!(x & (1 << pos)); +} + +/* Starts from LSB. val is 0 or 1 */ +static inline void modify_bit(unsigned *x, unsigned pos, uint8_t val) +{ + switch(val) { + case 0: + *x &= ~(1U << pos); + break; + case 1: + *x |= 1U << pos; + break; + default: + error("modify_bit() called with invalid argument."); + break; + } +} + +#define RRDENG_PATH_MAX (4096) + +/* returns old *ptr value */ +static inline unsigned long ulong_compare_and_swap(volatile unsigned long *ptr, + unsigned long oldval, unsigned long newval) +{ + return __sync_val_compare_and_swap(ptr, oldval, newval); +} + +#ifndef O_DIRECT +/* Workaround for OS X */ +#define O_DIRECT (0) +#endif + +static inline int crc32cmp(void *crcp, uLong crc) +{ + return (*(uint32_t *)crcp != crc); +} + +static inline void crc32set(void *crcp, uLong crc) +{ + *(uint32_t *)crcp = crc; +} + +void print_page_cache_descr(struct rrdeng_page_descr *descr, const char *msg, bool log_debug); +void print_page_descr(struct rrdeng_page_descr *descr); +int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size); +int open_file_for_io(char *path, int flags, uv_file *file, int direct); +static inline int open_file_direct_io(char *path, int flags, uv_file *file) +{ + return open_file_for_io(path, flags, file, 1); +} +static inline int open_file_buffered_io(char *path, int flags, uv_file *file) +{ + return open_file_for_io(path, flags, file, 0); +} +char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size); +int compute_multidb_diskspace(); +int is_legacy_child(const char *machine_guid); + +#endif /* NETDATA_RRDENGINELIB_H */ diff --git a/database/engine/rrdenglocking.c b/database/engine/rrdenglocking.c new file mode 100644 index 0000000..a23abf3 --- /dev/null +++ b/database/engine/rrdenglocking.c @@ -0,0 +1,241 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx) +{ + struct page_cache_descr *pg_cache_descr; + + pg_cache_descr = mallocz(sizeof(*pg_cache_descr)); + rrd_stat_atomic_add(&ctx->stats.page_cache_descriptors, 1); + pg_cache_descr->page = NULL; + pg_cache_descr->flags = 0; + pg_cache_descr->prev = pg_cache_descr->next = NULL; + pg_cache_descr->refcnt = 0; + pg_cache_descr->waiters = 0; + fatal_assert(0 == uv_cond_init(&pg_cache_descr->cond)); + fatal_assert(0 == uv_mutex_init(&pg_cache_descr->mutex)); + + return pg_cache_descr; +} + +void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr) +{ + uv_cond_destroy(&pg_cache_descr->cond); + uv_mutex_destroy(&pg_cache_descr->mutex); + freez(pg_cache_descr); + rrd_stat_atomic_add(&ctx->stats.page_cache_descriptors, -1); +} + +/* also allocates page cache descriptor if missing */ +void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + unsigned long old_state, old_users, new_state, ret_state; + struct page_cache_descr *pg_cache_descr = NULL; + uint8_t we_locked; + + we_locked = 0; + while (1) { /* spin */ + old_state = descr->pg_cache_descr_state; + old_users = old_state >> PG_CACHE_DESCR_SHIFT; + + if (unlikely(we_locked)) { + fatal_assert(old_state & PG_CACHE_DESCR_LOCKED); + new_state = (1 << PG_CACHE_DESCR_SHIFT) | PG_CACHE_DESCR_ALLOCATED; + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + /* success */ + break; + } + continue; /* spin */ + } + if (old_state & PG_CACHE_DESCR_LOCKED) { + fatal_assert(0 == old_users); + continue; /* spin */ + } + if (0 == old_state) { + /* no page cache descriptor has been allocated */ + + if (NULL == pg_cache_descr) { + pg_cache_descr = rrdeng_create_pg_cache_descr(ctx); + } + new_state = PG_CACHE_DESCR_LOCKED; + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, 0, new_state); + if (0 == ret_state) { + we_locked = 1; + descr->pg_cache_descr = pg_cache_descr; + pg_cache_descr->descr = descr; + pg_cache_descr = NULL; /* make sure we don't free pg_cache_descr */ + /* retry */ + continue; + } + continue; /* spin */ + } + /* page cache descriptor is already allocated */ + if (unlikely(!(old_state & PG_CACHE_DESCR_ALLOCATED))) { + fatal("Invalid page cache descriptor locking state:%#lX", old_state); + } + new_state = (old_users + 1) << PG_CACHE_DESCR_SHIFT; + new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK; + + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + /* success */ + break; + } + /* spin */ + } + + if (pg_cache_descr) { + rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); + } + pg_cache_descr = descr->pg_cache_descr; + uv_mutex_lock(&pg_cache_descr->mutex); +} + +void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + unsigned long old_state, new_state, ret_state, old_users; + struct page_cache_descr *pg_cache_descr, *delete_pg_cache_descr = NULL; + uint8_t we_locked; + + uv_mutex_unlock(&descr->pg_cache_descr->mutex); + + we_locked = 0; + while (1) { /* spin */ + old_state = descr->pg_cache_descr_state; + old_users = old_state >> PG_CACHE_DESCR_SHIFT; + + if (unlikely(we_locked)) { + fatal_assert(0 == old_users); + + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, 0); + if (old_state == ret_state) { + /* success */ + rrdeng_destroy_pg_cache_descr(ctx, delete_pg_cache_descr); + return; + } + continue; /* spin */ + } + if (old_state & PG_CACHE_DESCR_LOCKED) { + fatal_assert(0 == old_users); + continue; /* spin */ + } + fatal_assert(old_state & PG_CACHE_DESCR_ALLOCATED); + pg_cache_descr = descr->pg_cache_descr; + /* caller is the only page cache descriptor user and there are no pending references on the page */ + if ((old_state & PG_CACHE_DESCR_DESTROY) && (1 == old_users) && + !pg_cache_descr->flags && !pg_cache_descr->refcnt) { + fatal_assert(!pg_cache_descr->waiters); + + new_state = PG_CACHE_DESCR_LOCKED; + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + we_locked = 1; + delete_pg_cache_descr = pg_cache_descr; + descr->pg_cache_descr = NULL; + /* retry */ + continue; + } + continue; /* spin */ + } + fatal_assert(old_users > 0); + new_state = (old_users - 1) << PG_CACHE_DESCR_SHIFT; + new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK; + + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + /* success */ + break; + } + /* spin */ + } +} + +/* + * Tries to deallocate page cache descriptor. If it fails, it postpones deallocation by setting the + * PG_CACHE_DESCR_DESTROY flag which will be eventually cleared by a different context after doing + * the deallocation. + */ +void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + unsigned long old_state, new_state, ret_state, old_users; + struct page_cache_descr *pg_cache_descr = NULL; + uint8_t just_locked, can_free, must_unlock; + + just_locked = 0; + can_free = 0; + must_unlock = 0; + while (1) { /* spin */ + old_state = descr->pg_cache_descr_state; + old_users = old_state >> PG_CACHE_DESCR_SHIFT; + + if (unlikely(just_locked)) { + fatal_assert(0 == old_users); + + must_unlock = 1; + just_locked = 0; + /* Try deallocate if there are no pending references on the page */ + if (!pg_cache_descr->flags && !pg_cache_descr->refcnt) { + fatal_assert(!pg_cache_descr->waiters); + + descr->pg_cache_descr = NULL; + can_free = 1; + /* success */ + continue; + } + continue; /* spin */ + } + if (unlikely(must_unlock)) { + fatal_assert(0 == old_users); + + if (can_free) { + /* success */ + new_state = 0; + } else { + new_state = old_state | PG_CACHE_DESCR_DESTROY; + new_state &= ~PG_CACHE_DESCR_LOCKED; + } + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + /* unlocked */ + if (can_free) + rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); + return; + } + continue; /* spin */ + } + if (!(old_state & PG_CACHE_DESCR_ALLOCATED)) { + /* don't do anything */ + return; + } + if (old_state & PG_CACHE_DESCR_LOCKED) { + fatal_assert(0 == old_users); + continue; /* spin */ + } + /* caller is the only page cache descriptor user */ + if (0 == old_users) { + new_state = old_state | PG_CACHE_DESCR_LOCKED; + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + just_locked = 1; + pg_cache_descr = descr->pg_cache_descr; + /* retry */ + continue; + } + continue; /* spin */ + } + if (old_state & PG_CACHE_DESCR_DESTROY) { + /* don't do anything */ + return; + } + /* plant PG_CACHE_DESCR_DESTROY so that other contexts eventually free the page cache descriptor */ + new_state = old_state | PG_CACHE_DESCR_DESTROY; + + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + /* success */ + return; + } + /* spin */ + } +} \ No newline at end of file diff --git a/database/engine/rrdenglocking.h b/database/engine/rrdenglocking.h new file mode 100644 index 0000000..078eab3 --- /dev/null +++ b/database/engine/rrdenglocking.h @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDENGLOCKING_H +#define NETDATA_RRDENGLOCKING_H + +#include "rrdengine.h" + +/* Forward declarations */ +struct page_cache_descr; + +struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx); +void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr); +void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); + +#endif /* NETDATA_RRDENGLOCKING_H */ \ No newline at end of file -- cgit v1.2.3