summaryrefslogtreecommitdiffstats
path: root/src/s3select/include/s3select.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/s3select/include/s3select.h')
-rw-r--r--src/s3select/include/s3select.h3153
1 files changed, 3153 insertions, 0 deletions
diff --git a/src/s3select/include/s3select.h b/src/s3select/include/s3select.h
new file mode 100644
index 000000000..3ac111351
--- /dev/null
+++ b/src/s3select/include/s3select.h
@@ -0,0 +1,3153 @@
+#ifndef __S3SELECT__
+#define __S3SELECT__
+#define BOOST_SPIRIT_THREADSAFE
+#define CSV_IO_NO_THREAD
+
+#pragma once
+#define BOOST_BIND_GLOBAL_PLACEHOLDERS
+#include <boost/spirit/include/classic_core.hpp>
+#include <boost/algorithm/string.hpp>
+#include <iostream>
+#include <string>
+#include <list>
+#include <deque>
+#include "s3select_oper.h"
+#include "s3select_functions.h"
+#include "s3select_csv_parser.h"
+#include "s3select_json_parser.h"
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+#include <functional>
+
+#define _DEBUG_TERM {string token(a,b);std::cout << __FUNCTION__ << token << std::endl;}
+
+namespace s3selectEngine
+{
+
+/// AST builder
+
+class s3select_projections
+{
+
+private:
+ std::vector<base_statement*> m_projections;
+
+public:
+
+ std::vector<base_statement*>* get()
+ {
+ return &m_projections;
+ }
+
+};
+
+static s3select_reserved_word g_s3select_reserve_word;//read-only
+
+struct actionQ
+{
+// upon parser is accepting a token (lets say some number),
+// it push it into dedicated queue, later those tokens are poped out to build some "higher" contruct (lets say 1 + 2)
+// those containers are used only for parsing phase and not for runtime.
+
+ std::vector<mulldiv_operation::muldiv_t> muldivQ;
+ std::vector<addsub_operation::addsub_op_t> addsubQ;
+ std::vector<arithmetic_operand::cmp_t> arithmetic_compareQ;
+ std::vector<logical_operand::oplog_t> logical_compareQ;
+ std::vector<base_statement*> exprQ;
+ std::vector<base_statement*> funcQ;
+ std::vector<base_statement*> whenThenQ;
+ std::vector<base_statement*> inPredicateQ;
+ base_statement* inMainArg;
+ std::vector<std::string> dataTypeQ;
+ std::vector<std::string> trimTypeQ;
+ std::vector<std::string> datePartQ;
+ projection_alias alias_map;
+ std::string from_clause;
+ std::vector<std::string> json_from_clause;
+ bool limit_op;
+ unsigned long limit;
+ std::string column_prefix;
+ std::string table_alias;
+ s3select_projections projections;
+
+ bool projection_or_predicate_state; //true->projection false->predicate(where-clause statement)
+ std::vector<base_statement*> predicate_columns;
+ std::vector<base_statement*> projections_columns;
+ base_statement* first_when_then_expr;
+
+ std::string json_array_name; // _1.a[ ] json_array_name = "a"; upon parser is scanning a correct json-path; json_array_name will contain the array name.
+ std::string json_object_name; // _1.b json_object_name = "b" ; upon parser is scanning a correct json-path; json_object_name will contain the object name.
+ std::deque<size_t> json_array_index_number; // _1.a.c[ some integer number >=0 ]; upon parser is scanning a correct json-path; json_array_index_number will contain the array index.
+ // or in the case of multidimensional contain seiries of index number
+
+ json_variable_access json_var_md;
+
+ std::vector<std::pair<json_variable_access*,size_t>> json_statement_variables_match_expression;//contains all statement variables and their search-expression for locating the correct values in input document
+
+ actionQ(): inMainArg(0),from_clause("##"),limit_op(false),column_prefix("##"),table_alias("##"),projection_or_predicate_state(true),first_when_then_expr(nullptr){}
+
+ std::map<const void*,std::vector<const char*> *> x_map;
+
+ ~actionQ()
+ {
+ for(auto m : x_map)
+ delete m.second;
+ }
+
+ bool is_already_scanned(const void *th,const char *a)
+ {
+ //purpose: caller get indication in the case a specific builder is scan more than once the same text(pointer)
+ auto t = x_map.find(th);
+
+ if(t == x_map.end())
+ {
+ auto v = new std::vector<const char*>;
+ x_map.insert(std::pair<const void*,std::vector<const char*> *>(th,v));
+ v->push_back(a);
+ }
+ else
+ {
+ for(auto& c : *(t->second))
+ {
+ if( strcmp(c,a) == 0)
+ return true;
+ }
+ t->second->push_back(a);
+ }
+ return false;
+ }
+
+};
+
+class s3select;
+
+struct base_ast_builder
+{
+ void operator()(s3select* self, const char* a, const char* b) const;
+
+ virtual void builder(s3select* self, const char* a, const char* b) const = 0;
+
+ virtual ~base_ast_builder() = default;
+};
+
+struct push_from_clause : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_from_clause g_push_from_clause;
+
+struct push_json_from_clause : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_json_from_clause g_push_json_from_clause;
+
+struct push_limit_clause : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_limit_clause g_push_limit_clause;
+
+struct push_number : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_number g_push_number;
+
+struct push_float_number : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_float_number g_push_float_number;
+
+struct push_string : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_string g_push_string;
+
+struct push_variable : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_variable g_push_variable;
+
+struct push_json_variable : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_json_variable g_push_json_variable;
+
+/////////////////////////arithmetic unit /////////////////
+struct push_addsub : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_addsub g_push_addsub;
+
+struct push_mulop : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_mulop g_push_mulop;
+
+struct push_addsub_binop : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_addsub_binop g_push_addsub_binop;
+
+struct push_mulldiv_binop : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_mulldiv_binop g_push_mulldiv_binop;
+
+struct push_function_arg : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_function_arg g_push_function_arg;
+
+struct push_function_name : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_function_name g_push_function_name;
+
+struct push_function_expr : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_function_expr g_push_function_expr;
+
+struct push_cast_expr : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_cast_expr g_push_cast_expr;
+
+struct push_cast_decimal_expr : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_cast_decimal_expr g_push_cast_decimal_expr;
+
+struct push_decimal_operator : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_decimal_operator g_push_decimal_operator;
+
+struct push_data_type : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_data_type g_push_data_type;
+
+////////////////////// logical unit ////////////////////////
+
+struct push_compare_operator : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+
+};
+static push_compare_operator g_push_compare_operator;
+
+struct push_logical_operator : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+
+};
+static push_logical_operator g_push_logical_operator;
+
+struct push_arithmetic_predicate : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+
+};
+static push_arithmetic_predicate g_push_arithmetic_predicate;
+
+struct push_logical_predicate : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_logical_predicate g_push_logical_predicate;
+
+struct push_negation : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_negation g_push_negation;
+
+struct push_column_pos : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_column_pos g_push_column_pos;
+
+struct push_projection : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_projection g_push_projection;
+
+struct push_alias_projection : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_alias_projection g_push_alias_projection;
+
+struct push_between_filter : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_between_filter g_push_between_filter;
+
+struct push_not_between_filter : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_not_between_filter g_push_not_between_filter;
+
+struct push_in_predicate : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_in_predicate g_push_in_predicate;
+
+struct push_in_predicate_arguments : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_in_predicate_arguments g_push_in_predicate_arguments;
+
+struct push_in_predicate_first_arg : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_in_predicate_first_arg g_push_in_predicate_first_arg;
+
+struct push_like_predicate_escape : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_like_predicate_escape g_push_like_predicate_escape;
+
+struct push_like_predicate_no_escape : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_like_predicate_no_escape g_push_like_predicate_no_escape;
+
+struct push_is_null_predicate : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_is_null_predicate g_push_is_null_predicate;
+
+struct push_case_when_else : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_case_when_else g_push_case_when_else;
+
+struct push_when_condition_then : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_when_condition_then g_push_when_condition_then;
+
+struct push_when_value_then : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_when_value_then g_push_when_value_then;
+
+struct push_case_value_when_value_else : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_case_value_when_value_else g_push_case_value_when_value_else;
+
+struct push_substr_from : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_substr_from g_push_substr_from;
+
+struct push_substr_from_for : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_substr_from_for g_push_substr_from_for;
+
+struct push_trim_type : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_trim_type g_push_trim_type;
+
+struct push_trim_whitespace_both : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_trim_whitespace_both g_push_trim_whitespace_both;
+
+struct push_trim_expr_one_side_whitespace : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_trim_expr_one_side_whitespace g_push_trim_expr_one_side_whitespace;
+
+struct push_trim_expr_anychar_anyside : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_trim_expr_anychar_anyside g_push_trim_expr_anychar_anyside;
+
+struct push_datediff : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_datediff g_push_datediff;
+
+struct push_dateadd : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_dateadd g_push_dateadd;
+
+struct push_extract : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_extract g_push_extract;
+
+struct push_date_part : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_date_part g_push_date_part;
+
+struct push_time_to_string_constant : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_time_to_string_constant g_push_time_to_string_constant;
+
+struct push_time_to_string_dynamic : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_time_to_string_dynamic g_push_time_to_string_dynamic;
+
+struct push_string_to_time_constant : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_string_to_time_constant g_push_string_to_time_constant;
+
+struct push_array_number : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_array_number g_push_array_number;
+
+struct push_json_array_name : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_json_array_name g_push_json_array_name;
+
+struct push_json_object : public base_ast_builder
+{
+ void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_json_object g_push_json_object;
+
+struct s3select : public bsc::grammar<s3select>
+{
+private:
+
+ actionQ m_actionQ;
+ scratch_area m_sca;
+ s3select_functions m_s3select_functions;
+ std::string error_description;
+ s3select_allocator m_s3select_allocator;
+ bool aggr_flow = false;
+ bool m_json_query = false;
+ std::set<base_statement*> m_ast_nodes_to_delete;
+ base_function* m_to_timestamp_for_clean = nullptr;
+
+#define BOOST_BIND_ACTION( push_name ) boost::bind( &push_name::operator(), g_ ## push_name, const_cast<s3select*>(&self), _1, _2)
+
+public:
+
+ std::set<base_statement*>& get_ast_nodes_to_delete()
+ {
+ return m_ast_nodes_to_delete;
+ }
+
+ base_function* & get_to_timestamp_for_clean()
+ {
+ return m_to_timestamp_for_clean;
+ }
+
+ actionQ* getAction()
+ {
+ return &m_actionQ;
+ }
+
+ s3select_allocator* getAllocator()
+ {
+ return &m_s3select_allocator;
+ }
+
+ s3select_functions* getS3F()
+ {
+ return &m_s3select_functions;
+ }
+
+ int semantic()
+ {
+ for (const auto &e : get_projections_list())
+ {
+ e->resolve_node();
+ //upon validate there is no aggregation-function nested calls, it validates legit aggregation call.
+ if (e->is_nested_aggregate(aggr_flow))
+ {
+ error_description = "nested aggregation function is illegal i.e. sum(...sum ...)";
+ throw base_s3select_exception(error_description, base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ e->push_for_cleanup(m_ast_nodes_to_delete);
+ }
+
+ if(get_filter())
+ get_filter()->push_for_cleanup(m_ast_nodes_to_delete);
+
+ if (aggr_flow == true)
+ {// atleast one projection column contain aggregation function
+ for (const auto &e : get_projections_list())
+ {
+ auto aggregate_expr = e->get_aggregate();
+
+ if (aggregate_expr)
+ {
+ //per each column, subtree is mark to skip except for the aggregation function subtree.
+ //for an example: substring( ... , sum() , count() ) :: the substring is mark to skip execution, while sum and count not.
+ e->set_skip_non_aggregate(true);
+ e->mark_aggreagtion_subtree_to_execute();
+ }
+ else
+ {
+ //in case projection column is not aggregate, the projection column must *not* contain reference to columns.
+ if(e->is_column_reference())
+ {
+ error_description = "illegal query; projection contains aggregation function is not allowed with projection contains column reference";
+ throw base_s3select_exception(error_description, base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+ }
+
+ }
+ }
+
+ m_json_query = (m_actionQ.json_from_clause.size() != 0);
+
+ return 0;
+ }
+
+ int parse_query(const char* input_query)
+ {
+ if(get_projections_list().empty() == false)
+ {
+ return 0; //already parsed
+ }
+
+
+ error_description.clear();
+ aggr_flow = false;
+
+ try
+ {
+ bsc::parse_info<> info = bsc::parse(input_query, *this, bsc::space_p);
+ auto query_parse_position = info.stop;
+
+ if (!info.full)
+ {
+ error_description = std::string("failure -->") + query_parse_position + std::string("<---");
+ return -1;
+ }
+
+ semantic();
+ }
+ catch (base_s3select_exception& e)
+ {
+ error_description.assign(e.what());
+ if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
+ {
+ return -1;
+ }
+ }
+
+ return 0;
+ }
+
+ std::string get_error_description()
+ {
+ return error_description;
+ }
+
+ s3select()
+ {
+ m_s3select_functions.setAllocator(&m_s3select_allocator);
+ m_s3select_functions.set_AST_nodes_for_cleanup(&m_ast_nodes_to_delete);
+ }
+
+ bool is_semantic()//TBD traverse and validate semantics per all nodes
+ {
+ base_statement* cond = m_actionQ.exprQ.back();
+
+ return cond->semantic();
+ }
+
+ std::string get_from_clause() const
+ {
+ return m_actionQ.from_clause;
+ }
+
+ bool is_limit()
+ {
+ return m_actionQ.limit_op;
+ }
+
+ unsigned long get_limit()
+ {
+ return m_actionQ.limit;
+ }
+
+ void load_schema(std::vector< std::string>& scm)
+ {
+ int i = 0;
+ for (auto& c : scm)
+ {
+ m_sca.set_column_pos(c.c_str(), i++);
+ }
+ }
+
+ base_statement* get_filter()
+ {
+ if(m_actionQ.exprQ.empty())
+ {
+ return nullptr;
+ }
+
+ return m_actionQ.exprQ.back();
+ }
+
+ std::vector<base_statement*> get_projections_list()
+ {
+ return *m_actionQ.projections.get(); //TODO return COPY(?) or to return evalaution results (list of class value{}) / return reference(?)
+ }
+
+ scratch_area* get_scratch_area()
+ {
+ return &m_sca;
+ }
+
+ projection_alias* get_aliases()
+ {
+ return &m_actionQ.alias_map;
+ }
+
+ std::vector<std::pair<json_variable_access*,size_t>>& get_json_variables_access()
+ {
+ return m_actionQ.json_statement_variables_match_expression;
+ }
+
+ bool is_aggregate_query() const
+ {
+ return aggr_flow == true;
+ }
+
+ bool is_json_query()
+ {
+ return m_json_query;
+ }
+
+ ~s3select()
+ {
+ for(auto it : m_ast_nodes_to_delete)
+ {
+ if (it->is_function())
+ {//upon its a function, call to the implementation destructor
+ if(dynamic_cast<__function*>(it)->impl())
+ dynamic_cast<__function*>(it)->impl()->dtor();
+ }
+ //calling to destrcutor of class-function itself, or non-function destructor
+ it->dtor();
+ }
+
+ for(auto x: m_actionQ.json_statement_variables_match_expression)
+ {//the json_variable_access object is allocated by S3SELECT_NEW. this object contains stl-vector that should be free
+ x.first->~json_variable_access();
+ }
+ if(m_to_timestamp_for_clean)
+ {
+ m_to_timestamp_for_clean->dtor();
+ }
+ }
+
+#define JSON_ROOT_OBJECT "s3object[*]"
+
+//the input is converted to lower case
+#define S3SELECT_KW( reserve_word ) bsc::as_lower_d[ reserve_word ]
+
+ template <typename ScannerT>
+ struct definition
+ {
+ explicit definition(s3select const& self)
+ {
+ ///// s3select syntax rules and actions for building AST
+
+ select_expr = select_expr_base_ >> bsc::lexeme_d[ *(bsc::str_p(" ")|bsc::str_p(";")) ];
+
+ select_expr_base_ = select_expr_base >> S3SELECT_KW("limit") >> (limit_number)[BOOST_BIND_ACTION(push_limit_clause)] | select_expr_base;
+
+ limit_number = (+bsc::digit_p);
+
+ select_expr_base = S3SELECT_KW("select") >> projections >> S3SELECT_KW("from") >> (from_expression)[BOOST_BIND_ACTION(push_from_clause)] >> !where_clause ;
+
+ projections = projection_expression >> *( ',' >> projection_expression) ;
+
+ projection_expression = (arithmetic_expression >> S3SELECT_KW("as") >> alias_name)[BOOST_BIND_ACTION(push_alias_projection)] |
+ (arithmetic_expression)[BOOST_BIND_ACTION(push_projection)] |
+ (arithmetic_predicate >> S3SELECT_KW("as") >> alias_name)[BOOST_BIND_ACTION(push_alias_projection)] |
+ (arithmetic_predicate)[BOOST_BIND_ACTION(push_projection)] ;
+
+ alias_name = bsc::lexeme_d[(+bsc::alpha_p >> *bsc::digit_p)] ;
+
+ when_case_else_projection = (S3SELECT_KW("case") >> (+when_stmt) >> S3SELECT_KW("else") >> arithmetic_expression >> S3SELECT_KW("end")) [BOOST_BIND_ACTION(push_case_when_else)];
+
+ when_stmt = (S3SELECT_KW("when") >> condition_expression >> S3SELECT_KW("then") >> arithmetic_expression)[BOOST_BIND_ACTION(push_when_condition_then)];
+
+ when_case_value_when = (S3SELECT_KW("case") >> arithmetic_expression >>
+ (+when_value_then) >> S3SELECT_KW("else") >> arithmetic_expression >> S3SELECT_KW("end")) [BOOST_BIND_ACTION(push_case_value_when_value_else)];
+
+ when_value_then = (S3SELECT_KW("when") >> arithmetic_expression >> S3SELECT_KW("then") >> arithmetic_expression)[BOOST_BIND_ACTION(push_when_value_then)];
+
+ from_expression = (s3_object >> variable_name ) | s3_object;
+
+ //the stdin and object_path are for debug purposes(not part of the specs)
+ s3_object = json_s3_object | S3SELECT_KW("stdin") | S3SELECT_KW("s3object") | object_path;
+
+ json_s3_object = ((S3SELECT_KW(JSON_ROOT_OBJECT)) >> *(bsc::str_p(".") >> json_path_element))[BOOST_BIND_ACTION(push_json_from_clause)];
+
+ json_path_element = bsc::lexeme_d[+( bsc::alnum_p | bsc::str_p("_")) ];
+
+ object_path = "/" >> *( fs_type >> "/") >> fs_type;
+
+ fs_type = bsc::lexeme_d[+( bsc::alnum_p | bsc::str_p(".") | bsc::str_p("_")) ];
+
+ where_clause = S3SELECT_KW("where") >> condition_expression;
+
+ condition_expression = arithmetic_predicate;
+
+ arithmetic_predicate = (S3SELECT_KW("not") >> logical_predicate)[BOOST_BIND_ACTION(push_negation)] | logical_predicate;
+
+ logical_predicate = (logical_and) >> *(or_op[BOOST_BIND_ACTION(push_logical_operator)] >> (logical_and)[BOOST_BIND_ACTION(push_logical_predicate)]);
+
+ logical_and = (cmp_operand) >> *(and_op[BOOST_BIND_ACTION(push_logical_operator)] >> (cmp_operand)[BOOST_BIND_ACTION(push_logical_predicate)]);
+
+ cmp_operand = special_predicates | (factor) >> *(arith_cmp[BOOST_BIND_ACTION(push_compare_operator)] >> (factor)[BOOST_BIND_ACTION(push_arithmetic_predicate)]);
+
+ special_predicates = (is_null) | (is_not_null) | (between_predicate) | (not_between) | (in_predicate) | (like_predicate);
+
+ is_null = ((factor) >> S3SELECT_KW("is") >> S3SELECT_KW("null"))[BOOST_BIND_ACTION(push_is_null_predicate)];
+
+ is_not_null = ((factor) >> S3SELECT_KW("is") >> S3SELECT_KW("not") >> S3SELECT_KW("null"))[BOOST_BIND_ACTION(push_is_null_predicate)];
+
+ between_predicate = (arithmetic_expression >> S3SELECT_KW("between") >> arithmetic_expression >> S3SELECT_KW("and") >> arithmetic_expression)[BOOST_BIND_ACTION(push_between_filter)];
+
+ not_between = (arithmetic_expression >> S3SELECT_KW("not") >> S3SELECT_KW("between") >> arithmetic_expression >> S3SELECT_KW("and") >> arithmetic_expression)[BOOST_BIND_ACTION(push_not_between_filter)];
+
+ in_predicate = (arithmetic_expression >> S3SELECT_KW("in") >> '(' >> arithmetic_expression[BOOST_BIND_ACTION(push_in_predicate_first_arg)] >> *(',' >> arithmetic_expression[BOOST_BIND_ACTION(push_in_predicate_arguments)]) >> ')')[BOOST_BIND_ACTION(push_in_predicate)];
+
+ like_predicate = (like_predicate_escape) |(like_predicate_no_escape);
+
+ like_predicate_no_escape = (arithmetic_expression >> S3SELECT_KW("like") >> arithmetic_expression)[BOOST_BIND_ACTION(push_like_predicate_no_escape)];
+
+ like_predicate_escape = (arithmetic_expression >> S3SELECT_KW("like") >> arithmetic_expression >> S3SELECT_KW("escape") >> arithmetic_expression)[BOOST_BIND_ACTION(push_like_predicate_escape)];
+
+ factor = arithmetic_expression | ( '(' >> arithmetic_predicate >> ')' ) ;
+
+ arithmetic_expression = (addsub_operand >> *(addsubop_operator[BOOST_BIND_ACTION(push_addsub)] >> addsub_operand[BOOST_BIND_ACTION(push_addsub_binop)] ));
+
+ addsub_operand = (mulldiv_operand >> *(muldiv_operator[BOOST_BIND_ACTION(push_mulop)] >> mulldiv_operand[BOOST_BIND_ACTION(push_mulldiv_binop)] ));// this non-terminal gives precedense to mull/div
+
+ mulldiv_operand = arithmetic_argument | ('(' >> (arithmetic_expression) >> ')') ;
+
+ list_of_function_arguments = (arithmetic_expression)[BOOST_BIND_ACTION(push_function_arg)] >> *(',' >> (arithmetic_expression)[BOOST_BIND_ACTION(push_function_arg)]);
+
+ reserved_function_names = (S3SELECT_KW("when")|S3SELECT_KW("case")|S3SELECT_KW("then")|S3SELECT_KW("not")|S3SELECT_KW("limit")|S3SELECT_KW("where")|S3SELECT_KW("in")|S3SELECT_KW("between") |
+ S3SELECT_KW("like")|S3SELECT_KW("is") );
+
+ function = ( ((variable_name) >> '(' )[BOOST_BIND_ACTION(push_function_name)] >> !list_of_function_arguments >> ')')[BOOST_BIND_ACTION(push_function_expr)];
+
+ arithmetic_argument = (float_number)[BOOST_BIND_ACTION(push_float_number)] | (number)[BOOST_BIND_ACTION(push_number)] | (json_variable_name)[BOOST_BIND_ACTION(push_json_variable)] |
+ (column_pos)[BOOST_BIND_ACTION(push_column_pos)] |
+ (string)[BOOST_BIND_ACTION(push_string)] | (backtick_string) | (datediff) | (dateadd) | (extract) | (time_to_string_constant) | (time_to_string_dynamic) |
+ (cast) | (substr) | (trim) | (when_case_value_when) | (when_case_else_projection) |
+ (function) | (variable)[BOOST_BIND_ACTION(push_variable)]; //function is pushed by right-term
+
+ cast = cast_as_data_type | cast_as_decimal_expr ;
+
+ cast_as_data_type = (S3SELECT_KW("cast") >> '(' >> factor >> S3SELECT_KW("as") >> (data_type) >> ')') [BOOST_BIND_ACTION(push_cast_expr)];
+
+ cast_as_decimal_expr = (S3SELECT_KW("cast") >> '(' >> factor >> S3SELECT_KW("as") >> decimal_operator >> ')') [BOOST_BIND_ACTION(push_cast_decimal_expr)];
+
+ decimal_operator = (S3SELECT_KW("decimal") >> '(' >> (number)[BOOST_BIND_ACTION(push_number)] >> ',' >> (number)[BOOST_BIND_ACTION(push_number)] >> ')')
+ [BOOST_BIND_ACTION(push_decimal_operator)];
+
+ data_type = (S3SELECT_KW("int") | S3SELECT_KW("float") | S3SELECT_KW("string") | S3SELECT_KW("timestamp") | S3SELECT_KW("bool"))[BOOST_BIND_ACTION(push_data_type)];
+
+ substr = (substr_from) | (substr_from_for);
+
+ substr_from = (S3SELECT_KW("substring") >> '(' >> (arithmetic_expression >> S3SELECT_KW("from") >> arithmetic_expression) >> ')') [BOOST_BIND_ACTION(push_substr_from)];
+
+ substr_from_for = (S3SELECT_KW("substring") >> '(' >> (arithmetic_expression >> S3SELECT_KW("from") >> arithmetic_expression >> S3SELECT_KW("for") >> arithmetic_expression) >> ')') [BOOST_BIND_ACTION(push_substr_from_for)];
+
+ trim = (trim_whitespace_both) | (trim_one_side_whitespace) | (trim_anychar_anyside);
+
+ trim_one_side_whitespace = (S3SELECT_KW("trim") >> '(' >> (trim_type)[BOOST_BIND_ACTION(push_trim_type)] >> arithmetic_expression >> ')') [BOOST_BIND_ACTION(push_trim_expr_one_side_whitespace)];
+
+ trim_whitespace_both = (S3SELECT_KW("trim") >> '(' >> arithmetic_expression >> ')') [BOOST_BIND_ACTION(push_trim_whitespace_both)];
+
+ trim_anychar_anyside = (S3SELECT_KW("trim") >> '(' >> ((trim_remove_type)[BOOST_BIND_ACTION(push_trim_type)] >> arithmetic_expression >> S3SELECT_KW("from") >> arithmetic_expression) >> ')') [BOOST_BIND_ACTION(push_trim_expr_anychar_anyside)];
+
+ trim_type = ((S3SELECT_KW("leading") >> S3SELECT_KW("from")) | ( S3SELECT_KW("trailing") >> S3SELECT_KW("from")) | (S3SELECT_KW("both") >> S3SELECT_KW("from")) | S3SELECT_KW("from") );
+
+ trim_remove_type = (S3SELECT_KW("leading") | S3SELECT_KW("trailing") | S3SELECT_KW("both") );
+
+ datediff = (S3SELECT_KW("date_diff") >> '(' >> date_part >> ',' >> arithmetic_expression >> ',' >> arithmetic_expression >> ')') [BOOST_BIND_ACTION(push_datediff)];
+
+ dateadd = (S3SELECT_KW("date_add") >> '(' >> date_part >> ',' >> arithmetic_expression >> ',' >> arithmetic_expression >> ')') [BOOST_BIND_ACTION(push_dateadd)];
+
+ extract = (S3SELECT_KW("extract") >> '(' >> (date_part_extract)[BOOST_BIND_ACTION(push_date_part)] >> S3SELECT_KW("from") >> arithmetic_expression >> ')') [BOOST_BIND_ACTION(push_extract)];
+
+ date_part = (S3SELECT_KW("year") | S3SELECT_KW("month") | S3SELECT_KW("day") | S3SELECT_KW("hour") | S3SELECT_KW("minute") | S3SELECT_KW("second")) [BOOST_BIND_ACTION(push_date_part)];
+
+ date_part_extract = ((date_part) | S3SELECT_KW("week") | S3SELECT_KW("timezone_hour") | S3SELECT_KW("timezone_minute"));
+
+ time_to_string_constant = (S3SELECT_KW("to_string") >> '(' >> arithmetic_expression >> ',' >> (string)[BOOST_BIND_ACTION(push_string)] >> ')') [BOOST_BIND_ACTION(push_time_to_string_constant)];
+
+ time_to_string_dynamic = (S3SELECT_KW("to_string") >> '(' >> arithmetic_expression >> ',' >> arithmetic_expression >> ')') [BOOST_BIND_ACTION(push_time_to_string_dynamic)];
+
+ number = bsc::int_p;
+
+ float_number = bsc::real_p;
+
+ string = (bsc::str_p("\"") >> *( bsc::anychar_p - bsc::str_p("\"") ) >> bsc::str_p("\"")) | (bsc::str_p("\'") >> *( bsc::anychar_p - bsc::str_p("\'") ) >> bsc::str_p("\'"));
+
+ backtick_string = (bsc::str_p("`") >> *( bsc::anychar_p - bsc::str_p("`") ) >> bsc::str_p("`")) [BOOST_BIND_ACTION(push_string_to_time_constant)];
+
+ column_pos = (variable_name >> "." >> column_pos_name) | column_pos_name; //TODO what about space
+
+ column_pos_name = ('_'>>+(bsc::digit_p) ) | '*' ;
+
+ muldiv_operator = bsc::str_p("*") | bsc::str_p("/") | bsc::str_p("^") | bsc::str_p("%");// got precedense
+
+ addsubop_operator = bsc::str_p("+") | bsc::str_p("-");
+
+ arith_cmp = bsc::str_p("<>") | bsc::str_p(">=") | bsc::str_p("<=") | bsc::str_p("=") | bsc::str_p("<") | bsc::str_p(">") | bsc::str_p("!=");
+
+ and_op = S3SELECT_KW("and");
+
+ or_op = S3SELECT_KW("or");
+
+ variable_name = bsc::lexeme_d[(+bsc::alpha_p >> *( bsc::alpha_p | bsc::digit_p | '_') ) - reserved_function_names];
+
+ variable = (variable_name >> "." >> variable_name) | variable_name;
+
+ json_variable_name = bsc::str_p("_1") >> +("." >> (json_array | json_object) );
+
+ json_object = (variable_name)[BOOST_BIND_ACTION(push_json_object)];
+
+ json_array = (variable_name >> +(bsc::str_p("[") >> number[BOOST_BIND_ACTION(push_array_number)] >> bsc::str_p("]")) )[BOOST_BIND_ACTION(push_json_array_name)];
+ }
+
+
+ bsc::rule<ScannerT> cast, data_type, variable, json_variable_name, variable_name, select_expr, select_expr_base, select_expr_base_, s3_object, where_clause, limit_number;
+ bsc::rule<ScannerT> number, float_number, string, backtick_string, from_expression, cast_as_data_type, cast_as_decimal_expr, decimal_operator;
+ bsc::rule<ScannerT> cmp_operand, arith_cmp, condition_expression, arithmetic_predicate, logical_predicate, factor;
+ bsc::rule<ScannerT> trim, trim_whitespace_both, trim_one_side_whitespace, trim_anychar_anyside, trim_type, trim_remove_type, substr, substr_from, substr_from_for;
+ bsc::rule<ScannerT> datediff, dateadd, extract, date_part, date_part_extract, time_to_string_constant, time_to_string_dynamic;
+ bsc::rule<ScannerT> special_predicates, between_predicate, not_between, in_predicate, like_predicate, like_predicate_escape, like_predicate_no_escape, is_null, is_not_null;
+ bsc::rule<ScannerT> muldiv_operator, addsubop_operator, function, arithmetic_expression, addsub_operand, list_of_function_arguments, arithmetic_argument, mulldiv_operand, reserved_function_names;
+ bsc::rule<ScannerT> fs_type, object_path,json_s3_object,json_path_element,json_object,json_array;
+ bsc::rule<ScannerT> projections, projection_expression, alias_name, column_pos,column_pos_name;
+ bsc::rule<ScannerT> when_case_else_projection, when_case_value_when, when_stmt, when_value_then;
+ bsc::rule<ScannerT> logical_and,and_op,or_op;
+ bsc::rule<ScannerT> const& start() const
+ {
+ return select_expr ;
+ }
+ };
+};
+
+void base_ast_builder::operator()(s3select *self, const char *a, const char *b) const
+{
+ //the purpose of the following procedure is to bypass boost::spirit rescan (calling to bind-action more than once per the same text)
+ //which cause wrong AST creation (and later false execution).
+ if (self->getAction()->is_already_scanned((void *)(this), const_cast<char *>(a)))
+ return;
+
+ builder(self, a, b);
+}
+
+void push_from_clause::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b),table_name,alias_name;
+
+ //should search for generic space
+ if(token.find(' ') != std::string::npos)
+ {
+ size_t pos = token.find(' ');
+ table_name = token.substr(0,pos);
+
+ pos = token.rfind(' ');
+ alias_name = token.substr(pos+1,token.size());
+
+ self->getAction()->table_alias = alias_name;
+
+ if(self->getAction()->column_prefix != "##" && self->getAction()->table_alias != self->getAction()->column_prefix)
+ {
+ throw base_s3select_exception(std::string("query can not contain more then a single table-alias"), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ token = table_name;
+ }
+
+ self->getAction()->from_clause = token;
+
+ self->getAction()->exprQ.clear();
+}
+
+void push_json_from_clause::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b),table_name,alias_name;
+
+ //TODO handle the star-operation ('*') in from-clause. build the parameters for json-reader search-api's.
+ std::vector<std::string> variable_key_path;
+ const char* delimiter = ".";
+ auto pos = token.find(delimiter);
+
+ if(pos != std::string::npos)
+ {
+ token = token.substr(strlen(JSON_ROOT_OBJECT)+1,token.size());
+ pos = token.find(delimiter);
+ do
+ {
+ variable_key_path.push_back(token.substr(0,pos));
+ if(pos != std::string::npos)
+ token = token.substr(pos+1,token.size());
+ else
+ token = "";
+ pos = token.find(delimiter);
+ }while(token.size());
+ }
+ else
+ {
+ variable_key_path.push_back(JSON_ROOT_OBJECT);
+ }
+
+ self->getAction()->json_from_clause = variable_key_path;
+}
+
+void push_limit_clause::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ self->getAction()->limit_op = true;
+ try
+ {
+ self->getAction()->limit = std::stoul(token);
+ }
+ catch(std::invalid_argument& e)
+ {
+ throw base_s3select_exception(std::string("Invalid argument "), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+ catch(std::out_of_range& e)
+ {
+ throw base_s3select_exception(std::string("Out of range "), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+}
+
+void push_number::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ variable* v = S3SELECT_NEW(self, variable, atoi(token.c_str()));
+
+ self->getAction()->exprQ.push_back(v);
+}
+
+void push_float_number::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ //the parser for float(real_p) is accepting also integers, thus "blocking" integer acceptence and all are float.
+ bsc::parse_info<> info = bsc::parse(token.c_str(), bsc::int_p, bsc::space_p);
+
+ if (!info.full)
+ {
+ char* perr;
+ double d = strtod(token.c_str(), &perr);
+ variable* v = S3SELECT_NEW(self, variable, d);
+
+ self->getAction()->exprQ.push_back(v);
+ }
+ else
+ {
+ variable* v = S3SELECT_NEW(self, variable, atoi(token.c_str()));
+
+ self->getAction()->exprQ.push_back(v);
+ }
+}
+
+void push_string::builder(s3select* self, const char* a, const char* b) const
+{
+ a++;
+ b--; // remove double quotes
+ std::string token(a, b);
+
+ variable* v = S3SELECT_NEW(self, variable, token, variable::var_t::COLUMN_VALUE);
+
+ self->getAction()->exprQ.push_back(v);
+}
+
+void push_variable::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ variable* v = nullptr;
+
+ if (g_s3select_reserve_word.is_reserved_word(token))
+ {
+ if (g_s3select_reserve_word.get_reserved_word(token) == s3select_reserved_word::reserve_word_en_t::S3S_NULL)
+ {
+ v = S3SELECT_NEW(self, variable, s3select_reserved_word::reserve_word_en_t::S3S_NULL);
+ }
+ else if (g_s3select_reserve_word.get_reserved_word(token) == s3select_reserved_word::reserve_word_en_t::S3S_NAN)
+ {
+ v = S3SELECT_NEW(self, variable, s3select_reserved_word::reserve_word_en_t::S3S_NAN);
+ }
+ else if (g_s3select_reserve_word.get_reserved_word(token) == s3select_reserved_word::reserve_word_en_t::S3S_FALSE)
+ {
+ v = S3SELECT_NEW(self, variable, s3select_reserved_word::reserve_word_en_t::S3S_FALSE);
+ }
+ else if (g_s3select_reserve_word.get_reserved_word(token) == s3select_reserved_word::reserve_word_en_t::S3S_TRUE)
+ {
+ v = S3SELECT_NEW(self, variable, s3select_reserved_word::reserve_word_en_t::S3S_TRUE);
+ }
+ else
+ {
+ v = S3SELECT_NEW(self, variable, s3select_reserved_word::reserve_word_en_t::NA);
+ }
+
+ }
+ else
+ {
+ size_t pos = token.find('.');
+ std::string alias_name;
+ if(pos != std::string::npos)
+ {
+ alias_name = token.substr(0,pos);
+ pos ++;
+ token = token.substr(pos,token.size());
+
+ if(self->getAction()->column_prefix != "##" && alias_name != self->getAction()->column_prefix)
+ {
+ throw base_s3select_exception(std::string("query can not contain more then a single table-alias"), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ self->getAction()->column_prefix = alias_name;
+ }
+ v = S3SELECT_NEW(self, variable, token);
+ }
+
+ self->getAction()->exprQ.push_back(v);
+}
+
+void push_json_variable::builder(s3select* self, const char* a, const char* b) const
+{//purpose: handle the use case of json-variable structure (_1.a.b.c)
+
+ std::string token(a, b);
+ std::vector<std::string> variable_key_path;
+
+ //the following flow determine the index per json variable reside on statement.
+ //per each discovered json_variable, it search the json-variables-vector whether it already exists.
+ //in case it is exist, it uses its index (position in vector)
+ //in case it's not exist its pushes the variable into vector.
+ //the json-index is used upon updating the scratch area or searching for a specific json-variable value.
+
+ size_t json_index=self->getAction()->json_statement_variables_match_expression.size();
+ variable* v = nullptr;
+ json_variable_access* ja = S3SELECT_NEW(self, json_variable_access);
+ *ja = self->getAction()->json_var_md;
+ self->getAction()->json_statement_variables_match_expression.push_back(std::pair<json_variable_access*,size_t>(ja,json_index));
+
+ v = S3SELECT_NEW(self, variable, token, variable::var_t::JSON_VARIABLE, json_index);
+ self->getAction()->exprQ.push_back(v);
+
+ self->getAction()->json_var_md.clear();
+}
+
+void push_array_number::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+ //DEBUG - TEMP std::cout << "push_array_number " << token << std::endl;
+
+ self->getAction()->json_array_index_number.push_back(std::stoll(token.c_str()));
+}
+
+void push_json_array_name::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+ size_t found = token.find("[");
+ std::string array_name = token.substr(0,found);
+
+ //DEBUG - TEMP std::cout << "push_json_array_name " << array_name << std::endl;
+
+ //remove white-space
+ array_name.erase(std::remove_if(array_name.begin(),
+ array_name.end(),
+ [](unsigned char x){return std::isspace(x);}),
+ array_name.end());
+
+ std::vector<std::string> json_path;
+ std::vector<std::string> empty = {};
+ json_path.push_back(array_name);
+
+ self->getAction()->json_var_md.push_variable_state(json_path, -1);//pushing the array-name, {-1} means, search for object-name
+
+ while(self->getAction()->json_array_index_number.size())
+ {
+ self->getAction()->json_var_md.push_variable_state(empty, self->getAction()->json_array_index_number.front());//pushing empty and number>=0, means array-access
+ self->getAction()->json_array_index_number.pop_front();
+ }
+}
+
+void push_json_object::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ //DEBUG - TEMP std::cout << "push_json_object " << token << std::endl;
+
+ self->getAction()->json_object_name = token;
+ std::vector<std::string> json_path;
+ json_path.push_back(token);
+
+ self->getAction()->json_var_md.push_variable_state(json_path, -1);
+}
+
+void push_addsub::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ if (token == "+")
+ {
+ self->getAction()->addsubQ.push_back(addsub_operation::addsub_op_t::ADD);
+ }
+ else
+ {
+ self->getAction()->addsubQ.push_back(addsub_operation::addsub_op_t::SUB);
+ }
+}
+
+void push_mulop::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ if (token == "*")
+ {
+ self->getAction()->muldivQ.push_back(mulldiv_operation::muldiv_t::MULL);
+ }
+ else if (token == "/")
+ {
+ self->getAction()->muldivQ.push_back(mulldiv_operation::muldiv_t::DIV);
+ }
+ else if(token == "^")
+ {
+ self->getAction()->muldivQ.push_back(mulldiv_operation::muldiv_t::POW);
+ }
+ else
+ {
+ self->getAction()->muldivQ.push_back(mulldiv_operation::muldiv_t::MOD);
+ }
+}
+
+void push_addsub_binop::builder(s3select* self, [[maybe_unused]] const char* a,[[maybe_unused]] const char* b) const
+{
+ base_statement* l = nullptr, *r = nullptr;
+
+ r = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ l = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ addsub_operation::addsub_op_t o = self->getAction()->addsubQ.back();
+ self->getAction()->addsubQ.pop_back();
+ addsub_operation* as = S3SELECT_NEW(self, addsub_operation, l, o, r);
+ self->getAction()->exprQ.push_back(as);
+}
+
+void push_mulldiv_binop::builder(s3select* self, [[maybe_unused]] const char* a, [[maybe_unused]] const char* b) const
+{
+ base_statement* vl = nullptr, *vr = nullptr;
+
+ vr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ vl = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ mulldiv_operation::muldiv_t o = self->getAction()->muldivQ.back();
+ self->getAction()->muldivQ.pop_back();
+ mulldiv_operation* f = S3SELECT_NEW(self, mulldiv_operation, vl, o, vr);
+ self->getAction()->exprQ.push_back(f);
+}
+
+void push_function_arg::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ base_statement* be = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ base_statement* f = self->getAction()->funcQ.back();
+
+ if (dynamic_cast<__function*>(f))
+ {
+ dynamic_cast<__function*>(f)->push_argument(be);
+ }
+}
+
+void push_function_name::builder(s3select* self, const char* a, const char* b) const
+{
+ b--;
+ while (*b == '(' || *b == ' ')
+ {
+ b--; //point to function-name
+ }
+
+ std::string fn;
+ fn.assign(a, b - a + 1);
+
+ __function* func = S3SELECT_NEW(self, __function, fn.c_str(), self->getS3F());
+ self->getAction()->funcQ.push_back(func);
+}
+
+void push_function_expr::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ base_statement* func = self->getAction()->funcQ.back();
+ self->getAction()->funcQ.pop_back();
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_compare_operator::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+ arithmetic_operand::cmp_t c = arithmetic_operand::cmp_t::NA;
+
+ if (token == "=")
+ {
+ c = arithmetic_operand::cmp_t::EQ;
+ }
+ else if (token == "!=" || token == "<>")
+ {
+ c = arithmetic_operand::cmp_t::NE;
+ }
+ else if (token == ">=")
+ {
+ c = arithmetic_operand::cmp_t::GE;
+ }
+ else if (token == "<=")
+ {
+ c = arithmetic_operand::cmp_t::LE;
+ }
+ else if (token == ">")
+ {
+ c = arithmetic_operand::cmp_t::GT;
+ }
+ else if (token == "<")
+ {
+ c = arithmetic_operand::cmp_t::LT;
+ }
+
+ self->getAction()->arithmetic_compareQ.push_back(c);
+}
+
+void push_logical_operator::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+ logical_operand::oplog_t l = logical_operand::oplog_t::NA;
+
+ if (token == "and")
+ {
+ l = logical_operand::oplog_t::AND;
+ }
+ else if (token == "or")
+ {
+ l = logical_operand::oplog_t::OR;
+ }
+
+ self->getAction()->logical_compareQ.push_back(l);
+}
+
+void push_arithmetic_predicate::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ base_statement* vr, *vl;
+ arithmetic_operand::cmp_t c = self->getAction()->arithmetic_compareQ.back();
+ self->getAction()->arithmetic_compareQ.pop_back();
+
+ if (!self->getAction()->exprQ.empty())
+ {
+ vr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ }
+ else
+ {
+ throw base_s3select_exception(std::string("missing right operand for arithmetic-comparision expression"), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ if (!self->getAction()->exprQ.empty())
+ {
+ vl = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ }
+ else
+ {
+ throw base_s3select_exception(std::string("missing left operand for arithmetic-comparision expression"), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ arithmetic_operand* t = S3SELECT_NEW(self, arithmetic_operand, vl, c, vr);
+
+ self->getAction()->exprQ.push_back(t);
+}
+
+void push_logical_predicate::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ base_statement* tl = nullptr, *tr = nullptr;
+ logical_operand::oplog_t oplog = self->getAction()->logical_compareQ.back();
+ self->getAction()->logical_compareQ.pop_back();
+
+ if (self->getAction()->exprQ.empty() == false)
+ {
+ tr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ }
+ else
+ {//should reject by syntax parser
+ throw base_s3select_exception(std::string("missing right operand for logical expression"), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ if (self->getAction()->exprQ.empty() == false)
+ {
+ tl = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ }
+ else
+ {//should reject by syntax parser
+ throw base_s3select_exception(std::string("missing left operand for logical expression"), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ logical_operand* f = S3SELECT_NEW(self, logical_operand, tl, oplog, tr);
+
+ self->getAction()->exprQ.push_back(f);
+}
+
+void push_negation::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+ base_statement* pred = nullptr;
+
+ if (self->getAction()->exprQ.empty() == false)
+ {
+ pred = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ }
+ else
+ {
+ throw base_s3select_exception(std::string("failed to create AST for NOT operator"), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ //upon NOT operator, the logical and arithmetical operators are "tagged" to negate result.
+ if (dynamic_cast<logical_operand*>(pred))
+ {
+ logical_operand* f = S3SELECT_NEW(self, logical_operand, pred);
+ self->getAction()->exprQ.push_back(f);
+ }
+ else if (dynamic_cast<__function*>(pred) || dynamic_cast<negate_function_operation*>(pred) || dynamic_cast<variable*>(pred))
+ {
+ negate_function_operation* nf = S3SELECT_NEW(self, negate_function_operation, pred);
+ self->getAction()->exprQ.push_back(nf);
+ }
+ else if(dynamic_cast<arithmetic_operand*>(pred))
+ {
+ arithmetic_operand* f = S3SELECT_NEW(self, arithmetic_operand, pred);
+ self->getAction()->exprQ.push_back(f);
+ }
+ else
+ {
+ throw base_s3select_exception(std::string("failed to create AST for NOT operator"), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+}
+
+void push_column_pos::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+ std::string alias_name;
+ variable* v;
+
+ if (token == "*" || token == "* ") //TODO space should skip in boost::spirit
+ {
+ v = S3SELECT_NEW(self, variable, token, variable::var_t::STAR_OPERATION);
+
+ }
+ else
+ {
+ size_t pos = token.find('.');
+ if(pos != std::string::npos)
+ {
+ alias_name = token.substr(0,pos);
+
+ pos ++;
+ token = token.substr(pos,token.size());
+
+ if(self->getAction()->column_prefix != "##" && self->getAction()->column_prefix != alias_name)
+ {
+ throw base_s3select_exception(std::string("query can not contain more then a single table-alias"), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ self->getAction()->column_prefix = alias_name;
+ }
+ v = S3SELECT_NEW(self, variable, token, variable::var_t::POS);
+ }
+
+ self->getAction()->exprQ.push_back(v);
+}
+
+void push_projection::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ self->getAction()->projections.get()->push_back(self->getAction()->exprQ.back());
+ self->getAction()->exprQ.pop_back();
+}
+
+void push_alias_projection::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+ //extract alias name
+ const char* p = b;
+ while (*(--p) != ' ')
+ ;
+ std::string alias_name(p + 1, b);
+ base_statement* bs = self->getAction()->exprQ.back();
+
+ //mapping alias name to base-statement
+ bool res = self->getAction()->alias_map.insert_new_entry(alias_name, bs);
+ if (res == false)
+ {
+ throw base_s3select_exception(std::string("alias <") + alias_name + std::string("> is already been used in query"), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ self->getAction()->projections.get()->push_back(bs);
+ self->getAction()->exprQ.pop_back();
+}
+
+void push_between_filter::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ std::string between_function("#between#");
+
+ __function* func = S3SELECT_NEW(self, __function, between_function.c_str(), self->getS3F());
+
+ base_statement* second_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(second_expr);
+
+ base_statement* first_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(first_expr);
+
+ base_statement* main_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(main_expr);
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_not_between_filter::builder(s3select* self, const char* a, const char* b) const
+{
+
+ static constexpr const std::string_view not_between_function("#not_between#");
+
+ __function* func = S3SELECT_NEW(self, __function, not_between_function.data(), self->getS3F());
+
+ base_statement* second_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(second_expr);
+
+ base_statement* first_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(first_expr);
+
+ base_statement* main_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(main_expr);
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_in_predicate_first_arg::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ if(self->getAction()->exprQ.empty())
+ {
+ throw base_s3select_exception("failed to create AST for in predicate", base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ self->getAction()->inPredicateQ.push_back( self->getAction()->exprQ.back() );
+ self->getAction()->exprQ.pop_back();
+
+ if(self->getAction()->exprQ.empty())
+ {
+ throw base_s3select_exception("failed to create AST for in predicate", base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ self->getAction()->inMainArg = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+
+}
+
+void push_in_predicate_arguments::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ if(self->getAction()->exprQ.empty())
+ {
+ throw base_s3select_exception("failed to create AST for in predicate", base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ self->getAction()->inPredicateQ.push_back( self->getAction()->exprQ.back() );
+
+ self->getAction()->exprQ.pop_back();
+
+}
+
+void push_in_predicate::builder(s3select* self, const char* a, const char* b) const
+{
+ // expr in (e1,e2,e3 ...)
+ std::string token(a, b);
+
+ std::string in_function("#in_predicate#");
+
+ __function* func = S3SELECT_NEW(self, __function, in_function.c_str(), self->getS3F());
+
+ while(!self->getAction()->inPredicateQ.empty())
+ {
+ base_statement* ei = self->getAction()->inPredicateQ.back();
+
+ self->getAction()->inPredicateQ.pop_back();
+
+ func->push_argument(ei);
+
+ }
+
+ func->push_argument( self->getAction()->inMainArg );
+
+ self->getAction()->exprQ.push_back(func);
+
+ self->getAction()->inPredicateQ.clear();
+
+ self->getAction()->inMainArg = 0;
+}
+
+void push_like_predicate_no_escape::builder(s3select* self, const char* a, const char* b) const
+{
+
+ std::string token(a, b);
+ std::string in_function("#like_predicate#");
+
+ __function* func = S3SELECT_NEW(self, __function, in_function.c_str(), self->getS3F());
+
+ variable* v = S3SELECT_NEW(self, variable, "\\",variable::var_t::COLUMN_VALUE);
+ func->push_argument(v);
+
+ // experimenting valgrind-issue happens only on teuthology
+ //self->getS3F()->push_for_cleanup(v);
+
+ base_statement* like_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(like_expr);
+
+ base_statement* expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ func->push_argument(expr);
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_like_predicate_escape::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+ std::string in_function("#like_predicate#");
+
+ __function* func = S3SELECT_NEW(self, __function, in_function.c_str(), self->getS3F());
+
+ base_statement* expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ func->push_argument(expr);
+
+ base_statement* main_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(main_expr);
+
+ base_statement* escape_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(escape_expr);
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_is_null_predicate::builder(s3select* self, const char* a, const char* b) const
+{
+ //expression is null, is not null
+ std::string token(a, b);
+ bool is_null = true;
+
+ for(size_t i=0;i<token.size();i++)
+ {//TODO use other scan rules
+ bsc::parse_info<> info = bsc::parse(token.c_str()+i, (bsc::str_p("is") >> bsc::str_p("not") >> bsc::str_p("null")) , bsc::space_p);
+ if (info.full)
+ is_null = false;
+ }
+
+ std::string in_function("#is_null#");
+
+ if (is_null == false)
+ {
+ in_function = "#is_not_null#";
+ }
+
+ __function* func = S3SELECT_NEW(self, __function, in_function.c_str(), self->getS3F());
+
+ if (!self->getAction()->exprQ.empty())
+ {
+ base_statement* expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(expr);
+ }
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_when_condition_then::builder(s3select* self, const char* a, const char* b) const
+{
+//purpose: each new function node, provide execution for (if {condition} then {expresion} )
+ std::string token(a, b);
+
+ // _fn_when_then
+ __function* func = S3SELECT_NEW(self, __function, "#when-then#", self->getS3F());
+
+ base_statement* then_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ base_statement* when_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ func->push_argument(then_expr);
+ func->push_argument(when_expr);
+
+ self->getAction()->exprQ.push_back(func);
+
+ // the first_when_then_expr mark the first when-then expression, it is been used later upon complete the full statement (case when ... then ... else ... end)
+ if(self->getAction()->first_when_then_expr == nullptr)
+ {
+ self->getAction()->first_when_then_expr = func;
+ }
+}
+
+void push_case_when_else::builder(s3select* self, const char* a, const char* b) const
+{
+//purpose: provide the execution for complete statement, i.e. (case when {expression} then {expression} else {expression} end)
+ std::string token(a, b);
+
+ base_statement* else_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ // _fn_case_when_else
+ __function* func = S3SELECT_NEW(self, __function, "#case-when-else#", self->getS3F());
+
+ func->push_argument(else_expr);
+
+ base_statement* when_then_func = nullptr;
+
+ // the loop ended upon reaching the first when-then
+ while(when_then_func != self->getAction()->first_when_then_expr)
+ {
+ // poping from whenThen-queue and pushing to function arguments list
+ when_then_func = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(when_then_func);
+ }
+
+ self->getAction()->first_when_then_expr = nullptr;
+ //func is the complete statement, implemented by _fn_case_when_else
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_case_value_when_value_else::builder(s3select* self, const char* a, const char* b) const
+{
+//purpose: provide execution for the complete statement. i.e. case-value-when-value-else-value-end
+ std::string token(a, b);
+
+ base_statement* else_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ // _fn_case_when_else
+ __function* func = S3SELECT_NEW(self, __function, "#case-when-else#", self->getS3F());
+
+ // push the else expression
+ func->push_argument(else_expr);
+
+ // poping the case-value
+ base_statement* case_value = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ base_statement* when_then_func = nullptr;
+
+ //poping all when-value-then expression(_fn_when_value_then) and add the case-value per each
+ while(self->getAction()->whenThenQ.empty() == false)
+ {
+ when_then_func = self->getAction()->whenThenQ.back();
+ if (dynamic_cast<__function*>(when_then_func))
+ {
+ // adding the case-value as argument
+ dynamic_cast<__function*>(when_then_func)->push_argument(case_value);
+ }
+ else
+ throw base_s3select_exception("failed to create AST for case-value-when construct", base_s3select_exception::s3select_exp_en_t::FATAL);
+
+ self->getAction()->whenThenQ.pop_back();
+
+ func->push_argument(when_then_func);
+ }
+ //pushing the execution function for the complete statement
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_when_value_then::builder(s3select* self, const char* a, const char* b) const
+{
+ //provide execution of when-value-then-value :: _fn_when_value_then
+ std::string token(a, b);
+
+ __function* func = S3SELECT_NEW(self, __function, "#when-value-then#", self->getS3F());
+
+ base_statement* then_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ base_statement* when_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ func->push_argument(then_expr);
+ func->push_argument(when_expr);
+ //each when-value-then-value pushed to dedicated queue
+ self->getAction()->whenThenQ.push_back(func);
+}
+
+void push_decimal_operator::builder(s3select* self, const char* a, const char* b) const
+{//decimal(integer,integer)
+ std::string token(a, b);
+
+ base_statement* lhs = nullptr;
+ base_statement* rhs = nullptr;
+
+ //right side (decimal operator)
+ if (self->getAction()->exprQ.empty() == false)
+ {
+ rhs = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ }
+
+ //left side (decimal operator)
+ if (self->getAction()->exprQ.empty() == false)
+ {
+ lhs = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ }
+
+ __function* func = S3SELECT_NEW(self, __function, "#decimal_operator#", self->getS3F());
+
+ func->push_argument(rhs);
+ func->push_argument(lhs);
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_cast_decimal_expr::builder(s3select* self, const char* a, const char* b) const
+{
+ //cast(expression as decimal(x,y))
+ std::string token(a, b);
+
+ base_statement* lhs = nullptr;
+ base_statement* rhs = nullptr;
+
+ //right side (decimal operator)
+ if (self->getAction()->exprQ.empty() == false)
+ {
+ rhs = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ }
+
+ //left side - expression
+ if (self->getAction()->exprQ.empty() == false)
+ {
+ lhs = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ }
+
+ __function* func = S3SELECT_NEW(self, __function, "#cast_as_decimal#", self->getS3F());
+
+ func->push_argument(rhs);
+ func->push_argument(lhs);
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_cast_expr::builder(s3select* self, const char* a, const char* b) const
+{
+ //cast(expression as int/float/string/timestamp) --> new function "int/float/string/timestamp" ( args = expression )
+ std::string token(a, b);
+
+ std::string cast_function;
+
+ cast_function = self->getAction()->dataTypeQ.back();
+ self->getAction()->dataTypeQ.pop_back();
+
+ __function* func = S3SELECT_NEW(self, __function, cast_function.c_str(), self->getS3F());
+
+ base_statement* expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(expr);
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_data_type::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ auto cast_operator = [&](const char *s){return strncasecmp(a,s,strlen(s))==0;};
+
+ if(cast_operator("int"))
+ {
+ self->getAction()->dataTypeQ.push_back("int");
+ }else if(cast_operator("float"))
+ {
+ self->getAction()->dataTypeQ.push_back("float");
+ }else if(cast_operator("string"))
+ {
+ self->getAction()->dataTypeQ.push_back("string");
+ }else if(cast_operator("timestamp"))
+ {
+ self->getAction()->dataTypeQ.push_back("to_timestamp");
+ }else if(cast_operator("bool"))
+ {
+ self->getAction()->dataTypeQ.push_back("to_bool");
+ }
+}
+
+void push_trim_whitespace_both::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ __function* func = S3SELECT_NEW(self, __function, "#trim#", self->getS3F());
+
+ base_statement* expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(expr);
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_trim_expr_one_side_whitespace::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ std::string trim_function;
+
+ trim_function = self->getAction()->trimTypeQ.back();
+ self->getAction()->trimTypeQ.pop_back();
+
+ __function* func = S3SELECT_NEW(self, __function, trim_function.c_str(), self->getS3F());
+
+ base_statement* inp_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(inp_expr);
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_trim_expr_anychar_anyside::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ std::string trim_function;
+
+ trim_function = self->getAction()->trimTypeQ.back();
+ self->getAction()->trimTypeQ.pop_back();
+
+ __function* func = S3SELECT_NEW(self, __function, trim_function.c_str(), self->getS3F());
+
+ base_statement* expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(expr);
+
+ base_statement* inp_expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(inp_expr);
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_trim_type::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ auto trim_option = [&](const char *s){return strncmp(a,s,strlen(s))==0;};
+
+ if(trim_option("leading"))
+ {
+ self->getAction()->trimTypeQ.push_back("#leading#");
+ }else if(trim_option("trailing"))
+ {
+ self->getAction()->trimTypeQ.push_back("#trailing#");
+ }else
+ {
+ self->getAction()->trimTypeQ.push_back("#trim#");
+ }
+}
+
+void push_substr_from::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ __function* func = S3SELECT_NEW(self, __function, "substring", self->getS3F());
+
+ base_statement* expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ base_statement* start_position = self->getAction()->exprQ.back();
+
+ self->getAction()->exprQ.pop_back();
+ func->push_argument(start_position);
+ func->push_argument(expr);
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_substr_from_for::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ __function* func = S3SELECT_NEW(self, __function, "substring", self->getS3F());
+
+ base_statement* expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ base_statement* start_position = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ base_statement* end_position = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ func->push_argument(end_position);
+ func->push_argument(start_position);
+ func->push_argument(expr);
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_datediff::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ std::string date_op;
+
+ date_op = self->getAction()->datePartQ.back();
+ self->getAction()->datePartQ.pop_back();
+
+ std::string date_function = "#datediff_" + date_op + "#";
+
+ __function* func = S3SELECT_NEW(self, __function, date_function.c_str(), self->getS3F());
+
+ base_statement* expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ base_statement* start_position = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ func->push_argument(start_position);
+ func->push_argument(expr);
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_dateadd::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ std::string date_op;
+
+ date_op = self->getAction()->datePartQ.back();
+ self->getAction()->datePartQ.pop_back();
+
+ std::string date_function = "#dateadd_" + date_op + "#";
+
+ __function* func = S3SELECT_NEW(self, __function, date_function.c_str(), self->getS3F());
+
+ base_statement* expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ base_statement* start_position = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ func->push_argument(start_position);
+ func->push_argument(expr);
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_extract::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ std::string date_op;
+
+ date_op = self->getAction()->datePartQ.back();
+ self->getAction()->datePartQ.pop_back();
+
+ std::string date_function = "#extract_" + date_op + "#";
+
+ __function* func = S3SELECT_NEW(self, __function, date_function.c_str(), self->getS3F());
+
+ base_statement* expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ func->push_argument(expr);
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_date_part::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ self->getAction()->datePartQ.push_back(token);
+}
+
+void push_time_to_string_constant::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ __function* func = S3SELECT_NEW(self, __function, "#to_string_constant#", self->getS3F());
+
+ base_statement* expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ base_statement* frmt = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ func->push_argument(frmt);
+ func->push_argument(expr);
+
+ self->getAction()->exprQ.push_back(func);
+
+}
+
+void push_time_to_string_dynamic::builder(s3select* self, const char* a, const char* b) const
+{
+ std::string token(a, b);
+
+ __function* func = S3SELECT_NEW(self, __function, "#to_string_dynamic#", self->getS3F());
+
+ base_statement* expr = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ base_statement* frmt = self->getAction()->exprQ.back();
+ self->getAction()->exprQ.pop_back();
+
+ func->push_argument(frmt);
+ func->push_argument(expr);
+
+ self->getAction()->exprQ.push_back(func);
+}
+
+void push_string_to_time_constant::builder(s3select* self, const char* a, const char* b) const
+{
+ //token could be a string or a timestamp, we need to check it
+ //upon it is a timestamp format, we need to push the variable as timestamp or else, it as a string
+ //the purpose is to use backticks to convert the string to timestamp in parsing time instead of processing time(Trino uses this approach)
+
+ a++; //remove the first quote
+ b--;
+ std::string token(a, b);
+
+ _fn_to_timestamp* to_timestamp = S3SELECT_NEW(self, _fn_to_timestamp);//TODO the _fn_to_timestamp should release the memory (cleanup)
+ bs_stmt_vec_t args;
+
+ variable* var_string = S3SELECT_NEW(self, variable, token, variable::var_t::COLUMN_VALUE);
+ variable* timestamp = S3SELECT_NEW(self, variable, token, variable::var_t::COLUMN_VALUE);
+
+ (self->get_to_timestamp_for_clean()) = to_timestamp;
+ var_string->push_for_cleanup(self->get_ast_nodes_to_delete());
+ timestamp->push_for_cleanup(self->get_ast_nodes_to_delete());
+
+ args.push_back(var_string);
+
+ try {
+ (*to_timestamp)(&args, timestamp);
+ }
+ catch(std::exception& e)
+ {
+ //it is not a timestamp, it is a string
+ self->getAction()->exprQ.push_back(var_string);
+ return;
+ }
+
+ self->getAction()->exprQ.push_back(timestamp);
+}
+
+struct s3select_csv_definitions //TODO
+{
+ char row_delimiter;
+ char column_delimiter;
+ char output_row_delimiter;
+ char output_column_delimiter;
+ char escape_char;
+ char output_escape_char;
+ char output_quot_char;
+ char quot_char;
+ bool use_header_info;
+ bool ignore_header_info;//skip first line
+ bool quote_fields_always;
+ bool quote_fields_asneeded;
+ bool redundant_column;
+ bool comment_empty_lines;
+ std::vector<char> comment_chars;
+ std::vector<char> trim_chars;
+
+ s3select_csv_definitions():row_delimiter('\n'), column_delimiter(','), output_row_delimiter('\n'), output_column_delimiter(','), escape_char('\\'), output_escape_char('\\'), output_quot_char('"'), quot_char('"'), use_header_info(false), ignore_header_info(false), quote_fields_always(false), quote_fields_asneeded(false), redundant_column(false), comment_empty_lines(false) {}
+
+};
+
+
+/////// handling different object types
+class base_s3object
+{
+
+protected:
+ scratch_area* m_sa;
+ std::string m_obj_name;
+ bool m_aggr_flow = false; //TODO once per query
+ bool m_is_to_aggregate;
+ std::vector<base_statement*> m_projections;
+ base_statement* m_where_clause;
+ s3select* m_s3_select;
+ size_t m_error_count;
+ bool m_is_limit_on;
+ unsigned long m_limit;
+ unsigned long m_processed_rows;
+ size_t m_returned_bytes_size;
+ std::function<void(const char*)> fp_ext_debug_mesg;//dispache debug message into external system
+
+public:
+ s3select_csv_definitions m_csv_defintion;//TODO add method for modify
+
+ enum class Status {
+ END_OF_STREAM,
+ INITIAL_STAT,
+ NORMAL_EXIT,
+ LIMIT_REACHED,
+ SQL_ERROR
+ };
+
+ Status m_sql_processing_status;
+
+ Status get_sql_processing_status()
+ {
+ return m_sql_processing_status;
+ }
+
+ bool is_sql_limit_reached()
+ {
+ return m_sql_processing_status == Status::LIMIT_REACHED;
+ }
+
+ void set_base_defintions(s3select* m)
+ {
+ if(m_s3_select || !m)
+ {//not to define twice
+ //not to define with null
+ return;
+ }
+
+ m_s3_select=m;
+ m_sa=m_s3_select->get_scratch_area();
+ m_error_count=0;
+ m_projections = m_s3_select->get_projections_list();
+ m_where_clause = m_s3_select->get_filter();
+
+ if (m_where_clause)
+ {
+ m_where_clause->traverse_and_apply(m_sa, m_s3_select->get_aliases(), m_s3_select->is_json_query());
+ }
+
+ for (auto& p : m_projections)
+ {
+ p->traverse_and_apply(m_sa, m_s3_select->get_aliases(), m_s3_select->is_json_query());
+ }
+ m_is_to_aggregate = true;//TODO not correct. should be set upon end-of-stream
+ m_aggr_flow = m_s3_select->is_aggregate_query();
+
+ m_is_limit_on = m_s3_select->is_limit();
+ if(m_is_limit_on)
+ {
+ m_limit = m_s3_select->get_limit();
+ }
+
+ m_processed_rows = 0;
+ }
+
+ base_s3object():m_sa(nullptr),m_is_to_aggregate(false),m_where_clause(nullptr),m_s3_select(nullptr),m_error_count(0),m_returned_bytes_size(0),m_sql_processing_status(Status::INITIAL_STAT){}
+
+ explicit base_s3object(s3select* m):base_s3object()
+ {
+ if(m)
+ {
+ set_base_defintions(m);
+ }
+ }
+
+ virtual bool is_end_of_stream() {return false;}
+ virtual void row_fetch_data() {}
+ virtual void row_update_data() {}
+ virtual void columnar_fetch_where_clause_columns(){}
+ virtual void columnar_fetch_projection(){}
+ // for the case were the rows are not fetched, but "pushed" by the data-source parser (JSON)
+ virtual bool multiple_row_processing(){return true;}
+
+ void set_external_debug_system(std::function<void(const char*)> fp_external)
+ {
+ fp_ext_debug_mesg = fp_external;
+ }
+
+ size_t get_return_result_size()
+ {
+ return m_returned_bytes_size;
+ }
+
+ void result_values_to_string(multi_values& projections_resuls, std::string& result)
+ {
+ size_t i = 0;
+ std::string output_delimiter(1,m_csv_defintion.output_column_delimiter);
+ std::string output_row_delimiter(1,m_csv_defintion.output_row_delimiter);
+
+ for(auto& res : projections_resuls.values)
+ {
+ if(fp_ext_debug_mesg)
+ fp_ext_debug_mesg( res->to_string() );
+
+ if (m_csv_defintion.quote_fields_always) {
+ std::ostringstream quoted_result;
+ quoted_result << std::quoted(res->to_string(),m_csv_defintion.output_quot_char, m_csv_defintion.escape_char);
+ result.append(quoted_result.str());
+ m_returned_bytes_size += quoted_result.str().size();
+ }//TODO to add asneeded
+ else
+ {
+ result.append(res->to_string());
+ m_returned_bytes_size += strlen(res->to_string());
+ }
+
+ if(!m_csv_defintion.redundant_column) {
+ if(++i < projections_resuls.values.size()) {
+ result.append(output_delimiter);
+ m_returned_bytes_size += output_delimiter.size();
+ }
+ }
+ else {
+ result.append(output_delimiter);
+ m_returned_bytes_size += output_delimiter.size();
+ }
+ }
+ if(!m_aggr_flow){
+ result.append(output_row_delimiter);
+ m_returned_bytes_size += output_delimiter.size();
+ }
+ }
+
+ Status getMatchRow( std::string& result)
+ {
+ multi_values projections_resuls;
+
+ if (m_is_limit_on && m_processed_rows == m_limit)
+ {
+ return m_sql_processing_status = Status::LIMIT_REACHED;
+ }
+
+ if (m_aggr_flow == true)
+ {
+ do
+ {
+ row_fetch_data();
+ columnar_fetch_where_clause_columns();
+ if (is_end_of_stream())
+ {
+ if (m_is_to_aggregate)
+ for (auto& i : m_projections)
+ {
+ i->set_last_call();
+ i->set_skip_non_aggregate(false);//projection column is set to be runnable
+
+ projections_resuls.push_value( &(i->eval()) );
+ }
+
+ result_values_to_string(projections_resuls,result);
+ return m_sql_processing_status = Status::END_OF_STREAM;
+ }
+
+ m_processed_rows++;
+ if ((*m_projections.begin())->is_set_last_call())
+ {
+ //should validate while query execution , no update upon nodes are marked with set_last_call
+ throw base_s3select_exception("on aggregation query , can not stream row data post do-aggregate call", base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ for (auto& a : *m_s3_select->get_aliases()->get())
+ {
+ a.second->invalidate_cache_result();
+ }
+
+ row_update_data();
+ if (!m_where_clause || m_where_clause->eval().is_true())
+ {
+ columnar_fetch_projection();
+ for (auto i : m_projections)
+ {
+ i->eval();
+ }
+ }
+
+ if(m_is_limit_on && m_processed_rows == m_limit)
+ {
+ for (auto& i : m_projections)
+ {
+ i->set_last_call();
+ i->set_skip_non_aggregate(false);//projection column is set to be runnable
+ projections_resuls.push_value( &(i->eval()) );
+ }
+ result_values_to_string(projections_resuls,result);
+ return m_sql_processing_status = Status::LIMIT_REACHED;
+ }
+ }
+ while (multiple_row_processing());
+ }
+ else
+ {
+ //save the where-clause evaluation result (performance perspective)
+ bool where_clause_result = false;
+ do
+ {
+ row_fetch_data();
+ columnar_fetch_where_clause_columns();
+ if(is_end_of_stream())
+ {
+ return m_sql_processing_status = Status::END_OF_STREAM;
+ }
+
+ m_processed_rows++;
+ row_update_data();
+ for (auto& a : *m_s3_select->get_aliases()->get())
+ {
+ a.second->invalidate_cache_result();
+ }
+ }
+ while (multiple_row_processing() && m_where_clause && !(where_clause_result = m_where_clause->eval().is_true()) && !(m_is_limit_on && m_processed_rows == m_limit));
+
+ // in the of JSON it needs to evaluate the where-clause(for the first time)
+ if(!multiple_row_processing() && m_where_clause){
+ where_clause_result = m_where_clause->eval().is_true();
+ }
+
+ if(m_where_clause && ! where_clause_result && m_is_limit_on && m_processed_rows == m_limit)
+ {
+ return m_sql_processing_status = Status::LIMIT_REACHED;
+ }
+
+ bool found = multiple_row_processing();
+
+ if(!multiple_row_processing())
+ {
+ found = !m_where_clause || where_clause_result;
+ }
+
+ if(found)
+ {
+ columnar_fetch_projection();
+ projections_resuls.clear();
+ for (auto& i : m_projections)
+ {
+ projections_resuls.push_value( &(i->eval()) );
+ }
+ result_values_to_string(projections_resuls,result);
+ }
+
+ }
+ return is_end_of_stream() ? (m_sql_processing_status = Status::END_OF_STREAM) : (m_sql_processing_status = Status::NORMAL_EXIT);
+
+ }//getMatchRow
+
+ virtual ~base_s3object() = default;
+
+}; //base_s3object
+
+//TODO config / default-value
+#define CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (64 * 1024)
+class csv_object : public base_s3object
+{
+
+public:
+
+ class csv_defintions : public s3select_csv_definitions
+ {};
+
+ explicit csv_object(s3select* s3_query) :
+ base_s3object(s3_query),
+ m_skip_last_line(false),
+ m_extract_csv_header_info(false),
+ m_previous_line(false),
+ m_skip_first_line(false),
+ m_processed_bytes(0) {}
+
+ csv_object(s3select* s3_query, csv_defintions csv) :
+ base_s3object(s3_query),
+ m_skip_last_line(false),
+ m_extract_csv_header_info(false),
+ m_previous_line(false),
+ m_skip_first_line(false),
+ m_processed_bytes(0)
+ {
+ m_csv_defintion = csv;
+ }
+
+ csv_object():
+ base_s3object(nullptr),
+ m_skip_last_line(false),
+ m_extract_csv_header_info(false),
+ m_previous_line(false),
+ m_skip_first_line(false),
+ m_processed_bytes(0) {}
+
+ void set_csv_query(s3select* s3_query,csv_defintions csv)
+ {
+ if(m_s3_select != nullptr)
+ {
+ //return;
+ }
+
+ set_base_defintions(s3_query);
+ m_csv_defintion = csv;
+ }
+
+private:
+ bool m_skip_last_line;
+ std::string m_error_description;
+ char* m_stream;
+ char* m_end_stream;
+ std::vector<char*> m_row_tokens;
+ CSVParser* csv_parser;
+ bool m_extract_csv_header_info;
+ std::vector<std::string> m_csv_schema{128};
+
+ //handling arbitrary chunks (rows cut in the middle)
+ bool m_previous_line;
+ bool m_skip_first_line;
+ std::string merge_line;
+ std::string m_last_line;
+ size_t m_processed_bytes;
+ int64_t m_number_of_tokens;
+ size_t m_skip_x_first_bytes=0;
+
+ std::function<int(std::string&)> fp_s3select_result_format=nullptr;
+ std::function<int(std::string&)> fp_s3select_header_format=nullptr;
+public:
+ void set_result_formatters( std::function<int(std::string&)>& result_format,
+ std::function<int(std::string&)>& header_format)
+ {
+ fp_s3select_result_format = result_format;
+ fp_s3select_header_format = header_format;
+ }
+private:
+ int getNextRow()
+ {
+ size_t num_of_tokens=0;
+ m_row_tokens.clear();
+
+ if (csv_parser->read_row(m_row_tokens))
+ {
+ num_of_tokens = m_row_tokens.size();
+ }
+ else
+ {
+ return -1;
+ }
+
+ return num_of_tokens;
+ }
+
+public:
+
+ std::string get_error_description()
+ {
+ return m_error_description;
+ }
+
+ virtual ~csv_object() = default;
+
+public:
+ virtual bool is_end_of_stream()
+ {
+ return m_number_of_tokens < 0;
+ }
+
+ virtual void row_fetch_data()
+ {
+ m_number_of_tokens = getNextRow();
+ }
+
+ virtual void row_update_data()
+ {
+ m_sa->update(m_row_tokens, m_number_of_tokens);
+ }
+
+
+ int extract_csv_header_info()
+ {
+
+ if (m_csv_defintion.ignore_header_info == true)
+ {
+ csv_parser->next_line();
+ }
+ else if(m_csv_defintion.use_header_info == true)
+ {
+ size_t num_of_tokens = getNextRow();//TODO validate number of tokens
+
+ for(size_t i=0; i<num_of_tokens; i++)
+ {
+ m_csv_schema[i].assign(m_row_tokens[i]);
+ }
+
+ m_s3_select->load_schema(m_csv_schema);
+ }
+
+ m_extract_csv_header_info = true;
+
+ return 0;
+ }
+
+
+ int run_s3select_on_stream(std::string& result, const char* csv_stream, size_t stream_length, size_t obj_size)
+ {
+ int status=0;
+ try{
+ status = run_s3select_on_stream_internal(result,csv_stream,stream_length,obj_size);
+ }
+ catch(base_s3select_exception& e)
+ {
+ m_error_description = e.what();
+ m_error_count ++;
+ if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL || m_error_count>100)//abort query execution
+ {
+ return -1;
+ }
+ }
+ catch(chunkalloc_out_of_mem)
+ {
+ m_error_description = "out of memory";
+ return -1;
+ }
+ catch(io::error::escaped_char_missing& err)
+ {
+ m_error_description = "escaped_char_missing failure while csv parsing";
+ return -1;
+ }
+ catch(io::error::escaped_string_not_closed& err)
+ {
+ m_error_description = "escaped_string_not_closed failure while csv parsing";
+ return -1;
+ }
+ catch(io::error::line_length_limit_exceeded& err)
+ {
+ m_error_description = "line_length_limit_exceeded failure while csv parsing";
+ return -1;
+ }
+ catch(io::error::with_file_name& err)
+ {
+ m_error_description = "with_file_name failure while csv parsing";
+ return -1;
+ }
+ catch(io::error::with_file_line& err)
+ {
+ m_error_description = "with_file_line failure while csv parsing";
+ return -1;
+ }
+
+ return status;
+ }
+
+private:
+ int run_s3select_on_stream_internal(std::string& result, const char* csv_stream, size_t stream_length, size_t obj_size)
+ {
+ //purpose: the CSV data is "streaming", it may "cut" rows in the middle, in that case the "broken-line" is stores
+ //for later, upon next chunk of data is streaming, the stored-line is merge with current broken-line, and processed.
+ std::string tmp_buff;
+
+ m_processed_bytes += stream_length;
+
+ m_skip_first_line = false;
+
+ if (m_previous_line)
+ {
+ //if previous broken line exist , merge it to current chunk
+ char* p_obj_chunk = (char*)csv_stream;
+ while (*p_obj_chunk != m_csv_defintion.row_delimiter && p_obj_chunk<(csv_stream+stream_length))
+ {
+ p_obj_chunk++;
+ }
+
+ tmp_buff.assign((char*)csv_stream, (char*)csv_stream + (p_obj_chunk - csv_stream));
+ merge_line = m_last_line + tmp_buff + m_csv_defintion.row_delimiter;
+ m_previous_line = false;
+ m_skip_first_line = true;
+ m_skip_x_first_bytes = tmp_buff.size()+1;
+
+ //processing the merged row (previous broken row)
+ run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false);
+ }
+
+ if (stream_length && csv_stream[stream_length - 1] != m_csv_defintion.row_delimiter)
+ {
+ //in case of "broken" last line
+ char* p_obj_chunk = (char*)&(csv_stream[stream_length - 1]);
+ while (*p_obj_chunk != m_csv_defintion.row_delimiter && p_obj_chunk>csv_stream)
+ {
+ p_obj_chunk--; //scan until end-of previous line in chunk
+ }
+
+ u_int32_t skip_last_bytes = (&(csv_stream[stream_length - 1]) - p_obj_chunk);
+ m_last_line.assign(p_obj_chunk + 1, p_obj_chunk + 1 + skip_last_bytes); //save it for next chunk
+
+ m_previous_line = true;//it means to skip last line
+
+ //cut out the broken line
+ stream_length -= (m_last_line.length());
+ }
+
+ return run_s3select_on_object(result, csv_stream, stream_length, m_skip_first_line, m_previous_line, (m_processed_bytes >= obj_size));
+ }
+
+public:
+ int run_s3select_on_object(std::string& result, const char* csv_stream, size_t stream_length, bool skip_first_line, bool skip_last_line, bool do_aggregate)
+ {
+ m_stream = (char*)csv_stream;
+ m_end_stream = (char*)csv_stream + stream_length;
+ m_is_to_aggregate = do_aggregate;
+ m_skip_last_line = skip_last_line;
+
+ if(skip_first_line)
+ {
+ //the stream may start in the middle of a row (maybe in the middle of a quote).
+ //at this point the stream should skip the first row(broken row).
+ //the csv_parser should be init with the fixed stream position.
+ m_stream += m_skip_x_first_bytes;
+ m_skip_x_first_bytes=0;
+ }
+
+ CSVParser _csv_parser("csv", m_stream, m_end_stream);
+ csv_parser = &_csv_parser;
+ csv_parser->set_csv_def( m_csv_defintion.row_delimiter,
+ m_csv_defintion.column_delimiter,
+ m_csv_defintion.quot_char,
+ m_csv_defintion.escape_char,
+ m_csv_defintion.comment_empty_lines,
+ m_csv_defintion.comment_chars,
+ m_csv_defintion.trim_chars);
+
+
+ if(m_extract_csv_header_info == false)
+ {
+ extract_csv_header_info();
+ }
+ do
+ {
+ m_sql_processing_status = Status::INITIAL_STAT;
+ try
+ {
+ getMatchRow(result);
+ }
+ catch (base_s3select_exception& e)
+ {
+ m_error_description = e.what();
+ m_error_count ++;
+ if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL || m_error_count>100 || (m_stream>=m_end_stream))//abort query execution
+ {
+ return -1;
+ }
+ }
+
+ if(fp_s3select_result_format && fp_s3select_header_format)
+ {
+ if (result.size() > CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT)
+ {//there are systems that might resject the response due to its size.
+ fp_s3select_result_format(result);
+ fp_s3select_header_format(result);
+ }
+ }
+
+ if (m_sql_processing_status == Status::END_OF_STREAM)
+ {
+ break;
+ }
+ else if (m_sql_processing_status == Status::LIMIT_REACHED) // limit reached
+ {
+ break;//user should request for sql_processing_status
+ }
+
+ } while (true);
+
+ if(fp_s3select_result_format && fp_s3select_header_format)
+ { //note: it may produce empty response(more the once)
+ //upon empty result, it should return *only* upon last call.
+ fp_s3select_result_format(result);
+ fp_s3select_header_format(result);
+ }
+
+ return 0;
+ }
+};
+
+#ifdef _ARROW_EXIST
+class parquet_object : public base_s3object
+{
+
+private:
+ std::string m_error_description;
+ parquet_file_parser* object_reader;
+ parquet_file_parser::column_pos_t m_where_clause_columns;
+ parquet_file_parser::column_pos_t m_projections_columns;
+ std::vector<parquet_file_parser::parquet_value_t> m_predicate_values;
+ std::vector<parquet_file_parser::parquet_value_t> m_projections_values;
+ bool not_to_increase_first_time;
+
+public:
+
+ parquet_object(std::string parquet_file_name, s3select *s3_query,s3selectEngine::rgw_s3select_api* rgw) : base_s3object(s3_query),object_reader(nullptr)
+ {
+ try{
+
+ object_reader = new parquet_file_parser(parquet_file_name,rgw); //TODO uniq ptr
+ } catch(std::exception &e)
+ {
+ throw base_s3select_exception(std::string("failure while processing parquet meta-data ") + std::string(e.what()) ,base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ parquet_query_setting(nullptr);
+ }
+
+ parquet_object() : base_s3object(nullptr),object_reader(nullptr)
+ {}
+
+ void parquet_query_setting(s3select *s3_query)
+ {
+ if(s3_query)
+ {
+ set_base_defintions(s3_query);
+ }
+ load_meta_data_into_scratch_area();
+ for(auto x : m_s3_select->get_projections_list())
+ {//traverse the AST and extract all columns reside in projection statement.
+ x->extract_columns(m_projections_columns,object_reader->get_num_of_columns());
+ }
+ //traverse the AST and extract all columns reside in where clause.
+ if(m_s3_select->get_filter())
+ m_s3_select->get_filter()->extract_columns(m_where_clause_columns,object_reader->get_num_of_columns());
+
+ not_to_increase_first_time = true;
+ }
+
+ ~parquet_object()
+ {
+ if(object_reader != nullptr)
+ {
+ delete object_reader;
+ }
+
+ }
+
+ std::string get_error_description()
+ {
+ return m_error_description;
+ }
+
+ bool is_set()
+ {
+ return m_s3_select != nullptr;
+ }
+
+ void set_parquet_object(std::string parquet_file_name, s3select *s3_query,s3selectEngine::rgw_s3select_api* rgw) //TODO duplicate code
+ {
+ try{
+
+ object_reader = new parquet_file_parser(parquet_file_name,rgw); //TODO uniq ptr
+ } catch(std::exception &e)
+ {
+ throw base_s3select_exception(std::string("failure while processing parquet meta-data ") + std::string(e.what()) ,base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ parquet_query_setting(s3_query);
+ }
+
+
+ int run_s3select_on_object(std::string &result,
+ std::function<int(std::string&)> fp_s3select_result_format,
+ std::function<int(std::string&)> fp_s3select_header_format)
+ {
+ m_sql_processing_status = Status::INITIAL_STAT;
+ do
+ {
+ try
+ {
+ getMatchRow(result);
+ }
+ catch (base_s3select_exception &e)
+ {
+ m_error_description = e.what();
+ m_error_count++;
+ if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL || m_error_count > 100) //abort query execution
+ {
+ return -1;
+ }
+ }
+ catch (std::exception &e)
+ {
+ m_error_description = e.what();
+ m_error_count++;
+ if (m_error_count > 100) //abort query execution
+ {
+ return -1;
+ }
+ }
+
+#define S3SELECT_RESPONSE_SIZE_LIMIT (4 * 1024 * 1024)
+ if (result.size() > S3SELECT_RESPONSE_SIZE_LIMIT)
+ {//AWS-cli limits response size the following callbacks send response upon some threshold
+ fp_s3select_result_format(result);
+
+ if (!is_end_of_stream() && (get_sql_processing_status() != Status::LIMIT_REACHED))
+ {
+ fp_s3select_header_format(result);
+ }
+ }
+ else
+ {
+ if (is_end_of_stream() || (get_sql_processing_status() == Status::LIMIT_REACHED))
+ {
+ fp_s3select_result_format(result);
+ }
+ }
+
+ //TODO is_end_of_stream() required?
+ if (get_sql_processing_status() == Status::END_OF_STREAM || is_end_of_stream() || get_sql_processing_status() == Status::LIMIT_REACHED)
+ {
+ break;
+ }
+
+ } while (1);
+
+ return 0;
+ }
+
+ void load_meta_data_into_scratch_area()
+ {
+ int i=0;
+ for(auto x : object_reader->get_schema())
+ {
+ m_s3_select->get_scratch_area()->set_column_pos(x.first.c_str(),i++);
+ }
+ }
+
+ virtual bool is_end_of_stream()
+ {
+ return object_reader->end_of_stream();
+ }
+
+ virtual void columnar_fetch_where_clause_columns()
+ {
+ if(!not_to_increase_first_time)//for rownum=0
+ object_reader->increase_rownum();
+ else
+ not_to_increase_first_time = false;
+
+ auto status = object_reader->get_column_values_by_positions(m_where_clause_columns, m_predicate_values);
+ if(status<0)//TODO exception?
+ return;
+ m_sa->update(m_predicate_values, m_where_clause_columns);
+ }
+
+ virtual void columnar_fetch_projection()
+ {
+ auto status = object_reader->get_column_values_by_positions(m_projections_columns, m_projections_values);
+ if(status<0)//TODO exception?
+ return;
+ m_sa->update(m_projections_values, m_projections_columns);
+ }
+
+};
+#endif //_ARROW_EXIST
+
+class json_object : public base_s3object
+{
+private:
+
+ JsonParserHandler JsonHandler;
+ size_t m_processed_bytes;
+ bool m_end_of_stream;
+ std::string* m_s3select_result = nullptr;
+ size_t m_row_count;
+ bool star_operation_ind;
+ std::string m_error_description;
+ bool m_init_json_processor_ind;
+
+public:
+
+ void init_json_processor(s3select* query)
+ {
+ if(m_init_json_processor_ind)
+ return;
+
+ m_init_json_processor_ind = true;
+ std::function<int(void)> f_sql = [this](void){auto res = sql_execution_on_row_cb();return res;};
+ std::function<int(s3selectEngine::value&, int)>
+ f_push_to_scratch = [this](s3selectEngine::value& value,int json_var_idx){return push_into_scratch_area_cb(value,json_var_idx);};
+ std::function <int(s3selectEngine::scratch_area::json_key_value_t&)>
+ f_push_key_value_into_scratch_area_per_star_operation = [this](s3selectEngine::scratch_area::json_key_value_t& key_value)
+ {return push_key_value_into_scratch_area_per_star_operation(key_value);};
+
+ //setting the container for all json-variables, to be extracted by the json reader
+ JsonHandler.set_statement_json_variables(query->get_json_variables_access());
+
+
+ //calling to getMatchRow. processing a single row per each call.
+ JsonHandler.set_s3select_processing_callback(f_sql);
+ //upon excat match between input-json-key-path and sql-statement-variable-path the callback pushes to scratch area
+ JsonHandler.set_exact_match_callback(f_push_to_scratch);
+ //upon star-operation(in statemenet) the callback pushes the key-path and value into scratch-area
+ JsonHandler.set_push_per_star_operation_callback(f_push_key_value_into_scratch_area_per_star_operation);
+
+ //the json-from-clause is unique and should exist. otherwise it's a failure.
+ if(query->getAction()->json_from_clause.empty())
+ {
+ JsonHandler.m_fatal_initialization_ind = true;
+ JsonHandler.m_fatal_initialization_description = "the SQL statement is not align with the correct syntax of JSON statement. from-clause is missing.";
+ return;
+ }
+
+ //setting the from clause path
+ if(query->getAction()->json_from_clause[0] == JSON_ROOT_OBJECT)
+ {
+ query->getAction()->json_from_clause.pop_back();
+ }
+ JsonHandler.set_prefix_match(query->getAction()->json_from_clause);
+
+ for (auto& p : m_projections)
+ {
+ if(p->is_statement_contain_star_operation())
+ {
+ star_operation_ind=true;
+ break;
+ }
+ }
+
+ if(star_operation_ind)
+ {
+ JsonHandler.set_star_operation();
+ //upon star-operation the key-path is extracted with the value, each key-value displayed in a seperate row.
+ //the return results end with a line contains the row-number.
+ m_csv_defintion.output_column_delimiter = m_csv_defintion.output_row_delimiter;
+ }
+
+ m_sa->set_parquet_type();//TODO json type
+ }
+
+ json_object(s3select* query):base_s3object(query),m_processed_bytes(0),m_end_of_stream(false),m_row_count(0),star_operation_ind(false),m_init_json_processor_ind(false)
+ {
+ init_json_processor(query);
+ }
+
+ void set_sql_result(std::string& sql_result)
+ {
+ m_s3select_result = &sql_result;
+ }
+
+ json_object(): base_s3object(nullptr), m_processed_bytes(0),m_end_of_stream(false),m_row_count(0),star_operation_ind(false),m_init_json_processor_ind(false) {}
+
+private:
+
+ virtual bool is_end_of_stream()
+ {
+ return m_end_of_stream == true;
+ }
+
+ virtual bool multiple_row_processing()
+ {
+ return false;
+ }
+
+ int sql_execution_on_row_cb()
+ {
+ //execute statement on row
+ //create response (TODO callback)
+
+ size_t result_len = m_s3select_result->size();
+ int status=0;
+ try{
+ getMatchRow(*m_s3select_result);
+ }
+ catch(s3selectEngine::base_s3select_exception& e)
+ {
+ sql_error_handling(e,*m_s3select_result);
+ status = -1;
+ }
+
+ if(is_sql_limit_reached())
+ {
+ status = JSON_PROCESSING_LIMIT_REACHED;//returning number since sql_execution_on_row_cb is a callback; the caller can not access the object
+ }
+
+ m_sa->clear_data();
+ if(star_operation_ind && (m_s3select_result->size() != result_len))
+ {//as explained above the star-operation is displayed differently
+ std::string end_of_row;
+ end_of_row = "#=== " + std::to_string(m_row_count++) + " ===#\n";
+ m_s3select_result->append(end_of_row);
+ }
+ return status;
+ }
+
+ int push_into_scratch_area_cb(s3selectEngine::value& key_value, int json_var_idx)
+ {
+ //upon exact-filter match push value to scratch area with json-idx , it should match variable
+ //push (key path , json-var-idx , value) json-var-idx should be attached per each exact filter
+ m_sa->update_json_varible(key_value,json_var_idx);
+ return 0;
+ }
+
+ int push_key_value_into_scratch_area_per_star_operation(s3selectEngine::scratch_area::json_key_value_t& key_value)
+ {
+ m_sa->get_star_operation_cont()->push_back( key_value );
+ return 0;
+ }
+
+ void sql_error_handling(s3selectEngine::base_s3select_exception& e,std::string& result)
+ {
+ //the JsonHandler makes the call to SQL processing, upon a failure to procees the SQL statement,
+ //the error-handling takes care of the error flow.
+ m_error_description = e.what();
+ m_error_count++;
+ m_s3select_result->append(std::to_string(m_error_count));
+ *m_s3select_result += " : ";
+ m_s3select_result->append(m_error_description);
+ *m_s3select_result += m_csv_defintion.output_row_delimiter;
+ }
+
+public:
+
+ int run_s3select_on_stream(std::string& result, const char* json_stream, size_t stream_length, size_t obj_size)
+ {
+ int status=0;
+ m_processed_bytes += stream_length;
+ set_sql_result(result);
+
+ if(JsonHandler.is_fatal_initialization())
+ {
+ throw base_s3select_exception(JsonHandler.m_fatal_initialization_description, base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ if(!stream_length || !json_stream)//TODO m_processed_bytes(?)
+ {//last processing cycle
+ JsonHandler.process_json_buffer(0, 0, true);//TODO end-of-stream = end-of-row
+ m_end_of_stream = true;
+ sql_execution_on_row_cb();
+ return 0;
+ }
+
+ try{
+ //the handler is processing any buffer size and return results per each buffer
+ status = JsonHandler.process_json_buffer((char*)json_stream, stream_length);
+ }
+ catch(std::exception &e)
+ {
+ std::string error_description = std::string("exception while processing :") + e.what();
+ throw base_s3select_exception(error_description,base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ if(status<0)
+ {
+ std::string error_description = std::string("failure upon JSON processing");
+ throw base_s3select_exception(error_description,base_s3select_exception::s3select_exp_en_t::FATAL);
+ return -1;
+ }
+
+ return status;
+ }
+
+ void set_json_query(s3select* s3_query)
+ {
+ set_base_defintions(s3_query);
+ init_json_processor(s3_query);
+ }
+
+ std::string get_error_description()
+ {
+ return m_error_description;
+ }
+
+ ~json_object() = default;
+};
+
+}; // namespace s3selectEngine
+
+#endif