summaryrefslogtreecommitdiffstats
path: root/storage/rocksdb/rdb_datadic.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/rocksdb/rdb_datadic.cc')
-rw-r--r--storage/rocksdb/rdb_datadic.cc5427
1 files changed, 5427 insertions, 0 deletions
diff --git a/storage/rocksdb/rdb_datadic.cc b/storage/rocksdb/rdb_datadic.cc
new file mode 100644
index 00000000..9afc2100
--- /dev/null
+++ b/storage/rocksdb/rdb_datadic.cc
@@ -0,0 +1,5427 @@
+/*
+ Copyright (c) 2012,2013 Monty Program Ab
+ Copyright (c) 2020, MariaDB Corporation.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
+
+#ifdef USE_PRAGMA_IMPLEMENTATION
+#pragma implementation // gcc: Class implementation
+#endif
+
+/* For use of 'PRIu64': */
+#define __STDC_FORMAT_MACROS
+
+#include <my_global.h>
+/* This C++ file's header file */
+#include "./rdb_datadic.h"
+
+#include <inttypes.h>
+/* C++ standard header files */
+#include <algorithm>
+#include <array>
+#include <limits>
+#include <map>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+/* MySQL header files */
+#include "./field.h"
+#include "./key.h"
+#include "./m_ctype.h"
+#include "./my_bit.h"
+#include "./my_bitmap.h"
+#include "./sql_table.h"
+
+/* MyRocks header files */
+#include "./ha_rocksdb.h"
+#include "./ha_rocksdb_proto.h"
+#include "./my_stacktrace.h"
+#include "./rdb_cf_manager.h"
+#include "./rdb_psi.h"
+#include "./rdb_utils.h"
+
+namespace myrocks {
+
+void get_mem_comparable_space(const CHARSET_INFO *cs,
+ const std::vector<uchar> **xfrm, size_t *xfrm_len,
+ size_t *mb_len);
+
+/*
+ MariaDB's replacement for FB/MySQL Field::check_field_name_match :
+*/
+inline bool field_check_field_name_match(Field *field, const char *name)
+{
+ return (0 == my_strcasecmp(system_charset_info,
+ field->field_name.str,
+ name));
+}
+
+
+/*
+ Decode current key field
+ @param fpi IN data structure contains field metadata
+ @param field IN current field
+ @param reader IN key slice reader
+ @param unp_reader IN unpack information reader
+ @return
+ HA_EXIT_SUCCESS OK
+ other HA_ERR error code
+*/
+int Rdb_convert_to_record_key_decoder::decode_field(
+ Rdb_field_packing *fpi, Field *field, Rdb_string_reader *reader,
+ const uchar *const default_value, Rdb_string_reader *unpack_reader) {
+ if (fpi->m_maybe_null) {
+ const char *nullp;
+ if (!(nullp = reader->read(1))) {
+ return HA_EXIT_FAILURE;
+ }
+
+ if (*nullp == 0) {
+ /* Set the NULL-bit of this field */
+ field->set_null();
+ /* Also set the field to its default value */
+ memcpy(field->ptr, default_value, field->pack_length());
+ return HA_EXIT_SUCCESS;
+ } else if (*nullp == 1) {
+ field->set_notnull();
+ } else {
+ return HA_EXIT_FAILURE;
+ }
+ }
+
+ return (fpi->m_unpack_func)(fpi, field, field->ptr, reader, unpack_reader);
+}
+
+/*
+ Decode current key field
+
+ @param buf OUT the buf starting address
+ @param offset OUT the bytes offset when data is written
+ @param fpi IN data structure contains field metadata
+ @param table IN current table
+ @param field IN current field
+ @param has_unpack_inf IN whether contains unpack inf
+ @param reader IN key slice reader
+ @param unp_reader IN unpack information reader
+ @return
+ HA_EXIT_SUCCESS OK
+ other HA_ERR error code
+*/
+int Rdb_convert_to_record_key_decoder::decode(
+ uchar *const buf, uint *offset, Rdb_field_packing *fpi, TABLE *table,
+ Field *field, bool has_unpack_info, Rdb_string_reader *reader,
+ Rdb_string_reader *unpack_reader) {
+ DBUG_ASSERT(buf != nullptr);
+ DBUG_ASSERT(offset != nullptr);
+
+ uint field_offset = field->ptr - table->record[0];
+ *offset = field_offset;
+ uint null_offset = field->null_offset();
+ bool maybe_null = field->real_maybe_null();
+
+ field->move_field(buf + field_offset,
+ maybe_null ? buf + null_offset : nullptr, field->null_bit);
+
+ // If we need unpack info, but there is none, tell the unpack function
+ // this by passing unp_reader as nullptr. If we never read unpack_info
+ // during unpacking anyway, then there won't an error.
+ bool maybe_missing_unpack = !has_unpack_info && fpi->uses_unpack_info();
+
+ int res =
+ decode_field(fpi, field, reader, table->s->default_values + field_offset,
+ maybe_missing_unpack ? nullptr : unpack_reader);
+
+ // Restore field->ptr and field->null_ptr
+ field->move_field(table->record[0] + field_offset,
+ maybe_null ? table->record[0] + null_offset : nullptr,
+ field->null_bit);
+ if (res != UNPACK_SUCCESS) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+ return HA_EXIT_SUCCESS;
+}
+
+/*
+ Skip current key field
+
+ @param fpi IN data structure contains field metadata
+ @param field IN current field
+ @param reader IN key slice reader
+ @param unp_reader IN unpack information reader
+ @return
+ HA_EXIT_SUCCESS OK
+ other HA_ERR error code
+*/
+int Rdb_convert_to_record_key_decoder::skip(const Rdb_field_packing *fpi,
+ const Field *field,
+ Rdb_string_reader *reader,
+ Rdb_string_reader *unp_reader) {
+ /* It is impossible to unpack the column. Skip it. */
+ if (fpi->m_maybe_null) {
+ const char *nullp;
+ if (!(nullp = reader->read(1))) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+ if (*nullp == 0) {
+ /* This is a NULL value */
+ return HA_EXIT_SUCCESS;
+ }
+ /* If NULL marker is not '0', it can be only '1' */
+ if (*nullp != 1) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+ }
+ if ((fpi->m_skip_func)(fpi, field, reader)) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+ // If this is a space padded varchar, we need to skip the indicator
+ // bytes for trailing bytes. They're useless since we can't restore the
+ // field anyway.
+ //
+ // There is a special case for prefixed varchars where we do not
+ // generate unpack info, because we know prefixed varchars cannot be
+ // unpacked. In this case, it is not necessary to skip.
+ if (fpi->m_skip_func == &Rdb_key_def::skip_variable_space_pad &&
+ !fpi->m_unpack_info_stores_value) {
+ unp_reader->read(fpi->m_unpack_info_uses_two_bytes ? 2 : 1);
+ }
+ return HA_EXIT_SUCCESS;
+}
+
+Rdb_key_field_iterator::Rdb_key_field_iterator(
+ const Rdb_key_def *key_def, Rdb_field_packing *pack_info,
+ Rdb_string_reader *reader, Rdb_string_reader *unp_reader, TABLE *table,
+ bool has_unpack_info, const MY_BITMAP *covered_bitmap, uchar *const buf) {
+ m_key_def = key_def;
+ m_pack_info = pack_info;
+ m_iter_index = 0;
+ m_iter_end = key_def->get_key_parts();
+ m_reader = reader;
+ m_unp_reader = unp_reader;
+ m_table = table;
+ m_has_unpack_info = has_unpack_info;
+ m_covered_bitmap = covered_bitmap;
+ m_buf = buf;
+ m_secondary_key =
+ (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY);
+ m_hidden_pk_exists = Rdb_key_def::table_has_hidden_pk(table);
+ m_is_hidden_pk =
+ (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
+ m_curr_bitmap_pos = 0;
+ m_offset = 0;
+}
+
+void *Rdb_key_field_iterator::get_dst() const { return m_buf + m_offset; }
+
+int Rdb_key_field_iterator::get_field_index() const {
+ DBUG_ASSERT(m_field != nullptr);
+ return m_field->field_index;
+}
+
+bool Rdb_key_field_iterator::get_is_null() const { return m_is_null; }
+Field *Rdb_key_field_iterator::get_field() const {
+ DBUG_ASSERT(m_field != nullptr);
+ return m_field;
+}
+
+bool Rdb_key_field_iterator::has_next() { return m_iter_index < m_iter_end; }
+
+/**
+ Iterate each field in the key and decode/skip one by one
+*/
+int Rdb_key_field_iterator::next() {
+ int status = HA_EXIT_SUCCESS;
+ while (m_iter_index < m_iter_end) {
+ int curr_index = m_iter_index++;
+
+ m_fpi = &m_pack_info[curr_index];
+ /*
+ Hidden pk field is packed at the end of the secondary keys, but the SQL
+ layer does not know about it. Skip retrieving field if hidden pk.
+ */
+ if ((m_secondary_key && m_hidden_pk_exists &&
+ curr_index + 1 == m_iter_end) ||
+ m_is_hidden_pk) {
+ DBUG_ASSERT(m_fpi->m_unpack_func);
+ if ((m_fpi->m_skip_func)(m_fpi, nullptr, m_reader)) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+ return HA_EXIT_SUCCESS;
+ }
+
+ m_field = m_fpi->get_field_in_table(m_table);
+
+ bool covered_column = true;
+ if (m_covered_bitmap != nullptr &&
+ m_field->real_type() == MYSQL_TYPE_VARCHAR && !m_fpi->m_covered) {
+ uint tmp= m_curr_bitmap_pos++;
+ covered_column = m_curr_bitmap_pos < MAX_REF_PARTS &&
+ bitmap_is_set(m_covered_bitmap, tmp);
+ }
+
+ if (m_fpi->m_unpack_func && covered_column) {
+ /* It is possible to unpack this column. Do it. */
+ status = Rdb_convert_to_record_key_decoder::decode(
+ m_buf, &m_offset, m_fpi, m_table, m_field, m_has_unpack_info,
+ m_reader, m_unp_reader);
+ if (status) {
+ return status;
+ }
+ break;
+ } else {
+ status = Rdb_convert_to_record_key_decoder::skip(m_fpi, m_field, m_reader,
+ m_unp_reader);
+ if (status) {
+ return status;
+ }
+ }
+ }
+ return HA_EXIT_SUCCESS;
+}
+
+/*
+ Rdb_key_def class implementation
+*/
+Rdb_key_def::Rdb_key_def(uint indexnr_arg, uint keyno_arg,
+ rocksdb::ColumnFamilyHandle *cf_handle_arg,
+ uint16_t index_dict_version_arg, uchar index_type_arg,
+ uint16_t kv_format_version_arg, bool is_reverse_cf_arg,
+ bool is_per_partition_cf_arg, const char *_name,
+ Rdb_index_stats _stats, uint32 index_flags_bitmap,
+ uint32 ttl_rec_offset, uint64 ttl_duration)
+ : m_index_number(indexnr_arg),
+ m_cf_handle(cf_handle_arg),
+ m_index_dict_version(index_dict_version_arg),
+ m_index_type(index_type_arg),
+ m_kv_format_version(kv_format_version_arg),
+ m_is_reverse_cf(is_reverse_cf_arg),
+ m_is_per_partition_cf(is_per_partition_cf_arg),
+ m_name(_name),
+ m_stats(_stats),
+ m_index_flags_bitmap(index_flags_bitmap),
+ m_ttl_rec_offset(ttl_rec_offset),
+ m_ttl_duration(ttl_duration),
+ m_ttl_column(""),
+ m_pk_part_no(nullptr),
+ m_pack_info(nullptr),
+ m_keyno(keyno_arg),
+ m_key_parts(0),
+ m_ttl_pk_key_part_offset(UINT_MAX),
+ m_ttl_field_index(UINT_MAX),
+ m_prefix_extractor(nullptr),
+ m_maxlength(0) // means 'not intialized'
+{
+ mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
+ rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
+ m_total_index_flags_length =
+ calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
+ DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
+ m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
+ m_total_index_flags_length == 0);
+ DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
+ m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
+ m_total_index_flags_length == 0);
+ DBUG_ASSERT(m_cf_handle != nullptr);
+}
+
+Rdb_key_def::Rdb_key_def(const Rdb_key_def &k)
+ : m_index_number(k.m_index_number),
+ m_cf_handle(k.m_cf_handle),
+ m_is_reverse_cf(k.m_is_reverse_cf),
+ m_is_per_partition_cf(k.m_is_per_partition_cf),
+ m_name(k.m_name),
+ m_stats(k.m_stats),
+ m_index_flags_bitmap(k.m_index_flags_bitmap),
+ m_ttl_rec_offset(k.m_ttl_rec_offset),
+ m_ttl_duration(k.m_ttl_duration),
+ m_ttl_column(k.m_ttl_column),
+ m_pk_part_no(k.m_pk_part_no),
+ m_pack_info(k.m_pack_info),
+ m_keyno(k.m_keyno),
+ m_key_parts(k.m_key_parts),
+ m_ttl_pk_key_part_offset(k.m_ttl_pk_key_part_offset),
+ m_ttl_field_index(UINT_MAX),
+ m_prefix_extractor(k.m_prefix_extractor),
+ m_maxlength(k.m_maxlength) {
+ mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
+ rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
+ m_total_index_flags_length =
+ calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
+ DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
+ m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
+ m_total_index_flags_length == 0);
+ DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
+ m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
+ m_total_index_flags_length == 0);
+ if (k.m_pack_info) {
+ const size_t size = sizeof(Rdb_field_packing) * k.m_key_parts;
+ void *pack_info= my_malloc(PSI_INSTRUMENT_ME, size, MYF(0));
+ memcpy(pack_info, k.m_pack_info, size);
+ m_pack_info = reinterpret_cast<Rdb_field_packing *>(pack_info);
+ }
+
+ if (k.m_pk_part_no) {
+ const size_t size = sizeof(uint) * m_key_parts;
+ m_pk_part_no = reinterpret_cast<uint *>(my_malloc(PSI_INSTRUMENT_ME, size, MYF(0)));
+ memcpy(m_pk_part_no, k.m_pk_part_no, size);
+ }
+}
+
+Rdb_key_def::~Rdb_key_def() {
+ mysql_mutex_destroy(&m_mutex);
+
+ my_free(m_pk_part_no);
+ m_pk_part_no = nullptr;
+
+ my_free(m_pack_info);
+ m_pack_info = nullptr;
+}
+
+uint Rdb_key_def::setup(const TABLE *const tbl,
+ const Rdb_tbl_def *const tbl_def) {
+ DBUG_ASSERT(tbl != nullptr);
+ DBUG_ASSERT(tbl_def != nullptr);
+
+ /*
+ Set max_length based on the table. This can be called concurrently from
+ multiple threads, so there is a mutex to protect this code.
+ */
+ const bool is_hidden_pk = (m_index_type == INDEX_TYPE_HIDDEN_PRIMARY);
+ const bool hidden_pk_exists = table_has_hidden_pk(tbl);
+ const bool secondary_key = (m_index_type == INDEX_TYPE_SECONDARY);
+ if (!m_maxlength) {
+ RDB_MUTEX_LOCK_CHECK(m_mutex);
+ if (m_maxlength != 0) {
+ RDB_MUTEX_UNLOCK_CHECK(m_mutex);
+ return HA_EXIT_SUCCESS;
+ }
+
+ KEY *key_info = nullptr;
+ KEY *pk_info = nullptr;
+ if (!is_hidden_pk) {
+ key_info = &tbl->key_info[m_keyno];
+ if (!hidden_pk_exists) pk_info = &tbl->key_info[tbl->s->primary_key];
+ m_name = std::string(key_info->name.str);
+ } else {
+ m_name = HIDDEN_PK_NAME;
+ }
+
+ if (secondary_key) {
+ m_pk_key_parts= hidden_pk_exists ? 1 : pk_info->ext_key_parts;
+ } else {
+ pk_info = nullptr;
+ m_pk_key_parts = 0;
+ }
+
+ // "unique" secondary keys support:
+ m_key_parts= is_hidden_pk ? 1 : key_info->ext_key_parts;
+
+ if (secondary_key) {
+ /*
+ In most cases, SQL layer puts PK columns as invisible suffix at the
+ end of secondary key. There are cases where this doesn't happen:
+ - unique secondary indexes.
+ - partitioned tables.
+
+ Internally, we always need PK columns as suffix (and InnoDB does,
+ too, if you were wondering).
+
+ The loop below will attempt to put all PK columns at the end of key
+ definition. Columns that are already included in the index (either
+ by the user or by "extended keys" feature) are not included for the
+ second time.
+ */
+ m_key_parts += m_pk_key_parts;
+ }
+
+ if (secondary_key) {
+ m_pk_part_no = reinterpret_cast<uint *>(
+ my_malloc(PSI_INSTRUMENT_ME, sizeof(uint) * m_key_parts, MYF(0)));
+ } else {
+ m_pk_part_no = nullptr;
+ }
+
+ const size_t size = sizeof(Rdb_field_packing) * m_key_parts;
+ m_pack_info =
+ reinterpret_cast<Rdb_field_packing *>(my_malloc(PSI_INSTRUMENT_ME, size, MYF(0)));
+
+ /*
+ Guaranteed not to error here as checks have been made already during
+ table creation.
+ */
+ Rdb_key_def::extract_ttl_col(tbl, tbl_def, &m_ttl_column,
+ &m_ttl_field_index, true);
+
+ size_t max_len = INDEX_NUMBER_SIZE;
+ int unpack_len = 0;
+ int max_part_len = 0;
+ bool simulating_extkey = false;
+ uint dst_i = 0;
+
+ uint keyno_to_set = m_keyno;
+ uint keypart_to_set = 0;
+
+ if (is_hidden_pk) {
+ Field *field = nullptr;
+ m_pack_info[dst_i].setup(this, field, keyno_to_set, 0, 0);
+ m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
+ max_len += m_pack_info[dst_i].m_max_image_len;
+ max_part_len = std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
+ dst_i++;
+ } else {
+ KEY_PART_INFO *key_part = key_info->key_part;
+
+ /* this loop also loops over the 'extended key' tail */
+ for (uint src_i = 0; src_i < m_key_parts; src_i++, keypart_to_set++) {
+ Field *const field = key_part ? key_part->field : nullptr;
+
+ if (key_part && key_part->key_part_flag & HA_REVERSE_SORT)
+ {
+ my_error(ER_ILLEGAL_HA_CREATE_OPTION, MYF(0),
+ "ROCKSDB", "DESC");
+ RDB_MUTEX_UNLOCK_CHECK(m_mutex);
+ return HA_EXIT_FAILURE;
+ }
+
+ if (simulating_extkey && !hidden_pk_exists) {
+ DBUG_ASSERT(secondary_key);
+ /* Check if this field is already present in the key definition */
+ bool found = false;
+ for (uint j= 0; j < key_info->ext_key_parts; j++) {
+ if (field->field_index ==
+ key_info->key_part[j].field->field_index &&
+ key_part->length == key_info->key_part[j].length) {
+ found = true;
+ break;
+ }
+ }
+
+ if (found) {
+ key_part++;
+ continue;
+ }
+ }
+
+ if (field && field->real_maybe_null()) max_len += 1; // NULL-byte
+
+ m_pack_info[dst_i].setup(this, field, keyno_to_set, keypart_to_set,
+ key_part ? key_part->length : 0);
+ m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
+
+ if (pk_info) {
+ m_pk_part_no[dst_i] = -1;
+ for (uint j = 0; j < m_pk_key_parts; j++) {
+ if (field->field_index == pk_info->key_part[j].field->field_index) {
+ m_pk_part_no[dst_i] = j;
+ break;
+ }
+ }
+ } else if (secondary_key && hidden_pk_exists) {
+ /*
+ The hidden pk can never be part of the sk. So it is always
+ appended to the end of the sk.
+ */
+ m_pk_part_no[dst_i] = -1;
+ if (simulating_extkey) m_pk_part_no[dst_i] = 0;
+ }
+
+ max_len += m_pack_info[dst_i].m_max_image_len;
+
+ max_part_len =
+ std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
+
+ /*
+ Check key part name here, if it matches the TTL column then we store
+ the offset of the TTL key part here.
+ */
+ if (!m_ttl_column.empty() &&
+ field_check_field_name_match(field, m_ttl_column.c_str())) {
+ DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG);
+ DBUG_ASSERT(field->key_type() == HA_KEYTYPE_ULONGLONG);
+ DBUG_ASSERT(!field->real_maybe_null());
+ m_ttl_pk_key_part_offset = dst_i;
+ }
+
+ key_part++;
+ /*
+ For "unique" secondary indexes, pretend they have
+ "index extensions".
+
+ MariaDB also has this property: if an index has a partially-covered
+ column like KEY(varchar_col(N)), then the SQL layer will think it is
+ not "extended" with PK columns. The code below handles this case,
+ also.
+ */
+ if (secondary_key && src_i+1 == key_info->ext_key_parts) {
+ simulating_extkey = true;
+ if (!hidden_pk_exists) {
+ keyno_to_set = tbl->s->primary_key;
+ key_part = pk_info->key_part;
+ keypart_to_set = (uint)-1;
+ } else {
+ keyno_to_set = tbl_def->m_key_count - 1;
+ key_part = nullptr;
+ keypart_to_set = 0;
+ }
+ }
+
+ dst_i++;
+ }
+ }
+
+ m_key_parts = dst_i;
+
+ /* Initialize the memory needed by the stats structure */
+ m_stats.m_distinct_keys_per_prefix.resize(get_key_parts());
+
+ /* Cache prefix extractor for bloom filter usage later */
+ rocksdb::Options opt = rdb_get_rocksdb_db()->GetOptions(get_cf());
+ m_prefix_extractor = opt.prefix_extractor;
+
+ /*
+ This should be the last member variable set before releasing the mutex
+ so that other threads can't see the object partially set up.
+ */
+ m_maxlength = max_len;
+
+ RDB_MUTEX_UNLOCK_CHECK(m_mutex);
+ }
+ return HA_EXIT_SUCCESS;
+}
+
+/*
+ Determine if the table has TTL enabled by parsing the table comment.
+
+ @param[IN] table_arg
+ @param[IN] tbl_def_arg
+ @param[OUT] ttl_duration Default TTL value parsed from table comment
+*/
+uint Rdb_key_def::extract_ttl_duration(const TABLE *const table_arg,
+ const Rdb_tbl_def *const tbl_def_arg,
+ uint64 *ttl_duration) {
+ DBUG_ASSERT(table_arg != nullptr);
+ DBUG_ASSERT(tbl_def_arg != nullptr);
+ DBUG_ASSERT(ttl_duration != nullptr);
+ std::string table_comment(table_arg->s->comment.str,
+ table_arg->s->comment.length);
+
+ bool ttl_duration_per_part_match_found = false;
+ std::string ttl_duration_str = Rdb_key_def::parse_comment_for_qualifier(
+ table_comment, table_arg, tbl_def_arg, &ttl_duration_per_part_match_found,
+ RDB_TTL_DURATION_QUALIFIER);
+
+ /* If we don't have a ttl duration, nothing to do here. */
+ if (ttl_duration_str.empty()) {
+ return HA_EXIT_SUCCESS;
+ }
+
+ /*
+ Catch errors where a non-integral value was used as ttl duration, strtoull
+ will return 0.
+ */
+ *ttl_duration = std::strtoull(ttl_duration_str.c_str(), nullptr, 0);
+ if (!*ttl_duration) {
+ my_error(ER_RDB_TTL_DURATION_FORMAT, MYF(0), ttl_duration_str.c_str());
+ return HA_EXIT_FAILURE;
+ }
+
+ return HA_EXIT_SUCCESS;
+}
+
+/*
+ Determine if the table has TTL enabled by parsing the table comment.
+
+ @param[IN] table_arg
+ @param[IN] tbl_def_arg
+ @param[OUT] ttl_column TTL column in the table
+ @param[IN] skip_checks Skip validation checks (when called in
+ setup())
+*/
+uint Rdb_key_def::extract_ttl_col(const TABLE *const table_arg,
+ const Rdb_tbl_def *const tbl_def_arg,
+ std::string *ttl_column,
+ uint *ttl_field_index, bool skip_checks) {
+ std::string table_comment(table_arg->s->comment.str,
+ table_arg->s->comment.length);
+ /*
+ Check if there is a TTL column specified. Note that this is not required
+ and if omitted, an 8-byte ttl field will be prepended to each record
+ implicitly.
+ */
+ bool ttl_col_per_part_match_found = false;
+ std::string ttl_col_str = Rdb_key_def::parse_comment_for_qualifier(
+ table_comment, table_arg, tbl_def_arg, &ttl_col_per_part_match_found,
+ RDB_TTL_COL_QUALIFIER);
+
+ if (skip_checks) {
+ for (uint i = 0; i < table_arg->s->fields; i++) {
+ Field *const field = table_arg->field[i];
+ if (field_check_field_name_match(field, ttl_col_str.c_str())) {
+ *ttl_column = ttl_col_str;
+ *ttl_field_index = i;
+ }
+ }
+ return HA_EXIT_SUCCESS;
+ }
+
+ /* Check if TTL column exists in table */
+ if (!ttl_col_str.empty()) {
+ bool found = false;
+ for (uint i = 0; i < table_arg->s->fields; i++) {
+ Field *const field = table_arg->field[i];
+ if (field_check_field_name_match(field, ttl_col_str.c_str()) &&
+ field->real_type() == MYSQL_TYPE_LONGLONG &&
+ field->key_type() == HA_KEYTYPE_ULONGLONG &&
+ !field->real_maybe_null()) {
+ *ttl_column = ttl_col_str;
+ *ttl_field_index = i;
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ my_error(ER_RDB_TTL_COL_FORMAT, MYF(0), ttl_col_str.c_str());
+ return HA_EXIT_FAILURE;
+ }
+ }
+
+ return HA_EXIT_SUCCESS;
+}
+
+const std::string Rdb_key_def::gen_qualifier_for_table(
+ const char *const qualifier, const std::string &partition_name) {
+ bool has_partition = !partition_name.empty();
+ std::string qualifier_str = "";
+
+ if (!strcmp(qualifier, RDB_CF_NAME_QUALIFIER)) {
+ return has_partition ? gen_cf_name_qualifier_for_partition(partition_name)
+ : qualifier_str + RDB_CF_NAME_QUALIFIER +
+ RDB_QUALIFIER_VALUE_SEP;
+ } else if (!strcmp(qualifier, RDB_TTL_DURATION_QUALIFIER)) {
+ return has_partition
+ ? gen_ttl_duration_qualifier_for_partition(partition_name)
+ : qualifier_str + RDB_TTL_DURATION_QUALIFIER +
+ RDB_QUALIFIER_VALUE_SEP;
+ } else if (!strcmp(qualifier, RDB_TTL_COL_QUALIFIER)) {
+ return has_partition ? gen_ttl_col_qualifier_for_partition(partition_name)
+ : qualifier_str + RDB_TTL_COL_QUALIFIER +
+ RDB_QUALIFIER_VALUE_SEP;
+ } else {
+ DBUG_ASSERT(0);
+ }
+
+ return qualifier_str;
+}
+
+/*
+ Formats the string and returns the column family name assignment part for a
+ specific partition.
+*/
+const std::string Rdb_key_def::gen_cf_name_qualifier_for_partition(
+ const std::string &prefix) {
+ DBUG_ASSERT(!prefix.empty());
+
+ return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_CF_NAME_QUALIFIER +
+ RDB_QUALIFIER_VALUE_SEP;
+}
+
+const std::string Rdb_key_def::gen_ttl_duration_qualifier_for_partition(
+ const std::string &prefix) {
+ DBUG_ASSERT(!prefix.empty());
+
+ return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP +
+ RDB_TTL_DURATION_QUALIFIER + RDB_QUALIFIER_VALUE_SEP;
+}
+
+const std::string Rdb_key_def::gen_ttl_col_qualifier_for_partition(
+ const std::string &prefix) {
+ DBUG_ASSERT(!prefix.empty());
+
+ return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_TTL_COL_QUALIFIER +
+ RDB_QUALIFIER_VALUE_SEP;
+}
+
+const std::string Rdb_key_def::parse_comment_for_qualifier(
+ const std::string &comment, const TABLE *const table_arg,
+ const Rdb_tbl_def *const tbl_def_arg, bool *per_part_match_found,
+ const char *const qualifier) {
+ DBUG_ASSERT(table_arg != nullptr);
+ DBUG_ASSERT(tbl_def_arg != nullptr);
+ DBUG_ASSERT(per_part_match_found != nullptr);
+ DBUG_ASSERT(qualifier != nullptr);
+
+ std::string empty_result;
+
+ // Flag which marks if partition specific options were found.
+ *per_part_match_found = false;
+
+ if (comment.empty()) {
+ return empty_result;
+ }
+
+ // Let's fetch the comment for a index and check if there's a custom key
+ // name specified for a partition we are handling.
+ std::vector<std::string> v =
+ myrocks::parse_into_tokens(comment, RDB_QUALIFIER_SEP);
+
+ std::string search_str = gen_qualifier_for_table(qualifier);
+
+ // If table has partitions then we need to check if user has requested
+ // qualifiers on a per partition basis.
+ //
+ // NOTE: this means if you specify a qualifier for a specific partition it
+ // will take precedence the 'table level' qualifier if one exists.
+ std::string search_str_part;
+ if (IF_PARTITIONING(table_arg->part_info,nullptr) != nullptr) {
+ std::string partition_name = tbl_def_arg->base_partition();
+ DBUG_ASSERT(!partition_name.empty());
+ search_str_part = gen_qualifier_for_table(qualifier, partition_name);
+ }
+
+ DBUG_ASSERT(!search_str.empty());
+
+ // Basic O(N) search for a matching assignment. At most we expect maybe
+ // ten or so elements here.
+ if (!search_str_part.empty()) {
+ for (const auto &it : v) {
+ if (it.substr(0, search_str_part.length()) == search_str_part) {
+ // We found a prefix match. Try to parse it as an assignment.
+ std::vector<std::string> tokens =
+ myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
+
+ // We found a custom qualifier, it was in the form we expected it to be.
+ // Return that instead of whatever we initially wanted to return. In
+ // a case below the `foo` part will be returned to the caller.
+ //
+ // p3_cfname=foo
+ //
+ // If no value was specified then we'll return an empty string which
+ // later gets translated into using a default CF.
+ if (tokens.size() == 2) {
+ *per_part_match_found = true;
+ return tokens[1];
+ } else {
+ return empty_result;
+ }
+ }
+ }
+ }
+
+ // Do this loop again, this time searching for 'table level' qualifiers if we
+ // didn't find any partition level qualifiers above.
+ for (const auto &it : v) {
+ if (it.substr(0, search_str.length()) == search_str) {
+ std::vector<std::string> tokens =
+ myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
+ if (tokens.size() == 2) {
+ return tokens[1];
+ } else {
+ return empty_result;
+ }
+ }
+ }
+
+ // If we didn't find any partitioned/non-partitioned qualifiers, return an
+ // empty string.
+ return empty_result;
+}
+
+/**
+ Read a memcmp key part from a slice using the passed in reader.
+
+ Returns -1 if field was null, 1 if error, 0 otherwise.
+*/
+int Rdb_key_def::read_memcmp_key_part(const TABLE *table_arg,
+ Rdb_string_reader *reader,
+ const uint part_num) const {
+ /* It is impossible to unpack the column. Skip it. */
+ if (m_pack_info[part_num].m_maybe_null) {
+ const char *nullp;
+ if (!(nullp = reader->read(1))) return 1;
+ if (*nullp == 0) {
+ /* This is a NULL value */
+ return -1;
+ } else {
+ /* If NULL marker is not '0', it can be only '1' */
+ if (*nullp != 1) return 1;
+ }
+ }
+
+ Rdb_field_packing *fpi = &m_pack_info[part_num];
+ DBUG_ASSERT(table_arg->s != nullptr);
+
+ bool is_hidden_pk_part = (part_num + 1 == m_key_parts) &&
+ (table_arg->s->primary_key == MAX_INDEXES);
+ Field *field = nullptr;
+ if (!is_hidden_pk_part) {
+ field = fpi->get_field_in_table(table_arg);
+ }
+ if ((fpi->m_skip_func)(fpi, field, reader)) {
+ return 1;
+ }
+ return 0;
+}
+
+/**
+ Get a mem-comparable form of Primary Key from mem-comparable form of this key
+
+ @param
+ pk_descr Primary Key descriptor
+ key Index tuple from this key in mem-comparable form
+ pk_buffer OUT Put here mem-comparable form of the Primary Key.
+
+ @note
+ It may or may not be possible to restore primary key columns to their
+ mem-comparable form. To handle all cases, this function copies mem-
+ comparable forms directly.
+
+ RocksDB SE supports "Extended keys". This means that PK columns are present
+ at the end of every key. If the key already includes PK columns, then
+ these columns are not present at the end of the key.
+
+ Because of the above, we copy each primary key column.
+
+ @todo
+ If we checked crc32 checksums in this function, we would catch some CRC
+ violations that we currently don't. On the other hand, there is a broader
+ set of queries for which we would check the checksum twice.
+*/
+
+uint Rdb_key_def::get_primary_key_tuple(const TABLE *const table,
+ const Rdb_key_def &pk_descr,
+ const rocksdb::Slice *const key,
+ uchar *const pk_buffer) const {
+ DBUG_ASSERT(table != nullptr);
+ DBUG_ASSERT(key != nullptr);
+ DBUG_ASSERT(m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY);
+ DBUG_ASSERT(pk_buffer);
+
+ uint size = 0;
+ uchar *buf = pk_buffer;
+ DBUG_ASSERT(m_pk_key_parts);
+
+ /* Put the PK number */
+ rdb_netbuf_store_index(buf, pk_descr.m_index_number);
+ buf += INDEX_NUMBER_SIZE;
+ size += INDEX_NUMBER_SIZE;
+
+ const char *start_offs[MAX_REF_PARTS];
+ const char *end_offs[MAX_REF_PARTS];
+ int pk_key_part;
+ uint i;
+ Rdb_string_reader reader(key);
+
+ // Skip the index number
+ if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN;
+
+ for (i = 0; i < m_key_parts; i++) {
+ if ((pk_key_part = m_pk_part_no[i]) != -1) {
+ start_offs[pk_key_part] = reader.get_current_ptr();
+ }
+
+ if (read_memcmp_key_part(table, &reader, i) > 0) {
+ return RDB_INVALID_KEY_LEN;
+ }
+
+ if (pk_key_part != -1) {
+ end_offs[pk_key_part] = reader.get_current_ptr();
+ }
+ }
+
+ for (i = 0; i < m_pk_key_parts; i++) {
+ const uint part_size = end_offs[i] - start_offs[i];
+ memcpy(buf, start_offs[i], end_offs[i] - start_offs[i]);
+ buf += part_size;
+ size += part_size;
+ }
+
+ return size;
+}
+
+/**
+ Get a mem-comparable form of Secondary Key from mem-comparable form of this
+ key, without the extended primary key tail.
+
+ @param
+ key Index tuple from this key in mem-comparable form
+ sk_buffer OUT Put here mem-comparable form of the Secondary Key.
+ n_null_fields OUT Put number of null fields contained within sk entry
+*/
+uint Rdb_key_def::get_memcmp_sk_parts(const TABLE *table,
+ const rocksdb::Slice &key,
+ uchar *sk_buffer,
+ uint *n_null_fields) const {
+ DBUG_ASSERT(table != nullptr);
+ DBUG_ASSERT(sk_buffer != nullptr);
+ DBUG_ASSERT(n_null_fields != nullptr);
+ DBUG_ASSERT(m_keyno != table->s->primary_key && !table_has_hidden_pk(table));
+
+ uchar *buf = sk_buffer;
+
+ int res;
+ Rdb_string_reader reader(&key);
+ const char *start = reader.get_current_ptr();
+
+ // Skip the index number
+ if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN;
+
+ for (uint i = 0; i < table->key_info[m_keyno].user_defined_key_parts; i++) {
+ if ((res = read_memcmp_key_part(table, &reader, i)) > 0) {
+ return RDB_INVALID_KEY_LEN;
+ } else if (res == -1) {
+ (*n_null_fields)++;
+ }
+ }
+
+ uint sk_memcmp_len = reader.get_current_ptr() - start;
+ memcpy(buf, start, sk_memcmp_len);
+ return sk_memcmp_len;
+}
+
+/**
+ Convert index tuple into storage (i.e. mem-comparable) format
+
+ @detail
+ Currently this is done by unpacking into record_buffer and then
+ packing index columns into storage format.
+
+ @param pack_buffer Temporary area for packing varchar columns. Its
+ size is at least max_storage_fmt_length() bytes.
+*/
+
+uint Rdb_key_def::pack_index_tuple(TABLE *const tbl, uchar *const pack_buffer,
+ uchar *const packed_tuple,
+ uchar *const record_buffer,
+ const uchar *const key_tuple,
+ const key_part_map &keypart_map) const {
+ DBUG_ASSERT(tbl != nullptr);
+ DBUG_ASSERT(pack_buffer != nullptr);
+ DBUG_ASSERT(packed_tuple != nullptr);
+ DBUG_ASSERT(key_tuple != nullptr);
+
+ /* We were given a record in KeyTupleFormat. First, save it to record */
+ const uint key_len = calculate_key_len(tbl, m_keyno, key_tuple, keypart_map);
+ key_restore(record_buffer, key_tuple, &tbl->key_info[m_keyno], key_len);
+
+ uint n_used_parts = my_count_bits(keypart_map);
+ if (keypart_map == HA_WHOLE_KEY) n_used_parts = 0; // Full key is used
+
+ /* Then, convert the record into a mem-comparable form */
+ return pack_record(tbl, pack_buffer, record_buffer, packed_tuple, nullptr,
+ false, 0, n_used_parts);
+}
+
+/**
+ @brief
+ Check if "unpack info" data includes checksum.
+
+ @detail
+ This is used only by CHECK TABLE to count the number of rows that have
+ checksums.
+*/
+
+bool Rdb_key_def::unpack_info_has_checksum(const rocksdb::Slice &unpack_info) {
+ size_t size = unpack_info.size();
+ if (size == 0) {
+ return false;
+ }
+ const uchar *ptr = (const uchar *)unpack_info.data();
+
+ // Skip unpack info if present.
+ if (is_unpack_data_tag(ptr[0]) && size >= get_unpack_header_size(ptr[0])) {
+ const uint16 skip_len = rdb_netbuf_to_uint16(ptr + 1);
+ SHIP_ASSERT(size >= skip_len);
+
+ size -= skip_len;
+ ptr += skip_len;
+ }
+
+ return (size == RDB_CHECKSUM_CHUNK_SIZE && ptr[0] == RDB_CHECKSUM_DATA_TAG);
+}
+
+/*
+ @return Number of bytes that were changed
+*/
+int Rdb_key_def::successor(uchar *const packed_tuple, const uint len) {
+ DBUG_ASSERT(packed_tuple != nullptr);
+
+ int changed = 0;
+ uchar *p = packed_tuple + len - 1;
+ for (; p > packed_tuple; p--) {
+ changed++;
+ if (*p != uchar(0xFF)) {
+ *p = *p + 1;
+ break;
+ }
+ *p = '\0';
+ }
+ return changed;
+}
+
+/*
+ @return Number of bytes that were changed
+*/
+int Rdb_key_def::predecessor(uchar *const packed_tuple, const uint len) {
+ DBUG_ASSERT(packed_tuple != nullptr);
+
+ int changed = 0;
+ uchar *p = packed_tuple + len - 1;
+ for (; p > packed_tuple; p--) {
+ changed++;
+ if (*p != uchar(0x00)) {
+ *p = *p - 1;
+ break;
+ }
+ *p = 0xFF;
+ }
+ return changed;
+}
+
+static const std::map<char, size_t> UNPACK_HEADER_SIZES = {
+ {RDB_UNPACK_DATA_TAG, RDB_UNPACK_HEADER_SIZE},
+ {RDB_UNPACK_COVERED_DATA_TAG, RDB_UNPACK_COVERED_HEADER_SIZE}};
+
+/*
+ @return The length in bytes of the header specified by the given tag
+*/
+size_t Rdb_key_def::get_unpack_header_size(char tag) {
+ DBUG_ASSERT(is_unpack_data_tag(tag));
+ return UNPACK_HEADER_SIZES.at(tag);
+}
+
+/*
+ Get a bitmap indicating which varchar columns must be covered for this
+ lookup to be covered. If the bitmap is a subset of the covered bitmap, then
+ the lookup is covered. If it can already be determined that the lookup is
+ not covered, map->bitmap will be set to null.
+ */
+void Rdb_key_def::get_lookup_bitmap(const TABLE *table, MY_BITMAP *map) const {
+ DBUG_ASSERT(map->bitmap == nullptr);
+ my_bitmap_init(map, nullptr, MAX_REF_PARTS);
+ uint curr_bitmap_pos = 0;
+
+ // Indicates which columns in the read set might be covered.
+ MY_BITMAP maybe_covered_bitmap;
+ my_bitmap_init(&maybe_covered_bitmap, nullptr, table->read_set->n_bits);
+
+ for (uint i = 0; i < m_key_parts; i++) {
+ if (table_has_hidden_pk(table) && i + 1 == m_key_parts) {
+ continue;
+ }
+
+ Field *const field = m_pack_info[i].get_field_in_table(table);
+
+ // Columns which are always covered are not stored in the covered bitmap so
+ // we can ignore them here too.
+ if (m_pack_info[i].m_covered &&
+ bitmap_is_set(table->read_set, field->field_index)) {
+ bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
+ continue;
+ }
+
+ switch (field->real_type()) {
+ // This type may be covered depending on the record. If it was requested,
+ // we require the covered bitmap to have this bit set.
+ case MYSQL_TYPE_VARCHAR:
+ if (curr_bitmap_pos < MAX_REF_PARTS) {
+ if (bitmap_is_set(table->read_set, field->field_index)) {
+ bitmap_set_bit(map, curr_bitmap_pos);
+ bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
+ }
+ curr_bitmap_pos++;
+ } else {
+ my_bitmap_free(&maybe_covered_bitmap);
+ my_bitmap_free(map);
+ return;
+ }
+ break;
+ // This column is a type which is never covered. If it was requested, we
+ // know this lookup will never be covered.
+ default:
+ if (bitmap_is_set(table->read_set, field->field_index)) {
+ my_bitmap_free(&maybe_covered_bitmap);
+ my_bitmap_free(map);
+ return;
+ }
+ break;
+ }
+ }
+
+ // If there are columns which are not covered in the read set, the lookup
+ // can't be covered.
+ if (!bitmap_cmp(table->read_set, &maybe_covered_bitmap)) {
+ my_bitmap_free(map);
+ }
+ my_bitmap_free(&maybe_covered_bitmap);
+}
+
+/*
+ Return true if for this secondary index
+ - All of the requested columns are in the index
+ - All values for columns that are prefix-only indexes are shorter or equal
+ in length to the prefix
+ */
+bool Rdb_key_def::covers_lookup(const rocksdb::Slice *const unpack_info,
+ const MY_BITMAP *const lookup_bitmap) const {
+ DBUG_ASSERT(lookup_bitmap != nullptr);
+ if (!use_covered_bitmap_format() || lookup_bitmap->bitmap == nullptr) {
+ return false;
+ }
+
+ Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
+
+ // Check if this unpack_info has a covered_bitmap
+ const char *unpack_header = unp_reader.get_current_ptr();
+ const bool has_covered_unpack_info =
+ unp_reader.remaining_bytes() &&
+ unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG;
+ if (!has_covered_unpack_info ||
+ !unp_reader.read(RDB_UNPACK_COVERED_HEADER_SIZE)) {
+ return false;
+ }
+
+ MY_BITMAP covered_bitmap;
+ my_bitmap_map covered_bits;
+ my_bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS);
+ covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
+ sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
+ RDB_UNPACK_COVERED_DATA_LEN_SIZE);
+
+ return bitmap_is_subset(lookup_bitmap, &covered_bitmap);
+}
+
+/* Indicates that all key parts can be unpacked to cover a secondary lookup */
+bool Rdb_key_def::can_cover_lookup() const {
+ for (uint i = 0; i < m_key_parts; i++) {
+ if (!m_pack_info[i].m_covered) return false;
+ }
+ return true;
+}
+
+uchar *Rdb_key_def::pack_field(Field *const field, Rdb_field_packing *pack_info,
+ uchar *tuple, uchar *const packed_tuple,
+ uchar *const pack_buffer,
+ Rdb_string_writer *const unpack_info,
+ uint *const n_null_fields) const {
+ if (field->real_maybe_null()) {
+ DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 1));
+ if (field->is_real_null()) {
+ /* NULL value. store '\0' so that it sorts before non-NULL values */
+ *tuple++ = 0;
+ /* That's it, don't store anything else */
+ if (n_null_fields) (*n_null_fields)++;
+ return tuple;
+ } else {
+ /* Not a NULL value. Store '1' */
+ *tuple++ = 1;
+ }
+ }
+
+ const bool create_unpack_info =
+ (unpack_info && // we were requested to generate unpack_info
+ pack_info->uses_unpack_info()); // and this keypart uses it
+ Rdb_pack_field_context pack_ctx(unpack_info);
+
+ // Set the offset for methods which do not take an offset as an argument
+ DBUG_ASSERT(
+ is_storage_available(tuple - packed_tuple, pack_info->m_max_image_len));
+
+ (pack_info->m_pack_func)(pack_info, field, pack_buffer, &tuple, &pack_ctx);
+
+ /* Make "unpack info" to be stored in the value */
+ if (create_unpack_info) {
+ (pack_info->m_make_unpack_info_func)(pack_info->m_charset_codec, field,
+ &pack_ctx);
+ }
+
+ return tuple;
+}
+
+/**
+ Get index columns from the record and pack them into mem-comparable form.
+
+ @param
+ tbl Table we're working on
+ record IN Record buffer with fields in table->record format
+ pack_buffer IN Temporary area for packing varchars. The size is
+ at least max_storage_fmt_length() bytes.
+ packed_tuple OUT Key in the mem-comparable form
+ unpack_info OUT Unpack data
+ unpack_info_len OUT Unpack data length
+ n_key_parts Number of keyparts to process. 0 means all of them.
+ n_null_fields OUT Number of key fields with NULL value.
+ ttl_bytes IN Previous ttl bytes from old record for update case or
+ current ttl bytes from just packed primary key/value
+ @detail
+ Some callers do not need the unpack information, they can pass
+ unpack_info=nullptr, unpack_info_len=nullptr.
+
+ @return
+ Length of the packed tuple
+*/
+
+uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer,
+ const uchar *const record,
+ uchar *const packed_tuple,
+ Rdb_string_writer *const unpack_info,
+ const bool should_store_row_debug_checksums,
+ const longlong hidden_pk_id, uint n_key_parts,
+ uint *const n_null_fields,
+ const char *const ttl_bytes) const {
+ DBUG_ASSERT(tbl != nullptr);
+ DBUG_ASSERT(pack_buffer != nullptr);
+ DBUG_ASSERT(record != nullptr);
+ DBUG_ASSERT(packed_tuple != nullptr);
+ // Checksums for PKs are made when record is packed.
+ // We should never attempt to make checksum just from PK values
+ DBUG_ASSERT_IMP(should_store_row_debug_checksums,
+ (m_index_type == INDEX_TYPE_SECONDARY));
+
+ uchar *tuple = packed_tuple;
+ size_t unpack_start_pos = size_t(-1);
+ size_t unpack_len_pos = size_t(-1);
+ size_t covered_bitmap_pos = size_t(-1);
+ const bool hidden_pk_exists = table_has_hidden_pk(tbl);
+
+ rdb_netbuf_store_index(tuple, m_index_number);
+ tuple += INDEX_NUMBER_SIZE;
+
+ // If n_key_parts is 0, it means all columns.
+ // The following includes the 'extended key' tail.
+ // The 'extended key' includes primary key. This is done to 'uniqify'
+ // non-unique indexes
+ const bool use_all_columns = n_key_parts == 0 || n_key_parts == MAX_REF_PARTS;
+
+ // If hidden pk exists, but hidden pk wasnt passed in, we can't pack the
+ // hidden key part. So we skip it (its always 1 part).
+ if (hidden_pk_exists && !hidden_pk_id && use_all_columns) {
+ n_key_parts = m_key_parts - 1;
+ } else if (use_all_columns) {
+ n_key_parts = m_key_parts;
+ }
+
+ if (n_null_fields) *n_null_fields = 0;
+
+ // Check if we need a covered bitmap. If it is certain that all key parts are
+ // covering, we don't need one.
+ bool store_covered_bitmap = false;
+ if (unpack_info && use_covered_bitmap_format()) {
+ for (uint i = 0; i < n_key_parts; i++) {
+ if (!m_pack_info[i].m_covered) {
+ store_covered_bitmap = true;
+ break;
+ }
+ }
+ }
+
+ const char tag =
+ store_covered_bitmap ? RDB_UNPACK_COVERED_DATA_TAG : RDB_UNPACK_DATA_TAG;
+
+ if (unpack_info) {
+ unpack_info->clear();
+
+ if (m_index_type == INDEX_TYPE_SECONDARY &&
+ m_total_index_flags_length > 0) {
+ // Reserve space for index flag fields
+ unpack_info->allocate(m_total_index_flags_length);
+
+ // Insert TTL timestamp
+ if (has_ttl() && ttl_bytes) {
+ write_index_flag_field(unpack_info,
+ reinterpret_cast<const uchar *>(ttl_bytes),
+ Rdb_key_def::TTL_FLAG);
+ }
+ }
+
+ unpack_start_pos = unpack_info->get_current_pos();
+ unpack_info->write_uint8(tag);
+ unpack_len_pos = unpack_info->get_current_pos();
+ // we don't know the total length yet, so write a zero
+ unpack_info->write_uint16(0);
+
+ if (store_covered_bitmap) {
+ // Reserve two bytes for the covered bitmap. This will store, for key
+ // parts which are not always covering, whether or not it is covering
+ // for this record.
+ covered_bitmap_pos = unpack_info->get_current_pos();
+ unpack_info->write_uint16(0);
+ }
+ }
+
+ MY_BITMAP covered_bitmap;
+ my_bitmap_map covered_bits;
+ uint curr_bitmap_pos = 0;
+ my_bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS);
+
+ for (uint i = 0; i < n_key_parts; i++) {
+ // Fill hidden pk id into the last key part for secondary keys for tables
+ // with no pk
+ if (hidden_pk_exists && hidden_pk_id && i + 1 == n_key_parts) {
+ m_pack_info[i].fill_hidden_pk_val(&tuple, hidden_pk_id);
+ break;
+ }
+
+ Field *const field = m_pack_info[i].get_field_in_table(tbl);
+ DBUG_ASSERT(field != nullptr);
+
+ uint field_offset = field->ptr - tbl->record[0];
+ uint null_offset = field->null_offset(tbl->record[0]);
+ bool maybe_null = field->real_maybe_null();
+
+ field->move_field(
+ const_cast<uchar *>(record) + field_offset,
+ maybe_null ? const_cast<uchar *>(record) + null_offset : nullptr,
+ field->null_bit);
+ // WARNING! Don't return without restoring field->ptr and field->null_ptr
+
+ tuple = pack_field(field, &m_pack_info[i], tuple, packed_tuple, pack_buffer,
+ unpack_info, n_null_fields);
+
+ // If this key part is a prefix of a VARCHAR field, check if it's covered.
+ if (store_covered_bitmap && field->real_type() == MYSQL_TYPE_VARCHAR &&
+ !m_pack_info[i].m_covered && curr_bitmap_pos < MAX_REF_PARTS) {
+ size_t data_length = field->data_length();
+ uint16 key_length;
+ if (m_pk_part_no[i] == (uint)-1) {
+ key_length = tbl->key_info[get_keyno()].key_part[i].length;
+ } else {
+ key_length =
+ tbl->key_info[tbl->s->primary_key].key_part[m_pk_part_no[i]].length;
+ }
+
+ if (m_pack_info[i].m_unpack_func != nullptr &&
+ data_length <= key_length) {
+ bitmap_set_bit(&covered_bitmap, curr_bitmap_pos);
+ }
+ curr_bitmap_pos++;
+ }
+
+ // Restore field->ptr and field->null_ptr
+ field->move_field(tbl->record[0] + field_offset,
+ maybe_null ? tbl->record[0] + null_offset : nullptr,
+ field->null_bit);
+ }
+
+ if (unpack_info) {
+ const size_t len = unpack_info->get_current_pos() - unpack_start_pos;
+ DBUG_ASSERT(len <= std::numeric_limits<uint16_t>::max());
+
+ // Don't store the unpack_info if it has only the header (that is, there's
+ // no meaningful content).
+ // Primary Keys are special: for them, store the unpack_info even if it's
+ // empty (provided m_maybe_unpack_info==true, see
+ // ha_rocksdb::convert_record_to_storage_format)
+ if (m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY) {
+ if (len == get_unpack_header_size(tag) && !covered_bits) {
+ unpack_info->truncate(unpack_start_pos);
+ } else if (store_covered_bitmap) {
+ unpack_info->write_uint16_at(covered_bitmap_pos, covered_bits);
+ }
+ } else {
+ unpack_info->write_uint16_at(unpack_len_pos, len);
+ }
+
+ //
+ // Secondary keys have key and value checksums in the value part
+ // Primary key is a special case (the value part has non-indexed columns),
+ // so the checksums are computed and stored by
+ // ha_rocksdb::convert_record_to_storage_format
+ //
+ if (should_store_row_debug_checksums) {
+ const uint32_t key_crc32 =
+ my_checksum(0, packed_tuple, tuple - packed_tuple);
+ const uint32_t val_crc32 =
+ my_checksum(0, unpack_info->ptr(), unpack_info->get_current_pos());
+
+ unpack_info->write_uint8(RDB_CHECKSUM_DATA_TAG);
+ unpack_info->write_uint32(key_crc32);
+ unpack_info->write_uint32(val_crc32);
+ }
+ }
+
+ DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));
+
+ return tuple - packed_tuple;
+}
+
+/**
+ Pack the hidden primary key into mem-comparable form.
+
+ @param
+ tbl Table we're working on
+ hidden_pk_id IN New value to be packed into key
+ packed_tuple OUT Key in the mem-comparable form
+
+ @return
+ Length of the packed tuple
+*/
+
+uint Rdb_key_def::pack_hidden_pk(const longlong hidden_pk_id,
+ uchar *const packed_tuple) const {
+ DBUG_ASSERT(packed_tuple != nullptr);
+
+ uchar *tuple = packed_tuple;
+ rdb_netbuf_store_index(tuple, m_index_number);
+ tuple += INDEX_NUMBER_SIZE;
+ DBUG_ASSERT(m_key_parts == 1);
+ DBUG_ASSERT(is_storage_available(tuple - packed_tuple,
+ m_pack_info[0].m_max_image_len));
+
+ m_pack_info[0].fill_hidden_pk_val(&tuple, hidden_pk_id);
+
+ DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));
+ return tuple - packed_tuple;
+}
+
+/*
+ Function of type rdb_index_field_pack_t
+*/
+
+void Rdb_key_def::pack_with_make_sort_key(
+ Rdb_field_packing *const fpi, Field *const field,
+ uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
+ Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
+ DBUG_ASSERT(fpi != nullptr);
+ DBUG_ASSERT(field != nullptr);
+ DBUG_ASSERT(dst != nullptr);
+ DBUG_ASSERT(*dst != nullptr);
+
+ const int max_len = fpi->m_max_image_len;
+ MY_BITMAP*old_map;
+
+ old_map= dbug_tmp_use_all_columns(field->table,
+ &field->table->read_set);
+ field->sort_string(*dst, max_len);
+ dbug_tmp_restore_column_map(&field->table->read_set, old_map);
+ *dst += max_len;
+}
+
+/*
+ Compares two keys without unpacking
+
+ @detail
+ @return
+ 0 - Ok. column_index is the index of the first column which is different.
+ -1 if two kes are equal
+ 1 - Data format error.
+*/
+int Rdb_key_def::compare_keys(const rocksdb::Slice *key1,
+ const rocksdb::Slice *key2,
+ std::size_t *const column_index) const {
+ DBUG_ASSERT(key1 != nullptr);
+ DBUG_ASSERT(key2 != nullptr);
+ DBUG_ASSERT(column_index != nullptr);
+
+ // the caller should check the return value and
+ // not rely on column_index being valid
+ *column_index = 0xbadf00d;
+
+ Rdb_string_reader reader1(key1);
+ Rdb_string_reader reader2(key2);
+
+ // Skip the index number
+ if ((!reader1.read(INDEX_NUMBER_SIZE))) return HA_EXIT_FAILURE;
+
+ if ((!reader2.read(INDEX_NUMBER_SIZE))) return HA_EXIT_FAILURE;
+
+ for (uint i = 0; i < m_key_parts; i++) {
+ const Rdb_field_packing *const fpi = &m_pack_info[i];
+ if (fpi->m_maybe_null) {
+ const auto nullp1 = reader1.read(1);
+ const auto nullp2 = reader2.read(1);
+
+ if (nullp1 == nullptr || nullp2 == nullptr) {
+ return HA_EXIT_FAILURE;
+ }
+
+ if (*nullp1 != *nullp2) {
+ *column_index = i;
+ return HA_EXIT_SUCCESS;
+ }
+
+ if (*nullp1 == 0) {
+ /* This is a NULL value */
+ continue;
+ }
+ }
+
+ const auto before_skip1 = reader1.get_current_ptr();
+ const auto before_skip2 = reader2.get_current_ptr();
+ DBUG_ASSERT(fpi->m_skip_func);
+ if ((fpi->m_skip_func)(fpi, nullptr, &reader1)) {
+ return HA_EXIT_FAILURE;
+ }
+ if ((fpi->m_skip_func)(fpi, nullptr, &reader2)) {
+ return HA_EXIT_FAILURE;
+ }
+ const auto size1 = reader1.get_current_ptr() - before_skip1;
+ const auto size2 = reader2.get_current_ptr() - before_skip2;
+ if (size1 != size2) {
+ *column_index = i;
+ return HA_EXIT_SUCCESS;
+ }
+
+ if (memcmp(before_skip1, before_skip2, size1) != 0) {
+ *column_index = i;
+ return HA_EXIT_SUCCESS;
+ }
+ }
+
+ *column_index = m_key_parts;
+ return HA_EXIT_SUCCESS;
+}
+
+/*
+ @brief
+ Given a zero-padded key, determine its real key length
+
+ @detail
+ Fixed-size skip functions just read.
+*/
+
+size_t Rdb_key_def::key_length(const TABLE *const table,
+ const rocksdb::Slice &key) const {
+ DBUG_ASSERT(table != nullptr);
+
+ Rdb_string_reader reader(&key);
+
+ if ((!reader.read(INDEX_NUMBER_SIZE))) {
+ return size_t(-1);
+ }
+ for (uint i = 0; i < m_key_parts; i++) {
+ const Rdb_field_packing *fpi = &m_pack_info[i];
+ const Field *field = nullptr;
+ if (m_index_type != INDEX_TYPE_HIDDEN_PRIMARY) {
+ field = fpi->get_field_in_table(table);
+ }
+ if ((fpi->m_skip_func)(fpi, field, &reader)) {
+ return size_t(-1);
+ }
+ }
+ return key.size() - reader.remaining_bytes();
+}
+
+/*
+ Take mem-comparable form and unpack_info and unpack it to Table->record
+
+ @detail
+ not all indexes support this
+
+ @return
+ HA_EXIT_SUCCESS OK
+ other HA_ERR error code
+*/
+
+int Rdb_key_def::unpack_record(TABLE *const table, uchar *const buf,
+ const rocksdb::Slice *const packed_key,
+ const rocksdb::Slice *const unpack_info,
+ const bool verify_row_debug_checksums) const {
+ Rdb_string_reader reader(packed_key);
+ Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
+
+ // There is no checksuming data after unpack_info for primary keys, because
+ // the layout there is different. The checksum is verified in
+ // ha_rocksdb::convert_record_from_storage_format instead.
+ DBUG_ASSERT_IMP(!(m_index_type == INDEX_TYPE_SECONDARY),
+ !verify_row_debug_checksums);
+
+ // Skip the index number
+ if ((!reader.read(INDEX_NUMBER_SIZE))) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+
+ // For secondary keys, we expect the value field to contain index flags,
+ // unpack data, and checksum data in that order. One or all can be missing,
+ // but they cannot be reordered.
+ if (unp_reader.remaining_bytes()) {
+ if (m_index_type == INDEX_TYPE_SECONDARY &&
+ m_total_index_flags_length > 0 &&
+ !unp_reader.read(m_total_index_flags_length)) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+ }
+
+ const char *unpack_header = unp_reader.get_current_ptr();
+ bool has_unpack_info =
+ unp_reader.remaining_bytes() && is_unpack_data_tag(unpack_header[0]);
+ if (has_unpack_info) {
+ if (!unp_reader.read(get_unpack_header_size(unpack_header[0]))) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+ }
+
+ // Read the covered bitmap
+ MY_BITMAP covered_bitmap;
+ my_bitmap_map covered_bits;
+ bool has_covered_bitmap =
+ has_unpack_info && (unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG);
+ if (has_covered_bitmap) {
+ my_bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS);
+ covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
+ sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
+ RDB_UNPACK_COVERED_DATA_LEN_SIZE);
+ }
+
+ int err = HA_EXIT_SUCCESS;
+
+
+ Rdb_key_field_iterator iter(
+ this, m_pack_info, &reader, &unp_reader, table, has_unpack_info,
+ has_covered_bitmap ? &covered_bitmap : nullptr, buf);
+ while (iter.has_next()) {
+ err = iter.next();
+ if (err) {
+ return err;
+ }
+ }
+
+ /*
+ Check checksum values if present
+ */
+ const char *ptr;
+ if ((ptr = unp_reader.read(1)) && *ptr == RDB_CHECKSUM_DATA_TAG) {
+ if (verify_row_debug_checksums) {
+ uint32_t stored_key_chksum = rdb_netbuf_to_uint32(
+ (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
+ const uint32_t stored_val_chksum = rdb_netbuf_to_uint32(
+ (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
+
+ const uint32_t computed_key_chksum =
+ my_checksum(0, packed_key->data(), packed_key->size());
+ const uint32_t computed_val_chksum =
+ my_checksum(0, unpack_info->data(),
+ unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
+
+ DBUG_EXECUTE_IF("myrocks_simulate_bad_key_checksum1",
+ stored_key_chksum++;);
+
+ if (stored_key_chksum != computed_key_chksum) {
+ report_checksum_mismatch(true, packed_key->data(), packed_key->size());
+ return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
+ }
+
+ if (stored_val_chksum != computed_val_chksum) {
+ report_checksum_mismatch(false, unpack_info->data(),
+ unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
+ return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
+ }
+ } else {
+ /* The checksums are present but we are not checking checksums */
+ }
+ }
+
+ if (reader.remaining_bytes()) return HA_ERR_ROCKSDB_CORRUPT_DATA;
+
+ return HA_EXIT_SUCCESS;
+}
+
+bool Rdb_key_def::table_has_hidden_pk(const TABLE *const table) {
+ return table->s->primary_key == MAX_INDEXES;
+}
+
+void Rdb_key_def::report_checksum_mismatch(const bool is_key,
+ const char *const data,
+ const size_t data_size) const {
+ // NO_LINT_DEBUG
+ sql_print_error("Checksum mismatch in %s of key-value pair for index 0x%x",
+ is_key ? "key" : "value", get_index_number());
+
+ const std::string buf = rdb_hexdump(data, data_size, RDB_MAX_HEXDUMP_LEN);
+ // NO_LINT_DEBUG
+ sql_print_error("Data with incorrect checksum (%" PRIu64 " bytes): %s",
+ (uint64_t)data_size, buf.c_str());
+
+ my_error(ER_INTERNAL_ERROR, MYF(0), "Record checksum mismatch");
+}
+
+bool Rdb_key_def::index_format_min_check(const int pk_min,
+ const int sk_min) const {
+ switch (m_index_type) {
+ case INDEX_TYPE_PRIMARY:
+ case INDEX_TYPE_HIDDEN_PRIMARY:
+ return (m_kv_format_version >= pk_min);
+ case INDEX_TYPE_SECONDARY:
+ return (m_kv_format_version >= sk_min);
+ default:
+ DBUG_ASSERT(0);
+ return false;
+ }
+}
+
+///////////////////////////////////////////////////////////////////////////////////////////
+// Rdb_field_packing
+///////////////////////////////////////////////////////////////////////////////////////////
+
+/*
+ Function of type rdb_index_field_skip_t
+*/
+
+int Rdb_key_def::skip_max_length(const Rdb_field_packing *const fpi,
+ const Field *const field
+ MY_ATTRIBUTE((__unused__)),
+ Rdb_string_reader *const reader) {
+ if (!reader->read(fpi->m_max_image_len)) return HA_EXIT_FAILURE;
+ return HA_EXIT_SUCCESS;
+}
+
+/*
+ (RDB_ESCAPE_LENGTH-1) must be an even number so that pieces of lines are not
+ split in the middle of an UTF-8 character. See the implementation of
+ unpack_binary_or_utf8_varchar.
+*/
+#define RDB_ESCAPE_LENGTH 9
+#define RDB_LEGACY_ESCAPE_LENGTH RDB_ESCAPE_LENGTH
+static_assert((RDB_ESCAPE_LENGTH - 1) % 2 == 0,
+ "RDB_ESCAPE_LENGTH-1 must be even.");
+
+#define RDB_ENCODED_SIZE(len) \
+ ((len + (RDB_ESCAPE_LENGTH - 2)) / (RDB_ESCAPE_LENGTH - 1)) * \
+ RDB_ESCAPE_LENGTH
+
+#define RDB_LEGACY_ENCODED_SIZE(len) \
+ ((len + (RDB_LEGACY_ESCAPE_LENGTH - 1)) / (RDB_LEGACY_ESCAPE_LENGTH - 1)) * \
+ RDB_LEGACY_ESCAPE_LENGTH
+
+/*
+ Function of type rdb_index_field_skip_t
+*/
+
+int Rdb_key_def::skip_variable_length(const Rdb_field_packing *const fpi,
+ const Field *const field,
+ Rdb_string_reader *const reader) {
+ const uchar *ptr;
+ bool finished = false;
+
+ size_t dst_len; /* How much data can be there */
+ if (field) {
+ const Field_varstring *const field_var =
+ static_cast<const Field_varstring *>(field);
+ dst_len = field_var->pack_length() - field_var->length_bytes;
+ } else {
+ dst_len = UINT_MAX;
+ }
+
+ bool use_legacy_format = fpi->m_use_legacy_varbinary_format;
+
+ /* Decode the length-emitted encoding here */
+ while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
+ uint used_bytes;
+
+ /* See pack_with_varchar_encoding. */
+ if (use_legacy_format) {
+ used_bytes = calc_unpack_legacy_variable_format(
+ ptr[RDB_ESCAPE_LENGTH - 1], &finished);
+ } else {
+ used_bytes =
+ calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
+ }
+
+ if (used_bytes == (uint)-1 || dst_len < used_bytes) {
+ return HA_EXIT_FAILURE; // Corruption in the data
+ }
+
+ if (finished) {
+ break;
+ }
+
+ dst_len -= used_bytes;
+ }
+
+ if (!finished) {
+ return HA_EXIT_FAILURE;
+ }
+
+ return HA_EXIT_SUCCESS;
+}
+
+const int VARCHAR_CMP_LESS_THAN_SPACES = 1;
+const int VARCHAR_CMP_EQUAL_TO_SPACES = 2;
+const int VARCHAR_CMP_GREATER_THAN_SPACES = 3;
+
+/*
+ Skip a keypart that uses Variable-Length Space-Padded encoding
+*/
+
+int Rdb_key_def::skip_variable_space_pad(const Rdb_field_packing *const fpi,
+ const Field *const field,
+ Rdb_string_reader *const reader) {
+ const uchar *ptr;
+ bool finished = false;
+
+ size_t dst_len = UINT_MAX; /* How much data can be there */
+
+ if (field) {
+ const Field_varstring *const field_var =
+ static_cast<const Field_varstring *>(field);
+ dst_len = field_var->pack_length() - field_var->length_bytes;
+ }
+
+ /* Decode the length-emitted encoding here */
+ while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
+ // See pack_with_varchar_space_pad
+ const uchar c = ptr[fpi->m_segment_size - 1];
+ if (c == VARCHAR_CMP_EQUAL_TO_SPACES) {
+ // This is the last segment
+ finished = true;
+ break;
+ } else if (c == VARCHAR_CMP_LESS_THAN_SPACES ||
+ c == VARCHAR_CMP_GREATER_THAN_SPACES) {
+ // This is not the last segment
+ if ((fpi->m_segment_size - 1) > dst_len) {
+ // The segment is full of data but the table field can't hold that
+ // much! This must be data corruption.
+ return HA_EXIT_FAILURE;
+ }
+ dst_len -= (fpi->m_segment_size - 1);
+ } else {
+ // Encountered a value that's none of the VARCHAR_CMP* constants
+ // It's data corruption.
+ return HA_EXIT_FAILURE;
+ }
+ }
+ return finished ? HA_EXIT_SUCCESS : HA_EXIT_FAILURE;
+}
+
+/*
+ Function of type rdb_index_field_unpack_t
+*/
+
+int Rdb_key_def::unpack_integer(
+ Rdb_field_packing *const fpi, Field *const field, uchar *const to,
+ Rdb_string_reader *const reader,
+ Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
+ const int length = fpi->m_max_image_len;
+
+ const uchar *from;
+ if (!(from = (const uchar *)reader->read(length))) {
+ return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
+ }
+
+#ifdef WORDS_BIGENDIAN
+ {
+ if (static_cast<Field_num *>(field)->unsigned_flag) {
+ to[0] = from[0];
+ } else {
+ to[0] = static_cast<char>(from[0] ^ 128); // Reverse the sign bit.
+ }
+ memcpy(to + 1, from + 1, length - 1);
+ }
+#else
+ {
+ const int sign_byte = from[0];
+ if (static_cast<Field_num *>(field)->unsigned_flag) {
+ to[length - 1] = sign_byte;
+ } else {
+ to[length - 1] =
+ static_cast<char>(sign_byte ^ 128); // Reverse the sign bit.
+ }
+ for (int i = 0, j = length - 1; i < length - 1; ++i, --j) to[i] = from[j];
+ }
+#endif
+ return UNPACK_SUCCESS;
+}
+
+#if !defined(WORDS_BIGENDIAN)
+static void rdb_swap_double_bytes(uchar *const dst, const uchar *const src) {
+#if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
+ // A few systems store the most-significant _word_ first on little-endian
+ dst[0] = src[3];
+ dst[1] = src[2];
+ dst[2] = src[1];
+ dst[3] = src[0];
+ dst[4] = src[7];
+ dst[5] = src[6];
+ dst[6] = src[5];
+ dst[7] = src[4];
+#else
+ dst[0] = src[7];
+ dst[1] = src[6];
+ dst[2] = src[5];
+ dst[3] = src[4];
+ dst[4] = src[3];
+ dst[5] = src[2];
+ dst[6] = src[1];
+ dst[7] = src[0];
+#endif
+}
+
+static void rdb_swap_float_bytes(uchar *const dst, const uchar *const src) {
+ dst[0] = src[3];
+ dst[1] = src[2];
+ dst[2] = src[1];
+ dst[3] = src[0];
+}
+#else
+#define rdb_swap_double_bytes nullptr
+#define rdb_swap_float_bytes nullptr
+#endif
+
+int Rdb_key_def::unpack_floating_point(
+ uchar *const dst, Rdb_string_reader *const reader, const size_t size,
+ const int exp_digit, const uchar *const zero_pattern,
+ const uchar *const zero_val, void (*swap_func)(uchar *, const uchar *)) {
+ const uchar *const from = (const uchar *)reader->read(size);
+ if (from == nullptr) {
+ /* Mem-comparable image doesn't have enough bytes */
+ return UNPACK_FAILURE;
+ }
+
+ /* Check to see if the value is zero */
+ if (memcmp(from, zero_pattern, size) == 0) {
+ memcpy(dst, zero_val, size);
+ return UNPACK_SUCCESS;
+ }
+
+#if defined(WORDS_BIGENDIAN)
+ // On big-endian, output can go directly into result
+ uchar *const tmp = dst;
+#else
+ // Otherwise use a temporary buffer to make byte-swapping easier later
+ uchar tmp[8];
+#endif
+
+ memcpy(tmp, from, size);
+
+ if (tmp[0] & 0x80) {
+ // If the high bit is set the original value was positive so
+ // remove the high bit and subtract one from the exponent.
+ ushort exp_part = ((ushort)tmp[0] << 8) | (ushort)tmp[1];
+ exp_part &= 0x7FFF; // clear high bit;
+ exp_part -= (ushort)1 << (16 - 1 - exp_digit); // subtract from exponent
+ tmp[0] = (uchar)(exp_part >> 8);
+ tmp[1] = (uchar)exp_part;
+ } else {
+ // Otherwise the original value was negative and all bytes have been
+ // negated.
+ for (size_t ii = 0; ii < size; ii++) tmp[ii] ^= 0xFF;
+ }
+
+#if !defined(WORDS_BIGENDIAN)
+ // On little-endian, swap the bytes around
+ swap_func(dst, tmp);
+#else
+ DBUG_ASSERT(swap_func == nullptr);
+#endif
+
+ return UNPACK_SUCCESS;
+}
+
+#if !defined(DBL_EXP_DIG)
+#define DBL_EXP_DIG (sizeof(double) * 8 - DBL_MANT_DIG)
+#endif
+
+/*
+ Function of type rdb_index_field_unpack_t
+
+ Unpack a double by doing the reverse action of change_double_for_sort
+ (sql/filesort.cc). Note that this only works on IEEE values.
+ Note also that this code assumes that NaN and +/-Infinity are never
+ allowed in the database.
+*/
+int Rdb_key_def::unpack_double(
+ Rdb_field_packing *const fpi MY_ATTRIBUTE((__unused__)),
+ Field *const field MY_ATTRIBUTE((__unused__)), uchar *const field_ptr,
+ Rdb_string_reader *const reader,
+ Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
+ static double zero_val = 0.0;
+ static const uchar zero_pattern[8] = {128, 0, 0, 0, 0, 0, 0, 0};
+
+ return unpack_floating_point(field_ptr, reader, sizeof(double), DBL_EXP_DIG,
+ zero_pattern, (const uchar *)&zero_val,
+ rdb_swap_double_bytes);
+}
+
+#if !defined(FLT_EXP_DIG)
+#define FLT_EXP_DIG (sizeof(float) * 8 - FLT_MANT_DIG)
+#endif
+
+/*
+ Function of type rdb_index_field_unpack_t
+
+ Unpack a float by doing the reverse action of Field_float::make_sort_key
+ (sql/field.cc). Note that this only works on IEEE values.
+ Note also that this code assumes that NaN and +/-Infinity are never
+ allowed in the database.
+*/
+int Rdb_key_def::unpack_float(
+ Rdb_field_packing *const fpi, Field *const field MY_ATTRIBUTE((__unused__)),
+ uchar *const field_ptr, Rdb_string_reader *const reader,
+ Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
+ static float zero_val = 0.0;
+ static const uchar zero_pattern[4] = {128, 0, 0, 0};
+
+ return unpack_floating_point(field_ptr, reader, sizeof(float), FLT_EXP_DIG,
+ zero_pattern, (const uchar *)&zero_val,
+ rdb_swap_float_bytes);
+}
+
+/*
+ Function of type rdb_index_field_unpack_t used to
+ Unpack by doing the reverse action to Field_newdate::make_sort_key.
+*/
+
+int Rdb_key_def::unpack_newdate(
+ Rdb_field_packing *const fpi, Field *const field MY_ATTRIBUTE((__unused__)),
+ uchar *const field_ptr, Rdb_string_reader *const reader,
+ Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
+ const char *from;
+ DBUG_ASSERT(fpi->m_max_image_len == 3);
+
+ if (!(from = reader->read(3))) {
+ /* Mem-comparable image doesn't have enough bytes */
+ return UNPACK_FAILURE;
+ }
+
+ field_ptr[0] = from[2];
+ field_ptr[1] = from[1];
+ field_ptr[2] = from[0];
+ return UNPACK_SUCCESS;
+}
+
+/*
+ Function of type rdb_index_field_unpack_t, used to
+ Unpack the string by copying it over.
+ This is for BINARY(n) where the value occupies the whole length.
+*/
+
+int Rdb_key_def::unpack_binary_str(
+ Rdb_field_packing *const fpi, Field *const field, uchar *const to,
+ Rdb_string_reader *const reader,
+ Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
+ const char *from;
+ if (!(from = reader->read(fpi->m_max_image_len))) {
+ /* Mem-comparable image doesn't have enough bytes */
+ return UNPACK_FAILURE;
+ }
+
+ memcpy(to, from, fpi->m_max_image_len);
+ return UNPACK_SUCCESS;
+}
+
+/*
+ Function of type rdb_index_field_unpack_t.
+ For UTF-8, we need to convert 2-byte wide-character entities back into
+ UTF8 sequences.
+*/
+
+int Rdb_key_def::unpack_utf8_str(
+ Rdb_field_packing *const fpi, Field *const field, uchar *dst,
+ Rdb_string_reader *const reader,
+ Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
+ my_core::CHARSET_INFO *const cset = (my_core::CHARSET_INFO *)field->charset();
+ const uchar *src;
+ if (!(src = (const uchar *)reader->read(fpi->m_max_image_len))) {
+ /* Mem-comparable image doesn't have enough bytes */
+ return UNPACK_FAILURE;
+ }
+
+ const uchar *const src_end = src + fpi->m_max_image_len;
+ uchar *const dst_end = dst + field->pack_length();
+
+ while (src < src_end) {
+ my_wc_t wc = (src[0] << 8) | src[1];
+ src += 2;
+ int res = cset->wc_mb(wc, dst, dst_end);
+ DBUG_ASSERT(res > 0 && res <= 3);
+ if (res < 0) return UNPACK_FAILURE;
+ dst += res;
+ }
+
+ cset->fill(reinterpret_cast<char *>(dst), dst_end - dst,
+ cset->pad_char);
+ return UNPACK_SUCCESS;
+}
+
+/*
+ This is the original algorithm to encode a variable binary field. It
+ sets a flag byte every Nth byte. The flag value is (255 - #pad) where
+ #pad is the number of padding bytes that were needed (0 if all N-1
+ bytes were used).
+
+ If N=8 and the field is:
+ * 3 bytes (1, 2, 3) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 251
+ * 4 bytes (1, 2, 3, 0) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 252
+ And the 4 byte string compares as greater than the 3 byte string
+
+ Unfortunately the algorithm has a flaw. If the input is exactly a
+ multiple of N-1, an extra N bytes are written. Since we usually use
+ N=9, an 8 byte input will generate 18 bytes of output instead of the
+ 9 bytes of output that is optimal.
+
+ See pack_variable_format for the newer algorithm.
+*/
+void Rdb_key_def::pack_legacy_variable_format(
+ const uchar *src, // The data to encode
+ size_t src_len, // The length of the data to encode
+ uchar **dst) // The location to encode the data
+{
+ size_t copy_len;
+ size_t padding_bytes;
+ uchar *ptr = *dst;
+
+ do {
+ copy_len = std::min((size_t)RDB_LEGACY_ESCAPE_LENGTH - 1, src_len);
+ padding_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - copy_len;
+ memcpy(ptr, src, copy_len);
+ ptr += copy_len;
+ src += copy_len;
+ // pad with zeros if necessary
+ if (padding_bytes > 0) {
+ memset(ptr, 0, padding_bytes);
+ ptr += padding_bytes;
+ }
+
+ *(ptr++) = 255 - padding_bytes;
+
+ src_len -= copy_len;
+ } while (padding_bytes == 0);
+
+ *dst = ptr;
+}
+
+/*
+ This is the new algorithm. Similarly to the legacy format the input
+ is split up into N-1 bytes and a flag byte is used as the Nth byte
+ in the output.
+
+ - If the previous segment needed any padding the flag is set to the
+ number of bytes used (0..N-2). 0 is possible in the first segment
+ if the input is 0 bytes long.
+ - If no padding was used and there is no more data left in the input
+ the flag is set to N-1
+ - If no padding was used and there is still data left in the input the
+ flag is set to N.
+
+ For N=9, the following input values encode to the specified
+ outout (where 'X' indicates a byte of the original input):
+ - 0 bytes is encoded as 0 0 0 0 0 0 0 0 0
+ - 1 byte is encoded as X 0 0 0 0 0 0 0 1
+ - 2 bytes is encoded as X X 0 0 0 0 0 0 2
+ - 7 bytes is encoded as X X X X X X X 0 7
+ - 8 bytes is encoded as X X X X X X X X 8
+ - 9 bytes is encoded as X X X X X X X X 9 X 0 0 0 0 0 0 0 1
+ - 10 bytes is encoded as X X X X X X X X 9 X X 0 0 0 0 0 0 2
+*/
+void Rdb_key_def::pack_variable_format(
+ const uchar *src, // The data to encode
+ size_t src_len, // The length of the data to encode
+ uchar **dst) // The location to encode the data
+{
+ uchar *ptr = *dst;
+
+ for (;;) {
+ // Figure out how many bytes to copy, copy them and adjust pointers
+ const size_t copy_len = std::min((size_t)RDB_ESCAPE_LENGTH - 1, src_len);
+ memcpy(ptr, src, copy_len);
+ ptr += copy_len;
+ src += copy_len;
+ src_len -= copy_len;
+
+ // Are we at the end of the input?
+ if (src_len == 0) {
+ // pad with zeros if necessary;
+ const size_t padding_bytes = RDB_ESCAPE_LENGTH - 1 - copy_len;
+ if (padding_bytes > 0) {
+ memset(ptr, 0, padding_bytes);
+ ptr += padding_bytes;
+ }
+
+ // Put the flag byte (0 - N-1) in the output
+ *(ptr++) = (uchar)copy_len;
+ break;
+ }
+
+ // We have more data - put the flag byte (N) in and continue
+ *(ptr++) = RDB_ESCAPE_LENGTH;
+ }
+
+ *dst = ptr;
+}
+
+/*
+ Function of type rdb_index_field_pack_t
+*/
+
+void Rdb_key_def::pack_with_varchar_encoding(
+ Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
+ Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
+ const CHARSET_INFO *const charset = field->charset();
+ Field_varstring *const field_var = (Field_varstring *)field;
+
+ const size_t value_length = (field_var->length_bytes == 1)
+ ? (uint)*field->ptr
+ : uint2korr(field->ptr);
+ size_t xfrm_len = charset->strnxfrm(
+ buf, fpi->m_max_image_len, field_var->char_length(),
+ field_var->ptr + field_var->length_bytes, value_length, 0);
+
+ /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
+ if (fpi->m_use_legacy_varbinary_format) {
+ pack_legacy_variable_format(buf, xfrm_len, dst);
+ } else {
+ pack_variable_format(buf, xfrm_len, dst);
+ }
+}
+
+/*
+ Compare the string in [buf..buf_end) with a string that is an infinite
+ sequence of strings in space_xfrm
+*/
+
+static int rdb_compare_string_with_spaces(
+ const uchar *buf, const uchar *const buf_end,
+ const std::vector<uchar> *const space_xfrm) {
+ int cmp = 0;
+ while (buf < buf_end) {
+ size_t bytes = std::min((size_t)(buf_end - buf), space_xfrm->size());
+ if ((cmp = memcmp(buf, space_xfrm->data(), bytes)) != 0) break;
+ buf += bytes;
+ }
+ return cmp;
+}
+
+static const int RDB_TRIMMED_CHARS_OFFSET = 8;
+/*
+ Pack the data with Variable-Length Space-Padded Encoding.
+
+ The encoding is there to meet two goals:
+
+ Goal#1. Comparison. The SQL standard says
+
+ " If the collation for the comparison has the PAD SPACE characteristic,
+ for the purposes of the comparison, the shorter value is effectively
+ extended to the length of the longer by concatenation of <space>s on the
+ right.
+
+ At the moment, all MySQL collations except one have the PAD SPACE
+ characteristic. The exception is the "binary" collation that is used by
+ [VAR]BINARY columns. (Note that binary collations for specific charsets,
+ like utf8_bin or latin1_bin are not the same as "binary" collation, they have
+ the PAD SPACE characteristic).
+
+ Goal#2 is to preserve the number of trailing spaces in the original value.
+
+ This is achieved by using the following encoding:
+ The key part:
+ - Stores mem-comparable image of the column
+ - It is stored in chunks of fpi->m_segment_size bytes (*)
+ = If the remainder of the chunk is not occupied, it is padded with mem-
+ comparable image of the space character (cs->pad_char to be precise).
+ - The last byte of the chunk shows how the rest of column's mem-comparable
+ image would compare to mem-comparable image of the column extended with
+ spaces. There are three possible values.
+ - VARCHAR_CMP_LESS_THAN_SPACES,
+ - VARCHAR_CMP_EQUAL_TO_SPACES
+ - VARCHAR_CMP_GREATER_THAN_SPACES
+
+ VARCHAR_CMP_EQUAL_TO_SPACES means that this chunk is the last one (the rest
+ is spaces, or something that sorts as spaces, so there is no reason to store
+ it).
+
+ Example: if fpi->m_segment_size=5, and the collation is latin1_bin:
+
+ 'abcd\0' => [ 'abcd' <VARCHAR_CMP_LESS> ]['\0 ' <VARCHAR_CMP_EQUAL> ]
+ 'abcd' => [ 'abcd' <VARCHAR_CMP_EQUAL>]
+ 'abcd ' => [ 'abcd' <VARCHAR_CMP_EQUAL>]
+ 'abcdZZZZ' => [ 'abcd' <VARCHAR_CMP_GREATER>][ 'ZZZZ' <VARCHAR_CMP_EQUAL>]
+
+ As mentioned above, the last chunk is padded with mem-comparable images of
+ cs->pad_char. It can be 1-byte long (latin1), 2 (utf8_bin), 3 (utf8mb4), etc.
+
+ fpi->m_segment_size depends on the used collation. It is chosen to be such
+ that no mem-comparable image of space will ever stretch across the segments
+ (see get_segment_size_from_collation).
+
+ == The value part (aka unpack_info) ==
+ The value part stores the number of space characters that one needs to add
+ when unpacking the string.
+ - If the number is positive, it means add this many spaces at the end
+ - If the number is negative, it means padding has added extra spaces which
+ must be removed.
+
+ Storage considerations
+ - depending on column's max size, the number may occupy 1 or 2 bytes
+ - the number of spaces that need to be removed is not more than
+ RDB_TRIMMED_CHARS_OFFSET=8, so we offset the number by that value and
+ then store it as unsigned.
+
+ @seealso
+ unpack_binary_or_utf8_varchar_space_pad
+ unpack_simple_varchar_space_pad
+ dummy_make_unpack_info
+ skip_variable_space_pad
+*/
+
+void Rdb_key_def::pack_with_varchar_space_pad(
+ Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
+ Rdb_pack_field_context *const pack_ctx) {
+ Rdb_string_writer *const unpack_info = pack_ctx->writer;
+ const CHARSET_INFO *const charset = field->charset();
+ const auto field_var = static_cast<Field_varstring *>(field);
+
+ const size_t value_length = (field_var->length_bytes == 1)
+ ? (uint)*field->ptr
+ : uint2korr(field->ptr);
+
+ const size_t trimmed_len = charset->lengthsp(
+ (const char *)field_var->ptr + field_var->length_bytes,
+ value_length);
+ const size_t xfrm_len = charset->strnxfrm(
+ buf, fpi->m_max_image_len, field_var->char_length(),
+ field_var->ptr + field_var->length_bytes, trimmed_len, 0);
+
+ /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
+ uchar *const buf_end = buf + xfrm_len;
+
+ size_t encoded_size = 0;
+ uchar *ptr = *dst;
+ size_t padding_bytes;
+ while (true) {
+ const size_t copy_len =
+ std::min<size_t>(fpi->m_segment_size - 1, buf_end - buf);
+ padding_bytes = fpi->m_segment_size - 1 - copy_len;
+ memcpy(ptr, buf, copy_len);
+ ptr += copy_len;
+ buf += copy_len;
+
+ if (padding_bytes) {
+ memcpy(ptr, fpi->space_xfrm->data(), padding_bytes);
+ ptr += padding_bytes;
+ *ptr = VARCHAR_CMP_EQUAL_TO_SPACES; // last segment
+ } else {
+ // Compare the string suffix with a hypothetical infinite string of
+ // spaces. It could be that the first difference is beyond the end of
+ // current chunk.
+ const int cmp =
+ rdb_compare_string_with_spaces(buf, buf_end, fpi->space_xfrm);
+
+ if (cmp < 0) {
+ *ptr = VARCHAR_CMP_LESS_THAN_SPACES;
+ } else if (cmp > 0) {
+ *ptr = VARCHAR_CMP_GREATER_THAN_SPACES;
+ } else {
+ // It turns out all the rest are spaces.
+ *ptr = VARCHAR_CMP_EQUAL_TO_SPACES;
+ }
+ }
+ encoded_size += fpi->m_segment_size;
+
+ if (*(ptr++) == VARCHAR_CMP_EQUAL_TO_SPACES) break;
+ }
+
+ // m_unpack_info_stores_value means unpack_info stores the whole original
+ // value. There is no need to store the number of trimmed/padded endspaces
+ // in that case.
+ if (unpack_info && !fpi->m_unpack_info_stores_value) {
+ // (value_length - trimmed_len) is the number of trimmed space *characters*
+ // then, padding_bytes is the number of *bytes* added as padding
+ // then, we add 8, because we don't store negative values.
+ DBUG_ASSERT(padding_bytes % fpi->space_xfrm_len == 0);
+ DBUG_ASSERT((value_length - trimmed_len) % fpi->space_mb_len == 0);
+ const size_t removed_chars =
+ RDB_TRIMMED_CHARS_OFFSET +
+ (value_length - trimmed_len) / fpi->space_mb_len -
+ padding_bytes / fpi->space_xfrm_len;
+
+ if (fpi->m_unpack_info_uses_two_bytes) {
+ unpack_info->write_uint16(removed_chars);
+ } else {
+ DBUG_ASSERT(removed_chars < 0x100);
+ unpack_info->write_uint8(removed_chars);
+ }
+ }
+
+ *dst += encoded_size;
+}
+
+/*
+ Calculate the number of used bytes in the chunk and whether this is the
+ last chunk in the input. This is based on the old legacy format - see
+ pack_legacy_variable_format.
+ */
+uint Rdb_key_def::calc_unpack_legacy_variable_format(uchar flag, bool *done) {
+ uint pad = 255 - flag;
+ uint used_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - pad;
+ if (used_bytes > RDB_LEGACY_ESCAPE_LENGTH - 1) {
+ return (uint)-1;
+ }
+
+ *done = used_bytes < RDB_LEGACY_ESCAPE_LENGTH - 1;
+ return used_bytes;
+}
+
+/*
+ Calculate the number of used bytes in the chunk and whether this is the
+ last chunk in the input. This is based on the new format - see
+ pack_variable_format.
+ */
+uint Rdb_key_def::calc_unpack_variable_format(uchar flag, bool *done) {
+ // Check for invalid flag values
+ if (flag > RDB_ESCAPE_LENGTH) {
+ return (uint)-1;
+ }
+
+ // Values from 1 to N-1 indicate this is the last chunk and that is how
+ // many bytes were used
+ if (flag < RDB_ESCAPE_LENGTH) {
+ *done = true;
+ return flag;
+ }
+
+ // A value of N means we used N-1 bytes and had more to go
+ *done = false;
+ return RDB_ESCAPE_LENGTH - 1;
+}
+
+/*
+ Unpack data that has charset information. Each two bytes of the input is
+ treated as a wide-character and converted to its multibyte equivalent in
+ the output.
+ */
+static int unpack_charset(
+ const CHARSET_INFO *cset, // character set information
+ const uchar *src, // source data to unpack
+ uint src_len, // length of source data
+ uchar *dst, // destination of unpacked data
+ uint dst_len, // length of destination data
+ uint *used_bytes) // output number of bytes used
+{
+ if (src_len & 1) {
+ /*
+ UTF-8 characters are encoded into two-byte entities. There is no way
+ we can have an odd number of bytes after encoding.
+ */
+ return UNPACK_FAILURE;
+ }
+
+ uchar *dst_end = dst + dst_len;
+ uint used = 0;
+
+ for (uint ii = 0; ii < src_len; ii += 2) {
+ my_wc_t wc = (src[ii] << 8) | src[ii + 1];
+ int res = cset->wc_mb(wc, dst + used, dst_end);
+ DBUG_ASSERT(res > 0 && res <= 3);
+ if (res < 0) {
+ return UNPACK_FAILURE;
+ }
+
+ used += res;
+ }
+
+ *used_bytes = used;
+ return UNPACK_SUCCESS;
+}
+
+/*
+ Function of type rdb_index_field_unpack_t
+*/
+
+int Rdb_key_def::unpack_binary_or_utf8_varchar(
+ Rdb_field_packing *const fpi, Field *const field, uchar *dst,
+ Rdb_string_reader *const reader,
+ Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
+ const uchar *ptr;
+ size_t len = 0;
+ bool finished = false;
+ uchar *d0 = dst;
+ Field_varstring *const field_var = (Field_varstring *)field;
+ dst += field_var->length_bytes;
+ // How much we can unpack
+ size_t dst_len = field_var->pack_length() - field_var->length_bytes;
+
+ bool use_legacy_format = fpi->m_use_legacy_varbinary_format;
+
+ /* Decode the length-emitted encoding here */
+ while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
+ uint used_bytes;
+
+ /* See pack_with_varchar_encoding. */
+ if (use_legacy_format) {
+ used_bytes = calc_unpack_legacy_variable_format(
+ ptr[RDB_ESCAPE_LENGTH - 1], &finished);
+ } else {
+ used_bytes =
+ calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
+ }
+
+ if (used_bytes == (uint)-1 || dst_len < used_bytes) {
+ return UNPACK_FAILURE; // Corruption in the data
+ }
+
+ /*
+ Now, we need to decode used_bytes of data and append them to the value.
+ */
+ if (fpi->m_varchar_charset->number == COLLATION_UTF8_BIN) {
+ int err = unpack_charset(fpi->m_varchar_charset, ptr, used_bytes, dst,
+ dst_len, &used_bytes);
+ if (err != UNPACK_SUCCESS) {
+ return err;
+ }
+ } else {
+ memcpy(dst, ptr, used_bytes);
+ }
+
+ dst += used_bytes;
+ dst_len -= used_bytes;
+ len += used_bytes;
+
+ if (finished) {
+ break;
+ }
+ }
+
+ if (!finished) {
+ return UNPACK_FAILURE;
+ }
+
+ /* Save the length */
+ if (field_var->length_bytes == 1) {
+ d0[0] = (uchar)len;
+ } else {
+ DBUG_ASSERT(field_var->length_bytes == 2);
+ int2store(d0, len);
+ }
+ return UNPACK_SUCCESS;
+}
+
+/*
+ @seealso
+ pack_with_varchar_space_pad - packing function
+ unpack_simple_varchar_space_pad - unpacking function for 'simple'
+ charsets.
+ skip_variable_space_pad - skip function
+*/
+int Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad(
+ Rdb_field_packing *const fpi, Field *const field, uchar *dst,
+ Rdb_string_reader *const reader, Rdb_string_reader *const unp_reader) {
+ const uchar *ptr;
+ size_t len = 0;
+ bool finished = false;
+ Field_varstring *const field_var = static_cast<Field_varstring *>(field);
+ uchar *d0 = dst;
+ uchar *dst_end = dst + field_var->pack_length();
+ dst += field_var->length_bytes;
+
+ uint space_padding_bytes = 0;
+ uint extra_spaces;
+ if ((fpi->m_unpack_info_uses_two_bytes
+ ? unp_reader->read_uint16(&extra_spaces)
+ : unp_reader->read_uint8(&extra_spaces))) {
+ return UNPACK_FAILURE;
+ }
+
+ if (extra_spaces <= RDB_TRIMMED_CHARS_OFFSET) {
+ space_padding_bytes =
+ -(static_cast<int>(extra_spaces) - RDB_TRIMMED_CHARS_OFFSET);
+ extra_spaces = 0;
+ } else {
+ extra_spaces -= RDB_TRIMMED_CHARS_OFFSET;
+ }
+
+ space_padding_bytes *= fpi->space_xfrm_len;
+
+ /* Decode the length-emitted encoding here */
+ while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
+ const char last_byte = ptr[fpi->m_segment_size - 1];
+ size_t used_bytes;
+ if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) // this is the last segment
+ {
+ if (space_padding_bytes > (fpi->m_segment_size - 1)) {
+ return UNPACK_FAILURE; // Cannot happen, corrupted data
+ }
+ used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
+ finished = true;
+ } else {
+ if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
+ last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
+ return UNPACK_FAILURE; // Invalid value
+ }
+ used_bytes = fpi->m_segment_size - 1;
+ }
+
+ // Now, need to decode used_bytes of data and append them to the value.
+ if (fpi->m_varchar_charset->number == COLLATION_UTF8_BIN) {
+ if (used_bytes & 1) {
+ /*
+ UTF-8 characters are encoded into two-byte entities. There is no way
+ we can have an odd number of bytes after encoding.
+ */
+ return UNPACK_FAILURE;
+ }
+
+ const uchar *src = ptr;
+ const uchar *const src_end = ptr + used_bytes;
+ while (src < src_end) {
+ my_wc_t wc = (src[0] << 8) | src[1];
+ src += 2;
+ const CHARSET_INFO *cset = fpi->m_varchar_charset;
+ int res = cset->wc_mb(wc, dst, dst_end);
+ DBUG_ASSERT(res <= 3);
+ if (res <= 0) return UNPACK_FAILURE;
+ dst += res;
+ len += res;
+ }
+ } else {
+ if (dst + used_bytes > dst_end) return UNPACK_FAILURE;
+ memcpy(dst, ptr, used_bytes);
+ dst += used_bytes;
+ len += used_bytes;
+ }
+
+ if (finished) {
+ if (extra_spaces) {
+ // Both binary and UTF-8 charset store space as ' ',
+ // so the following is ok:
+ if (dst + extra_spaces > dst_end) return UNPACK_FAILURE;
+ memset(dst, fpi->m_varchar_charset->pad_char, extra_spaces);
+ len += extra_spaces;
+ }
+ break;
+ }
+ }
+
+ if (!finished) return UNPACK_FAILURE;
+
+ /* Save the length */
+ if (field_var->length_bytes == 1) {
+ d0[0] = (uchar)len;
+ } else {
+ DBUG_ASSERT(field_var->length_bytes == 2);
+ int2store(d0, len);
+ }
+ return UNPACK_SUCCESS;
+}
+
+/////////////////////////////////////////////////////////////////////////
+
+/*
+ Function of type rdb_make_unpack_info_t
+*/
+
+void Rdb_key_def::make_unpack_unknown(
+ const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
+ const Field *const field, Rdb_pack_field_context *const pack_ctx) {
+ pack_ctx->writer->write(field->ptr, field->pack_length());
+}
+
+/*
+ This point of this function is only to indicate that unpack_info is
+ available.
+
+ The actual unpack_info data is produced by the function that packs the key,
+ that is, pack_with_varchar_space_pad.
+*/
+
+void Rdb_key_def::dummy_make_unpack_info(
+ const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
+ const Field *field MY_ATTRIBUTE((__unused__)),
+ Rdb_pack_field_context *pack_ctx MY_ATTRIBUTE((__unused__))) {
+ // Do nothing
+}
+
+/*
+ Function of type rdb_index_field_unpack_t
+*/
+
+int Rdb_key_def::unpack_unknown(Rdb_field_packing *const fpi,
+ Field *const field, uchar *const dst,
+ Rdb_string_reader *const reader,
+ Rdb_string_reader *const unp_reader) {
+ const uchar *ptr;
+ const uint len = fpi->m_unpack_data_len;
+ // We don't use anything from the key, so skip over it.
+ if (skip_max_length(fpi, field, reader)) {
+ return UNPACK_FAILURE;
+ }
+
+ DBUG_ASSERT_IMP(len > 0, unp_reader != nullptr);
+
+ if ((ptr = (const uchar *)unp_reader->read(len))) {
+ memcpy(dst, ptr, len);
+ return UNPACK_SUCCESS;
+ }
+ return UNPACK_FAILURE;
+}
+
+/*
+ Function of type rdb_make_unpack_info_t
+*/
+
+void Rdb_key_def::make_unpack_unknown_varchar(
+ const Rdb_collation_codec *const codec MY_ATTRIBUTE((__unused__)),
+ const Field *const field, Rdb_pack_field_context *const pack_ctx) {
+ const auto f = static_cast<const Field_varstring *>(field);
+ uint len = f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
+ len += f->length_bytes;
+ pack_ctx->writer->write(field->ptr, len);
+}
+
+/*
+ Function of type rdb_index_field_unpack_t
+
+ @detail
+ Unpack a key part in an "unknown" collation from its
+ (mem_comparable_form, unpack_info) form.
+
+ "Unknown" means we have no clue about how mem_comparable_form is made from
+ the original string, so we keep the whole original string in the unpack_info.
+
+ @seealso
+ make_unpack_unknown, unpack_unknown
+*/
+
+int Rdb_key_def::unpack_unknown_varchar(Rdb_field_packing *const fpi,
+ Field *const field, uchar *dst,
+ Rdb_string_reader *const reader,
+ Rdb_string_reader *const unp_reader) {
+ const uchar *ptr;
+ uchar *const d0 = dst;
+ const auto f = static_cast<Field_varstring *>(field);
+ dst += f->length_bytes;
+ const uint len_bytes = f->length_bytes;
+ // We don't use anything from the key, so skip over it.
+ if ((fpi->m_skip_func)(fpi, field, reader)) {
+ return UNPACK_FAILURE;
+ }
+
+ DBUG_ASSERT(len_bytes > 0);
+ DBUG_ASSERT(unp_reader != nullptr);
+
+ if ((ptr = (const uchar *)unp_reader->read(len_bytes))) {
+ memcpy(d0, ptr, len_bytes);
+ const uint len = len_bytes == 1 ? (uint)*ptr : uint2korr(ptr);
+ if ((ptr = (const uchar *)unp_reader->read(len))) {
+ memcpy(dst, ptr, len);
+ return UNPACK_SUCCESS;
+ }
+ }
+ return UNPACK_FAILURE;
+}
+
+/*
+ Write unpack_data for a "simple" collation
+*/
+static void rdb_write_unpack_simple(Rdb_bit_writer *const writer,
+ const Rdb_collation_codec *const codec,
+ const uchar *const src,
+ const size_t src_len) {
+ for (uint i = 0; i < src_len; i++) {
+ writer->write(codec->m_enc_size[src[i]], codec->m_enc_idx[src[i]]);
+ }
+}
+
+static uint rdb_read_unpack_simple(Rdb_bit_reader *const reader,
+ const Rdb_collation_codec *const codec,
+ const uchar *const src, const size_t src_len,
+ uchar *const dst) {
+ for (uint i = 0; i < src_len; i++) {
+ if (codec->m_dec_size[src[i]] > 0) {
+ uint *ret;
+ DBUG_ASSERT(reader != nullptr);
+
+ if ((ret = reader->read(codec->m_dec_size[src[i]])) == nullptr) {
+ return UNPACK_FAILURE;
+ }
+ dst[i] = codec->m_dec_idx[*ret][src[i]];
+ } else {
+ dst[i] = codec->m_dec_idx[0][src[i]];
+ }
+ }
+
+ return UNPACK_SUCCESS;
+}
+
+/*
+ Function of type rdb_make_unpack_info_t
+
+ @detail
+ Make unpack_data for VARCHAR(n) in a "simple" charset.
+*/
+
+void Rdb_key_def::make_unpack_simple_varchar(
+ const Rdb_collation_codec *const codec, const Field *const field,
+ Rdb_pack_field_context *const pack_ctx) {
+ const auto f = static_cast<const Field_varstring *>(field);
+ uchar *const src = f->ptr + f->length_bytes;
+ const size_t src_len =
+ f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
+ Rdb_bit_writer bit_writer(pack_ctx->writer);
+ // The std::min compares characters with bytes, but for simple collations,
+ // mbmaxlen = 1.
+ rdb_write_unpack_simple(&bit_writer, codec, src,
+ std::min((size_t)f->char_length(), src_len));
+}
+
+/*
+ Function of type rdb_index_field_unpack_t
+
+ @seealso
+ pack_with_varchar_space_pad - packing function
+ unpack_binary_or_utf8_varchar_space_pad - a similar unpacking function
+*/
+
+int Rdb_key_def::unpack_simple_varchar_space_pad(
+ Rdb_field_packing *const fpi, Field *const field, uchar *dst,
+ Rdb_string_reader *const reader, Rdb_string_reader *const unp_reader) {
+ const uchar *ptr;
+ size_t len = 0;
+ bool finished = false;
+ uchar *d0 = dst;
+ const Field_varstring *const field_var =
+ static_cast<Field_varstring *>(field);
+ // For simple collations, char_length is also number of bytes.
+ DBUG_ASSERT((size_t)fpi->m_max_image_len >= field_var->char_length());
+ uchar *dst_end = dst + field_var->pack_length();
+ dst += field_var->length_bytes;
+ Rdb_bit_reader bit_reader(unp_reader);
+
+ uint space_padding_bytes = 0;
+ uint extra_spaces;
+ DBUG_ASSERT(unp_reader != nullptr);
+
+ if ((fpi->m_unpack_info_uses_two_bytes
+ ? unp_reader->read_uint16(&extra_spaces)
+ : unp_reader->read_uint8(&extra_spaces))) {
+ return UNPACK_FAILURE;
+ }
+
+ if (extra_spaces <= 8) {
+ space_padding_bytes = -(static_cast<int>(extra_spaces) - 8);
+ extra_spaces = 0;
+ } else {
+ extra_spaces -= 8;
+ }
+
+ space_padding_bytes *= fpi->space_xfrm_len;
+
+ /* Decode the length-emitted encoding here */
+ while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
+ const char last_byte =
+ ptr[fpi->m_segment_size - 1]; // number of padding bytes
+ size_t used_bytes;
+ if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) {
+ // this is the last one
+ if (space_padding_bytes > (fpi->m_segment_size - 1)) {
+ return UNPACK_FAILURE; // Cannot happen, corrupted data
+ }
+ used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
+ finished = true;
+ } else {
+ if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
+ last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
+ return UNPACK_FAILURE;
+ }
+ used_bytes = fpi->m_segment_size - 1;
+ }
+
+ if (dst + used_bytes > dst_end) {
+ // The value on disk is longer than the field definition allows?
+ return UNPACK_FAILURE;
+ }
+
+ uint ret;
+ if ((ret = rdb_read_unpack_simple(&bit_reader, fpi->m_charset_codec, ptr,
+ used_bytes, dst)) != UNPACK_SUCCESS) {
+ return ret;
+ }
+
+ dst += used_bytes;
+ len += used_bytes;
+
+ if (finished) {
+ if (extra_spaces) {
+ if (dst + extra_spaces > dst_end) return UNPACK_FAILURE;
+ // pad_char has a 1-byte form in all charsets that
+ // are handled by rdb_init_collation_mapping.
+ memset(dst, field_var->charset()->pad_char, extra_spaces);
+ len += extra_spaces;
+ }
+ break;
+ }
+ }
+
+ if (!finished) return UNPACK_FAILURE;
+
+ /* Save the length */
+ if (field_var->length_bytes == 1) {
+ d0[0] = (uchar)len;
+ } else {
+ DBUG_ASSERT(field_var->length_bytes == 2);
+ int2store(d0, len);
+ }
+ return UNPACK_SUCCESS;
+}
+
+/*
+ Function of type rdb_make_unpack_info_t
+
+ @detail
+ Make unpack_data for CHAR(n) value in a "simple" charset.
+ It is CHAR(N), so SQL layer has padded the value with spaces up to N chars.
+
+ @seealso
+ The VARCHAR variant is in make_unpack_simple_varchar
+*/
+
+void Rdb_key_def::make_unpack_simple(const Rdb_collation_codec *const codec,
+ const Field *const field,
+ Rdb_pack_field_context *const pack_ctx) {
+ const uchar *const src = field->ptr;
+ Rdb_bit_writer bit_writer(pack_ctx->writer);
+ rdb_write_unpack_simple(&bit_writer, codec, src, field->pack_length());
+}
+
+/*
+ Function of type rdb_index_field_unpack_t
+*/
+
+int Rdb_key_def::unpack_simple(Rdb_field_packing *const fpi,
+ Field *const field MY_ATTRIBUTE((__unused__)),
+ uchar *const dst,
+ Rdb_string_reader *const reader,
+ Rdb_string_reader *const unp_reader) {
+ const uchar *ptr;
+ const uint len = fpi->m_max_image_len;
+ Rdb_bit_reader bit_reader(unp_reader);
+
+ if (!(ptr = (const uchar *)reader->read(len))) {
+ return UNPACK_FAILURE;
+ }
+
+ return rdb_read_unpack_simple(unp_reader ? &bit_reader : nullptr,
+ fpi->m_charset_codec, ptr, len, dst);
+}
+
+// See Rdb_charset_space_info::spaces_xfrm
+const int RDB_SPACE_XFRM_SIZE = 32;
+
+// A class holding information about how space character is represented in a
+// charset.
+class Rdb_charset_space_info {
+ public:
+ Rdb_charset_space_info(const Rdb_charset_space_info &) = delete;
+ Rdb_charset_space_info &operator=(const Rdb_charset_space_info &) = delete;
+ Rdb_charset_space_info() = default;
+
+ // A few strxfrm'ed space characters, at least RDB_SPACE_XFRM_SIZE bytes
+ std::vector<uchar> spaces_xfrm;
+
+ // length(strxfrm(' '))
+ size_t space_xfrm_len;
+
+ // length of the space character itself
+ // Typically space is just 0x20 (length=1) but in ucs2 it is 0x00 0x20
+ // (length=2)
+ size_t space_mb_len;
+};
+
+static std::array<std::unique_ptr<Rdb_charset_space_info>, MY_ALL_CHARSETS_SIZE>
+ rdb_mem_comparable_space;
+
+/*
+ @brief
+ For a given charset, get
+ - strxfrm(' '), a sample that is at least RDB_SPACE_XFRM_SIZE bytes long.
+ - length of strxfrm(charset, ' ')
+ - length of the space character in the charset
+
+ @param cs IN Charset to get the space for
+ @param ptr OUT A few space characters
+ @param len OUT Return length of the space (in bytes)
+
+ @detail
+ It is tempting to pre-generate mem-comparable form of space character for
+ every charset on server startup.
+ One can't do that: some charsets are not initialized until somebody
+ attempts to use them (e.g. create or open a table that has a field that
+ uses the charset).
+*/
+
+static void rdb_get_mem_comparable_space(const CHARSET_INFO *const cs,
+ const std::vector<uchar> **xfrm,
+ size_t *const xfrm_len,
+ size_t *const mb_len) {
+ DBUG_ASSERT(cs->number < MY_ALL_CHARSETS_SIZE);
+ if (!rdb_mem_comparable_space[cs->number].get()) {
+ RDB_MUTEX_LOCK_CHECK(rdb_mem_cmp_space_mutex);
+ if (!rdb_mem_comparable_space[cs->number].get()) {
+ // Upper bound of how many bytes can be occupied by multi-byte form of a
+ // character in any charset.
+ const int MAX_MULTI_BYTE_CHAR_SIZE = 4;
+ DBUG_ASSERT(cs->mbmaxlen <= MAX_MULTI_BYTE_CHAR_SIZE);
+
+ // multi-byte form of the ' ' (space) character
+ uchar space_mb[MAX_MULTI_BYTE_CHAR_SIZE];
+
+ const size_t space_mb_len = cs->wc_mb(
+ (my_wc_t)cs->pad_char, space_mb, space_mb + sizeof(space_mb));
+
+ // mem-comparable image of the space character
+ std::array<uchar, 20> space;
+
+ const size_t space_len = cs->strnxfrm(
+ space.data(), sizeof(space), 1, space_mb, space_mb_len, 0);
+ Rdb_charset_space_info *const info = new Rdb_charset_space_info;
+ info->space_xfrm_len = space_len;
+ info->space_mb_len = space_mb_len;
+ while (info->spaces_xfrm.size() < RDB_SPACE_XFRM_SIZE) {
+ info->spaces_xfrm.insert(info->spaces_xfrm.end(), space.data(),
+ space.data() + space_len);
+ }
+ rdb_mem_comparable_space[cs->number].reset(info);
+ }
+ RDB_MUTEX_UNLOCK_CHECK(rdb_mem_cmp_space_mutex);
+ }
+
+ *xfrm = &rdb_mem_comparable_space[cs->number]->spaces_xfrm;
+ *xfrm_len = rdb_mem_comparable_space[cs->number]->space_xfrm_len;
+ *mb_len = rdb_mem_comparable_space[cs->number]->space_mb_len;
+}
+
+mysql_mutex_t rdb_mem_cmp_space_mutex;
+
+std::array<const Rdb_collation_codec *, MY_ALL_CHARSETS_SIZE>
+ rdb_collation_data;
+mysql_mutex_t rdb_collation_data_mutex;
+
+bool rdb_is_collation_supported(const my_core::CHARSET_INFO *const cs) {
+ return cs->strxfrm_multiply==1 && cs->mbmaxlen == 1 &&
+ !(cs->state & (MY_CS_BINSORT | MY_CS_NOPAD));
+}
+
+static const Rdb_collation_codec *rdb_init_collation_mapping(
+ const my_core::CHARSET_INFO *const cs) {
+ DBUG_ASSERT(cs && cs->state & MY_CS_AVAILABLE);
+ const Rdb_collation_codec *codec = rdb_collation_data[cs->number];
+
+ if (codec == nullptr && rdb_is_collation_supported(cs)) {
+ RDB_MUTEX_LOCK_CHECK(rdb_collation_data_mutex);
+
+ codec = rdb_collation_data[cs->number];
+ if (codec == nullptr) {
+ Rdb_collation_codec *cur = nullptr;
+
+ // Compute reverse mapping for simple collations.
+ if (rdb_is_collation_supported(cs)) {
+ cur = new Rdb_collation_codec;
+ std::map<uchar, std::vector<uchar>> rev_map;
+ size_t max_conflict_size = 0;
+ for (int src = 0; src < 256; src++) {
+ uchar dst = cs->sort_order[src];
+ rev_map[dst].push_back(src);
+ max_conflict_size = std::max(max_conflict_size, rev_map[dst].size());
+ }
+ cur->m_dec_idx.resize(max_conflict_size);
+
+ for (auto const &p : rev_map) {
+ uchar dst = p.first;
+ for (uint idx = 0; idx < p.second.size(); idx++) {
+ uchar src = p.second[idx];
+ uchar bits =
+ my_bit_log2_uint32(my_round_up_to_next_power(p.second.size()));
+ cur->m_enc_idx[src] = idx;
+ cur->m_enc_size[src] = bits;
+ cur->m_dec_size[dst] = bits;
+ cur->m_dec_idx[idx][dst] = src;
+ }
+ }
+
+ cur->m_make_unpack_info_func = {{Rdb_key_def::make_unpack_simple_varchar,
+ Rdb_key_def::make_unpack_simple}};
+ cur->m_unpack_func = {{Rdb_key_def::unpack_simple_varchar_space_pad,
+ Rdb_key_def::unpack_simple}};
+ } else {
+ // Out of luck for now.
+ }
+
+ if (cur != nullptr) {
+ codec = cur;
+ cur->m_cs = cs;
+ rdb_collation_data[cs->number] = cur;
+ }
+ }
+
+ RDB_MUTEX_UNLOCK_CHECK(rdb_collation_data_mutex);
+ }
+
+ return codec;
+}
+
+static int get_segment_size_from_collation(const CHARSET_INFO *const cs) {
+ int ret;
+ if (cs->number == COLLATION_UTF8MB4_BIN || cs->number == COLLATION_UTF16_BIN ||
+ cs->number == COLLATION_UTF16LE_BIN || cs->number == COLLATION_UTF32_BIN) {
+ /*
+ In these collations, a character produces one weight, which is 3 bytes.
+ Segment has 3 characters, add one byte for VARCHAR_CMP_* marker, and we
+ get 3*3+1=10
+ */
+ ret = 10;
+ } else {
+ /*
+ All other collations. There are two classes:
+ - Unicode-based, except for collations mentioned in the if-condition.
+ For these all weights are 2 bytes long, a character may produce 0..8
+ weights.
+ in any case, 8 bytes of payload in the segment guarantee that the last
+ space character won't span across segments.
+
+ - Collations not based on unicode. These have length(strxfrm(' '))=1,
+ there nothing to worry about.
+
+ In both cases, take 8 bytes payload + 1 byte for VARCHAR_CMP* marker.
+ */
+ ret = 9;
+ }
+ DBUG_ASSERT(ret < RDB_SPACE_XFRM_SIZE);
+ return ret;
+}
+
+/*
+ @brief
+ Setup packing of index field into its mem-comparable form
+
+ @detail
+ - It is possible produce mem-comparable form for any datatype.
+ - Some datatypes also allow to unpack the original value from its
+ mem-comparable form.
+ = Some of these require extra information to be stored in "unpack_info".
+ unpack_info is not a part of mem-comparable form, it is only used to
+ restore the original value
+
+ @param
+ field IN field to be packed/un-packed
+
+ @return
+ TRUE - Field can be read with index-only reads
+ FALSE - Otherwise
+*/
+
+bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr,
+ const Field *const field, const uint keynr_arg,
+ const uint key_part_arg,
+ const uint16 key_length) {
+ int res = false;
+ enum_field_types type = field ? field->real_type() : MYSQL_TYPE_LONGLONG;
+
+ m_keynr = keynr_arg;
+ m_key_part = key_part_arg;
+
+ m_maybe_null = field ? field->real_maybe_null() : false;
+ m_unpack_func = nullptr;
+ m_make_unpack_info_func = nullptr;
+ m_unpack_data_len = 0;
+ space_xfrm = nullptr; // safety
+ // whether to use legacy format for varchar
+ m_use_legacy_varbinary_format = false;
+ // ha_rocksdb::index_flags() will pass key_descr == null to
+ // see whether field(column) can be read-only reads through return value,
+ // but the legacy vs. new varchar format doesn't affect return value.
+ // Just change m_use_legacy_varbinary_format to true if key_descr isn't given.
+ if (!key_descr || key_descr->use_legacy_varbinary_format()) {
+ m_use_legacy_varbinary_format = true;
+ }
+ /* Calculate image length. By default, is is pack_length() */
+ m_max_image_len =
+ field ? field->pack_length() : ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN;
+ m_skip_func = Rdb_key_def::skip_max_length;
+ m_pack_func = Rdb_key_def::pack_with_make_sort_key;
+
+ m_covered = false;
+
+ switch (type) {
+ case MYSQL_TYPE_LONGLONG:
+ case MYSQL_TYPE_LONG:
+ case MYSQL_TYPE_INT24:
+ case MYSQL_TYPE_SHORT:
+ case MYSQL_TYPE_TINY:
+ m_unpack_func = Rdb_key_def::unpack_integer;
+ m_covered = true;
+ return true;
+
+ case MYSQL_TYPE_DOUBLE:
+ m_unpack_func = Rdb_key_def::unpack_double;
+ m_covered = true;
+ return true;
+
+ case MYSQL_TYPE_FLOAT:
+ m_unpack_func = Rdb_key_def::unpack_float;
+ m_covered = true;
+ return true;
+
+ case MYSQL_TYPE_NEWDECIMAL:
+ /*
+ Decimal is packed with Field_new_decimal::make_sort_key, which just
+ does memcpy.
+ Unpacking decimal values was supported only after fix for issue#253,
+ because of that ha_rocksdb::get_storage_type() handles decimal values
+ in a special way.
+ */
+ case MYSQL_TYPE_DATETIME2:
+ case MYSQL_TYPE_TIMESTAMP2:
+ /* These are packed with Field_temporal_with_date_and_timef::make_sort_key
+ */
+ case MYSQL_TYPE_TIME2: /* TIME is packed with Field_timef::make_sort_key */
+ case MYSQL_TYPE_YEAR: /* YEAR is packed with Field_tiny::make_sort_key */
+ /* Everything that comes here is packed with just a memcpy(). */
+ m_unpack_func = Rdb_key_def::unpack_binary_str;
+ m_covered = true;
+ return true;
+
+ case MYSQL_TYPE_NEWDATE:
+ /*
+ This is packed by Field_newdate::make_sort_key. It assumes the data is
+ 3 bytes, and packing is done by swapping the byte order (for both big-
+ and little-endian)
+ */
+ m_unpack_func = Rdb_key_def::unpack_newdate;
+ m_covered = true;
+ return true;
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_BLOB: {
+ if (key_descr) {
+ // The my_charset_bin collation is special in that it will consider
+ // shorter strings sorting as less than longer strings.
+ //
+ // See Field_blob::make_sort_key for details.
+ m_max_image_len =
+ key_length + (field->charset()->number == COLLATION_BINARY
+ ? reinterpret_cast<const Field_blob *>(field)
+ ->pack_length_no_ptr()
+ : 0);
+ // Return false because indexes on text/blob will always require
+ // a prefix. With a prefix, the optimizer will not be able to do an
+ // index-only scan since there may be content occuring after the prefix
+ // length.
+ return false;
+ }
+ break;
+ }
+ default:
+ break;
+ }
+
+ m_unpack_info_stores_value = false;
+ /* Handle [VAR](CHAR|BINARY) */
+
+ if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
+ /*
+ For CHAR-based columns, check how strxfrm image will take.
+ field->field_length = field->char_length() * cs->mbmaxlen.
+ */
+ const CHARSET_INFO *cs = field->charset();
+ m_max_image_len = cs->strnxfrmlen(type == MYSQL_TYPE_STRING ?
+ field->pack_length() :
+ field->field_length);
+ }
+ const bool is_varchar = (type == MYSQL_TYPE_VARCHAR);
+ const CHARSET_INFO *cs = field->charset();
+ // max_image_len before chunking is taken into account
+ const int max_image_len_before_chunks = m_max_image_len;
+
+ if (is_varchar) {
+ // The default for varchar is variable-length, without space-padding for
+ // comparisons
+ m_varchar_charset = cs;
+ m_skip_func = Rdb_key_def::skip_variable_length;
+ m_pack_func = Rdb_key_def::pack_with_varchar_encoding;
+ if (!key_descr || key_descr->use_legacy_varbinary_format()) {
+ m_max_image_len = RDB_LEGACY_ENCODED_SIZE(m_max_image_len);
+ } else {
+ // Calculate the maximum size of the short section plus the
+ // maximum size of the long section
+ m_max_image_len = RDB_ENCODED_SIZE(m_max_image_len);
+ }
+
+ const auto field_var = static_cast<const Field_varstring *>(field);
+ m_unpack_info_uses_two_bytes = (field_var->field_length + 8 >= 0x100);
+ }
+
+ if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
+ // See http://dev.mysql.com/doc/refman/5.7/en/string-types.html for
+ // information about character-based datatypes are compared.
+ bool use_unknown_collation = false;
+ DBUG_EXECUTE_IF("myrocks_enable_unknown_collation_index_only_scans",
+ use_unknown_collation = true;);
+
+ if (cs->number == COLLATION_BINARY) {
+ // - SQL layer pads BINARY(N) so that it always is N bytes long.
+ // - For VARBINARY(N), values may have different lengths, so we're using
+ // variable-length encoding. This is also the only charset where the
+ // values are not space-padded for comparison.
+ m_unpack_func = is_varchar ? Rdb_key_def::unpack_binary_or_utf8_varchar
+ : Rdb_key_def::unpack_binary_str;
+ res = true;
+ } else if (cs->number == COLLATION_LATIN1_BIN || cs->number == COLLATION_UTF8_BIN) {
+ // For _bin collations, mem-comparable form of the string is the string
+ // itself.
+
+ if (is_varchar) {
+ // VARCHARs - are compared as if they were space-padded - but are
+ // not actually space-padded (reading the value back produces the
+ // original value, without the padding)
+ m_unpack_func = Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad;
+ m_skip_func = Rdb_key_def::skip_variable_space_pad;
+ m_pack_func = Rdb_key_def::pack_with_varchar_space_pad;
+ m_make_unpack_info_func = Rdb_key_def::dummy_make_unpack_info;
+ m_segment_size = get_segment_size_from_collation(cs);
+ m_max_image_len =
+ (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
+ m_segment_size;
+ rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
+ &space_mb_len);
+ } else {
+ // SQL layer pads CHAR(N) values to their maximum length.
+ // We just store that and restore it back.
+ m_unpack_func = (cs->number == COLLATION_LATIN1_BIN)
+ ? Rdb_key_def::unpack_binary_str
+ : Rdb_key_def::unpack_utf8_str;
+ }
+ res = true;
+ } else {
+ // This is [VAR]CHAR(n) and the collation is not $(charset_name)_bin
+
+ res = true; // index-only scans are possible
+ m_unpack_data_len = is_varchar ? 0 : field->field_length;
+ const uint idx = is_varchar ? 0 : 1;
+ const Rdb_collation_codec *codec = nullptr;
+
+ if (is_varchar) {
+ // VARCHAR requires space-padding for doing comparisons
+ //
+ // The check for cs->levels_for_order is to catch
+ // latin2_czech_cs and cp1250_czech_cs - multi-level collations
+ // that Variable-Length Space Padded Encoding can't handle.
+ // It is not expected to work for any other multi-level collations,
+ // either.
+ // Currently we handle these collations as NO_PAD, even if they have
+ // PAD_SPACE attribute.
+ if (cs->levels_for_order == 1) {
+ m_pack_func = Rdb_key_def::pack_with_varchar_space_pad;
+ m_skip_func = Rdb_key_def::skip_variable_space_pad;
+ m_segment_size = get_segment_size_from_collation(cs);
+ m_max_image_len =
+ (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
+ m_segment_size;
+ rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
+ &space_mb_len);
+ } else {
+ // NO_LINT_DEBUG
+ sql_print_warning(
+ "RocksDB: you're trying to create an index "
+ "with a multi-level collation %s",
+ cs->cs_name.str);
+ // NO_LINT_DEBUG
+ sql_print_warning(
+ "MyRocks will handle this collation internally "
+ " as if it had a NO_PAD attribute.");
+ m_pack_func = Rdb_key_def::pack_with_varchar_encoding;
+ m_skip_func = Rdb_key_def::skip_variable_length;
+ }
+ }
+
+ if ((codec = rdb_init_collation_mapping(cs)) != nullptr) {
+ // The collation allows to store extra information in the unpack_info
+ // which can be used to restore the original value from the
+ // mem-comparable form.
+ m_make_unpack_info_func = codec->m_make_unpack_info_func[idx];
+ m_unpack_func = codec->m_unpack_func[idx];
+ m_charset_codec = codec;
+ } else if (use_unknown_collation) {
+ // We have no clue about how this collation produces mem-comparable
+ // form. Our way of restoring the original value is to keep a copy of
+ // the original value in unpack_info.
+ m_unpack_info_stores_value = true;
+ m_make_unpack_info_func = is_varchar
+ ? Rdb_key_def::make_unpack_unknown_varchar
+ : Rdb_key_def::make_unpack_unknown;
+ m_unpack_func = is_varchar ? Rdb_key_def::unpack_unknown_varchar
+ : Rdb_key_def::unpack_unknown;
+ } else {
+ // Same as above: we don't know how to restore the value from its
+ // mem-comparable form.
+ // Here, we just indicate to the SQL layer we can't do it.
+ DBUG_ASSERT(m_unpack_func == nullptr);
+ m_unpack_info_stores_value = false;
+ res = false; // Indicate that index-only reads are not possible
+ }
+ }
+
+ // Make an adjustment: if this column is partially covered, tell the SQL
+ // layer we can't do index-only scans. Later when we perform an index read,
+ // we'll check on a record-by-record basis if we can do an index-only scan
+ // or not.
+ uint field_length;
+ if (field->table) {
+ field_length = field->table->field[field->field_index]->field_length;
+ } else {
+ field_length = field->field_length;
+ }
+
+ if (field_length != key_length) {
+ res = false;
+ // If this index doesn't support covered bitmaps, then we won't know
+ // during a read if the column is actually covered or not. If so, we need
+ // to assume the column isn't covered and skip it during unpacking.
+ //
+ // If key_descr == NULL, then this is a dummy field and we probably don't
+ // need to perform this step. However, to preserve the behavior before
+ // this change, we'll only skip this step if we have an index which
+ // supports covered bitmaps.
+ if (!key_descr || !key_descr->use_covered_bitmap_format()) {
+ m_unpack_func = nullptr;
+ m_make_unpack_info_func = nullptr;
+ m_unpack_info_stores_value = true;
+ }
+ }
+ }
+
+ m_covered = res;
+ return res;
+}
+
+Field *Rdb_field_packing::get_field_in_table(const TABLE *const tbl) const {
+ return tbl->key_info[m_keynr].key_part[m_key_part].field;
+}
+
+void Rdb_field_packing::fill_hidden_pk_val(uchar **dst,
+ const longlong hidden_pk_id) const {
+ DBUG_ASSERT(m_max_image_len == 8);
+
+ String to;
+ rdb_netstr_append_uint64(&to, hidden_pk_id);
+ memcpy(*dst, to.ptr(), m_max_image_len);
+
+ *dst += m_max_image_len;
+}
+
+///////////////////////////////////////////////////////////////////////////////////////////
+// Rdb_ddl_manager
+///////////////////////////////////////////////////////////////////////////////////////////
+
+Rdb_tbl_def::~Rdb_tbl_def() {
+ auto ddl_manager = rdb_get_ddl_manager();
+ /* Don't free key definitions */
+ if (m_key_descr_arr) {
+ for (uint i = 0; i < m_key_count; i++) {
+ if (ddl_manager && m_key_descr_arr[i]) {
+ ddl_manager->erase_index_num(m_key_descr_arr[i]->get_gl_index_id());
+ }
+
+ m_key_descr_arr[i] = nullptr;
+ }
+
+ delete[] m_key_descr_arr;
+ m_key_descr_arr = nullptr;
+ }
+}
+
+/*
+ Put table definition DDL entry. Actual write is done at
+ Rdb_dict_manager::commit.
+
+ We write
+ dbname.tablename -> version + {key_entry, key_entry, key_entry, ... }
+
+ Where key entries are a tuple of
+ ( cf_id, index_nr )
+*/
+
+bool Rdb_tbl_def::put_dict(Rdb_dict_manager *const dict,
+ rocksdb::WriteBatch *const batch,
+ const rocksdb::Slice &key) {
+ StringBuffer<8 * Rdb_key_def::PACKED_SIZE> indexes;
+ indexes.alloc(Rdb_key_def::VERSION_SIZE +
+ m_key_count * Rdb_key_def::PACKED_SIZE * 2);
+ rdb_netstr_append_uint16(&indexes, Rdb_key_def::DDL_ENTRY_INDEX_VERSION);
+
+ for (uint i = 0; i < m_key_count; i++) {
+ const Rdb_key_def &kd = *m_key_descr_arr[i];
+
+ uchar flags =
+ (kd.m_is_reverse_cf ? Rdb_key_def::REVERSE_CF_FLAG : 0) |
+ (kd.m_is_per_partition_cf ? Rdb_key_def::PER_PARTITION_CF_FLAG : 0);
+
+ const uint cf_id = kd.get_cf()->GetID();
+ /*
+ If cf_id already exists, cf_flags must be the same.
+ To prevent race condition, reading/modifying/committing CF flags
+ need to be protected by mutex (dict_manager->lock()).
+ When RocksDB supports transaction with pessimistic concurrency
+ control, we can switch to use it and removing mutex.
+ */
+ uint existing_cf_flags;
+ const std::string cf_name = kd.get_cf()->GetName();
+
+ if (dict->get_cf_flags(cf_id, &existing_cf_flags)) {
+ // For the purposes of comparison we'll clear the partitioning bit. The
+ // intent here is to make sure that both partitioned and non-partitioned
+ // tables can refer to the same CF.
+ existing_cf_flags &= ~Rdb_key_def::CF_FLAGS_TO_IGNORE;
+ flags &= ~Rdb_key_def::CF_FLAGS_TO_IGNORE;
+
+ if (existing_cf_flags != flags) {
+ my_error(ER_CF_DIFFERENT, MYF(0), cf_name.c_str(), flags,
+ existing_cf_flags);
+ return true;
+ }
+ } else {
+ dict->add_cf_flags(batch, cf_id, flags);
+ }
+
+ rdb_netstr_append_uint32(&indexes, cf_id);
+
+ uint32 index_number = kd.get_index_number();
+ rdb_netstr_append_uint32(&indexes, index_number);
+
+ struct Rdb_index_info index_info;
+ index_info.m_gl_index_id = {cf_id, index_number};
+ index_info.m_index_dict_version = Rdb_key_def::INDEX_INFO_VERSION_LATEST;
+ index_info.m_index_type = kd.m_index_type;
+ index_info.m_kv_version = kd.m_kv_format_version;
+ index_info.m_index_flags = kd.m_index_flags_bitmap;
+ index_info.m_ttl_duration = kd.m_ttl_duration;
+
+ dict->add_or_update_index_cf_mapping(batch, &index_info);
+ }
+
+ const rocksdb::Slice svalue(indexes.c_ptr(), indexes.length());
+
+ dict->put_key(batch, key, svalue);
+ return false;
+}
+
+time_t Rdb_tbl_def::get_create_time() {
+ time_t create_time = m_create_time;
+
+ if (create_time == CREATE_TIME_UNKNOWN) {
+ // Read it from the .frm file. It's not a problem if several threads do this
+ // concurrently
+ char path[FN_REFLEN];
+ snprintf(path, sizeof(path), "%s/%s/%s%s", mysql_data_home,
+ m_dbname.c_str(), m_tablename.c_str(), reg_ext);
+ unpack_filename(path,path);
+ MY_STAT f_stat;
+ if (my_stat(path, &f_stat, MYF(0)))
+ create_time = f_stat.st_ctime;
+ else
+ create_time = 0; // will be shown as SQL NULL
+ m_create_time = create_time;
+ }
+ return create_time;
+}
+
+// Length that each index flag takes inside the record.
+// Each index in the array maps to the enum INDEX_FLAG
+static const std::array<uint, 1> index_flag_lengths = {
+ {ROCKSDB_SIZEOF_TTL_RECORD}};
+
+bool Rdb_key_def::has_index_flag(uint32 index_flags, enum INDEX_FLAG flag) {
+ return flag & index_flags;
+}
+
+uint32 Rdb_key_def::calculate_index_flag_offset(uint32 index_flags,
+ enum INDEX_FLAG flag,
+ uint *const length) {
+ DBUG_ASSERT_IMP(flag != MAX_FLAG,
+ Rdb_key_def::has_index_flag(index_flags, flag));
+
+ uint offset = 0;
+ for (size_t bit = 0; bit < sizeof(index_flags) * CHAR_BIT; ++bit) {
+ int mask = 1 << bit;
+
+ /* Exit once we've reached the proper flag */
+ if (flag & mask) {
+ if (length != nullptr) {
+ *length = index_flag_lengths[bit];
+ }
+ break;
+ }
+
+ if (index_flags & mask) {
+ offset += index_flag_lengths[bit];
+ }
+ }
+
+ return offset;
+}
+
+void Rdb_key_def::write_index_flag_field(Rdb_string_writer *const buf,
+ const uchar *const val,
+ enum INDEX_FLAG flag) const {
+ uint len;
+ uint offset = calculate_index_flag_offset(m_index_flags_bitmap, flag, &len);
+ DBUG_ASSERT(offset + len <= buf->get_current_pos());
+ memcpy(buf->ptr() + offset, val, len);
+}
+
+void Rdb_tbl_def::check_if_is_mysql_system_table() {
+ static const char *const system_dbs[] = {
+ "mysql",
+ "performance_schema",
+ "information_schema",
+ };
+
+ m_is_mysql_system_table = false;
+ for (uint ii = 0; ii < array_elements(system_dbs); ii++) {
+ if (strcmp(m_dbname.c_str(), system_dbs[ii]) == 0) {
+ m_is_mysql_system_table = true;
+ break;
+ }
+ }
+}
+
+void Rdb_tbl_def::check_and_set_read_free_rpl_table() {
+ m_is_read_free_rpl_table =
+#if 0 // MARIAROCKS_NOT_YET : read-free replication is not supported
+ rdb_read_free_regex_handler.matches(base_tablename());
+#else
+ false;
+#endif
+}
+
+void Rdb_tbl_def::set_name(const std::string &name) {
+ int err MY_ATTRIBUTE((__unused__));
+
+ m_dbname_tablename = name;
+ err = rdb_split_normalized_tablename(name, &m_dbname, &m_tablename,
+ &m_partition);
+ DBUG_ASSERT(err == 0);
+
+ check_if_is_mysql_system_table();
+}
+
+GL_INDEX_ID Rdb_tbl_def::get_autoincr_gl_index_id() {
+ for (uint i = 0; i < m_key_count; i++) {
+ auto &k = m_key_descr_arr[i];
+ if (k->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
+ k->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY) {
+ return k->get_gl_index_id();
+ }
+ }
+
+ // Every table must have a primary key, even if it's hidden.
+ abort();
+ return GL_INDEX_ID();
+}
+
+void Rdb_ddl_manager::erase_index_num(const GL_INDEX_ID &gl_index_id) {
+ m_index_num_to_keydef.erase(gl_index_id);
+}
+
+void Rdb_ddl_manager::add_uncommitted_keydefs(
+ const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
+ mysql_rwlock_wrlock(&m_rwlock);
+ for (const auto &index : indexes) {
+ m_index_num_to_uncommitted_keydef[index->get_gl_index_id()] = index;
+ }
+ mysql_rwlock_unlock(&m_rwlock);
+}
+
+void Rdb_ddl_manager::remove_uncommitted_keydefs(
+ const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
+ mysql_rwlock_wrlock(&m_rwlock);
+ for (const auto &index : indexes) {
+ m_index_num_to_uncommitted_keydef.erase(index->get_gl_index_id());
+ }
+ mysql_rwlock_unlock(&m_rwlock);
+}
+
+namespace // anonymous namespace = not visible outside this source file
+{
+struct Rdb_validate_tbls : public Rdb_tables_scanner {
+ using tbl_info_t = std::pair<std::string, bool>;
+ using tbl_list_t = std::map<std::string, std::set<tbl_info_t>>;
+
+ tbl_list_t m_list;
+
+ int add_table(Rdb_tbl_def *tdef) override;
+
+ bool compare_to_actual_tables(const std::string &datadir, bool *has_errors);
+
+ bool scan_for_frms(const std::string &datadir, const std::string &dbname,
+ bool *has_errors);
+
+ bool check_frm_file(const std::string &fullpath, const std::string &dbname,
+ const std::string &tablename, bool *has_errors);
+};
+} // anonymous namespace
+
+/*
+ Get a list of tables that we expect to have .frm files for. This will use the
+ information just read from the RocksDB data dictionary.
+*/
+int Rdb_validate_tbls::add_table(Rdb_tbl_def *tdef) {
+ DBUG_ASSERT(tdef != nullptr);
+
+ /* Add the database/table into the list that are not temp table */
+ if (tdef->base_tablename().find(tmp_file_prefix) == std::string::npos) {
+ bool is_partition = tdef->base_partition().size() != 0;
+ m_list[tdef->base_dbname()].insert(
+ tbl_info_t(tdef->base_tablename(), is_partition));
+ }
+
+ return HA_EXIT_SUCCESS;
+}
+
+/*
+ Access the .frm file for this dbname/tablename and see if it is a RocksDB
+ table (or partition table).
+*/
+bool Rdb_validate_tbls::check_frm_file(const std::string &fullpath,
+ const std::string &dbname,
+ const std::string &tablename,
+ bool *has_errors) {
+ /* Check this .frm file to see what engine it uses */
+ String fullfilename(fullpath.data(), fullpath.length(), &my_charset_bin);
+ fullfilename.append(STRING_WITH_LEN(FN_DIRSEP));
+ fullfilename.append(tablename.data(), tablename.length());
+ fullfilename.append(STRING_WITH_LEN(".frm"));
+
+ /*
+ This function will return the legacy_db_type of the table. Currently
+ it does not reference the first parameter (THD* thd), but if it ever
+ did in the future we would need to make a version that does it without
+ the connection handle as we don't have one here.
+ */
+ char eng_type_buf[NAME_CHAR_LEN+1];
+ LEX_CSTRING eng_type_str = {eng_type_buf, 0};
+ enum Table_type type = dd_frm_type(nullptr, fullfilename.c_ptr(),
+ &eng_type_str, nullptr, nullptr);
+ if (type == TABLE_TYPE_UNKNOWN) {
+ // NO_LINT_DEBUG
+ sql_print_warning("RocksDB: Failed to open/read .from file: %s",
+ fullfilename.ptr());
+ return false;
+ }
+
+ if (type == TABLE_TYPE_NORMAL) {
+ /* For a RocksDB table do we have a reference in the data dictionary? */
+ if (!strncmp(eng_type_str.str, "ROCKSDB", eng_type_str.length)) {
+ /*
+ Attempt to remove the table entry from the list of tables. If this
+ fails then we know we had a .frm file that wasn't registered in RocksDB.
+ */
+ tbl_info_t element(tablename, false);
+ if (m_list.count(dbname) == 0 || m_list[dbname].erase(element) == 0) {
+ // NO_LINT_DEBUG
+ sql_print_warning(
+ "RocksDB: Schema mismatch - "
+ "A .frm file exists for table %s.%s, "
+ "but that table is not registered in RocksDB",
+ dbname.c_str(), tablename.c_str());
+ *has_errors = true;
+ }
+ } else if (!strncmp(eng_type_str.str, "partition", eng_type_str.length)) {
+ /*
+ For partition tables, see if it is in the m_list as a partition,
+ but don't generate an error if it isn't there - we don't know that the
+ .frm is for RocksDB.
+ */
+ if (m_list.count(dbname) > 0) {
+ m_list[dbname].erase(tbl_info_t(tablename, true));
+ }
+ }
+ }
+
+ return true;
+}
+
+/* Scan the database subdirectory for .frm files */
+bool Rdb_validate_tbls::scan_for_frms(const std::string &datadir,
+ const std::string &dbname,
+ bool *has_errors) {
+ bool result = true;
+ std::string fullpath = datadir + dbname;
+ struct st_my_dir *dir_info = my_dir(fullpath.c_str(), MYF(MY_DONT_SORT));
+
+ /* Access the directory */
+ if (dir_info == nullptr) {
+ // NO_LINT_DEBUG
+ sql_print_warning("RocksDB: Could not open database directory: %s",
+ fullpath.c_str());
+ return false;
+ }
+
+ /* Scan through the files in the directory */
+ struct fileinfo *file_info = dir_info->dir_entry;
+ for (size_t ii = 0; ii < dir_info->number_of_files; ii++, file_info++) {
+ /* Find .frm files that are not temp files (those that contain '#sql') */
+ const char *ext = strrchr(file_info->name, '.');
+ if (ext != nullptr && strstr(file_info->name, tmp_file_prefix) == nullptr &&
+ strcmp(ext, ".frm") == 0) {
+ std::string tablename =
+ std::string(file_info->name, ext - file_info->name);
+
+ /* Check to see if the .frm file is from RocksDB */
+ if (!check_frm_file(fullpath, dbname, tablename, has_errors)) {
+ result = false;
+ break;
+ }
+ }
+ }
+
+ /* Remove any databases who have no more tables listed */
+ if (m_list.count(dbname) == 1 && m_list[dbname].size() == 0) {
+ m_list.erase(dbname);
+ }
+
+ /* Release the directory entry */
+ my_dirend(dir_info);
+
+ return result;
+}
+
+/*
+ Scan the datadir for all databases (subdirectories) and get a list of .frm
+ files they contain
+*/
+bool Rdb_validate_tbls::compare_to_actual_tables(const std::string &datadir,
+ bool *has_errors) {
+ bool result = true;
+ struct st_my_dir *dir_info;
+ struct fileinfo *file_info;
+
+ dir_info = my_dir(datadir.c_str(), MYF(MY_DONT_SORT | MY_WANT_STAT));
+ if (dir_info == nullptr) {
+ // NO_LINT_DEBUG
+ sql_print_warning("RocksDB: could not open datadir: %s", datadir.c_str());
+ return false;
+ }
+
+ file_info = dir_info->dir_entry;
+ for (size_t ii = 0; ii < dir_info->number_of_files; ii++, file_info++) {
+ /* Ignore files/dirs starting with '.' */
+ if (file_info->name[0] == '.') continue;
+
+ /* Ignore all non-directory files */
+ if (!MY_S_ISDIR(file_info->mystat->st_mode)) continue;
+
+ /* Scan all the .frm files in the directory */
+ if (!scan_for_frms(datadir, file_info->name, has_errors)) {
+ result = false;
+ break;
+ }
+ }
+
+ /* Release the directory info */
+ my_dirend(dir_info);
+
+ return result;
+}
+
+/*
+ Validate that all auto increment values in the data dictionary are on a
+ supported version.
+*/
+bool Rdb_ddl_manager::validate_auto_incr() {
+ std::unique_ptr<rocksdb::Iterator> it(m_dict->new_iterator());
+
+ uchar auto_incr_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
+ rdb_netbuf_store_index(auto_incr_entry, Rdb_key_def::AUTO_INC);
+ const rocksdb::Slice auto_incr_entry_slice(
+ reinterpret_cast<char *>(auto_incr_entry),
+ Rdb_key_def::INDEX_NUMBER_SIZE);
+ for (it->Seek(auto_incr_entry_slice); it->Valid(); it->Next()) {
+ const rocksdb::Slice key = it->key();
+ const rocksdb::Slice val = it->value();
+ GL_INDEX_ID gl_index_id;
+
+ if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
+ memcmp(key.data(), auto_incr_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) {
+ break;
+ }
+
+ if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3) {
+ return false;
+ }
+
+ if (val.size() <= Rdb_key_def::VERSION_SIZE) {
+ return false;
+ }
+
+ // Check if we have orphaned entries for whatever reason by cross
+ // referencing ddl entries.
+ auto ptr = reinterpret_cast<const uchar *>(key.data());
+ ptr += Rdb_key_def::INDEX_NUMBER_SIZE;
+ rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
+ if (!m_dict->get_index_info(gl_index_id, nullptr)) {
+ // NO_LINT_DEBUG
+ sql_print_warning(
+ "RocksDB: AUTOINC mismatch - "
+ "Index number (%u, %u) found in AUTOINC "
+ "but does not exist as a DDL entry",
+ gl_index_id.cf_id, gl_index_id.index_id);
+ return false;
+ }
+
+ ptr = reinterpret_cast<const uchar *>(val.data());
+ const int version = rdb_netbuf_read_uint16(&ptr);
+ if (version > Rdb_key_def::AUTO_INCREMENT_VERSION) {
+ // NO_LINT_DEBUG
+ sql_print_warning(
+ "RocksDB: AUTOINC mismatch - "
+ "Index number (%u, %u) found in AUTOINC "
+ "is on unsupported version %d",
+ gl_index_id.cf_id, gl_index_id.index_id, version);
+ return false;
+ }
+ }
+
+ if (!it->status().ok()) {
+ return false;
+ }
+
+ return true;
+}
+
+/*
+ Validate that all the tables in the RocksDB database dictionary match the .frm
+ files in the datadir
+*/
+bool Rdb_ddl_manager::validate_schemas(void) {
+ bool has_errors = false;
+ const std::string datadir = std::string(mysql_real_data_home);
+ Rdb_validate_tbls table_list;
+
+ /* Get the list of tables from the database dictionary */
+ if (scan_for_tables(&table_list) != 0) {
+ return false;
+ }
+
+ /* Compare that to the list of actual .frm files */
+ if (!table_list.compare_to_actual_tables(datadir, &has_errors)) {
+ return false;
+ }
+
+ /*
+ Any tables left in the tables list are ones that are registered in RocksDB
+ but don't have .frm files.
+ */
+ for (const auto &db : table_list.m_list) {
+ for (const auto &table : db.second) {
+ // NO_LINT_DEBUG
+ sql_print_warning(
+ "RocksDB: Schema mismatch - "
+ "Table %s.%s is registered in RocksDB "
+ "but does not have a .frm file",
+ db.first.c_str(), table.first.c_str());
+ has_errors = true;
+ }
+ }
+
+ return !has_errors;
+}
+
+bool Rdb_ddl_manager::init(Rdb_dict_manager *const dict_arg,
+ Rdb_cf_manager *const cf_manager,
+ const uint32_t validate_tables) {
+ m_dict = dict_arg;
+ mysql_rwlock_init(0, &m_rwlock);
+
+ /* Read the data dictionary and populate the hash */
+ uchar ddl_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
+ rdb_netbuf_store_index(ddl_entry, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
+ const rocksdb::Slice ddl_entry_slice((char *)ddl_entry,
+ Rdb_key_def::INDEX_NUMBER_SIZE);
+
+ /* Reading data dictionary should always skip bloom filter */
+ rocksdb::Iterator *it = m_dict->new_iterator();
+ int i = 0;
+
+ uint max_index_id_in_dict = 0;
+ m_dict->get_max_index_id(&max_index_id_in_dict);
+
+ for (it->Seek(ddl_entry_slice); it->Valid(); it->Next()) {
+ const uchar *ptr;
+ const uchar *ptr_end;
+ const rocksdb::Slice key = it->key();
+ const rocksdb::Slice val = it->value();
+
+ if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
+ memcmp(key.data(), ddl_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) {
+ break;
+ }
+
+ if (key.size() <= Rdb_key_def::INDEX_NUMBER_SIZE) {
+ // NO_LINT_DEBUG
+ sql_print_error("RocksDB: Table_store: key has length %d (corruption?)",
+ (int)key.size());
+ return true;
+ }
+
+ Rdb_tbl_def *const tdef =
+ new Rdb_tbl_def(key, Rdb_key_def::INDEX_NUMBER_SIZE);
+
+ // Now, read the DDLs.
+ const int real_val_size = val.size() - Rdb_key_def::VERSION_SIZE;
+ if (real_val_size % Rdb_key_def::PACKED_SIZE * 2 > 0) {
+ // NO_LINT_DEBUG
+ sql_print_error("RocksDB: Table_store: invalid keylist for table %s",
+ tdef->full_tablename().c_str());
+ return true;
+ }
+ tdef->m_key_count = real_val_size / (Rdb_key_def::PACKED_SIZE * 2);
+ tdef->m_key_descr_arr = new std::shared_ptr<Rdb_key_def>[tdef->m_key_count];
+
+ ptr = reinterpret_cast<const uchar *>(val.data());
+ const int version = rdb_netbuf_read_uint16(&ptr);
+ if (version != Rdb_key_def::DDL_ENTRY_INDEX_VERSION) {
+ // NO_LINT_DEBUG
+ sql_print_error(
+ "RocksDB: DDL ENTRY Version was not expected."
+ "Expected: %d, Actual: %d",
+ Rdb_key_def::DDL_ENTRY_INDEX_VERSION, version);
+ return true;
+ }
+ ptr_end = ptr + real_val_size;
+ for (uint keyno = 0; ptr < ptr_end; keyno++) {
+ GL_INDEX_ID gl_index_id;
+ rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
+ uint flags = 0;
+ struct Rdb_index_info index_info;
+ if (!m_dict->get_index_info(gl_index_id, &index_info)) {
+ // NO_LINT_DEBUG
+ sql_print_error(
+ "RocksDB: Could not get index information "
+ "for Index Number (%u,%u), table %s",
+ gl_index_id.cf_id, gl_index_id.index_id,
+ tdef->full_tablename().c_str());
+ return true;
+ }
+ if (max_index_id_in_dict < gl_index_id.index_id) {
+ // NO_LINT_DEBUG
+ sql_print_error(
+ "RocksDB: Found max index id %u from data dictionary "
+ "but also found larger index id %u from dictionary. "
+ "This should never happen and possibly a bug.",
+ max_index_id_in_dict, gl_index_id.index_id);
+ return true;
+ }
+ if (!m_dict->get_cf_flags(gl_index_id.cf_id, &flags)) {
+ // NO_LINT_DEBUG
+ sql_print_error(
+ "RocksDB: Could not get Column Family Flags "
+ "for CF Number %d, table %s",
+ gl_index_id.cf_id, tdef->full_tablename().c_str());
+ return true;
+ }
+
+ if ((flags & Rdb_key_def::AUTO_CF_FLAG) != 0) {
+ // The per-index cf option is deprecated. Make sure we don't have the
+ // flag set in any existing database. NO_LINT_DEBUG
+ // NO_LINT_DEBUG
+ sql_print_error(
+ "RocksDB: The defunct AUTO_CF_FLAG is enabled for CF "
+ "number %d, table %s",
+ gl_index_id.cf_id, tdef->full_tablename().c_str());
+ }
+
+ rocksdb::ColumnFamilyHandle *const cfh =
+ cf_manager->get_cf(gl_index_id.cf_id);
+ DBUG_ASSERT(cfh != nullptr);
+
+ uint32 ttl_rec_offset =
+ Rdb_key_def::has_index_flag(index_info.m_index_flags,
+ Rdb_key_def::TTL_FLAG)
+ ? Rdb_key_def::calculate_index_flag_offset(
+ index_info.m_index_flags, Rdb_key_def::TTL_FLAG)
+ : UINT_MAX;
+
+ /*
+ We can't fully initialize Rdb_key_def object here, because full
+ initialization requires that there is an open TABLE* where we could
+ look at Field* objects and set max_length and other attributes
+ */
+ tdef->m_key_descr_arr[keyno] = std::make_shared<Rdb_key_def>(
+ gl_index_id.index_id, keyno, cfh, index_info.m_index_dict_version,
+ index_info.m_index_type, index_info.m_kv_version,
+ flags & Rdb_key_def::REVERSE_CF_FLAG,
+ flags & Rdb_key_def::PER_PARTITION_CF_FLAG, "",
+ m_dict->get_stats(gl_index_id), index_info.m_index_flags,
+ ttl_rec_offset, index_info.m_ttl_duration);
+ }
+ put(tdef);
+ i++;
+ }
+
+ /*
+ If validate_tables is greater than 0 run the validation. Only fail the
+ initialzation if the setting is 1. If the setting is 2 we continue.
+ */
+ if (validate_tables > 0) {
+ std::string msg;
+ if (!validate_schemas()) {
+ msg =
+ "RocksDB: Problems validating data dictionary "
+ "against .frm files, exiting";
+ } else if (!validate_auto_incr()) {
+ msg =
+ "RocksDB: Problems validating auto increment values in "
+ "data dictionary, exiting";
+ }
+ if (validate_tables == 1 && !msg.empty()) {
+ // NO_LINT_DEBUG
+ sql_print_error("%s", msg.c_str());
+ return true;
+ }
+ }
+
+ // index ids used by applications should not conflict with
+ // data dictionary index ids
+ if (max_index_id_in_dict < Rdb_key_def::END_DICT_INDEX_ID) {
+ max_index_id_in_dict = Rdb_key_def::END_DICT_INDEX_ID;
+ }
+
+ m_sequence.init(max_index_id_in_dict + 1);
+
+ if (!it->status().ok()) {
+ rdb_log_status_error(it->status(), "Table_store load error");
+ return true;
+ }
+ delete it;
+ // NO_LINT_DEBUG
+ sql_print_information("RocksDB: Table_store: loaded DDL data for %d tables",
+ i);
+ return false;
+}
+
+Rdb_tbl_def *Rdb_ddl_manager::find(const std::string &table_name,
+ const bool lock) {
+ if (lock) {
+ mysql_rwlock_rdlock(&m_rwlock);
+ }
+
+ Rdb_tbl_def *rec = nullptr;
+ const auto it = m_ddl_map.find(table_name);
+ if (it != m_ddl_map.end()) {
+ rec = it->second;
+ }
+
+ if (lock) {
+ mysql_rwlock_unlock(&m_rwlock);
+ }
+
+ return rec;
+}
+
+// this is a safe version of the find() function below. It acquires a read
+// lock on m_rwlock to make sure the Rdb_key_def is not discarded while we
+// are finding it. Copying it into 'ret' increments the count making sure
+// that the object will not be discarded until we are finished with it.
+std::shared_ptr<const Rdb_key_def> Rdb_ddl_manager::safe_find(
+ GL_INDEX_ID gl_index_id) {
+ std::shared_ptr<const Rdb_key_def> ret(nullptr);
+
+ mysql_rwlock_rdlock(&m_rwlock);
+
+ auto it = m_index_num_to_keydef.find(gl_index_id);
+ if (it != m_index_num_to_keydef.end()) {
+ const auto table_def = find(it->second.first, false);
+ if (table_def && it->second.second < table_def->m_key_count) {
+ const auto &kd = table_def->m_key_descr_arr[it->second.second];
+ if (kd->max_storage_fmt_length() != 0) {
+ ret = kd;
+ }
+ }
+ } else {
+ auto it = m_index_num_to_uncommitted_keydef.find(gl_index_id);
+ if (it != m_index_num_to_uncommitted_keydef.end()) {
+ const auto &kd = it->second;
+ if (kd->max_storage_fmt_length() != 0) {
+ ret = kd;
+ }
+ }
+ }
+
+ mysql_rwlock_unlock(&m_rwlock);
+
+ return ret;
+}
+
+// this method assumes at least read-only lock on m_rwlock
+const std::shared_ptr<Rdb_key_def> &Rdb_ddl_manager::find(
+ GL_INDEX_ID gl_index_id) {
+ auto it = m_index_num_to_keydef.find(gl_index_id);
+ if (it != m_index_num_to_keydef.end()) {
+ auto table_def = find(it->second.first, false);
+ if (table_def) {
+ if (it->second.second < table_def->m_key_count) {
+ return table_def->m_key_descr_arr[it->second.second];
+ }
+ }
+ } else {
+ auto it = m_index_num_to_uncommitted_keydef.find(gl_index_id);
+ if (it != m_index_num_to_uncommitted_keydef.end()) {
+ return it->second;
+ }
+ }
+
+ static std::shared_ptr<Rdb_key_def> empty = nullptr;
+
+ return empty;
+}
+
+// this method returns the name of the table based on an index id. It acquires
+// a read lock on m_rwlock.
+const std::string Rdb_ddl_manager::safe_get_table_name(
+ const GL_INDEX_ID &gl_index_id) {
+ std::string ret;
+ mysql_rwlock_rdlock(&m_rwlock);
+ auto it = m_index_num_to_keydef.find(gl_index_id);
+ if (it != m_index_num_to_keydef.end()) {
+ ret = it->second.first;
+ }
+ mysql_rwlock_unlock(&m_rwlock);
+ return ret;
+}
+
+void Rdb_ddl_manager::set_stats(
+ const std::unordered_map<GL_INDEX_ID, Rdb_index_stats> &stats) {
+ mysql_rwlock_wrlock(&m_rwlock);
+ for (auto src : stats) {
+ const auto &keydef = find(src.second.m_gl_index_id);
+ if (keydef) {
+ keydef->m_stats = src.second;
+ m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
+ }
+ }
+ mysql_rwlock_unlock(&m_rwlock);
+}
+
+void Rdb_ddl_manager::adjust_stats(
+ const std::vector<Rdb_index_stats> &new_data,
+ const std::vector<Rdb_index_stats> &deleted_data) {
+ mysql_rwlock_wrlock(&m_rwlock);
+ int i = 0;
+ for (const auto &data : {new_data, deleted_data}) {
+ for (const auto &src : data) {
+ const auto &keydef = find(src.m_gl_index_id);
+ if (keydef) {
+ keydef->m_stats.m_distinct_keys_per_prefix.resize(
+ keydef->get_key_parts());
+ keydef->m_stats.merge(src, i == 0, keydef->max_storage_fmt_length());
+ m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
+ }
+ }
+ i++;
+ }
+ const bool should_save_stats = !m_stats2store.empty();
+ mysql_rwlock_unlock(&m_rwlock);
+ if (should_save_stats) {
+ // Queue an async persist_stats(false) call to the background thread.
+ rdb_queue_save_stats_request();
+ }
+}
+
+void Rdb_ddl_manager::persist_stats(const bool sync) {
+ mysql_rwlock_wrlock(&m_rwlock);
+ const auto local_stats2store = std::move(m_stats2store);
+ m_stats2store.clear();
+ mysql_rwlock_unlock(&m_rwlock);
+
+ // Persist stats
+ const std::unique_ptr<rocksdb::WriteBatch> wb = m_dict->begin();
+ std::vector<Rdb_index_stats> stats;
+ std::transform(local_stats2store.begin(), local_stats2store.end(),
+ std::back_inserter(stats),
+ [](const std::pair<GL_INDEX_ID, Rdb_index_stats> &s) {
+ return s.second;
+ });
+ m_dict->add_stats(wb.get(), stats);
+ m_dict->commit(wb.get(), sync);
+}
+
+/*
+ Put table definition of `tbl` into the mapping, and also write it to the
+ on-disk data dictionary.
+*/
+
+int Rdb_ddl_manager::put_and_write(Rdb_tbl_def *const tbl,
+ rocksdb::WriteBatch *const batch) {
+ Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> buf_writer;
+
+ buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
+
+ const std::string &dbname_tablename = tbl->full_tablename();
+ buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
+
+ int res;
+ if ((res = tbl->put_dict(m_dict, batch, buf_writer.to_slice()))) {
+ return res;
+ }
+ if ((res = put(tbl))) {
+ return res;
+ }
+ return HA_EXIT_SUCCESS;
+}
+
+/* Return 0 - ok, other value - error */
+/* TODO:
+ This function modifies m_ddl_map and m_index_num_to_keydef.
+ However, these changes need to be reversed if dict_manager.commit fails
+ See the discussion here: https://reviews.facebook.net/D35925#inline-259167
+ Tracked by https://github.com/facebook/mysql-5.6/issues/33
+*/
+int Rdb_ddl_manager::put(Rdb_tbl_def *const tbl, const bool lock) {
+ Rdb_tbl_def *rec;
+ const std::string &dbname_tablename = tbl->full_tablename();
+
+ if (lock) mysql_rwlock_wrlock(&m_rwlock);
+
+ // We have to do this find because 'tbl' is not yet in the list. We need
+ // to find the one we are replacing ('rec')
+ rec = find(dbname_tablename, false);
+ if (rec) {
+ // Free the old record.
+ delete rec;
+ m_ddl_map.erase(dbname_tablename);
+ }
+ m_ddl_map.emplace(dbname_tablename, tbl);
+
+ for (uint keyno = 0; keyno < tbl->m_key_count; keyno++) {
+ m_index_num_to_keydef[tbl->m_key_descr_arr[keyno]->get_gl_index_id()] =
+ std::make_pair(dbname_tablename, keyno);
+ }
+ tbl->check_and_set_read_free_rpl_table();
+
+ if (lock) mysql_rwlock_unlock(&m_rwlock);
+ return 0;
+}
+
+void Rdb_ddl_manager::remove(Rdb_tbl_def *const tbl,
+ rocksdb::WriteBatch *const batch,
+ const bool lock) {
+ if (lock) mysql_rwlock_wrlock(&m_rwlock);
+
+ Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> key_writer;
+ key_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
+ const std::string &dbname_tablename = tbl->full_tablename();
+ key_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
+
+ m_dict->delete_key(batch, key_writer.to_slice());
+
+ const auto it = m_ddl_map.find(dbname_tablename);
+ if (it != m_ddl_map.end()) {
+ // Free Rdb_tbl_def
+ delete it->second;
+
+ m_ddl_map.erase(it);
+ }
+
+ if (lock) mysql_rwlock_unlock(&m_rwlock);
+}
+
+bool Rdb_ddl_manager::rename(const std::string &from, const std::string &to,
+ rocksdb::WriteBatch *const batch) {
+ Rdb_tbl_def *rec;
+ Rdb_tbl_def *new_rec;
+ bool res = true;
+ Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> new_buf_writer;
+
+ mysql_rwlock_wrlock(&m_rwlock);
+ if (!(rec = find(from, false))) {
+ mysql_rwlock_unlock(&m_rwlock);
+ return true;
+ }
+
+ new_rec = new Rdb_tbl_def(to);
+
+ new_rec->m_key_count = rec->m_key_count;
+ new_rec->m_auto_incr_val =
+ rec->m_auto_incr_val.load(std::memory_order_relaxed);
+ new_rec->m_key_descr_arr = rec->m_key_descr_arr;
+
+ new_rec->m_hidden_pk_val =
+ rec->m_hidden_pk_val.load(std::memory_order_relaxed);
+
+ // so that it's not free'd when deleting the old rec
+ rec->m_key_descr_arr = nullptr;
+
+ // Create a new key
+ new_buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
+
+ const std::string &dbname_tablename = new_rec->full_tablename();
+ new_buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
+
+ // Create a key to add
+ if (!new_rec->put_dict(m_dict, batch, new_buf_writer.to_slice())) {
+ remove(rec, batch, false);
+ put(new_rec, false);
+ res = false; // ok
+ }
+
+ mysql_rwlock_unlock(&m_rwlock);
+ return res;
+}
+
+void Rdb_ddl_manager::cleanup() {
+ for (const auto &kv : m_ddl_map) {
+ delete kv.second;
+ }
+ m_ddl_map.clear();
+
+ mysql_rwlock_destroy(&m_rwlock);
+ m_sequence.cleanup();
+}
+
+int Rdb_ddl_manager::scan_for_tables(Rdb_tables_scanner *const tables_scanner) {
+ int ret;
+ Rdb_tbl_def *rec;
+
+ DBUG_ASSERT(tables_scanner != nullptr);
+
+ mysql_rwlock_rdlock(&m_rwlock);
+
+ ret = 0;
+
+ for (const auto &kv : m_ddl_map) {
+ rec = kv.second;
+ ret = tables_scanner->add_table(rec);
+ if (ret) break;
+ }
+
+ mysql_rwlock_unlock(&m_rwlock);
+ return ret;
+}
+
+/*
+ Rdb_binlog_manager class implementation
+*/
+
+bool Rdb_binlog_manager::init(Rdb_dict_manager *const dict_arg) {
+ DBUG_ASSERT(dict_arg != nullptr);
+ m_dict = dict_arg;
+
+ m_key_writer.reset();
+ m_key_writer.write_index(Rdb_key_def::BINLOG_INFO_INDEX_NUMBER);
+ m_key_slice = m_key_writer.to_slice();
+ return false;
+}
+
+void Rdb_binlog_manager::cleanup() {}
+
+/**
+ Set binlog name, pos and optionally gtid into WriteBatch.
+ This function should be called as part of transaction commit,
+ since binlog info is set only at transaction commit.
+ Actual write into RocksDB is not done here, so checking if
+ write succeeded or not is not possible here.
+ @param binlog_name Binlog name
+ @param binlog_pos Binlog pos
+ @param batch WriteBatch
+*/
+void Rdb_binlog_manager::update(const char *const binlog_name,
+ const my_off_t binlog_pos,
+ rocksdb::WriteBatchBase *const batch) {
+ if (binlog_name && binlog_pos) {
+ // max binlog length (512) + binlog pos (4) + binlog gtid (57) < 1024
+ const size_t RDB_MAX_BINLOG_INFO_LEN = 1024;
+ Rdb_buf_writer<RDB_MAX_BINLOG_INFO_LEN> value_writer;
+
+ // store version
+ value_writer.write_uint16(Rdb_key_def::BINLOG_INFO_INDEX_NUMBER_VERSION);
+
+ // store binlog file name length
+ DBUG_ASSERT(strlen(binlog_name) <= FN_REFLEN);
+ const uint16_t binlog_name_len = strlen(binlog_name);
+ value_writer.write_uint16(binlog_name_len);
+
+ // store binlog file name
+ value_writer.write(binlog_name, binlog_name_len);
+
+ // store binlog pos
+ value_writer.write_uint32(binlog_pos);
+
+#ifdef MARIADB_MERGE_2019
+ // store binlog gtid length.
+ // If gtid was not set, store 0 instead
+ const uint16_t binlog_max_gtid_len =
+ binlog_max_gtid ? strlen(binlog_max_gtid) : 0;
+ value_writer.write_uint16(binlog_max_gtid_len);
+
+ if (binlog_max_gtid_len > 0) {
+ // store binlog gtid
+ value_writer.write(binlog_max_gtid, binlog_max_gtid_len);
+ }
+#endif
+
+ m_dict->put_key(batch, m_key_slice, value_writer.to_slice());
+ }
+}
+
+/**
+ Read binlog committed entry stored in RocksDB, then unpack
+ @param[OUT] binlog_name Binlog name
+ @param[OUT] binlog_pos Binlog pos
+ @param[OUT] binlog_gtid Binlog GTID
+ @return
+ true is binlog info was found (valid behavior)
+ false otherwise
+*/
+bool Rdb_binlog_manager::read(char *const binlog_name,
+ my_off_t *const binlog_pos,
+ char *const binlog_gtid) const {
+ bool ret = false;
+ if (binlog_name) {
+ std::string value;
+ rocksdb::Status status = m_dict->get_value(m_key_slice, &value);
+ if (status.ok()) {
+ if (!unpack_value((const uchar *)value.c_str(), value.size(), binlog_name, binlog_pos,
+ binlog_gtid)) {
+ ret = true;
+ }
+ }
+ }
+ return ret;
+}
+
+/**
+ Unpack value then split into binlog_name, binlog_pos (and binlog_gtid)
+ @param[IN] value Binlog state info fetched from RocksDB
+ @param[OUT] binlog_name Binlog name
+ @param[OUT] binlog_pos Binlog pos
+ @param[OUT] binlog_gtid Binlog GTID
+ @return true on error
+*/
+bool Rdb_binlog_manager::unpack_value(const uchar *const value,
+ size_t value_size_arg,
+ char *const binlog_name,
+ my_off_t *const binlog_pos,
+ char *const binlog_gtid) const {
+ uint pack_len = 0;
+ intmax_t value_size= value_size_arg;
+
+ DBUG_ASSERT(binlog_pos != nullptr);
+
+ if ((value_size -= Rdb_key_def::VERSION_SIZE) < 0)
+ return true;
+ // read version
+ const uint16_t version = rdb_netbuf_to_uint16(value);
+
+ pack_len += Rdb_key_def::VERSION_SIZE;
+ if (version != Rdb_key_def::BINLOG_INFO_INDEX_NUMBER_VERSION) return true;
+
+ if ((value_size -= sizeof(uint16)) < 0)
+ return true;
+
+ // read binlog file name length
+ const uint16_t binlog_name_len = rdb_netbuf_to_uint16(value + pack_len);
+ pack_len += sizeof(uint16);
+
+ if (binlog_name_len >= (FN_REFLEN+1))
+ return true;
+
+ if ((value_size -= binlog_name_len) < 0)
+ return true;
+
+ if (binlog_name_len) {
+ // read and set binlog name
+ memcpy(binlog_name, value + pack_len, binlog_name_len);
+ binlog_name[binlog_name_len] = '\0';
+ pack_len += binlog_name_len;
+
+ if ((value_size -= sizeof(uint32)) < 0)
+ return true;
+ // read and set binlog pos
+ *binlog_pos = rdb_netbuf_to_uint32(value + pack_len);
+ pack_len += sizeof(uint32);
+
+ if ((value_size -= sizeof(uint16)) < 0)
+ return true;
+ // read gtid length
+ const uint16_t binlog_gtid_len = rdb_netbuf_to_uint16(value + pack_len);
+ pack_len += sizeof(uint16);
+
+ if (binlog_gtid_len >= GTID_BUF_LEN)
+ return true;
+ if ((value_size -= binlog_gtid_len) < 0)
+ return true;
+
+ if (binlog_gtid && binlog_gtid_len > 0) {
+ // read and set gtid
+ memcpy(binlog_gtid, value + pack_len, binlog_gtid_len);
+ binlog_gtid[binlog_gtid_len] = '\0';
+ pack_len += binlog_gtid_len;
+ }
+ }
+ return false;
+}
+
+/**
+ Inserts a row into mysql.slave_gtid_info table. Doing this inside
+ storage engine is more efficient than inserting/updating through MySQL.
+
+ @param[IN] id Primary key of the table.
+ @param[IN] db Database name. This is column 2 of the table.
+ @param[IN] gtid Gtid in human readable form. This is column 3 of the table.
+ @param[IN] write_batch Handle to storage engine writer.
+*/
+void Rdb_binlog_manager::update_slave_gtid_info(
+ const uint id, const char *const db, const char *const gtid,
+ rocksdb::WriteBatchBase *const write_batch) {
+ if (id && db && gtid) {
+ // Make sure that if the slave_gtid_info table exists we have a
+ // pointer to it via m_slave_gtid_info_tbl.
+ if (!m_slave_gtid_info_tbl.load()) {
+ m_slave_gtid_info_tbl.store(
+ rdb_get_ddl_manager()->find("mysql.slave_gtid_info"));
+ }
+ if (!m_slave_gtid_info_tbl.load()) {
+ // slave_gtid_info table is not present. Simply return.
+ return;
+ }
+ DBUG_ASSERT(m_slave_gtid_info_tbl.load()->m_key_count == 1);
+
+ const std::shared_ptr<const Rdb_key_def> &kd =
+ m_slave_gtid_info_tbl.load()->m_key_descr_arr[0];
+ String value;
+
+ // Build key
+ Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE + 4> key_writer;
+ key_writer.write_index(kd->get_index_number());
+ key_writer.write_uint32(id);
+
+ // Build value
+ Rdb_buf_writer<128> value_writer;
+ DBUG_ASSERT(gtid);
+ const uint db_len = strlen(db);
+ const uint gtid_len = strlen(gtid);
+ // 1 byte used for flags. Empty here.
+ value_writer.write_byte(0);
+
+ // Write column 1.
+ DBUG_ASSERT(strlen(db) <= 64);
+ value_writer.write_byte(db_len);
+ value_writer.write(db, db_len);
+
+ // Write column 2.
+ DBUG_ASSERT(gtid_len <= 56);
+ value_writer.write_byte(gtid_len);
+ value_writer.write(gtid, gtid_len);
+
+ write_batch->Put(kd->get_cf(), key_writer.to_slice(),
+ value_writer.to_slice());
+ }
+}
+
+bool Rdb_dict_manager::init(rocksdb::TransactionDB *const rdb_dict,
+ Rdb_cf_manager *const cf_manager) {
+ DBUG_ASSERT(rdb_dict != nullptr);
+ DBUG_ASSERT(cf_manager != nullptr);
+
+ mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
+
+ m_db = rdb_dict;
+
+ m_system_cfh = cf_manager->get_or_create_cf(m_db, DEFAULT_SYSTEM_CF_NAME);
+ rocksdb::ColumnFamilyHandle *default_cfh =
+ cf_manager->get_cf(DEFAULT_CF_NAME);
+
+ // System CF and default CF should be initialized
+ if (m_system_cfh == nullptr || default_cfh == nullptr) {
+ return HA_EXIT_FAILURE;
+ }
+
+ rdb_netbuf_store_index(m_key_buf_max_index_id, Rdb_key_def::MAX_INDEX_ID);
+
+ m_key_slice_max_index_id =
+ rocksdb::Slice(reinterpret_cast<char *>(m_key_buf_max_index_id),
+ Rdb_key_def::INDEX_NUMBER_SIZE);
+
+ resume_drop_indexes();
+ rollback_ongoing_index_creation();
+
+ // Initialize system CF and default CF flags
+ const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
+ rocksdb::WriteBatch *const batch = wb.get();
+
+ add_cf_flags(batch, m_system_cfh->GetID(), 0);
+ add_cf_flags(batch, default_cfh->GetID(), 0);
+ commit(batch);
+
+ return HA_EXIT_SUCCESS;
+}
+
+std::unique_ptr<rocksdb::WriteBatch> Rdb_dict_manager::begin() const {
+ return std::unique_ptr<rocksdb::WriteBatch>(new rocksdb::WriteBatch);
+}
+
+void Rdb_dict_manager::put_key(rocksdb::WriteBatchBase *const batch,
+ const rocksdb::Slice &key,
+ const rocksdb::Slice &value) const {
+ batch->Put(m_system_cfh, key, value);
+}
+
+rocksdb::Status Rdb_dict_manager::get_value(const rocksdb::Slice &key,
+ std::string *const value) const {
+ rocksdb::ReadOptions options;
+ options.total_order_seek = true;
+ return m_db->Get(options, m_system_cfh, key, value);
+}
+
+void Rdb_dict_manager::delete_key(rocksdb::WriteBatchBase *batch,
+ const rocksdb::Slice &key) const {
+ batch->Delete(m_system_cfh, key);
+}
+
+rocksdb::Iterator *Rdb_dict_manager::new_iterator() const {
+ /* Reading data dictionary should always skip bloom filter */
+ rocksdb::ReadOptions read_options;
+ read_options.total_order_seek = true;
+ return m_db->NewIterator(read_options, m_system_cfh);
+}
+
+int Rdb_dict_manager::commit(rocksdb::WriteBatch *const batch,
+ const bool sync) const {
+ if (!batch) return HA_ERR_ROCKSDB_COMMIT_FAILED;
+ int res = HA_EXIT_SUCCESS;
+ rocksdb::WriteOptions options;
+ options.sync = sync;
+ rocksdb::TransactionDBWriteOptimizations optimize;
+ optimize.skip_concurrency_control = true;
+ rocksdb::Status s = m_db->Write(options, optimize, batch);
+ res = !s.ok(); // we return true when something failed
+ if (res) {
+ rdb_handle_io_error(s, RDB_IO_ERROR_DICT_COMMIT);
+ }
+ batch->Clear();
+ return res;
+}
+
+void Rdb_dict_manager::dump_index_id(uchar *const netbuf,
+ Rdb_key_def::DATA_DICT_TYPE dict_type,
+ const GL_INDEX_ID &gl_index_id) {
+ rdb_netbuf_store_uint32(netbuf, dict_type);
+ rdb_netbuf_store_uint32(netbuf + Rdb_key_def::INDEX_NUMBER_SIZE,
+ gl_index_id.cf_id);
+ rdb_netbuf_store_uint32(netbuf + 2 * Rdb_key_def::INDEX_NUMBER_SIZE,
+ gl_index_id.index_id);
+}
+
+void Rdb_dict_manager::delete_with_prefix(
+ rocksdb::WriteBatch *const batch, Rdb_key_def::DATA_DICT_TYPE dict_type,
+ const GL_INDEX_ID &gl_index_id) const {
+ Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
+ dump_index_id(&key_writer, dict_type, gl_index_id);
+
+ delete_key(batch, key_writer.to_slice());
+}
+
+void Rdb_dict_manager::add_or_update_index_cf_mapping(
+ rocksdb::WriteBatch *batch, struct Rdb_index_info *const index_info) const {
+ Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
+ dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO,
+ index_info->m_gl_index_id);
+
+ Rdb_buf_writer<256> value_writer;
+
+ value_writer.write_uint16(Rdb_key_def::INDEX_INFO_VERSION_LATEST);
+ value_writer.write_byte(index_info->m_index_type);
+ value_writer.write_uint16(index_info->m_kv_version);
+ value_writer.write_uint32(index_info->m_index_flags);
+ value_writer.write_uint64(index_info->m_ttl_duration);
+
+ batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
+}
+
+void Rdb_dict_manager::add_cf_flags(rocksdb::WriteBatch *const batch,
+ const uint32_t cf_id,
+ const uint32_t cf_flags) const {
+ DBUG_ASSERT(batch != nullptr);
+
+ Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer;
+ key_writer.write_uint32(Rdb_key_def::CF_DEFINITION);
+ key_writer.write_uint32(cf_id);
+
+ Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE>
+ value_writer;
+ value_writer.write_uint16(Rdb_key_def::CF_DEFINITION_VERSION);
+ value_writer.write_uint32(cf_flags);
+
+ batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
+}
+
+void Rdb_dict_manager::delete_index_info(rocksdb::WriteBatch *batch,
+ const GL_INDEX_ID &gl_index_id) const {
+ delete_with_prefix(batch, Rdb_key_def::INDEX_INFO, gl_index_id);
+ delete_with_prefix(batch, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
+ delete_with_prefix(batch, Rdb_key_def::AUTO_INC, gl_index_id);
+}
+
+bool Rdb_dict_manager::get_index_info(
+ const GL_INDEX_ID &gl_index_id,
+ struct Rdb_index_info *const index_info) const {
+ if (index_info) {
+ index_info->m_gl_index_id = gl_index_id;
+ }
+
+ bool found = false;
+ bool error = false;
+ std::string value;
+ Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
+ dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO, gl_index_id);
+
+ const rocksdb::Status &status = get_value(key_writer.to_slice(), &value);
+ if (status.ok()) {
+ if (!index_info) {
+ return true;
+ }
+
+ const uchar *const val = (const uchar *)value.c_str();
+ const uchar *ptr = val;
+ index_info->m_index_dict_version = rdb_netbuf_to_uint16(val);
+ ptr += RDB_SIZEOF_INDEX_INFO_VERSION;
+
+ switch (index_info->m_index_dict_version) {
+ case Rdb_key_def::INDEX_INFO_VERSION_FIELD_FLAGS:
+ /* Sanity check to prevent reading bogus TTL record. */
+ if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
+ RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
+ RDB_SIZEOF_INDEX_FLAGS +
+ ROCKSDB_SIZEOF_TTL_RECORD) {
+ error = true;
+ break;
+ }
+ index_info->m_index_type = rdb_netbuf_to_byte(ptr);
+ ptr += RDB_SIZEOF_INDEX_TYPE;
+ index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
+ ptr += RDB_SIZEOF_KV_VERSION;
+ index_info->m_index_flags = rdb_netbuf_to_uint32(ptr);
+ ptr += RDB_SIZEOF_INDEX_FLAGS;
+ index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
+ found = true;
+ break;
+
+ case Rdb_key_def::INDEX_INFO_VERSION_TTL:
+ /* Sanity check to prevent reading bogus into TTL record. */
+ if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
+ RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
+ ROCKSDB_SIZEOF_TTL_RECORD) {
+ error = true;
+ break;
+ }
+ index_info->m_index_type = rdb_netbuf_to_byte(ptr);
+ ptr += RDB_SIZEOF_INDEX_TYPE;
+ index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
+ ptr += RDB_SIZEOF_KV_VERSION;
+ index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
+ if ((index_info->m_kv_version ==
+ Rdb_key_def::PRIMARY_FORMAT_VERSION_TTL) &&
+ index_info->m_ttl_duration > 0) {
+ index_info->m_index_flags = Rdb_key_def::TTL_FLAG;
+ }
+ found = true;
+ break;
+
+ case Rdb_key_def::INDEX_INFO_VERSION_VERIFY_KV_FORMAT:
+ case Rdb_key_def::INDEX_INFO_VERSION_GLOBAL_ID:
+ index_info->m_index_type = rdb_netbuf_to_byte(ptr);
+ ptr += RDB_SIZEOF_INDEX_TYPE;
+ index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
+ found = true;
+ break;
+
+ default:
+ error = true;
+ break;
+ }
+
+ switch (index_info->m_index_type) {
+ case Rdb_key_def::INDEX_TYPE_PRIMARY:
+ case Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY: {
+ error = index_info->m_kv_version >
+ Rdb_key_def::PRIMARY_FORMAT_VERSION_LATEST;
+ break;
+ }
+ case Rdb_key_def::INDEX_TYPE_SECONDARY:
+ error = index_info->m_kv_version >
+ Rdb_key_def::SECONDARY_FORMAT_VERSION_LATEST;
+ break;
+ default:
+ error = true;
+ break;
+ }
+ }
+
+ if (error) {
+ // NO_LINT_DEBUG
+ sql_print_error(
+ "RocksDB: Found invalid key version number (%u, %u, %u, %llu) "
+ "from data dictionary. This should never happen "
+ "and it may be a bug.",
+ index_info->m_index_dict_version, index_info->m_index_type,
+ index_info->m_kv_version, index_info->m_ttl_duration);
+ abort();
+ }
+
+ return found;
+}
+
+bool Rdb_dict_manager::get_cf_flags(const uint32_t cf_id,
+ uint32_t *const cf_flags) const {
+ DBUG_ASSERT(cf_flags != nullptr);
+
+ bool found = false;
+ std::string value;
+ Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer;
+
+ key_writer.write_uint32(Rdb_key_def::CF_DEFINITION);
+ key_writer.write_uint32(cf_id);
+
+ const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
+
+ if (status.ok()) {
+ const uchar *val = (const uchar *)value.c_str();
+ DBUG_ASSERT(val);
+
+ const uint16_t version = rdb_netbuf_to_uint16(val);
+
+ if (version == Rdb_key_def::CF_DEFINITION_VERSION) {
+ *cf_flags = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
+ found = true;
+ }
+ }
+
+ return found;
+}
+
+/*
+ Returning index ids that were marked as deleted (via DROP TABLE) but
+ still not removed by drop_index_thread yet, or indexes that are marked as
+ ongoing creation.
+ */
+void Rdb_dict_manager::get_ongoing_index_operation(
+ std::unordered_set<GL_INDEX_ID> *gl_index_ids,
+ Rdb_key_def::DATA_DICT_TYPE dd_type) const {
+ DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
+ dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
+
+ Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE> index_writer;
+ index_writer.write_uint32(dd_type);
+ const rocksdb::Slice index_slice = index_writer.to_slice();
+
+ rocksdb::Iterator *it = new_iterator();
+ for (it->Seek(index_slice); it->Valid(); it->Next()) {
+ rocksdb::Slice key = it->key();
+ const uchar *const ptr = (const uchar *)key.data();
+
+ /*
+ Ongoing drop/create index operations require key to be of the form:
+ dd_type + cf_id + index_id (== INDEX_NUMBER_SIZE * 3)
+
+ This may need to be changed in the future if we want to process a new
+ ddl_type with different format.
+ */
+ if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3 ||
+ rdb_netbuf_to_uint32(ptr) != dd_type) {
+ break;
+ }
+
+ // We don't check version right now since currently we always store only
+ // Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION = 1 as a value.
+ // If increasing version number, we need to add version check logic here.
+ GL_INDEX_ID gl_index_id;
+ gl_index_id.cf_id =
+ rdb_netbuf_to_uint32(ptr + Rdb_key_def::INDEX_NUMBER_SIZE);
+ gl_index_id.index_id =
+ rdb_netbuf_to_uint32(ptr + 2 * Rdb_key_def::INDEX_NUMBER_SIZE);
+ gl_index_ids->insert(gl_index_id);
+ }
+ delete it;
+}
+
+/*
+ Returning true if index_id is create/delete ongoing (undergoing creation or
+ marked as deleted via DROP TABLE but drop_index_thread has not wiped yet)
+ or not.
+ */
+bool Rdb_dict_manager::is_index_operation_ongoing(
+ const GL_INDEX_ID &gl_index_id, Rdb_key_def::DATA_DICT_TYPE dd_type) const {
+ DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
+ dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
+
+ bool found = false;
+ std::string value;
+ Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
+ dump_index_id(&key_writer, dd_type, gl_index_id);
+
+ const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
+ if (status.ok()) {
+ found = true;
+ }
+ return found;
+}
+
+/*
+ Adding index_id to data dictionary so that the index id is removed
+ by drop_index_thread, or to track online index creation.
+ */
+void Rdb_dict_manager::start_ongoing_index_operation(
+ rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
+ Rdb_key_def::DATA_DICT_TYPE dd_type) const {
+ DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
+ dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
+
+ Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
+ Rdb_buf_writer<Rdb_key_def::VERSION_SIZE> value_writer;
+
+ dump_index_id(&key_writer, dd_type, gl_index_id);
+
+ // version as needed
+ if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
+ value_writer.write_uint16(Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION);
+ } else {
+ value_writer.write_uint16(Rdb_key_def::DDL_CREATE_INDEX_ONGOING_VERSION);
+ }
+
+ batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
+}
+
+/*
+ Removing index_id from data dictionary to confirm drop_index_thread
+ completed dropping entire key/values of the index_id
+ */
+void Rdb_dict_manager::end_ongoing_index_operation(
+ rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
+ Rdb_key_def::DATA_DICT_TYPE dd_type) const {
+ DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
+ dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
+
+ delete_with_prefix(batch, dd_type, gl_index_id);
+}
+
+/*
+ Returning true if there is no target index ids to be removed
+ by drop_index_thread
+ */
+bool Rdb_dict_manager::is_drop_index_empty() const {
+ std::unordered_set<GL_INDEX_ID> gl_index_ids;
+ get_ongoing_drop_indexes(&gl_index_ids);
+ return gl_index_ids.empty();
+}
+
+/*
+ This function is supposed to be called by DROP TABLE. Logging messages
+ that dropping indexes started, and adding data dictionary so that
+ all associated indexes to be removed
+ */
+void Rdb_dict_manager::add_drop_table(
+ std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys,
+ rocksdb::WriteBatch *const batch) const {
+ std::unordered_set<GL_INDEX_ID> dropped_index_ids;
+ for (uint32 i = 0; i < n_keys; i++) {
+ dropped_index_ids.insert(key_descr[i]->get_gl_index_id());
+ }
+
+ add_drop_index(dropped_index_ids, batch);
+}
+
+/*
+ Called during inplace index drop operations. Logging messages
+ that dropping indexes started, and adding data dictionary so that
+ all associated indexes to be removed
+ */
+void Rdb_dict_manager::add_drop_index(
+ const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
+ rocksdb::WriteBatch *const batch) const {
+ for (const auto &gl_index_id : gl_index_ids) {
+ log_start_drop_index(gl_index_id, "Begin");
+ start_drop_index(batch, gl_index_id);
+ }
+}
+
+/*
+ Called during inplace index creation operations. Logging messages
+ that adding indexes started, and updates data dictionary with all associated
+ indexes to be added.
+ */
+void Rdb_dict_manager::add_create_index(
+ const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
+ rocksdb::WriteBatch *const batch) const {
+ for (const auto &gl_index_id : gl_index_ids) {
+ // NO_LINT_DEBUG
+ sql_print_verbose_info("RocksDB: Begin index creation (%u,%u)",
+ gl_index_id.cf_id, gl_index_id.index_id);
+ start_create_index(batch, gl_index_id);
+ }
+}
+
+/*
+ This function is supposed to be called by drop_index_thread, when it
+ finished dropping any index, or at the completion of online index creation.
+ */
+void Rdb_dict_manager::finish_indexes_operation(
+ const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
+ Rdb_key_def::DATA_DICT_TYPE dd_type) const {
+ DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
+ dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
+
+ const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
+ rocksdb::WriteBatch *const batch = wb.get();
+
+ std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
+ get_ongoing_create_indexes(&incomplete_create_indexes);
+
+ for (const auto &gl_index_id : gl_index_ids) {
+ if (is_index_operation_ongoing(gl_index_id, dd_type)) {
+ end_ongoing_index_operation(batch, gl_index_id, dd_type);
+
+ /*
+ Remove the corresponding incomplete create indexes from data
+ dictionary as well
+ */
+ if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
+ if (incomplete_create_indexes.count(gl_index_id)) {
+ end_ongoing_index_operation(batch, gl_index_id,
+ Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
+ }
+ }
+ }
+
+ if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
+ delete_index_info(batch, gl_index_id);
+ }
+ }
+ commit(batch);
+}
+
+/*
+ This function is supposed to be called when initializing
+ Rdb_dict_manager (at startup). If there is any index ids that are
+ drop ongoing, printing out messages for diagnostics purposes.
+ */
+void Rdb_dict_manager::resume_drop_indexes() const {
+ std::unordered_set<GL_INDEX_ID> gl_index_ids;
+ get_ongoing_drop_indexes(&gl_index_ids);
+
+ uint max_index_id_in_dict = 0;
+ get_max_index_id(&max_index_id_in_dict);
+
+ for (const auto &gl_index_id : gl_index_ids) {
+ log_start_drop_index(gl_index_id, "Resume");
+ if (max_index_id_in_dict < gl_index_id.index_id) {
+ // NO_LINT_DEBUG
+ sql_print_error(
+ "RocksDB: Found max index id %u from data dictionary "
+ "but also found dropped index id (%u,%u) from drop_index "
+ "dictionary. This should never happen and is possibly a "
+ "bug.",
+ max_index_id_in_dict, gl_index_id.cf_id, gl_index_id.index_id);
+ abort();
+ }
+ }
+}
+
+void Rdb_dict_manager::rollback_ongoing_index_creation() const {
+ const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
+ rocksdb::WriteBatch *const batch = wb.get();
+
+ std::unordered_set<GL_INDEX_ID> gl_index_ids;
+ get_ongoing_create_indexes(&gl_index_ids);
+
+ for (const auto &gl_index_id : gl_index_ids) {
+ // NO_LINT_DEBUG
+ sql_print_verbose_info("RocksDB: Removing incomplete create index (%u,%u)",
+ gl_index_id.cf_id, gl_index_id.index_id);
+
+ start_drop_index(batch, gl_index_id);
+ }
+
+ commit(batch);
+}
+
+void Rdb_dict_manager::log_start_drop_table(
+ const std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys,
+ const char *const log_action) const {
+ for (uint32 i = 0; i < n_keys; i++) {
+ log_start_drop_index(key_descr[i]->get_gl_index_id(), log_action);
+ }
+}
+
+void Rdb_dict_manager::log_start_drop_index(GL_INDEX_ID gl_index_id,
+ const char *log_action) const {
+ struct Rdb_index_info index_info;
+ if (!get_index_info(gl_index_id, &index_info)) {
+ /*
+ If we don't find the index info, it could be that it's because it was a
+ partially created index that isn't in the data dictionary yet that needs
+ to be rolled back.
+ */
+ std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
+ get_ongoing_create_indexes(&incomplete_create_indexes);
+
+ if (!incomplete_create_indexes.count(gl_index_id)) {
+ /* If it's not a partially created index, something is very wrong. */
+ // NO_LINT_DEBUG
+ sql_print_error(
+ "RocksDB: Failed to get column family info "
+ "from index id (%u,%u). MyRocks data dictionary may "
+ "get corrupted.",
+ gl_index_id.cf_id, gl_index_id.index_id);
+ if (rocksdb_ignore_datadic_errors)
+ {
+ sql_print_error("RocksDB: rocksdb_ignore_datadic_errors=1, "
+ "trying to continue");
+ return;
+ }
+ abort();
+ }
+ }
+}
+
+bool Rdb_dict_manager::get_max_index_id(uint32_t *const index_id) const {
+ bool found = false;
+ std::string value;
+
+ const rocksdb::Status status = get_value(m_key_slice_max_index_id, &value);
+ if (status.ok()) {
+ const uchar *const val = (const uchar *)value.c_str();
+ const uint16_t version = rdb_netbuf_to_uint16(val);
+ if (version == Rdb_key_def::MAX_INDEX_ID_VERSION) {
+ *index_id = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
+ found = true;
+ }
+ }
+ return found;
+}
+
+bool Rdb_dict_manager::update_max_index_id(rocksdb::WriteBatch *const batch,
+ const uint32_t index_id) const {
+ DBUG_ASSERT(batch != nullptr);
+
+ uint32_t old_index_id = -1;
+ if (get_max_index_id(&old_index_id)) {
+ if (old_index_id > index_id) {
+ // NO_LINT_DEBUG
+ sql_print_error(
+ "RocksDB: Found max index id %u from data dictionary "
+ "but trying to update to older value %u. This should "
+ "never happen and possibly a bug.",
+ old_index_id, index_id);
+ return true;
+ }
+ }
+
+ Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE>
+ value_writer;
+ value_writer.write_uint16(Rdb_key_def::MAX_INDEX_ID_VERSION);
+ value_writer.write_uint32(index_id);
+
+ batch->Put(m_system_cfh, m_key_slice_max_index_id, value_writer.to_slice());
+ return false;
+}
+
+void Rdb_dict_manager::add_stats(
+ rocksdb::WriteBatch *const batch,
+ const std::vector<Rdb_index_stats> &stats) const {
+ DBUG_ASSERT(batch != nullptr);
+
+ for (const auto &it : stats) {
+ Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
+ dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, it.m_gl_index_id);
+
+ // IndexStats::materialize takes complete care of serialization including
+ // storing the version
+ const auto value =
+ Rdb_index_stats::materialize(std::vector<Rdb_index_stats>{it});
+
+ batch->Put(m_system_cfh, key_writer.to_slice(), value);
+ }
+}
+
+Rdb_index_stats Rdb_dict_manager::get_stats(GL_INDEX_ID gl_index_id) const {
+ Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
+ dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
+
+ std::string value;
+ const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
+ if (status.ok()) {
+ std::vector<Rdb_index_stats> v;
+ // unmaterialize checks if the version matches
+ if (Rdb_index_stats::unmaterialize(value, &v) == 0 && v.size() == 1) {
+ return v[0];
+ }
+ }
+
+ return Rdb_index_stats();
+}
+
+rocksdb::Status Rdb_dict_manager::put_auto_incr_val(
+ rocksdb::WriteBatchBase *batch, const GL_INDEX_ID &gl_index_id,
+ ulonglong val, bool overwrite) const {
+ Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
+ dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id);
+
+ // Value is constructed by storing the version and the value.
+ Rdb_buf_writer<RDB_SIZEOF_AUTO_INCREMENT_VERSION +
+ ROCKSDB_SIZEOF_AUTOINC_VALUE>
+ value_writer;
+ value_writer.write_uint16(Rdb_key_def::AUTO_INCREMENT_VERSION);
+ value_writer.write_uint64(val);
+
+ if (overwrite) {
+ return batch->Put(m_system_cfh, key_writer.to_slice(),
+ value_writer.to_slice());
+ }
+ return batch->Merge(m_system_cfh, key_writer.to_slice(),
+ value_writer.to_slice());
+}
+
+bool Rdb_dict_manager::get_auto_incr_val(const GL_INDEX_ID &gl_index_id,
+ ulonglong *new_val) const {
+ Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
+ dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id);
+
+ std::string value;
+ const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
+
+ if (status.ok()) {
+ const uchar *const val = reinterpret_cast<const uchar *>(value.data());
+
+ if (rdb_netbuf_to_uint16(val) <= Rdb_key_def::AUTO_INCREMENT_VERSION) {
+ *new_val = rdb_netbuf_to_uint64(val + RDB_SIZEOF_AUTO_INCREMENT_VERSION);
+ return true;
+ }
+ }
+ return false;
+}
+
+uint Rdb_seq_generator::get_and_update_next_number(
+ Rdb_dict_manager *const dict) {
+ DBUG_ASSERT(dict != nullptr);
+
+ uint res;
+ RDB_MUTEX_LOCK_CHECK(m_mutex);
+
+ res = m_next_number++;
+
+ const std::unique_ptr<rocksdb::WriteBatch> wb = dict->begin();
+ rocksdb::WriteBatch *const batch = wb.get();
+
+ DBUG_ASSERT(batch != nullptr);
+ dict->update_max_index_id(batch, res);
+ dict->commit(batch);
+
+ RDB_MUTEX_UNLOCK_CHECK(m_mutex);
+
+ return res;
+}
+
+} // namespace myrocks