summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/stream_processor/parser
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/stream_processor/parser')
-rw-r--r--fluent-bit/src/stream_processor/parser/CMakeLists.txt31
-rw-r--r--fluent-bit/src/stream_processor/parser/flb_sp_parser.c851
-rw-r--r--fluent-bit/src/stream_processor/parser/sql.l190
-rw-r--r--fluent-bit/src/stream_processor/parser/sql.y437
4 files changed, 1509 insertions, 0 deletions
diff --git a/fluent-bit/src/stream_processor/parser/CMakeLists.txt b/fluent-bit/src/stream_processor/parser/CMakeLists.txt
new file mode 100644
index 000000000..d14b02bbd
--- /dev/null
+++ b/fluent-bit/src/stream_processor/parser/CMakeLists.txt
@@ -0,0 +1,31 @@
+# Generate the lexer source and header from sql.l into the build directory
+flex_target(lexer sql.l "${CMAKE_CURRENT_BINARY_DIR}/sql_lex.c"
+ DEFINES_FILE "${CMAKE_CURRENT_BINARY_DIR}/sql_lex.h"
+ )
+# Generate the parser source from sql.y into the build directory
+bison_target(parser sql.y "${CMAKE_CURRENT_BINARY_DIR}/sql_parser.c")
+
+# Hand-written sources of the parser library
+set(sources
+ flb_sp_parser.c
+ )
+
+# On Windows builds, define YY_NO_UNISTD_H
+if(CMAKE_SYSTEM_NAME MATCHES "Windows")
+ FLB_DEFINITION(YY_NO_UNISTD_H)
+ message(STATUS "Specifying YY_NO_UNISTD_H")
+endif()
+
+# The generated files live in the binary dir, so both dirs must be searched
+include_directories(
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_BINARY_DIR}
+ )
+
+# Static library made of the hand-written sources plus the generated lexer/parser
+add_library(flb-sp-parser STATIC
+ ${sources}
+ "${CMAKE_CURRENT_BINARY_DIR}/sql_lex.c"
+ "${CMAKE_CURRENT_BINARY_DIR}/sql_parser.c"
+ )
+
+# Declare the dependency between the flex target and the bison target
+add_flex_bison_dependency(lexer parser)
+# Build the onigmo-static target before this library
+add_dependencies(flb-sp-parser onigmo-static)
+
+# Link against jemalloc when FLB_JEMALLOC is enabled
+if(FLB_JEMALLOC)
+ target_link_libraries(flb-sp-parser libjemalloc)
+endif()
diff --git a/fluent-bit/src/stream_processor/parser/flb_sp_parser.c b/fluent-bit/src/stream_processor/parser/flb_sp_parser.c
new file mode 100644
index 000000000..429d0b4c1
--- /dev/null
+++ b/fluent-bit/src/stream_processor/parser/flb_sp_parser.c
@@ -0,0 +1,851 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_str.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_slist.h>
+#include <fluent-bit/stream_processor/flb_sp_parser.h>
+#include <fluent-bit/stream_processor/flb_sp_aggregate_func.h>
+#include <fluent-bit/stream_processor/flb_sp_record_func.h>
+
+#include "sql_parser.h"
+#include "sql_lex.h"
+
+/*
+ * Hand over the temporary sub-keys list collected in 'cmd' to '*target' and
+ * give 'cmd' a fresh, empty temporary list.
+ *
+ * Returns 0 on success. On allocation failure it returns -1, flags the command
+ * with FLB_SP_ERROR, leaves the current list owned by 'cmd' and sets '*target'
+ * to NULL so the caller's cleanup cannot release a list that 'cmd' still
+ * references (that would end in a double free in flb_sp_cmd_destroy()).
+ */
+static int swap_tmp_subkeys(struct mk_list **target, struct flb_sp_cmd *cmd)
+{
+    struct mk_list *fresh;
+
+    /* Allocate the replacement first: nothing is modified if this fails */
+    fresh = flb_malloc(sizeof(struct mk_list));
+    if (!fresh) {
+        flb_errno();
+        *target = NULL;
+        cmd->status = FLB_SP_ERROR;
+        return -1;
+    }
+    flb_slist_create(fresh);
+
+    /* Map context keys into this command key structure */
+    *target = cmd->tmp_subkeys;
+    cmd->tmp_subkeys = fresh;
+
+    return 0;
+}
+
+/*
+ * Release a command context and everything it owns: selection keys, GROUP BY
+ * keys, stream properties, stream and source names, condition nodes, the
+ * temporary sub-keys list and any alias still pending in cmd->alias.
+ */
+void flb_sp_cmd_destroy(struct flb_sp_cmd *cmd)
+{
+    struct mk_list *head;
+    struct mk_list *tmp;
+    struct flb_sp_cmd_key *key;
+    struct flb_sp_cmd_gb_key *gb_key;
+    struct flb_sp_cmd_prop *prop;
+
+    /* remove keys */
+    mk_list_foreach_safe(head, tmp, &cmd->keys) {
+        key = mk_list_entry(head, struct flb_sp_cmd_key, _head);
+        mk_list_del(&key->_head);
+        flb_sp_cmd_key_del(key);
+    }
+
+    /* remove groupby keys */
+    mk_list_foreach_safe(head, tmp, &cmd->gb_keys) {
+        gb_key = mk_list_entry(head, struct flb_sp_cmd_gb_key, _head);
+        mk_list_del(&gb_key->_head);
+        flb_sp_cmd_gb_key_del(gb_key);
+    }
+
+    /*
+     * Stream properties are released regardless of cmd->stream_name: they
+     * are registered through flb_sp_cmd_stream_prop_add(), which does not
+     * require the stream name to be set, so they may exist without a name.
+     */
+    mk_list_foreach_safe(head, tmp, &cmd->stream_props) {
+        prop = mk_list_entry(head, struct flb_sp_cmd_prop, _head);
+        mk_list_del(&prop->_head);
+        flb_sp_cmd_stream_prop_del(prop);
+    }
+
+    /* stream */
+    if (cmd->stream_name) {
+        flb_sds_destroy(cmd->stream_name);
+    }
+    flb_sds_destroy(cmd->source_name);
+
+    /* alias registered by flb_sp_cmd_alias_add() but never consumed by a key */
+    if (cmd->alias) {
+        flb_free(cmd->alias);
+        cmd->alias = NULL;
+    }
+
+    if (mk_list_size(&cmd->cond_list) > 0) {
+        flb_sp_cmd_condition_del(cmd);
+    }
+
+    if (cmd->tmp_subkeys) {
+        flb_slist_destroy(cmd->tmp_subkeys);
+        flb_free(cmd->tmp_subkeys);
+    }
+
+    flb_free(cmd);
+}
+
+/* Free a selection key along with its sub-keys list, alias and name */
+void flb_sp_cmd_key_del(struct flb_sp_cmd_key *key)
+{
+    if (key->subkeys != NULL) {
+        flb_slist_destroy(key->subkeys);
+        flb_free(key->subkeys);
+    }
+
+    if (key->alias != NULL) {
+        flb_sds_destroy(key->alias);
+    }
+
+    if (key->name != NULL) {
+        flb_sds_destroy(key->name);
+    }
+
+    flb_free(key);
+}
+
+/* Free a GROUP BY key along with its sub-keys list and name */
+void flb_sp_cmd_gb_key_del(struct flb_sp_cmd_gb_key *key)
+{
+    if (key->subkeys != NULL) {
+        flb_slist_destroy(key->subkeys);
+        flb_free(key->subkeys);
+    }
+
+    if (key->name != NULL) {
+        flb_sds_destroy(key->name);
+    }
+
+    flb_free(key);
+}
+
+/*
+ * Append 'len' bytes of 'str' to key->alias. Returns 0 and updates key->alias
+ * on success; returns -1 and leaves key->alias untouched on failure.
+ */
+static int key_alias_cat(struct flb_sp_cmd_key *key, const char *str, int len)
+{
+    char *tmp;
+
+    tmp = flb_sds_cat(key->alias, str, len);
+    if (!tmp) {
+        return -1;
+    }
+
+    key->alias = tmp;
+    return 0;
+}
+
+/*
+ * Compose a new "<FUNC>(<inner>)" string for the aggregation function id
+ * 'aggr_func' (looked up as aggregate_func_string[aggr_func - 1]). The string
+ * is built dynamically, so long names are neither truncated nor copied out of
+ * a fixed-size buffer. Returns NULL on allocation failure.
+ */
+static char *aggr_alias_create(int aggr_func, const char *inner, int inner_len)
+{
+    int i;
+    int lens[4];
+    char *out;
+    char *tmp;
+    const char *parts[4];
+
+    parts[0] = aggregate_func_string[aggr_func - 1];
+    lens[0] = strlen(parts[0]);
+    parts[1] = "(";
+    lens[1] = 1;
+    parts[2] = inner;
+    lens[2] = inner_len;
+    parts[3] = ")";
+    lens[3] = 1;
+
+    out = flb_sds_create_size(lens[0] + inner_len + 2);
+    if (!out) {
+        return NULL;
+    }
+
+    for (i = 0; i < 4; i++) {
+        tmp = flb_sds_cat(out, parts[i], lens[i]);
+        if (!tmp) {
+            flb_sds_destroy(out);
+            return NULL;
+        }
+        out = tmp;
+    }
+
+    return out;
+}
+
+/*
+ * Create a selection key.
+ *
+ *  - func:      function id; classified as aggregation, time or record
+ *               function according to the FLB_SP_* ranges checked below, any
+ *               other value means "plain key".
+ *  - key_name:  key name, or NULL for a wildcard selection.
+ *  - key_alias: optional alias (copied), or NULL.
+ *
+ * Sub-keys pending in cmd->tmp_subkeys are moved into the new key. When no
+ * alias is given, one is composed for keys with sub-keys (name['a']['b']) and
+ * for aggregation functions (FUNC(name), FUNC(*)).
+ *
+ * Returns the new key (not yet linked to cmd->keys) or NULL on error, in which
+ * case cmd->status is set to FLB_SP_ERROR.
+ */
+struct flb_sp_cmd_key *flb_sp_key_create(struct flb_sp_cmd *cmd, int func,
+                                         const char *key_name,
+                                         const char *key_alias)
+{
+    int s;
+    int ret;
+    int aggr_func = 0;
+    int time_func = 0;
+    int record_func = 0;
+    char *tmp;
+    struct mk_list *head;
+    struct flb_sp_cmd_key *key;
+    struct flb_slist_entry *entry;
+
+    /* aggregation function ? */
+    if (func >= FLB_SP_AVG && func <= FLB_SP_FORECAST) {
+        aggr_func = func;
+    }
+    else if (func >= FLB_SP_NOW && func <= FLB_SP_UNIX_TIMESTAMP) {
+        /* Time function */
+        time_func = func;
+    }
+    else if (func >= FLB_SP_RECORD_TAG && func <= FLB_SP_RECORD_TIME) {
+        /* Record function */
+        record_func = func;
+    }
+
+    key = flb_calloc(1, sizeof(struct flb_sp_cmd_key));
+    if (!key) {
+        flb_errno();
+        cmd->status = FLB_SP_ERROR;
+        return NULL;
+    }
+    key->gb_key = NULL;
+    key->subkeys = NULL;
+
+    /* key name and aliases works when the selection is not a wildcard */
+    if (key_name) {
+        key->name = flb_sds_create(key_name);
+        if (!key->name) {
+            goto error;
+        }
+    }
+    else {
+        /*
+         * Wildcard key only allowed on:
+         * - no aggregation mode (left side / first entry)
+         * - aggregation using COUNT(*)
+         */
+        if (mk_list_size(&cmd->keys) > 0 && aggr_func == 0 &&
+            record_func == 0 && time_func == 0) {
+            goto error;
+        }
+    }
+
+    if (key_alias) {
+        key->alias = flb_sds_create(key_alias);
+        if (!key->alias) {
+            goto error;
+        }
+    }
+
+    /* Aggregation function */
+    if (aggr_func > 0) {
+        key->aggr_func = aggr_func;
+    }
+    else if (time_func > 0) {
+        key->time_func = time_func;
+    }
+    else if (record_func > 0) {
+        key->record_func = record_func;
+    }
+
+    /* Lookup for any subkeys in the temporary list */
+    if (mk_list_size(cmd->tmp_subkeys) > 0) {
+        ret = swap_tmp_subkeys(&key->subkeys, cmd);
+        if (ret == -1) {
+            /*
+             * The list still belongs to 'cmd': make sure the key does not
+             * reference it before the key is released.
+             */
+            key->subkeys = NULL;
+            goto error;
+        }
+
+        /* Compose a name key that include listed sub keys */
+        if (!key->alias) {
+            /* sub-keys without a key name cannot be composed */
+            if (!key->name) {
+                goto error;
+            }
+
+            s = flb_sds_len(key->name) + (16 * mk_list_size(key->subkeys));
+            key->alias = flb_sds_create_size(s);
+            if (!key->alias) {
+                goto error;
+            }
+
+            if (key_alias_cat(key, key->name, flb_sds_len(key->name)) == -1) {
+                goto error;
+            }
+
+            mk_list_foreach(head, key->subkeys) {
+                entry = mk_list_entry(head, struct flb_slist_entry, _head);
+
+                /* prefix, selected key name, suffix */
+                if (key_alias_cat(key, "['", 2) == -1 ||
+                    key_alias_cat(key, entry->str,
+                                  flb_sds_len(entry->str)) == -1 ||
+                    key_alias_cat(key, "']", 2) == -1) {
+                    goto error;
+                }
+            }
+
+            if (aggr_func) {
+                /* wrap the composed name: FUNC(name['a']['b']) */
+                tmp = aggr_alias_create(aggr_func, key->alias,
+                                        flb_sds_len(key->alias));
+                if (!tmp) {
+                    goto error;
+                }
+                flb_sds_destroy(key->alias);
+                key->alias = tmp;
+            }
+        }
+    }
+    else if (aggr_func && !key->alias) {
+        if (key->name) {
+            key->alias = aggr_alias_create(aggr_func, key->name,
+                                           flb_sds_len(key->name));
+        }
+        else {
+            key->alias = aggr_alias_create(aggr_func, "*", 1);
+        }
+
+        if (!key->alias) {
+            goto error;
+        }
+    }
+
+    return key;
+
+error:
+    flb_sp_cmd_key_del(key);
+    cmd->status = FLB_SP_ERROR;
+    return NULL;
+}
+
+/*
+ * Create a selection key from 'func' / 'key_name' plus the alias pending in
+ * cmd->alias (if any) and link it to cmd->keys.
+ *
+ * The pending alias is always released here, also when the key could not be
+ * created: flb_sp_key_create() works on its own copy, and keeping the pointer
+ * around would leak it and attach it to the wrong key later.
+ *
+ * Returns 0 on success, -1 on error.
+ */
+int flb_sp_cmd_key_add(struct flb_sp_cmd *cmd, int func, const char *key_name)
+{
+    int ret = 0;
+    struct flb_sp_cmd_key *key;
+
+    key = flb_sp_key_create(cmd, func, key_name, cmd->alias);
+    if (key) {
+        mk_list_add(&key->_head, &cmd->keys);
+    }
+    else {
+        ret = -1;
+    }
+
+    /* free key alias and set cmd->alias to null */
+    if (cmd->alias) {
+        flb_free(cmd->alias);
+        cmd->alias = NULL;
+    }
+
+    return ret;
+}
+
+/*
+ * Register 'key_alias' as the alias for the next key to be added. The pointer
+ * is stored as-is (no copy) and is later released with flb_free() by the
+ * function that consumes it. An alias that is still pending is released first
+ * so it is not leaked when overwritten.
+ */
+void flb_sp_cmd_alias_add(struct flb_sp_cmd *cmd, const char *key_alias)
+{
+    if (cmd->alias && cmd->alias != key_alias) {
+        flb_free(cmd->alias);
+    }
+
+    cmd->alias = (char *) key_alias;
+}
+
+/*
+ * Record the source of the query: its type and a copy of its name.
+ * Returns 0 on success, -1 if the name could not be copied.
+ */
+int flb_sp_cmd_source(struct flb_sp_cmd *cmd, int type, const char *source)
+{
+    cmd->source_type = type;
+    cmd->source_name = flb_sds_create(source);
+
+    if (cmd->source_name != NULL) {
+        return 0;
+    }
+
+    flb_errno();
+    return -1;
+}
+
+/*
+ * Print the selection keys and the source of a command to stdout.
+ *
+ * A key without a name (wildcard selection) is printed as '*' and a missing
+ * source name as '(none)': passing a NULL pointer to "%s" is undefined
+ * behavior.
+ */
+void flb_sp_cmd_dump(struct flb_sp_cmd *cmd)
+{
+    struct mk_list *head;
+    struct flb_sp_cmd_key *key;
+
+    /* Lookup keys */
+    printf("== KEYS ==\n");
+    mk_list_foreach(head, &cmd->keys) {
+        key = mk_list_entry(head, struct flb_sp_cmd_key, _head);
+        printf("- '%s'\n", key->name ? key->name : "*");
+    }
+
+    printf("== SOURCE ==\n");
+    if (cmd->source_type == FLB_SP_STREAM) {
+        printf("stream => ");
+    }
+    else if (cmd->source_type == FLB_SP_TAG) {
+        printf("tag match => ");
+    }
+
+    printf("'%s'\n", cmd->source_name ? cmd->source_name : "(none)");
+}
+
+/*
+ * Parse the SQL statement 'sql' and return a new command context, or NULL if
+ * 'sql' is NULL, an allocation fails, the scanner cannot be set up or the
+ * parser reports an error. The returned context must be released with
+ * flb_sp_cmd_destroy().
+ */
+struct flb_sp_cmd *flb_sp_cmd_create(const char *sql)
+{
+    int ret;
+    yyscan_t scanner;
+    YY_BUFFER_STATE buf;
+    struct flb_sp_cmd *cmd;
+
+    if (!sql) {
+        return NULL;
+    }
+
+    /* create context */
+    cmd = flb_calloc(1, sizeof(struct flb_sp_cmd));
+    if (!cmd) {
+        flb_errno();
+        return NULL;
+    }
+    cmd->status = FLB_SP_OK;
+    cmd->type = FLB_SP_SELECT;
+
+    mk_list_init(&cmd->stream_props);
+    mk_list_init(&cmd->keys);
+
+    /* Condition linked list (we use them to free resources) */
+    mk_list_init(&cmd->cond_list);
+    mk_list_init(&cmd->gb_keys);
+
+    /* Allocate temporary list and initialize */
+    cmd->tmp_subkeys = flb_malloc(sizeof(struct mk_list));
+    if (!cmd->tmp_subkeys) {
+        flb_errno();
+        flb_free(cmd);
+        return NULL;
+    }
+    flb_slist_create(cmd->tmp_subkeys);
+
+    /* Flex/Bison work: the scanner init reports failure with a non-zero code */
+    if (flb_sp_lex_init(&scanner) != 0) {
+        flb_errno();
+        flb_sp_cmd_destroy(cmd);
+        return NULL;
+    }
+
+    buf = flb_sp__scan_string(sql, scanner);
+    if (!buf) {
+        flb_sp_lex_destroy(scanner);
+        flb_sp_cmd_destroy(cmd);
+        return NULL;
+    }
+
+    ret = flb_sp_parse(cmd, sql, scanner);
+
+    flb_sp__delete_buffer(buf, scanner);
+    flb_sp_lex_destroy(scanner);
+
+    if (ret != 0) {
+        flb_sp_cmd_destroy(cmd);
+        return NULL;
+    }
+
+    return cmd;
+}
+
+/*
+ * Turn the command into a CREATE STREAM one, storing a copy of 'stream_name'.
+ * Returns 0 on success, -1 if the name could not be copied (the command type
+ * is left untouched in that case).
+ */
+int flb_sp_cmd_stream_new(struct flb_sp_cmd *cmd, const char *stream_name)
+{
+    cmd->stream_name = flb_sds_create(stream_name);
+
+    if (cmd->stream_name != NULL) {
+        cmd->type = FLB_SP_CREATE_STREAM;
+        return 0;
+    }
+
+    return -1;
+}
+
+/*
+ * Turn the command into a CREATE SNAPSHOT one, storing a copy of
+ * 'snapshot_name'. The 'tag' stream property must already be registered;
+ * otherwise the command is flagged with FLB_SP_ERROR.
+ * Returns 0 on success, -1 on error.
+ */
+int flb_sp_cmd_snapshot_new(struct flb_sp_cmd *cmd, const char *snapshot_name)
+{
+    cmd->stream_name = flb_sds_create(snapshot_name);
+    if (cmd->stream_name == NULL) {
+        return -1;
+    }
+
+    if (flb_sp_cmd_stream_prop_get(cmd, "tag") == NULL) {
+        cmd->status = FLB_SP_ERROR;
+        flb_error("[sp] tag for snapshot is required. Add WITH(tag = <TAG>) to the snapshot %s",
+                  snapshot_name);
+        return -1;
+    }
+
+    cmd->type = FLB_SP_CREATE_SNAPSHOT;
+    return 0;
+}
+
+/*
+ * Turn the command into a FLUSH SNAPSHOT one. The stream name is set to
+ * "__flush_" followed by 'snapshot_name'.
+ *
+ * The prefix creation and the concatenation are checked separately so a
+ * failed allocation is neither dereferenced nor leaked.
+ * Returns 0 on success, -1 on allocation failure.
+ */
+int flb_sp_cmd_snapshot_flush_new(struct flb_sp_cmd *cmd, const char *snapshot_name)
+{
+    char *name;
+    char *tmp;
+
+    name = flb_sds_create("__flush_");
+    if (!name) {
+        return -1;
+    }
+
+    tmp = flb_sds_cat(name, snapshot_name, strlen(snapshot_name));
+    if (!tmp) {
+        flb_sds_destroy(name);
+        return -1;
+    }
+
+    cmd->stream_name = tmp;
+    cmd->type = FLB_SP_FLUSH_SNAPSHOT;
+
+    return 0;
+}
+
+/*
+ * Register a stream property (copies of 'key' and 'val') in the command.
+ * Returns 0 on success, -1 on allocation failure.
+ */
+int flb_sp_cmd_stream_prop_add(struct flb_sp_cmd *cmd, const char *key, const char *val)
+{
+    struct flb_sp_cmd_prop *prop;
+
+    prop = flb_malloc(sizeof(struct flb_sp_cmd_prop));
+    if (!prop) {
+        flb_errno();
+        return -1;
+    }
+
+    prop->key = flb_sds_create(key);
+    if (!prop->key) {
+        flb_free(prop);
+        return -1;
+    }
+
+    prop->val = flb_sds_create(val);
+    if (!prop->val) {
+        /* the key is an sds string: release it with the sds API */
+        flb_sds_destroy(prop->key);
+        flb_free(prop);
+        return -1;
+    }
+
+    mk_list_add(&prop->_head, &cmd->stream_props);
+    return 0;
+}
+
+/* Free a stream property along with its value and key strings */
+void flb_sp_cmd_stream_prop_del(struct flb_sp_cmd_prop *prop)
+{
+    if (prop->val != NULL) {
+        flb_sds_destroy(prop->val);
+    }
+
+    if (prop->key != NULL) {
+        flb_sds_destroy(prop->key);
+    }
+
+    flb_free(prop);
+}
+
+/*
+ * Look up the stream property named 'key' and return its value, or NULL when
+ * 'key' is NULL or no property with that exact name is registered.
+ */
+const char *flb_sp_cmd_stream_prop_get(struct flb_sp_cmd *cmd, const char *key)
+{
+    int key_len;
+    struct mk_list *node;
+    struct flb_sp_cmd_prop *cur;
+
+    if (key == NULL) {
+        return NULL;
+    }
+
+    key_len = strlen(key);
+
+    mk_list_foreach(node, &cmd->stream_props) {
+        cur = mk_list_entry(node, struct flb_sp_cmd_prop, _head);
+
+        /* cheap length check first, full comparison only on equal lengths */
+        if (flb_sds_len(cur->key) == key_len && strcmp(cur->key, key) == 0) {
+            return cur->val;
+        }
+    }
+
+    return NULL;
+}
+
+/* WINDOW functions */
+
+/*
+ * Number of seconds in one 'time_unit' (FLB_SP_TIME_SECOND / MINUTE / HOUR),
+ * or 0 when the unit is not one of those.
+ */
+static int sp_time_unit_seconds(int time_unit)
+{
+    if (time_unit == FLB_SP_TIME_SECOND) {
+        return 1;
+    }
+    if (time_unit == FLB_SP_TIME_MINUTE) {
+        return 60;
+    }
+    if (time_unit == FLB_SP_TIME_HOUR) {
+        return 3600;
+    }
+
+    return 0;
+}
+
+/*
+ * Set the window of the command: its type and its size converted to seconds.
+ * For hopping windows the advance interval is converted as well and must be
+ * smaller than the window size, otherwise -1 is returned. An unrecognized time
+ * unit leaves the corresponding field untouched.
+ * Returns 0 on success.
+ */
+int flb_sp_cmd_window(struct flb_sp_cmd *cmd,
+                      int window_type, int size, int time_unit,
+                      int advance_by_size, int advance_by_time_unit)
+{
+    int factor;
+
+    cmd->window.type = window_type;
+
+    factor = sp_time_unit_seconds(time_unit);
+    if (factor > 0) {
+        cmd->window.size = (time_t) size * factor;
+    }
+
+    if (window_type != FLB_SP_WINDOW_HOPPING) {
+        return 0;
+    }
+
+    factor = sp_time_unit_seconds(advance_by_time_unit);
+    if (factor > 0) {
+        cmd->window.advance_by = (time_t) advance_by_size * factor;
+    }
+
+    if (cmd->window.advance_by >= cmd->window.size) {
+        return -1;
+    }
+
+    return 0;
+}
+
+/* WHERE <condition> functions */
+
+/*
+ * Create a logical operation node with operands 'e1' and 'e2' and register it
+ * in cmd->cond_list. Returns the node, or NULL on allocation failure.
+ */
+struct flb_exp *flb_sp_cmd_operation(struct flb_sp_cmd *cmd,
+                                     struct flb_exp *e1, struct flb_exp *e2,
+                                     int operation)
+{
+    struct flb_exp_op *op;
+
+    op = flb_malloc(sizeof(*op));
+    if (op == NULL) {
+        flb_errno();
+        return NULL;
+    }
+
+    op->type = FLB_LOGICAL_OP;
+    op->operation = operation;
+    op->left = e1;
+    op->right = e2;
+
+    mk_list_add(&op->_head, &cmd->cond_list);
+    return (struct flb_exp *) op;
+}
+
+/*
+ * Create a comparison node ('key' <operation> 'val') and register it in
+ * cmd->cond_list. Returns the node, or NULL on allocation failure.
+ */
+struct flb_exp *flb_sp_cmd_comparison(struct flb_sp_cmd *cmd,
+                                      struct flb_exp *key, struct flb_exp *val,
+                                      int operation)
+{
+    struct flb_exp_op *cmp;
+
+    cmp = flb_malloc(sizeof(*cmp));
+    if (cmp == NULL) {
+        flb_errno();
+        return NULL;
+    }
+
+    cmp->type = FLB_LOGICAL_OP;
+    cmp->operation = operation;
+    cmp->left = (struct flb_exp *) key;
+    cmp->right = (struct flb_exp *) val;
+
+    mk_list_add(&cmp->_head, &cmd->cond_list);
+    return (struct flb_exp *) cmp;
+}
+
+/*
+ * Create a condition key node named 'identifier', move any pending sub-keys
+ * from cmd->tmp_subkeys into it and register it in cmd->cond_list.
+ *
+ * The node is only linked once it is fully built, so a failure never leaves a
+ * node without a name in the list.
+ * Returns the node, or NULL on allocation failure.
+ */
+struct flb_exp *flb_sp_cmd_condition_key(struct flb_sp_cmd *cmd,
+                                         const char *identifier)
+{
+    int ret;
+    struct flb_exp_key *key;
+
+    key = flb_calloc(1, sizeof(struct flb_exp_key));
+    if (!key) {
+        flb_errno();
+        return NULL;
+    }
+
+    key->type = FLB_EXP_KEY;
+    key->name = flb_sds_create(identifier);
+    if (!key->name) {
+        flb_free(key);
+        cmd->status = FLB_SP_ERROR;
+        return NULL;
+    }
+
+    if (mk_list_size(cmd->tmp_subkeys) > 0) {
+        ret = swap_tmp_subkeys(&key->subkeys, cmd);
+        if (ret == -1) {
+            /* the sub-keys list stays owned by 'cmd', only drop the node */
+            flb_sds_destroy(key->name);
+            flb_free(key);
+            return NULL;
+        }
+    }
+
+    mk_list_add(&key->_head, &cmd->cond_list);
+
+    return (struct flb_exp *) key;
+}
+
+/*
+ * Create an integer value node and register it in cmd->cond_list.
+ * Returns the node, or NULL on allocation failure.
+ */
+struct flb_exp *flb_sp_cmd_condition_integer(struct flb_sp_cmd *cmd,
+                                             int integer)
+{
+    struct flb_exp_val *node;
+
+    node = flb_malloc(sizeof(*node));
+    if (node == NULL) {
+        flb_errno();
+        return NULL;
+    }
+
+    node->val.i64 = integer;
+    node->type = FLB_EXP_INT;
+
+    mk_list_add(&node->_head, &cmd->cond_list);
+    return (struct flb_exp *) node;
+}
+
+/*
+ * Create a floating point value node and register it in cmd->cond_list.
+ * Returns the node, or NULL on allocation failure.
+ */
+struct flb_exp *flb_sp_cmd_condition_float(struct flb_sp_cmd *cmd, float fval)
+{
+    struct flb_exp_val *node;
+
+    node = flb_malloc(sizeof(*node));
+    if (node == NULL) {
+        flb_errno();
+        return NULL;
+    }
+
+    node->val.f64 = fval;
+    node->type = FLB_EXP_FLOAT;
+
+    mk_list_add(&node->_head, &cmd->cond_list);
+    return (struct flb_exp *) node;
+}
+
+/*
+ * Create a string value node holding a copy of 'string' and register it in
+ * cmd->cond_list. Returns the node, or NULL on allocation failure (the node is
+ * not registered when the copy cannot be made).
+ */
+struct flb_exp *flb_sp_cmd_condition_string(struct flb_sp_cmd *cmd,
+                                            const char *string)
+{
+    struct flb_exp_val *val;
+
+    val = flb_malloc(sizeof(struct flb_exp_val));
+    if (!val) {
+        flb_errno();
+        return NULL;
+    }
+
+    val->type = FLB_EXP_STRING;
+    val->val.string = flb_sds_create(string);
+    if (!val->val.string) {
+        flb_free(val);
+        return NULL;
+    }
+
+    mk_list_add(&val->_head, &cmd->cond_list);
+
+    return (struct flb_exp *) val;
+}
+
+/*
+ * Create a boolean value node and register it in cmd->cond_list.
+ * Returns the node, or NULL on allocation failure.
+ */
+struct flb_exp *flb_sp_cmd_condition_boolean(struct flb_sp_cmd *cmd,
+                                             bool boolean)
+{
+    struct flb_exp_val *node;
+
+    node = flb_malloc(sizeof(*node));
+    if (node == NULL) {
+        flb_errno();
+        return NULL;
+    }
+
+    node->val.boolean = boolean;
+    node->type = FLB_EXP_BOOL;
+
+    mk_list_add(&node->_head, &cmd->cond_list);
+    return (struct flb_exp *) node;
+}
+
+/*
+ * Create a NULL value node (only its type is set) and register it in
+ * cmd->cond_list. Returns the node, or NULL on allocation failure.
+ */
+struct flb_exp *flb_sp_cmd_condition_null(struct flb_sp_cmd *cmd)
+{
+    struct flb_exp_val *node;
+
+    node = flb_malloc(sizeof(*node));
+    if (node == NULL) {
+        flb_errno();
+        return NULL;
+    }
+
+    node->type = FLB_EXP_NULL;
+
+    mk_list_add(&node->_head, &cmd->cond_list);
+    return (struct flb_exp *) node;
+}
+
+/*
+ * Create a record function node for 'name' with parameter 'param' and
+ * register it in cmd->cond_list.
+ *
+ * 'name' is matched against the entries of record_functions; an entry matches
+ * when it is a prefix of 'name' (strncmp over the entry length).
+ * Returns the node, or NULL when no entry matches or an allocation fails.
+ */
+struct flb_exp *flb_sp_record_function_add(struct flb_sp_cmd *cmd,
+                                           char *name, struct flb_exp *param)
+{
+    char *rf_name;
+    int i;
+    struct flb_exp_func *func;
+
+    for (i = 0; i < RECORD_FUNCTIONS_SIZE; i++) {
+        rf_name = record_functions[i];
+        if (strncmp(rf_name, name, strlen(rf_name)) != 0) {
+            continue;
+        }
+
+        func = flb_calloc(1, sizeof(struct flb_exp_func));
+        if (!func) {
+            flb_errno();
+            return NULL;
+        }
+
+        func->type = FLB_EXP_FUNC;
+        func->name = flb_sds_create(name);
+        if (!func->name) {
+            /* do not register a node without a name */
+            flb_free(func);
+            return NULL;
+        }
+        func->cb_func = record_functions_ptr[i];
+        func->param = param;
+
+        mk_list_add(&func->_head, &cmd->cond_list);
+
+        return (struct flb_exp *) func;
+    }
+
+    return NULL;
+}
+
+/* Set 'e' as the condition expression of the command */
+void flb_sp_cmd_condition_add(struct flb_sp_cmd *cmd, struct flb_exp *e)
+{
+    cmd->condition = e;
+}
+
+/*
+ * Append a GROUP BY key named 'key' to cmd->gb_keys, moving any pending
+ * sub-keys from cmd->tmp_subkeys into it. The key id is the number of keys
+ * already in the list. Returns 0 on success, -1 on allocation failure.
+ */
+int flb_sp_cmd_gb_key_add(struct flb_sp_cmd *cmd, const char *key)
+{
+    struct flb_sp_cmd_gb_key *entry;
+
+    entry = flb_calloc(1, sizeof(*entry));
+    if (entry == NULL) {
+        flb_errno();
+        return -1;
+    }
+
+    entry->name = flb_sds_create(key);
+    if (entry->name == NULL) {
+        flb_free(entry);
+        return -1;
+    }
+
+    entry->id = mk_list_size(&cmd->gb_keys);
+
+    /* Lookup for any subkeys in the temporary list */
+    if (mk_list_size(cmd->tmp_subkeys) > 0 &&
+        swap_tmp_subkeys(&entry->subkeys, cmd) == -1) {
+        flb_sds_destroy(entry->name);
+        flb_free(entry);
+        return -1;
+    }
+
+    mk_list_add(&entry->_head, &cmd->gb_keys);
+    return 0;
+}
+
+/*
+ * Release every node registered in cmd->cond_list, including the strings and
+ * sub-keys lists owned by key, string and function nodes.
+ */
+void flb_sp_cmd_condition_del(struct flb_sp_cmd *cmd)
+{
+    struct mk_list *next;
+    struct mk_list *node;
+    struct flb_exp *exp;
+    struct flb_exp_key *key;
+
+    mk_list_foreach_safe(node, next, &cmd->cond_list) {
+        exp = mk_list_entry(node, struct flb_exp, _head);
+        mk_list_del(&exp->_head);
+
+        /* release what the node owns, depending on its type */
+        if (exp->type == FLB_EXP_FUNC) {
+            flb_sds_destroy(((struct flb_exp_func *) exp)->name);
+        }
+        else if (exp->type == FLB_EXP_STRING) {
+            flb_sds_destroy(((struct flb_exp_val *) exp)->val.string);
+        }
+        else if (exp->type == FLB_EXP_KEY) {
+            key = (struct flb_exp_key *) exp;
+            flb_sds_destroy(key->name);
+            if (key->subkeys != NULL) {
+                flb_slist_destroy(key->subkeys);
+                flb_free(key->subkeys);
+            }
+        }
+
+        flb_free(exp);
+    }
+}
+
+/* Store 'limit' as the LIMIT value of the command */
+void flb_sp_cmd_limit_add(struct flb_sp_cmd *cmd, int limit)
+{
+    cmd->limit = limit;
+}
+
+/*
+ * Create a forecast key from 'func' / 'key_name' plus the alias pending in
+ * cmd->alias (if any), store 'seconds' as its constant and link it to
+ * cmd->keys.
+ *
+ * The pending alias is always released here, also when the key could not be
+ * created: flb_sp_key_create() works on its own copy, and keeping the pointer
+ * around would leak it and attach it to the wrong key later.
+ *
+ * Returns 0 on success, -1 on error.
+ */
+int flb_sp_cmd_timeseries_forecast(struct flb_sp_cmd *cmd, int func, const char *key_name, int seconds)
+{
+    int ret = 0;
+    struct flb_sp_cmd_key *key;
+
+    key = flb_sp_key_create(cmd, func, key_name, cmd->alias);
+    if (key) {
+        key->constant = seconds;
+        mk_list_add(&key->_head, &cmd->keys);
+    }
+    else {
+        ret = -1;
+    }
+
+    /* free key alias and set cmd->alias to null */
+    if (cmd->alias) {
+        flb_free(cmd->alias);
+        cmd->alias = NULL;
+    }
+
+    return ret;
+}
diff --git a/fluent-bit/src/stream_processor/parser/sql.l b/fluent-bit/src/stream_processor/parser/sql.l
new file mode 100644
index 000000000..91e5398e1
--- /dev/null
+++ b/fluent-bit/src/stream_processor/parser/sql.l
@@ -0,0 +1,190 @@
+%option prefix="flb_sp_"
+%option caseless
+%option 8bit reentrant bison-bridge
+%option warn noyywrap nodefault
+%option nounput
+%option noinput
+
+
+%{
+#include <stdio.h>
+#include <stdbool.h>
+#include <ctype.h>
+#include <fluent-bit/flb_str.h>
+#include <fluent-bit/flb_log.h>
+#include "sql_parser.h"
+#include <fluent-bit/stream_processor/flb_sp_parser.h>
+
+/*
+ * Return a newly allocated, NUL-terminated copy of the first 'n' bytes of 's'
+ * where every quote starts a two-character pair that is collapsed into a
+ * single quote. Returns NULL on allocation failure.
+ */
+static inline char *remove_dup_qoutes(const char *s, size_t n)
+{
+    char *out;
+    size_t pairs = 0;
+    size_t rd = 0;
+    size_t wr = 0;
+
+    /* first pass: count the pairs, skipping the second char of each one */
+    while (rd < n) {
+        if (s[rd] == '\'') {
+            pairs++;
+            rd += 2;
+        }
+        else {
+            rd++;
+        }
+    }
+
+    out = (char *) flb_malloc(n - pairs + 1);
+    if (out == NULL) {
+        return NULL;
+    }
+
+    /* second pass: copy, emitting one char per pair */
+    for (rd = 0; rd < n; wr++) {
+        out[wr] = s[rd];
+        rd += (s[rd] == '\'') ? 2 : 1;
+    }
+    out[wr] = '\0';
+
+    return out;
+}
+
+/*
+ * Return a newly allocated, NUL-terminated upper-case copy of the first 'len'
+ * bytes of 'token', or NULL on allocation failure. The caller releases the
+ * result with flb_free().
+ */
+char* to_upper(char* token, size_t len)
+{
+    size_t i;
+    char* token_;
+
+    token_ = flb_malloc(len * sizeof(char) + 1);
+    if (!token_) {
+        return NULL;
+    }
+
+    for (i = 0; i < len; i++) {
+        /* toupper() requires a value representable as unsigned char */
+        token_[i] = (char) toupper((unsigned char) token[i]);
+    }
+
+    token_[len] = '\0';
+    return token_;
+}
+
+/*
+ * Map a function name (first 'len' bytes of 'name', case-insensitive) to its
+ * FLB_SP_* code. Returns -1 when the name is unknown or the temporary
+ * upper-case copy cannot be allocated.
+ */
+int func_to_code(char* name, size_t len)
+{
+    size_t i;
+    int code = -1;
+    char* name_;
+    const struct {
+        const char *name;
+        int code;
+    } funcs[] = {
+        { "AVG",                 FLB_SP_AVG },
+        { "SUM",                 FLB_SP_SUM },
+        { "COUNT",               FLB_SP_COUNT },
+        { "MIN",                 FLB_SP_MIN },
+        { "MAX",                 FLB_SP_MAX },
+        { "TIMESERIES_FORECAST", FLB_SP_FORECAST },
+        { "NOW",                 FLB_SP_NOW },
+        { "UNIX_TIMESTAMP",      FLB_SP_UNIX_TIMESTAMP },
+        { "RECORD_TAG",          FLB_SP_RECORD_TAG },
+        { "RECORD_TIME",         FLB_SP_RECORD_TIME }
+    };
+
+    name_ = to_upper(name, len);
+    if (!name_) {
+        return -1;
+    }
+
+    for (i = 0; i < sizeof(funcs) / sizeof(funcs[0]); i++) {
+        if (strcmp(name_, funcs[i].name) == 0) {
+            code = funcs[i].code;
+            break;
+        }
+    }
+
+    flb_free(name_);
+    return code;
+}
+
+%}
+
+%%
+
+ /* SQL */
+ /* Statement keywords: each one is returned as its own token */
+CREATE return CREATE;
+FLUSH return FLUSH;
+STREAM return STREAM;
+SNAPSHOT return SNAPSHOT;
+WITH return WITH;
+SELECT return SELECT;
+AS return AS;
+FROM return FROM;
+STREAM: return FROM_STREAM;
+TAG: return FROM_TAG;
+WHERE return WHERE;
+AND return AND;
+OR return OR;
+NOT return NOT;
+WINDOW return WINDOW;
+"GROUP BY" return GROUP_BY;
+LIMIT return LIMIT;
+
+IS return IS;
+NULL return NUL;
+
+ /* Aggregation Functions */
+ /* The token value is the function code computed by func_to_code() */
+SUM {yylval->integer = func_to_code(yytext, yyleng); return SUM;}
+AVG {yylval->integer = func_to_code(yytext, yyleng); return AVG;}
+COUNT {yylval->integer = func_to_code(yytext, yyleng); return COUNT;}
+MIN {yylval->integer = func_to_code(yytext, yyleng); return MIN;}
+MAX {yylval->integer = func_to_code(yytext, yyleng); return MAX;}
+TIMESERIES_FORECAST {yylval->integer = func_to_code(yytext, yyleng); return TIMESERIES_FORECAST;};
+
+ /* Record Functions */
+@RECORD return RECORD;
+CONTAINS return CONTAINS;
+TIME return TIME;
+
+
+ /* Window Types */
+TUMBLING return TUMBLING;
+HOPPING return HOPPING;
+"ADVANCE BY" return ADVANCE_BY;
+
+ /* Time */
+HOUR return HOUR;
+MINUTE return MINUTE;
+SECOND return SECOND;
+
+ /* Date / Time Functions */
+NOW {yylval->integer = func_to_code(yytext, yyleng); return NOW;}
+UNIX_TIMESTAMP {yylval->integer = func_to_code(yytext, yyleng); return UNIX_TIMESTAMP;}
+
+ /* Record information */
+RECORD_TAG {yylval->integer = func_to_code(yytext, yyleng); return RECORD_TAG;}
+RECORD_TIME {yylval->integer = func_to_code(yytext, yyleng); return RECORD_TIME;}
+
+ /* Boolean literals */
+"true" { yylval->boolean = true; return BOOLTYPE; };
+"false" { yylval->boolean = false; return BOOLTYPE; };
+
+ /* Numeric literals (no leading zeros) and single-quoted strings, where '' stands for one quote */
+-?[1-9][0-9]*|0 { yylval->integer = atoi(yytext); return INTEGER; }
+(-?[1-9][0-9]*|0)\.[0-9]+ { yylval->fval = atof(yytext); return FLOATING; }
+\'([^']|'{2})*\' { yylval->string = remove_dup_qoutes(yytext + 1, yyleng - 2); return STRING; }
+
+ /* Identifiers: the token value is a heap copy of the matched text */
+[_A-Za-z][A-Za-z0-9_.]* { yylval->string = flb_strdup(yytext); return IDENTIFIER; }
+
+ /* Single-character tokens are returned as their own character code */
+"*" |
+"," |
+"=" |
+"(" |
+")" |
+"[" |
+"]" |
+"." |
+";" { return yytext[0]; }
+
+ /* Comparison operators */
+"!=" return NEQ;
+"<>" return NEQ;
+"<" return LT;
+"<=" return LTE;
+">" return GT;
+">=" return GTE;
+
+\' return QUOTE;
+ /* Newlines have no action attached: they are discarded */
+\n
+[ \t]+ /* ignore whitespace */;
+
+ /* Anything else is reported; no token is returned for it */
+. flb_error("[sp] bad input character '%s' at line %d", yytext, yylineno);
+
+%%
diff --git a/fluent-bit/src/stream_processor/parser/sql.y b/fluent-bit/src/stream_processor/parser/sql.y
new file mode 100644
index 000000000..866f95cc0
--- /dev/null
+++ b/fluent-bit/src/stream_processor/parser/sql.y
@@ -0,0 +1,437 @@
+%name-prefix="flb_sp_" // replace with %define api.prefix {flb_sp_}
+%define api.pure full
+%define parse.error verbose
+%parse-param { struct flb_sp_cmd *cmd };
+%parse-param { const char *query };
+%lex-param { void *scanner }
+%parse-param { void *scanner }
+
+%{ // definition section (prologue)
+#include <stdio.h>
+#include <stdlib.h>
+#include <ctype.h>
+
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_slist.h>
+#include <fluent-bit/stream_processor/flb_sp_parser.h>
+
+#include "sql_parser.h"
+#include "sql_lex.h"
+
+extern int yylex();
+
+void yyerror(struct flb_sp_cmd *cmd, const char *query, void *scanner, const char *str)
+{
+ flb_error("[sp] %s at '%s'", str, query);
+}
+
+%} /* EOF C code */
+
+/* Bison declarations */
+/* Known Tokens (refer to sql.l) */
+
+/* Keywords */
+%token IDENTIFIER QUOTE
+
+/* Basic keywords for statements */
+%token CREATE STREAM SNAPSHOT FLUSH WITH SELECT AS FROM FROM_STREAM FROM_TAG
+%token WHERE WINDOW GROUP_BY LIMIT
+
+/* Null keywords */
+%token IS NUL
+
+/* Aggregation functions */
+%token AVG SUM COUNT MAX MIN TIMESERIES_FORECAST
+
+/* Record functions */
+%token RECORD CONTAINS TIME
+
+/* Time functions */
+%token NOW UNIX_TIMESTAMP
+
+/* Record information functions (tag / time) */
+%token RECORD_TAG RECORD_TIME
+
+/* Value types */
+%token INTEGER FLOATING STRING BOOLTYPE
+
+/* Logical operation tokens */
+%token AND OR NOT NEQ LT LTE GT GTE
+
+/* Time tokens */
+%token HOUR MINUTE SECOND
+
+/* Window tokens */
+%token TUMBLING HOPPING ADVANCE_BY
+
+/* Union and field types */
+%union
+{
+ bool boolean;
+ int integer;
+ float fval;
+ char *string;
+ struct flb_sp_cmd *cmd;
+ struct flb_exp *expression;
+}
+
+%type <boolean> BOOLTYPE
+%type <integer> INTEGER
+%type <fval> FLOATING
+%type <string> IDENTIFIER
+%type <string> STRING
+%type <string> record_keys
+%type <string> record_key
+%type <string> prop_key
+%type <string> prop_val
+%type <expression> condition
+%type <expression> comparison
+%type <expression> key
+%type <expression> record_func
+%type <expression> value
+%type <expression> null
+%type <integer> time
+
+%type <integer> time_record_func
+%type <integer> NOW UNIX_TIMESTAMP RECORD_TAG RECORD_TIME
+
+%type <integer> aggregate_func
+%type <integer> COUNT AVG SUM MAX MIN TIMESERIES_FORECAST
+
+
+/*
+ * IDENTIFIER and STRING values are heap-allocated by the lexer. Release them
+ * when the parser discards such a symbol without running a rule action
+ * (e.g. on a syntax error), otherwise the text is leaked.
+ */
+%destructor { flb_free ($$); } IDENTIFIER STRING
+
+%% /* rules section */
+
+/* Start symbol: a query is either a CREATE/FLUSH statement or a SELECT */
+statements: create | select
+
+/* Parser for 'CREATE STREAM' statement */
+/*
+ * The same rule also covers CREATE SNAPSHOT and FLUSH SNAPSHOT, each one with
+ * an optional WITH(...) list of properties. Every action releases the
+ * identifier text ($3) after handing it to the command.
+ */
+create:
+ CREATE STREAM IDENTIFIER AS select
+ {
+ flb_sp_cmd_stream_new(cmd, $3);
+ flb_free($3);
+ }
+ |
+ CREATE STREAM IDENTIFIER WITH '(' properties ')' AS select
+ {
+ flb_sp_cmd_stream_new(cmd, $3);
+ flb_free($3);
+ }
+ |
+ CREATE SNAPSHOT IDENTIFIER AS SELECT '*' FROM source limit ';'
+ {
+ flb_sp_cmd_snapshot_new(cmd, $3);
+ flb_free($3);
+ }
+ |
+ CREATE SNAPSHOT IDENTIFIER WITH '(' properties ')' AS SELECT '*' FROM source limit ';'
+ {
+ flb_sp_cmd_snapshot_new(cmd, $3);
+ flb_free($3);
+ }
+ |
+ FLUSH SNAPSHOT IDENTIFIER AS SELECT '*' FROM source where ';'
+ {
+ flb_sp_cmd_snapshot_flush_new(cmd, $3);
+ flb_free($3);
+ }
+ |
+ FLUSH SNAPSHOT IDENTIFIER WITH '(' properties ')' AS SELECT '*' FROM source where ';'
+ {
+ flb_sp_cmd_snapshot_flush_new(cmd, $3);
+ flb_free($3);
+ }
+ /* Comma separated list of key = 'value' properties */
+ properties: property
+ |
+ properties ',' property
+ /* The property is copied into the command, then both strings are released */
+ property: prop_key '=' prop_val
+ {
+ flb_sp_cmd_stream_prop_add(cmd, $1, $3);
+ flb_free($1);
+ flb_free($3);
+ }
+ prop_key: IDENTIFIER
+ prop_val: STRING
+
+/* Parser for 'SELECT' statement */
+/* SELECT <keys> FROM <source> [WINDOW] [WHERE] [GROUP BY] [LIMIT] ; */
+select: SELECT keys FROM source window where groupby limit ';'
+ {
+ cmd->type = FLB_SP_SELECT;
+ }
+ keys: record_keys
+ record_keys: record_key
+ |
+ record_keys ',' record_key
+ /*
+  * One selected key. Plain keys are added with function id -1; function
+  * keys pass the token value set by the lexer. NULL as key name stands for
+  * the wildcard.
+  */
+ record_key: '*'
+ {
+ flb_sp_cmd_key_add(cmd, -1, NULL);
+ }
+ |
+ IDENTIFIER key_alias
+ {
+ flb_sp_cmd_key_add(cmd, -1, $1);
+ flb_free($1);
+ }
+ |
+ IDENTIFIER record_subkey key_alias
+ {
+ flb_sp_cmd_key_add(cmd, -1, $1);
+ flb_free($1);
+ }
+ |
+ COUNT '(' '*' ')' key_alias
+ {
+ flb_sp_cmd_key_add(cmd, $1, NULL);
+ }
+ |
+ COUNT '(' IDENTIFIER ')' key_alias
+ {
+ flb_sp_cmd_key_add(cmd, $1, $3);
+ flb_free($3);
+ }
+ |
+ COUNT '(' IDENTIFIER record_subkey ')' key_alias
+ {
+ flb_sp_cmd_key_add(cmd, $1, $3);
+ flb_free($3);
+ }
+ |
+ aggregate_func '(' IDENTIFIER ')' key_alias
+ {
+ flb_sp_cmd_key_add(cmd, $1, $3);
+ flb_free($3);
+ }
+ |
+ aggregate_func '(' IDENTIFIER record_subkey ')' key_alias
+ {
+ flb_sp_cmd_key_add(cmd, $1, $3);
+ flb_free($3);
+ }
+ |
+ TIMESERIES_FORECAST '(' IDENTIFIER ',' INTEGER ')' key_alias
+ {
+ flb_sp_cmd_timeseries_forecast(cmd, $1, $3, $5);
+ flb_free($3);
+ }
+ |
+ time_record_func '(' ')' key_alias
+ {
+ flb_sp_cmd_key_add(cmd, $1, NULL);
+ }
+ aggregate_func:
+ AVG | SUM | MAX | MIN
+ time_record_func:
+ NOW | UNIX_TIMESTAMP | RECORD_TAG | RECORD_TIME
+ /*
+  * Optional alias. The identifier text is handed over to the command and is
+  * not released here.
+  */
+ key_alias:
+ %empty
+ |
+ AS IDENTIFIER
+ {
+ flb_sp_cmd_alias_add(cmd, $2);
+ }
+ /* One or more ['name'] sub-keys, collected in cmd->tmp_subkeys */
+ record_subkey: '[' STRING ']'
+ {
+ flb_slist_add(cmd->tmp_subkeys, $2);
+ flb_free($2);
+ }
+ |
+ record_subkey record_subkey
+ /* Source of the records: a stream name or a tag pattern */
+ source: FROM_STREAM IDENTIFIER
+ {
+ flb_sp_cmd_source(cmd, FLB_SP_STREAM, $2);
+ flb_free($2);
+ }
+ |
+ FROM_TAG STRING
+ {
+ flb_sp_cmd_source(cmd, FLB_SP_TAG, $2);
+ flb_free($2);
+ }
+ window: %empty
+ |
+ WINDOW window_spec
+ where: %empty
+ |
+ WHERE condition
+ {
+ flb_sp_cmd_condition_add(cmd, $2);
+ }
+ groupby: %empty
+ |
+ GROUP_BY gb_keys
+ limit: %empty
+ |
+ LIMIT INTEGER
+ {
+ flb_sp_cmd_limit_add(cmd, $2);
+ }
+ /* Window definition: TUMBLING (size unit) or HOPPING (size unit, ADVANCE BY size unit) */
+ window_spec:
+ TUMBLING '(' INTEGER time ')'
+ {
+ flb_sp_cmd_window(cmd, FLB_SP_WINDOW_TUMBLING, $3, $4, 0, 0);
+ }
+ |
+ HOPPING '(' INTEGER time ',' ADVANCE_BY INTEGER time ')'
+ {
+ flb_sp_cmd_window(cmd, FLB_SP_WINDOW_HOPPING, $3, $4, $7, $8);
+ }
+ /*
+  * WHERE condition tree. A lone key or value is wrapped into an OR node with
+  * one operand; parentheses and NOT become nodes with a single left operand.
+  */
+ condition: comparison
+ |
+ key
+ {
+ $$ = flb_sp_cmd_operation(cmd, $1, NULL, FLB_EXP_OR);
+ }
+ |
+ value
+ {
+ $$ = flb_sp_cmd_operation(cmd, NULL, $1, FLB_EXP_OR);
+ }
+ |
+ '(' condition ')'
+ {
+ $$ = flb_sp_cmd_operation(cmd, $2, NULL, FLB_EXP_PAR);
+ }
+ |
+ NOT condition
+ {
+ $$ = flb_sp_cmd_operation(cmd, $2, NULL, FLB_EXP_NOT);
+ }
+ |
+ condition AND condition
+ {
+ $$ = flb_sp_cmd_operation(cmd, $1, $3, FLB_EXP_AND);
+ }
+ |
+ condition OR condition
+ {
+ $$ = flb_sp_cmd_operation(cmd, $1, $3, FLB_EXP_OR);
+ }
+ /*
+  * Comparisons. "IS NOT NULL" and "!=" / "<>" are expressed as a NOT node
+  * wrapping an equality comparison; a bare record function is compared
+  * against boolean true.
+  */
+ comparison:
+ key IS null
+ {
+ $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_EQ);
+ }
+ |
+ key IS NOT null
+ {
+ $$ = flb_sp_cmd_operation(cmd,
+ flb_sp_cmd_comparison(cmd, $1, $4, FLB_EXP_EQ),
+ NULL, FLB_EXP_NOT);
+ }
+ |
+ record_func
+ {
+ $$ = flb_sp_cmd_comparison(cmd,
+ $1,
+ flb_sp_cmd_condition_boolean(cmd, true),
+ FLB_EXP_EQ);
+ }
+ |
+ record_func '=' value
+ {
+ $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_EQ);
+ }
+ |
+ record_func NEQ value
+ {
+ $$ = flb_sp_cmd_operation(cmd,
+ flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_EQ),
+ NULL, FLB_EXP_NOT)
+ ;
+ }
+ |
+ record_func LT value
+ {
+ $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_LT);
+ }
+ |
+ record_func LTE value
+ {
+ $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_LTE);
+ }
+ |
+ record_func GT value
+ {
+ $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_GT);
+ }
+ |
+ record_func GTE value
+ {
+ $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_GTE);
+ }
+ /* Left side of a comparison: a key, @record.contains(key) or @record.time() */
+ record_func: key /* Similar to an identity function */
+ |
+ RECORD '.' CONTAINS '(' key ')'
+ {
+ $$ = flb_sp_record_function_add(cmd, "contains", $5);
+ }
+ |
+ RECORD '.' TIME '(' ')'
+ {
+ $$ = flb_sp_record_function_add(cmd, "time", NULL);
+ }
+ /* Condition key, optionally followed by sub-keys; the identifier text is released after use */
+ key: IDENTIFIER
+ {
+ $$ = flb_sp_cmd_condition_key(cmd, $1);
+ flb_free($1);
+ }
+ |
+ IDENTIFIER record_subkey
+ {
+ $$ = flb_sp_cmd_condition_key(cmd, $1);
+ flb_free($1);
+ }
+ /* Literal values usable in conditions */
+ value: INTEGER
+ {
+ $$ = flb_sp_cmd_condition_integer(cmd, $1);
+ }
+ |
+ FLOATING
+ {
+ $$ = flb_sp_cmd_condition_float(cmd, $1);
+ }
+ |
+ STRING
+ {
+ $$ = flb_sp_cmd_condition_string(cmd, $1);
+ flb_free($1);
+ }
+ |
+ BOOLTYPE
+ {
+ $$ = flb_sp_cmd_condition_boolean(cmd, $1);
+ }
+ null: NUL
+ {
+ $$ = flb_sp_cmd_condition_null(cmd);
+ }
+ /* Time unit keyword mapped to its FLB_SP_TIME_* constant */
+ time: SECOND
+ {
+ $$ = FLB_SP_TIME_SECOND;
+ }
+ |
+ MINUTE
+ {
+ $$ = FLB_SP_TIME_MINUTE;
+ }
+ |
+ HOUR
+ {
+ $$ = FLB_SP_TIME_HOUR;
+ }
+ /* Comma separated list of GROUP BY keys, each optionally followed by sub-keys */
+ gb_keys: gb_key
+ |
+ gb_key ',' gb_keys
+ gb_key: IDENTIFIER
+ {
+ flb_sp_cmd_gb_key_add(cmd, $1);
+ flb_free($1);
+ }
+ |
+ IDENTIFIER record_subkey
+ {
+ flb_sp_cmd_gb_key_add(cmd, $1);
+ flb_free($1);
+ }
+ ;