author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/s3select/include/s3select_json_parser.h
parent | Initial commit. (diff)
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz, ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/s3select/include/s3select_json_parser.h')
-rw-r--r-- | src/s3select/include/s3select_json_parser.h | 829
1 file changed, 829 insertions, 0 deletions
diff --git a/src/s3select/include/s3select_json_parser.h b/src/s3select/include/s3select_json_parser.h new file mode 100644 index 000000000..aa06163f5 --- /dev/null +++ b/src/s3select/include/s3select_json_parser.h @@ -0,0 +1,829 @@ +#ifndef S3SELECT_JSON_PARSER_H +#define S3SELECT_JSON_PARSER_H + +//TODO add __FILE__ __LINE__ message +#define RAPIDJSON_ASSERT(x) s3select_json_parse_error(x) +bool s3select_json_parse_error(bool b); +bool s3select_json_parse_error(const char* error); + +#include "rapidjson/reader.h" +#include "rapidjson/writer.h" +#include "rapidjson/filereadstream.h" +#include "rapidjson/filewritestream.h" +#include "rapidjson/error/en.h" +#include "rapidjson/document.h" +#include <cassert> +#include <sstream> +#include <vector> +#include <iostream> +#include <functional> +#include <boost/spirit/include/classic_core.hpp> +#include <boost/algorithm/string/predicate.hpp> +#include "s3select_oper.h"//class value +#include <boost/algorithm/string/predicate.hpp> + +#define JSON_PROCESSING_LIMIT_REACHED 2 + +//TODO missing s3selectEngine namespace + +bool s3select_json_parse_error(bool b) +{ + if(!b) + { + const char* error_str = "failure while processing JSON document"; + throw s3selectEngine::base_s3select_exception(error_str, s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL); + } + return false; +} + +bool s3select_json_parse_error(const char* error) +{ + if(!error) + { + const char* error_str = "failure while processing JSON document"; + throw s3selectEngine::base_s3select_exception(error_str, s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL); + } + return false; +} + +static auto iequal_predicate = [](std::string& it1, std::string& it2) + { + return boost::iequals(it1,it2); + }; + + +class ChunksStreamer : public rapidjson::MemoryStream { + + //purpose: adding a method `resetBuffer` that enables to parse chunk after chunk + //per each new chunk it reset internal data members + public: + + std::string internal_buffer; + const Ch* next_src_; + size_t next_size_; + + ChunksStreamer():rapidjson::MemoryStream(0,0){next_src_=0;next_size_=0;} + + ChunksStreamer(const Ch *src, size_t size) : rapidjson::MemoryStream(src,size){next_src_=0;next_size_=0;} + + //override Peek methode + Ch Peek() //const + { + if(RAPIDJSON_UNLIKELY(src_ == end_)) + { + if(next_src_)//next chunk exist + {//upon reaching to end of current buffer, to switch with next one + src_ = next_src_; + begin_ = src_; + size_ =next_size_; + end_ = src_ + size_; + + next_src_ = 0; + next_size_ = 0; + return *src_; + } + else return 0; + } + return *src_; + } + + //override Take method + Ch Take() + { + if(RAPIDJSON_UNLIKELY(src_ == end_)) + { + if(next_src_)//next chunk exist + {//upon reaching to end of current buffer, to switch with next one + src_ = next_src_; + begin_ = src_; + size_ = next_size_; + end_ = src_ + size_; + + next_src_ = 0; + next_size_ = 0; + return *src_; + } + else return 0; + } + return *src_++; + } + + void resetBuffer(char* buff, size_t size) + { + if(!src_) + {//first time calling + begin_ = buff; + src_ = buff; + size_ = size; + end_= src_ + size_; + return; + } + + if(!next_src_) + {//save the next-chunk that will be used upon parser reaches end of current buffer + next_src_ = buff; + next_size_ = size; + } + else + {// should not happen + std::cout << "can not replace pointers!!!" 
<< std::endl;//TODO exception + return; + } + } + + void saveRemainingBytes() + {//this routine called per each new chunk + //save the remaining bytes, before its overriden by the next-chunk. + size_t copy_left_sz = getBytesLeft(); //should be very small + internal_buffer.assign(src_,copy_left_sz); + + src_ = internal_buffer.data(); + begin_ = src_; + size_ = copy_left_sz; + end_= src_ + copy_left_sz; + } + + size_t getBytesLeft() { return end_ - src_; } + +}; + +enum class row_state +{ + NA, + OBJECT_START_ROW, + ARRAY_START_ROW +}; + +class json_variable_access { +//purpose: a state-machine for json-variables. +//upon the syntax-parser accepts a variable (projection / where-clause) it create this object. +//this object get events (key,start-array ... etc) as the JSON reader scans the input, +//these events are advancing the states until it reaches to the last one, result with pushing value into scratch-area. + +private: + +// to set the following. +std::vector<std::string>* from_clause; +std::vector<std::string>* key_path; +//m_current_depth : trace the depth of the reader, including "anonymous"(meaning JSON may begin with array that has no name attached to it) +int* m_current_depth; +//m_current_depth_non_anonymous : trace the depth of the reader, NOT including "anonymous" array/object. +//upon user request the following _1.a[12].b, the key-name{a} may reside on some array with no-name, +//the state machine that search for a[12].b, does NOT contain states for that "anonymous" array, +//thus, the state-machine will fail to trace the user request for that specific key.path +int* m_current_depth_non_anonymous; +std::function <int(s3selectEngine::value&,int)>* m_exact_match_cb; +// a state number : (_1).a.b.c[ 17 ].d.e (a.b)=1 (c[)=2 (17)=3 (.d.e)=4 +size_t current_state;//contain the current state of the state machine for searching-expression (each JSON variable in SQL statement has a searching expression) +int nested_array_level;//in the case of array within array it contain the nesting level +int m_json_index; +s3selectEngine::value v_null; +size_t m_from_clause_size; + +struct variable_state_md { + std::vector<std::string> required_path;//set by the syntax-parser. in the case of array its empty + int required_array_entry_no;//set by the syntax-parser, in the case of object-key its -1. + int actual_array_entry_no;//upon scanning the JSON input, this value increased by 1 each new element + int required_depth_size;// depth of state, is aggregated (include the previous). it's the summary of key-elements and array-operator's. + int required_key_depth_size;// same as the above, not including the array-operators. + int last_array_start;//it actually mark the nested-array-level (array within array) +}; + +std::vector<struct variable_state_md> variable_states;//vector is populated upon syntax phase. 
+ +public: + +json_variable_access():from_clause(nullptr),key_path(nullptr),m_current_depth(nullptr),m_current_depth_non_anonymous(nullptr),m_exact_match_cb(nullptr),current_state(-1),nested_array_level(0),m_json_index(-1),v_null(nullptr),m_from_clause_size(0) +{} + +void init( + std::vector<std::string>* reader_from_clause, + std::vector<std::string>* reader_key_path, + int* reader_current_depth, + int* reader_m_current_depth_non_anonymous, + std::function <int(s3selectEngine::value&,int)>* excat_match_cb, + int json_index) +{//this routine should be called before scanning the JSON input + from_clause = reader_from_clause; + key_path = reader_key_path; + m_exact_match_cb = excat_match_cb; + //m_current_depth and m_current_depth_non_anonymous points to the JSON reader variables. + m_current_depth = reader_current_depth; + m_current_depth_non_anonymous = reader_m_current_depth_non_anonymous; + current_state = 0; + m_json_index = json_index; + m_from_clause_size = from_clause->size(); + + //loop on variable_states compute required_depth_size +} + +void clear() +{ + variable_states.clear(); +} + +void debug_info() +{ + auto f = [](std::vector<std::string> x){std::string res;for(auto i : x){res.append(i);res.append(".");};return res;}; + + std::cout << "m_current_depth=" << *m_current_depth << " required_depth_size= " << reader_position_state().required_depth_size << " "; + std::cout << "variable_states[ current_state ].last_array_start=" << reader_position_state().last_array_start; + std::cout << " current_state=" << current_state << " key_path=" << f(*key_path) << std::endl; +} +#define DBG {std::cout << "event=" << __FUNCTION__ << std::endl; debug_info();} +#undef DBG +#define DBG + +void compile_state_machine() +{ + size_t aggregated_required_depth_size = 0; + size_t aggregated_required_key_depth_size = 0; + for(auto& v : variable_states) + { + if(v.required_path.size()) + { + v.required_depth_size = aggregated_required_depth_size + v.required_path.size();//depth size in general, including array + v.required_key_depth_size = aggregated_required_key_depth_size;//depth include ONLY key parts + aggregated_required_key_depth_size += v.required_path.size(); + } + else + { + v.required_depth_size = aggregated_required_depth_size + 1; + } + aggregated_required_depth_size = v.required_depth_size; + } +} + +void push_variable_state(std::vector<std::string>& required_path,int required_array_entry_no) +{ + struct variable_state_md new_state={required_path,required_array_entry_no,-1,0,0,-1}; + variable_states.push_back(new_state); + //TODO required_path.size() > 0 or required_path,required_array_entry_no>=0 : not both + compile_state_machine(); +} + +struct variable_state_md& reader_position_state() +{ + if (current_state>=variable_states.size()) + {//in case the state-machine reached a "dead-end", should push a null for that JSON variable + //going back one state. 
+ (*m_exact_match_cb)(v_null,m_json_index); + decrease_current_state(); + } + + return variable_states[ current_state ]; +} + +bool is_array_state() +{ + return (reader_position_state().required_array_entry_no >= 0); +} + +bool is_reader_located_on_required_depth() +{ + //upon user request `select _1.a.b from s3object[*].c.d;` the c.d sould "cut off" from m_current_depth_non_anonymous + //to get the correct depth of the state-machine + return ((*m_current_depth_non_anonymous - static_cast<int>(m_from_clause_size)) == reader_position_state().required_depth_size); +} + +bool is_on_final_state() +{ + return (current_state == (variable_states.size())); + //&& *m_current_depth == variable_states[ current_state -1 ].required_depth_size); + + // NOTE: by ignoring the current-depth, the matcher gives precedence to key-path match, while not ignoring accessing using array + // meaning, upon requeting a.b[12] , the [12] is not ignored, the a<-->b distance should be calculated as key distance, i.e. not counting array/object with *no keys*. + // user may request 'select _1.phonearray.num'; the reader will traverse `num` exist in `phonearray` +} + +bool is_reader_passed_required_array_entry() +{ + return (reader_position_state().actual_array_entry_no > reader_position_state().required_array_entry_no); +} + +bool is_reader_located_on_array_according_to_current_state() +{ + return (nested_array_level == reader_position_state().last_array_start); +} + +bool is_reader_position_depth_lower_than_required() +{ + //upon user request `select _1.a.b from s3object[*].c.d;` the c.d sould "cut off" from m_current_depth_non_anonymous + //to have the correct depth of the state-machine + return ((*m_current_depth_non_anonymous - static_cast<int>(m_from_clause_size)) < reader_position_state().required_depth_size); +} + +bool is_reader_located_on_array_entry_according_to_current_state() +{ + return (reader_position_state().actual_array_entry_no == reader_position_state().required_array_entry_no); +} + +void increase_current_state() +{ + DBG + + if(current_state >= variable_states.size()) return; + current_state ++; +} + +void decrease_current_state() +{ + DBG + + if(current_state == 0) return; + current_state --; +} + +void key() +{ + DBG + + if(reader_position_state().required_path.size())//current state is a key + { + std::vector<std::string>* filter = &reader_position_state().required_path; + auto required_key_depth_size = reader_position_state().required_key_depth_size; + if(std::equal((*key_path).begin()+(*from_clause).size() + required_key_depth_size, //key-path-start-point + from-clause-depth-size + key-depth + (*key_path).end(), + (*filter).begin(), + (*filter).end(), iequal_predicate)) + { + increase_current_state();//key match according to user request, advancing to the next state + } + } +} + +void increase_array_index() +{ + if(is_reader_located_on_required_depth() && is_array_state())//TODO && is_array_state(). is it necessary? what about nesting level + { + DBG + reader_position_state().actual_array_entry_no++; + } +} + +void dec_key() +{ + DBG + + if(is_reader_position_depth_lower_than_required()) + {//actual key-path is shorter than required + decrease_current_state(); + return; + } + + if(is_reader_located_on_required_depth() && is_array_state())//TODO && is_array_state(). 
is it necessary?; json_element_state.back() != ARRAY_STATE) + {//key-path-depth matches, and it an array + if(is_reader_located_on_array_entry_according_to_current_state()) + {//we reached the required array entry + increase_current_state(); + } + else if(is_reader_passed_required_array_entry()) + {//had passed the array entry + decrease_current_state(); + } + } +} + +void new_value(s3selectEngine::value& v) +{ + DBG + + if(is_on_final_state()) + { + (*m_exact_match_cb)(v, m_json_index); + decrease_current_state();//the state-machine reached its final destination, "going back" one state, upon another match condition the matched value will override the last one + } + increase_array_index();//next-value in array +} + +void end_object() +{ + increase_array_index(); +} + +void end_array() +{ + //init the correct array index + DBG + + if(is_reader_located_on_array_according_to_current_state()) + {//it reached end of required array + reader_position_state().actual_array_entry_no = 0; + decrease_current_state(); + } + nested_array_level --; + + // option 1. move out of one array, and enter a new one; option-2. enter an object + increase_array_index();//increase only upon correct array //TODO move it into dec_key()? + dec_key(); +} + +void start_array() +{ + DBG + + nested_array_level++; + if(is_reader_located_on_required_depth()) + {//reader entered an array required by JSON variable + reader_position_state().actual_array_entry_no = 0; + reader_position_state().last_array_start = nested_array_level; + + if(is_reader_located_on_array_entry_according_to_current_state()) + {//we reached the required array entry -> next state + increase_current_state(); + } + } +} + +}; //json_variable_access + +class json_variables_operations { + + public: + + std::vector<std::pair<json_variable_access*,size_t>> json_statement_variables{}; + + void init(std::vector<std::pair<json_variable_access*,size_t>>& jsv, //TODO init upon construction? 
+ std::vector <std::string>* from_clause, + std::vector<std::string>* key_path, + int* current_depth, + int* current_depth_non_anonymous, + std::function <int(s3selectEngine::value&,int)>* exact_match_cb) + { + json_statement_variables = jsv; + int i=0;//the index per JSON variable + for(auto& var : json_statement_variables) + { + var.first->init(from_clause, + key_path, + current_depth, + current_depth_non_anonymous, + exact_match_cb,i++); + } + } + + void start_array() + { + for(auto& j : json_statement_variables) + { + j.first->start_array(); + } + } + void end_array() + { + for(auto& j : json_statement_variables) + { + j.first->end_array(); + } + } + void dec_key() + { + for(auto& j : json_statement_variables) + { + j.first->dec_key(); + } + } + void end_object() + { + for(auto& j : json_statement_variables) + { + j.first->end_object(); + } + } + void key() + { + for(auto& j : json_statement_variables) + { + j.first->key(); + } + } + void new_value(s3selectEngine::value& v) + { + for(auto& j : json_statement_variables) + { + j.first->new_value(v); + } + } +};//json_variables_operations + +class JsonParserHandler : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>, JsonParserHandler> { + + public: + + typedef enum {OBJECT_STATE,ARRAY_STATE} en_json_elm_state_t; + typedef std::pair<std::vector<std::string>, s3selectEngine::value> json_key_value_t; + + row_state state = row_state::NA; + std::function <int(s3selectEngine::value&,int)> m_exact_match_cb; + std::function <int(s3selectEngine::scratch_area::json_key_value_t&)> m_star_operation_cb; + + json_variables_operations variable_match_operations; + int row_count{}; + std::vector <std::string> from_clause{}; + bool prefix_match{}; + s3selectEngine::value var_value; + ChunksStreamer stream_buffer; + bool init_buffer_stream; + rapidjson::Reader reader; + std::vector<en_json_elm_state_t> json_element_state; + std::vector<std::string> key_path; + std::function<int(void)> m_s3select_processing; + int m_start_row_depth; + int m_current_depth; + int m_current_depth_non_anonymous; + bool m_star_operation; + int m_sql_processing_status; + bool m_fatal_initialization_ind = false; + std::string m_fatal_initialization_description; + + JsonParserHandler() : prefix_match(false),init_buffer_stream(false),m_start_row_depth(-1),m_current_depth(0),m_current_depth_non_anonymous(0),m_star_operation(false),m_sql_processing_status(0) + { + } + + std::string get_key_path() + {//for debug + std::string res; + for(const auto & i: key_path) + { + res.append(i); + res.append(std::string("/")); + } + return res; + } + + void dec_key_path() + { + if (json_element_state.size()) { + if(json_element_state.back() != ARRAY_STATE) { + if(key_path.size() != 0) { + key_path.pop_back(); + } + } + } + + variable_match_operations.dec_key(); + + //TODO m_current_depth-- should done here + if(m_start_row_depth > m_current_depth) + { + prefix_match = false; + } else + if (prefix_match) { + if (state == row_state::ARRAY_START_ROW && m_start_row_depth == m_current_depth) { + m_sql_processing_status = m_s3select_processing(); //per each element in array + ++row_count; + } + } + } + + void push_new_key_value(s3selectEngine::value& v) + { + if (m_star_operation && prefix_match) + { + json_key_value_t key_value(key_path,v); + m_star_operation_cb(key_value); + } + if (prefix_match) + variable_match_operations.new_value(v); + + dec_key_path(); + } + + bool Null() { + var_value.setnull(); + push_new_key_value(var_value); + return true; } + + bool Bool(bool b) { + var_value = b; + 
push_new_key_value(var_value); + return true; } + + bool Int(int i) { + var_value = i; + push_new_key_value(var_value); + return true; } + + bool Uint(unsigned u) { + var_value = u; + push_new_key_value(var_value); + return true; } + + bool Int64(int64_t i) { + var_value = i; + push_new_key_value(var_value); + return true; } + + bool Uint64(uint64_t u) { + var_value = u; + push_new_key_value(var_value); + return true; } + + bool Double(double d) { + var_value = d; + push_new_key_value(var_value); + return true; } + + bool String(const char* str, rapidjson::SizeType length, bool copy) { + //TODO use copy + var_value = str; + push_new_key_value(var_value); + return true; + } + + bool Key(const char* str, rapidjson::SizeType length, bool copy) { + key_path.push_back(std::string(str)); + + if(!m_current_depth_non_anonymous){ + //important: upon a key and m_current_depth_non_anonymous is ZERO + //it should advance by 1. to get the correct current depth(for non anonymous counter). + m_current_depth_non_anonymous++; + } + + if(from_clause.size() == 0 || std::equal(key_path.begin(), key_path.end(), from_clause.begin(), from_clause.end(), iequal_predicate)) { + prefix_match = true; + } + + variable_match_operations.key(); + + return true; + } + + bool is_already_row_started() + { + if(state == row_state::OBJECT_START_ROW || state == row_state::ARRAY_START_ROW) + return true; + else + return false; + } + + bool StartObject() { + json_element_state.push_back(OBJECT_STATE); + m_current_depth++; + if(key_path.size()){ + //advancing the counter only upon there is a key. + m_current_depth_non_anonymous++; + } + + if (prefix_match && !is_already_row_started()) { + state = row_state::OBJECT_START_ROW; + m_start_row_depth = m_current_depth; + ++row_count; + } + + return true; + } + + bool EndObject(rapidjson::SizeType memberCount) { + json_element_state.pop_back(); + m_current_depth --; + m_current_depth_non_anonymous --; + + variable_match_operations.end_object(); + + dec_key_path(); + if (state == row_state::OBJECT_START_ROW && (m_start_row_depth > m_current_depth)) { + m_sql_processing_status = m_s3select_processing(); + state = row_state::NA; + } + return true; + } + + bool StartArray() { + json_element_state.push_back(ARRAY_STATE); + m_current_depth++; + if(key_path.size()){ + //advancing the counter only upon there is a key. 
+ m_current_depth_non_anonymous++; + } + + if (prefix_match && !is_already_row_started()) { + state = row_state::ARRAY_START_ROW; + m_start_row_depth = m_current_depth; + } + + variable_match_operations.start_array(); + + return true; + } + + bool EndArray(rapidjson::SizeType elementCount) { + json_element_state.pop_back(); + m_current_depth--; + m_current_depth_non_anonymous--; + + dec_key_path(); + + if (state == row_state::ARRAY_START_ROW && (m_start_row_depth > m_current_depth)) { + state = row_state::NA; + } + + variable_match_operations.end_array(); + + return true; + } + + void set_prefix_match(std::vector<std::string>& requested_prefix_match) + {//purpose: set the filter according to SQL statement(from clause) + from_clause = requested_prefix_match; + if(from_clause.size() ==0) + { + prefix_match = true; + m_start_row_depth = m_current_depth; + } + } + + void set_statement_json_variables(std::vector<std::pair<json_variable_access*,size_t>>& statement_variables) + {//purpose: set the json variables extracted from the SQL statement(projection columns, predicates columns) + variable_match_operations.init( + statement_variables, + &from_clause, + &key_path, + &m_current_depth, + &m_current_depth_non_anonymous, + &m_exact_match_cb); + } + + void set_exact_match_callback(std::function<int(s3selectEngine::value&, int)> f) + {//purpose: upon key is matching one of the exact filters, the callback is called. + m_exact_match_cb = f; + } + + void set_s3select_processing_callback(std::function<int(void)>& f) + {//purpose: execute s3select statement on matching row (according to filters) + m_s3select_processing = f; + } + + void set_push_per_star_operation_callback( std::function <int(s3selectEngine::scratch_area::json_key_value_t&)> cb) + { + m_star_operation_cb = cb; + } + + void set_star_operation() + { + m_star_operation = true; + } + + bool is_fatal_initialization() + { + return m_fatal_initialization_ind; + } + + int process_json_buffer(char* json_buffer,size_t json_buffer_sz, bool end_of_stream=false) + {//user keeps calling with buffers, the method is not aware of the object size. + + + try{ + if(!init_buffer_stream) + { + //set the memoryStreamer + reader.IterativeParseInit(); + init_buffer_stream = true; + + } + + //the non-processed bytes plus the next chunk are copy into main processing buffer + if(!end_of_stream) + stream_buffer.resetBuffer(json_buffer, json_buffer_sz); + + while (!reader.IterativeParseComplete()) { + reader.IterativeParseNext<rapidjson::kParseDefaultFlags>(stream_buffer, *this); + + //once all key-values move into s3select(for further filtering and processing), it should be cleared + + //TODO in the case the chunk is too small or some value in input is too big, the parsing will fail. + if (!end_of_stream && stream_buffer.next_src_==0 && stream_buffer.getBytesLeft() < 2048) + {//the non processed bytes will be processed on next fetched chunk + //TODO save remaining-bytes to internal buffer (or caller will use 2 sets of buffer) + stream_buffer.saveRemainingBytes(); + return 0; + } + if(m_sql_processing_status == JSON_PROCESSING_LIMIT_REACHED)//return status(int) from callback + { + return JSON_PROCESSING_LIMIT_REACHED; + } + + // error message + if(reader.HasParseError()) { + rapidjson::ParseErrorCode c = reader.GetParseErrorCode(); + size_t ofs = reader.GetErrorOffset(); + std::stringstream error_str; + error_str << "parsing error. 
code:" << c << " position: " << ofs << std::endl; + throw s3selectEngine::base_s3select_exception(error_str.str(), s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL); + return -1; + } + }//while reader.IterativeParseComplete + } + catch(std::exception &e){ + std::stringstream error_str; + error_str << "failed to process JSON : " << e.what() << std::endl; + throw s3selectEngine::base_s3select_exception(error_str.str(), s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL); + return -1; + } + return 0; + } +}; + + +#endif + |