summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengineapi.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengineapi.c')
-rw-r--r--database/engine/rrdengineapi.c298
1 files changed, 264 insertions, 34 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index a87ce6d6..bf373f31 100644
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -218,6 +218,208 @@ void rrdeng_store_metric_finalize(RRDDIM *rd)
}
}
+/* Returns 1 if the data collection interval is well defined, 0 otherwise */
+static int metrics_with_known_interval(struct rrdeng_page_descr *descr)
+{
+ unsigned page_entries;
+
+ if (unlikely(INVALID_TIME == descr->start_time || INVALID_TIME == descr->end_time))
+ return 0;
+ page_entries = descr->page_length / sizeof(storage_number);
+ if (likely(page_entries > 1)) {
+ return 1;
+ }
+ return 0;
+}
+
+static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info)
+{
+ return (uint32_t *)&page_info->scratch[0];
+}
+
+static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info)
+{
+ return (uint32_t *)&page_info->scratch[sizeof(uint32_t)];
+}
+
+/**
+ * Calculates the regions of different data collection intervals in a netdata chart in the time range
+ * [start_time,end_time]. This call takes the netdata chart read lock.
+ * @param st the netdata chart whose data collection interval boundaries are calculated.
+ * @param start_time inclusive starting time in usec
+ * @param end_time inclusive ending time in usec
+ * @param region_info_arrayp It allocates (*region_info_arrayp) and populates it with information of regions of a
+ * reference dimension that that have different data collection intervals and overlap with the time range
+ * [start_time,end_time]. The caller must free (*region_info_arrayp) with freez(). If region_info_arrayp is set
+ * to NULL nothing was allocated.
+ * @param max_intervalp is derefenced and set to be the largest data collection interval of all regions.
+ * @return number of regions with different data collection intervals.
+ */
+unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time,
+ struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp)
+{
+ struct pg_cache_page_index *page_index;
+ struct rrdengine_instance *ctx;
+ unsigned pages_nr;
+ RRDDIM *rd_iter, *rd;
+ struct rrdeng_page_info *page_info_array, *curr, *prev, *old_prev;
+ unsigned i, j, page_entries, region_points, page_points, regions, max_interval;
+ time_t now;
+ usec_t dt, current_position_time, max_time = 0, min_time, curr_time, first_valid_time_in_page;
+ struct rrdeng_region_info *region_info_array;
+ uint8_t is_first_region_initialized;
+
+ ctx = st->rrdhost->rrdeng_ctx;
+ regions = 1;
+ *max_intervalp = max_interval = 0;
+ region_info_array = NULL;
+ *region_info_arrayp = NULL;
+ page_info_array = NULL;
+
+ rrdset_rdlock(st);
+ for(rd_iter = st->dimensions, rd = NULL, min_time = (usec_t)-1 ; rd_iter ; rd_iter = rd_iter->next) {
+ /*
+ * Choose oldest dimension as reference. This is not equivalent to the union of all dimensions
+ * but it is a best effort approximation with a bias towards older metrics in a chart. It
+ * matches netdata behaviour in the sense that dimensions are generally aligned in a chart
+ * and older dimensions contain more information about the time range. It does not work well
+ * for metrics that have recently stopped being collected.
+ */
+ curr_time = pg_cache_oldest_time_in_range(ctx, rd_iter->state->rrdeng_uuid,
+ start_time * USEC_PER_SEC, end_time * USEC_PER_SEC);
+ if (INVALID_TIME != curr_time && curr_time < min_time) {
+ rd = rd_iter;
+ min_time = curr_time;
+ }
+ }
+ rrdset_unlock(st);
+ if (NULL == rd) {
+ return 1;
+ }
+ pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC,
+ &page_info_array, &page_index);
+ if (pages_nr) {
+ /* conservative allocation, will reduce the size later if necessary */
+ region_info_array = mallocz(sizeof(*region_info_array) * pages_nr);
+ }
+ is_first_region_initialized = 0;
+ region_points = 0;
+
+ /* pages loop */
+ for (i = 0, curr = NULL, prev = NULL ; i < pages_nr ; ++i) {
+ old_prev = prev;
+ prev = curr;
+ curr = &page_info_array[i];
+ *pginfo_to_points(curr) = 0; /* initialize to invalid page */
+ *pginfo_to_dt(curr) = 0; /* no known data collection interval yet */
+ if (unlikely(INVALID_TIME == curr->start_time || INVALID_TIME == curr->end_time)) {
+ info("Ignoring page with invalid timestamp.");
+ prev = old_prev;
+ continue;
+ }
+ page_entries = curr->page_length / sizeof(storage_number);
+ assert(0 != page_entries);
+ if (likely(1 != page_entries)) {
+ dt = (curr->end_time - curr->start_time) / (page_entries - 1);
+ *pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(dt);
+ if (unlikely(0 == *pginfo_to_dt(curr)))
+ *pginfo_to_dt(curr) = 1;
+ } else {
+ dt = 0;
+ }
+ for (j = 0, page_points = 0 ; j < page_entries ; ++j) {
+ uint8_t is_metric_out_of_order, is_metric_earlier_than_range;
+
+ is_metric_earlier_than_range = 0;
+ is_metric_out_of_order = 0;
+
+ current_position_time = curr->start_time + j * dt;
+ now = current_position_time / USEC_PER_SEC;
+ if (now > end_time) { /* there will be no more pages in the time range */
+ break;
+ }
+ if (now < start_time)
+ is_metric_earlier_than_range = 1;
+ if (unlikely(current_position_time < max_time)) /* just went back in time */
+ is_metric_out_of_order = 1;
+ if (is_metric_earlier_than_range || unlikely(is_metric_out_of_order)) {
+ if (unlikely(is_metric_out_of_order))
+ info("Ignoring metric with out of order timestamp.");
+ continue; /* next entry */
+ }
+ /* here is a valid metric */
+ ++page_points;
+ region_info_array[regions - 1].points = ++region_points;
+ max_time = current_position_time;
+ if (1 == page_points)
+ first_valid_time_in_page = current_position_time;
+ if (unlikely(!is_first_region_initialized)) {
+ assert(1 == regions);
+ /* this is the first region */
+ region_info_array[0].start_time = current_position_time;
+ is_first_region_initialized = 1;
+ }
+ }
+ *pginfo_to_points(curr) = page_points;
+ if (0 == page_points) {
+ prev = old_prev;
+ continue;
+ }
+
+ if (unlikely(0 == dt)) { /* unknown data collection interval */
+ assert(1 == page_points);
+
+ if (likely(NULL != prev)) { /* get interval from previous page */
+ *pginfo_to_dt(curr) = *pginfo_to_dt(prev);
+ } else { /* there is no previous page in the query */
+ struct rrdeng_page_info db_page_info;
+
+ /* go to database */
+ pg_cache_get_filtered_info_prev(ctx, page_index, curr->start_time,
+ metrics_with_known_interval, &db_page_info);
+ if (unlikely(db_page_info.start_time == INVALID_TIME || db_page_info.end_time == INVALID_TIME ||
+ 0 == db_page_info.page_length)) { /* nothing in the database, default to update_every */
+ *pginfo_to_dt(curr) = rd->update_every;
+ } else {
+ unsigned db_entries;
+ usec_t db_dt;
+
+ db_entries = db_page_info.page_length / sizeof(storage_number);
+ db_dt = (db_page_info.end_time - db_page_info.start_time) / (db_entries - 1);
+ *pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(db_dt);
+ if (unlikely(0 == *pginfo_to_dt(curr)))
+ *pginfo_to_dt(curr) = 1;
+
+ }
+ }
+ }
+ if (likely(prev) && unlikely(*pginfo_to_dt(curr) != *pginfo_to_dt(prev))) {
+ info("Data collection interval change detected in query: %"PRIu32" -> %"PRIu32,
+ *pginfo_to_dt(prev), *pginfo_to_dt(curr));
+ region_info_array[regions++ - 1].points -= page_points;
+ region_info_array[regions - 1].points = region_points = page_points;
+ region_info_array[regions - 1].start_time = first_valid_time_in_page;
+ }
+ if (*pginfo_to_dt(curr) > max_interval)
+ max_interval = *pginfo_to_dt(curr);
+ region_info_array[regions - 1].update_every = *pginfo_to_dt(curr);
+ }
+ if (page_info_array)
+ freez(page_info_array);
+ if (region_info_array) {
+ if (likely(is_first_region_initialized)) {
+ /* free unnecessary memory */
+ region_info_array = reallocz(region_info_array, sizeof(*region_info_array) * regions);
+ *region_info_arrayp = region_info_array;
+ *max_intervalp = max_interval;
+ } else {
+ /* empty result */
+ freez(region_info_array);
+ }
+ }
+ return regions;
+}
+
/*
* Gets a handle for loading metrics from the database.
* The handle must be released with rrdeng_load_metric_final().
@@ -226,80 +428,108 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand
{
struct rrdeng_query_handle *handle;
struct rrdengine_instance *ctx;
+ unsigned pages_nr;
ctx = rd->rrdset->rrdhost->rrdeng_ctx;
rrdimm_handle->start_time = start_time;
rrdimm_handle->end_time = end_time;
handle = &rrdimm_handle->rrdeng;
+ handle->next_page_time = start_time;
handle->now = start_time;
- handle->dt = rd->rrdset->update_every;
+ handle->position = 0;
handle->ctx = ctx;
handle->descr = NULL;
- handle->page_index = pg_cache_preload(ctx, rd->state->rrdeng_uuid,
- start_time * USEC_PER_SEC, end_time * USEC_PER_SEC);
+ pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC,
+ NULL, &handle->page_index);
+ if (unlikely(NULL == handle->page_index || 0 == pages_nr))
+ /* there are no metrics to load */
+ handle->next_page_time = INVALID_TIME;
}
-storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle)
+/* Returns the metric and sets its timestamp into current_time */
+storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time)
{
struct rrdeng_query_handle *handle;
struct rrdengine_instance *ctx;
struct rrdeng_page_descr *descr;
storage_number *page, ret;
- unsigned position;
- usec_t point_in_time;
+ unsigned position, entries;
+ usec_t next_page_time, current_position_time;
handle = &rrdimm_handle->rrdeng;
- if (unlikely(INVALID_TIME == handle->now)) {
+ if (unlikely(INVALID_TIME == handle->next_page_time)) {
return SN_EMPTY_SLOT;
}
ctx = handle->ctx;
- point_in_time = handle->now * USEC_PER_SEC;
- descr = handle->descr;
-
- if (unlikely(NULL == handle->page_index)) {
- ret = SN_EMPTY_SLOT;
- goto out;
+ if (unlikely(NULL == (descr = handle->descr))) {
+ /* it's the first call */
+ next_page_time = handle->next_page_time * USEC_PER_SEC;
}
+ position = handle->position + 1;
+
if (unlikely(NULL == descr ||
- point_in_time < descr->start_time ||
- point_in_time > descr->end_time)) {
+ position >= (descr->page_length / sizeof(storage_number)))) {
+ /* We need to get a new page */
if (descr) {
+ /* Drop old page's reference */
+ handle->next_page_time = (descr->end_time / USEC_PER_SEC) + 1;
+ if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
+ goto no_more_metrics;
+ }
+ next_page_time = handle->next_page_time * USEC_PER_SEC;
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
pg_cache_put(ctx, descr);
handle->descr = NULL;
}
- descr = pg_cache_lookup(ctx, handle->page_index, &handle->page_index->id, point_in_time);
+ descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id,
+ next_page_time, rrdimm_handle->end_time * USEC_PER_SEC);
if (NULL == descr) {
- ret = SN_EMPTY_SLOT;
- goto out;
+ goto no_more_metrics;
}
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
#endif
handle->descr = descr;
- }
- if (unlikely(INVALID_TIME == descr->start_time ||
- INVALID_TIME == descr->end_time)) {
- ret = SN_EMPTY_SLOT;
- goto out;
+ if (unlikely(INVALID_TIME == descr->start_time ||
+ INVALID_TIME == descr->end_time)) {
+ goto no_more_metrics;
+ }
+ if (unlikely(descr->start_time != descr->end_time && next_page_time > descr->start_time)) {
+ /* we're in the middle of the page somewhere */
+ entries = descr->page_length / sizeof(storage_number);
+ position = ((uint64_t)(next_page_time - descr->start_time)) * entries /
+ (descr->end_time - descr->start_time + 1);
+ } else {
+ position = 0;
+ }
}
page = descr->pg_cache_descr->page;
- if (unlikely(descr->start_time == descr->end_time)) {
- ret = page[0];
- goto out;
- }
- position = ((uint64_t)(point_in_time - descr->start_time)) * (descr->page_length / sizeof(storage_number)) /
- (descr->end_time - descr->start_time + 1);
ret = page[position];
+ entries = descr->page_length / sizeof(storage_number);
+ if (entries > 1) {
+ usec_t dt;
-out:
- handle->now += handle->dt;
- if (unlikely(handle->now > rrdimm_handle->end_time)) {
- handle->now = INVALID_TIME;
+ dt = (descr->end_time - descr->start_time) / (entries - 1);
+ current_position_time = descr->start_time + position * dt;
+ } else {
+ current_position_time = descr->start_time;
}
+ handle->position = position;
+ handle->now = current_position_time / USEC_PER_SEC;
+/* assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time);
+ The above assertion is an approximation and needs to take update_every into account */
+ if (unlikely(handle->now >= rrdimm_handle->end_time)) {
+ /* next calls will not load any more metrics */
+ handle->next_page_time = INVALID_TIME;
+ }
+ *current_time = handle->now;
return ret;
+
+no_more_metrics:
+ handle->next_page_time = INVALID_TIME;
+ return SN_EMPTY_SLOT;
}
int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
@@ -307,7 +537,7 @@ int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
struct rrdeng_query_handle *handle;
handle = &rrdimm_handle->rrdeng;
- return (INVALID_TIME == handle->now);
+ return (INVALID_TIME == handle->next_page_time);
}
/*