summaryrefslogtreecommitdiffstats
path: root/database/engine
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine')
-rw-r--r--database/engine/journalfile.c1
-rw-r--r--database/engine/metadata_log/logfile.c20
-rw-r--r--database/engine/pagecache.c16
-rw-r--r--database/engine/pagecache.h1
-rw-r--r--database/engine/rrdengine.c53
-rw-r--r--database/engine/rrdengine.h25
-rwxr-xr-xdatabase/engine/rrdengineapi.c202
7 files changed, 203 insertions, 115 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 1541eb10a..0b3d3eeb8 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -84,6 +84,7 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s
if (unlikely(ret)) {
fatal("posix_memalign:%s", strerror(ret));
}
+ memset(ctx->commit_log.buf, 0, buf_size);
buf_pos = ctx->commit_log.buf_pos = 0;
ctx->commit_log.buf_size = buf_size;
}
diff --git a/database/engine/metadata_log/logfile.c b/database/engine/metadata_log/logfile.c
index f5bd9b2d2..07eb9b6fe 100644
--- a/database/engine/metadata_log/logfile.c
+++ b/database/engine/metadata_log/logfile.c
@@ -375,19 +375,15 @@ static int scan_metalog_files(struct metalog_instance *ctx)
struct metalog_pluginsd_state metalog_parser_state;
metalog_pluginsd_state_init(&metalog_parser_state, ctx);
- PARSER_USER_OBJECT metalog_parser_object;
- metalog_parser_object.enabled = cd.enabled;
- metalog_parser_object.host = ctx->rrdeng_ctx->host;
- metalog_parser_object.cd = &cd;
- metalog_parser_object.trust_durations = 0;
- metalog_parser_object.private = &metalog_parser_state;
+ PARSER_USER_OBJECT metalog_parser_object = {
+ .enabled = cd.enabled,
+ .host = ctx->rrdeng_ctx->host,
+ .cd = &cd,
+ .trust_durations = 0,
+ .private = &metalog_parser_state
+ };
PARSER *parser = parser_init(metalog_parser_object.host, &metalog_parser_object, NULL, PARSER_INPUT_SPLIT);
- if (unlikely(!parser)) {
- error("Failed to initialize metadata log parser.");
- failed_to_load = matched_files;
- goto after_failed_to_parse;
- }
parser_add_keyword(parser, PLUGINSD_KEYWORD_HOST, metalog_pluginsd_host);
parser_add_keyword(parser, PLUGINSD_KEYWORD_GUID, pluginsd_guid);
parser_add_keyword(parser, PLUGINSD_KEYWORD_CONTEXT, pluginsd_context);
@@ -428,10 +424,8 @@ static int scan_metalog_files(struct metalog_instance *ctx)
size_t count __maybe_unused = metalog_parser_object.count;
debug(D_METADATALOG, "Parsing count=%u", (unsigned)count);
-after_failed_to_parse:
freez(metalogfiles);
-
return matched_files;
}
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index 40e24b321..cddbf9e1f 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -356,7 +356,7 @@ static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_
{
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
- freez(pg_cache_descr->page);
+ dbengine_page_free(pg_cache_descr->page);
pg_cache_descr->page = NULL;
pg_cache_descr->flags &= ~RRD_PAGE_POPULATED;
pg_cache_release_pages_unsafe(ctx, 1);
@@ -437,7 +437,6 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d
ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
if (unlikely(0 == ret)) {
uv_rwlock_wrunlock(&page_index->lock);
- error("Page under deletion was not in index.");
if (unlikely(debug_flags & D_RRDENGINE)) {
print_page_descr(descr);
}
@@ -1067,10 +1066,13 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index
page_not_in_cache = 0;
uv_rwlock_rdlock(&page_index->lock);
+ int retry_count = 0;
while (1) {
descr = find_first_page_in_time_range(page_index, start_time, end_time);
- if (NULL == descr || 0 == descr->page_length) {
+ if (NULL == descr || 0 == descr->page_length || retry_count == MAX_PAGE_CACHE_RETRY_WAIT) {
/* non-empty page not found */
+ if (retry_count == MAX_PAGE_CACHE_RETRY_WAIT)
+ error_report("Page cache timeout while waiting for page %p : returning FAIL", descr);
uv_rwlock_rdunlock(&page_index->lock);
pg_cache_release_pages(ctx, 1);
@@ -1114,7 +1116,11 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index
print_page_cache_descr(descr);
if (!(flags & RRD_PAGE_POPULATED))
page_not_in_cache = 1;
- pg_cache_wait_event_unsafe(descr);
+
+ if (pg_cache_timedwait_event_unsafe(descr, 1) == UV_ETIMEDOUT) {
+ error_report("Page cache timeout while waiting for page %p : retry count = %d", descr, retry_count);
+ ++retry_count;
+ }
rrdeng_page_descr_mutex_unlock(ctx, descr);
/* reset scan to find again */
@@ -1222,7 +1228,7 @@ void free_page_cache(struct rrdengine_instance *ctx)
/* Check rrdenglocking.c */
pg_cache_descr = descr->pg_cache_descr;
if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
- freez(pg_cache_descr->page);
+ dbengine_page_free(pg_cache_descr->page);
bytes_freed += RRDENG_BLOCK_SIZE;
}
rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index d5350ef56..0ba4639ce 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -11,6 +11,7 @@ struct extent_info;
struct rrdeng_page_descr;
#define INVALID_TIME (0)
+#define MAX_PAGE_CACHE_RETRY_WAIT (3)
/* Page flags */
#define RRD_PAGE_DIRTY (1LU << 0)
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index a975cfa6e..9f43f4456 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -11,8 +11,24 @@ rrdeng_stats_t global_flushing_pressure_page_deletions = 0;
static unsigned pages_per_extent = MAX_PAGES_PER_EXTENT;
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2)
+#error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2)
+#endif
+
+void *dbengine_page_alloc() {
+ void *page = netdata_mmap(NULL, RRDENG_BLOCK_SIZE, MAP_PRIVATE, enable_ksm);
+ if(!page) fatal("Cannot allocate dbengine page cache page, with mmap()");
+ return page;
+}
+
+void dbengine_page_free(void *page) {
+ munmap(page, RRDENG_BLOCK_SIZE);
+}
+
static void sanity_check(void)
{
+ BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2));
+
/* Magic numbers must fit in the super-blocks */
BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ);
BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ);
@@ -176,7 +192,7 @@ void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, str
struct extent_info *extent = xt_io_descr->descr_array[0]->extent;
for (i = 0 ; i < xt_io_descr->descr_count; ++i) {
- page = mallocz(RRDENG_BLOCK_SIZE);
+ page = dbengine_page_alloc();
descr = xt_io_descr->descr_array[i];
for (j = 0, page_offset = 0 ; j < extent->number_of_pages ; ++j) {
/* care, we don't hold the descriptor mutex */
@@ -331,7 +347,7 @@ after_crc_check:
continue; /* Failed to reserve a suitable page */
is_prefetched_page = 1;
}
- page = mallocz(RRDENG_BLOCK_SIZE);
+ page = dbengine_page_alloc();
/* care, we don't hold the descriptor mutex */
if (have_read_error) {
@@ -735,6 +751,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
fatal("posix_memalign:%s", strerror(ret));
/* freez(xt_io_descr);*/
}
+ memset(xt_io_descr->buf, 0, ALIGN_BYTES_CEILING(size_bytes));
(void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count);
xt_io_descr->descr_count = count;
@@ -1074,13 +1091,17 @@ void async_cb(uv_async_t *handle)
void timer_cb(uv_timer_t* handle)
{
+ worker_is_busy(RRDENG_MAX_OPCODE + 1);
+
struct rrdengine_worker_config* wc = handle->data;
struct rrdengine_instance *ctx = wc->ctx;
uv_stop(handle->loop);
uv_update_time(handle->loop);
- if (unlikely(!ctx->metalog_ctx->initialized))
+ if (unlikely(!ctx->metalog_ctx->initialized)) {
+ worker_is_idle();
return; /* Wait for the metadata log to initialize */
+ }
rrdeng_test_quota(wc);
debug(D_RRDENGINE, "%s: timeout reached.", __func__);
if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) {
@@ -1122,12 +1143,26 @@ void timer_cb(uv_timer_t* handle)
debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf)));
}
#endif
+
+ worker_is_idle();
}
#define MAX_CMD_BATCH_SIZE (256)
void rrdeng_worker(void* arg)
{
+ worker_register("DBENGINE");
+ worker_register_job_name(RRDENG_NOOP, "noop");
+ worker_register_job_name(RRDENG_READ_PAGE, "page read");
+ worker_register_job_name(RRDENG_READ_EXTENT, "extent read");
+ worker_register_job_name(RRDENG_COMMIT_PAGE, "commit");
+ worker_register_job_name(RRDENG_FLUSH_PAGES, "flush");
+ worker_register_job_name(RRDENG_SHUTDOWN, "shutdown");
+ worker_register_job_name(RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE, "page lru");
+ worker_register_job_name(RRDENG_QUIESCE, "quiesce");
+ worker_register_job_name(RRDENG_MAX_OPCODE, "cleanup");
+ worker_register_job_name(RRDENG_MAX_OPCODE + 1, "timer");
+
struct rrdengine_worker_config* wc = arg;
struct rrdengine_instance *ctx = wc->ctx;
uv_loop_t* loop;
@@ -1175,8 +1210,11 @@ void rrdeng_worker(void* arg)
fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
shutdown = 0;
+ int set_name = 0;
while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) {
+ worker_is_idle();
uv_run(loop, UV_RUN_DEFAULT);
+ worker_is_busy(RRDENG_MAX_OPCODE);
rrdeng_cleanup_finished_threads(wc);
/* wait for commands */
@@ -1193,6 +1231,9 @@ void rrdeng_worker(void* arg)
opcode = cmd.opcode;
++cmd_batch_size;
+ if(likely(opcode != RRDENG_NOOP))
+ worker_is_busy(opcode);
+
switch (opcode) {
case RRDENG_NOOP:
/* the command queue was empty, do nothing */
@@ -1219,6 +1260,10 @@ void rrdeng_worker(void* arg)
break;
case RRDENG_READ_EXTENT:
do_read_extent(wc, cmd.read_extent.page_cache_descr, cmd.read_extent.page_count, 1);
+ if (unlikely(!set_name)) {
+ set_name = 1;
+ uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE");
+ }
break;
case RRDENG_COMMIT_PAGE:
do_commit_transaction(wc, STORE_DATA, NULL);
@@ -1265,6 +1310,7 @@ void rrdeng_worker(void* arg)
fatal_assert(0 == uv_loop_close(loop));
freez(loop);
+ worker_unregister();
return;
error_after_timer_init:
@@ -1277,6 +1323,7 @@ error_after_loop_init:
wc->error = UV_EAGAIN;
/* wake up initialization thread */
completion_mark_complete(&ctx->rrdengine_completion);
+ worker_unregister();
}
/* C entry point for development purposes
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index b0c8e4d02..c6f89a37a 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -34,6 +34,28 @@ struct rrdengine_instance;
#define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u"
#define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u"
+struct rrdeng_collect_handle {
+ struct rrdeng_page_descr *descr, *prev_descr;
+ unsigned long page_correlation_id;
+ struct rrdengine_instance *ctx;
+ // set to 1 when this dimension is not page aligned with the other dimensions in the chart
+ uint8_t unaligned_page;
+};
+
+struct rrdeng_query_handle {
+ struct rrdeng_page_descr *descr;
+ struct rrdengine_instance *ctx;
+ struct pg_cache_page_index *page_index;
+ time_t next_page_time;
+ time_t now;
+ unsigned position;
+ unsigned entries;
+ storage_number *page;
+ usec_t page_end_time;
+ uint32_t page_length;
+ usec_t dt;
+ time_t dt_sec;
+};
typedef enum {
RRDENGINE_STATUS_UNINITIALIZED = 0,
@@ -227,6 +249,9 @@ struct rrdengine_instance {
struct rrdengine_statistics stats;
};
+extern void *dbengine_page_alloc(void);
+extern void dbengine_page_free(void *page);
+
extern int init_rrd_files(struct rrdengine_instance *ctx);
extern void finalize_rrd_files(struct rrdengine_instance *ctx);
extern void rrdeng_test_quota(struct rrdengine_worker_config* wc);
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 6ebee1459..76010a7c2 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -126,12 +126,13 @@ void rrdeng_store_metric_init(RRDDIM *rd)
struct pg_cache_page_index *page_index;
ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost);
- handle = &rd->state->handle.rrdeng;
- handle->ctx = ctx;
+ handle = callocz(1, sizeof(struct rrdeng_collect_handle));
+ handle->ctx = ctx;
handle->descr = NULL;
handle->prev_descr = NULL;
handle->unaligned_page = 0;
+ rd->state->handle = (STORAGE_COLLECT_HANDLE *)handle;
page_index = rd->state->page_index;
uv_rwlock_wrlock(&page_index->lock);
@@ -162,7 +163,7 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
struct rrdengine_instance *ctx;
struct rrdeng_page_descr *descr;
- handle = &rd->state->handle.rrdeng;
+ handle = (struct rrdeng_collect_handle *)rd->state->handle;
ctx = handle->ctx;
if (unlikely(!ctx))
return;
@@ -202,7 +203,7 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
/* handle->prev_descr = descr;*/
}
} else {
- freez(descr->pg_cache_descr->page);
+ dbengine_page_free(descr->pg_cache_descr->page);
rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
freez(descr);
}
@@ -211,14 +212,13 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number)
{
- struct rrdeng_collect_handle *handle;
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)rd->state->handle;
struct rrdengine_instance *ctx;
struct page_cache *pg_cache;
struct rrdeng_page_descr *descr;
storage_number *page;
uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0;
- handle = &rd->state->handle.rrdeng;
ctx = handle->ctx;
pg_cache = &ctx->pg_cache;
descr = handle->descr;
@@ -301,7 +301,7 @@ int rrdeng_store_metric_finalize(RRDDIM *rd)
struct pg_cache_page_index *page_index;
uint8_t can_delete_metric = 0;
- handle = &rd->state->handle.rrdeng;
+ handle = (struct rrdeng_collect_handle *)rd->state->handle;
ctx = handle->ctx;
page_index = rd->state->page_index;
rrdeng_store_metric_flush_current_page(rd);
@@ -314,6 +314,7 @@ int rrdeng_store_metric_finalize(RRDDIM *rd)
can_delete_metric = 1;
}
uv_rwlock_wrunlock(&page_index->lock);
+ freez(handle);
return can_delete_metric;
}
@@ -406,6 +407,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e
is_first_region_initialized = 0;
region_points = 0;
+ int is_out_of_order_reported = 0;
/* pages loop */
for (i = 0, curr = NULL, prev = NULL ; i < pages_nr ; ++i) {
old_prev = prev;
@@ -446,7 +448,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e
is_metric_out_of_order = 1;
if (is_metric_earlier_than_range || unlikely(is_metric_out_of_order)) {
if (unlikely(is_metric_out_of_order))
- info("Ignoring metric with out of order timestamp.");
+ is_out_of_order_reported++;
continue; /* next entry */
}
/* here is a valid metric */
@@ -519,6 +521,8 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e
freez(region_info_array);
}
}
+ if (is_out_of_order_reported)
+ info("Ignored %d metrics with out of order timestamp in %u regions.", is_out_of_order_reported, regions);
return regions;
}
@@ -535,12 +539,14 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand
ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost);
rrdimm_handle->start_time = start_time;
rrdimm_handle->end_time = end_time;
- handle = &rrdimm_handle->rrdeng;
+
+ handle = callocz(1, sizeof(struct rrdeng_query_handle));
handle->next_page_time = start_time;
handle->now = start_time;
handle->position = 0;
handle->ctx = ctx;
handle->descr = NULL;
+ rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle;
pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC,
NULL, &handle->page_index);
if (unlikely(NULL == handle->page_index || 0 == pages_nr))
@@ -548,102 +554,109 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand
handle->next_page_time = INVALID_TIME;
}
-/* Returns the metric and sets its timestamp into current_time */
-storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time)
-{
- struct rrdeng_query_handle *handle;
- struct rrdengine_instance *ctx;
- struct rrdeng_page_descr *descr;
- storage_number *page, ret;
- unsigned position, entries;
- usec_t next_page_time = 0, current_position_time, page_end_time = 0;
+static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
+
+ struct rrdengine_instance *ctx = handle->ctx;
+ struct rrdeng_page_descr *descr = handle->descr;
+
uint32_t page_length;
+ usec_t page_end_time;
+ unsigned position;
- handle = &rrdimm_handle->rrdeng;
- if (unlikely(INVALID_TIME == handle->next_page_time)) {
- return SN_EMPTY_SLOT;
- }
- ctx = handle->ctx;
- if (unlikely(NULL == (descr = handle->descr))) {
- /* it's the first call */
- next_page_time = handle->next_page_time * USEC_PER_SEC;
- } else {
- pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
- }
- position = handle->position + 1;
+ if (likely(descr)) {
+ // Drop old page's reference
- if (unlikely(NULL == descr ||
- position >= (page_length / sizeof(storage_number)))) {
- /* We need to get a new page */
- if (descr) {
- /* Drop old page's reference */
#ifdef NETDATA_INTERNAL_CHECKS
- rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
+ rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
- pg_cache_put(ctx, descr);
- handle->descr = NULL;
- handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1;
- if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
- goto no_more_metrics;
- }
- next_page_time = handle->next_page_time * USEC_PER_SEC;
- }
- descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id,
- next_page_time, rrdimm_handle->end_time * USEC_PER_SEC);
- if (NULL == descr) {
- goto no_more_metrics;
- }
+ pg_cache_put(ctx, descr);
+ handle->descr = NULL;
+ handle->next_page_time = (handle->page_end_time / USEC_PER_SEC) + 1;
+
+ if (unlikely(handle->next_page_time > rrdimm_handle->end_time))
+ return 1;
+ }
+
+ usec_t next_page_time = handle->next_page_time * USEC_PER_SEC;
+ descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, next_page_time, rrdimm_handle->end_time * USEC_PER_SEC);
+ if (NULL == descr)
+ return 1;
+
#ifdef NETDATA_INTERNAL_CHECKS
- rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
+ rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
#endif
- handle->descr = descr;
- pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
- if (unlikely(INVALID_TIME == descr->start_time ||
- INVALID_TIME == page_end_time)) {
- goto no_more_metrics;
- }
- if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) {
- /* we're in the middle of the page somewhere */
- entries = page_length / sizeof(storage_number);
- position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) /
- (page_end_time - descr->start_time);
- } else {
- position = 0;
- }
+
+ handle->descr = descr;
+ pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
+ if (unlikely(INVALID_TIME == descr->start_time || INVALID_TIME == page_end_time))
+ return 1;
+
+ if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) {
+ // we're in the middle of the page somewhere
+ unsigned entries = page_length / sizeof(storage_number);
+ position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) /
+ (page_end_time - descr->start_time);
}
- page = descr->pg_cache_descr->page;
- ret = page[position];
- entries = page_length / sizeof(storage_number);
- if (entries > 1) {
- usec_t dt;
+ else
+ position = 0;
+
+ handle->page_end_time = page_end_time;
+ handle->page_length = page_length;
+ handle->page = descr->pg_cache_descr->page;
+ usec_t entries = handle->entries = page_length / sizeof(storage_number);
+ if (likely(entries > 1))
+ handle->dt = (page_end_time - descr->start_time) / (entries - 1);
+ else
+ handle->dt = 0;
- dt = (page_end_time - descr->start_time) / (entries - 1);
- current_position_time = descr->start_time + position * dt;
- } else {
- current_position_time = descr->start_time;
+ handle->dt_sec = handle->dt / USEC_PER_SEC;
+ handle->position = position;
+
+ return 0;
+}
+
+/* Returns the metric and sets its timestamp into current_time */
+storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
+
+ if (unlikely(INVALID_TIME == handle->next_page_time))
+ return SN_EMPTY_SLOT;
+
+ struct rrdeng_page_descr *descr = handle->descr;
+ unsigned position = handle->position + 1;
+ time_t now = handle->now + handle->dt_sec;
+
+ if (unlikely(!descr || position >= handle->entries)) {
+ // We need to get a new page
+ if(rrdeng_load_page_next(rrdimm_handle)) {
+ // next calls will not load any more metrics
+ handle->next_page_time = INVALID_TIME;
+ return SN_EMPTY_SLOT;
+ }
+
+ descr = handle->descr;
+ position = handle->position;
+ now = (descr->start_time + position * handle->dt) / USEC_PER_SEC;
}
+
+ storage_number ret = handle->page[position];
handle->position = position;
- handle->now = current_position_time / USEC_PER_SEC;
-/* fatal_assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time);
- The above assertion is an approximation and needs to take update_every into account */
- if (unlikely(handle->now >= rrdimm_handle->end_time)) {
- /* next calls will not load any more metrics */
+ handle->now = now;
+
+ if (unlikely(now >= rrdimm_handle->end_time)) {
+ // next calls will not load any more metrics
handle->next_page_time = INVALID_TIME;
}
- *current_time = handle->now;
- return ret;
-no_more_metrics:
- handle->next_page_time = INVALID_TIME;
- return SN_EMPTY_SLOT;
+ *current_time = now;
+ return ret;
}
int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
{
- struct rrdeng_query_handle *handle;
-
- handle = &rrdimm_handle->rrdeng;
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
return (INVALID_TIME == handle->next_page_time);
}
@@ -652,19 +665,20 @@ int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
*/
void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
{
- struct rrdeng_query_handle *handle;
- struct rrdengine_instance *ctx;
- struct rrdeng_page_descr *descr;
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
+ struct rrdengine_instance *ctx = handle->ctx;
+ struct rrdeng_page_descr *descr = handle->descr;
- handle = &rrdimm_handle->rrdeng;
- ctx = handle->ctx;
- descr = handle->descr;
if (descr) {
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
pg_cache_put(ctx, descr);
}
+
+ // whatever is allocated at rrdeng_load_metric_init() should be freed here
+ freez(handle);
+ rrdimm_handle->handle = NULL;
}
time_t rrdeng_metric_latest_time(RRDDIM *rd)
@@ -724,7 +738,7 @@ void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrde
descr = pg_cache_create_descr();
descr->id = id; /* TODO: add page type: metric, log, something? */
- page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */
+ page = dbengine_page_alloc(); /*TODO: add page size */
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
pg_cache_descr->page = page;
@@ -949,7 +963,7 @@ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_p
/* wait for worker thread to initialize */
completion_wait_for(&ctx->rrdengine_completion);
completion_destroy(&ctx->rrdengine_completion);
- uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE");
+ uv_thread_set_name_np(ctx->worker_config.thread, "LIBUV_WORKER");
if (ctx->worker_config.error) {
goto error_after_rrdeng_worker;
}