diff options
Diffstat (limited to 'fluent-bit/src/stream_processor/flb_sp_aggregate_func.c')
-rw-r--r-- | fluent-bit/src/stream_processor/flb_sp_aggregate_func.c | 364 |
1 files changed, 364 insertions, 0 deletions
diff --git a/fluent-bit/src/stream_processor/flb_sp_aggregate_func.c b/fluent-bit/src/stream_processor/flb_sp_aggregate_func.c new file mode 100644 index 000000000..624be878c --- /dev/null +++ b/fluent-bit/src/stream_processor/flb_sp_aggregate_func.c @@ -0,0 +1,364 @@ +#include <fluent-bit/stream_processor/flb_sp.h> +#include <fluent-bit/stream_processor/flb_sp_parser.h> +#include <fluent-bit/stream_processor/flb_sp_aggregate_func.h> + +char aggregate_func_string[AGGREGATE_FUNCTIONS][sizeof("TIMESERIES_FORECAST") + 1] = { + "AVG", + "SUM", + "COUNT", + "MIN", + "MAX", + "TIMESERIES_FORECAST" +}; + +int aggregate_func_clone_nop(struct aggregate_node *aggr_node, + struct aggregate_node *aggr_node_prev, + struct flb_sp_cmd_key *ckey, + int key_id) { + return 0; +} + +int aggregate_func_clone_timeseries_forecast(struct aggregate_node *aggr_node_clone, + struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + int key_id) { + struct timeseries_forecast *forecast_clone; + struct timeseries_forecast *forecast; + + forecast_clone = (struct timeseries_forecast *) aggr_node_clone->aggregate_data[key_id]; + if (!forecast_clone) { + forecast_clone = (struct timeseries_forecast *) flb_calloc(1, sizeof(struct timeseries_forecast)); + if (!forecast_clone) { + return -1; + } + + forecast_clone->future_time = ckey->constant; + aggr_node_clone->aggregate_data[key_id] = (struct aggregate_data *) forecast_clone; + } + + forecast = (struct timeseries_forecast *) aggr_node->aggregate_data[key_id]; + + forecast_clone->sigma_x = forecast->sigma_x; + forecast_clone->sigma_y = forecast->sigma_y; + forecast_clone->sigma_xy = forecast->sigma_xy; + forecast_clone->sigma_x2 = forecast->sigma_x2; + + return 0; +} + +/* Summarize a value into the temporary array considering data type */ +void aggregate_func_add_sum(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + int key_id, + struct flb_time *tms, + int64_t ival, double dval) { + if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) { + aggr_node->nums[key_id].i64 += ival; + aggr_node->nums[key_id].ops++; + } + else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) { + if (dval != 0.0) { + aggr_node->nums[key_id].f64 += dval; + } + else { + aggr_node->nums[key_id].f64 += (double) ival; + } + aggr_node->nums[key_id].ops++; + } +} + +void aggregate_func_add_count(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + int key_id, + struct flb_time *tms, + int64_t ival, double dval) { +} + +/* Calculate the minimum value considering data type */ +void aggregate_func_add_min(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + int key_id, + struct flb_time *tms, + int64_t ival, double dval) { + + if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) { + if (aggr_node->nums[key_id].ops == 0) { + aggr_node->nums[key_id].i64 = ival; + aggr_node->nums[key_id].ops++; + } + else { + if (aggr_node->nums[key_id].i64 > ival) { + aggr_node->nums[key_id].i64 = ival; + aggr_node->nums[key_id].ops++; + } + } + } + else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) { + if (dval != 0.0) { + if (aggr_node->nums[key_id].ops == 0) { + aggr_node->nums[key_id].f64 = dval; + aggr_node->nums[key_id].ops++; + } + else { + if (aggr_node->nums[key_id].f64 > dval) { + aggr_node->nums[key_id].f64 = dval; + aggr_node->nums[key_id].ops++; + } + } + } + else { + if (aggr_node->nums[key_id].ops == 0) { + aggr_node->nums[key_id].f64 = (double) ival; + aggr_node->nums[key_id].ops++; + } + else { + if (aggr_node->nums[key_id].f64 > (double) ival) { + aggr_node->nums[key_id].f64 = ival; + aggr_node->nums[key_id].ops++; + } + } + } + } +} + +/* Calculate the maximum value considering data type */ +void aggregate_func_add_max(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + int key_id, + struct flb_time *tms, + int64_t ival, double dval) { + if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) { + if (aggr_node->nums[key_id].ops == 0) { + aggr_node->nums[key_id].i64 = ival; + aggr_node->nums[key_id].ops++; + } + else { + if (aggr_node->nums[key_id].i64 < ival) { + aggr_node->nums[key_id].i64 = ival; + aggr_node->nums[key_id].ops++; + } + } + } + else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) { + if (dval != 0.0) { + if (aggr_node->nums[key_id].ops == 0) { + aggr_node->nums[key_id].f64 = dval; + aggr_node->nums[key_id].ops++; + } + else { + if (aggr_node->nums[key_id].f64 < dval) { + aggr_node->nums[key_id].f64 = dval; + aggr_node->nums[key_id].ops++; + } + } + } + else { + if (aggr_node->nums[key_id].ops == 0) { + aggr_node->nums[key_id].f64 = (double) ival; + aggr_node->nums[key_id].ops++; + } + else { + if (aggr_node->nums[key_id].f64 < (double) ival) { + aggr_node->nums[key_id].f64 = (double) ival; + aggr_node->nums[key_id].ops++; + } + } + } + } +} + +void aggregate_func_calc_avg(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + msgpack_packer *mp_pck, + int key_id) { + double dval = 0.0; + /* average = sum(values) / records */ + if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) { + dval = (double) aggr_node->nums[key_id].i64 / aggr_node->records; + } + else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) { + dval = (double) aggr_node->nums[key_id].f64 / aggr_node->records; + } + + msgpack_pack_float(mp_pck, dval); +} + +void aggregate_func_calc_sum(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + msgpack_packer *mp_pck, + int key_id) { + /* pack result stored in nums[key_id] */ + if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) { + msgpack_pack_int64(mp_pck, aggr_node->nums[key_id].i64); + } + else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) { + msgpack_pack_float(mp_pck, aggr_node->nums[key_id].f64); + } +} + +void aggregate_func_calc_count(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + msgpack_packer *mp_pck, + int key_id) { + /* number of records in total */ + msgpack_pack_int64(mp_pck, aggr_node->records); +} + +void aggregate_func_remove_sum(struct aggregate_node *aggr_node, + struct aggregate_node *aggr_node_prev, + int key_id) { + if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) { + aggr_node->nums[key_id].i64 -= aggr_node_prev->nums[key_id].i64; + } + else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) { + aggr_node->nums[key_id].f64 -= aggr_node_prev->nums[key_id].f64; + } +} + +void aggregate_func_remove_nop(struct aggregate_node *aggr_node, + struct aggregate_node *aggr_node_prev, + int key_id) { +} + +void aggregate_func_add_timeseries_forecast(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + int key_id, + struct flb_time *tms, + int64_t ival, double dval) +{ + double x; + double y; + struct timeseries_forecast *forecast; + + forecast = (struct timeseries_forecast *) aggr_node->aggregate_data[key_id]; + if (!forecast) { + forecast = (struct timeseries_forecast *) flb_calloc(1, sizeof(struct timeseries_forecast)); + /* fixme: return if error */ + + forecast->future_time = ckey->constant; + aggr_node->aggregate_data[key_id] = (struct aggregate_data *) forecast; + } + + if (!forecast->offset) { + forecast->offset = flb_time_to_double(tms); + } + + x = flb_time_to_double(tms) - forecast->offset; + + forecast->latest_x = x; + + if (ival) { + y = (double) ival; + } + else { + y = dval; + } + + forecast->sigma_x += x; + forecast->sigma_y += y; + + forecast->sigma_xy += x * y; + forecast->sigma_x2 += x * x; +} + +void aggregate_func_calc_timeseries_forecast(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + msgpack_packer *mp_pck, + int key_id) +{ + double mean_x; + double mean_y; + double var_x; + double cov_xy; + double result; + /* y = b0 + b1 * x */ + double b0; + double b1; + struct timeseries_forecast *forecast; + + forecast = (struct timeseries_forecast *) aggr_node->aggregate_data[key_id]; + + mean_x = forecast->sigma_x / aggr_node->records; + mean_y = forecast->sigma_y / aggr_node->records; + cov_xy = (forecast->sigma_xy / (double) aggr_node->records) - mean_x * mean_y; + var_x = (forecast->sigma_x2 / aggr_node->records) - mean_x * mean_x; + + b1 = cov_xy / var_x; + b0 = mean_y - b1 * mean_x; + + result = b0 + b1 * (forecast->future_time + forecast->latest_x); + + msgpack_pack_float(mp_pck, result); +} + +void aggregate_func_remove_timeseries_forecast(struct aggregate_node *aggr_node, + struct aggregate_node *aggr_node_prev, + int key_id) +{ + struct timeseries_forecast *forecast_w; + struct timeseries_forecast *forecast_h; + + forecast_w = (struct timeseries_forecast *) aggr_node->aggregate_data[key_id]; + forecast_h = (struct timeseries_forecast *) aggr_node_prev->aggregate_data[key_id]; + + forecast_w->sigma_x -= forecast_h->sigma_x; + forecast_w->sigma_y -= forecast_h->sigma_y; + forecast_w->sigma_xy -= forecast_h->sigma_xy; + forecast_w->sigma_x2 -= forecast_h->sigma_x2; +} + +void aggregate_func_destroy_sum(struct aggregate_node *aggr_node, + int key_id) +{ +} + +void aggregate_func_destroy_timeseries_forecast(struct aggregate_node *aggr_node, + int key_id) +{ + flb_free(aggr_node->aggregate_data[key_id]); +} + +aggregate_function_clone aggregate_func_clone[AGGREGATE_FUNCTIONS] = { + aggregate_func_clone_nop, + aggregate_func_clone_nop, + aggregate_func_clone_nop, + aggregate_func_clone_nop, + aggregate_func_clone_nop, + aggregate_func_clone_timeseries_forecast, +}; + +aggregate_function_add aggregate_func_add[AGGREGATE_FUNCTIONS] = { + aggregate_func_add_sum, + aggregate_func_add_sum, + aggregate_func_add_count, + aggregate_func_add_min, + aggregate_func_add_max, + aggregate_func_add_timeseries_forecast, +}; + +aggregate_function_calc aggregate_func_calc[AGGREGATE_FUNCTIONS] = { + aggregate_func_calc_avg, + aggregate_func_calc_sum, + aggregate_func_calc_count, + aggregate_func_calc_sum, + aggregate_func_calc_sum, + aggregate_func_calc_timeseries_forecast, +}; + +aggregate_function_remove aggregate_func_remove[AGGREGATE_FUNCTIONS] = { + aggregate_func_remove_sum, + aggregate_func_remove_sum, + aggregate_func_remove_nop, + aggregate_func_remove_nop, + aggregate_func_remove_nop, + aggregate_func_remove_timeseries_forecast, +}; + +aggregate_function_destroy aggregate_func_destroy[AGGREGATE_FUNCTIONS] = { + aggregate_func_destroy_sum, + aggregate_func_destroy_sum, + aggregate_func_destroy_sum, + aggregate_func_destroy_sum, + aggregate_func_destroy_sum, + aggregate_func_destroy_timeseries_forecast, +}; |