/* storage/cassandra/ha_cassandra.h */
/*
   Copyright (c) 2012, 2020, MariaDB Corporation.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
#ifdef USE_PRAGMA_INTERFACE
#pragma interface			/* gcc class implementation */
#endif


#include "my_global.h"                   /* ulonglong */
#include "thr_lock.h"                    /* THR_LOCK, THR_LOCK_DATA */
#include "handler.h"                     /* handler */
#include "my_base.h"                     /* ha_rows */

#include "cassandra_se.h"

/**
  @brief
  CASSANDRA_SHARE is the per-table state that is shared among all open
  handlers.
*/
typedef struct st_cassandra_share
{
  char *table_name;
  uint table_name_length;
  /* NOTE(review): presumably the number of users of this share - confirm
     against the code that maintains it */
  uint use_count;
  mysql_mutex_t mutex;
  THR_LOCK lock;
} CASSANDRA_SHARE;

/*
  Forward declarations. This header only uses pointers to these types, so
  their full definitions are not needed here.
*/
class ColumnDataConverter;
struct ha_table_option_struct;
struct st_dynamic_column_value;
typedef struct st_dynamic_column_value DYNAMIC_COLUMN_VALUE;

/*
  Signatures of the two conversion callbacks stored in cassandra_type_def.

  NOTE(review): judging by the names, CAS2DYN_CONVERTER turns a Cassandra
  value (cass_data, cass_data_len) into a dynamic column value and
  DYN2CAS_CONVERTER does the reverse. The meaning of the bool result and the
  ownership of buf / *freemem are not visible in this header - confirm
  against the converter implementations.
*/
typedef bool (*CAS2DYN_CONVERTER)(const char *cass_data, int cass_data_len,
                                  struct st_dynamic_column_value *value,
                                  MEM_ROOT *mem_root);

typedef bool (*DYN2CAS_CONVERTER)(struct st_dynamic_column_value *value,
                                  char **cass_data, int *cass_data_len,
                                  void *buf, void **freemem);
/*
  A named type together with its pair of conversion callbacks, one for each
  direction.
*/
typedef struct cassandra_type_def
{
  const char *name;
  CAS2DYN_CONVERTER cassandra_to_dynamic;
  DYN2CAS_CONVERTER dynamic_to_cassandra;
} CASSANDRA_TYPE_DEF;

/*
  Identifiers of the types listed below; values are assigned implicitly,
  starting at 0.
  The enum tag is kept exactly as it is spelled, because code outside this
  header may refer to it.
*/
typedef enum cassandtra_type_enum
{
  CT_BIGINT,
  CT_INT,
  CT_COUNTER,
  CT_FLOAT,
  CT_DOUBLE,
  CT_BLOB,
  CT_ASCII,
  CT_TEXT,
  CT_TIMESTAMP,
  CT_UUID,
  CT_BOOLEAN,
  CT_VARINT,
  CT_DECIMAL
} CASSANDRA_TYPE;



/**
  @brief
  Handler class of the Cassandra storage engine.
*/
class ha_cassandra: public handler
{
  friend class Column_name_enumerator_impl;

  THR_LOCK_DATA lock;        ///< MySQL lock
  CASSANDRA_SHARE *share;    ///< Shared lock info

  Cassandra_se_interface *se;    ///< Deleted by the destructor

  /* Converters describing the static part of the table definition */
  ColumnDataConverter **field_converters;
  uint n_field_converters;

  CASSANDRA_TYPE_DEF *default_type_def;

  /* Description of the dynamic columns part */
  CASSANDRA_TYPE_DEF *special_type_field_converters;
  LEX_STRING *special_type_field_names;
  uint n_special_type_fields;
  DYNAMIC_ARRAY dynamic_values;
  DYNAMIC_ARRAY dynamic_names;
  DYNAMIC_STRING dynamic_rec;

  ColumnDataConverter *rowkey_converter;

  bool setup_field_converters(Field **field, uint n_fields);
  void free_field_converters();

  int read_cassandra_columns(bool unpack_pk);
  int check_table_options(struct ha_table_option_struct *options);

  bool doing_insert_batch;
  ha_rows insert_rows_batched;

  uint dyncol_field;
  bool dyncol_set;

  /* Needed for the 'wrong column %s at row %lu' warnings */
  ha_rows insert_lineno;
  void print_conversion_error(const char *field_name,
                              char *cass_value, int cass_value_len);
  int connect_and_check_options(TABLE *table_arg);

public:
  ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);

  /** Releases the field converters first, then the `se` object. */
  ~ha_cassandra()
  {
    free_field_converters();
    delete se;
  }

  /**
    @brief
    Engine name used for display purposes.
  */
  const char *table_type() const
  {
    return "CASSANDRA";
  }

  /**
    @brief
    Index type name used for display. Only worth implementing when the
    engine really has indexes.
  */
  const char *index_type(uint) override
  {
    return "HASH";
  }

  /**
    @brief
    Flags telling which functionality this storage engine implements.
    The individual table flags are documented in handler.h.
  */
  ulonglong table_flags() const override
  {
    const ulonglong flags= HA_BINLOG_STMT_CAPABLE |
                           HA_REC_NOT_IN_SEQ |
                           HA_NO_TRANSACTIONS |
                           HA_REQUIRE_PRIMARY_KEY |
                           HA_PRIMARY_KEY_IN_READ_INDEX |
                           HA_PRIMARY_KEY_REQUIRED_FOR_POSITION |
                           HA_NO_AUTO_INCREMENT |
                           HA_TABLE_SCAN_ON_INDEX;
    return flags;
  }

  /**
    @brief
    Bitmap of flags describing how the engine implements indexes (the index
    flags are documented in handler.h). This engine reports none.

    @details
    The arguments are, in order: the index number, the key part to check
    (the first key part is 0) and all_parts. When all_parts is set, the
    server asks for the flags of the combined index up to and including
    that key part.
  */
  ulong index_flags(uint, uint, bool) const override
  {
    return 0;
  }

  /**
    @brief
    unireg.cc calls max_supported_record_length(), max_supported_keys(),
    max_supported_key_parts() and max_supported_key_length() to make sure
    that the engine can handle the data it is about to receive. The *real*
    engine limits are to be returned; the server applies
    min(your_limits, MySQL_limits) on its own.
  */
  uint max_supported_record_length() const override
  {
    return HA_MAX_REC_LENGTH;
  }

  /* For now only a single Primary Key, with a single key part, is supported */
  uint max_supported_keys() const override
  {
    return 1;
  }

  uint max_supported_key_parts() const override
  {
    return 1;
  }

  /**
    @brief
    Also queried by unireg.cc, see max_supported_record_length().

    @details
    Engines without index support have no need to implement the
    ..._key_... methods.
  */
  uint max_supported_key_length() const override
  {
    /* just to return something */
    return 16*1024;
  }

  int index_init(uint idx, bool sorted) override;

  int index_read_map(uchar *buf, const uchar *key, key_part_map keypart_map,
                     enum ha_rkey_function find_flag) override;

  /**
    @brief
    Called in test_quick_select to determine if indexes should be used.
  */
  double scan_time() override
  {
    const double rows= (double) (stats.records + stats.deleted);
    return rows / 20.0 + 10;
  }

  /**
    @brief
    Never called for engines that do not implement indexes.
  */
  double read_time(uint, uint, ha_rows rows) override
  {
    const double n_rows= (double) rows;
    return n_rows / 20.0 + 1;
  }

  void start_bulk_insert(ha_rows rows, uint flags) override;
  int end_bulk_insert() override;

  int reset() override;

  /* Multi Range Read interface */
  int multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
                            uint n_ranges, uint mode,
                            HANDLER_BUFFER *buf) override;
  int multi_range_read_next(range_id_t *range_info) override;
  ha_rows multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
                                      void *seq_init_param, uint n_ranges,
                                      uint *bufsz, uint *flags,
                                      Cost_estimate *cost) override;
  ha_rows multi_range_read_info(uint keyno, uint n_ranges, uint keys,
                                uint key_parts, uint *bufsz, uint *flags,
                                Cost_estimate *cost) override;
  int multi_range_read_explain_info(uint mrr_mode, char *str,
                                    size_t size) override;

private:
  bool source_exhausted;
  bool mrr_start_read();
  int check_field_options(Field **fields);
  int read_dyncol(uint *count, DYNAMIC_COLUMN_VALUE **vals,
                  LEX_STRING **names, String *valcol);
  int write_dynamic_row(uint count, DYNAMIC_COLUMN_VALUE *vals,
                        LEX_STRING *names);
  static void free_dynamic_row(DYNAMIC_COLUMN_VALUE **vals,
                               LEX_STRING **names);
  CASSANDRA_TYPE_DEF *get_cassandra_field_def(char *cass_name,
                                              int cass_name_length);

public:
  int open(const char *name, int mode, uint test_if_locked) override;
  int close() override;

  int write_row(const uchar *buf) override;
  int update_row(const uchar *old_data, const uchar *new_data) override;
  int delete_row(const uchar *buf) override;

  /**
    @brief
    Required method. In contrast to index_init(), rnd_init() may be called
    twice in a row with no rnd_end() in between (this only makes sense when
    scan=1). The second call then has to prepare for a new table scan: for
    instance, if rnd_init() allocates the cursor, the second call should
    move the cursor back to the start of the table, without deallocating
    and allocating it again.
  */
  int rnd_init(bool scan) override;
  int rnd_end() override;
  int rnd_next(uchar *buf) override;
  int rnd_pos(uchar *buf, uchar *pos) override;
  void position(const uchar *record) override;
  int info(uint) override;
  int delete_all_rows() override;

  /** Range scans are not supported, so HA_POS_ERROR is always returned. */
  ha_rows records_in_range(uint, const key_range *min_key,
                           const key_range *max_key,
                           page_range *res) override
  {
    return HA_POS_ERROR;
  }

  int create(const char *name, TABLE *form,
             HA_CREATE_INFO *create_info) override;
  bool check_if_incompatible_data(HA_CREATE_INFO *info,
                                  uint table_changes) override;

  THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
                             enum thr_lock_type lock_type) override;

  /**
    Data from Cassandra tables is never put into the query cache: there is
    no way to tell whether the data in the Cassandra cluster has changed or
    not. Hence the unconditional FALSE.
  */
  my_bool register_query_cache_table(THD *thd, const char *table_key,
                                     uint key_length,
                                     qc_engine_callback *engine_callback,
                                     ulonglong *engine_data) override
  {
    return FALSE;
  }
};