diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/s3select | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/s3select')
-rw-r--r-- | src/s3select/.gitignore | 2 | ||||
-rw-r--r-- | src/s3select/CMakeLists.txt | 19 | ||||
-rw-r--r-- | src/s3select/README.md | 50 | ||||
-rw-r--r-- | src/s3select/example/CMakeLists.txt | 12 | ||||
-rwxr-xr-x | src/s3select/example/expr_genrator.py | 9 | ||||
-rw-r--r-- | src/s3select/example/generate_rand_csv.c | 28 | ||||
-rwxr-xr-x | src/s3select/example/parse_csv.py | 12 | ||||
-rwxr-xr-x | src/s3select/example/run_test.bash | 96 | ||||
-rw-r--r-- | src/s3select/example/s3select_example.cpp | 136 | ||||
-rw-r--r-- | src/s3select/include/s3select.h | 1138 | ||||
-rw-r--r-- | src/s3select/include/s3select_csv_parser.h | 407 | ||||
-rw-r--r-- | src/s3select/include/s3select_functions.h | 1037 | ||||
-rw-r--r-- | src/s3select/include/s3select_oper.h | 1320 | ||||
-rw-r--r-- | src/s3select/s3select-parse-s.png | bin | 0 -> 193829 bytes | |||
-rw-r--r-- | src/s3select/s3select.rst | 259 | ||||
-rw-r--r-- | src/s3select/test/CMakeLists.txt | 6 | ||||
-rw-r--r-- | src/s3select/test/s3select_test.cpp | 244 |
17 files changed, 4775 insertions, 0 deletions
diff --git a/src/s3select/.gitignore b/src/s3select/.gitignore new file mode 100644 index 000000000..d6536badc --- /dev/null +++ b/src/s3select/.gitignore @@ -0,0 +1,2 @@ +build +compile_commands.json diff --git a/src/s3select/CMakeLists.txt b/src/s3select/CMakeLists.txt new file mode 100644 index 000000000..99e0f16a7 --- /dev/null +++ b/src/s3select/CMakeLists.txt @@ -0,0 +1,19 @@ +cmake_minimum_required(VERSION 3.0) + +project(s3select) + +set(CMAKE_CXX_FLAGS "-std=gnu++17 -ggdb") +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +find_package(Boost REQUIRED) +find_package(GTest REQUIRED) + +enable_testing() + +add_subdirectory(example) +add_subdirectory(test) + +add_test(NAME run_my_test + COMMAND sh -c "../example/run_test.bash") + diff --git a/src/s3select/README.md b/src/s3select/README.md new file mode 100644 index 000000000..f1408dbdb --- /dev/null +++ b/src/s3select/README.md @@ -0,0 +1,50 @@ +# s3select +**The purpose of s3select engine** is to create an efficient pipe between user client to storage node (the engine should be as close as possible to storage, "moving computation into storage"). + +It enables the user to define the exact portion of data should received by his side. + +It also enables for higher level analytic-applications (such as SPARK-SQL) , using that feature to improve their latency and throughput. + +https://aws.amazon.com/blogs/aws/s3-glacier-select/ + +https://www.qubole.com/blog/amazon-s3-select-integration/ + +The engine is using boost::spirit to define the grammar , and by that building the AST (abstract-syntax-tree). upon statement is accepted by the grammar it create a tree of objects. + +The hierarchy(levels) of the different objects also define their role, i.e. function could be a finite expression, or an argument for an expression, or an argument for other functions, and so forth. + +Bellow is an example for “SQL” statement been parsed and transform into AST. +![alt text](/s3select-parse-s.png) + +The where-clause is boolean expression made of arithmetic expression building blocks. + +Projection is a list of arithmetic expressions + +I created a container (**sudo docker run -it galsl/boost:latest /bin/bash/**) built with boost libraries , for building and running the s3select demo application. + +**The demo can run on CSV files only, as follow. (folder s3select_demo)** +* bash> s3select -q ‘select _1 +_2,_5 * 3 from /...some..full-path/csv.txt where _1 > _2;’ + +* bash> cat /...some..full-path/csv.txt | s3select -q ‘select _1,_5 from stdin where _1 > _2;’ + +* bash> cat /...some..full-path/csv.txt | s3select -q ‘select c1,c5 from stdin where c1 > c2;’ -s ‘c1,c2,c3,c4,c5’ + +* bash> cat /...some..full-path/csv.txt | s3select -q 'select min(int(substr(_1,1,1))) from stdin where substr(_1,1,1) == substr(_2,1,1);' + +-s flag is defining a schema (no type only names) , without schema each column can be accessed with _N (_1 is the first column). + +-q flag is for the query. + +the engine supporting the following arithmetical operations +,-,*,/,^ , ( ) , and also the logical operators and,or. + +s3select is supporting float,decimal,string; it also supports aggregation functions such as max,min,sum,count; the input stream is accepted as string attributes, to operate arithmetical operation it need to CAST, i.e. int(_1) is converting text to integer. + +The demo-app is producing CSV format , thus it can be piped into another s3select statement. + +there is a small app /generate_rand_csv {number-of-rows} {number-of-columns}/ , which generate CSV rows containing only numbers. + +the random numbers are produced with same seed number. + +since it works with STDIN , it possible to concatenate several files into single stream. + +cat file1 file2 file1 file2 | s3select -q ‘ ….. ‘ diff --git a/src/s3select/example/CMakeLists.txt b/src/s3select/example/CMakeLists.txt new file mode 100644 index 000000000..37f99f0b3 --- /dev/null +++ b/src/s3select/example/CMakeLists.txt @@ -0,0 +1,12 @@ +add_executable(s3select_example s3select_example.cpp) +target_include_directories(s3select_example PUBLIC ../include) +target_link_libraries(s3select_example boost_date_time) + +add_executable(generate_rand_csv generate_rand_csv.c) + +add_custom_command(OUTPUT expr_genrator.py COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/expr_genrator.py expr_genrator.py + COMMENT "Copy expr_genrator.py" + VERBATIM) + +add_custom_target(expr_generator ALL DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/expr_genrator.py) + diff --git a/src/s3select/example/expr_genrator.py b/src/s3select/example/expr_genrator.py new file mode 100755 index 000000000..0d21fcee6 --- /dev/null +++ b/src/s3select/example/expr_genrator.py @@ -0,0 +1,9 @@ +import random +import sys + +def expr(depth): + if depth==1 or random.random()<1.0/(2**depth-1): + return str(int(random.random() * 100) + 1)+".0" + return '(' + expr(depth-1) + random.choice(['+','-','*','/']) + expr(depth-1) + ')' + +print expr( int(sys.argv[1]) ) diff --git a/src/s3select/example/generate_rand_csv.c b/src/s3select/example/generate_rand_csv.c new file mode 100644 index 000000000..67d52adaa --- /dev/null +++ b/src/s3select/example/generate_rand_csv.c @@ -0,0 +1,28 @@ +#include <stdio.h> +#include <stdlib.h> + + +int main(int argc, char** argv) +{ + if (argc<3) + { + printf("%s <num-of-rows> <num-of-columns> \n", argv[0]); + return -1; + } + + srand(1234); + int line_no=0; + for(int i=0; i<atoi(argv[1]); i++) + { + printf("%d,", i); + for(int y=0; y<atoi(argv[2]); y++) + { + printf("%d,", rand()%1000); + } + printf("\n"); + } + + + + +} diff --git a/src/s3select/example/parse_csv.py b/src/s3select/example/parse_csv.py new file mode 100755 index 000000000..23fe14add --- /dev/null +++ b/src/s3select/example/parse_csv.py @@ -0,0 +1,12 @@ +#!/usr/bin/python + +import csv + +with open('stam.txt') as csv_file: + csv_reader = csv.reader(csv_file, delimiter=',') + line_count = 0 + for row in csv_reader: + #if (int(row[0])==465 and int(row[5])==268): # casting is slower + if (row[0]=="465" and row[5]=="268"): + print row + diff --git a/src/s3select/example/run_test.bash b/src/s3select/example/run_test.bash new file mode 100755 index 000000000..56654d7ab --- /dev/null +++ b/src/s3select/example/run_test.bash @@ -0,0 +1,96 @@ +#!/bin/bash + +set -e + +PREFIX=${1:-"./example"} + +## purpose : sanity tests + +s3select_calc() +{ +l="$*" +res=$( echo 1 | "$PREFIX"/s3select_example -q "select ${l} from stdin;" ) +echo "$res" | sed 's/.$//' +} + +# create c file with expression , compile it and run it. +c_calc() +{ +cat << @@ > "$PREFIX"/tmp.c + +#include <stdio.h> +int main() +{ +printf("%f\n",$*); +} +@@ +gcc -o "$PREFIX"/a.out "$PREFIX"/tmp.c +"$PREFIX"/a.out +} + +expr_test() +{ +## test the arithmetic evaluation of s3select against C program +for i in {1..100} +do + e=$(python2 "$PREFIX"/expr_genrator.py 5) + echo expression["$i"]="$e" + r1=$(s3select_calc "$e") + r2=$(c_calc "$e") + echo "$r1" "$r2" + + ## should be zero or very close to zero; ( s3select is C compile program ) + res=$(echo "" | awk -v e="$e" -v r1="$r1" -v r2="$r2" 'function abs(n){if (n<0) return -n; else return n;}{if (abs(r1-r2) > 0.00001) {print "MISSMATCH result for expression",e;}}') + if test "$res" != ""; then + echo "$res" + exit 1 + fi +done +} + +aggregate_test() +{ +## generate_rand_csv is generating with the same seed +echo check sum +s3select_val=$("$PREFIX"/generate_rand_csv 10 10 | "$PREFIX"/s3select_example -q 'select sum(int(_1)) from stdin;') +awk_val=$("$PREFIX"/generate_rand_csv 10 10 | awk 'BEGIN{FS=",";} {s+=$1;} END{print s;}') +s3select_val=${s3select_val::-1} +echo "$s3select_val" "$awk_val" +if test "$s3select_val" -ne "$awk_val"; then + exit 1 +fi +echo check min +s3select_val=$("$PREFIX"/generate_rand_csv 10 10 | "$PREFIX"/s3select_example -q 'select min(int(_1)) from stdin;') +awk_val=$("$PREFIX"/generate_rand_csv 10 10 | awk 'BEGIN{FS=",";min=100000;} {if(min>$1) min=$1;} END{print min;}') +s3select_val=${s3select_val::-1} +echo "$s3select_val" "$awk_val" +if test "$s3select_val" -ne "$awk_val"; then + exit 1 +fi +echo check max +s3select_val=$("$PREFIX"/generate_rand_csv 10 10 | "$PREFIX"/s3select_example -q 'select max(int(_1)) from stdin;') +awk_val=$("$PREFIX"/generate_rand_csv 10 10 | awk 'BEGIN{FS=",";max=0;} {if(max<$1) max=$1;} END{print max;}' ) +s3select_val=${s3select_val::-1} +echo "$s3select_val" "$awk_val" +if test "$s3select_val" -ne "$awk_val"; then + exit 1 +fi +echo check substr and count +s3select_val=$("$PREFIX"/generate_rand_csv 10000 10 | "$PREFIX"/s3select_example -q 'select count(int(_1)) from stdin where int(_1)>200 and int(_1)<250;') +awk_val=$("$PREFIX"/generate_rand_csv 10000 10 | "$PREFIX"/s3select_example -q 'select substr(_1,1,1) from stdin where int(_1)>200 and int(_1)<250;' | uniq -c | awk '{print $1;}') +s3select_val=${s3select_val::-1} +echo "$s3select_val" "$awk_val" +if test "$s3select_val" -ne "$awk_val"; then + exit 1 +fi +} + +############################################################### + +expr_test +aggregate_test + +rm "$PREFIX"/tmp.c "$PREFIX"/a.out + +exit 0 + diff --git a/src/s3select/example/s3select_example.cpp b/src/s3select/example/s3select_example.cpp new file mode 100644 index 000000000..840b62c6a --- /dev/null +++ b/src/s3select/example/s3select_example.cpp @@ -0,0 +1,136 @@ +#include "s3select.h" +#include <fstream> +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> + +using namespace s3selectEngine; +using namespace BOOST_SPIRIT_CLASSIC_NS; + +int cli_get_schema(const char* input_schema, actionQ& x) +{ + g_push_column.set_action_q(&x); + + rule<> column_name_rule = lexeme_d[(+alpha_p >> *digit_p)]; + + //TODO an issue to resolve with trailing space + parse_info<> info = parse(input_schema, ((column_name_rule)[BOOST_BIND_ACTION(push_column)] >> *(',' >> (column_name_rule)[BOOST_BIND_ACTION(push_column)])), space_p); + + if (!info.full) + { + std::cout << "failure in schema description " << input_schema << std::endl; + return -1; + } + + return 0; +} + +int main(int argc, char** argv) +{ + + //purpose: demostrate the s3select functionalities + s3select s3select_syntax; + + char* input_query = 0; + + for (int i = 0; i < argc; i++) + { + + if (!strcmp(argv[i], "-q")) + { + input_query = argv[i + 1]; + } + } + + + if (!input_query) + { + std::cout << "type -q 'select ... from ... '" << std::endl; + return -1; + } + + + bool to_aggregate = false; + + int status = s3select_syntax.parse_query(input_query); + if (status != 0) + { + std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl; + return -1; + } + + std::string object_name = s3select_syntax.get_from_clause(); //TODO stdin + + FILE* fp; + + if (object_name.compare("stdin")==0) + { + fp = stdin; + } + else + { + fp = fopen(object_name.c_str(), "r"); + } + + + if(!fp) + { + std::cout << " input stream is not valid, abort;" << std::endl; + return -1; + } + + struct stat statbuf; + + lstat(object_name.c_str(), &statbuf); + + std::string s3select_result; + s3selectEngine::csv_object::csv_defintions csv; + csv.use_header_info = false; + //csv.column_delimiter='|'; + //csv.row_delimiter='\t'; + + + s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv); + //s3selectEngine::csv_object s3_csv_object(&s3select_syntax); + +#define BUFF_SIZE 1024*1024*4 + char* buff = (char*)malloc( BUFF_SIZE ); + while(1) + { + //char buff[4096]; + + //char * in = fgets(buff,sizeof(buff),fp); + size_t input_sz = fread(buff, 1, BUFF_SIZE, fp); + char* in=buff; + //input_sz = strlen(buff); + //size_t input_sz = in == 0 ? 0 : strlen(in); + + //if (!input_sz) to_aggregate = true; + + + //int status = s3_csv_object.run_s3select_on_object(s3select_result,in,input_sz,false,false,to_aggregate); + int status = s3_csv_object.run_s3select_on_stream(s3select_result, in, input_sz, statbuf.st_size); + if(status<0) + { + std::cout << "failure on execution " << std::endl; + break; + } + + if(s3select_result.size()>1) + { + std::cout << s3select_result; + } + + s3select_result = ""; + if(!input_sz || feof(fp)) + { + break; + } + + } + + free(buff); + fclose(fp); + + +} diff --git a/src/s3select/include/s3select.h b/src/s3select/include/s3select.h new file mode 100644 index 000000000..77af8c2d9 --- /dev/null +++ b/src/s3select/include/s3select.h @@ -0,0 +1,1138 @@ +#ifndef __S3SELECT__ +#define __S3SELECT__ + +#include <boost/spirit/include/classic_core.hpp> +#include <boost/algorithm/string.hpp> +#include <iostream> +#include <string> +#include <list> +#include "s3select_oper.h" +#include "s3select_functions.h" +#include "s3select_csv_parser.h" +#include <boost/function.hpp> +#include <boost/bind.hpp> +#include <functional> + + +#define _DEBUG_TERM {string token(a,b);std::cout << __FUNCTION__ << token << std::endl;} + + +namespace s3selectEngine +{ + +/// AST builder + +class s3select_projections +{ + +private: + std::vector<base_statement*> m_projections; + +public: + bool is_aggregate() + { + //TODO iterate on projections , and search for aggregate + //for(auto p : m_projections){} + + return false; + } + + bool semantic() + { + //TODO check aggragtion function are not nested + return false; + } + + std::vector<base_statement*>* get() + { + return &m_projections; + } + +}; + +struct actionQ +{ +// upon parser is accepting a token (lets say some number), +// it push it into dedicated queue, later those tokens are poped out to build some "higher" contruct (lets say 1 + 2) +// those containers are used only for parsing phase and not for runtime. + + std::vector<mulldiv_operation::muldiv_t> muldivQ; + std::vector<addsub_operation::addsub_op_t> addsubQ; + std::vector<arithmetic_operand::cmp_t> arithmetic_compareQ; + std::vector<logical_operand::oplog_t> logical_compareQ; + std::vector<base_statement*> exprQ; + std::vector<base_statement*> funcQ; + std::vector<base_statement*> condQ; + projection_alias alias_map; + std::string from_clause; + std::vector<std::string> schema_columns; + s3select_projections projections; + +}; + +class base_action : public __clt_allocator +{ + +public: + base_action() : m_action(0), m_s3select_functions(0) {} + actionQ* m_action; + void set_action_q(actionQ* a) + { + m_action = a; + } + void set_s3select_functions(s3select_functions* s3f) + { + m_s3select_functions = s3f; + } + s3select_functions* m_s3select_functions; +}; + +struct push_from_clause : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + m_action->from_clause = token; + } + +}; +static push_from_clause g_push_from_clause; + +struct push_number : public base_action //TODO use define for defintion of actions +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + variable* v = S3SELECT_NEW( variable, atoi(token.c_str())); + + + m_action->exprQ.push_back(v); + } + +}; +static push_number g_push_number; + +struct push_float_number : public base_action //TODO use define for defintion of actions +{ + + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + //the parser for float(real_p) is accepting also integers, thus "blocking" integer acceptence and all are float. + bsc::parse_info<> info = bsc::parse(token.c_str(), bsc::int_p, bsc::space_p); + + if (!info.full) + { + char* perr; + double d = strtod(token.c_str(), &perr); + variable* v = S3SELECT_NEW( variable, d); + + m_action->exprQ.push_back(v); + } + else + { + variable* v = S3SELECT_NEW(variable, atoi(token.c_str())); + + m_action->exprQ.push_back(v); + } + } + +}; +static push_float_number g_push_float_number; + +struct push_string : public base_action //TODO use define for defintion of actions +{ + + void operator()(const char* a, const char* b) const + { + a++; + b--;// remove double quotes + std::string token(a, b); + + variable* v = S3SELECT_NEW(variable, token, variable::var_t::COL_VALUE ); + + m_action->exprQ.push_back(v); + } + +}; +static push_string g_push_string; + +struct push_variable : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + variable* v = S3SELECT_NEW(variable, token); + + m_action->exprQ.push_back(v); + } +}; +static push_variable g_push_variable; + +/////////////////////////arithmetic unit ///////////////// +struct push_addsub : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + if (token.compare("+") == 0) + { + m_action->addsubQ.push_back(addsub_operation::addsub_op_t::ADD); + } + else + { + m_action->addsubQ.push_back(addsub_operation::addsub_op_t::SUB); + } + } +}; +static push_addsub g_push_addsub; + +struct push_mulop : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + if (token.compare("*") == 0) + { + m_action->muldivQ.push_back(mulldiv_operation::muldiv_t::MULL); + } + else if (token.compare("/") == 0) + { + m_action->muldivQ.push_back(mulldiv_operation::muldiv_t::DIV); + } + else + { + m_action->muldivQ.push_back(mulldiv_operation::muldiv_t::POW); + } + } +}; +static push_mulop g_push_mulop; + +struct push_addsub_binop : public base_action +{ + void operator()(const char* a, const char* b) const + { + base_statement* l = 0, *r = 0; + + r = m_action->exprQ.back(); + m_action->exprQ.pop_back(); + l = m_action->exprQ.back(); + m_action->exprQ.pop_back(); + addsub_operation::addsub_op_t o = m_action->addsubQ.back(); + m_action->addsubQ.pop_back(); + addsub_operation* as = S3SELECT_NEW(addsub_operation, l, o, r); + m_action->exprQ.push_back(as); + } +}; +static push_addsub_binop g_push_addsub_binop; + +struct push_mulldiv_binop : public base_action +{ + void operator()(const char* a, const char* b) const + { + base_statement* vl = 0, *vr = 0; + + vr = m_action->exprQ.back(); + m_action->exprQ.pop_back(); + vl = m_action->exprQ.back(); + m_action->exprQ.pop_back(); + mulldiv_operation::muldiv_t o = m_action->muldivQ.back(); + m_action->muldivQ.pop_back(); + mulldiv_operation* f = S3SELECT_NEW(mulldiv_operation, vl, o, vr); + m_action->exprQ.push_back(f); + } +}; +static push_mulldiv_binop g_push_mulldiv_binop; + +struct push_function_arg : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + base_statement* be = m_action->exprQ.back(); + m_action->exprQ.pop_back(); + base_statement* f = m_action->funcQ.back(); + + if (dynamic_cast<__function*>(f)) + { + dynamic_cast<__function*>(f)->push_argument(be); + } + } +}; +static push_function_arg g_push_function_arg; + +struct push_function_name : public base_action +{ + void operator()(const char* a, const char* b) const + { + b--; + while(*b=='(' || *b == ' ') + { + b--; //point to function-name + } + + std::string fn; + fn.assign(a, b-a+1); + + __function* func = S3SELECT_NEW(__function, fn.c_str(), m_s3select_functions); + m_action->funcQ.push_back(func); + } +}; +static push_function_name g_push_function_name; + +struct push_function_expr : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + base_statement* func = m_action->funcQ.back(); + m_action->funcQ.pop_back(); + + m_action->exprQ.push_back(func); + } +}; +static push_function_expr g_push_function_expr; + +////////////////////// logical unit //////////////////////// + +struct push_compare_operator : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + arithmetic_operand::cmp_t c = arithmetic_operand::cmp_t::NA; + + if (token.compare("==") == 0) + { + c = arithmetic_operand::cmp_t::EQ; + } + else if (token.compare("!=") == 0) + { + c = arithmetic_operand::cmp_t::NE; + } + else if (token.compare(">=") == 0) + { + c = arithmetic_operand::cmp_t::GE; + } + else if (token.compare("<=") == 0) + { + c = arithmetic_operand::cmp_t::LE; + } + else if (token.compare(">") == 0) + { + c = arithmetic_operand::cmp_t::GT; + } + else if (token.compare("<") == 0) + { + c = arithmetic_operand::cmp_t::LT; + } + else + { + c = arithmetic_operand::cmp_t::NA; + } + + m_action->arithmetic_compareQ.push_back(c); + } +}; +static push_compare_operator g_push_compare_operator; + +struct push_logical_operator : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + logical_operand::oplog_t l = logical_operand::oplog_t::NA; + + if (token.compare("and") == 0) + { + l = logical_operand::oplog_t::AND; + } + else if (token.compare("or") == 0) + { + l = logical_operand::oplog_t::OR; + } + else + { + l = logical_operand::oplog_t::NA; + } + + m_action->logical_compareQ.push_back(l); + + } +}; +static push_logical_operator g_push_logical_operator; + +struct push_arithmetic_predicate : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + base_statement* vr, *vl; + arithmetic_operand::cmp_t c = m_action->arithmetic_compareQ.back(); + m_action->arithmetic_compareQ.pop_back(); + vr = m_action->exprQ.back(); + m_action->exprQ.pop_back(); + vl = m_action->exprQ.back(); + m_action->exprQ.pop_back(); + + arithmetic_operand* t = S3SELECT_NEW(arithmetic_operand, vl, c, vr); + + m_action->condQ.push_back(t); + } +}; +static push_arithmetic_predicate g_push_arithmetic_predicate; + +struct push_logical_predicate : public base_action +{ + + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + base_statement* tl = 0, *tr = 0; + logical_operand::oplog_t oplog = m_action->logical_compareQ.back(); + m_action->logical_compareQ.pop_back(); + + if (m_action->condQ.empty() == false) + { + tr = m_action->condQ.back(); + m_action->condQ.pop_back(); + } + if (m_action->condQ.empty() == false) + { + tl = m_action->condQ.back(); + m_action->condQ.pop_back(); + } + + logical_operand* f = S3SELECT_NEW(logical_operand, tl, oplog, tr); + + m_action->condQ.push_back(f); + } +}; +static push_logical_predicate g_push_logical_predicate; + +struct push_column_pos : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + variable* v; + + if (token.compare("*") == 0 || token.compare("* ")==0) //TODO space should skip in boost::spirit + { + v = S3SELECT_NEW(variable, token, variable::var_t::STAR_OPERATION); + } + else + { + v = S3SELECT_NEW(variable, token, variable::var_t::POS); + } + + m_action->exprQ.push_back(v); + } + +}; +static push_column_pos g_push_column_pos; + +struct push_projection : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + m_action->projections.get()->push_back( m_action->exprQ.back() ); + m_action->exprQ.pop_back(); + } + +}; +static push_projection g_push_projection; + +struct push_alias_projection : public base_action +{ + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + //extract alias name + const char* p=b; + while(*(--p) != ' '); + std::string alias_name(p+1, b); + base_statement* bs = m_action->exprQ.back(); + + //mapping alias name to base-statement + bool res = m_action->alias_map.insert_new_entry(alias_name, bs); + if (res==false) + { + throw base_s3select_exception(std::string("alias <")+alias_name+std::string("> is already been used in query"), base_s3select_exception::s3select_exp_en_t::FATAL); + } + + + m_action->projections.get()->push_back( bs ); + m_action->exprQ.pop_back(); + } + +}; +static push_alias_projection g_push_alias_projection; + +/// for the schema description "mini-parser" +struct push_column : public base_action +{ + + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + + m_action->schema_columns.push_back(token); + } + +}; +static push_column g_push_column; + +struct push_debug_1 : public base_action +{ + + void operator()(const char* a, const char* b) const + { + std::string token(a, b); + } + +}; +static push_debug_1 g_push_debug_1; + +struct s3select : public bsc::grammar<s3select> +{ +private: + + actionQ m_actionQ; + + scratch_area m_sca; + + s3select_functions m_s3select_functions; + + std::string error_description; + + s3select_allocator m_s3select_allocator; + + bool aggr_flow; + +#define BOOST_BIND_ACTION( push_name ) boost::bind( &push_name::operator(), g_ ## push_name , _1 ,_2) +#define ATTACH_ACTION_Q( push_name ) {(g_ ## push_name).set_action_q(&m_actionQ); (g_ ## push_name).set_s3select_functions(&m_s3select_functions); (g_ ## push_name).set(&m_s3select_allocator);} + +public: + + + int semantic() + { + for (auto e : get_projections_list()) + { + base_statement* aggr = 0; + + if ((aggr = e->get_aggregate()) != 0) + { + if (aggr->is_nested_aggregate(aggr)) + { + error_description = "nested aggregation function is illegal i.e. sum(...sum ...)"; + throw base_s3select_exception(error_description, base_s3select_exception::s3select_exp_en_t::FATAL); + } + + aggr_flow = true; + } + } + + if (aggr_flow == true) + for (auto e : get_projections_list()) + { + base_statement* skip_expr = e->get_aggregate(); + + if (e->is_binop_aggregate_and_column(skip_expr)) + { + error_description = "illegal expression. /select sum(c1) + c1 ..../ is not allow type of query"; + throw base_s3select_exception(error_description, base_s3select_exception::s3select_exp_en_t::FATAL); + } + } + + return 0; + } + + int parse_query(const char* input_query) + { + if(get_projections_list().empty() == false) + { + return 0; //already parsed + } + + try + { + bsc::parse_info<> info = bsc::parse(input_query, *this, bsc::space_p); + auto query_parse_position = info.stop; + + if (!info.full) + { + std::cout << "failure -->" << query_parse_position << "<---" << std::endl; + error_description = std::string("failure -->") + query_parse_position + std::string("<---"); + return -1; + } + + semantic(); + } + catch (base_s3select_exception& e) + { + std::cout << e.what() << std::endl; + error_description.assign(e.what()); + if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution + { + return -1; + } + } + + return 0; + } + + std::string get_error_description() + { + return error_description; + } + + s3select() + { + //TODO check option for defining action and push into list + + ATTACH_ACTION_Q(push_from_clause); + ATTACH_ACTION_Q(push_number); + ATTACH_ACTION_Q(push_logical_operator); + ATTACH_ACTION_Q(push_logical_predicate); + ATTACH_ACTION_Q(push_compare_operator); + ATTACH_ACTION_Q(push_arithmetic_predicate); + ATTACH_ACTION_Q(push_addsub); + ATTACH_ACTION_Q(push_addsub_binop); + ATTACH_ACTION_Q(push_mulop); + ATTACH_ACTION_Q(push_mulldiv_binop); + ATTACH_ACTION_Q(push_function_arg); + ATTACH_ACTION_Q(push_function_name); + ATTACH_ACTION_Q(push_function_expr); + ATTACH_ACTION_Q(push_float_number); + ATTACH_ACTION_Q(push_string); + ATTACH_ACTION_Q(push_variable); + ATTACH_ACTION_Q(push_column_pos); + ATTACH_ACTION_Q(push_projection); + ATTACH_ACTION_Q(push_alias_projection); + ATTACH_ACTION_Q(push_debug_1); + + error_description.clear(); + + m_s3select_functions.set(&m_s3select_allocator); + + aggr_flow = false; + } + + bool is_semantic()//TBD traverse and validate semantics per all nodes + { + base_statement* cond = m_actionQ.exprQ.back(); + + return cond->semantic(); + } + + std::string get_from_clause() + { + return m_actionQ.from_clause; + } + + void load_schema(std::vector< std::string>& scm) + { + int i = 0; + for (auto c : scm) + { + m_sca.set_column_pos(c.c_str(), i++); + } + } + + base_statement* get_filter() + { + if(m_actionQ.condQ.size()==0) + { + return NULL; + } + + return m_actionQ.condQ.back(); + } + + std::vector<base_statement*> get_projections_list() + { + return *m_actionQ.projections.get(); //TODO return COPY(?) or to return evalaution results (list of class value{}) / return reference(?) + } + + scratch_area* get_scratch_area() + { + return &m_sca; + } + + projection_alias* get_aliases() + { + return &m_actionQ.alias_map; + } + + bool is_aggregate_query() + { + return aggr_flow == true; + } + + ~s3select() {} + + + template <typename ScannerT> + struct definition + { + definition(s3select const& ) + { + ///// s3select syntax rules and actions for building AST + + select_expr = bsc::str_p("select") >> projections >> bsc::str_p("from") >> (s3_object)[BOOST_BIND_ACTION(push_from_clause)] >> !where_clause >> ';'; + + projections = projection_expression >> *( ',' >> projection_expression) ; + + projection_expression = (arithmetic_expression >> bsc::str_p("as") >> alias_name)[BOOST_BIND_ACTION(push_alias_projection)] | (arithmetic_expression)[BOOST_BIND_ACTION(push_projection)] ; + + alias_name = bsc::lexeme_d[(+bsc::alpha_p >> *bsc::digit_p)] ; + + + s3_object = bsc::str_p("stdin") | object_path ; + + object_path = "/" >> *( fs_type >> "/") >> fs_type; + + fs_type = bsc::lexeme_d[+( bsc::alnum_p | bsc::str_p(".") | bsc::str_p("_")) ]; + + where_clause = bsc::str_p("where") >> condition_expression; + + condition_expression = (arithmetic_predicate >> *(log_op[BOOST_BIND_ACTION(push_logical_operator)] >> arithmetic_predicate[BOOST_BIND_ACTION(push_logical_predicate)])); + + arithmetic_predicate = (factor >> *(arith_cmp[BOOST_BIND_ACTION(push_compare_operator)] >> factor[BOOST_BIND_ACTION(push_arithmetic_predicate)])); + + factor = (arithmetic_expression) | ('(' >> condition_expression >> ')') ; + + arithmetic_expression = (addsub_operand >> *(addsubop_operator[BOOST_BIND_ACTION(push_addsub)] >> addsub_operand[BOOST_BIND_ACTION(push_addsub_binop)] )); + + addsub_operand = (mulldiv_operand >> *(muldiv_operator[BOOST_BIND_ACTION(push_mulop)] >> mulldiv_operand[BOOST_BIND_ACTION(push_mulldiv_binop)] ));// this non-terminal gives precedense to mull/div + + mulldiv_operand = arithmetic_argument | ('(' >> (arithmetic_expression) >> ')') ; + + list_of_function_arguments = (arithmetic_expression)[BOOST_BIND_ACTION(push_function_arg)] >> *(',' >> (arithmetic_expression)[BOOST_BIND_ACTION(push_function_arg)]); + function = ((variable >> '(' )[BOOST_BIND_ACTION(push_function_name)] >> !list_of_function_arguments >> ')')[BOOST_BIND_ACTION(push_function_expr)]; + + arithmetic_argument = (float_number)[BOOST_BIND_ACTION(push_float_number)] | (number)[BOOST_BIND_ACTION(push_number)] | (column_pos)[BOOST_BIND_ACTION(push_column_pos)] | + (string)[BOOST_BIND_ACTION(push_string)] | + (function)[BOOST_BIND_ACTION(push_debug_1)] | (variable)[BOOST_BIND_ACTION(push_variable)] ;//function is pushed by right-term + + + number = bsc::int_p; + + float_number = bsc::real_p; + + string = bsc::str_p("\"") >> *( bsc::anychar_p - bsc::str_p("\"") ) >> bsc::str_p("\"") ; + + column_pos = ('_'>>+(bsc::digit_p) ) | '*' ; + + muldiv_operator = bsc::str_p("*") | bsc::str_p("/") | bsc::str_p("^");// got precedense + + addsubop_operator = bsc::str_p("+") | bsc::str_p("-"); + + + arith_cmp = bsc::str_p(">=") | bsc::str_p("<=") | bsc::str_p("==") | bsc::str_p("<") | bsc::str_p(">") | bsc::str_p("!="); + + log_op = bsc::str_p("and") | bsc::str_p("or"); //TODO add NOT (unary) + + variable = bsc::lexeme_d[(+bsc::alpha_p >> *bsc::digit_p)]; + } + + + bsc::rule<ScannerT> variable, select_expr, s3_object, where_clause, number, float_number, string, arith_cmp, log_op, condition_expression, arithmetic_predicate, factor; + bsc::rule<ScannerT> muldiv_operator, addsubop_operator, function, arithmetic_expression, addsub_operand, list_of_function_arguments, arithmetic_argument, mulldiv_operand; + bsc::rule<ScannerT> fs_type, object_path; + bsc::rule<ScannerT> projections, projection_expression, alias_name, column_pos; + bsc::rule<ScannerT> const& start() const + { + return select_expr; + } + }; +}; + +/////// handling different object types +class base_s3object +{ + +protected: + scratch_area* m_sa; + std::string m_obj_name; + +public: + base_s3object(scratch_area* m) : m_sa(m), m_obj_name("") {} + + void set(scratch_area* m) + { + m_sa = m; + m_obj_name = ""; + } + + virtual ~base_s3object() {} +}; + + +class csv_object : public base_s3object +{ + +public: + struct csv_defintions + { + char row_delimiter; + char column_delimiter; + char escape_char; + char quot_char; + bool use_header_info; + bool ignore_header_info;//skip first line + + csv_defintions():row_delimiter('\n'), column_delimiter(','), escape_char('\\'), quot_char('"'), use_header_info(false), ignore_header_info(false) {} + + } m_csv_defintion; + + csv_object(s3select* s3_query) : + base_s3object(s3_query->get_scratch_area()), + m_skip_last_line(false), + m_s3_select(0), + m_error_count(0), + m_extract_csv_header_info(false), + m_previous_line(false), + m_skip_first_line(false), + m_processed_bytes(0) + { + set(s3_query); + csv_parser.set(m_csv_defintion.row_delimiter, m_csv_defintion.column_delimiter, m_csv_defintion.quot_char, m_csv_defintion.escape_char); + } + + csv_object(s3select* s3_query, struct csv_defintions csv) : + base_s3object(s3_query->get_scratch_area()), + m_skip_last_line(false), + m_s3_select(0), + m_error_count(0), + m_extract_csv_header_info(false), + m_previous_line(false), + m_skip_first_line(false), + m_processed_bytes(0) + { + set(s3_query); + m_csv_defintion = csv; + csv_parser.set(m_csv_defintion.row_delimiter, m_csv_defintion.column_delimiter, m_csv_defintion.quot_char, m_csv_defintion.escape_char); + } + + csv_object(): + base_s3object(0), + m_skip_last_line(false), + m_s3_select(0), + m_error_count(0), + m_extract_csv_header_info(false), + m_previous_line(false), + m_skip_first_line(false), + m_processed_bytes(0) + { + csv_parser.set(m_csv_defintion.row_delimiter, m_csv_defintion.column_delimiter, m_csv_defintion.quot_char, m_csv_defintion.escape_char); + } + +private: + base_statement* m_where_clause; + std::vector<base_statement*> m_projections; + bool m_aggr_flow = false; //TODO once per query + bool m_is_to_aggregate; + bool m_skip_last_line; + size_t m_stream_length; + std::string m_error_description; + char* m_stream; + char* m_end_stream; + std::vector<char*> m_row_tokens{128}; + s3select* m_s3_select; + csvParser csv_parser; + size_t m_error_count; + bool m_extract_csv_header_info; + std::vector<std::string> m_csv_schema{128}; + + //handling arbitrary chunks (rows cut in the middle) + bool m_previous_line; + bool m_skip_first_line; + std::string merge_line; + std::string m_last_line; + size_t m_processed_bytes; + + int getNextRow() + { + size_t num_of_tokens=0; + + if(m_stream>=m_end_stream) + { + return -1; + } + + if(csv_parser.parse(m_stream, m_end_stream, &m_row_tokens, &num_of_tokens)<0) + { + throw base_s3select_exception("failed to parse csv stream", base_s3select_exception::s3select_exp_en_t::FATAL); + } + + m_stream = (char*)csv_parser.currentLoc(); + + if (m_skip_last_line && m_stream >= m_end_stream) + { + return -1; + } + + return num_of_tokens; + + } + +public: + + void set(s3select* s3_query) + { + m_s3_select = s3_query; + base_s3object::set(m_s3_select->get_scratch_area()); + + m_projections = m_s3_select->get_projections_list(); + m_where_clause = m_s3_select->get_filter(); + + if (m_where_clause) + { + m_where_clause->traverse_and_apply(m_sa, m_s3_select->get_aliases()); + } + + for (auto p : m_projections) + { + p->traverse_and_apply(m_sa, m_s3_select->get_aliases()); + } + + m_aggr_flow = m_s3_select->is_aggregate_query(); + } + + + std::string get_error_description() + { + return m_error_description; + } + + virtual ~csv_object() {} + +public: + + + int getMatchRow( std::string& result) //TODO virtual ? getResult + { + int number_of_tokens = 0; + + + if (m_aggr_flow == true) + { + do + { + + number_of_tokens = getNextRow(); + if (number_of_tokens < 0) //end of stream + { + if (m_is_to_aggregate) + for (auto i : m_projections) + { + i->set_last_call(); + result.append( i->eval().to_string() ); + result.append(","); + } + + return number_of_tokens; + } + + if ((*m_projections.begin())->is_set_last_call()) + { + //should validate while query execution , no update upon nodes are marked with set_last_call + throw base_s3select_exception("on aggregation query , can not stream row data post do-aggregate call", base_s3select_exception::s3select_exp_en_t::FATAL); + } + + m_sa->update(m_row_tokens, number_of_tokens); + for (auto a : *m_s3_select->get_aliases()->get()) + { + a.second->invalidate_cache_result(); + } + + if (!m_where_clause || m_where_clause->eval().i64() == true) + for (auto i : m_projections) + { + i->eval(); + } + + } + while (1); + } + else + { + + do + { + + number_of_tokens = getNextRow(); + if (number_of_tokens < 0) + { + return number_of_tokens; + } + + m_sa->update(m_row_tokens, number_of_tokens); + for (auto a : *m_s3_select->get_aliases()->get()) + { + a.second->invalidate_cache_result(); + } + + } + while (m_where_clause && m_where_clause->eval().i64() == false); + + for (auto i : m_projections) + { + result.append( i->eval().to_string() ); + result.append(","); + } + result.append("\n"); + } + + return number_of_tokens; //TODO wrong + } + + int extract_csv_header_info() + { + + if (m_csv_defintion.ignore_header_info == true) + { + while(*m_stream && (*m_stream != m_csv_defintion.row_delimiter )) + { + m_stream++; + } + m_stream++; + } + else if(m_csv_defintion.use_header_info == true) + { + size_t num_of_tokens = getNextRow();//TODO validate number of tokens + + for(size_t i=0; i<num_of_tokens; i++) + { + m_csv_schema[i].assign(m_row_tokens[i]); + } + + m_s3_select->load_schema(m_csv_schema); + } + + m_extract_csv_header_info = true; + + return 0; + } + + int run_s3select_on_stream(std::string& result, const char* csv_stream, size_t stream_length, size_t obj_size) + { + //purpose: the cv data is "streaming", it may "cut" rows in the middle, in that case the "broken-line" is stores + //for later, upon next chunk of data is streaming, the stored-line is merge with current broken-line, and processed. + int status; + std::string tmp_buff; + u_int32_t skip_last_bytes = 0; + m_processed_bytes += stream_length; + + m_skip_first_line = false; + + if (m_previous_line) + { + //if previous broken line exist , merge it to current chunk + char* p_obj_chunk = (char*)csv_stream; + while (*p_obj_chunk != m_csv_defintion.row_delimiter && p_obj_chunk<(csv_stream+stream_length)) + { + p_obj_chunk++; + } + + tmp_buff.assign((char*)csv_stream, (char*)csv_stream + (p_obj_chunk - csv_stream)); + merge_line = m_last_line + tmp_buff + m_csv_defintion.row_delimiter; + m_previous_line = false; + m_skip_first_line = true; + + status = run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false); + } + + if (csv_stream[stream_length - 1] != m_csv_defintion.row_delimiter) + { + //in case of "broken" last line + char* p_obj_chunk = (char*)&(csv_stream[stream_length - 1]); + while (*p_obj_chunk != m_csv_defintion.row_delimiter && p_obj_chunk>csv_stream) + { + p_obj_chunk--; //scan until end-of previous line in chunk + } + + skip_last_bytes = (&(csv_stream[stream_length - 1]) - p_obj_chunk); + m_last_line.assign(p_obj_chunk + 1, p_obj_chunk + 1 + skip_last_bytes); //save it for next chunk + + m_previous_line = true;//it means to skip last line + + } + + status = run_s3select_on_object(result, csv_stream, stream_length, m_skip_first_line, m_previous_line, (m_processed_bytes >= obj_size)); + + return status; + } + + int run_s3select_on_object(std::string& result, const char* csv_stream, size_t stream_length, bool skip_first_line, bool skip_last_line, bool do_aggregate) + { + + + m_stream = (char*)csv_stream; + m_end_stream = (char*)csv_stream + stream_length; + m_is_to_aggregate = do_aggregate; + m_skip_last_line = skip_last_line; + + m_stream_length = stream_length; + + if(m_extract_csv_header_info == false) + { + extract_csv_header_info(); + } + + if(skip_first_line) + { + while(*m_stream && (*m_stream != m_csv_defintion.row_delimiter )) + { + m_stream++; + } + m_stream++;//TODO nicer + } + + do + { + + int num = 0; + try + { + num = getMatchRow(result); + } + catch (base_s3select_exception& e) + { + std::cout << e.what() << std::endl; + m_error_description = e.what(); + m_error_count ++; + if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL || m_error_count>100)//abort query execution + { + return -1; + } + } + + if (num < 0) + { + break; + } + + } + while (1); + + return 0; + } +}; + +};//namespace + +#endif diff --git a/src/s3select/include/s3select_csv_parser.h b/src/s3select/include/s3select_csv_parser.h new file mode 100644 index 000000000..5527da913 --- /dev/null +++ b/src/s3select/include/s3select_csv_parser.h @@ -0,0 +1,407 @@ +#include <iostream> + +#include <boost/mpl/vector/vector30.hpp> +// back-end +#include <boost/msm/back/state_machine.hpp> +//front-end +#include <boost/msm/front/state_machine_def.hpp> + +#include <vector> + +namespace msm = boost::msm; +namespace mpl = boost::mpl; + +namespace s3selectEngine +{ +// events +struct event_column_sep {}; +struct event_eol {}; +struct event_end_of_stream {}; +struct event_not_column_sep {};//i.e any char +struct event_quote {}; +struct event_escape {}; +struct event_empty {}; + + +// front-end: define the FSM structure +struct csvStateMch_ : public msm::front::state_machine_def<csvStateMch_> +{ + char* input_stream; + std::vector<char*>* tokens; + std::vector<int> has_esc{128}; + size_t token_idx; + size_t escape_idx; + char* input_cur_location; + char* start_token; + bool end_of_parse; + + typedef csvStateMch_ csv_rules; + + csvStateMch_():end_of_parse(false) {} + + void set(const char* input, std::vector<char*>* tk) + { + input_cur_location = input_stream = const_cast<char*>(input); + token_idx = 0; + tokens = tk; + escape_idx = 0; + } + + char get_char() + { + return *input_cur_location; + } + + char get_next_char(const char * end_stream) + { + if (input_cur_location >= end_stream) + return 0; + + input_cur_location++; + return *input_cur_location; + } + + const char* currentLoc() + { + return input_cur_location; + } + + void parse_escape(char* in, char esc_char='\\') + { + //assumption atleast one escape and single + char* dst, *src; + + dst = src = in; + + while (1) + { + while (*src && *src != esc_char) + { + src++; //search for escape + } + + if (!*src) //reach end + { + char* p = src; + while (dst < src) + { + *dst++ = *p++; //full copy + } + return; + } + //found escape + dst = src; //override escape + //if(*(dst+1)=='n') {*dst=10;dst++;} //enables special character + + while (*dst) + { + *dst = *(dst + 1); + dst++; + } //copy with shift + } + } + + // The list of FSM states + struct Start_new_token_st : public msm::front::state<> + {};//0 + + struct In_new_token_st : public msm::front::state<> + {};//1 + + struct In_quote_st : public msm::front::state<> + {};//2 + + struct In_esc_in_token_st : public msm::front::state<> + {};//3 + + struct In_esc_quote_st : public msm::front::state<> + {};//4 + + struct In_esc_start_token_st : public msm::front::state<> + {};//5 + + struct End_of_line_st : public msm::front::state<> + {};//6 + + struct Empty_state : public msm::front::state<> + {};//7 + + + // the initial state of the csvStateMch SM. Must be defined + typedef Start_new_token_st initial_state; + + void start_new_token()//helper + { + start_token = input_cur_location; + (*tokens)[ token_idx ] = start_token; + token_idx++; + } + + // transition actions + void start_new_token(event_column_sep const&) + { + *input_cur_location = 0;//remove column-delimiter + start_new_token(); + } + + void start_new_token(event_not_column_sep const&) + { + start_new_token(); + } + + //need to handle empty lines(no tokens); + void start_new_token(event_eol const&) + { + if(!token_idx) + { + return; + } + (*tokens)[ token_idx ] = start_token; + token_idx++; + } + + void start_new_token(event_end_of_stream const&) {} + + void in_new_token(event_not_column_sep const&) + { + if(!*start_token) + { + start_token = input_cur_location; + } + } + + void in_new_token(event_eol const&) + { + *input_cur_location=0; + } + + void in_new_token(event_end_of_stream const&) {} + + void in_new_token(event_column_sep const&) + { + (*tokens)[ token_idx ] = input_cur_location+1; + *input_cur_location=0; + token_idx++; + } + + void in_new_token(event_quote const&) + { + if(!*start_token) + { + start_token = input_cur_location; + } + } + + void in_quote(event_quote const&) {} + + void in_quote(event_column_sep const&) {} + + void in_quote(event_not_column_sep const&) {} + + void in_quote(event_eol const&) + { + *input_cur_location=0; + } + + void in_quote(event_end_of_stream const&) + { + *input_cur_location=0; + } + + void start_new_token(event_quote const&) + { + start_new_token(); + } + + void push_escape_pos() + { + if(escape_idx && has_esc[ escape_idx -1]== (int)(token_idx-1)) + { + return; + } + has_esc[ escape_idx ] = token_idx-1; + escape_idx++; + } + void in_escape(event_escape const&) + { + push_escape_pos(); + } + void in_escape_start_new_token(event_escape const&) + { + start_new_token(); + push_escape_pos(); + } + + void in_escape(event_column_sep const&) {} + void in_escape(event_not_column_sep const&) {} + void in_escape(event_quote const&) {} + void in_escape(event_eol const&) {} + void in_escape(event_end_of_stream const&) {} + + void empty_action(event_empty const&) {} + + //TODO need a guard for tokens vector size (<MAX) + // Transition table for csvStateMch + struct transition_table : mpl::vector30< + // Start Event Next Action Guard + // +---------+-------------+---------+---------------------+----------------------+ + a_row < Start_new_token_st, event_column_sep , Start_new_token_st , &csv_rules::start_new_token >, + a_row < Start_new_token_st, event_not_column_sep , In_new_token_st , &csv_rules::start_new_token >, + a_row < Start_new_token_st, event_eol , End_of_line_st , &csv_rules::start_new_token >, + a_row < Start_new_token_st, event_end_of_stream , End_of_line_st , &csv_rules::start_new_token >, + a_row < In_new_token_st , event_not_column_sep, In_new_token_st, &csv_rules::in_new_token >, + a_row < In_new_token_st , event_column_sep , In_new_token_st, &csv_rules::in_new_token >, + a_row < In_new_token_st , event_eol , End_of_line_st, &csv_rules::in_new_token >, + a_row < In_new_token_st , event_end_of_stream , End_of_line_st, &csv_rules::in_new_token >, + + a_row < Start_new_token_st , event_quote , In_quote_st, &csv_rules::start_new_token >, //open quote + a_row < In_new_token_st , event_quote , In_quote_st, &csv_rules::in_quote >, //open quote + a_row < In_quote_st , event_quote , In_new_token_st, &csv_rules::in_quote >, //close quote + a_row < In_quote_st , event_column_sep , In_quote_st, &csv_rules::in_quote >, //stay in quote + a_row < In_quote_st , event_not_column_sep , In_quote_st, &csv_rules::in_quote >, //stay in quote + a_row < In_quote_st , event_eol , End_of_line_st, &csv_rules::in_quote >, //end of quote/line + a_row < In_quote_st , event_end_of_stream , End_of_line_st, &csv_rules::in_quote >, //end of quote/line + + + //TODO add transitions for escape just before eol , eos. + a_row < Start_new_token_st , event_escape , In_esc_start_token_st, &csv_rules::in_escape_start_new_token >, + a_row < In_esc_start_token_st, event_column_sep, In_new_token_st, &csv_rules::in_escape >, //escape column-sep + a_row < In_esc_start_token_st, event_not_column_sep, In_new_token_st, &csv_rules::in_escape >, + a_row < In_esc_start_token_st, event_escape, In_new_token_st, &csv_rules::in_escape >, + a_row < In_esc_start_token_st, event_quote, In_new_token_st, &csv_rules::in_escape >, + + a_row < In_new_token_st, event_escape, In_esc_in_token_st, &csv_rules::in_escape >, + a_row < In_esc_in_token_st, event_column_sep, In_new_token_st, &csv_rules::in_escape >, + a_row < In_esc_in_token_st, event_not_column_sep, In_new_token_st, &csv_rules::in_escape >, + a_row < In_esc_in_token_st, event_escape, In_new_token_st, &csv_rules::in_escape >, + a_row < In_esc_in_token_st, event_quote, In_new_token_st, &csv_rules::in_escape >, + + a_row < In_quote_st, event_escape, In_esc_quote_st, &csv_rules::in_escape >, + a_row < In_esc_quote_st, event_column_sep, In_quote_st, &csv_rules::in_escape >, + a_row < In_esc_quote_st, event_not_column_sep, In_quote_st, &csv_rules::in_escape >, + a_row < In_esc_quote_st, event_escape, In_quote_st, &csv_rules::in_escape >, + a_row < In_esc_quote_st, event_quote, In_quote_st, &csv_rules::in_escape > + + // +---------+-------------+---------+---------------------+----------------------+ + > {}; + + // Replaces the default no-transition response. + template <class FSM, class Event> + void no_transition(Event const& e, FSM&, int state) + { + std::cout << "no transition from state " << state + << " on event " << typeid(e).name() << std::endl; + } +}; //// end-of-state-machine + + + +// Pick a back-end +typedef msm::back::state_machine<csvStateMch_> csvStateMch; + +// +// Testing utilities. +// + +static char const* const state_names[] = {"Start_new_token_st", "In_new_token_st", "In_quote_st", "In_esc_in_token_st", + "In_esc_quote_st", "In_esc_start_token_st", "End_of_line_st", "Empty_state" + }; +void pstate(csvStateMch const& p)//debug +{ + std::cout << " -> " << state_names[p.current_state()[0]] << std::endl; +} + + +class csvParser +{ + + csvStateMch p; + + char m_row_delimeter; + char m_column_delimiter; + char m_quote_char; + char m_escape_char; + +public: + + csvParser(char rd='\n', char cd=',', char quot='"', char ec='\\'):m_row_delimeter(rd), m_column_delimiter(cd), m_quote_char(quot), m_escape_char(ec) {}; + + void set(char row_delimiter, char column_delimiter, char quot_char, char escape_char) + { + m_row_delimeter = row_delimiter; + m_column_delimiter = column_delimiter; + m_quote_char = quot_char; + m_escape_char = escape_char; + } + + int parse(char* input_stream, char* end_stream, std::vector<char*>* tk, size_t* num_of_tokens) + { + p.set(input_stream, tk); + + // needed to start the highest-level SM. This will call on_entry and mark the start of the SM + p.start(); + + //TODO for better performance to use template specialization (\n \ , ") + do + { + if (p.get_char() == m_row_delimeter) + { + p.process_event(event_eol()); + } + else if (p.get_char() == m_column_delimiter) + { + p.process_event(event_column_sep()); + } + else if (p.get_char() == 0) + { + p.process_event(event_end_of_stream()); + } + else if (p.get_char() == m_quote_char) + { + p.process_event(event_quote()); + } + else if (p.get_char() == m_escape_char) + { + p.process_event(event_escape()); + } + else + { + p.process_event(event_not_column_sep()); + } + + if (p.tokens->capacity() <= p.token_idx) + { + return -1; + } + + if (p.currentLoc() >= end_stream) + { + break; + } + p.get_next_char(end_stream); + } + while (p.current_state()[0] != 6); + + p.stop(); + + *num_of_tokens = p.token_idx; + + //second pass for escape rules; only token with escape are processed, if any. + for(size_t i=0; i<p.escape_idx; i++) + { + p.parse_escape((*tk)[p.has_esc[i]], m_escape_char); + } + + return 0; + } + + const char* currentLoc() + { + return p.currentLoc(); + } + +};//end csv-parser + +}//end-namespace + + diff --git a/src/s3select/include/s3select_functions.h b/src/s3select/include/s3select_functions.h new file mode 100644 index 000000000..77b3db3a7 --- /dev/null +++ b/src/s3select/include/s3select_functions.h @@ -0,0 +1,1037 @@ +#ifndef __S3SELECT_FUNCTIONS__ +#define __S3SELECT_FUNCTIONS__ + + +#include "s3select_oper.h" +#define BOOST_BIND_ACTION_PARAM( push_name ,param ) boost::bind( &push_name::operator(), g_ ## push_name , _1 ,_2, param) +namespace s3selectEngine +{ + +struct push_2dig +{ + void operator()(const char* a, const char* b, uint32_t* n) const + { + *n = ((char)(*a) - 48) *10 + ((char)*(a+1)-48) ; + } + +}; +static push_2dig g_push_2dig; + +struct push_4dig +{ + void operator()(const char* a, const char* b, uint32_t* n) const + { + *n = ((char)(*a) - 48) *1000 + ((char)*(a+1)-48)*100 + ((char)*(a+2)-48)*10 + ((char)*(a+3)-48); + } + +}; +static push_4dig g_push_4dig; + +enum class s3select_func_En_t {ADD, + SUM, + MIN, + MAX, + COUNT, + TO_INT, + TO_FLOAT, + TO_TIMESTAMP, + SUBSTR, + EXTRACT, + DATE_ADD, + DATE_DIFF, + UTCNOW + }; + + +class s3select_functions : public __clt_allocator +{ + +private: + + using FunctionLibrary = std::map<std::string, s3select_func_En_t>; + const FunctionLibrary m_functions_library = + { + {"add", s3select_func_En_t::ADD}, + {"sum", s3select_func_En_t::SUM}, + {"count", s3select_func_En_t::COUNT}, + {"min", s3select_func_En_t::MIN}, + {"max", s3select_func_En_t::MAX}, + {"int", s3select_func_En_t::TO_INT}, + {"float", s3select_func_En_t::TO_FLOAT}, + {"substr", s3select_func_En_t::SUBSTR}, + {"timestamp", s3select_func_En_t::TO_TIMESTAMP}, + {"extract", s3select_func_En_t::EXTRACT}, + {"dateadd", s3select_func_En_t::DATE_ADD}, + {"datediff", s3select_func_En_t::DATE_DIFF}, + {"utcnow", s3select_func_En_t::UTCNOW} + }; + +public: + + base_function* create(std::string fn_name); +}; + +class __function : public base_statement +{ + +private: + std::vector<base_statement*> arguments; + std::string name; + base_function* m_func_impl; + s3select_functions* m_s3select_functions; + variable m_result; + + void _resolve_name() + { + if (m_func_impl) + { + return; + } + + base_function* f = m_s3select_functions->create(name); + if (!f) + { + throw base_s3select_exception("function not found", base_s3select_exception::s3select_exp_en_t::FATAL); //should abort query + } + m_func_impl = f; + } + +public: + virtual void traverse_and_apply(scratch_area* sa, projection_alias* pa) + { + m_scratch = sa; + m_aliases = pa; + for (base_statement* ba : arguments) + { + ba->traverse_and_apply(sa, pa); + } + } + + virtual bool is_aggregate() // TODO under semantic flow + { + _resolve_name(); + + return m_func_impl->is_aggregate(); + } + + virtual bool semantic() + { + return true; + } + + __function(const char* fname, s3select_functions* s3f) : name(fname), m_func_impl(0), m_s3select_functions(s3f) {} + + virtual value& eval() + { + + _resolve_name(); + + if (is_last_call == false) + { + (*m_func_impl)(&arguments, &m_result); + } + else + { + (*m_func_impl).get_aggregate_result(&m_result); + } + + return m_result.get_value(); + } + + + + virtual std::string print(int ident) + { + return std::string(0); + } + + void push_argument(base_statement* arg) + { + arguments.push_back(arg); + } + + + std::vector<base_statement*> get_arguments() + { + return arguments; + } + + virtual ~__function() + { + arguments.clear(); + } +}; + + + +/* + s3-select function defintions +*/ +struct _fn_add : public base_function +{ + + value var_result; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + base_statement* x = *iter; + iter++; + base_statement* y = *iter; + + var_result = x->eval() + y->eval(); + + *result = var_result; + + return true; + } +}; + +struct _fn_sum : public base_function +{ + + value sum; + + _fn_sum() : sum(0) + { + aggregate = true; + } + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + base_statement* x = *iter; + + try + { + sum = sum + x->eval(); + } + catch (base_s3select_exception& e) + { + std::cout << "illegal value for aggregation(sum). skipping." << std::endl; + if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) + { + throw; + } + } + + return true; + } + + virtual void get_aggregate_result(variable* result) + { + *result = sum ; + } +}; + +struct _fn_count : public base_function +{ + + int64_t count; + + _fn_count():count(0) + { + aggregate=true; + } + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + count += 1; + + return true; + } + + virtual void get_aggregate_result(variable* result) + { + result->set_value(count); + } + +}; + +struct _fn_min : public base_function +{ + + value min; + + _fn_min():min(__INT64_MAX__) + { + aggregate=true; + } + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + base_statement* x = *iter; + + if(min > x->eval()) + { + min=x->eval(); + } + + return true; + } + + virtual void get_aggregate_result(variable* result) + { + *result = min; + } + +}; + +struct _fn_max : public base_function +{ + + value max; + + _fn_max():max(-__INT64_MAX__) + { + aggregate=true; + } + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + base_statement* x = *iter; + + if(max < x->eval()) + { + max=x->eval(); + } + + return true; + } + + virtual void get_aggregate_result(variable* result) + { + *result = max; + } + +}; + +struct _fn_to_int : public base_function +{ + + value var_result; + value func_arg; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + char* perr; + int64_t i=0; + func_arg = (*args->begin())->eval(); + + if (func_arg.type == value::value_En_t::STRING) + { + i = strtol(func_arg.str(), &perr, 10) ; //TODO check error before constructor + } + else if (func_arg.type == value::value_En_t::FLOAT) + { + i = func_arg.dbl(); + } + else + { + i = func_arg.i64(); + } + + var_result = i ; + *result = var_result; + + return true; + } + +}; + +struct _fn_to_float : public base_function +{ + + value var_result; + value v_from; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + char* perr; + double d=0; + value v = (*args->begin())->eval(); + + if (v.type == value::value_En_t::STRING) + { + d = strtod(v.str(), &perr) ; //TODO check error before constructor + } + else if (v.type == value::value_En_t::FLOAT) + { + d = v.dbl(); + } + else + { + d = v.i64(); + } + + var_result = d; + *result = var_result; + + return true; + } + +}; + +struct _fn_to_timestamp : public base_function +{ + bsc::rule<> separator = bsc::ch_p(":") | bsc::ch_p("-"); + + uint32_t yr = 1700, mo = 1, dy = 1; + bsc::rule<> dig4 = bsc::lexeme_d[bsc::digit_p >> bsc::digit_p >> bsc::digit_p >> bsc::digit_p]; + bsc::rule<> dig2 = bsc::lexeme_d[bsc::digit_p >> bsc::digit_p]; + + bsc::rule<> d_yyyymmdd_dig = ((dig4[BOOST_BIND_ACTION_PARAM(push_4dig, &yr)]) >> *(separator) + >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &mo)]) >> *(separator) + >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &dy)]) >> *(separator)); + + uint32_t hr = 0, mn = 0, sc = 0; + bsc::rule<> d_time_dig = ((dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &hr)]) >> *(separator) + >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &mn)]) >> *(separator) + >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &sc)]) >> *(separator)); + + boost::posix_time::ptime new_ptime; + + value v_str; + + + bool datetime_validation() + { + //TODO temporary , should check for leap year + + if(yr<1700 || yr>2050) + { + return false; + } + if (mo<1 || mo>12) + { + return false; + } + if (dy<1 || dy>31) + { + return false; + } + if (hr>23) + { + return false; + } + if (dy>59) + { + return false; + } + if (sc>59) + { + return false; + } + + return true; + } + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + + hr = 0; + mn = 0; + sc = 0; + + std::vector<base_statement*>::iterator iter = args->begin(); + int args_size = args->size(); + + if (args_size != 1) + { + throw base_s3select_exception("to_timestamp should have one parameter"); + } + + base_statement* str = *iter; + + v_str = str->eval(); + + if (v_str.type != value::value_En_t::STRING) + { + throw base_s3select_exception("to_timestamp first argument must be string"); //can skip current row + } + + bsc::parse_info<> info_dig = bsc::parse(v_str.str(), d_yyyymmdd_dig >> *(separator) >> d_time_dig); + + if(datetime_validation()==false or !info_dig.full) + { + throw base_s3select_exception("input date-time is illegal"); + } + + new_ptime = boost::posix_time::ptime(boost::gregorian::date(yr, mo, dy), + boost::posix_time::hours(hr) + boost::posix_time::minutes(mn) + boost::posix_time::seconds(sc)); + + result->set_value(&new_ptime); + + return true; + } + +}; + +struct _fn_extact_from_timestamp : public base_function +{ + + boost::posix_time::ptime new_ptime; + + value val_date_part; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + int args_size = args->size(); + + if (args_size < 2) + { + throw base_s3select_exception("to_timestamp should have 2 parameters"); + } + + base_statement* date_part = *iter; + + val_date_part = date_part->eval();//TODO could be done once? + + if(val_date_part.is_string()== false) + { + throw base_s3select_exception("first parameter should be string"); + } + + iter++; + + base_statement* ts = *iter; + + if(ts->eval().is_timestamp()== false) + { + throw base_s3select_exception("second parameter is not timestamp"); + } + + new_ptime = *ts->eval().timestamp(); + + if( strcmp(val_date_part.str(), "year")==0 ) + { + result->set_value( (int64_t)new_ptime.date().year() ); + } + else if( strcmp(val_date_part.str(), "month")==0 ) + { + result->set_value( (int64_t)new_ptime.date().month() ); + } + else if( strcmp(val_date_part.str(), "day")==0 ) + { + result->set_value( (int64_t)new_ptime.date().day_of_year() ); + } + else if( strcmp(val_date_part.str(), "week")==0 ) + { + result->set_value( (int64_t)new_ptime.date().week_number() ); + } + else + { + throw base_s3select_exception(std::string( val_date_part.str() + std::string(" is not supported ") ).c_str() ); + } + + return true; + } + +}; + +struct _fn_diff_timestamp : public base_function +{ + + value val_date_part; + value val_dt1; + value val_dt2; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + int args_size = args->size(); + + if (args_size < 3) + { + throw base_s3select_exception("datediff need 3 parameters"); + } + + base_statement* date_part = *iter; + + val_date_part = date_part->eval(); + + iter++; + base_statement* dt1_param = *iter; + val_dt1 = dt1_param->eval(); + if (val_dt1.is_timestamp() == false) + { + throw base_s3select_exception("second parameter should be timestamp"); + } + + iter++; + base_statement* dt2_param = *iter; + val_dt2 = dt2_param->eval(); + if (val_dt2.is_timestamp() == false) + { + throw base_s3select_exception("third parameter should be timestamp"); + } + + if (strcmp(val_date_part.str(), "year") == 0) + { + int64_t yr = val_dt2.timestamp()->date().year() - val_dt1.timestamp()->date().year() ; + result->set_value( yr ); + } + else if (strcmp(val_date_part.str(), "month") == 0) + { + int64_t yr = val_dt2.timestamp()->date().year() - val_dt1.timestamp()->date().year() ; + int64_t mon = val_dt2.timestamp()->date().month() - val_dt1.timestamp()->date().month() ; + + result->set_value( yr*12 + mon ); + } + else if (strcmp(val_date_part.str(), "day") == 0) + { + boost::gregorian::date_period dp = + boost::gregorian::date_period( val_dt1.timestamp()->date(), val_dt2.timestamp()->date()); + result->set_value( dp.length().days() ); + } + else if (strcmp(val_date_part.str(), "hours") == 0) + { + boost::posix_time::time_duration td_res = (*val_dt2.timestamp() - *val_dt1.timestamp()); + result->set_value( td_res.hours()); + } + else + { + throw base_s3select_exception("first parameter should be string: year,month,hours,day"); + } + + + return true; + } +}; + +struct _fn_add_to_timestamp : public base_function +{ + + boost::posix_time::ptime new_ptime; + + value val_date_part; + value val_quantity; + value val_timestamp; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + int args_size = args->size(); + + if (args_size < 3) + { + throw base_s3select_exception("add_to_timestamp should have 3 parameters"); + } + + base_statement* date_part = *iter; + val_date_part = date_part->eval();//TODO could be done once? + + if(val_date_part.is_string()== false) + { + throw base_s3select_exception("first parameter should be string"); + } + + iter++; + base_statement* quan = *iter; + val_quantity = quan->eval(); + + if (val_quantity.is_number() == false) + { + throw base_s3select_exception("second parameter should be number"); //TODO what about double? + } + + iter++; + base_statement* ts = *iter; + val_timestamp = ts->eval(); + + if(val_timestamp.is_timestamp() == false) + { + throw base_s3select_exception("third parameter should be time-stamp"); + } + + new_ptime = *val_timestamp.timestamp(); + + if( strcmp(val_date_part.str(), "year")==0 ) + { + new_ptime += boost::gregorian::years( val_quantity.i64() ); + result->set_value( &new_ptime ); + } + else if( strcmp(val_date_part.str(), "month")==0 ) + { + new_ptime += boost::gregorian::months( val_quantity.i64() ); + result->set_value( &new_ptime ); + } + else if( strcmp(val_date_part.str(), "day")==0 ) + { + new_ptime += boost::gregorian::days( val_quantity.i64() ); + result->set_value( &new_ptime ); + } + else + { + throw base_s3select_exception( std::string(val_date_part.str() + std::string(" is not supported for add")).c_str()); + } + + return true; + } + +}; + +struct _fn_utcnow : public base_function +{ + + boost::posix_time::ptime now_ptime; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + int args_size = args->size(); + + if (args_size != 0) + { + throw base_s3select_exception("utcnow does not expect any parameters"); + } + + now_ptime = boost::posix_time::ptime( boost::posix_time::second_clock::universal_time()); + result->set_value( &now_ptime ); + + return true; + } +}; + +struct _fn_substr : public base_function +{ + + char buff[4096];// this buffer is persist for the query life time, it use for the results per row(only for the specific function call) + //it prevent from intensive use of malloc/free (fragmentation). + //should validate result length. + //TODO may replace by std::string (dynamic) , or to replace with global allocator , in query scope. + value v_str; + value v_from; + value v_to; + + bool operator()(std::vector<base_statement*>* args, variable* result) + { + std::vector<base_statement*>::iterator iter = args->begin(); + int args_size = args->size(); + + + if (args_size<2) + { + throw base_s3select_exception("substr accept 2 arguments or 3"); + } + + base_statement* str = *iter; + iter++; + base_statement* from = *iter; + base_statement* to; + + if (args_size == 3) + { + iter++; + to = *iter; + } + + v_str = str->eval(); + + if(v_str.type != value::value_En_t::STRING) + { + throw base_s3select_exception("substr first argument must be string"); //can skip current row + } + + int str_length = strlen(v_str.str()); + + v_from = from->eval(); + if(v_from.is_string()) + { + throw base_s3select_exception("substr second argument must be number"); //can skip current row + } + + int64_t f; + int64_t t; + + if (args_size==3) + { + v_to = to->eval(); + if (v_to.is_string()) + { + throw base_s3select_exception("substr third argument must be number"); //can skip row + } + } + + if (v_from.type == value::value_En_t::FLOAT) + { + f=v_from.dbl(); + } + else + { + f=v_from.i64(); + } + + if (f>str_length) + { + throw base_s3select_exception("substr start position is too far"); //can skip row + } + + if (str_length>(int)sizeof(buff)) + { + throw base_s3select_exception("string too long for internal buffer"); //can skip row + } + + if (args_size == 3) + { + if (v_from.type == value::value_En_t::FLOAT) + { + t = v_to.dbl(); + } + else + { + t = v_to.i64(); + } + + if( (str_length-(f-1)-t) <0) + { + throw base_s3select_exception("substr length parameter beyond bounderies"); //can skip row + } + + strncpy(buff, v_str.str()+f-1, t); + } + else + { + strcpy(buff, v_str.str()+f-1); + } + + result->set_value(buff); + + return true; + } + +}; + +base_function* s3select_functions::create(std::string fn_name) +{ + const FunctionLibrary::const_iterator iter = m_functions_library.find(fn_name); + + if (iter == m_functions_library.end()) + { + std::string msg; + msg = fn_name + " " + " function not found"; + throw base_s3select_exception(msg, base_s3select_exception::s3select_exp_en_t::FATAL); + } + + switch (iter->second) + { + case s3select_func_En_t::ADD: + return S3SELECT_NEW(_fn_add); + break; + + case s3select_func_En_t::SUM: + return S3SELECT_NEW(_fn_sum); + break; + + case s3select_func_En_t::COUNT: + return S3SELECT_NEW(_fn_count); + break; + + case s3select_func_En_t::MIN: + return S3SELECT_NEW(_fn_min); + break; + + case s3select_func_En_t::MAX: + return S3SELECT_NEW(_fn_max); + break; + + case s3select_func_En_t::TO_INT: + return S3SELECT_NEW(_fn_to_int); + break; + + case s3select_func_En_t::TO_FLOAT: + return S3SELECT_NEW(_fn_to_float); + break; + + case s3select_func_En_t::SUBSTR: + return S3SELECT_NEW(_fn_substr); + break; + + case s3select_func_En_t::TO_TIMESTAMP: + return S3SELECT_NEW(_fn_to_timestamp); + break; + + case s3select_func_En_t::EXTRACT: + return S3SELECT_NEW(_fn_extact_from_timestamp); + break; + + case s3select_func_En_t::DATE_ADD: + return S3SELECT_NEW(_fn_add_to_timestamp); + break; + + case s3select_func_En_t::DATE_DIFF: + return S3SELECT_NEW(_fn_diff_timestamp); + break; + + case s3select_func_En_t::UTCNOW: + return S3SELECT_NEW(_fn_utcnow); + break; + + default: + throw base_s3select_exception("internal error while resolving function-name"); + break; + } +} + +bool base_statement::is_function() +{ + if (dynamic_cast<__function*>(this)) + { + return true; + } + else + { + return false; + } +} + +bool base_statement::is_aggregate_exist_in_expression(base_statement* e) //TODO obsolete ? +{ + if (e->is_aggregate()) + { + return true; + } + + if (e->left() && e->left()->is_aggregate_exist_in_expression(e->left())) + { + return true; + } + + if (e->right() && e->right()->is_aggregate_exist_in_expression(e->right())) + { + return true; + } + + if (e->is_function()) + { + for (auto i : dynamic_cast<__function*>(e)->get_arguments()) + if (e->is_aggregate_exist_in_expression(i)) + { + return true; + } + } + + return false; +} + +base_statement* base_statement::get_aggregate() +{ + //search for aggregation function in AST + base_statement* res = 0; + + if (is_aggregate()) + { + return this; + } + + if (left() && (res=left()->get_aggregate())!=0) + { + return res; + } + + if (right() && (res=right()->get_aggregate())!=0) + { + return res; + } + + if (is_function()) + { + for (auto i : dynamic_cast<__function*>(this)->get_arguments()) + { + base_statement* b=i->get_aggregate(); + if (b) + { + return b; + } + } + } + return 0; +} + +bool base_statement::is_nested_aggregate(base_statement* e) +{ + //validate for non nested calls for aggregation function, i.e. sum ( min ( )) + if (e->is_aggregate()) + { + if (e->left()) + { + if (e->left()->is_aggregate_exist_in_expression(e->left())) + { + return true; + } + } + else if (e->right()) + { + if (e->right()->is_aggregate_exist_in_expression(e->right())) + { + return true; + } + } + else if (e->is_function()) + { + for (auto i : dynamic_cast<__function*>(e)->get_arguments()) + { + if (i->is_aggregate_exist_in_expression(i)) + { + return true; + } + } + } + return false; + } + return false; +} + +// select sum(c2) ... + c1 ... is not allowed. a binary operation with scalar is OK. i.e. select sum() + 1 +bool base_statement::is_binop_aggregate_and_column(base_statement* skip_expression) +{ + if (left() && left() != skip_expression) //can traverse to left + { + if (left()->is_column()) + { + return true; + } + else if (left()->is_binop_aggregate_and_column(skip_expression) == true) + { + return true; + } + } + + if (right() && right() != skip_expression) //can traverse right + { + if (right()->is_column()) + { + return true; + } + else if (right()->is_binop_aggregate_and_column(skip_expression) == true) + { + return true; + } + } + + if (this != skip_expression && is_function()) + { + + __function* f = (dynamic_cast<__function*>(this)); + std::vector<base_statement*> l = f->get_arguments(); + for (auto i : l) + { + if (i!=skip_expression && i->is_column()) + { + return true; + } + if (i->is_binop_aggregate_and_column(skip_expression) == true) + { + return true; + } + } + } + + return false; +} + +} //namespace s3selectEngine + +#endif diff --git a/src/s3select/include/s3select_oper.h b/src/s3select/include/s3select_oper.h new file mode 100644 index 000000000..591e5f9da --- /dev/null +++ b/src/s3select/include/s3select_oper.h @@ -0,0 +1,1320 @@ +#ifndef __S3SELECT_OPER__ +#define __S3SELECT_OPER__ + +#include <string> +#include <iostream> +#include <list> +#include <map> +#include <vector> +#include <string.h> +#include <math.h> + +#include <boost/lexical_cast.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/bind.hpp> +namespace bsc = BOOST_SPIRIT_CLASSIC_NS; + +namespace s3selectEngine +{ + +class base_s3select_exception +{ + +public: + enum class s3select_exp_en_t + { + NONE, + ERROR, + FATAL + } ; + +private: + s3select_exp_en_t m_severity; + +public: + std::string _msg; + base_s3select_exception(const char* n) : m_severity(s3select_exp_en_t::NONE) + { + _msg.assign(n); + } + base_s3select_exception(const char* n, s3select_exp_en_t severity) : m_severity(severity) + { + _msg.assign(n); + } + base_s3select_exception(std::string n, s3select_exp_en_t severity) : m_severity(severity) + { + _msg = n; + } + + virtual const char* what() + { + return _msg.c_str(); + } + + s3select_exp_en_t severity() + { + return m_severity; + } + + virtual ~base_s3select_exception() {} +}; + + +// pointer to dynamic allocated buffer , which used for placement new. +static __thread char* _s3select_buff_ptr =0; + +class s3select_allocator //s3select is the "owner" +{ +private: + + std::vector<char*> list_of_buff; + u_int32_t m_idx; + +public: +#define __S3_ALLOCATION_BUFF__ (8*1024) + s3select_allocator():m_idx(0) + { + list_of_buff.push_back((char*)malloc(__S3_ALLOCATION_BUFF__)); + } + + void set_global_buff() + { + char* buff = list_of_buff.back(); + _s3select_buff_ptr = &buff[ m_idx ]; + } + + void check_capacity(size_t sz) + { + if (sz>__S3_ALLOCATION_BUFF__) + { + throw base_s3select_exception("requested size too big", base_s3select_exception::s3select_exp_en_t::FATAL); + } + + if ((m_idx + sz) >= __S3_ALLOCATION_BUFF__) + { + list_of_buff.push_back((char*)malloc(__S3_ALLOCATION_BUFF__)); + m_idx = 0; + } + } + + void inc(size_t sz) + { + m_idx += sz; + m_idx += sizeof(char*) - (m_idx % sizeof(char*)); //alignment + } + + void zero() + { + //not a must, its for safty. + _s3select_buff_ptr=0; + } + + virtual ~s3select_allocator() + { + for(auto b : list_of_buff) + { + free(b); + } + } +}; + +class __clt_allocator +{ +public: + s3select_allocator* m_s3select_allocator; + +public: + + __clt_allocator():m_s3select_allocator(0) {} + + void set(s3select_allocator* a) + { + m_s3select_allocator = a; + } +}; + +// placement new for allocation of all s3select objects on single(or few) buffers, deallocation of those objects is by releasing the buffer. +#define S3SELECT_NEW( type , ... ) [=]() \ + { \ + m_s3select_allocator->check_capacity(sizeof( type )); \ + m_s3select_allocator->set_global_buff(); \ + auto res=new (_s3select_buff_ptr) type(__VA_ARGS__); \ + m_s3select_allocator->inc(sizeof( type )); \ + m_s3select_allocator->zero(); \ + return res; \ + }(); + +class scratch_area +{ + +private: + std::vector<std::string_view> m_columns{128}; + int m_upper_bound; + + std::vector<std::pair<std::string, int >> m_column_name_pos; + +public: + + void set_column_pos(const char* n, int pos)//TODO use std::string + { + m_column_name_pos.push_back( std::pair<const char*, int>(n, pos)); + } + + void update(std::vector<char*> tokens, size_t num_of_tokens) + { + size_t i=0; + for(auto s : tokens) + { + if (i>=num_of_tokens) + { + break; + } + + m_columns[i++] = s; + } + m_upper_bound = i; + + } + + int get_column_pos(const char* n) + { + //done only upon building the AST , not on "runtime" + + std::vector<std::pair<std::string, int >>::iterator iter; + + for( auto iter : m_column_name_pos) + { + if (!strcmp(iter.first.c_str(), n)) + { + return iter.second; + } + } + + return -1; + } + + std::string_view get_column_value(int column_pos) + { + + if ((column_pos >= m_upper_bound) || column_pos < 0) + { + throw base_s3select_exception("column_position_is_wrong", base_s3select_exception::s3select_exp_en_t::ERROR); + } + + return m_columns[column_pos]; + } + + int get_num_of_columns() + { + return m_upper_bound; + } +}; + +class base_statement; +class projection_alias +{ +//purpose: mapping between alias-name to base_statement* +//those routines are *NOT* intensive, works once per query parse time. + +private: + std::vector< std::pair<std::string, base_statement*> > alias_map; + +public: + std::vector< std::pair<std::string, base_statement*> >* get() + { + return &alias_map; + } + + bool insert_new_entry(std::string alias_name, base_statement* bs) + { + //purpose: only unique alias names. + + for(auto alias: alias_map) + { + if(alias.first.compare(alias_name) == 0) + { + return false; //alias name already exist + } + + } + std::pair<std::string, base_statement*> new_alias(alias_name, bs); + alias_map.push_back(new_alias); + + return true; + } + + base_statement* search_alias(std::string alias_name) + { + for(auto alias: alias_map) + { + if(alias.first.compare(alias_name) == 0) + { + return alias.second; //refernce to execution node + } + } + return 0; + } +}; + +struct binop_plus +{ + double operator()(double a, double b) + { + return a+b; + } +}; + +struct binop_minus +{ + double operator()(double a, double b) + { + return a-b; + } +}; + +struct binop_mult +{ + double operator()(double a, double b) + { + return a * b; + } +}; + +struct binop_div +{ + double operator()(double a, double b) + { + return a / b; + } +}; + +struct binop_pow +{ + double operator()(double a, double b) + { + return pow(a, b); + } +}; + +class value +{ + +public: + typedef union + { + int64_t num; + char* str;//TODO consider string_view + double dbl; + boost::posix_time::ptime* timestamp; + } value_t; + +private: + value_t __val; + std::string m_to_string; + std::string m_str_value; + +public: + enum class value_En_t + { + DECIMAL, + FLOAT, + STRING, + TIMESTAMP, + NA + } ; + value_En_t type; + + value(int64_t n) : type(value_En_t::DECIMAL) + { + __val.num = n; + } + value(int n) : type(value_En_t::DECIMAL) + { + __val.num = n; + } + value(bool b) : type(value_En_t::DECIMAL) + { + __val.num = (int64_t)b; + } + value(double d) : type(value_En_t::FLOAT) + { + __val.dbl = d; + } + value(boost::posix_time::ptime* timestamp) : type(value_En_t::TIMESTAMP) + { + __val.timestamp = timestamp; + } + + value(const char* s) : type(value_En_t::STRING) + { + m_str_value.assign(s); + __val.str = m_str_value.data(); + } + + value():type(value_En_t::NA) + { + __val.num=0; + } + + bool is_number() const + { + if ((type == value_En_t::DECIMAL || type == value_En_t::FLOAT)) + { + return true; + } + + return false; + } + + bool is_string() const + { + return type == value_En_t::STRING; + } + bool is_timestamp() const + { + return type == value_En_t::TIMESTAMP; + } + + + std::string& to_string() //TODO very intensive , must improve this + { + + if (type != value_En_t::STRING) + { + if (type == value_En_t::DECIMAL) + { + m_to_string.assign( boost::lexical_cast<std::string>(__val.num) ); + } + else if(type == value_En_t::FLOAT) + { + m_to_string = boost::lexical_cast<std::string>(__val.dbl); + } + else + { + m_to_string = to_simple_string( *__val.timestamp ); + } + } + else + { + m_to_string.assign( __val.str ); + } + + return m_to_string; + } + + + value& operator=(value& o) + { + if(this->type == value_En_t::STRING) + { + m_str_value.assign(o.str()); + __val.str = m_str_value.data(); + } + else + { + this->__val = o.__val; + } + + this->type = o.type; + + return *this; + } + + value& operator=(const char* s) + { + m_str_value.assign(s); + this->__val.str = m_str_value.data(); + this->type = value_En_t::STRING; + + return *this; + } + + value& operator=(int64_t i) + { + this->__val.num = i; + this->type = value_En_t::DECIMAL; + + return *this; + } + + value& operator=(double d) + { + this->__val.dbl = d; + this->type = value_En_t::FLOAT; + + return *this; + } + + value& operator=(bool b) + { + this->__val.num = (int64_t)b; + this->type = value_En_t::DECIMAL; + + return *this; + } + + value& operator=(boost::posix_time::ptime* p) + { + this->__val.timestamp = p; + this->type = value_En_t::TIMESTAMP; + + return *this; + } + + int64_t i64() + { + return __val.num; + } + + const char* str() + { + return __val.str; + } + + double dbl() + { + return __val.dbl; + } + + boost::posix_time::ptime* timestamp() const + { + return __val.timestamp; + } + + bool operator<(const value& v)//basic compare operator , most itensive runtime operation + { + //TODO NA possible? + if (is_string() && v.is_string()) + { + return strcmp(__val.str, v.__val.str) < 0; + } + + if (is_number() && v.is_number()) + { + + if(type != v.type) //conversion //TODO find better way + { + if (type == value_En_t::DECIMAL) + { + return (double)__val.num < v.__val.dbl; + } + else + { + return __val.dbl < (double)v.__val.num; + } + } + else //no conversion + { + if(type == value_En_t::DECIMAL) + { + return __val.num < v.__val.num; + } + else + { + return __val.dbl < v.__val.dbl; + } + + } + } + + if(is_timestamp() && v.is_timestamp()) + { + return *timestamp() < *(v.timestamp()); + } + + throw base_s3select_exception("operands not of the same type(numeric , string), while comparision"); + } + + bool operator>(const value& v) //basic compare operator , most itensive runtime operation + { + //TODO NA possible? + if (is_string() && v.is_string()) + { + return strcmp(__val.str, v.__val.str) > 0; + } + + if (is_number() && v.is_number()) + { + + if(type != v.type) //conversion //TODO find better way + { + if (type == value_En_t::DECIMAL) + { + return (double)__val.num > v.__val.dbl; + } + else + { + return __val.dbl > (double)v.__val.num; + } + } + else //no conversion + { + if(type == value_En_t::DECIMAL) + { + return __val.num > v.__val.num; + } + else + { + return __val.dbl > v.__val.dbl; + } + + } + } + + if(is_timestamp() && v.is_timestamp()) + { + return *timestamp() > *(v.timestamp()); + } + + throw base_s3select_exception("operands not of the same type(numeric , string), while comparision"); + } + + bool operator==(const value& v) //basic compare operator , most itensive runtime operation + { + //TODO NA possible? + if (is_string() && v.is_string()) + { + return strcmp(__val.str, v.__val.str) == 0; + } + + + if (is_number() && v.is_number()) + { + + if(type != v.type) //conversion //TODO find better way + { + if (type == value_En_t::DECIMAL) + { + return (double)__val.num == v.__val.dbl; + } + else + { + return __val.dbl == (double)v.__val.num; + } + } + else //no conversion + { + if(type == value_En_t::DECIMAL) + { + return __val.num == v.__val.num; + } + else + { + return __val.dbl == v.__val.dbl; + } + + } + } + + if(is_timestamp() && v.is_timestamp()) + { + return *timestamp() == *(v.timestamp()); + } + + throw base_s3select_exception("operands not of the same type(numeric , string), while comparision"); + } + bool operator<=(const value& v) + { + return !(*this>v); + } + bool operator>=(const value& v) + { + return !(*this<v); + } + bool operator!=(const value& v) + { + return !(*this == v); + } + + template<typename binop> //conversion rules for arithmetical binary operations + value& compute(value& l, const value& r) //left should be this, it contain the result + { + binop __op; + + if (l.is_string() || r.is_string()) + { + throw base_s3select_exception("illegal binary operation with string"); + } + + if (l.type != r.type) + { + //conversion + + if (l.type == value_En_t::DECIMAL) + { + l.__val.dbl = __op((double)l.__val.num, r.__val.dbl); + l.type = value_En_t::FLOAT; + } + else + { + l.__val.dbl = __op(l.__val.dbl, (double)r.__val.num); + l.type = value_En_t::FLOAT; + } + } + else + { + //no conversion + + if (l.type == value_En_t::DECIMAL) + { + l.__val.num = __op(l.__val.num, r.__val.num ); + l.type = value_En_t::DECIMAL; + } + else + { + l.__val.dbl = __op(l.__val.dbl, r.__val.dbl ); + l.type = value_En_t::FLOAT; + } + } + + return l; + } + + value& operator+(const value& v) + { + return compute<binop_plus>(*this, v); + } + + value& operator-(const value& v) + { + return compute<binop_minus>(*this, v); + } + + value& operator*(const value& v) + { + return compute<binop_mult>(*this, v); + } + + value& operator/(const value& v) // TODO handle division by zero + { + return compute<binop_div>(*this, v); + } + + value& operator^(const value& v) + { + return compute<binop_pow>(*this, v); + } + +}; + +class base_statement +{ + +protected: + + scratch_area* m_scratch; + projection_alias* m_aliases; + bool is_last_call; //valid only for aggregation functions + bool m_is_cache_result; + value m_alias_result; + base_statement* m_projection_alias; + int m_eval_stack_depth; + +public: + base_statement():m_scratch(0), is_last_call(false), m_is_cache_result(false), m_projection_alias(0), m_eval_stack_depth(0) {} + virtual value& eval() =0; + virtual base_statement* left() + { + return 0; + } + virtual base_statement* right() + { + return 0; + } + virtual std::string print(int ident) =0;//TODO complete it, one option to use level parametr in interface , + virtual bool semantic() =0;//done once , post syntax , traverse all nodes and validate semantics. + + virtual void traverse_and_apply(scratch_area* sa, projection_alias* pa) + { + m_scratch = sa; + m_aliases = pa; + if (left()) + { + left()->traverse_and_apply(m_scratch, m_aliases); + } + if (right()) + { + right()->traverse_and_apply(m_scratch, m_aliases); + } + } + + virtual bool is_aggregate() + { + return false; + } + virtual bool is_column() + { + return false; + } + + bool is_function(); + bool is_aggregate_exist_in_expression(base_statement* e);//TODO obsolete ? + base_statement* get_aggregate(); + bool is_nested_aggregate(base_statement* e); + bool is_binop_aggregate_and_column(base_statement* skip); + + virtual void set_last_call() + { + is_last_call = true; + if(left()) + { + left()->set_last_call(); + } + if(right()) + { + right()->set_last_call(); + } + } + + bool is_set_last_call() + { + return is_last_call; + } + + void invalidate_cache_result() + { + m_is_cache_result = false; + } + + bool is_result_cached() + { + return m_is_cache_result == true; + } + + void set_result_cache(value& eval_result) + { + m_alias_result = eval_result; + m_is_cache_result = true; + } + + void dec_call_stack_depth() + { + m_eval_stack_depth --; + } + + value& get_result_cache() + { + return m_alias_result; + } + + int& get_eval_call_depth() + { + m_eval_stack_depth++; + return m_eval_stack_depth; + } + + virtual ~base_statement() {} + +}; + +class variable : public base_statement +{ + +public: + + enum class var_t + { + NA, + VAR,//schema column (i.e. age , price , ...) + COL_VALUE, //concrete value + POS, // CSV column number (i.e. _1 , _2 ... ) + STAR_OPERATION, //'*' + } ; + var_t m_var_type; + +private: + + std::string _name; + int column_pos; + value var_value; + std::string m_star_op_result; + char m_star_op_result_charc[4096]; //TODO should be dynamic + + const int undefined_column_pos = -1; + const int column_alias = -2; + +public: + variable():m_var_type(var_t::NA), _name(""), column_pos(-1) {} + + variable(int64_t i) : m_var_type(var_t::COL_VALUE), column_pos(-1), var_value(i) {} + + variable(double d) : m_var_type(var_t::COL_VALUE), _name("#"), column_pos(-1), var_value(d) {} + + variable(int i) : m_var_type(var_t::COL_VALUE), column_pos(-1), var_value(i) {} + + variable(const std::string& n) : m_var_type(var_t::VAR), _name(n), column_pos(-1) {} + + variable(const std::string& n, var_t tp) : m_var_type(var_t::NA) + { + if(tp == variable::var_t::POS) + { + _name = n; + m_var_type = tp; + int pos = atoi( n.c_str() + 1 ); //TODO >0 < (schema definition , semantic analysis) + column_pos = pos -1;// _1 is the first column ( zero position ) + } + else if (tp == variable::var_t::COL_VALUE) + { + _name = "#"; + m_var_type = tp; + column_pos = -1; + var_value = n.c_str(); + } + else if (tp ==variable::var_t::STAR_OPERATION) + { + _name = "#"; + m_var_type = tp; + column_pos = -1; + } + } + + void operator=(value& v) + { + var_value = v; + } + + void set_value(const char* s) + { + var_value = s; + } + + void set_value(double d) + { + var_value = d; + } + + void set_value(int64_t i) + { + var_value = i; + } + + void set_value(boost::posix_time::ptime* p) + { + var_value = p; + } + + virtual ~variable() {} + + virtual bool is_column() //is reference to column. + { + if(m_var_type == var_t::VAR || m_var_type == var_t::POS) + { + return true; + } + return false; + } + + value& get_value() + { + return var_value; //TODO is it correct + } + virtual value::value_En_t get_value_type() + { + return var_value.type; + } + + + value& star_operation() //purpose return content of all columns in a input stream + { + + + int i; + size_t pos=0; + int num_of_columns = m_scratch->get_num_of_columns(); + for(i=0; i<num_of_columns-1; i++) + { + size_t len = m_scratch->get_column_value(i).size(); + if((pos+len)>sizeof(m_star_op_result_charc)) + { + throw base_s3select_exception("result line too long", base_s3select_exception::s3select_exp_en_t::FATAL); + } + + memcpy(&m_star_op_result_charc[pos], m_scratch->get_column_value(i).data(), len); + pos += len; + m_star_op_result_charc[ pos ] = ',';//TODO need for another abstraction (per file type) + pos ++; + + } + + size_t len = m_scratch->get_column_value(i).size(); + if((pos+len)>sizeof(m_star_op_result_charc)) + { + throw base_s3select_exception("result line too long", base_s3select_exception::s3select_exp_en_t::FATAL); + } + + memcpy(&m_star_op_result_charc[pos], m_scratch->get_column_value(i).data(), len); + m_star_op_result_charc[ pos + len ] = 0; + var_value = (char*)&m_star_op_result_charc[0]; + return var_value; + } + + virtual value& eval() + { + if (m_var_type == var_t::COL_VALUE) + { + return var_value; // a literal,could be deciml / float / string + } + else if(m_var_type == var_t::STAR_OPERATION) + { + return star_operation(); + } + else if (column_pos == undefined_column_pos) + { + //done once , for the first time + column_pos = m_scratch->get_column_pos(_name.c_str()); + + if(column_pos>=0 && m_aliases->search_alias(_name.c_str())) + { + throw base_s3select_exception(std::string("multiple definition of column {") + _name + "} as schema-column and alias", base_s3select_exception::s3select_exp_en_t::FATAL); + } + + + if (column_pos == undefined_column_pos) + { + //not belong to schema , should exist in aliases + m_projection_alias = m_aliases->search_alias(_name.c_str()); + + //not enter this scope again + column_pos = column_alias; + if(m_projection_alias == 0) + { + throw base_s3select_exception(std::string("alias {")+_name+std::string("} or column not exist in schema"), base_s3select_exception::s3select_exp_en_t::FATAL); + } + } + + } + + if (m_projection_alias) + { + if (m_projection_alias->get_eval_call_depth()>2) + { + throw base_s3select_exception("number of calls exceed maximum size, probably a cyclic reference to alias", base_s3select_exception::s3select_exp_en_t::FATAL); + } + + if (m_projection_alias->is_result_cached() == false) + { + var_value = m_projection_alias->eval(); + m_projection_alias->set_result_cache(var_value); + } + else + { + var_value = m_projection_alias->get_result_cache(); + } + + m_projection_alias->dec_call_stack_depth(); + } + else + { + var_value = (char*)m_scratch->get_column_value(column_pos).data(); //no allocation. returning pointer of allocated space + } + + return var_value; + } + + virtual std::string print(int ident) + { + //std::string out = std::string(ident,' ') + std::string("var:") + std::to_string(var_value.__val.num); + //return out; + return std::string("#");//TBD + } + + virtual bool semantic() + { + return false; + } + +}; + +class arithmetic_operand : public base_statement +{ + +public: + + enum class cmp_t {NA, EQ, LE, LT, GT, GE, NE} ; + +private: + base_statement* l; + base_statement* r; + + cmp_t _cmp; + value var_value; + +public: + + virtual bool semantic() + { + return true; + } + + virtual base_statement* left() + { + return l; + } + virtual base_statement* right() + { + return r; + } + + virtual std::string print(int ident) + { + //std::string out = std::string(ident,' ') + "compare:" += std::to_string(_cmp) + "\n" + l->print(ident-5) +r->print(ident+5); + //return out; + return std::string("#");//TBD + } + + virtual value& eval() + { + + switch (_cmp) + { + case cmp_t::EQ: + return var_value = (l->eval() == r->eval()); + break; + + case cmp_t::LE: + return var_value = (l->eval() <= r->eval()); + break; + + case cmp_t::GE: + return var_value = (l->eval() >= r->eval()); + break; + + case cmp_t::NE: + return var_value = (l->eval() != r->eval()); + break; + + case cmp_t::GT: + return var_value = (l->eval() > r->eval()); + break; + + case cmp_t::LT: + return var_value = (l->eval() < r->eval()); + break; + + default: + throw base_s3select_exception("internal error"); + break; + } + } + + arithmetic_operand(base_statement* _l, cmp_t c, base_statement* _r):l(_l), r(_r), _cmp(c) {} + + virtual ~arithmetic_operand() {} +}; + +class logical_operand : public base_statement +{ + +public: + + enum class oplog_t {AND, OR, NA}; + +private: + base_statement* l; + base_statement* r; + + oplog_t _oplog; + value var_value; + +public: + + virtual base_statement* left() + { + return l; + } + virtual base_statement* right() + { + return r; + } + + virtual bool semantic() + { + return true; + } + + logical_operand(base_statement* _l, oplog_t _o, base_statement* _r):l(_l), r(_r), _oplog(_o) {} + + virtual ~logical_operand() {} + + virtual std::string print(int ident) + { + //std::string out = std::string(ident, ' ') + "logical_operand:" += std::to_string(_oplog) + "\n" + l->print(ident - 5) + r->print(ident + 5); + //return out; + return std::string("#");//TBD + } + virtual value& eval() + { + if (_oplog == oplog_t::AND) + { + if (!l || !r) + { + throw base_s3select_exception("missing operand for logical and", base_s3select_exception::s3select_exp_en_t::FATAL); + } + return var_value = (l->eval().i64() && r->eval().i64()); + } + else + { + if (!l || !r) + { + throw base_s3select_exception("missing operand for logical or", base_s3select_exception::s3select_exp_en_t::FATAL); + } + return var_value = (l->eval().i64() || r->eval().i64()); + } + } + +}; + +class mulldiv_operation : public base_statement +{ + +public: + + enum class muldiv_t {NA, MULL, DIV, POW} ; + +private: + base_statement* l; + base_statement* r; + + muldiv_t _mulldiv; + value var_value; + +public: + + virtual base_statement* left() + { + return l; + } + virtual base_statement* right() + { + return r; + } + + virtual bool semantic() + { + return true; + } + + virtual std::string print(int ident) + { + //std::string out = std::string(ident, ' ') + "mulldiv_operation:" += std::to_string(_mulldiv) + "\n" + l->print(ident - 5) + r->print(ident + 5); + //return out; + return std::string("#");//TBD + } + + virtual value& eval() + { + switch (_mulldiv) + { + case muldiv_t::MULL: + return var_value = l->eval() * r->eval(); + break; + + case muldiv_t::DIV: + return var_value = l->eval() / r->eval(); + break; + + case muldiv_t::POW: + return var_value = l->eval() ^ r->eval(); + break; + + default: + throw base_s3select_exception("internal error"); + break; + } + } + + mulldiv_operation(base_statement* _l, muldiv_t c, base_statement* _r):l(_l), r(_r), _mulldiv(c) {} + + virtual ~mulldiv_operation() {} +}; + +class addsub_operation : public base_statement +{ + +public: + + enum class addsub_op_t {ADD, SUB, NA}; + +private: + base_statement* l; + base_statement* r; + + addsub_op_t _op; + value var_value; + +public: + + virtual base_statement* left() + { + return l; + } + virtual base_statement* right() + { + return r; + } + + virtual bool semantic() + { + return true; + } + + addsub_operation(base_statement* _l, addsub_op_t _o, base_statement* _r):l(_l), r(_r), _op(_o) {} + + virtual ~addsub_operation() {} + + virtual std::string print(int ident) + { + //std::string out = std::string(ident, ' ') + "addsub_operation:" += std::to_string(_op) + "\n" + l->print(ident - 5) + r->print(ident + 5); + return std::string("#");//TBD + } + + virtual value& eval() + { + if (_op == addsub_op_t::NA) // -num , +num , unary-operation on number + { + if (l) + { + return var_value = l->eval(); + } + else if (r) + { + return var_value = r->eval(); + } + } + else if (_op == addsub_op_t::ADD) + { + return var_value = (l->eval() + r->eval()); + } + else + { + return var_value = (l->eval() - r->eval()); + } + + return var_value; + } +}; + +class base_function +{ + +protected: + bool aggregate; + +public: + //TODO add semantic to base-function , it operate once on function creation + // validate semantic on creation instead on run-time + virtual bool operator()(std::vector<base_statement*>* args, variable* result) = 0; + base_function() : aggregate(false) {} + bool is_aggregate() + { + return aggregate == true; + } + virtual void get_aggregate_result(variable*) {} + + virtual ~base_function() {} +}; + + +};//namespace + +#endif diff --git a/src/s3select/s3select-parse-s.png b/src/s3select/s3select-parse-s.png Binary files differnew file mode 100644 index 000000000..047f87ed6 --- /dev/null +++ b/src/s3select/s3select-parse-s.png diff --git a/src/s3select/s3select.rst b/src/s3select/s3select.rst new file mode 100644 index 000000000..fbe4dfe93 --- /dev/null +++ b/src/s3select/s3select.rst @@ -0,0 +1,259 @@ +=============== + Ceph s3 select +=============== + +.. contents:: + +Overview +-------- + + | The purpose of **s3 select** engine is to create an efficient pipe between user client to storage node (the engine should be close as possible to storage). + | It enables the user to define the exact portion of data should be received by his side. + | It also enables for higher level analytic-applications (such as SPARK-SQL) , using that feature to improve their latency and throughput. + + | For example, a s3-object of several GB (CSV file), a user needs to extract a single column which filtered by another column. + | As the following query: + | ``select customer-id from s3Object where age>30 and age<65;`` + + | Currently the whole s3-object must retrieve from OSD via RGW before filtering and extracting data. + | By "pushing down" the query into OSD , it's possible to save a lot of network and CPU(serialization / deserialization). + + | **The bigger the object, and the more accurate the query, the better the performance**. + +Basic workflow +-------------- + + | S3-select query is sent to RGW via `AWS-CLI <https://docs.aws.amazon.com/cli/latest/reference/s3api/select-object-content.html>`_ + + | It passes the authentication and permission process as an incoming message (POST). + | **RGWSelectObj_ObjStore_S3::send_response_data** is the “entry point”, it handles each fetched chunk according to input object-key. + | **send_response_data** is first handling the input query, it extracts the query and other CLI parameters. + + | Per each new fetched chunk (~4m), it runs the s3-select query on that chunk. + | The current implementation supports CSV objects and since chunks are randomly “cutting” the CSV rows in the middle, those broken-lines (first or last per chunk) are skipped while processing the query. + | Those “broken” lines are stored and later merged with the next broken-line (belong to the next chunk), and finally processed. + + | Per each processed chunk an output message is formatted according to AWS specification and sent back to the client. + | For aggregation queries the last chunk should be identified as the end of input, following that the s3-select-engine initiates end-of-process and produces an aggregate result. + +Design Concepts +--------------- + +AST- Abstract Syntax Tree +~~~~~~~~~~~~~~~~~~~~~~~~~ + | The s3-select main flow is initiated with parsing of input-string (i.e user query), and follows + | with building an AST (abstract-syntax-tree) as a result. + | The execution phase is built upon the AST. + + | ``Base_statement`` is the base for the all object-nodes participating in the execution phase, it consists of the ``eval()`` method which returns the <value> object. + + | ``value`` object is handling the known basic-types such as int,string,float,time-stamp + | It is able to operate comparison and basic arithmetic operations on mentioned types. + + | The execution-flow is actually calling the ``eval()`` method on the root-node (per each projection), it goes all the way down, and returns the actual result (``value`` object) from bottom node to root node(all the way up) . + + | **Alias** programming-construct is an essential part of s3-select language, it enables much better programming especially with objects containing many columns or in the case of complex queries. + + | Upon parsing the statement containing alias construct, it replaces alias with reference to the correct AST-node, on runtime the node is simply evaluated as any other node. + + | There is a risk that self(or cyclic) reference may occur causing stack-overflow(endless-loop), for that concern upon evaluating an alias, it is validated for cyclic reference. + + | Alias also maintains result-cache, meaning upon using the same alias more than once, it’s not evaluating the same node again(it will return the same result),instead it uses the result from cache. + + | Of Course, per each new row the cache is invalidated. + + +S3 select parser definition +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + | The implementation of s3-select uses the `boost::spirit <https://www.boost.org/doc/libs/1_71_0/libs/spirit/classic/doc/grammar.html>`_ the definition of s3-select command is according to AWS. + + | Upon parsing is initiated on input text, and a specific rule is identified, an action which is bound to that rule is executed. + | Those actions are building the AST, each action is unique (as its rule), at the end of the process it forms a structure similar to a tree. + + | As mentioned, running eval() on the root node, execute the s3-select statement (per projection). + | The input stream is accessible to the execution tree, by the scratch-area object, that object is constantly updated per each new row. + +Basic functionalities +~~~~~~~~~~~~~~~~~~~~~ + + | **S3select** has a definite set of functionalities that should be implemented (if we wish to stay compliant with AWS), currently only a portion of it is implemented. + + | The implemented software architecture supports basic arithmetic expressions, logical and compare expressions, including nested function calls and casting operators, that alone enables the user reasonable flexibility. + | review the bellow feature-table_. + + + +Memory handling +~~~~~~~~~~~~~~~ + + | S3select structures and objects are lockless and thread-safe, it uses placement-new in order to reduce the alloc/dealloc intensive cycles, which may impact the main process hosting s3-select. + + | Once AST is built there is no need to allocate memory for the execution itself, the AST is “static” for the query-execution life-cycle. + + | The execution itself is stream-oriented, meaning there is no pre-allocation before execution, object size has no impact on memory consumption. + + | It processes chunk after chunk, row after row, all memory needed for processing resides on AST. + + | The AST is similar to stack behaviour in that it consumes already allocated memory and “releases” it upon completing its task. + +S3 Object different types +~~~~~~~~~~~~~~~~~~~~~~~~~ + + | The processing of input stream is decoupled from s3-select-engine, meaning , each input-type should have its own parser, converting s3-object into columns. + + | Current implementation includes only CSV reader; its parsing definitions are according to AWS. + | The parser is implemented using `boost::state-machine <https://www.boost.org/doc/libs/1_64_0/libs/msm/doc/HTML/index.html>`_. + + | The CSV parser handles NULL,quote,escape rules,field delimiter,row delimiter and users may define (via AWS CLI) all of those dynamically. + +Error Handling +~~~~~~~~~~~~~~ + | S3-select statement may be syntactically correct but semantically wrong, for one example ``select a * b from …`` , where a is number and b is a string. + | Current implementation is for CSV file types, CSV has no schema, column-types may evaluate on runtime. + | The above means that wrong semantic statements may occur on runtime. + + | As for syntax error ``select x frm stdin;`` , the builtin parser fails on first miss-match to language definition, and produces an error message back to client (AWS-CLI). + | The error message is point on location of miss-match. + + | Fatal severity (attached to the exception) will end execution immediately, other error severity are counted, upon reaching 100, it ends execution with an error message. + + +AST denostration +~~~~~~~~~~~~~~~~ +.. ditaa:: + + +---------------------+ + | select | + +------ +---------------------+---------+ + | | | + | | | + | | | + | V | + | +--------------------+ | + | | s3object | | + | +--------------------+ | + | | + V V + +---------------------+ +-------------+ + | projections | | where | + +---------------------+ +-------------+ + | | | + | | | + | | | + | | | + | | | + | | | + V V V + +-----------+ +-----------+ +-------------+ + | multiply | | date | | and | + +-----------+ +-----------+ +-------------+ + | | | | + | | | | + | | | | + | | | | + V V V V + +-------+ +-------+ +-----+ +-----+ + |payment| | 0.3 | | EQ | | LT | + +-------+ +-------+ +--+-----+ +-----+--+ + | | | | + | | | | + V V V V + +-------+ +----+ +-----+ +-----+ + | region| |east| |age | | 30 | + +-------+ +----+ +-----+ +-----+ + +Features Support +---------------- + +.. _feature-table: + +The following table describes the support for s3-select functionalities: + ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| Feature | Detailed | Example | ++=================================+=================+=======================================================================+ +| Arithmetic operators | ^ * / + - ( ) | select (int(_1)+int(_2))*int(_9) from stdin; | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| | | select ((1+2)*3.14) ^ 2 from stdin; | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| Compare operators | > < >= <= == != | select _1,_2 from stdin where (int(1)+int(_3))>int(_5); | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| logical operator | AND OR | select count(*) from stdin where int(1)>123 and int(_5)<200; | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| casting operator | int(expression) | select int(_1),int( 1.2 + 3.4) from stdin; | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| |float(expression)| | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| | timestamp(...) | select timestamp("1999:10:10-12:23:44") from stdin; | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| Aggregation Function | sum | select sum(int(_1)) from stdin; | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| Aggregation Function | min | select min( int(_1) * int(_5) ) from stdin; | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| Aggregation Function | max | select max(float(_1)),min(int(_5)) from stdin; | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| Aggregation Function | count | select count(*) from stdin where (int(1)+int(_3))>int(_5); | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| Timestamp Functions | extract | select count(*) from stdin where | +| | | extract("year",timestamp(_2)) > 1950 | +| | | and extract("year",timestamp(_1)) < 1960; | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| Timestamp Functions | dateadd | select count(0) from stdin where | +| | | datediff("year",timestamp(_1),dateadd("day",366,timestamp(_1))) == 1; | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| Timestamp Functions | datediff | select count(0) from stdin where | +| | | datediff("month",timestamp(_1),timestamp(_2))) == 2; | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| Timestamp Functions | utcnow | select count(0) from stdin where | +| | | datediff("hours",utcnow(),dateadd("day",1,utcnow())) == 24 ; | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| String Functions | substr | select count(0) from stdin where | +| | | int(substr(_1,1,4))>1950 and int(substr(_1,1,4))<1960; | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| alias support | | select int(_1) as a1, int(_2) as a2 , (a1+a2) as a3 | +| | | from stdin where a3>100 and a3<300; | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ + +Sending Query to RGW +-------------------- + +Syntax +~~~~~~ +CSV default defintion for field-delimiter,row-delimiter,quote-char,escape-char are: { , \\n " \\ } + +:: + + aws --endpoint-url http://localhost:8000 s3api select-object-content + --bucket {BUCKET-NAME} + --expression-type 'SQL' + --input-serialization + '{"CSV": {"FieldDelimiter": "," , "QuoteCharacter": "\"" , "RecordDelimiter" : "\n" , "QuoteEscapeCharacter" : "\\" , "FileHeaderInfo": "USE" }, "CompressionType": "NONE"}' + --output-serialization '{"CSV": {}}' + --key {OBJECT-NAME} + --expression "select count(0) from stdin where int(_1)<10;" output.csv + +CSV parsing behavior +-------------------- + ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| Feature | Description | input ==> tokens | ++=================================+=================+=======================================================================+ +| NULL | successive | ,,1,,2, ==> {null}{null}{1}{null}{2}{null} | +| | field delimiter | | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| QUOTE | quote character | 11,22,"a,b,c,d",last ==> {11}{22}{"a,b,c,d"}{last} | +| | overrides | | +| | field delimiter | | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| Escape | escape char | 11,22,str=\\"abcd\\"\\,str2=\\"123\\",last | +| | overrides | ==> {11}{22}{str="abcd",str2="123"}{last} | +| | meta-character. | | +| | escape removed | | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| row delimiter | no close quote, | 11,22,a="str,44,55,66 | +| | row delimiter is| ==> {11}{22}{a="str,44,55,66} | +| | closing line | | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ +| csv header info | FileHeaderInfo | "**USE**" value means each token on first line is column-name, | +| | tag | "**IGNORE**" value means to skip the first line | ++---------------------------------+-----------------+-----------------------------------------------------------------------+ diff --git a/src/s3select/test/CMakeLists.txt b/src/s3select/test/CMakeLists.txt new file mode 100644 index 000000000..b3c47325c --- /dev/null +++ b/src/s3select/test/CMakeLists.txt @@ -0,0 +1,6 @@ +add_executable(s3select_test s3select_test.cpp) +target_include_directories(s3select_test PUBLIC ../include) +target_link_libraries(s3select_test gtest gtest_main boost_date_time) + + +gtest_discover_tests(s3select_test) diff --git a/src/s3select/test/s3select_test.cpp b/src/s3select/test/s3select_test.cpp new file mode 100644 index 000000000..5b73845e3 --- /dev/null +++ b/src/s3select/test/s3select_test.cpp @@ -0,0 +1,244 @@ +#include "s3select.h" +#include "gtest/gtest.h" +#include <string> +#include "boost/date_time/gregorian/gregorian.hpp" +#include "boost/date_time/posix_time/posix_time.hpp" + +using namespace s3selectEngine; + +std::string run_expression_in_C_prog(const char* expression) +{ +//purpose: per use-case a c-file is generated, compiles , and finally executed. + +// side note: its possible to do the following: cat test_hello.c | gcc -pipe -x c - -o /dev/stdout > ./1 +// gcc can read and write from/to pipe (use pipe2()) i.e. not using file-system , BUT should also run gcc-output from memory + + const int C_FILE_SIZE=(1024*1024); + std::string c_test_file = std::string("/tmp/test_s3.c"); + std::string c_run_file = std::string("/tmp/s3test"); + + FILE* fp_c_file = fopen(c_test_file.c_str(), "w"); + + //contain return result + char result_buff[100]; + + char* prog_c; + + if(fp_c_file) + { + prog_c = (char*)malloc(C_FILE_SIZE); + + size_t sz=sprintf(prog_c, "#include <stdio.h>\n \ + int main() \ + {\ + printf(\"%%f\\n\",(double)(%s));\ + } ", expression); + + int status = fwrite(prog_c, 1, sz, fp_c_file); + fclose(fp_c_file); + } + + std::string gcc_and_run_cmd = std::string("gcc ") + c_test_file + " -o " + c_run_file + " -Wall && " + c_run_file; + + FILE* fp_build = popen(gcc_and_run_cmd.c_str(), "r"); //TODO read stderr from pipe + + if(!fp_build) + { + return std::string("#ERROR#"); + } + + fgets(result_buff, sizeof(result_buff), fp_build); + + unlink(c_run_file.c_str()); + unlink(c_test_file.c_str()); + + return std::string(result_buff); +} + +#define OPER oper[ rand() % oper.size() ] + +class gen_expr +{ + +private: + + int open = 0; + std::string oper= {"+-+*/*"}; + + std::string gexpr() + { + return std::to_string(rand() % 1000) + ".0" + OPER + std::to_string(rand() % 1000) + ".0"; + } + + std::string g_openp() + { + if ((rand() % 3) == 0) + { + open++; + return std::string("("); + } + return std::string(""); + } + + std::string g_closep() + { + if ((rand() % 2) == 0 && open > 0) + { + open--; + return std::string(")"); + } + return std::string(""); + } + +public: + + std::string generate() + { + std::string exp = ""; + open = 0; + + for (int i = 0; i < 10; i++) + { + exp = (exp.size() > 0 ? exp + OPER : std::string("")) + g_openp() + gexpr() + OPER + gexpr() + g_closep(); + } + + if (open) + for (; open--;) + { + exp += ")"; + } + + return exp; + } +}; + +std::string run_s3select(std::string expression) +{ + s3select s3select_syntax; + + s3select_syntax.parse_query(expression.c_str()); + + std::string s3select_result; + s3selectEngine::csv_object s3_csv_object(&s3select_syntax); + std::string in = "1,1,1,1\n"; + + s3_csv_object.run_s3select_on_object(s3select_result, in.c_str(), in.size(), false, false, true); + + s3select_result = s3select_result.substr(0, s3select_result.find_first_of(",")); + + return s3select_result; +} + +TEST(TestS3SElect, s3select_vs_C) +{ +//purpose: validate correct processing of arithmetical expression, it is done by running the same expression +// in C program. +// the test validate that syntax and execution-tree (including precedence rules) are done correctly + + for(int y=0; y<10; y++) + { + gen_expr g; + std::string exp = g.generate(); + std::string c_result = run_expression_in_C_prog( exp.c_str() ); + + char* err=0; + double c_dbl_res = strtod(c_result.c_str(), &err); + + std::string input_query = "select " + exp + " from stdin;" ; + std::string s3select_res = run_s3select(input_query); + + double s3select_dbl_res = strtod(s3select_res.c_str(), &err); + + //std::cout << exp << " " << s3select_dbl_res << " " << s3select_res << " " << c_dbl_res/s3select_dbl_res << std::endl; + //std::cout << exp << std::endl; + + ASSERT_EQ(c_dbl_res, s3select_dbl_res); + } +} + +TEST(TestS3SElect, ParseQuery) +{ + //TODO syntax issues ? + //TODO error messeges ? + + s3select s3select_syntax; + + run_s3select(std::string("select (1+1) from stdin;")); + + ASSERT_EQ(0, 0); +} + +TEST(TestS3SElect, int_compare_operator) +{ + value a10(10), b11(11), c10(10); + + ASSERT_EQ( a10 < b11, true ); + ASSERT_EQ( a10 > b11, false ); + ASSERT_EQ( a10 >= c10, true ); + ASSERT_EQ( a10 <= c10, true ); + ASSERT_EQ( a10 != b11, true ); + ASSERT_EQ( a10 == b11, false ); + ASSERT_EQ( a10 == c10, true ); +} + +TEST(TestS3SElect, float_compare_operator) +{ + value a10(10.1), b11(11.2), c10(10.1); + + ASSERT_EQ( a10 < b11, true ); + ASSERT_EQ( a10 > b11, false ); + ASSERT_EQ( a10 >= c10, true ); + ASSERT_EQ( a10 <= c10, true ); + ASSERT_EQ( a10 != b11, true ); + ASSERT_EQ( a10 == b11, false ); + ASSERT_EQ( a10 == c10, true ); + +} + +TEST(TestS3SElect, string_compare_operator) +{ + value s1("abc"), s2("def"), s3("abc"); + + ASSERT_EQ( s1 < s2, true ); + ASSERT_EQ( s1 > s2, false ); + ASSERT_EQ( s1 <= s3, true ); + ASSERT_EQ( s1 >= s3, true ); + ASSERT_EQ( s1 != s2, true ); + ASSERT_EQ( s1 == s3, true ); + ASSERT_EQ( s1 == s2, false ); +} + +TEST(TestS3SElect, arithmetic_operator) +{ + value a(1), b(2), c(3), d(4); + + ASSERT_EQ( (a+b).i64(), 3 ); + + ASSERT_EQ( (value(0)-value(2)*value(4)).i64(), -8 ); + ASSERT_EQ( (value(1.23)-value(0.1)*value(2)).dbl(), 1.03 ); + + a=int64_t(1); //a+b modify a + ASSERT_EQ( ( (a+b) * (c+d) ).i64(), 21 ); +} + +TEST(TestS3SElect, timestamp_function) +{ + // TODO: support formats listed here: + // https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-glacier-select-sql-reference-date.html#s3-glacier-select-sql-reference-to-timestamp + const std::string timestamp = "2007-02-23:14:33:01"; + // TODO: out_simestamp should be the same as timestamp + const std::string out_timestamp = "2007-Feb-23 14:33:01"; + const std::string input_query = "select timestamp(\"" + timestamp + "\") from stdin;" ; + std::string s3select_res = run_s3select(input_query); + ASSERT_EQ(s3select_res, out_timestamp); +} + +TEST(TestS3SElect, utcnow_function) +{ + const boost::posix_time::ptime now(boost::posix_time::second_clock::universal_time()); + const std::string input_query = "select utcnow() from stdin;" ; + auto s3select_res = run_s3select(input_query); + const boost::posix_time::ptime res_now; + ASSERT_EQ(s3select_res, boost::posix_time::to_simple_string(now)); +} + |