Diffstat (limited to 'storage/innobase/trx/trx0purge.cc')
-rw-r--r-- | storage/innobase/trx/trx0purge.cc | 1416
1 file changed, 1416 insertions, 0 deletions
diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc new file mode 100644 index 00000000..625d3223 --- /dev/null +++ b/storage/innobase/trx/trx0purge.cc @@ -0,0 +1,1416 @@ +/***************************************************************************** + +Copyright (c) 1996, 2017, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2017, 2022, MariaDB Corporation. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA + +*****************************************************************************/ + +/**************************************************//** +@file trx/trx0purge.cc +Purge old versions + +Created 3/26/1996 Heikki Tuuri +*******************************************************/ + +#include "trx0purge.h" +#include "fsp0fsp.h" +#include "mach0data.h" +#include "mtr0log.h" +#include "que0que.h" +#include "row0purge.h" +#include "row0upd.h" +#include "srv0mon.h" +#include "srv0srv.h" +#include "srv0start.h" +#include "trx0rec.h" +#include "trx0roll.h" +#include "trx0rseg.h" +#include "trx0trx.h" +#include <mysql/service_wsrep.h> + +#include <unordered_map> + +/** Maximum allowable purge history length. <=0 means 'infinite'. */ +ulong srv_max_purge_lag = 0; + +/** Max DML user threads delay in micro-seconds. */ +ulong srv_max_purge_lag_delay = 0; + +/** The global data structure coordinating a purge */ +purge_sys_t purge_sys; + +#ifdef UNIV_DEBUG +my_bool srv_purge_view_update_only_debug; +#endif /* UNIV_DEBUG */ + +/** Sentinel value */ +static const TrxUndoRsegs NullElement; + +/** Default constructor */ +TrxUndoRsegsIterator::TrxUndoRsegsIterator() + : m_rsegs(NullElement), m_iter(m_rsegs.begin()) +{ +} + +/** Sets the next rseg to purge in purge_sys. +Executed in the purge coordinator thread. +@return whether anything is to be purged */ +TRANSACTIONAL_INLINE inline bool TrxUndoRsegsIterator::set_next() +{ + mysql_mutex_lock(&purge_sys.pq_mutex); + + /* Only purge consumes events from the priority queue, user + threads only produce the events. */ + + /* Check if there are more rsegs to process in the + current element. */ + if (m_iter != m_rsegs.end()) { + /* We are still processing rollback segment from + the same transaction and so expected transaction + number shouldn't increase. Undo the increment of + expected commit done by caller assuming rollback + segments from given transaction are done. */ + purge_sys.tail.trx_no = (*m_iter)->last_trx_no(); + } else if (!purge_sys.purge_queue.empty()) { + m_rsegs = purge_sys.purge_queue.top(); + purge_sys.purge_queue.pop(); + ut_ad(purge_sys.purge_queue.empty() + || purge_sys.purge_queue.top() != m_rsegs); + m_iter = m_rsegs.begin(); + } else { + /* Queue is empty, reset iterator. 
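Clear purge_sys.rseg while still holding pq_mutex, and reset + m_rsegs to the NullElement sentinel so that the next call starts + from an empty state. 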
*/ + purge_sys.rseg = NULL; + mysql_mutex_unlock(&purge_sys.pq_mutex); + m_rsegs = NullElement; + m_iter = m_rsegs.begin(); + return false; + } + + purge_sys.rseg = *m_iter++; + mysql_mutex_unlock(&purge_sys.pq_mutex); + + /* We assume in purge of externally stored fields that space + id is in the range of UNDO tablespace space ids */ + ut_ad(purge_sys.rseg->space->id == TRX_SYS_SPACE + || srv_is_undo_tablespace(purge_sys.rseg->space->id)); + + trx_id_t last_trx_no; + { +#ifdef SUX_LOCK_GENERIC + purge_sys.rseg->latch.rd_lock(SRW_LOCK_CALL); +#else + transactional_shared_lock_guard<srw_spin_lock> rg + {purge_sys.rseg->latch}; +#endif + last_trx_no = purge_sys.rseg->last_trx_no(); + + purge_sys.hdr_offset = purge_sys.rseg->last_offset(); + purge_sys.hdr_page_no = purge_sys.rseg->last_page_no; + +#ifdef SUX_LOCK_GENERIC + purge_sys.rseg->latch.rd_unlock(); +#endif + } + + /* Only the purge coordinator task will access this object + purge_sys.rseg_iter, or any of purge_sys.hdr_page_no, + purge_sys.tail, purge_sys.head, or modify purge_sys.view. */ + ut_ad(last_trx_no == m_rsegs.trx_no); + ut_a(purge_sys.hdr_page_no != FIL_NULL); + ut_a(purge_sys.tail.trx_no <= last_trx_no); + purge_sys.tail.trx_no = last_trx_no; + + return(true); +} + +/** Build a purge 'query' graph. The actual purge is performed by executing +this query graph. +@return own: the query graph */ +static +que_t* +purge_graph_build() +{ + ut_a(srv_n_purge_threads > 0); + + trx_t* trx = trx_create(); + ut_ad(!trx->id); + trx->start_time = time(NULL); + trx->start_time_micro = microsecond_interval_timer(); + trx->state = TRX_STATE_ACTIVE; + trx->op_info = "purge trx"; + + mem_heap_t* heap = mem_heap_create(512); + que_fork_t* fork = que_fork_create(heap); + fork->trx = trx; + + for (auto i = innodb_purge_threads_MAX; i; i--) { + que_thr_t* thr = que_thr_create(fork, heap, NULL); + thr->child = new(mem_heap_alloc(heap, sizeof(purge_node_t))) + purge_node_t(thr); + } + + return(fork); +} + +/** Initialise the purge system. */ +void purge_sys_t::create() +{ + ut_ad(this == &purge_sys); + ut_ad(!heap); + ut_ad(!enabled()); + m_paused= 0; + m_SYS_paused= 0; + query= purge_graph_build(); + next_stored= false; + rseg= NULL; + page_no= 0; + offset= 0; + hdr_page_no= 0; + hdr_offset= 0; + latch.SRW_LOCK_INIT(trx_purge_latch_key); + end_latch.init(); + mysql_mutex_init(purge_sys_pq_mutex_key, &pq_mutex, nullptr); + truncate.current= NULL; + truncate.last= NULL; + heap= mem_heap_create(4096); +} + +/** Close the purge subsystem on shutdown. */ +void purge_sys_t::close() +{ + ut_ad(this == &purge_sys); + if (!heap) + return; + + ut_ad(!enabled()); + trx_t* trx = query->trx; + que_graph_free(query); + ut_ad(!trx->id); + ut_ad(trx->state == TRX_STATE_ACTIVE); + trx->state= TRX_STATE_NOT_STARTED; + trx->free(); + latch.destroy(); + end_latch.destroy(); + mysql_mutex_destroy(&pq_mutex); + mem_heap_free(heap); + heap= nullptr; +} + +/** Determine if the history of a transaction is purgeable. 
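The history becomes purgeable once the changes of trx_id are visible +to the purge view, that is, when no active read view can still refer to them. 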
+@param trx_id transaction identifier +@return whether the history is purgeable */ +TRANSACTIONAL_TARGET bool purge_sys_t::is_purgeable(trx_id_t trx_id) const +{ + bool purgeable; +#if !defined SUX_LOCK_GENERIC && !defined NO_ELISION + purgeable= false; + if (xbegin()) + { + if (!latch.is_write_locked()) + { + purgeable= view.changes_visible(trx_id); + xend(); + } + else + xabort(); + } + else +#endif + { + latch.rd_lock(SRW_LOCK_CALL); + purgeable= view.changes_visible(trx_id); + latch.rd_unlock(); + } + return purgeable; +} + +/*================ UNDO LOG HISTORY LIST =============================*/ + +/** Prepend the history list with an undo log. +Remove the undo log segment from the rseg slot if it is too big for reuse. +@param[in] trx transaction +@param[in,out] undo undo log +@param[in,out] mtr mini-transaction */ +void +trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr) +{ + DBUG_PRINT("trx", ("commit(" TRX_ID_FMT "," TRX_ID_FMT ")", + trx->id, trx_id_t{trx->rw_trx_hash_element->no})); + ut_ad(undo == trx->rsegs.m_redo.undo); + trx_rseg_t* rseg = trx->rsegs.m_redo.rseg; + ut_ad(undo->rseg == rseg); + buf_block_t* rseg_header = rseg->get(mtr, nullptr); + /* We are in transaction commit; we cannot return an error. If the + database is corrupted, it is better to crash it than to + intentionally violate ACID by committing something that is known to + be corrupted. */ + ut_ad(rseg_header); + buf_block_t* undo_page = trx_undo_set_state_at_finish( + undo, mtr); + trx_ulogf_t* undo_header = undo_page->page.frame + + undo->hdr_offset; + + ut_ad(mach_read_from_2(undo_header + TRX_UNDO_NEEDS_PURGE) <= 1); + + if (UNIV_UNLIKELY(mach_read_from_4(TRX_RSEG + TRX_RSEG_FORMAT + + rseg_header->page.frame))) { + /* This database must have been upgraded from + before MariaDB 10.3.5. */ + trx_rseg_format_upgrade(rseg_header, mtr); + } + + if (undo->state != TRX_UNDO_CACHED) { + /* The undo log segment will not be reused */ + ut_a(undo->id < TRX_RSEG_N_SLOTS); + compile_time_assert(FIL_NULL == 0xffffffff); + mtr->memset(rseg_header, + TRX_RSEG + TRX_RSEG_UNDO_SLOTS + + undo->id * TRX_RSEG_SLOT_SIZE, 4, 0xff); + + MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_USED); + + uint32_t hist_size = mach_read_from_4( + TRX_RSEG_HISTORY_SIZE + TRX_RSEG + + rseg_header->page.frame); + + ut_ad(undo->size == flst_get_len(TRX_UNDO_SEG_HDR + + TRX_UNDO_PAGE_LIST + + undo_page->page.frame)); + + mtr->write<4>(*rseg_header, TRX_RSEG + TRX_RSEG_HISTORY_SIZE + + rseg_header->page.frame, + hist_size + undo->size); + mtr->write<8>(*rseg_header, TRX_RSEG + TRX_RSEG_MAX_TRX_ID + + rseg_header->page.frame, + trx_sys.get_max_trx_id()); + } + + /* After the purge thread has been given permission to exit, + we may roll back transactions (trx->undo_no==0) + in THD::cleanup() invoked from unlink_thd() in fast shutdown, + or in trx_rollback_recovered() in slow shutdown. + + Before any transaction-generating background threads or the + purge have been started, we can + start transactions in row_merge_drop_temp_indexes(), + and roll back recovered transactions. + + Arbitrary user transactions may be executed when all the undo log + related background processes (including purge) are disabled due to + innodb_force_recovery=2 or innodb_force_recovery=3. + DROP TABLE may be executed at any innodb_force_recovery level. + + During fast shutdown, we may also continue to execute + user transactions. 
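The assertion below enumerates exactly these cases. 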
*/ + ut_ad(srv_undo_sources + || trx->undo_no == 0 + || (!purge_sys.enabled() + && (srv_is_being_started + || trx_rollback_is_active + || srv_force_recovery >= SRV_FORCE_NO_BACKGROUND)) + || srv_fast_shutdown); + +#ifdef WITH_WSREP + if (wsrep_is_wsrep_xid(&trx->xid)) { + trx_rseg_update_wsrep_checkpoint(rseg_header, &trx->xid, mtr); + } +#endif + + if (trx->mysql_log_file_name && *trx->mysql_log_file_name) { + /* Update the latest MySQL binlog name and offset info + in the rollback segment header if MySQL binlogging is on + or the database server is a MySQL replication slave. */ + trx_rseg_update_binlog_offset(rseg_header, trx, mtr); + } + + /* Add the log as the first in the history list */ + + /* We are in transaction commit; we cannot return an error + when detecting corruption. It is better to crash the server + than to intentionally violate ACID by committing something + that is known to be corrupted. */ + ut_a(flst_add_first(rseg_header, TRX_RSEG + TRX_RSEG_HISTORY, undo_page, + static_cast<uint16_t>(undo->hdr_offset + + TRX_UNDO_HISTORY_NODE), + mtr) == DB_SUCCESS); + + mtr->write<8,mtr_t::MAYBE_NOP>(*undo_page, + undo_header + TRX_UNDO_TRX_NO, + trx->rw_trx_hash_element->no); + mtr->write<2,mtr_t::MAYBE_NOP>(*undo_page, undo_header + + TRX_UNDO_NEEDS_PURGE, 1U); + + if (rseg->last_page_no == FIL_NULL) { + rseg->last_page_no = undo->hdr_page_no; + rseg->set_last_commit(undo->hdr_offset, + trx->rw_trx_hash_element->no); + rseg->set_needs_purge(); + } + + rseg->history_size++; + + if (undo->state == TRX_UNDO_CACHED) { + UT_LIST_ADD_FIRST(rseg->undo_cached, undo); + MONITOR_INC(MONITOR_NUM_UNDO_SLOT_CACHED); + } else { + ut_ad(undo->state == TRX_UNDO_TO_PURGE); + ut_free(undo); + } + + undo = NULL; +} + +MY_ATTRIBUTE((nonnull, warn_unused_result)) +/** Remove undo log header from the history list. +@param[in,out] rseg rollback segment header page +@param[in] log undo log segment header page +@param[in] offset byte offset in the undo log segment header page +@param[in,out] mtr mini-transaction */ +static dberr_t trx_purge_remove_log_hdr(buf_block_t *rseg, buf_block_t* log, + uint16_t offset, mtr_t *mtr) +{ + return flst_remove(rseg, TRX_RSEG + TRX_RSEG_HISTORY, log, + uint16_t(offset + TRX_UNDO_HISTORY_NODE), mtr); +} + +MY_ATTRIBUTE((nonnull, warn_unused_result)) +/** Free an undo log segment, and remove the header from the history list. +@param[in,out] rseg rollback segment +@param[in] hdr_addr file address of log_hdr +@return error code */ +static dberr_t trx_purge_free_segment(trx_rseg_t *rseg, fil_addr_t hdr_addr) +{ + const page_id_t hdr_page_id{rseg->space->id, hdr_addr.page}; + mtr_t mtr; + mtr.start(); + + /* We only need the latch to maintain rseg->curr_size. To follow the + latching order, we must acquire it before acquiring any related + page latch. */ + rseg->latch.wr_lock(SRW_LOCK_CALL); + + dberr_t err; + buf_block_t *rseg_hdr= rseg->get(&mtr, &err); + if (!rseg_hdr) + goto func_exit; + if (buf_block_t *block= buf_page_get_gen(hdr_page_id, 0, RW_X_LATCH, + nullptr, BUF_GET_POSSIBLY_FREED, + &mtr, &err)) + { + /* Mark the last undo log totally purged, so that if the system + crashes, the tail of the undo log will not get accessed again. The + list of pages in the undo log tail gets inconsistent during the + freeing of the segment, and therefore purge should not try to + access them again. 
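This is done by clearing the TRX_UNDO_NEEDS_PURGE flag in the + undo log header below. 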
*/ + mtr.write<2,mtr_t::MAYBE_NOP>(*block, block->page.frame + + hdr_addr.boffset + TRX_UNDO_NEEDS_PURGE, 0U); + while (!fseg_free_step_not_header(TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER + + block->page.frame, &mtr)) + { + rseg->latch.wr_unlock(); + rseg_hdr->fix(); + block->fix(); + mtr.commit(); + mtr.start(); + mtr.flag_modified(); + rseg->latch.wr_lock(SRW_LOCK_CALL); + rseg_hdr->page.lock.x_lock(); + block->page.lock.x_lock(); + mtr.memo_push(rseg_hdr, MTR_MEMO_PAGE_X_FIX); + mtr.memo_push(block, MTR_MEMO_PAGE_X_MODIFY); + } + + /* The page list may now be inconsistent, but the length field + stored in the list base node tells us how big it was before we + started the freeing. */ + const uint32_t seg_size= + flst_get_len(TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST + block->page.frame); + + /* We may free the undo log segment header page; it must be freed + within the same mtr as the undo log header is removed from the + history list: otherwise, in case of a database crash, the segment + could become inaccessible garbage in the file space. */ + err= trx_purge_remove_log_hdr(rseg_hdr, block, hdr_addr.boffset, &mtr); + if (UNIV_UNLIKELY(err != DB_SUCCESS)) + goto func_exit; + byte *hist= TRX_RSEG + TRX_RSEG_HISTORY_SIZE + rseg_hdr->page.frame; + if (UNIV_UNLIKELY(mach_read_from_4(hist) < seg_size)) + { + err= DB_CORRUPTION; + goto func_exit; + } + mtr.write<4>(*rseg_hdr, hist, mach_read_from_4(hist) - seg_size); + + /* Here we assume that a file segment with just the header page + can be freed in a few steps, so that the buffer pool is not + flooded with bufferfixed pages: see the note in fsp0fsp.cc. */ + while (!fseg_free_step(TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER + + block->page.frame, &mtr)); + + ut_ad(rseg->curr_size >= seg_size); + + rseg->history_size--; + rseg->curr_size -= seg_size; + } + +func_exit: + rseg->latch.wr_unlock(); + mtr.commit(); + return err; +} + +/** Remove unnecessary history data from a rollback segment. 
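The history list is traversed from its oldest undo log header towards +newer ones, freeing whole undo log segments where possible, until limit +is reached. 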
+@param[in,out] rseg rollback segment +@param[in] limit truncate anything before this +@return error code */ +static +dberr_t +trx_purge_truncate_rseg_history( + trx_rseg_t& rseg, + const purge_sys_t::iterator& limit) +{ + fil_addr_t hdr_addr; + mtr_t mtr; + + mtr.start(); + ut_ad(rseg.is_persistent()); + rseg.latch.wr_lock(SRW_LOCK_CALL); + + dberr_t err; + buf_block_t* rseg_hdr = rseg.get(&mtr, &err); + if (!rseg_hdr) { + goto func_exit; + } + + hdr_addr = flst_get_last(TRX_RSEG + TRX_RSEG_HISTORY + + rseg_hdr->page.frame); + hdr_addr.boffset = static_cast<uint16_t>(hdr_addr.boffset + - TRX_UNDO_HISTORY_NODE); + +loop: + if (hdr_addr.page == FIL_NULL) { +func_exit: + rseg.latch.wr_unlock(); + mtr.commit(); + return err; + } + + buf_block_t* block = buf_page_get_gen(page_id_t(rseg.space->id, + hdr_addr.page), + 0, RW_X_LATCH, nullptr, + BUF_GET_POSSIBLY_FREED, + &mtr, &err); + if (!block) { + goto func_exit; + } + + const trx_id_t undo_trx_no = mach_read_from_8( + block->page.frame + hdr_addr.boffset + TRX_UNDO_TRX_NO); + + if (undo_trx_no >= limit.trx_no) { + if (undo_trx_no == limit.trx_no) { + err = trx_undo_truncate_start( + &rseg, hdr_addr.page, + hdr_addr.boffset, limit.undo_no); + } + + goto func_exit; + } + + fil_addr_t prev_hdr_addr = flst_get_prev_addr( + block->page.frame + hdr_addr.boffset + TRX_UNDO_HISTORY_NODE); + prev_hdr_addr.boffset = static_cast<uint16_t>(prev_hdr_addr.boffset + - TRX_UNDO_HISTORY_NODE); + + if (mach_read_from_2(TRX_UNDO_SEG_HDR + TRX_UNDO_STATE + + block->page.frame) + == TRX_UNDO_TO_PURGE + && !mach_read_from_2(block->page.frame + hdr_addr.boffset + + TRX_UNDO_NEXT_LOG)) { + + /* We can free the whole log segment */ + + rseg.latch.wr_unlock(); + mtr.commit(); + + /* trx_purge_free_segment() will call + trx_purge_remove_log_hdr() itself. */ + err = trx_purge_free_segment(&rseg, hdr_addr); + if (err != DB_SUCCESS) { + return err; + } + } else { + /* Remove the log hdr from the rseg history. */ + err = trx_purge_remove_log_hdr(rseg_hdr, block, + hdr_addr.boffset, &mtr); + if (err != DB_SUCCESS) { + goto func_exit; + } + + rseg.history_size--; + rseg.latch.wr_unlock(); + mtr.commit(); + } + + mtr.start(); + rseg.latch.wr_lock(SRW_LOCK_CALL); + + hdr_addr = prev_hdr_addr; + + rseg_hdr = rseg.get(&mtr, &err); + if (!rseg_hdr) { + goto func_exit; + } + + goto loop; +} + +/** Cleanse the purge queue to remove any rsegs that reside in the undo +tablespace marked for truncation. +@param[in] space undo tablespace being truncated */ +static void trx_purge_cleanse_purge_queue(const fil_space_t& space) +{ + typedef std::vector<TrxUndoRsegs> purge_elem_list_t; + purge_elem_list_t purge_elem_list; + + mysql_mutex_lock(&purge_sys.pq_mutex); + + /* Remove rseg instances that are in the purge queue before we start + the truncation of the corresponding undo tablespace. 
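The queue is drained into a temporary vector, any rsegs that belong + to space are erased, and the remaining elements are pushed back. 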
*/ + while (!purge_sys.purge_queue.empty()) { + purge_elem_list.push_back(purge_sys.purge_queue.top()); + purge_sys.purge_queue.pop(); + } + + for (purge_elem_list_t::iterator it = purge_elem_list.begin(); + it != purge_elem_list.end(); + ++it) { + + for (TrxUndoRsegs::iterator it2 = it->begin(); + it2 != it->end(); + ++it2) { + if ((*it2)->space == &space) { + it->erase(it2); + break; + } + } + + if (!it->empty()) { + purge_sys.purge_queue.push(*it); + } + } + + mysql_mutex_unlock(&purge_sys.pq_mutex); +} + +#if defined __GNUC__ && __GNUC__ == 4 && !defined __clang__ +# if defined __arm__ || defined __aarch64__ +/* Work around an internal compiler error in GCC 4.8.5 */ +__attribute__((optimize(0))) +# endif +#endif +/** +Removes unnecessary history data from rollback segments. NOTE that when this +function is called, the caller must not have any latches on undo log pages! +*/ +TRANSACTIONAL_TARGET static void trx_purge_truncate_history() +{ + ut_ad(purge_sys.head <= purge_sys.tail); + purge_sys_t::iterator &head= purge_sys.head.trx_no + ? purge_sys.head : purge_sys.tail; + + if (head.trx_no >= purge_sys.low_limit_no()) + { + /* This is sometimes necessary. TODO: find out why. */ + head.trx_no= purge_sys.low_limit_no(); + head.undo_no= 0; + } + + dberr_t err= DB_SUCCESS; + for (auto &rseg : trx_sys.rseg_array) + if (rseg.space) + if (dberr_t e= trx_purge_truncate_rseg_history(rseg, head)) + err= e; + + if (err != DB_SUCCESS || srv_undo_tablespaces_active < 2) + return; + + while (srv_undo_log_truncate) + { + if (!purge_sys.truncate.current) + { + const ulint threshold= + ulint(srv_max_undo_log_size >> srv_page_size_shift); + for (ulint i= purge_sys.truncate.last + ? purge_sys.truncate.last->id - srv_undo_space_id_start : 0, + j= i;; ) + { + const auto space_id= srv_undo_space_id_start + i; + ut_ad(srv_is_undo_tablespace(space_id)); + fil_space_t *space= fil_space_get(space_id); + ut_a(!space || UT_LIST_GET_LEN(space->chain) == 1); + + if (space && space->get_size() > threshold) + { + purge_sys.truncate.current= space; + break; + } + + ++i; + i %= srv_undo_tablespaces_active; + if (i == j) + return; + } + } + + fil_space_t &space= *purge_sys.truncate.current; + /* Undo tablespaces always consist of a single file. */ + fil_node_t *file= UT_LIST_GET_FIRST(space.chain); + /* The undo tablespace files are never closed. */ + ut_ad(file->is_open()); + + DBUG_LOG("undo", "marking for truncate: " << file->name); + + for (auto &rseg : trx_sys.rseg_array) + if (rseg.space == &space) + /* Once set, this rseg will not be allocated to subsequent + transactions, but we will wait for existing active + transactions to finish. */ + rseg.set_skip_allocation(); + + for (auto &rseg : trx_sys.rseg_array) + { + if (rseg.space != &space) + continue; +#ifdef SUX_LOCK_GENERIC + rseg.latch.rd_lock(SRW_LOCK_CALL); +#else + transactional_shared_lock_guard<srw_spin_lock> g{rseg.latch}; +#endif + ut_ad(rseg.skip_allocation()); + if (rseg.is_referenced()) + { +not_free: +#ifdef SUX_LOCK_GENERIC + rseg.latch.rd_unlock(); +#endif + return; + } + + if (rseg.curr_size != 1) + { + /* Check if all segments are cached and safe to remove. 
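A cached undo log segment may only be freed if its trx_id is + no newer than the purge head; otherwise this tablespace cannot + be truncated yet. 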
*/ + ulint cached= 0; + for (trx_undo_t *undo= UT_LIST_GET_FIRST(rseg.undo_cached); undo; + undo= UT_LIST_GET_NEXT(undo_list, undo)) + { + if (head.trx_no < undo->trx_id) + goto not_free; + else + cached+= undo->size; + } + + ut_ad(rseg.curr_size > cached); + + if (rseg.curr_size > cached + 1) + goto not_free; + } + +#ifdef SUX_LOCK_GENERIC + rseg.latch.rd_unlock(); +#endif + } + + ib::info() << "Truncating " << file->name; + trx_purge_cleanse_purge_queue(space); + + log_free_check(); + + mtr_t mtr; + mtr.start(); + mtr.x_lock_space(&space); + + /* Lock all modified pages of the tablespace. + + During truncation, we do not want any writes to the file. + + If a log checkpoint was completed at LSN earlier than our + mini-transaction commit and the server was killed, then + discarding the to-be-trimmed pages without flushing would + break crash recovery. */ + mysql_mutex_lock(&buf_pool.flush_list_mutex); + + for (buf_page_t *bpage= UT_LIST_GET_LAST(buf_pool.flush_list); bpage; ) + { + ut_ad(bpage->oldest_modification()); + ut_ad(bpage->in_file()); + + buf_page_t *prev= UT_LIST_GET_PREV(list, bpage); + + if (bpage->id().space() == space.id && + bpage->oldest_modification() != 1) + { + ut_ad(bpage->frame); + auto block= reinterpret_cast<buf_block_t*>(bpage); + if (!bpage->lock.x_lock_try()) + { + /* Let buf_pool_t::release_freed_page() proceed. */ + mysql_mutex_unlock(&buf_pool.flush_list_mutex); + std::this_thread::yield(); + mysql_mutex_lock(&buf_pool.flush_list_mutex); + rescan: + bpage= UT_LIST_GET_LAST(buf_pool.flush_list); + continue; + } + buf_pool.flush_hp.set(prev); + mysql_mutex_unlock(&buf_pool.flush_list_mutex); + +#ifdef BTR_CUR_HASH_ADAPT + ut_ad(!block->index); /* There is no AHI on undo tablespaces. */ +#endif + bpage->fix(); + ut_ad(!bpage->is_io_fixed()); + mysql_mutex_lock(&buf_pool.flush_list_mutex); + + if (bpage->oldest_modification() > 1) + { + bpage->reset_oldest_modification(); + mtr.memo_push(block, MTR_MEMO_PAGE_X_FIX); + } + else + { + bpage->unfix(); + bpage->lock.x_unlock(); + } + + if (prev != buf_pool.flush_hp.get()) + /* Rescan, because we may have lost the position. */ + goto rescan; + } + + bpage= prev; + } + + mysql_mutex_unlock(&buf_pool.flush_list_mutex); + + /* Re-initialize tablespace, in a single mini-transaction. */ + const ulint size= SRV_UNDO_TABLESPACE_SIZE_IN_PAGES; + + /* Adjust the tablespace metadata. */ + mysql_mutex_lock(&fil_system.mutex); + space.set_stopping(); + space.is_being_truncated= true; + if (space.crypt_data) + { + space.reacquire(); + mysql_mutex_unlock(&fil_system.mutex); + fil_space_crypt_close_tablespace(&space); + space.release(); + } + else + mysql_mutex_unlock(&fil_system.mutex); + + for (auto i= 6000; space.referenced(); + std::this_thread::sleep_for(std::chrono::milliseconds(10))) + { + if (!--i) + { + mtr.commit(); + ib::error() << "Failed to freeze UNDO tablespace " << file->name; + return; + } + } + + /* Associate the undo tablespace with mtr. 
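The tablespace is registered with the mini-transaction before + its pages are trimmed. 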
+ During mtr::commit_shrink(), InnoDB can use the undo + tablespace object to clear all freed ranges */ + mtr.set_named_space(&space); + mtr.trim_pages(page_id_t(space.id, size)); + ut_a(fsp_header_init(&space, size, &mtr) == DB_SUCCESS); + mysql_mutex_lock(&fil_system.mutex); + space.size= file->size= size; + mysql_mutex_unlock(&fil_system.mutex); + + for (auto &rseg : trx_sys.rseg_array) + { + if (rseg.space != &space) + continue; + + dberr_t err; + buf_block_t *rblock= trx_rseg_header_create(&space, + &rseg - trx_sys.rseg_array, + trx_sys.get_max_trx_id(), + &mtr, &err); + ut_a(rblock); + /* These were written by trx_rseg_header_create(). */ + ut_ad(!mach_read_from_4(TRX_RSEG + TRX_RSEG_FORMAT + + rblock->page.frame)); + ut_ad(!mach_read_from_4(TRX_RSEG + TRX_RSEG_HISTORY_SIZE + + rblock->page.frame)); + rseg.reinit(rblock->page.id().page_no()); + } + + mtr.commit_shrink(space); + + /* No mutex; this is only updated by the purge coordinator. */ + export_vars.innodb_undo_truncations++; + + if (purge_sys.rseg && purge_sys.rseg->last_page_no == FIL_NULL) + { + /* If purge_sys.rseg points to an rseg that was just + truncated, move on to the next rseg element. + + Note: Ideally purge_sys.rseg should be NULL because purge should + complete processing of all the records, but srv_purge_batch_size + can force the purge loop to exit before all the records are purged. */ + purge_sys.rseg= nullptr; + purge_sys.next_stored= false; + } + + DBUG_EXECUTE_IF("ib_undo_trunc", ib::info() << "ib_undo_trunc"; + log_buffer_flush_to_disk(); + DBUG_SUICIDE();); + + for (auto &rseg : trx_sys.rseg_array) + if (rseg.space == &space) + rseg.clear_skip_allocation(); + + ib::info() << "Truncated " << file->name; + purge_sys.truncate.last= purge_sys.truncate.current; + ut_ad(&space == purge_sys.truncate.current); + purge_sys.truncate.current= nullptr; + } +} + +/***********************************************************************//** +Updates the last not yet purged history log info in rseg when we have purged +a whole undo log. Also advances purge_sys.tail.trx_no past the purged log. */ +static void trx_purge_rseg_get_next_history_log( + ulint* n_pages_handled)/*!< in/out: number of UNDO pages + handled */ +{ + fil_addr_t prev_log_addr; + mtr_t mtr; + + mtr.start(); + + purge_sys.rseg->latch.wr_lock(SRW_LOCK_CALL); + + ut_a(purge_sys.rseg->last_page_no != FIL_NULL); + + purge_sys.tail.trx_no= purge_sys.rseg->last_trx_no() + 1; + purge_sys.tail.undo_no= 0; + purge_sys.next_stored= false; + + if (const buf_block_t* undo_page= + buf_page_get_gen(page_id_t(purge_sys.rseg->space->id, + purge_sys.rseg->last_page_no), + 0, RW_S_LATCH, nullptr, + BUF_GET_POSSIBLY_FREED, &mtr)) + { + const trx_ulogf_t *log_hdr= + undo_page->page.frame + purge_sys.rseg->last_offset(); + /* Increase the purge page count by one for every handled log */ + ++*n_pages_handled; + prev_log_addr= flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE); + prev_log_addr.boffset = static_cast<uint16_t>(prev_log_addr.boffset - + TRX_UNDO_HISTORY_NODE); + } + else + prev_log_addr.page= FIL_NULL; + + const bool empty= prev_log_addr.page == FIL_NULL; + + if (empty) + /* No logs left in the history list */ + purge_sys.rseg->last_page_no= FIL_NULL; + + purge_sys.rseg->latch.wr_unlock(); + mtr.commit(); + + if (empty) + return; + + /* Read the previous log header. 
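It carries the trx_no and the TRX_UNDO_NEEDS_PURGE flag of the + preceding undo log in the history list. 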
*/ + mtr.start(); + + byte needs_purge= 0; + trx_id_t trx_no= 0; + + if (const buf_block_t* undo_page= + buf_page_get_gen(page_id_t(purge_sys.rseg->space->id, prev_log_addr.page), + 0, RW_S_LATCH, nullptr, BUF_GET_POSSIBLY_FREED, &mtr)) + { + const byte *log_hdr= undo_page->page.frame + prev_log_addr.boffset; + + trx_no= mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO); + ut_ad(mach_read_from_2(log_hdr + TRX_UNDO_NEEDS_PURGE) <= 1); + needs_purge= log_hdr[TRX_UNDO_NEEDS_PURGE + 1]; + } + + mtr.commit(); + + if (UNIV_UNLIKELY(!trx_no)) + return; + + purge_sys.rseg->latch.wr_lock(SRW_LOCK_CALL); + purge_sys.rseg->last_page_no= prev_log_addr.page; + purge_sys.rseg->set_last_commit(prev_log_addr.boffset, trx_no); + + if (needs_purge) + purge_sys.rseg->set_needs_purge(); + else + purge_sys.rseg->clear_needs_purge(); + + /* Purge can also produce events, however these are already ordered + in the rollback segment and any user generated event will be greater + than the events that Purge produces. ie. Purge can never produce + events from an empty rollback segment. */ + + mysql_mutex_lock(&purge_sys.pq_mutex); + purge_sys.purge_queue.push(*purge_sys.rseg); + mysql_mutex_unlock(&purge_sys.pq_mutex); + purge_sys.rseg->latch.wr_unlock(); +} + +/** Position the purge sys "iterator" on the undo record to use for purging. */ +static void trx_purge_read_undo_rec() +{ + uint16_t offset; + uint32_t page_no; + ib_uint64_t undo_no; + + purge_sys.hdr_offset = purge_sys.rseg->last_offset(); + page_no = purge_sys.hdr_page_no = purge_sys.rseg->last_page_no; + + if (purge_sys.rseg->needs_purge()) { + mtr_t mtr; + mtr.start(); + const buf_block_t* undo_page; + if (trx_undo_rec_t* undo_rec = trx_undo_get_first_rec( + *purge_sys.rseg->space, purge_sys.hdr_page_no, + purge_sys.hdr_offset, RW_S_LATCH, + undo_page, &mtr, nullptr)) { + + offset = page_offset(undo_rec); + undo_no = trx_undo_rec_get_undo_no(undo_rec); + page_no = undo_page->page.id().page_no(); + } else { + offset = 0; + undo_no = 0; + } + + mtr.commit(); + } else { + offset = 0; + undo_no = 0; + } + + purge_sys.offset = offset; + purge_sys.page_no = page_no; + purge_sys.tail.undo_no = undo_no; + + purge_sys.next_stored = true; +} + +/***********************************************************************//** +Chooses the next undo log to purge and updates the info in purge_sys. This +function is used to initialize purge_sys when the next record to purge is +not known, and also to update the purge system info on the next record when +purge has handled the whole undo log for a transaction. */ +TRANSACTIONAL_TARGET static void trx_purge_choose_next_log() +{ + ut_ad(!purge_sys.next_stored); + + if (purge_sys.rseg_iter.set_next()) { + trx_purge_read_undo_rec(); + } else { + /* There is nothing to do yet. */ + std::this_thread::yield(); + } +} + +/***********************************************************************//** +Gets the next record to purge and updates the info in the purge system. 
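When the current undo log is exhausted, this advances to the next log +in the history list and re-positions the purge iterator. 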
+@return copy of an undo log record +@retval -1 if there is nothing to purge +@retval nullptr on corruption */ +static +trx_undo_rec_t* +trx_purge_get_next_rec( +/*===================*/ + ulint* n_pages_handled,/*!< in/out: number of UNDO pages + handled */ + mem_heap_t* heap) /*!< in: memory heap where copied */ +{ + mtr_t mtr; + + ut_ad(purge_sys.next_stored); + ut_ad(purge_sys.tail.trx_no < purge_sys.low_limit_no()); + + const page_id_t page_id{purge_sys.rseg->space->id, purge_sys.page_no}; + const uint16_t offset = purge_sys.offset; + + if (offset == 0) { + /* It is the dummy undo log record, which means that there is + no need to purge this undo log */ + + trx_purge_rseg_get_next_history_log(n_pages_handled); + + /* Look for the next undo log and record to purge */ + + trx_purge_choose_next_log(); + return reinterpret_cast<trx_undo_rec_t*>(-1); + } + + mtr.start(); + + const buf_block_t* undo_page + = buf_page_get_gen(page_id, 0, RW_S_LATCH, nullptr, + BUF_GET_POSSIBLY_FREED, &mtr); + if (UNIV_UNLIKELY(!undo_page)) { +corrupted: + mtr.commit(); + return nullptr; + } + + const buf_block_t* rec2_page = undo_page; + + const trx_undo_rec_t* rec2 = trx_undo_page_get_next_rec( + undo_page, offset, purge_sys.hdr_page_no, purge_sys.hdr_offset); + + if (rec2 == NULL) { + rec2 = trx_undo_get_next_rec(rec2_page, offset, + purge_sys.hdr_page_no, + purge_sys.hdr_offset, &mtr); + } + + if (rec2 == NULL) { + mtr_commit(&mtr); + + trx_purge_rseg_get_next_history_log(n_pages_handled); + + /* Look for the next undo log and record to purge */ + + trx_purge_choose_next_log(); + + mtr_start(&mtr); + + undo_page = buf_page_get_gen(page_id, 0, RW_S_LATCH, + nullptr, BUF_GET_POSSIBLY_FREED, + &mtr); + if (UNIV_UNLIKELY(!undo_page)) { + goto corrupted; + } + } else { + purge_sys.offset = page_offset(rec2); + purge_sys.page_no = rec2_page->page.id().page_no(); + purge_sys.tail.undo_no = trx_undo_rec_get_undo_no(rec2); + + if (undo_page != rec2_page) { + /* We advance to a new page of the undo log: */ + (*n_pages_handled)++; + } + } + + trx_undo_rec_t* rec_copy = trx_undo_rec_copy(undo_page->page.frame + + offset, heap); + + mtr.commit(); + return rec_copy; +} + +/********************************************************************//** +Fetches the next undo log record from the history list to purge. The +record is copied to the caller-provided memory heap. +@return copy of an undo log record +@retval -1 if the whole undo log can be skipped in purge +@retval nullptr if nothing is left, or on corruption */ +static MY_ATTRIBUTE((warn_unused_result)) +trx_undo_rec_t* +trx_purge_fetch_next_rec( +/*=====================*/ + roll_ptr_t* roll_ptr, /*!< out: roll pointer to undo record */ + ulint* n_pages_handled,/*!< in/out: number of UNDO log pages + handled */ + mem_heap_t* heap) /*!< in: memory heap where copied */ +{ + if (!purge_sys.next_stored) { + trx_purge_choose_next_log(); + + if (!purge_sys.next_stored) { + DBUG_PRINT("ib_purge", + ("no logs left in the history list")); + return nullptr; + } + } + + if (purge_sys.tail.trx_no >= purge_sys.low_limit_no()) { + return nullptr; + } + + /* fprintf(stderr, "Thread %lu purging trx %llu undo record %llu\n", + pthread_self(), iter->trx_no, iter->undo_no); */ + + *roll_ptr = trx_undo_build_roll_ptr( + /* row_purge_record_func() will later set + ROLL_PTR_INSERT_FLAG for TRX_UNDO_INSERT_REC */ + false, + trx_sys.rseg_id(purge_sys.rseg, true), + purge_sys.page_no, purge_sys.offset); + + /* The following call will advance the stored values of the + purge iterator. 
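That is, purge_sys.tail, purge_sys.page_no and purge_sys.offset will + move past the record that is returned. 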
*/ + + return trx_purge_get_next_rec(n_pages_handled, heap); +} + +/** Run a purge batch. +@param n_purge_threads number of purge threads +@return number of undo log pages handled in the batch */ +static +ulint +trx_purge_attach_undo_recs(ulint n_purge_threads) +{ + que_thr_t* thr; + ulint i; + ulint n_pages_handled = 0; + ulint n_thrs = UT_LIST_GET_LEN(purge_sys.query->thrs); + + ut_a(n_purge_threads > 0); + + purge_sys.head = purge_sys.tail; + +#ifdef UNIV_DEBUG + i = 0; + /* Debug code to validate some pre-requisites and reset done flag. */ + for (thr = UT_LIST_GET_FIRST(purge_sys.query->thrs); + thr != NULL && i < n_purge_threads; + thr = UT_LIST_GET_NEXT(thrs, thr), ++i) { + + purge_node_t* node; + + /* Get the purge node. */ + node = (purge_node_t*) thr->child; + + ut_ad(que_node_get_type(node) == QUE_NODE_PURGE); + ut_ad(node->undo_recs.empty()); + ut_ad(!node->in_progress); + ut_d(node->in_progress = true); + } + + /* There should never be fewer nodes than threads, the inverse + however is allowed because we only use purge threads as needed. */ + ut_ad(i == n_purge_threads); +#endif + + /* Fetch and parse the UNDO records. The UNDO records are added + to a per purge node vector. */ + thr = UT_LIST_GET_FIRST(purge_sys.query->thrs); + ut_a(n_thrs > 0 && thr != NULL); + + ut_ad(purge_sys.head <= purge_sys.tail); + + i = 0; + + std::unordered_map<table_id_t, purge_node_t*> table_id_map; + mem_heap_empty(purge_sys.heap); + + while (UNIV_LIKELY(srv_undo_sources) || !srv_fast_shutdown) { + purge_node_t* node; + trx_purge_rec_t purge_rec; + + /* Get the purge node. */ + node = (purge_node_t*) thr->child; + ut_a(que_node_get_type(node) == QUE_NODE_PURGE); + + /* Track the max {trx_id, undo_no} for truncating the + UNDO logs once we have purged the records. */ + + if (purge_sys.head <= purge_sys.tail) { + purge_sys.head = purge_sys.tail; + } + + /* Fetch the next record, and advance the purge_sys.tail. */ + purge_rec.undo_rec = trx_purge_fetch_next_rec( + &purge_rec.roll_ptr, &n_pages_handled, + purge_sys.heap); + + if (purge_rec.undo_rec == NULL) { + break; + } else if (purge_rec.undo_rec + == reinterpret_cast<trx_undo_rec_t*>(-1)) { + continue; + } + + table_id_t table_id = trx_undo_rec_get_table_id( + purge_rec.undo_rec); + + purge_node_t *& table_node = table_id_map[table_id]; + + if (table_node) { + node = table_node; + } else { + thr = UT_LIST_GET_NEXT(thrs, thr); + + if (!(++i % n_purge_threads)) { + thr = UT_LIST_GET_FIRST( + purge_sys.query->thrs); + } + + ut_a(thr != NULL); + table_node = node; + } + + node->undo_recs.push(purge_rec); + + if (n_pages_handled >= srv_purge_batch_size) { + break; + } + } + + ut_ad(purge_sys.head <= purge_sys.tail); + + return(n_pages_handled); +} + +/*******************************************************************//** +Calculate the DML delay required. +@return delay in microseconds or ULINT_MAX */ +static +ulint +trx_purge_dml_delay(void) +/*=====================*/ +{ + /* Determine how much data manipulation language (DML) statements + need to be delayed in order to reduce the lagging of the purge + thread. */ + ulint delay = 0; /* in microseconds; default: no delay */ + + /* If purge lag is set then calculate the new DML delay. */ + + if (srv_max_purge_lag > 0) { + double ratio = static_cast<double>(trx_sys.history_size()) / + static_cast<double>(srv_max_purge_lag); + + if (ratio > 1.0) { + /* If the history list length exceeds the + srv_max_purge_lag, the data manipulation + statements are delayed by at least 5000 + microseconds. 
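The delay grows linearly with the backlog: + delay = (ratio - 0.5) * 10000 microseconds, where + ratio = history_size / srv_max_purge_lag; it is further capped by + srv_max_purge_lag_delay below. 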
*/ + delay = (ulint) ((ratio - .5) * 10000); + } + + if (delay > srv_max_purge_lag_delay) { + delay = srv_max_purge_lag_delay; + } + + MONITOR_SET(MONITOR_DML_PURGE_DELAY, delay); + } + + return(delay); +} + +extern tpool::waitable_task purge_worker_task; + +/** Wait for pending purge jobs to complete. */ +static void trx_purge_wait_for_workers_to_complete() +{ + const bool notify_wait{purge_worker_task.is_running()}; + + if (notify_wait) + tpool::tpool_wait_begin(); + + purge_worker_task.wait(); + + if (notify_wait) + tpool::tpool_wait_end(); + + /* There should be no outstanding tasks as long + as the worker threads are active. */ + ut_ad(srv_get_task_queue_length() == 0); +} + +/** Update end_view at the end of a purge batch. */ +TRANSACTIONAL_INLINE void purge_sys_t::clone_end_view() +{ + /* This is invoked only by the purge coordinator, + which is the only thread that can modify our inputs head, tail, view. + Therefore, we only need to protect end_view from concurrent reads. */ + + /* Limit the end_view similar to what trx_purge_truncate_history() does. */ + const trx_id_t trx_no= head.trx_no ? head.trx_no : tail.trx_no; +#ifdef SUX_LOCK_GENERIC + end_latch.wr_lock(); +#else + transactional_lock_guard<srw_spin_lock_low> g(end_latch); +#endif + end_view= view; + end_view.clamp_low_limit_id(trx_no); +#ifdef SUX_LOCK_GENERIC + end_latch.wr_unlock(); +#endif +} + +/** +Run a purge batch. +@param n_tasks number of purge tasks to submit to the queue +@param truncate whether to truncate the history at the end of the batch +@return number of undo log pages handled in the batch */ +TRANSACTIONAL_TARGET ulint trx_purge(ulint n_tasks, bool truncate) +{ + que_thr_t* thr = NULL; + ulint n_pages_handled; + + ut_ad(n_tasks > 0); + + srv_dml_needed_delay = trx_purge_dml_delay(); + + purge_sys.clone_oldest_view(); + +#ifdef UNIV_DEBUG + if (srv_purge_view_update_only_debug) { + return(0); + } +#endif /* UNIV_DEBUG */ + + /* Fetch the UNDO recs that need to be purged. */ + n_pages_handled = trx_purge_attach_undo_recs(n_tasks); + + /* Submit tasks to workers queue if using multi-threaded purge. */ + for (ulint i = n_tasks; --i; ) { + thr = que_fork_scheduler_round_robin(purge_sys.query, thr); + ut_a(thr); + srv_que_task_enqueue_low(thr); + srv_thread_pool->submit_task(&purge_worker_task); + } + + thr = que_fork_scheduler_round_robin(purge_sys.query, thr); + + que_run_threads(thr); + + trx_purge_wait_for_workers_to_complete(); + + purge_sys.clone_end_view(); + + if (truncate) { + trx_purge_truncate_history(); + } + + MONITOR_INC_VALUE(MONITOR_PURGE_INVOKED, 1); + MONITOR_INC_VALUE(MONITOR_PURGE_N_PAGE_HANDLED, n_pages_handled); + + return(n_pages_handled); +}