summary | refs | log | tree | commit | diff | stats
path: root/src/s3select/include
diff options
context:
space:
mode:
Diffstat (limited to 'src/s3select/include')
-rw-r--r-- src/s3select/include/s3select.h 1138
-rw-r--r-- src/s3select/include/s3select_csv_parser.h 407
-rw-r--r-- src/s3select/include/s3select_functions.h 1037
-rw-r--r-- src/s3select/include/s3select_oper.h 1320
4 files changed, 3902 insertions, 0 deletions
diff --git a/src/s3select/include/s3select.h b/src/s3select/include/s3select.h
new file mode 100644
index 000000000..77af8c2d9
--- /dev/null
+++ b/src/s3select/include/s3select.h
@@ -0,0 +1,1138 @@
+#ifndef __S3SELECT__
+#define __S3SELECT__
+
+#include <boost/spirit/include/classic_core.hpp>
+#include <boost/algorithm/string.hpp>
+#include <cctype>
+#include <iostream>
+#include <string>
+#include <list>
+#include "s3select_oper.h"
+#include "s3select_functions.h"
+#include "s3select_csv_parser.h"
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+#include <functional>
+
+
+// Debug helper for semantic actions: expects the token bounds `a` and `b` to be in scope and
+// prints the enclosing function name followed by the token text.
+// `std::string` is spelled out because the macro is expanded inside namespace s3selectEngine.
+#define _DEBUG_TERM {std::string token(a,b);std::cout << __FUNCTION__ << token << std::endl;}
+
+
+namespace s3selectEngine
+{
+
+/// AST builder
+
+class s3select_projections
+{
+
+public:
+  /// Aggregate detection is not implemented yet; this currently always answers false.
+  bool is_aggregate()
+  {
+    //TODO iterate over the projections and search for an aggregate
+    return false;
+  }
+
+  /// Semantic validation is not implemented yet; this currently always answers false.
+  bool semantic()
+  {
+    //TODO check that aggregation functions are not nested
+    return false;
+  }
+
+  /// Gives callers direct (mutable) access to the stored projection list.
+  std::vector<base_statement*>* get()
+  {
+    return &m_projections;
+  }
+
+private:
+  // projection expressions in the order they were pushed
+  std::vector<base_statement*> m_projections;
+
+};
+
+struct actionQ
+{
+// When the parser accepts a token (say, a number) the matching action pushes it into a dedicated queue;
+// later actions pop those entries to build a "higher" construct (say, 1 + 2).
+// The operator/expression queues are working stacks of the parsing phase; condQ, projections and alias_map are read back through the s3select accessors afterwards.
+
+ std::vector<mulldiv_operation::muldiv_t> muldivQ; // filled by push_mulop, drained by push_mulldiv_binop
+ std::vector<addsub_operation::addsub_op_t> addsubQ; // filled by push_addsub, drained by push_addsub_binop
+ std::vector<arithmetic_operand::cmp_t> arithmetic_compareQ; // filled by push_compare_operator, drained by push_arithmetic_predicate
+ std::vector<logical_operand::oplog_t> logical_compareQ; // filled by push_logical_operator, drained by push_logical_predicate
+ std::vector<base_statement*> exprQ; // operand stack: literals, variables, columns, functions and arithmetic nodes
+ std::vector<base_statement*> funcQ; // functions whose argument list is still being parsed
+ std::vector<base_statement*> condQ; // predicate stack; s3select::get_filter() returns its last entry
+ projection_alias alias_map; // alias name -> projection, filled by push_alias_projection
+ std::string from_clause; // text matched by the s3_object rule
+ std::vector<std::string> schema_columns; // filled by push_column (schema description "mini-parser")
+ s3select_projections projections; // projections in query order
+
+};
+
+/// Common base of all grammar semantic actions: holds the queues and the function table the
+/// actions operate on. Both pointers start out null and are wired in through the setters.
+class base_action : public __clt_allocator
+{
+
+public:
+  base_action() : m_action(nullptr), m_s3select_functions(nullptr) {}
+
+  actionQ* m_action;
+  s3select_functions* m_s3select_functions;
+
+  void set_action_q(actionQ* a)
+  {
+    m_action = a;
+  }
+
+  void set_s3select_functions(s3select_functions* s3f)
+  {
+    m_s3select_functions = s3f;
+  }
+};
+
+struct push_from_clause : public base_action
+{
+  /// Stores the matched text [a, b) as the query's from-clause.
+  void operator()(const char* a, const char* b) const
+  {
+    m_action->from_clause = std::string(a, b);
+  }
+
+};
+static push_from_clause g_push_from_clause;
+
+struct push_number : public base_action //TODO use define for definition of actions
+{
+  /// Converts the matched text [a, b) with atoi and pushes the resulting variable on exprQ.
+  void operator()(const char* a, const char* b) const
+  {
+    std::string digits(a, b);
+    variable* literal = S3SELECT_NEW( variable, atoi(digits.c_str()));
+
+    m_action->exprQ.push_back(literal);
+  }
+
+};
+static push_number g_push_number;
+
+struct push_float_number : public base_action //TODO use define for definition of actions
+{
+
+  /// Pushes a numeric literal on exprQ: an int (atoi) when the whole token parses as an
+  /// integer, otherwise a double (strtod).
+  void operator()(const char* a, const char* b) const
+  {
+    std::string token(a, b);
+
+    // the float parser (real_p) also accepts integers and thereby "shadows" the integer rule,
+    // so the token is parsed once more to tell the two apart
+    const bool whole_token_is_int = bsc::parse(token.c_str(), bsc::int_p, bsc::space_p).full;
+
+    if (whole_token_is_int)
+    {
+      variable* int_literal = S3SELECT_NEW(variable, atoi(token.c_str()));
+
+      m_action->exprQ.push_back(int_literal);
+    }
+    else
+    {
+      char* perr;
+      double d = strtod(token.c_str(), &perr);
+      variable* real_literal = S3SELECT_NEW( variable, d);
+
+      m_action->exprQ.push_back(real_literal);
+    }
+  }
+
+};
+static push_float_number g_push_float_number;
+
+struct push_string : public base_action //TODO use define for definition of actions
+{
+
+  /// Pushes a quoted string literal on exprQ as a COL_VALUE variable.
+  void operator()(const char* a, const char* b) const
+  {
+    // [a, b) still carries the surrounding double quotes: drop one character on each side
+    std::string unquoted(a + 1, b - 1);
+
+    variable* literal = S3SELECT_NEW(variable, unquoted, variable::var_t::COL_VALUE );
+
+    m_action->exprQ.push_back(literal);
+  }
+
+};
+static push_string g_push_string;
+
+struct push_variable : public base_action
+{
+  /// Wraps the matched identifier [a, b) in a variable node and pushes it on exprQ.
+  void operator()(const char* a, const char* b) const
+  {
+    std::string name(a, b);
+
+    variable* node = S3SELECT_NEW(variable, name);
+
+    m_action->exprQ.push_back(node);
+  }
+};
+static push_variable g_push_variable;
+
+/////////////////////////arithmetic unit /////////////////
+struct push_addsub : public base_action
+{
+  /// Records the additive operator: "+" maps to ADD, any other token to SUB.
+  void operator()(const char* a, const char* b) const
+  {
+    const std::string op(a, b);
+    const addsub_operation::addsub_op_t kind =
+      (op == "+") ? addsub_operation::addsub_op_t::ADD : addsub_operation::addsub_op_t::SUB;
+
+    m_action->addsubQ.push_back(kind);
+  }
+};
+static push_addsub g_push_addsub;
+
+struct push_mulop : public base_action
+{
+  /// Records the multiplicative operator: "*" -> MULL, "/" -> DIV, any other token -> POW.
+  void operator()(const char* a, const char* b) const
+  {
+    const std::string op(a, b);
+    mulldiv_operation::muldiv_t kind = mulldiv_operation::muldiv_t::POW;
+
+    if (op == "*")
+    {
+      kind = mulldiv_operation::muldiv_t::MULL;
+    }
+    else if (op == "/")
+    {
+      kind = mulldiv_operation::muldiv_t::DIV;
+    }
+
+    m_action->muldivQ.push_back(kind);
+  }
+};
+static push_mulop g_push_mulop;
+
+struct push_addsub_binop : public base_action
+{
+  /// Folds the two top entries of exprQ and the top of addsubQ into one addsub_operation node,
+  /// which replaces them on exprQ.
+  void operator()(const char* a, const char* b) const
+  {
+    // the right operand was pushed last, so it is popped first
+    base_statement* rhs = m_action->exprQ.back();
+    m_action->exprQ.pop_back();
+
+    base_statement* lhs = m_action->exprQ.back();
+    m_action->exprQ.pop_back();
+
+    addsub_operation::addsub_op_t op = m_action->addsubQ.back();
+    m_action->addsubQ.pop_back();
+
+    addsub_operation* node = S3SELECT_NEW(addsub_operation, lhs, op, rhs);
+    m_action->exprQ.push_back(node);
+  }
+};
+static push_addsub_binop g_push_addsub_binop;
+
+struct push_mulldiv_binop : public base_action
+{
+  /// Folds the two top entries of exprQ and the top of muldivQ into one mulldiv_operation node,
+  /// which replaces them on exprQ.
+  void operator()(const char* a, const char* b) const
+  {
+    // the right operand was pushed last, so it is popped first
+    base_statement* rhs = m_action->exprQ.back();
+    m_action->exprQ.pop_back();
+
+    base_statement* lhs = m_action->exprQ.back();
+    m_action->exprQ.pop_back();
+
+    mulldiv_operation::muldiv_t op = m_action->muldivQ.back();
+    m_action->muldivQ.pop_back();
+
+    mulldiv_operation* node = S3SELECT_NEW(mulldiv_operation, lhs, op, rhs);
+    m_action->exprQ.push_back(node);
+  }
+};
+static push_mulldiv_binop g_push_mulldiv_binop;
+
+struct push_function_arg : public base_action
+{
+  /// Moves the top expression of exprQ into the argument list of the function on top of funcQ.
+  /// The expression is popped even when the top of funcQ is not a __function.
+  void operator()(const char* a, const char* b) const
+  {
+    base_statement* argument = m_action->exprQ.back();
+    m_action->exprQ.pop_back();
+
+    __function* callee = dynamic_cast<__function*>(m_action->funcQ.back());
+    if (callee != nullptr)
+    {
+      callee->push_argument(argument);
+    }
+  }
+};
+static push_function_arg g_push_function_arg;
+
+struct push_function_name : public base_action
+{
+  /// Extracts the function name from a token shaped like "name (" and pushes a new
+  /// __function node for it on funcQ.
+  void operator()(const char* a, const char* b) const
+  {
+    // step back over the trailing '(' and blanks so that name_last is the last character of the name
+    const char* name_last = b - 1;
+    while (*name_last == '(' || *name_last == ' ')
+    {
+      --name_last;
+    }
+
+    std::string fn(a, name_last + 1);
+
+    __function* func = S3SELECT_NEW(__function, fn.c_str(), m_s3select_functions);
+    m_action->funcQ.push_back(func);
+  }
+};
+static push_function_name g_push_function_name;
+
+struct push_function_expr : public base_action
+{
+  /// A complete function call was parsed: move it from funcQ to exprQ so that it can take
+  /// part in expressions like any other operand.
+  void operator()(const char* a, const char* b) const
+  {
+    base_statement* completed_call = m_action->funcQ.back();
+    m_action->funcQ.pop_back();
+
+    m_action->exprQ.push_back(completed_call);
+  }
+};
+static push_function_expr g_push_function_expr;
+
+////////////////////// logical unit ////////////////////////
+
+struct push_compare_operator : public base_action
+{
+  /// Maps the matched comparison token to its cmp_t value (NA for anything unknown) and
+  /// pushes it on arithmetic_compareQ.
+  void operator()(const char* a, const char* b) const
+  {
+    struct op_entry
+    {
+      const char* text;
+      arithmetic_operand::cmp_t op;
+    };
+
+    const op_entry known_ops[] =
+    {
+      {"==", arithmetic_operand::cmp_t::EQ},
+      {"!=", arithmetic_operand::cmp_t::NE},
+      {">=", arithmetic_operand::cmp_t::GE},
+      {"<=", arithmetic_operand::cmp_t::LE},
+      {">", arithmetic_operand::cmp_t::GT},
+      {"<", arithmetic_operand::cmp_t::LT},
+    };
+
+    const std::string token(a, b);
+    arithmetic_operand::cmp_t c = arithmetic_operand::cmp_t::NA;
+
+    for (const op_entry& candidate : known_ops)
+    {
+      if (token == candidate.text)
+      {
+        c = candidate.op;
+        break;
+      }
+    }
+
+    m_action->arithmetic_compareQ.push_back(c);
+  }
+};
+static push_compare_operator g_push_compare_operator;
+
+struct push_logical_operator : public base_action
+{
+  /// Maps "and"/"or" to the matching oplog_t (NA for anything else) and pushes it on
+  /// logical_compareQ.
+  void operator()(const char* a, const char* b) const
+  {
+    const std::string token(a, b);
+    logical_operand::oplog_t l = logical_operand::oplog_t::NA;
+
+    if (token == "and")
+    {
+      l = logical_operand::oplog_t::AND;
+    }
+    else if (token == "or")
+    {
+      l = logical_operand::oplog_t::OR;
+    }
+
+    m_action->logical_compareQ.push_back(l);
+  }
+};
+static push_logical_operator g_push_logical_operator;
+
+struct push_arithmetic_predicate : public base_action
+{
+  /// Builds a comparison node from the top of arithmetic_compareQ and the two top entries of
+  /// exprQ, then pushes it on condQ.
+  void operator()(const char* a, const char* b) const
+  {
+    arithmetic_operand::cmp_t cmp = m_action->arithmetic_compareQ.back();
+    m_action->arithmetic_compareQ.pop_back();
+
+    // the right operand was pushed last, so it is popped first
+    base_statement* rhs = m_action->exprQ.back();
+    m_action->exprQ.pop_back();
+
+    base_statement* lhs = m_action->exprQ.back();
+    m_action->exprQ.pop_back();
+
+    arithmetic_operand* predicate = S3SELECT_NEW(arithmetic_operand, lhs, cmp, rhs);
+
+    m_action->condQ.push_back(predicate);
+  }
+};
+static push_arithmetic_predicate g_push_arithmetic_predicate;
+
+struct push_logical_predicate : public base_action
+{
+
+  /// Combines up to two predicates from condQ with the top of logical_compareQ into one
+  /// logical_operand node and pushes it on condQ. A missing operand is passed as null.
+  void operator()(const char* a, const char* b) const
+  {
+    logical_operand::oplog_t oplog = m_action->logical_compareQ.back();
+    m_action->logical_compareQ.pop_back();
+
+    base_statement* rhs = nullptr;
+    base_statement* lhs = nullptr;
+
+    // the right operand was pushed last, so it is taken first
+    if (!m_action->condQ.empty())
+    {
+      rhs = m_action->condQ.back();
+      m_action->condQ.pop_back();
+    }
+    if (!m_action->condQ.empty())
+    {
+      lhs = m_action->condQ.back();
+      m_action->condQ.pop_back();
+    }
+
+    logical_operand* combined = S3SELECT_NEW(logical_operand, lhs, oplog, rhs);
+
+    m_action->condQ.push_back(combined);
+  }
+};
+static push_logical_predicate g_push_logical_predicate;
+
+struct push_column_pos : public base_action
+{
+  /// Pushes a column reference on exprQ: STAR_OPERATION for "*", POS for any other token.
+  void operator()(const char* a, const char* b) const
+  {
+    std::string token(a, b);
+    const bool is_star = (token == "*" || token == "* "); //TODO space should skip in boost::spirit
+    variable* column = nullptr;
+
+    if (is_star)
+    {
+      column = S3SELECT_NEW(variable, token, variable::var_t::STAR_OPERATION);
+    }
+    else
+    {
+      column = S3SELECT_NEW(variable, token, variable::var_t::POS);
+    }
+
+    m_action->exprQ.push_back(column);
+  }
+
+};
+static push_column_pos g_push_column_pos;
+
+struct push_projection : public base_action
+{
+  /// Moves the top expression of exprQ to the end of the projection list.
+  void operator()(const char* a, const char* b) const
+  {
+    base_statement* projected = m_action->exprQ.back();
+
+    m_action->projections.get()->push_back(projected);
+    m_action->exprQ.pop_back();
+  }
+
+};
+static push_projection g_push_projection;
+
+struct push_alias_projection : public base_action
+{
+  /// Handles "<expression> as <alias>": registers the alias for the expression on top of exprQ
+  /// and moves that expression to the projection list.
+  /// Throws a FATAL base_s3select_exception when the alias name is already in use; in that
+  /// case the expression is left on exprQ.
+  void operator()(const char* a, const char* b) const
+  {
+    // The alias is the last word of the token. Walk back from the end until whitespace is met.
+    // The grammar's skipper accepts any whitespace (tab, newline, ...), not only ' ', and the
+    // walk must never leave the token, hence the lower bound on `a`.
+    const char* alias_begin = b;
+    while (alias_begin > a && !std::isspace(static_cast<unsigned char>(*(alias_begin - 1))))
+    {
+      --alias_begin;
+    }
+
+    std::string alias_name(alias_begin, b);
+    base_statement* bs = m_action->exprQ.back();
+
+    //mapping alias name to base-statement
+    bool res = m_action->alias_map.insert_new_entry(alias_name, bs);
+    if (res==false)
+    {
+      throw base_s3select_exception(std::string("alias <")+alias_name+std::string("> is already been used in query"), base_s3select_exception::s3select_exp_en_t::FATAL);
+    }
+
+    m_action->projections.get()->push_back( bs );
+    m_action->exprQ.pop_back();
+  }
+
+};
+static push_alias_projection g_push_alias_projection;
+
+/// for the schema description "mini-parser"
+struct push_column : public base_action
+{
+
+  /// Appends the matched column name [a, b) to schema_columns.
+  void operator()(const char* a, const char* b) const
+  {
+    m_action->schema_columns.push_back(std::string(a, b));
+  }
+
+};
+static push_column g_push_column;
+
+struct push_debug_1 : public base_action
+{
+
+  /// Debug hook without side effects: it only materializes the matched text, which makes it
+  /// a convenient place for a breakpoint.
+  void operator()(const char* a, const char* b) const
+  {
+    const std::string matched_text(a, b);
+  }
+
+};
+static push_debug_1 g_push_debug_1;
+
+struct s3select : public bsc::grammar<s3select>
+{
+private:
+
+ actionQ m_actionQ; // parse queues; the file-level g_push_* actions are pointed at this instance by ATTACH_ACTION_Q (NOTE(review): those globals are shared, so a later s3select instance presumably re-targets them - confirm)
+
+ scratch_area m_sca; // handed out by get_scratch_area(); column positions are registered in load_schema()
+
+ s3select_functions m_s3select_functions; // function table passed to every action and used by push_function_name
+
+ std::string error_description; // last error text, set by parse_query() and semantic()
+
+ s3select_allocator m_s3select_allocator; // allocator given to the actions and to m_s3select_functions
+
+ bool aggr_flow; // set by semantic() once a projection contains an aggregate
+
+#define BOOST_BIND_ACTION( push_name ) boost::bind( &push_name::operator(), g_ ## push_name , _1 ,_2)
+#define ATTACH_ACTION_Q( push_name ) {(g_ ## push_name).set_action_q(&m_actionQ); (g_ ## push_name).set_s3select_functions(&m_s3select_functions); (g_ ## push_name).set(&m_s3select_allocator);}
+
+public:
+
+
+ /// Validates the aggregation rules of the parsed projections and records in aggr_flow
+ /// whether the query aggregates.
+ /// Returns 0; rule violations are reported by throwing a FATAL base_s3select_exception
+ /// (the message is also kept in error_description).
+ int semantic()
+ {
+   // first pass: detect aggregates and reject nested ones
+   for (auto projection : get_projections_list())
+   {
+     base_statement* aggr = projection->get_aggregate();
+     if (aggr == 0)
+     {
+       continue;
+     }
+
+     if (aggr->is_nested_aggregate(aggr))
+     {
+       error_description = "nested aggregation function is illegal i.e. sum(...sum ...)";
+       throw base_s3select_exception(error_description, base_s3select_exception::s3select_exp_en_t::FATAL);
+     }
+
+     aggr_flow = true;
+   }
+
+   if (aggr_flow != true)
+   {
+     return 0;
+   }
+
+   // second pass (aggregation queries only): an aggregate may not be mixed with a plain column
+   for (auto projection : get_projections_list())
+   {
+     base_statement* skip_expr = projection->get_aggregate();
+
+     if (projection->is_binop_aggregate_and_column(skip_expr))
+     {
+       error_description = "illegal expression. /select sum(c1) + c1 ..../ is not allow type of query";
+       throw base_s3select_exception(error_description, base_s3select_exception::s3select_exp_en_t::FATAL);
+     }
+   }
+
+   return 0;
+ }
+
+ /// Parses input_query into the AST queues and runs the semantic checks.
+ /// Returns 0 on success (also when a query was already parsed, or when a non-fatal
+ /// exception was caught) and -1 on a syntax error or a FATAL exception. The failure text
+ /// is printed to stdout and kept for get_error_description().
+ int parse_query(const char* input_query)
+ {
+   if (!get_projections_list().empty())
+   {
+     return 0; //already parsed
+   }
+
+   try
+   {
+     bsc::parse_info<> info = bsc::parse(input_query, *this, bsc::space_p);
+
+     if (!info.full)
+     {
+       // info.stop points at the first character the grammar could not consume
+       auto unparsed_tail = info.stop;
+
+       std::cout << "failure -->" << unparsed_tail << "<---" << std::endl;
+       error_description = std::string("failure -->") + unparsed_tail + std::string("<---");
+       return -1;
+     }
+
+     semantic();
+   }
+   catch (base_s3select_exception& e)
+   {
+     std::cout << e.what() << std::endl;
+     error_description.assign(e.what());
+
+     const bool fatal = (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL);
+     if (fatal) //abort query execution
+     {
+       return -1;
+     }
+   }
+
+   return 0;
+ }
+
+ std::string get_error_description()
+ {
+ return error_description;
+ }
+
+ /// Points every grammar action at this instance's queues, function table and allocator,
+ /// and resets the error/aggregation state.
+ s3select()
+ {
+   //TODO check option for defining action and push into list
+
+   aggr_flow = false;
+   error_description.clear();
+   m_s3select_functions.set(&m_s3select_allocator);
+
+   // literals and identifiers
+   ATTACH_ACTION_Q(push_number);
+   ATTACH_ACTION_Q(push_float_number);
+   ATTACH_ACTION_Q(push_string);
+   ATTACH_ACTION_Q(push_variable);
+   ATTACH_ACTION_Q(push_column_pos);
+
+   // arithmetic
+   ATTACH_ACTION_Q(push_addsub);
+   ATTACH_ACTION_Q(push_addsub_binop);
+   ATTACH_ACTION_Q(push_mulop);
+   ATTACH_ACTION_Q(push_mulldiv_binop);
+
+   // functions
+   ATTACH_ACTION_Q(push_function_arg);
+   ATTACH_ACTION_Q(push_function_name);
+   ATTACH_ACTION_Q(push_function_expr);
+
+   // predicates
+   ATTACH_ACTION_Q(push_logical_operator);
+   ATTACH_ACTION_Q(push_logical_predicate);
+   ATTACH_ACTION_Q(push_compare_operator);
+   ATTACH_ACTION_Q(push_arithmetic_predicate);
+
+   // clauses and projections
+   ATTACH_ACTION_Q(push_from_clause);
+   ATTACH_ACTION_Q(push_projection);
+   ATTACH_ACTION_Q(push_alias_projection);
+   ATTACH_ACTION_Q(push_debug_1);
+ }
+
+ /// Runs the semantic check of the expression on top of exprQ.
+ /// Returns false when exprQ is empty (nothing to validate): calling back() on an empty
+ /// vector is undefined behaviour.
+ bool is_semantic()//TBD traverse and validate semantics per all nodes
+ {
+   if (m_actionQ.exprQ.empty())
+   {
+     return false;
+   }
+
+   base_statement* cond = m_actionQ.exprQ.back();
+
+   return cond->semantic();
+ }
+
+ /// Returns a copy of the text that matched the from-clause of the parsed query.
+ std::string get_from_clause()
+ {
+   std::string source_name(m_actionQ.from_clause);
+   return source_name;
+ }
+
+ /// Registers every column name of scm in the scratch area, using its index in scm as the
+ /// column position.
+ void load_schema(std::vector< std::string>& scm)
+ {
+   int i = 0;
+   for (const auto& column_name : scm) // by reference: no need to copy each name
+   {
+     m_sca.set_column_pos(column_name.c_str(), i++);
+   }
+ }
+
+ /// Returns the root of the where-clause (last entry of condQ), or null when the query has
+ /// no predicate.
+ base_statement* get_filter()
+ {
+   return m_actionQ.condQ.empty() ? NULL : m_actionQ.condQ.back();
+ }
+
+ /// Returns a copy of the projection list (pointers only; the nodes themselves are shared).
+ std::vector<base_statement*> get_projections_list()
+ {
+   std::vector<base_statement*>* stored = m_actionQ.projections.get();
+
+   return *stored; //TODO return COPY(?) or return evaluation results (list of class value{}) / return reference(?)
+ }
+
+ /// Gives access to the scratch area owned by this query object.
+ scratch_area* get_scratch_area()
+ {
+   scratch_area* owned = &m_sca;
+   return owned;
+ }
+
+ /// Gives access to the alias map filled while parsing "<expression> as <alias>" projections.
+ projection_alias* get_aliases()
+ {
+   projection_alias* aliases = &m_actionQ.alias_map;
+   return aliases;
+ }
+
+ /// True once semantic() found an aggregate among the projections.
+ bool is_aggregate_query()
+ {
+   return aggr_flow;
+ }
+
+ /// Nothing to release explicitly: all members clean up after themselves.
+ ~s3select()
+ {
+ }
+
+
+ template <typename ScannerT>
+ struct definition
+ {
+ definition(s3select const& )
+ {
+ ///// s3select syntax rules and the actions that build the AST
+
+ select_expr = bsc::str_p("select") >> projections >> bsc::str_p("from") >> (s3_object)[BOOST_BIND_ACTION(push_from_clause)] >> !where_clause >> ';';
+
+ projections = projection_expression >> *( ',' >> projection_expression) ;
+
+ projection_expression = (arithmetic_expression >> bsc::str_p("as") >> alias_name)[BOOST_BIND_ACTION(push_alias_projection)] | (arithmetic_expression)[BOOST_BIND_ACTION(push_projection)] ; // NOTE(review): the aliased form is tried first; presumably the actions inside arithmetic_expression already ran when it fails at "as" - confirm
+
+ alias_name = bsc::lexeme_d[(+bsc::alpha_p >> *bsc::digit_p)] ; // letters followed by optional digits, matched without skipping
+
+
+ s3_object = bsc::str_p("stdin") | object_path ;
+
+ object_path = "/" >> *( fs_type >> "/") >> fs_type;
+
+ fs_type = bsc::lexeme_d[+( bsc::alnum_p | bsc::str_p(".") | bsc::str_p("_")) ];
+
+ where_clause = bsc::str_p("where") >> condition_expression;
+
+ condition_expression = (arithmetic_predicate >> *(log_op[BOOST_BIND_ACTION(push_logical_operator)] >> arithmetic_predicate[BOOST_BIND_ACTION(push_logical_predicate)]));
+
+ arithmetic_predicate = (factor >> *(arith_cmp[BOOST_BIND_ACTION(push_compare_operator)] >> factor[BOOST_BIND_ACTION(push_arithmetic_predicate)]));
+
+ factor = (arithmetic_expression) | ('(' >> condition_expression >> ')') ;
+
+ arithmetic_expression = (addsub_operand >> *(addsubop_operator[BOOST_BIND_ACTION(push_addsub)] >> addsub_operand[BOOST_BIND_ACTION(push_addsub_binop)] ));
+
+ addsub_operand = (mulldiv_operand >> *(muldiv_operator[BOOST_BIND_ACTION(push_mulop)] >> mulldiv_operand[BOOST_BIND_ACTION(push_mulldiv_binop)] ));// this non-terminal gives precedence to mul/div
+
+ mulldiv_operand = arithmetic_argument | ('(' >> (arithmetic_expression) >> ')') ;
+
+ list_of_function_arguments = (arithmetic_expression)[BOOST_BIND_ACTION(push_function_arg)] >> *(',' >> (arithmetic_expression)[BOOST_BIND_ACTION(push_function_arg)]);
+ function = ((variable >> '(' )[BOOST_BIND_ACTION(push_function_name)] >> !list_of_function_arguments >> ')')[BOOST_BIND_ACTION(push_function_expr)];
+
+ arithmetic_argument = (float_number)[BOOST_BIND_ACTION(push_float_number)] | (number)[BOOST_BIND_ACTION(push_number)] | (column_pos)[BOOST_BIND_ACTION(push_column_pos)] | // float_number comes first; push_float_number itself separates integers from reals
+ (string)[BOOST_BIND_ACTION(push_string)] |
+ (function)[BOOST_BIND_ACTION(push_debug_1)] | (variable)[BOOST_BIND_ACTION(push_variable)] ;//the function node is pushed by push_function_expr (attached inside the function rule); push_debug_1 is a no-op
+
+
+ number = bsc::int_p;
+
+ float_number = bsc::real_p;
+
+ string = bsc::str_p("\"") >> *( bsc::anychar_p - bsc::str_p("\"") ) >> bsc::str_p("\"") ; // double-quoted text
+
+ column_pos = ('_'>>+(bsc::digit_p) ) | '*' ; // "_<digits>" or "*"
+
+ muldiv_operator = bsc::str_p("*") | bsc::str_p("/") | bsc::str_p("^");// these operators get precedence
+
+ addsubop_operator = bsc::str_p("+") | bsc::str_p("-");
+
+
+ arith_cmp = bsc::str_p(">=") | bsc::str_p("<=") | bsc::str_p("==") | bsc::str_p("<") | bsc::str_p(">") | bsc::str_p("!="); // two-character operators are listed before "<" and ">"
+
+ log_op = bsc::str_p("and") | bsc::str_p("or"); //TODO add NOT (unary)
+
+ variable = bsc::lexeme_d[(+bsc::alpha_p >> *bsc::digit_p)];
+ }
+
+
+ bsc::rule<ScannerT> variable, select_expr, s3_object, where_clause, number, float_number, string, arith_cmp, log_op, condition_expression, arithmetic_predicate, factor;
+ bsc::rule<ScannerT> muldiv_operator, addsubop_operator, function, arithmetic_expression, addsub_operand, list_of_function_arguments, arithmetic_argument, mulldiv_operand;
+ bsc::rule<ScannerT> fs_type, object_path;
+ bsc::rule<ScannerT> projections, projection_expression, alias_name, column_pos;
+ bsc::rule<ScannerT> const& start() const // the grammar's entry rule
+ {
+ return select_expr;
+ }
+ };
+};
+
+/////// handling different object types
+/// Common base of the object-format readers: keeps the scratch area that rows are loaded
+/// into, and an (initially empty) object name.
+class base_s3object
+{
+
+public:
+  base_s3object(scratch_area* m) : m_sa(m), m_obj_name() {}
+
+  /// Re-targets the reader to another scratch area and clears the object name.
+  void set(scratch_area* m)
+  {
+    m_sa = m;
+    m_obj_name.clear();
+  }
+
+  virtual ~base_s3object() {}
+
+protected:
+  scratch_area* m_sa;
+  std::string m_obj_name;
+};
+
+
+class csv_object : public base_s3object
+{
+
+public:
+ /// CSV dialect settings. Defaults: rows end with '\n', columns are split on ',', the
+ /// escape character is '\\', the quote character is '"', and header handling is off.
+ struct csv_defintions
+ {
+   char row_delimiter;
+   char column_delimiter;
+   char escape_char;
+   char quot_char;
+   bool use_header_info;
+   bool ignore_header_info;//skip first line
+
+   csv_defintions()
+     : row_delimiter('\n')
+     , column_delimiter(',')
+     , escape_char('\\')
+     , quot_char('"')
+     , use_header_info(false)
+     , ignore_header_info(false)
+   {
+   }
+
+ } m_csv_defintion;
+
+ /// Binds the reader to an already parsed query and uses the default CSV dialect.
+ /// s3_query is dereferenced immediately.
+ csv_object(s3select* s3_query)
+   : base_s3object(s3_query->get_scratch_area())
+   , m_skip_last_line(false)
+   , m_s3_select(nullptr)
+   , m_error_count(0)
+   , m_extract_csv_header_info(false)
+   , m_previous_line(false)
+   , m_skip_first_line(false)
+   , m_processed_bytes(0)
+ {
+   set(s3_query);
+   csv_parser.set(m_csv_defintion.row_delimiter,
+                  m_csv_defintion.column_delimiter,
+                  m_csv_defintion.quot_char,
+                  m_csv_defintion.escape_char);
+ }
+
+ /// Binds the reader to an already parsed query and uses the CSV dialect given in csv.
+ /// s3_query is dereferenced immediately.
+ csv_object(s3select* s3_query, struct csv_defintions csv)
+   : base_s3object(s3_query->get_scratch_area())
+   , m_skip_last_line(false)
+   , m_s3_select(nullptr)
+   , m_error_count(0)
+   , m_extract_csv_header_info(false)
+   , m_previous_line(false)
+   , m_skip_first_line(false)
+   , m_processed_bytes(0)
+ {
+   set(s3_query);
+
+   // the parser is configured from the stored copy of the caller's dialect
+   m_csv_defintion = csv;
+   csv_parser.set(m_csv_defintion.row_delimiter,
+                  m_csv_defintion.column_delimiter,
+                  m_csv_defintion.quot_char,
+                  m_csv_defintion.escape_char);
+ }
+
+ /// Creates an unbound reader with the default CSV dialect; a query has to be attached with
+ /// set() before rows can be processed.
+ csv_object()
+   : base_s3object(nullptr)
+   , m_skip_last_line(false)
+   , m_s3_select(nullptr)
+   , m_error_count(0)
+   , m_extract_csv_header_info(false)
+   , m_previous_line(false)
+   , m_skip_first_line(false)
+   , m_processed_bytes(0)
+ {
+   csv_parser.set(m_csv_defintion.row_delimiter,
+                  m_csv_defintion.column_delimiter,
+                  m_csv_defintion.quot_char,
+                  m_csv_defintion.escape_char);
+ }
+
+private:
+ // Every scalar member carries a default initializer so that no constructor can leave it
+ // indeterminate (the constructors used to skip m_where_clause, m_is_to_aggregate,
+ // m_stream_length, m_stream and m_end_stream).
+ base_statement* m_where_clause = nullptr; // root of the filter, null when the query has none
+ std::vector<base_statement*> m_projections; // copied from the query in set()
+ bool m_aggr_flow = false; //TODO once per query
+ bool m_is_to_aggregate = false; // emit the aggregation results when the stream ends
+ bool m_skip_last_line = false; // the last row of the current buffer is incomplete
+ size_t m_stream_length = 0;
+ std::string m_error_description;
+ char* m_stream = nullptr; // read position in the current buffer
+ char* m_end_stream = nullptr; // one past the end of the current buffer
+ std::vector<char*> m_row_tokens{128}; // 128 slots for the columns of the current row
+ s3select* m_s3_select = nullptr;
+ csvParser csv_parser;
+ size_t m_error_count = 0;
+ bool m_extract_csv_header_info = false; // header handling is done
+ std::vector<std::string> m_csv_schema{128}; // 128 slots for header column names
+
+ //handling arbitrary chunks (rows cut in the middle)
+ bool m_previous_line = false; // m_last_line holds an incomplete row from the previous chunk
+ bool m_skip_first_line = false; // the first row of the chunk was already processed as merge_line
+ std::string merge_line;
+ std::string m_last_line;
+ size_t m_processed_bytes = 0;
+
+ /// Parses the next row of the current buffer into m_row_tokens and advances m_stream.
+ /// Returns the number of tokens in the row, or -1 when the buffer is exhausted (also when
+ /// the row just parsed is the incomplete last row that has to be skipped).
+ /// Throws a FATAL base_s3select_exception when the CSV parser reports a failure.
+ int getNextRow()
+ {
+   if (m_stream >= m_end_stream)
+   {
+     return -1;
+   }
+
+   size_t token_count = 0;
+
+   if (csv_parser.parse(m_stream, m_end_stream, &m_row_tokens, &token_count) < 0)
+   {
+     throw base_s3select_exception("failed to parse csv stream", base_s3select_exception::s3select_exp_en_t::FATAL);
+   }
+
+   m_stream = (char*)csv_parser.currentLoc();
+
+   const bool consumed_whole_buffer = (m_stream >= m_end_stream);
+   if (m_skip_last_line && consumed_whole_buffer)
+   {
+     return -1;
+   }
+
+   return token_count;
+ }
+
+public:
+
+ /// Attaches a parsed query: takes over its scratch area, projections, filter and
+ /// aggregation flag, and applies the scratch area and alias map to every AST node.
+ void set(s3select* s3_query)
+ {
+   m_s3_select = s3_query;
+   base_s3object::set(m_s3_select->get_scratch_area());
+
+   m_projections = m_s3_select->get_projections_list();
+   m_where_clause = m_s3_select->get_filter();
+
+   if (m_where_clause != nullptr)
+   {
+     m_where_clause->traverse_and_apply(m_sa, m_s3_select->get_aliases());
+   }
+
+   for (auto projection : m_projections)
+   {
+     projection->traverse_and_apply(m_sa, m_s3_select->get_aliases());
+   }
+
+   m_aggr_flow = m_s3_select->is_aggregate_query();
+ }
+
+
+ /// Returns a copy of the text of the last exception caught while processing rows.
+ std::string get_error_description()
+ {
+   std::string description(m_error_description);
+   return description;
+ }
+
+ /// Nothing to release explicitly: all members clean up after themselves.
+ virtual ~csv_object()
+ {
+ }
+
+public:
+
+
+ /// Produces the next piece of output from the current buffer.
+ /// Non-aggregation query: skips rows until one passes the filter, appends its projections
+ /// (each followed by ",") and a "\n" to result, and returns the token count of that row.
+ /// Aggregation query: feeds every remaining row to the projections; at the end of the
+ /// buffer it appends the aggregation results when m_is_to_aggregate is set.
+ /// Returns a negative value when the buffer is exhausted.
+ int getMatchRow( std::string& result) //TODO virtual ? getResult
+ {
+   // loads the row just parsed into the scratch area and drops the cached alias results
+   auto load_current_row = [this](int token_count)
+   {
+     m_sa->update(m_row_tokens, token_count);
+     for (auto alias : *m_s3_select->get_aliases()->get())
+     {
+       alias.second->invalidate_cache_result();
+     }
+   };
+
+   if (m_aggr_flow != true)
+   {
+     int token_count = 0;
+
+     do
+     {
+       token_count = getNextRow();
+       if (token_count < 0)
+       {
+         return token_count;
+       }
+
+       load_current_row(token_count);
+     }
+     while (m_where_clause && m_where_clause->eval().i64() == false);
+
+     for (auto projection : m_projections)
+     {
+       result.append( projection->eval().to_string() );
+       result.append(",");
+     }
+     result.append("\n");
+
+     return token_count; //TODO wrong
+   }
+
+   for (;;)
+   {
+     const int token_count = getNextRow();
+
+     if (token_count < 0) //end of stream
+     {
+       if (m_is_to_aggregate)
+       {
+         for (auto projection : m_projections)
+         {
+           projection->set_last_call();
+           result.append( projection->eval().to_string() );
+           result.append(",");
+         }
+       }
+
+       return token_count;
+     }
+
+     if (m_projections.front()->is_set_last_call())
+     {
+       //should validate while query execution , no update upon nodes are marked with set_last_call
+       throw base_s3select_exception("on aggregation query , can not stream row data post do-aggregate call", base_s3select_exception::s3select_exp_en_t::FATAL);
+     }
+
+     load_current_row(token_count);
+
+     const bool row_passes = !m_where_clause || m_where_clause->eval().i64() == true;
+     if (!row_passes)
+     {
+       continue;
+     }
+
+     for (auto projection : m_projections)
+     {
+       projection->eval();
+     }
+   }
+ }
+
+ /// Consumes the header row of the current buffer according to the CSV definitions:
+ /// ignore_header_info skips the first line; use_header_info parses it and loads the column
+ /// names as the schema; otherwise nothing is consumed.
+ /// Returns 0 once header handling is done. Returns -1, leaving the header marked as
+ /// "not yet extracted", when the buffer does not contain a complete header row, so that
+ /// the next buffer gets another attempt.
+ /// May throw base_s3select_exception from getNextRow().
+ int extract_csv_header_info()
+ {
+
+   if (m_csv_defintion.ignore_header_info == true)
+   {
+     // the scan is bounded by the end of the buffer: the stream is not guaranteed to be NUL terminated
+     while (m_stream < m_end_stream && (*m_stream != m_csv_defintion.row_delimiter ))
+     {
+       m_stream++;
+     }
+
+     if (m_stream >= m_end_stream)
+     {
+       return -1; // the header line is not complete in this buffer
+     }
+
+     m_stream++; // step over the row delimiter
+   }
+   else if(m_csv_defintion.use_header_info == true)
+   {
+     // keep the result signed: -1 (no row available) must not turn into a huge count
+     const int row_status = getNextRow();//TODO validate number of tokens
+     if (row_status < 0)
+     {
+       return -1;
+     }
+
+     size_t num_of_tokens = static_cast<size_t>(row_status);
+     if (num_of_tokens > m_row_tokens.size())
+     {
+       num_of_tokens = m_row_tokens.size(); // never read past the token buffer
+     }
+     if (num_of_tokens > m_csv_schema.size())
+     {
+       m_csv_schema.resize(num_of_tokens); // grow the schema instead of writing out of bounds
+     }
+
+     for(size_t i=0; i<num_of_tokens; i++)
+     {
+       if (m_row_tokens[i] != nullptr)
+       {
+         m_csv_schema[i].assign(m_row_tokens[i]);
+       }
+     }
+
+     m_s3_select->load_schema(m_csv_schema);
+   }
+
+   m_extract_csv_header_info = true;
+
+   return 0;
+ }
+
+ /// Processes one chunk of a CSV object that arrives as a stream.
+ /// A chunk may cut a row in the middle: the incomplete tail of a chunk is kept in
+ /// m_last_line and is merged with the head of the next chunk before it is processed.
+ /// @param result         query output is appended here
+ /// @param csv_stream     start of the chunk (does not have to be NUL terminated)
+ /// @param stream_length  number of bytes in the chunk
+ /// @param obj_size       total size of the object; once that many bytes were processed the
+ ///                       aggregation results are emitted
+ /// @return 0 on success, -1 when query execution was aborted
+ int run_s3select_on_stream(std::string& result, const char* csv_stream, size_t stream_length, size_t obj_size)
+ {
+   int status = 0;
+   m_processed_bytes += stream_length;
+
+   m_skip_first_line = false;
+
+   if (csv_stream == nullptr || stream_length == 0)
+   {
+     return 0; // nothing to scan; a pending incomplete row stays pending
+   }
+
+   const char row_delimiter = m_csv_defintion.row_delimiter;
+   const char* const chunk_end = csv_stream + stream_length;
+   const bool is_last_chunk = (m_processed_bytes >= obj_size);
+
+   if (m_previous_line)
+   {
+     //if previous broken line exist , merge it to current chunk
+     const char* first_row_end = csv_stream;
+     while (first_row_end < chunk_end && *first_row_end != row_delimiter) // bounds check comes first
+     {
+       first_row_end++;
+     }
+
+     const bool chunk_has_no_delimiter = (first_row_end == chunk_end);
+
+     if (chunk_has_no_delimiter && !is_last_chunk)
+     {
+       // the pending row is still incomplete: keep collecting it
+       m_last_line.append(csv_stream, stream_length);
+       return 0;
+     }
+
+     merge_line = m_last_line;
+     merge_line.append(csv_stream, static_cast<size_t>(first_row_end - csv_stream));
+     merge_line += row_delimiter;
+     m_previous_line = false;
+     m_skip_first_line = true;
+
+     status = run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false);
+     if (status < 0)
+     {
+       return status; // do not hide a failure of the merged row
+     }
+
+     if (chunk_has_no_delimiter)
+     {
+       // last chunk and the merged row consumed all of it: run on an empty range so that
+       // the aggregation results are still emitted
+       return run_s3select_on_object(result, chunk_end, 0, false, false, is_last_chunk);
+     }
+   }
+
+   if (*(chunk_end - 1) != row_delimiter)
+   {
+     //in case of "broken" last line
+     const char* last_delimiter = chunk_end - 1;
+     while (last_delimiter > csv_stream && *last_delimiter != row_delimiter)
+     {
+       last_delimiter--; //scan until end-of previous line in chunk
+     }
+
+     // without any delimiter the whole chunk is the incomplete row, including its first byte
+     const char* partial_begin = (*last_delimiter == row_delimiter) ? last_delimiter + 1 : csv_stream;
+     m_last_line.assign(partial_begin, static_cast<size_t>(chunk_end - partial_begin)); //save it for next chunk
+
+     m_previous_line = true;//it means to skip last line
+
+     // NOTE(review): on the last chunk this tail is saved but never processed, so a final row
+     // without a trailing row delimiter looks like it is dropped - confirm the intended behaviour
+   }
+
+   status = run_s3select_on_object(result, csv_stream, stream_length, m_skip_first_line, m_previous_line, is_last_chunk);
+
+   return status;
+ }
+
+ /// Runs the query on one contiguous buffer of CSV data.
+ /// @param result           query output is appended here
+ /// @param csv_stream       start of the buffer (does not have to be NUL terminated)
+ /// @param stream_length    number of bytes in the buffer
+ /// @param skip_first_line  the first row was already processed elsewhere and is skipped
+ /// @param skip_last_line   the last row is incomplete and must not be emitted
+ /// @param do_aggregate     emit the aggregation results when the buffer is exhausted
+ /// @return 0 on success, -1 on a FATAL exception or after more than 100 row errors
+ int run_s3select_on_object(std::string& result, const char* csv_stream, size_t stream_length, bool skip_first_line, bool skip_last_line, bool do_aggregate)
+ {
+   m_stream = const_cast<char*>(csv_stream);
+   m_end_stream = m_stream + stream_length;
+   m_is_to_aggregate = do_aggregate;
+   m_skip_last_line = skip_last_line;
+
+   m_stream_length = stream_length;
+
+   if(m_extract_csv_header_info == false)
+   {
+     extract_csv_header_info();
+   }
+
+   if(skip_first_line)
+   {
+     // the scan is bounded by the end of the buffer: the stream is not guaranteed to be NUL terminated
+     while(m_stream < m_end_stream && (*m_stream != m_csv_defintion.row_delimiter ))
+     {
+       m_stream++;
+     }
+     if (m_stream < m_end_stream)
+     {
+       m_stream++; // step over the row delimiter
+     }
+   }
+
+   for (;;)
+   {
+     int num = 0; // stays 0 after a non-fatal error, so processing continues with the next row
+     try
+     {
+       num = getMatchRow(result);
+     }
+     catch (base_s3select_exception& e)
+     {
+       std::cout << e.what() << std::endl;
+       m_error_description = e.what();
+       m_error_count ++;
+       if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL || m_error_count>100)//abort query execution
+       {
+         return -1;
+       }
+     }
+
+     if (num < 0)
+     {
+       break;
+     }
+   }
+
+   return 0;
+ }
+};
+
+};//namespace
+
+#endif
diff --git a/src/s3select/include/s3select_csv_parser.h b/src/s3select/include/s3select_csv_parser.h
new file mode 100644
index 000000000..5527da913
--- /dev/null
+++ b/src/s3select/include/s3select_csv_parser.h
@@ -0,0 +1,407 @@
+#include <iostream>
+
+#include <boost/mpl/vector/vector30.hpp>
+// back-end
+#include <boost/msm/back/state_machine.hpp>
+//front-end
+#include <boost/msm/front/state_machine_def.hpp>
+
+#include <vector>
+
+namespace msm = boost::msm;
+namespace mpl = boost::mpl;
+
+namespace s3selectEngine
+{
+// events
+struct event_column_sep {};//raised by csvParser::parse when the current char equals the column delimiter
+struct event_eol {};//raised when the current char equals the row delimiter
+struct event_end_of_stream {};//raised when the current char is NUL
+struct event_not_column_sep {};//i.e any char
+struct event_quote {};//raised when the current char equals the quote char
+struct event_escape {};//raised when the current char equals the escape char
+struct event_empty {};//accepted by empty_action(); not raised by csvParser::parse
+
+
+// front-end: define the FSM structure
+/// Boost.MSM front-end of the CSV tokenizer.
+/// The machine is fed one event per input character (see csvParser::parse); its actions
+/// NUL-terminate tokens in place inside the input buffer and store a pointer to the start
+/// of every token in `*tokens`. Tokens that contain an escape character are recorded in
+/// `has_esc` so they can be post-processed by parse_escape().
+struct csvStateMch_ : public msm::front::state_machine_def<csvStateMch_>
+{
+  char* input_stream;
+  std::vector<char*>* tokens;
+  //128 slots to start with; note that `has_esc{128}` would create a single element holding 128
+  std::vector<int> has_esc = std::vector<int>(128);
+  size_t token_idx;
+  size_t escape_idx;
+  char* input_cur_location;
+  char* start_token;
+  bool end_of_parse;
+
+  typedef csvStateMch_ csv_rules;
+
+  csvStateMch_():input_stream(nullptr), tokens(nullptr), token_idx(0), escape_idx(0),
+    input_cur_location(nullptr), start_token(nullptr), end_of_parse(false) {}
+
+  //binds the machine to a new input line and to the vector that receives the token pointers
+  void set(const char* input, std::vector<char*>* tk)
+  {
+    input_cur_location = input_stream = const_cast<char*>(input);
+    token_idx = 0;
+    tokens = tk;
+    escape_idx = 0;
+  }
+
+  char get_char()
+  {
+    return *input_cur_location;
+  }
+
+  //advances the cursor by one and returns the char found there; returns 0 without moving
+  //when the cursor is already at (or past) end_stream.
+  //note: the char at the new position is read even when that position equals end_stream.
+  char get_next_char(const char * end_stream)
+  {
+    if (input_cur_location >= end_stream)
+      return 0;
+
+    input_cur_location++;
+    return *input_cur_location;
+  }
+
+  const char* currentLoc()
+  {
+    return input_cur_location;
+  }
+
+  //removes the escape characters from the NUL-terminated token `in`, in place.
+  //the character following an escape is always kept as-is (so an escaped escape char
+  //yields a single escape char); an escape at the very end of the token is dropped.
+  void parse_escape(char* in, char esc_char='\\')
+  {
+    char* dst = in;
+
+    for (char* src = in; *src; ++src)
+    {
+      if (*src == esc_char)
+      {
+        ++src; //skip the escape, keep whatever follows it
+        if (!*src) //escape was the last char
+        {
+          break;
+        }
+      }
+      *dst++ = *src;
+    }
+
+    *dst = 0;
+  }
+
+  // The list of FSM states
+  struct Start_new_token_st : public msm::front::state<>
+  {};//0
+
+  struct In_new_token_st : public msm::front::state<>
+  {};//1
+
+  struct In_quote_st : public msm::front::state<>
+  {};//2
+
+  struct In_esc_in_token_st : public msm::front::state<>
+  {};//3
+
+  struct In_esc_quote_st : public msm::front::state<>
+  {};//4
+
+  struct In_esc_start_token_st : public msm::front::state<>
+  {};//5
+
+  struct End_of_line_st : public msm::front::state<>
+  {};//6
+
+  struct Empty_state : public msm::front::state<>
+  {};//7
+
+
+  // the initial state of the csvStateMch SM. Must be defined
+  typedef Start_new_token_st initial_state;
+
+  void start_new_token()//helper: the current location becomes the start of the next token
+  {
+    start_token = input_cur_location;
+    (*tokens)[ token_idx ] = start_token;
+    token_idx++;
+  }
+
+  // transition actions
+  void start_new_token(event_column_sep const&)
+  {
+    *input_cur_location = 0;//remove column-delimiter
+    start_new_token();
+  }
+
+  void start_new_token(event_not_column_sep const&)
+  {
+    start_new_token();
+  }
+
+  //need to handle empty lines(no tokens);
+  void start_new_token(event_eol const&)
+  {
+    if(!token_idx)
+    {
+      return;
+    }
+    (*tokens)[ token_idx ] = start_token;
+    token_idx++;
+  }
+
+  void start_new_token(event_end_of_stream const&) {}
+
+  void in_new_token(event_not_column_sep const&)
+  {
+    if(!*start_token)
+    {
+      start_token = input_cur_location;
+    }
+  }
+
+  void in_new_token(event_eol const&)
+  {
+    *input_cur_location=0;//terminate the last token of the line
+  }
+
+  void in_new_token(event_end_of_stream const&) {}
+
+  void in_new_token(event_column_sep const&)
+  {
+    //the next token starts right after the delimiter, which is overwritten by a terminator
+    (*tokens)[ token_idx ] = input_cur_location+1;
+    *input_cur_location=0;
+    token_idx++;
+  }
+
+  void in_new_token(event_quote const&)
+  {
+    if(!*start_token)
+    {
+      start_token = input_cur_location;
+    }
+  }
+
+  void in_quote(event_quote const&) {}
+
+  void in_quote(event_column_sep const&) {}
+
+  void in_quote(event_not_column_sep const&) {}
+
+  void in_quote(event_eol const&)
+  {
+    *input_cur_location=0;
+  }
+
+  void in_quote(event_end_of_stream const&)
+  {
+    *input_cur_location=0;
+  }
+
+  void start_new_token(event_quote const&)
+  {
+    start_new_token();
+  }
+
+  //remembers that the current token (index token_idx-1) contains an escape char;
+  //each token is recorded once.
+  void push_escape_pos()
+  {
+    if(escape_idx && has_esc[ escape_idx -1]== (int)(token_idx-1))
+    {
+      return;
+    }
+    if(escape_idx >= has_esc.size())
+    {
+      has_esc.resize(has_esc.size() * 2);//grow instead of writing out of bounds
+    }
+    has_esc[ escape_idx ] = token_idx-1;
+    escape_idx++;
+  }
+  void in_escape(event_escape const&)
+  {
+    push_escape_pos();
+  }
+  void in_escape_start_new_token(event_escape const&)
+  {
+    start_new_token();
+    push_escape_pos();
+  }
+
+  void in_escape(event_column_sep const&) {}
+  void in_escape(event_not_column_sep const&) {}
+  void in_escape(event_quote const&) {}
+  void in_escape(event_eol const&) {}
+  void in_escape(event_end_of_stream const&) {}
+
+  void empty_action(event_empty const&) {}
+
+  //TODO need a guard for tokens vector size (<MAX)
+  // Transition table for csvStateMch
+  struct transition_table : mpl::vector30<
+  //      Start                 Event                  Next                  Action
+  // +---------+-------------+---------+---------------------+----------------------+
+  a_row < Start_new_token_st, event_column_sep, Start_new_token_st, &csv_rules::start_new_token >,
+  a_row < Start_new_token_st, event_not_column_sep, In_new_token_st, &csv_rules::start_new_token >,
+  a_row < Start_new_token_st, event_eol, End_of_line_st, &csv_rules::start_new_token >,
+  a_row < Start_new_token_st, event_end_of_stream, End_of_line_st, &csv_rules::start_new_token >,
+  a_row < In_new_token_st, event_not_column_sep, In_new_token_st, &csv_rules::in_new_token >,
+  a_row < In_new_token_st, event_column_sep, In_new_token_st, &csv_rules::in_new_token >,
+  a_row < In_new_token_st, event_eol, End_of_line_st, &csv_rules::in_new_token >,
+  a_row < In_new_token_st, event_end_of_stream, End_of_line_st, &csv_rules::in_new_token >,
+
+  a_row < Start_new_token_st, event_quote, In_quote_st, &csv_rules::start_new_token >,  //open quote
+  a_row < In_new_token_st, event_quote, In_quote_st, &csv_rules::in_quote >,  //open quote
+  a_row < In_quote_st, event_quote, In_new_token_st, &csv_rules::in_quote >,  //close quote
+  a_row < In_quote_st, event_column_sep, In_quote_st, &csv_rules::in_quote >,  //stay in quote
+  a_row < In_quote_st, event_not_column_sep, In_quote_st, &csv_rules::in_quote >,  //stay in quote
+  a_row < In_quote_st, event_eol, End_of_line_st, &csv_rules::in_quote >,  //end of quote/line
+  a_row < In_quote_st, event_end_of_stream, End_of_line_st, &csv_rules::in_quote >,  //end of quote/line
+
+
+  //TODO add transitions for escape just before eol , eos.
+  a_row < Start_new_token_st, event_escape, In_esc_start_token_st, &csv_rules::in_escape_start_new_token >,
+  a_row < In_esc_start_token_st, event_column_sep, In_new_token_st, &csv_rules::in_escape >,  //escape column-sep
+  a_row < In_esc_start_token_st, event_not_column_sep, In_new_token_st, &csv_rules::in_escape >,
+  a_row < In_esc_start_token_st, event_escape, In_new_token_st, &csv_rules::in_escape >,
+  a_row < In_esc_start_token_st, event_quote, In_new_token_st, &csv_rules::in_escape >,
+
+  a_row < In_new_token_st, event_escape, In_esc_in_token_st, &csv_rules::in_escape >,
+  a_row < In_esc_in_token_st, event_column_sep, In_new_token_st, &csv_rules::in_escape >,
+  a_row < In_esc_in_token_st, event_not_column_sep, In_new_token_st, &csv_rules::in_escape >,
+  a_row < In_esc_in_token_st, event_escape, In_new_token_st, &csv_rules::in_escape >,
+  a_row < In_esc_in_token_st, event_quote, In_new_token_st, &csv_rules::in_escape >,
+
+  a_row < In_quote_st, event_escape, In_esc_quote_st, &csv_rules::in_escape >,
+  a_row < In_esc_quote_st, event_column_sep, In_quote_st, &csv_rules::in_escape >,
+  a_row < In_esc_quote_st, event_not_column_sep, In_quote_st, &csv_rules::in_escape >,
+  a_row < In_esc_quote_st, event_escape, In_quote_st, &csv_rules::in_escape >,
+  a_row < In_esc_quote_st, event_quote, In_quote_st, &csv_rules::in_escape >
+
+  // +---------+-------------+---------+---------------------+----------------------+
+  > {};
+
+  // Replaces the default no-transition response.
+  template <class FSM, class Event>
+  void no_transition(Event const& e, FSM&, int state)
+  {
+    std::cout << "no transition from state " << state
+              << " on event " << typeid(e).name() << std::endl;
+  }
+}; //// end-of-state-machine
+
+
+
+// Pick a back-end
+typedef msm::back::state_machine<csvStateMch_> csvStateMch;
+
+//
+// Testing utilities.
+//
+
+//state names, indexed by the state ids noted next to the state declarations (//0 .. //7)
+static char const* const state_names[] = {"Start_new_token_st", "In_new_token_st", "In_quote_st", "In_esc_in_token_st",
+                                          "In_esc_quote_st", "In_esc_start_token_st", "End_of_line_st", "Empty_state"
+                                         };
+//debug helper: prints the name of the machine's current state.
+//`inline` because this header-defined function may be included from several translation units.
+inline void pstate(csvStateMch const& p)//debug
+{
+  std::cout << " -> " << state_names[p.current_state()[0]] << std::endl;
+}
+
+
+/// Splits one CSV row into tokens by driving the csvStateMch state machine
+/// character by character. Delimiter, quote and escape characters are configurable.
+class csvParser
+{
+
+  csvStateMch p;
+
+  char m_row_delimeter;
+  char m_column_delimiter;
+  char m_quote_char;
+  char m_escape_char;
+
+public:
+
+  csvParser(char rd='\n', char cd=',', char quot='"', char ec='\\')
+    : m_row_delimeter(rd), m_column_delimiter(cd), m_quote_char(quot), m_escape_char(ec)
+  {}
+
+  //replaces all four special characters at once
+  void set(char row_delimiter, char column_delimiter, char quot_char, char escape_char)
+  {
+    m_row_delimeter = row_delimiter;
+    m_column_delimiter = column_delimiter;
+    m_quote_char = quot_char;
+    m_escape_char = escape_char;
+  }
+
+  /// Tokenizes the row starting at input_stream.
+  /// @param input_stream  first char of the row; the buffer is modified in place.
+  /// @param end_stream    parsing stops once the cursor reaches this address.
+  /// @param tk            receives one pointer per token.
+  /// @param num_of_tokens receives the number of tokens found.
+  /// @return 0 on success, -1 as soon as the token count reaches tk->capacity()
+  ///         (in that case *num_of_tokens is left untouched).
+  int parse(char* input_stream, char* end_stream, std::vector<char*>* tk, size_t* num_of_tokens)
+  {
+    const int end_of_line_state = 6; //id of End_of_line_st
+
+    p.set(input_stream, tk);
+
+    // starts the highest-level SM: calls on_entry and marks the start of the SM
+    p.start();
+
+    //TODO for better performance to use template specialization (\n \ , ")
+    for (;;)
+    {
+      const char c = p.get_char();
+
+      //the order of the tests matters when two roles share the same character
+      if (c == m_row_delimeter)
+      {
+        p.process_event(event_eol());
+      }
+      else if (c == m_column_delimiter)
+      {
+        p.process_event(event_column_sep());
+      }
+      else if (c == 0)
+      {
+        p.process_event(event_end_of_stream());
+      }
+      else if (c == m_quote_char)
+      {
+        p.process_event(event_quote());
+      }
+      else if (c == m_escape_char)
+      {
+        p.process_event(event_escape());
+      }
+      else
+      {
+        p.process_event(event_not_column_sep());
+      }
+
+      if (p.tokens->capacity() <= p.token_idx)
+      {
+        return -1;
+      }
+
+      if (p.currentLoc() >= end_stream)
+      {
+        break;
+      }
+
+      p.get_next_char(end_stream);
+
+      if (p.current_state()[0] == end_of_line_state)
+      {
+        break;
+      }
+    }
+
+    p.stop();
+
+    *num_of_tokens = p.token_idx;
+
+    //second pass for escape rules; only tokens recorded as containing an escape are processed
+    for (size_t i = 0; i != p.escape_idx; ++i)
+    {
+      char* escaped_token = (*tk)[p.has_esc[i]];
+      p.parse_escape(escaped_token, m_escape_char);
+    }
+
+    return 0;
+  }
+
+  //current cursor position of the underlying state machine
+  const char* currentLoc()
+  {
+    return p.currentLoc();
+  }
+
+};//end csv-parser
+
+}//end-namespace
+
+
diff --git a/src/s3select/include/s3select_functions.h b/src/s3select/include/s3select_functions.h
new file mode 100644
index 000000000..77b3db3a7
--- /dev/null
+++ b/src/s3select/include/s3select_functions.h
@@ -0,0 +1,1037 @@
+#ifndef __S3SELECT_FUNCTIONS__
+#define __S3SELECT_FUNCTIONS__
+
+
+#include "s3select_oper.h"
+#define BOOST_BIND_ACTION_PARAM( push_name ,param ) boost::bind( &push_name::operator(), g_ ## push_name , _1 ,_2, param)
+namespace s3selectEngine
+{
+
+//parser action: converts the two decimal digits starting at `a` into *n
+struct push_2dig
+{
+  //`b` (end of the matched text) is not used
+  void operator()(const char* a, const char* b, uint32_t* n) const
+  {
+    const int tens = (char)a[0] - 48;   //48 is the character code of '0'
+    const int units = (char)a[1] - 48;
+    *n = tens * 10 + units;
+  }
+
+};
+static push_2dig g_push_2dig;
+
+//parser action: converts the four decimal digits starting at `a` into *n
+struct push_4dig
+{
+  //`b` (end of the matched text) is not used
+  void operator()(const char* a, const char* b, uint32_t* n) const
+  {
+    const int thousands = (char)a[0] - 48;   //48 is the character code of '0'
+    const int hundreds = (char)a[1] - 48;
+    const int tens = (char)a[2] - 48;
+    const int units = (char)a[3] - 48;
+    *n = thousands * 1000 + hundreds * 100 + tens * 10 + units;
+  }
+
+};
+static push_4dig g_push_4dig;
+
+//identifiers of the built-in functions; s3select_functions maps a function name to one of these
+enum class s3select_func_En_t {ADD,
+ SUM,
+ MIN,
+ MAX,
+ COUNT,
+ TO_INT,
+ TO_FLOAT,
+ TO_TIMESTAMP,
+ SUBSTR,
+ EXTRACT,
+ DATE_ADD,
+ DATE_DIFF,
+ UTCNOW
+ };
+
+
+//factory for the built-in functions: resolves a function name to a new function object
+class s3select_functions : public __clt_allocator
+{
+
+private:
+
+ //function name as written in the query -> function identifier
+ using FunctionLibrary = std::map<std::string, s3select_func_En_t>;
+ const FunctionLibrary m_functions_library =
+ {
+ {"add", s3select_func_En_t::ADD},
+ {"sum", s3select_func_En_t::SUM},
+ {"count", s3select_func_En_t::COUNT},
+ {"min", s3select_func_En_t::MIN},
+ {"max", s3select_func_En_t::MAX},
+ {"int", s3select_func_En_t::TO_INT},
+ {"float", s3select_func_En_t::TO_FLOAT},
+ {"substr", s3select_func_En_t::SUBSTR},
+ {"timestamp", s3select_func_En_t::TO_TIMESTAMP},
+ {"extract", s3select_func_En_t::EXTRACT},
+ {"dateadd", s3select_func_En_t::DATE_ADD},
+ {"datediff", s3select_func_En_t::DATE_DIFF},
+ {"utcnow", s3select_func_En_t::UTCNOW}
+ };
+
+public:
+
+ //returns a new function object for fn_name; throws a FATAL base_s3select_exception for an unknown name
+ base_function* create(std::string fn_name);
+};
+
+/// AST node of a function call. The implementation is looked up by name on first
+/// use (see _resolve_name) and then invoked with the collected argument nodes.
+class __function : public base_statement
+{
+
+private:
+  std::vector<base_statement*> arguments;
+  std::string name;
+  base_function* m_func_impl;               //resolved lazily, 0 until first use
+  s3select_functions* m_s3select_functions; //factory used to resolve `name`
+  variable m_result;
+
+  //binds `name` to its implementation once; throws a FATAL exception when the name is unknown
+  void _resolve_name()
+  {
+    if (m_func_impl)
+    {
+      return;
+    }
+
+    base_function* f = m_s3select_functions->create(name);
+    if (!f)
+    {
+      throw base_s3select_exception("function not found", base_s3select_exception::s3select_exp_en_t::FATAL); //should abort query
+    }
+    m_func_impl = f;
+  }
+
+public:
+  //stores the scratch area and alias table on this node and forwards them to every argument
+  virtual void traverse_and_apply(scratch_area* sa, projection_alias* pa)
+  {
+    m_scratch = sa;
+    m_aliases = pa;
+    for (base_statement* ba : arguments)
+    {
+      ba->traverse_and_apply(sa, pa);
+    }
+  }
+
+  virtual bool is_aggregate() // TODO under semantic flow
+  {
+    _resolve_name();
+
+    return m_func_impl->is_aggregate();
+  }
+
+  virtual bool semantic()
+  {
+    return true;
+  }
+
+  __function(const char* fname, s3select_functions* s3f) : name(fname), m_func_impl(0), m_s3select_functions(s3f) {}
+
+  //calls the function on the arguments; when is_last_call is set, the aggregate result is fetched instead
+  virtual value& eval()
+  {
+
+    _resolve_name();
+
+    if (is_last_call == false)
+    {
+      (*m_func_impl)(&arguments, &m_result);
+    }
+    else
+    {
+      (*m_func_impl).get_aggregate_result(&m_result);
+    }
+
+    return m_result.get_value();
+  }
+
+
+
+  //not implemented: returns an empty string.
+  //(constructing std::string from the literal 0 selects the const char* constructor with a null pointer)
+  virtual std::string print(int ident)
+  {
+    return std::string();
+  }
+
+  void push_argument(base_statement* arg)
+  {
+    arguments.push_back(arg);
+  }
+
+
+  //returns a copy of the argument list
+  std::vector<base_statement*> get_arguments()
+  {
+    return arguments;
+  }
+
+  virtual ~__function()
+  {
+    arguments.clear();
+  }
+};
+
+
+
+/*
+ s3-select function definitions
+*/
+//add(x, y): stores x + y into result
+struct _fn_add : public base_function
+{
+
+  value var_result;
+
+  bool operator()(std::vector<base_statement*>* args, variable* result)
+  {
+    //both operands are dereferenced below, so make sure they exist
+    if (args->size() < 2)
+    {
+      throw base_s3select_exception("add should have 2 parameters");
+    }
+
+    std::vector<base_statement*>::iterator iter = args->begin();
+    base_statement* x = *iter;
+    iter++;
+    base_statement* y = *iter;
+
+    var_result = x->eval() + y->eval();
+
+    *result = var_result;
+
+    return true;
+  }
+};
+
+//sum(x): aggregate; accumulates the first argument over all rows
+struct _fn_sum : public base_function
+{
+
+  value sum;
+
+  _fn_sum() : sum(0)
+  {
+    aggregate = true;
+  }
+
+  //adds the current row's value to the running total; `result` is not written here
+  bool operator()(std::vector<base_statement*>* args, variable* result)
+  {
+    base_statement* operand = *(args->begin());
+
+    try
+    {
+      sum = sum + operand->eval();
+    }
+    catch (base_s3select_exception& err)
+    {
+      //a row that cannot be summed is reported and skipped, unless the error is fatal
+      std::cout << "illegal value for aggregation(sum). skipping." << std::endl;
+      const bool is_fatal = (err.severity() == base_s3select_exception::s3select_exp_en_t::FATAL);
+      if (is_fatal)
+      {
+        throw;
+      }
+    }
+
+    return true;
+  }
+
+  //hands out the accumulated total
+  virtual void get_aggregate_result(variable* result)
+  {
+    *result = sum ;
+  }
+};
+
+//count(): aggregate; counts the number of calls (one per processed row)
+struct _fn_count : public base_function
+{
+
+  int64_t count;
+
+  _fn_count():count(0)
+  {
+    aggregate=true;
+  }
+
+  //neither `args` nor `result` is used; only the counter advances
+  bool operator()(std::vector<base_statement*>* args, variable* result)
+  {
+    ++count;
+
+    return true;
+  }
+
+  //hands out the number of calls so far
+  virtual void get_aggregate_result(variable* result)
+  {
+    result->set_value(count);
+  }
+
+};
+
+//min(x): aggregate; keeps the smallest value of the first argument over all rows
+struct _fn_min : public base_function
+{
+
+  value min;
+
+  //starts from the largest 64-bit integer so that any smaller row value replaces it
+  _fn_min():min(__INT64_MAX__)
+  {
+    aggregate=true;
+  }
+
+  bool operator()(std::vector<base_statement*>* args, variable* result)
+  {
+    std::vector<base_statement*>::iterator iter = args->begin();
+    base_statement* x = *iter;
+
+    //evaluate the argument expression once per row
+    value& current = x->eval();
+
+    if(min > current)
+    {
+      min=current;
+    }
+
+    return true;
+  }
+
+  virtual void get_aggregate_result(variable* result)
+  {
+    *result = min;
+  }
+
+};
+
+//max(x): aggregate; keeps the largest value of the first argument over all rows
+struct _fn_max : public base_function
+{
+
+  value max;
+
+  //starts from the negated largest 64-bit integer so that any larger row value replaces it
+  _fn_max():max(-__INT64_MAX__)
+  {
+    aggregate=true;
+  }
+
+  bool operator()(std::vector<base_statement*>* args, variable* result)
+  {
+    std::vector<base_statement*>::iterator iter = args->begin();
+    base_statement* x = *iter;
+
+    //evaluate the argument expression once per row
+    value& current = x->eval();
+
+    if(max < current)
+    {
+      max=current;
+    }
+
+    return true;
+  }
+
+  virtual void get_aggregate_result(variable* result)
+  {
+    *result = max;
+  }
+
+};
+
+//int(x): converts a string, float or integer argument into a 64-bit integer
+struct _fn_to_int : public base_function
+{
+
+  value var_result;
+  value func_arg;
+
+  bool operator()(std::vector<base_statement*>* args, variable* result)
+  {
+    char* perr;
+    int64_t i=0;
+    func_arg = (*args->begin())->eval();
+
+    if (func_arg.type == value::value_En_t::STRING)
+    {
+      //strtoll: the target is 64 bits wide, while `long` (strtol) is only 32 bits on some platforms
+      i = strtoll(func_arg.str(), &perr, 10) ; //TODO check error before constructor
+    }
+    else if (func_arg.type == value::value_En_t::FLOAT)
+    {
+      i = func_arg.dbl(); //truncates toward zero
+    }
+    else
+    {
+      i = func_arg.i64();
+    }
+
+    var_result = i ;
+    *result = var_result;
+
+    return true;
+  }
+
+};
+
+//float(x): converts a string, float or integer argument into a double
+struct _fn_to_float : public base_function
+{
+
+  value var_result;
+  value v_from;
+
+  bool operator()(std::vector<base_statement*>* args, variable* result)
+  {
+    char* parse_end;
+    double converted=0;
+    value arg = (*args->begin())->eval();
+
+    switch (arg.type)
+    {
+    case value::value_En_t::STRING:
+      converted = strtod(arg.str(), &parse_end) ; //TODO check error before constructor
+      break;
+
+    case value::value_En_t::FLOAT:
+      converted = arg.dbl();
+      break;
+
+    default: //every other type is read as an integer
+      converted = arg.i64();
+      break;
+    }
+
+    var_result = converted;
+    *result = var_result;
+
+    return true;
+  }
+
+};
+
+//timestamp(str): parses "yyyy mm dd hh mm ss" digit groups, optionally separated by ':' or '-',
+//into a boost::posix_time::ptime
+struct _fn_to_timestamp : public base_function
+{
+  bsc::rule<> separator = bsc::ch_p(":") | bsc::ch_p("-");
+
+  uint32_t yr = 1700, mo = 1, dy = 1;
+  bsc::rule<> dig4 = bsc::lexeme_d[bsc::digit_p >> bsc::digit_p >> bsc::digit_p >> bsc::digit_p];
+  bsc::rule<> dig2 = bsc::lexeme_d[bsc::digit_p >> bsc::digit_p];
+
+  //date part: the parsed numbers are written into yr, mo, dy
+  bsc::rule<> d_yyyymmdd_dig = ((dig4[BOOST_BIND_ACTION_PARAM(push_4dig, &yr)]) >> *(separator)
+                                >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &mo)]) >> *(separator)
+                                >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &dy)]) >> *(separator));
+
+  uint32_t hr = 0, mn = 0, sc = 0;
+  //time part: the parsed numbers are written into hr, mn, sc
+  bsc::rule<> d_time_dig = ((dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &hr)]) >> *(separator)
+                            >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &mn)]) >> *(separator)
+                            >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &sc)]) >> *(separator));
+
+  boost::posix_time::ptime new_ptime;
+
+  value v_str;
+
+
+  //checks the parsed fields: year 1700..2050, a real calendar day (leap years included),
+  //hour 0..23, minute and second 0..59
+  bool datetime_validation()
+  {
+    if(yr<1700 || yr>2050)
+    {
+      return false;
+    }
+    if (mo<1 || mo>12)
+    {
+      return false;
+    }
+    //year and month are known to be valid here; the last day depends on both
+    const uint32_t last_day = boost::gregorian::gregorian_calendar::end_of_month_day(
+                                static_cast<unsigned short>(yr), static_cast<unsigned short>(mo));
+    if (dy<1 || dy>last_day)
+    {
+      return false;
+    }
+    if (hr>23)
+    {
+      return false;
+    }
+    if (mn>59)
+    {
+      return false;
+    }
+    if (sc>59)
+    {
+      return false;
+    }
+
+    return true;
+  }
+
+  //expects exactly one string argument; throws base_s3select_exception on a wrong argument
+  //count or type, on text that does not fully match the date-time pattern, and on an invalid date-time
+  bool operator()(std::vector<base_statement*>* args, variable* result)
+  {
+
+    hr = 0;
+    mn = 0;
+    sc = 0;
+
+    std::vector<base_statement*>::iterator iter = args->begin();
+    int args_size = args->size();
+
+    if (args_size != 1)
+    {
+      throw base_s3select_exception("to_timestamp should have one parameter");
+    }
+
+    base_statement* str = *iter;
+
+    v_str = str->eval();
+
+    if (v_str.type != value::value_En_t::STRING)
+    {
+      throw base_s3select_exception("to_timestamp first argument must be string"); //can skip current row
+    }
+
+    bsc::parse_info<> info_dig = bsc::parse(v_str.str(), d_yyyymmdd_dig >> *(separator) >> d_time_dig);
+
+    //check the parse first: the fields are only meaningful after a full match
+    if(!info_dig.full or datetime_validation()==false)
+    {
+      throw base_s3select_exception("input date-time is illegal");
+    }
+
+    new_ptime = boost::posix_time::ptime(boost::gregorian::date(yr, mo, dy),
+                                         boost::posix_time::hours(hr) + boost::posix_time::minutes(mn) + boost::posix_time::seconds(sc));
+
+    result->set_value(&new_ptime);
+
+    return true;
+  }
+
+};
+
+//extract(date_part, timestamp): returns the requested part ("year", "month", "day", "week")
+//of the timestamp as an integer
+struct _fn_extact_from_timestamp : public base_function
+{
+
+  boost::posix_time::ptime new_ptime;
+
+  value val_date_part;
+
+  //throws base_s3select_exception on a wrong argument count/type or an unsupported date part
+  bool operator()(std::vector<base_statement*>* args, variable* result)
+  {
+    std::vector<base_statement*>::iterator iter = args->begin();
+    int args_size = args->size();
+
+    if (args_size < 2)
+    {
+      throw base_s3select_exception("extract should have 2 parameters");
+    }
+
+    base_statement* date_part = *iter;
+
+    val_date_part = date_part->eval();//TODO could be done once?
+
+    if(val_date_part.is_string()== false)
+    {
+      throw base_s3select_exception("first parameter should be string");
+    }
+
+    iter++;
+
+    base_statement* ts = *iter;
+
+    //evaluate the timestamp expression once
+    value& val_ts = ts->eval();
+
+    if(val_ts.is_timestamp()== false)
+    {
+      throw base_s3select_exception("second parameter is not timestamp");
+    }
+
+    new_ptime = *val_ts.timestamp();
+
+    if( strcmp(val_date_part.str(), "year")==0 )
+    {
+      result->set_value( (int64_t)new_ptime.date().year() );
+    }
+    else if( strcmp(val_date_part.str(), "month")==0 )
+    {
+      result->set_value( (int64_t)new_ptime.date().month() );
+    }
+    else if( strcmp(val_date_part.str(), "day")==0 )
+    {
+      //day of the month, as in SQL EXTRACT(DAY FROM ...); not the day of the year
+      result->set_value( (int64_t)new_ptime.date().day() );
+    }
+    else if( strcmp(val_date_part.str(), "week")==0 )
+    {
+      result->set_value( (int64_t)new_ptime.date().week_number() );
+    }
+    else
+    {
+      throw base_s3select_exception(std::string( val_date_part.str() + std::string(" is not supported ") ).c_str() );
+    }
+
+    return true;
+  }
+
+};
+
+//datediff(date_part, ts1, ts2): difference ts2 - ts1 expressed in "year", "month", "day" or "hours"
+struct _fn_diff_timestamp : public base_function
+{
+
+  value val_date_part;
+  value val_dt1;
+  value val_dt2;
+
+  //throws base_s3select_exception on a wrong argument count/type or an unsupported date part
+  bool operator()(std::vector<base_statement*>* args, variable* result)
+  {
+    std::vector<base_statement*>::iterator iter = args->begin();
+    int args_size = args->size();
+
+    if (args_size < 3)
+    {
+      throw base_s3select_exception("datediff need 3 parameters");
+    }
+
+    base_statement* date_part = *iter;
+
+    val_date_part = date_part->eval();
+
+    //the date part is compared with strcmp below, so it has to be a string
+    if (val_date_part.is_string() == false)
+    {
+      throw base_s3select_exception("first parameter should be string: year,month,hours,day");
+    }
+
+    iter++;
+    base_statement* dt1_param = *iter;
+    val_dt1 = dt1_param->eval();
+    if (val_dt1.is_timestamp() == false)
+    {
+      throw base_s3select_exception("second parameter should be timestamp");
+    }
+
+    iter++;
+    base_statement* dt2_param = *iter;
+    val_dt2 = dt2_param->eval();
+    if (val_dt2.is_timestamp() == false)
+    {
+      throw base_s3select_exception("third parameter should be timestamp");
+    }
+
+    if (strcmp(val_date_part.str(), "year") == 0)
+    {
+      //calendar-year difference; months and days are ignored
+      int64_t yr = val_dt2.timestamp()->date().year() - val_dt1.timestamp()->date().year() ;
+      result->set_value( yr );
+    }
+    else if (strcmp(val_date_part.str(), "month") == 0)
+    {
+      //calendar-month difference; days are ignored
+      int64_t yr = val_dt2.timestamp()->date().year() - val_dt1.timestamp()->date().year() ;
+      int64_t mon = val_dt2.timestamp()->date().month() - val_dt1.timestamp()->date().month() ;
+
+      result->set_value( yr*12 + mon );
+    }
+    else if (strcmp(val_date_part.str(), "day") == 0)
+    {
+      boost::gregorian::date_period dp =
+        boost::gregorian::date_period( val_dt1.timestamp()->date(), val_dt2.timestamp()->date());
+      result->set_value( dp.length().days() );
+    }
+    else if (strcmp(val_date_part.str(), "hours") == 0)
+    {
+      boost::posix_time::time_duration td_res = (*val_dt2.timestamp() - *val_dt1.timestamp());
+      result->set_value( td_res.hours());
+    }
+    else
+    {
+      throw base_s3select_exception("first parameter should be string: year,month,hours,day");
+    }
+
+
+    return true;
+  }
+};
+
+//dateadd(date_part, quantity, timestamp): adds `quantity` years, months or days to the timestamp
+struct _fn_add_to_timestamp : public base_function
+{
+
+  boost::posix_time::ptime new_ptime;
+
+  value val_date_part;
+  value val_quantity;
+  value val_timestamp;
+
+  //throws base_s3select_exception on a wrong argument count/type or an unsupported date part
+  bool operator()(std::vector<base_statement*>* args, variable* result)
+  {
+    std::vector<base_statement*>::iterator iter = args->begin();
+    int args_size = args->size();
+
+    if (args_size < 3)
+    {
+      throw base_s3select_exception("add_to_timestamp should have 3 parameters");
+    }
+
+    base_statement* date_part = *iter;
+    val_date_part = date_part->eval();//TODO could be done once?
+
+    if(val_date_part.is_string()== false)
+    {
+      throw base_s3select_exception("first parameter should be string");
+    }
+
+    iter++;
+    base_statement* quan = *iter;
+    val_quantity = quan->eval();
+
+    if (val_quantity.is_number() == false)
+    {
+      throw base_s3select_exception("second parameter should be number");
+    }
+
+    //a floating-point quantity is truncated to a whole number of units
+    //(same conversion the int() function applies); reading it through i64() would not convert it
+    int64_t quantity;
+    if (val_quantity.type == value::value_En_t::FLOAT)
+    {
+      quantity = val_quantity.dbl();
+    }
+    else
+    {
+      quantity = val_quantity.i64();
+    }
+
+    iter++;
+    base_statement* ts = *iter;
+    val_timestamp = ts->eval();
+
+    if(val_timestamp.is_timestamp() == false)
+    {
+      throw base_s3select_exception("third parameter should be time-stamp");
+    }
+
+    new_ptime = *val_timestamp.timestamp();
+
+    if( strcmp(val_date_part.str(), "year")==0 )
+    {
+      new_ptime += boost::gregorian::years( quantity );
+      result->set_value( &new_ptime );
+    }
+    else if( strcmp(val_date_part.str(), "month")==0 )
+    {
+      new_ptime += boost::gregorian::months( quantity );
+      result->set_value( &new_ptime );
+    }
+    else if( strcmp(val_date_part.str(), "day")==0 )
+    {
+      new_ptime += boost::gregorian::days( quantity );
+      result->set_value( &new_ptime );
+    }
+    else
+    {
+      throw base_s3select_exception( std::string(val_date_part.str() + std::string(" is not supported for add")).c_str());
+    }
+
+    return true;
+  }
+
+};
+
+//utcnow(): stores the current UTC time (second resolution) into result
+struct _fn_utcnow : public base_function
+{
+
+  boost::posix_time::ptime now_ptime;
+
+  bool operator()(std::vector<base_statement*>* args, variable* result)
+  {
+    //the function takes no arguments at all
+    if (!args->empty())
+    {
+      throw base_s3select_exception("utcnow does not expect any parameters");
+    }
+
+    now_ptime = boost::posix_time::second_clock::universal_time();
+    result->set_value( &now_ptime );
+
+    return true;
+  }
+};
+
+//substr(str, from [, length]): copies the part of `str` that starts at the 1-based position
+//`from` into an internal buffer; with a third argument at most `length` characters are copied
+struct _fn_substr : public base_function
+{
+
+  char buff[4096];// this buffer persists for the query life time; it holds the result per row (only for the specific function call)
+  //it prevents intensive use of malloc/free (fragmentation).
+  //TODO may replace by std::string (dynamic) , or to replace with global allocator , in query scope.
+  value v_str;
+  value v_from;
+  value v_to;
+
+  //throws base_s3select_exception (row can be skipped) on wrong argument count/types,
+  //on positions outside the string and on strings that do not fit the internal buffer
+  bool operator()(std::vector<base_statement*>* args, variable* result)
+  {
+    std::vector<base_statement*>::iterator iter = args->begin();
+    int args_size = args->size();
+
+
+    if (args_size<2)
+    {
+      throw base_s3select_exception("substr accept 2 arguments or 3");
+    }
+
+    base_statement* str = *iter;
+    iter++;
+    base_statement* from = *iter;
+    base_statement* to = nullptr;
+
+    if (args_size == 3)
+    {
+      iter++;
+      to = *iter;
+    }
+
+    v_str = str->eval();
+
+    if(v_str.type != value::value_En_t::STRING)
+    {
+      throw base_s3select_exception("substr first argument must be string"); //can skip current row
+    }
+
+    int str_length = strlen(v_str.str());
+
+    v_from = from->eval();
+    if(v_from.is_string())
+    {
+      throw base_s3select_exception("substr second argument must be number"); //can skip current row
+    }
+
+    int64_t f;
+    int64_t t;
+
+    if (args_size==3)
+    {
+      v_to = to->eval();
+      if (v_to.is_string())
+      {
+        throw base_s3select_exception("substr third argument must be number"); //can skip row
+      }
+    }
+
+    if (v_from.type == value::value_En_t::FLOAT)
+    {
+      f=v_from.dbl();
+    }
+    else
+    {
+      f=v_from.i64();
+    }
+
+    //positions are 1-based; anything below 1 would read in front of the string
+    if (f<1)
+    {
+      throw base_s3select_exception("substr start position must be positive"); //can skip row
+    }
+
+    if (f>str_length)
+    {
+      throw base_s3select_exception("substr start position is too far"); //can skip row
+    }
+
+    //the terminating NUL needs room too
+    if (str_length>=(int)sizeof(buff))
+    {
+      throw base_s3select_exception("string too long for internal buffer"); //can skip row
+    }
+
+    if (args_size == 3)
+    {
+      //the conversion depends on the type of the length argument itself
+      if (v_to.type == value::value_En_t::FLOAT)
+      {
+        t = v_to.dbl();
+      }
+      else
+      {
+        t = v_to.i64();
+      }
+
+      if( t<0 || (str_length-(f-1)-t) <0)
+      {
+        throw base_s3select_exception("substr length parameter beyond bounderies"); //can skip row
+      }
+
+      strncpy(buff, v_str.str()+f-1, t);
+      buff[t] = 0; //strncpy does not terminate when the source is longer than t
+    }
+    else
+    {
+      strcpy(buff, v_str.str()+f-1);
+    }
+
+    result->set_value(buff);
+
+    return true;
+  }
+
+};
+
+//creates the function object registered under fn_name.
+//throws a FATAL base_s3select_exception when the name is not in the function library.
+//`inline` because this header-defined function may be included from several translation units.
+inline base_function* s3select_functions::create(std::string fn_name)
+{
+  const FunctionLibrary::const_iterator iter = m_functions_library.find(fn_name);
+
+  if (iter == m_functions_library.end())
+  {
+    std::string msg;
+    msg = fn_name + " " + " function not found";
+    throw base_s3select_exception(msg, base_s3select_exception::s3select_exp_en_t::FATAL);
+  }
+
+  switch (iter->second)
+  {
+  case s3select_func_En_t::ADD:
+    return S3SELECT_NEW(_fn_add);
+
+  case s3select_func_En_t::SUM:
+    return S3SELECT_NEW(_fn_sum);
+
+  case s3select_func_En_t::COUNT:
+    return S3SELECT_NEW(_fn_count);
+
+  case s3select_func_En_t::MIN:
+    return S3SELECT_NEW(_fn_min);
+
+  case s3select_func_En_t::MAX:
+    return S3SELECT_NEW(_fn_max);
+
+  case s3select_func_En_t::TO_INT:
+    return S3SELECT_NEW(_fn_to_int);
+
+  case s3select_func_En_t::TO_FLOAT:
+    return S3SELECT_NEW(_fn_to_float);
+
+  case s3select_func_En_t::SUBSTR:
+    return S3SELECT_NEW(_fn_substr);
+
+  case s3select_func_En_t::TO_TIMESTAMP:
+    return S3SELECT_NEW(_fn_to_timestamp);
+
+  case s3select_func_En_t::EXTRACT:
+    return S3SELECT_NEW(_fn_extact_from_timestamp);
+
+  case s3select_func_En_t::DATE_ADD:
+    return S3SELECT_NEW(_fn_add_to_timestamp);
+
+  case s3select_func_En_t::DATE_DIFF:
+    return S3SELECT_NEW(_fn_diff_timestamp);
+
+  case s3select_func_En_t::UTCNOW:
+    return S3SELECT_NEW(_fn_utcnow);
+
+  default:
+    //an identifier without a matching case above
+    throw base_s3select_exception("internal error while resolving function-name");
+  }
+}
+
+//true when this node is a function call (__function).
+//`inline` because this header-defined function may be included from several translation units.
+inline bool base_statement::is_function()
+{
+  return dynamic_cast<__function*>(this) != nullptr;
+}
+
+//true when `e`, one of its left/right sub-expressions or one of its function arguments
+//is an aggregate.
+//`inline` because this header-defined function may be included from several translation units.
+inline bool base_statement::is_aggregate_exist_in_expression(base_statement* e) //TODO obsolete ?
+{
+  if (e->is_aggregate())
+  {
+    return true;
+  }
+
+  if (e->left() && e->left()->is_aggregate_exist_in_expression(e->left()))
+  {
+    return true;
+  }
+
+  if (e->right() && e->right()->is_aggregate_exist_in_expression(e->right()))
+  {
+    return true;
+  }
+
+  if (e->is_function())
+  {
+    for (auto i : dynamic_cast<__function*>(e)->get_arguments())
+    {
+      if (e->is_aggregate_exist_in_expression(i))
+      {
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
+
+//searches the AST rooted at this node for an aggregation function.
+//returns the first aggregate found (this node, then the left subtree, the right subtree,
+//and finally the function arguments), or 0 when there is none.
+//`inline` because this header-defined function may be included from several translation units.
+inline base_statement* base_statement::get_aggregate()
+{
+  if (is_aggregate())
+  {
+    return this;
+  }
+
+  if (left())
+  {
+    if (base_statement* found = left()->get_aggregate())
+    {
+      return found;
+    }
+  }
+
+  if (right())
+  {
+    if (base_statement* found = right()->get_aggregate())
+    {
+      return found;
+    }
+  }
+
+  if (is_function())
+  {
+    for (auto i : dynamic_cast<__function*>(this)->get_arguments())
+    {
+      if (base_statement* found = i->get_aggregate())
+      {
+        return found;
+      }
+    }
+  }
+  return 0;
+}
+
+//validate for non nested calls for aggregation function, i.e. sum ( min ( )).
+//returns true when `e` is an aggregate and an aggregate also exists below it: in its left
+//sub-expression, its right sub-expression or any of its function arguments. All three places
+//are inspected independently, so an existing left child does not hide the others.
+//`inline` because this header-defined function may be included from several translation units.
+inline bool base_statement::is_nested_aggregate(base_statement* e)
+{
+  if (!e->is_aggregate())
+  {
+    return false;
+  }
+
+  if (e->left() && e->left()->is_aggregate_exist_in_expression(e->left()))
+  {
+    return true;
+  }
+
+  if (e->right() && e->right()->is_aggregate_exist_in_expression(e->right()))
+  {
+    return true;
+  }
+
+  if (e->is_function())
+  {
+    for (auto i : dynamic_cast<__function*>(e)->get_arguments())
+    {
+      if (i->is_aggregate_exist_in_expression(i))
+      {
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
+
+// select sum(c2) ... + c1 ... is not allowed. a binary operation with scalar is OK. i.e. select sum() + 1
+// returns true when a column is reachable from this node without passing through
+// skip_expression (the sub-tree that is excluded from the search).
+// `inline` because this header-defined function may be included from several translation units.
+inline bool base_statement::is_binop_aggregate_and_column(base_statement* skip_expression)
+{
+  if (left() && left() != skip_expression) //can traverse to left
+  {
+    if (left()->is_column() || left()->is_binop_aggregate_and_column(skip_expression))
+    {
+      return true;
+    }
+  }
+
+  if (right() && right() != skip_expression) //can traverse right
+  {
+    if (right()->is_column() || right()->is_binop_aggregate_and_column(skip_expression))
+    {
+      return true;
+    }
+  }
+
+  if (this != skip_expression && is_function())
+  {
+    __function* f = (dynamic_cast<__function*>(this));
+    for (auto i : f->get_arguments())
+    {
+      if (i!=skip_expression && i->is_column())
+      {
+        return true;
+      }
+      if (i->is_binop_aggregate_and_column(skip_expression) == true)
+      {
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
+
+} //namespace s3selectEngine
+
+#endif
diff --git a/src/s3select/include/s3select_oper.h b/src/s3select/include/s3select_oper.h
new file mode 100644
index 000000000..591e5f9da
--- /dev/null
+++ b/src/s3select/include/s3select_oper.h
@@ -0,0 +1,1320 @@
+#ifndef __S3SELECT_OPER__
+#define __S3SELECT_OPER__
+
+#include <string>
+#include <iostream>
+#include <list>
+#include <map>
+#include <vector>
+#include <string.h>
+#include <math.h>
+
+#include <boost/lexical_cast.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/bind.hpp>
+namespace bsc = BOOST_SPIRIT_CLASSIC_NS;
+
+namespace s3selectEngine
+{
+
+// Exception type thrown by the s3select engine: a text message plus a severity.
+class base_s3select_exception
+{
+
+public:
+  // severity levels that can be attached to an exception
+  enum class s3select_exp_en_t
+  {
+    NONE,
+    ERROR,
+    FATAL
+  } ;
+
+private:
+  s3select_exp_en_t m_severity;
+
+public:
+  std::string _msg;
+
+  // message only; the severity is NONE
+  base_s3select_exception(const char* n) : m_severity(s3select_exp_en_t::NONE), _msg(n) {}
+
+  // message (C string) with an explicit severity
+  base_s3select_exception(const char* n, s3select_exp_en_t severity) : m_severity(severity), _msg(n) {}
+
+  // message (std::string) with an explicit severity
+  base_s3select_exception(std::string n, s3select_exp_en_t severity) : m_severity(severity), _msg(n) {}
+
+  // returns the stored message; the pointer refers to _msg's internal buffer
+  virtual const char* what()
+  {
+    const char* text = _msg.c_str();
+    return text;
+  }
+
+  // returns the severity given at construction
+  s3select_exp_en_t severity()
+  {
+    return m_severity;
+  }
+
+  virtual ~base_s3select_exception() {}
+};
+
+
+// Per-thread pointer to the spot, inside a dynamically allocated buffer, where the next placement new constructs its object.
+static __thread char* _s3select_buff_ptr = nullptr;
+
+// Bump allocator backing S3SELECT_NEW: hands out space from fixed-size
+// malloc'ed buffers and releases all of them together in the destructor.
+//
+// NOTE(review): the class frees its buffers in the destructor but is still
+// implicitly copyable; copying an instance would free the same buffers twice.
+class s3select_allocator //s3select is the "owner"
+{
+private:
+
+  std::vector<char*> list_of_buff; // every buffer allocated so far; the last one is in use
+  u_int32_t m_idx;                 // offset of the next free byte in the last buffer
+
+public:
+#define __S3_ALLOCATION_BUFF__ (8*1024)
+  // starts with a single empty buffer.
+  // throws base_s3select_exception(FATAL) when the buffer cannot be allocated.
+  s3select_allocator():m_idx(0)
+  {
+    add_buffer();
+  }
+
+  // points _s3select_buff_ptr at the next free byte of the current buffer
+  void set_global_buff()
+  {
+    char* buff = list_of_buff.back();
+    _s3select_buff_ptr = &buff[ m_idx ];
+  }
+
+  // makes sure `sz` bytes are available at the current position, switching to
+  // a fresh buffer when the current one cannot hold them.
+  // throws base_s3select_exception(FATAL) when sz exceeds the buffer size or
+  // when a new buffer cannot be allocated.
+  void check_capacity(size_t sz)
+  {
+    if (sz>__S3_ALLOCATION_BUFF__)
+    {
+      throw base_s3select_exception("requested size too big", base_s3select_exception::s3select_exp_en_t::FATAL);
+    }
+
+    if ((m_idx + sz) >= __S3_ALLOCATION_BUFF__)
+    {
+      add_buffer();
+      m_idx = 0;
+    }
+  }
+
+  // consumes `sz` bytes and rounds the position up to a multiple of
+  // sizeof(char*); an already aligned position is left where it is.
+  void inc(size_t sz)
+  {
+    m_idx += sz;
+
+    const size_t misalignment = m_idx % sizeof(char*);
+    if (misalignment != 0)
+    {
+      m_idx += sizeof(char*) - misalignment; //alignment
+    }
+  }
+
+  void zero()
+  {
+    //not a must, it is for safety.
+    _s3select_buff_ptr=0;
+  }
+
+  virtual ~s3select_allocator()
+  {
+    for(auto b : list_of_buff)
+    {
+      free(b);
+    }
+  }
+
+private:
+
+  // appends one more buffer of __S3_ALLOCATION_BUFF__ bytes to list_of_buff.
+  // the vector slot is reserved first, so a failure of either step leaves
+  // neither a leaked buffer nor a null entry behind.
+  void add_buffer()
+  {
+    list_of_buff.push_back(0);
+
+    char* buff = (char*)malloc(__S3_ALLOCATION_BUFF__);
+    if (!buff)
+    {
+      list_of_buff.pop_back();
+      throw base_s3select_exception("out of memory", base_s3select_exception::s3select_exp_en_t::FATAL);
+    }
+
+    list_of_buff.back() = buff;
+  }
+};
+
+// Holds the s3select_allocator pointer under the member name that the
+// S3SELECT_NEW macro refers to; the pointer is stored only, never freed here.
+class __clt_allocator
+{
+public:
+  s3select_allocator* m_s3select_allocator;
+
+public:
+
+  // starts without an allocator (null pointer)
+  __clt_allocator() : m_s3select_allocator(nullptr) {}
+
+  // stores the allocator to be used from now on
+  void set(s3select_allocator* a)
+  {
+    this->m_s3select_allocator = a;
+  }
+};
+
+// placement new for allocation of all s3select objects on single(or few) buffers, deallocation of those objects is by releasing the buffer.
+//
+// S3SELECT_NEW( type , ctor-args... ) expands to an immediately invoked lambda that:
+//   1. asks m_s3select_allocator for room for sizeof(type) (check_capacity may throw),
+//   2. points _s3select_buff_ptr at the next free byte (set_global_buff),
+//   3. constructs `type` at that address with placement new,
+//   4. advances the allocator (inc) and resets _s3select_buff_ptr (zero),
+//   5. returns the pointer to the new object.
+// A variable named m_s3select_allocator must be visible where the macro is expanded.
+// The expansion already ends with ';'.
+#define S3SELECT_NEW( type , ... ) [=]() \
+ { \
+ m_s3select_allocator->check_capacity(sizeof( type )); \
+ m_s3select_allocator->set_global_buff(); \
+ auto res=new (_s3select_buff_ptr) type(__VA_ARGS__); \
+ m_s3select_allocator->inc(sizeof( type )); \
+ m_s3select_allocator->zero(); \
+ return res; \
+ }();
+
+// Holds the column values of the row currently being processed, plus the
+// mapping between schema column names and their positions.
+class scratch_area
+{
+
+private:
+  std::vector<std::string_view> m_columns{128}; // starts with 128 slots, grows on demand in update()
+  int m_upper_bound = 0;                        // number of valid entries in m_columns
+
+  std::vector<std::pair<std::string, int >> m_column_name_pos;
+
+public:
+
+  // registers the position of the column named `n`
+  void set_column_pos(const char* n, int pos)//TODO use std::string
+  {
+    m_column_name_pos.emplace_back(n, pos);
+  }
+
+  // loads the first `num_of_tokens` entries of `tokens` (or all of them, when
+  // there are fewer) as the current row. only views are stored, the
+  // characters themselves are not copied.
+  void update(const std::vector<char*>& tokens, size_t num_of_tokens)
+  {
+    const size_t count = (tokens.size() < num_of_tokens) ? tokens.size() : num_of_tokens;
+
+    // a row wider than the preallocated slots must not write out of bounds
+    if (count > m_columns.size())
+    {
+      m_columns.resize(count);
+    }
+
+    for (size_t i = 0; i < count; i++)
+    {
+      m_columns[i] = tokens[i];
+    }
+
+    m_upper_bound = (int)count;
+  }
+
+  // returns the position registered for column name `n`, or -1 when unknown
+  int get_column_pos(const char* n)
+  {
+    //done only upon building the AST , not on "runtime"
+
+    for (const auto& name_pos : m_column_name_pos)
+    {
+      if (!strcmp(name_pos.first.c_str(), n))
+      {
+        return name_pos.second;
+      }
+    }
+
+    return -1;
+  }
+
+  // returns the value of the column at `column_pos` in the current row.
+  // throws base_s3select_exception(ERROR) when the position is negative or not
+  // below the number of columns loaded by the last update().
+  std::string_view get_column_value(int column_pos)
+  {
+
+    if ((column_pos >= m_upper_bound) || column_pos < 0)
+    {
+      throw base_s3select_exception("column_position_is_wrong", base_s3select_exception::s3select_exp_en_t::ERROR);
+    }
+
+    return m_columns[column_pos];
+  }
+
+  // number of columns loaded by the last update() (0 before the first one)
+  int get_num_of_columns()
+  {
+    return m_upper_bound;
+  }
+};
+
+class base_statement;
+class projection_alias
+{
+//purpose: maps an alias-name to the base_statement* it stands for.
+//these routines are *NOT* intensive, they run once per query at parse time.
+
+private:
+  std::vector< std::pair<std::string, base_statement*> > alias_map;
+
+public:
+  // direct access to the underlying (name, statement) list
+  std::vector< std::pair<std::string, base_statement*> >* get()
+  {
+    return &alias_map;
+  }
+
+  // adds (alias_name, bs); returns false and adds nothing when the name is already present
+  bool insert_new_entry(std::string alias_name, base_statement* bs)
+  {
+    //purpose: only unique alias names.
+
+    for (size_t idx = 0; idx < alias_map.size(); ++idx)
+    {
+      if (alias_map[idx].first == alias_name)
+      {
+        return false; //alias name already exist
+      }
+    }
+
+    alias_map.emplace_back(alias_name, bs);
+    return true;
+  }
+
+  // returns the statement registered under alias_name, or 0 when there is none
+  base_statement* search_alias(std::string alias_name)
+  {
+    for (size_t idx = 0; idx < alias_map.size(); ++idx)
+    {
+      if (alias_map[idx].first == alias_name)
+      {
+        return alias_map[idx].second; //reference to execution node
+      }
+    }
+    return 0;
+  }
+};
+
+// addition functor, used as the `binop` template argument of value::compute
+struct binop_plus
+{
+  double operator()(double a, double b)
+  {
+    const double sum = a + b;
+    return sum;
+  }
+};
+
+// subtraction functor (a - b), used as the `binop` template argument of value::compute
+struct binop_minus
+{
+  double operator()(double a, double b)
+  {
+    const double difference = a - b;
+    return difference;
+  }
+};
+
+// multiplication functor, used as the `binop` template argument of value::compute
+struct binop_mult
+{
+  double operator()(double a, double b)
+  {
+    const double product = a * b;
+    return product;
+  }
+};
+
+// division functor (a / b), used as the `binop` template argument of value::compute.
+// the division is done on doubles; a zero divisor is not checked here.
+struct binop_div
+{
+  double operator()(double a, double b)
+  {
+    const double quotient = a / b;
+    return quotient;
+  }
+};
+
+// power functor (a raised to b, via pow), used as the `binop` template argument of value::compute
+struct binop_pow
+{
+  double operator()(double a, double b)
+  {
+    const double raised = pow(a, b);
+    return raised;
+  }
+};
+
+// Tagged value used at run time by the engine: a decimal (int64), a float
+// (double), a string, a timestamp pointer, or "not available" (NA).
+//
+// String payloads are always copied into this object's own buffer
+// (m_str_value), so a value never refers to characters owned by another value.
+// Timestamps are held as a raw pointer; the pointee is not owned by this class.
+//
+// The arithmetic operators (+ - * / ^) store their result in the LEFT operand
+// and return a reference to it.
+class value
+{
+
+public:
+  // raw payload; `type` tells which member is meaningful
+  typedef union
+  {
+    int64_t num;
+    char* str;//TODO consider string_view
+    double dbl;
+    boost::posix_time::ptime* timestamp;
+  } value_t;
+
+private:
+  value_t __val;
+  std::string m_to_string; // buffer handed out by to_string()
+  std::string m_str_value; // backing store for __val.str when type is STRING
+
+public:
+  enum class value_En_t
+  {
+    DECIMAL,
+    FLOAT,
+    STRING,
+    TIMESTAMP,
+    NA
+  } ;
+  value_En_t type;
+
+  value(int64_t n) : type(value_En_t::DECIMAL)
+  {
+    __val.num = n;
+  }
+  value(int n) : type(value_En_t::DECIMAL)
+  {
+    __val.num = n;
+  }
+  value(bool b) : type(value_En_t::DECIMAL)
+  {
+    __val.num = (int64_t)b;
+  }
+  value(double d) : type(value_En_t::FLOAT)
+  {
+    __val.dbl = d;
+  }
+  value(boost::posix_time::ptime* timestamp) : type(value_En_t::TIMESTAMP)
+  {
+    __val.timestamp = timestamp;
+  }
+
+  // copies the characters of `s` into the value
+  value(const char* s) : type(value_En_t::STRING)
+  {
+    m_str_value.assign(s);
+    __val.str = m_str_value.data();
+  }
+
+  value():type(value_En_t::NA)
+  {
+    __val.num=0;
+  }
+
+  // copy constructor: a string payload is duplicated into the new object's own
+  // buffer (a member-wise copy would leave __val.str pointing into `o`).
+  value(const value& o) : type(o.type)
+  {
+    if (o.type == value_En_t::STRING)
+    {
+      m_str_value.assign(o.__val.str);
+      __val.str = m_str_value.data();
+    }
+    else
+    {
+      __val = o.__val;
+    }
+  }
+
+  bool is_number() const
+  {
+    return type == value_En_t::DECIMAL || type == value_En_t::FLOAT;
+  }
+
+  bool is_string() const
+  {
+    return type == value_En_t::STRING;
+  }
+  bool is_timestamp() const
+  {
+    return type == value_En_t::TIMESTAMP;
+  }
+
+
+  // textual form of the value. the returned reference points at an internal
+  // buffer that is overwritten by the next call.
+  // NOTE(review): any type other than DECIMAL/FLOAT/STRING is treated as a
+  // timestamp and its pointer is dereferenced; that includes NA.
+  std::string& to_string()  //TODO very intensive , must improve this
+  {
+
+    if (type != value_En_t::STRING)
+    {
+      if (type == value_En_t::DECIMAL)
+      {
+        m_to_string.assign( boost::lexical_cast<std::string>(__val.num) );
+      }
+      else if(type == value_En_t::FLOAT)
+      {
+        m_to_string = boost::lexical_cast<std::string>(__val.dbl);
+      }
+      else
+      {
+        m_to_string = to_simple_string( *__val.timestamp );
+      }
+    }
+    else
+    {
+      m_to_string.assign( __val.str );
+    }
+
+    return m_to_string;
+  }
+
+
+  // copy assignment. the decision between copying the characters and copying
+  // the raw payload depends on the type of the SOURCE value `o`.
+  value& operator=(const value& o)
+  {
+    if (this == &o)
+    {
+      return *this;
+    }
+
+    if(o.type == value_En_t::STRING)
+    {
+      m_str_value.assign(o.__val.str);
+      __val.str = m_str_value.data();
+    }
+    else
+    {
+      this->__val = o.__val;
+    }
+
+    this->type = o.type;
+
+    return *this;
+  }
+
+  value& operator=(const char* s)
+  {
+    m_str_value.assign(s);
+    this->__val.str = m_str_value.data();
+    this->type = value_En_t::STRING;
+
+    return *this;
+  }
+
+  value& operator=(int64_t i)
+  {
+    this->__val.num = i;
+    this->type = value_En_t::DECIMAL;
+
+    return *this;
+  }
+
+  value& operator=(double d)
+  {
+    this->__val.dbl = d;
+    this->type = value_En_t::FLOAT;
+
+    return *this;
+  }
+
+  value& operator=(bool b)
+  {
+    this->__val.num = (int64_t)b;
+    this->type = value_En_t::DECIMAL;
+
+    return *this;
+  }
+
+  value& operator=(boost::posix_time::ptime* p)
+  {
+    this->__val.timestamp = p;
+    this->type = value_En_t::TIMESTAMP;
+
+    return *this;
+  }
+
+  // raw accessors: each one returns the matching union member without
+  // checking `type`.
+  int64_t i64() const
+  {
+    return __val.num;
+  }
+
+  const char* str() const
+  {
+    return __val.str;
+  }
+
+  double dbl() const
+  {
+    return __val.dbl;
+  }
+
+  boost::posix_time::ptime* timestamp() const
+  {
+    return __val.timestamp;
+  }
+
+  // comparison rules shared by <, > and == :
+  //  - string    vs string    : strcmp
+  //  - number    vs number    : a mixed DECIMAL/FLOAT pair is compared as double
+  //  - timestamp vs timestamp : the pointed-to ptime objects are compared
+  //  - anything else          : base_s3select_exception is thrown
+  bool operator<(const value& v) const //basic compare operator , most intensive runtime operation
+  {
+    //TODO NA possible?
+    if (is_string() && v.is_string())
+    {
+      return strcmp(__val.str, v.__val.str) < 0;
+    }
+
+    if (is_number() && v.is_number())
+    {
+
+      if(type != v.type) //conversion //TODO find better way
+      {
+        if (type == value_En_t::DECIMAL)
+        {
+          return (double)__val.num < v.__val.dbl;
+        }
+        else
+        {
+          return __val.dbl < (double)v.__val.num;
+        }
+      }
+      else //no conversion
+      {
+        if(type == value_En_t::DECIMAL)
+        {
+          return __val.num < v.__val.num;
+        }
+        else
+        {
+          return __val.dbl < v.__val.dbl;
+        }
+
+      }
+    }
+
+    if(is_timestamp() && v.is_timestamp())
+    {
+      return *timestamp() < *(v.timestamp());
+    }
+
+    throw base_s3select_exception("operands not of the same type(numeric , string), while comparision");
+  }
+
+  bool operator>(const value& v) const //basic compare operator , most intensive runtime operation
+  {
+    //TODO NA possible?
+    if (is_string() && v.is_string())
+    {
+      return strcmp(__val.str, v.__val.str) > 0;
+    }
+
+    if (is_number() && v.is_number())
+    {
+
+      if(type != v.type) //conversion //TODO find better way
+      {
+        if (type == value_En_t::DECIMAL)
+        {
+          return (double)__val.num > v.__val.dbl;
+        }
+        else
+        {
+          return __val.dbl > (double)v.__val.num;
+        }
+      }
+      else //no conversion
+      {
+        if(type == value_En_t::DECIMAL)
+        {
+          return __val.num > v.__val.num;
+        }
+        else
+        {
+          return __val.dbl > v.__val.dbl;
+        }
+
+      }
+    }
+
+    if(is_timestamp() && v.is_timestamp())
+    {
+      return *timestamp() > *(v.timestamp());
+    }
+
+    throw base_s3select_exception("operands not of the same type(numeric , string), while comparision");
+  }
+
+  bool operator==(const value& v) const //basic compare operator , most intensive runtime operation
+  {
+    //TODO NA possible?
+    if (is_string() && v.is_string())
+    {
+      return strcmp(__val.str, v.__val.str) == 0;
+    }
+
+
+    if (is_number() && v.is_number())
+    {
+
+      if(type != v.type) //conversion //TODO find better way
+      {
+        if (type == value_En_t::DECIMAL)
+        {
+          return (double)__val.num == v.__val.dbl;
+        }
+        else
+        {
+          return __val.dbl == (double)v.__val.num;
+        }
+      }
+      else //no conversion
+      {
+        if(type == value_En_t::DECIMAL)
+        {
+          return __val.num == v.__val.num;
+        }
+        else
+        {
+          return __val.dbl == v.__val.dbl;
+        }
+
+      }
+    }
+
+    if(is_timestamp() && v.is_timestamp())
+    {
+      return *timestamp() == *(v.timestamp());
+    }
+
+    throw base_s3select_exception("operands not of the same type(numeric , string), while comparision");
+  }
+  // the remaining comparisons are negations of the three above
+  bool operator<=(const value& v) const
+  {
+    return !(*this>v);
+  }
+  bool operator>=(const value& v) const
+  {
+    return !(*this<v);
+  }
+  bool operator!=(const value& v) const
+  {
+    return !(*this == v);
+  }
+
+  // applies `binop` to l and r and stores the result in l (which is returned).
+  //  - a string operand      : base_s3select_exception is thrown
+  //  - differing types       : the DECIMAL side is converted, result is FLOAT
+  //  - DECIMAL with DECIMAL  : result is DECIMAL
+  //  - otherwise             : result is FLOAT
+  // NOTE(review): only strings are rejected; TIMESTAMP/NA operands go through
+  // the numeric branches. the binop functors in this file take doubles, so a
+  // DECIMAL/DECIMAL operation is carried out in double and converted back.
+  template<typename binop> //conversion rules for arithmetical binary operations
+  value& compute(value& l, const value& r) //left should be this, it contain the result
+  {
+    binop __op;
+
+    if (l.is_string() || r.is_string())
+    {
+      throw base_s3select_exception("illegal binary operation with string");
+    }
+
+    if (l.type != r.type)
+    {
+      //conversion
+
+      if (l.type == value_En_t::DECIMAL)
+      {
+        l.__val.dbl = __op((double)l.__val.num, r.__val.dbl);
+        l.type = value_En_t::FLOAT;
+      }
+      else
+      {
+        l.__val.dbl = __op(l.__val.dbl, (double)r.__val.num);
+        l.type = value_En_t::FLOAT;
+      }
+    }
+    else
+    {
+      //no conversion
+
+      if (l.type == value_En_t::DECIMAL)
+      {
+        l.__val.num = __op(l.__val.num, r.__val.num );
+        l.type = value_En_t::DECIMAL;
+      }
+      else
+      {
+        l.__val.dbl = __op(l.__val.dbl, r.__val.dbl );
+        l.type = value_En_t::FLOAT;
+      }
+    }
+
+    return l;
+  }
+
+  // each operator below modifies *this and returns a reference to it
+  value& operator+(const value& v)
+  {
+    return compute<binop_plus>(*this, v);
+  }
+
+  value& operator-(const value& v)
+  {
+    return compute<binop_minus>(*this, v);
+  }
+
+  value& operator*(const value& v)
+  {
+    return compute<binop_mult>(*this, v);
+  }
+
+  value& operator/(const value& v)  // TODO handle division by zero
+  {
+    return compute<binop_div>(*this, v);
+  }
+
+  value& operator^(const value& v)
+  {
+    return compute<binop_pow>(*this, v);
+  }
+
+};
+
+// Base class of every node of the expression tree built from a query.
+// A node is evaluated with eval(); left()/right() expose its operands and
+// return 0 unless a derived class overrides them.
+class base_statement
+{
+
+protected:
+
+  scratch_area* m_scratch;            // set by traverse_and_apply
+  projection_alias* m_aliases;        // set by traverse_and_apply
+  bool is_last_call; //valid only for aggregation functions
+  bool m_is_cache_result;             // true while m_alias_result holds a cached result
+  value m_alias_result;               // result stored by set_result_cache
+  base_statement* m_projection_alias;
+  int m_eval_stack_depth;             // counter behind get_eval_call_depth / dec_call_stack_depth
+
+public:
+  // every pointer starts as null, every flag as false and the counter as zero
+  base_statement():m_scratch(0), m_aliases(0), is_last_call(false), m_is_cache_result(false), m_projection_alias(0), m_eval_stack_depth(0) {}
+  virtual value& eval() =0;
+  virtual base_statement* left()
+  {
+    return 0;
+  }
+  virtual base_statement* right()
+  {
+    return 0;
+  }
+  virtual std::string print(int ident) =0;//TODO complete it, one option to use level parameter in interface ,
+  virtual bool semantic() =0;//done once , post syntax , traverse all nodes and validate semantics.
+
+  // stores the scratch area and alias table on this node and, recursively, on
+  // its left and right operands.
+  virtual void traverse_and_apply(scratch_area* sa, projection_alias* pa)
+  {
+    m_scratch = sa;
+    m_aliases = pa;
+    if (left())
+    {
+      left()->traverse_and_apply(m_scratch, m_aliases);
+    }
+    if (right())
+    {
+      right()->traverse_and_apply(m_scratch, m_aliases);
+    }
+  }
+
+  virtual bool is_aggregate()
+  {
+    return false;
+  }
+  virtual bool is_column()
+  {
+    return false;
+  }
+
+  bool is_function();
+  bool is_aggregate_exist_in_expression(base_statement* e);//TODO obsolete ?
+  base_statement* get_aggregate();
+  bool is_nested_aggregate(base_statement* e);
+  bool is_binop_aggregate_and_column(base_statement* skip);
+
+  // raises the last-call flag on this node and, recursively, on its operands
+  virtual void set_last_call()
+  {
+    is_last_call = true;
+    if(left())
+    {
+      left()->set_last_call();
+    }
+    if(right())
+    {
+      right()->set_last_call();
+    }
+  }
+
+  bool is_set_last_call()
+  {
+    return is_last_call;
+  }
+
+  void invalidate_cache_result()
+  {
+    m_is_cache_result = false;
+  }
+
+  bool is_result_cached()
+  {
+    return m_is_cache_result == true;
+  }
+
+  // copies eval_result into the cache and marks the cache as valid
+  void set_result_cache(value& eval_result)
+  {
+    m_alias_result = eval_result;
+    m_is_cache_result = true;
+  }
+
+  void dec_call_stack_depth()
+  {
+    m_eval_stack_depth --;
+  }
+
+  value& get_result_cache()
+  {
+    return m_alias_result;
+  }
+
+  // note: despite the getter-like name, every call increments the counter
+  // before returning it.
+  int& get_eval_call_depth()
+  {
+    m_eval_stack_depth++;
+    return m_eval_stack_depth;
+  }
+
+  virtual ~base_statement() {}
+
+};
+
+// Leaf node of the expression tree: a literal, a column reference (by schema
+// name or by position, _1 , _2 ...), a projection alias, or the '*' operation.
+class variable : public base_statement
+{
+
+public:
+
+  enum class var_t
+  {
+    NA,
+    VAR,//schema column (i.e. age , price , ...)
+    COL_VALUE, //concrete value
+    POS, // CSV column number (i.e. _1 , _2 ... )
+    STAR_OPERATION, //'*'
+  } ;
+  var_t m_var_type;
+
+private:
+
+  std::string _name;
+  int column_pos;
+  value var_value;
+  std::string m_star_op_result;
+  char m_star_op_result_charc[4096]; //TODO should be dynamic
+
+  const int undefined_column_pos = -1; // column_pos not resolved yet
+  const int column_alias = -2;         // name was looked up as a projection alias
+
+public:
+  variable():m_var_type(var_t::NA), _name(""), column_pos(-1) {}
+
+  variable(int64_t i) : m_var_type(var_t::COL_VALUE), column_pos(-1), var_value(i) {}
+
+  variable(double d) : m_var_type(var_t::COL_VALUE), _name("#"), column_pos(-1), var_value(d) {}
+
+  variable(int i) : m_var_type(var_t::COL_VALUE), column_pos(-1), var_value(i) {}
+
+  variable(const std::string& n) : m_var_type(var_t::VAR), _name(n), column_pos(-1) {}
+
+  // n  : the token text
+  // tp : POS            -> n is "_<number>"; the column position is number-1
+  //      COL_VALUE      -> n is a string literal, copied into the value
+  //      STAR_OPERATION -> n is ignored
+  //      any other tp leaves the variable as NA.
+  variable(const std::string& n, var_t tp) : m_var_type(var_t::NA)
+  {
+    if(tp == variable::var_t::POS)
+    {
+      _name = n;
+      m_var_type = tp;
+      int pos = atoi( n.c_str() + 1 ); //TODO >0 < (schema definition , semantic analysis)
+      column_pos = pos -1;// _1 is the first column ( zero position )
+    }
+    else if (tp == variable::var_t::COL_VALUE)
+    {
+      _name = "#";
+      m_var_type = tp;
+      column_pos = -1;
+      var_value = n.c_str();
+    }
+    else if (tp ==variable::var_t::STAR_OPERATION)
+    {
+      _name = "#";
+      m_var_type = tp;
+      column_pos = -1;
+    }
+  }
+
+  void operator=(value& v)
+  {
+    var_value = v;
+  }
+
+  void set_value(const char* s)
+  {
+    var_value = s;
+  }
+
+  void set_value(double d)
+  {
+    var_value = d;
+  }
+
+  void set_value(int64_t i)
+  {
+    var_value = i;
+  }
+
+  void set_value(boost::posix_time::ptime* p)
+  {
+    var_value = p;
+  }
+
+  virtual ~variable() {}
+
+  virtual bool is_column()  //is reference to column.
+  {
+    return m_var_type == var_t::VAR || m_var_type == var_t::POS;
+  }
+
+  value& get_value()
+  {
+    return var_value; //TODO is it correct
+  }
+  virtual value::value_En_t get_value_type()
+  {
+    return var_value.type;
+  }
+
+
+  // joins all columns of the current row with ',' into one string value.
+  // throws base_s3select_exception(FATAL) when the joined line, including its
+  // separators and terminating zero, does not fit m_star_op_result_charc.
+  value& star_operation()   //purpose return content of all columns in a input stream
+  {
+    int i;
+    size_t pos=0;
+    const int num_of_columns = m_scratch->get_num_of_columns();
+
+    for(i=0; i<num_of_columns-1; i++)
+    {
+      const std::string_view column = m_scratch->get_column_value(i);
+      const size_t len = column.size();
+
+      // +1 : the separator written right after the column must fit as well
+      if((pos+len+1)>sizeof(m_star_op_result_charc))
+      {
+        throw base_s3select_exception("result line too long", base_s3select_exception::s3select_exp_en_t::FATAL);
+      }
+
+      memcpy(&m_star_op_result_charc[pos], column.data(), len);
+      pos += len;
+      m_star_op_result_charc[ pos ] = ',';//TODO need for another abstraction (per file type)
+      pos ++;
+
+    }
+
+    // last column: no separator, but the terminating zero needs one byte
+    const std::string_view last_column = m_scratch->get_column_value(i);
+    const size_t len = last_column.size();
+    if((pos+len+1)>sizeof(m_star_op_result_charc))
+    {
+      throw base_s3select_exception("result line too long", base_s3select_exception::s3select_exp_en_t::FATAL);
+    }
+
+    memcpy(&m_star_op_result_charc[pos], last_column.data(), len);
+    m_star_op_result_charc[ pos + len ] = 0;
+    var_value = (char*)&m_star_op_result_charc[0];
+    return var_value;
+  }
+
+  // evaluates the variable for the current row:
+  //  - literal        : the stored value
+  //  - '*'            : star_operation()
+  //  - column         : the column text taken from the scratch area
+  //  - alias          : the (possibly cached) result of the aliased statement
+  // a name is resolved on the first call: first against the schema, then
+  // against the aliases. FATAL exceptions are thrown when a name is both a
+  // schema column and an alias, when it is neither, and when the aliased
+  // statement is entered more than twice without returning.
+  virtual value& eval()
+  {
+    if (m_var_type == var_t::COL_VALUE)
+    {
+      return var_value;  // a literal,could be decimal / float / string
+    }
+    else if(m_var_type == var_t::STAR_OPERATION)
+    {
+      return star_operation();
+    }
+    else if (column_pos == undefined_column_pos)
+    {
+      //done once , for the first time
+      column_pos = m_scratch->get_column_pos(_name.c_str());
+
+      if(column_pos>=0 && m_aliases->search_alias(_name.c_str()))
+      {
+        throw base_s3select_exception(std::string("multiple definition of column {") + _name + "} as schema-column and alias", base_s3select_exception::s3select_exp_en_t::FATAL);
+      }
+
+
+      if (column_pos == undefined_column_pos)
+      {
+        //not belong to schema , should exist in aliases
+        m_projection_alias = m_aliases->search_alias(_name.c_str());
+
+        //not enter this scope again
+        column_pos = column_alias;
+        if(m_projection_alias == 0)
+        {
+          throw base_s3select_exception(std::string("alias {")+_name+std::string("} or column not exist in schema"), base_s3select_exception::s3select_exp_en_t::FATAL);
+        }
+      }
+
+    }
+
+    if (m_projection_alias)
+    {
+      if (m_projection_alias->get_eval_call_depth()>2)
+      {
+        throw base_s3select_exception("number of calls exceed maximum size, probably a cyclic reference to alias", base_s3select_exception::s3select_exp_en_t::FATAL);
+      }
+
+      if (m_projection_alias->is_result_cached() == false)
+      {
+        var_value = m_projection_alias->eval();
+        m_projection_alias->set_result_cache(var_value);
+      }
+      else
+      {
+        var_value = m_projection_alias->get_result_cache();
+      }
+
+      m_projection_alias->dec_call_stack_depth();
+    }
+    else
+    {
+      var_value = (char*)m_scratch->get_column_value(column_pos).data();  //no allocation. returning pointer of allocated space
+    }
+
+    return var_value;
+  }
+
+  virtual std::string print(int ident)
+  {
+    //std::string out = std::string(ident,' ') + std::string("var:") + std::to_string(var_value.__val.num);
+    //return out;
+    return std::string("#");//TBD
+  }
+
+  virtual bool semantic()
+  {
+    return false;
+  }
+
+};
+
+// Comparison node: evaluates both operands and yields the boolean result of
+// the configured comparison.
+class arithmetic_operand : public base_statement
+{
+
+public:
+
+  enum class cmp_t {NA, EQ, LE, LT, GT, GE, NE} ;
+
+private:
+  base_statement* l;
+  base_statement* r;
+
+  cmp_t _cmp;
+  value var_value; // holds the result of the last eval()
+
+public:
+
+  arithmetic_operand(base_statement* _l, cmp_t c, base_statement* _r):l(_l), r(_r), _cmp(c) {}
+
+  virtual ~arithmetic_operand() {}
+
+  virtual bool semantic()
+  {
+    return true;
+  }
+
+  virtual base_statement* left()
+  {
+    return l;
+  }
+  virtual base_statement* right()
+  {
+    return r;
+  }
+
+  virtual std::string print(int ident)
+  {
+    //std::string out = std::string(ident,' ') + "compare:" += std::to_string(_cmp) + "\n" + l->print(ident-5) +r->print(ident+5);
+    //return out;
+    return std::string("#");//TBD
+  }
+
+  // compares the two operands according to _cmp and returns the result.
+  // throws base_s3select_exception("internal error") for any other _cmp value.
+  virtual value& eval()
+  {
+    bool outcome;
+
+    switch (_cmp)
+    {
+    case cmp_t::EQ:
+      outcome = (l->eval() == r->eval());
+      break;
+
+    case cmp_t::LE:
+      outcome = (l->eval() <= r->eval());
+      break;
+
+    case cmp_t::GE:
+      outcome = (l->eval() >= r->eval());
+      break;
+
+    case cmp_t::NE:
+      outcome = (l->eval() != r->eval());
+      break;
+
+    case cmp_t::GT:
+      outcome = (l->eval() > r->eval());
+      break;
+
+    case cmp_t::LT:
+      outcome = (l->eval() < r->eval());
+      break;
+
+    default:
+      throw base_s3select_exception("internal error");
+    }
+
+    var_value = outcome;
+    return var_value;
+  }
+};
+
+// Logical node: combines the integer results of its two operands with AND or OR.
+class logical_operand : public base_statement
+{
+
+public:
+
+  enum class oplog_t {AND, OR, NA};
+
+private:
+  base_statement* l;
+  base_statement* r;
+
+  oplog_t _oplog;
+  value var_value; // holds the result of the last eval()
+
+public:
+
+  logical_operand(base_statement* _l, oplog_t _o, base_statement* _r):l(_l), r(_r), _oplog(_o) {}
+
+  virtual ~logical_operand() {}
+
+  virtual base_statement* left()
+  {
+    return l;
+  }
+  virtual base_statement* right()
+  {
+    return r;
+  }
+
+  virtual bool semantic()
+  {
+    return true;
+  }
+
+  virtual std::string print(int ident)
+  {
+    //std::string out = std::string(ident, ' ') + "logical_operand:" += std::to_string(_oplog) + "\n" + l->print(ident - 5) + r->print(ident + 5);
+    //return out;
+    return std::string("#");//TBD
+  }
+
+  // AND when _oplog is AND, OR for every other _oplog value.
+  // the right operand is evaluated only when the left one does not already
+  // decide the result.
+  // throws base_s3select_exception(FATAL) when an operand is missing.
+  virtual value& eval()
+  {
+    const bool is_and = (_oplog == oplog_t::AND);
+
+    if (!l || !r)
+    {
+      throw base_s3select_exception(is_and ? "missing operand for logical and" : "missing operand for logical or",
+                                    base_s3select_exception::s3select_exp_en_t::FATAL);
+    }
+
+    bool outcome;
+    if (is_and)
+    {
+      outcome = (l->eval().i64() && r->eval().i64());
+    }
+    else
+    {
+      outcome = (l->eval().i64() || r->eval().i64());
+    }
+
+    var_value = outcome;
+    return var_value;
+  }
+
+};
+
+// Arithmetic node for multiplication, division and power.
+class mulldiv_operation : public base_statement
+{
+
+public:
+
+  enum class muldiv_t {NA, MULL, DIV, POW} ;
+
+private:
+  base_statement* l;
+  base_statement* r;
+
+  muldiv_t _mulldiv;
+  value var_value; // holds the result of the last eval()
+
+public:
+
+  virtual base_statement* left()
+  {
+    return l;
+  }
+  virtual base_statement* right()
+  {
+    return r;
+  }
+
+  virtual bool semantic()
+  {
+    return true;
+  }
+
+  virtual std::string print(int ident)
+  {
+    //std::string out = std::string(ident, ' ') + "mulldiv_operation:" += std::to_string(_mulldiv) + "\n" + l->print(ident - 5) + r->print(ident + 5);
+    //return out;
+    return std::string("#");//TBD
+  }
+
+  // evaluates both operands and returns left (*, /, ^) right.
+  // throws base_s3select_exception("internal error") for any other operation.
+  virtual value& eval()
+  {
+    // the value operators store their result in the left operand, and
+    // l->eval() returns a reference to a value held by the child node.
+    // working on a private copy keeps the child's value (e.g. a literal)
+    // intact for the next evaluation.
+    value lhs(l->eval());
+
+    switch (_mulldiv)
+    {
+    case muldiv_t::MULL:
+      var_value = lhs * r->eval();
+      break;
+
+    case muldiv_t::DIV:
+      var_value = lhs / r->eval();
+      break;
+
+    case muldiv_t::POW:
+      var_value = lhs ^ r->eval();
+      break;
+
+    default:
+      throw base_s3select_exception("internal error");
+      break;
+    }
+
+    return var_value;
+  }
+
+  mulldiv_operation(base_statement* _l, muldiv_t c, base_statement* _r):l(_l), r(_r), _mulldiv(c) {}
+
+  virtual ~mulldiv_operation() {}
+};
+
+// Arithmetic node for addition and subtraction; with the NA operation it
+// passes through the value of whichever single operand is present.
+class addsub_operation : public base_statement
+{
+
+public:
+
+  enum class addsub_op_t {ADD, SUB, NA};
+
+private:
+  base_statement* l;
+  base_statement* r;
+
+  addsub_op_t _op;
+  value var_value; // holds the result of the last eval()
+
+public:
+
+  virtual base_statement* left()
+  {
+    return l;
+  }
+  virtual base_statement* right()
+  {
+    return r;
+  }
+
+  virtual bool semantic()
+  {
+    return true;
+  }
+
+  addsub_operation(base_statement* _l, addsub_op_t _o, base_statement* _r):l(_l), r(_r), _op(_o) {}
+
+  virtual ~addsub_operation() {}
+
+  virtual std::string print(int ident)
+  {
+    //std::string out = std::string(ident, ' ') + "addsub_operation:" += std::to_string(_op) + "\n" + l->print(ident - 5) + r->print(ident + 5);
+    return std::string("#");//TBD
+  }
+
+  // NA  : returns the value of the left operand if present, else of the right
+  //       one; with no operand at all the previous result is returned.
+  // ADD : left + right
+  // SUB : left - right
+  virtual value& eval()
+  {
+    if (_op == addsub_op_t::NA) // -num , +num , unary-operation on number
+    {
+      if (l)
+      {
+        return var_value = l->eval();
+      }
+      else if (r)
+      {
+        return var_value = r->eval();
+      }
+
+      return var_value;
+    }
+
+    // the value operators store their result in the left operand, and
+    // l->eval() returns a reference to a value held by the child node.
+    // working on a private copy keeps the child's value (e.g. a literal)
+    // intact for the next evaluation.
+    value lhs(l->eval());
+
+    if (_op == addsub_op_t::ADD)
+    {
+      var_value = (lhs + r->eval());
+    }
+    else
+    {
+      var_value = (lhs - r->eval());
+    }
+
+    return var_value;
+  }
+};
+
+// Interface of every callable s3select function.
+class base_function
+{
+
+protected:
+  bool aggregate = false; // derived classes may raise it; reported by is_aggregate()
+
+public:
+  //TODO add semantic to base-function , it operate once on function creation
+  // validate semantic on creation instead on run-time
+
+  // runs the function on `args` and stores its outcome in `result`
+  virtual bool operator()(std::vector<base_statement*>* args, variable* result) = 0;
+
+  base_function() {}
+
+  bool is_aggregate()
+  {
+    return aggregate;
+  }
+
+  // does nothing in the base class
+  virtual void get_aggregate_result(variable*) {}
+
+  virtual ~base_function() {}
+};
+
+
+};//namespace
+
+#endif