diff options
Diffstat (limited to '')
-rw-r--r-- | database/engine/rrdengineapi.c | 298 |
1 files changed, 264 insertions, 34 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index a87ce6d64..bf373f31c 100644 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -218,6 +218,208 @@ void rrdeng_store_metric_finalize(RRDDIM *rd) } } +/* Returns 1 if the data collection interval is well defined, 0 otherwise */ +static int metrics_with_known_interval(struct rrdeng_page_descr *descr) +{ + unsigned page_entries; + + if (unlikely(INVALID_TIME == descr->start_time || INVALID_TIME == descr->end_time)) + return 0; + page_entries = descr->page_length / sizeof(storage_number); + if (likely(page_entries > 1)) { + return 1; + } + return 0; +} + +static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info) +{ + return (uint32_t *)&page_info->scratch[0]; +} + +static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info) +{ + return (uint32_t *)&page_info->scratch[sizeof(uint32_t)]; +} + +/** + * Calculates the regions of different data collection intervals in a netdata chart in the time range + * [start_time,end_time]. This call takes the netdata chart read lock. + * @param st the netdata chart whose data collection interval boundaries are calculated. + * @param start_time inclusive starting time in usec + * @param end_time inclusive ending time in usec + * @param region_info_arrayp It allocates (*region_info_arrayp) and populates it with information of regions of a + * reference dimension that that have different data collection intervals and overlap with the time range + * [start_time,end_time]. The caller must free (*region_info_arrayp) with freez(). If region_info_arrayp is set + * to NULL nothing was allocated. + * @param max_intervalp is derefenced and set to be the largest data collection interval of all regions. + * @return number of regions with different data collection intervals. + */ +unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time, + struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp) +{ + struct pg_cache_page_index *page_index; + struct rrdengine_instance *ctx; + unsigned pages_nr; + RRDDIM *rd_iter, *rd; + struct rrdeng_page_info *page_info_array, *curr, *prev, *old_prev; + unsigned i, j, page_entries, region_points, page_points, regions, max_interval; + time_t now; + usec_t dt, current_position_time, max_time = 0, min_time, curr_time, first_valid_time_in_page; + struct rrdeng_region_info *region_info_array; + uint8_t is_first_region_initialized; + + ctx = st->rrdhost->rrdeng_ctx; + regions = 1; + *max_intervalp = max_interval = 0; + region_info_array = NULL; + *region_info_arrayp = NULL; + page_info_array = NULL; + + rrdset_rdlock(st); + for(rd_iter = st->dimensions, rd = NULL, min_time = (usec_t)-1 ; rd_iter ; rd_iter = rd_iter->next) { + /* + * Choose oldest dimension as reference. This is not equivalent to the union of all dimensions + * but it is a best effort approximation with a bias towards older metrics in a chart. It + * matches netdata behaviour in the sense that dimensions are generally aligned in a chart + * and older dimensions contain more information about the time range. It does not work well + * for metrics that have recently stopped being collected. + */ + curr_time = pg_cache_oldest_time_in_range(ctx, rd_iter->state->rrdeng_uuid, + start_time * USEC_PER_SEC, end_time * USEC_PER_SEC); + if (INVALID_TIME != curr_time && curr_time < min_time) { + rd = rd_iter; + min_time = curr_time; + } + } + rrdset_unlock(st); + if (NULL == rd) { + return 1; + } + pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, + &page_info_array, &page_index); + if (pages_nr) { + /* conservative allocation, will reduce the size later if necessary */ + region_info_array = mallocz(sizeof(*region_info_array) * pages_nr); + } + is_first_region_initialized = 0; + region_points = 0; + + /* pages loop */ + for (i = 0, curr = NULL, prev = NULL ; i < pages_nr ; ++i) { + old_prev = prev; + prev = curr; + curr = &page_info_array[i]; + *pginfo_to_points(curr) = 0; /* initialize to invalid page */ + *pginfo_to_dt(curr) = 0; /* no known data collection interval yet */ + if (unlikely(INVALID_TIME == curr->start_time || INVALID_TIME == curr->end_time)) { + info("Ignoring page with invalid timestamp."); + prev = old_prev; + continue; + } + page_entries = curr->page_length / sizeof(storage_number); + assert(0 != page_entries); + if (likely(1 != page_entries)) { + dt = (curr->end_time - curr->start_time) / (page_entries - 1); + *pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(dt); + if (unlikely(0 == *pginfo_to_dt(curr))) + *pginfo_to_dt(curr) = 1; + } else { + dt = 0; + } + for (j = 0, page_points = 0 ; j < page_entries ; ++j) { + uint8_t is_metric_out_of_order, is_metric_earlier_than_range; + + is_metric_earlier_than_range = 0; + is_metric_out_of_order = 0; + + current_position_time = curr->start_time + j * dt; + now = current_position_time / USEC_PER_SEC; + if (now > end_time) { /* there will be no more pages in the time range */ + break; + } + if (now < start_time) + is_metric_earlier_than_range = 1; + if (unlikely(current_position_time < max_time)) /* just went back in time */ + is_metric_out_of_order = 1; + if (is_metric_earlier_than_range || unlikely(is_metric_out_of_order)) { + if (unlikely(is_metric_out_of_order)) + info("Ignoring metric with out of order timestamp."); + continue; /* next entry */ + } + /* here is a valid metric */ + ++page_points; + region_info_array[regions - 1].points = ++region_points; + max_time = current_position_time; + if (1 == page_points) + first_valid_time_in_page = current_position_time; + if (unlikely(!is_first_region_initialized)) { + assert(1 == regions); + /* this is the first region */ + region_info_array[0].start_time = current_position_time; + is_first_region_initialized = 1; + } + } + *pginfo_to_points(curr) = page_points; + if (0 == page_points) { + prev = old_prev; + continue; + } + + if (unlikely(0 == dt)) { /* unknown data collection interval */ + assert(1 == page_points); + + if (likely(NULL != prev)) { /* get interval from previous page */ + *pginfo_to_dt(curr) = *pginfo_to_dt(prev); + } else { /* there is no previous page in the query */ + struct rrdeng_page_info db_page_info; + + /* go to database */ + pg_cache_get_filtered_info_prev(ctx, page_index, curr->start_time, + metrics_with_known_interval, &db_page_info); + if (unlikely(db_page_info.start_time == INVALID_TIME || db_page_info.end_time == INVALID_TIME || + 0 == db_page_info.page_length)) { /* nothing in the database, default to update_every */ + *pginfo_to_dt(curr) = rd->update_every; + } else { + unsigned db_entries; + usec_t db_dt; + + db_entries = db_page_info.page_length / sizeof(storage_number); + db_dt = (db_page_info.end_time - db_page_info.start_time) / (db_entries - 1); + *pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(db_dt); + if (unlikely(0 == *pginfo_to_dt(curr))) + *pginfo_to_dt(curr) = 1; + + } + } + } + if (likely(prev) && unlikely(*pginfo_to_dt(curr) != *pginfo_to_dt(prev))) { + info("Data collection interval change detected in query: %"PRIu32" -> %"PRIu32, + *pginfo_to_dt(prev), *pginfo_to_dt(curr)); + region_info_array[regions++ - 1].points -= page_points; + region_info_array[regions - 1].points = region_points = page_points; + region_info_array[regions - 1].start_time = first_valid_time_in_page; + } + if (*pginfo_to_dt(curr) > max_interval) + max_interval = *pginfo_to_dt(curr); + region_info_array[regions - 1].update_every = *pginfo_to_dt(curr); + } + if (page_info_array) + freez(page_info_array); + if (region_info_array) { + if (likely(is_first_region_initialized)) { + /* free unnecessary memory */ + region_info_array = reallocz(region_info_array, sizeof(*region_info_array) * regions); + *region_info_arrayp = region_info_array; + *max_intervalp = max_interval; + } else { + /* empty result */ + freez(region_info_array); + } + } + return regions; +} + /* * Gets a handle for loading metrics from the database. * The handle must be released with rrdeng_load_metric_final(). @@ -226,80 +428,108 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand { struct rrdeng_query_handle *handle; struct rrdengine_instance *ctx; + unsigned pages_nr; ctx = rd->rrdset->rrdhost->rrdeng_ctx; rrdimm_handle->start_time = start_time; rrdimm_handle->end_time = end_time; handle = &rrdimm_handle->rrdeng; + handle->next_page_time = start_time; handle->now = start_time; - handle->dt = rd->rrdset->update_every; + handle->position = 0; handle->ctx = ctx; handle->descr = NULL; - handle->page_index = pg_cache_preload(ctx, rd->state->rrdeng_uuid, - start_time * USEC_PER_SEC, end_time * USEC_PER_SEC); + pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, + NULL, &handle->page_index); + if (unlikely(NULL == handle->page_index || 0 == pages_nr)) + /* there are no metrics to load */ + handle->next_page_time = INVALID_TIME; } -storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle) +/* Returns the metric and sets its timestamp into current_time */ +storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time) { struct rrdeng_query_handle *handle; struct rrdengine_instance *ctx; struct rrdeng_page_descr *descr; storage_number *page, ret; - unsigned position; - usec_t point_in_time; + unsigned position, entries; + usec_t next_page_time, current_position_time; handle = &rrdimm_handle->rrdeng; - if (unlikely(INVALID_TIME == handle->now)) { + if (unlikely(INVALID_TIME == handle->next_page_time)) { return SN_EMPTY_SLOT; } ctx = handle->ctx; - point_in_time = handle->now * USEC_PER_SEC; - descr = handle->descr; - - if (unlikely(NULL == handle->page_index)) { - ret = SN_EMPTY_SLOT; - goto out; + if (unlikely(NULL == (descr = handle->descr))) { + /* it's the first call */ + next_page_time = handle->next_page_time * USEC_PER_SEC; } + position = handle->position + 1; + if (unlikely(NULL == descr || - point_in_time < descr->start_time || - point_in_time > descr->end_time)) { + position >= (descr->page_length / sizeof(storage_number)))) { + /* We need to get a new page */ if (descr) { + /* Drop old page's reference */ + handle->next_page_time = (descr->end_time / USEC_PER_SEC) + 1; + if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) { + goto no_more_metrics; + } + next_page_time = handle->next_page_time * USEC_PER_SEC; #ifdef NETDATA_INTERNAL_CHECKS rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); #endif pg_cache_put(ctx, descr); handle->descr = NULL; } - descr = pg_cache_lookup(ctx, handle->page_index, &handle->page_index->id, point_in_time); + descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, + next_page_time, rrdimm_handle->end_time * USEC_PER_SEC); if (NULL == descr) { - ret = SN_EMPTY_SLOT; - goto out; + goto no_more_metrics; } #ifdef NETDATA_INTERNAL_CHECKS rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1); #endif handle->descr = descr; - } - if (unlikely(INVALID_TIME == descr->start_time || - INVALID_TIME == descr->end_time)) { - ret = SN_EMPTY_SLOT; - goto out; + if (unlikely(INVALID_TIME == descr->start_time || + INVALID_TIME == descr->end_time)) { + goto no_more_metrics; + } + if (unlikely(descr->start_time != descr->end_time && next_page_time > descr->start_time)) { + /* we're in the middle of the page somewhere */ + entries = descr->page_length / sizeof(storage_number); + position = ((uint64_t)(next_page_time - descr->start_time)) * entries / + (descr->end_time - descr->start_time + 1); + } else { + position = 0; + } } page = descr->pg_cache_descr->page; - if (unlikely(descr->start_time == descr->end_time)) { - ret = page[0]; - goto out; - } - position = ((uint64_t)(point_in_time - descr->start_time)) * (descr->page_length / sizeof(storage_number)) / - (descr->end_time - descr->start_time + 1); ret = page[position]; + entries = descr->page_length / sizeof(storage_number); + if (entries > 1) { + usec_t dt; -out: - handle->now += handle->dt; - if (unlikely(handle->now > rrdimm_handle->end_time)) { - handle->now = INVALID_TIME; + dt = (descr->end_time - descr->start_time) / (entries - 1); + current_position_time = descr->start_time + position * dt; + } else { + current_position_time = descr->start_time; } + handle->position = position; + handle->now = current_position_time / USEC_PER_SEC; +/* assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time); + The above assertion is an approximation and needs to take update_every into account */ + if (unlikely(handle->now >= rrdimm_handle->end_time)) { + /* next calls will not load any more metrics */ + handle->next_page_time = INVALID_TIME; + } + *current_time = handle->now; return ret; + +no_more_metrics: + handle->next_page_time = INVALID_TIME; + return SN_EMPTY_SLOT; } int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) @@ -307,7 +537,7 @@ int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) struct rrdeng_query_handle *handle; handle = &rrdimm_handle->rrdeng; - return (INVALID_TIME == handle->now); + return (INVALID_TIME == handle->next_page_time); } /* |