summaryrefslogtreecommitdiffstats
path: root/storage/innobase/trx/trx0undo.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/innobase/trx/trx0undo.cc')
-rw-r--r--storage/innobase/trx/trx0undo.cc1478
1 files changed, 1478 insertions, 0 deletions
diff --git a/storage/innobase/trx/trx0undo.cc b/storage/innobase/trx/trx0undo.cc
new file mode 100644
index 00000000..203edd9f
--- /dev/null
+++ b/storage/innobase/trx/trx0undo.cc
@@ -0,0 +1,1478 @@
+/*****************************************************************************
+
+Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2014, 2022, MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+/**************************************************//**
+@file trx/trx0undo.cc
+Transaction undo log
+
+Created 3/26/1996 Heikki Tuuri
+*******************************************************/
+
+#include "trx0undo.h"
+#include "fsp0fsp.h"
+#include "mach0data.h"
+#include "mtr0log.h"
+#include "srv0mon.h"
+#include "srv0srv.h"
+#include "srv0start.h"
+#include "trx0purge.h"
+#include "trx0rec.h"
+#include "trx0rseg.h"
+#include "log.h"
+
+/* How should the old versions in the history list be managed?
+ ----------------------------------------------------------
+If each transaction is given a whole page for its update undo log, file
+space consumption can be 10 times higher than necessary. Therefore,
+partly filled update undo log pages should be reusable. But then there
+is no way individual pages can be ordered so that the ordering agrees
+with the serialization numbers of the transactions on the pages. Thus,
+the history list must be formed of undo logs, not their header pages as
+it was in the old implementation.
+ However, on a single header page the transactions are placed in
+the order of their serialization numbers. As old versions are purged, we
+may free the page when the last transaction on the page has been purged.
+ A problem is that the purge has to go through the transactions
+in the serialization order. This means that we have to look through all
+rollback segments for the one that has the smallest transaction number
+in its history list.
+ When should we do a purge? A purge is necessary when space is
+running out in any of the rollback segments. Then we may have to purge
+also old versions which might be needed by some consistent read. How do
+we trigger the start of a purge? When a transaction writes to an undo log,
+it may notice that the space is running out. When a read view is closed,
+it may make some history superfluous. The server can have a utility which
+periodically checks if it can purge some history.
+ In a parallelized purge we have the problem that a query thread
+can remove a delete marked clustered index record before another query
+thread has processed an earlier version of the record, which cannot then
+be done because the row cannot be constructed from the clustered index
+record. To avoid this problem, we will store in the update and delete mark
+undo record also the columns necessary to construct the secondary index
+entries which are modified.
+ We can latch the stack of versions of a single clustered index record
+by taking a latch on the clustered index page. As long as the latch is held,
+no new versions can be added and no versions removed by undo. But, a purge
+can still remove old versions from the bottom of the stack. */
+
+/* How to protect rollback segments, undo logs, and history lists with
+ -------------------------------------------------------------------
+latches?
+-------
+When a transaction does its first insert or modify in the clustered index, an
+undo log is assigned for it. Then we must have an x-latch to the rollback
+segment header.
+ When the transaction performs modifications or rolls back, its
+undo log is protected by undo page latches.
+Only the thread that is associated with the transaction may hold multiple
+undo page latches at a time. Undo pages are always private to a single
+transaction. Other threads that are performing MVCC reads
+or checking for implicit locks will lock at most one undo page at a time
+in trx_undo_get_undo_rec_low().
+ When the transaction commits, its persistent undo log is added
+to the history list. If it is not suitable for reuse, its slot is reset.
+In both cases, an x-latch must be acquired on the rollback segment header page.
+ The purge operation steps through the history list without modifying
+it until a truncate operation occurs, which can remove undo logs from the end
+of the list and release undo log segments. In stepping through the list,
+s-latches on the undo log pages are enough, but in a truncate, x-latches must
+be obtained on the rollback segment and individual pages. */
+
+/** Create and initialize an undo log memory object (forward declaration).
+@param[in]	rseg	rollback segment memory object
+@param[in]	id	slot index within rseg
+@param[in]	trx_id	id of the trx for which the undo log is created
+@param[in]	xid	X/Open XA transaction identification
+@param[in]	page_no	undo log header page number
+@param[in]	offset	undo log header byte offset on page
+@return own: the undo log memory object */
+static trx_undo_t *trx_undo_mem_create(trx_rseg_t *rseg, ulint id,
+                                       trx_id_t trx_id, const XID *xid,
+                                       uint32_t page_no, uint16_t offset);
+
+/** Determine where the records of an undo log start on an undo log page.
+@param[in]	block	undo log page
+@param[in]	page_no	undo log header page number
+@param[in]	offset	undo log header offset
+@return start offset */
+static
+uint16_t trx_undo_page_get_start(const buf_block_t *block, uint32_t page_no,
+                                 uint16_t offset)
+{
+  /* On the undo log header page the start offset is read from the log
+  header (TRX_UNDO_LOG_START); on any other page the records begin
+  right after the undo page header. */
+  if (page_no != block->page.id().page_no())
+    return TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE;
+
+  return mach_read_from_2(block->page.frame + offset + TRX_UNDO_LOG_START);
+}
+
+/** Get the first undo log record on a page.
+@param[in]	block	undo log page
+@param[in]	page_no	undo log header page number
+@param[in]	offset	undo log header page offset
+@return pointer to first record
+@retval nullptr if the start and end offsets of the log on this page
+coincide, that is, there is no record */
+trx_undo_rec_t*
+trx_undo_page_get_first_rec(const buf_block_t *block, uint32_t page_no,
+                            uint16_t offset)
+{
+  const uint16_t first= trx_undo_page_get_start(block, page_no, offset);
+
+  if (first == trx_undo_page_get_end(block, page_no, offset))
+    return nullptr;
+
+  return block->page.frame + first;
+}
+
+/** Get the last undo log record on a page.
+@param[in]	block	undo log page
+@param[in]	page_no	undo log header page number
+@param[in]	offset	undo log header page offset
+@return pointer to last record
+@retval nullptr if none exists */
+static
+trx_undo_rec_t*
+trx_undo_page_get_last_rec(const buf_block_t *block, uint32_t page_no,
+                           uint16_t offset)
+{
+  const uint16_t end= trx_undo_page_get_end(block, page_no, offset);
+
+  if (trx_undo_page_get_start(block, page_no, offset) == end)
+    return nullptr;
+
+  /* The two bytes just before the end offset hold the page offset of
+  the last record. */
+  const uint16_t last= mach_read_from_2(block->page.frame + end - 2);
+  return block->page.frame + last;
+}
+
+/** Get the previous record in an undo log from the previous page.
+@param[in,out]	block	undo log page; replaced by the previous page
+			(or by nullptr if that page could not be fetched)
+@param[in]	rec	undo record offset in the page (not used here)
+@param[in]	page_no	undo log header page number
+@param[in]	offset	undo log header offset on page
+@param[in]	shared	latching mode: true=RW_S_LATCH, false=RW_X_LATCH
+@param[in,out]	mtr	mini-transaction
+@return undo log record, the page latched, NULL if none */
+static trx_undo_rec_t*
+trx_undo_get_prev_rec_from_prev_page(buf_block_t *&block, uint16_t rec,
+                                     uint32_t page_no, uint16_t offset,
+                                     bool shared, mtr_t *mtr)
+{
+  /* Page number stored in the FLST_PREV address of this page's list node */
+  const byte *prev_addr= block->page.frame + TRX_UNDO_PAGE_HDR +
+    TRX_UNDO_PAGE_NODE + FLST_PREV + FIL_ADDR_PAGE;
+  const uint32_t prev_page_no= mach_read_from_4(prev_addr);
+
+  if (prev_page_no == FIL_NULL)
+    return nullptr;
+
+  const page_id_t prev_id(block->page.id().space(), prev_page_no);
+  block= buf_page_get(prev_id, 0, shared ? RW_S_LATCH : RW_X_LATCH, mtr);
+
+  if (!block)
+    return nullptr;
+
+  return trx_undo_page_get_last_rec(block, page_no, offset);
+}
+
+/** Get the previous undo log record on the same page.
+@param[in]	block	undo log page
+@param[in]	rec	undo log record; must reside in block
+@param[in]	page_no	undo log header page number
+@param[in]	offset	undo log header page offset
+@return pointer to record
+@retval nullptr if rec is the first record of the log on this page */
+static
+trx_undo_rec_t*
+trx_undo_page_get_prev_rec(const buf_block_t *block, trx_undo_rec_t *rec,
+                           uint32_t page_no, uint16_t offset)
+{
+  ut_ad(block->page.frame == page_align(rec));
+
+  byte *const first= block->page.frame +
+    trx_undo_page_get_start(block, page_no, offset);
+
+  if (rec == first)
+    return nullptr;
+
+  /* The two bytes preceding a record hold the page offset of its
+  predecessor. */
+  return block->page.frame + mach_read_from_2(rec - 2);
+}
+
+/** Get the previous record in an undo log.
+@param[in,out]	block	undo log page; may be replaced by the previous page
+@param[in]	rec	undo record offset in the page
+@param[in]	page_no	undo log header page number
+@param[in]	offset	undo log header offset on page
+@param[in]	shared	latching mode: true=RW_S_LATCH, false=RW_X_LATCH
+@param[in,out]	mtr	mini-transaction
+@return undo log record, the page latched, NULL if none */
+trx_undo_rec_t*
+trx_undo_get_prev_rec(buf_block_t *&block, uint16_t rec, uint32_t page_no,
+                      uint16_t offset, bool shared, mtr_t *mtr)
+{
+  trx_undo_rec_t *prev=
+    trx_undo_page_get_prev_rec(block, block->page.frame + rec,
+                               page_no, offset);
+
+  if (!prev)
+    /* Nothing precedes rec on this page: look on the previous undo
+    log page. */
+    prev= trx_undo_get_prev_rec_from_prev_page(block, rec, page_no, offset,
+                                               shared, mtr);
+
+  return prev;
+}
+
+/** Get the next record in an undo log from the next page.
+@param[in,out]	block	undo log page; replaced by the next page
+			(or by nullptr if that page could not be fetched)
+@param[in]	page_no	undo log header page number
+@param[in]	offset	undo log header offset on page
+@param[in]	mode	latching mode: RW_S_LATCH or RW_X_LATCH
+@param[in,out]	mtr	mini-transaction
+@return undo log record, the page latched, NULL if none */
+static trx_undo_rec_t*
+trx_undo_get_next_rec_from_next_page(const buf_block_t *&block,
+                                     uint32_t page_no, uint16_t offset,
+                                     ulint mode, mtr_t *mtr)
+{
+  const byte *const frame= block->page.frame;
+
+  if (page_no == block->page.id().page_no())
+  {
+    /* We are on the header page: a nonzero TRX_UNDO_NEXT_LOG in the
+    log header ends the search. */
+    if (mach_read_from_2(frame + offset + TRX_UNDO_NEXT_LOG))
+      return nullptr;
+  }
+
+  const uint32_t next= mach_read_from_4(frame + TRX_UNDO_PAGE_HDR +
+                                        TRX_UNDO_PAGE_NODE + FLST_NEXT +
+                                        FIL_ADDR_PAGE);
+  if (next == FIL_NULL)
+    return nullptr;
+
+  block= buf_page_get_gen(page_id_t(block->page.id().space(), next), 0, mode,
+                          nullptr, BUF_GET_POSSIBLY_FREED, mtr);
+  if (!block)
+    return nullptr;
+
+  return trx_undo_page_get_first_rec(block, page_no, offset);
+}
+
+/** Get the first record in an undo log.
+@param[in]	space	undo log header space
+@param[in]	page_no	undo log header page number
+@param[in]	offset	undo log header offset on page
+@param[in]	mode	latching mode: RW_S_LATCH or RW_X_LATCH
+@param[out]	block	undo log page
+@param[in,out]	mtr	mini-transaction
+@param[out]	err	error code, as set when fetching the header page
+@return undo log record, the page latched
+@retval nullptr if none */
+static trx_undo_rec_t*
+trx_undo_get_first_rec(const fil_space_t &space, uint32_t page_no,
+                       uint16_t offset, ulint mode, const buf_block_t*& block,
+                       mtr_t *mtr, dberr_t *err)
+{
+  block= buf_page_get_gen(page_id_t{space.id, page_no}, 0, mode,
+                          nullptr, BUF_GET, mtr, err);
+  if (!block)
+    return nullptr;
+
+  trx_undo_rec_t *first= trx_undo_page_get_first_rec(block, page_no, offset);
+
+  if (!first)
+    /* No record on the header page: try the next page of the log. */
+    first= trx_undo_get_next_rec_from_next_page(block, page_no, offset, mode,
+                                                mtr);
+  return first;
+}
+
+/** Parse an undo log record and, if the table that it refers to has an
+active DDL operation (table->is_active_ddl()), pass the change to
+log_insert() or log_update().
+@param rec undo log record; nothing is done if it is nullptr */
+inline void UndorecApplier::apply_undo_rec(const trx_undo_rec_t *rec)
+{
+ undo_rec= rec;
+ if (!undo_rec)
+ return;
+ offset= page_offset(undo_rec);
+
+ bool updated_extern= false;
+ undo_no_t undo_no= 0;
+ table_id_t table_id= 0;
+ /* Parse the general fields of the record; from here on, undo_rec is
+ whatever position each parsing function returns. */
+ undo_rec= trx_undo_rec_get_pars(undo_rec, &type,
+ &cmpl_info,
+ &updated_extern, &undo_no, &table_id);
+ /* Look up the table by the parsed table_id while dict_sys is frozen. */
+ dict_sys.freeze(SRW_LOCK_CALL);
+ dict_table_t *table= dict_sys.find_table(table_id);
+ dict_sys.unfreeze();
+
+ ut_ad(table);
+ /* NOTE(review): this early return does not call clear_undo_rec(),
+ unlike the normal exit at the end of the function; confirm that this
+ is intended. */
+ if (!table->is_active_ddl())
+ return;
+
+ dict_index_t *index= dict_table_get_first_index(table);
+ const dtuple_t *undo_tuple;
+ switch (type) {
+ default:
+ ut_ad("invalid type" == 0);
+ MY_ASSERT_UNREACHABLE();
+ case TRX_UNDO_INSERT_REC:
+ undo_rec= trx_undo_rec_get_row_ref(undo_rec, index, &undo_tuple, heap);
+ insert:
+ log_insert(*undo_tuple, index);
+ break;
+ case TRX_UNDO_UPD_EXIST_REC:
+ case TRX_UNDO_UPD_DEL_REC:
+ case TRX_UNDO_DEL_MARK_REC:
+ trx_id_t trx_id;
+ roll_ptr_t roll_ptr;
+ byte info_bits;
+ undo_rec= trx_undo_update_rec_get_sys_cols(
+ undo_rec, &trx_id, &roll_ptr, &info_bits);
+
+ undo_rec= trx_undo_rec_get_row_ref(undo_rec, index, &undo_tuple, heap);
+ undo_rec= trx_undo_update_rec_get_update(undo_rec, index, type, trx_id,
+ roll_ptr, info_bits,
+ heap, &update);
+ /* TRX_UNDO_UPD_DEL_REC is handed to log_insert(), not log_update(). */
+ if (type == TRX_UNDO_UPD_DEL_REC)
+ goto insert;
+ log_update(*undo_tuple, index);
+ }
+
+ clear_undo_rec();
+}
+
+/** Apply any changes to tables for which online DDL is in progress.
+Walks all pages of the undo log rsegs.m_redo.undo, passes every undo
+record to UndorecApplier::apply_undo_rec(), and finally resets
+apply_online_log. */
+ATTRIBUTE_COLD void trx_t::apply_log()
+{
+ const trx_undo_t *undo= rsegs.m_redo.undo;
+ /* Nothing to do if there is no undo log in rsegs.m_redo or undo_no is 0. */
+ if (!undo || !undo_no)
+ return;
+ page_id_t page_id{rsegs.m_redo.rseg->space->id, undo->hdr_page_no};
+ page_id_t next_page_id(page_id);
+ mtr_t mtr;
+ mtr.start();
+ /* Start from the undo log header page. */
+ buf_block_t *block= buf_page_get(page_id, 0, RW_S_LATCH, &mtr);
+ if (UNIV_UNLIKELY(!block))
+ {
+ mtr.commit();
+ return;
+ }
+
+ UndorecApplier log_applier(page_id, id);
+
+ /* Visit the pages of the undo log by following the FLST_NEXT links. */
+ for (;;)
+ {
+ trx_undo_rec_t *rec= trx_undo_page_get_first_rec(block, page_id.page_no(),
+ undo->hdr_offset);
+ while (rec)
+ {
+ /* Buffer-fix the page before committing the mini-transaction, so
+ that the record can be applied while only a buffer-fix is held. */
+ block->page.fix();
+ mtr.commit();
+ /* Since we are the only thread who could write to this undo page,
+ it is safe to dereference rec while only holding a buffer-fix. */
+ log_applier.apply_undo_rec(rec);
+ /* Latch the page again before looking for the next record. */
+ mtr.start();
+ mtr.page_lock(block, RW_S_LATCH);
+ rec= trx_undo_page_get_next_rec(block, page_offset(rec),
+ page_id.page_no(), undo->hdr_offset);
+ }
+
+ /* Page number of the next page in the undo page list, or FIL_NULL */
+ uint32_t next= mach_read_from_4(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE +
+ FLST_NEXT + FIL_ADDR_PAGE +
+ block->page.frame);
+ if (next == FIL_NULL)
+ break;
+ next_page_id.set_page_no(next);
+ mtr.commit();
+ mtr.start();
+ block= buf_page_get_gen(next_page_id, 0, RW_S_LATCH, block, BUF_GET, &mtr);
+ if (UNIV_UNLIKELY(!block))
+ break;
+ log_applier.assign_next(next_page_id);
+ }
+ mtr.commit();
+ apply_online_log= false;
+}
+
+/*============== UNDO LOG FILE COPY CREATION AND FREEING ==================*/
+
+/** Initialize an undo log page.
+NOTE: This corresponds to a redo log record and must not be changed!
+All writes below go directly to block.page.frame; no mini-transaction
+is passed to this function.
+@see mtr_t::undo_create()
+@param block undo log page */
+void trx_undo_page_init(const buf_block_t &block)
+{
+ /* FIL_PAGE_TYPE = FIL_PAGE_UNDO_LOG */
+ mach_write_to_2(my_assume_aligned<2>(FIL_PAGE_TYPE + block.page.frame),
+ FIL_PAGE_UNDO_LOG);
+ static_assert(TRX_UNDO_PAGE_HDR == FIL_PAGE_DATA, "compatibility");
+ /* TRX_UNDO_PAGE_TYPE = 0 */
+ memset_aligned<2>(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_TYPE + block.page.frame,
+ 0, 2);
+ /* TRX_UNDO_PAGE_START = first byte after the undo page header */
+ mach_write_to_2(my_assume_aligned<2>
+ (TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_START + block.page.frame),
+ TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
+ /* TRX_UNDO_PAGE_FREE = TRX_UNDO_PAGE_START */
+ memcpy_aligned<2>(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE + block.page.frame,
+ TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_START + block.page.frame,
+ 2);
+ /* The following corresponds to flst_zero_both(), but without writing log. */
+ /* FLST_PREV: page number bytes all 0xff, byte offset 0 */
+ memset_aligned<4>(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE + FLST_PREV +
+ FIL_ADDR_PAGE + block.page.frame, 0xff, 4);
+ memset_aligned<2>(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE + FLST_PREV +
+ FIL_ADDR_BYTE + block.page.frame, 0, 2);
+ /* FLST_NEXT: page number bytes all 0xff, byte offset 0 */
+ memset_aligned<2>(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE + FLST_NEXT +
+ FIL_ADDR_PAGE + block.page.frame, 0xff, 4);
+ memset_aligned<2>(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE + FLST_NEXT +
+ FIL_ADDR_BYTE + block.page.frame, 0, 2);
+ static_assert(TRX_UNDO_PAGE_NODE + FLST_NEXT + FIL_ADDR_BYTE + 2 ==
+ TRX_UNDO_PAGE_HDR_SIZE, "compatibility");
+ /* Preserve TRX_UNDO_SEG_HDR, but clear the rest of the page. */
+ memset_aligned<2>(TRX_UNDO_SEG_HDR + TRX_UNDO_SEG_HDR_SIZE +
+ block.page.frame, 0,
+ srv_page_size - (TRX_UNDO_SEG_HDR + TRX_UNDO_SEG_HDR_SIZE +
+ FIL_PAGE_DATA_END));
+}
+
+/** Look for a free slot for an undo log segment.
+A slot is free when it contains FIL_NULL.
+@param rseg_header rollback segment header
+@return index of the first free slot
+@retval ULINT_UNDEFINED if not found */
+static ulint trx_rsegf_undo_find_free(const buf_block_t *rseg_header)
+{
+  ulint n_slots= TRX_RSEG_N_SLOTS;
+
+#ifdef UNIV_DEBUG
+  /* Debug builds may restrict the number of slots that are searched. */
+  if (trx_rseg_n_slots_debug)
+    n_slots= std::min<ulint>(trx_rseg_n_slots_debug, TRX_RSEG_N_SLOTS);
+#endif
+
+  ulint slot= 0;
+  while (slot < n_slots &&
+         trx_rsegf_get_nth_undo(rseg_header, slot) != FIL_NULL)
+    slot++;
+
+  return slot < n_slots ? slot : ULINT_UNDEFINED;
+}
+
+/** Create an undo log segment.
+Finds a free undo slot in the rollback segment header, allocates a new
+file segment for the undo log, initializes its header page, and stores
+the page number of that page in the slot.
+@param[in,out]	space		tablespace
+@param[in,out]	rseg_hdr	rollback segment header (x-latched)
+@param[out]	id		undo slot number; only set on success
+@param[out]	err		error code; DB_TOO_MANY_CONCURRENT_TRXS
+				if no slot is free
+@param[in,out]	mtr		mini-transaction
+@return undo log block
+@retval NULL on failure */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+buf_block_t*
+trx_undo_seg_create(fil_space_t *space, buf_block_t *rseg_hdr, ulint *id,
+                    dberr_t *err, mtr_t *mtr)
+{
+  buf_block_t *block;
+  uint32_t n_reserved;
+
+  const ulint slot_no= trx_rsegf_undo_find_free(rseg_hdr);
+
+  if (slot_no == ULINT_UNDEFINED)
+  {
+    ib::warn() << "Cannot find a free slot for an undo log. Do"
+                  " you have too many active transactions running"
+                  " concurrently?";
+
+    *err= DB_TOO_MANY_CONCURRENT_TRXS;
+    return NULL;
+  }
+
+  ut_ad(slot_no < TRX_RSEG_N_SLOTS);
+
+  *err= fsp_reserve_free_extents(&n_reserved, space, 2, FSP_UNDO, mtr);
+  if (UNIV_UNLIKELY(*err != DB_SUCCESS))
+    return NULL;
+
+  /* Allocate a new file segment for the undo log */
+  block= fseg_create(space, TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER,
+                     mtr, err, true);
+
+  space->release_free_extents(n_reserved);
+
+  if (!block)
+    return block;
+
+  mtr->undo_create(*block);
+  trx_undo_page_init(*block);
+
+  /* On the segment header page, the free space starts after the
+  undo segment header. */
+  mtr->write<2>(*block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
+                + block->page.frame,
+                TRX_UNDO_SEG_HDR + TRX_UNDO_SEG_HDR_SIZE);
+  mtr->write<2,mtr_t::MAYBE_NOP>(*block,
+                                 TRX_UNDO_SEG_HDR + TRX_UNDO_LAST_LOG
+                                 + block->page.frame, 0U);
+
+  flst_init(*block, TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST
+            + block->page.frame, mtr);
+
+  *err= flst_add_last(block, TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST,
+                      block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE,
+                      mtr);
+
+  /* Do not publish the segment in the rollback segment header if its
+  header page could not be linked into the page list. Report the error
+  to the caller instead of overwriting it with DB_SUCCESS, like
+  trx_undo_add_page() does for the same call. */
+  if (UNIV_UNLIKELY(*err != DB_SUCCESS))
+    return NULL;
+
+  *id= slot_no;
+  mtr->write<4>(*rseg_hdr, TRX_RSEG + TRX_RSEG_UNDO_SLOTS
+                + slot_no * TRX_RSEG_SLOT_SIZE + rseg_hdr->page.frame,
+                block->page.id().page_no());
+
+  /* *err is DB_SUCCESS here. */
+  return block;
+}
+
+/** Initialize an undo log header.
+The header is created at the current TRX_UNDO_PAGE_FREE offset of the
+page, and TRX_UNDO_PAGE_START and TRX_UNDO_PAGE_FREE are both advanced
+past it by TRX_UNDO_LOG_XA_HDR_SIZE.
+@param[in,out] undo_page undo log segment header page
+@param[in] trx_id transaction identifier
+@param[in,out] mtr mini-transaction
+@return header byte offset on page */
+static uint16_t trx_undo_header_create(buf_block_t *undo_page, trx_id_t trx_id,
+ mtr_t* mtr)
+{
+ /* Reset the TRX_UNDO_PAGE_TYPE in case this page is being
+ repurposed after upgrading to MariaDB 10.3. */
+ byte *undo_type= my_assume_aligned<2>
+ (TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_TYPE + undo_page->page.frame);
+ ut_ad(mach_read_from_2(undo_type) <= 2);
+ mtr->write<2,mtr_t::MAYBE_NOP>(*undo_page, undo_type, 0U);
+ byte *start= my_assume_aligned<4>(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_START +
+ undo_page->page.frame);
+ /* start + 2 is TRX_UNDO_PAGE_FREE (see the static_assert below):
+ the new log header will be created at this offset. */
+ const uint16_t free= mach_read_from_2(start + 2);
+ static_assert(TRX_UNDO_PAGE_START + 2 == TRX_UNDO_PAGE_FREE,
+ "compatibility");
+ ut_a(free + TRX_UNDO_LOG_XA_HDR_SIZE < srv_page_size - 100);
+
+ /* Update TRX_UNDO_PAGE_START and TRX_UNDO_PAGE_FREE in the page frame
+ first, and then write the log for both with a single mtr->memset()
+ of the repeated 2-byte pattern. */
+ mach_write_to_2(start, free + TRX_UNDO_LOG_XA_HDR_SIZE);
+ /* A WRITE of 2 bytes is never longer than a MEMMOVE.
+ So, WRITE 2+2 bytes is better than WRITE+MEMMOVE.
+ But, a MEMSET will only be 1+2 bytes, that is, 1 byte shorter! */
+ memcpy_aligned<2>(start + 2, start, 2);
+ mtr->memset(*undo_page, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_START, 4,
+ start, 2);
+ /* Offset of the previous log header on this page, or 0 if none */
+ uint16_t prev_log= mach_read_from_2(TRX_UNDO_SEG_HDR + TRX_UNDO_LAST_LOG +
+ undo_page->page.frame);
+ ut_ad(prev_log < free);
+ /* Write TRX_UNDO_STATE=TRX_UNDO_ACTIVE and TRX_UNDO_LAST_LOG=free
+ as one 4-byte write. */
+ alignas(4) byte buf[4];
+ mach_write_to_2(buf, TRX_UNDO_ACTIVE);
+ mach_write_to_2(buf + 2, free);
+ static_assert(TRX_UNDO_STATE + 2 == TRX_UNDO_LAST_LOG, "compatibility");
+ static_assert(!((TRX_UNDO_SEG_HDR + TRX_UNDO_STATE) % 4), "alignment");
+ mtr->memcpy<mtr_t::MAYBE_NOP>
+ (*undo_page, my_assume_aligned<4>
+ (TRX_UNDO_SEG_HDR + TRX_UNDO_STATE + undo_page->page.frame), buf, 4);
+ /* Make the previous log header on the page point to the new one. */
+ if (prev_log)
+ mtr->write<2>(*undo_page, prev_log + TRX_UNDO_NEXT_LOG +
+ undo_page->page.frame, free);
+ mtr->write<8,mtr_t::MAYBE_NOP>(*undo_page, free + TRX_UNDO_TRX_ID +
+ undo_page->page.frame, trx_id);
+ /* Zero out TRX_UNDO_TRX_NO unless it already is 0. */
+ if (UNIV_UNLIKELY(mach_read_from_8(free + TRX_UNDO_TRX_NO +
+ undo_page->page.frame) != 0))
+ mtr->memset(undo_page, free + TRX_UNDO_TRX_NO, 8, 0);
+
+ /* Write TRX_UNDO_NEEDS_PURGE=1 and TRX_UNDO_LOG_START. */
+ mach_write_to_2(buf, 1);
+ memcpy_aligned<2>(buf + 2, start, 2);
+ static_assert(TRX_UNDO_NEEDS_PURGE + 2 == TRX_UNDO_LOG_START,
+ "compatibility");
+ mtr->memcpy<mtr_t::MAYBE_NOP>(*undo_page, free + TRX_UNDO_NEEDS_PURGE +
+ undo_page->page.frame, buf, 4);
+ /* Initialize all fields TRX_UNDO_XID_EXISTS to TRX_UNDO_HISTORY_NODE. */
+ if (prev_log)
+ {
+ /* Everything is zeroed except TRX_UNDO_PREV_LOG, which is set to
+ the offset of the previous log header. */
+ mtr->memset(undo_page, free + TRX_UNDO_XID_EXISTS,
+ TRX_UNDO_PREV_LOG - TRX_UNDO_XID_EXISTS, 0);
+ mtr->write<2,mtr_t::MAYBE_NOP>(*undo_page, free + TRX_UNDO_PREV_LOG +
+ undo_page->page.frame, prev_log);
+ static_assert(TRX_UNDO_PREV_LOG + 2 == TRX_UNDO_HISTORY_NODE,
+ "compatibility");
+ mtr->memset(undo_page, free + TRX_UNDO_HISTORY_NODE, FLST_NODE_SIZE, 0);
+ static_assert(TRX_UNDO_LOG_OLD_HDR_SIZE == TRX_UNDO_HISTORY_NODE +
+ FLST_NODE_SIZE, "compatibility");
+ }
+ else
+ mtr->memset(undo_page, free + TRX_UNDO_XID_EXISTS,
+ TRX_UNDO_LOG_OLD_HDR_SIZE - TRX_UNDO_XID_EXISTS, 0);
+ return free;
+}
+
+/** Write an X/Open XA Transaction Identifier (XID) to an undo log header.
+The unused tail of the XID data area is filled with zero bytes.
+@param[in,out]	block	undo header page
+@param[in]	offset	undo header record offset; must be the last log
+			header on the page
+@param[in]	xid	distributed transaction identifier
+@param[in,out]	mtr	mini-transaction */
+static void trx_undo_write_xid(buf_block_t *block, uint16_t offset,
+                               const XID &xid, mtr_t *mtr)
+{
+  DBUG_ASSERT(xid.gtrid_length > 0);
+  DBUG_ASSERT(xid.bqual_length >= 0);
+  DBUG_ASSERT(xid.gtrid_length <= MAXGTRIDSIZE);
+  DBUG_ASSERT(xid.bqual_length <= MAXBQUALSIZE);
+  static_assert(MAXGTRIDSIZE + MAXBQUALSIZE == XIDDATASIZE,
+                "gtrid and bqual don't fit xid data");
+  DBUG_ASSERT(mach_read_from_2(TRX_UNDO_SEG_HDR + TRX_UNDO_LAST_LOG +
+                               block->page.frame) == offset);
+
+  byte *const hdr= block->page.frame + offset;
+
+  mtr->write<4,mtr_t::MAYBE_NOP>(*block, hdr + TRX_UNDO_XA_FORMAT,
+                                 static_cast<uint32_t>(xid.formatID));
+  mtr->write<4,mtr_t::MAYBE_NOP>(*block, hdr + TRX_UNDO_XA_TRID_LEN,
+                                 static_cast<uint32_t>(xid.gtrid_length));
+  mtr->write<4,mtr_t::MAYBE_NOP>(*block, hdr + TRX_UNDO_XA_BQUAL_LEN,
+                                 static_cast<uint32_t>(xid.bqual_length));
+
+  const ulint xid_length= static_cast<ulint>(xid.gtrid_length
+                                             + xid.bqual_length);
+  mtr->memcpy<mtr_t::MAYBE_NOP>(*block, hdr + TRX_UNDO_XA_XID,
+                                xid.data, xid_length);
+
+  if (UNIV_LIKELY(xid_length < XIDDATASIZE))
+    mtr->memset(block, offset + TRX_UNDO_XA_XID + xid_length,
+                XIDDATASIZE - xid_length, 0);
+}
+
+/** Read an X/Open XA Transaction Identification (XID) from an undo log
+header. The full XIDDATASIZE bytes of the data area are copied.
+@param[in]	log_hdr	undo log header
+@param[out]	xid	transaction identifier to fill in */
+static void trx_undo_read_xid(const trx_ulogf_t *log_hdr, XID *xid)
+{
+  const auto format_id= mach_read_from_4(log_hdr + TRX_UNDO_XA_FORMAT);
+  xid->formatID= static_cast<long>(format_id);
+
+  const auto gtrid_len= mach_read_from_4(log_hdr + TRX_UNDO_XA_TRID_LEN);
+  xid->gtrid_length= static_cast<long>(gtrid_len);
+
+  const auto bqual_len= mach_read_from_4(log_hdr + TRX_UNDO_XA_BQUAL_LEN);
+  xid->bqual_length= static_cast<long>(bqual_len);
+
+  memcpy(xid->data, log_hdr + TRX_UNDO_XA_XID, XIDDATASIZE);
+}
+
+/** Allocate an undo log page.
+On success, the new page is appended to the page list of the undo log
+segment, undo->last_page_no is set to it, and undo->size and
+rseg->curr_size are incremented. rseg->latch is held in exclusive mode
+for the duration of the call and released before returning.
+@param[in,out] undo undo log
+@param[in,out] mtr mini-transaction that does not hold any page latch
+@param[out] err error code
+@return X-latched block if success
+@retval nullptr on failure */
+buf_block_t *trx_undo_add_page(trx_undo_t *undo, mtr_t *mtr, dberr_t *err)
+{
+ buf_block_t *new_block= nullptr;
+ uint32_t n_reserved;
+
+ /* When we add a page to an undo log, this is analogous to
+ a pessimistic insert in a B-tree, and we must reserve the
+ counterpart of the tree latch, which is the rseg latch. */
+
+ trx_rseg_t *rseg= undo->rseg;
+ rseg->latch.wr_lock(SRW_LOCK_CALL);
+
+ /* The undo log header page holds the file segment header and the
+ base node of the page list. */
+ buf_block_t *header_block=
+ buf_page_get_gen(page_id_t{rseg->space->id, undo->hdr_page_no},
+ 0, RW_X_LATCH, nullptr, BUF_GET, mtr, err);
+ if (!header_block)
+ goto func_exit;
+ *err= fsp_reserve_free_extents(&n_reserved, rseg->space, 1, FSP_UNDO, mtr);
+
+ if (UNIV_UNLIKELY(*err != DB_SUCCESS))
+ goto func_exit;
+
+ /* Allocate a page from the undo log segment, using
+ undo->top_page_no + 1 as the hint and FSP_UP as the direction. */
+ new_block=
+ fseg_alloc_free_page_general(TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER +
+ header_block->page.frame,
+ undo->top_page_no + 1, FSP_UP, true,
+ mtr, mtr, err);
+ rseg->space->release_free_extents(n_reserved);
+
+ if (!new_block)
+ goto func_exit;
+
+ undo->last_page_no= new_block->page.id().page_no();
+
+ mtr->undo_create(*new_block);
+ trx_undo_page_init(*new_block);
+ *err= flst_add_last(header_block, TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST,
+ new_block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE, mtr);
+ /* If the page could not be appended to the list, report failure;
+ the size counters are only updated on success. */
+ if (UNIV_UNLIKELY(*err != DB_SUCCESS))
+ new_block= nullptr;
+ else
+ {
+ undo->size++;
+ rseg->curr_size++;
+ }
+
+func_exit:
+ rseg->latch.wr_unlock();
+ return new_block;
+}
+
+/** Free an undo log page that is not the header page.
+@param[in,out]	rseg		rollback segment
+@param[in]	in_history	whether the undo log is in the history list
+@param[in]	hdr_page_no	header page number
+@param[in]	page_no		page number to free: must not be the
+				header page
+@param[in,out]	mtr		mtr which does not have a latch to any
+				undo log page; the caller must have
+				reserved the rollback segment mutex
+@param[out]	err		error code
+@return last page number in remaining log
+@retval FIL_NULL on failure */
+static uint32_t trx_undo_free_page(trx_rseg_t *rseg, bool in_history,
+                                   uint32_t hdr_page_no, uint32_t page_no,
+                                   mtr_t *mtr, dberr_t *err)
+{
+  ut_a(hdr_page_no != page_no);
+
+  const auto space_id= rseg->space->id;
+
+  buf_block_t *undo_block=
+    buf_page_get_gen(page_id_t(space_id, page_no), 0, RW_X_LATCH, nullptr,
+                     BUF_GET, mtr, err);
+  if (UNIV_UNLIKELY(!undo_block))
+    return FIL_NULL;
+
+  buf_block_t *header_block=
+    buf_page_get_gen(page_id_t(space_id, hdr_page_no), 0, RW_X_LATCH,
+                     nullptr, BUF_GET, mtr, err);
+  if (UNIV_UNLIKELY(!header_block))
+    return FIL_NULL;
+
+  /* Unlink the page from the page list of the undo log segment. */
+  *err= flst_remove(header_block, TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST,
+                    undo_block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE,
+                    mtr);
+  if (UNIV_UNLIKELY(*err != DB_SUCCESS))
+    return FIL_NULL;
+
+  /* Return the page to the file segment. */
+  *err= fseg_free_page(TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER
+                       + header_block->page.frame,
+                       rseg->space, page_no, mtr);
+  if (UNIV_UNLIKELY(*err != DB_SUCCESS))
+    return FIL_NULL;
+
+  buf_page_free(rseg->space, page_no, mtr);
+
+  const fil_addr_t last_addr=
+    flst_get_last(TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST
+                  + header_block->page.frame);
+  rseg->curr_size--;
+
+  if (in_history)
+  {
+    /* The page was counted in TRX_RSEG_HISTORY_SIZE of the rollback
+    segment header: decrement that counter. */
+    buf_block_t *rseg_header= rseg->get(mtr, err);
+    if (!rseg_header)
+      return FIL_NULL;
+
+    byte *rseg_hist_size= TRX_RSEG + TRX_RSEG_HISTORY_SIZE
+      + rseg_header->page.frame;
+    uint32_t hist_size= mach_read_from_4(rseg_hist_size);
+    ut_ad(hist_size > 0);
+    mtr->write<4>(*rseg_header, rseg_hist_size, hist_size - 1);
+  }
+
+  return last_addr.page;
+}
+
+/** Free the last undo log page. The caller must hold the rseg mutex.
+undo->size is decremented and undo->last_page_no is replaced by the
+value returned by trx_undo_free_page().
+@param[in,out]	undo	undo log
+@param[in,out]	mtr	mini-transaction that does not hold any undo log page
+			or that has allocated the undo log page
+@return error code */
+dberr_t trx_undo_free_last_page(trx_undo_t *undo, mtr_t *mtr)
+{
+  ut_ad(undo->hdr_page_no != undo->last_page_no);
+  ut_ad(undo->size > 0);
+
+  dberr_t err;
+  --undo->size;
+
+  const uint32_t new_last_page_no=
+    trx_undo_free_page(undo->rseg, false, undo->hdr_page_no,
+                       undo->last_page_no, mtr, &err);
+  undo->last_page_no= new_last_page_no;
+
+  return err;
+}
+
+/** Truncate the tail of an undo log during rollback.
+Each iteration of the loop runs in its own mini-transaction, holding
+undo.rseg->latch exclusively, and either frees the last page of the
+log (and repeats) or finishes by adjusting TRX_UNDO_PAGE_FREE on the
+last page.
+@param[in,out] undo undo log
+@param[in] limit all undo logs after this limit will be discarded
+@param[in] is_temp whether this is temporary undo log
+@return error code */
+static dberr_t trx_undo_truncate_end(trx_undo_t &undo, undo_no_t limit,
+ bool is_temp)
+{
+ ut_ad(is_temp == !undo.rseg->is_persistent());
+
+ for (mtr_t mtr;;)
+ {
+ mtr.start();
+ if (is_temp)
+ mtr.set_log_mode(MTR_LOG_NO_REDO);
+
+ /* Lowest record on the last page that has to be discarded, if any */
+ trx_undo_rec_t *trunc_here= nullptr;
+ undo.rseg->latch.wr_lock(SRW_LOCK_CALL);
+ dberr_t err;
+ buf_block_t *undo_block=
+ buf_page_get_gen(page_id_t{undo.rseg->space->id, undo.last_page_no},
+ 0, RW_X_LATCH, nullptr, BUF_GET, &mtr, &err);
+ if (UNIV_UNLIKELY(!undo_block))
+ goto func_exit;
+
+ /* Scan the last page backwards, starting from its last record. */
+ for (trx_undo_rec_t *rec=
+ trx_undo_page_get_last_rec(undo_block,
+ undo.hdr_page_no, undo.hdr_offset);
+ rec; )
+ {
+ /* A record below the limit is kept, and so is everything before it. */
+ if (trx_undo_rec_get_undo_no(rec) < limit)
+ goto func_exit;
+ /* Truncate at least this record off, maybe more */
+ trunc_here= rec;
+ rec= trx_undo_page_get_prev_rec(undo_block, rec,
+ undo.hdr_page_no, undo.hdr_offset);
+ }
+
+ /* No record on this page is below the limit. Unless this is the
+ header page, free the whole page and continue with the new last page. */
+ if (undo.last_page_no != undo.hdr_page_no)
+ {
+ err= trx_undo_free_last_page(&undo, &mtr);
+ if (UNIV_UNLIKELY(err != DB_SUCCESS))
+ goto func_exit;
+ undo.rseg->latch.wr_unlock();
+ mtr.commit();
+ continue;
+ }
+
+func_exit:
+ undo.rseg->latch.wr_unlock();
+
+ /* Discard the records from trunc_here onwards by moving
+ TRX_UNDO_PAGE_FREE back to the page offset of trunc_here. */
+ if (trunc_here && err == DB_SUCCESS)
+ mtr.write<2>(*undo_block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE +
+ undo_block->page.frame,
+ ulint(trunc_here - undo_block->page.frame));
+
+ mtr.commit();
+ return err;
+ }
+}
+
+/** Try to truncate the undo logs of a transaction.
+The undo log of trx.rsegs.m_redo is truncated first, then that of
+trx.rsegs.m_noredo; both are truncated at trx.undo_no. The first
+error stops the processing.
+@param trx transaction
+@return error code */
+dberr_t trx_undo_try_truncate(const trx_t &trx)
+{
+  trx_undo_t *const redo_undo= trx.rsegs.m_redo.undo;
+  if (redo_undo)
+  {
+    ut_ad(redo_undo->rseg == trx.rsegs.m_redo.rseg);
+    const dberr_t err= trx_undo_truncate_end(*redo_undo, trx.undo_no, false);
+    if (err)
+      return err;
+  }
+
+  trx_undo_t *const noredo_undo= trx.rsegs.m_noredo.undo;
+  if (noredo_undo)
+  {
+    ut_ad(noredo_undo->rseg == trx.rsegs.m_noredo.rseg);
+    const dberr_t err= trx_undo_truncate_end(*noredo_undo, trx.undo_no, true);
+    if (err)
+      return err;
+  }
+
+  return DB_SUCCESS;
+}
+
+/** Truncate the head of an undo log.
+NOTE that only whole pages are freed; the header page is not
+freed, but emptied, if all the records there are below the limit.
+Each pass of the loop runs in its own mini-transaction and handles the
+page that contains the first record of the log.
+@param[in,out] rseg rollback segment
+@param[in] hdr_page_no header page number
+@param[in] hdr_offset header offset on the page
+@param[in] limit first undo number to preserve
+(everything below the limit will be truncated)
+@return error code */
+dberr_t
+trx_undo_truncate_start(
+ trx_rseg_t* rseg,
+ uint32_t hdr_page_no,
+ uint16_t hdr_offset,
+ undo_no_t limit)
+{
+ trx_undo_rec_t* rec;
+ trx_undo_rec_t* last_rec;
+ mtr_t mtr;
+
+ /* With limit 0, no undo number can be below the limit. */
+ if (!limit) {
+ return DB_SUCCESS;
+ }
+loop:
+ mtr_start(&mtr);
+
+ /* Do not write redo log for a rollback segment that is not persistent. */
+ if (!rseg->is_persistent()) {
+ mtr.set_log_mode(MTR_LOG_NO_REDO);
+ }
+
+ dberr_t err;
+ const buf_block_t* undo_page;
+ rec = trx_undo_get_first_rec(*rseg->space, hdr_page_no, hdr_offset,
+ RW_X_LATCH, undo_page, &mtr, &err);
+ if (rec == NULL) {
+ /* Already empty */
+done:
+ mtr.commit();
+ return err;
+ }
+
+ /* If the last record on this page is not below the limit, the page
+ must be kept, and we are done. */
+ last_rec = trx_undo_page_get_last_rec(undo_page, hdr_page_no,
+ hdr_offset);
+ if (trx_undo_rec_get_undo_no(last_rec) >= limit) {
+ goto done;
+ }
+
+ if (undo_page->page.id().page_no() == hdr_page_no) {
+ /* Empty the header page by moving TRX_UNDO_LOG_START to the end
+ of the records of this log: TRX_UNDO_NEXT_LOG if it is nonzero,
+ else TRX_UNDO_PAGE_FREE. */
+ uint16_t end = mach_read_from_2(hdr_offset + TRX_UNDO_NEXT_LOG
+ + undo_page->page.frame);
+ if (end == 0) {
+ end = mach_read_from_2(TRX_UNDO_PAGE_HDR
+ + TRX_UNDO_PAGE_FREE
+ + undo_page->page.frame);
+ }
+
+ mtr.write<2>(*undo_page, undo_page->page.frame + hdr_offset
+ + TRX_UNDO_LOG_START, end);
+ } else {
+ /* All records on this non-header page are below the limit:
+ free the whole page. */
+ trx_undo_free_page(rseg, true, hdr_page_no,
+ undo_page->page.id().page_no(), &mtr, &err);
+ if (err != DB_SUCCESS) {
+ goto done;
+ }
+ }
+
+ mtr.commit();
+ goto loop;
+}
+
+/*========== UNDO LOG MEMORY COPY INITIALIZATION =====================*/
+
+/** Read an undo log when starting up the database.
+Fetches the undo log segment header page, validates the page type, the
+offset of the last undo log header, the segment state and the stored
+TRX_ID/TRX_NO, raises rseg->needs_purge if needed, and builds the
+in-memory trx_undo_t. The object is appended to rseg->undo_cached when
+the state is TRX_UNDO_CACHED and to rseg->undo_list otherwise.
+Every exit path, including allocation failure, commits the local
+mini-transaction.
+@param[in,out] rseg rollback segment
+@param[in] id rollback segment slot
+@param[in] page_no undo log segment page number
+@return the undo log
+@retval nullptr on error */
+trx_undo_t *
+trx_undo_mem_create_at_db_start(trx_rseg_t *rseg, ulint id, uint32_t page_no)
+{
+  mtr_t mtr;
+  XID xid;
+
+  ut_ad(id < TRX_RSEG_N_SLOTS);
+
+  mtr.start();
+  const buf_block_t* block = buf_page_get(
+    page_id_t(rseg->space->id, page_no), 0, RW_X_LATCH, &mtr);
+  if (UNIV_UNLIKELY(!block)) {
+    /* Common error exit: finish the mini-transaction and
+    report failure. */
+corrupted:
+    mtr.commit();
+    return nullptr;
+  }
+
+  const uint16_t type = mach_read_from_2(TRX_UNDO_PAGE_HDR
+                                         + TRX_UNDO_PAGE_TYPE
+                                         + block->page.frame);
+  if (UNIV_UNLIKELY(type > 2)) {
+corrupted_type:
+    sql_print_error("InnoDB: unsupported undo header type %u",
+                    type);
+    goto corrupted;
+  }
+
+  /* The last undo log header must lie after the segment header
+  and leave room for at least an old-format log header. */
+  uint16_t offset = mach_read_from_2(TRX_UNDO_SEG_HDR + TRX_UNDO_LAST_LOG
+                                     + block->page.frame);
+  if (offset < TRX_UNDO_SEG_HDR + TRX_UNDO_SEG_HDR_SIZE ||
+      offset >= srv_page_size - TRX_UNDO_LOG_OLD_HDR_SIZE) {
+    sql_print_error("InnoDB: invalid undo header offset %u",
+                    offset);
+    goto corrupted;
+  }
+
+  const trx_ulogf_t* const undo_header = block->page.frame + offset;
+  uint16_t state = mach_read_from_2(TRX_UNDO_SEG_HDR + TRX_UNDO_STATE
+                                    + block->page.frame);
+
+  /* Identifiers that use any of the 16 most significant bits
+  are rejected as corrupted. */
+  const trx_id_t trx_id= mach_read_from_8(undo_header + TRX_UNDO_TRX_ID);
+  if (trx_id >> 48) {
+    sql_print_error("InnoDB: corrupted TRX_ID %llx", trx_id);
+    goto corrupted;
+  }
+  /* We will increment rseg->needs_purge, like trx_undo_reuse_cached()
+  would do it, to avoid trouble on rollback or XA COMMIT. */
+  trx_id_t trx_no = trx_id + 1;
+
+  switch (state) {
+  case TRX_UNDO_ACTIVE:
+  case TRX_UNDO_PREPARED:
+    if (UNIV_LIKELY(type != 1)) {
+      break;
+    }
+    sql_print_error("InnoDB: upgrade from older version than"
+                    " MariaDB 10.3 requires clean shutdown");
+    goto corrupted;
+  default:
+    sql_print_error("InnoDB: unsupported undo header state %u",
+                    state);
+    goto corrupted;
+  case TRX_UNDO_CACHED:
+    if (UNIV_UNLIKELY(type != 0)) {
+      /* This undo page was not updated by MariaDB
+      10.3 or later. The TRX_UNDO_TRX_NO field may
+      contain garbage. */
+      break;
+    }
+    goto read_trx_no;
+  case TRX_UNDO_TO_PURGE:
+    if (UNIV_UNLIKELY(type == 1)) {
+      goto corrupted_type;
+    }
+  read_trx_no:
+    trx_no = mach_read_from_8(TRX_UNDO_TRX_NO + undo_header);
+    if (trx_no >> 48) {
+      sql_print_error("InnoDB: corrupted TRX_NO %llx",
+                      trx_no);
+      goto corrupted;
+    }
+    /* Never let the serialisation number fall below
+    the transaction identifier. */
+    if (trx_no < trx_id) {
+      trx_no = trx_id;
+    }
+  }
+
+  /* Read X/Open XA transaction identification if it exists, or
+  set it to NULL. */
+
+  if (undo_header[TRX_UNDO_XID_EXISTS]) {
+    trx_undo_read_xid(undo_header, &xid);
+  } else {
+    xid.null();
+  }
+
+  if (trx_no > rseg->needs_purge) {
+    rseg->needs_purge = trx_no;
+  }
+
+  trx_undo_t* undo = trx_undo_mem_create(
+    rseg, id, trx_id, &xid, page_no, offset);
+  if (!undo) {
+    /* Out of memory. Leave through the common error exit:
+    a plain return here would skip mtr.commit() and leave
+    the mini-transaction started above unfinished. */
+    goto corrupted;
+  }
+
+  undo->dict_operation = undo_header[TRX_UNDO_DICT_TRANS];
+  undo->size = flst_get_len(TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST
+                            + block->page.frame);
+
+  fil_addr_t last_addr = flst_get_last(
+    TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST + block->page.frame);
+
+  undo->last_page_no = last_addr.page;
+  undo->top_page_no = last_addr.page;
+
+  const buf_block_t* last = buf_page_get(
+    page_id_t(rseg->space->id, undo->last_page_no), 0,
+    RW_X_LATCH, &mtr);
+
+  if (UNIV_UNLIKELY(!last)) {
+    /* The object is not yet on any list: free it here. */
+    ut_free(undo);
+    goto corrupted;
+  }
+
+  /* Locate the topmost undo record, if the log has any. */
+  if (const trx_undo_rec_t* rec = trx_undo_page_get_last_rec(
+        last, page_no, offset)) {
+    undo->top_offset = static_cast<uint16_t>(
+      rec - last->page.frame);
+    undo->top_undo_no = trx_undo_rec_get_undo_no(rec);
+    ut_ad(!undo->empty());
+  } else {
+    undo->top_undo_no = IB_ID_MAX;
+    ut_ad(undo->empty());
+  }
+
+  undo->state = state;
+
+  if (state != TRX_UNDO_CACHED) {
+    UT_LIST_ADD_LAST(rseg->undo_list, undo);
+  } else {
+    UT_LIST_ADD_LAST(rseg->undo_cached, undo);
+  }
+
+  mtr.commit();
+  return undo;
+}
+
+/********************************************************************//**
+Allocates an undo log memory object and fills in its initial state:
+an active, empty log of size 1 whose header page, last page and top
+page are all page_no.
+@return own: the undo log memory object
+@retval NULL if the allocation failed */
+static
+trx_undo_t*
+trx_undo_mem_create(
+/*================*/
+  trx_rseg_t* rseg, /*!< in: rollback segment memory object */
+  ulint id, /*!< in: slot index within rseg */
+  trx_id_t trx_id, /*!< in: id of the trx for which the undo log
+                   is created */
+  const XID* xid, /*!< in: X/Open transaction identification */
+  uint32_t page_no,/*!< in: undo log header page number */
+  uint16_t offset) /*!< in: undo log header byte offset on page */
+{
+  ut_a(id < TRX_RSEG_N_SLOTS);
+
+  trx_undo_t* log = static_cast<trx_undo_t*>(
+    ut_malloc_nokey(sizeof(trx_undo_t)));
+
+  if (!log) {
+    return(NULL);
+  }
+
+  /* Identity of the log and of its owner */
+  log->id = id;
+  log->rseg = rseg;
+  log->trx_id = trx_id;
+  log->xid = *xid;
+
+  /* Logical state */
+  log->state = TRX_UNDO_ACTIVE;
+  log->dict_operation = FALSE;
+
+  /* Location: everything is on the single header page. */
+  log->hdr_page_no = page_no;
+  log->hdr_offset = offset;
+  log->last_page_no = page_no;
+  log->top_page_no = page_no;
+  log->size = 1;
+
+  /* IB_ID_MAX as the top undo number denotes an empty log. */
+  log->top_undo_no = IB_ID_MAX;
+  log->guess_block = NULL;
+  ut_ad(log->empty());
+
+  return(log);
+}
+
+/********************************************************************//**
+Resets a cached undo log memory object so that it can serve a new
+transaction: the log becomes active and empty, owned by trx_id, with its
+header at the given offset. The slot, rollback segment and page numbers
+of the object are left as they were. */
+static
+void
+trx_undo_mem_init_for_reuse(
+/*========================*/
+  trx_undo_t* undo, /*!< in: undo log to init */
+  trx_id_t trx_id, /*!< in: id of the trx for which the undo log
+                   is created */
+  const XID* xid, /*!< in: X/Open XA transaction identification*/
+  uint16_t offset) /*!< in: undo log header byte offset on page */
+{
+  ut_a(undo->id < TRX_RSEG_N_SLOTS);
+
+  /* New owner */
+  undo->trx_id = trx_id;
+  undo->xid = *xid;
+
+  /* Fresh logical state */
+  undo->state = TRX_UNDO_ACTIVE;
+  undo->dict_operation = FALSE;
+
+  /* New header position; no undo records yet */
+  undo->hdr_offset = offset;
+  undo->top_undo_no = IB_ID_MAX;
+  ut_ad(undo->empty());
+}
+
+/** Create an undo log.
+Obtains a block through rseg->get(), creates an undo log segment and an
+undo log header in it, and allocates the matching memory object. For the
+persistent rollback segment of the transaction, a data dictionary
+operation is additionally flagged in the memory object and in the header.
+@param[in,out] trx transaction
+@param[in,out] rseg rollback segment
+@param[out] undo undo log object
+@param[out] err error code
+@param[in,out] mtr mini-transaction
+@return undo log block
+@retval NULL on failure */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+buf_block_t*
+trx_undo_create(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** undo,
+                dberr_t* err, mtr_t* mtr)
+{
+  ulint slot;
+  buf_block_t* undo_block = rseg->get(mtr, err);
+
+  if (!undo_block) {
+    return NULL;
+  }
+
+  undo_block = trx_undo_seg_create(rseg->space, undo_block, &slot,
+                                   err, mtr);
+
+  if (!undo_block) {
+    return NULL;
+  }
+
+  rseg->curr_size++;
+
+  uint16_t hdr_offset = trx_undo_header_create(undo_block, trx->id, mtr);
+
+  *undo = trx_undo_mem_create(rseg, slot, trx->id, &trx->xid,
+                              undo_block->page.id().page_no(),
+                              hdr_offset);
+
+  if (*undo == NULL) {
+    *err = DB_OUT_OF_MEMORY;
+    /* FIXME: this will not free the undo block to the file */
+    return NULL;
+  }
+
+  if (rseg != trx->rsegs.m_redo.rseg) {
+    /* Not the persistent rollback segment of the transaction:
+    the dictionary operation marker below does not apply. */
+    return undo_block;
+  }
+
+  if (trx->dict_operation) {
+    (*undo)->dict_operation = true;
+    mtr->write<1,mtr_t::MAYBE_NOP>(*undo_block,
+                                   undo_block->page.frame
+                                   + hdr_offset
+                                   + TRX_UNDO_DICT_TRANS, 1U);
+    mtr->write<8,mtr_t::MAYBE_NOP>(*undo_block,
+                                   undo_block->page.frame
+                                   + hdr_offset
+                                   + TRX_UNDO_TABLE_ID, 0U);
+  }
+
+  *err = DB_SUCCESS;
+  return undo_block;
+}
+
+/*================ UNDO LOG ASSIGNMENT AND CLEANUP =====================*/
+
+/** Reuse a cached undo log block.
+Takes the first entry of rseg->undo_cached, if any, fetches its header
+page, writes a new undo log header for the transaction and reinitializes
+the memory object. The entry is removed from the cache list only after
+its header page has been fetched successfully.
+@param[in,out] trx transaction
+@param[in,out] rseg rollback segment
+@param[out] pundo the undo log memory object
+@param[in,out] mtr mini-transaction
+@param[out] err error code
+@return the undo log block
+@retval NULL if none cached, or if the header page could not be fetched */
+static
+buf_block_t*
+trx_undo_reuse_cached(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** pundo,
+                      mtr_t* mtr, dberr_t *err)
+{
+  ut_ad(rseg->is_persistent());
+  ut_ad(rseg->is_referenced());
+  ut_ad(rseg == trx->rsegs.m_redo.rseg);
+
+  if (rseg->needs_purge <= trx->id) {
+    /* trx_purge_truncate_history() checks
+    purge_sys.sees(rseg.needs_purge)
+    so we need to compensate for that.
+    The rseg->needs_purge after crash
+    recovery would be at least trx->id + 1,
+    because that is the minimum possible value
+    assigned by trx_serialise() on commit. */
+    rseg->needs_purge = trx->id + 1;
+  }
+
+  trx_undo_t* cached = UT_LIST_GET_FIRST(rseg->undo_cached);
+
+  if (cached == NULL) {
+    return NULL;
+  }
+
+  ut_ad(cached->size == 1);
+  ut_ad(cached->id < TRX_RSEG_N_SLOTS);
+
+  buf_block_t* hdr_block = buf_page_get_gen(
+    page_id_t(cached->rseg->space->id, cached->hdr_page_no),
+    0, RW_X_LATCH, nullptr, BUF_GET, mtr, err);
+
+  if (hdr_block == NULL) {
+    return NULL;
+  }
+
+  UT_LIST_REMOVE(rseg->undo_cached, cached);
+
+  *pundo = cached;
+
+  uint16_t hdr_offset = trx_undo_header_create(hdr_block, trx->id, mtr);
+
+  trx_undo_mem_init_for_reuse(cached, trx->id, &trx->xid, hdr_offset);
+
+  if (trx->dict_operation) {
+    /* Flag the dictionary operation both in memory and in
+    the undo log header. */
+    cached->dict_operation = TRUE;
+    mtr->write<1,mtr_t::MAYBE_NOP>(*hdr_block,
+                                   hdr_block->page.frame
+                                   + hdr_offset
+                                   + TRX_UNDO_DICT_TRANS, 1U);
+    mtr->write<8,mtr_t::MAYBE_NOP>(*hdr_block,
+                                   hdr_block->page.frame
+                                   + hdr_offset
+                                   + TRX_UNDO_TABLE_ID, 0U);
+  }
+
+  return hdr_block;
+}
+
+/** Assign an undo log for a persistent transaction.
+If the transaction already has a persistent undo log, only its last page
+is fetched. Otherwise, while holding rseg->latch exclusively, a cached
+undo log is reused or a new one is created, and the result is added to
+the front of rseg->undo_list.
+@param[in,out] trx transaction
+@param[out] err error code
+@param[in,out] mtr mini-transaction
+@return the undo log block
+@retval NULL on error */
+buf_block_t*
+trx_undo_assign(trx_t* trx, dberr_t* err, mtr_t* mtr)
+{
+  ut_ad(mtr->get_log_mode() == MTR_LOG_ALL);
+
+  if (trx_undo_t* assigned = trx->rsegs.m_redo.undo) {
+    return buf_page_get_gen(
+      page_id_t(assigned->rseg->space->id,
+                assigned->last_page_no),
+      0, RW_X_LATCH, assigned->guess_block,
+      BUF_GET, mtr, err);
+  }
+
+  *err = DB_SUCCESS;
+  trx_rseg_t* rseg = trx->rsegs.m_redo.rseg;
+
+  rseg->latch.wr_lock(SRW_LOCK_CALL);
+
+  /* Prefer a cached undo log; create a new one otherwise. */
+  buf_block_t* block = trx_undo_reuse_cached(
+    trx, rseg, &trx->rsegs.m_redo.undo, mtr, err);
+
+  if (!block) {
+    block = trx_undo_create(trx, rseg, &trx->rsegs.m_redo.undo,
+                            err, mtr);
+    ut_ad(!block == (*err != DB_SUCCESS));
+  }
+
+  if (block) {
+    UT_LIST_ADD_FIRST(rseg->undo_list, trx->rsegs.m_redo.undo);
+  }
+
+  rseg->latch.wr_unlock();
+  return block;
+}
+
+/** Assign an undo log for a transaction.
+If *undo is already set, only the last page of that log is fetched.
+Otherwise, while holding rseg->latch exclusively, a cached undo log is
+reused (persistent logs only) or a new one is created, and the result is
+added to the front of rseg->undo_list.
+@tparam is_temp whether this is temporary undo log
+@param[in,out] trx transaction
+@param[in] rseg rollback segment
+@param[out] undo the undo log
+@param[in,out] mtr mini-transaction
+@param[out] err error code
+@return the undo log block
+@retval nullptr on error */
+template<bool is_temp>
+buf_block_t*
+trx_undo_assign_low(trx_t *trx, trx_rseg_t *rseg, trx_undo_t **undo,
+                    mtr_t *mtr, dberr_t *err)
+{
+  ut_ad(is_temp == (rseg == trx->rsegs.m_noredo.rseg));
+  ut_ad(is_temp || rseg == trx->rsegs.m_redo.rseg);
+  ut_ad(undo == (is_temp
+                 ? &trx->rsegs.m_noredo.undo
+                 : &trx->rsegs.m_redo.undo));
+  ut_ad(mtr->get_log_mode()
+        == (is_temp ? MTR_LOG_NO_REDO : MTR_LOG_ALL));
+
+  if (trx_undo_t* assigned = *undo) {
+    return buf_page_get_gen(
+      page_id_t(rseg->space->id, assigned->last_page_no),
+      0, RW_X_LATCH, assigned->guess_block,
+      BUF_GET, mtr, err);
+  }
+
+  DBUG_EXECUTE_IF(
+    "ib_create_table_fail_too_many_trx",
+    *err = DB_TOO_MANY_CONCURRENT_TRXS; return NULL;
+  );
+
+  *err = DB_SUCCESS;
+  rseg->latch.wr_lock(SRW_LOCK_CALL);
+
+  buf_block_t* block = nullptr;
+
+  if (!is_temp) {
+    /* Prefer a cached persistent undo log. */
+    block = trx_undo_reuse_cached(trx, rseg, undo, mtr, err);
+  } else {
+    /* Temporary undo logs are expected never to be cached. */
+    ut_ad(!UT_LIST_GET_LEN(rseg->undo_cached));
+  }
+
+  if (!block) {
+    block = trx_undo_create(trx, rseg, undo, err, mtr);
+    ut_ad(!block == (*err != DB_SUCCESS));
+  }
+
+  if (block) {
+    UT_LIST_ADD_FIRST(rseg->undo_list, *undo);
+  }
+
+  rseg->latch.wr_unlock();
+  return block;
+}
+
+template buf_block_t*
+trx_undo_assign_low<false>(trx_t *trx, trx_rseg_t *rseg, trx_undo_t **undo,
+                           mtr_t *mtr, dberr_t *err);
+template buf_block_t*
+trx_undo_assign_low<true>(trx_t *trx, trx_rseg_t *rseg, trx_undo_t **undo,
+                          mtr_t *mtr, dberr_t *err);
+
+/** Set the state of the undo log segment at a XA PREPARE or XA ROLLBACK.
+On XA ROLLBACK only the TRX_UNDO_STATE field on the header page is
+written back to TRX_UNDO_ACTIVE. On XA PREPARE the memory object is
+switched to TRX_UNDO_PREPARED with the XID of the transaction, and the
+state, the TRX_UNDO_XID_EXISTS flag and the XID are written to the page.
+The function returns nothing; if the header page cannot be fetched it
+returns without writing (and asserts that this is a rollback).
+@param[in,out] trx transaction
+@param[in,out] undo undo log
+@param[in] rollback false=XA PREPARE, true=XA ROLLBACK
+@param[in,out] mtr mini-transaction */
+void trx_undo_set_state_at_prepare(trx_t *trx, trx_undo_t *undo, bool rollback,
+                                   mtr_t *mtr)
+{
+  ut_a(undo->id < TRX_RSEG_N_SLOTS);
+
+  buf_block_t* hdr_block = buf_page_get(
+    page_id_t(undo->rseg->space->id, undo->hdr_page_no), 0,
+    RW_X_LATCH, mtr);
+
+  if (UNIV_UNLIKELY(!hdr_block)) {
+    /* In case of !rollback the undo header page
+    corruption would leave the transaction object in an
+    unexpected (active) state. */
+    ut_a(rollback);
+    return;
+  }
+
+  if (!rollback) {
+    /* XA PREPARE: update the memory object first ... */
+    ut_ad(undo->state == TRX_UNDO_ACTIVE);
+    undo->state = TRX_UNDO_PREPARED;
+    undo->xid = trx->xid;
+
+    /* ... and then mirror the state and the XID on the page. */
+    mtr->write<2>(*hdr_block, TRX_UNDO_SEG_HDR + TRX_UNDO_STATE
+                  + hdr_block->page.frame, undo->state);
+
+    uint16_t log_hdr = mach_read_from_2(
+      TRX_UNDO_SEG_HDR + TRX_UNDO_LAST_LOG
+      + hdr_block->page.frame);
+
+    mtr->write<1>(*hdr_block,
+                  hdr_block->page.frame + log_hdr
+                  + TRX_UNDO_XID_EXISTS, 1U);
+
+    trx_undo_write_xid(hdr_block, log_hdr, undo->xid, mtr);
+  } else {
+    /* XA ROLLBACK: only the state field on the page is reset. */
+    ut_ad(undo->state == TRX_UNDO_PREPARED);
+    mtr->write<2>(*hdr_block, TRX_UNDO_SEG_HDR + TRX_UNDO_STATE
+                  + hdr_block->page.frame, TRX_UNDO_ACTIVE);
+  }
+}
+
+/** At shutdown, frees the undo logs of a transaction.
+Each existing undo log memory object is removed from the undo_list of
+its rollback segment, freed, and the pointer in trx is reset to NULL.
+A persistent log that is not in TRX_UNDO_PREPARED state is tolerated only
+under the server conditions asserted below; a temporary log must be in
+TRX_UNDO_PREPARED state.
+@param[in,out] trx transaction */
+void trx_undo_free_at_shutdown(trx_t *trx)
+{
+  if (trx_undo_t*& redo_undo = trx->rsegs.m_redo.undo) {
+    switch (redo_undo->state) {
+    case TRX_UNDO_CACHED:
+    case TRX_UNDO_TO_PURGE:
+      ut_ad(trx_state_eq(trx,
+                         TRX_STATE_COMMITTED_IN_MEMORY));
+      /* fall through */
+    case TRX_UNDO_ACTIVE:
+      /* trx_t::commit_state() assigns
+      trx->state = TRX_STATE_COMMITTED_IN_MEMORY. */
+      ut_a(!srv_was_started
+           || srv_read_only_mode
+           || srv_force_recovery >= SRV_FORCE_NO_TRX_UNDO
+           || srv_fast_shutdown);
+      break;
+    case TRX_UNDO_PREPARED:
+      /* Always acceptable at shutdown. */
+      break;
+    default:
+      ut_error;
+    }
+
+    UT_LIST_REMOVE(trx->rsegs.m_redo.rseg->undo_list, redo_undo);
+    ut_free(redo_undo);
+    redo_undo = NULL;
+  }
+
+  if (trx_undo_t*& noredo_undo = trx->rsegs.m_noredo.undo) {
+    ut_a(noredo_undo->state == TRX_UNDO_PREPARED);
+
+    UT_LIST_REMOVE(trx->rsegs.m_noredo.rseg->undo_list,
+                   noredo_undo);
+    ut_free(noredo_undo);
+    noredo_undo = NULL;
+  }
+}