From 389020e14594e4894e28d1eb9103c210b142509e Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 23 May 2024 18:45:13 +0200 Subject: Adding upstream version 18.2.3. Signed-off-by: Daniel Baumann --- src/s3select/.github/workflows/cmake.yml | 4 +- .../TPCDS/generate_upload_and_remove_infra.bash | 170 ++++++++++ src/s3select/TPCDS/run_tpcds.bash | 31 ++ src/s3select/TPCDS/tpcds_functions.bash | 17 + src/s3select/example/s3select_example.cpp | 64 +++- src/s3select/include/s3select.h | 271 +++++++++++++--- src/s3select/include/s3select_csv_parser.h | 67 +++- src/s3select/include/s3select_functions.h | 110 +++++-- src/s3select/include/s3select_json_parser.h | 1 - src/s3select/include/s3select_oper.h | 35 +- src/s3select/test/s3select_test.cpp | 356 ++++++++++++++++++++- src/s3select/test/s3select_test.h | 98 +++--- 12 files changed, 1092 insertions(+), 132 deletions(-) create mode 100755 src/s3select/TPCDS/generate_upload_and_remove_infra.bash create mode 100644 src/s3select/TPCDS/run_tpcds.bash (limited to 'src/s3select') diff --git a/src/s3select/.github/workflows/cmake.yml b/src/s3select/.github/workflows/cmake.yml index 577eb19cc..f8a836a90 100644 --- a/src/s3select/.github/workflows/cmake.yml +++ b/src/s3select/.github/workflows/cmake.yml @@ -34,10 +34,10 @@ jobs: sudo curl -L https://dist.apache.org/repos/dist/dev/arrow/KEYS | sudo apt-key add - sudo add-apt-repository "deb [arch=amd64] https://apache.jfrog.io/artifactory/arrow/ubuntu focal main" sudo apt-get update - sudo apt-get install -y -V libarrow-dev + sudo apt-get install -y -V libarrow-dev=10.0.1-1 - name: install parquet - run: sudo apt-get install -y -V libparquet-dev + run: sudo apt-get install -y -V libparquet-dev=10.0.1-1 - name: install-gtest diff --git a/src/s3select/TPCDS/generate_upload_and_remove_infra.bash b/src/s3select/TPCDS/generate_upload_and_remove_infra.bash new file mode 100755 index 000000000..2b23dde59 --- /dev/null +++ b/src/s3select/TPCDS/generate_upload_and_remove_infra.bash @@ -0,0 +1,170 @@ +#!/bin/bash + +## this script resides in [galsl/fedora_38:tpcds_v2] docker container. +## the container uses the following repo [ https://github.com/galsalomon66/tpc-ds-datagen-to-aws-s3 ] for the dsdgen application. +## the purpose of this script it to launch multiple instances of the dsdgen-application(depends on number of cores) +## the flow splits between the very-big-tables and the small tables. +## the num_of_cpu defines the size of parallelism, the num_of_partitions defines the amount chunks that combines togather a single table (it could be huge). +## each cycle of parallel generate-application, ended with flow that uploads the generated files into S3(its done in parallel), upon all files are uploaded +## it removes all files, i.e. for 3TB scale there is no need for 3TB of disk-space (as for S3-storage capacity it obvious ...) + + +## TODO set by te user +TPCDS_DIR=/tpcds_output/ + + +all_tables="call_center +catalog_page +customer_address +customer +customer_demographics +date_dim +household_demographics +income_band +item +promotion +reason +ship_mode +store +time_dim +warehouse +web_page +web_site +catalog_returns +catalog_sales +web_returns +web_sales +store_returns +store_sales" + +#big tables and also parent +#parent table means it got a child table, i.e. there is a relation between them. +parent_tables="store_sales catalog_sales web_sales inventory" + +## not a parent table +standalone_tables="call_center catalog_page customer_address customer customer_demographics date_dim household_demographics income_band +item promotion reason ship_mode store time_dim warehouse web_page web_site" +#small_tables="" + +num_of_cpu=56 +num_of_partitions=0 + +create_dsdgen_workers_non_parent_tables() +{ + +[ ! -d ${TPCDS_DIR} ] && echo ${TPCDS_DIR} not exist && exit +num_of_partitions=$(echo 1 | awk -v sc=${SCALE} -v c=${num_of_cpu} '{print int((sc/1000)*c);}') +if [ $num_of_partitions -le 1 ] +then + num_of_partitions=2 +fi + +echo "small tables="num_of_partitions=${num_of_partitions} + +((i=1)) + +for t in ${standalone_tables} +do + for c in $(seq 1 ${num_of_partitions}) + do + ## the command line defines which table, what scale(size), paratition size, what partition to produce and where to produce it. + echo "time ./dsdgen -dir ${TPCDS_DIR} -table ${t} -scale ${SCALE} -force -parallel ${num_of_partitions} -child ${c} &" >> generate_upload_and_remove_exec.bash + ## number of CPU + if [ $(( i++ % ${num_of_cpu} )) -eq 0 ] + then + echo wait >> generate_upload_and_remove_exec.bash + # upon complete with wait, loop on generated dat files, upload each in parallel, each upload is done, remove file + # upload && remove + # + echo upload_and_remove_worker_func >> generate_upload_and_remove_exec.bash + fi + done +done +echo wait >> generate_upload_and_remove_exec.bash +echo upload_and_remove_worker_func >> generate_upload_and_remove_exec.bash +echo "echo small tables done." >> generate_upload_and_remove_exec.bash + +chmod +x generate_upload_and_remove_exec.bash +} + +create_dsdgen_workers() +{ + +[ ! -d ${TPCDS_DIR} ] && echo ${TPCDS_DIR} not exist && exit +num_of_partitions=$(echo 1 | awk -v sc=${SCALE} -v c=${num_of_cpu} '{print int((sc/10)*c);}') +echo "big tables="num_of_partitions=${num_of_partitions} +if [ $num_of_partitions -le 1 ] +then + num_of_partitions=2 +fi + +((i=1)) +touch generate_upload_and_remove_exec.bash +rm -f generate_upload_and_remove_exec.bash + +echo "#!/bin/bash" >> generate_upload_and_remove_exec.bash +## upload_and_remove_func.bash include functions for upload and remove +echo ". generate_upload_and_remove_infra.bash" >> generate_upload_and_remove_exec.bash +echo "cd /tpc-ds-datagen-to-aws-s3/tpc-ds/v2.11.0rc2/tools" >> generate_upload_and_remove_exec.bash + +for t in ${parent_tables} +do + for c in $(seq 1 ${num_of_partitions}) + do + echo "time ./dsdgen -dir ${TPCDS_DIR} -table ${t} -scale ${SCALE} -force -parallel ${num_of_partitions} -child ${c} &" >> generate_upload_and_remove_exec.bash + ## number of CPU + if [ $(( i++ % ${num_of_cpu} )) -eq 0 ] + then + echo wait >> generate_upload_and_remove_exec.bash + # upon complete with wait, loop on generated dat files, upload each in parallel, each upload is done, remove file + # upload && remove + # + echo upload_and_remove_worker_func >> generate_upload_and_remove_exec.bash + fi + done +done +echo wait >> generate_upload_and_remove_exec.bash +echo upload_and_remove_worker_func >> generate_upload_and_remove_exec.bash +echo "echo big tables done." >> generate_upload_and_remove_exec.bash + +## adding the production of the other tables +create_dsdgen_workers_non_parent_tables + +chmod +x generate_upload_and_remove_exec.bash + +## the generated script bellow contains all is needed for creating TPCDS tables in S3-storage. +## should execute by the user +#./generate_upload_and_remove_exec.bash + +} + +upload_and_remove_worker_func() +{ +# create list of tasks to run in background, remove each uploaded file upon completion +(i=0) +touch upload_and_remove_exec.bash +rm -f upload_and_remove_exec.bash + +echo "#!/bin/bash" >> upload_and_remove_exec.bash + +for f in $(ls ${TPCDS_DIR}/*.dat) +do + #echo $f + table_name=$(basename $f | sed 's/_[0-9]\+_[0-9]\+/ /' | awk '{print $1;}') + echo "(aws s3api put-object --bucket hive --key scale_${SCALE}/${table_name}/$(basename $f) --body ${f} --endpoint-url ${S3_ENDPOINT} > /dev/null 2>&1 && echo upload ${f} && rm -f ${f}) &" >> upload_and_remove_exec.bash + if [ $(( i++ % ${num_of_cpu} )) -eq 0 ] + then + echo wait >> upload_and_remove_exec.bash + fi +done + +echo wait >> upload_and_remove_exec.bash +#upload and remove all generated files +chmod +x upload_and_remove_exec.bash +cp upload_and_remove_exec.bash upload_and_remove.bash_${RANDOM} ## debug + +## start upload and remove in parallel +./upload_and_remove_exec.bash + +} + diff --git a/src/s3select/TPCDS/run_tpcds.bash b/src/s3select/TPCDS/run_tpcds.bash new file mode 100644 index 000000000..f1c3eecf9 --- /dev/null +++ b/src/s3select/TPCDS/run_tpcds.bash @@ -0,0 +1,31 @@ +#!/bin/bash + +## this script is the entry-point of tpcds-data-generation +## the first and the only argument is the SCALE factor + +[ $# -lt 1 ] && echo "type a single number for the scale (2 --> 3000)" && exit + +re='^[0-9]+$' +[[ ! $1 =~ $re ]] && echo "SCALE should be a number" && exit + +## the following code lines accepts env-variables for the S3 system +[ -z ${S3_ENDPOINT} ] && echo "missing env-variable S3_ENDPOINT" && exit +[ -z ${S3_ACCESS_KEY} ] && echo missing env-variable S3_ACCESS_KEY && exit +[ -z ${S3_SECRET_KEY} ] && echo missing env-variable S3_SECRET_KEY && exit + +## updating AWS credentials +cat ~/.aws/credentials | \ + awk -v acc=${S3_ACCESS_KEY} '{if($0 ~ /aws_access_key_id/){print "aws_access_key_id = ",acc;} else{print $0;}}' | \ + awk -v acc=${S3_SECRET_KEY} '{if($0 ~ /aws_secret_access_key/){print "aws_secret_access_key = ",acc;} else{print $0;}}' > /tmp/credentials + +cat /tmp/credentials > ~/.aws/credentials + +export SCALE=$1 + +. ./generate_upload_and_remove_infra.bash + +## create generate_upload_and_remove_exec.bash +create_dsdgen_workers +## running tpcds data generator script +time /generate_upload_and_remove_exec.bash + diff --git a/src/s3select/TPCDS/tpcds_functions.bash b/src/s3select/TPCDS/tpcds_functions.bash index 67a64ff0b..4a3580418 100644 --- a/src/s3select/TPCDS/tpcds_functions.bash +++ b/src/s3select/TPCDS/tpcds_functions.bash @@ -18,6 +18,23 @@ start_tpcds;' } +run_tpcds_v2() +{ +## the following command is using a different container +## this container is using a C version for dsdgen(the tpcds data generation application) +## the container also contains a bash script to run a mulitple instances of dsdgen to accelarate the total throughput + +## the SCALE factor is first and only argument for /run_tpcds.bash script. +## it need to set the following env-variables S3_ENDPOINT,S3_ACCESS_KEY,S3_SECRET_KEY +## and to mount the container directory /tpcds_output to the host directory, this directory is for the dsdgen application +## the generated data is stored there, and later uploaded into S3 storage, and removed. + + +sudo docker run -v /tmp/tpcds_output/:/tpcds_output/ --env S3_ENDPOINT=http://172.17.0.1:8000 --env S3_ACCESS_KEY=b2345678901234567890 \ +--env S3_SECRET_KEY=b234567890123456789012345678901234567890 -it galsl/fedora_38:tpcds_v2 bash -c '/run_tpcds.bash 2' + +} + move_from_tpcds_bucket_to_hive_bucket() { ## for the case it needs to move into different bucket(where trino is point at) diff --git a/src/s3select/example/s3select_example.cpp b/src/s3select/example/s3select_example.cpp index 71aff3d01..3a4b20dc1 100644 --- a/src/s3select/example/s3select_example.cpp +++ b/src/s3select/example/s3select_example.cpp @@ -11,6 +11,9 @@ using namespace s3selectEngine; using namespace BOOST_SPIRIT_CLASSIC_NS; +std::string output_format{}; +std::string header_info{}; + class awsCli_handler { @@ -154,7 +157,7 @@ public: //std::string get_error_description(){} - std::string get_result() + std::string& get_result() { return m_result; } @@ -196,14 +199,20 @@ public: { csv.ignore_header_info = true; } - else if (m_header_info.compare("USE") == 0) + else if (header_info.compare("USE") == 0) { csv.use_header_info = true; } - m_s3_csv_object = std::unique_ptr(new s3selectEngine::csv_object(s3select_syntax.get(), csv)); + if(output_format.compare("JSON") == 0) { + csv.output_json_format = true; + } + + m_s3_csv_object = std::unique_ptr(new s3selectEngine::csv_object()); + m_s3_csv_object->set_csv_query(s3select_syntax.get(), csv); } + if (s3select_syntax->get_error_description().empty() == false) { header_size = create_header_records(m_buff_header.get()); @@ -363,6 +372,8 @@ int process_json_query(const char* input_query,const char* fname) {//purpose: process json query s3select s3select_syntax; + s3selectEngine::json_object m_s3_json_object; + json_object::csv_definitions json; int status = s3select_syntax.parse_query(input_query); if (status != 0) { @@ -370,6 +381,10 @@ int process_json_query(const char* input_query,const char* fname) return -1; } + if(output_format.compare("JSON") == 0) { + json.output_json_format = true; + } + std::ifstream input_file_stream; try { input_file_stream = std::ifstream(fname, std::ios::in | std::ios::binary); @@ -381,7 +396,7 @@ int process_json_query(const char* input_query,const char* fname) } auto object_sz = boost::filesystem::file_size(fname); - json_object json_query_processor(&s3select_syntax); + m_s3_json_object.set_json_query(&s3select_syntax, json); std::string buff(BUFFER_SIZE,0); std::string result; @@ -393,10 +408,11 @@ int process_json_query(const char* input_query,const char* fname) { bytes_read += read_sz; std::cout << "read next chunk " << chunk_count++ << ":" << read_sz << ":" << bytes_read << "\r"; + result.clear(); try{ - status = json_query_processor.run_s3select_on_stream(result, buff.data(), read_sz, object_sz); + status = m_s3_json_object.run_s3select_on_stream(result, buff.data(), read_sz, object_sz, json.output_json_format); } catch (base_s3select_exception &e) { std::cout << e.what() << std::endl; @@ -416,7 +432,7 @@ int process_json_query(const char* input_query,const char* fname) std::cout << "failure upon processing " << std::endl; return -1; } - if(json_query_processor.is_sql_limit_reached()) + if(m_s3_json_object.is_sql_limit_reached()) { std::cout << "json processing reached limit " << std::endl; break; @@ -425,7 +441,7 @@ int process_json_query(const char* input_query,const char* fname) } try{ result.clear(); - json_query_processor.run_s3select_on_stream(result, 0, 0, object_sz); + m_s3_json_object.run_s3select_on_stream(result, 0, 0, object_sz, json.output_json_format); } catch (base_s3select_exception &e) { std::cout << e.what() << std::endl; @@ -499,6 +515,14 @@ int run_on_localFile(char* input_query) csv.use_header_info = false; csv.quote_fields_always=false; + if(output_format.compare("JSON") == 0) { + csv.output_json_format = true; + } + + if(header_info.compare("USE") == 0) { + csv.use_header_info = true; + } + #define CSV_QUOT "CSV_ALWAYS_QUOT" #define CSV_COL_DELIM "CSV_COLUMN_DELIMETER" #define CSV_ROW_DELIM "CSV_ROW_DELIMITER" @@ -521,7 +545,8 @@ int run_on_localFile(char* input_query) csv.use_header_info = true; } - s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv); + s3selectEngine::csv_object s3_csv_object; + s3_csv_object.set_csv_query(&s3select_syntax, csv); std::function fp_debug = [](const char* msg) { @@ -585,6 +610,14 @@ int run_on_localFile(char* input_query) return 0; } +std::string get_ranged_string(std::string& inp) +{ + size_t startPos = inp.find(""); + size_t endPos = inp.find(""); + + return inp.substr(startPos,endPos-startPos); +} + int run_on_single_query(const char* fname, const char* query) { @@ -632,11 +665,12 @@ int run_on_single_query(const char* fname, const char* query) if(status<0) { std::cout << "failure on execution " << std::endl; + std::cout << get_ranged_string( awscli->get_result() ) << std::endl; break; } else { - std::cout << awscli->get_result() << std::endl; + std::cout << get_ranged_string( awscli->get_result() ) << std::endl; } if(!read_sz || input_file_stream.eof()) @@ -662,12 +696,24 @@ int main(int argc,char **argv) continue; } + if (!strcmp(argv[i], "-output")) + {//object recieved as CLI parameter + output_format = argv[i + 1]; + continue; + } + if (!strcmp(argv[i], "-q")) { query = argv[i + 1]; continue; } + if (!strcmp(argv[i], "-HeaderInfo")) + { + header_info = argv[i + 1]; + continue; + } + if (!strcmp(argv[i], "-cmds")) {//query file contain many queries query_file = argv[i + 1]; diff --git a/src/s3select/include/s3select.h b/src/s3select/include/s3select.h index 3ac111351..667c92ba9 100644 --- a/src/s3select/include/s3select.h +++ b/src/s3select/include/s3select.h @@ -18,6 +18,7 @@ #include #include #include +#include #define _DEBUG_TERM {string token(a,b);std::cout << __FUNCTION__ << token << std::endl;} @@ -1293,11 +1294,11 @@ void push_logical_operator::builder(s3select* self, const char* a, const char* b std::string token(a, b); logical_operand::oplog_t l = logical_operand::oplog_t::NA; - if (token == "and") + if (boost::iequals(token,"and")) { l = logical_operand::oplog_t::AND; } - else if (token == "or") + else if (boost::iequals(token,"or")) { l = logical_operand::oplog_t::OR; } @@ -1633,8 +1634,10 @@ void push_like_predicate_escape::builder(s3select* self, const char* a, const ch void push_is_null_predicate::builder(s3select* self, const char* a, const char* b) const { - //expression is null, is not null + //expression could be is null OR is not null std::string token(a, b); + //to_lower enable case insensitive + boost::algorithm::to_lower(token); bool is_null = true; for(size_t i=0;i comment_chars; std::vector trim_chars; + std::string schema; - s3select_csv_definitions():row_delimiter('\n'), column_delimiter(','), output_row_delimiter('\n'), output_column_delimiter(','), escape_char('\\'), output_escape_char('\\'), output_quot_char('"'), quot_char('"'), use_header_info(false), ignore_header_info(false), quote_fields_always(false), quote_fields_asneeded(false), redundant_column(false), comment_empty_lines(false) {} + s3select_csv_definitions():row_delimiter('\n'), column_delimiter(','), output_row_delimiter('\n'), output_column_delimiter(','), escape_char('\\'), output_escape_char('\\'), output_quot_char('"'), quot_char('"'), use_header_info(false), ignore_header_info(false), quote_fields_always(false), quote_fields_asneeded(false), redundant_column(false), comment_empty_lines(false), output_json_format(false) {} }; @@ -2172,6 +2177,8 @@ protected: scratch_area* m_sa; std::string m_obj_name; bool m_aggr_flow = false; //TODO once per query + bool is_star = false; + bool is_json = false; bool m_is_to_aggregate; std::vector m_projections; base_statement* m_where_clause; @@ -2182,9 +2189,11 @@ protected: unsigned long m_processed_rows; size_t m_returned_bytes_size; std::function fp_ext_debug_mesg;//dispache debug message into external system + std::vector m_projection_keys{}; public: s3select_csv_definitions m_csv_defintion;//TODO add method for modify + std::string m_error_description; enum class Status { END_OF_STREAM, @@ -2196,6 +2205,16 @@ public: Status m_sql_processing_status; + void set_processing_time_error() + { + m_sql_processing_status = Status::SQL_ERROR; + } + + bool is_processing_time_error() + { + return m_sql_processing_status == Status::SQL_ERROR; + } + Status get_sql_processing_status() { return m_sql_processing_status; @@ -2206,6 +2225,60 @@ public: return m_sql_processing_status == Status::LIMIT_REACHED; } + void set_star_true() { + is_star = true; + } + + void set_projection_keys(std::vector m_projections) + { + std::vector alias_values{}; + std::unordered_set alias_projection_keys{}; + bool is_output_json_format = m_csv_defintion.output_json_format; + + for (auto& a : *m_s3_select->get_aliases()->get()) + { + alias_values.push_back(a.first); + alias_projection_keys.insert(a.second); + } + + size_t m_alias_index = 0; + int index_json_projection = 0; + is_json = m_s3_select->is_json_query(); + + for (auto& p : m_projections) + { + if(p->is_statement_contain_star_operation()) + { + set_star_true(); + } + p->traverse_and_apply(m_sa, m_s3_select->get_aliases(), m_s3_select->is_json_query()); + + std::string key_from_projection{}; + if(p->is_column()){ + key_from_projection = p->get_key_from_projection(); + } + + if(alias_projection_keys.count(p) == 0 && p->is_column()) { + m_projection_keys.push_back(key_from_projection); + } else if(alias_projection_keys.count(p) > 0 && p->is_column()) { + m_projection_keys.push_back(alias_values[m_alias_index++]); + } else if(!p->is_column() && is_output_json_format && alias_projection_keys.count(p) > 0 ) { + m_projection_keys.push_back(alias_values[m_alias_index++]); + } else if(!p->is_column() && is_output_json_format && alias_projection_keys.count(p) == 0) { + std::string index_json = "_" + std::to_string(++index_json_projection); + m_projection_keys.push_back(index_json); + } + } + + if(m_s3_select->is_json_query()) { + for(auto& k: m_projection_keys) { + size_t lastDotPosition = k.find_last_of('.'); + std::string extractedPart = k.substr(lastDotPosition + 1); + k = extractedPart; + } + } + } + void set_base_defintions(s3select* m) { if(m_s3_select || !m) @@ -2225,10 +2298,8 @@ public: m_where_clause->traverse_and_apply(m_sa, m_s3_select->get_aliases(), m_s3_select->is_json_query()); } - for (auto& p : m_projections) - { - p->traverse_and_apply(m_sa, m_s3_select->get_aliases(), m_s3_select->is_json_query()); - } + set_projection_keys(m_projections); + m_is_to_aggregate = true;//TODO not correct. should be set upon end-of-stream m_aggr_flow = m_s3_select->is_aggregate_query(); @@ -2269,45 +2340,95 @@ public: return m_returned_bytes_size; } - void result_values_to_string(multi_values& projections_resuls, std::string& result) + void json_result_format(multi_values& projections_results, std::string& result, std::string& output_delimiter) { - size_t i = 0; + result += "{"; + int j = 0; + for (size_t i = 0; i < projections_results.values.size(); ++i) + { + auto& res = projections_results.values[i]; + std::string label = "_"; + label += std::to_string(i + 1); + + if (i > 0) { + result += output_delimiter; + } + + if(!is_star) { + result += "\"" + m_projection_keys[j] + "\":"; + } else if(is_star && !is_json) { + result += "\"" + label + "\":"; + } + + result.append(res->to_string()); + m_returned_bytes_size += strlen(res->to_string()); + ++j; + } + result += "}"; + + } + + + void result_values_to_string(multi_values& projections_resuls, std::string& result) +{ std::string output_delimiter(1,m_csv_defintion.output_column_delimiter); std::string output_row_delimiter(1,m_csv_defintion.output_row_delimiter); + if(m_csv_defintion.output_json_format && projections_resuls.values.size()) { + json_result_format(projections_resuls, result, output_delimiter); + result.append(output_row_delimiter); + return; + } + + size_t i = 0; for(auto& res : projections_resuls.values) { + + std::string column_result; + + try{ + column_result = res->to_string(); + } + catch(std::exception& e) + { + column_result = "{failed to compute projection: " + std::string(e.what()) + "}"; + m_error_description = column_result; + set_processing_time_error(); + } + + if(fp_ext_debug_mesg) - fp_ext_debug_mesg( res->to_string() ); + fp_ext_debug_mesg(column_result.data()); if (m_csv_defintion.quote_fields_always) { std::ostringstream quoted_result; - quoted_result << std::quoted(res->to_string(),m_csv_defintion.output_quot_char, m_csv_defintion.escape_char); + quoted_result << std::quoted(column_result,m_csv_defintion.output_quot_char, m_csv_defintion.escape_char); result.append(quoted_result.str()); + m_returned_bytes_size += quoted_result.str().size(); - }//TODO to add asneeded + }//TODO to add asneeded else { - result.append(res->to_string()); - m_returned_bytes_size += strlen(res->to_string()); + result.append(column_result); + m_returned_bytes_size += column_result.size(); + } - if(!m_csv_defintion.redundant_column) { - if(++i < projections_resuls.values.size()) { - result.append(output_delimiter); - m_returned_bytes_size += output_delimiter.size(); - } - } - else { - result.append(output_delimiter); - m_returned_bytes_size += output_delimiter.size(); - } - } - if(!m_aggr_flow){ - result.append(output_row_delimiter); + if(!m_csv_defintion.redundant_column) { + if(++i < projections_resuls.values.size()) { + result.append(output_delimiter); + m_returned_bytes_size += output_delimiter.size(); + } + } else { + result.append(output_delimiter); m_returned_bytes_size += output_delimiter.size(); + } } - } + if(!m_aggr_flow) { + result.append(output_row_delimiter); + m_returned_bytes_size += output_delimiter.size(); + } +} Status getMatchRow( std::string& result) { @@ -2336,7 +2457,7 @@ public: } result_values_to_string(projections_resuls,result); - return m_sql_processing_status = Status::END_OF_STREAM; + return is_processing_time_error() ? (m_sql_processing_status = Status::SQL_ERROR) : (m_sql_processing_status = Status::END_OF_STREAM); } m_processed_rows++; @@ -2368,9 +2489,9 @@ public: i->set_last_call(); i->set_skip_non_aggregate(false);//projection column is set to be runnable projections_resuls.push_value( &(i->eval()) ); - } + } result_values_to_string(projections_resuls,result); - return m_sql_processing_status = Status::LIMIT_REACHED; + return is_processing_time_error() ? (m_sql_processing_status = Status::SQL_ERROR) : (m_sql_processing_status = Status::LIMIT_REACHED); } } while (multiple_row_processing()); @@ -2394,6 +2515,7 @@ public: { a.second->invalidate_cache_result(); } + } while (multiple_row_processing() && m_where_clause && !(where_clause_result = m_where_clause->eval().is_true()) && !(m_is_limit_on && m_processed_rows == m_limit)); @@ -2421,12 +2543,18 @@ public: for (auto& i : m_projections) { projections_resuls.push_value( &(i->eval()) ); - } - result_values_to_string(projections_resuls,result); + } + result_values_to_string(projections_resuls,result); + if(m_sql_processing_status == Status::SQL_ERROR) + { + return m_sql_processing_status; + } } - } - return is_end_of_stream() ? (m_sql_processing_status = Status::END_OF_STREAM) : (m_sql_processing_status = Status::NORMAL_EXIT); + + return is_processing_time_error() ? (m_sql_processing_status = Status::SQL_ERROR) : + (is_end_of_stream() ? (m_sql_processing_status = Status::END_OF_STREAM) : (m_sql_processing_status = Status::NORMAL_EXIT)); + }//getMatchRow @@ -2477,14 +2605,12 @@ public: { //return; } - - set_base_defintions(s3_query); m_csv_defintion = csv; + set_base_defintions(s3_query); } private: bool m_skip_last_line; - std::string m_error_description; char* m_stream; char* m_end_stream; std::vector m_row_tokens; @@ -2604,26 +2730,36 @@ public: m_error_description = "escaped_char_missing failure while csv parsing"; return -1; } - catch(io::error::escaped_string_not_closed& err) + catch(io::error::escaped_string_not_closed& err) { m_error_description = "escaped_string_not_closed failure while csv parsing"; return -1; } - catch(io::error::line_length_limit_exceeded& err) + catch(io::error::line_length_limit_exceeded& err) { m_error_description = "line_length_limit_exceeded failure while csv parsing"; return -1; } - catch(io::error::with_file_name& err) + catch(io::error::missmatch_of_begin_end& err) { - m_error_description = "with_file_name failure while csv parsing"; + m_error_description = "missmatch_of_begin_end failure while csv parsing" + std::string(err.what()); return -1; } - catch(io::error::with_file_line& err) + catch(io::error::missmatch_end& err) { - m_error_description = "with_file_line failure while csv parsing"; + m_error_description = "missmatch_end failure while csv parsing" + std::string(err.what()); return -1; } + catch(io::error::with_file_name& err) + { + m_error_description = "with_file_name failure while csv parsing"; + return -1; + } + catch(std::exception& e) + { + m_error_description = "error while processing CSV object : " + std::string(e.what()); + return -1; + } return status; } @@ -2634,7 +2770,7 @@ private: //purpose: the CSV data is "streaming", it may "cut" rows in the middle, in that case the "broken-line" is stores //for later, upon next chunk of data is streaming, the stored-line is merge with current broken-line, and processed. std::string tmp_buff; - + int status = 0; m_processed_bytes += stream_length; m_skip_first_line = false; @@ -2648,6 +2784,22 @@ private: p_obj_chunk++; } + if(*p_obj_chunk != m_csv_defintion.row_delimiter) + {// previous row can not be completed with current chunk + if(fp_ext_debug_mesg) + { + std::string err_mesg = "** the stream chunk is too small for processing(saved for later) **"; + fp_ext_debug_mesg(err_mesg.c_str()); + } + //copy the part to be processed later + tmp_buff.assign((char*)csv_stream, (char*)csv_stream + (p_obj_chunk - csv_stream)); + //saved for later processing + m_last_line.append(tmp_buff); + m_previous_line = true;//it means to skip last line + //skip processing since the row tail is missing. + return 0; + } + tmp_buff.assign((char*)csv_stream, (char*)csv_stream + (p_obj_chunk - csv_stream)); merge_line = m_last_line + tmp_buff + m_csv_defintion.row_delimiter; m_previous_line = false; @@ -2655,7 +2807,7 @@ private: m_skip_x_first_bytes = tmp_buff.size()+1; //processing the merged row (previous broken row) - run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false); + status = run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false); } if (stream_length && csv_stream[stream_length - 1] != m_csv_defintion.row_delimiter) @@ -2676,7 +2828,8 @@ private: stream_length -= (m_last_line.length()); } - return run_s3select_on_object(result, csv_stream, stream_length, m_skip_first_line, m_previous_line, (m_processed_bytes >= obj_size)); + status = run_s3select_on_object(result, csv_stream, stream_length, m_skip_first_line, m_previous_line, (m_processed_bytes >= obj_size)); + return status; } public: @@ -2696,6 +2849,11 @@ public: m_skip_x_first_bytes=0; } + if(m_stream>m_end_stream) + { + throw base_s3select_exception(std::string("** m_stream > m_end_stream **") + + std::to_string( (m_stream - m_end_stream) ) ,base_s3select_exception::s3select_exp_en_t::FATAL); + } CSVParser _csv_parser("csv", m_stream, m_end_stream); csv_parser = &_csv_parser; csv_parser->set_csv_def( m_csv_defintion.row_delimiter, @@ -2745,6 +2903,10 @@ public: { break;//user should request for sql_processing_status } + if(m_sql_processing_status == Status::SQL_ERROR) + { + return -1; + } } while (true); @@ -2764,7 +2926,6 @@ class parquet_object : public base_s3object { private: - std::string m_error_description; parquet_file_parser* object_reader; parquet_file_parser::column_pos_t m_where_clause_columns; parquet_file_parser::column_pos_t m_projections_columns; @@ -2948,11 +3109,13 @@ private: std::string* m_s3select_result = nullptr; size_t m_row_count; bool star_operation_ind; - std::string m_error_description; bool m_init_json_processor_ind; public: + class csv_definitions : public s3select_csv_definitions + {}; + void init_json_processor(s3select* query) { if(m_init_json_processor_ind) @@ -2997,6 +3160,7 @@ public: if(p->is_statement_contain_star_operation()) { star_operation_ind=true; + set_star_true(); break; } } @@ -3095,7 +3259,7 @@ private: public: - int run_s3select_on_stream(std::string& result, const char* json_stream, size_t stream_length, size_t obj_size) + int run_s3select_on_stream(std::string& result, const char* json_stream, size_t stream_length, size_t obj_size, bool json_format = false) { int status=0; m_processed_bytes += stream_length; @@ -3134,8 +3298,9 @@ public: return status; } - void set_json_query(s3select* s3_query) + void set_json_query(s3select* s3_query, csv_definitions csv) { + m_csv_defintion = csv; set_base_defintions(s3_query); init_json_processor(s3_query); } diff --git a/src/s3select/include/s3select_csv_parser.h b/src/s3select/include/s3select_csv_parser.h index dab2e4efa..28e8117a6 100644 --- a/src/s3select/include/s3select_csv_parser.h +++ b/src/s3select/include/s3select_csv_parser.h @@ -13,6 +13,53 @@ namespace io{ , file_line, file_name); } }; + + struct missmatch_of_begin_end : + base, + with_file_name, + with_file_line{ + int begin=-1,end=-1; + void set_begin_end(int b,int e){ + begin=b; + end=e; + } + + void format_error_message()const override{ + std::snprintf(error_message_buffer, sizeof(error_message_buffer), + "***missmatch_of_begin_end*** Line number %d in file \"%s\" begin{%d} > end{%d}" + ,file_line, file_name,begin,end); + } + }; + + struct missmatch_end : + base, + with_file_name, + with_file_line{ + int end=-1; + int block_size=-1; + void set_end_block(int e,int b){ + end = e; + block_size = b; + } + void format_error_message()const override{ + std::snprintf(error_message_buffer, sizeof(error_message_buffer), + "***missmatch_end*** Line number %d in file \"%s\" end{%d} block{%d}" + ,file_line, file_name, end, block_size); + } + }; + + + struct line_is_null : + base, + with_file_name, + with_file_line{ + void format_error_message()const override{ + std::snprintf(error_message_buffer, sizeof(error_message_buffer), + "***line is NULL*** Line number %d in file \"%s\"" + ,file_line, file_name); + } + }; + } namespace detail{ @@ -133,7 +180,11 @@ namespace io{ void chop_next_column(char*&line, char*&col_begin, char*&col_end, char& col_delimiter, char& quote, char& escape_char) { - assert(line != nullptr); + if(line == NULL) + { + io::error::line_is_null err; + throw err; + } col_begin = line; // the col_begin + (... - col_begin) removes the constness @@ -312,8 +363,18 @@ class CSVParser ++file_line; - assert(data_begin < data_end); - assert(data_end <= block_len*2); + if(data_begin > data_end) + { + io::error::missmatch_of_begin_end err; + err.set_begin_end(data_begin,data_end); + throw err; + } + if(data_end > block_len*2) + { + io::error::missmatch_end err; + err.set_end_block(data_end,block_len*2); + throw err; + } if(data_begin >= block_len) { diff --git a/src/s3select/include/s3select_functions.h b/src/s3select/include/s3select_functions.h index 8c507fca1..4d88d772e 100644 --- a/src/s3select/include/s3select_functions.h +++ b/src/s3select/include/s3select_functions.h @@ -350,6 +350,7 @@ private: s3select_functions* m_s3select_functions; variable m_result; bool m_is_aggregate_function; + value eval_result; void _resolve_name() { @@ -437,14 +438,28 @@ public: {//all rows prior to last row if(m_skip_non_aggregate_op == false || is_aggregate() == true) { - (*m_func_impl)(&arguments, &m_result); + try { + (*m_func_impl)(&arguments, &m_result); + } + catch(std::exception& e) + { + std::string error_msg = "[" + m_func_impl->m_function_name + " failed : " + std::string(e.what()) + "]"; + throw base_s3select_exception(error_msg.data(), base_s3select_exception::s3select_exp_en_t::FATAL); + } } else if(m_skip_non_aggregate_op == true) { for(auto& p : arguments) {//evaluating the arguments (not the function itself, which is a non-aggregate function) //i.e. in the following use case substring( , sum(),count() ) ; only sum() and count() are evaluated. - p->eval(); + try { + p->eval(); + } + catch(std::exception& e) + { + std::string error_msg = m_func_impl->m_function_name + " failed : " + std::string(e.what()); + throw base_s3select_exception(error_msg.data(), base_s3select_exception::s3select_exp_en_t::FATAL); + } } } } @@ -452,9 +467,27 @@ public: {//on the last row, the aggregate function is finalized, //and non-aggregate function is evaluated with the result of aggregate function. if(is_aggregate()) - (*m_func_impl).get_aggregate_result(&m_result); + { + try{ + (*m_func_impl).get_aggregate_result(&m_result); + } + catch(std::exception& e) + { + std::string error_msg = m_func_impl->m_function_name + " failed : " + std::string(e.what()); + throw base_s3select_exception(error_msg.data(), base_s3select_exception::s3select_exp_en_t::FATAL); + } + } else - (*m_func_impl)(&arguments, &m_result); + { + try{ + (*m_func_impl)(&arguments, &m_result); + } + catch(std::exception& e) + { + std::string error_msg = m_func_impl->m_function_name + " failed : " + std::string(e.what()); + throw base_s3select_exception(error_msg.data(), base_s3select_exception::s3select_exp_en_t::FATAL); + } + } } return m_result.get_value(); @@ -736,6 +769,10 @@ struct _fn_to_int : public base_function var_result = static_cast(v.dbl()); break; + case value::value_En_t::S3NULL: + var_result.setnull(); + break; + default: var_result = v.i64(); break; @@ -782,6 +819,10 @@ struct _fn_to_float : public base_function var_result = v.dbl(); break; + case value::value_En_t::S3NULL: + var_result.setnull(); + break; + default: var_result = v.i64(); break; @@ -2017,6 +2058,11 @@ struct _fn_to_bool : public base_function { i = func_arg.i64(); } + else if (func_arg.type == value::value_En_t::S3NULL) + { + result->set_null(); + return true; + } else { i = 0; @@ -2035,12 +2081,14 @@ struct _fn_to_bool : public base_function struct _fn_trim : public base_function { + //TODO base function trim std::string input_string; value v_remove; value v_input; _fn_trim() { + //default character to remove is blank v_remove = " "; } @@ -2053,13 +2101,16 @@ struct _fn_trim : public base_function { base_statement* str = *iter; v_input = str->eval(); if(v_input.type != value::value_En_t::STRING) { - throw base_s3select_exception("content is not string"); + throw base_s3select_exception("content type is not a string"); } input_string = v_input.str(); if (args_size == 2) { iter++; base_statement* next = *iter; v_remove = next->eval(); + if(v_remove.type != value::value_En_t::STRING) { + throw base_s3select_exception("remove type is not a string"); + } } boost::trim_right_if(input_string,boost::is_any_of(v_remove.str())); boost::trim_left_if(input_string,boost::is_any_of(v_remove.str())); @@ -2069,13 +2120,13 @@ struct _fn_trim : public base_function { }; struct _fn_leading : public base_function { - std::string input_string; value v_remove; value v_input; _fn_leading() { + //default character to remove is blank v_remove = " "; } @@ -2088,13 +2139,16 @@ struct _fn_leading : public base_function { base_statement* str = *iter; v_input = str->eval(); if(v_input.type != value::value_En_t::STRING) { - throw base_s3select_exception("content is not string"); + throw base_s3select_exception("content type is not a string"); } input_string = v_input.str(); if (args_size == 2) { iter++; base_statement* next = *iter; v_remove = next->eval(); + if(v_remove.type != value::value_En_t::STRING) { + throw base_s3select_exception("remove type is not a string"); + } } boost::trim_left_if(input_string,boost::is_any_of(v_remove.str())); result->set_value(input_string.c_str()); @@ -2110,6 +2164,7 @@ struct _fn_trailing : public base_function { _fn_trailing() { + //default character to remove is blank v_remove = " "; } @@ -2122,13 +2177,16 @@ struct _fn_trailing : public base_function { base_statement* str = *iter; v_input = str->eval(); if(v_input.type != value::value_En_t::STRING) { - throw base_s3select_exception("content is not string"); + throw base_s3select_exception("content type is not a string"); } input_string = v_input.str(); if (args_size == 2) { iter++; base_statement* next = *iter; v_remove = next->eval(); + if(v_remove.type != value::value_En_t::STRING) { + throw base_s3select_exception("remove type is not a string"); + } } boost::trim_right_if(input_string,boost::is_any_of(v_remove.str())); result->set_value(input_string.c_str()); @@ -2183,7 +2241,8 @@ struct _fn_decimal_operator : public base_function { iter++; base_statement* expr_scale = *iter; value expr_scale_val = expr_scale->eval(); - + + //parser does the type checking precision = expr_precision_val.i64(); scale = expr_scale_val.i64(); @@ -2195,20 +2254,20 @@ struct _fn_decimal_operator : public base_function { struct _fn_engine_version : public base_function { - const char* version_description =R"(PR #137 : -the change handle the use cases where the JSON input starts with an anonymous array/object this may cause wrong search result per the user request(SQL statement) - -handle the use-case where the user requests a json-key-path that may point to a non-discrete value. i.e. array or an object. -editorial changes. - -fix for CSV flow, in the case of a "broken row" (upon processing stream of data) - -null results upon aggregation functions on an empty group (no match for where clause). + const char* version_description =R"( +-- trim operator: case insensitive #140 +-- add exception handling to avoid crashes, and produce informative messages instead #141 +-- case-insensitive in the case of is null or is not null predicates. #141 +-- a fix for missing check-type, which cause a crash(trim operator) #142 +-- cast null operations returned false instead of null. #143 +-- adding another way to generate TPCDS data, this method is faster and efficient, it launches multiple instances of data-generators and uses less disk space #145 +-- the scripts use the dsdgen application resides on https://github.com/galsalomon66/tpc-ds-datagen-to-aws-s3 +the whole system resides in a container [ docker pull galsl/fedora_38:tpcds_v2 ] #146 +-- upon logical_operand(and/or) the parser-builder does not use case-insensitive compare function, resulting in wrong evaluation #147 )"; - _fn_engine_version() - { + {//it means it will return a single result line, in case of multi-rows input object aggregate = true; } @@ -2535,6 +2594,17 @@ bool base_statement::is_column_reference() const return false; } +std::string base_statement::get_key_from_projection() +{ + variable* v_name = dynamic_cast(this); + + if(v_name) { + return v_name->get_name(); + } else { + throw base_s3select_exception("key not present"); + } +} + bool base_statement::is_nested_aggregate(bool &aggr_flow) const { if (is_aggregate()) diff --git a/src/s3select/include/s3select_json_parser.h b/src/s3select/include/s3select_json_parser.h index aa06163f5..4b10bd732 100644 --- a/src/s3select/include/s3select_json_parser.h +++ b/src/s3select/include/s3select_json_parser.h @@ -826,4 +826,3 @@ class JsonParserHandler : public rapidjson::BaseReaderHandler, #endif - diff --git a/src/s3select/include/s3select_oper.h b/src/s3select/include/s3select_oper.h index 89544fc1d..67bfec2a7 100644 --- a/src/s3select/include/s3select_oper.h +++ b/src/s3select/include/s3select_oper.h @@ -389,7 +389,20 @@ struct binop_modulo { throw base_s3select_exception("Mod zero is not allowed"); } else { - return a % b; + return a % b; + } + } +}; + +struct binop_float_modulo +{ + double operator()(double a, double b) + { + if (b == 0) + { + throw base_s3select_exception("Mod zero is not allowed"); + } else { + return fmod(a, b); } } }; @@ -1098,8 +1111,10 @@ public: value & operator%(const value &v) { - if(v.type == value_En_t::DECIMAL) { + if(v.type == value_En_t::DECIMAL && this->type == value_En_t::DECIMAL) { return compute(*this,v); + } else if(v.type == value_En_t::FLOAT || this->type == value_En_t::FLOAT) { + return compute(*this,v); } else { throw base_s3select_exception("wrong use of modulo operation!"); } @@ -1478,6 +1493,7 @@ public: const base_statement* get_aggregate() const; bool is_nested_aggregate(bool&) const; bool is_column_reference() const; + std::string get_key_from_projection(); bool mark_aggreagtion_subtree_to_execute(); bool is_statement_contain_star_operation() const; void push_for_cleanup(std::set&); @@ -1714,7 +1730,7 @@ public: virtual bool is_column() const //is reference to column. { - if(m_var_type == var_t::VARIABLE_NAME || m_var_type == var_t::POS || m_var_type == var_t::STAR_OPERATION) + if(m_var_type == var_t::VARIABLE_NAME || m_var_type == var_t::POS || m_var_type == var_t::STAR_OPERATION || m_var_type == var_t::JSON_VARIABLE) { return true; } @@ -1817,7 +1833,7 @@ public: } } - } + } if (m_projection_alias) { @@ -2375,6 +2391,17 @@ class base_date_diff : public base_function ptime1 += boost::posix_time::minutes(ts1_td.minutes() * -1); ptime2 = ts2_ptime + boost::posix_time::hours(ts2_td.hours() * -1); ptime2 += boost::posix_time::minutes(ts2_td.minutes() * -1); + + try{ + ptime1.date().year(); + ptime2.date().year(); + } + catch(std::exception& e) + { + std::string error_msg = "the input timestamp for the diff operation is not correct : " + std::string(e.what()); + throw base_s3select_exception(error_msg.data(),base_s3select_exception::s3select_exp_en_t::FATAL); + } + } }; diff --git a/src/s3select/test/s3select_test.cpp b/src/s3select/test/s3select_test.cpp index 7c372551c..915f14d26 100644 --- a/src/s3select/test/s3select_test.cpp +++ b/src/s3select/test/s3select_test.cpp @@ -1,5 +1,6 @@ #include "s3select_test.h" + TEST(TestS3SElect, s3select_vs_C) { //purpose: validate correct processing of arithmetical expression, it is done by running the same expression @@ -880,7 +881,10 @@ void test_single_column_single_row(const char* input_query,const char* expected_ { ASSERT_TRUE(false); } - ASSERT_EQ(s3_csv_object.get_error_description(),error_description); + if(s3_csv_object.get_error_description().find(error_description) == std::string::npos ) + { + FAIL() << "getting error: " << s3_csv_object.get_error_description() << " instead of: " << error_description << std::endl; + } return; } @@ -1730,6 +1734,7 @@ TEST(TestS3selectFunctions, boolcast) test_single_column_single_row("select cast(0 as bool) from s3object;","false\n"); test_single_column_single_row("select cast(true as bool) from s3object;","true\n"); test_single_column_single_row("select cast('a' as bool) from s3object;","false\n"); + test_single_column_single_row("select cast(null as bool) from s3object;","null\n"); } TEST(TestS3selectFunctions, floatcast) @@ -1737,6 +1742,7 @@ TEST(TestS3selectFunctions, floatcast) test_single_column_single_row("select cast('1234a' as float) from s3object;","#failure#","extra characters after the number"); test_single_column_single_row("select cast('a1234' as float) from s3object;","#failure#","text cannot be converted to a number"); test_single_column_single_row("select cast('999e+999' as float) from s3object;","#failure#","converted value would fall out of the range of the result type!"); + test_single_column_single_row("select cast(null as float) from s3object;","null\n"); } TEST(TestS3selectFunctions, intcast) @@ -1745,6 +1751,7 @@ TEST(TestS3selectFunctions, intcast) test_single_column_single_row("select cast('a1234' as int) from s3object;","#failure#","text cannot be converted to a number"); test_single_column_single_row("select cast('9223372036854775808' as int) from s3object;","#failure#","converted value would fall out of the range of the result type!"); test_single_column_single_row("select cast('-9223372036854775809' as int) from s3object;","#failure#","converted value would fall out of the range of the result type!"); + test_single_column_single_row("select cast(null as int) from s3object;","null\n"); } TEST(TestS3selectFunctions, predicate_as_projection_column) @@ -2064,6 +2071,12 @@ TEST(TestS3selectFunctions, mod) test_single_column_single_row( "select 5%2 from stdin;","1\n"); } +TEST(TestS3selectFunctions, modfloat) +{ +test_single_column_single_row( "select 5.2%2 from stdin;","1.2000000000000002\n"); +test_single_column_single_row( "select 5.2%2.5 from stdin;","0.20000000000000018\n"); +} + TEST(TestS3selectFunctions, modzero) { test_single_column_single_row( "select 0%2 from stdin;","0\n"); @@ -2124,6 +2137,13 @@ TEST(TestS3selectFunctions, isnullnot) test_single_column_single_row( "select \"true\" from stdin where not nullif(1,2) is null;" ,"true\n"); } +TEST(TestS3selectFunctions, case_insensitive_not_null) +{ +test_single_column_single_row( "select \"false\" from stdin where nullif(1,1) is NOT null;" ,""); +test_single_column_single_row( "select \"false\" from stdin where nullif(1,1) is not Null;" ,""); +test_single_column_single_row( "select \"true\" from stdin where nullif(1,1) is Null;" ,"true\n"); +} + TEST(TestS3selectFunctions, isnull1) { test_single_column_single_row( "select \"true\" from stdin where 7 + null is null;" ,"true\n"); @@ -2529,6 +2549,16 @@ TEST(TestS3selectFunctions, trim11) test_single_column_single_row( "select trim(trailing from trim(leading from \" foobar \")) from stdin ;" ,"foobar\n"); } +TEST(TestS3selectFunctions, trim12) +{ +test_single_column_single_row( "select trim(LEADING '1' from '111abcdef111') from s3object ;" ,"abcdef111\n"); +} + +TEST(TestS3selectFunctions, trim13) +{ +test_single_column_single_row( "select trim(TRAILING '1' from '111abcdef111') from s3object ;" ,"111abcdef\n"); +} + TEST(TestS3selectFunctions, likescape) { test_single_column_single_row("select \"true\" from stdin where \"abc_defgh\" like \"abc$_defgh\" escape \"$\";","true\n"); @@ -3444,3 +3474,327 @@ input_json_data = R"( } + + TEST(TestS3selectFunctions, json_queries_format) +{ + std::string result; + std::string expected_result; + std::string input_query; + +std::string input_json_data = R"( + {"root" : [ + {"c1": 891,"c2": 903,"c3": 78,"c4": 566,"c5": 134,"c6": 121,"c7": 203,"c8": 795,"c9": 82,"c10": 135}, + {"c1": 218,"c2": 881,"c3": 840,"c4": 385,"c5": 385,"c6": 674,"c7": 618,"c8": 99,"c9": 296,"c10": 545}, + {"c1": 218,"c2": 881,"c3": 840,"c4": 385,"c5": 385,"c6": 674,"c7": 618,"c8": 99,"c9": 296,"c10": 545} + ] + } + )"; + + expected_result=R"({"_1":1327} +)"; + input_query = "select sum(_1.c1) from s3object[*].root;"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + + expected_result=R"({"_1":1461} +)"; + input_query = "select sum(_1.c1) + min(_1.c5) from s3object[*].root;"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + + + expected_result=R"({"c1":891} +{"c1":218} +{"c1":218} +)"; + input_query = "select _1.c1 from s3object[*].root;"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + + expected_result=R"({"_1":218,"_2":903} +)"; + input_query = "select min(_1.c1), max(_1.c2) from s3object[*].root;"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + + expected_result=R"({"c1":891,"c2":903} +{"c1":218,"c2":881} +{"c1":218,"c2":881} +)"; + input_query = "select _1.c1, _1.c2 from s3object[*].root;"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + + expected_result=R"({"c2":903,"c1":891,"c4":566} +{"c2":881,"c1":218,"c4":385} +{"c2":881,"c1":218,"c4":385} +)"; + input_query = "select _1.c2, _1.c1, _1.c4 from s3object[*].root ;"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + + expected_result=R"({"_1":1794} +{"_1":1099} +{"_1":1099} +)"; + input_query = "select _1.c2 + _1.c1 from s3object[*].root ;"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + + expected_result=R"({"_1":891} +{"_1":218} +{"_1":218} +)"; + input_query = "select nullif(_1.c1, _1.c2) from s3object[*].root;"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + + expected_result=R"({"_1":991} +{"_1":318} +{"_1":318} +)"; + input_query = "select _1.c1 + 100 from s3object[*].root;"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + + expected_result=R"({"c13":null} +{"c13":null} +{"c13":null} +)"; + input_query = "select _1.c13 from s3object[*].root;"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + + expected_result=R"({"_1":null} +{"_1":null} +{"_1":null} +)"; + input_query = "select _1.c15 * 2 from s3object[*].root;"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + + expected_result=R"({"_1":null} +{"_1":null} +{"_1":null} +)"; + input_query = "select _1.c15 + _1.c13 from s3object[*].root;"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + + expected_result=R"({"x":891} +{"x":218} +{"x":218} +)"; + input_query = "select coalesce(_1.c1, 0) as x from s3object[*].root;"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + + expected_result=R"({"x":891,"c2":903} +{"x":218,"c2":881} +{"x":218,"c2":881} +)"; + input_query = "select _1.c1 as x, _1.c2 from s3object[*].root;"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + + expected_result=R"({"c2":903,"x":891} +{"c2":881,"x":218} +{"c2":881,"x":218} +)"; + input_query = "select _1.c2, _1.c1 as x from s3object[*].root;"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); +} + +TEST(TestS3selectFunctions, json_queries_format_1) +{ + std::string result; + std::string expected_result; + std::string input_query; + + std::string input_json_data = R"( +{ +"firstName": "Joe", +"lastName": "Jackson", +"gender": "male", +"age": "twenty", +"address": { +"streetAddress": "101", +"city": "San Diego", +"state": "CA" +}, +"phoneNumbers": [ +{ "type": "home1", "number": "7349282_1", "addr": 11}, +{ "type": "home2", "number": "7349282_2", "addr": 22}, +{ "type": "home3", "number": "734928_3", "addr": 33}, +{ "type": "home4", "number": "734928_4", "addr": 44}, +{ "type": "home5", "number": "734928_5", "addr": 55}, +{ "type": "home6", "number": "734928_6", "addr": 66}, +{ "type": "home7", "number": "734928_7", "addr": 77} +] +} +)"; + + expected_result=R"({"gender":male} +)"; + input_query = "select _1.gender from s3object[*] ;"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + + expected_result=R"({"streetAddress":101} +)"; + input_query = "select _1.address.streetAddress from s3object[*];"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + + expected_result=R"({"addr":11} +)"; + input_query = "select _1.phoneNumbers[0].addr from s3object[*];"; + + run_json_query(input_query.c_str(), input_json_data, result, true); + ASSERT_EQ(result,expected_result); + +} + +TEST(TestS3selectFunctions, json_format_csv_object) +{ + std::string input_query{}; + std::string s3select_res{}; + + std::string input = R"(383,886,777,915,793,335,386,492,649,421 +362,27,690,59,763,926,540,426,172,736 +211,368,567,429,782,530,862,123,67,135 +929,802,22,58,69,167,393,456,11,42 +229,373,421,919,784,537,198,324,315,370 +413,526,91,980,956,873,862,170,996,281 +305,925,84,327,336,505,846,729,313,857 +124,895,582,545,814,367,434,364,43,750 +87,808,276,178,788,584,403,651,754,399 +932,60,676,368,739,12,226,586,94,539 +)"; + + std::string expected_result{}; + + expected_result=R"({"_1":383,"_2":886} +{"_1":362,"_2":27} +{"_1":211,"_2":368} +{"_1":929,"_2":802} +{"_1":229,"_2":373} +{"_1":413,"_2":526} +{"_1":305,"_2":925} +{"_1":124,"_2":895} +{"_1":87,"_2":808} +{"_1":932,"_2":60} +)"; + + input_query = "select _1,_2 from s3object;"; + + s3select_res = run_s3select(input_query, input, "", true, true); + + ASSERT_EQ(s3select_res, expected_result); + + expected_result=R"({"_1":3975} +)"; + + input_query = "select sum(int(_1)) from s3object;"; + + s3select_res = run_s3select(input_query, input, "", true, true); + + ASSERT_EQ(s3select_res, expected_result); + + expected_result=R"({"x":383,"y":886} +{"x":362,"y":27} +{"x":211,"y":368} +{"x":929,"y":802} +{"x":229,"y":373} +{"x":413,"y":526} +{"x":305,"y":925} +{"x":124,"y":895} +{"x":87,"y":808} +{"x":932,"y":60} +)"; + + input_query = "select _1 as x, _2 as y from s3object;"; + + s3select_res = run_s3select(input_query, input, "", true, true); + + ASSERT_EQ(s3select_res, expected_result); + + expected_result = R"({"_1":8} +{"_1":2} +{"_1":3} +{"_1":8} +{"_1":3} +{"_1":5} +{"_1":9} +{"_1":8} +{"_1":8} +{"_1":6} +)"; + + input_query = "select substring(_2, 1, 1) from s3object;"; + + s3select_res = run_s3select(input_query, input, "", true, true); + + ASSERT_EQ(s3select_res, expected_result); + + expected_result = R"({"x":8} +{"x":2} +{"x":3} +{"x":8} +{"x":3} +{"x":5} +{"x":9} +{"x":8} +{"x":8} +{"x":6} +)"; + + input_query = "select substring(_2, 1, 1) as x from s3object;"; + + s3select_res = run_s3select(input_query, input, "", true, true); + + ASSERT_EQ(s3select_res, expected_result); + + expected_result = R"({"c1":383,"_1":385} +{"c1":362,"_1":364} +{"c1":211,"_1":213} +{"c1":929,"_1":931} +{"c1":229,"_1":231} +{"c1":413,"_1":415} +{"c1":305,"_1":307} +{"c1":124,"_1":126} +{"c1":87,"_1":89} +{"c1":932,"_1":934} +)"; + + input_query = "select cast(_1 as int) as c1, c1 + 2 from s3object;"; + + s3select_res = run_s3select(input_query, input, "", true, true); + + ASSERT_EQ(s3select_res, expected_result); + +} + + + + + + diff --git a/src/s3select/test/s3select_test.h b/src/s3select/test/s3select_test.h index 307db8a4b..9e6fe3a12 100644 --- a/src/s3select/test/s3select_test.h +++ b/src/s3select/test/s3select_test.h @@ -617,7 +617,7 @@ std::string run_s3select_opserialization_quot(std::string expression,std::string } // JSON tests API's -int run_json_query(const char* json_query, std::string& json_input,std::string& result) +int run_json_query(const char* json_query, std::string& json_input,std::string& result, bool json_format = false) {//purpose: run single-chunk json queries s3select s3select_syntax; @@ -628,19 +628,27 @@ int run_json_query(const char* json_query, std::string& json_input,std::string& return -1; } - json_object json_query_processor(&s3select_syntax); + json_object m_s3_json_object; + json_object::csv_definitions json_definitions; + + if(json_format) { + json_definitions.output_json_format = true; + } + + m_s3_json_object.set_json_query(&s3select_syntax, json_definitions); + result.clear(); - status = json_query_processor.run_s3select_on_stream(result, json_input.data(), json_input.size(), json_input.size()); + status = m_s3_json_object.run_s3select_on_stream(result, json_input.data(), json_input.size(), json_input.size(), json_format); std::string prev_result = result; result.clear(); - status = json_query_processor.run_s3select_on_stream(result, 0, 0, json_input.size()); + status = m_s3_json_object.run_s3select_on_stream(result, 0, 0, json_input.size(), json_format); result = prev_result + result; return status; } -std::string run_s3select(std::string expression,std::string input, const char* json_query = "") +std::string run_s3select(std::string expression,std::string input, const char* json_query = "", bool json_format = false, bool csv_json_format = true) {//purpose: run query on multiple rows and return result(multiple projections). s3select s3select_syntax; std::string parquet_input = input; @@ -654,52 +662,64 @@ std::string run_s3select(std::string expression,std::string input, const char* j std::string s3select_result; std::string json_result; - s3selectEngine::csv_object s3_csv_object(&s3select_syntax); - s3_csv_object.m_csv_defintion.redundant_column = false; + + csv_object::csv_defintions csv_definitions; + + if(json_format) { + csv_definitions.output_json_format = true; + } + + csv_definitions.redundant_column = false; + + s3selectEngine::csv_object s3_csv_object; + s3_csv_object.set_csv_query(&s3select_syntax, csv_definitions); s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true); -#ifdef _ARROW_EXIST - static int file_no = 1; - csv_to_parquet(parquet_input); - std::string parquet_result; - run_query_on_parquet_file(expression.c_str(),PARQUET_FILENAME,parquet_result); + if(!csv_json_format) { - if (strcmp(parquet_result.c_str(),s3select_result.c_str())) - { - std::cout << "failed on query " << expression << std::endl; - std::cout << "input for query reside on" << "./failed_test_input" << std::to_string(file_no) << ".[csv|parquet]" << std::endl; + #ifdef _ARROW_EXIST + static int file_no = 1; + csv_to_parquet(parquet_input); + std::string parquet_result; + run_query_on_parquet_file(expression.c_str(),PARQUET_FILENAME,parquet_result); - { - std::string buffer; + if (strcmp(parquet_result.c_str(),s3select_result.c_str())) + { + std::cout << "failed on query " << expression << std::endl; + std::cout << "input for query reside on" << "./failed_test_input" << std::to_string(file_no) << ".[csv|parquet]" << std::endl; - std::ifstream f(PARQUET_FILENAME); - f.seekg(0, std::ios::end); - buffer.resize(f.tellg()); - f.seekg(0); - f.read(buffer.data(), buffer.size()); + { + std::string buffer; - std::string fn = std::string("./failed_test_input_") + std::to_string(file_no) + std::string(".parquet"); - std::ofstream fw(fn.c_str()); - fw.write(buffer.data(), buffer.size()); + std::ifstream f(PARQUET_FILENAME); + f.seekg(0, std::ios::end); + buffer.resize(f.tellg()); + f.seekg(0); + f.read(buffer.data(), buffer.size()); - fn = std::string("./failed_test_input_") + std::to_string(file_no++) + std::string(".csv"); - std::ofstream fw2(fn.c_str()); - fw2.write(parquet_input.data(), parquet_input.size()); + std::string fn = std::string("./failed_test_input_") + std::to_string(file_no) + std::string(".parquet"); + std::ofstream fw(fn.c_str()); + fw.write(buffer.data(), buffer.size()); - } - } + fn = std::string("./failed_test_input_") + std::to_string(file_no++) + std::string(".csv"); + std::ofstream fw2(fn.c_str()); + fw2.write(parquet_input.data(), parquet_input.size()); - parquet_csv_report_error(parquet_result,s3select_result); -#endif //_ARROW_EXIST + } + } + + parquet_csv_report_error(parquet_result,s3select_result); + #endif //_ARROW_EXIST - if(strlen(json_query) == 0) { - json_query = convert_query(expression); - } + if(strlen(json_query) == 0) { + json_query = convert_query(expression); + } - if(strcmp(json_query,JSON_NO_RUN)) { - run_json_query(json_query, js, json_result); - json_csv_report_error(json_result, s3select_result); + if(strcmp(json_query,JSON_NO_RUN)) { + run_json_query(json_query, js, json_result, json_format); + json_csv_report_error(json_result, s3select_result); + } } return s3select_result; -- cgit v1.2.3