diff options
Diffstat (limited to 'storage/cassandra/cassandra_se.cc')
-rw-r--r-- | storage/cassandra/cassandra_se.cc | 863 |
1 files changed, 863 insertions, 0 deletions
diff --git a/storage/cassandra/cassandra_se.cc b/storage/cassandra/cassandra_se.cc new file mode 100644 index 00000000..0086059a --- /dev/null +++ b/storage/cassandra/cassandra_se.cc @@ -0,0 +1,863 @@ + +// Cassandra includes: +#include <inttypes.h> +#include <netinet/in.h> +#include <sys/time.h> +#include <stdio.h> +#include <stdarg.h> + +#include "thrift/Thrift.h" +#include "thrift/transport/TSocket.h" +#include "thrift/transport/TTransport.h" +#include "thrift/transport/TBufferTransports.h" +#include "thrift/protocol/TProtocol.h" +#include "thrift/protocol/TBinaryProtocol.h" +#include "gen-cpp/Cassandra.h" +// cassandra includes end + +#include "cassandra_se.h" + +struct st_mysql_lex_string +{ + char *str; + size_t length; +}; + +using namespace std; +using namespace apache::thrift; +using namespace apache::thrift::transport; +using namespace apache::thrift::protocol; +using namespace org::apache::cassandra; + + +/* + Implementation of connection to one Cassandra column family (ie., table) +*/ +class Cassandra_se_impl: public Cassandra_se_interface +{ + CassandraClient *cass; /* Connection to cassandra */ + + std::string column_family; + std::string keyspace; + + ConsistencyLevel::type write_consistency; + ConsistencyLevel::type read_consistency; + + /* Connection data */ + std::string host; + int port; + /* How many times to retry an operation before giving up */ + int thrift_call_retries_to_do; + + bool inside_try_operation; + + /* DDL data */ + KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */ + CfDef cf_def; /* Column family we're using (TODO: put in table->share)*/ + std::vector<ColumnDef>::iterator column_ddl_it; + + /* The list that was returned by the last key lookup */ + std::vector<ColumnOrSuperColumn> column_data_vec; + std::vector<ColumnOrSuperColumn>::iterator column_data_it; + + /* Insert preparation */ + typedef std::map<std::string, std::vector<Mutation> > ColumnFamilyToMutation; + typedef std::map<std::string, ColumnFamilyToMutation> KeyToCfMutationMap; + + KeyToCfMutationMap batch_mutation; /* Prepare operation here */ + int64_t insert_timestamp; + std::vector<Mutation>* insert_list; + + /* Resultset we're reading */ + std::vector<KeySlice> key_slice_vec; + std::vector<KeySlice>::iterator key_slice_it; + + std::string rowkey; /* key of the record we're returning now */ + + SlicePredicate slice_pred; + SliceRange slice_pred_sr; + bool get_slices_returned_less; + bool get_slice_found_rows; + + bool reconnect(); +public: + Cassandra_se_impl() : cass(NULL), + write_consistency(ConsistencyLevel::ONE), + read_consistency(ConsistencyLevel::ONE), + thrift_call_retries_to_do(1), + inside_try_operation(false) + {} + virtual ~Cassandra_se_impl(){ delete cass; } + + /* Connection and DDL checks */ + bool connect(const char *host_arg, int port_arg, const char *keyspace); + void set_column_family(const char *cfname) { column_family.assign(cfname); } + + bool setup_ddl_checks(); + void first_ddl_column(); + bool next_ddl_column(char **name, int *name_len, char **value, int *value_len); + void get_rowkey_type(char **name, char **type); + size_t get_ddl_size(); + const char* get_default_validator(); + + /* Settings */ + void set_consistency_levels(unsigned long read_cons_level, unsigned long write_cons_level); + virtual void set_n_retries(uint retries_arg) { + thrift_call_retries_to_do= retries_arg; + } + + /* Writes */ + void clear_insert_buffer(); + void start_row_insert(const char *key, int key_len); + void add_insert_column(const char *name, int name_len, + const char *value, int value_len); + void add_insert_delete_column(const char *name, int name_len); + void add_row_deletion(const char *key, int key_len, + Column_name_enumerator *col_names, + LEX_STRING *names, uint nnames); + + bool do_insert(); + + /* Reads, point lookups */ + bool get_slice(char *key, size_t key_len, bool *found); + bool get_next_read_column(char **name, int *name_len, + char **value, int *value_len ); + void get_read_rowkey(char **value, int *value_len); + + /* Reads, multi-row scans */ +private: + bool have_rowkey_to_skip; + std::string rowkey_to_skip; + + bool get_range_slices_param_last_key_as_start_key; +public: + bool get_range_slices(bool last_key_as_start_key); + void finish_reading_range_slices(); + bool get_next_range_slice_row(bool *eof); + + /* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */ + void clear_read_columns(); + void clear_read_all_columns(); + void add_read_column(const char *name); + + /* Reads, MRR scans */ + void new_lookup_keys(); + int add_lookup_key(const char *key, size_t key_len); + bool multiget_slice(); + + bool get_next_multiget_row(); + + bool truncate(); + + bool remove_row(); + +private: + bool retryable_truncate(); + bool retryable_do_insert(); + bool retryable_remove_row(); + bool retryable_setup_ddl_checks(); + bool retryable_multiget_slice(); + bool retryable_get_range_slices(); + bool retryable_get_slice(); + + std::vector<std::string> mrr_keys; /* can we use allocator to put these into MRR buffer? */ + std::map<std::string, std::vector<ColumnOrSuperColumn> > mrr_result; + std::map<std::string, std::vector<ColumnOrSuperColumn> >::iterator mrr_result_it; + + /* Non-inherited utility functions: */ + int64_t get_i64_timestamp(); + + typedef bool (Cassandra_se_impl::*retryable_func_t)(); + bool try_operation(retryable_func_t func); +}; + + +///////////////////////////////////////////////////////////////////////////// +// Connection and setup +///////////////////////////////////////////////////////////////////////////// +Cassandra_se_interface *create_cassandra_se() +{ + return new Cassandra_se_impl; +} + + +bool Cassandra_se_impl::connect(const char *host_arg, int port_arg, const char *keyspace_arg) +{ + keyspace.assign(keyspace_arg); + host.assign(host_arg); + port= port_arg; + return reconnect(); +} + + +bool Cassandra_se_impl::reconnect() +{ + + delete cass; + cass= NULL; + + bool res= true; + try { + boost::shared_ptr<TTransport> socket = + boost::shared_ptr<TSocket>(new TSocket(host.c_str(), port)); + boost::shared_ptr<TTransport> tr = + boost::shared_ptr<TFramedTransport>(new TFramedTransport (socket)); + boost::shared_ptr<TProtocol> p = + boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr)); + + cass= new CassandraClient(p); + tr->open(); + cass->set_keyspace(keyspace.c_str()); + + res= false; // success + }catch(TTransportException te){ + print_error("%s [%d]", te.what(), te.getType()); + }catch(InvalidRequestException ire){ + print_error("%s [%s]", ire.what(), ire.why.c_str()); + }catch(NotFoundException nfe){ + print_error("%s", nfe.what()); + }catch(TException e){ + print_error("Thrift exception: %s", e.what()); + }catch (...) { + print_error("Unknown exception"); + } + + if (!res && setup_ddl_checks()) + res= true; + return res; +} + + +void Cassandra_se_impl::set_consistency_levels(unsigned long read_cons_level, + unsigned long write_cons_level) +{ + write_consistency= (ConsistencyLevel::type)(write_cons_level + 1); + read_consistency= (ConsistencyLevel::type)(read_cons_level + 1); +} + + +bool Cassandra_se_impl::retryable_setup_ddl_checks() +{ + try { + + cass->describe_keyspace(ks_def, keyspace); + + } catch (NotFoundException nfe) { + print_error("keyspace `%s` not found: %s", keyspace.c_str(), nfe.what()); + return true; + } + + std::vector<CfDef>::iterator it; + for (it= ks_def.cf_defs.begin(); it < ks_def.cf_defs.end(); it++) + { + cf_def= *it; + if (!cf_def.name.compare(column_family)) + return false; + } + + print_error("Column family %s not found in keyspace %s", + column_family.c_str(), + keyspace.c_str()); + return true; +} + +bool Cassandra_se_impl::setup_ddl_checks() +{ + return try_operation(&Cassandra_se_impl::retryable_setup_ddl_checks); +} + + +void Cassandra_se_impl::first_ddl_column() +{ + column_ddl_it= cf_def.column_metadata.begin(); +} + + +bool Cassandra_se_impl::next_ddl_column(char **name, int *name_len, + char **type, int *type_len) +{ + if (column_ddl_it == cf_def.column_metadata.end()) + return true; + + *name= (char*)(*column_ddl_it).name.c_str(); + *name_len= (*column_ddl_it).name.length(); + + *type= (char*)(*column_ddl_it).validation_class.c_str(); + *type_len= (*column_ddl_it).validation_class.length(); + + column_ddl_it++; + return false; +} + + +void Cassandra_se_impl::get_rowkey_type(char **name, char **type) +{ + if (cf_def.__isset.key_validation_class) + *type= (char*)cf_def.key_validation_class.c_str(); + else + *type= NULL; + + if (cf_def.__isset.key_alias) + *name= (char*)cf_def.key_alias.c_str(); + else + *name= NULL; +} + +size_t Cassandra_se_impl::get_ddl_size() +{ + return cf_def.column_metadata.size(); +} + +const char* Cassandra_se_impl::get_default_validator() +{ + return cf_def.default_validation_class.c_str(); +} + + +///////////////////////////////////////////////////////////////////////////// +// Data writes +///////////////////////////////////////////////////////////////////////////// +int64_t Cassandra_se_impl::get_i64_timestamp() +{ + struct timeval td; + gettimeofday(&td, NULL); + int64_t ms = td.tv_sec; + ms = ms * 1000; + int64_t usec = td.tv_usec; + usec = usec / 1000; + ms += usec; + + return ms; +} + + +void Cassandra_se_impl::clear_insert_buffer() +{ + batch_mutation.clear(); +} + + +void Cassandra_se_impl::start_row_insert(const char *key, int key_len) +{ + std::string key_to_insert; + key_to_insert.assign(key, key_len); + batch_mutation[key_to_insert]= ColumnFamilyToMutation(); + ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_insert]; + + cf_mut[column_family]= std::vector<Mutation>(); + insert_list= &cf_mut[column_family]; + + insert_timestamp= get_i64_timestamp(); +} + + +void Cassandra_se_impl::add_row_deletion(const char *key, int key_len, + Column_name_enumerator *col_names, + LEX_STRING *names, uint nnames) +{ + std::string key_to_delete; + key_to_delete.assign(key, key_len); + + batch_mutation[key_to_delete]= ColumnFamilyToMutation(); + ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_delete]; + + cf_mut[column_family]= std::vector<Mutation>(); + std::vector<Mutation> &mutation_list= cf_mut[column_family]; + + Mutation mut; + mut.__isset.deletion= true; + mut.deletion.__isset.timestamp= true; + mut.deletion.timestamp= get_i64_timestamp(); + mut.deletion.__isset.predicate= true; + + /* + Attempting to delete columns with SliceRange causes exception with message + "Deletion does not yet support SliceRange predicates". + + Delete all columns individually. + */ + SlicePredicate slice_pred; + slice_pred.__isset.column_names= true; + const char *col_name; + while ((col_name= col_names->get_next_name())) + slice_pred.column_names.push_back(std::string(col_name)); + for (uint i= 0; i < nnames; i++) + slice_pred.column_names.push_back(std::string(names[i].str, + names[i].length)); + + mut.deletion.predicate= slice_pred; + + mutation_list.push_back(mut); +} + + +void Cassandra_se_impl::add_insert_column(const char *name, + int name_len, + const char *value, + int value_len) +{ + Mutation mut; + mut.__isset.column_or_supercolumn= true; + mut.column_or_supercolumn.__isset.column= true; + + Column& col=mut.column_or_supercolumn.column; + if (name_len) + col.name.assign(name, name_len); + else + col.name.assign(name); + col.value.assign(value, value_len); + col.timestamp= insert_timestamp; + col.__isset.value= true; + col.__isset.timestamp= true; + insert_list->push_back(mut); +} + +void Cassandra_se_impl::add_insert_delete_column(const char *name, + int name_len) +{ + Mutation mut; + mut.__isset.deletion= true; + mut.deletion.__isset.timestamp= true; + mut.deletion.timestamp= insert_timestamp; + mut.deletion.__isset.predicate= true; + + SlicePredicate slice_pred; + slice_pred.__isset.column_names= true; + slice_pred.column_names.push_back(std::string(name, name_len)); + mut.deletion.predicate= slice_pred; + + insert_list->push_back(mut); +} + + +bool Cassandra_se_impl::retryable_do_insert() +{ + cass->batch_mutate(batch_mutation, write_consistency); + + cassandra_counters.row_inserts+= batch_mutation.size(); + cassandra_counters.row_insert_batches++; + + clear_insert_buffer(); + return 0; +} + + +bool Cassandra_se_impl::do_insert() +{ + /* + zero-size mutations are allowed by Cassandra's batch_mutate but lets not + do them (we may attempt to do it if there is a bulk insert that stores + exactly @@cassandra_insert_batch_size*n elements. + */ + if (batch_mutation.empty()) + return false; + + return try_operation(&Cassandra_se_impl::retryable_do_insert); +} + + +///////////////////////////////////////////////////////////////////////////// +// Reading data +///////////////////////////////////////////////////////////////////////////// + +/* + Make one key lookup. If the record is found, the result is stored locally and + the caller should iterate over it. +*/ + +bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found) +{ + bool res; + rowkey.assign(key, key_len); + + if (!(res= try_operation(&Cassandra_se_impl::retryable_get_slice))) + *found= get_slice_found_rows; + return res; +} + + +bool Cassandra_se_impl::retryable_get_slice() +{ + ColumnParent cparent; + cparent.column_family= column_family; + + SlicePredicate slice_pred; + SliceRange sr; + sr.start = ""; + sr.finish = ""; + slice_pred.__set_slice_range(sr); + + cass->get_slice(column_data_vec, rowkey, cparent, slice_pred, + read_consistency); + + if (column_data_vec.size() == 0) + { + /* + No columns found. Cassandra doesn't allow records without any column => + this means the seach key doesn't exist + */ + get_slice_found_rows= false; + return false; + } + get_slice_found_rows= true; + + column_data_it= column_data_vec.begin(); + return false; +} + + +bool Cassandra_se_impl::get_next_read_column(char **name, int *name_len, + char **value, int *value_len) +{ + bool use_counter=false; + while (1) + { + if (column_data_it == column_data_vec.end()) + return true; + + if ((*column_data_it).__isset.column) + break; /* Ok it's a real column. Should be always the case. */ + + if ((*column_data_it).__isset.counter_column) + { + use_counter= true; + break; + } + + column_data_it++; + } + + ColumnOrSuperColumn& cs= *column_data_it; + if (use_counter) + { + *name_len= cs.counter_column.name.size(); + *name= (char*)cs.counter_column.name.c_str(); + *value= (char*)&cs.counter_column.value; + *value_len= sizeof(cs.counter_column.value); + } + else + { + *name_len= cs.column.name.size(); + *name= (char*)cs.column.name.c_str(); + *value= (char*)cs.column.value.c_str(); + *value_len= cs.column.value.length(); + } + + column_data_it++; + return false; +} + + +/* Return the rowkey for the record that was read */ + +void Cassandra_se_impl::get_read_rowkey(char **value, int *value_len) +{ + *value= (char*)rowkey.c_str(); + *value_len= rowkey.length(); +} + + +bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key) +{ + get_range_slices_param_last_key_as_start_key= last_key_as_start_key; + + return try_operation(&Cassandra_se_impl::retryable_get_range_slices); +} + + +bool Cassandra_se_impl::retryable_get_range_slices() +{ + bool last_key_as_start_key= get_range_slices_param_last_key_as_start_key; + + ColumnParent cparent; + cparent.column_family= column_family; + + /* SlicePredicate can be used to limit columns we will retrieve */ + + KeyRange key_range; + key_range.__isset.start_key= true; + key_range.__isset.end_key= true; + + if (last_key_as_start_key) + { + key_range.start_key= rowkey; + + have_rowkey_to_skip= true; + rowkey_to_skip= rowkey; + } + else + { + have_rowkey_to_skip= false; + key_range.start_key.assign("", 0); + } + + key_range.end_key.assign("", 0); + key_range.count= read_batch_size; + + cass->get_range_slices(key_slice_vec, cparent, slice_pred, key_range, + read_consistency); + + if (key_slice_vec.size() < (uint)read_batch_size) + get_slices_returned_less= true; + else + get_slices_returned_less= false; + + key_slice_it= key_slice_vec.begin(); + return false; +} + + +/* Switch to next row. This may produce an error */ +bool Cassandra_se_impl::get_next_range_slice_row(bool *eof) +{ +restart: + if (key_slice_it == key_slice_vec.end()) + { + if (get_slices_returned_less) + { + *eof= true; + return false; + } + + /* + We have read through all columns in this batch. Try getting the next + batch. + */ + if (get_range_slices(true)) + return true; + + if (key_slice_vec.empty()) + { + *eof= true; + return false; + } + } + + /* + (1) - skip the last row that we have read in the previous batch. + (2) - Rows that were deleted show up as rows without any columns. Skip + them, like CQL does. + */ + if ((have_rowkey_to_skip && !rowkey_to_skip.compare(key_slice_it->key)) || // (1) + key_slice_it->columns.size() == 0) // (2) + { + key_slice_it++; + goto restart; + } + + *eof= false; + column_data_vec= key_slice_it->columns; + rowkey= key_slice_it->key; + column_data_it= column_data_vec.begin(); + key_slice_it++; + return false; +} + + +void Cassandra_se_impl::finish_reading_range_slices() +{ + key_slice_vec.clear(); +} + + +void Cassandra_se_impl::clear_read_columns() +{ + slice_pred.column_names.clear(); +} + +void Cassandra_se_impl::clear_read_all_columns() +{ + slice_pred_sr.start = ""; + slice_pred_sr.finish = ""; + slice_pred.__set_slice_range(slice_pred_sr); +} + + +void Cassandra_se_impl::add_read_column(const char *name_arg) +{ + std::string name(name_arg); + slice_pred.__isset.column_names= true; + slice_pred.column_names.push_back(name); +} + + +bool Cassandra_se_impl::truncate() +{ + return try_operation(&Cassandra_se_impl::retryable_truncate); +} + + +bool Cassandra_se_impl::retryable_truncate() +{ + cass->truncate(column_family); + return 0; +} + + +bool Cassandra_se_impl::remove_row() +{ + return try_operation(&Cassandra_se_impl::retryable_remove_row); +} + + +bool Cassandra_se_impl::retryable_remove_row() +{ + ColumnPath column_path; + column_path.column_family= column_family; + cass->remove(rowkey, column_path, get_i64_timestamp(), write_consistency); + return 0; +} + +/* + Try calling a function, catching possible Cassandra errors, and re-trying + for "transient" errors. +*/ +bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call) +{ + bool res; + int n_attempts= thrift_call_retries_to_do; + + bool was_inside_try_operation= inside_try_operation; + inside_try_operation= true; + + do + { + res= true; + + try { + + if ((res= (this->*func_to_call)())) + { + /* + The function call was made successfully (without timeouts, etc), + but something inside it returned 'true'. + This is supposedly a failure (or "not found" or other negative + result). We need to return this to the caller. + */ + n_attempts= 0; + } + + } catch (InvalidRequestException ire) { + n_attempts= 0; /* there is no point in retrying this operation */ + print_error("%s [%s]", ire.what(), ire.why.c_str()); + } catch (UnavailableException ue) { + cassandra_counters.unavailable_exceptions++; + if (!--n_attempts) + print_error("UnavailableException: %s", ue.what()); + } catch (TimedOutException te) { + /* + Note: this is a timeout generated *inside Cassandra cluster*. + Connection between us and the cluster is ok, but something went wrong + within the cluster. + */ + cassandra_counters.timeout_exceptions++; + if (!--n_attempts) + print_error("TimedOutException: %s", te.what()); + } catch (TTransportException tte) { + /* Something went wrong in communication between us and Cassandra */ + cassandra_counters.network_exceptions++; + + switch (tte.getType()) + { + case TTransportException::NOT_OPEN: + case TTransportException::TIMED_OUT: + case TTransportException::END_OF_FILE: + case TTransportException::INTERRUPTED: + { + if (!was_inside_try_operation && reconnect()) + { + /* Failed to reconnect, no point to retry the operation */ + n_attempts= 0; + print_error("%s", tte.what()); + } + else + { + n_attempts--; + } + break; + } + default: + { + /* + We assume it doesn't make sense to retry for + unknown kinds of TTransportException-s + */ + n_attempts= 0; + print_error("%s", tte.what()); + } + } + }catch(TException e){ + /* todo: we may use retry for certain kinds of Thrift errors */ + n_attempts= 0; + print_error("Thrift exception: %s", e.what()); + } catch (...) { + n_attempts= 0; /* Don't retry */ + print_error("Unknown exception"); + } + + } while (res && n_attempts > 0); + + inside_try_operation= was_inside_try_operation; + return res; +} + +///////////////////////////////////////////////////////////////////////////// +// MRR reads +///////////////////////////////////////////////////////////////////////////// + +void Cassandra_se_impl::new_lookup_keys() +{ + mrr_keys.clear(); +} + + +int Cassandra_se_impl::add_lookup_key(const char *key, size_t key_len) +{ + mrr_keys.push_back(std::string(key, key_len)); + return mrr_keys.size(); +} + +bool Cassandra_se_impl::multiget_slice() +{ + return try_operation(&Cassandra_se_impl::retryable_multiget_slice); +} + + +bool Cassandra_se_impl::retryable_multiget_slice() +{ + ColumnParent cparent; + cparent.column_family= column_family; + + SlicePredicate slice_pred; + SliceRange sr; + sr.start = ""; + sr.finish = ""; + slice_pred.__set_slice_range(sr); + + cassandra_counters.multiget_reads++; + cassandra_counters.multiget_keys_scanned += mrr_keys.size(); + cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred, + read_consistency); + + cassandra_counters.multiget_rows_read += mrr_result.size(); + mrr_result_it= mrr_result.begin(); + + return false; +} + + +bool Cassandra_se_impl::get_next_multiget_row() +{ + if (mrr_result_it == mrr_result.end()) + return true; /* EOF */ + + column_data_vec= mrr_result_it->second; + rowkey= mrr_result_it->first; + + column_data_it= column_data_vec.begin(); + mrr_result_it++; + return false; +} + + + |