From 7877a98bd9c00db5e81dd2f8c734cba2bab20be7 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 12 Aug 2022 09:26:17 +0200 Subject: Merging upstream version 1.36.0. Signed-off-by: Daniel Baumann --- database/engine/README.md | 227 +++++--- database/engine/datafile.c | 28 +- database/engine/journalfile.c | 30 +- database/engine/metadata_log/metalogpluginsd.c | 2 +- database/engine/pagecache.c | 207 ++++++- database/engine/pagecache.h | 12 +- database/engine/rrddiskprotocol.h | 3 +- database/engine/rrdengine.c | 79 ++- database/engine/rrdengine.h | 9 +- database/engine/rrdengineapi.c | 766 +++++++++++++------------ database/engine/rrdengineapi.h | 105 +++- 11 files changed, 956 insertions(+), 512 deletions(-) (limited to 'database/engine') diff --git a/database/engine/README.md b/database/engine/README.md index 7defcce9d..c67e400f4 100644 --- a/database/engine/README.md +++ b/database/engine/README.md @@ -6,75 +6,114 @@ custom_edit_url: https://github.com/netdata/netdata/edit/master/database/engine/ # Database engine -The Database Engine works like a traditional database. It dedicates a certain amount of RAM to data caching and -indexing, while the rest of the data resides compressed on disk. Unlike other [memory modes](/database/README.md), the -amount of historical metrics stored is based on the amount of disk space you allocate and the effective compression +The Database Engine works like a traditional time series database. Unlike other [database modes](/database/README.md), +the amount of historical metrics stored is based on the amount of disk space you allocate and the effective compression ratio, not a fixed number of metrics collected. -By using both RAM and disk space, the database engine allows for long-term storage of per-second metrics inside of the -Agent itself. +## Tiering -In addition, the database engine is the only memory mode that supports changing the data collection update frequency -(`update_every`) without losing the metrics your Agent already gathered and stored. +Tiering is a mechanism of providing multiple tiers of data with +different [granularity on metrics](/docs/store/distributed-data-architecture.md#granularity-of-metrics). -## Configuration +For Netdata Agents with version `netdata-1.35.0.138.nightly` and greater, `dbengine` supports Tiering, allowing almost +unlimited retention of data. -To use the database engine, open `netdata.conf` and set `memory mode` to `dbengine`. -```conf -[global] - memory mode = dbengine -``` +### Metric size -To configure the database engine, look for the `page cache size` and `dbengine multihost disk space` settings in the -`[global]` section of your `netdata.conf`. The Agent ignores the `history` setting when using the database engine. +Every Tier down samples the exact lower tier (lower tiers have greater resolution). You can have up to 5 +Tiers **[0. . 4]** of data (including the Tier 0, which has the highest resolution) -```conf -[global] - page cache size = 32 - dbengine multihost disk space = 256 -``` +Tier 0 is the default that was always available in `dbengine` mode. Tier 1 is the first level of aggregation, Tier 2 is +the second, and so on. -The above values are the default values for Page Cache size and DB engine disk space quota. Both numbers are -in **MiB**. +Metrics on all tiers except of the _Tier 0_ also store the following five additional values for every point for accurate +representation: -The `page cache size` option determines the amount of RAM in **MiB** dedicated to caching Netdata metric values. The -actual page cache size will be slightly larger than this figure—see the [memory requirements](#memory-requirements) -section for details. +1. The `sum` of the points aggregated +2. The `min` of the points aggregated +3. The `max` of the points aggregated +4. The `count` of the points aggregated (could be constant, but it may not be due to gaps in data collection) +5. The `anomaly_count` of the points aggregated (how many of the aggregated points found anomalous) -The `dbengine multihost disk space` option determines the amount of disk space in **MiB** that is dedicated to storing -Netdata metric values and all related metadata describing them. You can use the [**database engine -calculator**](/docs/store/change-metrics-storage.md#calculate-the-system-resources-ram-disk-space-needed-to-store-metrics) -to correctly set `dbengine multihost disk space` based on your metrics retention policy. The calculator gives an -accurate estimate based on how many child nodes you have, how many metrics your Agent collects, and more. +Among `min`, `max` and `sum`, the correct value is chosen based on the user query. `average` is calculated on the fly at +query time. -### Legacy configuration +### Tiering in a nutshell -The deprecated `dbengine disk space` option determines the amount of disk space in **MiB** that is dedicated to storing -Netdata metric values per legacy database engine instance (see [details on the legacy mode](#legacy-mode) below). +The `dbengine` is capable of retaining metrics for years. To further understand the `dbengine` tiering mechanism let's +explore the following configuration. -```conf -[global] - dbengine disk space = 256 ``` +[db] + mode = dbengine + + # per second data collection + update every = 1 + + # enables Tier 1 and Tier 2, Tier 0 is always enabled in dbengine mode + storage tiers = 3 + + # Tier 0, per second data for a week + dbengine multihost disk space MB = 1100 + + # Tier 1, per minute data for a month + dbengine tier 1 multihost disk space MB = 330 + + # Tier 2, per hour data for a year + dbengine tier 2 multihost disk space MB = 67 +``` + +For 2000 metrics, collected every second and retained for a week, Tier 0 needs: 1 byte x 2000 metrics x 3600 secs per +hour x 24 hours per day x 7 days per week = 1100MB. + +By setting `dbengine multihost disk space MB` to `1100`, this node will start maintaining about a week of data. But pay +attention to the number of metrics. If you have more than 2000 metrics on a node, or you need more that a week of high +resolution metrics, you may need to adjust this setting accordingly. + +Tier 1 is by default sampling the data every **60 points of Tier 0**. In our case, Tier 0 is per second, if we want to +transform this information in terms of time then the Tier 1 "resolution" is per minute. + +Tier 1 needs four times more storage per point compared to Tier 0. So, for 2000 metrics, with per minute resolution, +retained for a month, Tier 1 needs: 4 bytes x 2000 metrics x 60 minutes per hour x 24 hours per day x 30 days per month += 330MB. -### Streaming metrics to the database engine +Tier 2 is by default sampling data every 3600 points of Tier 0 (60 of Tier 1, which is the previous exact Tier). Again +in term of "time" (Tier 0 is per second), then Tier 2 is per hour. -When using the multihost database engine, all parent and child nodes share the same `page cache size` and `dbengine -multihost disk space` in a single dbengine instance. The [**database engine -calculator**](/docs/store/change-metrics-storage.md#calculate-the-system-resources-ram-disk-space-needed-to-store-metrics) -helps you properly set `page cache size` and `dbengine multihost disk space` on your parent node to allocate enough -resources based on your metrics retention policy and how many child nodes you have. +The storage requirements are the same to Tier 1. -#### Legacy mode +For 2000 metrics, with per hour resolution, retained for a year, Tier 2 needs: 4 bytes x 2000 metrics x 24 hours per day +x 365 days per year = 67MB. + +## Legacy configuration + +### v1.35.1 and prior + +These versions of the Agent do not support [Tiering](#Tiering). You could change the metric retention for the parent and +all of its children only with the `dbengine multihost disk space MB` setting. This setting accounts the space allocation +for the parent node and all of its children. + +To configure the database engine, look for the `page cache size MB` and `dbengine multihost disk space MB` settings in +the `[db]` section of your `netdata.conf`. + +```conf +[db] + dbengine page cache size MB = 32 + dbengine multihost disk space MB = 256 +``` + +### v1.23.2 and prior _For Netdata Agents earlier than v1.23.2_, the Agent on the parent node uses one dbengine instance for itself, and another instance for every child node it receives metrics from. If you had four streaming nodes, you would have five instances in total (`1 parent + 4 child nodes = 5 instances`). -The Agent allocates resources for each instance separately using the `dbengine disk space` (**deprecated**) setting. If -`dbengine disk space`(**deprecated**) is set to the default `256`, each instance is given 256 MiB in disk space, which -means the total disk space required to store all instances is, roughly, `256 MiB * 1 parent * 4 child nodes = 1280 MiB`. +The Agent allocates resources for each instance separately using the `dbengine disk space MB` (**deprecated**) setting. +If +`dbengine disk space MB`(**deprecated**) is set to the default `256`, each instance is given 256 MiB in disk space, +which means the total disk space required to store all instances is, +roughly, `256 MiB * 1 parent * 4 child nodes = 1280 MiB`. #### Backward compatibility @@ -88,51 +127,54 @@ Agent. ##### Information -For more information about setting `memory mode` on your nodes, in addition to other streaming configurations, see +For more information about setting `[db].mode` on your nodes, in addition to other streaming configurations, see [streaming](/streaming/README.md). -### Memory requirements +## Requirements & limitations -Using memory mode `dbengine` we can overcome most memory restrictions and store a dataset that is much larger than the +### Memory + +Using database mode `dbengine` we can overcome most memory restrictions and store a dataset that is much larger than the available memory. There are explicit memory requirements **per** DB engine **instance**: -- The total page cache memory footprint will be an additional `#dimensions-being-collected x 4096 x 2` bytes over what - the user configured with `page cache size`. +- The total page cache memory footprint will be an additional `#dimensions-being-collected x 4096 x 2` bytes over what + the user configured with `dbengine page cache size MB`. + -- an additional `#pages-on-disk x 4096 x 0.03` bytes of RAM are allocated for metadata. +- an additional `#pages-on-disk x 4096 x 0.03` bytes of RAM are allocated for metadata. - - roughly speaking this is 3% of the uncompressed disk space taken by the DB files. + - roughly speaking this is 3% of the uncompressed disk space taken by the DB files. - - for very highly compressible data (compression ratio > 90%) this RAM overhead is comparable to the disk space - footprint. + - for very highly compressible data (compression ratio > 90%) this RAM overhead is comparable to the disk space + footprint. An important observation is that RAM usage depends on both the `page cache size` and the `dbengine multihost disk space` options. -You can use our [database engine -calculator](/docs/store/change-metrics-storage.md#calculate-the-system-resources-ram-disk-space-needed-to-store-metrics) +You can use +our [database engine calculator](/docs/store/change-metrics-storage.md#calculate-the-system-resources-ram-disk-space-needed-to-store-metrics) to validate the memory requirements for your particular system(s) and configuration (**out-of-date**). -### Disk space requirements +### Disk space There are explicit disk space requirements **per** DB engine **instance**: -- The total disk space footprint will be the maximum between `#dimensions-being-collected x 4096 x 2` bytes or what - the user configured with `dbengine multihost disk space` or `dbengine disk space`. +- The total disk space footprint will be the maximum between `#dimensions-being-collected x 4096 x 2` bytes or what the + user configured with `dbengine multihost disk space` or `dbengine disk space`. -### File descriptor requirements +### File descriptor -The Database Engine may keep a **significant** amount of files open per instance (e.g. per streaming child or -parent server). When configuring your system you should make sure there are at least 50 file descriptors available per +The Database Engine may keep a **significant** amount of files open per instance (e.g. per streaming child or parent +server). When configuring your system you should make sure there are at least 50 file descriptors available per `dbengine` instance. Netdata allocates 25% of the available file descriptors to its Database Engine instances. This means that only 25% of the file descriptors that are available to the Netdata service are accessible by dbengine instances. You should take that into account when configuring your service or system-wide file descriptor limits. You can roughly estimate that the Netdata service needs 2048 file descriptors for every 10 streaming child hosts when streaming is configured to use -`memory mode = dbengine`. +`[db].mode = dbengine`. If for example one wants to allocate 65536 file descriptors to the Netdata service on a systemd system one needs to override the Netdata service by running `sudo systemctl edit netdata` and creating a file with contents: @@ -149,7 +191,7 @@ ulimit -n 65536 ``` at the beginning of the service file. Alternatively you can change the system-wide limits of the kernel by changing - `/etc/sysctl.conf`. For linux that would be: +`/etc/sysctl.conf`. For linux that would be: ```conf fs.file-max = 65536 @@ -166,8 +208,8 @@ You can apply the settings by running `sysctl -p` or by rebooting. ## Files -With the DB engine memory mode the metric data are stored in database files. These files are organized in pairs, the -datafiles and their corresponding journalfiles, e.g.: +With the DB engine mode the metric data are stored in database files. These files are organized in pairs, the datafiles +and their corresponding journalfiles, e.g.: ```sh datafile-1-0000000001.ndf @@ -192,15 +234,16 @@ storage at lower granularity. The DB engine stores chart metric values in 4096-byte pages in memory. Each chart dimension gets its own page to store consecutive values generated from the data collectors. Those pages comprise the **Page Cache**. -When those pages fill up they are slowly compressed and flushed to disk. It can take `4096 / 4 = 1024 seconds = 17 -minutes`, for a chart dimension that is being collected every 1 second, to fill a page. Pages can be cut short when we -stop Netdata or the DB engine instance so as to not lose the data. When we query the DB engine for data we trigger disk -read I/O requests that fill the Page Cache with the requested pages and potentially evict cold (not recently used) -pages. +When those pages fill up, they are slowly compressed and flushed to disk. It can +take `4096 / 4 = 1024 seconds = 17 minutes`, for a chart dimension that is being collected every 1 second, to fill a +page. Pages can be cut short when we stop Netdata or the DB engine instance so as to not lose the data. When we query +the DB engine for data we trigger disk read I/O requests that fill the Page Cache with the requested pages and +potentially evict cold (not recently used) +pages. When the disk quota is exceeded the oldest values are removed from the DB engine at real time, by automatically deleting the oldest datafile and journalfile pair. Any corresponding pages residing in the Page Cache will also be invalidated -and removed. The DB engine logic will try to maintain between 10 and 20 file pairs at any point in time. +and removed. The DB engine logic will try to maintain between 10 and 20 file pairs at any point in time. The Database Engine uses direct I/O to avoid polluting the OS filesystem caches and does not generate excessive I/O traffic so as to create the minimum possible interference with other applications. @@ -215,38 +258,38 @@ Constellation ES.3 2TB magnetic HDD and a SAMSUNG MZQLB960HAJR-00007 960GB NAND For our workload, we defined 32 charts with 128 metrics each, giving us a total of 4096 metrics. We defined 1 worker thread per chart (32 threads) that generates new data points with a data generation interval of 1 second. The time axis of the time-series is emulated and accelerated so that the worker threads can generate as many data points as possible -without delays. +without delays. -We also defined 32 worker threads that perform queries on random metrics with semi-random time ranges. The -starting time of the query is randomly selected between the beginning of the time-series and the time of the latest data -point. The ending time is randomly selected between 1 second and 1 hour after the starting time. The pseudo-random -numbers are generated with a uniform distribution. +We also defined 32 worker threads that perform queries on random metrics with semi-random time ranges. The starting time +of the query is randomly selected between the beginning of the time-series and the time of the latest data point. The +ending time is randomly selected between 1 second and 1 hour after the starting time. The pseudo-random numbers are +generated with a uniform distribution. The data are written to the database at the same time as they are read from it. This is a concurrent read/write mixed -workload with a duration of 60 seconds. The faster `dbengine` runs, the bigger the dataset size becomes since more -data points will be generated. We set a page cache size of 64MiB for the two disk-bound scenarios. This way, the dataset -size of the metric data is much bigger than the RAM that is being used for caching so as to trigger I/O requests most -of the time. In our final scenario, we set the page cache size to 16 GiB. That way, the dataset fits in the page cache -so as to avoid all disk bottlenecks. +workload with a duration of 60 seconds. The faster `dbengine` runs, the bigger the dataset size becomes since more data +points will be generated. We set a page cache size of 64MiB for the two disk-bound scenarios. This way, the dataset size +of the metric data is much bigger than the RAM that is being used for caching so as to trigger I/O requests most of the +time. In our final scenario, we set the page cache size to 16 GiB. That way, the dataset fits in the page cache so as to +avoid all disk bottlenecks. The reported numbers are the following: | device | page cache | dataset | reads/sec | writes/sec | -| :----: | :--------: | ------: | --------: | ---------: | -| HDD | 64 MiB | 4.1 GiB | 813K | 18.0M | -| SSD | 64 MiB | 9.8 GiB | 1.7M | 43.0M | -| N/A | 16 GiB | 6.8 GiB | 118.2M | 30.2M | +|:------:|:----------:|--------:|----------:|-----------:| +| HDD | 64 MiB | 4.1 GiB | 813K | 18.0M | +| SSD | 64 MiB | 9.8 GiB | 1.7M | 43.0M | +| N/A | 16 GiB | 6.8 GiB | 118.2M | 30.2M | where "reads/sec" is the number of metric data points being read from the database via its API per second and -"writes/sec" is the number of metric data points being written to the database per second. +"writes/sec" is the number of metric data points being written to the database per second. Notice that the HDD numbers are pretty high and not much slower than the SSD numbers. This is thanks to the database engine design being optimized for rotating media. In the database engine disk I/O requests are: -- asynchronous to mask the high I/O latency of HDDs. -- mostly large to reduce the amount of HDD seeking time. -- mostly sequential to reduce the amount of HDD seeking time. -- compressed to reduce the amount of required throughput. +- asynchronous to mask the high I/O latency of HDDs. +- mostly large to reduce the amount of HDD seeking time. +- mostly sequential to reduce the amount of HDD seeking time. +- compressed to reduce the amount of required throughput. As a result, the HDD is not thousands of times slower than the SSD, which is typical for other workloads. diff --git a/database/engine/datafile.c b/database/engine/datafile.c index 46d7a8f10..2ed98ef88 100644 --- a/database/engine/datafile.c +++ b/database/engine/datafile.c @@ -444,18 +444,44 @@ void finalize_data_files(struct rrdengine_instance *ctx) struct rrdengine_journalfile *journalfile; struct extent_info *extent, *next_extent; + size_t extents_number = 0; + size_t extents_bytes = 0; + size_t page_compressed_sizes = 0; + + size_t files_number = 0; + size_t files_bytes = 0; + for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) { journalfile = datafile->journalfile; next_datafile = datafile->next; for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) { + extents_number++; + extents_bytes += sizeof(*extent) + sizeof(struct rrdeng_page_descr *) * extent->number_of_pages; + page_compressed_sizes += extent->size; + next_extent = extent->next; freez(extent); } close_journal_file(journalfile, datafile); close_data_file(datafile); + + files_number++; + files_bytes += sizeof(*journalfile) + sizeof(*datafile); + freez(journalfile); freez(datafile); - } + + if(!files_number) files_number = 1; + if(!extents_number) extents_number = 1; + + info("DBENGINE STATISTICS ON DATAFILES:" + " Files %zu, structures %zu bytes, %0.2f bytes per file." + " Extents %zu, structures %zu bytes, %0.2f bytes per extent." + " Compressed size of all pages: %zu bytes." + , files_number, files_bytes, (double)files_bytes/files_number + , extents_number, extents_bytes, (double)extents_bytes/extents_number + , page_compressed_sizes + ); } diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 0b3d3eeb8..dc61f569d 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -275,6 +275,7 @@ static int check_journal_file_superblock(uv_file file) static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, void *buf, unsigned max_size) { + static BITMAP256 page_error_map; struct page_cache *pg_cache = &ctx->pg_cache; unsigned i, count, payload_length, descr_size, valid_pages; struct rrdeng_page_descr *descr; @@ -301,11 +302,31 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden uuid_t *temp_id; Pvoid_t *PValue; struct pg_cache_page_index *page_index = NULL; + uint8_t page_type = jf_metric_data->descr[i].type; - if (PAGE_METRICS != jf_metric_data->descr[i].type) { - error("Unknown page type encountered."); + if (page_type > PAGE_TYPE_MAX) { + if (!bitmap256_get_bit(&page_error_map, page_type)) { + error("Unknown page type %d encountered.", page_type); + bitmap256_set_bit(&page_error_map, page_type, 1); + } continue; } + uint64_t start_time = jf_metric_data->descr[i].start_time; + uint64_t end_time = jf_metric_data->descr[i].end_time; + + if (unlikely(start_time > end_time)) { + error("Invalid page encountered, start time %lu > end time %lu", start_time , end_time ); + continue; + } + + if (unlikely(start_time == end_time)) { + size_t entries = jf_metric_data->descr[i].page_length / page_type_size[page_type]; + if (unlikely(entries > 1)) { + error("Invalid page encountered, start time %lu = end time but %zu entries were found", start_time, entries); + continue; + } + } + temp_id = (uuid_t *)jf_metric_data->descr[i].uuid; uv_rwlock_rdlock(&pg_cache->metrics_index.lock); @@ -327,10 +348,11 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden descr = pg_cache_create_descr(); descr->page_length = jf_metric_data->descr[i].page_length; - descr->start_time = jf_metric_data->descr[i].start_time; - descr->end_time = jf_metric_data->descr[i].end_time; + descr->start_time = start_time; + descr->end_time = end_time; descr->id = &page_index->id; descr->extent = extent; + descr->type = page_type; extent->pages[valid_pages++] = descr; pg_cache_insert(ctx, page_index, descr); } diff --git a/database/engine/metadata_log/metalogpluginsd.c b/database/engine/metadata_log/metalogpluginsd.c index 88c1453a9..a5301bc10 100755 --- a/database/engine/metadata_log/metalogpluginsd.c +++ b/database/engine/metadata_log/metalogpluginsd.c @@ -30,7 +30,7 @@ PARSER_RC metalog_pluginsd_host_action( } if (likely(!uuid_parse(machine_guid, state->host_uuid))) { - int rc = sql_store_host(&state->host_uuid, hostname, registry_hostname, update_every, os, timezone, tags); + int rc = sql_store_host(&state->host_uuid, hostname, registry_hostname, update_every, os, timezone, tags, 1); if (unlikely(rc)) { errno = 0; error("Failed to store host %s with UUID %s in the database", hostname, machine_guid); diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index cddbf9e1f..39f7642d0 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -3,6 +3,50 @@ #include "rrdengine.h" +ARAL page_descr_aral = { + .element_size = sizeof(struct rrdeng_page_descr), + .elements = 20000, + .filename = "page_descriptors", + .cache_dir = &netdata_configured_cache_dir, + .use_mmap = false, + .internal.initialized = false +}; + +void rrdeng_page_descr_aral_go_singlethreaded(void) { + page_descr_aral.internal.lockless = true; +} +void rrdeng_page_descr_aral_go_multithreaded(void) { + page_descr_aral.internal.lockless = false; +} + +struct rrdeng_page_descr *rrdeng_page_descr_mallocz(void) { + struct rrdeng_page_descr *descr; + descr = arrayalloc_mallocz(&page_descr_aral); + return descr; +} + +void rrdeng_page_descr_freez(struct rrdeng_page_descr *descr) { + arrayalloc_freez(&page_descr_aral, descr); +} + +void rrdeng_page_descr_use_malloc(void) { + if(page_descr_aral.internal.initialized) + error("DBENGINE: cannot change ARAL allocation policy after it has been initialized."); + else + page_descr_aral.use_mmap = false; +} + +void rrdeng_page_descr_use_mmap(void) { + if(page_descr_aral.internal.initialized) + error("DBENGINE: cannot change ARAL allocation policy after it has been initialized."); + else + page_descr_aral.use_mmap = true; +} + +bool rrdeng_page_descr_is_mmap(void) { + return page_descr_aral.use_mmap; +} + /* Forward declarations */ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx); @@ -81,7 +125,7 @@ struct rrdeng_page_descr *pg_cache_create_descr(void) { struct rrdeng_page_descr *descr; - descr = mallocz(sizeof(*descr)); + descr = rrdeng_page_descr_mallocz(); descr->page_length = 0; descr->start_time = INVALID_TIME; descr->end_time = INVALID_TIME; @@ -238,8 +282,7 @@ static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned numb */ unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx) { - /* it's twice the number of producers since we pin 2 pages per producer */ - return ctx->max_cache_pages + 2 * (unsigned long)ctx->metric_API_max_producers; + return ctx->max_cache_pages + (unsigned long)ctx->metric_API_max_producers; } /* @@ -248,8 +291,7 @@ unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx) */ unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx) { - /* it's twice the number of producers since we pin 2 pages per producer */ - return ctx->cache_pages_low_watermark + 2 * (unsigned long)ctx->metric_API_max_producers; + return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers; } /* @@ -496,7 +538,7 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d (void)sleep_usec(1000); /* 1 msec */ } destroy: - freez(descr); + rrdeng_page_descr_freez(descr); pg_cache_update_metric_times(page_index); return can_delete_metric; @@ -1069,9 +1111,9 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index int retry_count = 0; while (1) { descr = find_first_page_in_time_range(page_index, start_time, end_time); - if (NULL == descr || 0 == descr->page_length || retry_count == MAX_PAGE_CACHE_RETRY_WAIT) { + if (NULL == descr || 0 == descr->page_length || retry_count == default_rrdeng_page_fetch_retries) { /* non-empty page not found */ - if (retry_count == MAX_PAGE_CACHE_RETRY_WAIT) + if (retry_count == default_rrdeng_page_fetch_retries) error_report("Page cache timeout while waiting for page %p : returning FAIL", descr); uv_rwlock_rdunlock(&page_index->lock); @@ -1117,7 +1159,7 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index if (!(flags & RRD_PAGE_POPULATED)) page_not_in_cache = 1; - if (pg_cache_timedwait_event_unsafe(descr, 1) == UV_ETIMEDOUT) { + if (pg_cache_timedwait_event_unsafe(descr, default_rrdeng_page_fetch_timeout) == UV_ETIMEDOUT) { error_report("Page cache timeout while waiting for page %p : retry count = %d", descr, retry_count); ++retry_count; } @@ -1196,24 +1238,66 @@ void init_page_cache(struct rrdengine_instance *ctx) init_committed_page_index(ctx); } + + +/* + * METRIC # number + * 1. INDEX: JudyHS # bytes + * 2. DATA: page_index # bytes + * + * PAGE (1 page of 1 metric) # number + * 1. INDEX AT METRIC: page_index->JudyL_array # bytes + * 2. DATA: descr # bytes + * + * PAGE CACHE (1 page of 1 metric at the cache) # number + * 1. pg_cache_descr (if PG_CACHE_DESCR_ALLOCATED) # bytes + * 2. data (if RRD_PAGE_POPULATED) # bytes + * + */ + + void free_page_cache(struct rrdengine_instance *ctx) { struct page_cache *pg_cache = &ctx->pg_cache; - Word_t ret_Judy, bytes_freed = 0; Pvoid_t *PValue; struct pg_cache_page_index *page_index, *prev_page_index; Word_t Index; struct rrdeng_page_descr *descr; struct page_cache_descr *pg_cache_descr; + Word_t metrics_number = 0, + metrics_bytes = 0, + metrics_index_bytes = 0, + metrics_duration = 0; + + Word_t pages_number = 0, + pages_bytes = 0, + pages_index_bytes = 0; + + Word_t pages_size_per_type[256] = { 0 }, + pages_count_per_type[256] = { 0 }; + + Word_t cache_pages_number = 0, + cache_pages_bytes = 0, + cache_pages_data_bytes = 0; + + size_t points_in_db = 0, + uncompressed_points_size = 0, + seconds_in_db = 0, + single_point_pages = 0; + + Word_t pages_dirty_index_bytes = 0; + + usec_t oldest_time_ut = LONG_MAX, latest_time_ut = 0; + /* Free committed page index */ - ret_Judy = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0); + pages_dirty_index_bytes = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0); fatal_assert(NULL == pg_cache->committed_page_index.JudyL_array); - bytes_freed += ret_Judy; for (page_index = pg_cache->metrics_index.last_page_index ; page_index != NULL ; page_index = prev_page_index) { + prev_page_index = page_index->prev; /* Find first page in range */ @@ -1221,37 +1305,116 @@ void free_page_cache(struct rrdengine_instance *ctx) PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); descr = unlikely(NULL == PValue) ? NULL : *PValue; + size_t metric_duration = 0; + size_t metric_update_every = 0; + size_t metric_single_point_pages = 0; + while (descr != NULL) { /* Iterate all page descriptors of this metric */ if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) { + cache_pages_number++; + /* Check rrdenglocking.c */ pg_cache_descr = descr->pg_cache_descr; if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { dbengine_page_free(pg_cache_descr->page); - bytes_freed += RRDENG_BLOCK_SIZE; + cache_pages_data_bytes += RRDENG_BLOCK_SIZE; } rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); - bytes_freed += sizeof(*pg_cache_descr); + cache_pages_bytes += sizeof(*pg_cache_descr); } - freez(descr); - bytes_freed += sizeof(*descr); + + if(descr->start_time < oldest_time_ut) + oldest_time_ut = descr->start_time; + + if(descr->end_time > latest_time_ut) + latest_time_ut = descr->end_time; + + pages_size_per_type[descr->type] += descr->page_length; + pages_count_per_type[descr->type]++; + + size_t points_in_page = (descr->page_length / PAGE_POINT_SIZE_BYTES(descr)); + size_t page_duration = ((descr->end_time - descr->start_time) / USEC_PER_SEC); + size_t update_every = (page_duration == 0) ? 1 : page_duration / (points_in_page - 1); + + if (!page_duration && metric_update_every) { + page_duration = metric_update_every; + update_every = metric_update_every; + } + else if(page_duration) + metric_update_every = update_every; + + uncompressed_points_size += descr->page_length; + + if(page_duration > 0) { + page_duration = update_every * points_in_page; + metric_duration += page_duration; + seconds_in_db += page_duration; + points_in_db += descr->page_length / PAGE_POINT_SIZE_BYTES(descr); + } + else + metric_single_point_pages++; + + rrdeng_page_descr_freez(descr); + pages_bytes += sizeof(*descr); + pages_number++; PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0); descr = unlikely(NULL == PValue) ? NULL : *PValue; } + if(metric_single_point_pages && metric_update_every) { + points_in_db += metric_single_point_pages; + seconds_in_db += metric_update_every * metric_single_point_pages; + metric_duration += metric_update_every * metric_single_point_pages; + } + else + single_point_pages += metric_single_point_pages; + /* Free page index */ - ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0); + pages_index_bytes += JudyLFreeArray(&page_index->JudyL_array, PJE0); fatal_assert(NULL == page_index->JudyL_array); - bytes_freed += ret_Judy; freez(page_index); - bytes_freed += sizeof(*page_index); + + metrics_number++; + metrics_bytes += sizeof(*page_index); + metrics_duration += metric_duration; } /* Free metrics index */ - ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0); + metrics_index_bytes = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0); fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array); - bytes_freed += ret_Judy; - info("Freed %lu bytes of memory from page cache.", bytes_freed); + if(!metrics_number) metrics_number = 1; + if(!pages_number) pages_number = 1; + if(!cache_pages_number) cache_pages_number = 1; + if(!points_in_db) points_in_db = 1; + if(latest_time_ut == oldest_time_ut) oldest_time_ut -= USEC_PER_SEC; + + if(single_point_pages) { + long double avg_duration = (long double)seconds_in_db / points_in_db; + points_in_db += single_point_pages; + seconds_in_db += (size_t)(avg_duration * single_point_pages); + } + + info("DBENGINE STATISTICS ON METRICS:" + " Metrics: %lu (structures %lu bytes - per metric %0.2f, index (HS) %lu bytes - per metric %0.2f bytes - duration %zu secs) |" + " Page descriptors: %lu (structures %lu bytes - per page %0.2f bytes, index (L) %lu bytes - per page %0.2f, dirty index %lu bytes). |" + " Page cache: %lu pages (structures %lu bytes - per page %0.2f bytes, data %lu bytes). |" + " Points in db %zu, uncompressed size of points database %zu bytes. |" + " Duration of all points %zu seconds, average point duration %0.2f seconds." + " Duration of the database %llu seconds, average metric duration %0.2f seconds, average metric lifetime %0.2f%%." + , metrics_number, metrics_bytes, (double)metrics_bytes/metrics_number, metrics_index_bytes, (double)metrics_index_bytes/metrics_number, metrics_duration + , pages_number, pages_bytes, (double)pages_bytes/pages_number, pages_index_bytes, (double)pages_index_bytes/pages_number, pages_dirty_index_bytes + , cache_pages_number, cache_pages_bytes, (double)cache_pages_bytes/cache_pages_number, cache_pages_data_bytes + , points_in_db, uncompressed_points_size + , seconds_in_db, (double)seconds_in_db/points_in_db + , (latest_time_ut - oldest_time_ut) / USEC_PER_SEC, (double)metrics_duration/metrics_number + , (double)metrics_duration/metrics_number * 100.0 / ((latest_time_ut - oldest_time_ut) / USEC_PER_SEC) + ); + + for(int i = 0; i < 256 ;i++) { + if(pages_count_per_type[i]) + info("DBENGINE STATISTICS ON PAGE TYPES: page type %d total pages %lu, average page size %0.2f bytes", i, pages_count_per_type[i], (double)pages_size_per_type[i]/pages_count_per_type[i]); + } } diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h index 0ba4639ce..b938b9e05 100644 --- a/database/engine/pagecache.h +++ b/database/engine/pagecache.h @@ -11,7 +11,8 @@ struct extent_info; struct rrdeng_page_descr; #define INVALID_TIME (0) -#define MAX_PAGE_CACHE_RETRY_WAIT (3) +#define MAX_PAGE_CACHE_FETCH_RETRIES (3) +#define PAGE_CACHE_FETCH_WAIT_TIMEOUT (3) /* Page flags */ #define RRD_PAGE_DIRTY (1LU << 0) @@ -62,6 +63,7 @@ struct rrdeng_page_descr { usec_t start_time; usec_t end_time; uint32_t page_length; + uint8_t type; }; #define PAGE_INFO_SCRATCH_SZ (8) @@ -193,6 +195,14 @@ extern unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx); extern unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx); extern unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx); +extern void rrdeng_page_descr_aral_go_singlethreaded(void); +extern void rrdeng_page_descr_aral_go_multithreaded(void); +extern void rrdeng_page_descr_use_malloc(void); +extern void rrdeng_page_descr_use_mmap(void); +extern bool rrdeng_page_descr_is_mmap(void); +extern struct rrdeng_page_descr *rrdeng_page_descr_mallocz(void); +extern void rrdeng_page_descr_freez(struct rrdeng_page_descr *descr); + static inline void pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_timep, uint32_t *page_lengthp) { diff --git a/database/engine/rrddiskprotocol.h b/database/engine/rrddiskprotocol.h index db47af531..cb57385a4 100644 --- a/database/engine/rrddiskprotocol.h +++ b/database/engine/rrddiskprotocol.h @@ -35,7 +35,8 @@ struct rrdeng_df_sb { * Page types */ #define PAGE_METRICS (0) -#define PAGE_LOGS (1) /* reserved */ +#define PAGE_TIER (1) +#define PAGE_TYPE_MAX 1 // Maximum page type (inclusive) /* * Data file page descriptor diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 9f43f4456..8b35051d8 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -9,20 +9,28 @@ rrdeng_stats_t rrdeng_reserved_file_descriptors = 0; rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0; rrdeng_stats_t global_flushing_pressure_page_deletions = 0; -static unsigned pages_per_extent = MAX_PAGES_PER_EXTENT; +unsigned rrdeng_pages_per_extent = MAX_PAGES_PER_EXTENT; #if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2) #error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2) #endif void *dbengine_page_alloc() { - void *page = netdata_mmap(NULL, RRDENG_BLOCK_SIZE, MAP_PRIVATE, enable_ksm); - if(!page) fatal("Cannot allocate dbengine page cache page, with mmap()"); + void *page = NULL; + if (unlikely(db_engine_use_malloc)) + page = mallocz(RRDENG_BLOCK_SIZE); + else { + page = netdata_mmap(NULL, RRDENG_BLOCK_SIZE, MAP_PRIVATE, enable_ksm); + if(!page) fatal("Cannot allocate dbengine page cache page, with mmap()"); + } return page; } void dbengine_page_free(void *page) { - munmap(page, RRDENG_BLOCK_SIZE); + if (unlikely(db_engine_use_malloc)) + freez(page); + else + munmap(page, RRDENG_BLOCK_SIZE); } static void sanity_check(void) @@ -227,6 +235,43 @@ void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, str freez(xt_io_descr); } +static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type) { + switch(type) { + case PAGE_METRICS: { + storage_number n = pack_storage_number(NAN, SN_FLAG_NONE); + storage_number *array = (storage_number *)page; + size_t slots = page_length / sizeof(n); + for(size_t i = 0; i < slots ; i++) + array[i] = n; + } + break; + + case PAGE_TIER: { + storage_number_tier1_t n = { + .min_value = NAN, + .max_value = NAN, + .sum_value = NAN, + .count = 1, + .anomaly_count = 0, + }; + storage_number_tier1_t *array = (storage_number_tier1_t *)page; + size_t slots = page_length / sizeof(n); + for(size_t i = 0; i < slots ; i++) + array[i] = n; + } + break; + + default: { + static bool logged = false; + if(!logged) { + error("DBENGINE: cannot fill page with nulls on unknown page type id %d", type); + logged = true; + } + memset(page, 0, page_length); + } + } +} + void read_extent_cb(uv_fs_t* req) { struct rrdengine_worker_config* wc = req->loop->data; @@ -351,8 +396,7 @@ after_crc_check: /* care, we don't hold the descriptor mutex */ if (have_read_error) { - /* Applications should make sure NULL values match 0 as does SN_EMPTY_SLOT */ - memset(page, SN_EMPTY_SLOT, descr->page_length); + fill_page_with_nulls(page, descr->page_length, descr->type); } else if (RRD_NO_COMPRESSION == header->compression_algorithm) { (void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length); } else { @@ -697,7 +741,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), descr = unlikely(NULL == PValue) ? NULL : *PValue ; - descr != NULL && count != pages_per_extent ; + descr != NULL && count != rrdeng_pages_per_extent; PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), descr = unlikely(NULL == PValue) ? NULL : *PValue) { @@ -773,7 +817,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct xt_io_descr->descr_commit_idx_array[i] = descr_commit_idx_array[i]; descr = xt_io_descr->descr_array[i]; - header->descr[i].type = PAGE_METRICS; + header->descr[i].type = descr->type; uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id); header->descr[i].page_length = descr->page_length; header->descr[i].start_time = descr->start_time; @@ -879,6 +923,7 @@ static void after_delete_old_data(struct rrdengine_worker_config* wc) wc->cleanup_thread_deleting_files = 0; aclk_data_rotated(); + rrdcontext_db_rotation(); /* interrupt event loop */ uv_stop(wc->loop); @@ -1066,16 +1111,12 @@ struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc) static void load_configuration_dynamic(void) { - unsigned read_num; - static int printed_error = 0; - - read_num = (unsigned) config_get_number(CONFIG_SECTION_GLOBAL, "dbengine extent pages", - MAX_PAGES_PER_EXTENT); - if (read_num > 0 && read_num <= MAX_PAGES_PER_EXTENT) { - pages_per_extent = read_num; - } else if (!printed_error) { - printed_error = 1; - error("Invalid dbengine extent pages %u given. Defaulting to %u.", read_num, pages_per_extent); + unsigned read_num = (unsigned)config_get_number(CONFIG_SECTION_DB, "dbengine pages per extent", MAX_PAGES_PER_EXTENT); + if (read_num > 0 && read_num <= MAX_PAGES_PER_EXTENT) + rrdeng_pages_per_extent = read_num; + else { + error("Invalid dbengine pages per extent %u given. Using %u.", read_num, rrdeng_pages_per_extent); + config_set_number(CONFIG_SECTION_DB, "dbengine pages per extent", rrdeng_pages_per_extent); } } @@ -1335,7 +1376,7 @@ void rrdengine_main(void) struct rrdengine_instance *ctx; sanity_check(); - ret = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB); + ret = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB, 0); if (ret) { exit(ret); } diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h index c6f89a37a..4b383b622 100644 --- a/database/engine/rrdengine.h +++ b/database/engine/rrdengine.h @@ -26,6 +26,8 @@ #endif /* NETDATA_RRD_INTERNALS */ +extern unsigned rrdeng_pages_per_extent; + /* Forward declarations */ struct rrdengine_instance; @@ -35,7 +37,8 @@ struct rrdengine_instance; #define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u" struct rrdeng_collect_handle { - struct rrdeng_page_descr *descr, *prev_descr; + struct rrdeng_metric_handle *metric_handle; + struct rrdeng_page_descr *descr; unsigned long page_correlation_id; struct rrdengine_instance *ctx; // set to 1 when this dimension is not page aligned with the other dimensions in the chart @@ -43,6 +46,7 @@ struct rrdeng_collect_handle { }; struct rrdeng_query_handle { + struct rrdeng_metric_handle *metric_handle; struct rrdeng_page_descr *descr; struct rrdengine_instance *ctx; struct pg_cache_page_index *page_index; @@ -50,6 +54,7 @@ struct rrdeng_query_handle { time_t now; unsigned position; unsigned entries; + TIER_QUERY_FETCH tier_query_fetch_type; storage_number *page; usec_t page_end_time; uint32_t page_length; @@ -239,12 +244,14 @@ struct rrdengine_instance { char machine_guid[GUID_LEN + 1]; /* the unique ID of the corresponding host, or localhost for multihost DB */ uint64_t disk_space; uint64_t max_disk_space; + int tier; unsigned last_fileno; /* newest index of datafile and journalfile */ unsigned long max_cache_pages; unsigned long cache_pages_low_watermark; unsigned long metric_API_max_producers; uint8_t quiesce; /* set to SET_QUIESCE before shutdown of the engine */ + uint8_t page_type; /* Default page type for this context */ struct rrdengine_statistics stats; }; diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index 76010a7c2..f4da29407 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -2,17 +2,43 @@ #include "rrdengine.h" /* Default global database instance */ -struct rrdengine_instance multidb_ctx; +struct rrdengine_instance multidb_ctx_storage_tier0; +struct rrdengine_instance multidb_ctx_storage_tier1; +struct rrdengine_instance multidb_ctx_storage_tier2; +struct rrdengine_instance multidb_ctx_storage_tier3; +struct rrdengine_instance multidb_ctx_storage_tier4; +#if RRD_STORAGE_TIERS != 5 +#error RRD_STORAGE_TIERS is not 5 - you need to add allocations here +#endif +struct rrdengine_instance *multidb_ctx[RRD_STORAGE_TIERS]; +uint8_t tier_page_type[RRD_STORAGE_TIERS] = {PAGE_METRICS, PAGE_TIER, PAGE_TIER, PAGE_TIER, PAGE_TIER}; + +#if PAGE_TYPE_MAX != 1 +#error PAGE_TYPE_MAX is not 1 - you need to add allocations here +#endif +size_t page_type_size[256] = {sizeof(storage_number), sizeof(storage_number_tier1_t)}; + +__attribute__((constructor)) void initialize_multidb_ctx(void) { + multidb_ctx[0] = &multidb_ctx_storage_tier0; + multidb_ctx[1] = &multidb_ctx_storage_tier1; + multidb_ctx[2] = &multidb_ctx_storage_tier2; + multidb_ctx[3] = &multidb_ctx_storage_tier3; + multidb_ctx[4] = &multidb_ctx_storage_tier4; +} +int db_engine_use_malloc = 0; +int default_rrdeng_page_fetch_timeout = 3; +int default_rrdeng_page_fetch_retries = 3; int default_rrdeng_page_cache_mb = 32; int default_rrdeng_disk_quota_mb = 256; int default_multidb_disk_quota_mb = 256; /* Default behaviour is to unblock data collection if the page cache is full of dirty pages by dropping metrics */ uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1; -static inline struct rrdengine_instance *get_rrdeng_ctx_from_host(RRDHOST *host) -{ - return host->rrdeng_ctx; +static inline struct rrdengine_instance *get_rrdeng_ctx_from_host(RRDHOST *host, int tier) { + if(tier < 0 || tier >= RRD_STORAGE_TIERS) tier = 0; + if(!host->storage_instance[tier]) tier = 0; + return (struct rrdengine_instance *)host->storage_instance[tier]; } /* This UUID is not unique across hosts */ @@ -49,10 +75,20 @@ void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uu memcpy(ret_uuid, hash_value, sizeof(uuid_t)); } -void rrdeng_metric_init(RRDDIM *rd) -{ - struct page_cache *pg_cache; +struct rrdeng_metric_handle { + RRDDIM *rd; struct rrdengine_instance *ctx; + uuid_t *rrdeng_uuid; // database engine metric UUID + struct pg_cache_page_index *page_index; +}; + +void rrdeng_metric_free(STORAGE_METRIC_HANDLE *db_metric_handle) { + freez(db_metric_handle); +} + +STORAGE_METRIC_HANDLE *rrdeng_metric_init(RRDDIM *rd, STORAGE_INSTANCE *db_instance) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + struct page_cache *pg_cache; uuid_t legacy_uuid; uuid_t multihost_legacy_uuid; Pvoid_t *PValue; @@ -60,15 +96,10 @@ void rrdeng_metric_init(RRDDIM *rd) int is_multihost_child = 0; RRDHOST *host = rd->rrdset->rrdhost; - ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); - if (unlikely(!ctx)) { - error("Failed to fetch multidb context"); - return; - } pg_cache = &ctx->pg_cache; rrdeng_generate_legacy_uuid(rd->id, rd->rrdset->id, &legacy_uuid); - if (host != localhost && host->rrdeng_ctx == &multidb_ctx) + if (host != localhost && is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) is_multihost_child = 1; uv_rwlock_rdlock(&pg_cache->metrics_index.lock); @@ -82,16 +113,16 @@ void rrdeng_metric_init(RRDDIM *rd) * Drop legacy support, normal path */ uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &rd->state->metric_uuid, sizeof(uuid_t)); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &rd->metric_uuid, sizeof(uuid_t)); if (likely(NULL != PValue)) { page_index = *PValue; } uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); if (NULL == PValue) { uv_rwlock_wrlock(&pg_cache->metrics_index.lock); - PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &rd->state->metric_uuid, sizeof(uuid_t), PJE0); + PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &rd->metric_uuid, sizeof(uuid_t), PJE0); fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ - *PValue = page_index = create_page_index(&rd->state->metric_uuid); + *PValue = page_index = create_page_index(&rd->metric_uuid); page_index->prev = pg_cache->metrics_index.last_page_index; pg_cache->metrics_index.last_page_index = page_index; uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); @@ -102,84 +133,98 @@ void rrdeng_metric_init(RRDDIM *rd) rrdeng_convert_legacy_uuid_to_multihost(rd->rrdset->rrdhost->machine_guid, &legacy_uuid, &multihost_legacy_uuid); - int need_to_store = uuid_compare(rd->state->metric_uuid, multihost_legacy_uuid); + int need_to_store = uuid_compare(rd->metric_uuid, multihost_legacy_uuid); - uuid_copy(rd->state->metric_uuid, multihost_legacy_uuid); + uuid_copy(rd->metric_uuid, multihost_legacy_uuid); - if (unlikely(need_to_store)) - (void)sql_store_dimension(&rd->state->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor, + if (unlikely(need_to_store && !ctx->tier)) + (void)sql_store_dimension(&rd->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor, rd->algorithm); - } - rd->state->rrdeng_uuid = &page_index->id; - rd->state->page_index = page_index; + + struct rrdeng_metric_handle *mh = mallocz(sizeof(struct rrdeng_metric_handle)); + mh->rd = rd; + mh->ctx = ctx; + mh->rrdeng_uuid = &page_index->id; + mh->page_index = page_index; + return (STORAGE_METRIC_HANDLE *)mh; } /* * Gets a handle for storing metrics to the database. * The handle must be released with rrdeng_store_metric_final(). */ -void rrdeng_store_metric_init(RRDDIM *rd) -{ +STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle) { + struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle; + struct rrdeng_collect_handle *handle; - struct rrdengine_instance *ctx; struct pg_cache_page_index *page_index; - ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); - handle = callocz(1, sizeof(struct rrdeng_collect_handle)); - handle->ctx = ctx; + handle->metric_handle = metric_handle; + handle->ctx = metric_handle->ctx; handle->descr = NULL; - handle->prev_descr = NULL; handle->unaligned_page = 0; - rd->state->handle = (STORAGE_COLLECT_HANDLE *)handle; - page_index = rd->state->page_index; + page_index = metric_handle->page_index; uv_rwlock_wrlock(&page_index->lock); ++page_index->writers; uv_rwlock_wrunlock(&page_index->lock); + + return (STORAGE_COLLECT_HANDLE *)handle; } /* The page must be populated and referenced */ static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr) { - unsigned i; - uint8_t has_only_empty_metrics = 1; - storage_number *page; + switch(descr->type) { + case PAGE_METRICS: { + size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr); + storage_number *array = (storage_number *)descr->pg_cache_descr->page; + for (size_t i = 0 ; i < slots; ++i) { + if(does_storage_number_exist(array[i])) + return 0; + } + } + break; + + case PAGE_TIER: { + size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr); + storage_number_tier1_t *array = (storage_number_tier1_t *)descr->pg_cache_descr->page; + for (size_t i = 0 ; i < slots; ++i) { + if(fpclassify(array[i].sum_value) != FP_NAN) + return 0; + } + } + break; - page = descr->pg_cache_descr->page; - for (i = 0 ; i < descr->page_length / sizeof(storage_number); ++i) { - if (SN_EMPTY_SLOT != page[i]) { - has_only_empty_metrics = 0; - break; + default: { + static bool logged = false; + if(!logged) { + error("DBENGINE: cannot check page for nulls on unknown page type id %d", descr->type); + logged = true; + } + return 0; } } - return has_only_empty_metrics; + + return 1; } -void rrdeng_store_metric_flush_current_page(RRDDIM *rd) -{ - struct rrdeng_collect_handle *handle; - struct rrdengine_instance *ctx; - struct rrdeng_page_descr *descr; +void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) { + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + // struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle; + struct rrdengine_instance *ctx = handle->ctx; + struct rrdeng_page_descr *descr = handle->descr; + + if (unlikely(!ctx)) return; + if (unlikely(!descr)) return; - handle = (struct rrdeng_collect_handle *)rd->state->handle; - ctx = handle->ctx; - if (unlikely(!ctx)) - return; - descr = handle->descr; - if (unlikely(NULL == descr)) { - return; - } if (likely(descr->page_length)) { int page_is_empty; rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); - if (handle->prev_descr) { - /* unpin old second page */ - pg_cache_put(ctx, handle->prev_descr); - } page_is_empty = page_has_only_empty_metrics(descr); if (page_is_empty) { debug(D_RRDENGINE, "Page has empty metrics only, deleting:"); @@ -187,41 +232,34 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd) print_page_cache_descr(descr); pg_cache_put(ctx, descr); pg_cache_punch_hole(ctx, descr, 1, 0, NULL); - handle->prev_descr = NULL; - } else { - /* - * Disable pinning for now as it leads to deadlocks. When a collector stops collecting the extra pinned page - * eventually gets rotated but it cannot be destroyed due to the extra reference. - */ - /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */ -/* rrdeng_page_descr_mutex_lock(ctx, descr); - ret = pg_cache_try_get_unsafe(descr, 0); - rrdeng_page_descr_mutex_unlock(ctx, descr); - fatal_assert(1 == ret);*/ - + } else rrdeng_commit_page(ctx, descr, handle->page_correlation_id); - /* handle->prev_descr = descr;*/ - } } else { dbengine_page_free(descr->pg_cache_descr->page); rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr); - freez(descr); + rrdeng_page_descr_freez(descr); } handle->descr = NULL; } -void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number) +void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, + usec_t point_in_time, + NETDATA_DOUBLE n, + NETDATA_DOUBLE min_value, + NETDATA_DOUBLE max_value, + uint16_t count, + uint16_t anomaly_count, + SN_FLAGS flags) { - struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)rd->state->handle; - struct rrdengine_instance *ctx; - struct page_cache *pg_cache; - struct rrdeng_page_descr *descr; - storage_number *page; - uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0; + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle; + struct rrdengine_instance *ctx = handle->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = handle->descr; + RRDDIM *rd = metric_handle->rd; - ctx = handle->ctx; - pg_cache = &ctx->pg_cache; - descr = handle->descr; + void *page; + uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0; if (descr) { /* Make alignment decisions */ @@ -231,7 +269,7 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n perfect_page_alignment = 1; } /* is the metric far enough out of alignment with the others? */ - if (unlikely(descr->page_length + sizeof(number) < rd->rrdset->rrddim_page_alignment)) { + if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < rd->rrdset->rrddim_page_alignment)) { handle->unaligned_page = 1; debug(D_RRDENGINE, "Metric page is not aligned with chart:"); if (unlikely(debug_flags & D_RRDENGINE)) @@ -239,18 +277,18 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n } if (unlikely(handle->unaligned_page && /* did the other metrics change page? */ - rd->rrdset->rrddim_page_alignment <= sizeof(number))) { + rd->rrdset->rrddim_page_alignment <= PAGE_POINT_SIZE_BYTES(descr))) { debug(D_RRDENGINE, "Flushing unaligned metric page."); must_flush_unaligned_page = 1; handle->unaligned_page = 0; } } if (unlikely(NULL == descr || - descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE || + descr->page_length + PAGE_POINT_SIZE_BYTES(descr) > RRDENG_BLOCK_SIZE || must_flush_unaligned_page)) { - rrdeng_store_metric_flush_current_page(rd); + rrdeng_store_metric_flush_current_page(collection_handle); - page = rrdeng_create_page(ctx, &rd->state->page_index->id, &descr); + page = rrdeng_create_page(ctx, &metric_handle->page_index->id, &descr); fatal_assert(page); handle->descr = descr; @@ -262,9 +300,37 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n perfect_page_alignment = 1; } } + page = descr->pg_cache_descr->page; - page[descr->page_length / sizeof(number)] = number; - pg_cache_atomic_set_pg_info(descr, point_in_time, descr->page_length + sizeof(number)); + + switch (descr->type) { + case PAGE_METRICS: { + ((storage_number *)page)[descr->page_length / PAGE_POINT_SIZE_BYTES(descr)] = pack_storage_number(n, flags); + } + break; + + case PAGE_TIER: { + storage_number_tier1_t number_tier1; + number_tier1.sum_value = (float)n; + number_tier1.min_value = (float)min_value; + number_tier1.max_value = (float)max_value; + number_tier1.anomaly_count = anomaly_count; + number_tier1.count = count; + ((storage_number_tier1_t *)page)[descr->page_length / PAGE_POINT_SIZE_BYTES(descr)] = number_tier1; + } + break; + + default: { + static bool logged = false; + if(!logged) { + error("DBENGINE: cannot store metric on unknown page type id %d", descr->type); + logged = true; + } + } + break; + } + + pg_cache_atomic_set_pg_info(descr, point_in_time, descr->page_length + PAGE_POINT_SIZE_BYTES(descr)); if (perfect_page_alignment) rd->rrdset->rrddim_page_alignment = descr->page_length; @@ -284,9 +350,9 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n } } - pg_cache_insert(ctx, rd->state->page_index, descr); + pg_cache_insert(ctx, metric_handle->page_index, descr); } else { - pg_cache_add_new_metric_time(rd->state->page_index, descr); + pg_cache_add_new_metric_time(metric_handle->page_index, descr); } } @@ -294,21 +360,14 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n * Releases the database reference from the handle for storing metrics. * Returns 1 if it's safe to delete the dimension. */ -int rrdeng_store_metric_finalize(RRDDIM *rd) -{ - struct rrdeng_collect_handle *handle; - struct rrdengine_instance *ctx; - struct pg_cache_page_index *page_index; +int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle; + struct pg_cache_page_index *page_index = metric_handle->page_index; + uint8_t can_delete_metric = 0; - handle = (struct rrdeng_collect_handle *)rd->state->handle; - ctx = handle->ctx; - page_index = rd->state->page_index; - rrdeng_store_metric_flush_current_page(rd); - if (handle->prev_descr) { - /* unpin old second page */ - pg_cache_put(ctx, handle->prev_descr); - } + rrdeng_store_metric_flush_current_page(collection_handle); uv_rwlock_wrlock(&page_index->lock); if (!--page_index->writers && !page_index->page_count) { can_delete_metric = 1; @@ -316,241 +375,55 @@ int rrdeng_store_metric_finalize(RRDDIM *rd) uv_rwlock_wrunlock(&page_index->lock); freez(handle); - return can_delete_metric; -} - -/* Returns 1 if the data collection interval is well defined, 0 otherwise */ -static int metrics_with_known_interval(struct rrdeng_page_descr *descr) -{ - unsigned page_entries; - - if (unlikely(INVALID_TIME == descr->start_time || INVALID_TIME == descr->end_time)) - return 0; - page_entries = descr->page_length / sizeof(storage_number); - if (likely(page_entries > 1)) { - return 1; - } - return 0; -} - -static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info) -{ - return (uint32_t *)&page_info->scratch[0]; -} - -static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info) -{ - return (uint32_t *)&page_info->scratch[sizeof(uint32_t)]; -} - -/** - * Calculates the regions of different data collection intervals in a netdata chart in the time range - * [start_time,end_time]. This call takes the netdata chart read lock. - * @param st the netdata chart whose data collection interval boundaries are calculated. - * @param start_time inclusive starting time in usec - * @param end_time inclusive ending time in usec - * @param region_info_arrayp It allocates (*region_info_arrayp) and populates it with information of regions of a - * reference dimension that that have different data collection intervals and overlap with the time range - * [start_time,end_time]. The caller must free (*region_info_arrayp) with freez(). If region_info_arrayp is set - * to NULL nothing was allocated. - * @param max_intervalp is dereferenced and set to be the largest data collection interval of all regions. - * @return number of regions with different data collection intervals. - */ -unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time, - struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp, struct context_param *context_param_list) -{ - struct pg_cache_page_index *page_index; - struct rrdengine_instance *ctx; - unsigned pages_nr; - RRDDIM *rd_iter, *rd; - struct rrdeng_page_info *page_info_array, *curr, *prev, *old_prev; - unsigned i, j, page_entries, region_points, page_points, regions, max_interval; - time_t now; - usec_t dt, current_position_time, max_time = 0, min_time, curr_time, first_valid_time_in_page; - struct rrdeng_region_info *region_info_array; - uint8_t is_first_region_initialized; - - ctx = get_rrdeng_ctx_from_host(st->rrdhost); - regions = 1; - *max_intervalp = max_interval = 0; - region_info_array = NULL; - *region_info_arrayp = NULL; - page_info_array = NULL; - - RRDDIM *temp_rd = context_param_list ? context_param_list->rd : NULL; - rrdset_rdlock(st); - for(rd_iter = temp_rd?temp_rd:st->dimensions, rd = NULL, min_time = (usec_t)-1 ; rd_iter ; rd_iter = rd_iter->next) { - /* - * Choose oldest dimension as reference. This is not equivalent to the union of all dimensions - * but it is a best effort approximation with a bias towards older metrics in a chart. It - * matches netdata behaviour in the sense that dimensions are generally aligned in a chart - * and older dimensions contain more information about the time range. It does not work well - * for metrics that have recently stopped being collected. - */ - curr_time = pg_cache_oldest_time_in_range(ctx, rd_iter->state->rrdeng_uuid, - start_time * USEC_PER_SEC, end_time * USEC_PER_SEC); - if (INVALID_TIME != curr_time && curr_time < min_time) { - rd = rd_iter; - min_time = curr_time; - } - } - rrdset_unlock(st); - if (NULL == rd) { - return 1; - } - pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, - &page_info_array, &page_index); - if (pages_nr) { - /* conservative allocation, will reduce the size later if necessary */ - region_info_array = mallocz(sizeof(*region_info_array) * pages_nr); - } - is_first_region_initialized = 0; - region_points = 0; - - int is_out_of_order_reported = 0; - /* pages loop */ - for (i = 0, curr = NULL, prev = NULL ; i < pages_nr ; ++i) { - old_prev = prev; - prev = curr; - curr = &page_info_array[i]; - *pginfo_to_points(curr) = 0; /* initialize to invalid page */ - *pginfo_to_dt(curr) = 0; /* no known data collection interval yet */ - if (unlikely(INVALID_TIME == curr->start_time || INVALID_TIME == curr->end_time || - curr->end_time < curr->start_time)) { - info("Ignoring page with invalid timestamps."); - prev = old_prev; - continue; - } - page_entries = curr->page_length / sizeof(storage_number); - fatal_assert(0 != page_entries); - if (likely(1 != page_entries)) { - dt = (curr->end_time - curr->start_time) / (page_entries - 1); - *pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(dt); - if (unlikely(0 == *pginfo_to_dt(curr))) - *pginfo_to_dt(curr) = 1; - } else { - dt = 0; - } - for (j = 0, page_points = 0 ; j < page_entries ; ++j) { - uint8_t is_metric_out_of_order, is_metric_earlier_than_range; - - is_metric_earlier_than_range = 0; - is_metric_out_of_order = 0; - - current_position_time = curr->start_time + j * dt; - now = current_position_time / USEC_PER_SEC; - if (now > end_time) { /* there will be no more pages in the time range */ - break; - } - if (now < start_time) - is_metric_earlier_than_range = 1; - if (unlikely(current_position_time < max_time)) /* just went back in time */ - is_metric_out_of_order = 1; - if (is_metric_earlier_than_range || unlikely(is_metric_out_of_order)) { - if (unlikely(is_metric_out_of_order)) - is_out_of_order_reported++; - continue; /* next entry */ - } - /* here is a valid metric */ - ++page_points; - region_info_array[regions - 1].points = ++region_points; - max_time = current_position_time; - if (1 == page_points) - first_valid_time_in_page = current_position_time; - if (unlikely(!is_first_region_initialized)) { - fatal_assert(1 == regions); - /* this is the first region */ - region_info_array[0].start_time = current_position_time; - is_first_region_initialized = 1; - } - } - *pginfo_to_points(curr) = page_points; - if (0 == page_points) { - prev = old_prev; - continue; - } - - if (unlikely(0 == *pginfo_to_dt(curr))) { /* unknown data collection interval */ - fatal_assert(1 == page_points); - - if (likely(NULL != prev)) { /* get interval from previous page */ - *pginfo_to_dt(curr) = *pginfo_to_dt(prev); - } else { /* there is no previous page in the query */ - struct rrdeng_page_info db_page_info; - - /* go to database */ - pg_cache_get_filtered_info_prev(ctx, page_index, curr->start_time, - metrics_with_known_interval, &db_page_info); - if (unlikely(db_page_info.start_time == INVALID_TIME || db_page_info.end_time == INVALID_TIME || - 0 == db_page_info.page_length)) { /* nothing in the database, default to update_every */ - *pginfo_to_dt(curr) = rd->update_every; - } else { - unsigned db_entries; - usec_t db_dt; - - db_entries = db_page_info.page_length / sizeof(storage_number); - db_dt = (db_page_info.end_time - db_page_info.start_time) / (db_entries - 1); - *pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(db_dt); - if (unlikely(0 == *pginfo_to_dt(curr))) - *pginfo_to_dt(curr) = 1; - - } - } - } - if (likely(prev) && unlikely(*pginfo_to_dt(curr) != *pginfo_to_dt(prev))) { - info("Data collection interval change detected in query: %"PRIu32" -> %"PRIu32, - *pginfo_to_dt(prev), *pginfo_to_dt(curr)); - region_info_array[regions++ - 1].points -= page_points; - region_info_array[regions - 1].points = region_points = page_points; - region_info_array[regions - 1].start_time = first_valid_time_in_page; - } - if (*pginfo_to_dt(curr) > max_interval) - max_interval = *pginfo_to_dt(curr); - region_info_array[regions - 1].update_every = *pginfo_to_dt(curr); - } - if (page_info_array) - freez(page_info_array); - if (region_info_array) { - if (likely(is_first_region_initialized)) { - /* free unnecessary memory */ - region_info_array = reallocz(region_info_array, sizeof(*region_info_array) * regions); - *region_info_arrayp = region_info_array; - *max_intervalp = max_interval; - } else { - /* empty result */ - freez(region_info_array); - } - } - if (is_out_of_order_reported) - info("Ignored %d metrics with out of order timestamp in %u regions.", is_out_of_order_reported, regions); - return regions; + return can_delete_metric; } +//static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info) +//{ +// return (uint32_t *)&page_info->scratch[0]; +//} +// +//static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info) +//{ +// return (uint32_t *)&page_info->scratch[sizeof(uint32_t)]; +//} +// /* * Gets a handle for loading metrics from the database. * The handle must be released with rrdeng_load_metric_final(). */ -void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time) +void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time, TIER_QUERY_FETCH tier_query_fetch_type) { + struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle; + struct rrdengine_instance *ctx = metric_handle->ctx; + RRDDIM *rd = metric_handle->rd; + + // fprintf(stderr, "%s: %s/%s start time %ld, end time %ld\n", __FUNCTION__ , rd->rrdset->name, rd->name, start_time, end_time); + struct rrdeng_query_handle *handle; - struct rrdengine_instance *ctx; unsigned pages_nr; - ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); rrdimm_handle->start_time = start_time; rrdimm_handle->end_time = end_time; handle = callocz(1, sizeof(struct rrdeng_query_handle)); handle->next_page_time = start_time; handle->now = start_time; + handle->tier_query_fetch_type = tier_query_fetch_type; + // TODO we should store the dt of each page in each page + // this will produce wrong values for dt in case the user changes + // the update every of the charts or the tier grouping iterations + handle->dt_sec = get_tier_grouping(ctx->tier) * (time_t)rd->update_every; + handle->dt = handle->dt_sec * USEC_PER_SEC; handle->position = 0; handle->ctx = ctx; + handle->metric_handle = metric_handle; handle->descr = NULL; rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle; - pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, + pages_nr = pg_cache_preload(ctx, metric_handle->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, NULL, &handle->page_index); if (unlikely(NULL == handle->page_index || 0 == pages_nr)) - /* there are no metrics to load */ + // there are no metrics to load handle->next_page_time = INVALID_TIME; } @@ -595,7 +468,7 @@ static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) { if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) { // we're in the middle of the page somewhere - unsigned entries = page_length / sizeof(storage_number); + unsigned entries = page_length / PAGE_POINT_SIZE_BYTES(descr); position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) / (page_end_time - descr->start_time); } @@ -605,53 +478,101 @@ static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) { handle->page_end_time = page_end_time; handle->page_length = page_length; handle->page = descr->pg_cache_descr->page; - usec_t entries = handle->entries = page_length / sizeof(storage_number); + usec_t entries = handle->entries = page_length / PAGE_POINT_SIZE_BYTES(descr); if (likely(entries > 1)) handle->dt = (page_end_time - descr->start_time) / (entries - 1); - else - handle->dt = 0; + else { + // TODO we should store the dt of each page in each page + // now we keep the dt of whatever was before + ; + } - handle->dt_sec = handle->dt / USEC_PER_SEC; + handle->dt_sec = (time_t)(handle->dt / USEC_PER_SEC); handle->position = position; return 0; } -/* Returns the metric and sets its timestamp into current_time */ -storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time) { +// Returns the metric and sets its timestamp into current_time +// IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags) +// IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES +STORAGE_POINT rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle) { struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; + // struct rrdeng_metric_handle *metric_handle = handle->metric_handle; - if (unlikely(INVALID_TIME == handle->next_page_time)) - return SN_EMPTY_SLOT; - + STORAGE_POINT sp; struct rrdeng_page_descr *descr = handle->descr; unsigned position = handle->position + 1; time_t now = handle->now + handle->dt_sec; + storage_number_tier1_t tier1_value; + + if (unlikely(INVALID_TIME == handle->next_page_time)) { + handle->next_page_time = INVALID_TIME; + handle->now = now; + storage_point_empty(sp, now - handle->dt_sec, now); + return sp; + } if (unlikely(!descr || position >= handle->entries)) { // We need to get a new page if(rrdeng_load_page_next(rrdimm_handle)) { // next calls will not load any more metrics handle->next_page_time = INVALID_TIME; - return SN_EMPTY_SLOT; + handle->now = now; + storage_point_empty(sp, now - handle->dt_sec, now); + return sp; } descr = handle->descr; position = handle->position; - now = (descr->start_time + position * handle->dt) / USEC_PER_SEC; + now = (time_t)((descr->start_time + position * handle->dt) / USEC_PER_SEC); } - storage_number ret = handle->page[position]; + sp.start_time = now - handle->dt_sec; + sp.end_time = now; + handle->position = position; handle->now = now; + switch(descr->type) { + case PAGE_METRICS: { + storage_number n = handle->page[position]; + sp.min = sp.max = sp.sum = unpack_storage_number(n); + sp.flags = n & SN_USER_FLAGS; + sp.count = 1; + sp.anomaly_count = is_storage_number_anomalous(n) ? 1 : 0; + } + break; + + case PAGE_TIER: { + tier1_value = ((storage_number_tier1_t *)handle->page)[position]; + sp.flags = tier1_value.anomaly_count ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS; + sp.count = tier1_value.count; + sp.anomaly_count = tier1_value.anomaly_count; + sp.min = tier1_value.min_value; + sp.max = tier1_value.max_value; + sp.sum = tier1_value.sum_value; + } + break; + + // we don't know this page type + default: { + static bool logged = false; + if(!logged) { + error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", descr->type); + logged = true; + } + storage_point_empty(sp, sp.start_time, sp.end_time); + } + break; + } + if (unlikely(now >= rrdimm_handle->end_time)) { // next calls will not load any more metrics handle->next_page_time = INVALID_TIME; } - *current_time = now; - return ret; + return sp; } int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) @@ -681,31 +602,27 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) rrdimm_handle->handle = NULL; } -time_t rrdeng_metric_latest_time(RRDDIM *rd) -{ - struct pg_cache_page_index *page_index; - - page_index = rd->state->page_index; +time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) { + struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle; + struct pg_cache_page_index *page_index = metric_handle->page_index; return page_index->latest_time / USEC_PER_SEC; } -time_t rrdeng_metric_oldest_time(RRDDIM *rd) -{ - struct pg_cache_page_index *page_index; - - page_index = rd->state->page_index; +time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) { + struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle; + struct pg_cache_page_index *page_index = metric_handle->page_index; return page_index->oldest_time / USEC_PER_SEC; } -int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t) +int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t, int tier) { struct page_cache *pg_cache; struct rrdengine_instance *ctx; Pvoid_t *PValue; struct pg_cache_page_index *page_index = NULL; - ctx = get_rrdeng_ctx_from_host(localhost); + ctx = get_rrdeng_ctx_from_host(localhost, tier); if (unlikely(!ctx)) { error("Failed to fetch multidb context"); return 1; @@ -728,6 +645,36 @@ int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, t return 1; } +int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t) +{ + struct page_cache *pg_cache; + struct rrdengine_instance *ctx; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + + ctx = (struct rrdengine_instance *)si; + if (unlikely(!ctx)) { + error("DBENGINE: invalid STORAGE INSTANCE to %s()", __FUNCTION__); + return 1; + } + pg_cache = &ctx->pg_cache; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, dim_uuid, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + + if (likely(page_index)) { + *first_entry_t = page_index->oldest_time / USEC_PER_SEC; + *last_entry_t = page_index->latest_time / USEC_PER_SEC; + return 0; + } + + return 1; +} + /* Also gets a reference for the page */ void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr) { @@ -738,6 +685,7 @@ void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrde descr = pg_cache_create_descr(); descr->id = id; /* TODO: add page type: metric, log, something? */ + descr->type = ctx->page_type; page = dbengine_page_alloc(); /*TODO: add page size */ rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; @@ -901,8 +849,7 @@ void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle) * Returns 0 on success, negative on error */ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, - unsigned disk_space_mb) -{ + unsigned disk_space_mb, int tier) { struct rrdengine_instance *ctx; int error; uint32_t max_open_files; @@ -914,19 +861,23 @@ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_p if (rrdeng_reserved_file_descriptors > max_open_files) { error( "Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.", - (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files); + (unsigned)rrdeng_reserved_file_descriptors, + (unsigned)max_open_files); rrd_stat_atomic_add(&global_fs_errors, 1); rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); return UV_EMFILE; } - if (NULL == ctxp) { - ctx = &multidb_ctx; + if(NULL == ctxp) { + ctx = multidb_ctx[tier]; memset(ctx, 0, sizeof(*ctx)); - } else { + } + else { *ctxp = ctx = callocz(1, sizeof(*ctx)); } + ctx->tier = tier; + ctx->page_type = tier_page_type[tier]; ctx->global_compress_alg = RRD_LZ4; if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB) page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB; @@ -979,9 +930,10 @@ error_after_rrdeng_worker: finalize_rrd_files(ctx); error_after_init_rrd_files: free_page_cache(ctx); - if (ctx != &multidb_ctx) { + if (!is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) { freez(ctx); - *ctxp = NULL; + if (ctxp) + *ctxp = NULL; } rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); return UV_EIO; @@ -1008,9 +960,9 @@ int rrdeng_exit(struct rrdengine_instance *ctx) //metalog_exit(ctx->metalog_ctx); free_page_cache(ctx); - if (ctx != &multidb_ctx) { + if(!is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) freez(ctx); - } + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); return 0; } @@ -1034,3 +986,107 @@ void rrdeng_prepare_exit(struct rrdengine_instance *ctx) //metalog_prepare_exit(ctx->metalog_ctx); } +RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) { + RRDENG_SIZE_STATS stats = { 0 }; + + for(struct pg_cache_page_index *page_index = ctx->pg_cache.metrics_index.last_page_index; + page_index != NULL ;page_index = page_index->prev) { + stats.metrics++; + stats.metrics_pages += page_index->page_count; + } + + for(struct rrdengine_datafile *df = ctx->datafiles.first; df ;df = df->next) { + stats.datafiles++; + + for(struct extent_info *ei = df->extents.first; ei ; ei = ei->next) { + stats.extents++; + stats.extents_compressed_bytes += ei->size; + + for(int p = 0; p < ei->number_of_pages ;p++) { + struct rrdeng_page_descr *descr = ei->pages[p]; + + usec_t update_every_usec; + + size_t points = descr->page_length / PAGE_POINT_SIZE_BYTES(descr); + + if(likely(points > 1)) + update_every_usec = (descr->end_time - descr->start_time) / (points - 1); + else { + update_every_usec = default_rrd_update_every * get_tier_grouping(ctx->tier) * USEC_PER_SEC; + stats.single_point_pages++; + } + + time_t duration_secs = (time_t)((descr->end_time - descr->start_time + update_every_usec)/USEC_PER_SEC); + + stats.extents_pages++; + stats.pages_uncompressed_bytes += descr->page_length; + stats.pages_duration_secs += duration_secs; + stats.points += points; + + stats.page_types[descr->type].pages++; + stats.page_types[descr->type].pages_uncompressed_bytes += descr->page_length; + stats.page_types[descr->type].pages_duration_secs += duration_secs; + stats.page_types[descr->type].points += points; + + if(!stats.first_t || (descr->start_time - update_every_usec) < stats.first_t) + stats.first_t = (descr->start_time - update_every_usec) / USEC_PER_SEC; + + if(!stats.last_t || descr->end_time > stats.last_t) + stats.last_t = descr->end_time / USEC_PER_SEC; + } + } + } + + + stats.currently_collected_metrics = ctx->stats.metric_API_producers; + stats.max_concurrently_collected_metrics = ctx->metric_API_max_producers; + + internal_error(stats.metrics_pages != stats.extents_pages + stats.currently_collected_metrics, + "DBENGINE: metrics pages is %zu, but extents pages is %zu and API consumers is %zu", + stats.metrics_pages, stats.extents_pages, stats.currently_collected_metrics); + + stats.disk_space = ctx->disk_space; + stats.max_disk_space = ctx->max_disk_space; + + stats.database_retention_secs = (time_t)(stats.last_t - stats.first_t); + + if(stats.extents_pages) + stats.average_page_size_bytes = (double)stats.pages_uncompressed_bytes / (double)stats.extents_pages; + + if(stats.pages_uncompressed_bytes > 0) + stats.average_compression_savings = 100.0 - ((double)stats.extents_compressed_bytes * 100.0 / (double)stats.pages_uncompressed_bytes); + + if(stats.points) + stats.average_point_duration_secs = (double)stats.pages_duration_secs / (double)stats.points; + + if(stats.metrics) { + stats.average_metric_retention_secs = (double)stats.pages_duration_secs / (double)stats.metrics; + + if(stats.database_retention_secs) { + double metric_coverage = stats.average_metric_retention_secs / (double)stats.database_retention_secs; + double db_retention_days = (double)stats.database_retention_secs / 86400.0; + + stats.estimated_concurrently_collected_metrics = stats.metrics * metric_coverage; + + stats.ephemeral_metrics_per_day_percent = ((double)stats.metrics * 100.0 / (double)stats.estimated_concurrently_collected_metrics - 100.0) / (double)db_retention_days; + } + } + + stats.sizeof_metric = struct_natural_alignment(sizeof(struct pg_cache_page_index)); + stats.sizeof_page = struct_natural_alignment(sizeof(struct rrdeng_page_descr)); + stats.sizeof_datafile = struct_natural_alignment(sizeof(struct rrdengine_datafile)) + struct_natural_alignment(sizeof(struct rrdengine_journalfile)); + stats.sizeof_page_in_cache = struct_natural_alignment(sizeof(struct page_cache_descr)); + stats.sizeof_point_data = page_type_size[ctx->page_type]; + stats.sizeof_page_data = RRDENG_BLOCK_SIZE; + stats.pages_per_extent = rrdeng_pages_per_extent; + + stats.sizeof_extent = sizeof(struct extent_info); + stats.sizeof_page_in_extent = sizeof(struct rrdeng_page_descr *); + + stats.sizeof_metric_in_index = 40; + stats.sizeof_page_in_index = 24; + + stats.default_granularity_secs = (size_t)default_rrd_update_every * get_tier_grouping(ctx->tier); + + return stats; +} diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h index d263259b6..509aa48ca 100644 --- a/database/engine/rrdengineapi.h +++ b/database/engine/rrdengineapi.h @@ -12,11 +12,17 @@ #define RRDENG_FD_BUDGET_PER_INSTANCE (50) +extern int db_engine_use_malloc; +extern int default_rrdeng_page_fetch_timeout; +extern int default_rrdeng_page_fetch_retries; extern int default_rrdeng_page_cache_mb; extern int default_rrdeng_disk_quota_mb; extern int default_multidb_disk_quota_mb; extern uint8_t rrdeng_drop_metrics_under_page_cache_pressure; -extern struct rrdengine_instance multidb_ctx; +extern struct rrdengine_instance *multidb_ctx[RRD_STORAGE_TIERS]; +extern size_t page_type_size[]; + +#define PAGE_POINT_SIZE_BYTES(x) page_type_size[(x)->type] struct rrdeng_region_info { time_t start_time; @@ -36,29 +42,98 @@ extern void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + uuid_t *ret_uuid); -extern void rrdeng_metric_init(RRDDIM *rd); -extern void rrdeng_store_metric_init(RRDDIM *rd); -extern void rrdeng_store_metric_flush_current_page(RRDDIM *rd); -extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number); -extern int rrdeng_store_metric_finalize(RRDDIM *rd); -extern unsigned - rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time, +extern STORAGE_METRIC_HANDLE *rrdeng_metric_init(RRDDIM *rd, STORAGE_INSTANCE *db_instance); +extern void rrdeng_metric_free(STORAGE_METRIC_HANDLE *db_metric_handle); + +extern STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle); +extern void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle); +extern void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE n, + NETDATA_DOUBLE min_value, + NETDATA_DOUBLE max_value, + uint16_t count, + uint16_t anomaly_count, + SN_FLAGS flags); +extern int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle); + +extern unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time, struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp, struct context_param *context_param_list); -extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, - time_t start_time, time_t end_time); -extern storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time); + +extern void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct rrddim_query_handle *rrdimm_handle, + time_t start_time, time_t end_time, TIER_QUERY_FETCH tier_query_fetch_type); +extern STORAGE_POINT rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle); + extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle); extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle); -extern time_t rrdeng_metric_latest_time(RRDDIM *rd); -extern time_t rrdeng_metric_oldest_time(RRDDIM *rd); +extern time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle); +extern time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle); + extern void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array); /* must call once before using anything */ extern int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, - unsigned disk_space_mb); + unsigned disk_space_mb, int tier); extern int rrdeng_exit(struct rrdengine_instance *ctx); extern void rrdeng_prepare_exit(struct rrdengine_instance *ctx); -extern int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t); +extern int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t, int tier); +extern int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t); + +typedef struct rrdengine_size_statistics { + size_t default_granularity_secs; + + size_t sizeof_metric; + size_t sizeof_metric_in_index; + size_t sizeof_page; + size_t sizeof_page_in_index; + size_t sizeof_extent; + size_t sizeof_page_in_extent; + size_t sizeof_datafile; + size_t sizeof_page_in_cache; + size_t sizeof_point_data; + size_t sizeof_page_data; + + size_t pages_per_extent; + + size_t datafiles; + size_t extents; + size_t extents_pages; + size_t points; + size_t metrics; + size_t metrics_pages; + + size_t extents_compressed_bytes; + size_t pages_uncompressed_bytes; + time_t pages_duration_secs; + + struct { + size_t pages; + size_t pages_uncompressed_bytes; + time_t pages_duration_secs; + size_t points; + } page_types[256]; + + size_t single_point_pages; + + usec_t first_t; + usec_t last_t; + + size_t currently_collected_metrics; + size_t max_concurrently_collected_metrics; + size_t estimated_concurrently_collected_metrics; + + size_t disk_space; + size_t max_disk_space; + + time_t database_retention_secs; + double average_compression_savings; + double average_point_duration_secs; + double average_metric_retention_secs; + + double ephemeral_metrics_per_day_percent; + + double average_page_size_bytes; +} RRDENG_SIZE_STATS; + +extern RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx); #endif /* NETDATA_RRDENGINEAPI_H */ -- cgit v1.2.3