summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/stream_processor
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/stream_processor')
-rw-r--r--fluent-bit/src/stream_processor/CMakeLists.txt21
-rw-r--r--fluent-bit/src/stream_processor/README.md84
-rw-r--r--fluent-bit/src/stream_processor/flb_sp.c2157
-rw-r--r--fluent-bit/src/stream_processor/flb_sp_aggregate_func.c364
-rw-r--r--fluent-bit/src/stream_processor/flb_sp_func_record.c77
-rw-r--r--fluent-bit/src/stream_processor/flb_sp_func_time.c95
-rw-r--r--fluent-bit/src/stream_processor/flb_sp_groupby.c82
-rw-r--r--fluent-bit/src/stream_processor/flb_sp_key.c231
-rw-r--r--fluent-bit/src/stream_processor/flb_sp_snapshot.c277
-rw-r--r--fluent-bit/src/stream_processor/flb_sp_stream.c168
-rw-r--r--fluent-bit/src/stream_processor/flb_sp_window.c122
-rw-r--r--fluent-bit/src/stream_processor/parser/CMakeLists.txt31
-rw-r--r--fluent-bit/src/stream_processor/parser/flb_sp_parser.c851
-rw-r--r--fluent-bit/src/stream_processor/parser/sql.l190
-rw-r--r--fluent-bit/src/stream_processor/parser/sql.y437
15 files changed, 5187 insertions, 0 deletions
diff --git a/fluent-bit/src/stream_processor/CMakeLists.txt b/fluent-bit/src/stream_processor/CMakeLists.txt
new file mode 100644
index 000000000..de2c2fe38
--- /dev/null
+++ b/fluent-bit/src/stream_processor/CMakeLists.txt
@@ -0,0 +1,21 @@
+project(stream-processor C)
+
+include_directories(${CMAKE_CURRENT_SOURCE_DIR})
+
+add_subdirectory(parser)
+
+set(src
+ flb_sp.c
+ flb_sp_key.c
+ flb_sp_func_time.c
+ flb_sp_func_record.c
+ flb_sp_stream.c
+ flb_sp_snapshot.c
+ flb_sp_window.c
+ flb_sp_groupby.c
+ flb_sp_aggregate_func.c
+ )
+
+add_library(flb-sp STATIC ${src})
+target_link_libraries(flb-sp rbtree)
+target_link_libraries(flb-sp flb-sp-parser)
diff --git a/fluent-bit/src/stream_processor/README.md b/fluent-bit/src/stream_processor/README.md
new file mode 100644
index 000000000..d39a51ef5
--- /dev/null
+++ b/fluent-bit/src/stream_processor/README.md
@@ -0,0 +1,84 @@
+## SQL Statement Syntax
+
+The following is the SQL statement syntax supported by Fluent Bit stream processor in EBNF form. For readability, we assume the conventional definition for integer, float and string values. A single quote in a constant string literal has to be escaped with an extra one. For instance, the string representation of `O'Keefe` in the query will be `'O''Keefe'`.
+
+```xml
+<sql_stmt> := <create> | <select>
+<create> := CREATE STREAM <id> AS <select> | CREATE STREAM <id> WITH (<properties>) AS <select>
+<properties> := <property> | <property>, <properties>
+<property> := <id> = '<id>'
+<select> := SELECT <keys> FROM <source> [WHERE <condition>]
+ [WINDOW TUMBLING (<integer> SECOND) | WINDOW HOPPING (<integer> SECOND, ADVANCE BY <integer> SECOND)]
+ [GROUP BY <record_keys>]
+<keys> := '*' | <record_keys>
+<record_keys> := <record_key> | <record_key>, <record_keys>
+<record_key> := <exp> | <exp> AS <id>
+<exp> := <key> | <fun>
+<fun> := AVG(<key>) | SUM(<key>) | COUNT(<key>) | COUNT(*) | MIN(<key>) | MAX(<key>) | TIMESERIES_FORECAST(<key>, <integer>)
+<source> := STREAM:<id> | TAG:<id>
+<condition> := <key> | <value> | <key> <relation> <value> | (<condition>)
+ | NOT <condition> | <condition> AND <condition> | <condition> OR <condition>
+ | @record.contains(<key>) | <id> IS NULL | <id> IS NOT NULL
+<key> := <id> | <id><subkey-idx>
+<subkey-idx> := [<id>] | <subkey-idx>[<id>]
+<relation> := = | != | <> | < | <= | > | >=
+<id> := <letter> <characters>
+<characters> := <letter> | <digit> | _ | <characters> <characters>
+<value> := true | false | <integer> | <float> | '<string>'
+```
+
+In addition to the common aggregation functions, Stream Processor provides the timeseries function `TIMESERIES_FORECAST`, which uses [simple linear regression algorithm](<https://en.wikipedia.org/wiki/Simple_linear_regression) to predict the value of a (dependent) variable in future.
+
+### Timeseries Functions
+
+| name | description |
+| ------------------------- | ------------------------------------------------------ |
+| TIMESERIES_FORECAST(x, t) | forecasts the value of x at current time + t seconds |
+
+### Time Functions
+
+| name | description | example |
+| ---------------- | ------------------------------------------------- | ------------------- |
+| NOW() | adds system time using format: %Y-%m-%d %H:%M:%S | 2019-03-09 21:36:05 |
+| UNIX_TIMESTAMP() | add current Unix timestamp | 1552196165 |
+
+### Record Functions
+
+| name | description | example |
+| ------------- | ------------------------------------------------------------ | ----------------- |
+| RECORD_TAG() | append Tag string associated to the record | samples |
+| RECORD_TIME() | append record Timestamp in _double_ format: seconds.nanoseconds | 1552196165.705683 |
+
+## Type of windows
+
+FluentBit stream processor has implemented two time-based windows: hopping window and tumbling window.
+
+### Hopping window
+
+In hopping window (also known as sliding window), records are stored in a time window of the interval in seconds defined as the parameter. The `ADVANCE BY` parameter determines the time the window slides forward. Aggregation functions are computed over the records inside a window, and reported right before window moves.
+
+For example. the hopping window `WINDOW HOPPING (10 SECOND, ADVANCE BY 2 SECOND)` behaves like this:
+
+```
+[ x x x x x ... x x x x x ]
+<--------- 10 sec -------->
+ [ x x x x x ... x x x x x ]
+<- 2 sec -><--------- 10 sec -------->
+ [ x x x x x ... x x x x x ]
+ <- 2 sec -><--------- 10 sec -------->
+```
+
+### Tumbling window
+
+A tumbling window is similar to a hopping window where `ADVANCE BY` value is the same as the window size. That means the new window doesn't include any record from the previous one.
+
+For example. the tumbling window `WINDOW TUMBLING (10 SECOND)` works like this:
+
+```
+[ x x x x x ... x x x x x ]
+<--------- 10 sec -------->
+ [ x x x x x ... x x x x x ]
+ <--------- 10 sec -------->
+ [ x x x x x ... x x x x x ]
+ <--------- 10 sec -------->
+```
diff --git a/fluent-bit/src/stream_processor/flb_sp.c b/fluent-bit/src/stream_processor/flb_sp.c
new file mode 100644
index 000000000..00eb2f18b
--- /dev/null
+++ b/fluent-bit/src/stream_processor/flb_sp.c
@@ -0,0 +1,2157 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_slist.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_router.h>
+#include <fluent-bit/flb_config_format.h>
+#include <fluent-bit/stream_processor/flb_sp.h>
+#include <fluent-bit/stream_processor/flb_sp_key.h>
+#include <fluent-bit/stream_processor/flb_sp_stream.h>
+#include <fluent-bit/stream_processor/flb_sp_snapshot.h>
+#include <fluent-bit/stream_processor/flb_sp_parser.h>
+#include <fluent-bit/stream_processor/flb_sp_func_time.h>
+#include <fluent-bit/stream_processor/flb_sp_func_record.h>
+#include <fluent-bit/stream_processor/flb_sp_aggregate_func.h>
+#include <fluent-bit/stream_processor/flb_sp_window.h>
+#include <fluent-bit/stream_processor/flb_sp_groupby.h>
+
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#ifndef _WIN32
+#include <unistd.h>
+#endif
+
+/* don't do this at home */
+#define pack_uint16(buf, d) _msgpack_store16(buf, (uint16_t) d)
+#define pack_uint32(buf, d) _msgpack_store32(buf, (uint32_t) d)
+
+/* String type to numerical conversion */
+#define FLB_STR_INT 1
+#define FLB_STR_FLOAT 2
+
+/* Read and process file system configuration file */
+static int sp_config_file(struct flb_config *config, struct flb_sp *sp,
+ const char *file)
+{
+ int ret;
+ flb_sds_t name;
+ flb_sds_t exec;
+ char *cfg = NULL;
+ char tmp[PATH_MAX + 1];
+ struct stat st;
+ struct mk_list *head;
+ struct flb_sp_task *task;
+ struct flb_cf *cf;
+ struct flb_cf_section *section;
+
+#ifndef FLB_HAVE_STATIC_CONF
+ ret = stat(file, &st);
+ if (ret == -1 && errno == ENOENT) {
+ /* Try to resolve the real path (if exists) */
+ if (file[0] == '/') {
+ flb_error("[sp] cannot open configuration file: %s", file);
+ return -1;
+ }
+
+ if (config->conf_path) {
+ snprintf(tmp, PATH_MAX, "%s%s", config->conf_path, file);
+ cfg = tmp;
+ }
+ }
+ else {
+ cfg = (char *) file;
+ }
+
+ cf = flb_cf_create_from_file(NULL, cfg);
+#else
+ cf = flb_config_static_open(file);
+#endif
+
+ if (!cf) {
+ return -1;
+ }
+
+ /* Read all 'stream_task' sections */
+ mk_list_foreach(head, &cf->sections) {
+ section = mk_list_entry(head, struct flb_cf_section, _head);
+ if (strcasecmp(section->name, "stream_task") != 0) {
+ continue;
+ }
+
+ name = NULL;
+ exec = NULL;
+
+ /* name */
+ name = flb_cf_section_property_get_string(cf, section, "name");
+ if (!name) {
+ flb_error("[sp] task 'name' not found in file '%s'", cfg);
+ goto fconf_error;
+ }
+
+ /* exec */
+ exec = flb_cf_section_property_get_string(cf, section, "exec");
+ if (!exec) {
+ flb_error("[sp] task '%s' don't have an 'exec' command", name);
+ goto fconf_error;
+ }
+
+ /* Register the task */
+ task = flb_sp_task_create(sp, name, exec);
+ if (!task) {
+ goto fconf_error;
+ }
+ flb_sds_destroy(name);
+ flb_sds_destroy(exec);
+ name = NULL;
+ exec = NULL;
+ }
+
+ flb_cf_destroy(cf);
+ return 0;
+
+fconf_error:
+ if (name) {
+ flb_sds_destroy(name);
+ }
+ if (exec) {
+ flb_sds_destroy(exec);
+ }
+ flb_cf_destroy(cf);
+ return -1;
+}
+
+static int sp_task_to_instance(struct flb_sp_task *task, struct flb_sp *sp)
+{
+ struct mk_list *head;
+ struct flb_input_instance *in;
+
+ if (task->cmd->source_type != FLB_SP_STREAM) {
+ return -1;
+ }
+
+ mk_list_foreach(head, &sp->config->inputs) {
+ in = mk_list_entry(head, struct flb_input_instance, _head);
+ if (in->alias) {
+ if (strcasecmp(in->alias, task->cmd->source_name) == 0) {
+ task->source_instance = in;
+ return 0;
+ }
+ }
+
+ if (strcasecmp(in->name, task->cmd->source_name) == 0) {
+ task->source_instance = in;
+ return 0;
+ }
+ }
+
+ return -1;
+}
+
+static void sp_info(struct flb_sp *sp)
+{
+ struct mk_list *head;
+ struct flb_sp_task *task;
+
+ flb_info("[sp] stream processor started");
+
+ mk_list_foreach(head, &sp->tasks) {
+ task = mk_list_entry(head, struct flb_sp_task, _head);
+ flb_info("[sp] registered task: %s", task->name);
+ }
+}
+
+int subkeys_compare(struct mk_list *subkeys1, struct mk_list *subkeys2)
+{
+ int i;
+ struct flb_slist_entry *entry1;
+ struct flb_slist_entry *entry2;
+
+ if (!subkeys1 && !subkeys2) {
+ return 0;
+ }
+
+ if (!subkeys1 || !subkeys2) {
+ return -1;
+ }
+
+ if (mk_list_size(subkeys1) != mk_list_size(subkeys2)) {
+ return -1;
+ }
+
+ entry1 = mk_list_entry_first(subkeys1, struct flb_slist_entry, _head);
+ entry2 = mk_list_entry_first(subkeys2, struct flb_slist_entry, _head);
+
+ for (i = 0; i < mk_list_size(subkeys1); i++) {
+ if (flb_sds_cmp(entry1->str, entry2->str, flb_sds_len(entry2->str)) != 0) {
+ return -1;
+ }
+
+ entry1 = mk_list_entry_next(&entry1->_head, struct flb_slist_entry,
+ _head, subkeys1);
+ entry2 = mk_list_entry_next(&entry2->_head, struct flb_slist_entry,
+ _head, subkeys2);
+ }
+
+ return 0;
+}
+
+static int sp_cmd_aggregated_keys(struct flb_sp_cmd *cmd)
+{
+ int aggr = 0;
+ int not_aggr = 0;
+ struct mk_list *head;
+ struct mk_list *head_gb;
+ struct flb_sp_cmd_key *key;
+ struct flb_sp_cmd_gb_key *gb_key;
+
+ mk_list_foreach(head, &cmd->keys) {
+ key = mk_list_entry(head, struct flb_sp_cmd_key, _head);
+ if (key->time_func > 0 || key->record_func > 0) {
+ continue;
+ }
+
+ if (key->aggr_func > 0) {
+ /* AVG, SUM, COUNT or timeseries functions */
+ aggr++;
+ }
+ else {
+ mk_list_foreach(head_gb, &cmd->gb_keys) {
+ gb_key = mk_list_entry(head_gb, struct flb_sp_cmd_gb_key, _head);
+
+ if (!key->name) { /* Key name is a wildcard '*' */
+ break;
+ }
+
+ if (flb_sds_cmp(key->name, gb_key->name,
+ flb_sds_len(gb_key->name)) == 0) {
+ if (subkeys_compare(key->subkeys, gb_key->subkeys) != 0) {
+ continue;
+ }
+
+ not_aggr--;
+
+ /* Map key selector with group-by */
+ key->gb_key = gb_key;
+ break;
+ }
+ }
+
+ not_aggr++;
+ }
+ }
+
+ /*
+ * if some aggregated function is required, not aggregated keys are
+ * not allowed so we return an error (-1).
+ */
+ if (aggr > 0 && not_aggr == 0) {
+ return aggr;
+ }
+ else if (aggr > 0 && not_aggr > 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ * Convert a string to a numerical representation:
+ *
+ * - if output number is an integer, 'i' is set and returns FLB_STR_INT
+ * - if output number is a float, 'd' is set and returns FLB_STR_FLOAT
+ * - if no conversion is possible (not a number), returns -1
+ */
+static int string_to_number(const char *str, int len, int64_t *i, double *d)
+{
+ int c;
+ int dots = 0;
+ char *end;
+ int64_t i_out;
+ double d_out;
+
+ /* Detect if this is a floating point number */
+ for (c = 0; c < len; c++) {
+ if (str[c] == '.') {
+ dots++;
+ }
+ }
+
+ if (dots > 1) {
+ return -1;
+ }
+ else if (dots == 1) {
+ /* Floating point number */
+ errno = 0;
+ d_out = strtold(str, &end);
+
+ /* Check for various possible errors */
+ if ((errno == ERANGE || (errno != 0 && d_out == 0))) {
+ return -1;
+ }
+
+ if (end == str) {
+ return -1;
+ }
+
+ *d = d_out;
+ return FLB_STR_FLOAT;
+ }
+ else {
+ /* Integer */
+ errno = 0;
+ i_out = strtoll(str, &end, 10);
+
+ /* Check for various possible errors */
+ if ((errno == ERANGE || (errno != 0 && i_out == 0))) {
+ return -1;
+ }
+
+ if (end == str) {
+ return -1;
+ }
+
+ *i = i_out;
+ return FLB_STR_INT;
+ }
+
+ return -1;
+}
+
+/*
+ * Convert a msgpack object value to a number 'if possible'. The conversion
+ * result is either stored on 'i' for 64 bits integers or in 'd' for
+ * float/doubles.
+ *
+ * This function aims to take care of strings representing a value too.
+ */
+static int object_to_number(msgpack_object obj, int64_t *i, double *d,
+ int convert_str_to_num)
+{
+ int ret;
+ int64_t i_out;
+ double d_out;
+ char str_num[20];
+
+ if (obj.type == MSGPACK_OBJECT_POSITIVE_INTEGER ||
+ obj.type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
+ *i = obj.via.i64;
+ return FLB_STR_INT;
+ }
+ else if (obj.type == MSGPACK_OBJECT_FLOAT32 ||
+ obj.type == MSGPACK_OBJECT_FLOAT) {
+ *d = obj.via.f64;
+ return FLB_STR_FLOAT;
+ }
+ else if (obj.type == MSGPACK_OBJECT_STR && convert_str_to_num == FLB_TRUE) {
+ /* A numeric representation of a string should not exceed 19 chars */
+ if (obj.via.str.size > 19) {
+ return -1;
+ }
+
+ memcpy(str_num, obj.via.str.ptr, obj.via.str.size);
+ str_num[obj.via.str.size] = '\0';
+
+ ret = string_to_number(str_num, obj.via.str.size,
+ &i_out, &d_out);
+ if (ret == FLB_STR_FLOAT) {
+ *d = d_out;
+ return FLB_STR_FLOAT;
+ }
+ else if (ret == FLB_STR_INT) {
+ *i = i_out;
+ return FLB_STR_INT;
+ }
+ }
+
+ return -1;
+}
+
+int flb_sp_snapshot_create(struct flb_sp_task *task)
+{
+ struct flb_sp_cmd *cmd;
+ struct flb_sp_snapshot *snapshot;
+
+ cmd = task->cmd;
+
+ snapshot = (struct flb_sp_snapshot *) flb_calloc(1, sizeof(struct flb_sp_snapshot));
+ if (!snapshot) {
+ flb_error("[sp] could not create snapshot '%s'", cmd->stream_name);
+ return -1;
+ }
+
+ mk_list_init(&snapshot->pages);
+ snapshot->record_limit = cmd->limit;
+
+ if (flb_sp_cmd_stream_prop_get(cmd, "seconds") != NULL) {
+ snapshot->time_limit = atoi(flb_sp_cmd_stream_prop_get(cmd, "seconds"));
+ }
+
+ if (snapshot->time_limit == 0 && snapshot->record_limit == 0) {
+ flb_error("[sp] could not create snapshot '%s': size is not defined",
+ cmd->stream_name);
+ flb_sp_snapshot_destroy(snapshot);
+ return -1;
+ }
+
+ task->snapshot = snapshot;
+ return 0;
+}
+
+struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name,
+ const char *query)
+{
+ int fd;
+ int ret;
+ struct mk_event *event;
+ struct flb_sp_cmd *cmd;
+ struct flb_sp_task *task;
+
+ /*
+ * Parse and validate the incoming exec query and create the 'command'
+ * context (this will be associated to the task in a later step
+ */
+ cmd = flb_sp_cmd_create(query);
+
+ if (!cmd) {
+ flb_error("[sp] invalid query on task '%s': '%s'", name, query);
+ return NULL;
+ }
+
+ /* Check if we got an invalid type due an error/restriction */
+ if (cmd->status == FLB_SP_ERROR) {
+ flb_error("[sp] invalid query on task '%s': '%s'", name, query);
+ flb_sp_cmd_destroy(cmd);
+ return NULL;
+ }
+
+ /* Create the task context */
+ task = flb_calloc(1, sizeof(struct flb_sp_task));
+ if (!task) {
+ flb_errno();
+ flb_sp_cmd_destroy(cmd);
+ return NULL;
+ }
+ task->name = flb_sds_create(name);
+ if (!task->name) {
+ flb_free(task);
+ flb_sp_cmd_destroy(cmd);
+ return NULL;
+ }
+
+ task->query = flb_sds_create(query);
+ if (!task->query) {
+ flb_sds_destroy(task->name);
+ flb_free(task);
+ flb_sp_cmd_destroy(cmd);
+ return NULL;
+ }
+
+ task->sp = sp;
+ task->cmd = cmd;
+ mk_list_add(&task->_head, &sp->tasks);
+
+ /*
+ * Assume no aggregated keys exists, if so, a different strategy is
+ * required to process the records.
+ */
+ task->aggregate_keys = FLB_FALSE;
+
+ mk_list_init(&task->window.data);
+ mk_list_init(&task->window.aggregate_list);
+ rb_tree_new(&task->window.aggregate_tree, flb_sp_groupby_compare);
+
+ mk_list_init(&task->window.hopping_slot);
+
+ /* Check and validate aggregated keys */
+ ret = sp_cmd_aggregated_keys(task->cmd);
+ if (ret == -1) {
+ flb_error("[sp] aggregated query cannot mix not aggregated keys: %s",
+ query);
+ flb_sp_task_destroy(task);
+ return NULL;
+ }
+ else if (ret > 0) {
+ task->aggregate_keys = FLB_TRUE;
+
+ task->window.type = cmd->window.type;
+
+ /* Register a timer event when task contains aggregation rules */
+ if (task->window.type != FLB_SP_WINDOW_DEFAULT) {
+ /* Initialize event loop context */
+ event = &task->window.event;
+ MK_EVENT_ZERO(event);
+
+ /* Run every 'size' seconds */
+ fd = mk_event_timeout_create(sp->config->evl,
+ cmd->window.size, (long) 0,
+ &task->window.event);
+ if (fd == -1) {
+ flb_error("[sp] registration for task %s failed", task->name);
+ flb_free(task);
+ return NULL;
+ }
+ task->window.fd = fd;
+
+ if (task->window.type == FLB_SP_WINDOW_HOPPING) {
+ /* Initialize event loop context */
+ event = &task->window.event_hop;
+ MK_EVENT_ZERO(event);
+
+ /* Run every 'size' seconds */
+ fd = mk_event_timeout_create(sp->config->evl,
+ cmd->window.advance_by, (long) 0,
+ &task->window.event_hop);
+ if (fd == -1) {
+ flb_error("[sp] registration for task %s failed", task->name);
+ flb_free(task);
+ return NULL;
+ }
+ task->window.advance_by = cmd->window.advance_by;
+ task->window.fd_hop = fd;
+ task->window.first_hop = true;
+ }
+ }
+ }
+
+ /* Init snapshot page list */
+ if (cmd->type == FLB_SP_CREATE_SNAPSHOT) {
+ if (flb_sp_snapshot_create(task) == -1) {
+ flb_sp_task_destroy(task);
+ return NULL;
+ }
+ }
+
+ /*
+ * If the task involves a stream creation (CREATE STREAM abc..), create
+ * the stream.
+ */
+ if (cmd->type == FLB_SP_CREATE_STREAM ||
+ cmd->type == FLB_SP_CREATE_SNAPSHOT ||
+ cmd->type == FLB_SP_FLUSH_SNAPSHOT) {
+
+ ret = flb_sp_stream_create(cmd->stream_name, task, sp);
+ if (ret == -1) {
+ flb_error("[sp] could not create stream '%s'", cmd->stream_name);
+ flb_sp_task_destroy(task);
+ return NULL;
+ }
+ }
+
+ /*
+ * Based in the command type, check if the source of data is a known
+ * stream so make a reference on this task for a quick comparisson and
+ * access it when processing data.
+ */
+ sp_task_to_instance(task, sp);
+ return task;
+}
+
+void groupby_nums_destroy(struct aggregate_num *groupby_nums, int size)
+{
+ int i;
+
+ for (i = 0; i < size; i++) {
+ if (groupby_nums[i].type == FLB_SP_STRING) {
+ flb_sds_destroy(groupby_nums[i].string);
+ }
+ }
+
+ flb_free(groupby_nums);
+}
+
+/*
+ * Destroy aggregation node context: before to use this function make sure
+ * to unlink from the linked list.
+ */
+void flb_sp_aggregate_node_destroy(struct flb_sp_cmd *cmd,
+ struct aggregate_node *aggr_node)
+{
+ int i;
+ int key_id;
+ struct mk_list *head;
+ struct aggregate_num *num;
+ struct flb_sp_cmd_key *ckey;
+
+ for (i = 0; i < aggr_node->nums_size; i++) {
+ num = &aggr_node->nums[i];
+ if (num->type == FLB_SP_STRING) {
+ flb_sds_destroy(num->string);
+ }
+ }
+
+ groupby_nums_destroy(aggr_node->groupby_nums, aggr_node->groupby_keys);
+
+ key_id = 0;
+ mk_list_foreach(head, &cmd->keys) {
+ ckey = mk_list_entry(head, struct flb_sp_cmd_key, _head);
+
+ if (!ckey->aggr_func) {
+ key_id++;
+ continue;
+ }
+
+ aggregate_func_destroy[ckey->aggr_func - 1](aggr_node, key_id);
+ key_id++;
+ }
+
+ flb_free(aggr_node->nums);
+ flb_free(aggr_node->aggregate_data);
+ flb_free(aggr_node);
+}
+
+void flb_sp_window_destroy(struct flb_sp_cmd *cmd,
+ struct flb_sp_task_window *window)
+{
+ struct flb_sp_window_data *data;
+ struct aggregate_node *aggr_node;
+ struct flb_sp_hopping_slot *hs;
+ struct mk_list *head;
+ struct mk_list *tmp;
+ struct mk_list *head_hs;
+ struct mk_list *tmp_hs;
+
+ mk_list_foreach_safe(head, tmp, &window->data) {
+ data = mk_list_entry(head, struct flb_sp_window_data, _head);
+ flb_free(data->buf_data);
+ mk_list_del(&data->_head);
+ flb_free(data);
+ }
+
+ mk_list_foreach_safe(head, tmp, &window->aggregate_list) {
+ aggr_node = mk_list_entry(head, struct aggregate_node, _head);
+ mk_list_del(&aggr_node->_head);
+ flb_sp_aggregate_node_destroy(cmd, aggr_node);
+ }
+
+ mk_list_foreach_safe(head, tmp, &window->hopping_slot) {
+ hs = mk_list_entry(head, struct flb_sp_hopping_slot, _head);
+ mk_list_foreach_safe(head_hs, tmp_hs, &hs->aggregate_list) {
+ aggr_node = mk_list_entry(head_hs, struct aggregate_node, _head);
+ mk_list_del(&aggr_node->_head);
+ flb_sp_aggregate_node_destroy(cmd, aggr_node);
+ }
+ rb_tree_destroy(&hs->aggregate_tree);
+ flb_free(hs);
+ }
+
+ rb_tree_destroy(&window->aggregate_tree);
+}
+
+void flb_sp_task_destroy(struct flb_sp_task *task)
+{
+ flb_sds_destroy(task->name);
+ flb_sds_destroy(task->query);
+ flb_sp_window_destroy(task->cmd, &task->window);
+ flb_sp_snapshot_destroy(task->snapshot);
+ mk_list_del(&task->_head);
+
+ if (task->stream) {
+ flb_sp_stream_destroy(task->stream, task->sp);
+ }
+
+ flb_sp_cmd_destroy(task->cmd);
+ flb_free(task);
+}
+
+/* Create the stream processor context */
+struct flb_sp *flb_sp_create(struct flb_config *config)
+{
+ int i = 0;
+ int ret;
+ char buf[32];
+ struct mk_list *head;
+ struct flb_sp *sp;
+ struct flb_slist_entry *e;
+ struct flb_sp_task *task;
+
+ /* Allocate context */
+ sp = flb_malloc(sizeof(struct flb_sp));
+ if (!sp) {
+ flb_errno();
+ return NULL;
+ }
+ sp->config = config;
+ mk_list_init(&sp->tasks);
+
+ /* Check for pre-configured Tasks (command line) */
+ mk_list_foreach(head, &config->stream_processor_tasks) {
+ e = mk_list_entry(head, struct flb_slist_entry, _head);
+ snprintf(buf, sizeof(buf) - 1, "flb-console:%i", i);
+ i++;
+ task = flb_sp_task_create(sp, buf, e->str);
+ if (!task) {
+ continue;
+ }
+ }
+
+ /* Lookup configuration file if any */
+ if (config->stream_processor_file) {
+ ret = sp_config_file(config, sp, config->stream_processor_file);
+ if (ret == -1) {
+ flb_error("[sp] could not initialize stream processor");
+ flb_sp_destroy(sp);
+ return NULL;
+ }
+ }
+
+ /* Write sp info to stdout */
+ sp_info(sp);
+
+ return sp;
+}
+
+void free_value(struct flb_exp_val *v)
+{
+ if (!v) {
+ return;
+ }
+
+ if (v->type == FLB_EXP_STRING) {
+ flb_sds_destroy(v->val.string);
+ }
+
+ flb_free(v);
+}
+
+static void itof_convert(struct flb_exp_val *val)
+{
+ if (val->type != FLB_EXP_INT) {
+ return;
+ }
+
+ val->type = FLB_EXP_FLOAT;
+ val->val.f64 = (double) val->val.i64;
+}
+
+/* Convert (string) expression to number */
+static void exp_string_to_number(struct flb_exp_val *val)
+{
+ int ret;
+ int len;
+ int64_t i = 0;
+ char *str;
+ double d = 0.0;
+
+ len = flb_sds_len(val->val.string);
+ str = val->val.string;
+
+ ret = string_to_number(str, len, &i, &d);
+ if (ret == -1) {
+ return;
+ }
+
+ /* Assign to proper type */
+ if (ret == FLB_STR_FLOAT) {
+ flb_sds_destroy(val->val.string);
+ val->type = FLB_EXP_FLOAT;
+ val->val.f64 = d;
+ }
+ else if (ret == FLB_STR_INT) {
+ flb_sds_destroy(val->val.string);
+ val->type = FLB_EXP_INT;
+ val->val.i64 = i;
+ }
+}
+
+static void numerical_comp(struct flb_exp_val *left,
+ struct flb_exp_val *right,
+ struct flb_exp_val *result, int op)
+{
+ result->type = FLB_EXP_BOOL;
+
+ if (left == NULL || right == NULL) {
+ result->val.boolean = false;
+ return;
+ }
+
+ /* Check if left expression value is a number, if so, convert it */
+ if (left->type == FLB_EXP_STRING && right->type != FLB_EXP_STRING) {
+ exp_string_to_number(left);
+ }
+
+ if (left->type == FLB_EXP_INT && right->type == FLB_EXP_FLOAT) {
+ itof_convert(left);
+ }
+ else if (left->type == FLB_EXP_FLOAT && right->type == FLB_EXP_INT) {
+ itof_convert(right);
+ }
+
+ switch (op) {
+ case FLB_EXP_EQ:
+ if (left->type == right->type) {
+ switch(left->type) {
+ case FLB_EXP_NULL:
+ result->val.boolean = true;
+ break;
+ case FLB_EXP_BOOL:
+ result->val.boolean = (left->val.boolean == right->val.boolean);
+ break;
+ case FLB_EXP_INT:
+ result->val.boolean = (left->val.i64 == right->val.i64);
+ break;
+ case FLB_EXP_FLOAT:
+ result->val.boolean = (left->val.f64 == right->val.f64);
+ break;
+ case FLB_EXP_STRING:
+ if (flb_sds_len(left->val.string) !=
+ flb_sds_len(right->val.string)) {
+ result->val.boolean = false;
+ }
+ else if (strncmp(left->val.string, right->val.string,
+ flb_sds_len(left->val.string)) != 0) {
+ result->val.boolean = false;
+ }
+ else {
+ result->val.boolean = true;
+ }
+ break;
+ default:
+ result->val.boolean = false;
+ break;
+ }
+ }
+ else {
+ result->val.boolean = false;
+ }
+ break;
+ case FLB_EXP_LT:
+ if (left->type == right->type) {
+ switch(left->type) {
+ case FLB_EXP_INT:
+ result->val.boolean = (left->val.i64 < right->val.i64);
+ break;
+ case FLB_EXP_FLOAT:
+ result->val.boolean = (left->val.f64 < right->val.f64);
+ break;
+ case FLB_EXP_STRING:
+ if (strncmp(left->val.string, right->val.string,
+ flb_sds_len(left->val.string)) < 0) {
+ result->val.boolean = true;
+ }
+ else {
+ result->val.boolean = false;
+ }
+ break;
+ default:
+ result->val.boolean = false;
+ break;
+ }
+ }
+ else {
+ result->val.boolean = false;
+ }
+ break;
+ case FLB_EXP_LTE:
+ if (left->type == right->type) {
+ switch(left->type) {
+ case FLB_EXP_INT:
+ result->val.boolean = (left->val.i64 <= right->val.i64);
+ break;
+ case FLB_EXP_FLOAT:
+ result->val.boolean = (left->val.f64 <= right->val.f64);
+ break;
+ case FLB_EXP_STRING:
+ if (strncmp(left->val.string, right->val.string,
+ flb_sds_len(left->val.string)) <= 0) {
+ result->val.boolean = true;
+ }
+ else {
+ result->val.boolean = false;
+ }
+ break;
+ default:
+ result->val.boolean = false;
+ break;
+ }
+ }
+ else {
+ result->val.boolean = false;
+ }
+ break;
+ case FLB_EXP_GT:
+ if (left->type == right->type) {
+ switch(left->type) {
+ case FLB_EXP_INT:
+ result->val.boolean = (left->val.i64 > right->val.i64);
+ break;
+ case FLB_EXP_FLOAT:
+ result->val.boolean = (left->val.f64 > right->val.f64);
+ break;
+ case FLB_EXP_STRING:
+ if (strncmp(left->val.string, right->val.string,
+ flb_sds_len(left->val.string)) > 0) {
+ result->val.boolean = true;
+ }
+ else {
+ result->val.boolean = false;
+ }
+ break;
+ default:
+ result->val.boolean = false;
+ break;
+ }
+ }
+ else {
+ result->val.boolean = false;
+ }
+ break;
+ case FLB_EXP_GTE:
+ if (left->type == right->type) {
+ switch(left->type) {
+ case FLB_EXP_INT:
+ result->val.boolean = (left->val.i64 >= right->val.i64);
+ break;
+ case FLB_EXP_FLOAT:
+ result->val.boolean = (left->val.f64 >= right->val.f64);
+ break;
+ case FLB_EXP_STRING:
+ if (strncmp(left->val.string, right->val.string,
+ flb_sds_len(left->val.string)) >= 0) {
+ result->val.boolean = true;
+ }
+ else {
+ result->val.boolean = false;
+ }
+ break;
+ default:
+ result->val.boolean = false;
+ break;
+ }
+ }
+ else {
+ result->val.boolean = false;
+ }
+ break;
+ }
+}
+
+static bool value_to_bool(struct flb_exp_val *val) {
+ bool result = FLB_FALSE;
+
+ switch (val->type) {
+ case FLB_EXP_BOOL:
+ result = val->val.boolean;
+ break;
+ case FLB_EXP_INT:
+ result = val->val.i64 > 0;
+ break;
+ case FLB_EXP_FLOAT:
+ result = val->val.f64 > 0;
+ break;
+ case FLB_EXP_STRING:
+ result = true;
+ break;
+ }
+
+ return result;
+}
+
+
+static void logical_operation(struct flb_exp_val *left,
+ struct flb_exp_val *right,
+ struct flb_exp_val *result, int op)
+{
+ bool lval;
+ bool rval;
+
+ result->type = FLB_EXP_BOOL;
+
+ /* Null is always interpreted as false in a logical operation */
+ lval = left ? value_to_bool(left) : false;
+ rval = right ? value_to_bool(right) : false;
+
+ switch (op) {
+ case FLB_EXP_NOT:
+ result->val.boolean = !lval;
+ break;
+ case FLB_EXP_AND:
+ result->val.boolean = lval & rval;
+ break;
+ case FLB_EXP_OR:
+ result->val.boolean = lval | rval;
+ break;
+ }
+}
+
+static struct flb_exp_val *reduce_expression(struct flb_exp *expression,
+ const char *tag, int tag_len,
+ struct flb_time *tms,
+ msgpack_object *map)
+{
+ int operation;
+ flb_sds_t s;
+ flb_sds_t tmp_sds = NULL;
+ struct flb_exp_key *key;
+ struct flb_sp_value *sval;
+ struct flb_exp_val *ret, *left, *right;
+ struct flb_exp_val *result;
+
+ if (!expression) {
+ return NULL;
+ }
+
+ result = flb_calloc(1, sizeof(struct flb_exp_val));
+ if (!result) {
+ flb_errno();
+ return NULL;
+ }
+
+ switch (expression->type) {
+ case FLB_EXP_NULL:
+ result->type = expression->type;
+ break;
+ case FLB_EXP_BOOL:
+ result->type = expression->type;
+ result->val.boolean = ((struct flb_exp_val *) expression)->val.boolean;
+ break;
+ case FLB_EXP_INT:
+ result->type = expression->type;
+ result->val.i64 = ((struct flb_exp_val *) expression)->val.i64;
+ break;
+ case FLB_EXP_FLOAT:
+ result->type = expression->type;
+ result->val.f64 = ((struct flb_exp_val *) expression)->val.f64;
+ break;
+ case FLB_EXP_STRING:
+ s = ((struct flb_exp_val *) expression)->val.string;
+ result->type = expression->type;
+ result->val.string = flb_sds_create_size(flb_sds_len(s));
+ tmp_sds = flb_sds_copy(result->val.string, s, flb_sds_len(s));
+ if (tmp_sds != result->val.string) {
+ result->val.string = tmp_sds;
+ }
+ break;
+ case FLB_EXP_KEY:
+ key = (struct flb_exp_key *) expression;
+ sval = flb_sp_key_to_value(key->name, *map, key->subkeys);
+ if (sval) {
+ result->type = sval->type;
+ result->val = sval->val;
+ flb_free(sval);
+ return result;
+ }
+ else {
+ flb_free(result);
+ return NULL;
+ }
+ break;
+ case FLB_EXP_FUNC:
+ /* we don't need result */
+ flb_free(result);
+ ret = reduce_expression(((struct flb_exp_func *) expression)->param,
+ tag, tag_len, tms, map);
+ result = ((struct flb_exp_func *) expression)->cb_func(tag, tag_len,
+ tms, ret);
+ free_value(ret);
+ break;
+ case FLB_LOGICAL_OP:
+ left = reduce_expression(expression->left,
+ tag, tag_len, tms, map);
+ right = reduce_expression(expression->right,
+ tag, tag_len, tms, map);
+
+ operation = ((struct flb_exp_op *) expression)->operation;
+
+ switch (operation) {
+ case FLB_EXP_PAR:
+ if (left == NULL) { /* Null is always interpreted as false in a
+ logical operation */
+ result->type = FLB_EXP_BOOL;
+ result->val.boolean = false;
+ }
+ else { /* Left and right sides of a logical operation reduce to
+ boolean values */
+ result->type = FLB_EXP_BOOL;
+ result->val.boolean = left->val.boolean;
+ }
+ break;
+ case FLB_EXP_EQ:
+ case FLB_EXP_LT:
+ case FLB_EXP_LTE:
+ case FLB_EXP_GT:
+ case FLB_EXP_GTE:
+ numerical_comp(left, right, result, operation);
+ break;
+ case FLB_EXP_NOT:
+ case FLB_EXP_AND:
+ case FLB_EXP_OR:
+ logical_operation(left, right, result, operation);
+ break;
+ }
+ free_value(left);
+ free_value(right);
+ }
+ return result;
+}
+
+
+void package_results(const char *tag, int tag_len,
+ char **out_buf, size_t *out_size,
+ struct flb_sp_task *task)
+{
+ int i;
+ int len;
+ int map_entries;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ struct aggregate_num *num;
+ struct flb_time tm;
+ struct flb_sp_cmd_key *ckey;
+ struct flb_sp_cmd *cmd = task->cmd;
+ struct mk_list *head;
+ struct aggregate_node *aggr_node;
+ struct flb_sp_cmd_gb_key *gb_key = NULL;
+
+ map_entries = mk_list_size(&cmd->keys);
+
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ mk_list_foreach(head, &task->window.aggregate_list) {
+ aggr_node = mk_list_entry(head, struct aggregate_node, _head);
+
+ /* set outgoing array + map and it fixed size */
+ msgpack_pack_array(&mp_pck, 2);
+
+ flb_time_get(&tm);
+ flb_time_append_to_msgpack(&tm, &mp_pck, 0);
+ msgpack_pack_map(&mp_pck, map_entries);
+
+ /* Packaging results */
+ ckey = mk_list_entry_first(&cmd->keys, struct flb_sp_cmd_key, _head);
+ for (i = 0; i < map_entries; i++) {
+ num = &aggr_node->nums[i];
+
+ /* Check if there is a defined function */
+ if (ckey->time_func > 0) {
+ flb_sp_func_time(&mp_pck, ckey);
+ goto next;
+ }
+ else if (ckey->record_func > 0) {
+ flb_sp_func_record(tag, tag_len, &tm, &mp_pck, ckey);
+ goto next;
+ }
+
+ /* Pack key */
+ if (ckey->alias) {
+ msgpack_pack_str(&mp_pck, flb_sds_len(ckey->alias));
+ msgpack_pack_str_body(&mp_pck,
+ ckey->alias,
+ flb_sds_len(ckey->alias));
+ }
+ else {
+ len = 0;
+ char *c_name;
+ if (!ckey->name) {
+ c_name = "*";
+ }
+ else {
+ c_name = ckey->name;
+ }
+
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, c_name, len);
+ }
+
+ /*
+ * If a group_by key is mapped as a source of this key,
+ * change the 'num' reference to obtain the proper information
+ * for the grouped key value.
+ */
+ if (ckey->gb_key != NULL) {
+ gb_key = ckey->gb_key;
+ if (aggr_node->groupby_keys > 0) {
+ num = &aggr_node->groupby_nums[gb_key->id];
+ }
+ }
+
+ /* Pack value */
+ switch (ckey->aggr_func) {
+ case FLB_SP_NOP:
+ if (num->type == FLB_SP_NUM_I64) {
+ msgpack_pack_int64(&mp_pck, num->i64);
+ }
+ else if (num->type == FLB_SP_NUM_F64) {
+ msgpack_pack_float(&mp_pck, num->f64);
+ }
+ else if (num->type == FLB_SP_STRING) {
+ msgpack_pack_str(&mp_pck,
+ flb_sds_len(num->string));
+ msgpack_pack_str_body(&mp_pck,
+ num->string,
+ flb_sds_len(num->string));
+ }
+ else if (num->type == FLB_SP_BOOLEAN) {
+ if (num->boolean) {
+ msgpack_pack_true(&mp_pck);
+ }
+ else {
+ msgpack_pack_false(&mp_pck);
+ }
+ }
+ break;
+ default:
+ aggregate_func_calc[ckey->aggr_func - 1](aggr_node, ckey, &mp_pck, i);
+ break;
+ }
+
+next:
+ ckey = mk_list_entry_next(&ckey->_head, struct flb_sp_cmd_key,
+ _head, &cmd->keys);
+ }
+ }
+
+ *out_buf = mp_sbuf.data;
+ *out_size = mp_sbuf.size;
+}
+
+static struct aggregate_node * sp_process_aggregate_data(struct flb_sp_task *task,
+ msgpack_object map,
+ int convert_str_to_num)
+{
+ int i;
+ int ret;
+ int map_size;
+ int key_id;
+ int map_entries;
+ int gb_entries;
+ int values_found;
+ int64_t ival;
+ double dval;
+ struct flb_sp_value *sval;
+ struct aggregate_num *gb_nums;
+ struct aggregate_node *aggr_node;
+ struct flb_sp_cmd *cmd;
+ struct flb_sp_cmd_gb_key *gb_key;
+ struct mk_list *head;
+ struct rb_tree_node *rb_result;
+ msgpack_object key;
+
+ aggr_node = NULL;
+ cmd = task->cmd;
+ map_size = map.via.map.size;
+ values_found = 0;
+
+ /* Number of expected output entries in the map */
+ map_entries = mk_list_size(&cmd->keys);
+ gb_entries = mk_list_size(&cmd->gb_keys);
+
+ if (gb_entries > 0) {
+ gb_nums = flb_calloc(1, sizeof(struct aggregate_num) * gb_entries);
+ if (!gb_nums) {
+ return NULL;
+ }
+
+ /* extract GROUP BY values */
+ for (i = 0; i < map_size; i++) { /* extract group-by values */
+ key = map.via.map.ptr[i].key;
+
+ key_id = 0;
+ mk_list_foreach(head, &cmd->gb_keys) {
+ gb_key = mk_list_entry(head, struct flb_sp_cmd_gb_key,
+ _head);
+ if (flb_sds_cmp(gb_key->name, key.via.str.ptr,
+ key.via.str.size) != 0) {
+ key_id++;
+ continue;
+ }
+
+ sval = flb_sp_key_to_value(gb_key->name, map, gb_key->subkeys);
+ if (!sval) {
+ /* If evaluation fails/sub-key doesn't exist */
+ key_id++;
+ continue;
+ }
+
+ values_found++;
+
+ /* Convert string to number if that is possible */
+ ret = object_to_number(sval->o, &ival, &dval, convert_str_to_num);
+ if (ret == -1) {
+ if (sval->o.type == MSGPACK_OBJECT_STR) {
+ gb_nums[key_id].type = FLB_SP_STRING;
+ gb_nums[key_id].string =
+ flb_sds_create_len(sval->o.via.str.ptr,
+ sval->o.via.str.size);
+ }
+ else if (sval->o.type == MSGPACK_OBJECT_BOOLEAN) {
+ gb_nums[key_id].type = FLB_SP_NUM_I64;
+ gb_nums[key_id].i64 = sval->o.via.boolean;
+ }
+ }
+ else if (ret == FLB_STR_INT) {
+ gb_nums[key_id].type = FLB_SP_NUM_I64;
+ gb_nums[key_id].i64 = ival;
+ }
+ else if (ret == FLB_STR_FLOAT) {
+ gb_nums[key_id].type = FLB_SP_NUM_F64;
+ gb_nums[key_id].f64 = dval;
+ }
+
+ key_id++;
+ flb_sp_key_value_destroy(sval);
+ }
+ }
+
+ /* if some GROUP BY keys are not found in the record */
+ if (values_found < gb_entries) {
+ groupby_nums_destroy(gb_nums, gb_entries);
+ return NULL;
+ }
+
+ aggr_node = (struct aggregate_node *) flb_calloc(1, sizeof(struct aggregate_node));
+ if (!aggr_node) {
+ flb_errno();
+ groupby_nums_destroy(gb_nums, gb_entries);
+ return NULL;
+ }
+
+ aggr_node->groupby_keys = gb_entries;
+ aggr_node->groupby_nums = gb_nums;
+
+ rb_tree_find_or_insert(&task->window.aggregate_tree, aggr_node, &aggr_node->_rb_head, &rb_result);
+ if (&aggr_node->_rb_head != rb_result) {
+ /* We don't need aggr_node anymore */
+ flb_sp_aggregate_node_destroy(cmd, aggr_node);
+
+ aggr_node = container_of(rb_result, struct aggregate_node, _rb_head);
+ container_of(rb_result, struct aggregate_node, _rb_head)->records++;
+ }
+ else {
+ aggr_node->nums = flb_calloc(1, sizeof(struct aggregate_num) * map_entries);
+ if (!aggr_node->nums) {
+ flb_sp_aggregate_node_destroy(cmd, aggr_node);
+ return NULL;
+ }
+ aggr_node->records = 1;
+ aggr_node->nums_size = map_entries;
+ aggr_node->aggregate_data = (struct aggregate_data **) flb_calloc(1, sizeof(struct aggregate_data *) * map_entries);
+ mk_list_add(&aggr_node->_head, &task->window.aggregate_list);
+ }
+ }
+ else { /* If query doesn't have GROUP BY */
+ if (!mk_list_size(&task->window.aggregate_list)) {
+ aggr_node = flb_calloc(1, sizeof(struct aggregate_node));
+ if (!aggr_node) {
+ flb_errno();
+ return NULL;
+ }
+ aggr_node->nums = flb_calloc(1, sizeof(struct aggregate_num) * map_entries);
+ if (!aggr_node->nums) {
+ flb_sp_aggregate_node_destroy(cmd, aggr_node);
+ return NULL;
+ }
+
+ aggr_node->nums_size = map_entries;
+ aggr_node->records = 1;
+ aggr_node->aggregate_data = (struct aggregate_data **) flb_calloc(1, sizeof(struct aggregate_data *) * map_entries);
+ mk_list_add(&aggr_node->_head, &task->window.aggregate_list);
+ }
+ else {
+ aggr_node = mk_list_entry_first(&task->window.aggregate_list, struct aggregate_node, _head);
+ aggr_node->records++;
+ }
+ }
+
+ return aggr_node;
+}
+
+/*
+ * Process data, task and it defined command involves the call of aggregation
+ * functions (AVG, SUM, COUNT, MIN, MAX).
+ */
+int sp_process_data_aggr(const char *buf_data, size_t buf_size,
+ const char *tag, int tag_len,
+ struct flb_sp_task *task,
+ struct flb_sp *sp,
+ int convert_str_to_num)
+{
+ int i;
+ int ok;
+ int ret;
+ int map_size;
+ int key_id;
+ size_t off;
+ int64_t ival;
+ double dval;
+ msgpack_object root;
+ msgpack_object map;
+ msgpack_unpacked result;
+ msgpack_object key;
+ msgpack_object *obj;
+ struct aggregate_num *nums = NULL;
+ struct mk_list *head;
+ struct flb_time tms;
+ struct flb_sp_cmd *cmd = task->cmd;
+ struct flb_sp_cmd_key *ckey;
+ struct flb_sp_value *sval;
+ struct flb_exp_val *condition;
+ struct aggregate_node *aggr_node;
+
+ /* Number of expected output entries in the map */
+ off = 0;
+
+ /* vars initialization */
+ ok = MSGPACK_UNPACK_SUCCESS;
+ msgpack_unpacked_init(&result);
+
+ /* Iterate incoming records */
+ while (msgpack_unpack_next(&result, buf_data, buf_size, &off) == ok) {
+ root = result.data;
+
+ /* extract timestamp */
+ flb_time_pop_from_msgpack(&tms, &result, &obj);
+
+ /* get the map data and it size (number of items) */
+ map = root.via.array.ptr[1];
+ map_size = map.via.map.size;
+
+ /* Evaluate condition */
+ if (cmd->condition) {
+ condition = reduce_expression(cmd->condition,
+ tag, tag_len, &tms, &map);
+ if (!condition) {
+ continue;
+ }
+ else if (!condition->val.boolean) {
+ flb_free(condition);
+ continue;
+ }
+ else {
+ flb_free(condition);
+ }
+ }
+
+ aggr_node = sp_process_aggregate_data(task, map, convert_str_to_num);
+ if (!aggr_node)
+ {
+ continue;
+ }
+
+ task->window.records++;
+
+ nums = aggr_node->nums;
+
+ /* Iterate each map key and see if it matches any command key */
+ for (i = 0; i < map_size; i++) {
+ key = map.via.map.ptr[i].key;
+
+ if (key.type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+
+
+ /*
+ * Iterate each command key. Note that since the command key
+ * can have different aggregation functions to the same key
+ * we should compare all of them.
+ */
+ key_id = 0;
+ mk_list_foreach(head, &cmd->keys) {
+ ckey = mk_list_entry(head, struct flb_sp_cmd_key, _head);
+
+ if (!ckey->name) {
+ key_id++;
+ continue;
+ }
+
+ if (flb_sds_cmp(ckey->name, key.via.str.ptr,
+ key.via.str.size) != 0) {
+ key_id++;
+ continue;
+ }
+
+ /* convert the value if it string */
+ sval = flb_sp_key_to_value(ckey->name, map, ckey->subkeys);
+ if (!sval) {
+ key_id++;
+ continue;
+ }
+
+ /*
+ * Convert value to a numeric representation only if key has an
+ * assigned aggregation function
+ */
+ ival = 0;
+ dval = 0.0;
+ if (ckey->aggr_func != FLB_SP_NOP) {
+ ret = object_to_number(sval->o, &ival, &dval, convert_str_to_num);
+ if (ret == -1) {
+ /* Value cannot be represented as a number */
+ key_id++;
+ flb_sp_key_value_destroy(sval);
+ continue;
+ }
+
+ /*
+ * If a floating pointer number exists, we use the same data
+ * type for the output.
+ */
+ if (dval != 0.0 && nums[key_id].type == FLB_SP_NUM_I64) {
+ nums[key_id].type = FLB_SP_NUM_F64;
+ nums[key_id].f64 = (double) nums[key_id].i64;
+ }
+
+ aggregate_func_add[ckey->aggr_func - 1](aggr_node, ckey, key_id, &tms, ival, dval);
+ }
+ else {
+ if (sval->o.type == MSGPACK_OBJECT_BOOLEAN) {
+ nums[key_id].type = FLB_SP_BOOLEAN;
+ nums[key_id].boolean = sval->o.via.boolean;
+ }
+ if (sval->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER ||
+ sval->o.type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
+ nums[key_id].type = FLB_SP_NUM_I64;
+ nums[key_id].i64 = sval->o.via.i64;
+ }
+ else if (sval->o.type == MSGPACK_OBJECT_FLOAT32 ||
+ sval->o.type == MSGPACK_OBJECT_FLOAT) {
+ nums[key_id].type = FLB_SP_NUM_F64;
+ nums[key_id].f64 = sval->o.via.f64;
+ }
+ else if (sval->o.type == MSGPACK_OBJECT_STR) {
+ nums[key_id].type = FLB_SP_STRING;
+ if (nums[key_id].string == NULL) {
+ nums[key_id].string =
+ flb_sds_create_len(sval->o.via.str.ptr,
+ sval->o.via.str.size);
+ }
+ }
+ }
+
+ key_id++;
+ flb_sp_key_value_destroy(sval);
+ }
+ }
+ }
+
+ msgpack_unpacked_destroy(&result);
+ return task->window.records;
+}
+
+/*
+ * Data processing (no aggregation functions)
+ */
+int sp_process_data(const char *tag, int tag_len,
+ const char *buf_data, size_t buf_size,
+ char **out_buf, size_t *out_size,
+ struct flb_sp_task *task,
+ struct flb_sp *sp)
+{
+ int i;
+ int ok;
+ int ret;
+ int map_size;
+ int map_entries;
+ int records;
+ uint8_t h;
+ off_t map_off;
+ off_t no_data;
+ size_t off;
+ size_t off_copy;
+ size_t snapshot_out_size;
+ char *tmp;
+ char *snapshot_out_buffer;
+ msgpack_object root;
+ msgpack_object *obj;
+ msgpack_object key;
+ msgpack_object val;
+ msgpack_unpacked result;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ msgpack_object map;
+ struct flb_time tms;
+ struct mk_list *head;
+ struct flb_sp_cmd *cmd;
+ struct flb_sp_cmd_key *cmd_key;
+ struct flb_exp_val *condition;
+ struct flb_sp_value *sval;
+
+ /* Vars initialization */
+ off = 0;
+ off_copy = off;
+ records = 0;
+ cmd = task->cmd;
+ ok = MSGPACK_UNPACK_SUCCESS;
+ msgpack_unpacked_init(&result);
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ snapshot_out_size = 0;
+ snapshot_out_buffer = NULL;
+
+ /* Iterate incoming records */
+ while (msgpack_unpack_next(&result, buf_data, buf_size, &off) == ok) {
+ root = result.data;
+
+ /* extract timestamp */
+ flb_time_pop_from_msgpack(&tms, &result, &obj);
+
+ /* Store the buffer if the stream is a snapshot */
+ if (cmd->type == FLB_SP_CREATE_SNAPSHOT) {
+ flb_sp_snapshot_update(task, buf_data + off_copy, off - off_copy, &tms);
+ off_copy = off;
+ continue;
+ }
+
+ /* get the map data and it size (number of items) */
+ map = root.via.array.ptr[1];
+ map_size = map.via.map.size;
+
+ /* Evaluate condition */
+ if (cmd->condition) {
+ condition = reduce_expression(cmd->condition,
+ tag, tag_len, &tms, &map);
+ if (!condition) {
+ continue;
+ }
+ else if (!condition->val.boolean) {
+ flb_free(condition);
+ continue;
+ }
+ else {
+ flb_free(condition);
+ }
+ }
+
+ records++;
+
+ /* Flush the snapshot if condition holds */
+ if (cmd->type == FLB_SP_FLUSH_SNAPSHOT) {
+ if (flb_sp_snapshot_flush(sp, task, &snapshot_out_buffer,
+ &snapshot_out_size) == -1) {
+ msgpack_unpacked_destroy(&result);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return -1;
+ }
+ continue;
+ }
+
+
+ /*
+ * If for some reason the Task keys did not insert any data, we will
+ * need to discard any changes and reset the buffer position, let's
+ * keep the memory size for that purpose.
+ */
+ no_data = mp_sbuf.size;
+
+ /* Pack main array */
+ msgpack_pack_array(&mp_pck, 2);
+ msgpack_pack_object(&mp_pck, root.via.array.ptr[0]);
+
+ /*
+ * Save the current size/position of the buffer since this is
+ * where the Map header will be stored.
+ */
+ map_off = mp_sbuf.size;
+
+ /*
+ * In the new record register the same number of items, if due to
+ * fields selection the number is lower, we perform an adjustment
+ */
+ msgpack_pack_map(&mp_pck, map_size);
+
+ /* Counter for new entries added to the outgoing map */
+ map_entries = 0;
+
+ /* Iterate key selection */
+ mk_list_foreach(head, &cmd->keys) {
+ cmd_key = mk_list_entry(head, struct flb_sp_cmd_key, _head);
+ if (cmd_key->time_func > 0) {
+ /* Process time function */
+ ret = flb_sp_func_time(&mp_pck, cmd_key);
+ if (ret > 0) {
+ map_entries += ret;
+ }
+ continue;
+ }
+ else if (cmd_key->record_func > 0) {
+ ret = flb_sp_func_record(tag, tag_len, &tms, &mp_pck, cmd_key);
+ if (ret > 0) {
+ map_entries += ret;
+ }
+ continue;
+ }
+
+ /* Lookup selection key in the incoming map */
+ for (i = 0; i < map_size; i++) {
+ key = map.via.map.ptr[i].key;
+ val = map.via.map.ptr[i].val;
+
+ if (key.type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+
+ /* Wildcard selection: * */
+ if (cmd_key->name == NULL) {
+ msgpack_pack_object(&mp_pck, key);
+ msgpack_pack_object(&mp_pck, val);
+ map_entries++;
+ continue;
+ }
+
+ /* Compare lengths */
+ if (flb_sds_cmp(cmd_key->name,
+ key.via.str.ptr, key.via.str.size) != 0) {
+ continue;
+ }
+
+ /*
+ * Package key name:
+ *
+ * Check if the command ask for an alias 'key AS abc'
+ */
+ if (cmd_key->alias) {
+ msgpack_pack_str(&mp_pck,
+ flb_sds_len(cmd_key->alias));
+ msgpack_pack_str_body(&mp_pck,
+ cmd_key->alias,
+ flb_sds_len(cmd_key->alias));
+ }
+ else {
+ msgpack_pack_object(&mp_pck, key);
+ }
+
+ /* Package value */
+ sval = flb_sp_key_to_value(cmd_key->name, map,
+ cmd_key->subkeys);
+ if (sval) {
+ msgpack_pack_object(&mp_pck, sval->o);
+ flb_sp_key_value_destroy(sval);
+ }
+
+ map_entries++;
+ }
+ }
+
+ /* Final Map size adjustment */
+ if (map_entries == 0) {
+ mp_sbuf.size = no_data;
+ }
+ else {
+ /*
+ * The fields were packed, now we need to adjust the map size
+ * to set the proper number of fields appended to the record.
+ */
+ tmp = mp_sbuf.data + map_off;
+ h = tmp[0];
+ if (h >> 4 == 0x8) {
+ *tmp = (uint8_t) 0x8 << 4 | ((uint8_t) map_entries);
+ }
+ else if (h == 0xde) {
+ tmp++;
+ pack_uint16(tmp, map_entries);
+ }
+ else if (h == 0xdf) {
+ tmp++;
+ pack_uint32(tmp, map_entries);
+ }
+ }
+ }
+
+ msgpack_unpacked_destroy(&result);
+
+ if (records == 0) {
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return 0;
+ }
+
+ /* Use snapshot out buffer if it is flush stream */
+ if (cmd->type == FLB_SP_FLUSH_SNAPSHOT) {
+ if (snapshot_out_size == 0) {
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ flb_free(snapshot_out_buffer);
+ return 0;
+ }
+ else {
+ *out_buf = snapshot_out_buffer;
+ *out_size = snapshot_out_size;
+ return records;
+ }
+ }
+
+ /* set outgoing results */
+ *out_buf = mp_sbuf.data;
+ *out_size = mp_sbuf.size;
+
+ return records;
+}
+
+int sp_process_hopping_slot(const char *tag, int tag_len,
+ struct flb_sp_task *task)
+{
+ int i;
+ int key_id;
+ int map_entries;
+ int gb_entries;
+ struct flb_sp_cmd *cmd = task->cmd;
+ struct mk_list *head;
+ struct mk_list *head_hs;
+ struct aggregate_node *aggr_node;
+ struct aggregate_node *aggr_node_hs;
+ struct aggregate_node *aggr_node_prev;
+ struct flb_sp_hopping_slot *hs;
+ struct flb_sp_hopping_slot *hs_;
+ struct rb_tree_node *rb_result;
+ struct flb_sp_cmd_key *ckey;
+ rb_result_t result;
+
+ map_entries = mk_list_size(&cmd->keys);
+ gb_entries = mk_list_size(&cmd->gb_keys);
+
+ /* Initialize a hoping slot */
+ hs = flb_calloc(1, sizeof(struct flb_sp_hopping_slot));
+ if (!hs) {
+ flb_errno();
+ return -1;
+ }
+
+ mk_list_init(&hs->aggregate_list);
+ rb_tree_new(&hs->aggregate_tree, flb_sp_groupby_compare);
+
+ /* Loop over aggregation nodes on window */
+ mk_list_foreach(head, &task->window.aggregate_list) {
+ /* Window aggregation node */
+ aggr_node = mk_list_entry(head, struct aggregate_node, _head);
+
+ /* Create a hopping slot aggregation node */
+ aggr_node_hs = flb_calloc(1, sizeof(struct aggregate_node));
+ if (!aggr_node_hs) {
+ flb_errno();
+ flb_free(hs);
+ return -1;
+ }
+
+ aggr_node_hs->nums = malloc(sizeof(struct aggregate_node) * map_entries);
+ if (!aggr_node_hs->nums) {
+ flb_errno();
+ flb_free(hs);
+ flb_free(aggr_node_hs);
+ return -1;
+ }
+
+ memcpy(aggr_node_hs->nums, aggr_node->nums, sizeof(struct aggregate_num) * map_entries);
+ aggr_node_hs->records = aggr_node->records;
+
+ /* Clone aggregate data */
+ key_id = 0;
+ mk_list_foreach(head_hs, &cmd->keys) {
+ ckey = mk_list_entry(head_hs, struct flb_sp_cmd_key, _head);
+
+ if (ckey->aggr_func) {
+ if (!aggr_node_hs->aggregate_data) {
+ aggr_node_hs->aggregate_data = (struct aggregate_data **)
+ flb_calloc(1, sizeof(struct aggregate_data *) * map_entries);
+ if (!aggr_node_hs->aggregate_data) {
+ flb_errno();
+ flb_free(hs);
+ flb_free(aggr_node_hs->nums);
+ flb_free(aggr_node_hs);
+ return -1;
+ }
+ }
+
+ if (aggregate_func_clone[ckey->aggr_func - 1](aggr_node_hs, aggr_node, ckey, key_id) == -1) {
+ flb_errno();
+ flb_free(aggr_node_hs->nums);
+ flb_free(aggr_node_hs->aggregate_data);
+ flb_free(aggr_node_hs);
+ flb_free(hs);
+ return -1;
+ }
+ }
+
+ key_id++;
+ }
+
+ /* Traverse over previous slots to calculate values/record numbers */
+ mk_list_foreach(head_hs, &task->window.hopping_slot) {
+ hs_ = mk_list_entry(head_hs, struct flb_sp_hopping_slot, _head);
+ result = rb_tree_find(&hs_->aggregate_tree, aggr_node, &rb_result);
+ /* If corresponding aggregation node exists in previous hopping slot,
+ * calculate aggregation values
+ */
+ if (result == RB_OK) {
+ aggr_node_prev = mk_list_entry(rb_result, struct aggregate_node,
+ _rb_head);
+ aggr_node_hs->records -= aggr_node_prev->records;
+
+ key_id = 0;
+ ckey = mk_list_entry_first(&cmd->keys, struct flb_sp_cmd_key,
+ _head);
+ for (i = 0; i < map_entries; i++) {
+ if (ckey->aggr_func) {
+ aggregate_func_remove[ckey->aggr_func - 1](aggr_node_hs, aggr_node_prev, i);
+ }
+
+ ckey = mk_list_entry_next(&ckey->_head, struct flb_sp_cmd_key,
+ _head, &cmd->keys);
+ }
+ }
+ }
+
+ if (aggr_node_hs->records > 0) {
+ aggr_node_hs->groupby_nums =
+ flb_calloc(1, sizeof(struct aggregate_node) * gb_entries);
+ if (gb_entries > 0 && !aggr_node_hs->groupby_nums) {
+ flb_errno();
+ flb_free(hs);
+ flb_free(aggr_node_hs->nums);
+ flb_free(aggr_node_hs->aggregate_data);
+ flb_free(aggr_node_hs);
+ return -1;
+ }
+
+ if (aggr_node_hs->groupby_nums != NULL) {
+ memcpy(aggr_node_hs->groupby_nums, aggr_node->groupby_nums,
+ sizeof(struct aggregate_num) * gb_entries);
+ }
+
+ aggr_node_hs->nums_size = aggr_node->nums_size;
+ aggr_node_hs->groupby_keys = aggr_node->groupby_keys;
+
+ rb_tree_insert(&hs->aggregate_tree, aggr_node_hs, &aggr_node_hs->_rb_head);
+ mk_list_add(&aggr_node_hs->_head, &hs->aggregate_list);
+ }
+ else {
+ flb_free(aggr_node_hs->nums);
+ flb_free(aggr_node_hs->aggregate_data);
+ flb_free(aggr_node_hs);
+ }
+ }
+
+ hs->records = task->window.records;
+ mk_list_foreach(head_hs, &task->window.hopping_slot) {
+ hs_ = mk_list_entry(head_hs, struct flb_sp_hopping_slot, _head);
+ hs->records -= hs_->records;
+ }
+
+ mk_list_add(&hs->_head, &task->window.hopping_slot);
+
+ return 0;
+}
+
+/* Iterate and find input chunks to process */
+int flb_sp_do(struct flb_sp *sp, struct flb_input_instance *in,
+ const char *tag, int tag_len,
+ const char *buf_data, size_t buf_size)
+
+{
+ int ret;
+ size_t out_size;
+ char *out_buf;
+ struct mk_list *head;
+ struct flb_sp_task *task;
+ struct flb_sp_cmd *cmd;
+
+ /* Lookup tasks that match the incoming instance data */
+ mk_list_foreach(head, &sp->tasks) {
+ task = mk_list_entry(head, struct flb_sp_task, _head);
+ cmd = task->cmd;
+
+ if (cmd->source_type == FLB_SP_STREAM) {
+ if (task->source_instance != in) {
+ continue;
+ }
+ }
+ else if (cmd->source_type == FLB_SP_TAG) {
+ ret = flb_router_match(tag, tag_len, cmd->source_name, NULL);
+ if (ret == FLB_FALSE) {
+ continue;
+ }
+ }
+
+ /* We found a task that matches the stream rule */
+ if (task->aggregate_keys == FLB_TRUE) {
+ ret = sp_process_data_aggr(buf_data, buf_size,
+ tag, tag_len,
+ task, sp, in->config->stream_processor_str_conv);
+
+ if (ret == -1) {
+ flb_error("[sp] error processing records for '%s'",
+ task->name);
+ continue;
+ }
+
+ if (flb_sp_window_populate(task, buf_data, buf_size) == -1) {
+ flb_error("[sp] error populating window for '%s'",
+ task->name);
+ continue;
+ }
+
+ if (task->window.type == FLB_SP_WINDOW_DEFAULT) {
+ package_results(tag, tag_len, &out_buf, &out_size, task);
+ flb_sp_window_prune(task);
+ }
+ }
+ else {
+ ret = sp_process_data(tag, tag_len,
+ buf_data, buf_size,
+ &out_buf, &out_size,
+ task, sp);
+
+ if (ret == -1) {
+ flb_error("[sp] error processing records for '%s'",
+ task->name);
+ continue;
+ }
+ }
+
+ if (ret == 0) {
+ /* no records */
+ continue;
+ }
+
+ /*
+ * This task involves append data to a stream, which
+ * means: register the output of the query as data
+ * generated by an input instance plugin.
+ */
+ if (task->aggregate_keys != FLB_TRUE ||
+ task->window.type == FLB_SP_WINDOW_DEFAULT) {
+ /*
+ * Add to stream processing stream if there is no
+ * aggregation function. Otherwise, write it at timer event
+ */
+ if (task->stream) {
+ flb_sp_stream_append_data(out_buf, out_size, task->stream);
+ }
+ else {
+ flb_pack_print(out_buf, out_size);
+ flb_free(out_buf);
+ }
+ }
+ }
+
+ return -1;
+}
+
+int flb_sp_fd_event(int fd, struct flb_sp *sp)
+{
+ bool update_timer_event;
+ char *out_buf;
+ char *tag = NULL;
+ int tag_len = 0;
+ int fd_timeout = 0;
+ size_t out_size;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_sp_task *task;
+ struct flb_input_instance *in = NULL;
+
+ /* Lookup Tasks that matches the incoming event */
+ mk_list_foreach_safe(head, tmp, &sp->tasks) {
+ task = mk_list_entry(head, struct flb_sp_task, _head);
+
+ if (fd == task->window.fd) {
+ update_timer_event = task->window.type == FLB_SP_WINDOW_HOPPING &&
+ task->window.first_hop;
+
+ in = task->source_instance;
+ if (in) {
+ if (in->tag && in->tag_len > 0) {
+ tag = in->tag;
+ tag_len = in->tag_len;
+ }
+ else {
+ tag = in->name;
+ tag_len = strlen(in->name);
+ }
+ }
+ else {
+ in = NULL;
+ }
+
+ if (task->window.records > 0) {
+ /* find input tag from task source */
+ package_results(tag, tag_len, &out_buf, &out_size, task);
+ if (task->stream) {
+ flb_sp_stream_append_data(out_buf, out_size, task->stream);
+ }
+ else {
+ flb_pack_print(out_buf, out_size);
+ flb_free(out_buf);
+ }
+
+ }
+
+ flb_sp_window_prune(task);
+
+ flb_utils_timer_consume(fd);
+
+ if (update_timer_event && in) {
+ task->window.first_hop = false;
+ mk_event_timeout_destroy(in->config->evl, &task->window.event);
+ mk_event_closesocket(fd);
+
+ fd_timeout = mk_event_timeout_create(in->config->evl,
+ task->window.advance_by, (long) 0,
+ &task->window.event);
+ if (fd_timeout == -1) {
+ flb_error("[sp] registration for task (updating timer event) %s failed", task->name);
+ return -1;
+ }
+ task->window.fd = fd_timeout;
+ }
+
+ break;
+ }
+ else if (fd == task->window.fd_hop) {
+ in = task->source_instance;
+ if (in) {
+ if (in->tag && in->tag_len > 0) {
+ tag = in->tag;
+ tag_len = in->tag_len;
+ }
+ else {
+ tag = in->name;
+ tag_len = strlen(in->name);
+ }
+ }
+ sp_process_hopping_slot(tag, tag_len, task);
+ flb_utils_timer_consume(fd);
+ }
+ }
+ return 0;
+}
+
+/* Destroy stream processor context */
+void flb_sp_destroy(struct flb_sp *sp)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_sp_task *task;
+
+ /* destroy tasks */
+ mk_list_foreach_safe(head, tmp, &sp->tasks) {
+ task = mk_list_entry(head, struct flb_sp_task, _head);
+ flb_sp_task_destroy(task);
+ }
+
+ flb_free(sp);
+}
diff --git a/fluent-bit/src/stream_processor/flb_sp_aggregate_func.c b/fluent-bit/src/stream_processor/flb_sp_aggregate_func.c
new file mode 100644
index 000000000..624be878c
--- /dev/null
+++ b/fluent-bit/src/stream_processor/flb_sp_aggregate_func.c
@@ -0,0 +1,364 @@
+#include <fluent-bit/stream_processor/flb_sp.h>
+#include <fluent-bit/stream_processor/flb_sp_parser.h>
+#include <fluent-bit/stream_processor/flb_sp_aggregate_func.h>
+
+char aggregate_func_string[AGGREGATE_FUNCTIONS][sizeof("TIMESERIES_FORECAST") + 1] = {
+ "AVG",
+ "SUM",
+ "COUNT",
+ "MIN",
+ "MAX",
+ "TIMESERIES_FORECAST"
+};
+
+int aggregate_func_clone_nop(struct aggregate_node *aggr_node,
+ struct aggregate_node *aggr_node_prev,
+ struct flb_sp_cmd_key *ckey,
+ int key_id) {
+ return 0;
+}
+
+int aggregate_func_clone_timeseries_forecast(struct aggregate_node *aggr_node_clone,
+ struct aggregate_node *aggr_node,
+ struct flb_sp_cmd_key *ckey,
+ int key_id) {
+ struct timeseries_forecast *forecast_clone;
+ struct timeseries_forecast *forecast;
+
+ forecast_clone = (struct timeseries_forecast *) aggr_node_clone->aggregate_data[key_id];
+ if (!forecast_clone) {
+ forecast_clone = (struct timeseries_forecast *) flb_calloc(1, sizeof(struct timeseries_forecast));
+ if (!forecast_clone) {
+ return -1;
+ }
+
+ forecast_clone->future_time = ckey->constant;
+ aggr_node_clone->aggregate_data[key_id] = (struct aggregate_data *) forecast_clone;
+ }
+
+ forecast = (struct timeseries_forecast *) aggr_node->aggregate_data[key_id];
+
+ forecast_clone->sigma_x = forecast->sigma_x;
+ forecast_clone->sigma_y = forecast->sigma_y;
+ forecast_clone->sigma_xy = forecast->sigma_xy;
+ forecast_clone->sigma_x2 = forecast->sigma_x2;
+
+ return 0;
+}
+
+/* Summarize a value into the temporary array considering data type */
+void aggregate_func_add_sum(struct aggregate_node *aggr_node,
+ struct flb_sp_cmd_key *ckey,
+ int key_id,
+ struct flb_time *tms,
+ int64_t ival, double dval) {
+ if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) {
+ aggr_node->nums[key_id].i64 += ival;
+ aggr_node->nums[key_id].ops++;
+ }
+ else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) {
+ if (dval != 0.0) {
+ aggr_node->nums[key_id].f64 += dval;
+ }
+ else {
+ aggr_node->nums[key_id].f64 += (double) ival;
+ }
+ aggr_node->nums[key_id].ops++;
+ }
+}
+
+void aggregate_func_add_count(struct aggregate_node *aggr_node,
+ struct flb_sp_cmd_key *ckey,
+ int key_id,
+ struct flb_time *tms,
+ int64_t ival, double dval) {
+}
+
+/* Calculate the minimum value considering data type */
+void aggregate_func_add_min(struct aggregate_node *aggr_node,
+ struct flb_sp_cmd_key *ckey,
+ int key_id,
+ struct flb_time *tms,
+ int64_t ival, double dval) {
+
+ if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) {
+ if (aggr_node->nums[key_id].ops == 0) {
+ aggr_node->nums[key_id].i64 = ival;
+ aggr_node->nums[key_id].ops++;
+ }
+ else {
+ if (aggr_node->nums[key_id].i64 > ival) {
+ aggr_node->nums[key_id].i64 = ival;
+ aggr_node->nums[key_id].ops++;
+ }
+ }
+ }
+ else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) {
+ if (dval != 0.0) {
+ if (aggr_node->nums[key_id].ops == 0) {
+ aggr_node->nums[key_id].f64 = dval;
+ aggr_node->nums[key_id].ops++;
+ }
+ else {
+ if (aggr_node->nums[key_id].f64 > dval) {
+ aggr_node->nums[key_id].f64 = dval;
+ aggr_node->nums[key_id].ops++;
+ }
+ }
+ }
+ else {
+ if (aggr_node->nums[key_id].ops == 0) {
+ aggr_node->nums[key_id].f64 = (double) ival;
+ aggr_node->nums[key_id].ops++;
+ }
+ else {
+ if (aggr_node->nums[key_id].f64 > (double) ival) {
+ aggr_node->nums[key_id].f64 = ival;
+ aggr_node->nums[key_id].ops++;
+ }
+ }
+ }
+ }
+}
+
+/* Calculate the maximum value considering data type */
+void aggregate_func_add_max(struct aggregate_node *aggr_node,
+ struct flb_sp_cmd_key *ckey,
+ int key_id,
+ struct flb_time *tms,
+ int64_t ival, double dval) {
+ if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) {
+ if (aggr_node->nums[key_id].ops == 0) {
+ aggr_node->nums[key_id].i64 = ival;
+ aggr_node->nums[key_id].ops++;
+ }
+ else {
+ if (aggr_node->nums[key_id].i64 < ival) {
+ aggr_node->nums[key_id].i64 = ival;
+ aggr_node->nums[key_id].ops++;
+ }
+ }
+ }
+ else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) {
+ if (dval != 0.0) {
+ if (aggr_node->nums[key_id].ops == 0) {
+ aggr_node->nums[key_id].f64 = dval;
+ aggr_node->nums[key_id].ops++;
+ }
+ else {
+ if (aggr_node->nums[key_id].f64 < dval) {
+ aggr_node->nums[key_id].f64 = dval;
+ aggr_node->nums[key_id].ops++;
+ }
+ }
+ }
+ else {
+ if (aggr_node->nums[key_id].ops == 0) {
+ aggr_node->nums[key_id].f64 = (double) ival;
+ aggr_node->nums[key_id].ops++;
+ }
+ else {
+ if (aggr_node->nums[key_id].f64 < (double) ival) {
+ aggr_node->nums[key_id].f64 = (double) ival;
+ aggr_node->nums[key_id].ops++;
+ }
+ }
+ }
+ }
+}
+
+void aggregate_func_calc_avg(struct aggregate_node *aggr_node,
+ struct flb_sp_cmd_key *ckey,
+ msgpack_packer *mp_pck,
+ int key_id) {
+ double dval = 0.0;
+ /* average = sum(values) / records */
+ if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) {
+ dval = (double) aggr_node->nums[key_id].i64 / aggr_node->records;
+ }
+ else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) {
+ dval = (double) aggr_node->nums[key_id].f64 / aggr_node->records;
+ }
+
+ msgpack_pack_float(mp_pck, dval);
+}
+
+void aggregate_func_calc_sum(struct aggregate_node *aggr_node,
+ struct flb_sp_cmd_key *ckey,
+ msgpack_packer *mp_pck,
+ int key_id) {
+ /* pack result stored in nums[key_id] */
+ if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) {
+ msgpack_pack_int64(mp_pck, aggr_node->nums[key_id].i64);
+ }
+ else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) {
+ msgpack_pack_float(mp_pck, aggr_node->nums[key_id].f64);
+ }
+}
+
+void aggregate_func_calc_count(struct aggregate_node *aggr_node,
+ struct flb_sp_cmd_key *ckey,
+ msgpack_packer *mp_pck,
+ int key_id) {
+ /* number of records in total */
+ msgpack_pack_int64(mp_pck, aggr_node->records);
+}
+
+void aggregate_func_remove_sum(struct aggregate_node *aggr_node,
+ struct aggregate_node *aggr_node_prev,
+ int key_id) {
+ if (aggr_node->nums[key_id].type == FLB_SP_NUM_I64) {
+ aggr_node->nums[key_id].i64 -= aggr_node_prev->nums[key_id].i64;
+ }
+ else if (aggr_node->nums[key_id].type == FLB_SP_NUM_F64) {
+ aggr_node->nums[key_id].f64 -= aggr_node_prev->nums[key_id].f64;
+ }
+}
+
+void aggregate_func_remove_nop(struct aggregate_node *aggr_node,
+ struct aggregate_node *aggr_node_prev,
+ int key_id) {
+}
+
+void aggregate_func_add_timeseries_forecast(struct aggregate_node *aggr_node,
+ struct flb_sp_cmd_key *ckey,
+ int key_id,
+ struct flb_time *tms,
+ int64_t ival, double dval)
+{
+ double x;
+ double y;
+ struct timeseries_forecast *forecast;
+
+ forecast = (struct timeseries_forecast *) aggr_node->aggregate_data[key_id];
+ if (!forecast) {
+ forecast = (struct timeseries_forecast *) flb_calloc(1, sizeof(struct timeseries_forecast));
+ /* fixme: return if error */
+
+ forecast->future_time = ckey->constant;
+ aggr_node->aggregate_data[key_id] = (struct aggregate_data *) forecast;
+ }
+
+ if (!forecast->offset) {
+ forecast->offset = flb_time_to_double(tms);
+ }
+
+ x = flb_time_to_double(tms) - forecast->offset;
+
+ forecast->latest_x = x;
+
+ if (ival) {
+ y = (double) ival;
+ }
+ else {
+ y = dval;
+ }
+
+ forecast->sigma_x += x;
+ forecast->sigma_y += y;
+
+ forecast->sigma_xy += x * y;
+ forecast->sigma_x2 += x * x;
+}
+
+void aggregate_func_calc_timeseries_forecast(struct aggregate_node *aggr_node,
+ struct flb_sp_cmd_key *ckey,
+ msgpack_packer *mp_pck,
+ int key_id)
+{
+ double mean_x;
+ double mean_y;
+ double var_x;
+ double cov_xy;
+ double result;
+ /* y = b0 + b1 * x */
+ double b0;
+ double b1;
+ struct timeseries_forecast *forecast;
+
+ forecast = (struct timeseries_forecast *) aggr_node->aggregate_data[key_id];
+
+ mean_x = forecast->sigma_x / aggr_node->records;
+ mean_y = forecast->sigma_y / aggr_node->records;
+ cov_xy = (forecast->sigma_xy / (double) aggr_node->records) - mean_x * mean_y;
+ var_x = (forecast->sigma_x2 / aggr_node->records) - mean_x * mean_x;
+
+ b1 = cov_xy / var_x;
+ b0 = mean_y - b1 * mean_x;
+
+ result = b0 + b1 * (forecast->future_time + forecast->latest_x);
+
+ msgpack_pack_float(mp_pck, result);
+}
+
+void aggregate_func_remove_timeseries_forecast(struct aggregate_node *aggr_node,
+ struct aggregate_node *aggr_node_prev,
+ int key_id)
+{
+ struct timeseries_forecast *forecast_w;
+ struct timeseries_forecast *forecast_h;
+
+ forecast_w = (struct timeseries_forecast *) aggr_node->aggregate_data[key_id];
+ forecast_h = (struct timeseries_forecast *) aggr_node_prev->aggregate_data[key_id];
+
+ forecast_w->sigma_x -= forecast_h->sigma_x;
+ forecast_w->sigma_y -= forecast_h->sigma_y;
+ forecast_w->sigma_xy -= forecast_h->sigma_xy;
+ forecast_w->sigma_x2 -= forecast_h->sigma_x2;
+}
+
+void aggregate_func_destroy_sum(struct aggregate_node *aggr_node,
+ int key_id)
+{
+}
+
+void aggregate_func_destroy_timeseries_forecast(struct aggregate_node *aggr_node,
+ int key_id)
+{
+ flb_free(aggr_node->aggregate_data[key_id]);
+}
+
+aggregate_function_clone aggregate_func_clone[AGGREGATE_FUNCTIONS] = {
+ aggregate_func_clone_nop,
+ aggregate_func_clone_nop,
+ aggregate_func_clone_nop,
+ aggregate_func_clone_nop,
+ aggregate_func_clone_nop,
+ aggregate_func_clone_timeseries_forecast,
+};
+
+aggregate_function_add aggregate_func_add[AGGREGATE_FUNCTIONS] = {
+ aggregate_func_add_sum,
+ aggregate_func_add_sum,
+ aggregate_func_add_count,
+ aggregate_func_add_min,
+ aggregate_func_add_max,
+ aggregate_func_add_timeseries_forecast,
+};
+
+aggregate_function_calc aggregate_func_calc[AGGREGATE_FUNCTIONS] = {
+ aggregate_func_calc_avg,
+ aggregate_func_calc_sum,
+ aggregate_func_calc_count,
+ aggregate_func_calc_sum,
+ aggregate_func_calc_sum,
+ aggregate_func_calc_timeseries_forecast,
+};
+
+aggregate_function_remove aggregate_func_remove[AGGREGATE_FUNCTIONS] = {
+ aggregate_func_remove_sum,
+ aggregate_func_remove_sum,
+ aggregate_func_remove_nop,
+ aggregate_func_remove_nop,
+ aggregate_func_remove_nop,
+ aggregate_func_remove_timeseries_forecast,
+};
+
+aggregate_function_destroy aggregate_func_destroy[AGGREGATE_FUNCTIONS] = {
+ aggregate_func_destroy_sum,
+ aggregate_func_destroy_sum,
+ aggregate_func_destroy_sum,
+ aggregate_func_destroy_sum,
+ aggregate_func_destroy_sum,
+ aggregate_func_destroy_timeseries_forecast,
+};
diff --git a/fluent-bit/src/stream_processor/flb_sp_func_record.c b/fluent-bit/src/stream_processor/flb_sp_func_record.c
new file mode 100644
index 000000000..26625a1ec
--- /dev/null
+++ b/fluent-bit/src/stream_processor/flb_sp_func_record.c
@@ -0,0 +1,77 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/stream_processor/flb_sp.h>
+#include <fluent-bit/stream_processor/flb_sp_parser.h>
+
+static inline void pack_key(msgpack_packer *mp_pck,
+ struct flb_sp_cmd_key *cmd_key,
+ const char *name, int len)
+{
+ if (cmd_key->alias) {
+ msgpack_pack_str(mp_pck, flb_sds_len(cmd_key->alias));
+ msgpack_pack_str_body(mp_pck, cmd_key->alias,
+ flb_sds_len(cmd_key->alias));
+ }
+ else {
+ msgpack_pack_str(mp_pck, len);
+ msgpack_pack_str_body(mp_pck, name, len);
+ }
+}
+
+static int func_tag(const char *tag, int tag_len,
+ msgpack_packer *mp_pck, struct flb_sp_cmd_key *cmd_key)
+{
+ pack_key(mp_pck, cmd_key, "RECORD_TAG()", 12);
+ msgpack_pack_str(mp_pck, tag_len);
+ msgpack_pack_str_body(mp_pck, tag, tag_len);
+
+ return 1;
+}
+
+static int func_time(struct flb_time *tms, msgpack_packer *mp_pck,
+ struct flb_sp_cmd_key *cmd_key)
+{
+ double t;
+
+ t = flb_time_to_double(tms);
+ pack_key(mp_pck, cmd_key, "RECORD_TIME()", 13);
+ msgpack_pack_double(mp_pck, t);
+
+ return 1;
+}
+
+/*
+ * Wrapper to handle record functions, returns the number of entries added
+ * to the map.
+ */
+int flb_sp_func_record(const char *tag, int tag_len, struct flb_time *tms,
+ msgpack_packer *mp_pck, struct flb_sp_cmd_key *cmd_key)
+{
+ switch (cmd_key->record_func) {
+ case FLB_SP_RECORD_TAG:
+ return func_tag(tag, tag_len, mp_pck, cmd_key);
+ case FLB_SP_RECORD_TIME:
+ return func_time(tms, mp_pck, cmd_key);
+ };
+
+ return 0;
+}
diff --git a/fluent-bit/src/stream_processor/flb_sp_func_time.c b/fluent-bit/src/stream_processor/flb_sp_func_time.c
new file mode 100644
index 000000000..3e75eb775
--- /dev/null
+++ b/fluent-bit/src/stream_processor/flb_sp_func_time.c
@@ -0,0 +1,95 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/stream_processor/flb_sp.h>
+#include <fluent-bit/stream_processor/flb_sp_parser.h>
+
+static inline void pack_key(msgpack_packer *mp_pck,
+ struct flb_sp_cmd_key *cmd_key,
+ const char *name, int len)
+{
+ if (cmd_key->alias) {
+ msgpack_pack_str(mp_pck, flb_sds_len(cmd_key->alias));
+ msgpack_pack_str_body(mp_pck, cmd_key->alias,
+ flb_sds_len(cmd_key->alias));
+ }
+ else {
+ msgpack_pack_str(mp_pck, len);
+ msgpack_pack_str_body(mp_pck, name, len);
+ }
+}
+
+static int func_now(msgpack_packer *mp_pck, struct flb_sp_cmd_key *cmd_key)
+{
+ size_t len;
+ time_t now;
+ char buf[32];
+ struct tm *local;
+
+ local = flb_malloc(sizeof(struct tm));
+ if (!local) {
+ flb_errno();
+ return 0;
+ }
+
+ /* Get current system time */
+ now = time(NULL);
+ localtime_r(&now, local);
+
+ /* Format string value */
+ len = strftime(buf, sizeof(buf) - 1, "%Y-%m-%d %H:%M:%S", local);
+ flb_free(local);
+
+ pack_key(mp_pck, cmd_key, "NOW()", 5);
+ msgpack_pack_str(mp_pck, len);
+ msgpack_pack_str_body(mp_pck, buf, len);
+
+ return 1;
+}
+
+static int func_unix_timestamp(msgpack_packer *mp_pck,
+ struct flb_sp_cmd_key *cmd_key)
+{
+ time_t now;
+
+ /* Get unix timestamp */
+ now = time(NULL);
+
+ pack_key(mp_pck, cmd_key, "UNIX_TIMESTAMP()", 16);
+ msgpack_pack_uint64(mp_pck, now);
+ return 1;
+}
+
+/*
+ * Wrapper to handle time functions, returns the number of entries added
+ * to the map.
+ */
+int flb_sp_func_time(msgpack_packer *mp_pck, struct flb_sp_cmd_key *cmd_key)
+{
+ switch (cmd_key->time_func) {
+ case FLB_SP_NOW:
+ return func_now(mp_pck, cmd_key);
+ case FLB_SP_UNIX_TIMESTAMP:
+ return func_unix_timestamp(mp_pck, cmd_key);
+ };
+
+ return 0;
+}
diff --git a/fluent-bit/src/stream_processor/flb_sp_groupby.c b/fluent-bit/src/stream_processor/flb_sp_groupby.c
new file mode 100644
index 000000000..9cd72c230
--- /dev/null
+++ b/fluent-bit/src/stream_processor/flb_sp_groupby.c
@@ -0,0 +1,82 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/stream_processor/flb_sp.h>
+
+int flb_sp_groupby_compare(const void *lhs, const void *rhs)
+{
+ int i;
+ int strcmp_result;
+ struct aggregate_node *left = (struct aggregate_node *) lhs;
+ struct aggregate_node *right = (struct aggregate_node *) rhs;
+ struct aggregate_num *lval;
+ struct aggregate_num *rval;
+
+ for (i = 0; i < left->groupby_keys; i++) {
+ lval = &left->groupby_nums[i];
+ rval = &right->groupby_nums[i];
+
+ /* Convert integer to double if a float value appears on one side */
+ if (lval->type == FLB_SP_NUM_I64 && rval->type == FLB_SP_NUM_F64) {
+ lval->type = FLB_SP_NUM_F64;
+ lval->f64 = (double) lval->i64;
+ }
+ else if (lval->type == FLB_SP_NUM_F64 && rval->type == FLB_SP_NUM_I64) {
+ rval->type = FLB_SP_NUM_F64;
+ rval->f64 = (double) rval->i64;
+ }
+
+ /* Comparison */
+ if (lval->type == FLB_SP_BOOLEAN && rval->type == FLB_SP_BOOLEAN) {
+ if (lval->boolean != rval->boolean) {
+ return 1;
+ }
+ }
+ else if (lval->type == FLB_SP_NUM_I64 && rval->type == FLB_SP_NUM_I64) {
+ if (lval->i64 > rval->i64) {
+ return 1;
+ }
+
+ if (lval->i64 < rval->i64) {
+ return -1;
+ }
+ }
+ else if (lval->type == FLB_SP_NUM_F64 && rval->type == FLB_SP_NUM_F64) {
+ if (lval->f64 > rval->f64) {
+ return 1;
+ }
+
+ if (lval->f64 < rval->f64) {
+ return -1;
+ }
+ }
+ else if (lval->type == FLB_SP_STRING && rval->type == FLB_SP_STRING) {
+ strcmp_result = strcmp((const char *) lval->string, (const char *) rval->string);
+ if (strcmp_result != 0) {
+ return strcmp_result;
+ }
+ }
+ else { /* Sides have different types */
+ return -1;
+ }
+ }
+
+ return 0;
+}
diff --git a/fluent-bit/src/stream_processor/flb_sp_key.c b/fluent-bit/src/stream_processor/flb_sp_key.c
new file mode 100644
index 000000000..945621843
--- /dev/null
+++ b/fluent-bit/src/stream_processor/flb_sp_key.c
@@ -0,0 +1,231 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_slist.h>
+
+#include <fluent-bit/stream_processor/flb_sp.h>
+#include <fluent-bit/stream_processor/flb_sp_parser.h>
+
+void flb_sp_key_value_print(struct flb_sp_value *v)
+{
+ if (v->type == FLB_EXP_BOOL) {
+ if (v->val.boolean) {
+ printf("true");
+ }
+ else {
+ printf("false");
+ }
+ }
+ else if (v->type == FLB_EXP_INT) {
+ printf("%" PRId64, v->val.i64);
+ }
+ else if (v->type == FLB_EXP_FLOAT) {
+ printf("%f", v->val.f64);
+ }
+ else if (v->type == FLB_EXP_STRING) {
+ printf("%s", v->val.string);
+ }
+ else if (v->type == FLB_EXP_NULL) {
+ printf("NULL");
+ }
+}
+
+/* Map msgpack object intp flb_sp_value representation */
+static int msgpack_object_to_sp_value(msgpack_object o,
+ struct flb_sp_value *result)
+{
+ result->o = o;
+
+ /* Compose result with found value */
+ if (o.type == MSGPACK_OBJECT_BOOLEAN) {
+ result->type = FLB_EXP_BOOL;
+ result->val.boolean = o.via.boolean;
+ return 0;
+ }
+ else if (o.type == MSGPACK_OBJECT_POSITIVE_INTEGER ||
+ o.type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
+ result->type = FLB_EXP_INT;
+ result->val.i64 = o.via.i64;
+ return 0;
+ }
+ else if (o.type == MSGPACK_OBJECT_FLOAT32 ||
+ o.type == MSGPACK_OBJECT_FLOAT) {
+ result->type = FLB_EXP_FLOAT;
+ result->val.f64 = o.via.f64;
+ return 0;
+ }
+ else if (o.type == MSGPACK_OBJECT_STR) {
+ result->type = FLB_EXP_STRING;
+ result->val.string = flb_sds_create_len((char *) o.via.str.ptr,
+ o.via.str.size);
+ return 0;
+ }
+ else if (o.type == MSGPACK_OBJECT_MAP) {
+ /* return boolean 'true', just denoting the existence of the key */
+ result->type = FLB_EXP_BOOL;
+ result->val.boolean = true;
+ return 0;
+ }
+ else if (o.type == MSGPACK_OBJECT_NIL) {
+ result->type = FLB_EXP_NULL;
+ return 0;
+ }
+
+ return -1;
+}
+
+/* Lookup perfect match of sub-keys and map content */
+static int subkey_to_value(msgpack_object *map, struct mk_list *subkeys,
+ struct flb_sp_value *result)
+{
+ int i = 0;
+ int ret;
+ int levels;
+ int matched = 0;
+ msgpack_object *key_found = NULL;
+ msgpack_object key;
+ msgpack_object val;
+ msgpack_object cur_map;
+ struct mk_list *head;
+ struct flb_slist_entry *entry;
+
+ /* Expected number of map levels in the map */
+ levels = mk_list_size(subkeys);
+
+ cur_map = *map;
+
+ mk_list_foreach(head, subkeys) {
+ /* Key expected key entry */
+ entry = mk_list_entry(head, struct flb_slist_entry, _head);
+
+ if (cur_map.type != MSGPACK_OBJECT_MAP) {
+ break;
+ }
+
+ /* Get map entry that matches entry name */
+ for (i = 0; i < cur_map.via.map.size; i++) {
+ key = cur_map.via.map.ptr[i].key;
+ val = cur_map.via.map.ptr[i].val;
+
+ /* A bit obvious, but it's better to validate data type */
+ if (key.type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+
+ /* Compare strings by length and content */
+ if (flb_sds_cmp(entry->str,
+ (char *) key.via.str.ptr,
+ key.via.str.size) != 0) {
+ key_found = NULL;
+ continue;
+ }
+
+ key_found = &key;
+ cur_map = val;
+ matched++;
+ break;
+ }
+
+ if (levels == matched) {
+ break;
+ }
+ }
+
+ /* No matches */
+ if (!key_found || (matched > 0 && levels != matched)) {
+ return -1;
+ }
+
+ ret = msgpack_object_to_sp_value(val, result);
+ if (ret == -1) {
+ //flb_error("[sp key] cannot process key value");
+ return -1;
+ }
+
+ return 0;
+}
+
+struct flb_sp_value *flb_sp_key_to_value(flb_sds_t ckey,
+ msgpack_object map,
+ struct mk_list *subkeys)
+{
+ int i;
+ int ret;
+ int map_size;
+ msgpack_object key;
+ msgpack_object val;
+ struct flb_sp_value *result;
+
+ map_size = map.via.map.size;
+ for (i = 0; i < map_size; i++) {
+ key = map.via.map.ptr[i].key;
+ val = map.via.map.ptr[i].val;
+
+ /* Compare by length and by key name */
+ if (flb_sds_cmp(ckey, key.via.str.ptr, key.via.str.size) != 0) {
+ continue;
+ }
+
+ result = flb_calloc(1, sizeof(struct flb_sp_value));
+ if (!result) {
+ flb_errno();
+ return NULL;
+ }
+ result->o = val;
+
+ if (val.type == MSGPACK_OBJECT_MAP && subkeys != NULL) {
+ ret = subkey_to_value(&val, subkeys, result);
+ if (ret == 0) {
+ return result;
+ }
+ else {
+ flb_free(result);
+ return NULL;
+ }
+ }
+ else {
+ ret = msgpack_object_to_sp_value(val, result);
+ if (ret == -1) {
+ flb_error("[sp key] cannot process key value");
+ flb_free(result);
+ return NULL;
+ }
+ }
+
+ return result;
+ }
+
+ /*
+ * NULL return means: failed memory allocation, an invalid value,
+ * or non-existing key.
+ */
+ return NULL;
+}
+
+void flb_sp_key_value_destroy(struct flb_sp_value *v)
+{
+ if (v->type == FLB_EXP_STRING) {
+ flb_sds_destroy(v->val.string);
+ }
+ flb_free(v);
+}
diff --git a/fluent-bit/src/stream_processor/flb_sp_snapshot.c b/fluent-bit/src/stream_processor/flb_sp_snapshot.c
new file mode 100644
index 000000000..edef823b4
--- /dev/null
+++ b/fluent-bit/src/stream_processor/flb_sp_snapshot.c
@@ -0,0 +1,277 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/stream_processor/flb_sp.h>
+#include <fluent-bit/stream_processor/flb_sp_parser.h>
+#include <fluent-bit/stream_processor/flb_sp_snapshot.h>
+
+static struct flb_sp_snapshot_page *snapshot_page_create()
+{
+ struct flb_sp_snapshot_page *page;
+
+ page = (struct flb_sp_snapshot_page *)
+ flb_calloc(1, sizeof(struct flb_sp_snapshot_page));
+ if (!page) {
+ flb_errno();
+ return NULL;
+ }
+
+ page->snapshot_page = (char *) flb_malloc(SNAPSHOT_PAGE_SIZE);
+ if (!page->snapshot_page) {
+ flb_errno();
+ flb_free(page);
+ return NULL;
+ }
+
+ return page;
+}
+
+static int snapshot_cleanup(struct flb_sp_snapshot *snapshot, struct flb_time *tms)
+{
+ int ok;
+ size_t off;
+ size_t off_copy;
+ msgpack_unpacked result;
+ msgpack_object *obj;
+ struct flb_time tms0;
+ struct flb_sp_snapshot_page *page;
+
+ ok = MSGPACK_UNPACK_SUCCESS;
+ off = 0;
+
+ while (mk_list_is_empty(&snapshot->pages) != 0) {
+ page = mk_list_entry_first(&snapshot->pages, struct flb_sp_snapshot_page,
+ _head);
+ off = page->start_pos;
+ off_copy = off;
+
+ msgpack_unpacked_init(&result);
+
+ while (msgpack_unpack_next(&result, page->snapshot_page, page->end_pos,
+ &off) == ok) {
+
+ if (snapshot->record_limit > 0 &&
+ snapshot->records > snapshot->record_limit) {
+ page->start_pos = off;
+ snapshot->records--;
+ snapshot->size = snapshot->size - (off - off_copy);
+ off_copy = off;
+
+ continue;
+ }
+
+ /* extract timestamp */
+ flb_time_pop_from_msgpack(&tms0, &result, &obj);
+
+ if (snapshot->time_limit > 0 &&
+ tms->tm.tv_sec - tms0.tm.tv_sec > snapshot->time_limit) {
+ page->start_pos = off;
+ snapshot->records--;
+ snapshot->size = snapshot->size - (off - off_copy);
+ off_copy = off;
+
+ continue;
+ }
+
+ break;
+ }
+
+ msgpack_unpacked_destroy(&result);
+
+ /* If page is empty, free the page and move to the next one */
+ if (page->start_pos != page->end_pos) {
+ break;
+ }
+
+ mk_list_del(&page->_head);
+ flb_free(page->snapshot_page);
+ flb_free(page);
+ }
+
+ return 0;
+}
+
+static bool snapshot_page_is_full(struct flb_sp_snapshot_page *page, size_t buf_size)
+{
+ return SNAPSHOT_PAGE_SIZE - page->end_pos < buf_size;
+}
+
+char *flb_sp_snapshot_name_from_flush(flb_sds_t name)
+{
+ return name + sizeof("__flush_") - 1;
+}
+
+int flb_sp_snapshot_update(struct flb_sp_task *task, const char *buf_data,
+ size_t buf_size, struct flb_time *tms)
+{
+ int ok;
+ size_t off = 0;
+ struct flb_time tm;
+ struct flb_sp_snapshot *snapshot;
+ struct flb_sp_snapshot_page *page;
+ msgpack_unpacked result;
+ msgpack_object *obj;
+
+ ok = MSGPACK_UNPACK_SUCCESS;
+ msgpack_unpacked_init(&result);
+
+ if (buf_size <= 0) {
+ return -1;
+ }
+
+ snapshot = (struct flb_sp_snapshot *) task->snapshot;
+
+ /* Create a snapshot pgae if the list is empty */
+ if (mk_list_is_empty(&snapshot->pages) == 0) {
+ page = snapshot_page_create();
+ if (!page) {
+ flb_errno();
+ return -1;
+ }
+
+ mk_list_add(&page->_head, &snapshot->pages);
+ }
+ else {
+ page = mk_list_entry_last(&snapshot->pages, struct flb_sp_snapshot_page, _head);
+
+ if (snapshot_page_is_full(page, buf_size)) {
+ page = snapshot_page_create();
+ if (!page) {
+ flb_errno();
+ return -1;
+ }
+
+ mk_list_add(&page->_head, &snapshot->pages);
+ }
+ }
+
+ memcpy(page->snapshot_page + page->end_pos, buf_data, buf_size);
+ page->end_pos = page->end_pos + buf_size;
+
+ /* Get the last timestamp */
+ while (msgpack_unpack_next(&result, page->snapshot_page,
+ page->end_pos - page->start_pos, &off) == ok) {
+ flb_time_pop_from_msgpack(&tm, &result, &obj);
+ }
+
+ msgpack_unpacked_destroy(&result);
+
+ snapshot->records++;
+ snapshot->size = snapshot->size + buf_size;
+
+ /* Remove records from snapshot pages based on time/length window */
+ snapshot_cleanup(snapshot, tms);
+
+ return 0;
+}
+
+int flb_sp_snapshot_flush(struct flb_sp *sp, struct flb_sp_task *task,
+ char **out_buf_data, size_t *out_buf_size)
+{
+ size_t off;
+ size_t page_size;
+ char *snapshot_name;
+ char *out_buf_data_tmp;
+ struct flb_sp_cmd *cmd;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct mk_list *snapshot_head;
+ struct flb_sp_task *snapshot_task;
+ struct flb_sp_snapshot *snapshot;
+ struct flb_sp_snapshot_page *page;
+
+ off = 0;
+ cmd = task->cmd;
+ snapshot_name = flb_sp_snapshot_name_from_flush(cmd->stream_name);
+
+ /* Lookup Tasks that matches the incoming instance data */
+ mk_list_foreach(head, &sp->tasks) {
+ snapshot_task = mk_list_entry(head, struct flb_sp_task, _head);
+ cmd = snapshot_task->cmd;
+
+ if (cmd->type == FLB_SP_CREATE_SNAPSHOT &&
+ flb_sds_cmp(cmd->stream_name, snapshot_name,
+ strlen(snapshot_name)) == 0) {
+
+ snapshot = (struct flb_sp_snapshot *) snapshot_task->snapshot;
+
+ if (snapshot->size == 0) {
+ break;
+ }
+
+ if (*out_buf_data == NULL) {
+ *out_buf_data = (char *) flb_malloc(snapshot->size);
+ if (!*out_buf_data) {
+ flb_errno();
+ return -1;
+ }
+ *out_buf_size = snapshot->size;
+ }
+ else {
+ out_buf_data_tmp = (char *) flb_realloc(*out_buf_data,
+ *out_buf_size + snapshot->size);
+ if (!out_buf_data_tmp) {
+ flb_errno();
+ return -1;
+ }
+ *out_buf_data = out_buf_data_tmp;
+ *out_buf_size = *out_buf_size + snapshot->size;
+ }
+
+ mk_list_foreach_safe(snapshot_head, tmp, &snapshot->pages) {
+ page = mk_list_entry_first(&snapshot->pages,
+ struct flb_sp_snapshot_page, _head);
+ page_size = page->end_pos - page->start_pos;
+ memcpy(*out_buf_data + off,
+ page->snapshot_page + page->start_pos, page_size);
+ off = off + page_size;
+
+ /* Remove page from list */
+ mk_list_del(&page->_head);
+ flb_free(page->snapshot_page);
+ flb_free(page);
+ }
+
+ mk_list_init(&snapshot->pages);
+
+ snapshot->records = 0;
+ snapshot->size = 0;
+ }
+ }
+
+ return 0;
+}
+
+void flb_sp_snapshot_destroy(struct flb_sp_snapshot *snapshot)
+{
+ struct mk_list *head;
+ struct mk_list *tmp;
+ struct flb_sp_snapshot_page *page;
+
+ if (snapshot != NULL) {
+ mk_list_foreach_safe(head, tmp, &snapshot->pages) {
+ page = mk_list_entry(head, struct flb_sp_snapshot_page, _head);
+ mk_list_del(&page->_head);
+ flb_free(page->snapshot_page);
+ flb_free(page);
+ }
+ flb_free(snapshot);
+ }
+}
diff --git a/fluent-bit/src/stream_processor/flb_sp_stream.c b/fluent-bit/src/stream_processor/flb_sp_stream.c
new file mode 100644
index 000000000..b4f8a37a2
--- /dev/null
+++ b/fluent-bit/src/stream_processor/flb_sp_stream.c
@@ -0,0 +1,168 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_metrics.h>
+#include <fluent-bit/flb_storage.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/stream_processor/flb_sp.h>
+#include <fluent-bit/stream_processor/flb_sp_parser.h>
+#include <fluent-bit/stream_processor/flb_sp_stream.h>
+
+/* Function defined in plugins/in_stream_processor/sp.c */
+int in_stream_processor_add_chunk(const char *buf_data, size_t buf_size,
+ struct flb_input_instance *in);
+
+int flb_sp_stream_create(const char *name, struct flb_sp_task *task,
+ struct flb_sp *sp)
+{
+ int ret;
+ const char *tmp;
+ struct flb_input_instance *in;
+ struct flb_sp_stream *stream;
+
+ /* The name must be different than an input plugin instance name or alias */
+ ret = flb_input_name_exists(name, sp->config);
+ if (ret == FLB_TRUE) {
+ flb_error("[sp] stream name '%s' already exists", name);
+ return -1;
+ }
+
+ /* Create stream context for 'stream processor' */
+ stream = flb_calloc(1, sizeof(struct flb_sp_stream));
+ if (!stream) {
+ flb_errno();
+ return -1;
+ }
+ stream->name = flb_sds_create(name);
+ if (!stream->name) {
+ flb_free(stream);
+ return -1;
+ }
+
+ /*
+ * Register an input plugin instance using 'in_stream_processor', that one
+ * is used as the parent plugin to ingest data back into Fluent Bit
+ * data pipeline.
+ */
+ in = flb_input_new(sp->config, "stream_processor", NULL, FLB_FALSE);
+ if (!in) {
+ flb_error("[sp] cannot create instance of in_stream_processor");
+ flb_free(stream);
+ return -1;
+ }
+
+ /* Set an alias, otherwise the stream will be called stream_processor.N */
+ ret = flb_input_set_property(in, "alias", name);
+ if (ret == -1) {
+ flb_warn("[sp] cannot set stream name, using fallback name %s",
+ in->name);
+ }
+
+ /*
+ * Lookup for Stream properties: at this point we only care about a
+ * possible Tag defined in the query, e.g:
+ *
+ * CREATE STREAM data WITH(tag='mydata') as SELECT * FROM STREAM:apache;
+ */
+ tmp = flb_sp_cmd_stream_prop_get(task->cmd, "tag");
+ if (tmp) {
+ /*
+ * Duplicate value in a new variable since input instance property
+ * will be released upon plugin exit.
+ */
+ stream->tag = flb_sds_create(tmp);
+ if (!stream->tag) {
+ flb_error("[sp] cannot set Tag property");
+ flb_sp_stream_destroy(stream, sp);
+ return -1;
+ }
+
+ /* Tag property is just an assignation, cannot fail */
+ flb_input_set_property(in, "tag", stream->tag);
+ }
+
+ /*
+ * Check if the new stream is 'routable' or not
+ */
+ tmp = flb_sp_cmd_stream_prop_get(task->cmd, "routable");
+ if (tmp) {
+ stream->routable = flb_utils_bool(tmp);
+ flb_input_set_property(in, "routable", tmp);
+ }
+
+ /*
+ * Set storage type
+ */
+ tmp = flb_sp_cmd_stream_prop_get(task->cmd, "storage.type");
+ if (tmp) {
+ flb_input_set_property(in, "storage.type", tmp);
+ }
+
+ /* Initialize instance */
+ ret = flb_input_instance_init(in, sp->config);
+ if (ret == -1) {
+ flb_error("[sp] cannot initialize instance of in_stream_processor");
+ flb_input_instance_exit(in, sp->config);
+ flb_input_instance_destroy(in);
+ }
+ stream->in = in;
+
+ /* Initialize plugin collector (event callback) */
+ flb_input_collector_start(0, in);
+
+#ifdef FLB_HAVE_METRICS
+ /* Override Metrics title */
+ ret = flb_metrics_title(name, in->metrics);
+ if (ret == -1) {
+ flb_warn("[sp] cannot set metrics title, using fallback name %s",
+ in->name);
+ }
+#endif
+
+ /* Storage context */
+ ret = flb_storage_input_create(sp->config->cio, in);
+ if (ret == -1) {
+ flb_error("[sp] cannot initialize storage for stream '%s'",
+ name);
+ flb_sp_stream_destroy(stream, sp);
+ return -1;
+ }
+
+ task->stream = stream;
+ return 0;
+}
+
+int flb_sp_stream_append_data(const char *buf_data, size_t buf_size,
+ struct flb_sp_stream *stream)
+{
+ return in_stream_processor_add_chunk(buf_data, buf_size, stream->in);
+}
+
+void flb_sp_stream_destroy(struct flb_sp_stream *stream, struct flb_sp *sp)
+{
+ flb_sds_destroy(stream->name);
+ flb_sds_destroy(stream->tag);
+ flb_input_instance_exit(stream->in, sp->config);
+ flb_input_instance_destroy(stream->in);
+ flb_free(stream);
+}
diff --git a/fluent-bit/src/stream_processor/flb_sp_window.c b/fluent-bit/src/stream_processor/flb_sp_window.c
new file mode 100644
index 000000000..3937b4273
--- /dev/null
+++ b/fluent-bit/src/stream_processor/flb_sp_window.c
@@ -0,0 +1,122 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/stream_processor/flb_sp.h>
+#include <fluent-bit/stream_processor/flb_sp_window.h>
+#include <fluent-bit/stream_processor/flb_sp_parser.h>
+#include <fluent-bit/stream_processor/flb_sp_groupby.h>
+#include <fluent-bit/stream_processor/flb_sp_aggregate_func.h>
+
+void flb_sp_window_prune(struct flb_sp_task *task)
+{
+ int i;
+ int map_entries;
+ rb_result_t result;
+ struct aggregate_node *aggr_node;
+ struct aggregate_node *aggr_node_hs;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_sp_hopping_slot *hs;
+ struct rb_tree_node *rb_result;
+ struct flb_sp_cmd_key *ckey;
+ struct flb_sp_cmd *cmd = task->cmd;
+
+ switch (task->window.type) {
+ case FLB_SP_WINDOW_DEFAULT:
+ case FLB_SP_WINDOW_TUMBLING:
+ if (task->window.records > 0) {
+ mk_list_foreach_safe(head, tmp, &task->window.aggregate_list) {
+ aggr_node = mk_list_entry(head, struct aggregate_node, _head);
+ mk_list_del(&aggr_node->_head);
+ flb_sp_aggregate_node_destroy(cmd, aggr_node);
+ }
+
+ rb_tree_destroy(&task->window.aggregate_tree);
+ mk_list_init(&task->window.aggregate_list);
+ rb_tree_new(&task->window.aggregate_tree, flb_sp_groupby_compare);
+ task->window.records = 0;
+ }
+ break;
+ case FLB_SP_WINDOW_HOPPING:
+ if (mk_list_size(&task->window.hopping_slot) == 0) {
+ return;
+ }
+
+ hs = mk_list_entry_first(&task->window.hopping_slot,
+ struct flb_sp_hopping_slot, _head);
+ mk_list_foreach_safe(head, tmp, &task->window.aggregate_list) {
+ aggr_node = mk_list_entry(head, struct aggregate_node, _head);
+ result = rb_tree_find(&hs->aggregate_tree, aggr_node, &rb_result);
+ if (result == RB_OK) {
+ aggr_node_hs = mk_list_entry(rb_result, struct aggregate_node, _rb_head);
+ if (aggr_node_hs->records == aggr_node->records) {
+ rb_tree_remove(&task->window.aggregate_tree, &aggr_node->_rb_head);
+ mk_list_del(&aggr_node->_head);
+ // Destroy aggregation node
+ flb_sp_aggregate_node_destroy(cmd, aggr_node);
+ }
+ else {
+ aggr_node->records -= aggr_node_hs->records;
+ map_entries = mk_list_size(&cmd->keys);
+
+ ckey = mk_list_entry_first(&cmd->keys,
+ struct flb_sp_cmd_key, _head);
+ for (i = 0; i < map_entries; i++) {
+ if (ckey->aggr_func) {
+ aggregate_func_remove[ckey->aggr_func - 1](aggr_node, aggr_node_hs, i);
+ }
+
+ ckey = mk_list_entry_next(&ckey->_head, struct flb_sp_cmd_key,
+ _head, &cmd->keys);
+ }
+ }
+ }
+ }
+ task->window.records -= hs->records;
+
+ /* Destroy hopping slot */
+ mk_list_foreach_safe(head, tmp, &hs->aggregate_list) {
+ aggr_node_hs = mk_list_entry(head, struct aggregate_node, _head);
+ mk_list_del(&aggr_node_hs->_head);
+ flb_sp_aggregate_node_destroy(cmd, aggr_node_hs);
+ }
+ rb_tree_destroy(&hs->aggregate_tree);
+ mk_list_del(&hs->_head);
+ flb_free(hs);
+
+ break;
+ }
+}
+
+int flb_sp_window_populate(struct flb_sp_task *task, const char *buf_data,
+ size_t buf_size)
+{
+ switch (task->window.type) {
+ case FLB_SP_WINDOW_DEFAULT:
+ case FLB_SP_WINDOW_TUMBLING:
+ case FLB_SP_WINDOW_HOPPING:
+ break;
+ default:
+ flb_error("[sp] error populating window for '%s': window type unknown",
+ task->name);
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/fluent-bit/src/stream_processor/parser/CMakeLists.txt b/fluent-bit/src/stream_processor/parser/CMakeLists.txt
new file mode 100644
index 000000000..d14b02bbd
--- /dev/null
+++ b/fluent-bit/src/stream_processor/parser/CMakeLists.txt
@@ -0,0 +1,31 @@
+flex_target(lexer sql.l "${CMAKE_CURRENT_BINARY_DIR}/sql_lex.c"
+ DEFINES_FILE "${CMAKE_CURRENT_BINARY_DIR}/sql_lex.h"
+ )
+bison_target(parser sql.y "${CMAKE_CURRENT_BINARY_DIR}/sql_parser.c")
+
+set(sources
+ flb_sp_parser.c
+ )
+
+if(CMAKE_SYSTEM_NAME MATCHES "Windows")
+ FLB_DEFINITION(YY_NO_UNISTD_H)
+ message(STATUS "Specifying YY_NO_UNISTD_H")
+endif()
+
+include_directories(
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_BINARY_DIR}
+ )
+
+add_library(flb-sp-parser STATIC
+ ${sources}
+ "${CMAKE_CURRENT_BINARY_DIR}/sql_lex.c"
+ "${CMAKE_CURRENT_BINARY_DIR}/sql_parser.c"
+ )
+
+add_flex_bison_dependency(lexer parser)
+add_dependencies(flb-sp-parser onigmo-static)
+
+if(FLB_JEMALLOC)
+ target_link_libraries(flb-sp-parser libjemalloc)
+endif()
diff --git a/fluent-bit/src/stream_processor/parser/flb_sp_parser.c b/fluent-bit/src/stream_processor/parser/flb_sp_parser.c
new file mode 100644
index 000000000..429d0b4c1
--- /dev/null
+++ b/fluent-bit/src/stream_processor/parser/flb_sp_parser.c
@@ -0,0 +1,851 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_str.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_slist.h>
+#include <fluent-bit/stream_processor/flb_sp_parser.h>
+#include <fluent-bit/stream_processor/flb_sp_aggregate_func.h>
+#include <fluent-bit/stream_processor/flb_sp_record_func.h>
+
+#include "sql_parser.h"
+#include "sql_lex.h"
+
+static int swap_tmp_subkeys(struct mk_list **target, struct flb_sp_cmd *cmd)
+{
+ /* Map context keys into this command key structure */
+ *target = cmd->tmp_subkeys;
+
+ cmd->tmp_subkeys = flb_malloc(sizeof(struct mk_list));
+ if (!cmd->tmp_subkeys) {
+ flb_errno();
+ cmd->tmp_subkeys = *target;
+ cmd->status = FLB_SP_ERROR;
+ return -1;
+ }
+
+ flb_slist_create(cmd->tmp_subkeys);
+ return 0;
+}
+
+void flb_sp_cmd_destroy(struct flb_sp_cmd *cmd)
+{
+ struct mk_list *head;
+ struct mk_list *tmp;
+ struct flb_sp_cmd_key *key;
+ struct flb_sp_cmd_gb_key *gb_key;
+ struct flb_sp_cmd_prop *prop;
+
+ /* remove keys */
+ mk_list_foreach_safe(head, tmp, &cmd->keys) {
+ key = mk_list_entry(head, struct flb_sp_cmd_key, _head);
+ mk_list_del(&key->_head);
+ flb_sp_cmd_key_del(key);
+ }
+
+ /* remove groupby keys */
+ mk_list_foreach_safe(head, tmp, &cmd->gb_keys) {
+ gb_key = mk_list_entry(head, struct flb_sp_cmd_gb_key, _head);
+ mk_list_del(&gb_key->_head);
+ flb_sp_cmd_gb_key_del(gb_key);
+ }
+
+ /* stream */
+ if (cmd->stream_name) {
+ mk_list_foreach_safe(head, tmp, &cmd->stream_props) {
+ prop = mk_list_entry(head, struct flb_sp_cmd_prop, _head);
+ mk_list_del(&prop->_head);
+ flb_sp_cmd_stream_prop_del(prop);
+ }
+ flb_sds_destroy(cmd->stream_name);
+ }
+ flb_sds_destroy(cmd->source_name);
+
+ if (mk_list_size(&cmd->cond_list) > 0) {
+ flb_sp_cmd_condition_del(cmd);
+ }
+
+ if (cmd->tmp_subkeys) {
+ flb_slist_destroy(cmd->tmp_subkeys);
+ flb_free(cmd->tmp_subkeys);
+ }
+
+ flb_free(cmd);
+}
+
+void flb_sp_cmd_key_del(struct flb_sp_cmd_key *key)
+{
+ if (key->name) {
+ flb_sds_destroy(key->name);
+ }
+ if (key->alias) {
+ flb_sds_destroy(key->alias);
+ }
+ if (key->subkeys) {
+ flb_slist_destroy(key->subkeys);
+ flb_free(key->subkeys);
+ }
+ flb_free(key);
+}
+
+void flb_sp_cmd_gb_key_del(struct flb_sp_cmd_gb_key *key)
+{
+ if (key->name) {
+ flb_sds_destroy(key->name);
+ }
+ if (key->subkeys) {
+ flb_slist_destroy(key->subkeys);
+ flb_free(key->subkeys);
+ }
+ flb_free(key);
+}
+
+struct flb_sp_cmd_key *flb_sp_key_create(struct flb_sp_cmd *cmd, int func,
+ const char *key_name,
+ const char *key_alias)
+{
+ char tmp_alias[256];
+ int s;
+ int ret;
+ int len;
+ int aggr_func = 0;
+ int time_func = 0;
+ int record_func = 0;
+ char *tmp;
+ struct mk_list *head;
+ struct flb_sp_cmd_key *key;
+ struct flb_slist_entry *entry;
+
+ /* aggregation function ? */
+ if (func >= FLB_SP_AVG && func <= FLB_SP_FORECAST) {
+ aggr_func = func;
+ }
+ else if (func >= FLB_SP_NOW && func <= FLB_SP_UNIX_TIMESTAMP) {
+ /* Time function */
+ time_func = func;
+ }
+ else if (func >= FLB_SP_RECORD_TAG && func <= FLB_SP_RECORD_TIME) {
+ /* Record function */
+ record_func = func;
+ }
+
+ key = flb_calloc(1, sizeof(struct flb_sp_cmd_key));
+ if (!key) {
+ flb_errno();
+ cmd->status = FLB_SP_ERROR;
+ return NULL;
+ }
+ key->gb_key = NULL;
+ key->subkeys = NULL;
+
+ /* key name and aliases works when the selection is not a wildcard */
+ if (key_name) {
+ key->name = flb_sds_create(key_name);
+ if (!key->name) {
+ flb_sp_cmd_key_del(key);
+ cmd->status = FLB_SP_ERROR;
+ return NULL;
+ }
+ }
+ else {
+ /*
+ * Wildcard key only allowed on:
+ * - no aggregation mode (left side / first entry)
+ * - aggregation using COUNT(*)
+ */
+ if (mk_list_size(&cmd->keys) > 0 && aggr_func == 0 &&
+ record_func == 0 && time_func == 0) {
+ flb_sp_cmd_key_del(key);
+ cmd->status = FLB_SP_ERROR;
+ return NULL;
+ }
+ }
+
+ if (key_alias) {
+ key->alias = flb_sds_create(key_alias);
+ if (!key->alias) {
+ flb_sp_cmd_key_del(key);
+ cmd->status = FLB_SP_ERROR;
+ return NULL;
+ }
+ }
+
+ /* Aggregation function */
+ if (aggr_func > 0) {
+ key->aggr_func = aggr_func;
+ }
+ else if (time_func > 0) {
+ key->time_func = time_func;
+ }
+ else if (record_func > 0) {
+ key->record_func = record_func;
+ }
+
+ /* Lookup for any subkeys in the temporary list */
+ if (mk_list_size(cmd->tmp_subkeys) > 0) {
+ ret = swap_tmp_subkeys(&key->subkeys, cmd);
+ if (ret == -1) {
+ flb_sp_cmd_key_del(key);
+ cmd->status = FLB_SP_ERROR;
+ return NULL;
+ }
+
+ /* Compose a name key that include listed sub keys */
+ if (!key->alias) {
+ s = flb_sds_len(key->name) + (16 * mk_list_size(key->subkeys));
+ key->alias = flb_sds_create_size(s);
+ if (!key->alias) {
+ flb_sp_cmd_key_del(key);
+ return NULL;
+ }
+
+ tmp = flb_sds_cat(key->alias, key->name, flb_sds_len(key->name));
+ if (tmp != key->alias) {
+ key->alias = tmp;
+ }
+
+ mk_list_foreach(head, key->subkeys) {
+ entry = mk_list_entry(head, struct flb_slist_entry, _head);
+
+ /* prefix */
+ tmp = flb_sds_cat(key->alias, "['", 2);
+ if (tmp) {
+ key->alias = tmp;
+ }
+ else {
+ flb_sp_cmd_key_del(key);
+ return NULL;
+ }
+
+ /* selected key name */
+ tmp = flb_sds_cat(key->alias,
+ entry->str, flb_sds_len(entry->str));
+ if (tmp) {
+ key->alias = tmp;
+ }
+ else {
+ flb_sp_cmd_key_del(key);
+ return NULL;
+ }
+
+ /* suffix */
+ tmp = flb_sds_cat(key->alias, "']", 2);
+ if (tmp) {
+ key->alias = tmp;
+ }
+ else {
+ flb_sp_cmd_key_del(key);
+ return NULL;
+ }
+ }
+
+ if (aggr_func) {
+ len = snprintf(tmp_alias, sizeof(tmp_alias) - 1, "%s(%s)",
+ aggregate_func_string[aggr_func - 1], key->alias);
+
+ tmp = flb_sds_copy(key->alias, tmp_alias, len);
+ if (tmp) {
+ key->alias = tmp;
+ }
+ else {
+ flb_sp_cmd_key_del(key);
+ return NULL;
+ }
+ }
+ }
+ }
+ else if (aggr_func && !key->alias) {
+ if (key->name) {
+ len = snprintf(tmp_alias, sizeof(tmp_alias) - 1, "%s(%s)",
+ aggregate_func_string[aggr_func - 1], key->name);
+ } else {
+ len = snprintf(tmp_alias, sizeof(tmp_alias) - 1, "%s(*)",
+ aggregate_func_string[aggr_func - 1]);
+ }
+
+ key->alias = flb_sds_create_len(tmp_alias, len);
+ if (!key->alias) {
+ flb_sp_cmd_key_del(key);
+ return NULL;
+ }
+ }
+
+ return key;
+}
+
+int flb_sp_cmd_key_add(struct flb_sp_cmd *cmd, int func, const char *key_name)
+{
+ struct flb_sp_cmd_key *key;
+
+ key = flb_sp_key_create(cmd, func, key_name, cmd->alias);
+
+ if (!key) {
+ return -1;
+ }
+
+ mk_list_add(&key->_head, &cmd->keys);
+
+ /* free key alias and set cmd->alias to null */
+ if (cmd->alias) {
+ flb_free(cmd->alias);
+ cmd->alias = NULL;
+ }
+
+ return 0;
+}
+
+void flb_sp_cmd_alias_add(struct flb_sp_cmd *cmd, const char *key_alias)
+{
+ cmd->alias = (char *) key_alias;
+}
+
+int flb_sp_cmd_source(struct flb_sp_cmd *cmd, int type, const char *source)
+{
+ cmd->source_type = type;
+ cmd->source_name = flb_sds_create(source);
+ if (!cmd->source_name) {
+ flb_errno();
+ return -1;
+ }
+
+ return 0;
+}
+
+void flb_sp_cmd_dump(struct flb_sp_cmd *cmd)
+{
+ struct mk_list *head;
+ struct mk_list *tmp;
+ struct flb_sp_cmd_key *key;
+
+ /* Lookup keys */
+ printf("== KEYS ==\n");
+ mk_list_foreach_safe(head, tmp, &cmd->keys) {
+ key = mk_list_entry(head, struct flb_sp_cmd_key, _head);
+ printf("- '%s'\n", key->name);
+ }
+ printf("== SOURCE ==\n");
+ if (cmd->source_type == FLB_SP_STREAM) {
+ printf("stream => ");
+ }
+ else if (cmd->source_type == FLB_SP_TAG) {
+ printf("tag match => ");
+ }
+
+ printf("'%s'\n", cmd->source_name);
+}
+
+struct flb_sp_cmd *flb_sp_cmd_create(const char *sql)
+{
+ int ret;
+ yyscan_t scanner;
+ YY_BUFFER_STATE buf;
+ struct flb_sp_cmd *cmd;
+
+ /* create context */
+ cmd = flb_calloc(1, sizeof(struct flb_sp_cmd));
+ if (!cmd) {
+ flb_errno();
+ return NULL;
+ }
+ cmd->status = FLB_SP_OK;
+ cmd->type = FLB_SP_SELECT;
+
+ mk_list_init(&cmd->stream_props);
+ mk_list_init(&cmd->keys);
+
+ /* Condition linked list (we use them to free resources) */
+ mk_list_init(&cmd->cond_list);
+ mk_list_init(&cmd->gb_keys);
+
+ /* Allocate temporary list and initialize */
+ cmd->tmp_subkeys = flb_malloc(sizeof(struct mk_list));
+ if (!cmd->tmp_subkeys) {
+ flb_errno();
+ flb_free(cmd);
+ return NULL;
+ }
+ flb_slist_create(cmd->tmp_subkeys);
+
+ /* Flex/Bison work */
+ flb_sp_lex_init(&scanner);
+ buf = flb_sp__scan_string(sql, scanner);
+
+ ret = flb_sp_parse(cmd, sql, scanner);
+
+ flb_sp__delete_buffer(buf, scanner);
+ flb_sp_lex_destroy(scanner);
+
+ if (ret != 0) {
+ flb_sp_cmd_destroy(cmd);
+ return NULL;
+ }
+
+ return cmd;
+}
+
+int flb_sp_cmd_stream_new(struct flb_sp_cmd *cmd, const char *stream_name)
+{
+ cmd->stream_name = flb_sds_create(stream_name);
+ if (!cmd->stream_name) {
+ return -1;
+ }
+
+ cmd->type = FLB_SP_CREATE_STREAM;
+ return 0;
+}
+
+int flb_sp_cmd_snapshot_new(struct flb_sp_cmd *cmd, const char *snapshot_name)
+{
+ const char *tmp;
+
+ cmd->stream_name = flb_sds_create(snapshot_name);
+ if (!cmd->stream_name) {
+ return -1;
+ }
+
+ tmp = flb_sp_cmd_stream_prop_get(cmd, "tag");
+ if (!tmp) {
+ cmd->status = FLB_SP_ERROR;
+ flb_error("[sp] tag for snapshot is required. Add WITH(tag = <TAG>) to the snapshot %s",
+ snapshot_name);
+ return -1;
+ }
+
+ cmd->type = FLB_SP_CREATE_SNAPSHOT;
+
+ return 0;
+}
+
+int flb_sp_cmd_snapshot_flush_new(struct flb_sp_cmd *cmd, const char *snapshot_name)
+{
+ cmd->stream_name = flb_sds_cat(flb_sds_create("__flush_"),
+ snapshot_name, strlen(snapshot_name));
+
+ if (!cmd->stream_name) {
+ return -1;
+ }
+
+ cmd->type = FLB_SP_FLUSH_SNAPSHOT;
+
+ return 0;
+}
+
+int flb_sp_cmd_stream_prop_add(struct flb_sp_cmd *cmd, const char *key, const char *val)
+{
+ struct flb_sp_cmd_prop *prop;
+
+ prop = flb_malloc(sizeof(struct flb_sp_cmd_prop));
+ if (!prop) {
+ flb_errno();
+ return -1;
+ }
+
+ prop->key = flb_sds_create(key);
+ if (!prop->key) {
+ flb_free(prop);
+ return -1;
+ }
+
+ prop->val = flb_sds_create(val);
+ if (!prop->val) {
+ flb_free(prop->key);
+ flb_free(prop);
+ return -1;
+ }
+
+ mk_list_add(&prop->_head, &cmd->stream_props);
+ return 0;
+}
+
+void flb_sp_cmd_stream_prop_del(struct flb_sp_cmd_prop *prop)
+{
+ if (prop->key) {
+ flb_sds_destroy(prop->key);
+ }
+ if (prop->val) {
+ flb_sds_destroy(prop->val);
+ }
+ flb_free(prop);
+}
+
+const char *flb_sp_cmd_stream_prop_get(struct flb_sp_cmd *cmd, const char *key)
+{
+ int len;
+ struct mk_list *head;
+ struct flb_sp_cmd_prop *prop;
+
+ if (!key) {
+ return NULL;
+ }
+ len = strlen(key);
+
+ mk_list_foreach(head, &cmd->stream_props) {
+ prop = mk_list_entry(head, struct flb_sp_cmd_prop, _head);
+ if (flb_sds_len(prop->key) != len) {
+ continue;
+ }
+
+ if (strcmp(prop->key, key) == 0) {
+ return prop->val;
+ }
+ }
+
+ return NULL;
+}
+
+/* WINDOW functions */
+
+int flb_sp_cmd_window(struct flb_sp_cmd *cmd,
+ int window_type, int size, int time_unit,
+ int advance_by_size, int advance_by_time_unit)
+{
+ cmd->window.type = window_type;
+
+ switch (time_unit) {
+ case FLB_SP_TIME_SECOND:
+ cmd->window.size = (time_t) size;
+ break;
+ case FLB_SP_TIME_MINUTE:
+ cmd->window.size = (time_t) size * 60;
+ break;
+ case FLB_SP_TIME_HOUR:
+ cmd->window.size = (time_t) size * 3600;
+ break;
+ }
+
+ if (window_type == FLB_SP_WINDOW_HOPPING) {
+ switch (advance_by_time_unit) {
+ case FLB_SP_TIME_SECOND:
+ cmd->window.advance_by = (time_t) advance_by_size;
+ break;
+ case FLB_SP_TIME_MINUTE:
+ cmd->window.advance_by = (time_t) advance_by_size * 60;
+ break;
+ case FLB_SP_TIME_HOUR:
+ cmd->window.advance_by = (time_t) advance_by_size * 3600;
+ break;
+ }
+
+ if (cmd->window.advance_by >= cmd->window.size) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/* WHERE <condition> functions */
+
+struct flb_exp *flb_sp_cmd_operation(struct flb_sp_cmd *cmd,
+ struct flb_exp *e1, struct flb_exp *e2,
+ int operation)
+{
+ struct flb_exp_op *expression;
+
+ expression = flb_malloc(sizeof(struct flb_exp_op));
+ if (!expression) {
+ flb_errno();
+ return NULL;
+ }
+
+ expression->type = FLB_LOGICAL_OP;
+ expression->left = e1;
+ expression->right = e2;
+ expression->operation = operation;
+ mk_list_add(&expression->_head, &cmd->cond_list);
+
+ return (struct flb_exp *) expression;
+}
+
+struct flb_exp *flb_sp_cmd_comparison(struct flb_sp_cmd *cmd,
+ struct flb_exp *key, struct flb_exp *val,
+ int operation)
+{
+ struct flb_exp_op *expression;
+
+ expression = flb_malloc(sizeof(struct flb_exp_op));
+ if (!expression) {
+ flb_errno();
+ return NULL;
+ }
+
+ expression->type = FLB_LOGICAL_OP;
+ expression->left = (struct flb_exp *) key;
+ expression->right = (struct flb_exp *) val;
+ expression->operation = operation;
+ mk_list_add(&expression->_head, &cmd->cond_list);
+
+ return (struct flb_exp *) expression;
+}
+
+struct flb_exp *flb_sp_cmd_condition_key(struct flb_sp_cmd *cmd,
+ const char *identifier)
+{
+ int ret;
+ struct flb_exp_key *key;
+
+ key = flb_calloc(1, sizeof(struct flb_exp_key));
+ if (!key) {
+ flb_errno();
+ return NULL;
+ }
+
+ key->type = FLB_EXP_KEY;
+ key->name = flb_sds_create(identifier);
+ mk_list_add(&key->_head, &cmd->cond_list);
+
+ if (mk_list_size(cmd->tmp_subkeys) > 0) {
+ ret = swap_tmp_subkeys(&key->subkeys, cmd);
+ if (ret == -1) {
+ flb_sds_destroy(key->name);
+ mk_list_del(&key->_head);
+ flb_free(key);
+ return NULL;
+ }
+ }
+
+ return (struct flb_exp *) key;
+}
+
+struct flb_exp *flb_sp_cmd_condition_integer(struct flb_sp_cmd *cmd,
+ int integer)
+{
+ struct flb_exp_val *val;
+
+ val = flb_malloc(sizeof(struct flb_exp_val));
+ if (!val) {
+ flb_errno();
+ return NULL;
+ }
+
+ val->type = FLB_EXP_INT;
+ val->val.i64 = integer;
+ mk_list_add(&val->_head, &cmd->cond_list);
+
+ return (struct flb_exp *) val;
+}
+
+struct flb_exp *flb_sp_cmd_condition_float(struct flb_sp_cmd *cmd, float fval)
+{
+ struct flb_exp_val *val;
+
+ val = flb_malloc(sizeof(struct flb_exp_val));
+ if (!val) {
+ flb_errno();
+ return NULL;
+ }
+
+ val->type = FLB_EXP_FLOAT;
+ val->val.f64 = fval;
+ mk_list_add(&val->_head, &cmd->cond_list);
+
+ return (struct flb_exp *) val;
+}
+
+struct flb_exp *flb_sp_cmd_condition_string(struct flb_sp_cmd *cmd,
+ const char *string)
+{
+ struct flb_exp_val *val;
+
+ val = flb_malloc(sizeof(struct flb_exp_val));
+ if (!val) {
+ flb_errno();
+ return NULL;
+ }
+
+ val->type = FLB_EXP_STRING;
+ val->val.string = flb_sds_create(string);
+ mk_list_add(&val->_head, &cmd->cond_list);
+
+ return (struct flb_exp *) val;
+}
+
+struct flb_exp *flb_sp_cmd_condition_boolean(struct flb_sp_cmd *cmd,
+ bool boolean)
+{
+ struct flb_exp_val *val;
+
+ val = flb_malloc(sizeof(struct flb_exp_val));
+ if (!val) {
+ flb_errno();
+ return NULL;
+ }
+
+ val->type = FLB_EXP_BOOL;
+ val->val.boolean = boolean;
+ mk_list_add(&val->_head, &cmd->cond_list);
+
+ return (struct flb_exp *) val;
+}
+
+struct flb_exp *flb_sp_cmd_condition_null(struct flb_sp_cmd *cmd)
+{
+ struct flb_exp_val *val;
+
+ val = flb_malloc(sizeof(struct flb_exp_val));
+ if (!val) {
+ flb_errno();
+ return NULL;
+ }
+
+ val->type = FLB_EXP_NULL;
+ mk_list_add(&val->_head, &cmd->cond_list);
+
+ return (struct flb_exp *) val;
+}
+
+struct flb_exp *flb_sp_record_function_add(struct flb_sp_cmd *cmd,
+ char *name, struct flb_exp *param)
+{
+ char *rf_name;
+ int i;
+ struct flb_exp_func *func;
+
+ for (i = 0; i < RECORD_FUNCTIONS_SIZE; i++)
+ {
+ rf_name = record_functions[i];
+ if (strncmp(rf_name, name, strlen(rf_name)) == 0)
+ {
+ func = flb_calloc(1, sizeof(struct flb_exp_func));
+ if (!func) {
+ flb_errno();
+ return NULL;
+ }
+
+ func->type = FLB_EXP_FUNC;
+ func->name = flb_sds_create(name);
+ func->cb_func = record_functions_ptr[i];
+ func->param = param;
+
+ mk_list_add(&func->_head, &cmd->cond_list);
+
+ return (struct flb_exp *) func;
+ }
+ }
+
+ return NULL;
+}
+
+void flb_sp_cmd_condition_add(struct flb_sp_cmd *cmd, struct flb_exp *e)
+
+{
+ cmd->condition = e;
+}
+
+int flb_sp_cmd_gb_key_add(struct flb_sp_cmd *cmd, const char *key)
+{
+ int ret;
+ struct flb_sp_cmd_gb_key *gb_key;
+
+ gb_key = flb_calloc(1, sizeof(struct flb_sp_cmd_gb_key));
+ if (!gb_key) {
+ flb_errno();
+ return -1;
+ }
+
+ gb_key->name = flb_sds_create(key);
+ if (!gb_key->name) {
+ flb_free(gb_key);
+ return -1;
+ }
+
+ gb_key->id = mk_list_size(&cmd->gb_keys);
+ mk_list_add(&gb_key->_head, &cmd->gb_keys);
+
+ /* Lookup for any subkeys in the temporary list */
+ if (mk_list_size(cmd->tmp_subkeys) > 0) {
+ ret = swap_tmp_subkeys(&gb_key->subkeys, cmd);
+ if (ret == -1) {
+ flb_sds_destroy(gb_key->name);
+ mk_list_del(&gb_key->_head);
+ flb_free(gb_key);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+void flb_sp_cmd_condition_del(struct flb_sp_cmd *cmd)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_exp *exp;
+ struct flb_exp_key *key;
+ struct flb_exp_val *val;
+ struct flb_exp_func *func;
+
+ mk_list_foreach_safe(head, tmp, &cmd->cond_list) {
+ exp = mk_list_entry(head, struct flb_exp, _head);
+ if (exp->type == FLB_EXP_KEY) {
+ key = (struct flb_exp_key *) exp;
+ flb_sds_destroy(key->name);
+ if (key->subkeys) {
+ flb_slist_destroy(key->subkeys);
+ flb_free(key->subkeys);
+ }
+ }
+ else if (exp->type == FLB_EXP_STRING) {
+ val = (struct flb_exp_val *) exp;
+ flb_sds_destroy(val->val.string);
+ }
+ else if (exp->type == FLB_EXP_FUNC) {
+ func = (struct flb_exp_func *) exp;
+ flb_sds_destroy(func->name);
+ }
+
+ mk_list_del(&exp->_head);
+ flb_free(exp);
+ }
+}
+
+void flb_sp_cmd_limit_add(struct flb_sp_cmd *cmd, int limit)
+{
+ cmd->limit = limit;
+}
+
+int flb_sp_cmd_timeseries_forecast(struct flb_sp_cmd *cmd, int func, const char *key_name, int seconds)
+{
+ struct flb_sp_cmd_key *key;
+
+ key = flb_sp_key_create(cmd, func, key_name, cmd->alias);
+
+ if (!key) {
+ return -1;
+ }
+
+ mk_list_add(&key->_head, &cmd->keys);
+
+ key->constant = seconds;
+
+ /* free key alias and set cmd->alias to null */
+ if (cmd->alias) {
+ flb_free(cmd->alias);
+ cmd->alias = NULL;
+ }
+
+ return 0;
+}
diff --git a/fluent-bit/src/stream_processor/parser/sql.l b/fluent-bit/src/stream_processor/parser/sql.l
new file mode 100644
index 000000000..91e5398e1
--- /dev/null
+++ b/fluent-bit/src/stream_processor/parser/sql.l
@@ -0,0 +1,190 @@
+%option prefix="flb_sp_"
+%option caseless
+%option 8bit reentrant bison-bridge
+%option warn noyywrap nodefault
+%option nounput
+%option noinput
+
+
+%{
+#include <stdio.h>
+#include <stdbool.h>
+#include <ctype.h>
+#include <fluent-bit/flb_str.h>
+#include <fluent-bit/flb_log.h>
+#include "sql_parser.h"
+#include <fluent-bit/stream_processor/flb_sp_parser.h>
+
+static inline char *remove_dup_qoutes(const char *s, size_t n)
+{
+ char *str;
+ int dups;
+ int i, j;
+
+ dups = 0;
+ for (i = 0; i < n; i++) {
+ if (s[i] == '\'') {
+ dups++;
+ i++;
+ }
+ }
+
+ str = (char *) flb_malloc(n - dups + 1);
+ if (!str) {
+ return NULL;
+ }
+
+ j = 0;
+ for (i = 0; i < n; i++, j++) {
+ if (s[i] == '\'') {
+ str[j] = '\'';
+ i++;
+ } else {
+ str[j] = s[i];
+ }
+ }
+ str[j] = '\0';
+
+ return str;
+}
+
+char* to_upper(char* token, size_t len)
+{
+ int i;
+ char* token_;
+
+ token_ = flb_malloc(len * sizeof(char) + 1);
+
+ for (i = 0; i < len; i++) {
+ token_[i] = toupper(token[i]);
+ }
+
+ token_[len] = '\0';
+ return token_;
+}
+
+int func_to_code(char* name, size_t len)
+{
+ int code;
+ char* name_;
+
+ name_ = to_upper(name, len);
+ code = -1;
+
+ if (!strcmp(name_, "AVG")) {
+ code = FLB_SP_AVG;
+ } else if (!strcmp(name_, "SUM")) {
+ code = FLB_SP_SUM;
+ } else if (!strcmp(name_, "COUNT")) {
+ code = FLB_SP_COUNT;
+ } else if (!strcmp(name_, "MIN")) {
+ code = FLB_SP_MIN;
+ } else if (!strcmp(name_, "MAX")) {
+ code = FLB_SP_MAX;
+ } else if (!strcmp(name_, "TIMESERIES_FORECAST")) {
+ code = FLB_SP_FORECAST;
+ } else if (!strcmp(name_, "NOW")) {
+ code = FLB_SP_NOW;
+ } else if (!strcmp(name_, "UNIX_TIMESTAMP")) {
+ code = FLB_SP_UNIX_TIMESTAMP;
+ } else if (!strcmp(name_, "RECORD_TAG")) {
+ code = FLB_SP_RECORD_TAG;
+ } else if (!strcmp(name_, "RECORD_TIME")) {
+ code = FLB_SP_RECORD_TIME;
+ }
+
+ flb_free(name_);
+ return code;
+}
+
+%}
+
+%%
+
+ /* SQL */
+CREATE return CREATE;
+FLUSH return FLUSH;
+STREAM return STREAM;
+SNAPSHOT return SNAPSHOT;
+WITH return WITH;
+SELECT return SELECT;
+AS return AS;
+FROM return FROM;
+STREAM: return FROM_STREAM;
+TAG: return FROM_TAG;
+WHERE return WHERE;
+AND return AND;
+OR return OR;
+NOT return NOT;
+WINDOW return WINDOW;
+"GROUP BY" return GROUP_BY;
+LIMIT return LIMIT;
+
+IS return IS;
+NULL return NUL;
+
+ /* Aggregation Functions */
+SUM {yylval->integer = func_to_code(yytext, yyleng); return SUM;}
+AVG {yylval->integer = func_to_code(yytext, yyleng); return AVG;}
+COUNT {yylval->integer = func_to_code(yytext, yyleng); return COUNT;}
+MIN {yylval->integer = func_to_code(yytext, yyleng); return MIN;}
+MAX {yylval->integer = func_to_code(yytext, yyleng); return MAX;}
+TIMESERIES_FORECAST {yylval->integer = func_to_code(yytext, yyleng); return TIMESERIES_FORECAST;};
+
+ /* Record Functions */
+@RECORD return RECORD;
+CONTAINS return CONTAINS;
+TIME return TIME;
+
+
+ /* Window Types */
+TUMBLING return TUMBLING;
+HOPPING return HOPPING;
+"ADVANCE BY" return ADVANCE_BY;
+
+ /* Time */
+HOUR return HOUR;
+MINUTE return MINUTE;
+SECOND return SECOND;
+
+ /* Date / Time Functions */
+NOW {yylval->integer = func_to_code(yytext, yyleng); return NOW;}
+UNIX_TIMESTAMP {yylval->integer = func_to_code(yytext, yyleng); return UNIX_TIMESTAMP;}
+
+ /* Record information */
+RECORD_TAG {yylval->integer = func_to_code(yytext, yyleng); return RECORD_TAG;}
+RECORD_TIME {yylval->integer = func_to_code(yytext, yyleng); return RECORD_TIME;}
+
+"true" { yylval->boolean = true; return BOOLTYPE; };
+"false" { yylval->boolean = false; return BOOLTYPE; };
+
+-?[1-9][0-9]*|0 { yylval->integer = atoi(yytext); return INTEGER; }
+(-?[1-9][0-9]*|0)\.[0-9]+ { yylval->fval = atof(yytext); return FLOATING; }
+\'([^']|'{2})*\' { yylval->string = remove_dup_qoutes(yytext + 1, yyleng - 2); return STRING; }
+
+[_A-Za-z][A-Za-z0-9_.]* { yylval->string = flb_strdup(yytext); return IDENTIFIER; }
+
+"*" |
+"," |
+"=" |
+"(" |
+")" |
+"[" |
+"]" |
+"." |
+";" { return yytext[0]; }
+
+"!=" return NEQ;
+"<>" return NEQ;
+"<" return LT;
+"<=" return LTE;
+">" return GT;
+">=" return GTE;
+
+\' return QUOTE;
+\n
+[ \t]+ /* ignore whitespace */;
+
+. flb_error("[sp] bad input character '%s' at line %d", yytext, yylineno);
+
+%%
diff --git a/fluent-bit/src/stream_processor/parser/sql.y b/fluent-bit/src/stream_processor/parser/sql.y
new file mode 100644
index 000000000..866f95cc0
--- /dev/null
+++ b/fluent-bit/src/stream_processor/parser/sql.y
@@ -0,0 +1,437 @@
+%name-prefix="flb_sp_" // replace with %define api.prefix {flb_sp_}
+%define api.pure full
+%define parse.error verbose
+%parse-param { struct flb_sp_cmd *cmd };
+%parse-param { const char *query };
+%lex-param { void *scanner }
+%parse-param { void *scanner }
+
+%{ // definition section (prologue)
+#include <stdio.h>
+#include <stdlib.h>
+#include <ctype.h>
+
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_slist.h>
+#include <fluent-bit/stream_processor/flb_sp_parser.h>
+
+#include "sql_parser.h"
+#include "sql_lex.h"
+
+extern int yylex();
+
+void yyerror(struct flb_sp_cmd *cmd, const char *query, void *scanner, const char *str)
+{
+ flb_error("[sp] %s at '%s'", str, query);
+}
+
+%} /* EOF C code */
+
+/* Bison declarations */
+/* Known Tokens (refer to sql.l) */
+
+/* Keywords */
+%token IDENTIFIER QUOTE
+
+/* Basic keywords for statements */
+%token CREATE STREAM SNAPSHOT FLUSH WITH SELECT AS FROM FROM_STREAM FROM_TAG
+%token WHERE WINDOW GROUP_BY LIMIT
+
+/* Null keywords */
+%token IS NUL
+
+/* Aggregation functions */
+%token AVG SUM COUNT MAX MIN TIMESERIES_FORECAST
+
+/* Record functions */
+%token RECORD CONTAINS TIME
+
+/* Time functions */
+%token NOW UNIX_TIMESTAMP
+
+ /* Record functions */
+%token RECORD_TAG RECORD_TIME
+
+/* Value types */
+%token INTEGER FLOATING STRING BOOLTYPE
+
+/* Logical operation tokens */
+%token AND OR NOT NEQ LT LTE GT GTE
+
+/* Time tokens */
+%token HOUR MINUTE SECOND
+
+/* Window tokens */
+%token TUMBLING HOPPING ADVANCE_BY
+
+/* Union and field types */
+%union
+{
+ bool boolean;
+ int integer;
+ float fval;
+ char *string;
+ struct flb_sp_cmd *cmd;
+ struct flb_exp *expression;
+}
+
+%type <boolean> BOOLTYPE
+%type <integer> INTEGER
+%type <fval> FLOATING
+%type <string> IDENTIFIER
+%type <string> STRING
+%type <string> record_keys
+%type <string> record_key
+%type <string> prop_key
+%type <string> prop_val
+%type <expression> condition
+%type <expression> comparison
+%type <expression> key
+%type <expression> record_func
+%type <expression> value
+%type <expression> null
+%type <integer> time
+
+%type <integer> time_record_func
+%type <integer> NOW UNIX_TIMESTAMP RECORD_TAG RECORD_TIME
+
+%type <integer> aggregate_func
+%type <integer> COUNT AVG SUM MAX MIN TIMESERIES_FORECAST
+
+
+%destructor { flb_free ($$); } IDENTIFIER
+
+%% /* rules section */
+
+statements: create | select
+
+/* Parser for 'CREATE STREAM' statement */
+create:
+ CREATE STREAM IDENTIFIER AS select
+ {
+ flb_sp_cmd_stream_new(cmd, $3);
+ flb_free($3);
+ }
+ |
+ CREATE STREAM IDENTIFIER WITH '(' properties ')' AS select
+ {
+ flb_sp_cmd_stream_new(cmd, $3);
+ flb_free($3);
+ }
+ |
+ CREATE SNAPSHOT IDENTIFIER AS SELECT '*' FROM source limit ';'
+ {
+ flb_sp_cmd_snapshot_new(cmd, $3);
+ flb_free($3);
+ }
+ |
+ CREATE SNAPSHOT IDENTIFIER WITH '(' properties ')' AS SELECT '*' FROM source limit ';'
+ {
+ flb_sp_cmd_snapshot_new(cmd, $3);
+ flb_free($3);
+ }
+ |
+ FLUSH SNAPSHOT IDENTIFIER AS SELECT '*' FROM source where ';'
+ {
+ flb_sp_cmd_snapshot_flush_new(cmd, $3);
+ flb_free($3);
+ }
+ |
+ FLUSH SNAPSHOT IDENTIFIER WITH '(' properties ')' AS SELECT '*' FROM source where ';'
+ {
+ flb_sp_cmd_snapshot_flush_new(cmd, $3);
+ flb_free($3);
+ }
+ properties: property
+ |
+ properties ',' property
+ property: prop_key '=' prop_val
+ {
+ flb_sp_cmd_stream_prop_add(cmd, $1, $3);
+ flb_free($1);
+ flb_free($3);
+ }
+ prop_key: IDENTIFIER
+ prop_val: STRING
+
+/* Parser for 'SELECT' statement */
+select: SELECT keys FROM source window where groupby limit ';'
+ {
+ cmd->type = FLB_SP_SELECT;
+ }
+ keys: record_keys
+ record_keys: record_key
+ |
+ record_keys ',' record_key
+ record_key: '*'
+ {
+ flb_sp_cmd_key_add(cmd, -1, NULL);
+ }
+ |
+ IDENTIFIER key_alias
+ {
+ flb_sp_cmd_key_add(cmd, -1, $1);
+ flb_free($1);
+ }
+ |
+ IDENTIFIER record_subkey key_alias
+ {
+ flb_sp_cmd_key_add(cmd, -1, $1);
+ flb_free($1);
+ }
+ |
+ COUNT '(' '*' ')' key_alias
+ {
+ flb_sp_cmd_key_add(cmd, $1, NULL);
+ }
+ |
+ COUNT '(' IDENTIFIER ')' key_alias
+ {
+ flb_sp_cmd_key_add(cmd, $1, $3);
+ flb_free($3);
+ }
+ |
+ COUNT '(' IDENTIFIER record_subkey ')' key_alias
+ {
+ flb_sp_cmd_key_add(cmd, $1, $3);
+ flb_free($3);
+ }
+ |
+ aggregate_func '(' IDENTIFIER ')' key_alias
+ {
+ flb_sp_cmd_key_add(cmd, $1, $3);
+ flb_free($3);
+ }
+ |
+ aggregate_func '(' IDENTIFIER record_subkey ')' key_alias
+ {
+ flb_sp_cmd_key_add(cmd, $1, $3);
+ flb_free($3);
+ }
+ |
+ TIMESERIES_FORECAST '(' IDENTIFIER ',' INTEGER ')' key_alias
+ {
+ flb_sp_cmd_timeseries_forecast(cmd, $1, $3, $5);
+ flb_free($3);
+ }
+ |
+ time_record_func '(' ')' key_alias
+ {
+ flb_sp_cmd_key_add(cmd, $1, NULL);
+ }
+ aggregate_func:
+ AVG | SUM | MAX | MIN
+ time_record_func:
+ NOW | UNIX_TIMESTAMP | RECORD_TAG | RECORD_TIME
+ key_alias:
+ %empty
+ |
+ AS IDENTIFIER
+ {
+ flb_sp_cmd_alias_add(cmd, $2);
+ }
+ record_subkey: '[' STRING ']'
+ {
+ flb_slist_add(cmd->tmp_subkeys, $2);
+ flb_free($2);
+ }
+ |
+ record_subkey record_subkey
+ source: FROM_STREAM IDENTIFIER
+ {
+ flb_sp_cmd_source(cmd, FLB_SP_STREAM, $2);
+ flb_free($2);
+ }
+ |
+ FROM_TAG STRING
+ {
+ flb_sp_cmd_source(cmd, FLB_SP_TAG, $2);
+ flb_free($2);
+ }
+ window: %empty
+ |
+ WINDOW window_spec
+ where: %empty
+ |
+ WHERE condition
+ {
+ flb_sp_cmd_condition_add(cmd, $2);
+ }
+ groupby: %empty
+ |
+ GROUP_BY gb_keys
+ limit: %empty
+ |
+ LIMIT INTEGER
+ {
+ flb_sp_cmd_limit_add(cmd, $2);
+ }
+ window_spec:
+ TUMBLING '(' INTEGER time ')'
+ {
+ flb_sp_cmd_window(cmd, FLB_SP_WINDOW_TUMBLING, $3, $4, 0, 0);
+ }
+ |
+ HOPPING '(' INTEGER time ',' ADVANCE_BY INTEGER time ')'
+ {
+ flb_sp_cmd_window(cmd, FLB_SP_WINDOW_HOPPING, $3, $4, $7, $8);
+ }
+ condition: comparison
+ |
+ key
+ {
+ $$ = flb_sp_cmd_operation(cmd, $1, NULL, FLB_EXP_OR);
+ }
+ |
+ value
+ {
+ $$ = flb_sp_cmd_operation(cmd, NULL, $1, FLB_EXP_OR);
+ }
+ |
+ '(' condition ')'
+ {
+ $$ = flb_sp_cmd_operation(cmd, $2, NULL, FLB_EXP_PAR);
+ }
+ |
+ NOT condition
+ {
+ $$ = flb_sp_cmd_operation(cmd, $2, NULL, FLB_EXP_NOT);
+ }
+ |
+ condition AND condition
+ {
+ $$ = flb_sp_cmd_operation(cmd, $1, $3, FLB_EXP_AND);
+ }
+ |
+ condition OR condition
+ {
+ $$ = flb_sp_cmd_operation(cmd, $1, $3, FLB_EXP_OR);
+ }
+ comparison:
+ key IS null
+ {
+ $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_EQ);
+ }
+ |
+ key IS NOT null
+ {
+ $$ = flb_sp_cmd_operation(cmd,
+ flb_sp_cmd_comparison(cmd, $1, $4, FLB_EXP_EQ),
+ NULL, FLB_EXP_NOT);
+ }
+ |
+ record_func
+ {
+ $$ = flb_sp_cmd_comparison(cmd,
+ $1,
+ flb_sp_cmd_condition_boolean(cmd, true),
+ FLB_EXP_EQ);
+ }
+ |
+ record_func '=' value
+ {
+ $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_EQ);
+ }
+ |
+ record_func NEQ value
+ {
+ $$ = flb_sp_cmd_operation(cmd,
+ flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_EQ),
+ NULL, FLB_EXP_NOT)
+ ;
+ }
+ |
+ record_func LT value
+ {
+ $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_LT);
+ }
+ |
+ record_func LTE value
+ {
+ $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_LTE);
+ }
+ |
+ record_func GT value
+ {
+ $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_GT);
+ }
+ |
+ record_func GTE value
+ {
+ $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_GTE);
+ }
+ record_func: key /* Similar to an identity function */
+ |
+ RECORD '.' CONTAINS '(' key ')'
+ {
+ $$ = flb_sp_record_function_add(cmd, "contains", $5);
+ }
+ |
+ RECORD '.' TIME '(' ')'
+ {
+ $$ = flb_sp_record_function_add(cmd, "time", NULL);
+ }
+ key: IDENTIFIER
+ {
+ $$ = flb_sp_cmd_condition_key(cmd, $1);
+ flb_free($1);
+ }
+ |
+ IDENTIFIER record_subkey
+ {
+ $$ = flb_sp_cmd_condition_key(cmd, $1);
+ flb_free($1);
+ }
+ value: INTEGER
+ {
+ $$ = flb_sp_cmd_condition_integer(cmd, $1);
+ }
+ |
+ FLOATING
+ {
+ $$ = flb_sp_cmd_condition_float(cmd, $1);
+ }
+ |
+ STRING
+ {
+ $$ = flb_sp_cmd_condition_string(cmd, $1);
+ flb_free($1);
+ }
+ |
+ BOOLTYPE
+ {
+ $$ = flb_sp_cmd_condition_boolean(cmd, $1);
+ }
+ null: NUL
+ {
+ $$ = flb_sp_cmd_condition_null(cmd);
+ }
+ time: SECOND
+ {
+ $$ = FLB_SP_TIME_SECOND;
+ }
+ |
+ MINUTE
+ {
+ $$ = FLB_SP_TIME_MINUTE;
+ }
+ |
+ HOUR
+ {
+ $$ = FLB_SP_TIME_HOUR;
+ }
+ gb_keys: gb_key
+ |
+ gb_key ',' gb_keys
+ gb_key: IDENTIFIER
+ {
+ flb_sp_cmd_gb_key_add(cmd, $1);
+ flb_free($1);
+ }
+ |
+ IDENTIFIER record_subkey
+ {
+ flb_sp_cmd_gb_key_add(cmd, $1);
+ flb_free($1);
+ }
+ ;