diff options
Diffstat (limited to 'database/engine/rrdengineapi.c')
-rwxr-xr-x | database/engine/rrdengineapi.c | 267 |
1 files changed, 118 insertions, 149 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index 27497bbb8..ddc306ed7 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -35,16 +35,16 @@ __attribute__((constructor)) void initialize_multidb_ctx(void) { multidb_ctx[4] = &multidb_ctx_storage_tier4; } -int default_rrdeng_page_fetch_timeout = 3; -int default_rrdeng_page_fetch_retries = 3; int db_engine_journal_check = 0; int default_rrdeng_disk_quota_mb = 256; int default_multidb_disk_quota_mb = 256; #if defined(ENV32BIT) int default_rrdeng_page_cache_mb = 16; +int default_rrdeng_extent_cache_mb = 0; #else int default_rrdeng_page_cache_mb = 32; +int default_rrdeng_extent_cache_mb = 0; #endif // ---------------------------------------------------------------------------- @@ -163,7 +163,7 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE } #ifdef NETDATA_INTERNAL_CHECKS - if(uuid_compare(rd->metric_uuid, *mrg_metric_uuid(main_mrg, metric)) != 0) { + if(uuid_memcmp(&rd->metric_uuid, mrg_metric_uuid(main_mrg, metric)) != 0) { char uuid1[UUID_STR_LEN + 1]; char uuid2[UUID_STR_LEN + 1]; @@ -255,8 +255,11 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri struct rrdeng_collect_handle *handle; handle = callocz(1, sizeof(struct rrdeng_collect_handle)); + handle->common.backend = STORAGE_ENGINE_BACKEND_DBENGINE; handle->metric = metric; handle->page = NULL; + handle->data = NULL; + handle->data_size = 0; handle->page_position = 0; handle->page_entries_max = 0; handle->update_every_ut = (usec_t)update_every * USEC_PER_SEC; @@ -340,6 +343,8 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h handle->page_flags = 0; handle->page_position = 0; handle->page_entries_max = 0; + handle->data = NULL; + handle->data_size = 0; // important! // we should never zero page end time ut, because this will allow @@ -348,6 +353,8 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h // handle->page_start_time_ut; check_and_fix_mrg_update_every(handle); + + timing_step(TIMING_STEP_DBENGINE_FLUSH_PAGE); } static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *handle, @@ -365,7 +372,7 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha .end_time_s = point_in_time_s, .size = data_size, .data = data, - .update_every_s = update_every_s, + .update_every_s = (uint32_t) update_every_s, .hot = true }; @@ -414,62 +421,57 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha handle->page_flags |= RRDENG_PAGE_CREATED_IN_FUTURE; check_and_fix_mrg_update_every(handle); + + timing_step(TIMING_STEP_DBENGINE_CREATE_NEW_PAGE); } -static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle, size_t *data_size, usec_t point_in_time_ut) { - struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric); - size_t size; +static size_t aligned_allocation_entries(size_t max_slots, size_t target_slot, time_t now_s) { + size_t slots = target_slot; + size_t pos = (now_s % max_slots); - if(handle->options & RRDENG_FIRST_PAGE_ALLOCATED) { - // any page except the first - size = tier_page_size[ctx->config.tier]; - } - else { - size_t final_slots = 0; + if(pos > slots) + slots += max_slots - pos; - // the first page - handle->options |= RRDENG_FIRST_PAGE_ALLOCATED; - size_t max_size = tier_page_size[ctx->config.tier]; - size_t max_slots = max_size / CTX_POINT_SIZE_BYTES(ctx); + else if(pos < slots) + slots -= pos; - if(handle->alignment->initial_slots) { - final_slots = handle->alignment->initial_slots; - } - else { - max_slots -= 3; + else + slots = max_slots; - size_t smaller_slot = indexing_partition((Word_t)handle->alignment, max_slots); - final_slots = smaller_slot; + return slots; +} - time_t now_s = (time_t)(point_in_time_ut / USEC_PER_SEC); - size_t current_pos = (now_s % max_slots); +static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle, size_t *data_size, usec_t point_in_time_ut) { + struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric); - if(current_pos > final_slots) - final_slots += max_slots - current_pos; + size_t max_size = tier_page_size[ctx->config.tier]; + size_t max_slots = max_size / CTX_POINT_SIZE_BYTES(ctx); - else if(current_pos < final_slots) - final_slots -= current_pos; + size_t slots = aligned_allocation_entries( + max_slots, + indexing_partition((Word_t) handle->alignment, max_slots), + (time_t) (point_in_time_ut / USEC_PER_SEC) + ); - if(final_slots < 3) { - final_slots += 3; - smaller_slot += 3; + if(slots < max_slots / 3) + slots = max_slots / 3; - if(smaller_slot >= max_slots) - smaller_slot -= max_slots; - } + if(slots < 3) + slots = 3; - max_slots += 3; - handle->alignment->initial_slots = smaller_slot + 3; + size_t size = slots * CTX_POINT_SIZE_BYTES(ctx); - internal_fatal(handle->alignment->initial_slots < 3 || handle->alignment->initial_slots >= max_slots, "ooops! wrong distribution of metrics across time"); - internal_fatal(final_slots < 3 || final_slots >= max_slots, "ooops! wrong distribution of metrics across time"); - } + // internal_error(true, "PAGE ALLOC %zu bytes (%zu max)", size, max_size); - size = final_slots * CTX_POINT_SIZE_BYTES(ctx); - } + internal_fatal(slots < 3 || slots > max_slots, "ooops! wrong distribution of metrics across time"); + internal_fatal(size > tier_page_size[ctx->config.tier] || size < CTX_POINT_SIZE_BYTES(ctx) * 2, "ooops! wrong page size"); *data_size = size; - return dbengine_page_alloc(size); + void *d = dbengine_page_alloc(size); + + timing_step(TIMING_STEP_DBENGINE_PAGE_ALLOC); + + return d; } static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_handle, @@ -484,75 +486,33 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric); - bool perfect_page_alignment = false; - void *data; - size_t data_size; + if(unlikely(!handle->data)) + handle->data = rrdeng_alloc_new_metric_data(handle, &handle->data_size, point_in_time_ut); - if(likely(handle->page)) { - /* Make alignment decisions */ - if (handle->page_position == handle->alignment->page_position) { - /* this is the leading dimension that defines chart alignment */ - perfect_page_alignment = true; - } - - /* is the metric far enough out of alignment with the others? */ - if (unlikely(handle->page_position + 1 < handle->alignment->page_position)) - handle->options |= RRDENG_CHO_UNALIGNED; + timing_step(TIMING_STEP_DBENGINE_CHECK_DATA); - if (unlikely((handle->options & RRDENG_CHO_UNALIGNED) && - /* did the other metrics change page? */ - handle->alignment->page_position <= 1)) { - handle->options &= ~RRDENG_CHO_UNALIGNED; - handle->page_flags |= RRDENG_PAGE_UNALIGNED; - rrdeng_store_metric_flush_current_page(collection_handle); - - data = rrdeng_alloc_new_metric_data(handle, &data_size, point_in_time_ut); - } - else { - data = pgc_page_data(handle->page); - data_size = pgc_page_data_size(main_cache, handle->page); - } + if(likely(ctx->config.page_type == PAGE_METRICS)) { + storage_number *tier0_metric_data = handle->data; + tier0_metric_data[handle->page_position] = pack_storage_number(n, flags); + } + else if(likely(ctx->config.page_type == PAGE_TIER)) { + storage_number_tier1_t *tier12_metric_data = handle->data; + storage_number_tier1_t number_tier1; + number_tier1.sum_value = (float) n; + number_tier1.min_value = (float) min_value; + number_tier1.max_value = (float) max_value; + number_tier1.anomaly_count = anomaly_count; + number_tier1.count = count; + tier12_metric_data[handle->page_position] = number_tier1; } else - data = rrdeng_alloc_new_metric_data(handle, &data_size, point_in_time_ut); - - switch (ctx->config.page_type) { - case PAGE_METRICS: { - storage_number *tier0_metric_data = data; - tier0_metric_data[handle->page_position] = pack_storage_number(n, flags); - } - break; + fatal("DBENGINE: cannot store metric on unknown page type id %d", ctx->config.page_type); - case PAGE_TIER: { - storage_number_tier1_t *tier12_metric_data = data; - storage_number_tier1_t number_tier1; - number_tier1.sum_value = (float)n; - number_tier1.min_value = (float)min_value; - number_tier1.max_value = (float)max_value; - number_tier1.anomaly_count = anomaly_count; - number_tier1.count = count; - tier12_metric_data[handle->page_position] = number_tier1; - } - break; - - default: { - static bool logged = false; - if(!logged) { - error("DBENGINE: cannot store metric on unknown page type id %d", ctx->config.page_type); - logged = true; - } - } - break; - } + timing_step(TIMING_STEP_DBENGINE_PACK); if(unlikely(!handle->page)){ - rrdeng_store_metric_create_new_page(handle, ctx, point_in_time_ut, data, data_size); + rrdeng_store_metric_create_new_page(handle, ctx, point_in_time_ut, handle->data, handle->data_size); // handle->position is set to 1 already - - if (0 == handle->alignment->page_position) { - /* this is the leading dimension that defines chart alignment */ - perfect_page_alignment = true; - } } else { // update an existing page @@ -566,11 +526,12 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_ } } - if (perfect_page_alignment) - handle->alignment->page_position = handle->page_position; + timing_step(TIMING_STEP_DBENGINE_PAGE_FIN); // update the metric information mrg_metric_set_hot_latest_time_s(main_mrg, handle->metric, (time_t) (point_in_time_ut / USEC_PER_SEC)); + + timing_step(TIMING_STEP_DBENGINE_MRG_UPDATE); } static void store_metric_next_error_log(struct rrdeng_collect_handle *handle, usec_t point_in_time_ut, const char *msg) { @@ -612,6 +573,8 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, const uint16_t anomaly_count, const SN_FLAGS flags) { + timing_step(TIMING_STEP_RRDSET_STORE_METRIC); + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; #ifdef NETDATA_INTERNAL_CHECKS @@ -619,59 +582,62 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, handle->page_flags |= RRDENG_PAGE_FUTURE_POINT; #endif - if(likely(handle->page_end_time_ut + handle->update_every_ut == point_in_time_ut)) { + usec_t delta_ut = point_in_time_ut - handle->page_end_time_ut; + + if(likely(delta_ut == handle->update_every_ut)) { // happy path ; } + else if(unlikely(point_in_time_ut > handle->page_end_time_ut)) { + if(handle->page) { + if (unlikely(delta_ut < handle->update_every_ut)) { + handle->page_flags |= RRDENG_PAGE_STEP_TOO_SMALL; + rrdeng_store_metric_flush_current_page(collection_handle); + } + else if (unlikely(delta_ut % handle->update_every_ut)) { + handle->page_flags |= RRDENG_PAGE_STEP_UNALIGNED; + rrdeng_store_metric_flush_current_page(collection_handle); + } + else { + size_t points_gap = delta_ut / handle->update_every_ut; + size_t page_remaining_points = handle->page_entries_max - handle->page_position; + + if (points_gap >= page_remaining_points) { + handle->page_flags |= RRDENG_PAGE_BIG_GAP; + rrdeng_store_metric_flush_current_page(collection_handle); + } + else { + // loop to fill the gap + handle->page_flags |= RRDENG_PAGE_GAP; + + usec_t stop_ut = point_in_time_ut - handle->update_every_ut; + for (usec_t this_ut = handle->page_end_time_ut + handle->update_every_ut; + this_ut <= stop_ut; + this_ut = handle->page_end_time_ut + handle->update_every_ut) { + rrdeng_store_metric_append_point( + collection_handle, + this_ut, + NAN, NAN, NAN, + 1, 0, + SN_EMPTY_SLOT); + } + } + } + } + } else if(unlikely(point_in_time_ut < handle->page_end_time_ut)) { handle->page_flags |= RRDENG_PAGE_PAST_COLLECTION; store_metric_next_error_log(handle, point_in_time_ut, "is older than the"); return; } - else if(unlikely(point_in_time_ut == handle->page_end_time_ut)) { + else /* if(unlikely(point_in_time_ut == handle->page_end_time_ut)) */ { handle->page_flags |= RRDENG_PAGE_REPEATED_COLLECTION; store_metric_next_error_log(handle, point_in_time_ut, "is at the same time as the"); return; } - else if(handle->page) { - usec_t delta_ut = point_in_time_ut - handle->page_end_time_ut; - - if(unlikely(delta_ut < handle->update_every_ut)) { - handle->page_flags |= RRDENG_PAGE_STEP_TOO_SMALL; - rrdeng_store_metric_flush_current_page(collection_handle); - } - else if(unlikely(delta_ut % handle->update_every_ut)) { - handle->page_flags |= RRDENG_PAGE_STEP_UNALIGNED; - rrdeng_store_metric_flush_current_page(collection_handle); - } - else { - size_t points_gap = delta_ut / handle->update_every_ut; - size_t page_remaining_points = handle->page_entries_max - handle->page_position; - - if(points_gap >= page_remaining_points) { - handle->page_flags |= RRDENG_PAGE_BIG_GAP; - rrdeng_store_metric_flush_current_page(collection_handle); - } - else { - // loop to fill the gap - handle->page_flags |= RRDENG_PAGE_GAP; - - usec_t stop_ut = point_in_time_ut - handle->update_every_ut; - for(usec_t this_ut = handle->page_end_time_ut + handle->update_every_ut; - this_ut <= stop_ut ; - this_ut = handle->page_end_time_ut + handle->update_every_ut) { - rrdeng_store_metric_append_point( - collection_handle, - this_ut, - NAN, NAN, NAN, - 1, 0, - SN_EMPTY_SLOT); - } - } - } - } + timing_step(TIMING_STEP_DBENGINE_FIRST_CHECK); rrdeng_store_metric_append_point(collection_handle, point_in_time_ut, @@ -776,10 +742,10 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, handle = rrdeng_query_handle_get(); register_query_handle(handle); - if(unlikely(priority < STORAGE_PRIORITY_HIGH)) + if (unlikely(priority < STORAGE_PRIORITY_HIGH)) priority = STORAGE_PRIORITY_HIGH; - else if(unlikely(priority > STORAGE_PRIORITY_BEST_EFFORT)) - priority = STORAGE_PRIORITY_BEST_EFFORT; + else if (unlikely(priority >= STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE)) + priority = STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE - 1; handle->ctx = ctx; handle->metric = metric; @@ -809,6 +775,7 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, rrddim_handle->start_time_s = handle->start_time_s; rrddim_handle->end_time_s = handle->end_time_s; rrddim_handle->priority = priority; + rrddim_handle->backend = STORAGE_ENGINE_BACKEND_DBENGINE; pg_cache_preload(handle); @@ -824,6 +791,7 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, rrddim_handle->start_time_s = handle->start_time_s; rrddim_handle->end_time_s = 0; rrddim_handle->priority = priority; + rrddim_handle->backend = STORAGE_ENGINE_BACKEND_DBENGINE; } } @@ -906,6 +874,7 @@ STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim // We need to get a new page if (!rrdeng_load_page_next(rrddim_handle, false)) { + handle->now_s = rrddim_handle->end_time_s; storage_point_empty(sp, handle->now_s - handle->dt_s, handle->now_s); goto prepare_for_next_iteration; } |