author     Daniel Baumann <daniel.baumann@progress-linux.org>  2023-02-06 16:11:34 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2023-02-06 16:11:34 +0000
commit     d079b656b4719739b2247dcd9d46e9bec793095a (patch)
tree       d2c950c70a776bcf697c963151c5bd959f8a9f03 /database/engine/rrdengineapi.c
parent     Releasing debian version 1.37.1-2. (diff)
Merging upstream version 1.38.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/engine/rrdengineapi.c')
-rwxr-xr-x  database/engine/rrdengineapi.c  1683
1 file changed, 909 insertions, 774 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 4525b041..27497bbb 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -1,6 +1,5 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
-#include "../storage_engine.h"
/* Default global database instance */
struct rrdengine_instance multidb_ctx_storage_tier0;
@@ -8,12 +7,21 @@ struct rrdengine_instance multidb_ctx_storage_tier1;
struct rrdengine_instance multidb_ctx_storage_tier2;
struct rrdengine_instance multidb_ctx_storage_tier3;
struct rrdengine_instance multidb_ctx_storage_tier4;
+
+#define mrg_metric_ctx(metric) (struct rrdengine_instance *)mrg_metric_section(main_mrg, metric)
+
#if RRD_STORAGE_TIERS != 5
#error RRD_STORAGE_TIERS is not 5 - you need to add allocations here
#endif
struct rrdengine_instance *multidb_ctx[RRD_STORAGE_TIERS];
uint8_t tier_page_type[RRD_STORAGE_TIERS] = {PAGE_METRICS, PAGE_TIER, PAGE_TIER, PAGE_TIER, PAGE_TIER};
+#if defined(ENV32BIT)
+size_t tier_page_size[RRD_STORAGE_TIERS] = {2048, 1024, 192, 192, 192};
+#else
+size_t tier_page_size[RRD_STORAGE_TIERS] = {4096, 2048, 384, 384, 384};
+#endif
+
#if PAGE_TYPE_MAX != 1
#error PAGE_TYPE_MAX is not 1 - you need to add allocations here
#endif
@@ -27,14 +35,17 @@ __attribute__((constructor)) void initialize_multidb_ctx(void) {
multidb_ctx[4] = &multidb_ctx_storage_tier4;
}
-int db_engine_use_malloc = 0;
int default_rrdeng_page_fetch_timeout = 3;
int default_rrdeng_page_fetch_retries = 3;
-int default_rrdeng_page_cache_mb = 32;
+int db_engine_journal_check = 0;
int default_rrdeng_disk_quota_mb = 256;
int default_multidb_disk_quota_mb = 256;
-/* Default behaviour is to unblock data collection if the page cache is full of dirty pages by dropping metrics */
-uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1;
+
+#if defined(ENV32BIT)
+int default_rrdeng_page_cache_mb = 16;
+#else
+int default_rrdeng_page_cache_mb = 32;
+#endif
// ----------------------------------------------------------------------------
// metrics groups
@@ -90,161 +101,207 @@ void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_
memcpy(ret_uuid, hash_value, sizeof(uuid_t));
}
-/* Transform legacy UUID to be unique across hosts deterministically */
-void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid, uuid_t *ret_uuid)
-{
- EVP_MD_CTX *evpctx;
- unsigned char hash_value[EVP_MAX_MD_SIZE];
- unsigned int hash_len;
-
- evpctx = EVP_MD_CTX_create();
- EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL);
- EVP_DigestUpdate(evpctx, machine_guid, GUID_LEN);
- EVP_DigestUpdate(evpctx, *legacy_uuid, sizeof(uuid_t));
- EVP_DigestFinal_ex(evpctx, hash_value, &hash_len);
- EVP_MD_CTX_destroy(evpctx);
- fatal_assert(hash_len > sizeof(uuid_t));
- memcpy(ret_uuid, hash_value, sizeof(uuid_t));
-}
-
-STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id) {
+static METRIC *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
uuid_t legacy_uuid;
rrdeng_generate_legacy_uuid(rd_id, st_id, &legacy_uuid);
- return rrdeng_metric_get(db_instance, &legacy_uuid);
+ return mrg_metric_get_and_acquire(main_mrg, &legacy_uuid, (Word_t) ctx);
}
// ----------------------------------------------------------------------------
// metric handle
void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle) {
- struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
-
- __atomic_sub_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
+ METRIC *metric = (METRIC *)db_metric_handle;
+ mrg_metric_release(main_mrg, metric);
}
STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle) {
- struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
- __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
- return db_metric_handle;
+ METRIC *metric = (METRIC *)db_metric_handle;
+ return (STORAGE_METRIC_HANDLE *) mrg_metric_dup(main_mrg, metric);
}
STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid) {
struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct pg_cache_page_index *page_index = NULL;
-
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, uuid, sizeof(uuid_t));
- if (likely(NULL != PValue))
- page_index = *PValue;
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
-
- if (likely(page_index))
- __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
-
- return (STORAGE_METRIC_HANDLE *)page_index;
+ return (STORAGE_METRIC_HANDLE *) mrg_metric_get_and_acquire(main_mrg, uuid, (Word_t) ctx);
}
-STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid) {
+static METRIC *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid) {
internal_fatal(!db_instance, "DBENGINE: db_instance is NULL");
struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
- struct pg_cache_page_index *page_index;
- struct page_cache *pg_cache = &ctx->pg_cache;
-
- uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
- Pvoid_t *PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, uuid, sizeof(uuid_t), PJE0);
- fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
- *PValue = page_index = create_page_index(uuid, ctx);
- page_index->prev = pg_cache->metrics_index.last_page_index;
- pg_cache->metrics_index.last_page_index = page_index;
- page_index->refcount = 1;
- uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
-
- return (STORAGE_METRIC_HANDLE *)page_index;
+ MRG_ENTRY entry = {
+ .section = (Word_t)ctx,
+ .first_time_s = 0,
+ .last_time_s = 0,
+ .latest_update_every_s = 0,
+ };
+ uuid_copy(entry.uuid, *uuid);
+
+ METRIC *metric = mrg_metric_add_and_acquire(main_mrg, entry, NULL);
+ return metric;
}
STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance) {
- STORAGE_METRIC_HANDLE *db_metric_handle;
-
- db_metric_handle = rrdeng_metric_get(db_instance, &rd->metric_uuid);
- if(!db_metric_handle) {
- db_metric_handle = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset));
- if(db_metric_handle) {
- struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
- uuid_copy(rd->metric_uuid, page_index->id);
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ METRIC *metric;
+
+ metric = mrg_metric_get_and_acquire(main_mrg, &rd->metric_uuid, (Word_t) ctx);
+
+ if(unlikely(!metric)) {
+ if(unlikely(ctx->config.legacy)) {
+ // this is a single host database
+ // generate uuid from the chart and dimensions ids
+ // and overwrite the one supplied by rrddim
+ metric = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset));
+ if (metric)
+ uuid_copy(rd->metric_uuid, *mrg_metric_uuid(main_mrg, metric));
}
+
+ if(likely(!metric))
+ metric = rrdeng_metric_create(db_instance, &rd->metric_uuid);
}
- if(!db_metric_handle)
- db_metric_handle = rrdeng_metric_create(db_instance, &rd->metric_uuid);
#ifdef NETDATA_INTERNAL_CHECKS
- struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
- if(uuid_compare(rd->metric_uuid, page_index->id) != 0) {
+ if(uuid_compare(rd->metric_uuid, *mrg_metric_uuid(main_mrg, metric)) != 0) {
char uuid1[UUID_STR_LEN + 1];
char uuid2[UUID_STR_LEN + 1];
uuid_unparse(rd->metric_uuid, uuid1);
- uuid_unparse(page_index->id, uuid2);
- fatal("DBENGINE: uuids do not match, asked for metric '%s', but got page_index of metric '%s'", uuid1, uuid2);
+ uuid_unparse(*mrg_metric_uuid(main_mrg, metric), uuid2);
+ fatal("DBENGINE: uuids do not match, asked for metric '%s', but got metric '%s'", uuid1, uuid2);
}
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
- if(page_index->ctx != ctx)
- fatal("DBENGINE: mixed up rrdengine instances, asked for metric from %p, got from %p", ctx, page_index->ctx);
+ if(mrg_metric_ctx(metric) != ctx)
+ fatal("DBENGINE: mixed up db instances, asked for metric from %p, got from %p",
+ ctx, mrg_metric_ctx(metric));
#endif
- return db_metric_handle;
+ return (STORAGE_METRIC_HANDLE *)metric;
}
// ----------------------------------------------------------------------------
// collect ops
+static inline void check_and_fix_mrg_update_every(struct rrdeng_collect_handle *handle) {
+ if(unlikely((time_t)(handle->update_every_ut / USEC_PER_SEC) != mrg_metric_get_update_every_s(main_mrg, handle->metric))) {
+ internal_error(true, "DBENGINE: collection handle has update every %ld, but the metric registry has %ld. Fixing it.",
+ (time_t)(handle->update_every_ut / USEC_PER_SEC), mrg_metric_get_update_every_s(main_mrg, handle->metric));
+
+ if(unlikely(!handle->update_every_ut))
+ handle->update_every_ut = (usec_t)mrg_metric_get_update_every_s(main_mrg, handle->metric) * USEC_PER_SEC;
+ else
+ mrg_metric_set_update_every(main_mrg, handle->metric, (time_t)(handle->update_every_ut / USEC_PER_SEC));
+ }
+}
+
+static inline bool check_completed_page_consistency(struct rrdeng_collect_handle *handle __maybe_unused) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ if (unlikely(!handle->page || !handle->page_entries_max || !handle->page_position || !handle->page_end_time_ut))
+ return false;
+
+ struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
+
+ uuid_t *uuid = mrg_metric_uuid(main_mrg, handle->metric);
+ time_t start_time_s = pgc_page_start_time_s(handle->page);
+ time_t end_time_s = pgc_page_end_time_s(handle->page);
+ time_t update_every_s = pgc_page_update_every_s(handle->page);
+ size_t page_length = handle->page_position * CTX_POINT_SIZE_BYTES(ctx);
+ size_t entries = handle->page_position;
+ time_t overwrite_zero_update_every_s = (time_t)(handle->update_every_ut / USEC_PER_SEC);
+
+ if(end_time_s > max_acceptable_collected_time())
+ handle->page_flags |= RRDENG_PAGE_COMPLETED_IN_FUTURE;
+
+ VALIDATED_PAGE_DESCRIPTOR vd = validate_page(
+ uuid,
+ start_time_s,
+ end_time_s,
+ update_every_s,
+ page_length,
+ ctx->config.page_type,
+ entries,
+ 0, // do not check for future timestamps - we inherit the timestamps of the children
+ overwrite_zero_update_every_s,
+ false,
+ "collected",
+ handle->page_flags);
+
+ return vd.is_valid;
+#else
+ return true;
+#endif
+}
+
/*
* Gets a handle for storing metrics to the database.
* The handle must be released with rrdeng_store_metric_final().
*/
STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg) {
- struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
+ METRIC *metric = (METRIC *)db_metric_handle;
+ struct rrdengine_instance *ctx = mrg_metric_ctx(metric);
+
+ bool is_1st_metric_writer = true;
+ if(!mrg_metric_set_writer(main_mrg, metric)) {
+ is_1st_metric_writer = false;
+ char uuid[UUID_STR_LEN + 1];
+ uuid_unparse(*mrg_metric_uuid(main_mrg, metric), uuid);
+ error("DBENGINE: metric '%s' is already collected and should not be collected twice - expect gaps on the charts", uuid);
+ }
+
+ metric = mrg_metric_dup(main_mrg, metric);
+
struct rrdeng_collect_handle *handle;
handle = callocz(1, sizeof(struct rrdeng_collect_handle));
- handle->page_index = page_index;
- handle->descr = NULL;
- handle->unaligned_page = 0;
- page_index->latest_update_every_s = update_every;
+ handle->metric = metric;
+ handle->page = NULL;
+ handle->page_position = 0;
+ handle->page_entries_max = 0;
+ handle->update_every_ut = (usec_t)update_every * USEC_PER_SEC;
+ handle->options = is_1st_metric_writer ? RRDENG_1ST_METRIC_WRITER : 0;
+
+ __atomic_add_fetch(&ctx->atomic.collectors_running, 1, __ATOMIC_RELAXED);
+ if(!is_1st_metric_writer)
+ __atomic_add_fetch(&ctx->atomic.collectors_running_duplicate, 1, __ATOMIC_RELAXED);
+
+ mrg_metric_set_update_every(main_mrg, metric, update_every);
handle->alignment = (struct pg_alignment *)smg;
rrdeng_page_alignment_acquire(handle->alignment);
- uv_rwlock_wrlock(&page_index->lock);
- ++page_index->writers;
- uv_rwlock_wrunlock(&page_index->lock);
+ // this is important!
+ // if we don't set the page_end_time_ut during the first collection
+ // data collection may be able to go back in time and during the addition of new pages
+ // clean pages may be found matching ours!
+
+ time_t db_first_time_s, db_last_time_s, db_update_every_s;
+ mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s);
+ handle->page_end_time_ut = (usec_t)db_last_time_s * USEC_PER_SEC;
return (STORAGE_COLLECT_HANDLE *)handle;
}
/* The page must be populated and referenced */
-static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr)
-{
- switch(descr->type) {
+static bool page_has_only_empty_metrics(struct rrdeng_collect_handle *handle) {
+ switch(handle->type) {
case PAGE_METRICS: {
- size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr);
- storage_number *array = (storage_number *)descr->pg_cache_descr->page;
+ size_t slots = handle->page_position;
+ storage_number *array = (storage_number *)pgc_page_data(handle->page);
for (size_t i = 0 ; i < slots; ++i) {
if(does_storage_number_exist(array[i]))
- return 0;
+ return false;
}
}
break;
case PAGE_TIER: {
- size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr);
- storage_number_tier1_t *array = (storage_number_tier1_t *)descr->pg_cache_descr->page;
+ size_t slots = handle->page_position;
+ storage_number_tier1_t *array = (storage_number_tier1_t *)pgc_page_data(handle->page);
for (size_t i = 0 ; i < slots; ++i) {
if(fpclassify(array[i].sum_value) != FP_NAN)
- return 0;
+ return false;
}
}
break;
@@ -252,422 +309,585 @@ static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr)
default: {
static bool logged = false;
if(!logged) {
- error("DBENGINE: cannot check page for nulls on unknown page type id %d", descr->type);
+ error("DBENGINE: cannot check page for nulls on unknown page type id %d", (mrg_metric_ctx(handle->metric))->config.page_type);
logged = true;
}
- return 0;
+ return false;
}
}
- return 1;
+ return true;
}
void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) {
struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
- // struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle;
- struct rrdengine_instance *ctx = handle->page_index->ctx;
- struct rrdeng_page_descr *descr = handle->descr;
-
- if (unlikely(!ctx)) return;
- if (unlikely(!descr)) return;
-
- if (likely(descr->page_length)) {
- int page_is_empty;
-
- rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
-
- page_is_empty = page_has_only_empty_metrics(descr);
- if (page_is_empty) {
- print_page_cache_descr(descr, "Page has empty metrics only, deleting", true);
- pg_cache_put(ctx, descr);
- pg_cache_punch_hole(ctx, descr, 1, 0, NULL);
- } else
- rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
- } else {
- dbengine_page_free(descr->pg_cache_descr->page);
- rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
- rrdeng_page_descr_freez(descr);
+
+ if (unlikely(!handle->page))
+ return;
+
+ if(!handle->page_position || page_has_only_empty_metrics(handle))
+ pgc_page_to_clean_evict_or_release(main_cache, handle->page);
+
+ else {
+ check_completed_page_consistency(handle);
+ mrg_metric_set_clean_latest_time_s(main_mrg, handle->metric, pgc_page_end_time_s(handle->page));
+ pgc_page_hot_to_dirty_and_release(main_cache, handle->page);
}
- handle->descr = NULL;
-}
-static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection_handle,
- usec_t point_in_time_ut,
- NETDATA_DOUBLE n,
- NETDATA_DOUBLE min_value,
- NETDATA_DOUBLE max_value,
- uint16_t count,
- uint16_t anomaly_count,
- SN_FLAGS flags)
-{
- struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
- struct pg_cache_page_index *page_index = handle->page_index;
- struct rrdengine_instance *ctx = handle->page_index->ctx;
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_descr *descr = handle->descr;
+ mrg_metric_set_hot_latest_time_s(main_mrg, handle->metric, 0);
- void *page;
- uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0;
+ handle->page = NULL;
+ handle->page_flags = 0;
+ handle->page_position = 0;
+ handle->page_entries_max = 0;
- if (descr) {
- /* Make alignment decisions */
+ // important!
+ // we should never zero page end time ut, because this will allow
+ // collection to go back in time
+ // handle->page_end_time_ut = 0;
+ // handle->page_start_time_ut;
+
+ check_and_fix_mrg_update_every(handle);
+}
+
+static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *handle,
+ struct rrdengine_instance *ctx,
+ usec_t point_in_time_ut,
+ void *data,
+ size_t data_size) {
+ time_t point_in_time_s = (time_t)(point_in_time_ut / USEC_PER_SEC);
+ const time_t update_every_s = (time_t)(handle->update_every_ut / USEC_PER_SEC);
+
+ PGC_ENTRY page_entry = {
+ .section = (Word_t) ctx,
+ .metric_id = mrg_metric_id(main_mrg, handle->metric),
+ .start_time_s = point_in_time_s,
+ .end_time_s = point_in_time_s,
+ .size = data_size,
+ .data = data,
+ .update_every_s = update_every_s,
+ .hot = true
+ };
+
+ size_t conflicts = 0;
+ bool added = true;
+ PGC_PAGE *page = pgc_page_add_and_acquire(main_cache, page_entry, &added);
+ while (unlikely(!added)) {
+ conflicts++;
+
+ char uuid[UUID_STR_LEN + 1];
+ uuid_unparse(*mrg_metric_uuid(main_mrg, handle->metric), uuid);
#ifdef NETDATA_INTERNAL_CHECKS
- if(descr->end_time_ut + page_index->latest_update_every_s * USEC_PER_SEC != point_in_time_ut) {
- char buffer[200 + 1];
- snprintfz(buffer, 200,
- "metrics collected are %s, end_time_ut = %llu, point_in_time_ut = %llu, update_every = %u, delta = %llu",
- (point_in_time_ut / USEC_PER_SEC - descr->end_time_ut / USEC_PER_SEC > page_index->latest_update_every_s)?"far apart":"not aligned",
- descr->end_time_ut / USEC_PER_SEC,
- point_in_time_ut / USEC_PER_SEC,
- page_index->latest_update_every_s,
- point_in_time_ut / USEC_PER_SEC - descr->end_time_ut / USEC_PER_SEC);
- print_page_cache_descr(descr, buffer, false);
- }
+ internal_error(true,
+#else
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl,
#endif
+ "DBENGINE: metric '%s' new page from %ld to %ld, update every %ld, has a conflict in main cache "
+ "with existing %s%s page from %ld to %ld, update every %ld - "
+ "is it collected more than once?",
+ uuid,
+ page_entry.start_time_s, page_entry.end_time_s, (time_t)page_entry.update_every_s,
+ pgc_is_page_hot(page) ? "hot" : "not-hot",
+ pgc_page_data(page) == DBENGINE_EMPTY_PAGE ? " gap" : "",
+ pgc_page_start_time_s(page), pgc_page_end_time_s(page), pgc_page_update_every_s(page)
+ );
+
+ pgc_page_release(main_cache, page);
+
+ point_in_time_ut -= handle->update_every_ut;
+ point_in_time_s = (time_t)(point_in_time_ut / USEC_PER_SEC);
+ page_entry.start_time_s = point_in_time_s;
+ page_entry.end_time_s = point_in_time_s;
+ page = pgc_page_add_and_acquire(main_cache, page_entry, &added);
+ }
- if (descr->page_length == handle->alignment->page_length) {
- /* this is the leading dimension that defines chart alignment */
- perfect_page_alignment = 1;
- }
- /* is the metric far enough out of alignment with the others? */
- if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < handle->alignment->page_length)) {
- handle->unaligned_page = 1;
- print_page_cache_descr(descr, "Metric page is not aligned with chart", true);
- }
- if (unlikely(handle->unaligned_page &&
- /* did the other metrics change page? */
- handle->alignment->page_length <= PAGE_POINT_SIZE_BYTES(descr))) {
- print_page_cache_descr(descr, "must_flush_unaligned_page = 1", true);
- must_flush_unaligned_page = 1;
- handle->unaligned_page = 0;
- }
+ handle->page_entries_max = data_size / CTX_POINT_SIZE_BYTES(ctx);
+ handle->page_start_time_ut = point_in_time_ut;
+ handle->page_end_time_ut = point_in_time_ut;
+ handle->page_position = 1; // zero is already in our data
+ handle->page = page;
+ handle->page_flags = conflicts? RRDENG_PAGE_CONFLICT : 0;
+
+ if(point_in_time_s > max_acceptable_collected_time())
+ handle->page_flags |= RRDENG_PAGE_CREATED_IN_FUTURE;
+
+ check_and_fix_mrg_update_every(handle);
+}
+
+static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle, size_t *data_size, usec_t point_in_time_ut) {
+ struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
+ size_t size;
+
+ if(handle->options & RRDENG_FIRST_PAGE_ALLOCATED) {
+ // any page except the first
+ size = tier_page_size[ctx->config.tier];
}
- if (unlikely(NULL == descr ||
- descr->page_length + PAGE_POINT_SIZE_BYTES(descr) > RRDENG_BLOCK_SIZE ||
- must_flush_unaligned_page)) {
+ else {
+ size_t final_slots = 0;
- if(descr) {
- print_page_cache_descr(descr, "flushing metric", true);
- rrdeng_store_metric_flush_current_page(collection_handle);
+ // the first page
+ handle->options |= RRDENG_FIRST_PAGE_ALLOCATED;
+ size_t max_size = tier_page_size[ctx->config.tier];
+ size_t max_slots = max_size / CTX_POINT_SIZE_BYTES(ctx);
+
+ if(handle->alignment->initial_slots) {
+ final_slots = handle->alignment->initial_slots;
}
+ else {
+ max_slots -= 3;
- page = rrdeng_create_page(ctx, &page_index->id, &descr);
- fatal_assert(page);
+ size_t smaller_slot = indexing_partition((Word_t)handle->alignment, max_slots);
+ final_slots = smaller_slot;
- descr->update_every_s = page_index->latest_update_every_s;
- handle->descr = descr;
+ time_t now_s = (time_t)(point_in_time_ut / USEC_PER_SEC);
+ size_t current_pos = (now_s % max_slots);
- handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1);
+ if(current_pos > final_slots)
+ final_slots += max_slots - current_pos;
- if (0 == handle->alignment->page_length) {
- /* this is the leading dimension that defines chart alignment */
- perfect_page_alignment = 1;
+ else if(current_pos < final_slots)
+ final_slots -= current_pos;
+
+ if(final_slots < 3) {
+ final_slots += 3;
+ smaller_slot += 3;
+
+ if(smaller_slot >= max_slots)
+ smaller_slot -= max_slots;
+ }
+
+ max_slots += 3;
+ handle->alignment->initial_slots = smaller_slot + 3;
+
+ internal_fatal(handle->alignment->initial_slots < 3 || handle->alignment->initial_slots >= max_slots, "ooops! wrong distribution of metrics across time");
+ internal_fatal(final_slots < 3 || final_slots >= max_slots, "ooops! wrong distribution of metrics across time");
}
+
+ size = final_slots * CTX_POINT_SIZE_BYTES(ctx);
}
- page = descr->pg_cache_descr->page;
+ *data_size = size;
+ return dbengine_page_alloc(size);
+}
+
+static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_handle,
+ const usec_t point_in_time_ut,
+ const NETDATA_DOUBLE n,
+ const NETDATA_DOUBLE min_value,
+ const NETDATA_DOUBLE max_value,
+ const uint16_t count,
+ const uint16_t anomaly_count,
+ const SN_FLAGS flags)
+{
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+ struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
- switch (descr->type) {
+ bool perfect_page_alignment = false;
+ void *data;
+ size_t data_size;
+
+ if(likely(handle->page)) {
+ /* Make alignment decisions */
+ if (handle->page_position == handle->alignment->page_position) {
+ /* this is the leading dimension that defines chart alignment */
+ perfect_page_alignment = true;
+ }
+
+ /* is the metric far enough out of alignment with the others? */
+ if (unlikely(handle->page_position + 1 < handle->alignment->page_position))
+ handle->options |= RRDENG_CHO_UNALIGNED;
+
+ if (unlikely((handle->options & RRDENG_CHO_UNALIGNED) &&
+ /* did the other metrics change page? */
+ handle->alignment->page_position <= 1)) {
+ handle->options &= ~RRDENG_CHO_UNALIGNED;
+ handle->page_flags |= RRDENG_PAGE_UNALIGNED;
+ rrdeng_store_metric_flush_current_page(collection_handle);
+
+ data = rrdeng_alloc_new_metric_data(handle, &data_size, point_in_time_ut);
+ }
+ else {
+ data = pgc_page_data(handle->page);
+ data_size = pgc_page_data_size(main_cache, handle->page);
+ }
+ }
+ else
+ data = rrdeng_alloc_new_metric_data(handle, &data_size, point_in_time_ut);
+
+ switch (ctx->config.page_type) {
case PAGE_METRICS: {
- ((storage_number *)page)[descr->page_length / PAGE_POINT_SIZE_BYTES(descr)] = pack_storage_number(n, flags);
+ storage_number *tier0_metric_data = data;
+ tier0_metric_data[handle->page_position] = pack_storage_number(n, flags);
}
break;
case PAGE_TIER: {
+ storage_number_tier1_t *tier12_metric_data = data;
storage_number_tier1_t number_tier1;
number_tier1.sum_value = (float)n;
number_tier1.min_value = (float)min_value;
number_tier1.max_value = (float)max_value;
number_tier1.anomaly_count = anomaly_count;
number_tier1.count = count;
- ((storage_number_tier1_t *)page)[descr->page_length / PAGE_POINT_SIZE_BYTES(descr)] = number_tier1;
+ tier12_metric_data[handle->page_position] = number_tier1;
}
break;
default: {
static bool logged = false;
if(!logged) {
- error("DBENGINE: cannot store metric on unknown page type id %d", descr->type);
+ error("DBENGINE: cannot store metric on unknown page type id %d", ctx->config.page_type);
logged = true;
}
}
break;
}
- pg_cache_atomic_set_pg_info(descr, point_in_time_ut, descr->page_length + PAGE_POINT_SIZE_BYTES(descr));
+ if(unlikely(!handle->page)){
+ rrdeng_store_metric_create_new_page(handle, ctx, point_in_time_ut, data, data_size);
+ // handle->position is set to 1 already
- if (perfect_page_alignment)
- handle->alignment->page_length = descr->page_length;
- if (unlikely(INVALID_TIME == descr->start_time_ut)) {
- unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers;
- descr->start_time_ut = point_in_time_ut;
-
- new_metric_API_producers = rrd_atomic_add_fetch(&ctx->stats.metric_API_producers, 1);
- while (unlikely(new_metric_API_producers > (old_metric_API_max_producers = ctx->metric_API_max_producers))) {
- /* Increase ctx->metric_API_max_producers */
- ret_metric_API_max_producers = ulong_compare_and_swap(&ctx->metric_API_max_producers,
- old_metric_API_max_producers,
- new_metric_API_producers);
- if (old_metric_API_max_producers == ret_metric_API_max_producers) {
- /* success */
- break;
- }
+ if (0 == handle->alignment->page_position) {
+ /* this is the leading dimension that defines chart alignment */
+ perfect_page_alignment = true;
+ }
+ }
+ else {
+ // update an existing page
+ pgc_page_hot_set_end_time_s(main_cache, handle->page, (time_t) (point_in_time_ut / USEC_PER_SEC));
+ handle->page_end_time_ut = point_in_time_ut;
+
+ if(unlikely(++handle->page_position >= handle->page_entries_max)) {
+ internal_fatal(handle->page_position > handle->page_entries_max, "DBENGINE: exceeded page max number of points");
+ handle->page_flags |= RRDENG_PAGE_FULL;
+ rrdeng_store_metric_flush_current_page(collection_handle);
}
+ }
+
+ if (perfect_page_alignment)
+ handle->alignment->page_position = handle->page_position;
+
+ // update the metric information
+ mrg_metric_set_hot_latest_time_s(main_mrg, handle->metric, (time_t) (point_in_time_ut / USEC_PER_SEC));
+}
+
+static void store_metric_next_error_log(struct rrdeng_collect_handle *handle, usec_t point_in_time_ut, const char *msg) {
+ time_t point_in_time_s = (time_t)(point_in_time_ut / USEC_PER_SEC);
+ char uuid[UUID_STR_LEN + 1];
+ uuid_unparse(*mrg_metric_uuid(main_mrg, handle->metric), uuid);
- pg_cache_insert(ctx, page_index, descr);
- } else {
- pg_cache_add_new_metric_time(page_index, descr);
+ BUFFER *wb = NULL;
+ if(handle->page && handle->page_flags) {
+ wb = buffer_create(0, NULL);
+ collect_page_flags_to_buffer(wb, handle->page_flags);
}
-// {
-// unsigned char u[16] = { 0x0C, 0x0A, 0x40, 0xD6, 0x2A, 0x43, 0x4A, 0x7C, 0x95, 0xF7, 0xD1, 0x1E, 0x0C, 0x9E, 0x8A, 0xE7 };
-// if(uuid_compare(u, page_index->id) == 0) {
-// char buffer[100];
-// snprintfz(buffer, 100, "store system.cpu, collect:%u, page_index first:%u, last:%u",
-// (uint32_t)(point_in_time / USEC_PER_SEC),
-// (uint32_t)(page_index->oldest_time / USEC_PER_SEC),
-// (uint32_t)(page_index->latest_time / USEC_PER_SEC));
-//
-// print_page_cache_descr(descr, buffer, false);
-// }
-// }
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl,
+ "DBENGINE: metric '%s' collected point at %ld, %s last collection at %ld, "
+ "update every %ld, %s page from %ld to %ld, position %u (of %u), flags: %s",
+ uuid,
+ point_in_time_s,
+ msg,
+ (time_t)(handle->page_end_time_ut / USEC_PER_SEC),
+ (time_t)(handle->update_every_ut / USEC_PER_SEC),
+ handle->page ? "current" : "*LAST*",
+ (time_t)(handle->page_start_time_ut / USEC_PER_SEC),
+ (time_t)(handle->page_end_time_ut / USEC_PER_SEC),
+ handle->page_position, handle->page_entries_max,
+ wb ? buffer_tostring(wb) : ""
+ );
+
+ buffer_free(wb);
}
void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
- usec_t point_in_time_ut,
- NETDATA_DOUBLE n,
- NETDATA_DOUBLE min_value,
- NETDATA_DOUBLE max_value,
- uint16_t count,
- uint16_t anomaly_count,
- SN_FLAGS flags)
+ const usec_t point_in_time_ut,
+ const NETDATA_DOUBLE n,
+ const NETDATA_DOUBLE min_value,
+ const NETDATA_DOUBLE max_value,
+ const uint16_t count,
+ const uint16_t anomaly_count,
+ const SN_FLAGS flags)
{
struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
- struct pg_cache_page_index *page_index = handle->page_index;
- struct rrdeng_page_descr *descr = handle->descr;
-
- if(likely(descr)) {
- usec_t last_point_in_time_ut = descr->end_time_ut;
- usec_t update_every_ut = page_index->latest_update_every_s * USEC_PER_SEC;
- size_t points_gap = (point_in_time_ut <= last_point_in_time_ut) ?
- (size_t)0 :
- (size_t)((point_in_time_ut - last_point_in_time_ut) / update_every_ut);
-
- if(unlikely(points_gap != 1)) {
- if (unlikely(points_gap <= 0)) {
- time_t now = now_realtime_sec();
- static __thread size_t counter = 0;
- static __thread time_t last_time_logged = 0;
- counter++;
-
- if(now - last_time_logged > 600) {
- error("DBENGINE: collected point is in the past (repeated %zu times in the last %zu secs). Ignoring these data collection points.",
- counter, (size_t)(last_time_logged?(now - last_time_logged):0));
-
- last_time_logged = now;
- counter = 0;
- }
- return;
- }
- size_t point_size = PAGE_POINT_SIZE_BYTES(descr);
- size_t page_size_in_points = RRDENG_BLOCK_SIZE / point_size;
- size_t used_points = descr->page_length / point_size;
- size_t remaining_points_in_page = page_size_in_points - used_points;
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(unlikely(point_in_time_ut > (usec_t)max_acceptable_collected_time() * USEC_PER_SEC))
+ handle->page_flags |= RRDENG_PAGE_FUTURE_POINT;
+#endif
+
+ if(likely(handle->page_end_time_ut + handle->update_every_ut == point_in_time_ut)) {
+ // happy path
+ ;
+ }
+ else if(unlikely(point_in_time_ut < handle->page_end_time_ut)) {
+ handle->page_flags |= RRDENG_PAGE_PAST_COLLECTION;
+ store_metric_next_error_log(handle, point_in_time_ut, "is older than the");
+ return;
+ }
- bool new_point_is_aligned = true;
- if(unlikely((point_in_time_ut - last_point_in_time_ut) / points_gap != update_every_ut))
- new_point_is_aligned = false;
+ else if(unlikely(point_in_time_ut == handle->page_end_time_ut)) {
+ handle->page_flags |= RRDENG_PAGE_REPEATED_COLLECTION;
+ store_metric_next_error_log(handle, point_in_time_ut, "is at the same time as the");
+ return;
+ }
- if(unlikely(points_gap > remaining_points_in_page || !new_point_is_aligned)) {
-// char buffer[200];
-// snprintfz(buffer, 200, "data collection skipped %zu points, last stored point %llu, new point %llu, update every %d. Cutting page.",
-// points_gap, last_point_in_time_ut / USEC_PER_SEC, point_in_time_ut / USEC_PER_SEC, page_index->latest_update_every_s);
-// print_page_cache_descr(descr, buffer, false);
+ else if(handle->page) {
+ usec_t delta_ut = point_in_time_ut - handle->page_end_time_ut;
+ if(unlikely(delta_ut < handle->update_every_ut)) {
+ handle->page_flags |= RRDENG_PAGE_STEP_TOO_SMALL;
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ }
+ else if(unlikely(delta_ut % handle->update_every_ut)) {
+ handle->page_flags |= RRDENG_PAGE_STEP_UNALIGNED;
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ }
+ else {
+ size_t points_gap = delta_ut / handle->update_every_ut;
+ size_t page_remaining_points = handle->page_entries_max - handle->page_position;
+
+ if(points_gap >= page_remaining_points) {
+ handle->page_flags |= RRDENG_PAGE_BIG_GAP;
rrdeng_store_metric_flush_current_page(collection_handle);
}
else {
-// char buffer[200];
-// snprintfz(buffer, 200, "data collection skipped %zu points, last stored point %llu, new point %llu, update every %d. Filling the gap.",
-// points_gap, last_point_in_time_ut / USEC_PER_SEC, point_in_time_ut / USEC_PER_SEC, page_index->latest_update_every_s);
-// print_page_cache_descr(descr, buffer, false);
-
// loop to fill the gap
- usec_t step_ut = page_index->latest_update_every_s * USEC_PER_SEC;
- usec_t last_point_filled_ut = last_point_in_time_ut + step_ut;
-
- while (last_point_filled_ut < point_in_time_ut) {
- rrdeng_store_metric_next_internal(
- collection_handle, last_point_filled_ut, NAN, NAN, NAN,
- 1, 0, SN_EMPTY_SLOT);
-
- last_point_filled_ut += step_ut;
+ handle->page_flags |= RRDENG_PAGE_GAP;
+
+ usec_t stop_ut = point_in_time_ut - handle->update_every_ut;
+ for(usec_t this_ut = handle->page_end_time_ut + handle->update_every_ut;
+ this_ut <= stop_ut ;
+ this_ut = handle->page_end_time_ut + handle->update_every_ut) {
+ rrdeng_store_metric_append_point(
+ collection_handle,
+ this_ut,
+ NAN, NAN, NAN,
+ 1, 0,
+ SN_EMPTY_SLOT);
}
}
}
}
- rrdeng_store_metric_next_internal(collection_handle, point_in_time_ut, n, min_value, max_value, count, anomaly_count, flags);
+ rrdeng_store_metric_append_point(collection_handle,
+ point_in_time_ut,
+ n, min_value, max_value,
+ count, anomaly_count,
+ flags);
}
-
/*
* Releases the database reference from the handle for storing metrics.
* Returns 1 if it's safe to delete the dimension.
*/
int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
- struct pg_cache_page_index *page_index = handle->page_index;
-
- uint8_t can_delete_metric = 0;
+ struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
+ handle->page_flags |= RRDENG_PAGE_COLLECT_FINALIZE;
rrdeng_store_metric_flush_current_page(collection_handle);
- uv_rwlock_wrlock(&page_index->lock);
+ rrdeng_page_alignment_release(handle->alignment);
- if (!--page_index->writers && !page_index->page_count)
- can_delete_metric = 1;
+ __atomic_sub_fetch(&ctx->atomic.collectors_running, 1, __ATOMIC_RELAXED);
+ if(!(handle->options & RRDENG_1ST_METRIC_WRITER))
+ __atomic_sub_fetch(&ctx->atomic.collectors_running_duplicate, 1, __ATOMIC_RELAXED);
- uv_rwlock_wrunlock(&page_index->lock);
+ if((handle->options & RRDENG_1ST_METRIC_WRITER) && !mrg_metric_clear_writer(main_mrg, handle->metric))
+ internal_fatal(true, "DBENGINE: metric is already released");
- rrdeng_page_alignment_release(handle->alignment);
+ time_t first_time_s, last_time_s, update_every_s;
+ mrg_metric_get_retention(main_mrg, handle->metric, &first_time_s, &last_time_s, &update_every_s);
+
+ mrg_metric_release(main_mrg, handle->metric);
freez(handle);
- return can_delete_metric;
+ if(!first_time_s && !last_time_s)
+ return 1;
+
+ return 0;
}
void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every) {
struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
- struct pg_cache_page_index *page_index = handle->page_index;
+ check_and_fix_mrg_update_every(handle);
+
+ METRIC *metric = handle->metric;
+ usec_t update_every_ut = (usec_t)update_every * USEC_PER_SEC;
+
+ if(update_every_ut == handle->update_every_ut)
+ return;
+
+ handle->page_flags |= RRDENG_PAGE_UPDATE_EVERY_CHANGE;
rrdeng_store_metric_flush_current_page(collection_handle);
- uv_rwlock_rdlock(&page_index->lock);
- page_index->latest_update_every_s = update_every;
- uv_rwlock_rdunlock(&page_index->lock);
+ mrg_metric_set_update_every(main_mrg, metric, update_every);
+ handle->update_every_ut = update_every_ut;
}
// ----------------------------------------------------------------------------
// query ops
-//static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info)
-//{
-// return (uint32_t *)&page_info->scratch[0];
-//}
-//
-//static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info)
-//{
-// return (uint32_t *)&page_info->scratch[sizeof(uint32_t)];
-//}
-//
+#ifdef NETDATA_INTERNAL_CHECKS
+SPINLOCK global_query_handle_spinlock = NETDATA_SPINLOCK_INITIALIZER;
+static struct rrdeng_query_handle *global_query_handle_ll = NULL;
+static void register_query_handle(struct rrdeng_query_handle *handle) {
+ handle->query_pid = gettid();
+ handle->started_time_s = now_realtime_sec();
+
+ netdata_spinlock_lock(&global_query_handle_spinlock);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(global_query_handle_ll, handle, prev, next);
+ netdata_spinlock_unlock(&global_query_handle_spinlock);
+}
+static void unregister_query_handle(struct rrdeng_query_handle *handle) {
+ netdata_spinlock_lock(&global_query_handle_spinlock);
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(global_query_handle_ll, handle, prev, next);
+ netdata_spinlock_unlock(&global_query_handle_spinlock);
+}
+#else
+static void register_query_handle(struct rrdeng_query_handle *handle __maybe_unused) {
+ ;
+}
+static void unregister_query_handle(struct rrdeng_query_handle *handle __maybe_unused) {
+ ;
+}
+#endif
+
/*
* Gets a handle for loading metrics from the database.
* The handle must be released with rrdeng_load_metric_final().
*/
-void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrdimm_handle, time_t start_time_s, time_t end_time_s)
+void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
+ struct storage_engine_query_handle *rrddim_handle,
+ time_t start_time_s,
+ time_t end_time_s,
+ STORAGE_PRIORITY priority)
{
- struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
- struct rrdengine_instance *ctx = page_index->ctx;
+ usec_t started_ut = now_monotonic_usec();
- // fprintf(stderr, "%s: %s/%s start time %ld, end time %ld\n", __FUNCTION__ , rd->rrdset->name, rd->name, start_time, end_time);
+ netdata_thread_disable_cancelability();
+ METRIC *metric = (METRIC *)db_metric_handle;
+ struct rrdengine_instance *ctx = mrg_metric_ctx(metric);
struct rrdeng_query_handle *handle;
- unsigned pages_nr;
- if(!page_index->latest_update_every_s)
- page_index->latest_update_every_s = default_rrd_update_every;
+ handle = rrdeng_query_handle_get();
+ register_query_handle(handle);
- rrdimm_handle->start_time_s = start_time_s;
- rrdimm_handle->end_time_s = end_time_s;
+ if(unlikely(priority < STORAGE_PRIORITY_HIGH))
+ priority = STORAGE_PRIORITY_HIGH;
+ else if(unlikely(priority > STORAGE_PRIORITY_BEST_EFFORT))
+ priority = STORAGE_PRIORITY_BEST_EFFORT;
- handle = callocz(1, sizeof(struct rrdeng_query_handle));
- handle->wanted_start_time_s = start_time_s;
- handle->now_s = start_time_s;
- handle->position = 0;
handle->ctx = ctx;
- handle->descr = NULL;
- handle->dt_s = page_index->latest_update_every_s;
- rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle;
- pages_nr = pg_cache_preload(ctx, &page_index->id, start_time_s * USEC_PER_SEC, end_time_s * USEC_PER_SEC,
- NULL, &handle->page_index);
- if (unlikely(NULL == handle->page_index || 0 == pages_nr))
- // there are no metrics to load
- handle->wanted_start_time_s = INVALID_TIME;
-}
+ handle->metric = metric;
+ handle->priority = priority;
+
+ // IMPORTANT!
+ // It is crucial not to exceed the db boundaries, because dbengine
+ // now has gap caching, so when a gap is detected a negative page
+ // is inserted into the main cache, to avoid scanning the journals
+ // again for pages matching the gap.
+
+ time_t db_first_time_s, db_last_time_s, db_update_every_s;
+ mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s);
+
+ if(is_page_in_time_range(start_time_s, end_time_s, db_first_time_s, db_last_time_s) == PAGE_IS_IN_RANGE) {
+ handle->start_time_s = MAX(start_time_s, db_first_time_s);
+ handle->end_time_s = MIN(end_time_s, db_last_time_s);
+ handle->now_s = handle->start_time_s;
+
+ handle->dt_s = db_update_every_s;
+ if (!handle->dt_s) {
+ handle->dt_s = default_rrd_update_every;
+ mrg_metric_set_update_every_s_if_zero(main_mrg, metric, default_rrd_update_every);
+ }
-static int rrdeng_load_page_next(struct storage_engine_query_handle *rrdimm_handle, bool debug_this __maybe_unused) {
- struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
+ rrddim_handle->handle = (STORAGE_QUERY_HANDLE *) handle;
+ rrddim_handle->start_time_s = handle->start_time_s;
+ rrddim_handle->end_time_s = handle->end_time_s;
+ rrddim_handle->priority = priority;
- struct rrdengine_instance *ctx = handle->ctx;
- struct rrdeng_page_descr *descr = handle->descr;
+ pg_cache_preload(handle);
- uint32_t page_length;
- usec_t page_end_time_ut;
- unsigned position;
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_init, now_monotonic_usec() - started_ut, __ATOMIC_RELAXED);
+ }
+ else {
+ handle->start_time_s = start_time_s;
+ handle->end_time_s = end_time_s;
+ handle->now_s = start_time_s;
+ handle->dt_s = db_update_every_s;
+
+ rrddim_handle->handle = (STORAGE_QUERY_HANDLE *) handle;
+ rrddim_handle->start_time_s = handle->start_time_s;
+ rrddim_handle->end_time_s = 0;
+ rrddim_handle->priority = priority;
+ }
+}
- if (likely(descr)) {
- // Drop old page's reference
+static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_handle, bool debug_this __maybe_unused) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
+ struct rrdengine_instance *ctx = handle->ctx;
-#ifdef NETDATA_INTERNAL_CHECKS
- rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
-#endif
+ if (likely(handle->page)) {
+ // we have a page to release
+ pgc_page_release(main_cache, handle->page);
+ handle->page = NULL;
+ }
- pg_cache_put(ctx, descr);
- handle->descr = NULL;
- handle->wanted_start_time_s = (time_t)((handle->page_end_time_ut / USEC_PER_SEC) + handle->dt_s);
+ if (unlikely(handle->now_s > rrddim_handle->end_time_s))
+ return false;
- if (unlikely(handle->wanted_start_time_s > rrdimm_handle->end_time_s))
- return 1;
- }
+ size_t entries;
+ handle->page = pg_cache_lookup_next(ctx, handle->pdc, handle->now_s, handle->dt_s, &entries);
+ if (unlikely(!handle->page))
+ return false;
- usec_t wanted_start_time_ut = handle->wanted_start_time_s * USEC_PER_SEC;
- descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id,
- wanted_start_time_ut, rrdimm_handle->end_time_s * USEC_PER_SEC);
- if (NULL == descr)
- return 1;
+ internal_fatal(pgc_page_data(handle->page) == DBENGINE_EMPTY_PAGE, "Empty page returned");
-#ifdef NETDATA_INTERNAL_CHECKS
- rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
-#endif
+ time_t page_start_time_s = pgc_page_start_time_s(handle->page);
+ time_t page_end_time_s = pgc_page_end_time_s(handle->page);
+ time_t page_update_every_s = pgc_page_update_every_s(handle->page);
- handle->descr = descr;
- pg_cache_atomic_get_pg_info(descr, &page_end_time_ut, &page_length);
- if (unlikely(INVALID_TIME == descr->start_time_ut || INVALID_TIME == page_end_time_ut || 0 == descr->update_every_s)) {
- error("DBENGINE: discarding invalid page descriptor (start_time = %llu, end_time = %llu, update_every_s = %d)",
- descr->start_time_ut, page_end_time_ut, descr->update_every_s);
- return 1;
- }
+ unsigned position;
+ if(likely(handle->now_s >= page_start_time_s && handle->now_s <= page_end_time_s)) {
+
+ if(unlikely(entries == 1 || page_start_time_s == page_end_time_s || !page_update_every_s)) {
+ position = 0;
+ handle->now_s = page_start_time_s;
+ }
+ else {
+ position = (handle->now_s - page_start_time_s) * (entries - 1) / (page_end_time_s - page_start_time_s);
+ time_t point_end_time_s = page_start_time_s + position * page_update_every_s;
+ while(point_end_time_s < handle->now_s && position + 1 < entries) {
+ // https://github.com/netdata/netdata/issues/14411
+ // we really need a while() here, because the delta may be
+ // 2 points at higher tiers
+ position++;
+ point_end_time_s = page_start_time_s + position * page_update_every_s;
+ }
+ handle->now_s = point_end_time_s;
+ }
- if (unlikely(descr->start_time_ut != page_end_time_ut && wanted_start_time_ut > descr->start_time_ut)) {
- // we're in the middle of the page somewhere
- unsigned entries = page_length / PAGE_POINT_SIZE_BYTES(descr);
- position = ((uint64_t)(wanted_start_time_ut - descr->start_time_ut)) * (entries - 1) /
- (page_end_time_ut - descr->start_time_ut);
+ internal_fatal(position >= entries, "DBENGINE: wrong page position calculation");
}
- else
+ else if(handle->now_s < page_start_time_s) {
+ handle->now_s = page_start_time_s;
position = 0;
+ }
+ else {
+ internal_fatal(true, "DBENGINE: this page is entirely in our past and should not be accepted for this query in the first place");
+ handle->now_s = page_end_time_s;
+ position = entries - 1;
+ }
- handle->page_end_time_ut = page_end_time_ut;
- handle->page_length = page_length;
- handle->entries = page_length / PAGE_POINT_SIZE_BYTES(descr);
- handle->page = descr->pg_cache_descr->page;
- handle->dt_s = descr->update_every_s;
+ handle->entries = entries;
handle->position = position;
-
-// if(debug_this)
-// info("DBENGINE: rrdeng_load_page_next(), "
-// "position:%d, "
-// "start_time_ut:%llu, "
-// "page_end_time_ut:%llu, "
-// "next_page_time_ut:%llu, "
-// "in_out:%s"
-// , position
-// , descr->start_time_ut
-// , page_end_time_ut
-// ,
-// wanted_start_time_ut, in_out?"true":"false"
-// );
-
- return 0;
+ handle->metric_data = pgc_page_data((PGC_PAGE *)handle->page);
+ handle->dt_s = page_update_every_s;
+ return true;
}
// Returns the metric and sets its timestamp into current_time
@@ -675,75 +895,28 @@ static int rrdeng_load_page_next(struct storage_engine_query_handle *rrdimm_hand
// IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES
STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle) {
struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
- // struct rrdeng_metric_handle *metric_handle = handle->metric_handle;
-
- struct rrdeng_page_descr *descr = handle->descr;
- time_t now = handle->now_s + handle->dt_s;
-
-// bool debug_this = false;
-// {
-// unsigned char u[16] = { 0x0C, 0x0A, 0x40, 0xD6, 0x2A, 0x43, 0x4A, 0x7C, 0x95, 0xF7, 0xD1, 0x1E, 0x0C, 0x9E, 0x8A, 0xE7 };
-// if(uuid_compare(u, handle->page_index->id) == 0) {
-// char buffer[100];
-// snprintfz(buffer, 100, "load system.cpu, now:%u, dt:%u, position:%u page_index first:%u, last:%u",
-// (uint32_t)(now),
-// (uint32_t)(handle->dt_s),
-// (uint32_t)(handle->position),
-// (uint32_t)(handle->page_index->oldest_time / USEC_PER_SEC),
-// (uint32_t)(handle->page_index->latest_time / USEC_PER_SEC));
-//
-// print_page_cache_descr(descr, buffer, false);
-// debug_this = true;
-// }
-// }
-
STORAGE_POINT sp;
- unsigned position = handle->position + 1;
- storage_number_tier1_t tier1_value;
-
- if (unlikely(INVALID_TIME == handle->wanted_start_time_s)) {
- handle->wanted_start_time_s = INVALID_TIME;
- handle->now_s = now;
- storage_point_empty(sp, now - handle->dt_s, now);
- return sp;
+
+ if (unlikely(handle->now_s > rrddim_handle->end_time_s)) {
+ storage_point_empty(sp, handle->now_s - handle->dt_s, handle->now_s);
+ goto prepare_for_next_iteration;
}
- if (unlikely(!descr || position >= handle->entries)) {
+ if (unlikely(!handle->page || handle->position >= handle->entries)) {
// We need to get a new page
- if(rrdeng_load_page_next(rrddim_handle, false)) {
- // next calls will not load any more metrics
- handle->wanted_start_time_s = INVALID_TIME;
- handle->now_s = now;
- storage_point_empty(sp, now - handle->dt_s, now);
- return sp;
- }
- descr = handle->descr;
- position = handle->position;
- now = (time_t)((descr->start_time_ut / USEC_PER_SEC) + position * descr->update_every_s);
-
-// if(debug_this) {
-// char buffer[100];
-// snprintfz(buffer, 100, "NEW PAGE system.cpu, now:%u, dt:%u, position:%u page_index first:%u, last:%u",
-// (uint32_t)(now),
-// (uint32_t)(handle->dt_s),
-// (uint32_t)(handle->position),
-// (uint32_t)(handle->page_index->oldest_time / USEC_PER_SEC),
-// (uint32_t)(handle->page_index->latest_time / USEC_PER_SEC));
-//
-// print_page_cache_descr(descr, buffer, false);
-// }
+ if (!rrdeng_load_page_next(rrddim_handle, false)) {
+ storage_point_empty(sp, handle->now_s - handle->dt_s, handle->now_s);
+ goto prepare_for_next_iteration;
+ }
}
- sp.start_time = now - handle->dt_s;
- sp.end_time = now;
-
- handle->position = position;
- handle->now_s = now;
+ sp.start_time_s = handle->now_s - handle->dt_s;
+ sp.end_time_s = handle->now_s;
- switch(descr->type) {
+ switch(handle->ctx->config.page_type) {
case PAGE_METRICS: {
- storage_number n = handle->page[position];
+ storage_number n = handle->metric_data[handle->position];
sp.min = sp.max = sp.sum = unpack_storage_number(n);
sp.flags = n & SN_USER_FLAGS;
sp.count = 1;
@@ -752,7 +925,7 @@ STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim
break;
case PAGE_TIER: {
- tier1_value = ((storage_number_tier1_t *)handle->page)[position];
+ storage_number_tier1_t tier1_value = ((storage_number_tier1_t *)handle->metric_data)[handle->position];
sp.flags = tier1_value.anomaly_count ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS;
sp.count = tier1_value.count;
sp.anomaly_count = tier1_value.anomaly_count;
@@ -766,204 +939,98 @@ STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim
default: {
static bool logged = false;
if(!logged) {
- error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", descr->type);
+ error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", handle->ctx->config.page_type);
logged = true;
}
- storage_point_empty(sp, sp.start_time, sp.end_time);
+ storage_point_empty(sp, sp.start_time_s, sp.end_time_s);
}
break;
}
- if (unlikely(now >= rrddim_handle->end_time_s)) {
- // next calls will not load any more metrics
- handle->wanted_start_time_s = INVALID_TIME;
- }
+prepare_for_next_iteration:
+ internal_fatal(sp.end_time_s < rrddim_handle->start_time_s, "DBENGINE: this point is too old for this query");
+ internal_fatal(sp.end_time_s < handle->now_s, "DBENGINE: this point is too old for this point in time");
-// if(debug_this)
-// info("DBENGINE: returning point: "
-// "time from %ld to %ld // query from %ld to %ld // wanted_start_time_s %ld"
-// , sp.start_time, sp.end_time
-// , rrddim_handle->start_time_s, rrddim_handle->end_time_s
-// , handle->wanted_start_time_s
-// );
+ handle->now_s += handle->dt_s;
+ handle->position++;
return sp;
}
-int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrdimm_handle)
-{
- struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
- return (INVALID_TIME == handle->wanted_start_time_s);
+int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrddim_handle) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
+ return (handle->now_s > rrddim_handle->end_time_s);
}
/*
* Releases the database reference from the handle for loading metrics.
*/
-void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrdimm_handle)
+void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrddim_handle)
{
- struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
- struct rrdengine_instance *ctx = handle->ctx;
- struct rrdeng_page_descr *descr = handle->descr;
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
- if (descr) {
-#ifdef NETDATA_INTERNAL_CHECKS
- rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
-#endif
- pg_cache_put(ctx, descr);
- }
+ if (handle->page)
+ pgc_page_release(main_cache, handle->page);
- // whatever is allocated at rrdeng_load_metric_init() should be freed here
- freez(handle);
- rrdimm_handle->handle = NULL;
-}
+ if(!pdc_release_and_destroy_if_unreferenced(handle->pdc, false, false))
+ __atomic_store_n(&handle->pdc->workers_should_stop, true, __ATOMIC_RELAXED);
-time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
- struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
- return (time_t)(page_index->latest_time_ut / USEC_PER_SEC);
-}
-time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
- struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
- return (time_t)(page_index->oldest_time_ut / USEC_PER_SEC);
+ unregister_query_handle(handle);
+ rrdeng_query_handle_release(handle);
+ rrddim_handle->handle = NULL;
+ netdata_thread_enable_cancelability();
}
-int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t)
-{
- struct page_cache *pg_cache;
- struct rrdengine_instance *ctx;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index = NULL;
+time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *rrddim_handle) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
- ctx = (struct rrdengine_instance *)si;
- if (unlikely(!ctx)) {
- error("DBENGINE: invalid STORAGE INSTANCE to %s()", __FUNCTION__);
- return 1;
+ if(handle->pdc) {
+ rrdeng_prep_wait(handle->pdc);
+ if (handle->pdc->optimal_end_time_s > rrddim_handle->end_time_s)
+ rrddim_handle->end_time_s = handle->pdc->optimal_end_time_s;
}
- pg_cache = &ctx->pg_cache;
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, dim_uuid, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
- page_index = *PValue;
- }
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ return rrddim_handle->end_time_s;
+}
- if (likely(page_index)) {
- *first_entry_t = page_index->oldest_time_ut / USEC_PER_SEC;
- *last_entry_t = page_index->latest_time_ut / USEC_PER_SEC;
- return 0;
- }
+time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
+ METRIC *metric = (METRIC *)db_metric_handle;
+ time_t latest_time_s = 0;
- return 1;
-}
+ if (metric)
+ latest_time_s = mrg_metric_get_latest_time_s(main_mrg, metric);
-/* Also gets a reference for the page */
-void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr)
-{
- struct rrdeng_page_descr *descr;
- struct page_cache_descr *pg_cache_descr;
- void *page;
- /* TODO: check maximum number of pages in page cache limit */
-
- descr = pg_cache_create_descr();
- descr->id = id; /* TODO: add page type: metric, log, something? */
- descr->type = ctx->page_type;
- page = dbengine_page_alloc(); /*TODO: add page size */
- rrdeng_page_descr_mutex_lock(ctx, descr);
- pg_cache_descr = descr->pg_cache_descr;
- pg_cache_descr->page = page;
- pg_cache_descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */;
- pg_cache_descr->refcnt = 1;
-
- debug(D_RRDENGINE, "Created new page:");
- if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr, "", true);
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- *ret_descr = descr;
- return page;
+ return latest_time_s;
}
-/* The page must not be empty */
-void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
- Word_t page_correlation_id)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- Pvoid_t *PValue;
- unsigned nr_committed_pages;
+time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
+ METRIC *metric = (METRIC *)db_metric_handle;
- if (unlikely(NULL == descr)) {
- debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-committed.", __func__);
- return;
- }
- fatal_assert(descr->page_length);
-
- uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
- PValue = JudyLIns(&pg_cache->committed_page_index.JudyL_array, page_correlation_id, PJE0);
- *PValue = descr;
- nr_committed_pages = ++pg_cache->committed_page_index.nr_committed_pages;
- uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
-
- if (nr_committed_pages >= pg_cache_hard_limit(ctx) / 2) {
- /* over 50% of pages have not been committed yet */
-
- if (ctx->drop_metrics_under_page_cache_pressure &&
- nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) {
- /* 100% of pages are dirty */
- struct rrdeng_cmd cmd;
-
- cmd.opcode = RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE;
- rrdeng_enq_cmd(&ctx->worker_config, &cmd);
- } else {
- if (0 == (unsigned long) ctx->stats.pg_cache_over_half_dirty_events) {
- /* only print the first time */
- errno = 0;
- error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". "
- "Metric data at risk of not being stored in the database, "
- "please reduce disk load or use a faster disk.", ctx->dbfiles_path);
- }
- rrd_stat_atomic_add(&ctx->stats.pg_cache_over_half_dirty_events, 1);
- rrd_stat_atomic_add(&global_pg_cache_over_half_dirty_events, 1);
- }
- }
+ time_t oldest_time_s = 0;
+ if (metric)
+ oldest_time_s = mrg_metric_get_first_time_s(main_mrg, metric);
- pg_cache_put(ctx, descr);
+ return oldest_time_s;
}
-/* Gets a reference for the page */
-void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle)
+bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance, uuid_t *dim_uuid, time_t *first_entry_s, time_t *last_entry_s)
{
- struct rrdeng_page_descr *descr;
- struct page_cache_descr *pg_cache_descr;
-
- debug(D_RRDENGINE, "Reading existing page:");
- descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME);
- if (NULL == descr) {
- *handle = NULL;
-
- return NULL;
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ if (unlikely(!ctx)) {
+ error("DBENGINE: invalid STORAGE INSTANCE to %s()", __FUNCTION__);
+ return false;
}
- *handle = descr;
- pg_cache_descr = descr->pg_cache_descr;
- return pg_cache_descr->page;
-}
+ METRIC *metric = mrg_metric_get_and_acquire(main_mrg, dim_uuid, (Word_t) ctx);
+ if (unlikely(!metric))
+ return false;
-/* Gets a reference for the page */
-void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time_ut, void **handle)
-{
- struct rrdeng_page_descr *descr;
- struct page_cache_descr *pg_cache_descr;
+ time_t update_every_s;
+ mrg_metric_get_retention(main_mrg, metric, first_entry_s, last_entry_s, &update_every_s);
- debug(D_RRDENGINE, "Reading existing page:");
- descr = pg_cache_lookup(ctx, NULL, id, point_in_time_ut);
- if (NULL == descr) {
- *handle = NULL;
-
- return NULL;
- }
- *handle = descr;
- pg_cache_descr = descr->pg_cache_descr;
+ mrg_metric_release(main_mrg, metric);
- return pg_cache_descr->page;
+ return true;
}
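
The rewritten retention lookup follows an acquire -> read -> release discipline against the metrics registry (MRG). The following standalone sketch shows the same pattern with a toy in-memory registry; all types and function names here are illustrative stand-ins, not the Netdata MRG:

    #include <stdbool.h>
    #include <string.h>
    #include <stdio.h>
    #include <time.h>

    struct metric_entry {
        unsigned char uuid[16];
        time_t first_time_s;
        time_t last_time_s;
        int refcount;
    };

    static struct metric_entry registry[4];   /* toy registry */
    static size_t registry_used = 0;

    static struct metric_entry *metric_get_and_acquire(const unsigned char uuid[16]) {
        for (size_t i = 0; i < registry_used; i++)
            if (memcmp(registry[i].uuid, uuid, 16) == 0) {
                registry[i].refcount++;
                return &registry[i];
            }
        return NULL;
    }

    static void metric_release(struct metric_entry *m) { m->refcount--; }

    static bool metric_retention_by_uuid(const unsigned char uuid[16],
                                         time_t *first_entry_s, time_t *last_entry_s) {
        struct metric_entry *m = metric_get_and_acquire(uuid);
        if (!m)
            return false;              /* unknown metric: no retention to report */

        *first_entry_s = m->first_time_s;
        *last_entry_s  = m->last_time_s;
        metric_release(m);
        return true;
    }

    int main(void) {
        registry[0] = (struct metric_entry){ .uuid = {1}, .first_time_s = 1000, .last_time_s = 2000 };
        registry_used = 1;

        time_t first, last;
        unsigned char uuid[16] = {1};
        if (metric_retention_by_uuid(uuid, &first, &last))
            printf("retention: %ld .. %ld\n", (long)first, (long)last);
        return 0;
    }
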
/*
@@ -977,62 +1044,126 @@ void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long
if (ctx == NULL)
return;
- struct page_cache *pg_cache = &ctx->pg_cache;
-
- array[0] = (uint64_t)ctx->stats.metric_API_producers;
- array[1] = (uint64_t)ctx->stats.metric_API_consumers;
- array[2] = (uint64_t)pg_cache->page_descriptors;
- array[3] = (uint64_t)pg_cache->populated_pages;
- array[4] = (uint64_t)pg_cache->committed_page_index.nr_committed_pages;
- array[5] = (uint64_t)ctx->stats.pg_cache_insertions;
- array[6] = (uint64_t)ctx->stats.pg_cache_deletions;
- array[7] = (uint64_t)ctx->stats.pg_cache_hits;
- array[8] = (uint64_t)ctx->stats.pg_cache_misses;
- array[9] = (uint64_t)ctx->stats.pg_cache_backfills;
- array[10] = (uint64_t)ctx->stats.pg_cache_evictions;
- array[11] = (uint64_t)ctx->stats.before_compress_bytes;
- array[12] = (uint64_t)ctx->stats.after_compress_bytes;
- array[13] = (uint64_t)ctx->stats.before_decompress_bytes;
- array[14] = (uint64_t)ctx->stats.after_decompress_bytes;
- array[15] = (uint64_t)ctx->stats.io_write_bytes;
- array[16] = (uint64_t)ctx->stats.io_write_requests;
- array[17] = (uint64_t)ctx->stats.io_read_bytes;
- array[18] = (uint64_t)ctx->stats.io_read_requests;
- array[19] = (uint64_t)ctx->stats.io_write_extent_bytes;
- array[20] = (uint64_t)ctx->stats.io_write_extents;
- array[21] = (uint64_t)ctx->stats.io_read_extent_bytes;
- array[22] = (uint64_t)ctx->stats.io_read_extents;
- array[23] = (uint64_t)ctx->stats.datafile_creations;
- array[24] = (uint64_t)ctx->stats.datafile_deletions;
- array[25] = (uint64_t)ctx->stats.journalfile_creations;
- array[26] = (uint64_t)ctx->stats.journalfile_deletions;
- array[27] = (uint64_t)ctx->stats.page_cache_descriptors;
- array[28] = (uint64_t)ctx->stats.io_errors;
- array[29] = (uint64_t)ctx->stats.fs_errors;
- array[30] = (uint64_t)global_io_errors;
- array[31] = (uint64_t)global_fs_errors;
- array[32] = (uint64_t)rrdeng_reserved_file_descriptors;
- array[33] = (uint64_t)ctx->stats.pg_cache_over_half_dirty_events;
- array[34] = (uint64_t)global_pg_cache_over_half_dirty_events;
- array[35] = (uint64_t)ctx->stats.flushing_pressure_page_deletions;
- array[36] = (uint64_t)global_flushing_pressure_page_deletions;
- fatal_assert(RRDENG_NR_STATS == 37);
+ array[0] = (uint64_t)__atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED); // API producers
+ array[1] = (uint64_t)__atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED); // API consumers
+ array[2] = 0;
+ array[3] = 0;
+ array[4] = 0;
+ array[5] = 0; // (uint64_t)ctx->stats.pg_cache_insertions;
+ array[6] = 0; // (uint64_t)ctx->stats.pg_cache_deletions;
+ array[7] = 0; // (uint64_t)ctx->stats.pg_cache_hits;
+ array[8] = 0; // (uint64_t)ctx->stats.pg_cache_misses;
+ array[9] = 0; // (uint64_t)ctx->stats.pg_cache_backfills;
+ array[10] = 0; // (uint64_t)ctx->stats.pg_cache_evictions;
+ array[11] = (uint64_t)__atomic_load_n(&ctx->stats.before_compress_bytes, __ATOMIC_RELAXED); // used
+ array[12] = (uint64_t)__atomic_load_n(&ctx->stats.after_compress_bytes, __ATOMIC_RELAXED); // used
+ array[13] = (uint64_t)__atomic_load_n(&ctx->stats.before_decompress_bytes, __ATOMIC_RELAXED);
+ array[14] = (uint64_t)__atomic_load_n(&ctx->stats.after_decompress_bytes, __ATOMIC_RELAXED);
+ array[15] = (uint64_t)__atomic_load_n(&ctx->stats.io_write_bytes, __ATOMIC_RELAXED); // used
+ array[16] = (uint64_t)__atomic_load_n(&ctx->stats.io_write_requests, __ATOMIC_RELAXED); // used
+ array[17] = (uint64_t)__atomic_load_n(&ctx->stats.io_read_bytes, __ATOMIC_RELAXED);
+ array[18] = (uint64_t)__atomic_load_n(&ctx->stats.io_read_requests, __ATOMIC_RELAXED); // used
+ array[19] = 0; // (uint64_t)__atomic_load_n(&ctx->stats.io_write_extent_bytes, __ATOMIC_RELAXED);
+ array[20] = 0; // (uint64_t)__atomic_load_n(&ctx->stats.io_write_extents, __ATOMIC_RELAXED);
+ array[21] = 0; // (uint64_t)__atomic_load_n(&ctx->stats.io_read_extent_bytes, __ATOMIC_RELAXED);
+ array[22] = 0; // (uint64_t)__atomic_load_n(&ctx->stats.io_read_extents, __ATOMIC_RELAXED);
+ array[23] = (uint64_t)__atomic_load_n(&ctx->stats.datafile_creations, __ATOMIC_RELAXED);
+ array[24] = (uint64_t)__atomic_load_n(&ctx->stats.datafile_deletions, __ATOMIC_RELAXED);
+ array[25] = (uint64_t)__atomic_load_n(&ctx->stats.journalfile_creations, __ATOMIC_RELAXED);
+ array[26] = (uint64_t)__atomic_load_n(&ctx->stats.journalfile_deletions, __ATOMIC_RELAXED);
+ array[27] = 0; // (uint64_t)__atomic_load_n(&ctx->stats.page_cache_descriptors, __ATOMIC_RELAXED);
+ array[28] = (uint64_t)__atomic_load_n(&ctx->stats.io_errors, __ATOMIC_RELAXED);
+ array[29] = (uint64_t)__atomic_load_n(&ctx->stats.fs_errors, __ATOMIC_RELAXED);
+ array[30] = (uint64_t)__atomic_load_n(&global_io_errors, __ATOMIC_RELAXED); // used
+ array[31] = (uint64_t)__atomic_load_n(&global_fs_errors, __ATOMIC_RELAXED); // used
+ array[32] = (uint64_t)__atomic_load_n(&rrdeng_reserved_file_descriptors, __ATOMIC_RELAXED); // used
+ array[33] = 0; // (uint64_t)__atomic_load_n(&ctx->stats.pg_cache_over_half_dirty_events, __ATOMIC_RELAXED);
+ array[34] = (uint64_t)__atomic_load_n(&global_pg_cache_over_half_dirty_events, __ATOMIC_RELAXED); // used
+ array[35] = 0; // (uint64_t)__atomic_load_n(&ctx->stats.flushing_pressure_page_deletions, __ATOMIC_RELAXED);
+ array[36] = (uint64_t)__atomic_load_n(&global_flushing_pressure_page_deletions, __ATOMIC_RELAXED); // used
+ array[37] = 0; //(uint64_t)pg_cache->active_descriptors;
+
+ fatal_assert(RRDENG_NR_STATS == 38);
}
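
The rewritten statistics export replaces the old locked counters with relaxed atomic loads via the GCC/Clang __atomic builtins. A self-contained sketch of that counter pattern (the variables here are illustrative, not the Netdata stats structures):

    #include <stdint.h>
    #include <stdio.h>

    static uint64_t io_write_bytes = 0;
    static uint64_t io_write_requests = 0;

    static void account_write(size_t bytes) {
        /* relaxed ordering is enough: these are statistics, not synchronization */
        __atomic_add_fetch(&io_write_bytes, bytes, __ATOMIC_RELAXED);
        __atomic_add_fetch(&io_write_requests, 1, __ATOMIC_RELAXED);
    }

    static void export_stats(uint64_t *array) {
        array[0] = __atomic_load_n(&io_write_bytes, __ATOMIC_RELAXED);
        array[1] = __atomic_load_n(&io_write_requests, __ATOMIC_RELAXED);
    }

    int main(void) {
        account_write(4096);
        account_write(512);

        uint64_t stats[2];
        export_stats(stats);
        printf("bytes=%llu requests=%llu\n",
               (unsigned long long)stats[0], (unsigned long long)stats[1]);
        return 0;
    }
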
-/* Releases reference to page */
-void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle)
-{
- (void)ctx;
- pg_cache_put(ctx, (struct rrdeng_page_descr *)handle);
+static void rrdeng_populate_mrg(struct rrdengine_instance *ctx) {
+ uv_rwlock_rdlock(&ctx->datafiles.rwlock);
+ size_t datafiles = 0;
+ for(struct rrdengine_datafile *df = ctx->datafiles.first; df ;df = df->next)
+ datafiles++;
+ uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
+
+ size_t cpus = get_netdata_cpus() / storage_tiers;
+ if(cpus > datafiles)
+ cpus = datafiles;
+
+ if(cpus < 1)
+ cpus = 1;
+
+ if(cpus > (size_t)libuv_worker_threads)
+ cpus = (size_t)libuv_worker_threads;
+
+ if(cpus > MRG_PARTITIONS)
+ cpus = MRG_PARTITIONS;
+
+ info("DBENGINE: populating retention to MRG from %zu journal files of tier %d, using %zu threads...", datafiles, ctx->config.tier, cpus);
+
+ if(datafiles > 2) {
+ struct rrdengine_datafile *datafile;
+
+ datafile = ctx->datafiles.first->prev;
+ if(!(datafile->journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE))
+ datafile = datafile->prev;
+
+ if(datafile->journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE) {
+ journalfile_v2_populate_retention_to_mrg(ctx, datafile->journalfile);
+ datafile->populate_mrg.populated = true;
+ }
+
+ datafile = ctx->datafiles.first;
+ if(datafile->journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE) {
+ journalfile_v2_populate_retention_to_mrg(ctx, datafile->journalfile);
+ datafile->populate_mrg.populated = true;
+ }
+ }
+
+ ctx->loading.populate_mrg.size = cpus;
+ ctx->loading.populate_mrg.array = callocz(ctx->loading.populate_mrg.size, sizeof(struct completion));
+
+ for (size_t i = 0; i < ctx->loading.populate_mrg.size; i++) {
+ completion_init(&ctx->loading.populate_mrg.array[i]);
+ rrdeng_enq_cmd(ctx, RRDENG_OPCODE_CTX_POPULATE_MRG, NULL, &ctx->loading.populate_mrg.array[i],
+ STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
+ }
+}
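
The MRG population code above caps the number of population workers by the per-tier CPU share, the number of journal files, the libuv worker pool, and the MRG partition count, with a floor of one. A minimal sketch of that clamping (constants and names are illustrative):

    #include <stdio.h>
    #include <stddef.h>

    static size_t clamp_population_threads(size_t cpus_per_tier, size_t datafiles,
                                           size_t libuv_workers, size_t mrg_partitions) {
        size_t threads = cpus_per_tier;
        if (threads > datafiles)      threads = datafiles;       /* one worker per file at most */
        if (threads > libuv_workers)  threads = libuv_workers;   /* do not exceed the libuv pool */
        if (threads > mrg_partitions) threads = mrg_partitions;  /* no point beyond MRG partitions */
        if (threads < 1)              threads = 1;               /* always at least one */
        return threads;
    }

    int main(void) {
        /* e.g. 16 cores over 4 tiers, 3 datafiles, 16 libuv workers, 8 partitions */
        printf("threads: %zu\n", clamp_population_threads(16 / 4, 3, 16, 8));
        return 0;
    }
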
+
+void rrdeng_readiness_wait(struct rrdengine_instance *ctx) {
+ for (size_t i = 0; i < ctx->loading.populate_mrg.size; i++) {
+ completion_wait_for(&ctx->loading.populate_mrg.array[i]);
+ completion_destroy(&ctx->loading.populate_mrg.array[i]);
+ }
+
+ freez(ctx->loading.populate_mrg.array);
+ ctx->loading.populate_mrg.array = NULL;
+ ctx->loading.populate_mrg.size = 0;
+
+ info("DBENGINE: tier %d is ready for data collection and queries", ctx->config.tier);
+}
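
The readiness wait above blocks on one completion per population worker before freeing the array. A standalone sketch of such a completion primitive built on a mutex and condition variable follows; it is illustrative and not the Netdata implementation (completion_mark_complete is a made-up name):

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <unistd.h>

    struct completion {
        pthread_mutex_t mutex;
        pthread_cond_t cond;
        bool completed;
    };

    static void completion_init(struct completion *c) {
        pthread_mutex_init(&c->mutex, NULL);
        pthread_cond_init(&c->cond, NULL);
        c->completed = false;
    }

    static void completion_mark_complete(struct completion *c) {
        pthread_mutex_lock(&c->mutex);
        c->completed = true;
        pthread_cond_broadcast(&c->cond);
        pthread_mutex_unlock(&c->mutex);
    }

    static void completion_wait_for(struct completion *c) {
        pthread_mutex_lock(&c->mutex);
        while (!c->completed)
            pthread_cond_wait(&c->cond, &c->mutex);
        pthread_mutex_unlock(&c->mutex);
    }

    static void completion_destroy(struct completion *c) {
        pthread_mutex_destroy(&c->mutex);
        pthread_cond_destroy(&c->cond);
    }

    static void *worker(void *arg) {
        usleep(10000);                        /* pretend to load a journal file */
        completion_mark_complete(arg);
        return NULL;
    }

    int main(void) {
        struct completion c;
        completion_init(&c);

        pthread_t t;
        pthread_create(&t, NULL, worker, &c);

        completion_wait_for(&c);              /* block until the worker signals */
        pthread_join(&t, NULL);
        completion_destroy(&c);

        puts("tier ready");
        return 0;
    }
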
+
+bool rrdeng_is_legacy(STORAGE_INSTANCE *db_instance) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ return ctx->config.legacy;
}
+void rrdeng_exit_mode(struct rrdengine_instance *ctx) {
+ __atomic_store_n(&ctx->quiesce.exit_mode, true, __ATOMIC_RELAXED);
+}
/*
* Returns 0 on success, negative on error
*/
-int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
+int rrdeng_init(struct rrdengine_instance **ctxp, const char *dbfiles_path,
unsigned disk_space_mb, size_t tier) {
struct rrdengine_instance *ctx;
- int error;
uint32_t max_open_files;
max_open_files = rlimit_nofile.rlim_cur / 4;
@@ -1053,182 +1184,185 @@ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_p
if(NULL == ctxp) {
ctx = multidb_ctx[tier];
memset(ctx, 0, sizeof(*ctx));
+ ctx->config.legacy = false;
}
else {
*ctxp = ctx = callocz(1, sizeof(*ctx));
+ ctx->config.legacy = true;
}
- ctx->tier = tier;
- ctx->page_type = tier_page_type[tier];
- ctx->global_compress_alg = RRD_LZ4;
- if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB)
- page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
- ctx->max_cache_pages = page_cache_mb * (1048576LU / RRDENG_BLOCK_SIZE);
- /* try to keep 5% of the page cache free */
- ctx->cache_pages_low_watermark = (ctx->max_cache_pages * 95LLU) / 100;
+
+ ctx->config.tier = (int)tier;
+ ctx->config.page_type = tier_page_type[tier];
+ ctx->config.global_compress_alg = RRD_LZ4;
if (disk_space_mb < RRDENG_MIN_DISK_SPACE_MB)
disk_space_mb = RRDENG_MIN_DISK_SPACE_MB;
- ctx->max_disk_space = disk_space_mb * 1048576LLU;
- strncpyz(ctx->dbfiles_path, dbfiles_path, sizeof(ctx->dbfiles_path) - 1);
- ctx->dbfiles_path[sizeof(ctx->dbfiles_path) - 1] = '\0';
- if (NULL == host)
- strncpyz(ctx->machine_guid, registry_get_this_machine_guid(), GUID_LEN);
- else
- strncpyz(ctx->machine_guid, host->machine_guid, GUID_LEN);
-
- ctx->drop_metrics_under_page_cache_pressure = rrdeng_drop_metrics_under_page_cache_pressure;
- ctx->metric_API_max_producers = 0;
- ctx->quiesce = NO_QUIESCE;
- ctx->host = host;
-
- memset(&ctx->worker_config, 0, sizeof(ctx->worker_config));
- ctx->worker_config.ctx = ctx;
- init_page_cache(ctx);
- init_commit_log(ctx);
- error = init_rrd_files(ctx);
- if (error) {
- goto error_after_init_rrd_files;
- }
+ ctx->config.max_disk_space = disk_space_mb * 1048576LLU;
+ strncpyz(ctx->config.dbfiles_path, dbfiles_path, sizeof(ctx->config.dbfiles_path) - 1);
+ ctx->config.dbfiles_path[sizeof(ctx->config.dbfiles_path) - 1] = '\0';
- completion_init(&ctx->rrdengine_completion);
- fatal_assert(0 == uv_thread_create(&ctx->worker_config.thread, rrdeng_worker, &ctx->worker_config));
- /* wait for worker thread to initialize */
- completion_wait_for(&ctx->rrdengine_completion);
- completion_destroy(&ctx->rrdengine_completion);
- uv_thread_set_name_np(ctx->worker_config.thread, "LIBUV_WORKER");
- if (ctx->worker_config.error) {
- goto error_after_rrdeng_worker;
- }
-// error = metalog_init(ctx);
-// if (error) {
-// error("Failed to initialize metadata log file event loop.");
-// goto error_after_rrdeng_worker;
-// }
+ ctx->atomic.transaction_id = 1;
+ ctx->quiesce.enabled = false;
- return 0;
+ if (rrdeng_dbengine_spawn(ctx) && !init_rrd_files(ctx)) {
+ // success - we run this ctx too
+ rrdeng_populate_mrg(ctx);
+ return 0;
+ }
-error_after_rrdeng_worker:
- finalize_rrd_files(ctx);
-error_after_init_rrd_files:
- free_page_cache(ctx);
- if (!is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) {
+ if (ctx->config.legacy) {
freez(ctx);
if (ctxp)
*ctxp = NULL;
}
+
rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
return UV_EIO;
}
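
The init path now distinguishes shared tier contexts (static storage, reused across hosts) from "legacy" per-host contexts (heap allocated), and the error path frees only the latter. A simplified standalone sketch of that allocation split, with illustrative types rather than the real rrdengine_instance:

    #include <stdbool.h>
    #include <stdlib.h>
    #include <string.h>

    struct engine_ctx { bool legacy; int tier; };

    static struct engine_ctx static_tier_ctx[5];   /* shared, one per storage tier */

    static struct engine_ctx *ctx_obtain(struct engine_ctx **ctxp, size_t tier) {
        struct engine_ctx *ctx;
        if (!ctxp) {
            ctx = &static_tier_ctx[tier];          /* multi-host tier: reuse static slot */
            memset(ctx, 0, sizeof(*ctx));
            ctx->legacy = false;
        }
        else {
            *ctxp = ctx = calloc(1, sizeof(*ctx)); /* legacy per-host instance */
            ctx->legacy = true;
        }
        ctx->tier = (int)tier;
        return ctx;
    }

    static void ctx_release_on_error(struct engine_ctx *ctx, struct engine_ctx **ctxp) {
        if (ctx->legacy) {                         /* only heap instances are freed */
            free(ctx);
            if (ctxp)
                *ctxp = NULL;
        }
    }

    int main(void) {
        struct engine_ctx *shared = ctx_obtain(NULL, 0);
        (void)shared;

        struct engine_ctx *per_host = NULL;
        ctx_obtain(&per_host, 0);
        ctx_release_on_error(per_host, &per_host);
        return 0;
    }
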
+size_t rrdeng_collectors_running(struct rrdengine_instance *ctx) {
+ return __atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED);
+}
+
/*
* Returns 0 on success, 1 on error
*/
-int rrdeng_exit(struct rrdengine_instance *ctx)
-{
- struct rrdeng_cmd cmd;
-
- if (NULL == ctx) {
+int rrdeng_exit(struct rrdengine_instance *ctx) {
+ if (NULL == ctx)
return 1;
+
+ // FIXME - ktsaou - properly cleanup ctx
+ // 1. make sure all collectors are stopped
+ // 2. make sure new queries will not be accepted (this is the quiesce step, which has already run)
+ // 3. flush this section of the main cache
+ // 4. then wait for completion
+
+ bool logged = false;
+ while(__atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED) && !unittest_running) {
+ if(!logged) {
+ info("DBENGINE: waiting for collectors to finish on tier %d...", (ctx->config.legacy) ? -1 : ctx->config.tier);
+ logged = true;
+ }
+ sleep_usec(100 * USEC_PER_MS);
}
- /* TODO: add page to page cache */
- cmd.opcode = RRDENG_SHUTDOWN;
- rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+ info("DBENGINE: flushing main cache for tier %d", (ctx->config.legacy) ? -1 : ctx->config.tier);
+ pgc_flush_all_hot_and_dirty_pages(main_cache, (Word_t)ctx);
- fatal_assert(0 == uv_thread_join(&ctx->worker_config.thread));
+ info("DBENGINE: shutting down tier %d", (ctx->config.legacy) ? -1 : ctx->config.tier);
+ struct completion completion = {};
+ completion_init(&completion);
+ rrdeng_enq_cmd(ctx, RRDENG_OPCODE_CTX_SHUTDOWN, NULL, &completion, STORAGE_PRIORITY_BEST_EFFORT, NULL, NULL);
+ completion_wait_for(&completion);
+ completion_destroy(&completion);
finalize_rrd_files(ctx);
- //metalog_exit(ctx->metalog_ctx);
- free_page_cache(ctx);
- if(!is_storage_engine_shared((STORAGE_INSTANCE *)ctx))
+ if(ctx->config.legacy)
freez(ctx);
rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
return 0;
}
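
The shutdown sequence above first waits for running collectors to drain, logging only once, before flushing the cache and queuing the shutdown command. A minimal sketch of that polling wait (standalone and illustrative; the real code uses the context's atomic counter and sleep_usec):

    #include <stdbool.h>
    #include <stdio.h>
    #include <unistd.h>

    static int collectors_running = 0;   /* updated by collector threads */

    static void wait_for_collectors(int tier) {
        bool logged = false;
        while (__atomic_load_n(&collectors_running, __ATOMIC_RELAXED) > 0) {
            if (!logged) {
                printf("waiting for collectors to finish on tier %d...\n", tier);
                logged = true;
            }
            usleep(100 * 1000);          /* 100 ms between checks */
        }
    }

    int main(void) {
        wait_for_collectors(0);          /* returns immediately: no collectors running */
        puts("flushing and shutting down");
        return 0;
    }
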
-void rrdeng_prepare_exit(struct rrdengine_instance *ctx)
-{
- struct rrdeng_cmd cmd;
-
- if (NULL == ctx) {
+void rrdeng_prepare_exit(struct rrdengine_instance *ctx) {
+ if (NULL == ctx)
return;
- }
-
- completion_init(&ctx->rrdengine_completion);
- cmd.opcode = RRDENG_QUIESCE;
- rrdeng_enq_cmd(&ctx->worker_config, &cmd);
- /* wait for dbengine to quiesce */
- completion_wait_for(&ctx->rrdengine_completion);
- completion_destroy(&ctx->rrdengine_completion);
+ // FIXME - ktsaou - properly cleanup ctx
+ // 1. make sure all collectors are stopped
- //metalog_prepare_exit(ctx->metalog_ctx);
+ completion_init(&ctx->quiesce.completion);
+ rrdeng_enq_cmd(ctx, RRDENG_OPCODE_CTX_QUIESCE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
}
-RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) {
- RRDENG_SIZE_STATS stats = { 0 };
+static void populate_v2_statistics(struct rrdengine_datafile *datafile, RRDENG_SIZE_STATS *stats)
+{
+ struct journal_v2_header *j2_header = journalfile_v2_data_acquire(datafile->journalfile, NULL, 0, 0);
+ void *data_start = (void *)j2_header;
- for(struct pg_cache_page_index *page_index = ctx->pg_cache.metrics_index.last_page_index;
- page_index != NULL ;page_index = page_index->prev) {
- stats.metrics++;
- stats.metrics_pages += page_index->page_count;
+ if(unlikely(!j2_header))
+ return;
+
+ stats->extents += j2_header->extent_count;
+
+ unsigned entries;
+ struct journal_extent_list *extent_list = (void *) (data_start + j2_header->extent_offset);
+ for (entries = 0; entries < j2_header->extent_count; entries++) {
+ stats->extents_compressed_bytes += extent_list->datafile_size;
+ stats->extents_pages += extent_list->pages;
+ extent_list++;
}
- for(struct rrdengine_datafile *df = ctx->datafiles.first; df ;df = df->next) {
- stats.datafiles++;
+ struct journal_metric_list *metric = (void *) (data_start + j2_header->metric_offset);
+ time_t journal_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
- for(struct extent_info *ei = df->extents.first; ei ; ei = ei->next) {
- stats.extents++;
- stats.extents_compressed_bytes += ei->size;
+ stats->metrics += j2_header->metric_count;
+ for (entries = 0; entries < j2_header->metric_count; entries++) {
- for(int p = 0; p < ei->number_of_pages ;p++) {
- struct rrdeng_page_descr *descr = ei->pages[p];
+ struct journal_page_header *metric_list_header = (void *) (data_start + metric->page_offset);
+ stats->metrics_pages += metric_list_header->entries;
+ struct journal_page_list *descr = (void *) (data_start + metric->page_offset + sizeof(struct journal_page_header));
+ for (uint32_t idx=0; idx < metric_list_header->entries; idx++) {
- usec_t update_every_usec;
+ time_t update_every_s;
- size_t points = descr->page_length / PAGE_POINT_SIZE_BYTES(descr);
+ size_t points = descr->page_length / CTX_POINT_SIZE_BYTES(datafile->ctx);
- if(likely(points > 1))
- update_every_usec = (descr->end_time_ut - descr->start_time_ut) / (points - 1);
- else {
- update_every_usec = default_rrd_update_every * get_tier_grouping(ctx->tier) * USEC_PER_SEC;
- stats.single_point_pages++;
- }
+ time_t start_time_s = journal_start_time_s + descr->delta_start_s;
+ time_t end_time_s = journal_start_time_s + descr->delta_end_s;
- time_t duration_secs = (time_t)((descr->end_time_ut - descr->start_time_ut + update_every_usec)/USEC_PER_SEC);
+ if(likely(points > 1))
+ update_every_s = (time_t) ((end_time_s - start_time_s) / (points - 1));
+ else {
+ update_every_s = (time_t) (default_rrd_update_every * get_tier_grouping(datafile->ctx->config.tier));
+ stats->single_point_pages++;
+ }
- stats.extents_pages++;
- stats.pages_uncompressed_bytes += descr->page_length;
- stats.pages_duration_secs += duration_secs;
- stats.points += points;
+ time_t duration_s = (time_t)((end_time_s - start_time_s + update_every_s));
- stats.page_types[descr->type].pages++;
- stats.page_types[descr->type].pages_uncompressed_bytes += descr->page_length;
- stats.page_types[descr->type].pages_duration_secs += duration_secs;
- stats.page_types[descr->type].points += points;
+ stats->pages_uncompressed_bytes += descr->page_length;
+ stats->pages_duration_secs += duration_s;
+ stats->points += points;
- if(!stats.first_t || (descr->start_time_ut - update_every_usec) < stats.first_t)
- stats.first_t = (descr->start_time_ut - update_every_usec) / USEC_PER_SEC;
+ stats->page_types[descr->type].pages++;
+ stats->page_types[descr->type].pages_uncompressed_bytes += descr->page_length;
+ stats->page_types[descr->type].pages_duration_secs += duration_s;
+ stats->page_types[descr->type].points += points;
- if(!stats.last_t || descr->end_time_ut > stats.last_t)
- stats.last_t = descr->end_time_ut / USEC_PER_SEC;
- }
+ if(!stats->first_time_s || (start_time_s - update_every_s) < stats->first_time_s)
+ stats->first_time_s = (start_time_s - update_every_s);
+
+ if(!stats->last_time_s || end_time_s > stats->last_time_s)
+ stats->last_time_s = end_time_s;
+
+ descr++;
}
+ metric++;
}
+ journalfile_v2_data_release(datafile->journalfile);
+}
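
The new statistics walker reads the v2 journal entirely through byte offsets stored in its header, and each page stores its start and end times as deltas from the journal start time. The sketch below shows the same offset-plus-delta walk over a toy in-memory layout; the structs are simplified stand-ins, not the on-disk Netdata format:

    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>
    #include <time.h>

    struct toy_header { uint32_t page_count; uint32_t page_offset; uint64_t start_time_s; };
    struct toy_page   { uint32_t delta_start_s; uint32_t delta_end_s; uint32_t page_length; };

    static void walk(const uint8_t *data_start) {
        const struct toy_header *hdr = (const struct toy_header *)data_start;
        const struct toy_page *page = (const struct toy_page *)(data_start + hdr->page_offset);

        for (uint32_t i = 0; i < hdr->page_count; i++, page++) {
            time_t start_s = (time_t)(hdr->start_time_s + page->delta_start_s);
            time_t end_s   = (time_t)(hdr->start_time_s + page->delta_end_s);
            printf("page %u: %ld..%ld, %u bytes\n",
                   i, (long)start_s, (long)end_s, page->page_length);
        }
    }

    int main(void) {
        /* build a tiny in-memory "journal": header followed by two page entries */
        _Alignas(8) uint8_t buffer[sizeof(struct toy_header) + 2 * sizeof(struct toy_page)];
        struct toy_header hdr = {
            .page_count   = 2,
            .page_offset  = sizeof(struct toy_header),
            .start_time_s = 1700000000,
        };
        struct toy_page pages[2] = {
            { .delta_start_s = 0,  .delta_end_s = 60,  .page_length = 4096 },
            { .delta_start_s = 60, .delta_end_s = 120, .page_length = 4096 },
        };
        memcpy(buffer, &hdr, sizeof(hdr));
        memcpy(buffer + sizeof(hdr), pages, sizeof(pages));

        walk(buffer);
        return 0;
    }
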
+
+RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) {
+ RRDENG_SIZE_STATS stats = { 0 };
- stats.currently_collected_metrics = ctx->stats.metric_API_producers;
- stats.max_concurrently_collected_metrics = ctx->metric_API_max_producers;
+ uv_rwlock_rdlock(&ctx->datafiles.rwlock);
+ for(struct rrdengine_datafile *df = ctx->datafiles.first; df ;df = df->next) {
+ stats.datafiles++;
+ populate_v2_statistics(df, &stats);
+ }
+ uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
+
+ stats.currently_collected_metrics = __atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED);
internal_error(stats.metrics_pages != stats.extents_pages + stats.currently_collected_metrics,
"DBENGINE: metrics pages is %zu, but extents pages is %zu and API consumers is %zu",
stats.metrics_pages, stats.extents_pages, stats.currently_collected_metrics);
- stats.disk_space = ctx->disk_space;
- stats.max_disk_space = ctx->max_disk_space;
+ stats.disk_space = ctx_current_disk_space_get(ctx);
+ stats.max_disk_space = ctx->config.max_disk_space;
- stats.database_retention_secs = (time_t)(stats.last_t - stats.first_t);
+ stats.database_retention_secs = (time_t)(stats.last_time_s - stats.first_time_s);
if(stats.extents_pages)
stats.average_page_size_bytes = (double)stats.pages_uncompressed_bytes / (double)stats.extents_pages;
@@ -1252,21 +1386,22 @@ RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) {
}
}
- stats.sizeof_metric = struct_natural_alignment(sizeof(struct pg_cache_page_index) + sizeof(struct pg_alignment));
- stats.sizeof_page = struct_natural_alignment(sizeof(struct rrdeng_page_descr));
+// stats.sizeof_metric = 0;
stats.sizeof_datafile = struct_natural_alignment(sizeof(struct rrdengine_datafile)) + struct_natural_alignment(sizeof(struct rrdengine_journalfile));
- stats.sizeof_page_in_cache = struct_natural_alignment(sizeof(struct page_cache_descr));
- stats.sizeof_point_data = page_type_size[ctx->page_type];
- stats.sizeof_page_data = RRDENG_BLOCK_SIZE;
+ stats.sizeof_page_in_cache = 0; // struct_natural_alignment(sizeof(struct page_cache_descr));
+ stats.sizeof_point_data = page_type_size[ctx->config.page_type];
+ stats.sizeof_page_data = tier_page_size[ctx->config.tier];
stats.pages_per_extent = rrdeng_pages_per_extent;
- stats.sizeof_extent = sizeof(struct extent_info);
- stats.sizeof_page_in_extent = sizeof(struct rrdeng_page_descr *);
-
- stats.sizeof_metric_in_index = 40;
- stats.sizeof_page_in_index = 24;
+// stats.sizeof_metric_in_index = 40;
+// stats.sizeof_page_in_index = 24;
- stats.default_granularity_secs = (size_t)default_rrd_update_every * get_tier_grouping(ctx->tier);
+ stats.default_granularity_secs = (size_t)default_rrd_update_every * get_tier_grouping(ctx->config.tier);
return stats;
}
+
+struct rrdeng_cache_efficiency_stats rrdeng_get_cache_efficiency_stats(void) {
+ // FIXME - make cache efficiency stats atomic
+ return rrdeng_cache_efficiency_stats;
+}