Diffstat
-rw-r--r-- | storage/innobase/row/row0log.cc | 4134
1 file changed, 4134 insertions, 0 deletions
diff --git a/storage/innobase/row/row0log.cc b/storage/innobase/row/row0log.cc new file mode 100644 index 00000000..c4f46304 --- /dev/null +++ b/storage/innobase/row/row0log.cc @@ -0,0 +1,4134 @@ +/***************************************************************************** + +Copyright (c) 2011, 2018, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2017, 2023, MariaDB Corporation. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA + +*****************************************************************************/ + +/**************************************************//** +@file row/row0log.cc +Modification log for online index creation and online table rebuild + +Created 2011-05-26 Marko Makela +*******************************************************/ + +#include "row0log.h" +#include "row0row.h" +#include "row0ins.h" +#include "row0upd.h" +#include "row0merge.h" +#include "row0ext.h" +#include "log0crypt.h" +#include "data0data.h" +#include "que0que.h" +#include "srv0mon.h" +#include "handler0alter.h" +#include "ut0stage.h" +#include "trx0rec.h" + +#include <sql_class.h> +#include <algorithm> +#include <map> + +Atomic_counter<ulint> onlineddl_rowlog_rows; +ulint onlineddl_rowlog_pct_used; +ulint onlineddl_pct_progress; + +/** Table row modification operations during online table rebuild. +Delete-marked records are not copied to the rebuilt table. */ +enum row_tab_op { + /** Insert a record */ + ROW_T_INSERT = 0x41, + /** Update a record in place */ + ROW_T_UPDATE, + /** Delete (purge) a record */ + ROW_T_DELETE +}; + +/** Index record modification operations during online index creation */ +enum row_op { + /** Insert a record */ + ROW_OP_INSERT = 0x61, + /** Delete a record */ + ROW_OP_DELETE +}; + +/** Size of the modification log entry header, in bytes */ +#define ROW_LOG_HEADER_SIZE 2/*op, extra_size*/ + +/** Log block for modifications during online ALTER TABLE */ +struct row_log_buf_t { + byte* block; /*!< file block buffer */ + size_t size; /*!< length of block in bytes */ + ut_new_pfx_t block_pfx; /*!< opaque descriptor of "block". Set + by ut_allocator::allocate_large() and fed to + ut_allocator::deallocate_large(). */ + mrec_buf_t buf; /*!< buffer for accessing a record + that spans two blocks */ + ulint blocks; /*!< current position in blocks */ + ulint bytes; /*!< current position within block */ + ulonglong total; /*!< logical position, in bytes from + the start of the row_log_table log; + 0 for row_log_online_op() and + row_log_apply(). */ +}; + +/** @brief Buffer for logging modifications during online index creation + +All modifications to an index that is being created will be logged by +row_log_online_op() to this buffer. + +All modifications to a table that is being rebuilt will be logged by +row_log_table_delete(), row_log_table_update(), row_log_table_insert() +to this buffer. + +When head.blocks == tail.blocks, the reader will access tail.block +directly. 
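The log entry header counted by `ROW_LOG_HEADER_SIZE` above is one op byte (`ROW_T_*` or `ROW_OP_*`) followed by an `extra_size` field that widens from one to two bytes once it reaches 0x80, using the high bit of the first byte as an escape; this is the encoding used later in this file by `row_log_online_op()` and `row_log_table_low()`. A minimal standalone sketch of that encoding, with a hypothetical helper name and no InnoDB dependencies:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper mirroring the header encoding: op byte, then
// extra_size in one byte if < 0x80, else two bytes with the high bit of
// the first byte set as an escape (so extra_size must stay below 0x8000).
static void append_entry_header(std::vector<uint8_t>& out, uint8_t op,
                                size_t extra_size)
{
    out.push_back(op);
    if (extra_size < 0x80) {
        out.push_back(static_cast<uint8_t>(extra_size));
    } else {
        assert(extra_size < 0x8000);
        out.push_back(static_cast<uint8_t>(0x80 | (extra_size >> 8)));
        out.push_back(static_cast<uint8_t>(extra_size));
    }
}
```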
When also head.bytes == tail.bytes, both counts will be +reset to 0 and the file will be truncated. */ +struct row_log_t { + pfs_os_file_t fd; /*!< file descriptor */ + mysql_mutex_t mutex; /*!< mutex protecting error, + max_trx and tail */ + dict_table_t* table; /*!< table that is being rebuilt, + or NULL when this is a secondary + index that is being created online */ + bool same_pk;/*!< whether the definition of the PRIMARY KEY + has remained the same */ + const dtuple_t* defaults; + /*!< default values of added, changed columns, + or NULL */ + const ulint* col_map;/*!< mapping of old column numbers to + new ones, or NULL if !table */ + dberr_t error; /*!< error that occurred during online + table rebuild */ + /** The transaction ID of the ALTER TABLE transaction. Any + concurrent DML would necessarily be logged with a larger + transaction ID, because ha_innobase::prepare_inplace_alter_table() + acts as a barrier that ensures that any concurrent transaction + that operates on the table would have been started after + ha_innobase::prepare_inplace_alter_table() returns and before + ha_innobase::commit_inplace_alter_table(commit=true) is invoked. + + Due to the nondeterministic nature of purge and due to the + possibility of upgrading from an earlier version of MariaDB + or MySQL, it is possible that row_log_table_low() would be + fed DB_TRX_ID that precedes than min_trx. We must normalize + such references to reset_trx_id[]. */ + trx_id_t min_trx; + trx_id_t max_trx;/*!< biggest observed trx_id in + row_log_online_op(); + protected by mutex and index->lock S-latch, + or by index->lock X-latch only */ + row_log_buf_t tail; /*!< writer context; + protected by mutex and index->lock S-latch, + or by index->lock X-latch only */ + size_t crypt_tail_size; /*!< size of crypt_tail_size*/ + byte* crypt_tail; /*!< writer context; + temporary buffer used in encryption, + decryption or NULL*/ + row_log_buf_t head; /*!< reader context; protected by MDL only; + modifiable by row_log_apply_ops() */ + size_t crypt_head_size; /*!< size of crypt_tail_size*/ + byte* crypt_head; /*!< reader context; + temporary buffer used in encryption, + decryption or NULL */ + const char* path; /*!< where to create temporary file during + log operation */ + /** the number of core fields in the clustered index of the + source table; before row_log_table_apply() completes, the + table could be emptied, so that table->is_instant() no longer holds, + but all log records must be in the "instant" format. */ + unsigned n_core_fields; + /** the default values of non-core fields when the operation started */ + dict_col_t::def_t* non_core_fields; + bool allow_not_null; /*!< Whether the alter ignore is being + used or if the sql mode is non-strict mode; + if not, NULL values will not be converted to + defaults */ + const TABLE* old_table; /*< Use old table in case of error. */ + + uint64_t n_rows; /*< Number of rows read from the table */ + + /** Alter table transaction. 
It can be used to apply the DML logs + into the table */ + const trx_t* alter_trx; + + /** Determine whether the log should be in the 'instant ADD' format + @param[in] index the clustered index of the source table + @return whether to use the 'instant ADD COLUMN' format */ + bool is_instant(const dict_index_t* index) const + { + ut_ad(table); + ut_ad(n_core_fields <= index->n_fields); + return n_core_fields != index->n_fields; + } + + const byte* instant_field_value(ulint n, ulint* len) const + { + ut_ad(n >= n_core_fields); + const dict_col_t::def_t& d= non_core_fields[n - n_core_fields]; + *len = d.len; + return static_cast<const byte*>(d.data); + } +}; + +/** Create the file or online log if it does not exist. +@param[in,out] log online rebuild log +@return true if success, false if not */ +static MY_ATTRIBUTE((warn_unused_result)) +pfs_os_file_t +row_log_tmpfile( + row_log_t* log) +{ + DBUG_ENTER("row_log_tmpfile"); + if (log->fd == OS_FILE_CLOSED) { + log->fd = row_merge_file_create_low(log->path); + DBUG_EXECUTE_IF("row_log_tmpfile_fail", + if (log->fd != OS_FILE_CLOSED) + row_merge_file_destroy_low(log->fd); + log->fd = OS_FILE_CLOSED;); + if (log->fd != OS_FILE_CLOSED) { + MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_LOG_FILES); + } + } + + DBUG_RETURN(log->fd); +} + +/** Allocate the memory for the log buffer. +@param[in,out] log_buf Buffer used for log operation +@return TRUE if success, false if not */ +static MY_ATTRIBUTE((warn_unused_result)) +bool +row_log_block_allocate( + row_log_buf_t& log_buf) +{ + DBUG_ENTER("row_log_block_allocate"); + if (log_buf.block == NULL) { + DBUG_EXECUTE_IF( + "simulate_row_log_allocation_failure", + DBUG_RETURN(false); + ); + + log_buf.block = ut_allocator<byte>(mem_key_row_log_buf) + .allocate_large(srv_sort_buf_size, + &log_buf.block_pfx); + + if (log_buf.block == NULL) { + DBUG_RETURN(false); + } + log_buf.size = srv_sort_buf_size; + } + DBUG_RETURN(true); +} + +/** Free the log buffer. +@param[in,out] log_buf Buffer used for log operation */ +static +void +row_log_block_free( + row_log_buf_t& log_buf) +{ + DBUG_ENTER("row_log_block_free"); + if (log_buf.block != NULL) { + ut_allocator<byte>(mem_key_row_log_buf).deallocate_large( + log_buf.block, &log_buf.block_pfx); + log_buf.block = NULL; + } + DBUG_VOID_RETURN; +} + +/** Logs an operation to a secondary index that is (or was) being created. +@param index index, S or X latched +@param tuple index tuple +@param trx_id transaction ID for insert, or 0 for delete +@retval false if row_log_apply() failure happens +or true otherwise */ +bool row_log_online_op(dict_index_t *index, const dtuple_t *tuple, + trx_id_t trx_id) +{ + byte* b; + ulint extra_size; + ulint size; + ulint mrec_size; + ulint avail_size; + row_log_t* log; + bool success= true; + + ut_ad(dtuple_validate(tuple)); + ut_ad(dtuple_get_n_fields(tuple) == dict_index_get_n_fields(index)); + ut_ad(index->lock.have_x() || index->lock.have_s()); + + if (index->is_corrupted()) { + return success; + } + + ut_ad(dict_index_is_online_ddl(index) + || (index->online_log + && index->online_status == ONLINE_INDEX_COMPLETE)); + + /* Compute the size of the record. This differs from + row_merge_buf_encode(), because here we do not encode + extra_size+1 (and reserve 0 as the end-of-chunk marker). */ + + size = rec_get_converted_size_temp<false>( + index, tuple->fields, tuple->n_fields, &extra_size); + ut_ad(size >= extra_size); + ut_ad(size <= sizeof log->tail.buf); + + mrec_size = ROW_LOG_HEADER_SIZE + + (extra_size >= 0x80) + size + + (trx_id ? 
DATA_TRX_ID_LEN : 0); + + log = index->online_log; + mysql_mutex_lock(&log->mutex); + +start_log: + if (trx_id > log->max_trx) { + log->max_trx = trx_id; + } + + if (!row_log_block_allocate(log->tail)) { + log->error = DB_OUT_OF_MEMORY; + goto err_exit; + } + + MEM_UNDEFINED(log->tail.buf, sizeof log->tail.buf); + + ut_ad(log->tail.bytes < srv_sort_buf_size); + avail_size = srv_sort_buf_size - log->tail.bytes; + + if (mrec_size > avail_size) { + b = log->tail.buf; + } else { + b = log->tail.block + log->tail.bytes; + } + + if (trx_id != 0) { + *b++ = ROW_OP_INSERT; + trx_write_trx_id(b, trx_id); + b += DATA_TRX_ID_LEN; + } else { + *b++ = ROW_OP_DELETE; + } + + if (extra_size < 0x80) { + *b++ = (byte) extra_size; + } else { + ut_ad(extra_size < 0x8000); + *b++ = (byte) (0x80 | (extra_size >> 8)); + *b++ = (byte) extra_size; + } + + rec_convert_dtuple_to_temp<false>( + b + extra_size, index, tuple->fields, tuple->n_fields); + + b += size; + + if (mrec_size >= avail_size) { + const os_offset_t byte_offset + = (os_offset_t) log->tail.blocks + * srv_sort_buf_size; + byte* buf = log->tail.block; + + if (byte_offset + srv_sort_buf_size >= srv_online_max_size) { + if (index->online_status != ONLINE_INDEX_COMPLETE) + goto write_failed; + /* About to run out of log, InnoDB has to + apply the online log for the completed index */ + index->lock.s_unlock(); + dberr_t error= row_log_apply( + log->alter_trx, index, nullptr, nullptr); + index->lock.s_lock(SRW_LOCK_CALL); + if (error != DB_SUCCESS) { + /* Mark all newly added indexes + as corrupted */ + log->error = error; + success = false; + goto err_exit; + } + + /* Recheck whether the index online log */ + if (!index->online_log) { + goto err_exit; + } + + goto start_log; + } + + if (mrec_size == avail_size) { + ut_ad(b == &buf[srv_sort_buf_size]); + } else { + ut_ad(b == log->tail.buf + mrec_size); + memcpy(buf + log->tail.bytes, + log->tail.buf, avail_size); + } + + MEM_CHECK_DEFINED(buf, srv_sort_buf_size); + + if (row_log_tmpfile(log) == OS_FILE_CLOSED) { + log->error = DB_OUT_OF_MEMORY; + goto err_exit; + } + + /* If encryption is enabled encrypt buffer before writing it + to file system. */ + if (srv_encrypt_log) { + if (!log_tmp_block_encrypt( + buf, srv_sort_buf_size, + log->crypt_tail, byte_offset)) { + log->error = DB_DECRYPTION_FAILED; + goto write_failed; + } + + srv_stats.n_rowlog_blocks_encrypted.inc(); + buf = log->crypt_tail; + } + + log->tail.blocks++; + if (os_file_write( + IORequestWrite, + "(modification log)", + log->fd, + buf, byte_offset, srv_sort_buf_size) + != DB_SUCCESS) { +write_failed: + index->type |= DICT_CORRUPT; + } + + MEM_UNDEFINED(log->tail.block, srv_sort_buf_size); + MEM_UNDEFINED(buf, srv_sort_buf_size); + + memcpy(log->tail.block, log->tail.buf + avail_size, + mrec_size - avail_size); + log->tail.bytes = mrec_size - avail_size; + } else { + log->tail.bytes += mrec_size; + ut_ad(b == log->tail.block + log->tail.bytes); + } + + MEM_UNDEFINED(log->tail.buf, sizeof log->tail.buf); +err_exit: + mysql_mutex_unlock(&log->mutex); + return success; +} + +/******************************************************//** +Gets the error status of the online index rebuild log. 
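`row_log_online_op()` above stages a record in `tail.buf` when it cannot fit in the current block, flushes the full block at file offset `tail.blocks * srv_sort_buf_size`, and carries the remainder over into the next block. A simplified, self-contained sketch of that spill logic, assuming the record is already contiguous and ignoring encryption, the run-out-of-log path, and InnoDB error codes:

```cpp
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <vector>

struct log_tail {
    std::vector<uint8_t> block;  // one file block (srv_sort_buf_size in InnoDB)
    size_t blocks = 0;           // full blocks already written to the file
    size_t bytes = 0;            // bytes used in the current block

    explicit log_tail(size_t block_size) : block(block_size) {}
};

// Append one pre-formatted record; when it does not fit, fill and flush
// the current block at offset blocks * block_size and carry the rest over.
static bool append_record(log_tail& t, std::FILE* fd,
                          const uint8_t* rec, size_t rec_size)
{
    const size_t block_size = t.block.size();
    const size_t avail = block_size - t.bytes;

    if (rec_size < avail) {                        // fits: no file I/O
        std::memcpy(t.block.data() + t.bytes, rec, rec_size);
        t.bytes += rec_size;
        return true;
    }

    std::memcpy(t.block.data() + t.bytes, rec, avail);
    if (std::fseek(fd, long(t.blocks * block_size), SEEK_SET)
        || std::fwrite(t.block.data(), 1, block_size, fd) != block_size) {
        return false;                              // write failure path above
    }
    t.blocks++;
    std::memcpy(t.block.data(), rec + avail, rec_size - avail);
    t.bytes = rec_size - avail;                    // remainder opens new block
    return true;
}
```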
+@return DB_SUCCESS or error code */ +dberr_t +row_log_table_get_error( +/*====================*/ + const dict_index_t* index) /*!< in: clustered index of a table + that is being rebuilt online */ +{ + ut_ad(dict_index_is_clust(index)); + ut_ad(dict_index_is_online_ddl(index)); + return(index->online_log->error); +} + +/******************************************************//** +Starts logging an operation to a table that is being rebuilt. +@return pointer to log, or NULL if no logging is necessary */ +static MY_ATTRIBUTE((nonnull, warn_unused_result)) +byte* +row_log_table_open( +/*===============*/ + row_log_t* log, /*!< in/out: online rebuild log */ + ulint size, /*!< in: size of log record */ + ulint* avail) /*!< out: available size for log record */ +{ + mysql_mutex_lock(&log->mutex); + + MEM_UNDEFINED(log->tail.buf, sizeof log->tail.buf); + + if (log->error != DB_SUCCESS) { +err_exit: + mysql_mutex_unlock(&log->mutex); + return(NULL); + } + + if (!row_log_block_allocate(log->tail)) { + log->error = DB_OUT_OF_MEMORY; + goto err_exit; + } + + ut_ad(log->tail.bytes < srv_sort_buf_size); + *avail = srv_sort_buf_size - log->tail.bytes; + + if (size > *avail) { + /* Make sure log->tail.buf is large enough */ + ut_ad(size <= sizeof log->tail.buf); + return(log->tail.buf); + } else { + return(log->tail.block + log->tail.bytes); + } +} + +/******************************************************//** +Stops logging an operation to a table that is being rebuilt. */ +static MY_ATTRIBUTE((nonnull)) +void +row_log_table_close_func( +/*=====================*/ + dict_index_t* index, /*!< in/out: online rebuilt index */ +#ifdef UNIV_DEBUG + const byte* b, /*!< in: end of log record */ +#endif /* UNIV_DEBUG */ + ulint size, /*!< in: size of log record */ + ulint avail) /*!< in: available size for log record */ +{ + row_log_t* log = index->online_log; + + mysql_mutex_assert_owner(&log->mutex); + + if (size >= avail) { + const os_offset_t byte_offset + = (os_offset_t) log->tail.blocks + * srv_sort_buf_size; + byte* buf = log->tail.block; + + if (byte_offset + srv_sort_buf_size >= srv_online_max_size) { + goto write_failed; + } + + if (size == avail) { + ut_ad(b == &buf[srv_sort_buf_size]); + } else { + ut_ad(b == log->tail.buf + size); + memcpy(buf + log->tail.bytes, log->tail.buf, avail); + } + + MEM_CHECK_DEFINED(buf, srv_sort_buf_size); + + if (row_log_tmpfile(log) == OS_FILE_CLOSED) { + log->error = DB_OUT_OF_MEMORY; + goto err_exit; + } + + /* If encryption is enabled encrypt buffer before writing it + to file system. 
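`row_log_table_open()` and `row_log_table_close()` above form a reserve/commit pair around the log mutex: open returns a write position directly inside the block when the record fits, or in the staging buffer otherwise, and the mutex stays held until close. A stripped-down sketch of the protocol with hypothetical types; flushing and encryption are omitted here:

```cpp
#include <cstdint>
#include <mutex>

struct table_log_sketch {
    std::mutex mtx;
    uint8_t block[8192];     // stands in for the srv_sort_buf_size block
    uint8_t staging[1024];   // stands in for log->tail.buf
    size_t bytes = 0;        // log->tail.bytes

    uint8_t* open(size_t size, size_t* avail) {
        mtx.lock();
        *avail = sizeof block - bytes;
        return size > *avail ? staging : block + bytes;
    }

    void close(size_t size, size_t avail) {
        if (size >= avail) {
            // real code: flush the block to file, carry size - avail bytes
        } else {
            bytes += size;
        }
        mtx.unlock();
    }
};
```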
*/ + if (srv_encrypt_log) { + if (!log_tmp_block_encrypt( + log->tail.block, srv_sort_buf_size, + log->crypt_tail, byte_offset, + index->table->space_id)) { + log->error = DB_DECRYPTION_FAILED; + goto err_exit; + } + + srv_stats.n_rowlog_blocks_encrypted.inc(); + buf = log->crypt_tail; + } + + log->tail.blocks++; + if (os_file_write( + IORequestWrite, + "(modification log)", + log->fd, + buf, byte_offset, srv_sort_buf_size) + != DB_SUCCESS) { +write_failed: + log->error = DB_ONLINE_LOG_TOO_BIG; + } + + MEM_UNDEFINED(log->tail.block, srv_sort_buf_size); + MEM_UNDEFINED(buf, srv_sort_buf_size); + memcpy(log->tail.block, log->tail.buf + avail, size - avail); + log->tail.bytes = size - avail; + } else { + log->tail.bytes += size; + ut_ad(b == log->tail.block + log->tail.bytes); + } + + log->tail.total += size; + MEM_UNDEFINED(log->tail.buf, sizeof log->tail.buf); +err_exit: + mysql_mutex_unlock(&log->mutex); + + onlineddl_rowlog_rows++; + /* 10000 means 100.00%, 4525 means 45.25% */ + onlineddl_rowlog_pct_used = static_cast<ulint>((log->tail.total * 10000) / srv_online_max_size); +} + +#ifdef UNIV_DEBUG +# define row_log_table_close(index, b, size, avail) \ + row_log_table_close_func(index, b, size, avail) +#else /* UNIV_DEBUG */ +# define row_log_table_close(log, b, size, avail) \ + row_log_table_close_func(index, size, avail) +#endif /* UNIV_DEBUG */ + +/** Check whether a virtual column is indexed in the new table being +created during alter table +@param[in] index cluster index +@param[in] v_no virtual column number +@return true if it is indexed, else false */ +bool +row_log_col_is_indexed( + const dict_index_t* index, + ulint v_no) +{ + return(dict_table_get_nth_v_col( + index->online_log->table, v_no)->m_col.ord_part); +} + +/******************************************************//** +Logs a delete operation to a table that is being rebuilt. +This will be merged in row_log_table_apply_delete(). */ +void +row_log_table_delete( +/*=================*/ + const rec_t* rec, /*!< in: clustered index leaf page record, + page X-latched */ + dict_index_t* index, /*!< in/out: clustered index, S-latched + or X-latched */ + const rec_offs* offsets,/*!< in: rec_get_offsets(rec,index) */ + const byte* sys) /*!< in: DB_TRX_ID,DB_ROLL_PTR that should + be logged, or NULL to use those in rec */ +{ + ulint old_pk_extra_size; + ulint old_pk_size; + ulint mrec_size; + ulint avail_size; + mem_heap_t* heap = NULL; + const dtuple_t* old_pk; + + ut_ad(dict_index_is_clust(index)); + ut_ad(rec_offs_validate(rec, index, offsets)); + ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index)); + ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf); + ut_ad(index->lock.have_any()); + + if (index->online_status != ONLINE_INDEX_CREATION + || (index->type & DICT_CORRUPT) || index->table->corrupted + || index->online_log->error != DB_SUCCESS) { + return; + } + + dict_table_t* new_table = index->online_log->table; + dict_index_t* new_index = dict_table_get_first_index(new_table); + + ut_ad(dict_index_is_clust(new_index)); + ut_ad(!dict_index_is_online_ddl(new_index)); + ut_ad(index->online_log->min_trx); + + /* Create the tuple PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in new_table. */ + if (index->online_log->same_pk) { + dtuple_t* tuple; + ut_ad(new_index->n_uniq == index->n_uniq); + + /* The PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR are in the first + fields of the record. 
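The progress counter updated at the end of `row_log_table_close_func()` above is fixed-point with two implied decimal places, exactly as its comment states (10000 means 100.00%, 4525 means 45.25%). A worked example of the same arithmetic:

```cpp
#include <cstdint>
#include <cstdio>

// Scaling by 10000 keeps two decimal places through integer division.
static unsigned pct_used_x100(uint64_t logged_total, uint64_t max_size)
{
    return static_cast<unsigned>(logged_total * 10000 / max_size);
}

int main()
{
    unsigned p = pct_used_x100(45250, 100000);    // 4525
    std::printf("%u.%02u%%\n", p / 100, p % 100); // prints "45.25%"
    return 0;
}
```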
*/ + heap = mem_heap_create( + DATA_TRX_ID_LEN + + DTUPLE_EST_ALLOC(new_index->first_user_field())); + old_pk = tuple = dtuple_create(heap, + new_index->first_user_field()); + dict_index_copy_types(tuple, new_index, tuple->n_fields); + dtuple_set_n_fields_cmp(tuple, new_index->n_uniq); + + for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) { + ulint len; + const void* field = rec_get_nth_field( + rec, offsets, i, &len); + dfield_t* dfield = dtuple_get_nth_field( + tuple, i); + ut_ad(len != UNIV_SQL_NULL); + ut_ad(!rec_offs_nth_extern(offsets, i)); + dfield_set_data(dfield, field, len); + } + + dfield_t* db_trx_id = dtuple_get_nth_field( + tuple, new_index->n_uniq); + + const bool replace_sys_fields + = sys + || trx_read_trx_id(static_cast<byte*>(db_trx_id->data)) + < index->online_log->min_trx; + + if (replace_sys_fields) { + if (!sys || trx_read_trx_id(sys) + < index->online_log->min_trx) { + sys = reset_trx_id; + } + + dfield_set_data(db_trx_id, sys, DATA_TRX_ID_LEN); + dfield_set_data(db_trx_id + 1, sys + DATA_TRX_ID_LEN, + DATA_ROLL_PTR_LEN); + } + + ut_d(trx_id_check(db_trx_id->data, + index->online_log->min_trx)); + } else { + /* The PRIMARY KEY has changed. Translate the tuple. */ + old_pk = row_log_table_get_pk( + rec, index, offsets, NULL, &heap); + + if (!old_pk) { + ut_ad(index->online_log->error != DB_SUCCESS); + if (heap) { + goto func_exit; + } + return; + } + } + + ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field( + old_pk, old_pk->n_fields - 2)->len); + ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field( + old_pk, old_pk->n_fields - 1)->len); + old_pk_size = rec_get_converted_size_temp<false>( + new_index, old_pk->fields, old_pk->n_fields, + &old_pk_extra_size); + ut_ad(old_pk_extra_size < 0x100); + + /* 2 = 1 (extra_size) + at least 1 byte payload */ + mrec_size = 2 + old_pk_size; + + if (byte* b = row_log_table_open(index->online_log, + mrec_size, &avail_size)) { + *b++ = ROW_T_DELETE; + *b++ = static_cast<byte>(old_pk_extra_size); + + rec_convert_dtuple_to_temp<false>( + b + old_pk_extra_size, new_index, + old_pk->fields, old_pk->n_fields); + + b += old_pk_size; + + row_log_table_close(index, b, mrec_size, avail_size); + } + +func_exit: + mem_heap_free(heap); +} + +/******************************************************//** +Logs an insert or update to a table that is being rebuilt. 
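A `ROW_T_DELETE` entry as assembled above is just the op byte, a single `extra_size` byte (asserted `< 0x100` for the PRIMARY KEY prefix), and the old PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR record. A layout sketch with the payload treated as an opaque, already-converted temp-format byte string:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Opcodes as declared at the top of this file.
enum : uint8_t { ROW_T_INSERT = 0x41, ROW_T_UPDATE, ROW_T_DELETE };

// Hypothetical helper: op byte, one extra_size byte, then the old
// PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR record of the deleted row.
static std::vector<uint8_t> make_delete_entry(
    const std::vector<uint8_t>& old_pk, size_t old_pk_extra_size)
{
    assert(old_pk_extra_size < 0x100);  // asserted above for the PK prefix
    std::vector<uint8_t> entry;
    entry.push_back(ROW_T_DELETE);
    entry.push_back(static_cast<uint8_t>(old_pk_extra_size));
    entry.insert(entry.end(), old_pk.begin(), old_pk.end());
    return entry;
}
```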
*/ +static +void +row_log_table_low_redundant( +/*========================*/ + const rec_t* rec, /*!< in: clustered index leaf + page record in ROW_FORMAT=REDUNDANT, + page X-latched */ + dict_index_t* index, /*!< in/out: clustered index, S-latched + or X-latched */ + bool insert, /*!< in: true if insert, + false if update */ + const dtuple_t* old_pk, /*!< in: old PRIMARY KEY value + (if !insert and a PRIMARY KEY + is being created) */ + const dict_index_t* new_index) + /*!< in: clustered index of the + new table, not latched */ +{ + ulint old_pk_size; + ulint old_pk_extra_size; + ulint size; + ulint extra_size; + ulint mrec_size; + ulint avail_size; + mem_heap_t* heap = NULL; + dtuple_t* tuple; + const ulint n_fields = rec_get_n_fields_old(rec); + + ut_ad(index->n_fields >= n_fields); + ut_ad(index->n_fields == n_fields || index->is_instant()); + ut_ad(dict_tf2_is_valid(index->table->flags, index->table->flags2)); + ut_ad(!dict_table_is_comp(index->table)); /* redundant row format */ + ut_ad(dict_index_is_clust(new_index)); + + heap = mem_heap_create(DTUPLE_EST_ALLOC(n_fields)); + tuple = dtuple_create(heap, n_fields); + dict_index_copy_types(tuple, index, n_fields); + + dtuple_set_n_fields_cmp(tuple, dict_index_get_n_unique(index)); + + if (rec_get_1byte_offs_flag(rec)) { + for (ulint i = 0; i < n_fields; i++) { + dfield_t* dfield; + ulint len; + const void* field; + + dfield = dtuple_get_nth_field(tuple, i); + field = rec_get_nth_field_old(rec, i, &len); + + dfield_set_data(dfield, field, len); + } + } else { + for (ulint i = 0; i < n_fields; i++) { + dfield_t* dfield; + ulint len; + const void* field; + + dfield = dtuple_get_nth_field(tuple, i); + field = rec_get_nth_field_old(rec, i, &len); + + dfield_set_data(dfield, field, len); + + if (rec_2_is_field_extern(rec, i)) { + dfield_set_ext(dfield); + } + } + } + + dfield_t* db_trx_id = dtuple_get_nth_field(tuple, index->n_uniq); + ut_ad(dfield_get_len(db_trx_id) == DATA_TRX_ID_LEN); + ut_ad(dfield_get_len(db_trx_id + 1) == DATA_ROLL_PTR_LEN); + + if (trx_read_trx_id(static_cast<const byte*> + (dfield_get_data(db_trx_id))) + < index->online_log->min_trx) { + dfield_set_data(db_trx_id, reset_trx_id, DATA_TRX_ID_LEN); + dfield_set_data(db_trx_id + 1, reset_trx_id + DATA_TRX_ID_LEN, + DATA_ROLL_PTR_LEN); + } + + const bool is_instant = index->online_log->is_instant(index); + rec_comp_status_t status = is_instant + ? 
REC_STATUS_INSTANT : REC_STATUS_ORDINARY; + + size = rec_get_converted_size_temp<true>( + index, tuple->fields, tuple->n_fields, &extra_size, status); + if (is_instant) { + size++; + extra_size++; + } + + mrec_size = ROW_LOG_HEADER_SIZE + size + (extra_size >= 0x80); + + if (insert || index->online_log->same_pk) { + ut_ad(!old_pk); + old_pk_extra_size = old_pk_size = 0; + } else { + ut_ad(old_pk); + ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp); + ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field( + old_pk, old_pk->n_fields - 2)->len); + ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field( + old_pk, old_pk->n_fields - 1)->len); + + old_pk_size = rec_get_converted_size_temp<false>( + new_index, old_pk->fields, old_pk->n_fields, + &old_pk_extra_size); + ut_ad(old_pk_extra_size < 0x100); + mrec_size += 1/*old_pk_extra_size*/ + old_pk_size; + } + + if (byte* b = row_log_table_open(index->online_log, + mrec_size, &avail_size)) { + if (insert) { + *b++ = ROW_T_INSERT; + } else { + *b++ = ROW_T_UPDATE; + + if (old_pk_size) { + *b++ = static_cast<byte>(old_pk_extra_size); + + rec_convert_dtuple_to_temp<false>( + b + old_pk_extra_size, new_index, + old_pk->fields, old_pk->n_fields); + b += old_pk_size; + } + } + + if (extra_size < 0x80) { + *b++ = static_cast<byte>(extra_size); + } else { + ut_ad(extra_size < 0x8000); + *b++ = static_cast<byte>(0x80 | (extra_size >> 8)); + *b++ = static_cast<byte>(extra_size); + } + + if (status == REC_STATUS_INSTANT) { + ut_ad(is_instant); + if (n_fields <= index->online_log->n_core_fields) { + status = REC_STATUS_ORDINARY; + } + *b = status; + } + + rec_convert_dtuple_to_temp<true>( + b + extra_size, index, tuple->fields, tuple->n_fields, + status); + b += size; + + row_log_table_close(index, b, mrec_size, avail_size); + } + + mem_heap_free(heap); +} + +/******************************************************//** +Logs an insert or update to a table that is being rebuilt. 
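For `ROW_T_UPDATE`, the entry assembled above carries the old PRIMARY KEY record only when the key was redefined (`!same_pk`), followed by the new row image with its one- or two-byte `extra_size`. An assembly-order sketch under the same simplification that payloads are pre-encoded byte strings:

```cpp
#include <cstdint>
#include <vector>

enum : uint8_t { ROW_T_INSERT = 0x41, ROW_T_UPDATE, ROW_T_DELETE };

// Hypothetical helper showing field order only; the instant-format status
// byte handled above is folded into new_row here.
static std::vector<uint8_t> make_row_entry(
    bool insert,
    const std::vector<uint8_t>* old_pk,   // nullptr unless update && !same_pk
    uint8_t old_pk_extra_size,
    size_t extra_size,
    const std::vector<uint8_t>& new_row)
{
    std::vector<uint8_t> e;
    e.push_back(insert ? ROW_T_INSERT : ROW_T_UPDATE);
    if (!insert && old_pk) {
        e.push_back(old_pk_extra_size);
        e.insert(e.end(), old_pk->begin(), old_pk->end());
    }
    if (extra_size < 0x80) {
        e.push_back(static_cast<uint8_t>(extra_size));
    } else {
        e.push_back(static_cast<uint8_t>(0x80 | (extra_size >> 8)));
        e.push_back(static_cast<uint8_t>(extra_size));
    }
    e.insert(e.end(), new_row.begin(), new_row.end());
    return e;
}
```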
*/ +static +void +row_log_table_low( +/*==============*/ + const rec_t* rec, /*!< in: clustered index leaf page record, + page X-latched */ + dict_index_t* index, /*!< in/out: clustered index, S-latched + or X-latched */ + const rec_offs* offsets,/*!< in: rec_get_offsets(rec,index) */ + bool insert, /*!< in: true if insert, false if update */ + const dtuple_t* old_pk) /*!< in: old PRIMARY KEY value (if !insert + and a PRIMARY KEY is being created) */ +{ + ulint old_pk_size; + ulint old_pk_extra_size; + ulint extra_size; + ulint mrec_size; + ulint avail_size; + const dict_index_t* new_index; + row_log_t* log = index->online_log; + + new_index = dict_table_get_first_index(log->table); + + ut_ad(dict_index_is_clust(index)); + ut_ad(dict_index_is_clust(new_index)); + ut_ad(!dict_index_is_online_ddl(new_index)); + ut_ad(rec_offs_validate(rec, index, offsets)); + ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index)); + ut_ad(rec_offs_size(offsets) <= sizeof log->tail.buf); + ut_ad(index->lock.have_any()); + + /* old_pk=row_log_table_get_pk() [not needed in INSERT] is a prefix + of the clustered index record (PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR), + with no information on virtual columns */ + ut_ad(!old_pk || !insert); + ut_ad(!old_pk || old_pk->n_v_fields == 0); + + if (index->online_status != ONLINE_INDEX_CREATION + || (index->type & DICT_CORRUPT) || index->table->corrupted + || log->error != DB_SUCCESS) { + return; + } + + if (!rec_offs_comp(offsets)) { + row_log_table_low_redundant( + rec, index, insert, old_pk, new_index); + return; + } + + ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY + || rec_get_status(rec) == REC_STATUS_INSTANT); + + const ulint omit_size = REC_N_NEW_EXTRA_BYTES; + + const ulint rec_extra_size = rec_offs_extra_size(offsets) - omit_size; + const bool is_instant = log->is_instant(index); + extra_size = rec_extra_size + is_instant; + + unsigned fake_extra_size = 0; + byte fake_extra_buf[3]; + if (is_instant && UNIV_UNLIKELY(!index->is_instant())) { + /* The source table was emptied after ALTER TABLE + started, and it was converted to non-instant format. + Because row_log_table_apply_op() expects to find + all records to be logged in the same way, we will + be unable to copy the rec_extra_size bytes from the + record header, but must convert them here. 
*/ + unsigned n_add = index->n_fields - 1 - log->n_core_fields; + fake_extra_size = rec_get_n_add_field_len(n_add); + ut_ad(fake_extra_size == 1 || fake_extra_size == 2); + extra_size += fake_extra_size; + byte* fake_extra = fake_extra_buf + fake_extra_size; + rec_set_n_add_field(fake_extra, n_add); + ut_ad(fake_extra == fake_extra_buf); + } + + mrec_size = ROW_LOG_HEADER_SIZE + + (extra_size >= 0x80) + rec_offs_size(offsets) - omit_size + + is_instant + fake_extra_size; + + if (insert || log->same_pk) { + ut_ad(!old_pk); + old_pk_extra_size = old_pk_size = 0; + } else { + ut_ad(old_pk); + ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp); + ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field( + old_pk, old_pk->n_fields - 2)->len); + ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field( + old_pk, old_pk->n_fields - 1)->len); + + old_pk_size = rec_get_converted_size_temp<false>( + new_index, old_pk->fields, old_pk->n_fields, + &old_pk_extra_size); + ut_ad(old_pk_extra_size < 0x100); + mrec_size += 1/*old_pk_extra_size*/ + old_pk_size; + } + + if (byte* b = row_log_table_open(log, mrec_size, &avail_size)) { + if (insert) { + *b++ = ROW_T_INSERT; + } else { + *b++ = ROW_T_UPDATE; + + if (old_pk_size) { + *b++ = static_cast<byte>(old_pk_extra_size); + + rec_convert_dtuple_to_temp<false>( + b + old_pk_extra_size, new_index, + old_pk->fields, old_pk->n_fields); + b += old_pk_size; + } + } + + if (extra_size < 0x80) { + *b++ = static_cast<byte>(extra_size); + } else { + ut_ad(extra_size < 0x8000); + *b++ = static_cast<byte>(0x80 | (extra_size >> 8)); + *b++ = static_cast<byte>(extra_size); + } + + if (is_instant) { + *b++ = fake_extra_size + ? REC_STATUS_INSTANT + : rec_get_status(rec); + } else { + ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY); + } + + memcpy(b, rec - rec_extra_size - omit_size, rec_extra_size); + b += rec_extra_size; + memcpy(b, fake_extra_buf + 1, fake_extra_size); + b += fake_extra_size; + ulint len; + ulint trx_id_offs = rec_get_nth_field_offs( + offsets, index->n_uniq, &len); + ut_ad(len == DATA_TRX_ID_LEN); + memcpy(b, rec, rec_offs_data_size(offsets)); + if (trx_read_trx_id(b + trx_id_offs) < log->min_trx) { + memcpy(b + trx_id_offs, + reset_trx_id, sizeof reset_trx_id); + } + b += rec_offs_data_size(offsets); + + row_log_table_close(index, b, mrec_size, avail_size); + } +} + +/******************************************************//** +Logs an update to a table that is being rebuilt. +This will be merged in row_log_table_apply_update(). */ +void +row_log_table_update( +/*=================*/ + const rec_t* rec, /*!< in: clustered index leaf page record, + page X-latched */ + dict_index_t* index, /*!< in/out: clustered index, S-latched + or X-latched */ + const rec_offs* offsets,/*!< in: rec_get_offsets(rec,index) */ + const dtuple_t* old_pk) /*!< in: row_log_table_get_pk() + before the update */ +{ + row_log_table_low(rec, index, offsets, false, old_pk); +} + +/** Gets the old table column of a PRIMARY KEY column. 
+@param table old table (before ALTER TABLE) +@param col_map mapping of old column numbers to new ones +@param col_no column position in the new table +@return old table column, or NULL if this is an added column */ +static +const dict_col_t* +row_log_table_get_pk_old_col( +/*=========================*/ + const dict_table_t* table, + const ulint* col_map, + ulint col_no) +{ + for (ulint i = 0; i < table->n_cols; i++) { + if (col_no == col_map[i]) { + return(dict_table_get_nth_col(table, i)); + } + } + + return(NULL); +} + +/** Maps an old table column of a PRIMARY KEY column. +@param[in] ifield clustered index field in the new table (after +ALTER TABLE) +@param[in] index the clustered index of ifield +@param[in,out] dfield clustered index tuple field in the new table +@param[in,out] heap memory heap for allocating dfield contents +@param[in] rec clustered index leaf page record in the old +table +@param[in] offsets rec_get_offsets(rec) +@param[in] i rec field corresponding to col +@param[in] zip_size ROW_FORMAT=COMPRESSED size of the old table +@param[in] max_len maximum length of dfield +@param[in] log row log for the table +@retval DB_INVALID_NULL if a NULL value is encountered +@retval DB_TOO_BIG_INDEX_COL if the maximum prefix length is exceeded */ +static +dberr_t +row_log_table_get_pk_col( + const dict_field_t* ifield, + const dict_index_t* index, + dfield_t* dfield, + mem_heap_t* heap, + const rec_t* rec, + const rec_offs* offsets, + ulint i, + ulint zip_size, + ulint max_len, + const row_log_t* log) +{ + const byte* field; + ulint len; + + field = rec_get_nth_field(rec, offsets, i, &len); + + if (len == UNIV_SQL_DEFAULT) { + field = log->instant_field_value(i, &len); + } + + if (len == UNIV_SQL_NULL) { + if (!log->allow_not_null) { + return(DB_INVALID_NULL); + } + + unsigned col_no= ifield->col->ind; + ut_ad(col_no < log->defaults->n_fields); + + field = static_cast<const byte*>( + log->defaults->fields[col_no].data); + if (!field) { + return(DB_INVALID_NULL); + } + len = log->defaults->fields[col_no].len; + } + + if (rec_offs_nth_extern(offsets, i)) { + ulint field_len = ifield->prefix_len; + byte* blob_field; + + if (!field_len) { + field_len = ifield->fixed_len; + if (!field_len) { + field_len = max_len + 1; + } + } + + blob_field = static_cast<byte*>( + mem_heap_alloc(heap, field_len)); + + len = btr_copy_externally_stored_field_prefix( + blob_field, field_len, zip_size, field, len); + if (len >= max_len + 1) { + return(DB_TOO_BIG_INDEX_COL); + } + + dfield_set_data(dfield, blob_field, len); + } else { + dfield_set_data(dfield, mem_heap_dup(heap, field, len), len); + } + + return(DB_SUCCESS); +} + +/******************************************************//** +Constructs the old PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR +of a table that is being rebuilt. 
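`row_log_table_get_pk_old_col()` above inverts `col_map` by linear search: `col_map[i]` gives the new position of old column `i`, so the old column behind a new PRIMARY KEY column is found by scanning for the matching target. The lookup in isolation, as a standalone sketch:

```cpp
#include <cstddef>

// Returning n_old_cols plays the role of NULL above: the column was
// added by the ALTER and has no old counterpart, so the caller falls
// back to the added column's default value.
static size_t old_col_of(const size_t* col_map, size_t n_old_cols,
                         size_t new_col_no)
{
    for (size_t i = 0; i < n_old_cols; i++) {
        if (col_map[i] == new_col_no) {
            return i;
        }
    }
    return n_old_cols;
}
```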
+@return tuple of PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in the rebuilt table, +or NULL if the PRIMARY KEY definition does not change */ +const dtuple_t* +row_log_table_get_pk( +/*=================*/ + const rec_t* rec, /*!< in: clustered index leaf page record, + page X-latched */ + dict_index_t* index, /*!< in/out: clustered index, S-latched + or X-latched */ + const rec_offs* offsets,/*!< in: rec_get_offsets(rec,index) */ + byte* sys, /*!< out: DB_TRX_ID,DB_ROLL_PTR for + row_log_table_delete(), or NULL */ + mem_heap_t** heap) /*!< in/out: memory heap where allocated */ +{ + dtuple_t* tuple = NULL; + row_log_t* log = index->online_log; + + ut_ad(dict_index_is_clust(index)); + ut_ad(dict_index_is_online_ddl(index)); + ut_ad(!offsets || rec_offs_validate(rec, index, offsets)); + ut_ad(index->lock.have_any()); + ut_ad(log); + ut_ad(log->table); + ut_ad(log->min_trx); + + if (log->same_pk) { + /* The PRIMARY KEY columns are unchanged. */ + if (sys) { + /* Store the DB_TRX_ID,DB_ROLL_PTR. */ + ulint trx_id_offs = index->trx_id_offset; + + if (!trx_id_offs) { + ulint len; + + if (!offsets) { + offsets = rec_get_offsets( + rec, index, nullptr, + index->n_core_fields, + index->db_trx_id() + 1, heap); + } + + trx_id_offs = rec_get_nth_field_offs( + offsets, index->db_trx_id(), &len); + ut_ad(len == DATA_TRX_ID_LEN); + } + + const byte* ptr = trx_read_trx_id(rec + trx_id_offs) + < log->min_trx + ? reset_trx_id + : rec + trx_id_offs; + + memcpy(sys, ptr, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); + ut_d(trx_id_check(sys, log->min_trx)); + } + + return(NULL); + } + + mysql_mutex_lock(&log->mutex); + + /* log->error is protected by log->mutex. */ + if (log->error == DB_SUCCESS) { + dict_table_t* new_table = log->table; + dict_index_t* new_index + = dict_table_get_first_index(new_table); + const ulint new_n_uniq + = dict_index_get_n_unique(new_index); + + if (!*heap) { + ulint size = 0; + + if (!offsets) { + size += (1 + REC_OFFS_HEADER_SIZE + + unsigned(index->n_fields)) + * sizeof *offsets; + } + + for (ulint i = 0; i < new_n_uniq; i++) { + size += dict_col_get_min_size( + dict_index_get_nth_col(new_index, i)); + } + + *heap = mem_heap_create( + DTUPLE_EST_ALLOC(new_n_uniq + 2) + size); + } + + if (!offsets) { + offsets = rec_get_offsets(rec, index, nullptr, + index->n_core_fields, + ULINT_UNDEFINED, heap); + } + + tuple = dtuple_create(*heap, new_n_uniq + 2); + dict_index_copy_types(tuple, new_index, tuple->n_fields); + dtuple_set_n_fields_cmp(tuple, new_n_uniq); + + const ulint max_len = DICT_MAX_FIELD_LEN_BY_FORMAT(new_table); + + const ulint zip_size = index->table->space->zip_size(); + + for (ulint new_i = 0; new_i < new_n_uniq; new_i++) { + dict_field_t* ifield; + dfield_t* dfield; + ulint prtype; + ulint mbminlen, mbmaxlen; + + ifield = dict_index_get_nth_field(new_index, new_i); + dfield = dtuple_get_nth_field(tuple, new_i); + + const ulint col_no + = dict_field_get_col(ifield)->ind; + + if (const dict_col_t* col + = row_log_table_get_pk_old_col( + index->table, log->col_map, col_no)) { + ulint i = dict_col_get_clust_pos(col, index); + + if (i == ULINT_UNDEFINED) { + ut_ad(0); + log->error = DB_CORRUPTION; + goto err_exit; + } + + log->error = row_log_table_get_pk_col( + ifield, new_index, dfield, *heap, + rec, offsets, i, zip_size, max_len, + log); + + if (log->error != DB_SUCCESS) { +err_exit: + tuple = NULL; + goto func_exit; + } + + mbminlen = col->mbminlen; + mbmaxlen = col->mbmaxlen; + prtype = col->prtype; + } else { + /* No matching column was found in the old + table, so this must be an 
added column. + Copy the default value. */ + ut_ad(log->defaults); + + dfield_copy(dfield, dtuple_get_nth_field( + log->defaults, col_no)); + mbminlen = dfield->type.mbminlen; + mbmaxlen = dfield->type.mbmaxlen; + prtype = dfield->type.prtype; + } + + ut_ad(!dfield_is_ext(dfield)); + ut_ad(!dfield_is_null(dfield)); + + if (ifield->prefix_len) { + ulint len = dtype_get_at_most_n_mbchars( + prtype, mbminlen, mbmaxlen, + ifield->prefix_len, + dfield_get_len(dfield), + static_cast<const char*>( + dfield_get_data(dfield))); + + ut_ad(len <= dfield_get_len(dfield)); + dfield_set_len(dfield, len); + } + } + + const byte* trx_roll = rec + + row_get_trx_id_offset(index, offsets); + + /* Copy the fields, because the fields will be updated + or the record may be moved somewhere else in the B-tree + as part of the upcoming operation. */ + if (trx_read_trx_id(trx_roll) < log->min_trx) { + trx_roll = reset_trx_id; + if (sys) { + memcpy(sys, trx_roll, + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); + } + } else if (sys) { + memcpy(sys, trx_roll, + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); + trx_roll = sys; + } else { + trx_roll = static_cast<const byte*>( + mem_heap_dup( + *heap, trx_roll, + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)); + } + + ut_d(trx_id_check(trx_roll, log->min_trx)); + + dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq), + trx_roll, DATA_TRX_ID_LEN); + dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq + 1), + trx_roll + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN); + } + +func_exit: + mysql_mutex_unlock(&log->mutex); + return(tuple); +} + +/******************************************************//** +Logs an insert to a table that is being rebuilt. +This will be merged in row_log_table_apply_insert(). */ +void +row_log_table_insert( +/*=================*/ + const rec_t* rec, /*!< in: clustered index leaf page record, + page X-latched */ + dict_index_t* index, /*!< in/out: clustered index, S-latched + or X-latched */ + const rec_offs* offsets)/*!< in: rec_get_offsets(rec,index) */ +{ + row_log_table_low(rec, index, offsets, true, NULL); +} + +/******************************************************//** +Converts a log record to a table row. +@return converted row, or NULL if the conversion fails */ +static MY_ATTRIBUTE((nonnull, warn_unused_result)) +const dtuple_t* +row_log_table_apply_convert_mrec( +/*=============================*/ + const mrec_t* mrec, /*!< in: merge record */ + dict_index_t* index, /*!< in: index of mrec */ + const rec_offs* offsets, /*!< in: offsets of mrec */ + row_log_t* log, /*!< in: rebuild context */ + mem_heap_t* heap, /*!< in/out: memory heap */ + dberr_t* error) /*!< out: DB_SUCCESS or + DB_MISSING_HISTORY or + reason of failure */ +{ + dtuple_t* row; + + log->n_rows++; + *error = DB_SUCCESS; + + /* This is based on row_build(). */ + if (log->defaults) { + row = dtuple_copy(log->defaults, heap); + /* dict_table_copy_types() would set the fields to NULL */ + for (ulint i = 0; i < dict_table_get_n_cols(log->table); i++) { + dict_col_copy_type( + dict_table_get_nth_col(log->table, i), + dfield_get_type(dtuple_get_nth_field(row, i))); + } + } else { + row = dtuple_create(heap, dict_table_get_n_cols(log->table)); + dict_table_copy_types(row, log->table); + } + + for (ulint i = 0; i < rec_offs_n_fields(offsets); i++) { + const dict_field_t* ind_field + = dict_index_get_nth_field(index, i); + + if (ind_field->prefix_len) { + /* Column prefixes can only occur in key + fields, which cannot be stored externally. 
For + a column prefix, there should also be the full + field in the clustered index tuple. The row + tuple comprises full fields, not prefixes. */ + ut_ad(!rec_offs_nth_extern(offsets, i)); + continue; + } + + const dict_col_t* col + = dict_field_get_col(ind_field); + + if (col->is_dropped()) { + /* the column was instantly dropped earlier */ + ut_ad(index->table->instant); + continue; + } + + ulint col_no + = log->col_map[dict_col_get_no(col)]; + + if (col_no == ULINT_UNDEFINED) { + /* the column is being dropped now */ + continue; + } + + dfield_t* dfield + = dtuple_get_nth_field(row, col_no); + + ulint len; + const byte* data; + + if (rec_offs_nth_extern(offsets, i)) { + ut_ad(rec_offs_any_extern(offsets)); + index->lock.x_lock(SRW_LOCK_CALL); + + data = btr_rec_copy_externally_stored_field( + mrec, offsets, + index->table->space->zip_size(), + i, &len, heap); + ut_a(data); + dfield_set_data(dfield, data, len); + + index->lock.x_unlock(); + } else { + data = rec_get_nth_field(mrec, offsets, i, &len); + if (len == UNIV_SQL_DEFAULT) { + data = log->instant_field_value(i, &len); + } + dfield_set_data(dfield, data, len); + } + + if (len != UNIV_SQL_NULL && col->mtype == DATA_MYSQL + && col->len != len && !dict_table_is_comp(log->table)) { + + ut_ad(col->len >= len); + if (dict_table_is_comp(index->table)) { + byte* buf = (byte*) mem_heap_alloc(heap, + col->len); + memcpy(buf, dfield->data, len); + memset(buf + len, 0x20, col->len - len); + + dfield_set_data(dfield, buf, col->len); + } else { + /* field length mismatch should not happen + when rebuilding the redundant row format + table. */ + ut_ad(0); + *error = DB_CORRUPTION; + return(NULL); + } + } + + /* See if any columns were changed to NULL or NOT NULL. */ + const dict_col_t* new_col + = dict_table_get_nth_col(log->table, col_no); + ut_ad(new_col->same_format(*col)); + + /* Assert that prtype matches except for nullability. */ + ut_ad(!((new_col->prtype ^ dfield_get_type(dfield)->prtype) + & ~(DATA_NOT_NULL | DATA_VERSIONED + | CHAR_COLL_MASK << 16 | DATA_LONG_TRUE_VARCHAR))); + + if (new_col->prtype == col->prtype) { + continue; + } + + if ((new_col->prtype & DATA_NOT_NULL) + && dfield_is_null(dfield)) { + + if (!log->allow_not_null) { + /* We got a NULL value for a NOT NULL column. */ + *error = DB_INVALID_NULL; + return NULL; + } + + const dfield_t& default_field + = log->defaults->fields[col_no]; + + Field* field = log->old_table->field[col->ind]; + + field->set_warning(Sql_condition::WARN_LEVEL_WARN, + WARN_DATA_TRUNCATED, 1, + ulong(log->n_rows)); + + *dfield = default_field; + } + + /* Adjust the DATA_NOT_NULL flag in the parsed row. */ + dfield_get_type(dfield)->prtype = new_col->prtype; + + ut_ad(dict_col_type_assert_equal(new_col, + dfield_get_type(dfield))); + } + + return(row); +} + +/******************************************************//** +Replays an insert operation on a table that was rebuilt. 
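The `DATA_MYSQL` branch above re-pads fixed-length CHAR values with 0x20 when a row from a `ROW_FORMAT=REDUNDANT` table, which strips trailing spaces, is converted for a compact-format rebuild. The padding step in isolation, as a hypothetical helper:

```cpp
#include <cstdint>
#include <cstring>

// dst must have room for col_len bytes, and data_len <= col_len, as
// asserted in the branch above.
static void pad_char_column(uint8_t* dst, size_t col_len,
                            const uint8_t* data, size_t data_len)
{
    std::memcpy(dst, data, data_len);
    std::memset(dst + data_len, 0x20, col_len - data_len);  // 0x20 = space
}
```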
+@return DB_SUCCESS or error code */ +static MY_ATTRIBUTE((nonnull, warn_unused_result)) +dberr_t +row_log_table_apply_insert_low( +/*===========================*/ + que_thr_t* thr, /*!< in: query graph */ + const dtuple_t* row, /*!< in: table row + in the old table definition */ + mem_heap_t* offsets_heap, /*!< in/out: memory heap + that can be emptied */ + mem_heap_t* heap, /*!< in/out: memory heap */ + row_merge_dup_t* dup) /*!< in/out: for reporting + duplicate key errors */ +{ + dberr_t error; + dtuple_t* entry; + const row_log_t*log = dup->index->online_log; + dict_index_t* index = dict_table_get_first_index(log->table); + ulint n_index = 0; + + ut_ad(dtuple_validate(row)); + + DBUG_LOG("ib_alter_table", + "insert table " << index->table->id << " (index " + << index->id << "): " << rec_printer(row).str()); + + static const ulint flags + = (BTR_CREATE_FLAG + | BTR_NO_LOCKING_FLAG + | BTR_NO_UNDO_LOG_FLAG + | BTR_KEEP_SYS_FLAG); + + entry = row_build_index_entry(row, NULL, index, heap); + + error = row_ins_clust_index_entry_low( + flags, BTR_MODIFY_TREE, index, index->n_uniq, + entry, 0, thr); + + switch (error) { + case DB_SUCCESS: + break; + case DB_SUCCESS_LOCKED_REC: + /* The row had already been copied to the table. */ + return(DB_SUCCESS); + default: + return(error); + } + + ut_ad(dict_index_is_clust(index)); + + for (n_index += index->type != DICT_CLUSTERED; + (index = dict_table_get_next_index(index)); n_index++) { + if (index->type & DICT_FTS) { + continue; + } + + entry = row_build_index_entry(row, NULL, index, heap); + error = row_ins_sec_index_entry_low( + flags, BTR_INSERT_TREE, + index, offsets_heap, heap, entry, + thr_get_trx(thr)->id, thr); + + if (error != DB_SUCCESS) { + if (error == DB_DUPLICATE_KEY) { + thr_get_trx(thr)->error_key_num = n_index; + } + break; + } + } + + return(error); +} + +/******************************************************//** +Replays an insert operation on a table that was rebuilt. +@return DB_SUCCESS or error code */ +static MY_ATTRIBUTE((nonnull, warn_unused_result)) +dberr_t +row_log_table_apply_insert( +/*=======================*/ + que_thr_t* thr, /*!< in: query graph */ + const mrec_t* mrec, /*!< in: record to insert */ + const rec_offs* offsets, /*!< in: offsets of mrec */ + mem_heap_t* offsets_heap, /*!< in/out: memory heap + that can be emptied */ + mem_heap_t* heap, /*!< in/out: memory heap */ + row_merge_dup_t* dup) /*!< in/out: for reporting + duplicate key errors */ +{ + row_log_t*log = dup->index->online_log; + dberr_t error; + const dtuple_t* row = row_log_table_apply_convert_mrec( + mrec, dup->index, offsets, log, heap, &error); + + switch (error) { + case DB_SUCCESS: + ut_ad(row != NULL); + break; + default: + ut_ad(0); + /* fall through */ + case DB_INVALID_NULL: + ut_ad(row == NULL); + return(error); + } + + error = row_log_table_apply_insert_low( + thr, row, offsets_heap, heap, dup); + if (error != DB_SUCCESS) { + /* Report the erroneous row using the new + version of the table. */ + innobase_row_to_mysql(dup->table, log->table, row); + } + return(error); +} + +/******************************************************//** +Deletes a record from a table that is being rebuilt. 
+@return DB_SUCCESS or error code */ +static MY_ATTRIBUTE((nonnull, warn_unused_result)) +dberr_t +row_log_table_apply_delete_low( +/*===========================*/ + btr_pcur_t* pcur, /*!< in/out: B-tree cursor, + will be trashed */ + const rec_offs* offsets, /*!< in: offsets on pcur */ + mem_heap_t* heap, /*!< in/out: memory heap */ + mtr_t* mtr) /*!< in/out: mini-transaction, + will be committed */ +{ + dberr_t error; + row_ext_t* ext; + dtuple_t* row; + dict_index_t* index = pcur->index(); + + ut_ad(dict_index_is_clust(index)); + + DBUG_LOG("ib_alter_table", + "delete table " << index->table->id << " (index " + << index->id << "): " + << rec_printer(btr_pcur_get_rec(pcur), offsets).str()); + + if (dict_table_get_next_index(index)) { + /* Build a row template for purging secondary index entries. */ + row = row_build( + ROW_COPY_DATA, index, btr_pcur_get_rec(pcur), + offsets, NULL, NULL, NULL, &ext, heap); + } else { + row = NULL; + } + + btr_cur_pessimistic_delete(&error, FALSE, btr_pcur_get_btr_cur(pcur), + BTR_CREATE_FLAG, false, mtr); + if (error != DB_SUCCESS) { +err_exit: + mtr->commit(); + return error; + } + + mtr->commit(); + + while ((index = dict_table_get_next_index(index)) != NULL) { + if (index->type & DICT_FTS) { + continue; + } + + const dtuple_t* entry = row_build_index_entry( + row, ext, index, heap); + mtr->start(); + index->set_modified(*mtr); + pcur->btr_cur.page_cur.index = index; + error = btr_pcur_open(entry, PAGE_CUR_LE, BTR_PURGE_TREE, pcur, + mtr); + if (error) { + goto err_exit; + } +#ifdef UNIV_DEBUG + switch (btr_pcur_get_btr_cur(pcur)->flag) { + case BTR_CUR_DELETE_REF: + case BTR_CUR_DEL_MARK_IBUF: + case BTR_CUR_DELETE_IBUF: + case BTR_CUR_INSERT_TO_IBUF: + /* We did not request buffering. */ + break; + case BTR_CUR_HASH: + case BTR_CUR_HASH_FAIL: + case BTR_CUR_BINARY: + goto flag_ok; + } + ut_ad(0); +flag_ok: +#endif /* UNIV_DEBUG */ + + if (page_rec_is_infimum(btr_pcur_get_rec(pcur)) + || btr_pcur_get_low_match(pcur) < index->n_uniq) { + /* All secondary index entries should be + found, because new_table is being modified by + this thread only, and all indexes should be + updated in sync. */ + mtr->commit(); + return(DB_INDEX_CORRUPT); + } + + btr_cur_pessimistic_delete(&error, FALSE, + btr_pcur_get_btr_cur(pcur), + BTR_CREATE_FLAG, false, mtr); + mtr->commit(); + } + + return(error); +} + +/******************************************************//** +Replays a delete operation on a table that was rebuilt. +@return DB_SUCCESS or error code */ +static MY_ATTRIBUTE((nonnull, warn_unused_result)) +dberr_t +row_log_table_apply_delete( +/*=======================*/ + ulint trx_id_col, /*!< in: position of + DB_TRX_ID in the new + clustered index */ + const mrec_t* mrec, /*!< in: merge record */ + const rec_offs* moffsets, /*!< in: offsets of mrec */ + mem_heap_t* offsets_heap, /*!< in/out: memory heap + that can be emptied */ + mem_heap_t* heap, /*!< in/out: memory heap */ + const row_log_t* log) /*!< in: online log */ +{ + dict_table_t* new_table = log->table; + dict_index_t* index = dict_table_get_first_index(new_table); + dtuple_t* old_pk; + mtr_t mtr; + btr_pcur_t pcur; + rec_offs* offsets; + + pcur.btr_cur.page_cur.index = index; + ut_ad(rec_offs_n_fields(moffsets) == index->first_user_field()); + ut_ad(!rec_offs_any_extern(moffsets)); + + /* Convert the row to a search tuple. 
*/ + old_pk = dtuple_create(heap, index->n_uniq); + dict_index_copy_types(old_pk, index, index->n_uniq); + + for (ulint i = 0; i < index->n_uniq; i++) { + ulint len; + const void* field; + field = rec_get_nth_field(mrec, moffsets, i, &len); + ut_ad(len != UNIV_SQL_NULL); + dfield_set_data(dtuple_get_nth_field(old_pk, i), + field, len); + } + + mtr_start(&mtr); + index->set_modified(mtr); + dberr_t err = btr_pcur_open(old_pk, PAGE_CUR_LE, BTR_PURGE_TREE, &pcur, + &mtr); + if (err != DB_SUCCESS) { + goto all_done; + } +#ifdef UNIV_DEBUG + switch (btr_pcur_get_btr_cur(&pcur)->flag) { + case BTR_CUR_DELETE_REF: + case BTR_CUR_DEL_MARK_IBUF: + case BTR_CUR_DELETE_IBUF: + case BTR_CUR_INSERT_TO_IBUF: + /* We did not request buffering. */ + break; + case BTR_CUR_HASH: + case BTR_CUR_HASH_FAIL: + case BTR_CUR_BINARY: + goto flag_ok; + } + ut_ad(0); +flag_ok: +#endif /* UNIV_DEBUG */ + + if (page_rec_is_infimum(btr_pcur_get_rec(&pcur)) + || btr_pcur_get_low_match(&pcur) < index->n_uniq) { +all_done: + mtr_commit(&mtr); + /* The record was not found. All done. */ + /* This should only happen when an earlier + ROW_T_INSERT was skipped or + ROW_T_UPDATE was interpreted as ROW_T_DELETE + due to BLOBs having been freed by rollback. */ + return err; + } + + offsets = rec_get_offsets(btr_pcur_get_rec(&pcur), index, nullptr, + index->n_core_fields, + ULINT_UNDEFINED, &offsets_heap); +#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG + ut_a(!rec_offs_any_null_extern(btr_pcur_get_rec(&pcur), offsets)); +#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */ + + /* Only remove the record if DB_TRX_ID,DB_ROLL_PTR match. */ + + { + ulint len; + const byte* mrec_trx_id + = rec_get_nth_field(mrec, moffsets, trx_id_col, &len); + ut_ad(len == DATA_TRX_ID_LEN); + const byte* rec_trx_id + = rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets, + trx_id_col, &len); + ut_ad(len == DATA_TRX_ID_LEN); + ut_d(trx_id_check(rec_trx_id, log->min_trx)); + ut_d(trx_id_check(mrec_trx_id, log->min_trx)); + + ut_ad(rec_get_nth_field(mrec, moffsets, trx_id_col + 1, &len) + == mrec_trx_id + DATA_TRX_ID_LEN); + ut_ad(len == DATA_ROLL_PTR_LEN); + ut_ad(rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets, + trx_id_col + 1, &len) + == rec_trx_id + DATA_TRX_ID_LEN); + ut_ad(len == DATA_ROLL_PTR_LEN); + + if (memcmp(mrec_trx_id, rec_trx_id, + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) { + /* The ROW_T_DELETE was logged for a different + PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR. + This is possible if a ROW_T_INSERT was skipped + or a ROW_T_UPDATE was interpreted as ROW_T_DELETE + because some BLOBs were missing due to + (1) rolling back the initial insert, or + (2) purging the BLOB for a later ROW_T_DELETE + (3) purging 'old values' for a later ROW_T_UPDATE + or ROW_T_DELETE. */ + ut_ad(!log->same_pk); + goto all_done; + } + } + + return row_log_table_apply_delete_low(&pcur, offsets, heap, &mtr); +} + +/******************************************************//** +Replays an update operation on a table that was rebuilt. 
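The comparison above only applies a `ROW_T_DELETE` when the stored DB_TRX_ID,DB_ROLL_PTR (6 + 7 bytes) match the logged values byte for byte; a mismatch means the entry refers to a row version that is no longer present, so it is skipped. Expressed as a standalone predicate:

```cpp
#include <cstdint>
#include <cstring>

// System column widths per DATA_TRX_ID_LEN and DATA_ROLL_PTR_LEN.
static bool same_row_version(const uint8_t* logged_sys,
                             const uint8_t* stored_sys)
{
    const size_t trx_id_len = 6, roll_ptr_len = 7;
    return std::memcmp(logged_sys, stored_sys,
                       trx_id_len + roll_ptr_len) == 0;
}
```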
+@return DB_SUCCESS or error code */ +static MY_ATTRIBUTE((nonnull, warn_unused_result)) +dberr_t +row_log_table_apply_update( +/*=======================*/ + que_thr_t* thr, /*!< in: query graph */ + ulint new_trx_id_col, /*!< in: position of + DB_TRX_ID in the new + clustered index */ + const mrec_t* mrec, /*!< in: new value */ + const rec_offs* offsets, /*!< in: offsets of mrec */ + mem_heap_t* offsets_heap, /*!< in/out: memory heap + that can be emptied */ + mem_heap_t* heap, /*!< in/out: memory heap */ + row_merge_dup_t* dup, /*!< in/out: for reporting + duplicate key errors */ + const dtuple_t* old_pk) /*!< in: PRIMARY KEY and + DB_TRX_ID,DB_ROLL_PTR + of the old value, + or PRIMARY KEY if same_pk */ +{ + row_log_t* log = dup->index->online_log; + const dtuple_t* row; + dict_index_t* index = dict_table_get_first_index(log->table); + mtr_t mtr; + btr_pcur_t pcur; + dberr_t error; + ulint n_index = 0; + + pcur.btr_cur.page_cur.index = index; + + ut_ad(dtuple_get_n_fields_cmp(old_pk) + == dict_index_get_n_unique(index)); + ut_ad(dtuple_get_n_fields(old_pk) - (log->same_pk ? 0 : 2) + == dict_index_get_n_unique(index)); + + row = row_log_table_apply_convert_mrec( + mrec, dup->index, offsets, log, heap, &error); + + switch (error) { + case DB_SUCCESS: + ut_ad(row != NULL); + break; + default: + ut_ad(0); + /* fall through */ + case DB_INVALID_NULL: + ut_ad(row == NULL); + return(error); + } + + mtr.start(); + index->set_modified(mtr); + error = btr_pcur_open(old_pk, PAGE_CUR_LE, BTR_MODIFY_TREE, &pcur, + &mtr); + if (error != DB_SUCCESS) { +func_exit: + mtr.commit(); +func_exit_committed: + ut_ad(mtr.has_committed()); + ut_free(pcur.old_rec_buf); + + if (error != DB_SUCCESS) { + /* Report the erroneous row using the new + version of the table. */ + innobase_row_to_mysql(dup->table, log->table, row); + } + + return error; + } +#ifdef UNIV_DEBUG + switch (btr_pcur_get_btr_cur(&pcur)->flag) { + case BTR_CUR_DELETE_REF: + case BTR_CUR_DEL_MARK_IBUF: + case BTR_CUR_DELETE_IBUF: + case BTR_CUR_INSERT_TO_IBUF: + ut_ad(0);/* We did not request buffering. */ + case BTR_CUR_HASH: + case BTR_CUR_HASH_FAIL: + case BTR_CUR_BINARY: + break; + } +#endif /* UNIV_DEBUG */ + + ut_ad(!page_rec_is_infimum(btr_pcur_get_rec(&pcur)) + && btr_pcur_get_low_match(&pcur) >= index->n_uniq); + + /* Prepare to update (or delete) the record. 
*/ + rec_offs* cur_offsets = rec_get_offsets( + btr_pcur_get_rec(&pcur), index, nullptr, index->n_core_fields, + ULINT_UNDEFINED, &offsets_heap); + +#ifdef UNIV_DEBUG + if (!log->same_pk) { + ulint len; + const byte* rec_trx_id + = rec_get_nth_field(btr_pcur_get_rec(&pcur), + cur_offsets, index->n_uniq, &len); + const dfield_t* old_pk_trx_id + = dtuple_get_nth_field(old_pk, index->n_uniq); + ut_ad(len == DATA_TRX_ID_LEN); + ut_d(trx_id_check(rec_trx_id, log->min_trx)); + ut_ad(old_pk_trx_id->len == DATA_TRX_ID_LEN); + ut_ad(old_pk_trx_id[1].len == DATA_ROLL_PTR_LEN); + ut_ad(DATA_TRX_ID_LEN + + static_cast<const char*>(old_pk_trx_id->data) + == old_pk_trx_id[1].data); + ut_d(trx_id_check(old_pk_trx_id->data, log->min_trx)); + ut_ad(!memcmp(rec_trx_id, old_pk_trx_id->data, + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)); + } +#endif + + dtuple_t* entry = row_build_index_entry_low( + row, NULL, index, heap, ROW_BUILD_NORMAL); + upd_t* update = row_upd_build_difference_binary( + index, entry, btr_pcur_get_rec(&pcur), cur_offsets, + false, false, NULL, heap, dup->table, &error); + if (error != DB_SUCCESS || !update->n_fields) { + goto func_exit; + } + + const bool pk_updated + = upd_get_nth_field(update, 0)->field_no < new_trx_id_col; + + if (pk_updated || rec_offs_any_extern(cur_offsets)) { + /* If the record contains any externally stored + columns, perform the update by delete and insert, + because we will not write any undo log that would + allow purge to free any orphaned externally stored + columns. */ + + if (pk_updated && log->same_pk) { + /* The ROW_T_UPDATE log record should only be + written when the PRIMARY KEY fields of the + record did not change in the old table. We + can only get a change of PRIMARY KEY columns + in the rebuilt table if the PRIMARY KEY was + redefined (!same_pk). */ + ut_ad(0); + error = DB_CORRUPTION; + goto func_exit; + } + + error = row_log_table_apply_delete_low( + &pcur, cur_offsets, heap, &mtr); + ut_ad(mtr.has_committed()); + + if (error == DB_SUCCESS) { + error = row_log_table_apply_insert_low( + thr, row, offsets_heap, heap, dup); + } + + goto func_exit_committed; + } + + dtuple_t* old_row; + row_ext_t* old_ext; + + if (dict_table_get_next_index(index)) { + /* Construct the row corresponding to the old value of + the record. 
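The branch above replays an update in place only when it neither touches the PRIMARY KEY nor involves externally stored columns; otherwise delete-then-insert is used, because replay writes no undo log that would let purge reclaim orphaned BLOBs. The decision as an isolated, hypothetical predicate:

```cpp
// In-place replay is only safe when the PRIMARY KEY is unchanged and no
// column of the row is stored externally.
static bool replay_as_delete_insert(bool pk_updated, bool any_extern)
{
    return pk_updated || any_extern;
}
```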
*/ + old_row = row_build( + ROW_COPY_DATA, index, btr_pcur_get_rec(&pcur), + cur_offsets, NULL, NULL, NULL, &old_ext, heap); + ut_ad(old_row); + + DBUG_LOG("ib_alter_table", + "update table " << index->table->id + << " (index " << index->id + << ": " << rec_printer(old_row).str() + << " to " << rec_printer(row).str()); + } else { + old_row = NULL; + old_ext = NULL; + } + + big_rec_t* big_rec; + + error = btr_cur_pessimistic_update( + BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG + | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG + | BTR_KEEP_POS_FLAG, + btr_pcur_get_btr_cur(&pcur), + &cur_offsets, &offsets_heap, heap, &big_rec, + update, 0, thr, 0, &mtr); + + if (big_rec) { + if (error == DB_SUCCESS) { + error = btr_store_big_rec_extern_fields( + &pcur, cur_offsets, big_rec, &mtr, + BTR_STORE_UPDATE); + } + + dtuple_big_rec_free(big_rec); + } + + for (n_index += index->type != DICT_CLUSTERED; + (index = dict_table_get_next_index(index)); n_index++) { + if (!index->is_btree()) { + continue; + } + + if (error != DB_SUCCESS) { + break; + } + + if (!row_upd_changes_ord_field_binary( + index, update, thr, old_row, NULL)) { + continue; + } + + if (dict_index_has_virtual(index)) { + dtuple_copy_v_fields(old_row, old_pk); + } + + mtr.commit(); + + entry = row_build_index_entry(old_row, old_ext, index, heap); + if (!entry) { + ut_ad(0); + error = DB_CORRUPTION; + goto func_exit_committed; + } + + mtr.start(); + index->set_modified(mtr); + pcur.btr_cur.page_cur.index = index; + + ut_free(pcur.old_rec_buf); + pcur.old_rec_buf = nullptr; + + if (ROW_FOUND != row_search_index_entry( + entry, BTR_MODIFY_TREE, &pcur, &mtr)) { + ut_ad(0); + error = DB_CORRUPTION; + break; + } + + btr_cur_pessimistic_delete( + &error, FALSE, btr_pcur_get_btr_cur(&pcur), + BTR_CREATE_FLAG, false, &mtr); + + if (error != DB_SUCCESS) { + break; + } + + mtr.commit(); + + entry = row_build_index_entry(row, NULL, index, heap); + error = row_ins_sec_index_entry_low( + BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG + | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG, + BTR_INSERT_TREE, index, offsets_heap, heap, + entry, thr_get_trx(thr)->id, thr); + + /* Report correct index name for duplicate key error. */ + if (error == DB_DUPLICATE_KEY) { + thr_get_trx(thr)->error_key_num = n_index; + } + + mtr.start(); + index->set_modified(mtr); + } + + goto func_exit; +} + +/******************************************************//** +Applies an operation to a table that was rebuilt. 
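+The merge record is parsed as one of ROW_T_INSERT, ROW_T_UPDATE or
+ROW_T_DELETE and dispatched to row_log_table_apply_insert(),
+row_log_table_apply_update() or row_log_table_apply_delete().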
+@return NULL on failure (mrec corruption) or when out of data; +pointer to next record on success */ +static MY_ATTRIBUTE((nonnull, warn_unused_result)) +const mrec_t* +row_log_table_apply_op( +/*===================*/ + que_thr_t* thr, /*!< in: query graph */ + ulint new_trx_id_col, /*!< in: position of + DB_TRX_ID in new index */ + row_merge_dup_t* dup, /*!< in/out: for reporting + duplicate key errors */ + dberr_t* error, /*!< out: DB_SUCCESS + or error code */ + mem_heap_t* offsets_heap, /*!< in/out: memory heap + that can be emptied */ + mem_heap_t* heap, /*!< in/out: memory heap */ + const mrec_t* mrec, /*!< in: merge record */ + const mrec_t* mrec_end, /*!< in: end of buffer */ + rec_offs* offsets) /*!< in/out: work area + for parsing mrec */ +{ + row_log_t* log = dup->index->online_log; + dict_index_t* new_index = dict_table_get_first_index(log->table); + ulint extra_size; + const mrec_t* next_mrec; + dtuple_t* old_pk; + + ut_ad(dict_index_is_clust(dup->index)); + ut_ad(dup->index->table != log->table); + ut_ad(log->head.total <= log->tail.total); + + *error = DB_SUCCESS; + + /* 3 = 1 (op type) + 1 (extra_size) + at least 1 byte payload */ + if (mrec + 3 >= mrec_end) { + return(NULL); + } + + const bool is_instant = log->is_instant(dup->index); + const mrec_t* const mrec_start = mrec; + + switch (*mrec++) { + default: + ut_ad(0); + *error = DB_CORRUPTION; + return(NULL); + case ROW_T_INSERT: + extra_size = *mrec++; + + if (extra_size >= 0x80) { + /* Read another byte of extra_size. */ + + extra_size = (extra_size & 0x7f) << 8; + extra_size |= *mrec++; + } + + mrec += extra_size; + + ut_ad(extra_size || !is_instant); + + if (mrec > mrec_end) { + return(NULL); + } + + rec_offs_set_n_fields(offsets, dup->index->n_fields); + rec_init_offsets_temp(mrec, dup->index, offsets, + log->n_core_fields, log->non_core_fields, + is_instant + ? static_cast<rec_comp_status_t>( + *(mrec - extra_size)) + : REC_STATUS_ORDINARY); + + next_mrec = mrec + rec_offs_data_size(offsets); + + if (next_mrec > mrec_end) { + return(NULL); + } else { + log->head.total += ulint(next_mrec - mrec_start); + *error = row_log_table_apply_insert( + thr, mrec, offsets, offsets_heap, + heap, dup); + } + break; + + case ROW_T_DELETE: + extra_size = *mrec++; + ut_ad(mrec < mrec_end); + + /* We assume extra_size < 0x100 for the PRIMARY KEY prefix. + For fixed-length PRIMARY key columns, it is 0. */ + mrec += extra_size; + + /* The ROW_T_DELETE record was converted by + rec_convert_dtuple_to_temp() using new_index. */ + ut_ad(!new_index->is_instant()); + rec_offs_set_n_fields(offsets, new_index->first_user_field()); + rec_init_offsets_temp(mrec, new_index, offsets); + next_mrec = mrec + rec_offs_data_size(offsets); + if (next_mrec > mrec_end) { + return(NULL); + } + + log->head.total += ulint(next_mrec - mrec_start); + + *error = row_log_table_apply_delete( + new_trx_id_col, + mrec, offsets, offsets_heap, heap, log); + break; + + case ROW_T_UPDATE: + /* Logically, the log entry consists of the + (PRIMARY KEY,DB_TRX_ID) of the old value (converted + to the new primary key definition) followed by + the new value in the old table definition. If the + definition of the columns belonging to PRIMARY KEY + is not changed, the log will only contain + DB_TRX_ID,new_row. */ + + if (log->same_pk) { + ut_ad(new_index->n_uniq == dup->index->n_uniq); + + extra_size = *mrec++; + + if (extra_size >= 0x80) { + /* Read another byte of extra_size. 
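+				The length is encoded in one byte if
+				it is less than 0x80, and in two bytes
+				otherwise. For example, the bytes
+				0x81 0x40 decode to
+				((0x81 & 0x7f) << 8) | 0x40 = 0x140
+				= 320 bytes of extra record header.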
 */
+
+				extra_size = (extra_size & 0x7f) << 8;
+				extra_size |= *mrec++;
+			}
+
+			mrec += extra_size;
+
+			ut_ad(extra_size || !is_instant);
+
+			if (mrec > mrec_end) {
+				return(NULL);
+			}
+
+			rec_offs_set_n_fields(offsets, dup->index->n_fields);
+			rec_init_offsets_temp(mrec, dup->index, offsets,
+					      log->n_core_fields,
+					      log->non_core_fields,
+					      is_instant
+					      ? static_cast<rec_comp_status_t>(
+						      *(mrec - extra_size))
+					      : REC_STATUS_ORDINARY);
+
+			next_mrec = mrec + rec_offs_data_size(offsets);
+
+			if (next_mrec > mrec_end) {
+				return(NULL);
+			}
+
+			old_pk = dtuple_create(heap, new_index->n_uniq);
+			dict_index_copy_types(
+				old_pk, new_index, old_pk->n_fields);
+
+			/* Copy the PRIMARY KEY fields from mrec to old_pk. */
+			for (ulint i = 0; i < new_index->n_uniq; i++) {
+				const void*	field;
+				ulint		len;
+				dfield_t*	dfield;
+
+				ut_ad(!rec_offs_nth_extern(offsets, i));
+
+				field = rec_get_nth_field(
+					mrec, offsets, i, &len);
+				ut_ad(len != UNIV_SQL_NULL);
+
+				dfield = dtuple_get_nth_field(old_pk, i);
+				dfield_set_data(dfield, field, len);
+			}
+		} else {
+			/* We assume extra_size < 0x100
+			for the PRIMARY KEY prefix. */
+			mrec += *mrec + 1;
+
+			if (mrec > mrec_end) {
+				return(NULL);
+			}
+
+			/* Get offsets for PRIMARY KEY,
+			DB_TRX_ID, DB_ROLL_PTR. */
+			/* The old_pk prefix was converted by
+			rec_convert_dtuple_to_temp() using new_index. */
+			ut_ad(!new_index->is_instant());
+			rec_offs_set_n_fields(offsets,
+					      new_index->first_user_field());
+			rec_init_offsets_temp(mrec, new_index, offsets);
+
+			next_mrec = mrec + rec_offs_data_size(offsets);
+			if (next_mrec + 2 > mrec_end) {
+				return(NULL);
+			}
+
+			/* Copy the PRIMARY KEY fields and
+			DB_TRX_ID, DB_ROLL_PTR from mrec to old_pk. */
+			old_pk = dtuple_create(heap,
+					       new_index->first_user_field());
+			dict_index_copy_types(old_pk, new_index,
+					      old_pk->n_fields);
+
+			for (ulint i = 0; i < new_index->first_user_field();
+			     i++) {
+				const void*	field;
+				ulint		len;
+				dfield_t*	dfield;
+
+				ut_ad(!rec_offs_nth_extern(offsets, i));
+
+				field = rec_get_nth_field(
+					mrec, offsets, i, &len);
+				ut_ad(len != UNIV_SQL_NULL);
+
+				dfield = dtuple_get_nth_field(old_pk, i);
+				dfield_set_data(dfield, field, len);
+			}
+
+			mrec = next_mrec;
+
+			/* Fetch the new value of the row as it was
+			in the old table definition. */
+			extra_size = *mrec++;
+
+			if (extra_size >= 0x80) {
+				/* Read another byte of extra_size. */
+
+				extra_size = (extra_size & 0x7f) << 8;
+				extra_size |= *mrec++;
+			}
+
+			mrec += extra_size;
+
+			ut_ad(extra_size || !is_instant);
+
+			if (mrec > mrec_end) {
+				return(NULL);
+			}
+
+			rec_offs_set_n_fields(offsets, dup->index->n_fields);
+			rec_init_offsets_temp(mrec, dup->index, offsets,
+					      log->n_core_fields,
+					      log->non_core_fields,
+					      is_instant
+					      ? static_cast<rec_comp_status_t>(
+						      *(mrec - extra_size))
+					      : REC_STATUS_ORDINARY);
+
+			next_mrec = mrec + rec_offs_data_size(offsets);
+
+			if (next_mrec > mrec_end) {
+				return(NULL);
+			}
+		}
+
+		ut_ad(next_mrec <= mrec_end);
+		log->head.total += ulint(next_mrec - mrec_start);
+		dtuple_set_n_fields_cmp(old_pk, new_index->n_uniq);
+
+		*error = row_log_table_apply_update(
+			thr, new_trx_id_col,
+			mrec, offsets, offsets_heap, heap, dup, old_pk);
+		break;
+	}
+
+	ut_ad(log->head.total <= log->tail.total);
+	mem_heap_empty(offsets_heap);
+	mem_heap_empty(heap);
+	return(next_mrec);
+}
+
+#ifdef HAVE_PSI_STAGE_INTERFACE
+/** Estimate by how much the ALTER TABLE progress should be incremented
+per one block of log applied.
+For the other phases of ALTER TABLE, the progress is incremented by 1
+per page processed.
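+For example, with the default srv_sort_buf_size=1MiB and the default
+innodb_page_size=16KiB, one log block covers 64 pages, so one applied
+block adds 64 * 6 = 384 units of progress (see
+row_log_progress_inc_per_block() below).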
+@return amount of abstract units to add to work_completed when one block
+of log is applied.
+*/
+inline
+ulint
+row_log_progress_inc_per_block()
+{
+	/* We must increment the progress once per page processed (one
+	page being srv_page_size bytes; by default,
+	innodb_page_size=16KiB). One block here is srv_sort_buf_size
+	(usually 1MiB). */
+	const ulint	pages_per_block = std::max<ulint>(
+		ulint(srv_sort_buf_size >> srv_page_size_shift), 1);
+
+	/* Multiply by an artificial factor of 6 to even the pace with
+	the rest of the ALTER TABLE phases, which process a page-sized
+	amount of data faster. */
+	return(pages_per_block * 6);
+}
+
+/** Estimate how much work is to be done by the log apply phase
+of an ALTER TABLE for this index.
+@param[in]	index	index whose log to assess
+@return work to be done by log-apply in abstract units
+*/
+ulint
+row_log_estimate_work(
+	const dict_index_t*	index)
+{
+	if (index == NULL || index->online_log == NULL
+	    || index->online_log_is_dummy()) {
+		return(0);
+	}
+
+	const row_log_t*	l = index->online_log;
+	const ulint		bytes_left =
+		static_cast<ulint>(l->tail.total - l->head.total);
+	const ulint		blocks_left = bytes_left / srv_sort_buf_size;
+
+	return(blocks_left * row_log_progress_inc_per_block());
+}
+#else /* HAVE_PSI_STAGE_INTERFACE */
+inline
+ulint
+row_log_progress_inc_per_block()
+{
+	return(0);
+}
+#endif /* HAVE_PSI_STAGE_INTERFACE */
+
+/** Applies operations to a table that was rebuilt.
+@param[in]	thr	query graph
+@param[in,out]	dup	for reporting duplicate key errors
+@param[in,out]	stage	performance schema accounting object, used by
+ALTER TABLE. If not NULL, then stage->inc() will be called for each block
+of log that is applied.
+@return DB_SUCCESS, or error code on failure */
+static MY_ATTRIBUTE((warn_unused_result))
+dberr_t
+row_log_table_apply_ops(
+	que_thr_t*		thr,
+	row_merge_dup_t*	dup,
+	ut_stage_alter_t*	stage)
+{
+	dberr_t		error;
+	const mrec_t*	mrec		= NULL;
+	const mrec_t*	next_mrec;
+	const mrec_t*	mrec_end	= NULL; /* silence bogus warning */
+	const mrec_t*	next_mrec_end;
+	mem_heap_t*	heap;
+	mem_heap_t*	offsets_heap;
+	rec_offs*	offsets;
+	bool		has_index_lock;
+	dict_index_t*	index		= const_cast<dict_index_t*>(
+		dup->index);
+	dict_table_t*	new_table	= index->online_log->table;
+	dict_index_t*	new_index	= dict_table_get_first_index(
+		new_table);
+	const ulint	i		= 1 + REC_OFFS_HEADER_SIZE
+		+ std::max<ulint>(index->n_fields,
+				  new_index->first_user_field());
+	const ulint	new_trx_id_col	= dict_col_get_clust_pos(
+		dict_table_get_sys_col(new_table, DATA_TRX_ID), new_index);
+	trx_t*		trx		= thr_get_trx(thr);
+
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(dict_index_is_online_ddl(index));
+	ut_ad(trx->mysql_thd);
+	ut_ad(index->lock.have_x());
+	ut_ad(!dict_index_is_online_ddl(new_index));
+	ut_ad(dict_col_get_clust_pos(
+		      dict_table_get_sys_col(index->table, DATA_TRX_ID), index)
+	      != ULINT_UNDEFINED);
+	ut_ad(new_trx_id_col > 0);
+	ut_ad(new_trx_id_col != ULINT_UNDEFINED);
+
+	MEM_UNDEFINED(&mrec_end, sizeof mrec_end);
+
+	offsets = static_cast<rec_offs*>(ut_malloc_nokey(i * sizeof *offsets));
+	rec_offs_set_n_alloc(offsets, i);
+	rec_offs_set_n_fields(offsets, dict_index_get_n_fields(index));
+
+	heap = mem_heap_create(srv_page_size);
+	offsets_heap = mem_heap_create(srv_page_size);
+	has_index_lock = true;
+
+next_block:
+	ut_ad(has_index_lock);
+	ut_ad(index->lock.have_u_or_x());
+	ut_ad(index->online_log->head.bytes == 0);
+
+	stage->inc(row_log_progress_inc_per_block());
+
+	if (trx_is_interrupted(trx)) {
+		goto interrupted;
+	}
+
+	if 
(index->is_corrupted()) { + error = DB_INDEX_CORRUPT; + goto func_exit; + } + + ut_ad(dict_index_is_online_ddl(index)); + + error = index->online_log->error; + + if (error != DB_SUCCESS) { + goto func_exit; + } + + if (UNIV_UNLIKELY(index->online_log->head.blocks + > index->online_log->tail.blocks)) { +unexpected_eof: + ib::error() << "Unexpected end of temporary file for table " + << index->table->name; +corruption: + error = DB_CORRUPTION; + goto func_exit; + } + + if (index->online_log->head.blocks + == index->online_log->tail.blocks) { + if (index->online_log->head.blocks) { +#ifdef HAVE_FTRUNCATE + /* Truncate the file in order to save space. */ + if (index->online_log->fd > 0 + && ftruncate(index->online_log->fd, 0) == -1) { + ib::error() + << "\'" << index->name + 1 + << "\' failed with error " + << errno << ":" << strerror(errno); + + goto corruption; + } +#endif /* HAVE_FTRUNCATE */ + index->online_log->head.blocks + = index->online_log->tail.blocks = 0; + } + + next_mrec = index->online_log->tail.block; + next_mrec_end = next_mrec + index->online_log->tail.bytes; + + if (next_mrec_end == next_mrec) { + /* End of log reached. */ +all_done: + ut_ad(has_index_lock); + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->tail.blocks == 0); + index->online_log->head.bytes = 0; + index->online_log->tail.bytes = 0; + error = DB_SUCCESS; + goto func_exit; + } + } else { + os_offset_t ofs; + + ofs = (os_offset_t) index->online_log->head.blocks + * srv_sort_buf_size; + + ut_ad(has_index_lock); + has_index_lock = false; + index->lock.x_unlock(); + + log_free_check(); + + ut_ad(dict_index_is_online_ddl(index)); + + if (!row_log_block_allocate(index->online_log->head)) { + error = DB_OUT_OF_MEMORY; + goto func_exit; + } + + byte* buf = index->online_log->head.block; + + if (DB_SUCCESS + != os_file_read(IORequestRead, index->online_log->fd, + buf, ofs, srv_sort_buf_size, nullptr)) { + ib::error() + << "Unable to read temporary file" + " for table " << index->table->name; + goto corruption; + } + + if (srv_encrypt_log) { + if (!log_tmp_block_decrypt( + buf, srv_sort_buf_size, + index->online_log->crypt_head, ofs)) { + error = DB_DECRYPTION_FAILED; + goto func_exit; + } + + srv_stats.n_rowlog_blocks_decrypted.inc(); + memcpy(buf, index->online_log->crypt_head, + srv_sort_buf_size); + } + +#ifdef POSIX_FADV_DONTNEED + /* Each block is read exactly once. Free up the file cache. */ + posix_fadvise(index->online_log->fd, + ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED); +#endif /* POSIX_FADV_DONTNEED */ + + next_mrec = index->online_log->head.block; + next_mrec_end = next_mrec + srv_sort_buf_size; + } + + /* This read is not protected by index->online_log->mutex for + performance reasons. We will eventually notice any error that + was flagged by a DML thread. */ + error = index->online_log->error; + + if (error != DB_SUCCESS) { + goto func_exit; + } + + if (mrec) { + /* A partial record was read from the previous block. + Copy the temporary buffer full, as we do not know the + length of the record. Parse subsequent records from + the bigger buffer index->online_log->head.block + or index->online_log->tail.block. 
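+	Only the beginning of the copied bytes is meaningful: parsing
+	stops at the real end of the record, which (as asserted below)
+	must lie beyond the old end of the truncated buffer.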
*/ + + ut_ad(mrec == index->online_log->head.buf); + ut_ad(mrec_end > mrec); + ut_ad(mrec_end < (&index->online_log->head.buf)[1]); + + memcpy((mrec_t*) mrec_end, next_mrec, + ulint((&index->online_log->head.buf)[1] - mrec_end)); + mrec = row_log_table_apply_op( + thr, new_trx_id_col, + dup, &error, offsets_heap, heap, + index->online_log->head.buf, + (&index->online_log->head.buf)[1], offsets); + if (error != DB_SUCCESS) { + goto func_exit; + } else if (UNIV_UNLIKELY(mrec == NULL)) { + /* The record was not reassembled properly. */ + goto corruption; + } + /* The record was previously found out to be + truncated. Now that the parse buffer was extended, + it should proceed beyond the old end of the buffer. */ + ut_a(mrec > mrec_end); + + index->online_log->head.bytes = ulint(mrec - mrec_end); + next_mrec += index->online_log->head.bytes; + } + + ut_ad(next_mrec <= next_mrec_end); + /* The following loop must not be parsing the temporary + buffer, but head.block or tail.block. */ + + /* mrec!=NULL means that the next record starts from the + middle of the block */ + ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0)); + +#ifdef UNIV_DEBUG + if (next_mrec_end == index->online_log->head.block + + srv_sort_buf_size) { + /* If tail.bytes == 0, next_mrec_end can also be at + the end of tail.block. */ + if (index->online_log->tail.bytes == 0) { + ut_ad(next_mrec == next_mrec_end); + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->head.bytes == 0); + } else { + ut_ad(next_mrec == index->online_log->head.block + + index->online_log->head.bytes); + ut_ad(index->online_log->tail.blocks + > index->online_log->head.blocks); + } + } else if (next_mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes) { + ut_ad(next_mrec == index->online_log->tail.block + + index->online_log->head.bytes); + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->head.bytes + <= index->online_log->tail.bytes); + } else { + ut_error; + } +#endif /* UNIV_DEBUG */ + + mrec_end = next_mrec_end; + + while (!trx_is_interrupted(trx)) { + mrec = next_mrec; + ut_ad(mrec <= mrec_end); + + if (mrec == mrec_end) { + /* We are at the end of the log. + Mark the replay all_done. */ + if (has_index_lock) { + goto all_done; + } + } + + if (!has_index_lock) { + /* We are applying operations from a different + block than the one that is being written to. + We do not hold index->lock in order to + allow other threads to concurrently buffer + modifications. */ + ut_ad(mrec >= index->online_log->head.block); + ut_ad(mrec_end == index->online_log->head.block + + srv_sort_buf_size); + ut_ad(index->online_log->head.bytes + < srv_sort_buf_size); + + /* Take the opportunity to do a redo log + checkpoint if needed. */ + log_free_check(); + } else { + /* We are applying operations from the last block. + Do not allow other threads to buffer anything, + so that we can finally catch up and synchronize. */ + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes); + ut_ad(mrec >= index->online_log->tail.block); + } + + /* This read is not protected by index->online_log->mutex + for performance reasons. We will eventually notice any + error that was flagged by a DML thread. 
*/ + error = index->online_log->error; + + if (error != DB_SUCCESS) { + goto func_exit; + } + + next_mrec = row_log_table_apply_op( + thr, new_trx_id_col, + dup, &error, offsets_heap, heap, + mrec, mrec_end, offsets); + + if (error != DB_SUCCESS) { + goto func_exit; + } else if (next_mrec == next_mrec_end) { + /* The record happened to end on a block boundary. + Do we have more blocks left? */ + if (has_index_lock) { + /* The index will be locked while + applying the last block. */ + goto all_done; + } + + mrec = NULL; +process_next_block: + index->lock.x_lock(SRW_LOCK_CALL); + has_index_lock = true; + + index->online_log->head.bytes = 0; + index->online_log->head.blocks++; + goto next_block; + } else if (next_mrec != NULL) { + ut_ad(next_mrec < next_mrec_end); + index->online_log->head.bytes + += ulint(next_mrec - mrec); + } else if (has_index_lock) { + /* When mrec is within tail.block, it should + be a complete record, because we are holding + index->lock and thus excluding the writer. */ + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes); + ut_ad(0); + goto unexpected_eof; + } else { + memcpy(index->online_log->head.buf, mrec, + ulint(mrec_end - mrec)); + mrec_end += ulint(index->online_log->head.buf - mrec); + mrec = index->online_log->head.buf; + goto process_next_block; + } + } + +interrupted: + error = DB_INTERRUPTED; +func_exit: + if (!has_index_lock) { + index->lock.x_lock(SRW_LOCK_CALL); + } + + mem_heap_free(offsets_heap); + mem_heap_free(heap); + row_log_block_free(index->online_log->head); + ut_free(offsets); + return(error); +} + +/** Apply the row_log_table log to a table upon completing rebuild. +@param[in] thr query graph +@param[in] old_table old table +@param[in,out] table MySQL table (for reporting duplicates) +@param[in,out] stage performance schema accounting object, used by +ALTER TABLE. stage->begin_phase_log_table() will be called initially and then +stage->inc() will be called for each block of log that is applied. +@param[in] new_table Altered table +@return DB_SUCCESS, or error code on failure */ +dberr_t +row_log_table_apply( + que_thr_t* thr, + dict_table_t* old_table, + struct TABLE* table, + ut_stage_alter_t* stage, + dict_table_t* new_table) +{ + dberr_t error; + dict_index_t* clust_index; + + thr_get_trx(thr)->error_key_num = 0; + DBUG_EXECUTE_IF("innodb_trx_duplicates", + thr_get_trx(thr)->duplicates = TRX_DUP_REPLACE;); + + stage->begin_phase_log_table(); + + clust_index = dict_table_get_first_index(old_table); + + if (clust_index->online_log->n_rows == 0) { + clust_index->online_log->n_rows = new_table->stat_n_rows; + } + + clust_index->lock.x_lock(SRW_LOCK_CALL); + + if (!clust_index->online_log) { + ut_ad(dict_index_get_online_status(clust_index) + == ONLINE_INDEX_COMPLETE); + /* This function should not be called unless + rebuilding a table online. Build in some fault + tolerance. */ + ut_ad(0); + error = DB_ERROR; + } else { + row_merge_dup_t dup = { + clust_index, table, + clust_index->online_log->col_map, 0 + }; + + error = row_log_table_apply_ops(thr, &dup, stage); + + ut_ad(error != DB_SUCCESS + || clust_index->online_log->head.total + == clust_index->online_log->tail.total); + } + + clust_index->lock.x_unlock(); + DBUG_EXECUTE_IF("innodb_trx_duplicates", + thr_get_trx(thr)->duplicates = 0;); + + return(error); +} + +/******************************************************//** +Allocate the row log for an index and flag the index +for online creation. 
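+The log is allocated either for rebuilding a table (table != NULL) or
+for building a single secondary index (table == NULL); the assertion
+dict_index_is_clust(index) == !!table below enforces this.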
+@retval true if success, false if not */ +bool +row_log_allocate( +/*=============*/ + const trx_t* trx, /*!< in: the ALTER TABLE transaction */ + dict_index_t* index, /*!< in/out: index */ + dict_table_t* table, /*!< in/out: new table being rebuilt, + or NULL when creating a secondary index */ + bool same_pk,/*!< in: whether the definition of the + PRIMARY KEY has remained the same */ + const dtuple_t* defaults, + /*!< in: default values of + added, changed columns, or NULL */ + const ulint* col_map,/*!< in: mapping of old column + numbers to new ones, or NULL if !table */ + const char* path, /*!< in: where to create temporary file */ + const TABLE* old_table, /*!< in: table definition before alter */ + const bool allow_not_null) /*!< in: allow null to not-null + conversion */ +{ + row_log_t* log; + DBUG_ENTER("row_log_allocate"); + + ut_ad(!dict_index_is_online_ddl(index)); + ut_ad(dict_index_is_clust(index) == !!table); + ut_ad(!table || index->table != table); + ut_ad(same_pk || table); + ut_ad(!table || col_map); + ut_ad(!defaults || col_map); + ut_ad(index->lock.have_u_or_x()); + ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE)); + ut_ad(trx->id); + + log = static_cast<row_log_t*>(ut_malloc_nokey(sizeof *log)); + + if (log == NULL) { + DBUG_RETURN(false); + } + + log->fd = OS_FILE_CLOSED; + mysql_mutex_init(index_online_log_key, &log->mutex, nullptr); + + log->table = table; + log->same_pk = same_pk; + log->defaults = defaults; + log->col_map = col_map; + log->error = DB_SUCCESS; + log->min_trx = trx->id; + log->max_trx = 0; + log->tail.blocks = log->tail.bytes = 0; + log->tail.total = 0; + log->tail.block = log->head.block = NULL; + log->crypt_tail = log->crypt_head = NULL; + log->head.blocks = log->head.bytes = 0; + log->head.total = 0; + log->path = path; + log->n_core_fields = index->n_core_fields; + ut_ad(!table || log->is_instant(index) + == (index->n_core_fields < index->n_fields)); + log->allow_not_null = allow_not_null; + log->old_table = old_table; + log->n_rows = 0; + + if (table && index->is_instant()) { + const unsigned n = log->n_core_fields; + log->non_core_fields = UT_NEW_ARRAY_NOKEY( + dict_col_t::def_t, index->n_fields - n); + for (unsigned i = n; i < index->n_fields; i++) { + log->non_core_fields[i - n] + = index->fields[i].col->def_val; + } + } else { + log->non_core_fields = NULL; + } + + dict_index_set_online_status(index, ONLINE_INDEX_CREATION); + + if (srv_encrypt_log) { + log->crypt_head_size = log->crypt_tail_size = srv_sort_buf_size; + log->crypt_head = static_cast<byte *>( + my_large_malloc(&log->crypt_head_size, MYF(MY_WME))); + log->crypt_tail = static_cast<byte *>( + my_large_malloc(&log->crypt_tail_size, MYF(MY_WME))); + + if (!log->crypt_head || !log->crypt_tail) { + row_log_free(log); + DBUG_RETURN(false); + } + } + + index->online_log = log; + + if (!table) { + /* Assign the clustered index online log to table. + It can be used by concurrent DML to identify whether + the table has any online DDL */ + index->table->indexes.start->online_log_make_dummy(); + log->alter_trx = trx; + } + + /* While we might be holding an exclusive data dictionary lock + here, in row_log_abort_sec() we will not always be holding it. Use + atomic operations in both cases. */ + MONITOR_ATOMIC_INC(MONITOR_ONLINE_CREATE_INDEX); + + DBUG_RETURN(true); +} + +/******************************************************//** +Free the row log for an index that was being created online. 
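+Releases the log blocks, the temporary file, any encryption buffers,
+and the mutex that row_log_allocate() created.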
*/ +void +row_log_free( +/*=========*/ + row_log_t* log) /*!< in,own: row log */ +{ + MONITOR_ATOMIC_DEC(MONITOR_ONLINE_CREATE_INDEX); + + UT_DELETE_ARRAY(log->non_core_fields); + row_log_block_free(log->tail); + row_log_block_free(log->head); + row_merge_file_destroy_low(log->fd); + + if (log->crypt_head) { + my_large_free(log->crypt_head, log->crypt_head_size); + } + + if (log->crypt_tail) { + my_large_free(log->crypt_tail, log->crypt_tail_size); + } + + mysql_mutex_destroy(&log->mutex); + ut_free(log); +} + +/******************************************************//** +Get the latest transaction ID that has invoked row_log_online_op() +during online creation. +@return latest transaction ID, or 0 if nothing was logged */ +trx_id_t +row_log_get_max_trx( +/*================*/ + dict_index_t* index) /*!< in: index, must be locked */ +{ + ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_CREATION); +#ifdef SAFE_MUTEX + ut_ad(index->lock.have_x() + || (index->lock.have_s() + && mysql_mutex_is_owner(&index->online_log->mutex))); +#endif + return(index->online_log->max_trx); +} + +/******************************************************//** +Applies an operation to a secondary index that was being created. */ +static MY_ATTRIBUTE((nonnull)) +void +row_log_apply_op_low( +/*=================*/ + dict_index_t* index, /*!< in/out: index */ + row_merge_dup_t*dup, /*!< in/out: for reporting + duplicate key errors */ + dberr_t* error, /*!< out: DB_SUCCESS or error code */ + mem_heap_t* offsets_heap, /*!< in/out: memory heap for + allocating offsets; can be emptied */ + bool has_index_lock, /*!< in: true if holding index->lock + in exclusive mode */ + enum row_op op, /*!< in: operation being applied */ + trx_id_t trx_id, /*!< in: transaction identifier */ + const dtuple_t* entry) /*!< in: row */ +{ + mtr_t mtr; + btr_cur_t cursor; + rec_offs* offsets = NULL; + + ut_ad(!dict_index_is_clust(index)); + + ut_ad(index->lock.have_x() == has_index_lock); + + ut_ad(!index->is_corrupted()); + ut_ad(trx_id != 0 || op == ROW_OP_DELETE); + + DBUG_LOG("ib_create_index", + (op == ROW_OP_INSERT ? "insert " : "delete ") + << (has_index_lock ? "locked index " : "unlocked index ") + << index->id << ',' << ib::hex(trx_id) << ": " + << rec_printer(entry).str()); + + mtr_start(&mtr); + index->set_modified(mtr); + cursor.page_cur.index = index; + if (has_index_lock) { + mtr_x_lock_index(index, &mtr); + } + + /* We perform the pessimistic variant of the operations if we + already hold index->lock exclusively. First, search the + record. The operation may already have been performed, + depending on when the row in the clustered index was + scanned. */ + *error = cursor.search_leaf(entry, PAGE_CUR_LE, has_index_lock + ? BTR_MODIFY_TREE_ALREADY_LATCHED + : BTR_MODIFY_LEAF, &mtr); + if (UNIV_UNLIKELY(*error != DB_SUCCESS)) { + goto func_exit; + } + + ut_ad(dict_index_get_n_unique(index) > 0); + /* This test is somewhat similar to row_ins_must_modify_rec(), + but not identical for unique secondary indexes. */ + if (cursor.low_match >= dict_index_get_n_unique(index) + && !page_rec_is_infimum(btr_cur_get_rec(&cursor))) { + /* We have a matching record. 
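+		If low_match equals the number of index fields, this
+		exact record (including the PRIMARY KEY suffix of the
+		secondary index) exists. Otherwise, only the unique
+		key prefix of a UNIQUE index can have matched, with
+		differing PRIMARY KEY columns; see the assertion on
+		"exists" below.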
*/
+		bool	exists	= (cursor.low_match
+				   == dict_index_get_n_fields(index));
+#ifdef UNIV_DEBUG
+		rec_t*	rec	= btr_cur_get_rec(&cursor);
+		ut_ad(page_rec_is_user_rec(rec));
+		ut_ad(!rec_get_deleted_flag(rec, page_rec_is_comp(rec)));
+#endif /* UNIV_DEBUG */
+
+		ut_ad(exists || dict_index_is_unique(index));
+
+		switch (op) {
+		case ROW_OP_DELETE:
+			if (!exists) {
+				/* The existing record matches the
+				unique secondary index key, but the
+				PRIMARY KEY columns differ. So, this
+				exact record does not exist. For
+				example, we could detect a duplicate
+				key error in some old index before
+				logging a ROW_OP_INSERT for our
+				index. This ROW_OP_DELETE could have
+				been logged for rolling back
+				TRX_UNDO_INSERT_REC. */
+				goto func_exit;
+			}
+
+			*error = btr_cur_optimistic_delete(
+				&cursor, BTR_CREATE_FLAG, &mtr);
+
+			if (*error != DB_FAIL) {
+				break;
+			}
+
+			if (!has_index_lock) {
+				/* This needs a pessimistic operation.
+				Lock the index tree exclusively. */
+				mtr_commit(&mtr);
+				mtr_start(&mtr);
+				index->set_modified(mtr);
+				*error = cursor.search_leaf(entry, PAGE_CUR_LE,
+							    BTR_MODIFY_TREE,
+							    &mtr);
+				if (UNIV_UNLIKELY(*error != DB_SUCCESS)) {
+					goto func_exit;
+				}
+				/* No other thread than the current one
+				is allowed to modify the index tree.
+				Thus, the record should still exist. */
+				ut_ad(cursor.low_match
+				      >= dict_index_get_n_fields(index));
+				ut_ad(page_rec_is_user_rec(
+					      btr_cur_get_rec(&cursor)));
+			}
+
+			/* As there are no externally stored fields in
+			a secondary index record, the parameter
+			rollback=false will be ignored. */
+
+			btr_cur_pessimistic_delete(
+				error, FALSE, &cursor,
+				BTR_CREATE_FLAG, false, &mtr);
+			break;
+		case ROW_OP_INSERT:
+			if (exists) {
+				/* The record already exists. There
+				is nothing to be inserted.
+				This could happen when processing
+				TRX_UNDO_DEL_MARK_REC in statement
+				rollback:
+
+				UPDATE of PRIMARY KEY can lead to
+				statement rollback if the updated
+				value of the PRIMARY KEY already
+				exists. In this case, the UPDATE would
+				be mapped to DELETE;INSERT, and we
+				only wrote undo log for the DELETE
+				part. The duplicate key error would be
+				triggered before logging the INSERT
+				part.
+
+				Theoretically, we could also get a
+				similar situation when a DELETE operation
+				is blocked by a FOREIGN KEY constraint. */
+				goto func_exit;
+			}
+
+			if (dtuple_contains_null(entry)) {
+				/* The UNIQUE KEY columns match, but
+				there is a NULL value in the key, and
+				NULL!=NULL. */
+				goto insert_the_rec;
+			}
+
+			goto duplicate;
+		}
+	} else {
+		switch (op) {
+			rec_t*		rec;
+			big_rec_t*	big_rec;
+		case ROW_OP_DELETE:
+			/* The record does not exist. For example, we
+			could detect a duplicate key error in some old
+			index before logging a ROW_OP_INSERT for our
+			index. This ROW_OP_DELETE could be logged for
+			rolling back TRX_UNDO_INSERT_REC. */
+			goto func_exit;
+		case ROW_OP_INSERT:
+			if (dict_index_is_unique(index)
+			    && (cursor.up_match
+				>= dict_index_get_n_unique(index)
+				|| cursor.low_match
+				>= dict_index_get_n_unique(index))
+			    && (!index->n_nullable
+				|| !dtuple_contains_null(entry))) {
+duplicate:
+				/* Duplicate key */
+				ut_ad(dict_index_is_unique(index));
+				row_merge_dup_report(dup, entry->fields);
+				*error = DB_DUPLICATE_KEY;
+				goto func_exit;
+			}
+insert_the_rec:
+			/* Insert the record. As we are inserting into
+			a secondary index, there cannot be externally
+			stored columns (!big_rec).
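+			First try an optimistic insert that only
+			modifies a leaf page. Only if that returns
+			DB_FAIL, reposition the cursor with
+			BTR_MODIFY_TREE and retry the insert
+			pessimistically, allowing a page split.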
*/ + *error = btr_cur_optimistic_insert( + BTR_NO_UNDO_LOG_FLAG + | BTR_NO_LOCKING_FLAG + | BTR_CREATE_FLAG, + &cursor, &offsets, &offsets_heap, + const_cast<dtuple_t*>(entry), + &rec, &big_rec, 0, NULL, &mtr); + ut_ad(!big_rec); + if (*error != DB_FAIL) { + break; + } + + if (!has_index_lock) { + /* This needs a pessimistic operation. + Lock the index tree exclusively. */ + mtr_commit(&mtr); + mtr_start(&mtr); + index->set_modified(mtr); + *error = cursor.search_leaf(entry, PAGE_CUR_LE, + BTR_MODIFY_TREE, + &mtr); + if (*error != DB_SUCCESS) { + break; + } + } + + /* We already determined that the + record did not exist. No other thread + than the current one is allowed to + modify the index tree. Thus, the + record should still not exist. */ + + *error = btr_cur_pessimistic_insert( + BTR_NO_UNDO_LOG_FLAG + | BTR_NO_LOCKING_FLAG + | BTR_CREATE_FLAG, + &cursor, &offsets, &offsets_heap, + const_cast<dtuple_t*>(entry), + &rec, &big_rec, + 0, NULL, &mtr); + ut_ad(!big_rec); + break; + } + mem_heap_empty(offsets_heap); + } + + if (*error == DB_SUCCESS && trx_id) { + page_update_max_trx_id(btr_cur_get_block(&cursor), + btr_cur_get_page_zip(&cursor), + trx_id, &mtr); + } + +func_exit: + mtr_commit(&mtr); +} + +/******************************************************//** +Applies an operation to a secondary index that was being created. +@return NULL on failure (mrec corruption) or when out of data; +pointer to next record on success */ +static MY_ATTRIBUTE((nonnull, warn_unused_result)) +const mrec_t* +row_log_apply_op( +/*=============*/ + dict_index_t* index, /*!< in/out: index */ + row_merge_dup_t*dup, /*!< in/out: for reporting + duplicate key errors */ + dberr_t* error, /*!< out: DB_SUCCESS or error code */ + mem_heap_t* offsets_heap, /*!< in/out: memory heap for + allocating offsets; can be emptied */ + mem_heap_t* heap, /*!< in/out: memory heap for + allocating data tuples */ + bool has_index_lock, /*!< in: true if holding index->lock + in exclusive mode */ + const mrec_t* mrec, /*!< in: merge record */ + const mrec_t* mrec_end, /*!< in: end of buffer */ + rec_offs* offsets) /*!< in/out: work area for + rec_init_offsets_temp() */ + +{ + enum row_op op; + ulint extra_size; + ulint data_size; + dtuple_t* entry; + trx_id_t trx_id; + + /* Online index creation is only used for secondary indexes. */ + ut_ad(!dict_index_is_clust(index)); + + ut_ad(index->lock.have_x() == has_index_lock); + + if (index->is_corrupted()) { + *error = DB_INDEX_CORRUPT; + return(NULL); + } + + *error = DB_SUCCESS; + + if (mrec + ROW_LOG_HEADER_SIZE >= mrec_end) { + return(NULL); + } + + switch (*mrec) { + case ROW_OP_INSERT: + if (ROW_LOG_HEADER_SIZE + DATA_TRX_ID_LEN + mrec >= mrec_end) { + return(NULL); + } + + op = static_cast<enum row_op>(*mrec++); + trx_id = trx_read_trx_id(mrec); + mrec += DATA_TRX_ID_LEN; + break; + case ROW_OP_DELETE: + op = static_cast<enum row_op>(*mrec++); + trx_id = 0; + break; + default: +corrupted: + ut_ad(0); + *error = DB_CORRUPTION; + return(NULL); + } + + extra_size = *mrec++; + + ut_ad(mrec < mrec_end); + + if (extra_size >= 0x80) { + /* Read another byte of extra_size. */ + + extra_size = (extra_size & 0x7f) << 8; + extra_size |= *mrec++; + } + + mrec += extra_size; + + if (mrec > mrec_end) { + return(NULL); + } + + rec_init_offsets_temp(mrec, index, offsets); + + if (rec_offs_any_extern(offsets)) { + /* There should never be any externally stored fields + in a secondary index, which is what online index + creation is used for. Therefore, the log file must be + corrupted. 
*/
+		goto corrupted;
+	}
+
+	data_size = rec_offs_data_size(offsets);
+
+	mrec += data_size;
+
+	if (mrec > mrec_end) {
+		return(NULL);
+	}
+
+	entry = row_rec_to_index_entry_low(
+		mrec - data_size, index, offsets, heap);
+	/* Online index creation is only implemented for secondary
+	indexes, which never contain off-page columns. */
+	ut_ad(dtuple_get_n_ext(entry) == 0);
+
+	row_log_apply_op_low(index, dup, error, offsets_heap,
+			     has_index_lock, op, trx_id, entry);
+	return(mrec);
+}
+
+/** Applies operations to a secondary index that was being created.
+@param[in]	trx	transaction (for checking if the operation was
+interrupted)
+@param[in,out]	index	index
+@param[in,out]	dup	for reporting duplicate key errors
+@param[in,out]	stage	performance schema accounting object, used by
+ALTER TABLE. If not NULL, then stage->inc() will be called for each block
+of log that is applied; stage is nullptr when the row log is being
+applied by a DML thread.
+@return DB_SUCCESS, or error code on failure */
+static
+dberr_t
+row_log_apply_ops(
+	const trx_t*		trx,
+	dict_index_t*		index,
+	row_merge_dup_t*	dup,
+	ut_stage_alter_t*	stage)
+{
+	dberr_t		error;
+	const mrec_t*	mrec	= NULL;
+	const mrec_t*	next_mrec;
+	const mrec_t*	mrec_end= NULL; /* silence bogus warning */
+	const mrec_t*	next_mrec_end;
+	mem_heap_t*	offsets_heap;
+	mem_heap_t*	heap;
+	rec_offs*	offsets;
+	bool		has_index_lock;
+	const ulint	i	= 1 + REC_OFFS_HEADER_SIZE
+		+ dict_index_get_n_fields(index);
+
+	ut_ad(dict_index_is_online_ddl(index)
+	      || (index->online_log
+		  && index->online_status == ONLINE_INDEX_COMPLETE));
+	ut_ad(!index->is_committed());
+	ut_ad(index->lock.have_x());
+	ut_ad(index->online_log);
+
+	MEM_UNDEFINED(&mrec_end, sizeof mrec_end);
+
+	offsets = static_cast<rec_offs*>(ut_malloc_nokey(i * sizeof *offsets));
+	rec_offs_set_n_alloc(offsets, i);
+	rec_offs_set_n_fields(offsets, dict_index_get_n_fields(index));
+
+	offsets_heap = mem_heap_create(srv_page_size);
+	heap = mem_heap_create(srv_page_size);
+	has_index_lock = true;
+
+next_block:
+	ut_ad(has_index_lock);
+	ut_ad(index->lock.have_x());
+	ut_ad(index->online_log->head.bytes == 0);
+
+	if (stage) {
+		stage->inc(row_log_progress_inc_per_block());
+	}
+
+	if (trx_is_interrupted(trx)) {
+		goto interrupted;
+	}
+
+	error = index->online_log->error;
+	if (error != DB_SUCCESS) {
+		goto func_exit;
+	}
+
+	if (index->is_corrupted()) {
+		error = DB_INDEX_CORRUPT;
+		goto func_exit;
+	}
+
+	if (UNIV_UNLIKELY(index->online_log->head.blocks
+			  > index->online_log->tail.blocks)) {
+unexpected_eof:
+		ib::error() << "Unexpected end of temporary file for index "
+			<< index->name;
+corruption:
+		error = DB_CORRUPTION;
+		goto func_exit;
+	}
+
+	if (index->online_log->head.blocks
+	    == index->online_log->tail.blocks) {
+		if (index->online_log->head.blocks) {
+#ifdef HAVE_FTRUNCATE
+			/* Truncate the file in order to save space. */
+			if (index->online_log->fd > 0
+			    && ftruncate(index->online_log->fd, 0) == -1) {
+				ib::error()
+					<< "\'" << index->name + 1
+					<< "\' failed with error "
+					<< errno << ":" << strerror(errno);
+
+				goto corruption;
+			}
+#endif /* HAVE_FTRUNCATE */
+			index->online_log->head.blocks
+				= index->online_log->tail.blocks = 0;
+		}
+
+		next_mrec = index->online_log->tail.block;
+		next_mrec_end = next_mrec + index->online_log->tail.bytes;
+
+		if (next_mrec_end == next_mrec) {
+			/* End of log reached.
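+			Both the read and the write position are
+			within the same in-memory block; reset the
+			byte offsets so that the log is empty for any
+			subsequent pass.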
*/ +all_done: + ut_ad(has_index_lock); + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->tail.blocks == 0); + index->online_log->tail.bytes = 0; + index->online_log->head.bytes = 0; + error = DB_SUCCESS; + goto func_exit; + } + } else { + os_offset_t ofs = static_cast<os_offset_t>( + index->online_log->head.blocks) + * srv_sort_buf_size; + ut_ad(has_index_lock); + has_index_lock = false; + index->lock.x_unlock(); + + log_free_check(); + + if (!row_log_block_allocate(index->online_log->head)) { + error = DB_OUT_OF_MEMORY; + goto func_exit; + } + + byte* buf = index->online_log->head.block; + + if (DB_SUCCESS + != os_file_read(IORequestRead, index->online_log->fd, + buf, ofs, srv_sort_buf_size, nullptr)) { + ib::error() + << "Unable to read temporary file" + " for index " << index->name; + goto corruption; + } + + if (srv_encrypt_log) { + if (!log_tmp_block_decrypt( + buf, srv_sort_buf_size, + index->online_log->crypt_head, ofs)) { + error = DB_DECRYPTION_FAILED; + goto func_exit; + } + + srv_stats.n_rowlog_blocks_decrypted.inc(); + memcpy(buf, index->online_log->crypt_head, srv_sort_buf_size); + } + +#ifdef POSIX_FADV_DONTNEED + /* Each block is read exactly once. Free up the file cache. */ + posix_fadvise(index->online_log->fd, + ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED); +#endif /* POSIX_FADV_DONTNEED */ + + next_mrec = index->online_log->head.block; + next_mrec_end = next_mrec + srv_sort_buf_size; + } + + if (mrec) { + /* A partial record was read from the previous block. + Copy the temporary buffer full, as we do not know the + length of the record. Parse subsequent records from + the bigger buffer index->online_log->head.block + or index->online_log->tail.block. */ + + ut_ad(mrec == index->online_log->head.buf); + ut_ad(mrec_end > mrec); + ut_ad(mrec_end < (&index->online_log->head.buf)[1]); + + memcpy((mrec_t*) mrec_end, next_mrec, + ulint((&index->online_log->head.buf)[1] - mrec_end)); + mrec = row_log_apply_op( + index, dup, &error, offsets_heap, heap, + has_index_lock, index->online_log->head.buf, + (&index->online_log->head.buf)[1], offsets); + if (error != DB_SUCCESS) { + goto func_exit; + } else if (UNIV_UNLIKELY(mrec == NULL)) { + /* The record was not reassembled properly. */ + goto corruption; + } + /* The record was previously found out to be + truncated. Now that the parse buffer was extended, + it should proceed beyond the old end of the buffer. */ + ut_a(mrec > mrec_end); + + index->online_log->head.bytes = ulint(mrec - mrec_end); + next_mrec += index->online_log->head.bytes; + } + + ut_ad(next_mrec <= next_mrec_end); + /* The following loop must not be parsing the temporary + buffer, but head.block or tail.block. */ + + /* mrec!=NULL means that the next record starts from the + middle of the block */ + ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0)); + +#ifdef UNIV_DEBUG + if (next_mrec_end == index->online_log->head.block + + srv_sort_buf_size) { + /* If tail.bytes == 0, next_mrec_end can also be at + the end of tail.block. 
*/ + if (index->online_log->tail.bytes == 0) { + ut_ad(next_mrec == next_mrec_end); + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->head.bytes == 0); + } else { + ut_ad(next_mrec == index->online_log->head.block + + index->online_log->head.bytes); + ut_ad(index->online_log->tail.blocks + > index->online_log->head.blocks); + } + } else if (next_mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes) { + ut_ad(next_mrec == index->online_log->tail.block + + index->online_log->head.bytes); + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->head.bytes + <= index->online_log->tail.bytes); + } else { + ut_error; + } +#endif /* UNIV_DEBUG */ + + mrec_end = next_mrec_end; + + while (!trx_is_interrupted(trx)) { + mrec = next_mrec; + ut_ad(mrec < mrec_end); + + if (!has_index_lock) { + /* We are applying operations from a different + block than the one that is being written to. + We do not hold index->lock in order to + allow other threads to concurrently buffer + modifications. */ + ut_ad(mrec >= index->online_log->head.block); + ut_ad(mrec_end == index->online_log->head.block + + srv_sort_buf_size); + ut_ad(index->online_log->head.bytes + < srv_sort_buf_size); + + /* Take the opportunity to do a redo log + checkpoint if needed. */ + log_free_check(); + } else { + /* We are applying operations from the last block. + Do not allow other threads to buffer anything, + so that we can finally catch up and synchronize. */ + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes); + ut_ad(mrec >= index->online_log->tail.block); + } + + next_mrec = row_log_apply_op( + index, dup, &error, offsets_heap, heap, + has_index_lock, mrec, mrec_end, offsets); + + if (error != DB_SUCCESS) { + goto func_exit; + } else if (next_mrec == next_mrec_end) { + /* The record happened to end on a block boundary. + Do we have more blocks left? */ + if (has_index_lock) { + /* The index will be locked while + applying the last block. */ + goto all_done; + } + + mrec = NULL; +process_next_block: + index->lock.x_lock(SRW_LOCK_CALL); + has_index_lock = true; + + index->online_log->head.bytes = 0; + index->online_log->head.blocks++; + goto next_block; + } else if (next_mrec != NULL) { + ut_ad(next_mrec < next_mrec_end); + index->online_log->head.bytes + += ulint(next_mrec - mrec); + } else if (has_index_lock) { + /* When mrec is within tail.block, it should + be a complete record, because we are holding + index->lock and thus excluding the writer. */ + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes); + ut_ad(0); + goto unexpected_eof; + } else { + memcpy(index->online_log->head.buf, mrec, + ulint(mrec_end - mrec)); + mrec_end += ulint(index->online_log->head.buf - mrec); + mrec = index->online_log->head.buf; + goto process_next_block; + } + } + +interrupted: + error = DB_INTERRUPTED; +func_exit: + if (!has_index_lock) { + index->lock.x_lock(SRW_LOCK_CALL); + } + + switch (error) { + case DB_SUCCESS: + break; + case DB_INDEX_CORRUPT: + if (((os_offset_t) index->online_log->tail.blocks + 1) + * srv_sort_buf_size >= srv_online_max_size) { + /* The log file grew too big. 
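+			The temporary log exceeded the limit imposed
+			by innodb_online_alter_log_max_size
+			(srv_online_max_size); report
+			DB_ONLINE_LOG_TOO_BIG rather than
+			DB_INDEX_CORRUPT.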
 */
+			error = DB_ONLINE_LOG_TOO_BIG;
+		}
+		/* fall through */
+	default:
+		index->type |= DICT_CORRUPT;
+	}
+
+	mem_heap_free(heap);
+	mem_heap_free(offsets_heap);
+	row_log_block_free(index->online_log->head);
+	ut_free(offsets);
+	return(error);
+}
+
+/** Apply the row log to the index upon completing index creation.
+@param[in]	trx	transaction (for checking if the operation was
+interrupted)
+@param[in,out]	index	secondary index
+@param[in,out]	table	MySQL table (for reporting duplicates)
+@param[in,out]	stage	performance schema accounting object, used by
+ALTER TABLE. stage->begin_phase_log_index() will be called initially and
+then stage->inc() will be called for each block of log that is applied;
+stage is nullptr when the row log has been applied by a DML thread.
+@return DB_SUCCESS, or error code on failure */
+dberr_t
+row_log_apply(
+	const trx_t*		trx,
+	dict_index_t*		index,
+	struct TABLE*		table,
+	ut_stage_alter_t*	stage)
+{
+	dberr_t		error;
+	row_merge_dup_t	dup = { index, table, NULL, 0 };
+	DBUG_ENTER("row_log_apply");
+
+	ut_ad(dict_index_is_online_ddl(index)
+	      || (index->online_log
+		  && index->online_status == ONLINE_INDEX_COMPLETE));
+	ut_ad(!dict_index_is_clust(index));
+
+	if (stage) {
+		stage->begin_phase_log_index();
+	}
+
+	log_free_check();
+
+	index->lock.x_lock(SRW_LOCK_CALL);
+
+	if (index->online_log && !index->table->corrupted) {
+		error = row_log_apply_ops(trx, index, &dup, stage);
+	} else {
+		error = DB_SUCCESS;
+	}
+
+	if (error != DB_SUCCESS) {
+		ut_ad(index->table->space);
+		index->type |= DICT_CORRUPT;
+		index->table->drop_aborted = TRUE;
+
+		dict_index_set_online_status(index, ONLINE_INDEX_ABORTED);
+	} else if (stage) {
+		/* Mark the index as completed only when this is
+		called by the DDL thread (stage != NULL). */
+		ut_ad(dup.n_dup == 0);
+		dict_index_set_online_status(index, ONLINE_INDEX_COMPLETE);
+	}
+
+	index->lock.x_unlock();
+
+	DBUG_RETURN(error);
+}
+
+unsigned row_log_get_n_core_fields(const dict_index_t *index)
+{
+  ut_ad(index->online_log);
+  return index->online_log->n_core_fields;
+}
+
+dberr_t row_log_get_error(const dict_index_t *index)
+{
+  ut_ad(index->online_log);
+  return index->online_log->error;
+}
+
+dberr_t dict_table_t::clear(que_thr_t *thr)
+{
+  dberr_t err= DB_SUCCESS;
+  for (dict_index_t *index= UT_LIST_GET_FIRST(indexes); index;
+       index= UT_LIST_GET_NEXT(indexes, index))
+  {
+    if (index->type & DICT_FTS)
+      continue;
+
+    switch (dict_index_get_online_status(index)) {
+    case ONLINE_INDEX_ABORTED:
+    case ONLINE_INDEX_ABORTED_DROPPED:
+      continue;
+    case ONLINE_INDEX_COMPLETE:
+      break;
+    case ONLINE_INDEX_CREATION:
+      ut_ad("invalid type" == 0);
+      MY_ASSERT_UNREACHABLE();
+      break;
+    }
+    if (dberr_t err_index= index->clear(thr))
+      err= err_index;
+  }
+  return err;
+}
+
+inline bool UndorecApplier::is_same(roll_ptr_t roll_ptr) const
+{
+  return uint16_t(roll_ptr) == offset &&
+    uint32_t(roll_ptr >> 16) == page_id.page_no();
+}
+
+const rec_t *
+UndorecApplier::get_old_rec(const dtuple_t &tuple, dict_index_t *index,
+                            const rec_t **clust_rec, rec_offs **offsets)
+{
+  ut_ad(index->is_primary());
+  btr_pcur_t pcur;
+
+  bool found= row_search_on_row_ref(&pcur, BTR_MODIFY_LEAF,
+                                    index->table, &tuple, &mtr);
+  ut_a(found);
+  *clust_rec= btr_pcur_get_rec(&pcur);
+
+  ulint len= 0;
+  rec_t *prev_version;
+  const rec_t *version= *clust_rec;
+  do
+  {
+    *offsets= rec_get_offsets(version, index, *offsets,
+                              index->n_core_fields, ULINT_UNDEFINED,
+                              &heap);
+    roll_ptr_t roll_ptr= trx_read_trx_id(
+      rec_get_nth_field(version, *offsets, index->db_roll_ptr(), &len));
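+    /* Walk the version chain backwards until the version whose
+    DB_ROLL_PTR points at the undo record being applied; is_same()
+    compares the byte offset and the undo page number encoded in
+    roll_ptr with those of this undo record. */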
ut_ad(len == DATA_ROLL_PTR_LEN); + if (is_same(roll_ptr)) + return version; + trx_undo_prev_version_build(version, index, *offsets, heap, &prev_version, + nullptr, nullptr, 0); + version= prev_version; + } + while (version); + + return nullptr; +} + +/** Clear out all online log of other online indexes after +encountering the error during row_log_apply() in DML thread +@param table table which does online DDL */ +static void row_log_mark_other_online_index_abort(dict_table_t *table) +{ + dict_index_t *clust_index= dict_table_get_first_index(table); + for (dict_index_t *index= dict_table_get_next_index(clust_index); + index; index= dict_table_get_next_index(index)) + { + if (index->online_log && + index->online_status <= ONLINE_INDEX_CREATION && + !index->is_corrupted()) + { + index->lock.x_lock(SRW_LOCK_CALL); + row_log_abort_sec(index); + index->type|= DICT_CORRUPT; + index->lock.x_unlock(); + MONITOR_ATOMIC_INC(MONITOR_BACKGROUND_DROP_INDEX); + } + } + + clust_index->lock.x_lock(SRW_LOCK_CALL); + clust_index->online_log= nullptr; + clust_index->lock.x_unlock(); + table->drop_aborted= TRUE; +} + +void dtype_t::assign(const dict_col_t &col) +{ + prtype= col.prtype; + mtype= col.mtype; + len= col.len; + mbminlen= col.mbminlen; + mbmaxlen= col.mbmaxlen; +} + +inline void dtuple_t::copy_field_types(const dict_index_t &index) +{ + ut_ad(index.n_fields == n_fields); + if (UNIV_LIKELY_NULL(index.change_col_info)) + for (ulint i= 0; i < n_fields; i++) + fields[i].type.assign(*index.fields[i].col); +} + +void UndorecApplier::log_insert(const dtuple_t &tuple, + dict_index_t *clust_index) +{ + DEBUG_SYNC_C("row_log_insert_handle"); + ut_ad(clust_index->is_primary()); + rec_offs offsets_[REC_OFFS_NORMAL_SIZE]; + rec_offs *offsets= offsets_; + + rec_offs_init(offsets_); + mtr.start(); + const rec_t *rec; + const rec_t *match_rec= get_old_rec(tuple, clust_index, &rec, &offsets); + if (!match_rec) + { + mtr.commit(); + return; + } + const rec_t *copy_rec= match_rec; + if (match_rec == rec) + { + copy_rec= rec_copy(mem_heap_alloc( + heap, rec_offs_size(offsets)), match_rec, offsets); + rec_offs_make_valid(copy_rec, clust_index, true, offsets); + } + mtr.commit(); + + dict_table_t *table= clust_index->table; + clust_index->lock.s_lock(SRW_LOCK_CALL); + if (clust_index->online_log && + !clust_index->online_log_is_dummy() && + clust_index->online_status <= ONLINE_INDEX_CREATION) + { + row_log_table_insert(copy_rec, clust_index, offsets); + clust_index->lock.s_unlock(); + } + else + { + clust_index->lock.s_unlock(); + row_ext_t *ext; + dtuple_t *row= row_build(ROW_COPY_POINTERS, clust_index, + copy_rec, offsets, table, nullptr, nullptr, &ext, heap); + + if (table->n_v_cols) + { + /* Update the row with virtual column values present + in the undo log or update vector */ + if (type == TRX_UNDO_UPD_DEL_REC) + row_upd_replace_vcol(row, table, update, false, nullptr, + (cmpl_info & UPD_NODE_NO_ORD_CHANGE) + ? 
nullptr : undo_rec); + else + trx_undo_read_v_cols(table, undo_rec, row, false); + } + + bool success= true; + for (dict_index_t *index= clust_index; + (index= dict_table_get_next_index(index)) != nullptr; ) + { + index->lock.s_lock(SRW_LOCK_CALL); + if (index->online_log && + index->online_status <= ONLINE_INDEX_CREATION && + !index->is_corrupted()) + { + dtuple_t *entry= row_build_index_entry_low(row, ext, index, + heap, ROW_BUILD_NORMAL); + entry->copy_field_types(*index); + success= row_log_online_op(index, entry, trx_id); + } + + index->lock.s_unlock(); + if (!success) + { + row_log_mark_other_online_index_abort(index->table); + return; + } + } + } +} + +void UndorecApplier::log_update(const dtuple_t &tuple, + dict_index_t *clust_index) +{ + rec_offs offsets_[REC_OFFS_NORMAL_SIZE]; + rec_offs offsets2_[REC_OFFS_NORMAL_SIZE]; + rec_offs *offsets= offsets_; + rec_offs *prev_offsets= offsets2_; + + rec_offs_init(offsets_); + rec_offs_init(offsets2_); + + dict_table_t *table= clust_index->table; + + clust_index->lock.s_lock(SRW_LOCK_CALL); + bool table_rebuild= + (clust_index->online_log + && !clust_index->online_log_is_dummy() + && clust_index->online_status <= ONLINE_INDEX_CREATION); + clust_index->lock.s_unlock(); + + mtr.start(); + const rec_t *rec; + rec_t *prev_version; + bool is_update= (type == TRX_UNDO_UPD_EXIST_REC); + const rec_t *match_rec= get_old_rec(tuple, clust_index, &rec, &offsets); + if (!match_rec) + { + mtr.commit(); + return; + } + + if (table_rebuild) + { + const rec_t *copy_rec= match_rec; + if (match_rec == rec) + copy_rec= rec_copy(mem_heap_alloc( + heap, rec_offs_size(offsets)), match_rec, offsets); + trx_undo_prev_version_build(match_rec, clust_index, offsets, heap, + &prev_version, nullptr, nullptr, 0); + + prev_offsets= rec_get_offsets(prev_version, clust_index, prev_offsets, + clust_index->n_core_fields, + ULINT_UNDEFINED, &heap); + rec_offs_make_valid(copy_rec, clust_index, true, offsets); + mtr.commit(); + + clust_index->lock.s_lock(SRW_LOCK_CALL); + /* Recheck whether clustered index online log has been cleared */ + if (clust_index->online_log) + { + if (is_update) + { + const dtuple_t *rebuilt_old_pk= row_log_table_get_pk( + prev_version, clust_index, prev_offsets, nullptr, &heap); + row_log_table_update(copy_rec, clust_index, offsets, rebuilt_old_pk); + } + else + row_log_table_delete(prev_version, clust_index, prev_offsets, nullptr); + } + clust_index->lock.s_unlock(); + return; + } + + dtuple_t *row= nullptr; + row_ext_t *new_ext; + if (match_rec != rec) + row= row_build(ROW_COPY_POINTERS, clust_index, match_rec, offsets, + clust_index->table, NULL, NULL, &new_ext, heap); + else + row= row_build(ROW_COPY_DATA, clust_index, rec, offsets, + clust_index->table, NULL, NULL, &new_ext, heap); + mtr.commit(); + row_ext_t *old_ext; + dtuple_t *old_row= nullptr; + if (!(this->cmpl_info & UPD_NODE_NO_ORD_CHANGE)) + { + for (ulint i = 0; i < dict_table_get_n_v_cols(table); i++) + dfield_get_type( + dtuple_get_nth_v_field(row, i))->mtype = DATA_MISSING; + } + + if (is_update) + { + old_row= dtuple_copy(row, heap); + row_upd_replace(old_row, &old_ext, clust_index, update, heap); + } + + if (table->n_v_cols) + row_upd_replace_vcol(row, table, update, false, nullptr, + (cmpl_info & UPD_NODE_NO_ORD_CHANGE) + ? 
nullptr : undo_rec);
+
+  bool success= true;
+  dict_index_t *index= dict_table_get_next_index(clust_index);
+  while (index)
+  {
+    index->lock.s_lock(SRW_LOCK_CALL);
+    if (index->online_log &&
+        index->online_status <= ONLINE_INDEX_CREATION &&
+        !index->is_corrupted())
+    {
+      if (is_update)
+      {
+        /* Ignore the index if the update doesn't affect the index */
+        if (!row_upd_changes_ord_field_binary(index, update,
+                                              nullptr,
+                                              row, new_ext))
+          goto next_index;
+        dtuple_t *old_entry= row_build_index_entry_low(
+          old_row, old_ext, index, heap, ROW_BUILD_NORMAL);
+
+        old_entry->copy_field_types(*index);
+
+        success= row_log_online_op(index, old_entry, 0);
+
+        dtuple_t *new_entry= row_build_index_entry_low(
+          row, new_ext, index, heap, ROW_BUILD_NORMAL);
+
+        new_entry->copy_field_types(*index);
+
+        if (success)
+          success= row_log_online_op(index, new_entry, trx_id);
+      }
+      else
+      {
+        dtuple_t *old_entry= row_build_index_entry_low(
+          row, new_ext, index, heap, ROW_BUILD_NORMAL);
+
+        old_entry->copy_field_types(*index);
+
+        success= row_log_online_op(index, old_entry, 0);
+      }
+    }
+next_index:
+    index->lock.s_unlock();
+    if (!success)
+    {
+      row_log_mark_other_online_index_abort(index->table);
+      return;
+    }
+    index= dict_table_get_next_index(index);
+  }
+}