diff options
Diffstat (limited to 'src/s3select/include/s3select_functions.h')
-rw-r--r-- | src/s3select/include/s3select_functions.h | 1037 |
1 files changed, 1037 insertions, 0 deletions
diff --git a/src/s3select/include/s3select_functions.h b/src/s3select/include/s3select_functions.h new file mode 100644 index 000000000..77b3db3a7 --- /dev/null +++ b/src/s3select/include/s3select_functions.h @@ -0,0 +1,1037 @@ +#ifndef __S3SELECT_FUNCTIONS__ +#define __S3SELECT_FUNCTIONS__ + + +#include "s3select_oper.h" +#define BOOST_BIND_ACTION_PARAM( push_name ,param ) boost::bind( &push_name::operator(), g_ ## push_name , _1 ,_2, param) +namespace s3selectEngine +{ + +struct push_2dig +{ + void operator()(const char* a, const char* b, uint32_t* n) const + { + *n = ((char)(*a) - 48) *10 + ((char)*(a+1)-48) ; + } + +}; +static push_2dig g_push_2dig; + +struct push_4dig +{ + void operator()(const char* a, const char* b, uint32_t* n) const + { + *n = ((char)(*a) - 48) *1000 + ((char)*(a+1)-48)*100 + ((char)*(a+2)-48)*10 + ((char)*(a+3)-48); + } + +}; +static push_4dig g_push_4dig; + +enum class s3select_func_En_t {ADD, + SUM, + MIN, + MAX, + COUNT, + TO_INT, + TO_FLOAT, + TO_TIMESTAMP, + SUBSTR, + EXTRACT, + DATE_ADD, + DATE_DIFF, + UTCNOW + }; + + +class s3select_functions : public __clt_allocator +{ + +private: + + using FunctionLibrary = std::map<std::string, s3select_func_En_t>; + const FunctionLibrary m_functions_library = + { + {"add", s3select_func_En_t::ADD}, + {"sum", s3select_func_En_t::SUM}, + {"count", s3select_func_En_t::COUNT}, + {"min", s3select_func_En_t::MIN}, + {"max", s3select_func_En_t::MAX}, + {"int", s3select_func_En_t::TO_INT}, + {"float", s3select_func_En_t::TO_FLOAT}, + {"substr", s3select_func_En_t::SUBSTR}, + {"timestamp", s3select_func_En_t::TO_TIMESTAMP}, + {"extract", s3select_func_En_t::EXTRACT}, + {"dateadd", s3select_func_En_t::DATE_ADD}, + {"datediff", s3select_func_En_t::DATE_DIFF}, + {"utcnow", s3select_func_En_t::UTCNOW} + }; + +public: + + base_function* create(std::string fn_name); +}; + +class __function : public base_statement +{ + +private: + std::vector<base_statement*> arguments; + std::string name; + base_function* m_func_impl; + s3select_functions* m_s3select_functions; + variable m_result; + + void _resolve_name() + { + if (m_func_impl) + { + return; + } + + base_function* f = m_s3select_functions->create(name); + if (!f) + { + throw base_s3select_exception("function not found", base_s3select_exception::s3select_exp_en_t::FATAL); //should abort query + } + m_func_impl = f; + } + +public: + virtual void traverse_and_apply(scratch_area* sa, projection_alias* pa) + { + m_scratch = sa; + m_aliases = pa; + for (base_statement* ba : arguments) + { + ba->traverse_and_apply(sa, pa); + } + } + + virtual bool is_aggregate() // TODO under semantic flow + { + _resolve_name(); + + return m_func_impl->is_aggregate(); + } + + virtual bool semantic() + { + return true; + } + + __function(const char* fname, s3select_functions* s3f) : name(fname), m_func_impl(0), m_s3select_functions(s3f) {} + + virtual value& eval() + { + + _resolve_name(); + + if (is_last_call == false) + { + (*m_func_impl)(&arguments, &m_result); + } + else + { + (*m_func_impl).get_aggregate_result(&m_result); + } + + return m_result.get_value(); + } + + + + virtual std::string print(int ident) + { + return std::string(0); + } + + void push_argument(base_statement* arg) + { + arguments.push_back(arg); + } + + + std::vector<base_statement*> get_arguments() + { + return arguments; + } + + virtual ~__function() + { + arguments.clear(); + } +}; + + + +/* + s3-select function defintions +*/ +struct _fn_add : public base_function +{ + + value var_result; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + base_statement* x = *iter; + iter++; + base_statement* y = *iter; + + var_result = x->eval() + y->eval(); + + *result = var_result; + + return true; + } +}; + +struct _fn_sum : public base_function +{ + + value sum; + + _fn_sum() : sum(0) + { + aggregate = true; + } + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + base_statement* x = *iter; + + try + { + sum = sum + x->eval(); + } + catch (base_s3select_exception& e) + { + std::cout << "illegal value for aggregation(sum). skipping." << std::endl; + if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) + { + throw; + } + } + + return true; + } + + virtual void get_aggregate_result(variable* result) + { + *result = sum ; + } +}; + +struct _fn_count : public base_function +{ + + int64_t count; + + _fn_count():count(0) + { + aggregate=true; + } + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + count += 1; + + return true; + } + + virtual void get_aggregate_result(variable* result) + { + result->set_value(count); + } + +}; + +struct _fn_min : public base_function +{ + + value min; + + _fn_min():min(__INT64_MAX__) + { + aggregate=true; + } + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + base_statement* x = *iter; + + if(min > x->eval()) + { + min=x->eval(); + } + + return true; + } + + virtual void get_aggregate_result(variable* result) + { + *result = min; + } + +}; + +struct _fn_max : public base_function +{ + + value max; + + _fn_max():max(-__INT64_MAX__) + { + aggregate=true; + } + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + base_statement* x = *iter; + + if(max < x->eval()) + { + max=x->eval(); + } + + return true; + } + + virtual void get_aggregate_result(variable* result) + { + *result = max; + } + +}; + +struct _fn_to_int : public base_function +{ + + value var_result; + value func_arg; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + char* perr; + int64_t i=0; + func_arg = (*args->begin())->eval(); + + if (func_arg.type == value::value_En_t::STRING) + { + i = strtol(func_arg.str(), &perr, 10) ; //TODO check error before constructor + } + else if (func_arg.type == value::value_En_t::FLOAT) + { + i = func_arg.dbl(); + } + else + { + i = func_arg.i64(); + } + + var_result = i ; + *result = var_result; + + return true; + } + +}; + +struct _fn_to_float : public base_function +{ + + value var_result; + value v_from; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + char* perr; + double d=0; + value v = (*args->begin())->eval(); + + if (v.type == value::value_En_t::STRING) + { + d = strtod(v.str(), &perr) ; //TODO check error before constructor + } + else if (v.type == value::value_En_t::FLOAT) + { + d = v.dbl(); + } + else + { + d = v.i64(); + } + + var_result = d; + *result = var_result; + + return true; + } + +}; + +struct _fn_to_timestamp : public base_function +{ + bsc::rule<> separator = bsc::ch_p(":") | bsc::ch_p("-"); + + uint32_t yr = 1700, mo = 1, dy = 1; + bsc::rule<> dig4 = bsc::lexeme_d[bsc::digit_p >> bsc::digit_p >> bsc::digit_p >> bsc::digit_p]; + bsc::rule<> dig2 = bsc::lexeme_d[bsc::digit_p >> bsc::digit_p]; + + bsc::rule<> d_yyyymmdd_dig = ((dig4[BOOST_BIND_ACTION_PARAM(push_4dig, &yr)]) >> *(separator) + >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &mo)]) >> *(separator) + >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &dy)]) >> *(separator)); + + uint32_t hr = 0, mn = 0, sc = 0; + bsc::rule<> d_time_dig = ((dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &hr)]) >> *(separator) + >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &mn)]) >> *(separator) + >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &sc)]) >> *(separator)); + + boost::posix_time::ptime new_ptime; + + value v_str; + + + bool datetime_validation() + { + //TODO temporary , should check for leap year + + if(yr<1700 || yr>2050) + { + return false; + } + if (mo<1 || mo>12) + { + return false; + } + if (dy<1 || dy>31) + { + return false; + } + if (hr>23) + { + return false; + } + if (dy>59) + { + return false; + } + if (sc>59) + { + return false; + } + + return true; + } + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + + hr = 0; + mn = 0; + sc = 0; + + std::vector<base_statement*>::iterator iter = args->begin(); + int args_size = args->size(); + + if (args_size != 1) + { + throw base_s3select_exception("to_timestamp should have one parameter"); + } + + base_statement* str = *iter; + + v_str = str->eval(); + + if (v_str.type != value::value_En_t::STRING) + { + throw base_s3select_exception("to_timestamp first argument must be string"); //can skip current row + } + + bsc::parse_info<> info_dig = bsc::parse(v_str.str(), d_yyyymmdd_dig >> *(separator) >> d_time_dig); + + if(datetime_validation()==false or !info_dig.full) + { + throw base_s3select_exception("input date-time is illegal"); + } + + new_ptime = boost::posix_time::ptime(boost::gregorian::date(yr, mo, dy), + boost::posix_time::hours(hr) + boost::posix_time::minutes(mn) + boost::posix_time::seconds(sc)); + + result->set_value(&new_ptime); + + return true; + } + +}; + +struct _fn_extact_from_timestamp : public base_function +{ + + boost::posix_time::ptime new_ptime; + + value val_date_part; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + int args_size = args->size(); + + if (args_size < 2) + { + throw base_s3select_exception("to_timestamp should have 2 parameters"); + } + + base_statement* date_part = *iter; + + val_date_part = date_part->eval();//TODO could be done once? + + if(val_date_part.is_string()== false) + { + throw base_s3select_exception("first parameter should be string"); + } + + iter++; + + base_statement* ts = *iter; + + if(ts->eval().is_timestamp()== false) + { + throw base_s3select_exception("second parameter is not timestamp"); + } + + new_ptime = *ts->eval().timestamp(); + + if( strcmp(val_date_part.str(), "year")==0 ) + { + result->set_value( (int64_t)new_ptime.date().year() ); + } + else if( strcmp(val_date_part.str(), "month")==0 ) + { + result->set_value( (int64_t)new_ptime.date().month() ); + } + else if( strcmp(val_date_part.str(), "day")==0 ) + { + result->set_value( (int64_t)new_ptime.date().day_of_year() ); + } + else if( strcmp(val_date_part.str(), "week")==0 ) + { + result->set_value( (int64_t)new_ptime.date().week_number() ); + } + else + { + throw base_s3select_exception(std::string( val_date_part.str() + std::string(" is not supported ") ).c_str() ); + } + + return true; + } + +}; + +struct _fn_diff_timestamp : public base_function +{ + + value val_date_part; + value val_dt1; + value val_dt2; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + int args_size = args->size(); + + if (args_size < 3) + { + throw base_s3select_exception("datediff need 3 parameters"); + } + + base_statement* date_part = *iter; + + val_date_part = date_part->eval(); + + iter++; + base_statement* dt1_param = *iter; + val_dt1 = dt1_param->eval(); + if (val_dt1.is_timestamp() == false) + { + throw base_s3select_exception("second parameter should be timestamp"); + } + + iter++; + base_statement* dt2_param = *iter; + val_dt2 = dt2_param->eval(); + if (val_dt2.is_timestamp() == false) + { + throw base_s3select_exception("third parameter should be timestamp"); + } + + if (strcmp(val_date_part.str(), "year") == 0) + { + int64_t yr = val_dt2.timestamp()->date().year() - val_dt1.timestamp()->date().year() ; + result->set_value( yr ); + } + else if (strcmp(val_date_part.str(), "month") == 0) + { + int64_t yr = val_dt2.timestamp()->date().year() - val_dt1.timestamp()->date().year() ; + int64_t mon = val_dt2.timestamp()->date().month() - val_dt1.timestamp()->date().month() ; + + result->set_value( yr*12 + mon ); + } + else if (strcmp(val_date_part.str(), "day") == 0) + { + boost::gregorian::date_period dp = + boost::gregorian::date_period( val_dt1.timestamp()->date(), val_dt2.timestamp()->date()); + result->set_value( dp.length().days() ); + } + else if (strcmp(val_date_part.str(), "hours") == 0) + { + boost::posix_time::time_duration td_res = (*val_dt2.timestamp() - *val_dt1.timestamp()); + result->set_value( td_res.hours()); + } + else + { + throw base_s3select_exception("first parameter should be string: year,month,hours,day"); + } + + + return true; + } +}; + +struct _fn_add_to_timestamp : public base_function +{ + + boost::posix_time::ptime new_ptime; + + value val_date_part; + value val_quantity; + value val_timestamp; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + int args_size = args->size(); + + if (args_size < 3) + { + throw base_s3select_exception("add_to_timestamp should have 3 parameters"); + } + + base_statement* date_part = *iter; + val_date_part = date_part->eval();//TODO could be done once? + + if(val_date_part.is_string()== false) + { + throw base_s3select_exception("first parameter should be string"); + } + + iter++; + base_statement* quan = *iter; + val_quantity = quan->eval(); + + if (val_quantity.is_number() == false) + { + throw base_s3select_exception("second parameter should be number"); //TODO what about double? + } + + iter++; + base_statement* ts = *iter; + val_timestamp = ts->eval(); + + if(val_timestamp.is_timestamp() == false) + { + throw base_s3select_exception("third parameter should be time-stamp"); + } + + new_ptime = *val_timestamp.timestamp(); + + if( strcmp(val_date_part.str(), "year")==0 ) + { + new_ptime += boost::gregorian::years( val_quantity.i64() ); + result->set_value( &new_ptime ); + } + else if( strcmp(val_date_part.str(), "month")==0 ) + { + new_ptime += boost::gregorian::months( val_quantity.i64() ); + result->set_value( &new_ptime ); + } + else if( strcmp(val_date_part.str(), "day")==0 ) + { + new_ptime += boost::gregorian::days( val_quantity.i64() ); + result->set_value( &new_ptime ); + } + else + { + throw base_s3select_exception( std::string(val_date_part.str() + std::string(" is not supported for add")).c_str()); + } + + return true; + } + +}; + +struct _fn_utcnow : public base_function +{ + + boost::posix_time::ptime now_ptime; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + int args_size = args->size(); + + if (args_size != 0) + { + throw base_s3select_exception("utcnow does not expect any parameters"); + } + + now_ptime = boost::posix_time::ptime( boost::posix_time::second_clock::universal_time()); + result->set_value( &now_ptime ); + + return true; + } +}; + +struct _fn_substr : public base_function +{ + + char buff[4096];// this buffer is persist for the query life time, it use for the results per row(only for the specific function call) + //it prevent from intensive use of malloc/free (fragmentation). + //should validate result length. + //TODO may replace by std::string (dynamic) , or to replace with global allocator , in query scope. + value v_str; + value v_from; + value v_to; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + int args_size = args->size(); + + + if (args_size<2) + { + throw base_s3select_exception("substr accept 2 arguments or 3"); + } + + base_statement* str = *iter; + iter++; + base_statement* from = *iter; + base_statement* to; + + if (args_size == 3) + { + iter++; + to = *iter; + } + + v_str = str->eval(); + + if(v_str.type != value::value_En_t::STRING) + { + throw base_s3select_exception("substr first argument must be string"); //can skip current row + } + + int str_length = strlen(v_str.str()); + + v_from = from->eval(); + if(v_from.is_string()) + { + throw base_s3select_exception("substr second argument must be number"); //can skip current row + } + + int64_t f; + int64_t t; + + if (args_size==3) + { + v_to = to->eval(); + if (v_to.is_string()) + { + throw base_s3select_exception("substr third argument must be number"); //can skip row + } + } + + if (v_from.type == value::value_En_t::FLOAT) + { + f=v_from.dbl(); + } + else + { + f=v_from.i64(); + } + + if (f>str_length) + { + throw base_s3select_exception("substr start position is too far"); //can skip row + } + + if (str_length>(int)sizeof(buff)) + { + throw base_s3select_exception("string too long for internal buffer"); //can skip row + } + + if (args_size == 3) + { + if (v_from.type == value::value_En_t::FLOAT) + { + t = v_to.dbl(); + } + else + { + t = v_to.i64(); + } + + if( (str_length-(f-1)-t) <0) + { + throw base_s3select_exception("substr length parameter beyond bounderies"); //can skip row + } + + strncpy(buff, v_str.str()+f-1, t); + } + else + { + strcpy(buff, v_str.str()+f-1); + } + + result->set_value(buff); + + return true; + } + +}; + +base_function* s3select_functions::create(std::string fn_name) +{ + const FunctionLibrary::const_iterator iter = m_functions_library.find(fn_name); + + if (iter == m_functions_library.end()) + { + std::string msg; + msg = fn_name + " " + " function not found"; + throw base_s3select_exception(msg, base_s3select_exception::s3select_exp_en_t::FATAL); + } + + switch (iter->second) + { + case s3select_func_En_t::ADD: + return S3SELECT_NEW(_fn_add); + break; + + case s3select_func_En_t::SUM: + return S3SELECT_NEW(_fn_sum); + break; + + case s3select_func_En_t::COUNT: + return S3SELECT_NEW(_fn_count); + break; + + case s3select_func_En_t::MIN: + return S3SELECT_NEW(_fn_min); + break; + + case s3select_func_En_t::MAX: + return S3SELECT_NEW(_fn_max); + break; + + case s3select_func_En_t::TO_INT: + return S3SELECT_NEW(_fn_to_int); + break; + + case s3select_func_En_t::TO_FLOAT: + return S3SELECT_NEW(_fn_to_float); + break; + + case s3select_func_En_t::SUBSTR: + return S3SELECT_NEW(_fn_substr); + break; + + case s3select_func_En_t::TO_TIMESTAMP: + return S3SELECT_NEW(_fn_to_timestamp); + break; + + case s3select_func_En_t::EXTRACT: + return S3SELECT_NEW(_fn_extact_from_timestamp); + break; + + case s3select_func_En_t::DATE_ADD: + return S3SELECT_NEW(_fn_add_to_timestamp); + break; + + case s3select_func_En_t::DATE_DIFF: + return S3SELECT_NEW(_fn_diff_timestamp); + break; + + case s3select_func_En_t::UTCNOW: + return S3SELECT_NEW(_fn_utcnow); + break; + + default: + throw base_s3select_exception("internal error while resolving function-name"); + break; + } +} + +bool base_statement::is_function() +{ + if (dynamic_cast<__function*>(this)) + { + return true; + } + else + { + return false; + } +} + +bool base_statement::is_aggregate_exist_in_expression(base_statement* e) //TODO obsolete ? +{ + if (e->is_aggregate()) + { + return true; + } + + if (e->left() && e->left()->is_aggregate_exist_in_expression(e->left())) + { + return true; + } + + if (e->right() && e->right()->is_aggregate_exist_in_expression(e->right())) + { + return true; + } + + if (e->is_function()) + { + for (auto i : dynamic_cast<__function*>(e)->get_arguments()) + if (e->is_aggregate_exist_in_expression(i)) + { + return true; + } + } + + return false; +} + +base_statement* base_statement::get_aggregate() +{ + //search for aggregation function in AST + base_statement* res = 0; + + if (is_aggregate()) + { + return this; + } + + if (left() && (res=left()->get_aggregate())!=0) + { + return res; + } + + if (right() && (res=right()->get_aggregate())!=0) + { + return res; + } + + if (is_function()) + { + for (auto i : dynamic_cast<__function*>(this)->get_arguments()) + { + base_statement* b=i->get_aggregate(); + if (b) + { + return b; + } + } + } + return 0; +} + +bool base_statement::is_nested_aggregate(base_statement* e) +{ + //validate for non nested calls for aggregation function, i.e. sum ( min ( )) + if (e->is_aggregate()) + { + if (e->left()) + { + if (e->left()->is_aggregate_exist_in_expression(e->left())) + { + return true; + } + } + else if (e->right()) + { + if (e->right()->is_aggregate_exist_in_expression(e->right())) + { + return true; + } + } + else if (e->is_function()) + { + for (auto i : dynamic_cast<__function*>(e)->get_arguments()) + { + if (i->is_aggregate_exist_in_expression(i)) + { + return true; + } + } + } + return false; + } + return false; +} + +// select sum(c2) ... + c1 ... is not allowed. a binary operation with scalar is OK. i.e. select sum() + 1 +bool base_statement::is_binop_aggregate_and_column(base_statement* skip_expression) +{ + if (left() && left() != skip_expression) //can traverse to left + { + if (left()->is_column()) + { + return true; + } + else if (left()->is_binop_aggregate_and_column(skip_expression) == true) + { + return true; + } + } + + if (right() && right() != skip_expression) //can traverse right + { + if (right()->is_column()) + { + return true; + } + else if (right()->is_binop_aggregate_and_column(skip_expression) == true) + { + return true; + } + } + + if (this != skip_expression && is_function()) + { + + __function* f = (dynamic_cast<__function*>(this)); + std::vector<base_statement*> l = f->get_arguments(); + for (auto i : l) + { + if (i!=skip_expression && i->is_column()) + { + return true; + } + if (i->is_binop_aggregate_and_column(skip_expression) == true) + { + return true; + } + } + } + + return false; +} + +} //namespace s3selectEngine + +#endif |