diff options
Diffstat (limited to 'database/engine/rrdengineapi.c')
-rwxr-xr-x | database/engine/rrdengineapi.c | 202 |
1 files changed, 108 insertions, 94 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index 6ebee145..76010a7c 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -126,12 +126,13 @@ void rrdeng_store_metric_init(RRDDIM *rd) struct pg_cache_page_index *page_index; ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); - handle = &rd->state->handle.rrdeng; - handle->ctx = ctx; + handle = callocz(1, sizeof(struct rrdeng_collect_handle)); + handle->ctx = ctx; handle->descr = NULL; handle->prev_descr = NULL; handle->unaligned_page = 0; + rd->state->handle = (STORAGE_COLLECT_HANDLE *)handle; page_index = rd->state->page_index; uv_rwlock_wrlock(&page_index->lock); @@ -162,7 +163,7 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd) struct rrdengine_instance *ctx; struct rrdeng_page_descr *descr; - handle = &rd->state->handle.rrdeng; + handle = (struct rrdeng_collect_handle *)rd->state->handle; ctx = handle->ctx; if (unlikely(!ctx)) return; @@ -202,7 +203,7 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd) /* handle->prev_descr = descr;*/ } } else { - freez(descr->pg_cache_descr->page); + dbengine_page_free(descr->pg_cache_descr->page); rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr); freez(descr); } @@ -211,14 +212,13 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd) void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number) { - struct rrdeng_collect_handle *handle; + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)rd->state->handle; struct rrdengine_instance *ctx; struct page_cache *pg_cache; struct rrdeng_page_descr *descr; storage_number *page; uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0; - handle = &rd->state->handle.rrdeng; ctx = handle->ctx; pg_cache = &ctx->pg_cache; descr = handle->descr; @@ -301,7 +301,7 @@ int rrdeng_store_metric_finalize(RRDDIM *rd) struct pg_cache_page_index *page_index; uint8_t can_delete_metric = 0; - handle = &rd->state->handle.rrdeng; + handle = (struct rrdeng_collect_handle *)rd->state->handle; ctx = handle->ctx; page_index = rd->state->page_index; rrdeng_store_metric_flush_current_page(rd); @@ -314,6 +314,7 @@ int rrdeng_store_metric_finalize(RRDDIM *rd) can_delete_metric = 1; } uv_rwlock_wrunlock(&page_index->lock); + freez(handle); return can_delete_metric; } @@ -406,6 +407,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e is_first_region_initialized = 0; region_points = 0; + int is_out_of_order_reported = 0; /* pages loop */ for (i = 0, curr = NULL, prev = NULL ; i < pages_nr ; ++i) { old_prev = prev; @@ -446,7 +448,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e is_metric_out_of_order = 1; if (is_metric_earlier_than_range || unlikely(is_metric_out_of_order)) { if (unlikely(is_metric_out_of_order)) - info("Ignoring metric with out of order timestamp."); + is_out_of_order_reported++; continue; /* next entry */ } /* here is a valid metric */ @@ -519,6 +521,8 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e freez(region_info_array); } } + if (is_out_of_order_reported) + info("Ignored %d metrics with out of order timestamp in %u regions.", is_out_of_order_reported, regions); return regions; } @@ -535,12 +539,14 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); rrdimm_handle->start_time = start_time; rrdimm_handle->end_time = end_time; - handle = &rrdimm_handle->rrdeng; + + handle = callocz(1, sizeof(struct rrdeng_query_handle)); handle->next_page_time = start_time; handle->now = start_time; handle->position = 0; handle->ctx = ctx; handle->descr = NULL; + rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle; pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, NULL, &handle->page_index); if (unlikely(NULL == handle->page_index || 0 == pages_nr)) @@ -548,102 +554,109 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand handle->next_page_time = INVALID_TIME; } -/* Returns the metric and sets its timestamp into current_time */ -storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time) -{ - struct rrdeng_query_handle *handle; - struct rrdengine_instance *ctx; - struct rrdeng_page_descr *descr; - storage_number *page, ret; - unsigned position, entries; - usec_t next_page_time = 0, current_position_time, page_end_time = 0; +static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) { + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; + + struct rrdengine_instance *ctx = handle->ctx; + struct rrdeng_page_descr *descr = handle->descr; + uint32_t page_length; + usec_t page_end_time; + unsigned position; - handle = &rrdimm_handle->rrdeng; - if (unlikely(INVALID_TIME == handle->next_page_time)) { - return SN_EMPTY_SLOT; - } - ctx = handle->ctx; - if (unlikely(NULL == (descr = handle->descr))) { - /* it's the first call */ - next_page_time = handle->next_page_time * USEC_PER_SEC; - } else { - pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length); - } - position = handle->position + 1; + if (likely(descr)) { + // Drop old page's reference - if (unlikely(NULL == descr || - position >= (page_length / sizeof(storage_number)))) { - /* We need to get a new page */ - if (descr) { - /* Drop old page's reference */ #ifdef NETDATA_INTERNAL_CHECKS - rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); + rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); #endif - pg_cache_put(ctx, descr); - handle->descr = NULL; - handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1; - if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) { - goto no_more_metrics; - } - next_page_time = handle->next_page_time * USEC_PER_SEC; - } - descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, - next_page_time, rrdimm_handle->end_time * USEC_PER_SEC); - if (NULL == descr) { - goto no_more_metrics; - } + pg_cache_put(ctx, descr); + handle->descr = NULL; + handle->next_page_time = (handle->page_end_time / USEC_PER_SEC) + 1; + + if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) + return 1; + } + + usec_t next_page_time = handle->next_page_time * USEC_PER_SEC; + descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, next_page_time, rrdimm_handle->end_time * USEC_PER_SEC); + if (NULL == descr) + return 1; + #ifdef NETDATA_INTERNAL_CHECKS - rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1); + rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1); #endif - handle->descr = descr; - pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length); - if (unlikely(INVALID_TIME == descr->start_time || - INVALID_TIME == page_end_time)) { - goto no_more_metrics; - } - if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) { - /* we're in the middle of the page somewhere */ - entries = page_length / sizeof(storage_number); - position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) / - (page_end_time - descr->start_time); - } else { - position = 0; - } + + handle->descr = descr; + pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length); + if (unlikely(INVALID_TIME == descr->start_time || INVALID_TIME == page_end_time)) + return 1; + + if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) { + // we're in the middle of the page somewhere + unsigned entries = page_length / sizeof(storage_number); + position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) / + (page_end_time - descr->start_time); } - page = descr->pg_cache_descr->page; - ret = page[position]; - entries = page_length / sizeof(storage_number); - if (entries > 1) { - usec_t dt; + else + position = 0; + + handle->page_end_time = page_end_time; + handle->page_length = page_length; + handle->page = descr->pg_cache_descr->page; + usec_t entries = handle->entries = page_length / sizeof(storage_number); + if (likely(entries > 1)) + handle->dt = (page_end_time - descr->start_time) / (entries - 1); + else + handle->dt = 0; - dt = (page_end_time - descr->start_time) / (entries - 1); - current_position_time = descr->start_time + position * dt; - } else { - current_position_time = descr->start_time; + handle->dt_sec = handle->dt / USEC_PER_SEC; + handle->position = position; + + return 0; +} + +/* Returns the metric and sets its timestamp into current_time */ +storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time) { + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; + + if (unlikely(INVALID_TIME == handle->next_page_time)) + return SN_EMPTY_SLOT; + + struct rrdeng_page_descr *descr = handle->descr; + unsigned position = handle->position + 1; + time_t now = handle->now + handle->dt_sec; + + if (unlikely(!descr || position >= handle->entries)) { + // We need to get a new page + if(rrdeng_load_page_next(rrdimm_handle)) { + // next calls will not load any more metrics + handle->next_page_time = INVALID_TIME; + return SN_EMPTY_SLOT; + } + + descr = handle->descr; + position = handle->position; + now = (descr->start_time + position * handle->dt) / USEC_PER_SEC; } + + storage_number ret = handle->page[position]; handle->position = position; - handle->now = current_position_time / USEC_PER_SEC; -/* fatal_assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time); - The above assertion is an approximation and needs to take update_every into account */ - if (unlikely(handle->now >= rrdimm_handle->end_time)) { - /* next calls will not load any more metrics */ + handle->now = now; + + if (unlikely(now >= rrdimm_handle->end_time)) { + // next calls will not load any more metrics handle->next_page_time = INVALID_TIME; } - *current_time = handle->now; - return ret; -no_more_metrics: - handle->next_page_time = INVALID_TIME; - return SN_EMPTY_SLOT; + *current_time = now; + return ret; } int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) { - struct rrdeng_query_handle *handle; - - handle = &rrdimm_handle->rrdeng; + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; return (INVALID_TIME == handle->next_page_time); } @@ -652,19 +665,20 @@ int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) */ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) { - struct rrdeng_query_handle *handle; - struct rrdengine_instance *ctx; - struct rrdeng_page_descr *descr; + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; + struct rrdengine_instance *ctx = handle->ctx; + struct rrdeng_page_descr *descr = handle->descr; - handle = &rrdimm_handle->rrdeng; - ctx = handle->ctx; - descr = handle->descr; if (descr) { #ifdef NETDATA_INTERNAL_CHECKS rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); #endif pg_cache_put(ctx, descr); } + + // whatever is allocated at rrdeng_load_metric_init() should be freed here + freez(handle); + rrdimm_handle->handle = NULL; } time_t rrdeng_metric_latest_time(RRDDIM *rd) @@ -724,7 +738,7 @@ void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrde descr = pg_cache_create_descr(); descr->id = id; /* TODO: add page type: metric, log, something? */ - page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */ + page = dbengine_page_alloc(); /*TODO: add page size */ rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; pg_cache_descr->page = page; @@ -949,7 +963,7 @@ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_p /* wait for worker thread to initialize */ completion_wait_for(&ctx->rrdengine_completion); completion_destroy(&ctx->rrdengine_completion); - uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE"); + uv_thread_set_name_np(ctx->worker_config.thread, "LIBUV_WORKER"); if (ctx->worker_config.error) { goto error_after_rrdeng_worker; } |