summaryrefslogtreecommitdiffstats
path: root/storage/innobase/row/row0ins.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--storage/innobase/row/row0ins.cc3843
1 files changed, 3843 insertions, 0 deletions
diff --git a/storage/innobase/row/row0ins.cc b/storage/innobase/row/row0ins.cc
new file mode 100644
index 00000000..bdee0ed1
--- /dev/null
+++ b/storage/innobase/row/row0ins.cc
@@ -0,0 +1,3843 @@
+/*****************************************************************************
+
+Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2016, 2023, MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+/**************************************************//**
+@file row/row0ins.cc
+Insert into a table
+
+Created 4/20/1996 Heikki Tuuri
+*******************************************************/
+
+#include "row0ins.h"
+#include "dict0dict.h"
+#include "trx0rec.h"
+#include "trx0undo.h"
+#include "btr0btr.h"
+#include "btr0cur.h"
+#include "mach0data.h"
+#include "ibuf0ibuf.h"
+#include "que0que.h"
+#include "row0upd.h"
+#include "row0sel.h"
+#include "rem0cmp.h"
+#include "lock0lock.h"
+#include "log0log.h"
+#include "eval0eval.h"
+#include "data0data.h"
+#include "buf0lru.h"
+#include "fts0fts.h"
+#include "fts0types.h"
+#ifdef BTR_CUR_HASH_ADAPT
+# include "btr0sea.h"
+#endif
+#ifdef WITH_WSREP
+#include <wsrep.h>
+#include <mysql/service_wsrep.h>
+#include "ha_prototypes.h"
+#endif /* WITH_WSREP */
+
+/*************************************************************************
+IMPORTANT NOTE: Any operation that generates redo MUST check that there
+is enough space in the redo log before starting that operation. This is
+done by calling log_free_check(). The reason for checking the
+availability of the redo log space before the start of the operation is
+that we MUST not hold any synchronization objects when performing the
+check.
+If you make a change in this module make sure that no codepath is
+introduced where a call to log_free_check() is bypassed. */
+
+/** Build one index entry template for each index of node->table and append
+the templates to node->entry_list, in the order the indexes are listed.
+@param node  insert node; its row, table and entry_sys_heap are used */
+static void ins_node_create_entry_list(ins_node_t *node)
+{
+  node->entry_list.reserve(UT_LIST_GET_LEN(node->table->indexes));
+
+  dict_index_t *index= dict_table_get_first_index(node->table);
+
+  while (index)
+  {
+    dtuple_t *entry;
+
+    if (index->online_status >= ONLINE_INDEX_ABORTED)
+    {
+      /* Corrupted or incomplete secondary indexes only get a zero-field
+      tuple here; they will be filtered out in row_ins(). */
+      entry= dtuple_create(node->entry_sys_heap, 0);
+    }
+    else
+    {
+      entry= row_build_index_entry_low(node->row, NULL, index,
+                                       node->entry_sys_heap,
+                                       ROW_BUILD_FOR_INSERT);
+    }
+
+    node->entry_list.push_back(entry);
+    index= dict_table_get_next_index(index);
+  }
+}
+
+/*****************************************************************//**
+Resets node->sys_buf and points the system columns (DB_ROW_ID, DB_TRX_ID,
+DB_ROLL_PTR) of node->row at their slices of that buffer. */
+static
+void
+row_ins_alloc_sys_fields(
+/*=====================*/
+  ins_node_t* node) /*!< in: insert node */
+{
+  dtuple_t* const row = node->row;
+  dict_table_t* const table = node->table;
+
+  ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));
+
+  /* node->sys_buf stores the three hidden system columns back to back:
+  row id, then trx id, then roll pointer. */
+  compile_time_assert(DATA_ROW_ID_LEN
+                      + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN
+                      == sizeof node->sys_buf);
+  memset(node->sys_buf, 0, sizeof node->sys_buf);
+
+  /* Assign DB_ROLL_PTR to 1 << ROLL_PTR_INSERT_FLAG_POS, that is, set
+  the most significant bit of its first byte. */
+  node->sys_buf[DATA_ROW_ID_LEN + DATA_TRX_ID_LEN] = 0x80;
+  ut_ad(!memcmp(&node->sys_buf[DATA_ROW_ID_LEN], reset_trx_id,
+                sizeof reset_trx_id));
+
+  /* 1. Populate row-id */
+  dfield_set_data(
+    dtuple_get_nth_field(
+      row, dict_col_get_no(dict_table_get_sys_col(table, DATA_ROW_ID))),
+    node->sys_buf, DATA_ROW_ID_LEN);
+
+  /* 2. Populate trx id */
+  dfield_set_data(
+    dtuple_get_nth_field(
+      row, dict_col_get_no(dict_table_get_sys_col(table, DATA_TRX_ID))),
+    &node->sys_buf[DATA_ROW_ID_LEN], DATA_TRX_ID_LEN);
+
+  /* 3. Populate roll ptr */
+  dfield_set_data(
+    dtuple_get_nth_field(
+      row, dict_col_get_no(dict_table_get_sys_col(table, DATA_ROLL_PTR))),
+    &node->sys_buf[DATA_ROW_ID_LEN + DATA_TRX_ID_LEN], DATA_ROLL_PTR_LEN);
+}
+
+/*********************************************************************//**
+Installs a new row to insert for an INS_DIRECT node and rebuilds the
+per-index entry templates for it. This function is only used if we have
+constructed the row separately, which is a rare case; this function is
+quite slow. */
+void
+ins_node_set_new_row(
+/*=================*/
+  ins_node_t* node, /*!< in: insert node */
+  dtuple_t* row) /*!< in: new row (or first row) for the node */
+{
+  /* Reset the execution state of the node for the new row. */
+  node->row = row;
+  node->state = INS_NODE_SET_IX_LOCK;
+  node->index = NULL;
+
+  /* The system field buffer is re-initialized below, so the trx id
+  should be written there again. */
+  node->trx_id = 0;
+
+  /* Discard the old entry templates together with the heap memory that
+  ins_node_create_entry_list() allocates them from. */
+  node->entry_list.clear();
+  node->entry = node->entry_list.end();
+  mem_heap_empty(node->entry_sys_heap);
+
+  /* Create templates for index entries */
+  ins_node_create_entry_list(node);
+
+  /* Point the system columns of the row at node->sys_buf */
+  row_ins_alloc_sys_fields(node);
+}
+
+/*******************************************************************//**
+Performs an insert into a secondary index by updating an existing
+delete-marked record in place. Such a record may have been kept in the
+index for consistent reads.
+@return DB_SUCCESS or error code */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+row_ins_sec_index_entry_by_modify(
+/*==============================*/
+  ulint flags, /*!< in: undo logging and locking flags */
+  ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_INSERT_TREE,
+      depending on whether mtr holds just a leaf
+      latch or also a tree latch */
+  btr_cur_t* cursor, /*!< in: B-tree cursor */
+  rec_offs** offsets,/*!< in/out: offsets on cursor->page_cur.rec */
+  mem_heap_t* offsets_heap,
+      /*!< in/out: memory heap that can be emptied */
+  mem_heap_t* heap, /*!< in/out: memory heap */
+  const dtuple_t* entry, /*!< in: index entry to insert */
+  que_thr_t* thr, /*!< in: query thread */
+  mtr_t* mtr) /*!< in: mtr; must be committed before
+      latching any further pages */
+{
+  rec_t* const rec = btr_cur_get_rec(cursor);
+
+  ut_ad(!cursor->index()->is_clust());
+  ut_ad(rec_offs_validate(rec, cursor->index(), *offsets));
+  ut_ad(!entry->info_bits);
+
+  /* entry and rec compare as equal in the collation order, but their
+  binary representation may still differ if they contain char fields.
+  Compute that binary difference. */
+  upd_t* const update = row_upd_build_sec_rec_difference_binary(
+    rec, cursor->index(), *offsets, entry, heap);
+
+  if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))) {
+    /* We should never insert in place of a record that
+    has not been delete-marked. The only exception is when
+    online CREATE INDEX copied the changes that we already
+    made to the clustered index, and completed the
+    secondary index creation before we got here. In this
+    case, the change would already be there. The CREATE
+    INDEX should be in wait_while_table_is_used() at least
+    until this INSERT or UPDATE returns. After that point,
+    set_committed(true) would be invoked in
+    commit_inplace_alter_table(). */
+    ut_a(update->n_fields == 0);
+    ut_ad(!dict_index_is_online_ddl(cursor->index()));
+
+    if (cursor->index()->is_committed()) {
+      return DB_CORRUPTION;
+    }
+
+    return DB_SUCCESS;
+  }
+
+  if (mode != BTR_MODIFY_LEAF) {
+    /* Pessimistic update: the tree structure may change. */
+    ut_ad(mode == BTR_INSERT_TREE);
+
+    if (buf_pool.running_out()) {
+      return(DB_LOCK_TABLE_FULL);
+    }
+
+    big_rec_t* dummy_big_rec;
+
+    const dberr_t tree_err = btr_cur_pessimistic_update(
+      flags | BTR_KEEP_SYS_FLAG, cursor,
+      offsets, &offsets_heap,
+      heap, &dummy_big_rec, update, 0,
+      thr, thr_get_trx(thr)->id, mtr);
+    ut_ad(!dummy_big_rec);
+
+    return(tree_err);
+  }
+
+  /* Try an optimistic updating of the record, keeping changes
+  within the page */
+
+  /* TODO: pass only *offsets */
+  dberr_t err = btr_cur_optimistic_update(
+    flags | BTR_KEEP_SYS_FLAG, cursor,
+    offsets, &offsets_heap, update, 0, thr,
+    thr_get_trx(thr)->id, mtr);
+
+  /* The "does not fit in the page" outcomes are all reported to the
+  caller as DB_FAIL. */
+  if (err == DB_OVERFLOW || err == DB_UNDERFLOW
+      || err == DB_ZIP_OVERFLOW) {
+    err = DB_FAIL;
+  }
+
+  return(err);
+}
+
+/*******************************************************************//**
+Performs an insert into the clustered index by delete-unmarking and
+updating an existing delete-marked record. Such a record may have been
+kept in the index for consistent reads.
+@return DB_SUCCESS, DB_FAIL, or error code */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+row_ins_clust_index_entry_by_modify(
+/*================================*/
+  btr_pcur_t* pcur, /*!< in/out: a persistent cursor pointing
+      to the clust_rec that is being modified. */
+  ulint flags, /*!< in: undo logging and locking flags */
+  ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
+      depending on whether mtr holds just a leaf
+      latch or also a tree latch */
+  rec_offs** offsets,/*!< out: offsets on cursor->page_cur.rec */
+  mem_heap_t** offsets_heap,
+      /*!< in/out: pointer to memory heap that can
+      be emptied, or NULL */
+  mem_heap_t* heap, /*!< in/out: memory heap */
+  const dtuple_t* entry, /*!< in: index entry to insert */
+  que_thr_t* thr, /*!< in: query thread */
+  mtr_t* mtr) /*!< in: mtr; must be committed before
+      latching any further pages */
+{
+  btr_cur_t* const cursor = btr_pcur_get_btr_cur(pcur);
+  ut_ad(cursor->index()->is_clust());
+
+  const rec_t* const rec = btr_cur_get_rec(cursor);
+
+  ut_ad(rec_get_deleted_flag(rec,
+                             cursor->index()->table->not_redundant()));
+  /* In delete-marked records, DB_TRX_ID must
+  always refer to an existing undo log record. */
+  ut_ad(rec_get_trx_id(rec, cursor->index()));
+
+  /* Build an update vector containing all the fields to be modified;
+  NOTE that this vector may NOT contain system columns trx_id or
+  roll_ptr */
+  TABLE* mysql_table = NULL;
+
+  if (thr->prebuilt != NULL) {
+    mysql_table = thr->prebuilt->m_mysql_table;
+    ut_ad(thr->prebuilt->trx == thr_get_trx(thr));
+  }
+
+  dberr_t err = DB_SUCCESS;
+
+  upd_t* const update = row_upd_build_difference_binary(
+    cursor->index(), entry, rec, NULL, true, true,
+    thr_get_trx(thr), heap, mysql_table, &err);
+
+  if (err != DB_SUCCESS) {
+    return(err);
+  }
+
+  if (mode == BTR_MODIFY_TREE) {
+    /* Pessimistic update: the tree structure may change. */
+    if (buf_pool.running_out()) {
+      return DB_LOCK_TABLE_FULL;
+    }
+
+    big_rec_t* big_rec = NULL;
+
+    err = btr_cur_pessimistic_update(
+      flags | BTR_KEEP_POS_FLAG,
+      cursor, offsets, offsets_heap, heap,
+      &big_rec, update, 0, thr, thr_get_trx(thr)->id, mtr);
+
+    if (!big_rec) {
+      return(err);
+    }
+
+    /* Some columns have to be stored externally. */
+    ut_a(err == DB_SUCCESS);
+
+    DEBUG_SYNC_C("before_row_ins_upd_extern");
+    err = btr_store_big_rec_extern_fields(
+      pcur, *offsets, big_rec, mtr,
+      BTR_STORE_INSERT_UPDATE);
+    DEBUG_SYNC_C("after_row_ins_upd_extern");
+    dtuple_big_rec_free(big_rec);
+
+    return(err);
+  }
+
+  ut_ad(mode == BTR_MODIFY_LEAF
+        || mode == BTR_MODIFY_LEAF_ALREADY_LATCHED
+        || mode == BTR_MODIFY_ROOT_AND_LEAF
+        || mode == BTR_MODIFY_ROOT_AND_LEAF_ALREADY_LATCHED);
+
+  /* Try optimistic updating of the record, keeping changes
+  within the page */
+  err = btr_cur_optimistic_update(
+    flags, cursor, offsets, offsets_heap, update, 0, thr,
+    thr_get_trx(thr)->id, mtr);
+
+  /* The "does not fit in the page" outcomes are all reported to the
+  caller as DB_FAIL. */
+  if (err == DB_OVERFLOW || err == DB_UNDERFLOW
+      || err == DB_ZIP_OVERFLOW) {
+    err = DB_FAIL;
+  }
+
+  return(err);
+}
+
+/*********************************************************************//**
+Checks whether, in a cascaded update/delete, some ancestor node of node
+performs an UPDATE (as opposed to a DELETE) on the given table. Only the
+unbroken chain of QUE_NODE_UPDATE parents is examined.
+@return TRUE if an ancestor updates table */
+static
+ibool
+row_ins_cascade_ancestor_updates_table(
+/*===================================*/
+  que_node_t* node, /*!< in: node in a query graph */
+  dict_table_t* table) /*!< in: table */
+{
+  que_node_t* ancestor = que_node_get_parent(node);
+
+  while (que_node_get_type(ancestor) == QUE_NODE_UPDATE) {
+    upd_node_t* const upd = static_cast<upd_node_t*>(ancestor);
+
+    if (!upd->is_delete && upd->table == table) {
+      return(TRUE);
+    }
+
+    ancestor = que_node_get_parent(ancestor);
+  }
+
+  return(FALSE);
+}
+
+/*********************************************************************//**
+Counts the consecutive ancestor nodes of type QUE_NODE_UPDATE (UPDATE or
+DELETE nodes) above a cascaded update/delete node.
+@return number of ancestors */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+ulint
+row_ins_cascade_n_ancestors(
+/*========================*/
+  que_node_t* node) /*!< in: node in a query graph */
+{
+  ulint depth = 0;
+  que_node_t* ancestor = que_node_get_parent(node);
+
+  while (que_node_get_type(ancestor) == QUE_NODE_UPDATE) {
+    ++depth;
+    ancestor = que_node_get_parent(ancestor);
+  }
+
+  return(depth);
+}
+
+/******************************************************************//**
+Calculates the update vector node->cascade_node->update for a child table
+in a cascaded update.
+On any validation failure (see the err_exit label) the n_fields member of
+that update vector is set to ULINT_UNDEFINED instead of a field count.
+@return whether any FULLTEXT INDEX is affected */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+bool
+row_ins_cascade_calc_update_vec(
+/*============================*/
+ upd_node_t* node, /*!< in: update node of the parent
+ table */
+ dict_foreign_t* foreign, /*!< in: foreign key constraint whose
+ type is != 0 */
+ mem_heap_t* heap, /*!< in: memory heap to use as
+ temporary storage */
+ trx_t* trx) /*!< in: update transaction */
+{
+ upd_node_t* cascade = node->cascade_node;
+ dict_table_t* table = foreign->foreign_table;
+ dict_index_t* index = foreign->foreign_index;
+ upd_t* update;
+ dict_table_t* parent_table;
+ dict_index_t* parent_index;
+ upd_t* parent_update;
+ ulint n_fields_updated;
+ ulint parent_field_no;
+ ulint i;
+ ulint j;
+ bool doc_id_updated = false;
+ unsigned doc_id_pos = 0;
+ doc_id_t new_doc_id = FTS_NULL_DOC_ID;
+ ulint prefix_col;
+
+ ut_a(cascade);
+ ut_a(table);
+ ut_a(index);
+
+ /* Calculate the appropriate update vector which will set the fields
+ in the child index record to the same value (possibly padded with
+ spaces if the column is a fixed length CHAR or FIXBINARY column) as
+ the referenced index record will get in the update. */
+
+ parent_table = node->table;
+ ut_a(parent_table == foreign->referenced_table);
+ parent_index = foreign->referenced_index;
+ parent_update = node->update;
+
+ update = cascade->update;
+
+ update->info_bits = 0;
+
+ n_fields_updated = 0;
+
+ bool affects_fulltext = foreign->affects_fulltext();
+
+ /* Remember the position of the FTS Doc ID column, so that an update
+ of it can be detected in the loop below. */
+ if (table->fts) {
+ doc_id_pos = dict_table_get_nth_col_pos(
+ table, table->fts->doc_col, &prefix_col);
+ }
+
+ /* For every column of the foreign key, look for fields of the parent
+ update vector that modify the corresponding parent column. */
+ for (i = 0; i < foreign->n_fields; i++) {
+
+ parent_field_no = dict_table_get_nth_col_pos(
+ parent_table,
+ dict_index_get_nth_col_no(parent_index, i),
+ &prefix_col);
+
+ for (j = 0; j < parent_update->n_fields; j++) {
+ const upd_field_t* parent_ufield
+ = &parent_update->fields[j];
+
+ if (parent_ufield->field_no == parent_field_no) {
+
+ ulint min_size;
+ const dict_col_t* col;
+ ulint ufield_len;
+ upd_field_t* ufield;
+
+ col = dict_index_get_nth_col(index, i);
+
+ /* A field in the parent index record is
+ updated. Let us make the update vector
+ field for the child table. */
+
+ ufield = update->fields + n_fields_updated;
+
+ ufield->field_no = static_cast<uint16_t>(
+ dict_table_get_nth_col_pos(
+ table, dict_col_get_no(col),
+ &prefix_col));
+
+ ufield->orig_len = 0;
+ ufield->exp = NULL;
+
+ /* Start from a copy of the parent's new value and
+ carry over the DATA_VERSIONED bit of the child
+ column's prtype. */
+ ufield->new_val = parent_ufield->new_val;
+ dfield_get_type(&ufield->new_val)->prtype |=
+ col->prtype & DATA_VERSIONED;
+ ufield_len = dfield_get_len(&ufield->new_val);
+
+ /* Clear the "external storage" flag */
+ dfield_set_len(&ufield->new_val, ufield_len);
+
+ /* Do not allow a NOT NULL column to be
+ updated as NULL */
+
+ if (dfield_is_null(&ufield->new_val)
+ && (col->prtype & DATA_NOT_NULL)) {
+ goto err_exit;
+ }
+
+ /* If the new value would not fit in the
+ column, do not allow the update */
+
+ if (!dfield_is_null(&ufield->new_val)
+ && dtype_get_at_most_n_mbchars(
+ col->prtype,
+ col->mbminlen, col->mbmaxlen,
+ col->len,
+ ufield_len,
+ static_cast<char*>(
+ dfield_get_data(
+ &ufield->new_val)))
+ < ufield_len) {
+ goto err_exit;
+ }
+
+ /* If the parent column type has a different
+ length than the child column type, we may
+ need to pad with spaces the new value of the
+ child column */
+
+ min_size = dict_col_get_min_size(col);
+
+ /* Because UNIV_SQL_NULL (the marker
+ of SQL NULL values) exceeds all possible
+ values of min_size, the test below will
+ not hold for SQL NULL columns. */
+
+ if (min_size > ufield_len) {
+
+ byte* pad;
+ ulint pad_len;
+ byte* padded_data;
+ ulint mbminlen;
+
+ padded_data = static_cast<byte*>(
+ mem_heap_alloc(
+ heap, min_size));
+
+ pad = padded_data + ufield_len;
+ pad_len = min_size - ufield_len;
+
+ memcpy(padded_data,
+ dfield_get_data(&ufield
+ ->new_val),
+ ufield_len);
+
+ mbminlen = dict_col_get_mbminlen(col);
+
+ ut_ad(!(ufield_len % mbminlen));
+ ut_ad(!(min_size % mbminlen));
+
+ if (mbminlen == 1
+ && dtype_get_charset_coll(
+ col->prtype)
+ == DATA_MYSQL_BINARY_CHARSET_COLL) {
+ /* Do not pad BINARY columns */
+ goto err_exit;
+ }
+
+ row_mysql_pad_col(mbminlen,
+ pad, pad_len);
+ dfield_set_data(&ufield->new_val,
+ padded_data, min_size);
+ }
+
+ /* If Doc ID is updated, check whether the
+ Doc ID is valid */
+ if (table->fts
+ && ufield->field_no == doc_id_pos) {
+ doc_id_t n_doc_id;
+
+ n_doc_id =
+ table->fts->cache->next_doc_id;
+
+ new_doc_id = fts_read_doc_id(
+ static_cast<const byte*>(
+ dfield_get_data(
+ &ufield->new_val)));
+
+ affects_fulltext = true;
+ doc_id_updated = true;
+
+ if (new_doc_id <= 0) {
+ ib::error() << "FTS Doc ID"
+ " must be larger than"
+ " 0";
+ goto err_exit;
+ }
+
+ if (new_doc_id < n_doc_id) {
+ ib::error() << "FTS Doc ID"
+ " must be larger than "
+ << n_doc_id - 1
+ << " for table "
+ << table->name;
+ goto err_exit;
+ }
+ }
+
+ n_fields_updated++;
+ }
+ }
+ }
+
+ /* A FULLTEXT index is affected: register an FTS_INSERT operation for
+ the row, under either a newly generated or the user-supplied Doc ID. */
+ if (affects_fulltext) {
+ ut_ad(table->fts);
+
+ if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
+ doc_id_t doc_id;
+ doc_id_t* next_doc_id;
+ upd_field_t* ufield;
+
+ next_doc_id = static_cast<doc_id_t*>(mem_heap_alloc(
+ heap, sizeof(doc_id_t)));
+
+ ut_ad(!doc_id_updated);
+ /* Append one more update field that carries the next
+ Doc ID obtained for the table. */
+ ufield = update->fields + n_fields_updated;
+ fts_get_next_doc_id(table, next_doc_id);
+ doc_id = fts_update_doc_id(table, ufield, next_doc_id);
+ n_fields_updated++;
+ fts_trx_add_op(trx, table, doc_id, FTS_INSERT, NULL);
+ } else {
+ if (doc_id_updated) {
+ ut_ad(new_doc_id);
+ fts_trx_add_op(trx, table, new_doc_id,
+ FTS_INSERT, NULL);
+ } else {
+ ib::error() << "FTS Doc ID must be updated"
+ " along with FTS indexed column for"
+ " table " << table->name;
+ /* Every "goto err_exit" above lands here, inside
+ this else branch; control then continues to the
+ common tail below, which stores the marker in
+ update->n_fields. */
+err_exit:
+ n_fields_updated = ULINT_UNDEFINED;
+ }
+ }
+ }
+
+ update->n_fields = n_fields_updated;
+
+ return affects_fulltext;
+}
+
+/*********************************************************************//**
+Stores in the transaction a detailed error message for a foreign key
+error: the child table name followed by the constraint definition. The
+text is composed in srv_misc_tmpfile while srv_misc_tmpfile_mutex is held. */
+static
+void
+row_ins_set_detailed(
+/*=================*/
+  trx_t* trx, /*!< in: transaction */
+  dict_foreign_t* foreign) /*!< in: foreign key constraint */
+{
+  ut_ad(!srv_read_only_mode);
+
+  mysql_mutex_lock(&srv_misc_tmpfile_mutex);
+  rewind(srv_misc_tmpfile);
+
+  if (!os_file_set_eof(srv_misc_tmpfile)) {
+    trx_set_detailed_error(trx, "temp file operation failed");
+  } else {
+    ut_print_name(srv_misc_tmpfile, trx,
+                  foreign->foreign_table_name);
+
+    const std::string fk_str =
+      dict_print_info_on_foreign_key_in_create_format(
+        trx, foreign, FALSE);
+
+    fputs(fk_str.c_str(), srv_misc_tmpfile);
+    trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
+  }
+
+  mysql_mutex_unlock(&srv_misc_tmpfile_mutex);
+}
+
+/*********************************************************************//**
+Acquires dict_foreign_err_mutex, rewinds dict_foreign_err_file and prints
+a timestamp and information about the given transaction to that file.
+On return dict_foreign_err_mutex is still held; the caller must release it. */
+TRANSACTIONAL_TARGET
+static
+void
+row_ins_foreign_trx_print(
+/*======================*/
+  trx_t* trx) /*!< in: transaction */
+{
+  ulint rec_lock_count;
+  ulint trx_lock_count;
+  ulint lock_heap_size;
+
+  ut_ad(!srv_read_only_mode);
+
+  {
+    /* Sample the lock statistics of the transaction while the
+    guard is in scope. */
+    TMLockMutexGuard g{SRW_LOCK_CALL};
+    rec_lock_count = trx->lock.n_rec_locks;
+    trx_lock_count = UT_LIST_GET_LEN(trx->lock.trx_locks);
+    lock_heap_size = mem_heap_get_size(trx->lock.lock_heap);
+  }
+
+  mysql_mutex_lock(&dict_foreign_err_mutex);
+
+  rewind(dict_foreign_err_file);
+  ut_print_timestamp(dict_foreign_err_file);
+  fputs(" Transaction:\n", dict_foreign_err_file);
+
+  trx_print_low(dict_foreign_err_file, trx, 600,
+                rec_lock_count, trx_lock_count, lock_heap_size);
+
+  /* Deliberately not unlocked here: the caller appends its own report. */
+  mysql_mutex_assert_owner(&dict_foreign_err_mutex);
+}
+
+/*********************************************************************//**
+Reports a foreign key error associated with an update or a delete of a
+parent table index entry: stores the detailed error in the transaction and
+writes a report to dict_foreign_err_file. Does nothing in read-only mode. */
+static
+void
+row_ins_foreign_report_err(
+/*=======================*/
+  const char* errstr, /*!< in: error string from the viewpoint
+      of the parent table */
+  que_thr_t* thr, /*!< in: query thread whose run_node
+      is an update node */
+  dict_foreign_t* foreign, /*!< in: foreign key constraint */
+  const rec_t* rec, /*!< in: a matching index record in the
+      child table */
+  const dtuple_t* entry) /*!< in: index entry in the parent
+      table */
+{
+  if (srv_read_only_mode) {
+    return;
+  }
+
+  FILE* const ef = dict_foreign_err_file;
+  trx_t* const trx = thr_get_trx(thr);
+
+  row_ins_set_detailed(trx, foreign);
+
+  /* This acquires dict_foreign_err_mutex, which is released at the
+  end of this function. */
+  row_ins_foreign_trx_print(trx);
+
+  fputs("Foreign key constraint fails for table ", ef);
+  ut_print_name(ef, trx, foreign->foreign_table_name);
+  fputs(":\n", ef);
+
+  const std::string fk_str =
+    dict_print_info_on_foreign_key_in_create_format(trx, foreign,
+                                                    TRUE);
+  fputs(fk_str.c_str(), ef);
+  putc('\n', ef);
+
+  /* The parent side of the violation */
+  fputs(errstr, ef);
+  fprintf(ef, " in parent table, in index %s",
+          foreign->referenced_index->name());
+
+  if (entry) {
+    fputs(" tuple:\n", ef);
+    dtuple_print(ef, entry);
+  }
+
+  /* The child side of the violation */
+  fputs("\nBut in child table ", ef);
+  ut_print_name(ef, trx, foreign->foreign_table_name);
+  fprintf(ef, ", in index %s", foreign->foreign_index->name());
+
+  if (!rec) {
+    fputs(", the record is not available\n", ef);
+  } else {
+    fputs(", there is a record:\n", ef);
+    rec_print(ef, rec, foreign->foreign_index);
+  }
+
+  putc('\n', ef);
+
+  mysql_mutex_unlock(&dict_foreign_err_mutex);
+}
+
+/*********************************************************************//**
+Reports a foreign key error to dict_foreign_err_file when we are trying
+to add an index entry to a child table. Note that the adding may be the result
+of an update, too.
+The detailed error is also stored in the transaction. Nothing is done in
+read-only mode. */
+static
+void
+row_ins_foreign_report_add_err(
+/*===========================*/
+ trx_t* trx, /*!< in: transaction */
+ dict_foreign_t* foreign, /*!< in: foreign key constraint */
+ const rec_t* rec, /*!< in: a record in the parent table:
+ it does not match entry because we
+ have an error! */
+ const dtuple_t* entry) /*!< in: index entry to insert in the
+ child table */
+{
+ std::string fk_str;
+
+ if (srv_read_only_mode) {
+ return;
+ }
+
+ FILE* ef = dict_foreign_err_file;
+
+ row_ins_set_detailed(trx, foreign);
+
+ /* This acquires dict_foreign_err_mutex, which is released at the
+ end of this function. */
+ row_ins_foreign_trx_print(trx);
+
+ fputs("Foreign key constraint fails for table ", ef);
+ ut_print_name(ef, trx, foreign->foreign_table_name);
+ fputs(":\n", ef);
+ fk_str = dict_print_info_on_foreign_key_in_create_format(trx, foreign,
+ TRUE);
+ fputs(fk_str.c_str(), ef);
+ /* NOTE(review): the index and tuple printed here are foreign_index and
+ the entry for the child table, yet the text says "in parent table".
+ The wording looks inconsistent; confirm before changing it, because
+ the string is runtime output that other tooling may match on. */
+ if (foreign->foreign_index) {
+ fprintf(ef, " in parent table, in index %s",
+ foreign->foreign_index->name());
+ } else {
+ fputs(" in parent table", ef);
+ }
+ if (entry) {
+ fputs(" tuple:\n", ef);
+ /* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
+ It would be better to only display the user columns. */
+ dtuple_print(ef, entry);
+ }
+ fputs("\nBut in parent table ", ef);
+ ut_print_name(ef, trx, foreign->referenced_table_name);
+ /* NOTE(review): referenced_index is dereferenced without a NULL check,
+ whereas foreign_index is checked above; presumably it is always set
+ when this function is called -- confirm against the callers. */
+ fprintf(ef, ", in index %s,\n"
+ "the closest match we can find is record:\n",
+ foreign->referenced_index->name());
+ if (rec && page_rec_is_supremum(rec)) {
+ /* If the cursor ended on a supremum record, it is better
+ to report the previous record in the error message, so that
+ the user gets a more descriptive error message. */
+ rec = page_rec_get_prev_const(rec);
+ }
+
+ /* rec is tested again because it may have been replaced above. */
+ if (rec) {
+ rec_print(ef, rec, foreign->referenced_index);
+ }
+ putc('\n', ef);
+
+ mysql_mutex_unlock(&dict_foreign_err_mutex);
+}
+
+/*********************************************************************//**
+Invalidates the query cache for the given table on behalf of the
+transaction that runs the query thread. */
+static
+void
+row_ins_invalidate_query_cache(
+/*===========================*/
+  que_thr_t* thr, /*!< in: query thread whose run_node
+      is an update node */
+  const char* name) /*!< in: table name prefixed with
+      database name and a '/' character */
+{
+  trx_t* const trx = thr_get_trx(thr);
+
+  innobase_invalidate_query_cache(trx, name);
+}
+
+/** Fill virtual column information in cascade node for the child table.
+For every virtual column of the child table that is listed in
+foreign->v_cols, two values are computed and stored in the next free slot
+of cascade->update: old_v_val from the old row, and new_val from the same
+row together with the update vector (presumably the value after the
+cascaded update -- confirm against innobase_get_computed_value()).
+A slot is only kept if the two values differ.
+@param[out] cascade child update node
+@param[in] rec clustered rec of child table
+@param[in] index clustered index of child table
+@param[in] node parent update node
+@param[in] foreign foreign key information
+@return DB_SUCCESS, DB_OUT_OF_MEMORY if no record buffer could be obtained,
+or DB_COMPUTE_VALUE_FAILED if a virtual column value could not be computed;
+on failure cascade->update->n_fields is left unchanged */
+static
+dberr_t
+row_ins_foreign_fill_virtual(
+ upd_node_t* cascade,
+ const rec_t* rec,
+ dict_index_t* index,
+ upd_node_t* node,
+ dict_foreign_t* foreign)
+{
+ THD* thd = current_thd;
+ row_ext_t* ext;
+ rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
+ rec_offs_init(offsets_);
+ const rec_offs* offsets =
+ rec_get_offsets(rec, index, offsets_, index->n_core_fields,
+ ULINT_UNDEFINED, &cascade->heap);
+ TABLE* mysql_table= NULL;
+ upd_t* update = cascade->update;
+ ulint n_v_fld = index->table->n_v_def;
+ ulint n_diff;
+ upd_field_t* upd_field;
+ dict_vcol_set* v_cols = foreign->v_cols;
+ /* Build a copy of the old row from rec; it is the input for computing
+ the virtual column values below. */
+ update->old_vrow = row_build(
+ ROW_COPY_DATA, index, rec,
+ offsets, index->table, NULL, NULL,
+ &ext, update->heap);
+ /* New virtual column fields are appended after the fields that are
+ already present in the update vector. */
+ n_diff = update->n_fields;
+
+ ut_ad(index->table->vc_templ != NULL);
+
+ ib_vcol_row vc(NULL);
+ uchar *record = vc.record(thd, index, &mysql_table);
+ if (!record) {
+ return DB_OUT_OF_MEMORY;
+ }
+ ut_ad(!node->is_delete
+ || (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL));
+ ut_ad(foreign->type & (DICT_FOREIGN_ON_DELETE_SET_NULL
+ | DICT_FOREIGN_ON_UPDATE_SET_NULL
+ | DICT_FOREIGN_ON_UPDATE_CASCADE));
+
+ for (uint16_t i = 0; i < n_v_fld; i++) {
+
+ dict_v_col_t* col = dict_table_get_nth_v_col(
+ index->table, i);
+
+ dict_vcol_set::iterator it = v_cols->find(col);
+
+ /* Skip virtual columns that are not in foreign->v_cols. */
+ if (it == v_cols->end()) {
+ continue;
+ }
+
+ /* First computation: no update vector is passed. */
+ dfield_t* vfield = innobase_get_computed_value(
+ update->old_vrow, col, index,
+ &vc.heap, update->heap, NULL, thd, mysql_table,
+ record, NULL, NULL);
+
+ if (vfield == NULL) {
+ return DB_COMPUTE_VALUE_FAILED;
+ }
+
+ upd_field = update->fields + n_diff;
+
+ upd_field->old_v_val = static_cast<dfield_t*>(
+ mem_heap_alloc(update->heap,
+ sizeof *upd_field->old_v_val));
+
+ dfield_copy(upd_field->old_v_val, vfield);
+
+ upd_field_set_v_field_no(upd_field, i, index);
+
+ /* Second computation: the update vector is passed as well. */
+ dfield_t* new_vfield = innobase_get_computed_value(
+ update->old_vrow, col, index,
+ &vc.heap, update->heap, NULL, thd,
+ mysql_table, record, NULL,
+ update);
+
+ if (new_vfield == NULL) {
+ return DB_COMPUTE_VALUE_FAILED;
+ }
+
+ dfield_copy(&upd_field->new_val, new_vfield);
+
+ /* Keep the slot only if the value changes; otherwise n_diff
+ stays the same and the slot is reused for the next column. */
+ if (!dfield_datas_are_binary_equal(
+ upd_field->old_v_val,
+ &upd_field->new_val, 0))
+ n_diff++;
+ }
+
+ update->n_fields = n_diff;
+ return DB_SUCCESS;
+}
+
+#ifdef WITH_WSREP
+/* Forward declaration only, compiled when WITH_WSREP is defined; no
+definition of this function appears in the visible part of this file. */
+dberr_t wsrep_append_foreign_key(trx_t *trx,
+ dict_foreign_t* foreign,
+ const rec_t* clust_rec,
+ dict_index_t* clust_index,
+ bool referenced,
+ upd_node_t* upd_node,
+ bool pa_disable,
+ Wsrep_service_key_type key_type);
+#endif /* WITH_WSREP */
+
+/*********************************************************************//**
+Perform referential actions or checks when a parent row is deleted or updated
+and the constraint had an ON DELETE or ON UPDATE condition which was not
+RESTRICT.
+@return DB_SUCCESS, DB_LOCK_WAIT, or error code */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+row_ins_foreign_check_on_constraint(
+/*================================*/
+ que_thr_t* thr, /*!< in: query thread whose run_node
+ is an update node */
+ dict_foreign_t* foreign, /*!< in: foreign key constraint whose
+ type is != 0 */
+ btr_pcur_t* pcur, /*!< in: cursor placed on a matching
+ index record in the child table */
+ dtuple_t* entry, /*!< in: index entry in the parent
+ table */
+ mtr_t* mtr) /*!< in: mtr holding the latch of pcur
+ page */
+{
+ upd_node_t* node;
+ upd_node_t* cascade;
+ dict_table_t*const*const fktable = &foreign->foreign_table;
+ dict_table_t* table = *fktable;
+ dict_index_t* index;
+ dict_index_t* clust_index;
+ dtuple_t* ref;
+ const rec_t* rec;
+ const rec_t* clust_rec;
+ const buf_block_t* clust_block;
+ upd_t* update;
+ dberr_t err;
+ trx_t* trx;
+ mem_heap_t* tmp_heap = NULL;
+ doc_id_t doc_id = FTS_NULL_DOC_ID;
+
+ DBUG_ENTER("row_ins_foreign_check_on_constraint");
+
+ trx = thr_get_trx(thr);
+
+ /* Since we are going to delete or update a row, we have to invalidate
+ the MySQL query cache for table. A deadlock of threads is not possible
+ here because the caller of this function does not hold any latches with
+ the mutex rank above the lock_sys.latch. The query cache mutex
+ has a rank just above the lock_sys.latch. */
+
+ row_ins_invalidate_query_cache(thr, table->name.m_name);
+
+ node = static_cast<upd_node_t*>(thr->run_node);
+
+ if (node->is_delete && 0 == (foreign->type
+ & (DICT_FOREIGN_ON_DELETE_CASCADE
+ | DICT_FOREIGN_ON_DELETE_SET_NULL))) {
+
+ row_ins_foreign_report_err("Trying to delete",
+ thr, foreign,
+ btr_pcur_get_rec(pcur), entry);
+
+ DBUG_RETURN(DB_ROW_IS_REFERENCED);
+ }
+
+ if (!node->is_delete && 0 == (foreign->type
+ & (DICT_FOREIGN_ON_UPDATE_CASCADE
+ | DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
+
+ /* This is an UPDATE */
+
+ row_ins_foreign_report_err("Trying to update",
+ thr, foreign,
+ btr_pcur_get_rec(pcur), entry);
+
+ DBUG_RETURN(DB_ROW_IS_REFERENCED);
+ }
+
+ if (node->cascade_node == NULL) {
+ node->cascade_heap = mem_heap_create(128);
+ node->cascade_node = row_create_update_node_for_mysql(
+ table, node->cascade_heap);
+ que_node_set_parent(node->cascade_node, node);
+
+ }
+ cascade = node->cascade_node;
+ cascade->table = table;
+ cascade->foreign = foreign;
+
+ if (node->is_delete
+ && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
+ cascade->is_delete = PLAIN_DELETE;
+ } else {
+ cascade->is_delete = NO_DELETE;
+
+ if (foreign->n_fields > cascade->update_n_fields) {
+ /* We have to make the update vector longer */
+
+ cascade->update = upd_create(foreign->n_fields,
+ node->cascade_heap);
+ cascade->update_n_fields = foreign->n_fields;
+ }
+
+ /* We do not allow cyclic cascaded updating (DELETE is
+ allowed, but not UPDATE) of the same table, as this
+ can lead to an infinite cycle. Check that we are not
+ updating the same table which is already being
+ modified in this cascade chain. We have to check this
+ also because the modification of the indexes of a
+ 'parent' table may still be incomplete, and we must
+ avoid seeing the indexes of the parent table in an
+ inconsistent state! */
+
+ if (row_ins_cascade_ancestor_updates_table(cascade, table)) {
+
+ /* We do not know if this would break foreign key
+ constraints, but play safe and return an error */
+
+ err = DB_ROW_IS_REFERENCED;
+
+ row_ins_foreign_report_err(
+ "Trying an update, possibly causing a cyclic"
+ " cascaded update\n"
+ "in the child table,", thr, foreign,
+ btr_pcur_get_rec(pcur), entry);
+
+ goto nonstandard_exit_func;
+ }
+ }
+
+ if (row_ins_cascade_n_ancestors(cascade) >= FK_MAX_CASCADE_DEL) {
+ err = DB_FOREIGN_EXCEED_MAX_CASCADE;
+
+ row_ins_foreign_report_err(
+ "Trying a too deep cascaded delete or update\n",
+ thr, foreign, btr_pcur_get_rec(pcur), entry);
+
+ goto nonstandard_exit_func;
+ }
+
+ index = pcur->index();
+
+ ut_a(index == foreign->foreign_index);
+
+ rec = btr_pcur_get_rec(pcur);
+
+ tmp_heap = mem_heap_create(256);
+
+ if (dict_index_is_clust(index)) {
+ /* pcur is already positioned in the clustered index of
+ the child table */
+
+ clust_index = index;
+ clust_rec = rec;
+ clust_block = btr_pcur_get_block(pcur);
+ } else {
+ /* We have to look for the record in the clustered index
+ in the child table */
+
+ clust_index = dict_table_get_first_index(table);
+
+ ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
+ tmp_heap);
+ cascade->pcur->old_rec = nullptr;
+ cascade->pcur->btr_cur.page_cur.index = clust_index;
+ err = btr_pcur_open_with_no_init(ref,
+ PAGE_CUR_LE, BTR_SEARCH_LEAF,
+ cascade->pcur, mtr);
+ if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
+ goto nonstandard_exit_func;
+ }
+
+ clust_rec = btr_pcur_get_rec(cascade->pcur);
+ clust_block = btr_pcur_get_block(cascade->pcur);
+
+ if (!page_rec_is_user_rec(clust_rec)
+ || btr_pcur_get_low_match(cascade->pcur)
+ < dict_index_get_n_unique(clust_index)) {
+
+ ib::error() << "In cascade of a foreign key op index "
+ << index->name
+ << " of table " << index->table->name;
+
+ fputs("InnoDB: record ", stderr);
+ rec_print(stderr, rec, index);
+ fputs("\n"
+ "InnoDB: clustered record ", stderr);
+ rec_print(stderr, clust_rec, clust_index);
+ fputs("\n"
+ "InnoDB: Submit a detailed bug report to"
+ " https://jira.mariadb.org/\n", stderr);
+ ut_ad(0);
+ err = DB_SUCCESS;
+
+ goto nonstandard_exit_func;
+ }
+ }
+
+ /* Set an X-lock on the row to delete or update in the child table */
+
+ err = lock_table(table, fktable, LOCK_IX, thr);
+
+ if (err == DB_SUCCESS) {
+ /* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
+ we already have a normal shared lock on the appropriate
+ gap if the search criterion was not unique */
+
+ err = lock_clust_rec_read_check_and_lock_alt(
+ 0, clust_block, clust_rec, clust_index,
+ LOCK_X, LOCK_REC_NOT_GAP, thr);
+ }
+
+ if (err != DB_SUCCESS) {
+
+ goto nonstandard_exit_func;
+ }
+
+ if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
+ /* In delete-marked records, DB_TRX_ID must
+ always refer to an existing undo log record. */
+ ut_ad(rec_get_trx_id(clust_rec, clust_index));
+ /* This can happen if there is a circular reference of
+ rows such that cascading delete comes to delete a row
+ already in the process of being delete marked */
+ err = DB_SUCCESS;
+
+ goto nonstandard_exit_func;
+ }
+
+ if (table->fts) {
+ doc_id = fts_get_doc_id_from_rec(
+ clust_rec, clust_index,
+ rec_get_offsets(clust_rec, clust_index, NULL,
+ clust_index->n_core_fields,
+ ULINT_UNDEFINED, &tmp_heap));
+ }
+
+ if (node->is_delete
+ ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
+ : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
+ /* Build the appropriate update vector which sets
+ foreign->n_fields first fields in rec to SQL NULL */
+
+ update = cascade->update;
+
+ update->info_bits = 0;
+ update->n_fields = foreign->n_fields;
+ MEM_UNDEFINED(update->fields,
+ update->n_fields * sizeof *update->fields);
+
+ for (ulint i = 0; i < foreign->n_fields; i++) {
+ upd_field_t* ufield = &update->fields[i];
+ ulint col_no = dict_index_get_nth_col_no(
+ index, i);
+ ulint prefix_col;
+
+ ufield->field_no = static_cast<uint16_t>(
+ dict_table_get_nth_col_pos(
+ table, col_no, &prefix_col));
+ dict_col_t* col = dict_table_get_nth_col(
+ table, col_no);
+ dict_col_copy_type(col, dfield_get_type(&ufield->new_val));
+
+ ufield->orig_len = 0;
+ ufield->exp = NULL;
+ dfield_set_null(&ufield->new_val);
+ }
+
+ if (foreign->affects_fulltext()) {
+ fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
+ }
+
+ if (foreign->v_cols != NULL
+ && foreign->v_cols->size() > 0) {
+ err = row_ins_foreign_fill_virtual(
+ cascade, clust_rec, clust_index,
+ node, foreign);
+
+ if (err != DB_SUCCESS) {
+ goto nonstandard_exit_func;
+ }
+ }
+ } else if (table->fts && cascade->is_delete == PLAIN_DELETE
+ && foreign->affects_fulltext()) {
+ /* DICT_FOREIGN_ON_DELETE_CASCADE case */
+ fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
+ }
+
+ if (!node->is_delete
+ && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
+
+ /* Build the appropriate update vector which sets changing
+ foreign->n_fields first fields in rec to new values */
+
+ bool affects_fulltext = row_ins_cascade_calc_update_vec(
+ node, foreign, tmp_heap, trx);
+
+ if (foreign->v_cols && !foreign->v_cols->empty()) {
+ err = row_ins_foreign_fill_virtual(
+ cascade, clust_rec, clust_index,
+ node, foreign);
+
+ if (err != DB_SUCCESS) {
+ goto nonstandard_exit_func;
+ }
+ }
+
+ switch (cascade->update->n_fields) {
+ case ULINT_UNDEFINED:
+ err = DB_ROW_IS_REFERENCED;
+
+ row_ins_foreign_report_err(
+ "Trying a cascaded update where the"
+ " updated value in the child\n"
+ "table would not fit in the length"
+ " of the column, or the value would\n"
+ "be NULL and the column is"
+ " declared as not NULL in the child table,",
+ thr, foreign, btr_pcur_get_rec(pcur), entry);
+
+ goto nonstandard_exit_func;
+ case 0:
+ /* The update does not change any columns referred
+ to in this foreign key constraint: no need to do
+ anything */
+
+ err = DB_SUCCESS;
+
+ goto nonstandard_exit_func;
+ }
+
+ /* Mark the old Doc ID as deleted */
+ if (affects_fulltext) {
+ ut_ad(table->fts);
+ fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
+ }
+ }
+
+ if (table->versioned() && cascade->is_delete != PLAIN_DELETE
+ && cascade->update->affects_versioned()) {
+ ut_ad(!cascade->historical_heap);
+ cascade->historical_heap = mem_heap_create(srv_page_size);
+ cascade->historical_row = row_build(
+ ROW_COPY_DATA, clust_index, clust_rec, NULL, table,
+ NULL, NULL, NULL, cascade->historical_heap);
+ }
+
+ /* Store pcur position and initialize or store the cascade node
+ pcur stored position */
+
+ btr_pcur_store_position(pcur, mtr);
+
+ if (index == clust_index) {
+ btr_pcur_copy_stored_position(cascade->pcur, pcur);
+ } else {
+ btr_pcur_store_position(cascade->pcur, mtr);
+ }
+
+#ifdef WITH_WSREP
+ if (trx->is_wsrep()) {
+ err = wsrep_append_foreign_key(trx, foreign, clust_rec, clust_index,
+ false, NULL, true,
+ WSREP_SERVICE_KEY_EXCLUSIVE);
+ if (err != DB_SUCCESS) {
+ goto nonstandard_exit_func;
+ }
+ }
+#endif /* WITH_WSREP */
+ mtr_commit(mtr);
+
+ ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);
+
+ cascade->state = UPD_NODE_UPDATE_CLUSTERED;
+
+ err = row_update_cascade_for_mysql(thr, cascade,
+ foreign->foreign_table);
+
+ mtr_start(mtr);
+
+ /* Restore pcur position */
+
+ if (pcur->restore_position(BTR_SEARCH_LEAF, mtr)
+ != btr_pcur_t::SAME_ALL) {
+ err = DB_CORRUPTION;
+ }
+
+ if (tmp_heap) {
+ mem_heap_free(tmp_heap);
+ }
+
+ DBUG_RETURN(err);
+
+nonstandard_exit_func:
+
+ if (tmp_heap) {
+ mem_heap_free(tmp_heap);
+ }
+
+ btr_pcur_store_position(pcur, mtr);
+
+ mtr_commit(mtr);
+ mtr_start(mtr);
+
+ if (pcur->restore_position(BTR_SEARCH_LEAF, mtr)
+ != btr_pcur_t::SAME_ALL && err == DB_SUCCESS) {
+ err = DB_CORRUPTION;
+ }
+
+ DBUG_RETURN(err);
+}
+
+/** Request a shared (LOCK_S) lock on a record for a query thread.
+The request is routed to the clustered index or to the secondary index
+lock check, depending on the kind of index. Used in locking possible
+duplicate key records and also in checking foreign key constraints.
+@param type     LOCK_ORDINARY, LOCK_GAP, or LOCK_REC_NOT_GAP type lock
+@param block    buffer block of rec
+@param rec      record
+@param index    index
+@param offsets  rec_get_offsets(rec, index)
+@param thr      query thread
+@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
+static
+dberr_t
+row_ins_set_shared_rec_lock(
+  unsigned type,
+  const buf_block_t* block,
+  const rec_t* rec,
+  dict_index_t* index,
+  const rec_offs* offsets,
+  que_thr_t* thr)
+{
+  ut_ad(rec_offs_validate(rec, index, offsets));
+
+  if (!dict_index_is_clust(index)) {
+    /* Secondary index record */
+    return lock_sec_rec_read_check_and_lock(
+      0, block, rec, index, offsets, LOCK_S, type, thr);
+  }
+
+  /* Clustered index record */
+  return lock_clust_rec_read_check_and_lock(
+    0, block, rec, index, offsets, LOCK_S, type, thr);
+}
+
+/** Request an exclusive (LOCK_X) lock on a record for a query thread.
+The clustered index or the secondary index lock check is invoked,
+depending on the kind of index. Used in locking possible duplicate key
+records.
+@param type     LOCK_ORDINARY, LOCK_GAP, or LOCK_REC_NOT_GAP type lock
+@param block    buffer block of rec
+@param rec      record
+@param index    index
+@param offsets  rec_get_offsets(rec, index)
+@param thr      query thread
+@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
+static
+dberr_t
+row_ins_set_exclusive_rec_lock(
+  unsigned type,
+  const buf_block_t* block,
+  const rec_t* rec,
+  dict_index_t* index,
+  const rec_offs* offsets,
+  que_thr_t* thr)
+{
+  ut_ad(rec_offs_validate(rec, index, offsets));
+
+  const dberr_t lock_result = dict_index_is_clust(index)
+    ? lock_clust_rec_read_check_and_lock(
+        0, block, rec, index, offsets, LOCK_X, type, thr)
+    : lock_sec_rec_read_check_and_lock(
+        0, block, rec, index, offsets, LOCK_X, type, thr);
+
+  return(lock_result);
+}
+
+/***************************************************************//**
+Checks if foreign key constraint fails for an index entry. Sets shared locks
+which lock either the success or the failure of the constraint. NOTE that
+the caller must have a shared latch on dict_sys.latch.
+
+The check is skipped and DB_SUCCESS is returned when:
+- foreign key checks are disabled for the transaction (trx->check_foreigns),
+- any of the first foreign->n_fields fields of entry is SQL NULL,
+- a field of entry, or the row of a running insert node on a system
+  versioned table, marks a system versioning history row,
+- the running node is an update node whose is_delete is not PLAIN_DELETE
+  and whose foreign member is this same constraint (cascaded update).
+
+Otherwise the index of the table to check is scanned from the first record
+that is >= entry on the first foreign->n_fields fields:
+- check_ref and a matching non-delete-marked record exists: DB_SUCCESS;
+- check_ref and no such record: DB_NO_REFERENCED_ROW;
+- !check_ref and a matching record exists: the ON UPDATE / ON DELETE action
+  is run via row_ins_foreign_check_on_constraint() if foreign->type != 0,
+  else DB_ROW_IS_REFERENCED;
+- !check_ref and no matching record: DB_SUCCESS.
+
+On DB_LOCK_WAIT the function itself calls lock_wait() and, if that
+succeeds, still returns DB_LOCK_WAIT (presumably so that the caller retries
+the operation; confirm against the callers).
+@return DB_SUCCESS, DB_NO_REFERENCED_ROW, or DB_ROW_IS_REFERENCED; error
+codes from the table or record lock requests, from btr_pcur_open() and from
+the cascade are passed through, except that DB_DUPLICATE_KEY from the
+cascade is mapped to DB_FOREIGN_DUPLICATE_KEY */
+dberr_t
+row_ins_check_foreign_constraint(
+/*=============================*/
+ ibool check_ref,/*!< in: TRUE if we want to check that
+ the referenced table is ok, FALSE if we
+ want to check the foreign key table */
+ dict_foreign_t* foreign,/*!< in: foreign constraint; NOTE that the
+ tables mentioned in it must be in the
+ dictionary cache if they exist at all */
+ dict_table_t* table, /*!< in: if check_ref is TRUE, then the foreign
+ table, else the referenced table */
+ dtuple_t* entry, /*!< in: index entry for index */
+ que_thr_t* thr) /*!< in: query thread */
+{
+ upd_node_t* upd_node;
+ ulint n_fields_cmp;
+ btr_pcur_t pcur;
+ int cmp;
+ mtr_t mtr;
+ trx_t* trx = thr_get_trx(thr);
+ mem_heap_t* heap = NULL;
+ rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
+ rec_offs* offsets = offsets_;
+
+ bool skip_gap_lock; /* true at READ COMMITTED or a lower isolation level */
+
+ skip_gap_lock = (trx->isolation_level <= TRX_ISO_READ_COMMITTED);
+
+ DBUG_ENTER("row_ins_check_foreign_constraint");
+
+ rec_offs_init(offsets_);
+
+#ifdef WITH_WSREP
+ upd_node= NULL; /* passed to wsrep_append_foreign_key() below even if the running node is not an update */
+#endif /* WITH_WSREP */
+
+ if (!trx->check_foreigns) {
+ /* The user has suppressed foreign key checks currently for
+ this session */
+ DBUG_RETURN(DB_SUCCESS);
+ }
+
+ /* If any of the foreign key fields in entry is SQL NULL, we
+ suppress the foreign key check: this is compatible with Oracle,
+ for example */
+ for (ulint i = 0; i < entry->n_fields; i++) {
+ dfield_t* field = dtuple_get_nth_field(entry, i);
+ if (i < foreign->n_fields && dfield_is_null(field)) {
+ DBUG_RETURN(DB_SUCCESS);
+ }
+ /* System Versioning: if row_end != Inf, we
+ suppress the foreign key check */
+ if (field->type.vers_sys_end() && field->vers_history_row()) {
+ DBUG_RETURN(DB_SUCCESS);
+ }
+ }
+
+ if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
+ upd_node = static_cast<upd_node_t*>(thr->run_node);
+
+ if (upd_node->is_delete != PLAIN_DELETE
+ && upd_node->foreign == foreign) {
+ /* If a cascaded update is done as defined by a
+ foreign key constraint, do not check that
+ constraint for the child row. In ON UPDATE CASCADE
+ the update of the parent row is only half done when
+ we come here: if we would check the constraint here
+ for the child row it would fail.
+
+ A QUESTION remains: if in the child table there are
+ several constraints which refer to the same parent
+ table, we should merge all updates to the child as
+ one update? And the updates can be contradictory!
+ Currently we just perform the update associated
+ with each foreign key constraint, one after
+ another, and the user has problems predicting in
+ which order they are performed. */
+
+ DBUG_RETURN(DB_SUCCESS);
+ }
+ }
+
+ if (que_node_get_type(thr->run_node) == QUE_NODE_INSERT) {
+ ins_node_t* insert_node =
+ static_cast<ins_node_t*>(thr->run_node);
+ dict_table_t* table = insert_node->index->table; /* NOTE: shadows the function parameter of the same name */
+ if (table->versioned()) {
+ dfield_t* row_end = dtuple_get_nth_field(
+ insert_node->row, table->vers_end);
+ if (row_end->vers_history_row()) {
+ DBUG_RETURN(DB_SUCCESS);
+ }
+ }
+ }
+
+ dict_table_t *check_table;
+ dict_index_t *check_index;
+ dberr_t err = DB_SUCCESS;
+
+ {
+ dict_table_t*& fktable = check_ref
+ ? foreign->referenced_table : foreign->foreign_table;
+ check_table = fktable;
+ if (check_table) {
+ err = lock_table(check_table, &fktable, LOCK_IS, thr);
+ if (err != DB_SUCCESS) {
+ goto do_possible_lock_wait;
+ }
+ }
+ check_table = fktable; /* re-read: lock_table() was given &fktable and presumably may change it */
+ }
+
+ check_index = check_ref
+ ? foreign->referenced_index : foreign->foreign_index;
+
+ if (!check_table || !check_table->is_readable() || !check_index) {
+ FILE* ef = dict_foreign_err_file;
+ std::string fk_str;
+
+ row_ins_set_detailed(trx, foreign);
+ row_ins_foreign_trx_print(trx);
+
+ fputs("Foreign key constraint fails for table ", ef);
+ ut_print_name(ef, trx, check_ref
+ ? foreign->foreign_table_name
+ : foreign->referenced_table_name);
+ fputs(":\n", ef);
+ fk_str = dict_print_info_on_foreign_key_in_create_format(
+ trx, foreign, TRUE);
+ fputs(fk_str.c_str(), ef);
+ if (check_ref) {
+ if (foreign->foreign_index) {
+ fprintf(ef, "\nTrying to add to index %s"
+ " tuple:\n",
+ foreign->foreign_index->name());
+ } else {
+ fputs("\nTrying to add tuple:\n", ef);
+ }
+ dtuple_print(ef, entry);
+ fputs("\nBut the parent table ", ef);
+ ut_print_name(ef, trx, foreign->referenced_table_name);
+ fputs("\nor its .ibd file or the required index does"
+ " not currently exist!\n", ef);
+ err = DB_NO_REFERENCED_ROW;
+ } else {
+ if (foreign->referenced_index) {
+ fprintf(ef, "\nTrying to modify index %s"
+ " tuple:\n",
+ foreign->referenced_index->name());
+ } else {
+ fputs("\nTrying to modify tuple:\n", ef);
+ }
+ dtuple_print(ef, entry);
+ fputs("\nBut the referencing table ", ef);
+ ut_print_name(ef, trx, foreign->foreign_table_name);
+ fputs("\nor its .ibd file or the required index does"
+ " not currently exist!\n", ef);
+ err = DB_ROW_IS_REFERENCED;
+ }
+
+ mysql_mutex_unlock(&dict_foreign_err_mutex); /* presumably acquired by row_ins_foreign_trx_print() above; confirm */
+ goto exit_func;
+ }
+
+ mtr_start(&mtr);
+
+ /* Store old value on n_fields_cmp */
+
+ n_fields_cmp = dtuple_get_n_fields_cmp(entry);
+
+ dtuple_set_n_fields_cmp(entry, foreign->n_fields); /* compare only the foreign key fields during the scan */
+ pcur.btr_cur.page_cur.index = check_index;
+ err = btr_pcur_open(entry, PAGE_CUR_GE, BTR_SEARCH_LEAF, &pcur, &mtr);
+ if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
+ goto end_scan;
+ }
+
+ /* Scan index records and check if there is a matching record.
+ NOTE: every "continue" in this do-while loop jumps to the loop
+ condition, i.e. it advances the cursor with btr_pcur_move_to_next(). */
+
+ do {
+ const rec_t* rec = btr_pcur_get_rec(&pcur);
+ const buf_block_t* block = btr_pcur_get_block(&pcur);
+
+ if (page_rec_is_infimum(rec)) {
+
+ continue;
+ }
+
+ offsets = rec_get_offsets(rec, check_index, offsets,
+ check_index->n_core_fields,
+ ULINT_UNDEFINED, &heap);
+
+ if (page_rec_is_supremum(rec)) {
+
+ if (skip_gap_lock) {
+
+ continue;
+ }
+
+ err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
+ rec, check_index,
+ offsets, thr);
+ switch (err) {
+ case DB_SUCCESS_LOCKED_REC:
+ case DB_SUCCESS:
+ continue;
+ default:
+ goto end_scan;
+ }
+ }
+
+ cmp = cmp_dtuple_rec(entry, rec, check_index, offsets);
+
+ if (cmp == 0) {
+ if (rec_get_deleted_flag(rec,
+ rec_offs_comp(offsets))) {
+ /* In delete-marked records, DB_TRX_ID must
+ always refer to an existing undo log record. */
+ ut_ad(!dict_index_is_clust(check_index)
+ || row_get_rec_trx_id(rec, check_index,
+ offsets));
+
+ err = row_ins_set_shared_rec_lock(
+ skip_gap_lock
+ ? LOCK_REC_NOT_GAP
+ : LOCK_ORDINARY, block,
+ rec, check_index, offsets, thr);
+ switch (err) {
+ case DB_SUCCESS_LOCKED_REC:
+ case DB_SUCCESS:
+ break;
+ default:
+ goto end_scan;
+ }
+ } else {
+ if (check_table->versioned()) {
+ bool history_row = false;
+
+ if (check_index->is_primary()) {
+ history_row = check_index->
+ vers_history_row(rec,
+ offsets);
+ } else if (check_index->
+ vers_history_row(rec,
+ history_row)) {
+ break; /* leaves the scan loop; the code after the loop then decides the result */
+ }
+
+ if (history_row) {
+ continue;
+ }
+ }
+ /* Found a matching record. Lock only
+ a record because we can allow inserts
+ into gaps */
+
+ err = row_ins_set_shared_rec_lock(
+ LOCK_REC_NOT_GAP, block,
+ rec, check_index, offsets, thr);
+
+ switch (err) {
+ case DB_SUCCESS_LOCKED_REC:
+ case DB_SUCCESS:
+ break;
+ default:
+ goto end_scan;
+ }
+
+ if (check_ref) {
+ err = DB_SUCCESS;
+#ifdef WITH_WSREP
+ if (trx->is_wsrep()) {
+ err = wsrep_append_foreign_key(
+ thr_get_trx(thr),
+ foreign,
+ rec,
+ check_index,
+ check_ref,
+ upd_node,
+ false,
+ WSREP_SERVICE_KEY_REFERENCE);
+ }
+#endif /* WITH_WSREP */
+ goto end_scan;
+ } else if (foreign->type != 0) {
+ /* There is an ON UPDATE or ON DELETE
+ condition: check them in a separate
+ function */
+
+ err = row_ins_foreign_check_on_constraint(
+ thr, foreign, &pcur, entry,
+ &mtr);
+ if (err != DB_SUCCESS) {
+ /* Since reporting a plain
+ "duplicate key" error
+ message to the user in
+ cases where a long CASCADE
+ operation would lead to a
+ duplicate key in some
+ other table is very
+ confusing, map duplicate
+ key errors resulting from
+ FK constraints to a
+ separate error code. */
+
+ if (err == DB_DUPLICATE_KEY) {
+ err = DB_FOREIGN_DUPLICATE_KEY;
+ }
+
+ goto end_scan;
+ }
+
+ /* row_ins_foreign_check_on_constraint
+ may have repositioned pcur on a
+ different block */
+ block = btr_pcur_get_block(&pcur);
+ } else {
+ row_ins_foreign_report_err(
+ "Trying to delete or update",
+ thr, foreign, rec, entry);
+
+ err = DB_ROW_IS_REFERENCED;
+ goto end_scan;
+ }
+ }
+ } else {
+ ut_a(cmp < 0);
+
+ err = skip_gap_lock
+ ? DB_SUCCESS
+ : row_ins_set_shared_rec_lock(
+ LOCK_GAP, block,
+ rec, check_index, offsets, thr);
+
+ switch (err) {
+ case DB_SUCCESS_LOCKED_REC:
+ err = DB_SUCCESS;
+ /* fall through */
+ case DB_SUCCESS:
+ if (check_ref) {
+ err = DB_NO_REFERENCED_ROW;
+ row_ins_foreign_report_add_err(
+ trx, foreign, rec, entry);
+ } /* fall through */
+ default:
+ break;
+ }
+
+ goto end_scan;
+ }
+ } while (btr_pcur_move_to_next(&pcur, &mtr));
+
+ if (check_ref) {
+ row_ins_foreign_report_add_err(
+ trx, foreign, btr_pcur_get_rec(&pcur), entry);
+ err = DB_NO_REFERENCED_ROW;
+ } else {
+ err = DB_SUCCESS;
+ }
+
+end_scan:
+ mtr_commit(&mtr);
+ ut_free(pcur.old_rec_buf);
+
+ /* Restore old value */
+ dtuple_set_n_fields_cmp(entry, n_fields_cmp);
+
+do_possible_lock_wait:
+ if (err == DB_LOCK_WAIT) {
+ trx->error_state = err;
+
+ thr->lock_state = QUE_THR_LOCK_ROW;
+
+ err = lock_wait(thr);
+
+ thr->lock_state = QUE_THR_LOCK_NOLOCK;
+
+ if (err == DB_SUCCESS) {
+ err = DB_LOCK_WAIT; /* the wait succeeded, but DB_LOCK_WAIT is still reported; presumably the caller retries */
+ }
+ }
+
+exit_func:
+ if (UNIV_LIKELY_NULL(heap)) {
+ mem_heap_free(heap);
+ }
+
+ DBUG_RETURN(err);
+}
+
+/** Build the tuple of foreign key columns from a clustered index tuple.
+For each foreign key column name, the first non-dropped clustered index
+column with a case-insensitively equal name is located, and its field of
+entry is copied into the corresponding field of ref_entry.
+@param[in]	foreign		foreign key constraint
+@param[in]	index		clustered index
+@param[in]	entry		tuple of clustered index
+@param[in]	ref_entry	tuple of foreign columns
+@return true if all foreign key fields present in clustered index */
+static
+bool row_ins_foreign_index_entry(dict_foreign_t *foreign,
+                                 const dict_index_t *index,
+                                 const dtuple_t *entry,
+                                 dtuple_t *ref_entry)
+{
+  for (ulint fk_pos= 0; fk_pos < foreign->n_fields; fk_pos++)
+  {
+    bool found= false;
+
+    for (ulint idx_pos= 0; idx_pos < index->n_fields; idx_pos++)
+    {
+      const dict_col_t *col= dict_index_get_nth_col(index, idx_pos);
+
+      /* A clustered index may contain instantly dropped columns,
+      which must be skipped. */
+      if (col->is_dropped())
+        continue;
+
+      const char *col_name= dict_table_get_col_name(index->table, col->ind);
+      if (innobase_strcasecmp(col_name, foreign->foreign_col_names[fk_pos])
+          != 0)
+        continue;
+
+      dfield_copy(&ref_entry->fields[fk_pos], &entry->fields[idx_pos]);
+      found= true;
+      break;
+    }
+
+    /* A foreign key column that is missing from the index ends the search. */
+    if (!found)
+      return false;
+  }
+
+  return true;
+}
+
+/** Check the foreign key constraints of a table for an index entry.
+Only constraints in table->foreign_set whose foreign_index is the given
+index are checked, plus, when the given index is the primary key, those
+that have no foreign_index at all. If no constraint qualifies, this
+function does nothing. Otherwise it searches the indexes of the referenced
+tables and sets shared locks which lock either the success or the failure
+of a constraint. The first failing constraint ends the checking.
+@param table  table
+@param index  index
+@param pk     index->is_primary()
+@param entry  index entry for index
+@param thr    query thread
+@return DB_SUCCESS or error code */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+row_ins_check_foreign_constraints(
+  dict_table_t* table,
+  dict_index_t* index,
+  bool pk,
+  dtuple_t* entry,
+  que_thr_t* thr)
+{
+  dberr_t err = DB_SUCCESS;
+  mem_heap_t* heap = NULL;
+
+  DBUG_ASSERT(index->is_primary() == pk);
+
+  DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
+                      "foreign_constraint_check_for_ins");
+
+  for (dict_foreign_set::iterator it = table->foreign_set.begin();
+       err == DB_SUCCESS && it != table->foreign_set.end();
+       ++it) {
+
+    dict_foreign_t* const foreign = *it;
+
+    if (foreign->foreign_index != index
+        && !(pk && !foreign->foreign_index)) {
+      /* This constraint is not served by the given index. */
+      continue;
+    }
+
+    dtuple_t* ref_tuple = entry;
+
+    if (UNIV_UNLIKELY(!foreign->foreign_index)) {
+      /* Change primary key entry to
+      foreign key index entry */
+      if (heap) {
+        mem_heap_empty(heap);
+      } else {
+        heap = mem_heap_create(1000);
+      }
+
+      ref_tuple = dtuple_create(heap, foreign->n_fields);
+      dtuple_set_n_fields_cmp(ref_tuple, foreign->n_fields);
+
+      if (!row_ins_foreign_index_entry(
+            foreign, index, entry, ref_tuple)) {
+        err = DB_NO_REFERENCED_ROW;
+        break;
+      }
+    }
+
+    /* Open the referenced table by name while the check runs if the
+    constraint has no referenced_table pointer (presumably this loads
+    the parent table into the dictionary cache; confirm). */
+    dict_table_t* const opened_table = foreign->referenced_table != NULL
+      ? NULL
+      : dict_table_open_on_name(
+          foreign->referenced_table_name_lookup,
+          false, DICT_ERR_IGNORE_NONE);
+
+    err = row_ins_check_foreign_constraint(
+      TRUE, foreign, table, ref_tuple, thr);
+
+    if (opened_table) {
+      dict_table_close(opened_table);
+    }
+  }
+
+  if (UNIV_LIKELY_NULL(heap)) {
+    mem_heap_free(heap);
+  }
+
+  return err;
+}
+
+/** Check if inserting an index entry would cause a unique key violation
+with an existing record. There is no violation if fewer than n_unique
+fields match, if a NULL among the first n_unique fields of the entry is
+allowed to repeat, or if the record is delete-marked.
+@param rec      user record; NOTE that we assume that the caller already
+                has a record lock on the record!
+@param entry    entry to insert
+@param index    index
+@param offsets  rec_get_offsets(rec, index)
+@return TRUE if error */
+static
+ibool
+row_ins_dupl_error_with_rec(
+  const rec_t* rec,
+  const dtuple_t* entry,
+  dict_index_t* index,
+  const rec_offs* offsets)
+{
+  ut_ad(rec_offs_validate(rec, index, offsets));
+
+  const ulint n_unique = dict_index_get_n_unique(index);
+  ulint n_matched = 0;
+
+  cmp_dtuple_rec_with_match(entry, rec, index, offsets, &n_matched);
+
+  if (n_matched < n_unique) {
+    return(FALSE);
+  }
+
+  /* In a unique secondary index we allow equal key values if they
+  contain SQL NULLs */
+  const bool nulls_may_repeat = !dict_index_is_clust(index)
+    && !index->nulls_equal;
+
+  for (ulint pos = 0; nulls_may_repeat && pos < n_unique; pos++) {
+    if (dfield_is_null(dtuple_get_nth_field(entry, pos))) {
+      return(FALSE);
+    }
+  }
+
+  /* A delete-marked record does not count as a duplicate. */
+  return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
+}
+
+/** Determine whether a history row was inserted by this transaction
+(row TRX_ID is the same as current TRX_ID). The clustered index record
+that the secondary index record refers to is looked up in a
+mini-transaction of its own.
+@param index	secondary index
+@param rec	secondary index record
+@param trx	transaction
+@return error code
+@retval DB_SUCCESS			on success
+@retval DB_FOREIGN_DUPLICATE_KEY	if a history row was inserted by trx
+@retval DB_TABLE_CORRUPT		if no clustered index record was found */
+static dberr_t vers_row_same_trx(dict_index_t* index, const rec_t* rec,
+                                 const trx_t& trx)
+{
+  mtr_t mtr;
+  dict_index_t *clust_index= dict_table_get_first_index(index->table);
+  ut_ad(index != clust_index);
+
+  mtr.start();
+
+  const rec_t *clust_rec=
+    row_get_clust_rec(BTR_SEARCH_LEAF, rec, index, &clust_index, &mtr);
+
+  if (!clust_rec)
+  {
+    ib::error() << "foreign constraints: secondary index " << index->name <<
+      " of table " << index->table->name << " is out of sync";
+    ut_ad("secondary index is out of sync" == 0);
+    mtr.commit();
+    return DB_TABLE_CORRUPT;
+  }
+
+  rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
+  rec_offs_init(offsets_);
+  mem_heap_t *heap= NULL;
+  rec_offs *clust_offs=
+    rec_get_offsets(clust_rec, clust_index, offsets_,
+                    clust_index->n_core_fields, ULINT_UNDEFINED, &heap);
+
+  dberr_t ret= DB_SUCCESS;
+
+  if (clust_index->vers_history_row(clust_rec, clust_offs))
+  {
+    /* The field at position n_uniq is read as the DB_TRX_ID. */
+    ulint trx_id_len;
+    const byte *trx_id= rec_get_nth_field(clust_rec, clust_offs,
+                                          clust_index->n_uniq, &trx_id_len);
+    ut_ad(trx_id_len == DATA_TRX_ID_LEN);
+
+    if (trx_read_trx_id(trx_id) == trx.id)
+      ret= DB_FOREIGN_DUPLICATE_KEY;
+  }
+
+  if (UNIV_LIKELY_NULL(heap))
+    mem_heap_free(heap);
+
+  mtr.commit();
+  return ret;
+}
+
+/***************************************************************//**
+Scans a unique non-clustered index at a given index entry to determine
+whether a uniqueness violation has occurred for the key value of the entry.
+Set shared locks on possible duplicate records.
+
+Exclusive locks are requested instead of shared ones when trx->duplicates
+is set, and no locks are requested when flags contains
+BTR_NO_LOCKING_FLAG. If index->nulls_equal is not set and any of the first
+n_unique fields of entry is SQL NULL, DB_SUCCESS is returned without
+scanning. The scan starts at the first record that is >= entry on the
+first n_unique fields and ends at the first record that compares greater.
+The n_fields_cmp of entry is temporarily set to n_unique and is restored
+before returning.
+@return DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT; on a system
+versioned table a nonzero result of vers_row_same_trx() replaces
+DB_DUPLICATE_KEY; other errors from btr_pcur_open() and from the lock
+requests are passed through */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+row_ins_scan_sec_index_for_duplicate(
+/*=================================*/
+ ulint flags, /*!< in: undo logging and locking flags */
+ dict_index_t* index, /*!< in: non-clustered unique index */
+ dtuple_t* entry, /*!< in: index entry */
+ que_thr_t* thr, /*!< in: query thread */
+ mtr_t* mtr, /*!< in/out: mini-transaction */
+ mem_heap_t* offsets_heap)
+ /*!< in/out: memory heap that can be emptied */
+{
+ ulint n_unique;
+ int cmp;
+ ulint n_fields_cmp;
+ btr_pcur_t pcur;
+ rec_offs offsets_[REC_OFFS_SEC_INDEX_SIZE];
+ rec_offs* offsets = offsets_;
+ DBUG_ENTER("row_ins_scan_sec_index_for_duplicate");
+
+ rec_offs_init(offsets_);
+
+ ut_ad(!index->lock.have_any());
+
+ n_unique = dict_index_get_n_unique(index);
+
+ /* If the secondary index is unique, but one of the fields in the
+ n_unique first fields is NULL, a unique key violation cannot occur,
+ since we define NULL != NULL in this case */
+
+ if (!index->nulls_equal) {
+ for (ulint i = 0; i < n_unique; i++) {
+ if (UNIV_SQL_NULL == dfield_get_len(
+ dtuple_get_nth_field(entry, i))) {
+
+ DBUG_RETURN(DB_SUCCESS);
+ }
+ }
+ }
+
+ /* Store old value on n_fields_cmp */
+
+ n_fields_cmp = dtuple_get_n_fields_cmp(entry);
+
+ dtuple_set_n_fields_cmp(entry, n_unique); /* compare only the unique fields during the scan */
+ pcur.btr_cur.page_cur.index = index;
+ trx_t* const trx = thr_get_trx(thr);
+ dberr_t err = btr_pcur_open(entry, PAGE_CUR_GE, BTR_SEARCH_LEAF,
+ &pcur, mtr);
+ if (err != DB_SUCCESS) {
+ goto end_scan;
+ }
+
+ /* Scan index records and check if there is a duplicate.
+ NOTE: every "continue" in this do-while loop jumps to the loop
+ condition, i.e. it advances the cursor with btr_pcur_move_to_next(). */
+
+ do {
+ const rec_t* rec = btr_pcur_get_rec(&pcur);
+ const buf_block_t* block = btr_pcur_get_block(&pcur);
+ const ulint lock_type = LOCK_ORDINARY;
+
+ if (page_rec_is_infimum(rec)) {
+
+ continue;
+ }
+
+ offsets = rec_get_offsets(rec, index, offsets,
+ index->n_core_fields,
+ ULINT_UNDEFINED, &offsets_heap);
+
+ if (flags & BTR_NO_LOCKING_FLAG) {
+ /* Set no locks when applying log
+ in online table rebuild. */
+ } else if (trx->duplicates) {
+
+ /* If the SQL-query will update or replace
+ duplicate key we will take X-lock for
+ duplicates ( REPLACE, LOAD DATAFILE REPLACE,
+ INSERT ON DUPLICATE KEY UPDATE). */
+
+ err = row_ins_set_exclusive_rec_lock(
+ lock_type, block, rec, index, offsets, thr);
+ } else {
+
+ err = row_ins_set_shared_rec_lock(
+ lock_type, block, rec, index, offsets, thr);
+ }
+
+ switch (err) {
+ case DB_SUCCESS_LOCKED_REC:
+ err = DB_SUCCESS; /* fall through */
+ case DB_SUCCESS:
+ break;
+ default:
+ goto end_scan;
+ }
+
+ if (page_rec_is_supremum(rec)) {
+
+ continue; /* the supremum was locked above; it holds no key to compare */
+ }
+
+ cmp = cmp_dtuple_rec(entry, rec, index, offsets);
+
+ if (cmp == 0) {
+ if (row_ins_dupl_error_with_rec(rec, entry,
+ index, offsets)) {
+
+ err = DB_DUPLICATE_KEY;
+
+ trx->error_info = index;
+
+ if (!index->table->versioned()) {
+ } else if (dberr_t e =
+ vers_row_same_trx(index, rec,
+ *trx)) {
+ err = e; /* a nonzero result replaces DB_DUPLICATE_KEY */
+ goto end_scan;
+ }
+
+ /* If the duplicate is on hidden FTS_DOC_ID,
+ state so in the error log */
+ if (index == index->table->fts_doc_id_index
+ && DICT_TF2_FLAG_IS_SET(
+ index->table,
+ DICT_TF2_FTS_HAS_DOC_ID)) {
+
+ ib::error() << "Duplicate FTS_DOC_ID"
+ " value on table "
+ << index->table->name;
+ }
+
+ goto end_scan;
+ }
+ } else {
+ ut_a(cmp < 0); /* the scan started at PAGE_CUR_GE, so entry cannot be greater than rec */
+ goto end_scan;
+ }
+ } while (btr_pcur_move_to_next(&pcur, mtr));
+
+end_scan:
+ /* Restore old value */
+ dtuple_set_n_fields_cmp(entry, n_fields_cmp);
+
+ DBUG_RETURN(err);
+}
+
+/** Check one clustered index record for a duplicate of an entry while
+the table is being rebuilt online. The PRIMARY KEY fields and the two
+fields that follow them (DB_TRX_ID, DB_ROLL_PTR) are compared.
+@param n_uniq	index->db_trx_id()
+@param entry	entry being inserted
+@param rec	clustered index record at insert position
+@param index	clustered index
+@param offsets	rec_get_offsets(rec)
+@retval DB_SUCCESS when no duplicate is detected
+@retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
+a newer version of entry (the entry should not be inserted)
+@retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+row_ins_duplicate_online(ulint n_uniq, const dtuple_t *entry,
+                         const rec_t *rec, const dict_index_t *index,
+                         rec_offs *offsets)
+{
+  /* During rebuild, there should not be any delete-marked rows
+  in the new table. */
+  ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
+  ut_ad(dtuple_get_n_fields_cmp(entry) == n_uniq);
+  ut_ad(n_uniq == index->db_trx_id());
+
+  /* Compare the PRIMARY KEY fields and the DB_TRX_ID, DB_ROLL_PTR. */
+  ulint n_matched = 0;
+  cmp_dtuple_rec_with_match_low(entry, rec, index, offsets, n_uniq + 2,
+                                &n_matched);
+
+  if (n_matched < n_uniq) {
+    /* Not a duplicate. */
+    return(DB_SUCCESS);
+  }
+
+  if (n_matched != n_uniq + 2) {
+    /* Same key, but the system columns differ. */
+    return(DB_DUPLICATE_KEY);
+  }
+
+  ulint trx_id_len;
+  const byte* trx_id = rec_get_nth_field(rec, offsets, n_uniq,
+                                         &trx_id_len);
+
+  if (!memcmp(trx_id, reset_trx_id,
+              DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
+    /* The system columns of rec equal reset_trx_id. */
+    return(DB_DUPLICATE_KEY);
+  }
+
+  ut_ad(trx_id_len == DATA_TRX_ID_LEN);
+  /* rec is an exact match of entry, and DB_TRX_ID belongs
+  to a transaction that started after our ALTER TABLE. */
+  return(DB_SUCCESS_LOCKED_REC);
+}
+
+/** Check the records around the insert position for a duplicate while
+the table is being rebuilt online. The record at the cursor is checked if
+cursor->low_match >= n_uniq, and its successor on the page is checked if
+cursor->up_match >= n_uniq; the page infimum and supremum are skipped.
+@param n_uniq	offset of DB_TRX_ID
+@param entry	entry that is being inserted
+@param cursor	cursor on insert position
+@param offsets	in/out: rec_get_offsets(rec)
+@param heap	in/out: heap for offsets
+@retval DB_SUCCESS when no duplicate is detected
+@retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
+a newer version of entry (the entry should not be inserted)
+@retval DB_DUPLICATE_KEY when entry is a duplicate of rec
+@retval DB_CORRUPTION when the cursor record has no next record */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+row_ins_duplicate_error_in_clust_online(
+  ulint n_uniq,
+  const dtuple_t* entry,
+  const btr_cur_t*cursor,
+  rec_offs** offsets,
+  mem_heap_t** heap)
+{
+  const rec_t* cur_rec = btr_cur_get_rec(cursor);
+
+  ut_ad(!cursor->index()->is_instant());
+
+  if (cursor->low_match >= n_uniq && !page_rec_is_infimum(cur_rec)) {
+    *offsets = rec_get_offsets(cur_rec, cursor->index(), *offsets,
+                               cursor->index()->n_fields,
+                               ULINT_UNDEFINED, heap);
+    const dberr_t low_err = row_ins_duplicate_online(
+      n_uniq, entry, cur_rec, cursor->index(), *offsets);
+
+    if (low_err != DB_SUCCESS) {
+      return(low_err);
+    }
+  }
+
+  const rec_t* next_rec = page_rec_get_next_const(btr_cur_get_rec(cursor));
+
+  if (!next_rec) {
+    return DB_CORRUPTION;
+  }
+
+  if (cursor->up_match < n_uniq || page_rec_is_supremum(next_rec)) {
+    /* Nothing to compare on the upper side. */
+    return(DB_SUCCESS);
+  }
+
+  *offsets = rec_get_offsets(next_rec, cursor->index(), *offsets,
+                             cursor->index()->n_fields,
+                             ULINT_UNDEFINED, heap);
+
+  return(row_ins_duplicate_online(n_uniq, entry,
+                                  next_rec, cursor->index(), *offsets));
+}
+
+/***************************************************************//**
+Checks if a unique key violation error would occur at an index entry
+insert. Locks possible duplicate records: with an exclusive lock if
+trx->duplicates is set, otherwise with a shared lock. For the record
+at the cursor, no lock is taken if BTR_NO_LOCKING_FLAG is set in flags.
+Works only for a clustered index!
+@retval DB_SUCCESS if no error
+@retval DB_DUPLICATE_KEY if error,
+@retval DB_FOREIGN_DUPLICATE_KEY instead of DB_DUPLICATE_KEY when
+thr->prebuilt->upd_node->is_delete == VERSIONED_DELETE,
+entry->vers_history_row() holds, and the DB_TRX_ID of the conflicting
+record equals trx->id
+@retval DB_LOCK_WAIT if we have to wait for a lock on a possible duplicate
+record
+@retval DB_CORRUPTION if cursor->up_match >= n_unique but no duplicate
+was reported for the successor of the cursor record */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+row_ins_duplicate_error_in_clust(
+ ulint flags, /*!< in: undo logging and locking flags */
+ btr_cur_t* cursor, /*!< in: B-tree cursor */
+ const dtuple_t* entry, /*!< in: entry to insert */
+ que_thr_t* thr) /*!< in: query thread */
+{
+ dberr_t err;
+ rec_t* rec;
+ ulint n_unique;
+ trx_t* trx = thr_get_trx(thr);
+ mem_heap_t*heap = NULL;
+ rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
+ rec_offs* offsets = offsets_;
+ rec_offs_init(offsets_);
+
+ ut_ad(cursor->index()->is_clust());
+
+ /* NOTE: For unique non-clustered indexes there may be any number
+ of delete marked records with the same value for the non-clustered
+ index key (remember multiversioning), and which differ only in
+ the row reference part of the index record, containing the
+ clustered index key fields. For such a secondary index record,
+ to avoid race condition, we must FIRST do the insertion and after
+ that check that the uniqueness condition is not breached! */
+
+ /* NOTE: A problem is that in the B-tree node pointers on an
+ upper level may match more to the entry than the actual existing
+ user records on the leaf level. So, even if low_match would suggest
+ that a duplicate key violation may occur, this may not be the case. */
+
+ n_unique = dict_index_get_n_unique(cursor->index());
+
+ if (cursor->low_match >= n_unique) {
+
+ rec = btr_cur_get_rec(cursor);
+
+ if (!page_rec_is_infimum(rec)) {
+ offsets = rec_get_offsets(rec, cursor->index(),
+ offsets,
+ cursor->index()
+ ->n_core_fields,
+ ULINT_UNDEFINED, &heap);
+
+ /* We set a lock on the possible duplicate: this
+ is needed in logical logging of MySQL to make
+ sure that in roll-forward we get the same duplicate
+ errors as in original execution */
+
+ if (flags & BTR_NO_LOCKING_FLAG) {
+ /* Do nothing if no-locking is set */
+ err = DB_SUCCESS;
+ } else if (trx->duplicates) {
+
+ /* If the SQL-query will update or replace
+ duplicate key we will take X-lock for
+ duplicates ( REPLACE, LOAD DATAFILE REPLACE,
+ INSERT ON DUPLICATE KEY UPDATE). */
+
+ err = row_ins_set_exclusive_rec_lock(
+ LOCK_REC_NOT_GAP,
+ btr_cur_get_block(cursor),
+ rec, cursor->index(), offsets, thr);
+ } else {
+
+ err = row_ins_set_shared_rec_lock(
+ LOCK_REC_NOT_GAP,
+ btr_cur_get_block(cursor), rec,
+ cursor->index(), offsets, thr);
+ }
+
+ switch (err) {
+ case DB_SUCCESS_LOCKED_REC:
+ case DB_SUCCESS:
+ break;
+ default:
+ goto func_exit;
+ }
+
+ if (row_ins_dupl_error_with_rec(
+ rec, entry, cursor->index(), offsets)) {
+duplicate:
+ trx->error_info = cursor->index();
+ err = DB_DUPLICATE_KEY;
+ if (thr->prebuilt
+ && thr->prebuilt->upd_node
+ && thr->prebuilt->upd_node->is_delete
+ == VERSIONED_DELETE
+ && entry->vers_history_row())
+ {
+ ulint trx_id_len;
+ byte *trx_id = rec_get_nth_field(
+ rec, offsets, n_unique,
+ &trx_id_len);
+ ut_ad(trx_id_len == DATA_TRX_ID_LEN);
+ if (trx->id == trx_read_trx_id(trx_id)) {
+ err = DB_FOREIGN_DUPLICATE_KEY;
+ }
+ }
+ goto func_exit;
+ }
+ }
+ }
+
+ err = DB_SUCCESS;
+
+ if (cursor->up_match >= n_unique) {
+
+ rec = page_rec_get_next(btr_cur_get_rec(cursor));
+
+ if (rec && !page_rec_is_supremum(rec)) {
+ offsets = rec_get_offsets(rec, cursor->index(),
+ offsets,
+ cursor->index()
+ ->n_core_fields,
+ ULINT_UNDEFINED, &heap);
+
+ if (trx->duplicates) {
+
+ /* If the SQL-query will update or replace
+ duplicate key we will take X-lock for
+ duplicates ( REPLACE, LOAD DATAFILE REPLACE,
+ INSERT ON DUPLICATE KEY UPDATE). */
+
+ err = row_ins_set_exclusive_rec_lock(
+ LOCK_REC_NOT_GAP,
+ btr_cur_get_block(cursor),
+ rec, cursor->index(), offsets, thr);
+ } else {
+
+ err = row_ins_set_shared_rec_lock(
+ LOCK_REC_NOT_GAP,
+ btr_cur_get_block(cursor),
+ rec, cursor->index(), offsets, thr);
+ }
+
+ switch (err) {
+ default:
+ break;
+ case DB_SUCCESS_LOCKED_REC:
+ err = DB_SUCCESS;
+ /* fall through */
+ case DB_SUCCESS:
+ if (row_ins_dupl_error_with_rec(
+ rec, entry, cursor->index(),
+ offsets)) {
+ goto duplicate;
+ }
+ }
+ }
+
+ /* This should never happen.
+ NOTE(review): every path that reaches this point without
+ jumping to "duplicate" ends up with DB_CORRUPTION, including
+ a lock error returned by the calls above; confirm that
+ overwriting such an error is intended. */
+ err = DB_CORRUPTION;
+ }
+func_exit:
+ if (UNIV_LIKELY_NULL(heap)) {
+ mem_heap_free(heap);
+ }
+ return(err);
+}
+
+/** Decide whether an insert must be converted into a modification of
+the record that the cursor is positioned on. That is the case when the
+record is a user record (not the page infimum) and it shares a prefix of
+at least dict_index_get_n_unique_in_tree() fields with the entry:
+n_unique fields for a clustered index, all fields for a secondary index.
+InnoDB never updates secondary index records in place, other than
+clearing or setting the delete-mark flag. The non-unique fields of a
+unique secondary index record could be updated by checking
+cursor->up_match, but that is not done, because it could have some
+locking implications.
+@param[in]	cursor	B-tree cursor
+@return TRUE if the existing record should be updated; FALSE if not */
+UNIV_INLINE
+ibool
+row_ins_must_modify_rec(
+	const btr_cur_t*	cursor)
+{
+	/* NOTE: (compare to the note in row_ins_duplicate_error_in_clust)
+	Node pointers on upper levels of the B-tree may match more of
+	the entry than the actual user records on the leaf level do.
+	Therefore a sufficiently long match alone is not enough: the
+	candidate must also be an actual user record. */
+
+	if (cursor->low_match
+	    < dict_index_get_n_unique_in_tree(cursor->index())) {
+		return(FALSE);
+	}
+
+	return(!page_rec_is_infimum(btr_cur_get_rec(cursor)));
+}
+
+/** Insert the externally stored fields (off-page columns)
+of a clustered index entry.
+The cursor is repositioned on the entry in its own mini-transaction,
+which is committed on every path, also when the search fails.
+@param[in]	entry	index entry to insert
+@param[in]	big_rec	externally stored fields
+@param[in,out]	offsets	rec_get_offsets()
+@param[in,out]	heap	memory heap
+@param[in]	index	clustered index
+@param[in]	thd	client connection, or NULL
+@return error code
+@retval DB_SUCCESS
+@retval DB_OUT_OF_FILE_SPACE */
+static
+dberr_t
+row_ins_index_entry_big_rec(
+	const dtuple_t*		entry,
+	const big_rec_t*	big_rec,
+	rec_offs*		offsets,
+	mem_heap_t**		heap,
+	dict_index_t*		index,
+	const void*		thd __attribute__((unused)))
+{
+	mtr_t		mtr;
+	btr_pcur_t	pcur;
+
+	pcur.btr_cur.page_cur.index = index;
+	ut_ad(index->is_primary());
+
+	DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern_latch");
+
+	mtr.start();
+	if (index->table->is_temporary()) {
+		mtr.set_log_mode(MTR_LOG_NO_REDO);
+	} else {
+		index->set_modified(mtr);
+	}
+
+	dberr_t error = btr_pcur_open(entry, PAGE_CUR_LE, BTR_MODIFY_TREE,
+				      &pcur, &mtr);
+
+	if (error == DB_SUCCESS) {
+		rec_t*	rec = btr_pcur_get_rec(&pcur);
+		offsets = rec_get_offsets(rec, index, offsets,
+					  index->n_core_fields,
+					  ULINT_UNDEFINED, heap);
+
+		DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern");
+		error = btr_store_big_rec_extern_fields(
+			&pcur, offsets, big_rec, &mtr, BTR_STORE_INSERT);
+		DEBUG_SYNC_C_IF_THD(thd, "after_row_ins_extern");
+	}
+
+	/* Commit also after a failed search, like
+	row_ins_clust_index_entry_low() does, so that the started
+	mini-transaction is never abandoned together with whatever
+	was registered in it during the descent. */
+	mtr.commit();
+
+	ut_free(pcur.old_rec_buf);
+	return(error);
+}
+
+#ifdef HAVE_REPLICATION /* Working around MDEV-24622 */
+extern "C" int thd_is_slave(const MYSQL_THD thd);
+#else
+# define thd_is_slave(thd) 0 /* no replication: the check is always false */
+#endif
+
+#if defined __aarch64__&&defined __GNUC__&&__GNUC__==4&&!defined __clang__
+/* Avoid GCC 4.8.5 internal compiler error due to srw_mutex::wr_unlock().
+We would only need this for row_ins_clust_index_entry_low(),
+but GCC 4.8.5 does not support pop_options. As a consequence, the
+pragma below stays in effect for the rest of this file. */
+# pragma GCC optimize ("O0")
+#endif
+
+/***************************************************************//**
+Tries to insert an entry into a clustered index, ignoring foreign key
+constraints. If a record with the same unique key is found, the other
+record is necessarily marked deleted by a committed transaction, or a
+unique key violation error occurs. The delete marked record is then
+updated to an existing record, and we must write an undo log record on
+the delete marked record.
+If the conditions for a bulk insert into an empty root page are met,
+this function only sets up the bulk insert (trx->bulk_insert, an
+exclusive table lock for a non-temporary table, table->bulk_trx_id and
+the TRX_UNDO_EMPTY undo log record) and returns without inserting the
+entry into the index page.
+If the entry has off-page columns (big_rec), they are written in a
+separate mini-transaction after the record itself has been inserted.
+@retval DB_SUCCESS on success
+@retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
+@retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
+@return error code */
+dberr_t
+row_ins_clust_index_entry_low(
+/*==========================*/
+ ulint flags, /*!< in: undo logging and locking flags */
+ btr_latch_mode mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
+ depending on whether we wish optimistic or
+ pessimistic descent down the index tree */
+ dict_index_t* index, /*!< in: clustered index */
+ ulint n_uniq, /*!< in: 0 or index->n_uniq */
+ dtuple_t* entry, /*!< in/out: index entry to insert */
+ ulint n_ext, /*!< in: number of externally stored columns */
+ que_thr_t* thr) /*!< in: query thread */
+{
+ btr_pcur_t pcur;
+ dberr_t err = DB_SUCCESS;
+ big_rec_t* big_rec = NULL;
+ mtr_t mtr;
+ uint64_t auto_inc = 0;
+ mem_heap_t* offsets_heap = NULL;
+ rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
+ rec_offs* offsets = offsets_;
+ rec_offs_init(offsets_);
+ trx_t* trx = thr_get_trx(thr);
+ buf_block_t* block;
+
+ DBUG_ENTER("row_ins_clust_index_entry_low");
+
+ ut_ad(dict_index_is_clust(index));
+ ut_ad(!dict_index_is_unique(index)
+ || n_uniq == dict_index_get_n_unique(index));
+ ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));
+ ut_ad(!trx->in_rollback);
+
+ mtr.start();
+
+ if (index->table->is_temporary()) {
+ /* Disable REDO logging as the lifetime of temp-tables is
+ limited to server or connection lifetime and so REDO
+ information is not needed on restart for recovery.
+ Disable locking as temp-tables are local to a connection. */
+
+ ut_ad(flags & BTR_NO_LOCKING_FLAG);
+ ut_ad(!dict_index_is_online_ddl(index));
+ ut_ad(!index->table->persistent_autoinc);
+ ut_ad(!index->is_instant());
+ mtr.set_log_mode(MTR_LOG_NO_REDO);
+ } else {
+ index->set_modified(mtr);
+
+ if (UNIV_UNLIKELY(entry->is_metadata())) {
+ ut_ad(index->is_instant());
+ ut_ad(!dict_index_is_online_ddl(index));
+ ut_ad(mode == BTR_MODIFY_TREE);
+ } else {
+ if (mode == BTR_MODIFY_LEAF
+ && dict_index_is_online_ddl(index)) {
+ mode = BTR_MODIFY_LEAF_ALREADY_LATCHED;
+ mtr_s_lock_index(index, &mtr);
+ }
+
+ if (unsigned ai = index->table->persistent_autoinc) {
+ /* Prepare to persist the AUTO_INCREMENT value
+ from the index entry to PAGE_ROOT_AUTO_INC.
+ persistent_autoinc is 1-based here: field
+ ai - 1 of the entry holds the value. */
+ const dfield_t* dfield = dtuple_get_nth_field(
+ entry, ai - 1);
+ if (!dfield_is_null(dfield)) {
+ auto_inc = row_parse_int(
+ static_cast<const byte*>(
+ dfield->data),
+ dfield->len,
+ dfield->type.mtype,
+ dfield->type.prtype
+ & DATA_UNSIGNED);
+ if (auto_inc
+ && mode != BTR_MODIFY_TREE) {
+ mode = btr_latch_mode(
+ BTR_MODIFY_ROOT_AND_LEAF
+ ^ BTR_MODIFY_LEAF
+ ^ mode);
+ }
+ }
+ }
+ }
+ }
+
+ /* Note that we use PAGE_CUR_LE as the search mode, because then
+ the function will return in both low_match and up_match of the
+ cursor sensible values */
+ pcur.btr_cur.page_cur.index = index;
+ err = btr_pcur_open(entry, PAGE_CUR_LE, mode, &pcur, &mtr);
+ if (err != DB_SUCCESS) {
+ index->table->file_unreadable = true;
+err_exit:
+ mtr.commit();
+ goto func_exit;
+ }
+
+ if (auto_inc) {
+ buf_block_t* root
+ = mtr.at_savepoint(mode != BTR_MODIFY_ROOT_AND_LEAF);
+ ut_ad(index->page == root->page.id().page_no());
+ page_set_autoinc(root, auto_inc, &mtr, false);
+ }
+
+ btr_pcur_get_btr_cur(&pcur)->thr = thr;
+
+#ifdef UNIV_DEBUG
+ {
+ page_t* page = btr_pcur_get_page(&pcur);
+ rec_t* first_rec = page_rec_get_next(
+ page_get_infimum_rec(page));
+
+ ut_ad(page_rec_is_supremum(first_rec)
+ || rec_n_fields_is_sane(index, first_rec, entry));
+ }
+#endif /* UNIV_DEBUG */
+
+ block = btr_pcur_get_block(&pcur);
+
+ DBUG_EXECUTE_IF("row_ins_row_level", goto skip_bulk_insert;);
+
+ if (!(flags & BTR_NO_UNDO_LOG_FLAG)
+ && page_is_empty(block->page.frame)
+ && !entry->is_metadata() && !trx->duplicates
+ && !trx->check_unique_secondary && !trx->check_foreigns
+ && !trx->dict_operation
+ && block->page.id().page_no() == index->page
+ && !index->table->skip_alter_undo
+ && !index->table->n_rec_locks
+ && !index->table->is_active_ddl()
+ && !index->table->has_spatial_index()
+ && !index->table->versioned()
+ && !thd_is_slave(trx->mysql_thd) /* FIXME: MDEV-24622 */) {
+ DEBUG_SYNC_C("empty_root_page_insert");
+
+ trx->bulk_insert = true;
+
+ if (!index->table->is_temporary()) {
+ err = lock_table(index->table, NULL, LOCK_X, thr);
+
+ if (err != DB_SUCCESS) {
+ trx->error_state = err;
+ trx->bulk_insert = false;
+ goto err_exit;
+ }
+
+ if (index->table->n_rec_locks) {
+avoid_bulk:
+ trx->bulk_insert = false;
+ goto skip_bulk_insert;
+ }
+
+#ifdef WITH_WSREP
+ if (trx->is_wsrep())
+ {
+ if (!wsrep_thd_is_local_transaction(trx->mysql_thd))
+ goto skip_bulk_insert; /* NOTE(review): unlike avoid_bulk, this
+ leaves trx->bulk_insert set; confirm that this is intended */
+ if (wsrep_append_table_key(trx->mysql_thd, *index->table))
+ {
+ trx->error_state = DB_ROLLBACK;
+ goto err_exit; /* NOTE(review): err is still DB_SUCCESS and
+ trx->bulk_insert is still set here; only trx->error_state
+ carries DB_ROLLBACK; confirm that this is intended */
+ }
+ }
+#endif /* WITH_WSREP */
+
+#ifdef BTR_CUR_HASH_ADAPT
+ if (btr_search_enabled) {
+ btr_search_x_lock_all();
+ index->table->bulk_trx_id = trx->id;
+ btr_search_x_unlock_all();
+ } else {
+ index->table->bulk_trx_id = trx->id;
+ }
+#else /* BTR_CUR_HASH_ADAPT */
+ index->table->bulk_trx_id = trx->id;
+#endif /* BTR_CUR_HASH_ADAPT */
+
+ /* Write TRX_UNDO_EMPTY undo log and
+ start buffering the insert operation */
+ err = trx_undo_report_row_operation(
+ thr, index, entry,
+ nullptr, 0, nullptr, nullptr,
+ nullptr);
+
+ if (err != DB_SUCCESS) {
+ goto avoid_bulk;
+ }
+
+ goto err_exit; /* despite the label name, err is DB_SUCCESS
+ here: the bulk insert was set up, and all that remains
+ is to commit the mini-transaction and return */
+ }
+ }
+
+skip_bulk_insert:
+ if (UNIV_UNLIKELY(entry->info_bits != 0)) {
+ ut_ad(entry->is_metadata());
+ ut_ad(flags == BTR_NO_LOCKING_FLAG);
+ ut_ad(index->is_instant());
+ ut_ad(!dict_index_is_online_ddl(index));
+
+ const rec_t* rec = btr_pcur_get_rec(&pcur);
+
+ if (rec_get_info_bits(rec, page_rec_is_comp(rec))
+ & REC_INFO_MIN_REC_FLAG) {
+ trx->error_info = index;
+ err = DB_DUPLICATE_KEY;
+ goto err_exit;
+ }
+
+ ut_ad(!row_ins_must_modify_rec(&pcur.btr_cur));
+ goto do_insert;
+ }
+
+ if (rec_is_metadata(btr_pcur_get_rec(&pcur), *index)) {
+ goto do_insert;
+ }
+
+ if (n_uniq
+ && (pcur.btr_cur.up_match >= n_uniq
+ || pcur.btr_cur.low_match >= n_uniq)) {
+
+ if (flags
+ == (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
+ | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG)) {
+ /* Set no locks when applying log
+ in online table rebuild. Only check for duplicates. */
+ err = row_ins_duplicate_error_in_clust_online(
+ n_uniq, entry, &pcur.btr_cur,
+ &offsets, &offsets_heap);
+
+ switch (err) {
+ case DB_SUCCESS:
+ break;
+ default:
+ ut_ad(0);
+ /* fall through */
+ case DB_SUCCESS_LOCKED_REC:
+ case DB_DUPLICATE_KEY:
+ trx->error_info = index;
+ }
+ } else {
+ /* Note that the following may return also
+ DB_LOCK_WAIT */
+
+ err = row_ins_duplicate_error_in_clust(
+ flags, &pcur.btr_cur, entry, thr);
+ }
+
+ if (err != DB_SUCCESS) {
+ goto err_exit;
+ }
+ }
+
+ /* Note: Allowing duplicates would qualify for modification of
+ an existing record as the new entry is exactly same as old entry. */
+ if (row_ins_must_modify_rec(&pcur.btr_cur)) {
+ /* There is already an index entry with a long enough common
+ prefix, we must convert the insert into a modify of an
+ existing record */
+ mem_heap_t* entry_heap = mem_heap_create(1024);
+
+ err = row_ins_clust_index_entry_by_modify(
+ &pcur, flags, mode, &offsets, &offsets_heap,
+ entry_heap, entry, thr, &mtr);
+
+ mtr_commit(&mtr);
+ mem_heap_free(entry_heap);
+ } else {
+ if (index->is_instant()) entry->trim(*index);
+do_insert:
+ rec_t* insert_rec;
+
+ if (mode != BTR_MODIFY_TREE) {
+ ut_ad(mode == BTR_MODIFY_LEAF
+ || mode == BTR_MODIFY_LEAF_ALREADY_LATCHED
+ || mode == BTR_MODIFY_ROOT_AND_LEAF
+ || mode
+ == BTR_MODIFY_ROOT_AND_LEAF_ALREADY_LATCHED);
+ err = btr_cur_optimistic_insert(
+ flags, &pcur.btr_cur, &offsets, &offsets_heap,
+ entry, &insert_rec, &big_rec,
+ n_ext, thr, &mtr);
+ } else {
+ if (buf_pool.running_out()) {
+ err = DB_LOCK_TABLE_FULL;
+ goto err_exit;
+ }
+
+ err = btr_cur_optimistic_insert(
+ flags, &pcur.btr_cur,
+ &offsets, &offsets_heap,
+ entry, &insert_rec, &big_rec,
+ n_ext, thr, &mtr);
+
+ if (err == DB_FAIL) {
+ err = btr_cur_pessimistic_insert(
+ flags, &pcur.btr_cur,
+ &offsets, &offsets_heap,
+ entry, &insert_rec, &big_rec,
+ n_ext, thr, &mtr);
+ }
+ }
+
+ mtr.commit();
+
+ if (big_rec) {
+ /* Online table rebuild could read (and
+ ignore) the incomplete record at this point.
+ If online rebuild is in progress, the
+ row_ins_index_entry_big_rec() will write log. */
+
+ DBUG_EXECUTE_IF(
+ "row_ins_extern_checkpoint",
+ log_write_up_to(mtr.commit_lsn(), true););
+ err = row_ins_index_entry_big_rec(
+ entry, big_rec, offsets, &offsets_heap, index,
+ trx->mysql_thd);
+ dtuple_convert_back_big_rec(index, entry, big_rec);
+ }
+ }
+
+func_exit:
+ if (offsets_heap != NULL) {
+ mem_heap_free(offsets_heap);
+ }
+
+ ut_free(pcur.old_rec_buf);
+ DBUG_RETURN(err);
+}
+
+/** Start a mini-transaction for modifying a secondary index,
+carrying over the log mode that the mini-transaction had before.
+@param[in,out]	mtr	mini-transaction to (re)start
+@param[in,out]	index	secondary index that will be modified */
+static void row_ins_sec_mtr_start(mtr_t *mtr, dict_index_t *index)
+{
+	ut_ad(!dict_index_is_clust(index));
+	ut_ad(mtr->is_named_space(index->table->space));
+
+	/* Remember the log mode before start(), and reapply it last. */
+	const mtr_log_t	saved_log_mode = mtr->get_log_mode();
+
+	mtr->start();
+	index->set_modified(*mtr);
+	mtr->set_log_mode(saved_log_mode);
+}
+
+/***************************************************************//**
+Tries to insert an entry into a secondary index. If a record with exactly the
+same fields is found, the other record is necessarily marked deleted.
+It is then unmarked. Otherwise, the entry is just inserted to the index.
+For a unique index with a possible match, the mini-transaction is
+committed, the index is scanned for duplicates in a new mini-transaction,
+and the cursor is repositioned before the insert.
+A duplicate found in an index that is not yet committed is not reported
+to the caller: the index is flagged DICT_CORRUPT and DB_SUCCESS is returned.
+@retval DB_SUCCESS on success
+@retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
+@retval DB_FAIL if retry with BTR_INSERT_TREE is needed
+@return error code */
+dberr_t
+row_ins_sec_index_entry_low(
+/*========================*/
+ ulint flags, /*!< in: undo logging and locking flags */
+ btr_latch_mode mode, /*!< in: BTR_MODIFY_LEAF or BTR_INSERT_TREE,
+ depending on whether we wish optimistic or
+ pessimistic descent down the index tree */
+ dict_index_t* index, /*!< in: secondary index */
+ mem_heap_t* offsets_heap,
+ /*!< in/out: memory heap that can be emptied */
+ mem_heap_t* heap, /*!< in/out: memory heap */
+ dtuple_t* entry, /*!< in/out: index entry to insert */
+ trx_id_t trx_id, /*!< in: PAGE_MAX_TRX_ID during
+ row_log_table_apply(), or 0 */
+ que_thr_t* thr) /*!< in: query thread */
+{
+ DBUG_ENTER("row_ins_sec_index_entry_low");
+
+ btr_cur_t cursor;
+ btr_latch_mode search_mode = mode;
+ dberr_t err;
+ ulint n_unique;
+ mtr_t mtr;
+ rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
+ rec_offs* offsets = offsets_;
+ rec_offs_init(offsets_);
+ rtr_info_t rtr_info; /* only initialized and cleaned up
+ for a spatial index */
+
+ ut_ad(!dict_index_is_clust(index));
+ ut_ad(mode == BTR_MODIFY_LEAF || mode == BTR_INSERT_TREE);
+
+ cursor.thr = thr;
+ cursor.rtr_info = NULL;
+ cursor.page_cur.index = index;
+ ut_ad(thr_get_trx(thr)->id != 0);
+
+ mtr.start();
+
+ if (index->table->is_temporary()) {
+ /* Disable locking, because temporary tables are never
+ shared between transactions or connections. */
+ ut_ad(flags & BTR_NO_LOCKING_FLAG);
+ mtr.set_log_mode(MTR_LOG_NO_REDO);
+ } else {
+ index->set_modified(mtr);
+ }
+
+ /* Note that we use PAGE_CUR_LE as the search mode, because then
+ the function will return in both low_match and up_match of the
+ cursor sensible values */
+
+ if (index->is_spatial()) {
+ rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
+ rtr_info_update_btr(&cursor, &rtr_info);
+
+ err = rtr_insert_leaf(&cursor, entry, search_mode, &mtr);
+
+ if (err == DB_SUCCESS && search_mode == BTR_MODIFY_LEAF
+ && rtr_info.mbr_adj) { /* the search is repeated in
+ BTR_MODIFY_TREE mode, in a fresh mini-transaction */
+ mtr_commit(&mtr);
+ search_mode = mode = BTR_MODIFY_TREE;
+ rtr_clean_rtr_info(&rtr_info, true);
+ rtr_init_rtr_info(&rtr_info, false, &cursor,
+ index, false);
+ rtr_info_update_btr(&cursor, &rtr_info);
+ mtr.start();
+ if (index->table->is_temporary()) {
+ mtr.set_log_mode(MTR_LOG_NO_REDO);
+ } else {
+ index->set_modified(mtr);
+ }
+ err = rtr_insert_leaf(&cursor, entry,
+ search_mode, &mtr);
+ }
+
+ DBUG_EXECUTE_IF(
+ "rtree_test_check_count", {
+ goto func_exit;});
+
+ } else {
+ if (!index->table->is_temporary()) {
+ search_mode = btr_latch_mode(
+ search_mode
+ | (thr_get_trx(thr)->check_unique_secondary
+ ? BTR_INSERT
+ : BTR_INSERT | BTR_IGNORE_SEC_UNIQUE));
+ }
+
+ err = cursor.search_leaf(entry, PAGE_CUR_LE, search_mode,
+ &mtr);
+ }
+
+ if (err != DB_SUCCESS) {
+ if (err == DB_DECRYPTION_FAILED) {
+ btr_decryption_failed(*index);
+ }
+ goto func_exit;
+ }
+
+ if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
+ ut_ad(!dict_index_is_spatial(index));
+ /* The insert was buffered during the search: we are done */
+ goto func_exit;
+ }
+
+#ifdef UNIV_DEBUG
+ {
+ page_t* page = btr_cur_get_page(&cursor);
+ rec_t* first_rec = page_rec_get_next(
+ page_get_infimum_rec(page));
+
+ ut_ad(page_rec_is_supremum(first_rec)
+ || rec_n_fields_is_sane(index, first_rec, entry));
+ }
+#endif /* UNIV_DEBUG */
+
+ n_unique = dict_index_get_n_unique(index);
+
+ if (dict_index_is_unique(index)
+ && (cursor.low_match >= n_unique || cursor.up_match >= n_unique)) {
+ mtr_commit(&mtr);
+
+ DEBUG_SYNC_C("row_ins_sec_index_unique");
+
+ row_ins_sec_mtr_start(&mtr, index);
+
+ err = row_ins_scan_sec_index_for_duplicate(
+ flags, index, entry, thr, &mtr, offsets_heap);
+
+ mtr_commit(&mtr);
+
+ switch (err) {
+ case DB_SUCCESS:
+ break;
+ case DB_DUPLICATE_KEY:
+ if (!index->is_committed()) {
+ ut_ad(!thr_get_trx(thr)
+ ->dict_operation_lock_mode);
+ index->type |= DICT_CORRUPT;
+ /* Do not return any error to the
+ caller. The duplicate will be reported
+ by ALTER TABLE or CREATE UNIQUE INDEX.
+ Unfortunately we cannot report the
+ duplicate key value to the DDL thread,
+ because the altered_table object is
+ private to its call stack. */
+ err = DB_SUCCESS;
+ }
+ /* fall through */
+ default:
+ if (dict_index_is_spatial(index)) {
+ rtr_clean_rtr_info(&rtr_info, true);
+ }
+ DBUG_RETURN(err); /* the mini-transaction was already
+ committed above, so func_exit is bypassed */
+ }
+
+ row_ins_sec_mtr_start(&mtr, index);
+
+ DEBUG_SYNC_C("row_ins_sec_index_entry_dup_locks_created");
+
+ /* We did not find a duplicate and we have now
+ locked with s-locks the necessary records to
+ prevent any insertion of a duplicate by another
+ transaction. Let us now reposition the cursor and
+ continue the insertion (bypassing the change buffer). */
+ err = cursor.search_leaf(
+ entry, PAGE_CUR_LE,
+ btr_latch_mode(search_mode
+ & ~(BTR_INSERT
+ | BTR_IGNORE_SEC_UNIQUE)),
+ &mtr);
+ if (err != DB_SUCCESS) {
+ goto func_exit;
+ }
+ }
+
+ if (row_ins_must_modify_rec(&cursor)) {
+ /* There is already an index entry with a long enough common
+ prefix, we must convert the insert into a modify of an
+ existing record */
+ offsets = rec_get_offsets(
+ btr_cur_get_rec(&cursor), index, offsets,
+ index->n_core_fields,
+ ULINT_UNDEFINED, &offsets_heap);
+
+ err = row_ins_sec_index_entry_by_modify(
+ flags, mode, &cursor, &offsets,
+ offsets_heap, heap, entry, thr, &mtr);
+
+ if (err == DB_SUCCESS && dict_index_is_spatial(index)
+ && rtr_info.mbr_adj) {
+ err = rtr_ins_enlarge_mbr(&cursor, &mtr);
+ }
+ } else {
+ rec_t* insert_rec;
+ big_rec_t* big_rec;
+
+ if (mode == BTR_MODIFY_LEAF) {
+ err = btr_cur_optimistic_insert(
+ flags, &cursor, &offsets, &offsets_heap,
+ entry, &insert_rec,
+ &big_rec, 0, thr, &mtr);
+ if (err == DB_SUCCESS
+ && dict_index_is_spatial(index)
+ && rtr_info.mbr_adj) {
+ err = rtr_ins_enlarge_mbr(&cursor, &mtr);
+ }
+ } else {
+ if (buf_pool.running_out()) {
+ err = DB_LOCK_TABLE_FULL;
+ goto func_exit;
+ }
+
+ err = btr_cur_optimistic_insert(
+ flags, &cursor,
+ &offsets, &offsets_heap,
+ entry, &insert_rec,
+ &big_rec, 0, thr, &mtr);
+ if (err == DB_FAIL) {
+ err = btr_cur_pessimistic_insert(
+ flags, &cursor,
+ &offsets, &offsets_heap,
+ entry, &insert_rec,
+ &big_rec, 0, thr, &mtr);
+ }
+ if (err == DB_SUCCESS
+ && dict_index_is_spatial(index)
+ && rtr_info.mbr_adj) {
+ err = rtr_ins_enlarge_mbr(&cursor, &mtr);
+ }
+ }
+
+ if (err == DB_SUCCESS && trx_id) {
+ page_update_max_trx_id(
+ btr_cur_get_block(&cursor),
+ btr_cur_get_page_zip(&cursor),
+ trx_id, &mtr);
+ }
+
+ ut_ad(!big_rec);
+ }
+
+func_exit:
+ if (dict_index_is_spatial(index)) {
+ rtr_clean_rtr_info(&rtr_info, true);
+ }
+
+ mtr_commit(&mtr);
+ DBUG_RETURN(err);
+}
+
+/** Insert an entry into a clustered index.
+Foreign key constraints are checked first if the table has any.
+The insert is then attempted with an optimistic descent
+(BTR_MODIFY_LEAF) and, if that returns DB_FAIL, with a pessimistic
+descent (BTR_MODIFY_TREE). If the entry matches enough to a delete
+marked record, the insert is performed by updating or delete unmarking
+the delete marked record. entry->n_fields is restored to its original
+value after each attempt.
+@param[in,out]	index	clustered index
+@param[in,out]	entry	index entry to insert
+@param[in]	thr	query thread
+@param[in]	n_ext	number of externally stored columns
+@return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
+dberr_t
+row_ins_clust_index_entry(
+	dict_index_t*	index,
+	dtuple_t*	entry,
+	que_thr_t*	thr,
+	ulint		n_ext)
+{
+	DBUG_ENTER("row_ins_clust_index_entry");
+
+	if (!index->table->foreign_set.empty()) {
+		const dberr_t	fk_err = row_ins_check_foreign_constraints(
+			index->table, index, true, entry, thr);
+
+		if (fk_err != DB_SUCCESS) {
+			DBUG_RETURN(fk_err);
+		}
+	}
+
+	const ulint	n_uniq = dict_index_is_unique(index)
+		? index->n_uniq : 0;
+
+#ifdef WITH_WSREP
+	const bool	skip_locking
+		= wsrep_thd_skip_locking(thr_get_trx(thr)->mysql_thd);
+#else
+	const bool	skip_locking = false;
+#endif /* WITH_WSREP */
+
+	ulint		flags = 0;
+
+	if (index->table->no_rollback()) {
+		flags = BTR_NO_ROLLBACK;
+	} else if (index->table->is_temporary() || skip_locking) {
+		flags = BTR_NO_LOCKING_FLAG;
+	}
+
+#if defined WITH_WSREP && defined UNIV_DEBUG
+	if (skip_locking && strcmp(wsrep_get_sr_table_name(),
+				   index->table->name.m_name)) {
+		WSREP_ERROR("Record locking is disabled in this thread, "
+			    "but the table being modified is not "
+			    "`%s`: `%s`.", wsrep_get_sr_table_name(),
+			    index->table->name.m_name);
+		ut_error;
+	}
+#endif /* WITH_WSREP && UNIV_DEBUG */
+
+	const ulint	orig_n_fields = entry->n_fields;
+
+	/* For intermediate table during copy alter table,
+	skip the undo log and record lock checking for
+	insertion operation. */
+	if (index->table->skip_alter_undo) {
+		flags |= BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG;
+	}
+
+	/* First attempt: optimistic descent to the B-tree */
+	log_free_check();
+
+	dberr_t	err = row_ins_clust_index_entry_low(
+		flags, BTR_MODIFY_LEAF, index, n_uniq, entry,
+		n_ext, thr);
+
+	entry->n_fields = orig_n_fields;
+
+	DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
+			    "after_row_ins_clust_index_entry_leaf");
+
+	if (err == DB_FAIL) {
+		/* Second attempt: pessimistic descent to the B-tree */
+		log_free_check();
+
+		err = row_ins_clust_index_entry_low(
+			flags, BTR_MODIFY_TREE, index, n_uniq, entry,
+			n_ext, thr);
+
+		entry->n_fields = orig_n_fields;
+	} else {
+		DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
+	}
+
+	DBUG_RETURN(err);
+}
+
+/** Insert an entry into a secondary index.
+Foreign key constraints are checked first if requested and if the
+table has any. The insert is then attempted with an optimistic descent
+(BTR_MODIFY_LEAF) and, if that returns DB_FAIL, with a pessimistic
+descent (BTR_INSERT_TREE). If the entry matches enough to a delete
+marked record, the insert is performed by updating or delete unmarking
+the delete marked record.
+@param[in,out]	index		secondary index
+@param[in,out]	entry		index entry to insert
+@param[in]	thr		query thread
+@param[in]	check_foreign	true if check foreign table is needed,
+				false otherwise
+@return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
+dberr_t
+row_ins_sec_index_entry(
+	dict_index_t*	index,
+	dtuple_t*	entry,
+	que_thr_t*	thr,
+	bool		check_foreign)
+{
+	DBUG_EXECUTE_IF("row_ins_sec_index_entry_timeout", {
+			DBUG_SET("-d,row_ins_sec_index_entry_timeout");
+			return(DB_LOCK_WAIT);});
+
+	if (check_foreign && !index->table->foreign_set.empty()) {
+		const dberr_t	fk_err = row_ins_check_foreign_constraints(
+			index->table, index, false, entry, thr);
+
+		if (fk_err != DB_SUCCESS) {
+			return(fk_err);
+		}
+	}
+
+	ut_ad(thr_get_trx(thr)->id != 0);
+
+	mem_heap_t* const	offsets_heap = mem_heap_create(1024);
+	mem_heap_t* const	heap = mem_heap_create(1024);
+
+	log_free_check();
+
+	ulint		flags = 0;
+	trx_id_t	trx_id = 0;
+
+	if (index->table->is_temporary()) {
+		flags = BTR_NO_LOCKING_FLAG;
+	}
+
+	/* For intermediate table during copy alter table,
+	skip the undo log and record lock checking for
+	insertion operation. */
+	if (index->table->skip_alter_undo) {
+		trx_id = thr_get_trx(thr)->id;
+		flags |= BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG;
+	}
+
+	/* First attempt: optimistic descent to the B-tree */
+	dberr_t	err = row_ins_sec_index_entry_low(
+		flags, BTR_MODIFY_LEAF, index, offsets_heap, heap, entry,
+		trx_id, thr);
+
+	if (err == DB_FAIL) {
+		mem_heap_empty(heap);
+
+		const bool	in_sys_space
+			= index->table->space == fil_system.sys_space;
+
+		if (in_sys_space
+		    && !(index->type & (DICT_UNIQUE | DICT_SPATIAL))) {
+			ibuf_free_excess_pages();
+		}
+
+		/* Second attempt: pessimistic descent to the B-tree.
+		NOTE(review): 0 is passed here instead of trx_id;
+		confirm that this is intended for skip_alter_undo. */
+		log_free_check();
+
+		err = row_ins_sec_index_entry_low(
+			flags, BTR_INSERT_TREE, index,
+			offsets_heap, heap, entry, 0, thr);
+	}
+
+	mem_heap_free(heap);
+	mem_heap_free(offsets_heap);
+	return(err);
+}
+
+/** Insert an index entry into an index.
+If the index is a B-tree and the transaction has a bulk insert buffer
+for the table, the entry is handed over to that buffer. Otherwise the
+entry is inserted via row_ins_clust_index_entry() or
+row_ins_sec_index_entry(), depending on whether the index is the
+primary one; those try first an optimistic, then a pessimistic descent
+down the tree.
+@param[in,out]	index	index
+@param[in,out]	entry	index entry to insert
+@param[in]	thr	query thread
+@return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
+static
+dberr_t
+row_ins_index_entry(
+	dict_index_t*	index,
+	dtuple_t*	entry,
+	que_thr_t*	thr)
+{
+	trx_t* const	trx = thr_get_trx(thr);
+
+	ut_ad(trx->id || index->table->no_rollback()
+	      || index->table->is_temporary());
+
+	DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
+			DBUG_SET("-d,row_ins_index_entry_timeout");
+			return(DB_LOCK_WAIT);});
+
+	if (index->is_btree()) {
+		auto bulk = trx->check_bulk_buffer(index->table);
+
+		if (bulk) {
+			/* MDEV-25036 FIXME: check also foreign key
+			constraints */
+			ut_ad(!trx->check_foreigns);
+			return bulk->bulk_insert_buffered(*entry, *index,
+							  trx);
+		}
+	}
+
+	return index->is_primary()
+		? row_ins_clust_index_entry(index, entry, thr, 0)
+		: row_ins_sec_index_entry(index, entry, thr);
+}
+
+
+/** Compute the MBR (minimum bounding rectangle) of a spatial object
+and store it in a spatial index field. The first GEO_DATA_HEADER_SIZE
+bytes of the row field are skipped; the rest is passed to
+rtree_mbr_from_wkb().
+@param[in,out]	field		mbr field
+@param[in]	row_field	row field */
+static
+void
+row_ins_spatial_index_entry_set_mbr_field(
+	dfield_t*	field,
+	const dfield_t*	row_field)
+{
+	/* This must be a GEOMETRY datatype */
+	ut_ad(DATA_GEOMETRY_MTYPE(field->type.mtype));
+
+	const byte* const	data = static_cast<const byte*>(
+		dfield_get_data(row_field));
+	const ulint		data_len = dfield_get_len(row_field);
+	double			mbr[SPDIMS * 2];
+
+	/* Obtain the MBR from the payload that follows the header */
+	rtree_mbr_from_wkb(data + GEO_DATA_HEADER_SIZE,
+			   static_cast<uint>(data_len
+					     - GEO_DATA_HEADER_SIZE),
+			   SPDIMS, mbr);
+
+	/* Store the MBR as the data of the index entry field */
+	dfield_write_mbr(field, mbr);
+}
+
+/** Fills in the fields of an index entry from the matching columns of a row.
+Besides plain copies this handles virtual columns, dropped columns (set to
+NULL or to zeroes), column prefixes and the MBR field of a spatial index.
+@param[in]	index	index that the entry is built for
+@param[out]	entry	index entry to fill in
+@param[in]	row	row supplying the column values
+@return DB_SUCCESS, or DB_CANT_CREATE_GEOMETRY_OBJECT if the first field
+of a spatial index has no data or is shorter than GEO_DATA_HEADER_SIZE */
+static
+dberr_t
+row_ins_index_entry_set_vals(
+	const dict_index_t*	index,
+	dtuple_t*		entry,
+	const dtuple_t*		row)
+{
+	const ulint	n_v_fields = dtuple_get_n_v_fields(entry);
+	const ulint	n_fields = dtuple_get_n_fields(entry);
+	const ulint	n_total = n_fields + n_v_fields;
+
+	for (ulint pos = 0; pos < n_total; pos++) {
+		dict_field_t*	ind_field = NULL;
+		dfield_t*	field;
+		dict_col_t*	col;
+
+		if (pos < n_fields) {
+			/* Ordinary index field */
+			field = dtuple_get_nth_field(entry, pos);
+			ind_field = dict_index_get_nth_field(index, pos);
+			col = ind_field->col;
+		} else {
+			/* Virtual field: these are visited after all
+			the ordinary fields; ind_field stays NULL */
+			const ulint	v_no = pos - n_fields;
+
+			field = dtuple_get_nth_v_field(entry, v_no);
+			col = &dict_table_get_nth_v_col(
+				index->table, v_no)->m_col;
+		}
+
+		const dfield_t*	row_field;
+
+		if (col->is_virtual()) {
+			const dict_v_col_t*	v_col
+				= reinterpret_cast<const dict_v_col_t*>(col);
+
+			ut_ad(dtuple_get_n_fields(row)
+			      == dict_table_get_n_cols(index->table));
+			row_field = dtuple_get_nth_v_field(row, v_col->v_pos);
+		} else if (col->is_dropped()) {
+			/* A dropped column has no value in the row:
+			store zeroes if it is NOT NULL, else NULL */
+			ut_ad(index->is_primary());
+
+			if (col->prtype & DATA_NOT_NULL) {
+				ut_ad(ind_field->fixed_len <= col->len);
+				dfield_set_data(field, field_ref_zero,
+						ind_field->fixed_len);
+				field->type.prtype = DATA_NOT_NULL;
+			} else {
+				field->data = NULL;
+				field->len = UNIV_SQL_NULL;
+				field->type.prtype = DATA_BINARY_TYPE;
+			}
+
+			field->type.mtype = col->len
+				? DATA_FIXBINARY : DATA_BINARY;
+			continue;
+		} else {
+			row_field = dtuple_get_nth_field(
+				row, ind_field->col->ind);
+		}
+
+		ulint	len = dfield_get_len(row_field);
+
+		/* For a column prefix index, shorten the length to the
+		prefix, unless the value is NULL */
+		if (ind_field != NULL && ind_field->prefix_len > 0
+		    && len != UNIV_SQL_NULL) {
+
+			const dict_col_t*	prefix_col
+				= dict_field_get_col(ind_field);
+			const char*		str
+				= static_cast<const char*>(
+					dfield_get_data(row_field));
+
+			len = dtype_get_at_most_n_mbchars(
+				prefix_col->prtype, prefix_col->mbminlen,
+				prefix_col->mbmaxlen,
+				ind_field->prefix_len, len, str);
+
+			ut_ad(!dfield_is_ext(row_field));
+		}
+
+		/* In a spatial index the first field holds the MBR of
+		the geometry instead of the geometry data itself */
+		if (pos == 0 && dict_index_is_spatial(index)) {
+			if (!row_field->data
+			    || row_field->len < GEO_DATA_HEADER_SIZE) {
+				return(DB_CANT_CREATE_GEOMETRY_OBJECT);
+			}
+
+			row_ins_spatial_index_entry_set_mbr_field(
+				field, row_field);
+			continue;
+		}
+
+		dfield_set_data(field, dfield_get_data(row_field), len);
+
+		if (dfield_is_ext(row_field)) {
+			ut_ad(dict_index_is_clust(index));
+			dfield_set_ext(field);
+		}
+	}
+
+	return(DB_SUCCESS);
+}
+
+/** Builds the current index entry of an insert node from node->row and
+inserts it into node->index.
+@param[in,out]	node	row insert node
+@param[in]	thr	query thread
+@return DB_SUCCESS if the operation completed successfully, else an error
+code or DB_LOCK_WAIT */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+row_ins_index_entry_step(
+	ins_node_t*	node,
+	que_thr_t*	thr)
+{
+	DBUG_ENTER("row_ins_index_entry_step");
+
+	ut_ad(dtuple_check_typed(node->row));
+
+	dberr_t	err = row_ins_index_entry_set_vals(
+		node->index, *node->entry, node->row);
+
+	/* Only attempt the insert if the entry could be built */
+	if (err == DB_SUCCESS) {
+		ut_ad(dtuple_check_typed(*node->entry));
+
+		err = row_ins_index_entry(node->index, *node->entry, thr);
+
+		DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
+				    "after_row_ins_index_entry_step");
+	}
+
+	DBUG_RETURN(err);
+}
+
+/** Allocates a new row id and writes it to node->sys_buf, provided that
+is_gen_clust() holds for the first index of the table. Otherwise does
+nothing.
+@param[in,out]	node	row insert node; must be in state
+			INS_NODE_ALLOC_ROW_ID */
+UNIV_INLINE
+void
+row_ins_alloc_row_id_step(
+	ins_node_t*	node)
+{
+	ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
+
+	if (!dict_table_get_first_index(node->table)->is_gen_clust()) {
+		/* No row id is allocated for this table */
+		return;
+	}
+
+	dict_sys_write_row_id(node->sys_buf, dict_sys.get_new_row_id());
+}
+
+/** Fills node->row from node->values_list: every expression of the list is
+evaluated and its value is assigned to the row field at the same position.
+@param[in,out]	node	row insert node */
+UNIV_INLINE
+void
+row_ins_get_row_from_values(
+	ins_node_t*	node)
+{
+	dtuple_t* const	row = node->row;
+	ulint		field_no = 0;
+
+	/* The value buffers remain safe to use until we fetch from select
+	again, so it suffices to copy the pointers */
+
+	for (que_node_t* expr = node->values_list; expr != NULL;
+	     expr = que_node_get_next(expr)) {
+
+		eval_exp(expr);
+
+		dfield_t*	dst = dtuple_get_nth_field(row, field_no);
+
+		dfield_copy_data(dst, que_node_get_val(expr));
+
+		field_no++;
+	}
+}
+
+/** Fills node->row from the select list of node->select: the value of every
+select list item is assigned to the row field at the same position.
+@param[in,out]	node	row insert node */
+UNIV_INLINE
+void
+row_ins_get_row_from_select(
+	ins_node_t*	node)
+{
+	dtuple_t* const	row = node->row;
+	ulint		field_no = 0;
+
+	/* The select node keeps the field values in its own buffers, which
+	remain safe to use until we fetch from select again, so it suffices
+	to copy the pointers */
+
+	for (que_node_t* item = node->select->select_list; item != NULL;
+	     item = que_node_get_next(item)) {
+
+		dfield_t*	dst = dtuple_get_nth_field(row, field_no);
+
+		dfield_copy_data(dst, que_node_get_val(item));
+
+		field_no++;
+	}
+}
+
+/** Inserts a row to a table, one index entry at a time.
+In state INS_NODE_ALLOC_ROW_ID the node is first prepared: a row id is
+allocated if needed, node->index and node->entry are set to the first index
+and entry, and node->row is filled for INS_SEARCHED and INS_VALUES inserts.
+The index loop then runs from node->index onwards. If an entry fails,
+node->index, node->entry and the state INS_NODE_INSERT_ENTRIES are left as
+they are, so a later call continues with the same index.
+@param[in,out]	node	row insert node
+@param[in]	thr	query thread
+@return DB_SUCCESS if the operation completed successfully, else an error
+code or DB_LOCK_WAIT */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+row_ins(
+	ins_node_t*	node,
+	que_thr_t*	thr)
+{
+	DBUG_ENTER("row_ins");
+
+	DBUG_PRINT("row_ins", ("table: %s", node->table->name.m_name));
+
+	if (node->state == INS_NODE_ALLOC_ROW_ID) {
+		row_ins_alloc_row_id_step(node);
+
+		/* Start from the first index and its entry */
+		node->index = dict_table_get_first_index(node->table);
+		ut_ad(node->entry_list.empty() == false);
+		node->entry = node->entry_list.begin();
+
+		switch (node->ins_type) {
+		case INS_SEARCHED:
+			row_ins_get_row_from_select(node);
+			break;
+		case INS_VALUES:
+			row_ins_get_row_from_values(node);
+			break;
+		default:
+			/* The row is used as it is */
+			break;
+		}
+
+		node->state = INS_NODE_INSERT_ENTRIES;
+	}
+
+	ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
+
+	for (dict_index_t* index = node->index; index != NULL;
+	     index = node->index) {
+		/* FTS, corrupted and uncommitted indexes get no entry */
+		const bool	skip
+			= (index->type & (DICT_FTS | DICT_CORRUPT))
+			|| !index->is_committed();
+
+		if (!skip) {
+			const dberr_t	err
+				= row_ins_index_entry_step(node, thr);
+
+			if (err) {
+				DBUG_RETURN(err);
+			}
+		}
+
+		node->index = dict_table_get_next_index(index);
+		++node->entry;
+	}
+
+	ut_ad(node->entry == node->entry_list.end());
+
+	/* All entries were inserted: be ready for the next row */
+	node->state = INS_NODE_ALLOC_ROW_ID;
+
+	DBUG_RETURN(DB_SUCCESS);
+}
+
+/***********************************************************//**
+Inserts a row to a table. This is a high-level function used in SQL execution
+graphs.
+The insert node is taken from thr->run_node. Its state is reset to
+INS_NODE_SET_IX_LOCK whenever control arrives from the parent node
+(thr->prev_node == parent). Once the table lock request has failed or
+row_ins() has been run, the result is stored in trx->error_state, and NULL
+is returned unless that result is DB_SUCCESS. On the other paths
+thr->run_node is set to the node to execute next (the select node or the
+parent node) and thr is returned.
+@return query thread to run next or NULL */
+que_thr_t*
+row_ins_step(
+/*=========*/
+ que_thr_t* thr) /*!< in: query thread */
+{
+ ins_node_t* node;
+ que_node_t* parent;
+ sel_node_t* sel_node;
+ trx_t* trx;
+ dberr_t err;
+
+ ut_ad(thr);
+
+ DEBUG_SYNC_C("innodb_row_ins_step_enter");
+
+ trx = thr_get_trx(thr);
+
+ node = static_cast<ins_node_t*>(thr->run_node);
+
+ ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
+
+ parent = que_node_get_parent(node);
+ sel_node = node->select;
+
+ if (thr->prev_node == parent) {
+ node->state = INS_NODE_SET_IX_LOCK;
+ }
+
+ /* If this is the first time this node is executed (or when
+ execution resumes after wait for the table IX lock), set an
+ IX lock on the table and reset the possible select node. MySQL's
+ partitioned table code may also call an insert within the same
+ SQL statement AFTER it has used this table handle to do a search.
+ This happens, for example, when a row update moves it to another
+ partition. In that case, we have already set the IX lock on the
+ table during the search operation, and there is no need to set
+ it again here. But we must write trx->id to node->sys_buf. */
+
+ if (node->table->no_rollback()) {
+ /* No-rollback tables should only be written to by a
+ single thread at a time, but there can be multiple
+ concurrent readers. We must hold an open table handle. */
+ DBUG_ASSERT(node->table->get_ref_count() > 0);
+ DBUG_ASSERT(node->ins_type == INS_DIRECT);
+ /* No-rollback tables can consist only of a single index. */
+ DBUG_ASSERT(node->entry_list.size() == 1);
+ DBUG_ASSERT(UT_LIST_GET_LEN(node->table->indexes) == 1);
+ /* There should be no possibility for interruption and
+ restarting here. In theory, we could allow resumption
+ from the INS_NODE_INSERT_ENTRIES state here.
+ No table lock is requested on this path: the node is set up
+ for its only index and entry, and row_ins() is invoked in
+ the INS_NODE_INSERT_ENTRIES state. */
+ DBUG_ASSERT(node->state == INS_NODE_SET_IX_LOCK);
+ node->index = dict_table_get_first_index(node->table);
+ node->entry = node->entry_list.begin();
+ node->state = INS_NODE_INSERT_ENTRIES;
+ goto do_insert;
+ }
+
+ if (node->state == INS_NODE_SET_IX_LOCK) {
+
+ node->state = INS_NODE_ALLOC_ROW_ID;
+
+ if (node->table->is_temporary()) {
+ node->trx_id = trx->id; /* makes the comparison below succeed, so no lock is requested */
+ }
+
+ /* It may be that the current session has not yet started
+ its transaction, or it has been committed: */
+
+ if (trx->id == node->trx_id) {
+ /* No need to do IX-locking: node->trx_id already
+ matches the id of this transaction */
+
+ goto same_trx;
+ }
+
+ err = lock_table(node->table, NULL, LOCK_IX, thr);
+
+ DBUG_EXECUTE_IF("ib_row_ins_ix_lock_wait",
+ err = DB_LOCK_WAIT;);
+
+ if (err != DB_SUCCESS) {
+ node->state = INS_NODE_SET_IX_LOCK; /* so that the lock is requested again on the next call */
+ goto error_handling;
+ }
+
+ node->trx_id = trx->id;
+same_trx:
+ if (node->ins_type == INS_SEARCHED) {
+ /* Reset the cursor */
+ sel_node->state = SEL_NODE_OPEN;
+
+ /* Fetch a row to insert: hand control over to the
+ select node without inserting anything yet */
+
+ thr->run_node = sel_node;
+
+ return(thr);
+ }
+ }
+
+ if ((node->ins_type == INS_SEARCHED)
+ && (sel_node->state != SEL_NODE_FETCH)) {
+
+ ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
+
+ /* No more rows to insert: return control to the parent */
+ thr->run_node = parent;
+
+ return(thr);
+ }
+do_insert:
+ /* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
+
+ err = row_ins(node, thr);
+
+error_handling:
+ trx->error_state = err;
+
+ if (err != DB_SUCCESS) {
+ /* err == DB_LOCK_WAIT or SQL error detected */
+ return(NULL);
+ }
+
+ /* DO THE TRIGGER ACTIONS HERE */
+
+ if (node->ins_type == INS_SEARCHED) {
+ /* Fetch a row to insert */
+
+ thr->run_node = sel_node;
+ } else {
+ thr->run_node = que_node_get_parent(node);
+ }
+
+ return(thr);
+}