diff options
Diffstat (limited to 'storage/tokudb/tokudb_update_fun.cc')
-rw-r--r-- | storage/tokudb/tokudb_update_fun.cc | 1967 |
1 files changed, 1967 insertions, 0 deletions
diff --git a/storage/tokudb/tokudb_update_fun.cc b/storage/tokudb/tokudb_update_fun.cc new file mode 100644 index 00000000..f9b8372e --- /dev/null +++ b/storage/tokudb/tokudb_update_fun.cc @@ -0,0 +1,1967 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of TokuDB + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + TokuDBis is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + TokuDB is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with TokuDB. If not, see <http://www.gnu.org/licenses/>. + +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +// Update operation codes. These codes get stuffed into update messages, so they can not change. +// The operations are currently stored in a single byte in the update message, so only 256 operations +// are supported. When we need more, we can use the last (255) code to indicate that the operation code +// is expanded beyond 1 byte. +enum { + UPDATE_OP_COL_ADD_OR_DROP = 0, + + UPDATE_OP_EXPAND_VARIABLE_OFFSETS = 1, + UPDATE_OP_EXPAND_INT = 2, + UPDATE_OP_EXPAND_UINT = 3, + UPDATE_OP_EXPAND_CHAR = 4, + UPDATE_OP_EXPAND_BINARY = 5, + UPDATE_OP_EXPAND_BLOB = 6, + + UPDATE_OP_UPDATE_1 = 10, + UPDATE_OP_UPSERT_1 = 11, + UPDATE_OP_UPDATE_2 = 12, + UPDATE_OP_UPSERT_2 = 13, +}; + +// Field types used in the update messages +enum { + UPDATE_TYPE_UNKNOWN = 0, + UPDATE_TYPE_INT = 1, + UPDATE_TYPE_UINT = 2, + UPDATE_TYPE_CHAR = 3, + UPDATE_TYPE_BINARY = 4, + UPDATE_TYPE_VARCHAR = 5, + UPDATE_TYPE_VARBINARY = 6, + UPDATE_TYPE_TEXT = 7, + UPDATE_TYPE_BLOB = 8, +}; + +#define UP_COL_ADD_OR_DROP UPDATE_OP_COL_ADD_OR_DROP + +// add or drop column sub-operations +#define COL_DROP 0xaa +#define COL_ADD 0xbb + +// add or drop column types +#define COL_FIXED 0xcc +#define COL_VAR 0xdd +#define COL_BLOB 0xee + +#define STATIC_ROW_MUTATOR_SIZE 1+8+2+8+8+8 + +// how much space do I need for the mutators? +// static stuff first: +// operation 1 == UP_COL_ADD_OR_DROP +// 8 - old null, new null +// 2 - old num_offset, new num_offset +// 8 - old fixed_field size, new fixed_field_size +// 8 - old and new length of offsets +// 8 - old and new starting null bit position +// TOTAL: 27 + +// dynamic stuff: +// 4 - number of columns +// for each column: +// 1 - add or drop +// 1 - is nullable +// 4 - if nullable, position +// 1 - if add, whether default is null or not +// 1 - if fixed, var, or not +// for fixed, entire default +// for var, 4 bytes length, then entire default +// for blob, nothing +// So, an upperbound is 4 + num_fields(12) + all default stuff + +// static blob stuff: +// 4 - num blobs +// 1 byte for each num blobs in old table +// So, an upperbound is 4 + kc_info->num_blobs + +// dynamic blob stuff: +// for each blob added: +// 1 - state if we are adding or dropping +// 4 - blob index +// if add, 1 len bytes +// at most, 4 0's +// So, upperbound is num_blobs(1+4+1+4) = num_columns*10 + +// The expand varchar offsets message is used to expand the size of an offset +// from 1 to 2 bytes. Not VLQ coded. +// uint8 operation = UPDATE_OP_EXPAND_VARIABLE_OFFSETS +// uint32 number of offsets +// uint32 starting offset of the variable length field offsets + +// Expand the size of a fixed length column message. Not VLQ coded. +// The field type is encoded in the operation code. +// uint8 operation = UPDATE_OP_EXPAND_INT/UINT/CHAR/BINARY +// uint32 offset offset of the field +// uint32 old length the old length of the field's value +// uint32 new length the new length of the field's value + +// uint8 operation = UPDATE_OP_EXPAND_CHAR/BINARY +// uint32 offset offset of the field +// uint32 old length the old length of the field's value +// uint32 new length the new length of the field's value +// uint8 pad char + +// Expand blobs message. VLQ coded. +// uint8 operation = UPDATE_OP_EXPAND_BLOB +// uint32 start variable offset +// uint32 variable offset bytes +// uint32 bytes per offset +// uint32 num blobs = N +// uint8 old lengths[N] +// uint8 new lengths[N] + +// Update and Upsert version 1 messages. Not VLQ coded. Not used anymore, but +// may be in the fractal tree from a previous build. +// +// Field descriptor: +// Operations: +// update operation 4 == { '=', '+', '-' } +// x = k +// x = x + k +// x = x - k +// field type 4 see field types above +// unused 4 unused +// field null num 4 bit 31 is 1 if the field is nullible and the +// remaining bits contain the null bit number +// field offset 4 for fixed fields, this is the offset from +// begining of the row of the field +// value: +// value length 4 == N, length of the value +// value N value to add or subtract +// +// Update_1 message: +// Operation 1 == UPDATE_OP_UPDATE_1 +// fixed field offset 4 offset of the beginning of the fixed fields +// var field offset 4 offset of the variable length offsets +// var_offset_bytes 1 length of offsets (Note: not big enough) +// bytes_per_offset 4 number of bytes per offset +// Number of update ops 4 == N +// Update ops [N] +// +// Upsert_1 message: +// Operation 1 == UPDATE_OP_UPSERT_1 +// Insert row: +// length 4 == N +// data N +// fixed field offset 4 offset of the beginning of the fixed fields +// var field offset 4 offset of the variable length offsets +// var_offset_bytes 1 length of offsets (Note: not big enough) +// bytes_per_offset 4 number of bytes per offset +// Number of update ops 4 == N +// Update ops [N] + +// Update and Upserver version 2 messages. VLQ coded. +// Update version 2 +// uint8 operation = UPDATE_OP_UPDATE_2 +// uint32 number of update ops = N +// uint8 update ops [ N ] +// +// Upsert version 2 +// uint8 operation = UPDATE_OP_UPSERT_2 +// uint32 insert length = N +// uint8 insert data [ N ] +// uint32 number of update ops = M +// update ops [ M ] +// +// Variable fields info +// uint32 update operation = 'v' +// uint32 start offset +// uint32 num varchars +// uint32 bytes per offset +// +// Blobs info +// uint32 update operation = 'b' +// uint32 num blobs = N +// uint8 blob lengths [ N ] +// +// Update operation on fixed length fields +// uint32 update operation = '=', '+', '-' +// uint32 field type +// uint32 null num 0 => not nullable, otherwise encoded as field_null_num + 1 +// uint32 offset +// uint32 value length = N +// uint8 value [ N ] +// +// Update operation on varchar fields +// uint32 update operation = '=' +// uint32 field type +// uint32 null num +// uint32 var index +// uint32 value length = N +// uint8 value [ N ] +// +// Update operation on blob fields +// uint32 update operation = '=' +// uint32 field type +// uint32 null num +// uint32 blob index +// uint32 value length = N +// uint8 value [ N ] + +#include "tokudb_buffer.h" +#include "tokudb_math.h" + +// +// checks whether the bit at index pos in data is set or not +// +static inline bool is_overall_null_position_set(uchar* data, uint32_t pos) { + uint32_t offset = pos/8; + uchar remainder = pos%8; + uchar null_bit = 1<<remainder; + return ((data[offset] & null_bit) != 0); +} + +// +// sets the bit at index pos in data to 1 if is_null, 0 otherwise +// +static inline void set_overall_null_position( + uchar* data, + uint32_t pos, + bool is_null) { + + uint32_t offset = pos/8; + uchar remainder = pos%8; + uchar null_bit = 1<<remainder; + if (is_null) { + data[offset] |= null_bit; + } + else { + data[offset] &= ~null_bit; + } +} + +static inline void copy_null_bits( + uint32_t start_old_pos, + uint32_t start_new_pos, + uint32_t num_bits, + uchar* old_null_bytes, + uchar* new_null_bytes) { + for (uint32_t i = 0; i < num_bits; i++) { + uint32_t curr_old_pos = i + start_old_pos; + uint32_t curr_new_pos = i + start_new_pos; + // copy over old null bytes + if (is_overall_null_position_set(old_null_bytes,curr_old_pos)) { + set_overall_null_position(new_null_bytes,curr_new_pos,true); + } + else { + set_overall_null_position(new_null_bytes,curr_new_pos,false); + } + } +} + +static inline void copy_var_fields( + //index of var fields that we should start writing + uint32_t start_old_num_var_field, + // number of var fields to copy + uint32_t num_var_fields, + //static ptr to where offset bytes begin in old row + uchar* old_var_field_offset_ptr, + //number of offset bytes used in old row + uchar old_num_offset_bytes, + // where the new var data should be written + uchar* start_new_var_field_data_ptr, + // where the new var offsets should be written + uchar* start_new_var_field_offset_ptr, + // pointer to beginning of var fields in new row + uchar* new_var_field_data_ptr, + // pointer to beginning of var fields in old row + uchar* old_var_field_data_ptr, + // number of offset bytes used in new row + uint32_t new_num_offset_bytes, + uint32_t* num_data_bytes_written, + uint32_t* num_offset_bytes_written) { + + uchar* curr_new_var_field_data_ptr = start_new_var_field_data_ptr; + uchar* curr_new_var_field_offset_ptr = start_new_var_field_offset_ptr; + for (uint32_t i = 0; i < num_var_fields; i++) { + uint32_t field_len; + uint32_t start_read_offset; + uint32_t curr_old = i + start_old_num_var_field; + uchar* data_to_copy = NULL; + // get the length and pointer to data that needs to be copied + get_var_field_info( + &field_len, + &start_read_offset, + curr_old, + old_var_field_offset_ptr, + old_num_offset_bytes); + data_to_copy = old_var_field_data_ptr + start_read_offset; + // now need to copy field_len bytes starting from data_to_copy + curr_new_var_field_data_ptr = write_var_field( + curr_new_var_field_offset_ptr, + curr_new_var_field_data_ptr, + new_var_field_data_ptr, + data_to_copy, + field_len, + new_num_offset_bytes); + curr_new_var_field_offset_ptr += new_num_offset_bytes; + } + *num_data_bytes_written = + (uint32_t)(curr_new_var_field_data_ptr - start_new_var_field_data_ptr); + *num_offset_bytes_written = + (uint32_t)(curr_new_var_field_offset_ptr - + start_new_var_field_offset_ptr); +} + +static inline uint32_t copy_toku_blob( + uchar* to_ptr, + uchar* from_ptr, + uint32_t len_bytes, + bool skip) { + + uint32_t length = 0; + if (!skip) { + memcpy(to_ptr, from_ptr, len_bytes); + } + length = get_blob_field_len(from_ptr,len_bytes); + if (!skip) { + memcpy(to_ptr + len_bytes, from_ptr + len_bytes, length); + } + return (length + len_bytes); +} + +static int tokudb_hcad_update_fun(const DBT* old_val, + const DBT* extra, + void (*set_val)(const DBT* new_val, + void* set_extra), + void* set_extra) { + uint32_t max_num_bytes; + uint32_t num_columns; + DBT new_val; + uint32_t num_bytes_left; + uint32_t num_var_fields_to_copy; + uint32_t num_data_bytes_written = 0; + uint32_t num_offset_bytes_written = 0; + int error; + memset(&new_val, 0, sizeof(DBT)); + uchar operation; + uchar* new_val_data = NULL; + uchar* extra_pos = NULL; + uchar* extra_pos_start = NULL; + // + // info for pointers into rows + // + uint32_t old_num_null_bytes; + uint32_t new_num_null_bytes; + uchar old_num_offset_bytes; + uchar new_num_offset_bytes; + uint32_t old_fixed_field_size; + uint32_t new_fixed_field_size; + uint32_t old_len_of_offsets; + uint32_t new_len_of_offsets; + + uchar* old_fixed_field_ptr = NULL; + uchar* new_fixed_field_ptr = NULL; + uint32_t curr_old_fixed_offset; + uint32_t curr_new_fixed_offset; + + uchar* old_null_bytes = NULL; + uchar* new_null_bytes = NULL; + uint32_t curr_old_null_pos; + uint32_t curr_new_null_pos; + uint32_t old_null_bits_left; + uint32_t new_null_bits_left; + uint32_t overall_null_bits_left; + + uint32_t old_num_var_fields; + // uint32_t new_num_var_fields; + uint32_t curr_old_num_var_field; + uint32_t curr_new_num_var_field; + uchar* old_var_field_offset_ptr = NULL; + uchar* new_var_field_offset_ptr = NULL; + uchar* curr_new_var_field_offset_ptr = NULL; + uchar* old_var_field_data_ptr = NULL; + uchar* new_var_field_data_ptr = NULL; + uchar* curr_new_var_field_data_ptr = NULL; + + uint32_t start_blob_offset; + uchar* start_blob_ptr; + uint32_t num_blob_bytes; + + // came across a delete, nothing to update + if (old_val == NULL) { + error = 0; + goto cleanup; + } + + extra_pos_start = (uchar *)extra->data; + extra_pos = (uchar *)extra->data; + + operation = extra_pos[0]; + extra_pos++; + assert_always(operation == UP_COL_ADD_OR_DROP); + + memcpy(&old_num_null_bytes, extra_pos, sizeof(uint32_t)); + extra_pos += sizeof(uint32_t); + memcpy(&new_num_null_bytes, extra_pos, sizeof(uint32_t)); + extra_pos += sizeof(uint32_t); + + old_num_offset_bytes = extra_pos[0]; + extra_pos++; + new_num_offset_bytes = extra_pos[0]; + extra_pos++; + + memcpy(&old_fixed_field_size, extra_pos, sizeof(uint32_t)); + extra_pos += sizeof(uint32_t); + memcpy(&new_fixed_field_size, extra_pos, sizeof(uint32_t)); + extra_pos += sizeof(uint32_t); + + memcpy(&old_len_of_offsets, extra_pos, sizeof(uint32_t)); + extra_pos += sizeof(uint32_t); + memcpy(&new_len_of_offsets, extra_pos, sizeof(uint32_t)); + extra_pos += sizeof(uint32_t); + + max_num_bytes = + old_val->size + extra->size + new_len_of_offsets + new_fixed_field_size; + new_val_data = (uchar *)tokudb::memory::malloc( + max_num_bytes, + MYF(MY_FAE)); + if (new_val_data == NULL) { + error = ENOMEM; + goto cleanup; + } + + old_fixed_field_ptr = (uchar *) old_val->data; + old_fixed_field_ptr += old_num_null_bytes; + new_fixed_field_ptr = new_val_data + new_num_null_bytes; + curr_old_fixed_offset = 0; + curr_new_fixed_offset = 0; + + old_num_var_fields = old_len_of_offsets/old_num_offset_bytes; + // new_num_var_fields = new_len_of_offsets/new_num_offset_bytes; + // following fields will change as we write the variable data + old_var_field_offset_ptr = old_fixed_field_ptr + old_fixed_field_size; + new_var_field_offset_ptr = new_fixed_field_ptr + new_fixed_field_size; + old_var_field_data_ptr = old_var_field_offset_ptr + old_len_of_offsets; + new_var_field_data_ptr = new_var_field_offset_ptr + new_len_of_offsets; + curr_new_var_field_offset_ptr = new_var_field_offset_ptr; + curr_new_var_field_data_ptr = new_var_field_data_ptr; + curr_old_num_var_field = 0; + curr_new_num_var_field = 0; + + old_null_bytes = (uchar *)old_val->data; + new_null_bytes = new_val_data; + + memcpy(&curr_old_null_pos, extra_pos, sizeof(uint32_t)); + extra_pos += sizeof(uint32_t); + memcpy(&curr_new_null_pos, extra_pos, sizeof(uint32_t)); + extra_pos += sizeof(uint32_t); + + memcpy(&num_columns, extra_pos, sizeof(num_columns)); + extra_pos += sizeof(num_columns); + + memset(new_null_bytes, 0, new_num_null_bytes); // shut valgrind up + + // + // now go through and apply the change into new_val_data + // + for (uint32_t i = 0; i < num_columns; i++) { + uchar op_type = extra_pos[0]; + bool is_null_default = false; + extra_pos++; + + assert_always(op_type == COL_DROP || op_type == COL_ADD); + bool nullable = (extra_pos[0] != 0); + extra_pos++; + if (nullable) { + uint32_t null_bit_position; + memcpy(&null_bit_position, extra_pos, sizeof(uint32_t)); + extra_pos += sizeof(uint32_t); + uint32_t num_bits; + if (op_type == COL_DROP) { + assert_always(curr_old_null_pos <= null_bit_position); + num_bits = null_bit_position - curr_old_null_pos; + } else { + assert_always(curr_new_null_pos <= null_bit_position); + num_bits = null_bit_position - curr_new_null_pos; + } + copy_null_bits( + curr_old_null_pos, + curr_new_null_pos, + num_bits, + old_null_bytes, + new_null_bytes); + // update the positions + curr_new_null_pos += num_bits; + curr_old_null_pos += num_bits; + if (op_type == COL_DROP) { + curr_old_null_pos++; // account for dropped column + } else { + is_null_default = (extra_pos[0] != 0); + extra_pos++; + set_overall_null_position( + new_null_bytes, + null_bit_position, + is_null_default); + curr_new_null_pos++; //account for added column + } + } + uchar col_type = extra_pos[0]; + extra_pos++; + if (col_type == COL_FIXED) { + uint32_t col_offset; + uint32_t col_size; + uint32_t num_bytes_to_copy; + memcpy(&col_offset, extra_pos, sizeof(uint32_t)); + extra_pos += sizeof(uint32_t); + memcpy(&col_size, extra_pos, sizeof(uint32_t)); + extra_pos += sizeof(uint32_t); + + if (op_type == COL_DROP) { + num_bytes_to_copy = col_offset - curr_old_fixed_offset; + } else { + num_bytes_to_copy = col_offset - curr_new_fixed_offset; + } + memcpy( + new_fixed_field_ptr + curr_new_fixed_offset, + old_fixed_field_ptr + curr_old_fixed_offset, + num_bytes_to_copy); + curr_old_fixed_offset += num_bytes_to_copy; + curr_new_fixed_offset += num_bytes_to_copy; + if (op_type == COL_DROP) { + // move old_fixed_offset val to skip OVER column that is + // being dropped + curr_old_fixed_offset += col_size; + } else { + if (is_null_default) { + // copy zeroes + memset( + new_fixed_field_ptr + curr_new_fixed_offset, + 0, + col_size); + } else { + // copy data from extra_pos into new row + memcpy( + new_fixed_field_ptr + curr_new_fixed_offset, + extra_pos, + col_size); + extra_pos += col_size; + } + curr_new_fixed_offset += col_size; + } + + } else if (col_type == COL_VAR) { + uint32_t var_col_index; + memcpy(&var_col_index, extra_pos, sizeof(uint32_t)); + extra_pos += sizeof(uint32_t); + if (op_type == COL_DROP) { + num_var_fields_to_copy = var_col_index - curr_old_num_var_field; + } else { + num_var_fields_to_copy = var_col_index - curr_new_num_var_field; + } + copy_var_fields( + curr_old_num_var_field, + num_var_fields_to_copy, + old_var_field_offset_ptr, + old_num_offset_bytes, + curr_new_var_field_data_ptr, + curr_new_var_field_offset_ptr, + // pointer to beginning of var fields in new row + new_var_field_data_ptr, + // pointer to beginning of var fields in old row + old_var_field_data_ptr, + // number of offset bytes used in new row + new_num_offset_bytes, + &num_data_bytes_written, + &num_offset_bytes_written); + curr_new_var_field_data_ptr += num_data_bytes_written; + curr_new_var_field_offset_ptr += num_offset_bytes_written; + curr_new_num_var_field += num_var_fields_to_copy; + curr_old_num_var_field += num_var_fields_to_copy; + if (op_type == COL_DROP) { + curr_old_num_var_field++; // skip over dropped field + } else { + if (is_null_default) { + curr_new_var_field_data_ptr = write_var_field( + curr_new_var_field_offset_ptr, + curr_new_var_field_data_ptr, + new_var_field_data_ptr, + NULL, //copying no data + 0, //copying 0 bytes + new_num_offset_bytes); + curr_new_var_field_offset_ptr += new_num_offset_bytes; + } else { + uint32_t data_length; + memcpy(&data_length, extra_pos, sizeof(data_length)); + extra_pos += sizeof(data_length); + curr_new_var_field_data_ptr = write_var_field( + curr_new_var_field_offset_ptr, + curr_new_var_field_data_ptr, + new_var_field_data_ptr, + extra_pos, //copying data from mutator + data_length, //copying data_length bytes + new_num_offset_bytes); + extra_pos += data_length; + curr_new_var_field_offset_ptr += new_num_offset_bytes; + } + curr_new_num_var_field++; //account for added column + } + } else if (col_type == COL_BLOB) { + // handle blob data later + continue; + } else { + assert_unreachable(); + } + } + // finish copying the null stuff + old_null_bits_left = 8*old_num_null_bytes - curr_old_null_pos; + new_null_bits_left = 8*new_num_null_bytes - curr_new_null_pos; + overall_null_bits_left = old_null_bits_left; + set_if_smaller(overall_null_bits_left, new_null_bits_left); + copy_null_bits( + curr_old_null_pos, + curr_new_null_pos, + overall_null_bits_left, + old_null_bytes, + new_null_bytes); + // finish copying fixed field stuff + num_bytes_left = old_fixed_field_size - curr_old_fixed_offset; + memcpy( + new_fixed_field_ptr + curr_new_fixed_offset, + old_fixed_field_ptr + curr_old_fixed_offset, + num_bytes_left); + curr_old_fixed_offset += num_bytes_left; + curr_new_fixed_offset += num_bytes_left; + // sanity check + assert_always(curr_new_fixed_offset == new_fixed_field_size); + + // finish copying var field stuff + num_var_fields_to_copy = old_num_var_fields - curr_old_num_var_field; + copy_var_fields( + curr_old_num_var_field, + num_var_fields_to_copy, + old_var_field_offset_ptr, + old_num_offset_bytes, + curr_new_var_field_data_ptr, + curr_new_var_field_offset_ptr, + // pointer to beginning of var fields in new row + new_var_field_data_ptr, + // pointer to beginning of var fields in old row + old_var_field_data_ptr, + // number of offset bytes used in new row + new_num_offset_bytes, + &num_data_bytes_written, + &num_offset_bytes_written); + curr_new_var_field_offset_ptr += num_offset_bytes_written; + curr_new_var_field_data_ptr += num_data_bytes_written; + // sanity check + assert_always(curr_new_var_field_offset_ptr == new_var_field_data_ptr); + + // start handling blobs + get_blob_field_info( + &start_blob_offset, + old_len_of_offsets, + old_var_field_data_ptr, + old_num_offset_bytes); + start_blob_ptr = old_var_field_data_ptr + start_blob_offset; + // if nothing else in extra, then there are no blobs to add or drop, so + // can copy blobs straight + if ((extra_pos - extra_pos_start) == extra->size) { + num_blob_bytes = old_val->size - (start_blob_ptr - old_null_bytes); + memcpy(curr_new_var_field_data_ptr, start_blob_ptr, num_blob_bytes); + curr_new_var_field_data_ptr += num_blob_bytes; + } else { + // else, there is blob information to process + uchar* len_bytes = NULL; + uint32_t curr_old_blob = 0; + uint32_t curr_new_blob = 0; + uint32_t num_old_blobs = 0; + uchar* curr_old_blob_ptr = start_blob_ptr; + memcpy(&num_old_blobs, extra_pos, sizeof(num_old_blobs)); + extra_pos += sizeof(num_old_blobs); + len_bytes = extra_pos; + extra_pos += num_old_blobs; + // copy over blob fields one by one + while ((extra_pos - extra_pos_start) < extra->size) { + uchar op_type = extra_pos[0]; + extra_pos++; + uint32_t num_blobs_to_copy = 0; + uint32_t blob_index; + memcpy(&blob_index, extra_pos, sizeof(blob_index)); + extra_pos += sizeof(blob_index); + assert_always (op_type == COL_DROP || op_type == COL_ADD); + if (op_type == COL_DROP) { + num_blobs_to_copy = blob_index - curr_old_blob; + } else { + num_blobs_to_copy = blob_index - curr_new_blob; + } + for (uint32_t i = 0; i < num_blobs_to_copy; i++) { + uint32_t num_bytes_written = copy_toku_blob( + curr_new_var_field_data_ptr, + curr_old_blob_ptr, + len_bytes[curr_old_blob + i], + false); + curr_old_blob_ptr += num_bytes_written; + curr_new_var_field_data_ptr += num_bytes_written; + } + curr_old_blob += num_blobs_to_copy; + curr_new_blob += num_blobs_to_copy; + if (op_type == COL_DROP) { + // skip over blob in row + uint32_t num_bytes = copy_toku_blob( + NULL, + curr_old_blob_ptr, + len_bytes[curr_old_blob], + true); + curr_old_blob++; + curr_old_blob_ptr += num_bytes; + } else { + // copy new data + uint32_t new_len_bytes = extra_pos[0]; + extra_pos++; + uint32_t num_bytes = copy_toku_blob( + curr_new_var_field_data_ptr, + extra_pos, + new_len_bytes, + false); + curr_new_blob++; + curr_new_var_field_data_ptr += num_bytes; + extra_pos += num_bytes; + } + } + num_blob_bytes = old_val->size - (curr_old_blob_ptr - old_null_bytes); + memcpy(curr_new_var_field_data_ptr, curr_old_blob_ptr, num_blob_bytes); + curr_new_var_field_data_ptr += num_blob_bytes; + } + new_val.data = new_val_data; + new_val.size = curr_new_var_field_data_ptr - new_val_data; + set_val(&new_val, set_extra); + + error = 0; +cleanup: + tokudb::memory::free(new_val_data); + return error; +} + +// Expand the variable offset array in the old row given the update mesage +// in the extra. +static int tokudb_expand_variable_offsets(const DBT* old_val, + const DBT* extra, + void (*set_val)(const DBT* new_val, + void* set_extra), + void* set_extra) { + int error = 0; + tokudb::buffer extra_val(extra->data, 0, extra->size); + + // decode the operation + uint8_t operation; + extra_val.consume(&operation, sizeof operation); + assert_always(operation == UPDATE_OP_EXPAND_VARIABLE_OFFSETS); + + // decode number of offsets + uint32_t number_of_offsets; + extra_val.consume(&number_of_offsets, sizeof number_of_offsets); + + // decode the offset start + uint32_t offset_start; + extra_val.consume(&offset_start, sizeof offset_start); + + assert_always(extra_val.size() == extra_val.limit()); + + DBT new_val; memset(&new_val, 0, sizeof new_val); + + if (old_val != NULL) { + assert_always(offset_start + number_of_offsets <= old_val->size); + + // compute the new val from the old val + uchar* old_val_ptr = (uchar*)old_val->data; + + // allocate space for the new val's data + uchar* new_val_ptr = (uchar*)tokudb::memory::malloc( + number_of_offsets + old_val->size, + MYF(MY_FAE)); + if (!new_val_ptr) { + error = ENOMEM; + goto cleanup; + } + new_val.data = new_val_ptr; + + // copy up to the start of the varchar offset + memcpy(new_val_ptr, old_val_ptr, offset_start); + new_val_ptr += offset_start; + old_val_ptr += offset_start; + + // expand each offset from 1 to 2 bytes + for (uint32_t i = 0; i < number_of_offsets; i++) { + uint16_t new_offset = *old_val_ptr; + int2store(new_val_ptr, new_offset); + new_val_ptr += 2; + old_val_ptr += 1; + } + + // copy the rest of the row + size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data); + memcpy(new_val_ptr, old_val_ptr, n); + new_val_ptr += n; + old_val_ptr += n; + new_val.size = new_val_ptr - (uchar *)new_val.data; + + assert_always(new_val_ptr == (uchar *)new_val.data + new_val.size); + assert_always(old_val_ptr == (uchar *)old_val->data + old_val->size); + + // set the new val + set_val(&new_val, set_extra); + } + + error = 0; + +cleanup: + tokudb::memory::free(new_val.data); + return error; +} + +// Expand an int field in a old row given the expand message in the extra. +static int tokudb_expand_int_field(const DBT* old_val, + const DBT* extra, + void (*set_val)(const DBT* new_val, + void* set_extra), + void* set_extra) { + int error = 0; + tokudb::buffer extra_val(extra->data, 0, extra->size); + + uint8_t operation; + extra_val.consume(&operation, sizeof operation); + assert_always( + operation == UPDATE_OP_EXPAND_INT || + operation == UPDATE_OP_EXPAND_UINT); + uint32_t the_offset; + extra_val.consume(&the_offset, sizeof the_offset); + uint32_t old_length; + extra_val.consume(&old_length, sizeof old_length); + uint32_t new_length; + extra_val.consume(&new_length, sizeof new_length); + assert_always(extra_val.size() == extra_val.limit()); + + assert_always(new_length >= old_length); // expand only + + DBT new_val; memset(&new_val, 0, sizeof new_val); + + if (old_val != NULL) { + // old field within the old val + assert_always(the_offset + old_length <= old_val->size); + + // compute the new val from the old val + uchar* old_val_ptr = (uchar*)old_val->data; + + // allocate space for the new val's data + uchar* new_val_ptr = (uchar*)tokudb::memory::malloc( + old_val->size + (new_length - old_length), + MYF(MY_FAE)); + if (!new_val_ptr) { + error = ENOMEM; + goto cleanup; + } + new_val.data = new_val_ptr; + + // copy up to the old offset + memcpy(new_val_ptr, old_val_ptr, the_offset); + new_val_ptr += the_offset; + old_val_ptr += the_offset; + + switch (operation) { + case UPDATE_OP_EXPAND_INT: + // fill the entire new value with ones or zeros depending on the + // sign bit the encoding is little endian + memset( + new_val_ptr, + (old_val_ptr[old_length-1] & 0x80) ? 0xff : 0x00, + new_length); + // overlay the low bytes of the new value with the old value + memcpy(new_val_ptr, old_val_ptr, old_length); + new_val_ptr += new_length; + old_val_ptr += old_length; + break; + case UPDATE_OP_EXPAND_UINT: + // fill the entire new value with zeros + memset(new_val_ptr, 0, new_length); + // overlay the low bytes of the new value with the old value + memcpy(new_val_ptr, old_val_ptr, old_length); + new_val_ptr += new_length; + old_val_ptr += old_length; + break; + default: + assert_unreachable(); + } + + // copy the rest + size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data); + memcpy(new_val_ptr, old_val_ptr, n); + new_val_ptr += n; + old_val_ptr += n; + new_val.size = new_val_ptr - (uchar *)new_val.data; + + assert_always(new_val_ptr == (uchar *)new_val.data + new_val.size); + assert_always(old_val_ptr == (uchar *)old_val->data + old_val->size); + + // set the new val + set_val(&new_val, set_extra); + } + + error = 0; + +cleanup: + tokudb::memory::free(new_val.data); + return error; +} + +// Expand a char field in a old row given the expand message in the extra. +static int tokudb_expand_char_field(const DBT* old_val, + const DBT* extra, + void (*set_val)(const DBT* new_val, + void* set_extra), + void* set_extra) { + int error = 0; + tokudb::buffer extra_val(extra->data, 0, extra->size); + + uint8_t operation; + extra_val.consume(&operation, sizeof operation); + assert_always( + operation == UPDATE_OP_EXPAND_CHAR || + operation == UPDATE_OP_EXPAND_BINARY); + uint32_t the_offset; + extra_val.consume(&the_offset, sizeof the_offset); + uint32_t old_length; + extra_val.consume(&old_length, sizeof old_length); + uint32_t new_length; + extra_val.consume(&new_length, sizeof new_length); + uchar pad_char; + extra_val.consume(&pad_char, sizeof pad_char); + assert_always(extra_val.size() == extra_val.limit()); + + assert_always(new_length >= old_length); // expand only + + DBT new_val; memset(&new_val, 0, sizeof new_val); + + if (old_val != NULL) { + // old field within the old val + assert_always(the_offset + old_length <= old_val->size); + + // compute the new val from the old val + uchar* old_val_ptr = (uchar*)old_val->data; + + // allocate space for the new val's data + uchar* new_val_ptr = (uchar*)tokudb::memory::malloc( + old_val->size + (new_length - old_length), + MYF(MY_FAE)); + if (!new_val_ptr) { + error = ENOMEM; + goto cleanup; + } + new_val.data = new_val_ptr; + + // copy up to the old offset + memcpy(new_val_ptr, old_val_ptr, the_offset); + new_val_ptr += the_offset; + old_val_ptr += the_offset; + + switch (operation) { + case UPDATE_OP_EXPAND_CHAR: + case UPDATE_OP_EXPAND_BINARY: + // fill the entire new value with the pad char + memset(new_val_ptr, pad_char, new_length); + // overlay the low bytes of the new value with the old value + memcpy(new_val_ptr, old_val_ptr, old_length); + new_val_ptr += new_length; + old_val_ptr += old_length; + break; + default: + assert_unreachable(); + } + + // copy the rest + size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data); + memcpy(new_val_ptr, old_val_ptr, n); + new_val_ptr += n; + old_val_ptr += n; + new_val.size = new_val_ptr - (uchar *)new_val.data; + + assert_always(new_val_ptr == (uchar *)new_val.data + new_val.size); + assert_always(old_val_ptr == (uchar *)old_val->data + old_val->size); + + // set the new val + set_val(&new_val, set_extra); + } + + error = 0; + +cleanup: + tokudb::memory::free(new_val.data); + return error; +} + +namespace tokudb { + +class var_fields { +public: + inline var_fields() { + } + inline void init_var_fields( + uint32_t var_offset, + uint32_t offset_bytes, + uint32_t bytes_per_offset, + tokudb::buffer* val_buffer) { + + assert_always( + bytes_per_offset == 0 || + bytes_per_offset == 1 || + bytes_per_offset == 2); + m_var_offset = var_offset; + m_val_offset = m_var_offset + offset_bytes; + m_bytes_per_offset = bytes_per_offset; + if (bytes_per_offset > 0) { + m_num_fields = offset_bytes / bytes_per_offset; + } else { + assert_always(offset_bytes == 0); + m_num_fields = 0; + } + m_val_buffer = val_buffer; + } + uint32_t value_offset(uint32_t var_index); + uint32_t value_length(uint32_t var_index); + void update_offsets(uint32_t var_index, uint32_t old_s, uint32_t new_s); + uint32_t end_offset(); + void replace( + uint32_t var_index, + void* new_val_ptr, + uint32_t new_val_length); +private: + uint32_t read_offset(uint32_t var_index); + void write_offset(uint32_t var_index, uint32_t v); +private: + uint32_t m_var_offset; + uint32_t m_val_offset; + uint32_t m_bytes_per_offset; + uint32_t m_num_fields; + tokudb::buffer* m_val_buffer; +}; + +// Return the ith variable length offset +uint32_t var_fields::read_offset(uint32_t var_index) { + uint32_t offset = 0; + m_val_buffer->read( + &offset, m_bytes_per_offset, m_var_offset + var_index * m_bytes_per_offset); + return offset; +} + +// Write the ith variable length offset with a new offset. +void var_fields::write_offset(uint32_t var_index, uint32_t new_offset) { + m_val_buffer->write( + &new_offset, + m_bytes_per_offset, + m_var_offset + var_index * m_bytes_per_offset); +} + +// Return the offset of the ith variable length field +uint32_t var_fields::value_offset(uint32_t var_index) { + assert_always(var_index < m_num_fields); + if (var_index == 0) + return m_val_offset; + else + return m_val_offset + read_offset(var_index-1); +} + +// Return the length of the ith variable length field +uint32_t var_fields::value_length(uint32_t var_index) { + assert_always(var_index < m_num_fields); + if (var_index == 0) + return read_offset(0); + else + return read_offset(var_index) - read_offset(var_index-1); +} + +// The length of the ith variable length fields changed. +// Update all of the subsequent offsets. +void var_fields::update_offsets( + uint32_t var_index, + uint32_t old_s, + uint32_t new_s) { + + assert_always(var_index < m_num_fields); + if (old_s == new_s) + return; + for (uint i = var_index; i < m_num_fields; i++) { + uint32_t v = read_offset(i); + if (new_s > old_s) + write_offset(i, v + (new_s - old_s)); + else + write_offset(i, v - (old_s - new_s)); + } +} + +uint32_t var_fields::end_offset() { + if (m_num_fields == 0) + return m_val_offset; + else + return m_val_offset + read_offset(m_num_fields-1); +} + +void var_fields::replace( + uint32_t var_index, + void* new_val_ptr, + uint32_t new_val_length) { + + // replace the new val with the extra val + uint32_t the_offset = value_offset(var_index); + uint32_t old_s = value_length(var_index); + uint32_t new_s = new_val_length; + m_val_buffer->replace(the_offset, old_s, new_val_ptr, new_s); + + // update the var offsets + update_offsets(var_index, old_s, new_s); +} + +class blob_fields { +public: + blob_fields() { + } + void init_blob_fields( + uint32_t num_blobs, + const uint8_t* blob_lengths, + tokudb::buffer* val_buffer) { + m_num_blobs = num_blobs; + m_blob_lengths = blob_lengths; + m_val_buffer = val_buffer; + } + void start_blobs(uint32_t offset) { + m_blob_offset = offset; + } + void replace(uint32_t blob_index, uint32_t length, void *p); + + void expand_length( + uint32_t blob_index, + uint8_t old_length_length, + uint8_t new_length_length); +private: + uint32_t read_length(uint32_t offset, size_t size); + void write_length(uint32_t offset, size_t size, uint32_t new_length); + uint32_t blob_offset(uint32_t blob_index); +private: + uint32_t m_blob_offset; + uint32_t m_num_blobs; + const uint8_t *m_blob_lengths; + tokudb::buffer *m_val_buffer; +}; + +uint32_t blob_fields::read_length(uint32_t offset, size_t blob_length) { + uint32_t length = 0; + m_val_buffer->read(&length, blob_length, offset); + return length; +} + +void blob_fields::write_length( + uint32_t offset, + size_t size, + uint32_t new_length) { + m_val_buffer->write(&new_length, size, offset); +} + +uint32_t blob_fields::blob_offset(uint32_t blob_index) { + assert_always(blob_index < m_num_blobs); + uint32_t offset = m_blob_offset; + for (uint i = 0; i < blob_index; i++) { + uint32_t blob_length = m_blob_lengths[i]; + uint32_t length = read_length(offset, blob_length); + offset += blob_length + length; + } + return offset; +} + +void blob_fields::replace( + uint32_t blob_index, + uint32_t new_length, + void* new_value) { + + assert_always(blob_index < m_num_blobs); + + // compute the ith blob offset + uint32_t offset = blob_offset(blob_index); + uint8_t blob_length = m_blob_lengths[blob_index]; + + // read the old length + uint32_t old_length = read_length(offset, blob_length); + + // replace the data + m_val_buffer->replace( + offset + blob_length, + old_length, + new_value, + new_length); + + // write the new length + write_length(offset, blob_length, new_length); +} + +void blob_fields::expand_length( + uint32_t blob_index, + uint8_t old_length_length, + uint8_t new_length_length) { + + assert_always(blob_index < m_num_blobs); + assert_always(old_length_length == m_blob_lengths[blob_index]); + + // compute the ith blob offset + uint32_t offset = blob_offset(blob_index); + + // read the blob length + uint32_t blob_length = read_length(offset, old_length_length); + + // expand the length + m_val_buffer->replace( + offset, + old_length_length, + &blob_length, + new_length_length); +} + +class value_map { +public: + value_map(tokudb::buffer *val_buffer) : m_val_buffer(val_buffer) { + } + + void init_var_fields( + uint32_t var_offset, + uint32_t offset_bytes, + uint32_t bytes_per_offset) { + + m_var_fields.init_var_fields( + var_offset, + offset_bytes, + bytes_per_offset, + m_val_buffer); + } + + void init_blob_fields(uint32_t num_blobs, const uint8_t *blob_lengths) { + m_blob_fields.init_blob_fields(num_blobs, blob_lengths, m_val_buffer); + } + + // Replace the value of a fixed length field + void replace_fixed( + uint32_t the_offset, + uint32_t field_null_num, + void* new_val_ptr, + uint32_t new_val_length) { + + m_val_buffer->replace( + the_offset, + new_val_length, + new_val_ptr, + new_val_length); + maybe_clear_null(field_null_num); + } + + // Replace the value of a variable length field + void replace_varchar( + uint32_t var_index, + uint32_t field_null_num, + void* new_val_ptr, + uint32_t new_val_length) { + + m_var_fields.replace(var_index, new_val_ptr, new_val_length); + maybe_clear_null(field_null_num); + } + + // Replace the value of a blob field + void replace_blob( + uint32_t blob_index, + uint32_t field_null_num, + void* new_val_ptr, + uint32_t new_val_length) { + + m_blob_fields.start_blobs(m_var_fields.end_offset()); + m_blob_fields.replace(blob_index, new_val_length, new_val_ptr); + maybe_clear_null(field_null_num); + } + + void expand_blob_lengths( + uint32_t num_blob, + const uint8_t* old_length, + const uint8_t* new_length); + + void int_op( + uint32_t operation, + uint32_t the_offset, + uint32_t length, + uint32_t field_null_num, + tokudb::buffer& old_val, + void* extra_val); + + void uint_op( + uint32_t operation, + uint32_t the_offset, + uint32_t length, + uint32_t field_null_num, + tokudb::buffer& old_val, + void* extra_val); + +private: + bool is_null(uint32_t null_num, uchar *null_bytes) { + bool field_is_null = false; + if (null_num) { + if (null_num & (1<<31)) + null_num &= ~(1<<31); + else + null_num -= 1; + field_is_null = is_overall_null_position_set(null_bytes, null_num); + } + return field_is_null; + } + + void maybe_clear_null(uint32_t null_num) { + if (null_num) { + if (null_num & (1<<31)) + null_num &= ~(1<<31); + else + null_num -= 1; + set_overall_null_position( + (uchar*)m_val_buffer->data(), + null_num, + false); + } + } + +private: + var_fields m_var_fields; + blob_fields m_blob_fields; + tokudb::buffer *m_val_buffer; +}; + +// Update an int field: signed newval@offset = old_val@offset OP extra_val +void value_map::int_op( + uint32_t operation, + uint32_t the_offset, + uint32_t length, + uint32_t field_null_num, + tokudb::buffer &old_val, + void* extra_val) { + + assert_always(the_offset + length <= m_val_buffer->size()); + assert_always(the_offset + length <= old_val.size()); + assert_always( + length == 1 || length == 2 || length == 3 || + length == 4 || length == 8); + + uchar *old_val_ptr = (uchar *) old_val.data(); + bool field_is_null = is_null(field_null_num, old_val_ptr); + int64_t v = 0; + memcpy(&v, old_val_ptr + the_offset, length); + v = tokudb::int_sign_extend(v, 8*length); + int64_t extra_v = 0; + memcpy(&extra_v, extra_val, length); + extra_v = tokudb::int_sign_extend(extra_v, 8*length); + switch (operation) { + case '+': + if (!field_is_null) { + bool over; + v = tokudb::int_add(v, extra_v, 8*length, &over); + if (over) { + if (extra_v > 0) + v = tokudb::int_high_endpoint(8*length); + else + v = tokudb::int_low_endpoint(8*length); + } + m_val_buffer->replace(the_offset, length, &v, length); + } + break; + case '-': + if (!field_is_null) { + bool over; + v = tokudb::int_sub(v, extra_v, 8*length, &over); + if (over) { + if (extra_v > 0) + v = tokudb::int_low_endpoint(8*length); + else + v = tokudb::int_high_endpoint(8*length); + } + m_val_buffer->replace(the_offset, length, &v, length); + } + break; + default: + assert_unreachable(); + } +} + +// Update an unsigned field: unsigned newval@offset = old_val@offset OP extra_val +void value_map::uint_op( + uint32_t operation, + uint32_t the_offset, + uint32_t length, + uint32_t field_null_num, + tokudb::buffer& old_val, + void* extra_val) { + + assert_always(the_offset + length <= m_val_buffer->size()); + assert_always(the_offset + length <= old_val.size()); + assert_always( + length == 1 || length == 2 || length == 3 || + length == 4 || length == 8); + + uchar *old_val_ptr = (uchar *) old_val.data(); + bool field_is_null = is_null(field_null_num, old_val_ptr); + uint64_t v = 0; + memcpy(&v, old_val_ptr + the_offset, length); + uint64_t extra_v = 0; + memcpy(&extra_v, extra_val, length); + switch (operation) { + case '+': + if (!field_is_null) { + bool over; + v = tokudb::uint_add(v, extra_v, 8*length, &over); + if (over) { + v = tokudb::uint_high_endpoint(8*length); + } + m_val_buffer->replace(the_offset, length, &v, length); + } + break; + case '-': + if (!field_is_null) { + bool over; + v = tokudb::uint_sub(v, extra_v, 8*length, &over); + if (over) { + v = tokudb::uint_low_endpoint(8*length); + } + m_val_buffer->replace(the_offset, length, &v, length); + } + break; + default: + assert_unreachable(); + } +} + +void value_map::expand_blob_lengths( + uint32_t num_blob, + const uint8_t* old_length, + const uint8_t* new_length) { + + uint8_t current_length[num_blob]; + memcpy(current_length, old_length, num_blob); + for (uint32_t i = 0; i < num_blob; i++) { + if (new_length[i] > current_length[i]) { + m_blob_fields.init_blob_fields( + num_blob, + current_length, + m_val_buffer); + m_blob_fields.start_blobs(m_var_fields.end_offset()); + m_blob_fields.expand_length(i, current_length[i], new_length[i]); + current_length[i] = new_length[i]; + } + } +} + +} + +static uint32_t consume_uint32(tokudb::buffer &b) { + uint32_t n; + size_t s = b.consume_ui<uint32_t>(&n); + assert_always(s > 0); + return n; +} + +static uint8_t *consume_uint8_array(tokudb::buffer &b, uint32_t array_size) { + uint8_t *p = (uint8_t *) b.consume_ptr(array_size); + assert_always(p); + return p; +} + +static int tokudb_expand_blobs(const DBT* old_val_dbt, + const DBT* extra, + void (*set_val)(const DBT* new_val_dbt, + void* set_extra), + void* set_extra) { + tokudb::buffer extra_val(extra->data, 0, extra->size); + + uint8_t operation; + extra_val.consume(&operation, sizeof operation); + assert_always(operation == UPDATE_OP_EXPAND_BLOB); + + if (old_val_dbt != NULL) { + // new val = old val + tokudb::buffer new_val; + new_val.append(old_val_dbt->data, old_val_dbt->size); + + tokudb::value_map vd(&new_val); + + // decode variable field info + uint32_t var_field_offset = consume_uint32(extra_val); + uint32_t var_offset_bytes = consume_uint32(extra_val); + uint32_t bytes_per_offset = consume_uint32(extra_val); + vd.init_var_fields( + var_field_offset, + var_offset_bytes, + bytes_per_offset); + + // decode blob info + uint32_t num_blob = consume_uint32(extra_val); + const uint8_t* old_blob_length = + consume_uint8_array(extra_val, num_blob); + const uint8_t* new_blob_length = + consume_uint8_array(extra_val, num_blob); + assert_always(extra_val.size() == extra_val.limit()); + + // expand blob lengths + vd.expand_blob_lengths(num_blob, old_blob_length, new_blob_length); + + // set the new val + DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt); + new_val_dbt.data = new_val.data(); + new_val_dbt.size = new_val.size(); + set_val(&new_val_dbt, set_extra); + } + return 0; +} + +// Decode and apply a sequence of update operations defined in the extra to +// the old value and put the result in the new value. +static void apply_1_updates(tokudb::value_map& vd, + tokudb::buffer& old_val, + tokudb::buffer& extra_val) { + uint32_t num_updates; + extra_val.consume(&num_updates, sizeof num_updates); + for ( ; num_updates > 0; num_updates--) { + // get the update operation + uint32_t update_operation; + extra_val.consume(&update_operation, sizeof update_operation); + uint32_t field_type; + extra_val.consume(&field_type, sizeof field_type); + uint32_t unused; + extra_val.consume(&unused, sizeof unused); + uint32_t field_null_num; + extra_val.consume(&field_null_num, sizeof field_null_num); + uint32_t the_offset; + extra_val.consume(&the_offset, sizeof the_offset); + uint32_t extra_val_length; + extra_val.consume(&extra_val_length, sizeof extra_val_length); + void *extra_val_ptr = extra_val.consume_ptr(extra_val_length); + + // apply the update + switch (field_type) { + case UPDATE_TYPE_INT: + if (update_operation == '=') + vd.replace_fixed( + the_offset, + field_null_num, + extra_val_ptr, + extra_val_length); + else + vd.int_op( + update_operation, + the_offset, + extra_val_length, + field_null_num, + old_val, + extra_val_ptr); + break; + case UPDATE_TYPE_UINT: + if (update_operation == '=') + vd.replace_fixed( + the_offset, + field_null_num, + extra_val_ptr, + extra_val_length); + else + vd.uint_op( + update_operation, + the_offset, + extra_val_length, + field_null_num, + old_val, + extra_val_ptr); + break; + case UPDATE_TYPE_CHAR: + case UPDATE_TYPE_BINARY: + if (update_operation == '=') + vd.replace_fixed( + the_offset, + field_null_num, + extra_val_ptr, + extra_val_length); + else + assert_unreachable(); + break; + default: + assert_unreachable(); + break; + } + } + assert_always(extra_val.size() == extra_val.limit()); +} + +// Simple update handler. Decode the update message, apply the update operations +// to the old value, and set the new value. +static int tokudb_update_1_fun(const DBT* old_val_dbt, + const DBT* extra, + void (*set_val)(const DBT* new_val_dbt, + void* set_extra), + void* set_extra) { + tokudb::buffer extra_val(extra->data, 0, extra->size); + + uint8_t operation; + extra_val.consume(&operation, sizeof operation); + assert_always(operation == UPDATE_OP_UPDATE_1); + + if (old_val_dbt != NULL) { + // get the simple descriptor + uint32_t m_fixed_field_offset; + extra_val.consume(&m_fixed_field_offset, sizeof m_fixed_field_offset); + uint32_t m_var_field_offset; + extra_val.consume(&m_var_field_offset, sizeof m_var_field_offset); + uint32_t m_var_offset_bytes; + extra_val.consume(&m_var_offset_bytes, sizeof m_var_offset_bytes); + uint32_t m_bytes_per_offset; + extra_val.consume(&m_bytes_per_offset, sizeof m_bytes_per_offset); + + tokudb::buffer old_val( + old_val_dbt->data, + old_val_dbt->size, + old_val_dbt->size); + + // new val = old val + tokudb::buffer new_val; + new_val.append(old_val_dbt->data, old_val_dbt->size); + + tokudb::value_map vd(&new_val); + vd.init_var_fields( + m_var_field_offset, + m_var_offset_bytes, + m_bytes_per_offset); + + // apply updates to new val + apply_1_updates(vd, old_val, extra_val); + + // set the new val + DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt); + new_val_dbt.data = new_val.data(); + new_val_dbt.size = new_val.size(); + set_val(&new_val_dbt, set_extra); + } + + return 0; +} + +// Simple upsert handler. Decode the upsert message. If the key does not exist, +// then insert a new value from the extra. +// Otherwise, apply the update operations to the old value, and then set the +// new value. +static int tokudb_upsert_1_fun(const DBT* old_val_dbt, + const DBT* extra, + void (*set_val)(const DBT* new_val_dbt, + void* set_extra), + void* set_extra) { + tokudb::buffer extra_val(extra->data, 0, extra->size); + + uint8_t operation; + extra_val.consume(&operation, sizeof operation); + assert_always(operation == UPDATE_OP_UPSERT_1); + + uint32_t insert_length; + extra_val.consume(&insert_length, sizeof insert_length); + void *insert_row = extra_val.consume_ptr(insert_length); + + if (old_val_dbt == NULL) { + // insert a new row + DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt); + new_val_dbt.size = insert_length; + new_val_dbt.data = insert_row; + set_val(&new_val_dbt, set_extra); + } else { + // decode the simple descriptor + uint32_t m_fixed_field_offset; + extra_val.consume(&m_fixed_field_offset, sizeof m_fixed_field_offset); + uint32_t m_var_field_offset; + extra_val.consume(&m_var_field_offset, sizeof m_var_field_offset); + uint32_t m_var_offset_bytes; + extra_val.consume(&m_var_offset_bytes, sizeof m_var_offset_bytes); + uint32_t m_bytes_per_offset; + extra_val.consume(&m_bytes_per_offset, sizeof m_bytes_per_offset); + + tokudb::buffer old_val( + old_val_dbt->data, + old_val_dbt->size, + old_val_dbt->size); + + // new val = old val + tokudb::buffer new_val; + new_val.append(old_val_dbt->data, old_val_dbt->size); + + tokudb::value_map vd(&new_val); + vd.init_var_fields( + m_var_field_offset, + m_var_offset_bytes, + m_bytes_per_offset); + + // apply updates to new val + apply_1_updates(vd, old_val, extra_val); + + // set the new val + DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt); + new_val_dbt.data = new_val.data(); + new_val_dbt.size = new_val.size(); + set_val(&new_val_dbt, set_extra); + } + + return 0; +} + +// Decode and apply a sequence of update operations defined in the extra to the +// old value and put the result in the new value. +static void apply_2_updates(tokudb::value_map& vd, + tokudb::buffer& old_val, + tokudb::buffer& extra_val) { + uint32_t num_updates = consume_uint32(extra_val); + for (uint32_t i = 0; i < num_updates; i++) { + uint32_t update_operation = consume_uint32(extra_val); + if (update_operation == 'v') { + uint32_t var_field_offset = consume_uint32(extra_val); + uint32_t var_offset_bytes = consume_uint32(extra_val); + uint32_t bytes_per_offset = consume_uint32(extra_val); + vd.init_var_fields( + var_field_offset, + var_offset_bytes, + bytes_per_offset); + } else if (update_operation == 'b') { + uint32_t num_blobs = consume_uint32(extra_val); + const uint8_t* blob_lengths = + consume_uint8_array(extra_val, num_blobs); + vd.init_blob_fields(num_blobs, blob_lengths); + } else { + uint32_t field_type = consume_uint32(extra_val); + uint32_t field_null_num = consume_uint32(extra_val); + uint32_t the_offset = consume_uint32(extra_val); + uint32_t extra_val_length = consume_uint32(extra_val); + void* extra_val_ptr = extra_val.consume_ptr(extra_val_length); + assert_always(extra_val_ptr); + + switch (field_type) { + case UPDATE_TYPE_INT: + if (update_operation == '=') + vd.replace_fixed( + the_offset, + field_null_num, + extra_val_ptr, + extra_val_length); + else + vd.int_op( + update_operation, + the_offset, + extra_val_length, + field_null_num, + old_val, + extra_val_ptr); + break; + case UPDATE_TYPE_UINT: + if (update_operation == '=') + vd.replace_fixed( + the_offset, + field_null_num, + extra_val_ptr, + extra_val_length); + else + vd.uint_op( + update_operation, + the_offset, + extra_val_length, + field_null_num, + old_val, + extra_val_ptr); + break; + case UPDATE_TYPE_CHAR: + case UPDATE_TYPE_BINARY: + if (update_operation == '=') + vd.replace_fixed( + the_offset, + field_null_num, + extra_val_ptr, + extra_val_length); + else + assert_unreachable(); + break; + case UPDATE_TYPE_VARBINARY: + case UPDATE_TYPE_VARCHAR: + if (update_operation == '=') + vd.replace_varchar( + the_offset, + field_null_num, + extra_val_ptr, + extra_val_length); + else + assert_unreachable(); + break; + case UPDATE_TYPE_TEXT: + case UPDATE_TYPE_BLOB: + if (update_operation == '=') + vd.replace_blob( + the_offset, + field_null_num, + extra_val_ptr, + extra_val_length); + else + assert_unreachable(); + break; + default: + assert_unreachable(); + } + } + } + assert_always(extra_val.size() == extra_val.limit()); +} + +// Simple update handler. Decode the update message, apply the update +// operations to the old value, and set the new value. +static int tokudb_update_2_fun(const DBT* old_val_dbt, + const DBT* extra, + void (*set_val)(const DBT* new_val_dbt, + void* set_extra), + void* set_extra) { + tokudb::buffer extra_val(extra->data, 0, extra->size); + + uint8_t op; + extra_val.consume(&op, sizeof op); + assert_always(op == UPDATE_OP_UPDATE_2); + + if (old_val_dbt != NULL) { + tokudb::buffer old_val( + old_val_dbt->data, + old_val_dbt->size, + old_val_dbt->size); + + // new val = old val + tokudb::buffer new_val; + new_val.append(old_val_dbt->data, old_val_dbt->size); + + tokudb::value_map vd(&new_val); + + // apply updates to new val + apply_2_updates(vd, old_val, extra_val); + + // set the new val + DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt); + new_val_dbt.data = new_val.data(); + new_val_dbt.size = new_val.size(); + set_val(&new_val_dbt, set_extra); + } + + return 0; +} + +// Simple upsert handler. Decode the upsert message. If the key does not exist, +// then insert a new value from the extra. +// Otherwise, apply the update operations to the old value, and then set the +// new value. +static int tokudb_upsert_2_fun(const DBT* old_val_dbt, + const DBT* extra, + void (*set_val)(const DBT* new_val_dbt, + void* set_extra), + void* set_extra) { + tokudb::buffer extra_val(extra->data, 0, extra->size); + + uint8_t op; + extra_val.consume(&op, sizeof op); + assert_always(op == UPDATE_OP_UPSERT_2); + + uint32_t insert_length = consume_uint32(extra_val); + assert_always(insert_length < extra_val.limit()); + void* insert_row = extra_val.consume_ptr(insert_length); + assert_always(insert_row); + + if (old_val_dbt == NULL) { + // insert a new row + DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt); + new_val_dbt.size = insert_length; + new_val_dbt.data = insert_row; + set_val(&new_val_dbt, set_extra); + } else { + tokudb::buffer old_val( + old_val_dbt->data, + old_val_dbt->size, + old_val_dbt->size); + + // new val = old val + tokudb::buffer new_val; + new_val.append(old_val_dbt->data, old_val_dbt->size); + + tokudb::value_map vd(&new_val); + + // apply updates to new val + apply_2_updates(vd, old_val, extra_val); + + // set the new val + DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt); + new_val_dbt.data = new_val.data(); + new_val_dbt.size = new_val.size(); + set_val(&new_val_dbt, set_extra); + } + + return 0; +} + +// This function is the update callback function that is registered with the +// YDB environment. It uses the first byte in the update message to identify +// the update message type and call the handler for that message. +int tokudb_update_fun(TOKUDB_UNUSED(DB* db), + TOKUDB_UNUSED(const DBT* key), + const DBT* old_val, + const DBT* extra, + void (*set_val)(const DBT* new_val, void* set_extra), + void* set_extra) { + assert_always(extra->size > 0); + uint8_t* extra_pos = (uchar*)extra->data; + uint8_t operation = extra_pos[0]; + int error; + switch (operation) { + case UPDATE_OP_COL_ADD_OR_DROP: + error = tokudb_hcad_update_fun(old_val, extra, set_val, set_extra); + break; + case UPDATE_OP_EXPAND_VARIABLE_OFFSETS: + error = + tokudb_expand_variable_offsets(old_val, extra, set_val, set_extra); + break; + case UPDATE_OP_EXPAND_INT: + case UPDATE_OP_EXPAND_UINT: + error = tokudb_expand_int_field(old_val, extra, set_val, set_extra); + break; + case UPDATE_OP_EXPAND_CHAR: + case UPDATE_OP_EXPAND_BINARY: + error = tokudb_expand_char_field(old_val, extra, set_val, set_extra); + break; + case UPDATE_OP_EXPAND_BLOB: + error = tokudb_expand_blobs(old_val, extra, set_val, set_extra); + break; + case UPDATE_OP_UPDATE_1: + error = tokudb_update_1_fun(old_val, extra, set_val, set_extra); + break; + case UPDATE_OP_UPSERT_1: + error = tokudb_upsert_1_fun(old_val, extra, set_val, set_extra); + break; + case UPDATE_OP_UPDATE_2: + error = tokudb_update_2_fun(old_val, extra, set_val, set_extra); + break; + case UPDATE_OP_UPSERT_2: + error = tokudb_upsert_2_fun(old_val, extra, set_val, set_extra); + break; + default: + assert_unreachable(); + } + return error; +} |