diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 11:08:07 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 11:08:07 +0000 |
commit | c69cb8cc094cc916adbc516b09e944cd3d137c01 (patch) | |
tree | f2878ec41fb6d0e3613906c6722fc02b934eeb80 /database/engine | |
parent | Initial commit. (diff) | |
download | netdata-c69cb8cc094cc916adbc516b09e944cd3d137c01.tar.xz netdata-c69cb8cc094cc916adbc516b09e944cd3d137c01.zip |
Adding upstream version 1.29.3.upstream/1.29.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/engine')
29 files changed, 7052 insertions, 0 deletions
diff --git a/database/engine/Makefile.am b/database/engine/Makefile.am new file mode 100644 index 0000000..4340500 --- /dev/null +++ b/database/engine/Makefile.am @@ -0,0 +1,12 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +SUBDIRS = \ + metadata_log \ + $(NULL) + +dist_noinst_DATA = \ + README.md \ + $(NULL) diff --git a/database/engine/README.md b/database/engine/README.md new file mode 100644 index 0000000..191366a --- /dev/null +++ b/database/engine/README.md @@ -0,0 +1,259 @@ +<!-- +title: "Database engine" +description: "Netdata's highly-efficient database engine use both RAM and disk for distributed, long-term storage of per-second metrics." +custom_edit_url: https://github.com/netdata/netdata/edit/master/database/engine/README.md +--> + +# Database engine + +The Database Engine works like a traditional database. It dedicates a certain amount of RAM to data caching and +indexing, while the rest of the data resides compressed on disk. Unlike other [memory modes](/database/README.md), the +amount of historical metrics stored is based on the amount of disk space you allocate and the effective compression +ratio, not a fixed number of metrics collected. + +By using both RAM and disk space, the database engine allows for long-term storage of per-second metrics inside of the +Agent itself. + +In addition, the database engine is the only memory mode that supports changing the data collection update frequency +(`update_every`) without losing the metrics your Agent already gathered and stored. + +## Configuration + +To use the database engine, open `netdata.conf` and set `memory mode` to `dbengine`. + +```conf +[global] + memory mode = dbengine +``` + +To configure the database engine, look for the `page cache size` and `dbengine multihost disk space` settings in the +`[global]` section of your `netdata.conf`. The Agent ignores the `history` setting when using the database engine. + +```conf +[global] + page cache size = 32 + dbengine multihost disk space = 256 +``` + +The above values are the default values for Page Cache size and DB engine disk space quota. Both numbers are +in **MiB**. + +The `page cache size` option determines the amount of RAM in **MiB** dedicated to caching Netdata metric values. The +actual page cache size will be slightly larger than this figure—see the [memory requirements](#memory-requirements) +section for details. + +The `dbengine multihost disk space` option determines the amount of disk space in **MiB** that is dedicated to storing +Netdata metric values and all related metadata describing them. You can use the [**database engine +calculator**](/docs/store/change-metrics-storage.md#calculate-the-system-resources-RAM-disk-space-needed-to-store-metrics) +to correctly set `dbengine multihost disk space` based on your metrics retention policy. The calculator gives an +accurate estimate based on how many child nodes you have, how many metrics your Agent collects, and more. + +### Legacy configuration + +The deprecated `dbengine disk space` option determines the amount of disk space in **MiB** that is dedicated to storing +Netdata metric values per legacy database engine instance (see [details on the legacy mode](#legacy-mode) below). + +```conf +[global] + dbengine disk space = 256 +``` + +### Streaming metrics to the database engine + +When using the multihost database engine, all parent and child nodes share the same `page cache size` and `dbengine +multihost disk space` in a single dbengine instance. The [**database engine +calculator**](/docs/store/change-metrics-storage.md#calculate-the-system-resources-RAM-disk-space-needed-to-store-metrics) +helps you properly set `page cache size` and `dbengine multihost disk space` on your parent node to allocate enough +resources based on your metrics retention policy and how many child nodes you have. + +#### Legacy mode + +_For Netdata Agents earlier than v1.23.2_, the Agent on the parent node uses one dbengine instance for itself, and +another instance for every child node it receives metrics from. If you had four streaming nodes, you would have five +instances in total (`1 parent + 4 child nodes = 5 instances`). + +The Agent allocates resources for each instance separately using the `dbengine disk space` (**deprecated**) setting. If +`dbengine disk space`(**deprecated**) is set to the default `256`, each instance is given 256 MiB in disk space, which +means the total disk space required to store all instances is, roughly, `256 MiB * 1 parent * 4 child nodes = 1280 MiB`. + +#### Backward compatibility + +All existing metrics belonging to child nodes are automatically converted to legacy dbengine instances and the localhost +metrics are transferred to the multihost dbengine instance. + +All new child nodes are automatically transferred to the multihost dbengine instance and share its page cache and disk +space. If you want to migrate a child node from its legacy dbengine instance to the multihost dbengine instance, you +must delete the instance's directory, which is located in `/var/cache/netdata/MACHINE_GUID/dbengine`, after stopping the +Agent. + +##### Information + +For more information about setting `memory mode` on your nodes, in addition to other streaming configurations, see +[streaming](/streaming/README.md). + +### Memory requirements + +Using memory mode `dbengine` we can overcome most memory restrictions and store a dataset that is much larger than the +available memory. + +There are explicit memory requirements **per** DB engine **instance**: + +- The total page cache memory footprint will be an additional `#dimensions-being-collected x 4096 x 2` bytes over what + the user configured with `page cache size`. + +- an additional `#pages-on-disk x 4096 x 0.03` bytes of RAM are allocated for metadata. + + - roughly speaking this is 3% of the uncompressed disk space taken by the DB files. + + - for very highly compressible data (compression ratio > 90%) this RAM overhead is comparable to the disk space + footprint. + +An important observation is that RAM usage depends on both the `page cache size` and the `dbengine multihost disk space` +options. + +You can use our [database engine +calculator](/docs/store/change-metrics-storage.md#calculate-the-system-resources-RAM-disk-space-needed-to-store-metrics) +to validate the memory requirements for your particular system(s) and configuration (**out-of-date**). + +### Disk space requirements + +There are explicit disk space requirements **per** DB engine **instance**: + +- The total disk space footprint will be the maximum between `#dimensions-being-collected x 4096 x 2` bytes or what + the user configured with `dbengine multihost disk space` or `dbengine disk space`. + +### File descriptor requirements + +The Database Engine may keep a **significant** amount of files open per instance (e.g. per streaming child or +parent server). When configuring your system you should make sure there are at least 50 file descriptors available per +`dbengine` instance. + +Netdata allocates 25% of the available file descriptors to its Database Engine instances. This means that only 25% of +the file descriptors that are available to the Netdata service are accessible by dbengine instances. You should take +that into account when configuring your service or system-wide file descriptor limits. You can roughly estimate that the +Netdata service needs 2048 file descriptors for every 10 streaming child hosts when streaming is configured to use +`memory mode = dbengine`. + +If for example one wants to allocate 65536 file descriptors to the Netdata service on a systemd system one needs to +override the Netdata service by running `sudo systemctl edit netdata` and creating a file with contents: + +```sh +[Service] +LimitNOFILE=65536 +``` + +For other types of services one can add the line: + +```sh +ulimit -n 65536 +``` + +at the beginning of the service file. Alternatively you can change the system-wide limits of the kernel by changing + `/etc/sysctl.conf`. For linux that would be: + +```conf +fs.file-max = 65536 +``` + +In FreeBSD and OS X you change the lines like this: + +```conf +kern.maxfilesperproc=65536 +kern.maxfiles=65536 +``` + +You can apply the settings by running `sysctl -p` or by rebooting. + +## Files + +With the DB engine memory mode the metric data are stored in database files. These files are organized in pairs, the +datafiles and their corresponding journalfiles, e.g.: + +```sh +datafile-1-0000000001.ndf +journalfile-1-0000000001.njf +datafile-1-0000000002.ndf +journalfile-1-0000000002.njf +datafile-1-0000000003.ndf +journalfile-1-0000000003.njf +... +``` + +They are located under their host's cache directory in the directory `./dbengine` (e.g. for localhost the default +location is `/var/cache/netdata/dbengine/*`). The higher numbered filenames contain more recent metric data. The user +can safely delete some pairs of files when Netdata is stopped to manually free up some space. + +_Users should_ **back up** _their `./dbengine` folders if they consider this data to be important._ You can also set up +one or more [exporting connectors](/exporting/README.md) to send your Netdata metrics to other databases for long-term +storage at lower granularity. + +## Operation + +The DB engine stores chart metric values in 4096-byte pages in memory. Each chart dimension gets its own page to store +consecutive values generated from the data collectors. Those pages comprise the **Page Cache**. + +When those pages fill up they are slowly compressed and flushed to disk. It can take `4096 / 4 = 1024 seconds = 17 +minutes`, for a chart dimension that is being collected every 1 second, to fill a page. Pages can be cut short when we +stop Netdata or the DB engine instance so as to not lose the data. When we query the DB engine for data we trigger disk +read I/O requests that fill the Page Cache with the requested pages and potentially evict cold (not recently used) +pages. + +When the disk quota is exceeded the oldest values are removed from the DB engine at real time, by automatically deleting +the oldest datafile and journalfile pair. Any corresponding pages residing in the Page Cache will also be invalidated +and removed. The DB engine logic will try to maintain between 10 and 20 file pairs at any point in time. + +The Database Engine uses direct I/O to avoid polluting the OS filesystem caches and does not generate excessive I/O +traffic so as to create the minimum possible interference with other applications. + +## Evaluation + +We have evaluated the performance of the `dbengine` API that the netdata daemon uses internally. This is **not** the web +API of netdata. Our benchmarks ran on a **single** `dbengine` instance, multiple of which can be running in a Netdata +parent node. We used a server with an AMD Ryzen Threadripper 2950X 16-Core Processor and 2 disk drives, a Seagate +Constellation ES.3 2TB magnetic HDD and a SAMSUNG MZQLB960HAJR-00007 960GB NAND Flash SSD. + +For our workload, we defined 32 charts with 128 metrics each, giving us a total of 4096 metrics. We defined 1 worker +thread per chart (32 threads) that generates new data points with a data generation interval of 1 second. The time axis +of the time-series is emulated and accelerated so that the worker threads can generate as many data points as possible +without delays. + +We also defined 32 worker threads that perform queries on random metrics with semi-random time ranges. The +starting time of the query is randomly selected between the beginning of the time-series and the time of the latest data +point. The ending time is randomly selected between 1 second and 1 hour after the starting time. The pseudo-random +numbers are generated with a uniform distribution. + +The data are written to the database at the same time as they are read from it. This is a concurrent read/write mixed +workload with a duration of 60 seconds. The faster `dbengine` runs, the bigger the dataset size becomes since more +data points will be generated. We set a page cache size of 64MiB for the two disk-bound scenarios. This way, the dataset +size of the metric data is much bigger than the RAM that is being used for caching so as to trigger I/O requests most +of the time. In our final scenario, we set the page cache size to 16 GiB. That way, the dataset fits in the page cache +so as to avoid all disk bottlenecks. + +The reported numbers are the following: + +| device | page cache | dataset | reads/sec | writes/sec | +| :----: | :--------: | ------: | --------: | ---------: | +| HDD | 64 MiB | 4.1 GiB | 813K | 18.0M | +| SSD | 64 MiB | 9.8 GiB | 1.7M | 43.0M | +| N/A | 16 GiB | 6.8 GiB | 118.2M | 30.2M | + +where "reads/sec" is the number of metric data points being read from the database via its API per second and +"writes/sec" is the number of metric data points being written to the database per second. + +Notice that the HDD numbers are pretty high and not much slower than the SSD numbers. This is thanks to the database +engine design being optimized for rotating media. In the database engine disk I/O requests are: + +- asynchronous to mask the high I/O latency of HDDs. +- mostly large to reduce the amount of HDD seeking time. +- mostly sequential to reduce the amount of HDD seeking time. +- compressed to reduce the amount of required throughput. + +As a result, the HDD is not thousands of times slower than the SSD, which is typical for other workloads. + +An interesting observation to make is that the CPU-bound run (16 GiB page cache) generates fewer data than the SSD run +(6.8 GiB vs 9.8 GiB). The reason is that the 32 reader threads in the SSD scenario are more frequently blocked by I/O, +and generate a read load of 1.7M/sec, whereas in the CPU-bound scenario the read load is 70 times higher at 118M/sec. +Consequently, there is a significant degree of interference by the reader threads, that slow down the writer threads. +This is also possible because the interference effects are greater than the SSD impact on data generation throughput. + +[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fdatabase%2Fengine%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>) diff --git a/database/engine/datafile.c b/database/engine/datafile.c new file mode 100644 index 0000000..7a052f9 --- /dev/null +++ b/database/engine/datafile.c @@ -0,0 +1,460 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +void df_extent_insert(struct extent_info *extent) +{ + struct rrdengine_datafile *datafile = extent->datafile; + + if (likely(NULL != datafile->extents.last)) { + datafile->extents.last->next = extent; + } + if (unlikely(NULL == datafile->extents.first)) { + datafile->extents.first = extent; + } + datafile->extents.last = extent; +} + +void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) +{ + if (likely(NULL != ctx->datafiles.last)) { + ctx->datafiles.last->next = datafile; + } + if (unlikely(NULL == ctx->datafiles.first)) { + ctx->datafiles.first = datafile; + } + ctx->datafiles.last = datafile; +} + +void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) +{ + struct rrdengine_datafile *next; + + next = datafile->next; + fatal_assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile)); + ctx->datafiles.first = next; +} + + +static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_instance *ctx, + unsigned tier, unsigned fileno) +{ + fatal_assert(tier == 1); + datafile->tier = tier; + datafile->fileno = fileno; + datafile->file = (uv_file)0; + datafile->pos = 0; + datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */ + datafile->journalfile = NULL; + datafile->next = NULL; + datafile->ctx = ctx; +} + +void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +{ + (void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION, + datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); +} + +int close_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + + ret = uv_fs_close(NULL, &req, datafile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + return ret; +} + +int unlink_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ++ctx->stats.datafile_deletions; + + return ret; +} + +int destroy_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + + ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL); + if (ret < 0) { + error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ret = uv_fs_close(NULL, &req, datafile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ++ctx->stats.datafile_deletions; + + return ret; +} + +int create_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + uv_file file; + int ret, fd; + struct rrdeng_df_sb *superblock; + uv_buf_t iov; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file); + if (fd < 0) { + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; + } + datafile->file = file; + ++ctx->stats.datafile_creations; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + (void) strncpy(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ); + (void) strncpy(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ); + superblock->tier = 1; + + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + fatal_assert(req.result < 0); + error("uv_fs_write: %s", uv_strerror(ret)); + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + } + uv_fs_req_cleanup(&req); + free(superblock); + if (ret < 0) { + destroy_data_file(datafile); + return ret; + } + + datafile->pos = sizeof(*superblock); + ctx->stats.io_write_bytes += sizeof(*superblock); + ++ctx->stats.io_write_requests; + + return 0; +} + +static int check_data_file_superblock(uv_file file) +{ + int ret; + struct rrdeng_df_sb *superblock; + uv_buf_t iov; + uv_fs_t req; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + error("uv_fs_read: %s", uv_strerror(ret)); + uv_fs_req_cleanup(&req); + goto error; + } + fatal_assert(req.result >= 0); + uv_fs_req_cleanup(&req); + + if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) || + strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) || + superblock->tier != 1) { + error("File has invalid superblock."); + ret = UV_EINVAL; + } else { + ret = 0; + } + error: + free(superblock); + return ret; +} + +static int load_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + uv_file file; + int ret, fd, error; + uint64_t file_size; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + fd = open_file_direct_io(path, O_RDWR, &file); + if (fd < 0) { + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; + } + info("Initializing data file \"%s\".", path); + + ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); + if (ret) + goto error; + file_size = ALIGN_BYTES_CEILING(file_size); + + ret = check_data_file_superblock(file); + if (ret) + goto error; + ctx->stats.io_read_bytes += sizeof(struct rrdeng_df_sb); + ++ctx->stats.io_read_requests; + + datafile->file = file; + datafile->pos = file_size; + + info("Data file \"%s\" initialized (size:%"PRIu64").", path, file_size); + return 0; + + error: + error = ret; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + return error; +} + +static int scan_data_files_cmp(const void *a, const void *b) +{ + struct rrdengine_datafile *file1, *file2; + char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX]; + + file1 = *(struct rrdengine_datafile **)a; + file2 = *(struct rrdengine_datafile **)b; + generate_datafilepath(file1, path1, sizeof(path1)); + generate_datafilepath(file2, path2, sizeof(path2)); + return strcmp(path1, path2); +} + +/* Returns number of datafiles that were loaded or < 0 on error */ +static int scan_data_files(struct rrdengine_instance *ctx) +{ + int ret; + unsigned tier, no, matched_files, i,failed_to_load; + static uv_fs_t req; + uv_dirent_t dent; + struct rrdengine_datafile **datafiles, *datafile; + struct rrdengine_journalfile *journalfile; + + ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL); + if (ret < 0) { + fatal_assert(req.result < 0); + uv_fs_req_cleanup(&req); + error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return ret; + } + info("Found %d files in path %s", ret, ctx->dbfiles_path); + + datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles)); + for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { + info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name); + ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no); + if (2 == ret) { + info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name); + datafile = mallocz(sizeof(*datafile)); + datafile_init(datafile, ctx, tier, no); + datafiles[matched_files++] = datafile; + } + } + uv_fs_req_cleanup(&req); + + if (0 == matched_files) { + freez(datafiles); + return 0; + } + if (matched_files == MAX_DATAFILES) { + error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); + } + qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp); + /* TODO: change this when tiering is implemented */ + ctx->last_fileno = datafiles[matched_files - 1]->fileno; + + for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { + uint8_t must_delete_pair = 0; + + datafile = datafiles[i]; + ret = load_data_file(datafile); + if (0 != ret) { + must_delete_pair = 1; + } + journalfile = mallocz(sizeof(*journalfile)); + datafile->journalfile = journalfile; + journalfile_init(journalfile, datafile); + ret = load_journal_file(ctx, journalfile, datafile); + if (0 != ret) { + if (!must_delete_pair) /* If datafile is still open close it */ + close_data_file(datafile); + must_delete_pair = 1; + } + if (must_delete_pair) { + char path[RRDENG_PATH_MAX]; + + error("Deleting invalid data and journal file pair."); + ret = unlink_journal_file(journalfile); + if (!ret) { + generate_journalfilepath(datafile, path, sizeof(path)); + info("Deleted journal file \"%s\".", path); + } + ret = unlink_data_file(datafile); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + info("Deleted data file \"%s\".", path); + } + freez(journalfile); + freez(datafile); + ++failed_to_load; + continue; + } + + datafile_list_insert(ctx, datafile); + ctx->disk_space += datafile->pos + journalfile->pos; + } + matched_files -= failed_to_load; + freez(datafiles); + + return matched_files; +} + +/* Creates a datafile and a journalfile pair */ +int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) +{ + struct rrdengine_datafile *datafile; + struct rrdengine_journalfile *journalfile; + int ret; + char path[RRDENG_PATH_MAX]; + + info("Creating new data and journal files in path %s", ctx->dbfiles_path); + datafile = mallocz(sizeof(*datafile)); + datafile_init(datafile, ctx, tier, fileno); + ret = create_data_file(datafile); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + info("Created data file \"%s\".", path); + } else { + goto error_after_datafile; + } + + journalfile = mallocz(sizeof(*journalfile)); + datafile->journalfile = journalfile; + journalfile_init(journalfile, datafile); + ret = create_journal_file(journalfile, datafile); + if (!ret) { + generate_journalfilepath(datafile, path, sizeof(path)); + info("Created journal file \"%s\".", path); + } else { + goto error_after_journalfile; + } + datafile_list_insert(ctx, datafile); + ctx->disk_space += datafile->pos + journalfile->pos; + + return 0; + +error_after_journalfile: + destroy_data_file(datafile); + freez(journalfile); +error_after_datafile: + freez(datafile); + return ret; +} + +/* Page cache must already be initialized. + * Return 0 on success. + */ +int init_data_files(struct rrdengine_instance *ctx) +{ + int ret; + + ret = scan_data_files(ctx); + if (ret < 0) { + error("Failed to scan path \"%s\".", ctx->dbfiles_path); + return ret; + } else if (0 == ret) { + info("Data files not found, creating in path \"%s\".", ctx->dbfiles_path); + ret = create_new_datafile_pair(ctx, 1, 1); + if (ret) { + error("Failed to create data and journal files in path \"%s\".", ctx->dbfiles_path); + return ret; + } + ctx->last_fileno = 1; + } + + return 0; +} + +void finalize_data_files(struct rrdengine_instance *ctx) +{ + struct rrdengine_datafile *datafile, *next_datafile; + struct rrdengine_journalfile *journalfile; + struct extent_info *extent, *next_extent; + + for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) { + journalfile = datafile->journalfile; + next_datafile = datafile->next; + + for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) { + next_extent = extent->next; + freez(extent); + } + close_journal_file(journalfile, datafile); + close_data_file(datafile); + freez(journalfile); + freez(datafile); + + } +}
\ No newline at end of file diff --git a/database/engine/datafile.h b/database/engine/datafile.h new file mode 100644 index 0000000..ae94bfd --- /dev/null +++ b/database/engine/datafile.h @@ -0,0 +1,67 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_DATAFILE_H +#define NETDATA_DATAFILE_H + +#include "rrdengine.h" + +/* Forward declarations */ +struct rrdengine_datafile; +struct rrdengine_journalfile; +struct rrdengine_instance; + +#define DATAFILE_PREFIX "datafile-" +#define DATAFILE_EXTENSION ".ndf" + +#define MAX_DATAFILE_SIZE (1073741824LU) +#define MIN_DATAFILE_SIZE (4194304LU) +#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */ +#define TARGET_DATAFILES (20) + +#define DATAFILE_IDEAL_IO_SIZE (1048576U) + +struct extent_info { + uint64_t offset; + uint32_t size; + uint8_t number_of_pages; + struct rrdengine_datafile *datafile; + struct extent_info *next; + struct rrdeng_page_descr *pages[]; +}; + +struct rrdengine_df_extents { + /* the extent list is sorted based on disk offset */ + struct extent_info *first; + struct extent_info *last; +}; + +/* only one event loop is supported for now */ +struct rrdengine_datafile { + unsigned tier; + unsigned fileno; + uv_file file; + uint64_t pos; + struct rrdengine_instance *ctx; + struct rrdengine_df_extents extents; + struct rrdengine_journalfile *journalfile; + struct rrdengine_datafile *next; +}; + +struct rrdengine_datafile_list { + struct rrdengine_datafile *first; /* oldest */ + struct rrdengine_datafile *last; /* newest */ +}; + +extern void df_extent_insert(struct extent_info *extent); +extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +extern void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); +extern int close_data_file(struct rrdengine_datafile *datafile); +extern int unlink_data_file(struct rrdengine_datafile *datafile); +extern int destroy_data_file(struct rrdengine_datafile *datafile); +extern int create_data_file(struct rrdengine_datafile *datafile); +extern int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); +extern int init_data_files(struct rrdengine_instance *ctx); +extern void finalize_data_files(struct rrdengine_instance *ctx); + +#endif /* NETDATA_DATAFILE_H */
\ No newline at end of file diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c new file mode 100644 index 0000000..9fecc48 --- /dev/null +++ b/database/engine/journalfile.c @@ -0,0 +1,515 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +static void flush_transaction_buffer_cb(uv_fs_t* req) +{ + struct generic_io_descriptor *io_descr = req->data; + struct rrdengine_worker_config* wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + + debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); + if (req->result < 0) { + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); + } else { + debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); + } + + uv_fs_req_cleanup(req); + free(io_descr->buf); + freez(io_descr); +} + +/* Careful to always call this before creating a new journal file */ +void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + int ret; + struct generic_io_descriptor *io_descr; + unsigned pos, size; + struct rrdengine_journalfile *journalfile; + + if (unlikely(NULL == ctx->commit_log.buf || 0 == ctx->commit_log.buf_pos)) { + return; + } + /* care with outstanding transactions when switching journal files */ + journalfile = ctx->datafiles.last->journalfile; + + io_descr = mallocz(sizeof(*io_descr)); + pos = ctx->commit_log.buf_pos; + size = ctx->commit_log.buf_size; + if (pos < size) { + /* simulate an empty transaction to skip the rest of the block */ + *(uint8_t *) (ctx->commit_log.buf + pos) = STORE_PADDING; + } + io_descr->buf = ctx->commit_log.buf; + io_descr->bytes = size; + io_descr->pos = journalfile->pos; + io_descr->req.data = io_descr; + io_descr->completion = NULL; + + io_descr->iov = uv_buf_init((void *)io_descr->buf, size); + ret = uv_fs_write(wc->loop, &io_descr->req, journalfile->file, &io_descr->iov, 1, + journalfile->pos, flush_transaction_buffer_cb); + fatal_assert(-1 != ret); + journalfile->pos += RRDENG_BLOCK_SIZE; + ctx->disk_space += RRDENG_BLOCK_SIZE; + ctx->commit_log.buf = NULL; + ctx->stats.io_write_bytes += RRDENG_BLOCK_SIZE; + ++ctx->stats.io_write_requests; +} + +void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size) +{ + struct rrdengine_instance *ctx = wc->ctx; + int ret; + unsigned buf_pos = 0, buf_size; + + fatal_assert(size); + if (ctx->commit_log.buf) { + unsigned remaining; + + buf_pos = ctx->commit_log.buf_pos; + buf_size = ctx->commit_log.buf_size; + remaining = buf_size - buf_pos; + if (size > remaining) { + /* we need a new buffer */ + wal_flush_transaction_buffer(wc); + } + } + if (NULL == ctx->commit_log.buf) { + buf_size = ALIGN_BYTES_CEILING(size); + ret = posix_memalign((void *)&ctx->commit_log.buf, RRDFILE_ALIGNMENT, buf_size); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + buf_pos = ctx->commit_log.buf_pos = 0; + ctx->commit_log.buf_size = buf_size; + } + ctx->commit_log.buf_pos += size; + + return ctx->commit_log.buf + buf_pos; +} + +void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +{ + (void) snprintf(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION, + datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); +} + +void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + journalfile->file = (uv_file)0; + journalfile->pos = 0; + journalfile->datafile = datafile; +} + +int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + + ret = uv_fs_close(NULL, &req, journalfile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + return ret; +} + +int unlink_journal_file(struct rrdengine_journalfile *journalfile) +{ + struct rrdengine_datafile *datafile = journalfile->datafile; + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ++ctx->stats.journalfile_deletions; + + return ret; +} + +int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + + ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL); + if (ret < 0) { + error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ret = uv_fs_close(NULL, &req, journalfile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ++ctx->stats.journalfile_deletions; + + return ret; +} + +int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + uv_file file; + int ret, fd; + struct rrdeng_jf_sb *superblock; + uv_buf_t iov; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file); + if (fd < 0) { + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; + } + journalfile->file = file; + ++ctx->stats.journalfile_creations; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + (void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ); + (void) strncpy(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ); + + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + fatal_assert(req.result < 0); + error("uv_fs_write: %s", uv_strerror(ret)); + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + } + uv_fs_req_cleanup(&req); + free(superblock); + if (ret < 0) { + destroy_journal_file(journalfile, datafile); + return ret; + } + + journalfile->pos = sizeof(*superblock); + ctx->stats.io_write_bytes += sizeof(*superblock); + ++ctx->stats.io_write_requests; + + return 0; +} + +static int check_journal_file_superblock(uv_file file) +{ + int ret; + struct rrdeng_jf_sb *superblock; + uv_buf_t iov; + uv_fs_t req; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + error("uv_fs_read: %s", uv_strerror(ret)); + uv_fs_req_cleanup(&req); + goto error; + } + fatal_assert(req.result >= 0); + uv_fs_req_cleanup(&req); + + if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) || + strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) { + error("File has invalid superblock."); + ret = UV_EINVAL; + } else { + ret = 0; + } + error: + free(superblock); + return ret; +} + +static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + void *buf, unsigned max_size) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned i, count, payload_length, descr_size, valid_pages; + struct rrdeng_page_descr *descr; + struct extent_info *extent; + /* persistent structures */ + struct rrdeng_jf_store_data *jf_metric_data; + + jf_metric_data = buf; + count = jf_metric_data->number_of_pages; + descr_size = sizeof(*jf_metric_data->descr) * count; + payload_length = sizeof(*jf_metric_data) + descr_size; + if (payload_length > max_size) { + error("Corrupted transaction payload."); + return; + } + + extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0])); + extent->offset = jf_metric_data->extent_offset; + extent->size = jf_metric_data->extent_size; + extent->datafile = journalfile->datafile; + extent->next = NULL; + + for (i = 0, valid_pages = 0 ; i < count ; ++i) { + uuid_t *temp_id; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + + if (PAGE_METRICS != jf_metric_data->descr[i].type) { + error("Unknown page type encountered."); + continue; + } + temp_id = (uuid_t *)jf_metric_data->descr[i].uuid; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + /* First time we see the UUID */ + uv_rwlock_wrlock(&pg_cache->metrics_index.lock); + PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0); + fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ + *PValue = page_index = create_page_index(temp_id); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; + uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); + } + + descr = pg_cache_create_descr(); + descr->page_length = jf_metric_data->descr[i].page_length; + descr->start_time = jf_metric_data->descr[i].start_time; + descr->end_time = jf_metric_data->descr[i].end_time; + descr->id = &page_index->id; + descr->extent = extent; + extent->pages[valid_pages++] = descr; + pg_cache_insert(ctx, page_index, descr); + } + + extent->number_of_pages = valid_pages; + + if (likely(valid_pages)) + df_extent_insert(extent); + else + freez(extent); +} + +/* + * Replays transaction by interpreting up to max_size bytes from buf. + * Sets id to the current transaction id or to 0 if unknown. + * Returns size of transaction record or 0 for unknown size. + */ +static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + void *buf, uint64_t *id, unsigned max_size) +{ + unsigned payload_length, size_bytes; + int ret; + /* persistent structures */ + struct rrdeng_jf_transaction_header *jf_header; + struct rrdeng_jf_transaction_trailer *jf_trailer; + uLong crc; + + *id = 0; + jf_header = buf; + if (STORE_PADDING == jf_header->type) { + debug(D_RRDENGINE, "Skipping padding."); + return 0; + } + if (sizeof(*jf_header) > max_size) { + error("Corrupted transaction record, skipping."); + return 0; + } + *id = jf_header->id; + payload_length = jf_header->payload_length; + size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer); + if (size_bytes > max_size) { + error("Corrupted transaction record, skipping."); + return 0; + } + jf_trailer = buf + sizeof(*jf_header) + payload_length; + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, buf, sizeof(*jf_header) + payload_length); + ret = crc32cmp(jf_trailer->checksum, crc); + debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED"); + if (unlikely(ret)) { + error("Transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id); + return size_bytes; + } + switch (jf_header->type) { + case STORE_DATA: + debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id); + restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length); + break; + default: + error("Unknown transaction type. Skipping record."); + break; + } + + return size_bytes; +} + + +#define READAHEAD_BYTES (RRDENG_BLOCK_SIZE * 256) +/* + * Iterates journal file transactions and populates the page cache. + * Page cache must already be initialized. + * Returns the maximum transaction id it discovered. + */ +static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile) +{ + uv_file file; + uint64_t file_size;//, data_file_size; + int ret; + uint64_t pos, pos_i, max_id, id; + unsigned size_bytes; + void *buf; + uv_buf_t iov; + uv_fs_t req; + + file = journalfile->file; + file_size = journalfile->pos; + //data_file_size = journalfile->datafile->pos; TODO: utilize this? + + max_id = 1; + ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + + for (pos = sizeof(struct rrdeng_jf_sb) ; pos < file_size ; pos += READAHEAD_BYTES) { + size_bytes = MIN(READAHEAD_BYTES, file_size - pos); + iov = uv_buf_init(buf, size_bytes); + ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL); + if (ret < 0) { + fatal("uv_fs_read: %s", uv_strerror(ret)); + /*uv_fs_req_cleanup(&req);*/ + } + fatal_assert(req.result >= 0); + uv_fs_req_cleanup(&req); + ctx->stats.io_read_bytes += size_bytes; + ++ctx->stats.io_read_requests; + + //pos_i = pos; + //while (pos_i < pos + size_bytes) { + for (pos_i = 0 ; pos_i < size_bytes ; ) { + unsigned max_size; + + max_size = pos + size_bytes - pos_i; + ret = replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size); + if (!ret) /* TODO: support transactions bigger than 4K */ + /* unknown transaction size, move on to the next block */ + pos_i = ALIGN_BYTES_FLOOR(pos_i + RRDENG_BLOCK_SIZE); + else + pos_i += ret; + max_id = MAX(max_id, id); + } + } + + free(buf); + return max_id; +} + +int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + struct rrdengine_datafile *datafile) +{ + uv_fs_t req; + uv_file file; + int ret, fd, error; + uint64_t file_size, max_id; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + fd = open_file_direct_io(path, O_RDWR, &file); + if (fd < 0) { + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; + } + info("Loading journal file \"%s\".", path); + + ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); + if (ret) + goto error; + file_size = ALIGN_BYTES_FLOOR(file_size); + + ret = check_journal_file_superblock(file); + if (ret) + goto error; + ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb); + ++ctx->stats.io_read_requests; + + journalfile->file = file; + journalfile->pos = file_size; + + max_id = iterate_transactions(ctx, journalfile); + + ctx->commit_log.transaction_id = MAX(ctx->commit_log.transaction_id, max_id + 1); + + info("Journal file \"%s\" loaded (size:%"PRIu64").", path, file_size); + return 0; + + error: + error = ret; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + return error; +} + +void init_commit_log(struct rrdengine_instance *ctx) +{ + ctx->commit_log.buf = NULL; + ctx->commit_log.buf_pos = 0; + ctx->commit_log.transaction_id = 1; +}
\ No newline at end of file diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h new file mode 100644 index 0000000..f6c43cd --- /dev/null +++ b/database/engine/journalfile.h @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_JOURNALFILE_H +#define NETDATA_JOURNALFILE_H + +#include "rrdengine.h" + +/* Forward declarations */ +struct rrdengine_instance; +struct rrdengine_worker_config; +struct rrdengine_datafile; +struct rrdengine_journalfile; + +#define WALFILE_PREFIX "journalfile-" +#define WALFILE_EXTENSION ".njf" + + +/* only one event loop is supported for now */ +struct rrdengine_journalfile { + uv_file file; + uint64_t pos; + + struct rrdengine_datafile *datafile; +}; + +/* only one event loop is supported for now */ +struct transaction_commit_log { + uint64_t transaction_id; + + /* outstanding transaction buffer */ + void *buf; + unsigned buf_pos; + unsigned buf_size; +}; + +extern void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); +extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size); +extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc); +extern int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +extern int unlink_journal_file(struct rrdengine_journalfile *journalfile); +extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + struct rrdengine_datafile *datafile); +extern void init_commit_log(struct rrdengine_instance *ctx); + + +#endif /* NETDATA_JOURNALFILE_H */
\ No newline at end of file diff --git a/database/engine/metadata_log/Makefile.am b/database/engine/metadata_log/Makefile.am new file mode 100644 index 0000000..161784b --- /dev/null +++ b/database/engine/metadata_log/Makefile.am @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) diff --git a/database/engine/metadata_log/README.md b/database/engine/metadata_log/README.md new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/database/engine/metadata_log/README.md diff --git a/database/engine/metadata_log/compaction.c b/database/engine/metadata_log/compaction.c new file mode 100644 index 0000000..ba19e1e --- /dev/null +++ b/database/engine/metadata_log/compaction.c @@ -0,0 +1,86 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "metadatalog.h" + +/* Return 0 on success. */ +int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles, + unsigned *matched_files) +{ + int ret; + unsigned starting_fileno, fileno, i, j, recovered_files; + struct metadata_logfile *metalogfile = NULL, *compactionfile = NULL, **tmp_metalogfiles; + char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path; + + for (i = 0 ; i < *matched_files ; ++i) { + metalogfile = metalogfiles[i]; + if (0 == metalogfile->starting_fileno) + continue; /* skip standard metadata log files */ + break; /* this is a compaction temporary file */ + } + if (i == *matched_files) /* no recovery needed */ + return 0; + info("Starting metadata log file failure recovery procedure in \"%s\".", dbfiles_path); + + if (*matched_files - i > 1) { /* Can't have more than 1 temporary compaction files */ + error("Metadata log files are in an invalid state. Cannot proceed."); + return 1; + } + compactionfile = metalogfile; + starting_fileno = compactionfile->starting_fileno; + fileno = compactionfile->fileno; + /* scratchpad space to move file pointers around */ + tmp_metalogfiles = callocz(*matched_files, sizeof(*tmp_metalogfiles)); + + for (j = 0, recovered_files = 0 ; j < i ; ++j) { + metalogfile = metalogfiles[j]; + fatal_assert(0 == metalogfile->starting_fileno); + if (metalogfile->fileno < starting_fileno) { + tmp_metalogfiles[recovered_files++] = metalogfile; + continue; + } + break; /* reached compaction file serial number */ + } + + if ((j == i) /* Shouldn't be possible, invalid compaction temporary file */ || + (metalogfile->fileno == starting_fileno && metalogfile->fileno == fileno)) { + error("Deleting invalid compaction temporary file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL + METALOG_EXTENSION"\"", dbfiles_path, starting_fileno, fileno); + unlink_metadata_logfile(compactionfile); + freez(compactionfile); + freez(tmp_metalogfiles); + --*matched_files; /* delete the last one */ + + info("Finished metadata log file failure recovery procedure in \"%s\".", dbfiles_path); + return 0; + } + + for ( ; j < i ; ++j) { /* continue iterating through normal metadata log files */ + metalogfile = metalogfiles[j]; + fatal_assert(0 == metalogfile->starting_fileno); + if (metalogfile->fileno < fileno) { /* It has already been compacted */ + error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL + METALOG_EXTENSION"\"", dbfiles_path, 0U, metalogfile->fileno); + unlink_metadata_logfile(metalogfile); + freez(metalogfile); + continue; + } + tmp_metalogfiles[recovered_files++] = metalogfile; + } + + /* compaction temporary file is valid */ + tmp_metalogfiles[recovered_files++] = compactionfile; + ret = rename_metadata_logfile(compactionfile, 0, starting_fileno); + if (ret < 0) { + error("Cannot rename temporary compaction files. Cannot proceed."); + freez(tmp_metalogfiles); + return 1; + } + + memcpy(metalogfiles, tmp_metalogfiles, recovered_files * sizeof(*metalogfiles)); + *matched_files = recovered_files; + freez(tmp_metalogfiles); + + info("Finished metadata log file failure recovery procedure in \"%s\".", dbfiles_path); + return 0; +} diff --git a/database/engine/metadata_log/compaction.h b/database/engine/metadata_log/compaction.h new file mode 100644 index 0000000..d046134 --- /dev/null +++ b/database/engine/metadata_log/compaction.h @@ -0,0 +1,14 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_COMPACTION_H +#define NETDATA_COMPACTION_H + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include "../rrdengine.h" + +extern int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles, + unsigned *matched_files); + +#endif /* NETDATA_COMPACTION_H */ diff --git a/database/engine/metadata_log/logfile.c b/database/engine/metadata_log/logfile.c new file mode 100644 index 0000000..b7c5c06 --- /dev/null +++ b/database/engine/metadata_log/logfile.c @@ -0,0 +1,453 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include <database/sqlite/sqlite_functions.h> +#include "metadatalog.h" +#include "metalogpluginsd.h" + + +void generate_metadata_logfile_path(struct metadata_logfile *metalogfile, char *str, size_t maxlen) +{ + (void) snprintf(str, maxlen, "%s/" METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL METALOG_EXTENSION, + metalogfile->ctx->rrdeng_ctx->dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno); +} + +void metadata_logfile_init(struct metadata_logfile *metalogfile, struct metalog_instance *ctx, unsigned starting_fileno, + unsigned fileno) +{ + metalogfile->starting_fileno = starting_fileno; + metalogfile->fileno = fileno; + metalogfile->file = (uv_file)0; + metalogfile->pos = 0; + metalogfile->next = NULL; + metalogfile->ctx = ctx; +} + +int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno, unsigned new_fileno) +{ + //struct metalog_instance *ctx = metalogfile->ctx; + uv_fs_t req; + int ret; + char oldpath[RRDENG_PATH_MAX], newpath[RRDENG_PATH_MAX]; + unsigned backup_starting_fileno, backup_fileno; + + backup_starting_fileno = metalogfile->starting_fileno; + backup_fileno = metalogfile->fileno; + generate_metadata_logfile_path(metalogfile, oldpath, sizeof(oldpath)); + metalogfile->starting_fileno = new_starting_fileno; + metalogfile->fileno = new_fileno; + generate_metadata_logfile_path(metalogfile, newpath, sizeof(newpath)); + + info("Renaming metadata log file \"%s\" to \"%s\".", oldpath, newpath); + ret = uv_fs_rename(NULL, &req, oldpath, newpath, NULL); + if (ret < 0) { + error("uv_fs_rename(%s): %s", oldpath, uv_strerror(ret)); + //++ctx->stats.fs_errors; /* this is racy, may miss some errors */ + rrd_stat_atomic_add(&global_fs_errors, 1); + /* restore previous values */ + metalogfile->starting_fileno = backup_starting_fileno; + metalogfile->fileno = backup_fileno; + } + uv_fs_req_cleanup(&req); + + return ret; +} + +int unlink_metadata_logfile(struct metadata_logfile *metalogfile) +{ + //struct metalog_instance *ctx = metalogfile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_metadata_logfile_path(metalogfile, path, sizeof(path)); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); +// ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + return ret; +} + +static int check_metadata_logfile_superblock(uv_file file) +{ + int ret; + struct rrdeng_metalog_sb *superblock; + uv_buf_t iov; + uv_fs_t req; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + error("uv_fs_read: %s", uv_strerror(ret)); + uv_fs_req_cleanup(&req); + goto error; + } + fatal_assert(req.result >= 0); + uv_fs_req_cleanup(&req); + + if (strncmp(superblock->magic_number, RRDENG_METALOG_MAGIC, RRDENG_MAGIC_SZ)) { + error("File has invalid superblock."); + ret = UV_EINVAL; + } else { + ret = 0; + } + if (superblock->version > RRDENG_METALOG_VER) { + error("File has unknown version %"PRIu16". Compatibility is not guaranteed.", superblock->version); + } +error: + free(superblock); + return ret; +} + +void replay_record(struct metadata_logfile *metalogfile, struct rrdeng_metalog_record_header *header, void *payload) +{ + struct metalog_instance *ctx = metalogfile->ctx; + char *line, *nextline, *record_end; + int ret; + + debug(D_METADATALOG, "RECORD contents: %.*s", (int)header->payload_length, (char *)payload); + record_end = (char *)payload + header->payload_length - 1; + *record_end = '\0'; + + for (line = payload ; line ; line = nextline) { + nextline = strchr(line, '\n'); + if (nextline) { + *nextline++ = '\0'; + } + ret = parser_action(ctx->metalog_parser_object->parser, line); + debug(D_METADATALOG, "parser_action ret:%d", ret); + if (ret) + return; /* skip record due to error */ + }; +} + +/* This function only works with buffered I/O */ +static inline int metalogfile_read(struct metadata_logfile *metalogfile, void *buf, size_t len, uint64_t offset) +{ +// struct metalog_instance *ctx; + uv_file file; + uv_buf_t iov; + uv_fs_t req; + int ret; + +// ctx = metalogfile->ctx; + file = metalogfile->file; + iov = uv_buf_init(buf, len); + ret = uv_fs_read(NULL, &req, file, &iov, 1, offset, NULL); + if (unlikely(ret < 0 && ret != req.result)) { + fatal("uv_fs_read: %s", uv_strerror(ret)); + } + if (req.result < 0) { +// ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + error("%s: uv_fs_read - %s - record at offset %"PRIu64"(%u) in metadata logfile %u-%u.", __func__, + uv_strerror((int)req.result), offset, (unsigned)len, metalogfile->starting_fileno, metalogfile->fileno); + } + uv_fs_req_cleanup(&req); +// ctx->stats.io_read_bytes += len; +// ++ctx->stats.io_read_requests; + + return ret; +} + +/* Return 0 on success */ +static int metadata_record_integrity_check(void *record) +{ + int ret; + uint32_t data_size; + struct rrdeng_metalog_record_header *header; + struct rrdeng_metalog_record_trailer *trailer; + uLong crc; + + header = record; + data_size = header->header_length + header->payload_length; + trailer = record + data_size; + + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, record, data_size); + ret = crc32cmp(trailer->checksum, crc); + + return ret; +} + +#define MAX_READ_BYTES (RRDENG_BLOCK_SIZE * 32) /* no record should be over 128KiB in this version */ + +/* + * Iterates metadata log file records and creates database objects (host/chart/dimension) + */ +static void iterate_records(struct metadata_logfile *metalogfile) +{ + uint32_t file_size, pos, bytes_remaining, record_size; + void *buf; + struct rrdeng_metalog_record_header *header; + struct metalog_instance *ctx = metalogfile->ctx; + struct metalog_pluginsd_state *state = ctx->metalog_parser_object->private; + const size_t min_header_size = offsetof(struct rrdeng_metalog_record_header, header_length) + + sizeof(header->header_length); + + file_size = metalogfile->pos; + state->metalogfile = metalogfile; + + buf = mallocz(MAX_READ_BYTES); + + for (pos = sizeof(struct rrdeng_metalog_sb) ; pos < file_size ; pos += record_size) { + bytes_remaining = file_size - pos; + if (bytes_remaining < min_header_size) { + error("%s: unexpected end of file in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno, + metalogfile->fileno); + break; + } + if (metalogfile_read(metalogfile, buf, min_header_size, pos) < 0) + break; + header = (struct rrdeng_metalog_record_header *)buf; + if (METALOG_STORE_PADDING == header->type) { + info("%s: Skipping padding in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno, + metalogfile->fileno); + record_size = ALIGN_BYTES_FLOOR(pos + RRDENG_BLOCK_SIZE) - pos; + continue; + } + if (metalogfile_read(metalogfile, buf + min_header_size, sizeof(*header) - min_header_size, + pos + min_header_size) < 0) + break; + record_size = header->header_length + header->payload_length + sizeof(struct rrdeng_metalog_record_trailer); + if (header->header_length < min_header_size || record_size > bytes_remaining) { + error("%s: Corrupted record in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno, + metalogfile->fileno); + break; + } + if (record_size > MAX_READ_BYTES) { + error("%s: Record is too long (%u bytes) in metadata logfile %u-%u.", __func__, record_size, + metalogfile->starting_fileno, metalogfile->fileno); + continue; + } + if (metalogfile_read(metalogfile, buf + sizeof(*header), record_size - sizeof(*header), + pos + sizeof(*header)) < 0) + break; + if (metadata_record_integrity_check(buf)) { + error("%s: Record at offset %"PRIu32" was read from disk. CRC32 check: FAILED", __func__, pos); + continue; + } + debug(D_METADATALOG, "%s: Record at offset %"PRIu32" was read from disk. CRC32 check: SUCCEEDED", __func__, + pos); + + replay_record(metalogfile, header, buf + header->header_length); + } + + freez(buf); +} + +int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *metalogfile) +{ + UNUSED(ctx); + uv_fs_t req; + uv_file file; + int ret, fd, error; + uint64_t file_size; + char path[RRDENG_PATH_MAX]; + + generate_metadata_logfile_path(metalogfile, path, sizeof(path)); + if (file_is_migrated(path)) + return 0; + + fd = open_file_buffered_io(path, O_RDWR, &file); + if (fd < 0) { +// ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; + } + info("Loading metadata log \"%s\".", path); + + ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_metalog_sb)); + if (ret) + goto error; + + ret = check_metadata_logfile_superblock(file); + if (ret) + goto error; +// ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb); +// ++ctx->stats.io_read_requests; + + metalogfile->file = file; + metalogfile->pos = file_size; + + iterate_records(metalogfile); + + info("Metadata log \"%s\" migrated to the database (size:%"PRIu64").", path, file_size); + add_migrated_file(path, file_size); + return 0; + +error: + error = ret; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); +// ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + return error; +} + +static int scan_metalog_files_cmp(const void *a, const void *b) +{ + struct metadata_logfile *file1, *file2; + char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX]; + + file1 = *(struct metadata_logfile **)a; + file2 = *(struct metadata_logfile **)b; + generate_metadata_logfile_path(file1, path1, sizeof(path1)); + generate_metadata_logfile_path(file2, path2, sizeof(path2)); + return strcmp(path1, path2); +} + +/* Returns number of metadata logfiles that were loaded or < 0 on error */ +static int scan_metalog_files(struct metalog_instance *ctx) +{ + int ret; + unsigned starting_no, no, matched_files, i, failed_to_load; + static uv_fs_t req; + uv_dirent_t dent; + struct metadata_logfile **metalogfiles, *metalogfile; + char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path; + + ret = uv_fs_scandir(NULL, &req, dbfiles_path, 0, NULL); + if (ret < 0) { + fatal_assert(req.result < 0); + uv_fs_req_cleanup(&req); + error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret)); +// ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return ret; + } + info("Found %d files in path %s", ret, dbfiles_path); + + metalogfiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*metalogfiles)); + for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { + info("Scanning file \"%s/%s\"", dbfiles_path, dent.name); + ret = sscanf(dent.name, METALOG_PREFIX METALOG_FILE_NUMBER_SCAN_TMPL METALOG_EXTENSION, &starting_no, &no); + if (2 == ret) { + info("Matched file \"%s/%s\"", dbfiles_path, dent.name); + metalogfile = mallocz(sizeof(*metalogfile)); + metadata_logfile_init(metalogfile, ctx, starting_no, no); + metalogfiles[matched_files++] = metalogfile; + } + } + uv_fs_req_cleanup(&req); + + if (0 == matched_files) { + freez(metalogfiles); + return 0; + } + if (matched_files == MAX_DATAFILES) { + error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); + } + qsort(metalogfiles, matched_files, sizeof(*metalogfiles), scan_metalog_files_cmp); + ret = compaction_failure_recovery(ctx, metalogfiles, &matched_files); + if (ret) { /* If the files are corrupted fail */ + for (i = 0 ; i < matched_files ; ++i) { + freez(metalogfiles[i]); + } + freez(metalogfiles); + return UV_EINVAL; + } + //ctx->last_fileno = metalogfiles[matched_files - 1]->fileno; + + struct plugind cd = { + .enabled = 1, + .update_every = 0, + .pid = 0, + .serial_failures = 0, + .successful_collections = 0, + .obsolete = 0, + .started_t = INVALID_TIME, + .next = NULL, + .version = 0, + }; + + struct metalog_pluginsd_state metalog_parser_state; + metalog_pluginsd_state_init(&metalog_parser_state, ctx); + + PARSER_USER_OBJECT metalog_parser_object; + metalog_parser_object.enabled = cd.enabled; + metalog_parser_object.host = ctx->rrdeng_ctx->host; + metalog_parser_object.cd = &cd; + metalog_parser_object.trust_durations = 0; + metalog_parser_object.private = &metalog_parser_state; + + PARSER *parser = parser_init(metalog_parser_object.host, &metalog_parser_object, NULL, PARSER_INPUT_SPLIT); + if (unlikely(!parser)) { + error("Failed to initialize metadata log parser."); + failed_to_load = matched_files; + goto after_failed_to_parse; + } + parser_add_keyword(parser, PLUGINSD_KEYWORD_HOST, metalog_pluginsd_host); + parser_add_keyword(parser, PLUGINSD_KEYWORD_GUID, pluginsd_guid); + parser_add_keyword(parser, PLUGINSD_KEYWORD_CONTEXT, pluginsd_context); + parser_add_keyword(parser, PLUGINSD_KEYWORD_TOMBSTONE, pluginsd_tombstone); + parser->plugins_action->dimension_action = &metalog_pluginsd_dimension_action; + parser->plugins_action->chart_action = &metalog_pluginsd_chart_action; + parser->plugins_action->guid_action = &metalog_pluginsd_guid_action; + parser->plugins_action->context_action = &metalog_pluginsd_context_action; + parser->plugins_action->tombstone_action = &metalog_pluginsd_tombstone_action; + parser->plugins_action->host_action = &metalog_pluginsd_host_action; + + + metalog_parser_object.parser = parser; + ctx->metalog_parser_object = &metalog_parser_object; + + for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { + metalogfile = metalogfiles[i]; + db_lock(); + db_execute("BEGIN TRANSACTION;"); + ret = load_metadata_logfile(ctx, metalogfile); + if (0 != ret) { + error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL + METALOG_EXTENSION"\"", dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno); + unlink_metadata_logfile(metalogfile); + ++failed_to_load; + db_execute("ROLLBACK TRANSACTION;"); + } + else + db_execute("COMMIT TRANSACTION;"); + db_unlock(); + freez(metalogfile); + } + matched_files -= failed_to_load; + debug(D_METADATALOG, "PARSER ended"); + + parser_destroy(parser); + + size_t count __maybe_unused = metalog_parser_object.count; + + debug(D_METADATALOG, "Parsing count=%u", (unsigned)count); +after_failed_to_parse: + + freez(metalogfiles); + + return matched_files; +} + +/* Return 0 on success. */ +int init_metalog_files(struct metalog_instance *ctx) +{ + int ret; + char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path; + + ret = scan_metalog_files(ctx); + if (ret < 0) { + error("Failed to scan path \"%s\".", dbfiles_path); + return ret; + }/* else if (0 == ret) { + ctx->last_fileno = 1; + }*/ + + return 0; +} diff --git a/database/engine/metadata_log/logfile.h b/database/engine/metadata_log/logfile.h new file mode 100644 index 0000000..df12ac7 --- /dev/null +++ b/database/engine/metadata_log/logfile.h @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_LOGFILE_H +#define NETDATA_LOGFILE_H + +#include "metadatalogprotocol.h" +#include "../rrdengine.h" + +/* Forward declarations */ +struct metadata_logfile; +struct metalog_worker_config; + +#define METALOG_PREFIX "metadatalog-" +#define METALOG_EXTENSION ".mlf" + +/* only one event loop is supported for now */ +struct metadata_logfile { + unsigned fileno; /* Starts at 1 */ + unsigned starting_fileno; /* 0 for normal files, staring number during compaction */ + uv_file file; + uint64_t pos; + struct metalog_instance *ctx; + struct metadata_logfile *next; +}; + +struct metadata_logfile_list { + struct metadata_logfile *first; /* oldest */ + struct metadata_logfile *last; /* newest */ +}; + +extern void generate_metadata_logfile_path(struct metadata_logfile *metadatalog, char *str, size_t maxlen); +extern int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno, + unsigned new_fileno); +extern int unlink_metadata_logfile(struct metadata_logfile *metalogfile); +extern int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *logfile); +extern int init_metalog_files(struct metalog_instance *ctx); + + +#endif /* NETDATA_LOGFILE_H */ diff --git a/database/engine/metadata_log/metadatalog.h b/database/engine/metadata_log/metadatalog.h new file mode 100644 index 0000000..b484686 --- /dev/null +++ b/database/engine/metadata_log/metadatalog.h @@ -0,0 +1,28 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_METADATALOG_H +#define NETDATA_METADATALOG_H + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include "../rrdengine.h" +#include "metadatalogprotocol.h" +#include "logfile.h" +#include "metadatalogapi.h" +#include "compaction.h" + +/* Forward declerations */ +struct metalog_instance; +struct parser_user_object; + +#define METALOG_FILE_NUMBER_SCAN_TMPL "%5u-%5u" +#define METALOG_FILE_NUMBER_PRINT_TMPL "%5.5u-%5.5u" + +struct metalog_instance { + struct rrdengine_instance *rrdeng_ctx; + struct parser_user_object *metalog_parser_object; + uint8_t initialized; /* set to 1 to mark context initialized */ +}; + +#endif /* NETDATA_METADATALOG_H */ diff --git a/database/engine/metadata_log/metadatalogapi.c b/database/engine/metadata_log/metadatalogapi.c new file mode 100755 index 0000000..b206cca --- /dev/null +++ b/database/engine/metadata_log/metadatalogapi.c @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "metadatalog.h" + +/* + * Returns 0 on success, negative on error + */ +int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx) +{ + struct metalog_instance *ctx; + int error; + + ctx = callocz(1, sizeof(*ctx)); + ctx->initialized = 0; + rrdeng_parent_ctx->metalog_ctx = ctx; + + ctx->rrdeng_ctx = rrdeng_parent_ctx; + error = init_metalog_files(ctx); + if (error) { + goto error_after_init_rrd_files; + } + ctx->initialized = 1; /* notify dbengine that the metadata log has finished initializing */ + return 0; + +error_after_init_rrd_files: + freez(ctx); + return UV_EIO; +} + +/* This function is called by dbengine rotation logic when the metric has no writers */ +void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid) +{ + uuid_t multihost_uuid; + + delete_dimension_uuid(metric_uuid); + rrdeng_convert_legacy_uuid_to_multihost(ctx->rrdeng_ctx->machine_guid, metric_uuid, &multihost_uuid); + delete_dimension_uuid(&multihost_uuid); +} diff --git a/database/engine/metadata_log/metadatalogapi.h b/database/engine/metadata_log/metadatalogapi.h new file mode 100644 index 0000000..d558b93 --- /dev/null +++ b/database/engine/metadata_log/metadatalogapi.h @@ -0,0 +1,12 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_METADATALOGAPI_H +#define NETDATA_METADATALOGAPI_H + +extern void metalog_commit_delete_chart(RRDSET *st); +extern void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid); + +/* must call once before using anything */ +extern int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx); + +#endif /* NETDATA_METADATALOGAPI_H */ diff --git a/database/engine/metadata_log/metadatalogprotocol.h b/database/engine/metadata_log/metadatalogprotocol.h new file mode 100644 index 0000000..1017213 --- /dev/null +++ b/database/engine/metadata_log/metadatalogprotocol.h @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_METADATALOGPROTOCOL_H +#define NETDATA_METADATALOGPROTOCOL_H + +#include "../rrddiskprotocol.h" + +#define RRDENG_METALOG_MAGIC "netdata-metadata-log" + +#define RRDENG_METALOG_VER (1) + +#define RRDENG_METALOG_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + sizeof(uint16_t))) +/* + * Metadata log persistent super-block + */ +struct rrdeng_metalog_sb { + char magic_number[RRDENG_MAGIC_SZ]; + uint16_t version; + uint8_t padding[RRDENG_METALOG_SB_PADDING_SZ]; +} __attribute__ ((packed)); + +/* + * Metadata log record types + */ +#define METALOG_STORE_PADDING (0) +#define METALOG_CREATE_OBJECT (1) +#define METALOG_DELETE_OBJECT (2) +#define METALOG_OTHER (3) /* reserved */ + +/* + * Metadata log record header + */ +struct rrdeng_metalog_record_header { + /* when set to METALOG_STORE_PADDING jump to start of next block */ + uint8_t type; + + uint16_t header_length; + uint32_t payload_length; + /****************************************************** + * No fields above this point can ever change. * + ****************************************************** + * All fields below this point are subject to change. * + ******************************************************/ +} __attribute__ ((packed)); + +/* + * Metadata log record trailer + */ +struct rrdeng_metalog_record_trailer { + uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */ +} __attribute__ ((packed)); + +#endif /* NETDATA_METADATALOGPROTOCOL_H */ diff --git a/database/engine/metadata_log/metalogpluginsd.c b/database/engine/metadata_log/metalogpluginsd.c new file mode 100755 index 0000000..88c1453 --- /dev/null +++ b/database/engine/metadata_log/metalogpluginsd.c @@ -0,0 +1,140 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "metadatalog.h" +#include "metalogpluginsd.h" + +extern struct config stream_config; + +PARSER_RC metalog_pluginsd_host_action( + void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone, + char *tags) +{ + struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; + + RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0); + if (host) { + if (unlikely(host->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)) { + error("Archived host '%s' has memory mode '%s', but the archived one is '%s'. Ignoring archived state.", + host->hostname, rrd_memory_mode_name(host->rrd_memory_mode), + rrd_memory_mode_name(RRD_MEMORY_MODE_DBENGINE)); + ((PARSER_USER_OBJECT *) user)->host = NULL; /* Ignore objects if memory mode is not dbengine */ + } + ((PARSER_USER_OBJECT *) user)->host = host; + return PARSER_RC_OK; + } + + if (strcmp(machine_guid, registry_get_this_machine_guid()) == 0) { + ((PARSER_USER_OBJECT *) user)->host = host; + return PARSER_RC_OK; + } + + if (likely(!uuid_parse(machine_guid, state->host_uuid))) { + int rc = sql_store_host(&state->host_uuid, hostname, registry_hostname, update_every, os, timezone, tags); + if (unlikely(rc)) { + errno = 0; + error("Failed to store host %s with UUID %s in the database", hostname, machine_guid); + } + } + else { + errno = 0; + error("Host machine GUID %s is not valid", machine_guid); + } + + return PARSER_RC_OK; +} + +PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, char *context, + char *title, char *units, char *plugin, char *module, int priority, + int update_every, RRDSET_TYPE chart_type, char *options) +{ + UNUSED(options); + + struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; + RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; + + if (unlikely(uuid_is_null(state->host_uuid))) { + debug(D_METADATALOG, "Ignoring chart belonging to missing or ignored host."); + return PARSER_RC_OK; + } + uuid_copy(state->chart_uuid, state->uuid); + uuid_clear(state->uuid); /* Consume UUID */ + (void) sql_store_chart(&state->chart_uuid, &state->host_uuid, + type, id, name, family, context, title, units, + plugin, module, priority, update_every, + chart_type, RRD_MEMORY_MODE_DBENGINE, host ? host->rrd_history_entries : 1); + ((PARSER_USER_OBJECT *)user)->st_exists = 1; + + return PARSER_RC_OK; +} + +PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm, + long multiplier, long divisor, char *options, RRD_ALGORITHM algorithm_type) +{ + struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; + UNUSED(user); + UNUSED(options); + UNUSED(algorithm); + UNUSED(st); + + if (unlikely(uuid_is_null(state->chart_uuid))) { + debug(D_METADATALOG, "Ignoring dimension belonging to missing or ignored chart."); + info("Ignoring dimension belonging to missing or ignored chart."); + return PARSER_RC_OK; + } + + if (unlikely(uuid_is_null(state->uuid))) { + debug(D_METADATALOG, "Ignoring dimension without unknown UUID"); + info("Ignoring dimension without unknown UUID"); + return PARSER_RC_OK; + } + + (void) sql_store_dimension(&state->uuid, &state->chart_uuid, id, name, multiplier, divisor, algorithm_type); + uuid_clear(state->uuid); /* Consume UUID */ + + return PARSER_RC_OK; +} + +PARSER_RC metalog_pluginsd_guid_action(void *user, uuid_t *uuid) +{ + struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; + + uuid_copy(state->uuid, *uuid); + + return PARSER_RC_OK; +} + +PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid) +{ + struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; + + int rc = find_uuid_type(uuid); + + if (rc == 1) { + uuid_copy(state->host_uuid, *uuid); + ((PARSER_USER_OBJECT *)user)->st_exists = 0; + ((PARSER_USER_OBJECT *)user)->host_exists = 1; + } else if (rc == 2) { + uuid_copy(state->chart_uuid, *uuid); + ((PARSER_USER_OBJECT *)user)->st_exists = 1; + } else + uuid_copy(state->uuid, *uuid); + + return PARSER_RC_OK; +} + +PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid) +{ + UNUSED(user); + UNUSED(uuid); + + return PARSER_RC_OK; +} + +void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct metalog_instance *ctx) +{ + state->ctx = ctx; + state->skip_record = 0; + uuid_clear(state->uuid); + state->metalogfile = NULL; +} diff --git a/database/engine/metadata_log/metalogpluginsd.h b/database/engine/metadata_log/metalogpluginsd.h new file mode 100644 index 0000000..96808aa --- /dev/null +++ b/database/engine/metadata_log/metalogpluginsd.h @@ -0,0 +1,33 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_METALOGPLUGINSD_H +#define NETDATA_METALOGPLUGINSD_H + +#include "../../../collectors/plugins.d/pluginsd_parser.h" +#include "../../../collectors/plugins.d/plugins_d.h" +#include "../../../parser/parser.h" + +struct metalog_pluginsd_state { + struct metalog_instance *ctx; + uuid_t uuid; + uuid_t host_uuid; + uuid_t chart_uuid; + uint8_t skip_record; /* skip this record due to errors in parsing */ + struct metadata_logfile *metalogfile; /* current metadata log file being replayed */ +}; + +extern void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct metalog_instance *ctx); + +extern PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, + char *context, char *title, char *units, char *plugin, char *module, + int priority, int update_every, RRDSET_TYPE chart_type, char *options); +extern PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm, + long multiplier, long divisor, char *options, + RRD_ALGORITHM algorithm_type); +extern PARSER_RC metalog_pluginsd_guid_action(void *user, uuid_t *uuid); +extern PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid); +extern PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid); +extern PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION *plugins_action); +extern PARSER_RC metalog_pluginsd_host_action(void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone, char *tags); + +#endif /* NETDATA_METALOGPLUGINSD_H */ diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c new file mode 100644 index 0000000..a182071 --- /dev/null +++ b/database/engine/pagecache.c @@ -0,0 +1,1178 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "rrdengine.h" + +/* Forward declerations */ +static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx); + +/* always inserts into tail */ +static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + if (likely(NULL != pg_cache->replaceQ.tail)) { + pg_cache_descr->prev = pg_cache->replaceQ.tail; + pg_cache->replaceQ.tail->next = pg_cache_descr; + } + if (unlikely(NULL == pg_cache->replaceQ.head)) { + pg_cache->replaceQ.head = pg_cache_descr; + } + pg_cache->replaceQ.tail = pg_cache_descr; +} + +static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr, *prev, *next; + + prev = pg_cache_descr->prev; + next = pg_cache_descr->next; + + if (likely(NULL != prev)) { + prev->next = next; + } + if (likely(NULL != next)) { + next->prev = prev; + } + if (unlikely(pg_cache_descr == pg_cache->replaceQ.head)) { + pg_cache->replaceQ.head = next; + } + if (unlikely(pg_cache_descr == pg_cache->replaceQ.tail)) { + pg_cache->replaceQ.tail = prev; + } + pg_cache_descr->prev = pg_cache_descr->next = NULL; +} + +void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + uv_rwlock_wrlock(&pg_cache->replaceQ.lock); + pg_cache_replaceQ_insert_unsafe(ctx, descr); + uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); +} + +void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + uv_rwlock_wrlock(&pg_cache->replaceQ.lock); + pg_cache_replaceQ_delete_unsafe(ctx, descr); + uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); +} +void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + uv_rwlock_wrlock(&pg_cache->replaceQ.lock); + pg_cache_replaceQ_delete_unsafe(ctx, descr); + pg_cache_replaceQ_insert_unsafe(ctx, descr); + uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); +} + +struct rrdeng_page_descr *pg_cache_create_descr(void) +{ + struct rrdeng_page_descr *descr; + + descr = mallocz(sizeof(*descr)); + descr->page_length = 0; + descr->start_time = INVALID_TIME; + descr->end_time = INVALID_TIME; + descr->id = NULL; + descr->extent = NULL; + descr->pg_cache_descr_state = 0; + descr->pg_cache_descr = NULL; + + return descr; +} + +/* The caller must hold page descriptor lock. */ +void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr) +{ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + if (pg_cache_descr->waiters) + uv_cond_broadcast(&pg_cache_descr->cond); +} + +void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_wake_up_waiters_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); +} + +/* + * The caller must hold page descriptor lock. + * The lock will be released and re-acquired. The descriptor is not guaranteed + * to exist after this function returns. + */ +void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr) +{ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + ++pg_cache_descr->waiters; + uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex); + --pg_cache_descr->waiters; +} + +/* + * The caller must hold page descriptor lock. + * The lock will be released and re-acquired. The descriptor is not guaranteed + * to exist after this function returns. + * Returns UV_ETIMEDOUT if timeout_sec seconds pass. + */ +int pg_cache_timedwait_event_unsafe(struct rrdeng_page_descr *descr, uint64_t timeout_sec) +{ + int ret; + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + ++pg_cache_descr->waiters; + ret = uv_cond_timedwait(&pg_cache_descr->cond, &pg_cache_descr->mutex, timeout_sec * NSEC_PER_SEC); + --pg_cache_descr->waiters; + + return ret; +} + +/* + * Returns page flags. + * The lock will be released and re-acquired. The descriptor is not guaranteed + * to exist after this function returns. + */ +unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + unsigned long flags; + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_wait_event_unsafe(descr); + flags = pg_cache_descr->flags; + rrdeng_page_descr_mutex_unlock(ctx, descr); + + return flags; +} + +/* + * The caller must hold page descriptor lock. + */ +int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access) +{ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) || + (exclusive_access && pg_cache_descr->refcnt)) { + return 0; + } + + return 1; +} + +/* + * The caller must hold page descriptor lock. + * Gets a reference to the page descriptor. + * Returns 1 on success and 0 on failure. + */ +int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access) +{ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + if (!pg_cache_can_get_unsafe(descr, exclusive_access)) + return 0; + + if (exclusive_access) + pg_cache_descr->flags |= RRD_PAGE_LOCKED; + ++pg_cache_descr->refcnt; + + return 1; +} + +/* + * The caller must hold the page descriptor lock. + * This function may block doing cleanup. + */ +void pg_cache_put_unsafe(struct rrdeng_page_descr *descr) +{ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + pg_cache_descr->flags &= ~RRD_PAGE_LOCKED; + if (0 == --pg_cache_descr->refcnt) { + pg_cache_wake_up_waiters_unsafe(descr); + } +} + +/* + * This function may block doing cleanup. + */ +void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_put_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); +} + +/* The caller must hold the page cache lock */ +static void pg_cache_release_pages_unsafe(struct rrdengine_instance *ctx, unsigned number) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + pg_cache->populated_pages -= number; +} + +static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned number) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + pg_cache_release_pages_unsafe(ctx, number); + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); +} + +/* + * This function returns the maximum number of pages allowed in the page cache. + */ +unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx) +{ + /* it's twice the number of producers since we pin 2 pages per producer */ + return ctx->max_cache_pages + 2 * (unsigned long)ctx->metric_API_max_producers; +} + +/* + * This function returns the low watermark number of pages in the page cache. The page cache should strive to keep the + * number of pages below that number. + */ +unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx) +{ + /* it's twice the number of producers since we pin 2 pages per producer */ + return ctx->cache_pages_low_watermark + 2 * (unsigned long)ctx->metric_API_max_producers; +} + +/* + * This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page + * cache. + */ +unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx) +{ + /* We remove the active pages of the producers from the calculation and only allow the extra pinned pages */ + return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers; +} + +/* + * This function will block until it reserves #number populated pages. + * It will trigger evictions or dirty page flushing if the pg_cache_hard_limit() limit is hit. + */ +static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned failures = 0; + const unsigned FAILURES_CEILING = 10; /* truncates exponential backoff to (2^FAILURES_CEILING x slot) */ + unsigned long exp_backoff_slot_usec = USEC_PER_MS * 10; + + assert(number < ctx->max_cache_pages); + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + if (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) + debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==", + number); + while (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) { + + if (!pg_cache_try_evict_one_page_unsafe(ctx)) { + /* failed to evict */ + struct completion compl; + struct rrdeng_cmd cmd; + + ++failures; + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); + + init_completion(&compl); + cmd.opcode = RRDENG_FLUSH_PAGES; + cmd.completion = &compl; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + /* wait for some pages to be flushed */ + debug(D_RRDENGINE, "%s: waiting for pages to be written to disk before evicting.", __func__); + wait_for_completion(&compl); + destroy_completion(&compl); + + if (unlikely(failures > 1)) { + unsigned long slots; + /* exponential backoff */ + slots = random() % (2LU << MIN(failures, FAILURES_CEILING)); + (void)sleep_usec(slots * exp_backoff_slot_usec); + } + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + } + } + pg_cache->populated_pages += number; + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); +} + +/* + * This function will attempt to reserve #number populated pages. + * It may trigger evictions if the pg_cache_soft_limit() limit is hit. + * Returns 0 on failure and 1 on success. + */ +static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned number) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned count = 0; + int ret = 0; + + assert(number < ctx->max_cache_pages); + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + if (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1) { + debug(D_RRDENGINE, + "==Page cache full. Trying to reserve %u pages.==", + number); + do { + if (!pg_cache_try_evict_one_page_unsafe(ctx)) + break; + ++count; + } while (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1); + debug(D_RRDENGINE, "Evicted %u pages.", count); + } + + if (pg_cache->populated_pages + number < pg_cache_hard_limit(ctx) + 1) { + pg_cache->populated_pages += number; + ret = 1; /* success */ + } + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); + + return ret; +} + +/* The caller must hold the page cache and the page descriptor locks in that order */ +static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + freez(pg_cache_descr->page); + pg_cache_descr->page = NULL; + pg_cache_descr->flags &= ~RRD_PAGE_POPULATED; + pg_cache_release_pages_unsafe(ctx, 1); + ++ctx->stats.pg_cache_evictions; +} + +/* + * The caller must hold the page cache lock. + * Lock order: page cache -> replaceQ -> page descriptor + * This function iterates all pages and tries to evict one. + * If it fails it sets in_flight_descr to the oldest descriptor that has write-back in progress, + * or it sets it to NULL if no write-back is in progress. + * + * Returns 1 on success and 0 on failure. + */ +static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned long old_flags; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr = NULL; + + uv_rwlock_wrlock(&pg_cache->replaceQ.lock); + for (pg_cache_descr = pg_cache->replaceQ.head ; NULL != pg_cache_descr ; pg_cache_descr = pg_cache_descr->next) { + descr = pg_cache_descr->descr; + + rrdeng_page_descr_mutex_lock(ctx, descr); + old_flags = pg_cache_descr->flags; + if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) { + /* must evict */ + pg_cache_evict_unsafe(ctx, descr); + pg_cache_put_unsafe(descr); + pg_cache_replaceQ_delete_unsafe(ctx, descr); + + rrdeng_page_descr_mutex_unlock(ctx, descr); + uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); + + rrdeng_try_deallocate_pg_cache_descr(ctx, descr); + + return 1; + } + rrdeng_page_descr_mutex_unlock(ctx, descr); + } + uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); + + /* failed to evict */ + return 0; +} + +/** + * Deletes a page from the database. + * Callers of this function need to make sure they're not deleting the same descriptor concurrently. + * @param ctx is the database instance. + * @param descr is the page descriptor. + * @param remove_dirty must be non-zero if the page to be deleted is dirty. + * @param is_exclusive_holder must be non-zero if the caller holds an exclusive page reference. + * @param metric_id is set to the metric the page belongs to, if it's safe to delete the metric and metric_id is not + * NULL. Otherwise, metric_id is not set. + * @return 1 if it's safe to delete the metric, 0 otherwise. + */ +uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty, + uint8_t is_exclusive_holder, uuid_t *metric_id) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct page_cache_descr *pg_cache_descr = NULL; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + int ret; + uint8_t can_delete_metric = 0; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t)); + fatal_assert(NULL != PValue); + page_index = *PValue; + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + + uv_rwlock_wrlock(&page_index->lock); + ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0); + if (unlikely(0 == ret)) { + uv_rwlock_wrunlock(&page_index->lock); + error("Page under deletion was not in index."); + if (unlikely(debug_flags & D_RRDENGINE)) { + print_page_descr(descr); + } + goto destroy; + } + --page_index->page_count; + if (!page_index->writers && !page_index->page_count) { + can_delete_metric = 1; + if (metric_id) { + memcpy(metric_id, page_index->id, sizeof(uuid_t)); + } + } + uv_rwlock_wrunlock(&page_index->lock); + fatal_assert(1 == ret); + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + ++ctx->stats.pg_cache_deletions; + --pg_cache->page_descriptors; + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + if (!is_exclusive_holder) { + /* If we don't hold an exclusive page reference get one */ + while (!pg_cache_try_get_unsafe(descr, 1)) { + debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + pg_cache_wait_event_unsafe(descr); + } + } + if (remove_dirty) { + pg_cache_descr->flags &= ~RRD_PAGE_DIRTY; + } else { + /* even a locked page could be dirty */ + while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) { + debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + pg_cache_wait_event_unsafe(descr); + } + } + rrdeng_page_descr_mutex_unlock(ctx, descr); + + if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { + /* only after locking can it be safely deleted from LRU */ + pg_cache_replaceQ_delete(ctx, descr); + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + pg_cache_evict_unsafe(ctx, descr); + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); + } + pg_cache_put(ctx, descr); + rrdeng_try_deallocate_pg_cache_descr(ctx, descr); + while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) { + rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */ + (void)sleep_usec(1000); /* 1 msec */ + } +destroy: + freez(descr); + pg_cache_update_metric_times(page_index); + + return can_delete_metric; +} + +static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time) +{ + usec_t pg_start, pg_end; + + pg_start = descr->start_time; + pg_end = descr->end_time; + + return (pg_start < start_time && pg_end >= start_time) || + (pg_start >= start_time && pg_start <= end_time); +} + +static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time) +{ + return (point_in_time >= descr->start_time && point_in_time <= descr->end_time); +} + +/* The caller must hold the page index lock */ +static inline struct rrdeng_page_descr * + find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time) +{ + struct rrdeng_page_descr *descr = NULL; + Pvoid_t *PValue; + Word_t Index; + + Index = (Word_t)(start_time / USEC_PER_SEC); + PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + if (is_page_in_time_range(descr, start_time, end_time)) { + return descr; + } + } + + Index = (Word_t)(start_time / USEC_PER_SEC); + PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + if (is_page_in_time_range(descr, start_time, end_time)) { + return descr; + } + } + + return NULL; +} + +/* Update metric oldest and latest timestamps efficiently when adding new values */ +void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr) +{ + usec_t oldest_time = page_index->oldest_time; + usec_t latest_time = page_index->latest_time; + + if (unlikely(oldest_time == INVALID_TIME || descr->start_time < oldest_time)) { + page_index->oldest_time = descr->start_time; + } + if (likely(descr->end_time > latest_time || latest_time == INVALID_TIME)) { + page_index->latest_time = descr->end_time; + } +} + +/* Update metric oldest and latest timestamps when removing old values */ +void pg_cache_update_metric_times(struct pg_cache_page_index *page_index) +{ + Pvoid_t *firstPValue, *lastPValue; + Word_t firstIndex, lastIndex; + struct rrdeng_page_descr *descr; + usec_t oldest_time = INVALID_TIME; + usec_t latest_time = INVALID_TIME; + + uv_rwlock_rdlock(&page_index->lock); + /* Find first page in range */ + firstIndex = (Word_t)0; + firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0); + if (likely(NULL != firstPValue)) { + descr = *firstPValue; + oldest_time = descr->start_time; + } + lastIndex = (Word_t)-1; + lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0); + if (likely(NULL != lastPValue)) { + descr = *lastPValue; + latest_time = descr->end_time; + } + uv_rwlock_rdunlock(&page_index->lock); + + if (unlikely(NULL == firstPValue)) { + fatal_assert(NULL == lastPValue); + page_index->oldest_time = page_index->latest_time = INVALID_TIME; + return; + } + page_index->oldest_time = oldest_time; + page_index->latest_time = latest_time; +} + +/* If index is NULL lookup by UUID (descr->id) */ +void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, + struct rrdeng_page_descr *descr) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index; + unsigned long pg_cache_descr_state = descr->pg_cache_descr_state; + + if (0 != pg_cache_descr_state) { + /* there is page cache descriptor pre-allocated state */ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + fatal_assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED); + if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { + pg_cache_reserve_pages(ctx, 1); + if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY)) + pg_cache_replaceQ_insert(ctx, descr); + } + } + + if (unlikely(NULL == index)) { + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t)); + fatal_assert(NULL != PValue); + page_index = *PValue; + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + } else { + page_index = index; + } + + uv_rwlock_wrlock(&page_index->lock); + PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0); + *PValue = descr; + ++page_index->page_count; + pg_cache_add_new_metric_time(page_index, descr); + uv_rwlock_wrunlock(&page_index->lock); + + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + ++ctx->stats.pg_cache_insertions; + ++pg_cache->page_descriptors; + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); +} + +usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + return INVALID_TIME; + } + + uv_rwlock_rdlock(&page_index->lock); + descr = find_first_page_in_time_range(page_index, start_time, end_time); + if (NULL == descr) { + uv_rwlock_rdunlock(&page_index->lock); + return INVALID_TIME; + } + uv_rwlock_rdunlock(&page_index->lock); + return descr->start_time; +} + +/** + * Return page information for the first page before point_in_time that satisfies the filter. + * @param ctx DB context + * @param page_index page index of a metric + * @param point_in_time the pages that are searched must be older than this timestamp + * @param filter decides if the page satisfies the caller's criteria + * @param page_info the result of the search is set in this pointer + */ +void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index, + usec_t point_in_time, pg_cache_page_info_filter_t *filter, + struct rrdeng_page_info *page_info) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL; + Pvoid_t *PValue; + Word_t Index; + + (void)pg_cache; + fatal_assert(NULL != page_index); + + Index = (Word_t)(point_in_time / USEC_PER_SEC); + uv_rwlock_rdlock(&page_index->lock); + do { + PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0); + descr = unlikely(NULL == PValue) ? NULL : *PValue; + } while (descr != NULL && !filter(descr)); + if (unlikely(NULL == descr)) { + page_info->page_length = 0; + page_info->start_time = INVALID_TIME; + page_info->end_time = INVALID_TIME; + } else { + page_info->page_length = descr->page_length; + page_info->start_time = descr->start_time; + page_info->end_time = descr->end_time; + } + uv_rwlock_rdunlock(&page_index->lock); +} +/** + * Searches for pages in a time range and triggers disk I/O if necessary and possible. + * Does not get a reference. + * @param ctx DB context + * @param id UUID + * @param start_time inclusive starting time in usec + * @param end_time inclusive ending time in usec + * @param page_info_arrayp It allocates (*page_arrayp) and populates it with information of pages that overlap + * with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez(). + * If page_info_arrayp is set to NULL nothing was allocated. + * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID. + * @return the number of pages that overlap with the time range [start_time,end_time]. + */ +unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time, + struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES]; + struct page_cache_descr *pg_cache_descr = NULL; + unsigned i, j, k, preload_count, count, page_info_array_max_size; + unsigned long flags; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + Word_t Index; + uint8_t failed_to_reserve; + + fatal_assert(NULL != ret_page_indexp); + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + *ret_page_indexp = page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__); + *ret_page_indexp = NULL; + return 0; + } + + uv_rwlock_rdlock(&page_index->lock); + descr = find_first_page_in_time_range(page_index, start_time, end_time); + if (NULL == descr) { + uv_rwlock_rdunlock(&page_index->lock); + debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__); + *ret_page_indexp = NULL; + return 0; + } else { + Index = (Word_t)(descr->start_time / USEC_PER_SEC); + } + if (page_info_arrayp) { + page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info); + *page_info_arrayp = mallocz(page_info_array_max_size); + } + + for (count = 0, preload_count = 0 ; + descr != NULL && is_page_in_time_range(descr, start_time, end_time) ; + PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue) { + /* Iterate all pages in range */ + + if (unlikely(0 == descr->page_length)) + continue; + if (page_info_arrayp) { + if (unlikely(count >= page_info_array_max_size / sizeof(struct rrdeng_page_info))) { + page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info); + *page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size); + } + (*page_info_arrayp)[count].start_time = descr->start_time; + (*page_info_arrayp)[count].end_time = descr->end_time; + (*page_info_arrayp)[count].page_length = descr->page_length; + } + ++count; + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + flags = pg_cache_descr->flags; + if (pg_cache_can_get_unsafe(descr, 0)) { + if (flags & RRD_PAGE_POPULATED) { + /* success */ + rrdeng_page_descr_mutex_unlock(ctx, descr); + debug(D_RRDENGINE, "%s: Page was found in memory.", __func__); + continue; + } + } + if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { + preload_array[preload_count++] = descr; + if (PAGE_CACHE_MAX_PRELOAD_PAGES == preload_count) { + rrdeng_page_descr_mutex_unlock(ctx, descr); + break; + } + } + rrdeng_page_descr_mutex_unlock(ctx, descr); + + } + uv_rwlock_rdunlock(&page_index->lock); + + failed_to_reserve = 0; + for (i = 0 ; i < preload_count && !failed_to_reserve ; ++i) { + struct rrdeng_cmd cmd; + struct rrdeng_page_descr *next; + + descr = preload_array[i]; + if (NULL == descr) { + continue; + } + if (!pg_cache_try_reserve_pages(ctx, 1)) { + failed_to_reserve = 1; + break; + } + cmd.opcode = RRDENG_READ_EXTENT; + cmd.read_extent.page_cache_descr[0] = descr; + /* don't use this page again */ + preload_array[i] = NULL; + for (j = 0, k = 1 ; j < preload_count ; ++j) { + next = preload_array[j]; + if (NULL == next) { + continue; + } + if (descr->extent == next->extent) { + /* same extent, consolidate */ + if (!pg_cache_try_reserve_pages(ctx, 1)) { + failed_to_reserve = 1; + break; + } + cmd.read_extent.page_cache_descr[k++] = next; + /* don't use this page again */ + preload_array[j] = NULL; + } + } + cmd.read_extent.page_count = k; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + } + if (failed_to_reserve) { + debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__); + for (i = 0 ; i < preload_count ; ++i) { + descr = preload_array[i]; + if (NULL == descr) { + continue; + } + pg_cache_put(ctx, descr); + } + } + if (!preload_count) { + /* no such page */ + debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__); + } + if (unlikely(0 == count && page_info_arrayp)) { + freez(*page_info_arrayp); + *page_info_arrayp = NULL; + } + return count; +} + +/* + * Searches for a page and gets a reference. + * When point_in_time is INVALID_TIME get any page. + * If index is NULL lookup by UUID (id). + */ +struct rrdeng_page_descr * + pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, + usec_t point_in_time) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL; + struct page_cache_descr *pg_cache_descr = NULL; + unsigned long flags; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + Word_t Index; + uint8_t page_not_in_cache; + + if (unlikely(NULL == index)) { + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + return NULL; + } + } else { + page_index = index; + } + pg_cache_reserve_pages(ctx, 1); + + page_not_in_cache = 0; + uv_rwlock_rdlock(&page_index->lock); + while (1) { + Index = (Word_t)(point_in_time / USEC_PER_SEC); + PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + } + if (NULL == PValue || + 0 == descr->page_length || + (INVALID_TIME != point_in_time && + !is_point_in_time_in_page(descr, point_in_time))) { + /* non-empty page not found */ + uv_rwlock_rdunlock(&page_index->lock); + + pg_cache_release_pages(ctx, 1); + return NULL; + } + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + flags = pg_cache_descr->flags; + if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) { + /* success */ + rrdeng_page_descr_mutex_unlock(ctx, descr); + debug(D_RRDENGINE, "%s: Page was found in memory.", __func__); + break; + } + if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { + struct rrdeng_cmd cmd; + + uv_rwlock_rdunlock(&page_index->lock); + + cmd.opcode = RRDENG_READ_PAGE; + cmd.read_page.page_cache_descr = descr; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + + debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) { + pg_cache_wait_event_unsafe(descr); + } + /* success */ + /* Downgrade exclusive reference to allow other readers */ + pg_cache_descr->flags &= ~RRD_PAGE_LOCKED; + pg_cache_wake_up_waiters_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); + rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); + return descr; + } + uv_rwlock_rdunlock(&page_index->lock); + debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + if (!(flags & RRD_PAGE_POPULATED)) + page_not_in_cache = 1; + pg_cache_wait_event_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); + + /* reset scan to find again */ + uv_rwlock_rdlock(&page_index->lock); + } + uv_rwlock_rdunlock(&page_index->lock); + + if (!(flags & RRD_PAGE_DIRTY)) + pg_cache_replaceQ_set_hot(ctx, descr); + pg_cache_release_pages(ctx, 1); + if (page_not_in_cache) + rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); + else + rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1); + return descr; +} + +/* + * Searches for the first page between start_time and end_time and gets a reference. + * start_time and end_time are inclusive. + * If index is NULL lookup by UUID (id). + */ +struct rrdeng_page_descr * +pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, + usec_t start_time, usec_t end_time) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL; + struct page_cache_descr *pg_cache_descr = NULL; + unsigned long flags; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + uint8_t page_not_in_cache; + + if (unlikely(NULL == index)) { + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + return NULL; + } + } else { + page_index = index; + } + pg_cache_reserve_pages(ctx, 1); + + page_not_in_cache = 0; + uv_rwlock_rdlock(&page_index->lock); + while (1) { + descr = find_first_page_in_time_range(page_index, start_time, end_time); + if (NULL == descr || 0 == descr->page_length) { + /* non-empty page not found */ + uv_rwlock_rdunlock(&page_index->lock); + + pg_cache_release_pages(ctx, 1); + return NULL; + } + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + flags = pg_cache_descr->flags; + if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) { + /* success */ + rrdeng_page_descr_mutex_unlock(ctx, descr); + debug(D_RRDENGINE, "%s: Page was found in memory.", __func__); + break; + } + if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { + struct rrdeng_cmd cmd; + + uv_rwlock_rdunlock(&page_index->lock); + + cmd.opcode = RRDENG_READ_PAGE; + cmd.read_page.page_cache_descr = descr; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + + debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) { + pg_cache_wait_event_unsafe(descr); + } + /* success */ + /* Downgrade exclusive reference to allow other readers */ + pg_cache_descr->flags &= ~RRD_PAGE_LOCKED; + pg_cache_wake_up_waiters_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); + rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); + return descr; + } + uv_rwlock_rdunlock(&page_index->lock); + debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + if (!(flags & RRD_PAGE_POPULATED)) + page_not_in_cache = 1; + pg_cache_wait_event_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); + + /* reset scan to find again */ + uv_rwlock_rdlock(&page_index->lock); + } + uv_rwlock_rdunlock(&page_index->lock); + + if (!(flags & RRD_PAGE_DIRTY)) + pg_cache_replaceQ_set_hot(ctx, descr); + pg_cache_release_pages(ctx, 1); + if (page_not_in_cache) + rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); + else + rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1); + return descr; +} + +struct pg_cache_page_index *create_page_index(uuid_t *id) +{ + struct pg_cache_page_index *page_index; + + page_index = mallocz(sizeof(*page_index)); + page_index->JudyL_array = (Pvoid_t) NULL; + uuid_copy(page_index->id, *id); + fatal_assert(0 == uv_rwlock_init(&page_index->lock)); + page_index->oldest_time = INVALID_TIME; + page_index->latest_time = INVALID_TIME; + page_index->prev = NULL; + page_index->page_count = 0; + page_index->writers = 0; + + return page_index; +} + +static void init_metrics_index(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL; + pg_cache->metrics_index.last_page_index = NULL; + fatal_assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock)); +} + +static void init_replaceQ(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + pg_cache->replaceQ.head = NULL; + pg_cache->replaceQ.tail = NULL; + fatal_assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock)); +} + +static void init_committed_page_index(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + pg_cache->committed_page_index.JudyL_array = (Pvoid_t) NULL; + fatal_assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock)); + pg_cache->committed_page_index.latest_corr_id = 0; + pg_cache->committed_page_index.nr_committed_pages = 0; +} + +void init_page_cache(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + + pg_cache->page_descriptors = 0; + pg_cache->populated_pages = 0; + fatal_assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock)); + + init_metrics_index(ctx); + init_replaceQ(ctx); + init_committed_page_index(ctx); +} + +void free_page_cache(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + Word_t ret_Judy, bytes_freed = 0; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index, *prev_page_index; + Word_t Index; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + + /* Free committed page index */ + ret_Judy = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0); + fatal_assert(NULL == pg_cache->committed_page_index.JudyL_array); + bytes_freed += ret_Judy; + + for (page_index = pg_cache->metrics_index.last_page_index ; + page_index != NULL ; + page_index = prev_page_index) { + prev_page_index = page_index->prev; + + /* Find first page in range */ + Index = (Word_t) 0; + PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); + descr = unlikely(NULL == PValue) ? NULL : *PValue; + + while (descr != NULL) { + /* Iterate all page descriptors of this metric */ + + if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) { + /* Check rrdenglocking.c */ + pg_cache_descr = descr->pg_cache_descr; + if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { + freez(pg_cache_descr->page); + bytes_freed += RRDENG_BLOCK_SIZE; + } + rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); + bytes_freed += sizeof(*pg_cache_descr); + } + freez(descr); + bytes_freed += sizeof(*descr); + + PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0); + descr = unlikely(NULL == PValue) ? NULL : *PValue; + } + + /* Free page index */ + ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0); + fatal_assert(NULL == page_index->JudyL_array); + bytes_freed += ret_Judy; + freez(page_index); + bytes_freed += sizeof(*page_index); + } + /* Free metrics index */ + ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0); + fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array); + bytes_freed += ret_Judy; + + info("Freed %lu bytes of memory from page cache.", bytes_freed); +}
\ No newline at end of file diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h new file mode 100644 index 0000000..31e9739 --- /dev/null +++ b/database/engine/pagecache.h @@ -0,0 +1,231 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_PAGECACHE_H +#define NETDATA_PAGECACHE_H + +#include "rrdengine.h" + +/* Forward declarations */ +struct rrdengine_instance; +struct extent_info; +struct rrdeng_page_descr; + +#define INVALID_TIME (0) + +/* Page flags */ +#define RRD_PAGE_DIRTY (1LU << 0) +#define RRD_PAGE_LOCKED (1LU << 1) +#define RRD_PAGE_READ_PENDING (1LU << 2) +#define RRD_PAGE_WRITE_PENDING (1LU << 3) +#define RRD_PAGE_POPULATED (1LU << 4) + +struct page_cache_descr { + struct rrdeng_page_descr *descr; /* parent descriptor */ + void *page; + unsigned long flags; + struct page_cache_descr *prev; /* LRU */ + struct page_cache_descr *next; /* LRU */ + + unsigned refcnt; + uv_mutex_t mutex; /* always take it after the page cache lock or after the commit lock */ + uv_cond_t cond; + unsigned waiters; +}; + +/* Page cache descriptor flags, state = 0 means no descriptor */ +#define PG_CACHE_DESCR_ALLOCATED (1LU << 0) +#define PG_CACHE_DESCR_DESTROY (1LU << 1) +#define PG_CACHE_DESCR_LOCKED (1LU << 2) +#define PG_CACHE_DESCR_SHIFT (3) +#define PG_CACHE_DESCR_USERS_MASK (((unsigned long)-1) << PG_CACHE_DESCR_SHIFT) +#define PG_CACHE_DESCR_FLAGS_MASK (((unsigned long)-1) >> (BITS_PER_ULONG - PG_CACHE_DESCR_SHIFT)) + +/* + * Page cache descriptor state bits (works for both 32-bit and 64-bit architectures): + * + * 63 ... 31 ... 3 | 2 | 1 | 0| + * -----------------------------+------------+------------+-----------| + * number of descriptor users | DESTROY | LOCKED | ALLOCATED | + */ +struct rrdeng_page_descr { + uuid_t *id; /* never changes */ + struct extent_info *extent; + + /* points to ephemeral page cache descriptor if the page resides in the cache */ + struct page_cache_descr *pg_cache_descr; + + /* Compare-And-Swap target for page cache descriptor allocation algorithm */ + volatile unsigned long pg_cache_descr_state; + + /* page information */ + usec_t start_time; + usec_t end_time; + uint32_t page_length; +}; + +#define PAGE_INFO_SCRATCH_SZ (8) +struct rrdeng_page_info { + uint8_t scratch[PAGE_INFO_SCRATCH_SZ]; /* scratch area to be used by page-cache users */ + + usec_t start_time; + usec_t end_time; + uint32_t page_length; +}; + +/* returns 1 for success, 0 for failure */ +typedef int pg_cache_page_info_filter_t(struct rrdeng_page_descr *); + +#define PAGE_CACHE_MAX_PRELOAD_PAGES (256) + +/* maps time ranges to pages */ +struct pg_cache_page_index { + uuid_t id; + /* + * care: JudyL_array indices are converted from useconds to seconds to fit in one word in 32-bit architectures + * TODO: examine if we want to support better granularity than seconds + */ + Pvoid_t JudyL_array; + Word_t page_count; + unsigned short writers; + uv_rwlock_t lock; + + /* + * Only one effective writer, data deletion workqueue. + * It's also written during the DB loading phase. + */ + usec_t oldest_time; + + /* + * Only one effective writer, data collection thread. + * It's also written by the data deletion workqueue when data collection is disabled for this metric. + */ + usec_t latest_time; + + struct pg_cache_page_index *prev; +}; + +/* maps UUIDs to page indices */ +struct pg_cache_metrics_index { + uv_rwlock_t lock; + Pvoid_t JudyHS_array; + struct pg_cache_page_index *last_page_index; +}; + +/* gathers dirty pages to be written on disk */ +struct pg_cache_committed_page_index { + uv_rwlock_t lock; + + Pvoid_t JudyL_array; + + /* + * Dirty page correlation ID is a hint. Dirty pages that are correlated should have + * a small correlation ID difference. Dirty pages in memory should never have the + * same ID at the same time for correctness. + */ + Word_t latest_corr_id; + + unsigned nr_committed_pages; +}; + +/* + * Gathers populated pages to be evicted. + * Relies on page cache descriptors being there as it uses their memory. + */ +struct pg_cache_replaceQ { + uv_rwlock_t lock; /* LRU lock */ + + struct page_cache_descr *head; /* LRU */ + struct page_cache_descr *tail; /* MRU */ +}; + +struct page_cache { /* TODO: add statistics */ + uv_rwlock_t pg_cache_rwlock; /* page cache lock */ + + struct pg_cache_metrics_index metrics_index; + struct pg_cache_committed_page_index committed_page_index; + struct pg_cache_replaceQ replaceQ; + + unsigned page_descriptors; + unsigned populated_pages; +}; + +extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr); +extern void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +extern void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr); +extern unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +extern void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr); +extern void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr); +extern void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr); +extern struct rrdeng_page_descr *pg_cache_create_descr(void); +extern int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access); +extern void pg_cache_put_unsafe(struct rrdeng_page_descr *descr); +extern void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, + struct rrdeng_page_descr *descr); +extern uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, + uint8_t remove_dirty, uint8_t is_exclusive_holder, uuid_t *metric_id); +extern usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, + usec_t start_time, usec_t end_time); +extern void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index, + usec_t point_in_time, pg_cache_page_info_filter_t *filter, + struct rrdeng_page_info *page_info); +extern unsigned + pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time, + struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp); +extern struct rrdeng_page_descr * + pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, + usec_t point_in_time); +extern struct rrdeng_page_descr * + pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, + usec_t start_time, usec_t end_time); +extern struct pg_cache_page_index *create_page_index(uuid_t *id); +extern void init_page_cache(struct rrdengine_instance *ctx); +extern void free_page_cache(struct rrdengine_instance *ctx); +extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr); +extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index); +extern unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx); +extern unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx); +extern unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx); + +static inline void + pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_timep, uint32_t *page_lengthp) +{ + usec_t end_time, old_end_time; + uint32_t page_length; + + if (NULL == descr->extent) { + /* this page is currently being modified, get consistent info locklessly */ + do { + end_time = descr->end_time; + __sync_synchronize(); + old_end_time = end_time; + page_length = descr->page_length; + __sync_synchronize(); + end_time = descr->end_time; + __sync_synchronize(); + } while ((end_time != old_end_time || (end_time & 1) != 0)); + + *end_timep = end_time; + *page_lengthp = page_length; + } else { + *end_timep = descr->end_time; + *page_lengthp = descr->page_length; + } +} + +/* The caller must hold a reference to the page and must have already set the new data */ +static inline void pg_cache_atomic_set_pg_info(struct rrdeng_page_descr *descr, usec_t end_time, uint32_t page_length) +{ + fatal_assert(!(end_time & 1)); + __sync_synchronize(); + descr->end_time |= 1; /* mark start of uncertainty period by adding 1 microsecond */ + __sync_synchronize(); + descr->page_length = page_length; + __sync_synchronize(); + descr->end_time = end_time; /* mark end of uncertainty period */ +} + +#endif /* NETDATA_PAGECACHE_H */ diff --git a/database/engine/rrddiskprotocol.h b/database/engine/rrddiskprotocol.h new file mode 100644 index 0000000..db47af5 --- /dev/null +++ b/database/engine/rrddiskprotocol.h @@ -0,0 +1,119 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDDISKPROTOCOL_H +#define NETDATA_RRDDISKPROTOCOL_H + +#define RRDENG_BLOCK_SIZE (4096) +#define RRDFILE_ALIGNMENT RRDENG_BLOCK_SIZE + +#define RRDENG_MAGIC_SZ (32) +#define RRDENG_DF_MAGIC "netdata-data-file" +#define RRDENG_JF_MAGIC "netdata-journal-file" + +#define RRDENG_VER_SZ (16) +#define RRDENG_DF_VER "1.0" +#define RRDENG_JF_VER "1.0" + +#define UUID_SZ (16) +#define CHECKSUM_SZ (4) /* CRC32 */ + +#define RRD_NO_COMPRESSION (0) +#define RRD_LZ4 (1) + +#define RRDENG_DF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ + sizeof(uint8_t))) +/* + * Data file persistent super-block + */ +struct rrdeng_df_sb { + char magic_number[RRDENG_MAGIC_SZ]; + char version[RRDENG_VER_SZ]; + uint8_t tier; + uint8_t padding[RRDENG_DF_SB_PADDING_SZ]; +} __attribute__ ((packed)); + +/* + * Page types + */ +#define PAGE_METRICS (0) +#define PAGE_LOGS (1) /* reserved */ + +/* + * Data file page descriptor + */ +struct rrdeng_extent_page_descr { + uint8_t type; + + uint8_t uuid[UUID_SZ]; + uint32_t page_length; + uint64_t start_time; + uint64_t end_time; +} __attribute__ ((packed)); + +/* + * Data file extent header + */ +struct rrdeng_df_extent_header { + uint32_t payload_length; + uint8_t compression_algorithm; + uint8_t number_of_pages; + /* #number_of_pages page descriptors follow */ + struct rrdeng_extent_page_descr descr[]; +} __attribute__ ((packed)); + +/* + * Data file extent trailer + */ +struct rrdeng_df_extent_trailer { + uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */ +} __attribute__ ((packed)); + +#define RRDENG_JF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ)) +/* + * Journal file super-block + */ +struct rrdeng_jf_sb { + char magic_number[RRDENG_MAGIC_SZ]; + char version[RRDENG_VER_SZ]; + uint8_t padding[RRDENG_JF_SB_PADDING_SZ]; +} __attribute__ ((packed)); + +/* + * Transaction record types + */ +#define STORE_PADDING (0) +#define STORE_DATA (1) +#define STORE_LOGS (2) /* reserved */ + +/* + * Journal file transaction record header + */ +struct rrdeng_jf_transaction_header { + /* when set to STORE_PADDING jump to start of next block */ + uint8_t type; + + uint32_t reserved; /* reserved for future use */ + uint64_t id; + uint16_t payload_length; +} __attribute__ ((packed)); + +/* + * Journal file transaction record trailer + */ +struct rrdeng_jf_transaction_trailer { + uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */ +} __attribute__ ((packed)); + +/* + * Journal file STORE_DATA action + */ +struct rrdeng_jf_store_data { + /* data file extent information */ + uint64_t extent_offset; + uint32_t extent_size; + + uint8_t number_of_pages; + /* #number_of_pages page descriptors follow */ + struct rrdeng_extent_page_descr descr[]; +} __attribute__ ((packed)); + +#endif /* NETDATA_RRDDISKPROTOCOL_H */
\ No newline at end of file diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c new file mode 100644 index 0000000..43135ff --- /dev/null +++ b/database/engine/rrdengine.c @@ -0,0 +1,1266 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "rrdengine.h" + +rrdeng_stats_t global_io_errors = 0; +rrdeng_stats_t global_fs_errors = 0; +rrdeng_stats_t rrdeng_reserved_file_descriptors = 0; +rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0; +rrdeng_stats_t global_flushing_pressure_page_deletions = 0; + +static void sanity_check(void) +{ + /* Magic numbers must fit in the super-blocks */ + BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ); + BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ); + + /* Version strings must fit in the super-blocks */ + BUILD_BUG_ON(strlen(RRDENG_DF_VER) > RRDENG_VER_SZ); + BUILD_BUG_ON(strlen(RRDENG_JF_VER) > RRDENG_VER_SZ); + + /* Data file super-block cannot be larger than RRDENG_BLOCK_SIZE */ + BUILD_BUG_ON(RRDENG_DF_SB_PADDING_SZ < 0); + + BUILD_BUG_ON(sizeof(uuid_t) != UUID_SZ); /* check UUID size */ + + /* page count must fit in 8 bits */ + BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255); + + /* extent cache count must fit in 32 bits */ + BUILD_BUG_ON(MAX_CACHED_EXTENTS > 32); + + /* page info scratch space must be able to hold 2 32-bit integers */ + BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t)); +} + +/* always inserts into tail */ +static inline void xt_cache_replaceQ_insert(struct rrdengine_worker_config* wc, + struct extent_cache_element *xt_cache_elem) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + + xt_cache_elem->prev = NULL; + xt_cache_elem->next = NULL; + + if (likely(NULL != xt_cache->replaceQ_tail)) { + xt_cache_elem->prev = xt_cache->replaceQ_tail; + xt_cache->replaceQ_tail->next = xt_cache_elem; + } + if (unlikely(NULL == xt_cache->replaceQ_head)) { + xt_cache->replaceQ_head = xt_cache_elem; + } + xt_cache->replaceQ_tail = xt_cache_elem; +} + +static inline void xt_cache_replaceQ_delete(struct rrdengine_worker_config* wc, + struct extent_cache_element *xt_cache_elem) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *prev, *next; + + prev = xt_cache_elem->prev; + next = xt_cache_elem->next; + + if (likely(NULL != prev)) { + prev->next = next; + } + if (likely(NULL != next)) { + next->prev = prev; + } + if (unlikely(xt_cache_elem == xt_cache->replaceQ_head)) { + xt_cache->replaceQ_head = next; + } + if (unlikely(xt_cache_elem == xt_cache->replaceQ_tail)) { + xt_cache->replaceQ_tail = prev; + } + xt_cache_elem->prev = xt_cache_elem->next = NULL; +} + +static inline void xt_cache_replaceQ_set_hot(struct rrdengine_worker_config* wc, + struct extent_cache_element *xt_cache_elem) +{ + xt_cache_replaceQ_delete(wc, xt_cache_elem); + xt_cache_replaceQ_insert(wc, xt_cache_elem); +} + +/* Returns the index of the cached extent if it was successfully inserted in the extent cache, otherwise -1 */ +static int try_insert_into_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + unsigned idx; + int ret; + + ret = find_first_zero(xt_cache->allocation_bitmap); + if (-1 == ret || ret >= MAX_CACHED_EXTENTS) { + for (xt_cache_elem = xt_cache->replaceQ_head ; NULL != xt_cache_elem ; xt_cache_elem = xt_cache_elem->next) { + idx = xt_cache_elem - xt_cache->extent_array; + if (!check_bit(xt_cache->inflight_bitmap, idx)) { + xt_cache_replaceQ_delete(wc, xt_cache_elem); + break; + } + } + if (NULL == xt_cache_elem) + return -1; + } else { + idx = (unsigned)ret; + xt_cache_elem = &xt_cache->extent_array[idx]; + } + xt_cache_elem->extent = extent; + xt_cache_elem->fileno = extent->datafile->fileno; + xt_cache_elem->inflight_io_descr = NULL; + xt_cache_replaceQ_insert(wc, xt_cache_elem); + modify_bit(&xt_cache->allocation_bitmap, idx, 1); + + return (int)idx; +} + +/** + * Returns 0 if the cached extent was found in the extent cache, 1 otherwise. + * Sets *idx to point to the position of the extent inside the cache. + **/ +static uint8_t lookup_in_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent, unsigned *idx) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + unsigned i; + + for (i = 0 ; i < MAX_CACHED_EXTENTS ; ++i) { + xt_cache_elem = &xt_cache->extent_array[i]; + if (check_bit(xt_cache->allocation_bitmap, i) && xt_cache_elem->extent == extent && + xt_cache_elem->fileno == extent->datafile->fileno) { + *idx = i; + return 0; + } + } + return 1; +} + +#if 0 /* disabled code */ +static void delete_from_xt_cache(struct rrdengine_worker_config* wc, unsigned idx) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + + xt_cache_elem = &xt_cache->extent_array[idx]; + xt_cache_replaceQ_delete(wc, xt_cache_elem); + xt_cache_elem->extent = NULL; + modify_bit(&wc->xt_cache.allocation_bitmap, idx, 0); /* invalidate it */ + modify_bit(&wc->xt_cache.inflight_bitmap, idx, 0); /* not in-flight anymore */ +} +#endif + +void enqueue_inflight_read_to_xt_cache(struct rrdengine_worker_config* wc, unsigned idx, + struct extent_io_descriptor *xt_io_descr) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + struct extent_io_descriptor *old_next; + + xt_cache_elem = &xt_cache->extent_array[idx]; + old_next = xt_cache_elem->inflight_io_descr->next; + xt_cache_elem->inflight_io_descr->next = xt_io_descr; + xt_io_descr->next = old_next; +} + +void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, struct extent_io_descriptor *xt_io_descr) +{ + unsigned i, j, page_offset; + struct rrdengine_instance *ctx = wc->ctx; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + void *page; + struct extent_info *extent = xt_io_descr->descr_array[0]->extent; + + for (i = 0 ; i < xt_io_descr->descr_count; ++i) { + page = mallocz(RRDENG_BLOCK_SIZE); + descr = xt_io_descr->descr_array[i]; + for (j = 0, page_offset = 0 ; j < extent->number_of_pages ; ++j) { + /* care, we don't hold the descriptor mutex */ + if (!uuid_compare(*extent->pages[j]->id, *descr->id) && + extent->pages[j]->page_length == descr->page_length && + extent->pages[j]->start_time == descr->start_time && + extent->pages[j]->end_time == descr->end_time) { + break; + } + page_offset += extent->pages[j]->page_length; + + } + /* care, we don't hold the descriptor mutex */ + (void) memcpy(page, wc->xt_cache.extent_array[idx].pages + page_offset, descr->page_length); + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->page = page; + pg_cache_descr->flags |= RRD_PAGE_POPULATED; + pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING; + rrdeng_page_descr_mutex_unlock(ctx, descr); + pg_cache_replaceQ_insert(ctx, descr); + if (xt_io_descr->release_descr) { + pg_cache_put(ctx, descr); + } else { + debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); + pg_cache_wake_up_waiters(ctx, descr); + } + } + if (xt_io_descr->completion) + complete(xt_io_descr->completion); + freez(xt_io_descr); +} + +void read_extent_cb(uv_fs_t* req) +{ + struct rrdengine_worker_config* wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + struct extent_io_descriptor *xt_io_descr; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + int ret; + unsigned i, j, count; + void *page, *uncompressed_buf = NULL; + uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length = 0; + uint8_t have_read_error = 0; + /* persistent structures */ + struct rrdeng_df_extent_header *header; + struct rrdeng_df_extent_trailer *trailer; + uLong crc; + + xt_io_descr = req->data; + header = xt_io_descr->buf; + payload_length = header->payload_length; + count = header->number_of_pages; + payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count; + trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer); + + if (req->result < 0) { + struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; + + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + have_read_error = 1; + error("%s: uv_fs_read - %s - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__, + uv_strerror((int)req->result), xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); + goto after_crc_check; + } + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer)); + ret = crc32cmp(trailer->checksum, crc); +#ifdef NETDATA_INTERNAL_CHECKS + { + struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; + debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__, + xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED"); + } +#endif + if (unlikely(ret)) { + struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; + + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + have_read_error = 1; + error("%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: FAILED", __func__, + xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); + } + +after_crc_check: + if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) { + uncompressed_payload_length = 0; + for (i = 0 ; i < count ; ++i) { + uncompressed_payload_length += header->descr[i].page_length; + } + uncompressed_buf = mallocz(uncompressed_payload_length); + ret = LZ4_decompress_safe(xt_io_descr->buf + payload_offset, uncompressed_buf, + payload_length, uncompressed_payload_length); + ctx->stats.before_decompress_bytes += payload_length; + ctx->stats.after_decompress_bytes += ret; + debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret); + /* care, we don't hold the descriptor mutex */ + } + { + uint8_t xt_is_cached = 0; + unsigned xt_idx; + struct extent_info *extent = xt_io_descr->descr_array[0]->extent; + + xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx); + if (xt_is_cached && check_bit(wc->xt_cache.inflight_bitmap, xt_idx)) { + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem = &xt_cache->extent_array[xt_idx]; + struct extent_io_descriptor *curr, *next; + + if (have_read_error) { + memset(xt_cache_elem->pages, 0, sizeof(xt_cache_elem->pages)); + } else if (RRD_NO_COMPRESSION == header->compression_algorithm) { + (void)memcpy(xt_cache_elem->pages, xt_io_descr->buf + payload_offset, payload_length); + } else { + (void)memcpy(xt_cache_elem->pages, uncompressed_buf, uncompressed_payload_length); + } + /* complete all connected in-flight read requests */ + for (curr = xt_cache_elem->inflight_io_descr->next ; curr ; curr = next) { + next = curr->next; + read_cached_extent_cb(wc, xt_idx, curr); + } + xt_cache_elem->inflight_io_descr = NULL; + modify_bit(&xt_cache->inflight_bitmap, xt_idx, 0); /* not in-flight anymore */ + } + } + + for (i = 0 ; i < xt_io_descr->descr_count; ++i) { + page = mallocz(RRDENG_BLOCK_SIZE); + descr = xt_io_descr->descr_array[i]; + for (j = 0, page_offset = 0; j < count; ++j) { + /* care, we don't hold the descriptor mutex */ + if (!uuid_compare(*(uuid_t *) header->descr[j].uuid, *descr->id) && + header->descr[j].page_length == descr->page_length && + header->descr[j].start_time == descr->start_time && + header->descr[j].end_time == descr->end_time) { + break; + } + page_offset += header->descr[j].page_length; + } + /* care, we don't hold the descriptor mutex */ + if (have_read_error) { + /* Applications should make sure NULL values match 0 as does SN_EMPTY_SLOT */ + memset(page, 0, descr->page_length); + } else if (RRD_NO_COMPRESSION == header->compression_algorithm) { + (void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length); + } else { + (void) memcpy(page, uncompressed_buf + page_offset, descr->page_length); + } + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->page = page; + pg_cache_descr->flags |= RRD_PAGE_POPULATED; + pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING; + rrdeng_page_descr_mutex_unlock(ctx, descr); + pg_cache_replaceQ_insert(ctx, descr); + if (xt_io_descr->release_descr) { + pg_cache_put(ctx, descr); + } else { + debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); + pg_cache_wake_up_waiters(ctx, descr); + } + } + if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) { + freez(uncompressed_buf); + } + if (xt_io_descr->completion) + complete(xt_io_descr->completion); + uv_fs_req_cleanup(req); + free(xt_io_descr->buf); + freez(xt_io_descr); +} + + +static void do_read_extent(struct rrdengine_worker_config* wc, + struct rrdeng_page_descr **descr, + unsigned count, + uint8_t release_descr) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache_descr *pg_cache_descr; + int ret; + unsigned i, size_bytes, pos, real_io_size; +// uint32_t payload_length; + struct extent_io_descriptor *xt_io_descr; + struct rrdengine_datafile *datafile; + struct extent_info *extent = descr[0]->extent; + uint8_t xt_is_cached = 0, xt_is_inflight = 0; + unsigned xt_idx; + + datafile = extent->datafile; + pos = extent->offset; + size_bytes = extent->size; + + xt_io_descr = callocz(1, sizeof(*xt_io_descr)); + for (i = 0 ; i < count; ++i) { + rrdeng_page_descr_mutex_lock(ctx, descr[i]); + pg_cache_descr = descr[i]->pg_cache_descr; + pg_cache_descr->flags |= RRD_PAGE_READ_PENDING; +// payload_length = descr[i]->page_length; + rrdeng_page_descr_mutex_unlock(ctx, descr[i]); + + xt_io_descr->descr_array[i] = descr[i]; + } + xt_io_descr->descr_count = count; + xt_io_descr->bytes = size_bytes; + xt_io_descr->pos = pos; + xt_io_descr->req.data = xt_io_descr; + xt_io_descr->completion = NULL; + /* xt_io_descr->descr_commit_idx_array[0] */ + xt_io_descr->release_descr = release_descr; + + xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx); + if (xt_is_cached) { + xt_cache_replaceQ_set_hot(wc, &wc->xt_cache.extent_array[xt_idx]); + xt_is_inflight = check_bit(wc->xt_cache.inflight_bitmap, xt_idx); + if (xt_is_inflight) { + enqueue_inflight_read_to_xt_cache(wc, xt_idx, xt_io_descr); + return; + } + return read_cached_extent_cb(wc, xt_idx, xt_io_descr); + } else { + ret = try_insert_into_xt_cache(wc, extent); + if (-1 != ret) { + xt_idx = (unsigned)ret; + modify_bit(&wc->xt_cache.inflight_bitmap, xt_idx, 1); + wc->xt_cache.extent_array[xt_idx].inflight_io_descr = xt_io_descr; + } + } + + ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + /* freez(xt_io_descr); + return;*/ + } + real_io_size = ALIGN_BYTES_CEILING(size_bytes); + xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); + ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb); + fatal_assert(-1 != ret); + ctx->stats.io_read_bytes += real_io_size; + ++ctx->stats.io_read_requests; + ctx->stats.io_read_extent_bytes += real_io_size; + ++ctx->stats.io_read_extents; + ctx->stats.pg_cache_backfills += count; +} + +static void commit_data_extent(struct rrdengine_worker_config* wc, struct extent_io_descriptor *xt_io_descr) +{ + struct rrdengine_instance *ctx = wc->ctx; + unsigned count, payload_length, descr_size, size_bytes; + void *buf; + /* persistent structures */ + struct rrdeng_df_extent_header *df_header; + struct rrdeng_jf_transaction_header *jf_header; + struct rrdeng_jf_store_data *jf_metric_data; + struct rrdeng_jf_transaction_trailer *jf_trailer; + uLong crc; + + df_header = xt_io_descr->buf; + count = df_header->number_of_pages; + descr_size = sizeof(*jf_metric_data->descr) * count; + payload_length = sizeof(*jf_metric_data) + descr_size; + size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer); + + buf = wal_get_transaction_buffer(wc, size_bytes); + + jf_header = buf; + jf_header->type = STORE_DATA; + jf_header->reserved = 0; + jf_header->id = ctx->commit_log.transaction_id++; + jf_header->payload_length = payload_length; + + jf_metric_data = buf + sizeof(*jf_header); + jf_metric_data->extent_offset = xt_io_descr->pos; + jf_metric_data->extent_size = xt_io_descr->bytes; + jf_metric_data->number_of_pages = count; + memcpy(jf_metric_data->descr, df_header->descr, descr_size); + + jf_trailer = buf + sizeof(*jf_header) + payload_length; + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, buf, sizeof(*jf_header) + payload_length); + crc32set(jf_trailer->checksum, crc); +} + +static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t type, void *data) +{ + switch (type) { + case STORE_DATA: + commit_data_extent(wc, (struct extent_io_descriptor *)data); + break; + default: + fatal_assert(type == STORE_DATA); + break; + } +} + +static void after_invalidate_oldest_committed(struct rrdengine_worker_config* wc) +{ + int error; + + error = uv_thread_join(wc->now_invalidating_dirty_pages); + if (error) { + error("uv_thread_join(): %s", uv_strerror(error)); + } + freez(wc->now_invalidating_dirty_pages); + wc->now_invalidating_dirty_pages = NULL; + wc->cleanup_thread_invalidating_dirty_pages = 0; +} + +static void invalidate_oldest_committed(void *arg) +{ + struct rrdengine_instance *ctx = arg; + struct rrdengine_worker_config *wc = &ctx->worker_config; + struct page_cache *pg_cache = &ctx->pg_cache; + int ret; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + Pvoid_t *PValue; + Word_t Index; + unsigned nr_committed_pages; + + do { + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + for (Index = 0, + PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue; + + descr != NULL; + + PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue) { + fatal_assert(0 != descr->page_length); + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING) && pg_cache_try_get_unsafe(descr, 1)) { + rrdeng_page_descr_mutex_unlock(ctx, descr); + + ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0); + fatal_assert(1 == ret); + break; + } + rrdeng_page_descr_mutex_unlock(ctx, descr); + } + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + + if (!descr) { + info("Failed to invalidate any dirty pages to relieve page cache pressure."); + + goto out; + } + pg_cache_punch_hole(ctx, descr, 1, 1, NULL); + + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + nr_committed_pages = --pg_cache->committed_page_index.nr_committed_pages; + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + rrd_stat_atomic_add(&ctx->stats.flushing_pressure_page_deletions, 1); + rrd_stat_atomic_add(&global_flushing_pressure_page_deletions, 1); + + } while (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)); +out: + wc->cleanup_thread_invalidating_dirty_pages = 1; + /* wake up event loop */ + fatal_assert(0 == uv_async_send(&wc->async)); +} + +void rrdeng_invalidate_oldest_committed(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned nr_committed_pages; + int error; + + if (unlikely(ctx->quiesce != NO_QUIESCE)) /* Shutting down */ + return; + + uv_rwlock_rdlock(&pg_cache->committed_page_index.lock); + nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages; + uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock); + + if (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) { + /* delete the oldest page in memory */ + if (wc->now_invalidating_dirty_pages) { + /* already deleting a page */ + return; + } + errno = 0; + error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". " + "Metric data are being deleted, please reduce disk load or use a faster disk.", ctx->dbfiles_path); + + wc->now_invalidating_dirty_pages = mallocz(sizeof(*wc->now_invalidating_dirty_pages)); + wc->cleanup_thread_invalidating_dirty_pages = 0; + + error = uv_thread_create(wc->now_invalidating_dirty_pages, invalidate_oldest_committed, ctx); + if (error) { + error("uv_thread_create(): %s", uv_strerror(error)); + freez(wc->now_invalidating_dirty_pages); + wc->now_invalidating_dirty_pages = NULL; + } + } +} + +void flush_pages_cb(uv_fs_t* req) +{ + struct rrdengine_worker_config* wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + struct extent_io_descriptor *xt_io_descr; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + unsigned i, count; + + xt_io_descr = req->data; + if (req->result < 0) { + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); + } +#ifdef NETDATA_INTERNAL_CHECKS + { + struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; + debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.", + __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); + } +#endif + count = xt_io_descr->descr_count; + for (i = 0 ; i < count ; ++i) { + /* care, we don't hold the descriptor mutex */ + descr = xt_io_descr->descr_array[i]; + + pg_cache_replaceQ_insert(ctx, descr); + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING); + /* wake up waiters, care no reference being held */ + pg_cache_wake_up_waiters_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); + } + if (xt_io_descr->completion) + complete(xt_io_descr->completion); + uv_fs_req_cleanup(req); + free(xt_io_descr->buf); + freez(xt_io_descr); + + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + pg_cache->committed_page_index.nr_committed_pages -= count; + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + wc->inflight_dirty_pages -= count; +} + +/* + * completion must be NULL or valid. + * Returns 0 when no flushing can take place. + * Returns datafile bytes to be written on successful flushing initiation. + */ +static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct completion *completion) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + int ret; + int compressed_size, max_compressed_size = 0; + unsigned i, count, size_bytes, pos, real_io_size; + uint32_t uncompressed_payload_length, payload_offset; + struct rrdeng_page_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT]; + struct page_cache_descr *pg_cache_descr; + struct extent_io_descriptor *xt_io_descr; + void *compressed_buf = NULL; + Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT]; + Pvoid_t *PValue; + Word_t Index; + uint8_t compression_algorithm = ctx->global_compress_alg; + struct extent_info *extent; + struct rrdengine_datafile *datafile; + /* persistent structures */ + struct rrdeng_df_extent_header *header; + struct rrdeng_df_extent_trailer *trailer; + uLong crc; + + if (force) { + debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure."); + } + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + for (Index = 0, count = 0, uncompressed_payload_length = 0, + PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue ; + + descr != NULL && count != MAX_PAGES_PER_EXTENT ; + + PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue) { + uint8_t page_write_pending; + + fatal_assert(0 != descr->page_length); + page_write_pending = 0; + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING)) { + page_write_pending = 1; + /* care, no reference being held */ + pg_cache_descr->flags |= RRD_PAGE_WRITE_PENDING; + uncompressed_payload_length += descr->page_length; + descr_commit_idx_array[count] = Index; + eligible_pages[count++] = descr; + } + rrdeng_page_descr_mutex_unlock(ctx, descr); + + if (page_write_pending) { + ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0); + fatal_assert(1 == ret); + } + } + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + + if (!count) { + debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__); + if (completion) + complete(completion); + return 0; + } + wc->inflight_dirty_pages += count; + + xt_io_descr = mallocz(sizeof(*xt_io_descr)); + payload_offset = sizeof(*header) + count * sizeof(header->descr[0]); + switch (compression_algorithm) { + case RRD_NO_COMPRESSION: + size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer); + break; + default: /* Compress */ + fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE); + max_compressed_size = LZ4_compressBound(uncompressed_payload_length); + compressed_buf = mallocz(max_compressed_size); + size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer); + break; + } + ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + /* freez(xt_io_descr);*/ + } + (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count); + xt_io_descr->descr_count = count; + + pos = 0; + header = xt_io_descr->buf; + header->compression_algorithm = compression_algorithm; + header->number_of_pages = count; + pos += sizeof(*header); + + extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0])); + datafile = ctx->datafiles.last; /* TODO: check for exceeded size quota */ + extent->offset = datafile->pos; + extent->number_of_pages = count; + extent->datafile = datafile; + extent->next = NULL; + + for (i = 0 ; i < count ; ++i) { + /* This is here for performance reasons */ + xt_io_descr->descr_commit_idx_array[i] = descr_commit_idx_array[i]; + + descr = xt_io_descr->descr_array[i]; + header->descr[i].type = PAGE_METRICS; + uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id); + header->descr[i].page_length = descr->page_length; + header->descr[i].start_time = descr->start_time; + header->descr[i].end_time = descr->end_time; + pos += sizeof(header->descr[i]); + } + for (i = 0 ; i < count ; ++i) { + descr = xt_io_descr->descr_array[i]; + /* care, we don't hold the descriptor mutex */ + (void) memcpy(xt_io_descr->buf + pos, descr->pg_cache_descr->page, descr->page_length); + descr->extent = extent; + extent->pages[i] = descr; + + pos += descr->page_length; + } + df_extent_insert(extent); + + switch (compression_algorithm) { + case RRD_NO_COMPRESSION: + header->payload_length = uncompressed_payload_length; + break; + default: /* Compress */ + compressed_size = LZ4_compress_default(xt_io_descr->buf + payload_offset, compressed_buf, + uncompressed_payload_length, max_compressed_size); + ctx->stats.before_compress_bytes += uncompressed_payload_length; + ctx->stats.after_compress_bytes += compressed_size; + debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size); + (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size); + freez(compressed_buf); + size_bytes = payload_offset + compressed_size + sizeof(*trailer); + header->payload_length = compressed_size; + break; + } + extent->size = size_bytes; + xt_io_descr->bytes = size_bytes; + xt_io_descr->pos = datafile->pos; + xt_io_descr->req.data = xt_io_descr; + xt_io_descr->completion = completion; + + trailer = xt_io_descr->buf + size_bytes - sizeof(*trailer); + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, xt_io_descr->buf, size_bytes - sizeof(*trailer)); + crc32set(trailer->checksum, crc); + + real_io_size = ALIGN_BYTES_CEILING(size_bytes); + xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); + ret = uv_fs_write(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, datafile->pos, flush_pages_cb); + fatal_assert(-1 != ret); + ctx->stats.io_write_bytes += real_io_size; + ++ctx->stats.io_write_requests; + ctx->stats.io_write_extent_bytes += real_io_size; + ++ctx->stats.io_write_extents; + do_commit_transaction(wc, STORE_DATA, xt_io_descr); + datafile->pos += ALIGN_BYTES_CEILING(size_bytes); + ctx->disk_space += ALIGN_BYTES_CEILING(size_bytes); + rrdeng_test_quota(wc); + + return ALIGN_BYTES_CEILING(size_bytes); +} + +static void after_delete_old_data(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct rrdengine_datafile *datafile; + struct rrdengine_journalfile *journalfile; + unsigned deleted_bytes, journalfile_bytes, datafile_bytes; + int ret, error; + char path[RRDENG_PATH_MAX]; + + datafile = ctx->datafiles.first; + journalfile = datafile->journalfile; + datafile_bytes = datafile->pos; + journalfile_bytes = journalfile->pos; + deleted_bytes = 0; + + info("Deleting data and journal file pair."); + datafile_list_delete(ctx, datafile); + ret = destroy_journal_file(journalfile, datafile); + if (!ret) { + generate_journalfilepath(datafile, path, sizeof(path)); + info("Deleted journal file \"%s\".", path); + deleted_bytes += journalfile_bytes; + } + ret = destroy_data_file(datafile); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + info("Deleted data file \"%s\".", path); + deleted_bytes += datafile_bytes; + } + freez(journalfile); + freez(datafile); + + ctx->disk_space -= deleted_bytes; + info("Reclaimed %u bytes of disk space.", deleted_bytes); + + error = uv_thread_join(wc->now_deleting_files); + if (error) { + error("uv_thread_join(): %s", uv_strerror(error)); + } + freez(wc->now_deleting_files); + /* unfreeze command processing */ + wc->now_deleting_files = NULL; + + wc->cleanup_thread_deleting_files = 0; + + /* interrupt event loop */ + uv_stop(wc->loop); +} + +static void delete_old_data(void *arg) +{ + struct rrdengine_instance *ctx = arg; + struct rrdengine_worker_config* wc = &ctx->worker_config; + struct rrdengine_datafile *datafile; + struct extent_info *extent, *next; + struct rrdeng_page_descr *descr; + unsigned count, i; + uint8_t can_delete_metric; + uuid_t metric_id; + + /* Safe to use since it will be deleted after we are done */ + datafile = ctx->datafiles.first; + + for (extent = datafile->extents.first ; extent != NULL ; extent = next) { + count = extent->number_of_pages; + for (i = 0 ; i < count ; ++i) { + descr = extent->pages[i]; + can_delete_metric = pg_cache_punch_hole(ctx, descr, 0, 0, &metric_id); + if (unlikely(can_delete_metric && ctx->metalog_ctx->initialized)) { + /* + * If the metric is empty, has no active writers and if the metadata log has been initialized then + * attempt to delete the corresponding netdata dimension. + */ + metalog_delete_dimension_by_uuid(ctx->metalog_ctx, &metric_id); + } + } + next = extent->next; + freez(extent); + } + wc->cleanup_thread_deleting_files = 1; + /* wake up event loop */ + fatal_assert(0 == uv_async_send(&wc->async)); +} + +void rrdeng_test_quota(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct rrdengine_datafile *datafile; + unsigned current_size, target_size; + uint8_t out_of_space, only_one_datafile; + int ret, error; + + out_of_space = 0; + /* Do not allow the pinned pages to exceed the disk space quota to avoid deadlocks */ + if (unlikely(ctx->disk_space > MAX(ctx->max_disk_space, 2 * ctx->metric_API_max_producers * RRDENG_BLOCK_SIZE))) { + out_of_space = 1; + } + datafile = ctx->datafiles.last; + current_size = datafile->pos; + target_size = ctx->max_disk_space / TARGET_DATAFILES; + target_size = MIN(target_size, MAX_DATAFILE_SIZE); + target_size = MAX(target_size, MIN_DATAFILE_SIZE); + only_one_datafile = (datafile == ctx->datafiles.first) ? 1 : 0; + if (unlikely(current_size >= target_size || (out_of_space && only_one_datafile))) { + /* Finalize data and journal file and create a new pair */ + wal_flush_transaction_buffer(wc); + ret = create_new_datafile_pair(ctx, 1, ctx->last_fileno + 1); + if (likely(!ret)) { + ++ctx->last_fileno; + } + } + if (unlikely(out_of_space && NO_QUIESCE == ctx->quiesce)) { + /* delete old data */ + if (wc->now_deleting_files) { + /* already deleting data */ + return; + } + if (NULL == ctx->datafiles.first->next) { + error("Cannot delete data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\"" + " to reclaim space, there are no other file pairs left.", + ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); + return; + } + info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", + ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); + wc->now_deleting_files = mallocz(sizeof(*wc->now_deleting_files)); + wc->cleanup_thread_deleting_files = 0; + + error = uv_thread_create(wc->now_deleting_files, delete_old_data, ctx); + if (error) { + error("uv_thread_create(): %s", uv_strerror(error)); + freez(wc->now_deleting_files); + wc->now_deleting_files = NULL; + } + } +} + +static inline int rrdeng_threads_alive(struct rrdengine_worker_config* wc) +{ + if (wc->now_invalidating_dirty_pages || wc->now_deleting_files) { + return 1; + } + return 0; +} + +static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + + if (unlikely(wc->cleanup_thread_invalidating_dirty_pages)) { + after_invalidate_oldest_committed(wc); + } + if (unlikely(wc->cleanup_thread_deleting_files)) { + after_delete_old_data(wc); + } + if (unlikely(SET_QUIESCE == ctx->quiesce && !rrdeng_threads_alive(wc))) { + ctx->quiesce = QUIESCED; + complete(&ctx->rrdengine_completion); + } +} + +/* return 0 on success */ +int init_rrd_files(struct rrdengine_instance *ctx) +{ + return init_data_files(ctx); +} + +void finalize_rrd_files(struct rrdengine_instance *ctx) +{ + return finalize_data_files(ctx); +} + +void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc) +{ + wc->cmd_queue.head = wc->cmd_queue.tail = 0; + wc->queue_size = 0; + fatal_assert(0 == uv_cond_init(&wc->cmd_cond)); + fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex)); +} + +void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd) +{ + unsigned queue_size; + + /* wait for free space in queue */ + uv_mutex_lock(&wc->cmd_mutex); + while ((queue_size = wc->queue_size) == RRDENG_CMD_Q_MAX_SIZE) { + uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex); + } + fatal_assert(queue_size < RRDENG_CMD_Q_MAX_SIZE); + /* enqueue command */ + wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; + wc->cmd_queue.tail = wc->cmd_queue.tail != RRDENG_CMD_Q_MAX_SIZE - 1 ? + wc->cmd_queue.tail + 1 : 0; + wc->queue_size = queue_size + 1; + uv_mutex_unlock(&wc->cmd_mutex); + + /* wake up event loop */ + fatal_assert(0 == uv_async_send(&wc->async)); +} + +struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc) +{ + struct rrdeng_cmd ret; + unsigned queue_size; + + uv_mutex_lock(&wc->cmd_mutex); + queue_size = wc->queue_size; + if (queue_size == 0) { + ret.opcode = RRDENG_NOOP; + } else { + /* dequeue command */ + ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head]; + if (queue_size == 1) { + wc->cmd_queue.head = wc->cmd_queue.tail = 0; + } else { + wc->cmd_queue.head = wc->cmd_queue.head != RRDENG_CMD_Q_MAX_SIZE - 1 ? + wc->cmd_queue.head + 1 : 0; + } + wc->queue_size = queue_size - 1; + + /* wake up producers */ + uv_cond_signal(&wc->cmd_cond); + } + uv_mutex_unlock(&wc->cmd_mutex); + + return ret; +} + +void async_cb(uv_async_t *handle) +{ + uv_stop(handle->loop); + uv_update_time(handle->loop); + debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle)); +} + +/* Flushes dirty pages when timer expires */ +#define TIMER_PERIOD_MS (1000) + +void timer_cb(uv_timer_t* handle) +{ + struct rrdengine_worker_config* wc = handle->data; + struct rrdengine_instance *ctx = wc->ctx; + + uv_stop(handle->loop); + uv_update_time(handle->loop); + if (unlikely(!ctx->metalog_ctx->initialized)) + return; /* Wait for the metadata log to initialize */ + rrdeng_test_quota(wc); + debug(D_RRDENGINE, "%s: timeout reached.", __func__); + if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) { + /* There is free space so we can write to disk and we are not actively deleting dirty buffers */ + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark, + high_watermark; + + uv_rwlock_rdlock(&pg_cache->committed_page_index.lock); + nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages; + uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock); + producers = ctx->metric_API_max_producers; + /* are flushable pages more than 25% of the maximum page cache size */ + high_watermark = (ctx->max_cache_pages * 25LLU) / 100; + low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */ + + /* Flush more pages only if disk can keep up */ + if (wc->inflight_dirty_pages < high_watermark + producers) { + if (nr_committed_pages > producers && + /* committed to be written pages are more than the produced number */ + nr_committed_pages - producers > high_watermark) { + /* Flushing speed must increase to stop page cache from filling with dirty pages */ + bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE; + } + bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write); + + debug(D_RRDENGINE, "Flushing pages to disk."); + for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL); + bytes_written && (total_bytes < bytes_to_write); + total_bytes += bytes_written) { + bytes_written = do_flush_pages(wc, 0, NULL); + } + } + } +#ifdef NETDATA_INTERNAL_CHECKS + { + char buf[4096]; + debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf))); + } +#endif +} + +#define MAX_CMD_BATCH_SIZE (256) + +void rrdeng_worker(void* arg) +{ + struct rrdengine_worker_config* wc = arg; + struct rrdengine_instance *ctx = wc->ctx; + uv_loop_t* loop; + int shutdown, ret; + enum rrdeng_opcode opcode; + uv_timer_t timer_req; + struct rrdeng_cmd cmd; + unsigned cmd_batch_size; + + rrdeng_init_cmd_queue(wc); + + loop = wc->loop = mallocz(sizeof(uv_loop_t)); + ret = uv_loop_init(loop); + if (ret) { + error("uv_loop_init(): %s", uv_strerror(ret)); + goto error_after_loop_init; + } + loop->data = wc; + + ret = uv_async_init(wc->loop, &wc->async, async_cb); + if (ret) { + error("uv_async_init(): %s", uv_strerror(ret)); + goto error_after_async_init; + } + wc->async.data = wc; + + wc->now_deleting_files = NULL; + wc->cleanup_thread_deleting_files = 0; + + wc->now_invalidating_dirty_pages = NULL; + wc->cleanup_thread_invalidating_dirty_pages = 0; + wc->inflight_dirty_pages = 0; + + /* dirty page flushing timer */ + ret = uv_timer_init(loop, &timer_req); + if (ret) { + error("uv_timer_init(): %s", uv_strerror(ret)); + goto error_after_timer_init; + } + timer_req.data = wc; + + wc->error = 0; + /* wake up initialization thread */ + complete(&ctx->rrdengine_completion); + + fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); + shutdown = 0; + while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) { + uv_run(loop, UV_RUN_DEFAULT); + rrdeng_cleanup_finished_threads(wc); + + /* wait for commands */ + cmd_batch_size = 0; + do { + /* + * Avoid starving the loop when there are too many commands coming in. + * timer_cb will interrupt the loop again to allow serving more commands. + */ + if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE)) + break; + + cmd = rrdeng_deq_cmd(wc); + opcode = cmd.opcode; + ++cmd_batch_size; + + switch (opcode) { + case RRDENG_NOOP: + /* the command queue was empty, do nothing */ + break; + case RRDENG_SHUTDOWN: + shutdown = 1; + break; + case RRDENG_QUIESCE: + ctx->drop_metrics_under_page_cache_pressure = 0; + ctx->quiesce = SET_QUIESCE; + fatal_assert(0 == uv_timer_stop(&timer_req)); + uv_close((uv_handle_t *)&timer_req, NULL); + while (do_flush_pages(wc, 1, NULL)) { + ; /* Force flushing of all committed pages. */ + } + wal_flush_transaction_buffer(wc); + if (!rrdeng_threads_alive(wc)) { + ctx->quiesce = QUIESCED; + complete(&ctx->rrdengine_completion); + } + break; + case RRDENG_READ_PAGE: + do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0); + break; + case RRDENG_READ_EXTENT: + do_read_extent(wc, cmd.read_extent.page_cache_descr, cmd.read_extent.page_count, 1); + break; + case RRDENG_COMMIT_PAGE: + do_commit_transaction(wc, STORE_DATA, NULL); + break; + case RRDENG_FLUSH_PAGES: { + if (wc->now_invalidating_dirty_pages) { + /* Do not flush if the disk cannot keep up */ + complete(cmd.completion); + } else { + (void)do_flush_pages(wc, 1, cmd.completion); + } + break; + case RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE: + rrdeng_invalidate_oldest_committed(wc); + break; + } + default: + debug(D_RRDENGINE, "%s: default.", __func__); + break; + } + } while (opcode != RRDENG_NOOP); + } + + /* cleanup operations of the event loop */ + info("Shutting down RRD engine event loop."); + + /* + * uv_async_send after uv_close does not seem to crash in linux at the moment, + * it is however undocumented behaviour and we need to be aware if this becomes + * an issue in the future. + */ + uv_close((uv_handle_t *)&wc->async, NULL); + + while (do_flush_pages(wc, 1, NULL)) { + ; /* Force flushing of all committed pages. */ + } + wal_flush_transaction_buffer(wc); + uv_run(loop, UV_RUN_DEFAULT); + + info("Shutting down RRD engine event loop complete."); + /* TODO: don't let the API block by waiting to enqueue commands */ + uv_cond_destroy(&wc->cmd_cond); +/* uv_mutex_destroy(&wc->cmd_mutex); */ + fatal_assert(0 == uv_loop_close(loop)); + freez(loop); + + return; + +error_after_timer_init: + uv_close((uv_handle_t *)&wc->async, NULL); +error_after_async_init: + fatal_assert(0 == uv_loop_close(loop)); +error_after_loop_init: + freez(loop); + + wc->error = UV_EAGAIN; + /* wake up initialization thread */ + complete(&ctx->rrdengine_completion); +} + +/* C entry point for development purposes + * make "LDFLAGS=-errdengine_main" + */ +void rrdengine_main(void) +{ + int ret; + struct rrdengine_instance *ctx; + + sanity_check(); + ret = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB); + if (ret) { + exit(ret); + } + rrdeng_exit(ctx); + fprintf(stderr, "Hello world!"); + exit(0); +} diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h new file mode 100644 index 0000000..87af04b --- /dev/null +++ b/database/engine/rrdengine.h @@ -0,0 +1,233 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDENGINE_H +#define NETDATA_RRDENGINE_H + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include <fcntl.h> +#include <lz4.h> +#include <Judy.h> +#include <openssl/sha.h> +#include <openssl/evp.h> +#include "../../daemon/common.h" +#include "../rrd.h" +#include "rrddiskprotocol.h" +#include "rrdenginelib.h" +#include "datafile.h" +#include "journalfile.h" +#include "metadata_log/metadatalog.h" +#include "rrdengineapi.h" +#include "pagecache.h" +#include "rrdenglocking.h" + +#ifdef NETDATA_RRD_INTERNALS + +#endif /* NETDATA_RRD_INTERNALS */ + +/* Forward declerations */ +struct rrdengine_instance; + +#define MAX_PAGES_PER_EXTENT (64) /* TODO: can go higher only when journal supports bigger than 4KiB transactions */ + +#define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u" +#define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u" + + +typedef enum { + RRDENGINE_STATUS_UNINITIALIZED = 0, + RRDENGINE_STATUS_INITIALIZING, + RRDENGINE_STATUS_INITIALIZED +} rrdengine_state_t; + +enum rrdeng_opcode { + /* can be used to return empty status or flush the command queue */ + RRDENG_NOOP = 0, + + RRDENG_READ_PAGE, + RRDENG_READ_EXTENT, + RRDENG_COMMIT_PAGE, + RRDENG_FLUSH_PAGES, + RRDENG_SHUTDOWN, + RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE, + RRDENG_QUIESCE, + + RRDENG_MAX_OPCODE +}; + +struct rrdeng_cmd { + enum rrdeng_opcode opcode; + union { + struct rrdeng_read_page { + struct rrdeng_page_descr *page_cache_descr; + } read_page; + struct rrdeng_read_extent { + struct rrdeng_page_descr *page_cache_descr[MAX_PAGES_PER_EXTENT]; + int page_count; + } read_extent; + struct completion *completion; + }; +}; + +#define RRDENG_CMD_Q_MAX_SIZE (2048) + +struct rrdeng_cmdqueue { + unsigned head, tail; + struct rrdeng_cmd cmd_array[RRDENG_CMD_Q_MAX_SIZE]; +}; + +struct extent_io_descriptor { + uv_fs_t req; + uv_buf_t iov; + void *buf; + uint64_t pos; + unsigned bytes; + struct completion *completion; + unsigned descr_count; + int release_descr; + struct rrdeng_page_descr *descr_array[MAX_PAGES_PER_EXTENT]; + Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT]; + struct extent_io_descriptor *next; /* multiple requests to be served by the same cached extent */ +}; + +struct generic_io_descriptor { + uv_fs_t req; + uv_buf_t iov; + void *buf; + uint64_t pos; + unsigned bytes; + struct completion *completion; +}; + +struct extent_cache_element { + struct extent_info *extent; /* The ABA problem is avoided with the help of fileno below */ + unsigned fileno; + struct extent_cache_element *prev; /* LRU */ + struct extent_cache_element *next; /* LRU */ + struct extent_io_descriptor *inflight_io_descr; /* I/O descriptor for in-flight extent */ + uint8_t pages[MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE]; +}; + +#define MAX_CACHED_EXTENTS 16 /* cannot be over 32 to fit in 32-bit architectures */ + +/* Initialize by setting the structure to zero */ +struct extent_cache { + struct extent_cache_element extent_array[MAX_CACHED_EXTENTS]; + unsigned allocation_bitmap; /* 1 if the corresponding position in the extent_array is allocated */ + unsigned inflight_bitmap; /* 1 if the corresponding position in the extent_array is waiting for I/O */ + + struct extent_cache_element *replaceQ_head; /* LRU */ + struct extent_cache_element *replaceQ_tail; /* MRU */ +}; + +struct rrdengine_worker_config { + struct rrdengine_instance *ctx; + + uv_thread_t thread; + uv_loop_t* loop; + uv_async_t async; + + /* file deletion thread */ + uv_thread_t *now_deleting_files; + unsigned long cleanup_thread_deleting_files; /* set to 0 when now_deleting_files is still running */ + + /* dirty page deletion thread */ + uv_thread_t *now_invalidating_dirty_pages; + /* set to 0 when now_invalidating_dirty_pages is still running */ + unsigned long cleanup_thread_invalidating_dirty_pages; + unsigned inflight_dirty_pages; + + /* FIFO command queue */ + uv_mutex_t cmd_mutex; + uv_cond_t cmd_cond; + volatile unsigned queue_size; + struct rrdeng_cmdqueue cmd_queue; + + struct extent_cache xt_cache; + + int error; +}; + +/* + * Debug statistics not used by code logic. + * They only describe operations since DB engine instance load time. + */ +struct rrdengine_statistics { + rrdeng_stats_t metric_API_producers; + rrdeng_stats_t metric_API_consumers; + rrdeng_stats_t pg_cache_insertions; + rrdeng_stats_t pg_cache_deletions; + rrdeng_stats_t pg_cache_hits; + rrdeng_stats_t pg_cache_misses; + rrdeng_stats_t pg_cache_backfills; + rrdeng_stats_t pg_cache_evictions; + rrdeng_stats_t before_decompress_bytes; + rrdeng_stats_t after_decompress_bytes; + rrdeng_stats_t before_compress_bytes; + rrdeng_stats_t after_compress_bytes; + rrdeng_stats_t io_write_bytes; + rrdeng_stats_t io_write_requests; + rrdeng_stats_t io_read_bytes; + rrdeng_stats_t io_read_requests; + rrdeng_stats_t io_write_extent_bytes; + rrdeng_stats_t io_write_extents; + rrdeng_stats_t io_read_extent_bytes; + rrdeng_stats_t io_read_extents; + rrdeng_stats_t datafile_creations; + rrdeng_stats_t datafile_deletions; + rrdeng_stats_t journalfile_creations; + rrdeng_stats_t journalfile_deletions; + rrdeng_stats_t page_cache_descriptors; + rrdeng_stats_t io_errors; + rrdeng_stats_t fs_errors; + rrdeng_stats_t pg_cache_over_half_dirty_events; + rrdeng_stats_t flushing_pressure_page_deletions; +}; + +/* I/O errors global counter */ +extern rrdeng_stats_t global_io_errors; +/* File-System errors global counter */ +extern rrdeng_stats_t global_fs_errors; +/* number of File-Descriptors that have been reserved by dbengine */ +extern rrdeng_stats_t rrdeng_reserved_file_descriptors; +/* inability to flush global counters */ +extern rrdeng_stats_t global_pg_cache_over_half_dirty_events; +extern rrdeng_stats_t global_flushing_pressure_page_deletions; /* number of deleted pages */ + +#define NO_QUIESCE (0) /* initial state when all operations function normally */ +#define SET_QUIESCE (1) /* set it before shutting down the instance, quiesce long running operations */ +#define QUIESCED (2) /* is set after all threads have finished running */ + +struct rrdengine_instance { + struct metalog_instance *metalog_ctx; + struct rrdengine_worker_config worker_config; + struct completion rrdengine_completion; + struct page_cache pg_cache; + uint8_t drop_metrics_under_page_cache_pressure; /* boolean */ + uint8_t global_compress_alg; + struct transaction_commit_log commit_log; + struct rrdengine_datafile_list datafiles; + RRDHOST *host; /* the legacy host, or NULL for multi-host DB */ + char dbfiles_path[FILENAME_MAX + 1]; + char machine_guid[GUID_LEN + 1]; /* the unique ID of the corresponding host, or localhost for multihost DB */ + uint64_t disk_space; + uint64_t max_disk_space; + unsigned last_fileno; /* newest index of datafile and journalfile */ + unsigned long max_cache_pages; + unsigned long cache_pages_low_watermark; + unsigned long metric_API_max_producers; + + uint8_t quiesce; /* set to SET_QUIESCE before shutdown of the engine */ + + struct rrdengine_statistics stats; +}; + +extern int init_rrd_files(struct rrdengine_instance *ctx); +extern void finalize_rrd_files(struct rrdengine_instance *ctx); +extern void rrdeng_test_quota(struct rrdengine_worker_config* wc); +extern void rrdeng_worker(void* arg); +extern void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd); +extern struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc); + +#endif /* NETDATA_RRDENGINE_H */
\ No newline at end of file diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c new file mode 100755 index 0000000..7b2ff5b --- /dev/null +++ b/database/engine/rrdengineapi.c @@ -0,0 +1,999 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +/* Default global database instance */ +struct rrdengine_instance multidb_ctx; + +int default_rrdeng_page_cache_mb = 32; +int default_rrdeng_disk_quota_mb = 256; +int default_multidb_disk_quota_mb = 256; +/* Default behaviour is to unblock data collection if the page cache is full of dirty pages by dropping metrics */ +uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1; + +static inline struct rrdengine_instance *get_rrdeng_ctx_from_host(RRDHOST *host) +{ + return host->rrdeng_ctx; +} + +/* This UUID is not unique across hosts */ +void rrdeng_generate_legacy_uuid(const char *dim_id, char *chart_id, uuid_t *ret_uuid) +{ + EVP_MD_CTX *evpctx; + unsigned char hash_value[EVP_MAX_MD_SIZE]; + unsigned int hash_len; + + evpctx = EVP_MD_CTX_create(); + EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL); + EVP_DigestUpdate(evpctx, dim_id, strlen(dim_id)); + EVP_DigestUpdate(evpctx, chart_id, strlen(chart_id)); + EVP_DigestFinal_ex(evpctx, hash_value, &hash_len); + EVP_MD_CTX_destroy(evpctx); + fatal_assert(hash_len > sizeof(uuid_t)); + memcpy(ret_uuid, hash_value, sizeof(uuid_t)); +} + +/* Transform legacy UUID to be unique across hosts deterministacally */ +void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid, uuid_t *ret_uuid) +{ + EVP_MD_CTX *evpctx; + unsigned char hash_value[EVP_MAX_MD_SIZE]; + unsigned int hash_len; + + evpctx = EVP_MD_CTX_create(); + EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL); + EVP_DigestUpdate(evpctx, machine_guid, GUID_LEN); + EVP_DigestUpdate(evpctx, *legacy_uuid, sizeof(uuid_t)); + EVP_DigestFinal_ex(evpctx, hash_value, &hash_len); + EVP_MD_CTX_destroy(evpctx); + fatal_assert(hash_len > sizeof(uuid_t)); + memcpy(ret_uuid, hash_value, sizeof(uuid_t)); +} + +void rrdeng_metric_init(RRDDIM *rd, uuid_t *dim_uuid) +{ + struct page_cache *pg_cache; + struct rrdengine_instance *ctx; + uuid_t legacy_uuid; + uuid_t multihost_legacy_uuid; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + int is_multihost_child = 0; + RRDHOST *host = rd->rrdset->rrdhost; + + ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); + if (unlikely(!ctx)) { + error("Failed to fetch multidb context"); + return; + } + pg_cache = &ctx->pg_cache; + + rrdeng_generate_legacy_uuid(rd->id, rd->rrdset->id, &legacy_uuid); + rd->state->metric_uuid = dim_uuid; + if (host != localhost && host->rrdeng_ctx == &multidb_ctx) + is_multihost_child = 1; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &legacy_uuid, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (is_multihost_child || NULL == PValue) { + /* First time we see the legacy UUID or metric belongs to child host in multi-host DB. + * Drop legacy support, normal path */ + + if (unlikely(!rd->state->metric_uuid)) + rd->state->metric_uuid = create_dimension_uuid(rd->rrdset, rd); + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, rd->state->metric_uuid, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + uv_rwlock_wrlock(&pg_cache->metrics_index.lock); + PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, rd->state->metric_uuid, sizeof(uuid_t), PJE0); + fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ + *PValue = page_index = create_page_index(rd->state->metric_uuid); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; + uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); + } + } else { + /* There are legacy UUIDs in the database, implement backward compatibility */ + + rrdeng_convert_legacy_uuid_to_multihost(rd->rrdset->rrdhost->machine_guid, &legacy_uuid, + &multihost_legacy_uuid); + + if (unlikely(!rd->state->metric_uuid)) + rd->state->metric_uuid = mallocz(sizeof(uuid_t)); + + int need_to_store = (dim_uuid == NULL || uuid_compare(*rd->state->metric_uuid, multihost_legacy_uuid)); + + uuid_copy(*rd->state->metric_uuid, multihost_legacy_uuid); + + if (unlikely(need_to_store)) + (void)sql_store_dimension(rd->state->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor, + rd->algorithm); + + } + rd->state->rrdeng_uuid = &page_index->id; + rd->state->page_index = page_index; +} + +/* + * Gets a handle for storing metrics to the database. + * The handle must be released with rrdeng_store_metric_final(). + */ +void rrdeng_store_metric_init(RRDDIM *rd) +{ + struct rrdeng_collect_handle *handle; + struct rrdengine_instance *ctx; + struct pg_cache_page_index *page_index; + + ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); + handle = &rd->state->handle.rrdeng; + handle->ctx = ctx; + + handle->descr = NULL; + handle->prev_descr = NULL; + handle->unaligned_page = 0; + + page_index = rd->state->page_index; + uv_rwlock_wrlock(&page_index->lock); + ++page_index->writers; + uv_rwlock_wrunlock(&page_index->lock); +} + +/* The page must be populated and referenced */ +static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr) +{ + unsigned i; + uint8_t has_only_empty_metrics = 1; + storage_number *page; + + page = descr->pg_cache_descr->page; + for (i = 0 ; i < descr->page_length / sizeof(storage_number); ++i) { + if (SN_EMPTY_SLOT != page[i]) { + has_only_empty_metrics = 0; + break; + } + } + return has_only_empty_metrics; +} + +void rrdeng_store_metric_flush_current_page(RRDDIM *rd) +{ + struct rrdeng_collect_handle *handle; + struct rrdengine_instance *ctx; + struct rrdeng_page_descr *descr; + + handle = &rd->state->handle.rrdeng; + ctx = handle->ctx; + if (unlikely(!ctx)) + return; + descr = handle->descr; + if (unlikely(NULL == descr)) { + return; + } + if (likely(descr->page_length)) { + int page_is_empty; + + rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); + + if (handle->prev_descr) { + /* unpin old second page */ + pg_cache_put(ctx, handle->prev_descr); + } + page_is_empty = page_has_only_empty_metrics(descr); + if (page_is_empty) { + debug(D_RRDENGINE, "Page has empty metrics only, deleting:"); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + pg_cache_put(ctx, descr); + pg_cache_punch_hole(ctx, descr, 1, 0, NULL); + handle->prev_descr = NULL; + } else { + /* + * Disable pinning for now as it leads to deadlocks. When a collector stops collecting the extra pinned page + * eventually gets rotated but it cannot be destroyed due to the extra reference. + */ + /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */ +/* rrdeng_page_descr_mutex_lock(ctx, descr); + ret = pg_cache_try_get_unsafe(descr, 0); + rrdeng_page_descr_mutex_unlock(ctx, descr); + fatal_assert(1 == ret);*/ + + rrdeng_commit_page(ctx, descr, handle->page_correlation_id); + /* handle->prev_descr = descr;*/ + } + } else { + freez(descr->pg_cache_descr->page); + rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr); + freez(descr); + } + handle->descr = NULL; +} + +void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number) +{ + struct rrdeng_collect_handle *handle; + struct rrdengine_instance *ctx; + struct page_cache *pg_cache; + struct rrdeng_page_descr *descr; + storage_number *page; + uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0; + + handle = &rd->state->handle.rrdeng; + ctx = handle->ctx; + pg_cache = &ctx->pg_cache; + descr = handle->descr; + + if (descr) { + /* Make alignment decisions */ + + if (descr->page_length == rd->rrdset->rrddim_page_alignment) { + /* this is the leading dimension that defines chart alignment */ + perfect_page_alignment = 1; + } + /* is the metric far enough out of alignment with the others? */ + if (unlikely(descr->page_length + sizeof(number) < rd->rrdset->rrddim_page_alignment)) { + handle->unaligned_page = 1; + debug(D_RRDENGINE, "Metric page is not aligned with chart:"); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + } + if (unlikely(handle->unaligned_page && + /* did the other metrics change page? */ + rd->rrdset->rrddim_page_alignment <= sizeof(number))) { + debug(D_RRDENGINE, "Flushing unaligned metric page."); + must_flush_unaligned_page = 1; + handle->unaligned_page = 0; + } + } + if (unlikely(NULL == descr || + descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE || + must_flush_unaligned_page)) { + rrdeng_store_metric_flush_current_page(rd); + + page = rrdeng_create_page(ctx, &rd->state->page_index->id, &descr); + fatal_assert(page); + + handle->descr = descr; + + handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1); + + if (0 == rd->rrdset->rrddim_page_alignment) { + /* this is the leading dimension that defines chart alignment */ + perfect_page_alignment = 1; + } + } + page = descr->pg_cache_descr->page; + page[descr->page_length / sizeof(number)] = number; + pg_cache_atomic_set_pg_info(descr, point_in_time, descr->page_length + sizeof(number)); + + if (perfect_page_alignment) + rd->rrdset->rrddim_page_alignment = descr->page_length; + if (unlikely(INVALID_TIME == descr->start_time)) { + unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers; + descr->start_time = point_in_time; + + new_metric_API_producers = rrd_atomic_add_fetch(&ctx->stats.metric_API_producers, 1); + while (unlikely(new_metric_API_producers > (old_metric_API_max_producers = ctx->metric_API_max_producers))) { + /* Increase ctx->metric_API_max_producers */ + ret_metric_API_max_producers = ulong_compare_and_swap(&ctx->metric_API_max_producers, + old_metric_API_max_producers, + new_metric_API_producers); + if (old_metric_API_max_producers == ret_metric_API_max_producers) { + /* success */ + break; + } + } + + pg_cache_insert(ctx, rd->state->page_index, descr); + } else { + pg_cache_add_new_metric_time(rd->state->page_index, descr); + } +} + +/* + * Releases the database reference from the handle for storing metrics. + * Returns 1 if it's safe to delete the dimension. + */ +int rrdeng_store_metric_finalize(RRDDIM *rd) +{ + struct rrdeng_collect_handle *handle; + struct rrdengine_instance *ctx; + struct pg_cache_page_index *page_index; + uint8_t can_delete_metric = 0; + + handle = &rd->state->handle.rrdeng; + ctx = handle->ctx; + page_index = rd->state->page_index; + rrdeng_store_metric_flush_current_page(rd); + if (handle->prev_descr) { + /* unpin old second page */ + pg_cache_put(ctx, handle->prev_descr); + } + uv_rwlock_wrlock(&page_index->lock); + if (!--page_index->writers && !page_index->page_count) { + can_delete_metric = 1; + } + uv_rwlock_wrunlock(&page_index->lock); + + return can_delete_metric; +} + +/* Returns 1 if the data collection interval is well defined, 0 otherwise */ +static int metrics_with_known_interval(struct rrdeng_page_descr *descr) +{ + unsigned page_entries; + + if (unlikely(INVALID_TIME == descr->start_time || INVALID_TIME == descr->end_time)) + return 0; + page_entries = descr->page_length / sizeof(storage_number); + if (likely(page_entries > 1)) { + return 1; + } + return 0; +} + +static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info) +{ + return (uint32_t *)&page_info->scratch[0]; +} + +static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info) +{ + return (uint32_t *)&page_info->scratch[sizeof(uint32_t)]; +} + +/** + * Calculates the regions of different data collection intervals in a netdata chart in the time range + * [start_time,end_time]. This call takes the netdata chart read lock. + * @param st the netdata chart whose data collection interval boundaries are calculated. + * @param start_time inclusive starting time in usec + * @param end_time inclusive ending time in usec + * @param region_info_arrayp It allocates (*region_info_arrayp) and populates it with information of regions of a + * reference dimension that that have different data collection intervals and overlap with the time range + * [start_time,end_time]. The caller must free (*region_info_arrayp) with freez(). If region_info_arrayp is set + * to NULL nothing was allocated. + * @param max_intervalp is derefenced and set to be the largest data collection interval of all regions. + * @return number of regions with different data collection intervals. + */ +unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time, + struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp, struct context_param *context_param_list) +{ + struct pg_cache_page_index *page_index; + struct rrdengine_instance *ctx; + unsigned pages_nr; + RRDDIM *rd_iter, *rd; + struct rrdeng_page_info *page_info_array, *curr, *prev, *old_prev; + unsigned i, j, page_entries, region_points, page_points, regions, max_interval; + time_t now; + usec_t dt, current_position_time, max_time = 0, min_time, curr_time, first_valid_time_in_page; + struct rrdeng_region_info *region_info_array; + uint8_t is_first_region_initialized; + + ctx = get_rrdeng_ctx_from_host(st->rrdhost); + regions = 1; + *max_intervalp = max_interval = 0; + region_info_array = NULL; + *region_info_arrayp = NULL; + page_info_array = NULL; + + RRDDIM *temp_rd = context_param_list ? context_param_list->rd : NULL; + rrdset_rdlock(st); + for(rd_iter = temp_rd?temp_rd:st->dimensions, rd = NULL, min_time = (usec_t)-1 ; rd_iter ; rd_iter = rd_iter->next) { + /* + * Choose oldest dimension as reference. This is not equivalent to the union of all dimensions + * but it is a best effort approximation with a bias towards older metrics in a chart. It + * matches netdata behaviour in the sense that dimensions are generally aligned in a chart + * and older dimensions contain more information about the time range. It does not work well + * for metrics that have recently stopped being collected. + */ + curr_time = pg_cache_oldest_time_in_range(ctx, rd_iter->state->rrdeng_uuid, + start_time * USEC_PER_SEC, end_time * USEC_PER_SEC); + if (INVALID_TIME != curr_time && curr_time < min_time) { + rd = rd_iter; + min_time = curr_time; + } + } + rrdset_unlock(st); + if (NULL == rd) { + return 1; + } + pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, + &page_info_array, &page_index); + if (pages_nr) { + /* conservative allocation, will reduce the size later if necessary */ + region_info_array = mallocz(sizeof(*region_info_array) * pages_nr); + } + is_first_region_initialized = 0; + region_points = 0; + + /* pages loop */ + for (i = 0, curr = NULL, prev = NULL ; i < pages_nr ; ++i) { + old_prev = prev; + prev = curr; + curr = &page_info_array[i]; + *pginfo_to_points(curr) = 0; /* initialize to invalid page */ + *pginfo_to_dt(curr) = 0; /* no known data collection interval yet */ + if (unlikely(INVALID_TIME == curr->start_time || INVALID_TIME == curr->end_time || + curr->end_time < curr->start_time)) { + info("Ignoring page with invalid timestamps."); + prev = old_prev; + continue; + } + page_entries = curr->page_length / sizeof(storage_number); + fatal_assert(0 != page_entries); + if (likely(1 != page_entries)) { + dt = (curr->end_time - curr->start_time) / (page_entries - 1); + *pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(dt); + if (unlikely(0 == *pginfo_to_dt(curr))) + *pginfo_to_dt(curr) = 1; + } else { + dt = 0; + } + for (j = 0, page_points = 0 ; j < page_entries ; ++j) { + uint8_t is_metric_out_of_order, is_metric_earlier_than_range; + + is_metric_earlier_than_range = 0; + is_metric_out_of_order = 0; + + current_position_time = curr->start_time + j * dt; + now = current_position_time / USEC_PER_SEC; + if (now > end_time) { /* there will be no more pages in the time range */ + break; + } + if (now < start_time) + is_metric_earlier_than_range = 1; + if (unlikely(current_position_time < max_time)) /* just went back in time */ + is_metric_out_of_order = 1; + if (is_metric_earlier_than_range || unlikely(is_metric_out_of_order)) { + if (unlikely(is_metric_out_of_order)) + info("Ignoring metric with out of order timestamp."); + continue; /* next entry */ + } + /* here is a valid metric */ + ++page_points; + region_info_array[regions - 1].points = ++region_points; + max_time = current_position_time; + if (1 == page_points) + first_valid_time_in_page = current_position_time; + if (unlikely(!is_first_region_initialized)) { + fatal_assert(1 == regions); + /* this is the first region */ + region_info_array[0].start_time = current_position_time; + is_first_region_initialized = 1; + } + } + *pginfo_to_points(curr) = page_points; + if (0 == page_points) { + prev = old_prev; + continue; + } + + if (unlikely(0 == *pginfo_to_dt(curr))) { /* unknown data collection interval */ + fatal_assert(1 == page_points); + + if (likely(NULL != prev)) { /* get interval from previous page */ + *pginfo_to_dt(curr) = *pginfo_to_dt(prev); + } else { /* there is no previous page in the query */ + struct rrdeng_page_info db_page_info; + + /* go to database */ + pg_cache_get_filtered_info_prev(ctx, page_index, curr->start_time, + metrics_with_known_interval, &db_page_info); + if (unlikely(db_page_info.start_time == INVALID_TIME || db_page_info.end_time == INVALID_TIME || + 0 == db_page_info.page_length)) { /* nothing in the database, default to update_every */ + *pginfo_to_dt(curr) = rd->update_every; + } else { + unsigned db_entries; + usec_t db_dt; + + db_entries = db_page_info.page_length / sizeof(storage_number); + db_dt = (db_page_info.end_time - db_page_info.start_time) / (db_entries - 1); + *pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(db_dt); + if (unlikely(0 == *pginfo_to_dt(curr))) + *pginfo_to_dt(curr) = 1; + + } + } + } + if (likely(prev) && unlikely(*pginfo_to_dt(curr) != *pginfo_to_dt(prev))) { + info("Data collection interval change detected in query: %"PRIu32" -> %"PRIu32, + *pginfo_to_dt(prev), *pginfo_to_dt(curr)); + region_info_array[regions++ - 1].points -= page_points; + region_info_array[regions - 1].points = region_points = page_points; + region_info_array[regions - 1].start_time = first_valid_time_in_page; + } + if (*pginfo_to_dt(curr) > max_interval) + max_interval = *pginfo_to_dt(curr); + region_info_array[regions - 1].update_every = *pginfo_to_dt(curr); + } + if (page_info_array) + freez(page_info_array); + if (region_info_array) { + if (likely(is_first_region_initialized)) { + /* free unnecessary memory */ + region_info_array = reallocz(region_info_array, sizeof(*region_info_array) * regions); + *region_info_arrayp = region_info_array; + *max_intervalp = max_interval; + } else { + /* empty result */ + freez(region_info_array); + } + } + return regions; +} + +/* + * Gets a handle for loading metrics from the database. + * The handle must be released with rrdeng_load_metric_final(). + */ +void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time) +{ + struct rrdeng_query_handle *handle; + struct rrdengine_instance *ctx; + unsigned pages_nr; + + ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); + rrdimm_handle->start_time = start_time; + rrdimm_handle->end_time = end_time; + handle = &rrdimm_handle->rrdeng; + handle->next_page_time = start_time; + handle->now = start_time; + handle->position = 0; + handle->ctx = ctx; + handle->descr = NULL; + pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, + NULL, &handle->page_index); + if (unlikely(NULL == handle->page_index || 0 == pages_nr)) + /* there are no metrics to load */ + handle->next_page_time = INVALID_TIME; +} + +/* Returns the metric and sets its timestamp into current_time */ +storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time) +{ + struct rrdeng_query_handle *handle; + struct rrdengine_instance *ctx; + struct rrdeng_page_descr *descr; + storage_number *page, ret; + unsigned position, entries; + usec_t next_page_time = 0, current_position_time, page_end_time = 0; + uint32_t page_length; + + handle = &rrdimm_handle->rrdeng; + if (unlikely(INVALID_TIME == handle->next_page_time)) { + return SN_EMPTY_SLOT; + } + ctx = handle->ctx; + if (unlikely(NULL == (descr = handle->descr))) { + /* it's the first call */ + next_page_time = handle->next_page_time * USEC_PER_SEC; + } else { + pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length); + } + position = handle->position + 1; + + if (unlikely(NULL == descr || + position >= (page_length / sizeof(storage_number)))) { + /* We need to get a new page */ + if (descr) { + /* Drop old page's reference */ +#ifdef NETDATA_INTERNAL_CHECKS + rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); +#endif + pg_cache_put(ctx, descr); + handle->descr = NULL; + handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1; + if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) { + goto no_more_metrics; + } + next_page_time = handle->next_page_time * USEC_PER_SEC; + } + + descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, + next_page_time, rrdimm_handle->end_time * USEC_PER_SEC); + if (NULL == descr) { + goto no_more_metrics; + } +#ifdef NETDATA_INTERNAL_CHECKS + rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1); +#endif + handle->descr = descr; + pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length); + if (unlikely(INVALID_TIME == descr->start_time || + INVALID_TIME == page_end_time)) { + goto no_more_metrics; + } + if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) { + /* we're in the middle of the page somewhere */ + entries = page_length / sizeof(storage_number); + position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) / + (page_end_time - descr->start_time); + } else { + position = 0; + } + } + page = descr->pg_cache_descr->page; + ret = page[position]; + entries = page_length / sizeof(storage_number); + if (entries > 1) { + usec_t dt; + + dt = (page_end_time - descr->start_time) / (entries - 1); + current_position_time = descr->start_time + position * dt; + } else { + current_position_time = descr->start_time; + } + handle->position = position; + handle->now = current_position_time / USEC_PER_SEC; +/* fatal_assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time); + The above assertion is an approximation and needs to take update_every into account */ + if (unlikely(handle->now >= rrdimm_handle->end_time)) { + /* next calls will not load any more metrics */ + handle->next_page_time = INVALID_TIME; + } + *current_time = handle->now; + return ret; + +no_more_metrics: + handle->next_page_time = INVALID_TIME; + return SN_EMPTY_SLOT; +} + +int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) +{ + struct rrdeng_query_handle *handle; + + handle = &rrdimm_handle->rrdeng; + return (INVALID_TIME == handle->next_page_time); +} + +/* + * Releases the database reference from the handle for loading metrics. + */ +void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) +{ + struct rrdeng_query_handle *handle; + struct rrdengine_instance *ctx; + struct rrdeng_page_descr *descr; + + handle = &rrdimm_handle->rrdeng; + ctx = handle->ctx; + descr = handle->descr; + if (descr) { +#ifdef NETDATA_INTERNAL_CHECKS + rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); +#endif + pg_cache_put(ctx, descr); + } +} + +time_t rrdeng_metric_latest_time(RRDDIM *rd) +{ + struct pg_cache_page_index *page_index; + + page_index = rd->state->page_index; + + return page_index->latest_time / USEC_PER_SEC; +} +time_t rrdeng_metric_oldest_time(RRDDIM *rd) +{ + struct pg_cache_page_index *page_index; + + page_index = rd->state->page_index; + + return page_index->oldest_time / USEC_PER_SEC; +} + +/* Also gets a reference for the page */ +void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr) +{ + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + void *page; + /* TODO: check maximum number of pages in page cache limit */ + + descr = pg_cache_create_descr(); + descr->id = id; /* TODO: add page type: metric, log, something? */ + page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */ + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->page = page; + pg_cache_descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */; + pg_cache_descr->refcnt = 1; + + debug(D_RRDENGINE, "Created new page:"); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); + *ret_descr = descr; + return page; +} + +/* The page must not be empty */ +void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, + Word_t page_correlation_id) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + Pvoid_t *PValue; + unsigned nr_committed_pages; + + if (unlikely(NULL == descr)) { + debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-committed.", __func__); + return; + } + fatal_assert(descr->page_length); + + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + PValue = JudyLIns(&pg_cache->committed_page_index.JudyL_array, page_correlation_id, PJE0); + *PValue = descr; + nr_committed_pages = ++pg_cache->committed_page_index.nr_committed_pages; + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + + if (nr_committed_pages >= pg_cache_hard_limit(ctx) / 2) { + /* over 50% of pages have not been committed yet */ + + if (ctx->drop_metrics_under_page_cache_pressure && + nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) { + /* 100% of pages are dirty */ + struct rrdeng_cmd cmd; + + cmd.opcode = RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + } else { + if (0 == (unsigned long) ctx->stats.pg_cache_over_half_dirty_events) { + /* only print the first time */ + errno = 0; + error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". " + "Metric data at risk of not being stored in the database, " + "please reduce disk load or use a faster disk.", ctx->dbfiles_path); + } + rrd_stat_atomic_add(&ctx->stats.pg_cache_over_half_dirty_events, 1); + rrd_stat_atomic_add(&global_pg_cache_over_half_dirty_events, 1); + } + } + + pg_cache_put(ctx, descr); +} + +/* Gets a reference for the page */ +void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle) +{ + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + + debug(D_RRDENGINE, "Reading existing page:"); + descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME); + if (NULL == descr) { + *handle = NULL; + + return NULL; + } + *handle = descr; + pg_cache_descr = descr->pg_cache_descr; + + return pg_cache_descr->page; +} + +/* Gets a reference for the page */ +void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle) +{ + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + + debug(D_RRDENGINE, "Reading existing page:"); + descr = pg_cache_lookup(ctx, NULL, id, point_in_time); + if (NULL == descr) { + *handle = NULL; + + return NULL; + } + *handle = descr; + pg_cache_descr = descr->pg_cache_descr; + + return pg_cache_descr->page; +} + +/* + * Gathers Database Engine statistics. + * Careful when modifying this function. + * You must not change the indices of the statistics or user code will break. + * You must not exceed RRDENG_NR_STATS or it will crash. + */ +void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array) +{ + if (ctx == NULL) + return; + + struct page_cache *pg_cache = &ctx->pg_cache; + + array[0] = (uint64_t)ctx->stats.metric_API_producers; + array[1] = (uint64_t)ctx->stats.metric_API_consumers; + array[2] = (uint64_t)pg_cache->page_descriptors; + array[3] = (uint64_t)pg_cache->populated_pages; + array[4] = (uint64_t)pg_cache->committed_page_index.nr_committed_pages; + array[5] = (uint64_t)ctx->stats.pg_cache_insertions; + array[6] = (uint64_t)ctx->stats.pg_cache_deletions; + array[7] = (uint64_t)ctx->stats.pg_cache_hits; + array[8] = (uint64_t)ctx->stats.pg_cache_misses; + array[9] = (uint64_t)ctx->stats.pg_cache_backfills; + array[10] = (uint64_t)ctx->stats.pg_cache_evictions; + array[11] = (uint64_t)ctx->stats.before_compress_bytes; + array[12] = (uint64_t)ctx->stats.after_compress_bytes; + array[13] = (uint64_t)ctx->stats.before_decompress_bytes; + array[14] = (uint64_t)ctx->stats.after_decompress_bytes; + array[15] = (uint64_t)ctx->stats.io_write_bytes; + array[16] = (uint64_t)ctx->stats.io_write_requests; + array[17] = (uint64_t)ctx->stats.io_read_bytes; + array[18] = (uint64_t)ctx->stats.io_read_requests; + array[19] = (uint64_t)ctx->stats.io_write_extent_bytes; + array[20] = (uint64_t)ctx->stats.io_write_extents; + array[21] = (uint64_t)ctx->stats.io_read_extent_bytes; + array[22] = (uint64_t)ctx->stats.io_read_extents; + array[23] = (uint64_t)ctx->stats.datafile_creations; + array[24] = (uint64_t)ctx->stats.datafile_deletions; + array[25] = (uint64_t)ctx->stats.journalfile_creations; + array[26] = (uint64_t)ctx->stats.journalfile_deletions; + array[27] = (uint64_t)ctx->stats.page_cache_descriptors; + array[28] = (uint64_t)ctx->stats.io_errors; + array[29] = (uint64_t)ctx->stats.fs_errors; + array[30] = (uint64_t)global_io_errors; + array[31] = (uint64_t)global_fs_errors; + array[32] = (uint64_t)rrdeng_reserved_file_descriptors; + array[33] = (uint64_t)ctx->stats.pg_cache_over_half_dirty_events; + array[34] = (uint64_t)global_pg_cache_over_half_dirty_events; + array[35] = (uint64_t)ctx->stats.flushing_pressure_page_deletions; + array[36] = (uint64_t)global_flushing_pressure_page_deletions; + fatal_assert(RRDENG_NR_STATS == 37); +} + +/* Releases reference to page */ +void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle) +{ + (void)ctx; + pg_cache_put(ctx, (struct rrdeng_page_descr *)handle); +} + +/* + * Returns 0 on success, negative on error + */ +int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, + unsigned disk_space_mb) +{ + struct rrdengine_instance *ctx; + int error; + uint32_t max_open_files; + + max_open_files = rlimit_nofile.rlim_cur / 4; + + /* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */ + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE); + if (rrdeng_reserved_file_descriptors > max_open_files) { + error( + "Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.", + (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files); + + rrd_stat_atomic_add(&global_fs_errors, 1); + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); + return UV_EMFILE; + } + + if (NULL == ctxp) { + ctx = &multidb_ctx; + memset(ctx, 0, sizeof(*ctx)); + } else { + *ctxp = ctx = callocz(1, sizeof(*ctx)); + } + ctx->global_compress_alg = RRD_LZ4; + if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB) + page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB; + ctx->max_cache_pages = page_cache_mb * (1048576LU / RRDENG_BLOCK_SIZE); + /* try to keep 5% of the page cache free */ + ctx->cache_pages_low_watermark = (ctx->max_cache_pages * 95LLU) / 100; + if (disk_space_mb < RRDENG_MIN_DISK_SPACE_MB) + disk_space_mb = RRDENG_MIN_DISK_SPACE_MB; + ctx->max_disk_space = disk_space_mb * 1048576LLU; + strncpyz(ctx->dbfiles_path, dbfiles_path, sizeof(ctx->dbfiles_path) - 1); + ctx->dbfiles_path[sizeof(ctx->dbfiles_path) - 1] = '\0'; + if (NULL == host) + strncpyz(ctx->machine_guid, registry_get_this_machine_guid(), GUID_LEN); + else + strncpyz(ctx->machine_guid, host->machine_guid, GUID_LEN); + + ctx->drop_metrics_under_page_cache_pressure = rrdeng_drop_metrics_under_page_cache_pressure; + ctx->metric_API_max_producers = 0; + ctx->quiesce = NO_QUIESCE; + ctx->metalog_ctx = NULL; /* only set this after the metadata log has finished initializing */ + ctx->host = host; + + memset(&ctx->worker_config, 0, sizeof(ctx->worker_config)); + ctx->worker_config.ctx = ctx; + init_page_cache(ctx); + init_commit_log(ctx); + error = init_rrd_files(ctx); + if (error) { + goto error_after_init_rrd_files; + } + + init_completion(&ctx->rrdengine_completion); + fatal_assert(0 == uv_thread_create(&ctx->worker_config.thread, rrdeng_worker, &ctx->worker_config)); + /* wait for worker thread to initialize */ + wait_for_completion(&ctx->rrdengine_completion); + destroy_completion(&ctx->rrdengine_completion); + uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE"); + if (ctx->worker_config.error) { + goto error_after_rrdeng_worker; + } + error = metalog_init(ctx); + if (error) { + error("Failed to initialize metadata log file event loop."); + goto error_after_rrdeng_worker; + } + + return 0; + +error_after_rrdeng_worker: + finalize_rrd_files(ctx); +error_after_init_rrd_files: + free_page_cache(ctx); + if (ctx != &multidb_ctx) { + freez(ctx); + *ctxp = NULL; + } + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); + return UV_EIO; +} + +/* + * Returns 0 on success, 1 on error + */ +int rrdeng_exit(struct rrdengine_instance *ctx) +{ + struct rrdeng_cmd cmd; + + if (NULL == ctx) { + return 1; + } + + /* TODO: add page to page cache */ + cmd.opcode = RRDENG_SHUTDOWN; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + + fatal_assert(0 == uv_thread_join(&ctx->worker_config.thread)); + + finalize_rrd_files(ctx); + //metalog_exit(ctx->metalog_ctx); + free_page_cache(ctx); + + if (ctx != &multidb_ctx) { + freez(ctx); + } + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); + return 0; +} + +void rrdeng_prepare_exit(struct rrdengine_instance *ctx) +{ + struct rrdeng_cmd cmd; + + if (NULL == ctx) { + return; + } + + init_completion(&ctx->rrdengine_completion); + cmd.opcode = RRDENG_QUIESCE; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + + /* wait for dbengine to quiesce */ + wait_for_completion(&ctx->rrdengine_completion); + destroy_completion(&ctx->rrdengine_completion); + + //metalog_prepare_exit(ctx->metalog_ctx); +} + diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h new file mode 100644 index 0000000..41375b9 --- /dev/null +++ b/database/engine/rrdengineapi.h @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDENGINEAPI_H +#define NETDATA_RRDENGINEAPI_H + +#include "rrdengine.h" + +#define RRDENG_MIN_PAGE_CACHE_SIZE_MB (8) +#define RRDENG_MIN_DISK_SPACE_MB (64) + +#define RRDENG_NR_STATS (37) + +#define RRDENG_FD_BUDGET_PER_INSTANCE (50) + +extern int default_rrdeng_page_cache_mb; +extern int default_rrdeng_disk_quota_mb; +extern int default_multidb_disk_quota_mb; +extern uint8_t rrdeng_drop_metrics_under_page_cache_pressure; +extern struct rrdengine_instance multidb_ctx; + +struct rrdeng_region_info { + time_t start_time; + int update_every; + unsigned points; +}; + +extern void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr); +extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, + Word_t page_correlation_id); +extern void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle); +extern void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle); +extern void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle); + +extern void rrdeng_generate_legacy_uuid(const char *dim_id, char *chart_id, uuid_t *ret_uuid); +extern void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid, + uuid_t *ret_uuid); + + +extern void rrdeng_metric_init(RRDDIM *rd, uuid_t *dim_uuid); +extern void rrdeng_store_metric_init(RRDDIM *rd); +extern void rrdeng_store_metric_flush_current_page(RRDDIM *rd); +extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number); +extern int rrdeng_store_metric_finalize(RRDDIM *rd); +extern unsigned + rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time, + struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp, struct context_param *context_param_list); +extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, + time_t start_time, time_t end_time); +extern storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time); +extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle); +extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle); +extern time_t rrdeng_metric_latest_time(RRDDIM *rd); +extern time_t rrdeng_metric_oldest_time(RRDDIM *rd); +extern void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array); + +/* must call once before using anything */ +extern int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, + unsigned disk_space_mb); + +extern int rrdeng_exit(struct rrdengine_instance *ctx); +extern void rrdeng_prepare_exit(struct rrdengine_instance *ctx); + +#endif /* NETDATA_RRDENGINEAPI_H */ diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c new file mode 100644 index 0000000..287b86b --- /dev/null +++ b/database/engine/rrdenginelib.c @@ -0,0 +1,294 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +#define BUFSIZE (512) + +/* Caller must hold descriptor lock */ +void print_page_cache_descr(struct rrdeng_page_descr *descr) +{ + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + char uuid_str[UUID_STR_LEN]; + char str[BUFSIZE + 1]; + int pos = 0; + + uuid_unparse_lower(*descr->id, uuid_str); + pos += snprintfz(str, BUFSIZE - pos, "page(%p) id=%s\n" + "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:", + pg_cache_descr->page, uuid_str, + descr->page_length, + (uint64_t)descr->start_time, + (uint64_t)descr->end_time); + if (!descr->extent) { + pos += snprintfz(str + pos, BUFSIZE - pos, "N/A"); + } else { + pos += snprintfz(str + pos, BUFSIZE - pos, "%"PRIu64, descr->extent->offset); + } + + snprintfz(str + pos, BUFSIZE - pos, " flags:0x%2.2lX refcnt:%u\n\n", pg_cache_descr->flags, pg_cache_descr->refcnt); + debug(D_RRDENGINE, "%s", str); +} + +void print_page_descr(struct rrdeng_page_descr *descr) +{ + char uuid_str[UUID_STR_LEN]; + char str[BUFSIZE + 1]; + int pos = 0; + + uuid_unparse_lower(*descr->id, uuid_str); + pos += snprintfz(str, BUFSIZE - pos, "id=%s\n" + "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:", + uuid_str, + descr->page_length, + (uint64_t)descr->start_time, + (uint64_t)descr->end_time); + if (!descr->extent) { + pos += snprintfz(str + pos, BUFSIZE - pos, "N/A"); + } else { + pos += snprintfz(str + pos, BUFSIZE - pos, "%"PRIu64, descr->extent->offset); + } + snprintfz(str + pos, BUFSIZE - pos, "\n\n"); + fputs(str, stderr); +} + +int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size) +{ + int ret; + uv_fs_t req; + uv_stat_t* s; + + ret = uv_fs_fstat(NULL, &req, file, NULL); + if (ret < 0) { + fatal("uv_fs_fstat: %s\n", uv_strerror(ret)); + } + fatal_assert(req.result == 0); + s = req.ptr; + if (!(s->st_mode & S_IFREG)) { + error("Not a regular file.\n"); + uv_fs_req_cleanup(&req); + return UV_EINVAL; + } + if (s->st_size < min_size) { + error("File length is too short.\n"); + uv_fs_req_cleanup(&req); + return UV_EINVAL; + } + *file_size = s->st_size; + uv_fs_req_cleanup(&req); + + return 0; +} + +/** + * Open file for I/O. + * + * @param path The full path of the file. + * @param flags Same flags as the open() system call uses. + * @param file On success sets (*file) to be the uv_file that was opened. + * @param direct Tries to open a file in direct I/O mode when direct=1, falls back to buffered mode if not possible. + * @return Returns UV error number that is < 0 on failure. 0 on success. + */ +int open_file_for_io(char *path, int flags, uv_file *file, int direct) +{ + uv_fs_t req; + int fd = -1, current_flags; + + fatal_assert(0 == direct || 1 == direct); + for ( ; direct >= 0 ; --direct) { +#ifdef __APPLE__ + /* Apple OS does not support O_DIRECT */ + direct = 0; +#endif + current_flags = flags; + if (direct) { + current_flags |= O_DIRECT; + } + fd = uv_fs_open(NULL, &req, path, current_flags, S_IRUSR | S_IWUSR, NULL); + if (fd < 0) { + if ((direct) && (UV_EINVAL == fd)) { + error("File \"%s\" does not support direct I/O, falling back to buffered I/O.", path); + } else { + error("Failed to open file \"%s\".", path); + --direct; /* break the loop */ + } + } else { + fatal_assert(req.result >= 0); + *file = req.result; +#ifdef __APPLE__ + info("Disabling OS X caching for file \"%s\".", path); + fcntl(fd, F_NOCACHE, 1); +#endif + --direct; /* break the loop */ + } + uv_fs_req_cleanup(&req); + } + + return fd; +} + +char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size) +{ + struct page_cache *pg_cache; + + pg_cache = &ctx->pg_cache; + snprintfz(str, size, + "metric_API_producers: %ld\n" + "metric_API_consumers: %ld\n" + "page_cache_total_pages: %ld\n" + "page_cache_descriptors: %ld\n" + "page_cache_populated_pages: %ld\n" + "page_cache_committed_pages: %ld\n" + "page_cache_insertions: %ld\n" + "page_cache_deletions: %ld\n" + "page_cache_hits: %ld\n" + "page_cache_misses: %ld\n" + "page_cache_backfills: %ld\n" + "page_cache_evictions: %ld\n" + "compress_before_bytes: %ld\n" + "compress_after_bytes: %ld\n" + "decompress_before_bytes: %ld\n" + "decompress_after_bytes: %ld\n" + "io_write_bytes: %ld\n" + "io_write_requests: %ld\n" + "io_read_bytes: %ld\n" + "io_read_requests: %ld\n" + "io_write_extent_bytes: %ld\n" + "io_write_extents: %ld\n" + "io_read_extent_bytes: %ld\n" + "io_read_extents: %ld\n" + "datafile_creations: %ld\n" + "datafile_deletions: %ld\n" + "journalfile_creations: %ld\n" + "journalfile_deletions: %ld\n" + "io_errors: %ld\n" + "fs_errors: %ld\n" + "global_io_errors: %ld\n" + "global_fs_errors: %ld\n" + "rrdeng_reserved_file_descriptors: %ld\n" + "pg_cache_over_half_dirty_events: %ld\n" + "global_pg_cache_over_half_dirty_events: %ld\n" + "flushing_pressure_page_deletions: %ld\n" + "global_flushing_pressure_page_deletions: %ld\n", + (long)ctx->stats.metric_API_producers, + (long)ctx->stats.metric_API_consumers, + (long)pg_cache->page_descriptors, + (long)ctx->stats.page_cache_descriptors, + (long)pg_cache->populated_pages, + (long)pg_cache->committed_page_index.nr_committed_pages, + (long)ctx->stats.pg_cache_insertions, + (long)ctx->stats.pg_cache_deletions, + (long)ctx->stats.pg_cache_hits, + (long)ctx->stats.pg_cache_misses, + (long)ctx->stats.pg_cache_backfills, + (long)ctx->stats.pg_cache_evictions, + (long)ctx->stats.before_compress_bytes, + (long)ctx->stats.after_compress_bytes, + (long)ctx->stats.before_decompress_bytes, + (long)ctx->stats.after_decompress_bytes, + (long)ctx->stats.io_write_bytes, + (long)ctx->stats.io_write_requests, + (long)ctx->stats.io_read_bytes, + (long)ctx->stats.io_read_requests, + (long)ctx->stats.io_write_extent_bytes, + (long)ctx->stats.io_write_extents, + (long)ctx->stats.io_read_extent_bytes, + (long)ctx->stats.io_read_extents, + (long)ctx->stats.datafile_creations, + (long)ctx->stats.datafile_deletions, + (long)ctx->stats.journalfile_creations, + (long)ctx->stats.journalfile_deletions, + (long)ctx->stats.io_errors, + (long)ctx->stats.fs_errors, + (long)global_io_errors, + (long)global_fs_errors, + (long)rrdeng_reserved_file_descriptors, + (long)ctx->stats.pg_cache_over_half_dirty_events, + (long)global_pg_cache_over_half_dirty_events, + (long)ctx->stats.flushing_pressure_page_deletions, + (long)global_flushing_pressure_page_deletions + ); + return str; +} + +int is_legacy_child(const char *machine_guid) +{ + uuid_t uuid; + char dbengine_file[FILENAME_MAX+1]; + + if (unlikely(!strcmp(machine_guid, "unittest-dbengine") || !strcmp(machine_guid, "dbengine-dataset") || + !strcmp(machine_guid, "dbengine-stress-test"))) { + return 1; + } + if (!uuid_parse(machine_guid, uuid)) { + uv_fs_t stat_req; + snprintfz(dbengine_file, FILENAME_MAX, "%s/%s/dbengine", netdata_configured_cache_dir, machine_guid); + int rc = uv_fs_stat(NULL, &stat_req, dbengine_file, NULL); + if (likely(rc == 0 && ((stat_req.statbuf.st_mode & S_IFMT) == S_IFDIR))) { + //info("Found legacy engine folder \"%s\"", dbengine_file); + return 1; + } + } + return 0; +} + +int count_legacy_children(char *dbfiles_path) +{ + int ret; + uv_fs_t req; + uv_dirent_t dent; + int legacy_engines = 0; + + ret = uv_fs_scandir(NULL, &req, dbfiles_path, 0, NULL); + if (ret < 0) { + uv_fs_req_cleanup(&req); + error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret)); + return ret; + } + + while(UV_EOF != uv_fs_scandir_next(&req, &dent)) { + if (dent.type == UV_DIRENT_DIR) { + if (is_legacy_child(dent.name)) + legacy_engines++; + } + } + uv_fs_req_cleanup(&req); + return legacy_engines; +} + +int compute_multidb_diskspace() +{ + char multidb_disk_space_file[FILENAME_MAX + 1]; + FILE *fp; + int computed_multidb_disk_quota_mb = -1; + + snprintfz(multidb_disk_space_file, FILENAME_MAX, "%s/dbengine_multihost_size", netdata_configured_varlib_dir); + fp = fopen(multidb_disk_space_file, "r"); + if (likely(fp)) { + int rc = fscanf(fp, "%d", &computed_multidb_disk_quota_mb); + fclose(fp); + if (unlikely(rc != 1 || computed_multidb_disk_quota_mb < RRDENG_MIN_DISK_SPACE_MB)) { + errno = 0; + error("File '%s' contains invalid input, it will be rebuild", multidb_disk_space_file); + computed_multidb_disk_quota_mb = -1; + } + } + + if (computed_multidb_disk_quota_mb == -1) { + int rc = count_legacy_children(netdata_configured_cache_dir); + if (likely(rc >= 0)) { + computed_multidb_disk_quota_mb = (rc + 1) * default_rrdeng_disk_quota_mb; + info("Found %d legacy dbengines, setting multidb diskspace to %dMB", rc, computed_multidb_disk_quota_mb); + + fp = fopen(multidb_disk_space_file, "w"); + if (likely(fp)) { + fprintf(fp, "%d", computed_multidb_disk_quota_mb); + info("Created file '%s' to store the computed value", multidb_disk_space_file); + fclose(fp); + } else + error("Failed to store the default multidb disk quota size on '%s'", multidb_disk_space_file); + } + else + computed_multidb_disk_quota_mb = default_rrdeng_disk_quota_mb; + } + + return computed_multidb_disk_quota_mb; +} diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h new file mode 100644 index 0000000..ebab93c --- /dev/null +++ b/database/engine/rrdenginelib.h @@ -0,0 +1,144 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDENGINELIB_H +#define NETDATA_RRDENGINELIB_H + +/* Forward declarations */ +struct rrdeng_page_descr; +struct rrdengine_instance; + +#define STR_HELPER(x) #x +#define STR(x) STR_HELPER(x) + +#define BITS_PER_ULONG (sizeof(unsigned long) * 8) + +#ifndef UUID_STR_LEN +#define UUID_STR_LEN (37) +#endif + +/* Taken from linux kernel */ +#define BUILD_BUG_ON(condition) ((void)sizeof(char[1 - 2*!!(condition)])) + +#define ALIGN_BYTES_FLOOR(x) (((x) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE) +#define ALIGN_BYTES_CEILING(x) ((((x) + RRDENG_BLOCK_SIZE - 1) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE) + +#define ROUND_USEC_TO_SEC(x) (((x) + USEC_PER_SEC / 2 - 1) / USEC_PER_SEC) + +typedef uintptr_t rrdeng_stats_t; + +#ifdef __ATOMIC_RELAXED +#define rrd_atomic_fetch_add(p, n) __atomic_fetch_add(p, n, __ATOMIC_RELAXED) +#define rrd_atomic_add_fetch(p, n) __atomic_add_fetch(p, n, __ATOMIC_RELAXED) +#else +#define rrd_atomic_fetch_add(p, n) __sync_fetch_and_add(p, n) +#define rrd_atomic_add_fetch(p, n) __sync_add_and_fetch(p, n) +#endif + +#define rrd_stat_atomic_add(p, n) rrd_atomic_fetch_add(p, n) + +/* returns -1 if it didn't find the first cleared bit, the position otherwise. Starts from LSB. */ +static inline int find_first_zero(unsigned x) +{ + return ffs((int)(~x)) - 1; +} + +/* Starts from LSB. */ +static inline uint8_t check_bit(unsigned x, size_t pos) +{ + return !!(x & (1 << pos)); +} + +/* Starts from LSB. val is 0 or 1 */ +static inline void modify_bit(unsigned *x, unsigned pos, uint8_t val) +{ + switch(val) { + case 0: + *x &= ~(1U << pos); + break; + case 1: + *x |= 1U << pos; + break; + default: + error("modify_bit() called with invalid argument."); + break; + } +} + +#define RRDENG_PATH_MAX (4096) + +/* returns old *ptr value */ +static inline unsigned long ulong_compare_and_swap(volatile unsigned long *ptr, + unsigned long oldval, unsigned long newval) +{ + return __sync_val_compare_and_swap(ptr, oldval, newval); +} + +#ifndef O_DIRECT +/* Workaround for OS X */ +#define O_DIRECT (0) +#endif + +struct completion { + uv_mutex_t mutex; + uv_cond_t cond; + volatile unsigned completed; +}; + +static inline void init_completion(struct completion *p) +{ + p->completed = 0; + fatal_assert(0 == uv_cond_init(&p->cond)); + fatal_assert(0 == uv_mutex_init(&p->mutex)); +} + +static inline void destroy_completion(struct completion *p) +{ + uv_cond_destroy(&p->cond); + uv_mutex_destroy(&p->mutex); +} + +static inline void wait_for_completion(struct completion *p) +{ + uv_mutex_lock(&p->mutex); + while (0 == p->completed) { + uv_cond_wait(&p->cond, &p->mutex); + } + fatal_assert(1 == p->completed); + uv_mutex_unlock(&p->mutex); +} + +static inline void complete(struct completion *p) +{ + uv_mutex_lock(&p->mutex); + p->completed = 1; + uv_mutex_unlock(&p->mutex); + uv_cond_broadcast(&p->cond); +} + +static inline int crc32cmp(void *crcp, uLong crc) +{ + return (*(uint32_t *)crcp != crc); +} + +static inline void crc32set(void *crcp, uLong crc) +{ + *(uint32_t *)crcp = crc; +} + +extern void print_page_cache_descr(struct rrdeng_page_descr *page_cache_descr); +extern void print_page_descr(struct rrdeng_page_descr *descr); +extern int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size); +extern int open_file_for_io(char *path, int flags, uv_file *file, int direct); +static inline int open_file_direct_io(char *path, int flags, uv_file *file) +{ + return open_file_for_io(path, flags, file, 1); +} +static inline int open_file_buffered_io(char *path, int flags, uv_file *file) +{ + return open_file_for_io(path, flags, file, 0); +} +extern char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size); +extern int compute_multidb_diskspace(); +extern int is_legacy_child(const char *machine_guid); + +#endif /* NETDATA_RRDENGINELIB_H */
\ No newline at end of file diff --git a/database/engine/rrdenglocking.c b/database/engine/rrdenglocking.c new file mode 100644 index 0000000..a23abf3 --- /dev/null +++ b/database/engine/rrdenglocking.c @@ -0,0 +1,241 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx) +{ + struct page_cache_descr *pg_cache_descr; + + pg_cache_descr = mallocz(sizeof(*pg_cache_descr)); + rrd_stat_atomic_add(&ctx->stats.page_cache_descriptors, 1); + pg_cache_descr->page = NULL; + pg_cache_descr->flags = 0; + pg_cache_descr->prev = pg_cache_descr->next = NULL; + pg_cache_descr->refcnt = 0; + pg_cache_descr->waiters = 0; + fatal_assert(0 == uv_cond_init(&pg_cache_descr->cond)); + fatal_assert(0 == uv_mutex_init(&pg_cache_descr->mutex)); + + return pg_cache_descr; +} + +void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr) +{ + uv_cond_destroy(&pg_cache_descr->cond); + uv_mutex_destroy(&pg_cache_descr->mutex); + freez(pg_cache_descr); + rrd_stat_atomic_add(&ctx->stats.page_cache_descriptors, -1); +} + +/* also allocates page cache descriptor if missing */ +void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + unsigned long old_state, old_users, new_state, ret_state; + struct page_cache_descr *pg_cache_descr = NULL; + uint8_t we_locked; + + we_locked = 0; + while (1) { /* spin */ + old_state = descr->pg_cache_descr_state; + old_users = old_state >> PG_CACHE_DESCR_SHIFT; + + if (unlikely(we_locked)) { + fatal_assert(old_state & PG_CACHE_DESCR_LOCKED); + new_state = (1 << PG_CACHE_DESCR_SHIFT) | PG_CACHE_DESCR_ALLOCATED; + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + /* success */ + break; + } + continue; /* spin */ + } + if (old_state & PG_CACHE_DESCR_LOCKED) { + fatal_assert(0 == old_users); + continue; /* spin */ + } + if (0 == old_state) { + /* no page cache descriptor has been allocated */ + + if (NULL == pg_cache_descr) { + pg_cache_descr = rrdeng_create_pg_cache_descr(ctx); + } + new_state = PG_CACHE_DESCR_LOCKED; + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, 0, new_state); + if (0 == ret_state) { + we_locked = 1; + descr->pg_cache_descr = pg_cache_descr; + pg_cache_descr->descr = descr; + pg_cache_descr = NULL; /* make sure we don't free pg_cache_descr */ + /* retry */ + continue; + } + continue; /* spin */ + } + /* page cache descriptor is already allocated */ + if (unlikely(!(old_state & PG_CACHE_DESCR_ALLOCATED))) { + fatal("Invalid page cache descriptor locking state:%#lX", old_state); + } + new_state = (old_users + 1) << PG_CACHE_DESCR_SHIFT; + new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK; + + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + /* success */ + break; + } + /* spin */ + } + + if (pg_cache_descr) { + rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); + } + pg_cache_descr = descr->pg_cache_descr; + uv_mutex_lock(&pg_cache_descr->mutex); +} + +void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + unsigned long old_state, new_state, ret_state, old_users; + struct page_cache_descr *pg_cache_descr, *delete_pg_cache_descr = NULL; + uint8_t we_locked; + + uv_mutex_unlock(&descr->pg_cache_descr->mutex); + + we_locked = 0; + while (1) { /* spin */ + old_state = descr->pg_cache_descr_state; + old_users = old_state >> PG_CACHE_DESCR_SHIFT; + + if (unlikely(we_locked)) { + fatal_assert(0 == old_users); + + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, 0); + if (old_state == ret_state) { + /* success */ + rrdeng_destroy_pg_cache_descr(ctx, delete_pg_cache_descr); + return; + } + continue; /* spin */ + } + if (old_state & PG_CACHE_DESCR_LOCKED) { + fatal_assert(0 == old_users); + continue; /* spin */ + } + fatal_assert(old_state & PG_CACHE_DESCR_ALLOCATED); + pg_cache_descr = descr->pg_cache_descr; + /* caller is the only page cache descriptor user and there are no pending references on the page */ + if ((old_state & PG_CACHE_DESCR_DESTROY) && (1 == old_users) && + !pg_cache_descr->flags && !pg_cache_descr->refcnt) { + fatal_assert(!pg_cache_descr->waiters); + + new_state = PG_CACHE_DESCR_LOCKED; + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + we_locked = 1; + delete_pg_cache_descr = pg_cache_descr; + descr->pg_cache_descr = NULL; + /* retry */ + continue; + } + continue; /* spin */ + } + fatal_assert(old_users > 0); + new_state = (old_users - 1) << PG_CACHE_DESCR_SHIFT; + new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK; + + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + /* success */ + break; + } + /* spin */ + } +} + +/* + * Tries to deallocate page cache descriptor. If it fails, it postpones deallocation by setting the + * PG_CACHE_DESCR_DESTROY flag which will be eventually cleared by a different context after doing + * the deallocation. + */ +void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + unsigned long old_state, new_state, ret_state, old_users; + struct page_cache_descr *pg_cache_descr = NULL; + uint8_t just_locked, can_free, must_unlock; + + just_locked = 0; + can_free = 0; + must_unlock = 0; + while (1) { /* spin */ + old_state = descr->pg_cache_descr_state; + old_users = old_state >> PG_CACHE_DESCR_SHIFT; + + if (unlikely(just_locked)) { + fatal_assert(0 == old_users); + + must_unlock = 1; + just_locked = 0; + /* Try deallocate if there are no pending references on the page */ + if (!pg_cache_descr->flags && !pg_cache_descr->refcnt) { + fatal_assert(!pg_cache_descr->waiters); + + descr->pg_cache_descr = NULL; + can_free = 1; + /* success */ + continue; + } + continue; /* spin */ + } + if (unlikely(must_unlock)) { + fatal_assert(0 == old_users); + + if (can_free) { + /* success */ + new_state = 0; + } else { + new_state = old_state | PG_CACHE_DESCR_DESTROY; + new_state &= ~PG_CACHE_DESCR_LOCKED; + } + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + /* unlocked */ + if (can_free) + rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); + return; + } + continue; /* spin */ + } + if (!(old_state & PG_CACHE_DESCR_ALLOCATED)) { + /* don't do anything */ + return; + } + if (old_state & PG_CACHE_DESCR_LOCKED) { + fatal_assert(0 == old_users); + continue; /* spin */ + } + /* caller is the only page cache descriptor user */ + if (0 == old_users) { + new_state = old_state | PG_CACHE_DESCR_LOCKED; + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + just_locked = 1; + pg_cache_descr = descr->pg_cache_descr; + /* retry */ + continue; + } + continue; /* spin */ + } + if (old_state & PG_CACHE_DESCR_DESTROY) { + /* don't do anything */ + return; + } + /* plant PG_CACHE_DESCR_DESTROY so that other contexts eventually free the page cache descriptor */ + new_state = old_state | PG_CACHE_DESCR_DESTROY; + + ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); + if (old_state == ret_state) { + /* success */ + return; + } + /* spin */ + } +}
\ No newline at end of file diff --git a/database/engine/rrdenglocking.h b/database/engine/rrdenglocking.h new file mode 100644 index 0000000..127ddc9 --- /dev/null +++ b/database/engine/rrdenglocking.h @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDENGLOCKING_H +#define NETDATA_RRDENGLOCKING_H + +#include "rrdengine.h" + +/* Forward declarations */ +struct page_cache_descr; + +extern struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx); +extern void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr); +extern void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +extern void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +extern void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); + +#endif /* NETDATA_RRDENGLOCKING_H */
\ No newline at end of file |