diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/s3select/test/s3select_test.h | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/s3select/test/s3select_test.h')
-rw-r--r-- | src/s3select/test/s3select_test.h | 709 |
1 files changed, 709 insertions, 0 deletions
diff --git a/src/s3select/test/s3select_test.h b/src/s3select/test/s3select_test.h new file mode 100644 index 000000000..307db8a4b --- /dev/null +++ b/src/s3select/test/s3select_test.h @@ -0,0 +1,709 @@ +/* + * /usr/include/boost/bind.hpp:36:1: note: ‘#pragma message: The practice of declaring the Bind placeholders (_1, _2, ...) in the global namespace is deprecated. Please use <boost/bind/bind.hpp> + using namespace boost::placeholders, or define BOOST_BIND_GLOBAL_PLACEHOLDERS to retain the current behavior.’ + */ +#define BOOST_BIND_GLOBAL_PLACEHOLDERS + +#include "s3select.h" +#include "gtest/gtest.h" +#include <string> +#include <fstream> +#include <iomanip> +#include <algorithm> +#include "boost/date_time/gregorian/gregorian.hpp" +#include "boost/date_time/posix_time/posix_time.hpp" + +using namespace s3selectEngine; + +// parquet conversion +// ============================================================ // +#include <cassert> +#include <fstream> +#include <iostream> +#include <memory> + +#ifdef _ARROW_EXIST + +#include <arrow/io/file.h> +#include <arrow/util/logging.h> + +#include <parquet/api/reader.h> +#include <parquet/api/writer.h> + +using parquet::ConvertedType; +using parquet::Repetition; +using parquet::Type; +using parquet::schema::GroupNode; +using parquet::schema::PrimitiveNode; + +#endif + +constexpr int NUM_ROWS = 100000; +constexpr int64_t ROW_GROUP_SIZE = 1024 * 1024; +const char PARQUET_FILENAME[] = "/tmp/csv_converted.parquet"; +#define JSON_NO_RUN "no_run" + +class tokenize { + + public: + const char *s; + std::string input; + const char *p; + bool last_token; + + tokenize(std::string& in):s(0),input(in),p(input.c_str()),last_token(false) + { + }; + + void get_token(std::string& token) + { + if(!*p) + { + token = ""; + last_token = true; + return; + } + + s=p; + while(*p && *p != ',' && *p != '\n') p++; + + token = std::string(s,p); + p++; + } + + bool is_last() + { + return last_token == true; + } +}; + +#ifdef _ARROW_EXIST + +static std::shared_ptr<GroupNode> column_string_2(uint32_t num_of_columns) { + + parquet::schema::NodeVector fields; + + for(uint32_t i=0;i<num_of_columns;i++) + { + std::string column_name = "column_" + std::to_string(i) ; + fields.push_back(PrimitiveNode::Make(column_name, Repetition::OPTIONAL, Type::BYTE_ARRAY, + ConvertedType::NONE)); + } + + return std::static_pointer_cast<GroupNode>( + GroupNode::Make("schema", Repetition::REQUIRED, fields)); +} + +int csv_to_parquet(std::string & csv_object) +{ + + auto csv_num_of_columns = std::count( csv_object.begin(),csv_object.begin() + csv_object.find('\n'),',')+1; + auto csv_num_of_rows = std::count(csv_object.begin(),csv_object.end(),'\n'); + + tokenize csv_tokens(csv_object); + + try { + // Create a local file output stream instance. + + using FileClass = ::arrow::io::FileOutputStream; + std::shared_ptr<FileClass> out_file; + PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME)); + + // Setup the parquet schema + std::shared_ptr<GroupNode> schema = column_string_2(csv_num_of_columns); + + // Add writer properties + parquet::WriterProperties::Builder builder; + // builder.compression(parquet::Compression::SNAPPY); + std::shared_ptr<parquet::WriterProperties> props = builder.build(); + + // Create a ParquetFileWriter instance + std::shared_ptr<parquet::ParquetFileWriter> file_writer = + parquet::ParquetFileWriter::Open(out_file, schema, props); + + // Append a BufferedRowGroup to keep the RowGroup open until a certain size + parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup(); + + int num_columns = file_writer->num_columns(); + std::vector<int64_t> buffered_values_estimate(num_columns, 0); + + for (int i = 0; !csv_tokens.is_last() && i<csv_num_of_rows; i++) { + int64_t estimated_bytes = 0; + // Get the estimated size of the values that are not written to a page yet + for (int n = 0; n < num_columns; n++) { + estimated_bytes += buffered_values_estimate[n]; + } + + // We need to consider the compressed pages + // as well as the values that are not compressed yet + if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() + + estimated_bytes) > ROW_GROUP_SIZE) { + rg_writer->Close(); + std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0); + rg_writer = file_writer->AppendBufferedRowGroup(); + } + + + int col_id; + for(col_id=0;col_id<num_columns && !csv_tokens.is_last();col_id++) + { + // Write the byte-array column + parquet::ByteArrayWriter* ba_writer = + static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id)); + parquet::ByteArray ba_value; + + std::string token; + csv_tokens.get_token(token); + if(token.size() == 0) + {//null column + int16_t definition_level = 0; + ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr); + } + else + { + int16_t definition_level = 1; + ba_value.ptr = (uint8_t*)(token.data()); + ba_value.len = token.size(); + ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value); + } + + buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes(); + + + } //end-for columns + + if(csv_tokens.is_last() && col_id<num_columns) + { + for(;col_id<num_columns;col_id++) + { + parquet::ByteArrayWriter* ba_writer = + static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id)); + + int16_t definition_level = 0; + ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr); + + buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes(); + } + + } + + } // end-for rows + + // Close the RowGroupWriter + rg_writer->Close(); + // Close the ParquetFileWriter + file_writer->Close(); + + // Write the bytes to file + DCHECK(out_file->Close().ok()); + + } catch (const std::exception& e) { + std::cerr << "Parquet write error: " << e.what() << std::endl; + return -1; + } + + return 0; +} + +int run_query_on_parquet_file(const char* input_query, const char* input_file, std::string &result) +{ + int status; + s3select s3select_syntax; + result.clear(); + + status = s3select_syntax.parse_query(input_query); + if (status != 0) + { + std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl; + return -1; + } + + FILE *fp=nullptr; + + fp=fopen(input_file,"r"); + + if(!fp){ + std::cout << "can not open " << input_file << std::endl; + return -1; + } + + std::function<int(void)> fp_get_size=[&]() + { + struct stat l_buf; + lstat(input_file,&l_buf); + return l_buf.st_size; + }; + + std::function<size_t(int64_t,int64_t,void*,optional_yield*)> fp_range_req=[&](int64_t start,int64_t length,void *buff,optional_yield*y) + { + fseek(fp,start,SEEK_SET); + size_t read_sz = fread(buff, 1, length, fp); + return read_sz; + }; + + rgw_s3select_api rgw; + rgw.set_get_size_api(fp_get_size); + rgw.set_range_req_api(fp_range_req); + + std::function<int(std::string&)> fp_s3select_result_format = [](std::string& result){return 0;};//append + std::function<int(std::string&)> fp_s3select_header_format = [](std::string& result){return 0;};//append + + parquet_object parquet_processor(input_file,&s3select_syntax,&rgw); + + //std::string result; + + do + { + try + { + status = parquet_processor.run_s3select_on_object(result,fp_s3select_result_format,fp_s3select_header_format); + } + catch (base_s3select_exception &e) + { + if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution + { + if(fp){ + fclose(fp); + } + return -1; + } + } + + if (status < 0) + break; + + } while (0); + + if(fp){ + fclose(fp); + } + return 0; +}// ============================================================ // +#else +int run_query_on_parquet_file(const char* input_query, const char* input_file, std::string &result) +{ + return 0; +} +#endif //_ARROW_EXIST + +std::string convert_to_json(const char* csv_stream, size_t stream_length) +{ + char* m_stream; + char* m_end_stream; + char row_delimiter('\n'); + char column_delimiter(','); + bool previous{true}; + + m_stream = (char*)csv_stream; + m_end_stream = (char*)csv_stream + stream_length; + std::stringstream ss; + ss << std::endl; + ss << "{\"root\" : ["; + ss << std::endl; + while (m_stream < m_end_stream) { + int counter{}; + ss << "{"; + while( *m_stream && (*m_stream != row_delimiter) ) { + if (*m_stream != column_delimiter && previous) { + ss << "\"c" << ++counter << "\"" << ":"; + ss << "\""; + ss << *m_stream; + previous = false; + } else if (*m_stream != column_delimiter) { + ss << *m_stream; + } else if (*m_stream == column_delimiter) { + if (previous) { + ss << "\"c" << ++counter << "\"" << ":"; + ss << "null"; + } else { + ss << "\""; + } + ss << ","; + previous = true; + } + m_stream++; + } + if(previous) { + ss.seekp(-1, std::ios_base::end); + } else { + ss << "\""; + } + previous = true; + ss << "}" << ',' << std::endl; + m_stream++; + } + ss.seekp(-2, std::ios_base::end); + ss << std::endl; + ss << "]" << "}"; + return ss.str(); +} + +const char* convert_query(std::string& expression) +{ + std::string from_clause = "s3object"; + boost::replace_all(expression, from_clause, "s3object[*].root"); + + std::string from_clause_1 = "stdin"; + boost::replace_all(expression, from_clause_1, "s3object[*].root"); + + std::string col_1 = "_1"; + boost::replace_all(expression, col_1, "_1.c1"); + + std::string col_2 = "_2"; + boost::replace_all(expression, col_2, "_1.c2"); + + std::string col_3 = "_3"; + boost::replace_all(expression, col_3, "_1.c3"); + + std::string col_4 = "_4"; + boost::replace_all(expression, col_4, "_1.c4"); + + std::string col_5 = "_5"; + boost::replace_all(expression, col_5, "_1.c5"); + + std::string col_9 = "_9"; + boost::replace_all(expression, col_9, "_1.c9"); + + return expression.c_str(); +} + + +std::string run_expression_in_C_prog(const char* expression) +{ +//purpose: per use-case a c-file is generated, compiles , and finally executed. + +// side note: its possible to do the following: cat test_hello.c | gcc -pipe -x c - -o /dev/stdout > ./1 +// gcc can read and write from/to pipe (use pipe2()) i.e. not using file-system , BUT should also run gcc-output from memory + + const int C_FILE_SIZE=(1024*1024); + std::string c_test_file = std::string("/tmp/test_s3.c"); + std::string c_run_file = std::string("/tmp/s3test"); + + FILE* fp_c_file = fopen(c_test_file.c_str(), "w"); + + //contain return result + char result_buff[100]; + + char* prog_c = 0; + + if(fp_c_file) + { + prog_c = (char*)malloc(C_FILE_SIZE); + + size_t sz=sprintf(prog_c,"#include <stdio.h>\n \ + #include <float.h>\n \ + int main() \ + {\ + printf(\"%%.*e\\n\",DECIMAL_DIG,(double)(%s));\ + } ", expression); + + fwrite(prog_c, 1, sz, fp_c_file); + fclose(fp_c_file); + } + + std::string gcc_and_run_cmd = std::string("gcc ") + c_test_file + " -o " + c_run_file + " -Wall && " + c_run_file; + + FILE* fp_build = popen(gcc_and_run_cmd.c_str(), "r"); //TODO read stderr from pipe + + if(!fp_build) + { + if(prog_c) + free(prog_c); + + return std::string("#ERROR#"); + } + + char * res = fgets(result_buff, sizeof(result_buff), fp_build); + + if(!res) + { + if(prog_c) + free(prog_c); + + fclose(fp_build); + return std::string("#ERROR#"); + } + + unlink(c_run_file.c_str()); + unlink(c_test_file.c_str()); + fclose(fp_build); + + if(prog_c) + free(prog_c); + + return std::string(result_buff); +} + +#define OPER oper[ rand() % oper.size() ] + +class gen_expr +{ + +private: + + int open = 0; + std::string oper= {"+-+*/*"}; + + std::string gexpr() + { + return std::to_string(rand() % 1000) + ".0" + OPER + std::to_string(rand() % 1000) + ".0"; + } + + std::string g_openp() + { + if ((rand() % 3) == 0) + { + open++; + return std::string("("); + } + return std::string(""); + } + + std::string g_closep() + { + if ((rand() % 2) == 0 && open > 0) + { + open--; + return std::string(")"); + } + return std::string(""); + } + +public: + + std::string generate() + { + std::string exp = ""; + open = 0; + + for (int i = 0; i < 10; i++) + { + exp = (exp.size() > 0 ? exp + OPER : std::string("")) + g_openp() + gexpr() + OPER + gexpr() + g_closep(); + } + + if (open) + for (; open--;) + { + exp += ")"; + } + + return exp; + } +}; + +const std::string failure_sign("#failure#"); + +std::string string_to_quot(std::string& s, char quot = '"') +{ + std::string result = ""; + std::stringstream str_strm; + str_strm << s; + std::string temp_str; + int temp_int; + while(!str_strm.eof()) { + str_strm >> temp_str; + if(std::stringstream(temp_str) >> temp_int) { + std::stringstream s1; + s1 << temp_int; + result += quot + s1.str() + quot + "\n"; + } + temp_str = ""; + } + return result; +} + +void parquet_csv_report_error(std::string parquet_result, std::string csv_result) +{ +#ifdef _ARROW_EXIST + ASSERT_EQ(parquet_result,csv_result); +#else + ASSERT_EQ(0,0); +#endif +} + +void json_csv_report_error(std::string json_result, std::string csv_result) +{ + ASSERT_EQ(json_result, csv_result); +} + +std::string run_s3select(std::string expression) +{//purpose: run query on single row and return result(single projections). + s3select s3select_syntax; + + int status = s3select_syntax.parse_query(expression.c_str()); + + if(status) + return failure_sign; + + std::string s3select_result; + s3selectEngine::csv_object s3_csv_object(&s3select_syntax); + std::string in = "1,1,1,1\n"; + std::string csv_obj = in; + std::string parquet_result; + + s3_csv_object.run_s3select_on_object(s3select_result, in.c_str(), in.size(), false, false, true); + + s3select_result = s3select_result.substr(0, s3select_result.find_first_of(",")); + s3select_result = s3select_result.substr(0, s3select_result.find_first_of("\n"));//remove last \n + +#ifdef _ARROW_EXIST + csv_to_parquet(csv_obj); + run_query_on_parquet_file(expression.c_str(),PARQUET_FILENAME,parquet_result); + parquet_result = parquet_result.substr(0, parquet_result.find_first_of(",")); + parquet_result = parquet_result.substr(0, parquet_result.find_first_of("\n"));//remove last \n + + parquet_csv_report_error(parquet_result,s3select_result); +#endif + + return s3select_result; +} + +void run_s3select_test_opserialization(std::string expression,std::string input, char *row_delimiter, char *column_delimiter) +{//purpose: run query on multiple rows and return result(multiple projections). + s3select s3select_syntax; + + int status = s3select_syntax.parse_query(expression.c_str()); + + if(status) + return; + + std::string s3select_result; + csv_object::csv_defintions csv; + csv.redundant_column = false; + + csv.output_row_delimiter = *row_delimiter; + csv.output_column_delimiter = *column_delimiter; + + s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv); + + s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true); + + std::string s3select_result1 = s3select_result; + + csv.row_delimiter = *row_delimiter; + csv.column_delimiter = *column_delimiter; + csv.output_row_delimiter = *row_delimiter; + csv.output_column_delimiter = *column_delimiter; + csv.redundant_column = false; + std::string s3select_result_second_phase; + + s3selectEngine::csv_object s3_csv_object_second(&s3select_syntax, csv); + + s3_csv_object_second.run_s3select_on_object(s3select_result_second_phase, s3select_result.c_str(), s3select_result.size(), false, false, true); + + ASSERT_EQ(s3select_result_second_phase, s3select_result1); +} + +std::string run_s3select_opserialization_quot(std::string expression,std::string input, bool quot_always = false, char quot_char = '"') +{//purpose: run query on multiple rows and return result(multiple projections). + s3select s3select_syntax; + + int status = s3select_syntax.parse_query(expression.c_str()); + + if(status) + return failure_sign; + + std::string s3select_result; + csv_object::csv_defintions csv; + + csv.redundant_column = false; + csv.quote_fields_always = quot_always; + csv.output_quot_char = quot_char; + + s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv); + + s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true); + + return s3select_result; +} + +// JSON tests API's +int run_json_query(const char* json_query, std::string& json_input,std::string& result) +{//purpose: run single-chunk json queries + + s3select s3select_syntax; + int status = s3select_syntax.parse_query(json_query); + if (status != 0) + { + std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl; + return -1; + } + + json_object json_query_processor(&s3select_syntax); + result.clear(); + status = json_query_processor.run_s3select_on_stream(result, json_input.data(), json_input.size(), json_input.size()); + std::string prev_result = result; + result.clear(); + status = json_query_processor.run_s3select_on_stream(result, 0, 0, json_input.size()); + + result = prev_result + result; + + return status; +} + +std::string run_s3select(std::string expression,std::string input, const char* json_query = "") +{//purpose: run query on multiple rows and return result(multiple projections). + s3select s3select_syntax; + std::string parquet_input = input; + + std::string js = convert_to_json(input.c_str(), input.size()); + + int status = s3select_syntax.parse_query(expression.c_str()); + + if(status) + return failure_sign; + + std::string s3select_result; + std::string json_result; + s3selectEngine::csv_object s3_csv_object(&s3select_syntax); + s3_csv_object.m_csv_defintion.redundant_column = false; + + s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true); + +#ifdef _ARROW_EXIST + static int file_no = 1; + csv_to_parquet(parquet_input); + std::string parquet_result; + run_query_on_parquet_file(expression.c_str(),PARQUET_FILENAME,parquet_result); + + if (strcmp(parquet_result.c_str(),s3select_result.c_str())) + { + std::cout << "failed on query " << expression << std::endl; + std::cout << "input for query reside on" << "./failed_test_input" << std::to_string(file_no) << ".[csv|parquet]" << std::endl; + + { + std::string buffer; + + std::ifstream f(PARQUET_FILENAME); + f.seekg(0, std::ios::end); + buffer.resize(f.tellg()); + f.seekg(0); + f.read(buffer.data(), buffer.size()); + + std::string fn = std::string("./failed_test_input_") + std::to_string(file_no) + std::string(".parquet"); + std::ofstream fw(fn.c_str()); + fw.write(buffer.data(), buffer.size()); + + fn = std::string("./failed_test_input_") + std::to_string(file_no++) + std::string(".csv"); + std::ofstream fw2(fn.c_str()); + fw2.write(parquet_input.data(), parquet_input.size()); + + } + } + + parquet_csv_report_error(parquet_result,s3select_result); +#endif //_ARROW_EXIST + + if(strlen(json_query) == 0) { + json_query = convert_query(expression); + } + + if(strcmp(json_query,JSON_NO_RUN)) { + run_json_query(json_query, js, json_result); + json_csv_report_error(json_result, s3select_result); + } + + return s3select_result; +} + + + |