diff options
Diffstat (limited to 'web/api/queries/query.c')
-rw-r--r-- | web/api/queries/query.c | 712 |
1 files changed, 468 insertions, 244 deletions
diff --git a/web/api/queries/query.c b/web/api/queries/query.c index 0365b6e9..1f10c513 100644 --- a/web/api/queries/query.c +++ b/web/api/queries/query.c @@ -17,6 +17,8 @@ #include "percentile/percentile.h" #include "trimmed_mean/trimmed_mean.h" +#define POINTS_TO_EXPAND_QUERY 5 + // ---------------------------------------------------------------------------- static struct { @@ -694,7 +696,7 @@ static inline void rrdr_done(RRDR *r, long rrdr_line) { // tier management static bool query_metric_is_valid_tier(QUERY_METRIC *qm, size_t tier) { - if(!qm->tiers[tier].db_metric_handle || !qm->tiers[tier].db_first_time_t || !qm->tiers[tier].db_last_time_t || !qm->tiers[tier].db_update_every) + if(!qm->tiers[tier].db_metric_handle || !qm->tiers[tier].db_first_time_s || !qm->tiers[tier].db_last_time_s || !qm->tiers[tier].db_update_every_s) return false; return true; @@ -705,11 +707,11 @@ static size_t query_metric_first_working_tier(QUERY_METRIC *qm) { // find the db time-range for this tier for all metrics STORAGE_METRIC_HANDLE *db_metric_handle = qm->tiers[tier].db_metric_handle; - time_t first_t = qm->tiers[tier].db_first_time_t; - time_t last_t = qm->tiers[tier].db_last_time_t; - time_t update_every = qm->tiers[tier].db_update_every; + time_t first_time_s = qm->tiers[tier].db_first_time_s; + time_t last_time_s = qm->tiers[tier].db_last_time_s; + time_t update_every_s = qm->tiers[tier].db_update_every_s; - if(!db_metric_handle || !first_t || !last_t || !update_every) + if(!db_metric_handle || !first_time_s || !last_time_s || !update_every_s) continue; return tier; @@ -718,19 +720,23 @@ static size_t query_metric_first_working_tier(QUERY_METRIC *qm) { return 0; } -static long query_plan_points_coverage_weight(time_t db_first_t, time_t db_last_t, time_t db_update_every, time_t after_wanted, time_t before_wanted, size_t points_wanted, size_t tier __maybe_unused) { - if(db_first_t == 0 || db_last_t == 0 || db_update_every == 0) +static long query_plan_points_coverage_weight(time_t db_first_time_s, time_t db_last_time_s, time_t db_update_every_s, time_t after_wanted, time_t before_wanted, size_t points_wanted, size_t tier __maybe_unused) { + if(db_first_time_s == 0 || + db_last_time_s == 0 || + db_update_every_s == 0 || + db_first_time_s > before_wanted || + db_last_time_s < after_wanted) return -LONG_MAX; - time_t common_first_t = MAX(db_first_t, after_wanted); - time_t common_last_t = MIN(db_last_t, before_wanted); + long long common_first_t = MAX(db_first_time_s, after_wanted); + long long common_last_t = MIN(db_last_time_s, before_wanted); - long time_coverage = (common_last_t - common_first_t) * 1000000 / (before_wanted - after_wanted); - size_t points_wanted_in_coverage = points_wanted * time_coverage / 1000000; + long long time_coverage = (common_last_t - common_first_t) * 1000000LL / (before_wanted - after_wanted); + long long points_wanted_in_coverage = (long long)points_wanted * time_coverage / 1000000LL; - long points_available = (common_last_t - common_first_t) / db_update_every; - long points_delta = (long)(points_available - points_wanted_in_coverage); - long points_coverage = (points_delta < 0) ? (long)(points_available * time_coverage / points_wanted_in_coverage) : time_coverage; + long long points_available = (common_last_t - common_first_t) / db_update_every_s; + long long points_delta = (long)(points_available - points_wanted_in_coverage); + long long points_coverage = (points_delta < 0) ? (long)(points_available * time_coverage / points_wanted_in_coverage) : time_coverage; // a way to benefit higher tiers // points_coverage += (long)tier * 10000; @@ -738,7 +744,7 @@ static long query_plan_points_coverage_weight(time_t db_first_t, time_t db_last_ if(points_available <= 0) return -LONG_MAX; - return points_coverage; + return (long)(points_coverage + (25000LL * tier)); // 2.5% benefit for each higher tier } static size_t query_metric_best_tier_for_timeframe(QUERY_METRIC *qm, time_t after_wanted, time_t before_wanted, size_t points_wanted) { @@ -748,27 +754,49 @@ static size_t query_metric_best_tier_for_timeframe(QUERY_METRIC *qm, time_t afte if(unlikely(after_wanted == before_wanted || points_wanted <= 0)) return query_metric_first_working_tier(qm); - long weight[storage_tiers]; + time_t min_first_time_s = 0; + time_t max_last_time_s = 0; + + for(size_t tier = 0; tier < storage_tiers ; tier++) { + time_t first_time_s = qm->tiers[tier].db_first_time_s; + time_t last_time_s = qm->tiers[tier].db_last_time_s; + + if(!min_first_time_s || (first_time_s && first_time_s < min_first_time_s)) + min_first_time_s = first_time_s; + + if(!max_last_time_s || (last_time_s && last_time_s > max_last_time_s)) + max_last_time_s = last_time_s; + } for(size_t tier = 0; tier < storage_tiers ; tier++) { // find the db time-range for this tier for all metrics STORAGE_METRIC_HANDLE *db_metric_handle = qm->tiers[tier].db_metric_handle; - time_t first_t = qm->tiers[tier].db_first_time_t; - time_t last_t = qm->tiers[tier].db_last_time_t; - time_t update_every = qm->tiers[tier].db_update_every; - - if(!db_metric_handle || !first_t || !last_t || !update_every) { - weight[tier] = -LONG_MAX; + time_t first_time_s = qm->tiers[tier].db_first_time_s; + time_t last_time_s = qm->tiers[tier].db_last_time_s; + time_t update_every_s = qm->tiers[tier].db_update_every_s; + + if( !db_metric_handle || + !first_time_s || + !last_time_s || + !update_every_s || + first_time_s > before_wanted || + last_time_s < after_wanted + ) { + qm->tiers[tier].weight = -LONG_MAX; continue; } - weight[tier] = query_plan_points_coverage_weight(first_t, last_t, update_every, after_wanted, before_wanted, points_wanted, tier); + internal_fatal(first_time_s > before_wanted || last_time_s < after_wanted, "QUERY: invalid db durations"); + + qm->tiers[tier].weight = query_plan_points_coverage_weight( + min_first_time_s, max_last_time_s, update_every_s, + after_wanted, before_wanted, points_wanted, tier); } size_t best_tier = 0; for(size_t tier = 1; tier < storage_tiers ; tier++) { - if(weight[tier] >= weight[best_tier]) + if(qm->tiers[tier].weight >= qm->tiers[best_tier].weight) best_tier = tier; } @@ -788,38 +816,38 @@ static size_t rrddim_find_best_tier_for_timeframe(QUERY_TARGET *qt, time_t after for(size_t tier = 0; tier < storage_tiers ; tier++) { - time_t common_first_t = 0; - time_t common_last_t = 0; - time_t common_update_every = 0; + time_t common_first_time_s = 0; + time_t common_last_time_s = 0; + time_t common_update_every_s = 0; // find the db time-range for this tier for all metrics for(size_t i = 0, used = qt->query.used; i < used ; i++) { QUERY_METRIC *qm = &qt->query.array[i]; - time_t first_t = qm->tiers[tier].db_first_time_t; - time_t last_t = qm->tiers[tier].db_last_time_t; - time_t update_every = qm->tiers[tier].db_update_every; + time_t first_time_s = qm->tiers[tier].db_first_time_s; + time_t last_time_s = qm->tiers[tier].db_last_time_s; + time_t update_every_s = qm->tiers[tier].db_update_every_s; - if(!first_t || !last_t || !update_every) + if(!first_time_s || !last_time_s || !update_every_s) continue; - if(!common_first_t) - common_first_t = first_t; + if(!common_first_time_s) + common_first_time_s = first_time_s; else - common_first_t = MIN(first_t, common_first_t); + common_first_time_s = MIN(first_time_s, common_first_time_s); - if(!common_last_t) - common_last_t = last_t; + if(!common_last_time_s) + common_last_time_s = last_time_s; else - common_last_t = MAX(last_t, common_last_t); + common_last_time_s = MAX(last_time_s, common_last_time_s); - if(!common_update_every) - common_update_every = update_every; + if(!common_update_every_s) + common_update_every_s = update_every_s; else - common_update_every = MIN(update_every, common_update_every); + common_update_every_s = MIN(update_every_s, common_update_every_s); } - weight[tier] = query_plan_points_coverage_weight(common_first_t, common_last_t, common_update_every, after_wanted, before_wanted, points_wanted, tier); + weight[tier] = query_plan_points_coverage_weight(common_first_time_s, common_last_time_s, common_update_every_s, after_wanted, before_wanted, points_wanted, tier); } size_t best_tier = 0; @@ -842,19 +870,19 @@ static time_t rrdset_find_natural_update_every_for_timeframe(QUERY_TARGET *qt, t best_tier = rrddim_find_best_tier_for_timeframe(qt, after_wanted, before_wanted, points_wanted); // find the db minimum update every for this tier for all metrics - time_t common_update_every = default_rrd_update_every; + time_t common_update_every_s = default_rrd_update_every; for(size_t i = 0, used = qt->query.used; i < used ; i++) { QUERY_METRIC *qm = &qt->query.array[i]; - time_t update_every = qm->tiers[best_tier].db_update_every; + time_t update_every_s = qm->tiers[best_tier].db_update_every_s; if(!i) - common_update_every = update_every; + common_update_every_s = update_every_s; else - common_update_every = MIN(update_every, common_update_every); + common_update_every_s = MIN(update_every_s, common_update_every_s); } - return common_update_every; + return common_update_every_s; } // ---------------------------------------------------------------------------- @@ -888,17 +916,6 @@ QUERY_POINT QUERY_POINT_EMPTY = { #define query_point_set_id(point, point_id) debug_dummy() #endif -typedef struct query_plan_entry { - size_t tier; - time_t after; - time_t before; -} QUERY_PLAN_ENTRY; - -typedef struct query_plan { - size_t entries; - QUERY_PLAN_ENTRY data[RRD_STORAGE_TIERS*2]; -} QUERY_PLAN; - typedef struct query_engine_ops { // configuration RRDR *r; @@ -908,14 +925,15 @@ typedef struct query_engine_ops { TIER_QUERY_FETCH tier_query_fetch; // query planer - QUERY_PLAN plan; size_t current_plan; time_t current_plan_expire_time; + time_t plan_expanded_after; + time_t plan_expanded_before; // storage queries size_t tier; struct query_metric_tier *tier_ptr; - struct storage_engine_query_handle handle; + struct storage_engine_query_handle *handle; STORAGE_POINT (*next_metric)(struct storage_engine_query_handle *handle); int (*is_finished)(struct storage_engine_query_handle *handle); void (*finalize)(struct storage_engine_query_handle *handle); @@ -937,31 +955,128 @@ typedef struct query_engine_ops { // ---------------------------------------------------------------------------- // query planer -#define query_plan_should_switch_plan(ops, now) ((now) >= (ops).current_plan_expire_time) +#define query_plan_should_switch_plan(ops, now) ((now) >= (ops)->current_plan_expire_time) + +static size_t query_planer_expand_duration_in_points(time_t this_update_every, time_t next_update_every) { + + time_t delta = this_update_every - next_update_every; + if(delta < 0) delta = -delta; + + size_t points; + if(delta < this_update_every * POINTS_TO_EXPAND_QUERY) + points = POINTS_TO_EXPAND_QUERY; + else + points = (delta + this_update_every - 1) / this_update_every; + + return points; +} -static void query_planer_activate_plan(QUERY_ENGINE_OPS *ops, size_t plan_id, time_t overwrite_after) { - if(unlikely(plan_id >= ops->plan.entries)) - plan_id = ops->plan.entries - 1; +static void query_planer_initialize_plans(QUERY_ENGINE_OPS *ops) { + QUERY_METRIC *qm = ops->qm; + + for(size_t p = 0; p < qm->plan.used ; p++) { + size_t tier = qm->plan.array[p].tier; + time_t update_every = qm->tiers[tier].db_update_every_s; + + size_t points_to_add_to_after; + if(p > 0) { + // there is another plan before to this + + size_t tier0 = qm->plan.array[p - 1].tier; + time_t update_every0 = qm->tiers[tier0].db_update_every_s; + + points_to_add_to_after = query_planer_expand_duration_in_points(update_every, update_every0); + } + else + points_to_add_to_after = (tier == 0) ? 0 : POINTS_TO_EXPAND_QUERY; - time_t after = ops->plan.data[plan_id].after; - time_t before = ops->plan.data[plan_id].before; + size_t points_to_add_to_before; + if(p + 1 < qm->plan.used) { + // there is another plan after to this - if(overwrite_after > after && overwrite_after < before) - after = overwrite_after; + size_t tier1 = qm->plan.array[p+1].tier; + time_t update_every1 = qm->tiers[tier1].db_update_every_s; - ops->tier = ops->plan.data[plan_id].tier; - ops->tier_ptr = &ops->qm->tiers[ops->tier]; - ops->tier_ptr->eng->api.query_ops.init(ops->tier_ptr->db_metric_handle, &ops->handle, after, before); - ops->next_metric = ops->tier_ptr->eng->api.query_ops.next_metric; - ops->is_finished = ops->tier_ptr->eng->api.query_ops.is_finished; - ops->finalize = ops->tier_ptr->eng->api.query_ops.finalize; + points_to_add_to_before = query_planer_expand_duration_in_points(update_every, update_every1); + } + else + points_to_add_to_before = POINTS_TO_EXPAND_QUERY; + + time_t after = qm->plan.array[p].after - (time_t)(update_every * points_to_add_to_after); + time_t before = qm->plan.array[p].before + (time_t)(update_every * points_to_add_to_before); + + qm->plan.array[p].expanded_after = after; + qm->plan.array[p].expanded_before = before; + + struct query_metric_tier *tier_ptr = &qm->tiers[tier]; + tier_ptr->eng->api.query_ops.init( + tier_ptr->db_metric_handle, + &qm->plan.array[p].handle, + after, before, + ops->r->internal.qt->request.priority); + + qm->plan.array[p].next_metric = tier_ptr->eng->api.query_ops.next_metric; + qm->plan.array[p].is_finished = tier_ptr->eng->api.query_ops.is_finished; + qm->plan.array[p].finalize = tier_ptr->eng->api.query_ops.finalize; + qm->plan.array[p].initialized = true; + qm->plan.array[p].finalized = false; + } +} + +static void query_planer_finalize_plan(QUERY_ENGINE_OPS *ops, size_t plan_id) { + QUERY_METRIC *qm = ops->qm; + + if(qm->plan.array[plan_id].initialized && !qm->plan.array[plan_id].finalized) { + qm->plan.array[plan_id].finalize(&qm->plan.array[plan_id].handle); + qm->plan.array[plan_id].initialized = false; + qm->plan.array[plan_id].finalized = true; + qm->plan.array[plan_id].next_metric = NULL; + qm->plan.array[plan_id].is_finished = NULL; + qm->plan.array[plan_id].finalize = NULL; + + if(ops->current_plan == plan_id) { + ops->next_metric = NULL; + ops->is_finished = NULL; + ops->finalize = NULL; + } + } +} + +static void query_planer_finalize_remaining_plans(QUERY_ENGINE_OPS *ops) { + QUERY_METRIC *qm = ops->qm; + + for(size_t p = 0; p < qm->plan.used ; p++) + query_planer_finalize_plan(ops, p); +} + +static void query_planer_activate_plan(QUERY_ENGINE_OPS *ops, size_t plan_id, time_t overwrite_after __maybe_unused) { + QUERY_METRIC *qm = ops->qm; + + internal_fatal(plan_id >= qm->plan.used, "QUERY: invalid plan_id given"); + internal_fatal(!qm->plan.array[plan_id].initialized, "QUERY: plan has not been initialized"); + internal_fatal(qm->plan.array[plan_id].finalized, "QUERY: plan has been finalized"); + + internal_fatal(qm->plan.array[plan_id].after > qm->plan.array[plan_id].before, "QUERY: flipped after/before"); + + ops->tier = qm->plan.array[plan_id].tier; + ops->tier_ptr = &qm->tiers[ops->tier]; + ops->handle = &qm->plan.array[plan_id].handle; + ops->next_metric = qm->plan.array[plan_id].next_metric; + ops->is_finished = qm->plan.array[plan_id].is_finished; + ops->finalize = qm->plan.array[plan_id].finalize; ops->current_plan = plan_id; - ops->current_plan_expire_time = ops->plan.data[plan_id].before; + + if(plan_id + 1 < qm->plan.used && qm->plan.array[plan_id + 1].after < qm->plan.array[plan_id].before) + ops->current_plan_expire_time = qm->plan.array[plan_id + 1].after; + else + ops->current_plan_expire_time = qm->plan.array[plan_id].before; + + ops->plan_expanded_after = qm->plan.array[plan_id].expanded_after; + ops->plan_expanded_before = qm->plan.array[plan_id].expanded_before; } -static void query_planer_next_plan(QUERY_ENGINE_OPS *ops, time_t now, time_t last_point_end_time) { - internal_error(now < ops->current_plan_expire_time && now < ops->plan.data[ops->current_plan].before, - "QUERY: switching query plan too early!"); +static bool query_planer_next_plan(QUERY_ENGINE_OPS *ops, time_t now, time_t last_point_end_time) { + QUERY_METRIC *qm = ops->qm; size_t old_plan = ops->current_plan; @@ -969,32 +1084,26 @@ static void query_planer_next_plan(QUERY_ENGINE_OPS *ops, time_t now, time_t las do { ops->current_plan++; - if (ops->current_plan >= ops->plan.entries) { + if (ops->current_plan >= qm->plan.used) { ops->current_plan = old_plan; ops->current_plan_expire_time = ops->r->internal.qt->window.before; // let the query run with current plan // we will not switch it - return; + return false; } - next_plan_before_time = ops->plan.data[ops->current_plan].before; + next_plan_before_time = qm->plan.array[ops->current_plan].before; } while(now >= next_plan_before_time || last_point_end_time >= next_plan_before_time); - if(!query_metric_is_valid_tier(ops->qm, ops->plan.data[ops->current_plan].tier)) { + if(!query_metric_is_valid_tier(qm, qm->plan.array[ops->current_plan].tier)) { ops->current_plan = old_plan; ops->current_plan_expire_time = ops->r->internal.qt->window.before; - return; - } - - if(ops->finalize) { - ops->finalize(&ops->handle); - ops->finalize = NULL; - ops->is_finished = NULL; + return false; } - // internal_error(true, "QUERY: switched plan to %zu (all is %zu), previous expiration was %ld, this starts at %ld, now is %ld, last_point_end_time %ld", ops->current_plan, ops->plan.entries, ops->plan.data[ops->current_plan-1].before, ops->plan.data[ops->current_plan].after, now, last_point_end_time); - + query_planer_finalize_plan(ops, old_plan); query_planer_activate_plan(ops, ops->current_plan, MIN(now, last_point_end_time)); + return true; } static int compare_query_plan_entries_on_start_time(const void *a, const void *b) { @@ -1004,59 +1113,66 @@ static int compare_query_plan_entries_on_start_time(const void *a, const void *b } static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before_wanted, size_t points_wanted) { - //BUFFER *wb = buffer_create(1000); - //buffer_sprintf(wb, "QUERY PLAN for chart '%s' dimension '%s', from %ld to %ld:", rd->rrdset->name, rd->name, after_wanted, before_wanted); + QUERY_METRIC *qm = ops->qm; // put our selected tier as the first plan size_t selected_tier; if(ops->r->internal.query_options & RRDR_OPTION_SELECTED_TIER && ops->r->internal.qt->window.tier < storage_tiers - && query_metric_is_valid_tier(ops->qm, ops->r->internal.qt->window.tier)) { + && query_metric_is_valid_tier(qm, ops->r->internal.qt->window.tier)) { selected_tier = ops->r->internal.qt->window.tier; } else { - selected_tier = query_metric_best_tier_for_timeframe(ops->qm, after_wanted, before_wanted, points_wanted); + selected_tier = query_metric_best_tier_for_timeframe(qm, after_wanted, before_wanted, points_wanted); if(ops->r->internal.query_options & RRDR_OPTION_SELECTED_TIER) ops->r->internal.query_options &= ~RRDR_OPTION_SELECTED_TIER; + + if(!query_metric_is_valid_tier(qm, selected_tier)) + return false; + + if(qm->tiers[selected_tier].db_first_time_s > before_wanted || + qm->tiers[selected_tier].db_last_time_s < after_wanted) + return false; } - ops->plan.entries = 1; - ops->plan.data[0].tier = selected_tier; - ops->plan.data[0].after = ops->qm->tiers[selected_tier].db_first_time_t; - ops->plan.data[0].before = ops->qm->tiers[selected_tier].db_last_time_t; + qm->plan.used = 1; + qm->plan.array[0].tier = selected_tier; + qm->plan.array[0].after = (qm->tiers[selected_tier].db_first_time_s < after_wanted) ? after_wanted : qm->tiers[selected_tier].db_first_time_s; + qm->plan.array[0].before = (qm->tiers[selected_tier].db_last_time_s > before_wanted) ? before_wanted : qm->tiers[selected_tier].db_last_time_s; if(!(ops->r->internal.query_options & RRDR_OPTION_SELECTED_TIER)) { // the selected tier - time_t selected_tier_first_time_t = ops->plan.data[0].after; - time_t selected_tier_last_time_t = ops->plan.data[0].before; - - //buffer_sprintf(wb, ": SELECTED tier %zu, from %ld to %ld", selected_tier, ops->plan.data[0].after, ops->plan.data[0].before); + time_t selected_tier_first_time_s = qm->plan.array[0].after; + time_t selected_tier_last_time_s = qm->plan.array[0].before; // check if our selected tier can start the query - if (selected_tier_first_time_t > after_wanted) { + if (selected_tier_first_time_s > after_wanted) { // we need some help from other tiers for (size_t tr = (int)selected_tier + 1; tr < storage_tiers; tr++) { - if(!query_metric_is_valid_tier(ops->qm, tr)) + if(!query_metric_is_valid_tier(qm, tr)) continue; // find the first time of this tier - time_t first_time_t = ops->qm->tiers[tr].db_first_time_t; - - //buffer_sprintf(wb, ": EVAL AFTER tier %d, %ld", tier, first_time_t); + time_t tier_first_time_s = qm->tiers[tr].db_first_time_s; // can it help? - if (first_time_t < selected_tier_first_time_t) { + if (tier_first_time_s < selected_tier_first_time_s) { // it can help us add detail at the beginning of the query QUERY_PLAN_ENTRY t = { .tier = tr, - .after = (first_time_t < after_wanted) ? after_wanted : first_time_t, - .before = selected_tier_first_time_t}; - ops->plan.data[ops->plan.entries++] = t; + .after = (tier_first_time_s < after_wanted) ? after_wanted : tier_first_time_s, + .before = selected_tier_first_time_s, + .initialized = false, + .finalized = false, + }; + qm->plan.array[qm->plan.used++] = t; + + internal_fatal(!t.after || !t.before, "QUERY: invalid plan selected"); // prepare for the tier - selected_tier_first_time_t = t.after; + selected_tier_first_time_s = t.after; if (t.after <= after_wanted) break; @@ -1065,28 +1181,33 @@ static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before } // check if our selected tier can finish the query - if (selected_tier_last_time_t < before_wanted) { + if (selected_tier_last_time_s < before_wanted) { // we need some help from other tiers for (int tr = (int)selected_tier - 1; tr >= 0; tr--) { - if(!query_metric_is_valid_tier(ops->qm, tr)) + if(!query_metric_is_valid_tier(qm, tr)) continue; // find the last time of this tier - time_t last_time_t = ops->qm->tiers[tr].db_last_time_t; + time_t tier_last_time_s = qm->tiers[tr].db_last_time_s; - //buffer_sprintf(wb, ": EVAL BEFORE tier %d, %ld", tier, last_time_t); + //buffer_sprintf(wb, ": EVAL BEFORE tier %d, %ld", tier, last_time_s); // can it help? - if (last_time_t > selected_tier_last_time_t) { + if (tier_last_time_s > selected_tier_last_time_s) { // it can help us add detail at the end of the query QUERY_PLAN_ENTRY t = { .tier = tr, - .after = selected_tier_last_time_t, - .before = (last_time_t > before_wanted) ? before_wanted : last_time_t}; - ops->plan.data[ops->plan.entries++] = t; + .after = selected_tier_last_time_s, + .before = (tier_last_time_s > before_wanted) ? before_wanted : tier_last_time_s, + .initialized = false, + .finalized = false, + }; + qm->plan.array[qm->plan.used++] = t; // prepare for the tier - selected_tier_last_time_t = t.before; + selected_tier_last_time_s = t.before; + + internal_fatal(!t.after || !t.before, "QUERY: invalid plan selected"); if (t.before >= before_wanted) break; @@ -1096,26 +1217,21 @@ static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before } // sort the query plan - if(ops->plan.entries > 1) - qsort(&ops->plan.data, ops->plan.entries, sizeof(QUERY_PLAN_ENTRY), compare_query_plan_entries_on_start_time); + if(qm->plan.used > 1) + qsort(&qm->plan.array, qm->plan.used, sizeof(QUERY_PLAN_ENTRY), compare_query_plan_entries_on_start_time); - // make sure it has the whole timeframe we need - if(ops->plan.data[0].after < after_wanted) - ops->plan.data[0].after = after_wanted; - - if(ops->plan.data[ops->plan.entries - 1].before > before_wanted) - ops->plan.data[ops->plan.entries - 1].before = before_wanted; - - //buffer_sprintf(wb, ": FINAL STEPS %zu", ops->plan.entries); - - //for(size_t i = 0; i < ops->plan.entries ;i++) - // buffer_sprintf(wb, ": STEP %zu = use tier %zu from %ld to %ld", i+1, ops->plan.data[i].tier, ops->plan.data[i].after, ops->plan.data[i].before); - - //internal_error(true, "%s", buffer_tostring(wb)); - - if(!query_metric_is_valid_tier(ops->qm, ops->plan.data[0].tier)) + if(!query_metric_is_valid_tier(qm, qm->plan.array[0].tier)) return false; +#ifdef NETDATA_INTERNAL_CHECKS + for(size_t p = 0; p < qm->plan.used ;p++) { + internal_fatal(qm->plan.array[p].after > qm->plan.array[p].before, "QUERY: flipped after/before"); + internal_fatal(qm->plan.array[p].after < after_wanted, "QUERY: too small plan first time"); + internal_fatal(qm->plan.array[p].before > before_wanted, "QUERY: too big plan last time"); + } +#endif + + query_planer_initialize_plans(ops); query_planer_activate_plan(ops, 0, 0); return true; @@ -1146,24 +1262,45 @@ static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before #define query_add_point_to_group(r, point, ops) do { \ if(likely(netdata_double_isnumber((point).value))) { \ if(likely(fpclassify((point).value) != FP_ZERO)) \ - (ops).group_points_non_zero++; \ + (ops)->group_points_non_zero++; \ \ if(unlikely((point).flags & SN_FLAG_RESET)) \ - (ops).group_value_flags |= RRDR_VALUE_RESET; \ + (ops)->group_value_flags |= RRDR_VALUE_RESET; \ \ - (ops).grouping_add(r, (point).value); \ + (ops)->grouping_add(r, (point).value); \ } \ \ - (ops).group_points_added++; \ - (ops).group_anomaly_rate += (point).anomaly; \ + (ops)->group_points_added++; \ + (ops)->group_anomaly_rate += (point).anomaly; \ } while(0) -static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) { +static QUERY_ENGINE_OPS *rrd2rrdr_query_prep(RRDR *r, size_t dim_id_in_rrdr) { QUERY_TARGET *qt = r->internal.qt; - QUERY_METRIC *qm = &qt->query.array[dim_id_in_rrdr]; + + QUERY_ENGINE_OPS *ops = onewayalloc_mallocz(r->internal.owa, sizeof(QUERY_ENGINE_OPS)); + *ops = (QUERY_ENGINE_OPS) { + .r = r, + .qm = &qt->query.array[dim_id_in_rrdr], + .grouping_add = r->internal.grouping_add, + .grouping_flush = r->internal.grouping_flush, + .tier_query_fetch = r->internal.tier_query_fetch, + .view_update_every = r->update_every, + .query_granularity = (time_t)(r->update_every / r->group), + .group_value_flags = RRDR_VALUE_NOTHING, + }; + + if(!query_plan(ops, qt->window.after, qt->window.before, qt->window.points)) + return NULL; + + return ops; +} + +static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_OPS *ops) { + QUERY_TARGET *qt = r->internal.qt; + QUERY_METRIC *qm = &qt->query.array[dim_id_in_rrdr]; (void)qm; size_t points_wanted = qt->window.points; time_t after_wanted = qt->window.after; - time_t before_wanted = qt->window.before; + time_t before_wanted = qt->window.before; (void)before_wanted; // bool debug_this = false; // if(strcmp("user", string2str(rd->id)) == 0 && strcmp("system.cpu", string2str(rd->rrdset->id)) == 0) @@ -1174,39 +1311,30 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) { size_t points_added = 0; - QUERY_ENGINE_OPS ops = { - .r = r, - .qm = qm, - .grouping_add = r->internal.grouping_add, - .grouping_flush = r->internal.grouping_flush, - .tier_query_fetch = r->internal.tier_query_fetch, - .view_update_every = r->update_every, - .query_granularity = (time_t)(r->update_every / r->group), - .group_value_flags = RRDR_VALUE_NOTHING - }; - long rrdr_line = -1; bool use_anomaly_bit_as_value = (r->internal.query_options & RRDR_OPTION_ANOMALY_BIT) ? true : false; - if(!query_plan(&ops, after_wanted, before_wanted, points_wanted)) - return; - NETDATA_DOUBLE min = r->min, max = r->max; QUERY_POINT last2_point = QUERY_POINT_EMPTY; QUERY_POINT last1_point = QUERY_POINT_EMPTY; QUERY_POINT new_point = QUERY_POINT_EMPTY; - time_t now_start_time = after_wanted - ops.query_granularity; - time_t now_end_time = after_wanted + ops.view_update_every - ops.query_granularity; + // ONE POINT READ-AHEAD + // when we switch plans, we read-ahead a point from the next plan + // to join them smoothly at the exact time the next plan begins + STORAGE_POINT next1_point = STORAGE_POINT_UNSET; + + time_t now_start_time = after_wanted - ops->query_granularity; + time_t now_end_time = after_wanted + ops->view_update_every - ops->query_granularity; size_t db_points_read_since_plan_switch = 0; (void)db_points_read_since_plan_switch; // The main loop, based on the query granularity we need - for( ; points_added < points_wanted ; now_start_time = now_end_time, now_end_time += ops.view_update_every) { + for( ; points_added < points_wanted ; now_start_time = now_end_time, now_end_time += ops->view_update_every) { if(unlikely(query_plan_should_switch_plan(ops, now_end_time))) { - query_planer_next_plan(&ops, now_end_time, new_point.end_time); + query_planer_next_plan(ops, now_end_time, new_point.end_time); db_points_read_since_plan_switch = 0; } @@ -1219,7 +1347,7 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) { last1_point = new_point; } - if(unlikely(ops.is_finished(&ops.handle))) { + if(unlikely(ops->is_finished(ops->handle))) { if(count_same_end_time != 0) { last2_point = last1_point; last1_point = new_point; @@ -1235,29 +1363,62 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) { // fetch the new point { - db_points_read_since_plan_switch++; - STORAGE_POINT sp = ops.next_metric(&ops.handle); + STORAGE_POINT sp; + if(likely(storage_point_is_unset(next1_point))) { + db_points_read_since_plan_switch++; + sp = ops->next_metric(ops->handle); + } + else { + // ONE POINT READ-AHEAD + sp = next1_point; + storage_point_unset(next1_point); + db_points_read_since_plan_switch = 1; + } + + // ONE POINT READ-AHEAD + if(unlikely(query_plan_should_switch_plan(ops, sp.end_time_s) && + query_planer_next_plan(ops, now_end_time, new_point.end_time))) { + + // The end time of the current point, crosses our plans (tiers) + // so, we switched plan (tier) + // + // There are 2 cases now: + // + // A. the entire point of the previous plan is to the future of point from the next plan + // B. part of the point of the previous plan overlaps with the point from the next plan + + STORAGE_POINT sp2 = ops->next_metric(ops->handle); + + if(sp.start_time_s > sp2.start_time_s) + // the point from the previous plan is useless + sp = sp2; + else + // let the query run from the previous plan + // but setting this will also cut off the interpolation + // of the point from the previous plan + next1_point = sp2; + } - ops.db_points_read_per_tier[ops.tier]++; - ops.db_total_points_read++; + ops->db_points_read_per_tier[ops->tier]++; + ops->db_total_points_read++; - new_point.start_time = sp.start_time; - new_point.end_time = sp.end_time; + new_point.start_time = sp.start_time_s; + new_point.end_time = sp.end_time_s; new_point.anomaly = sp.count ? (NETDATA_DOUBLE)sp.anomaly_count * 100.0 / (NETDATA_DOUBLE)sp.count : 0.0; - query_point_set_id(new_point, ops.db_total_points_read); + query_point_set_id(new_point, ops->db_total_points_read); // if(debug_this) // info("QUERY: got point %zu, from time %ld to %ld // now from %ld to %ld // query from %ld to %ld", // new_point.id, new_point.start_time, new_point.end_time, now_start_time, now_end_time, after_wanted, before_wanted); // - // set the right value to the point we got - if(likely(!storage_point_is_unset(sp) && !storage_point_is_empty(sp))) { + // get the right value from the point we got + if(likely(!storage_point_is_unset(sp) && !storage_point_is_gap(sp))) { if(unlikely(use_anomaly_bit_as_value)) new_point.value = new_point.anomaly; else { - switch (ops.tier_query_fetch) { + switch (ops->tier_query_fetch) { default: case TIER_QUERY_FETCH_AVERAGE: new_point.value = sp.sum / sp.count; @@ -1284,19 +1445,30 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) { } // check if the db is giving us zero duration points - if(unlikely(new_point.start_time == new_point.end_time)) { - internal_error(true, "QUERY: '%s', dimension '%s' next_metric() returned point %zu start time %ld, end time %ld, that are both equal", - qt->id, string2str(qm->dimension.id), new_point.id, new_point.start_time, new_point.end_time); + if(unlikely(db_points_read_since_plan_switch > 1 && + new_point.start_time == new_point.end_time)) { - new_point.start_time = new_point.end_time - ops.tier_ptr->db_update_every; + internal_error(true, "QUERY: '%s', dimension '%s' next_metric() returned " + "point %zu from %ld to %ld, that are both equal", + qt->id, string2str(qm->dimension.id), + new_point.id, new_point.start_time, new_point.end_time); + + new_point.start_time = new_point.end_time - ops->tier_ptr->db_update_every_s; } // check if the db is advancing the query - if(unlikely(new_point.end_time <= last1_point.end_time)) { - internal_error(db_points_read_since_plan_switch > 1, - "QUERY: '%s', dimension '%s' next_metric() returned point %zu from %ld to %ld, before the last point %zu from %ld to %ld, now is %ld to %ld", - qt->id, string2str(qm->dimension.id), new_point.id, new_point.start_time, new_point.end_time, - last1_point.id, last1_point.start_time, last1_point.end_time, now_start_time, now_end_time); + if(unlikely(db_points_read_since_plan_switch > 1 && + new_point.end_time <= last1_point.end_time)) { + + internal_error(true, + "QUERY: '%s', dimension '%s' next_metric() returned " + "point %zu from %ld to %ld, before the " + "last point %zu from %ld to %ld, " + "now is %ld to %ld", + qt->id, string2str(qm->dimension.id), + new_point.id, new_point.start_time, new_point.end_time, + last1_point.id, last1_point.start_time, last1_point.end_time, + now_start_time, now_end_time); count_same_end_time++; continue; @@ -1321,12 +1493,16 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) { // at exactly the time we will want // we only log if this is not point 1 - internal_error(new_point.end_time < after_wanted && new_point.id > 1, - "QUERY: '%s', dimension '%s' next_metric() returned point %zu from %ld time %ld, which is entirely before our current timeframe %ld to %ld (and before the entire query, after %ld, before %ld)", + internal_error(new_point.end_time < ops->plan_expanded_after && + db_points_read_since_plan_switch > 1, + "QUERY: '%s', dimension '%s' next_metric() " + "returned point %zu from %ld time %ld, " + "which is entirely before our current timeframe %ld to %ld " + "(and before the entire query, after %ld, before %ld)", qt->id, string2str(qm->dimension.id), new_point.id, new_point.start_time, new_point.end_time, now_start_time, now_end_time, - after_wanted, before_wanted); + ops->plan_expanded_after, ops->plan_expanded_before); } } @@ -1339,20 +1515,31 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) { if(unlikely(count_same_end_time)) { internal_error(true, - "QUERY: '%s', dimension '%s', the database does not advance the query, it returned an end time less or equal to the end time of the last point we got %ld, %zu times", - qt->id, string2str(qm->dimension.id), last1_point.end_time, count_same_end_time); + "QUERY: '%s', dimension '%s', the database does not advance the query," + " it returned an end time less or equal to the end time of the last " + "point we got %ld, %zu times", + qt->id, string2str(qm->dimension.id), + last1_point.end_time, count_same_end_time); if(unlikely(new_point.end_time <= last1_point.end_time)) new_point.end_time = now_end_time; } + time_t stop_time = new_point.end_time; + if(unlikely(!storage_point_is_unset(next1_point))) { + // ONE POINT READ-AHEAD + // the point crosses the start time of the + // read ahead storage point we have read + stop_time = next1_point.start_time_s; + } + // the inner loop // we have 3 points in memory: last2, last1, new // we select the one to use based on their timestamps size_t iterations = 0; - for ( ; now_end_time <= new_point.end_time && points_added < points_wanted ; - now_end_time += ops.view_update_every, iterations++) { + for ( ; now_end_time <= stop_time && points_added < points_wanted ; + now_end_time += ops->view_update_every, iterations++) { // now_start_time is wrong in this loop // but, we don't need it @@ -1411,20 +1598,20 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) { RRDR_VALUE_FLAGS *rrdr_value_options_ptr = &r->o[rrdr_o_v_index]; // update the dimension options - if(likely(ops.group_points_non_zero)) + if(likely(ops->group_points_non_zero)) r->od[dim_id_in_rrdr] |= RRDR_DIMENSION_NONZERO; // store the specific point options - *rrdr_value_options_ptr = ops.group_value_flags; + *rrdr_value_options_ptr = ops->group_value_flags; // store the group value - NETDATA_DOUBLE group_value = ops.grouping_flush(r, rrdr_value_options_ptr); + NETDATA_DOUBLE group_value = ops->grouping_flush(r, rrdr_value_options_ptr); r->v[rrdr_o_v_index] = group_value; // we only store uint8_t anomaly rates, // so let's get double precision by storing // anomaly rates in the range 0 - 200 - r->ar[rrdr_o_v_index] = ops.group_anomaly_rate / (NETDATA_DOUBLE)ops.group_points_added; + r->ar[rrdr_o_v_index] = ops->group_anomaly_rate / (NETDATA_DOUBLE)ops->group_points_added; if(likely(points_added || dim_id_in_rrdr)) { // find the min/max across all dimensions @@ -1440,72 +1627,71 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) { } points_added++; - ops.group_points_added = 0; - ops.group_value_flags = RRDR_VALUE_NOTHING; - ops.group_points_non_zero = 0; - ops.group_anomaly_rate = 0; + ops->group_points_added = 0; + ops->group_value_flags = RRDR_VALUE_NOTHING; + ops->group_points_non_zero = 0; + ops->group_anomaly_rate = 0; } // the loop above increased "now" by query_granularity, // but the main loop will increase it too, // so, let's undo the last iteration of this loop if(iterations) - now_end_time -= ops.view_update_every; + now_end_time -= ops->view_update_every; } - ops.finalize(&ops.handle); + query_planer_finalize_remaining_plans(ops); r->internal.result_points_generated += points_added; - r->internal.db_points_read += ops.db_total_points_read; + r->internal.db_points_read += ops->db_total_points_read; for(size_t tr = 0; tr < storage_tiers ; tr++) - r->internal.tier_points_read[tr] += ops.db_points_read_per_tier[tr]; + r->internal.tier_points_read[tr] += ops->db_points_read_per_tier[tr]; r->min = min; r->max = max; r->before = max_date; - r->after = min_date - ops.view_update_every + ops.query_granularity; + r->after = min_date - ops->view_update_every + ops->query_granularity; rrdr_done(r, rrdr_line); internal_error(points_added != points_wanted, "QUERY: '%s', dimension '%s', requested %zu points, but RRDR added %zu (%zu db points read).", qt->id, string2str(qm->dimension.id), - (size_t)points_wanted, (size_t)points_added, ops.db_total_points_read); + (size_t)points_wanted, (size_t)points_added, ops->db_total_points_read); } // ---------------------------------------------------------------------------- // fill the gap of a tier void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAGE_POINT sp, usec_t now_ut); -void store_metric_collection_completed(void); -void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now) { +void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s) { if(unlikely(tier >= storage_tiers)) return; if(storage_tiers_backfill[tier] == RRD_BACKFILL_NONE) return; - struct rrddim_tier *t = rd->tiers[tier]; + struct rrddim_tier *t = &rd->tiers[tier]; if(unlikely(!t)) return; - time_t latest_time_t = t->query_ops->latest_time(t->db_metric_handle); + time_t latest_time_s = t->query_ops->latest_time_s(t->db_metric_handle); time_t granularity = (time_t)t->tier_grouping * (time_t)rd->update_every; - time_t time_diff = now - latest_time_t; + time_t time_diff = now_s - latest_time_s; // if the user wants only NEW backfilling, and we don't have any data - if(storage_tiers_backfill[tier] == RRD_BACKFILL_NEW && latest_time_t <= 0) return; + if(storage_tiers_backfill[tier] == RRD_BACKFILL_NEW && latest_time_s <= 0) return; // there is really nothing we can do - if(now <= latest_time_t || time_diff < granularity) return; + if(now_s <= latest_time_s || time_diff < granularity) return; struct storage_engine_query_handle handle; // for each lower tier for(int read_tier = (int)tier - 1; read_tier >= 0 ; read_tier--){ - time_t smaller_tier_first_time = rd->tiers[read_tier]->query_ops->oldest_time(rd->tiers[read_tier]->db_metric_handle); - time_t smaller_tier_last_time = rd->tiers[read_tier]->query_ops->latest_time(rd->tiers[read_tier]->db_metric_handle); - if(smaller_tier_last_time <= latest_time_t) continue; // it is as bad as we are + time_t smaller_tier_first_time = rd->tiers[read_tier].query_ops->oldest_time_s(rd->tiers[read_tier].db_metric_handle); + time_t smaller_tier_last_time = rd->tiers[read_tier].query_ops->latest_time_s(rd->tiers[read_tier].db_metric_handle); + if(smaller_tier_last_time <= latest_time_s) continue; // it is as bad as we are - long after_wanted = (latest_time_t < smaller_tier_first_time) ? smaller_tier_first_time : latest_time_t; + long after_wanted = (latest_time_s < smaller_tier_first_time) ? smaller_tier_first_time : latest_time_s; long before_wanted = smaller_tier_last_time; - struct rrddim_tier *tmp = rd->tiers[read_tier]; - tmp->query_ops->init(tmp->db_metric_handle, &handle, after_wanted, before_wanted); + struct rrddim_tier *tmp = &rd->tiers[read_tier]; + tmp->query_ops->init(tmp->db_metric_handle, &handle, after_wanted, before_wanted, STORAGE_PRIORITY_HIGH); size_t points_read = 0; @@ -1514,9 +1700,9 @@ void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now) STORAGE_POINT sp = tmp->query_ops->next_metric(&handle); points_read++; - if(sp.end_time > latest_time_t) { - latest_time_t = sp.end_time; - store_metric_at_tier(rd, tier, t, sp, sp.end_time * USEC_PER_SEC); + if(sp.end_time_s > latest_time_s) { + latest_time_s = sp.end_time_s; + store_metric_at_tier(rd, tier, t, sp, sp.end_time_s * USEC_PER_SEC); } } @@ -1551,12 +1737,12 @@ static void rrd2rrdr_log_request_response_metadata(RRDR *r , const char *msg ) { - time_t first_entry_t = r->internal.qt->db.first_time_t; - time_t last_entry_t = r->internal.qt->db.last_time_t; + time_t first_entry_s = r->internal.qt->db.first_time_s; + time_t last_entry_s = r->internal.qt->db.last_time_s; internal_error( - true, - "rrd2rrdr() on %s update every %ld with %s grouping %s (group: %zu, resampling_time: %ld, resampling_group: %zu), " + true, + "rrd2rrdr() on %s update every %ld with %s grouping %s (group: %zu, resampling_time: %ld, resampling_group: %zu), " "after (got: %ld, want: %ld, req: %ld, db: %ld), " "before (got: %ld, want: %ld, req: %ld, db: %ld), " "duration (got: %ld, want: %ld, req: %ld, db: %ld), " @@ -1576,19 +1762,19 @@ static void rrd2rrdr_log_request_response_metadata(RRDR *r , r->after , after_wanted , after_requested - , first_entry_t + , first_entry_s // before , r->before , before_wanted , before_requested - , last_entry_t + , last_entry_s // duration , (long)(r->before - r->after + r->internal.qt->window.query_granularity) , (long)(before_wanted - after_wanted + r->internal.qt->window.query_granularity) , (long)before_requested - after_requested - , (long)((last_entry_t - first_entry_t) + r->internal.qt->window.query_granularity) + , (long)((last_entry_s - first_entry_s) + r->internal.qt->window.query_granularity) // points , r->rows @@ -1708,7 +1894,7 @@ bool query_target_calculate_window(QUERY_TARGET *qt) { time_t resampling_time_requested = qt->request.resampling_time; RRDR_OPTIONS options = qt->request.options; size_t tier = qt->request.tier; - time_t update_every = qt->db.minimum_latest_update_every; + time_t update_every = qt->db.minimum_latest_update_every_s; // RULES // points_requested = 0 @@ -1763,30 +1949,30 @@ bool query_target_calculate_window(QUERY_TARGET *qt) { if (after_wanted == 0 || before_wanted == 0) { relative_period_requested = true; - time_t first_entry_t = qt->db.first_time_t; - time_t last_entry_t = qt->db.last_time_t; + time_t first_entry_s = qt->db.first_time_s; + time_t last_entry_s = qt->db.last_time_s; - if (first_entry_t == 0 || last_entry_t == 0) { - internal_error(true, "QUERY: no data detected on query '%s' (db first_entry_t = %ld, last_entry_t = %ld", qt->id, first_entry_t, last_entry_t); + if (first_entry_s == 0 || last_entry_s == 0) { + internal_error(true, "QUERY: no data detected on query '%s' (db first_entry_t = %ld, last_entry_t = %ld", qt->id, first_entry_s, last_entry_s); query_debug_log_free(); return false; } - query_debug_log(":first_entry_t %ld, last_entry_t %ld", first_entry_t, last_entry_t); + query_debug_log(":first_entry_t %ld, last_entry_t %ld", first_entry_s, last_entry_s); if (after_wanted == 0) { - after_wanted = first_entry_t; + after_wanted = first_entry_s; query_debug_log(":zero after_wanted %ld", after_wanted); } if (before_wanted == 0) { - before_wanted = last_entry_t; + before_wanted = last_entry_s; before_is_aligned_to_db_end = true; query_debug_log(":zero before_wanted %ld", before_wanted); } if (points_wanted == 0) { - points_wanted = (last_entry_t - first_entry_t) / update_every; + points_wanted = (last_entry_s - first_entry_s) / update_every; query_debug_log(":zero points_wanted %zu", points_wanted); } } @@ -1804,7 +1990,7 @@ bool query_target_calculate_window(QUERY_TARGET *qt) { update_every = rrdset_find_natural_update_every_for_timeframe( qt, after_wanted, before_wanted, points_wanted, options, tier); - if (update_every <= 0) update_every = qt->db.minimum_latest_update_every; + if (update_every <= 0) update_every = qt->db.minimum_latest_update_every_s; query_debug_log(":natural update every %ld", update_every); } @@ -1975,7 +2161,8 @@ RRDR *rrd2rrdr_legacy( ONEWAYALLOC *owa, RRDSET *st, size_t points, time_t after, time_t before, RRDR_GROUPING group_method, time_t resampling_time, RRDR_OPTIONS options, const char *dimensions, - const char *group_options, time_t timeout, size_t tier, QUERY_SOURCE query_source) { + const char *group_options, time_t timeout, size_t tier, QUERY_SOURCE query_source, + STORAGE_PRIORITY priority) { QUERY_TARGET_REQUEST qtr = { .st = st, @@ -1990,6 +2177,7 @@ RRDR *rrd2rrdr_legacy( .timeout = timeout, .tier = tier, .query_source = query_source, + .priority = priority, }; return rrd2rrdr(owa, query_target_create(&qtr)); @@ -2056,16 +2244,48 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) { if (qt->request.timeout) now_realtime_timeval(&query_start_time); + size_t last_db_points_read = 0; + size_t last_result_points_generated = 0; + + QUERY_ENGINE_OPS **ops = onewayalloc_callocz(r->internal.owa, qt->query.used, sizeof(QUERY_ENGINE_OPS *)); + + size_t capacity = libuv_worker_threads * 2; + size_t max_queries_to_prepare = (qt->query.used > (capacity - 1)) ? (capacity - 1) : qt->query.used; + size_t queries_prepared = 0; + while(queries_prepared < max_queries_to_prepare) { + // preload another query + ops[queries_prepared] = rrd2rrdr_query_prep(r, queries_prepared); + queries_prepared++; + } + for(size_t c = 0, max = qt->query.used; c < max ; c++) { + + if(queries_prepared < max) { + // preload another query + ops[queries_prepared] = rrd2rrdr_query_prep(r, queries_prepared); + queries_prepared++; + } + // set the query target dimension options to rrdr r->od[c] = qt->query.array[c].dimension.options; - r->od[c] |= RRDR_DIMENSION_SELECTED; - // reset the grouping for the new dimension r->internal.grouping_reset(r); - rrd2rrdr_do_dimension(r, c); + if(ops[c]) { + r->od[c] |= RRDR_DIMENSION_SELECTED; + rrd2rrdr_query_execute(r, c, ops[c]); + } + + global_statistics_rrdr_query_completed( + 1, + r->internal.db_points_read - last_db_points_read, + r->internal.result_points_generated - last_result_points_generated, + qt->request.query_source); + + last_db_points_read = r->internal.db_points_read; + last_result_points_generated = r->internal.result_points_generated; + if (qt->request.timeout) now_realtime_timeval(&query_current_time); @@ -2106,6 +2326,12 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) { log_access("QUERY CANCELED RUNTIME EXCEEDED %0.2f ms (LIMIT %lld ms)", (NETDATA_DOUBLE)dt_usec(&query_start_time, &query_current_time) / 1000.0, (long long)qt->request.timeout); r->result_options |= RRDR_RESULT_OPTION_CANCEL; + + for(size_t i = c + 1; i < queries_prepared ; i++) { + if(ops[i]) + query_planer_finalize_remaining_plans(ops[i]); + } + break; } } @@ -2169,7 +2395,5 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) { } } - global_statistics_rrdr_query_completed(dimensions_used, r->internal.db_points_read, - r->internal.result_points_generated, qt->request.query_source); return r; } |