Diffstat
 web/api/queries/query.c | 2570
 1 file changed, 1970 insertions(+), 600 deletions(-)
diff --git a/web/api/queries/query.c b/web/api/queries/query.c
index df7e09799..3770d4770 100644
--- a/web/api/queries/query.c
+++ b/web/api/queries/query.c
@@ -24,7 +24,8 @@ static struct {
     const char *name;
     uint32_t hash;
-    RRDR_GROUPING value;
+    RRDR_TIME_GROUPING value;
+    RRDR_TIME_GROUPING add_flush;

     // One time initialization for the module.
     // This is called once, when netdata starts.
@@ -59,397 +60,445 @@ static struct {
     {.name = "average",
      .hash = 0,
      .value = RRDR_GROUPING_AVERAGE,
+     .add_flush = RRDR_GROUPING_AVERAGE,
      .init = NULL,
-     .create= grouping_create_average,
-     .reset = grouping_reset_average,
-     .free = grouping_free_average,
-     .add = grouping_add_average,
-     .flush = grouping_flush_average,
+     .create= tg_average_create,
+     .reset = tg_average_reset,
+     .free = tg_average_free,
+     .add = tg_average_add,
+     .flush = tg_average_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
-    {.name = "mean", // alias on 'average'
+    {.name = "avg", // alias on 'average'
      .hash = 0,
      .value = RRDR_GROUPING_AVERAGE,
+     .add_flush = RRDR_GROUPING_AVERAGE,
      .init = NULL,
-     .create= grouping_create_average,
-     .reset = grouping_reset_average,
-     .free = grouping_free_average,
-     .add = grouping_add_average,
-     .flush = grouping_flush_average,
+     .create= tg_average_create,
+     .reset = tg_average_reset,
+     .free = tg_average_free,
+     .add = tg_average_add,
+     .flush = tg_average_flush,
+     .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
+    },
+    {.name = "mean", // alias on 'average'
+     .hash = 0,
+     .value = RRDR_GROUPING_AVERAGE,
+     .add_flush = RRDR_GROUPING_AVERAGE,
+     .init = NULL,
+     .create= tg_average_create,
+     .reset = tg_average_reset,
+     .free = tg_average_free,
+     .add = tg_average_add,
+     .flush = tg_average_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-mean1",
      .hash = 0,
      .value = RRDR_GROUPING_TRIMMED_MEAN1,
+     .add_flush = RRDR_GROUPING_TRIMMED_MEAN,
      .init = NULL,
-     .create= grouping_create_trimmed_mean1,
-     .reset = grouping_reset_trimmed_mean,
-     .free = grouping_free_trimmed_mean,
-     .add = grouping_add_trimmed_mean,
-     .flush = grouping_flush_trimmed_mean,
+     .create= tg_trimmed_mean_create_1,
+     .reset = tg_trimmed_mean_reset,
+     .free = tg_trimmed_mean_free,
+     .add = tg_trimmed_mean_add,
+     .flush = tg_trimmed_mean_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-mean2",
      .hash = 0,
      .value = RRDR_GROUPING_TRIMMED_MEAN2,
+     .add_flush = RRDR_GROUPING_TRIMMED_MEAN,
      .init = NULL,
-     .create= grouping_create_trimmed_mean2,
-     .reset = grouping_reset_trimmed_mean,
-     .free = grouping_free_trimmed_mean,
-     .add = grouping_add_trimmed_mean,
-     .flush = grouping_flush_trimmed_mean,
+     .create= tg_trimmed_mean_create_2,
+     .reset = tg_trimmed_mean_reset,
+     .free = tg_trimmed_mean_free,
+     .add = tg_trimmed_mean_add,
+     .flush = tg_trimmed_mean_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-mean3",
      .hash = 0,
      .value = RRDR_GROUPING_TRIMMED_MEAN3,
+     .add_flush = RRDR_GROUPING_TRIMMED_MEAN,
      .init = NULL,
-     .create= grouping_create_trimmed_mean3,
-     .reset = grouping_reset_trimmed_mean,
-     .free = grouping_free_trimmed_mean,
-     .add = grouping_add_trimmed_mean,
-     .flush = grouping_flush_trimmed_mean,
+     .create= tg_trimmed_mean_create_3,
+     .reset = tg_trimmed_mean_reset,
+     .free = tg_trimmed_mean_free,
+     .add = tg_trimmed_mean_add,
+     .flush = tg_trimmed_mean_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-mean5",
      .hash = 0,
-     .value = RRDR_GROUPING_TRIMMED_MEAN5,
+     .value = RRDR_GROUPING_TRIMMED_MEAN,
+     .add_flush = RRDR_GROUPING_TRIMMED_MEAN,
      .init = NULL,
-     .create= grouping_create_trimmed_mean5,
-     .reset = grouping_reset_trimmed_mean,
-     .free = grouping_free_trimmed_mean,
-     .add = grouping_add_trimmed_mean,
-     .flush = grouping_flush_trimmed_mean,
+     .create= tg_trimmed_mean_create_5,
+     .reset = tg_trimmed_mean_reset,
+     .free = tg_trimmed_mean_free,
+     .add = tg_trimmed_mean_add,
+     .flush = tg_trimmed_mean_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-mean10",
      .hash = 0,
      .value = RRDR_GROUPING_TRIMMED_MEAN10,
+     .add_flush = RRDR_GROUPING_TRIMMED_MEAN,
      .init = NULL,
-     .create= grouping_create_trimmed_mean10,
-     .reset = grouping_reset_trimmed_mean,
-     .free = grouping_free_trimmed_mean,
-     .add = grouping_add_trimmed_mean,
-     .flush = grouping_flush_trimmed_mean,
+     .create= tg_trimmed_mean_create_10,
+     .reset = tg_trimmed_mean_reset,
+     .free = tg_trimmed_mean_free,
+     .add = tg_trimmed_mean_add,
+     .flush = tg_trimmed_mean_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-mean15",
      .hash = 0,
      .value = RRDR_GROUPING_TRIMMED_MEAN15,
+     .add_flush = RRDR_GROUPING_TRIMMED_MEAN,
      .init = NULL,
-     .create= grouping_create_trimmed_mean15,
-     .reset = grouping_reset_trimmed_mean,
-     .free = grouping_free_trimmed_mean,
-     .add = grouping_add_trimmed_mean,
-     .flush = grouping_flush_trimmed_mean,
+     .create= tg_trimmed_mean_create_15,
+     .reset = tg_trimmed_mean_reset,
+     .free = tg_trimmed_mean_free,
+     .add = tg_trimmed_mean_add,
+     .flush = tg_trimmed_mean_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-mean20",
      .hash = 0,
      .value = RRDR_GROUPING_TRIMMED_MEAN20,
+     .add_flush = RRDR_GROUPING_TRIMMED_MEAN,
      .init = NULL,
-     .create= grouping_create_trimmed_mean20,
-     .reset = grouping_reset_trimmed_mean,
-     .free = grouping_free_trimmed_mean,
-     .add = grouping_add_trimmed_mean,
-     .flush = grouping_flush_trimmed_mean,
+     .create= tg_trimmed_mean_create_20,
+     .reset = tg_trimmed_mean_reset,
+     .free = tg_trimmed_mean_free,
+     .add = tg_trimmed_mean_add,
+     .flush = tg_trimmed_mean_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-mean25",
      .hash = 0,
      .value = RRDR_GROUPING_TRIMMED_MEAN25,
+     .add_flush = RRDR_GROUPING_TRIMMED_MEAN,
      .init = NULL,
-     .create= grouping_create_trimmed_mean25,
-     .reset = grouping_reset_trimmed_mean,
-     .free = grouping_free_trimmed_mean,
-     .add = grouping_add_trimmed_mean,
-     .flush = grouping_flush_trimmed_mean,
+     .create= tg_trimmed_mean_create_25,
+     .reset = tg_trimmed_mean_reset,
+     .free = tg_trimmed_mean_free,
+     .add = tg_trimmed_mean_add,
+     .flush = tg_trimmed_mean_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-mean",
      .hash = 0,
-     .value = RRDR_GROUPING_TRIMMED_MEAN5,
+     .value = RRDR_GROUPING_TRIMMED_MEAN,
+     .add_flush = RRDR_GROUPING_TRIMMED_MEAN,
      .init = NULL,
-     .create= grouping_create_trimmed_mean5,
-     .reset = grouping_reset_trimmed_mean,
-     .free = grouping_free_trimmed_mean,
-     .add = grouping_add_trimmed_mean,
-     .flush = grouping_flush_trimmed_mean,
+     .create= tg_trimmed_mean_create_5,
+     .reset = tg_trimmed_mean_reset,
+     .free = tg_trimmed_mean_free,
+     .add = tg_trimmed_mean_add,
+     .flush = tg_trimmed_mean_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "incremental_sum",
      .hash = 0,
      .value = RRDR_GROUPING_INCREMENTAL_SUM,
+     .add_flush = RRDR_GROUPING_INCREMENTAL_SUM,
      .init = NULL,
-     .create= grouping_create_incremental_sum,
-     .reset = grouping_reset_incremental_sum,
-     .free = grouping_free_incremental_sum,
-     .add = grouping_add_incremental_sum,
-     .flush = grouping_flush_incremental_sum,
+     .create= tg_incremental_sum_create,
+     .reset = tg_incremental_sum_reset,
+     .free = tg_incremental_sum_free,
+     .add = tg_incremental_sum_add,
+     .flush = tg_incremental_sum_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "incremental-sum",
      .hash = 0,
      .value = RRDR_GROUPING_INCREMENTAL_SUM,
+     .add_flush = RRDR_GROUPING_INCREMENTAL_SUM,
      .init = NULL,
-     .create= grouping_create_incremental_sum,
-     .reset = grouping_reset_incremental_sum,
-     .free = grouping_free_incremental_sum,
-     .add = grouping_add_incremental_sum,
-     .flush = grouping_flush_incremental_sum,
+     .create= tg_incremental_sum_create,
+     .reset = tg_incremental_sum_reset,
+     .free = tg_incremental_sum_free,
+     .add = tg_incremental_sum_add,
+     .flush = tg_incremental_sum_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "median",
      .hash = 0,
      .value = RRDR_GROUPING_MEDIAN,
+     .add_flush = RRDR_GROUPING_MEDIAN,
      .init = NULL,
-     .create= grouping_create_median,
-     .reset = grouping_reset_median,
-     .free = grouping_free_median,
-     .add = grouping_add_median,
-     .flush = grouping_flush_median,
+     .create= tg_median_create,
+     .reset = tg_median_reset,
+     .free = tg_median_free,
+     .add = tg_median_add,
+     .flush = tg_median_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-median1",
      .hash = 0,
      .value = RRDR_GROUPING_TRIMMED_MEDIAN1,
+     .add_flush = RRDR_GROUPING_MEDIAN,
      .init = NULL,
-     .create= grouping_create_trimmed_median1,
-     .reset = grouping_reset_median,
-     .free = grouping_free_median,
-     .add = grouping_add_median,
-     .flush = grouping_flush_median,
+     .create= tg_median_create_trimmed_1,
+     .reset = tg_median_reset,
+     .free = tg_median_free,
+     .add = tg_median_add,
+     .flush = tg_median_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-median2",
      .hash = 0,
      .value = RRDR_GROUPING_TRIMMED_MEDIAN2,
+     .add_flush = RRDR_GROUPING_MEDIAN,
      .init = NULL,
-     .create= grouping_create_trimmed_median2,
-     .reset = grouping_reset_median,
-     .free = grouping_free_median,
-     .add = grouping_add_median,
-     .flush = grouping_flush_median,
+     .create= tg_median_create_trimmed_2,
+     .reset = tg_median_reset,
+     .free = tg_median_free,
+     .add = tg_median_add,
+     .flush = tg_median_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-median3",
      .hash = 0,
      .value = RRDR_GROUPING_TRIMMED_MEDIAN3,
+     .add_flush = RRDR_GROUPING_MEDIAN,
      .init = NULL,
-     .create= grouping_create_trimmed_median3,
-     .reset = grouping_reset_median,
-     .free = grouping_free_median,
-     .add = grouping_add_median,
-     .flush = grouping_flush_median,
+     .create= tg_median_create_trimmed_3,
+     .reset = tg_median_reset,
+     .free = tg_median_free,
+     .add = tg_median_add,
+     .flush = tg_median_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-median5",
      .hash = 0,
      .value = RRDR_GROUPING_TRIMMED_MEDIAN5,
+     .add_flush = RRDR_GROUPING_MEDIAN,
      .init = NULL,
-     .create= grouping_create_trimmed_median5,
-     .reset = grouping_reset_median,
-     .free = grouping_free_median,
-     .add = grouping_add_median,
-     .flush = grouping_flush_median,
+     .create= tg_median_create_trimmed_5,
+     .reset = tg_median_reset,
+     .free = tg_median_free,
+     .add = tg_median_add,
+     .flush = tg_median_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-median10",
      .hash = 0,
      .value = RRDR_GROUPING_TRIMMED_MEDIAN10,
+     .add_flush = RRDR_GROUPING_MEDIAN,
      .init = NULL,
-     .create= grouping_create_trimmed_median10,
-     .reset = grouping_reset_median,
-     .free = grouping_free_median,
-     .add = grouping_add_median,
-     .flush = grouping_flush_median,
+     .create= tg_median_create_trimmed_10,
+     .reset = tg_median_reset,
+     .free = tg_median_free,
+     .add = tg_median_add,
+     .flush = tg_median_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-median15",
      .hash = 0,
      .value = RRDR_GROUPING_TRIMMED_MEDIAN15,
+     .add_flush = RRDR_GROUPING_MEDIAN,
      .init = NULL,
-     .create= grouping_create_trimmed_median15,
-     .reset = grouping_reset_median,
-     .free = grouping_free_median,
-     .add = grouping_add_median,
-     .flush = grouping_flush_median,
+     .create= tg_median_create_trimmed_15,
+     .reset = tg_median_reset,
+     .free = tg_median_free,
+     .add = tg_median_add,
+     .flush = tg_median_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-median20",
      .hash = 0,
      .value = RRDR_GROUPING_TRIMMED_MEDIAN20,
+     .add_flush = RRDR_GROUPING_MEDIAN,
      .init = NULL,
-     .create= grouping_create_trimmed_median20,
-     .reset = grouping_reset_median,
-     .free = grouping_free_median,
-     .add = grouping_add_median,
-     .flush = grouping_flush_median,
+     .create= tg_median_create_trimmed_20,
+     .reset = tg_median_reset,
+     .free = tg_median_free,
+     .add = tg_median_add,
+     .flush = tg_median_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-median25",
      .hash = 0,
      .value = RRDR_GROUPING_TRIMMED_MEDIAN25,
+     .add_flush = RRDR_GROUPING_MEDIAN,
      .init = NULL,
-     .create= grouping_create_trimmed_median25,
-     .reset = grouping_reset_median,
-     .free = grouping_free_median,
-     .add = grouping_add_median,
-     .flush = grouping_flush_median,
+     .create= tg_median_create_trimmed_25,
+     .reset = tg_median_reset,
+     .free = tg_median_free,
+     .add = tg_median_add,
+     .flush = tg_median_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "trimmed-median",
      .hash = 0,
      .value = RRDR_GROUPING_TRIMMED_MEDIAN5,
+     .add_flush = RRDR_GROUPING_MEDIAN,
      .init = NULL,
-     .create= grouping_create_trimmed_median5,
-     .reset = grouping_reset_median,
-     .free = grouping_free_median,
-     .add = grouping_add_median,
-     .flush = grouping_flush_median,
+     .create= tg_median_create_trimmed_5,
+     .reset = tg_median_reset,
+     .free = tg_median_free,
+     .add = tg_median_add,
+     .flush = tg_median_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "percentile25",
      .hash = 0,
      .value = RRDR_GROUPING_PERCENTILE25,
+     .add_flush = RRDR_GROUPING_PERCENTILE,
      .init = NULL,
-     .create= grouping_create_percentile25,
-     .reset = grouping_reset_percentile,
-     .free = grouping_free_percentile,
-     .add = grouping_add_percentile,
-     .flush = grouping_flush_percentile,
+     .create= tg_percentile_create_25,
+     .reset = tg_percentile_reset,
+     .free = tg_percentile_free,
+     .add = tg_percentile_add,
+     .flush = tg_percentile_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "percentile50",
      .hash = 0,
      .value = RRDR_GROUPING_PERCENTILE50,
+     .add_flush = RRDR_GROUPING_PERCENTILE,
      .init = NULL,
-     .create= grouping_create_percentile50,
-     .reset = grouping_reset_percentile,
-     .free = grouping_free_percentile,
-     .add = grouping_add_percentile,
-     .flush = grouping_flush_percentile,
+     .create= tg_percentile_create_50,
+     .reset = tg_percentile_reset,
+     .free = tg_percentile_free,
+     .add = tg_percentile_add,
+     .flush = tg_percentile_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "percentile75",
      .hash = 0,
      .value = RRDR_GROUPING_PERCENTILE75,
+     .add_flush = RRDR_GROUPING_PERCENTILE,
      .init = NULL,
-     .create= grouping_create_percentile75,
-     .reset = grouping_reset_percentile,
-     .free = grouping_free_percentile,
-     .add = grouping_add_percentile,
-     .flush = grouping_flush_percentile,
+     .create= tg_percentile_create_75,
+     .reset = tg_percentile_reset,
+     .free = tg_percentile_free,
+     .add = tg_percentile_add,
+     .flush = tg_percentile_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "percentile80",
      .hash = 0,
      .value = RRDR_GROUPING_PERCENTILE80,
+     .add_flush = RRDR_GROUPING_PERCENTILE,
      .init = NULL,
-     .create= grouping_create_percentile80,
-     .reset = grouping_reset_percentile,
-     .free = grouping_free_percentile,
-     .add = grouping_add_percentile,
-     .flush = grouping_flush_percentile,
+     .create= tg_percentile_create_80,
+     .reset = tg_percentile_reset,
+     .free = tg_percentile_free,
+     .add = tg_percentile_add,
+     .flush = tg_percentile_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "percentile90",
      .hash = 0,
      .value = RRDR_GROUPING_PERCENTILE90,
+     .add_flush = RRDR_GROUPING_PERCENTILE,
      .init = NULL,
-     .create= grouping_create_percentile90,
-     .reset = grouping_reset_percentile,
-     .free = grouping_free_percentile,
-     .add = grouping_add_percentile,
-     .flush = grouping_flush_percentile,
+     .create= tg_percentile_create_90,
+     .reset = tg_percentile_reset,
+     .free = tg_percentile_free,
+     .add = tg_percentile_add,
+     .flush = tg_percentile_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "percentile95",
      .hash = 0,
-     .value = RRDR_GROUPING_PERCENTILE95,
+     .value = RRDR_GROUPING_PERCENTILE,
+     .add_flush = RRDR_GROUPING_PERCENTILE,
      .init = NULL,
-     .create= grouping_create_percentile95,
-     .reset = grouping_reset_percentile,
-     .free = grouping_free_percentile,
-     .add = grouping_add_percentile,
-     .flush = grouping_flush_percentile,
+     .create= tg_percentile_create_95,
+     .reset = tg_percentile_reset,
+     .free = tg_percentile_free,
+     .add = tg_percentile_add,
+     .flush = tg_percentile_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "percentile97",
      .hash = 0,
      .value = RRDR_GROUPING_PERCENTILE97,
+     .add_flush = RRDR_GROUPING_PERCENTILE,
      .init = NULL,
-     .create= grouping_create_percentile97,
-     .reset = grouping_reset_percentile,
-     .free = grouping_free_percentile,
-     .add = grouping_add_percentile,
-     .flush = grouping_flush_percentile,
+     .create= tg_percentile_create_97,
+     .reset = tg_percentile_reset,
+     .free = tg_percentile_free,
+     .add = tg_percentile_add,
+     .flush = tg_percentile_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "percentile98",
      .hash = 0,
      .value = RRDR_GROUPING_PERCENTILE98,
+     .add_flush = RRDR_GROUPING_PERCENTILE,
      .init = NULL,
-     .create= grouping_create_percentile98,
-     .reset = grouping_reset_percentile,
-     .free = grouping_free_percentile,
-     .add = grouping_add_percentile,
-     .flush = grouping_flush_percentile,
+     .create= tg_percentile_create_98,
+     .reset = tg_percentile_reset,
+     .free = tg_percentile_free,
+     .add = tg_percentile_add,
+     .flush = tg_percentile_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "percentile99",
      .hash = 0,
      .value = RRDR_GROUPING_PERCENTILE99,
+     .add_flush = RRDR_GROUPING_PERCENTILE,
      .init = NULL,
-     .create= grouping_create_percentile99,
-     .reset = grouping_reset_percentile,
-     .free = grouping_free_percentile,
-     .add = grouping_add_percentile,
-     .flush = grouping_flush_percentile,
+     .create= tg_percentile_create_99,
+     .reset = tg_percentile_reset,
+     .free = tg_percentile_free,
+     .add = tg_percentile_add,
+     .flush = tg_percentile_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "percentile",
      .hash = 0,
-     .value = RRDR_GROUPING_PERCENTILE95,
+     .value = RRDR_GROUPING_PERCENTILE,
+     .add_flush = RRDR_GROUPING_PERCENTILE,
      .init = NULL,
-     .create= grouping_create_percentile95,
-     .reset = grouping_reset_percentile,
-     .free = grouping_free_percentile,
-     .add = grouping_add_percentile,
-     .flush = grouping_flush_percentile,
+     .create= tg_percentile_create_95,
+     .reset = tg_percentile_reset,
+     .free = tg_percentile_free,
+     .add = tg_percentile_add,
+     .flush = tg_percentile_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "min",
      .hash = 0,
      .value = RRDR_GROUPING_MIN,
+     .add_flush = RRDR_GROUPING_MIN,
      .init = NULL,
-     .create= grouping_create_min,
-     .reset = grouping_reset_min,
-     .free = grouping_free_min,
-     .add = grouping_add_min,
-     .flush = grouping_flush_min,
+     .create= tg_min_create,
+     .reset = tg_min_reset,
+     .free = tg_min_free,
+     .add = tg_min_add,
+     .flush = tg_min_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_MIN
     },
     {.name = "max",
      .hash = 0,
      .value = RRDR_GROUPING_MAX,
+     .add_flush = RRDR_GROUPING_MAX,
      .init = NULL,
-     .create= grouping_create_max,
-     .reset = grouping_reset_max,
-     .free = grouping_free_max,
-     .add = grouping_add_max,
-     .flush = grouping_flush_max,
+     .create= tg_max_create,
+     .reset = tg_max_reset,
+     .free = tg_max_free,
+     .add = tg_max_add,
+     .flush = tg_max_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_MAX
     },
     {.name = "sum",
      .hash = 0,
      .value = RRDR_GROUPING_SUM,
+     .add_flush = RRDR_GROUPING_SUM,
      .init = NULL,
-     .create= grouping_create_sum,
-     .reset = grouping_reset_sum,
-     .free = grouping_free_sum,
-     .add = grouping_add_sum,
-     .flush = grouping_flush_sum,
+     .create= tg_sum_create,
+     .reset = tg_sum_reset,
+     .free = tg_sum_free,
+     .add = tg_sum_add,
+     .flush = tg_sum_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_SUM
     },
@@ -457,97 +506,75 @@ static struct {
     {.name = "stddev",
      .hash = 0,
      .value = RRDR_GROUPING_STDDEV,
+     .add_flush = RRDR_GROUPING_STDDEV,
      .init = NULL,
-     .create= grouping_create_stddev,
-     .reset = grouping_reset_stddev,
-     .free = grouping_free_stddev,
-     .add = grouping_add_stddev,
-     .flush = grouping_flush_stddev,
+     .create= tg_stddev_create,
+     .reset = tg_stddev_reset,
+     .free = tg_stddev_free,
+     .add = tg_stddev_add,
+     .flush = tg_stddev_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "cv", // coefficient variation is calculated by stddev
      .hash = 0,
      .value = RRDR_GROUPING_CV,
+     .add_flush = RRDR_GROUPING_CV,
      .init = NULL,
-     .create= grouping_create_stddev, // not an error, stddev calculates this too
-     .reset = grouping_reset_stddev, // not an error, stddev calculates this too
-     .free = grouping_free_stddev, // not an error, stddev calculates this too
-     .add = grouping_add_stddev, // not an error, stddev calculates this too
-     .flush = grouping_flush_coefficient_of_variation,
+     .create= tg_stddev_create, // not an error, stddev calculates this too
+     .reset = tg_stddev_reset, // not an error, stddev calculates this too
+     .free = tg_stddev_free, // not an error, stddev calculates this too
+     .add = tg_stddev_add, // not an error, stddev calculates this too
+     .flush = tg_stddev_coefficient_of_variation_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "rsd", // alias of 'cv'
      .hash = 0,
      .value = RRDR_GROUPING_CV,
+     .add_flush = RRDR_GROUPING_CV,
      .init = NULL,
-     .create= grouping_create_stddev, // not an error, stddev calculates this too
-     .reset = grouping_reset_stddev, // not an error, stddev calculates this too
-     .free = grouping_free_stddev, // not an error, stddev calculates this too
-     .add = grouping_add_stddev, // not an error, stddev calculates this too
-     .flush = grouping_flush_coefficient_of_variation,
-     .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
-    },
-
-    /*
-    {.name = "mean", // same as average, no need to define it again
-     .hash = 0,
-     .value = RRDR_GROUPING_MEAN,
-     .setup = NULL,
-     .create= grouping_create_stddev,
-     .reset = grouping_reset_stddev,
-     .free = grouping_free_stddev,
-     .add = grouping_add_stddev,
-     .flush = grouping_flush_mean,
+     .create= tg_stddev_create, // not an error, stddev calculates this too
+     .reset = tg_stddev_reset, // not an error, stddev calculates this too
+     .free = tg_stddev_free, // not an error, stddev calculates this too
+     .add = tg_stddev_add, // not an error, stddev calculates this too
+     .flush = tg_stddev_coefficient_of_variation_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
-    */
-
-    /*
-    {.name = "variance", // meaningless to offer
-     .hash = 0,
-     .value = RRDR_GROUPING_VARIANCE,
-     .setup = NULL,
-     .create= grouping_create_stddev,
-     .reset = grouping_reset_stddev,
-     .free = grouping_free_stddev,
-     .add = grouping_add_stddev,
-     .flush = grouping_flush_variance,
-     .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
-    },
-    */

     // single exponential smoothing
     {.name = "ses",
      .hash = 0,
      .value = RRDR_GROUPING_SES,
-     .init = grouping_init_ses,
-     .create= grouping_create_ses,
-     .reset = grouping_reset_ses,
-     .free = grouping_free_ses,
-     .add = grouping_add_ses,
-     .flush = grouping_flush_ses,
+     .add_flush = RRDR_GROUPING_SES,
+     .init = tg_ses_init,
+     .create= tg_ses_create,
+     .reset = tg_ses_reset,
+     .free = tg_ses_free,
+     .add = tg_ses_add,
+     .flush = tg_ses_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "ema", // alias for 'ses'
      .hash = 0,
      .value = RRDR_GROUPING_SES,
+     .add_flush = RRDR_GROUPING_SES,
      .init = NULL,
-     .create= grouping_create_ses,
-     .reset = grouping_reset_ses,
-     .free = grouping_free_ses,
-     .add = grouping_add_ses,
-     .flush = grouping_flush_ses,
+     .create= tg_ses_create,
+     .reset = tg_ses_reset,
+     .free = tg_ses_free,
+     .add = tg_ses_add,
+     .flush = tg_ses_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "ewma", // alias for ses
      .hash = 0,
      .value = RRDR_GROUPING_SES,
+     .add_flush = RRDR_GROUPING_SES,
      .init = NULL,
-     .create= grouping_create_ses,
-     .reset = grouping_reset_ses,
-     .free = grouping_free_ses,
-     .add = grouping_add_ses,
-     .flush = grouping_flush_ses,
+     .create= tg_ses_create,
+     .reset = tg_ses_reset,
+     .free = tg_ses_free,
+     .add = tg_ses_add,
+     .flush = tg_ses_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
@@ -555,24 +582,26 @@ static struct {
     {.name = "des",
      .hash = 0,
      .value = RRDR_GROUPING_DES,
-     .init = grouping_init_des,
-     .create= grouping_create_des,
-     .reset = grouping_reset_des,
-     .free = grouping_free_des,
-     .add = grouping_add_des,
-     .flush = grouping_flush_des,
+     .add_flush = RRDR_GROUPING_DES,
+     .init = tg_des_init,
+     .create= tg_des_create,
+     .reset = tg_des_reset,
+     .free = tg_des_free,
+     .add = tg_des_add,
+     .flush = tg_des_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
     {.name = "countif",
      .hash = 0,
      .value = RRDR_GROUPING_COUNTIF,
+     .add_flush = RRDR_GROUPING_COUNTIF,
      .init = NULL,
-     .create= grouping_create_countif,
-     .reset = grouping_reset_countif,
-     .free = grouping_free_countif,
-     .add = grouping_add_countif,
-     .flush = grouping_flush_countif,
+     .create= tg_countif_create,
+     .reset = tg_countif_reset,
+     .free = tg_countif_free,
+     .add = tg_countif_add,
+     .flush = tg_countif_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     },
@@ -580,17 +609,18 @@ static struct {
     {.name = NULL,
      .hash = 0,
      .value = RRDR_GROUPING_UNDEFINED,
+     .add_flush = RRDR_GROUPING_AVERAGE,
      .init = NULL,
-     .create= grouping_create_average,
-     .reset = grouping_reset_average,
-     .free = grouping_free_average,
-     .add = grouping_add_average,
-     .flush = grouping_flush_average,
+     .create= tg_average_create,
+     .reset = tg_average_reset,
+     .free = tg_average_free,
+     .add = tg_average_add,
+     .flush = tg_average_flush,
      .tier_query_fetch = TIER_QUERY_FETCH_AVERAGE
     }
 };

-void web_client_api_v1_init_grouping(void) {
+void time_grouping_init(void) {
     int i;

     for(i = 0; api_v1_data_groups[i].name ; i++) {
@@ -601,7 +631,7 @@ void web_client_api_v1_init_grouping(void) {
     }
 }

-const char *group_method2string(RRDR_GROUPING group) {
+const char *time_grouping_method2string(RRDR_TIME_GROUPING group) {
     int i;

     for(i = 0; api_v1_data_groups[i].name ; i++) {
@@ -613,7 +643,7 @@ const char *group_method2string(RRDR_GROUPING group) {
     return "unknown-group-method";
 }

-RRDR_GROUPING web_client_api_request_v1_data_group(const char *name, RRDR_GROUPING def) {
+RRDR_TIME_GROUPING time_grouping_parse(const char *name, RRDR_TIME_GROUPING def) {
     int i;

     uint32_t hash = simple_hash(name);
@@ -624,7 +654,7 @@ RRDR_GROUPING web_client_api_request_v1_data_group(const char *name, RRDR_GROUPI
     return def;
 }

-const char *web_client_api_request_v1_data_group_to_string(RRDR_GROUPING group) {
+const char *time_grouping_tostring(RRDR_TIME_GROUPING group) {
     int i;

     for(i = 0; api_v1_data_groups[i].name ; i++)
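The renamed parse function walks the table above comparing a pre-computed hash before falling back to strcmp. A self-contained sketch of that lookup pattern, under the assumption that a djb2-style hash stands in for netdata's simple_hash() and the table is trimmed to three demo methods:

    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    typedef enum { TG_AVERAGE, TG_MIN, TG_MAX } TIME_GROUPING;

    static uint32_t demo_hash(const char *s) {       // stand-in for simple_hash()
        uint32_t h = 5381;
        while (*s) h = ((h << 5) + h) + (uint8_t)*s++;
        return h;
    }

    static struct { const char *name; uint32_t hash; TIME_GROUPING value; } groups[] = {
        {"average", 0, TG_AVERAGE},
        {"min",     0, TG_MIN},
        {"max",     0, TG_MAX},
        {NULL,      0, TG_AVERAGE},                  // sentinel, like the NULL entry above
    };

    static void groups_init(void) {                  // like time_grouping_init()
        for (int i = 0; groups[i].name; i++)
            groups[i].hash = demo_hash(groups[i].name);
    }

    static TIME_GROUPING groups_parse(const char *name, TIME_GROUPING def) {
        uint32_t hash = demo_hash(name);
        for (int i = 0; groups[i].name; i++)
            if (hash == groups[i].hash && !strcmp(name, groups[i].name))
                return groups[i].value;
        return def;                                  // unknown names fall back to the default
    }

    int main(void) {
        groups_init();
        printf("%d %d\n", groups_parse("max", TG_AVERAGE), groups_parse("p99", TG_AVERAGE));
        return 0;
    }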
Using 'average'", (unsigned int)group_method); - r->internal.grouping_create = grouping_create_average; - r->internal.grouping_reset = grouping_reset_average; - r->internal.grouping_free = grouping_free_average; - r->internal.grouping_add = grouping_add_average; - r->internal.grouping_flush = grouping_flush_average; - r->internal.tier_query_fetch = TIER_QUERY_FETCH_AVERAGE; + r->time_grouping.create = tg_average_create; + r->time_grouping.reset = tg_average_reset; + r->time_grouping.free = tg_average_free; + r->time_grouping.add = tg_average_add; + r->time_grouping.flush = tg_average_flush; + r->time_grouping.tier_query_fetch = TIER_QUERY_FETCH_AVERAGE; + r->time_grouping.add_flush = RRDR_GROUPING_AVERAGE; + } +} + +static inline void time_grouping_add(RRDR *r, NETDATA_DOUBLE value, const RRDR_TIME_GROUPING add_flush) { + switch(add_flush) { + case RRDR_GROUPING_AVERAGE: + tg_average_add(r, value); + break; + + case RRDR_GROUPING_MAX: + tg_max_add(r, value); + break; + + case RRDR_GROUPING_MIN: + tg_min_add(r, value); + break; + + case RRDR_GROUPING_MEDIAN: + tg_median_add(r, value); + break; + + case RRDR_GROUPING_STDDEV: + case RRDR_GROUPING_CV: + tg_stddev_add(r, value); + break; + + case RRDR_GROUPING_SUM: + tg_sum_add(r, value); + break; + + case RRDR_GROUPING_COUNTIF: + tg_countif_add(r, value); + break; + + case RRDR_GROUPING_TRIMMED_MEAN: + tg_trimmed_mean_add(r, value); + break; + + case RRDR_GROUPING_PERCENTILE: + tg_percentile_add(r, value); + break; + + case RRDR_GROUPING_SES: + tg_ses_add(r, value); + break; + + case RRDR_GROUPING_DES: + tg_des_add(r, value); + break; + + case RRDR_GROUPING_INCREMENTAL_SUM: + tg_incremental_sum_add(r, value); + break; + + default: + r->time_grouping.add(r, value); + break; + } +} + +static inline NETDATA_DOUBLE time_grouping_flush(RRDR *r, RRDR_VALUE_FLAGS *rrdr_value_options_ptr, const RRDR_TIME_GROUPING add_flush) { + switch(add_flush) { + case RRDR_GROUPING_AVERAGE: + return tg_average_flush(r, rrdr_value_options_ptr); + + case RRDR_GROUPING_MAX: + return tg_max_flush(r, rrdr_value_options_ptr); + + case RRDR_GROUPING_MIN: + return tg_min_flush(r, rrdr_value_options_ptr); + + case RRDR_GROUPING_MEDIAN: + return tg_median_flush(r, rrdr_value_options_ptr); + + case RRDR_GROUPING_STDDEV: + return tg_stddev_flush(r, rrdr_value_options_ptr); + + case RRDR_GROUPING_CV: + return tg_stddev_coefficient_of_variation_flush(r, rrdr_value_options_ptr); + + case RRDR_GROUPING_SUM: + return tg_sum_flush(r, rrdr_value_options_ptr); + + case RRDR_GROUPING_COUNTIF: + return tg_countif_flush(r, rrdr_value_options_ptr); + + case RRDR_GROUPING_TRIMMED_MEAN: + return tg_trimmed_mean_flush(r, rrdr_value_options_ptr); + + case RRDR_GROUPING_PERCENTILE: + return tg_percentile_flush(r, rrdr_value_options_ptr); + + case RRDR_GROUPING_SES: + return tg_ses_flush(r, rrdr_value_options_ptr); + + case RRDR_GROUPING_DES: + return tg_des_flush(r, rrdr_value_options_ptr); + + case RRDR_GROUPING_INCREMENTAL_SUM: + return tg_incremental_sum_flush(r, rrdr_value_options_ptr); + + default: + return r->time_grouping.flush(r, rrdr_value_options_ptr); + } +} + +RRDR_GROUP_BY group_by_parse(char *s) { + RRDR_GROUP_BY group_by = RRDR_GROUP_BY_NONE; + + while(s) { + char *key = strsep_skip_consecutive_separators(&s, ",| "); + if (!key || !*key) continue; + + if (strcmp(key, "selected") == 0) + group_by |= RRDR_GROUP_BY_SELECTED; + + if (strcmp(key, "dimension") == 0) + group_by |= RRDR_GROUP_BY_DIMENSION; + + if (strcmp(key, "instance") == 0) + group_by |= RRDR_GROUP_BY_INSTANCE; + + 
if (strcmp(key, "percentage-of-instance") == 0) + group_by |= RRDR_GROUP_BY_PERCENTAGE_OF_INSTANCE; + + if (strcmp(key, "label") == 0) + group_by |= RRDR_GROUP_BY_LABEL; + + if (strcmp(key, "node") == 0) + group_by |= RRDR_GROUP_BY_NODE; + + if (strcmp(key, "context") == 0) + group_by |= RRDR_GROUP_BY_CONTEXT; + + if (strcmp(key, "units") == 0) + group_by |= RRDR_GROUP_BY_UNITS; + } + + if((group_by & RRDR_GROUP_BY_SELECTED) && (group_by & ~RRDR_GROUP_BY_SELECTED)) { + internal_error(true, "group-by given by query has 'selected' together with more groupings"); + group_by = RRDR_GROUP_BY_SELECTED; // remove all other groupings + } + + if(group_by & RRDR_GROUP_BY_PERCENTAGE_OF_INSTANCE) + group_by = RRDR_GROUP_BY_PERCENTAGE_OF_INSTANCE; // remove all other groupings + + return group_by; +} + +void buffer_json_group_by_to_array(BUFFER *wb, RRDR_GROUP_BY group_by) { + if(group_by == RRDR_GROUP_BY_NONE) + buffer_json_add_array_item_string(wb, "none"); + else { + if (group_by & RRDR_GROUP_BY_DIMENSION) + buffer_json_add_array_item_string(wb, "dimension"); + + if (group_by & RRDR_GROUP_BY_INSTANCE) + buffer_json_add_array_item_string(wb, "instance"); + + if (group_by & RRDR_GROUP_BY_PERCENTAGE_OF_INSTANCE) + buffer_json_add_array_item_string(wb, "percentage-of-instance"); + + if (group_by & RRDR_GROUP_BY_LABEL) + buffer_json_add_array_item_string(wb, "label"); + + if (group_by & RRDR_GROUP_BY_NODE) + buffer_json_add_array_item_string(wb, "node"); + + if (group_by & RRDR_GROUP_BY_CONTEXT) + buffer_json_add_array_item_string(wb, "context"); + + if (group_by & RRDR_GROUP_BY_UNITS) + buffer_json_add_array_item_string(wb, "units"); + + if (group_by & RRDR_GROUP_BY_SELECTED) + buffer_json_add_array_item_string(wb, "selected"); + } +} + +RRDR_GROUP_BY_FUNCTION group_by_aggregate_function_parse(const char *s) { + if(strcmp(s, "average") == 0) + return RRDR_GROUP_BY_FUNCTION_AVERAGE; + + if(strcmp(s, "avg") == 0) + return RRDR_GROUP_BY_FUNCTION_AVERAGE; + + if(strcmp(s, "min") == 0) + return RRDR_GROUP_BY_FUNCTION_MIN; + + if(strcmp(s, "max") == 0) + return RRDR_GROUP_BY_FUNCTION_MAX; + + if(strcmp(s, "sum") == 0) + return RRDR_GROUP_BY_FUNCTION_SUM; + + return RRDR_GROUP_BY_FUNCTION_AVERAGE; +} + +const char *group_by_aggregate_function_to_string(RRDR_GROUP_BY_FUNCTION group_by_function) { + switch(group_by_function) { + default: + case RRDR_GROUP_BY_FUNCTION_AVERAGE: + return "average"; + + case RRDR_GROUP_BY_FUNCTION_MIN: + return "min"; + + case RRDR_GROUP_BY_FUNCTION_MAX: + return "max"; + + case RRDR_GROUP_BY_FUNCTION_SUM: + return "sum"; } } @@ -670,28 +914,20 @@ static inline NETDATA_DOUBLE *UNUSED_FUNCTION(rrdr_line_values)(RRDR *r, long rr return &r->v[ rrdr_line * r->d ]; } -static inline long rrdr_line_init(RRDR *r, time_t t, long rrdr_line) { +static inline long rrdr_line_init(RRDR *r __maybe_unused, time_t t __maybe_unused, long rrdr_line) { rrdr_line++; - internal_error(rrdr_line >= (long)r->n, + internal_fatal(rrdr_line >= (long)r->n, "QUERY: requested to step above RRDR size for query '%s'", r->internal.qt->id); - internal_error(r->t[rrdr_line] != 0 && r->t[rrdr_line] != t, - "QUERY: overwriting the timestamp of RRDR line %zu from %zu to %zu, of query '%s'", - (size_t)rrdr_line, (size_t)r->t[rrdr_line], (size_t)t, r->internal.qt->id); - - // save the time - r->t[rrdr_line] = t; + internal_fatal(r->t[rrdr_line] != t, + "QUERY: wrong timestamp at RRDR line %ld, expected %ld, got %ld, of query '%s'", + rrdr_line, r->t[rrdr_line], t, r->internal.qt->id); return rrdr_line; } -static inline 
@@ -670,28 +914,20 @@ static inline NETDATA_DOUBLE *UNUSED_FUNCTION(rrdr_line_values)(RRDR *r, long rr
     return &r->v[ rrdr_line * r->d ];
 }

-static inline long rrdr_line_init(RRDR *r, time_t t, long rrdr_line) {
+static inline long rrdr_line_init(RRDR *r __maybe_unused, time_t t __maybe_unused, long rrdr_line) {
     rrdr_line++;

-    internal_error(rrdr_line >= (long)r->n,
+    internal_fatal(rrdr_line >= (long)r->n,
                    "QUERY: requested to step above RRDR size for query '%s'",
                    r->internal.qt->id);

-    internal_error(r->t[rrdr_line] != 0 && r->t[rrdr_line] != t,
-                   "QUERY: overwriting the timestamp of RRDR line %zu from %zu to %zu, of query '%s'",
-                   (size_t)rrdr_line, (size_t)r->t[rrdr_line], (size_t)t, r->internal.qt->id);
-
-    // save the time
-    r->t[rrdr_line] = t;
+    internal_fatal(r->t[rrdr_line] != t,
+                   "QUERY: wrong timestamp at RRDR line %ld, expected %ld, got %ld, of query '%s'",
+                   rrdr_line, r->t[rrdr_line], t, r->internal.qt->id);

     return rrdr_line;
 }

-static inline void rrdr_done(RRDR *r, long rrdr_line) {
-    r->rows = rrdr_line + 1;
-}
-
-
 // ----------------------------------------------------------------------------
 // tier management

@@ -822,7 +1058,7 @@ static size_t rrddim_find_best_tier_for_timeframe(QUERY_TARGET *qt, time_t after
         // find the db time-range for this tier for all metrics
         for(size_t i = 0, used = qt->query.used; i < used ; i++) {
-            QUERY_METRIC *qm = &qt->query.array[i];
+            QUERY_METRIC *qm = query_metric(qt, i);

             time_t first_time_s = qm->tiers[tier].db_first_time_s;
             time_t last_time_s  = qm->tiers[tier].db_last_time_s;
@@ -872,7 +1108,7 @@ static time_t rrdset_find_natural_update_every_for_timeframe(QUERY_TARGET *qt, t
     // find the db minimum update every for this tier for all metrics
     time_t common_update_every_s = default_rrd_update_every;
     for(size_t i = 0, used = qt->query.used; i < used ; i++) {
-        QUERY_METRIC *qm = &qt->query.array[i];
+        QUERY_METRIC *qm = query_metric(qt, i);

        time_t update_every_s = qm->tiers[best_tier].db_update_every_s;
@@ -889,24 +1125,20 @@ static time_t rrdset_find_natural_update_every_for_timeframe(QUERY_TARGET *qt, t
 // query ops

 typedef struct query_point {
-    time_t end_time;
-    time_t start_time;
+    STORAGE_POINT sp;
     NETDATA_DOUBLE value;
-    NETDATA_DOUBLE anomaly;
-    SN_FLAGS flags;
+    bool added;
 #ifdef NETDATA_INTERNAL_CHECKS
     size_t id;
 #endif
 } QUERY_POINT;

 QUERY_POINT QUERY_POINT_EMPTY = {
-    .end_time = 0,
-    .start_time = 0,
-    .value = NAN,
-    .anomaly = 0,
-    .flags = SN_FLAG_NONE,
+    .sp = STORAGE_POINT_UNSET,
+    .value = NAN,
+    .added = false,
 #ifdef NETDATA_INTERNAL_CHECKS
-    .id = 0,
+    .id = 0,
 #endif
 };

@@ -934,21 +1166,27 @@ typedef struct query_engine_ops {
     size_t tier;
     struct query_metric_tier *tier_ptr;
     struct storage_engine_query_handle *handle;
-    STORAGE_POINT (*next_metric)(struct storage_engine_query_handle *handle);
-    int (*is_finished)(struct storage_engine_query_handle *handle);
-    void (*finalize)(struct storage_engine_query_handle *handle);

     // aggregating points over time
-    void (*grouping_add)(struct rrdresult *r, NETDATA_DOUBLE value);
-    NETDATA_DOUBLE (*grouping_flush)(struct rrdresult *r, RRDR_VALUE_FLAGS *rrdr_value_options_ptr);
     size_t group_points_non_zero;
     size_t group_points_added;
-    NETDATA_DOUBLE group_anomaly_rate;
+    STORAGE_POINT group_point;      // aggregates min, max, sum, count, anomaly count for each group point
+    STORAGE_POINT query_point;      // aggregates min, max, sum, count, anomaly count across the whole query
     RRDR_VALUE_FLAGS group_value_flags;

     // statistics
     size_t db_total_points_read;
     size_t db_points_read_per_tier[RRD_STORAGE_TIERS];
+
+    struct {
+        time_t expanded_after;
+        time_t expanded_before;
+        struct storage_engine_query_handle handle;
+        bool initialized;
+        bool finalized;
+    } plans[QUERY_PLANS_MAX];
+
+    struct query_engine_ops *next;
 } QUERY_ENGINE_OPS;

@@ -1005,40 +1243,28 @@ static void query_planer_initialize_plans(QUERY_ENGINE_OPS *ops) {
         time_t after = qm->plan.array[p].after - (time_t)(update_every * points_to_add_to_after);
         time_t before = qm->plan.array[p].before + (time_t)(update_every * points_to_add_to_before);

-        qm->plan.array[p].expanded_after = after;
-        qm->plan.array[p].expanded_before = before;
+        ops->plans[p].expanded_after = after;
+        ops->plans[p].expanded_before = before;
+
+        ops->r->internal.qt->db.tiers[tier].queries++;

         struct query_metric_tier *tier_ptr = &qm->tiers[tier];
-        tier_ptr->eng->api.query_ops.init(
-            tier_ptr->db_metric_handle,
-            &qm->plan.array[p].handle,
-            after, before,
-            ops->r->internal.qt->request.priority);
-        qm->plan.array[p].next_metric = tier_ptr->eng->api.query_ops.next_metric;
-        qm->plan.array[p].is_finished = tier_ptr->eng->api.query_ops.is_finished;
-        qm->plan.array[p].finalize = tier_ptr->eng->api.query_ops.finalize;
-        qm->plan.array[p].initialized = true;
-        qm->plan.array[p].finalized = false;
+        STORAGE_ENGINE *eng = query_metric_storage_engine(ops->r->internal.qt, qm, tier);
+        storage_engine_query_init(eng->backend, tier_ptr->db_metric_handle, &ops->plans[p].handle,
+                                  after, before, ops->r->internal.qt->request.priority);
+
+        ops->plans[p].initialized = true;
+        ops->plans[p].finalized = false;
     }
 }

 static void query_planer_finalize_plan(QUERY_ENGINE_OPS *ops, size_t plan_id) {
-    QUERY_METRIC *qm = ops->qm;
-
-    if(qm->plan.array[plan_id].initialized && !qm->plan.array[plan_id].finalized) {
-        qm->plan.array[plan_id].finalize(&qm->plan.array[plan_id].handle);
-        qm->plan.array[plan_id].initialized = false;
-        qm->plan.array[plan_id].finalized = true;
-        qm->plan.array[plan_id].next_metric = NULL;
-        qm->plan.array[plan_id].is_finished = NULL;
-        qm->plan.array[plan_id].finalize = NULL;
+    // QUERY_METRIC *qm = ops->qm;

-        if(ops->current_plan == plan_id) {
-            ops->next_metric = NULL;
-            ops->is_finished = NULL;
-            ops->finalize = NULL;
-        }
+    if(ops->plans[plan_id].initialized && !ops->plans[plan_id].finalized) {
+        storage_engine_query_finalize(&ops->plans[plan_id].handle);
+        ops->plans[plan_id].initialized = false;
+        ops->plans[plan_id].finalized = true;
     }
 }

@@ -1053,17 +1279,14 @@ static void query_planer_activate_plan(QUERY_ENGINE_OPS *ops, size_t plan_id, ti
     QUERY_METRIC *qm = ops->qm;

     internal_fatal(plan_id >= qm->plan.used, "QUERY: invalid plan_id given");
-    internal_fatal(!qm->plan.array[plan_id].initialized, "QUERY: plan has not been initialized");
-    internal_fatal(qm->plan.array[plan_id].finalized, "QUERY: plan has been finalized");
+    internal_fatal(!ops->plans[plan_id].initialized, "QUERY: plan has not been initialized");
+    internal_fatal(ops->plans[plan_id].finalized, "QUERY: plan has been finalized");

     internal_fatal(qm->plan.array[plan_id].after > qm->plan.array[plan_id].before, "QUERY: flipped after/before");

     ops->tier = qm->plan.array[plan_id].tier;
     ops->tier_ptr = &qm->tiers[ops->tier];
-    ops->handle = &qm->plan.array[plan_id].handle;
-    ops->next_metric = qm->plan.array[plan_id].next_metric;
-    ops->is_finished = qm->plan.array[plan_id].is_finished;
-    ops->finalize = qm->plan.array[plan_id].finalize;
+    ops->handle = &ops->plans[plan_id].handle;
     ops->current_plan = plan_id;

     if(plan_id + 1 < qm->plan.used && qm->plan.array[plan_id + 1].after < qm->plan.array[plan_id].before)
@@ -1071,8 +1294,8 @@ static void query_planer_activate_plan(QUERY_ENGINE_OPS *ops, size_t plan_id, ti
     else
         ops->current_plan_expire_time = qm->plan.array[plan_id].before;

-    ops->plan_expanded_after = qm->plan.array[plan_id].expanded_after;
-    ops->plan_expanded_before = qm->plan.array[plan_id].expanded_before;
+    ops->plan_expanded_after = ops->plans[plan_id].expanded_after;
+    ops->plan_expanded_before = ops->plans[plan_id].expanded_before;
 }

 static bool query_planer_next_plan(QUERY_ENGINE_OPS *ops, time_t now, time_t last_point_end_time) {
@@ -1117,18 +1340,17 @@ static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before

     // put our selected tier as the first plan
     size_t selected_tier;
+    bool switch_tiers = true;

-    if(ops->r->internal.query_options & RRDR_OPTION_SELECTED_TIER
+    if((ops->r->internal.qt->window.options & RRDR_OPTION_SELECTED_TIER)
        && ops->r->internal.qt->window.tier < storage_tiers
       && query_metric_is_valid_tier(qm, ops->r->internal.qt->window.tier)) {
         selected_tier = ops->r->internal.qt->window.tier;
+        switch_tiers = false;
     }
     else {
         selected_tier = query_metric_best_tier_for_timeframe(qm, after_wanted, before_wanted, points_wanted);

-        if(ops->r->internal.query_options & RRDR_OPTION_SELECTED_TIER)
-            ops->r->internal.query_options &= ~RRDR_OPTION_SELECTED_TIER;
-
         if(!query_metric_is_valid_tier(qm, selected_tier))
             return false;
@@ -1142,7 +1364,7 @@ static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before
     qm->plan.array[0].after = (qm->tiers[selected_tier].db_first_time_s < after_wanted) ? after_wanted : qm->tiers[selected_tier].db_first_time_s;
     qm->plan.array[0].before = (qm->tiers[selected_tier].db_last_time_s > before_wanted) ? before_wanted : qm->tiers[selected_tier].db_last_time_s;

-    if(!(ops->r->internal.query_options & RRDR_OPTION_SELECTED_TIER)) {
+    if(switch_tiers) {
         // the selected tier
         time_t selected_tier_first_time_s = qm->plan.array[0].after;
         time_t selected_tier_last_time_s  = qm->plan.array[0].before;

         // check if our selected tier can start the query
         if (selected_tier_first_time_s > after_wanted) {
             // we need some help from other tiers
-            for (size_t tr = (int)selected_tier + 1; tr < storage_tiers; tr++) {
+            for (size_t tr = (int)selected_tier + 1; tr < storage_tiers && qm->plan.used < QUERY_PLANS_MAX ; tr++) {
                 if(!query_metric_is_valid_tier(qm, tr))
                     continue;
@@ -1164,9 +1386,9 @@ static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before
                     .tier = tr,
                     .after = (tier_first_time_s < after_wanted) ? after_wanted : tier_first_time_s,
                     .before = selected_tier_first_time_s,
-                    .initialized = false,
-                    .finalized = false,
                 };
+                ops->plans[qm->plan.used].initialized = false;
+                ops->plans[qm->plan.used].finalized = false;
                 qm->plan.array[qm->plan.used++] = t;

                 internal_fatal(!t.after || !t.before, "QUERY: invalid plan selected");
@@ -1183,7 +1405,7 @@ static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before
         // check if our selected tier can finish the query
         if (selected_tier_last_time_s < before_wanted) {
             // we need some help from other tiers
-            for (int tr = (int)selected_tier - 1; tr >= 0; tr--) {
+            for (int tr = (int)selected_tier - 1; tr >= 0 && qm->plan.used < QUERY_PLANS_MAX ; tr--) {
                 if(!query_metric_is_valid_tier(qm, tr))
                     continue;
@@ -1199,9 +1421,9 @@ static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before
                     .tier = tr,
                     .after = selected_tier_last_time_s,
                     .before = (tier_last_time_s > before_wanted) ? before_wanted : tier_last_time_s,
-                    .initialized = false,
-                    .finalized = false,
                 };
+                ops->plans[qm->plan.used].initialized = false;
+                ops->plans[qm->plan.used].finalized = false;
                 qm->plan.array[qm->plan.used++] = t;

                 // prepare for the tier
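query_plan() clamps the first plan to the selected tier's retention and, when switch_tiers is set, pads the head of the window from higher tiers and the tail from lower tiers, bounded by QUERY_PLANS_MAX. A toy sketch of just the clamping step, with made-up retention numbers and plain stand-in types:

    #include <stdio.h>
    #include <time.h>

    #define QUERY_PLANS_MAX 5   // mirrors the bound checked in the loops above

    struct plan { int tier; time_t after, before; };

    static time_t max_t(time_t a, time_t b) { return a > b ? a : b; }
    static time_t min_t(time_t a, time_t b) { return a < b ? a : b; }

    int main(void) {
        // assume tier 1 retains [1000, 2000] and the query wants [500, 2500]
        time_t db_first = 1000, db_last = 2000;
        time_t after_wanted = 500, before_wanted = 2500;

        struct plan plans[QUERY_PLANS_MAX];
        size_t used = 0;

        plans[used++] = (struct plan){ .tier = 1,
            .after  = max_t(db_first, after_wanted),    // 1000
            .before = min_t(db_last, before_wanted) };  // 2000

        // the head gap [500, 1000) would be filled from higher (longer
        // retention) tiers and the tail gap (2000, 2500] from lower tiers,
        // until the gaps close or QUERY_PLANS_MAX plans exist
        printf("plan 0: tier %d, %ld..%ld\n",
               plans[0].tier, (long)plans[0].after, (long)plans[0].before);
        return 0;
    }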
@@ -1244,60 +1466,102 @@ static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before

 #define query_interpolate_point(this_point, last_point, now) do { \
     if(likely( \
             /* the point to interpolate is more than 1s wide */ \
-            (this_point).end_time - (this_point).start_time > 1 \
+            (this_point).sp.end_time_s - (this_point).sp.start_time_s > 1 \
 \
             /* the two points are exactly next to each other */ \
-            && (last_point).end_time == (this_point).start_time \
+            && (last_point).sp.end_time_s == (this_point).sp.start_time_s \
 \
             /* both points are valid numbers */ \
             && netdata_double_isnumber((this_point).value) \
             && netdata_double_isnumber((last_point).value) \
 \
        )) { \
-        (this_point).value = (last_point).value + ((this_point).value - (last_point).value) * (1.0 - (NETDATA_DOUBLE)((this_point).end_time - (now)) / (NETDATA_DOUBLE)((this_point).end_time - (this_point).start_time)); \
-        (this_point).end_time = now; \
+        (this_point).value = (last_point).value + ((this_point).value - (last_point).value) * (1.0 - (NETDATA_DOUBLE)((this_point).sp.end_time_s - (now)) / (NETDATA_DOUBLE)((this_point).sp.end_time_s - (this_point).sp.start_time_s)); \
+        (this_point).sp.end_time_s = now; \
     } \
 } while(0)
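A worked example of the interpolation in the macro above: when a db point is wider than the view step, its value is linearly blended with the previous point's value at the view boundary `now` (sample numbers assumed):

    #include <stdio.h>

    int main(void) {
        double last_v = 10.0, this_v = 20.0;     // two adjacent points
        long start = 100, end = 110, now = 105;  // this point spans 100..110

        // fraction of the point's width that has elapsed at `now`
        double f = 1.0 - (double)(end - now) / (double)(end - start); // 0.5
        double v = last_v + (this_v - last_v) * f;                    // 15.0

        printf("interpolated at %ld: %g\n", now, v);
        return 0;
    }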
-#define query_add_point_to_group(r, point, ops) do { \
+#define query_add_point_to_group(r, point, ops, add_flush) do { \
     if(likely(netdata_double_isnumber((point).value))) { \
         if(likely(fpclassify((point).value) != FP_ZERO)) \
             (ops)->group_points_non_zero++; \
 \
-        if(unlikely((point).flags & SN_FLAG_RESET)) \
+        if(unlikely((point).sp.flags & SN_FLAG_RESET)) \
             (ops)->group_value_flags |= RRDR_VALUE_RESET; \
 \
-        (ops)->grouping_add(r, (point).value); \
+        time_grouping_add(r, (point).value, add_flush); \
+ \
+        storage_point_merge_to((ops)->group_point, (point).sp); \
+        if(!(point).added) \
+            storage_point_merge_to((ops)->query_point, (point).sp); \
     } \
 \
     (ops)->group_points_added++; \
-    (ops)->group_anomaly_rate += (point).anomaly; \
 } while(0)

-static QUERY_ENGINE_OPS *rrd2rrdr_query_prep(RRDR *r, size_t dim_id_in_rrdr) {
+static __thread QUERY_ENGINE_OPS *released_ops = NULL;
+
+static void rrd2rrdr_query_ops_freeall(RRDR *r __maybe_unused) {
+    while(released_ops) {
+        QUERY_ENGINE_OPS *ops = released_ops;
+        released_ops = ops->next;
+
+        onewayalloc_freez(r->internal.owa, ops);
+    }
+}
+
+static void rrd2rrdr_query_ops_release(QUERY_ENGINE_OPS *ops) {
+    if(!ops) return;
+
+    ops->next = released_ops;
+    released_ops = ops;
+}
+
+static QUERY_ENGINE_OPS *rrd2rrdr_query_ops_get(RRDR *r) {
+    QUERY_ENGINE_OPS *ops;
+    if(released_ops) {
+        ops = released_ops;
+        released_ops = ops->next;
+    }
+    else {
+        ops = onewayalloc_mallocz(r->internal.owa, sizeof(QUERY_ENGINE_OPS));
+    }
+
+    memset(ops, 0, sizeof(*ops));
+    return ops;
+}
+
+static QUERY_ENGINE_OPS *rrd2rrdr_query_ops_prep(RRDR *r, size_t query_metric_id) {
     QUERY_TARGET *qt = r->internal.qt;

-    QUERY_ENGINE_OPS *ops = onewayalloc_mallocz(r->internal.owa, sizeof(QUERY_ENGINE_OPS));
+    QUERY_ENGINE_OPS *ops = rrd2rrdr_query_ops_get(r);
     *ops = (QUERY_ENGINE_OPS) {
             .r = r,
-            .qm = &qt->query.array[dim_id_in_rrdr],
-            .grouping_add = r->internal.grouping_add,
-            .grouping_flush = r->internal.grouping_flush,
-            .tier_query_fetch = r->internal.tier_query_fetch,
-            .view_update_every = r->update_every,
-            .query_granularity = (time_t)(r->update_every / r->group),
+            .qm = query_metric(qt, query_metric_id),
+            .tier_query_fetch = r->time_grouping.tier_query_fetch,
+            .view_update_every = r->view.update_every,
+            .query_granularity = (time_t)(r->view.update_every / r->view.group),
            .group_value_flags = RRDR_VALUE_NOTHING,
     };

-    if(!query_plan(ops, qt->window.after, qt->window.before, qt->window.points))
+    if(!query_plan(ops, qt->window.after, qt->window.before, qt->window.points)) {
+        rrd2rrdr_query_ops_release(ops);
         return NULL;
+    }

     return ops;
 }
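The rrd2rrdr_query_ops_get()/_release() pair above keeps finished QUERY_ENGINE_OPS on a __thread free list so a query over many dimensions reuses one or two allocations instead of allocating per dimension. A generic sketch of the same pattern, with plain malloc/free standing in for netdata's one-way allocator (onewayalloc_*):

    #include <stdlib.h>
    #include <string.h>

    typedef struct ops { int state; struct ops *next; } OPS;

    static __thread OPS *released_ops = NULL;

    static OPS *ops_get(void) {
        OPS *o = released_ops;
        if (o) released_ops = o->next;     // reuse a released struct
        else   o = malloc(sizeof(*o));     // or allocate the first time
        memset(o, 0, sizeof(*o));
        return o;
    }

    static void ops_release(OPS *o) {      // O(1), no free() in the hot path
        if (!o) return;
        o->next = released_ops;
        released_ops = o;
    }

    static void ops_freeall(void) {        // once, when the query finishes
        while (released_ops) {
            OPS *o = released_ops;
            released_ops = o->next;
            free(o);
        }
    }

    int main(void) {
        OPS *a = ops_get();
        ops_release(a);
        OPS *b = ops_get();                // same allocation as `a`
        ops_release(b);
        ops_freeall();
        return 0;
    }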
 static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_OPS *ops) {
     QUERY_TARGET *qt = r->internal.qt;
-    QUERY_METRIC *qm = &qt->query.array[dim_id_in_rrdr]; (void)qm;
+    QUERY_METRIC *qm = ops->qm;
+
+    const RRDR_TIME_GROUPING add_flush = r->time_grouping.add_flush;
+
+    ops->group_point = STORAGE_POINT_UNSET;
+    ops->query_point = STORAGE_POINT_UNSET;
+
+    RRDR_OPTIONS options = qt->window.options;
     size_t points_wanted = qt->window.points;
     time_t after_wanted = qt->window.after;
     time_t before_wanted = qt->window.before; (void)before_wanted;
@@ -1306,15 +1570,12 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_
//    if(strcmp("user", string2str(rd->id)) == 0 && strcmp("system.cpu", string2str(rd->rrdset->id)) == 0)
//        debug_this = true;

-    time_t max_date = 0,
-           min_date = 0;
-
     size_t points_added = 0;

     long rrdr_line = -1;
-    bool use_anomaly_bit_as_value = (r->internal.query_options & RRDR_OPTION_ANOMALY_BIT) ? true : false;
+    bool use_anomaly_bit_as_value = (r->internal.qt->window.options & RRDR_OPTION_ANOMALY_BIT) ? true : false;

-    NETDATA_DOUBLE min = r->min, max = r->max;
+    NETDATA_DOUBLE min = r->view.min, max = r->view.max;

     QUERY_POINT last2_point = QUERY_POINT_EMPTY;
     QUERY_POINT last1_point = QUERY_POINT_EMPTY;
@@ -1329,12 +1590,14 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_
     time_t now_end_time   = after_wanted + ops->view_update_every - ops->query_granularity;

     size_t db_points_read_since_plan_switch = 0; (void)db_points_read_since_plan_switch;
+    size_t query_is_finished_counter = 0;

     // The main loop, based on the query granularity we need
-    for( ; points_added < points_wanted ; now_start_time = now_end_time, now_end_time += ops->view_update_every) {
+    for( ; points_added < points_wanted && query_is_finished_counter <= 10 ;
+            now_start_time = now_end_time, now_end_time += ops->view_update_every) {

         if(unlikely(query_plan_should_switch_plan(ops, now_end_time))) {
-            query_planer_next_plan(ops, now_end_time, new_point.end_time);
+            query_planer_next_plan(ops, now_end_time, new_point.sp.end_time_s);
             db_points_read_since_plan_switch = 0;
         }
@@ -1347,26 +1610,35 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_
             last1_point = new_point;
         }

-        if(unlikely(ops->is_finished(ops->handle))) {
+        if(unlikely(storage_engine_query_is_finished(ops->handle))) {
+            query_is_finished_counter++;
+
             if(count_same_end_time != 0) {
                 last2_point = last1_point;
                 last1_point = new_point;
             }
             new_point = QUERY_POINT_EMPTY;
-            new_point.start_time = last1_point.end_time;
-            new_point.end_time   = now_end_time;
+            new_point.sp.start_time_s = last1_point.sp.end_time_s;
+            new_point.sp.end_time_s   = now_end_time;
//
//            if(debug_this) info("QUERY: is finished() returned true");
//
             break;
         }
+        else
+            query_is_finished_counter = 0;

         // fetch the new point
         {
             STORAGE_POINT sp;
             if(likely(storage_point_is_unset(next1_point))) {
                 db_points_read_since_plan_switch++;
-                sp = ops->next_metric(ops->handle);
+                sp = storage_engine_query_next_metric(ops->handle);
+                ops->db_points_read_per_tier[ops->tier]++;
+                ops->db_total_points_read++;
+
+                if(unlikely(options & RRDR_OPTION_ABSOLUTE))
+                    storage_point_make_positive(sp);
             }
             else {
                 // ONE POINT READ-AHEAD
@@ -1377,7 +1649,7 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_
             // ONE POINT READ-AHEAD
             if(unlikely(query_plan_should_switch_plan(ops, sp.end_time_s) &&
-                query_planer_next_plan(ops, now_end_time, new_point.end_time))) {
+                query_planer_next_plan(ops, now_end_time, new_point.sp.end_time_s))) {

                 // The end time of the current point, crosses our plans (tiers)
                 // so, we switched plan (tier)
@@ -1387,7 +1659,12 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_
                 // A. the entire point of the previous plan is to the future of point from the next plan
                 // B. part of the point of the previous plan overlaps with the point from the next plan

-                STORAGE_POINT sp2 = ops->next_metric(ops->handle);
+                STORAGE_POINT sp2 = storage_engine_query_next_metric(ops->handle);
+                ops->db_points_read_per_tier[ops->tier]++;
+                ops->db_total_points_read++;
+
+                if(unlikely(options & RRDR_OPTION_ABSOLUTE))
+                    storage_point_make_positive(sp);

                 if(sp.start_time_s > sp2.start_time_s)
                     // the point from the previous plan is useless
@@ -1399,12 +1676,8 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_
                     next1_point = sp2;
             }

-            ops->db_points_read_per_tier[ops->tier]++;
-            ops->db_total_points_read++;
-
-            new_point.start_time = sp.start_time_s;
-            new_point.end_time   = sp.end_time_s;
-            new_point.anomaly    = sp.count ? (NETDATA_DOUBLE)sp.anomaly_count * 100.0 / (NETDATA_DOUBLE)sp.count : 0.0;
+            new_point.sp = sp;
+            new_point.added = false;
             query_point_set_id(new_point, ops->db_total_points_read);

//            if(debug_this)
@@ -1415,13 +1688,13 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_
             if(likely(!storage_point_is_unset(sp) && !storage_point_is_gap(sp))) {

                 if(unlikely(use_anomaly_bit_as_value))
-                    new_point.value = new_point.anomaly;
+                    new_point.value = storage_point_anomaly_rate(new_point.sp);

                 else {
                     switch (ops->tier_query_fetch) {
                         default:
                         case TIER_QUERY_FETCH_AVERAGE:
-                            new_point.value = sp.sum / sp.count;
+                            new_point.value = sp.sum / (NETDATA_DOUBLE)sp.count;
                             break;

                         case TIER_QUERY_FETCH_MIN:
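Higher tiers return aggregated STORAGE_POINTs, and tier_query_fetch picks which scalar to use from each one, as the switch above shows (average as sum/count). A sketch with a stand-in point struct demonstrating the same selections plus the anomaly-rate style percentage:

    #include <math.h>
    #include <stdio.h>

    struct point { double sum, min, max; unsigned count, anomaly_count; };

    typedef enum { FETCH_AVERAGE, FETCH_MIN, FETCH_MAX, FETCH_SUM } FETCH;

    static double point_value(struct point sp, FETCH how) {
        if (!sp.count) return NAN;                       // a gap
        switch (how) {
            default:
            case FETCH_AVERAGE: return sp.sum / (double)sp.count;
            case FETCH_MIN:     return sp.min;
            case FETCH_MAX:     return sp.max;
            case FETCH_SUM:     return sp.sum;
        }
    }

    int main(void) {
        struct point sp = { .sum = 30, .min = 2, .max = 12, .count = 5, .anomaly_count = 1 };
        printf("avg=%g min=%g max=%g sum=%g anomaly%%=%g\n",
               point_value(sp, FETCH_AVERAGE), point_value(sp, FETCH_MIN),
               point_value(sp, FETCH_MAX), point_value(sp, FETCH_SUM),
               100.0 * sp.anomaly_count / sp.count);     // like storage_point_anomaly_rate()
        return 0;
    }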
%ld to %ld, before the " "last point %zu from %ld to %ld, " "now is %ld to %ld", - qt->id, string2str(qm->dimension.id), - new_point.id, new_point.start_time, new_point.end_time, - last1_point.id, last1_point.start_time, last1_point.end_time, + qt->id, query_metric_id(qt, qm), + new_point.id, new_point.sp.start_time_s, new_point.sp.end_time_s, + last1_point.id, last1_point.sp.start_time_s, last1_point.sp.end_time_s, now_start_time, now_end_time); count_same_end_time++; @@ -1476,13 +1747,14 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_ count_same_end_time = 0; // decide how to use this point - if(likely(new_point.end_time < now_end_time)) { // likely to favor tier0 + if(likely(new_point.sp.end_time_s < now_end_time)) { // likely to favor tier0 // this db point ends before our now_end_time - if(likely(new_point.end_time >= now_start_time)) { // likely to favor tier0 + if(likely(new_point.sp.end_time_s >= now_start_time)) { // likely to favor tier0 // this db point ends after our now_start time - query_add_point_to_group(r, new_point, ops); + query_add_point_to_group(r, new_point, ops, add_flush); + new_point.added = true; } else { // we don't need this db point @@ -1493,14 +1765,14 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_ // at exactly the time we will want // we only log if this is not point 1 - internal_error(new_point.end_time < ops->plan_expanded_after && + internal_error(new_point.sp.end_time_s < ops->plan_expanded_after && db_points_read_since_plan_switch > 1, "QUERY: '%s', dimension '%s' next_metric() " "returned point %zu from %ld time %ld, " "which is entirely before our current timeframe %ld to %ld " "(and before the entire query, after %ld, before %ld)", - qt->id, string2str(qm->dimension.id), - new_point.id, new_point.start_time, new_point.end_time, + qt->id, query_metric_id(qt, qm), + new_point.id, new_point.sp.start_time_s, new_point.sp.end_time_s, now_start_time, now_end_time, ops->plan_expanded_after, ops->plan_expanded_before); } @@ -1518,15 +1790,15 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_ "QUERY: '%s', dimension '%s', the database does not advance the query," " it returned an end time less or equal to the end time of the last " "point we got %ld, %zu times", - qt->id, string2str(qm->dimension.id), - last1_point.end_time, count_same_end_time); + qt->id, query_metric_id(qt, qm), + last1_point.sp.end_time_s, count_same_end_time); - if(unlikely(new_point.end_time <= last1_point.end_time)) - new_point.end_time = now_end_time; + if(unlikely(new_point.sp.end_time_s <= last1_point.sp.end_time_s)) + new_point.sp.end_time_s = now_end_time; } - time_t stop_time = new_point.end_time; - if(unlikely(!storage_point_is_unset(next1_point))) { + time_t stop_time = new_point.sp.end_time_s; + if(unlikely(!storage_point_is_unset(next1_point) && next1_point.start_time_s >= now_end_time)) { // ONE POINT READ-AHEAD // the point crosses the start time of the // read ahead storage point we have read @@ -1537,18 +1809,20 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_ // we have 3 points in memory: last2, last1, new // we select the one to use based on their timestamps - size_t iterations = 0; - for ( ; now_end_time <= stop_time && points_added < points_wanted ; - now_end_time += ops->view_update_every, iterations++) { + internal_fatal(now_end_time > stop_time || points_added >= points_wanted, + "QUERY: first part of query provides invalid 
point to interpolate (now_end_time %ld, stop_time %ld", + now_end_time, stop_time); + do { // now_start_time is wrong in this loop // but, we don't need it QUERY_POINT current_point; - if(likely(now_end_time > new_point.start_time)) { + if(likely(now_end_time > new_point.sp.start_time_s)) { // it is time for our NEW point to be used current_point = new_point; + new_point.added = true; // first copy, then set it, so that new_point will not be added again query_interpolate_point(current_point, last1_point, now_end_time); // internal_error(current_point.id > 0 @@ -1564,9 +1838,10 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_ // current_point.id, current_point.start_time, current_point.end_time, // now_end_time); } - else if(likely(now_end_time <= last1_point.end_time)) { + else if(likely(now_end_time <= last1_point.sp.end_time_s)) { // our LAST point is still valid current_point = last1_point; + last1_point.added = true; // first copy, then set it, so that last1_point will not be added again query_interpolate_point(current_point, last2_point, now_end_time); // internal_error(current_point.id > 0 @@ -1586,14 +1861,11 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_ current_point = QUERY_POINT_EMPTY; } - query_add_point_to_group(r, current_point, ops); + query_add_point_to_group(r, current_point, ops, add_flush); rrdr_line = rrdr_line_init(r, now_end_time, rrdr_line); size_t rrdr_o_v_index = rrdr_line * r->d + dim_id_in_rrdr; - if(unlikely(!min_date)) min_date = now_end_time; - max_date = now_end_time; - // find the place to store our values RRDR_VALUE_FLAGS *rrdr_value_options_ptr = &r->o[rrdr_o_v_index]; @@ -1605,15 +1877,12 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_ *rrdr_value_options_ptr = ops->group_value_flags; // store the group value - NETDATA_DOUBLE group_value = ops->grouping_flush(r, rrdr_value_options_ptr); + NETDATA_DOUBLE group_value = time_grouping_flush(r, rrdr_value_options_ptr, add_flush); r->v[rrdr_o_v_index] = group_value; - // we only store uint8_t anomaly rates, - // so let's get double precision by storing - // anomaly rates in the range 0 - 200 - r->ar[rrdr_o_v_index] = ops->group_anomaly_rate / (NETDATA_DOUBLE)ops->group_points_added; + r->ar[rrdr_o_v_index] = storage_point_anomaly_rate(ops->group_point); - if(likely(points_added || dim_id_in_rrdr)) { + if(likely(points_added || r->internal.queries_count)) { // find the min/max across all dimensions if(unlikely(group_value < min)) min = group_value; @@ -1621,7 +1890,7 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_ } else { - // runs only when dim_id_in_rrdr == 0 && points_added == 0 + // runs only when r->internal.queries_count == 0 && points_added == 0 // so, on the first point added for the query. 
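The do/while above emits one view point per view_update_every tick until the current db point is consumed, choosing between the new point, the last point, or an empty point, and interpolating at the boundary. A minimal standalone sketch of the boundary interpolation idea (simplified stand-in types, not netdata's QUERY_POINT or query_interpolate_point()):

    #include <time.h>

    // hypothetical simplified db point; netdata's STORAGE_POINT also
    // carries sum/count/min/max and anomaly information
    typedef struct { time_t start_s, end_s; double value; } point;

    // weight 'last' and 'curr' by how much of the interval up to the
    // view-point boundary each one covers; this approximates the
    // interpolation done for tiers with coarser granularity
    static double interpolate_at(point last, point curr, time_t boundary_s) {
        if(boundary_s >= curr.end_s || curr.end_s == last.end_s)
            return curr.value;

        double total   = (double)(curr.end_s - last.end_s);
        double covered = (double)(boundary_s - last.end_s);
        return last.value + (curr.value - last.value) * (covered / total);
    }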
 
 // ----------------------------------------------------------------------------
@@ -1669,7 +1945,7 @@ void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s
     struct rrddim_tier *t = &rd->tiers[tier];
     if(unlikely(!t)) return;
 
-    time_t latest_time_s = t->query_ops->latest_time_s(t->db_metric_handle);
+    time_t latest_time_s = storage_engine_latest_time_s(t->backend, t->db_metric_handle);
     time_t granularity = (time_t)t->tier_grouping * (time_t)rd->update_every;
     time_t time_diff = now_s - latest_time_s;
@@ -1683,21 +1959,21 @@ void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s
 
     // for each lower tier
     for(int read_tier = (int)tier - 1; read_tier >= 0 ; read_tier--){
-        time_t smaller_tier_first_time = rd->tiers[read_tier].query_ops->oldest_time_s(rd->tiers[read_tier].db_metric_handle);
-        time_t smaller_tier_last_time = rd->tiers[read_tier].query_ops->latest_time_s(rd->tiers[read_tier].db_metric_handle);
+        time_t smaller_tier_first_time = storage_engine_oldest_time_s(rd->tiers[read_tier].backend, rd->tiers[read_tier].db_metric_handle);
+        time_t smaller_tier_last_time = storage_engine_latest_time_s(rd->tiers[read_tier].backend, rd->tiers[read_tier].db_metric_handle);
         if(smaller_tier_last_time <= latest_time_s) continue;  // it is as bad as we are
 
         long after_wanted = (latest_time_s < smaller_tier_first_time) ? smaller_tier_first_time : latest_time_s;
         long before_wanted = smaller_tier_last_time;
 
         struct rrddim_tier *tmp = &rd->tiers[read_tier];
-        tmp->query_ops->init(tmp->db_metric_handle, &handle, after_wanted, before_wanted, STORAGE_PRIORITY_HIGH);
+        storage_engine_query_init(tmp->backend, tmp->db_metric_handle, &handle, after_wanted, before_wanted, STORAGE_PRIORITY_HIGH);
 
         size_t points_read = 0;
-        while(!tmp->query_ops->is_finished(&handle)) {
+        while(!storage_engine_query_is_finished(&handle)) {
 
-            STORAGE_POINT sp = tmp->query_ops->next_metric(&handle);
+            STORAGE_POINT sp = storage_engine_query_next_metric(&handle);
             points_read++;
 
             if(sp.end_time_s > latest_time_s) {
@@ -1706,7 +1982,7 @@ void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s
             }
         }
 
-        tmp->query_ops->finalize(&handle);
+        storage_engine_query_finalize(&handle);
         store_metric_collection_completed();
         global_statistics_backfill_query_completed(points_read);
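After the switch to the storage_engine_query_*() wrappers above, the backfill logic itself is unchanged: read each lower tier and re-store whatever is newer than the higher tier's latest point. A self-contained sketch of that selection step (hypothetical pt type and store_point callback, not netdata's API):

    #include <stddef.h>
    #include <time.h>

    typedef struct { time_t end_time_s; double sum; size_t count; } pt;

    // copy every lower-tier point that is newer than the higher tier's
    // latest timestamp, so the higher tier catches up after a gap
    static size_t backfill(const pt *lower, size_t n, time_t higher_latest_s,
                           void (*store_point)(pt)) {
        size_t copied = 0;
        for(size_t i = 0; i < n; i++) {
            if(lower[i].end_time_s > higher_latest_s) {
                store_point(lower[i]);
                copied++;
            }
        }
        return copied;
    }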
@@ -1721,7 +1997,7 @@ void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s
 
 #ifdef NETDATA_INTERNAL_CHECKS
 static void rrd2rrdr_log_request_response_metadata(RRDR *r
         , RRDR_OPTIONS options __maybe_unused
-        , RRDR_GROUPING group_method
+        , RRDR_TIME_GROUPING group_method
         , bool aligned
         , size_t group
         , time_t resampling_time
@@ -1737,8 +2013,9 @@ static void rrd2rrdr_log_request_response_metadata(RRDR *r
         , const char *msg
         ) {
 
-    time_t first_entry_s = r->internal.qt->db.first_time_s;
-    time_t last_entry_s = r->internal.qt->db.last_time_s;
+    QUERY_TARGET *qt = r->internal.qt;
+    time_t first_entry_s = qt->db.first_time_s;
+    time_t last_entry_s = qt->db.last_time_s;
 
     internal_error(
             true,
@@ -1748,33 +2025,33 @@ static void rrd2rrdr_log_request_response_metadata(RRDR *r
             "duration (got: %ld, want: %ld, req: %ld, db: %ld), "
             "points (got: %zu, want: %zu, req: %zu), "
             "%s"
-            , r->internal.qt->id
-            , r->internal.qt->window.query_granularity
+            , qt->id
+            , qt->window.query_granularity
 
             // grouping
             , (aligned) ? "aligned" : "unaligned"
-            , group_method2string(group_method)
+            , time_grouping_method2string(group_method)
             , group
             , resampling_time
             , resampling_group
 
             // after
-            , r->after
+            , r->view.after
             , after_wanted
             , after_requested
             , first_entry_s
 
             // before
-            , r->before
+            , r->view.before
             , before_wanted
             , before_requested
             , last_entry_s
 
             // duration
-            , (long)(r->before - r->after + r->internal.qt->window.query_granularity)
-            , (long)(before_wanted - after_wanted + r->internal.qt->window.query_granularity)
+            , (long)(r->view.before - r->view.after + qt->window.query_granularity)
+            , (long)(before_wanted - after_wanted + qt->window.query_granularity)
             , (long)before_requested - after_requested
-            , (long)((last_entry_s - first_entry_s) + r->internal.qt->window.query_granularity)
+            , (long)((last_entry_s - first_entry_s) + qt->window.query_granularity)
 
             // points
             , r->rows
"aligned" : "unaligned" - , group_method2string(group_method) + , time_grouping_method2string(group_method) , group , resampling_time , resampling_group // after - , r->after + , r->view.after , after_wanted , after_requested , first_entry_s // before - , r->before + , r->view.before , before_wanted , before_requested , last_entry_s // duration - , (long)(r->before - r->after + r->internal.qt->window.query_granularity) - , (long)(before_wanted - after_wanted + r->internal.qt->window.query_granularity) + , (long)(r->view.before - r->view.after + qt->window.query_granularity) + , (long)(before_wanted - after_wanted + qt->window.query_granularity) , (long)before_requested - after_requested - , (long)((last_entry_s - first_entry_s) + r->internal.qt->window.query_granularity) + , (long)((last_entry_s - first_entry_s) + qt->window.query_granularity) // points , r->rows @@ -1788,9 +2065,12 @@ static void rrd2rrdr_log_request_response_metadata(RRDR *r #endif // NETDATA_INTERNAL_CHECKS // Returns 1 if an absolute period was requested or 0 if it was a relative period -bool rrdr_relative_window_to_absolute(time_t *after, time_t *before) { +bool rrdr_relative_window_to_absolute(time_t *after, time_t *before, time_t *now_ptr) { time_t now = now_realtime_sec() - 1; + if(now_ptr) + *now_ptr = now; + int absolute_period_requested = -1; long long after_requested, before_requested; @@ -1890,11 +2170,11 @@ bool query_target_calculate_window(QUERY_TARGET *qt) { size_t points_requested = (long)qt->request.points; time_t after_requested = qt->request.after; time_t before_requested = qt->request.before; - RRDR_GROUPING group_method = qt->request.group_method; + RRDR_TIME_GROUPING group_method = qt->request.time_group_method; time_t resampling_time_requested = qt->request.resampling_time; - RRDR_OPTIONS options = qt->request.options; + RRDR_OPTIONS options = qt->window.options; size_t tier = qt->request.tier; - time_t update_every = qt->db.minimum_latest_update_every_s; + time_t update_every = qt->db.minimum_latest_update_every_s ? 
@@ -1890,11 +2170,11 @@ bool query_target_calculate_window(QUERY_TARGET *qt) {
     size_t points_requested = (long)qt->request.points;
     time_t after_requested = qt->request.after;
     time_t before_requested = qt->request.before;
-    RRDR_GROUPING group_method = qt->request.group_method;
+    RRDR_TIME_GROUPING group_method = qt->request.time_group_method;
     time_t resampling_time_requested = qt->request.resampling_time;
-    RRDR_OPTIONS options = qt->request.options;
+    RRDR_OPTIONS options = qt->window.options;
     size_t tier = qt->request.tier;
-    time_t update_every = qt->db.minimum_latest_update_every_s;
+    time_t update_every = qt->db.minimum_latest_update_every_s ? qt->db.minimum_latest_update_every_s : 1;
 
     // RULES
     // points_requested = 0
@@ -1953,27 +2233,36 @@ bool query_target_calculate_window(QUERY_TARGET *qt) {
         time_t last_entry_s = qt->db.last_time_s;
 
         if (first_entry_s == 0 || last_entry_s == 0) {
-            internal_error(true, "QUERY: no data detected on query '%s' (db first_entry_t = %ld, last_entry_t = %ld", qt->id, first_entry_s, last_entry_s);
-            query_debug_log_free();
-            return false;
-        }
+            internal_error(true, "QUERY: no data detected on query '%s' (db first_entry_t = %ld, last_entry_t = %ld)", qt->id, first_entry_s, last_entry_s);
+            after_wanted = qt->window.after;
+            before_wanted = qt->window.before;
 
-        query_debug_log(":first_entry_t %ld, last_entry_t %ld", first_entry_s, last_entry_s);
+            if(after_wanted == before_wanted)
+                after_wanted = before_wanted - update_every;
 
-        if (after_wanted == 0) {
-            after_wanted = first_entry_s;
-            query_debug_log(":zero after_wanted %ld", after_wanted);
+            if (points_wanted == 0) {
+                points_wanted = (before_wanted - after_wanted) / update_every;
+                query_debug_log(":zero points_wanted %zu", points_wanted);
+            }
         }
+        else {
+            query_debug_log(":first_entry_t %ld, last_entry_t %ld", first_entry_s, last_entry_s);
 
-        if (before_wanted == 0) {
-            before_wanted = last_entry_s;
-            before_is_aligned_to_db_end = true;
-            query_debug_log(":zero before_wanted %ld", before_wanted);
-        }
+            if (after_wanted == 0) {
+                after_wanted = first_entry_s;
+                query_debug_log(":zero after_wanted %ld", after_wanted);
+            }
 
-        if (points_wanted == 0) {
-            points_wanted = (last_entry_s - first_entry_s) / update_every;
-            query_debug_log(":zero points_wanted %zu", points_wanted);
+            if (before_wanted == 0) {
+                before_wanted = last_entry_s;
+                before_is_aligned_to_db_end = true;
+                query_debug_log(":zero before_wanted %ld", before_wanted);
+            }
+
+            if (points_wanted == 0) {
+                points_wanted = (last_entry_s - first_entry_s) / update_every;
+                query_debug_log(":zero points_wanted %zu", points_wanted);
+            }
         }
     }
 
@@ -1983,7 +2272,7 @@ bool query_target_calculate_window(QUERY_TARGET *qt) {
     }
 
     // convert our before_wanted and after_wanted to absolute
-    rrdr_relative_window_to_absolute(&after_wanted, &before_wanted);
+    rrdr_relative_window_to_absolute(&after_wanted, &before_wanted, NULL);
     query_debug_log(":relative2absolute after %ld, before %ld", after_wanted, before_wanted);
 
     if (natural_points && (options & RRDR_OPTION_SELECTED_TIER) && tier > 0 && storage_tiers > 1) {
@@ -2145,8 +2434,8 @@ bool query_target_calculate_window(QUERY_TARGET *qt) {
     qt->window.relative = relative_period_requested;
     qt->window.points = points_wanted;
     qt->window.group = group;
-    qt->window.group_method = group_method;
-    qt->window.group_options = qt->request.group_options;
+    qt->window.time_group_method = group_method;
+    qt->window.time_group_options = qt->request.time_group_options;
     qt->window.query_granularity = query_granularity;
     qt->window.resampling_group = resampling_group;
     qt->window.resampling_divisor = resampling_divisor;
@@ -2157,80 +2446,1081 @@ bool query_target_calculate_window(QUERY_TARGET *qt) {
 
     return true;
 }
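Instead of failing the query, the new no-data branch above keeps the requested window and sizes it from update_every, which the same diff now forces to at least 1. A small sketch of that fallback arithmetic:

    #include <stddef.h>
    #include <time.h>

    // mirrors the fallback logic above: derive points from the window
    // when the db reports no first/last entry
    static size_t points_for_window(time_t after, time_t before, time_t update_every) {
        if(update_every <= 0) update_every = 1;      // minimum_latest_update_every_s ? ... : 1
        if(after == before) after = before - update_every;
        return (size_t)((before - after) / update_every);
        // e.g. after=-600 resolved to now-600, before=now, update_every=1 -> 600 points
    }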
+// ----------------------------------------------------------------------------
+// group by
+
+struct group_by_label_key {
+    DICTIONARY *values;
+};
+
+static void group_by_label_key_insert_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data) {
+    // add the key to our r->label_keys global keys dictionary
+    DICTIONARY *label_keys = data;
+    dictionary_set(label_keys, dictionary_acquired_item_name(item), NULL, 0);
+
+    // create a dictionary for the values of this key
+    struct group_by_label_key *k = value;
+    k->values = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE, NULL, 0);
+}
+
+static void group_by_label_key_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
+    struct group_by_label_key *k = value;
+    dictionary_destroy(k->values);
+}
+
+static int rrdlabels_traversal_cb_to_group_by_label_key(const char *name, const char *value, RRDLABEL_SRC ls __maybe_unused, void *data) {
+    DICTIONARY *dl = data;
+    struct group_by_label_key *k = dictionary_set(dl, name, NULL, sizeof(struct group_by_label_key));
+    dictionary_set(k->values, value, NULL, 0);
+    return 1;
+}
+
+void rrdr_json_group_by_labels(BUFFER *wb, const char *key, RRDR *r, RRDR_OPTIONS options) {
+    if(!r->label_keys || !r->dl)
+        return;
+
+    buffer_json_member_add_object(wb, key);
+
+    void *t;
+    dfe_start_read(r->label_keys, t) {
+        buffer_json_member_add_array(wb, t_dfe.name);
+
+        for(size_t d = 0; d < r->d ;d++) {
+            if(!rrdr_dimension_should_be_exposed(r->od[d], options))
+                continue;
+
+            struct group_by_label_key *k = dictionary_get(r->dl[d], t_dfe.name);
+            if(k) {
+                buffer_json_add_array_item_array(wb);
+                void *tt;
+                dfe_start_read(k->values, tt) {
+                    buffer_json_add_array_item_string(wb, tt_dfe.name);
+                }
+                dfe_done(tt);
+                buffer_json_array_close(wb);
+            }
+            else
+                buffer_json_add_array_item_string(wb, NULL);
+        }
+
+        buffer_json_array_close(wb);
+    }
+    dfe_done(t);
+
+    buffer_json_object_close(wb); // key
+}
+
+static int group_by_label_is_space(char c) {
+    if(c == ',' || c == '|')
+        return 1;
+
+    return 0;
+}
+
+static void rrd2rrdr_set_timestamps(RRDR *r) {
+    QUERY_TARGET *qt = r->internal.qt;
+
+    internal_fatal(qt->window.points != r->n, "QUERY: mismatch to the number of points in qt and r");
+
+    r->view.group = qt->window.group;
+    r->view.update_every = (int) query_view_update_every(qt);
+    r->view.before = qt->window.before;
+    r->view.after = qt->window.after;
+
+    r->time_grouping.points_wanted = qt->window.points;
+    r->time_grouping.resampling_group = qt->window.resampling_group;
+    r->time_grouping.resampling_divisor = qt->window.resampling_divisor;
+
+    r->rows = qt->window.points;
+
+    size_t points_wanted = qt->window.points;
+    time_t after_wanted = qt->window.after;
+    time_t before_wanted = qt->window.before; (void)before_wanted;
+
+    time_t view_update_every = r->view.update_every;
+    time_t query_granularity = (time_t)(r->view.update_every / r->view.group);
+
+    size_t rrdr_line = 0;
+    time_t first_point_end_time = after_wanted + view_update_every - query_granularity;
+    time_t now_end_time = first_point_end_time;
+
+    while (rrdr_line < points_wanted) {
+        r->t[rrdr_line++] = now_end_time;
+        now_end_time += view_update_every;
+    }
+
+    internal_fatal(r->t[0] != first_point_end_time, "QUERY: wrong first timestamp in the query");
+    internal_error(r->t[points_wanted - 1] != before_wanted,
+                   "QUERY: wrong last timestamp in the query, expected %ld, found %ld",
+                   before_wanted, r->t[points_wanted - 1]);
+}
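rrd2rrdr_set_timestamps() stamps every view point with its end time, spaced view_update_every apart. A minimal sketch of the same grid computation:

    #include <stddef.h>
    #include <time.h>

    // fills t[0..points-1] with end-of-slot timestamps, exactly like the
    // while loop in rrd2rrdr_set_timestamps() above
    static void fill_timestamps(time_t *t, size_t points, time_t after,
                                time_t view_update_every, time_t query_granularity) {
        time_t end = after + view_update_every - query_granularity;
        for(size_t i = 0; i < points; i++, end += view_update_every)
            t[i] = end;
    }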
buffer_strcat(key, "selected"); + } + else { + if (group_by & RRDR_GROUP_BY_DIMENSION) { + buffer_fast_strcat(key, "|", 1); + buffer_strcat(key, query_metric_name(qt, qm)); + } + + if (group_by & (RRDR_GROUP_BY_INSTANCE|RRDR_GROUP_BY_PERCENTAGE_OF_INSTANCE)) { + buffer_fast_strcat(key, "|", 1); + buffer_strcat(key, string2str(query_instance_id_fqdn(qi, qt->request.version))); + } + + if (group_by & RRDR_GROUP_BY_LABEL) { + DICTIONARY *labels = rrdinstance_acquired_labels(qi->ria); + for (size_t l = 0; l < qt->group_by[group_by_id].used; l++) { + buffer_fast_strcat(key, "|", 1); + rrdlabels_get_value_to_buffer_or_unset(labels, key, qt->group_by[group_by_id].label_keys[l], "[unset]"); + } + } + + if (group_by & RRDR_GROUP_BY_NODE) { + buffer_fast_strcat(key, "|", 1); + buffer_strcat(key, qn->rrdhost->machine_guid); + } + + if (group_by & RRDR_GROUP_BY_CONTEXT) { + buffer_fast_strcat(key, "|", 1); + buffer_strcat(key, rrdcontext_acquired_id(qc->rca)); + } + + if (group_by & RRDR_GROUP_BY_UNITS) { + buffer_fast_strcat(key, "|", 1); + buffer_strcat(key, query_target_has_percentage_units(qt) ? "%" : rrdinstance_acquired_units(qi->ria)); + } + } +} + +static void query_group_by_make_dimension_id(BUFFER *key, RRDR_GROUP_BY group_by, size_t group_by_id, QUERY_TARGET *qt, QUERY_NODE *qn, QUERY_CONTEXT *qc, QUERY_INSTANCE *qi, QUERY_DIMENSION *qd __maybe_unused, QUERY_METRIC *qm, bool query_has_percentage_of_instance) { + buffer_flush(key); + if(unlikely(!query_has_percentage_of_instance && qm->status & RRDR_DIMENSION_HIDDEN)) { + buffer_strcat(key, "__hidden_dimensions__"); + } + else if(unlikely(group_by & RRDR_GROUP_BY_SELECTED)) { + buffer_strcat(key, "selected"); + } + else { + if (group_by & RRDR_GROUP_BY_DIMENSION) { + buffer_strcat(key, query_metric_name(qt, qm)); + } + + if (group_by & (RRDR_GROUP_BY_INSTANCE|RRDR_GROUP_BY_PERCENTAGE_OF_INSTANCE)) { + if (buffer_strlen(key) != 0) + buffer_fast_strcat(key, ",", 1); + + if (group_by & RRDR_GROUP_BY_NODE) + buffer_strcat(key, rrdinstance_acquired_id(qi->ria)); + else + buffer_strcat(key, string2str(query_instance_id_fqdn(qi, qt->request.version))); + } + + if (group_by & RRDR_GROUP_BY_LABEL) { + DICTIONARY *labels = rrdinstance_acquired_labels(qi->ria); + for (size_t l = 0; l < qt->group_by[group_by_id].used; l++) { + if (buffer_strlen(key) != 0) + buffer_fast_strcat(key, ",", 1); + rrdlabels_get_value_to_buffer_or_unset(labels, key, qt->group_by[group_by_id].label_keys[l], "[unset]"); + } + } + + if (group_by & RRDR_GROUP_BY_NODE) { + if (buffer_strlen(key) != 0) + buffer_fast_strcat(key, ",", 1); + + buffer_strcat(key, qn->rrdhost->machine_guid); + } + + if (group_by & RRDR_GROUP_BY_CONTEXT) { + if (buffer_strlen(key) != 0) + buffer_fast_strcat(key, ",", 1); + + buffer_strcat(key, rrdcontext_acquired_id(qc->rca)); + } + + if (group_by & RRDR_GROUP_BY_UNITS) { + if (buffer_strlen(key) != 0) + buffer_fast_strcat(key, ",", 1); + + buffer_strcat(key, query_target_has_percentage_units(qt) ? 
"%" : rrdinstance_acquired_units(qi->ria)); + } + } +} + +static void query_group_by_make_dimension_name(BUFFER *key, RRDR_GROUP_BY group_by, size_t group_by_id, QUERY_TARGET *qt, QUERY_NODE *qn, QUERY_CONTEXT *qc, QUERY_INSTANCE *qi, QUERY_DIMENSION *qd __maybe_unused, QUERY_METRIC *qm, bool query_has_percentage_of_instance) { + buffer_flush(key); + if(unlikely(!query_has_percentage_of_instance && qm->status & RRDR_DIMENSION_HIDDEN)) { + buffer_strcat(key, "__hidden_dimensions__"); + } + else if(unlikely(group_by & RRDR_GROUP_BY_SELECTED)) { + buffer_strcat(key, "selected"); + } + else { + if (group_by & RRDR_GROUP_BY_DIMENSION) { + buffer_strcat(key, query_metric_name(qt, qm)); + } + + if (group_by & (RRDR_GROUP_BY_INSTANCE|RRDR_GROUP_BY_PERCENTAGE_OF_INSTANCE)) { + if (buffer_strlen(key) != 0) + buffer_fast_strcat(key, ",", 1); + + if (group_by & RRDR_GROUP_BY_NODE) + buffer_strcat(key, rrdinstance_acquired_name(qi->ria)); + else + buffer_strcat(key, string2str(query_instance_name_fqdn(qi, qt->request.version))); + } + + if (group_by & RRDR_GROUP_BY_LABEL) { + DICTIONARY *labels = rrdinstance_acquired_labels(qi->ria); + for (size_t l = 0; l < qt->group_by[group_by_id].used; l++) { + if (buffer_strlen(key) != 0) + buffer_fast_strcat(key, ",", 1); + rrdlabels_get_value_to_buffer_or_unset(labels, key, qt->group_by[group_by_id].label_keys[l], "[unset]"); + } + } + + if (group_by & RRDR_GROUP_BY_NODE) { + if (buffer_strlen(key) != 0) + buffer_fast_strcat(key, ",", 1); + + buffer_strcat(key, rrdhost_hostname(qn->rrdhost)); + } + + if (group_by & RRDR_GROUP_BY_CONTEXT) { + if (buffer_strlen(key) != 0) + buffer_fast_strcat(key, ",", 1); + + buffer_strcat(key, rrdcontext_acquired_id(qc->rca)); + } + + if (group_by & RRDR_GROUP_BY_UNITS) { + if (buffer_strlen(key) != 0) + buffer_fast_strcat(key, ",", 1); + + buffer_strcat(key, query_target_has_percentage_units(qt) ? 
"%" : rrdinstance_acquired_units(qi->ria)); + } + } +} + +struct rrdr_group_by_entry { + size_t priority; + size_t count; + STRING *id; + STRING *name; + STRING *units; + RRDR_DIMENSION_FLAGS od; + DICTIONARY *dl; +}; + +static RRDR *rrd2rrdr_group_by_initialize(ONEWAYALLOC *owa, QUERY_TARGET *qt) { + RRDR *r_tmp = NULL; + RRDR_OPTIONS options = qt->window.options; + + if(qt->request.version < 2) { + // v1 query + RRDR *r = rrdr_create(owa, qt, qt->query.used, qt->window.points); + if(unlikely(!r)) { + internal_error(true, "QUERY: cannot create RRDR for %s, after=%ld, before=%ld, dimensions=%u, points=%zu", + qt->id, qt->window.after, qt->window.before, qt->query.used, qt->window.points); + return NULL; + } + r->group_by.r = NULL; + + for(size_t d = 0; d < qt->query.used ; d++) { + QUERY_METRIC *qm = query_metric(qt, d); + QUERY_DIMENSION *qd = query_dimension(qt, qm->link.query_dimension_id); + r->di[d] = rrdmetric_acquired_id_dup(qd->rma); + r->dn[d] = rrdmetric_acquired_name_dup(qd->rma); + } + + rrd2rrdr_set_timestamps(r); + return r; + } + // v2 query + + // parse all the group-by label keys + for(size_t g = 0; g < MAX_QUERY_GROUP_BY_PASSES ;g++) { + if (qt->request.group_by[g].group_by & RRDR_GROUP_BY_LABEL && + qt->request.group_by[g].group_by_label && *qt->request.group_by[g].group_by_label) + qt->group_by[g].used = quoted_strings_splitter( + qt->request.group_by[g].group_by_label, qt->group_by[g].label_keys, + GROUP_BY_MAX_LABEL_KEYS, group_by_label_is_space); + + if (!qt->group_by[g].used) + qt->request.group_by[g].group_by &= ~RRDR_GROUP_BY_LABEL; + } + + // make sure there are valid group-by methods + bool query_has_percentage_of_instance = false; + for(size_t g = 0; g < MAX_QUERY_GROUP_BY_PASSES - 1 ;g++) { + if(!(qt->request.group_by[g].group_by & SUPPORTED_GROUP_BY_METHODS)) + qt->request.group_by[g].group_by = (g == 0) ? 
+
+struct rrdr_group_by_entry {
+    size_t priority;
+    size_t count;
+    STRING *id;
+    STRING *name;
+    STRING *units;
+    RRDR_DIMENSION_FLAGS od;
+    DICTIONARY *dl;
+};
+
+static RRDR *rrd2rrdr_group_by_initialize(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
+    RRDR *r_tmp = NULL;
+    RRDR_OPTIONS options = qt->window.options;
+
+    if(qt->request.version < 2) {
+        // v1 query
+        RRDR *r = rrdr_create(owa, qt, qt->query.used, qt->window.points);
+        if(unlikely(!r)) {
+            internal_error(true, "QUERY: cannot create RRDR for %s, after=%ld, before=%ld, dimensions=%u, points=%zu",
+                           qt->id, qt->window.after, qt->window.before, qt->query.used, qt->window.points);
+            return NULL;
+        }
+        r->group_by.r = NULL;
+
+        for(size_t d = 0; d < qt->query.used ; d++) {
+            QUERY_METRIC *qm = query_metric(qt, d);
+            QUERY_DIMENSION *qd = query_dimension(qt, qm->link.query_dimension_id);
+            r->di[d] = rrdmetric_acquired_id_dup(qd->rma);
+            r->dn[d] = rrdmetric_acquired_name_dup(qd->rma);
+        }
+
+        rrd2rrdr_set_timestamps(r);
+        return r;
+    }
+    // v2 query
+
+    // parse all the group-by label keys
+    for(size_t g = 0; g < MAX_QUERY_GROUP_BY_PASSES ;g++) {
+        if (qt->request.group_by[g].group_by & RRDR_GROUP_BY_LABEL &&
+            qt->request.group_by[g].group_by_label && *qt->request.group_by[g].group_by_label)
+            qt->group_by[g].used = quoted_strings_splitter(
+                    qt->request.group_by[g].group_by_label, qt->group_by[g].label_keys,
+                    GROUP_BY_MAX_LABEL_KEYS, group_by_label_is_space);
+
+        if (!qt->group_by[g].used)
+            qt->request.group_by[g].group_by &= ~RRDR_GROUP_BY_LABEL;
+    }
+
+    // make sure there are valid group-by methods
+    bool query_has_percentage_of_instance = false;
+    for(size_t g = 0; g < MAX_QUERY_GROUP_BY_PASSES - 1 ;g++) {
+        if(!(qt->request.group_by[g].group_by & SUPPORTED_GROUP_BY_METHODS))
+            qt->request.group_by[g].group_by = (g == 0) ? RRDR_GROUP_BY_DIMENSION : RRDR_GROUP_BY_NONE;
+
+        if(qt->request.group_by[g].group_by & RRDR_GROUP_BY_PERCENTAGE_OF_INSTANCE)
+            query_has_percentage_of_instance = true;
+    }
+
+    // merge all group-by options to upper levels
+    for(size_t g = 0; g < MAX_QUERY_GROUP_BY_PASSES - 1 ;g++) {
+        if(qt->request.group_by[g].group_by == RRDR_GROUP_BY_NONE)
+            continue;
+
+        if(qt->request.group_by[g].group_by == RRDR_GROUP_BY_SELECTED) {
+            for (size_t r = g + 1; r < MAX_QUERY_GROUP_BY_PASSES; r++)
+                qt->request.group_by[r].group_by = RRDR_GROUP_BY_NONE;
+        }
+        else {
+            for (size_t r = g + 1; r < MAX_QUERY_GROUP_BY_PASSES; r++) {
+                if (qt->request.group_by[r].group_by == RRDR_GROUP_BY_NONE)
+                    continue;
+
+                if (qt->request.group_by[r].group_by != RRDR_GROUP_BY_SELECTED) {
+                    if(qt->request.group_by[r].group_by & RRDR_GROUP_BY_PERCENTAGE_OF_INSTANCE)
+                        qt->request.group_by[g].group_by |= RRDR_GROUP_BY_INSTANCE;
+                    else
+                        qt->request.group_by[g].group_by |= qt->request.group_by[r].group_by;
+
+                    if(qt->request.group_by[r].group_by & RRDR_GROUP_BY_LABEL) {
+                        for (size_t lr = 0; lr < qt->group_by[r].used; lr++) {
+                            bool found = false;
+                            for (size_t lg = 0; lg < qt->group_by[g].used; lg++) {
+                                if (strcmp(qt->group_by[g].label_keys[lg], qt->group_by[r].label_keys[lr]) == 0) {
+                                    found = true;
+                                    break;
+                                }
+                            }
+
+                            if (!found && qt->group_by[g].used < GROUP_BY_MAX_LABEL_KEYS * MAX_QUERY_GROUP_BY_PASSES)
+                                qt->group_by[g].label_keys[qt->group_by[g].used++] = qt->group_by[r].label_keys[lr];
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    int added = 0;
+    RRDR *first_r = NULL, *last_r = NULL;
+    BUFFER *key = buffer_create(0, NULL);
+    struct rrdr_group_by_entry *entries = onewayalloc_mallocz(owa, qt->query.used * sizeof(struct rrdr_group_by_entry));
+    DICTIONARY *groups = dictionary_create(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE);
+    DICTIONARY *label_keys = NULL;
+
+    for(size_t g = 0; g < MAX_QUERY_GROUP_BY_PASSES ;g++) {
+        RRDR_GROUP_BY group_by = qt->request.group_by[g].group_by;
+
+        if(group_by == RRDR_GROUP_BY_NONE)
+            break;
+
+        memset(entries, 0, qt->query.used * sizeof(struct rrdr_group_by_entry));
+        dictionary_flush(groups);
+        added = 0;
+
+        size_t hidden_dimensions = 0;
+        bool final_grouping = (g == MAX_QUERY_GROUP_BY_PASSES - 1 || qt->request.group_by[g + 1].group_by == RRDR_GROUP_BY_NONE) ? true : false;
+
+        if (final_grouping && (options & RRDR_OPTION_GROUP_BY_LABELS))
+            label_keys = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE, NULL, 0);
+
+        QUERY_INSTANCE *last_qi = NULL;
+        size_t priority = 0;
+        time_t update_every_max = 0;
+        for (size_t d = 0; d < qt->query.used; d++) {
+            QUERY_METRIC *qm = query_metric(qt, d);
+            QUERY_DIMENSION *qd = query_dimension(qt, qm->link.query_dimension_id);
+            QUERY_INSTANCE *qi = query_instance(qt, qm->link.query_instance_id);
+            QUERY_CONTEXT *qc = query_context(qt, qm->link.query_context_id);
+            QUERY_NODE *qn = query_node(qt, qm->link.query_node_id);
+
+            if (qi != last_qi) {
+                last_qi = qi;
+
+                time_t update_every = rrdinstance_acquired_update_every(qi->ria);
+                if (update_every > update_every_max)
+                    update_every_max = update_every;
+            }
+
+            priority = qd->priority;
+
+            if(qm->status & RRDR_DIMENSION_HIDDEN)
+                hidden_dimensions++;
+
+            // --------------------------------------------------------------------
+            // generate the group by key
+
+            query_group_by_make_dimension_key(key, group_by, g, qt, qn, qc, qi, qd, qm, query_has_percentage_of_instance);
+
+            // lookup the key in the dictionary
+
+            int pos = -1;
+            int *set = dictionary_set(groups, buffer_tostring(key), &pos, sizeof(pos));
+            if (*set == -1) {
+                // the key just added to the dictionary
+
+                *set = pos = added++;
+
+                // ----------------------------------------------------------------
+                // generate the dimension id
+
+                query_group_by_make_dimension_id(key, group_by, g, qt, qn, qc, qi, qd, qm, query_has_percentage_of_instance);
+                entries[pos].id = string_strdupz(buffer_tostring(key));
+
+                // ----------------------------------------------------------------
+                // generate the dimension name
+
+                query_group_by_make_dimension_name(key, group_by, g, qt, qn, qc, qi, qd, qm, query_has_percentage_of_instance);
+                entries[pos].name = string_strdupz(buffer_tostring(key));
+
+                // add the rest of the info
+                entries[pos].units = rrdinstance_acquired_units_dup(qi->ria);
+                entries[pos].priority = priority;
+
+                if (label_keys) {
+                    entries[pos].dl = dictionary_create_advanced(
+                            DICT_OPTION_SINGLE_THREADED | DICT_OPTION_FIXED_SIZE | DICT_OPTION_DONT_OVERWRITE_VALUE,
+                            NULL, sizeof(struct group_by_label_key));
+                    dictionary_register_insert_callback(entries[pos].dl, group_by_label_key_insert_cb, label_keys);
+                    dictionary_register_delete_callback(entries[pos].dl, group_by_label_key_delete_cb, label_keys);
+                }
+            } else {
+                // the key found in the dictionary
+                pos = *set;
+            }
+
+            entries[pos].count++;
+
+            if (unlikely(priority < entries[pos].priority))
+                entries[pos].priority = priority;
+
+            if(g > 0)
+                last_r->dgbs[qm->grouped_as.slot] = pos;
+            else
+                qm->grouped_as.first_slot = pos;
+
+            qm->grouped_as.slot = pos;
+            qm->grouped_as.id = entries[pos].id;
+            qm->grouped_as.name = entries[pos].name;
+            qm->grouped_as.units = entries[pos].units;
+
+            // copy the dimension flags decided by the query target
+            // we need this, because if a dimension is explicitly selected
+            // the query target adds to it the non-zero flag
+            qm->status |= RRDR_DIMENSION_GROUPED;
+
+            if(query_has_percentage_of_instance)
+                // when the query has percentage of instance
+                // there will be no hidden dimensions in the final query
+                // so we have to remove the hidden flag from all dimensions
+                entries[pos].od |= qm->status & ~RRDR_DIMENSION_HIDDEN;
+            else
+                entries[pos].od |= qm->status;
+
+            if (entries[pos].dl)
+                rrdlabels_walkthrough_read(rrdinstance_acquired_labels(qi->ria),
+                                           rrdlabels_traversal_cb_to_group_by_label_key, entries[pos].dl);
+        }
+
+        RRDR *r = rrdr_create(owa, qt, added, qt->window.points);
+        if (!r) {
+            internal_error(true,
+                           "QUERY: cannot create group by RRDR for %s, after=%ld, before=%ld, dimensions=%d, points=%zu",
+                           qt->id, qt->window.after, qt->window.before, added, qt->window.points);
+            goto cleanup;
+        }
+
+        bool hidden_dimension_on_percentage_of_instance = hidden_dimensions && (group_by & RRDR_GROUP_BY_PERCENTAGE_OF_INSTANCE);
+
+        // prevent double cleanup in case of error
+        added = 0;
+
+        if(!last_r)
+            first_r = last_r = r;
+        else
+            last_r->group_by.r = r;
+
+        last_r = r;
+
+        rrd2rrdr_set_timestamps(r);
+        r->dp = onewayalloc_callocz(owa, r->d, sizeof(*r->dp));
+        r->dview = onewayalloc_callocz(owa, r->d, sizeof(*r->dview));
+        r->dgbc = onewayalloc_callocz(owa, r->d, sizeof(*r->dgbc));
+        r->gbc = onewayalloc_callocz(owa, r->n * r->d, sizeof(*r->gbc));
+        r->dqp = onewayalloc_callocz(owa, r->d, sizeof(STORAGE_POINT));
+
+        if(hidden_dimension_on_percentage_of_instance)
+            // this is where we are going to group the hidden dimensions
+            r->vh = onewayalloc_mallocz(owa, r->n * r->d * sizeof(*r->vh));
+
+        if(!final_grouping)
+            // this is where we are going to store the slot in the next RRDR
+            // that we are going to group by the dimension of this RRDR
+            r->dgbs = onewayalloc_callocz(owa, r->d, sizeof(*r->dgbs));
+
+        if (label_keys) {
+            r->dl = onewayalloc_callocz(owa, r->d, sizeof(DICTIONARY *));
+            r->label_keys = label_keys;
+            label_keys = NULL;
+        }
+
+        // zero r (dimension options, names, and ids)
+        // this is required, because group-by may lead to empty dimensions
+        for (size_t d = 0; d < r->d; d++) {
+            r->di[d] = entries[d].id;
+            r->dn[d] = entries[d].name;
+
+            r->od[d] = entries[d].od;
+            r->du[d] = entries[d].units;
+            r->dp[d] = entries[d].priority;
+            r->dgbc[d] = entries[d].count;
+
+            if (r->dl)
+                r->dl[d] = entries[d].dl;
+        }
+
+        // initialize partial trimming
+        r->partial_data_trimming.max_update_every = update_every_max;
+        r->partial_data_trimming.expected_after =
+                (!(qt->window.options & RRDR_OPTION_RETURN_RAW) &&
+                 qt->window.before >= qt->window.now - update_every_max) ?
+                qt->window.before - update_every_max :
+                qt->window.before;
+        r->partial_data_trimming.trimmed_after = qt->window.before;
+
+        // make all values empty
+        for (size_t i = 0; i != r->n; i++) {
+            NETDATA_DOUBLE *cn = &r->v[i * r->d];
+            RRDR_VALUE_FLAGS *co = &r->o[i * r->d];
+            NETDATA_DOUBLE *ar = &r->ar[i * r->d];
+            NETDATA_DOUBLE *vh = r->vh ? &r->vh[i * r->d] : NULL;
+
+            for (size_t d = 0; d < r->d; d++) {
+                cn[d] = NAN;
+                ar[d] = 0.0;
+                co[d] = RRDR_VALUE_EMPTY;
+
+                if(vh)
+                    *vh = NAN;
+            }
+        }
+    }
+
+    if(!first_r || !last_r)
+        goto cleanup;
+
+    r_tmp = rrdr_create(owa, qt, 1, qt->window.points);
+    if (!r_tmp) {
+        internal_error(true,
+                       "QUERY: cannot create group by temporary RRDR for %s, after=%ld, before=%ld, dimensions=%d, points=%zu",
+                       qt->id, qt->window.after, qt->window.before, 1, qt->window.points);
+        goto cleanup;
+    }
+    rrd2rrdr_set_timestamps(r_tmp);
+    r_tmp->group_by.r = first_r;
+
+cleanup:
+    if(!first_r || !last_r || !r_tmp) {
+        if(r_tmp) {
+            r_tmp->group_by.r = NULL;
+            rrdr_free(owa, r_tmp);
+        }
+
+        if(first_r) {
+            RRDR *r = first_r;
+            while (r) {
+                r_tmp = r->group_by.r;
+                r->group_by.r = NULL;
+                rrdr_free(owa, r);
+                r = r_tmp;
+            }
+        }
+
+        if(entries && added) {
+            for (int d = 0; d < added; d++) {
+                string_freez(entries[d].id);
+                string_freez(entries[d].name);
+                string_freez(entries[d].units);
+                dictionary_destroy(entries[d].dl);
+            }
+        }
+        dictionary_destroy(label_keys);
+
+        first_r = last_r = r_tmp = NULL;
+    }
+
+    buffer_free(key);
+    onewayalloc_freez(owa, entries);
+    dictionary_destroy(groups);
+
+    return r_tmp;
+}
&r_dst->vh[ idx_dst ] : &r_dst->v[ idx_dst ]; + RRDR_VALUE_FLAGS *co = &r_dst->o[ idx_dst ]; + NETDATA_DOUBLE *ar = &r_dst->ar[ idx_dst ]; + uint32_t *gbc = &r_dst->gbc[ idx_dst ]; + + switch(group_by_aggregate_function) { + default: + case RRDR_GROUP_BY_FUNCTION_AVERAGE: + case RRDR_GROUP_BY_FUNCTION_SUM: + if(isnan(*cn)) + *cn = n_tmp; + else + *cn += n_tmp; + break; + + case RRDR_GROUP_BY_FUNCTION_MIN: + if(isnan(*cn) || n_tmp < *cn) + *cn = n_tmp; + break; + + case RRDR_GROUP_BY_FUNCTION_MAX: + if(isnan(*cn) || n_tmp > *cn) + *cn = n_tmp; + break; + } + + if(!hidden_dimension_on_percentage_of_instance) { + *co &= ~RRDR_VALUE_EMPTY; + *co |= (o_tmp & (RRDR_VALUE_RESET | RRDR_VALUE_PARTIAL)); + *ar += ar_tmp; + (*gbc)++; + } + } +} + +static void rrdr2rrdr_group_by_partial_trimming(RRDR *r) { + time_t trimmable_after = r->partial_data_trimming.expected_after; + + // find the point just before the trimmable ones + ssize_t i = (ssize_t)r->n - 1; + for( ; i >= 0 ;i--) { + if (r->t[i] < trimmable_after) + break; + } + + if(unlikely(i < 0)) + return; + + size_t last_row_gbc = 0; + for (; i < (ssize_t)r->n; i++) { + size_t row_gbc = 0; + for (size_t d = 0; d < r->d; d++) { + if (unlikely(!(r->od[d] & RRDR_DIMENSION_QUERIED))) + continue; + + row_gbc += r->gbc[ i * r->d + d ]; + } + + if (unlikely(r->t[i] >= trimmable_after && row_gbc < last_row_gbc)) { + // discard the rest of the points + r->partial_data_trimming.trimmed_after = r->t[i]; + r->rows = i; + break; + } + else + last_row_gbc = row_gbc; + } +} + +static void rrdr2rrdr_group_by_calculate_percentage_of_instance(RRDR *r) { + if(!r->vh) + return; + + for(size_t i = 0; i < r->n ;i++) { + NETDATA_DOUBLE *cn = &r->v[ i * r->d ]; + NETDATA_DOUBLE *ch = &r->vh[ i * r->d ]; + + for(size_t d = 0; d < r->d ;d++) { + NETDATA_DOUBLE n = cn[d]; + NETDATA_DOUBLE h = ch[d]; + + if(isnan(n)) + cn[d] = 0.0; + + else if(isnan(h)) + cn[d] = 100.0; + + else + cn[d] = n * 100.0 / (n + h); + } + } +} + +static void rrd2rrdr_convert_to_percentage(RRDR *r) { + size_t global_min_max_values = 0; + NETDATA_DOUBLE global_min = NAN, global_max = NAN; + + for(size_t i = 0; i != r->n ;i++) { + NETDATA_DOUBLE *cn = &r->v[ i * r->d ]; + RRDR_VALUE_FLAGS *co = &r->o[ i * r->d ]; + + NETDATA_DOUBLE total = 0; + for (size_t d = 0; d < r->d; d++) { + if (unlikely(!(r->od[d] & RRDR_DIMENSION_QUERIED))) + continue; + + if(co[d] & RRDR_VALUE_EMPTY) + continue; + + total += cn[d]; + } + + if(total == 0.0) + total = 1.0; + + for (size_t d = 0; d < r->d; d++) { + if (unlikely(!(r->od[d] & RRDR_DIMENSION_QUERIED))) + continue; + + if(co[d] & RRDR_VALUE_EMPTY) + continue; + + NETDATA_DOUBLE n = cn[d]; + n = cn[d] = n * 100.0 / total; + + if(unlikely(!global_min_max_values++)) + global_min = global_max = n; + else { + if(n < global_min) + global_min = n; + if(n > global_max) + global_max = n; + } + } + } + + r->view.min = global_min; + r->view.max = global_max; + + if(!r->dview) + // v1 query + return; + + // v2 query + + for (size_t d = 0; d < r->d; d++) { + if (unlikely(!(r->od[d] & RRDR_DIMENSION_QUERIED))) + continue; + + size_t count = 0; + NETDATA_DOUBLE min = 0.0, max = 0.0, sum = 0.0, ars = 0.0; + for(size_t i = 0; i != r->rows ;i++) { // we use r->rows to respect trimming + size_t idx = i * r->d + d; + + RRDR_VALUE_FLAGS o = r->o[ idx ]; + + if (o & RRDR_VALUE_EMPTY) + continue; + + NETDATA_DOUBLE ar = r->ar[ idx ]; + ars += ar; + + NETDATA_DOUBLE n = r->v[ idx ]; + sum += n; + + if(!count++) + min = max = n; + else { + if(n < min) + min = n; + if(n > max) + max = n; + 
+
+static void rrd2rrdr_convert_to_percentage(RRDR *r) {
+    size_t global_min_max_values = 0;
+    NETDATA_DOUBLE global_min = NAN, global_max = NAN;
+
+    for(size_t i = 0; i != r->n ;i++) {
+        NETDATA_DOUBLE *cn = &r->v[ i * r->d ];
+        RRDR_VALUE_FLAGS *co = &r->o[ i * r->d ];
+
+        NETDATA_DOUBLE total = 0;
+        for (size_t d = 0; d < r->d; d++) {
+            if (unlikely(!(r->od[d] & RRDR_DIMENSION_QUERIED)))
+                continue;
+
+            if(co[d] & RRDR_VALUE_EMPTY)
+                continue;
+
+            total += cn[d];
+        }
+
+        if(total == 0.0)
+            total = 1.0;
+
+        for (size_t d = 0; d < r->d; d++) {
+            if (unlikely(!(r->od[d] & RRDR_DIMENSION_QUERIED)))
+                continue;
+
+            if(co[d] & RRDR_VALUE_EMPTY)
+                continue;
+
+            NETDATA_DOUBLE n = cn[d];
+            n = cn[d] = n * 100.0 / total;
+
+            if(unlikely(!global_min_max_values++))
+                global_min = global_max = n;
+            else {
+                if(n < global_min)
+                    global_min = n;
+                if(n > global_max)
+                    global_max = n;
+            }
+        }
+    }
+
+    r->view.min = global_min;
+    r->view.max = global_max;
+
+    if(!r->dview)
+        // v1 query
+        return;
+
+    // v2 query
+
+    for (size_t d = 0; d < r->d; d++) {
+        if (unlikely(!(r->od[d] & RRDR_DIMENSION_QUERIED)))
+            continue;
+
+        size_t count = 0;
+        NETDATA_DOUBLE min = 0.0, max = 0.0, sum = 0.0, ars = 0.0;
+        for(size_t i = 0; i != r->rows ;i++) { // we use r->rows to respect trimming
+            size_t idx = i * r->d + d;
+
+            RRDR_VALUE_FLAGS o = r->o[ idx ];
+
+            if (o & RRDR_VALUE_EMPTY)
+                continue;
+
+            NETDATA_DOUBLE ar = r->ar[ idx ];
+            ars += ar;
+
+            NETDATA_DOUBLE n = r->v[ idx ];
+            sum += n;
+
+            if(!count++)
+                min = max = n;
+            else {
+                if(n < min)
+                    min = n;
+                if(n > max)
+                    max = n;
+            }
+        }
+
+        r->dview[d] = (STORAGE_POINT) {
+            .sum = sum,
+            .count = count,
+            .min = min,
+            .max = max,
+            .anomaly_count = (size_t)(ars * (NETDATA_DOUBLE)count),
+        };
+    }
+}
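rrd2rrdr_convert_to_percentage() normalizes each row to its total. Its per-row step, extracted (simplified: the real code also skips empty and non-queried cells, and the total is forced to 1.0 when zero so empty rows do not divide by zero):

    #include <stddef.h>

    static void row_to_percentage(double *row, size_t d) {
        double total = 0.0;
        for(size_t i = 0; i < d; i++) total += row[i];
        if(total == 0.0) total = 1.0;
        for(size_t i = 0; i < d; i++) row[i] = row[i] * 100.0 / total;
    }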
+
+static RRDR *rrd2rrdr_group_by_finalize(RRDR *r_tmp) {
+    QUERY_TARGET *qt = r_tmp->internal.qt;
+    RRDR_OPTIONS options = qt->window.options;
+
+    if(!r_tmp->group_by.r) {
+        // v1 query
+        if(options & RRDR_OPTION_PERCENTAGE)
+            rrd2rrdr_convert_to_percentage(r_tmp);
+        return r_tmp;
+    }
+    // v2 query
+
+    // do the additional passes on RRDRs
+    RRDR *last_r = r_tmp->group_by.r;
+    rrdr2rrdr_group_by_calculate_percentage_of_instance(last_r);
+
+    RRDR *r = last_r->group_by.r;
+    size_t pass = 0;
+    while(r) {
+        pass++;
+        for(size_t d = 0; d < last_r->d ;d++) {
+            rrd2rrdr_group_by_add_metric(r, last_r->dgbs[d], last_r, d,
+                                         qt->request.group_by[pass].aggregation,
+                                         &last_r->dqp[d], pass);
+        }
+        rrdr2rrdr_group_by_calculate_percentage_of_instance(r);
+
+        last_r = r;
+        r = last_r->group_by.r;
+    }
+
+    // free all RRDRs except the last one
+    r = r_tmp;
+    while(r != last_r) {
+        r_tmp = r->group_by.r;
+        r->group_by.r = NULL;
+        rrdr_free(r->internal.owa, r);
+        r = r_tmp;
+    }
+    r = last_r;
+
+    // find the final aggregation
+    RRDR_GROUP_BY_FUNCTION aggregation = qt->request.group_by[0].aggregation;
+    for(size_t g = 0; g < MAX_QUERY_GROUP_BY_PASSES ;g++)
+        if(qt->request.group_by[g].group_by != RRDR_GROUP_BY_NONE)
+            aggregation = qt->request.group_by[g].aggregation;
+
+    if(!(options & RRDR_OPTION_RETURN_RAW) && r->partial_data_trimming.expected_after < qt->window.before)
+        rrdr2rrdr_group_by_partial_trimming(r);
+
+    // apply averaging, remove RRDR_VALUE_EMPTY, find the non-zero dimensions, min and max
+    size_t global_min_max_values = 0;
+    size_t dimensions_nonzero = 0;
+    NETDATA_DOUBLE global_min = NAN, global_max = NAN;
+    for (size_t d = 0; d < r->d; d++) {
+        if (unlikely(!(r->od[d] & RRDR_DIMENSION_QUERIED)))
+            continue;
+
+        size_t points_nonzero = 0;
+        NETDATA_DOUBLE min = 0, max = 0, sum = 0, ars = 0;
+        size_t count = 0;
+
+        for(size_t i = 0; i != r->n ;i++) {
+            size_t idx = i * r->d + d;
+
+            NETDATA_DOUBLE *cn = &r->v[ idx ];
+            RRDR_VALUE_FLAGS *co = &r->o[ idx ];
+            NETDATA_DOUBLE *ar = &r->ar[ idx ];
+            uint32_t gbc = r->gbc[ idx ];
+
+            if(likely(gbc)) {
+                *co &= ~RRDR_VALUE_EMPTY;
+
+                if(gbc != r->dgbc[d])
+                    *co |= RRDR_VALUE_PARTIAL;
+
+                NETDATA_DOUBLE n;
+
+                sum += *cn;
+                ars += *ar;
+
+                if(aggregation == RRDR_GROUP_BY_FUNCTION_AVERAGE && !query_target_aggregatable(qt))
+                    n = (*cn /= gbc);
+                else
+                    n = *cn;
+
+                if(!query_target_aggregatable(qt))
+                    *ar /= gbc;
+
+                if(islessgreater(n, 0.0))
+                    points_nonzero++;
+
+                if(unlikely(!count))
+                    min = max = n;
+                else {
+                    if(n < min)
+                        min = n;
+
+                    if(n > max)
+                        max = n;
+                }
+
+                if(unlikely(!global_min_max_values++))
+                    global_min = global_max = n;
+                else {
+                    if(n < global_min)
+                        global_min = n;
+
+                    if(n > global_max)
+                        global_max = n;
+                }
+
+                count += gbc;
+            }
+        }
+
+        if(points_nonzero) {
+            r->od[d] |= RRDR_DIMENSION_NONZERO;
+            dimensions_nonzero++;
+        }
+
+        r->dview[d] = (STORAGE_POINT) {
+            .sum = sum,
+            .count = count,
+            .min = min,
+            .max = max,
+            .anomaly_count = (size_t)(ars * RRDR_DVIEW_ANOMALY_COUNT_MULTIPLIER / 100.0),
+        };
+    }
+
+    r->view.min = global_min;
+    r->view.max = global_max;
+
+    if(!dimensions_nonzero && (qt->window.options & RRDR_OPTION_NONZERO)) {
+        // all dimensions are zero
+        // remove the nonzero option
+        qt->window.options &= ~RRDR_OPTION_NONZERO;
+    }
+
+    if(options & RRDR_OPTION_PERCENTAGE && !(options & RRDR_OPTION_RETURN_RAW))
+        rrd2rrdr_convert_to_percentage(r);
+
+    // update query instance counts in query host and query context
+    {
+        size_t h = 0, c = 0, i = 0;
+        for(; h < qt->nodes.used ; h++) {
+            QUERY_NODE *qn = &qt->nodes.array[h];
+
+            for(; c < qt->contexts.used ;c++) {
+                QUERY_CONTEXT *qc = &qt->contexts.array[c];
+
+                if(!rrdcontext_acquired_belongs_to_host(qc->rca, qn->rrdhost))
+                    break;
+
+                for(; i < qt->instances.used ;i++) {
+                    QUERY_INSTANCE *qi = &qt->instances.array[i];
+
+                    if(!rrdinstance_acquired_belongs_to_context(qi->ria, qc->rca))
+                        break;
+
+                    if(qi->metrics.queried) {
+                        qc->instances.queried++;
+                        qn->instances.queried++;
+                    }
+                    else if(qi->metrics.failed) {
+                        qc->instances.failed++;
+                        qn->instances.failed++;
+                    }
+                }
+            }
+        }
+    }
+
+    return r;
+}
+
+// ----------------------------------------------------------------------------
+// query entry point
+
 RRDR *rrd2rrdr_legacy(
         ONEWAYALLOC *owa,
         RRDSET *st, size_t points, time_t after, time_t before,
-        RRDR_GROUPING group_method, time_t resampling_time, RRDR_OPTIONS options, const char *dimensions,
-        const char *group_options, time_t timeout, size_t tier, QUERY_SOURCE query_source,
+        RRDR_TIME_GROUPING group_method, time_t resampling_time, RRDR_OPTIONS options, const char *dimensions,
+        const char *group_options, time_t timeout_ms, size_t tier, QUERY_SOURCE query_source,
         STORAGE_PRIORITY priority) {
 
     QUERY_TARGET_REQUEST qtr = {
+            .version = 1,
             .st = st,
             .points = points,
             .after = after,
             .before = before,
-            .group_method = group_method,
+            .time_group_method = group_method,
             .resampling_time = resampling_time,
             .options = options,
             .dimensions = dimensions,
-            .group_options = group_options,
-            .timeout = timeout,
+            .time_group_options = group_options,
+            .timeout_ms = timeout_ms,
             .tier = tier,
             .query_source = query_source,
             .priority = priority,
     };
 
-    return rrd2rrdr(owa, query_target_create(&qtr));
+    QUERY_TARGET *qt = query_target_create(&qtr);
+    RRDR *r = rrd2rrdr(owa, qt);
+    if(!r) {
+        query_target_release(qt);
+        return NULL;
+    }
+
+    r->internal.release_with_rrdr_qt = qt;
+    return r;
 }
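Usage sketch for the v1 entry point above; the helper names come from other parts of netdata and the argument values are illustrative, so treat this as an assumption-laden example rather than a canonical call site:

    // given an RRDSET *st already looked up by the caller:
    // last 10 minutes, 600 points, averaged
    ONEWAYALLOC *owa = onewayalloc_create(0);
    RRDR *r = rrd2rrdr_legacy(owa, st, 600, -600, 0,
                              RRDR_GROUPING_AVERAGE, 0, RRDR_OPTION_NATURAL_POINTS,
                              NULL, NULL, 0, 0,
                              QUERY_SOURCE_API_DATA, STORAGE_PRIORITY_NORMAL);
    if(r)
        rrdr_free(owa, r);
    onewayalloc_destroy(owa);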
 
 RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
-    if(!qt)
+    if(!qt || !owa)
         return NULL;
 
-    if(!owa) {
-        query_target_release(qt);
-        return NULL;
-    }
-
     // qt.window members are the WANTED ones.
     // qt.request members are the REQUESTED ones.
 
-    RRDR *r = rrdr_create(owa, qt);
-    if(unlikely(!r)) {
-        internal_error(true, "QUERY: cannot create RRDR for %s, after=%ld, before=%ld, points=%zu",
-                       qt->id, qt->window.after, qt->window.before, qt->window.points);
+    RRDR *r_tmp = rrd2rrdr_group_by_initialize(owa, qt);
+    if(!r_tmp)
         return NULL;
-    }
 
-    if(unlikely(!r->d || !qt->window.points)) {
-        internal_error(true, "QUERY: returning empty RRDR (no dimensions in RRDSET) for %s, after=%ld, before=%ld, points=%zu",
-                       qt->id, qt->window.after, qt->window.before, qt->window.points);
-        return r;
-    }
+    // the RRDR we group-by at
+    RRDR *r = (r_tmp->group_by.r) ? r_tmp->group_by.r : r_tmp;
+
+    // the final RRDR to return to callers
+    RRDR *last_r = r_tmp;
+    while(last_r->group_by.r)
+        last_r = last_r->group_by.r;
 
     if(qt->window.relative)
-        r->result_options |= RRDR_RESULT_OPTION_RELATIVE;
+        last_r->view.flags |= RRDR_RESULT_FLAG_RELATIVE;
     else
-        r->result_options |= RRDR_RESULT_OPTION_ABSOLUTE;
-
-    // -------------------------------------------------------------------------
-    // initialize RRDR
-
-    r->group = qt->window.group;
-    r->update_every = (int) (qt->window.group * qt->window.query_granularity);
-    r->before = qt->window.before;
-    r->after = qt->window.after;
-    r->internal.points_wanted = qt->window.points;
-    r->internal.resampling_group = qt->window.resampling_group;
-    r->internal.resampling_divisor = qt->window.resampling_divisor;
-    r->internal.query_options = qt->window.options;
+        last_r->view.flags |= RRDR_RESULT_FLAG_ABSOLUTE;
 
     // -------------------------------------------------------------------------
     // assign the processor functions
-    rrdr_set_grouping_function(r, qt->window.group_method);
+    rrdr_set_grouping_function(r_tmp, qt->window.time_group_method);
 
     // allocate any memory required by the grouping method
-    r->internal.grouping_create(r, qt->window.group_options);
+    r_tmp->time_grouping.create(r_tmp, qt->window.time_group_options);
 
     // -------------------------------------------------------------------------
     // do the work for each dimension
 
@@ -2239,122 +3529,207 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
     size_t max_rows = 0;
 
     long dimensions_used = 0, dimensions_nonzero = 0;
-    struct timeval query_start_time;
-    struct timeval query_current_time;
-    if (qt->request.timeout)
-        now_realtime_timeval(&query_start_time);
-
     size_t last_db_points_read = 0;
     size_t last_result_points_generated = 0;
 
-    QUERY_ENGINE_OPS **ops = onewayalloc_callocz(r->internal.owa, qt->query.used, sizeof(QUERY_ENGINE_OPS *));
+    internal_fatal(released_ops, "QUERY: released_ops should be NULL when the query starts");
 
-    size_t capacity = libuv_worker_threads * 2;
+    QUERY_ENGINE_OPS **ops = NULL;
+    if(qt->query.used)
+        ops = onewayalloc_callocz(owa, qt->query.used, sizeof(QUERY_ENGINE_OPS *));
+
+    size_t capacity = libuv_worker_threads * 10;
     size_t max_queries_to_prepare = (qt->query.used > (capacity - 1)) ? (capacity - 1) : qt->query.used;
     size_t queries_prepared = 0;
     while(queries_prepared < max_queries_to_prepare) {
         // preload another query
-        ops[queries_prepared] = rrd2rrdr_query_prep(r, queries_prepared);
+        ops[queries_prepared] = rrd2rrdr_query_ops_prep(r_tmp, queries_prepared);
         queries_prepared++;
     }
 
-    for(size_t c = 0, max = qt->query.used; c < max ; c++) {
+    QUERY_NODE *last_qn = NULL;
+    usec_t last_ut = now_monotonic_usec();
+    usec_t last_qn_ut = last_ut;
+
+    for(size_t d = 0; d < qt->query.used ; d++) {
+        QUERY_METRIC *qm = query_metric(qt, d);
+        QUERY_DIMENSION *qd = query_dimension(qt, qm->link.query_dimension_id);
+        QUERY_INSTANCE *qi = query_instance(qt, qm->link.query_instance_id);
+        QUERY_CONTEXT *qc = query_context(qt, qm->link.query_context_id);
+        QUERY_NODE *qn = query_node(qt, qm->link.query_node_id);
 
-        if(queries_prepared < max) {
+        usec_t now_ut = last_ut;
+        if(qn != last_qn) {
+            if(last_qn)
+                last_qn->duration_ut = now_ut - last_qn_ut;
+
+            last_qn = qn;
+            last_qn_ut = now_ut;
+        }
+
+        if(queries_prepared < qt->query.used) {
             // preload another query
-            ops[queries_prepared] = rrd2rrdr_query_prep(r, queries_prepared);
+            ops[queries_prepared] = rrd2rrdr_query_ops_prep(r_tmp, queries_prepared);
             queries_prepared++;
         }
 
+        size_t dim_in_rrdr_tmp = (r_tmp != r) ? 0 : d;
+
         // set the query target dimension options to rrdr
-        r->od[c] = qt->query.array[c].dimension.options;
+        r_tmp->od[dim_in_rrdr_tmp] = qm->status;
 
         // reset the grouping for the new dimension
-        r->internal.grouping_reset(r);
+        r_tmp->time_grouping.reset(r_tmp);
 
-        if(ops[c]) {
-            r->od[c] |= RRDR_DIMENSION_QUERIED;
-            rrd2rrdr_query_execute(r, c, ops[c]);
+        if(ops[d]) {
+            rrd2rrdr_query_execute(r_tmp, dim_in_rrdr_tmp, ops[d]);
+            r_tmp->od[dim_in_rrdr_tmp] |= RRDR_DIMENSION_QUERIED;
+
+            now_ut = now_monotonic_usec();
+            qm->duration_ut = now_ut - last_ut;
+            last_ut = now_ut;
+
+            if(r_tmp != r) {
+                // copy back whatever got updated from the temporary r
+
+                // the query updates RRDR_DIMENSION_NONZERO
+                qm->status = r_tmp->od[dim_in_rrdr_tmp];
+
+                // the query updates these
+                r->view.min = r_tmp->view.min;
+                r->view.max = r_tmp->view.max;
+                r->view.after = r_tmp->view.after;
+                r->view.before = r_tmp->view.before;
+                r->rows = r_tmp->rows;
+
+                rrd2rrdr_group_by_add_metric(r, qm->grouped_as.first_slot, r_tmp, dim_in_rrdr_tmp,
+                                             qt->request.group_by[0].aggregation, &qm->query_points, 0);
+            }
+
+            rrd2rrdr_query_ops_release(ops[d]); // reuse this ops allocation
+            ops[d] = NULL;
+
+            qi->metrics.queried++;
+            qc->metrics.queried++;
+            qn->metrics.queried++;
+
+            qd->status |= QUERY_STATUS_QUERIED;
+            qm->status |= RRDR_DIMENSION_QUERIED;
+
+            if(qt->request.version >= 2) {
+                // we need to make the query points positive now
+                // since we will aggregate it across multiple dimensions
+                storage_point_make_positive(qm->query_points);
+                storage_point_merge_to(qi->query_points, qm->query_points);
+                storage_point_merge_to(qc->query_points, qm->query_points);
+                storage_point_merge_to(qn->query_points, qm->query_points);
+                storage_point_merge_to(qt->query_points, qm->query_points);
+            }
        }
-        else
+        else {
+            qi->metrics.failed++;
+            qc->metrics.failed++;
+            qn->metrics.failed++;
+
+            qd->status |= QUERY_STATUS_FAILED;
+            qm->status |= RRDR_DIMENSION_FAILED;
             continue;
+        }
 
        global_statistics_rrdr_query_completed(
                1,
-               r->internal.db_points_read - last_db_points_read,
-               r->internal.result_points_generated - last_result_points_generated,
+               r_tmp->stats.db_points_read - last_db_points_read,
+               r_tmp->stats.result_points_generated - last_result_points_generated,
               qt->request.query_source);
 
-       last_db_points_read = r->internal.db_points_read;
-       last_result_points_generated = r->internal.result_points_generated;
-
-       if (qt->request.timeout)
-           now_realtime_timeval(&query_current_time);
+       last_db_points_read = r_tmp->stats.db_points_read;
+       last_result_points_generated = r_tmp->stats.result_points_generated;
 
-       if(r->od[c] & RRDR_DIMENSION_NONZERO)
+       if(qm->status & RRDR_DIMENSION_NONZERO)
           dimensions_nonzero++;
 
       // verify all dimensions are aligned
      if(unlikely(!dimensions_used)) {
-         min_before = r->before;
-         max_after = r->after;
+         min_before = r->view.before;
+         max_after = r->view.after;
         max_rows = r->rows;
     }
     else {
-        if(r->after != max_after) {
+        if(r->view.after != max_after) {
           internal_error(true, "QUERY: 'after' mismatch between dimensions for chart '%s': max is %zu, dimension '%s' has %zu",
-                         string2str(qt->query.array[c].dimension.id), (size_t)max_after, string2str(qt->query.array[c].dimension.name), (size_t)r->after);
+                         rrdinstance_acquired_id(qi->ria), (size_t)max_after, rrdmetric_acquired_id(qd->rma), (size_t)r->view.after);
 
-          r->after = (r->after > max_after) ? r->after : max_after;
+          r->view.after = (r->view.after > max_after) ? r->view.after : max_after;
       }
 
-        if(r->before != min_before) {
+        if(r->view.before != min_before) {
           internal_error(true, "QUERY: 'before' mismatch between dimensions for chart '%s': max is %zu, dimension '%s' has %zu",
-                         string2str(qt->query.array[c].dimension.id), (size_t)min_before, string2str(qt->query.array[c].dimension.name), (size_t)r->before);
+                         rrdinstance_acquired_id(qi->ria), (size_t)min_before, rrdmetric_acquired_id(qd->rma), (size_t)r->view.before);
 
-          r->before = (r->before < min_before) ? r->before : min_before;
+          r->view.before = (r->view.before < min_before) ? r->view.before : min_before;
       }
 
        if(r->rows != max_rows) {
          internal_error(true, "QUERY: 'rows' mismatch between dimensions for chart '%s': max is %zu, dimension '%s' has %zu",
-                        string2str(qt->query.array[c].dimension.id), (size_t)max_rows, string2str(qt->query.array[c].dimension.name), (size_t)r->rows);
+                        rrdinstance_acquired_id(qi->ria), (size_t)max_rows, rrdmetric_acquired_id(qd->rma), (size_t)r->rows);
 
          r->rows = (r->rows > max_rows) ? r->rows : max_rows;
       }
    }
 
    dimensions_used++;
-   if (qt->request.timeout && ((NETDATA_DOUBLE)dt_usec(&query_start_time, &query_current_time) / 1000.0) > (NETDATA_DOUBLE)qt->request.timeout) {
+
+   bool cancel = false;
+   if (qt->request.interrupt_callback && qt->request.interrupt_callback(qt->request.interrupt_callback_data)) {
+       cancel = true;
+       log_access("QUERY INTERRUPTED");
+   }
+
+   if (qt->request.timeout_ms && ((NETDATA_DOUBLE)(now_ut - qt->timings.received_ut) / 1000.0) > (NETDATA_DOUBLE)qt->request.timeout_ms) {
+       cancel = true;
       log_access("QUERY CANCELED RUNTIME EXCEEDED %0.2f ms (LIMIT %lld ms)",
-                 (NETDATA_DOUBLE)dt_usec(&query_start_time, &query_current_time) / 1000.0, (long long)qt->request.timeout);
-      r->result_options |= RRDR_RESULT_OPTION_CANCEL;
+                 (NETDATA_DOUBLE)(now_ut - qt->timings.received_ut) / 1000.0, (long long)qt->request.timeout_ms);
+   }
 
-      for(size_t i = c + 1; i < queries_prepared ; i++) {
-          if(ops[i])
+   if(cancel) {
+       r->view.flags |= RRDR_RESULT_FLAG_CANCEL;
+
+       for(size_t i = d + 1; i < queries_prepared ; i++) {
+           if(ops[i]) {
              query_planer_finalize_remaining_plans(ops[i]);
+             rrd2rrdr_query_ops_release(ops[i]);
+             ops[i] = NULL;
+           }
      }
 
      break;
    }
  }
 
+    // free all resources used by the grouping method
+    r_tmp->time_grouping.free(r_tmp);
+
+    // get the final RRDR to send to the caller
+    r = rrd2rrdr_group_by_finalize(r_tmp);
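The cancellation logic above consolidates two triggers, a caller-provided interrupt callback and a wall-clock budget in milliseconds. A compact stand-in showing the same checks (types simplified, not the real QUERY_TARGET layout):

    #include <stdbool.h>

    typedef bool (*interrupt_cb)(void *data);

    static bool should_cancel(interrupt_cb cb, void *data,
                              unsigned long long now_ut, unsigned long long received_ut,
                              long long timeout_ms) {
        if(cb && cb(data)) return true;                   // QUERY INTERRUPTED
        if(timeout_ms && (double)(now_ut - received_ut) / 1000.0 > (double)timeout_ms)
            return true;                                  // RUNTIME EXCEEDED
        return false;
    }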
 
 #ifdef NETDATA_INTERNAL_CHECKS
-    if (dimensions_used) {
+    if (dimensions_used && !(r->view.flags & RRDR_RESULT_FLAG_CANCEL)) {
         if(r->internal.log)
-            rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
+            rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.time_group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
                                                    qt->window.after, qt->request.after, qt->window.before, qt->request.before,
                                                    qt->request.points, qt->window.points, /*after_slot, before_slot,*/
                                                    r->internal.log);
 
         if(r->rows != qt->window.points)
-            rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
+            rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.time_group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
                                                    qt->window.after, qt->request.after, qt->window.before, qt->request.before,
                                                    qt->request.points, qt->window.points, /*after_slot, before_slot,*/
                                                    "got 'points' is not wanted 'points'");
 
-        if(qt->window.aligned && (r->before % (qt->window.group * qt->window.query_granularity)) != 0)
-            rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
-                                                   qt->window.after, qt->request.after, qt->window.before,qt->request.before,
+        if(qt->window.aligned && (r->view.before % query_view_update_every(qt)) != 0)
+            rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.time_group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
+                                                   qt->window.after, qt->request.after, qt->window.before, qt->request.before,
                                                    qt->request.points, qt->window.points, /*after_slot, before_slot,*/
                                                    "'before' is not aligned but alignment is required");
 
@@ -2362,21 +3737,21 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
 //        if(qt->window.aligned && (r->after % group) != 0)
 //            rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group, qt->window.after, after_requested, before_wanted, before_requested, points_requested, points_wanted, after_slot, before_slot, "'after' is not aligned but alignment is required");
 
-        if(r->before != qt->window.before)
-            rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
+        if(r->view.before != qt->window.before)
+            rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.time_group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
                                                    qt->window.after, qt->request.after, qt->window.before, qt->request.before,
                                                    qt->request.points, qt->window.points, /*after_slot, before_slot,*/
                                                    "chart is not aligned to requested 'before'");
 
-        if(r->before != qt->window.before)
-            rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
+        if(r->view.before != qt->window.before)
+            rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.time_group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
                                                    qt->window.after, qt->request.after, qt->window.before, qt->request.before,
                                                    qt->request.points, qt->window.points, /*after_slot, before_slot,*/
                                                    "got 'before' is not wanted 'before'");
 
         // reported 'after' varies, depending on group
-        if(r->after != qt->window.after)
-            rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
+        if(r->view.after != qt->window.after)
+            rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.time_group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
                                                    qt->window.after, qt->request.after, qt->window.before, qt->request.before,
                                                    qt->request.points, qt->window.points, /*after_slot, before_slot,*/
                                                    "got 'after' is not wanted 'after'");
 
@@ -2384,26 +3759,21 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
     }
 #endif
 
-    // free all resources used by the grouping method
-    r->internal.grouping_free(r);
+    // free the query pipelining ops
+    for(size_t d = 0; d < qt->query.used ; d++) {
+        rrd2rrdr_query_ops_release(ops[d]);
+        ops[d] = NULL;
+    }
+    rrd2rrdr_query_ops_freeall(r);
+    internal_fatal(released_ops, "QUERY: released_ops should be NULL when the query ends");
 
-    if(likely(dimensions_used)) {
+    onewayalloc_freez(owa, ops);
+
+    if(likely(dimensions_used && (qt->window.options & RRDR_OPTION_NONZERO) && !dimensions_nonzero))
         // when all the dimensions are zero, we should return all of them
-        if (unlikely((qt->window.options & RRDR_OPTION_NONZERO) && !dimensions_nonzero &&
-                     !(r->result_options & RRDR_RESULT_OPTION_CANCEL))) {
-            // all the dimensions are zero
-            // mark them as NONZERO to send them all
-            for (size_t c = 0, max = qt->query.used; c < max; c++) {
-                if (unlikely(r->od[c] & RRDR_DIMENSION_HIDDEN)) continue;
-                if (unlikely(!(r->od[c] & RRDR_DIMENSION_QUERIED))) continue;
-                r->od[c] |= RRDR_DIMENSION_NONZERO;
-            }
-        }
+        qt->window.options &= ~RRDR_OPTION_NONZERO;
 
-        return r;
-    }
+    qt->timings.executed_ut = now_monotonic_usec();
 
-    // we couldn't query any dimension
-    rrdr_free(owa, r);
-    return NULL;
+    return r;
 }