summaryrefslogtreecommitdiffstats
path: root/src/s3select/include/s3select.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/s3select/include/s3select.h271
1 files changed, 218 insertions, 53 deletions
diff --git a/src/s3select/include/s3select.h b/src/s3select/include/s3select.h
index 3ac111351..667c92ba9 100644
--- a/src/s3select/include/s3select.h
+++ b/src/s3select/include/s3select.h
@@ -18,6 +18,7 @@
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <functional>
+#include <unordered_set>
#define _DEBUG_TERM {string token(a,b);std::cout << __FUNCTION__ << token << std::endl;}
@@ -1293,11 +1294,11 @@ void push_logical_operator::builder(s3select* self, const char* a, const char* b
std::string token(a, b);
logical_operand::oplog_t l = logical_operand::oplog_t::NA;
- if (token == "and")
+ if (boost::iequals(token,"and"))
{
l = logical_operand::oplog_t::AND;
}
- else if (token == "or")
+ else if (boost::iequals(token,"or"))
{
l = logical_operand::oplog_t::OR;
}
@@ -1633,8 +1634,10 @@ void push_like_predicate_escape::builder(s3select* self, const char* a, const ch
void push_is_null_predicate::builder(s3select* self, const char* a, const char* b) const
{
- //expression is null, is not null
+ //expression could be is null OR is not null
std::string token(a, b);
+ //to_lower enable case insensitive
+ boost::algorithm::to_lower(token);
bool is_null = true;
for(size_t i=0;i<token.size();i++)
@@ -1936,7 +1939,7 @@ void push_trim_type::builder(s3select* self, const char* a, const char* b) const
{
std::string token(a, b);
- auto trim_option = [&](const char *s){return strncmp(a,s,strlen(s))==0;};
+ auto trim_option = [&](const char *s){return strncasecmp(a,s,strlen(s))==0;};
if(trim_option("leading"))
{
@@ -2156,10 +2159,12 @@ struct s3select_csv_definitions //TODO
bool quote_fields_asneeded;
bool redundant_column;
bool comment_empty_lines;
+ bool output_json_format;
std::vector<char> comment_chars;
std::vector<char> trim_chars;
+ std::string schema;
- s3select_csv_definitions():row_delimiter('\n'), column_delimiter(','), output_row_delimiter('\n'), output_column_delimiter(','), escape_char('\\'), output_escape_char('\\'), output_quot_char('"'), quot_char('"'), use_header_info(false), ignore_header_info(false), quote_fields_always(false), quote_fields_asneeded(false), redundant_column(false), comment_empty_lines(false) {}
+ s3select_csv_definitions():row_delimiter('\n'), column_delimiter(','), output_row_delimiter('\n'), output_column_delimiter(','), escape_char('\\'), output_escape_char('\\'), output_quot_char('"'), quot_char('"'), use_header_info(false), ignore_header_info(false), quote_fields_always(false), quote_fields_asneeded(false), redundant_column(false), comment_empty_lines(false), output_json_format(false) {}
};
@@ -2172,6 +2177,8 @@ protected:
scratch_area* m_sa;
std::string m_obj_name;
bool m_aggr_flow = false; //TODO once per query
+ bool is_star = false;
+ bool is_json = false;
bool m_is_to_aggregate;
std::vector<base_statement*> m_projections;
base_statement* m_where_clause;
@@ -2182,9 +2189,11 @@ protected:
unsigned long m_processed_rows;
size_t m_returned_bytes_size;
std::function<void(const char*)> fp_ext_debug_mesg;//dispache debug message into external system
+ std::vector<std::string> m_projection_keys{};
public:
s3select_csv_definitions m_csv_defintion;//TODO add method for modify
+ std::string m_error_description;
enum class Status {
END_OF_STREAM,
@@ -2196,6 +2205,16 @@ public:
Status m_sql_processing_status;
+ void set_processing_time_error()
+ {
+ m_sql_processing_status = Status::SQL_ERROR;
+ }
+
+ bool is_processing_time_error()
+ {
+ return m_sql_processing_status == Status::SQL_ERROR;
+ }
+
Status get_sql_processing_status()
{
return m_sql_processing_status;
@@ -2206,6 +2225,60 @@ public:
return m_sql_processing_status == Status::LIMIT_REACHED;
}
+ void set_star_true() {
+ is_star = true;
+ }
+
+ void set_projection_keys(std::vector<base_statement*> m_projections)
+ {
+ std::vector<std::string> alias_values{};
+ std::unordered_set<base_statement*> alias_projection_keys{};
+ bool is_output_json_format = m_csv_defintion.output_json_format;
+
+ for (auto& a : *m_s3_select->get_aliases()->get())
+ {
+ alias_values.push_back(a.first);
+ alias_projection_keys.insert(a.second);
+ }
+
+ size_t m_alias_index = 0;
+ int index_json_projection = 0;
+ is_json = m_s3_select->is_json_query();
+
+ for (auto& p : m_projections)
+ {
+ if(p->is_statement_contain_star_operation())
+ {
+ set_star_true();
+ }
+ p->traverse_and_apply(m_sa, m_s3_select->get_aliases(), m_s3_select->is_json_query());
+
+ std::string key_from_projection{};
+ if(p->is_column()){
+ key_from_projection = p->get_key_from_projection();
+ }
+
+ if(alias_projection_keys.count(p) == 0 && p->is_column()) {
+ m_projection_keys.push_back(key_from_projection);
+ } else if(alias_projection_keys.count(p) > 0 && p->is_column()) {
+ m_projection_keys.push_back(alias_values[m_alias_index++]);
+ } else if(!p->is_column() && is_output_json_format && alias_projection_keys.count(p) > 0 ) {
+ m_projection_keys.push_back(alias_values[m_alias_index++]);
+ } else if(!p->is_column() && is_output_json_format && alias_projection_keys.count(p) == 0) {
+ std::string index_json = "_" + std::to_string(++index_json_projection);
+ m_projection_keys.push_back(index_json);
+ }
+ }
+
+ if(m_s3_select->is_json_query()) {
+ for(auto& k: m_projection_keys) {
+ size_t lastDotPosition = k.find_last_of('.');
+ std::string extractedPart = k.substr(lastDotPosition + 1);
+ k = extractedPart;
+ }
+ }
+ }
+
void set_base_defintions(s3select* m)
{
if(m_s3_select || !m)
@@ -2225,10 +2298,8 @@ public:
m_where_clause->traverse_and_apply(m_sa, m_s3_select->get_aliases(), m_s3_select->is_json_query());
}
- for (auto& p : m_projections)
- {
- p->traverse_and_apply(m_sa, m_s3_select->get_aliases(), m_s3_select->is_json_query());
- }
+ set_projection_keys(m_projections);
+
m_is_to_aggregate = true;//TODO not correct. should be set upon end-of-stream
m_aggr_flow = m_s3_select->is_aggregate_query();
@@ -2269,45 +2340,95 @@ public:
return m_returned_bytes_size;
}
- void result_values_to_string(multi_values& projections_resuls, std::string& result)
+ void json_result_format(multi_values& projections_results, std::string& result, std::string& output_delimiter)
{
- size_t i = 0;
+ result += "{";
+ int j = 0;
+ for (size_t i = 0; i < projections_results.values.size(); ++i)
+ {
+ auto& res = projections_results.values[i];
+ std::string label = "_";
+ label += std::to_string(i + 1);
+
+ if (i > 0) {
+ result += output_delimiter;
+ }
+
+ if(!is_star) {
+ result += "\"" + m_projection_keys[j] + "\":";
+ } else if(is_star && !is_json) {
+ result += "\"" + label + "\":";
+ }
+
+ result.append(res->to_string());
+ m_returned_bytes_size += strlen(res->to_string());
+ ++j;
+ }
+ result += "}";
+
+ }
+
+
+ void result_values_to_string(multi_values& projections_resuls, std::string& result)
+{
std::string output_delimiter(1,m_csv_defintion.output_column_delimiter);
std::string output_row_delimiter(1,m_csv_defintion.output_row_delimiter);
+ if(m_csv_defintion.output_json_format && projections_resuls.values.size()) {
+ json_result_format(projections_resuls, result, output_delimiter);
+ result.append(output_row_delimiter);
+ return;
+ }
+
+ size_t i = 0;
for(auto& res : projections_resuls.values)
{
+
+ std::string column_result;
+
+ try{
+ column_result = res->to_string();
+ }
+ catch(std::exception& e)
+ {
+ column_result = "{failed to compute projection: " + std::string(e.what()) + "}";
+ m_error_description = column_result;
+ set_processing_time_error();
+ }
+
+
if(fp_ext_debug_mesg)
- fp_ext_debug_mesg( res->to_string() );
+ fp_ext_debug_mesg(column_result.data());
if (m_csv_defintion.quote_fields_always) {
std::ostringstream quoted_result;
- quoted_result << std::quoted(res->to_string(),m_csv_defintion.output_quot_char, m_csv_defintion.escape_char);
+ quoted_result << std::quoted(column_result,m_csv_defintion.output_quot_char, m_csv_defintion.escape_char);
result.append(quoted_result.str());
+
m_returned_bytes_size += quoted_result.str().size();
- }//TODO to add asneeded
+ }//TODO to add asneeded
else
{
- result.append(res->to_string());
- m_returned_bytes_size += strlen(res->to_string());
+ result.append(column_result);
+ m_returned_bytes_size += column_result.size();
+
}
- if(!m_csv_defintion.redundant_column) {
- if(++i < projections_resuls.values.size()) {
- result.append(output_delimiter);
- m_returned_bytes_size += output_delimiter.size();
- }
- }
- else {
- result.append(output_delimiter);
- m_returned_bytes_size += output_delimiter.size();
- }
- }
- if(!m_aggr_flow){
- result.append(output_row_delimiter);
+ if(!m_csv_defintion.redundant_column) {
+ if(++i < projections_resuls.values.size()) {
+ result.append(output_delimiter);
+ m_returned_bytes_size += output_delimiter.size();
+ }
+ } else {
+ result.append(output_delimiter);
m_returned_bytes_size += output_delimiter.size();
+ }
}
- }
+ if(!m_aggr_flow) {
+ result.append(output_row_delimiter);
+ m_returned_bytes_size += output_delimiter.size();
+ }
+}
Status getMatchRow( std::string& result)
{
@@ -2336,7 +2457,7 @@ public:
}
result_values_to_string(projections_resuls,result);
- return m_sql_processing_status = Status::END_OF_STREAM;
+ return is_processing_time_error() ? (m_sql_processing_status = Status::SQL_ERROR) : (m_sql_processing_status = Status::END_OF_STREAM);
}
m_processed_rows++;
@@ -2368,9 +2489,9 @@ public:
i->set_last_call();
i->set_skip_non_aggregate(false);//projection column is set to be runnable
projections_resuls.push_value( &(i->eval()) );
- }
+ }
result_values_to_string(projections_resuls,result);
- return m_sql_processing_status = Status::LIMIT_REACHED;
+ return is_processing_time_error() ? (m_sql_processing_status = Status::SQL_ERROR) : (m_sql_processing_status = Status::LIMIT_REACHED);
}
}
while (multiple_row_processing());
@@ -2394,6 +2515,7 @@ public:
{
a.second->invalidate_cache_result();
}
+
}
while (multiple_row_processing() && m_where_clause && !(where_clause_result = m_where_clause->eval().is_true()) && !(m_is_limit_on && m_processed_rows == m_limit));
@@ -2421,12 +2543,18 @@ public:
for (auto& i : m_projections)
{
projections_resuls.push_value( &(i->eval()) );
- }
- result_values_to_string(projections_resuls,result);
+ }
+ result_values_to_string(projections_resuls,result);
+ if(m_sql_processing_status == Status::SQL_ERROR)
+ {
+ return m_sql_processing_status;
+ }
}
-
}
- return is_end_of_stream() ? (m_sql_processing_status = Status::END_OF_STREAM) : (m_sql_processing_status = Status::NORMAL_EXIT);
+
+ return is_processing_time_error() ? (m_sql_processing_status = Status::SQL_ERROR) :
+ (is_end_of_stream() ? (m_sql_processing_status = Status::END_OF_STREAM) : (m_sql_processing_status = Status::NORMAL_EXIT));
+
}//getMatchRow
@@ -2477,14 +2605,12 @@ public:
{
//return;
}
-
- set_base_defintions(s3_query);
m_csv_defintion = csv;
+ set_base_defintions(s3_query);
}
private:
bool m_skip_last_line;
- std::string m_error_description;
char* m_stream;
char* m_end_stream;
std::vector<char*> m_row_tokens;
@@ -2604,26 +2730,36 @@ public:
m_error_description = "escaped_char_missing failure while csv parsing";
return -1;
}
- catch(io::error::escaped_string_not_closed& err)
+ catch(io::error::escaped_string_not_closed& err)
{
m_error_description = "escaped_string_not_closed failure while csv parsing";
return -1;
}
- catch(io::error::line_length_limit_exceeded& err)
+ catch(io::error::line_length_limit_exceeded& err)
{
m_error_description = "line_length_limit_exceeded failure while csv parsing";
return -1;
}
- catch(io::error::with_file_name& err)
+ catch(io::error::missmatch_of_begin_end& err)
{
- m_error_description = "with_file_name failure while csv parsing";
+ m_error_description = "missmatch_of_begin_end failure while csv parsing" + std::string(err.what());
return -1;
}
- catch(io::error::with_file_line& err)
+ catch(io::error::missmatch_end& err)
{
- m_error_description = "with_file_line failure while csv parsing";
+ m_error_description = "missmatch_end failure while csv parsing" + std::string(err.what());
return -1;
}
+ catch(io::error::with_file_name& err)
+ {
+ m_error_description = "with_file_name failure while csv parsing";
+ return -1;
+ }
+ catch(std::exception& e)
+ {
+ m_error_description = "error while processing CSV object : " + std::string(e.what());
+ return -1;
+ }
return status;
}
@@ -2634,7 +2770,7 @@ private:
//purpose: the CSV data is "streaming", it may "cut" rows in the middle, in that case the "broken-line" is stores
//for later, upon next chunk of data is streaming, the stored-line is merge with current broken-line, and processed.
std::string tmp_buff;
-
+ int status = 0;
m_processed_bytes += stream_length;
m_skip_first_line = false;
@@ -2648,6 +2784,22 @@ private:
p_obj_chunk++;
}
+ if(*p_obj_chunk != m_csv_defintion.row_delimiter)
+ {// previous row can not be completed with current chunk
+ if(fp_ext_debug_mesg)
+ {
+ std::string err_mesg = "** the stream chunk is too small for processing(saved for later) **";
+ fp_ext_debug_mesg(err_mesg.c_str());
+ }
+ //copy the part to be processed later
+ tmp_buff.assign((char*)csv_stream, (char*)csv_stream + (p_obj_chunk - csv_stream));
+ //saved for later processing
+ m_last_line.append(tmp_buff);
+ m_previous_line = true;//it means to skip last line
+ //skip processing since the row tail is missing.
+ return 0;
+ }
+
tmp_buff.assign((char*)csv_stream, (char*)csv_stream + (p_obj_chunk - csv_stream));
merge_line = m_last_line + tmp_buff + m_csv_defintion.row_delimiter;
m_previous_line = false;
@@ -2655,7 +2807,7 @@ private:
m_skip_x_first_bytes = tmp_buff.size()+1;
//processing the merged row (previous broken row)
- run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false);
+ status = run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false);
}
if (stream_length && csv_stream[stream_length - 1] != m_csv_defintion.row_delimiter)
@@ -2676,7 +2828,8 @@ private:
stream_length -= (m_last_line.length());
}
- return run_s3select_on_object(result, csv_stream, stream_length, m_skip_first_line, m_previous_line, (m_processed_bytes >= obj_size));
+ status = run_s3select_on_object(result, csv_stream, stream_length, m_skip_first_line, m_previous_line, (m_processed_bytes >= obj_size));
+ return status;
}
public:
@@ -2696,6 +2849,11 @@ public:
m_skip_x_first_bytes=0;
}
+ if(m_stream>m_end_stream)
+ {
+ throw base_s3select_exception(std::string("** m_stream > m_end_stream **") +
+ std::to_string( (m_stream - m_end_stream) ) ,base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
CSVParser _csv_parser("csv", m_stream, m_end_stream);
csv_parser = &_csv_parser;
csv_parser->set_csv_def( m_csv_defintion.row_delimiter,
@@ -2745,6 +2903,10 @@ public:
{
break;//user should request for sql_processing_status
}
+ if(m_sql_processing_status == Status::SQL_ERROR)
+ {
+ return -1;
+ }
} while (true);
@@ -2764,7 +2926,6 @@ class parquet_object : public base_s3object
{
private:
- std::string m_error_description;
parquet_file_parser* object_reader;
parquet_file_parser::column_pos_t m_where_clause_columns;
parquet_file_parser::column_pos_t m_projections_columns;
@@ -2948,11 +3109,13 @@ private:
std::string* m_s3select_result = nullptr;
size_t m_row_count;
bool star_operation_ind;
- std::string m_error_description;
bool m_init_json_processor_ind;
public:
+ class csv_definitions : public s3select_csv_definitions
+ {};
+
void init_json_processor(s3select* query)
{
if(m_init_json_processor_ind)
@@ -2997,6 +3160,7 @@ public:
if(p->is_statement_contain_star_operation())
{
star_operation_ind=true;
+ set_star_true();
break;
}
}
@@ -3095,7 +3259,7 @@ private:
public:
- int run_s3select_on_stream(std::string& result, const char* json_stream, size_t stream_length, size_t obj_size)
+ int run_s3select_on_stream(std::string& result, const char* json_stream, size_t stream_length, size_t obj_size, bool json_format = false)
{
int status=0;
m_processed_bytes += stream_length;
@@ -3134,8 +3298,9 @@ public:
return status;
}
- void set_json_query(s3select* s3_query)
+ void set_json_query(s3select* s3_query, csv_definitions csv)
{
+ m_csv_defintion = csv;
set_base_defintions(s3_query);
init_json_processor(s3_query);
}