summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengineapi.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengineapi.c')
-rwxr-xr-xdatabase/engine/rrdengineapi.c202
1 files changed, 108 insertions, 94 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 6ebee145..76010a7c 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -126,12 +126,13 @@ void rrdeng_store_metric_init(RRDDIM *rd)
struct pg_cache_page_index *page_index;
ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost);
- handle = &rd->state->handle.rrdeng;
- handle->ctx = ctx;
+ handle = callocz(1, sizeof(struct rrdeng_collect_handle));
+ handle->ctx = ctx;
handle->descr = NULL;
handle->prev_descr = NULL;
handle->unaligned_page = 0;
+ rd->state->handle = (STORAGE_COLLECT_HANDLE *)handle;
page_index = rd->state->page_index;
uv_rwlock_wrlock(&page_index->lock);
@@ -162,7 +163,7 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
struct rrdengine_instance *ctx;
struct rrdeng_page_descr *descr;
- handle = &rd->state->handle.rrdeng;
+ handle = (struct rrdeng_collect_handle *)rd->state->handle;
ctx = handle->ctx;
if (unlikely(!ctx))
return;
@@ -202,7 +203,7 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
/* handle->prev_descr = descr;*/
}
} else {
- freez(descr->pg_cache_descr->page);
+ dbengine_page_free(descr->pg_cache_descr->page);
rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
freez(descr);
}
@@ -211,14 +212,13 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number)
{
- struct rrdeng_collect_handle *handle;
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)rd->state->handle;
struct rrdengine_instance *ctx;
struct page_cache *pg_cache;
struct rrdeng_page_descr *descr;
storage_number *page;
uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0;
- handle = &rd->state->handle.rrdeng;
ctx = handle->ctx;
pg_cache = &ctx->pg_cache;
descr = handle->descr;
@@ -301,7 +301,7 @@ int rrdeng_store_metric_finalize(RRDDIM *rd)
struct pg_cache_page_index *page_index;
uint8_t can_delete_metric = 0;
- handle = &rd->state->handle.rrdeng;
+ handle = (struct rrdeng_collect_handle *)rd->state->handle;
ctx = handle->ctx;
page_index = rd->state->page_index;
rrdeng_store_metric_flush_current_page(rd);
@@ -314,6 +314,7 @@ int rrdeng_store_metric_finalize(RRDDIM *rd)
can_delete_metric = 1;
}
uv_rwlock_wrunlock(&page_index->lock);
+ freez(handle);
return can_delete_metric;
}
@@ -406,6 +407,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e
is_first_region_initialized = 0;
region_points = 0;
+ int is_out_of_order_reported = 0;
/* pages loop */
for (i = 0, curr = NULL, prev = NULL ; i < pages_nr ; ++i) {
old_prev = prev;
@@ -446,7 +448,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e
is_metric_out_of_order = 1;
if (is_metric_earlier_than_range || unlikely(is_metric_out_of_order)) {
if (unlikely(is_metric_out_of_order))
- info("Ignoring metric with out of order timestamp.");
+ is_out_of_order_reported++;
continue; /* next entry */
}
/* here is a valid metric */
@@ -519,6 +521,8 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e
freez(region_info_array);
}
}
+ if (is_out_of_order_reported)
+ info("Ignored %d metrics with out of order timestamp in %u regions.", is_out_of_order_reported, regions);
return regions;
}
@@ -535,12 +539,14 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand
ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost);
rrdimm_handle->start_time = start_time;
rrdimm_handle->end_time = end_time;
- handle = &rrdimm_handle->rrdeng;
+
+ handle = callocz(1, sizeof(struct rrdeng_query_handle));
handle->next_page_time = start_time;
handle->now = start_time;
handle->position = 0;
handle->ctx = ctx;
handle->descr = NULL;
+ rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle;
pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC,
NULL, &handle->page_index);
if (unlikely(NULL == handle->page_index || 0 == pages_nr))
@@ -548,102 +554,109 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand
handle->next_page_time = INVALID_TIME;
}
-/* Returns the metric and sets its timestamp into current_time */
-storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time)
-{
- struct rrdeng_query_handle *handle;
- struct rrdengine_instance *ctx;
- struct rrdeng_page_descr *descr;
- storage_number *page, ret;
- unsigned position, entries;
- usec_t next_page_time = 0, current_position_time, page_end_time = 0;
+static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
+
+ struct rrdengine_instance *ctx = handle->ctx;
+ struct rrdeng_page_descr *descr = handle->descr;
+
uint32_t page_length;
+ usec_t page_end_time;
+ unsigned position;
- handle = &rrdimm_handle->rrdeng;
- if (unlikely(INVALID_TIME == handle->next_page_time)) {
- return SN_EMPTY_SLOT;
- }
- ctx = handle->ctx;
- if (unlikely(NULL == (descr = handle->descr))) {
- /* it's the first call */
- next_page_time = handle->next_page_time * USEC_PER_SEC;
- } else {
- pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
- }
- position = handle->position + 1;
+ if (likely(descr)) {
+ // Drop old page's reference
- if (unlikely(NULL == descr ||
- position >= (page_length / sizeof(storage_number)))) {
- /* We need to get a new page */
- if (descr) {
- /* Drop old page's reference */
#ifdef NETDATA_INTERNAL_CHECKS
- rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
+ rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
- pg_cache_put(ctx, descr);
- handle->descr = NULL;
- handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1;
- if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
- goto no_more_metrics;
- }
- next_page_time = handle->next_page_time * USEC_PER_SEC;
- }
- descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id,
- next_page_time, rrdimm_handle->end_time * USEC_PER_SEC);
- if (NULL == descr) {
- goto no_more_metrics;
- }
+ pg_cache_put(ctx, descr);
+ handle->descr = NULL;
+ handle->next_page_time = (handle->page_end_time / USEC_PER_SEC) + 1;
+
+ if (unlikely(handle->next_page_time > rrdimm_handle->end_time))
+ return 1;
+ }
+
+ usec_t next_page_time = handle->next_page_time * USEC_PER_SEC;
+ descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, next_page_time, rrdimm_handle->end_time * USEC_PER_SEC);
+ if (NULL == descr)
+ return 1;
+
#ifdef NETDATA_INTERNAL_CHECKS
- rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
+ rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
#endif
- handle->descr = descr;
- pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
- if (unlikely(INVALID_TIME == descr->start_time ||
- INVALID_TIME == page_end_time)) {
- goto no_more_metrics;
- }
- if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) {
- /* we're in the middle of the page somewhere */
- entries = page_length / sizeof(storage_number);
- position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) /
- (page_end_time - descr->start_time);
- } else {
- position = 0;
- }
+
+ handle->descr = descr;
+ pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
+ if (unlikely(INVALID_TIME == descr->start_time || INVALID_TIME == page_end_time))
+ return 1;
+
+ if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) {
+ // we're in the middle of the page somewhere
+ unsigned entries = page_length / sizeof(storage_number);
+ position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) /
+ (page_end_time - descr->start_time);
}
- page = descr->pg_cache_descr->page;
- ret = page[position];
- entries = page_length / sizeof(storage_number);
- if (entries > 1) {
- usec_t dt;
+ else
+ position = 0;
+
+ handle->page_end_time = page_end_time;
+ handle->page_length = page_length;
+ handle->page = descr->pg_cache_descr->page;
+ usec_t entries = handle->entries = page_length / sizeof(storage_number);
+ if (likely(entries > 1))
+ handle->dt = (page_end_time - descr->start_time) / (entries - 1);
+ else
+ handle->dt = 0;
- dt = (page_end_time - descr->start_time) / (entries - 1);
- current_position_time = descr->start_time + position * dt;
- } else {
- current_position_time = descr->start_time;
+ handle->dt_sec = handle->dt / USEC_PER_SEC;
+ handle->position = position;
+
+ return 0;
+}
+
+/* Returns the metric and sets its timestamp into current_time */
+storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
+
+ if (unlikely(INVALID_TIME == handle->next_page_time))
+ return SN_EMPTY_SLOT;
+
+ struct rrdeng_page_descr *descr = handle->descr;
+ unsigned position = handle->position + 1;
+ time_t now = handle->now + handle->dt_sec;
+
+ if (unlikely(!descr || position >= handle->entries)) {
+ // We need to get a new page
+ if(rrdeng_load_page_next(rrdimm_handle)) {
+ // next calls will not load any more metrics
+ handle->next_page_time = INVALID_TIME;
+ return SN_EMPTY_SLOT;
+ }
+
+ descr = handle->descr;
+ position = handle->position;
+ now = (descr->start_time + position * handle->dt) / USEC_PER_SEC;
}
+
+ storage_number ret = handle->page[position];
handle->position = position;
- handle->now = current_position_time / USEC_PER_SEC;
-/* fatal_assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time);
- The above assertion is an approximation and needs to take update_every into account */
- if (unlikely(handle->now >= rrdimm_handle->end_time)) {
- /* next calls will not load any more metrics */
+ handle->now = now;
+
+ if (unlikely(now >= rrdimm_handle->end_time)) {
+ // next calls will not load any more metrics
handle->next_page_time = INVALID_TIME;
}
- *current_time = handle->now;
- return ret;
-no_more_metrics:
- handle->next_page_time = INVALID_TIME;
- return SN_EMPTY_SLOT;
+ *current_time = now;
+ return ret;
}
int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
{
- struct rrdeng_query_handle *handle;
-
- handle = &rrdimm_handle->rrdeng;
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
return (INVALID_TIME == handle->next_page_time);
}
@@ -652,19 +665,20 @@ int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
*/
void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
{
- struct rrdeng_query_handle *handle;
- struct rrdengine_instance *ctx;
- struct rrdeng_page_descr *descr;
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
+ struct rrdengine_instance *ctx = handle->ctx;
+ struct rrdeng_page_descr *descr = handle->descr;
- handle = &rrdimm_handle->rrdeng;
- ctx = handle->ctx;
- descr = handle->descr;
if (descr) {
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
pg_cache_put(ctx, descr);
}
+
+ // whatever is allocated at rrdeng_load_metric_init() should be freed here
+ freez(handle);
+ rrdimm_handle->handle = NULL;
}
time_t rrdeng_metric_latest_time(RRDDIM *rd)
@@ -724,7 +738,7 @@ void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrde
descr = pg_cache_create_descr();
descr->id = id; /* TODO: add page type: metric, log, something? */
- page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */
+ page = dbengine_page_alloc(); /*TODO: add page size */
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
pg_cache_descr->page = page;
@@ -949,7 +963,7 @@ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_p
/* wait for worker thread to initialize */
completion_wait_for(&ctx->rrdengine_completion);
completion_destroy(&ctx->rrdengine_completion);
- uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE");
+ uv_thread_set_name_np(ctx->worker_config.thread, "LIBUV_WORKER");
if (ctx->worker_config.error) {
goto error_after_rrdeng_worker;
}