diff options
Diffstat (limited to 'fluent-bit/src/stream_processor/parser')
-rw-r--r-- | fluent-bit/src/stream_processor/parser/CMakeLists.txt | 31 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/parser/flb_sp_parser.c | 851 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/parser/sql.l | 190 | ||||
-rw-r--r-- | fluent-bit/src/stream_processor/parser/sql.y | 437 |
4 files changed, 1509 insertions, 0 deletions
diff --git a/fluent-bit/src/stream_processor/parser/CMakeLists.txt b/fluent-bit/src/stream_processor/parser/CMakeLists.txt new file mode 100644 index 000000000..d14b02bbd --- /dev/null +++ b/fluent-bit/src/stream_processor/parser/CMakeLists.txt @@ -0,0 +1,31 @@ +# Generate the lexer (sql.l) and parser (sql.y) C sources in the build directory +flex_target(lexer sql.l "${CMAKE_CURRENT_BINARY_DIR}/sql_lex.c" + DEFINES_FILE "${CMAKE_CURRENT_BINARY_DIR}/sql_lex.h" + ) +bison_target(parser sql.y "${CMAKE_CURRENT_BINARY_DIR}/sql_parser.c") + +# Hand-written sources of the parser library +set(sources + flb_sp_parser.c + ) + +# On Windows builds, define YY_NO_UNISTD_H +if(CMAKE_SYSTEM_NAME MATCHES "Windows") + FLB_DEFINITION(YY_NO_UNISTD_H) + message(STATUS "Specifying YY_NO_UNISTD_H") +endif() + +# Both the source and the build directory are searched for headers +# (the generated sql_lex.h is written to the build directory) +include_directories( + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_BINARY_DIR} + ) + +# Static library made of the hand-written sources plus the generated lexer/parser +add_library(flb-sp-parser STATIC + ${sources} + "${CMAKE_CURRENT_BINARY_DIR}/sql_lex.c" + "${CMAKE_CURRENT_BINARY_DIR}/sql_parser.c" + ) + +add_flex_bison_dependency(lexer parser) +# onigmo-static is built before this library +add_dependencies(flb-sp-parser onigmo-static) + +if(FLB_JEMALLOC) + target_link_libraries(flb-sp-parser libjemalloc) +endif() diff --git a/fluent-bit/src/stream_processor/parser/flb_sp_parser.c b/fluent-bit/src/stream_processor/parser/flb_sp_parser.c new file mode 100644 index 000000000..429d0b4c1 --- /dev/null +++ b/fluent-bit/src/stream_processor/parser/flb_sp_parser.c @@ -0,0 +1,851 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_str.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_slist.h> +#include <fluent-bit/stream_processor/flb_sp_parser.h> +#include <fluent-bit/stream_processor/flb_sp_aggregate_func.h> +#include <fluent-bit/stream_processor/flb_sp_record_func.h> + +#include "sql_parser.h" +#include "sql_lex.h" + +/* + * Hand the pending subkey list (cmd->tmp_subkeys) over to '*target' and give + * 'cmd' a fresh, empty list for the next key. + * + * The replacement list is allocated before anything is modified, so on + * failure neither '*target' nor cmd->tmp_subkeys changes and the pending list + * keeps a single owner. + * + * Returns 0 on success, -1 on allocation failure (cmd->status is set to + * FLB_SP_ERROR). + */ +static int swap_tmp_subkeys(struct mk_list **target, struct flb_sp_cmd *cmd) +{ + struct mk_list *fresh; + + fresh = flb_malloc(sizeof(struct mk_list)); + if (!fresh) { + flb_errno(); + cmd->status = FLB_SP_ERROR; + return -1; + } + flb_slist_create(fresh); + + /* Map context keys into this command key structure */ + *target = cmd->tmp_subkeys; + cmd->tmp_subkeys = fresh; + return 0; +} + +/* + * Release a command context and what it owns: keys, GROUP BY keys, stream + * properties, stream/source names, a pending alias, WHERE expressions and + * the temporary subkey list. + */ +void flb_sp_cmd_destroy(struct flb_sp_cmd *cmd) +{ + struct mk_list *head; + struct mk_list *tmp; + struct flb_sp_cmd_key *key; + struct flb_sp_cmd_gb_key *gb_key; + struct flb_sp_cmd_prop *prop; + + /* remove keys */ + mk_list_foreach_safe(head, tmp, &cmd->keys) { + key = mk_list_entry(head, struct flb_sp_cmd_key, _head); + mk_list_del(&key->_head); + flb_sp_cmd_key_del(key); + } + + /* remove groupby keys */ + mk_list_foreach_safe(head, tmp, &cmd->gb_keys) { + gb_key = mk_list_entry(head, struct flb_sp_cmd_gb_key, _head); + mk_list_del(&gb_key->_head); + flb_sp_cmd_gb_key_del(gb_key); + } + + /* + * stream properties: released unconditionally, they can exist without a + * stream name when parsing stops before the name is registered + */ + mk_list_foreach_safe(head, tmp, &cmd->stream_props) { + prop = mk_list_entry(head, struct flb_sp_cmd_prop, _head); + mk_list_del(&prop->_head); + flb_sp_cmd_stream_prop_del(prop); + } + + /* stream */ + if (cmd->stream_name) { + flb_sds_destroy(cmd->stream_name); + } + flb_sds_destroy(cmd->source_name); + + /* alias that was registered but never attached to a key */ + if (cmd->alias) { + flb_free(cmd->alias); + cmd->alias = NULL; + } + + if (mk_list_size(&cmd->cond_list) > 0) { + flb_sp_cmd_condition_del(cmd); + 
} + + if (cmd->tmp_subkeys) { + flb_slist_destroy(cmd->tmp_subkeys); + flb_free(cmd->tmp_subkeys); + } + + flb_free(cmd); +} + +/* Free a selected key: its name, alias, subkey list (when present) and the key itself */ +void flb_sp_cmd_key_del(struct flb_sp_cmd_key *key) +{ + if (key->name) { + flb_sds_destroy(key->name); + } + if (key->alias) { + flb_sds_destroy(key->alias); + } + if (key->subkeys) { + flb_slist_destroy(key->subkeys); + flb_free(key->subkeys); + } + flb_free(key); +} + +/* Free a GROUP BY key: its name, subkey list (when present) and the key itself */ +void flb_sp_cmd_gb_key_del(struct flb_sp_cmd_gb_key *key) +{ + if (key->name) { + flb_sds_destroy(key->name); + } + if (key->subkeys) { + flb_slist_destroy(key->subkeys); + flb_free(key->subkeys); + } + flb_free(key); +} + +/* + * Allocate a key for the SELECT list. + * + * 'func' is classified by range as an aggregation, time or record function; + * a value outside those ranges leaves all three function fields at zero. + * 'key_name' is NULL for a wildcard selection and 'key_alias' may be NULL. + * + * Returns the new key (not yet linked into cmd->keys), or NULL on failure. + */ +struct flb_sp_cmd_key *flb_sp_key_create(struct flb_sp_cmd *cmd, int func, + const char *key_name, + const char *key_alias) +{ + char tmp_alias[256]; + int s; + int ret; + int len; + int aggr_func = 0; + int time_func = 0; + int record_func = 0; + char *tmp; + struct mk_list *head; + struct flb_sp_cmd_key *key; + struct flb_slist_entry *entry; + + /* aggregation function ? */ + if (func >= FLB_SP_AVG && func <= FLB_SP_FORECAST) { + aggr_func = func; + } + else if (func >= FLB_SP_NOW && func <= FLB_SP_UNIX_TIMESTAMP) { + /* Time function */ + time_func = func; + } + else if (func >= FLB_SP_RECORD_TAG && func <= FLB_SP_RECORD_TIME) { + /* Record function */ + record_func = func; + } + + key = flb_calloc(1, sizeof(struct flb_sp_cmd_key)); + if (!key) { + flb_errno(); + cmd->status = FLB_SP_ERROR; + return NULL; + } + key->gb_key = NULL; + key->subkeys = NULL; + + /* key name and aliases works when the selection is not a wildcard */ + if (key_name) { + key->name = flb_sds_create(key_name); + if (!key->name) { + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + } + else { + /* + * Wildcard key only allowed on: + * - no aggregation mode (left side / first entry) + * - aggregation using COUNT(*) + */ + if (mk_list_size(&cmd->keys) > 0 && aggr_func == 0 && + record_func == 0 && time_func == 0) { + flb_sp_cmd_key_del(key); + 
cmd->status = FLB_SP_ERROR; + return NULL; + } + } + + if (key_alias) { + key->alias = flb_sds_create(key_alias); + if (!key->alias) { + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + } + + /* Aggregation function */ + if (aggr_func > 0) { + key->aggr_func = aggr_func; + } + else if (time_func > 0) { + key->time_func = time_func; + } + else if (record_func > 0) { + key->record_func = record_func; + } + + /* Lookup for any subkeys in the temporary list */ + if (mk_list_size(cmd->tmp_subkeys) > 0) { + ret = swap_tmp_subkeys(&key->subkeys, cmd); + if (ret == -1) { + /* + * The pending list still belongs to 'cmd': drop our reference so + * it is not destroyed twice. + */ + key->subkeys = NULL; + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + + /* Compose a name key that include listed sub keys */ + if (!key->alias) { + /* the composed alias starts with the key name: it must exist */ + if (!key->name) { + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + + s = flb_sds_len(key->name) + (16 * mk_list_size(key->subkeys)); + key->alias = flb_sds_create_size(s); + if (!key->alias) { + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + + tmp = flb_sds_cat(key->alias, key->name, flb_sds_len(key->name)); + if (!tmp) { + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + key->alias = tmp; + + mk_list_foreach(head, key->subkeys) { + entry = mk_list_entry(head, struct flb_slist_entry, _head); + + /* prefix */ + tmp = flb_sds_cat(key->alias, "['", 2); + if (!tmp) { + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + key->alias = tmp; + + /* selected key name */ + tmp = flb_sds_cat(key->alias, + entry->str, flb_sds_len(entry->str)); + if (!tmp) { + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + key->alias = tmp; + + /* suffix */ + tmp = flb_sds_cat(key->alias, "']", 2); + if (!tmp) { + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + key->alias = tmp; + } + + if (aggr_func) { + len = snprintf(tmp_alias, sizeof(tmp_alias) - 1, "%s(%s)", + aggregate_func_string[aggr_func - 1], key->alias); + if (len < 0) { + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + if ((size_t) len >= sizeof(tmp_alias) - 1) { + /* + * snprintf() returns the untruncated length: only copy + * what was actually stored in tmp_alias. + */ + len = (int) (sizeof(tmp_alias) - 2); + } + + tmp = flb_sds_copy(key->alias, tmp_alias, len); + if (!tmp) { + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + key->alias = tmp; + } + } + } + else if (aggr_func && !key->alias) { + if (key->name) { + 
len = snprintf(tmp_alias, sizeof(tmp_alias) - 1, "%s(%s)", + aggregate_func_string[aggr_func - 1], key->name); + } else { + len = snprintf(tmp_alias, sizeof(tmp_alias) - 1, "%s(*)", + aggregate_func_string[aggr_func - 1]); + } + + if (len < 0) { + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + if ((size_t) len >= sizeof(tmp_alias) - 1) { + /* + * snprintf() returns the untruncated length: only copy what was + * actually stored in tmp_alias. + */ + len = (int) (sizeof(tmp_alias) - 2); + } + + key->alias = flb_sds_create_len(tmp_alias, len); + if (!key->alias) { + flb_sp_cmd_key_del(key); + cmd->status = FLB_SP_ERROR; + return NULL; + } + } + + return key; +} + +/* + * Create a key from 'func' / 'key_name' plus the pending alias (cmd->alias) + * and link it into cmd->keys. The pending alias buffer is released in every + * case. Returns 0 on success, -1 if the key could not be created. + */ +int flb_sp_cmd_key_add(struct flb_sp_cmd *cmd, int func, const char *key_name) +{ + struct flb_sp_cmd_key *key; + + key = flb_sp_key_create(cmd, func, key_name, cmd->alias); + + /* + * flb_sp_key_create() works on its own copy of the alias: free the + * pending buffer on both the success and the failure path. + */ + if (cmd->alias) { + flb_free(cmd->alias); + cmd->alias = NULL; + } + + if (!key) { + return -1; + } + + mk_list_add(&key->_head, &cmd->keys); + + return 0; +} + +/* + * Register the alias for the next key. The pointer itself is stored (no + * copy) and is released later with flb_free(). + */ +void flb_sp_cmd_alias_add(struct flb_sp_cmd *cmd, const char *key_alias) +{ + /* drop an alias that was never consumed before taking the new one */ + if (cmd->alias && cmd->alias != key_alias) { + flb_free(cmd->alias); + } + cmd->alias = (char *) key_alias; +} + +/* + * Set the source type and a copy of the source name. Returns 0 on success, + * -1 on allocation failure (cmd->status is set to FLB_SP_ERROR). + */ +int flb_sp_cmd_source(struct flb_sp_cmd *cmd, int type, const char *source) +{ + cmd->source_type = type; + cmd->source_name = flb_sds_create(source); + if (!cmd->source_name) { + flb_errno(); + cmd->status = FLB_SP_ERROR; + return -1; + } + + return 0; +} + +/* Print the selected key names and the source of a command to stdout */ +void flb_sp_cmd_dump(struct flb_sp_cmd *cmd) +{ + struct mk_list *head; + struct mk_list *tmp; + struct flb_sp_cmd_key *key; + + /* Lookup keys */ + printf("== KEYS ==\n"); + mk_list_foreach_safe(head, tmp, &cmd->keys) { + key = mk_list_entry(head, struct flb_sp_cmd_key, _head); + /* wildcard keys have no name: never hand NULL to '%s' */ + printf("- '%s'\n", key->name ? key->name : "*"); + } + printf("== SOURCE ==\n"); + if (cmd->source_type == FLB_SP_STREAM) { + printf("stream => "); + } + else if (cmd->source_type == FLB_SP_TAG) { + printf("tag match => "); + } + + printf("'%s'\n", cmd->source_name); +} + +/* + * Parse a SQL statement into a new command context. + * Returns the context, or NULL on allocation or parse failure. + */ +struct flb_sp_cmd *flb_sp_cmd_create(const char *sql) +{ + int ret; + yyscan_t scanner; + YY_BUFFER_STATE buf; + struct flb_sp_cmd *cmd; + + /* create context */ + cmd = flb_calloc(1, sizeof(struct flb_sp_cmd)); + if (!cmd) { + flb_errno(); + return NULL; + } + cmd->status = FLB_SP_OK; + cmd->type = FLB_SP_SELECT; + 
mk_list_init(&cmd->stream_props); + mk_list_init(&cmd->keys); + + /* Condition linked list (we use them to free resources) */ + mk_list_init(&cmd->cond_list); + mk_list_init(&cmd->gb_keys); + + /* Allocate temporary list and initialize */ + cmd->tmp_subkeys = flb_malloc(sizeof(struct mk_list)); + if (!cmd->tmp_subkeys) { + flb_errno(); + flb_free(cmd); + return NULL; + } + flb_slist_create(cmd->tmp_subkeys); + + /* Flex/Bison work */ + ret = flb_sp_lex_init(&scanner); + if (ret != 0) { + /* no scanner could be set up: there is nothing to parse with */ + flb_errno(); + flb_sp_cmd_destroy(cmd); + return NULL; + } + buf = flb_sp__scan_string(sql, scanner); + + ret = flb_sp_parse(cmd, sql, scanner); + + flb_sp__delete_buffer(buf, scanner); + flb_sp_lex_destroy(scanner); + + if (ret != 0) { + flb_sp_cmd_destroy(cmd); + return NULL; + } + + return cmd; +} + +/* + * Mark the command as CREATE STREAM and store a copy of the stream name. + * Returns 0 on success, -1 on allocation failure (cmd->status is set to + * FLB_SP_ERROR so the failure is visible even if the return value is not + * checked). + */ +int flb_sp_cmd_stream_new(struct flb_sp_cmd *cmd, const char *stream_name) +{ + cmd->stream_name = flb_sds_create(stream_name); + if (!cmd->stream_name) { + flb_errno(); + cmd->status = FLB_SP_ERROR; + return -1; + } + + cmd->type = FLB_SP_CREATE_STREAM; + return 0; +} + +/* + * Mark the command as CREATE SNAPSHOT and store a copy of the snapshot name. + * A 'tag' stream property must have been registered beforehand. + * Returns 0 on success, -1 on failure (cmd->status is set to FLB_SP_ERROR). + */ +int flb_sp_cmd_snapshot_new(struct flb_sp_cmd *cmd, const char *snapshot_name) +{ + const char *tmp; + + cmd->stream_name = flb_sds_create(snapshot_name); + if (!cmd->stream_name) { + flb_errno(); + cmd->status = FLB_SP_ERROR; + return -1; + } + + tmp = flb_sp_cmd_stream_prop_get(cmd, "tag"); + if (!tmp) { + cmd->status = FLB_SP_ERROR; + flb_error("[sp] tag for snapshot is required. 
Add WITH(tag = <TAG>) to the snapshot %s", + snapshot_name); + return -1; + } + + cmd->type = FLB_SP_CREATE_SNAPSHOT; + + return 0; +} + +/* + * Mark the command as FLUSH SNAPSHOT; the stream name becomes + * "__flush_" followed by the snapshot name. + * Returns 0 on success, -1 on allocation failure (cmd->status is set to + * FLB_SP_ERROR). + */ +int flb_sp_cmd_snapshot_flush_new(struct flb_sp_cmd *cmd, const char *snapshot_name) +{ + flb_sds_t name; + flb_sds_t tmp; + + name = flb_sds_create("__flush_"); + if (!name) { + flb_errno(); + cmd->status = FLB_SP_ERROR; + return -1; + } + + /* on failure the original buffer is still ours to release */ + tmp = flb_sds_cat(name, snapshot_name, strlen(snapshot_name)); + if (!tmp) { + flb_errno(); + flb_sds_destroy(name); + cmd->status = FLB_SP_ERROR; + return -1; + } + + cmd->stream_name = tmp; + cmd->type = FLB_SP_FLUSH_SNAPSHOT; + + return 0; +} + +/* + * Append a key/value stream property (both strings are copied). + * Returns 0 on success, -1 on allocation failure (cmd->status is set to + * FLB_SP_ERROR). + */ +int flb_sp_cmd_stream_prop_add(struct flb_sp_cmd *cmd, const char *key, const char *val) +{ + struct flb_sp_cmd_prop *prop; + + prop = flb_malloc(sizeof(struct flb_sp_cmd_prop)); + if (!prop) { + flb_errno(); + cmd->status = FLB_SP_ERROR; + return -1; + } + + prop->key = flb_sds_create(key); + if (!prop->key) { + flb_free(prop); + cmd->status = FLB_SP_ERROR; + return -1; + } + + prop->val = flb_sds_create(val); + if (!prop->val) { + /* the key is an sds string: it must go through flb_sds_destroy() */ + flb_sds_destroy(prop->key); + flb_free(prop); + cmd->status = FLB_SP_ERROR; + return -1; + } + + mk_list_add(&prop->_head, &cmd->stream_props); + return 0; +} + +/* Free a stream property: its key, its value and the property itself */ +void flb_sp_cmd_stream_prop_del(struct flb_sp_cmd_prop *prop) +{ + if (prop->key) { + flb_sds_destroy(prop->key); + } + if (prop->val) { + flb_sds_destroy(prop->val); + } + flb_free(prop); +} + +/* + * Look up a stream property by exact key. + * Returns the stored value (owned by 'cmd'), or NULL when 'key' is NULL or + * no property matches. + */ +const char *flb_sp_cmd_stream_prop_get(struct flb_sp_cmd *cmd, const char *key) +{ + size_t len; + struct mk_list *head; + struct flb_sp_cmd_prop *prop; + + if (!key) { + return NULL; + } + len = strlen(key); + + mk_list_foreach(head, &cmd->stream_props) { + prop = mk_list_entry(head, struct flb_sp_cmd_prop, _head); + /* cheap length check before comparing the contents */ + if (flb_sds_len(prop->key) != len) { + continue; + } + + if (strcmp(prop->key, key) == 0) { + return prop->val; + } + } + + return NULL; +} + +/* WINDOW functions */ + +int flb_sp_cmd_window(struct flb_sp_cmd *cmd, + int window_type, int size, int time_unit, + int advance_by_size, int advance_by_time_unit) +{ + cmd->window.type = window_type; + + switch (time_unit) { + case FLB_SP_TIME_SECOND: + cmd->window.size = (time_t) size; + break; + case FLB_SP_TIME_MINUTE: + cmd->window.size = (time_t) size * 60; + break; + case FLB_SP_TIME_HOUR: + 
cmd->window.size = (time_t) size * 3600; + break; + } + + if (window_type == FLB_SP_WINDOW_HOPPING) { + switch (advance_by_time_unit) { + case FLB_SP_TIME_SECOND: + cmd->window.advance_by = (time_t) advance_by_size; + break; + case FLB_SP_TIME_MINUTE: + cmd->window.advance_by = (time_t) advance_by_size * 60; + break; + case FLB_SP_TIME_HOUR: + cmd->window.advance_by = (time_t) advance_by_size * 3600; + break; + } + + if (cmd->window.advance_by >= cmd->window.size) { + return -1; + } + } + + return 0; +} + +/* WHERE <condition> functions */ + +struct flb_exp *flb_sp_cmd_operation(struct flb_sp_cmd *cmd, + struct flb_exp *e1, struct flb_exp *e2, + int operation) +{ + struct flb_exp_op *expression; + + expression = flb_malloc(sizeof(struct flb_exp_op)); + if (!expression) { + flb_errno(); + return NULL; + } + + expression->type = FLB_LOGICAL_OP; + expression->left = e1; + expression->right = e2; + expression->operation = operation; + mk_list_add(&expression->_head, &cmd->cond_list); + + return (struct flb_exp *) expression; +} + +struct flb_exp *flb_sp_cmd_comparison(struct flb_sp_cmd *cmd, + struct flb_exp *key, struct flb_exp *val, + int operation) +{ + struct flb_exp_op *expression; + + expression = flb_malloc(sizeof(struct flb_exp_op)); + if (!expression) { + flb_errno(); + return NULL; + } + + expression->type = FLB_LOGICAL_OP; + expression->left = (struct flb_exp *) key; + expression->right = (struct flb_exp *) val; + expression->operation = operation; + mk_list_add(&expression->_head, &cmd->cond_list); + + return (struct flb_exp *) expression; +} + +struct flb_exp *flb_sp_cmd_condition_key(struct flb_sp_cmd *cmd, + const char *identifier) +{ + int ret; + struct flb_exp_key *key; + + key = flb_calloc(1, sizeof(struct flb_exp_key)); + if (!key) { + flb_errno(); + return NULL; + } + + key->type = FLB_EXP_KEY; + key->name = flb_sds_create(identifier); + mk_list_add(&key->_head, &cmd->cond_list); + + if (mk_list_size(cmd->tmp_subkeys) > 0) { + ret = 
swap_tmp_subkeys(&key->subkeys, cmd); + if (ret == -1) { + flb_sds_destroy(key->name); + mk_list_del(&key->_head); + flb_free(key); + return NULL; + } + } + + return (struct flb_exp *) key; +} + +/* + * Value constructors for WHERE conditions. Each one allocates a value node, + * tracks it in cmd->cond_list and returns it; on allocation failure they + * return NULL and set cmd->status to FLB_SP_ERROR. + */ + +/* Integer literal */ +struct flb_exp *flb_sp_cmd_condition_integer(struct flb_sp_cmd *cmd, + int integer) +{ + struct flb_exp_val *val; + + val = flb_malloc(sizeof(struct flb_exp_val)); + if (!val) { + flb_errno(); + cmd->status = FLB_SP_ERROR; + return NULL; + } + + val->type = FLB_EXP_INT; + val->val.i64 = integer; + mk_list_add(&val->_head, &cmd->cond_list); + + return (struct flb_exp *) val; +} + +/* Floating point literal */ +struct flb_exp *flb_sp_cmd_condition_float(struct flb_sp_cmd *cmd, float fval) +{ + struct flb_exp_val *val; + + val = flb_malloc(sizeof(struct flb_exp_val)); + if (!val) { + flb_errno(); + cmd->status = FLB_SP_ERROR; + return NULL; + } + + val->type = FLB_EXP_FLOAT; + val->val.f64 = fval; + mk_list_add(&val->_head, &cmd->cond_list); + + return (struct flb_exp *) val; +} + +/* String literal (the text is copied) */ +struct flb_exp *flb_sp_cmd_condition_string(struct flb_sp_cmd *cmd, + const char *string) +{ + struct flb_exp_val *val; + + val = flb_malloc(sizeof(struct flb_exp_val)); + if (!val) { + flb_errno(); + cmd->status = FLB_SP_ERROR; + return NULL; + } + + val->type = FLB_EXP_STRING; + val->val.string = flb_sds_create(string); + if (!val->val.string) { + /* do not publish a string node that has no string */ + flb_errno(); + flb_free(val); + cmd->status = FLB_SP_ERROR; + return NULL; + } + mk_list_add(&val->_head, &cmd->cond_list); + + return (struct flb_exp *) val; +} + +/* Boolean literal */ +struct flb_exp *flb_sp_cmd_condition_boolean(struct flb_sp_cmd *cmd, + bool boolean) +{ + struct flb_exp_val *val; + + val = flb_malloc(sizeof(struct flb_exp_val)); + if (!val) { + flb_errno(); + cmd->status = FLB_SP_ERROR; + return NULL; + } + + val->type = FLB_EXP_BOOL; + val->val.boolean = boolean; + mk_list_add(&val->_head, &cmd->cond_list); + + return (struct flb_exp *) val; +} + +/* NULL literal: only the node type is set */ +struct flb_exp *flb_sp_cmd_condition_null(struct flb_sp_cmd *cmd) +{ + struct flb_exp_val *val; + + val = flb_malloc(sizeof(struct flb_exp_val)); + if (!val) { + flb_errno(); + cmd->status = FLB_SP_ERROR; + return NULL; + } + + val->type = FLB_EXP_NULL; + mk_list_add(&val->_head, &cmd->cond_list); + + return (struct flb_exp *) val; +} + +struct flb_exp *flb_sp_record_function_add(struct 
flb_sp_cmd *cmd, + char *name, struct flb_exp *param) +{ + char *rf_name; + int i; + struct flb_exp_func *func; + + /* + * Pick the first entry of record_functions[] that 'name' starts with and + * build a function node bound to the matching callback. + */ + for (i = 0; i < RECORD_FUNCTIONS_SIZE; i++) + { + rf_name = record_functions[i]; + if (strncmp(rf_name, name, strlen(rf_name)) == 0) + { + func = flb_calloc(1, sizeof(struct flb_exp_func)); + if (!func) { + flb_errno(); + cmd->status = FLB_SP_ERROR; + return NULL; + } + + func->type = FLB_EXP_FUNC; + func->name = flb_sds_create(name); + if (!func->name) { + flb_errno(); + flb_free(func); + cmd->status = FLB_SP_ERROR; + return NULL; + } + func->cb_func = record_functions_ptr[i]; + func->param = param; + + mk_list_add(&func->_head, &cmd->cond_list); + + return (struct flb_exp *) func; + } + } + + /* no record function with that name */ + return NULL; +} + +/* Set the root expression of the WHERE clause */ +void flb_sp_cmd_condition_add(struct flb_sp_cmd *cmd, struct flb_exp *e) + +{ + cmd->condition = e; +} + +/* + * Append a GROUP BY key (the name is copied) and attach any pending subkeys. + * The key id is its position in cmd->gb_keys. + * Returns 0 on success, -1 on failure (cmd->status is set to FLB_SP_ERROR). + */ +int flb_sp_cmd_gb_key_add(struct flb_sp_cmd *cmd, const char *key) +{ + int ret; + struct flb_sp_cmd_gb_key *gb_key; + + gb_key = flb_calloc(1, sizeof(struct flb_sp_cmd_gb_key)); + if (!gb_key) { + flb_errno(); + cmd->status = FLB_SP_ERROR; + return -1; + } + + gb_key->name = flb_sds_create(key); + if (!gb_key->name) { + flb_free(gb_key); + cmd->status = FLB_SP_ERROR; + return -1; + } + + gb_key->id = mk_list_size(&cmd->gb_keys); + mk_list_add(&gb_key->_head, &cmd->gb_keys); + + /* Lookup for any subkeys in the temporary list */ + if (mk_list_size(cmd->tmp_subkeys) > 0) { + ret = swap_tmp_subkeys(&gb_key->subkeys, cmd); + if (ret == -1) { + flb_sds_destroy(gb_key->name); + mk_list_del(&gb_key->_head); + flb_free(gb_key); + return -1; + } + } + + return 0; +} + +/* Free every expression node tracked in cmd->cond_list */ +void flb_sp_cmd_condition_del(struct flb_sp_cmd *cmd) +{ + struct mk_list *tmp; + struct mk_list *head; + struct flb_exp *exp; + struct flb_exp_key *key; + struct flb_exp_val *val; + struct flb_exp_func *func; + + mk_list_foreach_safe(head, tmp, &cmd->cond_list) { + exp = mk_list_entry(head, struct flb_exp, _head); + if (exp->type == FLB_EXP_KEY) { + key = (struct flb_exp_key *) exp; + flb_sds_destroy(key->name); + if (key->subkeys) { + flb_slist_destroy(key->subkeys); + flb_free(key->subkeys); + } + } + else if (exp->type == FLB_EXP_STRING) { + val = (struct flb_exp_val 
*) exp; + flb_sds_destroy(val->val.string); + } + else if (exp->type == FLB_EXP_FUNC) { + func = (struct flb_exp_func *) exp; + flb_sds_destroy(func->name); + } + + mk_list_del(&exp->_head); + flb_free(exp); + } +} + +/* Store the LIMIT value */ +void flb_sp_cmd_limit_add(struct flb_sp_cmd *cmd, int limit) +{ + cmd->limit = limit; +} + +/* + * Add a TIMESERIES_FORECAST key: same as flb_sp_cmd_key_add() plus the + * forecast horizon stored in key->constant. The pending alias buffer is + * released in every case. Returns 0 on success, -1 if the key could not be + * created. + */ +int flb_sp_cmd_timeseries_forecast(struct flb_sp_cmd *cmd, int func, const char *key_name, int seconds) +{ + struct flb_sp_cmd_key *key; + + key = flb_sp_key_create(cmd, func, key_name, cmd->alias); + + /* + * flb_sp_key_create() works on its own copy of the alias: free the + * pending buffer on both the success and the failure path. + */ + if (cmd->alias) { + flb_free(cmd->alias); + cmd->alias = NULL; + } + + if (!key) { + return -1; + } + + mk_list_add(&key->_head, &cmd->keys); + + key->constant = seconds; + + return 0; +} diff --git a/fluent-bit/src/stream_processor/parser/sql.l b/fluent-bit/src/stream_processor/parser/sql.l new file mode 100644 index 000000000..91e5398e1 --- /dev/null +++ b/fluent-bit/src/stream_processor/parser/sql.l @@ -0,0 +1,190 @@ +%option prefix="flb_sp_" +%option caseless +%option 8bit reentrant bison-bridge +%option warn noyywrap nodefault +%option nounput +%option noinput + + +%{ +#include <stdio.h> +#include <stdbool.h> +#include <ctype.h> +#include <fluent-bit/flb_str.h> +#include <fluent-bit/flb_log.h> +#include "sql_parser.h" +#include <fluent-bit/stream_processor/flb_sp_parser.h> + +/* + * Copy the first 'n' bytes of 's' into a new NUL-terminated buffer, turning + * every quote character together with the byte that follows it into a single + * quote. Returns NULL on allocation failure; the caller frees the result. + */ +static inline char *remove_dup_qoutes(const char *s, size_t n) +{ + char *str; + size_t dups; + size_t i, j; + + /* first pass: count the bytes that will be dropped */ + dups = 0; + for (i = 0; i < n; i++) { + if (s[i] == '\'') { + dups++; + i++; + } + } + + str = (char *) flb_malloc(n - dups + 1); + if (!str) { + return NULL; + } + + j = 0; + for (i = 0; i < n; i++, j++) { + if (s[i] == '\'') { + str[j] = '\''; + i++; + } else { + str[j] = s[i]; + } + } + str[j] = '\0'; + + return str; +} + +/* + * Return a newly allocated upper-case copy of the first 'len' bytes of + * 'token', or NULL on allocation failure. The caller frees the result. + */ +char* to_upper(char* token, size_t len) +{ + size_t i; + char* token_; + + token_ = flb_malloc(len + 1); + if (!token_) { + flb_errno(); + return NULL; + } + + for (i = 0; i < len; i++) { + /* <ctype.h> functions need a value representable as unsigned char */ + token_[i] = (char) toupper((unsigned char) token[i]); + } + + token_[len] = '\0'; 
+ return token_; +} + +/* + * Map a function name (any letter case) to its FLB_SP_* code. + * Returns -1 for an unknown name or when the temporary copy cannot be + * allocated. + */ +int func_to_code(char* name, size_t len) +{ + int code; + char* name_; + + name_ = to_upper(name, len); + if (!name_) { + return -1; + } + code = -1; + + if (!strcmp(name_, "AVG")) { + code = FLB_SP_AVG; + } else if (!strcmp(name_, "SUM")) { + code = FLB_SP_SUM; + } else if (!strcmp(name_, "COUNT")) { + code = FLB_SP_COUNT; + } else if (!strcmp(name_, "MIN")) { + code = FLB_SP_MIN; + } else if (!strcmp(name_, "MAX")) { + code = FLB_SP_MAX; + } else if (!strcmp(name_, "TIMESERIES_FORECAST")) { + code = FLB_SP_FORECAST; + } else if (!strcmp(name_, "NOW")) { + code = FLB_SP_NOW; + } else if (!strcmp(name_, "UNIX_TIMESTAMP")) { + code = FLB_SP_UNIX_TIMESTAMP; + } else if (!strcmp(name_, "RECORD_TAG")) { + code = FLB_SP_RECORD_TAG; + } else if (!strcmp(name_, "RECORD_TIME")) { + code = FLB_SP_RECORD_TIME; + } + + flb_free(name_); + return code; +} + +%} + +%% + + /* SQL */ +CREATE return CREATE; +FLUSH return FLUSH; +STREAM return STREAM; +SNAPSHOT return SNAPSHOT; +WITH return WITH; +SELECT return SELECT; +AS return AS; +FROM return FROM; +STREAM: return FROM_STREAM; +TAG: return FROM_TAG; +WHERE return WHERE; +AND return AND; +OR return OR; +NOT return NOT; +WINDOW return WINDOW; +"GROUP BY" return GROUP_BY; +LIMIT return LIMIT; + +IS return IS; +NULL return NUL; + + /* Aggregation Functions */ +SUM {yylval->integer = func_to_code(yytext, yyleng); return SUM;} +AVG {yylval->integer = func_to_code(yytext, yyleng); return AVG;} +COUNT {yylval->integer = func_to_code(yytext, yyleng); return COUNT;} +MIN {yylval->integer = func_to_code(yytext, yyleng); return MIN;} +MAX {yylval->integer = func_to_code(yytext, yyleng); return MAX;} +TIMESERIES_FORECAST {yylval->integer = func_to_code(yytext, yyleng); return TIMESERIES_FORECAST;}; + + /* Record Functions */ +@RECORD return RECORD; +CONTAINS return CONTAINS; +TIME return TIME; + + + /* Window Types */ +TUMBLING return TUMBLING; +HOPPING return HOPPING; +"ADVANCE BY" return ADVANCE_BY; + + /* Time */ +HOUR return HOUR; +MINUTE return 
MINUTE; +SECOND return SECOND; + + /* Date / Time Functions */ +NOW {yylval->integer = func_to_code(yytext, yyleng); return NOW;} +UNIX_TIMESTAMP {yylval->integer = func_to_code(yytext, yyleng); return UNIX_TIMESTAMP;} + + /* Record information */ +RECORD_TAG {yylval->integer = func_to_code(yytext, yyleng); return RECORD_TAG;} +RECORD_TIME {yylval->integer = func_to_code(yytext, yyleng); return RECORD_TIME;} + + /* Boolean literals */ +"true" { yylval->boolean = true; return BOOLTYPE; }; +"false" { yylval->boolean = false; return BOOLTYPE; }; + + /* + * Numeric and string literals. A string is single-quoted; inside it a + * quote is written doubled ('') and the surrounding quotes are stripped + * before the text is copied. + */ +-?[1-9][0-9]*|0 { yylval->integer = atoi(yytext); return INTEGER; } +(-?[1-9][0-9]*|0)\.[0-9]+ { yylval->fval = atof(yytext); return FLOATING; } +\'([^']|'{2})*\' { yylval->string = remove_dup_qoutes(yytext + 1, yyleng - 2); return STRING; } + + /* Identifiers: the matched text is duplicated on the heap */ +[_A-Za-z][A-Za-z0-9_.]* { yylval->string = flb_strdup(yytext); return IDENTIFIER; } + + /* Single-character punctuation is returned as the character itself */ +"*" | +"," | +"=" | +"(" | +")" | +"[" | +"]" | +"." | +";" { return yytext[0]; } + + /* Comparison operators */ +"!=" return NEQ; +"<>" return NEQ; +"<" return LT; +"<=" return LTE; +">" return GT; +">=" return GTE; + +\' return QUOTE; +\n +[ \t]+ /* ignore whitespace */; + +. 
flb_error("[sp] bad input character '%s' at line %d", yytext, yylineno); + +%% diff --git a/fluent-bit/src/stream_processor/parser/sql.y b/fluent-bit/src/stream_processor/parser/sql.y new file mode 100644 index 000000000..866f95cc0 --- /dev/null +++ b/fluent-bit/src/stream_processor/parser/sql.y @@ -0,0 +1,437 @@ +%name-prefix="flb_sp_" // replace with %define api.prefix {flb_sp_} +%define api.pure full +%define parse.error verbose +%parse-param { struct flb_sp_cmd *cmd }; +%parse-param { const char *query }; +%lex-param { void *scanner } +%parse-param { void *scanner } + +%{ // definition section (prologue) +#include <stdio.h> +#include <stdlib.h> +#include <ctype.h> + +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_slist.h> +#include <fluent-bit/stream_processor/flb_sp_parser.h> + +#include "sql_parser.h" +#include "sql_lex.h" + +extern int yylex(); + +/* + * Error reporting hook: logs the parser message 'str' together with the + * full query text. 'cmd' and 'scanner' are not used here. + */ +void yyerror(struct flb_sp_cmd *cmd, const char *query, void *scanner, const char *str) +{ + flb_error("[sp] %s at '%s'", str, query); +} + +%} /* EOF C code */ + +/* Bison declarations */ +/* Known Tokens (refer to sql.l) */ + +/* Keywords */ +%token IDENTIFIER QUOTE + +/* Basic keywords for statements */ +%token CREATE STREAM SNAPSHOT FLUSH WITH SELECT AS FROM FROM_STREAM FROM_TAG +%token WHERE WINDOW GROUP_BY LIMIT + +/* Null keywords */ +%token IS NUL + +/* Aggregation functions */ +%token AVG SUM COUNT MAX MIN TIMESERIES_FORECAST + +/* Record functions */ +%token RECORD CONTAINS TIME + +/* Time functions */ +%token NOW UNIX_TIMESTAMP + + /* Record information functions */ +%token RECORD_TAG RECORD_TIME + +/* Value types */ +%token INTEGER FLOATING STRING BOOLTYPE + +/* Logical operation tokens */ +%token AND OR NOT NEQ LT LTE GT GTE + +/* Time tokens */ +%token HOUR MINUTE SECOND + +/* Window tokens */ +%token TUMBLING HOPPING ADVANCE_BY + +/* Union and field types: semantic values carried by tokens and rules */ +%union +{ + bool boolean; + int integer; + float fval; + char *string; + struct flb_sp_cmd *cmd; + struct flb_exp *expression; +} + +%type 
<boolean> BOOLTYPE +%type <integer> INTEGER +%type <fval> FLOATING +%type <string> IDENTIFIER +%type <string> STRING +%type <string> record_keys +%type <string> record_key +%type <string> prop_key +%type <string> prop_val +%type <expression> condition +%type <expression> comparison +%type <expression> key +%type <expression> record_func +%type <expression> value +%type <expression> null +%type <integer> time + +%type <integer> time_record_func +%type <integer> NOW UNIX_TIMESTAMP RECORD_TAG RECORD_TIME + +%type <integer> aggregate_func +%type <integer> COUNT AVG SUM MAX MIN TIMESERIES_FORECAST + + +/* + * Heap-allocated strings the parser may discard on a syntax error (symbols + * popped from the stack or the pending lookahead). STRING, prop_key and + * prop_val carry the same kind of buffer as IDENTIFIER and are released the + * same way; actions that consume these symbols free them explicitly. + */ +%destructor { flb_free ($$); } IDENTIFIER STRING prop_key prop_val + +%% /* rules section */ + +statements: create | select + +/* Parser for 'CREATE STREAM' statement */ +create: + CREATE STREAM IDENTIFIER AS select + { + flb_sp_cmd_stream_new(cmd, $3); + flb_free($3); + } + | + CREATE STREAM IDENTIFIER WITH '(' properties ')' AS select + { + flb_sp_cmd_stream_new(cmd, $3); + flb_free($3); + } + | + CREATE SNAPSHOT IDENTIFIER AS SELECT '*' FROM source limit ';' + { + flb_sp_cmd_snapshot_new(cmd, $3); + flb_free($3); + } + | + CREATE SNAPSHOT IDENTIFIER WITH '(' properties ')' AS SELECT '*' FROM source limit ';' + { + flb_sp_cmd_snapshot_new(cmd, $3); + flb_free($3); + } + | + FLUSH SNAPSHOT IDENTIFIER AS SELECT '*' FROM source where ';' + { + flb_sp_cmd_snapshot_flush_new(cmd, $3); + flb_free($3); + } + | + FLUSH SNAPSHOT IDENTIFIER WITH '(' properties ')' AS SELECT '*' FROM source where ';' + { + flb_sp_cmd_snapshot_flush_new(cmd, $3); + flb_free($3); + } + /* WITH ( key = 'value', ... ): each pair is copied into the command */ + properties: property + | + properties ',' property + property: prop_key '=' prop_val + { + flb_sp_cmd_stream_prop_add(cmd, $1, $3); + flb_free($1); + flb_free($3); + } + prop_key: IDENTIFIER + prop_val: STRING + +/* Parser for 'SELECT' statement */ +select: SELECT keys FROM source window where groupby limit ';' + { + cmd->type = FLB_SP_SELECT; + } + keys: record_keys + record_keys: record_key + | + record_keys ',' record_key + record_key: '*' 
+ { + /* wildcard selection: no key name */ + flb_sp_cmd_key_add(cmd, -1, NULL); + } + | + IDENTIFIER key_alias + { + flb_sp_cmd_key_add(cmd, -1, $1); + flb_free($1); + } + | + IDENTIFIER record_subkey key_alias + { + flb_sp_cmd_key_add(cmd, -1, $1); + flb_free($1); + } + | + COUNT '(' '*' ')' key_alias + { + flb_sp_cmd_key_add(cmd, $1, NULL); + } + | + COUNT '(' IDENTIFIER ')' key_alias + { + flb_sp_cmd_key_add(cmd, $1, $3); + flb_free($3); + } + | + COUNT '(' IDENTIFIER record_subkey ')' key_alias + { + flb_sp_cmd_key_add(cmd, $1, $3); + flb_free($3); + } + | + aggregate_func '(' IDENTIFIER ')' key_alias + { + flb_sp_cmd_key_add(cmd, $1, $3); + flb_free($3); + } + | + aggregate_func '(' IDENTIFIER record_subkey ')' key_alias + { + flb_sp_cmd_key_add(cmd, $1, $3); + flb_free($3); + } + | + TIMESERIES_FORECAST '(' IDENTIFIER ',' INTEGER ')' key_alias + { + flb_sp_cmd_timeseries_forecast(cmd, $1, $3, $5); + flb_free($3); + } + | + time_record_func '(' ')' key_alias + { + flb_sp_cmd_key_add(cmd, $1, NULL); + } + /* The value of these rules is the function code set by the lexer */ + aggregate_func: + AVG | SUM | MAX | MIN + time_record_func: + NOW | UNIX_TIMESTAMP | RECORD_TAG | RECORD_TIME + /* + * Optional alias. The IDENTIFIER buffer is handed to the command as-is + * and is not freed in this action. + */ + key_alias: + %empty + | + AS IDENTIFIER + { + flb_sp_cmd_alias_add(cmd, $2); + } + /* One or more ['name'] subscripts, appended to cmd->tmp_subkeys in order */ + record_subkey: '[' STRING ']' + { + flb_slist_add(cmd->tmp_subkeys, $2); + flb_free($2); + } + | + record_subkey record_subkey + source: FROM_STREAM IDENTIFIER + { + flb_sp_cmd_source(cmd, FLB_SP_STREAM, $2); + flb_free($2); + } + | + FROM_TAG STRING + { + flb_sp_cmd_source(cmd, FLB_SP_TAG, $2); + flb_free($2); + } + window: %empty + | + WINDOW window_spec + where: %empty + | + WHERE condition + { + flb_sp_cmd_condition_add(cmd, $2); + } + groupby: %empty + | + GROUP_BY gb_keys + limit: %empty + | + LIMIT INTEGER + { + flb_sp_cmd_limit_add(cmd, $2); + } + /* Window size and, for hopping windows, the advance step */ + window_spec: + TUMBLING '(' INTEGER time ')' + { + flb_sp_cmd_window(cmd, FLB_SP_WINDOW_TUMBLING, $3, $4, 0, 0); + } + | + HOPPING '(' INTEGER time ',' ADVANCE_BY INTEGER time ')' + { + flb_sp_cmd_window(cmd, FLB_SP_WINDOW_HOPPING, $3, $4, $7, $8); + } + 
/* WHERE expression: every action returns a node created through 'cmd' */ + condition: comparison + | + key + { + /* a bare key becomes an OR node with only a left operand */ + $$ = flb_sp_cmd_operation(cmd, $1, NULL, FLB_EXP_OR); + } + | + value + { + /* a bare value becomes an OR node with only a right operand */ + $$ = flb_sp_cmd_operation(cmd, NULL, $1, FLB_EXP_OR); + } + | + '(' condition ')' + { + $$ = flb_sp_cmd_operation(cmd, $2, NULL, FLB_EXP_PAR); + } + | + NOT condition + { + $$ = flb_sp_cmd_operation(cmd, $2, NULL, FLB_EXP_NOT); + } + | + condition AND condition + { + $$ = flb_sp_cmd_operation(cmd, $1, $3, FLB_EXP_AND); + } + | + condition OR condition + { + $$ = flb_sp_cmd_operation(cmd, $1, $3, FLB_EXP_OR); + } + comparison: + key IS null + { + $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_EQ); + } + | + key IS NOT null + { + /* IS NOT NULL is built as NOT (key = NULL) */ + $$ = flb_sp_cmd_operation(cmd, + flb_sp_cmd_comparison(cmd, $1, $4, FLB_EXP_EQ), + NULL, FLB_EXP_NOT); + } + | + record_func + { + /* a record function on its own is compared against boolean true */ + $$ = flb_sp_cmd_comparison(cmd, + $1, + flb_sp_cmd_condition_boolean(cmd, true), + FLB_EXP_EQ); + } + | + record_func '=' value + { + $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_EQ); + } + | + record_func NEQ value + { + /* inequality is built as NOT (a = b) */ + $$ = flb_sp_cmd_operation(cmd, + flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_EQ), + NULL, FLB_EXP_NOT) + ; + } + | + record_func LT value + { + $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_LT); + } + | + record_func LTE value + { + $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_LTE); + } + | + record_func GT value + { + $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_GT); + } + | + record_func GTE value + { + $$ = flb_sp_cmd_comparison(cmd, $1, $3, FLB_EXP_GTE); + } + record_func: key /* Similar to an identity function */ + | + RECORD '.' CONTAINS '(' key ')' + { + $$ = flb_sp_record_function_add(cmd, "contains", $5); + } + | + RECORD '.' 
TIME '(' ')' + { + $$ = flb_sp_record_function_add(cmd, "time", NULL); + } + /* Key reference in a condition; the identifier is copied, then freed here */ + key: IDENTIFIER + { + $$ = flb_sp_cmd_condition_key(cmd, $1); + flb_free($1); + } + | + IDENTIFIER record_subkey + { + $$ = flb_sp_cmd_condition_key(cmd, $1); + flb_free($1); + } + /* Literal values usable on the right-hand side of a comparison */ + value: INTEGER + { + $$ = flb_sp_cmd_condition_integer(cmd, $1); + } + | + FLOATING + { + $$ = flb_sp_cmd_condition_float(cmd, $1); + } + | + STRING + { + $$ = flb_sp_cmd_condition_string(cmd, $1); + flb_free($1); + } + | + BOOLTYPE + { + $$ = flb_sp_cmd_condition_boolean(cmd, $1); + } + null: NUL + { + $$ = flb_sp_cmd_condition_null(cmd); + } + /* Time unit keyword mapped to its FLB_SP_TIME_* constant */ + time: SECOND + { + $$ = FLB_SP_TIME_SECOND; + } + | + MINUTE + { + $$ = FLB_SP_TIME_MINUTE; + } + | + HOUR + { + $$ = FLB_SP_TIME_HOUR; + } + /* Comma-separated GROUP BY keys, each optionally followed by subkeys */ + gb_keys: gb_key + | + gb_key ',' gb_keys + gb_key: IDENTIFIER + { + flb_sp_cmd_gb_key_add(cmd, $1); + flb_free($1); + } + | + IDENTIFIER record_subkey + { + flb_sp_cmd_gb_key_add(cmd, $1); + flb_free($1); + } + ; |