summaryrefslogtreecommitdiffstats
path: root/storage/innobase/log/log0sync.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/innobase/log/log0sync.cc')
-rw-r--r--storage/innobase/log/log0sync.cc404
1 files changed, 404 insertions, 0 deletions
diff --git a/storage/innobase/log/log0sync.cc b/storage/innobase/log/log0sync.cc
new file mode 100644
index 00000000..6b14d1d3
--- /dev/null
+++ b/storage/innobase/log/log0sync.cc
@@ -0,0 +1,404 @@
+/*****************************************************************************
+Copyright (c) 2020 MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+/*
+The group commit synchronization used in log_write_up_to()
+works as follows
+
+For simplicity, lets consider only write operation,synchronozation of
+flush operation works the same.
+
+Rules of the game
+
+A thread enters log_write_up_to() with lsn of the current transaction
+1. If last written lsn is greater than wait lsn (another thread already
+ wrote the log buffer),then there is no need to do anything.
+2. If no other thread is currently writing, write the log buffer,
+ and update last written lsn.
+3. Otherwise, wait, and go to step 1.
+
+Synchronization can be done in different ways, e.g
+
+a) Simple mutex locking the entire check and write operation
+Disadvantage that threads that could continue after updating
+last written lsn, still wait.
+
+b) Spinlock, with periodic checks for last written lsn.
+Fixes a) but burns CPU unnecessary.
+
+c) Mutex / condition variable combo.
+
+Condtion variable notifies (broadcast) all waiters, whenever
+last written lsn is changed.
+
+Has a disadvantage of many suprious wakeups, stress on OS scheduler,
+and mutex contention.
+
+d) Something else.
+Make use of the waiter's lsn parameter, and only wakeup "right" waiting
+threads.
+
+We chose d). Even if implementation is more complicated than alternatves
+due to the need to maintain list of waiters, it provides the best performance.
+
+See group_commit_lock implementation for details.
+
+Note that if write operation is very fast, a) or b) can be fine as alternative.
+*/
+#ifdef _WIN32
+#include <windows.h>
+#endif
+
+#ifdef __linux__
+#include <linux/futex.h>
+#include <sys/syscall.h>
+#endif
+
+#include <atomic>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <my_cpu.h>
+
+#include <log0types.h>
+#include "log0sync.h"
+#include <mysql/service_thd_wait.h>
+#include <sql_class.h>
+/**
+ Helper class , used in group commit lock.
+
+ Binary semaphore, or (same thing), an auto-reset event
+ Has state (signalled or not), and provides 2 operations.
+ wait() and wake()
+
+ The implementation uses efficient locking primitives on Linux and Windows.
+ Or, mutex/condition combo elsewhere.
+*/
+
+class binary_semaphore
+{
+public:
+ /**Wait until semaphore becomes signalled, and atomically reset the state
+ to non-signalled*/
+ void wait();
+ /** signals the semaphore */
+ void wake();
+
+private:
+#if defined(__linux__) || defined (_WIN32)
+ std::atomic<int> m_signalled;
+ static constexpr std::memory_order mem_order= std::memory_order_acq_rel;
+public:
+ binary_semaphore() :m_signalled(0) {}
+#else
+ std::mutex m_mtx{};
+ std::condition_variable m_cv{};
+ bool m_signalled = false;
+#endif
+};
+
+#if defined (__linux__) || defined (_WIN32)
+void binary_semaphore::wait()
+{
+ for (;;)
+ {
+ if (m_signalled.exchange(0, mem_order) == 1)
+ {
+ break;
+ }
+#ifdef _WIN32
+ int zero = 0;
+ WaitOnAddress(&m_signalled, &zero, sizeof(m_signalled), INFINITE);
+#else
+ syscall(SYS_futex, &m_signalled, FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0);
+#endif
+ }
+}
+
+void binary_semaphore::wake()
+{
+ if (m_signalled.exchange(1, mem_order) == 0)
+ {
+#ifdef _WIN32
+ WakeByAddressSingle(&m_signalled);
+#else
+ syscall(SYS_futex, &m_signalled, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
+#endif
+ }
+}
+#else
+void binary_semaphore::wait()
+{
+ std::unique_lock<std::mutex> lk(m_mtx);
+ while (!m_signalled)
+ m_cv.wait(lk);
+ m_signalled = false;
+}
+void binary_semaphore::wake()
+{
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_signalled = true;
+ m_cv.notify_one();
+}
+#endif
+
+/* A thread helper structure, used in group commit lock below*/
+struct group_commit_waiter_t
+{
+ lsn_t m_value=0;
+ binary_semaphore m_sema{};
+ group_commit_waiter_t* m_next= nullptr;
+ bool m_group_commit_leader=false;
+};
+
+group_commit_lock::group_commit_lock() :
+ m_mtx(), m_value(0), m_pending_value(0), m_lock(false), m_waiters_list()
+{
+}
+
+group_commit_lock::value_type group_commit_lock::value() const
+{
+ return m_value.load(std::memory_order::memory_order_relaxed);
+}
+
+group_commit_lock::value_type group_commit_lock::pending() const
+{
+ return m_pending_value.load(std::memory_order::memory_order_relaxed);
+}
+
+void group_commit_lock::set_pending(group_commit_lock::value_type num)
+{
+ ut_a(num >= value());
+ m_pending_value.store(num, std::memory_order::memory_order_relaxed);
+}
+
+const unsigned int MAX_SPINS = 1; /** max spins in acquire */
+thread_local group_commit_waiter_t thread_local_waiter;
+
+static inline void do_completion_callback(const completion_callback* cb)
+{
+ if (cb)
+ cb->m_callback(cb->m_param);
+}
+
+group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, const completion_callback *callback)
+{
+ unsigned int spins = MAX_SPINS;
+
+ for(;;)
+ {
+ if (num <= value())
+ {
+ /* No need to wait.*/
+ do_completion_callback(callback);
+ return lock_return_code::EXPIRED;
+ }
+
+ if(spins-- == 0)
+ break;
+ if (num > pending())
+ {
+ /* Longer wait expected (longer than currently running operation),
+ don't spin.*/
+ break;
+ }
+ ut_delay(1);
+ }
+
+ thread_local_waiter.m_value = num;
+ thread_local_waiter.m_group_commit_leader= false;
+ std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
+ while (num > value() || thread_local_waiter.m_group_commit_leader)
+ {
+ lk.lock();
+
+ /* Re-read current value after acquiring the lock*/
+ if (num <= value() &&
+ (!thread_local_waiter.m_group_commit_leader || m_lock))
+ {
+ lk.unlock();
+ do_completion_callback(callback);
+ return lock_return_code::EXPIRED;
+ }
+
+ if (!m_lock)
+ {
+ /* Take the lock, become group commit leader.*/
+ m_lock = true;
+#ifndef DBUG_OFF
+ m_owner_id = std::this_thread::get_id();
+#endif
+ if (callback)
+ m_pending_callbacks.push_back({num,*callback});
+ return lock_return_code::ACQUIRED;
+ }
+
+ if (callback && (m_waiters_list || num <= pending()))
+ {
+ /*
+ If num > pending(), we have a good candidate for the next group
+ commit lead, that will be taking over the lock after current owner
+ releases it. We put current thread into waiter's list so it sleeps
+ and can be signaled and marked as group commit lead during lock release.
+
+ For this to work well, pending() must deliver a good approximation for N
+ in the next call to group_commit_lock::release(N).
+ */
+ m_pending_callbacks.push_back({num, *callback});
+ return lock_return_code::CALLBACK_QUEUED;
+ }
+
+ /* Add yourself to waiters list.*/
+ thread_local_waiter.m_group_commit_leader= false;
+ thread_local_waiter.m_next = m_waiters_list;
+ m_waiters_list = &thread_local_waiter;
+ lk.unlock();
+
+ /* Sleep until woken in release().*/
+ thd_wait_begin(0,THD_WAIT_GROUP_COMMIT);
+ thread_local_waiter.m_sema.wait();
+ thd_wait_end(0);
+
+ }
+ do_completion_callback(callback);
+ return lock_return_code::EXPIRED;
+}
+
+group_commit_lock::value_type group_commit_lock::release(value_type num)
+{
+ completion_callback callbacks[1000];
+ size_t callback_count = 0;
+ value_type ret = 0;
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_lock = false;
+
+ /* Update current value. */
+ ut_a(num >= value());
+ m_value.store(num, std::memory_order_relaxed);
+
+ /*
+ Wake waiters for value <= current value.
+ Wake one more waiter, who will become the group commit lead.
+ */
+ group_commit_waiter_t* cur, * prev, * next;
+ group_commit_waiter_t* wakeup_list = nullptr;
+ for (auto& c : m_pending_callbacks)
+ {
+ if (c.first <= num)
+ {
+ if (callback_count < array_elements(callbacks))
+ callbacks[callback_count++] = c.second;
+ else
+ c.second.m_callback(c.second.m_param);
+ }
+ }
+
+ for (prev= nullptr, cur= m_waiters_list; cur; cur= next)
+ {
+ next= cur->m_next;
+ if (cur->m_value <= num)
+ {
+ /* Move current waiter to wakeup_list*/
+
+ if (!prev)
+ {
+ /* Remove from the start of the list.*/
+ m_waiters_list = next;
+ }
+ else
+ {
+ /* Remove from the middle of the list.*/
+ prev->m_next= cur->m_next;
+ }
+
+ /* Append entry to the wakeup list.*/
+ cur->m_next = wakeup_list;
+ wakeup_list = cur;
+ }
+ else
+ {
+ prev= cur;
+ }
+ }
+
+ auto it= std::remove_if(
+ m_pending_callbacks.begin(), m_pending_callbacks.end(),
+ [num](const pending_cb &c) { return c.first <= num; });
+
+ m_pending_callbacks.erase(it, m_pending_callbacks.end());
+
+ if (m_pending_callbacks.size() || m_waiters_list)
+ {
+ /*
+ Ensure that after this thread released the lock,
+ there is a new group commit leader
+ We take this from waiters list or wakeup list. It
+ might look like a spurious wake, but in fact we just
+ ensure the waiter do not wait for eternity.
+ */
+ if (m_waiters_list)
+ {
+ /* Move one waiter to wakeup list */
+ auto e= m_waiters_list;
+ m_waiters_list= m_waiters_list->m_next;
+ e->m_next= wakeup_list;
+ e->m_group_commit_leader= true;
+ wakeup_list = e;
+ }
+ else if (wakeup_list)
+ {
+ wakeup_list->m_group_commit_leader=true;
+ }
+ else
+ {
+ /* Tell the caller that some pending callbacks left, and he should
+ do something to prevent stalls. This should be a rare situation.*/
+ ret= m_pending_callbacks[0].first;
+ }
+ }
+
+ lk.unlock();
+
+ /*
+ Release designated next group commit lead first,
+ to minimize spurious wakeups.
+ */
+ if (wakeup_list && wakeup_list->m_group_commit_leader)
+ {
+ next = wakeup_list->m_next;
+ wakeup_list->m_sema.wake();
+ wakeup_list= next;
+ }
+
+ for (size_t i = 0; i < callback_count; i++)
+ callbacks[i].m_callback(callbacks[i].m_param);
+
+ for (cur= wakeup_list; cur; cur= next)
+ {
+ next= cur->m_next;
+ cur->m_sema.wake();
+ }
+ return ret;
+}
+
+#ifndef DBUG_OFF
+bool group_commit_lock::is_owner()
+{
+ return m_lock && std::this_thread::get_id() == m_owner_id;
+}
+#endif
+