diff options
Diffstat (limited to 'src/s3select/include')
-rw-r--r-- | src/s3select/include/s3select.h | 1138 | ||||
-rw-r--r-- | src/s3select/include/s3select_csv_parser.h | 407 | ||||
-rw-r--r-- | src/s3select/include/s3select_functions.h | 1037 | ||||
-rw-r--r-- | src/s3select/include/s3select_oper.h | 1320 |
4 files changed, 3902 insertions, 0 deletions
diff --git a/src/s3select/include/s3select.h b/src/s3select/include/s3select.h new file mode 100644 index 000000000..77af8c2d9 --- /dev/null +++ b/src/s3select/include/s3select.h @@ -0,0 +1,1138 @@ +#ifndef __S3SELECT__ +#define __S3SELECT__ + +#include <boost/spirit/include/classic_core.hpp> +#include <boost/algorithm/string.hpp> +#include <iostream> +#include <string> +#include <list> +#include "s3select_oper.h" +#include "s3select_functions.h" +#include "s3select_csv_parser.h" +#include <boost/function.hpp> +#include <boost/bind.hpp> +#include <functional> + + +#define _DEBUG_TERM {string token(a,b);std::cout << __FUNCTION__ << token << std::endl;} + + +namespace s3selectEngine +{ + +/// AST builder + +class s3select_projections +{ + +private: + std::vector<base_statement*> m_projections; + +public: + bool is_aggregate() + { + //TODO iterate on projections , and search for aggregate + //for(auto p : m_projections){} + + return false; + } + + bool semantic() + { + //TODO check aggragtion function are not nested + return false; + } + + std::vector<base_statement*>* get() + { + return &m_projections; + } + +}; + +struct actionQ +{ +// upon parser is accepting a token (lets say some number), +// it push it into dedicated queue, later those tokens are poped out to build some "higher" contruct (lets say 1 + 2) +// those containers are used only for parsing phase and not for runtime. + + std::vector<mulldiv_operation::muldiv_t> muldivQ; + std::vector<addsub_operation::addsub_op_t> addsubQ; + std::vector<arithmetic_operand::cmp_t> arithmetic_compareQ; + std::vector<logical_operand::oplog_t> logical_compareQ; + std::vector<base_statement*> exprQ; + std::vector<base_statement*> funcQ; + std::vector<base_statement*> condQ; + projection_alias alias_map; + std::string from_clause; + std::vector<std::string> schema_columns; + s3select_projections projections; + +}; + +class base_action : public __clt_allocator +{ + +public: + base_action() : m_action(0), m_s3select_functions(0) {} + actionQ* m_action; + void set_action_q(actionQ* a) + { + m_action = a; + } + void set_s3select_functions(s3select_functions* s3f) + { + m_s3select_functions = s3f; + } + s3select_functions* m_s3select_functions; +}; + +struct push_from_clause : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + m_action->from_clause = token; + } + +}; +static push_from_clause g_push_from_clause; + +struct push_number : public base_action //TODO use define for defintion of actions +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + variable* v = S3SELECT_NEW( variable, atoi(token.c_str())); + + + m_action->exprQ.push_back(v); + } + +}; +static push_number g_push_number; + +struct push_float_number : public base_action //TODO use define for defintion of actions +{ + + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + //the parser for float(real_p) is accepting also integers, thus "blocking" integer acceptence and all are float. + bsc::parse_info<> info = bsc::parse(token.c_str(), bsc::int_p, bsc::space_p); + + if (!info.full) + { + char* perr; + double d = strtod(token.c_str(), &perr); + variable* v = S3SELECT_NEW( variable, d); + + m_action->exprQ.push_back(v); + } + else + { + variable* v = S3SELECT_NEW(variable, atoi(token.c_str())); + + m_action->exprQ.push_back(v); + } + } + +}; +static push_float_number g_push_float_number; + +struct push_string : public base_action //TODO use define for defintion of actions +{ + + void operator()(const char* a, const char* b) const + { + a++; + b--;// remove double quotes + std::string token(a, b); + + variable* v = S3SELECT_NEW(variable, token, variable::var_t::COL_VALUE ); + + m_action->exprQ.push_back(v); + } + +}; +static push_string g_push_string; + +struct push_variable : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + variable* v = S3SELECT_NEW(variable, token); + + m_action->exprQ.push_back(v); + } +}; +static push_variable g_push_variable; + +/////////////////////////arithmetic unit ///////////////// +struct push_addsub : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + if (token.compare("+") == 0) + { + m_action->addsubQ.push_back(addsub_operation::addsub_op_t::ADD); + } + else + { + m_action->addsubQ.push_back(addsub_operation::addsub_op_t::SUB); + } + } +}; +static push_addsub g_push_addsub; + +struct push_mulop : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + if (token.compare("*") == 0) + { + m_action->muldivQ.push_back(mulldiv_operation::muldiv_t::MULL); + } + else if (token.compare("/") == 0) + { + m_action->muldivQ.push_back(mulldiv_operation::muldiv_t::DIV); + } + else + { + m_action->muldivQ.push_back(mulldiv_operation::muldiv_t::POW); + } + } +}; +static push_mulop g_push_mulop; + +struct push_addsub_binop : public base_action +{ + void operator()(const char* a, const char* b) const + { + base_statement* l = 0, *r = 0; + + r = m_action->exprQ.back(); + m_action->exprQ.pop_back(); + l = m_action->exprQ.back(); + m_action->exprQ.pop_back(); + addsub_operation::addsub_op_t o = m_action->addsubQ.back(); + m_action->addsubQ.pop_back(); + addsub_operation* as = S3SELECT_NEW(addsub_operation, l, o, r); + m_action->exprQ.push_back(as); + } +}; +static push_addsub_binop g_push_addsub_binop; + +struct push_mulldiv_binop : public base_action +{ + void operator()(const char* a, const char* b) const + { + base_statement* vl = 0, *vr = 0; + + vr = m_action->exprQ.back(); + m_action->exprQ.pop_back(); + vl = m_action->exprQ.back(); + m_action->exprQ.pop_back(); + mulldiv_operation::muldiv_t o = m_action->muldivQ.back(); + m_action->muldivQ.pop_back(); + mulldiv_operation* f = S3SELECT_NEW(mulldiv_operation, vl, o, vr); + m_action->exprQ.push_back(f); + } +}; +static push_mulldiv_binop g_push_mulldiv_binop; + +struct push_function_arg : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + base_statement* be = m_action->exprQ.back(); + m_action->exprQ.pop_back(); + base_statement* f = m_action->funcQ.back(); + + if (dynamic_cast<__function*>(f)) + { + dynamic_cast<__function*>(f)->push_argument(be); + } + } +}; +static push_function_arg g_push_function_arg; + +struct push_function_name : public base_action +{ + void operator()(const char* a, const char* b) const + { + b--; + while(*b=='(' || *b == ' ') + { + b--; //point to function-name + } + + std::string fn; + fn.assign(a, b-a+1); + + __function* func = S3SELECT_NEW(__function, fn.c_str(), m_s3select_functions); + m_action->funcQ.push_back(func); + } +}; +static push_function_name g_push_function_name; + +struct push_function_expr : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + base_statement* func = m_action->funcQ.back(); + m_action->funcQ.pop_back(); + + m_action->exprQ.push_back(func); + } +}; +static push_function_expr g_push_function_expr; + +////////////////////// logical unit //////////////////////// + +struct push_compare_operator : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + arithmetic_operand::cmp_t c = arithmetic_operand::cmp_t::NA; + + if (token.compare("==") == 0) + { + c = arithmetic_operand::cmp_t::EQ; + } + else if (token.compare("!=") == 0) + { + c = arithmetic_operand::cmp_t::NE; + } + else if (token.compare(">=") == 0) + { + c = arithmetic_operand::cmp_t::GE; + } + else if (token.compare("<=") == 0) + { + c = arithmetic_operand::cmp_t::LE; + } + else if (token.compare(">") == 0) + { + c = arithmetic_operand::cmp_t::GT; + } + else if (token.compare("<") == 0) + { + c = arithmetic_operand::cmp_t::LT; + } + else + { + c = arithmetic_operand::cmp_t::NA; + } + + m_action->arithmetic_compareQ.push_back(c); + } +}; +static push_compare_operator g_push_compare_operator; + +struct push_logical_operator : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + logical_operand::oplog_t l = logical_operand::oplog_t::NA; + + if (token.compare("and") == 0) + { + l = logical_operand::oplog_t::AND; + } + else if (token.compare("or") == 0) + { + l = logical_operand::oplog_t::OR; + } + else + { + l = logical_operand::oplog_t::NA; + } + + m_action->logical_compareQ.push_back(l); + + } +}; +static push_logical_operator g_push_logical_operator; + +struct push_arithmetic_predicate : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + base_statement* vr, *vl; + arithmetic_operand::cmp_t c = m_action->arithmetic_compareQ.back(); + m_action->arithmetic_compareQ.pop_back(); + vr = m_action->exprQ.back(); + m_action->exprQ.pop_back(); + vl = m_action->exprQ.back(); + m_action->exprQ.pop_back(); + + arithmetic_operand* t = S3SELECT_NEW(arithmetic_operand, vl, c, vr); + + m_action->condQ.push_back(t); + } +}; +static push_arithmetic_predicate g_push_arithmetic_predicate; + +struct push_logical_predicate : public base_action +{ + + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + base_statement* tl = 0, *tr = 0; + logical_operand::oplog_t oplog = m_action->logical_compareQ.back(); + m_action->logical_compareQ.pop_back(); + + if (m_action->condQ.empty() == false) + { + tr = m_action->condQ.back(); + m_action->condQ.pop_back(); + } + if (m_action->condQ.empty() == false) + { + tl = m_action->condQ.back(); + m_action->condQ.pop_back(); + } + + logical_operand* f = S3SELECT_NEW(logical_operand, tl, oplog, tr); + + m_action->condQ.push_back(f); + } +}; +static push_logical_predicate g_push_logical_predicate; + +struct push_column_pos : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + variable* v; + + if (token.compare("*") == 0 || token.compare("* ")==0) //TODO space should skip in boost::spirit + { + v = S3SELECT_NEW(variable, token, variable::var_t::STAR_OPERATION); + } + else + { + v = S3SELECT_NEW(variable, token, variable::var_t::POS); + } + + m_action->exprQ.push_back(v); + } + +}; +static push_column_pos g_push_column_pos; + +struct push_projection : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + m_action->projections.get()->push_back( m_action->exprQ.back() ); + m_action->exprQ.pop_back(); + } + +}; +static push_projection g_push_projection; + +struct push_alias_projection : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + //extract alias name + const char* p=b; + while(*(--p) != ' '); + std::string alias_name(p+1, b); + base_statement* bs = m_action->exprQ.back(); + + //mapping alias name to base-statement + bool res = m_action->alias_map.insert_new_entry(alias_name, bs); + if (res==false) + { + throw base_s3select_exception(std::string("alias <")+alias_name+std::string("> is already been used in query"), base_s3select_exception::s3select_exp_en_t::FATAL); + } + + + m_action->projections.get()->push_back( bs ); + m_action->exprQ.pop_back(); + } + +}; +static push_alias_projection g_push_alias_projection; + +/// for the schema description "mini-parser" +struct push_column : public base_action +{ + + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + m_action->schema_columns.push_back(token); + } + +}; +static push_column g_push_column; + +struct push_debug_1 : public base_action +{ + + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + } + +}; +static push_debug_1 g_push_debug_1; + +struct s3select : public bsc::grammar<s3select> +{ +private: + + actionQ m_actionQ; + + scratch_area m_sca; + + s3select_functions m_s3select_functions; + + std::string error_description; + + s3select_allocator m_s3select_allocator; + + bool aggr_flow; + +#define BOOST_BIND_ACTION( push_name ) boost::bind( &push_name::operator(), g_ ## push_name , _1 ,_2) +#define ATTACH_ACTION_Q( push_name ) {(g_ ## push_name).set_action_q(&m_actionQ); (g_ ## push_name).set_s3select_functions(&m_s3select_functions); (g_ ## push_name).set(&m_s3select_allocator);} + +public: + + + int semantic() + { + for (auto e : get_projections_list()) + { + base_statement* aggr = 0; + + if ((aggr = e->get_aggregate()) != 0) + { + if (aggr->is_nested_aggregate(aggr)) + { + error_description = "nested aggregation function is illegal i.e. sum(...sum ...)"; + throw base_s3select_exception(error_description, base_s3select_exception::s3select_exp_en_t::FATAL); + } + + aggr_flow = true; + } + } + + if (aggr_flow == true) + for (auto e : get_projections_list()) + { + base_statement* skip_expr = e->get_aggregate(); + + if (e->is_binop_aggregate_and_column(skip_expr)) + { + error_description = "illegal expression. /select sum(c1) + c1 ..../ is not allow type of query"; + throw base_s3select_exception(error_description, base_s3select_exception::s3select_exp_en_t::FATAL); + } + } + + return 0; + } + + int parse_query(const char* input_query) + { + if(get_projections_list().empty() == false) + { + return 0; //already parsed + } + + try + { + bsc::parse_info<> info = bsc::parse(input_query, *this, bsc::space_p); + auto query_parse_position = info.stop; + + if (!info.full) + { + std::cout << "failure -->" << query_parse_position << "<---" << std::endl; + error_description = std::string("failure -->") + query_parse_position + std::string("<---"); + return -1; + } + + semantic(); + } + catch (base_s3select_exception& e) + { + std::cout << e.what() << std::endl; + error_description.assign(e.what()); + if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution + { + return -1; + } + } + + return 0; + } + + std::string get_error_description() + { + return error_description; + } + + s3select() + { + //TODO check option for defining action and push into list + + ATTACH_ACTION_Q(push_from_clause); + ATTACH_ACTION_Q(push_number); + ATTACH_ACTION_Q(push_logical_operator); + ATTACH_ACTION_Q(push_logical_predicate); + ATTACH_ACTION_Q(push_compare_operator); + ATTACH_ACTION_Q(push_arithmetic_predicate); + ATTACH_ACTION_Q(push_addsub); + ATTACH_ACTION_Q(push_addsub_binop); + ATTACH_ACTION_Q(push_mulop); + ATTACH_ACTION_Q(push_mulldiv_binop); + ATTACH_ACTION_Q(push_function_arg); + ATTACH_ACTION_Q(push_function_name); + ATTACH_ACTION_Q(push_function_expr); + ATTACH_ACTION_Q(push_float_number); + ATTACH_ACTION_Q(push_string); + ATTACH_ACTION_Q(push_variable); + ATTACH_ACTION_Q(push_column_pos); + ATTACH_ACTION_Q(push_projection); + ATTACH_ACTION_Q(push_alias_projection); + ATTACH_ACTION_Q(push_debug_1); + + error_description.clear(); + + m_s3select_functions.set(&m_s3select_allocator); + + aggr_flow = false; + } + + bool is_semantic()//TBD traverse and validate semantics per all nodes + { + base_statement* cond = m_actionQ.exprQ.back(); + + return cond->semantic(); + } + + std::string get_from_clause() + { + return m_actionQ.from_clause; + } + + void load_schema(std::vector< std::string>& scm) + { + int i = 0; + for (auto c : scm) + { + m_sca.set_column_pos(c.c_str(), i++); + } + } + + base_statement* get_filter() + { + if(m_actionQ.condQ.size()==0) + { + return NULL; + } + + return m_actionQ.condQ.back(); + } + + std::vector<base_statement*> get_projections_list() + { + return *m_actionQ.projections.get(); //TODO return COPY(?) or to return evalaution results (list of class value{}) / return reference(?) + } + + scratch_area* get_scratch_area() + { + return &m_sca; + } + + projection_alias* get_aliases() + { + return &m_actionQ.alias_map; + } + + bool is_aggregate_query() + { + return aggr_flow == true; + } + + ~s3select() {} + + + template <typename ScannerT> + struct definition + { + definition(s3select const& ) + { + ///// s3select syntax rules and actions for building AST + + select_expr = bsc::str_p("select") >> projections >> bsc::str_p("from") >> (s3_object)[BOOST_BIND_ACTION(push_from_clause)] >> !where_clause >> ';'; + + projections = projection_expression >> *( ',' >> projection_expression) ; + + projection_expression = (arithmetic_expression >> bsc::str_p("as") >> alias_name)[BOOST_BIND_ACTION(push_alias_projection)] | (arithmetic_expression)[BOOST_BIND_ACTION(push_projection)] ; + + alias_name = bsc::lexeme_d[(+bsc::alpha_p >> *bsc::digit_p)] ; + + + s3_object = bsc::str_p("stdin") | object_path ; + + object_path = "/" >> *( fs_type >> "/") >> fs_type; + + fs_type = bsc::lexeme_d[+( bsc::alnum_p | bsc::str_p(".") | bsc::str_p("_")) ]; + + where_clause = bsc::str_p("where") >> condition_expression; + + condition_expression = (arithmetic_predicate >> *(log_op[BOOST_BIND_ACTION(push_logical_operator)] >> arithmetic_predicate[BOOST_BIND_ACTION(push_logical_predicate)])); + + arithmetic_predicate = (factor >> *(arith_cmp[BOOST_BIND_ACTION(push_compare_operator)] >> factor[BOOST_BIND_ACTION(push_arithmetic_predicate)])); + + factor = (arithmetic_expression) | ('(' >> condition_expression >> ')') ; + + arithmetic_expression = (addsub_operand >> *(addsubop_operator[BOOST_BIND_ACTION(push_addsub)] >> addsub_operand[BOOST_BIND_ACTION(push_addsub_binop)] )); + + addsub_operand = (mulldiv_operand >> *(muldiv_operator[BOOST_BIND_ACTION(push_mulop)] >> mulldiv_operand[BOOST_BIND_ACTION(push_mulldiv_binop)] ));// this non-terminal gives precedense to mull/div + + mulldiv_operand = arithmetic_argument | ('(' >> (arithmetic_expression) >> ')') ; + + list_of_function_arguments = (arithmetic_expression)[BOOST_BIND_ACTION(push_function_arg)] >> *(',' >> (arithmetic_expression)[BOOST_BIND_ACTION(push_function_arg)]); + function = ((variable >> '(' )[BOOST_BIND_ACTION(push_function_name)] >> !list_of_function_arguments >> ')')[BOOST_BIND_ACTION(push_function_expr)]; + + arithmetic_argument = (float_number)[BOOST_BIND_ACTION(push_float_number)] | (number)[BOOST_BIND_ACTION(push_number)] | (column_pos)[BOOST_BIND_ACTION(push_column_pos)] | + (string)[BOOST_BIND_ACTION(push_string)] | + (function)[BOOST_BIND_ACTION(push_debug_1)] | (variable)[BOOST_BIND_ACTION(push_variable)] ;//function is pushed by right-term + + + number = bsc::int_p; + + float_number = bsc::real_p; + + string = bsc::str_p("\"") >> *( bsc::anychar_p - bsc::str_p("\"") ) >> bsc::str_p("\"") ; + + column_pos = ('_'>>+(bsc::digit_p) ) | '*' ; + + muldiv_operator = bsc::str_p("*") | bsc::str_p("/") | bsc::str_p("^");// got precedense + + addsubop_operator = bsc::str_p("+") | bsc::str_p("-"); + + + arith_cmp = bsc::str_p(">=") | bsc::str_p("<=") | bsc::str_p("==") | bsc::str_p("<") | bsc::str_p(">") | bsc::str_p("!="); + + log_op = bsc::str_p("and") | bsc::str_p("or"); //TODO add NOT (unary) + + variable = bsc::lexeme_d[(+bsc::alpha_p >> *bsc::digit_p)]; + } + + + bsc::rule<ScannerT> variable, select_expr, s3_object, where_clause, number, float_number, string, arith_cmp, log_op, condition_expression, arithmetic_predicate, factor; + bsc::rule<ScannerT> muldiv_operator, addsubop_operator, function, arithmetic_expression, addsub_operand, list_of_function_arguments, arithmetic_argument, mulldiv_operand; + bsc::rule<ScannerT> fs_type, object_path; + bsc::rule<ScannerT> projections, projection_expression, alias_name, column_pos; + bsc::rule<ScannerT> const& start() const + { + return select_expr; + } + }; +}; + +/////// handling different object types +class base_s3object +{ + +protected: + scratch_area* m_sa; + std::string m_obj_name; + +public: + base_s3object(scratch_area* m) : m_sa(m), m_obj_name("") {} + + void set(scratch_area* m) + { + m_sa = m; + m_obj_name = ""; + } + + virtual ~base_s3object() {} +}; + + +class csv_object : public base_s3object +{ + +public: + struct csv_defintions + { + char row_delimiter; + char column_delimiter; + char escape_char; + char quot_char; + bool use_header_info; + bool ignore_header_info;//skip first line + + csv_defintions():row_delimiter('\n'), column_delimiter(','), escape_char('\\'), quot_char('"'), use_header_info(false), ignore_header_info(false) {} + + } m_csv_defintion; + + csv_object(s3select* s3_query) : + base_s3object(s3_query->get_scratch_area()), + m_skip_last_line(false), + m_s3_select(0), + m_error_count(0), + m_extract_csv_header_info(false), + m_previous_line(false), + m_skip_first_line(false), + m_processed_bytes(0) + { + set(s3_query); + csv_parser.set(m_csv_defintion.row_delimiter, m_csv_defintion.column_delimiter, m_csv_defintion.quot_char, m_csv_defintion.escape_char); + } + + csv_object(s3select* s3_query, struct csv_defintions csv) : + base_s3object(s3_query->get_scratch_area()), + m_skip_last_line(false), + m_s3_select(0), + m_error_count(0), + m_extract_csv_header_info(false), + m_previous_line(false), + m_skip_first_line(false), + m_processed_bytes(0) + { + set(s3_query); + m_csv_defintion = csv; + csv_parser.set(m_csv_defintion.row_delimiter, m_csv_defintion.column_delimiter, m_csv_defintion.quot_char, m_csv_defintion.escape_char); + } + + csv_object(): + base_s3object(0), + m_skip_last_line(false), + m_s3_select(0), + m_error_count(0), + m_extract_csv_header_info(false), + m_previous_line(false), + m_skip_first_line(false), + m_processed_bytes(0) + { + csv_parser.set(m_csv_defintion.row_delimiter, m_csv_defintion.column_delimiter, m_csv_defintion.quot_char, m_csv_defintion.escape_char); + } + +private: + base_statement* m_where_clause; + std::vector<base_statement*> m_projections; + bool m_aggr_flow = false; //TODO once per query + bool m_is_to_aggregate; + bool m_skip_last_line; + size_t m_stream_length; + std::string m_error_description; + char* m_stream; + char* m_end_stream; + std::vector<char*> m_row_tokens{128}; + s3select* m_s3_select; + csvParser csv_parser; + size_t m_error_count; + bool m_extract_csv_header_info; + std::vector<std::string> m_csv_schema{128}; + + //handling arbitrary chunks (rows cut in the middle) + bool m_previous_line; + bool m_skip_first_line; + std::string merge_line; + std::string m_last_line; + size_t m_processed_bytes; + + int getNextRow() + { + size_t num_of_tokens=0; + + if(m_stream>=m_end_stream) + { + return -1; + } + + if(csv_parser.parse(m_stream, m_end_stream, &m_row_tokens, &num_of_tokens)<0) + { + throw base_s3select_exception("failed to parse csv stream", base_s3select_exception::s3select_exp_en_t::FATAL); + } + + m_stream = (char*)csv_parser.currentLoc(); + + if (m_skip_last_line && m_stream >= m_end_stream) + { + return -1; + } + + return num_of_tokens; + + } + +public: + + void set(s3select* s3_query) + { + m_s3_select = s3_query; + base_s3object::set(m_s3_select->get_scratch_area()); + + m_projections = m_s3_select->get_projections_list(); + m_where_clause = m_s3_select->get_filter(); + + if (m_where_clause) + { + m_where_clause->traverse_and_apply(m_sa, m_s3_select->get_aliases()); + } + + for (auto p : m_projections) + { + p->traverse_and_apply(m_sa, m_s3_select->get_aliases()); + } + + m_aggr_flow = m_s3_select->is_aggregate_query(); + } + + + std::string get_error_description() + { + return m_error_description; + } + + virtual ~csv_object() {} + +public: + + + int getMatchRow( std::string& result) //TODO virtual ? getResult + { + int number_of_tokens = 0; + + + if (m_aggr_flow == true) + { + do + { + + number_of_tokens = getNextRow(); + if (number_of_tokens < 0) //end of stream + { + if (m_is_to_aggregate) + for (auto i : m_projections) + { + i->set_last_call(); + result.append( i->eval().to_string() ); + result.append(","); + } + + return number_of_tokens; + } + + if ((*m_projections.begin())->is_set_last_call()) + { + //should validate while query execution , no update upon nodes are marked with set_last_call + throw base_s3select_exception("on aggregation query , can not stream row data post do-aggregate call", base_s3select_exception::s3select_exp_en_t::FATAL); + } + + m_sa->update(m_row_tokens, number_of_tokens); + for (auto a : *m_s3_select->get_aliases()->get()) + { + a.second->invalidate_cache_result(); + } + + if (!m_where_clause || m_where_clause->eval().i64() == true) + for (auto i : m_projections) + { + i->eval(); + } + + } + while (1); + } + else + { + + do + { + + number_of_tokens = getNextRow(); + if (number_of_tokens < 0) + { + return number_of_tokens; + } + + m_sa->update(m_row_tokens, number_of_tokens); + for (auto a : *m_s3_select->get_aliases()->get()) + { + a.second->invalidate_cache_result(); + } + + } + while (m_where_clause && m_where_clause->eval().i64() == false); + + for (auto i : m_projections) + { + result.append( i->eval().to_string() ); + result.append(","); + } + result.append("\n"); + } + + return number_of_tokens; //TODO wrong + } + + int extract_csv_header_info() + { + + if (m_csv_defintion.ignore_header_info == true) + { + while(*m_stream && (*m_stream != m_csv_defintion.row_delimiter )) + { + m_stream++; + } + m_stream++; + } + else if(m_csv_defintion.use_header_info == true) + { + size_t num_of_tokens = getNextRow();//TODO validate number of tokens + + for(size_t i=0; i<num_of_tokens; i++) + { + m_csv_schema[i].assign(m_row_tokens[i]); + } + + m_s3_select->load_schema(m_csv_schema); + } + + m_extract_csv_header_info = true; + + return 0; + } + + int run_s3select_on_stream(std::string& result, const char* csv_stream, size_t stream_length, size_t obj_size) + { + //purpose: the cv data is "streaming", it may "cut" rows in the middle, in that case the "broken-line" is stores + //for later, upon next chunk of data is streaming, the stored-line is merge with current broken-line, and processed. + int status; + std::string tmp_buff; + u_int32_t skip_last_bytes = 0; + m_processed_bytes += stream_length; + + m_skip_first_line = false; + + if (m_previous_line) + { + //if previous broken line exist , merge it to current chunk + char* p_obj_chunk = (char*)csv_stream; + while (*p_obj_chunk != m_csv_defintion.row_delimiter && p_obj_chunk<(csv_stream+stream_length)) + { + p_obj_chunk++; + } + + tmp_buff.assign((char*)csv_stream, (char*)csv_stream + (p_obj_chunk - csv_stream)); + merge_line = m_last_line + tmp_buff + m_csv_defintion.row_delimiter; + m_previous_line = false; + m_skip_first_line = true; + + status = run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false); + } + + if (csv_stream[stream_length - 1] != m_csv_defintion.row_delimiter) + { + //in case of "broken" last line + char* p_obj_chunk = (char*)&(csv_stream[stream_length - 1]); + while (*p_obj_chunk != m_csv_defintion.row_delimiter && p_obj_chunk>csv_stream) + { + p_obj_chunk--; //scan until end-of previous line in chunk + } + + skip_last_bytes = (&(csv_stream[stream_length - 1]) - p_obj_chunk); + m_last_line.assign(p_obj_chunk + 1, p_obj_chunk + 1 + skip_last_bytes); //save it for next chunk + + m_previous_line = true;//it means to skip last line + + } + + status = run_s3select_on_object(result, csv_stream, stream_length, m_skip_first_line, m_previous_line, (m_processed_bytes >= obj_size)); + + return status; + } + + int run_s3select_on_object(std::string& result, const char* csv_stream, size_t stream_length, bool skip_first_line, bool skip_last_line, bool do_aggregate) + { + + + m_stream = (char*)csv_stream; + m_end_stream = (char*)csv_stream + stream_length; + m_is_to_aggregate = do_aggregate; + m_skip_last_line = skip_last_line; + + m_stream_length = stream_length; + + if(m_extract_csv_header_info == false) + { + extract_csv_header_info(); + } + + if(skip_first_line) + { + while(*m_stream && (*m_stream != m_csv_defintion.row_delimiter )) + { + m_stream++; + } + m_stream++;//TODO nicer + } + + do + { + + int num = 0; + try + { + num = getMatchRow(result); + } + catch (base_s3select_exception& e) + { + std::cout << e.what() << std::endl; + m_error_description = e.what(); + m_error_count ++; + if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL || m_error_count>100)//abort query execution + { + return -1; + } + } + + if (num < 0) + { + break; + } + + } + while (1); + + return 0; + } +}; + +};//namespace + +#endif diff --git a/src/s3select/include/s3select_csv_parser.h b/src/s3select/include/s3select_csv_parser.h new file mode 100644 index 000000000..5527da913 --- /dev/null +++ b/src/s3select/include/s3select_csv_parser.h @@ -0,0 +1,407 @@ +#include <iostream> + +#include <boost/mpl/vector/vector30.hpp> +// back-end +#include <boost/msm/back/state_machine.hpp> +//front-end +#include <boost/msm/front/state_machine_def.hpp> + +#include <vector> + +namespace msm = boost::msm; +namespace mpl = boost::mpl; + +namespace s3selectEngine +{ +// events +struct event_column_sep {}; +struct event_eol {}; +struct event_end_of_stream {}; +struct event_not_column_sep {};//i.e any char +struct event_quote {}; +struct event_escape {}; +struct event_empty {}; + + +// front-end: define the FSM structure +struct csvStateMch_ : public msm::front::state_machine_def<csvStateMch_> +{ + char* input_stream; + std::vector<char*>* tokens; + std::vector<int> has_esc{128}; + size_t token_idx; + size_t escape_idx; + char* input_cur_location; + char* start_token; + bool end_of_parse; + + typedef csvStateMch_ csv_rules; + + csvStateMch_():end_of_parse(false) {} + + void set(const char* input, std::vector<char*>* tk) + { + input_cur_location = input_stream = const_cast<char*>(input); + token_idx = 0; + tokens = tk; + escape_idx = 0; + } + + char get_char() + { + return *input_cur_location; + } + + char get_next_char(const char * end_stream) + { + if (input_cur_location >= end_stream) + return 0; + + input_cur_location++; + return *input_cur_location; + } + + const char* currentLoc() + { + return input_cur_location; + } + + void parse_escape(char* in, char esc_char='\\') + { + //assumption atleast one escape and single + char* dst, *src; + + dst = src = in; + + while (1) + { + while (*src && *src != esc_char) + { + src++; //search for escape + } + + if (!*src) //reach end + { + char* p = src; + while (dst < src) + { + *dst++ = *p++; //full copy + } + return; + } + //found escape + dst = src; //override escape + //if(*(dst+1)=='n') {*dst=10;dst++;} //enables special character + + while (*dst) + { + *dst = *(dst + 1); + dst++; + } //copy with shift + } + } + + // The list of FSM states + struct Start_new_token_st : public msm::front::state<> + {};//0 + + struct In_new_token_st : public msm::front::state<> + {};//1 + + struct In_quote_st : public msm::front::state<> + {};//2 + + struct In_esc_in_token_st : public msm::front::state<> + {};//3 + + struct In_esc_quote_st : public msm::front::state<> + {};//4 + + struct In_esc_start_token_st : public msm::front::state<> + {};//5 + + struct End_of_line_st : public msm::front::state<> + {};//6 + + struct Empty_state : public msm::front::state<> + {};//7 + + + // the initial state of the csvStateMch SM. Must be defined + typedef Start_new_token_st initial_state; + + void start_new_token()//helper + { + start_token = input_cur_location; + (*tokens)[ token_idx ] = start_token; + token_idx++; + } + + // transition actions + void start_new_token(event_column_sep const&) + { + *input_cur_location = 0;//remove column-delimiter + start_new_token(); + } + + void start_new_token(event_not_column_sep const&) + { + start_new_token(); + } + + //need to handle empty lines(no tokens); + void start_new_token(event_eol const&) + { + if(!token_idx) + { + return; + } + (*tokens)[ token_idx ] = start_token; + token_idx++; + } + + void start_new_token(event_end_of_stream const&) {} + + void in_new_token(event_not_column_sep const&) + { + if(!*start_token) + { + start_token = input_cur_location; + } + } + + void in_new_token(event_eol const&) + { + *input_cur_location=0; + } + + void in_new_token(event_end_of_stream const&) {} + + void in_new_token(event_column_sep const&) + { + (*tokens)[ token_idx ] = input_cur_location+1; + *input_cur_location=0; + token_idx++; + } + + void in_new_token(event_quote const&) + { + if(!*start_token) + { + start_token = input_cur_location; + } + } + + void in_quote(event_quote const&) {} + + void in_quote(event_column_sep const&) {} + + void in_quote(event_not_column_sep const&) {} + + void in_quote(event_eol const&) + { + *input_cur_location=0; + } + + void in_quote(event_end_of_stream const&) + { + *input_cur_location=0; + } + + void start_new_token(event_quote const&) + { + start_new_token(); + } + + void push_escape_pos() + { + if(escape_idx && has_esc[ escape_idx -1]== (int)(token_idx-1)) + { + return; + } + has_esc[ escape_idx ] = token_idx-1; + escape_idx++; + } + void in_escape(event_escape const&) + { + push_escape_pos(); + } + void in_escape_start_new_token(event_escape const&) + { + start_new_token(); + push_escape_pos(); + } + + void in_escape(event_column_sep const&) {} + void in_escape(event_not_column_sep const&) {} + void in_escape(event_quote const&) {} + void in_escape(event_eol const&) {} + void in_escape(event_end_of_stream const&) {} + + void empty_action(event_empty const&) {} + + //TODO need a guard for tokens vector size (<MAX) + // Transition table for csvStateMch + struct transition_table : mpl::vector30< + // Start Event Next Action Guard + // +---------+-------------+---------+---------------------+----------------------+ + a_row < Start_new_token_st, event_column_sep , Start_new_token_st , &csv_rules::start_new_token >, + a_row < Start_new_token_st, event_not_column_sep , In_new_token_st , &csv_rules::start_new_token >, + a_row < Start_new_token_st, event_eol , End_of_line_st , &csv_rules::start_new_token >, + a_row < Start_new_token_st, event_end_of_stream , End_of_line_st , &csv_rules::start_new_token >, + a_row < In_new_token_st , event_not_column_sep, In_new_token_st, &csv_rules::in_new_token >, + a_row < In_new_token_st , event_column_sep , In_new_token_st, &csv_rules::in_new_token >, + a_row < In_new_token_st , event_eol , End_of_line_st, &csv_rules::in_new_token >, + a_row < In_new_token_st , event_end_of_stream , End_of_line_st, &csv_rules::in_new_token >, + + a_row < Start_new_token_st , event_quote , In_quote_st, &csv_rules::start_new_token >, //open quote + a_row < In_new_token_st , event_quote , In_quote_st, &csv_rules::in_quote >, //open quote + a_row < In_quote_st , event_quote , In_new_token_st, &csv_rules::in_quote >, //close quote + a_row < In_quote_st , event_column_sep , In_quote_st, &csv_rules::in_quote >, //stay in quote + a_row < In_quote_st , event_not_column_sep , In_quote_st, &csv_rules::in_quote >, //stay in quote + a_row < In_quote_st , event_eol , End_of_line_st, &csv_rules::in_quote >, //end of quote/line + a_row < In_quote_st , event_end_of_stream , End_of_line_st, &csv_rules::in_quote >, //end of quote/line + + + //TODO add transitions for escape just before eol , eos. + a_row < Start_new_token_st , event_escape , In_esc_start_token_st, &csv_rules::in_escape_start_new_token >, + a_row < In_esc_start_token_st, event_column_sep, In_new_token_st, &csv_rules::in_escape >, //escape column-sep + a_row < In_esc_start_token_st, event_not_column_sep, In_new_token_st, &csv_rules::in_escape >, + a_row < In_esc_start_token_st, event_escape, In_new_token_st, &csv_rules::in_escape >, + a_row < In_esc_start_token_st, event_quote, In_new_token_st, &csv_rules::in_escape >, + + a_row < In_new_token_st, event_escape, In_esc_in_token_st, &csv_rules::in_escape >, + a_row < In_esc_in_token_st, event_column_sep, In_new_token_st, &csv_rules::in_escape >, + a_row < In_esc_in_token_st, event_not_column_sep, In_new_token_st, &csv_rules::in_escape >, + a_row < In_esc_in_token_st, event_escape, In_new_token_st, &csv_rules::in_escape >, + a_row < In_esc_in_token_st, event_quote, In_new_token_st, &csv_rules::in_escape >, + + a_row < In_quote_st, event_escape, In_esc_quote_st, &csv_rules::in_escape >, + a_row < In_esc_quote_st, event_column_sep, In_quote_st, &csv_rules::in_escape >, + a_row < In_esc_quote_st, event_not_column_sep, In_quote_st, &csv_rules::in_escape >, + a_row < In_esc_quote_st, event_escape, In_quote_st, &csv_rules::in_escape >, + a_row < In_esc_quote_st, event_quote, In_quote_st, &csv_rules::in_escape > + + // +---------+-------------+---------+---------------------+----------------------+ + > {}; + + // Replaces the default no-transition response. + template <class FSM, class Event> + void no_transition(Event const& e, FSM&, int state) + { + std::cout << "no transition from state " << state + << " on event " << typeid(e).name() << std::endl; + } +}; //// end-of-state-machine + + + +// Pick a back-end +typedef msm::back::state_machine<csvStateMch_> csvStateMch; + +// +// Testing utilities. +// + +static char const* const state_names[] = {"Start_new_token_st", "In_new_token_st", "In_quote_st", "In_esc_in_token_st", + "In_esc_quote_st", "In_esc_start_token_st", "End_of_line_st", "Empty_state" + }; +void pstate(csvStateMch const& p)//debug +{ + std::cout << " -> " << state_names[p.current_state()[0]] << std::endl; +} + + +class csvParser +{ + + csvStateMch p; + + char m_row_delimeter; + char m_column_delimiter; + char m_quote_char; + char m_escape_char; + +public: + + csvParser(char rd='\n', char cd=',', char quot='"', char ec='\\'):m_row_delimeter(rd), m_column_delimiter(cd), m_quote_char(quot), m_escape_char(ec) {}; + + void set(char row_delimiter, char column_delimiter, char quot_char, char escape_char) + { + m_row_delimeter = row_delimiter; + m_column_delimiter = column_delimiter; + m_quote_char = quot_char; + m_escape_char = escape_char; + } + + int parse(char* input_stream, char* end_stream, std::vector<char*>* tk, size_t* num_of_tokens) + { + p.set(input_stream, tk); + + // needed to start the highest-level SM. This will call on_entry and mark the start of the SM + p.start(); + + //TODO for better performance to use template specialization (\n \ , ") + do + { + if (p.get_char() == m_row_delimeter) + { + p.process_event(event_eol()); + } + else if (p.get_char() == m_column_delimiter) + { + p.process_event(event_column_sep()); + } + else if (p.get_char() == 0) + { + p.process_event(event_end_of_stream()); + } + else if (p.get_char() == m_quote_char) + { + p.process_event(event_quote()); + } + else if (p.get_char() == m_escape_char) + { + p.process_event(event_escape()); + } + else + { + p.process_event(event_not_column_sep()); + } + + if (p.tokens->capacity() <= p.token_idx) + { + return -1; + } + + if (p.currentLoc() >= end_stream) + { + break; + } + p.get_next_char(end_stream); + } + while (p.current_state()[0] != 6); + + p.stop(); + + *num_of_tokens = p.token_idx; + + //second pass for escape rules; only token with escape are processed, if any. + for(size_t i=0; i<p.escape_idx; i++) + { + p.parse_escape((*tk)[p.has_esc[i]], m_escape_char); + } + + return 0; + } + + const char* currentLoc() + { + return p.currentLoc(); + } + +};//end csv-parser + +}//end-namespace + + diff --git a/src/s3select/include/s3select_functions.h b/src/s3select/include/s3select_functions.h new file mode 100644 index 000000000..77b3db3a7 --- /dev/null +++ b/src/s3select/include/s3select_functions.h @@ -0,0 +1,1037 @@ +#ifndef __S3SELECT_FUNCTIONS__ +#define __S3SELECT_FUNCTIONS__ + + +#include "s3select_oper.h" +#define BOOST_BIND_ACTION_PARAM( push_name ,param ) boost::bind( &push_name::operator(), g_ ## push_name , _1 ,_2, param) +namespace s3selectEngine +{ + +struct push_2dig +{ + void operator()(const char* a, const char* b, uint32_t* n) const + { + *n = ((char)(*a) - 48) *10 + ((char)*(a+1)-48) ; + } + +}; +static push_2dig g_push_2dig; + +struct push_4dig +{ + void operator()(const char* a, const char* b, uint32_t* n) const + { + *n = ((char)(*a) - 48) *1000 + ((char)*(a+1)-48)*100 + ((char)*(a+2)-48)*10 + ((char)*(a+3)-48); + } + +}; +static push_4dig g_push_4dig; + +enum class s3select_func_En_t {ADD, + SUM, + MIN, + MAX, + COUNT, + TO_INT, + TO_FLOAT, + TO_TIMESTAMP, + SUBSTR, + EXTRACT, + DATE_ADD, + DATE_DIFF, + UTCNOW + }; + + +class s3select_functions : public __clt_allocator +{ + +private: + + using FunctionLibrary = std::map<std::string, s3select_func_En_t>; + const FunctionLibrary m_functions_library = + { + {"add", s3select_func_En_t::ADD}, + {"sum", s3select_func_En_t::SUM}, + {"count", s3select_func_En_t::COUNT}, + {"min", s3select_func_En_t::MIN}, + {"max", s3select_func_En_t::MAX}, + {"int", s3select_func_En_t::TO_INT}, + {"float", s3select_func_En_t::TO_FLOAT}, + {"substr", s3select_func_En_t::SUBSTR}, + {"timestamp", s3select_func_En_t::TO_TIMESTAMP}, + {"extract", s3select_func_En_t::EXTRACT}, + {"dateadd", s3select_func_En_t::DATE_ADD}, + {"datediff", s3select_func_En_t::DATE_DIFF}, + {"utcnow", s3select_func_En_t::UTCNOW} + }; + +public: + + base_function* create(std::string fn_name); +}; + +class __function : public base_statement +{ + +private: + std::vector<base_statement*> arguments; + std::string name; + base_function* m_func_impl; + s3select_functions* m_s3select_functions; + variable m_result; + + void _resolve_name() + { + if (m_func_impl) + { + return; + } + + base_function* f = m_s3select_functions->create(name); + if (!f) + { + throw base_s3select_exception("function not found", base_s3select_exception::s3select_exp_en_t::FATAL); //should abort query + } + m_func_impl = f; + } + +public: + virtual void traverse_and_apply(scratch_area* sa, projection_alias* pa) + { + m_scratch = sa; + m_aliases = pa; + for (base_statement* ba : arguments) + { + ba->traverse_and_apply(sa, pa); + } + } + + virtual bool is_aggregate() // TODO under semantic flow + { + _resolve_name(); + + return m_func_impl->is_aggregate(); + } + + virtual bool semantic() + { + return true; + } + + __function(const char* fname, s3select_functions* s3f) : name(fname), m_func_impl(0), m_s3select_functions(s3f) {} + + virtual value& eval() + { + + _resolve_name(); + + if (is_last_call == false) + { + (*m_func_impl)(&arguments, &m_result); + } + else + { + (*m_func_impl).get_aggregate_result(&m_result); + } + + return m_result.get_value(); + } + + + + virtual std::string print(int ident) + { + return std::string(0); + } + + void push_argument(base_statement* arg) + { + arguments.push_back(arg); + } + + + std::vector<base_statement*> get_arguments() + { + return arguments; + } + + virtual ~__function() + { + arguments.clear(); + } +}; + + + +/* + s3-select function defintions +*/ +struct _fn_add : public base_function +{ + + value var_result; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + base_statement* x = *iter; + iter++; + base_statement* y = *iter; + + var_result = x->eval() + y->eval(); + + *result = var_result; + + return true; + } +}; + +struct _fn_sum : public base_function +{ + + value sum; + + _fn_sum() : sum(0) + { + aggregate = true; + } + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + base_statement* x = *iter; + + try + { + sum = sum + x->eval(); + } + catch (base_s3select_exception& e) + { + std::cout << "illegal value for aggregation(sum). skipping." << std::endl; + if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) + { + throw; + } + } + + return true; + } + + virtual void get_aggregate_result(variable* result) + { + *result = sum ; + } +}; + +struct _fn_count : public base_function +{ + + int64_t count; + + _fn_count():count(0) + { + aggregate=true; + } + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + count += 1; + + return true; + } + + virtual void get_aggregate_result(variable* result) + { + result->set_value(count); + } + +}; + +struct _fn_min : public base_function +{ + + value min; + + _fn_min():min(__INT64_MAX__) + { + aggregate=true; + } + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + base_statement* x = *iter; + + if(min > x->eval()) + { + min=x->eval(); + } + + return true; + } + + virtual void get_aggregate_result(variable* result) + { + *result = min; + } + +}; + +struct _fn_max : public base_function +{ + + value max; + + _fn_max():max(-__INT64_MAX__) + { + aggregate=true; + } + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + base_statement* x = *iter; + + if(max < x->eval()) + { + max=x->eval(); + } + + return true; + } + + virtual void get_aggregate_result(variable* result) + { + *result = max; + } + +}; + +struct _fn_to_int : public base_function +{ + + value var_result; + value func_arg; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + char* perr; + int64_t i=0; + func_arg = (*args->begin())->eval(); + + if (func_arg.type == value::value_En_t::STRING) + { + i = strtol(func_arg.str(), &perr, 10) ; //TODO check error before constructor + } + else if (func_arg.type == value::value_En_t::FLOAT) + { + i = func_arg.dbl(); + } + else + { + i = func_arg.i64(); + } + + var_result = i ; + *result = var_result; + + return true; + } + +}; + +struct _fn_to_float : public base_function +{ + + value var_result; + value v_from; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + char* perr; + double d=0; + value v = (*args->begin())->eval(); + + if (v.type == value::value_En_t::STRING) + { + d = strtod(v.str(), &perr) ; //TODO check error before constructor + } + else if (v.type == value::value_En_t::FLOAT) + { + d = v.dbl(); + } + else + { + d = v.i64(); + } + + var_result = d; + *result = var_result; + + return true; + } + +}; + +struct _fn_to_timestamp : public base_function +{ + bsc::rule<> separator = bsc::ch_p(":") | bsc::ch_p("-"); + + uint32_t yr = 1700, mo = 1, dy = 1; + bsc::rule<> dig4 = bsc::lexeme_d[bsc::digit_p >> bsc::digit_p >> bsc::digit_p >> bsc::digit_p]; + bsc::rule<> dig2 = bsc::lexeme_d[bsc::digit_p >> bsc::digit_p]; + + bsc::rule<> d_yyyymmdd_dig = ((dig4[BOOST_BIND_ACTION_PARAM(push_4dig, &yr)]) >> *(separator) + >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &mo)]) >> *(separator) + >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &dy)]) >> *(separator)); + + uint32_t hr = 0, mn = 0, sc = 0; + bsc::rule<> d_time_dig = ((dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &hr)]) >> *(separator) + >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &mn)]) >> *(separator) + >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &sc)]) >> *(separator)); + + boost::posix_time::ptime new_ptime; + + value v_str; + + + bool datetime_validation() + { + //TODO temporary , should check for leap year + + if(yr<1700 || yr>2050) + { + return false; + } + if (mo<1 || mo>12) + { + return false; + } + if (dy<1 || dy>31) + { + return false; + } + if (hr>23) + { + return false; + } + if (dy>59) + { + return false; + } + if (sc>59) + { + return false; + } + + return true; + } + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + + hr = 0; + mn = 0; + sc = 0; + + std::vector<base_statement*>::iterator iter = args->begin(); + int args_size = args->size(); + + if (args_size != 1) + { + throw base_s3select_exception("to_timestamp should have one parameter"); + } + + base_statement* str = *iter; + + v_str = str->eval(); + + if (v_str.type != value::value_En_t::STRING) + { + throw base_s3select_exception("to_timestamp first argument must be string"); //can skip current row + } + + bsc::parse_info<> info_dig = bsc::parse(v_str.str(), d_yyyymmdd_dig >> *(separator) >> d_time_dig); + + if(datetime_validation()==false or !info_dig.full) + { + throw base_s3select_exception("input date-time is illegal"); + } + + new_ptime = boost::posix_time::ptime(boost::gregorian::date(yr, mo, dy), + boost::posix_time::hours(hr) + boost::posix_time::minutes(mn) + boost::posix_time::seconds(sc)); + + result->set_value(&new_ptime); + + return true; + } + +}; + +struct _fn_extact_from_timestamp : public base_function +{ + + boost::posix_time::ptime new_ptime; + + value val_date_part; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + int args_size = args->size(); + + if (args_size < 2) + { + throw base_s3select_exception("to_timestamp should have 2 parameters"); + } + + base_statement* date_part = *iter; + + val_date_part = date_part->eval();//TODO could be done once? + + if(val_date_part.is_string()== false) + { + throw base_s3select_exception("first parameter should be string"); + } + + iter++; + + base_statement* ts = *iter; + + if(ts->eval().is_timestamp()== false) + { + throw base_s3select_exception("second parameter is not timestamp"); + } + + new_ptime = *ts->eval().timestamp(); + + if( strcmp(val_date_part.str(), "year")==0 ) + { + result->set_value( (int64_t)new_ptime.date().year() ); + } + else if( strcmp(val_date_part.str(), "month")==0 ) + { + result->set_value( (int64_t)new_ptime.date().month() ); + } + else if( strcmp(val_date_part.str(), "day")==0 ) + { + result->set_value( (int64_t)new_ptime.date().day_of_year() ); + } + else if( strcmp(val_date_part.str(), "week")==0 ) + { + result->set_value( (int64_t)new_ptime.date().week_number() ); + } + else + { + throw base_s3select_exception(std::string( val_date_part.str() + std::string(" is not supported ") ).c_str() ); + } + + return true; + } + +}; + +struct _fn_diff_timestamp : public base_function +{ + + value val_date_part; + value val_dt1; + value val_dt2; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + int args_size = args->size(); + + if (args_size < 3) + { + throw base_s3select_exception("datediff need 3 parameters"); + } + + base_statement* date_part = *iter; + + val_date_part = date_part->eval(); + + iter++; + base_statement* dt1_param = *iter; + val_dt1 = dt1_param->eval(); + if (val_dt1.is_timestamp() == false) + { + throw base_s3select_exception("second parameter should be timestamp"); + } + + iter++; + base_statement* dt2_param = *iter; + val_dt2 = dt2_param->eval(); + if (val_dt2.is_timestamp() == false) + { + throw base_s3select_exception("third parameter should be timestamp"); + } + + if (strcmp(val_date_part.str(), "year") == 0) + { + int64_t yr = val_dt2.timestamp()->date().year() - val_dt1.timestamp()->date().year() ; + result->set_value( yr ); + } + else if (strcmp(val_date_part.str(), "month") == 0) + { + int64_t yr = val_dt2.timestamp()->date().year() - val_dt1.timestamp()->date().year() ; + int64_t mon = val_dt2.timestamp()->date().month() - val_dt1.timestamp()->date().month() ; + + result->set_value( yr*12 + mon ); + } + else if (strcmp(val_date_part.str(), "day") == 0) + { + boost::gregorian::date_period dp = + boost::gregorian::date_period( val_dt1.timestamp()->date(), val_dt2.timestamp()->date()); + result->set_value( dp.length().days() ); + } + else if (strcmp(val_date_part.str(), "hours") == 0) + { + boost::posix_time::time_duration td_res = (*val_dt2.timestamp() - *val_dt1.timestamp()); + result->set_value( td_res.hours()); + } + else + { + throw base_s3select_exception("first parameter should be string: year,month,hours,day"); + } + + + return true; + } +}; + +struct _fn_add_to_timestamp : public base_function +{ + + boost::posix_time::ptime new_ptime; + + value val_date_part; + value val_quantity; + value val_timestamp; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + int args_size = args->size(); + + if (args_size < 3) + { + throw base_s3select_exception("add_to_timestamp should have 3 parameters"); + } + + base_statement* date_part = *iter; + val_date_part = date_part->eval();//TODO could be done once? + + if(val_date_part.is_string()== false) + { + throw base_s3select_exception("first parameter should be string"); + } + + iter++; + base_statement* quan = *iter; + val_quantity = quan->eval(); + + if (val_quantity.is_number() == false) + { + throw base_s3select_exception("second parameter should be number"); //TODO what about double? + } + + iter++; + base_statement* ts = *iter; + val_timestamp = ts->eval(); + + if(val_timestamp.is_timestamp() == false) + { + throw base_s3select_exception("third parameter should be time-stamp"); + } + + new_ptime = *val_timestamp.timestamp(); + + if( strcmp(val_date_part.str(), "year")==0 ) + { + new_ptime += boost::gregorian::years( val_quantity.i64() ); + result->set_value( &new_ptime ); + } + else if( strcmp(val_date_part.str(), "month")==0 ) + { + new_ptime += boost::gregorian::months( val_quantity.i64() ); + result->set_value( &new_ptime ); + } + else if( strcmp(val_date_part.str(), "day")==0 ) + { + new_ptime += boost::gregorian::days( val_quantity.i64() ); + result->set_value( &new_ptime ); + } + else + { + throw base_s3select_exception( std::string(val_date_part.str() + std::string(" is not supported for add")).c_str()); + } + + return true; + } + +}; + +struct _fn_utcnow : public base_function +{ + + boost::posix_time::ptime now_ptime; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + int args_size = args->size(); + + if (args_size != 0) + { + throw base_s3select_exception("utcnow does not expect any parameters"); + } + + now_ptime = boost::posix_time::ptime( boost::posix_time::second_clock::universal_time()); + result->set_value( &now_ptime ); + + return true; + } +}; + +struct _fn_substr : public base_function +{ + + char buff[4096];// this buffer is persist for the query life time, it use for the results per row(only for the specific function call) + //it prevent from intensive use of malloc/free (fragmentation). + //should validate result length. + //TODO may replace by std::string (dynamic) , or to replace with global allocator , in query scope. + value v_str; + value v_from; + value v_to; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + int args_size = args->size(); + + + if (args_size<2) + { + throw base_s3select_exception("substr accept 2 arguments or 3"); + } + + base_statement* str = *iter; + iter++; + base_statement* from = *iter; + base_statement* to; + + if (args_size == 3) + { + iter++; + to = *iter; + } + + v_str = str->eval(); + + if(v_str.type != value::value_En_t::STRING) + { + throw base_s3select_exception("substr first argument must be string"); //can skip current row + } + + int str_length = strlen(v_str.str()); + + v_from = from->eval(); + if(v_from.is_string()) + { + throw base_s3select_exception("substr second argument must be number"); //can skip current row + } + + int64_t f; + int64_t t; + + if (args_size==3) + { + v_to = to->eval(); + if (v_to.is_string()) + { + throw base_s3select_exception("substr third argument must be number"); //can skip row + } + } + + if (v_from.type == value::value_En_t::FLOAT) + { + f=v_from.dbl(); + } + else + { + f=v_from.i64(); + } + + if (f>str_length) + { + throw base_s3select_exception("substr start position is too far"); //can skip row + } + + if (str_length>(int)sizeof(buff)) + { + throw base_s3select_exception("string too long for internal buffer"); //can skip row + } + + if (args_size == 3) + { + if (v_from.type == value::value_En_t::FLOAT) + { + t = v_to.dbl(); + } + else + { + t = v_to.i64(); + } + + if( (str_length-(f-1)-t) <0) + { + throw base_s3select_exception("substr length parameter beyond bounderies"); //can skip row + } + + strncpy(buff, v_str.str()+f-1, t); + } + else + { + strcpy(buff, v_str.str()+f-1); + } + + result->set_value(buff); + + return true; + } + +}; + +base_function* s3select_functions::create(std::string fn_name) +{ + const FunctionLibrary::const_iterator iter = m_functions_library.find(fn_name); + + if (iter == m_functions_library.end()) + { + std::string msg; + msg = fn_name + " " + " function not found"; + throw base_s3select_exception(msg, base_s3select_exception::s3select_exp_en_t::FATAL); + } + + switch (iter->second) + { + case s3select_func_En_t::ADD: + return S3SELECT_NEW(_fn_add); + break; + + case s3select_func_En_t::SUM: + return S3SELECT_NEW(_fn_sum); + break; + + case s3select_func_En_t::COUNT: + return S3SELECT_NEW(_fn_count); + break; + + case s3select_func_En_t::MIN: + return S3SELECT_NEW(_fn_min); + break; + + case s3select_func_En_t::MAX: + return S3SELECT_NEW(_fn_max); + break; + + case s3select_func_En_t::TO_INT: + return S3SELECT_NEW(_fn_to_int); + break; + + case s3select_func_En_t::TO_FLOAT: + return S3SELECT_NEW(_fn_to_float); + break; + + case s3select_func_En_t::SUBSTR: + return S3SELECT_NEW(_fn_substr); + break; + + case s3select_func_En_t::TO_TIMESTAMP: + return S3SELECT_NEW(_fn_to_timestamp); + break; + + case s3select_func_En_t::EXTRACT: + return S3SELECT_NEW(_fn_extact_from_timestamp); + break; + + case s3select_func_En_t::DATE_ADD: + return S3SELECT_NEW(_fn_add_to_timestamp); + break; + + case s3select_func_En_t::DATE_DIFF: + return S3SELECT_NEW(_fn_diff_timestamp); + break; + + case s3select_func_En_t::UTCNOW: + return S3SELECT_NEW(_fn_utcnow); + break; + + default: + throw base_s3select_exception("internal error while resolving function-name"); + break; + } +} + +bool base_statement::is_function() +{ + if (dynamic_cast<__function*>(this)) + { + return true; + } + else + { + return false; + } +} + +bool base_statement::is_aggregate_exist_in_expression(base_statement* e) //TODO obsolete ? +{ + if (e->is_aggregate()) + { + return true; + } + + if (e->left() && e->left()->is_aggregate_exist_in_expression(e->left())) + { + return true; + } + + if (e->right() && e->right()->is_aggregate_exist_in_expression(e->right())) + { + return true; + } + + if (e->is_function()) + { + for (auto i : dynamic_cast<__function*>(e)->get_arguments()) + if (e->is_aggregate_exist_in_expression(i)) + { + return true; + } + } + + return false; +} + +base_statement* base_statement::get_aggregate() +{ + //search for aggregation function in AST + base_statement* res = 0; + + if (is_aggregate()) + { + return this; + } + + if (left() && (res=left()->get_aggregate())!=0) + { + return res; + } + + if (right() && (res=right()->get_aggregate())!=0) + { + return res; + } + + if (is_function()) + { + for (auto i : dynamic_cast<__function*>(this)->get_arguments()) + { + base_statement* b=i->get_aggregate(); + if (b) + { + return b; + } + } + } + return 0; +} + +bool base_statement::is_nested_aggregate(base_statement* e) +{ + //validate for non nested calls for aggregation function, i.e. sum ( min ( )) + if (e->is_aggregate()) + { + if (e->left()) + { + if (e->left()->is_aggregate_exist_in_expression(e->left())) + { + return true; + } + } + else if (e->right()) + { + if (e->right()->is_aggregate_exist_in_expression(e->right())) + { + return true; + } + } + else if (e->is_function()) + { + for (auto i : dynamic_cast<__function*>(e)->get_arguments()) + { + if (i->is_aggregate_exist_in_expression(i)) + { + return true; + } + } + } + return false; + } + return false; +} + +// select sum(c2) ... + c1 ... is not allowed. a binary operation with scalar is OK. i.e. select sum() + 1 +bool base_statement::is_binop_aggregate_and_column(base_statement* skip_expression) +{ + if (left() && left() != skip_expression) //can traverse to left + { + if (left()->is_column()) + { + return true; + } + else if (left()->is_binop_aggregate_and_column(skip_expression) == true) + { + return true; + } + } + + if (right() && right() != skip_expression) //can traverse right + { + if (right()->is_column()) + { + return true; + } + else if (right()->is_binop_aggregate_and_column(skip_expression) == true) + { + return true; + } + } + + if (this != skip_expression && is_function()) + { + + __function* f = (dynamic_cast<__function*>(this)); + std::vector<base_statement*> l = f->get_arguments(); + for (auto i : l) + { + if (i!=skip_expression && i->is_column()) + { + return true; + } + if (i->is_binop_aggregate_and_column(skip_expression) == true) + { + return true; + } + } + } + + return false; +} + +} //namespace s3selectEngine + +#endif diff --git a/src/s3select/include/s3select_oper.h b/src/s3select/include/s3select_oper.h new file mode 100644 index 000000000..591e5f9da --- /dev/null +++ b/src/s3select/include/s3select_oper.h @@ -0,0 +1,1320 @@ +#ifndef __S3SELECT_OPER__ +#define __S3SELECT_OPER__ + +#include <string> +#include <iostream> +#include <list> +#include <map> +#include <vector> +#include <string.h> +#include <math.h> + +#include <boost/lexical_cast.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/bind.hpp> +namespace bsc = BOOST_SPIRIT_CLASSIC_NS; + +namespace s3selectEngine +{ + +class base_s3select_exception +{ + +public: + enum class s3select_exp_en_t + { + NONE, + ERROR, + FATAL + } ; + +private: + s3select_exp_en_t m_severity; + +public: + std::string _msg; + base_s3select_exception(const char* n) : m_severity(s3select_exp_en_t::NONE) + { + _msg.assign(n); + } + base_s3select_exception(const char* n, s3select_exp_en_t severity) : m_severity(severity) + { + _msg.assign(n); + } + base_s3select_exception(std::string n, s3select_exp_en_t severity) : m_severity(severity) + { + _msg = n; + } + + virtual const char* what() + { + return _msg.c_str(); + } + + s3select_exp_en_t severity() + { + return m_severity; + } + + virtual ~base_s3select_exception() {} +}; + + +// pointer to dynamic allocated buffer , which used for placement new. +static __thread char* _s3select_buff_ptr =0; + +class s3select_allocator //s3select is the "owner" +{ +private: + + std::vector<char*> list_of_buff; + u_int32_t m_idx; + +public: +#define __S3_ALLOCATION_BUFF__ (8*1024) + s3select_allocator():m_idx(0) + { + list_of_buff.push_back((char*)malloc(__S3_ALLOCATION_BUFF__)); + } + + void set_global_buff() + { + char* buff = list_of_buff.back(); + _s3select_buff_ptr = &buff[ m_idx ]; + } + + void check_capacity(size_t sz) + { + if (sz>__S3_ALLOCATION_BUFF__) + { + throw base_s3select_exception("requested size too big", base_s3select_exception::s3select_exp_en_t::FATAL); + } + + if ((m_idx + sz) >= __S3_ALLOCATION_BUFF__) + { + list_of_buff.push_back((char*)malloc(__S3_ALLOCATION_BUFF__)); + m_idx = 0; + } + } + + void inc(size_t sz) + { + m_idx += sz; + m_idx += sizeof(char*) - (m_idx % sizeof(char*)); //alignment + } + + void zero() + { + //not a must, its for safty. + _s3select_buff_ptr=0; + } + + virtual ~s3select_allocator() + { + for(auto b : list_of_buff) + { + free(b); + } + } +}; + +class __clt_allocator +{ +public: + s3select_allocator* m_s3select_allocator; + +public: + + __clt_allocator():m_s3select_allocator(0) {} + + void set(s3select_allocator* a) + { + m_s3select_allocator = a; + } +}; + +// placement new for allocation of all s3select objects on single(or few) buffers, deallocation of those objects is by releasing the buffer. +#define S3SELECT_NEW( type , ... ) [=]() \ + { \ + m_s3select_allocator->check_capacity(sizeof( type )); \ + m_s3select_allocator->set_global_buff(); \ + auto res=new (_s3select_buff_ptr) type(__VA_ARGS__); \ + m_s3select_allocator->inc(sizeof( type )); \ + m_s3select_allocator->zero(); \ + return res; \ + }(); + +class scratch_area +{ + +private: + std::vector<std::string_view> m_columns{128}; + int m_upper_bound; + + std::vector<std::pair<std::string, int >> m_column_name_pos; + +public: + + void set_column_pos(const char* n, int pos)//TODO use std::string + { + m_column_name_pos.push_back( std::pair<const char*, int>(n, pos)); + } + + void update(std::vector<char*> tokens, size_t num_of_tokens) + { + size_t i=0; + for(auto s : tokens) + { + if (i>=num_of_tokens) + { + break; + } + + m_columns[i++] = s; + } + m_upper_bound = i; + + } + + int get_column_pos(const char* n) + { + //done only upon building the AST , not on "runtime" + + std::vector<std::pair<std::string, int >>::iterator iter; + + for( auto iter : m_column_name_pos) + { + if (!strcmp(iter.first.c_str(), n)) + { + return iter.second; + } + } + + return -1; + } + + std::string_view get_column_value(int column_pos) + { + + if ((column_pos >= m_upper_bound) || column_pos < 0) + { + throw base_s3select_exception("column_position_is_wrong", base_s3select_exception::s3select_exp_en_t::ERROR); + } + + return m_columns[column_pos]; + } + + int get_num_of_columns() + { + return m_upper_bound; + } +}; + +class base_statement; +class projection_alias +{ +//purpose: mapping between alias-name to base_statement* +//those routines are *NOT* intensive, works once per query parse time. + +private: + std::vector< std::pair<std::string, base_statement*> > alias_map; + +public: + std::vector< std::pair<std::string, base_statement*> >* get() + { + return &alias_map; + } + + bool insert_new_entry(std::string alias_name, base_statement* bs) + { + //purpose: only unique alias names. + + for(auto alias: alias_map) + { + if(alias.first.compare(alias_name) == 0) + { + return false; //alias name already exist + } + + } + std::pair<std::string, base_statement*> new_alias(alias_name, bs); + alias_map.push_back(new_alias); + + return true; + } + + base_statement* search_alias(std::string alias_name) + { + for(auto alias: alias_map) + { + if(alias.first.compare(alias_name) == 0) + { + return alias.second; //refernce to execution node + } + } + return 0; + } +}; + +struct binop_plus +{ + double operator()(double a, double b) + { + return a+b; + } +}; + +struct binop_minus +{ + double operator()(double a, double b) + { + return a-b; + } +}; + +struct binop_mult +{ + double operator()(double a, double b) + { + return a * b; + } +}; + +struct binop_div +{ + double operator()(double a, double b) + { + return a / b; + } +}; + +struct binop_pow +{ + double operator()(double a, double b) + { + return pow(a, b); + } +}; + +class value +{ + +public: + typedef union + { + int64_t num; + char* str;//TODO consider string_view + double dbl; + boost::posix_time::ptime* timestamp; + } value_t; + +private: + value_t __val; + std::string m_to_string; + std::string m_str_value; + +public: + enum class value_En_t + { + DECIMAL, + FLOAT, + STRING, + TIMESTAMP, + NA + } ; + value_En_t type; + + value(int64_t n) : type(value_En_t::DECIMAL) + { + __val.num = n; + } + value(int n) : type(value_En_t::DECIMAL) + { + __val.num = n; + } + value(bool b) : type(value_En_t::DECIMAL) + { + __val.num = (int64_t)b; + } + value(double d) : type(value_En_t::FLOAT) + { + __val.dbl = d; + } + value(boost::posix_time::ptime* timestamp) : type(value_En_t::TIMESTAMP) + { + __val.timestamp = timestamp; + } + + value(const char* s) : type(value_En_t::STRING) + { + m_str_value.assign(s); + __val.str = m_str_value.data(); + } + + value():type(value_En_t::NA) + { + __val.num=0; + } + + bool is_number() const + { + if ((type == value_En_t::DECIMAL || type == value_En_t::FLOAT)) + { + return true; + } + + return false; + } + + bool is_string() const + { + return type == value_En_t::STRING; + } + bool is_timestamp() const + { + return type == value_En_t::TIMESTAMP; + } + + + std::string& to_string() //TODO very intensive , must improve this + { + + if (type != value_En_t::STRING) + { + if (type == value_En_t::DECIMAL) + { + m_to_string.assign( boost::lexical_cast<std::string>(__val.num) ); + } + else if(type == value_En_t::FLOAT) + { + m_to_string = boost::lexical_cast<std::string>(__val.dbl); + } + else + { + m_to_string = to_simple_string( *__val.timestamp ); + } + } + else + { + m_to_string.assign( __val.str ); + } + + return m_to_string; + } + + + value& operator=(value& o) + { + if(this->type == value_En_t::STRING) + { + m_str_value.assign(o.str()); + __val.str = m_str_value.data(); + } + else + { + this->__val = o.__val; + } + + this->type = o.type; + + return *this; + } + + value& operator=(const char* s) + { + m_str_value.assign(s); + this->__val.str = m_str_value.data(); + this->type = value_En_t::STRING; + + return *this; + } + + value& operator=(int64_t i) + { + this->__val.num = i; + this->type = value_En_t::DECIMAL; + + return *this; + } + + value& operator=(double d) + { + this->__val.dbl = d; + this->type = value_En_t::FLOAT; + + return *this; + } + + value& operator=(bool b) + { + this->__val.num = (int64_t)b; + this->type = value_En_t::DECIMAL; + + return *this; + } + + value& operator=(boost::posix_time::ptime* p) + { + this->__val.timestamp = p; + this->type = value_En_t::TIMESTAMP; + + return *this; + } + + int64_t i64() + { + return __val.num; + } + + const char* str() + { + return __val.str; + } + + double dbl() + { + return __val.dbl; + } + + boost::posix_time::ptime* timestamp() const + { + return __val.timestamp; + } + + bool operator<(const value& v)//basic compare operator , most itensive runtime operation + { + //TODO NA possible? + if (is_string() && v.is_string()) + { + return strcmp(__val.str, v.__val.str) < 0; + } + + if (is_number() && v.is_number()) + { + + if(type != v.type) //conversion //TODO find better way + { + if (type == value_En_t::DECIMAL) + { + return (double)__val.num < v.__val.dbl; + } + else + { + return __val.dbl < (double)v.__val.num; + } + } + else //no conversion + { + if(type == value_En_t::DECIMAL) + { + return __val.num < v.__val.num; + } + else + { + return __val.dbl < v.__val.dbl; + } + + } + } + + if(is_timestamp() && v.is_timestamp()) + { + return *timestamp() < *(v.timestamp()); + } + + throw base_s3select_exception("operands not of the same type(numeric , string), while comparision"); + } + + bool operator>(const value& v) //basic compare operator , most itensive runtime operation + { + //TODO NA possible? + if (is_string() && v.is_string()) + { + return strcmp(__val.str, v.__val.str) > 0; + } + + if (is_number() && v.is_number()) + { + + if(type != v.type) //conversion //TODO find better way + { + if (type == value_En_t::DECIMAL) + { + return (double)__val.num > v.__val.dbl; + } + else + { + return __val.dbl > (double)v.__val.num; + } + } + else //no conversion + { + if(type == value_En_t::DECIMAL) + { + return __val.num > v.__val.num; + } + else + { + return __val.dbl > v.__val.dbl; + } + + } + } + + if(is_timestamp() && v.is_timestamp()) + { + return *timestamp() > *(v.timestamp()); + } + + throw base_s3select_exception("operands not of the same type(numeric , string), while comparision"); + } + + bool operator==(const value& v) //basic compare operator , most itensive runtime operation + { + //TODO NA possible? + if (is_string() && v.is_string()) + { + return strcmp(__val.str, v.__val.str) == 0; + } + + + if (is_number() && v.is_number()) + { + + if(type != v.type) //conversion //TODO find better way + { + if (type == value_En_t::DECIMAL) + { + return (double)__val.num == v.__val.dbl; + } + else + { + return __val.dbl == (double)v.__val.num; + } + } + else //no conversion + { + if(type == value_En_t::DECIMAL) + { + return __val.num == v.__val.num; + } + else + { + return __val.dbl == v.__val.dbl; + } + + } + } + + if(is_timestamp() && v.is_timestamp()) + { + return *timestamp() == *(v.timestamp()); + } + + throw base_s3select_exception("operands not of the same type(numeric , string), while comparision"); + } + bool operator<=(const value& v) + { + return !(*this>v); + } + bool operator>=(const value& v) + { + return !(*this<v); + } + bool operator!=(const value& v) + { + return !(*this == v); + } + + template<typename binop> //conversion rules for arithmetical binary operations + value& compute(value& l, const value& r) //left should be this, it contain the result + { + binop __op; + + if (l.is_string() || r.is_string()) + { + throw base_s3select_exception("illegal binary operation with string"); + } + + if (l.type != r.type) + { + //conversion + + if (l.type == value_En_t::DECIMAL) + { + l.__val.dbl = __op((double)l.__val.num, r.__val.dbl); + l.type = value_En_t::FLOAT; + } + else + { + l.__val.dbl = __op(l.__val.dbl, (double)r.__val.num); + l.type = value_En_t::FLOAT; + } + } + else + { + //no conversion + + if (l.type == value_En_t::DECIMAL) + { + l.__val.num = __op(l.__val.num, r.__val.num ); + l.type = value_En_t::DECIMAL; + } + else + { + l.__val.dbl = __op(l.__val.dbl, r.__val.dbl ); + l.type = value_En_t::FLOAT; + } + } + + return l; + } + + value& operator+(const value& v) + { + return compute<binop_plus>(*this, v); + } + + value& operator-(const value& v) + { + return compute<binop_minus>(*this, v); + } + + value& operator*(const value& v) + { + return compute<binop_mult>(*this, v); + } + + value& operator/(const value& v) // TODO handle division by zero + { + return compute<binop_div>(*this, v); + } + + value& operator^(const value& v) + { + return compute<binop_pow>(*this, v); + } + +}; + +class base_statement +{ + +protected: + + scratch_area* m_scratch; + projection_alias* m_aliases; + bool is_last_call; //valid only for aggregation functions + bool m_is_cache_result; + value m_alias_result; + base_statement* m_projection_alias; + int m_eval_stack_depth; + +public: + base_statement():m_scratch(0), is_last_call(false), m_is_cache_result(false), m_projection_alias(0), m_eval_stack_depth(0) {} + virtual value& eval() =0; + virtual base_statement* left() + { + return 0; + } + virtual base_statement* right() + { + return 0; + } + virtual std::string print(int ident) =0;//TODO complete it, one option to use level parametr in interface , + virtual bool semantic() =0;//done once , post syntax , traverse all nodes and validate semantics. + + virtual void traverse_and_apply(scratch_area* sa, projection_alias* pa) + { + m_scratch = sa; + m_aliases = pa; + if (left()) + { + left()->traverse_and_apply(m_scratch, m_aliases); + } + if (right()) + { + right()->traverse_and_apply(m_scratch, m_aliases); + } + } + + virtual bool is_aggregate() + { + return false; + } + virtual bool is_column() + { + return false; + } + + bool is_function(); + bool is_aggregate_exist_in_expression(base_statement* e);//TODO obsolete ? + base_statement* get_aggregate(); + bool is_nested_aggregate(base_statement* e); + bool is_binop_aggregate_and_column(base_statement* skip); + + virtual void set_last_call() + { + is_last_call = true; + if(left()) + { + left()->set_last_call(); + } + if(right()) + { + right()->set_last_call(); + } + } + + bool is_set_last_call() + { + return is_last_call; + } + + void invalidate_cache_result() + { + m_is_cache_result = false; + } + + bool is_result_cached() + { + return m_is_cache_result == true; + } + + void set_result_cache(value& eval_result) + { + m_alias_result = eval_result; + m_is_cache_result = true; + } + + void dec_call_stack_depth() + { + m_eval_stack_depth --; + } + + value& get_result_cache() + { + return m_alias_result; + } + + int& get_eval_call_depth() + { + m_eval_stack_depth++; + return m_eval_stack_depth; + } + + virtual ~base_statement() {} + +}; + +class variable : public base_statement +{ + +public: + + enum class var_t + { + NA, + VAR,//schema column (i.e. age , price , ...) + COL_VALUE, //concrete value + POS, // CSV column number (i.e. _1 , _2 ... ) + STAR_OPERATION, //'*' + } ; + var_t m_var_type; + +private: + + std::string _name; + int column_pos; + value var_value; + std::string m_star_op_result; + char m_star_op_result_charc[4096]; //TODO should be dynamic + + const int undefined_column_pos = -1; + const int column_alias = -2; + +public: + variable():m_var_type(var_t::NA), _name(""), column_pos(-1) {} + + variable(int64_t i) : m_var_type(var_t::COL_VALUE), column_pos(-1), var_value(i) {} + + variable(double d) : m_var_type(var_t::COL_VALUE), _name("#"), column_pos(-1), var_value(d) {} + + variable(int i) : m_var_type(var_t::COL_VALUE), column_pos(-1), var_value(i) {} + + variable(const std::string& n) : m_var_type(var_t::VAR), _name(n), column_pos(-1) {} + + variable(const std::string& n, var_t tp) : m_var_type(var_t::NA) + { + if(tp == variable::var_t::POS) + { + _name = n; + m_var_type = tp; + int pos = atoi( n.c_str() + 1 ); //TODO >0 < (schema definition , semantic analysis) + column_pos = pos -1;// _1 is the first column ( zero position ) + } + else if (tp == variable::var_t::COL_VALUE) + { + _name = "#"; + m_var_type = tp; + column_pos = -1; + var_value = n.c_str(); + } + else if (tp ==variable::var_t::STAR_OPERATION) + { + _name = "#"; + m_var_type = tp; + column_pos = -1; + } + } + + void operator=(value& v) + { + var_value = v; + } + + void set_value(const char* s) + { + var_value = s; + } + + void set_value(double d) + { + var_value = d; + } + + void set_value(int64_t i) + { + var_value = i; + } + + void set_value(boost::posix_time::ptime* p) + { + var_value = p; + } + + virtual ~variable() {} + + virtual bool is_column() //is reference to column. + { + if(m_var_type == var_t::VAR || m_var_type == var_t::POS) + { + return true; + } + return false; + } + + value& get_value() + { + return var_value; //TODO is it correct + } + virtual value::value_En_t get_value_type() + { + return var_value.type; + } + + + value& star_operation() //purpose return content of all columns in a input stream + { + + + int i; + size_t pos=0; + int num_of_columns = m_scratch->get_num_of_columns(); + for(i=0; i<num_of_columns-1; i++) + { + size_t len = m_scratch->get_column_value(i).size(); + if((pos+len)>sizeof(m_star_op_result_charc)) + { + throw base_s3select_exception("result line too long", base_s3select_exception::s3select_exp_en_t::FATAL); + } + + memcpy(&m_star_op_result_charc[pos], m_scratch->get_column_value(i).data(), len); + pos += len; + m_star_op_result_charc[ pos ] = ',';//TODO need for another abstraction (per file type) + pos ++; + + } + + size_t len = m_scratch->get_column_value(i).size(); + if((pos+len)>sizeof(m_star_op_result_charc)) + { + throw base_s3select_exception("result line too long", base_s3select_exception::s3select_exp_en_t::FATAL); + } + + memcpy(&m_star_op_result_charc[pos], m_scratch->get_column_value(i).data(), len); + m_star_op_result_charc[ pos + len ] = 0; + var_value = (char*)&m_star_op_result_charc[0]; + return var_value; + } + + virtual value& eval() + { + if (m_var_type == var_t::COL_VALUE) + { + return var_value; // a literal,could be deciml / float / string + } + else if(m_var_type == var_t::STAR_OPERATION) + { + return star_operation(); + } + else if (column_pos == undefined_column_pos) + { + //done once , for the first time + column_pos = m_scratch->get_column_pos(_name.c_str()); + + if(column_pos>=0 && m_aliases->search_alias(_name.c_str())) + { + throw base_s3select_exception(std::string("multiple definition of column {") + _name + "} as schema-column and alias", base_s3select_exception::s3select_exp_en_t::FATAL); + } + + + if (column_pos == undefined_column_pos) + { + //not belong to schema , should exist in aliases + m_projection_alias = m_aliases->search_alias(_name.c_str()); + + //not enter this scope again + column_pos = column_alias; + if(m_projection_alias == 0) + { + throw base_s3select_exception(std::string("alias {")+_name+std::string("} or column not exist in schema"), base_s3select_exception::s3select_exp_en_t::FATAL); + } + } + + } + + if (m_projection_alias) + { + if (m_projection_alias->get_eval_call_depth()>2) + { + throw base_s3select_exception("number of calls exceed maximum size, probably a cyclic reference to alias", base_s3select_exception::s3select_exp_en_t::FATAL); + } + + if (m_projection_alias->is_result_cached() == false) + { + var_value = m_projection_alias->eval(); + m_projection_alias->set_result_cache(var_value); + } + else + { + var_value = m_projection_alias->get_result_cache(); + } + + m_projection_alias->dec_call_stack_depth(); + } + else + { + var_value = (char*)m_scratch->get_column_value(column_pos).data(); //no allocation. returning pointer of allocated space + } + + return var_value; + } + + virtual std::string print(int ident) + { + //std::string out = std::string(ident,' ') + std::string("var:") + std::to_string(var_value.__val.num); + //return out; + return std::string("#");//TBD + } + + virtual bool semantic() + { + return false; + } + +}; + +class arithmetic_operand : public base_statement +{ + +public: + + enum class cmp_t {NA, EQ, LE, LT, GT, GE, NE} ; + +private: + base_statement* l; + base_statement* r; + + cmp_t _cmp; + value var_value; + +public: + + virtual bool semantic() + { + return true; + } + + virtual base_statement* left() + { + return l; + } + virtual base_statement* right() + { + return r; + } + + virtual std::string print(int ident) + { + //std::string out = std::string(ident,' ') + "compare:" += std::to_string(_cmp) + "\n" + l->print(ident-5) +r->print(ident+5); + //return out; + return std::string("#");//TBD + } + + virtual value& eval() + { + + switch (_cmp) + { + case cmp_t::EQ: + return var_value = (l->eval() == r->eval()); + break; + + case cmp_t::LE: + return var_value = (l->eval() <= r->eval()); + break; + + case cmp_t::GE: + return var_value = (l->eval() >= r->eval()); + break; + + case cmp_t::NE: + return var_value = (l->eval() != r->eval()); + break; + + case cmp_t::GT: + return var_value = (l->eval() > r->eval()); + break; + + case cmp_t::LT: + return var_value = (l->eval() < r->eval()); + break; + + default: + throw base_s3select_exception("internal error"); + break; + } + } + + arithmetic_operand(base_statement* _l, cmp_t c, base_statement* _r):l(_l), r(_r), _cmp(c) {} + + virtual ~arithmetic_operand() {} +}; + +class logical_operand : public base_statement +{ + +public: + + enum class oplog_t {AND, OR, NA}; + +private: + base_statement* l; + base_statement* r; + + oplog_t _oplog; + value var_value; + +public: + + virtual base_statement* left() + { + return l; + } + virtual base_statement* right() + { + return r; + } + + virtual bool semantic() + { + return true; + } + + logical_operand(base_statement* _l, oplog_t _o, base_statement* _r):l(_l), r(_r), _oplog(_o) {} + + virtual ~logical_operand() {} + + virtual std::string print(int ident) + { + //std::string out = std::string(ident, ' ') + "logical_operand:" += std::to_string(_oplog) + "\n" + l->print(ident - 5) + r->print(ident + 5); + //return out; + return std::string("#");//TBD + } + virtual value& eval() + { + if (_oplog == oplog_t::AND) + { + if (!l || !r) + { + throw base_s3select_exception("missing operand for logical and", base_s3select_exception::s3select_exp_en_t::FATAL); + } + return var_value = (l->eval().i64() && r->eval().i64()); + } + else + { + if (!l || !r) + { + throw base_s3select_exception("missing operand for logical or", base_s3select_exception::s3select_exp_en_t::FATAL); + } + return var_value = (l->eval().i64() || r->eval().i64()); + } + } + +}; + +class mulldiv_operation : public base_statement +{ + +public: + + enum class muldiv_t {NA, MULL, DIV, POW} ; + +private: + base_statement* l; + base_statement* r; + + muldiv_t _mulldiv; + value var_value; + +public: + + virtual base_statement* left() + { + return l; + } + virtual base_statement* right() + { + return r; + } + + virtual bool semantic() + { + return true; + } + + virtual std::string print(int ident) + { + //std::string out = std::string(ident, ' ') + "mulldiv_operation:" += std::to_string(_mulldiv) + "\n" + l->print(ident - 5) + r->print(ident + 5); + //return out; + return std::string("#");//TBD + } + + virtual value& eval() + { + switch (_mulldiv) + { + case muldiv_t::MULL: + return var_value = l->eval() * r->eval(); + break; + + case muldiv_t::DIV: + return var_value = l->eval() / r->eval(); + break; + + case muldiv_t::POW: + return var_value = l->eval() ^ r->eval(); + break; + + default: + throw base_s3select_exception("internal error"); + break; + } + } + + mulldiv_operation(base_statement* _l, muldiv_t c, base_statement* _r):l(_l), r(_r), _mulldiv(c) {} + + virtual ~mulldiv_operation() {} +}; + +class addsub_operation : public base_statement +{ + +public: + + enum class addsub_op_t {ADD, SUB, NA}; + +private: + base_statement* l; + base_statement* r; + + addsub_op_t _op; + value var_value; + +public: + + virtual base_statement* left() + { + return l; + } + virtual base_statement* right() + { + return r; + } + + virtual bool semantic() + { + return true; + } + + addsub_operation(base_statement* _l, addsub_op_t _o, base_statement* _r):l(_l), r(_r), _op(_o) {} + + virtual ~addsub_operation() {} + + virtual std::string print(int ident) + { + //std::string out = std::string(ident, ' ') + "addsub_operation:" += std::to_string(_op) + "\n" + l->print(ident - 5) + r->print(ident + 5); + return std::string("#");//TBD + } + + virtual value& eval() + { + if (_op == addsub_op_t::NA) // -num , +num , unary-operation on number + { + if (l) + { + return var_value = l->eval(); + } + else if (r) + { + return var_value = r->eval(); + } + } + else if (_op == addsub_op_t::ADD) + { + return var_value = (l->eval() + r->eval()); + } + else + { + return var_value = (l->eval() - r->eval()); + } + + return var_value; + } +}; + +class base_function +{ + +protected: + bool aggregate; + +public: + //TODO add semantic to base-function , it operate once on function creation + // validate semantic on creation instead on run-time + virtual bool operator()(std::vector<base_statement*>* args, variable* result) = 0; + base_function() : aggregate(false) {} + bool is_aggregate() + { + return aggregate == true; + } + virtual void get_aggregate_result(variable*) {} + + virtual ~base_function() {} +}; + + +};//namespace + +#endif |