Diffstat
-rw-r--r-- | storage/innobase/row/row0merge.cc | 5406 |
1 file changed, 5406 insertions, 0 deletions
diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc new file mode 100644 index 00000000..5df93fe6 --- /dev/null +++ b/storage/innobase/row/row0merge.cc @@ -0,0 +1,5406 @@ +/***************************************************************************** + +Copyright (c) 2005, 2017, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2014, 2023, MariaDB Corporation. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA + +*****************************************************************************/ + +/**************************************************//** +@file row/row0merge.cc +New index creation routines using a merge sort + +Created 12/4/2005 Jan Lindstrom +Completed by Sunny Bains and Marko Makela +*******************************************************/ +#include <my_global.h> +#include <log.h> +#include <sql_class.h> +#include <math.h> + +#include "row0merge.h" +#include "row0ext.h" +#include "row0log.h" +#include "row0ins.h" +#include "row0row.h" +#include "row0sel.h" +#include "log0crypt.h" +#include "dict0crea.h" +#include "trx0purge.h" +#include "lock0lock.h" +#include "pars0pars.h" +#include "ut0sort.h" +#include "row0ftsort.h" +#include "row0import.h" +#include "row0vers.h" +#include "handler0alter.h" +#include "btr0bulk.h" +#ifdef BTR_CUR_ADAPT +# include "btr0sea.h" +#endif /* BTR_CUR_ADAPT */ +#include "ut0stage.h" +#include "fil0crypt.h" +#include "srv0mon.h" + +/* Ignore posix_fadvise() on those platforms where it does not exist */ +#if defined _WIN32 +# define posix_fadvise(fd, offset, len, advice) /* nothing */ +#endif /* _WIN32 */ + +/* Whether to disable file system cache */ +char srv_disable_sort_file_cache; + +/** Class that caches spatial index row tuples made from a single cluster +index page scan, and then insert into corresponding index tree */ +class spatial_index_info { +public: + /** constructor + @param index spatial index to be created */ + spatial_index_info(dict_index_t *index) : index(index) + { + ut_ad(index->is_spatial()); + } + + /** Caches an index row into index tuple vector + @param[in] row table row + @param[in] ext externally stored column prefixes, or NULL */ + void add(const dtuple_t *row, const row_ext_t *ext, mem_heap_t *heap) + { + dtuple_t *dtuple= row_build_index_entry(row, ext, index, heap); + ut_ad(dtuple); + ut_ad(dtuple->n_fields == index->n_fields); + if (ext) + { + /* Replace any references to ext, because ext will be allocated + from row_heap. 
*/ + for (ulint i= 1; i < dtuple->n_fields; i++) + { + dfield_t &dfield= dtuple->fields[i]; + if (dfield.data >= ext->buf && + dfield.data <= &ext->buf[ext->n_ext * ext->max_len]) + dfield_dup(&dfield, heap); + } + } + m_dtuple_vec.push_back(dtuple); + } + + /** Insert spatial index rows cached in vector into spatial index + @param[in] trx_id transaction id + @param[in] pcur cluster index scanning cursor + @param[in,out] mtr_started whether scan_mtr is active + @param[in,out] heap temporary memory heap + @param[in,out] scan_mtr mini-transaction for pcur + @return DB_SUCCESS if successful, else error number */ + dberr_t insert(trx_id_t trx_id, btr_pcur_t* pcur, + bool& mtr_started, mem_heap_t* heap, mtr_t* scan_mtr) + { + big_rec_t* big_rec; + rec_t* rec; + btr_cur_t ins_cur; + mtr_t mtr; + rtr_info_t rtr_info; + rec_offs* ins_offsets = NULL; + dberr_t error = DB_SUCCESS; + dtuple_t* dtuple; + const ulint flag = BTR_NO_UNDO_LOG_FLAG + | BTR_NO_LOCKING_FLAG + | BTR_KEEP_SYS_FLAG | BTR_CREATE_FLAG; + + ut_ad(mtr_started == scan_mtr->is_active()); + + DBUG_EXECUTE_IF("row_merge_instrument_log_check_flush", + log_sys.set_check_flush_or_checkpoint();); + + for (idx_tuple_vec::iterator it = m_dtuple_vec.begin(); + it != m_dtuple_vec.end(); + ++it) { + dtuple = *it; + ut_ad(dtuple); + + if (log_sys.check_flush_or_checkpoint()) { + if (mtr_started) { + if (!btr_pcur_move_to_prev_on_page(pcur)) { + error = DB_CORRUPTION; + break; + } + btr_pcur_store_position(pcur, scan_mtr); + scan_mtr->commit(); + mtr_started = false; + } + + log_free_check(); + } + + mtr.start(); + index->set_modified(mtr); + + ins_cur.page_cur.index = index; + rtr_init_rtr_info(&rtr_info, false, &ins_cur, index, + false); + rtr_info_update_btr(&ins_cur, &rtr_info); + + error = rtr_insert_leaf(&ins_cur, dtuple, + BTR_MODIFY_LEAF, &mtr); + + /* It need to update MBR in parent entry, + so change search mode to BTR_MODIFY_TREE */ + if (error == DB_SUCCESS && rtr_info.mbr_adj) { + mtr.commit(); + rtr_clean_rtr_info(&rtr_info, true); + rtr_init_rtr_info(&rtr_info, false, &ins_cur, + index, false); + rtr_info_update_btr(&ins_cur, &rtr_info); + mtr.start(); + index->set_modified(mtr); + error = rtr_insert_leaf(&ins_cur, dtuple, + BTR_MODIFY_TREE, &mtr); + } + + if (error == DB_SUCCESS) { + error = btr_cur_optimistic_insert( + flag, &ins_cur, &ins_offsets, + &heap, dtuple, &rec, &big_rec, + 0, NULL, &mtr); + } + + ut_ad(!big_rec); + + if (error == DB_FAIL) { + mtr.commit(); + mtr.start(); + index->set_modified(mtr); + + rtr_clean_rtr_info(&rtr_info, true); + rtr_init_rtr_info(&rtr_info, false, + &ins_cur, index, false); + + rtr_info_update_btr(&ins_cur, &rtr_info); + error = rtr_insert_leaf(&ins_cur, dtuple, + BTR_MODIFY_TREE, &mtr); + + if (error == DB_SUCCESS) { + error = btr_cur_pessimistic_insert( + flag, &ins_cur, &ins_offsets, + &heap, dtuple, &rec, + &big_rec, 0, NULL, &mtr); + } + } + + ut_ad(!big_rec); + + DBUG_EXECUTE_IF( + "row_merge_ins_spatial_fail", + error = DB_FAIL; + ); + + if (error == DB_SUCCESS) { + if (rtr_info.mbr_adj) { + error = rtr_ins_enlarge_mbr( + &ins_cur, &mtr); + } + + if (error == DB_SUCCESS) { + page_update_max_trx_id( + btr_cur_get_block(&ins_cur), + btr_cur_get_page_zip(&ins_cur), + trx_id, &mtr); + } + } + + mtr.commit(); + + rtr_clean_rtr_info(&rtr_info, true); + } + + m_dtuple_vec.clear(); + + return(error); + } + +private: + /** Cache index rows made from a cluster index scan. 
Usually
+	for rows on a single clustered index page */
+	typedef std::vector<dtuple_t*, ut_allocator<dtuple_t*> >
+		idx_tuple_vec;
+
+	/** vector used to cache index rows made from a clustered
+	index scan */
+	idx_tuple_vec		m_dtuple_vec;
+public:
+	/** the index being built */
+	dict_index_t*const	index;
+};
+
+/* Maximum pending doc memory limit in bytes for a fts tokenization thread */
+#define FTS_PENDING_DOC_MEMORY_LIMIT	1000000
+
+/** Insert sorted data tuples into the index.
+@param[in]	index		index to be inserted
+@param[in]	old_table	old table
+@param[in]	fd		file descriptor
+@param[in,out]	block		file buffer
+@param[in]	row_buf		the sorted data tuples,
+or NULL if fd, block will be used instead
+@param[in,out]	btr_bulk	btr bulk instance
+@param[in]	table_total_rows total rows of old table
+@param[in]	pct_progress	total progress percent until now
+@param[in]	pct_cost	current progress percent
+@param[in]	crypt_block	buffer for encryption or NULL
+@param[in]	space		space id
+@param[in,out]	stage		performance schema accounting object, used by
+ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
+and then stage->inc() will be called for each record that is processed.
+@param[in]	blob_file	blob file to read big column field data
+				from; applicable only to the bulk
+				insert operation
+@return DB_SUCCESS or error number */
+static	MY_ATTRIBUTE((warn_unused_result))
+dberr_t
+row_merge_insert_index_tuples(
+	dict_index_t*		index,
+	const dict_table_t*	old_table,
+	const pfs_os_file_t&	fd,
+	row_merge_block_t*	block,
+	const row_merge_buf_t*	row_buf,
+	BtrBulk*		btr_bulk,
+	const ib_uint64_t	table_total_rows,
+	double			pct_progress,
+	double			pct_cost,
+	row_merge_block_t*	crypt_block,
+	ulint			space,
+	ut_stage_alter_t*	stage= nullptr,
+	merge_file_t*		blob_file= nullptr);
+
+/** Encode an index record.
+@return size of the record */
+static MY_ATTRIBUTE((nonnull))
+ulint
+row_merge_buf_encode(
+/*=================*/
+	byte**			b,	/*!< in/out: pointer to
+					current end of output buffer */
+	const dict_index_t*	index,	/*!< in: index */
+	const mtuple_t*		entry,	/*!< in: index fields
+					of the record to encode */
+	ulint			n_fields) /*!< in: number of fields
+					in the entry */
+{
+	ulint	size;
+	ulint	extra_size;
+
+	size = rec_get_converted_size_temp<false>(
+		index, entry->fields, n_fields, &extra_size);
+	ut_ad(size >= extra_size);
+
+	/* Encode extra_size + 1 */
+	if (extra_size + 1 < 0x80) {
+		*(*b)++ = (byte) (extra_size + 1);
+	} else {
+		ut_ad((extra_size + 1) < 0x8000);
+		*(*b)++ = (byte) (0x80 | ((extra_size + 1) >> 8));
+		*(*b)++ = (byte) (extra_size + 1);
+	}
+
+	rec_convert_dtuple_to_temp<false>(*b + extra_size, index,
+					  entry->fields, n_fields);
+
+	*b += size;
+	return size;
+}
+
+static MY_ATTRIBUTE((malloc, nonnull))
+row_merge_buf_t*
+row_merge_buf_create_low(
+	row_merge_buf_t *buf, mem_heap_t *heap, dict_index_t *index)
+{
+	ulint	max_tuples = srv_sort_buf_size
+		/ std::max<ulint>(1, dict_index_get_min_size(index));
+	ut_ad(max_tuples > 0);
+	ut_ad(max_tuples <= srv_sort_buf_size);
+
+	buf->heap = heap;
+	buf->index = index;
+	buf->max_tuples = max_tuples;
+	buf->tuples = static_cast<mtuple_t*>(
+		ut_malloc_nokey(2 * max_tuples * sizeof *buf->tuples));
+	buf->tmp_tuples = buf->tuples + max_tuples;
+	return(buf);
+}
+
+/******************************************************//**
+Allocate a sort buffer.
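+
+A typical lifecycle, as an illustrative sketch (arguments and error
+handling elided): the buffer is filled with row_merge_buf_add(),
+sorted, written out, and then recycled:
+
+	buf = row_merge_buf_create(index);
+	while (rows_remain) {
+		... fill with row_merge_buf_add() until it is full ...
+		row_merge_buf_sort(buf, dup);
+		row_merge_buf_write(buf, ..., block, ...);
+		buf = row_merge_buf_empty(buf);	// recycle for next batch
+	}
+	row_merge_buf_free(buf);
+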
+@return own: sort buffer */ +row_merge_buf_t* +row_merge_buf_create( +/*=================*/ + dict_index_t* index) /*!< in: secondary index */ +{ + row_merge_buf_t* buf; + ulint buf_size; + mem_heap_t* heap; + + buf_size = (sizeof *buf); + + heap = mem_heap_create(buf_size); + + buf = static_cast<row_merge_buf_t*>( + mem_heap_zalloc(heap, buf_size)); + row_merge_buf_create_low(buf, heap, index); + + return(buf); +} + +/******************************************************//** +Empty a sort buffer. +@return sort buffer */ +row_merge_buf_t* +row_merge_buf_empty( +/*================*/ + row_merge_buf_t* buf) /*!< in,own: sort buffer */ +{ + ulint buf_size = sizeof *buf; + ulint max_tuples = buf->max_tuples; + mem_heap_t* heap = buf->heap; + dict_index_t* index = buf->index; + mtuple_t* tuples = buf->tuples; + + mem_heap_empty(heap); + + buf = static_cast<row_merge_buf_t*>(mem_heap_zalloc(heap, buf_size)); + buf->heap = heap; + buf->index = index; + buf->max_tuples = max_tuples; + buf->tuples = tuples; + buf->tmp_tuples = buf->tuples + max_tuples; + + return(buf); +} + +/******************************************************//** +Deallocate a sort buffer. */ +void +row_merge_buf_free( +/*===============*/ + row_merge_buf_t* buf) /*!< in,own: sort buffer to be freed */ +{ + ut_free(buf->tuples); + mem_heap_free(buf->heap); +} + +/** Convert the field data from compact to redundant format. +@param[in] row_field field to copy from +@param[out] field field to copy to +@param[in] len length of the field data +@param[in] zip_size compressed BLOB page size, + zero for uncompressed BLOBs +@param[in,out] heap memory heap where to allocate data when + converting to ROW_FORMAT=REDUNDANT, or NULL + when not to invoke + row_merge_buf_redundant_convert(). */ +static +void +row_merge_buf_redundant_convert( + const dfield_t* row_field, + dfield_t* field, + ulint len, + ulint zip_size, + mem_heap_t* heap) +{ + ut_ad(field->type.mbminlen == 1); + ut_ad(field->type.mbmaxlen > 1); + + byte* buf = (byte*) mem_heap_alloc(heap, len); + ulint field_len = row_field->len; + ut_ad(field_len <= len); + + if (row_field->ext) { + const byte* field_data = static_cast<const byte*>( + dfield_get_data(row_field)); + ulint ext_len; + + ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE); + ut_a(memcmp(field_data + field_len - BTR_EXTERN_FIELD_REF_SIZE, + field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE)); + + byte* data = btr_copy_externally_stored_field( + &ext_len, field_data, zip_size, field_len, heap); + + ut_ad(ext_len < len); + + memcpy(buf, data, ext_len); + field_len = ext_len; + } else { + memcpy(buf, row_field->data, field_len); + } + + memset(buf + field_len, 0x20, len - field_len); + + dfield_set_data(field, buf, len); +} + +/** Insert the tuple into bulk buffer insert operation +@param buf merge buffer for the index operation +@param table bulk insert operation for the table +@param row tuple to be inserted +@return number of rows inserted */ +static ulint row_merge_bulk_buf_add(row_merge_buf_t* buf, + const dict_table_t &table, + const dtuple_t &row) +{ + if (buf->n_tuples >= buf->max_tuples) + return 0; + + const dict_index_t *index= buf->index; + ulint n_fields= dict_index_get_n_fields(index); + mtuple_t *entry= &buf->tuples[buf->n_tuples]; + ulint data_size= 0; + ulint extra_size= UT_BITS_IN_BYTES(unsigned(index->n_nullable)); + dfield_t *field= entry->fields= static_cast<dfield_t*>( + mem_heap_alloc(buf->heap, n_fields * sizeof *entry->fields)); + const dict_field_t *ifield= dict_index_get_nth_field(index, 0); + + for 
(ulint i = 0; i < n_fields; i++, field++, ifield++) + { + dfield_copy(field, &row.fields[i]); + ulint len= dfield_get_len(field); + const dict_col_t* const col= ifield->col; + + if (dfield_is_null(field)) + continue; + + ulint fixed_len= ifield->fixed_len; + + /* CHAR in ROW_FORMAT=REDUNDANT is always + fixed-length, but in the temporary file it is + variable-length for variable-length character sets. */ + if (fixed_len && !index->table->not_redundant() && + col->mbminlen != col->mbmaxlen) + fixed_len= 0; + + if (fixed_len); + else if (len < 128 || (!DATA_BIG_COL(col))) + extra_size++; + else + extra_size += 2; + data_size += len; + } + + /* Add to the total size of the record in row_merge_block_t + the encoded length of extra_size and the extra bytes (extra_size). + See row_merge_buf_write() for the variable-length encoding + of extra_size. */ + data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80); + + /* Reserve bytes for the end marker of row_merge_block_t. */ + if (buf->total_size + data_size >= srv_sort_buf_size) + return 0; + + buf->total_size += data_size; + buf->n_tuples++; + + field= entry->fields; + + do + dfield_dup(field++, buf->heap); + while (--n_fields); + + return 1; +} + +/** Insert a data tuple into a sort buffer. +@param[in,out] buf sort buffer +@param[in] fts_index fts index to be created +@param[in] old_table original table +@param[in] new_table new table +@param[in,out] psort_info parallel sort info +@param[in,out] row table row +@param[in] ext cache of externally stored + column prefixes, or NULL +@param[in] history_fts row is historical in a system-versioned table + on which a FTS_DOC_ID_INDEX(FTS_DOC_ID) exists +@param[in,out] doc_id Doc ID if we are creating + FTS index +@param[in,out] conv_heap memory heap where to allocate data when + converting to ROW_FORMAT=REDUNDANT, or NULL + when not to invoke + row_merge_buf_redundant_convert() +@param[in,out] err set if error occurs +@param[in,out] v_heap heap memory to process data for virtual column +@param[in,out] my_table mysql table object +@param[in] trx transaction object +@param[in] col_collate columns whose collations changed, or nullptr +@return number of rows added, 0 if out of space */ +static +ulint +row_merge_buf_add( + row_merge_buf_t* buf, + dict_index_t* fts_index, + const dict_table_t* old_table, + const dict_table_t* new_table, + fts_psort_t* psort_info, + dtuple_t* row, + const row_ext_t* ext, + const bool history_fts, + doc_id_t* doc_id, + mem_heap_t* conv_heap, + dberr_t* err, + mem_heap_t** v_heap, + TABLE* my_table, + trx_t* trx, + const col_collations* col_collate) +{ + ulint i; + const dict_index_t* index; + mtuple_t* entry; + dfield_t* field; + const dict_field_t* ifield; + ulint n_fields; + ulint data_size; + ulint extra_size; + ulint bucket = 0; + doc_id_t write_doc_id; + ulint n_row_added = 0; + VCOL_STORAGE vcol_storage; + + DBUG_ENTER("row_merge_buf_add"); + + if (buf->n_tuples >= buf->max_tuples) { +error: + n_row_added = 0; + goto end; + } + + DBUG_EXECUTE_IF( + "ib_row_merge_buf_add_two", + if (buf->n_tuples >= 2) DBUG_RETURN(0);); + + UNIV_PREFETCH_R(row->fields); + + /* If we are building FTS index, buf->index points to + the 'fts_sort_idx', and real FTS index is stored in + fts_index */ + index = (buf->index->type & DICT_FTS) ? 
fts_index : buf->index; + + /* create spatial index should not come here */ + ut_ad(!dict_index_is_spatial(index)); + + n_fields = dict_index_get_n_fields(index); + + entry = &buf->tuples[buf->n_tuples]; + field = entry->fields = static_cast<dfield_t*>( + mem_heap_alloc(buf->heap, n_fields * sizeof *entry->fields)); + + data_size = 0; + extra_size = UT_BITS_IN_BYTES(unsigned(index->n_nullable)); + + ifield = dict_index_get_nth_field(index, 0); + + for (i = 0; i < n_fields; i++, field++, ifield++) { + ulint len; + ulint fixed_len; + const dfield_t* row_field; + const dict_col_t* const col = ifield->col; + const dict_v_col_t* const v_col = col->is_virtual() + ? reinterpret_cast<const dict_v_col_t*>(col) + : NULL; + + /* Process the Doc ID column */ + if (!v_col && (history_fts || *doc_id) + && col->ind == index->table->fts->doc_col) { + fts_write_doc_id((byte*) &write_doc_id, *doc_id); + + /* Note: field->data now points to a value on the + stack: &write_doc_id after dfield_set_data(). Because + there is only one doc_id per row, it shouldn't matter. + We allocate a new buffer before we leave the function + later below. */ + + dfield_set_data( + field, &write_doc_id, sizeof(write_doc_id)); + + field->type.mtype = ifield->col->mtype; + field->type.prtype = ifield->col->prtype; + field->type.mbminlen = 0; + field->type.mbmaxlen = 0; + field->type.len = ifield->col->len; + } else { + /* Use callback to get the virtual column value */ + if (v_col) { + dict_index_t* clust_index + = dict_table_get_first_index(new_table); + + if (!vcol_storage.innobase_record && + !innobase_allocate_row_for_vcol( + trx->mysql_thd, clust_index, + v_heap, &my_table, + &vcol_storage)) { + *err = DB_OUT_OF_MEMORY; + goto error; + } + + row_field = innobase_get_computed_value( + row, v_col, clust_index, + v_heap, NULL, ifield, trx->mysql_thd, + my_table, vcol_storage.innobase_record, + old_table, NULL); + + if (row_field == NULL) { + *err = DB_COMPUTE_VALUE_FAILED; + goto error; + } + dfield_copy(field, row_field); + } else { + row_field = dtuple_get_nth_field(row, + col->ind); + dfield_copy(field, row_field); + + /* Copy the column collation to the + tuple field */ + if (col_collate) { + auto it = col_collate->find(col->ind); + if (it != col_collate->end()) { + field->type + .assign(*it->second); + } + } + } + + /* Tokenize and process data for FTS */ + if (!history_fts && (index->type & DICT_FTS)) { + fts_doc_item_t* doc_item; + byte* value; + void* ptr; + const ulint max_trial_count = 10000; + ulint trial_count = 0; + + /* fetch Doc ID if it already exists + in the row, and not supplied by the + caller. Even if the value column is + NULL, we still need to get the Doc + ID so to maintain the correct max + Doc ID */ + if (*doc_id == 0) { + const dfield_t* doc_field; + doc_field = dtuple_get_nth_field( + row, + index->table->fts->doc_col); + *doc_id = (doc_id_t) mach_read_from_8( + static_cast<const byte*>( + dfield_get_data(doc_field))); + + if (*doc_id == 0) { + ib::warn() << "FTS Doc ID is" + " zero. 
Record" + " skipped"; + goto error; + } + } + + if (dfield_is_null(field)) { + n_row_added = 1; + continue; + } + + ptr = ut_malloc_nokey(sizeof(*doc_item) + + field->len); + + doc_item = static_cast<fts_doc_item_t*>(ptr); + value = static_cast<byte*>(ptr) + + sizeof(*doc_item); + memcpy(value, field->data, field->len); + field->data = value; + + doc_item->field = field; + doc_item->doc_id = *doc_id; + + bucket = static_cast<ulint>( + *doc_id % fts_sort_pll_degree); + + /* Add doc item to fts_doc_list */ + mysql_mutex_lock(&psort_info[bucket].mutex); + + if (psort_info[bucket].error == DB_SUCCESS) { + UT_LIST_ADD_LAST( + psort_info[bucket].fts_doc_list, + doc_item); + psort_info[bucket].memory_used += + sizeof(*doc_item) + field->len; + } else { + ut_free(doc_item); + } + + mysql_mutex_unlock(&psort_info[bucket].mutex); + + /* Sleep when memory used exceeds limit*/ + while (psort_info[bucket].memory_used + > FTS_PENDING_DOC_MEMORY_LIMIT + && trial_count++ < max_trial_count) { + std::this_thread::sleep_for( + std::chrono::milliseconds(1)); + } + + n_row_added = 1; + continue; + } + + /* innobase_get_computed_value() sets the + length of the virtual column field. */ + if (v_col == NULL + && field->len != UNIV_SQL_NULL + && col->mtype == DATA_MYSQL + && col->len != field->len) { + if (conv_heap != NULL) { + row_merge_buf_redundant_convert( + row_field, field, col->len, + old_table->space->zip_size(), + conv_heap); + } + } + } + + len = dfield_get_len(field); + + if (dfield_is_null(field)) { + ut_ad(!(col->prtype & DATA_NOT_NULL)); + continue; + } else if (!ext) { + } else if (dict_index_is_clust(index)) { + /* Flag externally stored fields. */ + const byte* buf = row_ext_lookup(ext, col->ind, + &len); + if (UNIV_LIKELY_NULL(buf)) { + ut_a(buf != field_ref_zero); + if (i < dict_index_get_n_unique(index)) { + dfield_set_data(field, buf, len); + } else { + dfield_set_ext(field); + len = dfield_get_len(field); + } + } + } else if (!v_col) { + /* Only non-virtual column are stored externally */ + const byte* buf = row_ext_lookup(ext, col->ind, + &len); + if (UNIV_LIKELY_NULL(buf)) { + ut_a(buf != field_ref_zero); + dfield_set_data(field, buf, len); + } + } + + /* If a column prefix index, take only the prefix */ + + if (ifield->prefix_len) { + len = dtype_get_at_most_n_mbchars( + col->prtype, + col->mbminlen, col->mbmaxlen, + ifield->prefix_len, + len, + static_cast<char*>(dfield_get_data(field))); + dfield_set_len(field, len); + } + + ut_ad(len <= col->len + || DATA_LARGE_MTYPE(col->mtype)); + + fixed_len = ifield->fixed_len; + if (fixed_len && !dict_table_is_comp(index->table) + && col->mbminlen != col->mbmaxlen) { + /* CHAR in ROW_FORMAT=REDUNDANT is always + fixed-length, but in the temporary file it is + variable-length for variable-length character + sets. */ + fixed_len = 0; + } + + if (fixed_len) { +#ifdef UNIV_DEBUG + /* len should be between size calcualted base on + mbmaxlen and mbminlen */ + ut_ad(len <= fixed_len); + ut_ad(!col->mbmaxlen || len >= col->mbminlen + * (fixed_len / col->mbmaxlen)); + + ut_ad(!dfield_is_ext(field)); +#endif /* UNIV_DEBUG */ + } else if (dfield_is_ext(field)) { + extra_size += 2; + } else if (len < 128 + || (!DATA_BIG_COL(col))) { + extra_size++; + } else { + /* For variable-length columns, we look up the + maximum length from the column itself. If this + is a prefix index column shorter than 256 bytes, + this will waste one byte. 
*/ + extra_size += 2; + } + data_size += len; + } + + /* If this is FTS index, we already populated the sort buffer, return + here */ + if (index->type & DICT_FTS) { + goto end; + } + +#ifdef UNIV_DEBUG + { + ulint size; + ulint extra; + + size = rec_get_converted_size_temp<false>( + index, entry->fields, n_fields, &extra); + + ut_ad(data_size + extra_size == size); + ut_ad(extra_size == extra); + } +#endif /* UNIV_DEBUG */ + + /* Add to the total size of the record in row_merge_block_t + the encoded length of extra_size and the extra bytes (extra_size). + See row_merge_buf_write() for the variable-length encoding + of extra_size. */ + data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80); + + /* Record size can exceed page size while converting to + redundant row format. But there is assert + ut_ad(size < srv_page_size) in rec_offs_data_size(). + It may hit the assert before attempting to insert the row. */ + if (conv_heap != NULL && data_size > srv_page_size) { + *err = DB_TOO_BIG_RECORD; + } + + ut_ad(data_size < srv_sort_buf_size); + + /* Reserve bytes for the end marker of row_merge_block_t. */ + if (buf->total_size + data_size >= srv_sort_buf_size) { + goto error; + } + + buf->total_size += data_size; + buf->n_tuples++; + n_row_added++; + + field = entry->fields; + + /* Copy the data fields. */ + + do { + dfield_dup(field++, buf->heap); + } while (--n_fields); + + if (conv_heap != NULL) { + mem_heap_empty(conv_heap); + } + +end: + if (vcol_storage.innobase_record) + innobase_free_row_for_vcol(&vcol_storage); + DBUG_RETURN(n_row_added); +} + +/*************************************************************//** +Report a duplicate key. */ +void +row_merge_dup_report( +/*=================*/ + row_merge_dup_t* dup, /*!< in/out: for reporting duplicates */ + const dfield_t* entry) /*!< in: duplicate index entry */ +{ + if (!dup->n_dup++ && dup->table) { + /* Only report the first duplicate record, + but count all duplicate records. */ + innobase_fields_to_mysql(dup->table, dup->index, entry); + } +} + +/*************************************************************//** +Compare two tuples. +@return positive, 0, negative if a is greater, equal, less, than b, +respectively */ +static MY_ATTRIBUTE((warn_unused_result)) +int +row_merge_tuple_cmp( +/*================*/ + const dict_index_t* index, /*< in: index tree */ + ulint n_uniq, /*!< in: number of unique fields */ + ulint n_field,/*!< in: number of fields */ + const mtuple_t& a, /*!< in: first tuple to be compared */ + const mtuple_t& b, /*!< in: second tuple to be compared */ + row_merge_dup_t* dup) /*!< in/out: for reporting duplicates, + NULL if non-unique index */ +{ + int cmp; + const dfield_t* af = a.fields; + const dfield_t* bf = b.fields; + ulint n = n_uniq; + const dict_field_t* f = index->fields; + + ut_ad(n_uniq > 0); + ut_ad(n_uniq <= n_field); + + /* Compare the fields of the tuples until a difference is + found or we run out of fields to compare. If !cmp at the + end, the tuples are equal. */ + do { + cmp = cmp_dfield_dfield(af++, bf++, (f++)->descending); + } while (!cmp && --n); + + if (cmp) { + return(cmp); + } + + if (dup) { + /* Report a duplicate value error if the tuples are + logically equal. NULL columns are logically inequal, + although they are equal in the sorting order. Find + out if any of the fields are NULL. 
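+		For example, two tuples (1, NULL) and (1, NULL) sort
+		as equal here, yet they do not violate a UNIQUE
+		constraint, because NULL compares as not equal to NULL
+		in SQL; no duplicate is reported in that case.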
*/ + for (const dfield_t* df = a.fields; df != af; df++) { + if (dfield_is_null(df)) { + goto no_report; + } + } + + row_merge_dup_report(dup, a.fields); + } + +no_report: + /* The n_uniq fields were equal, but we compare all fields so + that we will get the same (internal) order as in the B-tree. */ + for (n = n_field - n_uniq + 1; --n; ) { + cmp = cmp_dfield_dfield(af++, bf++, (f++)->descending); + if (cmp) { + return(cmp); + } + } + + /* This should never be reached, except in a secondary index + when creating a secondary index and a PRIMARY KEY, and there + is a duplicate in the PRIMARY KEY that has not been detected + yet. Internally, an index must never contain duplicates. */ + return(cmp); +} + +/** Wrapper for row_merge_tuple_sort() to inject some more context to +UT_SORT_FUNCTION_BODY(). +@param tuples array of tuples that being sorted +@param aux work area, same size as tuples[] +@param low lower bound of the sorting area, inclusive +@param high upper bound of the sorting area, inclusive */ +#define row_merge_tuple_sort_ctx(tuples, aux, low, high) \ + row_merge_tuple_sort(index,n_uniq,n_field,dup, tuples, aux, low, high) +/** Wrapper for row_merge_tuple_cmp() to inject some more context to +UT_SORT_FUNCTION_BODY(). +@param a first tuple to be compared +@param b second tuple to be compared +@return positive, 0, negative, if a is greater, equal, less, than b, +respectively */ +#define row_merge_tuple_cmp_ctx(a,b) \ + row_merge_tuple_cmp(index, n_uniq, n_field, a, b, dup) + +/**********************************************************************//** +Merge sort the tuple buffer in main memory. */ +static +void +row_merge_tuple_sort( +/*=================*/ + const dict_index_t* index, /*!< in: index tree */ + ulint n_uniq, /*!< in: number of unique fields */ + ulint n_field,/*!< in: number of fields */ + row_merge_dup_t* dup, /*!< in/out: reporter of duplicates + (NULL if non-unique index) */ + mtuple_t* tuples, /*!< in/out: tuples */ + mtuple_t* aux, /*!< in/out: work area */ + ulint low, /*!< in: lower bound of the + sorting area, inclusive */ + ulint high) /*!< in: upper bound of the + sorting area, exclusive */ +{ + ut_ad(n_field > 0); + ut_ad(n_uniq <= n_field); + + UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx, + tuples, aux, low, high, row_merge_tuple_cmp_ctx); +} + +/******************************************************//** +Sort a buffer. 
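+The sort is the merge sort of UT_SORT_FUNCTION_BODY(); when dup is
+non-NULL (a unique index), logically equal tuples are reported via
+row_merge_dup_report() as the comparisons are made.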
*/ +void +row_merge_buf_sort( +/*===============*/ + row_merge_buf_t* buf, /*!< in/out: sort buffer */ + row_merge_dup_t* dup) /*!< in/out: reporter of duplicates + (NULL if non-unique index) */ +{ + ut_ad(!buf->index->is_spatial()); + row_merge_tuple_sort(buf->index, buf->index->n_uniq, buf->index->n_fields, + dup, buf->tuples, buf->tmp_tuples, 0, buf->n_tuples); +} + +/** Write the blob field data to temporary file and fill the offset, +length in the field data +@param field tuple field +@param blob_file file to store the blob data +@param heap heap to store the blob offset and length +@return DB_SUCCESS if successful */ +static dberr_t row_merge_write_blob_to_tmp_file( + dfield_t *field, merge_file_t *blob_file,mem_heap_t **heap) +{ + if (blob_file->fd == OS_FILE_CLOSED) + { + blob_file->fd= row_merge_file_create_low(nullptr); + if (blob_file->fd == OS_FILE_CLOSED) + return DB_OUT_OF_MEMORY; + } + uint64_t val= blob_file->offset; + uint32_t len= field->len; + dberr_t err= os_file_write( + IORequestWrite, "(bulk insert)", blob_file->fd, + field->data, blob_file->offset, len); + + if (err != DB_SUCCESS) + return err; + + byte *data= static_cast<byte*> + (mem_heap_alloc(*heap, BTR_EXTERN_FIELD_REF_SIZE)); + + /* Write zeroes for first 8 bytes */ + memset(data, 0, 8); + /* Write offset for next 8 bytes */ + mach_write_to_8(data + 8, val); + /* Write length of the blob in 4 bytes */ + mach_write_to_4(data + 16, len); + blob_file->offset+= field->len; + blob_file->n_rec++; + dfield_set_data(field, data, BTR_EXTERN_FIELD_REF_SIZE); + dfield_set_ext(field); + return err; +} + +/** This function is invoked when tuple size is greater than +innodb_sort_buffer_size. Basically it recreates the tuple +by writing the blob field to the temporary file. +@param entry index fields to be encode the blob +@param blob_file file to store the blob data +@param heap heap to store the blob offset and blob length +@return tuple which fits into sort_buffer_size */ +static dtuple_t* row_merge_buf_large_tuple(const dtuple_t &entry, + merge_file_t *blob_file, + mem_heap_t **heap) +{ + if (!*heap) + *heap= mem_heap_create(DTUPLE_EST_ALLOC(entry.n_fields)); + + dtuple_t *tuple= dtuple_copy(&entry, *heap); + for (ulint i= 0; i < tuple->n_fields; i++) + { + dfield_t *field= &tuple->fields[i]; + if (dfield_is_null(field) || field->len <= 2000) + continue; + + dberr_t err= row_merge_write_blob_to_tmp_file(field, blob_file, heap); + if (err != DB_SUCCESS) + return nullptr; + } + + return tuple; +} + + +/** Write the field data whose length is more than 2000 bytes +into blob temporary file and write offset, length into the +tuple field +@param entry index fields to be encode the blob +@param n_fields number of fields in the entry +@param heap heap to store the blob offset and blob length +@param blob_file file to store the blob data */ +static dberr_t row_merge_buf_blob(const mtuple_t *entry, ulint n_fields, + mem_heap_t **heap, merge_file_t *blob_file) +{ + + if (!*heap) + *heap= mem_heap_create(100); + + for (ulint i= 0; i < n_fields; i++) + { + dfield_t *field= &entry->fields[i]; + if (dfield_is_null(field) || field->len <= 2000) + continue; + + dberr_t err= row_merge_write_blob_to_tmp_file(field, blob_file, heap); + if (err != DB_SUCCESS) + return err; + } + + return DB_SUCCESS; +} + +/** Write a buffer to a block. 
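+
+The chunk format written to the block is, as an illustrative sketch
+(see row_merge_buf_encode() for the exact encoding and
+row_merge_read_rec() for the decoder):
+
+	e  [extra bytes]  [data bytes]	one record; e = extra_size + 1,
+					stored in 1 byte when e < 0x80,
+					else in 2 bytes as
+					(0x80 | (e >> 8)), (e & 0xff)
+	...				further records
+	0x00				end-of-chunk marker
+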
+@param buf sorted buffer +@param block buffer for writing to file +@param blob_file blob file handle for doing bulk insert operation */ +dberr_t row_merge_buf_write(const row_merge_buf_t *buf, +#ifndef DBUG_OFF + const merge_file_t *of, /*!< output file */ +#endif + row_merge_block_t *block, + merge_file_t *blob_file) +{ + const dict_index_t* index = buf->index; + ulint n_fields= dict_index_get_n_fields(index); + byte* b = &block[0]; + mem_heap_t* blob_heap = nullptr; + dberr_t err = DB_SUCCESS; + + DBUG_ENTER("row_merge_buf_write"); + + for (ulint i = 0; i < buf->n_tuples; i++) { + const mtuple_t* entry = &buf->tuples[i]; + + if (blob_file) { + ut_ad(buf->index->is_primary()); + err = row_merge_buf_blob( + entry, n_fields, &blob_heap, blob_file); + if (err != DB_SUCCESS) { + goto func_exit; + } + } + + ulint rec_size= row_merge_buf_encode( + &b, index, entry, n_fields); + if (blob_file && rec_size > srv_page_size) { + err = DB_TOO_BIG_RECORD; + goto func_exit; + } + + ut_ad(b < &block[srv_sort_buf_size]); + + DBUG_LOG("ib_merge_sort", + reinterpret_cast<const void*>(b) << ',' + << of->fd << ',' << of->offset << ' ' << + i << ": " << + rec_printer(entry->fields, n_fields).str()); + } + + /* Write an "end-of-chunk" marker. */ + ut_a(b < &block[srv_sort_buf_size]); + ut_a(b == &block[0] + buf->total_size || blob_file); + *b++ = 0; +#ifdef HAVE_valgrind + /* The rest of the block is uninitialized. Initialize it + to avoid bogus warnings. */ + memset(b, 0xff, &block[srv_sort_buf_size] - b); +#endif /* HAVE_valgrind */ + DBUG_LOG("ib_merge_sort", + "write " << reinterpret_cast<const void*>(b) << ',' + << of->fd << ',' << of->offset << " EOF"); +func_exit: + if (blob_heap) { + mem_heap_free(blob_heap); + } + + DBUG_RETURN(err); +} + +/******************************************************//** +Create a memory heap and allocate space for row_merge_rec_offsets() +and mrec_buf_t[3]. +@return memory heap */ +static +mem_heap_t* +row_merge_heap_create( +/*==================*/ + const dict_index_t* index, /*!< in: record descriptor */ + mrec_buf_t** buf, /*!< out: 3 buffers */ + rec_offs** offsets1, /*!< out: offsets */ + rec_offs** offsets2) /*!< out: offsets */ +{ + ulint i = 1 + REC_OFFS_HEADER_SIZE + + dict_index_get_n_fields(index); + mem_heap_t* heap = mem_heap_create(2 * i * sizeof **offsets1 + + 3 * sizeof **buf); + + *buf = static_cast<mrec_buf_t*>( + mem_heap_alloc(heap, 3 * sizeof **buf)); + *offsets1 = static_cast<rec_offs*>( + mem_heap_alloc(heap, i * sizeof **offsets1)); + *offsets2 = static_cast<rec_offs*>( + mem_heap_alloc(heap, i * sizeof **offsets2)); + + rec_offs_set_n_alloc(*offsets1, i); + rec_offs_set_n_alloc(*offsets2, i); + rec_offs_set_n_fields(*offsets1, dict_index_get_n_fields(index)); + rec_offs_set_n_fields(*offsets2, dict_index_get_n_fields(index)); + + return(heap); +} + +/** Read a merge block from the file system. 
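+The offset is given in units of srv_sort_buf_size, so, for example,
+with the default 1 MiB sort buffer, offset 3 addresses the byte
+range [3 MiB, 4 MiB) of the temporary file.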
+@return whether the request was completed successfully */ +bool +row_merge_read( +/*===========*/ + const pfs_os_file_t& fd, /*!< in: file descriptor */ + ulint offset, /*!< in: offset where to read + in number of row_merge_block_t + elements */ + row_merge_block_t* buf, /*!< out: data */ + row_merge_block_t* crypt_buf, /*!< in: crypt buf or NULL */ + ulint space) /*!< in: space id */ +{ + os_offset_t ofs = ((os_offset_t) offset) * srv_sort_buf_size; + + DBUG_ENTER("row_merge_read"); + DBUG_LOG("ib_merge_sort", "fd=" << fd << " ofs=" << ofs); + DBUG_EXECUTE_IF("row_merge_read_failure", DBUG_RETURN(FALSE);); + + const dberr_t err = os_file_read( + IORequestRead, fd, buf, ofs, srv_sort_buf_size, nullptr); + + /* If encryption is enabled decrypt buffer */ + if (err == DB_SUCCESS && srv_encrypt_log) { + if (!log_tmp_block_decrypt(buf, srv_sort_buf_size, + crypt_buf, ofs)) { + DBUG_RETURN(false); + } + + srv_stats.n_merge_blocks_decrypted.inc(); + memcpy(buf, crypt_buf, srv_sort_buf_size); + } + +#ifdef POSIX_FADV_DONTNEED + /* Each block is read exactly once. Free up the file cache. */ + posix_fadvise(fd, ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED); +#endif /* POSIX_FADV_DONTNEED */ + + DBUG_RETURN(err == DB_SUCCESS); +} + +/********************************************************************//** +Write a merge block to the file system. +@return whether the request was completed successfully +@retval false on error +@retval true on success */ +bool +row_merge_write( + const pfs_os_file_t& fd, /*!< in: file descriptor */ + ulint offset, /*!< in: offset where to write, + in number of row_merge_block_t elements */ + const void* buf, /*!< in: data */ + void* crypt_buf, /*!< in: crypt buf or NULL */ + ulint space) /*!< in: space id */ +{ + size_t buf_len = srv_sort_buf_size; + os_offset_t ofs = buf_len * (os_offset_t) offset; + void* out_buf = (void *)buf; + + DBUG_ENTER("row_merge_write"); + DBUG_LOG("ib_merge_sort", "fd=" << fd << " ofs=" << ofs); + DBUG_EXECUTE_IF("row_merge_write_failure", DBUG_RETURN(FALSE);); + + /* For encrypted tables, encrypt data before writing */ + if (srv_encrypt_log) { + if (!log_tmp_block_encrypt(static_cast<const byte*>(buf), + buf_len, + static_cast<byte*>(crypt_buf), + ofs)) { + DBUG_RETURN(false); + } + + srv_stats.n_merge_blocks_encrypted.inc(); + out_buf = crypt_buf; + } + + const bool success = DB_SUCCESS == os_file_write( + IORequestWrite, "(merge)", fd, out_buf, ofs, buf_len); + +#ifdef POSIX_FADV_DONTNEED + /* The block will be needed on the next merge pass, + but it can be evicted from the file cache meanwhile. */ + posix_fadvise(fd, ofs, buf_len, POSIX_FADV_DONTNEED); +#endif /* POSIX_FADV_DONTNEED */ + + DBUG_RETURN(success); +} + +/********************************************************************//** +Read a merge record. 
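+The length prefix is decoded as follows (a minimal sketch mirroring
+row_merge_buf_encode()):
+
+	extra_size = *b++;
+	if (extra_size >= 0x80)
+		extra_size = ((extra_size & 0x7f) << 8) | *b++;
+	extra_size--;	// value 0 is reserved for "end of list"
+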
+@return pointer to next record, or NULL on I/O error or end of list */ +const byte* +row_merge_read_rec( +/*===============*/ + row_merge_block_t* block, /*!< in/out: file buffer */ + mrec_buf_t* buf, /*!< in/out: secondary buffer */ + const byte* b, /*!< in: pointer to record */ + const dict_index_t* index, /*!< in: index of the record */ + const pfs_os_file_t& fd, /*!< in: file descriptor */ + ulint* foffs, /*!< in/out: file offset */ + const mrec_t** mrec, /*!< out: pointer to merge record, + or NULL on end of list + (non-NULL on I/O error) */ + rec_offs* offsets,/*!< out: offsets of mrec */ + row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */ + ulint space) /*!< in: space id */ +{ + ulint extra_size; + ulint data_size; + ulint avail_size; + + ut_ad(b >= &block[0]); + ut_ad(b < &block[srv_sort_buf_size]); + + ut_ad(rec_offs_get_n_alloc(offsets) == 1 + REC_OFFS_HEADER_SIZE + + dict_index_get_n_fields(index)); + + DBUG_ENTER("row_merge_read_rec"); + + extra_size = *b++; + + if (UNIV_UNLIKELY(!extra_size)) { + /* End of list */ + *mrec = NULL; + DBUG_LOG("ib_merge_sort", + "read " << reinterpret_cast<const void*>(b) << ',' << + reinterpret_cast<const void*>(block) << ',' << + fd << ',' << *foffs << " EOF"); + DBUG_RETURN(NULL); + } + + if (extra_size >= 0x80) { + /* Read another byte of extra_size. */ + + if (UNIV_UNLIKELY(b >= &block[srv_sort_buf_size])) { + if (!row_merge_read(fd, ++(*foffs), block, + crypt_block, + space)) { +err_exit: + /* Signal I/O error. */ + *mrec = b; + DBUG_RETURN(NULL); + } + + /* Wrap around to the beginning of the buffer. */ + b = &block[0]; + } + + extra_size = (extra_size & 0x7f) << 8; + extra_size |= *b++; + } + + /* Normalize extra_size. Above, value 0 signals "end of list". */ + extra_size--; + + /* Read the extra bytes. */ + + if (UNIV_UNLIKELY(b + extra_size >= &block[srv_sort_buf_size])) { + /* The record spans two blocks. Copy the entire record + to the auxiliary buffer and handle this as a special + case. */ + + avail_size = ulint(&block[srv_sort_buf_size] - b); + ut_ad(avail_size < sizeof *buf); + memcpy(*buf, b, avail_size); + + if (!row_merge_read(fd, ++(*foffs), block, + crypt_block, + space)) { + + goto err_exit; + } + + /* Wrap around to the beginning of the buffer. */ + b = &block[0]; + + /* Copy the record. */ + memcpy(*buf + avail_size, b, extra_size - avail_size); + b += extra_size - avail_size; + + *mrec = *buf + extra_size; + + rec_init_offsets_temp(*mrec, index, offsets); + + data_size = rec_offs_data_size(offsets); + + /* These overflows should be impossible given that + records are much smaller than either buffer, and + the record starts near the beginning of each buffer. */ + ut_a(extra_size + data_size < sizeof *buf); + ut_a(b + data_size < &block[srv_sort_buf_size]); + + /* Copy the data bytes. */ + memcpy(*buf + extra_size, b, data_size); + b += data_size; + + goto func_exit; + } + + *mrec = b + extra_size; + + rec_init_offsets_temp(*mrec, index, offsets); + + data_size = rec_offs_data_size(offsets); + ut_ad(extra_size + data_size < sizeof *buf); + + b += extra_size + data_size; + + if (UNIV_LIKELY(b < &block[srv_sort_buf_size])) { + /* The record fits entirely in the block. + This is the normal case. */ + goto func_exit; + } + + /* The record spans two blocks. Copy it to buf. 
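+		For example, if 10 bytes remain in the current block
+		but the record needs 40, the 10-byte tail is copied
+		into buf, the next block is read, and the remaining
+		30 bytes are copied from its beginning.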
*/ + + b -= extra_size + data_size; + avail_size = ulint(&block[srv_sort_buf_size] - b); + memcpy(*buf, b, avail_size); + *mrec = *buf + extra_size; + + rec_init_offsets_temp(*mrec, index, offsets); + + if (!row_merge_read(fd, ++(*foffs), block, + crypt_block, + space)) { + + goto err_exit; + } + + /* Wrap around to the beginning of the buffer. */ + b = &block[0]; + + /* Copy the rest of the record. */ + memcpy(*buf + avail_size, b, extra_size + data_size - avail_size); + b += extra_size + data_size - avail_size; + +func_exit: + DBUG_LOG("ib_merge_sort", + reinterpret_cast<const void*>(b) << ',' << + reinterpret_cast<const void*>(block) + << ",fd=" << fd << ',' << *foffs << ": " + << rec_printer(*mrec, 0, offsets).str()); + DBUG_RETURN(b); +} + +/********************************************************************//** +Write a merge record. */ +static +void +row_merge_write_rec_low( +/*====================*/ + byte* b, /*!< out: buffer */ + ulint e, /*!< in: encoded extra_size */ +#ifndef DBUG_OFF + ulint size, /*!< in: total size to write */ + const pfs_os_file_t& fd, /*!< in: file descriptor */ + ulint foffs, /*!< in: file offset */ +#endif /* !DBUG_OFF */ + const mrec_t* mrec, /*!< in: record to write */ + const rec_offs* offsets)/*!< in: offsets of mrec */ +#ifdef DBUG_OFF +# define row_merge_write_rec_low(b, e, size, fd, foffs, mrec, offsets) \ + row_merge_write_rec_low(b, e, mrec, offsets) +#endif /* DBUG_OFF */ +{ + DBUG_ENTER("row_merge_write_rec_low"); + +#ifndef DBUG_OFF + const byte* const end = b + size; +#endif /* DBUG_OFF */ + DBUG_ASSERT(e == rec_offs_extra_size(offsets) + 1); + + DBUG_LOG("ib_merge_sort", + reinterpret_cast<const void*>(b) << ",fd=" << fd << ',' + << foffs << ": " << rec_printer(mrec, 0, offsets).str()); + + if (e < 0x80) { + *b++ = (byte) e; + } else { + *b++ = (byte) (0x80 | (e >> 8)); + *b++ = (byte) e; + } + + memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets)); + DBUG_SLOW_ASSERT(b + rec_offs_size(offsets) == end); + DBUG_VOID_RETURN; +} + +/********************************************************************//** +Write a merge record. +@return pointer to end of block, or NULL on error */ +static +byte* +row_merge_write_rec( +/*================*/ + row_merge_block_t* block, /*!< in/out: file buffer */ + mrec_buf_t* buf, /*!< in/out: secondary buffer */ + byte* b, /*!< in: pointer to end of block */ + const pfs_os_file_t& fd, /*!< in: file descriptor */ + ulint* foffs, /*!< in/out: file offset */ + const mrec_t* mrec, /*!< in: record to write */ + const rec_offs* offsets,/*!< in: offsets of mrec */ + row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */ + ulint space) /*!< in: space id */ +{ + ulint extra_size; + ulint size; + ulint avail_size; + + ut_ad(block); + ut_ad(buf); + ut_ad(b >= &block[0]); + ut_ad(b < &block[srv_sort_buf_size]); + ut_ad(mrec); + ut_ad(foffs); + ut_ad(mrec < &block[0] || mrec > &block[srv_sort_buf_size]); + ut_ad(mrec < buf[0] || mrec > buf[1]); + + /* Normalize extra_size. Value 0 signals "end of list". */ + extra_size = rec_offs_extra_size(offsets) + 1; + + size = extra_size + (extra_size >= 0x80) + + rec_offs_data_size(offsets); + + if (UNIV_UNLIKELY(b + size >= &block[srv_sort_buf_size])) { + /* The record spans two blocks. + Copy it to the temporary buffer first. 
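+		The record is serialized once into the staging buffer
+		buf[0], which is large enough for any record; its head
+		then fills the rest of the current block, the block is
+		written out, and the tail starts the next block.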
*/ + avail_size = ulint(&block[srv_sort_buf_size] - b); + + row_merge_write_rec_low(buf[0], + extra_size, size, fd, *foffs, + mrec, offsets); + + /* Copy the head of the temporary buffer, write + the completed block, and copy the tail of the + record to the head of the new block. */ + memcpy(b, buf[0], avail_size); + + if (!row_merge_write(fd, (*foffs)++, block, + crypt_block, + space)) { + return(NULL); + } + + MEM_UNDEFINED(&block[0], srv_sort_buf_size); + + /* Copy the rest. */ + b = &block[0]; + memcpy(b, buf[0] + avail_size, size - avail_size); + b += size - avail_size; + } else { + row_merge_write_rec_low(b, extra_size, size, fd, *foffs, + mrec, offsets); + b += size; + } + + return(b); +} + +/********************************************************************//** +Write an end-of-list marker. +@return pointer to end of block, or NULL on error */ +static +byte* +row_merge_write_eof( +/*================*/ + row_merge_block_t* block, /*!< in/out: file buffer */ + byte* b, /*!< in: pointer to end of block */ + const pfs_os_file_t& fd, /*!< in: file descriptor */ + ulint* foffs, /*!< in/out: file offset */ + row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */ + ulint space) /*!< in: space id */ +{ + ut_ad(block); + ut_ad(b >= &block[0]); + ut_ad(b < &block[srv_sort_buf_size]); + ut_ad(foffs); + + DBUG_ENTER("row_merge_write_eof"); + DBUG_LOG("ib_merge_sort", + reinterpret_cast<const void*>(b) << ',' << + reinterpret_cast<const void*>(block) << + ",fd=" << fd << ',' << *foffs); + + *b++ = 0; + MEM_CHECK_DEFINED(&block[0], b - &block[0]); + MEM_CHECK_ADDRESSABLE(&block[0], srv_sort_buf_size); + + /* The rest of the block is uninitialized. Silence warnings. */ + MEM_MAKE_DEFINED(b, &block[srv_sort_buf_size] - b); + + if (!row_merge_write(fd, (*foffs)++, block, crypt_block, space)) { + DBUG_RETURN(NULL); + } + + MEM_UNDEFINED(&block[0], srv_sort_buf_size); + DBUG_RETURN(&block[0]); +} + +/** Create a temporary file if it has not been created already. +@param[in,out] tmpfd temporary file handle +@param[in] path location for creating temporary file +@return true on success, false on error */ +static MY_ATTRIBUTE((warn_unused_result)) +bool +row_merge_tmpfile_if_needed( + pfs_os_file_t* tmpfd, + const char* path) +{ + if (*tmpfd == OS_FILE_CLOSED) { + *tmpfd = row_merge_file_create_low(path); + if (*tmpfd != OS_FILE_CLOSED) { + MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES); + } + } + + return(*tmpfd != OS_FILE_CLOSED); +} + +/** Create a temporary file for merge sort if it was not created already. +@param[in,out] file merge file structure +@param[in] nrec number of records in the file +@param[in] path location for creating temporary file +@return true on success, false on error */ +static MY_ATTRIBUTE((warn_unused_result)) +bool +row_merge_file_create_if_needed( + merge_file_t* file, + pfs_os_file_t* tmpfd, + ulint nrec, + const char* path) +{ + ut_ad(file->fd == OS_FILE_CLOSED || *tmpfd != OS_FILE_CLOSED); + if (file->fd == OS_FILE_CLOSED && row_merge_file_create(file, path)!= OS_FILE_CLOSED) { + MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES); + if (!row_merge_tmpfile_if_needed(tmpfd, path) ) { + return(false); + } + + file->n_rec = nrec; + } + + ut_ad(file->fd == OS_FILE_CLOSED || *tmpfd != OS_FILE_CLOSED); + return(file->fd != OS_FILE_CLOSED); +} + +/** Copy the merge data tuple from another merge data tuple. 
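+Used together with row_mtuple_cmp() to remember the previously read
+PRIMARY KEY tuple and detect duplicates when skip_pk_sort relies on
+the existing key order instead of sorting.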
+@param[in] mtuple source merge data tuple +@param[in,out] prev_mtuple destination merge data tuple +@param[in] n_unique number of unique fields exist in the mtuple +@param[in,out] heap memory heap where last_mtuple allocated */ +static +void +row_mtuple_create( + const mtuple_t* mtuple, + mtuple_t* prev_mtuple, + ulint n_unique, + mem_heap_t* heap) +{ + memcpy(prev_mtuple->fields, mtuple->fields, + n_unique * sizeof *mtuple->fields); + + dfield_t* field = prev_mtuple->fields; + + for (ulint i = 0; i < n_unique; i++) { + dfield_dup(field++, heap); + } +} + +/** Compare two merge data tuples. +@param[in] prev_mtuple merge data tuple +@param[in] current_mtuple merge data tuple +@param[in,out] dup reporter of duplicates +@retval positive, 0, negative if current_mtuple is greater, equal, less, than +last_mtuple. */ +static +int +row_mtuple_cmp( + const mtuple_t* prev_mtuple, + const mtuple_t* current_mtuple, + row_merge_dup_t* dup) +{ + ut_ad(dup->index->is_primary()); + const ulint n_uniq= dup->index->n_uniq; + return row_merge_tuple_cmp(dup->index, n_uniq, n_uniq, + *current_mtuple, *prev_mtuple, dup); +} + +/** Insert cached spatial index rows. +@param[in] trx_id transaction id +@param[in] sp_tuples cached spatial rows +@param[in] num_spatial number of spatial indexes +@param[in,out] heap temporary memory heap +@param[in,out] pcur cluster index cursor +@param[in,out] started whether mtr is active +@param[in,out] mtr mini-transaction +@return DB_SUCCESS or error number */ +static +dberr_t +row_merge_spatial_rows( + trx_id_t trx_id, + spatial_index_info** sp_tuples, + ulint num_spatial, + mem_heap_t* heap, + btr_pcur_t* pcur, + bool& started, + mtr_t* mtr) +{ + if (!sp_tuples) + return DB_SUCCESS; + + for (ulint j= 0; j < num_spatial; j++) + if (dberr_t err= sp_tuples[j]->insert(trx_id, pcur, started, heap, mtr)) + return err; + + mem_heap_empty(heap); + return DB_SUCCESS; +} + +/** Check if the geometry field is valid. +@param[in] row the row +@param[in] index spatial index +@return true if it's valid, false if it's invalid. */ +static +bool +row_geo_field_is_valid( + const dtuple_t* row, + dict_index_t* index) +{ + const dict_field_t* ind_field + = dict_index_get_nth_field(index, 0); + const dict_col_t* col + = ind_field->col; + ulint col_no + = dict_col_get_no(col); + const dfield_t* dfield + = dtuple_get_nth_field(row, col_no); + + if (dfield_is_null(dfield) + || dfield_get_len(dfield) < GEO_DATA_HEADER_SIZE) { + return(false); + } + + return(true); +} + +/** Reads clustered index of the table and create temporary files +containing the index entries for the indexes to be built. 
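+
+In outline: the clustered index is scanned with a persistent cursor;
+for every index being created, entries are accumulated in a
+row_merge_buf_t, and a full buffer is sorted and flushed to a
+temporary merge file (or handed to the insert phase directly when
+everything fits in one buffer). FTS entries are instead queued to
+the parallel tokenization threads, and spatial index entries are
+cached per clustered index page and inserted straight into the
+R-tree.
+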
+@param[in] trx transaction +@param[in,out] table MySQL table object, for reporting erroneous + records +@param[in] old_table table where rows are read from +@param[in] new_table table where indexes are created; identical to + old_table unless creating a PRIMARY KEY +@param[in] online true if creating indexes online +@param[in] index indexes to be created +@param[in] fts_sort_idx full-text index to be created, or NULL +@param[in] psort_info parallel sort info for fts_sort_idx creation, + or NULL +@param[in] files temporary files +@param[in] key_numbers MySQL key numbers to create +@param[in] n_index number of indexes to create +@param[in] defaults default values of added, changed columns, or NULL +@param[in] add_v newly added virtual columns along with indexes +@param[in] col_map mapping of old column numbers to new ones, or +NULL if old_table == new_table +@param[in] add_autoinc number of added AUTO_INCREMENT columns, or +ULINT_UNDEFINED if none is added +@param[in,out] sequence autoinc sequence +@param[in,out] block file buffer +@param[in] skip_pk_sort whether the new PRIMARY KEY will follow +existing order +@param[in,out] tmpfd temporary file handle +@param[in,out] stage performance schema accounting object, used by +ALTER TABLE. stage->n_pk_recs_inc() will be called for each record read and +stage->inc() will be called for each page read. +@param[in] pct_cost percent of task weight out of total alter job +@param[in,out] crypt_block crypted file buffer +@param[in] eval_table mysql table used to evaluate virtual column + value, see innobase_get_computed_value(). +@param[in] allow_not_null allow null to not-null conversion +@param[in] col_collate columns whose collations changed, or nullptr +@return DB_SUCCESS or error */ +static MY_ATTRIBUTE((warn_unused_result)) +dberr_t +row_merge_read_clustered_index( + trx_t* trx, + struct TABLE* table, + const dict_table_t* old_table, + dict_table_t* new_table, + bool online, + dict_index_t** index, + dict_index_t* fts_sort_idx, + fts_psort_t* psort_info, + merge_file_t* files, + const ulint* key_numbers, + ulint n_index, + const dtuple_t* defaults, + const dict_add_v_col_t* add_v, + const ulint* col_map, + ulint add_autoinc, + ib_sequence_t& sequence, + row_merge_block_t* block, + bool skip_pk_sort, + pfs_os_file_t* tmpfd, + ut_stage_alter_t* stage, + double pct_cost, + row_merge_block_t* crypt_block, + struct TABLE* eval_table, + bool allow_not_null, + const col_collations* col_collate) +{ + dict_index_t* clust_index; /* Clustered index */ + mem_heap_t* row_heap = NULL;/* Heap memory to create + clustered index tuples */ + row_merge_buf_t** merge_buf; /* Temporary list for records*/ + mem_heap_t* v_heap = NULL; /* Heap memory to process large + data for virtual column */ + btr_pcur_t pcur; /* Cursor on the clustered + index */ + mtr_t mtr; /* Mini transaction */ + bool mtr_started = false; + dberr_t err = DB_SUCCESS;/* Return code */ + ulint n_nonnull = 0; /* number of columns + changed to NOT NULL */ + ulint* nonnull = NULL; /* NOT NULL columns */ + dict_index_t* fts_index = NULL;/* FTS index */ + doc_id_t doc_id = 0; + doc_id_t max_doc_id = 0; + ibool add_doc_id = FALSE; + pthread_cond_t* fts_parallel_sort_cond = nullptr; + spatial_index_info** sp_tuples = nullptr; + ulint num_spatial = 0; + BtrBulk* clust_btr_bulk = NULL; + bool clust_temp_file = false; + mem_heap_t* mtuple_heap = NULL; + mtuple_t prev_mtuple; + mem_heap_t* conv_heap = NULL; + double curr_progress = 0.0; + ib_uint64_t read_rows = 0; + ib_uint64_t table_total_rows = 0; + char 
new_sys_trx_start[8]; + char new_sys_trx_end[8]; + byte any_autoinc_data[8] = {0}; + bool vers_update_trt = false; + + DBUG_ENTER("row_merge_read_clustered_index"); + + ut_ad((old_table == new_table) == !col_map); + ut_ad(!defaults || col_map); + ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE)); + ut_ad(trx->id); + + table_total_rows = dict_table_get_n_rows(old_table); + if(table_total_rows == 0) { + /* We don't know total row count */ + table_total_rows = 1; + } + + trx->op_info = "reading clustered index"; + +#ifdef FTS_INTERNAL_DIAG_PRINT + DEBUG_FTS_SORT_PRINT("FTS_SORT: Start Create Index\n"); +#endif + + /* Create and initialize memory for record buffers */ + + merge_buf = static_cast<row_merge_buf_t**>( + ut_malloc_nokey(n_index * sizeof *merge_buf)); + + row_merge_dup_t clust_dup = {index[0], table, col_map, 0}; + dfield_t* prev_fields = nullptr; + const ulint n_uniq = dict_index_get_n_unique(index[0]); + + ut_ad(trx->mysql_thd != NULL); + + const char* path = thd_innodb_tmpdir(trx->mysql_thd); + + ut_ad(!skip_pk_sort || dict_index_is_clust(index[0])); + /* There is no previous tuple yet. */ + prev_mtuple.fields = NULL; + + for (ulint i = 0; i < n_index; i++) { + if (index[i]->type & DICT_FTS) { + + /* We are building a FT index, make sure + we have the temporary 'fts_sort_idx' */ + ut_a(fts_sort_idx); + + fts_index = index[i]; + + merge_buf[i] = row_merge_buf_create(fts_sort_idx); + + add_doc_id = DICT_TF2_FLAG_IS_SET( + new_table, DICT_TF2_FTS_ADD_DOC_ID); + + /* If Doc ID does not exist in the table itself, + fetch the first FTS Doc ID */ + if (add_doc_id) { + fts_get_next_doc_id( + (dict_table_t*) new_table, + &doc_id); + ut_ad(doc_id > 0); + } + + row_fts_start_psort(psort_info); + fts_parallel_sort_cond = + &psort_info[0].psort_common->sort_cond; + } else { + if (dict_index_is_spatial(index[i])) { + num_spatial++; + } + + merge_buf[i] = row_merge_buf_create(index[i]); + } + } + + if (num_spatial > 0) { + ulint count = 0; + + sp_tuples = static_cast<spatial_index_info**>( + ut_malloc_nokey(num_spatial + * sizeof(*sp_tuples))); + + for (ulint i = 0; i < n_index; i++) { + if (dict_index_is_spatial(index[i])) { + sp_tuples[count] + = UT_NEW_NOKEY( + spatial_index_info(index[i])); + count++; + } + } + + ut_ad(count == num_spatial); + } + + mtr.start(); + mtr_started = true; + + /* Find the clustered index and create a persistent cursor + based on that. */ + + clust_index = dict_table_get_first_index(old_table); + const ulint old_trx_id_col = ulint(old_table->n_cols) + - (DATA_N_SYS_COLS - DATA_TRX_ID); + ut_ad(old_table->cols[old_trx_id_col].mtype == DATA_SYS); + ut_ad(old_table->cols[old_trx_id_col].prtype + == (DATA_TRX_ID | DATA_NOT_NULL)); + ut_ad(old_table->cols[old_trx_id_col + 1].mtype == DATA_SYS); + ut_ad(old_table->cols[old_trx_id_col + 1].prtype + == (DATA_ROLL_PTR | DATA_NOT_NULL)); + const ulint new_trx_id_col = col_map + ? col_map[old_trx_id_col] : old_trx_id_col; + uint64_t n_rows = 0; + + err = pcur.open_leaf(true, clust_index, BTR_SEARCH_LEAF, &mtr); + if (err != DB_SUCCESS) { +err_exit: + trx->error_key_num = 0; + goto func_exit; + } else { + rec_t* rec = page_rec_get_next(btr_pcur_get_rec(&pcur)); + if (!rec) { +corrupted_metadata: + err = DB_CORRUPTION; + goto err_exit; + } + if (rec_get_info_bits(rec, page_rec_is_comp(rec)) + & REC_INFO_MIN_REC_FLAG) { + if (!clust_index->is_instant()) { + goto corrupted_metadata; + } + if (page_rec_is_comp(rec) + && rec_get_status(rec) != REC_STATUS_INSTANT) { + goto corrupted_metadata; + } + /* Skip the metadata pseudo-record. 
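+			It carries the REC_INFO_MIN_REC_FLAG, precedes
+			all user records, and holds instant ALTER
+			TABLE metadata rather than user data, so it
+			must not produce any index entry.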
*/ + btr_pcur_get_page_cur(&pcur)->rec = rec; + } else if (clust_index->is_instant()) { + goto corrupted_metadata; + } + } + + /* Check if the table is supposed to be empty for our read view. + + If we read bulk_trx_id as an older transaction ID, it is not + incorrect to check here whether that transaction should be + visible to us. If bulk_trx_id is not visible to us, the table + must have been empty at an earlier point of time, also in our + read view. + + An INSERT would only update bulk_trx_id in + row_ins_clust_index_entry_low() if the table really was empty + (everything had been purged), when holding a leaf page latch + in the clustered index (actually, the root page is the only + leaf page in that case). + + We are holding a clustered index leaf page latch here. + That will obviously prevent any concurrent INSERT from + updating bulk_trx_id while we read it. */ + if (!online) { + } else if (trx_id_t bulk_trx_id = old_table->bulk_trx_id) { + ut_ad(trx->read_view.is_open()); + ut_ad(bulk_trx_id != trx->id); + if (!trx->read_view.changes_visible(bulk_trx_id)) { + goto func_exit; + } + } + + if (old_table != new_table) { + /* The table is being rebuilt. Identify the columns + that were flagged NOT NULL in the new table, so that + we can quickly check that the records in the old table + do not violate the added NOT NULL constraints. */ + + nonnull = static_cast<ulint*>( + ut_malloc_nokey(dict_table_get_n_cols(new_table) + * sizeof *nonnull)); + + for (ulint i = 0; i < dict_table_get_n_cols(old_table); i++) { + if (dict_table_get_nth_col(old_table, i)->prtype + & DATA_NOT_NULL) { + continue; + } + + const ulint j = col_map[i]; + + if (j == ULINT_UNDEFINED) { + /* The column was dropped. */ + continue; + } + + if (dict_table_get_nth_col(new_table, j)->prtype + & DATA_NOT_NULL) { + nonnull[n_nonnull++] = j; + } + } + + if (!n_nonnull) { + ut_free(nonnull); + nonnull = NULL; + } + } + + row_heap = mem_heap_create(sizeof(mrec_buf_t)); + + if (dict_table_is_comp(old_table) + && !dict_table_is_comp(new_table)) { + conv_heap = mem_heap_create(sizeof(mrec_buf_t)); + } + + if (skip_pk_sort) { + prev_fields = static_cast<dfield_t*>( + ut_malloc_nokey(n_uniq * sizeof *prev_fields)); + mtuple_heap = mem_heap_create(sizeof(mrec_buf_t)); + } + + mach_write_to_8(new_sys_trx_start, trx->id); + mach_write_to_8(new_sys_trx_end, TRX_ID_MAX); + + /* Scan the clustered index. */ + for (;;) { + /* Do not continue if table pages are still encrypted */ + if (!old_table->is_readable() || !new_table->is_readable()) { + err = DB_DECRYPTION_FAILED; + goto err_exit; + } + + const rec_t* rec; + trx_id_t rec_trx_id; + rec_offs* offsets; + dtuple_t* row; + row_ext_t* ext; + page_cur_t* cur = btr_pcur_get_page_cur(&pcur); + bool history_row, history_fts = false; + + stage->n_pk_recs_inc(); + + if (!page_cur_move_to_next(cur)) { +corrupted_rec: + err = DB_CORRUPTION; + goto err_exit; + } + + if (page_cur_is_after_last(cur)) { + + stage->inc(); + + if (UNIV_UNLIKELY(trx_is_interrupted(trx))) { + err = DB_INTERRUPTED; + goto err_exit; + } + + if (online && old_table != new_table) { + err = row_log_table_get_error(clust_index); + if (err != DB_SUCCESS) { + goto err_exit; + } + } + + /* Insert the cached spatial index rows. 
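+			The cached tuples may still point into the
+			page being scanned (the row was built with
+			ROW_COPY_POINTERS), so they must be flushed
+			before row_heap is emptied below and before
+			the mini-transaction releases the page.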
*/ + err = row_merge_spatial_rows( + trx->id, sp_tuples, num_spatial, + row_heap, &pcur, mtr_started, &mtr); + + if (err != DB_SUCCESS) { + goto func_exit; + } + + mem_heap_empty(row_heap); + + if (!mtr_started) { + goto scan_next; + } + + if (clust_index->lock.is_waiting()) { + /* There are waiters on the clustered + index tree lock, likely the purge + thread. Store and restore the cursor + position, and yield so that scanning a + large table will not starve other + threads. */ + + /* Store the cursor position on the last user + record on the page. */ + if (!btr_pcur_move_to_prev_on_page(&pcur)) { + goto corrupted_index; + } + /* Leaf pages must never be empty, unless + this is the only page in the index tree. */ + if (!btr_pcur_is_on_user_rec(&pcur) + && btr_pcur_get_block(&pcur)->page.id() + .page_no() != clust_index->page) { + goto corrupted_index; + } + + btr_pcur_store_position(&pcur, &mtr); + mtr.commit(); + mtr_started = false; + + /* Give the waiters a chance to proceed. */ + std::this_thread::yield(); +scan_next: + ut_ad(!mtr_started); + ut_ad(!mtr.is_active()); + mtr.start(); + mtr_started = true; + /* Restore position on the record, or its + predecessor if the record was purged + meanwhile. */ + if (pcur.restore_position(BTR_SEARCH_LEAF, + &mtr) + == btr_pcur_t::CORRUPTED) { +corrupted_index: + err = DB_CORRUPTION; + goto func_exit; + } + /* Move to the successor of the + original record. */ + if (!btr_pcur_move_to_next_user_rec( + &pcur, &mtr)) { +end_of_index: + row = NULL; + mtr.commit(); + mtr_started = false; + mem_heap_free(row_heap); + row_heap = NULL; + ut_free(nonnull); + nonnull = NULL; + goto write_buffers; + } + } else { + uint32_t next_page_no = btr_page_get_next( + page_cur_get_page(cur)); + + if (next_page_no == FIL_NULL) { + goto end_of_index; + } + + buf_block_t* block = buf_page_get_gen( + page_id_t(old_table->space->id, + next_page_no), + old_table->space->zip_size(), + RW_S_LATCH, nullptr, BUF_GET, &mtr, + &err, false); + if (!block) { + goto err_exit; + } + + page_cur_set_before_first(block, cur); + if (!page_cur_move_to_next(cur) + || page_cur_is_after_last(cur)) { + goto corrupted_rec; + } + + const auto s = mtr.get_savepoint(); + mtr.rollback_to_savepoint(s - 2, s - 1); + } + } else { + mem_heap_empty(row_heap); + } + + rec = page_cur_get_rec(cur); + + if (online) { + offsets = rec_get_offsets(rec, clust_index, NULL, + clust_index->n_core_fields, + ULINT_UNDEFINED, &row_heap); + rec_trx_id = row_get_rec_trx_id(rec, clust_index, + offsets); + + /* Perform a REPEATABLE READ. + + When rebuilding the table online, + row_log_table_apply() must not see a newer + state of the table when applying the log. + This is mainly to prevent false duplicate key + errors, because the log will identify records + by the PRIMARY KEY, and also to prevent unsafe + BLOB access. + + When creating a secondary index online, this + table scan must not see records that have only + been inserted to the clustered index, but have + not been written to the online_log of + index[]. If we performed READ UNCOMMITTED, it + could happen that the ADD INDEX reaches + ONLINE_INDEX_COMPLETE state between the time + the DML thread has updated the clustered index + but has not yet accessed secondary index. 
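+			Such a record would then be missing from the
+			new index: it is neither seen by this scan
+			nor replayed from the online_log.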
*/
+			ut_ad(trx->read_view.is_open());
+			ut_ad(rec_trx_id != trx->id);
+
+			if (!trx->read_view.changes_visible(rec_trx_id)) {
+				if (rec_trx_id
+				    >= trx->read_view.low_limit_id()
+				    && rec_trx_id
+				    >= trx_sys.get_max_trx_id()) {
+					goto corrupted_rec;
+				}
+
+				rec_t*	old_vers;
+
+				row_vers_build_for_consistent_read(
+					rec, &mtr, clust_index, &offsets,
+					&trx->read_view, &row_heap,
+					row_heap, &old_vers, NULL);
+
+				if (!old_vers) {
+					continue;
+				}
+
+				/* The old version must necessarily be
+				in the "prehistory", because the
+				exclusive lock in
+				ha_innobase::prepare_inplace_alter_table()
+				forced the completion of any transactions
+				that accessed this table. */
+				ut_ad(row_get_rec_trx_id(old_vers, clust_index,
+							 offsets) < trx->id);
+
+				rec = old_vers;
+				rec_trx_id = 0;
+			}
+
+			if (rec_get_deleted_flag(
+				    rec,
+				    dict_table_is_comp(old_table))) {
+				/* In delete-marked records, DB_TRX_ID must
+				always refer to an existing undo log record.
+				Above, we reset rec_trx_id = 0
+				for rec = old_vers. */
+				ut_ad(rec == page_cur_get_rec(cur)
+				      ? rec_trx_id
+				      : !rec_trx_id);
+				/* This record was deleted in the latest
+				committed version, or it was deleted and
+				then reinserted by an update before purge
+				kicked in. Skip it. */
+				continue;
+			}
+
+			ut_ad(!rec_offs_any_null_extern(rec, offsets));
+		} else if (rec_get_deleted_flag(
+				   rec, dict_table_is_comp(old_table))) {
+			/* In delete-marked records, DB_TRX_ID must
+			always refer to an existing undo log record. */
+			ut_d(rec_trx_id = rec_get_trx_id(rec, clust_index));
+			ut_ad(rec_trx_id);
+			/* This must be a purgeable delete-marked record,
+			and the transaction that delete-marked the record
+			must have been committed before this
+			!online ALTER TABLE transaction. */
+			ut_ad(rec_trx_id < trx->id);
+			/* Skip delete-marked records.
+
+			Skipping delete-marked records will make the
+			created indexes unusable for transactions
+			whose read views were created before the index
+			creation completed, but an attempt to preserve
+			the history would make it tricky to detect
+			duplicate keys. */
+			continue;
+		} else {
+			offsets = rec_get_offsets(rec, clust_index, NULL,
+						  clust_index->n_core_fields,
+						  ULINT_UNDEFINED, &row_heap);
+			/* This is a locking ALTER TABLE.
+
+			If we are not rebuilding the table, the
+			DB_TRX_ID does not matter, as it is not being
+			written to any secondary indexes; see
+			if (old_table == new_table) below.
+
+			If we are rebuilding the table, the
+			DB_TRX_ID,DB_ROLL_PTR should be reset, because
+			there will be no history available. */
+			ut_ad(rec_get_trx_id(rec, clust_index) < trx->id);
+			rec_trx_id = 0;
+		}
+
+		/* When !online, we are holding a lock on old_table, preventing
+		any inserts that could have written a record 'stub' before
+		writing out off-page columns. */
+		ut_ad(!rec_offs_any_null_extern(rec, offsets));
+
+		/* Build a row based on the clustered index. 
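+		row_build_w_add_vcol() maps the old columns through
+		col_map, substitutes defaults for added columns, and
+		materializes any added virtual columns (add_v), so the
+		resulting row already has the layout of new_table.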
*/ + + row = row_build_w_add_vcol(ROW_COPY_POINTERS, clust_index, + rec, offsets, new_table, + defaults, add_v, col_map, &ext, + row_heap); + ut_ad(row); + + history_row = new_table->versioned() + && dtuple_get_nth_field(row, new_table->vers_end) + ->vers_history_row(); + history_fts = history_row && new_table->fts; + + for (ulint i = 0; i < n_nonnull; i++) { + dfield_t* field = &row->fields[nonnull[i]]; + + ut_ad(dfield_get_type(field)->prtype & DATA_NOT_NULL); + + if (dfield_is_null(field)) { + + Field* null_field = + table->field[nonnull[i]]; + + null_field->set_warning( + Sql_condition::WARN_LEVEL_WARN, + WARN_DATA_TRUNCATED, 1, + ulong(n_rows + 1)); + + if (!allow_not_null) { + err = DB_INVALID_NULL; + goto err_exit; + } + + const dfield_t& default_field + = defaults->fields[nonnull[i]]; + + *field = default_field; + } + } + + /* Get the next Doc ID */ + if (add_doc_id && !history_fts) { + doc_id++; + } else { + doc_id = 0; + } + + ut_ad(row->fields[new_trx_id_col].type.mtype == DATA_SYS); + ut_ad(row->fields[new_trx_id_col].type.prtype + == (DATA_TRX_ID | DATA_NOT_NULL)); + ut_ad(row->fields[new_trx_id_col].len == DATA_TRX_ID_LEN); + ut_ad(row->fields[new_trx_id_col + 1].type.mtype == DATA_SYS); + ut_ad(row->fields[new_trx_id_col + 1].type.prtype + == (DATA_ROLL_PTR | DATA_NOT_NULL)); + ut_ad(row->fields[new_trx_id_col + 1].len == DATA_ROLL_PTR_LEN); + + if (old_table == new_table) { + /* Do not bother touching DB_TRX_ID,DB_ROLL_PTR + because they are not going to be written into + secondary indexes. */ + } else if (rec_trx_id < trx->id) { + /* Reset the DB_TRX_ID,DB_ROLL_PTR of old rows + for which history is not going to be + available after the rebuild operation. + This essentially mimics row_purge_reset_trx_id(). */ + row->fields[new_trx_id_col].data + = const_cast<byte*>(reset_trx_id); + row->fields[new_trx_id_col + 1].data + = const_cast<byte*>(reset_trx_id + + DATA_TRX_ID_LEN); + } + + if (add_autoinc != ULINT_UNDEFINED) { + + ut_ad(add_autoinc + < dict_table_get_n_user_cols(new_table)); + + dfield_t* dfield = dtuple_get_nth_field(row, + add_autoinc); + + if (new_table->versioned()) { + if (history_row) { + if (dfield_get_type(dfield)->prtype & DATA_NOT_NULL) { + err = DB_UNSUPPORTED; + my_error(ER_UNSUPPORTED_EXTENSION, MYF(0), + old_table->name.m_name); + goto func_exit; + } + dfield_set_null(dfield); + } else { + // set not null + ulint len = dfield_get_type(dfield)->len; + dfield_set_data(dfield, any_autoinc_data, len); + } + } + + if (dfield_is_null(dfield)) { + goto write_buffers; + } + + const dtype_t* dtype = dfield_get_type(dfield); + byte* b = static_cast<byte*>(dfield_get_data(dfield)); + + if (sequence.eof()) { + ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR, + ER_AUTOINC_READ_FAILED, "[NULL]"); + err = DB_ERROR; + goto err_exit; + } + + ulonglong value = sequence++; + + switch (dtype_get_mtype(dtype)) { + case DATA_INT: { + ibool usign; + ulint len = dfield_get_len(dfield); + + usign = dtype_get_prtype(dtype) & DATA_UNSIGNED; + mach_write_ulonglong(b, value, len, usign); + + break; + } + + case DATA_FLOAT: + mach_float_write( + b, static_cast<float>(value)); + break; + + case DATA_DOUBLE: + mach_double_write( + b, static_cast<double>(value)); + break; + + default: + ut_ad(0); + } + } + + if (old_table->versioned()) { + if (!new_table->versioned() + && clust_index->vers_history_row(rec, offsets)) { + continue; + } + } else if (new_table->versioned()) { + dfield_t* start = + dtuple_get_nth_field(row, new_table->vers_start); + dfield_t* end = + dtuple_get_nth_field(row, 
new_table->vers_end);
+			dfield_set_data(start, new_sys_trx_start, 8);
+			dfield_set_data(end, new_sys_trx_end, 8);
+			vers_update_trt = true;
+		}
+
+write_buffers:
+		/* Build all entries for all the indexes to be created
+		in a single scan of the clustered index. */
+
+		n_rows++;
+		ulint	s_idx_cnt = 0;
+		bool	skip_sort = skip_pk_sort
+			&& dict_index_is_clust(merge_buf[0]->index);
+
+		for (ulint k = 0, i = 0; i < n_index; i++, skip_sort = false) {
+			row_merge_buf_t*	buf	= merge_buf[i];
+			ulint			rows_added = 0;
+
+			if (dict_index_is_spatial(buf->index)) {
+				if (!row) {
+					continue;
+				}
+
+				ut_ad(sp_tuples[s_idx_cnt]->index
+				      == buf->index);
+
+				/* If the geometry field is invalid,
+				report the error. */
+				if (!row_geo_field_is_valid(row, buf->index)) {
+					err = DB_CANT_CREATE_GEOMETRY_OBJECT;
+					break;
+				}
+
+				sp_tuples[s_idx_cnt]->add(row, ext, buf->heap);
+				s_idx_cnt++;
+
+				continue;
+			}
+
+			ut_ad(!row
+			      || !dict_index_is_clust(buf->index)
+			      || trx_id_check(row->fields[new_trx_id_col].data,
+					      trx->id));
+
+			merge_file_t*	file = &files[k++];
+
+			if (UNIV_LIKELY
+			    (row && (rows_added = row_merge_buf_add(
+					buf, fts_index, old_table, new_table,
+					psort_info, row, ext, history_fts,
+					&doc_id, conv_heap, &err,
+					&v_heap, eval_table, trx,
+					col_collate)))) {
+
+				/* If we are creating a FTS index, a
+				single row can generate more than one
+				record for the tokenized words. */
+				file->n_rec += rows_added;
+
+				if (err != DB_SUCCESS) {
+					ut_ad(err == DB_TOO_BIG_RECORD);
+					break;
+				}
+
+				if (doc_id > max_doc_id) {
+					max_doc_id = doc_id;
+				}
+
+				if (buf->index->type & DICT_FTS) {
+					/* Check if an error occurred in
+					a child thread. */
+					for (ulint j = 0;
+					     j < fts_sort_pll_degree; j++) {
+						if (psort_info[j].error
+							!= DB_SUCCESS) {
+							err = psort_info[j].error;
+							trx->error_key_num = i;
+							break;
+						}
+					}
+
+					if (err != DB_SUCCESS) {
+						break;
+					}
+				}
+
+				if (skip_sort) {
+					ut_ad(buf->n_tuples > 0);
+					const mtuple_t*	curr =
+						&buf->tuples[buf->n_tuples - 1];
+
+					ut_ad(i == 0);
+					ut_ad(dict_index_is_clust(merge_buf[0]->index));
+					/* Detect duplicates by comparing the
+					current record with the previous one.
+					When the temp file is not used, the
+					records should be in sorted order. */
+					if (prev_mtuple.fields != NULL
+					    && (row_mtuple_cmp(
+						&prev_mtuple, curr,
+						&clust_dup) == 0)) {
+
+						err = DB_DUPLICATE_KEY;
+						trx->error_key_num
+							= key_numbers[0];
+						goto func_exit;
+					}
+
+					prev_mtuple.fields = curr->fields;
+				}
+
+				continue;
+			}
+
+			if (err == DB_COMPUTE_VALUE_FAILED) {
+				trx->error_key_num = i;
+				goto func_exit;
+			}
+
+			if (buf->index->type & DICT_FTS) {
+				if (!row || !doc_id) {
+					continue;
+				}
+			}
+
+			/* The buffer must be sufficiently large
+			to hold at least one record. It may only
+			be empty when we reach the end of the
+			clustered index. row_merge_buf_add()
+			must not have been called in this loop. */
+			ut_ad(buf->n_tuples || row == NULL);
+
+			/* We have enough data tuples to form a block.
+			Sort them (unless skip_sort) and write them to
+			disk if a temp file is used, or insert them
+			into the index directly if it is not. */
+			ut_ad(old_table == new_table
+			      ? !dict_index_is_clust(buf->index)
+			      : (i == 0) == dict_index_is_clust(buf->index));
+
+			if (buf->n_tuples) {
+				if (skip_sort) {
+					/* The temporary file is not used,
+					so insert the sorted block into
+					the index. */
+					if (row != NULL) {
+						/* We have to insert the
+						cached spatial index rows
+						first, because after the
+						mtr commits, the clustered
+						index page could be updated,
+						making the data the cached
+						rows point to invalid. 
*/
+						err = row_merge_spatial_rows(
+							trx->id, sp_tuples,
+							num_spatial,
+							row_heap,
+							&pcur, mtr_started,
+							&mtr);
+
+						if (err != DB_SUCCESS) {
+							goto func_exit;
+						}
+
+						/* We are not at the end of
+						the scan yet. We must
+						mtr.commit() in order to be
+						able to call log_free_check()
+						in row_merge_insert_index_tuples().
+						Due to mtr.commit(), the
+						current row will be invalid, and
+						we must reread it on the next
+						loop iteration. */
+						if (mtr_started) {
+							if (!btr_pcur_move_to_prev_on_page(&pcur)) {
+								err = DB_CORRUPTION;
+								goto func_exit;
+							}
+							btr_pcur_store_position(
+								&pcur, &mtr);
+
+							mtr.commit();
+							mtr_started = false;
+						}
+					}
+
+					mem_heap_empty(mtuple_heap);
+					prev_mtuple.fields = prev_fields;
+
+					row_mtuple_create(
+						&buf->tuples[buf->n_tuples - 1],
+						&prev_mtuple, n_uniq,
+						mtuple_heap);
+
+					if (clust_btr_bulk == NULL) {
+						clust_btr_bulk = UT_NEW_NOKEY(
+							BtrBulk(index[i],
+								trx));
+					} else {
+						clust_btr_bulk->latch();
+					}
+
+					err = row_merge_insert_index_tuples(
+						index[i], old_table,
+						OS_FILE_CLOSED, NULL, buf,
+						clust_btr_bulk,
+						table_total_rows,
+						curr_progress,
+						pct_cost,
+						crypt_block,
+						new_table->space_id);
+
+					if (row == NULL) {
+						err = clust_btr_bulk->finish(
+							err);
+						UT_DELETE(clust_btr_bulk);
+						clust_btr_bulk = NULL;
+					} else {
+						/* Release the latches for a
+						possible log_free_check() in
+						the spatial index build. */
+						clust_btr_bulk->release();
+					}
+
+					if (err != DB_SUCCESS) {
+						break;
+					}
+
+					if (row != NULL) {
+						/* Restore the cursor on the
+						previous clustered index record,
+						and empty the buffer. The next
+						iteration of the outer loop will
+						advance the cursor and read the
+						next record (the one which we
+						had to ignore due to the buffer
+						overflow). */
+						mtr.start();
+						mtr_started = true;
+						if (pcur.restore_position(
+							BTR_SEARCH_LEAF, &mtr)
+						    == btr_pcur_t::CORRUPTED) {
+							goto corrupted_index;
+						}
+						buf = row_merge_buf_empty(buf);
+						merge_buf[i] = buf;
+						/* Restart the outer loop on the
+						record. We did not insert it
+						into any index yet. */
+						ut_ad(i == 0);
+						break;
+					}
+				} else if (dict_index_is_unique(buf->index)) {
+					row_merge_dup_t	dup = {
+						buf->index, table, col_map, 0};
+
+					row_merge_buf_sort(buf, &dup);
+
+					if (dup.n_dup) {
+						err = DB_DUPLICATE_KEY;
+						trx->error_key_num
+							= key_numbers[i];
+						break;
+					}
+				} else {
+					row_merge_buf_sort(buf, NULL);
+				}
+			} else if (online && new_table == old_table) {
+				/* Note the newest transaction that
+				modified this index when the scan was
+				completed. We prevent older readers
+				from accessing this index, to ensure
+				read consistency. */
+
+				ut_a(row == NULL);
+
+				dict_index_t*	index = buf->index;
+				index->lock.x_lock(SRW_LOCK_CALL);
+				ut_a(dict_index_get_online_status(index)
+				     == ONLINE_INDEX_CREATION);
+
+				trx_id_t	max_trx_id = row_log_get_max_trx(
+					index);
+
+				if (max_trx_id > index->trx_id) {
+					index->trx_id = max_trx_id;
+				}
+
+				index->lock.x_unlock();
+			}
+
+			/* A secondary index, or a clustered index that
+			is not in sorted order, can use the temporary
+			file. A fulltext index must not use the
+			temporary file. */
+			if (!skip_sort && !(buf->index->type & DICT_FTS)) {
+				/* If all rows fit in the sort buffer,
+				we can insert directly into the index
+				without a temporary file, provided that
+				the clustered index did not use a
+				temporary file either. 
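+				(clust_temp_file, set below when the
+				clustered index spills to disk, forces
+				secondary indexes through the file as
+				well, so that any duplicates in the
+				clustered index are detected before
+				secondary index records are inserted.)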
*/ + if (row == NULL && file->fd == OS_FILE_CLOSED + && !clust_temp_file) { + DBUG_EXECUTE_IF( + "row_merge_write_failure", + err = DB_TEMP_FILE_WRITE_FAIL; + trx->error_key_num = i; + goto all_done;); + + DBUG_EXECUTE_IF( + "row_merge_tmpfile_fail", + err = DB_OUT_OF_MEMORY; + trx->error_key_num = i; + goto all_done;); + + BtrBulk btr_bulk(index[i], trx); + + err = row_merge_insert_index_tuples( + index[i], old_table, + OS_FILE_CLOSED, NULL, buf, + &btr_bulk, + table_total_rows, + curr_progress, + pct_cost, + crypt_block, + new_table->space_id); + + err = btr_bulk.finish(err); + + DBUG_EXECUTE_IF( + "row_merge_insert_big_row", + err = DB_TOO_BIG_RECORD;); + + if (err != DB_SUCCESS) { + break; + } + } else { + if (!row_merge_file_create_if_needed( + file, tmpfd, + buf->n_tuples, path)) { + err = DB_OUT_OF_MEMORY; + trx->error_key_num = i; + break; + } + + /* Ensure that duplicates in the + clustered index will be detected before + inserting secondary index records. */ + if (dict_index_is_clust(buf->index)) { + clust_temp_file = true; + } + + ut_ad(file->n_rec > 0); + + row_merge_buf_write(buf, +#ifndef DBUG_OFF + file, +#endif + block); + + if (!row_merge_write( + file->fd, file->offset++, + block, crypt_block, + new_table->space_id)) { + err = DB_TEMP_FILE_WRITE_FAIL; + trx->error_key_num = i; + break; + } + + MEM_UNDEFINED( + &block[0], srv_sort_buf_size); + } + } + merge_buf[i] = row_merge_buf_empty(buf); + buf = merge_buf[i]; + + if (UNIV_LIKELY(row != NULL)) { + /* Try writing the record again, now + that the buffer has been written out + and emptied. */ + + if (UNIV_UNLIKELY + (!(rows_added = row_merge_buf_add( + buf, fts_index, old_table, + new_table, psort_info, + row, ext, history_fts, &doc_id, + conv_heap, &err, &v_heap, + eval_table, trx, col_collate)))) { + /* An empty buffer should have enough + room for at least one record. */ + ut_ad(err == DB_COMPUTE_VALUE_FAILED + || err == DB_OUT_OF_MEMORY + || err == DB_TOO_BIG_RECORD); + } else if (err == DB_SUCCESS) { + file->n_rec += rows_added; + continue; + } + + trx->error_key_num = i; + break; + } + } + + if (row == NULL) { + if (old_table != new_table) { + new_table->stat_n_rows = n_rows; + } + + goto all_done; + } + + if (err != DB_SUCCESS) { + goto func_exit; + } + + if (v_heap) { + mem_heap_empty(v_heap); + } + + /* Increment innodb_onlineddl_pct_progress status variable */ + read_rows++; + if(read_rows % 1000 == 0) { + /* Update progress for each 1000 rows */ + curr_progress = (read_rows >= table_total_rows) ? 
+ pct_cost : + pct_cost * static_cast<double>(read_rows) + / static_cast<double>(table_total_rows); + /* presenting 10.12% as 1012 integer */ + onlineddl_pct_progress = (ulint) (curr_progress * 100); + } + } + +func_exit: + ut_ad(mtr_started == mtr.is_active()); + if (mtr_started) { + mtr.commit(); + } + if (row_heap) { + mem_heap_free(row_heap); + } + ut_free(nonnull); + +all_done: + if (clust_btr_bulk != NULL) { + ut_ad(err != DB_SUCCESS); + clust_btr_bulk->latch(); + err = clust_btr_bulk->finish( + err); + UT_DELETE(clust_btr_bulk); + } + + if (prev_fields) { + ut_free(prev_fields); + mem_heap_free(mtuple_heap); + } + + if (v_heap) { + mem_heap_free(v_heap); + } + + if (conv_heap != NULL) { + mem_heap_free(conv_heap); + } + +#ifdef FTS_INTERNAL_DIAG_PRINT + DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Scan Table\n"); +#endif + if (UNIV_LIKELY_NULL(fts_parallel_sort_cond)) { +wait_again: + /* Check if error occurs in child thread */ + for (ulint j = 0; j < fts_sort_pll_degree; j++) { + if (psort_info[j].error != DB_SUCCESS) { + err = psort_info[j].error; + trx->error_key_num = j; + break; + } + } + + /* Tell all children that parent has done scanning */ + for (ulint i = 0; i < fts_sort_pll_degree; i++) { + if (err == DB_SUCCESS) { + psort_info[i].state = FTS_PARENT_COMPLETE; + } else { + psort_info[i].state = FTS_PARENT_EXITING; + } + } + + /* Now wait all children to report back to be completed */ + timespec abstime; + set_timespec(abstime, 1); + mysql_mutex_lock(&psort_info[0].mutex); + my_cond_timedwait(fts_parallel_sort_cond, + &psort_info[0].mutex.m_mutex, &abstime); + mysql_mutex_unlock(&psort_info[0].mutex); + + for (ulint i = 0; i < fts_sort_pll_degree; i++) { + if (!psort_info[i].child_status) { + goto wait_again; + } + } + + for (ulint j = 0; j < fts_sort_pll_degree; j++) { + psort_info[j].task->wait(); + delete psort_info[j].task; + } + } + +#ifdef FTS_INTERNAL_DIAG_PRINT + DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Tokenization\n"); +#endif + for (ulint i = 0; i < n_index; i++) { + row_merge_buf_free(merge_buf[i]); + } + + row_fts_free_pll_merge_buf(psort_info); + + ut_free(merge_buf); + ut_free(pcur.old_rec_buf); + + if (sp_tuples != NULL) { + for (ulint i = 0; i < num_spatial; i++) { + UT_DELETE(sp_tuples[i]); + } + ut_free(sp_tuples); + } + + /* Update the next Doc ID we used. Table should be locked, so + no concurrent DML */ + if (max_doc_id && err == DB_SUCCESS) { + /* Sync fts cache for other fts indexes to keep all + fts indexes consistent in sync_doc_id. */ + err = fts_sync_table(const_cast<dict_table_t*>(new_table)); + + if (err == DB_SUCCESS) { + new_table->fts->cache->synced_doc_id = max_doc_id; + + /* Update the max value as next FTS_DOC_ID */ + if (max_doc_id >= new_table->fts->cache->next_doc_id) { + new_table->fts->cache->next_doc_id = + max_doc_id + 1; + } + + new_table->fts->cache->first_doc_id = + new_table->fts->cache->next_doc_id; + + err= fts_update_sync_doc_id( + new_table, + new_table->fts->cache->synced_doc_id, + NULL); + } + } + + if (vers_update_trt) { + trx->mod_tables.emplace(new_table, 0) + .first->second.set_versioned(0); + } + + trx->op_info = ""; + + DBUG_RETURN(err); +} + +/** Write a record via buffer 2 and read the next record to buffer N. 
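+The merge operates on three buffers of srv_sort_buf_size bytes each:
+block[0] and block[srv_sort_buf_size] hold the two input runs, and
+block[2 * srv_sort_buf_size] accumulates the merged output.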
+@param N number of the buffer (0 or 1) +@param INDEX record descriptor +@param AT_END statement to execute at end of input */ +#define ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END) \ + do { \ + b2 = row_merge_write_rec(&block[2 * srv_sort_buf_size], \ + &buf[2], b2, \ + of->fd, &of->offset, \ + mrec##N, offsets##N, \ + crypt_block ? &crypt_block[2 * srv_sort_buf_size] : NULL , \ + space); \ + if (UNIV_UNLIKELY(!b2 || ++of->n_rec > file->n_rec)) { \ + goto corrupt; \ + } \ + b##N = row_merge_read_rec(&block[N * srv_sort_buf_size],\ + &buf[N], b##N, INDEX, \ + file->fd, foffs##N, \ + &mrec##N, offsets##N, \ + crypt_block ? &crypt_block[N * srv_sort_buf_size] : NULL, \ + space); \ + \ + if (UNIV_UNLIKELY(!b##N)) { \ + if (mrec##N) { \ + goto corrupt; \ + } \ + AT_END; \ + } \ + } while (0) + +#ifdef HAVE_PSI_STAGE_INTERFACE +#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END) \ + do { \ + if (stage != NULL) { \ + stage->inc(); \ + } \ + ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END); \ + } while (0) +#else /* HAVE_PSI_STAGE_INTERFACE */ +#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END) \ + ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END) +#endif /* HAVE_PSI_STAGE_INTERFACE */ + +/** Merge two blocks of records on disk and write a bigger block. +@param[in] dup descriptor of index being created +@param[in] file file containing index entries +@param[in,out] block 3 buffers +@param[in,out] foffs0 offset of first source list in the file +@param[in,out] foffs1 offset of second source list in the file +@param[in,out] of output file +@param[in,out] stage performance schema accounting object, used by +ALTER TABLE. If not NULL stage->inc() will be called for each record +processed. +@param[in,out] crypt_block encryption buffer +@param[in] space tablespace ID for encryption +@return DB_SUCCESS or error code */ +static MY_ATTRIBUTE((warn_unused_result)) +dberr_t +row_merge_blocks( + const row_merge_dup_t* dup, + const merge_file_t* file, + row_merge_block_t* block, + ulint* foffs0, + ulint* foffs1, + merge_file_t* of, + ut_stage_alter_t* stage MY_ATTRIBUTE((unused)), + row_merge_block_t* crypt_block, + ulint space) +{ + mem_heap_t* heap; /*!< memory heap for offsets0, offsets1 */ + + mrec_buf_t* buf; /*!< buffer for handling + split mrec in block[] */ + const byte* b0; /*!< pointer to block[0] */ + const byte* b1; /*!< pointer to block[srv_sort_buf_size] */ + byte* b2; /*!< pointer to block[2 * srv_sort_buf_size] */ + const mrec_t* mrec0; /*!< merge rec, points to block[0] or buf[0] */ + const mrec_t* mrec1; /*!< merge rec, points to + block[srv_sort_buf_size] or buf[1] */ + rec_offs* offsets0;/* offsets of mrec0 */ + rec_offs* offsets1;/* offsets of mrec1 */ + + DBUG_ENTER("row_merge_blocks"); + DBUG_LOG("ib_merge_sort", + "fd=" << file->fd << ',' << *foffs0 << '+' << *foffs1 + << " to fd=" << of->fd << ',' << of->offset); + + heap = row_merge_heap_create(dup->index, &buf, &offsets0, &offsets1); + + /* Write a record and read the next record. Split the output + file in two halves, which can be merged on the following pass. */ + + if (!row_merge_read(file->fd, *foffs0, &block[0], + crypt_block ? &crypt_block[0] : NULL, + space) || + !row_merge_read(file->fd, *foffs1, &block[srv_sort_buf_size], + crypt_block ? 
&crypt_block[srv_sort_buf_size] : NULL, + space)) { +corrupt: + mem_heap_free(heap); + DBUG_RETURN(DB_CORRUPTION); + } + + b0 = &block[0]; + b1 = &block[srv_sort_buf_size]; + b2 = &block[2 * srv_sort_buf_size]; + + b0 = row_merge_read_rec( + &block[0], &buf[0], b0, dup->index, + file->fd, foffs0, &mrec0, offsets0, + crypt_block ? &crypt_block[0] : NULL, + space); + + b1 = row_merge_read_rec( + &block[srv_sort_buf_size], + &buf[srv_sort_buf_size], b1, dup->index, + file->fd, foffs1, &mrec1, offsets1, + crypt_block ? &crypt_block[srv_sort_buf_size] : NULL, + space); + + if (UNIV_UNLIKELY(!b0 && mrec0) + || UNIV_UNLIKELY(!b1 && mrec1)) { + + goto corrupt; + } + + while (mrec0 && mrec1) { + int cmp = cmp_rec_rec_simple( + mrec0, mrec1, offsets0, offsets1, + dup->index, dup->table); + if (cmp < 0) { + ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto merged); + } else if (cmp) { + ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto merged); + } else { + mem_heap_free(heap); + DBUG_RETURN(DB_DUPLICATE_KEY); + } + } + +merged: + if (mrec0) { + /* append all mrec0 to output */ + for (;;) { + ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto done0); + } + } +done0: + if (mrec1) { + /* append all mrec1 to output */ + for (;;) { + ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto done1); + } + } +done1: + + mem_heap_free(heap); + + b2 = row_merge_write_eof( + &block[2 * srv_sort_buf_size], + b2, of->fd, &of->offset, + crypt_block ? &crypt_block[2 * srv_sort_buf_size] : NULL, + space); + DBUG_RETURN(b2 ? DB_SUCCESS : DB_CORRUPTION); +} + +/** Copy a block of index entries. +@param[in] index index being created +@param[in] file input file +@param[in,out] block 3 buffers +@param[in,out] foffs0 input file offset +@param[in,out] of output file +@param[in,out] stage performance schema accounting object, used by +ALTER TABLE. If not NULL stage->inc() will be called for each record +processed. +@param[in,out] crypt_block encryption buffer +@param[in] space tablespace ID for encryption +@return TRUE on success, FALSE on failure */ +static MY_ATTRIBUTE((warn_unused_result)) +ibool +row_merge_blocks_copy( + const dict_index_t* index, + const merge_file_t* file, + row_merge_block_t* block, + ulint* foffs0, + merge_file_t* of, + ut_stage_alter_t* stage MY_ATTRIBUTE((unused)), + row_merge_block_t* crypt_block, + ulint space) +{ + mem_heap_t* heap; /*!< memory heap for offsets0, offsets1 */ + + mrec_buf_t* buf; /*!< buffer for handling + split mrec in block[] */ + const byte* b0; /*!< pointer to block[0] */ + byte* b2; /*!< pointer to block[2 * srv_sort_buf_size] */ + const mrec_t* mrec0; /*!< merge rec, points to block[0] */ + rec_offs* offsets0;/* offsets of mrec0 */ + rec_offs* offsets1;/* dummy offsets */ + + DBUG_ENTER("row_merge_blocks_copy"); + DBUG_LOG("ib_merge_sort", + "fd=" << file->fd << ',' << foffs0 + << " to fd=" << of->fd << ',' << of->offset); + + heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1); + + /* Write a record and read the next record. Split the output + file in two halves, which can be merged on the following pass. */ + + if (!row_merge_read(file->fd, *foffs0, &block[0], + crypt_block ? &crypt_block[0] : NULL, + space)) { +corrupt: + mem_heap_free(heap); + DBUG_RETURN(FALSE); + } + + b0 = &block[0]; + + b2 = &block[2 * srv_sort_buf_size]; + + b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, + file->fd, foffs0, &mrec0, offsets0, + crypt_block ? 
&crypt_block[0] : NULL,
+				space);
+
+	if (UNIV_UNLIKELY(!b0 && mrec0)) {
+
+		goto corrupt;
+	}
+
+	if (mrec0) {
+		/* append all mrec0 to output */
+		for (;;) {
+			ROW_MERGE_WRITE_GET_NEXT(0, index, goto done0);
+		}
+	}
+done0:
+
+	/* The file offset points to the beginning of the last page
+	that has been read. Update it to point to the next block. */
+	(*foffs0)++;
+
+	mem_heap_free(heap);
+
+	DBUG_RETURN(row_merge_write_eof(
+			    &block[2 * srv_sort_buf_size],
+			    b2, of->fd, &of->offset,
+			    crypt_block
+			    ? &crypt_block[2 * srv_sort_buf_size]
+			    : NULL, space)
+		    != NULL);
+}
+
+/** Merge disk files.
+@param[in]	trx	transaction
+@param[in]	dup	descriptor of index being created
+@param[in,out]	file	file containing index entries
+@param[in,out]	block	3 buffers
+@param[in,out]	tmpfd	temporary file handle
+@param[in,out]	num_run	number of runs that remain to be merged
+@param[in,out]	run_offset	array that contains the first offset number
+for each merge run
+@param[in,out]	stage	performance schema accounting object, used by
+ALTER TABLE. If not NULL stage->inc() will be called for each record
+processed.
+@param[in,out]	crypt_block	encryption buffer
+@param[in]	space	tablespace ID for encryption
+@return DB_SUCCESS or error code */
+static
+dberr_t
+row_merge(
+	trx_t*			trx,
+	const row_merge_dup_t*	dup,
+	merge_file_t*		file,
+	row_merge_block_t*	block,
+	pfs_os_file_t*		tmpfd,
+	ulint*			num_run,
+	ulint*			run_offset,
+	ut_stage_alter_t*	stage,
+	row_merge_block_t*	crypt_block,
+	ulint			space)
+{
+	ulint		foffs0;	/*!< first input offset */
+	ulint		foffs1;	/*!< second input offset */
+	dberr_t		error;	/*!< error code */
+	merge_file_t	of;	/*!< output file */
+	const ulint	ihalf	= run_offset[*num_run / 2];
+			/*!< half the input file */
+	ulint		n_run	= 0;
+			/*!< num of runs generated from this merge */
+
+	MEM_CHECK_ADDRESSABLE(&block[0], 3 * srv_sort_buf_size);
+
+	if (crypt_block) {
+		MEM_CHECK_ADDRESSABLE(&crypt_block[0], 3 * srv_sort_buf_size);
+	}
+
+	ut_ad(ihalf < file->offset);
+
+	of.fd = *tmpfd;
+	of.offset = 0;
+	of.n_rec = 0;
+
+#ifdef POSIX_FADV_SEQUENTIAL
+	/* The input file will be read sequentially, starting from the
+	beginning and the middle. On Linux, POSIX_FADV_SEQUENTIAL
+	affects the entire file. Each block will be read exactly once. */
+	posix_fadvise(file->fd, 0, 0,
+		      POSIX_FADV_SEQUENTIAL | POSIX_FADV_NOREUSE);
+#endif /* POSIX_FADV_SEQUENTIAL */
+
+	/* Merge blocks to the output file. */
+	foffs0 = 0;
+	foffs1 = ihalf;
+
+	MEM_UNDEFINED(run_offset, *num_run * sizeof *run_offset);
+
+	for (; foffs0 < ihalf && foffs1 < file->offset; foffs0++, foffs1++) {
+
+		if (trx_is_interrupted(trx)) {
+			return(DB_INTERRUPTED);
+		}
+
+		/* Remember the offset number for this run */
+		run_offset[n_run++] = of.offset;
+
+		error = row_merge_blocks(dup, file, block,
+					 &foffs0, &foffs1, &of, stage,
+					 crypt_block, space);
+
+		if (error != DB_SUCCESS) {
+			return(error);
+		}
+	}
+
+	/* Copy the last blocks, if there are any. 
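+	Each iteration of the loop above consumed one run from each
+	half of the input file. If the halves contain unequal numbers
+	of runs, the leftover runs are copied through unmerged below;
+	a pass thus at most halves the number of runs, which is why
+	row_merge_sort() estimates ceil(log2(num_runs)) passes in
+	total.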
*/ + + while (foffs0 < ihalf) { + + if (UNIV_UNLIKELY(trx_is_interrupted(trx))) { + return(DB_INTERRUPTED); + } + + /* Remember the offset number for this run */ + run_offset[n_run++] = of.offset; + + if (!row_merge_blocks_copy(dup->index, file, block, + &foffs0, &of, stage, + crypt_block, space)) { + return(DB_CORRUPTION); + } + } + + ut_ad(foffs0 == ihalf); + + while (foffs1 < file->offset) { + + if (trx_is_interrupted(trx)) { + return(DB_INTERRUPTED); + } + + /* Remember the offset number for this run */ + run_offset[n_run++] = of.offset; + + if (!row_merge_blocks_copy(dup->index, file, block, + &foffs1, &of, stage, + crypt_block, space)) { + return(DB_CORRUPTION); + } + } + + ut_ad(foffs1 == file->offset); + + if (UNIV_UNLIKELY(of.n_rec != file->n_rec)) { + return(DB_CORRUPTION); + } + + ut_ad(n_run <= *num_run); + + *num_run = n_run; + + /* Each run can contain one or more offsets. As merge goes on, + the number of runs (to merge) will reduce until we have one + single run. So the number of runs will always be smaller than + the number of offsets in file */ + ut_ad((*num_run) <= file->offset); + + /* The number of offsets in output file is always equal or + smaller than input file */ + ut_ad(of.offset <= file->offset); + + /* Swap file descriptors for the next pass. */ + *tmpfd = file->fd; + *file = of; + + MEM_UNDEFINED(&block[0], 3 * srv_sort_buf_size); + + return(DB_SUCCESS); +} + +/** Merge disk files. +@param[in] trx transaction +@param[in] dup descriptor of index being created +@param[in,out] file file containing index entries +@param[in,out] block 3 buffers +@param[in,out] tmpfd temporary file handle +@param[in,out] stage performance schema accounting object, used by +ALTER TABLE. If not NULL, stage->begin_phase_sort() will be called initially +and then stage->inc() will be called for each record processed. +@return DB_SUCCESS or error code */ +dberr_t +row_merge_sort( + trx_t* trx, + const row_merge_dup_t* dup, + merge_file_t* file, + row_merge_block_t* block, + pfs_os_file_t* tmpfd, + const bool update_progress, + /*!< in: update progress + status variable or not */ + const double pct_progress, + /*!< in: total progress percent + until now */ + const double pct_cost, /*!< in: current progress percent */ + row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */ + ulint space, /*!< in: space id */ + ut_stage_alter_t* stage) +{ + const ulint half = file->offset / 2; + ulint num_runs; + ulint* run_offset; + dberr_t error = DB_SUCCESS; + ulint merge_count = 0; + ulint total_merge_sort_count; + double curr_progress = 0; + + DBUG_ENTER("row_merge_sort"); + + /* Record the number of merge runs we need to perform */ + num_runs = file->offset; + + if (stage != NULL) { + stage->begin_phase_sort(log2(double(num_runs))); + } + + /* If num_runs are less than 1, nothing to merge */ + if (num_runs <= 1) { + DBUG_RETURN(error); + } + + total_merge_sort_count = ulint(ceil(log2(double(num_runs)))); + + /* "run_offset" records each run's first offset number */ + run_offset = (ulint*) ut_malloc_nokey(file->offset * sizeof(ulint)); + + /* This tells row_merge() where to start for the first round + of merge. */ + run_offset[half] = half; + + /* The file should always contain at least one byte (the end + of file marker). Thus, it must be at least one block. */ + ut_ad(file->offset > 0); + + /* These thd_progress* calls will crash on sol10-64 when innodb_plugin + is used. MDEV-9356: innodb.innodb_bug53290 fails (crashes) on + sol10-64 in buildbot. 
+ */ +#ifndef __sun__ + /* Progress report only for "normal" indexes. */ + if (dup && !(dup->index->type & DICT_FTS)) { + thd_progress_init(trx->mysql_thd, 1); + } +#endif /* __sun__ */ + + if (global_system_variables.log_warnings > 2) { + sql_print_information("InnoDB: Online DDL : merge-sorting" + " has estimated " ULINTPF " runs", + num_runs); + } + + /* Merge the runs until we have one big run */ + do { + /* Report progress of merge sort to MySQL for + show processlist progress field */ + /* Progress report only for "normal" indexes. */ +#ifndef __sun__ + if (dup && !(dup->index->type & DICT_FTS)) { + thd_progress_report(trx->mysql_thd, file->offset - num_runs, file->offset); + } +#endif /* __sun__ */ + + error = row_merge(trx, dup, file, block, tmpfd, + &num_runs, run_offset, stage, + crypt_block, space); + + if(update_progress) { + merge_count++; + curr_progress = (merge_count >= total_merge_sort_count) ? + pct_cost : + pct_cost * static_cast<double>(merge_count) + / static_cast<double>(total_merge_sort_count); + /* presenting 10.12% as 1012 integer */; + onlineddl_pct_progress = (ulint) ((pct_progress + curr_progress) * 100); + } + + if (error != DB_SUCCESS) { + break; + } + + MEM_CHECK_DEFINED(run_offset, num_runs * sizeof *run_offset); + } while (num_runs > 1); + + ut_free(run_offset); + + /* Progress report only for "normal" indexes. */ +#ifndef __sun__ + if (dup && !(dup->index->type & DICT_FTS)) { + thd_progress_end(trx->mysql_thd); + } +#endif /* __sun__ */ + + DBUG_RETURN(error); +} + +/** Copy the blob from the given blob file and store it +in field data for the tuple +@param tuple tuple to be inserted +@param heap heap to allocate the memory for the blob storage +@param blob_file file to handle blob data */ +static dberr_t row_merge_copy_blob_from_file(dtuple_t *tuple, mem_heap_t *heap, + merge_file_t *blob_file) +{ + for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) + { + dfield_t *field= dtuple_get_nth_field(tuple, i); + const byte *field_data= static_cast<byte*>(dfield_get_data(field)); + ulint field_len= dfield_get_len(field); + if (!dfield_is_ext(field)) + continue; + + ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE); + ut_ad(!dfield_is_null(field)); + + ut_ad(mach_read_from_8(field_data) == 0); + uint64_t offset= mach_read_from_8(field_data + 8); + uint32_t len= mach_read_from_4(field_data + 16); + + byte *data= (byte*) mem_heap_alloc(heap, len); + if (dberr_t err= os_file_read(IORequestRead, blob_file->fd, data, + offset, len, nullptr)) + return err; + dfield_set_data(field, data, len); + } + + return DB_SUCCESS; +} + +/** Copy externally stored columns to the data tuple. +@param[in] mrec record containing BLOB pointers, +or NULL to use tuple instead +@param[in] offsets offsets of mrec +@param[in] zip_size compressed page size in bytes, or 0 +@param[in,out] tuple data tuple +@param[in,out] heap memory heap */ +static +void +row_merge_copy_blobs( + const mrec_t* mrec, + const rec_offs* offsets, + ulint zip_size, + dtuple_t* tuple, + mem_heap_t* heap) +{ + ut_ad(mrec == NULL || rec_offs_any_extern(offsets)); + + for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) { + ulint len; + const void* data; + dfield_t* field = dtuple_get_nth_field(tuple, i); + ulint field_len; + const byte* field_data; + + if (!dfield_is_ext(field)) { + continue; + } + + ut_ad(!dfield_is_null(field)); + + /* During the creation of a PRIMARY KEY, the table is + X-locked, and we skip copying records that have been + marked for deletion. 
Therefore, externally stored
+	columns cannot possibly be freed between the time the
+	BLOB pointers are read (row_merge_read_clustered_index())
+	and dereferenced (below). */
+		if (mrec == NULL) {
+			field_data
+				= static_cast<byte*>(dfield_get_data(field));
+			field_len = dfield_get_len(field);
+
+			ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
+
+			ut_a(memcmp(field_data + field_len
+				    - BTR_EXTERN_FIELD_REF_SIZE,
+				    field_ref_zero,
+				    BTR_EXTERN_FIELD_REF_SIZE));
+
+			data = btr_copy_externally_stored_field(
+				&len, field_data, zip_size, field_len, heap);
+		} else {
+			data = btr_rec_copy_externally_stored_field(
+				mrec, offsets, zip_size, i, &len, heap);
+		}
+
+		/* Because we have locked the table, any records
+		written by incomplete transactions must have been
+		rolled back already. There must not be any incomplete
+		BLOB columns. */
+		ut_a(data);
+
+		dfield_set_data(field, data, len);
+	}
+}
+
+/** Convert a merge record to a typed data tuple. Note that externally
+stored fields are not copied to heap.
+@param[in]	index	index on the table
+@param[out]	dtuple	data tuple of records
+@param[in]	mtuple	merge record */
+static
+void
+row_merge_mtuple_to_dtuple(
+	dict_index_t*	index,
+	dtuple_t*	dtuple,
+	const mtuple_t*	mtuple)
+{
+	ut_ad(!dict_index_is_ibuf(index));
+
+	memcpy(dtuple->fields, mtuple->fields,
+	       dtuple->n_fields * sizeof *mtuple->fields);
+}
+
+/** Read the sorted file containing index data tuples and insert these
+data tuples into the index.
+@param[in]	index		index to be inserted into
+@param[in]	old_table	old table
+@param[in]	fd		file descriptor, or OS_FILE_CLOSED
+@param[in,out]	block		file buffer
+@param[in]	row_buf		row buffer holding all rows, or NULL
+@param[in,out]	btr_bulk	bulk load instance
+@param[in]	table_total_rows total rows of the old table
+@param[in]	pct_progress	total progress percent until now
+@param[in]	pct_cost	current progress percent
+@param[in,out]	crypt_block	buffer for encryption, or NULL
+@param[in]	space		tablespace ID for encryption
+@param[in,out]	stage		performance schema accounting object, used by
+ALTER TABLE. If not NULL, stage->begin_phase_insert() will be called
+initially and then stage->inc() will be called for each record processed.
+@param[in]	blob_file	file from which to read BLOB data, or NULL
+@return DB_SUCCESS or error number */
+static	MY_ATTRIBUTE((warn_unused_result))
+dberr_t
+row_merge_insert_index_tuples(
+	dict_index_t*		index,
+	const dict_table_t*	old_table,
+	const pfs_os_file_t&	fd,
+	row_merge_block_t*	block,
+	const row_merge_buf_t*	row_buf,
+	BtrBulk*		btr_bulk,
+	const ib_uint64_t	table_total_rows,
+	double			pct_progress,
+	double			pct_cost,
+	row_merge_block_t*	crypt_block,
+	ulint			space,
+	ut_stage_alter_t*	stage,
+	merge_file_t*		blob_file)
+{
+	const byte*		b;
+	mem_heap_t*		heap;
+	mem_heap_t*		tuple_heap;
+	dberr_t			error = DB_SUCCESS;
+	ulint			foffs = 0;
+	rec_offs*		offsets;
+	mrec_buf_t*		buf;
+	ulint			n_rows = 0;
+	dtuple_t*		dtuple;
+	ib_uint64_t		inserted_rows = 0;
+	double			curr_progress = 0;
+	dict_index_t*		old_index = NULL;
+	const mrec_t*		mrec = NULL;
+	mtr_t			mtr;
+
+	DBUG_ENTER("row_merge_insert_index_tuples");
+
+	ut_ad(!srv_read_only_mode);
+	ut_ad(!(index->type & DICT_FTS));
+	ut_ad(!dict_index_is_spatial(index));
+
+	if (stage != NULL) {
+		stage->begin_phase_insert();
+	}
+
+	tuple_heap = mem_heap_create(1000);
+
+	{
+		ulint i	= 1 + REC_OFFS_HEADER_SIZE
+			+ dict_index_get_n_fields(index);
+		heap = mem_heap_create(sizeof *buf + i * sizeof *offsets);
+		offsets = static_cast<rec_offs*>(
+			mem_heap_alloc(heap, i * sizeof *offsets));
+		rec_offs_set_n_alloc(offsets, i);
+		rec_offs_set_n_fields(offsets, dict_index_get_n_fields(index));
+	}
+
+	if (row_buf != NULL) {
+		ut_ad(fd == OS_FILE_CLOSED);
+		ut_ad(block == NULL);
+		DBUG_EXECUTE_IF("row_merge_read_failure",
+				error = DB_CORRUPTION;
+				goto err_exit;);
+		buf = NULL;
+		b = NULL;
+		dtuple = dtuple_create(
+			heap, dict_index_get_n_fields(index));
+		dtuple_set_n_fields_cmp(
+			dtuple, dict_index_get_n_unique_in_tree(index));
+	} else {
+		b = block;
+		dtuple = NULL;
+
+		if (!row_merge_read(fd, foffs, block, crypt_block, space)) {
+			error = DB_CORRUPTION;
+			goto err_exit;
+		} else {
+			buf = static_cast<mrec_buf_t*>(
+				mem_heap_alloc(heap, sizeof *buf));
+		}
+	}
+
+	for (;;) {
+
+		if (stage != NULL) {
+			stage->inc();
+		}
+
+		if (row_buf != NULL) {
+			if (n_rows >= row_buf->n_tuples) {
+				break;
+			}
+
+			/* Convert merge tuple record from row buffer 
to data tuple record */ + row_merge_mtuple_to_dtuple( + index, dtuple, &row_buf->tuples[n_rows]); + n_rows++; + /* BLOB pointers must be copied from dtuple */ + mrec = NULL; + } else { + b = row_merge_read_rec(block, buf, b, index, + fd, &foffs, &mrec, offsets, + crypt_block, + space); + + if (UNIV_UNLIKELY(!b)) { + /* End of list, or I/O error */ + if (mrec) { + error = DB_CORRUPTION; + } + break; + } + + dtuple = row_rec_to_index_entry_low( + mrec, index, offsets, tuple_heap); + } + + old_index = dict_table_get_first_index(old_table); + + if (dict_index_is_clust(index) + && dict_index_is_online_ddl(old_index)) { + error = row_log_table_get_error(old_index); + if (error != DB_SUCCESS) { + break; + } + } + + ut_ad(!dtuple_get_n_ext(dtuple) || index->is_primary()); + + if (!dtuple_get_n_ext(dtuple)) { + } else if (blob_file) { + error = row_merge_copy_blob_from_file( + dtuple, tuple_heap, blob_file); + if (error != DB_SUCCESS) { + break; + } + } else { + /* Off-page columns can be fetched safely + when concurrent modifications to the table + are disabled. (Purge can process delete-marked + records, but row_merge_read_clustered_index() + would have skipped them.) + + When concurrent modifications are enabled, + row_merge_read_clustered_index() will + only see rows from transactions that were + committed before the ALTER TABLE started + (REPEATABLE READ). + + Any modifications after the + row_merge_read_clustered_index() scan + will go through row_log_table_apply(). */ + row_merge_copy_blobs( + mrec, offsets, + old_table->space->zip_size(), + dtuple, tuple_heap); + } + + ut_ad(dtuple_validate(dtuple)); + error = btr_bulk->insert(dtuple); + + if (error != DB_SUCCESS) { + goto err_exit; + } + + mem_heap_empty(tuple_heap); + + /* Increment innodb_onlineddl_pct_progress status variable */ + inserted_rows++; + if(inserted_rows % 1000 == 0) { + /* Update progress for each 1000 rows */ + curr_progress = (inserted_rows >= table_total_rows || + table_total_rows <= 0) ? + pct_cost : + pct_cost * static_cast<double>(inserted_rows) + / static_cast<double>(table_total_rows); + + /* presenting 10.12% as 1012 integer */; + onlineddl_pct_progress = (ulint) ((pct_progress + curr_progress) * 100); + } + } + +err_exit: + mem_heap_free(tuple_heap); + mem_heap_free(heap); + + DBUG_RETURN(error); +} + +/*********************************************************************//** +Drop an index that was created before an error occurred. +The data dictionary must have been locked exclusively by the caller, +because the transaction will not be committed. */ +static +void +row_merge_drop_index_dict( +/*======================*/ + trx_t* trx, /*!< in/out: dictionary transaction */ + index_id_t index_id)/*!< in: index identifier */ +{ + static const char sql[] = + "PROCEDURE DROP_INDEX_PROC () IS\n" + "BEGIN\n" + "DELETE FROM SYS_FIELDS WHERE INDEX_ID=:indexid;\n" + "DELETE FROM SYS_INDEXES WHERE ID=:indexid;\n" + "END;\n"; + dberr_t error; + pars_info_t* info; + + ut_ad(!srv_read_only_mode); + ut_ad(trx->dict_operation_lock_mode); + ut_ad(trx->dict_operation); + ut_ad(dict_sys.locked()); + + info = pars_info_create(); + pars_info_add_ull_literal(info, "indexid", index_id); + trx->op_info = "dropping index from dictionary"; + error = que_eval_sql(info, sql, trx); + + if (error != DB_SUCCESS) { + /* Even though we ensure that DDL transactions are WAIT + and DEADLOCK free, we could encounter other errors e.g., + DB_TOO_MANY_CONCURRENT_TRXS. 
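+		We only log the failure here; trx->error_state is
+		reset below so that the dictionary transaction
+		itself remains usable.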
*/ + trx->error_state = DB_SUCCESS; + + ib::error() << "row_merge_drop_index_dict failed with error " + << error; + } + + trx->op_info = ""; +} + +/*********************************************************************//** +Drop indexes that were created before an error occurred. +The data dictionary must have been locked exclusively by the caller, +because the transaction will not be committed. */ +static +void +row_merge_drop_indexes_dict( +/*========================*/ + trx_t* trx, /*!< in/out: dictionary transaction */ + table_id_t table_id)/*!< in: table identifier */ +{ + static const char sql[] = + "PROCEDURE DROP_INDEXES_PROC () IS\n" + "ixid CHAR;\n" + "found INT;\n" + + "DECLARE CURSOR index_cur IS\n" + " SELECT ID FROM SYS_INDEXES\n" + " WHERE TABLE_ID=:tableid AND\n" + " SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n" + "FOR UPDATE;\n" + + "BEGIN\n" + "found := 1;\n" + "OPEN index_cur;\n" + "WHILE found = 1 LOOP\n" + " FETCH index_cur INTO ixid;\n" + " IF (SQL % NOTFOUND) THEN\n" + " found := 0;\n" + " ELSE\n" + " DELETE FROM SYS_FIELDS WHERE INDEX_ID=ixid;\n" + " DELETE FROM SYS_INDEXES WHERE CURRENT OF index_cur;\n" + " END IF;\n" + "END LOOP;\n" + "CLOSE index_cur;\n" + + "END;\n"; + dberr_t error; + pars_info_t* info; + + ut_ad(!srv_read_only_mode); + ut_ad(trx->dict_operation_lock_mode); + ut_ad(trx->dict_operation); + ut_ad(dict_sys.locked()); + + /* It is possible that table->n_ref_count > 1 when + locked=TRUE. In this case, all code that should have an open + handle to the table be waiting for the next statement to execute, + or waiting for a meta-data lock. + + A concurrent purge will be prevented by dict_sys.latch. */ + + info = pars_info_create(); + pars_info_add_ull_literal(info, "tableid", table_id); + trx->op_info = "dropping indexes"; + error = que_eval_sql(info, sql, trx); + + switch (error) { + case DB_SUCCESS: + break; + default: + /* Even though we ensure that DDL transactions are WAIT + and DEADLOCK free, we could encounter other errors e.g., + DB_TOO_MANY_CONCURRENT_TRXS. */ + ib::error() << "row_merge_drop_indexes_dict failed with error " + << error; + /* fall through */ + case DB_TOO_MANY_CONCURRENT_TRXS: + trx->error_state = DB_SUCCESS; + } + + trx->op_info = ""; +} + +/** Drop common internal tables if all fulltext indexes are dropped +@param trx transaction +@param table user table */ +static void row_merge_drop_fulltext_indexes(trx_t *trx, dict_table_t *table) +{ + if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID) || + !table->fts || + !ib_vector_is_empty(table->fts->indexes)) + return; + + for (const dict_index_t *index= dict_table_get_first_index(table); + index; index= dict_table_get_next_index(index)) + if (index->type & DICT_FTS) + return; + + fts_optimize_remove_table(table); + fts_drop_tables(trx, *table); + table->fts->~fts_t(); + table->fts= nullptr; + DICT_TF2_FLAG_UNSET(table, DICT_TF2_FTS); +} + +/** Drop indexes that were created before an error occurred. +The data dictionary must have been locked exclusively by the caller, +because the transaction will not be committed. 
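+Depending on the table's reference count and locks, the indexes are
+either dropped immediately or only marked ONLINE_INDEX_ABORTED so that
+they can be dropped lazily later.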
+@param	trx		dictionary transaction
+@param	table		table containing the indexes
+@param	locked		true if the table is locked;
+			if false, a lazy (deferred) drop may be needed
+@param	alter_trx	ALTER TABLE transaction */
+void
+row_merge_drop_indexes(
+	trx_t*		trx,
+	dict_table_t*	table,
+	bool		locked,
+	const trx_t*	alter_trx)
+{
+	dict_index_t*	index;
+	dict_index_t*	next_index;
+
+	ut_ad(!srv_read_only_mode);
+	ut_ad(trx->dict_operation_lock_mode);
+	ut_ad(trx->dict_operation);
+	ut_ad(dict_sys.locked());
+
+	index = dict_table_get_first_index(table);
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_COMPLETE);
+
+	/* the caller should have an open handle to the table */
+	ut_ad(table->get_ref_count() >= 1);
+
+	/* It is possible that table->n_ref_count > 1 when
+	locked=TRUE. In this case, all code that has an open
+	handle to the table must be waiting for the next statement
+	to execute, or waiting for a meta-data lock.
+
+	A concurrent purge will be prevented by MDL. */
+
+	if (!locked && (table->get_ref_count() > 1
+			|| table->has_lock_other_than(alter_trx))) {
+		while ((index = dict_table_get_next_index(index)) != NULL) {
+			ut_ad(!dict_index_is_clust(index));
+
+			switch (dict_index_get_online_status(index)) {
+			case ONLINE_INDEX_ABORTED_DROPPED:
+				continue;
+			case ONLINE_INDEX_COMPLETE:
+				if (index->is_committed()) {
+					/* Do nothing to already
+					published indexes. */
+				} else if (index->type & DICT_FTS) {
+					/* Drop a completed FULLTEXT
+					index, due to a timeout during
+					MDL upgrade for
+					commit_inplace_alter_table().
+					Because only concurrent reads
+					are allowed (and they are not
+					seeing this index yet) it is
+					safe to drop the index. */
+					dict_index_t*	prev = UT_LIST_GET_PREV(
+						indexes, index);
+					/* At the very least, the
+					clustered index should precede
+					this one. */
+					ut_ad(prev);
+					ut_a(table->fts);
+					fts_drop_index(table, index, trx);
+					row_merge_drop_index_dict(
+						trx, index->id);
+					/* We can remove a DICT_FTS
+					index from the cache, because
+					we do not allow ADD FULLTEXT INDEX
+					with LOCK=NONE. If we allowed that,
+					we should exclude FTS entries from
+					prebuilt->ins_node->entry_list
+					in ins_node_create_entry_list(). */
+#ifdef BTR_CUR_HASH_ADAPT
+					ut_ad(!index->search_info->ref_count);
+#endif /* BTR_CUR_HASH_ADAPT */
+					dict_index_remove_from_cache(
+						table, index);
+					index = prev;
+				} else {
+					index->lock.x_lock(SRW_LOCK_CALL);
+					dict_index_set_online_status(
+						index, ONLINE_INDEX_ABORTED);
+					index->type |= DICT_CORRUPT;
+					table->drop_aborted = TRUE;
+					goto drop_aborted;
+				}
+				continue;
+			case ONLINE_INDEX_CREATION:
+				index->lock.x_lock(SRW_LOCK_CALL);
+				ut_ad(!index->is_committed());
+				row_log_abort_sec(index);
+			drop_aborted:
+				index->lock.x_unlock();
+
+				DEBUG_SYNC_C("merge_drop_index_after_abort");
+				/* covered by dict_sys.latch */
+				MONITOR_INC(MONITOR_BACKGROUND_DROP_INDEX);
+				/* fall through */
+			case ONLINE_INDEX_ABORTED:
+				/* Drop the index tree from the
+				data dictionary and free it from
+				the tablespace, but keep the object
+				in the data dictionary cache. */
+				row_merge_drop_index_dict(trx, index->id);
+				index->lock.x_lock(SRW_LOCK_CALL);
+				dict_index_set_online_status(
+					index, ONLINE_INDEX_ABORTED_DROPPED);
+				index->lock.x_unlock();
+				table->drop_aborted = TRUE;
+				continue;
+			}
+			ut_error;
+		}
+
+		row_merge_drop_fulltext_indexes(trx, table);
+		return;
+	}
+
+	row_merge_drop_indexes_dict(trx, table->id);
+
+	/* Invalidate all row_prebuilt_t::ins_graph that are referring
+	to this table. 
That is, force row_get_prebuilt_insert_row() to + rebuild prebuilt->ins_node->entry_list). */ + if (table->def_trx_id < trx->id) { + table->def_trx_id = trx->id; + } else { + ut_ad(table->def_trx_id == trx->id || table->name.part()); + } + + next_index = dict_table_get_next_index(index); + + while ((index = next_index) != NULL) { + /* read the next pointer before freeing the index */ + next_index = dict_table_get_next_index(index); + + ut_ad(!dict_index_is_clust(index)); + + if (!index->is_committed()) { + /* If it is FTS index, drop from table->fts + and also drop its auxiliary tables */ + if (index->type & DICT_FTS) { + ut_a(table->fts); + fts_drop_index(table, index, trx); + } + + switch (dict_index_get_online_status(index)) { + case ONLINE_INDEX_CREATION: + /* This state should only be possible + when prepare_inplace_alter_table() fails + after invoking row_merge_create_index(). + In inplace_alter_table(), + row_merge_build_indexes() + should never leave the index in this state. + It would invoke row_log_abort_sec() on + failure. */ + case ONLINE_INDEX_COMPLETE: + /* In these cases, we are able to drop + the index straight. The DROP INDEX was + never deferred. */ + break; + case ONLINE_INDEX_ABORTED: + case ONLINE_INDEX_ABORTED_DROPPED: + /* covered by dict_sys.latch */ + MONITOR_DEC(MONITOR_BACKGROUND_DROP_INDEX); + } + + dict_index_remove_from_cache(table, index); + } + } + + row_merge_drop_fulltext_indexes(trx, table); + table->drop_aborted = FALSE; + ut_d(dict_table_check_for_dup_indexes(table, CHECK_ALL_COMPLETE)); +} + +/** Drop fulltext indexes */ +static ibool row_merge_drop_fts(void *node, void *trx) +{ + auto s= static_cast<sel_node_t*>(node); + + const dfield_t *table_id= que_node_get_val(s->select_list); + ut_ad(table_id->type.mtype == DATA_BINARY); + node= que_node_get_next(s->select_list); + ut_ad(!que_node_get_next(node)); + const dfield_t *index_id= que_node_get_val(node); + ut_ad(index_id->type.mtype == DATA_BINARY); + + static const char sql[]= + "PROCEDURE DROP_TABLES_PROC () IS\n" + "tid CHAR;\n" + "iid CHAR;\n" + + "DECLARE CURSOR cur_tab IS\n" + "SELECT ID FROM SYS_TABLES\n" + "WHERE INSTR(NAME,:name)+45=LENGTH(NAME)" + " AND INSTR('123456',SUBSTR(NAME,LENGTH(NAME)-1,1))>0" + " FOR UPDATE;\n" + + "DECLARE CURSOR cur_idx IS\n" + "SELECT ID FROM SYS_INDEXES\n" + "WHERE TABLE_ID = tid FOR UPDATE;\n" + + "BEGIN\n" + "OPEN cur_tab;\n" + "WHILE 1 = 1 LOOP\n" + " FETCH cur_tab INTO tid;\n" + " IF (SQL % NOTFOUND) THEN EXIT; END IF;\n" + " OPEN cur_idx;\n" + " WHILE 1 = 1 LOOP\n" + " FETCH cur_idx INTO iid;\n" + " IF (SQL % NOTFOUND) THEN EXIT; END IF;\n" + " DELETE FROM SYS_FIELDS WHERE INDEX_ID=iid;\n" + " DELETE FROM SYS_INDEXES WHERE CURRENT OF cur_idx;\n" + " END LOOP;\n" + " CLOSE cur_idx;\n" + " DELETE FROM SYS_COLUMNS WHERE TABLE_ID=tid;\n" + " DELETE FROM SYS_TABLES WHERE CURRENT OF cur_tab;\n" + "END LOOP;\n" + "CLOSE cur_tab;\n" + "END;\n"; + + if (table_id->len == 8 && index_id->len == 8) + { + char buf[sizeof "/FTS_0000000000000000_0000000000000000_INDEX_"]; + snprintf(buf, sizeof buf, "/FTS_%016llx_%016llx_INDEX_", + static_cast<ulonglong> + (mach_read_from_8(static_cast<const byte*>(table_id->data))), + static_cast<ulonglong> + (mach_read_from_8(static_cast<const byte*>(index_id->data)))); + auto pinfo= pars_info_create(); + pars_info_add_str_literal(pinfo, "name", buf); + que_eval_sql(pinfo, sql, static_cast<trx_t*>(trx)); + } + + return true; +} + +/** During recovery, drop recovered index stubs that were created in +prepare_inplace_alter_table_dict(). 
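+Such stubs are recognizable by the TEMP_INDEX_PREFIX_STR prefix in
+their names; the embedded SQL procedure below deletes them from
+SYS_INDEXES and SYS_FIELDS, invoking the drop_fts callback first to
+remove any orphaned FULLTEXT auxiliary tables.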
+*/
+void row_merge_drop_temp_indexes()
+{
+ static_assert(DICT_FTS == 32, "compatibility");
+
+ static const char sql[] =
+ "PROCEDURE DROP_TEMP_INDEXES_PROC () IS\n"
+ "ixid CHAR;\n"
+ "found INT;\n"
+
+ "DECLARE FUNCTION drop_fts;\n"
+
+ "DECLARE CURSOR fts_cur IS\n"
+ " SELECT TABLE_ID,ID FROM SYS_INDEXES\n"
+ " WHERE TYPE=32"
+ " AND SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n"
+ " FOR UPDATE;\n"
+
+ "DECLARE CURSOR index_cur IS\n"
+ " SELECT ID FROM SYS_INDEXES\n"
+ " WHERE SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n"
+ "FOR UPDATE;\n"
+
+ "BEGIN\n"
+ "found := 1;\n"
+ "OPEN fts_cur;\n"
+ "WHILE found = 1 LOOP\n"
+ " FETCH fts_cur INTO drop_fts();\n"
+ " IF (SQL % NOTFOUND) THEN\n"
+ " found := 0;\n"
+ " END IF;\n"
+ "END LOOP;\n"
+ "CLOSE fts_cur;\n"
+
+ "found := 1;\n"
+ "OPEN index_cur;\n"
+ "WHILE found = 1 LOOP\n"
+ " FETCH index_cur INTO ixid;\n"
+ " IF (SQL % NOTFOUND) THEN\n"
+ " found := 0;\n"
+ " ELSE\n"
+ " DELETE FROM SYS_FIELDS WHERE INDEX_ID=ixid;\n"
+ " DELETE FROM SYS_INDEXES WHERE CURRENT OF index_cur;\n"
+ " END IF;\n"
+ "END LOOP;\n"
+ "CLOSE index_cur;\n"
+ "END;\n";
+
+ /* Load the table definitions that contain partially defined
+ indexes, so that the data dictionary information can be checked
+ when accessing the tablename.ibd files. */
+ trx_t* trx = trx_create();
+ trx_start_for_ddl(trx);
+ trx->op_info = "dropping partially created indexes";
+ dberr_t error = lock_sys_tables(trx);
+
+ row_mysql_lock_data_dictionary(trx);
+ /* Ensure that this transaction will be rolled back and locks
+ will be released, if the server gets killed before the commit
+ gets written to the redo log. */
+ trx->dict_operation = true;
+
+ trx->op_info = "dropping indexes";
+
+ pars_info_t* pinfo = pars_info_create();
+ pars_info_bind_function(pinfo, "drop_fts", row_merge_drop_fts, trx);
+ if (error == DB_SUCCESS) {
+ error = que_eval_sql(pinfo, sql, trx);
+ }
+
+ if (error) {
+ /* Even though we ensure that DDL transactions are WAIT
+ and DEADLOCK free, we could encounter other errors e.g.,
+ DB_TOO_MANY_CONCURRENT_TRXS. */
+ trx->error_state = DB_SUCCESS;
+
+ ib::error() << "row_merge_drop_temp_indexes(): " << error;
+ }
+
+ trx_commit_for_mysql(trx);
+ row_mysql_unlock_data_dictionary(trx);
+ trx->free();
+}
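+
+/* The "FETCH fts_cur INTO drop_fts()" construct above invokes the C++
+callback that was bound with pars_info_bind_function(), once per fetched
+row. A minimal sketch of this pattern, with hypothetical names modeled on
+the usage in this file:
+
+ static ibool my_callback(void *node, void *arg)
+ {
+ sel_node_t *sel= static_cast<sel_node_t*>(node);
+ // inspect que_node_get_val(sel->select_list) for each column
+ return true; // keep fetching
+ }
+
+ pars_info_t *pinfo= pars_info_create();
+ pars_info_bind_function(pinfo, "my_func", my_callback, arg);
+ que_eval_sql(pinfo,
+ "PROCEDURE P () IS\n"
+ "DECLARE FUNCTION my_func;\n"
+ "DECLARE CURSOR c IS SELECT ID FROM SYS_TABLES;\n"
+ "BEGIN\n"
+ "OPEN c;\n"
+ "WHILE 1 = 1 LOOP\n"
+ " FETCH c INTO my_func();\n"
+ " IF (SQL % NOTFOUND) THEN EXIT; END IF;\n"
+ "END LOOP;\n"
+ "CLOSE c;\n"
+ "END;\n", trx);
+*/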
+
+
+/** Create temporary merge files in the given parameter path, and if
+UNIV_PFS_IO is defined, register the file descriptor with the Performance
+Schema.
+@param[in] path location for creating temporary merge files, or NULL
+@return File descriptor */
+pfs_os_file_t
+row_merge_file_create_low(
+ const char* path)
+{
+ if (!path) {
+ path = mysql_tmpdir;
+ }
+#ifdef UNIV_PFS_IO
+ /* This temporary file open does not go through the normal
+ file APIs; add instrumentation so that it is registered
+ with the performance schema */
+ struct PSI_file_locker* locker;
+ PSI_file_locker_state state;
+ static const char label[] = "/Innodb Merge Temp File";
+ char* name = static_cast<char*>(
+ ut_malloc_nokey(strlen(path) + sizeof label));
+ strcpy(name, path);
+ strcat(name, label);
+
+ register_pfs_file_open_begin(
+ &state, locker, innodb_temp_file_key,
+ PSI_FILE_CREATE, path ? name : label, __FILE__, __LINE__);
+
+#endif
+ DBUG_ASSERT(strlen(path) + 2 <= FN_REFLEN);
+ char filename[FN_REFLEN];
+ File f = create_temp_file(filename, path, "ib",
+ O_BINARY | O_SEQUENTIAL,
+ MYF(MY_WME | MY_TEMPORARY));
+ pfs_os_file_t fd = IF_WIN((os_file_t)my_get_osfhandle(f), f);
+
+#ifdef UNIV_PFS_IO
+ register_pfs_file_open_end(locker, fd,
+ (fd == OS_FILE_CLOSED)?NULL:&fd);
+ ut_free(name);
+#endif
+
+ if (fd == OS_FILE_CLOSED) {
+ ib::error() << "Cannot create temporary merge file";
+ }
+ return(fd);
+}
+
+
+/** Create a merge file in the given location.
+@param[out] merge_file merge file structure
+@param[in] path location for creating temporary file, or NULL
+@return file descriptor, or OS_FILE_CLOSED on error */
+pfs_os_file_t
+row_merge_file_create(
+ merge_file_t* merge_file,
+ const char* path)
+{
+ merge_file->fd = row_merge_file_create_low(path);
+ merge_file->offset = 0;
+ merge_file->n_rec = 0;
+
+ if (merge_file->fd != OS_FILE_CLOSED) {
+ if (srv_disable_sort_file_cache) {
+ os_file_set_nocache(merge_file->fd,
+ "row0merge.cc", "sort");
+ }
+ }
+ return(merge_file->fd);
+}
+
+/*********************************************************************//**
+Destroy a merge file, and deregister it from the Performance Schema
+if UNIV_PFS_IO is defined. */
+void
+row_merge_file_destroy_low(
+/*=======================*/
+ const pfs_os_file_t& fd) /*!< in: merge file descriptor */
+{
+ if (fd != OS_FILE_CLOSED) {
+ int res = mysql_file_close(IF_WIN(my_win_handle2File((os_file_t)fd), fd),
+ MYF(MY_WME));
+ ut_a(res != -1);
+ }
+}
+/*********************************************************************//**
+Destroy a merge file. */
+void
+row_merge_file_destroy(
+/*===================*/
+ merge_file_t* merge_file) /*!< in/out: merge file structure */
+{
+ ut_ad(!srv_read_only_mode);
+
+ if (merge_file->fd != OS_FILE_CLOSED) {
+ row_merge_file_destroy_low(merge_file->fd);
+ merge_file->fd = OS_FILE_CLOSED;
+ }
+}
+
+/*********************************************************************//**
+Rename an index in the dictionary that was created. The data
+dictionary must have been locked exclusively by the caller, because
+the transaction will not be committed.
+@return DB_SUCCESS if all OK */
+dberr_t
+row_merge_rename_index_to_add(
+/*==========================*/
+ trx_t* trx, /*!< in/out: transaction */
+ table_id_t table_id, /*!< in: table identifier */
+ index_id_t index_id) /*!< in: index identifier */
+{
+ dberr_t err = DB_SUCCESS;
+ pars_info_t* info = pars_info_create();
+
+ /* We use the private SQL parser of Innobase to generate the
+ query graphs needed in renaming indexes. */
+
+ static const char rename_index[] =
+ "PROCEDURE RENAME_INDEX_PROC () IS\n"
+ "BEGIN\n"
+ "UPDATE SYS_INDEXES SET NAME=SUBSTR(NAME,1,LENGTH(NAME)-1)\n"
+ "WHERE TABLE_ID = :tableid AND ID = :indexid;\n"
+ "END;\n";
+
+ ut_ad(trx->dict_operation_lock_mode);
+ ut_ad(trx->dict_operation);
+
+ trx->op_info = "renaming index to add";
+
+ pars_info_add_ull_literal(info, "tableid", table_id);
+ pars_info_add_ull_literal(info, "indexid", index_id);
+
+ err = que_eval_sql(info, rename_index, trx);
+
+ if (err != DB_SUCCESS) {
+ /* Even though we ensure that DDL transactions are WAIT
+ and DEADLOCK free, we could encounter other errors e.g.,
+ DB_TOO_MANY_CONCURRENT_TRXS. */
+ trx->error_state = DB_SUCCESS;
+
+ ib::error() << "row_merge_rename_index_to_add failed with"
+ " error " << err;
+ }
+
+ trx->op_info = "";
+
+ return(err);
+}
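+
+/* Note on the UPDATE above: string positions in the embedded InnoDB SQL
+parser are 0-based (compare SUBSTR(NAME,0,1) in row_merge_drop_temp_indexes()
+earlier in this file), so SUBSTR(NAME,1,LENGTH(NAME)-1) drops the first name
+byte, i.e. the TEMP_INDEX_PREFIX marker. Illustrative effect on a
+hypothetical index name:
+
+ SYS_INDEXES.NAME before: "\377idx1" (uncommitted, hidden from users)
+ SYS_INDEXES.NAME after: "idx1" (committed, visible name)
+*/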
+
+/** Create the index and load it into the dictionary.
+@param[in,out] table the index is on this table
+@param[in] index_def the index definition
+@param[in] add_v new virtual columns added along with add
+ index call
+@return index, or NULL on error */
+dict_index_t*
+row_merge_create_index(
+ dict_table_t* table,
+ const index_def_t* index_def,
+ const dict_add_v_col_t* add_v)
+{
+ dict_index_t* index;
+ ulint n_fields = index_def->n_fields;
+ ulint i;
+ ulint n_add_vcol = 0;
+
+ DBUG_ENTER("row_merge_create_index");
+
+ ut_ad(!srv_read_only_mode);
+
+ /* Create the index prototype, using the passed-in definition;
+ this is not a persistent operation. We pass 0 as the space id, and
+ determine at a lower level the space id where to store the table. */
+
+ index = dict_mem_index_create(table, index_def->name,
+ index_def->ind_type, n_fields);
+ index->set_committed(index_def->rebuild);
+
+ for (i = 0; i < n_fields; i++) {
+ const char* name;
+ index_field_t* ifield = &index_def->fields[i];
+
+ if (ifield->is_v_col) {
+ if (ifield->col_no >= table->n_v_def) {
+ ut_ad(ifield->col_no < table->n_v_def
+ + add_v->n_v_col);
+ ut_ad(ifield->col_no >= table->n_v_def);
+ name = add_v->v_col_name[
+ ifield->col_no - table->n_v_def];
+ n_add_vcol++;
+ } else {
+ name = dict_table_get_v_col_name(
+ table, ifield->col_no);
+ }
+ } else {
+ name = dict_table_get_col_name(table, ifield->col_no);
+ }
+
+ dict_mem_index_add_field(index, name, ifield->prefix_len,
+ ifield->descending);
+ }
+
+ if (n_add_vcol) {
+ index->assign_new_v_col(n_add_vcol);
+ }
+
+ DBUG_RETURN(index);
+}
+
+/*********************************************************************//**
+Check if a transaction can use an index. */
+bool
+row_merge_is_index_usable(
+/*======================*/
+ const trx_t* trx, /*!< in: transaction */
+ const dict_index_t* index) /*!< in: index to check */
+{
+ if (!index->is_primary()
+ && dict_index_is_online_ddl(index)) {
+ /* Indexes that are being created are not usable. */
+ return(false);
+ }
+
+ return(!index->is_corrupted()
+ && (index->table->is_temporary() || index->table->no_rollback()
+ || index->trx_id == 0
+ || !trx->read_view.is_open()
+ || trx->read_view.changes_visible(index->trx_id)));
+}
+
+/** Build indexes on a table by reading a clustered index, creating a temporary
+file containing index entries, merge sorting these index entries and inserting
+the sorted index entries into the indexes.
+@param[in] trx transaction
+@param[in] old_table table where rows are read from
+@param[in] new_table table where indexes are created; identical to
+old_table unless creating a PRIMARY KEY
+@param[in] online true if creating indexes online
+@param[in] indexes indexes to be created
+@param[in] key_numbers MySQL key numbers
+@param[in] n_indexes size of indexes[]
+@param[in,out] table MySQL table, for reporting erroneous key value
+if applicable
+@param[in] defaults default values of added, changed columns, or NULL
+@param[in] col_map mapping of old column numbers to new ones, or
+NULL if old_table == new_table
+@param[in] add_autoinc number of added AUTO_INCREMENT columns, or
+ULINT_UNDEFINED if none is added
+@param[in,out] sequence autoinc sequence
+@param[in] skip_pk_sort whether the new PRIMARY KEY will follow
+existing order
+@param[in,out] stage performance schema accounting object, used by
+ALTER TABLE. stage->begin_phase_read_pk() will be called at the beginning of
+this function and it will be passed to other functions for further accounting.
+@param[in] add_v new virtual columns added along with indexes +@param[in] eval_table mysql table used to evaluate virtual column + value, see innobase_get_computed_value(). +@param[in] allow_not_null allow the conversion from null to not-null +@param[in] col_collate columns whose collations changed, or nullptr +@return DB_SUCCESS or error code */ +dberr_t +row_merge_build_indexes( + trx_t* trx, + dict_table_t* old_table, + dict_table_t* new_table, + bool online, + dict_index_t** indexes, + const ulint* key_numbers, + ulint n_indexes, + struct TABLE* table, + const dtuple_t* defaults, + const ulint* col_map, + ulint add_autoinc, + ib_sequence_t& sequence, + bool skip_pk_sort, + ut_stage_alter_t* stage, + const dict_add_v_col_t* add_v, + struct TABLE* eval_table, + bool allow_not_null, + const col_collations* col_collate) +{ + merge_file_t* merge_files; + row_merge_block_t* block; + ut_new_pfx_t block_pfx; + size_t block_size; + ut_new_pfx_t crypt_pfx; + row_merge_block_t* crypt_block = NULL; + ulint i; + ulint j; + dberr_t error; + pfs_os_file_t tmpfd = OS_FILE_CLOSED; + dict_index_t* fts_sort_idx = NULL; + fts_psort_t* psort_info = NULL; + fts_psort_t* merge_info = NULL; + bool fts_psort_initiated = false; + + double total_static_cost = 0; + double total_dynamic_cost = 0; + ulint total_index_blocks = 0; + double pct_cost=0; + double pct_progress=0; + + DBUG_ENTER("row_merge_build_indexes"); + + ut_ad(!srv_read_only_mode); + ut_ad((old_table == new_table) == !col_map); + ut_ad(!defaults || col_map); + + stage->begin_phase_read_pk(skip_pk_sort && new_table != old_table + ? n_indexes - 1 + : n_indexes); + + /* Allocate memory for merge file data structure and initialize + fields */ + + ut_allocator<row_merge_block_t> alloc(mem_key_row_merge_sort); + + /* This will allocate "3 * srv_sort_buf_size" elements of type + row_merge_block_t. The latter is defined as byte. 
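+ During the merge passes, this allocation is treated as three
+ equal buffers: two input blocks holding the sorted runs being
+ merged, and one output block. A hedged sketch of the layout
+ (the names are illustrative, not variables of this function):
+
+ row_merge_block_t* in1 = &block[0];
+ row_merge_block_t* in2 = &block[srv_sort_buf_size];
+ row_merge_block_t* out = &block[2 * srv_sort_buf_size];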
+ */
+ block_size = 3 * srv_sort_buf_size;
+ block = alloc.allocate_large(block_size, &block_pfx);
+
+ if (block == NULL) {
+ DBUG_RETURN(DB_OUT_OF_MEMORY);
+ }
+
+ crypt_pfx.m_size = 0; /* silence bogus -Wmaybe-uninitialized */
+ TRASH_ALLOC(&crypt_pfx, sizeof crypt_pfx);
+
+ if (srv_encrypt_log) {
+ crypt_block = static_cast<row_merge_block_t*>(
+ alloc.allocate_large(block_size,
+ &crypt_pfx));
+
+ if (crypt_block == NULL) {
+ DBUG_RETURN(DB_OUT_OF_MEMORY);
+ }
+ }
+
+ trx_start_if_not_started_xa(trx, true);
+ ulint n_merge_files = 0;
+
+ for (ulint i = 0; i < n_indexes; i++)
+ {
+ if (!dict_index_is_spatial(indexes[i])) {
+ n_merge_files++;
+ }
+ }
+
+ merge_files = static_cast<merge_file_t*>(
+ ut_malloc_nokey(n_merge_files * sizeof *merge_files));
+
+ /* Initialize all the merge file descriptors, so that we
+ don't call row_merge_file_destroy() on uninitialized
+ merge file descriptors */
+
+ for (i = 0; i < n_merge_files; i++) {
+ merge_files[i].fd = OS_FILE_CLOSED;
+ merge_files[i].offset = 0;
+ merge_files[i].n_rec = 0;
+ }
+
+ total_static_cost = COST_BUILD_INDEX_STATIC
+ * static_cast<double>(n_indexes) + COST_READ_CLUSTERED_INDEX;
+ total_dynamic_cost = COST_BUILD_INDEX_DYNAMIC
+ * static_cast<double>(n_indexes);
+ for (i = 0; i < n_indexes; i++) {
+ if (indexes[i]->type & DICT_FTS) {
+ ibool opt_doc_id_size = FALSE;
+
+ /* To build an FTS index, we need to extract each
+ document's words, the Doc ID, and the word
+ positions, so we build an "fts sort index"
+ on these three 'fields' */
+ fts_sort_idx = row_merge_create_fts_sort_index(
+ indexes[i], old_table, &opt_doc_id_size);
+
+ row_merge_dup_t* dup
+ = static_cast<row_merge_dup_t*>(
+ ut_malloc_nokey(sizeof *dup));
+ dup->index = fts_sort_idx;
+ dup->table = table;
+ dup->col_map = col_map;
+ dup->n_dup = 0;
+
+ /* This can fail e.g. if temporary files cannot be
+ created */
+ if (!row_fts_psort_info_init(
+ trx, dup, new_table, opt_doc_id_size,
+ old_table->space->zip_size(),
+ &psort_info, &merge_info)) {
+ error = DB_CORRUPTION;
+ goto func_exit;
+ }
+
+ /* We need to ensure that we free the resources
+ allocated */
+ fts_psort_initiated = true;
+ }
+ }
+
+ if (global_system_variables.log_warnings > 2) {
+ sql_print_information("InnoDB: Online DDL : Start reading"
+ " clustered index of the table"
+ " and create temporary files");
+ }
+
+ pct_cost = COST_READ_CLUSTERED_INDEX * 100 / (total_static_cost + total_dynamic_cost);
+
+ /* Do not continue if the table pages cannot be read, e.g.
+ because decryption fails */
+ if (!old_table->is_readable() ||
+ !new_table->is_readable()) {
+ error = DB_DECRYPTION_FAILED;
+ ib_push_warning(trx->mysql_thd, DB_DECRYPTION_FAILED,
+ "Table %s is encrypted but encryption service or"
+ " used key_id is not available. "
+ " Can't continue reading table.",
+ !old_table->is_readable() ?
old_table->name.m_name : + new_table->name.m_name); + goto func_exit; + } + + /* Read clustered index of the table and create files for + secondary index entries for merge sort */ + error = row_merge_read_clustered_index( + trx, table, old_table, new_table, online, indexes, + fts_sort_idx, psort_info, merge_files, key_numbers, + n_indexes, defaults, add_v, col_map, add_autoinc, + sequence, block, skip_pk_sort, &tmpfd, stage, + pct_cost, crypt_block, eval_table, allow_not_null, + col_collate); + + stage->end_phase_read_pk(); + + pct_progress += pct_cost; + + if (global_system_variables.log_warnings > 2) { + sql_print_information("InnoDB: Online DDL : End of reading " + "clustered index of the table" + " and create temporary files"); + } + + for (i = 0; i < n_merge_files; i++) { + total_index_blocks += merge_files[i].offset; + } + + if (error != DB_SUCCESS) { + goto func_exit; + } + + DEBUG_SYNC_C("row_merge_after_scan"); + + /* Now we have files containing index entries ready for + sorting and inserting. */ + + for (ulint k = 0, i = 0; i < n_indexes; i++) { + dict_index_t* sort_idx = indexes[i]; + + if (dict_index_is_spatial(sort_idx)) { + continue; + } + + if (indexes[i]->type & DICT_FTS) { + + sort_idx = fts_sort_idx; + + if (FTS_PLL_MERGE) { + row_fts_start_parallel_merge(merge_info); + for (j = 0; j < FTS_NUM_AUX_INDEX; j++) { + merge_info[j].task->wait(); + delete merge_info[j].task; + } + } else { + /* This cannot report duplicates; an + assertion would fail in that case. */ + error = row_fts_merge_insert( + sort_idx, new_table, + psort_info, 0); + } + +#ifdef FTS_INTERNAL_DIAG_PRINT + DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Insert\n"); +#endif + } else if (merge_files[k].fd != OS_FILE_CLOSED) { + char buf[NAME_LEN + 1]; + row_merge_dup_t dup = { + sort_idx, table, col_map, 0}; + + pct_cost = (COST_BUILD_INDEX_STATIC + + (total_dynamic_cost + * static_cast<double>(merge_files[k].offset) + / static_cast<double>(total_index_blocks))) + / (total_static_cost + total_dynamic_cost) + * PCT_COST_MERGESORT_INDEX * 100; + char* bufend = innobase_convert_name( + buf, sizeof buf, + indexes[i]->name, + strlen(indexes[i]->name), + trx->mysql_thd); + buf[bufend - buf]='\0'; + + if (global_system_variables.log_warnings > 2) { + sql_print_information("InnoDB: Online DDL :" + " Start merge-sorting" + " index %s" + " (" ULINTPF + " / " ULINTPF ")," + " estimated cost :" + " %2.4f", + buf, i + 1, n_indexes, + pct_cost); + } + + error = row_merge_sort( + trx, &dup, &merge_files[k], + block, &tmpfd, true, + pct_progress, pct_cost, + crypt_block, new_table->space_id, + stage); + + pct_progress += pct_cost; + + if (global_system_variables.log_warnings > 2) { + sql_print_information("InnoDB: Online DDL :" + " End of " + " merge-sorting index %s" + " (" ULINTPF + " / " ULINTPF ")", + buf, i + 1, n_indexes); + } + + if (error == DB_SUCCESS) { + BtrBulk btr_bulk(sort_idx, trx); + + pct_cost = (COST_BUILD_INDEX_STATIC + + (total_dynamic_cost + * static_cast<double>( + merge_files[k].offset) + / static_cast<double>( + total_index_blocks))) + / (total_static_cost + + total_dynamic_cost) + * PCT_COST_INSERT_INDEX * 100; + + if (global_system_variables.log_warnings > 2) { + sql_print_information( + "InnoDB: Online DDL : Start " + "building index %s" + " (" ULINTPF + " / " ULINTPF "), estimated " + "cost : %2.4f", buf, i + 1, + n_indexes, pct_cost); + } + + error = row_merge_insert_index_tuples( + sort_idx, old_table, + merge_files[k].fd, block, NULL, + &btr_bulk, + merge_files[k].n_rec, pct_progress, pct_cost, + 
crypt_block, new_table->space_id, + stage); + + error = btr_bulk.finish(error); + + pct_progress += pct_cost; + + if (global_system_variables.log_warnings > 2) { + sql_print_information( + "InnoDB: Online DDL : " + "End of building index %s" + " (" ULINTPF " / " ULINTPF ")", + buf, i + 1, n_indexes); + } + } + } + + /* Close the temporary file to free up space. */ + row_merge_file_destroy(&merge_files[k++]); + + if (indexes[i]->type & DICT_FTS) { + row_fts_psort_info_destroy(psort_info, merge_info); + fts_psort_initiated = false; + } else if (old_table != new_table) { + ut_ad(!sort_idx->online_log); + ut_ad(sort_idx->online_status + == ONLINE_INDEX_COMPLETE); + } + + if (old_table != new_table + || (indexes[i]->type & (DICT_FTS | DICT_SPATIAL)) + || error != DB_SUCCESS || !online) { + /* Do not apply any online log. */ + } else { + if (global_system_variables.log_warnings > 2) { + sql_print_information( + "InnoDB: Online DDL : Applying" + " log to index"); + } + + DEBUG_SYNC_C("row_log_apply_before"); + error = row_log_apply(trx, sort_idx, table, stage); + DEBUG_SYNC_C("row_log_apply_after"); + } + + if (error != DB_SUCCESS) { + trx->error_key_num = key_numbers[i]; + goto func_exit; + } + + if (indexes[i]->type & DICT_FTS + && UNIV_UNLIKELY(fts_enable_diag_print)) { + ib::info() << "Finished building full-text index " + << indexes[i]->name; + } + } + +func_exit: + + DBUG_EXECUTE_IF( + "ib_build_indexes_too_many_concurrent_trxs", + error = DB_TOO_MANY_CONCURRENT_TRXS; + trx->error_state = error;); + + if (fts_psort_initiated) { + /* Clean up FTS psort related resource */ + row_fts_psort_info_destroy(psort_info, merge_info); + fts_psort_initiated = false; + } + + row_merge_file_destroy_low(tmpfd); + + for (i = 0; i < n_merge_files; i++) { + row_merge_file_destroy(&merge_files[i]); + } + + if (fts_sort_idx) { + dict_mem_index_free(fts_sort_idx); + } + + ut_free(merge_files); + + alloc.deallocate_large(block, &block_pfx); + + if (crypt_block) { + alloc.deallocate_large(crypt_block, &crypt_pfx); + } + + DICT_TF2_FLAG_UNSET(new_table, DICT_TF2_FTS_ADD_DOC_ID); + + if (online && old_table == new_table && error != DB_SUCCESS) { + /* On error, flag all online secondary index creation + as aborted. */ + for (i = 0; i < n_indexes; i++) { + ut_ad(!(indexes[i]->type & DICT_FTS)); + ut_ad(!indexes[i]->is_committed()); + ut_ad(!dict_index_is_clust(indexes[i])); + + /* Completed indexes should be dropped as + well, and indexes whose creation was aborted + should be dropped from the persistent + storage. However, at this point we can only + set some flags in the not-yet-published + indexes. These indexes will be dropped later + in row_merge_drop_indexes(), called by + rollback_inplace_alter_table(). 
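+
+ As an illustrative summary of the switch below (a restatement,
+ not additional logic):
+ ONLINE_INDEX_CREATION: abort the row log, mark the index
+ DICT_CORRUPT and set new_table->drop_aborted;
+ ONLINE_INDEX_ABORTED / ONLINE_INDEX_ABORTED_DROPPED: only
+ bump the background-drop counter;
+ ONLINE_INDEX_COMPLETE: nothing to do here.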
*/ + + switch (dict_index_get_online_status(indexes[i])) { + case ONLINE_INDEX_COMPLETE: + break; + case ONLINE_INDEX_CREATION: + indexes[i]->lock.x_lock(SRW_LOCK_CALL); + row_log_abort_sec(indexes[i]); + indexes[i]->type |= DICT_CORRUPT; + indexes[i]->lock.x_unlock(); + new_table->drop_aborted = TRUE; + /* fall through */ + case ONLINE_INDEX_ABORTED_DROPPED: + case ONLINE_INDEX_ABORTED: + MONITOR_ATOMIC_INC( + MONITOR_BACKGROUND_DROP_INDEX); + } + } + + dict_index_t *clust_index= new_table->indexes.start; + clust_index->lock.x_lock(SRW_LOCK_CALL); + ut_ad(!clust_index->online_log || + clust_index->online_log_is_dummy()); + clust_index->online_log= nullptr; + clust_index->lock.x_unlock(); + } + + DBUG_RETURN(error); +} + +dberr_t row_merge_bulk_t::alloc_block() +{ + if (m_block) + return DB_SUCCESS; + m_block= m_alloc.allocate_large_dontdump( + 3 * srv_sort_buf_size, &m_block_pfx); + if (m_block == nullptr) + return DB_OUT_OF_MEMORY; + + m_crypt_pfx.m_size= 0; + TRASH_ALLOC(&m_crypt_pfx, sizeof m_crypt_pfx); + if (srv_encrypt_log) + { + m_crypt_block= static_cast<row_merge_block_t*>( + m_alloc.allocate_large(3 * srv_sort_buf_size, &m_crypt_pfx)); + if (!m_crypt_block) + return DB_OUT_OF_MEMORY; + } + return DB_SUCCESS; +} + +row_merge_bulk_t::row_merge_bulk_t(dict_table_t *table) +{ + ulint n_index= 0; + for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes); + index; index= UT_LIST_GET_NEXT(indexes, index)) + { + if (!index->is_btree()) + continue; + n_index++; + } + + m_merge_buf= static_cast<row_merge_buf_t*>( + ut_zalloc_nokey(n_index * sizeof *m_merge_buf)); + + ulint i= 0; + for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes); + index; index= UT_LIST_GET_NEXT(indexes, index)) + { + if (!index->is_btree()) + continue; + + mem_heap_t *heap= mem_heap_create(100); + row_merge_buf_create_low(&m_merge_buf[i], heap, index); + i++; + } + + m_tmpfd= OS_FILE_CLOSED; + m_blob_file.fd= OS_FILE_CLOSED; + m_blob_file.offset= 0; + m_blob_file.n_rec= 0; +} + +row_merge_bulk_t::~row_merge_bulk_t() +{ + ulint i= 0; + dict_table_t *table= m_merge_buf[0].index->table; + for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes); + index; index= UT_LIST_GET_NEXT(indexes, index)) + { + if (!index->is_btree()) + continue; + row_merge_buf_free(&m_merge_buf[i]); + if (m_merge_files) + row_merge_file_destroy(&m_merge_files[i]); + i++; + } + + row_merge_file_destroy_low(m_tmpfd); + + row_merge_file_destroy(&m_blob_file); + + ut_free(m_merge_buf); + + ut_free(m_merge_files); + + if (m_block) + m_alloc.deallocate_large(m_block, &m_block_pfx); + + if (m_crypt_block) + m_alloc.deallocate_large(m_crypt_block, &m_crypt_pfx); +} + +void row_merge_bulk_t::init_tmp_file() +{ + if (m_merge_files) + return; + + ulint n_index= 0; + dict_table_t *table= m_merge_buf[0].index->table; + for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes); + index; index= UT_LIST_GET_NEXT(indexes, index)) + { + if (!index->is_btree()) + continue; + n_index++; + } + + m_merge_files= static_cast<merge_file_t*>( + ut_malloc_nokey(n_index * sizeof *m_merge_files)); + + for (ulint i= 0; i < n_index; i++) + { + m_merge_files[i].fd= OS_FILE_CLOSED; + m_merge_files[i].offset= 0; + m_merge_files[i].n_rec= 0; + } +} + +void row_merge_bulk_t::clean_bulk_buffer(ulint index_no) +{ + mem_heap_empty(m_merge_buf[index_no].heap); + m_merge_buf[index_no].total_size = m_merge_buf[index_no].n_tuples = 0; +} + +bool row_merge_bulk_t::create_tmp_file(ulint index_no) +{ + return row_merge_file_create_if_needed( + &m_merge_files[index_no], 
&m_tmpfd,
+ m_merge_buf[index_no].n_tuples, NULL);
+}
+
+dberr_t row_merge_bulk_t::write_to_tmp_file(ulint index_no)
+{
+ if (!create_tmp_file(index_no))
+ return DB_OUT_OF_MEMORY;
+ merge_file_t *file= &m_merge_files[index_no];
+ row_merge_buf_t *buf= &m_merge_buf[index_no];
+
+ if (dberr_t err= alloc_block())
+ return err;
+
+ if (dberr_t err= row_merge_buf_write(buf,
+#ifndef DBUG_OFF
+ file,
+#endif
+ m_block,
+ index_no == 0 ? &m_blob_file : nullptr))
+ return err;
+
+ if (!row_merge_write(file->fd, file->offset++,
+ m_block, m_crypt_block,
+ buf->index->table->space->id))
+ return DB_TEMP_FILE_WRITE_FAIL;
+ MEM_UNDEFINED(&m_block[0], srv_sort_buf_size);
+ return DB_SUCCESS;
+}
+
+dberr_t row_merge_bulk_t::bulk_insert_buffered(const dtuple_t &row,
+ const dict_index_t &ind,
+ trx_t *trx)
+{
+ dberr_t err= DB_SUCCESS;
+ ulint i= 0;
+ mem_heap_t *large_tuple_heap= nullptr;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(ind.table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (!index->is_btree())
+ continue;
+
+ if (index != &ind)
+ {
+ i++;
+ continue;
+ }
+ row_merge_buf_t *buf= &m_merge_buf[i];
+add_to_buf:
+ if (row_merge_bulk_buf_add(buf, *ind.table, row))
+ {
+ i++;
+ goto func_exit;
+ }
+
+ if (buf->n_tuples == 0)
+ {
+ /* The tuple data size exceeds srv_sort_buf_size */
+ dtuple_t *big_tuple= row_merge_buf_large_tuple(
+ row, &m_blob_file, &large_tuple_heap);
+ if (row_merge_bulk_buf_add(buf, *ind.table, *big_tuple))
+ {
+ i++;
+ goto func_exit;
+ }
+ }
+
+ if (index->is_unique())
+ {
+ row_merge_dup_t dup{index, nullptr, nullptr, 0};
+ row_merge_buf_sort(buf, &dup);
+ if (dup.n_dup)
+ {
+ trx->error_info= index;
+ err= DB_DUPLICATE_KEY;
+ goto func_exit;
+ }
+ }
+ else
+ row_merge_buf_sort(buf, NULL);
+ init_tmp_file();
+ merge_file_t *file= &m_merge_files[i];
+ file->n_rec+= buf->n_tuples;
+ err= write_to_tmp_file(i);
+ if (err != DB_SUCCESS)
+ {
+ trx->error_info= index;
+ goto func_exit;
+ }
+ clean_bulk_buffer(i);
+ buf= &m_merge_buf[i];
+ goto add_to_buf;
+ }
+
+func_exit:
+ if (large_tuple_heap)
+ mem_heap_free(large_tuple_heap);
+ return err;
+}
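+
+/* A hedged usage sketch of the buffering above (the real callers sit in
+the row-insert path when a transaction performs a bulk insert; the local
+names here are illustrative):
+
+ row_merge_bulk_t *store= ...; // the transaction's bulk store
+ // for every index entry built from an inserted row:
+ err= store->bulk_insert_buffered(*entry, *index, trx);
+
+Rows accumulate in m_merge_buf[] per B-tree index; when a buffer fills up,
+it is sorted (with duplicate detection for unique indexes) and flushed to
+a temporary merge file by write_to_tmp_file(). */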
+
+dberr_t row_merge_bulk_t::write_to_index(ulint index_no, trx_t *trx)
+{
+ dberr_t err= DB_SUCCESS;
+ row_merge_buf_t buf= m_merge_buf[index_no];
+ merge_file_t *file= m_merge_files ?
+ &m_merge_files[index_no] : nullptr;
+ dict_index_t *index= buf.index;
+ dict_table_t *table= index->table;
+ BtrBulk btr_bulk(index, trx);
+ row_merge_dup_t dup = {index, nullptr, nullptr, 0};
+
+ if (buf.n_tuples)
+ {
+ if (dict_index_is_unique(index))
+ {
+ row_merge_buf_sort(&buf, &dup);
+ if (dup.n_dup)
+ {
+ err= DB_DUPLICATE_KEY;
+ goto func_exit;
+ }
+ }
+ else row_merge_buf_sort(&buf, NULL);
+ if (file && file->fd != OS_FILE_CLOSED)
+ {
+ file->n_rec+= buf.n_tuples;
+ err= write_to_tmp_file(index_no);
+ if (err != DB_SUCCESS)
+ goto func_exit;
+ }
+ else
+ {
+ /* All of the data fits in the in-memory merge buffer;
+ insert it directly without a merge sort. */
+ err= row_merge_insert_index_tuples(
+ index, table, OS_FILE_CLOSED, nullptr,
+ &buf, &btr_bulk, 0, 0, 0, nullptr, table->space_id, nullptr,
+ m_blob_file.fd == OS_FILE_CLOSED ? nullptr : &m_blob_file);
+ goto func_exit;
+ }
+ }
+
+ err= row_merge_sort(trx, &dup, file,
+ m_block, &m_tmpfd, true, 0, 0,
+ m_crypt_block, table->space_id, nullptr);
+ if (err != DB_SUCCESS)
+ goto func_exit;
+
+ err= row_merge_insert_index_tuples(
+ index, table, file->fd, m_block, nullptr,
+ &btr_bulk, 0, 0, 0, m_crypt_block, table->space_id,
+ nullptr, &m_blob_file);
+
+func_exit:
+ if (err != DB_SUCCESS)
+ trx->error_info= index;
+ else if (index->is_primary() && table->persistent_autoinc)
+ btr_write_autoinc(index, table->autoinc - 1);
+ err= btr_bulk.finish(err);
+ return err;
+}
+
+dberr_t row_merge_bulk_t::write_to_table(dict_table_t *table, trx_t *trx)
+{
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (!index->is_btree())
+ continue;
+
+ dberr_t err= write_to_index(i, trx);
+ if (err != DB_SUCCESS)
+ return err;
+ i++;
+ }
+
+ return DB_SUCCESS;
+}
+
+dberr_t trx_mod_table_time_t::write_bulk(dict_table_t *table, trx_t *trx)
+{
+ if (!bulk_store)
+ return DB_SUCCESS;
+ dberr_t err= bulk_store->write_to_table(table, trx);
+ delete bulk_store;
+ bulk_store= nullptr;
+ return err;
+}
+
+dberr_t trx_t::bulk_insert_apply_low()
+{
+ ut_ad(bulk_insert);
+ ut_ad(!check_unique_secondary);
+ ut_ad(!check_foreigns);
+ dberr_t err;
+ for (auto& t : mod_tables)
+ if (t.second.is_bulk_insert())
+ if ((err= t.second.write_bulk(t.first, this)) != DB_SUCCESS)
+ goto bulk_rollback;
+ return DB_SUCCESS;
+bulk_rollback:
+ undo_no_t low_limit= UINT64_MAX;
+ for (auto& t : mod_tables)
+ {
+ if (t.second.is_bulk_insert())
+ {
+ if (t.second.get_first() < low_limit)
+ low_limit= t.second.get_first();
+ delete t.second.bulk_store;
+ t.second.bulk_store= nullptr;
+ }
+ }
+ trx_savept_t bulk_save{low_limit};
+ rollback(&bulk_save);
+ return err;
+}
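+
+/* End-to-end sketch of the bulk-insert path above (illustrative; the call
+sites live outside this file):
+
+ 1. The first INSERT into an empty table within a transaction may create
+ a row_merge_bulk_t and store it as bulk_store in trx->mod_tables.
+ 2. Each inserted row is buffered through bulk_insert_buffered().
+ 3. trx_t::bulk_insert_apply_low() later calls write_bulk() for every
+ bulk-insert table, which sorts, merges and bulk-loads each B-tree
+ via write_to_index(); on failure, everything is rolled back to the
+ savepoint preceding the bulk insert. */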