diff options
Diffstat (limited to 'storage/rocksdb/rdb_converter.cc')
-rw-r--r-- | storage/rocksdb/rdb_converter.cc | 838 |
1 files changed, 838 insertions, 0 deletions
diff --git a/storage/rocksdb/rdb_converter.cc b/storage/rocksdb/rdb_converter.cc new file mode 100644 index 00000000..6f8aa306 --- /dev/null +++ b/storage/rocksdb/rdb_converter.cc @@ -0,0 +1,838 @@ +/* + Copyright (c) 2015, Facebook, Inc. + + This program is f + i the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */ + +#include <my_global.h> + +/* This C++ file's header file */ +#include "./rdb_converter.h" + +/* Standard C++ header files */ +#include <algorithm> +#include <map> +#include <string> +#include <vector> + +/* MySQL header files */ +#include "./field.h" +#include "./key.h" +#include "./m_ctype.h" +#include "./my_bit.h" +#include "./my_bitmap.h" +#include "./sql_table.h" + + +/* MyRocks header files */ +#include "./ha_rocksdb.h" +#include "./ha_rocksdb_proto.h" +#include "./my_stacktrace.h" +#include "./rdb_cf_manager.h" +#include "./rdb_psi.h" +#include "./rdb_utils.h" + + +namespace myrocks { + +void dbug_modify_key_varchar8(String *on_disk_rec) { + std::string res; + // The key starts with index number + res.append(on_disk_rec->ptr(), Rdb_key_def::INDEX_NUMBER_SIZE); + + // Then, a mem-comparable form of a varchar(8) value. + res.append("ABCDE\0\0\0\xFC", 9); + on_disk_rec->length(0); + on_disk_rec->append(res.data(), res.size()); +} + +/* + Convert field from rocksdb storage format into Mysql Record format + @param buf OUT start memory to fill converted data + @param offset IN/OUT decoded data is stored in buf + offset + @param table IN current table + @param field IN current field + @param reader IN rocksdb value slice reader + @param decode IN whether to decode current field + @return + 0 OK + other HA_ERR error code (can be SE-specific) +*/ +int Rdb_convert_to_record_value_decoder::decode(uchar *const buf, uint *offset, + TABLE *table, + my_core::Field *field, + Rdb_field_encoder *field_dec, + Rdb_string_reader *reader, + bool decode, bool is_null) { + int err = HA_EXIT_SUCCESS; + + uint field_offset = field->ptr - table->record[0]; + *offset = field_offset; + uint null_offset = field->null_offset(); + bool maybe_null = field->real_maybe_null(); + field->move_field(buf + field_offset, + maybe_null ? buf + null_offset : nullptr, field->null_bit); + + if (is_null) { + if (decode) { + // This sets the NULL-bit of this record + field->set_null(); + /* + Besides that, set the field value to default value. CHECKSUM TABLE + depends on this. + */ + memcpy(field->ptr, table->s->default_values + field_offset, + field->pack_length()); + } + } else { + if (decode) { + // sets non-null bits for this record + field->set_notnull(); + } + + if (field_dec->m_field_type == MYSQL_TYPE_BLOB) { + err = decode_blob(table, field, reader, decode); + } else if (field_dec->m_field_type == MYSQL_TYPE_VARCHAR) { + err = decode_varchar(field, reader, decode); + } else { + err = decode_fixed_length_field(field, field_dec, reader, decode); + } + } + + // Restore field->ptr and field->null_ptr + field->move_field(table->record[0] + field_offset, + maybe_null ? table->record[0] + null_offset : nullptr, + field->null_bit); + + return err; +} + +/* + Convert blob from rocksdb storage format into Mysql Record format + @param table IN current table + @param field IN current field + @param reader IN rocksdb value slice reader + @param decode IN whether to decode current field + @return + 0 OK + other HA_ERR error code (can be SE-specific) +*/ +int Rdb_convert_to_record_value_decoder::decode_blob(TABLE *table, Field *field, + Rdb_string_reader *reader, + bool decode) { + my_core::Field_blob *blob = (my_core::Field_blob *)field; + + // Get the number of bytes needed to store length + const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr; + + const char *data_len_str; + if (!(data_len_str = reader->read(length_bytes))) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + memcpy(blob->ptr, data_len_str, length_bytes); + uint32 data_len = + blob->get_length(reinterpret_cast<const uchar *>(data_len_str), + length_bytes); + const char *blob_ptr; + if (!(blob_ptr = reader->read(data_len))) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + if (decode) { + // set 8-byte pointer to 0, like innodb does (relevant for 32-bit + // platforms) + memset(blob->ptr + length_bytes, 0, 8); + memcpy(blob->ptr + length_bytes, &blob_ptr, sizeof(uchar **)); + } + + return HA_EXIT_SUCCESS; +} + +/* + Convert fixed length field from rocksdb storage format into Mysql Record + format + @param field IN current field + @param field_dec IN data structure conttain field encoding data + @param reader IN rocksdb value slice reader + @param decode IN whether to decode current field + @return + 0 OK + other HA_ERR error code (can be SE-specific) +*/ +int Rdb_convert_to_record_value_decoder::decode_fixed_length_field( + my_core::Field *const field, Rdb_field_encoder *field_dec, + Rdb_string_reader *const reader, bool decode) { + uint len = field_dec->m_pack_length_in_rec; + if (len > 0) { + const char *data_bytes; + if ((data_bytes = reader->read(len)) == nullptr) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + if (decode) { + memcpy(field->ptr, data_bytes, len); + } + } + + return HA_EXIT_SUCCESS; +} + +/* + Convert varchar field from rocksdb storage format into Mysql Record format + @param field IN current field + @param field_dec IN data structure conttain field encoding data + @param reader IN rocksdb value slice reader + @param decode IN whether to decode current field + @return + 0 OK + other HA_ERR error code (can be SE-specific) +*/ +int Rdb_convert_to_record_value_decoder::decode_varchar( + Field *field, Rdb_string_reader *const reader, bool decode) { + my_core::Field_varstring *const field_var = (my_core::Field_varstring *)field; + + const char *data_len_str; + if (!(data_len_str = reader->read(field_var->length_bytes))) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + uint data_len; + // field_var->length_bytes is 1 or 2 + if (field_var->length_bytes == 1) { + data_len = (uchar)data_len_str[0]; + } else { + DBUG_ASSERT(field_var->length_bytes == 2); + data_len = uint2korr(data_len_str); + } + + if (data_len > field_var->field_length) { + // The data on disk is longer than table DDL allows? + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + if (!reader->read(data_len)) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + if (decode) { + memcpy(field_var->ptr, data_len_str, field_var->length_bytes + data_len); + } + + return HA_EXIT_SUCCESS; +} + +template <typename value_field_decoder> +Rdb_value_field_iterator<value_field_decoder>::Rdb_value_field_iterator( + TABLE *table, Rdb_string_reader *value_slice_reader, + const Rdb_converter *rdb_converter, uchar *const buf) + : m_buf(buf) { + DBUG_ASSERT(table != nullptr); + DBUG_ASSERT(buf != nullptr); + + m_table = table; + m_value_slice_reader = value_slice_reader; + auto fields = rdb_converter->get_decode_fields(); + m_field_iter = fields->begin(); + m_field_end = fields->end(); + m_null_bytes = rdb_converter->get_null_bytes(); + m_offset = 0; +} + +// Iterate each requested field and decode one by one +template <typename value_field_decoder> +int Rdb_value_field_iterator<value_field_decoder>::next() { + int err = HA_EXIT_SUCCESS; + while (m_field_iter != m_field_end) { + m_field_dec = m_field_iter->m_field_enc; + bool decode = m_field_iter->m_decode; + bool maybe_null = m_field_dec->maybe_null(); + // This is_null value is bind to how stroage format store its value + m_is_null = maybe_null && ((m_null_bytes[m_field_dec->m_null_offset] & + m_field_dec->m_null_mask) != 0); + + // Skip the bytes we need to skip + int skip = m_field_iter->m_skip; + if (skip && !m_value_slice_reader->read(skip)) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + m_field = m_table->field[m_field_dec->m_field_index]; + // Decode each field + err = value_field_decoder::decode(m_buf, &m_offset, m_table, m_field, + m_field_dec, m_value_slice_reader, decode, + m_is_null); + if (err != HA_EXIT_SUCCESS) { + return err; + } + m_field_iter++; + // Only break for the field that are actually decoding rather than skipping + if (decode) { + break; + } + } + return err; +} + +template <typename value_field_decoder> +bool Rdb_value_field_iterator<value_field_decoder>::end_of_fields() const { + return m_field_iter == m_field_end; +} + +template <typename value_field_decoder> +Field *Rdb_value_field_iterator<value_field_decoder>::get_field() const { + DBUG_ASSERT(m_field != nullptr); + return m_field; +} + +template <typename value_field_decoder> +void *Rdb_value_field_iterator<value_field_decoder>::get_dst() const { + DBUG_ASSERT(m_buf != nullptr); + return m_buf + m_offset; +} + +template <typename value_field_decoder> +int Rdb_value_field_iterator<value_field_decoder>::get_field_index() const { + DBUG_ASSERT(m_field_dec != nullptr); + return m_field_dec->m_field_index; +} + +template <typename value_field_decoder> +enum_field_types Rdb_value_field_iterator<value_field_decoder>::get_field_type() + const { + DBUG_ASSERT(m_field_dec != nullptr); + return m_field_dec->m_field_type; +} + +template <typename value_field_decoder> +bool Rdb_value_field_iterator<value_field_decoder>::is_null() const { + DBUG_ASSERT(m_field != nullptr); + return m_is_null; +} + +/* + Initialize Rdb_converter with table data + @param thd IN Thread context + @param tbl_def IN MyRocks table definition + @param table IN Current open table +*/ +Rdb_converter::Rdb_converter(const THD *thd, const Rdb_tbl_def *tbl_def, + TABLE *table) + : m_thd(thd), m_tbl_def(tbl_def), m_table(table) { + DBUG_ASSERT(thd != nullptr); + DBUG_ASSERT(tbl_def != nullptr); + DBUG_ASSERT(table != nullptr); + + m_key_requested = false; + m_verify_row_debug_checksums = false; + m_maybe_unpack_info = false; + m_row_checksums_checked = 0; + m_null_bytes = nullptr; + setup_field_encoders(); +} + +Rdb_converter::~Rdb_converter() { + my_free(m_encoder_arr); + m_encoder_arr = nullptr; + // These are needed to suppress valgrind errors in rocksdb.partition + m_storage_record.free(); +} + +/* + Decide storage type for each encoder +*/ +void Rdb_converter::get_storage_type(Rdb_field_encoder *const encoder, + const uint kp) { + auto pk_descr = + m_tbl_def->m_key_descr_arr[ha_rocksdb::pk_index(m_table, m_tbl_def)]; + // STORE_SOME uses unpack_info. + if (pk_descr->has_unpack_info(kp)) { + DBUG_ASSERT(pk_descr->can_unpack(kp)); + encoder->m_storage_type = Rdb_field_encoder::STORE_SOME; + m_maybe_unpack_info = true; + } else if (pk_descr->can_unpack(kp)) { + encoder->m_storage_type = Rdb_field_encoder::STORE_NONE; + } +} + +/* + @brief + Setup which fields will be unpacked when reading rows + + @detail + Three special cases when we still unpack all fields: + - When client requires decode_all_fields, such as this table is being + updated (m_lock_rows==RDB_LOCK_WRITE). + - When @@rocksdb_verify_row_debug_checksums is ON (In this mode, we need to + read all fields to find whether there is a row checksum at the end. We could + skip the fields instead of decoding them, but currently we do decoding.) + - On index merge as bitmap is cleared during that operation + + @seealso + Rdb_converter::setup_field_encoders() + Rdb_converter::convert_record_from_storage_format() +*/ +void Rdb_converter::setup_field_decoders(const MY_BITMAP *field_map, + bool decode_all_fields) { + m_key_requested = false; + m_decoders_vect.clear(); + int last_useful = 0; + int skip_size = 0; + + for (uint i = 0; i < m_table->s->fields; i++) { + // bitmap is cleared on index merge, but it still needs to decode columns + bool field_requested = + decode_all_fields || m_verify_row_debug_checksums || + bitmap_is_clear_all(field_map) || + bitmap_is_set(field_map, m_table->field[i]->field_index); + + // We only need the decoder if the whole record is stored. + if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL) { + // the field potentially needs unpacking + if (field_requested) { + // the field is in the read set + m_key_requested = true; + } + continue; + } + + if (field_requested) { + // We will need to decode this field + m_decoders_vect.push_back({&m_encoder_arr[i], true, skip_size}); + last_useful = m_decoders_vect.size(); + skip_size = 0; + } else { + if (m_encoder_arr[i].uses_variable_len_encoding() || + m_encoder_arr[i].maybe_null()) { + // For variable-length field, we need to read the data and skip it + m_decoders_vect.push_back({&m_encoder_arr[i], false, skip_size}); + skip_size = 0; + } else { + // Fixed-width field can be skipped without looking at it. + // Add appropriate skip_size to the next field. + skip_size += m_encoder_arr[i].m_pack_length_in_rec; + } + } + } + + // It could be that the last few elements are varchars that just do + // skipping. Remove them. + m_decoders_vect.erase(m_decoders_vect.begin() + last_useful, + m_decoders_vect.end()); +} + +void Rdb_converter::setup_field_encoders() { + uint null_bytes_length = 0; + uchar cur_null_mask = 0x1; + + m_encoder_arr = static_cast<Rdb_field_encoder *>( + my_malloc(PSI_INSTRUMENT_ME, m_table->s->fields * sizeof(Rdb_field_encoder), MYF(0))); + if (m_encoder_arr == nullptr) { + return; + } + + for (uint i = 0; i < m_table->s->fields; i++) { + Field *const field = m_table->field[i]; + m_encoder_arr[i].m_storage_type = Rdb_field_encoder::STORE_ALL; + + /* + Check if this field is + - a part of primary key, and + - it can be decoded back from its key image. + If both hold, we don't need to store this field in the value part of + RocksDB's key-value pair. + + If hidden pk exists, we skip this check since the field will never be + part of the hidden pk. + */ + if (!Rdb_key_def::table_has_hidden_pk(m_table)) { + KEY *const pk_info = &m_table->key_info[m_table->s->primary_key]; + for (uint kp = 0; kp < pk_info->user_defined_key_parts; kp++) { + // key_part->fieldnr is counted from 1 + if (field->field_index + 1 == pk_info->key_part[kp].fieldnr) { + get_storage_type(&m_encoder_arr[i], kp); + break; + } + } + } + + m_encoder_arr[i].m_field_type = field->real_type(); + m_encoder_arr[i].m_field_index = i; + m_encoder_arr[i].m_pack_length_in_rec = field->pack_length_in_rec(); + + if (field->real_maybe_null()) { + m_encoder_arr[i].m_null_mask = cur_null_mask; + m_encoder_arr[i].m_null_offset = null_bytes_length; + if (cur_null_mask == 0x80) { + cur_null_mask = 0x1; + null_bytes_length++; + } else { + cur_null_mask = cur_null_mask << 1; + } + } else { + m_encoder_arr[i].m_null_mask = 0; + } + } + + // Count the last, unfinished NULL-bits byte + if (cur_null_mask != 0x1) { + null_bytes_length++; + } + + m_null_bytes_length_in_record = null_bytes_length; +} + +/* + EntryPoint for Decode: + Decode key slice(if requested) and value slice using built-in field + decoders + @param key_def IN key definition to decode + @param dst OUT Mysql buffer to fill decoded content + @param key_slice IN RocksDB key slice to decode + @param value_slice IN RocksDB value slice to decode + @return + 0 OK + other HA_ERR error code (can be SE-specific) +*/ +int Rdb_converter::decode(const std::shared_ptr<Rdb_key_def> &key_def, + uchar *dst, // address to fill data + const rocksdb::Slice *key_slice, + const rocksdb::Slice *value_slice) { + // Currently only support decode primary key, Will add decode secondary later + DBUG_ASSERT(key_def->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY || + key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY); + + const rocksdb::Slice *updated_key_slice = key_slice; +#ifndef DBUG_OFF + String last_rowkey; + last_rowkey.copy(key_slice->data(), key_slice->size(), &my_charset_bin); + DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_read1", + { dbug_modify_key_varchar8(&last_rowkey); }); + rocksdb::Slice rowkey_slice(last_rowkey.ptr(), last_rowkey.length()); + updated_key_slice = &rowkey_slice; +#endif + return convert_record_from_storage_format(key_def, updated_key_slice, + value_slice, dst); +} + +/* + Decode value slice header + @param reader IN value slice reader + @param pk_def IN key definition to decode + @param unpack_slice OUT unpack info slice + @return + 0 OK + other HA_ERR error code (can be SE-specific) +*/ +int Rdb_converter::decode_value_header( + Rdb_string_reader *reader, const std::shared_ptr<Rdb_key_def> &pk_def, + rocksdb::Slice *unpack_slice) { + /* If it's a TTL record, skip the 8 byte TTL value */ + if (pk_def->has_ttl()) { + const char *ttl_bytes; + if ((ttl_bytes = reader->read(ROCKSDB_SIZEOF_TTL_RECORD))) { + memcpy(m_ttl_bytes, ttl_bytes, ROCKSDB_SIZEOF_TTL_RECORD); + } else { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + } + + /* Other fields are decoded from the value */ + if (m_null_bytes_length_in_record && + !(m_null_bytes = reader->read(m_null_bytes_length_in_record))) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + if (m_maybe_unpack_info) { + const char *unpack_info = reader->get_current_ptr(); + if (!unpack_info || !Rdb_key_def::is_unpack_data_tag(unpack_info[0]) || + !reader->read(Rdb_key_def::get_unpack_header_size(unpack_info[0]))) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + uint16 unpack_info_len = + rdb_netbuf_to_uint16(reinterpret_cast<const uchar *>(unpack_info + 1)); + *unpack_slice = rocksdb::Slice(unpack_info, unpack_info_len); + + reader->read(unpack_info_len - + Rdb_key_def::get_unpack_header_size(unpack_info[0])); + } + + return HA_EXIT_SUCCESS; +} + +/* + Convert RocksDb key slice and value slice to Mysql format + @param key_def IN key definition to decode + @param key_slice IN RocksDB key slice + @param value_slice IN RocksDB value slice + @param dst OUT MySql format address + @return + 0 OK + other HA_ERR error code (can be SE-specific) +*/ +int Rdb_converter::convert_record_from_storage_format( + const std::shared_ptr<Rdb_key_def> &pk_def, + const rocksdb::Slice *const key_slice, + const rocksdb::Slice *const value_slice, uchar *const dst) { + int err = HA_EXIT_SUCCESS; + + Rdb_string_reader value_slice_reader(value_slice); + rocksdb::Slice unpack_slice; + err = decode_value_header(&value_slice_reader, pk_def, &unpack_slice); + if (err != HA_EXIT_SUCCESS) { + return err; + } + + /* + Decode PK fields from the key + */ + if (m_key_requested) { + err = pk_def->unpack_record(m_table, dst, key_slice, + !unpack_slice.empty() ? &unpack_slice : nullptr, + false /* verify_checksum */); + } + if (err != HA_EXIT_SUCCESS) { + return err; + } + + Rdb_value_field_iterator<Rdb_convert_to_record_value_decoder> + value_field_iterator(m_table, &value_slice_reader, this, dst); + + // Decode value slices + while (!value_field_iterator.end_of_fields()) { + err = value_field_iterator.next(); + + if (err != HA_EXIT_SUCCESS) { + return err; + } + } + + if (m_verify_row_debug_checksums) { + return verify_row_debug_checksum(pk_def, &value_slice_reader, key_slice, + value_slice); + } + return HA_EXIT_SUCCESS; +} + +/* + Verify checksum for row + @param pk_def IN key def + @param reader IN RocksDB value slice reader + @param key IN RocksDB key slice + @param value IN RocksDB value slice + @return + 0 OK + other HA_ERR error code (can be SE-specific) +*/ +int Rdb_converter::verify_row_debug_checksum( + const std::shared_ptr<Rdb_key_def> &pk_def, Rdb_string_reader *reader, + const rocksdb::Slice *key, const rocksdb::Slice *value) { + if (reader->remaining_bytes() == RDB_CHECKSUM_CHUNK_SIZE && + reader->read(1)[0] == RDB_CHECKSUM_DATA_TAG) { + uint32_t stored_key_chksum = + rdb_netbuf_to_uint32((const uchar *)reader->read(RDB_CHECKSUM_SIZE)); + uint32_t stored_val_chksum = + rdb_netbuf_to_uint32((const uchar *)reader->read(RDB_CHECKSUM_SIZE)); + + const uint32_t computed_key_chksum = + my_core::my_checksum(0, rdb_slice_to_uchar_ptr(key), key->size()); + const uint32_t computed_val_chksum = + my_core::my_checksum(0, rdb_slice_to_uchar_ptr(value), + value->size() - RDB_CHECKSUM_CHUNK_SIZE); + + DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum1", stored_key_chksum++;); + + if (stored_key_chksum != computed_key_chksum) { + pk_def->report_checksum_mismatch(true, key->data(), key->size()); + return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH; + } + + DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum2", stored_val_chksum++;); + if (stored_val_chksum != computed_val_chksum) { + pk_def->report_checksum_mismatch(false, value->data(), value->size()); + return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH; + } + + m_row_checksums_checked++; + } + if (reader->remaining_bytes()) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + return HA_EXIT_SUCCESS; +} + +/** + Convert record from table->record[0] form into a form that can be written + into rocksdb. + + @param pk_def IN Current key def + @pk_unpack_info IN Unpack info generated during key pack + @is_update_row IN Whether it is update row + @store_row_debug_checksums IN Whether to store checksums + @param ttl_bytes IN/OUT Old ttl value from previous record and + ttl value during current encode + @is_ttl_bytes_updated OUT Whether ttl bytes is updated + @param value_slice OUT Data slice with record data. +*/ +int Rdb_converter::encode_value_slice( + const std::shared_ptr<Rdb_key_def> &pk_def, + const rocksdb::Slice &pk_packed_slice, Rdb_string_writer *pk_unpack_info, + bool is_update_row, bool store_row_debug_checksums, char *ttl_bytes, + bool *is_ttl_bytes_updated, rocksdb::Slice *const value_slice) { + DBUG_ASSERT(pk_def != nullptr); + // Currently only primary key will store value slice + DBUG_ASSERT(pk_def->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY || + pk_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY); + DBUG_ASSERT_IMP(m_maybe_unpack_info, pk_unpack_info); + + bool has_ttl = pk_def->has_ttl(); + bool has_ttl_column = !pk_def->m_ttl_column.empty(); + + m_storage_record.length(0); + + if (has_ttl) { + /* If it's a TTL record, reserve space for 8 byte TTL value in front. */ + m_storage_record.fill( + ROCKSDB_SIZEOF_TTL_RECORD + m_null_bytes_length_in_record, 0); + // NOTE: is_ttl_bytes_updated is only used for update case + // During update, skip update sk key/values slice iff none of sk fields + // have changed and ttl bytes isn't changed. see + // ha_rocksdb::update_write_sk() for more info + *is_ttl_bytes_updated = false; + char *const data = const_cast<char *>(m_storage_record.ptr()); + if (has_ttl_column) { + DBUG_ASSERT(pk_def->get_ttl_field_index() != UINT_MAX); + Field *const field = m_table->field[pk_def->get_ttl_field_index()]; + DBUG_ASSERT(field->pack_length_in_rec() == ROCKSDB_SIZEOF_TTL_RECORD); + DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG); + + uint64 ts = uint8korr(field->ptr); +#ifndef DBUG_OFF + ts += rdb_dbug_set_ttl_rec_ts(); +#endif + rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts); + if (is_update_row) { + *is_ttl_bytes_updated = + memcmp(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD); + } + // Also store in m_ttl_bytes to propagate to update_write_sk + memcpy(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD); + } else { + /* + For implicitly generated TTL records we need to copy over the old + TTL value from the old record in the event of an update. It was stored + in m_ttl_bytes. + + Otherwise, generate a timestamp using the current time. + */ + if (is_update_row) { + memcpy(data, ttl_bytes, sizeof(uint64)); + } else { + uint64 ts = static_cast<uint64>(std::time(nullptr)); +#ifndef DBUG_OFF + ts += rdb_dbug_set_ttl_rec_ts(); +#endif + rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts); + // Also store in m_ttl_bytes to propagate to update_write_sk + memcpy(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD); + } + } + } else { + /* All NULL bits are initially 0 */ + m_storage_record.fill(m_null_bytes_length_in_record, 0); + } + + // If a primary key may have non-empty unpack_info for certain values, + // (m_maybe_unpack_info=TRUE), we write the unpack_info block. The block + // itself was prepared in Rdb_key_def::pack_record. + if (m_maybe_unpack_info) { + m_storage_record.append(reinterpret_cast<char *>(pk_unpack_info->ptr()), + pk_unpack_info->get_current_pos()); + } + for (uint i = 0; i < m_table->s->fields; i++) { + Rdb_field_encoder &encoder = m_encoder_arr[i]; + /* Don't pack decodable PK key parts */ + if (encoder.m_storage_type != Rdb_field_encoder::STORE_ALL) { + continue; + } + + Field *const field = m_table->field[i]; + if (encoder.maybe_null()) { + char *data = const_cast<char *>(m_storage_record.ptr()); + if (has_ttl) { + data += ROCKSDB_SIZEOF_TTL_RECORD; + } + + if (field->is_null()) { + data[encoder.m_null_offset] |= encoder.m_null_mask; + /* Don't write anything for NULL values */ + continue; + } + } + + if (encoder.m_field_type == MYSQL_TYPE_BLOB) { + my_core::Field_blob *blob = + reinterpret_cast<my_core::Field_blob *>(field); + /* Get the number of bytes needed to store length*/ + const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr; + + /* Store the length of the value */ + m_storage_record.append(reinterpret_cast<char *>(blob->ptr), + length_bytes); + + /* Store the blob value itself */ + char *data_ptr; + memcpy(&data_ptr, blob->ptr + length_bytes, sizeof(uchar **)); + m_storage_record.append(data_ptr, blob->get_length()); + } else if (encoder.m_field_type == MYSQL_TYPE_VARCHAR) { + Field_varstring *const field_var = + reinterpret_cast<Field_varstring *>(field); + uint data_len; + /* field_var->length_bytes is 1 or 2 */ + if (field_var->length_bytes == 1) { + data_len = field_var->ptr[0]; + } else { + DBUG_ASSERT(field_var->length_bytes == 2); + data_len = uint2korr(field_var->ptr); + } + m_storage_record.append(reinterpret_cast<char *>(field_var->ptr), + field_var->length_bytes + data_len); + } else { + /* Copy the field data */ + const uint len = field->pack_length_in_rec(); + m_storage_record.append(reinterpret_cast<char *>(field->ptr), len); + } + } + + if (store_row_debug_checksums) { + const uint32_t key_crc32 = my_core::my_checksum( + 0, rdb_slice_to_uchar_ptr(&pk_packed_slice), pk_packed_slice.size()); + const uint32_t val_crc32 = + my_core::my_checksum(0, rdb_mysql_str_to_uchar_str(&m_storage_record), + m_storage_record.length()); + uchar key_crc_buf[RDB_CHECKSUM_SIZE]; + uchar val_crc_buf[RDB_CHECKSUM_SIZE]; + rdb_netbuf_store_uint32(key_crc_buf, key_crc32); + rdb_netbuf_store_uint32(val_crc_buf, val_crc32); + m_storage_record.append((const char *)&RDB_CHECKSUM_DATA_TAG, 1); + m_storage_record.append((const char *)key_crc_buf, RDB_CHECKSUM_SIZE); + m_storage_record.append((const char *)val_crc_buf, RDB_CHECKSUM_SIZE); + } + + *value_slice = + rocksdb::Slice(m_storage_record.ptr(), m_storage_record.length()); + + return HA_EXIT_SUCCESS; +} +} // namespace myrocks |