Diffstat:
 storage/innobase/trx/trx0purge.cc | 247
 storage/innobase/trx/trx0rseg.cc  |  62
 storage/innobase/trx/trx0trx.cc   |  24
 storage/innobase/trx/trx0undo.cc  |  42
 4 files changed, 195 insertions(+), 180 deletions(-)
diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc
index cff16d9c..f32f4de5 100644
--- a/storage/innobase/trx/trx0purge.cc
+++ b/storage/innobase/trx/trx0purge.cc
@@ -56,84 +56,6 @@ purge_sys_t purge_sys;
my_bool srv_purge_view_update_only_debug;
#endif /* UNIV_DEBUG */
-/** Sentinel value */
-static const TrxUndoRsegs NullElement;
-
-/** Default constructor */
-TrxUndoRsegsIterator::TrxUndoRsegsIterator()
- : m_rsegs(NullElement), m_iter(m_rsegs.begin())
-{
-}
-
-/** Sets the next rseg to purge in purge_sys.
-Executed in the purge coordinator thread.
-@retval false when nothing is to be purged
-@retval true when purge_sys.rseg->latch was locked */
-inline bool TrxUndoRsegsIterator::set_next()
-{
- ut_ad(!purge_sys.next_stored);
- mysql_mutex_lock(&purge_sys.pq_mutex);
-
- /* Only purge consumes events from the priority queue, user
- threads only produce the events. */
-
- /* Check if there are more rsegs to process in the
- current element. */
- if (m_iter != m_rsegs.end()) {
- /* We are still processing rollback segment from
- the same transaction and so expected transaction
- number shouldn't increase. Undo the increment of
- expected commit done by caller assuming rollback
- segments from given transaction are done. */
- purge_sys.tail.trx_no = (*m_iter)->last_trx_no();
- } else if (!purge_sys.purge_queue.empty()) {
- m_rsegs = purge_sys.purge_queue.top();
- purge_sys.purge_queue.pop();
- ut_ad(purge_sys.purge_queue.empty()
- || purge_sys.purge_queue.top() != m_rsegs);
- m_iter = m_rsegs.begin();
- } else {
- /* Queue is empty, reset iterator. */
- purge_sys.rseg = NULL;
- mysql_mutex_unlock(&purge_sys.pq_mutex);
- m_rsegs = NullElement;
- m_iter = m_rsegs.begin();
- return false;
- }
-
- purge_sys.rseg = *m_iter++;
- mysql_mutex_unlock(&purge_sys.pq_mutex);
-
- /* We assume in purge of externally stored fields that space
- id is in the range of UNDO tablespace space ids */
- ut_ad(purge_sys.rseg->space->id == TRX_SYS_SPACE
- || srv_is_undo_tablespace(purge_sys.rseg->space->id));
-
- purge_sys.rseg->latch.wr_lock(SRW_LOCK_CALL);
- trx_id_t last_trx_no = purge_sys.rseg->last_trx_no();
- purge_sys.hdr_offset = purge_sys.rseg->last_offset();
- purge_sys.hdr_page_no = purge_sys.rseg->last_page_no;
-
- /* Only the purge_coordinator_task will access this object
- purge_sys.rseg_iter, or any of purge_sys.hdr_page_no,
- purge_sys.tail.
- The field purge_sys.head and purge_sys.view are modified by
- purge_sys_t::clone_end_view()
- in the purge_coordinator_task
- while holding exclusive purge_sys.latch.
- The purge_sys.view may also be modified by
- purge_sys_t::wake_if_not_active() while holding exclusive
- purge_sys.latch.
- The purge_sys.head may be read by
- purge_truncation_callback(). */
- ut_ad(last_trx_no == m_rsegs.trx_no);
- ut_a(purge_sys.hdr_page_no != FIL_NULL);
- ut_a(purge_sys.tail.trx_no <= last_trx_no);
- purge_sys.tail.trx_no = last_trx_no;
-
- return(true);
-}
-
/** Build a purge 'query' graph. The actual purge is performed by executing
this query graph.
@return own: the query graph */
@@ -345,7 +267,8 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr)
that is known to be corrupted. */
ut_a(flst_add_first(rseg_header, TRX_RSEG + TRX_RSEG_HISTORY, undo_page,
uint16_t(page_offset(undo_header) +
- TRX_UNDO_HISTORY_NODE), mtr) == DB_SUCCESS);
+ TRX_UNDO_HISTORY_NODE), rseg->space->free_limit,
+ mtr) == DB_SUCCESS);
mtr->write<2>(*undo_page, TRX_UNDO_SEG_HDR + TRX_UNDO_STATE +
undo_page->page.frame, undo_state);
@@ -396,9 +319,7 @@ static void trx_purge_free_segment(buf_block_t *rseg_hdr, buf_block_t *block,
void purge_sys_t::rseg_enable(trx_rseg_t &rseg)
{
ut_ad(this == &purge_sys);
-#ifndef SUX_LOCK_GENERIC
- ut_ad(rseg.latch.is_write_locked());
-#endif
+ ut_ad(rseg.latch.have_wr());
uint8_t skipped= skipped_rseg;
ut_ad(skipped < TRX_SYS_N_RSEGS);
if (&rseg == &trx_sys.rseg_array[skipped])
@@ -437,6 +358,19 @@ inline dberr_t purge_sys_t::iterator::free_history_rseg(trx_rseg_t &rseg) const
mtr_t mtr;
bool freed= false;
uint32_t rseg_ref= 0;
+ const auto last_boffset= srv_page_size - TRX_UNDO_LOG_OLD_HDR_SIZE;
+ /* Technically, rseg.space->free_limit is not protected by
+ rseg.latch, which we are holding, but rseg.space->latch. The value
+ that we are reading may become stale (too small) if other pages are
+ being allocated in this tablespace, for other rollback
+ segments. Nothing can be added to this rseg without holding
+ rseg.latch, and hence we can validate the entire file-based list
+ against the limit that we are reading here.
+
+ Note: The read here may look like a data race. On none of our target
+ architectures this should be an actual problem, because the uint32_t
+ value should always fit in a register and be correctly aligned. */
+ const auto last_page= rseg.space->free_limit;
mtr.start();
@@ -452,13 +386,23 @@ func_exit:
}
hdr_addr= flst_get_last(TRX_RSEG + TRX_RSEG_HISTORY + rseg_hdr->page.frame);
- hdr_addr.boffset= static_cast<uint16_t>(hdr_addr.boffset -
- TRX_UNDO_HISTORY_NODE);
-loop:
if (hdr_addr.page == FIL_NULL)
goto func_exit;
+ if (hdr_addr.page >= last_page ||
+ hdr_addr.boffset < TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE ||
+ hdr_addr.boffset >= last_boffset)
+ {
+ corrupted:
+ err= DB_CORRUPTION;
+ goto func_exit;
+ }
+
+ hdr_addr.boffset= static_cast<uint16_t>(hdr_addr.boffset -
+ TRX_UNDO_HISTORY_NODE);
+
+loop:
buf_block_t *b=
buf_page_get_gen(page_id_t(rseg.space->id, hdr_addr.page),
0, RW_X_LATCH, nullptr, BUF_GET_POSSIBLY_FREED,
@@ -507,11 +451,18 @@ loop:
fil_addr_t prev_hdr_addr=
flst_get_prev_addr(b->page.frame + hdr_addr.boffset +
TRX_UNDO_HISTORY_NODE);
+ if (prev_hdr_addr.page == FIL_NULL);
+ else if (prev_hdr_addr.page >= last_page ||
+ prev_hdr_addr.boffset < TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE ||
+ prev_hdr_addr.boffset >= last_boffset)
+ goto corrupted;
+
prev_hdr_addr.boffset= static_cast<uint16_t>(prev_hdr_addr.boffset -
TRX_UNDO_HISTORY_NODE);
err= flst_remove(rseg_hdr, TRX_RSEG + TRX_RSEG_HISTORY, b,
- uint16_t(hdr_addr.boffset + TRX_UNDO_HISTORY_NODE), &mtr);
+ uint16_t(hdr_addr.boffset + TRX_UNDO_HISTORY_NODE),
+ last_page, &mtr);
if (UNIV_UNLIKELY(err != DB_SUCCESS))
goto func_exit;
@@ -571,45 +522,21 @@ loop:
ut_ad(rseg_hdr->page.id() == rseg.page_id());
mtr.memo_push(rseg_hdr, MTR_MEMO_PAGE_X_FIX);
+ if (hdr_addr.page == FIL_NULL)
+ goto func_exit;
+
goto loop;
}
-/** Cleanse purge queue to remove the rseg that reside in undo-tablespace
-marked for truncate.
-@param[in] space undo tablespace being truncated */
-static void trx_purge_cleanse_purge_queue(const fil_space_t& space)
+void purge_sys_t::cleanse_purge_queue(const fil_space_t &space)
{
- typedef std::vector<TrxUndoRsegs> purge_elem_list_t;
- purge_elem_list_t purge_elem_list;
-
- mysql_mutex_lock(&purge_sys.pq_mutex);
-
- /* Remove rseg instances that are in the purge queue before we start
- truncate of corresponding UNDO truncate. */
- while (!purge_sys.purge_queue.empty()) {
- purge_elem_list.push_back(purge_sys.purge_queue.top());
- purge_sys.purge_queue.pop();
- }
-
- for (purge_elem_list_t::iterator it = purge_elem_list.begin();
- it != purge_elem_list.end();
- ++it) {
-
- for (TrxUndoRsegs::iterator it2 = it->begin();
- it2 != it->end();
- ++it2) {
- if ((*it2)->space == &space) {
- it->erase(it2);
- break;
- }
- }
-
- if (!it->empty()) {
- purge_sys.purge_queue.push(*it);
- }
- }
-
- mysql_mutex_unlock(&purge_sys.pq_mutex);
+ mysql_mutex_lock(&pq_mutex);
+ auto purge_elem_list= clone_queue_container();
+ purge_queue.clear();
+ for (auto elem : purge_elem_list)
+ if (purge_queue::rseg(elem)->space != &space)
+ purge_queue.push_trx_no_rseg(elem);
+ mysql_mutex_unlock(&pq_mutex);
}
dberr_t purge_sys_t::iterator::free_history() const
@@ -672,7 +599,9 @@ fil_space_t *purge_sys_t::truncating_tablespace()
if (space || srv_undo_tablespaces_active < 2 || !srv_undo_log_truncate)
return space;
- const uint32_t size= uint32_t(srv_max_undo_log_size >> srv_page_size_shift);
+ const uint32_t size=
+ uint32_t(std::min(ulonglong{std::numeric_limits<uint32_t>::max()},
+ srv_max_undo_log_size >> srv_page_size_shift));
for (uint32_t i= truncate_undo_space.last, j= i;; )
{
if (fil_space_t *s= undo_truncate_try(srv_undo_space_id_start + i, size))
@@ -751,7 +680,7 @@ not_free:
const char *file_name= UT_LIST_GET_FIRST(space->chain)->name;
sql_print_information("InnoDB: Truncating %s", file_name);
- trx_purge_cleanse_purge_queue(*space);
+ purge_sys.cleanse_purge_queue(*space);
/* Lock all modified pages of the tablespace.
@@ -870,13 +799,11 @@ buf_block_t *purge_sys_t::get_page(page_id_t id)
return nullptr;
}
-void purge_sys_t::rseg_get_next_history_log()
+bool purge_sys_t::rseg_get_next_history_log()
{
fil_addr_t prev_log_addr;
-#ifndef SUX_LOCK_GENERIC
- ut_ad(rseg->latch.is_write_locked());
-#endif
+ ut_ad(rseg->latch.have_wr());
ut_a(rseg->last_page_no != FIL_NULL);
tail.trx_no= rseg->last_trx_no() + 1;
@@ -888,21 +815,24 @@ void purge_sys_t::rseg_get_next_history_log()
{
const byte *log_hdr= undo_page->page.frame + rseg->last_offset();
prev_log_addr= flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE);
+ if (prev_log_addr.boffset < TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE ||
+ prev_log_addr.boffset >= srv_page_size - TRX_UNDO_LOG_OLD_HDR_SIZE)
+ goto corrupted;
prev_log_addr.boffset = static_cast<uint16_t>(prev_log_addr.boffset -
TRX_UNDO_HISTORY_NODE);
}
else
- prev_log_addr.page= FIL_NULL;
+ goto corrupted;
- if (prev_log_addr.page == FIL_NULL)
+ if (prev_log_addr.page >= rseg->space->free_limit)
+ corrupted:
rseg->last_page_no= FIL_NULL;
else
{
/* Read the previous log header. */
trx_id_t trx_no= 0;
if (const buf_block_t* undo_page=
- get_page(page_id_t(rseg->space->id,
- prev_log_addr.page)))
+ get_page(page_id_t(rseg->space->id, prev_log_addr.page)))
{
const byte *log_hdr= undo_page->page.frame + prev_log_addr.boffset;
trx_no= mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
@@ -920,12 +850,13 @@ void purge_sys_t::rseg_get_next_history_log()
can never produce events from an empty rollback segment. */
mysql_mutex_lock(&pq_mutex);
- purge_queue.push(*rseg);
+ enqueue(*rseg);
mysql_mutex_unlock(&pq_mutex);
}
}
rseg->latch.wr_unlock();
+ return choose_next_log();
}
/** Position the purge sys "iterator" on the undo record to use for purging.
@@ -933,11 +864,37 @@ void purge_sys_t::rseg_get_next_history_log()
@retval true when purge_sys.rseg->latch was locked */
bool purge_sys_t::choose_next_log()
{
- if (!rseg_iter.set_next())
- return false;
+ ut_ad(!next_stored);
- hdr_offset= rseg->last_offset();
- hdr_page_no= rseg->last_page_no;
+ mysql_mutex_lock(&pq_mutex);
+ if (purge_queue.empty()) {
+ rseg = nullptr;
+ mysql_mutex_unlock(&purge_sys.pq_mutex);
+ return false;
+ }
+ rseg= purge_queue.pop();
+ mysql_mutex_unlock(&purge_sys.pq_mutex);
+
+ /* We assume in purge of externally stored fields that space
+ id is in the range of UNDO tablespace space ids */
+ ut_ad(rseg->space == fil_system.sys_space ||
+ srv_is_undo_tablespace(rseg->space->id));
+
+ rseg->latch.wr_lock(SRW_LOCK_CALL);
+ trx_id_t last_trx_no = rseg->last_trx_no();
+ hdr_offset = rseg->last_offset();
+ hdr_page_no = rseg->last_page_no;
+
+ /* Only the purge_coordinator_task will access any of
+ purge_sys.hdr_page_no, purge_sys.tail. The field purge_sys.head and
+ purge_sys.view are modified by clone_end_view() in the
+ purge_coordinator_task while holding exclusive purge_sys.latch. The
+ purge_sys.view may also be modified by wake_if_not_active() while holding
+ exclusive purge_sys.latch. The purge_sys.head may be read by
+ purge_truncation_callback(). */
+ ut_a(hdr_page_no != FIL_NULL);
+ ut_a(tail.trx_no <= last_trx_no);
+ tail.trx_no = last_trx_no;
if (!rseg->needs_purge)
{
@@ -968,7 +925,7 @@ bool purge_sys_t::choose_next_log()
if (!b)
goto purge_nothing;
undo_rec=
- trx_undo_page_get_first_rec(b, page_no, hdr_offset);
+ trx_undo_page_get_first_rec(b, hdr_page_no, hdr_offset);
if (!undo_rec)
goto purge_nothing;
}
@@ -992,18 +949,13 @@ inline trx_purge_rec_t purge_sys_t::get_next_rec(roll_ptr_t roll_ptr)
{
ut_ad(next_stored);
ut_ad(tail.trx_no < low_limit_no());
-#ifndef SUX_LOCK_GENERIC
- ut_ad(rseg->latch.is_write_locked());
-#endif
+ ut_ad(rseg->latch.have_wr());
if (!offset)
{
- /* It is the dummy undo log record, which means that there is no
- need to purge this undo log */
- rseg_get_next_history_log();
-
- /* Look for the next undo log and record to purge */
- if (choose_next_log())
+ /* It is the dummy undo log record, which means that there is no need to
+ purge this undo log. Look for the next undo log and record to purge */
+ if (rseg_get_next_history_log())
rseg->latch.wr_unlock();
return {nullptr, 1};
}
@@ -1051,9 +1003,8 @@ inline trx_purge_rec_t purge_sys_t::get_next_rec(roll_ptr_t roll_ptr)
else
{
got_no_rec:
- rseg_get_next_history_log();
/* Look for the next undo log and record to purge */
- locked= choose_next_log();
+ locked= rseg_get_next_history_log();
}
if (locked)
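
A recurring pattern in the trx0purge.cc hunks above is to validate a file-based list (flst) node address before dereferencing it: the page number must lie below the tablespace free limit that was read under rseg.latch, and the byte offset must fall inside the range where a list node can legally sit on an undo page; out-of-range addresses are reported as DB_CORRUPTION by the callers rather than followed. Below is a minimal sketch of that check, assuming simplified stand-in types (fil_addr, FIL_NULL_PAGE) and a hypothetical helper name; the real bounds are TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE and srv_page_size - TRX_UNDO_LOG_OLD_HDR_SIZE.

#include <cstdint>

// Stand-in for InnoDB's fil_addr_t; FIL_NULL_PAGE marks the end of a list.
struct fil_addr { uint32_t page; uint16_t boffset; };
static constexpr uint32_t FIL_NULL_PAGE= 0xFFFFFFFFU;

// Hypothetical helper: true when the node address may be dereferenced.
// Callers would pass free_limit from the tablespace header (read under
// rseg.latch as discussed in the patch comments), min_boffset =
// TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE, and max_boffset =
// srv_page_size - TRX_UNDO_LOG_OLD_HDR_SIZE.
static bool flst_node_addr_is_sane(const fil_addr &addr, uint32_t free_limit,
                                   uint16_t min_boffset, uint32_t max_boffset)
{
  if (addr.page == FIL_NULL_PAGE)
    return true;                 /* end of list; nothing to dereference */
  return addr.page < free_limit &&
         addr.boffset >= min_boffset && addr.boffset < max_boffset;
}
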
diff --git a/storage/innobase/trx/trx0rseg.cc b/storage/innobase/trx/trx0rseg.cc
index 87a2ac7b..964dca94 100644
--- a/storage/innobase/trx/trx0rseg.cc
+++ b/storage/innobase/trx/trx0rseg.cc
@@ -201,7 +201,7 @@ bool trx_rseg_read_wsrep_checkpoint(const buf_block_t *rseg_header, XID &xid)
memcpy(xid.data, TRX_RSEG + TRX_RSEG_WSREP_XID_DATA
+ rseg_header->page.frame, XIDDATASIZE);
- return true;
+ return wsrep_is_wsrep_xid(&xid);
}
/** Read the WSREP XID from the TRX_SYS page (in case of upgrade).
@@ -210,6 +210,11 @@ bool trx_rseg_read_wsrep_checkpoint(const buf_block_t *rseg_header, XID &xid)
@return whether the WSREP XID is present */
static bool trx_rseg_init_wsrep_xid(const page_t* page, XID& xid)
{
+ if (memcmp(TRX_SYS + TRX_SYS_WSREP_XID_INFO + page,
+ field_ref_zero, TRX_SYS_WSREP_XID_LEN) == 0) {
+ return false;
+ }
+
if (mach_read_from_4(TRX_SYS + TRX_SYS_WSREP_XID_INFO
+ TRX_SYS_WSREP_XID_MAGIC_N_FLD
+ page)
@@ -232,7 +237,8 @@ static bool trx_rseg_init_wsrep_xid(const page_t* page, XID& xid)
memcpy(xid.data,
TRX_SYS + TRX_SYS_WSREP_XID_INFO
+ TRX_SYS_WSREP_XID_DATA + page, XIDDATASIZE);
- return true;
+
+ return wsrep_is_wsrep_xid(&xid);
}
/** Recover the latest WSREP checkpoint XID.
@@ -448,7 +454,14 @@ static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, mtr_t *mtr)
{
if (!rseg->space)
return DB_TABLESPACE_NOT_FOUND;
+
+ /* Access the tablespace header page to recover rseg->space->free_limit */
+ page_id_t page_id{rseg->space->id, 0};
dberr_t err;
+ if (!buf_page_get_gen(page_id, 0, RW_S_LATCH, nullptr, BUF_GET, mtr, &err))
+ return err;
+ mtr->release_last_page();
+ page_id.set_page_no(rseg->page_no);
const buf_block_t *rseg_hdr=
buf_page_get_gen(rseg->page_id(), 0, RW_S_LATCH, nullptr, BUF_GET, mtr,
&err);
@@ -493,10 +506,17 @@ static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, mtr_t *mtr)
trx_sys.recovered_binlog_offset= binlog_offset;
trx_sys.recovered_binlog_is_legacy_pos= false;
}
+ }
#ifdef WITH_WSREP
- trx_rseg_read_wsrep_checkpoint(rseg_hdr, trx_sys.recovered_wsrep_xid);
+ XID tmp_xid;
+ tmp_xid.null();
+ /* Update the recovered wsrep XID only if a wsrep XID was found in
+ the rseg header page and its seqno is larger than that of the
+ currently recovered XID. */
+ if (trx_rseg_read_wsrep_checkpoint(rseg_hdr, tmp_xid) &&
+ wsrep_xid_seqno(&tmp_xid) > wsrep_xid_seqno(&trx_sys.recovered_wsrep_xid))
+ trx_sys.recovered_wsrep_xid.set(&tmp_xid);
#endif
- }
}
if (srv_operation == SRV_OPERATION_RESTORE)
@@ -518,6 +538,11 @@ static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, mtr_t *mtr)
fil_addr_t node_addr= flst_get_last(TRX_RSEG + TRX_RSEG_HISTORY +
rseg_hdr->page.frame);
+ if (node_addr.page >= rseg->space->free_limit ||
+ node_addr.boffset < TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE ||
+ node_addr.boffset >= srv_page_size - TRX_UNDO_LOG_OLD_HDR_SIZE)
+ return DB_CORRUPTION;
+
node_addr.boffset= static_cast<uint16_t>(node_addr.boffset -
TRX_UNDO_HISTORY_NODE);
rseg->last_page_no= node_addr.page;
@@ -544,7 +569,7 @@ static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, mtr_t *mtr)
if (rseg->last_page_no != FIL_NULL)
/* There is no need to cover this operation by the purge
mutex because we are still bootstrapping. */
- purge_sys.purge_queue.push(*rseg);
+ purge_sys.enqueue(*rseg);
}
trx_sys.set_undo_non_empty(rseg->history_size > 0);
@@ -567,10 +592,6 @@ static void trx_rseg_init_binlog_info(const page_t* page)
+ TRX_SYS + page);
trx_sys.recovered_binlog_is_legacy_pos= true;
}
-
-#ifdef WITH_WSREP
- trx_rseg_init_wsrep_xid(page, trx_sys.recovered_wsrep_xid);
-#endif
}
/** Initialize or recover the rollback segments at startup. */
@@ -589,7 +610,17 @@ dberr_t trx_rseg_array_init()
#endif
mtr_t mtr;
dberr_t err = DB_SUCCESS;
-
+ /* mariabackup --prepare only deals with the redo log and the data
+ files, not with transactions or the data dictionary. That is why
+ trx_lists_init_at_db_start() does not invoke purge_sys.create(), the
+ purge queue mutex stays uninitialized, and trx_rseg_mem_restore()
+ quits before initializing the undo log lists. */
+ if (srv_operation != SRV_OPERATION_RESTORE)
+ /* Acquiring purge queue mutex here should be fine from the
+ deadlock prevention point of view, because executing that
+ function is a prerequisite for starting the purge subsystem or
+ any transactions. */
+ purge_sys.queue_lock();
for (ulint rseg_id = 0; rseg_id < TRX_SYS_N_RSEGS; rseg_id++) {
mtr.start();
if (const buf_block_t* sys = trx_sysf_get(&mtr, false)) {
@@ -602,7 +633,11 @@ dberr_t trx_rseg_array_init()
+ sys->page.frame);
trx_rseg_init_binlog_info(sys->page.frame);
#ifdef WITH_WSREP
- wsrep_sys_xid.set(&trx_sys.recovered_wsrep_xid);
+ if (trx_rseg_init_wsrep_xid(
+ sys->page.frame, trx_sys.recovered_wsrep_xid)) {
+ wsrep_sys_xid.set(
+ &trx_sys.recovered_wsrep_xid);
+ }
#endif
}
@@ -655,7 +690,8 @@ dberr_t trx_rseg_array_init()
mtr.commit();
}
-
+ if (srv_operation != SRV_OPERATION_RESTORE)
+ purge_sys.queue_unlock();
if (err != DB_SUCCESS) {
for (auto& rseg : trx_sys.rseg_array) {
while (auto u = UT_LIST_GET_FIRST(rseg.undo_list)) {
@@ -667,7 +703,7 @@ dberr_t trx_rseg_array_init()
}
#ifdef WITH_WSREP
- if (!wsrep_sys_xid.is_null()) {
+ if (srv_operation == SRV_OPERATION_NORMAL && !wsrep_sys_xid.is_null()) {
/* Upgrade from a version prior to 10.3.5,
where WSREP XID was stored in TRX_SYS page.
If no rollback segment has a WSREP XID set,
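
The WSREP-related hunks in trx0rseg.cc converge on one rule: a checkpoint XID read from a rollback segment header only replaces the recovery candidate when it is a genuine wsrep XID and carries a larger seqno. Here is a hedged sketch of that merge step, with a toy Xid struct standing in for the server's XID and the two toy accessors modeled after wsrep_is_wsrep_xid() and wsrep_xid_seqno().

#include <cstdint>

// Toy stand-ins for the server's XID and the wsrep accessors assumed here.
struct Xid { int64_t seqno; bool is_wsrep; };
static int64_t toy_wsrep_xid_seqno(const Xid &x) { return x.seqno; }
static bool toy_wsrep_is_wsrep_xid(const Xid &x) { return x.is_wsrep; }

// Mirror of the trx_rseg_mem_restore() hunk: keep the candidate only when
// it is a wsrep XID whose seqno exceeds the one recovered so far.
static void merge_recovered_wsrep_xid(Xid &recovered, const Xid &candidate)
{
  if (toy_wsrep_is_wsrep_xid(candidate) &&
      toy_wsrep_xid_seqno(candidate) > toy_wsrep_xid_seqno(recovered))
    recovered= candidate;
}
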
diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc
index 942b8bd4..1d22b853 100644
--- a/storage/innobase/trx/trx0trx.cc
+++ b/storage/innobase/trx/trx0trx.cc
@@ -412,12 +412,12 @@ void trx_t::free()
#endif
read_view.mem_noaccess();
MEM_NOACCESS(&lock, sizeof lock);
- MEM_NOACCESS(&op_info, sizeof op_info);
- MEM_NOACCESS(&isolation_level, sizeof isolation_level);
- MEM_NOACCESS(&check_foreigns, sizeof check_foreigns);
+ MEM_NOACCESS(&op_info, sizeof op_info +
+ sizeof(unsigned) /* isolation_level, snapshot_isolation,
+ check_foreigns, check_unique_secondary,
+ bulk_insert */);
MEM_NOACCESS(&is_registered, sizeof is_registered);
MEM_NOACCESS(&active_commit_ordered, sizeof active_commit_ordered);
- MEM_NOACCESS(&check_unique_secondary, sizeof check_unique_secondary);
MEM_NOACCESS(&flush_log_later, sizeof flush_log_later);
MEM_NOACCESS(&duplicates, sizeof duplicates);
MEM_NOACCESS(&dict_operation, sizeof dict_operation);
@@ -1142,15 +1142,23 @@ inline void trx_t::write_serialisation_history(mtr_t *mtr)
}
else if (rseg->last_page_no == FIL_NULL)
{
- mysql_mutex_lock(&purge_sys.pq_mutex);
+ /* trx_sys.assign_new_trx_no() and purge_sys.enqueue() must be invoked
+ in the same critical section, protected by the purge queue mutex, so
+ that an rseg with a greater last commit number cannot be pushed to the
+ purge queue before an rseg with a lesser one. In other words, pushing
+ to the purge queue must be serialized along with assigning trx_no.
+ Otherwise the purge coordinator thread could fetch undo log records
+ from an rseg with a greater last commit number before an rseg with a
+ lesser one. */
+ purge_sys.queue_lock();
trx_sys.assign_new_trx_no(this);
const trx_id_t end{rw_trx_hash_element->no};
+ rseg->last_page_no= undo->hdr_page_no;
/* end cannot be less than anything in rseg. User threads only
produce events when a rollback segment is empty. */
- purge_sys.purge_queue.push(TrxUndoRsegs{end, *rseg});
- mysql_mutex_unlock(&purge_sys.pq_mutex);
- rseg->last_page_no= undo->hdr_page_no;
rseg->set_last_commit(undo->hdr_offset, end);
+ purge_sys.enqueue(end, *rseg);
+ purge_sys.queue_unlock();
}
else
trx_sys.assign_new_trx_no(this);
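
The comment added in write_serialisation_history() is the core of the trx0trx.cc change: if the trx_no assignment and the queue push sat in separate critical sections, two committing threads could obtain commit numbers in one order and push in the opposite order, so the purge coordinator would see the larger trx_no first. A toy sketch of the required shape follows, assuming a simplified counter and queue in place of trx_sys and purge_sys.

#include <atomic>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <vector>

// Toy model: the commit-number assignment and the queue push share one
// critical section, so queue order always agrees with trx_no order.
static std::mutex queue_mutex;                      // stands in for pq_mutex
static std::atomic<uint64_t> next_trx_no{1};        // stands in for trx_sys
static std::priority_queue<uint64_t, std::vector<uint64_t>,
                           std::greater<uint64_t>> purge_queue;

static void commit_with_empty_rseg()
{
  std::lock_guard<std::mutex> g(queue_mutex);       // purge_sys.queue_lock()
  const uint64_t trx_no= next_trx_no.fetch_add(1);  // assign_new_trx_no()
  purge_queue.push(trx_no);                         // purge_sys.enqueue()
}
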
diff --git a/storage/innobase/trx/trx0undo.cc b/storage/innobase/trx/trx0undo.cc
index ccc68dfe..c0f5b1fb 100644
--- a/storage/innobase/trx/trx0undo.cc
+++ b/storage/innobase/trx/trx0undo.cc
@@ -134,8 +134,9 @@ trx_undo_page_get_first_rec(const buf_block_t *block, uint32_t page_no,
uint16_t offset)
{
uint16_t start= trx_undo_page_get_start(block, page_no, offset);
- return start == trx_undo_page_get_end(block, page_no, offset)
- ? nullptr : block->page.frame + start;
+ uint16_t end= trx_undo_page_get_end(block, page_no, offset);
+ ut_ad(start <= end);
+ return start >= end ? nullptr : block->page.frame + start;
}
/** Get the last undo log record on a page.
@@ -149,8 +150,10 @@ trx_undo_rec_t*
trx_undo_page_get_last_rec(const buf_block_t *block, uint32_t page_no,
uint16_t offset)
{
+ uint16_t start= trx_undo_page_get_start(block, page_no, offset);
uint16_t end= trx_undo_page_get_end(block, page_no, offset);
- return trx_undo_page_get_start(block, page_no, offset) == end
+ ut_ad(start <= end);
+ return start >= end
? nullptr
: block->page.frame + mach_read_from_2(block->page.frame + end - 2);
}
@@ -510,7 +513,7 @@ trx_undo_seg_create(fil_space_t *space, buf_block_t *rseg_hdr, ulint *id,
*err = flst_add_last(block, TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST,
block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE,
- mtr);
+ space->free_limit, mtr);
*id = slot_no;
mtr->write<4>(*rseg_hdr, TRX_RSEG + TRX_RSEG_UNDO_SLOTS
@@ -693,7 +696,8 @@ buf_block_t *trx_undo_add_page(trx_undo_t *undo, mtr_t *mtr, dberr_t *err)
mtr->undo_create(*new_block);
trx_undo_page_init(*new_block);
*err= flst_add_last(header_block, TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST,
- new_block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE, mtr);
+ new_block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE,
+ rseg->space->free_limit, mtr);
if (UNIV_UNLIKELY(*err != DB_SUCCESS))
new_block= nullptr;
else
@@ -744,9 +748,11 @@ trx_undo_free_page(
buf_page_make_young_if_needed(&header_block->page);
+ const uint32_t limit = rseg->space->free_limit;
+
*err = flst_remove(header_block, TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST,
undo_block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE,
- mtr);
+ limit, mtr);
if (UNIV_UNLIKELY(*err != DB_SUCCESS)) {
return FIL_NULL;
@@ -755,7 +761,13 @@ trx_undo_free_page(
const fil_addr_t last_addr = flst_get_last(
TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST
+ header_block->page.frame);
- if (UNIV_UNLIKELY(last_addr.page == page_no)) {
+ if (UNIV_UNLIKELY(last_addr.page == page_no)
+ || UNIV_UNLIKELY(last_addr.page != FIL_NULL
+ && last_addr.page >= limit)
+ || UNIV_UNLIKELY(last_addr.boffset < TRX_UNDO_PAGE_HDR
+ + TRX_UNDO_PAGE_NODE)
+ || UNIV_UNLIKELY(last_addr.boffset >= srv_page_size
+ - TRX_UNDO_LOG_OLD_HDR_SIZE)) {
*err = DB_CORRUPTION;
return FIL_NULL;
}
@@ -972,8 +984,8 @@ trx_undo_mem_create_at_db_start(trx_rseg_t *rseg, ulint id, uint32_t page_no)
ut_ad(id < TRX_RSEG_N_SLOTS);
mtr.start();
- const buf_block_t* block = buf_page_get(
- page_id_t(rseg->space->id, page_no), 0, RW_X_LATCH, &mtr);
+ const page_id_t page_id{rseg->space->id, page_no};
+ const buf_block_t* block = buf_page_get(page_id, 0, RW_X_LATCH, &mtr);
if (UNIV_UNLIKELY(!block)) {
corrupted:
mtr.commit();
@@ -1075,6 +1087,15 @@ corrupted_type:
fil_addr_t last_addr = flst_get_last(
TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST + block->page.frame);
+ if (last_addr.page >= rseg->space->free_limit
+ || last_addr.boffset < TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE
+ || last_addr.boffset >= srv_page_size
+ - TRX_UNDO_LOG_OLD_HDR_SIZE) {
+ corrupted_undo:
+ ut_free(undo);
+ goto corrupted;
+ }
+
undo->last_page_no = last_addr.page;
undo->top_page_no = last_addr.page;
@@ -1083,8 +1104,7 @@ corrupted_type:
RW_X_LATCH, &mtr);
if (UNIV_UNLIKELY(!last)) {
- ut_free(undo);
- goto corrupted;
+ goto corrupted_undo;
}
if (const trx_undo_rec_t* rec = trx_undo_page_get_last_rec(
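
For the trx0undo.cc record getters, the patch turns the old equality test into start >= end plus a debug assertion, so a corrupted page whose start offset overshoots the end yields no record instead of a pointer past the valid range. A small sketch of the strengthened guard, with raw byte offsets passed in directly:

#include <cassert>
#include <cstdint>

// Sketch of the strengthened guard: with start >= end, a corrupted page
// whose start offset overshoots the end produces "no record" instead of
// a pointer beyond the valid range.
static const unsigned char*
first_undo_rec(const unsigned char *frame, uint16_t start, uint16_t end)
{
  assert(start <= end);          /* mirrors the added ut_ad() */
  return start >= end ? nullptr : frame + start;
}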