diff options
Diffstat (limited to '')
-rw-r--r-- | src/s3select/include/s3select.h | 271 |
1 files changed, 218 insertions, 53 deletions
diff --git a/src/s3select/include/s3select.h b/src/s3select/include/s3select.h index 3ac111351..667c92ba9 100644 --- a/src/s3select/include/s3select.h +++ b/src/s3select/include/s3select.h @@ -18,6 +18,7 @@ #include <boost/function.hpp> #include <boost/bind.hpp> #include <functional> +#include <unordered_set> #define _DEBUG_TERM {string token(a,b);std::cout << __FUNCTION__ << token << std::endl;} @@ -1293,11 +1294,11 @@ void push_logical_operator::builder(s3select* self, const char* a, const char* b std::string token(a, b); logical_operand::oplog_t l = logical_operand::oplog_t::NA; - if (token == "and") + if (boost::iequals(token,"and")) { l = logical_operand::oplog_t::AND; } - else if (token == "or") + else if (boost::iequals(token,"or")) { l = logical_operand::oplog_t::OR; } @@ -1633,8 +1634,10 @@ void push_like_predicate_escape::builder(s3select* self, const char* a, const ch void push_is_null_predicate::builder(s3select* self, const char* a, const char* b) const { - //expression is null, is not null + //expression could be is null OR is not null std::string token(a, b); + //to_lower enable case insensitive + boost::algorithm::to_lower(token); bool is_null = true; for(size_t i=0;i<token.size();i++) @@ -1936,7 +1939,7 @@ void push_trim_type::builder(s3select* self, const char* a, const char* b) const { std::string token(a, b); - auto trim_option = [&](const char *s){return strncmp(a,s,strlen(s))==0;}; + auto trim_option = [&](const char *s){return strncasecmp(a,s,strlen(s))==0;}; if(trim_option("leading")) { @@ -2156,10 +2159,12 @@ struct s3select_csv_definitions //TODO bool quote_fields_asneeded; bool redundant_column; bool comment_empty_lines; + bool output_json_format; std::vector<char> comment_chars; std::vector<char> trim_chars; + std::string schema; - s3select_csv_definitions():row_delimiter('\n'), column_delimiter(','), output_row_delimiter('\n'), output_column_delimiter(','), escape_char('\\'), output_escape_char('\\'), output_quot_char('"'), quot_char('"'), use_header_info(false), ignore_header_info(false), quote_fields_always(false), quote_fields_asneeded(false), redundant_column(false), comment_empty_lines(false) {} + s3select_csv_definitions():row_delimiter('\n'), column_delimiter(','), output_row_delimiter('\n'), output_column_delimiter(','), escape_char('\\'), output_escape_char('\\'), output_quot_char('"'), quot_char('"'), use_header_info(false), ignore_header_info(false), quote_fields_always(false), quote_fields_asneeded(false), redundant_column(false), comment_empty_lines(false), output_json_format(false) {} }; @@ -2172,6 +2177,8 @@ protected: scratch_area* m_sa; std::string m_obj_name; bool m_aggr_flow = false; //TODO once per query + bool is_star = false; + bool is_json = false; bool m_is_to_aggregate; std::vector<base_statement*> m_projections; base_statement* m_where_clause; @@ -2182,9 +2189,11 @@ protected: unsigned long m_processed_rows; size_t m_returned_bytes_size; std::function<void(const char*)> fp_ext_debug_mesg;//dispache debug message into external system + std::vector<std::string> m_projection_keys{}; public: s3select_csv_definitions m_csv_defintion;//TODO add method for modify + std::string m_error_description; enum class Status { END_OF_STREAM, @@ -2196,6 +2205,16 @@ public: Status m_sql_processing_status; + void set_processing_time_error() + { + m_sql_processing_status = Status::SQL_ERROR; + } + + bool is_processing_time_error() + { + return m_sql_processing_status == Status::SQL_ERROR; + } + Status get_sql_processing_status() { return m_sql_processing_status; @@ -2206,6 +2225,60 @@ public: return m_sql_processing_status == Status::LIMIT_REACHED; } + void set_star_true() { + is_star = true; + } + + void set_projection_keys(std::vector<base_statement*> m_projections) + { + std::vector<std::string> alias_values{}; + std::unordered_set<base_statement*> alias_projection_keys{}; + bool is_output_json_format = m_csv_defintion.output_json_format; + + for (auto& a : *m_s3_select->get_aliases()->get()) + { + alias_values.push_back(a.first); + alias_projection_keys.insert(a.second); + } + + size_t m_alias_index = 0; + int index_json_projection = 0; + is_json = m_s3_select->is_json_query(); + + for (auto& p : m_projections) + { + if(p->is_statement_contain_star_operation()) + { + set_star_true(); + } + p->traverse_and_apply(m_sa, m_s3_select->get_aliases(), m_s3_select->is_json_query()); + + std::string key_from_projection{}; + if(p->is_column()){ + key_from_projection = p->get_key_from_projection(); + } + + if(alias_projection_keys.count(p) == 0 && p->is_column()) { + m_projection_keys.push_back(key_from_projection); + } else if(alias_projection_keys.count(p) > 0 && p->is_column()) { + m_projection_keys.push_back(alias_values[m_alias_index++]); + } else if(!p->is_column() && is_output_json_format && alias_projection_keys.count(p) > 0 ) { + m_projection_keys.push_back(alias_values[m_alias_index++]); + } else if(!p->is_column() && is_output_json_format && alias_projection_keys.count(p) == 0) { + std::string index_json = "_" + std::to_string(++index_json_projection); + m_projection_keys.push_back(index_json); + } + } + + if(m_s3_select->is_json_query()) { + for(auto& k: m_projection_keys) { + size_t lastDotPosition = k.find_last_of('.'); + std::string extractedPart = k.substr(lastDotPosition + 1); + k = extractedPart; + } + } + } + void set_base_defintions(s3select* m) { if(m_s3_select || !m) @@ -2225,10 +2298,8 @@ public: m_where_clause->traverse_and_apply(m_sa, m_s3_select->get_aliases(), m_s3_select->is_json_query()); } - for (auto& p : m_projections) - { - p->traverse_and_apply(m_sa, m_s3_select->get_aliases(), m_s3_select->is_json_query()); - } + set_projection_keys(m_projections); + m_is_to_aggregate = true;//TODO not correct. should be set upon end-of-stream m_aggr_flow = m_s3_select->is_aggregate_query(); @@ -2269,45 +2340,95 @@ public: return m_returned_bytes_size; } - void result_values_to_string(multi_values& projections_resuls, std::string& result) + void json_result_format(multi_values& projections_results, std::string& result, std::string& output_delimiter) { - size_t i = 0; + result += "{"; + int j = 0; + for (size_t i = 0; i < projections_results.values.size(); ++i) + { + auto& res = projections_results.values[i]; + std::string label = "_"; + label += std::to_string(i + 1); + + if (i > 0) { + result += output_delimiter; + } + + if(!is_star) { + result += "\"" + m_projection_keys[j] + "\":"; + } else if(is_star && !is_json) { + result += "\"" + label + "\":"; + } + + result.append(res->to_string()); + m_returned_bytes_size += strlen(res->to_string()); + ++j; + } + result += "}"; + + } + + + void result_values_to_string(multi_values& projections_resuls, std::string& result) +{ std::string output_delimiter(1,m_csv_defintion.output_column_delimiter); std::string output_row_delimiter(1,m_csv_defintion.output_row_delimiter); + if(m_csv_defintion.output_json_format && projections_resuls.values.size()) { + json_result_format(projections_resuls, result, output_delimiter); + result.append(output_row_delimiter); + return; + } + + size_t i = 0; for(auto& res : projections_resuls.values) { + + std::string column_result; + + try{ + column_result = res->to_string(); + } + catch(std::exception& e) + { + column_result = "{failed to compute projection: " + std::string(e.what()) + "}"; + m_error_description = column_result; + set_processing_time_error(); + } + + if(fp_ext_debug_mesg) - fp_ext_debug_mesg( res->to_string() ); + fp_ext_debug_mesg(column_result.data()); if (m_csv_defintion.quote_fields_always) { std::ostringstream quoted_result; - quoted_result << std::quoted(res->to_string(),m_csv_defintion.output_quot_char, m_csv_defintion.escape_char); + quoted_result << std::quoted(column_result,m_csv_defintion.output_quot_char, m_csv_defintion.escape_char); result.append(quoted_result.str()); + m_returned_bytes_size += quoted_result.str().size(); - }//TODO to add asneeded + }//TODO to add asneeded else { - result.append(res->to_string()); - m_returned_bytes_size += strlen(res->to_string()); + result.append(column_result); + m_returned_bytes_size += column_result.size(); + } - if(!m_csv_defintion.redundant_column) { - if(++i < projections_resuls.values.size()) { - result.append(output_delimiter); - m_returned_bytes_size += output_delimiter.size(); - } - } - else { - result.append(output_delimiter); - m_returned_bytes_size += output_delimiter.size(); - } - } - if(!m_aggr_flow){ - result.append(output_row_delimiter); + if(!m_csv_defintion.redundant_column) { + if(++i < projections_resuls.values.size()) { + result.append(output_delimiter); + m_returned_bytes_size += output_delimiter.size(); + } + } else { + result.append(output_delimiter); m_returned_bytes_size += output_delimiter.size(); + } } - } + if(!m_aggr_flow) { + result.append(output_row_delimiter); + m_returned_bytes_size += output_delimiter.size(); + } +} Status getMatchRow( std::string& result) { @@ -2336,7 +2457,7 @@ public: } result_values_to_string(projections_resuls,result); - return m_sql_processing_status = Status::END_OF_STREAM; + return is_processing_time_error() ? (m_sql_processing_status = Status::SQL_ERROR) : (m_sql_processing_status = Status::END_OF_STREAM); } m_processed_rows++; @@ -2368,9 +2489,9 @@ public: i->set_last_call(); i->set_skip_non_aggregate(false);//projection column is set to be runnable projections_resuls.push_value( &(i->eval()) ); - } + } result_values_to_string(projections_resuls,result); - return m_sql_processing_status = Status::LIMIT_REACHED; + return is_processing_time_error() ? (m_sql_processing_status = Status::SQL_ERROR) : (m_sql_processing_status = Status::LIMIT_REACHED); } } while (multiple_row_processing()); @@ -2394,6 +2515,7 @@ public: { a.second->invalidate_cache_result(); } + } while (multiple_row_processing() && m_where_clause && !(where_clause_result = m_where_clause->eval().is_true()) && !(m_is_limit_on && m_processed_rows == m_limit)); @@ -2421,12 +2543,18 @@ public: for (auto& i : m_projections) { projections_resuls.push_value( &(i->eval()) ); - } - result_values_to_string(projections_resuls,result); + } + result_values_to_string(projections_resuls,result); + if(m_sql_processing_status == Status::SQL_ERROR) + { + return m_sql_processing_status; + } } - } - return is_end_of_stream() ? (m_sql_processing_status = Status::END_OF_STREAM) : (m_sql_processing_status = Status::NORMAL_EXIT); + + return is_processing_time_error() ? (m_sql_processing_status = Status::SQL_ERROR) : + (is_end_of_stream() ? (m_sql_processing_status = Status::END_OF_STREAM) : (m_sql_processing_status = Status::NORMAL_EXIT)); + }//getMatchRow @@ -2477,14 +2605,12 @@ public: { //return; } - - set_base_defintions(s3_query); m_csv_defintion = csv; + set_base_defintions(s3_query); } private: bool m_skip_last_line; - std::string m_error_description; char* m_stream; char* m_end_stream; std::vector<char*> m_row_tokens; @@ -2604,26 +2730,36 @@ public: m_error_description = "escaped_char_missing failure while csv parsing"; return -1; } - catch(io::error::escaped_string_not_closed& err) + catch(io::error::escaped_string_not_closed& err) { m_error_description = "escaped_string_not_closed failure while csv parsing"; return -1; } - catch(io::error::line_length_limit_exceeded& err) + catch(io::error::line_length_limit_exceeded& err) { m_error_description = "line_length_limit_exceeded failure while csv parsing"; return -1; } - catch(io::error::with_file_name& err) + catch(io::error::missmatch_of_begin_end& err) { - m_error_description = "with_file_name failure while csv parsing"; + m_error_description = "missmatch_of_begin_end failure while csv parsing" + std::string(err.what()); return -1; } - catch(io::error::with_file_line& err) + catch(io::error::missmatch_end& err) { - m_error_description = "with_file_line failure while csv parsing"; + m_error_description = "missmatch_end failure while csv parsing" + std::string(err.what()); return -1; } + catch(io::error::with_file_name& err) + { + m_error_description = "with_file_name failure while csv parsing"; + return -1; + } + catch(std::exception& e) + { + m_error_description = "error while processing CSV object : " + std::string(e.what()); + return -1; + } return status; } @@ -2634,7 +2770,7 @@ private: //purpose: the CSV data is "streaming", it may "cut" rows in the middle, in that case the "broken-line" is stores //for later, upon next chunk of data is streaming, the stored-line is merge with current broken-line, and processed. std::string tmp_buff; - + int status = 0; m_processed_bytes += stream_length; m_skip_first_line = false; @@ -2648,6 +2784,22 @@ private: p_obj_chunk++; } + if(*p_obj_chunk != m_csv_defintion.row_delimiter) + {// previous row can not be completed with current chunk + if(fp_ext_debug_mesg) + { + std::string err_mesg = "** the stream chunk is too small for processing(saved for later) **"; + fp_ext_debug_mesg(err_mesg.c_str()); + } + //copy the part to be processed later + tmp_buff.assign((char*)csv_stream, (char*)csv_stream + (p_obj_chunk - csv_stream)); + //saved for later processing + m_last_line.append(tmp_buff); + m_previous_line = true;//it means to skip last line + //skip processing since the row tail is missing. + return 0; + } + tmp_buff.assign((char*)csv_stream, (char*)csv_stream + (p_obj_chunk - csv_stream)); merge_line = m_last_line + tmp_buff + m_csv_defintion.row_delimiter; m_previous_line = false; @@ -2655,7 +2807,7 @@ private: m_skip_x_first_bytes = tmp_buff.size()+1; //processing the merged row (previous broken row) - run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false); + status = run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false); } if (stream_length && csv_stream[stream_length - 1] != m_csv_defintion.row_delimiter) @@ -2676,7 +2828,8 @@ private: stream_length -= (m_last_line.length()); } - return run_s3select_on_object(result, csv_stream, stream_length, m_skip_first_line, m_previous_line, (m_processed_bytes >= obj_size)); + status = run_s3select_on_object(result, csv_stream, stream_length, m_skip_first_line, m_previous_line, (m_processed_bytes >= obj_size)); + return status; } public: @@ -2696,6 +2849,11 @@ public: m_skip_x_first_bytes=0; } + if(m_stream>m_end_stream) + { + throw base_s3select_exception(std::string("** m_stream > m_end_stream **") + + std::to_string( (m_stream - m_end_stream) ) ,base_s3select_exception::s3select_exp_en_t::FATAL); + } CSVParser _csv_parser("csv", m_stream, m_end_stream); csv_parser = &_csv_parser; csv_parser->set_csv_def( m_csv_defintion.row_delimiter, @@ -2745,6 +2903,10 @@ public: { break;//user should request for sql_processing_status } + if(m_sql_processing_status == Status::SQL_ERROR) + { + return -1; + } } while (true); @@ -2764,7 +2926,6 @@ class parquet_object : public base_s3object { private: - std::string m_error_description; parquet_file_parser* object_reader; parquet_file_parser::column_pos_t m_where_clause_columns; parquet_file_parser::column_pos_t m_projections_columns; @@ -2948,11 +3109,13 @@ private: std::string* m_s3select_result = nullptr; size_t m_row_count; bool star_operation_ind; - std::string m_error_description; bool m_init_json_processor_ind; public: + class csv_definitions : public s3select_csv_definitions + {}; + void init_json_processor(s3select* query) { if(m_init_json_processor_ind) @@ -2997,6 +3160,7 @@ public: if(p->is_statement_contain_star_operation()) { star_operation_ind=true; + set_star_true(); break; } } @@ -3095,7 +3259,7 @@ private: public: - int run_s3select_on_stream(std::string& result, const char* json_stream, size_t stream_length, size_t obj_size) + int run_s3select_on_stream(std::string& result, const char* json_stream, size_t stream_length, size_t obj_size, bool json_format = false) { int status=0; m_processed_bytes += stream_length; @@ -3134,8 +3298,9 @@ public: return status; } - void set_json_query(s3select* s3_query) + void set_json_query(s3select* s3_query, csv_definitions csv) { + m_csv_defintion = csv; set_base_defintions(s3_query); init_json_processor(s3_query); } |