/***************************************************************************** Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2017, 2021, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA *****************************************************************************/ /**************************************************//** @file include/trx0sys.h Transaction system Created 3/26/1996 Heikki Tuuri *******************************************************/ #ifndef trx0sys_h #define trx0sys_h #include "buf0buf.h" #include "fil0fil.h" #include "trx0types.h" #include "mem0mem.h" #include "mtr0mtr.h" #include "ut0byte.h" #include "ut0lst.h" #include "read0types.h" #include "page0types.h" #include "ut0mutex.h" #include "trx0trx.h" #ifdef WITH_WSREP #include "trx0xa.h" #endif /* WITH_WSREP */ #include "ilist.h" /** Checks if a page address is the trx sys header page. @param[in] page_id page id @return true if trx sys header page */ inline bool trx_sys_hdr_page(const page_id_t page_id) { return page_id == page_id_t(TRX_SYS_SPACE, TRX_SYS_PAGE_NO); } /*****************************************************************//** Creates and initializes the transaction system at the database creation. */ void trx_sys_create_sys_pages(void); /*==========================*/ /** Find an available rollback segment. @param[in] sys_header @return an unallocated rollback segment slot in the TRX_SYS header @retval ULINT_UNDEFINED if not found */ ulint trx_sys_rseg_find_free(const buf_block_t* sys_header); /** Request the TRX_SYS page. @param[in] rw whether to lock the page for writing @return the TRX_SYS page @retval NULL if the page cannot be read */ inline buf_block_t *trx_sysf_get(mtr_t* mtr, bool rw= true) { buf_block_t* block = buf_page_get(page_id_t(TRX_SYS_SPACE, TRX_SYS_PAGE_NO), 0, rw ? RW_X_LATCH : RW_S_LATCH, mtr); ut_d(if (block) buf_block_dbg_add_level(block, SYNC_TRX_SYS_HEADER);) return block; } #ifdef UNIV_DEBUG /* Flag to control TRX_RSEG_N_SLOTS behavior debugging. */ extern uint trx_rseg_n_slots_debug; #endif /** Write DB_TRX_ID. @param[out] db_trx_id the DB_TRX_ID field to be written to @param[in] id transaction ID */ UNIV_INLINE void trx_write_trx_id(byte* db_trx_id, trx_id_t id) { compile_time_assert(DATA_TRX_ID_LEN == 6); mach_write_to_6(db_trx_id, id); } /** Read a transaction identifier. @return id */ inline trx_id_t trx_read_trx_id(const byte* ptr) { compile_time_assert(DATA_TRX_ID_LEN == 6); return(mach_read_from_6(ptr)); } #ifdef UNIV_DEBUG /** Check that the DB_TRX_ID in a record is valid. @param[in] db_trx_id the DB_TRX_ID column to validate @param[in] trx_id the id of the ALTER TABLE transaction */ inline bool trx_id_check(const void* db_trx_id, trx_id_t trx_id) { trx_id_t id = trx_read_trx_id(static_cast(db_trx_id)); ut_ad(id == 0 || id > trx_id); return true; } #endif /*****************************************************************//** Updates the offset information about the end of the MySQL binlog entry which corresponds to the transaction just being committed. In a MySQL replication slave updates the latest master binlog position up to which replication has proceeded. */ void trx_sys_update_mysql_binlog_offset( /*===============================*/ const char* file_name,/*!< in: MySQL log file name */ int64_t offset, /*!< in: position in that log file */ buf_block_t* sys_header, /*!< in,out: trx sys header */ mtr_t* mtr); /*!< in,out: mini-transaction */ /** Display the MySQL binlog offset info if it is present in the trx system header. */ void trx_sys_print_mysql_binlog_offset(); /** Create the rollback segments. @return whether the creation succeeded */ bool trx_sys_create_rsegs(); /** The automatically created system rollback segment has this id */ #define TRX_SYS_SYSTEM_RSEG_ID 0 /** The offset of the transaction system header on the page */ #define TRX_SYS FSEG_PAGE_DATA /** Transaction system header */ /*------------------------------------------------------------- @{ */ /** In old versions of InnoDB, this persisted the value of trx_sys.get_max_trx_id(). Starting with MariaDB 10.3.5, the field TRX_RSEG_MAX_TRX_ID in rollback segment header pages and the fields TRX_UNDO_TRX_ID, TRX_UNDO_TRX_NO in undo log pages are used instead. The field only exists for the purpose of upgrading from older MySQL or MariaDB versions. */ #define TRX_SYS_TRX_ID_STORE 0 #define TRX_SYS_FSEG_HEADER 8 /*!< segment header for the tablespace segment the trx system is created into */ #define TRX_SYS_RSEGS (8 + FSEG_HEADER_SIZE) /*!< the start of the array of rollback segment specification slots */ /*------------------------------------------------------------- @} */ /** The number of rollback segments; rollback segment id must fit in the 7 bits reserved for it in DB_ROLL_PTR. */ #define TRX_SYS_N_RSEGS 128 /** Maximum number of undo tablespaces (not counting the system tablespace) */ #define TRX_SYS_MAX_UNDO_SPACES (TRX_SYS_N_RSEGS - 1) /* Rollback segment specification slot offsets */ /** the tablespace ID of an undo log header; starting with MySQL/InnoDB 5.1.7, this is FIL_NULL if the slot is unused */ #define TRX_SYS_RSEG_SPACE 0 /** the page number of an undo log header, or FIL_NULL if unused */ #define TRX_SYS_RSEG_PAGE_NO 4 /** Size of a rollback segment specification slot */ #define TRX_SYS_RSEG_SLOT_SIZE 8 /** Read the tablespace ID of a rollback segment slot. @param[in] sys_header TRX_SYS page @param[in] rseg_id rollback segment identifier @return undo tablespace id */ inline uint32_t trx_sysf_rseg_get_space(const buf_block_t* sys_header, ulint rseg_id) { ut_ad(rseg_id < TRX_SYS_N_RSEGS); return mach_read_from_4(TRX_SYS + TRX_SYS_RSEGS + TRX_SYS_RSEG_SPACE + rseg_id * TRX_SYS_RSEG_SLOT_SIZE + sys_header->frame); } /** Read the page number of a rollback segment slot. @param[in] sys_header TRX_SYS page @param[in] rseg_id rollback segment identifier @return undo page number */ inline uint32_t trx_sysf_rseg_get_page_no(const buf_block_t *sys_header, ulint rseg_id) { ut_ad(rseg_id < TRX_SYS_N_RSEGS); return mach_read_from_4(TRX_SYS + TRX_SYS_RSEGS + TRX_SYS_RSEG_PAGE_NO + rseg_id * TRX_SYS_RSEG_SLOT_SIZE + sys_header->frame); } /** Maximum length of MySQL binlog file name, in bytes. (Used before MariaDB 10.3.5.) */ #define TRX_SYS_MYSQL_LOG_NAME_LEN 512 /** Contents of TRX_SYS_MYSQL_LOG_MAGIC_N_FLD */ #define TRX_SYS_MYSQL_LOG_MAGIC_N 873422344 #if UNIV_PAGE_SIZE_MIN < 4096 # error "UNIV_PAGE_SIZE_MIN < 4096" #endif /** The offset of the MySQL binlog offset info in the trx system header */ #define TRX_SYS_MYSQL_LOG_INFO (srv_page_size - 1000) #define TRX_SYS_MYSQL_LOG_MAGIC_N_FLD 0 /*!< magic number which is TRX_SYS_MYSQL_LOG_MAGIC_N if we have valid data in the MySQL binlog info */ #define TRX_SYS_MYSQL_LOG_OFFSET 4 /*!< the 64-bit offset within that file */ #define TRX_SYS_MYSQL_LOG_NAME 12 /*!< MySQL log file name */ /** Memory map TRX_SYS_PAGE_NO = 5 when srv_page_size = 4096 0...37 FIL_HEADER 38...45 TRX_SYS_TRX_ID_STORE 46...55 TRX_SYS_FSEG_HEADER (FSEG_HEADER_SIZE == 10) 56 TRX_SYS_RSEGS 56...59 TRX_SYS_RSEG_SPACE for slot 0 60...63 TRX_SYS_RSEG_PAGE_NO for slot 0 64...67 TRX_SYS_RSEG_SPACE for slot 1 68...71 TRX_SYS_RSEG_PAGE_NO for slot 1 .... 594..597 TRX_SYS_RSEG_SPACE for slot 72 598..601 TRX_SYS_RSEG_PAGE_NO for slot 72 ... ...1063 TRX_SYS_RSEG_PAGE_NO for slot 126 (srv_page_size-3500 WSREP ::: FAIL would overwrite undo tablespace space_id, page_no pairs :::) 596 TRX_SYS_WSREP_XID_INFO TRX_SYS_WSREP_XID_MAGIC_N_FLD 600 TRX_SYS_WSREP_XID_FORMAT 604 TRX_SYS_WSREP_XID_GTRID_LEN 608 TRX_SYS_WSREP_XID_BQUAL_LEN 612 TRX_SYS_WSREP_XID_DATA (len = 128) 739 TRX_SYS_WSREP_XID_DATA_END FIXED WSREP XID info offsets for 4k page size 10.0.32-galera (srv_page_size-2500) 1596 TRX_SYS_WSREP_XID_INFO TRX_SYS_WSREP_XID_MAGIC_N_FLD 1600 TRX_SYS_WSREP_XID_FORMAT 1604 TRX_SYS_WSREP_XID_GTRID_LEN 1608 TRX_SYS_WSREP_XID_BQUAL_LEN 1612 TRX_SYS_WSREP_XID_DATA (len = 128) 1739 TRX_SYS_WSREP_XID_DATA_END (srv_page_size - 2000 MYSQL MASTER LOG) 2096 TRX_SYS_MYSQL_MASTER_LOG_INFO TRX_SYS_MYSQL_LOG_MAGIC_N_FLD 2100 TRX_SYS_MYSQL_LOG_OFFSET_HIGH 2104 TRX_SYS_MYSQL_LOG_OFFSET_LOW 2108 TRX_SYS_MYSQL_LOG_NAME (srv_page_size - 1000 MYSQL LOG) 3096 TRX_SYS_MYSQL_LOG_INFO TRX_SYS_MYSQL_LOG_MAGIC_N_FLD 3100 TRX_SYS_MYSQL_LOG_OFFSET_HIGH 3104 TRX_SYS_MYSQL_LOG_OFFSET_LOW 3108 TRX_SYS_MYSQL_LOG_NAME (srv_page_size - 200 DOUBLEWRITE) 3896 TRX_SYS_DOUBLEWRITE TRX_SYS_DOUBLEWRITE_FSEG 3906 TRX_SYS_DOUBLEWRITE_MAGIC 3910 TRX_SYS_DOUBLEWRITE_BLOCK1 3914 TRX_SYS_DOUBLEWRITE_BLOCK2 3918 TRX_SYS_DOUBLEWRITE_REPEAT 3930 TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N (srv_page_size - 8, TAILER) 4088..4096 FIL_TAILER */ #ifdef WITH_WSREP /** The offset to WSREP XID headers (used before MariaDB 10.3.5) */ #define TRX_SYS_WSREP_XID_INFO std::max(srv_page_size - 3500, 1596UL) #define TRX_SYS_WSREP_XID_MAGIC_N_FLD 0 #define TRX_SYS_WSREP_XID_MAGIC_N 0x77737265 /** XID field: formatID, gtrid_len, bqual_len, xid_data */ #define TRX_SYS_WSREP_XID_LEN (4 + 4 + 4 + XIDDATASIZE) #define TRX_SYS_WSREP_XID_FORMAT 4 #define TRX_SYS_WSREP_XID_GTRID_LEN 8 #define TRX_SYS_WSREP_XID_BQUAL_LEN 12 #define TRX_SYS_WSREP_XID_DATA 16 #endif /* WITH_WSREP*/ /** Doublewrite buffer */ /* @{ */ /** The offset of the doublewrite buffer header on the trx system header page */ #define TRX_SYS_DOUBLEWRITE (srv_page_size - 200) /*-------------------------------------------------------------*/ #define TRX_SYS_DOUBLEWRITE_FSEG 0 /*!< fseg header of the fseg containing the doublewrite buffer */ #define TRX_SYS_DOUBLEWRITE_MAGIC FSEG_HEADER_SIZE /*!< 4-byte magic number which shows if we already have created the doublewrite buffer */ #define TRX_SYS_DOUBLEWRITE_BLOCK1 (4 + FSEG_HEADER_SIZE) /*!< page number of the first page in the first sequence of 64 (= FSP_EXTENT_SIZE) consecutive pages in the doublewrite buffer */ #define TRX_SYS_DOUBLEWRITE_BLOCK2 (8 + FSEG_HEADER_SIZE) /*!< page number of the first page in the second sequence of 64 consecutive pages in the doublewrite buffer */ #define TRX_SYS_DOUBLEWRITE_REPEAT 12 /*!< we repeat TRX_SYS_DOUBLEWRITE_MAGIC, TRX_SYS_DOUBLEWRITE_BLOCK1, TRX_SYS_DOUBLEWRITE_BLOCK2 so that if the trx sys header is half-written to disk, we still may be able to recover the information */ /** If this is not yet set to TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N, we must reset the doublewrite buffer, because starting from 4.1.x the space id of a data page is stored into FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID. */ #define TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED (24 + FSEG_HEADER_SIZE) /*-------------------------------------------------------------*/ /** Contents of TRX_SYS_DOUBLEWRITE_MAGIC */ constexpr uint32_t TRX_SYS_DOUBLEWRITE_MAGIC_N= 536853855; /** Contents of TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED */ constexpr uint32_t TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N= 1783657386; /* @} */ trx_t* current_trx(); struct rw_trx_hash_element_t { rw_trx_hash_element_t(): trx(0) { mutex_create(LATCH_ID_RW_TRX_HASH_ELEMENT, &mutex); } ~rw_trx_hash_element_t() { mutex_free(&mutex); } trx_id_t id; /* lf_hash_init() relies on this to be first in the struct */ /** Transaction serialization number. Assigned shortly before the transaction is moved to COMMITTED_IN_MEMORY state. Initially set to TRX_ID_MAX. */ Atomic_counter no; trx_t *trx; ib_mutex_t mutex; }; /** Wrapper around LF_HASH to store set of in memory read-write transactions. */ class rw_trx_hash_t { LF_HASH hash; template using walk_action= my_bool(rw_trx_hash_element_t *element, T *action); /** Constructor callback for lock-free allocator. Object is just allocated and is not yet accessible via rw_trx_hash by concurrent threads. Object can be reused multiple times before it is freed. Every time object is being reused initializer() callback is called. */ static void rw_trx_hash_constructor(uchar *arg) { new(arg + LF_HASH_OVERHEAD) rw_trx_hash_element_t(); } /** Destructor callback for lock-free allocator. Object is about to be freed and is not accessible via rw_trx_hash by concurrent threads. */ static void rw_trx_hash_destructor(uchar *arg) { reinterpret_cast (arg + LF_HASH_OVERHEAD)->~rw_trx_hash_element_t(); } /** Destructor callback for lock-free allocator. This destructor is used at shutdown. It frees remaining transaction objects. XA PREPARED transactions may remain if they haven't been committed or rolled back. ACTIVE transactions may remain if startup was interrupted or server is running in read-only mode or for certain srv_force_recovery levels. */ static void rw_trx_hash_shutdown_destructor(uchar *arg) { rw_trx_hash_element_t *element= reinterpret_cast(arg + LF_HASH_OVERHEAD); if (trx_t *trx= element->trx) { ut_ad(trx_state_eq(trx, TRX_STATE_PREPARED) || trx_state_eq(trx, TRX_STATE_PREPARED_RECOVERED) || (trx_state_eq(trx, TRX_STATE_ACTIVE) && (!srv_was_started || srv_read_only_mode || srv_force_recovery >= SRV_FORCE_NO_TRX_UNDO))); trx_free_at_shutdown(trx); } element->~rw_trx_hash_element_t(); } /** Initializer callback for lock-free hash. Object is not yet accessible via rw_trx_hash by concurrent threads, but is about to become such. Object id can be changed only by this callback and remains the same until all pins to this object are released. Object trx can be changed to 0 by erase() under object mutex protection, which indicates it is about to be removed from lock-free hash and become not accessible by concurrent threads. */ static void rw_trx_hash_initializer(LF_HASH *, rw_trx_hash_element_t *element, trx_t *trx) { ut_ad(element->trx == 0); element->trx= trx; element->id= trx->id; element->no= TRX_ID_MAX; trx->rw_trx_hash_element= element; } /** Gets LF_HASH pins. Pins are used to protect object from being destroyed or reused. They are normally stored in trx object for quick access. If caller doesn't have trx available, we try to get it using currnet_trx(). If caller doesn't have trx at all, temporary pins are allocated. */ LF_PINS *get_pins(trx_t *trx) { if (!trx->rw_trx_hash_pins) { trx->rw_trx_hash_pins= lf_hash_get_pins(&hash); ut_a(trx->rw_trx_hash_pins); } return trx->rw_trx_hash_pins; } template struct eliminate_duplicates_arg { trx_ids_t ids; walk_action *action; T *argument; eliminate_duplicates_arg(size_t size, walk_action *act, T *arg): action(act), argument(arg) { ids.reserve(size); } }; template static my_bool eliminate_duplicates(rw_trx_hash_element_t *element, eliminate_duplicates_arg *arg) { for (trx_ids_t::iterator it= arg->ids.begin(); it != arg->ids.end(); it++) { if (*it == element->id) return 0; } arg->ids.push_back(element->id); return arg->action(element, arg->argument); } #ifdef UNIV_DEBUG static void validate_element(trx_t *trx) { ut_ad(!trx->read_only || !trx->rsegs.m_redo.rseg); ut_ad(!trx->is_autocommit_non_locking()); /* trx->state can be anything except TRX_STATE_NOT_STARTED */ mutex_enter(&trx->mutex); ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE) || trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY) || trx_state_eq(trx, TRX_STATE_PREPARED_RECOVERED) || trx_state_eq(trx, TRX_STATE_PREPARED)); mutex_exit(&trx->mutex); } template struct debug_iterator_arg { walk_action *action; T *argument; }; template static my_bool debug_iterator(rw_trx_hash_element_t *element, debug_iterator_arg *arg) { mutex_enter(&element->mutex); if (element->trx) validate_element(element->trx); mutex_exit(&element->mutex); return arg->action(element, arg->argument); } #endif public: void init() { lf_hash_init(&hash, sizeof(rw_trx_hash_element_t), LF_HASH_UNIQUE, 0, sizeof(trx_id_t), 0, &my_charset_bin); hash.alloc.constructor= rw_trx_hash_constructor; hash.alloc.destructor= rw_trx_hash_destructor; hash.initializer= reinterpret_cast(rw_trx_hash_initializer); } void destroy() { hash.alloc.destructor= rw_trx_hash_shutdown_destructor; lf_hash_destroy(&hash); } /** Releases LF_HASH pins. Must be called by thread that owns trx_t object when the latter is being "detached" from thread (e.g. released to the pool by trx_t::free()). Can be called earlier if thread is expected not to use rw_trx_hash. Since pins are not allowed to be transferred to another thread, initialisation thread calls this for recovered transactions. */ void put_pins(trx_t *trx) { if (trx->rw_trx_hash_pins) { lf_hash_put_pins(trx->rw_trx_hash_pins); trx->rw_trx_hash_pins= 0; } } /** Finds trx object in lock-free hash with given id. Only ACTIVE or PREPARED trx objects may participate in hash. Nevertheless the transaction may get committed before this method returns. With do_ref_count == false the caller may dereference returned trx pointer only if lock_sys.mutex was acquired before calling find(). With do_ref_count == true caller may dereference trx even if it is not holding lock_sys.mutex. Caller is responsible for calling trx->release_reference() when it is done playing with trx. Ideally this method should get caller rw_trx_hash_pins along with trx object as a parameter, similar to insert() and erase(). However most callers lose trx early in their call chains and it is not that easy to pass them through. So we take more expensive approach: get trx through current_thd()->ha_data. Some threads don't have trx attached to THD, and at least server initialisation thread, fts_optimize_thread, srv_master_thread, dict_stats_thread, srv_monitor_thread, btr_defragment_thread don't even have THD at all. For such cases we allocate pins only for duration of search and free them immediately. This has negative performance impact and should be fixed eventually (by passing caller_trx as a parameter). Still stream of DML is more or less Ok. @return @retval 0 not found @retval pointer to trx */ trx_t *find(trx_t *caller_trx, trx_id_t trx_id, bool do_ref_count) { /* In MariaDB 10.3, purge will reset DB_TRX_ID to 0 when the history is lost. Read/write transactions will always have a nonzero trx_t::id; there the value 0 is reserved for transactions that did not write or lock anything yet. The caller should already have handled trx_id==0 specially. */ ut_ad(trx_id); ut_ad(!caller_trx || caller_trx->id != trx_id || !do_ref_count); trx_t *trx= 0; LF_PINS *pins= caller_trx ? get_pins(caller_trx) : lf_hash_get_pins(&hash); ut_a(pins); rw_trx_hash_element_t *element= reinterpret_cast (lf_hash_search(&hash, pins, reinterpret_cast(&trx_id), sizeof(trx_id_t))); if (element) { mutex_enter(&element->mutex); lf_hash_search_unpin(pins); if ((trx= element->trx)) { DBUG_ASSERT(trx_id == trx->id); ut_d(validate_element(trx)); if (do_ref_count) { /* We have an early state check here to avoid committer starvation in a wait loop for transaction references, when there's a stream of trx_sys.find() calls from other threads. The trx->state may change to COMMITTED after trx->mutex is released, and it will have to be rechecked by the caller after reacquiring the mutex. */ trx_mutex_enter(trx); const trx_state_t state= trx->state; trx_mutex_exit(trx); if (state == TRX_STATE_COMMITTED_IN_MEMORY) trx= NULL; else trx->reference(); } } mutex_exit(&element->mutex); } if (!caller_trx) lf_hash_put_pins(pins); return trx; } /** Inserts trx to lock-free hash. Object becomes accessible via rw_trx_hash. */ void insert(trx_t *trx) { ut_d(validate_element(trx)); int res= lf_hash_insert(&hash, get_pins(trx), reinterpret_cast(trx)); ut_a(res == 0); } /** Removes trx from lock-free hash. Object becomes not accessible via rw_trx_hash. But it still can be pinned by concurrent find(), which is supposed to release it immediately after it sees object trx is 0. */ void erase(trx_t *trx) { ut_d(validate_element(trx)); mutex_enter(&trx->rw_trx_hash_element->mutex); trx->rw_trx_hash_element->trx= 0; mutex_exit(&trx->rw_trx_hash_element->mutex); int res= lf_hash_delete(&hash, get_pins(trx), reinterpret_cast(&trx->id), sizeof(trx_id_t)); ut_a(res == 0); } /** Returns the number of elements in the hash. The number is exact only if hash is protected against concurrent modifications (e.g. single threaded startup or hash is protected by some mutex). Otherwise the number may be used as a hint only, because it may change even before this method returns. */ uint32_t size() { return uint32_t(lf_hash_size(&hash)); } /** Iterates the hash. @param caller_trx used to get/set pins @param action called for every element in hash @param argument opque argument passed to action May return the same element multiple times if hash is under contention. If caller doesn't like to see the same transaction multiple times, it has to call iterate_no_dups() instead. May return element with committed transaction. If caller doesn't like to see committed transactions, it has to skip those under element mutex: mutex_enter(&element->mutex); if (trx_t trx= element->trx) { // trx is protected against commit in this branch } mutex_exit(&element->mutex); May miss concurrently inserted transactions. @return @retval 0 iteration completed successfully @retval 1 iteration was interrupted (action returned 1) */ template int iterate(trx_t *caller_trx, walk_action *action, T *argument= nullptr) { LF_PINS *pins= caller_trx ? get_pins(caller_trx) : lf_hash_get_pins(&hash); ut_a(pins); #ifdef UNIV_DEBUG debug_iterator_arg debug_arg= { action, argument }; action= reinterpret_cast(debug_iterator); argument= reinterpret_cast(&debug_arg); #endif int res= lf_hash_iterate(&hash, pins, reinterpret_cast(action), const_cast(static_cast (argument))); if (!caller_trx) lf_hash_put_pins(pins); return res; } template int iterate(walk_action *action, T *argument= nullptr) { return iterate(current_trx(), action, argument); } /** Iterates the hash and eliminates duplicate elements. @sa iterate() */ template int iterate_no_dups(trx_t *caller_trx, walk_action *action, T *argument= nullptr) { eliminate_duplicates_arg arg(size() + 32, action, argument); return iterate(caller_trx, eliminate_duplicates, &arg); } template int iterate_no_dups(walk_action *action, T *argument= nullptr) { return iterate_no_dups(current_trx(), action, argument); } }; class thread_safe_trx_ilist_t { public: void create() { mutex_create(LATCH_ID_TRX_SYS, &mutex); } void close() { mutex_free(&mutex); } bool empty() const { mutex_enter(&mutex); auto result= trx_list.empty(); mutex_exit(&mutex); return result; } void push_front(trx_t &trx) { mutex_enter(&mutex); trx_list.push_front(trx); mutex_exit(&mutex); } void remove(trx_t &trx) { mutex_enter(&mutex); trx_list.remove(trx); mutex_exit(&mutex); } template void for_each(Callable &&callback) const { mutex_enter(&mutex); for (const auto &trx : trx_list) callback(trx); mutex_exit(&mutex); } template void for_each(Callable &&callback) { mutex_enter(&mutex); for (auto &trx : trx_list) callback(trx); mutex_exit(&mutex); } void freeze() const { mutex_enter(&mutex); } void unfreeze() const { mutex_exit(&mutex); } private: alignas(CACHE_LINE_SIZE) mutable TrxSysMutex mutex; alignas(CACHE_LINE_SIZE) ilist trx_list; }; /** The transaction system central memory data structure. */ class trx_sys_t { /** The smallest number not yet assigned as a transaction id or transaction number. Accessed and updated with atomic operations. */ MY_ALIGNED(CACHE_LINE_SIZE) Atomic_counter m_max_trx_id; /** Solves race conditions between register_rw() and snapshot_ids() as well as race condition between assign_new_trx_no() and snapshot_ids(). @sa register_rw() @sa assign_new_trx_no() @sa snapshot_ids() */ MY_ALIGNED(CACHE_LINE_SIZE) std::atomic m_rw_trx_hash_version; bool m_initialised; public: /** TRX_RSEG_HISTORY list length (number of committed transactions to purge) */ MY_ALIGNED(CACHE_LINE_SIZE) Atomic_counter rseg_history_len; /** List of all transactions. */ thread_safe_trx_ilist_t trx_list; MY_ALIGNED(CACHE_LINE_SIZE) /** Temporary rollback segments */ trx_rseg_t* temp_rsegs[TRX_SYS_N_RSEGS]; MY_ALIGNED(CACHE_LINE_SIZE) trx_rseg_t* rseg_array[TRX_SYS_N_RSEGS]; /*!< Pointer array to rollback segments; NULL if slot not in use; created and destroyed in single-threaded mode; not protected by any mutex, because it is read-only during multi-threaded operation */ /** Lock-free hash of in memory read-write transactions. Works faster when it is on it's own cache line (tested). */ MY_ALIGNED(CACHE_LINE_SIZE) rw_trx_hash_t rw_trx_hash; #ifdef WITH_WSREP /** Latest recovered XID during startup */ XID recovered_wsrep_xid; #endif /** Latest recovered binlog offset */ uint64_t recovered_binlog_offset; /** Latest recovered binlog file name */ char recovered_binlog_filename[TRX_SYS_MYSQL_LOG_NAME_LEN]; /** FIL_PAGE_LSN of the page with the latest recovered binlog metadata */ lsn_t recovered_binlog_lsn; /** Constructor. Some members may require late initialisation, thus we just mark object as uninitialised. Real initialisation happens in create(). */ trx_sys_t(): m_initialised(false) {} /** Returns the minimum trx id in rw trx list. This is the smallest id for which the trx can possibly be active. (But, you must look at the trx->state to find out if the minimum trx id transaction itself is active, or already committed.) @return the minimum trx id, or m_max_trx_id if the trx list is empty */ trx_id_t get_min_trx_id() { trx_id_t id= get_max_trx_id(); rw_trx_hash.iterate(get_min_trx_id_callback, &id); return id; } /** Determines the maximum transaction id. @return maximum currently allocated trx id; will be stale after the next call to trx_sys.get_new_trx_id() */ trx_id_t get_max_trx_id() { return m_max_trx_id; } /** Allocates a new transaction id. @return new, allocated trx id */ trx_id_t get_new_trx_id() { trx_id_t id= get_new_trx_id_no_refresh(); refresh_rw_trx_hash_version(); return id; } /** Allocates and assigns new transaction serialisation number. There's a gap between m_max_trx_id increment and transaction serialisation number becoming visible through rw_trx_hash. While we're in this gap concurrent thread may come and do MVCC snapshot without seeing allocated but not yet assigned serialisation number. Then at some point purge thread may clone this view. As a result it won't see newly allocated serialisation number and may remove "unnecessary" history data of this transaction from rollback segments. m_rw_trx_hash_version is intended to solve this problem. MVCC snapshot has to wait until m_max_trx_id == m_rw_trx_hash_version, which effectively means that all transaction serialisation numbers up to m_max_trx_id are available through rw_trx_hash. We rely on refresh_rw_trx_hash_version() to issue RELEASE memory barrier so that m_rw_trx_hash_version increment happens after trx->rw_trx_hash_element->no becomes visible through rw_trx_hash. @param trx transaction */ void assign_new_trx_no(trx_t *trx) { trx->rw_trx_hash_element->no= get_new_trx_id_no_refresh(); refresh_rw_trx_hash_version(); } /** Takes MVCC snapshot. To reduce malloc probablility we reserve rw_trx_hash.size() + 32 elements in ids. For details about get_rw_trx_hash_version() != get_max_trx_id() spin @sa register_rw() and @sa assign_new_trx_no(). We rely on get_rw_trx_hash_version() to issue ACQUIRE memory barrier so that loading of m_rw_trx_hash_version happens before accessing rw_trx_hash. To optimise snapshot creation rw_trx_hash.iterate() is being used instead of rw_trx_hash.iterate_no_dups(). It means that some transaction identifiers may appear multiple times in ids. @param[in,out] caller_trx used to get access to rw_trx_hash_pins @param[out] ids array to store registered transaction identifiers @param[out] max_trx_id variable to store m_max_trx_id value @param[out] mix_trx_no variable to store min(no) value */ void snapshot_ids(trx_t *caller_trx, trx_ids_t *ids, trx_id_t *max_trx_id, trx_id_t *min_trx_no) { snapshot_ids_arg arg(ids); while ((arg.m_id= get_rw_trx_hash_version()) != get_max_trx_id()) ut_delay(1); arg.m_no= arg.m_id; ids->clear(); ids->reserve(rw_trx_hash.size() + 32); rw_trx_hash.iterate(caller_trx, copy_one_id, &arg); *max_trx_id= arg.m_id; *min_trx_no= arg.m_no; } /** Initialiser for m_max_trx_id and m_rw_trx_hash_version. */ void init_max_trx_id(trx_id_t value) { m_max_trx_id= value; m_rw_trx_hash_version.store(value, std::memory_order_relaxed); } bool is_initialised() { return m_initialised; } /** Initialise the transaction subsystem. */ void create(); /** Close the transaction subsystem on shutdown. */ void close(); /** @return total number of active (non-prepared) transactions */ ulint any_active_transactions(); /** Registers read-write transaction. Transaction becomes visible to MVCC. There's a gap between m_max_trx_id increment and transaction becoming visible through rw_trx_hash. While we're in this gap concurrent thread may come and do MVCC snapshot. As a result concurrent read view will be able to observe records owned by this transaction even before it was committed. m_rw_trx_hash_version is intended to solve this problem. MVCC snapshot has to wait until m_max_trx_id == m_rw_trx_hash_version, which effectively means that all transactions up to m_max_trx_id are available through rw_trx_hash. We rely on refresh_rw_trx_hash_version() to issue RELEASE memory barrier so that m_rw_trx_hash_version increment happens after transaction becomes visible through rw_trx_hash. */ void register_rw(trx_t *trx) { trx->id= get_new_trx_id_no_refresh(); rw_trx_hash.insert(trx); refresh_rw_trx_hash_version(); } /** Deregisters read-write transaction. Transaction is removed from rw_trx_hash, which releases all implicit locks. MVCC snapshot won't see this transaction anymore. */ void deregister_rw(trx_t *trx) { rw_trx_hash.erase(trx); } bool is_registered(trx_t *caller_trx, trx_id_t id) { return id && find(caller_trx, id, false); } trx_t *find(trx_t *caller_trx, trx_id_t id, bool do_ref_count= true) { return rw_trx_hash.find(caller_trx, id, do_ref_count); } /** Registers transaction in trx_sys. @param trx transaction */ void register_trx(trx_t *trx) { trx_list.push_front(*trx); } /** Deregisters transaction in trx_sys. @param trx transaction */ void deregister_trx(trx_t *trx) { trx_list.remove(*trx); } /** Clones the oldest view and stores it in view. No need to call ReadView::close(). The caller owns the view that is passed in. This function is called by purge thread to determine whether it should purge the delete marked record or not. */ void clone_oldest_view(ReadViewBase *view) const; /** @return the number of active views */ size_t view_count() const { size_t count= 0; trx_list.for_each([&count](const trx_t &trx) { if (trx.read_view.is_open()) ++count; }); return count; } private: static my_bool get_min_trx_id_callback(rw_trx_hash_element_t *element, trx_id_t *id) { if (element->id < *id) { mutex_enter(&element->mutex); /* We don't care about read-only transactions here. */ if (element->trx && element->trx->rsegs.m_redo.rseg) *id= element->id; mutex_exit(&element->mutex); } return 0; } struct snapshot_ids_arg { snapshot_ids_arg(trx_ids_t *ids): m_ids(ids) {} trx_ids_t *m_ids; trx_id_t m_id; trx_id_t m_no; }; static my_bool copy_one_id(rw_trx_hash_element_t *element, snapshot_ids_arg *arg) { if (element->id < arg->m_id) { trx_id_t no= element->no; arg->m_ids->push_back(element->id); if (no < arg->m_no) arg->m_no= no; } return 0; } /** Getter for m_rw_trx_hash_version, must issue ACQUIRE memory barrier. */ trx_id_t get_rw_trx_hash_version() { return m_rw_trx_hash_version.load(std::memory_order_acquire); } /** Increments m_rw_trx_hash_version, must issue RELEASE memory barrier. */ void refresh_rw_trx_hash_version() { m_rw_trx_hash_version.fetch_add(1, std::memory_order_release); } /** Allocates new transaction id without refreshing rw_trx_hash version. This method is extracted for exclusive use by register_rw() and assign_new_trx_no() where new id must be allocated atomically with payload of these methods from MVCC snapshot point of view. @sa get_new_trx_id() @sa assign_new_trx_no() @return new transaction id */ trx_id_t get_new_trx_id_no_refresh() { return m_max_trx_id++; } }; /** The transaction system */ extern trx_sys_t trx_sys; #endif