Diffstat
-rw-r--r--  src/s3select/example/CMakeLists.txt        |  23
-rw-r--r--  src/s3select/example/csv_to_parquet.cpp    | 417
-rwxr-xr-x  src/s3select/example/expr_genrator.py      |   9
-rw-r--r--  src/s3select/example/generate_rand_csv.c   |  28
-rwxr-xr-x  src/s3select/example/parse_csv.py          |  12
-rwxr-xr-x  src/s3select/example/run_test.bash         | 111
-rw-r--r--  src/s3select/example/s3select_example.cpp  | 711
7 files changed, 1311 insertions(+), 0 deletions(-)
diff --git a/src/s3select/example/CMakeLists.txt b/src/s3select/example/CMakeLists.txt
new file mode 100644
index 000000000..8b5c8c070
--- /dev/null
+++ b/src/s3select/example/CMakeLists.txt
@@ -0,0 +1,23 @@
+add_executable(s3select_example s3select_example.cpp)
+target_include_directories(s3select_example PUBLIC ../include ../rapidjson/include)
+
+find_package(Arrow QUIET)
+
+if(Arrow_FOUND)
+ message( "arrow is installed")
+ add_executable(csv_to_parquet csv_to_parquet.cpp)
+ target_include_directories(csv_to_parquet PUBLIC ../include)
+ target_link_libraries(s3select_example boost_date_time boost_system boost_thread parquet arrow boost_filesystem)
+ target_link_libraries(csv_to_parquet boost_date_time boost_system boost_thread parquet arrow)
+else()
+ target_link_libraries(s3select_example boost_date_time boost_system boost_thread boost_filesystem)
+endif()
+
+add_executable(generate_rand_csv generate_rand_csv.c)
+
+add_custom_command(OUTPUT expr_genrator.py COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/expr_genrator.py expr_genrator.py
+ COMMENT "Copy expr_genrator.py"
+ VERBATIM)
+
+add_custom_target(expr_generator ALL DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/expr_genrator.py)
+
diff --git a/src/s3select/example/csv_to_parquet.cpp b/src/s3select/example/csv_to_parquet.cpp
new file mode 100644
index 000000000..37a1ed0f2
--- /dev/null
+++ b/src/s3select/example/csv_to_parquet.cpp
@@ -0,0 +1,417 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cassert>
+#include <fstream>
+#include <iostream>
+#include <memory>
+#include <iomanip>
+#include <algorithm>
+#include "boost/date_time/gregorian/gregorian.hpp"
+#include "boost/date_time/posix_time/posix_time.hpp"
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+
+#include <arrow/io/file.h>
+#include <arrow/util/logging.h>
+
+#include <parquet/api/reader.h>
+#include <parquet/api/writer.h>
+
+using parquet::ConvertedType;
+using parquet::Repetition;
+using parquet::Type;
+using parquet::schema::GroupNode;
+using parquet::schema::PrimitiveNode;
+
+/*
+ * This example demonstrates writing Parquet files in C++ and serves as a
+ * reference to the API.
+ * Every column in the output file is an optional BYTE_ARRAY (string) column
+ * built from the CSV input.
+ * The example uses the RowGroupWriter API and starts a new RowGroup once the
+ * buffered data reaches a certain size.
+ **/
+
+/* Parquet is a structured columnar file format
+ * Parquet File = "Parquet data" + "Parquet Metadata"
+ * "Parquet data" is simply a vector of RowGroups. Each RowGroup is a batch of rows in a
+ * columnar layout
+ * "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and their
+ * Columns
+ * "file schema" is a tree where each node is either a primitive type (leaf nodes) or a
+ * complex (nested) type (internal nodes)
+ * For specific details, please refer to the format specification here:
+ * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+ **/
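+
+// Not part of the original example: a minimal sketch of how the "Parquet
+// Metadata" described above can be inspected once csv_to_parquet() has
+// written a file. It only uses APIs pulled in by parquet/api/reader.h.
+void print_parquet_metadata(const char* path)
+{
+  std::unique_ptr<parquet::ParquetFileReader> reader =
+      parquet::ParquetFileReader::OpenFile(path);
+  std::shared_ptr<parquet::FileMetaData> md = reader->metadata();
+  std::cout << "row groups: " << md->num_row_groups()
+            << ", columns: " << md->num_columns()
+            << ", rows: " << md->num_rows() << std::endl;
+}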
+
+#include <string>
+#include <boost/tokenizer.hpp>
+using namespace boost;
+using namespace std;
+
+//constexpr int NUM_ROWS = 10000000;
+constexpr int NUM_ROWS = 10000;
+
+//constexpr int64_t ROW_GROUP_SIZE = 16 * 1024 * 1024; // 16 MB
+constexpr int64_t ROW_GROUP_SIZE = 1024 * 1024;
+
+const char PARQUET_FILENAME[] = "csv_converted.parquet";
+
+static std::shared_ptr<GroupNode> column_string_2(uint32_t num_of_columns) {
+
+ parquet::schema::NodeVector fields;
+
+ for(uint32_t i=0;i<num_of_columns;i++)
+ {
+ std::string column_name = "column_" + to_string(i) ;
+ fields.push_back(PrimitiveNode::Make(column_name, Repetition::OPTIONAL, Type::BYTE_ARRAY,
+ ConvertedType::NONE));
+ }
+
+ return std::static_pointer_cast<GroupNode>(
+ GroupNode::Make("schema", Repetition::REQUIRED, fields));
+}
+
+
+// Minimal CSV tokenizer: each get_token() call returns the next comma- or
+// newline-separated field from the input buffer.
+class tokenize {
+
+ public:
+ const char *s;
+ std::string input;
+ const char *p;
+ bool last_token;
+
+ tokenize(std::string& in):s(0),input(in),p(input.c_str()),last_token(false)
+ {
+ };
+
+ void get_token(std::string& token)
+ {
+ if(!*p)
+ {
+ token = "";
+ last_token = true;
+ return;
+ }
+
+
+ s=p;
+ while(*p && *p != ',' && *p != '\n') p++;
+
+ token = std::string(s,p);
+ p++;
+ }
+
+ bool is_last()
+ {
+ return last_token == true;
+ }
+};
+
+void generate_rand_columns_csv_datetime(std::string& out, size_t size) {
+ std::stringstream ss;
+ auto year = [](){return rand()%100 + 1900;};
+ auto month = [](){return 1 + rand()%12;};
+ auto day = [](){return 1 + rand()%28;};
+ auto hours = [](){return rand()%24;};
+ auto minutes = [](){return rand()%60;};
+ auto seconds = [](){return rand()%60;};
+
+ for (auto i = 0U; i < size; ++i) {
+ ss << year() << "-" << std::setw(2) << std::setfill('0')<< month() << "-" << std::setw(2) << std::setfill('0')<< day() << "T" <<std::setw(2) << std::setfill('0')<< hours() << ":" << std::setw(2) << std::setfill('0')<< minutes() << ":" << std::setw(2) << std::setfill('0')<<seconds() << "Z" << "," << std::endl;
+ }
+ out = ss.str();
+}
+
+void generate_columns_csv(std::string& out, size_t size) {
+ std::stringstream ss;
+
+ for (auto i = 0U; i < size; ++i) {
+ ss << i << "," << i+1 << "," << i << "," << i << "," << i << "," << i << "," << i << "," << i << "," << i << "," << i << std::endl;
+ }
+ out = ss.str();
+}
+
+void generate_rand_columns_csv_with_null(std::string& out, size_t size) {
+ std::stringstream ss;
+ auto r = [](){ int x=rand()%1000;if (x<100) return std::string(""); else return std::to_string(x);};
+
+ for (auto i = 0U; i < size; ++i) {
+ ss << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << std::endl;
+ }
+ out = ss.str();
+}
+
+void generate_fix_columns_csv(std::string& out, size_t size) {
+ std::stringstream ss;
+ for (auto i = 0U; i < size; ++i) {
+ ss << 1 << "," << 2 << "," << 3 << "," << 4 << "," << 5 << std::endl;
+ }
+ out = ss.str();
+}
+
+void generate_rand_csv_datetime_to_string(std::string& out, std::string& result, size_t size, bool const_frmt = true) {
+
+ std::stringstream ss_out, ss_res;
+ std::string format = "yyyysMMMMMdddSSSSSSSSSSSMMMM HHa:m -:-";
+ std::string months[12] = {"January", "February", "March","April", "May", "June", "July", "August", "September", "October", "November", "December"};
+ auto year = [](){return rand()%100 + 1900;};
+ auto month = [](){return 1 + rand()%12;};
+ auto day = [](){return 1 + rand()%28;};
+ auto hours = [](){return rand()%24;};
+ auto minutes = [](){return rand()%60;};
+ auto seconds = [](){return rand()%60;};
+ auto fraction_sec = [](){return rand()%1000000;};
+
+ for (auto i = 0U; i < size; ++i)
+ {
+ auto yr = year();
+ auto mnth = month();
+ auto dy = day();
+ auto hr = hours();
+ auto mint = minutes();
+ auto sec = seconds();
+ auto frac_sec = fraction_sec();
+
+ if (const_frmt)
+ {
+ ss_out << yr << "-" << std::setw(2) << std::setfill('0') << mnth << "-" << std::setw(2) << std::setfill('0') << dy << "T" <<std::setw(2) << std::setfill('0') << hr << ":" << std::setw(2) << std::setfill('0') << mint << ":" << std::setw(2) << std::setfill('0') <<sec << "." << frac_sec << "Z" << "," << std::endl;
+
+ ss_res << yr << sec << months[mnth-1].substr(0, 1) << std::setw(2) << std::setfill('0') << dy << dy << frac_sec << std::string(11 - std::to_string(frac_sec).length(), '0') << months[mnth-1] << " " << std::setw(2) << std::setfill('0') << hr << (hr < 12 ? "AM" : "PM") << ":" << mint << " -:-" << "," << std::endl;
+ }
+ else
+ {
+ switch(rand()%5)
+ {
+ case 0:
+ format = "yyyysMMMMMdddSSSSSSSSSSSMMMM HHa:m -:-";
+ ss_res << yr << sec << months[mnth-1].substr(0, 1) << std::setw(2) << std::setfill('0') << dy << dy << frac_sec << std::string(11 - std::to_string(frac_sec).length(), '0') << months[mnth-1] << " " << std::setw(2) << std::setfill('0') << hr << (hr < 12 ? "AM" : "PM") << ":" << mint << " -:-" << "," << std::endl;
+ break;
+ case 1:
+ format = "aMMhh";
+ ss_res << (hr < 12 ? "AM" : "PM") << std::setw(2) << std::setfill('0') << mnth << std::setw(2) << std::setfill('0') << (hr%12 == 0 ? 12 : hr%12) << "," << std::endl;
+ break;
+ case 2:
+ format = "y M d ABCDEF";
+ ss_res << yr << " " << mnth << " " << dy << " ABCDEF" << "," << std::endl;
+ break;
+ case 3:
+ format = "W h:MMMM";
+ ss_res << "W " << (hr%12 == 0 ? 12 : hr%12) << ":" << months[mnth-1] << "," << std::endl;
+ break;
+ case 4:
+ format = "H:m:s";
+ ss_res << hr << ":" << mint << ":" << sec << "," << std::endl;
+ break;
+ }
+
+ ss_out << yr << "-" << std::setw(2) << std::setfill('0') << mnth << "-" << std::setw(2) << std::setfill('0') << dy << "T" <<std::setw(2) << std::setfill('0') << hr << ":" << std::setw(2) << std::setfill('0') << mint << ":" << std::setw(2) << std::setfill('0') <<sec << "." << frac_sec << "Z" << "," << format << "," << std::endl;
+ }
+ }
+ out = ss_out.str();
+ result = ss_res.str();
+}
+void generate_rand_columns_csv(std::string& out, size_t size) {
+ std::stringstream ss;
+ auto r = [](){return rand()%1000;};
+
+ for (auto i = 0U; i < size; ++i) {
+ ss << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << std::endl;
+ }
+ out = ss.str();
+}
+
+int csv_to_parquet(std::string & csv_object)
+{
+
+ auto csv_num_of_columns = std::count( csv_object.begin(),csv_object.begin() + csv_object.find('\n'),',')+1;
+ auto csv_num_of_rows = std::count(csv_object.begin(),csv_object.end(),'\n');
+
+ tokenize csv_tokens(csv_object);
+
+ try {
+ // Create a local file output stream instance.
+
+ using FileClass = ::arrow::io::FileOutputStream;
+ std::shared_ptr<FileClass> out_file;
+ PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME));
+
+ // Setup the parquet schema
+ std::shared_ptr<GroupNode> schema = column_string_2(csv_num_of_columns);
+
+ // Add writer properties
+ parquet::WriterProperties::Builder builder;
+ // builder.compression(parquet::Compression::SNAPPY);
+ std::shared_ptr<parquet::WriterProperties> props = builder.build();
+
+ // Create a ParquetFileWriter instance
+ std::shared_ptr<parquet::ParquetFileWriter> file_writer =
+ parquet::ParquetFileWriter::Open(out_file, schema, props);
+
+ // Append a BufferedRowGroup to keep the RowGroup open until a certain size
+ parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup();
+
+ int num_columns = file_writer->num_columns();
+ std::vector<int64_t> buffered_values_estimate(num_columns, 0);
+
+ for (int i = 0; !csv_tokens.is_last() && i<csv_num_of_rows; i++) {
+ int64_t estimated_bytes = 0;
+ // Get the estimated size of the values that are not written to a page yet
+ for (int n = 0; n < num_columns; n++) {
+ estimated_bytes += buffered_values_estimate[n];
+ }
+
+ // We need to consider the compressed pages
+ // as well as the values that are not compressed yet
+ if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() +
+ estimated_bytes) > ROW_GROUP_SIZE) {
+ rg_writer->Close();
+ std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0);
+ rg_writer = file_writer->AppendBufferedRowGroup();
+ }
+
+ int col_id;
+ for(col_id=0;col_id<num_columns && !csv_tokens.is_last();col_id++)
+ {
+ // Write the byte-array column
+ parquet::ByteArrayWriter* ba_writer =
+ static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));
+ parquet::ByteArray ba_value;
+
+ std::string token;
+ csv_tokens.get_token(token);
+ if(token.size() == 0)
+ {//null column
+ int16_t definition_level = 0;
+ ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
+ }
+ else
+ {
+ int16_t definition_level = 1;
+ ba_value.ptr = (uint8_t*)(token.data());
+ ba_value.len = token.size();
+ ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value);
+ }
+
+ buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();
+
+
+ } //end-for columns
+
+ if(csv_tokens.is_last() && col_id<num_columns)
+ {
+ for(;col_id<num_columns;col_id++)
+ {
+ parquet::ByteArrayWriter* ba_writer =
+ static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));
+
+ int16_t definition_level = 0;
+ ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
+
+ buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();
+ }
+
+ }
+
+ } // end-for rows
+
+ // Close the RowGroupWriter
+ rg_writer->Close();
+ // Close the ParquetFileWriter
+ file_writer->Close();
+
+ // Write the bytes to file
+ DCHECK(out_file->Close().ok());
+
+ } catch (const std::exception& e) {
+ std::cerr << "Parquet write error: " << e.what() << std::endl;
+ return -1;
+ }
+
+ return 0;
+}
+
+
+static int csv_file_to_parquet(int argc,char **argv)
+{
+ //open file (CSV) and load into std::string, convert to parquet(save to FS)
+
+ if (argc<2) exit(-1);
+
+ FILE* fp;
+ struct stat l_buf;
+ int st = lstat(argv[1], &l_buf);
+ if(st<0) exit(-1);
+
+ printf("input csv file size = %ld\n",l_buf.st_size);
+
+ char * buffer = new char[ l_buf.st_size ];
+ fp = fopen(argv[1],"r");
+
+ if(!fp) exit(-1);
+
+ size_t read_sz = fread(buffer, 1, l_buf.st_size,fp);
+
+ std::string csv_obj;
+ csv_obj.append(buffer,read_sz);
+
+ csv_to_parquet(csv_obj);
+
+ fclose(fp);
+ delete[] buffer;
+
+ return 0;
+}
+
+int csv_object_to_parquet(int argc,char **argv)
+{
+ srand(time(0));
+
+ std::string csv_obj;
+ std::string expected_result;
+ generate_rand_columns_csv(csv_obj, 128);
+ //generate_rand_csv_datetime_to_string(csv_obj, expected_result, 10000);
+ //generate_rand_columns_csv_with_null(csv_obj, 10000);
+ //generate_columns_csv(csv_obj,128);
+ //generate_rand_columns_csv_datetime(csv_obj,10000);
+ generate_fix_columns_csv(csv_obj,128);
+ FILE *fp = fopen("10k.csv","w");
+
+ if(fp)
+ {
+ fwrite(csv_obj.data(),csv_obj.size(),1,fp);
+ fclose(fp);
+ }
+ else
+ {
+ exit(-1);
+ }
+
+ //csv_obj="1,2,3,4,5,6,7,8,9,10\n10,20,30,40,50,60,70,80,90,100\n";
+ csv_obj="1,2,3,4\n";
+
+ csv_to_parquet(csv_obj);
+
+ return 0;
+}
+
+int main(int argc,char **argv)
+{
+ return csv_file_to_parquet(argc,argv);
+}
+
diff --git a/src/s3select/example/expr_genrator.py b/src/s3select/example/expr_genrator.py
new file mode 100755
index 000000000..5905e9832
--- /dev/null
+++ b/src/s3select/example/expr_genrator.py
@@ -0,0 +1,9 @@
+import random
+import sys
+
+def expr(depth):
+ if depth==1 or random.random()<1.0/(2**depth-1):
+ return str(int(random.random() * 100) + 1)+".0"
+ return '(' + expr(depth-1) + random.choice(['+','-','*','/']) + expr(depth-1) + ')'
+
+print (expr( int(sys.argv[1]) ))
diff --git a/src/s3select/example/generate_rand_csv.c b/src/s3select/example/generate_rand_csv.c
new file mode 100644
index 000000000..67d52adaa
--- /dev/null
+++ b/src/s3select/example/generate_rand_csv.c
@@ -0,0 +1,28 @@
+#include <stdio.h>
+#include <stdlib.h>
+
+
+int main(int argc, char** argv)
+{
+ if (argc<3)
+ {
+ printf("%s <num-of-rows> <num-of-columns> \n", argv[0]);
+ return -1;
+ }
+
+ srand(1234);
+ for(int i=0; i<atoi(argv[1]); i++)
+ {
+ printf("%d,", i);
+ for(int y=0; y<atoi(argv[2]); y++)
+ {
+ printf("%d,", rand()%1000);
+ }
+ printf("\n");
+ }
+
+ return 0;
+}
diff --git a/src/s3select/example/parse_csv.py b/src/s3select/example/parse_csv.py
new file mode 100755
index 000000000..23fe14add
--- /dev/null
+++ b/src/s3select/example/parse_csv.py
@@ -0,0 +1,12 @@
+#!/usr/bin/python3
+
+import csv
+
+with open('stam.txt') as csv_file:
+ csv_reader = csv.reader(csv_file, delimiter=',')
+ line_count = 0
+ for row in csv_reader:
+ #if (int(row[0])==465 and int(row[5])==268): # casting is slower
+ if (row[0]=="465" and row[5]=="268"):
+ print(row)
+
diff --git a/src/s3select/example/run_test.bash b/src/s3select/example/run_test.bash
new file mode 100755
index 000000000..d0b5c18cb
--- /dev/null
+++ b/src/s3select/example/run_test.bash
@@ -0,0 +1,111 @@
+#!/bin/bash
+
+set -e
+
+PREFIX=${1:-"./example"}
+
+## purpose : sanity tests
+
+s3select_calc()
+{
+l="$*"
+res=$( echo 1 | "$PREFIX"/s3select_example -q "select ${l} from stdin;" )
+echo "$res" | sed 's/.$//'
+}
+
+# create a C file containing the expression, compile it, and run it.
+c_calc()
+{
+cat << @@ > "$PREFIX"/tmp.c
+
+#include <stdio.h>
+int main()
+{
+printf("%f\n",$*);
+}
+@@
+gcc -o "$PREFIX"/a.out "$PREFIX"/tmp.c
+"$PREFIX"/a.out
+}
+
+expr_test()
+{
+## test the arithmetic evaluation of s3select against C program
+for i in {1..100}
+do
+ e=$(python3 "$PREFIX"/expr_genrator.py 5)
+ echo expression["$i"]="$e"
+ r1=$(s3select_calc "$e")
+ r2=$(c_calc "$e")
+ echo "$r1" "$r2"
+
+ ## the difference should be zero or very close to zero (s3select is itself a compiled C++ program, so it should match the C reference)
+ res=$(echo "" | awk -v e="$e" -v r1="$r1" -v r2="$r2" 'function abs(n){if (n<0) return -n; else return n;}{if (abs(r1-r2) > 0.00001) {print "MISMATCH result for expression",e;}}')
+ if test "$res" != ""; then
+ echo "$res"
+ exit 1
+ fi
+done
+}
+
+aggregate_test()
+{
+## generate_rand_csv uses a fixed seed, so both invocations below produce identical data
+echo check sum
+s3select_val=$("$PREFIX"/generate_rand_csv 10 10 | "$PREFIX"/s3select_example -q 'select sum(int(_1)) from stdin;')
+awk_val=$("$PREFIX"/generate_rand_csv 10 10 | awk 'BEGIN{FS=",";} {s+=$1;} END{print s;}')
+s3select_val=${s3select_val::-1}
+echo "$s3select_val" "$awk_val"
+if test "$s3select_val" -ne "$awk_val"; then
+ exit 1
+fi
+echo check min
+s3select_val=$("$PREFIX"/generate_rand_csv 10 10 | "$PREFIX"/s3select_example -q 'select min(int(_1)) from stdin;')
+awk_val=$("$PREFIX"/generate_rand_csv 10 10 | awk 'BEGIN{FS=",";min=100000;} {if(min>$1) min=$1;} END{print min;}')
+s3select_val=${s3select_val::-1}
+echo "$s3select_val" "$awk_val"
+if test "$s3select_val" -ne "$awk_val"; then
+ exit 1
+fi
+echo check max
+s3select_val=$("$PREFIX"/generate_rand_csv 10 10 | "$PREFIX"/s3select_example -q 'select max(int(_1)) from stdin;')
+awk_val=$("$PREFIX"/generate_rand_csv 10 10 | awk 'BEGIN{FS=",";max=0;} {if(max<$1) max=$1;} END{print max;}' )
+s3select_val=${s3select_val::-1}
+echo "$s3select_val" "$awk_val"
+if test "$s3select_val" -ne "$awk_val"; then
+ exit 1
+fi
+echo check substr and count
+s3select_val=$("$PREFIX"/generate_rand_csv 10000 10 | "$PREFIX"/s3select_example -q 'select count(int(_1)) from stdin where int(_1)>200 and int(_1)<250;')
+awk_val=$("$PREFIX"/generate_rand_csv 10000 10 | "$PREFIX"/s3select_example -q 'select substring(_1,1,1) from stdin where int(_1)>200 and int(_1)<250;' | uniq -c | awk '{print $1;}')
+s3select_val=${s3select_val::-1}
+echo "$s3select_val" "$awk_val"
+if test "$s3select_val" -ne "$awk_val"; then
+ exit 1
+fi
+}
+
+parquet_test()
+{
+s3select_val=$(${PREFIX}/s3select_example -q "select count(*) from $(realpath parquet_mix_types.parquet) where _1>555 and _1<777;" | grep -v '^\[')
+
+if test "${s3select_val}" != "221,"; then
+ echo "parquet test failed,${s3select_val}"
+# exit
+fi
+
+s3select_val=$(${PREFIX}/s3select_example -q "select c5,c1,int(_1*0+6),int(_3*0+4),substring(c1,int(_1*0+6),int(_3*0+4)) from $(realpath parquet_mix_types.parquet) where ((c1 like \"%wedd%\") and c0 <100 ) and c5 between 2.1000000000000001 and 2.6200000000000001 and c4 between \"col4_1\" and \"col4_2\";" | grep -v '^\[')
+
+echo ${s3select_val}
+}
+
+###############################################################
+
+expr_test
+aggregate_test
+parquet_test
+
+rm "$PREFIX"/tmp.c "$PREFIX"/a.out
+
+exit 0
+
diff --git a/src/s3select/example/s3select_example.cpp b/src/s3select/example/s3select_example.cpp
new file mode 100644
index 000000000..71aff3d01
--- /dev/null
+++ b/src/s3select/example/s3select_example.cpp
@@ -0,0 +1,711 @@
+#include "s3select.h"
+#include <fstream>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <boost/crc.hpp>
+#include <arpa/inet.h>
+#include <boost/filesystem.hpp>
+#include <boost/tokenizer.hpp>
+
+using namespace s3selectEngine;
+using namespace BOOST_SPIRIT_CLASSIC_NS;
+
+class awsCli_handler {
+
+
+//TODO get parameter
+private:
+ std::unique_ptr<s3selectEngine::s3select> s3select_syntax;
+ std::string m_s3select_query;
+ std::string m_result;
+ std::unique_ptr<s3selectEngine::csv_object> m_s3_csv_object;
+ std::string m_column_delimiter;//TODO remove
+ std::string m_quot;//TODO remove
+ std::string m_row_delimiter;//TODO remove
+ std::string m_compression_type;//TODO remove
+ std::string m_escape_char;//TODO remove
+ std::unique_ptr<char[]> m_buff_header;
+ std::string m_header_info;
+ std::string m_sql_query;
+ uint64_t m_total_object_processing_size;
+
+public:
+
+ awsCli_handler():
+ s3select_syntax(std::make_unique<s3selectEngine::s3select>()),
+ m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()),
+ m_buff_header(std::make_unique<char[]>(1000)),
+ m_total_object_processing_size(0),
+ crc32(std::unique_ptr<boost::crc_32_type>())
+ {
+ }
+
+ enum header_name_En
+ {
+ EVENT_TYPE,
+ CONTENT_TYPE,
+ MESSAGE_TYPE
+ };
+ static const char* header_name_str[3];
+
+ enum header_value_En
+ {
+ RECORDS,
+ OCTET_STREAM,
+ EVENT,
+ CONT
+ };
+ static const char* header_value_str[4];
+
+private:
+
+ void encode_short(char *buff, uint16_t s, int &i)
+ {
+ short x = htons(s);
+ memcpy(buff, &x, sizeof(s));
+ i += sizeof(s);
+ }
+
+ void encode_int(char *buff, u_int32_t s, int &i)
+ {
+ u_int32_t x = htonl(s);
+ memcpy(buff, &x, sizeof(s));
+ i += sizeof(s);
+ }
+
+ int create_header_records(char* buff)
+ {
+ int i = 0;
+
+ //1
+ buff[i++] = char(strlen(header_name_str[EVENT_TYPE]));
+ memcpy(&buff[i], header_name_str[EVENT_TYPE], strlen(header_name_str[EVENT_TYPE]));
+ i += strlen(header_name_str[EVENT_TYPE]);
+ buff[i++] = char(7);
+ encode_short(&buff[i], uint16_t(strlen(header_value_str[RECORDS])), i);
+ memcpy(&buff[i], header_value_str[RECORDS], strlen(header_value_str[RECORDS]));
+ i += strlen(header_value_str[RECORDS]);
+
+ //2
+ buff[i++] = char(strlen(header_name_str[CONTENT_TYPE]));
+ memcpy(&buff[i], header_name_str[CONTENT_TYPE], strlen(header_name_str[CONTENT_TYPE]));
+ i += strlen(header_name_str[CONTENT_TYPE]);
+ buff[i++] = char(7);
+ encode_short(&buff[i], uint16_t(strlen(header_value_str[OCTET_STREAM])), i);
+ memcpy(&buff[i], header_value_str[OCTET_STREAM], strlen(header_value_str[OCTET_STREAM]));
+ i += strlen(header_value_str[OCTET_STREAM]);
+
+ //3
+ buff[i++] = char(strlen(header_name_str[MESSAGE_TYPE]));
+ memcpy(&buff[i], header_name_str[MESSAGE_TYPE], strlen(header_name_str[MESSAGE_TYPE]));
+ i += strlen(header_name_str[MESSAGE_TYPE]);
+ buff[i++] = char(7);
+ encode_short(&buff[i], uint16_t(strlen(header_value_str[EVENT])), i);
+ memcpy(&buff[i], header_value_str[EVENT], strlen(header_value_str[EVENT]));
+ i += strlen(header_value_str[EVENT]);
+
+ return i;
+}
+
+ std::unique_ptr<boost::crc_32_type> crc32;
+
+ int create_message(std::string &out_string, u_int32_t result_len, u_int32_t header_len)
+ {
+ u_int32_t total_byte_len = 0;
+ u_int32_t preload_crc = 0;
+ u_int32_t message_crc = 0;
+ int i = 0;
+ char *buff = out_string.data();
+
+ if (crc32 == 0)
+ {
+ // the parameters follow the CRC-32 algorithm and are aligned with the checksum the AWS CLI expects
+ crc32 = std::unique_ptr<boost::crc_32_type>(new boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>);
+ }
+
+ total_byte_len = result_len + 16;
+
+ encode_int(&buff[i], total_byte_len, i);
+ encode_int(&buff[i], header_len, i);
+
+ crc32->reset();
+ *crc32 = std::for_each(buff, buff + 8, *crc32);
+ preload_crc = (*crc32)();
+ encode_int(&buff[i], preload_crc, i);
+
+ i += result_len;
+
+ crc32->reset();
+ *crc32 = std::for_each(buff, buff + i, *crc32);
+ message_crc = (*crc32)();
+
+ int out_encode;
+ encode_int(reinterpret_cast<char*>(&out_encode), message_crc, i);
+ out_string.append(reinterpret_cast<char*>(&out_encode),sizeof(out_encode));
+
+ return i;
+ }
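+
+  // For clarity, the finished message produced above is laid out as:
+  //   bytes 0..3     total message length   (big endian)
+  //   bytes 4..7     headers length         (big endian)
+  //   bytes 8..11    CRC-32 of bytes 0..7   (the prelude)
+  //   bytes 12..N    headers followed by the payload
+  //   final 4 bytes  CRC-32 of everything preceding it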
+
+#define PAYLOAD_LINE "\n<Payload>\n<Records>\n<Payload>\n"
+#define END_PAYLOAD_LINE "\n</Payload></Records></Payload>"
+
+public:
+
+ //std::string get_error_description(){}
+
+ std::string get_result()
+ {
+ return m_result;
+ }
+
+ int run_s3select(const char *query, const char *input, size_t input_length, size_t object_size)
+ {
+ int status = 0;
+ csv_object::csv_defintions csv;
+
+ m_result = "012345678901"; //12 bytes reserved for the prelude (total length, headers length, prelude CRC)
+
+ int header_size = 0;
+
+ if (m_s3_csv_object == 0)
+ {
+ s3select_syntax->parse_query(query);
+
+ if (m_row_delimiter.size())
+ {
+ csv.row_delimiter = *m_row_delimiter.c_str();
+ }
+
+ if (m_column_delimiter.size())
+ {
+ csv.column_delimiter = *m_column_delimiter.c_str();
+ }
+
+ if (m_quot.size())
+ {
+ csv.quot_char = *m_quot.c_str();
+ }
+
+ if (m_escape_char.size())
+ {
+ csv.escape_char = *m_escape_char.c_str();
+ }
+
+ if (m_header_info.compare("IGNORE") == 0)
+ {
+ csv.ignore_header_info = true;
+ }
+ else if (m_header_info.compare("USE") == 0)
+ {
+ csv.use_header_info = true;
+ }
+
+ m_s3_csv_object = std::unique_ptr<s3selectEngine::csv_object>(new s3selectEngine::csv_object(s3select_syntax.get(), csv));
+ }
+
+ if (s3select_syntax->get_error_description().empty() == false)
+ {
+ header_size = create_header_records(m_buff_header.get());
+ m_result.append(m_buff_header.get(), header_size);
+ m_result.append(PAYLOAD_LINE);
+ m_result.append(s3select_syntax->get_error_description());
+ //ldout(s->cct, 10) << "s3-select query: failed to parse query; {" << s3select_syntax->get_error_description() << "}" << dendl;
+ status = -1;
+ }
+ else
+ {
+ header_size = create_header_records(m_buff_header.get());
+ m_result.append(m_buff_header.get(), header_size);
+ m_result.append(PAYLOAD_LINE);
+ //status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, s->obj_size);
+ status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, object_size);
+ if (status < 0)
+ {
+ m_result.append(m_s3_csv_object->get_error_description());
+ }
+ }
+
+ if (m_result.size() > strlen(PAYLOAD_LINE))
+ {
+ m_result.append(END_PAYLOAD_LINE);
+ create_message(m_result, m_result.size() - 12, header_size);
+ //s->formatter->write_bin_data(m_result.data(), buff_len);
+ //if (op_ret < 0)
+ //{
+ // return op_ret;
+ //}
+ }
+ //rgw_flush_formatter_and_reset(s, s->formatter);
+
+ return status;
+ }
+ //int extract_by_tag(std::string tag_name, std::string& result);
+
+ //void convert_escape_seq(std::string& esc);
+
+ //int handle_aws_cli_parameters(std::string& sql_query);
+
+};
+
+const char* awsCli_handler::header_name_str[3] = {":event-type", ":content-type", ":message-type"};
+const char* awsCli_handler::header_value_str[4] = {"Records", "application/octet-stream", "event","cont"};
+int run_on_localFile(char* input_query);
+
+bool is_parquet_file(const char * fn)
+{//differentiate between csv and parquet by the file-name extension
+ const char * ext = "parquet";
+
+ if(strlen(fn) >= strlen(ext) && strstr(fn+strlen(fn)-strlen(ext), ext ))
+ {
+ return true;
+ }
+
+ return false;
+}
+
+#ifdef _ARROW_EXIST
+int run_query_on_parquet_file(const char* input_query, const char* input_file)
+{
+ int status;
+ s3select s3select_syntax;
+
+ status = s3select_syntax.parse_query(input_query);
+ if (status != 0)
+ {
+ std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl;
+ return -1;
+ }
+
+ FILE *fp;
+
+ fp=fopen(input_file,"r");
+
+ if(!fp){
+ std::cout << "can not open " << input_file << std::endl;
+ return -1;
+ }
+
+ std::function<int(void)> fp_get_size=[&]()
+ {
+ struct stat l_buf;
+ lstat(input_file,&l_buf);
+ return l_buf.st_size;
+ };
+
+ std::function<size_t(int64_t,int64_t,void*,optional_yield*)> fp_range_req=[&](int64_t start,int64_t length,void *buff,optional_yield*y)
+ {
+ fseek(fp,start,SEEK_SET);
+ size_t read_sz = fread(buff, 1, length, fp);
+ return read_sz;
+ };
+
+ rgw_s3select_api rgw;
+ rgw.set_get_size_api(fp_get_size);
+ rgw.set_range_req_api(fp_range_req);
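+
+  // The parquet reader pulls the object's bytes through these two callbacks:
+  // fp_get_size reports the total object size and fp_range_req serves ranged
+  // reads, presumably mirroring how RGW exposes an S3 object to the engine.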
+
+ std::function<int(std::string&)> fp_s3select_result_format = [](std::string& result){std::cout << result;result.clear();return 0;};
+ std::function<int(std::string&)> fp_s3select_header_format = [](std::string& result){result="";return 0;};
+ std::function<void(const char*)> fp_debug = [](const char* msg)
+ {
+ std::cout << "DEBUG: {" << msg << "}" << std::endl;
+ };
+
+ parquet_object parquet_processor(input_file,&s3select_syntax,&rgw);
+ //parquet_processor.set_external_debug_system(fp_debug);
+
+ std::string result;
+
+ do
+ {
+ try
+ {
+ status = parquet_processor.run_s3select_on_object(result,fp_s3select_result_format,fp_s3select_header_format);
+ }
+ catch (base_s3select_exception &e)
+ {
+ std::cout << e.what() << std::endl;
+ //m_error_description = e.what();
+ //m_error_count++;
+ if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
+ {
+ return -1;
+ }
+ }
+
+ if(status<0)
+ {
+ std::cout << parquet_processor.get_error_description() << std::endl;
+ break;
+ }
+
+ std::cout << result << std::endl;
+
+ if(status == 2) // limit reached
+ {
+ break;
+ }
+
+ } while (0);
+
+ return 0;
+}
+#else
+int run_query_on_parquet_file(const char* input_query, const char* input_file)
+{
+ std::cout << "arrow is not installed" << std::endl;
+ return 0;
+}
+#endif //_ARROW_EXIST
+
+#define BUFFER_SIZE (4*1024*1024)
+int process_json_query(const char* input_query,const char* fname)
+{//purpose: process json query
+
+ s3select s3select_syntax;
+ int status = s3select_syntax.parse_query(input_query);
+ if (status != 0)
+ {
+ std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl;
+ return -1;
+ }
+
+ std::ifstream input_file_stream;
+ try {
+ input_file_stream = std::ifstream(fname, std::ios::in | std::ios::binary);
+ }
+ catch( ... )
+ {
+ std::cout << "failed to open file " << fname << std::endl;
+ exit(-1);
+ }
+
+ auto object_sz = boost::filesystem::file_size(fname);
+ json_object json_query_processor(&s3select_syntax);
+ std::string buff(BUFFER_SIZE,0);
+ std::string result;
+
+
+ size_t read_sz = input_file_stream.read(buff.data(),BUFFER_SIZE).gcount();
+ int chunk_count=0;
+ size_t bytes_read=0;
+ while(read_sz)
+ {
+ bytes_read += read_sz;
+ std::cout << "read next chunk " << chunk_count++ << ":" << read_sz << ":" << bytes_read << "\r";
+ result.clear();
+
+ try{
+ status = json_query_processor.run_s3select_on_stream(result, buff.data(), read_sz, object_sz);
+ } catch (base_s3select_exception &e)
+ {
+ std::cout << e.what() << std::endl;
+ if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
+ {
+ return -1;
+ }
+ }
+
+ if(result.size())
+ {
+ std::cout << result << std::endl;
+ }
+
+ if(status<0)
+ {
+ std::cout << "failure upon processing " << std::endl;
+ return -1;
+ }
+ if(json_query_processor.is_sql_limit_reached())
+ {
+ std::cout << "json processing reached limit " << std::endl;
+ break;
+ }
+ read_sz = input_file_stream.read(buff.data(),BUFFER_SIZE).gcount();
+ }
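+
+  // A final call with a zero-length chunk signals end-of-stream, which
+  // presumably lets any pending output (e.g. aggregate results) be flushed.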
+ try{
+ result.clear();
+ json_query_processor.run_s3select_on_stream(result, 0, 0, object_sz);
+ } catch (base_s3select_exception &e)
+ {
+ std::cout << e.what() << std::endl;
+ if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
+ {
+ return -1;
+ }
+ }
+
+ std::cout << result << std::endl;
+ return 0;
+}
+
+int run_on_localFile(char* input_query)
+{
+ //purpose: demonstrate the s3select functionality
+ s3select s3select_syntax;
+
+ if (!input_query)
+ {
+ std::cout << "type -q 'select ... from ... '" << std::endl;
+ return -1;
+ }
+
+ int status = s3select_syntax.parse_query(input_query);
+ if (status != 0)
+ {
+ std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl;
+ return -1;
+ }
+
+ std::string object_name = s3select_syntax.get_from_clause();
+
+ if (is_parquet_file(object_name.c_str()))
+ {
+ try {
+ return run_query_on_parquet_file(input_query, object_name.c_str());
+ }
+ catch (base_s3select_exception &e)
+ {
+ std::cout << e.what() << std::endl;
+ if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
+ {
+ return -1;
+ }
+ }
+ }
+
+ FILE* fp = nullptr;
+
+ if (object_name.compare("stdin")==0)
+ {
+ fp = stdin;
+ }
+ else
+ {
+ fp = fopen(object_name.c_str(), "r");
+ }
+
+ if(!fp)
+ {
+ std::cout << "input stream is not valid, aborting" << std::endl;
+ return -1;
+ }
+
+ struct stat statbuf;
+ lstat(object_name.c_str(), &statbuf);
+
+ std::string s3select_result;
+ s3selectEngine::csv_object::csv_defintions csv;
+ csv.use_header_info = false;
+ csv.quote_fields_always=false;
+
+#define CSV_QUOT "CSV_ALWAYS_QUOT"
+#define CSV_COL_DELIM "CSV_COLUMN_DELIMETER"
+#define CSV_ROW_DELIM "CSV_ROW_DELIMITER"
+#define CSV_HEADER_INFO "CSV_HEADER_INFO"
+
+ if(getenv(CSV_QUOT))
+ {
+ csv.quote_fields_always=true;
+ }
+ if(getenv(CSV_COL_DELIM))
+ {
+ csv.column_delimiter=*getenv(CSV_COL_DELIM);
+ }
+ if(getenv(CSV_ROW_DELIM))
+ {
+ csv.row_delimiter=*getenv(CSV_ROW_DELIM);
+ }
+ if(getenv(CSV_HEADER_INFO))
+ {
+ csv.use_header_info = true;
+ }
+
+ s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv);
+
+ std::function<void(const char*)> fp_debug = [](const char* msg)
+ {
+ std::cout << "DEBUG" << msg << std::endl;
+ };
+
+ //s3_csv_object.set_external_debug_system(fp_debug);
+
+#define BUFF_SIZE (1024*1024*4) //simulate 4mb parts in s3 object
+ char* buff = (char*)malloc( BUFF_SIZE );
+ while(1)
+ {
+ buff[0]=0;
+ size_t input_sz = fread(buff, 1, BUFF_SIZE, fp);
+ char* in=buff;
+
+ if (!input_sz)
+ {
+ if(fp == stdin)
+ {
+ status = s3_csv_object.run_s3select_on_stream(s3select_result, nullptr, 0, 0);
+ if(s3select_result.size()>0)
+ {
+ std::cout << s3select_result;
+ }
+ }
+ break;
+ }
+
+ if(fp != stdin)
+ {
+ status = s3_csv_object.run_s3select_on_stream(s3select_result, in, input_sz, statbuf.st_size);
+ }
+ else
+ {
+ status = s3_csv_object.run_s3select_on_stream(s3select_result, in, input_sz, INT_MAX);
+ }
+
+ if(status<0)
+ {
+ std::cout << "failure on execution " << std::endl << s3_csv_object.get_error_description() << std::endl;
+ break;
+ }
+
+ if(s3select_result.size()>0)
+ {
+ std::cout << s3select_result;
+ }
+
+ if(!input_sz || feof(fp) || status == 2)
+ {
+ break;
+ }
+
+ s3select_result.clear();
+ }//end-while
+
+ free(buff);
+ fclose(fp);
+
+ return 0;
+}
+
+int run_on_single_query(const char* fname, const char* query)
+{
+
+ std::unique_ptr<awsCli_handler> awscli = std::make_unique<awsCli_handler>() ;
+ std::ifstream input_file_stream;
+ try {
+ input_file_stream = std::ifstream(fname, std::ios::in | std::ios::binary);
+ }
+ catch( ... )
+ {
+ std::cout << "failed to open file " << fname << std::endl;
+ exit(-1);
+ }
+
+
+ if (is_parquet_file(fname))
+ {
+ std::string result;
+ int status = run_query_on_parquet_file(query, fname);
+ return status;
+ }
+
+ s3select query_ast;
+ auto status = query_ast.parse_query(query);
+ if(status<0)
+ {
+ std::cout << "failed to parse query : " << query_ast.get_error_description() << std::endl;
+ return -1;
+ }
+
+ if(query_ast.is_json_query())
+ {
+ return process_json_query(query,fname);
+ }
+
+
+ auto file_sz = boost::filesystem::file_size(fname);
+
+ std::string buff(BUFFER_SIZE,0);
+ while (1)
+ {
+ size_t read_sz = input_file_stream.read(buff.data(),BUFFER_SIZE).gcount();
+
+ status = awscli->run_s3select(query, buff.data(), read_sz, file_sz);
+ if(status<0)
+ {
+ std::cout << "failure on execution " << std::endl;
+ break;
+ }
+ else
+ {
+ std::cout << awscli->get_result() << std::endl;
+ }
+
+ if(!read_sz || input_file_stream.eof())
+ {
+ break;
+ }
+ }
+
+ return status;
+}
+
+int main(int argc,char **argv)
+{
+ char *query=0;
+ char *fname=0;
+ char *query_file=0;//file contains many queries
+
+ for (int i = 0; i < argc; i++)
+ {
+ if (!strcmp(argv[i], "-key"))
+ {//object received as CLI parameter
+ fname = argv[i + 1];
+ continue;
+ }
+
+ if (!strcmp(argv[i], "-q"))
+ {
+ query = argv[i + 1];
+ continue;
+ }
+
+ if (!strcmp(argv[i], "-cmds"))
+ {//query file contains many queries
+ query_file = argv[i + 1];
+ continue;
+ }
+
+ if (!strcmp(argv[i], "-h") || !strcmp(argv[i], "-help"))
+ {
+ std::cout << "CSV_ALWAYS_QUOT= CSV_COLUMN_DELIMETER= CSV_ROW_DELIMITER= CSV_HEADER_INFO= s3select_example -q \"... query ...\" -key object-path -cmds queries-file" << std::endl;
+ exit(0);
+ }
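+
+    // Hypothetical invocations (not taken from the original source) showing
+    // the flags parsed in this loop and the CSV_* environment variables:
+    //   ./s3select_example -q "select count(*) from stdin;" < input.csv
+    //   ./s3select_example -q "select _1,_2 from /path/to/input.csv;"
+    //   CSV_HEADER_INFO=1 ./s3select_example -q "select _1 from /path/to/input.csv;"
+    //   ./s3select_example -key /path/to/object.csv -cmds queries.txt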
+ }
+
+ if(fname == 0)
+ {//no -key given: the object name is taken from the query's from-clause.
+ return run_on_localFile(query);
+ }
+
+ if(query_file)
+ {
+ //purpose: run many queries (reside in file) on single file.
+ std::fstream f(query_file, std::ios::in | std::ios::binary);
+ const auto sz = boost::filesystem::file_size(query_file);
+ std::string result(sz, '\0');
+ f.read(result.data(), sz);
+ boost::char_separator<char> sep("\n");
+ boost::tokenizer<boost::char_separator<char>> tokens(result, sep);
+
+ for (const auto& t : tokens) {
+ std::cout << t << std::endl;
+ int status = run_on_single_query(fname,t.c_str());
+ std::cout << "status: " << status << std::endl;
+ }
+
+ return(0);
+ }
+
+ int status = run_on_single_query(fname,query);
+ return status;
+}
+