summaryrefslogtreecommitdiffstats
path: root/web/api/queries/query.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-02-06 16:11:30 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-02-06 16:11:30 +0000
commitaa2fe8ccbfcb117efa207d10229eeeac5d0f97c7 (patch)
tree941cbdd387b41c1a81587c20a6df9f0e5e0ff7ab /web/api/queries/query.c
parentAdding upstream version 1.37.1. (diff)
downloadnetdata-aa2fe8ccbfcb117efa207d10229eeeac5d0f97c7.tar.xz
netdata-aa2fe8ccbfcb117efa207d10229eeeac5d0f97c7.zip
Adding upstream version 1.38.0.upstream/1.38.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'web/api/queries/query.c')
-rw-r--r--web/api/queries/query.c712
1 files changed, 468 insertions, 244 deletions
diff --git a/web/api/queries/query.c b/web/api/queries/query.c
index 0365b6e96..1f10c5137 100644
--- a/web/api/queries/query.c
+++ b/web/api/queries/query.c
@@ -17,6 +17,8 @@
#include "percentile/percentile.h"
#include "trimmed_mean/trimmed_mean.h"
+#define POINTS_TO_EXPAND_QUERY 5
+
// ----------------------------------------------------------------------------
static struct {
@@ -694,7 +696,7 @@ static inline void rrdr_done(RRDR *r, long rrdr_line) {
// tier management
static bool query_metric_is_valid_tier(QUERY_METRIC *qm, size_t tier) {
- if(!qm->tiers[tier].db_metric_handle || !qm->tiers[tier].db_first_time_t || !qm->tiers[tier].db_last_time_t || !qm->tiers[tier].db_update_every)
+ if(!qm->tiers[tier].db_metric_handle || !qm->tiers[tier].db_first_time_s || !qm->tiers[tier].db_last_time_s || !qm->tiers[tier].db_update_every_s)
return false;
return true;
@@ -705,11 +707,11 @@ static size_t query_metric_first_working_tier(QUERY_METRIC *qm) {
// find the db time-range for this tier for all metrics
STORAGE_METRIC_HANDLE *db_metric_handle = qm->tiers[tier].db_metric_handle;
- time_t first_t = qm->tiers[tier].db_first_time_t;
- time_t last_t = qm->tiers[tier].db_last_time_t;
- time_t update_every = qm->tiers[tier].db_update_every;
+ time_t first_time_s = qm->tiers[tier].db_first_time_s;
+ time_t last_time_s = qm->tiers[tier].db_last_time_s;
+ time_t update_every_s = qm->tiers[tier].db_update_every_s;
- if(!db_metric_handle || !first_t || !last_t || !update_every)
+ if(!db_metric_handle || !first_time_s || !last_time_s || !update_every_s)
continue;
return tier;
@@ -718,19 +720,23 @@ static size_t query_metric_first_working_tier(QUERY_METRIC *qm) {
return 0;
}
-static long query_plan_points_coverage_weight(time_t db_first_t, time_t db_last_t, time_t db_update_every, time_t after_wanted, time_t before_wanted, size_t points_wanted, size_t tier __maybe_unused) {
- if(db_first_t == 0 || db_last_t == 0 || db_update_every == 0)
+static long query_plan_points_coverage_weight(time_t db_first_time_s, time_t db_last_time_s, time_t db_update_every_s, time_t after_wanted, time_t before_wanted, size_t points_wanted, size_t tier __maybe_unused) {
+ if(db_first_time_s == 0 ||
+ db_last_time_s == 0 ||
+ db_update_every_s == 0 ||
+ db_first_time_s > before_wanted ||
+ db_last_time_s < after_wanted)
return -LONG_MAX;
- time_t common_first_t = MAX(db_first_t, after_wanted);
- time_t common_last_t = MIN(db_last_t, before_wanted);
+ long long common_first_t = MAX(db_first_time_s, after_wanted);
+ long long common_last_t = MIN(db_last_time_s, before_wanted);
- long time_coverage = (common_last_t - common_first_t) * 1000000 / (before_wanted - after_wanted);
- size_t points_wanted_in_coverage = points_wanted * time_coverage / 1000000;
+ long long time_coverage = (common_last_t - common_first_t) * 1000000LL / (before_wanted - after_wanted);
+ long long points_wanted_in_coverage = (long long)points_wanted * time_coverage / 1000000LL;
- long points_available = (common_last_t - common_first_t) / db_update_every;
- long points_delta = (long)(points_available - points_wanted_in_coverage);
- long points_coverage = (points_delta < 0) ? (long)(points_available * time_coverage / points_wanted_in_coverage) : time_coverage;
+ long long points_available = (common_last_t - common_first_t) / db_update_every_s;
+ long long points_delta = (long)(points_available - points_wanted_in_coverage);
+ long long points_coverage = (points_delta < 0) ? (long)(points_available * time_coverage / points_wanted_in_coverage) : time_coverage;
// a way to benefit higher tiers
// points_coverage += (long)tier * 10000;
@@ -738,7 +744,7 @@ static long query_plan_points_coverage_weight(time_t db_first_t, time_t db_last_
if(points_available <= 0)
return -LONG_MAX;
- return points_coverage;
+ return (long)(points_coverage + (25000LL * tier)); // 2.5% benefit for each higher tier
}
static size_t query_metric_best_tier_for_timeframe(QUERY_METRIC *qm, time_t after_wanted, time_t before_wanted, size_t points_wanted) {
@@ -748,27 +754,49 @@ static size_t query_metric_best_tier_for_timeframe(QUERY_METRIC *qm, time_t afte
if(unlikely(after_wanted == before_wanted || points_wanted <= 0))
return query_metric_first_working_tier(qm);
- long weight[storage_tiers];
+ time_t min_first_time_s = 0;
+ time_t max_last_time_s = 0;
+
+ for(size_t tier = 0; tier < storage_tiers ; tier++) {
+ time_t first_time_s = qm->tiers[tier].db_first_time_s;
+ time_t last_time_s = qm->tiers[tier].db_last_time_s;
+
+ if(!min_first_time_s || (first_time_s && first_time_s < min_first_time_s))
+ min_first_time_s = first_time_s;
+
+ if(!max_last_time_s || (last_time_s && last_time_s > max_last_time_s))
+ max_last_time_s = last_time_s;
+ }
for(size_t tier = 0; tier < storage_tiers ; tier++) {
// find the db time-range for this tier for all metrics
STORAGE_METRIC_HANDLE *db_metric_handle = qm->tiers[tier].db_metric_handle;
- time_t first_t = qm->tiers[tier].db_first_time_t;
- time_t last_t = qm->tiers[tier].db_last_time_t;
- time_t update_every = qm->tiers[tier].db_update_every;
-
- if(!db_metric_handle || !first_t || !last_t || !update_every) {
- weight[tier] = -LONG_MAX;
+ time_t first_time_s = qm->tiers[tier].db_first_time_s;
+ time_t last_time_s = qm->tiers[tier].db_last_time_s;
+ time_t update_every_s = qm->tiers[tier].db_update_every_s;
+
+ if( !db_metric_handle ||
+ !first_time_s ||
+ !last_time_s ||
+ !update_every_s ||
+ first_time_s > before_wanted ||
+ last_time_s < after_wanted
+ ) {
+ qm->tiers[tier].weight = -LONG_MAX;
continue;
}
- weight[tier] = query_plan_points_coverage_weight(first_t, last_t, update_every, after_wanted, before_wanted, points_wanted, tier);
+ internal_fatal(first_time_s > before_wanted || last_time_s < after_wanted, "QUERY: invalid db durations");
+
+ qm->tiers[tier].weight = query_plan_points_coverage_weight(
+ min_first_time_s, max_last_time_s, update_every_s,
+ after_wanted, before_wanted, points_wanted, tier);
}
size_t best_tier = 0;
for(size_t tier = 1; tier < storage_tiers ; tier++) {
- if(weight[tier] >= weight[best_tier])
+ if(qm->tiers[tier].weight >= qm->tiers[best_tier].weight)
best_tier = tier;
}
@@ -788,38 +816,38 @@ static size_t rrddim_find_best_tier_for_timeframe(QUERY_TARGET *qt, time_t after
for(size_t tier = 0; tier < storage_tiers ; tier++) {
- time_t common_first_t = 0;
- time_t common_last_t = 0;
- time_t common_update_every = 0;
+ time_t common_first_time_s = 0;
+ time_t common_last_time_s = 0;
+ time_t common_update_every_s = 0;
// find the db time-range for this tier for all metrics
for(size_t i = 0, used = qt->query.used; i < used ; i++) {
QUERY_METRIC *qm = &qt->query.array[i];
- time_t first_t = qm->tiers[tier].db_first_time_t;
- time_t last_t = qm->tiers[tier].db_last_time_t;
- time_t update_every = qm->tiers[tier].db_update_every;
+ time_t first_time_s = qm->tiers[tier].db_first_time_s;
+ time_t last_time_s = qm->tiers[tier].db_last_time_s;
+ time_t update_every_s = qm->tiers[tier].db_update_every_s;
- if(!first_t || !last_t || !update_every)
+ if(!first_time_s || !last_time_s || !update_every_s)
continue;
- if(!common_first_t)
- common_first_t = first_t;
+ if(!common_first_time_s)
+ common_first_time_s = first_time_s;
else
- common_first_t = MIN(first_t, common_first_t);
+ common_first_time_s = MIN(first_time_s, common_first_time_s);
- if(!common_last_t)
- common_last_t = last_t;
+ if(!common_last_time_s)
+ common_last_time_s = last_time_s;
else
- common_last_t = MAX(last_t, common_last_t);
+ common_last_time_s = MAX(last_time_s, common_last_time_s);
- if(!common_update_every)
- common_update_every = update_every;
+ if(!common_update_every_s)
+ common_update_every_s = update_every_s;
else
- common_update_every = MIN(update_every, common_update_every);
+ common_update_every_s = MIN(update_every_s, common_update_every_s);
}
- weight[tier] = query_plan_points_coverage_weight(common_first_t, common_last_t, common_update_every, after_wanted, before_wanted, points_wanted, tier);
+ weight[tier] = query_plan_points_coverage_weight(common_first_time_s, common_last_time_s, common_update_every_s, after_wanted, before_wanted, points_wanted, tier);
}
size_t best_tier = 0;
@@ -842,19 +870,19 @@ static time_t rrdset_find_natural_update_every_for_timeframe(QUERY_TARGET *qt, t
best_tier = rrddim_find_best_tier_for_timeframe(qt, after_wanted, before_wanted, points_wanted);
// find the db minimum update every for this tier for all metrics
- time_t common_update_every = default_rrd_update_every;
+ time_t common_update_every_s = default_rrd_update_every;
for(size_t i = 0, used = qt->query.used; i < used ; i++) {
QUERY_METRIC *qm = &qt->query.array[i];
- time_t update_every = qm->tiers[best_tier].db_update_every;
+ time_t update_every_s = qm->tiers[best_tier].db_update_every_s;
if(!i)
- common_update_every = update_every;
+ common_update_every_s = update_every_s;
else
- common_update_every = MIN(update_every, common_update_every);
+ common_update_every_s = MIN(update_every_s, common_update_every_s);
}
- return common_update_every;
+ return common_update_every_s;
}
// ----------------------------------------------------------------------------
@@ -888,17 +916,6 @@ QUERY_POINT QUERY_POINT_EMPTY = {
#define query_point_set_id(point, point_id) debug_dummy()
#endif
-typedef struct query_plan_entry {
- size_t tier;
- time_t after;
- time_t before;
-} QUERY_PLAN_ENTRY;
-
-typedef struct query_plan {
- size_t entries;
- QUERY_PLAN_ENTRY data[RRD_STORAGE_TIERS*2];
-} QUERY_PLAN;
-
typedef struct query_engine_ops {
// configuration
RRDR *r;
@@ -908,14 +925,15 @@ typedef struct query_engine_ops {
TIER_QUERY_FETCH tier_query_fetch;
// query planer
- QUERY_PLAN plan;
size_t current_plan;
time_t current_plan_expire_time;
+ time_t plan_expanded_after;
+ time_t plan_expanded_before;
// storage queries
size_t tier;
struct query_metric_tier *tier_ptr;
- struct storage_engine_query_handle handle;
+ struct storage_engine_query_handle *handle;
STORAGE_POINT (*next_metric)(struct storage_engine_query_handle *handle);
int (*is_finished)(struct storage_engine_query_handle *handle);
void (*finalize)(struct storage_engine_query_handle *handle);
@@ -937,31 +955,128 @@ typedef struct query_engine_ops {
// ----------------------------------------------------------------------------
// query planer
-#define query_plan_should_switch_plan(ops, now) ((now) >= (ops).current_plan_expire_time)
+#define query_plan_should_switch_plan(ops, now) ((now) >= (ops)->current_plan_expire_time)
+
+static size_t query_planer_expand_duration_in_points(time_t this_update_every, time_t next_update_every) {
+
+ time_t delta = this_update_every - next_update_every;
+ if(delta < 0) delta = -delta;
+
+ size_t points;
+ if(delta < this_update_every * POINTS_TO_EXPAND_QUERY)
+ points = POINTS_TO_EXPAND_QUERY;
+ else
+ points = (delta + this_update_every - 1) / this_update_every;
+
+ return points;
+}
-static void query_planer_activate_plan(QUERY_ENGINE_OPS *ops, size_t plan_id, time_t overwrite_after) {
- if(unlikely(plan_id >= ops->plan.entries))
- plan_id = ops->plan.entries - 1;
+static void query_planer_initialize_plans(QUERY_ENGINE_OPS *ops) {
+ QUERY_METRIC *qm = ops->qm;
+
+ for(size_t p = 0; p < qm->plan.used ; p++) {
+ size_t tier = qm->plan.array[p].tier;
+ time_t update_every = qm->tiers[tier].db_update_every_s;
+
+ size_t points_to_add_to_after;
+ if(p > 0) {
+ // there is another plan before to this
+
+ size_t tier0 = qm->plan.array[p - 1].tier;
+ time_t update_every0 = qm->tiers[tier0].db_update_every_s;
+
+ points_to_add_to_after = query_planer_expand_duration_in_points(update_every, update_every0);
+ }
+ else
+ points_to_add_to_after = (tier == 0) ? 0 : POINTS_TO_EXPAND_QUERY;
- time_t after = ops->plan.data[plan_id].after;
- time_t before = ops->plan.data[plan_id].before;
+ size_t points_to_add_to_before;
+ if(p + 1 < qm->plan.used) {
+ // there is another plan after to this
- if(overwrite_after > after && overwrite_after < before)
- after = overwrite_after;
+ size_t tier1 = qm->plan.array[p+1].tier;
+ time_t update_every1 = qm->tiers[tier1].db_update_every_s;
- ops->tier = ops->plan.data[plan_id].tier;
- ops->tier_ptr = &ops->qm->tiers[ops->tier];
- ops->tier_ptr->eng->api.query_ops.init(ops->tier_ptr->db_metric_handle, &ops->handle, after, before);
- ops->next_metric = ops->tier_ptr->eng->api.query_ops.next_metric;
- ops->is_finished = ops->tier_ptr->eng->api.query_ops.is_finished;
- ops->finalize = ops->tier_ptr->eng->api.query_ops.finalize;
+ points_to_add_to_before = query_planer_expand_duration_in_points(update_every, update_every1);
+ }
+ else
+ points_to_add_to_before = POINTS_TO_EXPAND_QUERY;
+
+ time_t after = qm->plan.array[p].after - (time_t)(update_every * points_to_add_to_after);
+ time_t before = qm->plan.array[p].before + (time_t)(update_every * points_to_add_to_before);
+
+ qm->plan.array[p].expanded_after = after;
+ qm->plan.array[p].expanded_before = before;
+
+ struct query_metric_tier *tier_ptr = &qm->tiers[tier];
+ tier_ptr->eng->api.query_ops.init(
+ tier_ptr->db_metric_handle,
+ &qm->plan.array[p].handle,
+ after, before,
+ ops->r->internal.qt->request.priority);
+
+ qm->plan.array[p].next_metric = tier_ptr->eng->api.query_ops.next_metric;
+ qm->plan.array[p].is_finished = tier_ptr->eng->api.query_ops.is_finished;
+ qm->plan.array[p].finalize = tier_ptr->eng->api.query_ops.finalize;
+ qm->plan.array[p].initialized = true;
+ qm->plan.array[p].finalized = false;
+ }
+}
+
+static void query_planer_finalize_plan(QUERY_ENGINE_OPS *ops, size_t plan_id) {
+ QUERY_METRIC *qm = ops->qm;
+
+ if(qm->plan.array[plan_id].initialized && !qm->plan.array[plan_id].finalized) {
+ qm->plan.array[plan_id].finalize(&qm->plan.array[plan_id].handle);
+ qm->plan.array[plan_id].initialized = false;
+ qm->plan.array[plan_id].finalized = true;
+ qm->plan.array[plan_id].next_metric = NULL;
+ qm->plan.array[plan_id].is_finished = NULL;
+ qm->plan.array[plan_id].finalize = NULL;
+
+ if(ops->current_plan == plan_id) {
+ ops->next_metric = NULL;
+ ops->is_finished = NULL;
+ ops->finalize = NULL;
+ }
+ }
+}
+
+static void query_planer_finalize_remaining_plans(QUERY_ENGINE_OPS *ops) {
+ QUERY_METRIC *qm = ops->qm;
+
+ for(size_t p = 0; p < qm->plan.used ; p++)
+ query_planer_finalize_plan(ops, p);
+}
+
+static void query_planer_activate_plan(QUERY_ENGINE_OPS *ops, size_t plan_id, time_t overwrite_after __maybe_unused) {
+ QUERY_METRIC *qm = ops->qm;
+
+ internal_fatal(plan_id >= qm->plan.used, "QUERY: invalid plan_id given");
+ internal_fatal(!qm->plan.array[plan_id].initialized, "QUERY: plan has not been initialized");
+ internal_fatal(qm->plan.array[plan_id].finalized, "QUERY: plan has been finalized");
+
+ internal_fatal(qm->plan.array[plan_id].after > qm->plan.array[plan_id].before, "QUERY: flipped after/before");
+
+ ops->tier = qm->plan.array[plan_id].tier;
+ ops->tier_ptr = &qm->tiers[ops->tier];
+ ops->handle = &qm->plan.array[plan_id].handle;
+ ops->next_metric = qm->plan.array[plan_id].next_metric;
+ ops->is_finished = qm->plan.array[plan_id].is_finished;
+ ops->finalize = qm->plan.array[plan_id].finalize;
ops->current_plan = plan_id;
- ops->current_plan_expire_time = ops->plan.data[plan_id].before;
+
+ if(plan_id + 1 < qm->plan.used && qm->plan.array[plan_id + 1].after < qm->plan.array[plan_id].before)
+ ops->current_plan_expire_time = qm->plan.array[plan_id + 1].after;
+ else
+ ops->current_plan_expire_time = qm->plan.array[plan_id].before;
+
+ ops->plan_expanded_after = qm->plan.array[plan_id].expanded_after;
+ ops->plan_expanded_before = qm->plan.array[plan_id].expanded_before;
}
-static void query_planer_next_plan(QUERY_ENGINE_OPS *ops, time_t now, time_t last_point_end_time) {
- internal_error(now < ops->current_plan_expire_time && now < ops->plan.data[ops->current_plan].before,
- "QUERY: switching query plan too early!");
+static bool query_planer_next_plan(QUERY_ENGINE_OPS *ops, time_t now, time_t last_point_end_time) {
+ QUERY_METRIC *qm = ops->qm;
size_t old_plan = ops->current_plan;
@@ -969,32 +1084,26 @@ static void query_planer_next_plan(QUERY_ENGINE_OPS *ops, time_t now, time_t las
do {
ops->current_plan++;
- if (ops->current_plan >= ops->plan.entries) {
+ if (ops->current_plan >= qm->plan.used) {
ops->current_plan = old_plan;
ops->current_plan_expire_time = ops->r->internal.qt->window.before;
// let the query run with current plan
// we will not switch it
- return;
+ return false;
}
- next_plan_before_time = ops->plan.data[ops->current_plan].before;
+ next_plan_before_time = qm->plan.array[ops->current_plan].before;
} while(now >= next_plan_before_time || last_point_end_time >= next_plan_before_time);
- if(!query_metric_is_valid_tier(ops->qm, ops->plan.data[ops->current_plan].tier)) {
+ if(!query_metric_is_valid_tier(qm, qm->plan.array[ops->current_plan].tier)) {
ops->current_plan = old_plan;
ops->current_plan_expire_time = ops->r->internal.qt->window.before;
- return;
- }
-
- if(ops->finalize) {
- ops->finalize(&ops->handle);
- ops->finalize = NULL;
- ops->is_finished = NULL;
+ return false;
}
- // internal_error(true, "QUERY: switched plan to %zu (all is %zu), previous expiration was %ld, this starts at %ld, now is %ld, last_point_end_time %ld", ops->current_plan, ops->plan.entries, ops->plan.data[ops->current_plan-1].before, ops->plan.data[ops->current_plan].after, now, last_point_end_time);
-
+ query_planer_finalize_plan(ops, old_plan);
query_planer_activate_plan(ops, ops->current_plan, MIN(now, last_point_end_time));
+ return true;
}
static int compare_query_plan_entries_on_start_time(const void *a, const void *b) {
@@ -1004,59 +1113,66 @@ static int compare_query_plan_entries_on_start_time(const void *a, const void *b
}
static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before_wanted, size_t points_wanted) {
- //BUFFER *wb = buffer_create(1000);
- //buffer_sprintf(wb, "QUERY PLAN for chart '%s' dimension '%s', from %ld to %ld:", rd->rrdset->name, rd->name, after_wanted, before_wanted);
+ QUERY_METRIC *qm = ops->qm;
// put our selected tier as the first plan
size_t selected_tier;
if(ops->r->internal.query_options & RRDR_OPTION_SELECTED_TIER
&& ops->r->internal.qt->window.tier < storage_tiers
- && query_metric_is_valid_tier(ops->qm, ops->r->internal.qt->window.tier)) {
+ && query_metric_is_valid_tier(qm, ops->r->internal.qt->window.tier)) {
selected_tier = ops->r->internal.qt->window.tier;
}
else {
- selected_tier = query_metric_best_tier_for_timeframe(ops->qm, after_wanted, before_wanted, points_wanted);
+ selected_tier = query_metric_best_tier_for_timeframe(qm, after_wanted, before_wanted, points_wanted);
if(ops->r->internal.query_options & RRDR_OPTION_SELECTED_TIER)
ops->r->internal.query_options &= ~RRDR_OPTION_SELECTED_TIER;
+
+ if(!query_metric_is_valid_tier(qm, selected_tier))
+ return false;
+
+ if(qm->tiers[selected_tier].db_first_time_s > before_wanted ||
+ qm->tiers[selected_tier].db_last_time_s < after_wanted)
+ return false;
}
- ops->plan.entries = 1;
- ops->plan.data[0].tier = selected_tier;
- ops->plan.data[0].after = ops->qm->tiers[selected_tier].db_first_time_t;
- ops->plan.data[0].before = ops->qm->tiers[selected_tier].db_last_time_t;
+ qm->plan.used = 1;
+ qm->plan.array[0].tier = selected_tier;
+ qm->plan.array[0].after = (qm->tiers[selected_tier].db_first_time_s < after_wanted) ? after_wanted : qm->tiers[selected_tier].db_first_time_s;
+ qm->plan.array[0].before = (qm->tiers[selected_tier].db_last_time_s > before_wanted) ? before_wanted : qm->tiers[selected_tier].db_last_time_s;
if(!(ops->r->internal.query_options & RRDR_OPTION_SELECTED_TIER)) {
// the selected tier
- time_t selected_tier_first_time_t = ops->plan.data[0].after;
- time_t selected_tier_last_time_t = ops->plan.data[0].before;
-
- //buffer_sprintf(wb, ": SELECTED tier %zu, from %ld to %ld", selected_tier, ops->plan.data[0].after, ops->plan.data[0].before);
+ time_t selected_tier_first_time_s = qm->plan.array[0].after;
+ time_t selected_tier_last_time_s = qm->plan.array[0].before;
// check if our selected tier can start the query
- if (selected_tier_first_time_t > after_wanted) {
+ if (selected_tier_first_time_s > after_wanted) {
// we need some help from other tiers
for (size_t tr = (int)selected_tier + 1; tr < storage_tiers; tr++) {
- if(!query_metric_is_valid_tier(ops->qm, tr))
+ if(!query_metric_is_valid_tier(qm, tr))
continue;
// find the first time of this tier
- time_t first_time_t = ops->qm->tiers[tr].db_first_time_t;
-
- //buffer_sprintf(wb, ": EVAL AFTER tier %d, %ld", tier, first_time_t);
+ time_t tier_first_time_s = qm->tiers[tr].db_first_time_s;
// can it help?
- if (first_time_t < selected_tier_first_time_t) {
+ if (tier_first_time_s < selected_tier_first_time_s) {
// it can help us add detail at the beginning of the query
QUERY_PLAN_ENTRY t = {
.tier = tr,
- .after = (first_time_t < after_wanted) ? after_wanted : first_time_t,
- .before = selected_tier_first_time_t};
- ops->plan.data[ops->plan.entries++] = t;
+ .after = (tier_first_time_s < after_wanted) ? after_wanted : tier_first_time_s,
+ .before = selected_tier_first_time_s,
+ .initialized = false,
+ .finalized = false,
+ };
+ qm->plan.array[qm->plan.used++] = t;
+
+ internal_fatal(!t.after || !t.before, "QUERY: invalid plan selected");
// prepare for the tier
- selected_tier_first_time_t = t.after;
+ selected_tier_first_time_s = t.after;
if (t.after <= after_wanted)
break;
@@ -1065,28 +1181,33 @@ static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before
}
// check if our selected tier can finish the query
- if (selected_tier_last_time_t < before_wanted) {
+ if (selected_tier_last_time_s < before_wanted) {
// we need some help from other tiers
for (int tr = (int)selected_tier - 1; tr >= 0; tr--) {
- if(!query_metric_is_valid_tier(ops->qm, tr))
+ if(!query_metric_is_valid_tier(qm, tr))
continue;
// find the last time of this tier
- time_t last_time_t = ops->qm->tiers[tr].db_last_time_t;
+ time_t tier_last_time_s = qm->tiers[tr].db_last_time_s;
- //buffer_sprintf(wb, ": EVAL BEFORE tier %d, %ld", tier, last_time_t);
+ //buffer_sprintf(wb, ": EVAL BEFORE tier %d, %ld", tier, last_time_s);
// can it help?
- if (last_time_t > selected_tier_last_time_t) {
+ if (tier_last_time_s > selected_tier_last_time_s) {
// it can help us add detail at the end of the query
QUERY_PLAN_ENTRY t = {
.tier = tr,
- .after = selected_tier_last_time_t,
- .before = (last_time_t > before_wanted) ? before_wanted : last_time_t};
- ops->plan.data[ops->plan.entries++] = t;
+ .after = selected_tier_last_time_s,
+ .before = (tier_last_time_s > before_wanted) ? before_wanted : tier_last_time_s,
+ .initialized = false,
+ .finalized = false,
+ };
+ qm->plan.array[qm->plan.used++] = t;
// prepare for the tier
- selected_tier_last_time_t = t.before;
+ selected_tier_last_time_s = t.before;
+
+ internal_fatal(!t.after || !t.before, "QUERY: invalid plan selected");
if (t.before >= before_wanted)
break;
@@ -1096,26 +1217,21 @@ static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before
}
// sort the query plan
- if(ops->plan.entries > 1)
- qsort(&ops->plan.data, ops->plan.entries, sizeof(QUERY_PLAN_ENTRY), compare_query_plan_entries_on_start_time);
+ if(qm->plan.used > 1)
+ qsort(&qm->plan.array, qm->plan.used, sizeof(QUERY_PLAN_ENTRY), compare_query_plan_entries_on_start_time);
- // make sure it has the whole timeframe we need
- if(ops->plan.data[0].after < after_wanted)
- ops->plan.data[0].after = after_wanted;
-
- if(ops->plan.data[ops->plan.entries - 1].before > before_wanted)
- ops->plan.data[ops->plan.entries - 1].before = before_wanted;
-
- //buffer_sprintf(wb, ": FINAL STEPS %zu", ops->plan.entries);
-
- //for(size_t i = 0; i < ops->plan.entries ;i++)
- // buffer_sprintf(wb, ": STEP %zu = use tier %zu from %ld to %ld", i+1, ops->plan.data[i].tier, ops->plan.data[i].after, ops->plan.data[i].before);
-
- //internal_error(true, "%s", buffer_tostring(wb));
-
- if(!query_metric_is_valid_tier(ops->qm, ops->plan.data[0].tier))
+ if(!query_metric_is_valid_tier(qm, qm->plan.array[0].tier))
return false;
+#ifdef NETDATA_INTERNAL_CHECKS
+ for(size_t p = 0; p < qm->plan.used ;p++) {
+ internal_fatal(qm->plan.array[p].after > qm->plan.array[p].before, "QUERY: flipped after/before");
+ internal_fatal(qm->plan.array[p].after < after_wanted, "QUERY: too small plan first time");
+ internal_fatal(qm->plan.array[p].before > before_wanted, "QUERY: too big plan last time");
+ }
+#endif
+
+ query_planer_initialize_plans(ops);
query_planer_activate_plan(ops, 0, 0);
return true;
@@ -1146,24 +1262,45 @@ static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before
#define query_add_point_to_group(r, point, ops) do { \
if(likely(netdata_double_isnumber((point).value))) { \
if(likely(fpclassify((point).value) != FP_ZERO)) \
- (ops).group_points_non_zero++; \
+ (ops)->group_points_non_zero++; \
\
if(unlikely((point).flags & SN_FLAG_RESET)) \
- (ops).group_value_flags |= RRDR_VALUE_RESET; \
+ (ops)->group_value_flags |= RRDR_VALUE_RESET; \
\
- (ops).grouping_add(r, (point).value); \
+ (ops)->grouping_add(r, (point).value); \
} \
\
- (ops).group_points_added++; \
- (ops).group_anomaly_rate += (point).anomaly; \
+ (ops)->group_points_added++; \
+ (ops)->group_anomaly_rate += (point).anomaly; \
} while(0)
-static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) {
+static QUERY_ENGINE_OPS *rrd2rrdr_query_prep(RRDR *r, size_t dim_id_in_rrdr) {
QUERY_TARGET *qt = r->internal.qt;
- QUERY_METRIC *qm = &qt->query.array[dim_id_in_rrdr];
+
+ QUERY_ENGINE_OPS *ops = onewayalloc_mallocz(r->internal.owa, sizeof(QUERY_ENGINE_OPS));
+ *ops = (QUERY_ENGINE_OPS) {
+ .r = r,
+ .qm = &qt->query.array[dim_id_in_rrdr],
+ .grouping_add = r->internal.grouping_add,
+ .grouping_flush = r->internal.grouping_flush,
+ .tier_query_fetch = r->internal.tier_query_fetch,
+ .view_update_every = r->update_every,
+ .query_granularity = (time_t)(r->update_every / r->group),
+ .group_value_flags = RRDR_VALUE_NOTHING,
+ };
+
+ if(!query_plan(ops, qt->window.after, qt->window.before, qt->window.points))
+ return NULL;
+
+ return ops;
+}
+
+static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_OPS *ops) {
+ QUERY_TARGET *qt = r->internal.qt;
+ QUERY_METRIC *qm = &qt->query.array[dim_id_in_rrdr]; (void)qm;
size_t points_wanted = qt->window.points;
time_t after_wanted = qt->window.after;
- time_t before_wanted = qt->window.before;
+ time_t before_wanted = qt->window.before; (void)before_wanted;
// bool debug_this = false;
// if(strcmp("user", string2str(rd->id)) == 0 && strcmp("system.cpu", string2str(rd->rrdset->id)) == 0)
@@ -1174,39 +1311,30 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) {
size_t points_added = 0;
- QUERY_ENGINE_OPS ops = {
- .r = r,
- .qm = qm,
- .grouping_add = r->internal.grouping_add,
- .grouping_flush = r->internal.grouping_flush,
- .tier_query_fetch = r->internal.tier_query_fetch,
- .view_update_every = r->update_every,
- .query_granularity = (time_t)(r->update_every / r->group),
- .group_value_flags = RRDR_VALUE_NOTHING
- };
-
long rrdr_line = -1;
bool use_anomaly_bit_as_value = (r->internal.query_options & RRDR_OPTION_ANOMALY_BIT) ? true : false;
- if(!query_plan(&ops, after_wanted, before_wanted, points_wanted))
- return;
-
NETDATA_DOUBLE min = r->min, max = r->max;
QUERY_POINT last2_point = QUERY_POINT_EMPTY;
QUERY_POINT last1_point = QUERY_POINT_EMPTY;
QUERY_POINT new_point = QUERY_POINT_EMPTY;
- time_t now_start_time = after_wanted - ops.query_granularity;
- time_t now_end_time = after_wanted + ops.view_update_every - ops.query_granularity;
+ // ONE POINT READ-AHEAD
+ // when we switch plans, we read-ahead a point from the next plan
+ // to join them smoothly at the exact time the next plan begins
+ STORAGE_POINT next1_point = STORAGE_POINT_UNSET;
+
+ time_t now_start_time = after_wanted - ops->query_granularity;
+ time_t now_end_time = after_wanted + ops->view_update_every - ops->query_granularity;
size_t db_points_read_since_plan_switch = 0; (void)db_points_read_since_plan_switch;
// The main loop, based on the query granularity we need
- for( ; points_added < points_wanted ; now_start_time = now_end_time, now_end_time += ops.view_update_every) {
+ for( ; points_added < points_wanted ; now_start_time = now_end_time, now_end_time += ops->view_update_every) {
if(unlikely(query_plan_should_switch_plan(ops, now_end_time))) {
- query_planer_next_plan(&ops, now_end_time, new_point.end_time);
+ query_planer_next_plan(ops, now_end_time, new_point.end_time);
db_points_read_since_plan_switch = 0;
}
@@ -1219,7 +1347,7 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) {
last1_point = new_point;
}
- if(unlikely(ops.is_finished(&ops.handle))) {
+ if(unlikely(ops->is_finished(ops->handle))) {
if(count_same_end_time != 0) {
last2_point = last1_point;
last1_point = new_point;
@@ -1235,29 +1363,62 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) {
// fetch the new point
{
- db_points_read_since_plan_switch++;
- STORAGE_POINT sp = ops.next_metric(&ops.handle);
+ STORAGE_POINT sp;
+ if(likely(storage_point_is_unset(next1_point))) {
+ db_points_read_since_plan_switch++;
+ sp = ops->next_metric(ops->handle);
+ }
+ else {
+ // ONE POINT READ-AHEAD
+ sp = next1_point;
+ storage_point_unset(next1_point);
+ db_points_read_since_plan_switch = 1;
+ }
+
+ // ONE POINT READ-AHEAD
+ if(unlikely(query_plan_should_switch_plan(ops, sp.end_time_s) &&
+ query_planer_next_plan(ops, now_end_time, new_point.end_time))) {
+
+ // The end time of the current point, crosses our plans (tiers)
+ // so, we switched plan (tier)
+ //
+ // There are 2 cases now:
+ //
+ // A. the entire point of the previous plan is to the future of point from the next plan
+ // B. part of the point of the previous plan overlaps with the point from the next plan
+
+ STORAGE_POINT sp2 = ops->next_metric(ops->handle);
+
+ if(sp.start_time_s > sp2.start_time_s)
+ // the point from the previous plan is useless
+ sp = sp2;
+ else
+ // let the query run from the previous plan
+ // but setting this will also cut off the interpolation
+ // of the point from the previous plan
+ next1_point = sp2;
+ }
- ops.db_points_read_per_tier[ops.tier]++;
- ops.db_total_points_read++;
+ ops->db_points_read_per_tier[ops->tier]++;
+ ops->db_total_points_read++;
- new_point.start_time = sp.start_time;
- new_point.end_time = sp.end_time;
+ new_point.start_time = sp.start_time_s;
+ new_point.end_time = sp.end_time_s;
new_point.anomaly = sp.count ? (NETDATA_DOUBLE)sp.anomaly_count * 100.0 / (NETDATA_DOUBLE)sp.count : 0.0;
- query_point_set_id(new_point, ops.db_total_points_read);
+ query_point_set_id(new_point, ops->db_total_points_read);
// if(debug_this)
// info("QUERY: got point %zu, from time %ld to %ld // now from %ld to %ld // query from %ld to %ld",
// new_point.id, new_point.start_time, new_point.end_time, now_start_time, now_end_time, after_wanted, before_wanted);
//
- // set the right value to the point we got
- if(likely(!storage_point_is_unset(sp) && !storage_point_is_empty(sp))) {
+ // get the right value from the point we got
+ if(likely(!storage_point_is_unset(sp) && !storage_point_is_gap(sp))) {
if(unlikely(use_anomaly_bit_as_value))
new_point.value = new_point.anomaly;
else {
- switch (ops.tier_query_fetch) {
+ switch (ops->tier_query_fetch) {
default:
case TIER_QUERY_FETCH_AVERAGE:
new_point.value = sp.sum / sp.count;
@@ -1284,19 +1445,30 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) {
}
// check if the db is giving us zero duration points
- if(unlikely(new_point.start_time == new_point.end_time)) {
- internal_error(true, "QUERY: '%s', dimension '%s' next_metric() returned point %zu start time %ld, end time %ld, that are both equal",
- qt->id, string2str(qm->dimension.id), new_point.id, new_point.start_time, new_point.end_time);
+ if(unlikely(db_points_read_since_plan_switch > 1 &&
+ new_point.start_time == new_point.end_time)) {
- new_point.start_time = new_point.end_time - ops.tier_ptr->db_update_every;
+ internal_error(true, "QUERY: '%s', dimension '%s' next_metric() returned "
+ "point %zu from %ld to %ld, that are both equal",
+ qt->id, string2str(qm->dimension.id),
+ new_point.id, new_point.start_time, new_point.end_time);
+
+ new_point.start_time = new_point.end_time - ops->tier_ptr->db_update_every_s;
}
// check if the db is advancing the query
- if(unlikely(new_point.end_time <= last1_point.end_time)) {
- internal_error(db_points_read_since_plan_switch > 1,
- "QUERY: '%s', dimension '%s' next_metric() returned point %zu from %ld to %ld, before the last point %zu from %ld to %ld, now is %ld to %ld",
- qt->id, string2str(qm->dimension.id), new_point.id, new_point.start_time, new_point.end_time,
- last1_point.id, last1_point.start_time, last1_point.end_time, now_start_time, now_end_time);
+ if(unlikely(db_points_read_since_plan_switch > 1 &&
+ new_point.end_time <= last1_point.end_time)) {
+
+ internal_error(true,
+ "QUERY: '%s', dimension '%s' next_metric() returned "
+ "point %zu from %ld to %ld, before the "
+ "last point %zu from %ld to %ld, "
+ "now is %ld to %ld",
+ qt->id, string2str(qm->dimension.id),
+ new_point.id, new_point.start_time, new_point.end_time,
+ last1_point.id, last1_point.start_time, last1_point.end_time,
+ now_start_time, now_end_time);
count_same_end_time++;
continue;
@@ -1321,12 +1493,16 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) {
// at exactly the time we will want
// we only log if this is not point 1
- internal_error(new_point.end_time < after_wanted && new_point.id > 1,
- "QUERY: '%s', dimension '%s' next_metric() returned point %zu from %ld time %ld, which is entirely before our current timeframe %ld to %ld (and before the entire query, after %ld, before %ld)",
+ internal_error(new_point.end_time < ops->plan_expanded_after &&
+ db_points_read_since_plan_switch > 1,
+ "QUERY: '%s', dimension '%s' next_metric() "
+ "returned point %zu from %ld time %ld, "
+ "which is entirely before our current timeframe %ld to %ld "
+ "(and before the entire query, after %ld, before %ld)",
qt->id, string2str(qm->dimension.id),
new_point.id, new_point.start_time, new_point.end_time,
now_start_time, now_end_time,
- after_wanted, before_wanted);
+ ops->plan_expanded_after, ops->plan_expanded_before);
}
}
@@ -1339,20 +1515,31 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) {
if(unlikely(count_same_end_time)) {
internal_error(true,
- "QUERY: '%s', dimension '%s', the database does not advance the query, it returned an end time less or equal to the end time of the last point we got %ld, %zu times",
- qt->id, string2str(qm->dimension.id), last1_point.end_time, count_same_end_time);
+ "QUERY: '%s', dimension '%s', the database does not advance the query,"
+ " it returned an end time less or equal to the end time of the last "
+ "point we got %ld, %zu times",
+ qt->id, string2str(qm->dimension.id),
+ last1_point.end_time, count_same_end_time);
if(unlikely(new_point.end_time <= last1_point.end_time))
new_point.end_time = now_end_time;
}
+ time_t stop_time = new_point.end_time;
+ if(unlikely(!storage_point_is_unset(next1_point))) {
+ // ONE POINT READ-AHEAD
+ // the point crosses the start time of the
+ // read ahead storage point we have read
+ stop_time = next1_point.start_time_s;
+ }
+
// the inner loop
// we have 3 points in memory: last2, last1, new
// we select the one to use based on their timestamps
size_t iterations = 0;
- for ( ; now_end_time <= new_point.end_time && points_added < points_wanted ;
- now_end_time += ops.view_update_every, iterations++) {
+ for ( ; now_end_time <= stop_time && points_added < points_wanted ;
+ now_end_time += ops->view_update_every, iterations++) {
// now_start_time is wrong in this loop
// but, we don't need it
@@ -1411,20 +1598,20 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) {
RRDR_VALUE_FLAGS *rrdr_value_options_ptr = &r->o[rrdr_o_v_index];
// update the dimension options
- if(likely(ops.group_points_non_zero))
+ if(likely(ops->group_points_non_zero))
r->od[dim_id_in_rrdr] |= RRDR_DIMENSION_NONZERO;
// store the specific point options
- *rrdr_value_options_ptr = ops.group_value_flags;
+ *rrdr_value_options_ptr = ops->group_value_flags;
// store the group value
- NETDATA_DOUBLE group_value = ops.grouping_flush(r, rrdr_value_options_ptr);
+ NETDATA_DOUBLE group_value = ops->grouping_flush(r, rrdr_value_options_ptr);
r->v[rrdr_o_v_index] = group_value;
// we only store uint8_t anomaly rates,
// so let's get double precision by storing
// anomaly rates in the range 0 - 200
- r->ar[rrdr_o_v_index] = ops.group_anomaly_rate / (NETDATA_DOUBLE)ops.group_points_added;
+ r->ar[rrdr_o_v_index] = ops->group_anomaly_rate / (NETDATA_DOUBLE)ops->group_points_added;
if(likely(points_added || dim_id_in_rrdr)) {
// find the min/max across all dimensions
@@ -1440,72 +1627,71 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) {
}
points_added++;
- ops.group_points_added = 0;
- ops.group_value_flags = RRDR_VALUE_NOTHING;
- ops.group_points_non_zero = 0;
- ops.group_anomaly_rate = 0;
+ ops->group_points_added = 0;
+ ops->group_value_flags = RRDR_VALUE_NOTHING;
+ ops->group_points_non_zero = 0;
+ ops->group_anomaly_rate = 0;
}
// the loop above increased "now" by query_granularity,
// but the main loop will increase it too,
// so, let's undo the last iteration of this loop
if(iterations)
- now_end_time -= ops.view_update_every;
+ now_end_time -= ops->view_update_every;
}
- ops.finalize(&ops.handle);
+ query_planer_finalize_remaining_plans(ops);
r->internal.result_points_generated += points_added;
- r->internal.db_points_read += ops.db_total_points_read;
+ r->internal.db_points_read += ops->db_total_points_read;
for(size_t tr = 0; tr < storage_tiers ; tr++)
- r->internal.tier_points_read[tr] += ops.db_points_read_per_tier[tr];
+ r->internal.tier_points_read[tr] += ops->db_points_read_per_tier[tr];
r->min = min;
r->max = max;
r->before = max_date;
- r->after = min_date - ops.view_update_every + ops.query_granularity;
+ r->after = min_date - ops->view_update_every + ops->query_granularity;
rrdr_done(r, rrdr_line);
internal_error(points_added != points_wanted,
"QUERY: '%s', dimension '%s', requested %zu points, but RRDR added %zu (%zu db points read).",
qt->id, string2str(qm->dimension.id),
- (size_t)points_wanted, (size_t)points_added, ops.db_total_points_read);
+ (size_t)points_wanted, (size_t)points_added, ops->db_total_points_read);
}
// ----------------------------------------------------------------------------
// fill the gap of a tier
void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAGE_POINT sp, usec_t now_ut);
-void store_metric_collection_completed(void);
-void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now) {
+void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s) {
if(unlikely(tier >= storage_tiers)) return;
if(storage_tiers_backfill[tier] == RRD_BACKFILL_NONE) return;
- struct rrddim_tier *t = rd->tiers[tier];
+ struct rrddim_tier *t = &rd->tiers[tier];
if(unlikely(!t)) return;
- time_t latest_time_t = t->query_ops->latest_time(t->db_metric_handle);
+ time_t latest_time_s = t->query_ops->latest_time_s(t->db_metric_handle);
time_t granularity = (time_t)t->tier_grouping * (time_t)rd->update_every;
- time_t time_diff = now - latest_time_t;
+ time_t time_diff = now_s - latest_time_s;
// if the user wants only NEW backfilling, and we don't have any data
- if(storage_tiers_backfill[tier] == RRD_BACKFILL_NEW && latest_time_t <= 0) return;
+ if(storage_tiers_backfill[tier] == RRD_BACKFILL_NEW && latest_time_s <= 0) return;
// there is really nothing we can do
- if(now <= latest_time_t || time_diff < granularity) return;
+ if(now_s <= latest_time_s || time_diff < granularity) return;
struct storage_engine_query_handle handle;
// for each lower tier
for(int read_tier = (int)tier - 1; read_tier >= 0 ; read_tier--){
- time_t smaller_tier_first_time = rd->tiers[read_tier]->query_ops->oldest_time(rd->tiers[read_tier]->db_metric_handle);
- time_t smaller_tier_last_time = rd->tiers[read_tier]->query_ops->latest_time(rd->tiers[read_tier]->db_metric_handle);
- if(smaller_tier_last_time <= latest_time_t) continue; // it is as bad as we are
+ time_t smaller_tier_first_time = rd->tiers[read_tier].query_ops->oldest_time_s(rd->tiers[read_tier].db_metric_handle);
+ time_t smaller_tier_last_time = rd->tiers[read_tier].query_ops->latest_time_s(rd->tiers[read_tier].db_metric_handle);
+ if(smaller_tier_last_time <= latest_time_s) continue; // it is as bad as we are
- long after_wanted = (latest_time_t < smaller_tier_first_time) ? smaller_tier_first_time : latest_time_t;
+ long after_wanted = (latest_time_s < smaller_tier_first_time) ? smaller_tier_first_time : latest_time_s;
long before_wanted = smaller_tier_last_time;
- struct rrddim_tier *tmp = rd->tiers[read_tier];
- tmp->query_ops->init(tmp->db_metric_handle, &handle, after_wanted, before_wanted);
+ struct rrddim_tier *tmp = &rd->tiers[read_tier];
+ tmp->query_ops->init(tmp->db_metric_handle, &handle, after_wanted, before_wanted, STORAGE_PRIORITY_HIGH);
size_t points_read = 0;
@@ -1514,9 +1700,9 @@ void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now)
STORAGE_POINT sp = tmp->query_ops->next_metric(&handle);
points_read++;
- if(sp.end_time > latest_time_t) {
- latest_time_t = sp.end_time;
- store_metric_at_tier(rd, tier, t, sp, sp.end_time * USEC_PER_SEC);
+ if(sp.end_time_s > latest_time_s) {
+ latest_time_s = sp.end_time_s;
+ store_metric_at_tier(rd, tier, t, sp, sp.end_time_s * USEC_PER_SEC);
}
}
@@ -1551,12 +1737,12 @@ static void rrd2rrdr_log_request_response_metadata(RRDR *r
, const char *msg
) {
- time_t first_entry_t = r->internal.qt->db.first_time_t;
- time_t last_entry_t = r->internal.qt->db.last_time_t;
+ time_t first_entry_s = r->internal.qt->db.first_time_s;
+ time_t last_entry_s = r->internal.qt->db.last_time_s;
internal_error(
- true,
- "rrd2rrdr() on %s update every %ld with %s grouping %s (group: %zu, resampling_time: %ld, resampling_group: %zu), "
+ true,
+ "rrd2rrdr() on %s update every %ld with %s grouping %s (group: %zu, resampling_time: %ld, resampling_group: %zu), "
"after (got: %ld, want: %ld, req: %ld, db: %ld), "
"before (got: %ld, want: %ld, req: %ld, db: %ld), "
"duration (got: %ld, want: %ld, req: %ld, db: %ld), "
@@ -1576,19 +1762,19 @@ static void rrd2rrdr_log_request_response_metadata(RRDR *r
, r->after
, after_wanted
, after_requested
- , first_entry_t
+ , first_entry_s
// before
, r->before
, before_wanted
, before_requested
- , last_entry_t
+ , last_entry_s
// duration
, (long)(r->before - r->after + r->internal.qt->window.query_granularity)
, (long)(before_wanted - after_wanted + r->internal.qt->window.query_granularity)
, (long)before_requested - after_requested
- , (long)((last_entry_t - first_entry_t) + r->internal.qt->window.query_granularity)
+ , (long)((last_entry_s - first_entry_s) + r->internal.qt->window.query_granularity)
// points
, r->rows
@@ -1708,7 +1894,7 @@ bool query_target_calculate_window(QUERY_TARGET *qt) {
time_t resampling_time_requested = qt->request.resampling_time;
RRDR_OPTIONS options = qt->request.options;
size_t tier = qt->request.tier;
- time_t update_every = qt->db.minimum_latest_update_every;
+ time_t update_every = qt->db.minimum_latest_update_every_s;
// RULES
// points_requested = 0
@@ -1763,30 +1949,30 @@ bool query_target_calculate_window(QUERY_TARGET *qt) {
if (after_wanted == 0 || before_wanted == 0) {
relative_period_requested = true;
- time_t first_entry_t = qt->db.first_time_t;
- time_t last_entry_t = qt->db.last_time_t;
+ time_t first_entry_s = qt->db.first_time_s;
+ time_t last_entry_s = qt->db.last_time_s;
- if (first_entry_t == 0 || last_entry_t == 0) {
- internal_error(true, "QUERY: no data detected on query '%s' (db first_entry_t = %ld, last_entry_t = %ld", qt->id, first_entry_t, last_entry_t);
+ if (first_entry_s == 0 || last_entry_s == 0) {
+ internal_error(true, "QUERY: no data detected on query '%s' (db first_entry_t = %ld, last_entry_t = %ld", qt->id, first_entry_s, last_entry_s);
query_debug_log_free();
return false;
}
- query_debug_log(":first_entry_t %ld, last_entry_t %ld", first_entry_t, last_entry_t);
+ query_debug_log(":first_entry_t %ld, last_entry_t %ld", first_entry_s, last_entry_s);
if (after_wanted == 0) {
- after_wanted = first_entry_t;
+ after_wanted = first_entry_s;
query_debug_log(":zero after_wanted %ld", after_wanted);
}
if (before_wanted == 0) {
- before_wanted = last_entry_t;
+ before_wanted = last_entry_s;
before_is_aligned_to_db_end = true;
query_debug_log(":zero before_wanted %ld", before_wanted);
}
if (points_wanted == 0) {
- points_wanted = (last_entry_t - first_entry_t) / update_every;
+ points_wanted = (last_entry_s - first_entry_s) / update_every;
query_debug_log(":zero points_wanted %zu", points_wanted);
}
}
@@ -1804,7 +1990,7 @@ bool query_target_calculate_window(QUERY_TARGET *qt) {
update_every = rrdset_find_natural_update_every_for_timeframe(
qt, after_wanted, before_wanted, points_wanted, options, tier);
- if (update_every <= 0) update_every = qt->db.minimum_latest_update_every;
+ if (update_every <= 0) update_every = qt->db.minimum_latest_update_every_s;
query_debug_log(":natural update every %ld", update_every);
}
@@ -1975,7 +2161,8 @@ RRDR *rrd2rrdr_legacy(
ONEWAYALLOC *owa,
RRDSET *st, size_t points, time_t after, time_t before,
RRDR_GROUPING group_method, time_t resampling_time, RRDR_OPTIONS options, const char *dimensions,
- const char *group_options, time_t timeout, size_t tier, QUERY_SOURCE query_source) {
+ const char *group_options, time_t timeout, size_t tier, QUERY_SOURCE query_source,
+ STORAGE_PRIORITY priority) {
QUERY_TARGET_REQUEST qtr = {
.st = st,
@@ -1990,6 +2177,7 @@ RRDR *rrd2rrdr_legacy(
.timeout = timeout,
.tier = tier,
.query_source = query_source,
+ .priority = priority,
};
return rrd2rrdr(owa, query_target_create(&qtr));
@@ -2056,16 +2244,48 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
if (qt->request.timeout)
now_realtime_timeval(&query_start_time);
+ size_t last_db_points_read = 0;
+ size_t last_result_points_generated = 0;
+
+ QUERY_ENGINE_OPS **ops = onewayalloc_callocz(r->internal.owa, qt->query.used, sizeof(QUERY_ENGINE_OPS *));
+
+ size_t capacity = libuv_worker_threads * 2;
+ size_t max_queries_to_prepare = (qt->query.used > (capacity - 1)) ? (capacity - 1) : qt->query.used;
+ size_t queries_prepared = 0;
+ while(queries_prepared < max_queries_to_prepare) {
+ // preload another query
+ ops[queries_prepared] = rrd2rrdr_query_prep(r, queries_prepared);
+ queries_prepared++;
+ }
+
for(size_t c = 0, max = qt->query.used; c < max ; c++) {
+
+ if(queries_prepared < max) {
+ // preload another query
+ ops[queries_prepared] = rrd2rrdr_query_prep(r, queries_prepared);
+ queries_prepared++;
+ }
+
// set the query target dimension options to rrdr
r->od[c] = qt->query.array[c].dimension.options;
- r->od[c] |= RRDR_DIMENSION_SELECTED;
-
// reset the grouping for the new dimension
r->internal.grouping_reset(r);
- rrd2rrdr_do_dimension(r, c);
+ if(ops[c]) {
+ r->od[c] |= RRDR_DIMENSION_SELECTED;
+ rrd2rrdr_query_execute(r, c, ops[c]);
+ }
+
+ global_statistics_rrdr_query_completed(
+ 1,
+ r->internal.db_points_read - last_db_points_read,
+ r->internal.result_points_generated - last_result_points_generated,
+ qt->request.query_source);
+
+ last_db_points_read = r->internal.db_points_read;
+ last_result_points_generated = r->internal.result_points_generated;
+
if (qt->request.timeout)
now_realtime_timeval(&query_current_time);
@@ -2106,6 +2326,12 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
log_access("QUERY CANCELED RUNTIME EXCEEDED %0.2f ms (LIMIT %lld ms)",
(NETDATA_DOUBLE)dt_usec(&query_start_time, &query_current_time) / 1000.0, (long long)qt->request.timeout);
r->result_options |= RRDR_RESULT_OPTION_CANCEL;
+
+ for(size_t i = c + 1; i < queries_prepared ; i++) {
+ if(ops[i])
+ query_planer_finalize_remaining_plans(ops[i]);
+ }
+
break;
}
}
@@ -2169,7 +2395,5 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
}
}
- global_statistics_rrdr_query_completed(dimensions_used, r->internal.db_points_read,
- r->internal.result_points_generated, qt->request.query_source);
return r;
}