summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengineapi.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--database/engine/rrdengineapi.c42
1 files changed, 22 insertions, 20 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index bf373f31..5fa23d8f 100644
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -4,7 +4,7 @@
/* Default global database instance */
static struct rrdengine_instance default_global_ctx;
-int default_rrdeng_page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
+int default_rrdeng_page_cache_mb = 32;
int default_rrdeng_disk_quota_mb = RRDENG_MIN_DISK_SPACE_MB;
/*
@@ -95,9 +95,8 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
if (likely(descr->page_length)) {
int ret, page_is_empty;
-#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
-#endif
+
if (handle->prev_descr) {
/* unpin old second page */
pg_cache_put(ctx, handle->prev_descr);
@@ -185,16 +184,14 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n
}
page = descr->pg_cache_descr->page;
page[descr->page_length / sizeof(number)] = number;
- descr->end_time = point_in_time;
- descr->page_length += sizeof(number);
+ pg_cache_atomic_set_pg_info(descr, point_in_time, descr->page_length + sizeof(number));
+
if (perfect_page_alignment)
rd->rrdset->rrddim_page_alignment = descr->page_length;
if (unlikely(INVALID_TIME == descr->start_time)) {
descr->start_time = point_in_time;
-#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_producers, 1);
-#endif
pg_cache_insert(ctx, handle->page_index, descr);
} else {
pg_cache_add_new_metric_time(handle->page_index, descr);
@@ -312,8 +309,9 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e
curr = &page_info_array[i];
*pginfo_to_points(curr) = 0; /* initialize to invalid page */
*pginfo_to_dt(curr) = 0; /* no known data collection interval yet */
- if (unlikely(INVALID_TIME == curr->start_time || INVALID_TIME == curr->end_time)) {
- info("Ignoring page with invalid timestamp.");
+ if (unlikely(INVALID_TIME == curr->start_time || INVALID_TIME == curr->end_time ||
+ curr->end_time < curr->start_time)) {
+ info("Ignoring page with invalid timestamps.");
prev = old_prev;
continue;
}
@@ -366,7 +364,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e
continue;
}
- if (unlikely(0 == dt)) { /* unknown data collection interval */
+ if (unlikely(0 == *pginfo_to_dt(curr))) { /* unknown data collection interval */
assert(1 == page_points);
if (likely(NULL != prev)) { /* get interval from previous page */
@@ -454,7 +452,8 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
struct rrdeng_page_descr *descr;
storage_number *page, ret;
unsigned position, entries;
- usec_t next_page_time, current_position_time;
+ usec_t next_page_time, current_position_time, page_end_time;
+ uint32_t page_length;
handle = &rrdimm_handle->rrdeng;
if (unlikely(INVALID_TIME == handle->next_page_time)) {
@@ -464,15 +463,17 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
if (unlikely(NULL == (descr = handle->descr))) {
/* it's the first call */
next_page_time = handle->next_page_time * USEC_PER_SEC;
+ } else {
+ pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
}
position = handle->position + 1;
if (unlikely(NULL == descr ||
- position >= (descr->page_length / sizeof(storage_number)))) {
+ position >= (page_length / sizeof(storage_number)))) {
/* We need to get a new page */
if (descr) {
/* Drop old page's reference */
- handle->next_page_time = (descr->end_time / USEC_PER_SEC) + 1;
+ handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1;
if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
goto no_more_metrics;
}
@@ -492,26 +493,27 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
#endif
handle->descr = descr;
+ pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
if (unlikely(INVALID_TIME == descr->start_time ||
- INVALID_TIME == descr->end_time)) {
+ INVALID_TIME == page_end_time)) {
goto no_more_metrics;
}
- if (unlikely(descr->start_time != descr->end_time && next_page_time > descr->start_time)) {
+ if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) {
/* we're in the middle of the page somewhere */
- entries = descr->page_length / sizeof(storage_number);
- position = ((uint64_t)(next_page_time - descr->start_time)) * entries /
- (descr->end_time - descr->start_time + 1);
+ entries = page_length / sizeof(storage_number);
+ position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) /
+ (page_end_time - descr->start_time);
} else {
position = 0;
}
}
page = descr->pg_cache_descr->page;
ret = page[position];
- entries = descr->page_length / sizeof(storage_number);
+ entries = page_length / sizeof(storage_number);
if (entries > 1) {
usec_t dt;
- dt = (descr->end_time - descr->start_time) / (entries - 1);
+ dt = (page_end_time - descr->start_time) / (entries - 1);
current_position_time = descr->start_time + position * dt;
} else {
current_position_time = descr->start_time;