diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-08-12 07:26:11 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-08-12 07:26:11 +0000 |
commit | 3c315f0fff93aa072472abc10815963ac0035268 (patch) | |
tree | a95f6a96e0e7bd139c010f8dc60b40e5b3062a99 /database/engine/rrdengineapi.c | |
parent | Adding upstream version 1.35.1. (diff) | |
download | netdata-3c315f0fff93aa072472abc10815963ac0035268.tar.xz netdata-3c315f0fff93aa072472abc10815963ac0035268.zip |
Adding upstream version 1.36.0.upstream/1.36.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/engine/rrdengineapi.c')
-rwxr-xr-x | database/engine/rrdengineapi.c | 766 |
1 files changed, 411 insertions, 355 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index 76010a7c2..f4da29407 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -2,17 +2,43 @@ #include "rrdengine.h" /* Default global database instance */ -struct rrdengine_instance multidb_ctx; +struct rrdengine_instance multidb_ctx_storage_tier0; +struct rrdengine_instance multidb_ctx_storage_tier1; +struct rrdengine_instance multidb_ctx_storage_tier2; +struct rrdengine_instance multidb_ctx_storage_tier3; +struct rrdengine_instance multidb_ctx_storage_tier4; +#if RRD_STORAGE_TIERS != 5 +#error RRD_STORAGE_TIERS is not 5 - you need to add allocations here +#endif +struct rrdengine_instance *multidb_ctx[RRD_STORAGE_TIERS]; +uint8_t tier_page_type[RRD_STORAGE_TIERS] = {PAGE_METRICS, PAGE_TIER, PAGE_TIER, PAGE_TIER, PAGE_TIER}; + +#if PAGE_TYPE_MAX != 1 +#error PAGE_TYPE_MAX is not 1 - you need to add allocations here +#endif +size_t page_type_size[256] = {sizeof(storage_number), sizeof(storage_number_tier1_t)}; + +__attribute__((constructor)) void initialize_multidb_ctx(void) { + multidb_ctx[0] = &multidb_ctx_storage_tier0; + multidb_ctx[1] = &multidb_ctx_storage_tier1; + multidb_ctx[2] = &multidb_ctx_storage_tier2; + multidb_ctx[3] = &multidb_ctx_storage_tier3; + multidb_ctx[4] = &multidb_ctx_storage_tier4; +} +int db_engine_use_malloc = 0; +int default_rrdeng_page_fetch_timeout = 3; +int default_rrdeng_page_fetch_retries = 3; int default_rrdeng_page_cache_mb = 32; int default_rrdeng_disk_quota_mb = 256; int default_multidb_disk_quota_mb = 256; /* Default behaviour is to unblock data collection if the page cache is full of dirty pages by dropping metrics */ uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1; -static inline struct rrdengine_instance *get_rrdeng_ctx_from_host(RRDHOST *host) -{ - return host->rrdeng_ctx; +static inline struct rrdengine_instance *get_rrdeng_ctx_from_host(RRDHOST *host, int tier) { + if(tier < 0 || tier >= RRD_STORAGE_TIERS) tier = 0; + if(!host->storage_instance[tier]) tier = 0; + return (struct rrdengine_instance *)host->storage_instance[tier]; } /* This UUID is not unique across hosts */ @@ -49,10 +75,20 @@ void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uu memcpy(ret_uuid, hash_value, sizeof(uuid_t)); } -void rrdeng_metric_init(RRDDIM *rd) -{ - struct page_cache *pg_cache; +struct rrdeng_metric_handle { + RRDDIM *rd; struct rrdengine_instance *ctx; + uuid_t *rrdeng_uuid; // database engine metric UUID + struct pg_cache_page_index *page_index; +}; + +void rrdeng_metric_free(STORAGE_METRIC_HANDLE *db_metric_handle) { + freez(db_metric_handle); +} + +STORAGE_METRIC_HANDLE *rrdeng_metric_init(RRDDIM *rd, STORAGE_INSTANCE *db_instance) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + struct page_cache *pg_cache; uuid_t legacy_uuid; uuid_t multihost_legacy_uuid; Pvoid_t *PValue; @@ -60,15 +96,10 @@ void rrdeng_metric_init(RRDDIM *rd) int is_multihost_child = 0; RRDHOST *host = rd->rrdset->rrdhost; - ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); - if (unlikely(!ctx)) { - error("Failed to fetch multidb context"); - return; - } pg_cache = &ctx->pg_cache; rrdeng_generate_legacy_uuid(rd->id, rd->rrdset->id, &legacy_uuid); - if (host != localhost && host->rrdeng_ctx == &multidb_ctx) + if (host != localhost && is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) is_multihost_child = 1; uv_rwlock_rdlock(&pg_cache->metrics_index.lock); @@ -82,16 +113,16 @@ void rrdeng_metric_init(RRDDIM *rd) * Drop legacy support, normal path */ uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &rd->state->metric_uuid, sizeof(uuid_t)); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &rd->metric_uuid, sizeof(uuid_t)); if (likely(NULL != PValue)) { page_index = *PValue; } uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); if (NULL == PValue) { uv_rwlock_wrlock(&pg_cache->metrics_index.lock); - PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &rd->state->metric_uuid, sizeof(uuid_t), PJE0); + PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &rd->metric_uuid, sizeof(uuid_t), PJE0); fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ - *PValue = page_index = create_page_index(&rd->state->metric_uuid); + *PValue = page_index = create_page_index(&rd->metric_uuid); page_index->prev = pg_cache->metrics_index.last_page_index; pg_cache->metrics_index.last_page_index = page_index; uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); @@ -102,84 +133,98 @@ void rrdeng_metric_init(RRDDIM *rd) rrdeng_convert_legacy_uuid_to_multihost(rd->rrdset->rrdhost->machine_guid, &legacy_uuid, &multihost_legacy_uuid); - int need_to_store = uuid_compare(rd->state->metric_uuid, multihost_legacy_uuid); + int need_to_store = uuid_compare(rd->metric_uuid, multihost_legacy_uuid); - uuid_copy(rd->state->metric_uuid, multihost_legacy_uuid); + uuid_copy(rd->metric_uuid, multihost_legacy_uuid); - if (unlikely(need_to_store)) - (void)sql_store_dimension(&rd->state->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor, + if (unlikely(need_to_store && !ctx->tier)) + (void)sql_store_dimension(&rd->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor, rd->algorithm); - } - rd->state->rrdeng_uuid = &page_index->id; - rd->state->page_index = page_index; + + struct rrdeng_metric_handle *mh = mallocz(sizeof(struct rrdeng_metric_handle)); + mh->rd = rd; + mh->ctx = ctx; + mh->rrdeng_uuid = &page_index->id; + mh->page_index = page_index; + return (STORAGE_METRIC_HANDLE *)mh; } /* * Gets a handle for storing metrics to the database. * The handle must be released with rrdeng_store_metric_final(). */ -void rrdeng_store_metric_init(RRDDIM *rd) -{ +STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle) { + struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle; + struct rrdeng_collect_handle *handle; - struct rrdengine_instance *ctx; struct pg_cache_page_index *page_index; - ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); - handle = callocz(1, sizeof(struct rrdeng_collect_handle)); - handle->ctx = ctx; + handle->metric_handle = metric_handle; + handle->ctx = metric_handle->ctx; handle->descr = NULL; - handle->prev_descr = NULL; handle->unaligned_page = 0; - rd->state->handle = (STORAGE_COLLECT_HANDLE *)handle; - page_index = rd->state->page_index; + page_index = metric_handle->page_index; uv_rwlock_wrlock(&page_index->lock); ++page_index->writers; uv_rwlock_wrunlock(&page_index->lock); + + return (STORAGE_COLLECT_HANDLE *)handle; } /* The page must be populated and referenced */ static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr) { - unsigned i; - uint8_t has_only_empty_metrics = 1; - storage_number *page; + switch(descr->type) { + case PAGE_METRICS: { + size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr); + storage_number *array = (storage_number *)descr->pg_cache_descr->page; + for (size_t i = 0 ; i < slots; ++i) { + if(does_storage_number_exist(array[i])) + return 0; + } + } + break; + + case PAGE_TIER: { + size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr); + storage_number_tier1_t *array = (storage_number_tier1_t *)descr->pg_cache_descr->page; + for (size_t i = 0 ; i < slots; ++i) { + if(fpclassify(array[i].sum_value) != FP_NAN) + return 0; + } + } + break; - page = descr->pg_cache_descr->page; - for (i = 0 ; i < descr->page_length / sizeof(storage_number); ++i) { - if (SN_EMPTY_SLOT != page[i]) { - has_only_empty_metrics = 0; - break; + default: { + static bool logged = false; + if(!logged) { + error("DBENGINE: cannot check page for nulls on unknown page type id %d", descr->type); + logged = true; + } + return 0; } } - return has_only_empty_metrics; + + return 1; } -void rrdeng_store_metric_flush_current_page(RRDDIM *rd) -{ - struct rrdeng_collect_handle *handle; - struct rrdengine_instance *ctx; - struct rrdeng_page_descr *descr; +void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) { + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + // struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle; + struct rrdengine_instance *ctx = handle->ctx; + struct rrdeng_page_descr *descr = handle->descr; + + if (unlikely(!ctx)) return; + if (unlikely(!descr)) return; - handle = (struct rrdeng_collect_handle *)rd->state->handle; - ctx = handle->ctx; - if (unlikely(!ctx)) - return; - descr = handle->descr; - if (unlikely(NULL == descr)) { - return; - } if (likely(descr->page_length)) { int page_is_empty; rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); - if (handle->prev_descr) { - /* unpin old second page */ - pg_cache_put(ctx, handle->prev_descr); - } page_is_empty = page_has_only_empty_metrics(descr); if (page_is_empty) { debug(D_RRDENGINE, "Page has empty metrics only, deleting:"); @@ -187,41 +232,34 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd) print_page_cache_descr(descr); pg_cache_put(ctx, descr); pg_cache_punch_hole(ctx, descr, 1, 0, NULL); - handle->prev_descr = NULL; - } else { - /* - * Disable pinning for now as it leads to deadlocks. When a collector stops collecting the extra pinned page - * eventually gets rotated but it cannot be destroyed due to the extra reference. - */ - /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */ -/* rrdeng_page_descr_mutex_lock(ctx, descr); - ret = pg_cache_try_get_unsafe(descr, 0); - rrdeng_page_descr_mutex_unlock(ctx, descr); - fatal_assert(1 == ret);*/ - + } else rrdeng_commit_page(ctx, descr, handle->page_correlation_id); - /* handle->prev_descr = descr;*/ - } } else { dbengine_page_free(descr->pg_cache_descr->page); rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr); - freez(descr); + rrdeng_page_descr_freez(descr); } handle->descr = NULL; } -void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number) +void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, + usec_t point_in_time, + NETDATA_DOUBLE n, + NETDATA_DOUBLE min_value, + NETDATA_DOUBLE max_value, + uint16_t count, + uint16_t anomaly_count, + SN_FLAGS flags) { - struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)rd->state->handle; - struct rrdengine_instance *ctx; - struct page_cache *pg_cache; - struct rrdeng_page_descr *descr; - storage_number *page; - uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0; + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle; + struct rrdengine_instance *ctx = handle->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = handle->descr; + RRDDIM *rd = metric_handle->rd; - ctx = handle->ctx; - pg_cache = &ctx->pg_cache; - descr = handle->descr; + void *page; + uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0; if (descr) { /* Make alignment decisions */ @@ -231,7 +269,7 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n perfect_page_alignment = 1; } /* is the metric far enough out of alignment with the others? */ - if (unlikely(descr->page_length + sizeof(number) < rd->rrdset->rrddim_page_alignment)) { + if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < rd->rrdset->rrddim_page_alignment)) { handle->unaligned_page = 1; debug(D_RRDENGINE, "Metric page is not aligned with chart:"); if (unlikely(debug_flags & D_RRDENGINE)) @@ -239,18 +277,18 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n } if (unlikely(handle->unaligned_page && /* did the other metrics change page? */ - rd->rrdset->rrddim_page_alignment <= sizeof(number))) { + rd->rrdset->rrddim_page_alignment <= PAGE_POINT_SIZE_BYTES(descr))) { debug(D_RRDENGINE, "Flushing unaligned metric page."); must_flush_unaligned_page = 1; handle->unaligned_page = 0; } } if (unlikely(NULL == descr || - descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE || + descr->page_length + PAGE_POINT_SIZE_BYTES(descr) > RRDENG_BLOCK_SIZE || must_flush_unaligned_page)) { - rrdeng_store_metric_flush_current_page(rd); + rrdeng_store_metric_flush_current_page(collection_handle); - page = rrdeng_create_page(ctx, &rd->state->page_index->id, &descr); + page = rrdeng_create_page(ctx, &metric_handle->page_index->id, &descr); fatal_assert(page); handle->descr = descr; @@ -262,9 +300,37 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n perfect_page_alignment = 1; } } + page = descr->pg_cache_descr->page; - page[descr->page_length / sizeof(number)] = number; - pg_cache_atomic_set_pg_info(descr, point_in_time, descr->page_length + sizeof(number)); + + switch (descr->type) { + case PAGE_METRICS: { + ((storage_number *)page)[descr->page_length / PAGE_POINT_SIZE_BYTES(descr)] = pack_storage_number(n, flags); + } + break; + + case PAGE_TIER: { + storage_number_tier1_t number_tier1; + number_tier1.sum_value = (float)n; + number_tier1.min_value = (float)min_value; + number_tier1.max_value = (float)max_value; + number_tier1.anomaly_count = anomaly_count; + number_tier1.count = count; + ((storage_number_tier1_t *)page)[descr->page_length / PAGE_POINT_SIZE_BYTES(descr)] = number_tier1; + } + break; + + default: { + static bool logged = false; + if(!logged) { + error("DBENGINE: cannot store metric on unknown page type id %d", descr->type); + logged = true; + } + } + break; + } + + pg_cache_atomic_set_pg_info(descr, point_in_time, descr->page_length + PAGE_POINT_SIZE_BYTES(descr)); if (perfect_page_alignment) rd->rrdset->rrddim_page_alignment = descr->page_length; @@ -284,9 +350,9 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n } } - pg_cache_insert(ctx, rd->state->page_index, descr); + pg_cache_insert(ctx, metric_handle->page_index, descr); } else { - pg_cache_add_new_metric_time(rd->state->page_index, descr); + pg_cache_add_new_metric_time(metric_handle->page_index, descr); } } @@ -294,21 +360,14 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n * Releases the database reference from the handle for storing metrics. * Returns 1 if it's safe to delete the dimension. */ -int rrdeng_store_metric_finalize(RRDDIM *rd) -{ - struct rrdeng_collect_handle *handle; - struct rrdengine_instance *ctx; - struct pg_cache_page_index *page_index; +int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle; + struct pg_cache_page_index *page_index = metric_handle->page_index; + uint8_t can_delete_metric = 0; - handle = (struct rrdeng_collect_handle *)rd->state->handle; - ctx = handle->ctx; - page_index = rd->state->page_index; - rrdeng_store_metric_flush_current_page(rd); - if (handle->prev_descr) { - /* unpin old second page */ - pg_cache_put(ctx, handle->prev_descr); - } + rrdeng_store_metric_flush_current_page(collection_handle); uv_rwlock_wrlock(&page_index->lock); if (!--page_index->writers && !page_index->page_count) { can_delete_metric = 1; @@ -316,241 +375,55 @@ int rrdeng_store_metric_finalize(RRDDIM *rd) uv_rwlock_wrunlock(&page_index->lock); freez(handle); - return can_delete_metric; -} - -/* Returns 1 if the data collection interval is well defined, 0 otherwise */ -static int metrics_with_known_interval(struct rrdeng_page_descr *descr) -{ - unsigned page_entries; - - if (unlikely(INVALID_TIME == descr->start_time || INVALID_TIME == descr->end_time)) - return 0; - page_entries = descr->page_length / sizeof(storage_number); - if (likely(page_entries > 1)) { - return 1; - } - return 0; -} - -static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info) -{ - return (uint32_t *)&page_info->scratch[0]; -} - -static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info) -{ - return (uint32_t *)&page_info->scratch[sizeof(uint32_t)]; -} - -/** - * Calculates the regions of different data collection intervals in a netdata chart in the time range - * [start_time,end_time]. This call takes the netdata chart read lock. - * @param st the netdata chart whose data collection interval boundaries are calculated. - * @param start_time inclusive starting time in usec - * @param end_time inclusive ending time in usec - * @param region_info_arrayp It allocates (*region_info_arrayp) and populates it with information of regions of a - * reference dimension that that have different data collection intervals and overlap with the time range - * [start_time,end_time]. The caller must free (*region_info_arrayp) with freez(). If region_info_arrayp is set - * to NULL nothing was allocated. - * @param max_intervalp is dereferenced and set to be the largest data collection interval of all regions. - * @return number of regions with different data collection intervals. - */ -unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time, - struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp, struct context_param *context_param_list) -{ - struct pg_cache_page_index *page_index; - struct rrdengine_instance *ctx; - unsigned pages_nr; - RRDDIM *rd_iter, *rd; - struct rrdeng_page_info *page_info_array, *curr, *prev, *old_prev; - unsigned i, j, page_entries, region_points, page_points, regions, max_interval; - time_t now; - usec_t dt, current_position_time, max_time = 0, min_time, curr_time, first_valid_time_in_page; - struct rrdeng_region_info *region_info_array; - uint8_t is_first_region_initialized; - - ctx = get_rrdeng_ctx_from_host(st->rrdhost); - regions = 1; - *max_intervalp = max_interval = 0; - region_info_array = NULL; - *region_info_arrayp = NULL; - page_info_array = NULL; - - RRDDIM *temp_rd = context_param_list ? context_param_list->rd : NULL; - rrdset_rdlock(st); - for(rd_iter = temp_rd?temp_rd:st->dimensions, rd = NULL, min_time = (usec_t)-1 ; rd_iter ; rd_iter = rd_iter->next) { - /* - * Choose oldest dimension as reference. This is not equivalent to the union of all dimensions - * but it is a best effort approximation with a bias towards older metrics in a chart. It - * matches netdata behaviour in the sense that dimensions are generally aligned in a chart - * and older dimensions contain more information about the time range. It does not work well - * for metrics that have recently stopped being collected. - */ - curr_time = pg_cache_oldest_time_in_range(ctx, rd_iter->state->rrdeng_uuid, - start_time * USEC_PER_SEC, end_time * USEC_PER_SEC); - if (INVALID_TIME != curr_time && curr_time < min_time) { - rd = rd_iter; - min_time = curr_time; - } - } - rrdset_unlock(st); - if (NULL == rd) { - return 1; - } - pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, - &page_info_array, &page_index); - if (pages_nr) { - /* conservative allocation, will reduce the size later if necessary */ - region_info_array = mallocz(sizeof(*region_info_array) * pages_nr); - } - is_first_region_initialized = 0; - region_points = 0; - - int is_out_of_order_reported = 0; - /* pages loop */ - for (i = 0, curr = NULL, prev = NULL ; i < pages_nr ; ++i) { - old_prev = prev; - prev = curr; - curr = &page_info_array[i]; - *pginfo_to_points(curr) = 0; /* initialize to invalid page */ - *pginfo_to_dt(curr) = 0; /* no known data collection interval yet */ - if (unlikely(INVALID_TIME == curr->start_time || INVALID_TIME == curr->end_time || - curr->end_time < curr->start_time)) { - info("Ignoring page with invalid timestamps."); - prev = old_prev; - continue; - } - page_entries = curr->page_length / sizeof(storage_number); - fatal_assert(0 != page_entries); - if (likely(1 != page_entries)) { - dt = (curr->end_time - curr->start_time) / (page_entries - 1); - *pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(dt); - if (unlikely(0 == *pginfo_to_dt(curr))) - *pginfo_to_dt(curr) = 1; - } else { - dt = 0; - } - for (j = 0, page_points = 0 ; j < page_entries ; ++j) { - uint8_t is_metric_out_of_order, is_metric_earlier_than_range; - - is_metric_earlier_than_range = 0; - is_metric_out_of_order = 0; - - current_position_time = curr->start_time + j * dt; - now = current_position_time / USEC_PER_SEC; - if (now > end_time) { /* there will be no more pages in the time range */ - break; - } - if (now < start_time) - is_metric_earlier_than_range = 1; - if (unlikely(current_position_time < max_time)) /* just went back in time */ - is_metric_out_of_order = 1; - if (is_metric_earlier_than_range || unlikely(is_metric_out_of_order)) { - if (unlikely(is_metric_out_of_order)) - is_out_of_order_reported++; - continue; /* next entry */ - } - /* here is a valid metric */ - ++page_points; - region_info_array[regions - 1].points = ++region_points; - max_time = current_position_time; - if (1 == page_points) - first_valid_time_in_page = current_position_time; - if (unlikely(!is_first_region_initialized)) { - fatal_assert(1 == regions); - /* this is the first region */ - region_info_array[0].start_time = current_position_time; - is_first_region_initialized = 1; - } - } - *pginfo_to_points(curr) = page_points; - if (0 == page_points) { - prev = old_prev; - continue; - } - - if (unlikely(0 == *pginfo_to_dt(curr))) { /* unknown data collection interval */ - fatal_assert(1 == page_points); - - if (likely(NULL != prev)) { /* get interval from previous page */ - *pginfo_to_dt(curr) = *pginfo_to_dt(prev); - } else { /* there is no previous page in the query */ - struct rrdeng_page_info db_page_info; - - /* go to database */ - pg_cache_get_filtered_info_prev(ctx, page_index, curr->start_time, - metrics_with_known_interval, &db_page_info); - if (unlikely(db_page_info.start_time == INVALID_TIME || db_page_info.end_time == INVALID_TIME || - 0 == db_page_info.page_length)) { /* nothing in the database, default to update_every */ - *pginfo_to_dt(curr) = rd->update_every; - } else { - unsigned db_entries; - usec_t db_dt; - - db_entries = db_page_info.page_length / sizeof(storage_number); - db_dt = (db_page_info.end_time - db_page_info.start_time) / (db_entries - 1); - *pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(db_dt); - if (unlikely(0 == *pginfo_to_dt(curr))) - *pginfo_to_dt(curr) = 1; - - } - } - } - if (likely(prev) && unlikely(*pginfo_to_dt(curr) != *pginfo_to_dt(prev))) { - info("Data collection interval change detected in query: %"PRIu32" -> %"PRIu32, - *pginfo_to_dt(prev), *pginfo_to_dt(curr)); - region_info_array[regions++ - 1].points -= page_points; - region_info_array[regions - 1].points = region_points = page_points; - region_info_array[regions - 1].start_time = first_valid_time_in_page; - } - if (*pginfo_to_dt(curr) > max_interval) - max_interval = *pginfo_to_dt(curr); - region_info_array[regions - 1].update_every = *pginfo_to_dt(curr); - } - if (page_info_array) - freez(page_info_array); - if (region_info_array) { - if (likely(is_first_region_initialized)) { - /* free unnecessary memory */ - region_info_array = reallocz(region_info_array, sizeof(*region_info_array) * regions); - *region_info_arrayp = region_info_array; - *max_intervalp = max_interval; - } else { - /* empty result */ - freez(region_info_array); - } - } - if (is_out_of_order_reported) - info("Ignored %d metrics with out of order timestamp in %u regions.", is_out_of_order_reported, regions); - return regions; + return can_delete_metric; } +//static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info) +//{ +// return (uint32_t *)&page_info->scratch[0]; +//} +// +//static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info) +//{ +// return (uint32_t *)&page_info->scratch[sizeof(uint32_t)]; +//} +// /* * Gets a handle for loading metrics from the database. * The handle must be released with rrdeng_load_metric_final(). */ -void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time) +void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time, TIER_QUERY_FETCH tier_query_fetch_type) { + struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle; + struct rrdengine_instance *ctx = metric_handle->ctx; + RRDDIM *rd = metric_handle->rd; + + // fprintf(stderr, "%s: %s/%s start time %ld, end time %ld\n", __FUNCTION__ , rd->rrdset->name, rd->name, start_time, end_time); + struct rrdeng_query_handle *handle; - struct rrdengine_instance *ctx; unsigned pages_nr; - ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); rrdimm_handle->start_time = start_time; rrdimm_handle->end_time = end_time; handle = callocz(1, sizeof(struct rrdeng_query_handle)); handle->next_page_time = start_time; handle->now = start_time; + handle->tier_query_fetch_type = tier_query_fetch_type; + // TODO we should store the dt of each page in each page + // this will produce wrong values for dt in case the user changes + // the update every of the charts or the tier grouping iterations + handle->dt_sec = get_tier_grouping(ctx->tier) * (time_t)rd->update_every; + handle->dt = handle->dt_sec * USEC_PER_SEC; handle->position = 0; handle->ctx = ctx; + handle->metric_handle = metric_handle; handle->descr = NULL; rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle; - pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, + pages_nr = pg_cache_preload(ctx, metric_handle->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, NULL, &handle->page_index); if (unlikely(NULL == handle->page_index || 0 == pages_nr)) - /* there are no metrics to load */ + // there are no metrics to load handle->next_page_time = INVALID_TIME; } @@ -595,7 +468,7 @@ static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) { if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) { // we're in the middle of the page somewhere - unsigned entries = page_length / sizeof(storage_number); + unsigned entries = page_length / PAGE_POINT_SIZE_BYTES(descr); position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) / (page_end_time - descr->start_time); } @@ -605,53 +478,101 @@ static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) { handle->page_end_time = page_end_time; handle->page_length = page_length; handle->page = descr->pg_cache_descr->page; - usec_t entries = handle->entries = page_length / sizeof(storage_number); + usec_t entries = handle->entries = page_length / PAGE_POINT_SIZE_BYTES(descr); if (likely(entries > 1)) handle->dt = (page_end_time - descr->start_time) / (entries - 1); - else - handle->dt = 0; + else { + // TODO we should store the dt of each page in each page + // now we keep the dt of whatever was before + ; + } - handle->dt_sec = handle->dt / USEC_PER_SEC; + handle->dt_sec = (time_t)(handle->dt / USEC_PER_SEC); handle->position = position; return 0; } -/* Returns the metric and sets its timestamp into current_time */ -storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time) { +// Returns the metric and sets its timestamp into current_time +// IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags) +// IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES +STORAGE_POINT rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle) { struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; + // struct rrdeng_metric_handle *metric_handle = handle->metric_handle; - if (unlikely(INVALID_TIME == handle->next_page_time)) - return SN_EMPTY_SLOT; - + STORAGE_POINT sp; struct rrdeng_page_descr *descr = handle->descr; unsigned position = handle->position + 1; time_t now = handle->now + handle->dt_sec; + storage_number_tier1_t tier1_value; + + if (unlikely(INVALID_TIME == handle->next_page_time)) { + handle->next_page_time = INVALID_TIME; + handle->now = now; + storage_point_empty(sp, now - handle->dt_sec, now); + return sp; + } if (unlikely(!descr || position >= handle->entries)) { // We need to get a new page if(rrdeng_load_page_next(rrdimm_handle)) { // next calls will not load any more metrics handle->next_page_time = INVALID_TIME; - return SN_EMPTY_SLOT; + handle->now = now; + storage_point_empty(sp, now - handle->dt_sec, now); + return sp; } descr = handle->descr; position = handle->position; - now = (descr->start_time + position * handle->dt) / USEC_PER_SEC; + now = (time_t)((descr->start_time + position * handle->dt) / USEC_PER_SEC); } - storage_number ret = handle->page[position]; + sp.start_time = now - handle->dt_sec; + sp.end_time = now; + handle->position = position; handle->now = now; + switch(descr->type) { + case PAGE_METRICS: { + storage_number n = handle->page[position]; + sp.min = sp.max = sp.sum = unpack_storage_number(n); + sp.flags = n & SN_USER_FLAGS; + sp.count = 1; + sp.anomaly_count = is_storage_number_anomalous(n) ? 1 : 0; + } + break; + + case PAGE_TIER: { + tier1_value = ((storage_number_tier1_t *)handle->page)[position]; + sp.flags = tier1_value.anomaly_count ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS; + sp.count = tier1_value.count; + sp.anomaly_count = tier1_value.anomaly_count; + sp.min = tier1_value.min_value; + sp.max = tier1_value.max_value; + sp.sum = tier1_value.sum_value; + } + break; + + // we don't know this page type + default: { + static bool logged = false; + if(!logged) { + error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", descr->type); + logged = true; + } + storage_point_empty(sp, sp.start_time, sp.end_time); + } + break; + } + if (unlikely(now >= rrdimm_handle->end_time)) { // next calls will not load any more metrics handle->next_page_time = INVALID_TIME; } - *current_time = now; - return ret; + return sp; } int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) @@ -681,31 +602,27 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) rrdimm_handle->handle = NULL; } -time_t rrdeng_metric_latest_time(RRDDIM *rd) -{ - struct pg_cache_page_index *page_index; - - page_index = rd->state->page_index; +time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) { + struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle; + struct pg_cache_page_index *page_index = metric_handle->page_index; return page_index->latest_time / USEC_PER_SEC; } -time_t rrdeng_metric_oldest_time(RRDDIM *rd) -{ - struct pg_cache_page_index *page_index; - - page_index = rd->state->page_index; +time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) { + struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle; + struct pg_cache_page_index *page_index = metric_handle->page_index; return page_index->oldest_time / USEC_PER_SEC; } -int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t) +int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t, int tier) { struct page_cache *pg_cache; struct rrdengine_instance *ctx; Pvoid_t *PValue; struct pg_cache_page_index *page_index = NULL; - ctx = get_rrdeng_ctx_from_host(localhost); + ctx = get_rrdeng_ctx_from_host(localhost, tier); if (unlikely(!ctx)) { error("Failed to fetch multidb context"); return 1; @@ -728,6 +645,36 @@ int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, t return 1; } +int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t) +{ + struct page_cache *pg_cache; + struct rrdengine_instance *ctx; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + + ctx = (struct rrdengine_instance *)si; + if (unlikely(!ctx)) { + error("DBENGINE: invalid STORAGE INSTANCE to %s()", __FUNCTION__); + return 1; + } + pg_cache = &ctx->pg_cache; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, dim_uuid, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + + if (likely(page_index)) { + *first_entry_t = page_index->oldest_time / USEC_PER_SEC; + *last_entry_t = page_index->latest_time / USEC_PER_SEC; + return 0; + } + + return 1; +} + /* Also gets a reference for the page */ void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr) { @@ -738,6 +685,7 @@ void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrde descr = pg_cache_create_descr(); descr->id = id; /* TODO: add page type: metric, log, something? */ + descr->type = ctx->page_type; page = dbengine_page_alloc(); /*TODO: add page size */ rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; @@ -901,8 +849,7 @@ void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle) * Returns 0 on success, negative on error */ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, - unsigned disk_space_mb) -{ + unsigned disk_space_mb, int tier) { struct rrdengine_instance *ctx; int error; uint32_t max_open_files; @@ -914,19 +861,23 @@ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_p if (rrdeng_reserved_file_descriptors > max_open_files) { error( "Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.", - (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files); + (unsigned)rrdeng_reserved_file_descriptors, + (unsigned)max_open_files); rrd_stat_atomic_add(&global_fs_errors, 1); rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); return UV_EMFILE; } - if (NULL == ctxp) { - ctx = &multidb_ctx; + if(NULL == ctxp) { + ctx = multidb_ctx[tier]; memset(ctx, 0, sizeof(*ctx)); - } else { + } + else { *ctxp = ctx = callocz(1, sizeof(*ctx)); } + ctx->tier = tier; + ctx->page_type = tier_page_type[tier]; ctx->global_compress_alg = RRD_LZ4; if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB) page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB; @@ -979,9 +930,10 @@ error_after_rrdeng_worker: finalize_rrd_files(ctx); error_after_init_rrd_files: free_page_cache(ctx); - if (ctx != &multidb_ctx) { + if (!is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) { freez(ctx); - *ctxp = NULL; + if (ctxp) + *ctxp = NULL; } rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); return UV_EIO; @@ -1008,9 +960,9 @@ int rrdeng_exit(struct rrdengine_instance *ctx) //metalog_exit(ctx->metalog_ctx); free_page_cache(ctx); - if (ctx != &multidb_ctx) { + if(!is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) freez(ctx); - } + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); return 0; } @@ -1034,3 +986,107 @@ void rrdeng_prepare_exit(struct rrdengine_instance *ctx) //metalog_prepare_exit(ctx->metalog_ctx); } +RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) { + RRDENG_SIZE_STATS stats = { 0 }; + + for(struct pg_cache_page_index *page_index = ctx->pg_cache.metrics_index.last_page_index; + page_index != NULL ;page_index = page_index->prev) { + stats.metrics++; + stats.metrics_pages += page_index->page_count; + } + + for(struct rrdengine_datafile *df = ctx->datafiles.first; df ;df = df->next) { + stats.datafiles++; + + for(struct extent_info *ei = df->extents.first; ei ; ei = ei->next) { + stats.extents++; + stats.extents_compressed_bytes += ei->size; + + for(int p = 0; p < ei->number_of_pages ;p++) { + struct rrdeng_page_descr *descr = ei->pages[p]; + + usec_t update_every_usec; + + size_t points = descr->page_length / PAGE_POINT_SIZE_BYTES(descr); + + if(likely(points > 1)) + update_every_usec = (descr->end_time - descr->start_time) / (points - 1); + else { + update_every_usec = default_rrd_update_every * get_tier_grouping(ctx->tier) * USEC_PER_SEC; + stats.single_point_pages++; + } + + time_t duration_secs = (time_t)((descr->end_time - descr->start_time + update_every_usec)/USEC_PER_SEC); + + stats.extents_pages++; + stats.pages_uncompressed_bytes += descr->page_length; + stats.pages_duration_secs += duration_secs; + stats.points += points; + + stats.page_types[descr->type].pages++; + stats.page_types[descr->type].pages_uncompressed_bytes += descr->page_length; + stats.page_types[descr->type].pages_duration_secs += duration_secs; + stats.page_types[descr->type].points += points; + + if(!stats.first_t || (descr->start_time - update_every_usec) < stats.first_t) + stats.first_t = (descr->start_time - update_every_usec) / USEC_PER_SEC; + + if(!stats.last_t || descr->end_time > stats.last_t) + stats.last_t = descr->end_time / USEC_PER_SEC; + } + } + } + + + stats.currently_collected_metrics = ctx->stats.metric_API_producers; + stats.max_concurrently_collected_metrics = ctx->metric_API_max_producers; + + internal_error(stats.metrics_pages != stats.extents_pages + stats.currently_collected_metrics, + "DBENGINE: metrics pages is %zu, but extents pages is %zu and API consumers is %zu", + stats.metrics_pages, stats.extents_pages, stats.currently_collected_metrics); + + stats.disk_space = ctx->disk_space; + stats.max_disk_space = ctx->max_disk_space; + + stats.database_retention_secs = (time_t)(stats.last_t - stats.first_t); + + if(stats.extents_pages) + stats.average_page_size_bytes = (double)stats.pages_uncompressed_bytes / (double)stats.extents_pages; + + if(stats.pages_uncompressed_bytes > 0) + stats.average_compression_savings = 100.0 - ((double)stats.extents_compressed_bytes * 100.0 / (double)stats.pages_uncompressed_bytes); + + if(stats.points) + stats.average_point_duration_secs = (double)stats.pages_duration_secs / (double)stats.points; + + if(stats.metrics) { + stats.average_metric_retention_secs = (double)stats.pages_duration_secs / (double)stats.metrics; + + if(stats.database_retention_secs) { + double metric_coverage = stats.average_metric_retention_secs / (double)stats.database_retention_secs; + double db_retention_days = (double)stats.database_retention_secs / 86400.0; + + stats.estimated_concurrently_collected_metrics = stats.metrics * metric_coverage; + + stats.ephemeral_metrics_per_day_percent = ((double)stats.metrics * 100.0 / (double)stats.estimated_concurrently_collected_metrics - 100.0) / (double)db_retention_days; + } + } + + stats.sizeof_metric = struct_natural_alignment(sizeof(struct pg_cache_page_index)); + stats.sizeof_page = struct_natural_alignment(sizeof(struct rrdeng_page_descr)); + stats.sizeof_datafile = struct_natural_alignment(sizeof(struct rrdengine_datafile)) + struct_natural_alignment(sizeof(struct rrdengine_journalfile)); + stats.sizeof_page_in_cache = struct_natural_alignment(sizeof(struct page_cache_descr)); + stats.sizeof_point_data = page_type_size[ctx->page_type]; + stats.sizeof_page_data = RRDENG_BLOCK_SIZE; + stats.pages_per_extent = rrdeng_pages_per_extent; + + stats.sizeof_extent = sizeof(struct extent_info); + stats.sizeof_page_in_extent = sizeof(struct rrdeng_page_descr *); + + stats.sizeof_metric_in_index = 40; + stats.sizeof_page_in_index = 24; + + stats.default_granularity_secs = (size_t)default_rrd_update_every * get_tier_grouping(ctx->tier); + + return stats; +} |