From 03bf87dcb06f7021bfb2df2fa8691593c6148aff Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 30 Nov 2022 19:47:00 +0100 Subject: Adding upstream version 1.37.0. Signed-off-by: Daniel Baumann --- database/engine/rrdengineapi.c | 606 +++++++++++++++++++++++++++-------------- 1 file changed, 394 insertions(+), 212 deletions(-) (limited to 'database/engine/rrdengineapi.c') diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index f4da29407..27503baee 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -1,5 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdengine.h" +#include "../storage_engine.h" /* Default global database instance */ struct rrdengine_instance multidb_ctx_storage_tier0; @@ -35,14 +36,31 @@ int default_multidb_disk_quota_mb = 256; /* Default behaviour is to unblock data collection if the page cache is full of dirty pages by dropping metrics */ uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1; -static inline struct rrdengine_instance *get_rrdeng_ctx_from_host(RRDHOST *host, int tier) { - if(tier < 0 || tier >= RRD_STORAGE_TIERS) tier = 0; - if(!host->storage_instance[tier]) tier = 0; - return (struct rrdengine_instance *)host->storage_instance[tier]; +// ---------------------------------------------------------------------------- +// metrics groups + +STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid __maybe_unused) { + return callocz(1, sizeof(struct pg_alignment)); +} + +void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) { + if(!smg) return; + + struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + struct pg_alignment *pa = (struct pg_alignment *)smg; + struct page_cache *pg_cache = &ctx->pg_cache; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + if(pa->refcount == 0) + freez(pa); + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); } +// ---------------------------------------------------------------------------- +// metric handle for legacy dbs + /* This UUID is not unique across hosts */ -void rrdeng_generate_legacy_uuid(const char *dim_id, char *chart_id, uuid_t *ret_uuid) +void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_t *ret_uuid) { EVP_MD_CTX *evpctx; unsigned char hash_value[EVP_MAX_MD_SIZE]; @@ -75,98 +93,136 @@ void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uu memcpy(ret_uuid, hash_value, sizeof(uuid_t)); } -struct rrdeng_metric_handle { - RRDDIM *rd; - struct rrdengine_instance *ctx; - uuid_t *rrdeng_uuid; // database engine metric UUID - struct pg_cache_page_index *page_index; -}; +STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id, STORAGE_METRICS_GROUP *smg) { + uuid_t legacy_uuid; + rrdeng_generate_legacy_uuid(rd_id, st_id, &legacy_uuid); + return rrdeng_metric_get(db_instance, &legacy_uuid, smg); +} -void rrdeng_metric_free(STORAGE_METRIC_HANDLE *db_metric_handle) { - freez(db_metric_handle); +// ---------------------------------------------------------------------------- +// metric handle + +void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle) { + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + + unsigned short refcount = __atomic_sub_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST); + if(refcount == 0 && page_index->alignment) { + __atomic_sub_fetch(&page_index->alignment->refcount, 1, __ATOMIC_SEQ_CST); + page_index->alignment = NULL; + } } -STORAGE_METRIC_HANDLE *rrdeng_metric_init(RRDDIM *rd, STORAGE_INSTANCE *db_instance) { +STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle) { + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST); + return db_metric_handle; +} + +STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg) { struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; - struct page_cache *pg_cache; - uuid_t legacy_uuid; - uuid_t multihost_legacy_uuid; - Pvoid_t *PValue; + struct pg_alignment *pa = (struct pg_alignment *)smg; + struct page_cache *pg_cache = &ctx->pg_cache; struct pg_cache_page_index *page_index = NULL; - int is_multihost_child = 0; - RRDHOST *host = rd->rrdset->rrdhost; - - pg_cache = &ctx->pg_cache; - - rrdeng_generate_legacy_uuid(rd->id, rd->rrdset->id, &legacy_uuid); - if (host != localhost && is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) - is_multihost_child = 1; uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &legacy_uuid, sizeof(uuid_t)); - if (likely(NULL != PValue)) { + Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, uuid, sizeof(uuid_t)); + if (likely(NULL != PValue)) page_index = *PValue; - } uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - if (is_multihost_child || NULL == PValue) { - /* First time we see the legacy UUID or metric belongs to child host in multi-host DB. - * Drop legacy support, normal path */ - - uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &rd->metric_uuid, sizeof(uuid_t)); - if (likely(NULL != PValue)) { - page_index = *PValue; - } - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - if (NULL == PValue) { - uv_rwlock_wrlock(&pg_cache->metrics_index.lock); - PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &rd->metric_uuid, sizeof(uuid_t), PJE0); - fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ - *PValue = page_index = create_page_index(&rd->metric_uuid); - page_index->prev = pg_cache->metrics_index.last_page_index; - pg_cache->metrics_index.last_page_index = page_index; - uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); + + if (likely(page_index)) { + __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST); + + if(pa) { + if(page_index->alignment && page_index->alignment != pa && page_index->writers > 0) + fatal("DBENGINE: page_index has a different alignment (page_index refcount is %u, writers is %u).", + page_index->refcount, page_index->writers); + + page_index->alignment = pa; + __atomic_add_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST); } - } else { - /* There are legacy UUIDs in the database, implement backward compatibility */ + } - rrdeng_convert_legacy_uuid_to_multihost(rd->rrdset->rrdhost->machine_guid, &legacy_uuid, - &multihost_legacy_uuid); + return (STORAGE_METRIC_HANDLE *)page_index; +} + +STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg) { + internal_fatal(!db_instance, "DBENGINE: db_instance is NULL"); - int need_to_store = uuid_compare(rd->metric_uuid, multihost_legacy_uuid); + struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + struct pg_alignment *pa = (struct pg_alignment *)smg; + struct pg_cache_page_index *page_index; + struct page_cache *pg_cache = &ctx->pg_cache; - uuid_copy(rd->metric_uuid, multihost_legacy_uuid); + uv_rwlock_wrlock(&pg_cache->metrics_index.lock); + Pvoid_t *PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, uuid, sizeof(uuid_t), PJE0); + fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ + *PValue = page_index = create_page_index(uuid, ctx); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; + page_index->alignment = pa; + page_index->refcount = 1; + if(pa) + pa->refcount++; + uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); + + return (STORAGE_METRIC_HANDLE *)page_index; +} + +STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) { + STORAGE_METRIC_HANDLE *db_metric_handle; + + db_metric_handle = rrdeng_metric_get(db_instance, &rd->metric_uuid, smg); + if(!db_metric_handle) { + db_metric_handle = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset), smg); + if(db_metric_handle) { + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + uuid_copy(rd->metric_uuid, page_index->id); + } + } + if(!db_metric_handle) + db_metric_handle = rrdeng_metric_create(db_instance, &rd->metric_uuid, smg); - if (unlikely(need_to_store && !ctx->tier)) - (void)sql_store_dimension(&rd->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor, - rd->algorithm); +#ifdef NETDATA_INTERNAL_CHECKS + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + if(uuid_compare(rd->metric_uuid, page_index->id) != 0) { + char uuid1[UUID_STR_LEN + 1]; + char uuid2[UUID_STR_LEN + 1]; + + uuid_unparse(rd->metric_uuid, uuid1); + uuid_unparse(page_index->id, uuid2); + fatal("DBENGINE: uuids do not match, asked for metric '%s', but got page_index of metric '%s'", uuid1, uuid2); } - struct rrdeng_metric_handle *mh = mallocz(sizeof(struct rrdeng_metric_handle)); - mh->rd = rd; - mh->ctx = ctx; - mh->rrdeng_uuid = &page_index->id; - mh->page_index = page_index; - return (STORAGE_METRIC_HANDLE *)mh; + struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + if(page_index->ctx != ctx) + fatal("DBENGINE: mixed up rrdengine instances, asked for metric from %p, got from %p", ctx, page_index->ctx); +#endif + + return db_metric_handle; } + +// ---------------------------------------------------------------------------- +// collect ops + /* * Gets a handle for storing metrics to the database. * The handle must be released with rrdeng_store_metric_final(). */ -STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle) { - struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle; - +STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every) { + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; struct rrdeng_collect_handle *handle; - struct pg_cache_page_index *page_index; + + if(!page_index->alignment) + fatal("DBENGINE: metric group is required for collect operations"); handle = callocz(1, sizeof(struct rrdeng_collect_handle)); - handle->metric_handle = metric_handle; - handle->ctx = metric_handle->ctx; + handle->page_index = page_index; handle->descr = NULL; handle->unaligned_page = 0; + page_index->latest_update_every_s = update_every; - page_index = metric_handle->page_index; uv_rwlock_wrlock(&page_index->lock); ++page_index->writers; uv_rwlock_wrunlock(&page_index->lock); @@ -214,7 +270,7 @@ static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr) void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) { struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; // struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle; - struct rrdengine_instance *ctx = handle->ctx; + struct rrdengine_instance *ctx = handle->page_index->ctx; struct rrdeng_page_descr *descr = handle->descr; if (unlikely(!ctx)) return; @@ -227,9 +283,7 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h page_is_empty = page_has_only_empty_metrics(descr); if (page_is_empty) { - debug(D_RRDENGINE, "Page has empty metrics only, deleting:"); - if (unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); + print_page_cache_descr(descr, "Page has empty metrics only, deleting", true); pg_cache_put(ctx, descr); pg_cache_punch_hole(ctx, descr, 1, 0, NULL); } else @@ -242,8 +296,8 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h handle->descr = NULL; } -void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, - usec_t point_in_time, +static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection_handle, + usec_t point_in_time_ut, NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value, @@ -252,11 +306,10 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, SN_FLAGS flags) { struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; - struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle; - struct rrdengine_instance *ctx = handle->ctx; + struct pg_cache_page_index *page_index = handle->page_index; + struct rrdengine_instance *ctx = handle->page_index->ctx; struct page_cache *pg_cache = &ctx->pg_cache; struct rrdeng_page_descr *descr = handle->descr; - RRDDIM *rd = metric_handle->rd; void *page; uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0; @@ -264,21 +317,33 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, if (descr) { /* Make alignment decisions */ - if (descr->page_length == rd->rrdset->rrddim_page_alignment) { +#ifdef NETDATA_INTERNAL_CHECKS + if(descr->end_time_ut + page_index->latest_update_every_s * USEC_PER_SEC != point_in_time_ut) { + char buffer[200 + 1]; + snprintfz(buffer, 200, + "metrics collected are %s, end_time_ut = %llu, point_in_time_ut = %llu, update_every = %u, delta = %llu", + (point_in_time_ut / USEC_PER_SEC - descr->end_time_ut / USEC_PER_SEC > page_index->latest_update_every_s)?"far apart":"not aligned", + descr->end_time_ut / USEC_PER_SEC, + point_in_time_ut / USEC_PER_SEC, + page_index->latest_update_every_s, + point_in_time_ut / USEC_PER_SEC - descr->end_time_ut / USEC_PER_SEC); + print_page_cache_descr(descr, buffer, false); + } +#endif + + if (descr->page_length == page_index->alignment->page_length) { /* this is the leading dimension that defines chart alignment */ perfect_page_alignment = 1; } /* is the metric far enough out of alignment with the others? */ - if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < rd->rrdset->rrddim_page_alignment)) { + if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < page_index->alignment->page_length)) { handle->unaligned_page = 1; - debug(D_RRDENGINE, "Metric page is not aligned with chart:"); - if (unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); + print_page_cache_descr(descr, "Metric page is not aligned with chart", true); } if (unlikely(handle->unaligned_page && /* did the other metrics change page? */ - rd->rrdset->rrddim_page_alignment <= PAGE_POINT_SIZE_BYTES(descr))) { - debug(D_RRDENGINE, "Flushing unaligned metric page."); + page_index->alignment->page_length <= PAGE_POINT_SIZE_BYTES(descr))) { + print_page_cache_descr(descr, "must_flush_unaligned_page = 1", true); must_flush_unaligned_page = 1; handle->unaligned_page = 0; } @@ -286,16 +351,21 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, if (unlikely(NULL == descr || descr->page_length + PAGE_POINT_SIZE_BYTES(descr) > RRDENG_BLOCK_SIZE || must_flush_unaligned_page)) { - rrdeng_store_metric_flush_current_page(collection_handle); - page = rrdeng_create_page(ctx, &metric_handle->page_index->id, &descr); + if(descr) { + print_page_cache_descr(descr, "flushing metric", true); + rrdeng_store_metric_flush_current_page(collection_handle); + } + + page = rrdeng_create_page(ctx, &page_index->id, &descr); fatal_assert(page); + descr->update_every_s = page_index->latest_update_every_s; handle->descr = descr; handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1); - if (0 == rd->rrdset->rrddim_page_alignment) { + if (0 == page_index->alignment->page_length) { /* this is the leading dimension that defines chart alignment */ perfect_page_alignment = 1; } @@ -330,13 +400,13 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, break; } - pg_cache_atomic_set_pg_info(descr, point_in_time, descr->page_length + PAGE_POINT_SIZE_BYTES(descr)); + pg_cache_atomic_set_pg_info(descr, point_in_time_ut, descr->page_length + PAGE_POINT_SIZE_BYTES(descr)); if (perfect_page_alignment) - rd->rrdset->rrddim_page_alignment = descr->page_length; - if (unlikely(INVALID_TIME == descr->start_time)) { + page_index->alignment->page_length = descr->page_length; + if (unlikely(INVALID_TIME == descr->start_time_ut)) { unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers; - descr->start_time = point_in_time; + descr->start_time_ut = point_in_time_ut; new_metric_API_producers = rrd_atomic_add_fetch(&ctx->stats.metric_API_producers, 1); while (unlikely(new_metric_API_producers > (old_metric_API_max_producers = ctx->metric_API_max_producers))) { @@ -350,20 +420,111 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, } } - pg_cache_insert(ctx, metric_handle->page_index, descr); + pg_cache_insert(ctx, page_index, descr); } else { - pg_cache_add_new_metric_time(metric_handle->page_index, descr); + pg_cache_add_new_metric_time(page_index, descr); } + +// { +// unsigned char u[16] = { 0x0C, 0x0A, 0x40, 0xD6, 0x2A, 0x43, 0x4A, 0x7C, 0x95, 0xF7, 0xD1, 0x1E, 0x0C, 0x9E, 0x8A, 0xE7 }; +// if(uuid_compare(u, page_index->id) == 0) { +// char buffer[100]; +// snprintfz(buffer, 100, "store system.cpu, collect:%u, page_index first:%u, last:%u", +// (uint32_t)(point_in_time / USEC_PER_SEC), +// (uint32_t)(page_index->oldest_time / USEC_PER_SEC), +// (uint32_t)(page_index->latest_time / USEC_PER_SEC)); +// +// print_page_cache_descr(descr, buffer, false); +// } +// } } +void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, + usec_t point_in_time_ut, + NETDATA_DOUBLE n, + NETDATA_DOUBLE min_value, + NETDATA_DOUBLE max_value, + uint16_t count, + uint16_t anomaly_count, + SN_FLAGS flags) +{ + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + struct pg_cache_page_index *page_index = handle->page_index; + struct rrdeng_page_descr *descr = handle->descr; + + if(likely(descr)) { + usec_t last_point_in_time_ut = descr->end_time_ut; + usec_t update_every_ut = page_index->latest_update_every_s * USEC_PER_SEC; + size_t points_gap = (point_in_time_ut <= last_point_in_time_ut) ? + (size_t)0 : + (size_t)((point_in_time_ut - last_point_in_time_ut) / update_every_ut); + + if(unlikely(points_gap != 1)) { + if (unlikely(points_gap <= 0)) { + time_t now = now_realtime_sec(); + static __thread size_t counter = 0; + static __thread time_t last_time_logged = 0; + counter++; + + if(now - last_time_logged > 600) { + error("DBENGINE: collected point is in the past (repeated %zu times in the last %zu secs). Ignoring these data collection points.", + counter, (size_t)(last_time_logged?(now - last_time_logged):0)); + + last_time_logged = now; + counter = 0; + } + return; + } + + size_t point_size = PAGE_POINT_SIZE_BYTES(descr); + size_t page_size_in_points = RRDENG_BLOCK_SIZE / point_size; + size_t used_points = descr->page_length / point_size; + size_t remaining_points_in_page = page_size_in_points - used_points; + + bool new_point_is_aligned = true; + if(unlikely((point_in_time_ut - last_point_in_time_ut) / points_gap != update_every_ut)) + new_point_is_aligned = false; + + if(unlikely(points_gap > remaining_points_in_page || !new_point_is_aligned)) { +// char buffer[200]; +// snprintfz(buffer, 200, "data collection skipped %zu points, last stored point %llu, new point %llu, update every %d. Cutting page.", +// points_gap, last_point_in_time_ut / USEC_PER_SEC, point_in_time_ut / USEC_PER_SEC, page_index->latest_update_every_s); +// print_page_cache_descr(descr, buffer, false); + + rrdeng_store_metric_flush_current_page(collection_handle); + } + else { +// char buffer[200]; +// snprintfz(buffer, 200, "data collection skipped %zu points, last stored point %llu, new point %llu, update every %d. Filling the gap.", +// points_gap, last_point_in_time_ut / USEC_PER_SEC, point_in_time_ut / USEC_PER_SEC, page_index->latest_update_every_s); +// print_page_cache_descr(descr, buffer, false); + + // loop to fill the gap + usec_t step_ut = page_index->latest_update_every_s * USEC_PER_SEC; + usec_t last_point_filled_ut = last_point_in_time_ut + step_ut; + + while (last_point_filled_ut < point_in_time_ut) { + rrdeng_store_metric_next_internal( + collection_handle, last_point_filled_ut, NAN, NAN, NAN, + 1, 0, SN_EMPTY_SLOT); + + last_point_filled_ut += step_ut; + } + } + } + } + + rrdeng_store_metric_next_internal(collection_handle, point_in_time_ut, n, min_value, max_value, count, anomaly_count, flags); +} + + /* * Releases the database reference from the handle for storing metrics. * Returns 1 if it's safe to delete the dimension. */ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; - struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle; - struct pg_cache_page_index *page_index = metric_handle->page_index; + struct pg_cache_page_index *page_index = handle->page_index; uint8_t can_delete_metric = 0; @@ -378,6 +539,18 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { return can_delete_metric; } +void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every) { + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + struct pg_cache_page_index *page_index = handle->page_index; + rrdeng_store_metric_flush_current_page(collection_handle); + uv_rwlock_rdlock(&page_index->lock); + page_index->latest_update_every_s = update_every; + uv_rwlock_rdunlock(&page_index->lock); +} + +// ---------------------------------------------------------------------------- +// query ops + //static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info) //{ // return (uint32_t *)&page_info->scratch[0]; @@ -392,49 +565,45 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { * Gets a handle for loading metrics from the database. * The handle must be released with rrdeng_load_metric_final(). */ -void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time, TIER_QUERY_FETCH tier_query_fetch_type) +void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrdimm_handle, time_t start_time_s, time_t end_time_s) { - struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle; - struct rrdengine_instance *ctx = metric_handle->ctx; - RRDDIM *rd = metric_handle->rd; + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + struct rrdengine_instance *ctx = page_index->ctx; // fprintf(stderr, "%s: %s/%s start time %ld, end time %ld\n", __FUNCTION__ , rd->rrdset->name, rd->name, start_time, end_time); struct rrdeng_query_handle *handle; unsigned pages_nr; - rrdimm_handle->start_time = start_time; - rrdimm_handle->end_time = end_time; + if(!page_index->latest_update_every_s) + page_index->latest_update_every_s = default_rrd_update_every; + + rrdimm_handle->start_time_s = start_time_s; + rrdimm_handle->end_time_s = end_time_s; handle = callocz(1, sizeof(struct rrdeng_query_handle)); - handle->next_page_time = start_time; - handle->now = start_time; - handle->tier_query_fetch_type = tier_query_fetch_type; - // TODO we should store the dt of each page in each page - // this will produce wrong values for dt in case the user changes - // the update every of the charts or the tier grouping iterations - handle->dt_sec = get_tier_grouping(ctx->tier) * (time_t)rd->update_every; - handle->dt = handle->dt_sec * USEC_PER_SEC; + handle->wanted_start_time_s = start_time_s; + handle->now_s = start_time_s; handle->position = 0; handle->ctx = ctx; - handle->metric_handle = metric_handle; handle->descr = NULL; + handle->dt_s = page_index->latest_update_every_s; rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle; - pages_nr = pg_cache_preload(ctx, metric_handle->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, + pages_nr = pg_cache_preload(ctx, &page_index->id, start_time_s * USEC_PER_SEC, end_time_s * USEC_PER_SEC, NULL, &handle->page_index); if (unlikely(NULL == handle->page_index || 0 == pages_nr)) // there are no metrics to load - handle->next_page_time = INVALID_TIME; + handle->wanted_start_time_s = INVALID_TIME; } -static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) { +static int rrdeng_load_page_next(struct storage_engine_query_handle *rrdimm_handle, bool debug_this __maybe_unused) { struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; struct rrdengine_instance *ctx = handle->ctx; struct rrdeng_page_descr *descr = handle->descr; uint32_t page_length; - usec_t page_end_time; + usec_t page_end_time_ut; unsigned position; if (likely(descr)) { @@ -446,14 +615,15 @@ static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) { pg_cache_put(ctx, descr); handle->descr = NULL; - handle->next_page_time = (handle->page_end_time / USEC_PER_SEC) + 1; + handle->wanted_start_time_s = (time_t)((handle->page_end_time_ut / USEC_PER_SEC) + handle->dt_s); - if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) + if (unlikely(handle->wanted_start_time_s > rrdimm_handle->end_time_s)) return 1; } - usec_t next_page_time = handle->next_page_time * USEC_PER_SEC; - descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, next_page_time, rrdimm_handle->end_time * USEC_PER_SEC); + usec_t wanted_start_time_ut = handle->wanted_start_time_s * USEC_PER_SEC; + descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, + wanted_start_time_ut, rrdimm_handle->end_time_s * USEC_PER_SEC); if (NULL == descr) return 1; @@ -462,77 +632,116 @@ static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) { #endif handle->descr = descr; - pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length); - if (unlikely(INVALID_TIME == descr->start_time || INVALID_TIME == page_end_time)) + pg_cache_atomic_get_pg_info(descr, &page_end_time_ut, &page_length); + if (unlikely(INVALID_TIME == descr->start_time_ut || INVALID_TIME == page_end_time_ut || 0 == descr->update_every_s)) { + error("DBENGINE: discarding invalid page descriptor (start_time = %llu, end_time = %llu, update_every_s = %d)", + descr->start_time_ut, page_end_time_ut, descr->update_every_s); return 1; + } - if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) { + if (unlikely(descr->start_time_ut != page_end_time_ut && wanted_start_time_ut > descr->start_time_ut)) { // we're in the middle of the page somewhere unsigned entries = page_length / PAGE_POINT_SIZE_BYTES(descr); - position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) / - (page_end_time - descr->start_time); + position = ((uint64_t)(wanted_start_time_ut - descr->start_time_ut)) * (entries - 1) / + (page_end_time_ut - descr->start_time_ut); } else position = 0; - handle->page_end_time = page_end_time; + handle->page_end_time_ut = page_end_time_ut; handle->page_length = page_length; + handle->entries = page_length / PAGE_POINT_SIZE_BYTES(descr); handle->page = descr->pg_cache_descr->page; - usec_t entries = handle->entries = page_length / PAGE_POINT_SIZE_BYTES(descr); - if (likely(entries > 1)) - handle->dt = (page_end_time - descr->start_time) / (entries - 1); - else { - // TODO we should store the dt of each page in each page - // now we keep the dt of whatever was before - ; - } - - handle->dt_sec = (time_t)(handle->dt / USEC_PER_SEC); + handle->dt_s = descr->update_every_s; handle->position = position; +// if(debug_this) +// info("DBENGINE: rrdeng_load_page_next(), " +// "position:%d, " +// "start_time_ut:%llu, " +// "page_end_time_ut:%llu, " +// "next_page_time_ut:%llu, " +// "in_out:%s" +// , position +// , descr->start_time_ut +// , page_end_time_ut +// , +// wanted_start_time_ut, in_out?"true":"false" +// ); + return 0; } // Returns the metric and sets its timestamp into current_time // IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags) // IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES -STORAGE_POINT rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle) { - struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; +STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle) { + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle; // struct rrdeng_metric_handle *metric_handle = handle->metric_handle; - STORAGE_POINT sp; struct rrdeng_page_descr *descr = handle->descr; + time_t now = handle->now_s + handle->dt_s; + +// bool debug_this = false; +// { +// unsigned char u[16] = { 0x0C, 0x0A, 0x40, 0xD6, 0x2A, 0x43, 0x4A, 0x7C, 0x95, 0xF7, 0xD1, 0x1E, 0x0C, 0x9E, 0x8A, 0xE7 }; +// if(uuid_compare(u, handle->page_index->id) == 0) { +// char buffer[100]; +// snprintfz(buffer, 100, "load system.cpu, now:%u, dt:%u, position:%u page_index first:%u, last:%u", +// (uint32_t)(now), +// (uint32_t)(handle->dt_s), +// (uint32_t)(handle->position), +// (uint32_t)(handle->page_index->oldest_time / USEC_PER_SEC), +// (uint32_t)(handle->page_index->latest_time / USEC_PER_SEC)); +// +// print_page_cache_descr(descr, buffer, false); +// debug_this = true; +// } +// } + + STORAGE_POINT sp; unsigned position = handle->position + 1; - time_t now = handle->now + handle->dt_sec; storage_number_tier1_t tier1_value; - if (unlikely(INVALID_TIME == handle->next_page_time)) { - handle->next_page_time = INVALID_TIME; - handle->now = now; - storage_point_empty(sp, now - handle->dt_sec, now); + if (unlikely(INVALID_TIME == handle->wanted_start_time_s)) { + handle->wanted_start_time_s = INVALID_TIME; + handle->now_s = now; + storage_point_empty(sp, now - handle->dt_s, now); return sp; } if (unlikely(!descr || position >= handle->entries)) { // We need to get a new page - if(rrdeng_load_page_next(rrdimm_handle)) { + if(rrdeng_load_page_next(rrddim_handle, false)) { // next calls will not load any more metrics - handle->next_page_time = INVALID_TIME; - handle->now = now; - storage_point_empty(sp, now - handle->dt_sec, now); + handle->wanted_start_time_s = INVALID_TIME; + handle->now_s = now; + storage_point_empty(sp, now - handle->dt_s, now); return sp; } descr = handle->descr; position = handle->position; - now = (time_t)((descr->start_time + position * handle->dt) / USEC_PER_SEC); + now = (time_t)((descr->start_time_ut / USEC_PER_SEC) + position * descr->update_every_s); + +// if(debug_this) { +// char buffer[100]; +// snprintfz(buffer, 100, "NEW PAGE system.cpu, now:%u, dt:%u, position:%u page_index first:%u, last:%u", +// (uint32_t)(now), +// (uint32_t)(handle->dt_s), +// (uint32_t)(handle->position), +// (uint32_t)(handle->page_index->oldest_time / USEC_PER_SEC), +// (uint32_t)(handle->page_index->latest_time / USEC_PER_SEC)); +// +// print_page_cache_descr(descr, buffer, false); +// } } - sp.start_time = now - handle->dt_sec; + sp.start_time = now - handle->dt_s; sp.end_time = now; handle->position = position; - handle->now = now; + handle->now_s = now; switch(descr->type) { case PAGE_METRICS: { @@ -567,24 +776,32 @@ STORAGE_POINT rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle) break; } - if (unlikely(now >= rrdimm_handle->end_time)) { + if (unlikely(now >= rrddim_handle->end_time_s)) { // next calls will not load any more metrics - handle->next_page_time = INVALID_TIME; + handle->wanted_start_time_s = INVALID_TIME; } +// if(debug_this) +// info("DBENGINE: returning point: " +// "time from %ld to %ld // query from %ld to %ld // wanted_start_time_s %ld" +// , sp.start_time, sp.end_time +// , rrddim_handle->start_time_s, rrddim_handle->end_time_s +// , handle->wanted_start_time_s +// ); + return sp; } -int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) +int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrdimm_handle) { struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; - return (INVALID_TIME == handle->next_page_time); + return (INVALID_TIME == handle->wanted_start_time_s); } /* * Releases the database reference from the handle for loading metrics. */ -void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) +void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrdimm_handle) { struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; struct rrdengine_instance *ctx = handle->ctx; @@ -603,46 +820,12 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) } time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) { - struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle; - - struct pg_cache_page_index *page_index = metric_handle->page_index; - return page_index->latest_time / USEC_PER_SEC; + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + return (time_t)(page_index->latest_time_ut / USEC_PER_SEC); } time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) { - struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle; - - struct pg_cache_page_index *page_index = metric_handle->page_index; - return page_index->oldest_time / USEC_PER_SEC; -} - -int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t, int tier) -{ - struct page_cache *pg_cache; - struct rrdengine_instance *ctx; - Pvoid_t *PValue; - struct pg_cache_page_index *page_index = NULL; - - ctx = get_rrdeng_ctx_from_host(localhost, tier); - if (unlikely(!ctx)) { - error("Failed to fetch multidb context"); - return 1; - } - pg_cache = &ctx->pg_cache; - - uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, dim_uuid, sizeof(uuid_t)); - if (likely(NULL != PValue)) { - page_index = *PValue; - } - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - - if (likely(page_index)) { - *first_entry_t = page_index->oldest_time / USEC_PER_SEC; - *last_entry_t = page_index->latest_time / USEC_PER_SEC; - return 0; - } - - return 1; + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + return (time_t)(page_index->oldest_time_ut / USEC_PER_SEC); } int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t) @@ -667,8 +850,8 @@ int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); if (likely(page_index)) { - *first_entry_t = page_index->oldest_time / USEC_PER_SEC; - *last_entry_t = page_index->latest_time / USEC_PER_SEC; + *first_entry_t = page_index->oldest_time_ut / USEC_PER_SEC; + *last_entry_t = page_index->latest_time_ut / USEC_PER_SEC; return 0; } @@ -695,7 +878,7 @@ void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrde debug(D_RRDENGINE, "Created new page:"); if (unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); + print_page_cache_descr(descr, "", true); rrdeng_page_descr_mutex_unlock(ctx, descr); *ret_descr = descr; return page; @@ -767,13 +950,13 @@ void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void ** } /* Gets a reference for the page */ -void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle) +void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time_ut, void **handle) { struct rrdeng_page_descr *descr; struct page_cache_descr *pg_cache_descr; debug(D_RRDENGINE, "Reading existing page:"); - descr = pg_cache_lookup(ctx, NULL, id, point_in_time); + descr = pg_cache_lookup(ctx, NULL, id, point_in_time_ut); if (NULL == descr) { *handle = NULL; @@ -849,7 +1032,7 @@ void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle) * Returns 0 on success, negative on error */ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, - unsigned disk_space_mb, int tier) { + unsigned disk_space_mb, size_t tier) { struct rrdengine_instance *ctx; int error; uint32_t max_open_files; @@ -897,7 +1080,6 @@ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_p ctx->drop_metrics_under_page_cache_pressure = rrdeng_drop_metrics_under_page_cache_pressure; ctx->metric_API_max_producers = 0; ctx->quiesce = NO_QUIESCE; - ctx->metalog_ctx = NULL; /* only set this after the metadata log has finished initializing */ ctx->host = host; memset(&ctx->worker_config, 0, sizeof(ctx->worker_config)); @@ -918,11 +1100,11 @@ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_p if (ctx->worker_config.error) { goto error_after_rrdeng_worker; } - error = metalog_init(ctx); - if (error) { - error("Failed to initialize metadata log file event loop."); - goto error_after_rrdeng_worker; - } +// error = metalog_init(ctx); +// if (error) { +// error("Failed to initialize metadata log file event loop."); +// goto error_after_rrdeng_worker; +// } return 0; @@ -1010,13 +1192,13 @@ RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) { size_t points = descr->page_length / PAGE_POINT_SIZE_BYTES(descr); if(likely(points > 1)) - update_every_usec = (descr->end_time - descr->start_time) / (points - 1); + update_every_usec = (descr->end_time_ut - descr->start_time_ut) / (points - 1); else { update_every_usec = default_rrd_update_every * get_tier_grouping(ctx->tier) * USEC_PER_SEC; stats.single_point_pages++; } - time_t duration_secs = (time_t)((descr->end_time - descr->start_time + update_every_usec)/USEC_PER_SEC); + time_t duration_secs = (time_t)((descr->end_time_ut - descr->start_time_ut + update_every_usec)/USEC_PER_SEC); stats.extents_pages++; stats.pages_uncompressed_bytes += descr->page_length; @@ -1028,11 +1210,11 @@ RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) { stats.page_types[descr->type].pages_duration_secs += duration_secs; stats.page_types[descr->type].points += points; - if(!stats.first_t || (descr->start_time - update_every_usec) < stats.first_t) - stats.first_t = (descr->start_time - update_every_usec) / USEC_PER_SEC; + if(!stats.first_t || (descr->start_time_ut - update_every_usec) < stats.first_t) + stats.first_t = (descr->start_time_ut - update_every_usec) / USEC_PER_SEC; - if(!stats.last_t || descr->end_time > stats.last_t) - stats.last_t = descr->end_time / USEC_PER_SEC; + if(!stats.last_t || descr->end_time_ut > stats.last_t) + stats.last_t = descr->end_time_ut / USEC_PER_SEC; } } } @@ -1072,7 +1254,7 @@ RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) { } } - stats.sizeof_metric = struct_natural_alignment(sizeof(struct pg_cache_page_index)); + stats.sizeof_metric = struct_natural_alignment(sizeof(struct pg_cache_page_index) + sizeof(struct pg_alignment)); stats.sizeof_page = struct_natural_alignment(sizeof(struct rrdeng_page_descr)); stats.sizeof_datafile = struct_natural_alignment(sizeof(struct rrdengine_datafile)) + struct_natural_alignment(sizeof(struct rrdengine_journalfile)); stats.sizeof_page_in_cache = struct_natural_alignment(sizeof(struct page_cache_descr)); -- cgit v1.2.3