author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-23 16:45:17 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-23 16:45:44 +0000
commit    17d6a993fc17d533460c5f40f3908c708e057c18 (patch)
tree      1a3bd93e0ecd74fa02f93a528fe2f87e5314c4b5 /src/s3select
parent    Releasing progress-linux version 18.2.2-0progress7.99u1. (diff)
download  ceph-17d6a993fc17d533460c5f40f3908c708e057c18.tar.xz
          ceph-17d6a993fc17d533460c5f40f3908c708e057c18.zip
Merging upstream version 18.2.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/s3select')
-rw-r--r--  src/s3select/.github/workflows/cmake.yml                    4
-rwxr-xr-x  src/s3select/TPCDS/generate_upload_and_remove_infra.bash  170
-rw-r--r--  src/s3select/TPCDS/run_tpcds.bash                           31
-rw-r--r--  src/s3select/TPCDS/tpcds_functions.bash                     17
-rw-r--r--  src/s3select/example/s3select_example.cpp                   64
-rw-r--r--  src/s3select/include/s3select.h                            271
-rw-r--r--  src/s3select/include/s3select_csv_parser.h                  67
-rw-r--r--  src/s3select/include/s3select_functions.h                  110
-rw-r--r--  src/s3select/include/s3select_json_parser.h                  1
-rw-r--r--  src/s3select/include/s3select_oper.h                        35
-rw-r--r--  src/s3select/test/s3select_test.cpp                        356
-rw-r--r--  src/s3select/test/s3select_test.h                           98
12 files changed, 1092 insertions, 132 deletions
diff --git a/src/s3select/.github/workflows/cmake.yml b/src/s3select/.github/workflows/cmake.yml
index 577eb19cc..f8a836a90 100644
--- a/src/s3select/.github/workflows/cmake.yml
+++ b/src/s3select/.github/workflows/cmake.yml
@@ -34,10 +34,10 @@ jobs:
sudo curl -L https://dist.apache.org/repos/dist/dev/arrow/KEYS | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://apache.jfrog.io/artifactory/arrow/ubuntu focal main"
sudo apt-get update
- sudo apt-get install -y -V libarrow-dev
+ sudo apt-get install -y -V libarrow-dev=10.0.1-1
- name: install parquet
- run: sudo apt-get install -y -V libparquet-dev
+ run: sudo apt-get install -y -V libparquet-dev=10.0.1-1
- name: install-gtest
diff --git a/src/s3select/TPCDS/generate_upload_and_remove_infra.bash b/src/s3select/TPCDS/generate_upload_and_remove_infra.bash
new file mode 100755
index 000000000..2b23dde59
--- /dev/null
+++ b/src/s3select/TPCDS/generate_upload_and_remove_infra.bash
@@ -0,0 +1,170 @@
+#!/bin/bash
+
+## this script resides in the [galsl/fedora_38:tpcds_v2] docker container.
+## the container uses the following repo [ https://github.com/galsalomon66/tpc-ds-datagen-to-aws-s3 ] for the dsdgen application.
+## the purpose of this script is to launch multiple instances of the dsdgen application (how many depends on the number of cores).
+## the flow splits between the very big tables and the small tables.
+## num_of_cpu defines the degree of parallelism; num_of_partitions defines the number of chunks that together make up a single table (it could be huge).
+## each cycle of parallel generation ends with a flow that uploads the generated files into S3 (also done in parallel); once all files are uploaded,
+## they are all removed, i.e. for a 3TB scale there is no need for 3TB of disk space (the S3-storage capacity is obviously still needed ...)
+
+
+## TODO: set by the user
+TPCDS_DIR=/tpcds_output/
+
+
+all_tables="call_center
+catalog_page
+customer_address
+customer
+customer_demographics
+date_dim
+household_demographics
+income_band
+item
+promotion
+reason
+ship_mode
+store
+time_dim
+warehouse
+web_page
+web_site
+catalog_returns
+catalog_sales
+web_returns
+web_sales
+store_returns
+store_sales"
+
+#big tables, which are also parent tables
+#a parent table is one that has a child table, i.e. there is a relation between them.
+parent_tables="store_sales catalog_sales web_sales inventory"
+
+## not a parent table
+standalone_tables="call_center catalog_page customer_address customer customer_demographics date_dim household_demographics income_band
+item promotion reason ship_mode store time_dim warehouse web_page web_site"
+#small_tables=""
+
+num_of_cpu=56
+num_of_partitions=0
+
+create_dsdgen_workers_non_parent_tables()
+{
+
+[ ! -d ${TPCDS_DIR} ] && echo "${TPCDS_DIR} does not exist" && exit
+num_of_partitions=$(echo 1 | awk -v sc=${SCALE} -v c=${num_of_cpu} '{print int((sc/1000)*c);}')
+if [ $num_of_partitions -le 1 ]
+then
+ num_of_partitions=2
+fi
+
+echo "small tables="num_of_partitions=${num_of_partitions}
+
+((i=1))
+
+for t in ${standalone_tables}
+do
+ for c in $(seq 1 ${num_of_partitions})
+ do
+        ## the command line defines which table, what scale (size), the partition count, which partition to produce and where to produce it.
+ echo "time ./dsdgen -dir ${TPCDS_DIR} -table ${t} -scale ${SCALE} -force -parallel ${num_of_partitions} -child ${c} &" >> generate_upload_and_remove_exec.bash
+ ## number of CPU
+ if [ $(( i++ % ${num_of_cpu} )) -eq 0 ]
+ then
+ echo wait >> generate_upload_and_remove_exec.bash
+                        # after the wait completes, loop over the generated .dat files, uploading each in parallel; once an upload finishes, remove the file
+ # upload && remove
+ #
+ echo upload_and_remove_worker_func >> generate_upload_and_remove_exec.bash
+ fi
+ done
+done
+echo wait >> generate_upload_and_remove_exec.bash
+echo upload_and_remove_worker_func >> generate_upload_and_remove_exec.bash
+echo "echo small tables done." >> generate_upload_and_remove_exec.bash
+
+chmod +x generate_upload_and_remove_exec.bash
+}
+
+create_dsdgen_workers()
+{
+
+[ ! -d ${TPCDS_DIR} ] && echo "${TPCDS_DIR} does not exist" && exit
+num_of_partitions=$(echo 1 | awk -v sc=${SCALE} -v c=${num_of_cpu} '{print int((sc/10)*c);}')
+echo "big tables="num_of_partitions=${num_of_partitions}
+if [ $num_of_partitions -le 1 ]
+then
+ num_of_partitions=2
+fi
+
+((i=1))
+touch generate_upload_and_remove_exec.bash
+rm -f generate_upload_and_remove_exec.bash
+
+echo "#!/bin/bash" >> generate_upload_and_remove_exec.bash
+## the sourced generate_upload_and_remove_infra.bash provides the upload and remove functions
+echo ". generate_upload_and_remove_infra.bash" >> generate_upload_and_remove_exec.bash
+echo "cd /tpc-ds-datagen-to-aws-s3/tpc-ds/v2.11.0rc2/tools" >> generate_upload_and_remove_exec.bash
+
+for t in ${parent_tables}
+do
+ for c in $(seq 1 ${num_of_partitions})
+ do
+ echo "time ./dsdgen -dir ${TPCDS_DIR} -table ${t} -scale ${SCALE} -force -parallel ${num_of_partitions} -child ${c} &" >> generate_upload_and_remove_exec.bash
+ ## number of CPU
+ if [ $(( i++ % ${num_of_cpu} )) -eq 0 ]
+ then
+ echo wait >> generate_upload_and_remove_exec.bash
+                        # after the wait completes, loop over the generated .dat files, uploading each in parallel; once an upload finishes, remove the file
+ # upload && remove
+ #
+ echo upload_and_remove_worker_func >> generate_upload_and_remove_exec.bash
+ fi
+ done
+done
+echo wait >> generate_upload_and_remove_exec.bash
+echo upload_and_remove_worker_func >> generate_upload_and_remove_exec.bash
+echo "echo big tables done." >> generate_upload_and_remove_exec.bash
+
+## adding the production of the other tables
+create_dsdgen_workers_non_parent_tables
+
+chmod +x generate_upload_and_remove_exec.bash
+
+## the generated script below contains all that is needed for creating the TPCDS tables in S3 storage.
+## it should be executed by the user
+#./generate_upload_and_remove_exec.bash
+
+}
+
+upload_and_remove_worker_func()
+{
+# create a list of tasks to run in the background; remove each uploaded file upon completion
+((i=1))
+touch upload_and_remove_exec.bash
+rm -f upload_and_remove_exec.bash
+
+echo "#!/bin/bash" >> upload_and_remove_exec.bash
+
+for f in $(ls ${TPCDS_DIR}/*.dat)
+do
+ #echo $f
+ table_name=$(basename $f | sed 's/_[0-9]\+_[0-9]\+/ /' | awk '{print $1;}')
+ echo "(aws s3api put-object --bucket hive --key scale_${SCALE}/${table_name}/$(basename $f) --body ${f} --endpoint-url ${S3_ENDPOINT} > /dev/null 2>&1 && echo upload ${f} && rm -f ${f}) &" >> upload_and_remove_exec.bash
+ if [ $(( i++ % ${num_of_cpu} )) -eq 0 ]
+ then
+ echo wait >> upload_and_remove_exec.bash
+ fi
+done
+
+echo wait >> upload_and_remove_exec.bash
+#upload and remove all generated files
+chmod +x upload_and_remove_exec.bash
+cp upload_and_remove_exec.bash upload_and_remove.bash_${RANDOM} ## debug
+
+## start upload and remove in parallel
+./upload_and_remove_exec.bash
+
+}
+
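
To make the emitted driver concrete: with the defaults above (num_of_cpu=56) and an assumed SCALE=2, the parent tables get int((2/10)*56)=11 partitions and the standalone tables are clamped to the minimum of 2, so generate_upload_and_remove_exec.bash would look roughly like the following sketch (illustrative only, not part of the change):

    #!/bin/bash
    . generate_upload_and_remove_infra.bash
    cd /tpc-ds-datagen-to-aws-s3/tpc-ds/v2.11.0rc2/tools
    # parent (big) tables: one dsdgen instance per partition, run in the background
    time ./dsdgen -dir /tpcds_output/ -table store_sales -scale 2 -force -parallel 11 -child 1 &
    time ./dsdgen -dir /tpcds_output/ -table store_sales -scale 2 -force -parallel 11 -child 2 &
    # ... a "wait" plus upload_and_remove_worker_func is emitted after every 56 background jobs ...
    wait
    upload_and_remove_worker_func
    echo big tables done.
    # ... the same pattern repeats for the standalone (small) tables ...
    wait
    upload_and_remove_worker_func
    echo small tables done.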
diff --git a/src/s3select/TPCDS/run_tpcds.bash b/src/s3select/TPCDS/run_tpcds.bash
new file mode 100644
index 000000000..f1c3eecf9
--- /dev/null
+++ b/src/s3select/TPCDS/run_tpcds.bash
@@ -0,0 +1,31 @@
+#!/bin/bash
+
+## this script is the entry point of the tpcds data generation
+## the first and only argument is the SCALE factor
+
+[ $# -lt 1 ] && echo "type a single number for the scale (2 --> 3000)" && exit
+
+re='^[0-9]+$'
+[[ ! $1 =~ $re ]] && echo "SCALE should be a number" && exit
+
+## the following code lines accept env-variables for the S3 system
+[ -z ${S3_ENDPOINT} ] && echo "missing env-variable S3_ENDPOINT" && exit
+[ -z ${S3_ACCESS_KEY} ] && echo missing env-variable S3_ACCESS_KEY && exit
+[ -z ${S3_SECRET_KEY} ] && echo missing env-variable S3_SECRET_KEY && exit
+
+## updating AWS credentials
+cat ~/.aws/credentials | \
+ awk -v acc=${S3_ACCESS_KEY} '{if($0 ~ /aws_access_key_id/){print "aws_access_key_id = ",acc;} else{print $0;}}' | \
+ awk -v acc=${S3_SECRET_KEY} '{if($0 ~ /aws_secret_access_key/){print "aws_secret_access_key = ",acc;} else{print $0;}}' > /tmp/credentials
+
+cat /tmp/credentials > ~/.aws/credentials
+
+export SCALE=$1
+
+. ./generate_upload_and_remove_infra.bash
+
+## create generate_upload_and_remove_exec.bash
+create_dsdgen_workers
+## running tpcds data generator script
+time /generate_upload_and_remove_exec.bash
+
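
Inside the container, the script is driven by three environment variables plus the scale argument; a minimal invocation sketch (the endpoint and keys below are the same placeholder values used by run_tpcds_v2 further down, not real credentials):

    export S3_ENDPOINT=http://172.17.0.1:8000
    export S3_ACCESS_KEY=b2345678901234567890
    export S3_SECRET_KEY=b234567890123456789012345678901234567890
    /run_tpcds.bash 2    # the single argument is the SCALE factor (2 --> 3000)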
diff --git a/src/s3select/TPCDS/tpcds_functions.bash b/src/s3select/TPCDS/tpcds_functions.bash
index 67a64ff0b..4a3580418 100644
--- a/src/s3select/TPCDS/tpcds_functions.bash
+++ b/src/s3select/TPCDS/tpcds_functions.bash
@@ -18,6 +18,23 @@ start_tpcds;'
}
+run_tpcds_v2()
+{
+## the following command uses a different container
+## this container uses a C version of dsdgen (the tpcds data generation application)
+## the container also contains a bash script that runs multiple instances of dsdgen to accelerate the total throughput
+
+## the SCALE factor is the first and only argument of the /run_tpcds.bash script.
+## the following env-variables need to be set: S3_ENDPOINT, S3_ACCESS_KEY, S3_SECRET_KEY
+## the container directory /tpcds_output must also be mounted onto a host directory; the dsdgen application stores
+## the generated data there, and it is later uploaded into S3 storage and removed.
+
+
+sudo docker run -v /tmp/tpcds_output/:/tpcds_output/ --env S3_ENDPOINT=http://172.17.0.1:8000 --env S3_ACCESS_KEY=b2345678901234567890 \
+--env S3_SECRET_KEY=b234567890123456789012345678901234567890 -it galsl/fedora_38:tpcds_v2 bash -c '/run_tpcds.bash 2'
+
+}
+
move_from_tpcds_bucket_to_hive_bucket()
{
## for the case where it needs to move into a different bucket (the one trino points at)
diff --git a/src/s3select/example/s3select_example.cpp b/src/s3select/example/s3select_example.cpp
index 71aff3d01..3a4b20dc1 100644
--- a/src/s3select/example/s3select_example.cpp
+++ b/src/s3select/example/s3select_example.cpp
@@ -11,6 +11,9 @@
using namespace s3selectEngine;
using namespace BOOST_SPIRIT_CLASSIC_NS;
+std::string output_format{};
+std::string header_info{};
+
class awsCli_handler {
@@ -154,7 +157,7 @@ public:
//std::string get_error_description(){}
- std::string get_result()
+ std::string& get_result()
{
return m_result;
}
@@ -196,14 +199,20 @@ public:
{
csv.ignore_header_info = true;
}
- else if (m_header_info.compare("USE") == 0)
+ else if (header_info.compare("USE") == 0)
{
csv.use_header_info = true;
}
- m_s3_csv_object = std::unique_ptr<s3selectEngine::csv_object>(new s3selectEngine::csv_object(s3select_syntax.get(), csv));
+ if(output_format.compare("JSON") == 0) {
+ csv.output_json_format = true;
+ }
+
+ m_s3_csv_object = std::unique_ptr<s3selectEngine::csv_object>(new s3selectEngine::csv_object());
+ m_s3_csv_object->set_csv_query(s3select_syntax.get(), csv);
}
+
if (s3select_syntax->get_error_description().empty() == false)
{
header_size = create_header_records(m_buff_header.get());
@@ -363,6 +372,8 @@ int process_json_query(const char* input_query,const char* fname)
{//purpose: process json query
s3select s3select_syntax;
+ s3selectEngine::json_object m_s3_json_object;
+ json_object::csv_definitions json;
int status = s3select_syntax.parse_query(input_query);
if (status != 0)
{
@@ -370,6 +381,10 @@ int process_json_query(const char* input_query,const char* fname)
return -1;
}
+ if(output_format.compare("JSON") == 0) {
+ json.output_json_format = true;
+ }
+
std::ifstream input_file_stream;
try {
input_file_stream = std::ifstream(fname, std::ios::in | std::ios::binary);
@@ -381,7 +396,7 @@ int process_json_query(const char* input_query,const char* fname)
}
auto object_sz = boost::filesystem::file_size(fname);
- json_object json_query_processor(&s3select_syntax);
+ m_s3_json_object.set_json_query(&s3select_syntax, json);
std::string buff(BUFFER_SIZE,0);
std::string result;
@@ -393,10 +408,11 @@ int process_json_query(const char* input_query,const char* fname)
{
bytes_read += read_sz;
std::cout << "read next chunk " << chunk_count++ << ":" << read_sz << ":" << bytes_read << "\r";
+
result.clear();
try{
- status = json_query_processor.run_s3select_on_stream(result, buff.data(), read_sz, object_sz);
+ status = m_s3_json_object.run_s3select_on_stream(result, buff.data(), read_sz, object_sz, json.output_json_format);
} catch (base_s3select_exception &e)
{
std::cout << e.what() << std::endl;
@@ -416,7 +432,7 @@ int process_json_query(const char* input_query,const char* fname)
std::cout << "failure upon processing " << std::endl;
return -1;
}
- if(json_query_processor.is_sql_limit_reached())
+ if(m_s3_json_object.is_sql_limit_reached())
{
std::cout << "json processing reached limit " << std::endl;
break;
@@ -425,7 +441,7 @@ int process_json_query(const char* input_query,const char* fname)
}
try{
result.clear();
- json_query_processor.run_s3select_on_stream(result, 0, 0, object_sz);
+ m_s3_json_object.run_s3select_on_stream(result, 0, 0, object_sz, json.output_json_format);
} catch (base_s3select_exception &e)
{
std::cout << e.what() << std::endl;
@@ -499,6 +515,14 @@ int run_on_localFile(char* input_query)
csv.use_header_info = false;
csv.quote_fields_always=false;
+ if(output_format.compare("JSON") == 0) {
+ csv.output_json_format = true;
+ }
+
+ if(header_info.compare("USE") == 0) {
+ csv.use_header_info = true;
+ }
+
#define CSV_QUOT "CSV_ALWAYS_QUOT"
#define CSV_COL_DELIM "CSV_COLUMN_DELIMETER"
#define CSV_ROW_DELIM "CSV_ROW_DELIMITER"
@@ -521,7 +545,8 @@ int run_on_localFile(char* input_query)
csv.use_header_info = true;
}
- s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv);
+ s3selectEngine::csv_object s3_csv_object;
+ s3_csv_object.set_csv_query(&s3select_syntax, csv);
std::function<void(const char*)> fp_debug = [](const char* msg)
{
@@ -585,6 +610,14 @@ int run_on_localFile(char* input_query)
return 0;
}
+std::string get_ranged_string(std::string& inp)
+{
+ size_t startPos = inp.find("<Payload>");
+ size_t endPos = inp.find("</Payload>");
+
+ return inp.substr(startPos,endPos-startPos);
+}
+
int run_on_single_query(const char* fname, const char* query)
{
@@ -632,11 +665,12 @@ int run_on_single_query(const char* fname, const char* query)
if(status<0)
{
std::cout << "failure on execution " << std::endl;
+ std::cout << get_ranged_string( awscli->get_result() ) << std::endl;
break;
}
else
{
- std::cout << awscli->get_result() << std::endl;
+ std::cout << get_ranged_string( awscli->get_result() ) << std::endl;
}
if(!read_sz || input_file_stream.eof())
@@ -662,12 +696,24 @@ int main(int argc,char **argv)
continue;
}
+ if (!strcmp(argv[i], "-output"))
+  {//output format received as CLI parameter
+ output_format = argv[i + 1];
+ continue;
+ }
+
if (!strcmp(argv[i], "-q"))
{
query = argv[i + 1];
continue;
}
+ if (!strcmp(argv[i], "-HeaderInfo"))
+ {
+ header_info = argv[i + 1];
+ continue;
+ }
+
if (!strcmp(argv[i], "-cmds"))
{//query file contain many queries
query_file = argv[i + 1];
diff --git a/src/s3select/include/s3select.h b/src/s3select/include/s3select.h
index 3ac111351..667c92ba9 100644
--- a/src/s3select/include/s3select.h
+++ b/src/s3select/include/s3select.h
@@ -18,6 +18,7 @@
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <functional>
+#include <unordered_set>
#define _DEBUG_TERM {string token(a,b);std::cout << __FUNCTION__ << token << std::endl;}
@@ -1293,11 +1294,11 @@ void push_logical_operator::builder(s3select* self, const char* a, const char* b
std::string token(a, b);
logical_operand::oplog_t l = logical_operand::oplog_t::NA;
- if (token == "and")
+ if (boost::iequals(token,"and"))
{
l = logical_operand::oplog_t::AND;
}
- else if (token == "or")
+ else if (boost::iequals(token,"or"))
{
l = logical_operand::oplog_t::OR;
}
@@ -1633,8 +1634,10 @@ void push_like_predicate_escape::builder(s3select* self, const char* a, const ch
void push_is_null_predicate::builder(s3select* self, const char* a, const char* b) const
{
- //expression is null, is not null
+ //expression could be is null OR is not null
std::string token(a, b);
+  //to_lower enables case-insensitive comparison
+ boost::algorithm::to_lower(token);
bool is_null = true;
for(size_t i=0;i<token.size();i++)
@@ -1936,7 +1939,7 @@ void push_trim_type::builder(s3select* self, const char* a, const char* b) const
{
std::string token(a, b);
- auto trim_option = [&](const char *s){return strncmp(a,s,strlen(s))==0;};
+ auto trim_option = [&](const char *s){return strncasecmp(a,s,strlen(s))==0;};
if(trim_option("leading"))
{
@@ -2156,10 +2159,12 @@ struct s3select_csv_definitions //TODO
bool quote_fields_asneeded;
bool redundant_column;
bool comment_empty_lines;
+ bool output_json_format;
std::vector<char> comment_chars;
std::vector<char> trim_chars;
+ std::string schema;
- s3select_csv_definitions():row_delimiter('\n'), column_delimiter(','), output_row_delimiter('\n'), output_column_delimiter(','), escape_char('\\'), output_escape_char('\\'), output_quot_char('"'), quot_char('"'), use_header_info(false), ignore_header_info(false), quote_fields_always(false), quote_fields_asneeded(false), redundant_column(false), comment_empty_lines(false) {}
+ s3select_csv_definitions():row_delimiter('\n'), column_delimiter(','), output_row_delimiter('\n'), output_column_delimiter(','), escape_char('\\'), output_escape_char('\\'), output_quot_char('"'), quot_char('"'), use_header_info(false), ignore_header_info(false), quote_fields_always(false), quote_fields_asneeded(false), redundant_column(false), comment_empty_lines(false), output_json_format(false) {}
};
@@ -2172,6 +2177,8 @@ protected:
scratch_area* m_sa;
std::string m_obj_name;
bool m_aggr_flow = false; //TODO once per query
+ bool is_star = false;
+ bool is_json = false;
bool m_is_to_aggregate;
std::vector<base_statement*> m_projections;
base_statement* m_where_clause;
@@ -2182,9 +2189,11 @@ protected:
unsigned long m_processed_rows;
size_t m_returned_bytes_size;
std::function<void(const char*)> fp_ext_debug_mesg;//dispache debug message into external system
+ std::vector<std::string> m_projection_keys{};
public:
s3select_csv_definitions m_csv_defintion;//TODO add method for modify
+ std::string m_error_description;
enum class Status {
END_OF_STREAM,
@@ -2196,6 +2205,16 @@ public:
Status m_sql_processing_status;
+ void set_processing_time_error()
+ {
+ m_sql_processing_status = Status::SQL_ERROR;
+ }
+
+ bool is_processing_time_error()
+ {
+ return m_sql_processing_status == Status::SQL_ERROR;
+ }
+
Status get_sql_processing_status()
{
return m_sql_processing_status;
@@ -2206,6 +2225,60 @@ public:
return m_sql_processing_status == Status::LIMIT_REACHED;
}
+ void set_star_true() {
+ is_star = true;
+ }
+
+ void set_projection_keys(std::vector<base_statement*> m_projections)
+ {
+ std::vector<std::string> alias_values{};
+ std::unordered_set<base_statement*> alias_projection_keys{};
+ bool is_output_json_format = m_csv_defintion.output_json_format;
+
+ for (auto& a : *m_s3_select->get_aliases()->get())
+ {
+ alias_values.push_back(a.first);
+ alias_projection_keys.insert(a.second);
+ }
+
+ size_t m_alias_index = 0;
+ int index_json_projection = 0;
+ is_json = m_s3_select->is_json_query();
+
+ for (auto& p : m_projections)
+ {
+ if(p->is_statement_contain_star_operation())
+ {
+ set_star_true();
+ }
+ p->traverse_and_apply(m_sa, m_s3_select->get_aliases(), m_s3_select->is_json_query());
+
+ std::string key_from_projection{};
+ if(p->is_column()){
+ key_from_projection = p->get_key_from_projection();
+ }
+
+ if(alias_projection_keys.count(p) == 0 && p->is_column()) {
+ m_projection_keys.push_back(key_from_projection);
+ } else if(alias_projection_keys.count(p) > 0 && p->is_column()) {
+ m_projection_keys.push_back(alias_values[m_alias_index++]);
+ } else if(!p->is_column() && is_output_json_format && alias_projection_keys.count(p) > 0 ) {
+ m_projection_keys.push_back(alias_values[m_alias_index++]);
+ } else if(!p->is_column() && is_output_json_format && alias_projection_keys.count(p) == 0) {
+ std::string index_json = "_" + std::to_string(++index_json_projection);
+ m_projection_keys.push_back(index_json);
+ }
+ }
+
+ if(m_s3_select->is_json_query()) {
+ for(auto& k: m_projection_keys) {
+ size_t lastDotPosition = k.find_last_of('.');
+ std::string extractedPart = k.substr(lastDotPosition + 1);
+ k = extractedPart;
+ }
+ }
+ }
+
void set_base_defintions(s3select* m)
{
if(m_s3_select || !m)
@@ -2225,10 +2298,8 @@ public:
m_where_clause->traverse_and_apply(m_sa, m_s3_select->get_aliases(), m_s3_select->is_json_query());
}
- for (auto& p : m_projections)
- {
- p->traverse_and_apply(m_sa, m_s3_select->get_aliases(), m_s3_select->is_json_query());
- }
+ set_projection_keys(m_projections);
+
m_is_to_aggregate = true;//TODO not correct. should be set upon end-of-stream
m_aggr_flow = m_s3_select->is_aggregate_query();
@@ -2269,45 +2340,95 @@ public:
return m_returned_bytes_size;
}
- void result_values_to_string(multi_values& projections_resuls, std::string& result)
+ void json_result_format(multi_values& projections_results, std::string& result, std::string& output_delimiter)
{
- size_t i = 0;
+ result += "{";
+ int j = 0;
+ for (size_t i = 0; i < projections_results.values.size(); ++i)
+ {
+ auto& res = projections_results.values[i];
+ std::string label = "_";
+ label += std::to_string(i + 1);
+
+ if (i > 0) {
+ result += output_delimiter;
+ }
+
+ if(!is_star) {
+ result += "\"" + m_projection_keys[j] + "\":";
+ } else if(is_star && !is_json) {
+ result += "\"" + label + "\":";
+ }
+
+ result.append(res->to_string());
+ m_returned_bytes_size += strlen(res->to_string());
+ ++j;
+ }
+ result += "}";
+
+ }
+
+
+ void result_values_to_string(multi_values& projections_resuls, std::string& result)
+{
std::string output_delimiter(1,m_csv_defintion.output_column_delimiter);
std::string output_row_delimiter(1,m_csv_defintion.output_row_delimiter);
+ if(m_csv_defintion.output_json_format && projections_resuls.values.size()) {
+ json_result_format(projections_resuls, result, output_delimiter);
+ result.append(output_row_delimiter);
+ return;
+ }
+
+ size_t i = 0;
for(auto& res : projections_resuls.values)
{
+
+ std::string column_result;
+
+ try{
+ column_result = res->to_string();
+ }
+ catch(std::exception& e)
+ {
+ column_result = "{failed to compute projection: " + std::string(e.what()) + "}";
+ m_error_description = column_result;
+ set_processing_time_error();
+ }
+
+
if(fp_ext_debug_mesg)
- fp_ext_debug_mesg( res->to_string() );
+ fp_ext_debug_mesg(column_result.data());
if (m_csv_defintion.quote_fields_always) {
std::ostringstream quoted_result;
- quoted_result << std::quoted(res->to_string(),m_csv_defintion.output_quot_char, m_csv_defintion.escape_char);
+ quoted_result << std::quoted(column_result,m_csv_defintion.output_quot_char, m_csv_defintion.escape_char);
result.append(quoted_result.str());
+
m_returned_bytes_size += quoted_result.str().size();
- }//TODO to add asneeded
+ }//TODO to add asneeded
else
{
- result.append(res->to_string());
- m_returned_bytes_size += strlen(res->to_string());
+ result.append(column_result);
+ m_returned_bytes_size += column_result.size();
+
}
- if(!m_csv_defintion.redundant_column) {
- if(++i < projections_resuls.values.size()) {
- result.append(output_delimiter);
- m_returned_bytes_size += output_delimiter.size();
- }
- }
- else {
- result.append(output_delimiter);
- m_returned_bytes_size += output_delimiter.size();
- }
- }
- if(!m_aggr_flow){
- result.append(output_row_delimiter);
+ if(!m_csv_defintion.redundant_column) {
+ if(++i < projections_resuls.values.size()) {
+ result.append(output_delimiter);
+ m_returned_bytes_size += output_delimiter.size();
+ }
+ } else {
+ result.append(output_delimiter);
m_returned_bytes_size += output_delimiter.size();
+ }
}
- }
+ if(!m_aggr_flow) {
+ result.append(output_row_delimiter);
+ m_returned_bytes_size += output_delimiter.size();
+ }
+}
Status getMatchRow( std::string& result)
{
@@ -2336,7 +2457,7 @@ public:
}
result_values_to_string(projections_resuls,result);
- return m_sql_processing_status = Status::END_OF_STREAM;
+ return is_processing_time_error() ? (m_sql_processing_status = Status::SQL_ERROR) : (m_sql_processing_status = Status::END_OF_STREAM);
}
m_processed_rows++;
@@ -2368,9 +2489,9 @@ public:
i->set_last_call();
i->set_skip_non_aggregate(false);//projection column is set to be runnable
projections_resuls.push_value( &(i->eval()) );
- }
+ }
result_values_to_string(projections_resuls,result);
- return m_sql_processing_status = Status::LIMIT_REACHED;
+ return is_processing_time_error() ? (m_sql_processing_status = Status::SQL_ERROR) : (m_sql_processing_status = Status::LIMIT_REACHED);
}
}
while (multiple_row_processing());
@@ -2394,6 +2515,7 @@ public:
{
a.second->invalidate_cache_result();
}
+
}
while (multiple_row_processing() && m_where_clause && !(where_clause_result = m_where_clause->eval().is_true()) && !(m_is_limit_on && m_processed_rows == m_limit));
@@ -2421,12 +2543,18 @@ public:
for (auto& i : m_projections)
{
projections_resuls.push_value( &(i->eval()) );
- }
- result_values_to_string(projections_resuls,result);
+ }
+ result_values_to_string(projections_resuls,result);
+ if(m_sql_processing_status == Status::SQL_ERROR)
+ {
+ return m_sql_processing_status;
+ }
}
-
}
- return is_end_of_stream() ? (m_sql_processing_status = Status::END_OF_STREAM) : (m_sql_processing_status = Status::NORMAL_EXIT);
+
+ return is_processing_time_error() ? (m_sql_processing_status = Status::SQL_ERROR) :
+ (is_end_of_stream() ? (m_sql_processing_status = Status::END_OF_STREAM) : (m_sql_processing_status = Status::NORMAL_EXIT));
+
}//getMatchRow
@@ -2477,14 +2605,12 @@ public:
{
//return;
}
-
- set_base_defintions(s3_query);
m_csv_defintion = csv;
+ set_base_defintions(s3_query);
}
private:
bool m_skip_last_line;
- std::string m_error_description;
char* m_stream;
char* m_end_stream;
std::vector<char*> m_row_tokens;
@@ -2604,26 +2730,36 @@ public:
m_error_description = "escaped_char_missing failure while csv parsing";
return -1;
}
- catch(io::error::escaped_string_not_closed& err)
+ catch(io::error::escaped_string_not_closed& err)
{
m_error_description = "escaped_string_not_closed failure while csv parsing";
return -1;
}
- catch(io::error::line_length_limit_exceeded& err)
+ catch(io::error::line_length_limit_exceeded& err)
{
m_error_description = "line_length_limit_exceeded failure while csv parsing";
return -1;
}
- catch(io::error::with_file_name& err)
+ catch(io::error::missmatch_of_begin_end& err)
{
- m_error_description = "with_file_name failure while csv parsing";
+ m_error_description = "missmatch_of_begin_end failure while csv parsing" + std::string(err.what());
return -1;
}
- catch(io::error::with_file_line& err)
+ catch(io::error::missmatch_end& err)
{
- m_error_description = "with_file_line failure while csv parsing";
+ m_error_description = "missmatch_end failure while csv parsing" + std::string(err.what());
return -1;
}
+ catch(io::error::with_file_name& err)
+ {
+ m_error_description = "with_file_name failure while csv parsing";
+ return -1;
+ }
+ catch(std::exception& e)
+ {
+ m_error_description = "error while processing CSV object : " + std::string(e.what());
+ return -1;
+ }
return status;
}
@@ -2634,7 +2770,7 @@ private:
//purpose: the CSV data is "streaming"; it may "cut" rows in the middle, in that case the "broken line" is stored
//for later, and when the next chunk of data streams in, the stored line is merged with the current broken line and processed.
std::string tmp_buff;
-
+ int status = 0;
m_processed_bytes += stream_length;
m_skip_first_line = false;
@@ -2648,6 +2784,22 @@ private:
p_obj_chunk++;
}
+ if(*p_obj_chunk != m_csv_defintion.row_delimiter)
+ {// previous row can not be completed with current chunk
+ if(fp_ext_debug_mesg)
+ {
+ std::string err_mesg = "** the stream chunk is too small for processing(saved for later) **";
+ fp_ext_debug_mesg(err_mesg.c_str());
+ }
+ //copy the part to be processed later
+ tmp_buff.assign((char*)csv_stream, (char*)csv_stream + (p_obj_chunk - csv_stream));
+ //saved for later processing
+ m_last_line.append(tmp_buff);
+ m_previous_line = true;//it means to skip last line
+ //skip processing since the row tail is missing.
+ return 0;
+ }
+
tmp_buff.assign((char*)csv_stream, (char*)csv_stream + (p_obj_chunk - csv_stream));
merge_line = m_last_line + tmp_buff + m_csv_defintion.row_delimiter;
m_previous_line = false;
@@ -2655,7 +2807,7 @@ private:
m_skip_x_first_bytes = tmp_buff.size()+1;
//processing the merged row (previous broken row)
- run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false);
+ status = run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false);
}
if (stream_length && csv_stream[stream_length - 1] != m_csv_defintion.row_delimiter)
@@ -2676,7 +2828,8 @@ private:
stream_length -= (m_last_line.length());
}
- return run_s3select_on_object(result, csv_stream, stream_length, m_skip_first_line, m_previous_line, (m_processed_bytes >= obj_size));
+ status = run_s3select_on_object(result, csv_stream, stream_length, m_skip_first_line, m_previous_line, (m_processed_bytes >= obj_size));
+ return status;
}
public:
@@ -2696,6 +2849,11 @@ public:
m_skip_x_first_bytes=0;
}
+ if(m_stream>m_end_stream)
+ {
+ throw base_s3select_exception(std::string("** m_stream > m_end_stream **") +
+ std::to_string( (m_stream - m_end_stream) ) ,base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
CSVParser _csv_parser("csv", m_stream, m_end_stream);
csv_parser = &_csv_parser;
csv_parser->set_csv_def( m_csv_defintion.row_delimiter,
@@ -2745,6 +2903,10 @@ public:
{
break;//user should request for sql_processing_status
}
+ if(m_sql_processing_status == Status::SQL_ERROR)
+ {
+ return -1;
+ }
} while (true);
@@ -2764,7 +2926,6 @@ class parquet_object : public base_s3object
{
private:
- std::string m_error_description;
parquet_file_parser* object_reader;
parquet_file_parser::column_pos_t m_where_clause_columns;
parquet_file_parser::column_pos_t m_projections_columns;
@@ -2948,11 +3109,13 @@ private:
std::string* m_s3select_result = nullptr;
size_t m_row_count;
bool star_operation_ind;
- std::string m_error_description;
bool m_init_json_processor_ind;
public:
+ class csv_definitions : public s3select_csv_definitions
+ {};
+
void init_json_processor(s3select* query)
{
if(m_init_json_processor_ind)
@@ -2997,6 +3160,7 @@ public:
if(p->is_statement_contain_star_operation())
{
star_operation_ind=true;
+ set_star_true();
break;
}
}
@@ -3095,7 +3259,7 @@ private:
public:
- int run_s3select_on_stream(std::string& result, const char* json_stream, size_t stream_length, size_t obj_size)
+ int run_s3select_on_stream(std::string& result, const char* json_stream, size_t stream_length, size_t obj_size, bool json_format = false)
{
int status=0;
m_processed_bytes += stream_length;
@@ -3134,8 +3298,9 @@ public:
return status;
}
- void set_json_query(s3select* s3_query)
+ void set_json_query(s3select* s3_query, csv_definitions csv)
{
+ m_csv_defintion = csv;
set_base_defintions(s3_query);
init_json_processor(s3_query);
}
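
In terms of output, output_json_format switches result_values_to_string() from delimiter-separated rows to one JSON object per row, keyed by the names collected in set_projection_keys() (aliases where present, "_N" labels for non-column projections). A small illustration, mirroring the expectations in the tests later in this diff:

    # CSV input row:                       383,886,777,...
    # select _1,_2            (CSV output):  383,886
    # select _1,_2            (JSON output): {"_1":383,"_2":886}
    # select _1 as x, _2 as y (JSON output): {"x":383,"y":886}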
diff --git a/src/s3select/include/s3select_csv_parser.h b/src/s3select/include/s3select_csv_parser.h
index dab2e4efa..28e8117a6 100644
--- a/src/s3select/include/s3select_csv_parser.h
+++ b/src/s3select/include/s3select_csv_parser.h
@@ -13,6 +13,53 @@ namespace io{
, file_line, file_name);
}
};
+
+ struct missmatch_of_begin_end :
+ base,
+ with_file_name,
+ with_file_line{
+ int begin=-1,end=-1;
+ void set_begin_end(int b,int e){
+ begin=b;
+ end=e;
+ }
+
+ void format_error_message()const override{
+ std::snprintf(error_message_buffer, sizeof(error_message_buffer),
+ "***missmatch_of_begin_end*** Line number %d in file \"%s\" begin{%d} > end{%d}"
+ ,file_line, file_name,begin,end);
+ }
+ };
+
+ struct missmatch_end :
+ base,
+ with_file_name,
+ with_file_line{
+ int end=-1;
+ int block_size=-1;
+ void set_end_block(int e,int b){
+ end = e;
+ block_size = b;
+ }
+ void format_error_message()const override{
+ std::snprintf(error_message_buffer, sizeof(error_message_buffer),
+ "***missmatch_end*** Line number %d in file \"%s\" end{%d} block{%d}"
+ ,file_line, file_name, end, block_size);
+ }
+ };
+
+
+ struct line_is_null :
+ base,
+ with_file_name,
+ with_file_line{
+ void format_error_message()const override{
+ std::snprintf(error_message_buffer, sizeof(error_message_buffer),
+ "***line is NULL*** Line number %d in file \"%s\""
+ ,file_line, file_name);
+ }
+ };
+
}
namespace detail{
@@ -133,7 +180,11 @@ namespace io{
void chop_next_column(char*&line, char*&col_begin, char*&col_end, char& col_delimiter, char& quote, char& escape_char)
{
- assert(line != nullptr);
+ if(line == NULL)
+ {
+ io::error::line_is_null err;
+ throw err;
+ }
col_begin = line;
// the col_begin + (... - col_begin) removes the constness
@@ -312,8 +363,18 @@ class CSVParser
++file_line;
- assert(data_begin < data_end);
- assert(data_end <= block_len*2);
+ if(data_begin > data_end)
+ {
+ io::error::missmatch_of_begin_end err;
+ err.set_begin_end(data_begin,data_end);
+ throw err;
+ }
+ if(data_end > block_len*2)
+ {
+ io::error::missmatch_end err;
+ err.set_end_block(data_end,block_len*2);
+ throw err;
+ }
if(data_begin >= block_len)
{
diff --git a/src/s3select/include/s3select_functions.h b/src/s3select/include/s3select_functions.h
index 8c507fca1..4d88d772e 100644
--- a/src/s3select/include/s3select_functions.h
+++ b/src/s3select/include/s3select_functions.h
@@ -350,6 +350,7 @@ private:
s3select_functions* m_s3select_functions;
variable m_result;
bool m_is_aggregate_function;
+ value eval_result;
void _resolve_name()
{
@@ -437,14 +438,28 @@ public:
{//all rows prior to last row
if(m_skip_non_aggregate_op == false || is_aggregate() == true)
{
- (*m_func_impl)(&arguments, &m_result);
+ try {
+ (*m_func_impl)(&arguments, &m_result);
+ }
+ catch(std::exception& e)
+ {
+ std::string error_msg = "[" + m_func_impl->m_function_name + " failed : " + std::string(e.what()) + "]";
+ throw base_s3select_exception(error_msg.data(), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
}
else if(m_skip_non_aggregate_op == true)
{
for(auto& p : arguments)
{//evaluating the arguments (not the function itself, which is a non-aggregate function)
//i.e. in the following use case substring( , sum(),count() ) ; only sum() and count() are evaluated.
- p->eval();
+ try {
+ p->eval();
+ }
+ catch(std::exception& e)
+ {
+ std::string error_msg = m_func_impl->m_function_name + " failed : " + std::string(e.what());
+ throw base_s3select_exception(error_msg.data(), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
}
}
}
@@ -452,9 +467,27 @@ public:
{//on the last row, the aggregate function is finalized,
//and non-aggregate function is evaluated with the result of aggregate function.
if(is_aggregate())
- (*m_func_impl).get_aggregate_result(&m_result);
+ {
+ try{
+ (*m_func_impl).get_aggregate_result(&m_result);
+ }
+ catch(std::exception& e)
+ {
+ std::string error_msg = m_func_impl->m_function_name + " failed : " + std::string(e.what());
+ throw base_s3select_exception(error_msg.data(), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+ }
else
- (*m_func_impl)(&arguments, &m_result);
+ {
+ try{
+ (*m_func_impl)(&arguments, &m_result);
+ }
+ catch(std::exception& e)
+ {
+ std::string error_msg = m_func_impl->m_function_name + " failed : " + std::string(e.what());
+ throw base_s3select_exception(error_msg.data(), base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+ }
}
return m_result.get_value();
@@ -736,6 +769,10 @@ struct _fn_to_int : public base_function
var_result = static_cast<int64_t>(v.dbl());
break;
+ case value::value_En_t::S3NULL:
+ var_result.setnull();
+ break;
+
default:
var_result = v.i64();
break;
@@ -782,6 +819,10 @@ struct _fn_to_float : public base_function
var_result = v.dbl();
break;
+ case value::value_En_t::S3NULL:
+ var_result.setnull();
+ break;
+
default:
var_result = v.i64();
break;
@@ -2017,6 +2058,11 @@ struct _fn_to_bool : public base_function
{
i = func_arg.i64();
}
+ else if (func_arg.type == value::value_En_t::S3NULL)
+ {
+ result->set_null();
+ return true;
+ }
else
{
i = 0;
@@ -2035,12 +2081,14 @@ struct _fn_to_bool : public base_function
struct _fn_trim : public base_function {
+ //TODO base function trim
std::string input_string;
value v_remove;
value v_input;
_fn_trim()
{
+ //default character to remove is blank
v_remove = " ";
}
@@ -2053,13 +2101,16 @@ struct _fn_trim : public base_function {
base_statement* str = *iter;
v_input = str->eval();
if(v_input.type != value::value_En_t::STRING) {
- throw base_s3select_exception("content is not string");
+ throw base_s3select_exception("content type is not a string");
}
input_string = v_input.str();
if (args_size == 2) {
iter++;
base_statement* next = *iter;
v_remove = next->eval();
+ if(v_remove.type != value::value_En_t::STRING) {
+ throw base_s3select_exception("remove type is not a string");
+ }
}
boost::trim_right_if(input_string,boost::is_any_of(v_remove.str()));
boost::trim_left_if(input_string,boost::is_any_of(v_remove.str()));
@@ -2069,13 +2120,13 @@ struct _fn_trim : public base_function {
};
struct _fn_leading : public base_function {
-
std::string input_string;
value v_remove;
value v_input;
_fn_leading()
{
+ //default character to remove is blank
v_remove = " ";
}
@@ -2088,13 +2139,16 @@ struct _fn_leading : public base_function {
base_statement* str = *iter;
v_input = str->eval();
if(v_input.type != value::value_En_t::STRING) {
- throw base_s3select_exception("content is not string");
+ throw base_s3select_exception("content type is not a string");
}
input_string = v_input.str();
if (args_size == 2) {
iter++;
base_statement* next = *iter;
v_remove = next->eval();
+ if(v_remove.type != value::value_En_t::STRING) {
+ throw base_s3select_exception("remove type is not a string");
+ }
}
boost::trim_left_if(input_string,boost::is_any_of(v_remove.str()));
result->set_value(input_string.c_str());
@@ -2110,6 +2164,7 @@ struct _fn_trailing : public base_function {
_fn_trailing()
{
+ //default character to remove is blank
v_remove = " ";
}
@@ -2122,13 +2177,16 @@ struct _fn_trailing : public base_function {
base_statement* str = *iter;
v_input = str->eval();
if(v_input.type != value::value_En_t::STRING) {
- throw base_s3select_exception("content is not string");
+ throw base_s3select_exception("content type is not a string");
}
input_string = v_input.str();
if (args_size == 2) {
iter++;
base_statement* next = *iter;
v_remove = next->eval();
+ if(v_remove.type != value::value_En_t::STRING) {
+ throw base_s3select_exception("remove type is not a string");
+ }
}
boost::trim_right_if(input_string,boost::is_any_of(v_remove.str()));
result->set_value(input_string.c_str());
@@ -2183,7 +2241,8 @@ struct _fn_decimal_operator : public base_function {
iter++;
base_statement* expr_scale = *iter;
value expr_scale_val = expr_scale->eval();
-
+
+ //parser does the type checking
precision = expr_precision_val.i64();
scale = expr_scale_val.i64();
@@ -2195,20 +2254,20 @@ struct _fn_decimal_operator : public base_function {
struct _fn_engine_version : public base_function {
- const char* version_description =R"(PR #137 :
-the change handle the use cases where the JSON input starts with an anonymous array/object this may cause wrong search result per the user request(SQL statement)
-
-handle the use-case where the user requests a json-key-path that may point to a non-discrete value. i.e. array or an object.
-editorial changes.
-
-fix for CSV flow, in the case of a "broken row" (upon processing stream of data)
-
-null results upon aggregation functions on an empty group (no match for where clause).
+ const char* version_description =R"(
+-- trim operator: case insensitive #140
+-- add exception handling to avoid crashes, and produce informative messages instead #141
+-- case-insensitive handling of the is null / is not null predicates. #141
+-- a fix for a missing type check, which caused a crash (trim operator) #142
+-- cast null operations returned false instead of null. #143
+-- adding another way to generate TPCDS data; this method is faster and more efficient, it launches multiple instances of data generators and uses less disk space #145
+-- the scripts use the dsdgen application residing on https://github.com/galsalomon66/tpc-ds-datagen-to-aws-s3
+the whole system resides in a container [ docker pull galsl/fedora_38:tpcds_v2 ] #146
+-- upon logical_operand (and/or) the parser-builder did not use a case-insensitive compare function, resulting in wrong evaluation #147
)";
-
_fn_engine_version()
- {
+  {//aggregate==true means a single result line is returned, even for a multi-row input object
aggregate = true;
}
@@ -2535,6 +2594,17 @@ bool base_statement::is_column_reference() const
return false;
}
+std::string base_statement::get_key_from_projection()
+{
+ variable* v_name = dynamic_cast<variable*>(this);
+
+ if(v_name) {
+ return v_name->get_name();
+ } else {
+ throw base_s3select_exception("key not present");
+ }
+}
+
bool base_statement::is_nested_aggregate(bool &aggr_flow) const
{
if (is_aggregate())
diff --git a/src/s3select/include/s3select_json_parser.h b/src/s3select/include/s3select_json_parser.h
index aa06163f5..4b10bd732 100644
--- a/src/s3select/include/s3select_json_parser.h
+++ b/src/s3select/include/s3select_json_parser.h
@@ -826,4 +826,3 @@ class JsonParserHandler : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
#endif
-
diff --git a/src/s3select/include/s3select_oper.h b/src/s3select/include/s3select_oper.h
index 89544fc1d..67bfec2a7 100644
--- a/src/s3select/include/s3select_oper.h
+++ b/src/s3select/include/s3select_oper.h
@@ -389,7 +389,20 @@ struct binop_modulo
{
throw base_s3select_exception("Mod zero is not allowed");
} else {
- return a % b;
+ return a % b;
+ }
+ }
+};
+
+struct binop_float_modulo
+{
+ double operator()(double a, double b)
+ {
+ if (b == 0)
+ {
+ throw base_s3select_exception("Mod zero is not allowed");
+ } else {
+ return fmod(a, b);
}
}
};
@@ -1098,8 +1111,10 @@ public:
value & operator%(const value &v)
{
- if(v.type == value_En_t::DECIMAL) {
+ if(v.type == value_En_t::DECIMAL && this->type == value_En_t::DECIMAL) {
return compute<binop_modulo>(*this,v);
+ } else if(v.type == value_En_t::FLOAT || this->type == value_En_t::FLOAT) {
+ return compute<binop_float_modulo>(*this,v);
} else {
throw base_s3select_exception("wrong use of modulo operation!");
}
@@ -1478,6 +1493,7 @@ public:
const base_statement* get_aggregate() const;
bool is_nested_aggregate(bool&) const;
bool is_column_reference() const;
+ std::string get_key_from_projection();
bool mark_aggreagtion_subtree_to_execute();
bool is_statement_contain_star_operation() const;
void push_for_cleanup(std::set<base_statement*>&);
@@ -1714,7 +1730,7 @@ public:
virtual bool is_column() const //is reference to column.
{
- if(m_var_type == var_t::VARIABLE_NAME || m_var_type == var_t::POS || m_var_type == var_t::STAR_OPERATION)
+ if(m_var_type == var_t::VARIABLE_NAME || m_var_type == var_t::POS || m_var_type == var_t::STAR_OPERATION || m_var_type == var_t::JSON_VARIABLE)
{
return true;
}
@@ -1817,7 +1833,7 @@ public:
}
}
- }
+ }
if (m_projection_alias)
{
@@ -2375,6 +2391,17 @@ class base_date_diff : public base_function
ptime1 += boost::posix_time::minutes(ts1_td.minutes() * -1);
ptime2 = ts2_ptime + boost::posix_time::hours(ts2_td.hours() * -1);
ptime2 += boost::posix_time::minutes(ts2_td.minutes() * -1);
+
+ try{
+ ptime1.date().year();
+ ptime2.date().year();
+ }
+ catch(std::exception& e)
+ {
+ std::string error_msg = "the input timestamp for the diff operation is not correct : " + std::string(e.what());
+ throw base_s3select_exception(error_msg.data(),base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
}
};
diff --git a/src/s3select/test/s3select_test.cpp b/src/s3select/test/s3select_test.cpp
index 7c372551c..915f14d26 100644
--- a/src/s3select/test/s3select_test.cpp
+++ b/src/s3select/test/s3select_test.cpp
@@ -1,5 +1,6 @@
#include "s3select_test.h"
+
TEST(TestS3SElect, s3select_vs_C)
{
//purpose: validate correct processing of arithmetical expression, it is done by running the same expression
@@ -880,7 +881,10 @@ void test_single_column_single_row(const char* input_query,const char* expected_
{
ASSERT_TRUE(false);
}
- ASSERT_EQ(s3_csv_object.get_error_description(),error_description);
+ if(s3_csv_object.get_error_description().find(error_description) == std::string::npos )
+ {
+ FAIL() << "getting error: " << s3_csv_object.get_error_description() << " instead of: " << error_description << std::endl;
+ }
return;
}
@@ -1730,6 +1734,7 @@ TEST(TestS3selectFunctions, boolcast)
test_single_column_single_row("select cast(0 as bool) from s3object;","false\n");
test_single_column_single_row("select cast(true as bool) from s3object;","true\n");
test_single_column_single_row("select cast('a' as bool) from s3object;","false\n");
+ test_single_column_single_row("select cast(null as bool) from s3object;","null\n");
}
TEST(TestS3selectFunctions, floatcast)
@@ -1737,6 +1742,7 @@ TEST(TestS3selectFunctions, floatcast)
test_single_column_single_row("select cast('1234a' as float) from s3object;","#failure#","extra characters after the number");
test_single_column_single_row("select cast('a1234' as float) from s3object;","#failure#","text cannot be converted to a number");
test_single_column_single_row("select cast('999e+999' as float) from s3object;","#failure#","converted value would fall out of the range of the result type!");
+ test_single_column_single_row("select cast(null as float) from s3object;","null\n");
}
TEST(TestS3selectFunctions, intcast)
@@ -1745,6 +1751,7 @@ TEST(TestS3selectFunctions, intcast)
test_single_column_single_row("select cast('a1234' as int) from s3object;","#failure#","text cannot be converted to a number");
test_single_column_single_row("select cast('9223372036854775808' as int) from s3object;","#failure#","converted value would fall out of the range of the result type!");
test_single_column_single_row("select cast('-9223372036854775809' as int) from s3object;","#failure#","converted value would fall out of the range of the result type!");
+ test_single_column_single_row("select cast(null as int) from s3object;","null\n");
}
TEST(TestS3selectFunctions, predicate_as_projection_column)
@@ -2064,6 +2071,12 @@ TEST(TestS3selectFunctions, mod)
test_single_column_single_row( "select 5%2 from stdin;","1\n");
}
+TEST(TestS3selectFunctions, modfloat)
+{
+test_single_column_single_row( "select 5.2%2 from stdin;","1.2000000000000002\n");
+test_single_column_single_row( "select 5.2%2.5 from stdin;","0.20000000000000018\n");
+}
+
TEST(TestS3selectFunctions, modzero)
{
test_single_column_single_row( "select 0%2 from stdin;","0\n");
@@ -2124,6 +2137,13 @@ TEST(TestS3selectFunctions, isnullnot)
test_single_column_single_row( "select \"true\" from stdin where not nullif(1,2) is null;" ,"true\n");
}
+TEST(TestS3selectFunctions, case_insensitive_not_null)
+{
+test_single_column_single_row( "select \"false\" from stdin where nullif(1,1) is NOT null;" ,"");
+test_single_column_single_row( "select \"false\" from stdin where nullif(1,1) is not Null;" ,"");
+test_single_column_single_row( "select \"true\" from stdin where nullif(1,1) is Null;" ,"true\n");
+}
+
TEST(TestS3selectFunctions, isnull1)
{
test_single_column_single_row( "select \"true\" from stdin where 7 + null is null;" ,"true\n");
@@ -2529,6 +2549,16 @@ TEST(TestS3selectFunctions, trim11)
test_single_column_single_row( "select trim(trailing from trim(leading from \" foobar \")) from stdin ;" ,"foobar\n");
}
+TEST(TestS3selectFunctions, trim12)
+{
+test_single_column_single_row( "select trim(LEADING '1' from '111abcdef111') from s3object ;" ,"abcdef111\n");
+}
+
+TEST(TestS3selectFunctions, trim13)
+{
+test_single_column_single_row( "select trim(TRAILING '1' from '111abcdef111') from s3object ;" ,"111abcdef\n");
+}
+
TEST(TestS3selectFunctions, likescape)
{
test_single_column_single_row("select \"true\" from stdin where \"abc_defgh\" like \"abc$_defgh\" escape \"$\";","true\n");
@@ -3444,3 +3474,327 @@ input_json_data = R"(
}
+
+ TEST(TestS3selectFunctions, json_queries_format)
+{
+ std::string result;
+ std::string expected_result;
+ std::string input_query;
+
+std::string input_json_data = R"(
+ {"root" : [
+ {"c1": 891,"c2": 903,"c3": 78,"c4": 566,"c5": 134,"c6": 121,"c7": 203,"c8": 795,"c9": 82,"c10": 135},
+ {"c1": 218,"c2": 881,"c3": 840,"c4": 385,"c5": 385,"c6": 674,"c7": 618,"c8": 99,"c9": 296,"c10": 545},
+ {"c1": 218,"c2": 881,"c3": 840,"c4": 385,"c5": 385,"c6": 674,"c7": 618,"c8": 99,"c9": 296,"c10": 545}
+ ]
+ }
+ )";
+
+ expected_result=R"({"_1":1327}
+)";
+ input_query = "select sum(_1.c1) from s3object[*].root;";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+ expected_result=R"({"_1":1461}
+)";
+ input_query = "select sum(_1.c1) + min(_1.c5) from s3object[*].root;";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+
+ expected_result=R"({"c1":891}
+{"c1":218}
+{"c1":218}
+)";
+ input_query = "select _1.c1 from s3object[*].root;";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+ expected_result=R"({"_1":218,"_2":903}
+)";
+ input_query = "select min(_1.c1), max(_1.c2) from s3object[*].root;";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+ expected_result=R"({"c1":891,"c2":903}
+{"c1":218,"c2":881}
+{"c1":218,"c2":881}
+)";
+ input_query = "select _1.c1, _1.c2 from s3object[*].root;";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+ expected_result=R"({"c2":903,"c1":891,"c4":566}
+{"c2":881,"c1":218,"c4":385}
+{"c2":881,"c1":218,"c4":385}
+)";
+ input_query = "select _1.c2, _1.c1, _1.c4 from s3object[*].root ;";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+ expected_result=R"({"_1":1794}
+{"_1":1099}
+{"_1":1099}
+)";
+ input_query = "select _1.c2 + _1.c1 from s3object[*].root ;";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+ expected_result=R"({"_1":891}
+{"_1":218}
+{"_1":218}
+)";
+ input_query = "select nullif(_1.c1, _1.c2) from s3object[*].root;";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+ expected_result=R"({"_1":991}
+{"_1":318}
+{"_1":318}
+)";
+ input_query = "select _1.c1 + 100 from s3object[*].root;";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+ expected_result=R"({"c13":null}
+{"c13":null}
+{"c13":null}
+)";
+ input_query = "select _1.c13 from s3object[*].root;";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+ expected_result=R"({"_1":null}
+{"_1":null}
+{"_1":null}
+)";
+ input_query = "select _1.c15 * 2 from s3object[*].root;";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+ expected_result=R"({"_1":null}
+{"_1":null}
+{"_1":null}
+)";
+ input_query = "select _1.c15 + _1.c13 from s3object[*].root;";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+ expected_result=R"({"x":891}
+{"x":218}
+{"x":218}
+)";
+ input_query = "select coalesce(_1.c1, 0) as x from s3object[*].root;";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+ expected_result=R"({"x":891,"c2":903}
+{"x":218,"c2":881}
+{"x":218,"c2":881}
+)";
+ input_query = "select _1.c1 as x, _1.c2 from s3object[*].root;";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+ expected_result=R"({"c2":903,"x":891}
+{"c2":881,"x":218}
+{"c2":881,"x":218}
+)";
+ input_query = "select _1.c2, _1.c1 as x from s3object[*].root;";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+}
+
+TEST(TestS3selectFunctions, json_queries_format_1)
+{
+ std::string result;
+ std::string expected_result;
+ std::string input_query;
+
+ std::string input_json_data = R"(
+{
+"firstName": "Joe",
+"lastName": "Jackson",
+"gender": "male",
+"age": "twenty",
+"address": {
+"streetAddress": "101",
+"city": "San Diego",
+"state": "CA"
+},
+"phoneNumbers": [
+{ "type": "home1", "number": "7349282_1", "addr": 11},
+{ "type": "home2", "number": "7349282_2", "addr": 22},
+{ "type": "home3", "number": "734928_3", "addr": 33},
+{ "type": "home4", "number": "734928_4", "addr": 44},
+{ "type": "home5", "number": "734928_5", "addr": 55},
+{ "type": "home6", "number": "734928_6", "addr": 66},
+{ "type": "home7", "number": "734928_7", "addr": 77}
+]
+}
+)";
+
+ expected_result=R"({"gender":male}
+)";
+ input_query = "select _1.gender from s3object[*] ;";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+ expected_result=R"({"streetAddress":101}
+)";
+ input_query = "select _1.address.streetAddress from s3object[*];";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+ expected_result=R"({"addr":11}
+)";
+ input_query = "select _1.phoneNumbers[0].addr from s3object[*];";
+
+ run_json_query(input_query.c_str(), input_json_data, result, true);
+ ASSERT_EQ(result,expected_result);
+
+}
+
+TEST(TestS3selectFunctions, json_format_csv_object)
+{
+ std::string input_query{};
+ std::string s3select_res{};
+
+ std::string input = R"(383,886,777,915,793,335,386,492,649,421
+362,27,690,59,763,926,540,426,172,736
+211,368,567,429,782,530,862,123,67,135
+929,802,22,58,69,167,393,456,11,42
+229,373,421,919,784,537,198,324,315,370
+413,526,91,980,956,873,862,170,996,281
+305,925,84,327,336,505,846,729,313,857
+124,895,582,545,814,367,434,364,43,750
+87,808,276,178,788,584,403,651,754,399
+932,60,676,368,739,12,226,586,94,539
+)";
+
+ std::string expected_result{};
+
+ expected_result=R"({"_1":383,"_2":886}
+{"_1":362,"_2":27}
+{"_1":211,"_2":368}
+{"_1":929,"_2":802}
+{"_1":229,"_2":373}
+{"_1":413,"_2":526}
+{"_1":305,"_2":925}
+{"_1":124,"_2":895}
+{"_1":87,"_2":808}
+{"_1":932,"_2":60}
+)";
+
+ input_query = "select _1,_2 from s3object;";
+
+ s3select_res = run_s3select(input_query, input, "", true, true);
+
+ ASSERT_EQ(s3select_res, expected_result);
+
+ expected_result=R"({"_1":3975}
+)";
+
+ input_query = "select sum(int(_1)) from s3object;";
+
+ s3select_res = run_s3select(input_query, input, "", true, true);
+
+ ASSERT_EQ(s3select_res, expected_result);
+
+ expected_result=R"({"x":383,"y":886}
+{"x":362,"y":27}
+{"x":211,"y":368}
+{"x":929,"y":802}
+{"x":229,"y":373}
+{"x":413,"y":526}
+{"x":305,"y":925}
+{"x":124,"y":895}
+{"x":87,"y":808}
+{"x":932,"y":60}
+)";
+
+ input_query = "select _1 as x, _2 as y from s3object;";
+
+ s3select_res = run_s3select(input_query, input, "", true, true);
+
+ ASSERT_EQ(s3select_res, expected_result);
+
+ expected_result = R"({"_1":8}
+{"_1":2}
+{"_1":3}
+{"_1":8}
+{"_1":3}
+{"_1":5}
+{"_1":9}
+{"_1":8}
+{"_1":8}
+{"_1":6}
+)";
+
+ input_query = "select substring(_2, 1, 1) from s3object;";
+
+ s3select_res = run_s3select(input_query, input, "", true, true);
+
+ ASSERT_EQ(s3select_res, expected_result);
+
+ expected_result = R"({"x":8}
+{"x":2}
+{"x":3}
+{"x":8}
+{"x":3}
+{"x":5}
+{"x":9}
+{"x":8}
+{"x":8}
+{"x":6}
+)";
+
+ input_query = "select substring(_2, 1, 1) as x from s3object;";
+
+ s3select_res = run_s3select(input_query, input, "", true, true);
+
+ ASSERT_EQ(s3select_res, expected_result);
+
+ expected_result = R"({"c1":383,"_1":385}
+{"c1":362,"_1":364}
+{"c1":211,"_1":213}
+{"c1":929,"_1":931}
+{"c1":229,"_1":231}
+{"c1":413,"_1":415}
+{"c1":305,"_1":307}
+{"c1":124,"_1":126}
+{"c1":87,"_1":89}
+{"c1":932,"_1":934}
+)";
+
+ input_query = "select cast(_1 as int) as c1, c1 + 2 from s3object;";
+
+ s3select_res = run_s3select(input_query, input, "", true, true);
+
+ ASSERT_EQ(s3select_res, expected_result);
+
+}
+
+
+
+
+
+
diff --git a/src/s3select/test/s3select_test.h b/src/s3select/test/s3select_test.h
index 307db8a4b..9e6fe3a12 100644
--- a/src/s3select/test/s3select_test.h
+++ b/src/s3select/test/s3select_test.h
@@ -617,7 +617,7 @@ std::string run_s3select_opserialization_quot(std::string expression,std::string
}
// JSON tests API's
-int run_json_query(const char* json_query, std::string& json_input,std::string& result)
+int run_json_query(const char* json_query, std::string& json_input,std::string& result, bool json_format = false)
{//purpose: run single-chunk json queries
s3select s3select_syntax;
@@ -628,19 +628,27 @@ int run_json_query(const char* json_query, std::string& json_input,std::string&
return -1;
}
- json_object json_query_processor(&s3select_syntax);
+ json_object m_s3_json_object;
+ json_object::csv_definitions json_definitions;
+
+ if(json_format) {
+ json_definitions.output_json_format = true;
+ }
+
+ m_s3_json_object.set_json_query(&s3select_syntax, json_definitions);
+
result.clear();
- status = json_query_processor.run_s3select_on_stream(result, json_input.data(), json_input.size(), json_input.size());
+ status = m_s3_json_object.run_s3select_on_stream(result, json_input.data(), json_input.size(), json_input.size(), json_format);
std::string prev_result = result;
result.clear();
- status = json_query_processor.run_s3select_on_stream(result, 0, 0, json_input.size());
+ status = m_s3_json_object.run_s3select_on_stream(result, 0, 0, json_input.size(), json_format);
result = prev_result + result;
return status;
}
-std::string run_s3select(std::string expression,std::string input, const char* json_query = "")
+std::string run_s3select(std::string expression,std::string input, const char* json_query = "", bool json_format = false, bool csv_json_format = true)
{//purpose: run query on multiple rows and return result(multiple projections).
s3select s3select_syntax;
std::string parquet_input = input;
@@ -654,52 +662,64 @@ std::string run_s3select(std::string expression,std::string input, const char* j
std::string s3select_result;
std::string json_result;
- s3selectEngine::csv_object s3_csv_object(&s3select_syntax);
- s3_csv_object.m_csv_defintion.redundant_column = false;
+
+ csv_object::csv_defintions csv_definitions;
+
+ if(json_format) {
+ csv_definitions.output_json_format = true;
+ }
+
+ csv_definitions.redundant_column = false;
+
+ s3selectEngine::csv_object s3_csv_object;
+ s3_csv_object.set_csv_query(&s3select_syntax, csv_definitions);
s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true);
-#ifdef _ARROW_EXIST
- static int file_no = 1;
- csv_to_parquet(parquet_input);
- std::string parquet_result;
- run_query_on_parquet_file(expression.c_str(),PARQUET_FILENAME,parquet_result);
+ if(!csv_json_format) {
- if (strcmp(parquet_result.c_str(),s3select_result.c_str()))
- {
- std::cout << "failed on query " << expression << std::endl;
- std::cout << "input for query reside on" << "./failed_test_input" << std::to_string(file_no) << ".[csv|parquet]" << std::endl;
+ #ifdef _ARROW_EXIST
+ static int file_no = 1;
+ csv_to_parquet(parquet_input);
+ std::string parquet_result;
+ run_query_on_parquet_file(expression.c_str(),PARQUET_FILENAME,parquet_result);
- {
- std::string buffer;
+ if (strcmp(parquet_result.c_str(),s3select_result.c_str()))
+ {
+ std::cout << "failed on query " << expression << std::endl;
+ std::cout << "input for query reside on" << "./failed_test_input" << std::to_string(file_no) << ".[csv|parquet]" << std::endl;
- std::ifstream f(PARQUET_FILENAME);
- f.seekg(0, std::ios::end);
- buffer.resize(f.tellg());
- f.seekg(0);
- f.read(buffer.data(), buffer.size());
+ {
+ std::string buffer;
- std::string fn = std::string("./failed_test_input_") + std::to_string(file_no) + std::string(".parquet");
- std::ofstream fw(fn.c_str());
- fw.write(buffer.data(), buffer.size());
+ std::ifstream f(PARQUET_FILENAME);
+ f.seekg(0, std::ios::end);
+ buffer.resize(f.tellg());
+ f.seekg(0);
+ f.read(buffer.data(), buffer.size());
- fn = std::string("./failed_test_input_") + std::to_string(file_no++) + std::string(".csv");
- std::ofstream fw2(fn.c_str());
- fw2.write(parquet_input.data(), parquet_input.size());
+ std::string fn = std::string("./failed_test_input_") + std::to_string(file_no) + std::string(".parquet");
+ std::ofstream fw(fn.c_str());
+ fw.write(buffer.data(), buffer.size());
- }
- }
+ fn = std::string("./failed_test_input_") + std::to_string(file_no++) + std::string(".csv");
+ std::ofstream fw2(fn.c_str());
+ fw2.write(parquet_input.data(), parquet_input.size());
- parquet_csv_report_error(parquet_result,s3select_result);
-#endif //_ARROW_EXIST
+ }
+ }
+
+ parquet_csv_report_error(parquet_result,s3select_result);
+ #endif //_ARROW_EXIST
- if(strlen(json_query) == 0) {
- json_query = convert_query(expression);
- }
+ if(strlen(json_query) == 0) {
+ json_query = convert_query(expression);
+ }
- if(strcmp(json_query,JSON_NO_RUN)) {
- run_json_query(json_query, js, json_result);
- json_csv_report_error(json_result, s3select_result);
+ if(strcmp(json_query,JSON_NO_RUN)) {
+ run_json_query(json_query, js, json_result, json_format);
+ json_csv_report_error(json_result, s3select_result);
+ }
}
return s3select_result;