diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-06-09 04:52:47 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-06-09 04:52:57 +0000 |
commit | 00151562145df50cc65e9902d52d5fa77f89fe50 (patch) | |
tree | 2737716802f6725a5074d606ec8fe5422c58a83c /database/engine | |
parent | Releasing debian version 1.34.1-1. (diff) | |
download | netdata-00151562145df50cc65e9902d52d5fa77f89fe50.tar.xz netdata-00151562145df50cc65e9902d52d5fa77f89fe50.zip |
Merging upstream version 1.35.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/engine')
-rw-r--r-- | database/engine/journalfile.c | 1 | ||||
-rw-r--r-- | database/engine/metadata_log/logfile.c | 20 | ||||
-rw-r--r-- | database/engine/pagecache.c | 16 | ||||
-rw-r--r-- | database/engine/pagecache.h | 1 | ||||
-rw-r--r-- | database/engine/rrdengine.c | 53 | ||||
-rw-r--r-- | database/engine/rrdengine.h | 25 | ||||
-rwxr-xr-x | database/engine/rrdengineapi.c | 202 |
7 files changed, 203 insertions, 115 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 1541eb10..0b3d3eeb 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -84,6 +84,7 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s if (unlikely(ret)) { fatal("posix_memalign:%s", strerror(ret)); } + memset(ctx->commit_log.buf, 0, buf_size); buf_pos = ctx->commit_log.buf_pos = 0; ctx->commit_log.buf_size = buf_size; } diff --git a/database/engine/metadata_log/logfile.c b/database/engine/metadata_log/logfile.c index f5bd9b2d..07eb9b6f 100644 --- a/database/engine/metadata_log/logfile.c +++ b/database/engine/metadata_log/logfile.c @@ -375,19 +375,15 @@ static int scan_metalog_files(struct metalog_instance *ctx) struct metalog_pluginsd_state metalog_parser_state; metalog_pluginsd_state_init(&metalog_parser_state, ctx); - PARSER_USER_OBJECT metalog_parser_object; - metalog_parser_object.enabled = cd.enabled; - metalog_parser_object.host = ctx->rrdeng_ctx->host; - metalog_parser_object.cd = &cd; - metalog_parser_object.trust_durations = 0; - metalog_parser_object.private = &metalog_parser_state; + PARSER_USER_OBJECT metalog_parser_object = { + .enabled = cd.enabled, + .host = ctx->rrdeng_ctx->host, + .cd = &cd, + .trust_durations = 0, + .private = &metalog_parser_state + }; PARSER *parser = parser_init(metalog_parser_object.host, &metalog_parser_object, NULL, PARSER_INPUT_SPLIT); - if (unlikely(!parser)) { - error("Failed to initialize metadata log parser."); - failed_to_load = matched_files; - goto after_failed_to_parse; - } parser_add_keyword(parser, PLUGINSD_KEYWORD_HOST, metalog_pluginsd_host); parser_add_keyword(parser, PLUGINSD_KEYWORD_GUID, pluginsd_guid); parser_add_keyword(parser, PLUGINSD_KEYWORD_CONTEXT, pluginsd_context); @@ -428,10 +424,8 @@ static int scan_metalog_files(struct metalog_instance *ctx) size_t count __maybe_unused = metalog_parser_object.count; debug(D_METADATALOG, "Parsing count=%u", (unsigned)count); -after_failed_to_parse: freez(metalogfiles); - return matched_files; } diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index 40e24b32..cddbf9e1 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -356,7 +356,7 @@ static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_ { struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; - freez(pg_cache_descr->page); + dbengine_page_free(pg_cache_descr->page); pg_cache_descr->page = NULL; pg_cache_descr->flags &= ~RRD_PAGE_POPULATED; pg_cache_release_pages_unsafe(ctx, 1); @@ -437,7 +437,6 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0); if (unlikely(0 == ret)) { uv_rwlock_wrunlock(&page_index->lock); - error("Page under deletion was not in index."); if (unlikely(debug_flags & D_RRDENGINE)) { print_page_descr(descr); } @@ -1067,10 +1066,13 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index page_not_in_cache = 0; uv_rwlock_rdlock(&page_index->lock); + int retry_count = 0; while (1) { descr = find_first_page_in_time_range(page_index, start_time, end_time); - if (NULL == descr || 0 == descr->page_length) { + if (NULL == descr || 0 == descr->page_length || retry_count == MAX_PAGE_CACHE_RETRY_WAIT) { /* non-empty page not found */ + if (retry_count == MAX_PAGE_CACHE_RETRY_WAIT) + error_report("Page cache timeout while waiting for page %p : returning FAIL", descr); uv_rwlock_rdunlock(&page_index->lock); pg_cache_release_pages(ctx, 1); @@ -1114,7 +1116,11 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index print_page_cache_descr(descr); if (!(flags & RRD_PAGE_POPULATED)) page_not_in_cache = 1; - pg_cache_wait_event_unsafe(descr); + + if (pg_cache_timedwait_event_unsafe(descr, 1) == UV_ETIMEDOUT) { + error_report("Page cache timeout while waiting for page %p : retry count = %d", descr, retry_count); + ++retry_count; + } rrdeng_page_descr_mutex_unlock(ctx, descr); /* reset scan to find again */ @@ -1222,7 +1228,7 @@ void free_page_cache(struct rrdengine_instance *ctx) /* Check rrdenglocking.c */ pg_cache_descr = descr->pg_cache_descr; if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { - freez(pg_cache_descr->page); + dbengine_page_free(pg_cache_descr->page); bytes_freed += RRDENG_BLOCK_SIZE; } rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h index d5350ef5..0ba4639c 100644 --- a/database/engine/pagecache.h +++ b/database/engine/pagecache.h @@ -11,6 +11,7 @@ struct extent_info; struct rrdeng_page_descr; #define INVALID_TIME (0) +#define MAX_PAGE_CACHE_RETRY_WAIT (3) /* Page flags */ #define RRD_PAGE_DIRTY (1LU << 0) diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index a975cfa6..9f43f445 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -11,8 +11,24 @@ rrdeng_stats_t global_flushing_pressure_page_deletions = 0; static unsigned pages_per_extent = MAX_PAGES_PER_EXTENT; +#if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2) +#error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2) +#endif + +void *dbengine_page_alloc() { + void *page = netdata_mmap(NULL, RRDENG_BLOCK_SIZE, MAP_PRIVATE, enable_ksm); + if(!page) fatal("Cannot allocate dbengine page cache page, with mmap()"); + return page; +} + +void dbengine_page_free(void *page) { + munmap(page, RRDENG_BLOCK_SIZE); +} + static void sanity_check(void) { + BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2)); + /* Magic numbers must fit in the super-blocks */ BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ); BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ); @@ -176,7 +192,7 @@ void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, str struct extent_info *extent = xt_io_descr->descr_array[0]->extent; for (i = 0 ; i < xt_io_descr->descr_count; ++i) { - page = mallocz(RRDENG_BLOCK_SIZE); + page = dbengine_page_alloc(); descr = xt_io_descr->descr_array[i]; for (j = 0, page_offset = 0 ; j < extent->number_of_pages ; ++j) { /* care, we don't hold the descriptor mutex */ @@ -331,7 +347,7 @@ after_crc_check: continue; /* Failed to reserve a suitable page */ is_prefetched_page = 1; } - page = mallocz(RRDENG_BLOCK_SIZE); + page = dbengine_page_alloc(); /* care, we don't hold the descriptor mutex */ if (have_read_error) { @@ -735,6 +751,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct fatal("posix_memalign:%s", strerror(ret)); /* freez(xt_io_descr);*/ } + memset(xt_io_descr->buf, 0, ALIGN_BYTES_CEILING(size_bytes)); (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count); xt_io_descr->descr_count = count; @@ -1074,13 +1091,17 @@ void async_cb(uv_async_t *handle) void timer_cb(uv_timer_t* handle) { + worker_is_busy(RRDENG_MAX_OPCODE + 1); + struct rrdengine_worker_config* wc = handle->data; struct rrdengine_instance *ctx = wc->ctx; uv_stop(handle->loop); uv_update_time(handle->loop); - if (unlikely(!ctx->metalog_ctx->initialized)) + if (unlikely(!ctx->metalog_ctx->initialized)) { + worker_is_idle(); return; /* Wait for the metadata log to initialize */ + } rrdeng_test_quota(wc); debug(D_RRDENGINE, "%s: timeout reached.", __func__); if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) { @@ -1122,12 +1143,26 @@ void timer_cb(uv_timer_t* handle) debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf))); } #endif + + worker_is_idle(); } #define MAX_CMD_BATCH_SIZE (256) void rrdeng_worker(void* arg) { + worker_register("DBENGINE"); + worker_register_job_name(RRDENG_NOOP, "noop"); + worker_register_job_name(RRDENG_READ_PAGE, "page read"); + worker_register_job_name(RRDENG_READ_EXTENT, "extent read"); + worker_register_job_name(RRDENG_COMMIT_PAGE, "commit"); + worker_register_job_name(RRDENG_FLUSH_PAGES, "flush"); + worker_register_job_name(RRDENG_SHUTDOWN, "shutdown"); + worker_register_job_name(RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE, "page lru"); + worker_register_job_name(RRDENG_QUIESCE, "quiesce"); + worker_register_job_name(RRDENG_MAX_OPCODE, "cleanup"); + worker_register_job_name(RRDENG_MAX_OPCODE + 1, "timer"); + struct rrdengine_worker_config* wc = arg; struct rrdengine_instance *ctx = wc->ctx; uv_loop_t* loop; @@ -1175,8 +1210,11 @@ void rrdeng_worker(void* arg) fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); shutdown = 0; + int set_name = 0; while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) { + worker_is_idle(); uv_run(loop, UV_RUN_DEFAULT); + worker_is_busy(RRDENG_MAX_OPCODE); rrdeng_cleanup_finished_threads(wc); /* wait for commands */ @@ -1193,6 +1231,9 @@ void rrdeng_worker(void* arg) opcode = cmd.opcode; ++cmd_batch_size; + if(likely(opcode != RRDENG_NOOP)) + worker_is_busy(opcode); + switch (opcode) { case RRDENG_NOOP: /* the command queue was empty, do nothing */ @@ -1219,6 +1260,10 @@ void rrdeng_worker(void* arg) break; case RRDENG_READ_EXTENT: do_read_extent(wc, cmd.read_extent.page_cache_descr, cmd.read_extent.page_count, 1); + if (unlikely(!set_name)) { + set_name = 1; + uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE"); + } break; case RRDENG_COMMIT_PAGE: do_commit_transaction(wc, STORE_DATA, NULL); @@ -1265,6 +1310,7 @@ void rrdeng_worker(void* arg) fatal_assert(0 == uv_loop_close(loop)); freez(loop); + worker_unregister(); return; error_after_timer_init: @@ -1277,6 +1323,7 @@ error_after_loop_init: wc->error = UV_EAGAIN; /* wake up initialization thread */ completion_mark_complete(&ctx->rrdengine_completion); + worker_unregister(); } /* C entry point for development purposes diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h index b0c8e4d0..c6f89a37 100644 --- a/database/engine/rrdengine.h +++ b/database/engine/rrdengine.h @@ -34,6 +34,28 @@ struct rrdengine_instance; #define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u" #define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u" +struct rrdeng_collect_handle { + struct rrdeng_page_descr *descr, *prev_descr; + unsigned long page_correlation_id; + struct rrdengine_instance *ctx; + // set to 1 when this dimension is not page aligned with the other dimensions in the chart + uint8_t unaligned_page; +}; + +struct rrdeng_query_handle { + struct rrdeng_page_descr *descr; + struct rrdengine_instance *ctx; + struct pg_cache_page_index *page_index; + time_t next_page_time; + time_t now; + unsigned position; + unsigned entries; + storage_number *page; + usec_t page_end_time; + uint32_t page_length; + usec_t dt; + time_t dt_sec; +}; typedef enum { RRDENGINE_STATUS_UNINITIALIZED = 0, @@ -227,6 +249,9 @@ struct rrdengine_instance { struct rrdengine_statistics stats; }; +extern void *dbengine_page_alloc(void); +extern void dbengine_page_free(void *page); + extern int init_rrd_files(struct rrdengine_instance *ctx); extern void finalize_rrd_files(struct rrdengine_instance *ctx); extern void rrdeng_test_quota(struct rrdengine_worker_config* wc); diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index 6ebee145..76010a7c 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -126,12 +126,13 @@ void rrdeng_store_metric_init(RRDDIM *rd) struct pg_cache_page_index *page_index; ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); - handle = &rd->state->handle.rrdeng; - handle->ctx = ctx; + handle = callocz(1, sizeof(struct rrdeng_collect_handle)); + handle->ctx = ctx; handle->descr = NULL; handle->prev_descr = NULL; handle->unaligned_page = 0; + rd->state->handle = (STORAGE_COLLECT_HANDLE *)handle; page_index = rd->state->page_index; uv_rwlock_wrlock(&page_index->lock); @@ -162,7 +163,7 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd) struct rrdengine_instance *ctx; struct rrdeng_page_descr *descr; - handle = &rd->state->handle.rrdeng; + handle = (struct rrdeng_collect_handle *)rd->state->handle; ctx = handle->ctx; if (unlikely(!ctx)) return; @@ -202,7 +203,7 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd) /* handle->prev_descr = descr;*/ } } else { - freez(descr->pg_cache_descr->page); + dbengine_page_free(descr->pg_cache_descr->page); rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr); freez(descr); } @@ -211,14 +212,13 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd) void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number) { - struct rrdeng_collect_handle *handle; + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)rd->state->handle; struct rrdengine_instance *ctx; struct page_cache *pg_cache; struct rrdeng_page_descr *descr; storage_number *page; uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0; - handle = &rd->state->handle.rrdeng; ctx = handle->ctx; pg_cache = &ctx->pg_cache; descr = handle->descr; @@ -301,7 +301,7 @@ int rrdeng_store_metric_finalize(RRDDIM *rd) struct pg_cache_page_index *page_index; uint8_t can_delete_metric = 0; - handle = &rd->state->handle.rrdeng; + handle = (struct rrdeng_collect_handle *)rd->state->handle; ctx = handle->ctx; page_index = rd->state->page_index; rrdeng_store_metric_flush_current_page(rd); @@ -314,6 +314,7 @@ int rrdeng_store_metric_finalize(RRDDIM *rd) can_delete_metric = 1; } uv_rwlock_wrunlock(&page_index->lock); + freez(handle); return can_delete_metric; } @@ -406,6 +407,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e is_first_region_initialized = 0; region_points = 0; + int is_out_of_order_reported = 0; /* pages loop */ for (i = 0, curr = NULL, prev = NULL ; i < pages_nr ; ++i) { old_prev = prev; @@ -446,7 +448,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e is_metric_out_of_order = 1; if (is_metric_earlier_than_range || unlikely(is_metric_out_of_order)) { if (unlikely(is_metric_out_of_order)) - info("Ignoring metric with out of order timestamp."); + is_out_of_order_reported++; continue; /* next entry */ } /* here is a valid metric */ @@ -519,6 +521,8 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e freez(region_info_array); } } + if (is_out_of_order_reported) + info("Ignored %d metrics with out of order timestamp in %u regions.", is_out_of_order_reported, regions); return regions; } @@ -535,12 +539,14 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); rrdimm_handle->start_time = start_time; rrdimm_handle->end_time = end_time; - handle = &rrdimm_handle->rrdeng; + + handle = callocz(1, sizeof(struct rrdeng_query_handle)); handle->next_page_time = start_time; handle->now = start_time; handle->position = 0; handle->ctx = ctx; handle->descr = NULL; + rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle; pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, NULL, &handle->page_index); if (unlikely(NULL == handle->page_index || 0 == pages_nr)) @@ -548,102 +554,109 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand handle->next_page_time = INVALID_TIME; } -/* Returns the metric and sets its timestamp into current_time */ -storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time) -{ - struct rrdeng_query_handle *handle; - struct rrdengine_instance *ctx; - struct rrdeng_page_descr *descr; - storage_number *page, ret; - unsigned position, entries; - usec_t next_page_time = 0, current_position_time, page_end_time = 0; +static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) { + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; + + struct rrdengine_instance *ctx = handle->ctx; + struct rrdeng_page_descr *descr = handle->descr; + uint32_t page_length; + usec_t page_end_time; + unsigned position; - handle = &rrdimm_handle->rrdeng; - if (unlikely(INVALID_TIME == handle->next_page_time)) { - return SN_EMPTY_SLOT; - } - ctx = handle->ctx; - if (unlikely(NULL == (descr = handle->descr))) { - /* it's the first call */ - next_page_time = handle->next_page_time * USEC_PER_SEC; - } else { - pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length); - } - position = handle->position + 1; + if (likely(descr)) { + // Drop old page's reference - if (unlikely(NULL == descr || - position >= (page_length / sizeof(storage_number)))) { - /* We need to get a new page */ - if (descr) { - /* Drop old page's reference */ #ifdef NETDATA_INTERNAL_CHECKS - rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); + rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); #endif - pg_cache_put(ctx, descr); - handle->descr = NULL; - handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1; - if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) { - goto no_more_metrics; - } - next_page_time = handle->next_page_time * USEC_PER_SEC; - } - descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, - next_page_time, rrdimm_handle->end_time * USEC_PER_SEC); - if (NULL == descr) { - goto no_more_metrics; - } + pg_cache_put(ctx, descr); + handle->descr = NULL; + handle->next_page_time = (handle->page_end_time / USEC_PER_SEC) + 1; + + if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) + return 1; + } + + usec_t next_page_time = handle->next_page_time * USEC_PER_SEC; + descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, next_page_time, rrdimm_handle->end_time * USEC_PER_SEC); + if (NULL == descr) + return 1; + #ifdef NETDATA_INTERNAL_CHECKS - rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1); + rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1); #endif - handle->descr = descr; - pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length); - if (unlikely(INVALID_TIME == descr->start_time || - INVALID_TIME == page_end_time)) { - goto no_more_metrics; - } - if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) { - /* we're in the middle of the page somewhere */ - entries = page_length / sizeof(storage_number); - position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) / - (page_end_time - descr->start_time); - } else { - position = 0; - } + + handle->descr = descr; + pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length); + if (unlikely(INVALID_TIME == descr->start_time || INVALID_TIME == page_end_time)) + return 1; + + if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) { + // we're in the middle of the page somewhere + unsigned entries = page_length / sizeof(storage_number); + position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) / + (page_end_time - descr->start_time); } - page = descr->pg_cache_descr->page; - ret = page[position]; - entries = page_length / sizeof(storage_number); - if (entries > 1) { - usec_t dt; + else + position = 0; + + handle->page_end_time = page_end_time; + handle->page_length = page_length; + handle->page = descr->pg_cache_descr->page; + usec_t entries = handle->entries = page_length / sizeof(storage_number); + if (likely(entries > 1)) + handle->dt = (page_end_time - descr->start_time) / (entries - 1); + else + handle->dt = 0; - dt = (page_end_time - descr->start_time) / (entries - 1); - current_position_time = descr->start_time + position * dt; - } else { - current_position_time = descr->start_time; + handle->dt_sec = handle->dt / USEC_PER_SEC; + handle->position = position; + + return 0; +} + +/* Returns the metric and sets its timestamp into current_time */ +storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time) { + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; + + if (unlikely(INVALID_TIME == handle->next_page_time)) + return SN_EMPTY_SLOT; + + struct rrdeng_page_descr *descr = handle->descr; + unsigned position = handle->position + 1; + time_t now = handle->now + handle->dt_sec; + + if (unlikely(!descr || position >= handle->entries)) { + // We need to get a new page + if(rrdeng_load_page_next(rrdimm_handle)) { + // next calls will not load any more metrics + handle->next_page_time = INVALID_TIME; + return SN_EMPTY_SLOT; + } + + descr = handle->descr; + position = handle->position; + now = (descr->start_time + position * handle->dt) / USEC_PER_SEC; } + + storage_number ret = handle->page[position]; handle->position = position; - handle->now = current_position_time / USEC_PER_SEC; -/* fatal_assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time); - The above assertion is an approximation and needs to take update_every into account */ - if (unlikely(handle->now >= rrdimm_handle->end_time)) { - /* next calls will not load any more metrics */ + handle->now = now; + + if (unlikely(now >= rrdimm_handle->end_time)) { + // next calls will not load any more metrics handle->next_page_time = INVALID_TIME; } - *current_time = handle->now; - return ret; -no_more_metrics: - handle->next_page_time = INVALID_TIME; - return SN_EMPTY_SLOT; + *current_time = now; + return ret; } int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) { - struct rrdeng_query_handle *handle; - - handle = &rrdimm_handle->rrdeng; + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; return (INVALID_TIME == handle->next_page_time); } @@ -652,19 +665,20 @@ int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) */ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) { - struct rrdeng_query_handle *handle; - struct rrdengine_instance *ctx; - struct rrdeng_page_descr *descr; + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; + struct rrdengine_instance *ctx = handle->ctx; + struct rrdeng_page_descr *descr = handle->descr; - handle = &rrdimm_handle->rrdeng; - ctx = handle->ctx; - descr = handle->descr; if (descr) { #ifdef NETDATA_INTERNAL_CHECKS rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); #endif pg_cache_put(ctx, descr); } + + // whatever is allocated at rrdeng_load_metric_init() should be freed here + freez(handle); + rrdimm_handle->handle = NULL; } time_t rrdeng_metric_latest_time(RRDDIM *rd) @@ -724,7 +738,7 @@ void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrde descr = pg_cache_create_descr(); descr->id = id; /* TODO: add page type: metric, log, something? */ - page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */ + page = dbengine_page_alloc(); /*TODO: add page size */ rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; pg_cache_descr->page = page; @@ -949,7 +963,7 @@ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_p /* wait for worker thread to initialize */ completion_wait_for(&ctx->rrdengine_completion); completion_destroy(&ctx->rrdengine_completion); - uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE"); + uv_thread_set_name_np(ctx->worker_config.thread, "LIBUV_WORKER"); if (ctx->worker_config.error) { goto error_after_rrdeng_worker; } |