diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/src/stream_processor | |
parent | Initial commit. (diff) | |
download | netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.tar.xz netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.zip |
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/src/stream_processor')
-rw-r--r-- | fluent-bit/src/stream_processor/CMakeLists.txt | 21 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/README.md | 84 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/flb_sp.c | 2157 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/flb_sp_aggregate_func.c | 364 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/flb_sp_func_record.c | 77 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/flb_sp_func_time.c | 95 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/flb_sp_groupby.c | 82 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/flb_sp_key.c | 231 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/flb_sp_snapshot.c | 277 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/flb_sp_stream.c | 168 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/flb_sp_window.c | 122 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/parser/CMakeLists.txt | 31 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/parser/flb_sp_parser.c | 851 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/parser/sql.l | 190 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/parser/sql.y | 437 |
15 files changed, 5187 insertions, 0 deletions
diff --git a/fluent-bit/src/stream_processor/CMakeLists.txt b/fluent-bit/src/stream_processor/CMakeLists.txt new file mode 100644 index 00000000..de2c2fe3 --- /dev/null +++ b/fluent-bit/src/stream_processor/CMakeLists.txt @@ -0,0 +1,21 @@ +project(stream-processor C) + +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) + +add_subdirectory(parser) + +set(src + flb_sp.c + flb_sp_key.c + flb_sp_func_time.c + flb_sp_func_record.c + flb_sp_stream.c + flb_sp_snapshot.c + flb_sp_window.c + flb_sp_groupby.c + flb_sp_aggregate_func.c + ) + +add_library(flb-sp STATIC ${src}) +target_link_libraries(flb-sp rbtree) +target_link_libraries(flb-sp flb-sp-parser) diff --git a/fluent-bit/src/stream_processor/README.md b/fluent-bit/src/stream_processor/README.md new file mode 100644 index 00000000..d39a51ef --- /dev/null +++ b/fluent-bit/src/stream_processor/README.md @@ -0,0 +1,84 @@ +## SQL Statement Syntax + +The following is the SQL statement syntax supported by Fluent Bit stream processor in EBNF form. For readability, we assume the conventional definition for integer, float and string values. A single quote in a constant string literal has to be escaped with an extra one. For instance, the string representation of `O'Keefe` in the query will be `'O''Keefe'`. 
+ +```xml +<sql_stmt> := <create> | <select> +<create> := CREATE STREAM <id> AS <select> | CREATE STREAM <id> WITH (<properties>) AS <select> +<properties> := <property> | <property>, <properties> +<property> := <id> = '<id>' +<select> := SELECT <keys> FROM <source> [WHERE <condition>] + [WINDOW TUMBLING (<integer> SECOND) | WINDOW HOPPING (<integer> SECOND, ADVANCE BY <integer> SECOND)] + [GROUP BY <record_keys>] +<keys> := '*' | <record_keys> +<record_keys> := <record_key> | <record_key>, <record_keys> +<record_key> := <exp> | <exp> AS <id> +<exp> := <key> | <fun> +<fun> := AVG(<key>) | SUM(<key>) | COUNT(<key>) | COUNT(*) | MIN(<key>) | MAX(<key>) | TIMESERIES_FORECAST(<key>, <integer>) +<source> := STREAM:<id> | TAG:<id> +<condition> := <key> | <value> | <key> <relation> <value> | (<condition>) + | NOT <condition> | <condition> AND <condition> | <condition> OR <condition> + | @record.contains(<key>) | <id> IS NULL | <id> IS NOT NULL +<key> := <id> | <id><subkey-idx> +<subkey-idx> := [<id>] | <subkey-idx>[<id>] +<relation> := = | != | <> | < | <= | > | >= +<id> := <letter> <characters> +<characters> := <letter> | <digit> | _ | <characters> <characters> +<value> := true | false | <integer> | <float> | '<string>' +``` + +In addition to the common aggregation functions, Stream Processor provides the timeseries function `TIMESERIES_FORECAST`, which uses the [simple linear regression algorithm](https://en.wikipedia.org/wiki/Simple_linear_regression) to predict the value of a (dependent) variable in the future. 
+ +### Timeseries Functions + +| name | description | +| ------------------------- | ------------------------------------------------------ | +| TIMESERIES_FORECAST(x, t) | forecasts the value of x at current time + t seconds | + +### Time Functions + +| name | description | example | +| ---------------- | ------------------------------------------------- | ------------------- | +| NOW() | adds system time using format: %Y-%m-%d %H:%M:%S | 2019-03-09 21:36:05 | +| UNIX_TIMESTAMP() | adds the current Unix timestamp | 1552196165 | + +### Record Functions + +| name | description | example | +| ------------- | ------------------------------------------------------------ | ----------------- | +| RECORD_TAG() | append Tag string associated to the record | samples | +| RECORD_TIME() | append record Timestamp in _double_ format: seconds.nanoseconds | 1552196165.705683 | + +## Type of windows + +FluentBit stream processor has implemented two time-based windows: hopping window and tumbling window. + +### Hopping window + +In a hopping window (also known as sliding window), records are stored in a time window of the interval in seconds defined as the parameter. The `ADVANCE BY` parameter determines the time the window slides forward. Aggregation functions are computed over the records inside a window, and reported right before the window moves. + +For example, the hopping window `WINDOW HOPPING (10 SECOND, ADVANCE BY 2 SECOND)` behaves like this: + +``` +[ x x x x x ... x x x x x ] +<--------- 10 sec --------> + [ x x x x x ... x x x x x ] +<- 2 sec -><--------- 10 sec --------> + [ x x x x x ... x x x x x ] + <- 2 sec -><--------- 10 sec --------> +``` + +### Tumbling window + +A tumbling window is similar to a hopping window where `ADVANCE BY` value is the same as the window size. That means the new window doesn't include any record from the previous one. + +For example, the tumbling window `WINDOW TUMBLING (10 SECOND)` works like this: + +``` +[ x x x x x ... 
x x x x x ] +<--------- 10 sec --------> + [ x x x x x ... x x x x x ] + <--------- 10 sec --------> + [ x x x x x ... x x x x x ] + <--------- 10 sec --------> +``` diff --git a/fluent-bit/src/stream_processor/flb_sp.c b/fluent-bit/src/stream_processor/flb_sp.c new file mode 100644 index 00000000..00eb2f18 --- /dev/null +++ b/fluent-bit/src/stream_processor/flb_sp.c @@ -0,0 +1,2157 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_slist.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_router.h> +#include <fluent-bit/flb_config_format.h> +#include <fluent-bit/stream_processor/flb_sp.h> +#include <fluent-bit/stream_processor/flb_sp_key.h> +#include <fluent-bit/stream_processor/flb_sp_stream.h> +#include <fluent-bit/stream_processor/flb_sp_snapshot.h> +#include <fluent-bit/stream_processor/flb_sp_parser.h> +#include <fluent-bit/stream_processor/flb_sp_func_time.h> +#include <fluent-bit/stream_processor/flb_sp_func_record.h> +#include <fluent-bit/stream_processor/flb_sp_aggregate_func.h> +#include <fluent-bit/stream_processor/flb_sp_window.h> +#include <fluent-bit/stream_processor/flb_sp_groupby.h> + +#include <stdlib.h> +#include <sys/types.h> +#include <sys/stat.h> +#ifndef _WIN32 +#include <unistd.h> +#endif + +/* don't do this at home */ +#define pack_uint16(buf, d) _msgpack_store16(buf, (uint16_t) d) +#define pack_uint32(buf, d) _msgpack_store32(buf, (uint32_t) d) + +/* String type to numerical conversion */ +#define FLB_STR_INT 1 +#define FLB_STR_FLOAT 2 + +/* Read and process file system configuration file */ +static int sp_config_file(struct flb_config *config, struct flb_sp *sp, + const char *file) +{ + int ret; + flb_sds_t name; + flb_sds_t exec; + char *cfg = NULL; + char tmp[PATH_MAX + 1]; + struct stat st; + struct mk_list *head; + struct flb_sp_task *task; + struct flb_cf *cf; + struct flb_cf_section *section; + +#ifndef FLB_HAVE_STATIC_CONF + ret = stat(file, &st); + if (ret == -1 && errno == ENOENT) { + /* Try to resolve the real path (if exists) */ + if (file[0] == '/') { + flb_error("[sp] cannot open configuration file: %s", file); + return -1; + } + + if (config->conf_path) { + snprintf(tmp, 
PATH_MAX, "%s%s", config->conf_path, file); + cfg = tmp; + } + } + else { + cfg = (char *) file; + } + + cf = flb_cf_create_from_file(NULL, cfg); +#else + cf = flb_config_static_open(file); +#endif + + if (!cf) { + return -1; + } + + /* Read all 'stream_task' sections */ + mk_list_foreach(head, &cf->sections) { + section = mk_list_entry(head, struct flb_cf_section, _head); + if (strcasecmp(section->name, "stream_task") != 0) { + continue; + } + + name = NULL; + exec = NULL; + + /* name */ + name = flb_cf_section_property_get_string(cf, section, "name"); + if (!name) { + flb_error("[sp] task 'name' not found in file '%s'", cfg); + goto fconf_error; + } + + /* exec */ + exec = flb_cf_section_property_get_string(cf, section, "exec"); + if (!exec) { + flb_error("[sp] task '%s' don't have an 'exec' command", name); + goto fconf_error; + } + + /* Register the task */ + task = flb_sp_task_create(sp, name, exec); + if (!task) { + goto fconf_error; + } + flb_sds_destroy(name); + flb_sds_destroy(exec); + name = NULL; + exec = NULL; + } + + flb_cf_destroy(cf); + return 0; + +fconf_error: + if (name) { + flb_sds_destroy(name); + } + if (exec) { + flb_sds_destroy(exec); + } + flb_cf_destroy(cf); + return -1; +} + +static int sp_task_to_instance(struct flb_sp_task *task, struct flb_sp *sp) +{ + struct mk_list *head; + struct flb_input_instance *in; + + if (task->cmd->source_type != FLB_SP_STREAM) { + return -1; + } + + mk_list_foreach(head, &sp->config->inputs) { + in = mk_list_entry(head, struct flb_input_instance, _head); + if (in->alias) { + if (strcasecmp(in->alias, task->cmd->source_name) == 0) { + task->source_instance = in; + return 0; + } + } + + if (strcasecmp(in->name, task->cmd->source_name) == 0) { + task->source_instance = in; + return 0; + } + } + + return -1; +} + +static void sp_info(struct flb_sp *sp) +{ + struct mk_list *head; + struct flb_sp_task *task; + + flb_info("[sp] stream processor started"); + + mk_list_foreach(head, &sp->tasks) { + task = 
mk_list_entry(head, struct flb_sp_task, _head); + flb_info("[sp] registered task: %s", task->name); + } +} + +int subkeys_compare(struct mk_list *subkeys1, struct mk_list *subkeys2) +{ + int i; + struct flb_slist_entry *entry1; + struct flb_slist_entry *entry2; + + if (!subkeys1 && !subkeys2) { + return 0; + } + + if (!subkeys1 || !subkeys2) { + return -1; + } + + if (mk_list_size(subkeys1) != mk_list_size(subkeys2)) { + return -1; + } + + entry1 = mk_list_entry_first(subkeys1, struct flb_slist_entry, _head); + entry2 = mk_list_entry_first(subkeys2, struct flb_slist_entry, _head); + + for (i = 0; i < mk_list_size(subkeys1); i++) { + if (flb_sds_cmp(entry1->str, entry2->str, flb_sds_len(entry2->str)) != 0) { + return -1; + } + + entry1 = mk_list_entry_next(&entry1->_head, struct flb_slist_entry, + _head, subkeys1); + entry2 = mk_list_entry_next(&entry2->_head, struct flb_slist_entry, + _head, subkeys2); + } + + return 0; +} + +static int sp_cmd_aggregated_keys(struct flb_sp_cmd *cmd) +{ + int aggr = 0; + int not_aggr = 0; + struct mk_list *head; + struct mk_list *head_gb; + struct flb_sp_cmd_key *key; + struct flb_sp_cmd_gb_key *gb_key; + + mk_list_foreach(head, &cmd->keys) { + key = mk_list_entry(head, struct flb_sp_cmd_key, _head); + if (key->time_func > 0 || key->record_func > 0) { + continue; + } + + if (key->aggr_func > 0) { + /* AVG, SUM, COUNT or timeseries functions */ + aggr++; + } + else { + mk_list_foreach(head_gb, &cmd->gb_keys) { + gb_key = mk_list_entry(head_gb, struct flb_sp_cmd_gb_key, _head); + + if (!key->name) { /* Key name is a wildcard '*' */ + break; + } + + if (flb_sds_cmp(key->name, gb_key->name, + flb_sds_len(gb_key->name)) == 0) { + if (subkeys_compare(key->subkeys, gb_key->subkeys) != 0) { + continue; + } + + not_aggr--; + + /* Map key selector with group-by */ + key->gb_key = gb_key; + break; + } + } + + not_aggr++; + } + } + + /* + * if some aggregated function is required, not aggregated keys are + * not allowed so we return an error 
(-1). + */ + if (aggr > 0 && not_aggr == 0) { + return aggr; + } + else if (aggr > 0 && not_aggr > 0) { + return -1; + } + + return 0; +} + +/* + * Convert a string to a numerical representation: + * + * - if output number is an integer, 'i' is set and returns FLB_STR_INT + * - if output number is a float, 'd' is set and returns FLB_STR_FLOAT + * - if no conversion is possible (not a number), returns -1 + */ +static int string_to_number(const char *str, int len, int64_t *i, double *d) +{ + int c; + int dots = 0; + char *end; + int64_t i_out; + double d_out; + + /* Detect if this is a floating point number */ + for (c = 0; c < len; c++) { + if (str[c] == '.') { + dots++; + } + } + + if (dots > 1) { + return -1; + } + else if (dots == 1) { + /* Floating point number */ + errno = 0; + d_out = strtold(str, &end); + + /* Check for various possible errors */ + if ((errno == ERANGE || (errno != 0 && d_out == 0))) { + return -1; + } + + if (end == str) { + return -1; + } + + *d = d_out; + return FLB_STR_FLOAT; + } + else { + /* Integer */ + errno = 0; + i_out = strtoll(str, &end, 10); + + /* Check for various possible errors */ + if ((errno == ERANGE || (errno != 0 && i_out == 0))) { + return -1; + } + + if (end == str) { + return -1; + } + + *i = i_out; + return FLB_STR_INT; + } + + return -1; +} + +/* + * Convert a msgpack object value to a number 'if possible'. The conversion + * result is either stored on 'i' for 64 bits integers or in 'd' for + * float/doubles. + * + * This function aims to take care of strings representing a value too. 
+ */ +static int object_to_number(msgpack_object obj, int64_t *i, double *d, + int convert_str_to_num) +{ + int ret; + int64_t i_out; + double d_out; + char str_num[20]; + + if (obj.type == MSGPACK_OBJECT_POSITIVE_INTEGER || + obj.type == MSGPACK_OBJECT_NEGATIVE_INTEGER) { + *i = obj.via.i64; + return FLB_STR_INT; + } + else if (obj.type == MSGPACK_OBJECT_FLOAT32 || + obj.type == MSGPACK_OBJECT_FLOAT) { + *d = obj.via.f64; + return FLB_STR_FLOAT; + } + else if (obj.type == MSGPACK_OBJECT_STR && convert_str_to_num == FLB_TRUE) { + /* A numeric representation of a string should not exceed 19 chars */ + if (obj.via.str.size > 19) { + return -1; + } + + memcpy(str_num, obj.via.str.ptr, obj.via.str.size); + str_num[obj.via.str.size] = '\0'; + + ret = string_to_number(str_num, obj.via.str.size, + &i_out, &d_out); + if (ret == FLB_STR_FLOAT) { + *d = d_out; + return FLB_STR_FLOAT; + } + else if (ret == FLB_STR_INT) { + *i = i_out; + return FLB_STR_INT; + } + } + + return -1; +} + +int flb_sp_snapshot_create(struct flb_sp_task *task) +{ + struct flb_sp_cmd *cmd; + struct flb_sp_snapshot *snapshot; + + cmd = task->cmd; + + snapshot = (struct flb_sp_snapshot *) flb_calloc(1, sizeof(struct flb_sp_snapshot)); + if (!snapshot) { + flb_error("[sp] could not create snapshot '%s'", cmd->stream_name); + return -1; + } + + mk_list_init(&snapshot->pages); + snapshot->record_limit = cmd->limit; + + if (flb_sp_cmd_stream_prop_get(cmd, "seconds") != NULL) { + snapshot->time_limit = atoi(flb_sp_cmd_stream_prop_get(cmd, "seconds")); + } + + if (snapshot->time_limit == 0 && snapshot->record_limit == 0) { + flb_error("[sp] could not create snapshot '%s': size is not defined", + cmd->stream_name); + flb_sp_snapshot_destroy(snapshot); + return -1; + } + + task->snapshot = snapshot; + return 0; +} + +struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name, + const char *query) +{ + int fd; + int ret; + struct mk_event *event; + struct flb_sp_cmd *cmd; + struct flb_sp_task 
*task; + + /* + * Parse and validate the incoming exec query and create the 'command' + * context (this will be associated to the task in a later step + */ + cmd = flb_sp_cmd_create(query); + + if (!cmd) { + flb_error("[sp] invalid query on task '%s': '%s'", name, query); + return NULL; + } + + /* Check if we got an invalid type due an error/restriction */ + if (cmd->status == FLB_SP_ERROR) { + flb_error("[sp] invalid query on task '%s': '%s'", name, query); + flb_sp_cmd_destroy(cmd); + return NULL; + } + + /* Create the task context */ + task = flb_calloc(1, sizeof(struct flb_sp_task)); + if (!task) { + flb_errno(); + flb_sp_cmd_destroy(cmd); + return NULL; + } + task->name = flb_sds_create(name); + if (!task->name) { + flb_free(task); + flb_sp_cmd_destroy(cmd); + return NULL; + } + + task->query = flb_sds_create(query); + if (!task->query) { + flb_sds_destroy(task->name); + flb_free(task); + flb_sp_cmd_destroy(cmd); + return NULL; + } + + task->sp = sp; + task->cmd = cmd; + mk_list_add(&task->_head, &sp->tasks); + + /* + * Assume no aggregated keys exists, if so, a different strategy is + * required to process the records. 
+ */ + task->aggregate_keys = FLB_FALSE; + + mk_list_init(&task->window.data); + mk_list_init(&task->window.aggregate_list); + rb_tree_new(&task->window.aggregate_tree, flb_sp_groupby_compare); + + mk_list_init(&task->window.hopping_slot); + + /* Check and validate aggregated keys */ + ret = sp_cmd_aggregated_keys(task->cmd); + if (ret == -1) { + flb_error("[sp] aggregated query cannot mix not aggregated keys: %s", + query); + flb_sp_task_destroy(task); + return NULL; + } + else if (ret > 0) { + task->aggregate_keys = FLB_TRUE; + + task->window.type = cmd->window.type; + + /* Register a timer event when task contains aggregation rules */ + if (task->window.type != FLB_SP_WINDOW_DEFAULT) { + /* Initialize event loop context */ + event = &task->window.event; + MK_EVENT_ZERO(event); + + /* Run every 'size' seconds */ + fd = mk_event_timeout_create(sp->config->evl, + cmd->window.size, (long) 0, + &task->window.event); + if (fd == -1) { + flb_error("[sp] registration for task %s failed", task->name); + flb_free(task); + return NULL; + } + task->window.fd = fd; + + if (task->window.type == FLB_SP_WINDOW_HOPPING) { + /* Initialize event loop context */ + event = &task->window.event_hop; + MK_EVENT_ZERO(event); + + /* Run every 'size' seconds */ + fd = mk_event_timeout_create(sp->config->evl, + cmd->window.advance_by, (long) 0, + &task->window.event_hop); + if (fd == -1) { + flb_error("[sp] registration for task %s failed", task->name); + flb_free(task); + return NULL; + } + task->window.advance_by = cmd->window.advance_by; + task->window.fd_hop = fd; + task->window.first_hop = true; + } + } + } + + /* Init snapshot page list */ + if (cmd->type == FLB_SP_CREATE_SNAPSHOT) { + if (flb_sp_snapshot_create(task) == -1) { + flb_sp_task_destroy(task); + return NULL; + } + } + + /* + * If the task involves a stream creation (CREATE STREAM abc..), create + * the stream. 
+ */ + if (cmd->type == FLB_SP_CREATE_STREAM || + cmd->type == FLB_SP_CREATE_SNAPSHOT || + cmd->type == FLB_SP_FLUSH_SNAPSHOT) { + + ret = flb_sp_stream_create(cmd->stream_name, task, sp); + if (ret == -1) { + flb_error("[sp] could not create stream '%s'", cmd->stream_name); + flb_sp_task_destroy(task); + return NULL; + } + } + + /* + * Based in the command type, check if the source of data is a known + * stream so make a reference on this task for a quick comparisson and + * access it when processing data. + */ + sp_task_to_instance(task, sp); + return task; +} + +void groupby_nums_destroy(struct aggregate_num *groupby_nums, int size) +{ + int i; + + for (i = 0; i < size; i++) { + if (groupby_nums[i].type == FLB_SP_STRING) { + flb_sds_destroy(groupby_nums[i].string); + } + } + + flb_free(groupby_nums); +} + +/* + * Destroy aggregation node context: before to use this function make sure + * to unlink from the linked list. + */ +void flb_sp_aggregate_node_destroy(struct flb_sp_cmd *cmd, + struct aggregate_node *aggr_node) +{ + int i; + int key_id; + struct mk_list *head; + struct aggregate_num *num; + struct flb_sp_cmd_key *ckey; + + for (i = 0; i < aggr_node->nums_size; i++) { + num = &aggr_node->nums[i]; + if (num->type == FLB_SP_STRING) { + flb_sds_destroy(num->string); + } + } + + groupby_nums_destroy(aggr_node->groupby_nums, aggr_node->groupby_keys); + + key_id = 0; + mk_list_foreach(head, &cmd->keys) { + ckey = mk_list_entry(head, struct flb_sp_cmd_key, _head); + + if (!ckey->aggr_func) { + key_id++; + continue; + } + + aggregate_func_destroy[ckey->aggr_func - 1](aggr_node, key_id); + key_id++; + } + + flb_free(aggr_node->nums); + flb_free(aggr_node->aggregate_data); + flb_free(aggr_node); +} + +void flb_sp_window_destroy(struct flb_sp_cmd *cmd, + struct flb_sp_task_window *window) +{ + struct flb_sp_window_data *data; + struct aggregate_node *aggr_node; + struct flb_sp_hopping_slot *hs; + struct mk_list *head; + struct mk_list *tmp; + struct mk_list *head_hs; 
+ struct mk_list *tmp_hs; + + mk_list_foreach_safe(head, tmp, &window->data) { + data = mk_list_entry(head, struct flb_sp_window_data, _head); + flb_free(data->buf_data); + mk_list_del(&data->_head); + flb_free(data); + } + + mk_list_foreach_safe(head, tmp, &window->aggregate_list) { + aggr_node = mk_list_entry(head, struct aggregate_node, _head); + mk_list_del(&aggr_node->_head); + flb_sp_aggregate_node_destroy(cmd, aggr_node); + } + + mk_list_foreach_safe(head, tmp, &window->hopping_slot) { + hs = mk_list_entry(head, struct flb_sp_hopping_slot, _head); + mk_list_foreach_safe(head_hs, tmp_hs, &hs->aggregate_list) { + aggr_node = mk_list_entry(head_hs, struct aggregate_node, _head); + mk_list_del(&aggr_node->_head); + flb_sp_aggregate_node_destroy(cmd, aggr_node); + } + rb_tree_destroy(&hs->aggregate_tree); + flb_free(hs); + } + + rb_tree_destroy(&window->aggregate_tree); +} + +void flb_sp_task_destroy(struct flb_sp_task *task) +{ + flb_sds_destroy(task->name); + flb_sds_destroy(task->query); + flb_sp_window_destroy(task->cmd, &task->window); + flb_sp_snapshot_destroy(task->snapshot); + mk_list_del(&task->_head); + + if (task->stream) { + flb_sp_stream_destroy(task->stream, task->sp); + } + + flb_sp_cmd_destroy(task->cmd); + flb_free(task); +} + +/* Create the stream processor context */ +struct flb_sp *flb_sp_create(struct flb_config *config) +{ + int i = 0; + int ret; + char buf[32]; + struct mk_list *head; + struct flb_sp *sp; + struct flb_slist_entry *e; + struct flb_sp_task *task; + + /* Allocate context */ + sp = flb_malloc(sizeof(struct flb_sp)); + if (!sp) { + flb_errno(); + return NULL; + } + sp->config = config; + mk_list_init(&sp->tasks); + + /* Check for pre-configured Tasks (command line) */ + mk_list_foreach(head, &config->stream_processor_tasks) { + e = mk_list_entry(head, struct flb_slist_entry, _head); + snprintf(buf, sizeof(buf) - 1, "flb-console:%i", i); + i++; + task = flb_sp_task_create(sp, buf, e->str); + if (!task) { + continue; + } + } + + 
/* Lookup configuration file if any */ + if (config->stream_processor_file) { + ret = sp_config_file(config, sp, config->stream_processor_file); + if (ret == -1) { + flb_error("[sp] could not initialize stream processor"); + flb_sp_destroy(sp); + return NULL; + } + } + + /* Write sp info to stdout */ + sp_info(sp); + + return sp; +} + +void free_value(struct flb_exp_val *v) +{ + if (!v) { + return; + } + + if (v->type == FLB_EXP_STRING) { + flb_sds_destroy(v->val.string); + } + + flb_free(v); +} + +static void itof_convert(struct flb_exp_val *val) +{ + if (val->type != FLB_EXP_INT) { + return; + } + + val->type = FLB_EXP_FLOAT; + val->val.f64 = (double) val->val.i64; +} + +/* Convert (string) expression to number */ +static void exp_string_to_number(struct flb_exp_val *val) +{ + int ret; + int len; + int64_t i = 0; + char *str; + double d = 0.0; + + len = flb_sds_len(val->val.string); + str = val->val.string; + + ret = string_to_number(str, len, &i, &d); + if (ret == -1) { + return; + } + + /* Assign to proper type */ + if (ret == FLB_STR_FLOAT) { + flb_sds_destroy(val->val.string); + val->type = FLB_EXP_FLOAT; + val->val.f64 = d; + } + else if (ret == FLB_STR_INT) { + flb_sds_destroy(val->val.string); + val->type = FLB_EXP_INT; + val->val.i64 = i; + } +} + +static void numerical_comp(struct flb_exp_val *left, + struct flb_exp_val *right, + struct flb_exp_val *result, int op) +{ + result->type = FLB_EXP_BOOL; + + if (left == NULL || right == NULL) { + result->val.boolean = false; + return; + } + + /* Check if left expression value is a number, if so, convert it */ + if (left->type == FLB_EXP_STRING && right->type != FLB_EXP_STRING) { + exp_string_to_number(left); + } + + if (left->type == FLB_EXP_INT && right->type == FLB_EXP_FLOAT) { + itof_convert(left); + } + else if (left->type == FLB_EXP_FLOAT && right->type == FLB_EXP_INT) { + itof_convert(right); + } + + switch (op) { + case FLB_EXP_EQ: + if (left->type == right->type) { + switch(left->type) { + case 
FLB_EXP_NULL: + result->val.boolean = true; + break; + case FLB_EXP_BOOL: + result->val.boolean = (left->val.boolean == right->val.boolean); + break; + case FLB_EXP_INT: + result->val.boolean = (left->val.i64 == right->val.i64); + break; + case FLB_EXP_FLOAT: + result->val.boolean = (left->val.f64 == right->val.f64); + break; + case FLB_EXP_STRING: + if (flb_sds_len(left->val.string) != + flb_sds_len(right->val.string)) { + result->val.boolean = false; + } + else if (strncmp(left->val.string, right->val.string, + flb_sds_len(left->val.string)) != 0) { + result->val.boolean = false; + } + else { + result->val.boolean = true; + } + break; + default: + result->val.boolean = false; + break; + } + } + else { + result->val.boolean = false; + } + break; + case FLB_EXP_LT: + if (left->type == right->type) { + switch(left->type) { + case FLB_EXP_INT: + result->val.boolean = (left->val.i64 < right->val.i64); + break; + case FLB_EXP_FLOAT: + result->val.boolean = (left->val.f64 < right->val.f64); + break; + case FLB_EXP_STRING: + if (strncmp(left->val.string, right->val.string, + flb_sds_len(left->val.string)) < 0) { + result->val.boolean = true; + } + else { + result->val.boolean = false; + } + break; + default: + result->val.boolean = false; + break; + } + } + else { + result->val.boolean = false; + } + break; + case FLB_EXP_LTE: + if (left->type == right->type) { + switch(left->type) { + case FLB_EXP_INT: + result->val.boolean = (left->val.i64 <= right->val.i64); + break; + case FLB_EXP_FLOAT: + result->val.boolean = (left->val.f64 <= right->val.f64); + break; + case FLB_EXP_STRING: + if (strncmp(left->val.string, right->val.string, + flb_sds_len(left->val.string)) <= 0) { + result->val.boolean = true; + } + else { + result->val.boolean = false; + } + break; + default: + result->val.boolean = false; + break; + } + } + else { + result->val.boolean = false; + } + break; + case FLB_EXP_GT: + if (left->type == right->type) { + switch(left->type) { + case FLB_EXP_INT: + 
result->val.boolean = (left->val.i64 > right->val.i64); + break; + case FLB_EXP_FLOAT: + result->val.boolean = (left->val.f64 > right->val.f64); + break; + case FLB_EXP_STRING: + if (strncmp(left->val.string, right->val.string, + flb_sds_len(left->val.string)) > 0) { + result->val.boolean = true; + } + else { + result->val.boolean = false; + } + break; + default: + result->val.boolean = false; + break; + } + } + else { + result->val.boolean = false; + } + break; + case FLB_EXP_GTE: + if (left->type == right->type) { + switch(left->type) { + case FLB_EXP_INT: + result->val.boolean = (left->val.i64 >= right->val.i64); + break; + case FLB_EXP_FLOAT: + result->val.boolean = (left->val.f64 >= right->val.f64); + break; + case FLB_EXP_STRING: + if (strncmp(left->val.string, right->val.string, + flb_sds_len(left->val.string)) >= 0) { + result->val.boolean = true; + } + else { + result->val.boolean = false; + } + break; + default: + result->val.boolean = false; + break; + } + } + else { + result->val.boolean = false; + } + break; + } +} + +static bool value_to_bool(struct flb_exp_val *val) { + bool result = FLB_FALSE; + + switch (val->type) { + case FLB_EXP_BOOL: + result = val->val.boolean; + break; + case FLB_EXP_INT: + result = val->val.i64 > 0; + break; + case FLB_EXP_FLOAT: + result = val->val.f64 > 0; + break; + case FLB_EXP_STRING: + result = true; + break; + } + + return result; +} + + +static void logical_operation(struct flb_exp_val *left, + struct flb_exp_val *right, + struct flb_exp_val *result, int op) +{ + bool lval; + bool rval; + + result->type = FLB_EXP_BOOL; + + /* Null is always interpreted as false in a logical operation */ + lval = left ? value_to_bool(left) : false; + rval = right ? 
value_to_bool(right) : false; + + switch (op) { + case FLB_EXP_NOT: + result->val.boolean = !lval; + break; + case FLB_EXP_AND: + result->val.boolean = lval & rval; + break; + case FLB_EXP_OR: + result->val.boolean = lval | rval; + break; + } +} + +static struct flb_exp_val *reduce_expression(struct flb_exp *expression, + const char *tag, int tag_len, + struct flb_time *tms, + msgpack_object *map) +{ + int operation; + flb_sds_t s; + flb_sds_t tmp_sds = NULL; + struct flb_exp_key *key; + struct flb_sp_value *sval; + struct flb_exp_val *ret, *left, *right; + struct flb_exp_val *result; + + if (!expression) { + return NULL; + } + + result = flb_calloc(1, sizeof(struct flb_exp_val)); + if (!result) { + flb_errno(); + return NULL; + } + + switch (expression->type) { + case FLB_EXP_NULL: + result->type = expression->type; + break; + case FLB_EXP_BOOL: + result->type = expression->type; + result->val.boolean = ((struct flb_exp_val *) expression)->val.boolean; + break; + case FLB_EXP_INT: + result->type = expression->type; + result->val.i64 = ((struct flb_exp_val *) expression)->val.i64; + break; + case FLB_EXP_FLOAT: + result->type = expression->type; + result->val.f64 = ((struct flb_exp_val *) expression)->val.f64; + break; + case FLB_EXP_STRING: + s = ((struct flb_exp_val *) expression)->val.string; + result->type = expression->type; + result->val.string = flb_sds_create_size(flb_sds_len(s)); + tmp_sds = flb_sds_copy(result->val.string, s, flb_sds_len(s)); + if (tmp_sds != result->val.string) { + result->val.string = tmp_sds; + } + break; + case FLB_EXP_KEY: + key = (struct flb_exp_key *) expression; + sval = flb_sp_key_to_value(key->name, *map, key->subkeys); + if (sval) { + result->type = sval->type; + result->val = sval->val; + flb_free(sval); + return result; + } + else { + flb_free(result); + return NULL; + } + break; + case FLB_EXP_FUNC: + /* we don't need result */ + flb_free(result); + ret = reduce_expression(((struct flb_exp_func *) expression)->param, + tag, 
tag_len, tms, map); + result = ((struct flb_exp_func *) expression)->cb_func(tag, tag_len, + tms, ret); + free_value(ret); + break; + case FLB_LOGICAL_OP: + left = reduce_expression(expression->left, + tag, tag_len, tms, map); + right = reduce_expression(expression->right, + tag, tag_len, tms, map); + + operation = ((struct flb_exp_op *) expression)->operation; + + switch (operation) { + case FLB_EXP_PAR: + if (left == NULL) { /* Null is always interpreted as false in a + logical operation */ + result->type = FLB_EXP_BOOL; + result->val.boolean = false; + } + else { /* Left and right sides of a logical operation reduce to + boolean values */ + result->type = FLB_EXP_BOOL; + result->val.boolean = left->val.boolean; + } + break; + case FLB_EXP_EQ: + case FLB_EXP_LT: + case FLB_EXP_LTE: + case FLB_EXP_GT: + case FLB_EXP_GTE: + numerical_comp(left, right, result, operation); + break; + case FLB_EXP_NOT: + case FLB_EXP_AND: + case FLB_EXP_OR: + logical_operation(left, right, result, operation); + break; + } + free_value(left); + free_value(right); + } + return result; +} + + +void package_results(const char *tag, int tag_len, + char **out_buf, size_t *out_size, + struct flb_sp_task *task) +{ + int i; + int len; + int map_entries; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + struct aggregate_num *num; + struct flb_time tm; + struct flb_sp_cmd_key *ckey; + struct flb_sp_cmd *cmd = task->cmd; + struct mk_list *head; + struct aggregate_node *aggr_node; + struct flb_sp_cmd_gb_key *gb_key = NULL; + + map_entries = mk_list_size(&cmd->keys); + + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + mk_list_foreach(head, &task->window.aggregate_list) { + aggr_node = mk_list_entry(head, struct aggregate_node, _head); + + /* set outgoing array + map and it fixed size */ + msgpack_pack_array(&mp_pck, 2); + + flb_time_get(&tm); + flb_time_append_to_msgpack(&tm, &mp_pck, 0); + msgpack_pack_map(&mp_pck, map_entries); + + /* 
Packaging results */ + ckey = mk_list_entry_first(&cmd->keys, struct flb_sp_cmd_key, _head); + for (i = 0; i < map_entries; i++) { + num = &aggr_node->nums[i]; + + /* Check if there is a defined function */ + if (ckey->time_func > 0) { + flb_sp_func_time(&mp_pck, ckey); + goto next; + } + else if (ckey->record_func > 0) { + flb_sp_func_record(tag, tag_len, &tm, &mp_pck, ckey); + goto next; + } + + /* Pack key */ + if (ckey->alias) { + msgpack_pack_str(&mp_pck, flb_sds_len(ckey->alias)); + msgpack_pack_str_body(&mp_pck, + ckey->alias, + flb_sds_len(ckey->alias)); + } + else { + len = 0; + char *c_name; + if (!ckey->name) { + c_name = "*"; + } + else { + c_name = ckey->name; + } + + msgpack_pack_str(&mp_pck, len); + msgpack_pack_str_body(&mp_pck, c_name, len); + } + + /* + * If a group_by key is mapped as a source of this key, + * change the 'num' reference to obtain the proper information + * for the grouped key value. + */ + if (ckey->gb_key != NULL) { + gb_key = ckey->gb_key; + if (aggr_node->groupby_keys > 0) { + num = &aggr_node->groupby_nums[gb_key->id]; + } + } + + /* Pack value */ + switch (ckey->aggr_func) { + case FLB_SP_NOP: + if (num->type == FLB_SP_NUM_I64) { + msgpack_pack_int64(&mp_pck, num->i64); + } + else if (num->type == FLB_SP_NUM_F64) { + msgpack_pack_float(&mp_pck, num->f64); + } + else if (num->type == FLB_SP_STRING) { + msgpack_pack_str(&mp_pck, + flb_sds_len(num->string)); + msgpack_pack_str_body(&mp_pck, + num->string, + flb_sds_len(num->string)); + } + else if (num->type == FLB_SP_BOOLEAN) { + if (num->boolean) { + msgpack_pack_true(&mp_pck); + } + else { + msgpack_pack_false(&mp_pck); + } + } + break; + default: + aggregate_func_calc[ckey->aggr_func - 1](aggr_node, ckey, &mp_pck, i); + break; + } + +next: + ckey = mk_list_entry_next(&ckey->_head, struct flb_sp_cmd_key, + _head, &cmd->keys); + } + } + + *out_buf = mp_sbuf.data; + *out_size = mp_sbuf.size; +} + +static struct aggregate_node * sp_process_aggregate_data(struct flb_sp_task 
*task, + msgpack_object map, + int convert_str_to_num) +{ + int i; + int ret; + int map_size; + int key_id; + int map_entries; + int gb_entries; + int values_found; + int64_t ival; + double dval; + struct flb_sp_value *sval; + struct aggregate_num *gb_nums; + struct aggregate_node *aggr_node; + struct flb_sp_cmd *cmd; + struct flb_sp_cmd_gb_key *gb_key; + struct mk_list *head; + struct rb_tree_node *rb_result; + msgpack_object key; + + aggr_node = NULL; + cmd = task->cmd; + map_size = map.via.map.size; + values_found = 0; + + /* Number of expected output entries in the map */ + map_entries = mk_list_size(&cmd->keys); + gb_entries = mk_list_size(&cmd->gb_keys); + + if (gb_entries > 0) { + gb_nums = flb_calloc(1, sizeof(struct aggregate_num) * gb_entries); + if (!gb_nums) { + return NULL; + } + + /* extract GROUP BY values */ + for (i = 0; i < map_size; i++) { /* extract group-by values */ + key = map.via.map.ptr[i].key; + + key_id = 0; + mk_list_foreach(head, &cmd->gb_keys) { + gb_key = mk_list_entry(head, struct flb_sp_cmd_gb_key, + _head); + if (flb_sds_cmp(gb_key->name, key.via.str.ptr, + key.via.str.size) != 0) { + key_id++; + continue; + } + + sval = flb_sp_key_to_value(gb_key->name, map, gb_key->subkeys); + if (!sval) { + /* If evaluation fails/sub-key doesn't exist */ + key_id++; + continue; + } + + values_found++; + + /* Convert string to number if that is possible */ + ret = object_to_number(sval->o, &ival, &dval, convert_str_to_num); + if (ret == -1) { + if (sval->o.type == MSGPACK_OBJECT_STR) { + gb_nums[key_id].type = FLB_SP_STRING; + gb_nums[key_id].string = + flb_sds_create_len(sval->o.via.str.ptr, + sval->o.via.str.size); + } + else if (sval->o.type == MSGPACK_OBJECT_BOOLEAN) { + gb_nums[key_id].type = FLB_SP_NUM_I64; + gb_nums[key_id].i64 = sval->o.via.boolean; + } + } + else if (ret == FLB_STR_INT) { + gb_nums[key_id].type = FLB_SP_NUM_I64; + gb_nums[key_id].i64 = ival; + } + else if (ret == FLB_STR_FLOAT) { + gb_nums[key_id].type = FLB_SP_NUM_F64; 
+ gb_nums[key_id].f64 = dval; + } + + key_id++; + flb_sp_key_value_destroy(sval); + } + } + + /* if some GROUP BY keys are not found in the record */ + if (values_found < gb_entries) { + groupby_nums_destroy(gb_nums, gb_entries); + return NULL; + } + + aggr_node = (struct aggregate_node *) flb_calloc(1, sizeof(struct aggregate_node)); + if (!aggr_node) { + flb_errno(); + groupby_nums_destroy(gb_nums, gb_entries); + return NULL; + } + + aggr_node->groupby_keys = gb_entries; + aggr_node->groupby_nums = gb_nums; + + rb_tree_find_or_insert(&task->window.aggregate_tree, aggr_node, &aggr_node->_rb_head, &rb_result); + if (&aggr_node->_rb_head != rb_result) { + /* We don't need aggr_node anymore */ + flb_sp_aggregate_node_destroy(cmd, aggr_node); + + aggr_node = container_of(rb_result, struct aggregate_node, _rb_head); + container_of(rb_result, struct aggregate_node, _rb_head)->records++; + } + else { + aggr_node->nums = flb_calloc(1, sizeof(struct aggregate_num) * map_entries); + if (!aggr_node->nums) { + flb_sp_aggregate_node_destroy(cmd, aggr_node); + return NULL; + } + aggr_node->records = 1; + aggr_node->nums_size = map_entries; + aggr_node->aggregate_data = (struct aggregate_data **) flb_calloc(1, sizeof(struct aggregate_data *) * map_entries); + mk_list_add(&aggr_node->_head, &task->window.aggregate_list); + } + } + else { /* If query doesn't have GROUP BY */ + if (!mk_list_size(&task->window.aggregate_list)) { + aggr_node = flb_calloc(1, sizeof(struct aggregate_node)); + if (!aggr_node) { + flb_errno(); + return NULL; + } + aggr_node->nums = flb_calloc(1, sizeof(struct aggregate_num) * map_entries); + if (!aggr_node->nums) { + flb_sp_aggregate_node_destroy(cmd, aggr_node); + return NULL; + } + + aggr_node->nums_size = map_entries; + aggr_node->records = 1; + aggr_node->aggregate_data = (struct aggregate_data **) flb_calloc(1, sizeof(struct aggregate_data *) * map_entries); + mk_list_add(&aggr_node->_head, &task->window.aggregate_list); + } + else { + aggr_node = 
mk_list_entry_first(&task->window.aggregate_list, struct aggregate_node, _head); + aggr_node->records++; + } + } + + return aggr_node; +} + +/* + * Process data, task and it defined command involves the call of aggregation + * functions (AVG, SUM, COUNT, MIN, MAX). + */ +int sp_process_data_aggr(const char *buf_data, size_t buf_size, + const char *tag, int tag_len, + struct flb_sp_task *task, + struct flb_sp *sp, + int convert_str_to_num) +{ + int i; + int ok; + int ret; + int map_size; + int key_id; + size_t off; + int64_t ival; + double dval; + msgpack_object root; + msgpack_object map; + msgpack_unpacked result; + msgpack_object key; + msgpack_object *obj; + struct aggregate_num *nums = NULL; + struct mk_list *head; + struct flb_time tms; + struct flb_sp_cmd *cmd = task->cmd; + struct flb_sp_cmd_key *ckey; + struct flb_sp_value *sval; + struct flb_exp_val *condition; + struct aggregate_node *aggr_node; + + /* Number of expected output entries in the map */ + off = 0; + + /* vars initialization */ + ok = MSGPACK_UNPACK_SUCCESS; + msgpack_unpacked_init(&result); + + /* Iterate incoming records */ + while (msgpack_unpack_next(&result, buf_data, buf_size, &off) == ok) { + root = result.data; + + /* extract timestamp */ + flb_time_pop_from_msgpack(&tms, &result, &obj); + + /* get the map data and it size (number of items) */ + map = root.via.array.ptr[1]; + map_size = map.via.map.size; + + /* Evaluate condition */ + if (cmd->condition) { + condition = reduce_expression(cmd->condition, + tag, tag_len, &tms, &map); + if (!condition) { + continue; + } + else if (!condition->val.boolean) { + flb_free(condition); + continue; + } + else { + flb_free(condition); + } + } + + aggr_node = sp_process_aggregate_data(task, map, convert_str_to_num); + if (!aggr_node) + { + continue; + } + + task->window.records++; + + nums = aggr_node->nums; + + /* Iterate each map key and see if it matches any command key */ + for (i = 0; i < map_size; i++) { + key = map.via.map.ptr[i].key; + + 
if (key.type != MSGPACK_OBJECT_STR) { + continue; + } + + + /* + * Iterate each command key. Note that since the command key + * can have different aggregation functions to the same key + * we should compare all of them. + */ + key_id = 0; + mk_list_foreach(head, &cmd->keys) { + ckey = mk_list_entry(head, struct flb_sp_cmd_key, _head); + + if (!ckey->name) { + key_id++; + continue; + } + + if (flb_sds_cmp(ckey->name, key.via.str.ptr, + key.via.str.size) != 0) { + key_id++; + continue; + } + + /* convert the value if it string */ + sval = flb_sp_key_to_value(ckey->name, map, ckey->subkeys); + if (!sval) { + key_id++; + continue; + } + + /* + * Convert value to a numeric representation only if key has an + * assigned aggregation function + */ + ival = 0; + dval = 0.0; + if (ckey->aggr_func != FLB_SP_NOP) { + ret = object_to_number(sval->o, &ival, &dval, convert_str_to_num); + if (ret == -1) { + /* Value cannot be represented as a number */ + key_id++; + flb_sp_key_value_destroy(sval); + continue; + } + + /* + * If a floating pointer number exists, we use the same data + * type for the output. 
+ */ + if (dval != 0.0 && nums[key_id].type == FLB_SP_NUM_I64) { + nums[key_id].type = FLB_SP_NUM_F64; + nums[key_id].f64 = (double) nums[key_id].i64; + } + + aggregate_func_add[ckey->aggr_func - 1](aggr_node, ckey, key_id, &tms, ival, dval); + } + else { + if (sval->o.type == MSGPACK_OBJECT_BOOLEAN) { + nums[key_id].type = FLB_SP_BOOLEAN; + nums[key_id].boolean = sval->o.via.boolean; + } + if (sval->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER || + sval->o.type == MSGPACK_OBJECT_NEGATIVE_INTEGER) { + nums[key_id].type = FLB_SP_NUM_I64; + nums[key_id].i64 = sval->o.via.i64; + } + else if (sval->o.type == MSGPACK_OBJECT_FLOAT32 || + sval->o.type == MSGPACK_OBJECT_FLOAT) { + nums[key_id].type = FLB_SP_NUM_F64; + nums[key_id].f64 = sval->o.via.f64; + } + else if (sval->o.type == MSGPACK_OBJECT_STR) { + nums[key_id].type = FLB_SP_STRING; + if (nums[key_id].string == NULL) { + nums[key_id].string = + flb_sds_create_len(sval->o.via.str.ptr, + sval->o.via.str.size); + } + } + } + + key_id++; + flb_sp_key_value_destroy(sval); + } + } + } + + msgpack_unpacked_destroy(&result); + return task->window.records; +} + +/* + * Data processing (no aggregation functions) + */ +int sp_process_data(const char *tag, int tag_len, + const char *buf_data, size_t buf_size, + char **out_buf, size_t *out_size, + struct flb_sp_task *task, + struct flb_sp *sp) +{ + int i; + int ok; + int ret; + int map_size; + int map_entries; + int records; + uint8_t h; + off_t map_off; + off_t no_data; + size_t off; + size_t off_copy; + size_t snapshot_out_size; + char *tmp; + char *snapshot_out_buffer; + msgpack_object root; + msgpack_object *obj; + msgpack_object key; + msgpack_object val; + msgpack_unpacked result; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + msgpack_object map; + struct flb_time tms; + struct mk_list *head; + struct flb_sp_cmd *cmd; + struct flb_sp_cmd_key *cmd_key; + struct flb_exp_val *condition; + struct flb_sp_value *sval; + + /* Vars initialization */ + off = 0; + off_copy = 
off; + records = 0; + cmd = task->cmd; + ok = MSGPACK_UNPACK_SUCCESS; + msgpack_unpacked_init(&result); + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + snapshot_out_size = 0; + snapshot_out_buffer = NULL; + + /* Iterate incoming records */ + while (msgpack_unpack_next(&result, buf_data, buf_size, &off) == ok) { + root = result.data; + + /* extract timestamp */ + flb_time_pop_from_msgpack(&tms, &result, &obj); + + /* Store the buffer if the stream is a snapshot */ + if (cmd->type == FLB_SP_CREATE_SNAPSHOT) { + flb_sp_snapshot_update(task, buf_data + off_copy, off - off_copy, &tms); + off_copy = off; + continue; + } + + /* get the map data and it size (number of items) */ + map = root.via.array.ptr[1]; + map_size = map.via.map.size; + + /* Evaluate condition */ + if (cmd->condition) { + condition = reduce_expression(cmd->condition, + tag, tag_len, &tms, &map); + if (!condition) { + continue; + } + else if (!condition->val.boolean) { + flb_free(condition); + continue; + } + else { + flb_free(condition); + } + } + + records++; + + /* Flush the snapshot if condition holds */ + if (cmd->type == FLB_SP_FLUSH_SNAPSHOT) { + if (flb_sp_snapshot_flush(sp, task, &snapshot_out_buffer, + &snapshot_out_size) == -1) { + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&mp_sbuf); + return -1; + } + continue; + } + + + /* + * If for some reason the Task keys did not insert any data, we will + * need to discard any changes and reset the buffer position, let's + * keep the memory size for that purpose. + */ + no_data = mp_sbuf.size; + + /* Pack main array */ + msgpack_pack_array(&mp_pck, 2); + msgpack_pack_object(&mp_pck, root.via.array.ptr[0]); + + /* + * Save the current size/position of the buffer since this is + * where the Map header will be stored. 
+ */ + map_off = mp_sbuf.size; + + /* + * In the new record register the same number of items, if due to + * fields selection the number is lower, we perform an adjustment + */ + msgpack_pack_map(&mp_pck, map_size); + + /* Counter for new entries added to the outgoing map */ + map_entries = 0; + + /* Iterate key selection */ + mk_list_foreach(head, &cmd->keys) { + cmd_key = mk_list_entry(head, struct flb_sp_cmd_key, _head); + if (cmd_key->time_func > 0) { + /* Process time function */ + ret = flb_sp_func_time(&mp_pck, cmd_key); + if (ret > 0) { + map_entries += ret; + } + continue; + } + else if (cmd_key->record_func > 0) { + ret = flb_sp_func_record(tag, tag_len, &tms, &mp_pck, cmd_key); + if (ret > 0) { + map_entries += ret; + } + continue; + } + + /* Lookup selection key in the incoming map */ + for (i = 0; i < map_size; i++) { + key = map.via.map.ptr[i].key; + val = map.via.map.ptr[i].val; + + if (key.type != MSGPACK_OBJECT_STR) { + continue; + } + + /* Wildcard selection: * */ + if (cmd_key->name == NULL) { + msgpack_pack_object(&mp_pck, key); + msgpack_pack_object(&mp_pck, val); + map_entries++; + continue; + } + + /* Compare lengths */ + if (flb_sds_cmp(cmd_key->name, + key.via.str.ptr, key.via.str.size) != 0) { + continue; + } + + /* + * Package key name: + * + * Check if the command ask for an alias 'key AS abc' + */ + if (cmd_key->alias) { + msgpack_pack_str(&mp_pck, + flb_sds_len(cmd_key->alias)); + msgpack_pack_str_body(&mp_pck, + cmd_key->alias, + flb_sds_len(cmd_key->alias)); + } + else { + msgpack_pack_object(&mp_pck, key); + } + + /* Package value */ + sval = flb_sp_key_to_value(cmd_key->name, map, + cmd_key->subkeys); + if (sval) { + msgpack_pack_object(&mp_pck, sval->o); + flb_sp_key_value_destroy(sval); + } + + map_entries++; + } + } + + /* Final Map size adjustment */ + if (map_entries == 0) { + mp_sbuf.size = no_data; + } + else { + /* + * The fields were packed, now we need to adjust the map size + * to set the proper number of fields appended 
to the record. + */ + tmp = mp_sbuf.data + map_off; + h = tmp[0]; + if (h >> 4 == 0x8) { + *tmp = (uint8_t) 0x8 << 4 | ((uint8_t) map_entries); + } + else if (h == 0xde) { + tmp++; + pack_uint16(tmp, map_entries); + } + else if (h == 0xdf) { + tmp++; + pack_uint32(tmp, map_entries); + } + } + } + + msgpack_unpacked_destroy(&result); + + if (records == 0) { + msgpack_sbuffer_destroy(&mp_sbuf); + return 0; + } + + /* Use snapshot out buffer if it is flush stream */ + if (cmd->type == FLB_SP_FLUSH_SNAPSHOT) { + if (snapshot_out_size == 0) { + msgpack_sbuffer_destroy(&mp_sbuf); + flb_free(snapshot_out_buffer); + return 0; + } + else { + *out_buf = snapshot_out_buffer; + *out_size = snapshot_out_size; + return records; + } + } + + /* set outgoing results */ + *out_buf = mp_sbuf.data; + *out_size = mp_sbuf.size; + + return records; +} + +int sp_process_hopping_slot(const char *tag, int tag_len, + struct flb_sp_task *task) +{ + int i; + int key_id; + int map_entries; + int gb_entries; + struct flb_sp_cmd *cmd = task->cmd; + struct mk_list *head; + struct mk_list *head_hs; + struct aggregate_node *aggr_node; + struct aggregate_node *aggr_node_hs; + struct aggregate_node *aggr_node_prev; + struct flb_sp_hopping_slot *hs; + struct flb_sp_hopping_slot *hs_; + struct rb_tree_node *rb_result; + struct flb_sp_cmd_key *ckey; + rb_result_t result; + + map_entries = mk_list_size(&cmd->keys); + gb_entries = mk_list_size(&cmd->gb_keys); + + /* Initialize a hoping slot */ + hs = flb_calloc(1, sizeof(struct flb_sp_hopping_slot)); + if (!hs) { + flb_errno(); + return -1; + } + + mk_list_init(&hs->aggregate_list); + rb_tree_new(&hs->aggregate_tree, flb_sp_groupby_compare); + + /* Loop over aggregation nodes on window */ + mk_list_foreach(head, &task->window.aggregate_list) { + /* Window aggregation node */ + aggr_node = mk_list_entry(head, struct aggregate_node, _head); + + /* Create a hopping slot aggregation node */ + aggr_node_hs = flb_calloc(1, sizeof(struct aggregate_node)); + if 
(!aggr_node_hs) { + flb_errno(); + flb_free(hs); + return -1; + } + + aggr_node_hs->nums = malloc(sizeof(struct aggregate_node) * map_entries); + if (!aggr_node_hs->nums) { + flb_errno(); + flb_free(hs); + flb_free(aggr_node_hs); + return -1; + } + + memcpy(aggr_node_hs->nums, aggr_node->nums, sizeof(struct aggregate_num) * map_entries); + aggr_node_hs->records = aggr_node->records; + + /* Clone aggregate data */ + key_id = 0; + mk_list_foreach(head_hs, &cmd->keys) { + ckey = mk_list_entry(head_hs, struct flb_sp_cmd_key, _head); + + if (ckey->aggr_func) { + if (!aggr_node_hs->aggregate_data) { + aggr_node_hs->aggregate_data = (struct aggregate_data **) + flb_calloc(1, sizeof(struct aggregate_data *) * map_entries); + if (!aggr_node_hs->aggregate_data) { + flb_errno(); + flb_free(hs); + flb_free(aggr_node_hs->nums); + flb_free(aggr_node_hs); + return -1; + } + } + + if (aggregate_func_clone[ckey->aggr_func - 1](aggr_node_hs, aggr_node, ckey, key_id) == -1) { + flb_errno(); + flb_free(aggr_node_hs->nums); + flb_free(aggr_node_hs->aggregate_data); + flb_free(aggr_node_hs); + flb_free(hs); + return -1; + } + } + + key_id++; + } + + /* Traverse over previous slots to calculate values/record numbers */ + mk_list_foreach(head_hs, &task->window.hopping_slot) { + hs_ = mk_list_entry(head_hs, struct flb_sp_hopping_slot, _head); + result = rb_tree_find(&hs_->aggregate_tree, aggr_node, &rb_result); + /* If corresponding aggregation node exists in previous hopping slot, + * calculate aggregation values + */ + if (result == RB_OK) { + aggr_node_prev = mk_list_entry(rb_result, struct aggregate_node, + _rb_head); + aggr_node_hs->records -= aggr_node_prev->records; + + key_id = 0; + ckey = mk_list_entry_first(&cmd->keys, struct flb_sp_cmd_key, + _head); + for (i = 0; i < map_entries; i++) { + if (ckey->aggr_func) { + aggregate_func_remove[ckey->aggr_func - 1](aggr_node_hs, aggr_node_prev, i); + } + + ckey = mk_list_entry_next(&ckey->_head, struct flb_sp_cmd_key, + _head, 
&cmd->keys); + } + } + } + + if (aggr_node_hs->records > 0) { + aggr_node_hs->groupby_nums = + flb_calloc(1, sizeof(struct aggregate_node) * gb_entries); + if (gb_entries > 0 && !aggr_node_hs->groupby_nums) { + flb_errno(); + flb_free(hs); + flb_free(aggr_node_hs->nums); + flb_free(aggr_node_hs->aggregate_data); + flb_free(aggr_node_hs); + return -1; + } + + if (aggr_node_hs->groupby_nums != NULL) { + memcpy(aggr_node_hs->groupby_nums, aggr_node->groupby_nums, + sizeof(struct aggregate_num) * gb_entries); + } + + aggr_node_hs->nums_size = aggr_node->nums_size; + aggr_node_hs->groupby_keys = aggr_node->groupby_keys; + + rb_tree_insert(&hs->aggregate_tree, aggr_node_hs, &aggr_node_hs->_rb_head); + mk_list_add(&aggr_node_hs->_head, &hs->aggregate_list); + } + else { + flb_free(aggr_node_hs->nums); + flb_free(aggr_node_hs->aggregate_data); + flb_free(aggr_node_hs); + } + } + + hs->records = task->window.records; + mk_list_foreach(head_hs, &task->window.hopping_slot) { + hs_ = mk_list_entry(head_hs, struct flb_sp_hopping_slot, _head); + hs->records -= hs_->records; + } + + mk_list_add(&hs->_head, &task->window.hopping_slot); + + return 0; +} + +/* Iterate and find input chunks to process */ +int flb_sp_do(struct flb_sp *sp, struct flb_input_instance *in, + const char *tag, int tag_len, + const char *buf_data, size_t buf_size) + +{ + int ret; + size_t out_size; + char *out_buf; + struct mk_list *head; + struct flb_sp_task *task; + struct flb_sp_cmd *cmd; + + /* Lookup tasks that match the incoming instance data */ + mk_list_foreach(head, &sp->tasks) { + task = mk_list_entry(head, struct flb_sp_task, _head); + cmd = task->cmd; + + if (cmd->source_type == FLB_SP_STREAM) { + if (task->source_instance != in) { + continue; + } + } + else if (cmd->source_type == FLB_SP_TAG) { + ret = flb_router_match(tag, tag_len, cmd->source_name, NULL); + if (ret == FLB_FALSE) { + continue; + } + } + + /* We found a task that matches the stream rule */ + if (task->aggregate_keys == FLB_TRUE) 
{ + ret = sp_process_data_aggr(buf_data, buf_size, + tag, tag_len, + task, sp, in->config->stream_processor_str_conv); + + if (ret == -1) { + flb_error("[sp] error processing records for '%s'", + task->name); + continue; + } + + if (flb_sp_window_populate(task, buf_data, buf_size) == -1) { + flb_error("[sp] error populating window for '%s'", + task->name); + continue; + } + + if (task->window.type == FLB_SP_WINDOW_DEFAULT) { + package_results(tag, tag_len, &out_buf, &out_size, task); + flb_sp_window_prune(task); + } + } + else { + ret = sp_process_data(tag, tag_len, + buf_data, buf_size, + &out_buf, &out_size, + task, sp); + + if (ret == -1) { + flb_error("[sp] error processing records for '%s'", + task->name); + continue; + } + } + + if (ret == 0) { + /* no records */ + continue; + } + + /* + * This task involves append data to a stream, which + * means: register the output of the query as data + * generated by an input instance plugin. + */ + if (task->aggregate_keys != FLB_TRUE || + task->window.type == FLB_SP_WINDOW_DEFAULT) { + /* + * Add to stream processing stream if there is no + * aggregation function. 
Otherwise, write it at timer event + */ + if (task->stream) { + flb_sp_stream_append_data(out_buf, out_size, task->stream); + } + else { + flb_pack_print(out_buf, out_size); + flb_free(out_buf); + } + } + } + + return -1; +} + +int flb_sp_fd_event(int fd, struct flb_sp *sp) +{ + bool update_timer_event; + char *out_buf; + char *tag = NULL; + int tag_len = 0; + int fd_timeout = 0; + size_t out_size; + struct mk_list *tmp; + struct mk_list *head; + struct flb_sp_task *task; + struct flb_input_instance *in = NULL; + + /* Lookup Tasks that matches the incoming event */ + mk_list_foreach_safe(head, tmp, &sp->tasks) { + task = mk_list_entry(head, struct flb_sp_task, _head); + + if (fd == task->window.fd) { + update_timer_event = task->window.type == FLB_SP_WINDOW_HOPPING && + task->window.first_hop; + + in = task->source_instance; + if (in) { + if (in->tag && in->tag_len > 0) { + tag = in->tag; + tag_len = in->tag_len; + } + else { + tag = in->name; + tag_len = strlen(in->name); + } + } + else { + in = NULL; + } + + if (task->window.records > 0) { + /* find input tag from task source */ + package_results(tag, tag_len, &out_buf, &out_size, task); + if (task->stream) { + flb_sp_stream_append_data(out_buf, out_size, task->stream); + } + else { + flb_pack_print(out_buf, out_size); + flb_free(out_buf); + } + + } + + flb_sp_window_prune(task); + + flb_utils_timer_consume(fd); + + if (update_timer_event && in) { + task->window.first_hop = false; + mk_event_timeout_destroy(in->config->evl, &task->window.event); + mk_event_closesocket(fd); + + fd_timeout = mk_event_timeout_create(in->config->evl, + task->window.advance_by, (long) 0, + &task->window.event); + if (fd_timeout == -1) { + flb_error("[sp] registration for task (updating timer event) %s failed", task->name); + return -1; + } + task->window.fd = fd_timeout; + } + + break; + } + else if (fd == task->window.fd_hop) { + in = task->source_instance; + if (in) { + if (in->tag && in->tag_len > 0) { + tag = in->tag; + tag_len = 
in->tag_len; + } + else { + tag = in->name; + tag_len = strlen(in->name); + } + } + sp_process_hopping_slot(tag, tag_len, task); + flb_utils_timer_consume(fd); + } + } + return 0; +} + +/* Destroy stream processor context */ +void flb_sp_destroy(struct flb_sp *sp) +{ + struct mk_list *tmp; + struct mk_list *head; + struct flb_sp_task *task; + + /* destroy tasks */ + mk_list_foreach_safe(head, tmp, &sp->tasks) { + task = mk_list_entry(head, struct flb_sp_task, _head); + flb_sp_task_destroy(task); + } + + flb_free(sp); +} diff --git a/fluent-bit/src/stream_processor/flb_sp_aggregate_func.c b/fluent-bit/src/stream_processor/flb_sp_aggregate_func.c new file mode 100644 index 00000000..624be878 --- /dev/null +++ b/fluent-bit/src/stream_processor/flb_sp_aggregate_func.c @@ -0,0 +1,364 @@ +#include <fluent-bit/stream_processor/flb_sp.h> +#include <fluent-bit/stream_processor/flb_sp_parser.h> +#include <fluent-bit/stream_processor/flb_sp_aggregate_func.h> + +char aggregate_func_string[AGGREGATE_FUNCTIONS][sizeof("TIMESERIES_FORECAST") + 1] = { + "AVG", + "SUM", + "COUNT", + "MIN", + "MAX", + "TIMESERIES_FORECAST" +}; + +int aggregate_func_clone_nop(struct aggregate_node *aggr_node, + struct aggregate_node *aggr_node_prev, + struct flb_sp_cmd_key *ckey, + int key_id) { + return 0; +} + +int aggregate_func_clone_timeseries_forecast(struct aggregate_node *aggr_node_clone, + struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + int key_id) { + struct timeseries_forecast *forecast_clone; + struct timeseries_forecast *forecast; + + forecast_clone = (struct timeseries_forecast *) aggr_node_clone->aggregate_data[key_id]; + if (!forecast_clone) { + forecast_clone = (struct timeseries_forecast *) flb_calloc(1, sizeof(struct timeseries_forecast)); + if (!forecast_clone) { + return -1; + } + + forecast_clone->future_time = ckey->constant; + aggr_node_clone->aggregate_data[key_id] = (struct aggregate_data *) forecast_clone; + } + + forecast = (struct timeseries_forecast *) 
aggr_node->aggregate_data[key_id]; + + forecast_clone->sigma_x = forecast->sigma_x; + forecast_clone->sigma_y = forecast->sigma_y; + forecast_clone->sigma_xy = forecast->sigma_xy; + forecast_clone->sigma_x2 = forecast->sigma_x2; + + return 0; +} + +/* Summarize a value into the temporary array considering data type */ +void aggregate_func_add_sum(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + int key_id, + struct flb_time *tms, + int64_t ival, double dval) { + if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) { + aggr_node->nums[key_id].i64 += ival; + aggr_node->nums[key_id].ops++; + } + else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) { + if (dval != 0.0) { + aggr_node->nums[key_id].f64 += dval; + } + else { + aggr_node->nums[key_id].f64 += (double) ival; + } + aggr_node->nums[key_id].ops++; + } +} + +void aggregate_func_add_count(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + int key_id, + struct flb_time *tms, + int64_t ival, double dval) { +} + +/* Calculate the minimum value considering data type */ +void aggregate_func_add_min(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + int key_id, + struct flb_time *tms, + int64_t ival, double dval) { + + if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) { + if (aggr_node->nums[key_id].ops == 0) { + aggr_node->nums[key_id].i64 = ival; + aggr_node->nums[key_id].ops++; + } + else { + if (aggr_node->nums[key_id].i64 > ival) { + aggr_node->nums[key_id].i64 = ival; + aggr_node->nums[key_id].ops++; + } + } + } + else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) { + if (dval != 0.0) { + if (aggr_node->nums[key_id].ops == 0) { + aggr_node->nums[key_id].f64 = dval; + aggr_node->nums[key_id].ops++; + } + else { + if (aggr_node->nums[key_id].f64 > dval) { + aggr_node->nums[key_id].f64 = dval; + aggr_node->nums[key_id].ops++; + } + } + } + else { + if (aggr_node->nums[key_id].ops == 0) { + aggr_node->nums[key_id].f64 = (double) ival; + 
aggr_node->nums[key_id].ops++; + } + else { + if (aggr_node->nums[key_id].f64 > (double) ival) { + aggr_node->nums[key_id].f64 = ival; + aggr_node->nums[key_id].ops++; + } + } + } + } +} + +/* Calculate the maximum value considering data type */ +void aggregate_func_add_max(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + int key_id, + struct flb_time *tms, + int64_t ival, double dval) { + if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) { + if (aggr_node->nums[key_id].ops == 0) { + aggr_node->nums[key_id].i64 = ival; + aggr_node->nums[key_id].ops++; + } + else { + if (aggr_node->nums[key_id].i64 < ival) { + aggr_node->nums[key_id].i64 = ival; + aggr_node->nums[key_id].ops++; + } + } + } + else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) { + if (dval != 0.0) { + if (aggr_node->nums[key_id].ops == 0) { + aggr_node->nums[key_id].f64 = dval; + aggr_node->nums[key_id].ops++; + } + else { + if (aggr_node->nums[key_id].f64 < dval) { + aggr_node->nums[key_id].f64 = dval; + aggr_node->nums[key_id].ops++; + } + } + } + else { + if (aggr_node->nums[key_id].ops == 0) { + aggr_node->nums[key_id].f64 = (double) ival; + aggr_node->nums[key_id].ops++; + } + else { + if (aggr_node->nums[key_id].f64 < (double) ival) { + aggr_node->nums[key_id].f64 = (double) ival; + aggr_node->nums[key_id].ops++; + } + } + } + } +} + +void aggregate_func_calc_avg(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + msgpack_packer *mp_pck, + int key_id) { + double dval = 0.0; + /* average = sum(values) / records */ + if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) { + dval = (double) aggr_node->nums[key_id].i64 / aggr_node->records; + } + else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) { + dval = (double) aggr_node->nums[key_id].f64 / aggr_node->records; + } + + msgpack_pack_float(mp_pck, dval); +} + +void aggregate_func_calc_sum(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + msgpack_packer *mp_pck, + int key_id) { + /* pack 
result stored in nums[key_id] */ + if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) { + msgpack_pack_int64(mp_pck, aggr_node->nums[key_id].i64); + } + else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) { + msgpack_pack_float(mp_pck, aggr_node->nums[key_id].f64); + } +} + +void aggregate_func_calc_count(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + msgpack_packer *mp_pck, + int key_id) { + /* number of records in total */ + msgpack_pack_int64(mp_pck, aggr_node->records); +} + +void aggregate_func_remove_sum(struct aggregate_node *aggr_node, + struct aggregate_node *aggr_node_prev, + int key_id) { + if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) { + aggr_node->nums[key_id].i64 -= aggr_node_prev->nums[key_id].i64; + } + else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) { + aggr_node->nums[key_id].f64 -= aggr_node_prev->nums[key_id].f64; + } +} + +void aggregate_func_remove_nop(struct aggregate_node *aggr_node, + struct aggregate_node *aggr_node_prev, + int key_id) { +} + +void aggregate_func_add_timeseries_forecast(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + int key_id, + struct flb_time *tms, + int64_t ival, double dval) +{ + double x; + double y; + struct timeseries_forecast *forecast; + + forecast = (struct timeseries_forecast *) aggr_node->aggregate_data[key_id]; + if (!forecast) { + forecast = (struct timeseries_forecast *) flb_calloc(1, sizeof(struct timeseries_forecast)); + /* fixme: return if error */ + + forecast->future_time = ckey->constant; + aggr_node->aggregate_data[key_id] = (struct aggregate_data *) forecast; + } + + if (!forecast->offset) { + forecast->offset = flb_time_to_double(tms); + } + + x = flb_time_to_double(tms) - forecast->offset; + + forecast->latest_x = x; + + if (ival) { + y = (double) ival; + } + else { + y = dval; + } + + forecast->sigma_x += x; + forecast->sigma_y += y; + + forecast->sigma_xy += x * y; + forecast->sigma_x2 += x * x; +} + +void 
aggregate_func_calc_timeseries_forecast(struct aggregate_node *aggr_node, + struct flb_sp_cmd_key *ckey, + msgpack_packer *mp_pck, + int key_id) +{ + double mean_x; + double mean_y; + double var_x; + double cov_xy; + double result; + /* y = b0 + b1 * x */ + double b0; + double b1; + struct timeseries_forecast *forecast; + + forecast = (struct timeseries_forecast *) aggr_node->aggregate_data[key_id]; + + mean_x = forecast->sigma_x / aggr_node->records; + mean_y = forecast->sigma_y / aggr_node->records; + cov_xy = (forecast->sigma_xy / (double) aggr_node->records) - mean_x * mean_y; + var_x = (forecast->sigma_x2 / aggr_node->records) - mean_x * mean_x; + + b1 = cov_xy / var_x; + b0 = mean_y - b1 * mean_x; + + result = b0 + b1 * (forecast->future_time + forecast->latest_x); + + msgpack_pack_float(mp_pck, result); +} + +void aggregate_func_remove_timeseries_forecast(struct aggregate_node *aggr_node, + struct aggregate_node *aggr_node_prev, + int key_id) +{ + struct timeseries_forecast *forecast_w; + struct timeseries_forecast *forecast_h; + + forecast_w = (struct timeseries_forecast *) aggr_node->aggregate_data[key_id]; + forecast_h = (struct timeseries_forecast *) aggr_node_prev->aggregate_data[key_id]; + + forecast_w->sigma_x -= forecast_h->sigma_x; + forecast_w->sigma_y -= forecast_h->sigma_y; + forecast_w->sigma_xy -= forecast_h->sigma_xy; + forecast_w->sigma_x2 -= forecast_h->sigma_x2; +} + +void aggregate_func_destroy_sum(struct aggregate_node *aggr_node, + int key_id) +{ +} + +void aggregate_func_destroy_timeseries_forecast(struct aggregate_node *aggr_node, + int key_id) +{ + flb_free(aggr_node->aggregate_data[key_id]); +} + +aggregate_function_clone aggregate_func_clone[AGGREGATE_FUNCTIONS] = { + aggregate_func_clone_nop, + aggregate_func_clone_nop, + aggregate_func_clone_nop, + aggregate_func_clone_nop, + aggregate_func_clone_nop, + aggregate_func_clone_timeseries_forecast, +}; + +aggregate_function_add aggregate_func_add[AGGREGATE_FUNCTIONS] = { + 
aggregate_func_add_sum, + aggregate_func_add_sum, + aggregate_func_add_count, + aggregate_func_add_min, + aggregate_func_add_max, + aggregate_func_add_timeseries_forecast, +}; + +aggregate_function_calc aggregate_func_calc[AGGREGATE_FUNCTIONS] = { + aggregate_func_calc_avg, + aggregate_func_calc_sum, + aggregate_func_calc_count, + aggregate_func_calc_sum, + aggregate_func_calc_sum, + aggregate_func_calc_timeseries_forecast, +}; + +aggregate_function_remove aggregate_func_remove[AGGREGATE_FUNCTIONS] = { + aggregate_func_remove_sum, + aggregate_func_remove_sum, + aggregate_func_remove_nop, + aggregate_func_remove_nop, + aggregate_func_remove_nop, + aggregate_func_remove_timeseries_forecast, +}; + +aggregate_function_destroy aggregate_func_destroy[AGGREGATE_FUNCTIONS] = { + aggregate_func_destroy_sum, + aggregate_func_destroy_sum, + aggregate_func_destroy_sum, + aggregate_func_destroy_sum, + aggregate_func_destroy_sum, + aggregate_func_destroy_timeseries_forecast, +}; diff --git a/fluent-bit/src/stream_processor/flb_sp_func_record.c b/fluent-bit/src/stream_processor/flb_sp_func_record.c new file mode 100644 index 00000000..26625a1e --- /dev/null +++ b/fluent-bit/src/stream_processor/flb_sp_func_record.c @@ -0,0 +1,77 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/stream_processor/flb_sp.h> +#include <fluent-bit/stream_processor/flb_sp_parser.h> + +static inline void pack_key(msgpack_packer *mp_pck, + struct flb_sp_cmd_key *cmd_key, + const char *name, int len) +{ + if (cmd_key->alias) { + msgpack_pack_str(mp_pck, flb_sds_len(cmd_key->alias)); + msgpack_pack_str_body(mp_pck, cmd_key->alias, + flb_sds_len(cmd_key->alias)); + } + else { + msgpack_pack_str(mp_pck, len); + msgpack_pack_str_body(mp_pck, name, len); + } +} + +static int func_tag(const char *tag, int tag_len, + msgpack_packer *mp_pck, struct flb_sp_cmd_key *cmd_key) +{ + pack_key(mp_pck, cmd_key, "RECORD_TAG()", 12); + msgpack_pack_str(mp_pck, tag_len); + msgpack_pack_str_body(mp_pck, tag, tag_len); + + return 1; +} + +static int func_time(struct flb_time *tms, msgpack_packer *mp_pck, + struct flb_sp_cmd_key *cmd_key) +{ + double t; + + t = flb_time_to_double(tms); + pack_key(mp_pck, cmd_key, "RECORD_TIME()", 13); + msgpack_pack_double(mp_pck, t); + + return 1; +} + +/* + * Wrapper to handle record functions, returns the number of entries added + * to the map. 
+ */ +int flb_sp_func_record(const char *tag, int tag_len, struct flb_time *tms, + msgpack_packer *mp_pck, struct flb_sp_cmd_key *cmd_key) +{ + switch (cmd_key->record_func) { + case FLB_SP_RECORD_TAG: + return func_tag(tag, tag_len, mp_pck, cmd_key); + case FLB_SP_RECORD_TIME: + return func_time(tms, mp_pck, cmd_key); + }; + + return 0; +} diff --git a/fluent-bit/src/stream_processor/flb_sp_func_time.c b/fluent-bit/src/stream_processor/flb_sp_func_time.c new file mode 100644 index 00000000..3e75eb77 --- /dev/null +++ b/fluent-bit/src/stream_processor/flb_sp_func_time.c @@ -0,0 +1,95 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/stream_processor/flb_sp.h> +#include <fluent-bit/stream_processor/flb_sp_parser.h> + +static inline void pack_key(msgpack_packer *mp_pck, + struct flb_sp_cmd_key *cmd_key, + const char *name, int len) +{ + if (cmd_key->alias) { + msgpack_pack_str(mp_pck, flb_sds_len(cmd_key->alias)); + msgpack_pack_str_body(mp_pck, cmd_key->alias, + flb_sds_len(cmd_key->alias)); + } + else { + msgpack_pack_str(mp_pck, len); + msgpack_pack_str_body(mp_pck, name, len); + } +} + +static int func_now(msgpack_packer *mp_pck, struct flb_sp_cmd_key *cmd_key) +{ + size_t len; + time_t now; + char buf[32]; + struct tm *local; + + local = flb_malloc(sizeof(struct tm)); + if (!local) { + flb_errno(); + return 0; + } + + /* Get current system time */ + now = time(NULL); + localtime_r(&now, local); + + /* Format string value */ + len = strftime(buf, sizeof(buf) - 1, "%Y-%m-%d %H:%M:%S", local); + flb_free(local); + + pack_key(mp_pck, cmd_key, "NOW()", 5); + msgpack_pack_str(mp_pck, len); + msgpack_pack_str_body(mp_pck, buf, len); + + return 1; +} + +static int func_unix_timestamp(msgpack_packer *mp_pck, + struct flb_sp_cmd_key *cmd_key) +{ + time_t now; + + /* Get unix timestamp */ + now = time(NULL); + + pack_key(mp_pck, cmd_key, "UNIX_TIMESTAMP()", 16); + msgpack_pack_uint64(mp_pck, now); + return 1; +} + +/* + * Wrapper to handle time functions, returns the number of entries added + * to the map. 
+ */ +int flb_sp_func_time(msgpack_packer *mp_pck, struct flb_sp_cmd_key *cmd_key) +{ + switch (cmd_key->time_func) { + case FLB_SP_NOW: + return func_now(mp_pck, cmd_key); + case FLB_SP_UNIX_TIMESTAMP: + return func_unix_timestamp(mp_pck, cmd_key); + }; + + return 0; +} diff --git a/fluent-bit/src/stream_processor/flb_sp_groupby.c b/fluent-bit/src/stream_processor/flb_sp_groupby.c new file mode 100644 index 00000000..9cd72c23 --- /dev/null +++ b/fluent-bit/src/stream_processor/flb_sp_groupby.c @@ -0,0 +1,82 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/stream_processor/flb_sp.h> + +int flb_sp_groupby_compare(const void *lhs, const void *rhs) +{ + int i; + int strcmp_result; + struct aggregate_node *left = (struct aggregate_node *) lhs; + struct aggregate_node *right = (struct aggregate_node *) rhs; + struct aggregate_num *lval; + struct aggregate_num *rval; + + for (i = 0; i < left->groupby_keys; i++) { + lval = &left->groupby_nums[i]; + rval = &right->groupby_nums[i]; + + /* Convert integer to double if a float value appears on one side */ + if (lval->type == FLB_SP_NUM_I64 && rval->type == FLB_SP_NUM_F64) { + lval->type = FLB_SP_NUM_F64; + lval->f64 = (double) lval->i64; + } + else if (lval->type == FLB_SP_NUM_F64 && rval->type == FLB_SP_NUM_I64) { + rval->type = FLB_SP_NUM_F64; + rval->f64 = (double) rval->i64; + } + + /* Comparison */ + if (lval->type == FLB_SP_BOOLEAN && rval->type == FLB_SP_BOOLEAN) { + if (lval->boolean != rval->boolean) { + return 1; + } + } + else if (lval->type == FLB_SP_NUM_I64 && rval->type == FLB_SP_NUM_I64) { + if (lval->i64 > rval->i64) { + return 1; + } + + if (lval->i64 < rval->i64) { + return -1; + } + } + else if (lval->type == FLB_SP_NUM_F64 && rval->type == FLB_SP_NUM_F64) { + if (lval->f64 > rval->f64) { + return 1; + } + + if (lval->f64 < rval->f64) { + return -1; + } + } + else if (lval->type == FLB_SP_STRING && rval->type == FLB_SP_STRING) { + strcmp_result = strcmp((const char *) lval->string, (const char *) rval->string); + if (strcmp_result != 0) { + return strcmp_result; + } + } + else { /* Sides have different types */ + return -1; + } + } + + return 0; +} diff --git a/fluent-bit/src/stream_processor/flb_sp_key.c b/fluent-bit/src/stream_processor/flb_sp_key.c new file mode 100644 index 00000000..94562184 --- /dev/null +++ b/fluent-bit/src/stream_processor/flb_sp_key.c @@ -0,0 +1,231 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * 
Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_slist.h> + +#include <fluent-bit/stream_processor/flb_sp.h> +#include <fluent-bit/stream_processor/flb_sp_parser.h> + +void flb_sp_key_value_print(struct flb_sp_value *v) +{ + if (v->type == FLB_EXP_BOOL) { + if (v->val.boolean) { + printf("true"); + } + else { + printf("false"); + } + } + else if (v->type == FLB_EXP_INT) { + printf("%" PRId64, v->val.i64); + } + else if (v->type == FLB_EXP_FLOAT) { + printf("%f", v->val.f64); + } + else if (v->type == FLB_EXP_STRING) { + printf("%s", v->val.string); + } + else if (v->type == FLB_EXP_NULL) { + printf("NULL"); + } +} + +/* Map msgpack object intp flb_sp_value representation */ +static int msgpack_object_to_sp_value(msgpack_object o, + struct flb_sp_value *result) +{ + result->o = o; + + /* Compose result with found value */ + if (o.type == MSGPACK_OBJECT_BOOLEAN) { + result->type = FLB_EXP_BOOL; + result->val.boolean = o.via.boolean; + return 0; + } + else if (o.type == MSGPACK_OBJECT_POSITIVE_INTEGER || + o.type == MSGPACK_OBJECT_NEGATIVE_INTEGER) { + result->type = FLB_EXP_INT; + result->val.i64 = o.via.i64; + return 0; + } + else if (o.type == MSGPACK_OBJECT_FLOAT32 || + o.type == MSGPACK_OBJECT_FLOAT) { + result->type = FLB_EXP_FLOAT; + 
result->val.f64 = o.via.f64; + return 0; + } + else if (o.type == MSGPACK_OBJECT_STR) { + result->type = FLB_EXP_STRING; + result->val.string = flb_sds_create_len((char *) o.via.str.ptr, + o.via.str.size); + return 0; + } + else if (o.type == MSGPACK_OBJECT_MAP) { + /* return boolean 'true', just denoting the existence of the key */ + result->type = FLB_EXP_BOOL; + result->val.boolean = true; + return 0; + } + else if (o.type == MSGPACK_OBJECT_NIL) { + result->type = FLB_EXP_NULL; + return 0; + } + + return -1; +} + +/* Lookup perfect match of sub-keys and map content */ +static int subkey_to_value(msgpack_object *map, struct mk_list *subkeys, + struct flb_sp_value *result) +{ + int i = 0; + int ret; + int levels; + int matched = 0; + msgpack_object *key_found = NULL; + msgpack_object key; + msgpack_object val; + msgpack_object cur_map; + struct mk_list *head; + struct flb_slist_entry *entry; + + /* Expected number of map levels in the map */ + levels = mk_list_size(subkeys); + + cur_map = *map; + + mk_list_foreach(head, subkeys) { + /* Key expected key entry */ + entry = mk_list_entry(head, struct flb_slist_entry, _head); + + if (cur_map.type != MSGPACK_OBJECT_MAP) { + break; + } + + /* Get map entry that matches entry name */ + for (i = 0; i < cur_map.via.map.size; i++) { + key = cur_map.via.map.ptr[i].key; + val = cur_map.via.map.ptr[i].val; + + /* A bit obvious, but it's better to validate data type */ + if (key.type != MSGPACK_OBJECT_STR) { + continue; + } + + /* Compare strings by length and content */ + if (flb_sds_cmp(entry->str, + (char *) key.via.str.ptr, + key.via.str.size) != 0) { + key_found = NULL; + continue; + } + + key_found = &key; + cur_map = val; + matched++; + break; + } + + if (levels == matched) { + break; + } + } + + /* No matches */ + if (!key_found || (matched > 0 && levels != matched)) { + return -1; + } + + ret = msgpack_object_to_sp_value(val, result); + if (ret == -1) { + //flb_error("[sp key] cannot process key value"); + return -1; + 
} + + return 0; +} + +struct flb_sp_value *flb_sp_key_to_value(flb_sds_t ckey, + msgpack_object map, + struct mk_list *subkeys) +{ + int i; + int ret; + int map_size; + msgpack_object key; + msgpack_object val; + struct flb_sp_value *result; + + map_size = map.via.map.size; + for (i = 0; i < map_size; i++) { + key = map.via.map.ptr[i].key; + val = map.via.map.ptr[i].val; + + /* Compare by length and by key name */ + if (flb_sds_cmp(ckey, key.via.str.ptr, key.via.str.size) != 0) { + continue; + } + + result = flb_calloc(1, sizeof(struct flb_sp_value)); + if (!result) { + flb_errno(); + return NULL; + } + result->o = val; + + if (val.type == MSGPACK_OBJECT_MAP && subkeys != NULL) { + ret = subkey_to_value(&val, subkeys, result); + if (ret == 0) { + return result; + } + else { + flb_free(result); + return NULL; + } + } + else { + ret = msgpack_object_to_sp_value(val, result); + if (ret == -1) { + flb_error("[sp key] cannot process key value"); + flb_free(result); + return NULL; + } + } + + return result; + } + + /* + * NULL return means: failed memory allocation, an invalid value, + * or non-existing key. + */ + return NULL; +} + +void flb_sp_key_value_destroy(struct flb_sp_value *v) +{ + if (v->type == FLB_EXP_STRING) { + flb_sds_destroy(v->val.string); + } + flb_free(v); +} diff --git a/fluent-bit/src/stream_processor/flb_sp_snapshot.c b/fluent-bit/src/stream_processor/flb_sp_snapshot.c new file mode 100644 index 00000000..edef823b --- /dev/null +++ b/fluent-bit/src/stream_processor/flb_sp_snapshot.c @@ -0,0 +1,277 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/stream_processor/flb_sp.h> +#include <fluent-bit/stream_processor/flb_sp_parser.h> +#include <fluent-bit/stream_processor/flb_sp_snapshot.h> + +static struct flb_sp_snapshot_page *snapshot_page_create() +{ + struct flb_sp_snapshot_page *page; + + page = (struct flb_sp_snapshot_page *) + flb_calloc(1, sizeof(struct flb_sp_snapshot_page)); + if (!page) { + flb_errno(); + return NULL; + } + + page->snapshot_page = (char *) flb_malloc(SNAPSHOT_PAGE_SIZE); + if (!page->snapshot_page) { + flb_errno(); + flb_free(page); + return NULL; + } + + return page; +} + +static int snapshot_cleanup(struct flb_sp_snapshot *snapshot, struct flb_time *tms) +{ + int ok; + size_t off; + size_t off_copy; + msgpack_unpacked result; + msgpack_object *obj; + struct flb_time tms0; + struct flb_sp_snapshot_page *page; + + ok = MSGPACK_UNPACK_SUCCESS; + off = 0; + + while (mk_list_is_empty(&snapshot->pages) != 0) { + page = mk_list_entry_first(&snapshot->pages, struct flb_sp_snapshot_page, + _head); + off = page->start_pos; + off_copy = off; + + msgpack_unpacked_init(&result); + + while (msgpack_unpack_next(&result, page->snapshot_page, page->end_pos, + &off) == ok) { + + if (snapshot->record_limit > 0 && + snapshot->records > snapshot->record_limit) { + page->start_pos = off; + snapshot->records--; + snapshot->size = snapshot->size - (off - off_copy); + off_copy = off; + + continue; + } + + /* extract timestamp */ + flb_time_pop_from_msgpack(&tms0, &result, &obj); + + if (snapshot->time_limit > 0 && + 
tms->tm.tv_sec - tms0.tm.tv_sec > snapshot->time_limit) { + page->start_pos = off; + snapshot->records--; + snapshot->size = snapshot->size - (off - off_copy); + off_copy = off; + + continue; + } + + break; + } + + msgpack_unpacked_destroy(&result); + + /* If page is empty, free the page and move to the next one */ + if (page->start_pos != page->end_pos) { + break; + } + + mk_list_del(&page->_head); + flb_free(page->snapshot_page); + flb_free(page); + } + + return 0; +} + +static bool snapshot_page_is_full(struct flb_sp_snapshot_page *page, size_t buf_size) +{ + return SNAPSHOT_PAGE_SIZE - page->end_pos < buf_size; +} + +char *flb_sp_snapshot_name_from_flush(flb_sds_t name) +{ + return name + sizeof("__flush_") - 1; +} + +int flb_sp_snapshot_update(struct flb_sp_task *task, const char *buf_data, + size_t buf_size, struct flb_time *tms) +{ + int ok; + size_t off = 0; + struct flb_time tm; + struct flb_sp_snapshot *snapshot; + struct flb_sp_snapshot_page *page; + msgpack_unpacked result; + msgpack_object *obj; + + ok = MSGPACK_UNPACK_SUCCESS; + msgpack_unpacked_init(&result); + + if (buf_size <= 0) { + return -1; + } + + snapshot = (struct flb_sp_snapshot *) task->snapshot; + + /* Create a snapshot pgae if the list is empty */ + if (mk_list_is_empty(&snapshot->pages) == 0) { + page = snapshot_page_create(); + if (!page) { + flb_errno(); + return -1; + } + + mk_list_add(&page->_head, &snapshot->pages); + } + else { + page = mk_list_entry_last(&snapshot->pages, struct flb_sp_snapshot_page, _head); + + if (snapshot_page_is_full(page, buf_size)) { + page = snapshot_page_create(); + if (!page) { + flb_errno(); + return -1; + } + + mk_list_add(&page->_head, &snapshot->pages); + } + } + + memcpy(page->snapshot_page + page->end_pos, buf_data, buf_size); + page->end_pos = page->end_pos + buf_size; + + /* Get the last timestamp */ + while (msgpack_unpack_next(&result, page->snapshot_page, + page->end_pos - page->start_pos, &off) == ok) { + flb_time_pop_from_msgpack(&tm, 
&result, &obj); + } + + msgpack_unpacked_destroy(&result); + + snapshot->records++; + snapshot->size = snapshot->size + buf_size; + + /* Remove records from snapshot pages based on time/length window */ + snapshot_cleanup(snapshot, tms); + + return 0; +} + +int flb_sp_snapshot_flush(struct flb_sp *sp, struct flb_sp_task *task, + char **out_buf_data, size_t *out_buf_size) +{ + size_t off; + size_t page_size; + char *snapshot_name; + char *out_buf_data_tmp; + struct flb_sp_cmd *cmd; + struct mk_list *tmp; + struct mk_list *head; + struct mk_list *snapshot_head; + struct flb_sp_task *snapshot_task; + struct flb_sp_snapshot *snapshot; + struct flb_sp_snapshot_page *page; + + off = 0; + cmd = task->cmd; + snapshot_name = flb_sp_snapshot_name_from_flush(cmd->stream_name); + + /* Lookup Tasks that matches the incoming instance data */ + mk_list_foreach(head, &sp->tasks) { + snapshot_task = mk_list_entry(head, struct flb_sp_task, _head); + cmd = snapshot_task->cmd; + + if (cmd->type == FLB_SP_CREATE_SNAPSHOT && + flb_sds_cmp(cmd->stream_name, snapshot_name, + strlen(snapshot_name)) == 0) { + + snapshot = (struct flb_sp_snapshot *) snapshot_task->snapshot; + + if (snapshot->size == 0) { + break; + } + + if (*out_buf_data == NULL) { + *out_buf_data = (char *) flb_malloc(snapshot->size); + if (!*out_buf_data) { + flb_errno(); + return -1; + } + *out_buf_size = snapshot->size; + } + else { + out_buf_data_tmp = (char *) flb_realloc(*out_buf_data, + *out_buf_size + snapshot->size); + if (!out_buf_data_tmp) { + flb_errno(); + return -1; + } + *out_buf_data = out_buf_data_tmp; + *out_buf_size = *out_buf_size + snapshot->size; + } + + mk_list_foreach_safe(snapshot_head, tmp, &snapshot->pages) { + page = mk_list_entry_first(&snapshot->pages, + struct flb_sp_snapshot_page, _head); + page_size = page->end_pos - page->start_pos; + memcpy(*out_buf_data + off, + page->snapshot_page + page->start_pos, page_size); + off = off + page_size; + + /* Remove page from list */ + 
mk_list_del(&page->_head); + flb_free(page->snapshot_page); + flb_free(page); + } + + mk_list_init(&snapshot->pages); + + snapshot->records = 0; + snapshot->size = 0; + } + } + + return 0; +} + +void flb_sp_snapshot_destroy(struct flb_sp_snapshot *snapshot) +{ + struct mk_list *head; + struct mk_list *tmp; + struct flb_sp_snapshot_page *page; + + if (snapshot != NULL) { + mk_list_foreach_safe(head, tmp, &snapshot->pages) { + page = mk_list_entry(head, struct flb_sp_snapshot_page, _head); + mk_list_del(&page->_head); + flb_free(page->snapshot_page); + flb_free(page); + } + flb_free(snapshot); + } +} diff --git a/fluent-bit/src/stream_processor/flb_sp_stream.c b/fluent-bit/src/stream_processor/flb_sp_stream.c new file mode 100644 index 00000000..b4f8a37a --- /dev/null +++ b/fluent-bit/src/stream_processor/flb_sp_stream.c @@ -0,0 +1,168 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_metrics.h> +#include <fluent-bit/flb_storage.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/stream_processor/flb_sp.h> +#include <fluent-bit/stream_processor/flb_sp_parser.h> +#include <fluent-bit/stream_processor/flb_sp_stream.h> + +/* Function defined in plugins/in_stream_processor/sp.c */ +int in_stream_processor_add_chunk(const char *buf_data, size_t buf_size, + struct flb_input_instance *in); + +int flb_sp_stream_create(const char *name, struct flb_sp_task *task, + struct flb_sp *sp) +{ + int ret; + const char *tmp; + struct flb_input_instance *in; + struct flb_sp_stream *stream; + + /* The name must be different than an input plugin instance name or alias */ + ret = flb_input_name_exists(name, sp->config); + if (ret == FLB_TRUE) { + flb_error("[sp] stream name '%s' already exists", name); + return -1; + } + + /* Create stream context for 'stream processor' */ + stream = flb_calloc(1, sizeof(struct flb_sp_stream)); + if (!stream) { + flb_errno(); + return -1; + } + stream->name = flb_sds_create(name); + if (!stream->name) { + flb_free(stream); + return -1; + } + + /* + * Register an input plugin instance using 'in_stream_processor', that one + * is used as the parent plugin to ingest data back into Fluent Bit + * data pipeline. 
+ */ + in = flb_input_new(sp->config, "stream_processor", NULL, FLB_FALSE); + if (!in) { + flb_error("[sp] cannot create instance of in_stream_processor"); + flb_free(stream); + return -1; + } + + /* Set an alias, otherwise the stream will be called stream_processor.N */ + ret = flb_input_set_property(in, "alias", name); + if (ret == -1) { + flb_warn("[sp] cannot set stream name, using fallback name %s", + in->name); + } + + /* + * Lookup for Stream properties: at this point we only care about a + * possible Tag defined in the query, e.g: + * + * CREATE STREAM data WITH(tag='mydata') as SELECT * FROM STREAM:apache; + */ + tmp = flb_sp_cmd_stream_prop_get(task->cmd, "tag"); + if (tmp) { + /* + * Duplicate value in a new variable since input instance property + * will be released upon plugin exit. + */ + stream->tag = flb_sds_create(tmp); + if (!stream->tag) { + flb_error("[sp] cannot set Tag property"); + flb_sp_stream_destroy(stream, sp); + return -1; + } + + /* Tag property is just an assignation, cannot fail */ + flb_input_set_property(in, "tag", stream->tag); + } + + /* + * Check if the new stream is 'routable' or not + */ + tmp = flb_sp_cmd_stream_prop_get(task->cmd, "routable"); + if (tmp) { + stream->routable = flb_utils_bool(tmp); + flb_input_set_property(in, "routable", tmp); + } + + /* + * Set storage type + */ + tmp = flb_sp_cmd_stream_prop_get(task->cmd, "storage.type"); + if (tmp) { + flb_input_set_property(in, "storage.type", tmp); + } + + /* Initialize instance */ + ret = flb_input_instance_init(in, sp->config); + if (ret == -1) { + flb_error("[sp] cannot initialize instance of in_stream_processor"); + flb_input_instance_exit(in, sp->config); + flb_input_instance_destroy(in); + } + stream->in = in; + + /* Initialize plugin collector (event callback) */ + flb_input_collector_start(0, in); + +#ifdef FLB_HAVE_METRICS + /* Override Metrics title */ + ret = flb_metrics_title(name, in->metrics); + if (ret == -1) { + flb_warn("[sp] cannot set metrics title, 
using fallback name %s", + in->name); + } +#endif + + /* Storage context */ + ret = flb_storage_input_create(sp->config->cio, in); + if (ret == -1) { + flb_error("[sp] cannot initialize storage for stream '%s'", + name); + flb_sp_stream_destroy(stream, sp); + return -1; + } + + task->stream = stream; + return 0; +} + +int flb_sp_stream_append_data(const char *buf_data, size_t buf_size, + struct flb_sp_stream *stream) +{ + return in_stream_processor_add_chunk(buf_data, buf_size, stream->in); +} + +void flb_sp_stream_destroy(struct flb_sp_stream *stream, struct flb_sp *sp) +{ + flb_sds_destroy(stream->name); + flb_sds_destroy(stream->tag); + flb_input_instance_exit(stream->in, sp->config); + flb_input_instance_destroy(stream->in); + flb_free(stream); +} diff --git a/fluent-bit/src/stream_processor/flb_sp_window.c b/fluent-bit/src/stream_processor/flb_sp_window.c new file mode 100644 index 00000000..3937b427 --- /dev/null +++ b/fluent-bit/src/stream_processor/flb_sp_window.c @@ -0,0 +1,122 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Drop from the task window the aggregation state that has expired.
 *
 * - Default / tumbling windows: when the window holds records, every
 *   aggregation node is destroyed, the group-by tree and the node list are
 *   re-created empty and the record counter is reset to zero.
 * - Hopping windows: the first hopping slot is subtracted from the window.
 *   Nodes whose record count equals the slot's count are removed entirely;
 *   for the others the slot contribution is taken out of each aggregated
 *   key. The slot itself is destroyed afterwards.
 */
void flb_sp_window_prune(struct flb_sp_task *task)
{
    int i;
    int map_entries;
    rb_result_t result;
    struct aggregate_node *aggr_node;
    struct aggregate_node *aggr_node_hs;
    struct mk_list *tmp;
    struct mk_list *head;
    struct flb_sp_hopping_slot *hs;
    struct rb_tree_node *rb_result;
    struct flb_sp_cmd_key *ckey;
    struct flb_sp_cmd *cmd = task->cmd;

    switch (task->window.type) {
    case FLB_SP_WINDOW_DEFAULT:
    case FLB_SP_WINDOW_TUMBLING:
        /* Nothing to prune on an empty window */
        if (task->window.records > 0) {
            mk_list_foreach_safe(head, tmp, &task->window.aggregate_list) {
                aggr_node = mk_list_entry(head, struct aggregate_node, _head);
                mk_list_del(&aggr_node->_head);
                flb_sp_aggregate_node_destroy(cmd, aggr_node);
            }

            /* Start over with an empty tree, an empty list and no records */
            rb_tree_destroy(&task->window.aggregate_tree);
            mk_list_init(&task->window.aggregate_list);
            rb_tree_new(&task->window.aggregate_tree, flb_sp_groupby_compare);
            task->window.records = 0;
        }
        break;
    case FLB_SP_WINDOW_HOPPING:
        if (mk_list_size(&task->window.hopping_slot) == 0) {
            return;
        }

        /* The slot being retired is the first one in the list */
        hs = mk_list_entry_first(&task->window.hopping_slot,
                                 struct flb_sp_hopping_slot, _head);
        mk_list_foreach_safe(head, tmp, &task->window.aggregate_list) {
            aggr_node = mk_list_entry(head, struct aggregate_node, _head);
            /* Look for the slot's node that matches this window node */
            result = rb_tree_find(&hs->aggregate_tree, aggr_node, &rb_result);
            if (result == RB_OK) {
                aggr_node_hs = mk_list_entry(rb_result, struct aggregate_node, _rb_head);
                if (aggr_node_hs->records == aggr_node->records) {
                    /*
                     * Every record of this node came from the retired slot:
                     * unlink the node from the tree and the list, then
                     * destroy it.
                     */
                    rb_tree_remove(&task->window.aggregate_tree, &aggr_node->_rb_head);
                    mk_list_del(&aggr_node->_head);
                    flb_sp_aggregate_node_destroy(cmd, aggr_node);
                }
                else {
                    /* Partial overlap: subtract the slot's contribution */
                    aggr_node->records -= aggr_node_hs->records;
                    map_entries = mk_list_size(&cmd->keys);

                    ckey = mk_list_entry_first(&cmd->keys,
                                               struct flb_sp_cmd_key, _head);
                    for (i = 0; i < map_entries; i++) {
                        /*
                         * Only aggregated keys carry state to undo; the
                         * callback table is indexed by 'aggr_func - 1'.
                         */
                        if (ckey->aggr_func) {
                            aggregate_func_remove[ckey->aggr_func - 1](aggr_node, aggr_node_hs, i);
                        }

                        ckey = mk_list_entry_next(&ckey->_head, struct flb_sp_cmd_key,
                                                  _head, &cmd->keys);
                    }
                }
            }
        }
        task->window.records -= hs->records;

        /* Destroy hopping slot */
        mk_list_foreach_safe(head, tmp, &hs->aggregate_list) {
            aggr_node_hs = mk_list_entry(head, struct aggregate_node, _head);
            mk_list_del(&aggr_node_hs->_head);
            flb_sp_aggregate_node_destroy(cmd, aggr_node_hs);
        }
        rb_tree_destroy(&hs->aggregate_tree);
        mk_list_del(&hs->_head);
        flb_free(hs);

        break;
    }
}
+add_flex_bison_dependency(lexer parser) +add_dependencies(flb-sp-parser onigmo-static) + +if(FLB_JEMALLOC) + target_link_libraries(flb-sp-parser libjemalloc) +endif() diff --git a/fluent-bit/src/stream_processor/parser/flb_sp_parser.c b/fluent-bit/src/stream_processor/parser/flb_sp_parser.c new file mode 100644 index 00000000..429d0b4c --- /dev/null +++ b/fluent-bit/src/stream_processor/parser/flb_sp_parser.c @@ -0,0 +1,851 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_str.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_slist.h> +#include <fluent-bit/stream_processor/flb_sp_parser.h> +#include <fluent-bit/stream_processor/flb_sp_aggregate_func.h> +#include <fluent-bit/stream_processor/flb_sp_record_func.h> + +#include "sql_parser.h" +#include "sql_lex.h" + +static int swap_tmp_subkeys(struct mk_list **target, struct flb_sp_cmd *cmd) +{ + /* Map context keys into this command key structure */ + *target = cmd->tmp_subkeys; + + cmd->tmp_subkeys = flb_malloc(sizeof(struct mk_list)); + if (!cmd->tmp_subkeys) { + flb_errno(); + cmd->tmp_subkeys = *target; + cmd->status = FLB_SP_ERROR; + return -1; + } + + flb_slist_create(cmd->tmp_subkeys); + return 0; +} + +void flb_sp_cmd_destroy(struct flb_sp_cmd *cmd) +{ + struct mk_list *head; + struct mk_list *tmp; + struct flb_sp_cmd_key *key; + struct flb_sp_cmd_gb_key *gb_key; + struct flb_sp_cmd_prop *prop; + + /* remove keys */ + mk_list_foreach_safe(head, tmp, &cmd->keys) { + key = mk_list_entry(head, struct flb_sp_cmd_key, _head); + mk_list_del(&key->_head); + flb_sp_cmd_key_del(key); + } + + /* remove groupby keys */ + mk_list_foreach_safe(head, tmp, &cmd->gb_keys) { + gb_key = mk_list_entry(head, struct flb_sp_cmd_gb_key, _head); + mk_list_del(&gb_key->_head); + flb_sp_cmd_gb_key_del(gb_key); + } + + /* stream */ + if (cmd->stream_name) { + mk_list_foreach_safe(head, tmp, &cmd->stream_props) { + prop = mk_list_entry(head, struct flb_sp_cmd_prop, _head); + mk_list_del(&prop->_head); + flb_sp_cmd_stream_prop_del(prop); + } + flb_sds_destroy(cmd->stream_name); + } + flb_sds_destroy(cmd->source_name); + + if (mk_list_size(&cmd->cond_list) > 0) { + flb_sp_cmd_condition_del(cmd); + } + + if (cmd->tmp_subkeys) { + flb_slist_destroy(cmd->tmp_subkeys); + flb_free(cmd->tmp_subkeys); + } + + 
flb_free(cmd); +} + +void flb_sp_cmd_key_del(struct flb_sp_cmd_key *key) +{ + if (key->name) { + flb_sds_destroy(key->name); + } + if (key->alias) { + flb_sds_destroy(key->alias); + } + if (key->subkeys) { + flb_slist_destroy(key->subkeys); + flb_free(key->subkeys); + } + flb_free(key); +} + +void flb_sp_cmd_gb_key_del(struct flb_sp_cmd_gb_key *key) +{ + if (key->name) { + flb_sds_destroy(key->name); + } + if (key->subkeys) { + flb_slist_destroy(key->subkeys); + flb_free(key->subkeys); + } + flb_free(key); +} + +struct flb_sp_cmd_key *flb_sp_key_create(struct flb_sp_cmd *cmd, int func, + const char *key_name, + const char *key_alias) +{ + char tmp_alias[256]; + int s; + int ret; + int len; + int aggr_func = 0; + int time_func = 0; + int record_func = 0; + char *tmp; + struct mk_list *head; + struct flb_sp_cmd_key *key; + struct flb_slist_entry *entry; + + /* aggregation function ? */ + if (func >= FLB_SP_AVG && func <= FLB_SP_FORECAST) { + aggr_func = func; + } + else if (func >= FLB_SP_NOW && func <= FLB_SP_UNIX_TIMESTAMP) { + /* Time function */ + time_func = func; + } + else if (func >= FLB_SP_RECORD_TAG && func <= FLB_SP_RECORD_TIME) { + /* Record function */ + record_func = func; + } + + key = flb_calloc(1, sizeof(struct flb_sp_cmd_key)); + if (!key) { + flb_errno(); + cmd->status = FLB_SP_ERROR; + return NULL; + } + key->gb_key = NULL; + key->subkeys = NULL; + + /* key name and aliases works when the selection is not a wildcard */ + if (key_name) { + key->name = flb_sds_create(key_name); + if (!key->name) { + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + } + else { + /* + * Wildcard key only allowed on: + * - no aggregation mode (left side / first entry) + * - aggregation using COUNT(*) + */ + if (mk_list_size(&cmd->keys) > 0 && aggr_func == 0 && + record_func == 0 && time_func == 0) { + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + } + + if (key_alias) { + key->alias = flb_sds_create(key_alias); + 
if (!key->alias) { + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + } + + /* Aggregation function */ + if (aggr_func > 0) { + key->aggr_func = aggr_func; + } + else if (time_func > 0) { + key->time_func = time_func; + } + else if (record_func > 0) { + key->record_func = record_func; + } + + /* Lookup for any subkeys in the temporary list */ + if (mk_list_size(cmd->tmp_subkeys) > 0) { + ret = swap_tmp_subkeys(&key->subkeys, cmd); + if (ret == -1) { + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + + /* Compose a name key that include listed sub keys */ + if (!key->alias) { + s = flb_sds_len(key->name) + (16 * mk_list_size(key->subkeys)); + key->alias = flb_sds_create_size(s); + if (!key->alias) { + flb_sp_cmd_key_del(key); + return NULL; + } + + tmp = flb_sds_cat(key->alias, key->name, flb_sds_len(key->name)); + if (tmp != key->alias) { + key->alias = tmp; + } + + mk_list_foreach(head, key->subkeys) { + entry = mk_list_entry(head, struct flb_slist_entry, _head); + + /* prefix */ + tmp = flb_sds_cat(key->alias, "['", 2); + if (tmp) { + key->alias = tmp; + } + else { + flb_sp_cmd_key_del(key); + return NULL; + } + + /* selected key name */ + tmp = flb_sds_cat(key->alias, + entry->str, flb_sds_len(entry->str)); + if (tmp) { + key->alias = tmp; + } + else { + flb_sp_cmd_key_del(key); + return NULL; + } + + /* suffix */ + tmp = flb_sds_cat(key->alias, "']", 2); + if (tmp) { + key->alias = tmp; + } + else { + flb_sp_cmd_key_del(key); + return NULL; + } + } + + if (aggr_func) { + len = snprintf(tmp_alias, sizeof(tmp_alias) - 1, "%s(%s)", + aggregate_func_string[aggr_func - 1], key->alias); + + tmp = flb_sds_copy(key->alias, tmp_alias, len); + if (tmp) { + key->alias = tmp; + } + else { + flb_sp_cmd_key_del(key); + return NULL; + } + } + } + } + else if (aggr_func && !key->alias) { + if (key->name) { + len = snprintf(tmp_alias, sizeof(tmp_alias) - 1, "%s(%s)", + aggregate_func_string[aggr_func - 1], key->name); + } 
else { + len = snprintf(tmp_alias, sizeof(tmp_alias) - 1, "%s(*)", + aggregate_func_string[aggr_func - 1]); + } + + key->alias = flb_sds_create_len(tmp_alias, len); + if (!key->alias) { + flb_sp_cmd_key_del(key); + return NULL; + } + } + + return key; +} + +int flb_sp_cmd_key_add(struct flb_sp_cmd *cmd, int func, const char *key_name) +{ + struct flb_sp_cmd_key *key; + + key = flb_sp_key_create(cmd, func, key_name, cmd->alias); + + if (!key) { + return -1; + } + + mk_list_add(&key->_head, &cmd->keys); + + /* free key alias and set cmd->alias to null */ + if (cmd->alias) { + flb_free(cmd->alias); + cmd->alias = NULL; + } + + return 0; +} + +void flb_sp_cmd_alias_add(struct flb_sp_cmd *cmd, const char *key_alias) +{ + cmd->alias = (char *) key_alias; +} + +int flb_sp_cmd_source(struct flb_sp_cmd *cmd, int type, const char *source) +{ + cmd->source_type = type; + cmd->source_name = flb_sds_create(source); + if (!cmd->source_name) { + flb_errno(); + return -1; + } + + return 0; +} + +void flb_sp_cmd_dump(struct flb_sp_cmd *cmd) +{ + struct mk_list *head; + struct mk_list *tmp; + struct flb_sp_cmd_key *key; + + /* Lookup keys */ + printf("== KEYS ==\n"); + mk_list_foreach_safe(head, tmp, &cmd->keys) { + key = mk_list_entry(head, struct flb_sp_cmd_key, _head); + printf("- '%s'\n", key->name); + } + printf("== SOURCE ==\n"); + if (cmd->source_type == FLB_SP_STREAM) { + printf("stream => "); + } + else if (cmd->source_type == FLB_SP_TAG) { + printf("tag match => "); + } + + printf("'%s'\n", cmd->source_name); +} + +struct flb_sp_cmd *flb_sp_cmd_create(const char *sql) +{ + int ret; + yyscan_t scanner; + YY_BUFFER_STATE buf; + struct flb_sp_cmd *cmd; + + /* create context */ + cmd = flb_calloc(1, sizeof(struct flb_sp_cmd)); + if (!cmd) { + flb_errno(); + return NULL; + } + cmd->status = FLB_SP_OK; + cmd->type = FLB_SP_SELECT; + + mk_list_init(&cmd->stream_props); + mk_list_init(&cmd->keys); + + /* Condition linked list (we use them to free resources) */ + 
mk_list_init(&cmd->cond_list); + mk_list_init(&cmd->gb_keys); + + /* Allocate temporary list and initialize */ + cmd->tmp_subkeys = flb_malloc(sizeof(struct mk_list)); + if (!cmd->tmp_subkeys) { + flb_errno(); + flb_free(cmd); + return NULL; + } + flb_slist_create(cmd->tmp_subkeys); + + /* Flex/Bison work */ + flb_sp_lex_init(&scanner); + buf = flb_sp__scan_string(sql, scanner); + + ret = flb_sp_parse(cmd, sql, scanner); + + flb_sp__delete_buffer(buf, scanner); + flb_sp_lex_destroy(scanner); + + if (ret != 0) { + flb_sp_cmd_destroy(cmd); + return NULL; + } + + return cmd; +} + +int flb_sp_cmd_stream_new(struct flb_sp_cmd *cmd, const char *stream_name) +{ + cmd->stream_name = flb_sds_create(stream_name); + if (!cmd->stream_name) { + return -1; + } + + cmd->type = FLB_SP_CREATE_STREAM; + return 0; +} + +int flb_sp_cmd_snapshot_new(struct flb_sp_cmd *cmd, const char *snapshot_name) +{ + const char *tmp; + + cmd->stream_name = flb_sds_create(snapshot_name); + if (!cmd->stream_name) { + return -1; + } + + tmp = flb_sp_cmd_stream_prop_get(cmd, "tag"); + if (!tmp) { + cmd->status = FLB_SP_ERROR; + flb_error("[sp] tag for snapshot is required. 
Add WITH(tag = <TAG>) to the snapshot %s", + snapshot_name); + return -1; + } + + cmd->type = FLB_SP_CREATE_SNAPSHOT; + + return 0; +} + +int flb_sp_cmd_snapshot_flush_new(struct flb_sp_cmd *cmd, const char *snapshot_name) +{ + cmd->stream_name = flb_sds_cat(flb_sds_create("__flush_"), + snapshot_name, strlen(snapshot_name)); + + if (!cmd->stream_name) { + return -1; + } + + cmd->type = FLB_SP_FLUSH_SNAPSHOT; + + return 0; +} + +int flb_sp_cmd_stream_prop_add(struct flb_sp_cmd *cmd, const char *key, const char *val) +{ + struct flb_sp_cmd_prop *prop; + + prop = flb_malloc(sizeof(struct flb_sp_cmd_prop)); + if (!prop) { + flb_errno(); + return -1; + } + + prop->key = flb_sds_create(key); + if (!prop->key) { + flb_free(prop); + return -1; + } + + prop->val = flb_sds_create(val); + if (!prop->val) { + flb_free(prop->key); + flb_free(prop); + return -1; + } + + mk_list_add(&prop->_head, &cmd->stream_props); + return 0; +} + +void flb_sp_cmd_stream_prop_del(struct flb_sp_cmd_prop *prop) +{ + if (prop->key) { + flb_sds_destroy(prop->key); + } + if (prop->val) { + flb_sds_destroy(prop->val); + } + flb_free(prop); +} + +const char *flb_sp_cmd_stream_prop_get(struct flb_sp_cmd *cmd, const char *key) +{ + int len; + struct mk_list *head; + struct flb_sp_cmd_prop *prop; + + if (!key) { + return NULL; + } + len = strlen(key); + + mk_list_foreach(head, &cmd->stream_props) { + prop = mk_list_entry(head, struct flb_sp_cmd_prop, _head); + if (flb_sds_len(prop->key) != len) { + continue; + } + + if (strcmp(prop->key, key) == 0) { + return prop->val; + } + } + + return NULL; +} + +/* WINDOW functions */ + +int flb_sp_cmd_window(struct flb_sp_cmd *cmd, + int window_type, int size, int time_unit, + int advance_by_size, int advance_by_time_unit) +{ + cmd->window.type = window_type; + + switch (time_unit) { + case FLB_SP_TIME_SECOND: + cmd->window.size = (time_t) size; + break; + case FLB_SP_TIME_MINUTE: + cmd->window.size = (time_t) size * 60; + break; + case FLB_SP_TIME_HOUR: + 
cmd->window.size = (time_t) size * 3600; + break; + } + + if (window_type == FLB_SP_WINDOW_HOPPING) { + switch (advance_by_time_unit) { + case FLB_SP_TIME_SECOND: + cmd->window.advance_by = (time_t) advance_by_size; + break; + case FLB_SP_TIME_MINUTE: + cmd->window.advance_by = (time_t) advance_by_size * 60; + break; + case FLB_SP_TIME_HOUR: + cmd->window.advance_by = (time_t) advance_by_size * 3600; + break; + } + + if (cmd->window.advance_by >= cmd->window.size) { + return -1; + } + } + + return 0; +} + +/* WHERE <condition> functions */ + +struct flb_exp *flb_sp_cmd_operation(struct flb_sp_cmd *cmd, + struct flb_exp *e1, struct flb_exp *e2, + int operation) +{ + struct flb_exp_op *expression; + + expression = flb_malloc(sizeof(struct flb_exp_op)); + if (!expression) { + flb_errno(); + return NULL; + } + + expression->type = FLB_LOGICAL_OP; + expression->left = e1; + expression->right = e2; + expression->operation = operation; + mk_list_add(&expression->_head, &cmd->cond_list); + + return (struct flb_exp *) expression; +} + +struct flb_exp *flb_sp_cmd_comparison(struct flb_sp_cmd *cmd, + struct flb_exp *key, struct flb_exp *val, + int operation) +{ + struct flb_exp_op *expression; + + expression = flb_malloc(sizeof(struct flb_exp_op)); + if (!expression) { + flb_errno(); + return NULL; + } + + expression->type = FLB_LOGICAL_OP; + expression->left = (struct flb_exp *) key; + expression->right = (struct flb_exp *) val; + expression->operation = operation; + mk_list_add(&expression->_head, &cmd->cond_list); + + return (struct flb_exp *) expression; +} + +struct flb_exp *flb_sp_cmd_condition_key(struct flb_sp_cmd *cmd, + const char *identifier) +{ + int ret; + struct flb_exp_key *key; + + key = flb_calloc(1, sizeof(struct flb_exp_key)); + if (!key) { + flb_errno(); + return NULL; + } + + key->type = FLB_EXP_KEY; + key->name = flb_sds_create(identifier); + mk_list_add(&key->_head, &cmd->cond_list); + + if (mk_list_size(cmd->tmp_subkeys) > 0) { + ret = 
swap_tmp_subkeys(&key->subkeys, cmd); + if (ret == -1) { + flb_sds_destroy(key->name); + mk_list_del(&key->_head); + flb_free(key); + return NULL; + } + } + + return (struct flb_exp *) key; +} + +struct flb_exp *flb_sp_cmd_condition_integer(struct flb_sp_cmd *cmd, + int integer) +{ + struct flb_exp_val *val; + + val = flb_malloc(sizeof(struct flb_exp_val)); + if (!val) { + flb_errno(); + return NULL; + } + + val->type = FLB_EXP_INT; + val->val.i64 = integer; + mk_list_add(&val->_head, &cmd->cond_list); + + return (struct flb_exp *) val; +} + +struct flb_exp *flb_sp_cmd_condition_float(struct flb_sp_cmd *cmd, float fval) +{ + struct flb_exp_val *val; + + val = flb_malloc(sizeof(struct flb_exp_val)); + if (!val) { + flb_errno(); + return NULL; + } + + val->type = FLB_EXP_FLOAT; + val->val.f64 = fval; + mk_list_add(&val->_head, &cmd->cond_list); + + return (struct flb_exp *) val; +} + +struct flb_exp *flb_sp_cmd_condition_string(struct flb_sp_cmd *cmd, + const char *string) +{ + struct flb_exp_val *val; + + val = flb_malloc(sizeof(struct flb_exp_val)); + if (!val) { + flb_errno(); + return NULL; + } + + val->type = FLB_EXP_STRING; + val->val.string = flb_sds_create(string); + mk_list_add(&val->_head, &cmd->cond_list); + + return (struct flb_exp *) val; +} + +struct flb_exp *flb_sp_cmd_condition_boolean(struct flb_sp_cmd *cmd, + bool boolean) +{ + struct flb_exp_val *val; + + val = flb_malloc(sizeof(struct flb_exp_val)); + if (!val) { + flb_errno(); + return NULL; + } + + val->type = FLB_EXP_BOOL; + val->val.boolean = boolean; + mk_list_add(&val->_head, &cmd->cond_list); + + return (struct flb_exp *) val; +} + +struct flb_exp *flb_sp_cmd_condition_null(struct flb_sp_cmd *cmd) +{ + struct flb_exp_val *val; + + val = flb_malloc(sizeof(struct flb_exp_val)); + if (!val) { + flb_errno(); + return NULL; + } + + val->type = FLB_EXP_NULL; + mk_list_add(&val->_head, &cmd->cond_list); + + return (struct flb_exp *) val; +} + +struct flb_exp *flb_sp_record_function_add(struct 
flb_sp_cmd *cmd, + char *name, struct flb_exp *param) +{ + char *rf_name; + int i; + struct flb_exp_func *func; + + for (i = 0; i < RECORD_FUNCTIONS_SIZE; i++) + { + rf_name = record_functions[i]; + if (strncmp(rf_name, name, strlen(rf_name)) == 0) + { + func = flb_calloc(1, sizeof(struct flb_exp_func)); + if (!func) { + flb_errno(); + return NULL; + } + + func->type = FLB_EXP_FUNC; + func->name = flb_sds_create(name); + func->cb_func = record_functions_ptr[i]; + func->param = param; + + mk_list_add(&func->_head, &cmd->cond_list); + + return (struct flb_exp *) func; + } + } + + return NULL; +} + +void flb_sp_cmd_condition_add(struct flb_sp_cmd *cmd, struct flb_exp *e) + +{ + cmd->condition = e; +} + +int flb_sp_cmd_gb_key_add(struct flb_sp_cmd *cmd, const char *key) +{ + int ret; + struct flb_sp_cmd_gb_key *gb_key; + + gb_key = flb_calloc(1, sizeof(struct flb_sp_cmd_gb_key)); + if (!gb_key) { + flb_errno(); + return -1; + } + + gb_key->name = flb_sds_create(key); + if (!gb_key->name) { + flb_free(gb_key); + return -1; + } + + gb_key->id = mk_list_size(&cmd->gb_keys); + mk_list_add(&gb_key->_head, &cmd->gb_keys); + + /* Lookup for any subkeys in the temporary list */ + if (mk_list_size(cmd->tmp_subkeys) > 0) { + ret = swap_tmp_subkeys(&gb_key->subkeys, cmd); + if (ret == -1) { + flb_sds_destroy(gb_key->name); + mk_list_del(&gb_key->_head); + flb_free(gb_key); + return -1; + } + } + + return 0; +} + +void flb_sp_cmd_condition_del(struct flb_sp_cmd *cmd) +{ + struct mk_list *tmp; + struct mk_list *head; + struct flb_exp *exp; + struct flb_exp_key *key; + struct flb_exp_val *val; + struct flb_exp_func *func; + + mk_list_foreach_safe(head, tmp, &cmd->cond_list) { + exp = mk_list_entry(head, struct flb_exp, _head); + if (exp->type == FLB_EXP_KEY) { + key = (struct flb_exp_key *) exp; + flb_sds_destroy(key->name); + if (key->subkeys) { + flb_slist_destroy(key->subkeys); + flb_free(key->subkeys); + } + } + else if (exp->type == FLB_EXP_STRING) { + val = (struct flb_exp_val 
*) exp; + flb_sds_destroy(val->val.string); + } + else if (exp->type == FLB_EXP_FUNC) { + func = (struct flb_exp_func *) exp; + flb_sds_destroy(func->name); + } + + mk_list_del(&exp->_head); + flb_free(exp); + } +} + +void flb_sp_cmd_limit_add(struct flb_sp_cmd *cmd, int limit) +{ + cmd->limit = limit; +} + +int flb_sp_cmd_timeseries_forecast(struct flb_sp_cmd *cmd, int func, const char *key_name, int seconds) +{ + struct flb_sp_cmd_key *key; + + key = flb_sp_key_create(cmd, func, key_name, cmd->alias); + + if (!key) { + return -1; + } + + mk_list_add(&key->_head, &cmd->keys); + + key->constant = seconds; + + /* free key alias and set cmd->alias to null */ + if (cmd->alias) { + flb_free(cmd->alias); + cmd->alias = NULL; + } + + return 0; +} diff --git a/fluent-bit/src/stream_processor/parser/sql.l b/fluent-bit/src/stream_processor/parser/sql.l new file mode 100644 index 00000000..91e5398e --- /dev/null +++ b/fluent-bit/src/stream_processor/parser/sql.l @@ -0,0 +1,190 @@ +%option prefix="flb_sp_" +%option caseless +%option 8bit reentrant bison-bridge +%option warn noyywrap nodefault +%option nounput +%option noinput + + +%{ +#include <stdio.h> +#include <stdbool.h> +#include <ctype.h> +#include <fluent-bit/flb_str.h> +#include <fluent-bit/flb_log.h> +#include "sql_parser.h" +#include <fluent-bit/stream_processor/flb_sp_parser.h> + +static inline char *remove_dup_qoutes(const char *s, size_t n) +{ + char *str; + int dups; + int i, j; + + dups = 0; + for (i = 0; i < n; i++) { + if (s[i] == '\'') { + dups++; + i++; + } + } + + str = (char *) flb_malloc(n - dups + 1); + if (!str) { + return NULL; + } + + j = 0; + for (i = 0; i < n; i++, j++) { + if (s[i] == '\'') { + str[j] = '\''; + i++; + } else { + str[j] = s[i]; + } + } + str[j] = '\0'; + + return str; +} + +char* to_upper(char* token, size_t len) +{ + int i; + char* token_; + + token_ = flb_malloc(len * sizeof(char) + 1); + + for (i = 0; i < len; i++) { + token_[i] = toupper(token[i]); + } + + token_[len] = '\0'; + 
return token_; +} + +int func_to_code(char* name, size_t len) +{ + int code; + char* name_; + + name_ = to_upper(name, len); + code = -1; + + if (!strcmp(name_, "AVG")) { + code = FLB_SP_AVG; + } else if (!strcmp(name_, "SUM")) { + code = FLB_SP_SUM; + } else if (!strcmp(name_, "COUNT")) { + code = FLB_SP_COUNT; + } else if (!strcmp(name_, "MIN")) { + code = FLB_SP_MIN; + } else if (!strcmp(name_, "MAX")) { + code = FLB_SP_MAX; + } else if (!strcmp(name_, "TIMESERIES_FORECAST")) { + code = FLB_SP_FORECAST; + } else if (!strcmp(name_, "NOW")) { + code = FLB_SP_NOW; + } else if (!strcmp(name_, "UNIX_TIMESTAMP")) { + code = FLB_SP_UNIX_TIMESTAMP; + } else if (!strcmp(name_, "RECORD_TAG")) { + code = FLB_SP_RECORD_TAG; + } else if (!strcmp(name_, "RECORD_TIME")) { + code = FLB_SP_RECORD_TIME; + } + + flb_free(name_); + return code; +} + +%} + +%% + + /* SQL */ +CREATE return CREATE; +FLUSH return FLUSH; +STREAM return STREAM; +SNAPSHOT return SNAPSHOT; +WITH return WITH; +SELECT return SELECT; +AS return AS; +FROM return FROM; +STREAM: return FROM_STREAM; +TAG: return FROM_TAG; +WHERE return WHERE; +AND return AND; +OR return OR; +NOT return NOT; +WINDOW return WINDOW; +"GROUP BY" return GROUP_BY; +LIMIT return LIMIT; + +IS return IS; +NULL return NUL; + + /* Aggregation Functions */ +SUM {yylval->integer = func_to_code(yytext, yyleng); return SUM;} +AVG {yylval->integer = func_to_code(yytext, yyleng); return AVG;} +COUNT {yylval->integer = func_to_code(yytext, yyleng); return COUNT;} +MIN {yylval->integer = func_to_code(yytext, yyleng); return MIN;} +MAX {yylval->integer = func_to_code(yytext, yyleng); return MAX;} +TIMESERIES_FORECAST {yylval->integer = func_to_code(yytext, yyleng); return TIMESERIES_FORECAST;}; + + /* Record Functions */ +@RECORD return RECORD; +CONTAINS return CONTAINS; +TIME return TIME; + + + /* Window Types */ +TUMBLING return TUMBLING; +HOPPING return HOPPING; +"ADVANCE BY" return ADVANCE_BY; + + /* Time */ +HOUR return HOUR; +MINUTE return 
MINUTE; +SECOND return SECOND; + + /* Date / Time Functions */ +NOW {yylval->integer = func_to_code(yytext, yyleng); return NOW;} +UNIX_TIMESTAMP {yylval->integer = func_to_code(yytext, yyleng); return UNIX_TIMESTAMP;} + + /* Record information */ +RECORD_TAG {yylval->integer = func_to_code(yytext, yyleng); return RECORD_TAG;} +RECORD_TIME {yylval->integer = func_to_code(yytext, yyleng); return RECORD_TIME;} + +"true" { yylval->boolean = true; return BOOLTYPE; }; +"false" { yylval->boolean = false; return BOOLTYPE; }; + +-?[1-9][0-9]*|0 { yylval->integer = atoi(yytext); return INTEGER; } +(-?[1-9][0-9]*|0)\.[0-9]+ { yylval->fval = atof(yytext); return FLOATING; } +\'([^']|'{2})*\' { yylval->string = remove_dup_qoutes(yytext + 1, yyleng - 2); return STRING; } + +[_A-Za-z][A-Za-z0-9_.]* { yylval->string = flb_strdup(yytext); return IDENTIFIER; } + +"*" | +"," | +"=" | +"(" | +")" | +"[" | +"]" | +"." | +";" { return yytext[0]; } + +"!=" return NEQ; +"<>" return NEQ; +"<" return LT; +"<=" return LTE; +">" return GT; +">=" return GTE; + +\' return QUOTE; +\n +[ \t]+ /* ignore whitespace */; + +. 
/*
 * Error callback for the generated parser.
 *
 * The leading parameters mirror the %parse-param declarations above
 * (cmd, query, scanner); 'str' is the error message. Only 'str' and 'query'
 * are used: the message is logged together with the query text being parsed.
 */
void yyerror(struct flb_sp_cmd *cmd, const char *query, void *scanner, const char *str)
{
    flb_error("[sp] %s at '%s'", str, query);
}
<boolean> BOOLTYPE +%type <integer> INTEGER +%type <fval> FLOATING +%type <string> IDENTIFIER +%type <string> STRING +%type <string> record_keys +%type <string> record_key +%type <string> prop_key +%type <string> prop_val +%type <expression> condition +%type <expression> comparison +%type <expression> key +%type <expression> record_func +%type <expression> value +%type <expression> null +%type <integer> time + +%type <integer> time_record_func +%type <integer> NOW UNIX_TIMESTAMP RECORD_TAG RECORD_TIME + +%type <integer> aggregate_func +%type <integer> COUNT AVG SUM MAX MIN TIMESERIES_FORECAST + + +%destructor { flb_free ($$); } IDENTIFIER /* Bison runs this on IDENTIFIER values it discards (error recovery or parser abort) and releases the string with flb_free(). NOTE(review): STRING is also typed <string> but is not listed here - confirm whether a discarded STRING value can leak. */ + +%% /* rules section */ + +statements: create | select /* a statement is exactly one 'create' form or one 'select' */ + +/* Parser for the 'CREATE STREAM', 'CREATE SNAPSHOT' and 'FLUSH SNAPSHOT' statements. Every action hands the name (third symbol) to the matching flb_sp_cmd_* call and then frees it. Actions of nested rules (properties, select, source, limit, where) have already run when these final actions execute. */ +create: + CREATE STREAM IDENTIFIER AS select + { + flb_sp_cmd_stream_new(cmd, $3); + flb_free($3); + } + | + CREATE STREAM IDENTIFIER WITH '(' properties ')' AS select + { /* same call as the form without WITH: the properties were added to cmd by the 'property' action */ + flb_sp_cmd_stream_new(cmd, $3); + flb_free($3); + } + | + CREATE SNAPSHOT IDENTIFIER AS SELECT '*' FROM source limit ';' + { + flb_sp_cmd_snapshot_new(cmd, $3); + flb_free($3); + } + | + CREATE SNAPSHOT IDENTIFIER WITH '(' properties ')' AS SELECT '*' FROM source limit ';' + { + flb_sp_cmd_snapshot_new(cmd, $3); + flb_free($3); + } + | + FLUSH SNAPSHOT IDENTIFIER AS SELECT '*' FROM source where ';' + { /* FLUSH accepts an optional WHERE clause where CREATE SNAPSHOT accepts an optional LIMIT */ + flb_sp_cmd_snapshot_flush_new(cmd, $3); + flb_free($3); + } + | + FLUSH SNAPSHOT IDENTIFIER WITH '(' properties ')' AS SELECT '*' FROM source where ';' + { + flb_sp_cmd_snapshot_flush_new(cmd, $3); + flb_free($3); + } + properties: property /* comma-separated list of properties, left-recursive */ + | + properties ',' property + property: prop_key '=' prop_val + { /* register one key = value pair on cmd, then release both strings */ + flb_sp_cmd_stream_prop_add(cmd, $1, $3); + flb_free($1); + flb_free($3); + } + prop_key: IDENTIFIER /* no action: the default action forwards the token string */ + prop_val: STRING /* no action: the default action forwards the token string */ + +/* Parser for 'SELECT' statement. The final action only marks cmd as FLB_SP_SELECT; the keys, source, window, condition, group-by keys and limit are recorded by the actions of the nested rules. */ +select: SELECT keys FROM source window where groupby limit ';' + { + cmd->type = FLB_SP_SELECT; + } + keys: record_keys + record_keys: record_key /* comma-separated list of selected keys, left-recursive */ + | + record_keys ',' record_key + record_key: /* one selected key; plain keys pass -1 where the function alternatives pass the function token value */ '*' 
+ { /* wildcard: no key name is passed (NULL) */ + flb_sp_cmd_key_add(cmd, -1, NULL); + } + | + IDENTIFIER key_alias + { + flb_sp_cmd_key_add(cmd, -1, $1); + flb_free($1); + } + | + IDENTIFIER record_subkey key_alias + { /* only the top-level name is passed; the subkeys were pushed to cmd->tmp_subkeys by the record_subkey action */ + flb_sp_cmd_key_add(cmd, -1, $1); + flb_free($1); + } + | + COUNT '(' '*' ')' key_alias + { /* the function id is the COUNT token value; no key name */ + flb_sp_cmd_key_add(cmd, $1, NULL); + } + | + COUNT '(' IDENTIFIER ')' key_alias + { + flb_sp_cmd_key_add(cmd, $1, $3); + flb_free($3); + } + | + COUNT '(' IDENTIFIER record_subkey ')' key_alias + { + flb_sp_cmd_key_add(cmd, $1, $3); + flb_free($3); + } + | + aggregate_func '(' IDENTIFIER ')' key_alias + { /* unlike COUNT, the other aggregates have no '*' form */ + flb_sp_cmd_key_add(cmd, $1, $3); + flb_free($3); + } + | + aggregate_func '(' IDENTIFIER record_subkey ')' key_alias + { + flb_sp_cmd_key_add(cmd, $1, $3); + flb_free($3); + } + | + TIMESERIES_FORECAST '(' IDENTIFIER ',' INTEGER ')' key_alias + { /* takes the key name plus the INTEGER argument (fifth symbol); no subkey form exists for this function */ + flb_sp_cmd_timeseries_forecast(cmd, $1, $3, $5); + flb_free($3); + } + | + time_record_func '(' ')' key_alias + { /* these functions take no arguments, so no key name is passed */ + flb_sp_cmd_key_add(cmd, $1, NULL); + } + aggregate_func: /* no actions: the default action forwards the token value as the function id */ + AVG | SUM | MAX | MIN + time_record_func: /* no actions: the default action forwards the token value as the function id */ + NOW | UNIX_TIMESTAMP | RECORD_TAG | RECORD_TIME + key_alias: /* optional AS <name> suffix of a selected key */ + %empty + | + AS IDENTIFIER + { /* NOTE(review): the alias string is not freed here, unlike every other IDENTIFIER in this grammar - presumably flb_sp_cmd_alias_add() keeps the pointer; confirm */ + flb_sp_cmd_alias_add(cmd, $2); + } + record_subkey: /* one or more bracketed string subkeys following a key name */ '[' STRING ']' + { /* the string is freed right after the call, so flb_slist_add() presumably stores its own copy - confirm */ + flb_slist_add(cmd->tmp_subkeys, $2); + flb_free($2); + } + | + record_subkey record_subkey /* NOTE(review): this self-concatenating form is ambiguous for three or more subkeys; check the conflict report */ + source: FROM_STREAM IDENTIFIER + { /* a stream source is named by an IDENTIFIER */ + flb_sp_cmd_source(cmd, FLB_SP_STREAM, $2); + flb_free($2); + } + | + FROM_TAG STRING + { /* a tag source is given as a STRING */ + flb_sp_cmd_source(cmd, FLB_SP_TAG, $2); + flb_free($2); + } + window: %empty /* optional WINDOW clause */ + | + WINDOW window_spec + where: %empty /* optional WHERE clause */ + | + WHERE condition + { /* attach the finished expression tree to cmd */ + flb_sp_cmd_condition_add(cmd, $2); + } + groupby: %empty /* optional GROUP BY clause; keys are recorded by the gb_key actions */ + | + GROUP_BY gb_keys + limit: %empty /* optional LIMIT clause */ + | + LIMIT INTEGER + { + flb_sp_cmd_limit_add(cmd, $2); + } + window_spec: + TUMBLING '(' INTEGER time ')' + { /* size and time unit only; the two advance-by arguments are passed as 0 */ + flb_sp_cmd_window(cmd, FLB_SP_WINDOW_TUMBLING, $3, $4, 0, 0); + } + | + HOPPING '(' INTEGER time ',' ADVANCE_BY INTEGER time ')' + { /* size and unit, followed by the ADVANCE BY size and unit */ + flb_sp_cmd_window(cmd, FLB_SP_WINDOW_HOPPING, $3, $4, $7, $8); + } + 
condition: /* boolean expression of the WHERE clause; every alternative yields an <expression> node. NOTE(review): the binary AND / OR alternatives are ambiguous as written, presumably resolved by precedence declarations elsewhere in the file - confirm */ comparison + | + key + { /* a bare key is wrapped as an OR node with only the left operand set */ + $$ = flb_sp_cmd_operation(cmd, $1, NULL, FLB_EXP_OR); + } + | + value + { /* a bare constant is wrapped as an OR node with only the right operand set */ + $$ = flb_sp_cmd_operation(cmd, NULL, $1, FLB_EXP_OR); + } + | + '(' condition ')' + { /* parentheses get their own node type */ + $$ = flb_sp_cmd_operation(cmd, $2, NULL, FLB_EXP_PAR); + } + | + NOT condition + { + $$ = flb_sp_cmd_operation(cmd, $2, NULL, FLB_EXP_NOT); + } + | + condition AND condition + { + $$ = flb_sp_cmd_operation(cmd, $1, $3, FLB_EXP_AND); + } + | + condition OR condition + { + $$ = flb_sp_cmd_operation(cmd, $1, $3, FLB_EXP_OR); + } + comparison: /* relational tests; there are no dedicated not-equal or is-not-null node types, both are built as NOT over an EQ comparison */ + key IS null + { + $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_EQ); + } + | + key IS NOT null + { /* NOT (key == null) */ + $$ = flb_sp_cmd_operation(cmd, + flb_sp_cmd_comparison(cmd, $1, $4, FLB_EXP_EQ), + NULL, FLB_EXP_NOT); + } + | + record_func + { /* a record function used alone is compared for equality with boolean true */ + $$ = flb_sp_cmd_comparison(cmd, + $1, + flb_sp_cmd_condition_boolean(cmd, true), + FLB_EXP_EQ); + } + | + record_func '=' value + { + $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_EQ); + } + | + record_func NEQ value + { /* NOT (left == right) */ + $$ = flb_sp_cmd_operation(cmd, + flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_EQ), + NULL, FLB_EXP_NOT) + ; + } + | + record_func LT value + { + $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_LT); + } + | + record_func LTE value + { + $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_LTE); + } + | + record_func GT value + { + $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_GT); + } + | + record_func GTE value + { + $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_GTE); + } + record_func: key /* Similar to an identity function: the key expression is forwarded unchanged by the default action. NOTE(review): a bare key can therefore be derived both through this rule and through the 'key' alternative of condition - check the conflict report */ + | + RECORD '.' CONTAINS '(' key ')' + { /* the function is registered by name with the key expression as its argument */ + $$ = flb_sp_record_function_add(cmd, "contains", $5); + } + | + RECORD '.' 
TIME '(' ')' + { /* no argument, so NULL is passed */ + $$ = flb_sp_record_function_add(cmd, "time", NULL); + } + key: /* key reference inside a condition */ IDENTIFIER + { /* build the key expression, then release the name string */ + $$ = flb_sp_cmd_condition_key(cmd, $1); + flb_free($1); + } + | + IDENTIFIER record_subkey + { /* same call as the plain form: only the top-level name is passed, the subkeys were handled by the record_subkey rule */ + $$ = flb_sp_cmd_condition_key(cmd, $1); + flb_free($1); + } + value: /* constant operand; one constructor per literal type */ INTEGER + { + $$ = flb_sp_cmd_condition_integer(cmd, $1); + } + | + FLOATING + { + $$ = flb_sp_cmd_condition_float(cmd, $1); + } + | + STRING + { /* STRING is the only literal whose token value is freed after use */ + $$ = flb_sp_cmd_condition_string(cmd, $1); + flb_free($1); + } + | + BOOLTYPE + { + $$ = flb_sp_cmd_condition_boolean(cmd, $1); + } + null: NUL + { + $$ = flb_sp_cmd_condition_null(cmd); + } + time: /* time unit keyword mapped to its FLB_SP_TIME_* constant */ SECOND + { + $$ = FLB_SP_TIME_SECOND; + } + | + MINUTE + { + $$ = FLB_SP_TIME_MINUTE; + } + | + HOUR + { + $$ = FLB_SP_TIME_HOUR; + } + gb_keys: gb_key /* comma-separated GROUP BY keys. NOTE(review): this list is right-recursive while record_keys and properties are left-recursive; Bison recommends left recursion to bound parser stack use */ + | + gb_key ',' gb_keys + gb_key: IDENTIFIER + { /* register the group-by key on cmd, then release the name string */ + flb_sp_cmd_gb_key_add(cmd, $1); + flb_free($1); + } + | + IDENTIFIER record_subkey + { /* same call as the plain form: only the top-level name is passed */ + flb_sp_cmd_gb_key_add(cmd, $1); + flb_free($1); + } + ; |