Diffstat (limited to 'storage/innobase/row/row0log.cc')
-rw-r--r-- | storage/innobase/row/row0log.cc | 4053 |
1 file changed, 4053 insertions, 0 deletions
diff --git a/storage/innobase/row/row0log.cc b/storage/innobase/row/row0log.cc
new file mode 100644
index 00000000..336b5a27
--- /dev/null
+++ b/storage/innobase/row/row0log.cc
@@ -0,0 +1,4053 @@
+/*****************************************************************************
+
+Copyright (c) 2011, 2018, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2017, 2021, MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+/**************************************************//**
+@file row/row0log.cc
+Modification log for online index creation and online table rebuild
+
+Created 2011-05-26 Marko Makela
+*******************************************************/
+
+#include "row0log.h"
+#include "row0row.h"
+#include "row0ins.h"
+#include "row0upd.h"
+#include "row0merge.h"
+#include "row0ext.h"
+#include "log0crypt.h"
+#include "data0data.h"
+#include "que0que.h"
+#include "srv0mon.h"
+#include "handler0alter.h"
+#include "ut0stage.h"
+#include "trx0rec.h"
+
+#include <sql_class.h>
+#include <algorithm>
+#include <map>
+
+Atomic_counter<ulint> onlineddl_rowlog_rows;
+ulint onlineddl_rowlog_pct_used;
+ulint onlineddl_pct_progress;
+
+/** Table row modification operations during online table rebuild.
+Delete-marked records are not copied to the rebuilt table. */
+enum row_tab_op {
+	/** Insert a record */
+	ROW_T_INSERT = 0x41,
+	/** Update a record in place */
+	ROW_T_UPDATE,
+	/** Delete (purge) a record */
+	ROW_T_DELETE
+};
+
+/** Index record modification operations during online index creation */
+enum row_op {
+	/** Insert a record */
+	ROW_OP_INSERT = 0x61,
+	/** Delete a record */
+	ROW_OP_DELETE
+};
+
+/** Size of the modification log entry header, in bytes */
+#define ROW_LOG_HEADER_SIZE 2/*op, extra_size*/
+
+/** Log block for modifications during online ALTER TABLE */
+struct row_log_buf_t {
+	byte*		block;	/*!< file block buffer */
+	size_t		size;	/*!< length of block in bytes */
+	ut_new_pfx_t	block_pfx; /*!< opaque descriptor of "block". Set
+				by ut_allocator::allocate_large() and fed to
+				ut_allocator::deallocate_large(). */
+	mrec_buf_t	buf;	/*!< buffer for accessing a record
+				that spans two blocks */
+	ulint		blocks; /*!< current position in blocks */
+	ulint		bytes;	/*!< current position within block */
+	ulonglong	total;	/*!< logical position, in bytes from
+				the start of the row_log_table log;
+				0 for row_log_online_op() and
+				row_log_apply().
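+				(For the row_log_table log, this
+				should equal blocks * srv_sort_buf_size
+				+ bytes while records are being written;
+				see row_log_table_close_func().)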
+				*/
+};
+
+/** Tracks BLOB allocation during online ALTER TABLE */
+class row_log_table_blob_t {
+public:
+	/** Constructor (declaring a BLOB freed)
+	@param offset_arg row_log_t::tail::total */
+#ifdef UNIV_DEBUG
+	row_log_table_blob_t(ulonglong offset_arg) :
+		old_offset (0), free_offset (offset_arg),
+		offset (BLOB_FREED) {}
+#else /* UNIV_DEBUG */
+	row_log_table_blob_t() :
+		offset (BLOB_FREED) {}
+#endif /* UNIV_DEBUG */
+
+	/** Declare a BLOB freed again.
+	@param offset_arg row_log_t::tail::total */
+#ifdef UNIV_DEBUG
+	void blob_free(ulonglong offset_arg)
+#else /* UNIV_DEBUG */
+	void blob_free()
+#endif /* UNIV_DEBUG */
+	{
+		ut_ad(offset < offset_arg);
+		ut_ad(offset != BLOB_FREED);
+		ut_d(old_offset = offset);
+		ut_d(free_offset = offset_arg);
+		offset = BLOB_FREED;
+	}
+	/** Declare a freed BLOB reused.
+	@param offset_arg row_log_t::tail::total */
+	void blob_alloc(ulonglong offset_arg) {
+		ut_ad(free_offset <= offset_arg);
+		ut_d(old_offset = offset);
+		offset = offset_arg;
+	}
+	/** Determine if a BLOB was freed at a given log position
+	@param offset_arg row_log_t::head::total after the log record
+	@return true if freed */
+	bool is_freed(ulonglong offset_arg) const {
+		/* This is supposed to be the offset at the end of the
+		current log record. */
+		ut_ad(offset_arg > 0);
+		/* We should never get anywhere close to the magic value. */
+		ut_ad(offset_arg < BLOB_FREED);
+		return(offset_arg < offset);
+	}
+private:
+	/** Magic value for a freed BLOB */
+	static const ulonglong BLOB_FREED = ~0ULL;
+#ifdef UNIV_DEBUG
+	/** Old offset, in case a page was freed, reused, freed, ... */
+	ulonglong	old_offset;
+	/** Offset of last blob_free() */
+	ulonglong	free_offset;
+#endif /* UNIV_DEBUG */
+	/** Byte offset to the log file */
+	ulonglong	offset;
+};
+
+/** @brief Map of off-page column page numbers to freed/reused state.
+
+If there is no mapping for a page number, the page is safe to access.
+If a page number maps to a row_log_table_blob_t that is marked freed
+(BLOB_FREED), it is an off-page column that has been freed.
+Otherwise, the mapped value holds a byte offset into index->online_log,
+indicating that the page is safe to access when applying log records
+starting from that offset. */
+typedef std::map<
+	ulint,
+	row_log_table_blob_t,
+	std::less<ulint>,
+	ut_allocator<std::pair<const ulint, row_log_table_blob_t> > >
+	page_no_map;
+
+/** @brief Buffer for logging modifications during online index creation
+
+All modifications to an index that is being created will be logged by
+row_log_online_op() to this buffer.
+
+All modifications to a table that is being rebuilt will be logged by
+row_log_table_delete(), row_log_table_update(), row_log_table_insert()
+to this buffer.
+
+When head.blocks == tail.blocks, the reader will access tail.block
+directly. When also head.bytes == tail.bytes, both counts will be
+reset to 0 and the file will be truncated.
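+
+Records may span two adjacent blocks; the tail is flushed to the
+temporary file one srv_sort_buf_size block at a time.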
+*/
+struct row_log_t {
+	pfs_os_file_t	fd;	/*!< file descriptor */
+	ib_mutex_t	mutex;	/*!< mutex protecting error,
+				max_trx and tail */
+	page_no_map*	blobs;	/*!< map of page numbers of off-page columns
+				that have been freed during table-rebuilding
+				ALTER TABLE (row_log_table_*); protected by
+				index->lock X-latch only */
+	dict_table_t*	table;	/*!< table that is being rebuilt,
+				or NULL when this is a secondary
+				index that is being created online */
+	bool		same_pk;/*!< whether the definition of the PRIMARY KEY
+				has remained the same */
+	const dtuple_t*	defaults;
+				/*!< default values of added, changed columns,
+				or NULL */
+	const ulint*	col_map;/*!< mapping of old column numbers to
+				new ones, or NULL if !table */
+	dberr_t		error;	/*!< error that occurred during online
+				table rebuild */
+	/** The transaction ID of the ALTER TABLE transaction. Any
+	concurrent DML would necessarily be logged with a larger
+	transaction ID, because ha_innobase::prepare_inplace_alter_table()
+	acts as a barrier that ensures that any concurrent transaction
+	that operates on the table would have been started after
+	ha_innobase::prepare_inplace_alter_table() returns and before
+	ha_innobase::commit_inplace_alter_table(commit=true) is invoked.
+
+	Due to the nondeterministic nature of purge and due to the
+	possibility of upgrading from an earlier version of MariaDB
+	or MySQL, it is possible that row_log_table_low() would be
+	fed a DB_TRX_ID that precedes min_trx. We must normalize
+	such references to reset_trx_id[]. */
+	trx_id_t	min_trx;
+	trx_id_t	max_trx;/*!< biggest observed trx_id in
+				row_log_online_op();
+				protected by mutex and index->lock S-latch,
+				or by index->lock X-latch only */
+	row_log_buf_t	tail;	/*!< writer context;
+				protected by mutex and index->lock S-latch,
+				or by index->lock X-latch only */
+	size_t		crypt_tail_size; /*!< size of crypt_tail */
+	byte*		crypt_tail; /*!< writer context;
+				temporary buffer used in encryption,
+				decryption or NULL */
+	row_log_buf_t	head;	/*!< reader context; protected by MDL only;
+				modifiable by row_log_apply_ops() */
+	size_t		crypt_head_size; /*!< size of crypt_head */
+	byte*		crypt_head; /*!< reader context;
+				temporary buffer used in encryption,
+				decryption or NULL */
+	const char*	path;	/*!< where to create temporary file during
+				log operation */
+	/** the number of core fields in the clustered index of the
+	source table; before row_log_table_apply() completes, the
+	table could be emptied, so that table->is_instant() no longer holds,
+	but all log records must be in the "instant" format. */
+	unsigned	n_core_fields;
+	/** the default values of non-core fields when the operation started */
+	dict_col_t::def_t*	non_core_fields;
+	bool		allow_not_null; /*!< Whether the alter ignore is being
+				used or if the sql mode is non-strict mode;
+				if not, NULL values will not be converted to
+				defaults */
+	const TABLE*	old_table; /*!< Use old table in case of error. */
+
+	uint64_t	n_rows; /*!< Number of rows read from the table */
+	/** Determine whether the log should be in the 'instant ADD' format
+	@param[in]	index	the clustered index of the source table
+	@return whether to use the 'instant ADD COLUMN' format */
+	bool is_instant(const dict_index_t* index) const
+	{
+		ut_ad(table);
+		ut_ad(n_core_fields <= index->n_fields);
+		return n_core_fields != index->n_fields;
+	}
+
+	const byte* instant_field_value(ulint n, ulint* len) const
+	{
+		ut_ad(n >= n_core_fields);
+		const dict_col_t::def_t& d = non_core_fields[n - n_core_fields];
+		*len = d.len;
+		return static_cast<const byte*>(d.data);
+	}
+};
+
+/** Create the file or online log if it does not exist.
+@param[in,out]	log	online rebuild log
+@return the log file descriptor, or OS_FILE_CLOSED on failure */
+static MY_ATTRIBUTE((warn_unused_result))
+pfs_os_file_t
+row_log_tmpfile(
+	row_log_t*	log)
+{
+	DBUG_ENTER("row_log_tmpfile");
+	if (log->fd == OS_FILE_CLOSED) {
+		log->fd = row_merge_file_create_low(log->path);
+		DBUG_EXECUTE_IF("row_log_tmpfile_fail",
+				if (log->fd != OS_FILE_CLOSED)
+					row_merge_file_destroy_low(log->fd);
+				log->fd = OS_FILE_CLOSED;);
+		if (log->fd != OS_FILE_CLOSED) {
+			MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_LOG_FILES);
+		}
+	}
+
+	DBUG_RETURN(log->fd);
+}
+
+/** Allocate the memory for the log buffer.
+@param[in,out]	log_buf	Buffer used for log operation
+@return true if success, false if not */
+static MY_ATTRIBUTE((warn_unused_result))
+bool
+row_log_block_allocate(
+	row_log_buf_t&	log_buf)
+{
+	DBUG_ENTER("row_log_block_allocate");
+	if (log_buf.block == NULL) {
+		DBUG_EXECUTE_IF(
+			"simulate_row_log_allocation_failure",
+			DBUG_RETURN(false);
+		);
+
+		log_buf.block = ut_allocator<byte>(mem_key_row_log_buf)
+			.allocate_large(srv_sort_buf_size,
+					&log_buf.block_pfx);
+
+		if (log_buf.block == NULL) {
+			DBUG_RETURN(false);
+		}
+		log_buf.size = srv_sort_buf_size;
+	}
+	DBUG_RETURN(true);
+}
+
+/** Free the log buffer.
+@param[in,out]	log_buf	Buffer used for log operation */
+static
+void
+row_log_block_free(
+	row_log_buf_t&	log_buf)
+{
+	DBUG_ENTER("row_log_block_free");
+	if (log_buf.block != NULL) {
+		ut_allocator<byte>(mem_key_row_log_buf).deallocate_large(
+			log_buf.block, &log_buf.block_pfx);
+		log_buf.block = NULL;
+	}
+	DBUG_VOID_RETURN;
+}
+
+/******************************************************//**
+Logs an operation to a secondary index that is (or was) being created. */
+void
+row_log_online_op(
+/*==============*/
+	dict_index_t*	index,	/*!< in/out: index, S or X latched */
+	const dtuple_t*	tuple,	/*!< in: index tuple */
+	trx_id_t	trx_id)	/*!< in: transaction ID for insert,
+				or 0 for delete */
+{
+	byte*		b;
+	ulint		extra_size;
+	ulint		size;
+	ulint		mrec_size;
+	ulint		avail_size;
+	row_log_t*	log;
+
+	ut_ad(dtuple_validate(tuple));
+	ut_ad(dtuple_get_n_fields(tuple) == dict_index_get_n_fields(index));
+	ut_ad(rw_lock_own_flagged(&index->lock,
+				  RW_LOCK_FLAG_X | RW_LOCK_FLAG_S));
+
+	if (index->is_corrupted()) {
+		return;
+	}
+
+	ut_ad(dict_index_is_online_ddl(index));
+
+	/* Compute the size of the record. This differs from
+	row_merge_buf_encode(), because here we do not encode
+	extra_size+1 (and reserve 0 as the end-of-chunk marker). */
+
+	size = rec_get_converted_size_temp<false>(
+		index, tuple->fields, tuple->n_fields, &extra_size);
+	ut_ad(size >= extra_size);
+	ut_ad(size <= sizeof log->tail.buf);
+
+	mrec_size = ROW_LOG_HEADER_SIZE
+		+ (extra_size >= 0x80) + size
+		+ (trx_id ? DATA_TRX_ID_LEN : 0);
+
+	log = index->online_log;
+	mutex_enter(&log->mutex);
+
+	if (trx_id > log->max_trx) {
+		log->max_trx = trx_id;
+	}
+
+	if (!row_log_block_allocate(log->tail)) {
+		log->error = DB_OUT_OF_MEMORY;
+		goto err_exit;
+	}
+
+	MEM_UNDEFINED(log->tail.buf, sizeof log->tail.buf);
+
+	ut_ad(log->tail.bytes < srv_sort_buf_size);
+	avail_size = srv_sort_buf_size - log->tail.bytes;
+
+	if (mrec_size > avail_size) {
+		b = log->tail.buf;
+	} else {
+		b = log->tail.block + log->tail.bytes;
+	}
+
+	if (trx_id != 0) {
+		*b++ = ROW_OP_INSERT;
+		trx_write_trx_id(b, trx_id);
+		b += DATA_TRX_ID_LEN;
+	} else {
+		*b++ = ROW_OP_DELETE;
+	}
+
+	if (extra_size < 0x80) {
+		*b++ = (byte) extra_size;
+	} else {
+		ut_ad(extra_size < 0x8000);
+		*b++ = (byte) (0x80 | (extra_size >> 8));
+		*b++ = (byte) extra_size;
+	}
+
+	rec_convert_dtuple_to_temp<false>(
+		b + extra_size, index, tuple->fields, tuple->n_fields);
+	b += size;
+
+	if (mrec_size >= avail_size) {
+		const os_offset_t	byte_offset
+			= (os_offset_t) log->tail.blocks
+			* srv_sort_buf_size;
+		byte*			buf = log->tail.block;
+
+		if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
+			goto write_failed;
+		}
+
+		if (mrec_size == avail_size) {
+			ut_ad(b == &buf[srv_sort_buf_size]);
+		} else {
+			ut_ad(b == log->tail.buf + mrec_size);
+			memcpy(buf + log->tail.bytes,
+			       log->tail.buf, avail_size);
+		}
+
+		MEM_CHECK_DEFINED(buf, srv_sort_buf_size);
+
+		if (row_log_tmpfile(log) == OS_FILE_CLOSED) {
+			log->error = DB_OUT_OF_MEMORY;
+			goto err_exit;
+		}
+
+		/* If encryption is enabled, encrypt the buffer before
+		writing it to the file system. */
+		if (log_tmp_is_encrypted()) {
+			if (!log_tmp_block_encrypt(
+				    buf, srv_sort_buf_size,
+				    log->crypt_tail, byte_offset)) {
+				log->error = DB_DECRYPTION_FAILED;
+				goto write_failed;
+			}
+
+			srv_stats.n_rowlog_blocks_encrypted.inc();
+			buf = log->crypt_tail;
+		}
+
+		log->tail.blocks++;
+		if (os_file_write(
+			    IORequestWrite,
+			    "(modification log)",
+			    log->fd,
+			    buf, byte_offset, srv_sort_buf_size)
+		    != DB_SUCCESS) {
+write_failed:
+			/* We set the flag directly instead of invoking
+			dict_set_corrupted_index_cache_only(index) here,
+			because the index is not "public" yet. */
+			index->type |= DICT_CORRUPT;
+		}
+
+		MEM_UNDEFINED(log->tail.block, srv_sort_buf_size);
+		MEM_UNDEFINED(buf, srv_sort_buf_size);
+
+		memcpy(log->tail.block, log->tail.buf + avail_size,
+		       mrec_size - avail_size);
+		log->tail.bytes = mrec_size - avail_size;
+	} else {
+		log->tail.bytes += mrec_size;
+		ut_ad(b == log->tail.block + log->tail.bytes);
+	}
+
+	MEM_UNDEFINED(log->tail.buf, sizeof log->tail.buf);
+err_exit:
+	mutex_exit(&log->mutex);
+}
+
+/******************************************************//**
+Gets the error status of the online index rebuild log.
+@return DB_SUCCESS or error code */
+dberr_t
+row_log_table_get_error(
+/*====================*/
+	const dict_index_t*	index)	/*!< in: clustered index of a table
+					that is being rebuilt online */
+{
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(dict_index_is_online_ddl(index));
+	return(index->online_log->error);
+}
+
+/******************************************************//**
+Starts logging an operation to a table that is being rebuilt.
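+On success, log->mutex is left held; the caller releases it by calling
+row_log_table_close().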
+@return pointer to log, or NULL if no logging is necessary */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+byte*
+row_log_table_open(
+/*===============*/
+	row_log_t*	log,	/*!< in/out: online rebuild log */
+	ulint		size,	/*!< in: size of log record */
+	ulint*		avail)	/*!< out: available size for log record */
+{
+	mutex_enter(&log->mutex);
+
+	MEM_UNDEFINED(log->tail.buf, sizeof log->tail.buf);
+
+	if (log->error != DB_SUCCESS) {
+err_exit:
+		mutex_exit(&log->mutex);
+		return(NULL);
+	}
+
+	if (!row_log_block_allocate(log->tail)) {
+		log->error = DB_OUT_OF_MEMORY;
+		goto err_exit;
+	}
+
+	ut_ad(log->tail.bytes < srv_sort_buf_size);
+	*avail = srv_sort_buf_size - log->tail.bytes;
+
+	if (size > *avail) {
+		/* Make sure log->tail.buf is large enough */
+		ut_ad(size <= sizeof log->tail.buf);
+		return(log->tail.buf);
+	} else {
+		return(log->tail.block + log->tail.bytes);
+	}
+}
+
+/******************************************************//**
+Stops logging an operation to a table that is being rebuilt. */
+static MY_ATTRIBUTE((nonnull))
+void
+row_log_table_close_func(
+/*=====================*/
+	dict_index_t*	index,	/*!< in/out: online rebuilt index */
+#ifdef UNIV_DEBUG
+	const byte*	b,	/*!< in: end of log record */
+#endif /* UNIV_DEBUG */
+	ulint		size,	/*!< in: size of log record */
+	ulint		avail)	/*!< in: available size for log record */
+{
+	row_log_t*	log = index->online_log;
+
+	ut_ad(mutex_own(&log->mutex));
+
+	if (size >= avail) {
+		const os_offset_t	byte_offset
+			= (os_offset_t) log->tail.blocks
+			* srv_sort_buf_size;
+		byte*			buf = log->tail.block;
+
+		if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
+			goto write_failed;
+		}
+
+		if (size == avail) {
+			ut_ad(b == &buf[srv_sort_buf_size]);
+		} else {
+			ut_ad(b == log->tail.buf + size);
+			memcpy(buf + log->tail.bytes, log->tail.buf, avail);
+		}
+
+		MEM_CHECK_DEFINED(buf, srv_sort_buf_size);
+
+		if (row_log_tmpfile(log) == OS_FILE_CLOSED) {
+			log->error = DB_OUT_OF_MEMORY;
+			goto err_exit;
+		}
+
+		/* If encryption is enabled, encrypt the buffer before
+		writing it to the file system.
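+		The encrypted copy is produced in log->crypt_tail;
+		the in-memory block log->tail.block itself remains
+		plaintext.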
+		*/
+		if (log_tmp_is_encrypted()) {
+			if (!log_tmp_block_encrypt(
+				    log->tail.block, srv_sort_buf_size,
+				    log->crypt_tail, byte_offset,
+				    index->table->space_id)) {
+				log->error = DB_DECRYPTION_FAILED;
+				goto err_exit;
+			}
+
+			srv_stats.n_rowlog_blocks_encrypted.inc();
+			buf = log->crypt_tail;
+		}
+
+		log->tail.blocks++;
+		if (os_file_write(
+			    IORequestWrite,
+			    "(modification log)",
+			    log->fd,
+			    buf, byte_offset, srv_sort_buf_size)
+		    != DB_SUCCESS) {
+write_failed:
+			log->error = DB_ONLINE_LOG_TOO_BIG;
+		}
+
+		MEM_UNDEFINED(log->tail.block, srv_sort_buf_size);
+		MEM_UNDEFINED(buf, srv_sort_buf_size);
+		memcpy(log->tail.block, log->tail.buf + avail, size - avail);
+		log->tail.bytes = size - avail;
+	} else {
+		log->tail.bytes += size;
+		ut_ad(b == log->tail.block + log->tail.bytes);
+	}
+
+	log->tail.total += size;
+	MEM_UNDEFINED(log->tail.buf, sizeof log->tail.buf);
+err_exit:
+	mutex_exit(&log->mutex);
+
+	onlineddl_rowlog_rows++;
+	/* 10000 means 100.00%, 4525 means 45.25% */
+	onlineddl_rowlog_pct_used = static_cast<ulint>(
+		(log->tail.total * 10000) / srv_online_max_size);
+}
+
+#ifdef UNIV_DEBUG
+# define row_log_table_close(index, b, size, avail)	\
+	row_log_table_close_func(index, b, size, avail)
+#else /* UNIV_DEBUG */
+# define row_log_table_close(index, b, size, avail)	\
+	row_log_table_close_func(index, size, avail)
+#endif /* UNIV_DEBUG */
+
+/** Check whether a virtual column is indexed in the new table being
+created during alter table
+@param[in]	index	cluster index
+@param[in]	v_no	virtual column number
+@return true if it is indexed, else false */
+bool
+row_log_col_is_indexed(
+	const dict_index_t*	index,
+	ulint			v_no)
+{
+	return(dict_table_get_nth_v_col(
+		       index->online_log->table, v_no)->m_col.ord_part);
+}
+
+/******************************************************//**
+Logs a delete operation to a table that is being rebuilt.
+This will be merged in row_log_table_apply_delete(). */
+void
+row_log_table_delete(
+/*=================*/
+	const rec_t*	rec,	/*!< in: clustered index leaf page record,
+				page X-latched */
+	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
+				or X-latched */
+	const rec_offs*	offsets,/*!< in: rec_get_offsets(rec,index) */
+	const byte*	sys)	/*!< in: DB_TRX_ID,DB_ROLL_PTR that should
+				be logged, or NULL to use those in rec */
+{
+	ulint		old_pk_extra_size;
+	ulint		old_pk_size;
+	ulint		mrec_size;
+	ulint		avail_size;
+	mem_heap_t*	heap		= NULL;
+	const dtuple_t*	old_pk;
+
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(rec_offs_validate(rec, index, offsets));
+	ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
+	ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
+	ut_ad(rw_lock_own_flagged(
+			&index->lock,
+			RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
+
+	if (index->online_status != ONLINE_INDEX_CREATION
+	    || (index->type & DICT_CORRUPT) || index->table->corrupted
+	    || index->online_log->error != DB_SUCCESS) {
+		return;
+	}
+
+	dict_table_t* new_table = index->online_log->table;
+	dict_index_t* new_index = dict_table_get_first_index(new_table);
+
+	ut_ad(dict_index_is_clust(new_index));
+	ut_ad(!dict_index_is_online_ddl(new_index));
+	ut_ad(index->online_log->min_trx);
+
+	/* Create the tuple PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in new_table. */
+	if (index->online_log->same_pk) {
+		dtuple_t* tuple;
+		ut_ad(new_index->n_uniq == index->n_uniq);
+
+		/* The PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR are in the first
+		fields of the record. */
+		heap = mem_heap_create(
+			DATA_TRX_ID_LEN
+			+ DTUPLE_EST_ALLOC(new_index->first_user_field()));
+		old_pk = tuple = dtuple_create(heap,
+					       new_index->first_user_field());
+		dict_index_copy_types(tuple, new_index, tuple->n_fields);
+		dtuple_set_n_fields_cmp(tuple, new_index->n_uniq);
+
+		for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
+			ulint		len;
+			const void*	field	= rec_get_nth_field(
+				rec, offsets, i, &len);
+			dfield_t*	dfield	= dtuple_get_nth_field(
+				tuple, i);
+			ut_ad(len != UNIV_SQL_NULL);
+			ut_ad(!rec_offs_nth_extern(offsets, i));
+			dfield_set_data(dfield, field, len);
+		}
+
+		dfield_t* db_trx_id = dtuple_get_nth_field(
+			tuple, new_index->n_uniq);
+
+		const bool replace_sys_fields
+			= sys
+			|| trx_read_trx_id(static_cast<byte*>(db_trx_id->data))
+			< index->online_log->min_trx;
+
+		if (replace_sys_fields) {
+			if (!sys || trx_read_trx_id(sys)
+			    < index->online_log->min_trx) {
+				sys = reset_trx_id;
+			}
+
+			dfield_set_data(db_trx_id, sys, DATA_TRX_ID_LEN);
+			dfield_set_data(db_trx_id + 1, sys + DATA_TRX_ID_LEN,
+					DATA_ROLL_PTR_LEN);
+		}
+
+		ut_d(trx_id_check(db_trx_id->data,
+				  index->online_log->min_trx));
+	} else {
+		/* The PRIMARY KEY has changed. Translate the tuple. */
+		old_pk = row_log_table_get_pk(
+			rec, index, offsets, NULL, &heap);
+
+		if (!old_pk) {
+			ut_ad(index->online_log->error != DB_SUCCESS);
+			if (heap) {
+				goto func_exit;
+			}
+			return;
+		}
+	}
+
+	ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
+		      old_pk, old_pk->n_fields - 2)->len);
+	ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
+		      old_pk, old_pk->n_fields - 1)->len);
+	old_pk_size = rec_get_converted_size_temp<false>(
+		new_index, old_pk->fields, old_pk->n_fields,
+		&old_pk_extra_size);
+	ut_ad(old_pk_extra_size < 0x100);
+
+	/* 2 = 1 (extra_size) + at least 1 byte payload */
+	mrec_size = 2 + old_pk_size;
+
+	if (byte* b = row_log_table_open(index->online_log,
+					 mrec_size, &avail_size)) {
+		*b++ = ROW_T_DELETE;
+		*b++ = static_cast<byte>(old_pk_extra_size);
+
+		rec_convert_dtuple_to_temp<false>(
+			b + old_pk_extra_size, new_index,
+			old_pk->fields, old_pk->n_fields);
+
+		b += old_pk_size;
+
+		row_log_table_close(index, b, mrec_size, avail_size);
+	}
+
+func_exit:
+	mem_heap_free(heap);
+}
+
+/******************************************************//**
+Logs an insert or update to a table that is being rebuilt.
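+The ROW_FORMAT=REDUNDANT record is first converted into a data tuple,
+so that it can be re-encoded in the temporary (compact) log format.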
+*/
+static
+void
+row_log_table_low_redundant(
+/*========================*/
+	const rec_t*		rec,	/*!< in: clustered index leaf
+					page record in ROW_FORMAT=REDUNDANT,
+					page X-latched */
+	dict_index_t*		index,	/*!< in/out: clustered index, S-latched
+					or X-latched */
+	bool			insert,	/*!< in: true if insert,
+					false if update */
+	const dtuple_t*		old_pk,	/*!< in: old PRIMARY KEY value
+					(if !insert and a PRIMARY KEY
+					is being created) */
+	const dict_index_t*	new_index)
+					/*!< in: clustered index of the
+					new table, not latched */
+{
+	ulint		old_pk_size;
+	ulint		old_pk_extra_size;
+	ulint		size;
+	ulint		extra_size;
+	ulint		mrec_size;
+	ulint		avail_size;
+	mem_heap_t*	heap		= NULL;
+	dtuple_t*	tuple;
+	const ulint	n_fields = rec_get_n_fields_old(rec);
+
+	ut_ad(!page_is_comp(page_align(rec)));
+	ut_ad(index->n_fields >= n_fields);
+	ut_ad(index->n_fields == n_fields || index->is_instant());
+	ut_ad(dict_tf2_is_valid(index->table->flags, index->table->flags2));
+	ut_ad(!dict_table_is_comp(index->table)); /* redundant row format */
+	ut_ad(dict_index_is_clust(new_index));
+
+	heap = mem_heap_create(DTUPLE_EST_ALLOC(n_fields));
+	tuple = dtuple_create(heap, n_fields);
+	dict_index_copy_types(tuple, index, n_fields);
+
+	dtuple_set_n_fields_cmp(tuple, dict_index_get_n_unique(index));
+
+	if (rec_get_1byte_offs_flag(rec)) {
+		for (ulint i = 0; i < n_fields; i++) {
+			dfield_t*	dfield;
+			ulint		len;
+			const void*	field;
+
+			dfield = dtuple_get_nth_field(tuple, i);
+			field = rec_get_nth_field_old(rec, i, &len);
+
+			dfield_set_data(dfield, field, len);
+		}
+	} else {
+		for (ulint i = 0; i < n_fields; i++) {
+			dfield_t*	dfield;
+			ulint		len;
+			const void*	field;
+
+			dfield = dtuple_get_nth_field(tuple, i);
+			field = rec_get_nth_field_old(rec, i, &len);
+
+			dfield_set_data(dfield, field, len);
+
+			if (rec_2_is_field_extern(rec, i)) {
+				dfield_set_ext(dfield);
+			}
+		}
+	}
+
+	dfield_t* db_trx_id = dtuple_get_nth_field(tuple, index->n_uniq);
+	ut_ad(dfield_get_len(db_trx_id) == DATA_TRX_ID_LEN);
+	ut_ad(dfield_get_len(db_trx_id + 1) == DATA_ROLL_PTR_LEN);
+
+	if (trx_read_trx_id(static_cast<const byte*>
+			    (dfield_get_data(db_trx_id)))
+	    < index->online_log->min_trx) {
+		dfield_set_data(db_trx_id, reset_trx_id, DATA_TRX_ID_LEN);
+		dfield_set_data(db_trx_id + 1, reset_trx_id + DATA_TRX_ID_LEN,
+				DATA_ROLL_PTR_LEN);
+	}
+
+	const bool is_instant = index->online_log->is_instant(index);
+	rec_comp_status_t status = is_instant
+		? REC_STATUS_INSTANT : REC_STATUS_ORDINARY;
+
+	size = rec_get_converted_size_temp<true>(
+		index, tuple->fields, tuple->n_fields, &extra_size, status);
+	if (is_instant) {
+		size++;
+		extra_size++;
+	}
+
+	mrec_size = ROW_LOG_HEADER_SIZE + size + (extra_size >= 0x80);
+
+	if (insert || index->online_log->same_pk) {
+		ut_ad(!old_pk);
+		old_pk_extra_size = old_pk_size = 0;
+	} else {
+		ut_ad(old_pk);
+		ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp);
+		ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
+			      old_pk, old_pk->n_fields - 2)->len);
+		ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
+			      old_pk, old_pk->n_fields - 1)->len);
+
+		old_pk_size = rec_get_converted_size_temp<false>(
+			new_index, old_pk->fields, old_pk->n_fields,
+			&old_pk_extra_size);
+		ut_ad(old_pk_extra_size < 0x100);
+		mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
+	}
+
+	if (byte* b = row_log_table_open(index->online_log,
+					 mrec_size, &avail_size)) {
+		if (insert) {
+			*b++ = ROW_T_INSERT;
+		} else {
+			*b++ = ROW_T_UPDATE;
+
+			if (old_pk_size) {
+				*b++ = static_cast<byte>(old_pk_extra_size);
+
+				rec_convert_dtuple_to_temp<false>(
+					b + old_pk_extra_size, new_index,
+					old_pk->fields, old_pk->n_fields);
+				b += old_pk_size;
+			}
+		}
+
+		if (extra_size < 0x80) {
+			*b++ = static_cast<byte>(extra_size);
+		} else {
+			ut_ad(extra_size < 0x8000);
+			*b++ = static_cast<byte>(0x80 | (extra_size >> 8));
+			*b++ = static_cast<byte>(extra_size);
+		}
+
+		if (status == REC_STATUS_INSTANT) {
+			ut_ad(is_instant);
+			if (n_fields <= index->online_log->n_core_fields) {
+				status = REC_STATUS_ORDINARY;
+			}
+			*b = status;
+		}
+
+		rec_convert_dtuple_to_temp<true>(
+			b + extra_size, index, tuple->fields, tuple->n_fields,
+			status);
+		b += size;
+
+		row_log_table_close(index, b, mrec_size, avail_size);
+	}
+
+	mem_heap_free(heap);
+}
+
+/******************************************************//**
+Logs an insert or update to a table that is being rebuilt.
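+ROW_FORMAT=REDUNDANT records are dispatched to
+row_log_table_low_redundant(); for other formats, the record bytes are
+copied directly from the compact-format record.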
+*/
+static
+void
+row_log_table_low(
+/*==============*/
+	const rec_t*	rec,	/*!< in: clustered index leaf page record,
+				page X-latched */
+	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
+				or X-latched */
+	const rec_offs*	offsets,/*!< in: rec_get_offsets(rec,index) */
+	bool		insert,	/*!< in: true if insert, false if update */
+	const dtuple_t*	old_pk)	/*!< in: old PRIMARY KEY value (if !insert
+				and a PRIMARY KEY is being created) */
+{
+	ulint			old_pk_size;
+	ulint			old_pk_extra_size;
+	ulint			extra_size;
+	ulint			mrec_size;
+	ulint			avail_size;
+	const dict_index_t*	new_index;
+	row_log_t*		log = index->online_log;
+
+	new_index = dict_table_get_first_index(log->table);
+
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(dict_index_is_clust(new_index));
+	ut_ad(!dict_index_is_online_ddl(new_index));
+	ut_ad(rec_offs_validate(rec, index, offsets));
+	ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
+	ut_ad(rec_offs_size(offsets) <= sizeof log->tail.buf);
+	ut_ad(rw_lock_own_flagged(
+			&index->lock,
+			RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
+#ifdef UNIV_DEBUG
+	switch (fil_page_get_type(page_align(rec))) {
+	case FIL_PAGE_INDEX:
+		break;
+	case FIL_PAGE_TYPE_INSTANT:
+		ut_ad(index->is_instant());
+		ut_ad(!page_has_siblings(page_align(rec)));
+		ut_ad(page_get_page_no(page_align(rec)) == index->page);
+		break;
+	default:
+		ut_ad("wrong page type" == 0);
+	}
+#endif /* UNIV_DEBUG */
+	ut_ad(!rec_is_metadata(rec, *index));
+	ut_ad(page_rec_is_leaf(rec));
+	ut_ad(!page_is_comp(page_align(rec)) == !rec_offs_comp(offsets));
+	/* old_pk=row_log_table_get_pk() [not needed in INSERT] is a prefix
+	of the clustered index record (PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR),
+	with no information on virtual columns */
+	ut_ad(!old_pk || !insert);
+	ut_ad(!old_pk || old_pk->n_v_fields == 0);
+
+	if (index->online_status != ONLINE_INDEX_CREATION
+	    || (index->type & DICT_CORRUPT) || index->table->corrupted
+	    || log->error != DB_SUCCESS) {
+		return;
+	}
+
+	if (!rec_offs_comp(offsets)) {
+		row_log_table_low_redundant(
+			rec, index, insert, old_pk, new_index);
+		return;
+	}
+
+	ut_ad(page_is_comp(page_align(rec)));
+	ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY
+	      || rec_get_status(rec) == REC_STATUS_INSTANT);
+
+	const ulint omit_size = REC_N_NEW_EXTRA_BYTES;
+
+	const ulint rec_extra_size = rec_offs_extra_size(offsets) - omit_size;
+	const bool is_instant = log->is_instant(index);
+	extra_size = rec_extra_size + is_instant;
+
+	unsigned fake_extra_size = 0;
+	byte fake_extra_buf[3];
+	if (is_instant && UNIV_UNLIKELY(!index->is_instant())) {
+		/* The source table was emptied after ALTER TABLE
+		started, and it was converted to non-instant format.
+		Because row_log_table_apply_op() expects to find
+		all records to be logged in the same way, we will
+		be unable to copy the rec_extra_size bytes from the
+		record header, but must convert them here. */
+		unsigned n_add = index->n_fields - 1 - log->n_core_fields;
+		fake_extra_size = rec_get_n_add_field_len(n_add);
+		ut_ad(fake_extra_size == 1 || fake_extra_size == 2);
+		extra_size += fake_extra_size;
+		byte* fake_extra = fake_extra_buf + fake_extra_size;
+		rec_set_n_add_field(fake_extra, n_add);
+		ut_ad(fake_extra == fake_extra_buf);
+	}
+
+	mrec_size = ROW_LOG_HEADER_SIZE
+		+ (extra_size >= 0x80) + rec_offs_size(offsets) - omit_size
+		+ is_instant + fake_extra_size;
+
+	if (insert || log->same_pk) {
+		ut_ad(!old_pk);
+		old_pk_extra_size = old_pk_size = 0;
+	} else {
+		ut_ad(old_pk);
+		ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp);
+		ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
+			      old_pk, old_pk->n_fields - 2)->len);
+		ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
+			      old_pk, old_pk->n_fields - 1)->len);
+
+		old_pk_size = rec_get_converted_size_temp<false>(
+			new_index, old_pk->fields, old_pk->n_fields,
+			&old_pk_extra_size);
+		ut_ad(old_pk_extra_size < 0x100);
+		mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
+	}
+
+	if (byte* b = row_log_table_open(log, mrec_size, &avail_size)) {
+		if (insert) {
+			*b++ = ROW_T_INSERT;
+		} else {
+			*b++ = ROW_T_UPDATE;
+
+			if (old_pk_size) {
+				*b++ = static_cast<byte>(old_pk_extra_size);
+
+				rec_convert_dtuple_to_temp<false>(
+					b + old_pk_extra_size, new_index,
+					old_pk->fields, old_pk->n_fields);
+				b += old_pk_size;
+			}
+		}
+
+		if (extra_size < 0x80) {
+			*b++ = static_cast<byte>(extra_size);
+		} else {
+			ut_ad(extra_size < 0x8000);
+			*b++ = static_cast<byte>(0x80 | (extra_size >> 8));
+			*b++ = static_cast<byte>(extra_size);
+		}
+
+		if (is_instant) {
+			*b++ = fake_extra_size
+				? REC_STATUS_INSTANT
+				: rec_get_status(rec);
+		} else {
+			ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY);
+		}
+
+		memcpy(b, rec - rec_extra_size - omit_size, rec_extra_size);
+		b += rec_extra_size;
+		memcpy(b, fake_extra_buf + 1, fake_extra_size);
+		b += fake_extra_size;
+		ulint len;
+		ulint trx_id_offs = rec_get_nth_field_offs(
+			offsets, index->n_uniq, &len);
+		ut_ad(len == DATA_TRX_ID_LEN);
+		memcpy(b, rec, rec_offs_data_size(offsets));
+		if (trx_read_trx_id(b + trx_id_offs) < log->min_trx) {
+			memcpy(b + trx_id_offs,
+			       reset_trx_id, sizeof reset_trx_id);
+		}
+		b += rec_offs_data_size(offsets);
+
+		row_log_table_close(index, b, mrec_size, avail_size);
+	}
+}
+
+/******************************************************//**
+Logs an update to a table that is being rebuilt.
+This will be merged in row_log_table_apply_update(). */
+void
+row_log_table_update(
+/*=================*/
+	const rec_t*	rec,	/*!< in: clustered index leaf page record,
+				page X-latched */
+	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
+				or X-latched */
+	const rec_offs*	offsets,/*!< in: rec_get_offsets(rec,index) */
+	const dtuple_t*	old_pk)	/*!< in: row_log_table_get_pk()
+				before the update */
+{
+	row_log_table_low(rec, index, offsets, false, old_pk);
+}
+
+/** Gets the old table column of a PRIMARY KEY column.
+@param table	old table (before ALTER TABLE)
+@param col_map	mapping of old column numbers to new ones
+@param col_no	column position in the new table
+@return old table column, or NULL if this is an added column */
+static
+const dict_col_t*
+row_log_table_get_pk_old_col(
+/*=========================*/
+	const dict_table_t*	table,
+	const ulint*		col_map,
+	ulint			col_no)
+{
+	for (ulint i = 0; i < table->n_cols; i++) {
+		if (col_no == col_map[i]) {
+			return(dict_table_get_nth_col(table, i));
+		}
+	}
+
+	return(NULL);
+}
+
+/** Maps an old table column of a PRIMARY KEY column.
+@param[in]	ifield		clustered index field in the new table (after
+ALTER TABLE)
+@param[in]	index		the clustered index of ifield
+@param[in,out]	dfield		clustered index tuple field in the new table
+@param[in,out]	heap		memory heap for allocating dfield contents
+@param[in]	rec		clustered index leaf page record in the old
+table
+@param[in]	offsets		rec_get_offsets(rec)
+@param[in]	i		rec field corresponding to col
+@param[in]	zip_size	ROW_FORMAT=COMPRESSED size of the old table
+@param[in]	max_len		maximum length of dfield
+@param[in]	log		row log for the table
+@retval DB_INVALID_NULL		if a NULL value is encountered
+@retval DB_TOO_BIG_INDEX_COL	if the maximum prefix length is exceeded */
+static
+dberr_t
+row_log_table_get_pk_col(
+	const dict_field_t*	ifield,
+	const dict_index_t*	index,
+	dfield_t*		dfield,
+	mem_heap_t*		heap,
+	const rec_t*		rec,
+	const rec_offs*		offsets,
+	ulint			i,
+	ulint			zip_size,
+	ulint			max_len,
+	const row_log_t*	log)
+{
+	const byte*	field;
+	ulint		len;
+
+	field = rec_get_nth_field(rec, offsets, i, &len);
+
+	if (len == UNIV_SQL_DEFAULT) {
+		field = log->instant_field_value(i, &len);
+	}
+
+	if (len == UNIV_SQL_NULL) {
+		if (!log->allow_not_null) {
+			return(DB_INVALID_NULL);
+		}
+
+		unsigned col_no = ifield->col->ind;
+		ut_ad(col_no < log->defaults->n_fields);
+
+		field = static_cast<const byte*>(
+			log->defaults->fields[col_no].data);
+		if (!field) {
+			return(DB_INVALID_NULL);
+		}
+		len = log->defaults->fields[col_no].len;
+	}
+
+	if (rec_offs_nth_extern(offsets, i)) {
+		ulint	field_len = ifield->prefix_len;
+		byte*	blob_field;
+
+		if (!field_len) {
+			field_len = ifield->fixed_len;
+			if (!field_len) {
+				field_len = max_len + 1;
+			}
+		}
+
+		blob_field = static_cast<byte*>(
+			mem_heap_alloc(heap, field_len));
+
+		len = btr_copy_externally_stored_field_prefix(
+			blob_field, field_len, zip_size, field, len);
+		if (len >= max_len + 1) {
+			return(DB_TOO_BIG_INDEX_COL);
+		}
+
+		dfield_set_data(dfield, blob_field, len);
+	} else {
+		dfield_set_data(dfield, mem_heap_dup(heap, field, len), len);
+	}
+
+	return(DB_SUCCESS);
+}
+
+/******************************************************//**
+Constructs the old PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR
+of a table that is being rebuilt.
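+When the PRIMARY KEY definition has changed, each key column of the
+rebuilt table is mapped back to the old record, or to a default value
+for a column that is being added.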
+@return tuple of PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in the rebuilt table,
+or NULL if the PRIMARY KEY definition does not change */
+const dtuple_t*
+row_log_table_get_pk(
+/*=================*/
+	const rec_t*	rec,	/*!< in: clustered index leaf page record,
+				page X-latched */
+	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
+				or X-latched */
+	const rec_offs*	offsets,/*!< in: rec_get_offsets(rec,index) */
+	byte*		sys,	/*!< out: DB_TRX_ID,DB_ROLL_PTR for
+				row_log_table_delete(), or NULL */
+	mem_heap_t**	heap)	/*!< in/out: memory heap where allocated */
+{
+	dtuple_t*	tuple	= NULL;
+	row_log_t*	log	= index->online_log;
+
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(dict_index_is_online_ddl(index));
+	ut_ad(!offsets || rec_offs_validate(rec, index, offsets));
+	ut_ad(rw_lock_own_flagged(
+			&index->lock,
+			RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
+
+	ut_ad(log);
+	ut_ad(log->table);
+	ut_ad(log->min_trx);
+
+	if (log->same_pk) {
+		/* The PRIMARY KEY columns are unchanged. */
+		if (sys) {
+			/* Store the DB_TRX_ID,DB_ROLL_PTR. */
+			ulint	trx_id_offs = index->trx_id_offset;
+
+			if (!trx_id_offs) {
+				ulint	len;
+
+				if (!offsets) {
+					offsets = rec_get_offsets(
+						rec, index, nullptr,
+						index->n_core_fields,
+						index->db_trx_id() + 1, heap);
+				}
+
+				trx_id_offs = rec_get_nth_field_offs(
+					offsets, index->db_trx_id(), &len);
+				ut_ad(len == DATA_TRX_ID_LEN);
+			}
+
+			const byte* ptr = trx_read_trx_id(rec + trx_id_offs)
+				< log->min_trx
+				? reset_trx_id
+				: rec + trx_id_offs;
+
+			memcpy(sys, ptr, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
+			ut_d(trx_id_check(sys, log->min_trx));
+		}
+
+		return(NULL);
+	}
+
+	mutex_enter(&log->mutex);
+
+	/* log->error is protected by log->mutex. */
+	if (log->error == DB_SUCCESS) {
+		dict_table_t*	new_table = log->table;
+		dict_index_t*	new_index
+			= dict_table_get_first_index(new_table);
+		const ulint	new_n_uniq
+			= dict_index_get_n_unique(new_index);
+
+		if (!*heap) {
+			ulint	size = 0;
+
+			if (!offsets) {
+				size += (1 + REC_OFFS_HEADER_SIZE
+					 + unsigned(index->n_fields))
+					* sizeof *offsets;
+			}
+
+			for (ulint i = 0; i < new_n_uniq; i++) {
+				size += dict_col_get_min_size(
+					dict_index_get_nth_col(new_index, i));
+			}
+
+			*heap = mem_heap_create(
+				DTUPLE_EST_ALLOC(new_n_uniq + 2) + size);
+		}
+
+		if (!offsets) {
+			offsets = rec_get_offsets(rec, index, nullptr,
+						  index->n_core_fields,
+						  ULINT_UNDEFINED, heap);
+		}
+
+		tuple = dtuple_create(*heap, new_n_uniq + 2);
+		dict_index_copy_types(tuple, new_index, tuple->n_fields);
+		dtuple_set_n_fields_cmp(tuple, new_n_uniq);
+
+		const ulint max_len = DICT_MAX_FIELD_LEN_BY_FORMAT(new_table);
+
+		const ulint zip_size = index->table->space->zip_size();
+
+		for (ulint new_i = 0; new_i < new_n_uniq; new_i++) {
+			dict_field_t*	ifield;
+			dfield_t*	dfield;
+			ulint		prtype;
+			ulint		mbminlen, mbmaxlen;
+
+			ifield = dict_index_get_nth_field(new_index, new_i);
+			dfield = dtuple_get_nth_field(tuple, new_i);
+
+			const ulint	col_no
+				= dict_field_get_col(ifield)->ind;
+
+			if (const dict_col_t* col
+			    = row_log_table_get_pk_old_col(
+				    index->table, log->col_map, col_no)) {
+				ulint	i = dict_col_get_clust_pos(col, index);
+
+				if (i == ULINT_UNDEFINED) {
+					ut_ad(0);
+					log->error = DB_CORRUPTION;
+					goto err_exit;
+				}
+
+				log->error = row_log_table_get_pk_col(
+					ifield, new_index, dfield, *heap,
+					rec, offsets, i, zip_size, max_len,
+					log);
+
+				if (log->error != DB_SUCCESS) {
+err_exit:
+					tuple = NULL;
+					goto func_exit;
+				}
+
+				mbminlen = col->mbminlen;
+				mbmaxlen = col->mbmaxlen;
+				prtype = col->prtype;
+			} else {
+				/* No matching column was found in the old
+				table, so this must be an added column.
+				Copy the default value. */
+				ut_ad(log->defaults);
+
+				dfield_copy(dfield, dtuple_get_nth_field(
+						    log->defaults, col_no));
+				mbminlen = dfield->type.mbminlen;
+				mbmaxlen = dfield->type.mbmaxlen;
+				prtype = dfield->type.prtype;
+			}
+
+			ut_ad(!dfield_is_ext(dfield));
+			ut_ad(!dfield_is_null(dfield));
+
+			if (ifield->prefix_len) {
+				ulint	len = dtype_get_at_most_n_mbchars(
+					prtype, mbminlen, mbmaxlen,
+					ifield->prefix_len,
+					dfield_get_len(dfield),
+					static_cast<const char*>(
+						dfield_get_data(dfield)));
+
+				ut_ad(len <= dfield_get_len(dfield));
+				dfield_set_len(dfield, len);
+			}
+		}
+
+		const byte* trx_roll = rec
+			+ row_get_trx_id_offset(index, offsets);
+
+		/* Copy the fields, because the fields will be updated
+		or the record may be moved somewhere else in the B-tree
+		as part of the upcoming operation. */
+		if (trx_read_trx_id(trx_roll) < log->min_trx) {
+			trx_roll = reset_trx_id;
+			if (sys) {
+				memcpy(sys, trx_roll,
+				       DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
+			}
+		} else if (sys) {
+			memcpy(sys, trx_roll,
+			       DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
+			trx_roll = sys;
+		} else {
+			trx_roll = static_cast<const byte*>(
+				mem_heap_dup(
+					*heap, trx_roll,
+					DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN));
+		}
+
+		ut_d(trx_id_check(trx_roll, log->min_trx));
+
+		dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq),
+				trx_roll, DATA_TRX_ID_LEN);
+		dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq + 1),
+				trx_roll + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN);
+	}
+
+func_exit:
+	mutex_exit(&log->mutex);
+	return(tuple);
+}
+
+/******************************************************//**
+Logs an insert to a table that is being rebuilt.
+This will be merged in row_log_table_apply_insert(). */
+void
+row_log_table_insert(
+/*=================*/
+	const rec_t*	rec,	/*!< in: clustered index leaf page record,
+				page X-latched */
+	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
+				or X-latched */
+	const rec_offs*	offsets)/*!< in: rec_get_offsets(rec,index) */
+{
+	row_log_table_low(rec, index, offsets, true, NULL);
+}
+
+/******************************************************//**
+Notes that a BLOB is being freed during online ALTER TABLE. */
+void
+row_log_table_blob_free(
+/*====================*/
+	dict_index_t*	index,	/*!< in/out: clustered index, X-latched */
+	ulint		page_no)/*!< in: starting page number of the BLOB */
+{
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(dict_index_is_online_ddl(index));
+	ut_ad(rw_lock_own_flagged(
+			&index->lock,
+			RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
+	ut_ad(page_no != FIL_NULL);
+
+	if (index->online_log->error != DB_SUCCESS) {
+		return;
+	}
+
+	page_no_map*	blobs = index->online_log->blobs;
+
+	if (blobs == NULL) {
+		index->online_log->blobs = blobs = UT_NEW_NOKEY(page_no_map());
+	}
+
+#ifdef UNIV_DEBUG
+	const ulonglong	log_pos = index->online_log->tail.total;
+#else
+# define log_pos /* empty */
+#endif /* UNIV_DEBUG */
+
+	const page_no_map::value_type	v(page_no,
+					  row_log_table_blob_t(log_pos));
+
+	std::pair<page_no_map::iterator,bool> p = blobs->insert(v);
+
+	if (!p.second) {
+		/* Update the existing mapping. */
+		ut_ad(p.first->first == page_no);
+		p.first->second.blob_free(log_pos);
+	}
+#undef log_pos
+}
+
+/******************************************************//**
+Notes that a BLOB is being allocated during online ALTER TABLE.
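+Only pages that were previously declared freed are tracked here;
+allocating a page twice without an intervening free is not allowed.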
+*/
+void
+row_log_table_blob_alloc(
+/*=====================*/
+	dict_index_t*	index,	/*!< in/out: clustered index, X-latched */
+	ulint		page_no)/*!< in: starting page number of the BLOB */
+{
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(dict_index_is_online_ddl(index));
+
+	ut_ad(rw_lock_own_flagged(
+			&index->lock,
+			RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
+
+	ut_ad(page_no != FIL_NULL);
+
+	if (index->online_log->error != DB_SUCCESS) {
+		return;
+	}
+
+	/* Only track allocations if the same page has been freed
+	earlier. Double allocation without a free is not allowed. */
+	if (page_no_map* blobs = index->online_log->blobs) {
+		page_no_map::iterator p = blobs->find(page_no);
+
+		if (p != blobs->end()) {
+			ut_ad(p->first == page_no);
+			p->second.blob_alloc(index->online_log->tail.total);
+		}
+	}
+}
+
+/******************************************************//**
+Converts a log record to a table row.
+@return converted row, or NULL if the conversion fails */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+const dtuple_t*
+row_log_table_apply_convert_mrec(
+/*=============================*/
+	const mrec_t*		mrec,		/*!< in: merge record */
+	dict_index_t*		index,		/*!< in: index of mrec */
+	const rec_offs*		offsets,	/*!< in: offsets of mrec */
+	row_log_t*		log,		/*!< in: rebuild context */
+	mem_heap_t*		heap,		/*!< in/out: memory heap */
+	dberr_t*		error)		/*!< out: DB_SUCCESS or
+						DB_MISSING_HISTORY or
+						reason of failure */
+{
+	dtuple_t*	row;
+
+	log->n_rows++;
+	*error = DB_SUCCESS;
+
+	/* This is based on row_build(). */
+	if (log->defaults) {
+		row = dtuple_copy(log->defaults, heap);
+		/* dict_table_copy_types() would set the fields to NULL */
+		for (ulint i = 0; i < dict_table_get_n_cols(log->table); i++) {
+			dict_col_copy_type(
+				dict_table_get_nth_col(log->table, i),
+				dfield_get_type(dtuple_get_nth_field(row, i)));
+		}
+	} else {
+		row = dtuple_create(heap, dict_table_get_n_cols(log->table));
+		dict_table_copy_types(row, log->table);
+	}
+
+	for (ulint i = 0; i < rec_offs_n_fields(offsets); i++) {
+		const dict_field_t*	ind_field
+			= dict_index_get_nth_field(index, i);
+
+		if (ind_field->prefix_len) {
+			/* Column prefixes can only occur in key
+			fields, which cannot be stored externally. For
+			a column prefix, there should also be the full
+			field in the clustered index tuple. The row
+			tuple comprises full fields, not prefixes. */
+			ut_ad(!rec_offs_nth_extern(offsets, i));
+			continue;
+		}
+
+		const dict_col_t*	col
+			= dict_field_get_col(ind_field);
+
+		if (col->is_dropped()) {
+			/* the column was instantly dropped earlier */
+			ut_ad(index->table->instant);
+			continue;
+		}
+
+		ulint			col_no
+			= log->col_map[dict_col_get_no(col)];
+
+		if (col_no == ULINT_UNDEFINED) {
+			/* the column is being dropped now */
+			continue;
+		}
+
+		dfield_t*	dfield
+			= dtuple_get_nth_field(row, col_no);
+
+		ulint		len;
+		const byte*	data;
+
+		if (rec_offs_nth_extern(offsets, i)) {
+			ut_ad(rec_offs_any_extern(offsets));
+			rw_lock_x_lock(dict_index_get_lock(index));
+
+			if (const page_no_map* blobs = log->blobs) {
+				data = rec_get_nth_field(
+					mrec, offsets, i, &len);
+				ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE);
+
+				ulint	page_no = mach_read_from_4(
+					data + len - (BTR_EXTERN_FIELD_REF_SIZE
+						      - BTR_EXTERN_PAGE_NO));
+				page_no_map::const_iterator p = blobs->find(
+					page_no);
+				if (p != blobs->end()
+				    && p->second.is_freed(log->head.total)) {
+					/* This BLOB has been freed.
+					We must not access the row. */
+					*error = DB_MISSING_HISTORY;
+					dfield_set_data(dfield, data, len);
+					dfield_set_ext(dfield);
+					goto blob_done;
+				}
+			}
+
+			data = btr_rec_copy_externally_stored_field(
+				mrec, offsets,
+				index->table->space->zip_size(),
+				i, &len, heap);
+			ut_a(data);
+			dfield_set_data(dfield, data, len);
+blob_done:
+			rw_lock_x_unlock(dict_index_get_lock(index));
+		} else {
+			data = rec_get_nth_field(mrec, offsets, i, &len);
+			if (len == UNIV_SQL_DEFAULT) {
+				data = log->instant_field_value(i, &len);
+			}
+			dfield_set_data(dfield, data, len);
+		}
+
+		if (len != UNIV_SQL_NULL && col->mtype == DATA_MYSQL
+		    && col->len != len && !dict_table_is_comp(log->table)) {
+
+			ut_ad(col->len >= len);
+			if (dict_table_is_comp(index->table)) {
+				byte*	buf = (byte*) mem_heap_alloc(heap,
+								     col->len);
+				memcpy(buf, dfield->data, len);
+				memset(buf + len, 0x20, col->len - len);
+
+				dfield_set_data(dfield, buf, col->len);
+			} else {
+				/* field length mismatch should not happen
+				when rebuilding the redundant row format
+				table. */
+				ut_ad(0);
+				*error = DB_CORRUPTION;
+				return(NULL);
+			}
+		}
+
+		/* See if any columns were changed to NULL or NOT NULL. */
+		const dict_col_t*	new_col
+			= dict_table_get_nth_col(log->table, col_no);
+		ut_ad(new_col->same_format(*col));
+
+		/* Assert that prtype matches except for nullability. */
+		ut_ad(!((new_col->prtype ^ dfield_get_type(dfield)->prtype)
+			& ~(DATA_NOT_NULL | DATA_VERSIONED
+			    | CHAR_COLL_MASK << 16 | DATA_LONG_TRUE_VARCHAR)));
+
+		if (new_col->prtype == col->prtype) {
+			continue;
+		}
+
+		if ((new_col->prtype & DATA_NOT_NULL)
+		    && dfield_is_null(dfield)) {
+
+			const dfield_t& default_field
+				= log->defaults->fields[col_no];
+
+			Field* field = log->old_table->field[col->ind];
+
+			field->set_warning(Sql_condition::WARN_LEVEL_WARN,
+					   WARN_DATA_TRUNCATED, 1,
+					   ulong(log->n_rows));
+
+			if (!log->allow_not_null) {
+				/* We got a NULL value for a NOT NULL column. */
+				*error = DB_INVALID_NULL;
+				return NULL;
+			}
+
+			*dfield = default_field;
+		}
+
+		/* Adjust the DATA_NOT_NULL flag in the parsed row. */
+		dfield_get_type(dfield)->prtype = new_col->prtype;
+
+		ut_ad(dict_col_type_assert_equal(new_col,
+						 dfield_get_type(dfield)));
+	}
+
+	return(row);
+}
+
+/******************************************************//**
+Replays an insert operation on a table that was rebuilt.
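+The row is inserted into the clustered index first, and then into each
+secondary index; FULLTEXT indexes are skipped.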
+@return DB_SUCCESS or error code */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_insert_low(
+/*===========================*/
+	que_thr_t*		thr,		/*!< in: query graph */
+	const dtuple_t*		row,		/*!< in: table row
+						in the old table definition */
+	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
+						that can be emptied */
+	mem_heap_t*		heap,		/*!< in/out: memory heap */
+	row_merge_dup_t*	dup)		/*!< in/out: for reporting
+						duplicate key errors */
+{
+	dberr_t		error;
+	dtuple_t*	entry;
+	const row_log_t* log = dup->index->online_log;
+	dict_index_t*	index = dict_table_get_first_index(log->table);
+	ulint		n_index = 0;
+
+	ut_ad(dtuple_validate(row));
+
+	DBUG_LOG("ib_alter_table",
+		 "insert table " << index->table->id << " (index "
+		 << index->id << "): " << rec_printer(row).str());
+
+	static const ulint	flags
+		= (BTR_CREATE_FLAG
+		   | BTR_NO_LOCKING_FLAG
+		   | BTR_NO_UNDO_LOG_FLAG
+		   | BTR_KEEP_SYS_FLAG);
+
+	entry = row_build_index_entry(row, NULL, index, heap);
+
+	error = row_ins_clust_index_entry_low(
+		flags, BTR_MODIFY_TREE, index, index->n_uniq,
+		entry, 0, thr);
+
+	switch (error) {
+	case DB_SUCCESS:
+		break;
+	case DB_SUCCESS_LOCKED_REC:
+		/* The row had already been copied to the table. */
+		return(DB_SUCCESS);
+	default:
+		return(error);
+	}
+
+	ut_ad(dict_index_is_clust(index));
+
+	for (n_index += index->type != DICT_CLUSTERED;
+	     (index = dict_table_get_next_index(index)); n_index++) {
+		if (index->type & DICT_FTS) {
+			continue;
+		}
+
+		entry = row_build_index_entry(row, NULL, index, heap);
+		error = row_ins_sec_index_entry_low(
+			flags, BTR_MODIFY_TREE,
+			index, offsets_heap, heap, entry,
+			thr_get_trx(thr)->id, thr);
+
+		if (error != DB_SUCCESS) {
+			if (error == DB_DUPLICATE_KEY) {
+				thr_get_trx(thr)->error_key_num = n_index;
+			}
+			break;
+		}
+	}
+
+	return(error);
+}
+
+/******************************************************//**
+Replays an insert operation on a table that was rebuilt.
+@return DB_SUCCESS or error code */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_insert(
+/*=======================*/
+	que_thr_t*		thr,		/*!< in: query graph */
+	const mrec_t*		mrec,		/*!< in: record to insert */
+	const rec_offs*		offsets,	/*!< in: offsets of mrec */
+	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
+						that can be emptied */
+	mem_heap_t*		heap,		/*!< in/out: memory heap */
+	row_merge_dup_t*	dup)		/*!< in/out: for reporting
+						duplicate key errors */
+{
+	row_log_t*	log = dup->index->online_log;
+	dberr_t		error;
+	const dtuple_t*	row = row_log_table_apply_convert_mrec(
+		mrec, dup->index, offsets, log, heap, &error);
+
+	switch (error) {
+	case DB_MISSING_HISTORY:
+		ut_ad(log->blobs);
+		/* Because some BLOBs are missing, we know that the
+		transaction was rolled back later (a rollback of
+		an insert can free BLOBs).
+		We can simply skip the insert: the subsequent
+		ROW_T_DELETE will be ignored, or a ROW_T_UPDATE will
+		be interpreted as ROW_T_INSERT. */
+		return(DB_SUCCESS);
+	case DB_SUCCESS:
+		ut_ad(row != NULL);
+		break;
+	default:
+		ut_ad(0);
+		/* fall through */
+	case DB_INVALID_NULL:
+		ut_ad(row == NULL);
+		return(error);
+	}
+
+	error = row_log_table_apply_insert_low(
+		thr, row, offsets_heap, heap, dup);
+	if (error != DB_SUCCESS) {
+		/* Report the erroneous row using the new
+		version of the table. */
+		innobase_row_to_mysql(dup->table, log->table, row);
+	}
+	return(error);
+}
+
+/******************************************************//**
+Deletes a record from a table that is being rebuilt.
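+The clustered index record is removed first; a row template built from
+it is then used to delete the matching secondary index entries.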
+@return DB_SUCCESS or error code */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_delete_low(
+/*===========================*/
+	btr_pcur_t*		pcur,		/*!< in/out: B-tree cursor,
+						will be trashed */
+	const rec_offs*		offsets,	/*!< in: offsets on pcur */
+	mem_heap_t*		heap,		/*!< in/out: memory heap */
+	mtr_t*			mtr)		/*!< in/out: mini-transaction,
+						will be committed */
+{
+	dberr_t		error;
+	row_ext_t*	ext;
+	dtuple_t*	row;
+	dict_index_t*	index	= btr_pcur_get_btr_cur(pcur)->index;
+
+	ut_ad(dict_index_is_clust(index));
+
+	DBUG_LOG("ib_alter_table",
+		 "delete table " << index->table->id << " (index "
+		 << index->id << "): "
+		 << rec_printer(btr_pcur_get_rec(pcur), offsets).str());
+
+	if (dict_table_get_next_index(index)) {
+		/* Build a row template for purging secondary index entries. */
+		row = row_build(
+			ROW_COPY_DATA, index, btr_pcur_get_rec(pcur),
+			offsets, NULL, NULL, NULL, &ext, heap);
+	} else {
+		row = NULL;
+	}
+
+	btr_cur_pessimistic_delete(&error, FALSE, btr_pcur_get_btr_cur(pcur),
+				   BTR_CREATE_FLAG, false, mtr);
+	mtr_commit(mtr);
+
+	if (error != DB_SUCCESS) {
+		return(error);
+	}
+
+	while ((index = dict_table_get_next_index(index)) != NULL) {
+		if (index->type & DICT_FTS) {
+			continue;
+		}
+
+		const dtuple_t*	entry = row_build_index_entry(
+			row, ext, index, heap);
+		mtr->start();
+		index->set_modified(*mtr);
+		btr_pcur_open(index, entry, PAGE_CUR_LE,
+			      BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
+			      pcur, mtr);
+#ifdef UNIV_DEBUG
+		switch (btr_pcur_get_btr_cur(pcur)->flag) {
+		case BTR_CUR_DELETE_REF:
+		case BTR_CUR_DEL_MARK_IBUF:
+		case BTR_CUR_DELETE_IBUF:
+		case BTR_CUR_INSERT_TO_IBUF:
+			/* We did not request buffering. */
+			break;
+		case BTR_CUR_HASH:
+		case BTR_CUR_HASH_FAIL:
+		case BTR_CUR_BINARY:
+			goto flag_ok;
+		}
+		ut_ad(0);
+flag_ok:
+#endif /* UNIV_DEBUG */
+
+		if (page_rec_is_infimum(btr_pcur_get_rec(pcur))
+		    || btr_pcur_get_low_match(pcur) < index->n_uniq) {
+			/* All secondary index entries should be
+			found, because new_table is being modified by
+			this thread only, and all indexes should be
+			updated in sync. */
+			mtr->commit();
+			return(DB_INDEX_CORRUPT);
+		}
+
+		btr_cur_pessimistic_delete(&error, FALSE,
+					   btr_pcur_get_btr_cur(pcur),
+					   BTR_CREATE_FLAG, false, mtr);
+		mtr->commit();
+	}
+
+	return(error);
+}
+
+/******************************************************//**
+Replays a delete operation on a table that was rebuilt.
+@return DB_SUCCESS or error code */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_delete(
+/*=======================*/
+	ulint			trx_id_col,	/*!< in: position of
+						DB_TRX_ID in the new
+						clustered index */
+	const mrec_t*		mrec,		/*!< in: merge record */
+	const rec_offs*		moffsets,	/*!< in: offsets of mrec */
+	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
+						that can be emptied */
+	mem_heap_t*		heap,		/*!< in/out: memory heap */
+	const row_log_t*	log)		/*!< in: online log */
+{
+	dict_table_t*	new_table = log->table;
+	dict_index_t*	index = dict_table_get_first_index(new_table);
+	dtuple_t*	old_pk;
+	mtr_t		mtr;
+	btr_pcur_t	pcur;
+	rec_offs*	offsets;
+
+	ut_ad(rec_offs_n_fields(moffsets) == index->first_user_field());
+	ut_ad(!rec_offs_any_extern(moffsets));
+
+	/* Convert the row to a search tuple. */
+	old_pk = dtuple_create(heap, index->n_uniq);
+	dict_index_copy_types(old_pk, index, index->n_uniq);
+
+	for (ulint i = 0; i < index->n_uniq; i++) {
+		ulint		len;
+		const void*	field;
+		field = rec_get_nth_field(mrec, moffsets, i, &len);
+		ut_ad(len != UNIV_SQL_NULL);
+		dfield_set_data(dtuple_get_nth_field(old_pk, i),
+				field, len);
+	}
+
+	mtr_start(&mtr);
+	index->set_modified(mtr);
+	btr_pcur_open(index, old_pk, PAGE_CUR_LE,
+		      BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
+		      &pcur, &mtr);
+#ifdef UNIV_DEBUG
+	switch (btr_pcur_get_btr_cur(&pcur)->flag) {
+	case BTR_CUR_DELETE_REF:
+	case BTR_CUR_DEL_MARK_IBUF:
+	case BTR_CUR_DELETE_IBUF:
+	case BTR_CUR_INSERT_TO_IBUF:
+		/* We did not request buffering. */
+		break;
+	case BTR_CUR_HASH:
+	case BTR_CUR_HASH_FAIL:
+	case BTR_CUR_BINARY:
+		goto flag_ok;
+	}
+	ut_ad(0);
+flag_ok:
+#endif /* UNIV_DEBUG */
+
+	if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
+	    || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
+all_done:
+		mtr_commit(&mtr);
+		/* The record was not found. All done. */
+		/* This should only happen when an earlier
+		ROW_T_INSERT was skipped or
+		ROW_T_UPDATE was interpreted as ROW_T_DELETE
+		due to BLOBs having been freed by rollback. */
+		return(DB_SUCCESS);
+	}
+
+	offsets = rec_get_offsets(btr_pcur_get_rec(&pcur), index, nullptr,
+				  index->n_core_fields,
+				  ULINT_UNDEFINED, &offsets_heap);
+#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
+	ut_a(!rec_offs_any_null_extern(btr_pcur_get_rec(&pcur), offsets));
+#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
+
+	/* Only remove the record if DB_TRX_ID,DB_ROLL_PTR match. */
+
+	{
+		ulint		len;
+		const byte*	mrec_trx_id
+			= rec_get_nth_field(mrec, moffsets, trx_id_col, &len);
+		ut_ad(len == DATA_TRX_ID_LEN);
+		const byte*	rec_trx_id
+			= rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
+					    trx_id_col, &len);
+		ut_ad(len == DATA_TRX_ID_LEN);
+		ut_d(trx_id_check(rec_trx_id, log->min_trx));
+		ut_d(trx_id_check(mrec_trx_id, log->min_trx));
+
+		ut_ad(rec_get_nth_field(mrec, moffsets, trx_id_col + 1, &len)
+		      == mrec_trx_id + DATA_TRX_ID_LEN);
+		ut_ad(len == DATA_ROLL_PTR_LEN);
+		ut_ad(rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
+					trx_id_col + 1, &len)
+		      == rec_trx_id + DATA_TRX_ID_LEN);
+		ut_ad(len == DATA_ROLL_PTR_LEN);
+
+		if (memcmp(mrec_trx_id, rec_trx_id,
+			   DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
+			/* The ROW_T_DELETE was logged for a different
+			PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR.
+			This is possible if a ROW_T_INSERT was skipped
+			or a ROW_T_UPDATE was interpreted as ROW_T_DELETE
+			because some BLOBs were missing due to
+			(1) rolling back the initial insert, or
+			(2) purging the BLOB for a later ROW_T_DELETE, or
+			(3) purging 'old values' for a later ROW_T_UPDATE
+			or ROW_T_DELETE. */
+			ut_ad(!log->same_pk);
+			goto all_done;
+		}
+	}
+
+	return row_log_table_apply_delete_low(&pcur, offsets, heap, &mtr);
+}
+
+/******************************************************//**
+Replays an update operation on a table that was rebuilt.
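+Depending on the log history, this may be applied as an insert (when the
+record is not found), as a delete (when BLOBs are missing), or as an
+actual update.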
+@return DB_SUCCESS or error code */ +static MY_ATTRIBUTE((nonnull, warn_unused_result)) +dberr_t +row_log_table_apply_update( +/*=======================*/ + que_thr_t* thr, /*!< in: query graph */ + ulint new_trx_id_col, /*!< in: position of + DB_TRX_ID in the new + clustered index */ + const mrec_t* mrec, /*!< in: new value */ + const rec_offs* offsets, /*!< in: offsets of mrec */ + mem_heap_t* offsets_heap, /*!< in/out: memory heap + that can be emptied */ + mem_heap_t* heap, /*!< in/out: memory heap */ + row_merge_dup_t* dup, /*!< in/out: for reporting + duplicate key errors */ + const dtuple_t* old_pk) /*!< in: PRIMARY KEY and + DB_TRX_ID,DB_ROLL_PTR + of the old value, + or PRIMARY KEY if same_pk */ +{ + row_log_t* log = dup->index->online_log; + const dtuple_t* row; + dict_index_t* index = dict_table_get_first_index(log->table); + mtr_t mtr; + btr_pcur_t pcur; + dberr_t error; + ulint n_index = 0; + + ut_ad(dtuple_get_n_fields_cmp(old_pk) + == dict_index_get_n_unique(index)); + ut_ad(dtuple_get_n_fields(old_pk) - (log->same_pk ? 0 : 2) + == dict_index_get_n_unique(index)); + + row = row_log_table_apply_convert_mrec( + mrec, dup->index, offsets, log, heap, &error); + + switch (error) { + case DB_MISSING_HISTORY: + /* The record contained BLOBs that are now missing. */ + ut_ad(log->blobs); + /* Whether or not we are updating the PRIMARY KEY, we + know that there should be a subsequent + ROW_T_DELETE for rolling back a preceding ROW_T_INSERT, + overriding this ROW_T_UPDATE record. (*1) + + This allows us to interpret this ROW_T_UPDATE + as ROW_T_DELETE. + + When applying the subsequent ROW_T_DELETE, no matching + record will be found. */ + /* fall through */ + case DB_SUCCESS: + ut_ad(row != NULL); + break; + default: + ut_ad(0); + /* fall through */ + case DB_INVALID_NULL: + ut_ad(row == NULL); + return(error); + } + + mtr_start(&mtr); + index->set_modified(mtr); + btr_pcur_open(index, old_pk, PAGE_CUR_LE, + BTR_MODIFY_TREE, &pcur, &mtr); +#ifdef UNIV_DEBUG + switch (btr_pcur_get_btr_cur(&pcur)->flag) { + case BTR_CUR_DELETE_REF: + case BTR_CUR_DEL_MARK_IBUF: + case BTR_CUR_DELETE_IBUF: + case BTR_CUR_INSERT_TO_IBUF: + ut_ad(0);/* We did not request buffering. */ + case BTR_CUR_HASH: + case BTR_CUR_HASH_FAIL: + case BTR_CUR_BINARY: + break; + } +#endif /* UNIV_DEBUG */ + + if (page_rec_is_infimum(btr_pcur_get_rec(&pcur)) + || btr_pcur_get_low_match(&pcur) < index->n_uniq) { + /* The record was not found. This should only happen + when an earlier ROW_T_INSERT or ROW_T_UPDATE was + diverted because BLOBs were freed when the insert was + later rolled back. */ + + ut_ad(log->blobs); + + if (error == DB_SUCCESS) { + /* An earlier ROW_T_INSERT could have been + skipped because of a missing BLOB, like this: + + BEGIN; + INSERT INTO t SET blob_col='blob value'; + UPDATE t SET blob_col=''; + ROLLBACK; + + This would generate the following records: + ROW_T_INSERT (referring to 'blob value') + ROW_T_UPDATE + ROW_T_UPDATE (referring to 'blob value') + ROW_T_DELETE + [ROLLBACK removes the 'blob value'] + + The ROW_T_INSERT would have been skipped + because of a missing BLOB. Now we are + executing the first ROW_T_UPDATE. + The second ROW_T_UPDATE (for the ROLLBACK) + would be interpreted as ROW_T_DELETE, because + the BLOB would be missing. + + We could probably assume that the transaction + has been rolled back and simply skip the + 'insert' part of this ROW_T_UPDATE record. + However, there might be some complex scenario + that could interfere with such a shortcut. 
+ So, we will insert the row (and risk + introducing a bogus duplicate key error + for the ALTER TABLE), and a subsequent + ROW_T_UPDATE or ROW_T_DELETE will delete it. */ + mtr_commit(&mtr); + error = row_log_table_apply_insert_low( + thr, row, offsets_heap, heap, dup); + } else { + /* Some BLOBs are missing, so we are interpreting + this ROW_T_UPDATE as ROW_T_DELETE (see *1). + Because the record was not found, we do nothing. */ + ut_ad(error == DB_MISSING_HISTORY); + error = DB_SUCCESS; +func_exit: + mtr_commit(&mtr); + } +func_exit_committed: + ut_ad(mtr.has_committed()); + + if (error != DB_SUCCESS) { + /* Report the erroneous row using the new + version of the table. */ + innobase_row_to_mysql(dup->table, log->table, row); + } + + return(error); + } + + /* Prepare to update (or delete) the record. */ + rec_offs* cur_offsets = rec_get_offsets( + btr_pcur_get_rec(&pcur), index, nullptr, index->n_core_fields, + ULINT_UNDEFINED, &offsets_heap); + + if (!log->same_pk) { + /* Only update the record if DB_TRX_ID,DB_ROLL_PTR match what + was buffered. */ + ulint len; + const byte* rec_trx_id + = rec_get_nth_field(btr_pcur_get_rec(&pcur), + cur_offsets, index->n_uniq, &len); + const dfield_t* old_pk_trx_id + = dtuple_get_nth_field(old_pk, index->n_uniq); + ut_ad(len == DATA_TRX_ID_LEN); + ut_d(trx_id_check(rec_trx_id, log->min_trx)); + ut_ad(old_pk_trx_id->len == DATA_TRX_ID_LEN); + ut_ad(old_pk_trx_id[1].len == DATA_ROLL_PTR_LEN); + ut_ad(DATA_TRX_ID_LEN + + static_cast<const char*>(old_pk_trx_id->data) + == old_pk_trx_id[1].data); + ut_d(trx_id_check(old_pk_trx_id->data, log->min_trx)); + + if (memcmp(rec_trx_id, old_pk_trx_id->data, + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) { + /* The ROW_T_UPDATE was logged for a different + DB_TRX_ID,DB_ROLL_PTR. This is possible if an + earlier ROW_T_INSERT or ROW_T_UPDATE was diverted + because some BLOBs were missing due to rolling + back the initial insert or due to purging + the old BLOB values of an update. */ + ut_ad(log->blobs); + if (error != DB_SUCCESS) { + ut_ad(error == DB_MISSING_HISTORY); + /* Some BLOBs are missing, so we are + interpreting this ROW_T_UPDATE as + ROW_T_DELETE (see *1). + Because this is a different row, + we will do nothing. */ + error = DB_SUCCESS; + } else { + /* Because the user record is missing due to + BLOBs that were missing when processing + an earlier log record, we should + interpret the ROW_T_UPDATE as ROW_T_INSERT. + However, there is a different user record + with the same PRIMARY KEY value already. */ + error = DB_DUPLICATE_KEY; + } + + goto func_exit; + } + } + + if (error != DB_SUCCESS) { + ut_ad(error == DB_MISSING_HISTORY); + ut_ad(log->blobs); + /* Some BLOBs are missing, so we are interpreting + this ROW_T_UPDATE as ROW_T_DELETE (see *1). */ + error = row_log_table_apply_delete_low( + &pcur, cur_offsets, heap, &mtr); + goto func_exit_committed; + } + + dtuple_t* entry = row_build_index_entry_low( + row, NULL, index, heap, ROW_BUILD_NORMAL); + upd_t* update = row_upd_build_difference_binary( + index, entry, btr_pcur_get_rec(&pcur), cur_offsets, + false, NULL, heap, dup->table, &error); + if (error != DB_SUCCESS) { + goto func_exit; + } + + if (!update->n_fields) { + /* Nothing to do. 
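+		row_upd_build_difference_binary() found no field whose
+		logged value differs from the record already in the
+		rebuilt table: the logged new value is byte-wise
+		identical, so the update can be skipped.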
*/ + goto func_exit; + } + + const bool pk_updated + = upd_get_nth_field(update, 0)->field_no < new_trx_id_col; + + if (pk_updated || rec_offs_any_extern(cur_offsets)) { + /* If the record contains any externally stored + columns, perform the update by delete and insert, + because we will not write any undo log that would + allow purge to free any orphaned externally stored + columns. */ + + if (pk_updated && log->same_pk) { + /* The ROW_T_UPDATE log record should only be + written when the PRIMARY KEY fields of the + record did not change in the old table. We + can only get a change of PRIMARY KEY columns + in the rebuilt table if the PRIMARY KEY was + redefined (!same_pk). */ + ut_ad(0); + error = DB_CORRUPTION; + goto func_exit; + } + + error = row_log_table_apply_delete_low( + &pcur, cur_offsets, heap, &mtr); + ut_ad(mtr.has_committed()); + + if (error == DB_SUCCESS) { + error = row_log_table_apply_insert_low( + thr, row, offsets_heap, heap, dup); + } + + goto func_exit_committed; + } + + dtuple_t* old_row; + row_ext_t* old_ext; + + if (dict_table_get_next_index(index)) { + /* Construct the row corresponding to the old value of + the record. */ + old_row = row_build( + ROW_COPY_DATA, index, btr_pcur_get_rec(&pcur), + cur_offsets, NULL, NULL, NULL, &old_ext, heap); + ut_ad(old_row); + + DBUG_LOG("ib_alter_table", + "update table " << index->table->id + << " (index " << index->id + << ": " << rec_printer(old_row).str() + << " to " << rec_printer(row).str()); + } else { + old_row = NULL; + old_ext = NULL; + } + + big_rec_t* big_rec; + + error = btr_cur_pessimistic_update( + BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG + | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG + | BTR_KEEP_POS_FLAG, + btr_pcur_get_btr_cur(&pcur), + &cur_offsets, &offsets_heap, heap, &big_rec, + update, 0, thr, 0, &mtr); + + if (big_rec) { + if (error == DB_SUCCESS) { + error = btr_store_big_rec_extern_fields( + &pcur, cur_offsets, big_rec, &mtr, + BTR_STORE_UPDATE); + } + + dtuple_big_rec_free(big_rec); + } + + for (n_index += index->type != DICT_CLUSTERED; + (index = dict_table_get_next_index(index)); n_index++) { + if (index->type & DICT_FTS) { + continue; + } + + if (error != DB_SUCCESS) { + break; + } + + if (!row_upd_changes_ord_field_binary( + index, update, thr, old_row, NULL)) { + continue; + } + + if (dict_index_has_virtual(index)) { + dtuple_copy_v_fields(old_row, old_pk); + } + + mtr_commit(&mtr); + + entry = row_build_index_entry(old_row, old_ext, index, heap); + if (!entry) { + ut_ad(0); + return(DB_CORRUPTION); + } + + mtr_start(&mtr); + index->set_modified(mtr); + + if (ROW_FOUND != row_search_index_entry( + index, entry, BTR_MODIFY_TREE, &pcur, &mtr)) { + ut_ad(0); + error = DB_CORRUPTION; + break; + } + + btr_cur_pessimistic_delete( + &error, FALSE, btr_pcur_get_btr_cur(&pcur), + BTR_CREATE_FLAG, false, &mtr); + + if (error != DB_SUCCESS) { + break; + } + + mtr_commit(&mtr); + + entry = row_build_index_entry(row, NULL, index, heap); + error = row_ins_sec_index_entry_low( + BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG + | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG, + BTR_MODIFY_TREE, index, offsets_heap, heap, + entry, thr_get_trx(thr)->id, thr); + + /* Report correct index name for duplicate key error. */ + if (error == DB_DUPLICATE_KEY) { + thr_get_trx(thr)->error_key_num = n_index; + } + + mtr_start(&mtr); + index->set_modified(mtr); + } + + goto func_exit; +} + +/******************************************************//** +Applies an operation to a table that was rebuilt. 
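+
+The merge record layout parsed below is, informally (a reading aid
+derived from the parser, not an authoritative format description):
+  ROW_T_INSERT: op(1) extra_size(1..2) rec_header(extra_size) data
+  ROW_T_DELETE: op(1) extra_size(1) PRIMARY KEY prefix record
+  ROW_T_UPDATE: as ROW_T_INSERT when same_pk; otherwise the
+                (PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR) prefix record of
+                the old value, followed by the new row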
+@return NULL on failure (mrec corruption) or when out of data; +pointer to next record on success */ +static MY_ATTRIBUTE((nonnull, warn_unused_result)) +const mrec_t* +row_log_table_apply_op( +/*===================*/ + que_thr_t* thr, /*!< in: query graph */ + ulint new_trx_id_col, /*!< in: position of + DB_TRX_ID in new index */ + row_merge_dup_t* dup, /*!< in/out: for reporting + duplicate key errors */ + dberr_t* error, /*!< out: DB_SUCCESS + or error code */ + mem_heap_t* offsets_heap, /*!< in/out: memory heap + that can be emptied */ + mem_heap_t* heap, /*!< in/out: memory heap */ + const mrec_t* mrec, /*!< in: merge record */ + const mrec_t* mrec_end, /*!< in: end of buffer */ + rec_offs* offsets) /*!< in/out: work area + for parsing mrec */ +{ + row_log_t* log = dup->index->online_log; + dict_index_t* new_index = dict_table_get_first_index(log->table); + ulint extra_size; + const mrec_t* next_mrec; + dtuple_t* old_pk; + + ut_ad(dict_index_is_clust(dup->index)); + ut_ad(dup->index->table != log->table); + ut_ad(log->head.total <= log->tail.total); + + *error = DB_SUCCESS; + + /* 3 = 1 (op type) + 1 (extra_size) + at least 1 byte payload */ + if (mrec + 3 >= mrec_end) { + return(NULL); + } + + const bool is_instant = log->is_instant(dup->index); + const mrec_t* const mrec_start = mrec; + + switch (*mrec++) { + default: + ut_ad(0); + *error = DB_CORRUPTION; + return(NULL); + case ROW_T_INSERT: + extra_size = *mrec++; + + if (extra_size >= 0x80) { + /* Read another byte of extra_size. */ + + extra_size = (extra_size & 0x7f) << 8; + extra_size |= *mrec++; + } + + mrec += extra_size; + + ut_ad(extra_size || !is_instant); + + if (mrec > mrec_end) { + return(NULL); + } + + rec_offs_set_n_fields(offsets, dup->index->n_fields); + rec_init_offsets_temp(mrec, dup->index, offsets, + log->n_core_fields, log->non_core_fields, + is_instant + ? static_cast<rec_comp_status_t>( + *(mrec - extra_size)) + : REC_STATUS_ORDINARY); + + next_mrec = mrec + rec_offs_data_size(offsets); + + if (next_mrec > mrec_end) { + return(NULL); + } else { + log->head.total += ulint(next_mrec - mrec_start); + *error = row_log_table_apply_insert( + thr, mrec, offsets, offsets_heap, + heap, dup); + } + break; + + case ROW_T_DELETE: + /* 1 (extra_size) + at least 1 (payload) */ + if (mrec + 2 >= mrec_end) { + return(NULL); + } + + extra_size = *mrec++; + ut_ad(mrec < mrec_end); + + /* We assume extra_size < 0x100 for the PRIMARY KEY prefix. + For fixed-length PRIMARY key columns, it is 0. */ + mrec += extra_size; + + /* The ROW_T_DELETE record was converted by + rec_convert_dtuple_to_temp() using new_index. */ + ut_ad(!new_index->is_instant()); + rec_offs_set_n_fields(offsets, new_index->first_user_field()); + rec_init_offsets_temp(mrec, new_index, offsets); + next_mrec = mrec + rec_offs_data_size(offsets); + if (next_mrec > mrec_end) { + return(NULL); + } + + log->head.total += ulint(next_mrec - mrec_start); + + *error = row_log_table_apply_delete( + new_trx_id_col, + mrec, offsets, offsets_heap, heap, log); + break; + + case ROW_T_UPDATE: + /* Logically, the log entry consists of the + (PRIMARY KEY,DB_TRX_ID) of the old value (converted + to the new primary key definition) followed by + the new value in the old table definition. If the + definition of the columns belonging to PRIMARY KEY + is not changed, the log will only contain + DB_TRX_ID,new_row. */ + + if (log->same_pk) { + ut_ad(new_index->n_uniq == dup->index->n_uniq); + + extra_size = *mrec++; + + if (extra_size >= 0x80) { + /* Read another byte of extra_size. 
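+
+				Values below 0x80 fit in the first
+				byte alone; larger values set its high
+				bit, so for example extra_size = 0x1234
+				is stored as the two bytes 0x92 0x34
+				(0x80 | (0x1234 >> 8), then
+				0x1234 & 0xff).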
*/ + + extra_size = (extra_size & 0x7f) << 8; + extra_size |= *mrec++; + } + + mrec += extra_size; + + ut_ad(extra_size || !is_instant); + + if (mrec > mrec_end) { + return(NULL); + } + + rec_offs_set_n_fields(offsets, dup->index->n_fields); + rec_init_offsets_temp(mrec, dup->index, offsets, + log->n_core_fields, + log->non_core_fields, + is_instant + ? static_cast<rec_comp_status_t>( + *(mrec - extra_size)) + : REC_STATUS_ORDINARY); + + next_mrec = mrec + rec_offs_data_size(offsets); + + if (next_mrec > mrec_end) { + return(NULL); + } + + old_pk = dtuple_create(heap, new_index->n_uniq); + dict_index_copy_types( + old_pk, new_index, old_pk->n_fields); + + /* Copy the PRIMARY KEY fields from mrec to old_pk. */ + for (ulint i = 0; i < new_index->n_uniq; i++) { + const void* field; + ulint len; + dfield_t* dfield; + + ut_ad(!rec_offs_nth_extern(offsets, i)); + + field = rec_get_nth_field( + mrec, offsets, i, &len); + ut_ad(len != UNIV_SQL_NULL); + + dfield = dtuple_get_nth_field(old_pk, i); + dfield_set_data(dfield, field, len); + } + } else { + /* We assume extra_size < 0x100 + for the PRIMARY KEY prefix. */ + mrec += *mrec + 1; + + if (mrec > mrec_end) { + return(NULL); + } + + /* Get offsets for PRIMARY KEY, + DB_TRX_ID, DB_ROLL_PTR. */ + /* The old_pk prefix was converted by + rec_convert_dtuple_to_temp() using new_index. */ + ut_ad(!new_index->is_instant()); + rec_offs_set_n_fields(offsets, + new_index->first_user_field()); + rec_init_offsets_temp(mrec, new_index, offsets); + + next_mrec = mrec + rec_offs_data_size(offsets); + if (next_mrec + 2 > mrec_end) { + return(NULL); + } + + /* Copy the PRIMARY KEY fields and + DB_TRX_ID, DB_ROLL_PTR from mrec to old_pk. */ + old_pk = dtuple_create(heap, + new_index->first_user_field()); + dict_index_copy_types(old_pk, new_index, + old_pk->n_fields); + + for (ulint i = 0; i < new_index->first_user_field(); + i++) { + const void* field; + ulint len; + dfield_t* dfield; + + ut_ad(!rec_offs_nth_extern(offsets, i)); + + field = rec_get_nth_field( + mrec, offsets, i, &len); + ut_ad(len != UNIV_SQL_NULL); + + dfield = dtuple_get_nth_field(old_pk, i); + dfield_set_data(dfield, field, len); + } + + mrec = next_mrec; + + /* Fetch the new value of the row as it was + in the old table definition. */ + extra_size = *mrec++; + + if (extra_size >= 0x80) { + /* Read another byte of extra_size. */ + + extra_size = (extra_size & 0x7f) << 8; + extra_size |= *mrec++; + } + + mrec += extra_size; + + ut_ad(extra_size || !is_instant); + + if (mrec > mrec_end) { + return(NULL); + } + + rec_offs_set_n_fields(offsets, dup->index->n_fields); + rec_init_offsets_temp(mrec, dup->index, offsets, + log->n_core_fields, + log->non_core_fields, + is_instant + ? static_cast<rec_comp_status_t>( + *(mrec - extra_size)) + : REC_STATUS_ORDINARY); + + next_mrec = mrec + rec_offs_data_size(offsets); + + if (next_mrec > mrec_end) { + return(NULL); + } + } + + ut_ad(next_mrec <= mrec_end); + log->head.total += ulint(next_mrec - mrec_start); + dtuple_set_n_fields_cmp(old_pk, new_index->n_uniq); + + *error = row_log_table_apply_update( + thr, new_trx_id_col, + mrec, offsets, offsets_heap, heap, dup, old_pk); + break; + } + + ut_ad(log->head.total <= log->tail.total); + mem_heap_empty(offsets_heap); + mem_heap_empty(heap); + return(next_mrec); +} + +#ifdef HAVE_PSI_STAGE_INTERFACE +/** Estimate how much an ALTER TABLE progress should be incremented per +one block of log applied. +For the other phases of ALTER TABLE we increment the progress with 1 per +page processed. 
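+With the defaults cited in the code below (16KiB pages, 1MiB sort
+buffer), one block counts as 1MiB / 16KiB = 64 pages, so one block of
+applied log adds 64 * 6 = 384 abstract units.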
+@return amount of abstract units to add to work_completed when one block
+of log is applied.
+*/
+inline
+ulint
+row_log_progress_inc_per_block()
+{
+	/* We must increment the progress once per page (as in
+	srv_page_size, default = innodb_page_size=16KiB).
+	One block here is srv_sort_buf_size (usually 1MiB). */
+	const ulint	pages_per_block = std::max<ulint>(
+		ulint(srv_sort_buf_size >> srv_page_size_shift), 1);
+
+	/* Multiply by an artificial factor of 6 to even the pace with
+	the rest of the ALTER TABLE phases, which process page_size
+	amount of data faster. */
+	return(pages_per_block * 6);
+}
+
+/** Estimate how much work is to be done by the log apply phase
+of an ALTER TABLE for this index.
+@param[in]	index	index whose log to assess
+@return work to be done by log-apply in abstract units
+*/
+ulint
+row_log_estimate_work(
+	const dict_index_t*	index)
+{
+	if (index == NULL || index->online_log == NULL) {
+		return(0);
+	}
+
+	const row_log_t*	l = index->online_log;
+	const ulint	bytes_left =
+		static_cast<ulint>(l->tail.total - l->head.total);
+	const ulint	blocks_left = bytes_left / srv_sort_buf_size;
+
+	return(blocks_left * row_log_progress_inc_per_block());
+}
+#else /* HAVE_PSI_STAGE_INTERFACE */
+inline
+ulint
+row_log_progress_inc_per_block()
+{
+	return(0);
+}
+#endif /* HAVE_PSI_STAGE_INTERFACE */
+
+/** Applies operations to a table that was rebuilt.
+@param[in]	thr	query graph
+@param[in,out]	dup	for reporting duplicate key errors
+@param[in,out]	stage	performance schema accounting object, used by
+ALTER TABLE. If not NULL, then stage->inc() will be called for each block
+of log that is applied.
+@return DB_SUCCESS, or error code on failure */
+static MY_ATTRIBUTE((warn_unused_result))
+dberr_t
+row_log_table_apply_ops(
+	que_thr_t*		thr,
+	row_merge_dup_t*	dup,
+	ut_stage_alter_t*	stage)
+{
+	dberr_t		error;
+	const mrec_t*	mrec		= NULL;
+	const mrec_t*	next_mrec;
+	const mrec_t*	mrec_end	= NULL; /* silence bogus warning */
+	const mrec_t*	next_mrec_end;
+	mem_heap_t*	heap;
+	mem_heap_t*	offsets_heap;
+	rec_offs*	offsets;
+	bool		has_index_lock;
+	dict_index_t*	index		= const_cast<dict_index_t*>(
+		dup->index);
+	dict_table_t*	new_table	= index->online_log->table;
+	dict_index_t*	new_index	= dict_table_get_first_index(
+		new_table);
+	const ulint	i		= 1 + REC_OFFS_HEADER_SIZE
+		+ std::max<ulint>(index->n_fields,
+				  new_index->first_user_field());
+	const ulint	new_trx_id_col	= dict_col_get_clust_pos(
+		dict_table_get_sys_col(new_table, DATA_TRX_ID), new_index);
+	trx_t*		trx		= thr_get_trx(thr);
+
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(dict_index_is_online_ddl(index));
+	ut_ad(trx->mysql_thd);
+	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
+	ut_ad(!dict_index_is_online_ddl(new_index));
+	ut_ad(dict_col_get_clust_pos(
+		      dict_table_get_sys_col(index->table, DATA_TRX_ID), index)
+	      != ULINT_UNDEFINED);
+	ut_ad(new_trx_id_col > 0);
+	ut_ad(new_trx_id_col != ULINT_UNDEFINED);
+
+	MEM_UNDEFINED(&mrec_end, sizeof mrec_end);
+
+	offsets = static_cast<rec_offs*>(ut_malloc_nokey(i * sizeof *offsets));
+	rec_offs_set_n_alloc(offsets, i);
+	rec_offs_set_n_fields(offsets, dict_index_get_n_fields(index));
+
+	heap = mem_heap_create(srv_page_size);
+	offsets_heap = mem_heap_create(srv_page_size);
+	has_index_lock = true;
+
+next_block:
+	ut_ad(has_index_lock);
+	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
+	ut_ad(index->online_log->head.bytes == 0);
+
+	stage->inc(row_log_progress_inc_per_block());
+
+	if (trx_is_interrupted(trx)) {
+		goto interrupted;
+	}
+
+	
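+	/* Overview of one pass of this block loop (a summary of the
+	code below): if head.blocks < tail.blocks, release index->lock,
+	read block head.blocks back from the temporary file and apply
+	it; once head.blocks == tail.blocks, keep index->lock and apply
+	the in-memory tail.block directly, which lets us catch up and
+	finish. */
+	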
if (index->is_corrupted()) { + error = DB_INDEX_CORRUPT; + goto func_exit; + } + + ut_ad(dict_index_is_online_ddl(index)); + + error = index->online_log->error; + + if (error != DB_SUCCESS) { + goto func_exit; + } + + if (UNIV_UNLIKELY(index->online_log->head.blocks + > index->online_log->tail.blocks)) { +unexpected_eof: + ib::error() << "Unexpected end of temporary file for table " + << index->table->name; +corruption: + error = DB_CORRUPTION; + goto func_exit; + } + + if (index->online_log->head.blocks + == index->online_log->tail.blocks) { + if (index->online_log->head.blocks) { +#ifdef HAVE_FTRUNCATE + /* Truncate the file in order to save space. */ + if (index->online_log->fd > 0 + && ftruncate(index->online_log->fd, 0) == -1) { + ib::error() + << "\'" << index->name + 1 + << "\' failed with error " + << errno << ":" << strerror(errno); + + goto corruption; + } +#endif /* HAVE_FTRUNCATE */ + index->online_log->head.blocks + = index->online_log->tail.blocks = 0; + } + + next_mrec = index->online_log->tail.block; + next_mrec_end = next_mrec + index->online_log->tail.bytes; + + if (next_mrec_end == next_mrec) { + /* End of log reached. */ +all_done: + ut_ad(has_index_lock); + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->tail.blocks == 0); + index->online_log->head.bytes = 0; + index->online_log->tail.bytes = 0; + error = DB_SUCCESS; + goto func_exit; + } + } else { + os_offset_t ofs; + + ofs = (os_offset_t) index->online_log->head.blocks + * srv_sort_buf_size; + + ut_ad(has_index_lock); + has_index_lock = false; + rw_lock_x_unlock(dict_index_get_lock(index)); + + log_free_check(); + + ut_ad(dict_index_is_online_ddl(index)); + + if (!row_log_block_allocate(index->online_log->head)) { + error = DB_OUT_OF_MEMORY; + goto func_exit; + } + + byte* buf = index->online_log->head.block; + + if (os_file_read_no_error_handling( + IORequestRead, index->online_log->fd, + buf, ofs, srv_sort_buf_size, 0) != DB_SUCCESS) { + ib::error() + << "Unable to read temporary file" + " for table " << index->table->name; + goto corruption; + } + + if (log_tmp_is_encrypted()) { + if (!log_tmp_block_decrypt( + buf, srv_sort_buf_size, + index->online_log->crypt_head, ofs)) { + error = DB_DECRYPTION_FAILED; + goto func_exit; + } + + srv_stats.n_rowlog_blocks_decrypted.inc(); + memcpy(buf, index->online_log->crypt_head, + srv_sort_buf_size); + } + +#ifdef POSIX_FADV_DONTNEED + /* Each block is read exactly once. Free up the file cache. */ + posix_fadvise(index->online_log->fd, + ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED); +#endif /* POSIX_FADV_DONTNEED */ + + next_mrec = index->online_log->head.block; + next_mrec_end = next_mrec + srv_sort_buf_size; + } + + /* This read is not protected by index->online_log->mutex for + performance reasons. We will eventually notice any error that + was flagged by a DML thread. */ + error = index->online_log->error; + + if (error != DB_SUCCESS) { + goto func_exit; + } + + if (mrec) { + /* A partial record was read from the previous block. + Copy the temporary buffer full, as we do not know the + length of the record. Parse subsequent records from + the bigger buffer index->online_log->head.block + or index->online_log->tail.block. 
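+
+		For example, if only the first 120 bytes of a 300-byte
+		record were present at the end of the previous block,
+		those bytes are already in head.buf; the memcpy below
+		appends the start of the new block, the record is
+		re-parsed from head.buf, and head.bytes becomes the 180
+		bytes that were consumed from the new block.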
*/ + + ut_ad(mrec == index->online_log->head.buf); + ut_ad(mrec_end > mrec); + ut_ad(mrec_end < (&index->online_log->head.buf)[1]); + + memcpy((mrec_t*) mrec_end, next_mrec, + ulint((&index->online_log->head.buf)[1] - mrec_end)); + mrec = row_log_table_apply_op( + thr, new_trx_id_col, + dup, &error, offsets_heap, heap, + index->online_log->head.buf, + (&index->online_log->head.buf)[1], offsets); + if (error != DB_SUCCESS) { + goto func_exit; + } else if (UNIV_UNLIKELY(mrec == NULL)) { + /* The record was not reassembled properly. */ + goto corruption; + } + /* The record was previously found out to be + truncated. Now that the parse buffer was extended, + it should proceed beyond the old end of the buffer. */ + ut_a(mrec > mrec_end); + + index->online_log->head.bytes = ulint(mrec - mrec_end); + next_mrec += index->online_log->head.bytes; + } + + ut_ad(next_mrec <= next_mrec_end); + /* The following loop must not be parsing the temporary + buffer, but head.block or tail.block. */ + + /* mrec!=NULL means that the next record starts from the + middle of the block */ + ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0)); + +#ifdef UNIV_DEBUG + if (next_mrec_end == index->online_log->head.block + + srv_sort_buf_size) { + /* If tail.bytes == 0, next_mrec_end can also be at + the end of tail.block. */ + if (index->online_log->tail.bytes == 0) { + ut_ad(next_mrec == next_mrec_end); + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->head.bytes == 0); + } else { + ut_ad(next_mrec == index->online_log->head.block + + index->online_log->head.bytes); + ut_ad(index->online_log->tail.blocks + > index->online_log->head.blocks); + } + } else if (next_mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes) { + ut_ad(next_mrec == index->online_log->tail.block + + index->online_log->head.bytes); + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->head.bytes + <= index->online_log->tail.bytes); + } else { + ut_error; + } +#endif /* UNIV_DEBUG */ + + mrec_end = next_mrec_end; + + while (!trx_is_interrupted(trx)) { + mrec = next_mrec; + ut_ad(mrec <= mrec_end); + + if (mrec == mrec_end) { + /* We are at the end of the log. + Mark the replay all_done. */ + if (has_index_lock) { + goto all_done; + } + } + + if (!has_index_lock) { + /* We are applying operations from a different + block than the one that is being written to. + We do not hold index->lock in order to + allow other threads to concurrently buffer + modifications. */ + ut_ad(mrec >= index->online_log->head.block); + ut_ad(mrec_end == index->online_log->head.block + + srv_sort_buf_size); + ut_ad(index->online_log->head.bytes + < srv_sort_buf_size); + + /* Take the opportunity to do a redo log + checkpoint if needed. */ + log_free_check(); + } else { + /* We are applying operations from the last block. + Do not allow other threads to buffer anything, + so that we can finally catch up and synchronize. */ + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes); + ut_ad(mrec >= index->online_log->tail.block); + } + + /* This read is not protected by index->online_log->mutex + for performance reasons. We will eventually notice any + error that was flagged by a DML thread. 
*/ + error = index->online_log->error; + + if (error != DB_SUCCESS) { + goto func_exit; + } + + next_mrec = row_log_table_apply_op( + thr, new_trx_id_col, + dup, &error, offsets_heap, heap, + mrec, mrec_end, offsets); + + if (error != DB_SUCCESS) { + goto func_exit; + } else if (next_mrec == next_mrec_end) { + /* The record happened to end on a block boundary. + Do we have more blocks left? */ + if (has_index_lock) { + /* The index will be locked while + applying the last block. */ + goto all_done; + } + + mrec = NULL; +process_next_block: + rw_lock_x_lock(dict_index_get_lock(index)); + has_index_lock = true; + + index->online_log->head.bytes = 0; + index->online_log->head.blocks++; + goto next_block; + } else if (next_mrec != NULL) { + ut_ad(next_mrec < next_mrec_end); + index->online_log->head.bytes + += ulint(next_mrec - mrec); + } else if (has_index_lock) { + /* When mrec is within tail.block, it should + be a complete record, because we are holding + index->lock and thus excluding the writer. */ + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes); + ut_ad(0); + goto unexpected_eof; + } else { + memcpy(index->online_log->head.buf, mrec, + ulint(mrec_end - mrec)); + mrec_end += ulint(index->online_log->head.buf - mrec); + mrec = index->online_log->head.buf; + goto process_next_block; + } + } + +interrupted: + error = DB_INTERRUPTED; +func_exit: + if (!has_index_lock) { + rw_lock_x_lock(dict_index_get_lock(index)); + } + + mem_heap_free(offsets_heap); + mem_heap_free(heap); + row_log_block_free(index->online_log->head); + ut_free(offsets); + return(error); +} + +/** Apply the row_log_table log to a table upon completing rebuild. +@param[in] thr query graph +@param[in] old_table old table +@param[in,out] table MySQL table (for reporting duplicates) +@param[in,out] stage performance schema accounting object, used by +ALTER TABLE. stage->begin_phase_log_table() will be called initially and then +stage->inc() will be called for each block of log that is applied. +@param[in] new_table Altered table +@return DB_SUCCESS, or error code on failure */ +dberr_t +row_log_table_apply( + que_thr_t* thr, + dict_table_t* old_table, + struct TABLE* table, + ut_stage_alter_t* stage, + dict_table_t* new_table) +{ + dberr_t error; + dict_index_t* clust_index; + + thr_get_trx(thr)->error_key_num = 0; + DBUG_EXECUTE_IF("innodb_trx_duplicates", + thr_get_trx(thr)->duplicates = TRX_DUP_REPLACE;); + + stage->begin_phase_log_table(); + + ut_ad(!rw_lock_own(&dict_sys.latch, RW_LOCK_S)); + clust_index = dict_table_get_first_index(old_table); + + if (clust_index->online_log->n_rows == 0) { + clust_index->online_log->n_rows = new_table->stat_n_rows; + } + + rw_lock_x_lock(dict_index_get_lock(clust_index)); + + if (!clust_index->online_log) { + ut_ad(dict_index_get_online_status(clust_index) + == ONLINE_INDEX_COMPLETE); + /* This function should not be called unless + rebuilding a table online. Build in some fault + tolerance. 
*/ + ut_ad(0); + error = DB_ERROR; + } else { + row_merge_dup_t dup = { + clust_index, table, + clust_index->online_log->col_map, 0 + }; + + error = row_log_table_apply_ops(thr, &dup, stage); + + ut_ad(error != DB_SUCCESS + || clust_index->online_log->head.total + == clust_index->online_log->tail.total); + } + + rw_lock_x_unlock(dict_index_get_lock(clust_index)); + DBUG_EXECUTE_IF("innodb_trx_duplicates", + thr_get_trx(thr)->duplicates = 0;); + + return(error); +} + +/******************************************************//** +Allocate the row log for an index and flag the index +for online creation. +@retval true if success, false if not */ +bool +row_log_allocate( +/*=============*/ + const trx_t* trx, /*!< in: the ALTER TABLE transaction */ + dict_index_t* index, /*!< in/out: index */ + dict_table_t* table, /*!< in/out: new table being rebuilt, + or NULL when creating a secondary index */ + bool same_pk,/*!< in: whether the definition of the + PRIMARY KEY has remained the same */ + const dtuple_t* defaults, + /*!< in: default values of + added, changed columns, or NULL */ + const ulint* col_map,/*!< in: mapping of old column + numbers to new ones, or NULL if !table */ + const char* path, /*!< in: where to create temporary file */ + const TABLE* old_table, /*!< in: table definition before alter */ + const bool allow_not_null) /*!< in: allow null to not-null + conversion */ +{ + row_log_t* log; + DBUG_ENTER("row_log_allocate"); + + ut_ad(!dict_index_is_online_ddl(index)); + ut_ad(dict_index_is_clust(index) == !!table); + ut_ad(!table || index->table != table); + ut_ad(same_pk || table); + ut_ad(!table || col_map); + ut_ad(!defaults || col_map); + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)); + ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE)); + ut_ad(trx->id); + + log = static_cast<row_log_t*>(ut_malloc_nokey(sizeof *log)); + + if (log == NULL) { + DBUG_RETURN(false); + } + + log->fd = OS_FILE_CLOSED; + mutex_create(LATCH_ID_INDEX_ONLINE_LOG, &log->mutex); + + log->blobs = NULL; + log->table = table; + log->same_pk = same_pk; + log->defaults = defaults; + log->col_map = col_map; + log->error = DB_SUCCESS; + log->min_trx = trx->id; + log->max_trx = 0; + log->tail.blocks = log->tail.bytes = 0; + log->tail.total = 0; + log->tail.block = log->head.block = NULL; + log->crypt_tail = log->crypt_head = NULL; + log->head.blocks = log->head.bytes = 0; + log->head.total = 0; + log->path = path; + log->n_core_fields = index->n_core_fields; + ut_ad(!table || log->is_instant(index) + == (index->n_core_fields < index->n_fields)); + log->allow_not_null = allow_not_null; + log->old_table = old_table; + log->n_rows = 0; + + if (table && index->is_instant()) { + const unsigned n = log->n_core_fields; + log->non_core_fields = UT_NEW_ARRAY_NOKEY( + dict_col_t::def_t, index->n_fields - n); + for (unsigned i = n; i < index->n_fields; i++) { + log->non_core_fields[i - n] + = index->fields[i].col->def_val; + } + } else { + log->non_core_fields = NULL; + } + + dict_index_set_online_status(index, ONLINE_INDEX_CREATION); + + if (log_tmp_is_encrypted()) { + log->crypt_head_size = log->crypt_tail_size = srv_sort_buf_size; + log->crypt_head = static_cast<byte *>( + my_large_malloc(&log->crypt_head_size, MYF(MY_WME))); + log->crypt_tail = static_cast<byte *>( + my_large_malloc(&log->crypt_tail_size, MYF(MY_WME))); + + if (!log->crypt_head || !log->crypt_tail) { + row_log_free(log); + DBUG_RETURN(false); + } + } + + index->online_log = log; + /* While we might be holding an exclusive data dictionary lock + here, 
in row_log_abort_sec() we will not always be holding it. Use + atomic operations in both cases. */ + MONITOR_ATOMIC_INC(MONITOR_ONLINE_CREATE_INDEX); + + DBUG_RETURN(true); +} + +/******************************************************//** +Free the row log for an index that was being created online. */ +void +row_log_free( +/*=========*/ + row_log_t* log) /*!< in,own: row log */ +{ + MONITOR_ATOMIC_DEC(MONITOR_ONLINE_CREATE_INDEX); + + UT_DELETE(log->blobs); + UT_DELETE_ARRAY(log->non_core_fields); + row_log_block_free(log->tail); + row_log_block_free(log->head); + row_merge_file_destroy_low(log->fd); + + if (log->crypt_head) { + my_large_free(log->crypt_head, log->crypt_head_size); + } + + if (log->crypt_tail) { + my_large_free(log->crypt_tail, log->crypt_tail_size); + } + + mutex_free(&log->mutex); + ut_free(log); +} + +/******************************************************//** +Get the latest transaction ID that has invoked row_log_online_op() +during online creation. +@return latest transaction ID, or 0 if nothing was logged */ +trx_id_t +row_log_get_max_trx( +/*================*/ + dict_index_t* index) /*!< in: index, must be locked */ +{ + ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_CREATION); + + ut_ad((rw_lock_own(dict_index_get_lock(index), RW_LOCK_S) + && mutex_own(&index->online_log->mutex)) + || rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)); + + return(index->online_log->max_trx); +} + +/******************************************************//** +Applies an operation to a secondary index that was being created. */ +static MY_ATTRIBUTE((nonnull)) +void +row_log_apply_op_low( +/*=================*/ + dict_index_t* index, /*!< in/out: index */ + row_merge_dup_t*dup, /*!< in/out: for reporting + duplicate key errors */ + dberr_t* error, /*!< out: DB_SUCCESS or error code */ + mem_heap_t* offsets_heap, /*!< in/out: memory heap for + allocating offsets; can be emptied */ + bool has_index_lock, /*!< in: true if holding index->lock + in exclusive mode */ + enum row_op op, /*!< in: operation being applied */ + trx_id_t trx_id, /*!< in: transaction identifier */ + const dtuple_t* entry) /*!< in: row */ +{ + mtr_t mtr; + btr_cur_t cursor; + rec_offs* offsets = NULL; + + ut_ad(!dict_index_is_clust(index)); + + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X) + == has_index_lock); + + ut_ad(!index->is_corrupted()); + ut_ad(trx_id != 0 || op == ROW_OP_DELETE); + + DBUG_LOG("ib_create_index", + (op == ROW_OP_INSERT ? "insert " : "delete ") + << (has_index_lock ? "locked index " : "unlocked index ") + << index->id << ',' << ib::hex(trx_id) << ": " + << rec_printer(entry).str()); + + mtr_start(&mtr); + index->set_modified(mtr); + + /* We perform the pessimistic variant of the operations if we + already hold index->lock exclusively. First, search the + record. The operation may already have been performed, + depending on when the row in the clustered index was + scanned. */ + btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE, + has_index_lock + ? BTR_MODIFY_TREE + : BTR_MODIFY_LEAF, + &cursor, 0, __FILE__, __LINE__, + &mtr); + + ut_ad(dict_index_get_n_unique(index) > 0); + /* This test is somewhat similar to row_ins_must_modify_rec(), + but not identical for unique secondary indexes. */ + if (cursor.low_match >= dict_index_get_n_unique(index) + && !page_rec_is_infimum(btr_cur_get_rec(&cursor))) { + /* We have a matching record. 
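+
+	cursor.low_match >= n_unique only guarantees that the unique
+	key columns match; the record counts as an exact match
+	("exists") when all n_fields match. For example, in a unique
+	secondary index on (a) with PRIMARY KEY (id), the entry
+	(a=5, id=7) landing on an existing record (a=5, id=3) gives
+	low_match == 1 == n_unique but exists == false, because the
+	PRIMARY KEY suffix differs.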
*/ + bool exists = (cursor.low_match + == dict_index_get_n_fields(index)); +#ifdef UNIV_DEBUG + rec_t* rec = btr_cur_get_rec(&cursor); + ut_ad(page_rec_is_user_rec(rec)); + ut_ad(!rec_get_deleted_flag(rec, page_rec_is_comp(rec))); +#endif /* UNIV_DEBUG */ + + ut_ad(exists || dict_index_is_unique(index)); + + switch (op) { + case ROW_OP_DELETE: + if (!exists) { + /* The existing record matches the + unique secondary index key, but the + PRIMARY KEY columns differ. So, this + exact record does not exist. For + example, we could detect a duplicate + key error in some old index before + logging an ROW_OP_INSERT for our + index. This ROW_OP_DELETE could have + been logged for rolling back + TRX_UNDO_INSERT_REC. */ + goto func_exit; + } + + if (btr_cur_optimistic_delete( + &cursor, BTR_CREATE_FLAG, &mtr)) { + *error = DB_SUCCESS; + break; + } + + if (!has_index_lock) { + /* This needs a pessimistic operation. + Lock the index tree exclusively. */ + mtr_commit(&mtr); + mtr_start(&mtr); + index->set_modified(mtr); + btr_cur_search_to_nth_level( + index, 0, entry, PAGE_CUR_LE, + BTR_MODIFY_TREE, &cursor, 0, + __FILE__, __LINE__, &mtr); + + /* No other thread than the current one + is allowed to modify the index tree. + Thus, the record should still exist. */ + ut_ad(cursor.low_match + >= dict_index_get_n_fields(index)); + ut_ad(page_rec_is_user_rec( + btr_cur_get_rec(&cursor))); + } + + /* As there are no externally stored fields in + a secondary index record, the parameter + rollback=false will be ignored. */ + + btr_cur_pessimistic_delete( + error, FALSE, &cursor, + BTR_CREATE_FLAG, false, &mtr); + break; + case ROW_OP_INSERT: + if (exists) { + /* The record already exists. There + is nothing to be inserted. + This could happen when processing + TRX_UNDO_DEL_MARK_REC in statement + rollback: + + UPDATE of PRIMARY KEY can lead to + statement rollback if the updated + value of the PRIMARY KEY already + exists. In this case, the UPDATE would + be mapped to DELETE;INSERT, and we + only wrote undo log for the DELETE + part. The duplicate key error would be + triggered before logging the INSERT + part. + + Theoretically, we could also get a + similar situation when a DELETE operation + is blocked by a FOREIGN KEY constraint. */ + goto func_exit; + } + + if (dtuple_contains_null(entry)) { + /* The UNIQUE KEY columns match, but + there is a NULL value in the key, and + NULL!=NULL. */ + goto insert_the_rec; + } + + goto duplicate; + } + } else { + switch (op) { + rec_t* rec; + big_rec_t* big_rec; + case ROW_OP_DELETE: + /* The record does not exist. For example, we + could detect a duplicate key error in some old + index before logging an ROW_OP_INSERT for our + index. This ROW_OP_DELETE could be logged for + rolling back TRX_UNDO_INSERT_REC. */ + goto func_exit; + case ROW_OP_INSERT: + if (dict_index_is_unique(index) + && (cursor.up_match + >= dict_index_get_n_unique(index) + || cursor.low_match + >= dict_index_get_n_unique(index)) + && (!index->n_nullable + || !dtuple_contains_null(entry))) { +duplicate: + /* Duplicate key */ + ut_ad(dict_index_is_unique(index)); + row_merge_dup_report(dup, entry->fields); + *error = DB_DUPLICATE_KEY; + goto func_exit; + } +insert_the_rec: + /* Insert the record. As we are inserting into + a secondary index, there cannot be externally + stored columns (!big_rec). 
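+
+			We first try btr_cur_optimistic_insert(), which
+			returns DB_FAIL when the record does not fit on
+			the leaf page; only then do we pay for the
+			pessimistic variant that may split the tree.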
*/ + *error = btr_cur_optimistic_insert( + BTR_NO_UNDO_LOG_FLAG + | BTR_NO_LOCKING_FLAG + | BTR_CREATE_FLAG, + &cursor, &offsets, &offsets_heap, + const_cast<dtuple_t*>(entry), + &rec, &big_rec, 0, NULL, &mtr); + ut_ad(!big_rec); + if (*error != DB_FAIL) { + break; + } + + if (!has_index_lock) { + /* This needs a pessimistic operation. + Lock the index tree exclusively. */ + mtr_commit(&mtr); + mtr_start(&mtr); + index->set_modified(mtr); + btr_cur_search_to_nth_level( + index, 0, entry, PAGE_CUR_LE, + BTR_MODIFY_TREE, &cursor, 0, + __FILE__, __LINE__, &mtr); + } + + /* We already determined that the + record did not exist. No other thread + than the current one is allowed to + modify the index tree. Thus, the + record should still not exist. */ + + *error = btr_cur_pessimistic_insert( + BTR_NO_UNDO_LOG_FLAG + | BTR_NO_LOCKING_FLAG + | BTR_CREATE_FLAG, + &cursor, &offsets, &offsets_heap, + const_cast<dtuple_t*>(entry), + &rec, &big_rec, + 0, NULL, &mtr); + ut_ad(!big_rec); + break; + } + mem_heap_empty(offsets_heap); + } + + if (*error == DB_SUCCESS && trx_id) { + page_update_max_trx_id(btr_cur_get_block(&cursor), + btr_cur_get_page_zip(&cursor), + trx_id, &mtr); + } + +func_exit: + mtr_commit(&mtr); +} + +/******************************************************//** +Applies an operation to a secondary index that was being created. +@return NULL on failure (mrec corruption) or when out of data; +pointer to next record on success */ +static MY_ATTRIBUTE((nonnull, warn_unused_result)) +const mrec_t* +row_log_apply_op( +/*=============*/ + dict_index_t* index, /*!< in/out: index */ + row_merge_dup_t*dup, /*!< in/out: for reporting + duplicate key errors */ + dberr_t* error, /*!< out: DB_SUCCESS or error code */ + mem_heap_t* offsets_heap, /*!< in/out: memory heap for + allocating offsets; can be emptied */ + mem_heap_t* heap, /*!< in/out: memory heap for + allocating data tuples */ + bool has_index_lock, /*!< in: true if holding index->lock + in exclusive mode */ + const mrec_t* mrec, /*!< in: merge record */ + const mrec_t* mrec_end, /*!< in: end of buffer */ + rec_offs* offsets) /*!< in/out: work area for + rec_init_offsets_temp() */ + +{ + enum row_op op; + ulint extra_size; + ulint data_size; + dtuple_t* entry; + trx_id_t trx_id; + + /* Online index creation is only used for secondary indexes. */ + ut_ad(!dict_index_is_clust(index)); + + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X) + == has_index_lock); + + if (index->is_corrupted()) { + *error = DB_INDEX_CORRUPT; + return(NULL); + } + + *error = DB_SUCCESS; + + if (mrec + ROW_LOG_HEADER_SIZE >= mrec_end) { + return(NULL); + } + + switch (*mrec) { + case ROW_OP_INSERT: + if (ROW_LOG_HEADER_SIZE + DATA_TRX_ID_LEN + mrec >= mrec_end) { + return(NULL); + } + + op = static_cast<enum row_op>(*mrec++); + trx_id = trx_read_trx_id(mrec); + mrec += DATA_TRX_ID_LEN; + break; + case ROW_OP_DELETE: + op = static_cast<enum row_op>(*mrec++); + trx_id = 0; + break; + default: +corrupted: + ut_ad(0); + *error = DB_CORRUPTION; + return(NULL); + } + + extra_size = *mrec++; + + ut_ad(mrec < mrec_end); + + if (extra_size >= 0x80) { + /* Read another byte of extra_size. */ + + extra_size = (extra_size & 0x7f) << 8; + extra_size |= *mrec++; + } + + mrec += extra_size; + + if (mrec > mrec_end) { + return(NULL); + } + + rec_init_offsets_temp(mrec, index, offsets); + + if (rec_offs_any_extern(offsets)) { + /* There should never be any externally stored fields + in a secondary index, which is what online index + creation is used for. 
Therefore, the log file must be + corrupted. */ + goto corrupted; + } + + data_size = rec_offs_data_size(offsets); + + mrec += data_size; + + if (mrec > mrec_end) { + return(NULL); + } + + entry = row_rec_to_index_entry_low( + mrec - data_size, index, offsets, heap); + /* Online index creation is only implemented for secondary + indexes, which never contain off-page columns. */ + ut_ad(dtuple_get_n_ext(entry) == 0); + + row_log_apply_op_low(index, dup, error, offsets_heap, + has_index_lock, op, trx_id, entry); + return(mrec); +} + +/** Applies operations to a secondary index that was being created. +@param[in] trx transaction (for checking if the operation was +interrupted) +@param[in,out] index index +@param[in,out] dup for reporting duplicate key errors +@param[in,out] stage performance schema accounting object, used by +ALTER TABLE. If not NULL, then stage->inc() will be called for each block +of log that is applied. +@return DB_SUCCESS, or error code on failure */ +static +dberr_t +row_log_apply_ops( + const trx_t* trx, + dict_index_t* index, + row_merge_dup_t* dup, + ut_stage_alter_t* stage) +{ + dberr_t error; + const mrec_t* mrec = NULL; + const mrec_t* next_mrec; + const mrec_t* mrec_end= NULL; /* silence bogus warning */ + const mrec_t* next_mrec_end; + mem_heap_t* offsets_heap; + mem_heap_t* heap; + rec_offs* offsets; + bool has_index_lock; + const ulint i = 1 + REC_OFFS_HEADER_SIZE + + dict_index_get_n_fields(index); + + ut_ad(dict_index_is_online_ddl(index)); + ut_ad(!index->is_committed()); + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)); + ut_ad(index->online_log); + + MEM_UNDEFINED(&mrec_end, sizeof mrec_end); + + offsets = static_cast<rec_offs*>(ut_malloc_nokey(i * sizeof *offsets)); + rec_offs_set_n_alloc(offsets, i); + rec_offs_set_n_fields(offsets, dict_index_get_n_fields(index)); + + offsets_heap = mem_heap_create(srv_page_size); + heap = mem_heap_create(srv_page_size); + has_index_lock = true; + +next_block: + ut_ad(has_index_lock); + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)); + ut_ad(index->online_log->head.bytes == 0); + + stage->inc(row_log_progress_inc_per_block()); + + if (trx_is_interrupted(trx)) { + goto interrupted; + } + + error = index->online_log->error; + if (error != DB_SUCCESS) { + goto func_exit; + } + + if (index->is_corrupted()) { + error = DB_INDEX_CORRUPT; + goto func_exit; + } + + if (UNIV_UNLIKELY(index->online_log->head.blocks + > index->online_log->tail.blocks)) { +unexpected_eof: + ib::error() << "Unexpected end of temporary file for index " + << index->name; +corruption: + error = DB_CORRUPTION; + goto func_exit; + } + + if (index->online_log->head.blocks + == index->online_log->tail.blocks) { + if (index->online_log->head.blocks) { +#ifdef HAVE_FTRUNCATE + /* Truncate the file in order to save space. */ + if (index->online_log->fd > 0 + && ftruncate(index->online_log->fd, 0) == -1) { + ib::error() + << "\'" << index->name + 1 + << "\' failed with error " + << errno << ":" << strerror(errno); + + goto corruption; + } +#endif /* HAVE_FTRUNCATE */ + index->online_log->head.blocks + = index->online_log->tail.blocks = 0; + } + + next_mrec = index->online_log->tail.block; + next_mrec_end = next_mrec + index->online_log->tail.bytes; + + if (next_mrec_end == next_mrec) { + /* End of log reached. 
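+			Here tail.bytes == 0 and both block counts are
+			zero, so every buffered operation has already
+			been applied.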
*/ +all_done: + ut_ad(has_index_lock); + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->tail.blocks == 0); + error = DB_SUCCESS; + goto func_exit; + } + } else { + os_offset_t ofs = static_cast<os_offset_t>( + index->online_log->head.blocks) + * srv_sort_buf_size; + ut_ad(has_index_lock); + has_index_lock = false; + rw_lock_x_unlock(dict_index_get_lock(index)); + + log_free_check(); + + if (!row_log_block_allocate(index->online_log->head)) { + error = DB_OUT_OF_MEMORY; + goto func_exit; + } + + byte* buf = index->online_log->head.block; + + if (os_file_read_no_error_handling( + IORequestRead, index->online_log->fd, + buf, ofs, srv_sort_buf_size, 0) != DB_SUCCESS) { + ib::error() + << "Unable to read temporary file" + " for index " << index->name; + goto corruption; + } + + if (log_tmp_is_encrypted()) { + if (!log_tmp_block_decrypt( + buf, srv_sort_buf_size, + index->online_log->crypt_head, ofs)) { + error = DB_DECRYPTION_FAILED; + goto func_exit; + } + + srv_stats.n_rowlog_blocks_decrypted.inc(); + memcpy(buf, index->online_log->crypt_head, srv_sort_buf_size); + } + +#ifdef POSIX_FADV_DONTNEED + /* Each block is read exactly once. Free up the file cache. */ + posix_fadvise(index->online_log->fd, + ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED); +#endif /* POSIX_FADV_DONTNEED */ + + next_mrec = index->online_log->head.block; + next_mrec_end = next_mrec + srv_sort_buf_size; + } + + if (mrec) { + /* A partial record was read from the previous block. + Copy the temporary buffer full, as we do not know the + length of the record. Parse subsequent records from + the bigger buffer index->online_log->head.block + or index->online_log->tail.block. */ + + ut_ad(mrec == index->online_log->head.buf); + ut_ad(mrec_end > mrec); + ut_ad(mrec_end < (&index->online_log->head.buf)[1]); + + memcpy((mrec_t*) mrec_end, next_mrec, + ulint((&index->online_log->head.buf)[1] - mrec_end)); + mrec = row_log_apply_op( + index, dup, &error, offsets_heap, heap, + has_index_lock, index->online_log->head.buf, + (&index->online_log->head.buf)[1], offsets); + if (error != DB_SUCCESS) { + goto func_exit; + } else if (UNIV_UNLIKELY(mrec == NULL)) { + /* The record was not reassembled properly. */ + goto corruption; + } + /* The record was previously found out to be + truncated. Now that the parse buffer was extended, + it should proceed beyond the old end of the buffer. */ + ut_a(mrec > mrec_end); + + index->online_log->head.bytes = ulint(mrec - mrec_end); + next_mrec += index->online_log->head.bytes; + } + + ut_ad(next_mrec <= next_mrec_end); + /* The following loop must not be parsing the temporary + buffer, but head.block or tail.block. */ + + /* mrec!=NULL means that the next record starts from the + middle of the block */ + ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0)); + +#ifdef UNIV_DEBUG + if (next_mrec_end == index->online_log->head.block + + srv_sort_buf_size) { + /* If tail.bytes == 0, next_mrec_end can also be at + the end of tail.block. 
*/ + if (index->online_log->tail.bytes == 0) { + ut_ad(next_mrec == next_mrec_end); + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->head.bytes == 0); + } else { + ut_ad(next_mrec == index->online_log->head.block + + index->online_log->head.bytes); + ut_ad(index->online_log->tail.blocks + > index->online_log->head.blocks); + } + } else if (next_mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes) { + ut_ad(next_mrec == index->online_log->tail.block + + index->online_log->head.bytes); + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->head.bytes + <= index->online_log->tail.bytes); + } else { + ut_error; + } +#endif /* UNIV_DEBUG */ + + mrec_end = next_mrec_end; + + while (!trx_is_interrupted(trx)) { + mrec = next_mrec; + ut_ad(mrec < mrec_end); + + if (!has_index_lock) { + /* We are applying operations from a different + block than the one that is being written to. + We do not hold index->lock in order to + allow other threads to concurrently buffer + modifications. */ + ut_ad(mrec >= index->online_log->head.block); + ut_ad(mrec_end == index->online_log->head.block + + srv_sort_buf_size); + ut_ad(index->online_log->head.bytes + < srv_sort_buf_size); + + /* Take the opportunity to do a redo log + checkpoint if needed. */ + log_free_check(); + } else { + /* We are applying operations from the last block. + Do not allow other threads to buffer anything, + so that we can finally catch up and synchronize. */ + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes); + ut_ad(mrec >= index->online_log->tail.block); + } + + next_mrec = row_log_apply_op( + index, dup, &error, offsets_heap, heap, + has_index_lock, mrec, mrec_end, offsets); + + if (error != DB_SUCCESS) { + goto func_exit; + } else if (next_mrec == next_mrec_end) { + /* The record happened to end on a block boundary. + Do we have more blocks left? */ + if (has_index_lock) { + /* The index will be locked while + applying the last block. */ + goto all_done; + } + + mrec = NULL; +process_next_block: + rw_lock_x_lock(dict_index_get_lock(index)); + has_index_lock = true; + + index->online_log->head.bytes = 0; + index->online_log->head.blocks++; + goto next_block; + } else if (next_mrec != NULL) { + ut_ad(next_mrec < next_mrec_end); + index->online_log->head.bytes + += ulint(next_mrec - mrec); + } else if (has_index_lock) { + /* When mrec is within tail.block, it should + be a complete record, because we are holding + index->lock and thus excluding the writer. */ + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes); + ut_ad(0); + goto unexpected_eof; + } else { + memcpy(index->online_log->head.buf, mrec, + ulint(mrec_end - mrec)); + mrec_end += ulint(index->online_log->head.buf - mrec); + mrec = index->online_log->head.buf; + goto process_next_block; + } + } + +interrupted: + error = DB_INTERRUPTED; +func_exit: + if (!has_index_lock) { + rw_lock_x_lock(dict_index_get_lock(index)); + } + + switch (error) { + case DB_SUCCESS: + break; + case DB_INDEX_CORRUPT: + if (((os_offset_t) index->online_log->tail.blocks + 1) + * srv_sort_buf_size >= srv_online_max_size) { + /* The log file grew too big. 
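+
+			srv_online_max_size holds the value of the
+			innodb_online_alter_log_max_size parameter, and
+			(tail.blocks + 1) * srv_sort_buf_size is an
+			upper bound on the size of the temporary log
+			file, so report DB_ONLINE_LOG_TOO_BIG instead
+			of a generic corruption error.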
*/ + error = DB_ONLINE_LOG_TOO_BIG; + } + /* fall through */ + default: + /* We set the flag directly instead of invoking + dict_set_corrupted_index_cache_only(index) here, + because the index is not "public" yet. */ + index->type |= DICT_CORRUPT; + } + + mem_heap_free(heap); + mem_heap_free(offsets_heap); + row_log_block_free(index->online_log->head); + ut_free(offsets); + return(error); +} + +/** Apply the row log to the index upon completing index creation. +@param[in] trx transaction (for checking if the operation was +interrupted) +@param[in,out] index secondary index +@param[in,out] table MySQL table (for reporting duplicates) +@param[in,out] stage performance schema accounting object, used by +ALTER TABLE. stage->begin_phase_log_index() will be called initially and then +stage->inc() will be called for each block of log that is applied. +@return DB_SUCCESS, or error code on failure */ +dberr_t +row_log_apply( + const trx_t* trx, + dict_index_t* index, + struct TABLE* table, + ut_stage_alter_t* stage) +{ + dberr_t error; + row_log_t* log; + row_merge_dup_t dup = { index, table, NULL, 0 }; + DBUG_ENTER("row_log_apply"); + + ut_ad(dict_index_is_online_ddl(index)); + ut_ad(!dict_index_is_clust(index)); + + stage->begin_phase_log_index(); + + log_free_check(); + + rw_lock_x_lock(dict_index_get_lock(index)); + + if (!dict_table_is_corrupted(index->table)) { + error = row_log_apply_ops(trx, index, &dup, stage); + } else { + error = DB_SUCCESS; + } + + if (error != DB_SUCCESS) { + ut_ad(index->table->space); + /* We set the flag directly instead of invoking + dict_set_corrupted_index_cache_only(index) here, + because the index is not "public" yet. */ + index->type |= DICT_CORRUPT; + index->table->drop_aborted = TRUE; + + dict_index_set_online_status(index, ONLINE_INDEX_ABORTED); + } else { + ut_ad(dup.n_dup == 0); + dict_index_set_online_status(index, ONLINE_INDEX_COMPLETE); + } + + log = index->online_log; + index->online_log = NULL; + rw_lock_x_unlock(dict_index_get_lock(index)); + + row_log_free(log); + + DBUG_RETURN(error); +} + +unsigned row_log_get_n_core_fields(const dict_index_t *index) +{ + ut_ad(index->online_log); + return index->online_log->n_core_fields; +} |