Diffstat
-rw-r--r--  src/s3select/example/CMakeLists.txt          23
-rw-r--r--  src/s3select/example/csv_to_parquet.cpp     417
-rwxr-xr-x  src/s3select/example/expr_genrator.py         9
-rw-r--r--  src/s3select/example/generate_rand_csv.c     28
-rwxr-xr-x  src/s3select/example/parse_csv.py            12
-rwxr-xr-x  src/s3select/example/run_test.bash          111
-rw-r--r--  src/s3select/example/s3select_example.cpp   711
7 files changed, 1311 insertions, 0 deletions
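The bulk of this change is the standalone s3select_example CLI plus helper tools; the new CMakeLists.txt builds them and links Parquet/Arrow only when find_package(Arrow) succeeds. As a quick orientation, the lines below mirror the kind of invocations run_test.bash (further down in this diff) performs; the ./example/ prefix is only that script's default PREFIX and is an assumption about where the binaries land:

echo 1 | ./example/s3select_example -q "select (1.0+2.0)*3.0 from stdin;"
./example/generate_rand_csv 10 10 | ./example/s3select_example -q "select sum(int(_1)) from stdin;"
./example/generate_rand_csv 10 10 | awk 'BEGIN{FS=",";} {s+=$1;} END{print s;}'   # cross-check, as in aggregate_test()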
diff --git a/src/s3select/example/CMakeLists.txt b/src/s3select/example/CMakeLists.txt new file mode 100644 index 000000000..8b5c8c070 --- /dev/null +++ b/src/s3select/example/CMakeLists.txt @@ -0,0 +1,23 @@ +add_executable(s3select_example s3select_example.cpp) +target_include_directories(s3select_example PUBLIC ../include ../rapidjson/include) + +find_package(Arrow QUIET) + +if(Arrow_FOUND) + message( "arrow is installed") + add_executable(csv_to_parquet csv_to_parquet.cpp) + target_include_directories(csv_to_parquet PUBLIC ../include) + target_link_libraries(s3select_example boost_date_time boost_system boost_thread parquet arrow boost_filesystem) + target_link_libraries(csv_to_parquet boost_date_time boost_system boost_thread parquet arrow) +else() + target_link_libraries(s3select_example boost_date_time boost_system boost_thread boost_filesystem) +endif() + +add_executable(generate_rand_csv generate_rand_csv.c) + +add_custom_command(OUTPUT expr_genrator.py COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/expr_genrator.py expr_genrator.py + COMMENT "Copy expr_genrator.py" + VERBATIM) + +add_custom_target(expr_generator ALL DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/expr_genrator.py) + diff --git a/src/s3select/example/csv_to_parquet.cpp b/src/s3select/example/csv_to_parquet.cpp new file mode 100644 index 000000000..37a1ed0f2 --- /dev/null +++ b/src/s3select/example/csv_to_parquet.cpp @@ -0,0 +1,417 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <cassert> +#include <fstream> +#include <iostream> +#include <memory> +#include <iomanip> +#include <algorithm> +#include "boost/date_time/gregorian/gregorian.hpp" +#include "boost/date_time/posix_time/posix_time.hpp" +#include <stdio.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> + + +#include <arrow/io/file.h> +#include <arrow/util/logging.h> + +#include <parquet/api/reader.h> +#include <parquet/api/writer.h> + +using parquet::ConvertedType; +using parquet::Repetition; +using parquet::Type; +using parquet::schema::GroupNode; +using parquet::schema::PrimitiveNode; + +/* + * This example describes writing and reading Parquet Files in C++ and serves as a + * reference to the API. + * The file contains all the physical data types supported by Parquet. + * This example uses the RowGroupWriter API that supports writing RowGroups based on a + *certain size + **/ + +/* Parquet is a structured columnar file format + * Parquet File = "Parquet data" + "Parquet Metadata" + * "Parquet data" is simply a vector of RowGroups. 
Each RowGroup is a batch of rows in a + * columnar layout + * "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and their + * Columns + * "file schema" is a tree where each node is either a primitive type (leaf nodes) or a + * complex (nested) type (internal nodes) + * For specific details, please refer the format here: + * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md + **/ + +#include <string> +#include <boost/tokenizer.hpp> +using namespace boost; +using namespace std; + +//constexpr int NUM_ROWS = 10000000; +constexpr int NUM_ROWS = 10000; + +//constexpr int64_t ROW_GROUP_SIZE = 16 * 1024 * 1024; // 16 MB +constexpr int64_t ROW_GROUP_SIZE = 1024 * 1024; + +const char PARQUET_FILENAME[] = "csv_converted.parquet"; + +static std::shared_ptr<GroupNode> column_string_2(uint32_t num_of_columns) { + + parquet::schema::NodeVector fields; + + for(uint32_t i=0;i<num_of_columns;i++) + { + std::string column_name = "column_" + to_string(i) ; + fields.push_back(PrimitiveNode::Make(column_name, Repetition::OPTIONAL, Type::BYTE_ARRAY, + ConvertedType::NONE)); + } + + return std::static_pointer_cast<GroupNode>( + GroupNode::Make("schema", Repetition::REQUIRED, fields)); +} + + +class tokenize { + + public: + const char *s; + std::string input; + const char *p; + bool last_token; + + tokenize(std::string& in):s(0),input(in),p(input.c_str()),last_token(false) + { + }; + + void get_token(std::string& token) + { + if(!*p) + { + token = ""; + last_token = true; + return; + } + + + s=p; + while(*p && *p != ',' && *p != '\n') p++; + + token = std::string(s,p); + p++; + } + + bool is_last() + { + return last_token == true; + } +}; + +void generate_rand_columns_csv_datetime(std::string& out, size_t size) { + std::stringstream ss; + auto year = [](){return rand()%100 + 1900;}; + auto month = [](){return 1 + rand()%12;}; + auto day = [](){return 1 + rand()%28;}; + auto hours = [](){return rand()%24;}; + auto minutes = [](){return rand()%60;}; + auto seconds = [](){return rand()%60;}; + + for (auto i = 0U; i < size; ++i) { + ss << year() << "-" << std::setw(2) << std::setfill('0')<< month() << "-" << std::setw(2) << std::setfill('0')<< day() << "T" <<std::setw(2) << std::setfill('0')<< hours() << ":" << std::setw(2) << std::setfill('0')<< minutes() << ":" << std::setw(2) << std::setfill('0')<<seconds() << "Z" << "," << std::endl; + } + out = ss.str(); +} + +void generate_columns_csv(std::string& out, size_t size) { + std::stringstream ss; + + for (auto i = 0U; i < size; ++i) { + ss << i << "," << i+1 << "," << i << "," << i << "," << i << "," << i << "," << i << "," << i << "," << i << "," << i << std::endl; + } + out = ss.str(); +} + +void generate_rand_columns_csv_with_null(std::string& out, size_t size) { + std::stringstream ss; + auto r = [](){ int x=rand()%1000;if (x<100) return std::string(""); else return std::to_string(x);}; + + for (auto i = 0U; i < size; ++i) { + ss << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << std::endl; + } + out = ss.str(); +} + +void generate_fix_columns_csv(std::string& out, size_t size) { + std::stringstream ss; + for (auto i = 0U; i < size; ++i) { + ss << 1 << "," << 2 << "," << 3 << "," << 4 << "," << 5 << std::endl; + } + out = ss.str(); +} + +void generate_rand_csv_datetime_to_string(std::string& out, std::string& result, size_t size, bool const_frmt = true) { + + std::stringstream ss_out, ss_res; + std::string format = 
"yyyysMMMMMdddSSSSSSSSSSSMMMM HHa:m -:-"; + std::string months[12] = {"January", "February", "March","April", "May", "June", "July", "August", "September", "October", "November", "December"}; + auto year = [](){return rand()%100 + 1900;}; + auto month = [](){return 1 + rand()%12;}; + auto day = [](){return 1 + rand()%28;}; + auto hours = [](){return rand()%24;}; + auto minutes = [](){return rand()%60;}; + auto seconds = [](){return rand()%60;}; + auto fracation_sec = [](){return rand()%1000000;}; + + for (auto i = 0U; i < size; ++i) + { + auto yr = year(); + auto mnth = month(); + auto dy = day(); + auto hr = hours(); + auto mint = minutes(); + auto sec = seconds(); + auto frac_sec = fracation_sec(); + + if (const_frmt) + { + ss_out << yr << "-" << std::setw(2) << std::setfill('0') << mnth << "-" << std::setw(2) << std::setfill('0') << dy << "T" <<std::setw(2) << std::setfill('0') << hr << ":" << std::setw(2) << std::setfill('0') << mint << ":" << std::setw(2) << std::setfill('0') <<sec << "." << frac_sec << "Z" << "," << std::endl; + + ss_res << yr << sec << months[mnth-1].substr(0, 1) << std::setw(2) << std::setfill('0') << dy << dy << frac_sec << std::string(11 - std::to_string(frac_sec).length(), '0') << months[mnth-1] << " " << std::setw(2) << std::setfill('0') << hr << (hr < 12 ? "AM" : "PM") << ":" << mint << " -:-" << "," << std::endl; + } + else + { + switch(rand()%5) + { + case 0: + format = "yyyysMMMMMdddSSSSSSSSSSSMMMM HHa:m -:-"; + ss_res << yr << sec << months[mnth-1].substr(0, 1) << std::setw(2) << std::setfill('0') << dy << dy << frac_sec << std::string(11 - std::to_string(frac_sec).length(), '0') << months[mnth-1] << " " << std::setw(2) << std::setfill('0') << hr << (hr < 12 ? "AM" : "PM") << ":" << mint << " -:-" << "," << std::endl; + break; + case 1: + format = "aMMhh"; + ss_res << (hr < 12 ? "AM" : "PM") << std::setw(2) << std::setfill('0') << mnth << std::setw(2) << std::setfill('0') << (hr%12 == 0 ? 12 : hr%12) << "," << std::endl; + break; + case 2: + format = "y M d ABCDEF"; + ss_res << yr << " " << mnth << " " << dy << " ABCDEF" << "," << std::endl; + break; + case 3: + format = "W h:MMMM"; + ss_res << "W " << (hr%12 == 0 ? 12 : hr%12) << ":" << months[mnth-1] << "," << std::endl; + break; + case 4: + format = "H:m:s"; + ss_res << hr << ":" << mint << ":" << sec << "," << std::endl; + break; + } + + ss_out << yr << "-" << std::setw(2) << std::setfill('0') << mnth << "-" << std::setw(2) << std::setfill('0') << dy << "T" <<std::setw(2) << std::setfill('0') << hr << ":" << std::setw(2) << std::setfill('0') << mint << ":" << std::setw(2) << std::setfill('0') <<sec << "." << frac_sec << "Z" << "," << format << "," << std::endl; + } + } + out = ss_out.str(); + result = ss_res.str(); +} +void generate_rand_columns_csv(std::string& out, size_t size) { + std::stringstream ss; + auto r = [](){return rand()%1000;}; + + for (auto i = 0U; i < size; ++i) { + ss << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << std::endl; + } + out = ss.str(); +} + +int csv_to_parquet(std::string & csv_object) +{ + + auto csv_num_of_columns = std::count( csv_object.begin(),csv_object.begin() + csv_object.find('\n'),',')+1; + auto csv_num_of_rows = std::count(csv_object.begin(),csv_object.end(),'\n'); + + tokenize csv_tokens(csv_object); + + try { + // Create a local file output stream instance. 
+ + using FileClass = ::arrow::io::FileOutputStream; + std::shared_ptr<FileClass> out_file; + PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME)); + + // Setup the parquet schema + std::shared_ptr<GroupNode> schema = column_string_2(csv_num_of_columns); + + // Add writer properties + parquet::WriterProperties::Builder builder; + // builder.compression(parquet::Compression::SNAPPY); + std::shared_ptr<parquet::WriterProperties> props = builder.build(); + + // Create a ParquetFileWriter instance + std::shared_ptr<parquet::ParquetFileWriter> file_writer = + parquet::ParquetFileWriter::Open(out_file, schema, props); + + // Append a BufferedRowGroup to keep the RowGroup open until a certain size + parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup(); + + int num_columns = file_writer->num_columns(); + std::vector<int64_t> buffered_values_estimate(num_columns, 0); + + for (int i = 0; !csv_tokens.is_last() && i<csv_num_of_rows; i++) { + int64_t estimated_bytes = 0; + // Get the estimated size of the values that are not written to a page yet + for (int n = 0; n < num_columns; n++) { + estimated_bytes += buffered_values_estimate[n]; + } + + // We need to consider the compressed pages + // as well as the values that are not compressed yet + if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() + + estimated_bytes) > ROW_GROUP_SIZE) { + rg_writer->Close(); + std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0); + rg_writer = file_writer->AppendBufferedRowGroup(); + } + + int col_id; + for(col_id=0;col_id<num_columns && !csv_tokens.is_last();col_id++) + { + // Write the byte-array column + parquet::ByteArrayWriter* ba_writer = + static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id)); + parquet::ByteArray ba_value; + + std::string token; + csv_tokens.get_token(token); + if(token.size() == 0) + {//null column + int16_t definition_level = 0; + ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr); + } + else + { + int16_t definition_level = 1; + ba_value.ptr = (uint8_t*)(token.data()); + ba_value.len = token.size(); + ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value); + } + + buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes(); + + + } //end-for columns + + if(csv_tokens.is_last() && col_id<num_columns) + { + for(;col_id<num_columns;col_id++) + { + parquet::ByteArrayWriter* ba_writer = + static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id)); + + int16_t definition_level = 0; + ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr); + + buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes(); + } + + } + + } // end-for rows + + // Close the RowGroupWriter + rg_writer->Close(); + // Close the ParquetFileWriter + file_writer->Close(); + + // Write the bytes to file + DCHECK(out_file->Close().ok()); + + } catch (const std::exception& e) { + std::cerr << "Parquet write error: " << e.what() << std::endl; + return -1; + } + + return 0; +} + + +static int csv_file_to_parquet(int argc,char **argv) +{ + //open file (CSV) and load into std::string, convert to parquet(save to FS) + + if (argc<2) exit(-1); + + FILE* fp; + struct stat l_buf; + int st = lstat(argv[1], &l_buf); + if(st<0) exit(-1); + + printf("input csv file size = %ld\n",l_buf.st_size); + + char * buffer = new char[ l_buf.st_size ]; + fp = fopen(argv[1],"r"); + + if(!fp) exit(-1); + + size_t read_sz = fread(buffer, 1, l_buf.st_size,fp); + + std::string csv_obj; + 
csv_obj.append(buffer,read_sz); + + csv_to_parquet(csv_obj); + + return 0; +} + +int csv_object_to_parquet(int argc,char **argv) +{ + srand(time(0)); + + std::string csv_obj; + std::string expected_result; + generate_rand_columns_csv(csv_obj, 128); + //generate_rand_csv_datetime_to_string(csv_obj, expected_result, 10000); + //generate_rand_columns_csv_with_null(csv_obj, 10000); + //generate_columns_csv(csv_obj,128); + //generate_rand_columns_csv_datetime(csv_obj,10000); + generate_fix_columns_csv(csv_obj,128); + FILE *fp = fopen("10k.csv","w"); + + if(fp) + { + fwrite(csv_obj.data(),csv_obj.size(),1,fp); + fclose(fp); + } + else + { + exit(-1); + } + + //csv_obj="1,2,3,4,5,6,7,8,9,10\n10,20,30,40,50,60,70,80,90,100\n"; + csv_obj="1,2,3,4\n"; + + csv_to_parquet(csv_obj); + + return 0; +} + +int main(int argc,char **argv) +{ + return csv_file_to_parquet(argc,argv); +} + diff --git a/src/s3select/example/expr_genrator.py b/src/s3select/example/expr_genrator.py new file mode 100755 index 000000000..5905e9832 --- /dev/null +++ b/src/s3select/example/expr_genrator.py @@ -0,0 +1,9 @@ +import random +import sys + +def expr(depth): + if depth==1 or random.random()<1.0/(2**depth-1): + return str(int(random.random() * 100) + 1)+".0" + return '(' + expr(depth-1) + random.choice(['+','-','*','/']) + expr(depth-1) + ')' + +print (expr( int(sys.argv[1]) )) diff --git a/src/s3select/example/generate_rand_csv.c b/src/s3select/example/generate_rand_csv.c new file mode 100644 index 000000000..67d52adaa --- /dev/null +++ b/src/s3select/example/generate_rand_csv.c @@ -0,0 +1,28 @@ +#include <stdio.h> +#include <stdlib.h> + + +int main(int argc, char** argv) +{ + if (argc<3) + { + printf("%s <num-of-rows> <num-of-columns> \n", argv[0]); + return -1; + } + + srand(1234); + int line_no=0; + for(int i=0; i<atoi(argv[1]); i++) + { + printf("%d,", i); + for(int y=0; y<atoi(argv[2]); y++) + { + printf("%d,", rand()%1000); + } + printf("\n"); + } + + + + +} diff --git a/src/s3select/example/parse_csv.py b/src/s3select/example/parse_csv.py new file mode 100755 index 000000000..23fe14add --- /dev/null +++ b/src/s3select/example/parse_csv.py @@ -0,0 +1,12 @@ +#!/usr/bin/python + +import csv + +with open('stam.txt') as csv_file: + csv_reader = csv.reader(csv_file, delimiter=',') + line_count = 0 + for row in csv_reader: + #if (int(row[0])==465 and int(row[5])==268): # casting is slower + if (row[0]=="465" and row[5]=="268"): + print row + diff --git a/src/s3select/example/run_test.bash b/src/s3select/example/run_test.bash new file mode 100755 index 000000000..d0b5c18cb --- /dev/null +++ b/src/s3select/example/run_test.bash @@ -0,0 +1,111 @@ +#!/bin/bash + +set -e + +PREFIX=${1:-"./example"} + +## purpose : sanity tests + +s3select_calc() +{ +l="$*" +res=$( echo 1 | "$PREFIX"/s3select_example -q "select ${l} from stdin;" ) +echo "$res" | sed 's/.$//' +} + +# create c file with expression , compile it and run it. 
+c_calc() +{ +cat << @@ > "$PREFIX"/tmp.c + +#include <stdio.h> +int main() +{ +printf("%f\n",$*); +} +@@ +gcc -o "$PREFIX"/a.out "$PREFIX"/tmp.c +"$PREFIX"/a.out +} + +expr_test() +{ +## test the arithmetic evaluation of s3select against C program +for i in {1..100} +do + e=$(python3 "$PREFIX"/expr_genrator.py 5) + echo expression["$i"]="$e" + r1=$(s3select_calc "$e") + r2=$(c_calc "$e") + echo "$r1" "$r2" + + ## should be zero or very close to zero; ( s3select is C compile program ) + res=$(echo "" | awk -v e="$e" -v r1="$r1" -v r2="$r2" 'function abs(n){if (n<0) return -n; else return n;}{if (abs(r1-r2) > 0.00001) {print "MISSMATCH result for expression",e;}}') + if test "$res" != ""; then + echo "$res" + exit 1 + fi +done +} + +aggregate_test() +{ +## generate_rand_csv is generating with the same seed +echo check sum +s3select_val=$("$PREFIX"/generate_rand_csv 10 10 | "$PREFIX"/s3select_example -q 'select sum(int(_1)) from stdin;') +awk_val=$("$PREFIX"/generate_rand_csv 10 10 | awk 'BEGIN{FS=",";} {s+=$1;} END{print s;}') +s3select_val=${s3select_val::-1} +echo "$s3select_val" "$awk_val" +if test "$s3select_val" -ne "$awk_val"; then + exit 1 +fi +echo check min +s3select_val=$("$PREFIX"/generate_rand_csv 10 10 | "$PREFIX"/s3select_example -q 'select min(int(_1)) from stdin;') +awk_val=$("$PREFIX"/generate_rand_csv 10 10 | awk 'BEGIN{FS=",";min=100000;} {if(min>$1) min=$1;} END{print min;}') +s3select_val=${s3select_val::-1} +echo "$s3select_val" "$awk_val" +if test "$s3select_val" -ne "$awk_val"; then + exit 1 +fi +echo check max +s3select_val=$("$PREFIX"/generate_rand_csv 10 10 | "$PREFIX"/s3select_example -q 'select max(int(_1)) from stdin;') +awk_val=$("$PREFIX"/generate_rand_csv 10 10 | awk 'BEGIN{FS=",";max=0;} {if(max<$1) max=$1;} END{print max;}' ) +s3select_val=${s3select_val::-1} +echo "$s3select_val" "$awk_val" +if test "$s3select_val" -ne "$awk_val"; then + exit 1 +fi +echo check substr and count +s3select_val=$("$PREFIX"/generate_rand_csv 10000 10 | "$PREFIX"/s3select_example -q 'select count(int(_1)) from stdin where int(_1)>200 and int(_1)<250;') +awk_val=$("$PREFIX"/generate_rand_csv 10000 10 | "$PREFIX"/s3select_example -q 'select substring(_1,1,1) from stdin where int(_1)>200 and int(_1)<250;' | uniq -c | awk '{print $1;}') +s3select_val=${s3select_val::-1} +echo "$s3select_val" "$awk_val" +if test "$s3select_val" -ne "$awk_val"; then + exit 1 +fi +} + +parquet_test() +{ +s3select_val=$(${PREFIX}/s3select_example -q "select count(*) from $(realpath parquet_mix_types.parquet) where _1>555 and _1<777;" | grep -v '^\[') + +if test "${s3select_val}" != "221,"; then + echo "parquet test failed,${s3select_val}" +# exit +fi + +s3select_val=$(${PREFIX}/s3select_example -q "select c5,c1,int(_1*0+6),int(_3*0+4),substring(c1,int(_1*0+6),int(_3*0+4)) from $(realpath parquet_mix_types.parquet) where ((c1 like \"%wedd%\") and c0 <100 ) and c5 between 2.1000000000000001 and 2.6200000000000001 and c4 between \"col4_1\" and \"col4_2\";" | grep -v '^\[') + +echo ${s3select_val} +} + +############################################################### + +expr_test +aggregate_test +parquet_test + +rm "$PREFIX"/tmp.c "$PREFIX"/a.out + +exit 0 + diff --git a/src/s3select/example/s3select_example.cpp b/src/s3select/example/s3select_example.cpp new file mode 100644 index 000000000..71aff3d01 --- /dev/null +++ b/src/s3select/example/s3select_example.cpp @@ -0,0 +1,711 @@ +#include "s3select.h" +#include <fstream> +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include 
<boost/crc.hpp> +#include <arpa/inet.h> +#include <boost/filesystem.hpp> +#include <boost/tokenizer.hpp> + +using namespace s3selectEngine; +using namespace BOOST_SPIRIT_CLASSIC_NS; + +class awsCli_handler { + + +//TODO get parameter +private: + std::unique_ptr<s3selectEngine::s3select> s3select_syntax; + std::string m_s3select_query; + std::string m_result; + std::unique_ptr<s3selectEngine::csv_object> m_s3_csv_object; + std::string m_column_delimiter;//TODO remove + std::string m_quot;//TODO remove + std::string m_row_delimiter;//TODO remove + std::string m_compression_type;//TODO remove + std::string m_escape_char;//TODO remove + std::unique_ptr<char[]> m_buff_header; + std::string m_header_info; + std::string m_sql_query; + uint64_t m_total_object_processing_size; + +public: + + awsCli_handler(): + s3select_syntax(std::make_unique<s3selectEngine::s3select>()), + m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()), + m_buff_header(std::make_unique<char[]>(1000)), + m_total_object_processing_size(0), + crc32(std::unique_ptr<boost::crc_32_type>()) + { + } + + enum header_name_En + { + EVENT_TYPE, + CONTENT_TYPE, + MESSAGE_TYPE + }; + static const char* header_name_str[3]; + + enum header_value_En + { + RECORDS, + OCTET_STREAM, + EVENT, + CONT + }; + static const char* header_value_str[4]; + +private: + + void encode_short(char *buff, uint16_t s, int &i) + { + short x = htons(s); + memcpy(buff, &x, sizeof(s)); + i += sizeof(s); + } + + void encode_int(char *buff, u_int32_t s, int &i) + { + u_int32_t x = htonl(s); + memcpy(buff, &x, sizeof(s)); + i += sizeof(s); + } + + int create_header_records(char* buff) + { + int i = 0; + + //1 + buff[i++] = char(strlen(header_name_str[EVENT_TYPE])); + memcpy(&buff[i], header_name_str[EVENT_TYPE], strlen(header_name_str[EVENT_TYPE])); + i += strlen(header_name_str[EVENT_TYPE]); + buff[i++] = char(7); + encode_short(&buff[i], uint16_t(strlen(header_value_str[RECORDS])), i); + memcpy(&buff[i], header_value_str[RECORDS], strlen(header_value_str[RECORDS])); + i += strlen(header_value_str[RECORDS]); + + //2 + buff[i++] = char(strlen(header_name_str[CONTENT_TYPE])); + memcpy(&buff[i], header_name_str[CONTENT_TYPE], strlen(header_name_str[CONTENT_TYPE])); + i += strlen(header_name_str[CONTENT_TYPE]); + buff[i++] = char(7); + encode_short(&buff[i], uint16_t(strlen(header_value_str[OCTET_STREAM])), i); + memcpy(&buff[i], header_value_str[OCTET_STREAM], strlen(header_value_str[OCTET_STREAM])); + i += strlen(header_value_str[OCTET_STREAM]); + + //3 + buff[i++] = char(strlen(header_name_str[MESSAGE_TYPE])); + memcpy(&buff[i], header_name_str[MESSAGE_TYPE], strlen(header_name_str[MESSAGE_TYPE])); + i += strlen(header_name_str[MESSAGE_TYPE]); + buff[i++] = char(7); + encode_short(&buff[i], uint16_t(strlen(header_value_str[EVENT])), i); + memcpy(&buff[i], header_value_str[EVENT], strlen(header_value_str[EVENT])); + i += strlen(header_value_str[EVENT]); + + return i; +} + + std::unique_ptr<boost::crc_32_type> crc32; + + int create_message(std::string &out_string, u_int32_t result_len, u_int32_t header_len) + { + u_int32_t total_byte_len = 0; + u_int32_t preload_crc = 0; + u_int32_t message_crc = 0; + int i = 0; + char *buff = out_string.data(); + + if (crc32 == 0) + { + // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum + crc32 = std::unique_ptr<boost::crc_32_type>(new boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>); + } + + total_byte_len = result_len + 16; + + encode_int(&buff[i], 
total_byte_len, i); + encode_int(&buff[i], header_len, i); + + crc32->reset(); + *crc32 = std::for_each(buff, buff + 8, *crc32); + preload_crc = (*crc32)(); + encode_int(&buff[i], preload_crc, i); + + i += result_len; + + crc32->reset(); + *crc32 = std::for_each(buff, buff + i, *crc32); + message_crc = (*crc32)(); + + int out_encode; + encode_int(reinterpret_cast<char*>(&out_encode), message_crc, i); + out_string.append(reinterpret_cast<char*>(&out_encode),sizeof(out_encode)); + + return i; + } + +#define PAYLOAD_LINE "\n<Payload>\n<Records>\n<Payload>\n" +#define END_PAYLOAD_LINE "\n</Payload></Records></Payload>" + +public: + + //std::string get_error_description(){} + + std::string get_result() + { + return m_result; + } + + int run_s3select(const char *query, const char *input, size_t input_length, size_t object_size) + { + int status = 0; + csv_object::csv_defintions csv; + + m_result = "012345678901"; //12 positions for header-crc + + int header_size = 0; + + if (m_s3_csv_object == 0) + { + s3select_syntax->parse_query(query); + + if (m_row_delimiter.size()) + { + csv.row_delimiter = *m_row_delimiter.c_str(); + } + + if (m_column_delimiter.size()) + { + csv.column_delimiter = *m_column_delimiter.c_str(); + } + + if (m_quot.size()) + { + csv.quot_char = *m_quot.c_str(); + } + + if (m_escape_char.size()) + { + csv.escape_char = *m_escape_char.c_str(); + } + + if (m_header_info.compare("IGNORE") == 0) + { + csv.ignore_header_info = true; + } + else if (m_header_info.compare("USE") == 0) + { + csv.use_header_info = true; + } + + m_s3_csv_object = std::unique_ptr<s3selectEngine::csv_object>(new s3selectEngine::csv_object(s3select_syntax.get(), csv)); + } + + if (s3select_syntax->get_error_description().empty() == false) + { + header_size = create_header_records(m_buff_header.get()); + m_result.append(m_buff_header.get(), header_size); + m_result.append(PAYLOAD_LINE); + m_result.append(s3select_syntax->get_error_description()); + //ldout(s->cct, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}" << dendl; + status = -1; + } + else + { + header_size = create_header_records(m_buff_header.get()); + m_result.append(m_buff_header.get(), header_size); + m_result.append(PAYLOAD_LINE); + //status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, s->obj_size); + status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, object_size); + if (status < 0) + { + m_result.append(m_s3_csv_object->get_error_description()); + } + } + + if (m_result.size() > strlen(PAYLOAD_LINE)) + { + m_result.append(END_PAYLOAD_LINE); + create_message(m_result, m_result.size() - 12, header_size); + //s->formatter->write_bin_data(m_result.data(), buff_len); + //if (op_ret < 0) + //{ + // return op_ret; + //} + } + //rgw_flush_formatter_and_reset(s, s->formatter); + + return status; + } + //int extract_by_tag(std::string tag_name, std::string& result); + + //void convert_escape_seq(std::string& esc); + + //int handle_aws_cli_parameters(std::string& sql_query); + +}; + +const char* awsCli_handler::header_name_str[3] = {":event-type", ":content-type", ":message-type"}; +const char* awsCli_handler::header_value_str[4] = {"Records", "application/octet-stream", "event","cont"}; +int run_on_localFile(char* input_query); + +bool is_parquet_file(const char * fn) +{//diffrentiate between csv and parquet + const char * ext = "parquet"; + + if(strstr(fn+strlen(fn)-strlen(ext), ext )) + { + return true; + } + + return false; +} + +#ifdef 
_ARROW_EXIST +int run_query_on_parquet_file(const char* input_query, const char* input_file) +{ + int status; + s3select s3select_syntax; + + status = s3select_syntax.parse_query(input_query); + if (status != 0) + { + std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl; + return -1; + } + + FILE *fp; + + fp=fopen(input_file,"r"); + + if(!fp){ + std::cout << "can not open " << input_file << std::endl; + return -1; + } + + std::function<int(void)> fp_get_size=[&]() + { + struct stat l_buf; + lstat(input_file,&l_buf); + return l_buf.st_size; + }; + + std::function<size_t(int64_t,int64_t,void*,optional_yield*)> fp_range_req=[&](int64_t start,int64_t length,void *buff,optional_yield*y) + { + fseek(fp,start,SEEK_SET); + size_t read_sz = fread(buff, 1, length, fp); + return read_sz; + }; + + rgw_s3select_api rgw; + rgw.set_get_size_api(fp_get_size); + rgw.set_range_req_api(fp_range_req); + + std::function<int(std::string&)> fp_s3select_result_format = [](std::string& result){std::cout << result;result.clear();return 0;}; + std::function<int(std::string&)> fp_s3select_header_format = [](std::string& result){result="";return 0;}; + std::function<void(const char*)> fp_debug = [](const char* msg) + { + std::cout << "DEBUG: {" << msg << "}" << std::endl; + }; + + parquet_object parquet_processor(input_file,&s3select_syntax,&rgw); + //parquet_processor.set_external_debug_system(fp_debug); + + std::string result; + + do + { + try + { + status = parquet_processor.run_s3select_on_object(result,fp_s3select_result_format,fp_s3select_header_format); + } + catch (base_s3select_exception &e) + { + std::cout << e.what() << std::endl; + //m_error_description = e.what(); + //m_error_count++; + if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution + { + return -1; + } + } + + if(status<0) + { + std::cout << parquet_processor.get_error_description() << std::endl; + break; + } + + std::cout << result << std::endl; + + if(status == 2) // limit reached + { + break; + } + + } while (0); + + return 0; +} +#else +int run_query_on_parquet_file(const char* input_query, const char* input_file) +{ + std::cout << "arrow is not installed" << std::endl; + return 0; +} +#endif //_ARROW_EXIST + +#define BUFFER_SIZE (4*1024*1024) +int process_json_query(const char* input_query,const char* fname) +{//purpose: process json query + + s3select s3select_syntax; + int status = s3select_syntax.parse_query(input_query); + if (status != 0) + { + std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl; + return -1; + } + + std::ifstream input_file_stream; + try { + input_file_stream = std::ifstream(fname, std::ios::in | std::ios::binary); + } + catch( ... 
) + { + std::cout << "failed to open file " << fname << std::endl; + exit(-1); + } + + auto object_sz = boost::filesystem::file_size(fname); + json_object json_query_processor(&s3select_syntax); + std::string buff(BUFFER_SIZE,0); + std::string result; + + + size_t read_sz = input_file_stream.read(buff.data(),BUFFER_SIZE).gcount(); + int chunk_count=0; + size_t bytes_read=0; + while(read_sz) + { + bytes_read += read_sz; + std::cout << "read next chunk " << chunk_count++ << ":" << read_sz << ":" << bytes_read << "\r"; + result.clear(); + + try{ + status = json_query_processor.run_s3select_on_stream(result, buff.data(), read_sz, object_sz); + } catch (base_s3select_exception &e) + { + std::cout << e.what() << std::endl; + if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution + { + return -1; + } + } + + if(result.size()) + { + std::cout << result << std::endl; + } + + if(status<0) + { + std::cout << "failure upon processing " << std::endl; + return -1; + } + if(json_query_processor.is_sql_limit_reached()) + { + std::cout << "json processing reached limit " << std::endl; + break; + } + read_sz = input_file_stream.read(buff.data(),BUFFER_SIZE).gcount(); + } + try{ + result.clear(); + json_query_processor.run_s3select_on_stream(result, 0, 0, object_sz); + } catch (base_s3select_exception &e) + { + std::cout << e.what() << std::endl; + if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution + { + return -1; + } + } + + std::cout << result << std::endl; + return 0; +} + +int run_on_localFile(char* input_query) +{ + //purpose: demostrate the s3select functionalities + s3select s3select_syntax; + + if (!input_query) + { + std::cout << "type -q 'select ... from ... '" << std::endl; + return -1; + } + + int status = s3select_syntax.parse_query(input_query); + if (status != 0) + { + std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl; + return -1; + } + + std::string object_name = s3select_syntax.get_from_clause(); + + if (is_parquet_file(object_name.c_str())) + { + try { + return run_query_on_parquet_file(input_query, object_name.c_str()); + } + catch (base_s3select_exception &e) + { + std::cout << e.what() << std::endl; + if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution + { + return -1; + } + } + } + + FILE* fp = nullptr; + + if (object_name.compare("stdin")==0) + { + fp = stdin; + } + else + { + fp = fopen(object_name.c_str(), "r"); + } + + if(!fp) + { + std::cout << " input stream is not valid, abort;" << std::endl; + return -1; + } + + struct stat statbuf; + lstat(object_name.c_str(), &statbuf); + + std::string s3select_result; + s3selectEngine::csv_object::csv_defintions csv; + csv.use_header_info = false; + csv.quote_fields_always=false; + +#define CSV_QUOT "CSV_ALWAYS_QUOT" +#define CSV_COL_DELIM "CSV_COLUMN_DELIMETER" +#define CSV_ROW_DELIM "CSV_ROW_DELIMITER" +#define CSV_HEADER_INFO "CSV_HEADER_INFO" + + if(getenv(CSV_QUOT)) + { + csv.quote_fields_always=true; + } + if(getenv(CSV_COL_DELIM)) + { + csv.column_delimiter=*getenv(CSV_COL_DELIM); + } + if(getenv(CSV_ROW_DELIM)) + { + csv.row_delimiter=*getenv(CSV_ROW_DELIM); + } + if(getenv(CSV_HEADER_INFO)) + { + csv.use_header_info = true; + } + + s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv); + + std::function<void(const char*)> fp_debug = [](const char* msg) + { + std::cout << "DEBUG" << msg << std::endl; + }; + + 
//s3_csv_object.set_external_debug_system(fp_debug); + +#define BUFF_SIZE (1024*1024*4) //simulate 4mb parts in s3 object + char* buff = (char*)malloc( BUFF_SIZE ); + while(1) + { + buff[0]=0; + size_t input_sz = fread(buff, 1, BUFF_SIZE, fp); + char* in=buff; + + if (!input_sz) + { + if(fp == stdin) + { + status = s3_csv_object.run_s3select_on_stream(s3select_result, nullptr, 0, 0); + if(s3select_result.size()>0) + { + std::cout << s3select_result; + } + } + break; + } + + if(fp != stdin) + { + status = s3_csv_object.run_s3select_on_stream(s3select_result, in, input_sz, statbuf.st_size); + } + else + { + status = s3_csv_object.run_s3select_on_stream(s3select_result, in, input_sz, INT_MAX); + } + + if(status<0) + { + std::cout << "failure on execution " << std::endl << s3_csv_object.get_error_description() << std::endl; + break; + } + + if(s3select_result.size()>0) + { + std::cout << s3select_result; + } + + if(!input_sz || feof(fp) || status == 2) + { + break; + } + + s3select_result.clear(); + }//end-while + + free(buff); + fclose(fp); + + return 0; +} + +int run_on_single_query(const char* fname, const char* query) +{ + + std::unique_ptr<awsCli_handler> awscli = std::make_unique<awsCli_handler>() ; + std::ifstream input_file_stream; + try { + input_file_stream = std::ifstream(fname, std::ios::in | std::ios::binary); + } + catch( ... ) + { + std::cout << "failed to open file " << fname << std::endl; + exit(-1); + } + + + if (is_parquet_file(fname)) + { + std::string result; + int status = run_query_on_parquet_file(query, fname); + return status; + } + + s3select query_ast; + auto status = query_ast.parse_query(query); + if(status<0) + { + std::cout << "failed to parse query : " << query_ast.get_error_description() << std::endl; + return -1; + } + + if(query_ast.is_json_query()) + { + return process_json_query(query,fname); + } + + + auto file_sz = boost::filesystem::file_size(fname); + + std::string buff(BUFFER_SIZE,0); + while (1) + { + size_t read_sz = input_file_stream.read(buff.data(),BUFFER_SIZE).gcount(); + + status = awscli->run_s3select(query, buff.data(), read_sz, file_sz); + if(status<0) + { + std::cout << "failure on execution " << std::endl; + break; + } + else + { + std::cout << awscli->get_result() << std::endl; + } + + if(!read_sz || input_file_stream.eof()) + { + break; + } + } + + return status; +} + +int main(int argc,char **argv) +{ + char *query=0; + char *fname=0; + char *query_file=0;//file contains many queries + + for (int i = 0; i < argc; i++) + { + if (!strcmp(argv[i], "-key")) + {//object recieved as CLI parameter + fname = argv[i + 1]; + continue; + } + + if (!strcmp(argv[i], "-q")) + { + query = argv[i + 1]; + continue; + } + + if (!strcmp(argv[i], "-cmds")) + {//query file contain many queries + query_file = argv[i + 1]; + continue; + } + + if (!strcmp(argv[i], "-h") || !strcmp(argv[i], "-help")) + { + std::cout << "CSV_ALWAYS_QUOT= CSV_COLUMN_DELIMETER= CSV_ROW_DELIMITER= CSV_HEADER_INFO= s3select_example -q \"... query ...\" -key object-path -cmds queries-file" << std::endl; + exit(0); + } + } + + if(fname == 0) + {//object is in query explicitly. + return run_on_localFile(query); + } + + if(query_file) + { + //purpose: run many queries (reside in file) on single file. 
std::fstream f(query_file, std::ios::in | std::ios::binary);
+    const auto sz = boost::filesystem::file_size(query_file);
+    std::string result(sz, '\0');
+    f.read(result.data(), sz);
+    boost::char_separator<char> sep("\n");
+    boost::tokenizer<boost::char_separator<char>> tokens(result, sep);
+
+    for (const auto& t : tokens) {
+      std::cout << t << std::endl;
+      int status = run_on_single_query(fname,t.c_str());
+      std::cout << "status: " << status << std::endl;
+    }
+
+    return(0);
+  }
+
+  int status = run_on_single_query(fname,query);
+  return status;
+}
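For reference, the command line accepted by s3select_example's main() above: -q takes an inline query, -key an object path, and -cmds a file holding one query per line, while CSV parsing is tuned through the CSV_* environment variables checked in run_on_localFile(). A hedged usage sketch — the file paths are placeholders, everything else comes from the code in this diff:

./s3select_example -q "select count(*) from /tmp/data.csv where int(_1) > 100;"
CSV_COLUMN_DELIMETER='|' CSV_HEADER_INFO=1 ./s3select_example -q "select _1 from /tmp/data.psv;"
./s3select_example -key /tmp/data.csv -cmds /tmp/queries.txt   # run every query in the file against one object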