diff options
Diffstat (limited to 'storage/cassandra/ha_cassandra.cc')
-rw-r--r-- | storage/cassandra/ha_cassandra.cc | 2604 |
1 files changed, 2604 insertions, 0 deletions
diff --git a/storage/cassandra/ha_cassandra.cc b/storage/cassandra/ha_cassandra.cc new file mode 100644 index 00000000..54812b51 --- /dev/null +++ b/storage/cassandra/ha_cassandra.cc @@ -0,0 +1,2604 @@ +/* + Copyright (c) 2012, 2020, MariaDB Corporation. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */ + +#ifdef USE_PRAGMA_IMPLEMENTATION +#pragma implementation // gcc: Class implementation +#endif + +#include <my_config.h> +#include <mysql/plugin.h> +#include "ha_cassandra.h" +#include "sql_class.h" + +#define DYNCOL_USUAL 20 +#define DYNCOL_DELTA 100 +#define DYNCOL_USUAL_REC 1024 +#define DYNCOL_DELTA_REC 1024 + +static handler *cassandra_create_handler(handlerton *hton, + TABLE_SHARE *table, + MEM_ROOT *mem_root); + +extern int dynamic_column_error_message(enum_dyncol_func_result rc); + +handlerton *cassandra_hton; + + +/* + Hash used to track the number of open tables; variable for example share + methods +*/ +static HASH cassandra_open_tables; + +/* The mutex used to init the hash; variable for example share methods */ +mysql_mutex_t cassandra_mutex; + + +/** + Structure for CREATE TABLE options (table options). + It needs to be called ha_table_option_struct. + + The option values can be specified in the CREATE TABLE at the end: + CREATE TABLE ( ... ) *here* +*/ + +struct ha_table_option_struct +{ + const char *thrift_host; + int thrift_port; + const char *keyspace; + const char *column_family; +}; + + +ha_create_table_option cassandra_table_option_list[]= +{ + /* + one option that takes an arbitrary string + */ + HA_TOPTION_STRING("thrift_host", thrift_host), + HA_TOPTION_NUMBER("thrift_port", thrift_port, 9160, 1, 65535, 0), + HA_TOPTION_STRING("keyspace", keyspace), + HA_TOPTION_STRING("column_family", column_family), + HA_TOPTION_END +}; + +/** + Structure for CREATE TABLE options (field options). +*/ + +struct ha_field_option_struct +{ + bool dyncol_field; +}; + +ha_create_table_option cassandra_field_option_list[]= +{ + /* + Collect all other columns as dynamic here, + the valid values are YES/NO, ON/OFF, 1/0. + The default is 0, that is false, no, off. + */ + HA_FOPTION_BOOL("DYNAMIC_COLUMN_STORAGE", dyncol_field, 0), + HA_FOPTION_END +}; + +static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG, + "Number of rows in an INSERT batch", + NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0); + +static MYSQL_THDVAR_ULONG(multiget_batch_size, PLUGIN_VAR_RQCMDARG, + "Number of rows in a multiget(MRR) batch", + NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0); + +static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG, + "Number of rows in an rnd_read (full scan) batch", + NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0); + +static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG, + "Number of times to retry Cassandra calls that failed due to timeouts or " + "network communication problems. The default, 0, means not to retry.", + NULL, NULL, /*default*/ 3, /*min*/ 1, /*max*/ 1024*1024*1024, 0); + +/* These match values in enum_cassandra_consistency_level */ +const char *cassandra_consistency_level[] = +{ + "ONE", + "QUORUM", + "LOCAL_QUORUM", + "EACH_QUORUM", + "ALL", + "ANY", + "TWO", + "THREE", + NullS +}; + +TYPELIB cassandra_consistency_level_typelib= { + array_elements(cassandra_consistency_level) - 1, "", + cassandra_consistency_level, NULL +}; + + +static MYSQL_THDVAR_ENUM(write_consistency, PLUGIN_VAR_RQCMDARG, + "Cassandra consistency level to use for write operations", NULL, NULL, + ONE, &cassandra_consistency_level_typelib); + +static MYSQL_THDVAR_ENUM(read_consistency, PLUGIN_VAR_RQCMDARG, + "Cassandra consistency level to use for read operations", NULL, NULL, + ONE, &cassandra_consistency_level_typelib); + + +mysql_mutex_t cassandra_default_host_lock; +static char* cassandra_default_thrift_host = NULL; +static char cassandra_default_host_buf[256]=""; + +static void +cassandra_default_thrift_host_update(THD *thd, + struct st_mysql_sys_var* var, + void* var_ptr, /*!< out: where the + formal string goes */ + const void* save) /*!< in: immediate result + from check function */ +{ + const char *new_host= *((char**)save); + const size_t max_len= sizeof(cassandra_default_host_buf); + + mysql_mutex_lock(&cassandra_default_host_lock); + + if (new_host) + { + strncpy(cassandra_default_host_buf, new_host, max_len-1); + cassandra_default_host_buf[max_len-1]= 0; + cassandra_default_thrift_host= cassandra_default_host_buf; + } + else + { + cassandra_default_host_buf[0]= 0; + cassandra_default_thrift_host= NULL; + } + + *((const char**)var_ptr)= cassandra_default_thrift_host; + + mysql_mutex_unlock(&cassandra_default_host_lock); +} + + +static MYSQL_SYSVAR_STR(default_thrift_host, cassandra_default_thrift_host, + PLUGIN_VAR_RQCMDARG, + "Default host for Cassandra thrift connections", + /*check*/NULL, + cassandra_default_thrift_host_update, + /*default*/NULL); + +static struct st_mysql_sys_var* cassandra_system_variables[]= { + MYSQL_SYSVAR(insert_batch_size), + MYSQL_SYSVAR(multiget_batch_size), + MYSQL_SYSVAR(rnd_batch_size), + + MYSQL_SYSVAR(default_thrift_host), + MYSQL_SYSVAR(write_consistency), + MYSQL_SYSVAR(read_consistency), + MYSQL_SYSVAR(failure_retries), + NULL +}; + +Cassandra_status_vars cassandra_counters; + +/** + @brief + Function we use in the creation of our hash to get key. +*/ + +static uchar* cassandra_get_key(CASSANDRA_SHARE *share, size_t *length, + my_bool not_used __attribute__((unused))) +{ + *length=share->table_name_length; + return (uchar*) share->table_name; +} + +#ifdef HAVE_PSI_INTERFACE +static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_CASSANDRA_SHARE_mutex; + +static PSI_mutex_info all_cassandra_mutexes[]= +{ + { &ex_key_mutex_example, "cassandra", PSI_FLAG_GLOBAL}, + { &ex_key_mutex_CASSANDRA_SHARE_mutex, "CASSANDRA_SHARE::mutex", 0} +}; + +static void init_cassandra_psi_keys() +{ + const char* category= "cassandra"; + int count; + + if (PSI_server == NULL) + return; + + count= array_elements(all_cassandra_mutexes); + PSI_server->register_mutex(category, all_cassandra_mutexes, count); +} +#endif + +static int cassandra_init_func(void *p) +{ + DBUG_ENTER("cassandra_init_func"); + +#ifdef HAVE_PSI_INTERFACE + init_cassandra_psi_keys(); +#endif + + cassandra_hton= (handlerton *)p; + mysql_mutex_init(ex_key_mutex_example, &cassandra_mutex, MY_MUTEX_INIT_FAST); + (void) my_hash_init(PSI_INSTRUMENT_ME, &cassandra_open_tables,system_charset_info,32,0,0, + (my_hash_get_key) cassandra_get_key,0,0); + + cassandra_hton->create= cassandra_create_handler; + /* + Don't specify HTON_CAN_RECREATE in flags. re-create is used by TRUNCATE + TABLE to create an *empty* table from scratch. Cassandra table won't be + emptied if re-created. + */ + cassandra_hton->flags= 0; + cassandra_hton->table_options= cassandra_table_option_list; + cassandra_hton->field_options= cassandra_field_option_list; + + mysql_mutex_init(0 /* no instrumentation */, + &cassandra_default_host_lock, MY_MUTEX_INIT_FAST); + + DBUG_RETURN(0); +} + + +static int cassandra_done_func(void *p) +{ + int error= 0; + DBUG_ENTER("cassandra_done_func"); + if (cassandra_open_tables.records) + error= 1; + my_hash_free(&cassandra_open_tables); + mysql_mutex_destroy(&cassandra_mutex); + mysql_mutex_destroy(&cassandra_default_host_lock); + DBUG_RETURN(error); +} + + +/** + @brief + Example of simple lock controls. The "share" it creates is a + structure we will pass to each cassandra handler. Do you have to have + one of these? Well, you have pieces that are used for locking, and + they are needed to function. +*/ + +static CASSANDRA_SHARE *get_share(const char *table_name, TABLE *table) +{ + CASSANDRA_SHARE *share; + uint length; + char *tmp_name; + + mysql_mutex_lock(&cassandra_mutex); + length=(uint) strlen(table_name); + + if (!(share=(CASSANDRA_SHARE*) my_hash_search(&cassandra_open_tables, + (uchar*) table_name, + length))) + { + if (!(share=(CASSANDRA_SHARE *) + my_multi_malloc(MYF(MY_WME | MY_ZEROFILL), PSI_INSTRUMENT_ME, + &share, sizeof(*share), + &tmp_name, length+1, + NullS))) + { + mysql_mutex_unlock(&cassandra_mutex); + return NULL; + } + + share->use_count=0; + share->table_name_length=length; + share->table_name=tmp_name; + strmov(share->table_name,table_name); + if (my_hash_insert(&cassandra_open_tables, (uchar*) share)) + goto error; + thr_lock_init(&share->lock); + mysql_mutex_init(ex_key_mutex_CASSANDRA_SHARE_mutex, + &share->mutex, MY_MUTEX_INIT_FAST); + } + share->use_count++; + mysql_mutex_unlock(&cassandra_mutex); + + return share; + +error: + mysql_mutex_destroy(&share->mutex); + my_free(share); + + return NULL; +} + + +/** + @brief + Free lock controls. We call this whenever we close a table. If the table had + the last reference to the share, then we free memory associated with it. +*/ + +static int free_share(CASSANDRA_SHARE *share) +{ + mysql_mutex_lock(&cassandra_mutex); + if (!--share->use_count) + { + my_hash_delete(&cassandra_open_tables, (uchar*) share); + thr_lock_delete(&share->lock); + mysql_mutex_destroy(&share->mutex); + my_free(share); + } + mysql_mutex_unlock(&cassandra_mutex); + + return 0; +} + + +static handler* cassandra_create_handler(handlerton *hton, + TABLE_SHARE *table, + MEM_ROOT *mem_root) +{ + return new (mem_root) ha_cassandra(hton, table); +} + + +ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg) + :handler(hton, table_arg), + se(NULL), field_converters(NULL), + special_type_field_converters(NULL), + special_type_field_names(NULL), n_special_type_fields(0), + rowkey_converter(NULL), + dyncol_field(0), dyncol_set(0) +{} + + +int ha_cassandra::connect_and_check_options(TABLE *table_arg) +{ + ha_table_option_struct *options= table_arg->s->option_struct; + int res; + DBUG_ENTER("ha_cassandra::connect_and_check_options"); + + if ((res= check_field_options(table_arg->s->field)) || + (res= check_table_options(options))) + DBUG_RETURN(res); + + se= create_cassandra_se(); + se->set_column_family(options->column_family); + const char *thrift_host= options->thrift_host? options->thrift_host: + cassandra_default_thrift_host; + if (se->connect(thrift_host, options->thrift_port, options->keyspace)) + { + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str()); + DBUG_RETURN(HA_ERR_NO_CONNECTION); + } + + if (setup_field_converters(table_arg->field, table_arg->s->fields)) + { + DBUG_RETURN(HA_ERR_NO_CONNECTION); + } + + DBUG_RETURN(0); +} + + +int ha_cassandra::check_field_options(Field **fields) +{ + Field **field; + uint i; + DBUG_ENTER("ha_cassandra::check_field_options"); + for (field= fields, i= 0; *field; field++, i++) + { + ha_field_option_struct *field_options= (*field)->option_struct; + if (field_options && field_options->dyncol_field) + { + if (dyncol_set || (*field)->type() != MYSQL_TYPE_BLOB) + { + my_error(ER_WRONG_FIELD_SPEC, MYF(0), (*field)->field_name.str); + DBUG_RETURN(HA_WRONG_CREATE_OPTION); + } + dyncol_set= 1; + dyncol_field= i; + bzero(&dynamic_values, sizeof(dynamic_values)); + bzero(&dynamic_names, sizeof(dynamic_names)); + bzero(&dynamic_rec, sizeof(dynamic_rec)); + } + } + DBUG_RETURN(0); +} + + +int ha_cassandra::open(const char *name, int mode, uint test_if_locked) +{ + DBUG_ENTER("ha_cassandra::open"); + + if (!(share = get_share(name, table))) + DBUG_RETURN(1); + thr_lock_data_init(&share->lock,&lock,NULL); + + DBUG_ASSERT(!se); + /* + Don't do the following on open: it prevents SHOW CREATE TABLE when the server + has gone away. + */ + /* + int res; + if ((res= connect_and_check_options(table))) + { + DBUG_RETURN(res); + } + */ + + info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST); + insert_lineno= 0; + + DBUG_RETURN(0); +} + + +int ha_cassandra::close(void) +{ + DBUG_ENTER("ha_cassandra::close"); + delete se; + se= NULL; + free_field_converters(); + DBUG_RETURN(free_share(share)); +} + + +int ha_cassandra::check_table_options(ha_table_option_struct *options) +{ + if (!options->thrift_host && (!cassandra_default_thrift_host || + !cassandra_default_thrift_host[0])) + { + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), + "thrift_host table option must be specified, or " + "@@cassandra_default_thrift_host must be set"); + return HA_WRONG_CREATE_OPTION; + } + + if (!options->keyspace || !options->column_family) + { + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), + "keyspace and column_family table options must be specified"); + return HA_WRONG_CREATE_OPTION; + } + return 0; +} + + +/** + @brief + create() is called to create a table. The variable name will have the name + of the table. + + @details + When create() is called you do not need to worry about + opening the table. Also, the .frm file will have already been + created so adjusting create_info is not necessary. You can overwrite + the .frm file at this point if you wish to change the table + definition, but there are no methods currently provided for doing + so. + + Called from handle.cc by ha_create_table(). + + @see + ha_create_table() in handle.cc +*/ + +int ha_cassandra::create(const char *name, TABLE *table_arg, + HA_CREATE_INFO *create_info) +{ + int res; + DBUG_ENTER("ha_cassandra::create"); + + if (table_arg->s->keys != 1 || table_arg->s->primary_key !=0 || + table_arg->key_info[0].user_defined_key_parts != 1 || + table_arg->key_info[0].key_part[0].fieldnr != 1) + { + my_error(ER_WRONG_COLUMN_NAME, MYF(0), + "Table must have PRIMARY KEY defined over the first column"); + DBUG_RETURN(HA_WRONG_CREATE_OPTION); + } + + DBUG_ASSERT(!se); + if ((res= connect_and_check_options(table_arg))) + DBUG_RETURN(res); + + insert_lineno= 0; + DBUG_RETURN(0); +} + +/* + Mapping needs to + - copy value from MySQL record to Thrift buffer + - copy value from Thrift bufer to MySQL record.. + +*/ + +/* Converter base */ +class ColumnDataConverter +{ +public: + Field *field; + + /* This will save Cassandra's data in the Field */ + virtual int cassandra_to_mariadb(const char *cass_data, + int cass_data_len)=0; + + /* + This will get data from the Field pointer, store Cassandra's form + in internal buffer, and return pointer/size. + + @return + false - OK + true - Failed to convert value (completely, there is no value to insert + at all). + */ + virtual bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)=0; + virtual ~ColumnDataConverter() {}; +}; + + +class DoubleDataConverter : public ColumnDataConverter +{ + double buf; +public: + int cassandra_to_mariadb(const char *cass_data, int cass_data_len) + { + DBUG_ASSERT(cass_data_len == sizeof(double)); + double *pdata= (double*) cass_data; + field->store(*pdata); + return 0; + } + + bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) + { + buf= field->val_real(); + *cass_data= (char*)&buf; + *cass_data_len=sizeof(double); + return false; + } + ~DoubleDataConverter(){} +}; + + +class FloatDataConverter : public ColumnDataConverter +{ + float buf; +public: + int cassandra_to_mariadb(const char *cass_data, int cass_data_len) + { + DBUG_ASSERT(cass_data_len == sizeof(float)); + float *pdata= (float*) cass_data; + field->store(*pdata); + return 0; + } + + bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) + { + buf= field->val_real(); + *cass_data= (char*)&buf; + *cass_data_len=sizeof(float); + return false; + } + ~FloatDataConverter(){} +}; + +static void flip64(const char *from, char* to) +{ + to[0]= from[7]; + to[1]= from[6]; + to[2]= from[5]; + to[3]= from[4]; + to[4]= from[3]; + to[5]= from[2]; + to[6]= from[1]; + to[7]= from[0]; +} + +class BigintDataConverter : public ColumnDataConverter +{ + longlong buf; + bool flip; /* is false when reading counter columns */ +public: + int cassandra_to_mariadb(const char *cass_data, int cass_data_len) + { + longlong tmp; + DBUG_ASSERT(cass_data_len == sizeof(longlong)); + if (flip) + flip64(cass_data, (char*)&tmp); + else + memcpy(&tmp, cass_data, sizeof(longlong)); + field->store(tmp); + return 0; + } + + bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) + { + longlong tmp= field->val_int(); + if (flip) + flip64((const char*)&tmp, (char*)&buf); + else + memcpy(&buf, &tmp, sizeof(longlong)); + *cass_data= (char*)&buf; + *cass_data_len=sizeof(longlong); + return false; + } + BigintDataConverter(bool flip_arg) : flip(flip_arg) {} + ~BigintDataConverter(){} +}; + +static void flip32(const char *from, char* to) +{ + to[0]= from[3]; + to[1]= from[2]; + to[2]= from[1]; + to[3]= from[0]; +} + + +class TinyintDataConverter : public ColumnDataConverter +{ + char buf; +public: + int cassandra_to_mariadb(const char *cass_data, int cass_data_len) + { + DBUG_ASSERT(cass_data_len == 1); + field->store(cass_data[0]); + return 0; + } + + bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) + { + buf= field->val_int()? 1 : 0; /* TODO: error handling? */ + *cass_data= (char*)&buf; + *cass_data_len= 1; + return false; + } + ~TinyintDataConverter(){} +}; + + +class Int32DataConverter : public ColumnDataConverter +{ + int32_t buf; +public: + int cassandra_to_mariadb(const char *cass_data, int cass_data_len) + { + int32_t tmp; + DBUG_ASSERT(cass_data_len == sizeof(int32_t)); + flip32(cass_data, (char*)&tmp); + field->store(tmp); + return 0; + } + + bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) + { + int32_t tmp= field->val_int(); + flip32((const char*)&tmp, (char*)&buf); + *cass_data= (char*)&buf; + *cass_data_len=sizeof(int32_t); + return false; + } + ~Int32DataConverter(){} +}; + + +class StringCopyConverter : public ColumnDataConverter +{ + String buf; + size_t max_length; +public: + int cassandra_to_mariadb(const char *cass_data, int cass_data_len) + { + if ((size_t)cass_data_len > max_length) + return 1; + field->store(cass_data, cass_data_len,field->charset()); + return 0; + } + + bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) + { + String *pstr= field->val_str(&buf); + *cass_data= (char*)pstr->ptr(); + *cass_data_len= pstr->length(); + return false; + } + StringCopyConverter(size_t max_length_arg) : max_length(max_length_arg) {} + ~StringCopyConverter(){} +}; + + +class TimestampDataConverter : public ColumnDataConverter +{ + int64_t buf; +public: + int cassandra_to_mariadb(const char *cass_data, int cass_data_len) + { + /* Cassandra data is milliseconds-since-epoch in network byte order */ + int64_t tmp; + DBUG_ASSERT(cass_data_len==8); + flip64(cass_data, (char*)&tmp); + /* + store_TIME's arguments: + - seconds since epoch + - microsecond fraction of a second. + */ + ((Field_timestamp*)field)->store_TIME(tmp / 1000, (tmp % 1000)*1000); + return 0; + } + + bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) + { + my_time_t ts_time; + ulong ts_microsec; + int64_t tmp; + ts_time= ((Field_timestamp*)field)->get_timestamp(&ts_microsec); + + /* Cassandra needs milliseconds-since-epoch */ + tmp= ((int64_t)ts_time) * 1000 + ts_microsec/1000; + flip64((const char*)&tmp, (char*)&buf); + + *cass_data= (char*)&buf; + *cass_data_len= 8; + return false; + } + ~TimestampDataConverter(){} +}; + + + +static int convert_hex_digit(const char c) +{ + int num; + if (c >= '0' && c <= '9') + num= c - '0'; + else if (c >= 'A' && c <= 'F') + num= c - 'A' + 10; + else if (c >= 'a' && c <= 'f') + num= c - 'a' + 10; + else + return -1; /* Couldn't convert */ + return num; +} + + +const char map2number[]="0123456789abcdef"; + +static void convert_uuid2string(char *str, const char *cass_data) +{ + char *ptr= str; + /* UUID arrives as 16-byte number in network byte order */ + for (uint i=0; i < 16; i++) + { + *(ptr++)= map2number[(cass_data[i] >> 4) & 0xF]; + *(ptr++)= map2number[cass_data[i] & 0xF]; + if (i == 3 || i == 5 || i == 7 || i == 9) + *(ptr++)= '-'; + } + *ptr= 0; +} + +static bool convert_string2uuid(char *buf, const char *str) +{ + int lower, upper; + for (uint i= 0; i < 16; i++) + { + if ((upper= convert_hex_digit(str[0])) == -1 || + (lower= convert_hex_digit(str[1])) == -1) + { + return true; + } + buf[i]= lower | (upper << 4); + str += 2; + if (i == 3 || i == 5 || i == 7 || i == 9) + { + if (str[0] != '-') + return true; + str++; + } + } + return false; +} + + +class UuidDataConverter : public ColumnDataConverter +{ + char buf[16]; /* Binary UUID representation */ + String str_buf; +public: + int cassandra_to_mariadb(const char *cass_data, int cass_data_len) + { + DBUG_ASSERT(cass_data_len==16); + char str[37]; + convert_uuid2string(str, cass_data); + field->store(str, 36,field->charset()); + return 0; + } + + bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) + { + String *uuid_str= field->val_str(&str_buf); + + if (uuid_str->length() != 36) + return true; + + if (convert_string2uuid(buf, (char*)uuid_str->c_ptr())) + return true; + *cass_data= buf; + *cass_data_len= 16; + return false; + } + ~UuidDataConverter(){} +}; + +/** + Converting dynamic columns types to/from casandra types +*/ + + +/** + Check and initialize (if it is needed) string MEM_ROOT +*/ +static void alloc_strings_memroot(MEM_ROOT *mem_root) +{ + if (!alloc_root_inited(mem_root)) + { + /* + The mem_root used to allocate UUID (of length 36 + \0) so make + appropriate allocated size + */ + init_alloc_root(PSI_INSTRUMENT_ME, mem_root, + (36 + 1 + ALIGN_SIZE(sizeof(USED_MEM))) * 10 + + ALLOC_ROOT_MIN_BLOCK_SIZE, + (36 + 1 + ALIGN_SIZE(sizeof(USED_MEM))) * 10 + + ALLOC_ROOT_MIN_BLOCK_SIZE, MYF(MY_THREAD_SPECIFIC)); + } +} + +static void free_strings_memroot(MEM_ROOT *mem_root) +{ + if (alloc_root_inited(mem_root)) + free_root(mem_root, MYF(0)); +} + +bool cassandra_to_dyncol_intLong(const char *cass_data, + int cass_data_len __attribute__((unused)), + DYNAMIC_COLUMN_VALUE *value, + MEM_ROOT *mem_root __attribute__((unused))) +{ + value->type= DYN_COL_INT; +#ifdef WORDS_BIGENDIAN + value->x.long_value= (longlong)*cass_data; +#else + flip64(cass_data, (char *)&value->x.long_value); +#endif + return 0; +} + +bool dyncol_to_cassandraLong(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + longlong *tmp= (longlong *) buff; + enum enum_dyncol_func_result rc= + mariadb_dyncol_val_long(tmp, value); + if (rc < 0) + return true; + *cass_data_len= sizeof(longlong); +#ifdef WORDS_BIGENDIAN + *cass_data= (char *)buff; +#else + flip64((char *)buff, (char *)buff + sizeof(longlong)); + *cass_data= (char *)buff + sizeof(longlong); +#endif + *freemem= NULL; + return false; +} + +bool cassandra_to_dyncol_intInt32(const char *cass_data, + int cass_data_len __attribute__((unused)), + DYNAMIC_COLUMN_VALUE *value, + MEM_ROOT *mem_root __attribute__((unused))) +{ + int32 tmp; + value->type= DYN_COL_INT; +#ifdef WORDS_BIGENDIAN + tmp= *((int32 *)cass_data); +#else + flip32(cass_data, (char *)&tmp); +#endif + value->x.long_value= tmp; + return 0; +} + + +bool dyncol_to_cassandraInt32(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + longlong *tmp= (longlong *) ((char *)buff + sizeof(longlong)); + enum enum_dyncol_func_result rc= + mariadb_dyncol_val_long(tmp, value); + if (rc < 0) + return true; + *cass_data_len= sizeof(int32); + *cass_data= (char *)buff; +#ifdef WORDS_BIGENDIAN + *((int32 *) buff) = (int32) *tmp; +#else + { + int32 tmp2= (int32) *tmp; + flip32((char *)&tmp2, (char *)buff); + } +#endif + *freemem= NULL; + return false; +} + + +bool cassandra_to_dyncol_intCounter(const char *cass_data, + int cass_data_len __attribute__((unused)), + DYNAMIC_COLUMN_VALUE *value, + MEM_ROOT *mem_root __attribute__((unused))) +{ + value->type= DYN_COL_INT; + value->x.long_value= *((longlong *)cass_data); + return 0; +} + + +bool dyncol_to_cassandraCounter(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + longlong *tmp= (longlong *)buff; + enum enum_dyncol_func_result rc= + mariadb_dyncol_val_long(tmp, value); + if (rc < 0) + return true; + *cass_data_len= sizeof(longlong); + *cass_data= (char *)buff; + *freemem= NULL; + return false; +} + +bool cassandra_to_dyncol_doubleFloat(const char *cass_data, + int cass_data_len __attribute__((unused)), + DYNAMIC_COLUMN_VALUE *value, + MEM_ROOT *mem_root __attribute__((unused))) +{ + value->type= DYN_COL_DOUBLE; + value->x.double_value= *((float *)cass_data); + return 0; +} + +bool dyncol_to_cassandraFloat(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + double tmp; + enum enum_dyncol_func_result rc= + mariadb_dyncol_val_double(&tmp, value); + if (rc < 0) + return true; + *((float *)buff)= (float) tmp; + *cass_data_len= sizeof(float); + *cass_data= (char *)buff; + *freemem= NULL; + return false; +} + +bool cassandra_to_dyncol_doubleDouble(const char *cass_data, + int cass_data_len __attribute__((unused)), + DYNAMIC_COLUMN_VALUE *value, + MEM_ROOT *mem_root + __attribute__((unused))) +{ + value->type= DYN_COL_DOUBLE; + value->x.double_value= *((double *)cass_data); + return 0; +} + +bool dyncol_to_cassandraDouble(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + double *tmp= (double *)buff; + enum enum_dyncol_func_result rc= + mariadb_dyncol_val_double(tmp, value); + if (rc < 0) + return true; + *cass_data_len= sizeof(double); + *cass_data= (char *)buff; + *freemem= NULL; + return false; +} + +bool cassandra_to_dyncol_strStr(const char *cass_data, + int cass_data_len, + DYNAMIC_COLUMN_VALUE *value, + CHARSET_INFO *cs) +{ + value->type= DYN_COL_STRING; + value->x.string.charset= cs; + value->x.string.value.str= (char *)cass_data; + value->x.string.value.length= cass_data_len; + return 0; +} + +bool dyncol_to_cassandraStr(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem, CHARSET_INFO *cs) +{ + DYNAMIC_STRING tmp; + if (init_dynamic_string(&tmp, NULL, 1024, 1024)) + return 1; + enum enum_dyncol_func_result rc= + mariadb_dyncol_val_str(&tmp, value, cs, '\0'); + if (rc < 0) + { + dynstr_free(&tmp); + return 1; + } + *cass_data_len= tmp.length; + *(cass_data)= tmp.str; + *freemem= tmp.str; + return 0; +} + +bool cassandra_to_dyncol_strBytes(const char *cass_data, + int cass_data_len, + DYNAMIC_COLUMN_VALUE *value, + MEM_ROOT *mem_root __attribute__((unused))) +{ + return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value, + &my_charset_bin); +} + +bool dyncol_to_cassandraBytes(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + return dyncol_to_cassandraStr(value, cass_data, cass_data_len, + buff, freemem, &my_charset_bin); +} + +bool cassandra_to_dyncol_strAscii(const char *cass_data, + int cass_data_len, + DYNAMIC_COLUMN_VALUE *value, + MEM_ROOT *mem_root __attribute__((unused))) +{ + return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value, + &my_charset_latin1_bin); +} + +bool dyncol_to_cassandraAscii(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + return dyncol_to_cassandraStr(value, cass_data, cass_data_len, + buff, freemem, &my_charset_latin1_bin); +} + +bool cassandra_to_dyncol_strUTF8(const char *cass_data, + int cass_data_len, + DYNAMIC_COLUMN_VALUE *value, + MEM_ROOT *mem_root __attribute__((unused))) +{ + return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value, + &my_charset_utf8mb3_unicode_ci); +} + +bool dyncol_to_cassandraUTF8(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + return dyncol_to_cassandraStr(value, cass_data, cass_data_len, + buff, freemem, &my_charset_utf8mb3_unicode_ci); +} + +bool cassandra_to_dyncol_strUUID(const char *cass_data, + int cass_data_len, + DYNAMIC_COLUMN_VALUE *value, + MEM_ROOT *mem_root) +{ + value->type= DYN_COL_STRING; + value->x.string.charset= &my_charset_bin; + alloc_strings_memroot(mem_root); + value->x.string.value.str= (char *)alloc_root(mem_root, 37); + if (!value->x.string.value.str) + { + value->x.string.value.length= 0; + return 1; + } + convert_uuid2string(value->x.string.value.str, cass_data); + value->x.string.value.length= 36; + return 0; +} + +bool dyncol_to_cassandraUUID(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + DYNAMIC_STRING tmp; + if (init_dynamic_string(&tmp, NULL, 1024, 1024)) + return true; + enum enum_dyncol_func_result rc= + mariadb_dyncol_val_str(&tmp, value, &my_charset_latin1_bin, '\0'); + if (rc < 0 || tmp.length != 36 || convert_string2uuid((char *)buff, tmp.str)) + { + dynstr_free(&tmp); + return true; + } + + *cass_data_len= tmp.length; + *(cass_data)= tmp.str; + *freemem= tmp.str; + return 0; +} + +bool cassandra_to_dyncol_intBool(const char *cass_data, + int cass_data_len, + DYNAMIC_COLUMN_VALUE *value, + MEM_ROOT *mem_root __attribute__((unused))) +{ + value->type= DYN_COL_INT; + value->x.long_value= (cass_data[0] ? 1 : 0); + return 0; +} + +bool dyncol_to_cassandraBool(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + longlong tmp; + enum enum_dyncol_func_result rc= + mariadb_dyncol_val_long(&tmp, value); + if (rc < 0) + return true; + ((char *)buff)[0]= (tmp ? 1 : 0); + *cass_data_len= 1; + *(cass_data)= (char *)buff; + *freemem= 0; + return 0; +} + + +const char * const validator_bigint= "org.apache.cassandra.db.marshal.LongType"; +const char * const validator_int= "org.apache.cassandra.db.marshal.Int32Type"; +const char * const validator_counter= "org.apache.cassandra.db.marshal.CounterColumnType"; + +const char * const validator_float= "org.apache.cassandra.db.marshal.FloatType"; +const char * const validator_double= "org.apache.cassandra.db.marshal.DoubleType"; + +const char * const validator_blob= "org.apache.cassandra.db.marshal.BytesType"; +const char * const validator_ascii= "org.apache.cassandra.db.marshal.AsciiType"; +const char * const validator_text= "org.apache.cassandra.db.marshal.UTF8Type"; + +const char * const validator_timestamp="org.apache.cassandra.db.marshal.DateType"; + +const char * const validator_uuid= "org.apache.cassandra.db.marshal.UUIDType"; + +const char * const validator_boolean= "org.apache.cassandra.db.marshal.BooleanType"; + +/* VARINTs are stored as big-endian big numbers. */ +const char * const validator_varint= "org.apache.cassandra.db.marshal.IntegerType"; +const char * const validator_decimal= "org.apache.cassandra.db.marshal.DecimalType"; + + +static CASSANDRA_TYPE_DEF cassandra_types[]= +{ + { + validator_bigint, + &cassandra_to_dyncol_intLong, + &dyncol_to_cassandraLong + }, + { + validator_int, + &cassandra_to_dyncol_intInt32, + &dyncol_to_cassandraInt32 + }, + { + validator_counter, + cassandra_to_dyncol_intCounter, + &dyncol_to_cassandraCounter + }, + { + validator_float, + &cassandra_to_dyncol_doubleFloat, + &dyncol_to_cassandraFloat + }, + { + validator_double, + &cassandra_to_dyncol_doubleDouble, + &dyncol_to_cassandraDouble + }, + { + validator_blob, + &cassandra_to_dyncol_strBytes, + &dyncol_to_cassandraBytes + }, + { + validator_ascii, + &cassandra_to_dyncol_strAscii, + &dyncol_to_cassandraAscii + }, + { + validator_text, + &cassandra_to_dyncol_strUTF8, + &dyncol_to_cassandraUTF8 + }, + { + validator_timestamp, + &cassandra_to_dyncol_intLong, + &dyncol_to_cassandraLong + }, + { + validator_uuid, + &cassandra_to_dyncol_strUUID, + &dyncol_to_cassandraUUID + }, + { + validator_boolean, + &cassandra_to_dyncol_intBool, + &dyncol_to_cassandraBool + }, + { + validator_varint, + &cassandra_to_dyncol_strBytes, + &dyncol_to_cassandraBytes + }, + { + validator_decimal, + &cassandra_to_dyncol_strBytes, + &dyncol_to_cassandraBytes + } +}; + +CASSANDRA_TYPE get_cassandra_type(const char *validator) +{ + CASSANDRA_TYPE rc; + switch(validator[32]) + { + case 'L': + rc= CT_BIGINT; + break; + case 'I': + rc= (validator[35] == '3' ? CT_INT : CT_VARINT); + rc= CT_INT; + break; + case 'C': + rc= CT_COUNTER; + break; + case 'F': + rc= CT_FLOAT; + break; + case 'D': + switch (validator[33]) + { + case 'o': + rc= CT_DOUBLE; + break; + case 'a': + rc= CT_TIMESTAMP; + break; + case 'e': + rc= CT_DECIMAL; + break; + default: + rc= CT_BLOB; + break; + } + break; + case 'B': + rc= (validator[33] == 'o' ? CT_BOOLEAN : CT_BLOB); + break; + case 'A': + rc= CT_ASCII; + break; + case 'U': + rc= (validator[33] == 'T' ? CT_TEXT : CT_UUID); + break; + default: + rc= CT_BLOB; + } + DBUG_ASSERT(strcmp(cassandra_types[rc].name, validator) == 0); + return rc; +} + +ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name) +{ + ColumnDataConverter *res= NULL; + + switch(field->type()) { + case MYSQL_TYPE_TINY: + if (!strcmp(validator_name, validator_boolean)) + { + res= new TinyintDataConverter; + break; + } + /* fall through: */ + case MYSQL_TYPE_SHORT: + case MYSQL_TYPE_LONGLONG: + { + bool is_counter= false; + if (!strcmp(validator_name, validator_bigint) || + !strcmp(validator_name, validator_timestamp) || + (is_counter= !strcmp(validator_name, validator_counter))) + res= new BigintDataConverter(!is_counter); + break; + } + case MYSQL_TYPE_FLOAT: + if (!strcmp(validator_name, validator_float)) + res= new FloatDataConverter; + break; + + case MYSQL_TYPE_DOUBLE: + if (!strcmp(validator_name, validator_double)) + res= new DoubleDataConverter; + break; + + case MYSQL_TYPE_TIMESTAMP: + if (!strcmp(validator_name, validator_timestamp)) + res= new TimestampDataConverter; + break; + + case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings. + if (!strcmp(validator_name, validator_uuid) && + field->real_type() == MYSQL_TYPE_STRING && + field->field_length == 36) + { + // UUID maps to CHAR(36), its text representation + res= new UuidDataConverter; + break; + } + /* fall through: */ + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_VARCHAR: + case MYSQL_TYPE_BLOB: + { + /* + Cassandra's "varint" type is a binary-encoded arbitary-length + big-endian number. + - It can be mapped to VARBINARY(N), with sufficiently big N. + - If the value does not fit into N bytes, it is an error. We should not + truncate it, because that is just as good as returning garbage. + - varint should not be mapped to BINARY(N), because BINARY(N) values + are zero-padded, which will work as multiplying the value by + 2^k for some value of k. + */ + if (field->type() == MYSQL_TYPE_VARCHAR && + field->binary() && + (!strcmp(validator_name, validator_varint) || + !strcmp(validator_name, validator_decimal))) + { + res= new StringCopyConverter(field->field_length); + break; + } + + if (!strcmp(validator_name, validator_blob) || + !strcmp(validator_name, validator_ascii) || + !strcmp(validator_name, validator_text)) + { + res= new StringCopyConverter((size_t)-1); + } + break; + } + case MYSQL_TYPE_LONG: + if (!strcmp(validator_name, validator_int)) + res= new Int32DataConverter; + break; + + default:; + } + return res; +} + + +bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields) +{ + char *col_name; + int col_name_len; + char *col_type; + int col_type_len; + size_t ddl_fields= se->get_ddl_size(); + const char *default_type= se->get_default_validator(); + uint max_non_default_fields; + DBUG_ENTER("ha_cassandra::setup_field_converters"); + DBUG_ASSERT(default_type); + + DBUG_ASSERT(!field_converters); + DBUG_ASSERT(dyncol_set == 0 || dyncol_set == 1); + + /* + We always should take into account that in case of using dynamic columns + sql description contain one field which does not described in + Cassandra DDL also key field is described separately. So that + is why we use "n_fields - dyncol_set - 1" or "ddl_fields + 2". + */ + max_non_default_fields= ddl_fields + 2 - n_fields; + if (ddl_fields < (n_fields - dyncol_set - 1)) + { + se->print_error("Some of SQL fields were not mapped to Cassandra's fields"); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + DBUG_RETURN(true); + } + + /* allocate memory in one chunk */ + size_t memsize= sizeof(ColumnDataConverter*) * n_fields + + (sizeof(LEX_STRING) + sizeof(CASSANDRA_TYPE_DEF))* + (dyncol_set ? max_non_default_fields : 0); + if (!(field_converters= (ColumnDataConverter**)my_malloc(PSI_INSTRUMENT_ME, memsize, MYF(0)))) + DBUG_RETURN(true); + bzero(field_converters, memsize); + n_field_converters= n_fields; + + if (dyncol_set) + { + special_type_field_converters= + (CASSANDRA_TYPE_DEF *)(field_converters + n_fields); + special_type_field_names= + ((LEX_STRING*)(special_type_field_converters + max_non_default_fields)); + + if (my_init_dynamic_array(PSI_INSTRUMENT_ME, &dynamic_values, + sizeof(DYNAMIC_COLUMN_VALUE), + DYNCOL_USUAL, DYNCOL_DELTA, MYF(0))) + DBUG_RETURN(true); + else + if (my_init_dynamic_array(PSI_INSTRUMENT_ME, &dynamic_names, + sizeof(LEX_STRING), + DYNCOL_USUAL, DYNCOL_DELTA,MYF(0))) + { + delete_dynamic(&dynamic_values); + DBUG_RETURN(true); + } + else + if (init_dynamic_string(&dynamic_rec, NULL, + DYNCOL_USUAL_REC, DYNCOL_DELTA_REC)) + { + delete_dynamic(&dynamic_values); + delete_dynamic(&dynamic_names); + DBUG_RETURN(true); + } + + /* Dynamic column field has special processing */ + field_converters[dyncol_field]= NULL; + + default_type_def= cassandra_types + get_cassandra_type(default_type); + } + + se->first_ddl_column(); + uint n_mapped= 0; + while (!se->next_ddl_column(&col_name, &col_name_len, &col_type, + &col_type_len)) + { + Field **field; + uint i; + /* Mapping for the 1st field is already known */ + for (field= field_arg + 1, i= 1; *field; field++, i++) + { + if ((!dyncol_set || dyncol_field != i) && + !strcmp((*field)->field_name.str, col_name)) + { + n_mapped++; + ColumnDataConverter **conv= field_converters + (*field)->field_index; + if (!(*conv= map_field_to_validator(*field, col_type))) + { + se->print_error("Failed to map column %s to datatype %s", + (*field)->field_name.str, col_type); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + DBUG_RETURN(true); + } + (*conv)->field= *field; + break; + } + } + if (dyncol_set && !(*field)) // is needed and not found + { + DBUG_PRINT("info",("Field not found: %s", col_name)); + if (strcmp(col_type, default_type)) + { + DBUG_PRINT("info",("Field '%s' non-default type: '%s'", + col_name, col_type)); + special_type_field_names[n_special_type_fields].length= col_name_len; + special_type_field_names[n_special_type_fields].str= col_name; + special_type_field_converters[n_special_type_fields]= + cassandra_types[get_cassandra_type(col_type)]; + n_special_type_fields++; + } + } + } + + if (n_mapped != n_fields - 1 - dyncol_set) + { + Field *first_unmapped= NULL; + /* Find the first field */ + for (uint i= 1; i < n_fields;i++) + { + if (!field_converters[i]) + { + first_unmapped= field_arg[i]; + break; + } + } + DBUG_ASSERT(first_unmapped); + + se->print_error("Field `%s` could not be mapped to any field in Cassandra", + first_unmapped->field_name.str); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + DBUG_RETURN(true); + } + + /* + Setup type conversion for row_key. + */ + se->get_rowkey_type(&col_name, &col_type); + if (col_name && strcmp(col_name, (*field_arg)->field_name.str)) + { + se->print_error("PRIMARY KEY column must match Cassandra's name '%s'", + col_name); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + DBUG_RETURN(true); + } + if (!col_name && strcmp("rowkey", (*field_arg)->field_name.str)) + { + se->print_error("target column family has no key_alias defined, " + "PRIMARY KEY column must be named 'rowkey'"); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + DBUG_RETURN(true); + } + + if (col_type != NULL) + { + if (!(rowkey_converter= map_field_to_validator(*field_arg, col_type))) + { + se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + DBUG_RETURN(true); + } + rowkey_converter->field= *field_arg; + } + else + { + se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)"); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + DBUG_RETURN(true); + } + + DBUG_RETURN(false); +} + + +void ha_cassandra::free_field_converters() +{ + delete rowkey_converter; + rowkey_converter= NULL; + + if (dyncol_set) + { + delete_dynamic(&dynamic_values); + delete_dynamic(&dynamic_names); + dynstr_free(&dynamic_rec); + } + if (field_converters) + { + for (uint i=0; i < n_field_converters; i++) + if (field_converters[i]) + { + DBUG_ASSERT(!dyncol_set || i != dyncol_field); + delete field_converters[i]; + } + my_free(field_converters); + field_converters= NULL; + } +} + + +int ha_cassandra::index_init(uint idx, bool sorted) +{ + int ires; + if (!se && (ires= connect_and_check_options(table))) + return ires; + return 0; +} + +void store_key_image_to_rec(Field *field, uchar *ptr, uint len); + +int ha_cassandra::index_read_map(uchar *buf, const uchar *key, + key_part_map keypart_map, + enum ha_rkey_function find_flag) +{ + int rc= 0; + DBUG_ENTER("ha_cassandra::index_read_map"); + + if (find_flag != HA_READ_KEY_EXACT) + { + DBUG_ASSERT(0); /* Non-equality lookups should never be done */ + DBUG_RETURN(HA_ERR_WRONG_COMMAND); + } + + uint key_len= calculate_key_len(table, active_index, key, keypart_map); + store_key_image_to_rec(table->field[0], (uchar*)key, key_len); + + char *cass_key; + int cass_key_len; + MY_BITMAP *old_map; + + old_map= dbug_tmp_use_all_columns(table, &table->read_set); + + if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len)) + { + /* We get here when making lookups like uuid_column='not-an-uuid' */ + dbug_tmp_restore_column_map(&table->read_set, old_map); + DBUG_RETURN(HA_ERR_KEY_NOT_FOUND); + } + + dbug_tmp_restore_column_map(&table->read_set, old_map); + + bool found; + if (se->get_slice(cass_key, cass_key_len, &found)) + { + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + rc= HA_ERR_INTERNAL_ERROR; + } + + /* TODO: what if we're not reading all columns?? */ + if (!found) + rc= HA_ERR_KEY_NOT_FOUND; + else + rc= read_cassandra_columns(false); + + DBUG_RETURN(rc); +} + + +void ha_cassandra::print_conversion_error(const char *field_name, + char *cass_value, + int cass_value_len) +{ + char buf[32]; + char *p= cass_value; + size_t i= 0; + for (; (i < sizeof(buf)-1) && (p < cass_value + cass_value_len); p++) + { + buf[i++]= map2number[(*p >> 4) & 0xF]; + buf[i++]= map2number[*p & 0xF]; + } + buf[i]=0; + + se->print_error("Unable to convert value for field `%s` from Cassandra's data" + " format. Source data is %d bytes, 0x%s%s", + field_name, cass_value_len, buf, + (i == sizeof(buf) - 1)? "..." : ""); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); +} + + + +CASSANDRA_TYPE_DEF * ha_cassandra::get_cassandra_field_def(char *cass_name, + int cass_name_len) +{ + CASSANDRA_TYPE_DEF *type= default_type_def; + for(uint i= 0; i < n_special_type_fields; i++) + { + if (cass_name_len == (int)special_type_field_names[i].length && + memcmp(cass_name, special_type_field_names[i].str, + cass_name_len) == 0) + { + type= special_type_field_converters + i; + break; + } + } + return type; +} + +int ha_cassandra::read_cassandra_columns(bool unpack_pk) +{ + MEM_ROOT strings_root; + char *cass_name; + char *cass_value; + int cass_value_len, cass_name_len; + Field **field; + int res= 0; + ulong total_name_len= 0; + + clear_alloc_root(&strings_root); + /* + cassandra_to_mariadb() calls will use field->store(...) methods, which + require that the column is in the table->write_set + */ + MY_BITMAP *old_map; + old_map= dbug_tmp_use_all_columns(table, &table->write_set); + + /* Start with all fields being NULL */ + for (field= table->field + 1; *field; field++) + (*field)->set_null(); + + while (!se->get_next_read_column(&cass_name, &cass_name_len, + &cass_value, &cass_value_len)) + { + // map to our column. todo: use hash or something.. + bool found= 0; + for (field= table->field + 1; *field; field++) + { + uint fieldnr= (*field)->field_index; + if ((!dyncol_set || dyncol_field != fieldnr) && + !strcmp((*field)->field_name.str, cass_name)) + { + found= 1; + (*field)->set_notnull(); + if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value, + cass_value_len)) + { + print_conversion_error((*field)->field_name.str, cass_value, + cass_value_len); + res=1; + goto err; + } + break; + } + } + if (dyncol_set && !found) + { + DYNAMIC_COLUMN_VALUE val; + LEX_STRING nm; + CASSANDRA_TYPE_DEF *type= get_cassandra_field_def(cass_name, + cass_name_len); + nm.str= cass_name; + nm.length= cass_name_len; + if (nm.length > MAX_NAME_LENGTH) + { + se->print_error("Unable to convert value for field `%s`" + " from Cassandra's data format. Name" + " length exceed limit of %u: '%s'", + table->field[dyncol_field]->field_name.str, + (uint)MAX_NAME_LENGTH, cass_name); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + res=1; + goto err; + } + total_name_len+= cass_name_len; + if (nm.length > MAX_TOTAL_NAME_LENGTH) + { + se->print_error("Unable to convert value for field `%s`" + " from Cassandra's data format. Sum of all names" + " length exceed limit of %lu", + table->field[dyncol_field]->field_name.str, + cass_name, (uint)MAX_TOTAL_NAME_LENGTH); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + res=1; + goto err; + } + + if ((res= (*(type->cassandra_to_dynamic))(cass_value, + cass_value_len, &val, + &strings_root)) || + insert_dynamic(&dynamic_names, (uchar *) &nm) || + insert_dynamic(&dynamic_values, (uchar *) &val)) + { + if (res) + { + print_conversion_error(cass_name, cass_value, cass_value_len); + } + free_strings_memroot(&strings_root); + // EOM shouldm be already reported if happened + res= 1; + goto err; + } + } + } + + dynamic_rec.length= 0; + if (dyncol_set) + { + if (mariadb_dyncol_create_many_named(&dynamic_rec, + dynamic_names.elements, + (LEX_STRING *)dynamic_names.buffer, + (DYNAMIC_COLUMN_VALUE *) + dynamic_values.buffer, + FALSE) < 0) + dynamic_rec.length= 0; + + free_strings_memroot(&strings_root); + dynamic_values.elements= dynamic_names.elements= 0; + + if (dynamic_rec.length == 0) + table->field[dyncol_field]->set_null(); + else + { + Field_blob *blob= (Field_blob *)table->field[dyncol_field]; + blob->set_notnull(); + blob->store_length(dynamic_rec.length); + *((char **)(((char *)blob->ptr) + blob->pack_length_no_ptr()))= + dynamic_rec.str; + } + } + + if (unpack_pk) + { + /* Unpack rowkey to primary key */ + field= table->field; + (*field)->set_notnull(); + se->get_read_rowkey(&cass_value, &cass_value_len); + if (rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len)) + { + print_conversion_error((*field)->field_name.str, cass_value, cass_value_len); + res=1; + goto err; + } + } + +err: + dbug_tmp_restore_column_map(&table->write_set, old_map); + return res; +} + +int ha_cassandra::read_dyncol(uint *count, + DYNAMIC_COLUMN_VALUE **vals, + LEX_STRING **names, + String *valcol) +{ + String *strcol; + DYNAMIC_COLUMN col; + + enum enum_dyncol_func_result rc; + DBUG_ENTER("ha_cassandra::read_dyncol"); + + Field *field= table->field[dyncol_field]; + DBUG_ASSERT(field->type() == MYSQL_TYPE_BLOB); + /* It is blob and it does not use buffer */ + strcol= field->val_str(NULL, valcol); + if (field->is_null()) + { + *count= 0; + *names= 0; + *vals= 0; + DBUG_RETURN(0); // nothing to write + } + /* + dynamic_column_vals only read the string so we can + cheat here with assignment + */ + bzero(&col, sizeof(col)); + col.str= (char *)strcol->ptr(); + col.length= strcol->length(); + if ((rc= mariadb_dyncol_unpack(&col, count, names, vals)) < 0) + { + dynamic_column_error_message(rc); + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + } + DBUG_RETURN(0); +} + +int ha_cassandra::write_dynamic_row(uint count, + DYNAMIC_COLUMN_VALUE *vals, + LEX_STRING *names) +{ + uint i; + DBUG_ENTER("ha_cassandra::write_dynamic_row"); + DBUG_ASSERT(dyncol_set); + + + for (i= 0; i < count; i++) + { + char buff[16]; + CASSANDRA_TYPE_DEF *type; + void *freemem= NULL; + char *cass_data; + int cass_data_len; + + DBUG_PRINT("info", ("field %*s", (int)names[i].length, names[i].str)); + type= get_cassandra_field_def(names[i].str, (int) names[i].length); + if ((*type->dynamic_to_cassandra)(vals +i, &cass_data, &cass_data_len, + buff, &freemem)) + { + my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0), + names[i].str, insert_lineno); + DBUG_RETURN(HA_ERR_GENERIC); + } + se->add_insert_column(names[i].str, names[i].length, + cass_data, cass_data_len); + if (freemem) + my_free(freemem); + } + DBUG_RETURN(0); +} + +void ha_cassandra::free_dynamic_row(DYNAMIC_COLUMN_VALUE **vals, + LEX_STRING **names) +{ + mariadb_dyncol_unpack_free(*names, *vals); + *vals= 0; + *names= 0; +} + +int ha_cassandra::write_row(const uchar *buf) +{ + MY_BITMAP *old_map; + int ires; + DBUG_ENTER("ha_cassandra::write_row"); + + if (!se && (ires= connect_and_check_options(table))) + DBUG_RETURN(ires); + + if (!doing_insert_batch) + se->clear_insert_buffer(); + + old_map= dbug_tmp_use_all_columns(table, &table->read_set); + + insert_lineno++; + + /* Convert the key */ + char *cass_key; + int cass_key_len; + if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len)) + { + my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0), + rowkey_converter->field->field_name.str, insert_lineno); + dbug_tmp_restore_column_map(&table->read_set, old_map); + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + } + se->start_row_insert(cass_key, cass_key_len); + + /* Convert other fields */ + for (uint i= 1; i < table->s->fields; i++) + { + char *cass_data; + int cass_data_len; + if (dyncol_set && dyncol_field == i) + { + String valcol; + DYNAMIC_COLUMN_VALUE *vals; + LEX_STRING *names; + uint count; + int rc; + DBUG_ASSERT(field_converters[i] == NULL); + if (!(rc= read_dyncol(&count, &vals, &names, &valcol))) + rc= write_dynamic_row(count, vals, names); + free_dynamic_row(&vals, &names); + if (rc) + { + dbug_tmp_restore_column_map(&table->read_set, old_map); + DBUG_RETURN(rc); + } + } + else + { + if (field_converters[i]->mariadb_to_cassandra(&cass_data, + &cass_data_len)) + { + my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0), + field_converters[i]->field->field_name.str, insert_lineno); + dbug_tmp_restore_column_map(&table->read_set, old_map); + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + } + se->add_insert_column(field_converters[i]->field->field_name.str, 0, + cass_data, cass_data_len); + } + } + + dbug_tmp_restore_column_map(&table->read_set, old_map); + + bool res; + + if (doing_insert_batch) + { + res= 0; + if (++insert_rows_batched >= THDVAR(table->in_use, insert_batch_size)) + { + res= se->do_insert(); + insert_rows_batched= 0; + } + } + else + res= se->do_insert(); + + if (res) + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + + DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0); +} + + +void ha_cassandra::start_bulk_insert(ha_rows rows, uint flags) +{ + int ires; + if (!se && (ires= connect_and_check_options(table))) + return; + + doing_insert_batch= true; + insert_rows_batched= 0; + + se->clear_insert_buffer(); +} + + +int ha_cassandra::end_bulk_insert() +{ + DBUG_ENTER("ha_cassandra::end_bulk_insert"); + + if (!doing_insert_batch) + { + /* SQL layer can make end_bulk_insert call without start_bulk_insert call */ + DBUG_RETURN(0); + } + + /* Flush out the insert buffer */ + doing_insert_batch= false; + bool bres= se->do_insert(); + se->clear_insert_buffer(); + + DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0); +} + + +int ha_cassandra::rnd_init(bool scan) +{ + bool bres; + int ires; + DBUG_ENTER("ha_cassandra::rnd_init"); + + if (!se && (ires= connect_and_check_options(table))) + DBUG_RETURN(ires); + + if (!scan) + { + /* Prepare for rnd_pos() calls. We don't need to anything. */ + DBUG_RETURN(0); + } + + if (dyncol_set) + { + se->clear_read_all_columns(); + } + else + { + se->clear_read_columns(); + for (uint i= 1; i < table->s->fields; i++) + se->add_read_column(table->field[i]->field_name.str); + } + + se->read_batch_size= THDVAR(table->in_use, rnd_batch_size); + bres= se->get_range_slices(false); + if (bres) + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + + DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0); +} + + +int ha_cassandra::rnd_end() +{ + DBUG_ENTER("ha_cassandra::rnd_end"); + + se->finish_reading_range_slices(); + DBUG_RETURN(0); +} + + +int ha_cassandra::rnd_next(uchar *buf) +{ + int rc; + bool reached_eof; + DBUG_ENTER("ha_cassandra::rnd_next"); + + // Unpack and return the next record. + if (se->get_next_range_slice_row(&reached_eof)) + { + rc= HA_ERR_INTERNAL_ERROR; + } + else + { + if (reached_eof) + rc= HA_ERR_END_OF_FILE; + else + rc= read_cassandra_columns(true); + } + + DBUG_RETURN(rc); +} + + +int ha_cassandra::delete_all_rows() +{ + bool bres; + int ires; + DBUG_ENTER("ha_cassandra::delete_all_rows"); + + if (!se && (ires= connect_and_check_options(table))) + DBUG_RETURN(ires); + + bres= se->truncate(); + + if (bres) + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + + DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0); +} + + +int ha_cassandra::delete_row(const uchar *buf) +{ + bool bres; + DBUG_ENTER("ha_cassandra::delete_row"); + + bres= se->remove_row(); + + if (bres) + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + + DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0); +} + + +int ha_cassandra::info(uint flag) +{ + DBUG_ENTER("ha_cassandra::info"); + + if (!table) + return 1; + + if (flag & HA_STATUS_VARIABLE) + { + stats.records= 1000; + stats.deleted= 0; + } + if (flag & HA_STATUS_CONST) + { + ref_length= table->field[0]->key_length(); + } + + DBUG_RETURN(0); +} + + +void key_copy(uchar *to_key, const uchar *from_record, const KEY *key_info, + uint key_length, bool with_zerofill); + + +void ha_cassandra::position(const uchar *record) +{ + DBUG_ENTER("ha_cassandra::position"); + + /* Copy the primary key to rowid */ + key_copy(ref, (uchar*)record, &table->key_info[0], + table->field[0]->key_length(), true); + + DBUG_VOID_RETURN; +} + + +int ha_cassandra::rnd_pos(uchar *buf, uchar *pos) +{ + int rc; + DBUG_ENTER("ha_cassandra::rnd_pos"); + + int save_active_index= active_index; + active_index= 0; /* The primary key */ + rc= index_read_map(buf, pos, key_part_map(1), HA_READ_KEY_EXACT); + + active_index= save_active_index; + + DBUG_RETURN(rc); +} + + +int ha_cassandra::reset() +{ + doing_insert_batch= false; + insert_lineno= 0; + if (se) + { + se->set_consistency_levels(THDVAR(table->in_use, read_consistency), + THDVAR(table->in_use, write_consistency)); + se->set_n_retries(THDVAR(table->in_use, failure_retries)); + } + return 0; +} + +///////////////////////////////////////////////////////////////////////////// +// MRR implementation +///////////////////////////////////////////////////////////////////////////// + + +/* + - The key can be only primary key + - allow equality-ranges only. + - anything else? +*/ +ha_rows ha_cassandra::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq, + void *seq_init_param, + uint n_ranges, uint *bufsz, + uint *flags, Cost_estimate *cost) +{ + /* No support for const ranges so far */ + return HA_POS_ERROR; +} + + +ha_rows ha_cassandra::multi_range_read_info(uint keyno, uint n_ranges, uint keys, + uint key_parts, uint *bufsz, + uint *flags, Cost_estimate *cost) +{ + /* Can only be equality lookups on the primary key... */ + // TODO anything else? + *flags &= ~HA_MRR_USE_DEFAULT_IMPL; + *flags |= HA_MRR_NO_ASSOCIATION; + + return 10; +} + + +int ha_cassandra::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param, + uint n_ranges, uint mode, HANDLER_BUFFER *buf) +{ + int res; + mrr_iter= seq->init(seq_init_param, n_ranges, mode); + mrr_funcs= *seq; + res= mrr_start_read(); + return (res? HA_ERR_INTERNAL_ERROR: 0); +} + + +bool ha_cassandra::mrr_start_read() +{ + uint key_len; + + MY_BITMAP *old_map; + old_map= dbug_tmp_use_all_columns(table, &table->read_set); + + se->new_lookup_keys(); + + while (!(source_exhausted= mrr_funcs.next(mrr_iter, &mrr_cur_range))) + { + char *cass_key; + int cass_key_len; + + DBUG_ASSERT(mrr_cur_range.range_flag & EQ_RANGE); + + uchar *key= (uchar*)mrr_cur_range.start_key.key; + key_len= mrr_cur_range.start_key.length; + //key_len= calculate_key_len(table, active_index, key, keypart_map); // NEED THIS?? + store_key_image_to_rec(table->field[0], (uchar*)key, key_len); + + rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len); + + // Primitive buffer control + if ((ulong) se->add_lookup_key(cass_key, cass_key_len) > + THDVAR(table->in_use, multiget_batch_size)) + break; + } + + dbug_tmp_restore_column_map(&table->read_set, old_map); + + return se->multiget_slice(); +} + + +int ha_cassandra::multi_range_read_next(range_id_t *range_info) +{ + int res; + while(1) + { + if (!se->get_next_multiget_row()) + { + res= read_cassandra_columns(true); + break; + } + else + { + if (source_exhausted) + { + res= HA_ERR_END_OF_FILE; + break; + } + else + { + if (mrr_start_read()) + { + res= HA_ERR_INTERNAL_ERROR; + break; + } + } + } + /* + We get here if we've refilled the buffer and done another read. Try + reading from results again + */ + } + return res; +} + + +int ha_cassandra::multi_range_read_explain_info(uint mrr_mode, char *str, size_t size) +{ + const char *mrr_str= "multiget_slice"; + + if (!(mrr_mode & HA_MRR_USE_DEFAULT_IMPL)) + { + uint mrr_str_len= strlen(mrr_str); + uint copy_len= MY_MIN(mrr_str_len, size); + memcpy(str, mrr_str, size); + return copy_len; + } + return 0; +} + + +class Column_name_enumerator_impl : public Column_name_enumerator +{ + ha_cassandra *obj; + uint idx; +public: + Column_name_enumerator_impl(ha_cassandra *obj_arg) : obj(obj_arg), idx(1) {} + const char* get_next_name() + { + if (idx == obj->table->s->fields) + return NULL; + else + return obj->table->field[idx++]->field_name.str; + } +}; + + +int ha_cassandra::update_row(const uchar *old_data, const uchar *new_data) +{ + DYNAMIC_COLUMN_VALUE *oldvals, *vals; + LEX_STRING *oldnames, *names; + uint oldcount, count; + String oldvalcol, valcol; + MY_BITMAP *old_map; + int res; + DBUG_ENTER("ha_cassandra::update_row"); + /* Currently, it is guaranteed that new_data == table->record[0] */ + DBUG_ASSERT(new_data == table->record[0]); + /* For now, just rewrite the full record */ + se->clear_insert_buffer(); + + old_map= dbug_tmp_use_all_columns(table, &table->read_set); + + char *old_key; + int old_key_len; + se->get_read_rowkey(&old_key, &old_key_len); + + /* Get the key we're going to write */ + char *new_key; + int new_key_len; + if (rowkey_converter->mariadb_to_cassandra(&new_key, &new_key_len)) + { + my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0), + rowkey_converter->field->field_name.str, insert_lineno); + dbug_tmp_restore_column_map(&table->read_set, old_map); + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + } + + /* + Compare it to the key we've read. For all types that Cassandra supports, + binary byte-wise comparison can be used + */ + bool new_primary_key; + if (new_key_len != old_key_len || memcmp(old_key, new_key, new_key_len)) + new_primary_key= true; + else + new_primary_key= false; + + if (dyncol_set) + { + Field *field= table->field[dyncol_field]; + /* move to get old_data */ + my_ptrdiff_t diff; + diff= (my_ptrdiff_t) (old_data - new_data); + field->move_field_offset(diff); // Points now at old_data + if ((res= read_dyncol(&oldcount, &oldvals, &oldnames, &oldvalcol))) + DBUG_RETURN(res); + field->move_field_offset(-diff); // back to new_data + if ((res= read_dyncol(&count, &vals, &names, &valcol))) + { + free_dynamic_row(&oldvals, &oldnames); + DBUG_RETURN(res); + } + } + + if (new_primary_key) + { + /* + Primary key value changed. This is essentially a DELETE + INSERT. + Add a DELETE operation into the batch + */ + Column_name_enumerator_impl name_enumerator(this); + se->add_row_deletion(old_key, old_key_len, &name_enumerator, + oldnames, + (dyncol_set ? oldcount : 0)); + oldcount= 0; // they will be deleted + } + + se->start_row_insert(new_key, new_key_len); + + /* Convert other fields */ + for (uint i= 1; i < table->s->fields; i++) + { + char *cass_data; + int cass_data_len; + if (dyncol_set && dyncol_field == i) + { + DBUG_ASSERT(field_converters[i] == NULL); + if ((res= write_dynamic_row(count, vals, names))) + goto err; + } + else + { + if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len)) + { + my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0), + field_converters[i]->field->field_name.str, insert_lineno); + dbug_tmp_restore_column_map(&table->read_set, old_map); + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + } + se->add_insert_column(field_converters[i]->field->field_name.str, 0, + cass_data, cass_data_len); + } + } + if (dyncol_set) + { + /* find removed fields */ + uint i= 0, j= 0; + /* both array are sorted */ + for(; i < oldcount; i++) + { + int scmp= 0; + while (j < count && + (scmp = mariadb_dyncol_column_cmp_named(names + j, + oldnames + i)) < 0) + j++; + if (j < count && + scmp == 0) + j++; + else + se->add_insert_delete_column(oldnames[i].str, oldnames[i].length); + } + } + + dbug_tmp_restore_column_map(&table->read_set, old_map); + + res= se->do_insert(); + + if (res) + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + +err: + if (dyncol_set) + { + free_dynamic_row(&oldvals, &oldnames); + free_dynamic_row(&vals, &names); + } + + DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0); +} + + +/* + We can't really have any locks for Cassandra Storage Engine. We're reading + from Cassandra cluster, and other clients can asynchronously modify the data. + + We can enforce locking within this process, but this will not be useful. + + Thus, store_lock() should express that: + - Writes do not block other writes + - Reads should not block anything either, including INSERTs. +*/ +THR_LOCK_DATA **ha_cassandra::store_lock(THD *thd, + THR_LOCK_DATA **to, + enum thr_lock_type lock_type) +{ + DBUG_ENTER("ha_cassandra::store_lock"); + if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) + { + /* Writes allow other writes */ + if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && + lock_type <= TL_WRITE)) + lock_type = TL_WRITE_ALLOW_WRITE; + + /* Reads allow everything, including INSERTs */ + if (lock_type == TL_READ_NO_INSERT) + lock_type = TL_READ; + + lock.type= lock_type; + } + *to++= &lock; + DBUG_RETURN(to); +} + + +/** + check_if_incompatible_data() called if ALTER TABLE can't detect otherwise + if new and old definition are compatible + + @details If there are no other explicit signs like changed number of + fields this function will be called by compare_tables() + (sql/sql_tables.cc) to decide should we rewrite whole table or only .frm + file. + +*/ + +bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info, + uint table_changes) +{ + DBUG_ENTER("ha_cassandra::check_if_incompatible_data"); + /* Checked, we intend to have this empty for Cassandra SE. */ + DBUG_RETURN(COMPATIBLE_DATA_YES); +} + + +void Cassandra_se_interface::print_error(const char *format, ...) +{ + va_list ap; + va_start(ap, format); + // it's not a problem if output was truncated + my_vsnprintf(err_buffer, sizeof(err_buffer), format, ap); + va_end(ap); +} + + +struct st_mysql_storage_engine cassandra_storage_engine= +{ MYSQL_HANDLERTON_INTERFACE_VERSION }; + +static SHOW_VAR cassandra_status_variables[]= { + {"row_inserts", + (char*) &cassandra_counters.row_inserts, SHOW_LONG}, + {"row_insert_batches", + (char*) &cassandra_counters.row_insert_batches, SHOW_LONG}, + + {"multiget_keys_scanned", + (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG}, + {"multiget_reads", + (char*) &cassandra_counters.multiget_reads, SHOW_LONG}, + {"multiget_rows_read", + (char*) &cassandra_counters.multiget_rows_read, SHOW_LONG}, + + {"network_exceptions", + (char*) &cassandra_counters.network_exceptions, SHOW_LONG}, + {"timeout_exceptions", + (char*) &cassandra_counters.timeout_exceptions, SHOW_LONG}, + {"unavailable_exceptions", + (char*) &cassandra_counters.unavailable_exceptions, SHOW_LONG}, + {NullS, NullS, SHOW_LONG} +}; + + + + +maria_declare_plugin(cassandra) +{ + MYSQL_STORAGE_ENGINE_PLUGIN, + &cassandra_storage_engine, + "CASSANDRA", + "Monty Program Ab", + "Cassandra storage engine", + PLUGIN_LICENSE_GPL, + cassandra_init_func, /* Plugin Init */ + cassandra_done_func, /* Plugin Deinit */ + 0x0001, /* version number (0.1) */ + cassandra_status_variables, /* status variables */ + cassandra_system_variables, /* system variables */ + "0.1", /* string version */ + MariaDB_PLUGIN_MATURITY_EXPERIMENTAL /* maturity */ +} +maria_declare_plugin_end; |