Diffstat (limited to 'database/engine')
-rw-r--r--  database/engine/README.md               | 305
-rw-r--r--  database/engine/cache.c                 |  15
-rw-r--r--  database/engine/datafile.c              |  15
-rw-r--r--  database/engine/datafile.h              |   1
-rw-r--r--  database/engine/journalfile.c           |  27
-rw-r--r--  database/engine/journalfile.h           |  13
-rw-r--r--  database/engine/journalfile_v2.ksy.in (renamed from database/engine/journalfile.ksy) | 28
-rw-r--r--  database/engine/metric.c                |  62
-rw-r--r--  database/engine/pagecache.c             |  40
-rw-r--r--  database/engine/pagecache.h             |   4
-rw-r--r--  database/engine/pdc.c                   |  17
-rw-r--r--  database/engine/rrdengine.c             | 202
-rw-r--r--  database/engine/rrdengine.h             |  19
-rwxr-xr-x  database/engine/rrdengineapi.c          | 267
-rw-r--r--  database/engine/rrdengineapi.h          |   8
-rw-r--r--  database/engine/rrdenginelib.c          |  66
16 files changed, 410 insertions(+), 679 deletions(-)
diff --git a/database/engine/README.md b/database/engine/README.md
index 664d40506..890018642 100644
--- a/database/engine/README.md
+++ b/database/engine/README.md
@@ -1,17 +1,9 @@
-<!--
-title: "Database engine"
-description: "Netdata's highly-efficient database engine use both RAM and disk for distributed, long-term storage of per-second metrics."
-custom_edit_url: "https://github.com/netdata/netdata/edit/master/database/engine/README.md"
-sidebar_label: "Database engine"
-learn_status: "Published"
-learn_topic_type: "Concepts"
-learn_rel_path: "Concepts"
--->
-
-# DBENGINE
+# Database engine
DBENGINE is the time-series database of Netdata.
+![image](https://user-images.githubusercontent.com/2662304/233838474-d4f8f0b9-61dc-4409-a708-97d403cd153a.png)
+
## Design
### Data Points
@@ -118,53 +110,13 @@ Tiers are supported in Netdata Agents with version `netdata-1.35.0.138.nightly`
Updating the higher **tiers** is automated, and it happens in real-time while data are being collected for **tier 0**.
-When the Netdata Agent starts, during the first data collection of each metric, higher tiers are automatically **backfilled** with data from lower tiers, so that the aggregation they provide will be accurate.
-
-3 tiers are enabled by default in Netdata, with the following configuration:
-
-```
-[db]
- mode = dbengine
-
- # per second data collection
- update every = 1
-
- # number of tiers used (1 to 5, 3 being default)
- storage tiers = 3
-
- # Tier 0, per second data
- dbengine multihost disk space MB = 256
-
- # Tier 1, per minute data
- dbengine tier 1 multihost disk space MB = 128
-
- # Tier 2, per hour data
- dbengine tier 2 multihost disk space MB = 64
-```
-
-The exact retention that can be achieved by each tier depends on the number of metrics collected. The more the metrics, the smaller the retention that will fit in a given size. The general rule is that Netdata needs about **1 byte per data point on disk for tier 0**, and **4 bytes per data point on disk for tier 1 and above**.
-
-So, for 1000 metrics collected per second and 256 MB for tier 0, Netdata will store about:
+When the Netdata Agent starts, during the first data collection of each metric, higher tiers are automatically **backfilled** with
+data from lower tiers, so that the aggregation they provide will be accurate.
-```
-256MB on disk / 1 byte per point / 1000 metrics => 256k points per metric / 86400 seconds per day = about 3 days
-```
-
-At tier 1 (per minute):
-
-```
-128MB on disk / 4 bytes per point / 1000 metrics => 32k points per metric / (24 hours * 60 minutes) = about 22 days
-```
+Configuring the number of tiers and the disk space allocated to each tier is how you can
+[change how long netdata stores metrics](https://github.com/netdata/netdata/blob/master/docs/store/change-metrics-storage.md).
-At tier 2 (per hour):
-
-```
-64MB on disk / 4 bytes per point / 1000 metrics => 16k points per metric / 24 hours per day = about 2 years
-```
-
-Of course double the metrics, half the retention. There are more factors that affect retention. The number of ephemeral metrics (i.e. metrics that are collected for part of the time). The number of metrics that are usually constant over time (affecting compression efficiency). The number of restarts a Netdata Agents gets through time (because it has to break pages prematurely, increasing the metadata overhead). But the actual numbers should not deviate significantly from the above.
-
-### Data Loss
+### Data loss
Until **hot pages** and **dirty pages** are **flushed** to disk they are at risk (e.g. due to a crash, or
power failure), as they are stored only in memory.
@@ -172,36 +124,9 @@ power failure), as they are stored only in memory.
The supported way of ensuring high data availability is the use of Netdata Parents to stream the data in real-time to
multiple other Netdata agents.
-## Memory Requirements
-
-DBENGINE memory is related to the number of metrics concurrently being collected, the retention of the metrics on disk in relation with the queries running, and the number of metrics for which retention is maintained.
-
-### Memory for concurrently collected metrics
-
-DBENGINE is automatically sized to use memory according to this equation:
-
-```
-memory in KiB = METRICS x (TIERS - 1) x 4KiB x 2 + 32768 KiB
-```
-
-Where:
-- `METRICS`: the maximum number of concurrently collected metrics (dimensions) from the time the agent started.
-- `TIERS`: the number of storage tiers configured, by default 3 ( `-1` when using 3+ tiers)
-- `x 2`, to accommodate room for flushing data to disk
-- `x 4KiB`, the data segment size of each metric
-- `+ 32768 KiB`, 32 MB for operational caches
-
-So, for 2000 metrics (dimensions) in 3 storage tiers:
+## Memory requirements and retention
-```
-memory for 2k metrics = 2000 x (3 - 1) x 4 KiB x 2 + 32768 KiB = 64 MiB
-```
-
-For 100k concurrently collected metrics in 3 storage tiers:
-
-```
-memory for 100k metrics = 100000 x (3 - 1) x 4 KiB x 2 + 32768 KiB = 1.6 GiB
-```
+See [change how long netdata stores metrics](https://github.com/netdata/netdata/blob/master/docs/store/change-metrics-storage.md).
#### Exceptions
@@ -262,216 +187,6 @@ The time-ranges of the queries running control the amount of shared memory requi
DBENGINE uses 150 bytes of memory for every metric for which retention is maintained but is not currently being collected.
----
-
---- OLD DOCS BELOW THIS POINT ---
-
----
-
-
-## Legacy configuration
-
-### v1.35.1 and prior
-
-These versions of the Agent do not support [Tiers](#Tiers). You could change the metric retention for the parent and
-all of its children only with the `dbengine multihost disk space MB` setting. This setting accounts the space allocation
-for the parent node and all of its children.
-
-To configure the database engine, look for the `page cache size MB` and `dbengine multihost disk space MB` settings in
-the `[db]` section of your `netdata.conf`.
-
-```conf
-[db]
- dbengine page cache size MB = 32
- dbengine multihost disk space MB = 256
-```
-
-### v1.23.2 and prior
-
-_For Netdata Agents earlier than v1.23.2_, the Agent on the parent node uses one dbengine instance for itself, and another instance for every child node it receives metrics from. If you had four streaming nodes, you would have five instances in total (`1 parent + 4 child nodes = 5 instances`).
-
-The Agent allocates resources for each instance separately using the `dbengine disk space MB` (**deprecated**) setting. If `dbengine disk space MB`(**deprecated**) is set to the default `256`, each instance is given 256 MiB in disk space, which means the total disk space required to store all instances is, roughly, `256 MiB * 1 parent * 4 child nodes = 1280 MiB`.
-
-#### Backward compatibility
-
-All existing metrics belonging to child nodes are automatically converted to legacy dbengine instances and the localhost
-metrics are transferred to the multihost dbengine instance.
-
-All new child nodes are automatically transferred to the multihost dbengine instance and share its page cache and disk
-space. If you want to migrate a child node from its legacy dbengine instance to the multihost dbengine instance, you
-must delete the instance's directory, which is located in `/var/cache/netdata/MACHINE_GUID/dbengine`, after stopping the
-Agent.
-
-##### Information
-
-For more information about setting `[db].mode` on your nodes, in addition to other streaming configurations, see
-[streaming](https://github.com/netdata/netdata/blob/master/streaming/README.md).
-
-## Requirements & limitations
-
-### Memory
-
-Using database mode `dbengine` we can overcome most memory restrictions and store a dataset that is much larger than the
-available memory.
-
-There are explicit memory requirements **per** DB engine **instance**:
-
-- The total page cache memory footprint will be an additional `#dimensions-being-collected x 4096 x 2` bytes over what
- the user configured with `dbengine page cache size MB`.
-
-
-- an additional `#pages-on-disk x 4096 x 0.03` bytes of RAM are allocated for metadata.
-
- - roughly speaking this is 3% of the uncompressed disk space taken by the DB files.
-
- - for very highly compressible data (compression ratio > 90%) this RAM overhead is comparable to the disk space
- footprint.
-
-An important observation is that RAM usage depends on both the `page cache size` and the `dbengine multihost disk space`
-options.
-
-You can use
-our [database engine calculator](https://github.com/netdata/netdata/blob/master/docs/store/change-metrics-storage.md#calculate-the-system-resources-ram-disk-space-needed-to-store-metrics)
-to validate the memory requirements for your particular system(s) and configuration (**out-of-date**).
-
-### Disk space
-
-There are explicit disk space requirements **per** DB engine **instance**:
-
-- The total disk space footprint will be the maximum between `#dimensions-being-collected x 4096 x 2` bytes or what the
- user configured with `dbengine multihost disk space` or `dbengine disk space`.
-
-### File descriptor
-
-The Database Engine may keep a **significant** amount of files open per instance (e.g. per streaming child or parent
-server). When configuring your system you should make sure there are at least 50 file descriptors available per
-`dbengine` instance.
-
-Netdata allocates 25% of the available file descriptors to its Database Engine instances. This means that only 25% of
-the file descriptors that are available to the Netdata service are accessible by dbengine instances. You should take
-that into account when configuring your service or system-wide file descriptor limits. You can roughly estimate that the
-Netdata service needs 2048 file descriptors for every 10 streaming child hosts when streaming is configured to use
-`[db].mode = dbengine`.
-
-If for example one wants to allocate 65536 file descriptors to the Netdata service on a systemd system one needs to
-override the Netdata service by running `sudo systemctl edit netdata` and creating a file with contents:
-
-```sh
-[Service]
-LimitNOFILE=65536
-```
-
-For other types of services one can add the line:
-
-```sh
-ulimit -n 65536
-```
-
-at the beginning of the service file. Alternatively you can change the system-wide limits of the kernel by changing
-`/etc/sysctl.conf`. For linux that would be:
-
-```conf
-fs.file-max = 65536
-```
-
-In FreeBSD and OS X you change the lines like this:
-
-```conf
-kern.maxfilesperproc=65536
-kern.maxfiles=65536
-```
-
-You can apply the settings by running `sysctl -p` or by rebooting.
-
-## Files
-
-With the DB engine mode the metric data are stored in database files. These files are organized in pairs, the datafiles
-and their corresponding journalfiles, e.g.:
-
-```sh
-datafile-1-0000000001.ndf
-journalfile-1-0000000001.njf
-datafile-1-0000000002.ndf
-journalfile-1-0000000002.njf
-datafile-1-0000000003.ndf
-journalfile-1-0000000003.njf
-...
-```
-
-They are located under their host's cache directory in the directory `./dbengine` (e.g. for localhost the default
-location is `/var/cache/netdata/dbengine/*`). The higher numbered filenames contain more recent metric data. The user
-can safely delete some pairs of files when Netdata is stopped to manually free up some space.
-
-_Users should_ **back up** _their `./dbengine` folders if they consider this data to be important._ You can also set up
-one or more [exporting connectors](https://github.com/netdata/netdata/blob/master/exporting/README.md) to send your Netdata metrics to other databases for long-term
-storage at lower granularity.
-
-## Operation
-
-The DB engine stores chart metric values in 4096-byte pages in memory. Each chart dimension gets its own page to store
-consecutive values generated from the data collectors. Those pages comprise the **Page Cache**.
-
-When those pages fill up, they are slowly compressed and flushed to disk. It can
-take `4096 / 4 = 1024 seconds = 17 minutes`, for a chart dimension that is being collected every 1 second, to fill a
-page. Pages can be cut short when we stop Netdata or the DB engine instance so as to not lose the data. When we query
-the DB engine for data we trigger disk read I/O requests that fill the Page Cache with the requested pages and
-potentially evict cold (not recently used)
-pages.
-
-When the disk quota is exceeded the oldest values are removed from the DB engine at real time, by automatically deleting
-the oldest datafile and journalfile pair. Any corresponding pages residing in the Page Cache will also be invalidated
-and removed. The DB engine logic will try to maintain between 10 and 20 file pairs at any point in time.
-
-The Database Engine uses direct I/O to avoid polluting the OS filesystem caches and does not generate excessive I/O
-traffic so as to create the minimum possible interference with other applications.
-
-## Evaluation
-
-We have evaluated the performance of the `dbengine` API that the netdata daemon uses internally. This is **not** the web
-API of netdata. Our benchmarks ran on a **single** `dbengine` instance, multiple of which can be running in a Netdata
-parent node. We used a server with an AMD Ryzen Threadripper 2950X 16-Core Processor and 2 disk drives, a Seagate
-Constellation ES.3 2TB magnetic HDD and a SAMSUNG MZQLB960HAJR-00007 960GB NAND Flash SSD.
-
-For our workload, we defined 32 charts with 128 metrics each, giving us a total of 4096 metrics. We defined 1 worker
-thread per chart (32 threads) that generates new data points with a data generation interval of 1 second. The time axis
-of the time-series is emulated and accelerated so that the worker threads can generate as many data points as possible
-without delays.
-
-We also defined 32 worker threads that perform queries on random metrics with semi-random time ranges. The starting time
-of the query is randomly selected between the beginning of the time-series and the time of the latest data point. The
-ending time is randomly selected between 1 second and 1 hour after the starting time. The pseudo-random numbers are
-generated with a uniform distribution.
-
-The data are written to the database at the same time as they are read from it. This is a concurrent read/write mixed
-workload with a duration of 60 seconds. The faster `dbengine` runs, the bigger the dataset size becomes since more data
-points will be generated. We set a page cache size of 64MiB for the two disk-bound scenarios. This way, the dataset size
-of the metric data is much bigger than the RAM that is being used for caching so as to trigger I/O requests most of the
-time. In our final scenario, we set the page cache size to 16 GiB. That way, the dataset fits in the page cache so as to
-avoid all disk bottlenecks.
-
-The reported numbers are the following:
-
-| device | page cache | dataset | reads/sec | writes/sec |
-|:------:|:----------:|--------:|----------:|-----------:|
-| HDD | 64 MiB | 4.1 GiB | 813K | 18.0M |
-| SSD | 64 MiB | 9.8 GiB | 1.7M | 43.0M |
-| N/A | 16 GiB | 6.8 GiB | 118.2M | 30.2M |
-
-where "reads/sec" is the number of metric data points being read from the database via its API per second and
-"writes/sec" is the number of metric data points being written to the database per second.
-
-Notice that the HDD numbers are pretty high and not much slower than the SSD numbers. This is thanks to the database
-engine design being optimized for rotating media. In the database engine disk I/O requests are:
-- asynchronous to mask the high I/O latency of HDDs.
-- mostly large to reduce the amount of HDD seeking time.
-- mostly sequential to reduce the amount of HDD seeking time.
-- compressed to reduce the amount of required throughput.
-As a result, the HDD is not thousands of times slower than the SSD, which is typical for other workloads.
-An interesting observation to make is that the CPU-bound run (16 GiB page cache) generates fewer data than the SSD run
-(6.8 GiB vs 9.8 GiB). The reason is that the 32 reader threads in the SSD scenario are more frequently blocked by I/O,
-and generate a read load of 1.7M/sec, whereas in the CPU-bound scenario the read load is 70 times higher at 118M/sec.
-Consequently, there is a significant degree of interference by the reader threads, that slow down the writer threads.
-This is also possible because the interference effects are greater than the SSD impact on data generation throughput.
diff --git a/database/engine/cache.c b/database/engine/cache.c
index 4091684b2..bc3ba6b6a 100644
--- a/database/engine/cache.c
+++ b/database/engine/cache.c
@@ -1189,6 +1189,9 @@ premature_exit:
}
static PGC_PAGE *page_add(PGC *cache, PGC_ENTRY *entry, bool *added) {
+ internal_fatal(entry->start_time_s < 0 || entry->end_time_s < 0,
+ "DBENGINE CACHE: timestamps are negative");
+
__atomic_add_fetch(&cache->stats.workers_add, 1, __ATOMIC_RELAXED);
size_t partition = pgc_indexing_partition(cache, entry->metric_id);
@@ -1199,6 +1202,12 @@ static PGC_PAGE *page_add(PGC *cache, PGC_ENTRY *entry, bool *added) {
PGC_PAGE *page;
size_t spins = 0;
+ if(unlikely(entry->start_time_s < 0))
+ entry->start_time_s = 0;
+
+ if(unlikely(entry->end_time_s < 0))
+ entry->end_time_s = 0;
+
do {
if(++spins > 1)
__atomic_add_fetch(&cache->stats.insert_spins, 1, __ATOMIC_RELAXED);
@@ -1755,7 +1764,7 @@ PGC *pgc_create(const char *name,
cache->config.max_dirty_pages_per_call = max_dirty_pages_per_flush;
cache->config.pgc_save_init_cb = pgc_save_init_cb;
cache->config.pgc_save_dirty_cb = pgc_save_dirty_cb;
- cache->config.max_pages_per_inline_eviction = (max_pages_per_inline_eviction < 2) ? 2 : max_pages_per_inline_eviction;
+ cache->config.max_pages_per_inline_eviction = max_pages_per_inline_eviction;
cache->config.max_skip_pages_per_inline_eviction = (max_skip_pages_per_inline_eviction < 2) ? 2 : max_skip_pages_per_inline_eviction;
cache->config.max_flushes_inline = (max_flushes_inline < 1) ? 1 : max_flushes_inline;
cache->config.partitions = partitions < 1 ? (size_t)get_netdata_cpus() : partitions;
@@ -1946,7 +1955,7 @@ time_t pgc_page_update_every_s(PGC_PAGE *page) {
time_t pgc_page_fix_update_every(PGC_PAGE *page, time_t update_every_s) {
if(page->update_every_s == 0)
- page->update_every_s = update_every_s;
+ page->update_every_s = (uint32_t) update_every_s;
return page->update_every_s;
}
@@ -2083,7 +2092,7 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_
struct section_pages *sp = *section_pages_pptr;
if(!netdata_spinlock_trylock(&sp->migration_to_v2_spinlock)) {
- internal_fatal(true, "DBENGINE: migration to journal v2 is already running for this section");
+ info("DBENGINE: migration to journal v2 for datafile %u is postponed, another jv2 indexer is already running for this section", datafile_fileno);
pgc_ll_unlock(cache, &cache->hot);
return;
}
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index 286ae1e30..8c413d8dc 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -34,17 +34,6 @@ static struct rrdengine_datafile *datafile_alloc_and_init(struct rrdengine_insta
return datafile;
}
-void datafile_acquire_dup(struct rrdengine_datafile *df) {
- netdata_spinlock_lock(&df->users.spinlock);
-
- if(!df->users.lockers)
- fatal("DBENGINE: datafile is not acquired to duplicate");
-
- df->users.lockers++;
-
- netdata_spinlock_unlock(&df->users.spinlock);
-}
-
bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) {
bool ret;
@@ -390,8 +379,8 @@ static int scan_data_files_cmp(const void *a, const void *b)
/* Returns number of datafiles that were loaded or < 0 on error */
static int scan_data_files(struct rrdengine_instance *ctx)
{
- int ret;
- unsigned tier, no, matched_files, i,failed_to_load;
+ int ret, matched_files, failed_to_load, i;
+ unsigned tier, no;
uv_fs_t req;
uv_dirent_t dent;
struct rrdengine_datafile **datafiles, *datafile;
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
index 274add91e..a08f3ae04 100644
--- a/database/engine/datafile.h
+++ b/database/engine/datafile.h
@@ -70,7 +70,6 @@ struct rrdengine_datafile {
} extent_queries;
};
-void datafile_acquire_dup(struct rrdengine_datafile *df);
bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason);
void datafile_release(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason);
bool datafile_acquire_for_deletion(struct rrdengine_datafile *df);
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index de2b909c0..9998ee540 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -40,7 +40,7 @@ static void update_metric_retention_and_granularity_by_uuid(
.section = (Word_t) ctx,
.first_time_s = first_time_s,
.last_time_s = last_time_s,
- .latest_update_every_s = update_every_s
+ .latest_update_every_s = (uint32_t) update_every_s
};
uuid_copy(entry.uuid, *uuid);
metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
@@ -617,7 +617,7 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx,
.section = (Word_t)ctx,
.first_time_s = vd.start_time_s,
.last_time_s = vd.end_time_s,
- .latest_update_every_s = vd.update_every_s,
+ .latest_update_every_s = (uint32_t) vd.update_every_s,
};
uuid_copy(entry.uuid, *temp_id);
@@ -911,15 +911,10 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st
for (size_t i=0; i < entries; i++) {
time_t start_time_s = header_start_time_s + metric->delta_start_s;
time_t end_time_s = header_start_time_s + metric->delta_end_s;
- time_t update_every_s = (metric->entries > 1) ? ((end_time_s - start_time_s) / (entries - 1)) : 0;
+
update_metric_retention_and_granularity_by_uuid(
- ctx, &metric->uuid, start_time_s, end_time_s, update_every_s, now_s);
+ ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s);
-#ifdef NETDATA_INTERNAL_CHECKS
- struct journal_page_header *metric_list_header = (void *) (data_start + metric->page_offset);
- fatal_assert(uuid_compare(metric_list_header->uuid, metric->uuid) == 0);
- fatal_assert(metric->entries == metric_list_header->entries);
-#endif
metric++;
}
@@ -1038,7 +1033,7 @@ static int journalfile_metric_compare (const void *item1, const void *item2)
const struct jv2_metrics_info *metric1 = ((struct journal_metric_list_to_sort *) item1)->metric_info;
const struct jv2_metrics_info *metric2 = ((struct journal_metric_list_to_sort *) item2)->metric_info;
- return uuid_compare(*(metric1->uuid), *(metric2->uuid));
+ return memcmp(metric1->uuid, metric2->uuid, sizeof(uuid_t));
}
@@ -1084,6 +1079,7 @@ void *journalfile_v2_write_metric_page(struct journal_v2_header *j2_header, void
metric->page_offset = pages_offset;
metric->delta_start_s = (uint32_t)(metric_info->first_time_s - (time_t)(j2_header->start_time_ut / USEC_PER_SEC));
metric->delta_end_s = (uint32_t)(metric_info->last_time_s - (time_t)(j2_header->start_time_ut / USEC_PER_SEC));
+ metric->update_every_s = 0;
return ++metric;
}
@@ -1128,7 +1124,7 @@ void *journalfile_v2_write_data_page(struct journal_v2_header *j2_header, void *
data_page->delta_end_s = (uint32_t) (page_info->end_time_s - (time_t) (j2_header->start_time_ut) / USEC_PER_SEC);
data_page->extent_index = page_info->extent_index;
- data_page->update_every_s = page_info->update_every_s;
+ data_page->update_every_s = (uint32_t) page_info->update_every_s;
data_page->page_length = (uint16_t) (ei ? ei->page_length : page_info->page_length);
data_page->type = 0;
@@ -1136,7 +1132,8 @@ void *journalfile_v2_write_data_page(struct journal_v2_header *j2_header, void *
}
// Must be recorded in metric_info->entries
-void *journalfile_v2_write_descriptors(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info)
+static void *journalfile_v2_write_descriptors(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info,
+ struct journal_metric_list *current_metric)
{
Pvoid_t *PValue;
@@ -1148,13 +1145,16 @@ void *journalfile_v2_write_descriptors(struct journal_v2_header *j2_header, void
Word_t index_time = 0;
bool first = true;
struct jv2_page_info *page_info;
+ uint32_t update_every_s = 0;
while ((PValue = JudyLFirstThenNext(JudyL_array, &index_time, &first))) {
page_info = *PValue;
// Write one descriptor and return the next data page location
data_page = journalfile_v2_write_data_page(j2_header, (void *) data_page, page_info);
+ update_every_s = (uint32_t) page_info->update_every_s;
if (NULL == data_page)
break;
}
+ current_metric->update_every_s = update_every_s;
return data_page;
}
@@ -1291,6 +1291,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
// Calculate current UUID offset from start of file. We will store this in the data page header
uint32_t uuid_offset = data - data_start;
+ struct journal_metric_list *current_metric = (void *) data;
// Write the UUID we are processing
data = (void *) journalfile_v2_write_metric_page(&j2_header, data, metric_info, pages_offset);
if (unlikely(!data))
@@ -1308,7 +1309,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
uuid_offset);
// Start writing descr @ time
- void *page_trailer = journalfile_v2_write_descriptors(&j2_header, metric_page, metric_info);
+ void *page_trailer = journalfile_v2_write_descriptors(&j2_header, metric_page, metric_info, current_metric);
if (unlikely(!page_trailer))
break;
diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h
index 5fbcc90fa..f6be6bcd9 100644
--- a/database/engine/journalfile.h
+++ b/database/engine/journalfile.h
@@ -59,9 +59,9 @@ static inline uint64_t journalfile_current_size(struct rrdengine_journalfile *jo
// Journal v2 structures
-#define JOURVAL_V2_MAGIC (0x01221019)
-#define JOURVAL_V2_REBUILD_MAGIC (0x00221019)
-#define JOURVAL_V2_SKIP_MAGIC (0x02221019)
+#define JOURVAL_V2_MAGIC (0x01230317)
+#define JOURVAL_V2_REBUILD_MAGIC (0x00230317)
+#define JOURVAL_V2_SKIP_MAGIC (0x02230317)
struct journal_v2_block_trailer {
union {
@@ -93,13 +93,14 @@ struct journal_page_list {
};
// UUID_LIST
-// 32 bytes
+// 36 bytes
struct journal_metric_list {
uuid_t uuid;
- uint32_t entries; // Number of entries
- uint32_t page_offset; // OFFSET that contains entries * struct( journal_page_list )
+ uint32_t entries; // Number of entries
+ uint32_t page_offset; // OFFSET that contains entries * struct( journal_page_list )
uint32_t delta_start_s; // Min time of metric
uint32_t delta_end_s; // Max time of metric (to be used to populate page_index)
+ uint32_t update_every_s; // Last update every for this metric in this journal (last page collected)
};
// 16 bytes
diff --git a/database/engine/journalfile.ksy b/database/engine/journalfile_v2.ksy.in
index 858db83d4..6a656bc45 100644
--- a/database/engine/journalfile.ksy
+++ b/database/engine/journalfile_v2.ksy.in
@@ -1,6 +1,9 @@
meta:
- id: netdata_journalfile_v2
+ id: journalfile_v2`'ifdef(`VIRT_MEMBERS',`_virtmemb')
endian: le
+ application: netdata
+ file-extension: njfv2
+ license: GPL-3.0-or-later
seq:
- id: journal_v2_header
@@ -19,12 +22,14 @@ seq:
- id: metric_trailer
type: journal_v2_block_trailer
- id: page_blocs
- type: jounral_v2_page_blocs
+ type: journal_v2_page_block
+ repeat: expr
+ repeat-expr: _root.journal_v2_header.metric_count
+ - id: padding
size: _root._io.size - _root._io.pos - 4
- id: journal_file_trailer
type: journal_v2_block_trailer
-
types:
journal_v2_metric_list:
seq:
@@ -38,11 +43,13 @@ types:
type: u4
- id: delta_end_s
type: u4
- instances:
+ifdef(`VIRT_MEMBERS',
+` instances:
page_block:
type: journal_v2_page_block
io: _root._io
pos: page_offset
+')dnl
journal_v2_page_hdr:
seq:
- id: crc
@@ -69,11 +76,13 @@ types:
type: u1
- id: reserved
type: u1
- instances:
+ifdef(`VIRT_MEMBERS',
+` instances:
extent:
io: _root._io
type: journal_v2_extent_list
pos: _root.journal_v2_header.extent_offset + (extent_idx * 16)
+')dnl
journal_v2_header:
seq:
- id: magic
@@ -106,11 +115,13 @@ types:
type: u4
- id: data
type: u8
- instances:
+ifdef(`VIRT_MEMBERS',
+` instances:
trailer:
io: _root._io
type: journal_v2_block_trailer
pos: _root._io.size - 4
+')dnl
journal_v2_block_trailer:
seq:
- id: checksum
@@ -137,8 +148,3 @@ types:
repeat-expr: hdr.entries
- id: block_trailer
type: journal_v2_block_trailer
- jounral_v2_page_blocs:
- seq:
- - id: blocs
- type: journal_v2_page_block
- repeat: eos
diff --git a/database/engine/metric.c b/database/engine/metric.c
index 9dc9d9ebc..6b65df9bb 100644
--- a/database/engine/metric.c
+++ b/database/engine/metric.c
@@ -105,7 +105,7 @@ static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) {
}
static inline bool metric_has_retention_unsafe(MRG *mrg __maybe_unused, METRIC *metric) {
- bool has_retention = (metric->first_time_s || metric->latest_time_s_clean || metric->latest_time_s_hot);
+ bool has_retention = (metric->first_time_s > 0 || metric->latest_time_s_clean > 0 || metric->latest_time_s_hot > 0);
if(has_retention && !(metric->flags & METRIC_FLAG_HAS_RETENTION)) {
metric->flags |= METRIC_FLAG_HAS_RETENTION;
@@ -210,8 +210,8 @@ static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
METRIC *metric = allocation;
uuid_copy(metric->uuid, entry->uuid);
metric->section = entry->section;
- metric->first_time_s = entry->first_time_s;
- metric->latest_time_s_clean = entry->last_time_s;
+ metric->first_time_s = MAX(0, entry->first_time_s);
+ metric->latest_time_s_clean = MAX(0, entry->last_time_s);
metric->latest_time_s_hot = 0;
metric->latest_update_every_s = entry->latest_update_every_s;
metric->writer = 0;
@@ -388,6 +388,11 @@ Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) {
}
bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
+ internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative");
+
+ if(unlikely(first_time_s < 0))
+ return false;
+
netdata_spinlock_lock(&metric->spinlock);
metric->first_time_s = first_time_s;
metric_has_retention_unsafe(mrg, metric);
@@ -397,12 +402,25 @@ bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t
}
void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) {
-
+ internal_fatal(first_time_s < 0 || last_time_s < 0 || update_every_s < 0,
+ "DBENGINE METRIC: timestamp is negative");
internal_fatal(first_time_s > max_acceptable_collected_time(),
"DBENGINE METRIC: metric first time is in the future");
internal_fatal(last_time_s > max_acceptable_collected_time(),
"DBENGINE METRIC: metric last time is in the future");
+ if(unlikely(first_time_s < 0))
+ first_time_s = 0;
+
+ if(unlikely(last_time_s < 0))
+ last_time_s = 0;
+
+ if(unlikely(update_every_s < 0))
+ update_every_s = 0;
+
+ if(unlikely(!first_time_s && !last_time_s && !update_every_s))
+ return;
+
netdata_spinlock_lock(&metric->spinlock);
if(unlikely(first_time_s && (!metric->first_time_s || first_time_s < metric->first_time_s)))
@@ -412,16 +430,18 @@ void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t
metric->latest_time_s_clean = last_time_s;
if(likely(update_every_s))
- metric->latest_update_every_s = update_every_s;
+ metric->latest_update_every_s = (uint32_t) update_every_s;
}
else if(unlikely(!metric->latest_update_every_s && update_every_s))
- metric->latest_update_every_s = update_every_s;
+ metric->latest_update_every_s = (uint32_t) update_every_s;
metric_has_retention_unsafe(mrg, metric);
netdata_spinlock_unlock(&metric->spinlock);
}
bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
+ internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative");
+
bool ret = false;
netdata_spinlock_lock(&metric->spinlock);
@@ -474,6 +494,11 @@ void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *f
}
bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
+ internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative");
+
+ if(unlikely(latest_time_s < 0))
+ return false;
+
netdata_spinlock_lock(&metric->spinlock);
// internal_fatal(latest_time_s > max_acceptable_collected_time(),
@@ -487,9 +512,6 @@ bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric,
if(unlikely(!metric->first_time_s))
metric->first_time_s = latest_time_s;
-// if(unlikely(metric->first_time_s > latest_time_s))
-// metric->first_time_s = latest_time_s;
-
metric_has_retention_unsafe(mrg, metric);
netdata_spinlock_unlock(&metric->spinlock);
return true;
@@ -517,7 +539,7 @@ bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
page_first_time_s = pgc_page_start_time_s(page);
page_end_time_s = pgc_page_end_time_s(page);
- if ((is_hot || is_dirty) && page_first_time_s < min_first_time_s)
+ if ((is_hot || is_dirty) && page_first_time_s > 0 && page_first_time_s < min_first_time_s)
min_first_time_s = page_first_time_s;
if (is_dirty && page_end_time_s > max_end_time_s)
@@ -548,18 +570,20 @@ bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
}
bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
+ internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative");
+
// internal_fatal(latest_time_s > max_acceptable_collected_time(),
// "DBENGINE METRIC: metric latest time is in the future");
+ if(unlikely(latest_time_s < 0))
+ return false;
+
netdata_spinlock_lock(&metric->spinlock);
metric->latest_time_s_hot = latest_time_s;
if(unlikely(!metric->first_time_s))
metric->first_time_s = latest_time_s;
-// if(unlikely(metric->first_time_s > latest_time_s))
-// metric->first_time_s = latest_time_s;
-
metric_has_retention_unsafe(mrg, metric);
netdata_spinlock_unlock(&metric->spinlock);
return true;
@@ -574,23 +598,27 @@ time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
}
bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
- if(!update_every_s)
+ internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
+
+ if(update_every_s <= 0)
return false;
netdata_spinlock_lock(&metric->spinlock);
- metric->latest_update_every_s = update_every_s;
+ metric->latest_update_every_s = (uint32_t) update_every_s;
netdata_spinlock_unlock(&metric->spinlock);
return true;
}
bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
- if(!update_every_s)
+ internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
+
+ if(update_every_s <= 0)
return false;
netdata_spinlock_lock(&metric->spinlock);
if(!metric->latest_update_every_s)
- metric->latest_update_every_s = update_every_s;
+ metric->latest_update_every_s = (uint32_t) update_every_s;
netdata_spinlock_unlock(&metric->spinlock);
return true;
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index b4902d784..02d08a164 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -99,11 +99,6 @@ inline TIME_RANGE_COMPARE is_page_in_time_range(time_t page_first_time_s, time_t
return PAGE_IS_IN_RANGE;
}
-static int journal_metric_uuid_compare(const void *key, const void *metric)
-{
- return uuid_compare(*(uuid_t *) key, ((struct journal_metric_list *) metric)->uuid);
-}
-
static inline struct page_details *pdc_find_page_for_time(
Pcvoid_t PArray,
time_t wanted_time_s,
@@ -310,7 +305,7 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin
pd->first_time_s = page_start_time_s;
pd->last_time_s = page_end_time_s;
pd->page_length = page_length;
- pd->update_every_s = page_update_every_s;
+ pd->update_every_s = (uint32_t) page_update_every_s;
pd->page = (open_cache_mode) ? NULL : page;
pd->status |= tags;
@@ -581,7 +576,7 @@ static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METR
.metric_id = metric_id,
.start_time_s = page_first_time_s,
.end_time_s = page_last_time_s,
- .update_every_s = page_update_every_s,
+ .update_every_s = (uint32_t) page_update_every_s,
.data = datafile,
.size = 0,
.custom_data = (uint8_t *) &ei,
@@ -635,7 +630,7 @@ void add_page_details_from_journal_v2(PGC_PAGE *page, void *JudyL_pptr) {
pd->last_time_s = pgc_page_end_time_s(page);
pd->datafile.ptr = datafile;
pd->page_length = ei->page_length;
- pd->update_every_s = pgc_page_update_every_s(page);
+ pd->update_every_s = (uint32_t) pgc_page_update_every_s(page);
pd->metric_id = metric_id;
pd->status |= PDC_PAGE_DISK_PENDING | PDC_PAGE_SOURCE_JOURNAL_V2 | PDC_PAGE_DATAFILE_ACQUIRED;
}
@@ -774,7 +769,10 @@ inline void rrdeng_prep_wait(PDC *pdc) {
}
}
-void rrdeng_prep_query(PDC *pdc) {
+void rrdeng_prep_query(struct page_details_control *pdc, bool worker) {
+ if(worker)
+ worker_is_busy(UV_EVENT_DBENGINE_QUERY);
+
size_t pages_to_load = 0;
pdc->page_list_JudyL = get_page_list(pdc->ctx, pdc->metric,
pdc->start_time_s * USEC_PER_SEC,
@@ -785,10 +783,10 @@ void rrdeng_prep_query(PDC *pdc) {
if (pages_to_load && pdc->page_list_JudyL) {
pdc_acquire(pdc); // we get 1 for the 1st worker in the chain: do_read_page_list_work()
usec_t start_ut = now_monotonic_usec();
-// if(likely(priority == STORAGE_PRIORITY_BEST_EFFORT))
-// dbengine_load_page_list_directly(ctx, handle->pdc);
-// else
- pdc_route_asynchronously(pdc->ctx, pdc);
+ if(likely(pdc->priority == STORAGE_PRIORITY_SYNCHRONOUS))
+ pdc_route_synchronously(pdc->ctx, pdc);
+ else
+ pdc_route_asynchronously(pdc->ctx, pdc);
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_to_route, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED);
}
else
@@ -797,6 +795,9 @@ void rrdeng_prep_query(PDC *pdc) {
completion_mark_complete(&pdc->prep_completion);
pdc_release_and_destroy_if_unreferenced(pdc, true, true);
+
+ if(worker)
+ worker_is_idle();
}
/**
@@ -827,7 +828,11 @@ void pg_cache_preload(struct rrdeng_query_handle *handle) {
if(ctx_is_available_for_queries(handle->ctx)) {
handle->pdc->refcount++; // we get 1 for the query thread and 1 for the prep thread
- rrdeng_enq_cmd(handle->ctx, RRDENG_OPCODE_QUERY, handle->pdc, NULL, handle->priority, NULL, NULL);
+
+ if(unlikely(handle->pdc->priority == STORAGE_PRIORITY_SYNCHRONOUS))
+ rrdeng_prep_query(handle->pdc, false);
+ else
+ rrdeng_enq_cmd(handle->ctx, RRDENG_OPCODE_QUERY, handle->pdc, NULL, handle->priority, NULL, NULL);
}
else {
completion_mark_complete(&handle->pdc->prep_completion);
@@ -924,7 +929,8 @@ struct pgc_page *pg_cache_lookup_next(
else {
if (unlikely(page_update_every_s <= 0 || page_update_every_s > 86400)) {
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_update_every_fixed, 1, __ATOMIC_RELAXED);
- pd->update_every_s = page_update_every_s = pgc_page_fix_update_every(page, last_update_every_s);
+ page_update_every_s = pgc_page_fix_update_every(page, last_update_every_s);
+ pd->update_every_s = (uint32_t) page_update_every_s;
}
size_t entries_by_size = page_entries_by_size(page_length, CTX_POINT_SIZE_BYTES(ctx));
@@ -1009,7 +1015,7 @@ void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s
.metric_id = metric_id,
.start_time_s = start_time_s,
.end_time_s = end_time_s,
- .update_every_s = update_every_s,
+ .update_every_s = (uint32_t) update_every_s,
.size = 0,
.data = datafile,
.custom_data = (uint8_t *) &ext_io_data,
@@ -1069,6 +1075,8 @@ void pgc_and_mrg_initialize(void)
main_cache_size = target_cache_size - extent_cache_size;
}
+ extent_cache_size += (size_t)(default_rrdeng_extent_cache_mb * 1024ULL * 1024ULL);
+
main_cache = pgc_create(
"main_cache",
main_cache_size,
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index 9ab7db078..5242db89e 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -45,16 +45,14 @@ struct rrdeng_page_info {
};
struct pg_alignment {
- uint32_t page_position;
uint32_t refcount;
- uint16_t initial_slots;
};
struct rrdeng_query_handle;
struct page_details_control;
void rrdeng_prep_wait(struct page_details_control *pdc);
-void rrdeng_prep_query(struct page_details_control *pdc);
+void rrdeng_prep_query(struct page_details_control *pdc, bool worker);
void pg_cache_preload(struct rrdeng_query_handle *handle);
struct pgc_page *pg_cache_lookup_next(struct rrdengine_instance *ctx, struct page_details_control *pdc, time_t now_s, time_t last_update_every_s, size_t *entries);
void pgc_and_mrg_initialize(void);
diff --git a/database/engine/pdc.c b/database/engine/pdc.c
index 8b8e71958..42fb2f6de 100644
--- a/database/engine/pdc.c
+++ b/database/engine/pdc.c
@@ -692,8 +692,9 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
vd.page_length > RRDENG_BLOCK_SIZE ||
vd.start_time_s > vd.end_time_s ||
(now_s && vd.end_time_s > now_s) ||
- vd.start_time_s == 0 ||
- vd.end_time_s == 0 ||
+ vd.start_time_s <= 0 ||
+ vd.end_time_s <= 0 ||
+ vd.update_every_s < 0 ||
(vd.start_time_s == vd.end_time_s && vd.entries > 1) ||
(vd.update_every_s == 0 && vd.entries > 1)
)
@@ -835,7 +836,7 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL *
uuid_unparse_lower(descr->uuid, uuid);
used_descr = true;
}
- else if (epdl) {
+ else {
struct page_details *pd = NULL;
Word_t start = 0;
@@ -855,7 +856,7 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL *
}
}
- if(!used_epdl && !used_descr && epdl && epdl->pdc) {
+ if(!used_epdl && !used_descr && epdl->pdc) {
start_time_s = epdl->pdc->start_time_s;
end_time_s = epdl->pdc->end_time_s;
}
@@ -1059,7 +1060,7 @@ static bool epdl_populate_pages_from_extent_data(
.metric_id = metric_id,
.start_time_s = vd.start_time_s,
.end_time_s = vd.end_time_s,
- .update_every_s = vd.update_every_s,
+ .update_every_s = (uint32_t) vd.update_every_s,
.size = (size_t) ((page_data == DBENGINE_EMPTY_PAGE) ? 0 : vd.page_length),
.data = page_data
};
@@ -1150,6 +1151,9 @@ static inline void datafile_extent_read_free(void *buffer) {
}
void epdl_find_extent_and_populate_pages(struct rrdengine_instance *ctx, EPDL *epdl, bool worker) {
+ if(worker)
+ worker_is_busy(UV_EVENT_DBENGINE_EXTENT_CACHE_LOOKUP);
+
size_t *statistics_counter = NULL;
PDC_PAGE_STATUS not_loaded_pages_tag = 0, loaded_pages_tag = 0;
@@ -1172,9 +1176,6 @@ void epdl_find_extent_and_populate_pages(struct rrdengine_instance *ctx, EPDL *e
goto cleanup;
}
- if(worker)
- worker_is_busy(UV_EVENT_DBENGINE_EXTENT_CACHE_LOOKUP);
-
bool extent_found_in_cache = false;
void *extent_compressed_data = NULL;
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index d64868f03..7811a5eaa 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -16,6 +16,24 @@ unsigned rrdeng_pages_per_extent = MAX_PAGES_PER_EXTENT;
#error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2)
#endif
+struct rrdeng_cmd {
+ struct rrdengine_instance *ctx;
+ enum rrdeng_opcode opcode;
+ void *data;
+ struct completion *completion;
+ enum storage_priority priority;
+ dequeue_callback_t dequeue_cb;
+
+ struct {
+ struct rrdeng_cmd *prev;
+ struct rrdeng_cmd *next;
+ } queue;
+};
+
+static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker);
+static inline void worker_dispatch_extent_read(struct rrdeng_cmd cmd, bool from_worker);
+static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool from_worker);
+
struct rrdeng_main {
uv_thread_t thread;
uv_loop_t loop;
@@ -45,7 +63,6 @@ struct rrdeng_main {
struct {
size_t dispatched;
size_t executing;
- size_t pending_cb;
} atomics;
} work_cmd;
@@ -132,8 +149,22 @@ static void work_request_init(void) {
);
}
-static inline bool work_request_full(void) {
- return __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED) >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS);
+enum LIBUV_WORKERS_STATUS {
+ LIBUV_WORKERS_RELAXED,
+ LIBUV_WORKERS_STRESSED,
+ LIBUV_WORKERS_CRITICAL,
+};
+
+static inline enum LIBUV_WORKERS_STATUS work_request_full(void) {
+ size_t dispatched = __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED);
+
+ if(dispatched >= (size_t)(libuv_worker_threads))
+ return LIBUV_WORKERS_CRITICAL;
+
+ else if(dispatched >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS))
+ return LIBUV_WORKERS_STRESSED;
+
+ return LIBUV_WORKERS_RELAXED;
}
static inline void work_done(struct rrdeng_work *work_request) {
@@ -147,12 +178,38 @@ static void work_standard_worker(uv_work_t *req) {
worker_is_busy(UV_EVENT_WORKER_INIT);
struct rrdeng_work *work_request = req->data;
+
work_request->data = work_request->work_cb(work_request->ctx, work_request->data, work_request->completion, req);
worker_is_idle();
+ if(work_request->opcode == RRDENG_OPCODE_EXTENT_READ || work_request->opcode == RRDENG_OPCODE_QUERY) {
+ internal_fatal(work_request->after_work_cb != NULL, "DBENGINE: opcodes with a callback should not be boosted");
+
+ while(1) {
+ struct rrdeng_cmd cmd = rrdeng_deq_cmd(true);
+ if (cmd.opcode == RRDENG_OPCODE_NOOP)
+ break;
+
+ worker_is_busy(UV_EVENT_WORKER_INIT);
+ switch (cmd.opcode) {
+ case RRDENG_OPCODE_EXTENT_READ:
+ worker_dispatch_extent_read(cmd, true);
+ break;
+
+ case RRDENG_OPCODE_QUERY:
+ worker_dispatch_query_prep(cmd, true);
+ break;
+
+ default:
+ fatal("DBENGINE: Opcode should not be executed synchronously");
+ break;
+ }
+ worker_is_idle();
+ }
+ }
+
__atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.dispatched, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.executing, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&rrdeng_main.work_cmd.atomics.pending_cb, 1, __ATOMIC_RELAXED);
// signal the event loop a worker is available
fatal_assert(0 == uv_async_send(&rrdeng_main.async));
@@ -167,7 +224,6 @@ static void after_work_standard_callback(uv_work_t* req, int status) {
work_request->after_work_cb(work_request->ctx, work_request->data, work_request->completion, req, status);
work_done(work_request);
- __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.pending_cb, 1, __ATOMIC_RELAXED);
worker_is_idle();
}
@@ -369,20 +425,6 @@ void wal_release(WAL *wal) {
// ----------------------------------------------------------------------------
// command queue cache
-struct rrdeng_cmd {
- struct rrdengine_instance *ctx;
- enum rrdeng_opcode opcode;
- void *data;
- struct completion *completion;
- enum storage_priority priority;
- dequeue_callback_t dequeue_cb;
-
- struct {
- struct rrdeng_cmd *prev;
- struct rrdeng_cmd *next;
- } queue;
-};
-
static void rrdeng_cmd_queue_init(void) {
rrdeng_main.cmd_queue.ar = aral_create("dbengine-opcodes",
sizeof(struct rrdeng_cmd),
@@ -465,14 +507,33 @@ static inline bool rrdeng_cmd_has_waiting_opcodes_in_lower_priorities(STORAGE_PR
return false;
}
-static inline struct rrdeng_cmd rrdeng_deq_cmd(void) {
+#define opcode_empty (struct rrdeng_cmd) { \
+ .ctx = NULL, \
+ .opcode = RRDENG_OPCODE_NOOP, \
+ .priority = STORAGE_PRIORITY_BEST_EFFORT, \
+ .completion = NULL, \
+ .data = NULL, \
+}
+
+static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) {
struct rrdeng_cmd *cmd = NULL;
+ enum LIBUV_WORKERS_STATUS status = work_request_full();
+
+ STORAGE_PRIORITY min_priority, max_priority;
+ min_priority = STORAGE_PRIORITY_INTERNAL_DBENGINE;
+ max_priority = (status != LIBUV_WORKERS_RELAXED) ? STORAGE_PRIORITY_INTERNAL_DBENGINE : STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE - 1;
- STORAGE_PRIORITY max_priority = work_request_full() ? STORAGE_PRIORITY_INTERNAL_DBENGINE : STORAGE_PRIORITY_BEST_EFFORT;
+ if(from_worker) {
+ if(status == LIBUV_WORKERS_CRITICAL)
+ return opcode_empty;
+
+ min_priority = STORAGE_PRIORITY_INTERNAL_QUERY_PREP;
+ max_priority = STORAGE_PRIORITY_BEST_EFFORT;
+ }
// find an opcode to execute from the queue
netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
- for(STORAGE_PRIORITY priority = STORAGE_PRIORITY_INTERNAL_DBENGINE; priority <= max_priority ; priority++) {
+ for(STORAGE_PRIORITY priority = min_priority; priority <= max_priority ; priority++) {
cmd = rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority];
if(cmd) {
@@ -508,13 +569,7 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(void) {
aral_freez(rrdeng_main.cmd_queue.ar, cmd);
}
else
- ret = (struct rrdeng_cmd) {
- .ctx = NULL,
- .opcode = RRDENG_OPCODE_NOOP,
- .priority = STORAGE_PRIORITY_BEST_EFFORT,
- .completion = NULL,
- .data = NULL,
- };
+ ret = opcode_empty;
return ret;
}
@@ -927,11 +982,6 @@ struct uuid_first_time_s {
size_t df_index_oldest;
};
-static int journal_metric_compare(const void *key, const void *metric)
-{
- return uuid_compare(*(uuid_t *) key, ((struct journal_metric_list *) metric)->uuid);
-}
-
struct rrdengine_datafile *datafile_release_and_acquire_next_for_retention(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) {
uv_rwlock_rdlock(&ctx->datafiles.rwlock);
@@ -987,7 +1037,10 @@ void find_uuid_first_time(
if (uuid_original_entry->df_matched > 3 || uuid_original_entry->pages_found > 5)
continue;
- struct journal_metric_list *live_entry = bsearch(uuid_original_entry->uuid,uuid_list,journal_metric_count,sizeof(*uuid_list), journal_metric_compare);
+ struct journal_metric_list *live_entry =
+ bsearch(uuid_original_entry->uuid,uuid_list,journal_metric_count,
+ sizeof(*uuid_list), journal_metric_uuid_compare);
+
if (!live_entry) {
// Not found in this journal
not_matching_bsearches++;
@@ -1087,13 +1140,20 @@ void find_uuid_first_time(
}
static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile_to_delete, struct rrdengine_datafile *first_datafile_remaining, bool worker) {
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.metrics_retention_started, 1, __ATOMIC_RELAXED);
-
if(worker)
worker_is_busy(UV_EVENT_DBENGINE_FIND_ROTATED_METRICS);
struct rrdengine_journalfile *journalfile = datafile_to_delete->journalfile;
struct journal_v2_header *j2_header = journalfile_v2_data_acquire(journalfile, NULL, 0, 0);
+
+ if (unlikely(!j2_header)) {
+ if (worker)
+ worker_is_idle();
+ return;
+ }
+
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.metrics_retention_started, 1, __ATOMIC_RELAXED);
+
struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset);
size_t count = j2_header->metric_count;
@@ -1348,14 +1408,9 @@ static void *cache_evict_tp_worker(struct rrdengine_instance *ctx __maybe_unused
return data;
}
-static void after_prep_query(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
- ;
-}
-
static void *query_prep_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) {
- worker_is_busy(UV_EVENT_DBENGINE_QUERY);
PDC *pdc = data;
- rrdeng_prep_query(pdc);
+ rrdeng_prep_query(pdc, true);
return data;
}
@@ -1435,21 +1490,28 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb
worker_is_busy(UV_EVENT_DBENGINE_JOURNAL_INDEX);
count = 0;
while (datafile && datafile->fileno != ctx_last_fileno_get(ctx) && datafile->fileno != ctx_last_flush_fileno_get(ctx)) {
+ if(journalfile_v2_data_available(datafile->journalfile)) {
+ // journal file v2 is already there for this datafile
+ datafile = datafile->next;
+ continue;
+ }
netdata_spinlock_lock(&datafile->writers.spinlock);
bool available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true;
netdata_spinlock_unlock(&datafile->writers.spinlock);
- if(!available)
+ if(!available) {
+ info("DBENGINE: journal file %u needs to be indexed, but it has writers working on it - skipping it for now", datafile->fileno);
+ datafile = datafile->next;
continue;
-
- if (unlikely(!journalfile_v2_data_available(datafile->journalfile))) {
- info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno);
- pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type,
- journalfile_migrate_to_v2_callback, (void *) datafile->journalfile);
- count++;
}
+ info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno);
+ pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type,
+ journalfile_migrate_to_v2_callback, (void *) datafile->journalfile);
+
+ count++;
+
datafile = datafile->next;
if (unlikely(!ctx_is_available_for_queries(ctx)))
@@ -1472,10 +1534,6 @@ static void after_do_cache_evict(struct rrdengine_instance *ctx __maybe_unused,
rrdeng_main.evictions_running--;
}
-static void after_extent_read(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
- ;
-}
-
static void after_journal_v2_indexing(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
__atomic_store_n(&ctx->atomic.migration_to_v2_running, false, __ATOMIC_RELAXED);
rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
@@ -1604,6 +1662,26 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) {
return true;
}
+static inline void worker_dispatch_extent_read(struct rrdeng_cmd cmd, bool from_worker) {
+ struct rrdengine_instance *ctx = cmd.ctx;
+ EPDL *epdl = cmd.data;
+
+ if(from_worker)
+ epdl_find_extent_and_populate_pages(ctx, epdl, true);
+ else
+ work_dispatch(ctx, epdl, NULL, cmd.opcode, extent_read_tp_worker, NULL);
+}
+
+static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool from_worker) {
+ struct rrdengine_instance *ctx = cmd.ctx;
+ PDC *pdc = cmd.data;
+
+ if(from_worker)
+ rrdeng_prep_query(pdc, true);
+ else
+ work_dispatch(ctx, pdc, NULL, cmd.opcode, query_prep_tp_worker, NULL);
+}
+
void dbengine_event_loop(void* arg) {
sanity_check();
uv_thread_set_name_np(pthread_self(), "DBENGINE");
@@ -1661,25 +1739,19 @@ void dbengine_event_loop(void* arg) {
/* wait for commands */
do {
worker_is_busy(RRDENG_OPCODE_MAX);
- cmd = rrdeng_deq_cmd();
+ cmd = rrdeng_deq_cmd(false);
opcode = cmd.opcode;
worker_is_busy(opcode);
switch (opcode) {
- case RRDENG_OPCODE_EXTENT_READ: {
- struct rrdengine_instance *ctx = cmd.ctx;
- EPDL *epdl = cmd.data;
- work_dispatch(ctx, epdl, NULL, opcode, extent_read_tp_worker, after_extent_read);
+ case RRDENG_OPCODE_EXTENT_READ:
+ worker_dispatch_extent_read(cmd, false);
break;
- }
- case RRDENG_OPCODE_QUERY: {
- struct rrdengine_instance *ctx = cmd.ctx;
- PDC *pdc = cmd.data;
- work_dispatch(ctx, pdc, NULL, opcode, query_prep_tp_worker, after_prep_query);
+ case RRDENG_OPCODE_QUERY:
+ worker_dispatch_query_prep(cmd, false);
break;
- }
case RRDENG_OPCODE_EXTENT_WRITE: {
struct rrdengine_instance *ctx = cmd.ctx;
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index 492666815..69e412354 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -160,9 +160,7 @@ struct jv2_page_info {
};
typedef enum __attribute__ ((__packed__)) {
- RRDENG_CHO_UNALIGNED = (1 << 0), // set when this metric is not page aligned according to page alignment
- RRDENG_FIRST_PAGE_ALLOCATED = (1 << 1), // set when this metric has allocated its first page
- RRDENG_1ST_METRIC_WRITER = (1 << 2),
+ RRDENG_1ST_METRIC_WRITER = (1 << 0),
} RRDENG_COLLECT_HANDLE_OPTIONS;
typedef enum __attribute__ ((__packed__)) {
@@ -183,12 +181,17 @@ typedef enum __attribute__ ((__packed__)) {
} RRDENG_COLLECT_PAGE_FLAGS;
struct rrdeng_collect_handle {
+ struct storage_collect_handle common; // has to be first item
+
+ RRDENG_COLLECT_PAGE_FLAGS page_flags;
+ RRDENG_COLLECT_HANDLE_OPTIONS options;
+ uint8_t type;
+
struct metric *metric;
struct pgc_page *page;
+ void *data;
+ size_t data_size;
struct pg_alignment *alignment;
- RRDENG_COLLECT_HANDLE_OPTIONS options;
- uint8_t type;
- RRDENG_COLLECT_PAGE_FLAGS page_flags;
uint32_t page_entries_max;
uint32_t page_position; // keep track of the current page size, to make sure we don't exceed it
usec_t page_start_time_ut;
@@ -515,4 +518,8 @@ static inline time_t max_acceptable_collected_time(void) {
void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool update_retention, bool worker);
+static inline int journal_metric_uuid_compare(const void *key, const void *metric) {
+ return uuid_memcmp((uuid_t *)key, &(((struct journal_metric_list *) metric)->uuid));
+}
+
#endif /* NETDATA_RRDENGINE_H */
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 27497bbb8..ddc306ed7 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -35,16 +35,16 @@ __attribute__((constructor)) void initialize_multidb_ctx(void) {
multidb_ctx[4] = &multidb_ctx_storage_tier4;
}
-int default_rrdeng_page_fetch_timeout = 3;
-int default_rrdeng_page_fetch_retries = 3;
int db_engine_journal_check = 0;
int default_rrdeng_disk_quota_mb = 256;
int default_multidb_disk_quota_mb = 256;
#if defined(ENV32BIT)
int default_rrdeng_page_cache_mb = 16;
+int default_rrdeng_extent_cache_mb = 0;
#else
int default_rrdeng_page_cache_mb = 32;
+int default_rrdeng_extent_cache_mb = 0;
#endif
// ----------------------------------------------------------------------------
@@ -163,7 +163,7 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE
}
#ifdef NETDATA_INTERNAL_CHECKS
- if(uuid_compare(rd->metric_uuid, *mrg_metric_uuid(main_mrg, metric)) != 0) {
+ if(uuid_memcmp(&rd->metric_uuid, mrg_metric_uuid(main_mrg, metric)) != 0) {
char uuid1[UUID_STR_LEN + 1];
char uuid2[UUID_STR_LEN + 1];
@@ -255,8 +255,11 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri
struct rrdeng_collect_handle *handle;
handle = callocz(1, sizeof(struct rrdeng_collect_handle));
+ handle->common.backend = STORAGE_ENGINE_BACKEND_DBENGINE;
handle->metric = metric;
handle->page = NULL;
+ handle->data = NULL;
+ handle->data_size = 0;
handle->page_position = 0;
handle->page_entries_max = 0;
handle->update_every_ut = (usec_t)update_every * USEC_PER_SEC;
@@ -340,6 +343,8 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h
handle->page_flags = 0;
handle->page_position = 0;
handle->page_entries_max = 0;
+ handle->data = NULL;
+ handle->data_size = 0;
// important!
// we should never zero page end time ut, because this will allow
@@ -348,6 +353,8 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h
// handle->page_start_time_ut;
check_and_fix_mrg_update_every(handle);
+
+ timing_step(TIMING_STEP_DBENGINE_FLUSH_PAGE);
}
static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *handle,
@@ -365,7 +372,7 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha
.end_time_s = point_in_time_s,
.size = data_size,
.data = data,
- .update_every_s = update_every_s,
+ .update_every_s = (uint32_t) update_every_s,
.hot = true
};
@@ -414,62 +421,57 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha
handle->page_flags |= RRDENG_PAGE_CREATED_IN_FUTURE;
check_and_fix_mrg_update_every(handle);
+
+ timing_step(TIMING_STEP_DBENGINE_CREATE_NEW_PAGE);
}
-static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle, size_t *data_size, usec_t point_in_time_ut) {
- struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
- size_t size;
+static size_t aligned_allocation_entries(size_t max_slots, size_t target_slot, time_t now_s) {
+ size_t slots = target_slot;
+ size_t pos = (now_s % max_slots);
- if(handle->options & RRDENG_FIRST_PAGE_ALLOCATED) {
- // any page except the first
- size = tier_page_size[ctx->config.tier];
- }
- else {
- size_t final_slots = 0;
+ if(pos > slots)
+ slots += max_slots - pos;
- // the first page
- handle->options |= RRDENG_FIRST_PAGE_ALLOCATED;
- size_t max_size = tier_page_size[ctx->config.tier];
- size_t max_slots = max_size / CTX_POINT_SIZE_BYTES(ctx);
+ else if(pos < slots)
+ slots -= pos;
- if(handle->alignment->initial_slots) {
- final_slots = handle->alignment->initial_slots;
- }
- else {
- max_slots -= 3;
+ else
+ slots = max_slots;
- size_t smaller_slot = indexing_partition((Word_t)handle->alignment, max_slots);
- final_slots = smaller_slot;
+ return slots;
+}
- time_t now_s = (time_t)(point_in_time_ut / USEC_PER_SEC);
- size_t current_pos = (now_s % max_slots);
+static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle, size_t *data_size, usec_t point_in_time_ut) {
+ struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
- if(current_pos > final_slots)
- final_slots += max_slots - current_pos;
+ size_t max_size = tier_page_size[ctx->config.tier];
+ size_t max_slots = max_size / CTX_POINT_SIZE_BYTES(ctx);
- else if(current_pos < final_slots)
- final_slots -= current_pos;
+ size_t slots = aligned_allocation_entries(
+ max_slots,
+ indexing_partition((Word_t) handle->alignment, max_slots),
+ (time_t) (point_in_time_ut / USEC_PER_SEC)
+ );
- if(final_slots < 3) {
- final_slots += 3;
- smaller_slot += 3;
+ if(slots < max_slots / 3)
+ slots = max_slots / 3;
- if(smaller_slot >= max_slots)
- smaller_slot -= max_slots;
- }
+ if(slots < 3)
+ slots = 3;
- max_slots += 3;
- handle->alignment->initial_slots = smaller_slot + 3;
+ size_t size = slots * CTX_POINT_SIZE_BYTES(ctx);
- internal_fatal(handle->alignment->initial_slots < 3 || handle->alignment->initial_slots >= max_slots, "ooops! wrong distribution of metrics across time");
- internal_fatal(final_slots < 3 || final_slots >= max_slots, "ooops! wrong distribution of metrics across time");
- }
+ // internal_error(true, "PAGE ALLOC %zu bytes (%zu max)", size, max_size);
- size = final_slots * CTX_POINT_SIZE_BYTES(ctx);
- }
+ internal_fatal(slots < 3 || slots > max_slots, "ooops! wrong distribution of metrics across time");
+ internal_fatal(size > tier_page_size[ctx->config.tier] || size < CTX_POINT_SIZE_BYTES(ctx) * 2, "ooops! wrong page size");
*data_size = size;
- return dbengine_page_alloc(size);
+ void *d = dbengine_page_alloc(size);
+
+ timing_step(TIMING_STEP_DBENGINE_PAGE_ALLOC);
+
+ return d;
}
static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_handle,
@@ -484,75 +486,33 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_
struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
- bool perfect_page_alignment = false;
- void *data;
- size_t data_size;
+ if(unlikely(!handle->data))
+ handle->data = rrdeng_alloc_new_metric_data(handle, &handle->data_size, point_in_time_ut);
- if(likely(handle->page)) {
- /* Make alignment decisions */
- if (handle->page_position == handle->alignment->page_position) {
- /* this is the leading dimension that defines chart alignment */
- perfect_page_alignment = true;
- }
-
- /* is the metric far enough out of alignment with the others? */
- if (unlikely(handle->page_position + 1 < handle->alignment->page_position))
- handle->options |= RRDENG_CHO_UNALIGNED;
+ timing_step(TIMING_STEP_DBENGINE_CHECK_DATA);
- if (unlikely((handle->options & RRDENG_CHO_UNALIGNED) &&
- /* did the other metrics change page? */
- handle->alignment->page_position <= 1)) {
- handle->options &= ~RRDENG_CHO_UNALIGNED;
- handle->page_flags |= RRDENG_PAGE_UNALIGNED;
- rrdeng_store_metric_flush_current_page(collection_handle);
-
- data = rrdeng_alloc_new_metric_data(handle, &data_size, point_in_time_ut);
- }
- else {
- data = pgc_page_data(handle->page);
- data_size = pgc_page_data_size(main_cache, handle->page);
- }
+ if(likely(ctx->config.page_type == PAGE_METRICS)) {
+ storage_number *tier0_metric_data = handle->data;
+ tier0_metric_data[handle->page_position] = pack_storage_number(n, flags);
+ }
+ else if(likely(ctx->config.page_type == PAGE_TIER)) {
+ storage_number_tier1_t *tier12_metric_data = handle->data;
+ storage_number_tier1_t number_tier1;
+ number_tier1.sum_value = (float) n;
+ number_tier1.min_value = (float) min_value;
+ number_tier1.max_value = (float) max_value;
+ number_tier1.anomaly_count = anomaly_count;
+ number_tier1.count = count;
+ tier12_metric_data[handle->page_position] = number_tier1;
}
else
- data = rrdeng_alloc_new_metric_data(handle, &data_size, point_in_time_ut);
-
- switch (ctx->config.page_type) {
- case PAGE_METRICS: {
- storage_number *tier0_metric_data = data;
- tier0_metric_data[handle->page_position] = pack_storage_number(n, flags);
- }
- break;
+ fatal("DBENGINE: cannot store metric on unknown page type id %d", ctx->config.page_type);
- case PAGE_TIER: {
- storage_number_tier1_t *tier12_metric_data = data;
- storage_number_tier1_t number_tier1;
- number_tier1.sum_value = (float)n;
- number_tier1.min_value = (float)min_value;
- number_tier1.max_value = (float)max_value;
- number_tier1.anomaly_count = anomaly_count;
- number_tier1.count = count;
- tier12_metric_data[handle->page_position] = number_tier1;
- }
- break;
-
- default: {
- static bool logged = false;
- if(!logged) {
- error("DBENGINE: cannot store metric on unknown page type id %d", ctx->config.page_type);
- logged = true;
- }
- }
- break;
- }
+ timing_step(TIMING_STEP_DBENGINE_PACK);
if(unlikely(!handle->page)){
- rrdeng_store_metric_create_new_page(handle, ctx, point_in_time_ut, data, data_size);
+ rrdeng_store_metric_create_new_page(handle, ctx, point_in_time_ut, handle->data, handle->data_size);
// handle->position is set to 1 already
-
- if (0 == handle->alignment->page_position) {
- /* this is the leading dimension that defines chart alignment */
- perfect_page_alignment = true;
- }
}
else {
// update an existing page
@@ -566,11 +526,12 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_
}
}
- if (perfect_page_alignment)
- handle->alignment->page_position = handle->page_position;
+ timing_step(TIMING_STEP_DBENGINE_PAGE_FIN);
// update the metric information
mrg_metric_set_hot_latest_time_s(main_mrg, handle->metric, (time_t) (point_in_time_ut / USEC_PER_SEC));
+
+ timing_step(TIMING_STEP_DBENGINE_MRG_UPDATE);
}
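The rewritten append path stores points differently per page type: tier 0 (`PAGE_METRICS`) packs a single `storage_number`, while higher tiers (`PAGE_TIER`) keep per-slot aggregates. A sketch of the tier-1 packing, assuming a `tier1_point` layout that mirrors the fields `storage_number_tier1_t` is shown assigning above (the exact field widths and ordering are illustrative):

```c
#include <assert.h>
#include <stdint.h>

// Hypothetical layout mirroring storage_number_tier1_t: higher tiers
// keep aggregates per slot instead of one packed value.
typedef struct {
    float    sum_value;
    float    min_value;
    float    max_value;
    uint16_t count;
    uint16_t anomaly_count;
} tier1_point;

// Sketch of the PAGE_TIER branch of rrdeng_store_metric_append_point():
// each collected value fills one slot with its aggregates.
static tier1_point pack_tier1(double n, double min_value, double max_value,
                              uint16_t count, uint16_t anomaly_count) {
    tier1_point p;
    p.sum_value     = (float) n;
    p.min_value     = (float) min_value;
    p.max_value     = (float) max_value;
    p.count         = count;
    p.anomaly_count = anomaly_count;
    return p;
}
```

Note the behavioral change in the diff itself: an unknown page type was previously logged once via `error()`, but is now a `fatal()`, since a handle with an unrecognized page type cannot store anything meaningful.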
static void store_metric_next_error_log(struct rrdeng_collect_handle *handle, usec_t point_in_time_ut, const char *msg) {
@@ -612,6 +573,8 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
const uint16_t anomaly_count,
const SN_FLAGS flags)
{
+ timing_step(TIMING_STEP_RRDSET_STORE_METRIC);
+
struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
#ifdef NETDATA_INTERNAL_CHECKS
@@ -619,59 +582,62 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
handle->page_flags |= RRDENG_PAGE_FUTURE_POINT;
#endif
- if(likely(handle->page_end_time_ut + handle->update_every_ut == point_in_time_ut)) {
+ usec_t delta_ut = point_in_time_ut - handle->page_end_time_ut;
+
+ if(likely(delta_ut == handle->update_every_ut)) {
// happy path
;
}
+ else if(unlikely(point_in_time_ut > handle->page_end_time_ut)) {
+ if(handle->page) {
+ if (unlikely(delta_ut < handle->update_every_ut)) {
+ handle->page_flags |= RRDENG_PAGE_STEP_TOO_SMALL;
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ }
+ else if (unlikely(delta_ut % handle->update_every_ut)) {
+ handle->page_flags |= RRDENG_PAGE_STEP_UNALIGNED;
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ }
+ else {
+ size_t points_gap = delta_ut / handle->update_every_ut;
+ size_t page_remaining_points = handle->page_entries_max - handle->page_position;
+
+ if (points_gap >= page_remaining_points) {
+ handle->page_flags |= RRDENG_PAGE_BIG_GAP;
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ }
+ else {
+ // loop to fill the gap
+ handle->page_flags |= RRDENG_PAGE_GAP;
+
+ usec_t stop_ut = point_in_time_ut - handle->update_every_ut;
+ for (usec_t this_ut = handle->page_end_time_ut + handle->update_every_ut;
+ this_ut <= stop_ut;
+ this_ut = handle->page_end_time_ut + handle->update_every_ut) {
+ rrdeng_store_metric_append_point(
+ collection_handle,
+ this_ut,
+ NAN, NAN, NAN,
+ 1, 0,
+ SN_EMPTY_SLOT);
+ }
+ }
+ }
+ }
+ }
else if(unlikely(point_in_time_ut < handle->page_end_time_ut)) {
handle->page_flags |= RRDENG_PAGE_PAST_COLLECTION;
store_metric_next_error_log(handle, point_in_time_ut, "is older than the");
return;
}
- else if(unlikely(point_in_time_ut == handle->page_end_time_ut)) {
+ else /* if(unlikely(point_in_time_ut == handle->page_end_time_ut)) */ {
handle->page_flags |= RRDENG_PAGE_REPEATED_COLLECTION;
store_metric_next_error_log(handle, point_in_time_ut, "is at the same time as the");
return;
}
- else if(handle->page) {
- usec_t delta_ut = point_in_time_ut - handle->page_end_time_ut;
-
- if(unlikely(delta_ut < handle->update_every_ut)) {
- handle->page_flags |= RRDENG_PAGE_STEP_TOO_SMALL;
- rrdeng_store_metric_flush_current_page(collection_handle);
- }
- else if(unlikely(delta_ut % handle->update_every_ut)) {
- handle->page_flags |= RRDENG_PAGE_STEP_UNALIGNED;
- rrdeng_store_metric_flush_current_page(collection_handle);
- }
- else {
- size_t points_gap = delta_ut / handle->update_every_ut;
- size_t page_remaining_points = handle->page_entries_max - handle->page_position;
-
- if(points_gap >= page_remaining_points) {
- handle->page_flags |= RRDENG_PAGE_BIG_GAP;
- rrdeng_store_metric_flush_current_page(collection_handle);
- }
- else {
- // loop to fill the gap
- handle->page_flags |= RRDENG_PAGE_GAP;
-
- usec_t stop_ut = point_in_time_ut - handle->update_every_ut;
- for(usec_t this_ut = handle->page_end_time_ut + handle->update_every_ut;
- this_ut <= stop_ut ;
- this_ut = handle->page_end_time_ut + handle->update_every_ut) {
- rrdeng_store_metric_append_point(
- collection_handle,
- this_ut,
- NAN, NAN, NAN,
- 1, 0,
- SN_EMPTY_SLOT);
- }
- }
- }
- }
+ timing_step(TIMING_STEP_DBENGINE_FIRST_CHECK);
rrdeng_store_metric_append_point(collection_handle,
point_in_time_ut,
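The reordered forward-in-time branch above classifies how a new point relates to the end of the current page before deciding whether to flush, gap-fill with `SN_EMPTY_SLOT`, or append normally. A self-contained sketch of that decision tree (the `gap_kind` enum is an illustrative label for the `RRDENG_PAGE_*` flag that gets set; it is not a type in the source):

```c
#include <assert.h>
#include <stdint.h>
#include <stddef.h>

typedef uint64_t usec_t;

typedef enum {
    GAP_NONE,            // happy path: exactly one update_every step ahead
    GAP_STEP_TOO_SMALL,  // RRDENG_PAGE_STEP_TOO_SMALL -> flush page
    GAP_STEP_UNALIGNED,  // RRDENG_PAGE_STEP_UNALIGNED -> flush page
    GAP_TOO_BIG,         // RRDENG_PAGE_BIG_GAP -> flush page
    GAP_FILL             // RRDENG_PAGE_GAP -> fill slots with SN_EMPTY_SLOT
} gap_kind;

// Mirrors the point_in_time_ut > page_end_time_ut branch of
// rrdeng_store_metric_next() when a page is already open.
static gap_kind classify_gap(usec_t delta_ut, usec_t update_every_ut,
                             size_t page_position, size_t page_entries_max) {
    if (delta_ut == update_every_ut)
        return GAP_NONE;
    if (delta_ut < update_every_ut)
        return GAP_STEP_TOO_SMALL;
    if (delta_ut % update_every_ut)
        return GAP_STEP_UNALIGNED;

    size_t points_gap = delta_ut / update_every_ut;
    size_t page_remaining_points = page_entries_max - page_position;
    return (points_gap >= page_remaining_points) ? GAP_TOO_BIG : GAP_FILL;
}
```

Only the `GAP_FILL` case enters the loop that appends `SN_EMPTY_SLOT` points; a gap that would overflow the page just flushes it, letting the next append start a fresh page.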
@@ -776,10 +742,10 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
handle = rrdeng_query_handle_get();
register_query_handle(handle);
- if(unlikely(priority < STORAGE_PRIORITY_HIGH))
+ if (unlikely(priority < STORAGE_PRIORITY_HIGH))
priority = STORAGE_PRIORITY_HIGH;
- else if(unlikely(priority > STORAGE_PRIORITY_BEST_EFFORT))
- priority = STORAGE_PRIORITY_BEST_EFFORT;
+ else if (unlikely(priority >= STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE))
+ priority = STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE - 1;
handle->ctx = ctx;
handle->metric = metric;
@@ -809,6 +775,7 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
rrddim_handle->start_time_s = handle->start_time_s;
rrddim_handle->end_time_s = handle->end_time_s;
rrddim_handle->priority = priority;
+ rrddim_handle->backend = STORAGE_ENGINE_BACKEND_DBENGINE;
pg_cache_preload(handle);
@@ -824,6 +791,7 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
rrddim_handle->start_time_s = handle->start_time_s;
rrddim_handle->end_time_s = 0;
rrddim_handle->priority = priority;
+ rrddim_handle->backend = STORAGE_ENGINE_BACKEND_DBENGINE;
}
}
@@ -906,6 +874,7 @@ STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim
// We need to get a new page
if (!rrdeng_load_page_next(rrddim_handle, false)) {
+ handle->now_s = rrddim_handle->end_time_s;
storage_point_empty(sp, handle->now_s - handle->dt_s, handle->now_s);
goto prepare_for_next_iteration;
}
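The query-priority hunk changes the upper clamp: instead of capping at `STORAGE_PRIORITY_BEST_EFFORT`, priorities are now capped just below the internal sentinel, so new priority levels above best-effort remain usable. A sketch of the clamp, assuming a hypothetical subset of the `STORAGE_PRIORITY` enum (the member names past `HIGH` and their order are illustrative, not taken from the source):

```c
#include <assert.h>

// Hypothetical subset of STORAGE_PRIORITY; lower numeric value means
// higher priority, and the _INTERNAL_MAX_DONT_USE sentinel ends the enum.
typedef enum {
    STORAGE_PRIORITY_CRITICAL = 0,
    STORAGE_PRIORITY_HIGH,
    STORAGE_PRIORITY_NORMAL,
    STORAGE_PRIORITY_LOW,
    STORAGE_PRIORITY_BEST_EFFORT,
    STORAGE_PRIORITY_SYNCHRONOUS,
    STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE
} STORAGE_PRIORITY;

// Mirrors the clamp in rrdeng_load_metric_init(): callers cannot run
// above HIGH, and cannot reach the internal sentinel.
static STORAGE_PRIORITY clamp_priority(STORAGE_PRIORITY priority) {
    if (priority < STORAGE_PRIORITY_HIGH)
        priority = STORAGE_PRIORITY_HIGH;
    else if (priority >= STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE)
        priority = STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE - 1;
    return priority;
}
```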
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
index feb79b977..514954af7 100644
--- a/database/engine/rrdengineapi.h
+++ b/database/engine/rrdengineapi.h
@@ -12,11 +12,8 @@
#define RRDENG_FD_BUDGET_PER_INSTANCE (50)
-extern int db_engine_use_malloc;
-extern int default_rrdeng_page_fetch_timeout;
-extern int default_rrdeng_page_fetch_retries;
extern int default_rrdeng_page_cache_mb;
-extern int db_engine_journal_indexing;
+extern int default_rrdeng_extent_cache_mb;
extern int db_engine_journal_check;
extern int default_rrdeng_disk_quota_mb;
extern int default_multidb_disk_quota_mb;
@@ -27,9 +24,6 @@ extern size_t tier_page_size[];
#define CTX_POINT_SIZE_BYTES(ctx) page_type_size[(ctx)->config.page_type]
void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_t *ret_uuid);
-void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid,
- uuid_t *ret_uuid);
-
STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance);
STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c
index 7ec626c59..984a591e8 100644
--- a/database/engine/rrdenginelib.c
+++ b/database/engine/rrdenginelib.c
@@ -1,72 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
-#define BUFSIZE (512)
-
-/* Caller must hold descriptor lock */
-//void print_page_cache_descr(struct rrdeng_page_descr *descr, const char *msg, bool log_debug)
-//{
-// if(log_debug && !(debug_flags & D_RRDENGINE))
-// return;
-//
-// BUFFER *wb = buffer_create(512);
-//
-// if(!descr) {
-// buffer_sprintf(wb, "DBENGINE: %s : descr is NULL", msg);
-// }
-// else {
-// struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
-// char uuid_str[UUID_STR_LEN];
-//
-// uuid_unparse_lower(*descr->id, uuid_str);
-// buffer_sprintf(wb, "DBENGINE: %s : page(%p) metric:%s, len:%"PRIu32", time:%"PRIu64"->%"PRIu64", update_every:%u, type:%u, xt_offset:",
-// msg,
-// pg_cache_descr->page, uuid_str,
-// descr->page_length,
-// (uint64_t)descr->start_time_ut,
-// (uint64_t)descr->end_time_ut,
-// (uint32_t)descr->update_every_s,
-// (uint32_t)descr->type
-// );
-// if (!descr->extent) {
-// buffer_strcat(wb, "N/A");
-// } else {
-// buffer_sprintf(wb, "%"PRIu64, descr->extent->offset);
-// }
-//
-// buffer_sprintf(wb, ", flags:0x%2.2lX refcnt:%u", pg_cache_descr->flags, pg_cache_descr->refcnt);
-// }
-//
-// if(log_debug)
-// debug(D_RRDENGINE, "%s", buffer_tostring(wb));
-// else
-// internal_error(true, "%s", buffer_tostring(wb));
-//
-// buffer_free(wb);
-//}
-//
-//void print_page_descr(struct rrdeng_page_descr *descr)
-//{
-// char uuid_str[UUID_STR_LEN];
-// char str[BUFSIZE + 1];
-// int pos = 0;
-//
-// uuid_unparse_lower(*descr->id, uuid_str);
-// pos += snprintfz(str, BUFSIZE - pos, "id=%s\n"
-// "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:",
-// uuid_str,
-// descr->page_length,
-// (uint64_t)descr->start_time_ut,
-// (uint64_t)descr->end_time_ut);
-// if (!descr->extent) {
-// pos += snprintfz(str + pos, BUFSIZE - pos, "N/A");
-// } else {
-// pos += snprintfz(str + pos, BUFSIZE - pos, "%"PRIu64, descr->extent->offset);
-// }
-// snprintfz(str + pos, BUFSIZE - pos, "\n\n");
-// fputs(str, stderr);
-//}
-
int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size)
{
int ret;