Diffstat
 storage/innobase/trx/trx0purge.cc | 1297 ++++++++++++++++++++++++++++++++++++
 1 file changed, 1297 insertions(+), 0 deletions(-)
diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc
new file mode 100644
index 00000000..28491853
--- /dev/null
+++ b/storage/innobase/trx/trx0purge.cc
@@ -0,0 +1,1297 @@
+/*****************************************************************************
+
+Copyright (c) 1996, 2017, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2017, 2021, MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+/**************************************************//**
+@file trx/trx0purge.cc
+Purge old versions
+
+Created 3/26/1996 Heikki Tuuri
+*******************************************************/
+
+#include "trx0purge.h"
+#include "fsp0fsp.h"
+#include "fut0fut.h"
+#include "mach0data.h"
+#include "mtr0log.h"
+#include "os0thread.h"
+#include "que0que.h"
+#include "row0purge.h"
+#include "row0upd.h"
+#include "srv0mon.h"
+#include "srv0srv.h"
+#include "srv0start.h"
+#include "sync0sync.h"
+#include "trx0rec.h"
+#include "trx0roll.h"
+#include "trx0rseg.h"
+#include "trx0trx.h"
+#include <mysql/service_wsrep.h>
+
+#include <unordered_map>
+
+/** Maximum allowable purge history length. <=0 means 'infinite'. */
+ulong	srv_max_purge_lag = 0;
+
+/** Max DML user threads delay in micro-seconds. */
+ulong	srv_max_purge_lag_delay = 0;
+
+/** The global data structure coordinating a purge */
+purge_sys_t	purge_sys;
+
+/** A dummy undo record used as a return value when we have a whole undo log
+which needs no purge */
+trx_undo_rec_t	trx_purge_dummy_rec;
+
+#ifdef UNIV_DEBUG
+my_bool		srv_purge_view_update_only_debug;
+#endif /* UNIV_DEBUG */
+
+/** Sentinel value */
+static const TrxUndoRsegs NullElement;
+
+/** Default constructor */
+TrxUndoRsegsIterator::TrxUndoRsegsIterator()
+	: m_rsegs(NullElement), m_iter(m_rsegs.begin())
+{
+}
+
+/** Sets the next rseg to purge in purge_sys.
+Executed in the purge coordinator thread.
+@return whether anything is to be purged */
+inline bool TrxUndoRsegsIterator::set_next()
+{
+	mutex_enter(&purge_sys.pq_mutex);
+
+	/* Only purge consumes events from the priority queue, user
+	threads only produce the events. */
+
+	/* Check if there are more rsegs to process in the
+	current element. */
+	if (m_iter != m_rsegs.end()) {
+		/* We are still processing rollback segment from
+		the same transaction and so expected transaction
+		number shouldn't increase. Undo the increment of
+		expected commit done by caller assuming rollback
+		segments from given transaction are done. */
+		purge_sys.tail.trx_no = (*m_iter)->last_trx_no();
+	} else if (!purge_sys.purge_queue.empty()) {
+		m_rsegs = purge_sys.purge_queue.top();
+		purge_sys.purge_queue.pop();
+		ut_ad(purge_sys.purge_queue.empty()
+		      || purge_sys.purge_queue.top() != m_rsegs);
+		m_iter = m_rsegs.begin();
+	} else {
+		/* Queue is empty, reset iterator. */
+		purge_sys.rseg = NULL;
+		mutex_exit(&purge_sys.pq_mutex);
+		m_rsegs = NullElement;
+		m_iter = m_rsegs.begin();
+		return false;
+	}
+
+	purge_sys.rseg = *m_iter++;
+	mutex_exit(&purge_sys.pq_mutex);
+	mutex_enter(&purge_sys.rseg->mutex);
+
+	ut_a(purge_sys.rseg->last_page_no != FIL_NULL);
+	ut_ad(purge_sys.rseg->last_trx_no() == m_rsegs.trx_no);
+
+	/* We assume in purge of externally stored fields that space id is
+	in the range of UNDO tablespace space ids */
+	ut_ad(purge_sys.rseg->space->id == TRX_SYS_SPACE
+	      || srv_is_undo_tablespace(purge_sys.rseg->space->id));
+
+	ut_a(purge_sys.tail.trx_no <= purge_sys.rseg->last_trx_no());
+
+	purge_sys.tail.trx_no = purge_sys.rseg->last_trx_no();
+	purge_sys.hdr_offset = purge_sys.rseg->last_offset();
+	purge_sys.hdr_page_no = purge_sys.rseg->last_page_no;
+
+	mutex_exit(&purge_sys.rseg->mutex);
+
+	return(true);
+}
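purge_queue is a priority queue that yields rollback-segment groups in increasing order of their oldest commit number, which is why set_next() always resumes purging at the smallest last_trx_no(). A simplified model of that ordering contract (illustrative only: RsegGroup and its trx_no key are invented stand-ins; the real queue stores TrxUndoRsegs and compares by trx_no):

    #include <queue>
    #include <vector>

    struct RsegGroup { unsigned long long trx_no; /* oldest commit in group */ };

    /* Invert the comparison so the smallest commit number is on top,
    matching purge's oldest-first consumption order. */
    struct Older {
      bool operator()(const RsegGroup& a, const RsegGroup& b) const {
        return a.trx_no > b.trx_no;
      }
    };

    std::priority_queue<RsegGroup, std::vector<RsegGroup>, Older> purge_queue;
    /* purge_queue.top() is now always the group that committed first. */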
+
+/** Build a purge 'query' graph. The actual purge is performed by executing
+this query graph.
+@return own: the query graph */
+static
+que_t*
+purge_graph_build()
+{
+	ut_a(srv_n_purge_threads > 0);
+
+	trx_t* trx = trx_create();
+	ut_ad(!trx->id);
+	trx->start_time = time(NULL);
+	trx->start_time_micro = microsecond_interval_timer();
+	trx->state = TRX_STATE_ACTIVE;
+	trx->op_info = "purge trx";
+
+	mem_heap_t*	heap = mem_heap_create(512);
+	que_fork_t*	fork = que_fork_create(
+		NULL, NULL, QUE_FORK_PURGE, heap);
+	fork->trx = trx;
+
+	for (auto i = innodb_purge_threads_MAX; i; i--) {
+		que_thr_t*	thr = que_thr_create(fork, heap, NULL);
+		thr->child = new(mem_heap_alloc(heap, sizeof(purge_node_t)))
+			purge_node_t(thr);
+	}
+
+	return(fork);
+}
+
+/** Initialise the purge system. */
+void purge_sys_t::create()
+{
+  ut_ad(this == &purge_sys);
+  ut_ad(!heap);
+  ut_ad(!enabled());
+  m_paused= 0;
+  query= purge_graph_build();
+  next_stored= false;
+  rseg= NULL;
+  page_no= 0;
+  offset= 0;
+  hdr_page_no= 0;
+  hdr_offset= 0;
+  rw_lock_create(trx_purge_latch_key, &latch, SYNC_PURGE_LATCH);
+  mutex_create(LATCH_ID_PURGE_SYS_PQ, &pq_mutex);
+  truncate.current= NULL;
+  truncate.last= NULL;
+  heap= mem_heap_create(4096);
+}
+
+/** Close the purge subsystem on shutdown. */
+void purge_sys_t::close()
+{
+  ut_ad(this == &purge_sys);
+  if (!heap)
+    return;
+
+  ut_ad(!enabled());
+  trx_t* trx = query->trx;
+  que_graph_free(query);
+  ut_ad(!trx->id);
+  ut_ad(trx->state == TRX_STATE_ACTIVE);
+  trx->state= TRX_STATE_NOT_STARTED;
+  trx->free();
+  rw_lock_free(&latch);
+  mutex_free(&pq_mutex);
+  mem_heap_free(heap);
+  heap= nullptr;
+}
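purge_graph_build() attaches one purge_node_t per potential worker to a single QUE_FORK_PURGE fork; the rest of this file locates its work by walking that thread list. A minimal sketch of the traversal, using the same list macros the batching code below uses (assumes purge_sys_t::create() has run):

    for (que_thr_t* thr = UT_LIST_GET_FIRST(purge_sys.query->thrs);
         thr != NULL;
         thr = UT_LIST_GET_NEXT(thrs, thr)) {
            purge_node_t* node = static_cast<purge_node_t*>(thr->child);
            ut_ad(que_node_get_type(node) == QUE_NODE_PURGE);
            /* each node carries its own queue of undo records to apply */
    }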
+
+/*================ UNDO LOG HISTORY LIST =============================*/
+
+/** Prepend the history list with an undo log.
+Remove the undo log segment from the rseg slot if it is too big for reuse.
+@param[in]	trx	transaction
+@param[in,out]	undo	undo log
+@param[in,out]	mtr	mini-transaction */
+void
+trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr)
+{
+	DBUG_PRINT("trx", ("commit(" TRX_ID_FMT "," TRX_ID_FMT ")",
+			   trx->id, trx_id_t{trx->rw_trx_hash_element->no}));
+	ut_ad(undo == trx->rsegs.m_redo.undo);
+	trx_rseg_t*	rseg = trx->rsegs.m_redo.rseg;
+	ut_ad(undo->rseg == rseg);
+	buf_block_t*	rseg_header = trx_rsegf_get(
+		rseg->space, rseg->page_no, mtr);
+	buf_block_t*	undo_page = trx_undo_set_state_at_finish(
+		undo, mtr);
+	trx_ulogf_t*	undo_header = undo_page->frame + undo->hdr_offset;
+
+	ut_ad(mach_read_from_2(undo_header + TRX_UNDO_NEEDS_PURGE) <= 1);
+
+	if (UNIV_UNLIKELY(mach_read_from_4(TRX_RSEG + TRX_RSEG_FORMAT
+					   + rseg_header->frame))) {
+		/* This database must have been upgraded from
+		before MariaDB 10.3.5. */
+		trx_rseg_format_upgrade(rseg_header, mtr);
+	}
+
+	if (undo->state != TRX_UNDO_CACHED) {
+		/* The undo log segment will not be reused */
+		ut_a(undo->id < TRX_RSEG_N_SLOTS);
+		compile_time_assert(FIL_NULL == 0xffffffff);
+		mtr->memset(rseg_header,
+			    TRX_RSEG + TRX_RSEG_UNDO_SLOTS
+			    + undo->id * TRX_RSEG_SLOT_SIZE, 4, 0xff);
+
+		MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_USED);
+
+		uint32_t hist_size = mach_read_from_4(TRX_RSEG_HISTORY_SIZE
+						      + TRX_RSEG
+						      + rseg_header->frame);
+
+		ut_ad(undo->size == flst_get_len(TRX_UNDO_SEG_HDR
+						 + TRX_UNDO_PAGE_LIST
+						 + undo_page->frame));
+
+		mtr->write<4>(*rseg_header, TRX_RSEG + TRX_RSEG_HISTORY_SIZE
+			      + rseg_header->frame,
+			      hist_size + undo->size);
+		mtr->write<8>(*rseg_header, TRX_RSEG + TRX_RSEG_MAX_TRX_ID
+			      + rseg_header->frame,
+			      trx_sys.get_max_trx_id());
+	}
+
+	/* After the purge thread has been given permission to exit,
+	we may roll back transactions (trx->undo_no==0)
+	in THD::cleanup() invoked from unlink_thd() in fast shutdown,
+	or in trx_rollback_recovered() in slow shutdown.
+
+	Before any transaction-generating background threads or the
+	purge have been started, we can
+	start transactions in row_merge_drop_temp_indexes() and
+	fts_drop_orphaned_tables(), and roll back recovered transactions.
+
+	Arbitrary user transactions may be executed when all the undo log
+	related background processes (including purge) are disabled due to
+	innodb_force_recovery=2 or innodb_force_recovery=3.
+	DROP TABLE may be executed at any innodb_force_recovery level.
+
+	During fast shutdown, we may also continue to execute
+	user transactions. */
+	ut_ad(srv_undo_sources
+	      || trx->undo_no == 0
+	      || (!purge_sys.enabled()
+		  && (srv_is_being_started
+		      || trx_rollback_is_active
+		      || srv_force_recovery >= SRV_FORCE_NO_BACKGROUND))
+	      || ((trx->mysql_thd || trx->internal)
+		  && srv_fast_shutdown));
+
+#ifdef WITH_WSREP
+	if (wsrep_is_wsrep_xid(trx->xid)) {
+		trx_rseg_update_wsrep_checkpoint(rseg_header, trx->xid, mtr);
+	}
+#endif
+
+	if (trx->mysql_log_file_name && *trx->mysql_log_file_name) {
+		/* Update the latest MySQL binlog name and offset info
+		in rollback segment header if MySQL binlogging is on
+		or the database server is a MySQL replication slave. */
+		trx_rseg_update_binlog_offset(rseg_header, trx, mtr);
+	}
+
+	/* Add the log as the first in the history list */
+	flst_add_first(rseg_header, TRX_RSEG + TRX_RSEG_HISTORY, undo_page,
+		       static_cast<uint16_t>(undo->hdr_offset
+					     + TRX_UNDO_HISTORY_NODE), mtr);
+
+	mtr->write<8,mtr_t::MAYBE_NOP>(*undo_page,
+				       undo_header + TRX_UNDO_TRX_NO,
+				       trx->rw_trx_hash_element->no);
+	mtr->write<2,mtr_t::MAYBE_NOP>(*undo_page, undo_header
+				       + TRX_UNDO_NEEDS_PURGE, 1U);
+
+	if (rseg->last_page_no == FIL_NULL) {
+		rseg->last_page_no = undo->hdr_page_no;
+		rseg->set_last_commit(undo->hdr_offset,
+				      trx->rw_trx_hash_element->no);
+		rseg->needs_purge = true;
+	}
+
+	trx_sys.rseg_history_len++;
+
+	if (undo->state == TRX_UNDO_CACHED) {
+		UT_LIST_ADD_FIRST(rseg->undo_cached, undo);
+		MONITOR_INC(MONITOR_NUM_UNDO_SLOT_CACHED);
+	} else {
+		ut_ad(undo->state == TRX_UNDO_TO_PURGE);
+		ut_free(undo);
+	}
+
+	undo = NULL;
+}
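Because flst_add_first() prepends, TRX_RSEG_HISTORY is ordered newest commit first, and purge consumes it from the tail, oldest first. A plain-C++ analogue of that discipline (std::list standing in for the on-page file list; the real list lives inside the rollback segment page):

    #include <list>

    std::list<unsigned long long> history; /* front = newest trx_no */

    void on_commit(unsigned long long trx_no) { history.push_front(trx_no); }

    bool purge_oldest(unsigned long long& trx_no) {
      if (history.empty()) return false;
      trx_no = history.back();  /* oldest committed transaction */
      history.pop_back();
      return true;
    }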
+
+/** Remove undo log header from the history list.
+@param[in,out]	rseg	rollback segment header page
+@param[in]	log	undo log segment header page
+@param[in]	offset	byte offset in the undo log segment header page
+@param[in,out]	mtr	mini-transaction */
+static void trx_purge_remove_log_hdr(buf_block_t *rseg, buf_block_t* log,
+                                     uint16_t offset, mtr_t *mtr)
+{
+  flst_remove(rseg, TRX_RSEG + TRX_RSEG_HISTORY,
+              log, static_cast<uint16_t>(offset + TRX_UNDO_HISTORY_NODE), mtr);
+  trx_sys.rseg_history_len--;
+}
+
+/** Free an undo log segment, and remove the header from the history list.
+@param[in,out]	rseg		rollback segment
+@param[in]	hdr_addr	file address of log_hdr */
+static
+void
+trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr)
+{
+	mtr_t		mtr;
+
+	mtr.start();
+	mutex_enter(&rseg->mutex);
+
+	buf_block_t* rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no,
+					      &mtr);
+	buf_block_t* block = trx_undo_page_get(
+		page_id_t(rseg->space->id, hdr_addr.page), &mtr);
+
+	/* Mark the last undo log totally purged, so that if the
+	system crashes, the tail of the undo log will not get accessed
+	again. The list of pages in the undo log tail gets
+	inconsistent during the freeing of the segment, and therefore
+	purge should not try to access them again. */
+	mtr.write<2,mtr_t::MAYBE_NOP>(*block, block->frame + hdr_addr.boffset
+				      + TRX_UNDO_NEEDS_PURGE, 0U);
+
+	while (!fseg_free_step_not_header(
+		       TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER
+		       + block->frame, &mtr)) {
+		mutex_exit(&rseg->mutex);
+
+		mtr.commit();
+		mtr.start();
+
+		mutex_enter(&rseg->mutex);
+
+		rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr);
+
+		block = trx_undo_page_get(
+			page_id_t(rseg->space->id, hdr_addr.page), &mtr);
+	}
+
+	/* The page list may now be inconsistent, but the length field
+	stored in the list base node tells us how big it was before we
+	started the freeing. */
+
+	const uint32_t seg_size = flst_get_len(
+		TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST + block->frame);
+
+	/* We may free the undo log segment header page; it must be freed
+	within the same mtr as the undo log header is removed from the
+	history list: otherwise, in case of a database crash, the segment
+	could become inaccessible garbage in the file space. */
+
+	trx_purge_remove_log_hdr(rseg_hdr, block, hdr_addr.boffset, &mtr);
+
+	do {
+		/* Here we assume that a file segment with just the header
+		page can be freed in a few steps, so that the buffer pool
+		is not flooded with bufferfixed pages: see the note in
+		fsp0fsp.cc. */
+	} while (!fseg_free_step(TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER
+				 + block->frame, &mtr));
+
+	byte* hist = TRX_RSEG + TRX_RSEG_HISTORY_SIZE + rseg_hdr->frame;
+	ut_ad(mach_read_from_4(hist) >= seg_size);
+
+	mtr.write<4>(*rseg_hdr, hist, mach_read_from_4(hist) - seg_size);
+
+	ut_ad(rseg->curr_size >= seg_size);
+
+	rseg->curr_size -= seg_size;
+
+	mutex_exit(&(rseg->mutex));
+
+	mtr_commit(&mtr);
+}
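The freeing loop above deliberately proceeds in bounded steps, committing and restarting the mini-transaction between steps so that no latch is held across the whole operation and the buffer pool is not flooded with buffer-fixed pages. Condensed to its skeleton (illustrative; as in the real loop, block must be re-fetched after every restart because mtr.commit() releases the page latch, and page_id here is an invented shorthand for the undo header page):

    mtr.start();
    while (!fseg_free_step_not_header(
                   TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER
                   + block->frame, &mtr)) {
            mtr.commit();                 /* drop every latch taken so far */
            mtr.start();                  /* begin a fresh, bounded step */
            block = trx_undo_page_get(page_id, &mtr); /* re-latch the page */
    }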
+
+/** Remove unnecessary history data from a rollback segment.
+@param[in,out]	rseg	rollback segment
+@param[in]	limit	truncate anything before this */
+static
+void
+trx_purge_truncate_rseg_history(
+	trx_rseg_t&			rseg,
+	const purge_sys_t::iterator&	limit)
+{
+	fil_addr_t	hdr_addr;
+	fil_addr_t	prev_hdr_addr;
+	mtr_t		mtr;
+	trx_id_t	undo_trx_no;
+
+	mtr.start();
+	ut_ad(rseg.is_persistent());
+	mutex_enter(&rseg.mutex);
+
+	buf_block_t* rseg_hdr = trx_rsegf_get(rseg.space, rseg.page_no, &mtr);
+
+	hdr_addr = flst_get_last(TRX_RSEG + TRX_RSEG_HISTORY
+				 + rseg_hdr->frame);
+	hdr_addr.boffset = static_cast<uint16_t>(hdr_addr.boffset
+						 - TRX_UNDO_HISTORY_NODE);
+
+loop:
+	if (hdr_addr.page == FIL_NULL) {
+func_exit:
+		mutex_exit(&rseg.mutex);
+		mtr.commit();
+		return;
+	}
+
+	buf_block_t* block = trx_undo_page_get(page_id_t(rseg.space->id,
+							 hdr_addr.page),
+					       &mtr);
+	undo_trx_no = mach_read_from_8(block->frame + hdr_addr.boffset
+				       + TRX_UNDO_TRX_NO);
+
+	if (undo_trx_no >= limit.trx_no) {
+		if (undo_trx_no == limit.trx_no) {
+			trx_undo_truncate_start(
+				&rseg, hdr_addr.page,
+				hdr_addr.boffset, limit.undo_no);
+		}
+
+		goto func_exit;
+	}
+
+	prev_hdr_addr = flst_get_prev_addr(block->frame + hdr_addr.boffset
+					   + TRX_UNDO_HISTORY_NODE);
+	prev_hdr_addr.boffset = static_cast<uint16_t>(prev_hdr_addr.boffset
+						      - TRX_UNDO_HISTORY_NODE);
+
+	if (mach_read_from_2(TRX_UNDO_SEG_HDR + TRX_UNDO_STATE + block->frame)
+	    == TRX_UNDO_TO_PURGE
+	    && !mach_read_from_2(block->frame + hdr_addr.boffset
+				 + TRX_UNDO_NEXT_LOG)) {
+
+		/* We can free the whole log segment */
+
+		mutex_exit(&rseg.mutex);
+		mtr.commit();
+
+		/* calls the trx_purge_remove_log_hdr()
+		inside trx_purge_free_segment(). */
+		trx_purge_free_segment(&rseg, hdr_addr);
+	} else {
+		/* Remove the log hdr from the rseg history. */
+		trx_purge_remove_log_hdr(rseg_hdr, block, hdr_addr.boffset,
+					 &mtr);
+
+		mutex_exit(&rseg.mutex);
+		mtr.commit();
+	}
+
+	mtr.start();
+	mutex_enter(&rseg.mutex);
+
+	rseg_hdr = trx_rsegf_get(rseg.space, rseg.page_no, &mtr);
+
+	hdr_addr = prev_hdr_addr;
+
+	goto loop;
+}
+
+/** Cleanse the purge queue to remove the rsegs that reside in the
+undo tablespace being truncated.
+@param[in]	space	undo tablespace being truncated */
+static void trx_purge_cleanse_purge_queue(const fil_space_t& space)
+{
+	typedef	std::vector<TrxUndoRsegs>	purge_elem_list_t;
+	purge_elem_list_t			purge_elem_list;
+
+	mutex_enter(&purge_sys.pq_mutex);
+
+	/* Remove rseg instances that are in the purge queue before we
+	start the truncate of the corresponding undo tablespace. */
+	while (!purge_sys.purge_queue.empty()) {
+		purge_elem_list.push_back(purge_sys.purge_queue.top());
+		purge_sys.purge_queue.pop();
+	}
+
+	for (purge_elem_list_t::iterator it = purge_elem_list.begin();
+	     it != purge_elem_list.end();
+	     ++it) {
+
+		for (TrxUndoRsegs::iterator it2 = it->begin();
+		     it2 != it->end();
+		     ++it2) {
+			if ((*it2)->space == &space) {
+				it->erase(it2);
+				break;
+			}
+		}
+
+		if (!it->empty()) {
+			purge_sys.purge_queue.push(*it);
+		}
+	}
+
+	mutex_exit(&purge_sys.pq_mutex);
+}
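std::priority_queue offers no in-place erase, so the cleanse drains the heap into a vector, filters, and pushes the survivors back. The same pattern in miniature (generic; is_doomed is an invented predicate standing in for the space-id test above):

    #include <queue>
    #include <vector>

    template <typename Q, typename Pred>
    void heap_erase_if(Q& pq, Pred is_doomed) {
      std::vector<typename Q::value_type> keep;
      while (!pq.empty()) {
        if (!is_doomed(pq.top())) keep.push_back(pq.top());
        pq.pop();
      }
      for (const auto& v : keep) pq.push(v);
    }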
+
+/**
+Removes unnecessary history data from rollback segments. NOTE that when this
+function is called, the caller must not have any latches on undo log pages!
+*/
+static void trx_purge_truncate_history()
+{
+	ut_ad(purge_sys.head <= purge_sys.tail);
+	purge_sys_t::iterator& head = purge_sys.head.trx_no
+		? purge_sys.head : purge_sys.tail;
+
+	if (head.trx_no >= purge_sys.low_limit_no()) {
+		/* This is sometimes necessary. TODO: find out why. */
+		head.trx_no = purge_sys.low_limit_no();
+		head.undo_no = 0;
+	}
+
+	for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
+		if (trx_rseg_t* rseg = trx_sys.rseg_array[i]) {
+			ut_ad(rseg->id == i);
+			trx_purge_truncate_rseg_history(*rseg, head);
+		}
+	}
+
+	if (srv_undo_tablespaces_active < 2) {
+		return;
+	}
+
+	while (srv_undo_log_truncate) {
+		if (!purge_sys.truncate.current) {
+			const ulint threshold = ulint(srv_max_undo_log_size
+						      >> srv_page_size_shift);
+			for (ulint i = purge_sys.truncate.last
+			     ? purge_sys.truncate.last->id
+			     - srv_undo_space_id_start
+			     : 0, j = i;; ) {
+				ulint space_id = srv_undo_space_id_start + i;
+				ut_ad(srv_is_undo_tablespace(space_id));
+				fil_space_t* space= fil_space_get(space_id);
+
+				if (space && space->get_size() > threshold) {
+					purge_sys.truncate.current = space;
+					break;
+				}
+
+				++i;
+				i %= srv_undo_tablespaces_active;
+				if (i == j) {
+					break;
+				}
+			}
+		}
+
+		if (!purge_sys.truncate.current) {
+			return;
+		}
+
+		fil_space_t& space = *purge_sys.truncate.current;
+		/* An undo tablespace is always a single file. */
+		ut_a(UT_LIST_GET_LEN(space.chain) == 1);
+		fil_node_t* file = UT_LIST_GET_FIRST(space.chain);
+		/* The undo tablespace files are never closed. */
+		ut_ad(file->is_open());
+
+		DBUG_LOG("undo", "marking for truncate: " << file->name);
+
+		for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
+			if (trx_rseg_t* rseg = trx_sys.rseg_array[i]) {
+				ut_ad(rseg->is_persistent());
+				if (rseg->space == &space) {
+					/* Once set, this rseg will
+					not be allocated to subsequent
+					transactions, but we will wait
+					for existing active
+					transactions to finish. */
+					rseg->skip_allocation = true;
+				}
+			}
+		}
+
+		for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
+			trx_rseg_t* rseg = trx_sys.rseg_array[i];
+			if (!rseg || rseg->space != &space) {
+				continue;
+			}
+			mutex_enter(&rseg->mutex);
+			ut_ad(rseg->skip_allocation);
+			if (rseg->trx_ref_count) {
+not_free:
+				mutex_exit(&rseg->mutex);
+				return;
+			}
+
+			if (rseg->curr_size != 1) {
+				/* Check if all segments are
+				cached and safe to remove. */
+				ulint cached = 0;
+
+				for (trx_undo_t* undo = UT_LIST_GET_FIRST(
+					     rseg->undo_cached);
+				     undo;
+				     undo = UT_LIST_GET_NEXT(undo_list,
+							     undo)) {
+					if (head.trx_no < undo->trx_id) {
+						goto not_free;
+					} else {
+						cached += undo->size;
+					}
+				}
+
+				ut_ad(rseg->curr_size > cached);
+
+				if (rseg->curr_size > cached + 1) {
+					goto not_free;
+				}
+			}
+
+			mutex_exit(&rseg->mutex);
+		}
+
+		ib::info() << "Truncating " << file->name;
+		trx_purge_cleanse_purge_queue(space);
+
+		/* Flush all to-be-discarded pages of the tablespace.
+
+		During truncation, we do not want any writes to the
+		to-be-discarded area, because we must set the space.size
+		early in order to have deterministic page allocation.
+
+		If a log checkpoint was completed at LSN earlier than our
+		mini-transaction commit and the server was killed, then
+		discarding the to-be-trimmed pages without flushing would
+		break crash recovery. So, we cannot avoid the write. */
+		while (buf_flush_list_space(&space));
+
+		log_free_check();
+
+		/* Adjust the tablespace metadata. */
+		if (!fil_truncate_prepare(space.id)) {
+			ib::error() << "Failed to find UNDO tablespace "
+				    << file->name;
+			return;
+		}
+
+		/* Re-initialize tablespace, in a single mini-transaction. */
+		mtr_t mtr;
+		const ulint size = SRV_UNDO_TABLESPACE_SIZE_IN_PAGES;
+		mtr.start();
+		mtr_x_lock_space(purge_sys.truncate.current, &mtr);
+		/* Associate the undo tablespace with mtr.
+		During mtr::commit(), InnoDB can use the undo
+		tablespace object to clear all freed ranges */
+		mtr.set_named_space(purge_sys.truncate.current);
+		mtr.trim_pages(page_id_t(space.id, size));
+		fsp_header_init(purge_sys.truncate.current, size, &mtr);
+		mutex_enter(&fil_system.mutex);
+		purge_sys.truncate.current->size = file->size = size;
+		mutex_exit(&fil_system.mutex);
+
+		buf_block_t* sys_header = trx_sysf_get(&mtr);
+
+		for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
+			trx_rseg_t* rseg = trx_sys.rseg_array[i];
+			if (!rseg || rseg->space != &space) {
+				continue;
+			}
+
+			ut_ad(rseg->is_persistent());
+			ut_d(const ulint old_page = rseg->page_no);
+
+			buf_block_t* rblock = trx_rseg_header_create(
+				purge_sys.truncate.current,
+				rseg->id, sys_header, &mtr);
+			ut_ad(rblock);
+			rseg->page_no = rblock
+				? rblock->page.id().page_no() : FIL_NULL;
+			ut_ad(old_page == rseg->page_no);
+
+			/* Before re-initialization ensure that we
+			free the existing structure. There can't be
+			any active transactions. */
+			ut_a(UT_LIST_GET_LEN(rseg->undo_list) == 0);
+
+			trx_undo_t*	next_undo;
+
+			for (trx_undo_t* undo = UT_LIST_GET_FIRST(
+				     rseg->undo_cached);
+			     undo; undo = next_undo) {
+
+				next_undo = UT_LIST_GET_NEXT(undo_list, undo);
+				UT_LIST_REMOVE(rseg->undo_cached, undo);
+				MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_CACHED);
+				ut_free(undo);
+			}
+
+			UT_LIST_INIT(rseg->undo_list,
+				     &trx_undo_t::undo_list);
+			UT_LIST_INIT(rseg->undo_cached,
+				     &trx_undo_t::undo_list);
+
+			/* These were written by trx_rseg_header_create(). */
+			ut_ad(!mach_read_from_4(TRX_RSEG + TRX_RSEG_FORMAT
+						+ rblock->frame));
+			ut_ad(!mach_read_from_4(TRX_RSEG
+						+ TRX_RSEG_HISTORY_SIZE
+						+ rblock->frame));
+
+			/* Initialize the undo log lists according to
+			the rseg header */
+			rseg->curr_size = 1;
+			rseg->trx_ref_count = 0;
+			rseg->last_page_no = FIL_NULL;
+			rseg->last_commit_and_offset = 0;
+			rseg->needs_purge = false;
+		}
+
+		mtr.commit();
+		/* Write-ahead the redo log record. */
+		log_write_up_to(mtr.commit_lsn(), true);
+
+		/* Trim the file size. */
+		os_file_truncate(file->name, file->handle,
+				 os_offset_t(size) << srv_page_size_shift,
+				 true);
+
+		/* This is only executed by srv_purge_coordinator_thread. */
+		export_vars.innodb_undo_truncations++;
+
+		/* In MDEV-8319 (10.5) we will PUNCH_HOLE the garbage
+		(with write-ahead logging). */
+		mutex_enter(&fil_system.mutex);
+		ut_ad(&space == purge_sys.truncate.current);
+		ut_ad(space.is_being_truncated);
+		purge_sys.truncate.current->set_stopping(false);
+		purge_sys.truncate.current->is_being_truncated = false;
+		mutex_exit(&fil_system.mutex);
+
+		if (purge_sys.rseg != NULL
+		    && purge_sys.rseg->last_page_no == FIL_NULL) {
+			/* If purge_sys.rseg is pointing to rseg that
+			was recently truncated then move to next rseg
+			element. Note: Ideally purge_sys.rseg should
+			be NULL because purge should complete
+			processing of all the records but there is
+			purge_batch_size that can force the purge loop
+			to exit before all the records are purged and
+			in this case purge_sys.rseg could point to a
+			valid rseg waiting for next purge cycle. */
+			purge_sys.next_stored = false;
+			purge_sys.rseg = NULL;
+		}
+
+		DBUG_EXECUTE_IF("ib_undo_trunc",
+				ib::info() << "ib_undo_trunc";
+				log_buffer_flush_to_disk();
+				DBUG_SUICIDE(););
+
+		for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
+			if (trx_rseg_t* rseg = trx_sys.rseg_array[i]) {
+				ut_ad(rseg->is_persistent());
+				if (rseg->space == &space) {
+					rseg->skip_allocation = false;
+				}
+			}
+		}
+
+		ib::info() << "Truncated " << file->name;
+		purge_sys.truncate.last = purge_sys.truncate.current;
+		purge_sys.truncate.current = NULL;
+	}
+}
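Candidate selection above is a round-robin scan over the active undo tablespaces, starting right after the last one truncated and picking the first whose size exceeds the innodb_max_undo_log_size threshold. Stripped of locking and the fil_space_t plumbing, the scan reduces to (space_size() and last_truncated_index are invented stand-ins):

    const ulint threshold = ulint(srv_max_undo_log_size >> srv_page_size_shift);

    for (ulint i = last_truncated_index, j = i;;) {
            ulint space_id = srv_undo_space_id_start + i;
            if (space_size(space_id) > threshold) {
                    /* space_id becomes purge_sys.truncate.current */
                    break;
            }
            ++i;
            i %= srv_undo_tablespaces_active;
            if (i == j) {
                    break;  /* full cycle: nothing oversized right now */
            }
    }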
+
+/***********************************************************************//**
+Updates the last not yet purged history log info in rseg when we have purged
+a whole undo log. Also advances purge_sys.tail.trx_no past the purged log. */
+static void trx_purge_rseg_get_next_history_log(
+	ulint*		n_pages_handled)/*!< in/out: number of UNDO pages
+					handled */
+{
+	fil_addr_t	prev_log_addr;
+	trx_id_t	trx_no;
+	mtr_t		mtr;
+
+	mutex_enter(&purge_sys.rseg->mutex);
+
+	ut_a(purge_sys.rseg->last_page_no != FIL_NULL);
+
+	purge_sys.tail.trx_no = purge_sys.rseg->last_trx_no() + 1;
+	purge_sys.tail.undo_no = 0;
+	purge_sys.next_stored = false;
+
+	mtr.start();
+
+	const buf_block_t* undo_page = trx_undo_page_get_s_latched(
+		page_id_t(purge_sys.rseg->space->id,
+			  purge_sys.rseg->last_page_no), &mtr);
+
+	const trx_ulogf_t* log_hdr = undo_page->frame
+		+ purge_sys.rseg->last_offset();
+
+	/* Increase the purge page count by one for every handled log */
+
+	(*n_pages_handled)++;
+
+	prev_log_addr = flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE);
+	prev_log_addr.boffset = static_cast<uint16_t>(prev_log_addr.boffset
+						      - TRX_UNDO_HISTORY_NODE);
+
+	const bool empty = prev_log_addr.page == FIL_NULL;
+
+	if (empty) {
+		/* No logs left in the history list */
+		purge_sys.rseg->last_page_no = FIL_NULL;
+	}
+
+	mutex_exit(&purge_sys.rseg->mutex);
+	mtr.commit();
+
+	if (empty) {
+		return;
+	}
+
+	/* Read the previous log header. */
+	mtr.start();
+
+	log_hdr = trx_undo_page_get_s_latched(
+		page_id_t(purge_sys.rseg->space->id, prev_log_addr.page),
+		&mtr)->frame
+		+ prev_log_addr.boffset;
+
+	trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
+	ut_ad(mach_read_from_2(log_hdr + TRX_UNDO_NEEDS_PURGE) <= 1);
+
+	mtr_commit(&mtr);
+
+	mutex_enter(&purge_sys.rseg->mutex);
+
+	purge_sys.rseg->last_page_no = prev_log_addr.page;
+	purge_sys.rseg->set_last_commit(prev_log_addr.boffset, trx_no);
+	purge_sys.rseg->needs_purge = log_hdr[TRX_UNDO_NEEDS_PURGE + 1] != 0;
+
+	/* Purge can also produce events, however these are already ordered
+	in the rollback segment and any user generated event will be greater
+	than the events that Purge produces. ie. Purge can never produce
+	events from an empty rollback segment. */
+
+	mutex_enter(&purge_sys.pq_mutex);
+
+	purge_sys.purge_queue.push(*purge_sys.rseg);
+
+	mutex_exit(&purge_sys.pq_mutex);
+
+	mutex_exit(&purge_sys.rseg->mutex);
+}
+
+/** Position the purge sys "iterator" on the undo record to use for
+purging. */
+static void trx_purge_read_undo_rec()
+{
+	uint16_t	offset;
+	uint32_t	page_no;
+	ib_uint64_t	undo_no;
+
+	purge_sys.hdr_offset = purge_sys.rseg->last_offset();
+	page_no = purge_sys.hdr_page_no = purge_sys.rseg->last_page_no;
+
+	if (purge_sys.rseg->needs_purge) {
+		mtr_t		mtr;
+		mtr.start();
+		buf_block_t* undo_page;
+		if (trx_undo_rec_t* undo_rec = trx_undo_get_first_rec(
+			    *purge_sys.rseg->space, purge_sys.hdr_page_no,
+			    purge_sys.hdr_offset, RW_S_LATCH,
+			    undo_page, &mtr)) {
+
+			offset = page_offset(undo_rec);
+			undo_no = trx_undo_rec_get_undo_no(undo_rec);
+			page_no = undo_page->page.id().page_no();
+		} else {
+			offset = 0;
+			undo_no = 0;
+		}
+
+		mtr.commit();
+	} else {
+		offset = 0;
+		undo_no = 0;
+	}
+
+	purge_sys.offset = offset;
+	purge_sys.page_no = page_no;
+	purge_sys.tail.undo_no = undo_no;
+
+	purge_sys.next_stored = true;
+}
+
+/***********************************************************************//**
+Chooses the next undo log to purge and updates the info in purge_sys. This
+function is used to initialize purge_sys when the next record to purge is
+not known, and also to update the purge system info on the next record when
+purge has handled the whole undo log for a transaction. */
+static
+void
+trx_purge_choose_next_log(void)
+/*===========================*/
+{
+	ut_ad(!purge_sys.next_stored);
+
+	if (purge_sys.rseg_iter.set_next()) {
+		trx_purge_read_undo_rec();
+	} else {
+		/* There is nothing to do yet. */
+		os_thread_yield();
+	}
+}
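purge_sys.tail (and head) is a two-part cursor: undo logs are ordered by commit number, and records within one log by undo number, so all the head <= tail comparisons in this file are lexicographic. A self-contained rendering of that relation (the struct is re-declared here only for illustration; the real one is purge_sys_t::iterator in trx0purge.h):

    struct purge_iter {
      unsigned long long trx_no;  /* commit number of the undo log */
      unsigned long long undo_no; /* record number within that log */
    };

    inline bool operator<=(const purge_iter& a, const purge_iter& b) {
      return a.trx_no < b.trx_no
          || (a.trx_no == b.trx_no && a.undo_no <= b.undo_no);
    }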
+
+/***********************************************************************//**
+Gets the next record to purge and updates the info in the purge system.
+@return copy of an undo log record or pointer to the dummy undo log record */
+static
+trx_undo_rec_t*
+trx_purge_get_next_rec(
+/*===================*/
+	ulint*		n_pages_handled,/*!< in/out: number of UNDO pages
+					handled */
+	mem_heap_t*	heap)		/*!< in: memory heap where copied */
+{
+	mtr_t		mtr;
+
+	ut_ad(purge_sys.next_stored);
+	ut_ad(purge_sys.tail.trx_no < purge_sys.low_limit_no());
+
+	const ulint	space = purge_sys.rseg->space->id;
+	const uint32_t	page_no = purge_sys.page_no;
+	const uint16_t	offset = purge_sys.offset;
+
+	if (offset == 0) {
+		/* It is the dummy undo log record, which means that there is
+		no need to purge this undo log */
+
+		trx_purge_rseg_get_next_history_log(n_pages_handled);
+
+		/* Look for the next undo log and record to purge */
+
+		trx_purge_choose_next_log();
+
+		return(&trx_purge_dummy_rec);
+	}
+
+	mtr_start(&mtr);
+
+	buf_block_t* undo_page = trx_undo_page_get_s_latched(
+		page_id_t(space, page_no), &mtr);
+	buf_block_t* rec2_page = undo_page;
+
+	const trx_undo_rec_t* rec2 = trx_undo_page_get_next_rec(
+		undo_page, offset, purge_sys.hdr_page_no,
+		purge_sys.hdr_offset);
+
+	if (rec2 == NULL) {
+		rec2 = trx_undo_get_next_rec(rec2_page, offset,
+					     purge_sys.hdr_page_no,
+					     purge_sys.hdr_offset, &mtr);
+	}
+
+	if (rec2 == NULL) {
+		mtr_commit(&mtr);
+
+		trx_purge_rseg_get_next_history_log(n_pages_handled);
+
+		/* Look for the next undo log and record to purge */
+
+		trx_purge_choose_next_log();
+
+		mtr_start(&mtr);
+
+		undo_page = trx_undo_page_get_s_latched(
+			page_id_t(space, page_no), &mtr);
+	} else {
+		purge_sys.offset = page_offset(rec2);
+		purge_sys.page_no = rec2_page->page.id().page_no();
+		purge_sys.tail.undo_no = trx_undo_rec_get_undo_no(rec2);
+
+		if (undo_page != rec2_page) {
+			/* We advance to a new page of the undo log: */
+			(*n_pages_handled)++;
+		}
+	}
+
+	trx_undo_rec_t*	rec_copy = trx_undo_rec_copy(undo_page->frame
+						     + offset, heap);
+
+	mtr_commit(&mtr);
+
+	return(rec_copy);
+}
+
+/********************************************************************//**
+Fetches the next undo log record from the history list to purge. It must be
+released with the corresponding release function.
+@return copy of an undo log record or pointer to trx_purge_dummy_rec,
+if the whole undo log can be skipped in purge; NULL if none left */
+static MY_ATTRIBUTE((warn_unused_result))
+trx_undo_rec_t*
+trx_purge_fetch_next_rec(
+/*=====================*/
+	roll_ptr_t*	roll_ptr,	/*!< out: roll pointer to undo record */
+	ulint*		n_pages_handled,/*!< in/out: number of UNDO log pages
+					handled */
+	mem_heap_t*	heap)		/*!< in: memory heap where copied */
+{
+	if (!purge_sys.next_stored) {
+		trx_purge_choose_next_log();
+
+		if (!purge_sys.next_stored) {
+			DBUG_PRINT("ib_purge",
+				   ("no logs left in the history list"));
+			return(NULL);
+		}
+	}
+
+	if (purge_sys.tail.trx_no >= purge_sys.low_limit_no()) {
+
+		return(NULL);
+	}
+
+	/* fprintf(stderr, "Thread %lu purging trx %llu undo record %llu\n",
+	os_thread_get_curr_id(), iter->trx_no, iter->undo_no); */
+
+	*roll_ptr = trx_undo_build_roll_ptr(
+		/* row_purge_record_func() will later set
+		ROLL_PTR_INSERT_FLAG for TRX_UNDO_INSERT_REC */
+		false,
+		purge_sys.rseg->id,
+		purge_sys.page_no, purge_sys.offset);
+
+	/* The following call will advance the stored values of the
+	purge iterator. */
+
+	return(trx_purge_get_next_rec(n_pages_handled, heap));
+}
+
+/** Run a purge batch.
+@param n_purge_threads	number of purge threads
+@return number of undo log pages handled in the batch */
+static
+ulint
+trx_purge_attach_undo_recs(ulint n_purge_threads)
+{
+	que_thr_t*	thr;
+	ulint		i;
+	ulint		n_pages_handled = 0;
+	ulint		n_thrs = UT_LIST_GET_LEN(purge_sys.query->thrs);
+
+	ut_a(n_purge_threads > 0);
+
+	purge_sys.head = purge_sys.tail;
+
+#ifdef UNIV_DEBUG
+	i = 0;
+	/* Debug code to validate some pre-requisites and reset done flag. */
+	for (thr = UT_LIST_GET_FIRST(purge_sys.query->thrs);
+	     thr != NULL && i < n_purge_threads;
+	     thr = UT_LIST_GET_NEXT(thrs, thr), ++i) {
+
+		purge_node_t*	node;
+
+		/* Get the purge node. */
+		node = (purge_node_t*) thr->child;
+
+		ut_ad(que_node_get_type(node) == QUE_NODE_PURGE);
+		ut_ad(node->undo_recs.empty());
+		ut_ad(!node->in_progress);
+		ut_d(node->in_progress = true);
+	}
+
+	/* There should never be fewer nodes than threads, the inverse
+	however is allowed because we only use purge threads as needed. */
+	ut_ad(i == n_purge_threads);
+#endif
+
+	/* Fetch and parse the UNDO records. The UNDO records are added
+	to a per purge node vector. */
+	thr = UT_LIST_GET_FIRST(purge_sys.query->thrs);
+	ut_a(n_thrs > 0 && thr != NULL);
+
+	ut_ad(purge_sys.head <= purge_sys.tail);
+
+	i = 0;
+
+	const ulint	batch_size = srv_purge_batch_size;
+	std::unordered_map<table_id_t, purge_node_t*> table_id_map;
+	mem_heap_empty(purge_sys.heap);
+
+	while (UNIV_LIKELY(srv_undo_sources) || !srv_fast_shutdown) {
+		purge_node_t*	node;
+		trx_purge_rec_t	purge_rec;
+
+		ut_a(!thr->is_active);
+
+		/* Get the purge node. */
+		node = (purge_node_t*) thr->child;
+		ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
+
+		/* Track the max {trx_id, undo_no} for truncating the
+		UNDO logs once we have purged the records. */
+
+		if (purge_sys.head <= purge_sys.tail) {
+			purge_sys.head = purge_sys.tail;
+		}
+
+		/* Fetch the next record, and advance the purge_sys.tail. */
+		purge_rec.undo_rec = trx_purge_fetch_next_rec(
+			&purge_rec.roll_ptr, &n_pages_handled,
+			purge_sys.heap);
+
+		if (purge_rec.undo_rec == NULL) {
+			break;
+		} else if (purge_rec.undo_rec == &trx_purge_dummy_rec) {
+			continue;
+		}
+
+		table_id_t table_id = trx_undo_rec_get_table_id(
+			purge_rec.undo_rec);
+
+		purge_node_t *& table_node = table_id_map[table_id];
+
+		if (table_node) {
+			node = table_node;
+		} else {
+			thr = UT_LIST_GET_NEXT(thrs, thr);
+
+			if (!(++i % n_purge_threads)) {
+				thr = UT_LIST_GET_FIRST(
+					purge_sys.query->thrs);
+			}
+
+			ut_a(thr != NULL);
+			table_node = node;
+		}
+
+		node->undo_recs.push(purge_rec);
+
+		if (n_pages_handled >= batch_size) {
+			break;
+		}
+	}
+
+	ut_ad(purge_sys.head <= purge_sys.tail);
+
+	return(n_pages_handled);
+}
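trx_purge_attach_undo_recs() shards the batch by table: the first record seen for a table pins that table to whichever purge node is next in the round-robin, and every later record for the same table is queued on the same node, so undo records of one table are never applied out of order across workers. The dispatch decision in isolation (Worker, workers and dispatch() are invented stand-ins for purge_node_t and the thr list walk):

    #include <cstdint>
    #include <unordered_map>
    #include <vector>

    struct Worker { /* stands in for purge_node_t */ };

    static std::vector<Worker> workers(4);  /* n_purge_threads == 4 */
    static std::size_t rr = 0;
    static std::unordered_map<std::uint64_t, Worker*> table_map;

    Worker* dispatch(std::uint64_t table_id) {
      Worker*& w = table_map[table_id]; /* inserts nullptr on first sight */
      if (!w) {
        w = &workers[rr++ % workers.size()]; /* pin table to one worker */
      }
      return w;
    }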
+
+/*******************************************************************//**
+Calculate the DML delay required.
+@return delay in microseconds or ULINT_MAX */
+static
+ulint
+trx_purge_dml_delay(void)
+/*=====================*/
+{
+	/* Determine how much data manipulation language (DML) statements
+	need to be delayed in order to reduce the lagging of the purge
+	thread. */
+	ulint	delay = 0; /* in microseconds; default: no delay */
+
+	/* If purge lag is set then calculate the new DML delay. */
+
+	if (srv_max_purge_lag > 0) {
+		double ratio = static_cast<double>(trx_sys.rseg_history_len)
+			/ static_cast<double>(srv_max_purge_lag);
+
+		if (ratio > 1.0) {
+			/* If the history list length exceeds the
+			srv_max_purge_lag, the data manipulation
+			statements are delayed by at least 5000
+			microseconds. */
+			delay = (ulint) ((ratio - .5) * 10000);
+		}
+
+		if (delay > srv_max_purge_lag_delay) {
+			delay = srv_max_purge_lag_delay;
+		}
+
+		MONITOR_SET(MONITOR_DML_PURGE_DELAY, delay);
+	}
+
+	return(delay);
+}
+
+extern tpool::waitable_task purge_worker_task;
+
+/** Wait for pending purge jobs to complete. */
+static void trx_purge_wait_for_workers_to_complete()
+{
+  bool notify_wait = purge_worker_task.is_running();
+
+  if (notify_wait)
+    tpool::tpool_wait_begin();
+
+  purge_worker_task.wait();
+
+  if (notify_wait)
+    tpool::tpool_wait_end();
+
+  /* There should be no outstanding tasks as long
+  as the worker threads are active. */
+  ut_ad(srv_get_task_queue_length() == 0);
+}
+
+/**
+Run a purge batch.
+@param n_tasks	number of purge tasks to submit to the queue
+@param truncate	whether to truncate the history at the end of the batch
+@return number of undo log pages handled in the batch */
+ulint trx_purge(ulint n_tasks, bool truncate)
+{
+	que_thr_t*	thr = NULL;
+	ulint		n_pages_handled;
+
+	ut_ad(n_tasks > 0);
+
+	srv_dml_needed_delay = trx_purge_dml_delay();
+
+	purge_sys.clone_oldest_view();
+
+#ifdef UNIV_DEBUG
+	if (srv_purge_view_update_only_debug) {
+		return(0);
+	}
+#endif /* UNIV_DEBUG */
+
+	/* Fetch the UNDO recs that need to be purged. */
+	n_pages_handled = trx_purge_attach_undo_recs(n_tasks);
+
+	/* Submit tasks to workers queue if using multi-threaded purge. */
+	for (ulint i = n_tasks; --i; ) {
+		thr = que_fork_scheduler_round_robin(purge_sys.query, thr);
+		ut_a(thr);
+		srv_que_task_enqueue_low(thr);
+		srv_thread_pool->submit_task(&purge_worker_task);
+	}
+
+	thr = que_fork_scheduler_round_robin(purge_sys.query, thr);
+
+	que_run_threads(thr);
+
+	trx_purge_wait_for_workers_to_complete();
+
+	if (truncate) {
+		trx_purge_truncate_history();
+	}
+
+	MONITOR_INC_VALUE(MONITOR_PURGE_INVOKED, 1);
+	MONITOR_INC_VALUE(MONITOR_PURGE_N_PAGE_HANDLED, n_pages_handled);
+
+	return(n_pages_handled);
+}
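The delay engages once the history list outgrows innodb_max_purge_lag: with ratio = history_length / srv_max_purge_lag, each DML statement is delayed by (ratio - 0.5) * 10000 microseconds, capped by innodb_max_purge_lag_delay, so a history list at twice the limit costs 15000 microseconds per statement. The arithmetic of trx_purge_dml_delay() in isolation:

    #include <algorithm>

    unsigned long dml_delay_us(double history_len, double max_lag,
                               unsigned long cap) {
      if (max_lag <= 0) return 0;      /* feature disabled */
      double ratio = history_len / max_lag;
      if (ratio <= 1.0) return 0;      /* purge is keeping up */
      unsigned long delay =
          static_cast<unsigned long>((ratio - 0.5) * 10000);
      return std::min(delay, cap);
    }
    /* dml_delay_us(200000, 100000, 300000) == 15000 */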