summaryrefslogtreecommitdiffstats
path: root/src/s3select
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/s3select
parentInitial commit. (diff)
downloadceph-upstream/16.2.11+ds.tar.xz
ceph-upstream/16.2.11+ds.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/s3select')
-rw-r--r--src/s3select/.gitignore2
-rw-r--r--src/s3select/CMakeLists.txt19
-rw-r--r--src/s3select/README.md50
-rw-r--r--src/s3select/example/CMakeLists.txt12
-rwxr-xr-xsrc/s3select/example/expr_genrator.py9
-rw-r--r--src/s3select/example/generate_rand_csv.c28
-rwxr-xr-xsrc/s3select/example/parse_csv.py12
-rwxr-xr-xsrc/s3select/example/run_test.bash96
-rw-r--r--src/s3select/example/s3select_example.cpp136
-rw-r--r--src/s3select/include/s3select.h1138
-rw-r--r--src/s3select/include/s3select_csv_parser.h407
-rw-r--r--src/s3select/include/s3select_functions.h1037
-rw-r--r--src/s3select/include/s3select_oper.h1320
-rw-r--r--src/s3select/s3select-parse-s.pngbin0 -> 193829 bytes
-rw-r--r--src/s3select/s3select.rst259
-rw-r--r--src/s3select/test/CMakeLists.txt6
-rw-r--r--src/s3select/test/s3select_test.cpp244
17 files changed, 4775 insertions, 0 deletions
diff --git a/src/s3select/.gitignore b/src/s3select/.gitignore
new file mode 100644
index 000000000..d6536badc
--- /dev/null
+++ b/src/s3select/.gitignore
@@ -0,0 +1,2 @@
+build
+compile_commands.json
diff --git a/src/s3select/CMakeLists.txt b/src/s3select/CMakeLists.txt
new file mode 100644
index 000000000..99e0f16a7
--- /dev/null
+++ b/src/s3select/CMakeLists.txt
@@ -0,0 +1,19 @@
+cmake_minimum_required(VERSION 3.0)
+
+project(s3select)
+
+set(CMAKE_CXX_FLAGS "-std=gnu++17 -ggdb")
+set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)
+
+find_package(Boost REQUIRED)
+find_package(GTest REQUIRED)
+
+enable_testing()
+
+add_subdirectory(example)
+add_subdirectory(test)
+
+add_test(NAME run_my_test
+ COMMAND sh -c "../example/run_test.bash")
+
diff --git a/src/s3select/README.md b/src/s3select/README.md
new file mode 100644
index 000000000..f1408dbdb
--- /dev/null
+++ b/src/s3select/README.md
@@ -0,0 +1,50 @@
+# s3select
+**The purpose of s3select engine** is to create an efficient pipe between user client to storage node (the engine should be as close as possible to storage, "moving computation into storage").
+
+It enables the user to define the exact portion of data should received by his side.
+
+It also enables for higher level analytic-applications (such as SPARK-SQL) , using that feature to improve their latency and throughput.
+
+https://aws.amazon.com/blogs/aws/s3-glacier-select/
+
+https://www.qubole.com/blog/amazon-s3-select-integration/
+
+The engine is using boost::spirit to define the grammar , and by that building the AST (abstract-syntax-tree). upon statement is accepted by the grammar it create a tree of objects.
+
+The hierarchy(levels) of the different objects also define their role, i.e. function could be a finite expression, or an argument for an expression, or an argument for other functions, and so forth.
+
+Bellow is an example for “SQL” statement been parsed and transform into AST.
+![alt text](/s3select-parse-s.png)
+
+The where-clause is boolean expression made of arithmetic expression building blocks.
+
+Projection is a list of arithmetic expressions
+
+I created a container (**sudo docker run -it galsl/boost:latest /bin/bash/**) built with boost libraries , for building and running the s3select demo application.
+
+**The demo can run on CSV files only, as follow. (folder s3select_demo)**
+* bash> s3select -q ‘select _1 +_2,_5 * 3 from /...some..full-path/csv.txt where _1 > _2;’
+
+* bash> cat /...some..full-path/csv.txt | s3select -q ‘select _1,_5 from stdin where _1 > _2;’
+
+* bash> cat /...some..full-path/csv.txt | s3select -q ‘select c1,c5 from stdin where c1 > c2;’ -s ‘c1,c2,c3,c4,c5’
+
+* bash> cat /...some..full-path/csv.txt | s3select -q 'select min(int(substr(_1,1,1))) from stdin where substr(_1,1,1) == substr(_2,1,1);'
+
+-s flag is defining a schema (no type only names) , without schema each column can be accessed with _N (_1 is the first column).
+
+-q flag is for the query.
+
+the engine supporting the following arithmetical operations +,-,*,/,^ , ( ) , and also the logical operators and,or.
+
+s3select is supporting float,decimal,string; it also supports aggregation functions such as max,min,sum,count; the input stream is accepted as string attributes, to operate arithmetical operation it need to CAST, i.e. int(_1) is converting text to integer.
+
+The demo-app is producing CSV format , thus it can be piped into another s3select statement.
+
+there is a small app /generate_rand_csv {number-of-rows} {number-of-columns}/ , which generate CSV rows containing only numbers.
+
+the random numbers are produced with same seed number.
+
+since it works with STDIN , it possible to concatenate several files into single stream.
+
+cat file1 file2 file1 file2 | s3select -q ‘ ….. ‘
diff --git a/src/s3select/example/CMakeLists.txt b/src/s3select/example/CMakeLists.txt
new file mode 100644
index 000000000..37f99f0b3
--- /dev/null
+++ b/src/s3select/example/CMakeLists.txt
@@ -0,0 +1,12 @@
+add_executable(s3select_example s3select_example.cpp)
+target_include_directories(s3select_example PUBLIC ../include)
+target_link_libraries(s3select_example boost_date_time)
+
+add_executable(generate_rand_csv generate_rand_csv.c)
+
+add_custom_command(OUTPUT expr_genrator.py COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/expr_genrator.py expr_genrator.py
+ COMMENT "Copy expr_genrator.py"
+ VERBATIM)
+
+add_custom_target(expr_generator ALL DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/expr_genrator.py)
+
diff --git a/src/s3select/example/expr_genrator.py b/src/s3select/example/expr_genrator.py
new file mode 100755
index 000000000..0d21fcee6
--- /dev/null
+++ b/src/s3select/example/expr_genrator.py
@@ -0,0 +1,9 @@
+import random
+import sys
+
+def expr(depth):
+ if depth==1 or random.random()<1.0/(2**depth-1):
+ return str(int(random.random() * 100) + 1)+".0"
+ return '(' + expr(depth-1) + random.choice(['+','-','*','/']) + expr(depth-1) + ')'
+
+print expr( int(sys.argv[1]) )
diff --git a/src/s3select/example/generate_rand_csv.c b/src/s3select/example/generate_rand_csv.c
new file mode 100644
index 000000000..67d52adaa
--- /dev/null
+++ b/src/s3select/example/generate_rand_csv.c
@@ -0,0 +1,28 @@
+#include <stdio.h>
+#include <stdlib.h>
+
+
+int main(int argc, char** argv)
+{
+ if (argc<3)
+ {
+ printf("%s <num-of-rows> <num-of-columns> \n", argv[0]);
+ return -1;
+ }
+
+ srand(1234);
+ int line_no=0;
+ for(int i=0; i<atoi(argv[1]); i++)
+ {
+ printf("%d,", i);
+ for(int y=0; y<atoi(argv[2]); y++)
+ {
+ printf("%d,", rand()%1000);
+ }
+ printf("\n");
+ }
+
+
+
+
+}
diff --git a/src/s3select/example/parse_csv.py b/src/s3select/example/parse_csv.py
new file mode 100755
index 000000000..23fe14add
--- /dev/null
+++ b/src/s3select/example/parse_csv.py
@@ -0,0 +1,12 @@
+#!/usr/bin/python
+
+import csv
+
+with open('stam.txt') as csv_file:
+ csv_reader = csv.reader(csv_file, delimiter=',')
+ line_count = 0
+ for row in csv_reader:
+ #if (int(row[0])==465 and int(row[5])==268): # casting is slower
+ if (row[0]=="465" and row[5]=="268"):
+ print row
+
diff --git a/src/s3select/example/run_test.bash b/src/s3select/example/run_test.bash
new file mode 100755
index 000000000..56654d7ab
--- /dev/null
+++ b/src/s3select/example/run_test.bash
@@ -0,0 +1,96 @@
+#!/bin/bash
+
+set -e
+
+PREFIX=${1:-"./example"}
+
+## purpose : sanity tests
+
+s3select_calc()
+{
+l="$*"
+res=$( echo 1 | "$PREFIX"/s3select_example -q "select ${l} from stdin;" )
+echo "$res" | sed 's/.$//'
+}
+
+# create c file with expression , compile it and run it.
+c_calc()
+{
+cat << @@ > "$PREFIX"/tmp.c
+
+#include <stdio.h>
+int main()
+{
+printf("%f\n",$*);
+}
+@@
+gcc -o "$PREFIX"/a.out "$PREFIX"/tmp.c
+"$PREFIX"/a.out
+}
+
+expr_test()
+{
+## test the arithmetic evaluation of s3select against C program
+for i in {1..100}
+do
+ e=$(python2 "$PREFIX"/expr_genrator.py 5)
+ echo expression["$i"]="$e"
+ r1=$(s3select_calc "$e")
+ r2=$(c_calc "$e")
+ echo "$r1" "$r2"
+
+ ## should be zero or very close to zero; ( s3select is C compile program )
+ res=$(echo "" | awk -v e="$e" -v r1="$r1" -v r2="$r2" 'function abs(n){if (n<0) return -n; else return n;}{if (abs(r1-r2) > 0.00001) {print "MISSMATCH result for expression",e;}}')
+ if test "$res" != ""; then
+ echo "$res"
+ exit 1
+ fi
+done
+}
+
+aggregate_test()
+{
+## generate_rand_csv is generating with the same seed
+echo check sum
+s3select_val=$("$PREFIX"/generate_rand_csv 10 10 | "$PREFIX"/s3select_example -q 'select sum(int(_1)) from stdin;')
+awk_val=$("$PREFIX"/generate_rand_csv 10 10 | awk 'BEGIN{FS=",";} {s+=$1;} END{print s;}')
+s3select_val=${s3select_val::-1}
+echo "$s3select_val" "$awk_val"
+if test "$s3select_val" -ne "$awk_val"; then
+ exit 1
+fi
+echo check min
+s3select_val=$("$PREFIX"/generate_rand_csv 10 10 | "$PREFIX"/s3select_example -q 'select min(int(_1)) from stdin;')
+awk_val=$("$PREFIX"/generate_rand_csv 10 10 | awk 'BEGIN{FS=",";min=100000;} {if(min>$1) min=$1;} END{print min;}')
+s3select_val=${s3select_val::-1}
+echo "$s3select_val" "$awk_val"
+if test "$s3select_val" -ne "$awk_val"; then
+ exit 1
+fi
+echo check max
+s3select_val=$("$PREFIX"/generate_rand_csv 10 10 | "$PREFIX"/s3select_example -q 'select max(int(_1)) from stdin;')
+awk_val=$("$PREFIX"/generate_rand_csv 10 10 | awk 'BEGIN{FS=",";max=0;} {if(max<$1) max=$1;} END{print max;}' )
+s3select_val=${s3select_val::-1}
+echo "$s3select_val" "$awk_val"
+if test "$s3select_val" -ne "$awk_val"; then
+ exit 1
+fi
+echo check substr and count
+s3select_val=$("$PREFIX"/generate_rand_csv 10000 10 | "$PREFIX"/s3select_example -q 'select count(int(_1)) from stdin where int(_1)>200 and int(_1)<250;')
+awk_val=$("$PREFIX"/generate_rand_csv 10000 10 | "$PREFIX"/s3select_example -q 'select substr(_1,1,1) from stdin where int(_1)>200 and int(_1)<250;' | uniq -c | awk '{print $1;}')
+s3select_val=${s3select_val::-1}
+echo "$s3select_val" "$awk_val"
+if test "$s3select_val" -ne "$awk_val"; then
+ exit 1
+fi
+}
+
+###############################################################
+
+expr_test
+aggregate_test
+
+rm "$PREFIX"/tmp.c "$PREFIX"/a.out
+
+exit 0
+
diff --git a/src/s3select/example/s3select_example.cpp b/src/s3select/example/s3select_example.cpp
new file mode 100644
index 000000000..840b62c6a
--- /dev/null
+++ b/src/s3select/example/s3select_example.cpp
@@ -0,0 +1,136 @@
+#include "s3select.h"
+#include <fstream>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+using namespace s3selectEngine;
+using namespace BOOST_SPIRIT_CLASSIC_NS;
+
+int cli_get_schema(const char* input_schema, actionQ& x)
+{
+ g_push_column.set_action_q(&x);
+
+ rule<> column_name_rule = lexeme_d[(+alpha_p >> *digit_p)];
+
+ //TODO an issue to resolve with trailing space
+ parse_info<> info = parse(input_schema, ((column_name_rule)[BOOST_BIND_ACTION(push_column)] >> *(',' >> (column_name_rule)[BOOST_BIND_ACTION(push_column)])), space_p);
+
+ if (!info.full)
+ {
+ std::cout << "failure in schema description " << input_schema << std::endl;
+ return -1;
+ }
+
+ return 0;
+}
+
+int main(int argc, char** argv)
+{
+
+ //purpose: demostrate the s3select functionalities
+ s3select s3select_syntax;
+
+ char* input_query = 0;
+
+ for (int i = 0; i < argc; i++)
+ {
+
+ if (!strcmp(argv[i], "-q"))
+ {
+ input_query = argv[i + 1];
+ }
+ }
+
+
+ if (!input_query)
+ {
+ std::cout << "type -q 'select ... from ... '" << std::endl;
+ return -1;
+ }
+
+
+ bool to_aggregate = false;
+
+ int status = s3select_syntax.parse_query(input_query);
+ if (status != 0)
+ {
+ std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl;
+ return -1;
+ }
+
+ std::string object_name = s3select_syntax.get_from_clause(); //TODO stdin
+
+ FILE* fp;
+
+ if (object_name.compare("stdin")==0)
+ {
+ fp = stdin;
+ }
+ else
+ {
+ fp = fopen(object_name.c_str(), "r");
+ }
+
+
+ if(!fp)
+ {
+ std::cout << " input stream is not valid, abort;" << std::endl;
+ return -1;
+ }
+
+ struct stat statbuf;
+
+ lstat(object_name.c_str(), &statbuf);
+
+ std::string s3select_result;
+ s3selectEngine::csv_object::csv_defintions csv;
+ csv.use_header_info = false;
+ //csv.column_delimiter='|';
+ //csv.row_delimiter='\t';
+
+
+ s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv);
+ //s3selectEngine::csv_object s3_csv_object(&s3select_syntax);
+
+#define BUFF_SIZE 1024*1024*4
+ char* buff = (char*)malloc( BUFF_SIZE );
+ while(1)
+ {
+ //char buff[4096];
+
+ //char * in = fgets(buff,sizeof(buff),fp);
+ size_t input_sz = fread(buff, 1, BUFF_SIZE, fp);
+ char* in=buff;
+ //input_sz = strlen(buff);
+ //size_t input_sz = in == 0 ? 0 : strlen(in);
+
+ //if (!input_sz) to_aggregate = true;
+
+
+ //int status = s3_csv_object.run_s3select_on_object(s3select_result,in,input_sz,false,false,to_aggregate);
+ int status = s3_csv_object.run_s3select_on_stream(s3select_result, in, input_sz, statbuf.st_size);
+ if(status<0)
+ {
+ std::cout << "failure on execution " << std::endl;
+ break;
+ }
+
+ if(s3select_result.size()>1)
+ {
+ std::cout << s3select_result;
+ }
+
+ s3select_result = "";
+ if(!input_sz || feof(fp))
+ {
+ break;
+ }
+
+ }
+
+ free(buff);
+ fclose(fp);
+
+
+}
diff --git a/src/s3select/include/s3select.h b/src/s3select/include/s3select.h
new file mode 100644
index 000000000..77af8c2d9
--- /dev/null
+++ b/src/s3select/include/s3select.h
@@ -0,0 +1,1138 @@
+#ifndef __S3SELECT__
+#define __S3SELECT__
+
+#include <boost/spirit/include/classic_core.hpp>
+#include <boost/algorithm/string.hpp>
+#include <iostream>
+#include <string>
+#include <list>
+#include "s3select_oper.h"
+#include "s3select_functions.h"
+#include "s3select_csv_parser.h"
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+#include <functional>
+
+
+#define _DEBUG_TERM {string token(a,b);std::cout << __FUNCTION__ << token << std::endl;}
+
+
+namespace s3selectEngine
+{
+
+/// AST builder
+
+class s3select_projections
+{
+
+private:
+ std::vector<base_statement*> m_projections;
+
+public:
+ bool is_aggregate()
+ {
+ //TODO iterate on projections , and search for aggregate
+ //for(auto p : m_projections){}
+
+ return false;
+ }
+
+ bool semantic()
+ {
+ //TODO check aggragtion function are not nested
+ return false;
+ }
+
+ std::vector<base_statement*>* get()
+ {
+ return &m_projections;
+ }
+
+};
+
+struct actionQ
+{
+// upon parser is accepting a token (lets say some number),
+// it push it into dedicated queue, later those tokens are poped out to build some "higher" contruct (lets say 1 + 2)
+// those containers are used only for parsing phase and not for runtime.
+
+ std::vector<mulldiv_operation::muldiv_t> muldivQ;
+ std::vector<addsub_operation::addsub_op_t> addsubQ;
+ std::vector<arithmetic_operand::cmp_t> arithmetic_compareQ;
+ std::vector<logical_operand::oplog_t> logical_compareQ;
+ std::vector<base_statement*> exprQ;
+ std::vector<base_statement*> funcQ;
+ std::vector<base_statement*> condQ;
+ projection_alias alias_map;
+ std::string from_clause;
+ std::vector<std::string> schema_columns;
+ s3select_projections projections;
+
+};
+
+class base_action : public __clt_allocator
+{
+
+public:
+ base_action() : m_action(0), m_s3select_functions(0) {}
+ actionQ* m_action;
+ void set_action_q(actionQ* a)
+ {
+ m_action = a;
+ }
+ void set_s3select_functions(s3select_functions* s3f)
+ {
+ m_s3select_functions = s3f;
+ }
+ s3select_functions* m_s3select_functions;
+};
+
+struct push_from_clause : public base_action
+{
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+ m_action->from_clause = token;
+ }
+
+};
+static push_from_clause g_push_from_clause;
+
+struct push_number : public base_action //TODO use define for defintion of actions
+{
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+ variable* v = S3SELECT_NEW( variable, atoi(token.c_str()));
+
+
+ m_action->exprQ.push_back(v);
+ }
+
+};
+static push_number g_push_number;
+
+struct push_float_number : public base_action //TODO use define for defintion of actions
+{
+
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+
+ //the parser for float(real_p) is accepting also integers, thus "blocking" integer acceptence and all are float.
+ bsc::parse_info<> info = bsc::parse(token.c_str(), bsc::int_p, bsc::space_p);
+
+ if (!info.full)
+ {
+ char* perr;
+ double d = strtod(token.c_str(), &perr);
+ variable* v = S3SELECT_NEW( variable, d);
+
+ m_action->exprQ.push_back(v);
+ }
+ else
+ {
+ variable* v = S3SELECT_NEW(variable, atoi(token.c_str()));
+
+ m_action->exprQ.push_back(v);
+ }
+ }
+
+};
+static push_float_number g_push_float_number;
+
+struct push_string : public base_action //TODO use define for defintion of actions
+{
+
+ void operator()(const char* a, const char* b) const
+ {
+ a++;
+ b--;// remove double quotes
+ std::string token(a, b);
+
+ variable* v = S3SELECT_NEW(variable, token, variable::var_t::COL_VALUE );
+
+ m_action->exprQ.push_back(v);
+ }
+
+};
+static push_string g_push_string;
+
+struct push_variable : public base_action
+{
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+
+ variable* v = S3SELECT_NEW(variable, token);
+
+ m_action->exprQ.push_back(v);
+ }
+};
+static push_variable g_push_variable;
+
+/////////////////////////arithmetic unit /////////////////
+struct push_addsub : public base_action
+{
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+
+ if (token.compare("+") == 0)
+ {
+ m_action->addsubQ.push_back(addsub_operation::addsub_op_t::ADD);
+ }
+ else
+ {
+ m_action->addsubQ.push_back(addsub_operation::addsub_op_t::SUB);
+ }
+ }
+};
+static push_addsub g_push_addsub;
+
+struct push_mulop : public base_action
+{
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+
+ if (token.compare("*") == 0)
+ {
+ m_action->muldivQ.push_back(mulldiv_operation::muldiv_t::MULL);
+ }
+ else if (token.compare("/") == 0)
+ {
+ m_action->muldivQ.push_back(mulldiv_operation::muldiv_t::DIV);
+ }
+ else
+ {
+ m_action->muldivQ.push_back(mulldiv_operation::muldiv_t::POW);
+ }
+ }
+};
+static push_mulop g_push_mulop;
+
+struct push_addsub_binop : public base_action
+{
+ void operator()(const char* a, const char* b) const
+ {
+ base_statement* l = 0, *r = 0;
+
+ r = m_action->exprQ.back();
+ m_action->exprQ.pop_back();
+ l = m_action->exprQ.back();
+ m_action->exprQ.pop_back();
+ addsub_operation::addsub_op_t o = m_action->addsubQ.back();
+ m_action->addsubQ.pop_back();
+ addsub_operation* as = S3SELECT_NEW(addsub_operation, l, o, r);
+ m_action->exprQ.push_back(as);
+ }
+};
+static push_addsub_binop g_push_addsub_binop;
+
+struct push_mulldiv_binop : public base_action
+{
+ void operator()(const char* a, const char* b) const
+ {
+ base_statement* vl = 0, *vr = 0;
+
+ vr = m_action->exprQ.back();
+ m_action->exprQ.pop_back();
+ vl = m_action->exprQ.back();
+ m_action->exprQ.pop_back();
+ mulldiv_operation::muldiv_t o = m_action->muldivQ.back();
+ m_action->muldivQ.pop_back();
+ mulldiv_operation* f = S3SELECT_NEW(mulldiv_operation, vl, o, vr);
+ m_action->exprQ.push_back(f);
+ }
+};
+static push_mulldiv_binop g_push_mulldiv_binop;
+
+struct push_function_arg : public base_action
+{
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+
+ base_statement* be = m_action->exprQ.back();
+ m_action->exprQ.pop_back();
+ base_statement* f = m_action->funcQ.back();
+
+ if (dynamic_cast<__function*>(f))
+ {
+ dynamic_cast<__function*>(f)->push_argument(be);
+ }
+ }
+};
+static push_function_arg g_push_function_arg;
+
+struct push_function_name : public base_action
+{
+ void operator()(const char* a, const char* b) const
+ {
+ b--;
+ while(*b=='(' || *b == ' ')
+ {
+ b--; //point to function-name
+ }
+
+ std::string fn;
+ fn.assign(a, b-a+1);
+
+ __function* func = S3SELECT_NEW(__function, fn.c_str(), m_s3select_functions);
+ m_action->funcQ.push_back(func);
+ }
+};
+static push_function_name g_push_function_name;
+
+struct push_function_expr : public base_action
+{
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+
+ base_statement* func = m_action->funcQ.back();
+ m_action->funcQ.pop_back();
+
+ m_action->exprQ.push_back(func);
+ }
+};
+static push_function_expr g_push_function_expr;
+
+////////////////////// logical unit ////////////////////////
+
+struct push_compare_operator : public base_action
+{
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+ arithmetic_operand::cmp_t c = arithmetic_operand::cmp_t::NA;
+
+ if (token.compare("==") == 0)
+ {
+ c = arithmetic_operand::cmp_t::EQ;
+ }
+ else if (token.compare("!=") == 0)
+ {
+ c = arithmetic_operand::cmp_t::NE;
+ }
+ else if (token.compare(">=") == 0)
+ {
+ c = arithmetic_operand::cmp_t::GE;
+ }
+ else if (token.compare("<=") == 0)
+ {
+ c = arithmetic_operand::cmp_t::LE;
+ }
+ else if (token.compare(">") == 0)
+ {
+ c = arithmetic_operand::cmp_t::GT;
+ }
+ else if (token.compare("<") == 0)
+ {
+ c = arithmetic_operand::cmp_t::LT;
+ }
+ else
+ {
+ c = arithmetic_operand::cmp_t::NA;
+ }
+
+ m_action->arithmetic_compareQ.push_back(c);
+ }
+};
+static push_compare_operator g_push_compare_operator;
+
+struct push_logical_operator : public base_action
+{
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+ logical_operand::oplog_t l = logical_operand::oplog_t::NA;
+
+ if (token.compare("and") == 0)
+ {
+ l = logical_operand::oplog_t::AND;
+ }
+ else if (token.compare("or") == 0)
+ {
+ l = logical_operand::oplog_t::OR;
+ }
+ else
+ {
+ l = logical_operand::oplog_t::NA;
+ }
+
+ m_action->logical_compareQ.push_back(l);
+
+ }
+};
+static push_logical_operator g_push_logical_operator;
+
+struct push_arithmetic_predicate : public base_action
+{
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+
+ base_statement* vr, *vl;
+ arithmetic_operand::cmp_t c = m_action->arithmetic_compareQ.back();
+ m_action->arithmetic_compareQ.pop_back();
+ vr = m_action->exprQ.back();
+ m_action->exprQ.pop_back();
+ vl = m_action->exprQ.back();
+ m_action->exprQ.pop_back();
+
+ arithmetic_operand* t = S3SELECT_NEW(arithmetic_operand, vl, c, vr);
+
+ m_action->condQ.push_back(t);
+ }
+};
+static push_arithmetic_predicate g_push_arithmetic_predicate;
+
+struct push_logical_predicate : public base_action
+{
+
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+
+ base_statement* tl = 0, *tr = 0;
+ logical_operand::oplog_t oplog = m_action->logical_compareQ.back();
+ m_action->logical_compareQ.pop_back();
+
+ if (m_action->condQ.empty() == false)
+ {
+ tr = m_action->condQ.back();
+ m_action->condQ.pop_back();
+ }
+ if (m_action->condQ.empty() == false)
+ {
+ tl = m_action->condQ.back();
+ m_action->condQ.pop_back();
+ }
+
+ logical_operand* f = S3SELECT_NEW(logical_operand, tl, oplog, tr);
+
+ m_action->condQ.push_back(f);
+ }
+};
+static push_logical_predicate g_push_logical_predicate;
+
+struct push_column_pos : public base_action
+{
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+ variable* v;
+
+ if (token.compare("*") == 0 || token.compare("* ")==0) //TODO space should skip in boost::spirit
+ {
+ v = S3SELECT_NEW(variable, token, variable::var_t::STAR_OPERATION);
+ }
+ else
+ {
+ v = S3SELECT_NEW(variable, token, variable::var_t::POS);
+ }
+
+ m_action->exprQ.push_back(v);
+ }
+
+};
+static push_column_pos g_push_column_pos;
+
+struct push_projection : public base_action
+{
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+
+ m_action->projections.get()->push_back( m_action->exprQ.back() );
+ m_action->exprQ.pop_back();
+ }
+
+};
+static push_projection g_push_projection;
+
+struct push_alias_projection : public base_action
+{
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+ //extract alias name
+ const char* p=b;
+ while(*(--p) != ' ');
+ std::string alias_name(p+1, b);
+ base_statement* bs = m_action->exprQ.back();
+
+ //mapping alias name to base-statement
+ bool res = m_action->alias_map.insert_new_entry(alias_name, bs);
+ if (res==false)
+ {
+ throw base_s3select_exception(std::string("alias <")+alias_name+std::string("> is already been used in query"), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+
+ m_action->projections.get()->push_back( bs );
+ m_action->exprQ.pop_back();
+ }
+
+};
+static push_alias_projection g_push_alias_projection;
+
+/// for the schema description "mini-parser"
+struct push_column : public base_action
+{
+
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+
+ m_action->schema_columns.push_back(token);
+ }
+
+};
+static push_column g_push_column;
+
+struct push_debug_1 : public base_action
+{
+
+ void operator()(const char* a, const char* b) const
+ {
+ std::string token(a, b);
+ }
+
+};
+static push_debug_1 g_push_debug_1;
+
+struct s3select : public bsc::grammar<s3select>
+{
+private:
+
+ actionQ m_actionQ;
+
+ scratch_area m_sca;
+
+ s3select_functions m_s3select_functions;
+
+ std::string error_description;
+
+ s3select_allocator m_s3select_allocator;
+
+ bool aggr_flow;
+
+#define BOOST_BIND_ACTION( push_name ) boost::bind( &push_name::operator(), g_ ## push_name , _1 ,_2)
+#define ATTACH_ACTION_Q( push_name ) {(g_ ## push_name).set_action_q(&m_actionQ); (g_ ## push_name).set_s3select_functions(&m_s3select_functions); (g_ ## push_name).set(&m_s3select_allocator);}
+
+public:
+
+
+ int semantic()
+ {
+ for (auto e : get_projections_list())
+ {
+ base_statement* aggr = 0;
+
+ if ((aggr = e->get_aggregate()) != 0)
+ {
+ if (aggr->is_nested_aggregate(aggr))
+ {
+ error_description = "nested aggregation function is illegal i.e. sum(...sum ...)";
+ throw base_s3select_exception(error_description, base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ aggr_flow = true;
+ }
+ }
+
+ if (aggr_flow == true)
+ for (auto e : get_projections_list())
+ {
+ base_statement* skip_expr = e->get_aggregate();
+
+ if (e->is_binop_aggregate_and_column(skip_expr))
+ {
+ error_description = "illegal expression. /select sum(c1) + c1 ..../ is not allow type of query";
+ throw base_s3select_exception(error_description, base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+ }
+
+ return 0;
+ }
+
+ int parse_query(const char* input_query)
+ {
+ if(get_projections_list().empty() == false)
+ {
+ return 0; //already parsed
+ }
+
+ try
+ {
+ bsc::parse_info<> info = bsc::parse(input_query, *this, bsc::space_p);
+ auto query_parse_position = info.stop;
+
+ if (!info.full)
+ {
+ std::cout << "failure -->" << query_parse_position << "<---" << std::endl;
+ error_description = std::string("failure -->") + query_parse_position + std::string("<---");
+ return -1;
+ }
+
+ semantic();
+ }
+ catch (base_s3select_exception& e)
+ {
+ std::cout << e.what() << std::endl;
+ error_description.assign(e.what());
+ if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
+ {
+ return -1;
+ }
+ }
+
+ return 0;
+ }
+
+ std::string get_error_description()
+ {
+ return error_description;
+ }
+
+ s3select()
+ {
+ //TODO check option for defining action and push into list
+
+ ATTACH_ACTION_Q(push_from_clause);
+ ATTACH_ACTION_Q(push_number);
+ ATTACH_ACTION_Q(push_logical_operator);
+ ATTACH_ACTION_Q(push_logical_predicate);
+ ATTACH_ACTION_Q(push_compare_operator);
+ ATTACH_ACTION_Q(push_arithmetic_predicate);
+ ATTACH_ACTION_Q(push_addsub);
+ ATTACH_ACTION_Q(push_addsub_binop);
+ ATTACH_ACTION_Q(push_mulop);
+ ATTACH_ACTION_Q(push_mulldiv_binop);
+ ATTACH_ACTION_Q(push_function_arg);
+ ATTACH_ACTION_Q(push_function_name);
+ ATTACH_ACTION_Q(push_function_expr);
+ ATTACH_ACTION_Q(push_float_number);
+ ATTACH_ACTION_Q(push_string);
+ ATTACH_ACTION_Q(push_variable);
+ ATTACH_ACTION_Q(push_column_pos);
+ ATTACH_ACTION_Q(push_projection);
+ ATTACH_ACTION_Q(push_alias_projection);
+ ATTACH_ACTION_Q(push_debug_1);
+
+ error_description.clear();
+
+ m_s3select_functions.set(&m_s3select_allocator);
+
+ aggr_flow = false;
+ }
+
+ bool is_semantic()//TBD traverse and validate semantics per all nodes
+ {
+ base_statement* cond = m_actionQ.exprQ.back();
+
+ return cond->semantic();
+ }
+
+ std::string get_from_clause()
+ {
+ return m_actionQ.from_clause;
+ }
+
+ void load_schema(std::vector< std::string>& scm)
+ {
+ int i = 0;
+ for (auto c : scm)
+ {
+ m_sca.set_column_pos(c.c_str(), i++);
+ }
+ }
+
+ base_statement* get_filter()
+ {
+ if(m_actionQ.condQ.size()==0)
+ {
+ return NULL;
+ }
+
+ return m_actionQ.condQ.back();
+ }
+
+ std::vector<base_statement*> get_projections_list()
+ {
+ return *m_actionQ.projections.get(); //TODO return COPY(?) or to return evalaution results (list of class value{}) / return reference(?)
+ }
+
+ scratch_area* get_scratch_area()
+ {
+ return &m_sca;
+ }
+
+ projection_alias* get_aliases()
+ {
+ return &m_actionQ.alias_map;
+ }
+
+ bool is_aggregate_query()
+ {
+ return aggr_flow == true;
+ }
+
+ ~s3select() {}
+
+
+ template <typename ScannerT>
+ struct definition
+ {
+ definition(s3select const& )
+ {
+ ///// s3select syntax rules and actions for building AST
+
+ select_expr = bsc::str_p("select") >> projections >> bsc::str_p("from") >> (s3_object)[BOOST_BIND_ACTION(push_from_clause)] >> !where_clause >> ';';
+
+ projections = projection_expression >> *( ',' >> projection_expression) ;
+
+ projection_expression = (arithmetic_expression >> bsc::str_p("as") >> alias_name)[BOOST_BIND_ACTION(push_alias_projection)] | (arithmetic_expression)[BOOST_BIND_ACTION(push_projection)] ;
+
+ alias_name = bsc::lexeme_d[(+bsc::alpha_p >> *bsc::digit_p)] ;
+
+
+ s3_object = bsc::str_p("stdin") | object_path ;
+
+ object_path = "/" >> *( fs_type >> "/") >> fs_type;
+
+ fs_type = bsc::lexeme_d[+( bsc::alnum_p | bsc::str_p(".") | bsc::str_p("_")) ];
+
+ where_clause = bsc::str_p("where") >> condition_expression;
+
+ condition_expression = (arithmetic_predicate >> *(log_op[BOOST_BIND_ACTION(push_logical_operator)] >> arithmetic_predicate[BOOST_BIND_ACTION(push_logical_predicate)]));
+
+ arithmetic_predicate = (factor >> *(arith_cmp[BOOST_BIND_ACTION(push_compare_operator)] >> factor[BOOST_BIND_ACTION(push_arithmetic_predicate)]));
+
+ factor = (arithmetic_expression) | ('(' >> condition_expression >> ')') ;
+
+ arithmetic_expression = (addsub_operand >> *(addsubop_operator[BOOST_BIND_ACTION(push_addsub)] >> addsub_operand[BOOST_BIND_ACTION(push_addsub_binop)] ));
+
+ addsub_operand = (mulldiv_operand >> *(muldiv_operator[BOOST_BIND_ACTION(push_mulop)] >> mulldiv_operand[BOOST_BIND_ACTION(push_mulldiv_binop)] ));// this non-terminal gives precedense to mull/div
+
+ mulldiv_operand = arithmetic_argument | ('(' >> (arithmetic_expression) >> ')') ;
+
+ list_of_function_arguments = (arithmetic_expression)[BOOST_BIND_ACTION(push_function_arg)] >> *(',' >> (arithmetic_expression)[BOOST_BIND_ACTION(push_function_arg)]);
+ function = ((variable >> '(' )[BOOST_BIND_ACTION(push_function_name)] >> !list_of_function_arguments >> ')')[BOOST_BIND_ACTION(push_function_expr)];
+
+ arithmetic_argument = (float_number)[BOOST_BIND_ACTION(push_float_number)] | (number)[BOOST_BIND_ACTION(push_number)] | (column_pos)[BOOST_BIND_ACTION(push_column_pos)] |
+ (string)[BOOST_BIND_ACTION(push_string)] |
+ (function)[BOOST_BIND_ACTION(push_debug_1)] | (variable)[BOOST_BIND_ACTION(push_variable)] ;//function is pushed by right-term
+
+
+ number = bsc::int_p;
+
+ float_number = bsc::real_p;
+
+ string = bsc::str_p("\"") >> *( bsc::anychar_p - bsc::str_p("\"") ) >> bsc::str_p("\"") ;
+
+ column_pos = ('_'>>+(bsc::digit_p) ) | '*' ;
+
+ muldiv_operator = bsc::str_p("*") | bsc::str_p("/") | bsc::str_p("^");// got precedense
+
+ addsubop_operator = bsc::str_p("+") | bsc::str_p("-");
+
+
+ arith_cmp = bsc::str_p(">=") | bsc::str_p("<=") | bsc::str_p("==") | bsc::str_p("<") | bsc::str_p(">") | bsc::str_p("!=");
+
+ log_op = bsc::str_p("and") | bsc::str_p("or"); //TODO add NOT (unary)
+
+ variable = bsc::lexeme_d[(+bsc::alpha_p >> *bsc::digit_p)];
+ }
+
+
+ bsc::rule<ScannerT> variable, select_expr, s3_object, where_clause, number, float_number, string, arith_cmp, log_op, condition_expression, arithmetic_predicate, factor;
+ bsc::rule<ScannerT> muldiv_operator, addsubop_operator, function, arithmetic_expression, addsub_operand, list_of_function_arguments, arithmetic_argument, mulldiv_operand;
+ bsc::rule<ScannerT> fs_type, object_path;
+ bsc::rule<ScannerT> projections, projection_expression, alias_name, column_pos;
+ bsc::rule<ScannerT> const& start() const
+ {
+ return select_expr;
+ }
+ };
+};
+
+/////// handling different object types
+class base_s3object
+{
+
+protected:
+ scratch_area* m_sa;
+ std::string m_obj_name;
+
+public:
+ base_s3object(scratch_area* m) : m_sa(m), m_obj_name("") {}
+
+ void set(scratch_area* m)
+ {
+ m_sa = m;
+ m_obj_name = "";
+ }
+
+ virtual ~base_s3object() {}
+};
+
+
+class csv_object : public base_s3object
+{
+
+public:
+ struct csv_defintions
+ {
+ char row_delimiter;
+ char column_delimiter;
+ char escape_char;
+ char quot_char;
+ bool use_header_info;
+ bool ignore_header_info;//skip first line
+
+ csv_defintions():row_delimiter('\n'), column_delimiter(','), escape_char('\\'), quot_char('"'), use_header_info(false), ignore_header_info(false) {}
+
+ } m_csv_defintion;
+
+ csv_object(s3select* s3_query) :
+ base_s3object(s3_query->get_scratch_area()),
+ m_skip_last_line(false),
+ m_s3_select(0),
+ m_error_count(0),
+ m_extract_csv_header_info(false),
+ m_previous_line(false),
+ m_skip_first_line(false),
+ m_processed_bytes(0)
+ {
+ set(s3_query);
+ csv_parser.set(m_csv_defintion.row_delimiter, m_csv_defintion.column_delimiter, m_csv_defintion.quot_char, m_csv_defintion.escape_char);
+ }
+
+ csv_object(s3select* s3_query, struct csv_defintions csv) :
+ base_s3object(s3_query->get_scratch_area()),
+ m_skip_last_line(false),
+ m_s3_select(0),
+ m_error_count(0),
+ m_extract_csv_header_info(false),
+ m_previous_line(false),
+ m_skip_first_line(false),
+ m_processed_bytes(0)
+ {
+ set(s3_query);
+ m_csv_defintion = csv;
+ csv_parser.set(m_csv_defintion.row_delimiter, m_csv_defintion.column_delimiter, m_csv_defintion.quot_char, m_csv_defintion.escape_char);
+ }
+
+ csv_object():
+ base_s3object(0),
+ m_skip_last_line(false),
+ m_s3_select(0),
+ m_error_count(0),
+ m_extract_csv_header_info(false),
+ m_previous_line(false),
+ m_skip_first_line(false),
+ m_processed_bytes(0)
+ {
+ csv_parser.set(m_csv_defintion.row_delimiter, m_csv_defintion.column_delimiter, m_csv_defintion.quot_char, m_csv_defintion.escape_char);
+ }
+
+private:
+ base_statement* m_where_clause;
+ std::vector<base_statement*> m_projections;
+ bool m_aggr_flow = false; //TODO once per query
+ bool m_is_to_aggregate;
+ bool m_skip_last_line;
+ size_t m_stream_length;
+ std::string m_error_description;
+ char* m_stream;
+ char* m_end_stream;
+ std::vector<char*> m_row_tokens{128};
+ s3select* m_s3_select;
+ csvParser csv_parser;
+ size_t m_error_count;
+ bool m_extract_csv_header_info;
+ std::vector<std::string> m_csv_schema{128};
+
+ //handling arbitrary chunks (rows cut in the middle)
+ bool m_previous_line;
+ bool m_skip_first_line;
+ std::string merge_line;
+ std::string m_last_line;
+ size_t m_processed_bytes;
+
+ int getNextRow()
+ {
+ size_t num_of_tokens=0;
+
+ if(m_stream>=m_end_stream)
+ {
+ return -1;
+ }
+
+ if(csv_parser.parse(m_stream, m_end_stream, &m_row_tokens, &num_of_tokens)<0)
+ {
+ throw base_s3select_exception("failed to parse csv stream", base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ m_stream = (char*)csv_parser.currentLoc();
+
+ if (m_skip_last_line && m_stream >= m_end_stream)
+ {
+ return -1;
+ }
+
+ return num_of_tokens;
+
+ }
+
+public:
+
+ void set(s3select* s3_query)
+ {
+ m_s3_select = s3_query;
+ base_s3object::set(m_s3_select->get_scratch_area());
+
+ m_projections = m_s3_select->get_projections_list();
+ m_where_clause = m_s3_select->get_filter();
+
+ if (m_where_clause)
+ {
+ m_where_clause->traverse_and_apply(m_sa, m_s3_select->get_aliases());
+ }
+
+ for (auto p : m_projections)
+ {
+ p->traverse_and_apply(m_sa, m_s3_select->get_aliases());
+ }
+
+ m_aggr_flow = m_s3_select->is_aggregate_query();
+ }
+
+
+ std::string get_error_description()
+ {
+ return m_error_description;
+ }
+
+ virtual ~csv_object() {}
+
+public:
+
+
+ int getMatchRow( std::string& result) //TODO virtual ? getResult
+ {
+ int number_of_tokens = 0;
+
+
+ if (m_aggr_flow == true)
+ {
+ do
+ {
+
+ number_of_tokens = getNextRow();
+ if (number_of_tokens < 0) //end of stream
+ {
+ if (m_is_to_aggregate)
+ for (auto i : m_projections)
+ {
+ i->set_last_call();
+ result.append( i->eval().to_string() );
+ result.append(",");
+ }
+
+ return number_of_tokens;
+ }
+
+ if ((*m_projections.begin())->is_set_last_call())
+ {
+ //should validate while query execution , no update upon nodes are marked with set_last_call
+ throw base_s3select_exception("on aggregation query , can not stream row data post do-aggregate call", base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ m_sa->update(m_row_tokens, number_of_tokens);
+ for (auto a : *m_s3_select->get_aliases()->get())
+ {
+ a.second->invalidate_cache_result();
+ }
+
+ if (!m_where_clause || m_where_clause->eval().i64() == true)
+ for (auto i : m_projections)
+ {
+ i->eval();
+ }
+
+ }
+ while (1);
+ }
+ else
+ {
+
+ do
+ {
+
+ number_of_tokens = getNextRow();
+ if (number_of_tokens < 0)
+ {
+ return number_of_tokens;
+ }
+
+ m_sa->update(m_row_tokens, number_of_tokens);
+ for (auto a : *m_s3_select->get_aliases()->get())
+ {
+ a.second->invalidate_cache_result();
+ }
+
+ }
+ while (m_where_clause && m_where_clause->eval().i64() == false);
+
+ for (auto i : m_projections)
+ {
+ result.append( i->eval().to_string() );
+ result.append(",");
+ }
+ result.append("\n");
+ }
+
+ return number_of_tokens; //TODO wrong
+ }
+
+ int extract_csv_header_info()
+ {
+
+ if (m_csv_defintion.ignore_header_info == true)
+ {
+ while(*m_stream && (*m_stream != m_csv_defintion.row_delimiter ))
+ {
+ m_stream++;
+ }
+ m_stream++;
+ }
+ else if(m_csv_defintion.use_header_info == true)
+ {
+ size_t num_of_tokens = getNextRow();//TODO validate number of tokens
+
+ for(size_t i=0; i<num_of_tokens; i++)
+ {
+ m_csv_schema[i].assign(m_row_tokens[i]);
+ }
+
+ m_s3_select->load_schema(m_csv_schema);
+ }
+
+ m_extract_csv_header_info = true;
+
+ return 0;
+ }
+
+ int run_s3select_on_stream(std::string& result, const char* csv_stream, size_t stream_length, size_t obj_size)
+ {
+ //purpose: the cv data is "streaming", it may "cut" rows in the middle, in that case the "broken-line" is stores
+ //for later, upon next chunk of data is streaming, the stored-line is merge with current broken-line, and processed.
+ int status;
+ std::string tmp_buff;
+ u_int32_t skip_last_bytes = 0;
+ m_processed_bytes += stream_length;
+
+ m_skip_first_line = false;
+
+ if (m_previous_line)
+ {
+ //if previous broken line exist , merge it to current chunk
+ char* p_obj_chunk = (char*)csv_stream;
+ while (*p_obj_chunk != m_csv_defintion.row_delimiter && p_obj_chunk<(csv_stream+stream_length))
+ {
+ p_obj_chunk++;
+ }
+
+ tmp_buff.assign((char*)csv_stream, (char*)csv_stream + (p_obj_chunk - csv_stream));
+ merge_line = m_last_line + tmp_buff + m_csv_defintion.row_delimiter;
+ m_previous_line = false;
+ m_skip_first_line = true;
+
+ status = run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false);
+ }
+
+ if (csv_stream[stream_length - 1] != m_csv_defintion.row_delimiter)
+ {
+ //in case of "broken" last line
+ char* p_obj_chunk = (char*)&(csv_stream[stream_length - 1]);
+ while (*p_obj_chunk != m_csv_defintion.row_delimiter && p_obj_chunk>csv_stream)
+ {
+ p_obj_chunk--; //scan until end-of previous line in chunk
+ }
+
+ skip_last_bytes = (&(csv_stream[stream_length - 1]) - p_obj_chunk);
+ m_last_line.assign(p_obj_chunk + 1, p_obj_chunk + 1 + skip_last_bytes); //save it for next chunk
+
+ m_previous_line = true;//it means to skip last line
+
+ }
+
+ status = run_s3select_on_object(result, csv_stream, stream_length, m_skip_first_line, m_previous_line, (m_processed_bytes >= obj_size));
+
+ return status;
+ }
+
+ int run_s3select_on_object(std::string& result, const char* csv_stream, size_t stream_length, bool skip_first_line, bool skip_last_line, bool do_aggregate)
+ {
+
+
+ m_stream = (char*)csv_stream;
+ m_end_stream = (char*)csv_stream + stream_length;
+ m_is_to_aggregate = do_aggregate;
+ m_skip_last_line = skip_last_line;
+
+ m_stream_length = stream_length;
+
+ if(m_extract_csv_header_info == false)
+ {
+ extract_csv_header_info();
+ }
+
+ if(skip_first_line)
+ {
+ while(*m_stream && (*m_stream != m_csv_defintion.row_delimiter ))
+ {
+ m_stream++;
+ }
+ m_stream++;//TODO nicer
+ }
+
+ do
+ {
+
+ int num = 0;
+ try
+ {
+ num = getMatchRow(result);
+ }
+ catch (base_s3select_exception& e)
+ {
+ std::cout << e.what() << std::endl;
+ m_error_description = e.what();
+ m_error_count ++;
+ if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL || m_error_count>100)//abort query execution
+ {
+ return -1;
+ }
+ }
+
+ if (num < 0)
+ {
+ break;
+ }
+
+ }
+ while (1);
+
+ return 0;
+ }
+};
+
+};//namespace
+
+#endif
diff --git a/src/s3select/include/s3select_csv_parser.h b/src/s3select/include/s3select_csv_parser.h
new file mode 100644
index 000000000..5527da913
--- /dev/null
+++ b/src/s3select/include/s3select_csv_parser.h
@@ -0,0 +1,407 @@
+#include <iostream>
+
+#include <boost/mpl/vector/vector30.hpp>
+// back-end
+#include <boost/msm/back/state_machine.hpp>
+//front-end
+#include <boost/msm/front/state_machine_def.hpp>
+
+#include <vector>
+
+namespace msm = boost::msm;
+namespace mpl = boost::mpl;
+
+namespace s3selectEngine
+{
+// events
+struct event_column_sep {};
+struct event_eol {};
+struct event_end_of_stream {};
+struct event_not_column_sep {};//i.e any char
+struct event_quote {};
+struct event_escape {};
+struct event_empty {};
+
+
+// front-end: define the FSM structure
+struct csvStateMch_ : public msm::front::state_machine_def<csvStateMch_>
+{
+ char* input_stream;
+ std::vector<char*>* tokens;
+ std::vector<int> has_esc{128};
+ size_t token_idx;
+ size_t escape_idx;
+ char* input_cur_location;
+ char* start_token;
+ bool end_of_parse;
+
+ typedef csvStateMch_ csv_rules;
+
+ csvStateMch_():end_of_parse(false) {}
+
+ void set(const char* input, std::vector<char*>* tk)
+ {
+ input_cur_location = input_stream = const_cast<char*>(input);
+ token_idx = 0;
+ tokens = tk;
+ escape_idx = 0;
+ }
+
+ char get_char()
+ {
+ return *input_cur_location;
+ }
+
+ char get_next_char(const char * end_stream)
+ {
+ if (input_cur_location >= end_stream)
+ return 0;
+
+ input_cur_location++;
+ return *input_cur_location;
+ }
+
+ const char* currentLoc()
+ {
+ return input_cur_location;
+ }
+
+ void parse_escape(char* in, char esc_char='\\')
+ {
+ //assumption atleast one escape and single
+ char* dst, *src;
+
+ dst = src = in;
+
+ while (1)
+ {
+ while (*src && *src != esc_char)
+ {
+ src++; //search for escape
+ }
+
+ if (!*src) //reach end
+ {
+ char* p = src;
+ while (dst < src)
+ {
+ *dst++ = *p++; //full copy
+ }
+ return;
+ }
+ //found escape
+ dst = src; //override escape
+ //if(*(dst+1)=='n') {*dst=10;dst++;} //enables special character
+
+ while (*dst)
+ {
+ *dst = *(dst + 1);
+ dst++;
+ } //copy with shift
+ }
+ }
+
+ // The list of FSM states
+ struct Start_new_token_st : public msm::front::state<>
+ {};//0
+
+ struct In_new_token_st : public msm::front::state<>
+ {};//1
+
+ struct In_quote_st : public msm::front::state<>
+ {};//2
+
+ struct In_esc_in_token_st : public msm::front::state<>
+ {};//3
+
+ struct In_esc_quote_st : public msm::front::state<>
+ {};//4
+
+ struct In_esc_start_token_st : public msm::front::state<>
+ {};//5
+
+ struct End_of_line_st : public msm::front::state<>
+ {};//6
+
+ struct Empty_state : public msm::front::state<>
+ {};//7
+
+
+ // the initial state of the csvStateMch SM. Must be defined
+ typedef Start_new_token_st initial_state;
+
+ void start_new_token()//helper
+ {
+ start_token = input_cur_location;
+ (*tokens)[ token_idx ] = start_token;
+ token_idx++;
+ }
+
+ // transition actions
+ void start_new_token(event_column_sep const&)
+ {
+ *input_cur_location = 0;//remove column-delimiter
+ start_new_token();
+ }
+
+ void start_new_token(event_not_column_sep const&)
+ {
+ start_new_token();
+ }
+
+ //need to handle empty lines(no tokens);
+ void start_new_token(event_eol const&)
+ {
+ if(!token_idx)
+ {
+ return;
+ }
+ (*tokens)[ token_idx ] = start_token;
+ token_idx++;
+ }
+
+ void start_new_token(event_end_of_stream const&) {}
+
+ void in_new_token(event_not_column_sep const&)
+ {
+ if(!*start_token)
+ {
+ start_token = input_cur_location;
+ }
+ }
+
+ void in_new_token(event_eol const&)
+ {
+ *input_cur_location=0;
+ }
+
+ void in_new_token(event_end_of_stream const&) {}
+
+ void in_new_token(event_column_sep const&)
+ {
+ (*tokens)[ token_idx ] = input_cur_location+1;
+ *input_cur_location=0;
+ token_idx++;
+ }
+
+ void in_new_token(event_quote const&)
+ {
+ if(!*start_token)
+ {
+ start_token = input_cur_location;
+ }
+ }
+
+ void in_quote(event_quote const&) {}
+
+ void in_quote(event_column_sep const&) {}
+
+ void in_quote(event_not_column_sep const&) {}
+
+ void in_quote(event_eol const&)
+ {
+ *input_cur_location=0;
+ }
+
+ void in_quote(event_end_of_stream const&)
+ {
+ *input_cur_location=0;
+ }
+
+ void start_new_token(event_quote const&)
+ {
+ start_new_token();
+ }
+
+ void push_escape_pos()
+ {
+ if(escape_idx && has_esc[ escape_idx -1]== (int)(token_idx-1))
+ {
+ return;
+ }
+ has_esc[ escape_idx ] = token_idx-1;
+ escape_idx++;
+ }
+ void in_escape(event_escape const&)
+ {
+ push_escape_pos();
+ }
+ void in_escape_start_new_token(event_escape const&)
+ {
+ start_new_token();
+ push_escape_pos();
+ }
+
+ void in_escape(event_column_sep const&) {}
+ void in_escape(event_not_column_sep const&) {}
+ void in_escape(event_quote const&) {}
+ void in_escape(event_eol const&) {}
+ void in_escape(event_end_of_stream const&) {}
+
+ void empty_action(event_empty const&) {}
+
+ //TODO need a guard for tokens vector size (<MAX)
+ // Transition table for csvStateMch
+ struct transition_table : mpl::vector30<
+ // Start Event Next Action Guard
+ // +---------+-------------+---------+---------------------+----------------------+
+ a_row < Start_new_token_st, event_column_sep , Start_new_token_st , &csv_rules::start_new_token >,
+ a_row < Start_new_token_st, event_not_column_sep , In_new_token_st , &csv_rules::start_new_token >,
+ a_row < Start_new_token_st, event_eol , End_of_line_st , &csv_rules::start_new_token >,
+ a_row < Start_new_token_st, event_end_of_stream , End_of_line_st , &csv_rules::start_new_token >,
+ a_row < In_new_token_st , event_not_column_sep, In_new_token_st, &csv_rules::in_new_token >,
+ a_row < In_new_token_st , event_column_sep , In_new_token_st, &csv_rules::in_new_token >,
+ a_row < In_new_token_st , event_eol , End_of_line_st, &csv_rules::in_new_token >,
+ a_row < In_new_token_st , event_end_of_stream , End_of_line_st, &csv_rules::in_new_token >,
+
+ a_row < Start_new_token_st , event_quote , In_quote_st, &csv_rules::start_new_token >, //open quote
+ a_row < In_new_token_st , event_quote , In_quote_st, &csv_rules::in_quote >, //open quote
+ a_row < In_quote_st , event_quote , In_new_token_st, &csv_rules::in_quote >, //close quote
+ a_row < In_quote_st , event_column_sep , In_quote_st, &csv_rules::in_quote >, //stay in quote
+ a_row < In_quote_st , event_not_column_sep , In_quote_st, &csv_rules::in_quote >, //stay in quote
+ a_row < In_quote_st , event_eol , End_of_line_st, &csv_rules::in_quote >, //end of quote/line
+ a_row < In_quote_st , event_end_of_stream , End_of_line_st, &csv_rules::in_quote >, //end of quote/line
+
+
+ //TODO add transitions for escape just before eol , eos.
+ a_row < Start_new_token_st , event_escape , In_esc_start_token_st, &csv_rules::in_escape_start_new_token >,
+ a_row < In_esc_start_token_st, event_column_sep, In_new_token_st, &csv_rules::in_escape >, //escape column-sep
+ a_row < In_esc_start_token_st, event_not_column_sep, In_new_token_st, &csv_rules::in_escape >,
+ a_row < In_esc_start_token_st, event_escape, In_new_token_st, &csv_rules::in_escape >,
+ a_row < In_esc_start_token_st, event_quote, In_new_token_st, &csv_rules::in_escape >,
+
+ a_row < In_new_token_st, event_escape, In_esc_in_token_st, &csv_rules::in_escape >,
+ a_row < In_esc_in_token_st, event_column_sep, In_new_token_st, &csv_rules::in_escape >,
+ a_row < In_esc_in_token_st, event_not_column_sep, In_new_token_st, &csv_rules::in_escape >,
+ a_row < In_esc_in_token_st, event_escape, In_new_token_st, &csv_rules::in_escape >,
+ a_row < In_esc_in_token_st, event_quote, In_new_token_st, &csv_rules::in_escape >,
+
+ a_row < In_quote_st, event_escape, In_esc_quote_st, &csv_rules::in_escape >,
+ a_row < In_esc_quote_st, event_column_sep, In_quote_st, &csv_rules::in_escape >,
+ a_row < In_esc_quote_st, event_not_column_sep, In_quote_st, &csv_rules::in_escape >,
+ a_row < In_esc_quote_st, event_escape, In_quote_st, &csv_rules::in_escape >,
+ a_row < In_esc_quote_st, event_quote, In_quote_st, &csv_rules::in_escape >
+
+ // +---------+-------------+---------+---------------------+----------------------+
+ > {};
+
+ // Replaces the default no-transition response.
+ template <class FSM, class Event>
+ void no_transition(Event const& e, FSM&, int state)
+ {
+ std::cout << "no transition from state " << state
+ << " on event " << typeid(e).name() << std::endl;
+ }
+}; //// end-of-state-machine
+
+
+
+// Pick a back-end
+typedef msm::back::state_machine<csvStateMch_> csvStateMch;
+
+//
+// Testing utilities.
+//
+
+static char const* const state_names[] = {"Start_new_token_st", "In_new_token_st", "In_quote_st", "In_esc_in_token_st",
+ "In_esc_quote_st", "In_esc_start_token_st", "End_of_line_st", "Empty_state"
+ };
+void pstate(csvStateMch const& p)//debug
+{
+ std::cout << " -> " << state_names[p.current_state()[0]] << std::endl;
+}
+
+
+class csvParser
+{
+
+ csvStateMch p;
+
+ char m_row_delimeter;
+ char m_column_delimiter;
+ char m_quote_char;
+ char m_escape_char;
+
+public:
+
+ csvParser(char rd='\n', char cd=',', char quot='"', char ec='\\'):m_row_delimeter(rd), m_column_delimiter(cd), m_quote_char(quot), m_escape_char(ec) {};
+
+ void set(char row_delimiter, char column_delimiter, char quot_char, char escape_char)
+ {
+ m_row_delimeter = row_delimiter;
+ m_column_delimiter = column_delimiter;
+ m_quote_char = quot_char;
+ m_escape_char = escape_char;
+ }
+
+ int parse(char* input_stream, char* end_stream, std::vector<char*>* tk, size_t* num_of_tokens)
+ {
+ p.set(input_stream, tk);
+
+ // needed to start the highest-level SM. This will call on_entry and mark the start of the SM
+ p.start();
+
+ //TODO for better performance to use template specialization (\n \ , ")
+ do
+ {
+ if (p.get_char() == m_row_delimeter)
+ {
+ p.process_event(event_eol());
+ }
+ else if (p.get_char() == m_column_delimiter)
+ {
+ p.process_event(event_column_sep());
+ }
+ else if (p.get_char() == 0)
+ {
+ p.process_event(event_end_of_stream());
+ }
+ else if (p.get_char() == m_quote_char)
+ {
+ p.process_event(event_quote());
+ }
+ else if (p.get_char() == m_escape_char)
+ {
+ p.process_event(event_escape());
+ }
+ else
+ {
+ p.process_event(event_not_column_sep());
+ }
+
+ if (p.tokens->capacity() <= p.token_idx)
+ {
+ return -1;
+ }
+
+ if (p.currentLoc() >= end_stream)
+ {
+ break;
+ }
+ p.get_next_char(end_stream);
+ }
+ while (p.current_state()[0] != 6);
+
+ p.stop();
+
+ *num_of_tokens = p.token_idx;
+
+ //second pass for escape rules; only token with escape are processed, if any.
+ for(size_t i=0; i<p.escape_idx; i++)
+ {
+ p.parse_escape((*tk)[p.has_esc[i]], m_escape_char);
+ }
+
+ return 0;
+ }
+
+ const char* currentLoc()
+ {
+ return p.currentLoc();
+ }
+
+};//end csv-parser
+
+}//end-namespace
+
+
diff --git a/src/s3select/include/s3select_functions.h b/src/s3select/include/s3select_functions.h
new file mode 100644
index 000000000..77b3db3a7
--- /dev/null
+++ b/src/s3select/include/s3select_functions.h
@@ -0,0 +1,1037 @@
+#ifndef __S3SELECT_FUNCTIONS__
+#define __S3SELECT_FUNCTIONS__
+
+
+#include "s3select_oper.h"
+#define BOOST_BIND_ACTION_PARAM( push_name ,param ) boost::bind( &push_name::operator(), g_ ## push_name , _1 ,_2, param)
+namespace s3selectEngine
+{
+
+struct push_2dig
+{
+ void operator()(const char* a, const char* b, uint32_t* n) const
+ {
+ *n = ((char)(*a) - 48) *10 + ((char)*(a+1)-48) ;
+ }
+
+};
+static push_2dig g_push_2dig;
+
+struct push_4dig
+{
+ void operator()(const char* a, const char* b, uint32_t* n) const
+ {
+ *n = ((char)(*a) - 48) *1000 + ((char)*(a+1)-48)*100 + ((char)*(a+2)-48)*10 + ((char)*(a+3)-48);
+ }
+
+};
+static push_4dig g_push_4dig;
+
+enum class s3select_func_En_t {ADD,
+ SUM,
+ MIN,
+ MAX,
+ COUNT,
+ TO_INT,
+ TO_FLOAT,
+ TO_TIMESTAMP,
+ SUBSTR,
+ EXTRACT,
+ DATE_ADD,
+ DATE_DIFF,
+ UTCNOW
+ };
+
+
+class s3select_functions : public __clt_allocator
+{
+
+private:
+
+ using FunctionLibrary = std::map<std::string, s3select_func_En_t>;
+ const FunctionLibrary m_functions_library =
+ {
+ {"add", s3select_func_En_t::ADD},
+ {"sum", s3select_func_En_t::SUM},
+ {"count", s3select_func_En_t::COUNT},
+ {"min", s3select_func_En_t::MIN},
+ {"max", s3select_func_En_t::MAX},
+ {"int", s3select_func_En_t::TO_INT},
+ {"float", s3select_func_En_t::TO_FLOAT},
+ {"substr", s3select_func_En_t::SUBSTR},
+ {"timestamp", s3select_func_En_t::TO_TIMESTAMP},
+ {"extract", s3select_func_En_t::EXTRACT},
+ {"dateadd", s3select_func_En_t::DATE_ADD},
+ {"datediff", s3select_func_En_t::DATE_DIFF},
+ {"utcnow", s3select_func_En_t::UTCNOW}
+ };
+
+public:
+
+ base_function* create(std::string fn_name);
+};
+
+class __function : public base_statement
+{
+
+private:
+ std::vector<base_statement*> arguments;
+ std::string name;
+ base_function* m_func_impl;
+ s3select_functions* m_s3select_functions;
+ variable m_result;
+
+ void _resolve_name()
+ {
+ if (m_func_impl)
+ {
+ return;
+ }
+
+ base_function* f = m_s3select_functions->create(name);
+ if (!f)
+ {
+ throw base_s3select_exception("function not found", base_s3select_exception::s3select_exp_en_t::FATAL); //should abort query
+ }
+ m_func_impl = f;
+ }
+
+public:
+ virtual void traverse_and_apply(scratch_area* sa, projection_alias* pa)
+ {
+ m_scratch = sa;
+ m_aliases = pa;
+ for (base_statement* ba : arguments)
+ {
+ ba->traverse_and_apply(sa, pa);
+ }
+ }
+
+ virtual bool is_aggregate() // TODO under semantic flow
+ {
+ _resolve_name();
+
+ return m_func_impl->is_aggregate();
+ }
+
+ virtual bool semantic()
+ {
+ return true;
+ }
+
+ __function(const char* fname, s3select_functions* s3f) : name(fname), m_func_impl(0), m_s3select_functions(s3f) {}
+
+ virtual value& eval()
+ {
+
+ _resolve_name();
+
+ if (is_last_call == false)
+ {
+ (*m_func_impl)(&arguments, &m_result);
+ }
+ else
+ {
+ (*m_func_impl).get_aggregate_result(&m_result);
+ }
+
+ return m_result.get_value();
+ }
+
+
+
+ virtual std::string print(int ident)
+ {
+ return std::string(0);
+ }
+
+ void push_argument(base_statement* arg)
+ {
+ arguments.push_back(arg);
+ }
+
+
+ std::vector<base_statement*> get_arguments()
+ {
+ return arguments;
+ }
+
+ virtual ~__function()
+ {
+ arguments.clear();
+ }
+};
+
+
+
+/*
+ s3-select function defintions
+*/
+struct _fn_add : public base_function
+{
+
+ value var_result;
+
+ bool operator()(std::vector<base_statement*>* args, variable* result)
+ {
+ std::vector<base_statement*>::iterator iter = args->begin();
+ base_statement* x = *iter;
+ iter++;
+ base_statement* y = *iter;
+
+ var_result = x->eval() + y->eval();
+
+ *result = var_result;
+
+ return true;
+ }
+};
+
+struct _fn_sum : public base_function
+{
+
+ value sum;
+
+ _fn_sum() : sum(0)
+ {
+ aggregate = true;
+ }
+
+ bool operator()(std::vector<base_statement*>* args, variable* result)
+ {
+ std::vector<base_statement*>::iterator iter = args->begin();
+ base_statement* x = *iter;
+
+ try
+ {
+ sum = sum + x->eval();
+ }
+ catch (base_s3select_exception& e)
+ {
+ std::cout << "illegal value for aggregation(sum). skipping." << std::endl;
+ if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL)
+ {
+ throw;
+ }
+ }
+
+ return true;
+ }
+
+ virtual void get_aggregate_result(variable* result)
+ {
+ *result = sum ;
+ }
+};
+
+struct _fn_count : public base_function
+{
+
+ int64_t count;
+
+ _fn_count():count(0)
+ {
+ aggregate=true;
+ }
+
+ bool operator()(std::vector<base_statement*>* args, variable* result)
+ {
+ count += 1;
+
+ return true;
+ }
+
+ virtual void get_aggregate_result(variable* result)
+ {
+ result->set_value(count);
+ }
+
+};
+
+struct _fn_min : public base_function
+{
+
+ value min;
+
+ _fn_min():min(__INT64_MAX__)
+ {
+ aggregate=true;
+ }
+
+ bool operator()(std::vector<base_statement*>* args, variable* result)
+ {
+ std::vector<base_statement*>::iterator iter = args->begin();
+ base_statement* x = *iter;
+
+ if(min > x->eval())
+ {
+ min=x->eval();
+ }
+
+ return true;
+ }
+
+ virtual void get_aggregate_result(variable* result)
+ {
+ *result = min;
+ }
+
+};
+
+struct _fn_max : public base_function
+{
+
+ value max;
+
+ _fn_max():max(-__INT64_MAX__)
+ {
+ aggregate=true;
+ }
+
+ bool operator()(std::vector<base_statement*>* args, variable* result)
+ {
+ std::vector<base_statement*>::iterator iter = args->begin();
+ base_statement* x = *iter;
+
+ if(max < x->eval())
+ {
+ max=x->eval();
+ }
+
+ return true;
+ }
+
+ virtual void get_aggregate_result(variable* result)
+ {
+ *result = max;
+ }
+
+};
+
+struct _fn_to_int : public base_function
+{
+
+ value var_result;
+ value func_arg;
+
+ bool operator()(std::vector<base_statement*>* args, variable* result)
+ {
+ char* perr;
+ int64_t i=0;
+ func_arg = (*args->begin())->eval();
+
+ if (func_arg.type == value::value_En_t::STRING)
+ {
+ i = strtol(func_arg.str(), &perr, 10) ; //TODO check error before constructor
+ }
+ else if (func_arg.type == value::value_En_t::FLOAT)
+ {
+ i = func_arg.dbl();
+ }
+ else
+ {
+ i = func_arg.i64();
+ }
+
+ var_result = i ;
+ *result = var_result;
+
+ return true;
+ }
+
+};
+
+struct _fn_to_float : public base_function
+{
+
+ value var_result;
+ value v_from;
+
+ bool operator()(std::vector<base_statement*>* args, variable* result)
+ {
+ char* perr;
+ double d=0;
+ value v = (*args->begin())->eval();
+
+ if (v.type == value::value_En_t::STRING)
+ {
+ d = strtod(v.str(), &perr) ; //TODO check error before constructor
+ }
+ else if (v.type == value::value_En_t::FLOAT)
+ {
+ d = v.dbl();
+ }
+ else
+ {
+ d = v.i64();
+ }
+
+ var_result = d;
+ *result = var_result;
+
+ return true;
+ }
+
+};
+
+struct _fn_to_timestamp : public base_function
+{
+ bsc::rule<> separator = bsc::ch_p(":") | bsc::ch_p("-");
+
+ uint32_t yr = 1700, mo = 1, dy = 1;
+ bsc::rule<> dig4 = bsc::lexeme_d[bsc::digit_p >> bsc::digit_p >> bsc::digit_p >> bsc::digit_p];
+ bsc::rule<> dig2 = bsc::lexeme_d[bsc::digit_p >> bsc::digit_p];
+
+ bsc::rule<> d_yyyymmdd_dig = ((dig4[BOOST_BIND_ACTION_PARAM(push_4dig, &yr)]) >> *(separator)
+ >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &mo)]) >> *(separator)
+ >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &dy)]) >> *(separator));
+
+ uint32_t hr = 0, mn = 0, sc = 0;
+ bsc::rule<> d_time_dig = ((dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &hr)]) >> *(separator)
+ >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &mn)]) >> *(separator)
+ >> (dig2[BOOST_BIND_ACTION_PARAM(push_2dig, &sc)]) >> *(separator));
+
+ boost::posix_time::ptime new_ptime;
+
+ value v_str;
+
+
+ bool datetime_validation()
+ {
+ //TODO temporary , should check for leap year
+
+ if(yr<1700 || yr>2050)
+ {
+ return false;
+ }
+ if (mo<1 || mo>12)
+ {
+ return false;
+ }
+ if (dy<1 || dy>31)
+ {
+ return false;
+ }
+ if (hr>23)
+ {
+ return false;
+ }
+ if (dy>59)
+ {
+ return false;
+ }
+ if (sc>59)
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ bool operator()(std::vector<base_statement*>* args, variable* result)
+ {
+
+ hr = 0;
+ mn = 0;
+ sc = 0;
+
+ std::vector<base_statement*>::iterator iter = args->begin();
+ int args_size = args->size();
+
+ if (args_size != 1)
+ {
+ throw base_s3select_exception("to_timestamp should have one parameter");
+ }
+
+ base_statement* str = *iter;
+
+ v_str = str->eval();
+
+ if (v_str.type != value::value_En_t::STRING)
+ {
+ throw base_s3select_exception("to_timestamp first argument must be string"); //can skip current row
+ }
+
+ bsc::parse_info<> info_dig = bsc::parse(v_str.str(), d_yyyymmdd_dig >> *(separator) >> d_time_dig);
+
+ if(datetime_validation()==false or !info_dig.full)
+ {
+ throw base_s3select_exception("input date-time is illegal");
+ }
+
+ new_ptime = boost::posix_time::ptime(boost::gregorian::date(yr, mo, dy),
+ boost::posix_time::hours(hr) + boost::posix_time::minutes(mn) + boost::posix_time::seconds(sc));
+
+ result->set_value(&new_ptime);
+
+ return true;
+ }
+
+};
+
+struct _fn_extact_from_timestamp : public base_function
+{
+
+ boost::posix_time::ptime new_ptime;
+
+ value val_date_part;
+
+ bool operator()(std::vector<base_statement*>* args, variable* result)
+ {
+ std::vector<base_statement*>::iterator iter = args->begin();
+ int args_size = args->size();
+
+ if (args_size < 2)
+ {
+ throw base_s3select_exception("to_timestamp should have 2 parameters");
+ }
+
+ base_statement* date_part = *iter;
+
+ val_date_part = date_part->eval();//TODO could be done once?
+
+ if(val_date_part.is_string()== false)
+ {
+ throw base_s3select_exception("first parameter should be string");
+ }
+
+ iter++;
+
+ base_statement* ts = *iter;
+
+ if(ts->eval().is_timestamp()== false)
+ {
+ throw base_s3select_exception("second parameter is not timestamp");
+ }
+
+ new_ptime = *ts->eval().timestamp();
+
+ if( strcmp(val_date_part.str(), "year")==0 )
+ {
+ result->set_value( (int64_t)new_ptime.date().year() );
+ }
+ else if( strcmp(val_date_part.str(), "month")==0 )
+ {
+ result->set_value( (int64_t)new_ptime.date().month() );
+ }
+ else if( strcmp(val_date_part.str(), "day")==0 )
+ {
+ result->set_value( (int64_t)new_ptime.date().day_of_year() );
+ }
+ else if( strcmp(val_date_part.str(), "week")==0 )
+ {
+ result->set_value( (int64_t)new_ptime.date().week_number() );
+ }
+ else
+ {
+ throw base_s3select_exception(std::string( val_date_part.str() + std::string(" is not supported ") ).c_str() );
+ }
+
+ return true;
+ }
+
+};
+
+struct _fn_diff_timestamp : public base_function
+{
+
+ value val_date_part;
+ value val_dt1;
+ value val_dt2;
+
+ bool operator()(std::vector<base_statement*>* args, variable* result)
+ {
+ std::vector<base_statement*>::iterator iter = args->begin();
+ int args_size = args->size();
+
+ if (args_size < 3)
+ {
+ throw base_s3select_exception("datediff need 3 parameters");
+ }
+
+ base_statement* date_part = *iter;
+
+ val_date_part = date_part->eval();
+
+ iter++;
+ base_statement* dt1_param = *iter;
+ val_dt1 = dt1_param->eval();
+ if (val_dt1.is_timestamp() == false)
+ {
+ throw base_s3select_exception("second parameter should be timestamp");
+ }
+
+ iter++;
+ base_statement* dt2_param = *iter;
+ val_dt2 = dt2_param->eval();
+ if (val_dt2.is_timestamp() == false)
+ {
+ throw base_s3select_exception("third parameter should be timestamp");
+ }
+
+ if (strcmp(val_date_part.str(), "year") == 0)
+ {
+ int64_t yr = val_dt2.timestamp()->date().year() - val_dt1.timestamp()->date().year() ;
+ result->set_value( yr );
+ }
+ else if (strcmp(val_date_part.str(), "month") == 0)
+ {
+ int64_t yr = val_dt2.timestamp()->date().year() - val_dt1.timestamp()->date().year() ;
+ int64_t mon = val_dt2.timestamp()->date().month() - val_dt1.timestamp()->date().month() ;
+
+ result->set_value( yr*12 + mon );
+ }
+ else if (strcmp(val_date_part.str(), "day") == 0)
+ {
+ boost::gregorian::date_period dp =
+ boost::gregorian::date_period( val_dt1.timestamp()->date(), val_dt2.timestamp()->date());
+ result->set_value( dp.length().days() );
+ }
+ else if (strcmp(val_date_part.str(), "hours") == 0)
+ {
+ boost::posix_time::time_duration td_res = (*val_dt2.timestamp() - *val_dt1.timestamp());
+ result->set_value( td_res.hours());
+ }
+ else
+ {
+ throw base_s3select_exception("first parameter should be string: year,month,hours,day");
+ }
+
+
+ return true;
+ }
+};
+
+struct _fn_add_to_timestamp : public base_function
+{
+
+ boost::posix_time::ptime new_ptime;
+
+ value val_date_part;
+ value val_quantity;
+ value val_timestamp;
+
+ bool operator()(std::vector<base_statement*>* args, variable* result)
+ {
+ std::vector<base_statement*>::iterator iter = args->begin();
+ int args_size = args->size();
+
+ if (args_size < 3)
+ {
+ throw base_s3select_exception("add_to_timestamp should have 3 parameters");
+ }
+
+ base_statement* date_part = *iter;
+ val_date_part = date_part->eval();//TODO could be done once?
+
+ if(val_date_part.is_string()== false)
+ {
+ throw base_s3select_exception("first parameter should be string");
+ }
+
+ iter++;
+ base_statement* quan = *iter;
+ val_quantity = quan->eval();
+
+ if (val_quantity.is_number() == false)
+ {
+ throw base_s3select_exception("second parameter should be number"); //TODO what about double?
+ }
+
+ iter++;
+ base_statement* ts = *iter;
+ val_timestamp = ts->eval();
+
+ if(val_timestamp.is_timestamp() == false)
+ {
+ throw base_s3select_exception("third parameter should be time-stamp");
+ }
+
+ new_ptime = *val_timestamp.timestamp();
+
+ if( strcmp(val_date_part.str(), "year")==0 )
+ {
+ new_ptime += boost::gregorian::years( val_quantity.i64() );
+ result->set_value( &new_ptime );
+ }
+ else if( strcmp(val_date_part.str(), "month")==0 )
+ {
+ new_ptime += boost::gregorian::months( val_quantity.i64() );
+ result->set_value( &new_ptime );
+ }
+ else if( strcmp(val_date_part.str(), "day")==0 )
+ {
+ new_ptime += boost::gregorian::days( val_quantity.i64() );
+ result->set_value( &new_ptime );
+ }
+ else
+ {
+ throw base_s3select_exception( std::string(val_date_part.str() + std::string(" is not supported for add")).c_str());
+ }
+
+ return true;
+ }
+
+};
+
+struct _fn_utcnow : public base_function
+{
+
+ boost::posix_time::ptime now_ptime;
+
+ bool operator()(std::vector<base_statement*>* args, variable* result)
+ {
+ int args_size = args->size();
+
+ if (args_size != 0)
+ {
+ throw base_s3select_exception("utcnow does not expect any parameters");
+ }
+
+ now_ptime = boost::posix_time::ptime( boost::posix_time::second_clock::universal_time());
+ result->set_value( &now_ptime );
+
+ return true;
+ }
+};
+
+struct _fn_substr : public base_function
+{
+
+ char buff[4096];// this buffer is persist for the query life time, it use for the results per row(only for the specific function call)
+ //it prevent from intensive use of malloc/free (fragmentation).
+ //should validate result length.
+ //TODO may replace by std::string (dynamic) , or to replace with global allocator , in query scope.
+ value v_str;
+ value v_from;
+ value v_to;
+
+ bool operator()(std::vector<base_statement*>* args, variable* result)
+ {
+ std::vector<base_statement*>::iterator iter = args->begin();
+ int args_size = args->size();
+
+
+ if (args_size<2)
+ {
+ throw base_s3select_exception("substr accept 2 arguments or 3");
+ }
+
+ base_statement* str = *iter;
+ iter++;
+ base_statement* from = *iter;
+ base_statement* to;
+
+ if (args_size == 3)
+ {
+ iter++;
+ to = *iter;
+ }
+
+ v_str = str->eval();
+
+ if(v_str.type != value::value_En_t::STRING)
+ {
+ throw base_s3select_exception("substr first argument must be string"); //can skip current row
+ }
+
+ int str_length = strlen(v_str.str());
+
+ v_from = from->eval();
+ if(v_from.is_string())
+ {
+ throw base_s3select_exception("substr second argument must be number"); //can skip current row
+ }
+
+ int64_t f;
+ int64_t t;
+
+ if (args_size==3)
+ {
+ v_to = to->eval();
+ if (v_to.is_string())
+ {
+ throw base_s3select_exception("substr third argument must be number"); //can skip row
+ }
+ }
+
+ if (v_from.type == value::value_En_t::FLOAT)
+ {
+ f=v_from.dbl();
+ }
+ else
+ {
+ f=v_from.i64();
+ }
+
+ if (f>str_length)
+ {
+ throw base_s3select_exception("substr start position is too far"); //can skip row
+ }
+
+ if (str_length>(int)sizeof(buff))
+ {
+ throw base_s3select_exception("string too long for internal buffer"); //can skip row
+ }
+
+ if (args_size == 3)
+ {
+ if (v_from.type == value::value_En_t::FLOAT)
+ {
+ t = v_to.dbl();
+ }
+ else
+ {
+ t = v_to.i64();
+ }
+
+ if( (str_length-(f-1)-t) <0)
+ {
+ throw base_s3select_exception("substr length parameter beyond bounderies"); //can skip row
+ }
+
+ strncpy(buff, v_str.str()+f-1, t);
+ }
+ else
+ {
+ strcpy(buff, v_str.str()+f-1);
+ }
+
+ result->set_value(buff);
+
+ return true;
+ }
+
+};
+
+base_function* s3select_functions::create(std::string fn_name)
+{
+ const FunctionLibrary::const_iterator iter = m_functions_library.find(fn_name);
+
+ if (iter == m_functions_library.end())
+ {
+ std::string msg;
+ msg = fn_name + " " + " function not found";
+ throw base_s3select_exception(msg, base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ switch (iter->second)
+ {
+ case s3select_func_En_t::ADD:
+ return S3SELECT_NEW(_fn_add);
+ break;
+
+ case s3select_func_En_t::SUM:
+ return S3SELECT_NEW(_fn_sum);
+ break;
+
+ case s3select_func_En_t::COUNT:
+ return S3SELECT_NEW(_fn_count);
+ break;
+
+ case s3select_func_En_t::MIN:
+ return S3SELECT_NEW(_fn_min);
+ break;
+
+ case s3select_func_En_t::MAX:
+ return S3SELECT_NEW(_fn_max);
+ break;
+
+ case s3select_func_En_t::TO_INT:
+ return S3SELECT_NEW(_fn_to_int);
+ break;
+
+ case s3select_func_En_t::TO_FLOAT:
+ return S3SELECT_NEW(_fn_to_float);
+ break;
+
+ case s3select_func_En_t::SUBSTR:
+ return S3SELECT_NEW(_fn_substr);
+ break;
+
+ case s3select_func_En_t::TO_TIMESTAMP:
+ return S3SELECT_NEW(_fn_to_timestamp);
+ break;
+
+ case s3select_func_En_t::EXTRACT:
+ return S3SELECT_NEW(_fn_extact_from_timestamp);
+ break;
+
+ case s3select_func_En_t::DATE_ADD:
+ return S3SELECT_NEW(_fn_add_to_timestamp);
+ break;
+
+ case s3select_func_En_t::DATE_DIFF:
+ return S3SELECT_NEW(_fn_diff_timestamp);
+ break;
+
+ case s3select_func_En_t::UTCNOW:
+ return S3SELECT_NEW(_fn_utcnow);
+ break;
+
+ default:
+ throw base_s3select_exception("internal error while resolving function-name");
+ break;
+ }
+}
+
+bool base_statement::is_function()
+{
+ if (dynamic_cast<__function*>(this))
+ {
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+}
+
+bool base_statement::is_aggregate_exist_in_expression(base_statement* e) //TODO obsolete ?
+{
+ if (e->is_aggregate())
+ {
+ return true;
+ }
+
+ if (e->left() && e->left()->is_aggregate_exist_in_expression(e->left()))
+ {
+ return true;
+ }
+
+ if (e->right() && e->right()->is_aggregate_exist_in_expression(e->right()))
+ {
+ return true;
+ }
+
+ if (e->is_function())
+ {
+ for (auto i : dynamic_cast<__function*>(e)->get_arguments())
+ if (e->is_aggregate_exist_in_expression(i))
+ {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+base_statement* base_statement::get_aggregate()
+{
+ //search for aggregation function in AST
+ base_statement* res = 0;
+
+ if (is_aggregate())
+ {
+ return this;
+ }
+
+ if (left() && (res=left()->get_aggregate())!=0)
+ {
+ return res;
+ }
+
+ if (right() && (res=right()->get_aggregate())!=0)
+ {
+ return res;
+ }
+
+ if (is_function())
+ {
+ for (auto i : dynamic_cast<__function*>(this)->get_arguments())
+ {
+ base_statement* b=i->get_aggregate();
+ if (b)
+ {
+ return b;
+ }
+ }
+ }
+ return 0;
+}
+
+bool base_statement::is_nested_aggregate(base_statement* e)
+{
+ //validate for non nested calls for aggregation function, i.e. sum ( min ( ))
+ if (e->is_aggregate())
+ {
+ if (e->left())
+ {
+ if (e->left()->is_aggregate_exist_in_expression(e->left()))
+ {
+ return true;
+ }
+ }
+ else if (e->right())
+ {
+ if (e->right()->is_aggregate_exist_in_expression(e->right()))
+ {
+ return true;
+ }
+ }
+ else if (e->is_function())
+ {
+ for (auto i : dynamic_cast<__function*>(e)->get_arguments())
+ {
+ if (i->is_aggregate_exist_in_expression(i))
+ {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+ return false;
+}
+
+// select sum(c2) ... + c1 ... is not allowed. a binary operation with scalar is OK. i.e. select sum() + 1
+bool base_statement::is_binop_aggregate_and_column(base_statement* skip_expression)
+{
+ if (left() && left() != skip_expression) //can traverse to left
+ {
+ if (left()->is_column())
+ {
+ return true;
+ }
+ else if (left()->is_binop_aggregate_and_column(skip_expression) == true)
+ {
+ return true;
+ }
+ }
+
+ if (right() && right() != skip_expression) //can traverse right
+ {
+ if (right()->is_column())
+ {
+ return true;
+ }
+ else if (right()->is_binop_aggregate_and_column(skip_expression) == true)
+ {
+ return true;
+ }
+ }
+
+ if (this != skip_expression && is_function())
+ {
+
+ __function* f = (dynamic_cast<__function*>(this));
+ std::vector<base_statement*> l = f->get_arguments();
+ for (auto i : l)
+ {
+ if (i!=skip_expression && i->is_column())
+ {
+ return true;
+ }
+ if (i->is_binop_aggregate_and_column(skip_expression) == true)
+ {
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
+
+} //namespace s3selectEngine
+
+#endif
diff --git a/src/s3select/include/s3select_oper.h b/src/s3select/include/s3select_oper.h
new file mode 100644
index 000000000..591e5f9da
--- /dev/null
+++ b/src/s3select/include/s3select_oper.h
@@ -0,0 +1,1320 @@
+#ifndef __S3SELECT_OPER__
+#define __S3SELECT_OPER__
+
+#include <string>
+#include <iostream>
+#include <list>
+#include <map>
+#include <vector>
+#include <string.h>
+#include <math.h>
+
+#include <boost/lexical_cast.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/bind.hpp>
+namespace bsc = BOOST_SPIRIT_CLASSIC_NS;
+
+namespace s3selectEngine
+{
+
+class base_s3select_exception
+{
+
+public:
+ enum class s3select_exp_en_t
+ {
+ NONE,
+ ERROR,
+ FATAL
+ } ;
+
+private:
+ s3select_exp_en_t m_severity;
+
+public:
+ std::string _msg;
+ base_s3select_exception(const char* n) : m_severity(s3select_exp_en_t::NONE)
+ {
+ _msg.assign(n);
+ }
+ base_s3select_exception(const char* n, s3select_exp_en_t severity) : m_severity(severity)
+ {
+ _msg.assign(n);
+ }
+ base_s3select_exception(std::string n, s3select_exp_en_t severity) : m_severity(severity)
+ {
+ _msg = n;
+ }
+
+ virtual const char* what()
+ {
+ return _msg.c_str();
+ }
+
+ s3select_exp_en_t severity()
+ {
+ return m_severity;
+ }
+
+ virtual ~base_s3select_exception() {}
+};
+
+
+// pointer to dynamic allocated buffer , which used for placement new.
+static __thread char* _s3select_buff_ptr =0;
+
+class s3select_allocator //s3select is the "owner"
+{
+private:
+
+ std::vector<char*> list_of_buff;
+ u_int32_t m_idx;
+
+public:
+#define __S3_ALLOCATION_BUFF__ (8*1024)
+ s3select_allocator():m_idx(0)
+ {
+ list_of_buff.push_back((char*)malloc(__S3_ALLOCATION_BUFF__));
+ }
+
+ void set_global_buff()
+ {
+ char* buff = list_of_buff.back();
+ _s3select_buff_ptr = &buff[ m_idx ];
+ }
+
+ void check_capacity(size_t sz)
+ {
+ if (sz>__S3_ALLOCATION_BUFF__)
+ {
+ throw base_s3select_exception("requested size too big", base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ if ((m_idx + sz) >= __S3_ALLOCATION_BUFF__)
+ {
+ list_of_buff.push_back((char*)malloc(__S3_ALLOCATION_BUFF__));
+ m_idx = 0;
+ }
+ }
+
+ void inc(size_t sz)
+ {
+ m_idx += sz;
+ m_idx += sizeof(char*) - (m_idx % sizeof(char*)); //alignment
+ }
+
+ void zero()
+ {
+ //not a must, its for safty.
+ _s3select_buff_ptr=0;
+ }
+
+ virtual ~s3select_allocator()
+ {
+ for(auto b : list_of_buff)
+ {
+ free(b);
+ }
+ }
+};
+
+class __clt_allocator
+{
+public:
+ s3select_allocator* m_s3select_allocator;
+
+public:
+
+ __clt_allocator():m_s3select_allocator(0) {}
+
+ void set(s3select_allocator* a)
+ {
+ m_s3select_allocator = a;
+ }
+};
+
+// placement new for allocation of all s3select objects on single(or few) buffers, deallocation of those objects is by releasing the buffer.
+#define S3SELECT_NEW( type , ... ) [=]() \
+ { \
+ m_s3select_allocator->check_capacity(sizeof( type )); \
+ m_s3select_allocator->set_global_buff(); \
+ auto res=new (_s3select_buff_ptr) type(__VA_ARGS__); \
+ m_s3select_allocator->inc(sizeof( type )); \
+ m_s3select_allocator->zero(); \
+ return res; \
+ }();
+
+class scratch_area
+{
+
+private:
+ std::vector<std::string_view> m_columns{128};
+ int m_upper_bound;
+
+ std::vector<std::pair<std::string, int >> m_column_name_pos;
+
+public:
+
+ void set_column_pos(const char* n, int pos)//TODO use std::string
+ {
+ m_column_name_pos.push_back( std::pair<const char*, int>(n, pos));
+ }
+
+ void update(std::vector<char*> tokens, size_t num_of_tokens)
+ {
+ size_t i=0;
+ for(auto s : tokens)
+ {
+ if (i>=num_of_tokens)
+ {
+ break;
+ }
+
+ m_columns[i++] = s;
+ }
+ m_upper_bound = i;
+
+ }
+
+ int get_column_pos(const char* n)
+ {
+ //done only upon building the AST , not on "runtime"
+
+ std::vector<std::pair<std::string, int >>::iterator iter;
+
+ for( auto iter : m_column_name_pos)
+ {
+ if (!strcmp(iter.first.c_str(), n))
+ {
+ return iter.second;
+ }
+ }
+
+ return -1;
+ }
+
+ std::string_view get_column_value(int column_pos)
+ {
+
+ if ((column_pos >= m_upper_bound) || column_pos < 0)
+ {
+ throw base_s3select_exception("column_position_is_wrong", base_s3select_exception::s3select_exp_en_t::ERROR);
+ }
+
+ return m_columns[column_pos];
+ }
+
+ int get_num_of_columns()
+ {
+ return m_upper_bound;
+ }
+};
+
+class base_statement;
+class projection_alias
+{
+//purpose: mapping between alias-name to base_statement*
+//those routines are *NOT* intensive, works once per query parse time.
+
+private:
+ std::vector< std::pair<std::string, base_statement*> > alias_map;
+
+public:
+ std::vector< std::pair<std::string, base_statement*> >* get()
+ {
+ return &alias_map;
+ }
+
+ bool insert_new_entry(std::string alias_name, base_statement* bs)
+ {
+ //purpose: only unique alias names.
+
+ for(auto alias: alias_map)
+ {
+ if(alias.first.compare(alias_name) == 0)
+ {
+ return false; //alias name already exist
+ }
+
+ }
+ std::pair<std::string, base_statement*> new_alias(alias_name, bs);
+ alias_map.push_back(new_alias);
+
+ return true;
+ }
+
+ base_statement* search_alias(std::string alias_name)
+ {
+ for(auto alias: alias_map)
+ {
+ if(alias.first.compare(alias_name) == 0)
+ {
+ return alias.second; //refernce to execution node
+ }
+ }
+ return 0;
+ }
+};
+
+struct binop_plus
+{
+ double operator()(double a, double b)
+ {
+ return a+b;
+ }
+};
+
+struct binop_minus
+{
+ double operator()(double a, double b)
+ {
+ return a-b;
+ }
+};
+
+struct binop_mult
+{
+ double operator()(double a, double b)
+ {
+ return a * b;
+ }
+};
+
+struct binop_div
+{
+ double operator()(double a, double b)
+ {
+ return a / b;
+ }
+};
+
+struct binop_pow
+{
+ double operator()(double a, double b)
+ {
+ return pow(a, b);
+ }
+};
+
+class value
+{
+
+public:
+ typedef union
+ {
+ int64_t num;
+ char* str;//TODO consider string_view
+ double dbl;
+ boost::posix_time::ptime* timestamp;
+ } value_t;
+
+private:
+ value_t __val;
+ std::string m_to_string;
+ std::string m_str_value;
+
+public:
+ enum class value_En_t
+ {
+ DECIMAL,
+ FLOAT,
+ STRING,
+ TIMESTAMP,
+ NA
+ } ;
+ value_En_t type;
+
+ value(int64_t n) : type(value_En_t::DECIMAL)
+ {
+ __val.num = n;
+ }
+ value(int n) : type(value_En_t::DECIMAL)
+ {
+ __val.num = n;
+ }
+ value(bool b) : type(value_En_t::DECIMAL)
+ {
+ __val.num = (int64_t)b;
+ }
+ value(double d) : type(value_En_t::FLOAT)
+ {
+ __val.dbl = d;
+ }
+ value(boost::posix_time::ptime* timestamp) : type(value_En_t::TIMESTAMP)
+ {
+ __val.timestamp = timestamp;
+ }
+
+ value(const char* s) : type(value_En_t::STRING)
+ {
+ m_str_value.assign(s);
+ __val.str = m_str_value.data();
+ }
+
+ value():type(value_En_t::NA)
+ {
+ __val.num=0;
+ }
+
+ bool is_number() const
+ {
+ if ((type == value_En_t::DECIMAL || type == value_En_t::FLOAT))
+ {
+ return true;
+ }
+
+ return false;
+ }
+
+ bool is_string() const
+ {
+ return type == value_En_t::STRING;
+ }
+ bool is_timestamp() const
+ {
+ return type == value_En_t::TIMESTAMP;
+ }
+
+
+ std::string& to_string() //TODO very intensive , must improve this
+ {
+
+ if (type != value_En_t::STRING)
+ {
+ if (type == value_En_t::DECIMAL)
+ {
+ m_to_string.assign( boost::lexical_cast<std::string>(__val.num) );
+ }
+ else if(type == value_En_t::FLOAT)
+ {
+ m_to_string = boost::lexical_cast<std::string>(__val.dbl);
+ }
+ else
+ {
+ m_to_string = to_simple_string( *__val.timestamp );
+ }
+ }
+ else
+ {
+ m_to_string.assign( __val.str );
+ }
+
+ return m_to_string;
+ }
+
+
+ value& operator=(value& o)
+ {
+ if(this->type == value_En_t::STRING)
+ {
+ m_str_value.assign(o.str());
+ __val.str = m_str_value.data();
+ }
+ else
+ {
+ this->__val = o.__val;
+ }
+
+ this->type = o.type;
+
+ return *this;
+ }
+
+ value& operator=(const char* s)
+ {
+ m_str_value.assign(s);
+ this->__val.str = m_str_value.data();
+ this->type = value_En_t::STRING;
+
+ return *this;
+ }
+
+ value& operator=(int64_t i)
+ {
+ this->__val.num = i;
+ this->type = value_En_t::DECIMAL;
+
+ return *this;
+ }
+
+ value& operator=(double d)
+ {
+ this->__val.dbl = d;
+ this->type = value_En_t::FLOAT;
+
+ return *this;
+ }
+
+ value& operator=(bool b)
+ {
+ this->__val.num = (int64_t)b;
+ this->type = value_En_t::DECIMAL;
+
+ return *this;
+ }
+
+ value& operator=(boost::posix_time::ptime* p)
+ {
+ this->__val.timestamp = p;
+ this->type = value_En_t::TIMESTAMP;
+
+ return *this;
+ }
+
+ int64_t i64()
+ {
+ return __val.num;
+ }
+
+ const char* str()
+ {
+ return __val.str;
+ }
+
+ double dbl()
+ {
+ return __val.dbl;
+ }
+
+ boost::posix_time::ptime* timestamp() const
+ {
+ return __val.timestamp;
+ }
+
+ bool operator<(const value& v)//basic compare operator , most itensive runtime operation
+ {
+ //TODO NA possible?
+ if (is_string() && v.is_string())
+ {
+ return strcmp(__val.str, v.__val.str) < 0;
+ }
+
+ if (is_number() && v.is_number())
+ {
+
+ if(type != v.type) //conversion //TODO find better way
+ {
+ if (type == value_En_t::DECIMAL)
+ {
+ return (double)__val.num < v.__val.dbl;
+ }
+ else
+ {
+ return __val.dbl < (double)v.__val.num;
+ }
+ }
+ else //no conversion
+ {
+ if(type == value_En_t::DECIMAL)
+ {
+ return __val.num < v.__val.num;
+ }
+ else
+ {
+ return __val.dbl < v.__val.dbl;
+ }
+
+ }
+ }
+
+ if(is_timestamp() && v.is_timestamp())
+ {
+ return *timestamp() < *(v.timestamp());
+ }
+
+ throw base_s3select_exception("operands not of the same type(numeric , string), while comparision");
+ }
+
+ bool operator>(const value& v) //basic compare operator , most itensive runtime operation
+ {
+ //TODO NA possible?
+ if (is_string() && v.is_string())
+ {
+ return strcmp(__val.str, v.__val.str) > 0;
+ }
+
+ if (is_number() && v.is_number())
+ {
+
+ if(type != v.type) //conversion //TODO find better way
+ {
+ if (type == value_En_t::DECIMAL)
+ {
+ return (double)__val.num > v.__val.dbl;
+ }
+ else
+ {
+ return __val.dbl > (double)v.__val.num;
+ }
+ }
+ else //no conversion
+ {
+ if(type == value_En_t::DECIMAL)
+ {
+ return __val.num > v.__val.num;
+ }
+ else
+ {
+ return __val.dbl > v.__val.dbl;
+ }
+
+ }
+ }
+
+ if(is_timestamp() && v.is_timestamp())
+ {
+ return *timestamp() > *(v.timestamp());
+ }
+
+ throw base_s3select_exception("operands not of the same type(numeric , string), while comparision");
+ }
+
+ bool operator==(const value& v) //basic compare operator , most itensive runtime operation
+ {
+ //TODO NA possible?
+ if (is_string() && v.is_string())
+ {
+ return strcmp(__val.str, v.__val.str) == 0;
+ }
+
+
+ if (is_number() && v.is_number())
+ {
+
+ if(type != v.type) //conversion //TODO find better way
+ {
+ if (type == value_En_t::DECIMAL)
+ {
+ return (double)__val.num == v.__val.dbl;
+ }
+ else
+ {
+ return __val.dbl == (double)v.__val.num;
+ }
+ }
+ else //no conversion
+ {
+ if(type == value_En_t::DECIMAL)
+ {
+ return __val.num == v.__val.num;
+ }
+ else
+ {
+ return __val.dbl == v.__val.dbl;
+ }
+
+ }
+ }
+
+ if(is_timestamp() && v.is_timestamp())
+ {
+ return *timestamp() == *(v.timestamp());
+ }
+
+ throw base_s3select_exception("operands not of the same type(numeric , string), while comparision");
+ }
+ bool operator<=(const value& v)
+ {
+ return !(*this>v);
+ }
+ bool operator>=(const value& v)
+ {
+ return !(*this<v);
+ }
+ bool operator!=(const value& v)
+ {
+ return !(*this == v);
+ }
+
+ template<typename binop> //conversion rules for arithmetical binary operations
+ value& compute(value& l, const value& r) //left should be this, it contain the result
+ {
+ binop __op;
+
+ if (l.is_string() || r.is_string())
+ {
+ throw base_s3select_exception("illegal binary operation with string");
+ }
+
+ if (l.type != r.type)
+ {
+ //conversion
+
+ if (l.type == value_En_t::DECIMAL)
+ {
+ l.__val.dbl = __op((double)l.__val.num, r.__val.dbl);
+ l.type = value_En_t::FLOAT;
+ }
+ else
+ {
+ l.__val.dbl = __op(l.__val.dbl, (double)r.__val.num);
+ l.type = value_En_t::FLOAT;
+ }
+ }
+ else
+ {
+ //no conversion
+
+ if (l.type == value_En_t::DECIMAL)
+ {
+ l.__val.num = __op(l.__val.num, r.__val.num );
+ l.type = value_En_t::DECIMAL;
+ }
+ else
+ {
+ l.__val.dbl = __op(l.__val.dbl, r.__val.dbl );
+ l.type = value_En_t::FLOAT;
+ }
+ }
+
+ return l;
+ }
+
+ value& operator+(const value& v)
+ {
+ return compute<binop_plus>(*this, v);
+ }
+
+ value& operator-(const value& v)
+ {
+ return compute<binop_minus>(*this, v);
+ }
+
+ value& operator*(const value& v)
+ {
+ return compute<binop_mult>(*this, v);
+ }
+
+ value& operator/(const value& v) // TODO handle division by zero
+ {
+ return compute<binop_div>(*this, v);
+ }
+
+ value& operator^(const value& v)
+ {
+ return compute<binop_pow>(*this, v);
+ }
+
+};
+
+class base_statement
+{
+
+protected:
+
+ scratch_area* m_scratch;
+ projection_alias* m_aliases;
+ bool is_last_call; //valid only for aggregation functions
+ bool m_is_cache_result;
+ value m_alias_result;
+ base_statement* m_projection_alias;
+ int m_eval_stack_depth;
+
+public:
+ base_statement():m_scratch(0), is_last_call(false), m_is_cache_result(false), m_projection_alias(0), m_eval_stack_depth(0) {}
+ virtual value& eval() =0;
+ virtual base_statement* left()
+ {
+ return 0;
+ }
+ virtual base_statement* right()
+ {
+ return 0;
+ }
+ virtual std::string print(int ident) =0;//TODO complete it, one option to use level parametr in interface ,
+ virtual bool semantic() =0;//done once , post syntax , traverse all nodes and validate semantics.
+
+ virtual void traverse_and_apply(scratch_area* sa, projection_alias* pa)
+ {
+ m_scratch = sa;
+ m_aliases = pa;
+ if (left())
+ {
+ left()->traverse_and_apply(m_scratch, m_aliases);
+ }
+ if (right())
+ {
+ right()->traverse_and_apply(m_scratch, m_aliases);
+ }
+ }
+
+ virtual bool is_aggregate()
+ {
+ return false;
+ }
+ virtual bool is_column()
+ {
+ return false;
+ }
+
+ bool is_function();
+ bool is_aggregate_exist_in_expression(base_statement* e);//TODO obsolete ?
+ base_statement* get_aggregate();
+ bool is_nested_aggregate(base_statement* e);
+ bool is_binop_aggregate_and_column(base_statement* skip);
+
+ virtual void set_last_call()
+ {
+ is_last_call = true;
+ if(left())
+ {
+ left()->set_last_call();
+ }
+ if(right())
+ {
+ right()->set_last_call();
+ }
+ }
+
+ bool is_set_last_call()
+ {
+ return is_last_call;
+ }
+
+ void invalidate_cache_result()
+ {
+ m_is_cache_result = false;
+ }
+
+ bool is_result_cached()
+ {
+ return m_is_cache_result == true;
+ }
+
+ void set_result_cache(value& eval_result)
+ {
+ m_alias_result = eval_result;
+ m_is_cache_result = true;
+ }
+
+ void dec_call_stack_depth()
+ {
+ m_eval_stack_depth --;
+ }
+
+ value& get_result_cache()
+ {
+ return m_alias_result;
+ }
+
+ int& get_eval_call_depth()
+ {
+ m_eval_stack_depth++;
+ return m_eval_stack_depth;
+ }
+
+ virtual ~base_statement() {}
+
+};
+
+class variable : public base_statement
+{
+
+public:
+
+ enum class var_t
+ {
+ NA,
+ VAR,//schema column (i.e. age , price , ...)
+ COL_VALUE, //concrete value
+ POS, // CSV column number (i.e. _1 , _2 ... )
+ STAR_OPERATION, //'*'
+ } ;
+ var_t m_var_type;
+
+private:
+
+ std::string _name;
+ int column_pos;
+ value var_value;
+ std::string m_star_op_result;
+ char m_star_op_result_charc[4096]; //TODO should be dynamic
+
+ const int undefined_column_pos = -1;
+ const int column_alias = -2;
+
+public:
+ variable():m_var_type(var_t::NA), _name(""), column_pos(-1) {}
+
+ variable(int64_t i) : m_var_type(var_t::COL_VALUE), column_pos(-1), var_value(i) {}
+
+ variable(double d) : m_var_type(var_t::COL_VALUE), _name("#"), column_pos(-1), var_value(d) {}
+
+ variable(int i) : m_var_type(var_t::COL_VALUE), column_pos(-1), var_value(i) {}
+
+ variable(const std::string& n) : m_var_type(var_t::VAR), _name(n), column_pos(-1) {}
+
+ variable(const std::string& n, var_t tp) : m_var_type(var_t::NA)
+ {
+ if(tp == variable::var_t::POS)
+ {
+ _name = n;
+ m_var_type = tp;
+ int pos = atoi( n.c_str() + 1 ); //TODO >0 < (schema definition , semantic analysis)
+ column_pos = pos -1;// _1 is the first column ( zero position )
+ }
+ else if (tp == variable::var_t::COL_VALUE)
+ {
+ _name = "#";
+ m_var_type = tp;
+ column_pos = -1;
+ var_value = n.c_str();
+ }
+ else if (tp ==variable::var_t::STAR_OPERATION)
+ {
+ _name = "#";
+ m_var_type = tp;
+ column_pos = -1;
+ }
+ }
+
+ void operator=(value& v)
+ {
+ var_value = v;
+ }
+
+ void set_value(const char* s)
+ {
+ var_value = s;
+ }
+
+ void set_value(double d)
+ {
+ var_value = d;
+ }
+
+ void set_value(int64_t i)
+ {
+ var_value = i;
+ }
+
+ void set_value(boost::posix_time::ptime* p)
+ {
+ var_value = p;
+ }
+
+ virtual ~variable() {}
+
+ virtual bool is_column() //is reference to column.
+ {
+ if(m_var_type == var_t::VAR || m_var_type == var_t::POS)
+ {
+ return true;
+ }
+ return false;
+ }
+
+ value& get_value()
+ {
+ return var_value; //TODO is it correct
+ }
+ virtual value::value_En_t get_value_type()
+ {
+ return var_value.type;
+ }
+
+
+ value& star_operation() //purpose return content of all columns in a input stream
+ {
+
+
+ int i;
+ size_t pos=0;
+ int num_of_columns = m_scratch->get_num_of_columns();
+ for(i=0; i<num_of_columns-1; i++)
+ {
+ size_t len = m_scratch->get_column_value(i).size();
+ if((pos+len)>sizeof(m_star_op_result_charc))
+ {
+ throw base_s3select_exception("result line too long", base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ memcpy(&m_star_op_result_charc[pos], m_scratch->get_column_value(i).data(), len);
+ pos += len;
+ m_star_op_result_charc[ pos ] = ',';//TODO need for another abstraction (per file type)
+ pos ++;
+
+ }
+
+ size_t len = m_scratch->get_column_value(i).size();
+ if((pos+len)>sizeof(m_star_op_result_charc))
+ {
+ throw base_s3select_exception("result line too long", base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ memcpy(&m_star_op_result_charc[pos], m_scratch->get_column_value(i).data(), len);
+ m_star_op_result_charc[ pos + len ] = 0;
+ var_value = (char*)&m_star_op_result_charc[0];
+ return var_value;
+ }
+
+ virtual value& eval()
+ {
+ if (m_var_type == var_t::COL_VALUE)
+ {
+ return var_value; // a literal,could be deciml / float / string
+ }
+ else if(m_var_type == var_t::STAR_OPERATION)
+ {
+ return star_operation();
+ }
+ else if (column_pos == undefined_column_pos)
+ {
+ //done once , for the first time
+ column_pos = m_scratch->get_column_pos(_name.c_str());
+
+ if(column_pos>=0 && m_aliases->search_alias(_name.c_str()))
+ {
+ throw base_s3select_exception(std::string("multiple definition of column {") + _name + "} as schema-column and alias", base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+
+ if (column_pos == undefined_column_pos)
+ {
+ //not belong to schema , should exist in aliases
+ m_projection_alias = m_aliases->search_alias(_name.c_str());
+
+ //not enter this scope again
+ column_pos = column_alias;
+ if(m_projection_alias == 0)
+ {
+ throw base_s3select_exception(std::string("alias {")+_name+std::string("} or column not exist in schema"), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+ }
+
+ }
+
+ if (m_projection_alias)
+ {
+ if (m_projection_alias->get_eval_call_depth()>2)
+ {
+ throw base_s3select_exception("number of calls exceed maximum size, probably a cyclic reference to alias", base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
+ if (m_projection_alias->is_result_cached() == false)
+ {
+ var_value = m_projection_alias->eval();
+ m_projection_alias->set_result_cache(var_value);
+ }
+ else
+ {
+ var_value = m_projection_alias->get_result_cache();
+ }
+
+ m_projection_alias->dec_call_stack_depth();
+ }
+ else
+ {
+ var_value = (char*)m_scratch->get_column_value(column_pos).data(); //no allocation. returning pointer of allocated space
+ }
+
+ return var_value;
+ }
+
+ virtual std::string print(int ident)
+ {
+ //std::string out = std::string(ident,' ') + std::string("var:") + std::to_string(var_value.__val.num);
+ //return out;
+ return std::string("#");//TBD
+ }
+
+ virtual bool semantic()
+ {
+ return false;
+ }
+
+};
+
+class arithmetic_operand : public base_statement
+{
+
+public:
+
+ enum class cmp_t {NA, EQ, LE, LT, GT, GE, NE} ;
+
+private:
+ base_statement* l;
+ base_statement* r;
+
+ cmp_t _cmp;
+ value var_value;
+
+public:
+
+ virtual bool semantic()
+ {
+ return true;
+ }
+
+ virtual base_statement* left()
+ {
+ return l;
+ }
+ virtual base_statement* right()
+ {
+ return r;
+ }
+
+ virtual std::string print(int ident)
+ {
+ //std::string out = std::string(ident,' ') + "compare:" += std::to_string(_cmp) + "\n" + l->print(ident-5) +r->print(ident+5);
+ //return out;
+ return std::string("#");//TBD
+ }
+
+ virtual value& eval()
+ {
+
+ switch (_cmp)
+ {
+ case cmp_t::EQ:
+ return var_value = (l->eval() == r->eval());
+ break;
+
+ case cmp_t::LE:
+ return var_value = (l->eval() <= r->eval());
+ break;
+
+ case cmp_t::GE:
+ return var_value = (l->eval() >= r->eval());
+ break;
+
+ case cmp_t::NE:
+ return var_value = (l->eval() != r->eval());
+ break;
+
+ case cmp_t::GT:
+ return var_value = (l->eval() > r->eval());
+ break;
+
+ case cmp_t::LT:
+ return var_value = (l->eval() < r->eval());
+ break;
+
+ default:
+ throw base_s3select_exception("internal error");
+ break;
+ }
+ }
+
+ arithmetic_operand(base_statement* _l, cmp_t c, base_statement* _r):l(_l), r(_r), _cmp(c) {}
+
+ virtual ~arithmetic_operand() {}
+};
+
+class logical_operand : public base_statement
+{
+
+public:
+
+ enum class oplog_t {AND, OR, NA};
+
+private:
+ base_statement* l;
+ base_statement* r;
+
+ oplog_t _oplog;
+ value var_value;
+
+public:
+
+ virtual base_statement* left()
+ {
+ return l;
+ }
+ virtual base_statement* right()
+ {
+ return r;
+ }
+
+ virtual bool semantic()
+ {
+ return true;
+ }
+
+ logical_operand(base_statement* _l, oplog_t _o, base_statement* _r):l(_l), r(_r), _oplog(_o) {}
+
+ virtual ~logical_operand() {}
+
+ virtual std::string print(int ident)
+ {
+ //std::string out = std::string(ident, ' ') + "logical_operand:" += std::to_string(_oplog) + "\n" + l->print(ident - 5) + r->print(ident + 5);
+ //return out;
+ return std::string("#");//TBD
+ }
+ virtual value& eval()
+ {
+ if (_oplog == oplog_t::AND)
+ {
+ if (!l || !r)
+ {
+ throw base_s3select_exception("missing operand for logical and", base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+ return var_value = (l->eval().i64() && r->eval().i64());
+ }
+ else
+ {
+ if (!l || !r)
+ {
+ throw base_s3select_exception("missing operand for logical or", base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+ return var_value = (l->eval().i64() || r->eval().i64());
+ }
+ }
+
+};
+
+class mulldiv_operation : public base_statement
+{
+
+public:
+
+ enum class muldiv_t {NA, MULL, DIV, POW} ;
+
+private:
+ base_statement* l;
+ base_statement* r;
+
+ muldiv_t _mulldiv;
+ value var_value;
+
+public:
+
+ virtual base_statement* left()
+ {
+ return l;
+ }
+ virtual base_statement* right()
+ {
+ return r;
+ }
+
+ virtual bool semantic()
+ {
+ return true;
+ }
+
+ virtual std::string print(int ident)
+ {
+ //std::string out = std::string(ident, ' ') + "mulldiv_operation:" += std::to_string(_mulldiv) + "\n" + l->print(ident - 5) + r->print(ident + 5);
+ //return out;
+ return std::string("#");//TBD
+ }
+
+ virtual value& eval()
+ {
+ switch (_mulldiv)
+ {
+ case muldiv_t::MULL:
+ return var_value = l->eval() * r->eval();
+ break;
+
+ case muldiv_t::DIV:
+ return var_value = l->eval() / r->eval();
+ break;
+
+ case muldiv_t::POW:
+ return var_value = l->eval() ^ r->eval();
+ break;
+
+ default:
+ throw base_s3select_exception("internal error");
+ break;
+ }
+ }
+
+ mulldiv_operation(base_statement* _l, muldiv_t c, base_statement* _r):l(_l), r(_r), _mulldiv(c) {}
+
+ virtual ~mulldiv_operation() {}
+};
+
+class addsub_operation : public base_statement
+{
+
+public:
+
+ enum class addsub_op_t {ADD, SUB, NA};
+
+private:
+ base_statement* l;
+ base_statement* r;
+
+ addsub_op_t _op;
+ value var_value;
+
+public:
+
+ virtual base_statement* left()
+ {
+ return l;
+ }
+ virtual base_statement* right()
+ {
+ return r;
+ }
+
+ virtual bool semantic()
+ {
+ return true;
+ }
+
+ addsub_operation(base_statement* _l, addsub_op_t _o, base_statement* _r):l(_l), r(_r), _op(_o) {}
+
+ virtual ~addsub_operation() {}
+
+ virtual std::string print(int ident)
+ {
+ //std::string out = std::string(ident, ' ') + "addsub_operation:" += std::to_string(_op) + "\n" + l->print(ident - 5) + r->print(ident + 5);
+ return std::string("#");//TBD
+ }
+
+ virtual value& eval()
+ {
+ if (_op == addsub_op_t::NA) // -num , +num , unary-operation on number
+ {
+ if (l)
+ {
+ return var_value = l->eval();
+ }
+ else if (r)
+ {
+ return var_value = r->eval();
+ }
+ }
+ else if (_op == addsub_op_t::ADD)
+ {
+ return var_value = (l->eval() + r->eval());
+ }
+ else
+ {
+ return var_value = (l->eval() - r->eval());
+ }
+
+ return var_value;
+ }
+};
+
+class base_function
+{
+
+protected:
+ bool aggregate;
+
+public:
+ //TODO add semantic to base-function , it operate once on function creation
+ // validate semantic on creation instead on run-time
+ virtual bool operator()(std::vector<base_statement*>* args, variable* result) = 0;
+ base_function() : aggregate(false) {}
+ bool is_aggregate()
+ {
+ return aggregate == true;
+ }
+ virtual void get_aggregate_result(variable*) {}
+
+ virtual ~base_function() {}
+};
+
+
+};//namespace
+
+#endif
diff --git a/src/s3select/s3select-parse-s.png b/src/s3select/s3select-parse-s.png
new file mode 100644
index 000000000..047f87ed6
--- /dev/null
+++ b/src/s3select/s3select-parse-s.png
Binary files differ
diff --git a/src/s3select/s3select.rst b/src/s3select/s3select.rst
new file mode 100644
index 000000000..fbe4dfe93
--- /dev/null
+++ b/src/s3select/s3select.rst
@@ -0,0 +1,259 @@
+===============
+ Ceph s3 select
+===============
+
+.. contents::
+
+Overview
+--------
+
+ | The purpose of **s3 select** engine is to create an efficient pipe between user client to storage node (the engine should be close as possible to storage).
+ | It enables the user to define the exact portion of data should be received by his side.
+ | It also enables for higher level analytic-applications (such as SPARK-SQL) , using that feature to improve their latency and throughput.
+
+ | For example, a s3-object of several GB (CSV file), a user needs to extract a single column which filtered by another column.
+ | As the following query:
+ | ``select customer-id from s3Object where age>30 and age<65;``
+
+ | Currently the whole s3-object must retrieve from OSD via RGW before filtering and extracting data.
+ | By "pushing down" the query into OSD , it's possible to save a lot of network and CPU(serialization / deserialization).
+
+ | **The bigger the object, and the more accurate the query, the better the performance**.
+
+Basic workflow
+--------------
+
+ | S3-select query is sent to RGW via `AWS-CLI <https://docs.aws.amazon.com/cli/latest/reference/s3api/select-object-content.html>`_
+
+ | It passes the authentication and permission process as an incoming message (POST).
+ | **RGWSelectObj_ObjStore_S3::send_response_data** is the “entry point”, it handles each fetched chunk according to input object-key.
+ | **send_response_data** is first handling the input query, it extracts the query and other CLI parameters.
+
+ | Per each new fetched chunk (~4m), it runs the s3-select query on that chunk.
+ | The current implementation supports CSV objects and since chunks are randomly “cutting” the CSV rows in the middle, those broken-lines (first or last per chunk) are skipped while processing the query.
+ | Those “broken” lines are stored and later merged with the next broken-line (belong to the next chunk), and finally processed.
+
+ | Per each processed chunk an output message is formatted according to AWS specification and sent back to the client.
+ | For aggregation queries the last chunk should be identified as the end of input, following that the s3-select-engine initiates end-of-process and produces an aggregate result.
+
+Design Concepts
+---------------
+
+AST- Abstract Syntax Tree
+~~~~~~~~~~~~~~~~~~~~~~~~~
+ | The s3-select main flow is initiated with parsing of input-string (i.e user query), and follows
+ | with building an AST (abstract-syntax-tree) as a result.
+ | The execution phase is built upon the AST.
+
+ | ``Base_statement`` is the base for the all object-nodes participating in the execution phase, it consists of the ``eval()`` method which returns the <value> object.
+
+ | ``value`` object is handling the known basic-types such as int,string,float,time-stamp
+ | It is able to operate comparison and basic arithmetic operations on mentioned types.
+
+ | The execution-flow is actually calling the ``eval()`` method on the root-node (per each projection), it goes all the way down, and returns the actual result (``value`` object) from bottom node to root node(all the way up) .
+
+ | **Alias** programming-construct is an essential part of s3-select language, it enables much better programming especially with objects containing many columns or in the case of complex queries.
+
+ | Upon parsing the statement containing alias construct, it replaces alias with reference to the correct AST-node, on runtime the node is simply evaluated as any other node.
+
+ | There is a risk that self(or cyclic) reference may occur causing stack-overflow(endless-loop), for that concern upon evaluating an alias, it is validated for cyclic reference.
+
+ | Alias also maintains result-cache, meaning upon using the same alias more than once, it’s not evaluating the same node again(it will return the same result),instead it uses the result from cache.
+
+ | Of Course, per each new row the cache is invalidated.
+
+
+S3 select parser definition
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ | The implementation of s3-select uses the `boost::spirit <https://www.boost.org/doc/libs/1_71_0/libs/spirit/classic/doc/grammar.html>`_ the definition of s3-select command is according to AWS.
+
+ | Upon parsing is initiated on input text, and a specific rule is identified, an action which is bound to that rule is executed.
+ | Those actions are building the AST, each action is unique (as its rule), at the end of the process it forms a structure similar to a tree.
+
+ | As mentioned, running eval() on the root node, execute the s3-select statement (per projection).
+ | The input stream is accessible to the execution tree, by the scratch-area object, that object is constantly updated per each new row.
+
+Basic functionalities
+~~~~~~~~~~~~~~~~~~~~~
+
+ | **S3select** has a definite set of functionalities that should be implemented (if we wish to stay compliant with AWS), currently only a portion of it is implemented.
+
+ | The implemented software architecture supports basic arithmetic expressions, logical and compare expressions, including nested function calls and casting operators, that alone enables the user reasonable flexibility.
+ | review the bellow feature-table_.
+
+
+
+Memory handling
+~~~~~~~~~~~~~~~
+
+ | S3select structures and objects are lockless and thread-safe, it uses placement-new in order to reduce the alloc/dealloc intensive cycles, which may impact the main process hosting s3-select.
+
+ | Once AST is built there is no need to allocate memory for the execution itself, the AST is “static” for the query-execution life-cycle.
+
+ | The execution itself is stream-oriented, meaning there is no pre-allocation before execution, object size has no impact on memory consumption.
+
+ | It processes chunk after chunk, row after row, all memory needed for processing resides on AST.
+
+ | The AST is similar to stack behaviour in that it consumes already allocated memory and “releases” it upon completing its task.
+
+S3 Object different types
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ | The processing of input stream is decoupled from s3-select-engine, meaning , each input-type should have its own parser, converting s3-object into columns.
+
+ | Current implementation includes only CSV reader; its parsing definitions are according to AWS.
+ | The parser is implemented using `boost::state-machine <https://www.boost.org/doc/libs/1_64_0/libs/msm/doc/HTML/index.html>`_.
+
+ | The CSV parser handles NULL,quote,escape rules,field delimiter,row delimiter and users may define (via AWS CLI) all of those dynamically.
+
+Error Handling
+~~~~~~~~~~~~~~
+ | S3-select statement may be syntactically correct but semantically wrong, for one example ``select a * b from …`` , where a is number and b is a string.
+ | Current implementation is for CSV file types, CSV has no schema, column-types may evaluate on runtime.
+ | The above means that wrong semantic statements may occur on runtime.
+
+ | As for syntax error ``select x frm stdin;`` , the builtin parser fails on first miss-match to language definition, and produces an error message back to client (AWS-CLI).
+ | The error message is point on location of miss-match.
+
+ | Fatal severity (attached to the exception) will end execution immediately, other error severity are counted, upon reaching 100, it ends execution with an error message.
+
+
+AST denostration
+~~~~~~~~~~~~~~~~
+.. ditaa::
+
+ +---------------------+
+ | select |
+ +------ +---------------------+---------+
+ | | |
+ | | |
+ | | |
+ | V |
+ | +--------------------+ |
+ | | s3object | |
+ | +--------------------+ |
+ | |
+ V V
+ +---------------------+ +-------------+
+ | projections | | where |
+ +---------------------+ +-------------+
+ | | |
+ | | |
+ | | |
+ | | |
+ | | |
+ | | |
+ V V V
+ +-----------+ +-----------+ +-------------+
+ | multiply | | date | | and |
+ +-----------+ +-----------+ +-------------+
+ | | | |
+ | | | |
+ | | | |
+ | | | |
+ V V V V
+ +-------+ +-------+ +-----+ +-----+
+ |payment| | 0.3 | | EQ | | LT |
+ +-------+ +-------+ +--+-----+ +-----+--+
+ | | | |
+ | | | |
+ V V V V
+ +-------+ +----+ +-----+ +-----+
+ | region| |east| |age | | 30 |
+ +-------+ +----+ +-----+ +-----+
+
+Features Support
+----------------
+
+.. _feature-table:
+
+The following table describes the support for s3-select functionalities:
+
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Feature | Detailed | Example |
++=================================+=================+=======================================================================+
+| Arithmetic operators | ^ * / + - ( ) | select (int(_1)+int(_2))*int(_9) from stdin; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| | | select ((1+2)*3.14) ^ 2 from stdin; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Compare operators | > < >= <= == != | select _1,_2 from stdin where (int(1)+int(_3))>int(_5); |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| logical operator | AND OR | select count(*) from stdin where int(1)>123 and int(_5)<200; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| casting operator | int(expression) | select int(_1),int( 1.2 + 3.4) from stdin; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| |float(expression)| |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| | timestamp(...) | select timestamp("1999:10:10-12:23:44") from stdin; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Aggregation Function | sum | select sum(int(_1)) from stdin; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Aggregation Function | min | select min( int(_1) * int(_5) ) from stdin; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Aggregation Function | max | select max(float(_1)),min(int(_5)) from stdin; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Aggregation Function | count | select count(*) from stdin where (int(1)+int(_3))>int(_5); |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Timestamp Functions | extract | select count(*) from stdin where |
+| | | extract("year",timestamp(_2)) > 1950 |
+| | | and extract("year",timestamp(_1)) < 1960; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Timestamp Functions | dateadd | select count(0) from stdin where |
+| | | datediff("year",timestamp(_1),dateadd("day",366,timestamp(_1))) == 1; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Timestamp Functions | datediff | select count(0) from stdin where |
+| | | datediff("month",timestamp(_1),timestamp(_2))) == 2; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Timestamp Functions | utcnow | select count(0) from stdin where |
+| | | datediff("hours",utcnow(),dateadd("day",1,utcnow())) == 24 ; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| String Functions | substr | select count(0) from stdin where |
+| | | int(substr(_1,1,4))>1950 and int(substr(_1,1,4))<1960; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| alias support | | select int(_1) as a1, int(_2) as a2 , (a1+a2) as a3 |
+| | | from stdin where a3>100 and a3<300; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+
+Sending Query to RGW
+--------------------
+
+Syntax
+~~~~~~
+CSV default defintion for field-delimiter,row-delimiter,quote-char,escape-char are: { , \\n " \\ }
+
+::
+
+ aws --endpoint-url http://localhost:8000 s3api select-object-content
+ --bucket {BUCKET-NAME}
+ --expression-type 'SQL'
+ --input-serialization
+ '{"CSV": {"FieldDelimiter": "," , "QuoteCharacter": "\"" , "RecordDelimiter" : "\n" , "QuoteEscapeCharacter" : "\\" , "FileHeaderInfo": "USE" }, "CompressionType": "NONE"}'
+ --output-serialization '{"CSV": {}}'
+ --key {OBJECT-NAME}
+ --expression "select count(0) from stdin where int(_1)<10;" output.csv
+
+CSV parsing behavior
+--------------------
+
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Feature | Description | input ==> tokens |
++=================================+=================+=======================================================================+
+| NULL | successive | ,,1,,2, ==> {null}{null}{1}{null}{2}{null} |
+| | field delimiter | |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| QUOTE | quote character | 11,22,"a,b,c,d",last ==> {11}{22}{"a,b,c,d"}{last} |
+| | overrides | |
+| | field delimiter | |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Escape | escape char | 11,22,str=\\"abcd\\"\\,str2=\\"123\\",last |
+| | overrides | ==> {11}{22}{str="abcd",str2="123"}{last} |
+| | meta-character. | |
+| | escape removed | |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| row delimiter | no close quote, | 11,22,a="str,44,55,66 |
+| | row delimiter is| ==> {11}{22}{a="str,44,55,66} |
+| | closing line | |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| csv header info | FileHeaderInfo | "**USE**" value means each token on first line is column-name, |
+| | tag | "**IGNORE**" value means to skip the first line |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
diff --git a/src/s3select/test/CMakeLists.txt b/src/s3select/test/CMakeLists.txt
new file mode 100644
index 000000000..b3c47325c
--- /dev/null
+++ b/src/s3select/test/CMakeLists.txt
@@ -0,0 +1,6 @@
+add_executable(s3select_test s3select_test.cpp)
+target_include_directories(s3select_test PUBLIC ../include)
+target_link_libraries(s3select_test gtest gtest_main boost_date_time)
+
+
+gtest_discover_tests(s3select_test)
diff --git a/src/s3select/test/s3select_test.cpp b/src/s3select/test/s3select_test.cpp
new file mode 100644
index 000000000..5b73845e3
--- /dev/null
+++ b/src/s3select/test/s3select_test.cpp
@@ -0,0 +1,244 @@
+#include "s3select.h"
+#include "gtest/gtest.h"
+#include <string>
+#include "boost/date_time/gregorian/gregorian.hpp"
+#include "boost/date_time/posix_time/posix_time.hpp"
+
+using namespace s3selectEngine;
+
+std::string run_expression_in_C_prog(const char* expression)
+{
+//purpose: per use-case a c-file is generated, compiles , and finally executed.
+
+// side note: its possible to do the following: cat test_hello.c | gcc -pipe -x c - -o /dev/stdout > ./1
+// gcc can read and write from/to pipe (use pipe2()) i.e. not using file-system , BUT should also run gcc-output from memory
+
+ const int C_FILE_SIZE=(1024*1024);
+ std::string c_test_file = std::string("/tmp/test_s3.c");
+ std::string c_run_file = std::string("/tmp/s3test");
+
+ FILE* fp_c_file = fopen(c_test_file.c_str(), "w");
+
+ //contain return result
+ char result_buff[100];
+
+ char* prog_c;
+
+ if(fp_c_file)
+ {
+ prog_c = (char*)malloc(C_FILE_SIZE);
+
+ size_t sz=sprintf(prog_c, "#include <stdio.h>\n \
+ int main() \
+ {\
+ printf(\"%%f\\n\",(double)(%s));\
+ } ", expression);
+
+ int status = fwrite(prog_c, 1, sz, fp_c_file);
+ fclose(fp_c_file);
+ }
+
+ std::string gcc_and_run_cmd = std::string("gcc ") + c_test_file + " -o " + c_run_file + " -Wall && " + c_run_file;
+
+ FILE* fp_build = popen(gcc_and_run_cmd.c_str(), "r"); //TODO read stderr from pipe
+
+ if(!fp_build)
+ {
+ return std::string("#ERROR#");
+ }
+
+ fgets(result_buff, sizeof(result_buff), fp_build);
+
+ unlink(c_run_file.c_str());
+ unlink(c_test_file.c_str());
+
+ return std::string(result_buff);
+}
+
+#define OPER oper[ rand() % oper.size() ]
+
+class gen_expr
+{
+
+private:
+
+ int open = 0;
+ std::string oper= {"+-+*/*"};
+
+ std::string gexpr()
+ {
+ return std::to_string(rand() % 1000) + ".0" + OPER + std::to_string(rand() % 1000) + ".0";
+ }
+
+ std::string g_openp()
+ {
+ if ((rand() % 3) == 0)
+ {
+ open++;
+ return std::string("(");
+ }
+ return std::string("");
+ }
+
+ std::string g_closep()
+ {
+ if ((rand() % 2) == 0 && open > 0)
+ {
+ open--;
+ return std::string(")");
+ }
+ return std::string("");
+ }
+
+public:
+
+ std::string generate()
+ {
+ std::string exp = "";
+ open = 0;
+
+ for (int i = 0; i < 10; i++)
+ {
+ exp = (exp.size() > 0 ? exp + OPER : std::string("")) + g_openp() + gexpr() + OPER + gexpr() + g_closep();
+ }
+
+ if (open)
+ for (; open--;)
+ {
+ exp += ")";
+ }
+
+ return exp;
+ }
+};
+
+std::string run_s3select(std::string expression)
+{
+ s3select s3select_syntax;
+
+ s3select_syntax.parse_query(expression.c_str());
+
+ std::string s3select_result;
+ s3selectEngine::csv_object s3_csv_object(&s3select_syntax);
+ std::string in = "1,1,1,1\n";
+
+ s3_csv_object.run_s3select_on_object(s3select_result, in.c_str(), in.size(), false, false, true);
+
+ s3select_result = s3select_result.substr(0, s3select_result.find_first_of(","));
+
+ return s3select_result;
+}
+
+TEST(TestS3SElect, s3select_vs_C)
+{
+//purpose: validate correct processing of arithmetical expression, it is done by running the same expression
+// in C program.
+// the test validate that syntax and execution-tree (including precedence rules) are done correctly
+
+ for(int y=0; y<10; y++)
+ {
+ gen_expr g;
+ std::string exp = g.generate();
+ std::string c_result = run_expression_in_C_prog( exp.c_str() );
+
+ char* err=0;
+ double c_dbl_res = strtod(c_result.c_str(), &err);
+
+ std::string input_query = "select " + exp + " from stdin;" ;
+ std::string s3select_res = run_s3select(input_query);
+
+ double s3select_dbl_res = strtod(s3select_res.c_str(), &err);
+
+ //std::cout << exp << " " << s3select_dbl_res << " " << s3select_res << " " << c_dbl_res/s3select_dbl_res << std::endl;
+ //std::cout << exp << std::endl;
+
+ ASSERT_EQ(c_dbl_res, s3select_dbl_res);
+ }
+}
+
+TEST(TestS3SElect, ParseQuery)
+{
+ //TODO syntax issues ?
+ //TODO error messeges ?
+
+ s3select s3select_syntax;
+
+ run_s3select(std::string("select (1+1) from stdin;"));
+
+ ASSERT_EQ(0, 0);
+}
+
+TEST(TestS3SElect, int_compare_operator)
+{
+ value a10(10), b11(11), c10(10);
+
+ ASSERT_EQ( a10 < b11, true );
+ ASSERT_EQ( a10 > b11, false );
+ ASSERT_EQ( a10 >= c10, true );
+ ASSERT_EQ( a10 <= c10, true );
+ ASSERT_EQ( a10 != b11, true );
+ ASSERT_EQ( a10 == b11, false );
+ ASSERT_EQ( a10 == c10, true );
+}
+
+TEST(TestS3SElect, float_compare_operator)
+{
+ value a10(10.1), b11(11.2), c10(10.1);
+
+ ASSERT_EQ( a10 < b11, true );
+ ASSERT_EQ( a10 > b11, false );
+ ASSERT_EQ( a10 >= c10, true );
+ ASSERT_EQ( a10 <= c10, true );
+ ASSERT_EQ( a10 != b11, true );
+ ASSERT_EQ( a10 == b11, false );
+ ASSERT_EQ( a10 == c10, true );
+
+}
+
+TEST(TestS3SElect, string_compare_operator)
+{
+ value s1("abc"), s2("def"), s3("abc");
+
+ ASSERT_EQ( s1 < s2, true );
+ ASSERT_EQ( s1 > s2, false );
+ ASSERT_EQ( s1 <= s3, true );
+ ASSERT_EQ( s1 >= s3, true );
+ ASSERT_EQ( s1 != s2, true );
+ ASSERT_EQ( s1 == s3, true );
+ ASSERT_EQ( s1 == s2, false );
+}
+
+TEST(TestS3SElect, arithmetic_operator)
+{
+ value a(1), b(2), c(3), d(4);
+
+ ASSERT_EQ( (a+b).i64(), 3 );
+
+ ASSERT_EQ( (value(0)-value(2)*value(4)).i64(), -8 );
+ ASSERT_EQ( (value(1.23)-value(0.1)*value(2)).dbl(), 1.03 );
+
+ a=int64_t(1); //a+b modify a
+ ASSERT_EQ( ( (a+b) * (c+d) ).i64(), 21 );
+}
+
+TEST(TestS3SElect, timestamp_function)
+{
+ // TODO: support formats listed here:
+ // https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-glacier-select-sql-reference-date.html#s3-glacier-select-sql-reference-to-timestamp
+ const std::string timestamp = "2007-02-23:14:33:01";
+ // TODO: out_simestamp should be the same as timestamp
+ const std::string out_timestamp = "2007-Feb-23 14:33:01";
+ const std::string input_query = "select timestamp(\"" + timestamp + "\") from stdin;" ;
+ std::string s3select_res = run_s3select(input_query);
+ ASSERT_EQ(s3select_res, out_timestamp);
+}
+
+TEST(TestS3SElect, utcnow_function)
+{
+ const boost::posix_time::ptime now(boost::posix_time::second_clock::universal_time());
+ const std::string input_query = "select utcnow() from stdin;" ;
+ auto s3select_res = run_s3select(input_query);
+ const boost::posix_time::ptime res_now;
+ ASSERT_EQ(s3select_res, boost::posix_time::to_simple_string(now));
+}
+