Diffstat (limited to 'wsrep-lib/src/client_state.cpp')
-rw-r--r--  wsrep-lib/src/client_state.cpp  1096
1 file changed, 1096 insertions(+), 0 deletions(-)
diff --git a/wsrep-lib/src/client_state.cpp b/wsrep-lib/src/client_state.cpp
new file mode 100644
index 00000000..99c4222f
--- /dev/null
+++ b/wsrep-lib/src/client_state.cpp
@@ -0,0 +1,1096 @@
+/*
+ * Copyright (C) 2018-2019 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/client_state.hpp"
+#include "wsrep/compiler.hpp"
+#include "wsrep/logger.hpp"
+#include "wsrep/server_state.hpp"
+#include "wsrep/server_service.hpp"
+#include "wsrep/client_service.hpp"
+
+#include <unistd.h> // usleep()
+#include <cassert>
+#include <sstream>
+#include <iostream>
+
+wsrep::client_state::~client_state()
+{
+ assert(transaction_.active() == false);
+}
+
+wsrep::provider& wsrep::client_state::provider() const
+{
+ return server_state_.provider();
+}
+
+void wsrep::client_state::open(wsrep::client_id id)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(state_ == s_none);
+ assert(keep_command_error_ == false);
+ debug_log_state("open: enter");
+ owning_thread_id_ = wsrep::this_thread::get_id();
+ rollbacker_active_ = false;
+ sync_wait_gtid_ = wsrep::gtid::undefined();
+ last_written_gtid_ = wsrep::gtid::undefined();
+ state(lock, s_idle);
+ id_ = id;
+ debug_log_state("open: leave");
+}
+
+void wsrep::client_state::close()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ debug_log_state("close: enter");
+
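+    // Wait until a possible background rollbacker has finished with
+    // this client before taking over ownership.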
+ while (mode_ == m_local && is_rollbacker_active()) {
+ cond_.wait(lock);
+ }
+ do_acquire_ownership(lock);
+
+ state(lock, s_quitting);
+ keep_command_error_ = false;
+ lock.unlock();
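+    // Roll back the active transaction, except for a locally prepared
+    // XA transaction, which must survive the client disconnect.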
+ if (transaction_.active() &&
+ (mode_ != m_local ||
+ transaction_.state() != wsrep::transaction::s_prepared))
+ {
+ client_service_.bf_rollback();
+ transaction_.after_statement();
+ }
+ if (mode_ == m_local)
+ {
+ disable_streaming();
+ }
+ debug_log_state("close: leave");
+}
+
+void wsrep::client_state::cleanup()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ cleanup(lock);
+}
+
+void wsrep::client_state::cleanup(wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ debug_log_state("cleanup: enter");
+ state(lock, s_none);
+ debug_log_state("cleanup: leave");
+}
+
+void wsrep::client_state::override_error(enum wsrep::client_error error,
+ enum wsrep::provider::status status)
+{
+ assert(wsrep::this_thread::get_id() == owning_thread_id_);
+    // The error state should not be cleared with a success code
+    // without an explicit reset_error() call.
+ assert(current_error_ == wsrep::e_success ||
+ error != wsrep::e_success);
+ current_error_ = error;
+ current_error_status_ = status;
+}
+
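+// A sketch of the per-command call sequence, as driven by the server
+// integration (assuming a single thread serving the client):
+//
+//   client_state.before_command();
+//   client_state.before_statement();
+//   /* ... execute the statement ... */
+//   client_state.after_statement();
+//   client_state.after_command_before_result();
+//   /* ... send the result to the client ... */
+//   client_state.after_command_after_result();
+//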
+int wsrep::client_state::before_command(bool keep_command_error)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ debug_log_state("before_command: enter");
+ // If the state is s_exec, the processing thread has already grabbed
+ // control with wait_rollback_complete_and_acquire_ownership()
+ if (state_ != s_exec)
+ {
+ assert(state_ == s_idle);
+ do_wait_rollback_complete_and_acquire_ownership(lock);
+ assert(state_ == s_exec);
+ client_service_.store_globals();
+ }
+ else
+ {
+ // This thread must have acquired control by other means,
+ // for example via wait_rollback_complete_and_acquire_ownership().
+ assert(wsrep::this_thread::get_id() == owning_thread_id_);
+ }
+
+ keep_command_error_ = keep_command_error;
+
+    // If the transaction is active, it must be executing, prepared,
+    // aborted (already rolled back by the rollbacker), or in must_abort
+    // state if the client thread gained control via
+    // wait_rollback_complete_and_acquire_ownership() just before a
+    // BF abort happened.
+ assert(transaction_.active() == false ||
+ (transaction_.state() == wsrep::transaction::s_executing ||
+ transaction_.state() == wsrep::transaction::s_prepared ||
+ transaction_.state() == wsrep::transaction::s_aborted ||
+ transaction_.state() == wsrep::transaction::s_must_abort));
+
+ if (transaction_.active())
+ {
+ if (transaction_.state() == wsrep::transaction::s_must_abort ||
+ transaction_.state() == wsrep::transaction::s_aborted)
+ {
+ if (transaction_.is_xa())
+ {
+                // The client will roll back explicitly, so return an
+                // error.
+ debug_log_state("before_command: error");
+ return 1;
+ }
+
+ override_error(wsrep::e_deadlock_error);
+ if (transaction_.state() == wsrep::transaction::s_must_abort)
+ {
+                lock.unlock();
+                client_service_.bf_rollback();
+                lock.lock();
+            }
+
+ if (keep_command_error_)
+ {
+ // Keep the error for the next command
+ debug_log_state("before_command: keep error");
+ return 0;
+ }
+
+ // Clean up the transaction and return error.
+ (void)transaction_.after_statement(lock);
+
+ assert(transaction_.active() == false);
+ assert(transaction_.state() == wsrep::transaction::s_aborted);
+ assert(current_error() != wsrep::e_success);
+
+ debug_log_state("before_command: error");
+ return 1;
+ }
+ }
+ debug_log_state("before_command: success");
+ return 0;
+}
+
+void wsrep::client_state::after_command_before_result()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ debug_log_state("after_command_before_result: enter");
+ assert(state() == s_exec);
+ if (transaction_.active() &&
+ transaction_.state() == wsrep::transaction::s_must_abort)
+ {
+ transaction_.after_command_must_abort(lock);
+        // If keep_command_error_ is set, the error will be propagated
+        // back to the client with some future command, so keep the
+        // transaction open here so that error handling can happen in
+        // the before_command() hook.
+ if (not keep_command_error_)
+ {
+ (void)transaction_.after_statement(lock);
+ }
+
+ assert(transaction_.state() == wsrep::transaction::s_aborted);
+ }
+ state(lock, s_result);
+ debug_log_state("after_command_before_result: leave");
+}
+
+void wsrep::client_state::after_command_after_result()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ debug_log_state("after_command_after_result_enter");
+ assert(state() == s_result);
+ assert(transaction_.state() != wsrep::transaction::s_aborting);
+ if (transaction_.active() &&
+ transaction_.state() == wsrep::transaction::s_must_abort)
+ {
+ transaction_.after_command_must_abort(lock);
+ assert(transaction_.state() == wsrep::transaction::s_aborted);
+ }
+ else if (transaction_.active() == false && not keep_command_error_)
+ {
+ current_error_ = wsrep::e_success;
+ current_error_status_ = wsrep::provider::success;
+ }
+ keep_command_error_ = false;
+ sync_wait_gtid_ = wsrep::gtid::undefined();
+ state(lock, s_idle);
+ debug_log_state("after_command_after_result: leave");
+}
+
+int wsrep::client_state::before_statement()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ debug_log_state("before_statement: enter");
+#if 0
+ /**
+ * @todo It might be beneficial to implement timed wait for
+ * server synced state.
+ */
+ if (allow_dirty_reads_ == false &&
+ server_state_.state() != wsrep::server_state::s_synced)
+ {
+ return 1;
+ }
+#endif // 0
+
+ if (transaction_.active() &&
+ transaction_.state() == wsrep::transaction::s_must_abort)
+ {
+ // Rollback and cleanup will happen in after_command_before_result()
+ debug_log_state("before_statement_error");
+ return 1;
+ }
+ debug_log_state("before_statement: success");
+ return 0;
+}
+
+int wsrep::client_state::after_statement()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ debug_log_state("after_statement: enter");
+ assert(state() == s_exec);
+ assert(mode() == m_local);
+ (void)transaction_.after_statement(lock);
+ if (current_error() == wsrep::e_deadlock_error)
+ {
+ if (mode_ == m_local)
+ {
+ debug_log_state("after_statement: may_retry");
+ }
+ else
+ {
+ debug_log_state("after_statement: error");
+ }
+ return 1;
+ }
+ debug_log_state("after_statement: success");
+ return 0;
+}
+
+void wsrep::client_state::after_applying()
+{
+ assert(mode_ == m_high_priority);
+ transaction_.after_applying();
+}
+
+int wsrep::client_state::start_transaction(const wsrep::transaction_id& id)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(state_ == s_exec);
+ return transaction_.start_transaction(id);
+}
+
+int wsrep::client_state::assign_read_view(const wsrep::gtid* const gtid)
+{
+ assert(mode_ == m_local);
+ assert(state_ == s_exec);
+ return transaction_.assign_read_view(gtid);
+}
+
+int wsrep::client_state::append_key(const wsrep::key& key)
+{
+ assert(mode_ == m_local);
+ assert(state_ == s_exec);
+ return transaction_.append_key(key);
+}
+
+int wsrep::client_state::append_keys(const wsrep::key_array& keys)
+{
+ assert(mode_ == m_local || mode_ == m_toi);
+ assert(state_ == s_exec);
+    for (const auto& key : keys)
+    {
+        if (transaction_.append_key(key))
+ {
+ return 1;
+ }
+ }
+ return 0;
+}
+
+int wsrep::client_state::append_data(const wsrep::const_buffer& data)
+{
+ assert(mode_ == m_local);
+ assert(state_ == s_exec);
+ return transaction_.append_data(data);
+}
+
+int wsrep::client_state::after_row()
+{
+ assert(mode_ == m_local);
+ assert(state_ == s_exec);
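+    // Row counting is relevant only when streaming replication is
+    // enabled (nonzero fragment size).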
+ return (transaction_.streaming_context().fragment_size()
+ ? transaction_.after_row()
+ : 0);
+}
+
+void wsrep::client_state::fragment_applied(wsrep::seqno seqno)
+{
+ assert(mode_ == m_high_priority);
+ transaction_.fragment_applied(seqno);
+}
+
+int wsrep::client_state::prepare_for_ordering(const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta,
+ bool is_commit)
+{
+ assert(state_ == s_exec);
+ return transaction_.prepare_for_ordering(ws_handle, ws_meta, is_commit);
+}
+
+int wsrep::client_state::start_transaction(const wsrep::ws_handle& wsh,
+ const wsrep::ws_meta& meta)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(owning_thread_id_ == wsrep::this_thread::get_id());
+ assert(mode_ == m_high_priority);
+ return transaction_.start_transaction(wsh, meta);
+}
+
+int wsrep::client_state::next_fragment(const wsrep::ws_meta& meta)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(mode_ == m_high_priority);
+ return transaction_.next_fragment(meta);
+}
+
+int wsrep::client_state::before_prepare()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(owning_thread_id_ == wsrep::this_thread::get_id());
+ assert(state_ == s_exec);
+ return transaction_.before_prepare(lock);
+}
+
+int wsrep::client_state::after_prepare()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(owning_thread_id_ == wsrep::this_thread::get_id());
+ assert(state_ == s_exec);
+ return transaction_.after_prepare(lock);
+}
+
+int wsrep::client_state::before_commit()
+{
+ assert(owning_thread_id_ == wsrep::this_thread::get_id());
+ assert(state_ == s_exec || mode_ == m_local);
+ return transaction_.before_commit();
+}
+
+int wsrep::client_state::ordered_commit()
+{
+ assert(owning_thread_id_ == wsrep::this_thread::get_id());
+ assert(state_ == s_exec || mode_ == m_local);
+ return transaction_.ordered_commit();
+}
+
+int wsrep::client_state::after_commit()
+{
+ assert(owning_thread_id_ == wsrep::this_thread::get_id());
+ assert(state_ == s_exec || mode_ == m_local);
+ return transaction_.after_commit();
+}
+
+int wsrep::client_state::before_rollback()
+{
+ assert(owning_thread_id_ == wsrep::this_thread::get_id());
+ assert(state_ == s_idle || state_ == s_exec || state_ == s_result
+ || state_ == s_quitting);
+ return transaction_.before_rollback();
+}
+
+int wsrep::client_state::after_rollback()
+{
+ assert(owning_thread_id_ == wsrep::this_thread::get_id());
+ assert(state_ == s_idle || state_ == s_exec || state_ == s_result
+ || state_ == s_quitting);
+ return transaction_.after_rollback();
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// Rollbacker synchronization //
+//////////////////////////////////////////////////////////////////////////////
+
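+// Called by the background rollbacker thread once the rollback is
+// complete, to wake up threads blocked in
+// wait_rollback_complete_and_acquire_ownership().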
+void wsrep::client_state::sync_rollback_complete()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ debug_log_state("sync_rollback_complete: enter");
+ assert(state_ == s_idle && mode_ == m_local &&
+ transaction_.state() == wsrep::transaction::s_aborted);
+ set_rollbacker_active(false);
+ cond_.notify_all();
+ debug_log_state("sync_rollback_complete: leave");
+}
+
+void wsrep::client_state::wait_rollback_complete_and_acquire_ownership()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ debug_log_state("wait_rollback_complete_and_acquire_ownership: enter");
+ if (state_ == s_idle)
+ {
+ do_wait_rollback_complete_and_acquire_ownership(lock);
+ }
+ assert(state_ == s_exec);
+ debug_log_state("wait_rollback_complete_and_acquire_ownership: leave");
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// Streaming //
+//////////////////////////////////////////////////////////////////////////////
+
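+// Adjust the fragment unit and size of the streaming context without
+// changing whether streaming is enabled.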
+void wsrep::client_state::streaming_params(
+ enum wsrep::streaming_context::fragment_unit fragment_unit,
+ size_t fragment_size)
+{
+ assert(mode_ == m_local);
+ transaction_.streaming_context().params(fragment_unit, fragment_size);
+}
+
+int wsrep::client_state::enable_streaming(
+ enum wsrep::streaming_context::fragment_unit
+ fragment_unit,
+ size_t fragment_size)
+{
+ assert(mode_ == m_local);
+ if (transaction_.is_streaming() &&
+ transaction_.streaming_context().fragment_unit() !=
+ fragment_unit)
+ {
+ wsrep::log_error()
+ << "Changing fragment unit for active streaming transaction "
+ << "not allowed";
+ return 1;
+ }
+ transaction_.streaming_context().enable(fragment_unit, fragment_size);
+ return 0;
+}
+
+void wsrep::client_state::disable_streaming()
+{
+ assert(mode_ == m_local);
+ assert(state_ == s_exec || state_ == s_quitting);
+ transaction_.streaming_context().disable();
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// XA //
+//////////////////////////////////////////////////////////////////////////////
+
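+// Detach a prepared XA transaction from this client so that it can
+// later be committed or rolled back from another connection.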
+void wsrep::client_state::xa_detach()
+{
+ assert(mode_ == m_local);
+ assert(state_ == s_none || state_ == s_exec || state_ == s_quitting);
+ transaction_.xa_detach();
+}
+
+void wsrep::client_state::xa_replay()
+{
+ assert(mode_ == m_local);
+ assert(state_ == s_idle);
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ transaction_.xa_replay(lock);
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// BF //
+//////////////////////////////////////////////////////////////////////////////
+
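+// Brute force (BF) abort: called when a high priority transaction
+// needs to abort a conflicting transaction owned by this client.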
+int wsrep::client_state::bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
+ wsrep::seqno bf_seqno)
+{
+ assert(lock.owns_lock());
+ assert(mode_ == m_local || transaction_.is_streaming());
+ auto ret = transaction_.bf_abort(lock, bf_seqno);
+ assert(lock.owns_lock());
+ return ret;
+}
+
+int wsrep::client_state::bf_abort(wsrep::seqno bf_seqno)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ return bf_abort(lock, bf_seqno);
+}
+
+int wsrep::client_state::total_order_bf_abort(
+ wsrep::unique_lock<wsrep::mutex>& lock, wsrep::seqno bf_seqno)
+{
+ assert(lock.owns_lock());
+ assert(mode_ == m_local || transaction_.is_streaming());
+ auto ret = transaction_.total_order_bf_abort(lock, bf_seqno);
+ assert(lock.owns_lock());
+ return ret;
+}
+
+int wsrep::client_state::total_order_bf_abort(wsrep::seqno bf_seqno)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ return total_order_bf_abort(lock, bf_seqno);
+}
+
+void wsrep::client_state::adopt_transaction(
+ const wsrep::transaction& transaction)
+{
+ assert(mode_ == m_high_priority);
+ transaction_.adopt(transaction);
+}
+
+void wsrep::client_state::adopt_apply_error(wsrep::mutable_buffer& err)
+{
+ assert(mode_ == m_high_priority);
+ transaction_.adopt_apply_error(err);
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// TOI //
+//////////////////////////////////////////////////////////////////////////////
+
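+// Poll enter_toi() until it succeeds, the deadline given in
+// wait_until expires, or the client is interrupted. Retries are
+// done only for certification and connection failures.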
+enum wsrep::provider::status
+wsrep::client_state::poll_enter_toi(
+ wsrep::unique_lock<wsrep::mutex>& lock,
+ const wsrep::key_array& keys,
+ const wsrep::const_buffer& buffer,
+ wsrep::ws_meta& meta,
+ int flags,
+ std::chrono::time_point<wsrep::clock> wait_until,
+ bool& timed_out)
+{
+ WSREP_LOG_DEBUG(debug_log_level(),
+ wsrep::log::debug_level_client_state,
+ "poll_enter_toi: "
+ << flags
+ << ","
+ << wait_until.time_since_epoch().count());
+ enum wsrep::provider::status status;
+ timed_out = false;
+    // Temporary meta for polling; enter_toi() may clear the meta
+    // argument on errors.
+    wsrep::ws_meta poll_meta;
+ do
+ {
+ lock.unlock();
+ poll_meta = meta;
+ status = provider().enter_toi(id_, keys, buffer, poll_meta, flags);
+ if (status != wsrep::provider::success &&
+ not poll_meta.gtid().is_undefined())
+ {
+ // Successfully entered TOI, but the provider reported failure.
+ // This may happen for example if certification fails.
+ // Leave TOI before proceeding.
+ if (provider().leave_toi(id_, wsrep::mutable_buffer()))
+ {
+ wsrep::log_warning()
+ << "Failed to leave TOI after failure in "
+ << "poll_enter_toi()";
+ }
+ poll_meta = wsrep::ws_meta();
+ }
+ if (status == wsrep::provider::error_certification_failed ||
+ status == wsrep::provider::error_connection_failed)
+ {
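+            // Back off for 0.3 seconds before retrying.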
+ ::usleep(300000);
+ }
+ lock.lock();
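+        // An unset (zero) wait_until is treated as an already expired
+        // deadline, so the loop will not retry in that case.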
+ timed_out = !(wait_until.time_since_epoch().count() &&
+ wsrep::clock::now() < wait_until);
+ }
+ while ((status == wsrep::provider::error_certification_failed ||
+ status == wsrep::provider::error_connection_failed) &&
+ not timed_out &&
+ not client_service_.interrupted(lock));
+ meta = poll_meta;
+ return status;
+}
+
+void wsrep::client_state::enter_toi_common(
+ wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ assert(lock.owns_lock());
+ toi_mode_ = mode_;
+ mode(lock, m_toi);
+}
+
+int wsrep::client_state::enter_toi_local(const wsrep::key_array& keys,
+ const wsrep::const_buffer& buffer,
+ std::chrono::time_point<wsrep::clock> wait_until)
+{
+ debug_log_state("enter_toi_local: enter");
+ assert(state_ == s_exec);
+ assert(mode_ == m_local);
+ int ret;
+
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+
+ bool timed_out;
+ auto const status(poll_enter_toi(
+ lock, keys, buffer,
+ toi_meta_,
+ wsrep::provider::flag::start_transaction |
+ wsrep::provider::flag::commit,
+ wait_until,
+ timed_out));
+ switch (status)
+ {
+ case wsrep::provider::success:
+ {
+ enter_toi_common(lock);
+ ret = 0;
+ break;
+ }
+ case wsrep::provider::error_certification_failed:
+ override_error(e_deadlock_error, status);
+ ret = 1;
+ break;
+ default:
+ if (timed_out) {
+ override_error(e_timeout_error);
+ } else {
+ override_error(e_error_during_commit, status);
+ }
+ ret = 1;
+ break;
+ }
+
+ debug_log_state("enter_toi_local: leave");
+ return ret;
+}
+
+void wsrep::client_state::enter_toi_mode(const wsrep::ws_meta& ws_meta)
+{
+ debug_log_state("enter_toi_mode: enter");
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(mode_ == m_high_priority);
+ enter_toi_common(lock);
+ toi_meta_ = ws_meta;
+ debug_log_state("enter_toi_mode: leave");
+}
+
+void wsrep::client_state::leave_toi_common()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ mode(lock, toi_mode_);
+ toi_mode_ = m_undefined;
+ if (toi_meta_.gtid().is_undefined() == false)
+ {
+ update_last_written_gtid(toi_meta_.gtid());
+ }
+ toi_meta_ = wsrep::ws_meta();
+}
+
+int wsrep::client_state::leave_toi_local(const wsrep::mutable_buffer& err)
+{
+ debug_log_state("leave_toi_local: enter");
+ assert(toi_mode_ == m_local);
+ leave_toi_common();
+
+ debug_log_state("leave_toi_local: leave");
+ return (provider().leave_toi(id_, err) == provider::success ? 0 : 1);
+}
+
+void wsrep::client_state::leave_toi_mode()
+{
+ debug_log_state("leave_toi_mode: enter");
+ assert(toi_mode_ == m_high_priority);
+ leave_toi_common();
+ debug_log_state("leave_toi_mode: leave");
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// RSU //
+///////////////////////////////////////////////////////////////////////////////
+
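+// Rolling schema upgrade (RSU): desync the node from the group, wait
+// for ongoing commits to finish, and pause the provider before
+// switching to RSU mode.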
+int wsrep::client_state::begin_rsu(int timeout)
+{
+ if (server_state_.desync())
+ {
+ wsrep::log_warning() << "Failed to desync server";
+ return 1;
+ }
+ if (server_state_.server_service().wait_committing_transactions(timeout))
+ {
+ wsrep::log_warning() << "RSU failed due to pending transactions";
+ server_state_.resync();
+ return 1;
+ }
+ wsrep::seqno pause_seqno(server_state_.pause());
+ if (pause_seqno.is_undefined())
+ {
+ wsrep::log_warning() << "Failed to pause provider";
+ server_state_.resync();
+ return 1;
+ }
+ wsrep::log_info() << "Provider paused at: " << pause_seqno;
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ toi_mode_ = mode_;
+ mode(lock, m_rsu);
+ return 0;
+}
+
+int wsrep::client_state::end_rsu()
+{
+ int ret(0);
+ try
+ {
+ server_state_.resume();
+ server_state_.resync();
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ wsrep::log_warning() << "End RSU failed: " << e.what();
+ ret = 1;
+ }
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ mode(lock, toi_mode_);
+ toi_mode_ = m_undefined;
+ return ret;
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// NBO //
+///////////////////////////////////////////////////////////////////////////////
+
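+// Non-blocking operation (NBO): phase one replicates the begin event
+// under TOI, the operation itself then runs outside of total order,
+// and phase two (see begin_nbo_phase_two()) re-enters TOI so that
+// the operation can be ended in total order.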
+int wsrep::client_state::begin_nbo_phase_one(
+ const wsrep::key_array& keys,
+ const wsrep::const_buffer& buffer,
+ std::chrono::time_point<wsrep::clock> wait_until)
+{
+ debug_log_state("begin_nbo_phase_one: enter");
+ debug_log_keys(keys);
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(state_ == s_exec);
+ assert(mode_ == m_local);
+ assert(toi_mode_ == m_undefined);
+
+ int ret;
+ bool timed_out;
+ auto const status(poll_enter_toi(
+ lock, keys, buffer,
+ toi_meta_,
+ wsrep::provider::flag::start_transaction,
+ wait_until,
+ timed_out));
+ switch (status)
+ {
+ case wsrep::provider::success:
+ toi_mode_ = mode_;
+ mode(lock, m_nbo);
+ ret= 0;
+ break;
+ case wsrep::provider::error_certification_failed:
+ override_error(e_deadlock_error, status);
+ ret = 1;
+ break;
+ default:
+ if (timed_out) {
+ override_error(e_timeout_error);
+ } else {
+ override_error(e_error_during_commit, status);
+ }
+ ret = 1;
+ break;
+ }
+
+ debug_log_state("begin_nbo_phase_one: leave");
+ return ret;
+}
+
+int wsrep::client_state::end_nbo_phase_one(const wsrep::mutable_buffer& err)
+{
+ debug_log_state("end_nbo_phase_one: enter");
+ assert(state_ == s_exec);
+ assert(mode_ == m_nbo);
+ assert(in_toi());
+
+ enum wsrep::provider::status status(provider().leave_toi(id_, err));
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ int ret;
+ switch (status)
+ {
+ case wsrep::provider::success:
+ ret = 0;
+ break;
+ default:
+ override_error(e_error_during_commit, status);
+ ret = 1;
+ break;
+ }
+ nbo_meta_ = toi_meta_;
+ toi_meta_ = wsrep::ws_meta();
+ toi_mode_ = m_undefined;
+ debug_log_state("end_nbo_phase_one: leave");
+ return ret;
+}
+
+int wsrep::client_state::enter_nbo_mode(const wsrep::ws_meta& ws_meta)
+{
+ assert(state_ == s_exec);
+ assert(mode_ == m_local);
+ assert(toi_mode_ == m_undefined);
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ nbo_meta_ = ws_meta;
+ mode(lock, m_nbo);
+ return 0;
+}
+
+int wsrep::client_state::begin_nbo_phase_two(
+ const wsrep::key_array& keys,
+ std::chrono::time_point<wsrep::clock> wait_until)
+{
+ debug_log_state("begin_nbo_phase_two: enter");
+ debug_log_keys(keys);
+ assert(state_ == s_exec);
+ assert(mode_ == m_nbo);
+ assert(toi_mode_ == m_undefined);
+ assert(!in_toi());
+
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+    // Note: nbo_meta_ is passed to enter_toi() as it is an input
+    // param containing the GTID of the NBO begin. The output stored
+    // in nbo_meta_ is copied to toi_meta_ for the phase two end.
+ bool timed_out;
+ enum wsrep::provider::status status(
+ poll_enter_toi(lock, keys,
+ wsrep::const_buffer(),
+ nbo_meta_,
+ wsrep::provider::flag::commit,
+ wait_until,
+ timed_out));
+ int ret;
+ switch (status)
+ {
+ case wsrep::provider::success:
+ ret= 0;
+ toi_meta_ = nbo_meta_;
+ toi_mode_ = m_local;
+ break;
+ case wsrep::provider::error_provider_failed:
+ override_error(e_interrupted_error, status);
+ ret= 1;
+ break;
+ default:
+ if (timed_out)
+ {
+ override_error(e_timeout_error, status);
+ }
+ else
+ {
+ override_error(e_error_during_commit, status);
+ }
+ ret= 1;
+ break;
+ }
+
+    // Failed to grab TOI for completing the NBO in order. This means
+    // that the operation cannot be ended in total order, so we end
+    // the NBO mode and let the DBMS deal with the error.
+ if (ret)
+ {
+ mode(lock, m_local);
+ nbo_meta_ = wsrep::ws_meta();
+ }
+
+ debug_log_state("begin_nbo_phase_two: leave");
+ return ret;
+}
+
+int wsrep::client_state::end_nbo_phase_two(const wsrep::mutable_buffer& err)
+{
+ debug_log_state("end_nbo_phase_two: enter");
+ assert(state_ == s_exec);
+ assert(mode_ == m_nbo);
+ assert(toi_mode_ == m_local);
+ assert(in_toi());
+ enum wsrep::provider::status status(
+ provider().leave_toi(id_, err));
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ int ret;
+ switch (status)
+ {
+ case wsrep::provider::success:
+ ret = 0;
+ break;
+ default:
+ override_error(e_error_during_commit, status);
+ ret = 1;
+ break;
+ }
+ toi_meta_ = wsrep::ws_meta();
+ toi_mode_ = m_undefined;
+ nbo_meta_ = wsrep::ws_meta();
+ mode(lock, m_local);
+ debug_log_state("end_nbo_phase_two: leave");
+ return ret;
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// Misc //
+///////////////////////////////////////////////////////////////////////////////
+
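+// Wait until this node has applied all transactions committed in the
+// cluster before the call (causal read), up to the given timeout.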
+int wsrep::client_state::sync_wait(int timeout)
+{
+ std::pair<wsrep::gtid, enum wsrep::provider::status> result(
+ server_state_.causal_read(timeout));
+ int ret(1);
+ switch (result.second)
+ {
+ case wsrep::provider::success:
+ sync_wait_gtid_ = result.first;
+ ret = 0;
+ break;
+ case wsrep::provider::error_not_implemented:
+ override_error(wsrep::e_not_supported_error);
+ break;
+ default:
+ override_error(wsrep::e_timeout_error);
+ break;
+ }
+ return ret;
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// Private //
+///////////////////////////////////////////////////////////////////////////////
+
+void wsrep::client_state::do_acquire_ownership(
+ wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED)
+{
+ assert(lock.owns_lock());
+    // Be strict about client state for clients in local mode. The
+    // owning_thread_id_ is used to detect bugs which are caused by
+    // more than one thread operating on the client state at a time,
+    // for example the thread handling the client session and the
+    // background rollbacker.
+ assert(state_ == s_idle || mode_ != m_local);
+ owning_thread_id_ = wsrep::this_thread::get_id();
+}
+
+void wsrep::client_state::do_wait_rollback_complete_and_acquire_ownership(
+ wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ assert(lock.owns_lock());
+ assert(state_ == s_idle);
+ while (is_rollbacker_active())
+ {
+ cond_.wait(lock);
+ }
+ do_acquire_ownership(lock);
+ state(lock, s_exec);
+}
+
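+// The last written GTID may only advance monotonically within the
+// same cluster id; the assertion below enforces this.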
+void wsrep::client_state::update_last_written_gtid(const wsrep::gtid& gtid)
+{
+ assert(last_written_gtid_.is_undefined() ||
+ (last_written_gtid_.id() == gtid.id() &&
+ !(last_written_gtid_.seqno() > gtid.seqno())));
+ last_written_gtid_ = gtid;
+}
+
+void wsrep::client_state::debug_log_state(const char* context) const
+{
+ WSREP_LOG_DEBUG(debug_log_level(),
+ wsrep::log::debug_level_client_state,
+ context
+ << "(" << id_.get()
+ << "," << to_c_string(state_)
+ << "," << to_c_string(mode_)
+ << "," << wsrep::to_string(current_error_)
+ << "," << current_error_status_
+ << ",toi: " << toi_meta_.seqno()
+ << ",nbo: " << nbo_meta_.seqno() << ")");
+}
+
+void wsrep::client_state::debug_log_keys(const wsrep::key_array& keys) const
+{
+ for (size_t i(0); i < keys.size(); ++i)
+ {
+ WSREP_LOG_DEBUG(debug_log_level(),
+ wsrep::log::debug_level_client_state,
+ "TOI keys: "
+ << " id: " << id_
+ << "key: " << keys[i]);
+ }
+}
+
+void wsrep::client_state::state(
+ wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
+ enum wsrep::client_state::state state)
+{
+    // Verify that the current thread has gained control of the
+    // connection by calling before_command().
+ assert(wsrep::this_thread::get_id() == owning_thread_id_);
+ assert(lock.owns_lock());
+ static const char allowed[state_max_][state_max_] =
+ {
+ /* none idle exec result quit */
+ { 0, 1, 0, 0, 0}, /* none */
+ { 0, 0, 1, 0, 1}, /* idle */
+ { 0, 0, 0, 1, 0}, /* exec */
+ { 0, 1, 0, 0, 0}, /* result */
+ { 1, 0, 0, 0, 0} /* quit */
+ };
+ if (!allowed[state_][state])
+ {
+ wsrep::log_warning() << "client_state: Unallowed state transition: "
+ << state_ << " -> " << state;
+ assert(0);
+ }
+ state_hist_.push_back(state_);
+ state_ = state;
+ if (state_hist_.size() > 10)
+ {
+ state_hist_.erase(state_hist_.begin());
+ }
+}
+
+void wsrep::client_state::mode(
+ wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
+ enum mode mode)
+{
+ assert(lock.owns_lock());
+
+ static const char allowed[n_modes_][n_modes_] =
+ { /* u l h t r n */
+ { 0, 0, 0, 0, 0, 0 }, /* undefined */
+ { 0, 0, 1, 1, 1, 1 }, /* local */
+ { 0, 1, 0, 1, 0, 1 }, /* high prio */
+ { 0, 1, 1, 0, 0, 0 }, /* toi */
+ { 0, 1, 0, 0, 0, 0 }, /* rsu */
+ { 0, 1, 1, 0, 0, 0 } /* nbo */
+ };
+ if (!allowed[mode_][mode])
+ {
+ wsrep::log_warning() << "client_state: Unallowed mode transition: "
+ << mode_ << " -> " << mode;
+ assert(0);
+ }
+ mode_ = mode;
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// High Priority Context //
+///////////////////////////////////////////////////////////////////////////////
+
+wsrep::high_priority_context::high_priority_context(wsrep::client_state& client)
+ : client_(client)
+ , orig_mode_(client.mode_)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(client.mutex_);
+ client.mode(lock, wsrep::client_state::m_high_priority);
+}
+
+wsrep::high_priority_context::~high_priority_context()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(client_.mutex_);
+ assert(client_.mode() == wsrep::client_state::m_high_priority);
+ client_.mode(lock, orig_mode_);
+}