summaryrefslogtreecommitdiffstats
path: root/storage/rocksdb/rdb_converter.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/rocksdb/rdb_converter.cc')
-rw-r--r--storage/rocksdb/rdb_converter.cc838
1 files changed, 838 insertions, 0 deletions
diff --git a/storage/rocksdb/rdb_converter.cc b/storage/rocksdb/rdb_converter.cc
new file mode 100644
index 00000000..6f8aa306
--- /dev/null
+++ b/storage/rocksdb/rdb_converter.cc
@@ -0,0 +1,838 @@
+/*
+ Copyright (c) 2015, Facebook, Inc.
+
+ This program is f
+ i the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
+
+#include <my_global.h>
+
+/* This C++ file's header file */
+#include "./rdb_converter.h"
+
+/* Standard C++ header files */
+#include <algorithm>
+#include <map>
+#include <string>
+#include <vector>
+
+/* MySQL header files */
+#include "./field.h"
+#include "./key.h"
+#include "./m_ctype.h"
+#include "./my_bit.h"
+#include "./my_bitmap.h"
+#include "./sql_table.h"
+
+
+/* MyRocks header files */
+#include "./ha_rocksdb.h"
+#include "./ha_rocksdb_proto.h"
+#include "./my_stacktrace.h"
+#include "./rdb_cf_manager.h"
+#include "./rdb_psi.h"
+#include "./rdb_utils.h"
+
+
+namespace myrocks {
+
+void dbug_modify_key_varchar8(String *on_disk_rec) {
+ std::string res;
+ // The key starts with index number
+ res.append(on_disk_rec->ptr(), Rdb_key_def::INDEX_NUMBER_SIZE);
+
+ // Then, a mem-comparable form of a varchar(8) value.
+ res.append("ABCDE\0\0\0\xFC", 9);
+ on_disk_rec->length(0);
+ on_disk_rec->append(res.data(), res.size());
+}
+
+/*
+ Convert field from rocksdb storage format into Mysql Record format
+ @param buf OUT start memory to fill converted data
+ @param offset IN/OUT decoded data is stored in buf + offset
+ @param table IN current table
+ @param field IN current field
+ @param reader IN rocksdb value slice reader
+ @param decode IN whether to decode current field
+ @return
+ 0 OK
+ other HA_ERR error code (can be SE-specific)
+*/
+int Rdb_convert_to_record_value_decoder::decode(uchar *const buf, uint *offset,
+ TABLE *table,
+ my_core::Field *field,
+ Rdb_field_encoder *field_dec,
+ Rdb_string_reader *reader,
+ bool decode, bool is_null) {
+ int err = HA_EXIT_SUCCESS;
+
+ uint field_offset = field->ptr - table->record[0];
+ *offset = field_offset;
+ uint null_offset = field->null_offset();
+ bool maybe_null = field->real_maybe_null();
+ field->move_field(buf + field_offset,
+ maybe_null ? buf + null_offset : nullptr, field->null_bit);
+
+ if (is_null) {
+ if (decode) {
+ // This sets the NULL-bit of this record
+ field->set_null();
+ /*
+ Besides that, set the field value to default value. CHECKSUM TABLE
+ depends on this.
+ */
+ memcpy(field->ptr, table->s->default_values + field_offset,
+ field->pack_length());
+ }
+ } else {
+ if (decode) {
+ // sets non-null bits for this record
+ field->set_notnull();
+ }
+
+ if (field_dec->m_field_type == MYSQL_TYPE_BLOB) {
+ err = decode_blob(table, field, reader, decode);
+ } else if (field_dec->m_field_type == MYSQL_TYPE_VARCHAR) {
+ err = decode_varchar(field, reader, decode);
+ } else {
+ err = decode_fixed_length_field(field, field_dec, reader, decode);
+ }
+ }
+
+ // Restore field->ptr and field->null_ptr
+ field->move_field(table->record[0] + field_offset,
+ maybe_null ? table->record[0] + null_offset : nullptr,
+ field->null_bit);
+
+ return err;
+}
+
+/*
+ Convert blob from rocksdb storage format into Mysql Record format
+ @param table IN current table
+ @param field IN current field
+ @param reader IN rocksdb value slice reader
+ @param decode IN whether to decode current field
+ @return
+ 0 OK
+ other HA_ERR error code (can be SE-specific)
+*/
+int Rdb_convert_to_record_value_decoder::decode_blob(TABLE *table, Field *field,
+ Rdb_string_reader *reader,
+ bool decode) {
+ my_core::Field_blob *blob = (my_core::Field_blob *)field;
+
+ // Get the number of bytes needed to store length
+ const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr;
+
+ const char *data_len_str;
+ if (!(data_len_str = reader->read(length_bytes))) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+
+ memcpy(blob->ptr, data_len_str, length_bytes);
+ uint32 data_len =
+ blob->get_length(reinterpret_cast<const uchar *>(data_len_str),
+ length_bytes);
+ const char *blob_ptr;
+ if (!(blob_ptr = reader->read(data_len))) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+
+ if (decode) {
+ // set 8-byte pointer to 0, like innodb does (relevant for 32-bit
+ // platforms)
+ memset(blob->ptr + length_bytes, 0, 8);
+ memcpy(blob->ptr + length_bytes, &blob_ptr, sizeof(uchar **));
+ }
+
+ return HA_EXIT_SUCCESS;
+}
+
+/*
+ Convert fixed length field from rocksdb storage format into Mysql Record
+ format
+ @param field IN current field
+ @param field_dec IN data structure conttain field encoding data
+ @param reader IN rocksdb value slice reader
+ @param decode IN whether to decode current field
+ @return
+ 0 OK
+ other HA_ERR error code (can be SE-specific)
+*/
+int Rdb_convert_to_record_value_decoder::decode_fixed_length_field(
+ my_core::Field *const field, Rdb_field_encoder *field_dec,
+ Rdb_string_reader *const reader, bool decode) {
+ uint len = field_dec->m_pack_length_in_rec;
+ if (len > 0) {
+ const char *data_bytes;
+ if ((data_bytes = reader->read(len)) == nullptr) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+
+ if (decode) {
+ memcpy(field->ptr, data_bytes, len);
+ }
+ }
+
+ return HA_EXIT_SUCCESS;
+}
+
+/*
+ Convert varchar field from rocksdb storage format into Mysql Record format
+ @param field IN current field
+ @param field_dec IN data structure conttain field encoding data
+ @param reader IN rocksdb value slice reader
+ @param decode IN whether to decode current field
+ @return
+ 0 OK
+ other HA_ERR error code (can be SE-specific)
+*/
+int Rdb_convert_to_record_value_decoder::decode_varchar(
+ Field *field, Rdb_string_reader *const reader, bool decode) {
+ my_core::Field_varstring *const field_var = (my_core::Field_varstring *)field;
+
+ const char *data_len_str;
+ if (!(data_len_str = reader->read(field_var->length_bytes))) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+
+ uint data_len;
+ // field_var->length_bytes is 1 or 2
+ if (field_var->length_bytes == 1) {
+ data_len = (uchar)data_len_str[0];
+ } else {
+ DBUG_ASSERT(field_var->length_bytes == 2);
+ data_len = uint2korr(data_len_str);
+ }
+
+ if (data_len > field_var->field_length) {
+ // The data on disk is longer than table DDL allows?
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+
+ if (!reader->read(data_len)) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+
+ if (decode) {
+ memcpy(field_var->ptr, data_len_str, field_var->length_bytes + data_len);
+ }
+
+ return HA_EXIT_SUCCESS;
+}
+
+template <typename value_field_decoder>
+Rdb_value_field_iterator<value_field_decoder>::Rdb_value_field_iterator(
+ TABLE *table, Rdb_string_reader *value_slice_reader,
+ const Rdb_converter *rdb_converter, uchar *const buf)
+ : m_buf(buf) {
+ DBUG_ASSERT(table != nullptr);
+ DBUG_ASSERT(buf != nullptr);
+
+ m_table = table;
+ m_value_slice_reader = value_slice_reader;
+ auto fields = rdb_converter->get_decode_fields();
+ m_field_iter = fields->begin();
+ m_field_end = fields->end();
+ m_null_bytes = rdb_converter->get_null_bytes();
+ m_offset = 0;
+}
+
+// Iterate each requested field and decode one by one
+template <typename value_field_decoder>
+int Rdb_value_field_iterator<value_field_decoder>::next() {
+ int err = HA_EXIT_SUCCESS;
+ while (m_field_iter != m_field_end) {
+ m_field_dec = m_field_iter->m_field_enc;
+ bool decode = m_field_iter->m_decode;
+ bool maybe_null = m_field_dec->maybe_null();
+ // This is_null value is bind to how stroage format store its value
+ m_is_null = maybe_null && ((m_null_bytes[m_field_dec->m_null_offset] &
+ m_field_dec->m_null_mask) != 0);
+
+ // Skip the bytes we need to skip
+ int skip = m_field_iter->m_skip;
+ if (skip && !m_value_slice_reader->read(skip)) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+
+ m_field = m_table->field[m_field_dec->m_field_index];
+ // Decode each field
+ err = value_field_decoder::decode(m_buf, &m_offset, m_table, m_field,
+ m_field_dec, m_value_slice_reader, decode,
+ m_is_null);
+ if (err != HA_EXIT_SUCCESS) {
+ return err;
+ }
+ m_field_iter++;
+ // Only break for the field that are actually decoding rather than skipping
+ if (decode) {
+ break;
+ }
+ }
+ return err;
+}
+
+template <typename value_field_decoder>
+bool Rdb_value_field_iterator<value_field_decoder>::end_of_fields() const {
+ return m_field_iter == m_field_end;
+}
+
+template <typename value_field_decoder>
+Field *Rdb_value_field_iterator<value_field_decoder>::get_field() const {
+ DBUG_ASSERT(m_field != nullptr);
+ return m_field;
+}
+
+template <typename value_field_decoder>
+void *Rdb_value_field_iterator<value_field_decoder>::get_dst() const {
+ DBUG_ASSERT(m_buf != nullptr);
+ return m_buf + m_offset;
+}
+
+template <typename value_field_decoder>
+int Rdb_value_field_iterator<value_field_decoder>::get_field_index() const {
+ DBUG_ASSERT(m_field_dec != nullptr);
+ return m_field_dec->m_field_index;
+}
+
+template <typename value_field_decoder>
+enum_field_types Rdb_value_field_iterator<value_field_decoder>::get_field_type()
+ const {
+ DBUG_ASSERT(m_field_dec != nullptr);
+ return m_field_dec->m_field_type;
+}
+
+template <typename value_field_decoder>
+bool Rdb_value_field_iterator<value_field_decoder>::is_null() const {
+ DBUG_ASSERT(m_field != nullptr);
+ return m_is_null;
+}
+
+/*
+ Initialize Rdb_converter with table data
+ @param thd IN Thread context
+ @param tbl_def IN MyRocks table definition
+ @param table IN Current open table
+*/
+Rdb_converter::Rdb_converter(const THD *thd, const Rdb_tbl_def *tbl_def,
+ TABLE *table)
+ : m_thd(thd), m_tbl_def(tbl_def), m_table(table) {
+ DBUG_ASSERT(thd != nullptr);
+ DBUG_ASSERT(tbl_def != nullptr);
+ DBUG_ASSERT(table != nullptr);
+
+ m_key_requested = false;
+ m_verify_row_debug_checksums = false;
+ m_maybe_unpack_info = false;
+ m_row_checksums_checked = 0;
+ m_null_bytes = nullptr;
+ setup_field_encoders();
+}
+
+Rdb_converter::~Rdb_converter() {
+ my_free(m_encoder_arr);
+ m_encoder_arr = nullptr;
+ // These are needed to suppress valgrind errors in rocksdb.partition
+ m_storage_record.free();
+}
+
+/*
+ Decide storage type for each encoder
+*/
+void Rdb_converter::get_storage_type(Rdb_field_encoder *const encoder,
+ const uint kp) {
+ auto pk_descr =
+ m_tbl_def->m_key_descr_arr[ha_rocksdb::pk_index(m_table, m_tbl_def)];
+ // STORE_SOME uses unpack_info.
+ if (pk_descr->has_unpack_info(kp)) {
+ DBUG_ASSERT(pk_descr->can_unpack(kp));
+ encoder->m_storage_type = Rdb_field_encoder::STORE_SOME;
+ m_maybe_unpack_info = true;
+ } else if (pk_descr->can_unpack(kp)) {
+ encoder->m_storage_type = Rdb_field_encoder::STORE_NONE;
+ }
+}
+
+/*
+ @brief
+ Setup which fields will be unpacked when reading rows
+
+ @detail
+ Three special cases when we still unpack all fields:
+ - When client requires decode_all_fields, such as this table is being
+ updated (m_lock_rows==RDB_LOCK_WRITE).
+ - When @@rocksdb_verify_row_debug_checksums is ON (In this mode, we need to
+ read all fields to find whether there is a row checksum at the end. We could
+ skip the fields instead of decoding them, but currently we do decoding.)
+ - On index merge as bitmap is cleared during that operation
+
+ @seealso
+ Rdb_converter::setup_field_encoders()
+ Rdb_converter::convert_record_from_storage_format()
+*/
+void Rdb_converter::setup_field_decoders(const MY_BITMAP *field_map,
+ bool decode_all_fields) {
+ m_key_requested = false;
+ m_decoders_vect.clear();
+ int last_useful = 0;
+ int skip_size = 0;
+
+ for (uint i = 0; i < m_table->s->fields; i++) {
+ // bitmap is cleared on index merge, but it still needs to decode columns
+ bool field_requested =
+ decode_all_fields || m_verify_row_debug_checksums ||
+ bitmap_is_clear_all(field_map) ||
+ bitmap_is_set(field_map, m_table->field[i]->field_index);
+
+ // We only need the decoder if the whole record is stored.
+ if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL) {
+ // the field potentially needs unpacking
+ if (field_requested) {
+ // the field is in the read set
+ m_key_requested = true;
+ }
+ continue;
+ }
+
+ if (field_requested) {
+ // We will need to decode this field
+ m_decoders_vect.push_back({&m_encoder_arr[i], true, skip_size});
+ last_useful = m_decoders_vect.size();
+ skip_size = 0;
+ } else {
+ if (m_encoder_arr[i].uses_variable_len_encoding() ||
+ m_encoder_arr[i].maybe_null()) {
+ // For variable-length field, we need to read the data and skip it
+ m_decoders_vect.push_back({&m_encoder_arr[i], false, skip_size});
+ skip_size = 0;
+ } else {
+ // Fixed-width field can be skipped without looking at it.
+ // Add appropriate skip_size to the next field.
+ skip_size += m_encoder_arr[i].m_pack_length_in_rec;
+ }
+ }
+ }
+
+ // It could be that the last few elements are varchars that just do
+ // skipping. Remove them.
+ m_decoders_vect.erase(m_decoders_vect.begin() + last_useful,
+ m_decoders_vect.end());
+}
+
+void Rdb_converter::setup_field_encoders() {
+ uint null_bytes_length = 0;
+ uchar cur_null_mask = 0x1;
+
+ m_encoder_arr = static_cast<Rdb_field_encoder *>(
+ my_malloc(PSI_INSTRUMENT_ME, m_table->s->fields * sizeof(Rdb_field_encoder), MYF(0)));
+ if (m_encoder_arr == nullptr) {
+ return;
+ }
+
+ for (uint i = 0; i < m_table->s->fields; i++) {
+ Field *const field = m_table->field[i];
+ m_encoder_arr[i].m_storage_type = Rdb_field_encoder::STORE_ALL;
+
+ /*
+ Check if this field is
+ - a part of primary key, and
+ - it can be decoded back from its key image.
+ If both hold, we don't need to store this field in the value part of
+ RocksDB's key-value pair.
+
+ If hidden pk exists, we skip this check since the field will never be
+ part of the hidden pk.
+ */
+ if (!Rdb_key_def::table_has_hidden_pk(m_table)) {
+ KEY *const pk_info = &m_table->key_info[m_table->s->primary_key];
+ for (uint kp = 0; kp < pk_info->user_defined_key_parts; kp++) {
+ // key_part->fieldnr is counted from 1
+ if (field->field_index + 1 == pk_info->key_part[kp].fieldnr) {
+ get_storage_type(&m_encoder_arr[i], kp);
+ break;
+ }
+ }
+ }
+
+ m_encoder_arr[i].m_field_type = field->real_type();
+ m_encoder_arr[i].m_field_index = i;
+ m_encoder_arr[i].m_pack_length_in_rec = field->pack_length_in_rec();
+
+ if (field->real_maybe_null()) {
+ m_encoder_arr[i].m_null_mask = cur_null_mask;
+ m_encoder_arr[i].m_null_offset = null_bytes_length;
+ if (cur_null_mask == 0x80) {
+ cur_null_mask = 0x1;
+ null_bytes_length++;
+ } else {
+ cur_null_mask = cur_null_mask << 1;
+ }
+ } else {
+ m_encoder_arr[i].m_null_mask = 0;
+ }
+ }
+
+ // Count the last, unfinished NULL-bits byte
+ if (cur_null_mask != 0x1) {
+ null_bytes_length++;
+ }
+
+ m_null_bytes_length_in_record = null_bytes_length;
+}
+
+/*
+ EntryPoint for Decode:
+ Decode key slice(if requested) and value slice using built-in field
+ decoders
+ @param key_def IN key definition to decode
+ @param dst OUT Mysql buffer to fill decoded content
+ @param key_slice IN RocksDB key slice to decode
+ @param value_slice IN RocksDB value slice to decode
+ @return
+ 0 OK
+ other HA_ERR error code (can be SE-specific)
+*/
+int Rdb_converter::decode(const std::shared_ptr<Rdb_key_def> &key_def,
+ uchar *dst, // address to fill data
+ const rocksdb::Slice *key_slice,
+ const rocksdb::Slice *value_slice) {
+ // Currently only support decode primary key, Will add decode secondary later
+ DBUG_ASSERT(key_def->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
+ key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
+
+ const rocksdb::Slice *updated_key_slice = key_slice;
+#ifndef DBUG_OFF
+ String last_rowkey;
+ last_rowkey.copy(key_slice->data(), key_slice->size(), &my_charset_bin);
+ DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_read1",
+ { dbug_modify_key_varchar8(&last_rowkey); });
+ rocksdb::Slice rowkey_slice(last_rowkey.ptr(), last_rowkey.length());
+ updated_key_slice = &rowkey_slice;
+#endif
+ return convert_record_from_storage_format(key_def, updated_key_slice,
+ value_slice, dst);
+}
+
+/*
+ Decode value slice header
+ @param reader IN value slice reader
+ @param pk_def IN key definition to decode
+ @param unpack_slice OUT unpack info slice
+ @return
+ 0 OK
+ other HA_ERR error code (can be SE-specific)
+*/
+int Rdb_converter::decode_value_header(
+ Rdb_string_reader *reader, const std::shared_ptr<Rdb_key_def> &pk_def,
+ rocksdb::Slice *unpack_slice) {
+ /* If it's a TTL record, skip the 8 byte TTL value */
+ if (pk_def->has_ttl()) {
+ const char *ttl_bytes;
+ if ((ttl_bytes = reader->read(ROCKSDB_SIZEOF_TTL_RECORD))) {
+ memcpy(m_ttl_bytes, ttl_bytes, ROCKSDB_SIZEOF_TTL_RECORD);
+ } else {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+ }
+
+ /* Other fields are decoded from the value */
+ if (m_null_bytes_length_in_record &&
+ !(m_null_bytes = reader->read(m_null_bytes_length_in_record))) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+
+ if (m_maybe_unpack_info) {
+ const char *unpack_info = reader->get_current_ptr();
+ if (!unpack_info || !Rdb_key_def::is_unpack_data_tag(unpack_info[0]) ||
+ !reader->read(Rdb_key_def::get_unpack_header_size(unpack_info[0]))) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+
+ uint16 unpack_info_len =
+ rdb_netbuf_to_uint16(reinterpret_cast<const uchar *>(unpack_info + 1));
+ *unpack_slice = rocksdb::Slice(unpack_info, unpack_info_len);
+
+ reader->read(unpack_info_len -
+ Rdb_key_def::get_unpack_header_size(unpack_info[0]));
+ }
+
+ return HA_EXIT_SUCCESS;
+}
+
+/*
+ Convert RocksDb key slice and value slice to Mysql format
+ @param key_def IN key definition to decode
+ @param key_slice IN RocksDB key slice
+ @param value_slice IN RocksDB value slice
+ @param dst OUT MySql format address
+ @return
+ 0 OK
+ other HA_ERR error code (can be SE-specific)
+*/
+int Rdb_converter::convert_record_from_storage_format(
+ const std::shared_ptr<Rdb_key_def> &pk_def,
+ const rocksdb::Slice *const key_slice,
+ const rocksdb::Slice *const value_slice, uchar *const dst) {
+ int err = HA_EXIT_SUCCESS;
+
+ Rdb_string_reader value_slice_reader(value_slice);
+ rocksdb::Slice unpack_slice;
+ err = decode_value_header(&value_slice_reader, pk_def, &unpack_slice);
+ if (err != HA_EXIT_SUCCESS) {
+ return err;
+ }
+
+ /*
+ Decode PK fields from the key
+ */
+ if (m_key_requested) {
+ err = pk_def->unpack_record(m_table, dst, key_slice,
+ !unpack_slice.empty() ? &unpack_slice : nullptr,
+ false /* verify_checksum */);
+ }
+ if (err != HA_EXIT_SUCCESS) {
+ return err;
+ }
+
+ Rdb_value_field_iterator<Rdb_convert_to_record_value_decoder>
+ value_field_iterator(m_table, &value_slice_reader, this, dst);
+
+ // Decode value slices
+ while (!value_field_iterator.end_of_fields()) {
+ err = value_field_iterator.next();
+
+ if (err != HA_EXIT_SUCCESS) {
+ return err;
+ }
+ }
+
+ if (m_verify_row_debug_checksums) {
+ return verify_row_debug_checksum(pk_def, &value_slice_reader, key_slice,
+ value_slice);
+ }
+ return HA_EXIT_SUCCESS;
+}
+
+/*
+ Verify checksum for row
+ @param pk_def IN key def
+ @param reader IN RocksDB value slice reader
+ @param key IN RocksDB key slice
+ @param value IN RocksDB value slice
+ @return
+ 0 OK
+ other HA_ERR error code (can be SE-specific)
+*/
+int Rdb_converter::verify_row_debug_checksum(
+ const std::shared_ptr<Rdb_key_def> &pk_def, Rdb_string_reader *reader,
+ const rocksdb::Slice *key, const rocksdb::Slice *value) {
+ if (reader->remaining_bytes() == RDB_CHECKSUM_CHUNK_SIZE &&
+ reader->read(1)[0] == RDB_CHECKSUM_DATA_TAG) {
+ uint32_t stored_key_chksum =
+ rdb_netbuf_to_uint32((const uchar *)reader->read(RDB_CHECKSUM_SIZE));
+ uint32_t stored_val_chksum =
+ rdb_netbuf_to_uint32((const uchar *)reader->read(RDB_CHECKSUM_SIZE));
+
+ const uint32_t computed_key_chksum =
+ my_core::my_checksum(0, rdb_slice_to_uchar_ptr(key), key->size());
+ const uint32_t computed_val_chksum =
+ my_core::my_checksum(0, rdb_slice_to_uchar_ptr(value),
+ value->size() - RDB_CHECKSUM_CHUNK_SIZE);
+
+ DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum1", stored_key_chksum++;);
+
+ if (stored_key_chksum != computed_key_chksum) {
+ pk_def->report_checksum_mismatch(true, key->data(), key->size());
+ return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
+ }
+
+ DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum2", stored_val_chksum++;);
+ if (stored_val_chksum != computed_val_chksum) {
+ pk_def->report_checksum_mismatch(false, value->data(), value->size());
+ return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
+ }
+
+ m_row_checksums_checked++;
+ }
+ if (reader->remaining_bytes()) {
+ return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
+ return HA_EXIT_SUCCESS;
+}
+
+/**
+ Convert record from table->record[0] form into a form that can be written
+ into rocksdb.
+
+ @param pk_def IN Current key def
+ @pk_unpack_info IN Unpack info generated during key pack
+ @is_update_row IN Whether it is update row
+ @store_row_debug_checksums IN Whether to store checksums
+ @param ttl_bytes IN/OUT Old ttl value from previous record and
+ ttl value during current encode
+ @is_ttl_bytes_updated OUT Whether ttl bytes is updated
+ @param value_slice OUT Data slice with record data.
+*/
+int Rdb_converter::encode_value_slice(
+ const std::shared_ptr<Rdb_key_def> &pk_def,
+ const rocksdb::Slice &pk_packed_slice, Rdb_string_writer *pk_unpack_info,
+ bool is_update_row, bool store_row_debug_checksums, char *ttl_bytes,
+ bool *is_ttl_bytes_updated, rocksdb::Slice *const value_slice) {
+ DBUG_ASSERT(pk_def != nullptr);
+ // Currently only primary key will store value slice
+ DBUG_ASSERT(pk_def->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
+ pk_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
+ DBUG_ASSERT_IMP(m_maybe_unpack_info, pk_unpack_info);
+
+ bool has_ttl = pk_def->has_ttl();
+ bool has_ttl_column = !pk_def->m_ttl_column.empty();
+
+ m_storage_record.length(0);
+
+ if (has_ttl) {
+ /* If it's a TTL record, reserve space for 8 byte TTL value in front. */
+ m_storage_record.fill(
+ ROCKSDB_SIZEOF_TTL_RECORD + m_null_bytes_length_in_record, 0);
+ // NOTE: is_ttl_bytes_updated is only used for update case
+ // During update, skip update sk key/values slice iff none of sk fields
+ // have changed and ttl bytes isn't changed. see
+ // ha_rocksdb::update_write_sk() for more info
+ *is_ttl_bytes_updated = false;
+ char *const data = const_cast<char *>(m_storage_record.ptr());
+ if (has_ttl_column) {
+ DBUG_ASSERT(pk_def->get_ttl_field_index() != UINT_MAX);
+ Field *const field = m_table->field[pk_def->get_ttl_field_index()];
+ DBUG_ASSERT(field->pack_length_in_rec() == ROCKSDB_SIZEOF_TTL_RECORD);
+ DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG);
+
+ uint64 ts = uint8korr(field->ptr);
+#ifndef DBUG_OFF
+ ts += rdb_dbug_set_ttl_rec_ts();
+#endif
+ rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts);
+ if (is_update_row) {
+ *is_ttl_bytes_updated =
+ memcmp(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
+ }
+ // Also store in m_ttl_bytes to propagate to update_write_sk
+ memcpy(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
+ } else {
+ /*
+ For implicitly generated TTL records we need to copy over the old
+ TTL value from the old record in the event of an update. It was stored
+ in m_ttl_bytes.
+
+ Otherwise, generate a timestamp using the current time.
+ */
+ if (is_update_row) {
+ memcpy(data, ttl_bytes, sizeof(uint64));
+ } else {
+ uint64 ts = static_cast<uint64>(std::time(nullptr));
+#ifndef DBUG_OFF
+ ts += rdb_dbug_set_ttl_rec_ts();
+#endif
+ rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts);
+ // Also store in m_ttl_bytes to propagate to update_write_sk
+ memcpy(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
+ }
+ }
+ } else {
+ /* All NULL bits are initially 0 */
+ m_storage_record.fill(m_null_bytes_length_in_record, 0);
+ }
+
+ // If a primary key may have non-empty unpack_info for certain values,
+ // (m_maybe_unpack_info=TRUE), we write the unpack_info block. The block
+ // itself was prepared in Rdb_key_def::pack_record.
+ if (m_maybe_unpack_info) {
+ m_storage_record.append(reinterpret_cast<char *>(pk_unpack_info->ptr()),
+ pk_unpack_info->get_current_pos());
+ }
+ for (uint i = 0; i < m_table->s->fields; i++) {
+ Rdb_field_encoder &encoder = m_encoder_arr[i];
+ /* Don't pack decodable PK key parts */
+ if (encoder.m_storage_type != Rdb_field_encoder::STORE_ALL) {
+ continue;
+ }
+
+ Field *const field = m_table->field[i];
+ if (encoder.maybe_null()) {
+ char *data = const_cast<char *>(m_storage_record.ptr());
+ if (has_ttl) {
+ data += ROCKSDB_SIZEOF_TTL_RECORD;
+ }
+
+ if (field->is_null()) {
+ data[encoder.m_null_offset] |= encoder.m_null_mask;
+ /* Don't write anything for NULL values */
+ continue;
+ }
+ }
+
+ if (encoder.m_field_type == MYSQL_TYPE_BLOB) {
+ my_core::Field_blob *blob =
+ reinterpret_cast<my_core::Field_blob *>(field);
+ /* Get the number of bytes needed to store length*/
+ const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr;
+
+ /* Store the length of the value */
+ m_storage_record.append(reinterpret_cast<char *>(blob->ptr),
+ length_bytes);
+
+ /* Store the blob value itself */
+ char *data_ptr;
+ memcpy(&data_ptr, blob->ptr + length_bytes, sizeof(uchar **));
+ m_storage_record.append(data_ptr, blob->get_length());
+ } else if (encoder.m_field_type == MYSQL_TYPE_VARCHAR) {
+ Field_varstring *const field_var =
+ reinterpret_cast<Field_varstring *>(field);
+ uint data_len;
+ /* field_var->length_bytes is 1 or 2 */
+ if (field_var->length_bytes == 1) {
+ data_len = field_var->ptr[0];
+ } else {
+ DBUG_ASSERT(field_var->length_bytes == 2);
+ data_len = uint2korr(field_var->ptr);
+ }
+ m_storage_record.append(reinterpret_cast<char *>(field_var->ptr),
+ field_var->length_bytes + data_len);
+ } else {
+ /* Copy the field data */
+ const uint len = field->pack_length_in_rec();
+ m_storage_record.append(reinterpret_cast<char *>(field->ptr), len);
+ }
+ }
+
+ if (store_row_debug_checksums) {
+ const uint32_t key_crc32 = my_core::my_checksum(
+ 0, rdb_slice_to_uchar_ptr(&pk_packed_slice), pk_packed_slice.size());
+ const uint32_t val_crc32 =
+ my_core::my_checksum(0, rdb_mysql_str_to_uchar_str(&m_storage_record),
+ m_storage_record.length());
+ uchar key_crc_buf[RDB_CHECKSUM_SIZE];
+ uchar val_crc_buf[RDB_CHECKSUM_SIZE];
+ rdb_netbuf_store_uint32(key_crc_buf, key_crc32);
+ rdb_netbuf_store_uint32(val_crc_buf, val_crc32);
+ m_storage_record.append((const char *)&RDB_CHECKSUM_DATA_TAG, 1);
+ m_storage_record.append((const char *)key_crc_buf, RDB_CHECKSUM_SIZE);
+ m_storage_record.append((const char *)val_crc_buf, RDB_CHECKSUM_SIZE);
+ }
+
+ *value_slice =
+ rocksdb::Slice(m_storage_record.ptr(), m_storage_record.length());
+
+ return HA_EXIT_SUCCESS;
+}
+} // namespace myrocks