author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-18 13:22:53 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-18 13:22:53 +0000
commit     347c164c35eddab388009470e6848cb361ac93f8
tree       2c0c44eac690f510bb0a35b2a13b36d606b77b6b /storage/innobase/mtr/mtr0mtr.cc
parent     Releasing progress-linux version 1:10.11.7-4~progress7.99u1.
Merging upstream version 1:10.11.8.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'storage/innobase/mtr/mtr0mtr.cc')
-rw-r--r--  storage/innobase/mtr/mtr0mtr.cc  525
1 file changed, 326 insertions(+), 199 deletions(-)
diff --git a/storage/innobase/mtr/mtr0mtr.cc b/storage/innobase/mtr/mtr0mtr.cc
index 01641f74..74d3adb2 100644
--- a/storage/innobase/mtr/mtr0mtr.cc
+++ b/storage/innobase/mtr/mtr0mtr.cc
@@ -37,6 +37,31 @@ Created 11/26/1995 Heikki Tuuri
 #include "srv0start.h"
 #include "log.h"
 #include "mariadb_stats.h"
+#include "my_cpu.h"
+
+#ifdef HAVE_PMEM
+void (*mtr_t::commit_logger)(mtr_t *, std::pair<lsn_t,page_flush_ahead>);
+#endif
+std::pair<lsn_t,mtr_t::page_flush_ahead> (*mtr_t::finisher)(mtr_t *, size_t);
+unsigned mtr_t::spin_wait_delay;
+
+void mtr_t::finisher_update()
+{
+  ut_ad(log_sys.latch_have_wr());
+#ifdef HAVE_PMEM
+  if (log_sys.is_pmem())
+  {
+    commit_logger= mtr_t::commit_log<true>;
+    finisher= spin_wait_delay
+      ? mtr_t::finish_writer<true,true> : mtr_t::finish_writer<false,true>;
+    return;
+  }
+  commit_logger= mtr_t::commit_log<false>;
+#endif
+  finisher=
+    (spin_wait_delay
+     ? mtr_t::finish_writer<true,false> : mtr_t::finish_writer<false,false>);
+}
 
 void mtr_memo_slot_t::release() const
 {
@@ -82,9 +107,7 @@ void mtr_memo_slot_t::release() const
 inline buf_page_t *buf_pool_t::prepare_insert_into_flush_list(lsn_t lsn)
   noexcept
 {
-#ifndef SUX_LOCK_GENERIC
-  ut_ad(recv_recovery_is_on() || log_sys.latch.is_locked());
-#endif
+  ut_ad(recv_recovery_is_on() || log_sys.latch_have_any());
   ut_ad(lsn >= log_sys.last_checkpoint_lsn);
   mysql_mutex_assert_owner(&flush_list_mutex);
   static_assert(log_t::FIRST_LSN >= 2, "compatibility");
@@ -234,7 +257,14 @@ static void insert_imported(buf_block_t *block)
   if (block->page.oldest_modification() <= 1)
   {
     log_sys.latch.rd_lock(SRW_LOCK_CALL);
-    const lsn_t lsn= log_sys.last_checkpoint_lsn;
+    /* For unlogged mtrs (MTR_LOG_NO_REDO), we use the current system LSN. The
+    mtr that generated the LSN is either already committed or in mtr_t::commit.
+    Shared latch and relaxed atomics should be fine here as it is guaranteed
+    that both the current mtr and the mtr that generated the LSN would have
+    added the dirty pages to flush list before we access the minimum LSN during
+    checkpoint. log_checkpoint_low() acquires exclusive log_sys.latch before
+    commencing. */
+    const lsn_t lsn= log_sys.get_lsn();
     mysql_mutex_lock(&buf_pool.flush_list_mutex);
     buf_pool.insert_into_flush_list
       (buf_pool.prepare_insert_into_flush_list(lsn), block, lsn);
@@ -310,12 +340,9 @@ void mtr_t::release()
 
 inline lsn_t log_t::get_write_target() const
 {
-#ifndef SUX_LOCK_GENERIC
-  ut_ad(latch.is_locked());
-#endif
-  if (UNIV_LIKELY(buf_free < max_buf_free))
+  ut_ad(latch_have_any());
+  if (UNIV_LIKELY(buf_free_ok()))
     return 0;
-  ut_ad(!is_pmem());
   /* The LSN corresponding to the end of buf is write_lsn
      - (first_lsn & 4095) + buf_free, but we use simpler arithmetics
      to return a smaller write target in
@@ -324,151 +351,161 @@ inline lsn_t log_t::get_write_target() const
   return write_lsn + max_buf_free / 2;
 }
 
-/** Commit a mini-transaction. */
-void mtr_t::commit()
+template<bool pmem>
+void mtr_t::commit_log(mtr_t *mtr, std::pair<lsn_t,page_flush_ahead> lsns)
 {
-  ut_ad(is_active());
-  ut_ad(!is_inside_ibuf());
-
-  /* This is a dirty read, for debugging. */
-  ut_ad(!m_modifications || !recv_no_log_write);
-  ut_ad(!m_modifications || m_log_mode != MTR_LOG_NONE);
-  ut_ad(!m_latch_ex);
+  size_t modified= 0;
+  const lsn_t write_lsn= pmem ? 0 : log_sys.get_write_target();
 
-  if (m_modifications && (m_log_mode == MTR_LOG_NO_REDO || !m_log.empty()))
+  if (mtr->m_made_dirty)
   {
-    if (UNIV_UNLIKELY(!is_logged()))
+    auto it= mtr->m_memo.rbegin();
+
+    mysql_mutex_lock(&buf_pool.flush_list_mutex);
+
+    buf_page_t *const prev=
+      buf_pool.prepare_insert_into_flush_list(lsns.first);
+
+    while (it != mtr->m_memo.rend())
     {
-      release_unlogged();
-      goto func_exit;
+      const mtr_memo_slot_t &slot= *it++;
+      if (slot.type & MTR_MEMO_MODIFY)
+      {
+        ut_ad(slot.type == MTR_MEMO_PAGE_X_MODIFY ||
+              slot.type == MTR_MEMO_PAGE_SX_MODIFY);
+        modified++;
+        buf_block_t *b= static_cast<buf_block_t*>(slot.object);
+        ut_ad(b->page.id() < end_page_id);
+        ut_d(const auto s= b->page.state());
+        ut_ad(s > buf_page_t::FREED);
+        ut_ad(s < buf_page_t::READ_FIX);
+        ut_ad(mach_read_from_8(b->page.frame + FIL_PAGE_LSN) <=
+              mtr->m_commit_lsn);
+        mach_write_to_8(b->page.frame + FIL_PAGE_LSN, mtr->m_commit_lsn);
+        if (UNIV_LIKELY_NULL(b->page.zip.data))
+          memcpy_aligned<8>(FIL_PAGE_LSN + b->page.zip.data,
+                            FIL_PAGE_LSN + b->page.frame, 8);
+        buf_pool.insert_into_flush_list(prev, b, lsns.first);
+      }
     }
 
-    ut_ad(!srv_read_only_mode);
-    std::pair<lsn_t,page_flush_ahead> lsns{do_write()};
-    process_freed_pages();
-    size_t modified= 0;
-    const lsn_t write_lsn= log_sys.get_write_target();
+    ut_ad(modified);
+    buf_pool.flush_list_requests+= modified;
+    buf_pool.page_cleaner_wakeup();
+    mysql_mutex_unlock(&buf_pool.flush_list_mutex);
 
-    if (m_made_dirty)
+    if (mtr->m_latch_ex)
     {
-      auto it= m_memo.rbegin();
-
-      mysql_mutex_lock(&buf_pool.flush_list_mutex);
+      log_sys.latch.wr_unlock();
+      mtr->m_latch_ex= false;
+    }
+    else
+      log_sys.latch.rd_unlock();
 
-      buf_page_t *const prev=
-        buf_pool.prepare_insert_into_flush_list(lsns.first);
+    mtr->release();
+  }
+  else
+  {
+    if (mtr->m_latch_ex)
+    {
+      log_sys.latch.wr_unlock();
+      mtr->m_latch_ex= false;
+    }
+    else
+      log_sys.latch.rd_unlock();
 
-      while (it != m_memo.rend())
-      {
-        const mtr_memo_slot_t &slot= *it++;
+    for (auto it= mtr->m_memo.rbegin(); it != mtr->m_memo.rend(); )
+    {
+      const mtr_memo_slot_t &slot= *it++;
+      ut_ad(slot.object);
+      switch (slot.type) {
+      case MTR_MEMO_S_LOCK:
+        static_cast<index_lock*>(slot.object)->s_unlock();
+        break;
+      case MTR_MEMO_SPACE_X_LOCK:
+        static_cast<fil_space_t*>(slot.object)->set_committed_size();
+        static_cast<fil_space_t*>(slot.object)->x_unlock();
+        break;
+      case MTR_MEMO_X_LOCK:
+      case MTR_MEMO_SX_LOCK:
+        static_cast<index_lock*>(slot.object)->
+          u_or_x_unlock(slot.type == MTR_MEMO_SX_LOCK);
+        break;
+      default:
+        buf_page_t *bpage= static_cast<buf_page_t*>(slot.object);
+        ut_d(const auto s=)
+        bpage->unfix();
         if (slot.type & MTR_MEMO_MODIFY)
         {
           ut_ad(slot.type == MTR_MEMO_PAGE_X_MODIFY ||
                 slot.type == MTR_MEMO_PAGE_SX_MODIFY);
-          modified++;
-          buf_block_t *b= static_cast<buf_block_t*>(slot.object);
-          ut_ad(b->page.id() < end_page_id);
-          ut_d(const auto s= b->page.state());
-          ut_ad(s > buf_page_t::FREED);
+          ut_ad(bpage->oldest_modification() > 1);
+          ut_ad(bpage->oldest_modification() < mtr->m_commit_lsn);
+          ut_ad(bpage->id() < end_page_id);
+          ut_ad(s >= buf_page_t::FREED);
           ut_ad(s < buf_page_t::READ_FIX);
-          ut_ad(mach_read_from_8(b->page.frame + FIL_PAGE_LSN) <=
-                m_commit_lsn);
-          mach_write_to_8(b->page.frame + FIL_PAGE_LSN, m_commit_lsn);
-          if (UNIV_LIKELY_NULL(b->page.zip.data))
-            memcpy_aligned<8>(FIL_PAGE_LSN + b->page.zip.data,
-                              FIL_PAGE_LSN + b->page.frame, 8);
-          buf_pool.insert_into_flush_list(prev, b, lsns.first);
+          ut_ad(mach_read_from_8(bpage->frame + FIL_PAGE_LSN) <=
+                mtr->m_commit_lsn);
+          mach_write_to_8(bpage->frame + FIL_PAGE_LSN, mtr->m_commit_lsn);
+          if (UNIV_LIKELY_NULL(bpage->zip.data))
+            memcpy_aligned<8>(FIL_PAGE_LSN + bpage->zip.data,
+                              FIL_PAGE_LSN + bpage->frame, 8);
+          modified++;
+        }
+        switch (auto latch= slot.type & ~MTR_MEMO_MODIFY) {
+        case MTR_MEMO_PAGE_S_FIX:
+          bpage->lock.s_unlock();
+          continue;
+        case MTR_MEMO_PAGE_SX_FIX:
+        case MTR_MEMO_PAGE_X_FIX:
+          bpage->lock.u_or_x_unlock(latch == MTR_MEMO_PAGE_SX_FIX);
+          continue;
+        default:
+          ut_ad(latch == MTR_MEMO_BUF_FIX);
         }
       }
+    }
 
-      ut_ad(modified);
-      buf_pool.flush_list_requests+= modified;
-      buf_pool.page_cleaner_wakeup();
-      mysql_mutex_unlock(&buf_pool.flush_list_mutex);
+    buf_pool.add_flush_list_requests(modified);
+    mtr->m_memo.clear();
+  }
 
-      if (m_latch_ex)
-      {
-        log_sys.latch.wr_unlock();
-        m_latch_ex= false;
-      }
-      else
-        log_sys.latch.rd_unlock();
+  mariadb_increment_pages_updated(modified);
 
-      release();
-    }
-    else
-    {
-      if (m_latch_ex)
-      {
-        log_sys.latch.wr_unlock();
-        m_latch_ex= false;
-      }
-      else
-        log_sys.latch.rd_unlock();
+  if (UNIV_UNLIKELY(lsns.second != PAGE_FLUSH_NO))
+    buf_flush_ahead(mtr->m_commit_lsn, lsns.second == PAGE_FLUSH_SYNC);
 
-      for (auto it= m_memo.rbegin(); it != m_memo.rend(); )
-      {
-        const mtr_memo_slot_t &slot= *it++;
-        ut_ad(slot.object);
-        switch (slot.type) {
-        case MTR_MEMO_S_LOCK:
-          static_cast<index_lock*>(slot.object)->s_unlock();
-          break;
-        case MTR_MEMO_SPACE_X_LOCK:
-          static_cast<fil_space_t*>(slot.object)->set_committed_size();
-          static_cast<fil_space_t*>(slot.object)->x_unlock();
-          break;
-        case MTR_MEMO_X_LOCK:
-        case MTR_MEMO_SX_LOCK:
-          static_cast<index_lock*>(slot.object)->
-            u_or_x_unlock(slot.type == MTR_MEMO_SX_LOCK);
-          break;
-        default:
-          buf_page_t *bpage= static_cast<buf_page_t*>(slot.object);
-          ut_d(const auto s=)
-          bpage->unfix();
-          if (slot.type & MTR_MEMO_MODIFY)
-          {
-            ut_ad(slot.type == MTR_MEMO_PAGE_X_MODIFY ||
-                  slot.type == MTR_MEMO_PAGE_SX_MODIFY);
-            ut_ad(bpage->oldest_modification() > 1);
-            ut_ad(bpage->oldest_modification() < m_commit_lsn);
-            ut_ad(bpage->id() < end_page_id);
-            ut_ad(s >= buf_page_t::FREED);
-            ut_ad(s < buf_page_t::READ_FIX);
-            ut_ad(mach_read_from_8(bpage->frame + FIL_PAGE_LSN) <=
-                  m_commit_lsn);
-            mach_write_to_8(bpage->frame + FIL_PAGE_LSN, m_commit_lsn);
-            if (UNIV_LIKELY_NULL(bpage->zip.data))
-              memcpy_aligned<8>(FIL_PAGE_LSN + bpage->zip.data,
-                                FIL_PAGE_LSN + bpage->frame, 8);
-            modified++;
-          }
-          switch (auto latch= slot.type & ~MTR_MEMO_MODIFY) {
-          case MTR_MEMO_PAGE_S_FIX:
-            bpage->lock.s_unlock();
-            continue;
-          case MTR_MEMO_PAGE_SX_FIX:
-          case MTR_MEMO_PAGE_X_FIX:
-            bpage->lock.u_or_x_unlock(latch == MTR_MEMO_PAGE_SX_FIX);
-            continue;
-          default:
-            ut_ad(latch == MTR_MEMO_BUF_FIX);
-          }
-        }
-      }
+  if (!pmem && UNIV_UNLIKELY(write_lsn != 0))
+    log_write_up_to(write_lsn, false);
+}
 
-      buf_pool.add_flush_list_requests(modified);
-      m_memo.clear();
-    }
+/** Commit a mini-transaction. */
+void mtr_t::commit()
+{
+  ut_ad(is_active());
+  ut_ad(!is_inside_ibuf());
 
-    mariadb_increment_pages_updated(modified);
+  /* This is a dirty read, for debugging. */
+  ut_ad(!m_modifications || !recv_no_log_write);
+  ut_ad(!m_modifications || m_log_mode != MTR_LOG_NONE);
+  ut_ad(!m_latch_ex);
 
-    if (UNIV_UNLIKELY(lsns.second != PAGE_FLUSH_NO))
-      buf_flush_ahead(m_commit_lsn, lsns.second == PAGE_FLUSH_SYNC);
+  if (m_modifications && (m_log_mode == MTR_LOG_NO_REDO || !m_log.empty()))
+  {
+    if (UNIV_UNLIKELY(!is_logged()))
+    {
+      release_unlogged();
+      goto func_exit;
+    }
 
-    if (UNIV_UNLIKELY(write_lsn != 0))
-      log_write_up_to(write_lsn, false);
+    ut_ad(!srv_read_only_mode);
+    std::pair<lsn_t,page_flush_ahead> lsns{do_write()};
+    process_freed_pages();
+#ifdef HAVE_PMEM
+    commit_logger(this, lsns);
+#else
+    commit_log<false>(this, lsns);
+#endif
   }
   else
   {
@@ -513,10 +550,8 @@ void mtr_t::rollback_to_savepoint(ulint begin, ulint end)
 
 /** Set create_lsn. */
 inline void fil_space_t::set_create_lsn(lsn_t lsn)
 {
-#ifndef SUX_LOCK_GENERIC
   /* Concurrent log_checkpoint_low() must be impossible. */
-  ut_ad(latch.is_write_locked());
-#endif
+  ut_ad(latch.have_wr());
   create_lsn= lsn;
 }
@@ -529,7 +564,6 @@ void mtr_t::commit_shrink(fil_space_t &space, uint32_t size)
   ut_ad(!is_inside_ibuf());
   ut_ad(!high_level_read_only);
   ut_ad(m_modifications);
-  ut_ad(m_made_dirty);
   ut_ad(!m_memo.empty());
   ut_ad(!recv_recovery_is_on());
   ut_ad(m_log_mode == MTR_LOG_ALL);
@@ -554,9 +588,7 @@ void mtr_t::commit_shrink(fil_space_t &space, uint32_t size)
 
   /* Durably write the reduced FSP_SIZE before truncating the data file. */
   log_write_and_flush();
-#ifndef SUX_LOCK_GENERIC
-  ut_ad(log_sys.latch.is_write_locked());
-#endif
+  ut_ad(log_sys.latch_have_wr());
 
   os_file_truncate(space.chain.start->name, space.chain.start->handle,
                    os_offset_t{size} << srv_page_size_shift, true);
@@ -713,9 +745,7 @@ This is to be used at log_checkpoint().
 @return current LSN */
 ATTRIBUTE_COLD lsn_t mtr_t::commit_files(lsn_t checkpoint_lsn)
 {
-#ifndef SUX_LOCK_GENERIC
-  ut_ad(log_sys.latch.is_write_locked());
-#endif
+  ut_ad(log_sys.latch_have_wr());
   ut_ad(is_active());
   ut_ad(!is_inside_ibuf());
   ut_ad(m_log_mode == MTR_LOG_ALL);
@@ -870,13 +900,111 @@ ATTRIBUTE_COLD static void log_overwrite_warning(lsn_t lsn)
             ? ". Shutdown is in progress" : "");
 }
 
-/** Wait in append_prepare() for buffer to become available
-@param lsn log sequence number to write up to
-@param ex whether log_sys.latch is exclusively locked */
-ATTRIBUTE_COLD void log_t::append_prepare_wait(lsn_t lsn, bool ex) noexcept
+static ATTRIBUTE_NOINLINE void lsn_delay(size_t delay, size_t mult) noexcept
+{
+  delay*= mult * 2; // GCC 13.2.0 -O2 targeting AMD64 wants to unroll twice
+  HMT_low();
+  do
+    MY_RELAX_CPU();
+  while (--delay);
+  HMT_medium();
+}
+
+#if defined __clang_major__ && __clang_major__ < 10
+/* Only clang-10 introduced support for asm goto */
+#elif defined __APPLE__
+/* At least some versions of Apple Xcode do not support asm goto */
+#elif defined __GNUC__ && (defined __i386__ || defined __x86_64__)
+# if SIZEOF_SIZE_T == 8
+#  define LOCK_TSET \
+  __asm__ goto("lock btsq $63, %0\n\t" "jnc %l1" \
+               : : "m"(buf_free) : "cc", "memory" : got)
+# else
+#  define LOCK_TSET \
+  __asm__ goto("lock btsl $31, %0\n\t" "jnc %l1" \
+               : : "m"(buf_free) : "cc", "memory" : got)
+# endif
+#elif defined _MSC_VER && (defined _M_IX86 || defined _M_X64)
+# if SIZEOF_SIZE_T == 8
+#  define LOCK_TSET \
+  if (!_interlockedbittestandset64 \
+      (reinterpret_cast<volatile LONG64*>(&buf_free), 63)) return
+# else
+#  define LOCK_TSET \
+  if (!_interlockedbittestandset \
+      (reinterpret_cast<volatile long*>(&buf_free), 31)) return
+# endif
+#endif
+
+#ifdef LOCK_TSET
+ATTRIBUTE_NOINLINE
+void log_t::lsn_lock_bts() noexcept
+{
+  LOCK_TSET;
+  {
+    const size_t m= mtr_t::spin_wait_delay;
+    constexpr size_t DELAY= 10, MAX_ITERATIONS= 10;
+    for (size_t delay_count= DELAY, delay_iterations= 1;;
+         lsn_delay(delay_iterations, m))
+    {
+      if (!(buf_free.load(std::memory_order_relaxed) & buf_free_LOCK))
+        LOCK_TSET;
+      if (!delay_count);
+      else if (delay_iterations < MAX_ITERATIONS)
+        delay_count= DELAY, delay_iterations++;
+      else
+        delay_count--;
+    }
+  }
+
+# ifdef __GNUC__
+ got:
+  return;
+# endif
+}
+
+inline
+#else
+ATTRIBUTE_NOINLINE
+#endif
+size_t log_t::lock_lsn() noexcept
+{
+#ifdef LOCK_TSET
+  lsn_lock_bts();
+  return ~buf_free_LOCK & buf_free.load(std::memory_order_relaxed);
+# undef LOCK_TSET
+#else
+  size_t b= buf_free.fetch_or(buf_free_LOCK, std::memory_order_acquire);
+  if (b & buf_free_LOCK)
+  {
+    const size_t m= mtr_t::spin_wait_delay;
+    constexpr size_t DELAY= 10, MAX_ITERATIONS= 10;
+    for (size_t delay_count= DELAY, delay_iterations= 1;
+         ((b= buf_free.load(std::memory_order_relaxed)) & buf_free_LOCK) ||
+         (buf_free_LOCK & (b= buf_free.fetch_or(buf_free_LOCK,
                                                 std::memory_order_acquire)));
+         lsn_delay(delay_iterations, m))
+      if (!delay_count);
+      else if (delay_iterations < MAX_ITERATIONS)
+        delay_count= DELAY, delay_iterations++;
+      else
+        delay_count--;
+  }
+  return b;
+#endif
+}
+
+template<bool spin>
+ATTRIBUTE_COLD size_t log_t::append_prepare_wait(size_t b, bool ex, lsn_t lsn)
+  noexcept
 {
   waits++;
-  unlock_lsn();
+  ut_ad(buf_free.load(std::memory_order_relaxed) ==
+        (spin ? (b | buf_free_LOCK) : b));
+  if (spin)
+    buf_free.store(b, std::memory_order_release);
+  else
+    lsn_lock.wr_unlock();
 
   if (ex)
     latch.wr_unlock();
@@ -890,51 +1018,57 @@ ATTRIBUTE_COLD void log_t::append_prepare_wait(lsn_t lsn, bool ex) noexcept
   else
     latch.rd_lock(SRW_LOCK_CALL);
 
-  lock_lsn();
+  if (spin)
+    return lock_lsn();
+
+  lsn_lock.wr_lock();
+  return buf_free.load(std::memory_order_relaxed);
 }
 
 /** Reserve space in the log buffer for appending data.
+@tparam spin  whether to use the spin-only lock_lsn()
 @tparam pmem  log_sys.is_pmem()
 @param size   total length of the data to append(), in bytes
 @param ex     whether log_sys.latch is exclusively locked
 @return the start LSN and the buffer position for append() */
-template<bool pmem>
+template<bool spin,bool pmem>
 inline
 std::pair<lsn_t,byte*> log_t::append_prepare(size_t size, bool ex) noexcept
 {
-#ifndef SUX_LOCK_GENERIC
-  ut_ad(latch.is_locked());
-# ifndef _WIN32 // there is no accurate is_write_locked() on SRWLOCK
-  ut_ad(ex == latch.is_write_locked());
-# endif
-#endif
+  ut_ad(ex ? latch_have_wr() : latch_have_rd());
   ut_ad(pmem == is_pmem());
-  lock_lsn();
+  if (!spin)
+    lsn_lock.wr_lock();
+  size_t b{spin ? lock_lsn() : buf_free.load(std::memory_order_relaxed)};
   write_to_buf++;
 
   const lsn_t l{lsn.load(std::memory_order_relaxed)}, end_lsn{l + size};
-  size_t b{buf_free};
 
   if (UNIV_UNLIKELY(pmem
                     ? (end_lsn -
                       get_flushed_lsn(std::memory_order_relaxed)) > capacity()
                     : b + size >= buf_size))
-  {
-    append_prepare_wait(l, ex);
-    b= buf_free;
-  }
+    b= append_prepare_wait<spin>(b, ex, l);
 
-  lsn.store(end_lsn, std::memory_order_relaxed);
   size_t new_buf_free= b + size;
   if (pmem && new_buf_free >= file_size)
    new_buf_free-= size_t(capacity());
-  buf_free= new_buf_free;
-  unlock_lsn();
+
+  lsn.store(end_lsn, std::memory_order_relaxed);
 
   if (UNIV_UNLIKELY(end_lsn >= last_checkpoint_lsn + log_capacity))
-    set_check_for_checkpoint();
+    set_check_for_checkpoint(true);
+
+  byte *our_buf= buf;
+  if (spin)
+    buf_free.store(new_buf_free, std::memory_order_release);
+  else
+  {
+    buf_free.store(new_buf_free, std::memory_order_relaxed);
+    lsn_lock.wr_unlock();
+  }
 
-  return {l, &buf[b]};
+  return {l, our_buf + b};
 }
 
 /** Finish appending data to the log.
@@ -942,9 +1076,7 @@ std::pair<lsn_t,byte*> log_t::append_prepare(size_t size, bool ex) noexcept
 @return whether buf_flush_ahead() will have to be invoked */
 static mtr_t::page_flush_ahead log_close(lsn_t lsn) noexcept
 {
-#ifndef SUX_LOCK_GENERIC
-  ut_ad(log_sys.latch.is_locked());
-#endif
+  ut_ad(log_sys.latch_have_any());
 
  const lsn_t checkpoint_age= lsn - log_sys.last_checkpoint_lsn;
 
@@ -1009,9 +1141,7 @@ std::pair<lsn_t,mtr_t::page_flush_ahead> mtr_t::do_write()
   ut_ad(!recv_no_log_write);
   ut_ad(is_logged());
   ut_ad(m_log.size());
-#ifndef SUX_LOCK_GENERIC
-  ut_ad(!m_latch_ex || log_sys.latch.is_write_locked());
-#endif
+  ut_ad(!m_latch_ex || log_sys.latch_have_wr());
 
 #ifndef DBUG_OFF
   do
@@ -1069,9 +1199,7 @@ func_exit:
 inline void log_t::resize_write(lsn_t lsn, const byte *end, size_t len,
                                 size_t seq) noexcept
 {
-#ifndef SUX_LOCK_GENERIC
-  ut_ad(latch.is_locked());
-#endif
+  ut_ad(latch_have_any());
 
   if (UNIV_LIKELY_NULL(resize_buf))
   {
@@ -1176,50 +1304,47 @@ inline void log_t::resize_write(lsn_t lsn, const byte *end, size_t len,
   }
 }
 
+template<bool spin,bool pmem>
 std::pair<lsn_t,mtr_t::page_flush_ahead>
-mtr_t::finish_write(size_t len)
+mtr_t::finish_writer(mtr_t *mtr, size_t len)
 {
+  ut_ad(log_sys.is_latest());
   ut_ad(!recv_no_log_write);
-  ut_ad(is_logged());
-#ifndef SUX_LOCK_GENERIC
-# ifndef _WIN32 // there is no accurate is_write_locked() on SRWLOCK
-  ut_ad(m_latch_ex == log_sys.latch.is_write_locked());
-# endif
-#endif
+  ut_ad(mtr->is_logged());
+  ut_ad(mtr->m_latch_ex ? log_sys.latch_have_wr() : log_sys.latch_have_rd());
 
-  const size_t size{m_commit_lsn ? 5U + 8U : 5U};
-  std::pair<lsn_t, byte*> start;
+  const size_t size{mtr->m_commit_lsn ? 5U + 8U : 5U};
+  std::pair<lsn_t, byte*> start=
+    log_sys.append_prepare<spin,pmem>(len, mtr->m_latch_ex);
 
-  if (!log_sys.is_pmem())
+  if (!pmem)
  {
-    start= log_sys.append_prepare<false>(len, m_latch_ex);
-    m_log.for_each_block([&start](const mtr_buf_t::block_t *b)
+    mtr->m_log.for_each_block([&start](const mtr_buf_t::block_t *b)
     { log_sys.append(start.second, b->begin(), b->used()); return true; });
 
 #ifdef HAVE_PMEM
 write_trailer:
 #endif
     *start.second++= log_sys.get_sequence_bit(start.first + len - size);
-    if (m_commit_lsn)
+    if (mtr->m_commit_lsn)
    {
-      mach_write_to_8(start.second, m_commit_lsn);
-      m_crc= my_crc32c(m_crc, start.second, 8);
+      mach_write_to_8(start.second, mtr->m_commit_lsn);
+      mtr->m_crc= my_crc32c(mtr->m_crc, start.second, 8);
      start.second+= 8;
    }
-    mach_write_to_4(start.second, m_crc);
+    mach_write_to_4(start.second, mtr->m_crc);
    start.second+= 4;
  }
#ifdef HAVE_PMEM
  else
  {
-    start= log_sys.append_prepare<true>(len, m_latch_ex);
    if (UNIV_LIKELY(start.second + len <= &log_sys.buf[log_sys.file_size]))
    {
-      m_log.for_each_block([&start](const mtr_buf_t::block_t *b)
+      mtr->m_log.for_each_block([&start](const mtr_buf_t::block_t *b)
      { log_sys.append(start.second, b->begin(), b->used()); return true; });
      goto write_trailer;
    }
-    m_log.for_each_block([&start](const mtr_buf_t::block_t *b)
+    mtr->m_log.for_each_block([&start](const mtr_buf_t::block_t *b)
    {
      size_t size{b->used()};
      const size_t size_left(&log_sys.buf[log_sys.file_size] - start.second);
@@ -1242,14 +1367,14 @@ mtr_t::finish_write(size_t len)
      byte tail[5 + 8];
      tail[0]= log_sys.get_sequence_bit(start.first + len - size);
 
-      if (m_commit_lsn)
+      if (mtr->m_commit_lsn)
      {
-        mach_write_to_8(tail + 1, m_commit_lsn);
-        m_crc= my_crc32c(m_crc, tail + 1, 8);
-        mach_write_to_4(tail + 9, m_crc);
+        mach_write_to_8(tail + 1, mtr->m_commit_lsn);
+        mtr->m_crc= my_crc32c(mtr->m_crc, tail + 1, 8);
+        mach_write_to_4(tail + 9, mtr->m_crc);
      }
      else
-        mach_write_to_4(tail + 1, m_crc);
+        mach_write_to_4(tail + 1, mtr->m_crc);
 
      ::memcpy(start.second, tail, size_left);
      ::memcpy(log_sys.buf + log_sys.START_OFFSET, tail + size_left,
@@ -1258,12 +1383,14 @@ mtr_t::finish_write(size_t len)
                    ((size >= size_left)
                     ? log_sys.START_OFFSET : log_sys.file_size) + (size - size_left);
  }
+#else
+  static_assert(!pmem, "");
#endif
 
  log_sys.resize_write(start.first, start.second, len, size);
 
-  m_commit_lsn= start.first + len;
-  return {start.first, log_close(m_commit_lsn)};
+  mtr->m_commit_lsn= start.first + len;
+  return {start.first, log_close(mtr->m_commit_lsn)};
 }
 
 bool mtr_t::have_x_latch(const buf_block_t &block) const
@@ -1385,7 +1512,7 @@ void mtr_t::upgrade_buffer_fix(ulint savepoint, rw_lock_type_t rw_latch)
   ut_ad(slot.type == MTR_MEMO_BUF_FIX);
   buf_block_t *block= static_cast<buf_block_t*>(slot.object);
   ut_d(const auto state= block->page.state());
-  ut_ad(state > buf_page_t::UNFIXED);
+  ut_ad(state > buf_page_t::FREED);
   ut_ad(state > buf_page_t::WRITE_FIX || state < buf_page_t::READ_FIX);
   static_assert(int{MTR_MEMO_PAGE_S_FIX} == int{RW_S_LATCH}, "");
   static_assert(int{MTR_MEMO_PAGE_X_FIX} == int{RW_X_LATCH}, "");
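The most intricate piece of this diff is the new lock_lsn() protocol: log_sys.buf_free doubles as the lock word, with its most significant bit (buf_free_LOCK) acting as a test-and-set flag, acquired via a `lock bts` instruction where asm goto is available and an atomic fetch_or otherwise, with lsn_delay() spinning between retries. A minimal standalone sketch of the same idea follows; the names LOCK_BIT, lock_offset and unlock_offset are invented for illustration (they are not MariaDB identifiers), and the tuned spin_wait_delay backoff is omitted:

```cpp
#include <atomic>
#include <cstddef>
#include <cstdio>

// The top bit of the word is the lock flag; the remaining bits hold the
// current append offset, like log_sys.buf_free after this change.
static constexpr size_t LOCK_BIT= size_t{1} << (sizeof(size_t) * 8 - 1);
static std::atomic<size_t> offset_and_lock{0};

// Acquire: atomically set the lock bit; fetch_or returns the pre-update
// word, which is the current offset once the bit was observed clear.
static size_t lock_offset() noexcept
{
  size_t b= offset_and_lock.fetch_or(LOCK_BIT, std::memory_order_acquire);
  while (b & LOCK_BIT)
  {
    // Contended: spin on plain loads (no bus traffic from failed
    // read-modify-write attempts), then try the fetch_or again.
    do
      b= offset_and_lock.load(std::memory_order_relaxed);
    while (b & LOCK_BIT);
    b= offset_and_lock.fetch_or(LOCK_BIT, std::memory_order_acquire);
  }
  return b;
}

// Release: storing the new offset clears the lock bit and publishes the
// advanced offset in a single atomic write.
static void unlock_offset(size_t new_offset) noexcept
{
  offset_and_lock.store(new_offset, std::memory_order_release);
}

int main()
{
  const size_t start= lock_offset(); // reserve: lock held, offset read
  unlock_offset(start + 128);        // publish 128 appended bytes, unlock
  std::printf("reserved at offset %zu\n", start);
}
```

The release being an ordinary store is the same trick append_prepare() uses in the diff when it stores new_buf_free with memory_order_release: unlocking and publishing the new offset are one operation.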
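The change also removes per-commit branching on is_pmem() and spin_wait_delay: finisher_update() re-selects among the commit_log<> and finish_writer<> template instantiations only when the configuration changes, holding log_sys.latch exclusively, and the hot path then makes a single indirect call. A hedged sketch of that dispatch pattern, with names invented for illustration:

```cpp
#include <cstdio>
#include <utility>

// Stand-in for mtr_t's static commit_logger/finisher pointers; the real
// code selects among commit_log<bool> and finish_writer<bool,bool>.
using commit_fn= void (*)(std::pair<unsigned long long, int>);

template<bool pmem>
static void commit_variant(std::pair<unsigned long long, int> lsns)
{
  std::printf("commit at lsn=%llu, pmem=%d\n", lsns.first, int(pmem));
}

static commit_fn commit_impl= commit_variant<false>;

// Analogue of mtr_t::finisher_update(): invoked only on configuration
// changes (conceptually under an exclusive latch), so commits never
// re-test the configuration themselves.
static void finisher_update(bool pmem)
{
  commit_impl= pmem ? commit_variant<true> : commit_variant<false>;
}

int main()
{
  finisher_update(true);
  commit_impl({42, 0}); // hot path: one indirect call, no branches
}
```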