diff options
Diffstat (limited to '')
-rw-r--r-- | wsrep-lib/src/transaction.cpp | 2171 |
1 files changed, 2171 insertions, 0 deletions
diff --git a/wsrep-lib/src/transaction.cpp b/wsrep-lib/src/transaction.cpp new file mode 100644 index 00000000..451e94dd --- /dev/null +++ b/wsrep-lib/src/transaction.cpp @@ -0,0 +1,2171 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "wsrep/transaction.hpp" +#include "wsrep/client_state.hpp" +#include "wsrep/server_state.hpp" +#include "wsrep/storage_service.hpp" +#include "wsrep/high_priority_service.hpp" +#include "wsrep/key.hpp" +#include "wsrep/logger.hpp" +#include "wsrep/compiler.hpp" +#include "wsrep/server_service.hpp" +#include "wsrep/client_service.hpp" + +#include <cassert> +#include <sstream> +#include <memory> + +namespace +{ + class storage_service_deleter + { + public: + storage_service_deleter(wsrep::server_service& server_service) + : server_service_(server_service) + { } + void operator()(wsrep::storage_service* storage_service) + { + server_service_.release_storage_service(storage_service); + } + private: + wsrep::server_service& server_service_; + }; + + template <class D> + class scoped_storage_service + { + public: + scoped_storage_service(wsrep::client_service& client_service, + wsrep::storage_service* storage_service, + D deleter) + : client_service_(client_service) + , storage_service_(storage_service) + , deleter_(deleter) + { + if (storage_service_ == 0) + { + throw wsrep::runtime_error("Null client_state provided"); + } + client_service_.reset_globals(); + storage_service_->store_globals(); + } + + wsrep::storage_service& storage_service() + { + return *storage_service_; + } + + ~scoped_storage_service() + { + deleter_(storage_service_); + client_service_.store_globals(); + } + private: + scoped_storage_service(const scoped_storage_service&); + scoped_storage_service& operator=(const scoped_storage_service&); + wsrep::client_service& client_service_; + wsrep::storage_service* storage_service_; + D deleter_; + }; +} + +// Public + +wsrep::transaction::transaction( + wsrep::client_state& client_state) + : server_service_(client_state.server_state().server_service()) + , client_service_(client_state.client_service()) + , client_state_(client_state) + , server_id_() + , id_(transaction_id::undefined()) + , state_(s_executing) + , state_hist_() + , bf_abort_state_(s_executing) + , bf_abort_provider_status_() + , bf_abort_client_state_() + , bf_aborted_in_total_order_() + , ws_handle_() + , ws_meta_() + , flags_() + , implicit_deps_(false) + , certified_(false) + , fragments_certified_for_statement_() + , streaming_context_() + , sr_keys_() + , apply_error_buf_() + , xid_() + , streaming_rollback_in_progress_(false) + , is_bf_immutable_(false) +{ } + + +wsrep::transaction::~transaction() +{ +} + +int wsrep::transaction::start_transaction( + const wsrep::transaction_id& id) +{ + debug_log_state("start_transaction enter"); + assert(active() == false); + assert(is_xa() == false); + assert(flags() == 0); + server_id_ = client_state_.server_state().id(); + id_ = id; + state_ = s_executing; + state_hist_.clear(); + ws_handle_ = wsrep::ws_handle(id); + flags(wsrep::provider::flag::start_transaction); + switch (client_state_.mode()) + { + case wsrep::client_state::m_high_priority: + debug_log_state("start_transaction success"); + return 0; + case wsrep::client_state::m_local: + debug_log_state("start_transaction success"); + return provider().start_transaction(ws_handle_); + default: + debug_log_state("start_transaction error"); + assert(0); + return 1; + } +} + +int wsrep::transaction::start_transaction( + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) +{ + debug_log_state("start_transaction enter"); + if (state() != s_replaying) + { + // assert(ws_meta.flags()); + assert(active() == false); + assert(flags() == 0); + server_id_ = ws_meta.server_id(); + id_ = ws_meta.transaction_id(); + assert(client_state_.mode() == wsrep::client_state::m_high_priority); + state_ = s_executing; + state_hist_.clear(); + ws_handle_ = ws_handle; + ws_meta_ = ws_meta; + flags(wsrep::provider::flag::start_transaction); + certified_ = true; + } + else + { + ws_meta_ = ws_meta; + assert(ws_meta_.flags() & wsrep::provider::flag::commit); + assert(active()); + assert(client_state_.mode() == wsrep::client_state::m_high_priority); + assert(state() == s_replaying); + assert(ws_meta_.seqno().is_undefined() == false); + certified_ = true; + } + debug_log_state("start_transaction leave"); + return 0; +} + +int wsrep::transaction::next_fragment( + const wsrep::ws_meta& ws_meta) +{ + debug_log_state("next_fragment enter"); + ws_meta_ = ws_meta; + debug_log_state("next_fragment leave"); + return 0; +} + +void wsrep::transaction::adopt(const wsrep::transaction& transaction) +{ + debug_log_state("adopt enter"); + assert(transaction.is_streaming()); + start_transaction(transaction.id()); + server_id_ = transaction.server_id_; + flags_ = transaction.flags(); + streaming_context_ = transaction.streaming_context(); + debug_log_state("adopt leave"); +} + +void wsrep::transaction::fragment_applied(wsrep::seqno seqno) +{ + assert(active()); + streaming_context_.applied(seqno); +} + +int wsrep::transaction::prepare_for_ordering( + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + bool is_commit) +{ + assert(active()); + + if (state_ != s_replaying) + { + ws_handle_ = ws_handle; + ws_meta_ = ws_meta; + certified_ = is_commit; + } + return 0; +} + +int wsrep::transaction::assign_read_view(const wsrep::gtid* const gtid) +{ + try + { + return provider().assign_read_view(ws_handle_, gtid); + } + catch (...) + { + wsrep::log_error() << "Failed to assign read view"; + return 1; + } +} + +int wsrep::transaction::append_key(const wsrep::key& key) +{ + assert(active()); + try + { + debug_log_key_append(key); + sr_keys_.insert(key); + return provider().append_key(ws_handle_, key); + } + catch (...) + { + wsrep::log_error() << "Failed to append key"; + return 1; + } +} + +int wsrep::transaction::append_data(const wsrep::const_buffer& data) +{ + assert(active()); + return provider().append_data(ws_handle_, data); +} + +int wsrep::transaction::after_row() +{ + wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex()); + debug_log_state("after_row_enter"); + int ret(0); + if (streaming_context_.fragment_size() && + streaming_context_.fragment_unit() != streaming_context::statement) + { + ret = streaming_step(lock); + } + debug_log_state("after_row_leave"); + return ret; +} + +int wsrep::transaction::before_prepare( + wsrep::unique_lock<wsrep::mutex>& lock) +{ + assert(lock.owns_lock()); + int ret(0); + debug_log_state("before_prepare_enter"); + assert(state() == s_executing || state() == s_must_abort || + state() == s_replaying); + + if (state() == s_must_abort) + { + assert(client_state_.mode() == wsrep::client_state::m_local); + client_state_.override_error(wsrep::e_deadlock_error); + return 1; + } + + switch (client_state_.mode()) + { + case wsrep::client_state::m_local: + if (is_streaming()) + { + client_service_.debug_crash( + "crash_last_fragment_commit_before_fragment_removal"); + lock.unlock(); + if (client_service_.statement_allowed_for_streaming() == false) + { + client_state_.override_error( + wsrep::e_error_during_commit, + wsrep::provider::error_not_allowed); + ret = 1; + } + else if (!is_xa()) + { + // Note: we can't remove fragments here for XA, + // the transaction has already issued XA END and + // is in IDLE state, no more changes allowed! + ret = client_service_.remove_fragments(); + if (ret) + { + client_state_.override_error(wsrep::e_deadlock_error); + } + } + lock.lock(); + client_service_.debug_crash( + "crash_last_fragment_commit_after_fragment_removal"); + if (state() == s_must_abort) + { + client_state_.override_error(wsrep::e_deadlock_error); + ret = 1; + } + } + + if (ret == 0) + { + if (is_xa()) + { + // Force fragment replication on XA prepare + flags(flags() | wsrep::provider::flag::prepare); + pa_unsafe(true); + append_sr_keys_for_commit(); + const bool force_streaming_step = true; + ret = streaming_step(lock, force_streaming_step); + if (ret == 0) + { + assert(state() == s_executing); + state(lock, s_preparing); + } + } + else + { + ret = certify_commit(lock); + } + + assert((ret == 0 && state() == s_preparing) || + (state() == s_must_abort || + state() == s_must_replay || + state() == s_cert_failed)); + + if (ret) + { + assert(state() == s_must_replay || + client_state_.current_error()); + ret = 1; + } + } + + break; + case wsrep::client_state::m_high_priority: + // Note: fragment removal is done from applying + // context for high priority mode. + if (is_xa()) + { + assert(state() == s_executing || + state() == s_replaying); + if (state() == s_replaying) + { + break; + } + } + state(lock, s_preparing); + break; + default: + assert(0); + break; + } + + assert(state() == s_preparing || + (is_xa() && state() == s_replaying) || + (ret && (state() == s_must_abort || + state() == s_must_replay || + state() == s_cert_failed || + state() == s_aborted))); + debug_log_state("before_prepare_leave"); + return ret; +} + +int wsrep::transaction::after_prepare( + wsrep::unique_lock<wsrep::mutex>& lock) +{ + assert(lock.owns_lock()); + + int ret = 0; + debug_log_state("after_prepare_enter"); + if (is_xa()) + { + switch (state()) + { + case s_preparing: + assert(client_state_.mode() == wsrep::client_state::m_local || + (certified() && ordered())); + state(lock, s_prepared); + break; + case s_must_abort: + // prepared locally, but client has not received + // a result yet. We can still abort. + assert(client_state_.mode() == wsrep::client_state::m_local); + client_state_.override_error(wsrep::e_deadlock_error); + ret = 1; + break; + case s_replaying: + assert(client_state_.mode() == wsrep::client_state::m_high_priority); + break; + default: + assert(0); + } + } + else + { + assert(certified() && ordered()); + assert(state() == s_preparing || state() == s_must_abort); + + if (state() == s_must_abort) + { + assert(client_state_.mode() == wsrep::client_state::m_local); + state(lock, s_must_replay); + ret = 1; + } + else + { + state(lock, s_committing); + } + } + debug_log_state("after_prepare_leave"); + return ret; +} + +int wsrep::transaction::before_commit() +{ + int ret(1); + + wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex()); + debug_log_state("before_commit_enter"); + assert(client_state_.mode() != wsrep::client_state::m_toi); + assert(state() == s_executing || + state() == s_prepared || + state() == s_committing || + state() == s_must_abort || + state() == s_replaying); + assert((state() != s_committing && state() != s_replaying) || + certified()); + + switch (client_state_.mode()) + { + case wsrep::client_state::m_local: + if (state() == s_executing) + { + ret = before_prepare(lock) || after_prepare(lock); + assert((ret == 0 && + (state() == s_committing || state() == s_prepared)) + || + (state() == s_must_abort || + state() == s_must_replay || + state() == s_cert_failed || + state() == s_aborted)); + } + else if (state() != s_committing && state() != s_prepared) + { + assert(state() == s_must_abort); + if (certified() || + (is_xa() && is_streaming())) + { + state(lock, s_must_replay); + } + else + { + client_state_.override_error(wsrep::e_deadlock_error); + } + } + else + { + // 2PC commit, prepare was done before + ret = 0; + } + + if (ret == 0 && state() == s_prepared) + { + ret = certify_commit(lock); + assert((ret == 0 && state() == s_committing) || + (state() == s_must_abort || + state() == s_must_replay || + state() == s_cert_failed || + state() == s_prepared)); + } + + if (ret == 0) + { + assert(certified()); + assert(ordered()); + lock.unlock(); + client_service_.debug_sync("wsrep_before_commit_order_enter"); + enum wsrep::provider::status + status(provider().commit_order_enter(ws_handle_, ws_meta_)); + lock.lock(); + switch (status) + { + case wsrep::provider::success: + break; + case wsrep::provider::error_bf_abort: + if (state() != s_must_abort) + { + state(lock, s_must_abort); + } + state(lock, s_must_replay); + ret = 1; + break; + default: + ret = 1; + assert(0); + break; + } + } + break; + case wsrep::client_state::m_high_priority: + assert(certified()); + assert(ordered()); + if (is_xa()) + { + assert(state() == s_prepared || + state() == s_replaying); + state(lock, s_committing); + ret = 0; + } + else if (state() == s_executing || state() == s_replaying) + { + ret = before_prepare(lock) || after_prepare(lock); + } + else + { + ret = 0; + } + lock.unlock(); + ret = ret || provider().commit_order_enter(ws_handle_, ws_meta_); + lock.lock(); + if (ret) + { + state(lock, s_must_abort); + state(lock, s_aborting); + } + break; + default: + assert(0); + break; + } + + if (ret == 0 && state() == s_committing) + { + is_bf_immutable_ = true; + } + + debug_log_state("before_commit_leave"); + return ret; +} + +int wsrep::transaction::ordered_commit() +{ + wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex()); + debug_log_state("ordered_commit_enter"); + assert(state() == s_committing); + assert(is_bf_immutable_); + assert(ordered()); + client_service_.debug_sync("wsrep_before_commit_order_leave"); + int ret(provider().commit_order_leave(ws_handle_, ws_meta_, + apply_error_buf_)); + client_service_.debug_sync("wsrep_after_commit_order_leave"); + // Should always succeed: + // 1) If before commit before succeeds, the transaction handle + // in the provider is guaranteed to exist and the commit + // has been ordered + // 2) The transaction which has been ordered for commit cannot be BF + // aborted anymore + // 3) The provider should always guarantee that the transactions which + // have been ordered for commit can finish committing. + // + // The exception here is a storage service transaction which is running + // in high priority mode. The fragment storage commit may get BF + // aborted in the provider after commit ordering has been + // established since the transaction is operating in streaming + // mode. + if (ret) + { + assert(client_state_.mode() == wsrep::client_state::m_high_priority); + state(lock, s_must_abort); + state(lock, s_aborting); + } + else + { + state(lock, s_ordered_commit); + } + debug_log_state("ordered_commit_leave"); + return ret; +} + +int wsrep::transaction::after_commit() +{ + int ret(0); + + wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex()); + assert(is_bf_immutable_); + debug_log_state("after_commit_enter"); + assert(state() == s_ordered_commit); + + if (is_streaming()) + { + assert(client_state_.mode() == wsrep::client_state::m_local || + client_state_.mode() == wsrep::client_state::m_high_priority); + + if (is_xa()) + { + // XA fragment removal happens here, + // see comment in before_prepare + remove_fragments_in_storage_service_scope(lock); + } + + if (client_state_.mode() == wsrep::client_state::m_local) + { + lock.unlock(); + client_state_.server_state_.stop_streaming_client(&client_state_); + lock.lock(); + } + streaming_context_.cleanup(); + } + + switch (client_state_.mode()) + { + case wsrep::client_state::m_local: + ret = provider().release(ws_handle_); + break; + case wsrep::client_state::m_high_priority: + break; + default: + assert(0); + break; + } + assert(ret == 0); + state(lock, s_committed); + debug_log_state("after_commit_leave"); + return ret; +} + +int wsrep::transaction::before_rollback() +{ + wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex()); + debug_log_state("before_rollback_enter"); + assert(state() == s_executing || + state() == s_preparing || + state() == s_prepared || + state() == s_must_abort || + // Background rollbacker or rollback initiated from SE + state() == s_aborting || + state() == s_cert_failed || + state() == s_must_replay); + + switch (client_state_.mode()) + { + case wsrep::client_state::m_local: + if (is_streaming()) + { + client_service_.debug_sync("wsrep_before_SR_rollback"); + } + switch (state()) + { + case s_preparing: + // Error detected during prepare phase + state(lock, s_must_abort); + WSREP_FALLTHROUGH; + case s_prepared: + WSREP_FALLTHROUGH; + case s_executing: + // Voluntary rollback + if (is_streaming()) + { + streaming_rollback(lock); + } + state(lock, s_aborting); + break; + case s_must_abort: + if (certified()) + { + state(lock, s_must_replay); + } + else + { + if (is_streaming()) + { + streaming_rollback(lock); + } + state(lock, s_aborting); + } + break; + case s_cert_failed: + if (is_streaming()) + { + streaming_rollback(lock); + } + state(lock, s_aborting); + break; + case s_aborting: + if (is_streaming()) + { + streaming_rollback(lock); + } + break; + case s_must_replay: + break; + default: + assert(0); + break; + } + break; + case wsrep::client_state::m_high_priority: + // Rollback by rollback write set or BF abort + assert(state_ == s_executing || state_ == s_prepared || state_ == s_aborting); + if (state_ != s_aborting) + { + state(lock, s_aborting); + } + break; + default: + assert(0); + break; + } + + debug_log_state("before_rollback_leave"); + return 0; +} + +int wsrep::transaction::after_rollback() +{ + wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex()); + debug_log_state("after_rollback_enter"); + assert(state() == s_aborting || state() == s_must_replay); + + // Note that it would be technically more correct to + // remove fragments after TOI BF abort in before_rollback(), + // it seems to cause deadlocks and is done here instead. + // We assume that the application does not let the TOI + // to proceed until this method returns, e.g. by holding + // MDL locks. It is not clear how to enforce that though. + if (is_streaming() && bf_aborted_in_total_order_) + { + remove_fragments_in_storage_service_scope(lock); + } + + if (state() == s_aborting) + { + if (is_streaming()) + { + // We skip streaming context cleanup for replay because + // we want to remember if the transaction was streaming. + // See transaction::replay() + streaming_context_.cleanup(); + } + + state(lock, s_aborted); + } + + // Releasing the transaction from provider is postponed into + // after_statement() hook. Depending on DBMS system all the + // resources acquired by transaction may or may not be released + // during actual rollback. If the transaction has been ordered, + // releasing the commit ordering critical section should be + // also postponed until all resources have been released. + debug_log_state("after_rollback_leave"); + return 0; +} + +int wsrep::transaction::release_commit_order( + wsrep::unique_lock<wsrep::mutex>& lock) +{ + lock.unlock(); + int ret(provider().commit_order_enter(ws_handle_, ws_meta_)); + if (!ret) + { + server_service_.set_position(client_service_, ws_meta_.gtid()); + ret = provider().commit_order_leave(ws_handle_, ws_meta_, + apply_error_buf_); + } + // grabbing lock here, as set_position may call for sync wait in galera side + lock.lock(); + return ret; +} + +void wsrep::transaction::remove_fragments_in_storage_service_scope( + wsrep::unique_lock<wsrep::mutex>& lock) +{ + assert(lock.owns_lock()); + lock.unlock(); + { + scoped_storage_service<storage_service_deleter> + sr_scope( + client_service_, + server_service_.storage_service(client_service_), + storage_service_deleter(server_service_)); + wsrep::storage_service& storage_service( + sr_scope.storage_service()); + storage_service.adopt_transaction(*this); + storage_service.remove_fragments(); + storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta()); + } + lock.lock(); +} + +int wsrep::transaction::after_statement() +{ + wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex()); + return after_statement(lock); +} + +int wsrep::transaction::after_statement(wsrep::unique_lock<wsrep::mutex>& lock) +{ + int ret(0); + debug_log_state("after_statement_enter"); + assert(lock.owns_lock()); + assert(client_state_.mode() == wsrep::client_state::m_local); + assert(state() == s_executing || + state() == s_prepared || + state() == s_committed || + state() == s_aborted || + state() == s_must_abort || + state() == s_cert_failed || + state() == s_must_replay); + + if (state() == s_executing && + streaming_context_.fragment_size() && + streaming_context_.fragment_unit() == streaming_context::statement) + { + ret = streaming_step(lock); + } + + switch (state()) + { + case s_executing: + // ? + break; + case s_prepared: + assert(is_xa()); + break; + case s_committed: + assert(is_streaming() == false); + break; + case s_must_abort: + case s_cert_failed: + // Error may be set already. For example, if fragment size + // exceeded the maximum size in certify_fragment(), then + // we already have wsrep::e_error_during_commit + if (client_state_.current_error() == wsrep::e_success) + { + client_state_.override_error(wsrep::e_deadlock_error); + } + lock.unlock(); + ret = client_service_.bf_rollback(); + lock.lock(); + if (state() != s_must_replay) + { + break; + } + // Continue to replay if rollback() changed the state to s_must_replay + WSREP_FALLTHROUGH; + case s_must_replay: + { + if (is_xa() && !ordered()) + { + ret = xa_replay_commit(lock); + } + else + { + ret = replay(lock); + } + break; + } + case s_aborted: + // Raise a deadlock error if the transaction was BF aborted and + // rolled back by client outside of transaction hooks. + if (bf_aborted() && client_state_.current_error() == wsrep::e_success && + !client_service_.is_xa_rollback()) + { + client_state_.override_error(wsrep::e_deadlock_error); + } + break; + default: + assert(0); + break; + } + + assert(state() == s_executing || + state() == s_prepared || + state() == s_committed || + state() == s_aborted || + state() == s_must_replay); + + if (state() == s_aborted) + { + if (ordered()) + { + ret = release_commit_order(lock); + } + lock.unlock(); + provider().release(ws_handle_); + lock.lock(); + } + + if (state() != s_executing && + (!client_service_.is_explicit_xa() || + client_state_.state() == wsrep::client_state::s_quitting)) + { + cleanup(); + } + fragments_certified_for_statement_ = 0; + debug_log_state("after_statement_leave"); + assert(ret == 0 || state() == s_aborted); + return ret; +} + +void wsrep::transaction::after_command_must_abort( + wsrep::unique_lock<wsrep::mutex>& lock) +{ + debug_log_state("after_command_must_abort enter"); + assert(active()); + assert(state_ == s_must_abort); + + if (is_xa() && is_streaming()) + { + state(lock, s_must_replay); + } + + lock.unlock(); + client_service_.bf_rollback(); + lock.lock(); + + if (is_xa() && is_streaming()) + { + xa_replay(lock); + } + else + { + client_state_.override_error(wsrep::e_deadlock_error); + } + + debug_log_state("after_command_must_abort leave"); +} + +void wsrep::transaction::after_applying() +{ + wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_); + debug_log_state("after_applying enter"); + assert(state_ == s_executing || + state_ == s_prepared || + state_ == s_committed || + state_ == s_aborted || + state_ == s_replaying); + + if (state_ != s_executing && state_ != s_prepared) + { + if (state_ == s_replaying) state(lock, s_aborted); + cleanup(); + } + else + { + // State remains executing or prepared, so this is a streaming applier. + // Reset the meta data to avoid releasing commit order + // critical section above if the next fragment is rollback + // fragment. Rollback fragment ordering will be handled by + // another instance while removing the fragments from + // storage. + ws_meta_ = wsrep::ws_meta(); + } + debug_log_state("after_applying leave"); +} + +bool wsrep::transaction::bf_abort( + wsrep::unique_lock<wsrep::mutex>& lock, + wsrep::seqno bf_seqno) +{ + bool ret(false); + const enum wsrep::transaction::state state_at_enter(state()); + assert(lock.owns_lock()); + + if (active() == false) + { + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "Transaction not active, skipping bf abort"); + } + else if (is_bf_immutable_) + { + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "Transaction has become immutable for BF abort"); + } + else + { + switch (state_at_enter) + { + case s_executing: + case s_preparing: + case s_prepared: + case s_certifying: + case s_committing: + { + wsrep::seqno victim_seqno; + enum wsrep::provider::status + status(client_state_.provider().bf_abort( + bf_seqno, id_, victim_seqno)); + switch (status) + { + case wsrep::provider::success: + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "Seqno " << bf_seqno + << " successfully BF aborted " << id_ + << " victim_seqno " << victim_seqno); + bf_abort_state_ = state_at_enter; + state(lock, s_must_abort); + ret = true; + break; + default: + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "Seqno " << bf_seqno + << " failed to BF abort " << id_ + << " with status " << status + << " victim_seqno " << victim_seqno); + break; + } + break; + } + default: + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "BF abort not allowed in state " + << wsrep::to_string(state_at_enter)); + break; + } + } + + if (ret) + { + bf_abort_client_state_ = client_state_.state(); + // If the transaction is in executing state, we must initiate + // streaming rollback to ensure that the rollback fragment gets + // replicated before the victim starts to roll back and release locks. + // In other states the BF abort will be detected outside of + // storage engine operations and streaming rollback will be + // handled from before_rollback() call. + if (client_state_.mode() == wsrep::client_state::m_local && + is_streaming() && state_at_enter == s_executing) + { + streaming_rollback(lock); + } + + if ((client_state_.state() == wsrep::client_state::s_idle && + client_state_.server_state().rollback_mode() == + wsrep::server_state::rm_sync) // locally processing idle + || + // high priority streaming + (client_state_.mode() == wsrep::client_state::m_high_priority && + is_streaming())) + { + // We need to change the state to aborting under the + // lock protection to avoid a race between client thread, + // otherwise it could happen that the client gains control + // between releasing the lock and before background + // rollbacker gets control. + if (is_xa() && state_at_enter == s_prepared) + { + state(lock, s_must_replay); + client_state_.set_rollbacker_active(true); + } + else + { + state(lock, s_aborting); + client_state_.set_rollbacker_active(true); + if (client_state_.mode() == wsrep::client_state::m_high_priority) + { + lock.unlock(); + client_state_.server_state().stop_streaming_applier( + server_id_, id_); + lock.lock(); + } + } + + server_service_.background_rollback(lock, client_state_); + } + } + return ret; +} + +bool wsrep::transaction::total_order_bf_abort( + wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED, + wsrep::seqno bf_seqno) +{ + /* We must set this flag before entering bf_abort() in order + * to streaming_rollback() work correctly. The flag will be + * unset if BF abort was not allowed. Note that we rely in + * bf_abort() not to release lock if the BF abort is not allowed. */ + bf_aborted_in_total_order_ = true; + bool ret(bf_abort(lock, bf_seqno)); + if (not ret) + { + bf_aborted_in_total_order_ = false; + } + return ret; +} + +void wsrep::transaction::clone_for_replay(const wsrep::transaction& other) +{ + assert(other.state() == s_replaying); + id_ = other.id_; + xid_ = other.xid_; + server_id_ = other.server_id_; + ws_handle_ = other.ws_handle_; + ws_meta_ = other.ws_meta_; + streaming_context_ = other.streaming_context_; + state_ = s_replaying; +} + +void wsrep::transaction::assign_xid(const wsrep::xid& xid) +{ + assert(active()); + assert(!is_xa()); + xid_ = xid; +} + +int wsrep::transaction::restore_to_prepared_state(const wsrep::xid& xid) +{ + wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_); + assert(active()); + assert(is_empty()); + assert(state() == s_executing || state() == s_replaying); + flags(flags() & ~wsrep::provider::flag::start_transaction); + if (state() == s_executing) + { + state(lock, s_certifying); + } + else + { + state(lock, s_preparing); + } + state(lock, s_prepared); + xid_ = xid; + return 0; +} + +int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid, + bool commit) +{ + debug_log_state("commit_or_rollback_by_xid enter"); + wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_); + wsrep::server_state& server_state(client_state_.server_state()); + wsrep::high_priority_service* sa(server_state.find_streaming_applier(xid)); + + if (!sa) + { + assert(sa); + client_state_.override_error(wsrep::e_error_during_commit); + return 1; + } + + if (commit) + { + flags(wsrep::provider::flag::commit); + } + else + { + flags(wsrep::provider::flag::rollback); + } + pa_unsafe(true); + wsrep::stid stid(sa->transaction().server_id(), + sa->transaction().id(), + client_state_.id()); + wsrep::ws_meta meta(stid); + + const enum wsrep::provider::status cert_ret( + provider().certify(client_state_.id(), + ws_handle_, + flags(), + meta)); + + int ret; + if (cert_ret == wsrep::provider::success) + { + if (commit) + { + state(lock, s_certifying); + state(lock, s_committing); + state(lock, s_committed); + } + else + { + state(lock, s_aborting); + state(lock, s_aborted); + } + ret = 0; + } + else + { + client_state_.override_error(wsrep::e_error_during_commit, + cert_ret); + wsrep::log_error() << "Failed to commit_or_rollback_by_xid," + << " xid: " << xid + << " error: " << cert_ret; + ret = 1; + } + debug_log_state("commit_or_rollback_by_xid leave"); + return ret; +} + +void wsrep::transaction::xa_detach() +{ + debug_log_state("xa_detach enter"); + assert(state() == s_prepared || + client_state_.state() == wsrep::client_state::s_quitting); + if (state() == s_prepared) + { + wsrep::server_state& server_state(client_state_.server_state()); + server_state.convert_streaming_client_to_applier(&client_state_); + client_service_.store_globals(); + client_service_.cleanup_transaction(); + } + wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_); + streaming_context_.cleanup(); + state(lock, s_aborting); + state(lock, s_aborted); + provider().release(ws_handle_); + cleanup(); + debug_log_state("xa_detach leave"); +} + +void wsrep::transaction::xa_replay_common(wsrep::unique_lock<wsrep::mutex>& lock) +{ + assert(lock.owns_lock()); + assert(is_xa()); + assert(is_streaming()); + assert(state() == s_must_replay); + assert(bf_aborted()); + + state(lock, s_replaying); + + enum wsrep::provider::status status; + wsrep::server_state& server_state(client_state_.server_state()); + + lock.unlock(); + server_state.convert_streaming_client_to_applier(&client_state_); + status = client_service_.replay_unordered(); + client_service_.store_globals(); + lock.lock(); + + if (status != wsrep::provider::success) + { + client_service_.emergency_shutdown(); + } +} + +int wsrep::transaction::xa_replay(wsrep::unique_lock<wsrep::mutex>& lock) +{ + debug_log_state("xa_replay enter"); + xa_replay_common(lock); + state(lock, s_aborted); + streaming_context_.cleanup(); + provider().release(ws_handle_); + cleanup(); + client_service_.signal_replayed(); + debug_log_state("xa_replay leave"); + return 0; +} + +int wsrep::transaction::xa_replay_commit(wsrep::unique_lock<wsrep::mutex>& lock) +{ + debug_log_state("xa_replay_commit enter"); + xa_replay_common(lock); + lock.unlock(); + enum wsrep::provider::status status(client_service_.commit_by_xid()); + lock.lock(); + int ret(1); + switch (status) + { + case wsrep::provider::success: + state(lock, s_committed); + streaming_context_.cleanup(); + provider().release(ws_handle_); + cleanup(); + ret = 0; + break; + default: + log_warning() << "Failed to commit by xid during replay"; + // Commit by xid failed, return a commit + // error and let the client retry + state(lock, s_preparing); + state(lock, s_prepared); + client_state_.override_error(wsrep::e_error_during_commit, status); + } + + client_service_.signal_replayed(); + debug_log_state("xa_replay_commit leave"); + return ret; +} + +//////////////////////////////////////////////////////////////////////////////// +// Private // +//////////////////////////////////////////////////////////////////////////////// + +inline wsrep::provider& wsrep::transaction::provider() +{ + return client_state_.server_state().provider(); +} + +void wsrep::transaction::state( + wsrep::unique_lock<wsrep::mutex>& lock __attribute__((unused)), + enum wsrep::transaction::state next_state) +{ + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "client: " << client_state_.id().get() + << " txc: " << id().get() + << " state: " << to_string(state_) + << " -> " << to_string(next_state)); + + assert(lock.owns_lock()); + // BF aborter is allowed to change the state without gaining control + // to the state if the next state is s_must_abort, s_aborting or + // s_must_replay (for xa idle replay). + assert(client_state_.owning_thread_id_ == wsrep::this_thread::get_id() || + next_state == s_must_abort || + next_state == s_must_replay || + next_state == s_aborting); + + static const char allowed[n_states][n_states] = + { /* ex pg pd ce co oc ct cf ma ab ad mr re */ + { 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */ + { 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0}, /* pg */ + { 0, 0, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 0}, /* pd */ + { 1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */ + { 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */ + { 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* cf */ + { 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0}, /* ma */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */ + { 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0} /* re */ + }; + + if (!allowed[state_][next_state]) + { + wsrep::log_debug() << "unallowed state transition for transaction " + << id_ << ": " << wsrep::to_string(state_) + << " -> " << wsrep::to_string(next_state); + assert(0); + } + + state_hist_.push_back(state_); + if (state_hist_.size() == 12) + { + state_hist_.erase(state_hist_.begin()); + } + state_ = next_state; + + if (state_ == s_must_replay) + { + client_service_.will_replay(); + } +} + +bool wsrep::transaction::abort_or_interrupt( + wsrep::unique_lock<wsrep::mutex>& lock) +{ + assert(lock.owns_lock()); + if (state() == s_must_abort) + { + client_state_.override_error(wsrep::e_deadlock_error); + return true; + } + else if (state() == s_aborting || state() == s_aborted) + { + // Somehow we got there after BF abort without setting error + // status. This may happen if the DBMS side aborts the transaction + // but still tries to continue processing rows. Raise the error here + // and assert so that the error will be caught in debug builds. + if (bf_abort_client_state_ && + client_state_.current_error() == wsrep::e_success) + { + client_state_.override_error(wsrep::e_deadlock_error); + assert(0); + } + return true; + } + else if (client_service_.interrupted(lock)) + { + client_state_.override_error(wsrep::e_interrupted_error); + if (state() != s_must_abort) + { + state(lock, s_must_abort); + } + return true; + } + return false; +} + +int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock, + bool force) +{ + assert(lock.owns_lock()); + assert(streaming_context_.fragment_size() || is_xa()); + + if (client_service_.bytes_generated() < + streaming_context_.log_position()) + { + /* Something went wrong on DBMS side in keeping track of + generated bytes. Return an error to abort the transaction. */ + wsrep::log_warning() << "Bytes generated " + << client_service_.bytes_generated() + << " less than bytes certified " + << streaming_context_.log_position() + << ", aborting streaming transaction"; + return 1; + } + int ret(0); + + const size_t bytes_to_replicate(client_service_.bytes_generated() - + streaming_context_.log_position()); + + switch (streaming_context_.fragment_unit()) + { + case streaming_context::row: + WSREP_FALLTHROUGH; + case streaming_context::statement: + streaming_context_.increment_unit_counter(1); + break; + case streaming_context::bytes: + streaming_context_.set_unit_counter(bytes_to_replicate); + break; + } + + // Some statements have no effect. Do not atttempt to + // replicate a fragment if no data has been generated since + // last fragment replication. + // We use `force = true` on XA prepare: a fragment will be + // generated even if no data is pending replication. + if (bytes_to_replicate <= 0 && !force) + { + assert(bytes_to_replicate == 0); + return ret; + } + + if (streaming_context_.fragment_size_exceeded() || force) + { + streaming_context_.reset_unit_counter(); + ret = certify_fragment(lock); + } + + return ret; +} + +int wsrep::transaction::certify_fragment( + wsrep::unique_lock<wsrep::mutex>& lock) +{ + assert(lock.owns_lock()); + + assert(client_state_.mode() == wsrep::client_state::m_local); + assert(streaming_context_.rolled_back() == false || + state() == s_must_abort); + + client_service_.wait_for_replayers(lock); + if (abort_or_interrupt(lock)) + { + return 1; + } + + state(lock, s_certifying); + lock.unlock(); + client_service_.debug_sync("wsrep_before_fragment_certification"); + + enum wsrep::provider::status status( + client_state_.server_state_.send_pending_rollback_events()); + if (status) + { + wsrep::log_warning() + << "Failed to replicate pending rollback events: " + << status << " (" + << wsrep::provider::to_string(status) << ")"; + lock.lock(); + state(lock, s_must_abort); + return 1; + } + + wsrep::mutable_buffer data; + size_t log_position(0); + if (client_service_.prepare_fragment_for_replication(data, log_position)) + { + lock.lock(); + state(lock, s_must_abort); + client_state_.override_error(wsrep::e_error_during_commit); + return 1; + } + streaming_context_.set_log_position(log_position); + + if (data.size() == 0) + { + wsrep::log_warning() << "Attempt to replicate empty data buffer"; + lock.lock(); + state(lock, s_executing); + return 0; + } + + if (provider().append_data(ws_handle_, + wsrep::const_buffer(data.data(), data.size()))) + { + lock.lock(); + state(lock, s_must_abort); + client_state_.override_error(wsrep::e_error_during_commit); + return 1; + } + + if (is_xa()) + { + // One more check to see if the transaction + // has been aborted. This is necessary because + // until the append_data above will make sure + // that the transaction exists in provider. + lock.lock(); + if (abort_or_interrupt(lock)) + { + return 1; + } + lock.unlock(); + } + + if (is_streaming() == false) + { + client_state_.server_state_.start_streaming_client(&client_state_); + } + + if (implicit_deps()) + { + flags(flags() | wsrep::provider::flag::implicit_deps); + } + + int ret(0); + enum wsrep::client_error error(wsrep::e_success); + enum wsrep::provider::status cert_ret(wsrep::provider::success); + // Storage service scope + { + scoped_storage_service<storage_service_deleter> + sr_scope( + client_service_, + server_service_.storage_service(client_service_), + storage_service_deleter(server_service_)); + wsrep::storage_service& storage_service( + sr_scope.storage_service()); + + // First the fragment is appended to the stable storage. + // This is done to ensure that there is enough capacity + // available to store the fragment. The fragment meta data + // is updated after certification. + wsrep::id server_id(client_state_.server_state().id()); + + if (server_id.is_undefined()) { + // Server disconnected from cluster, do not + // append a fragment with undefined server_id. + ret = 1; + error = wsrep::e_append_fragment_error; + } + + if (ret == 0) + { + ret = storage_service.start_transaction(ws_handle_); + if (ret) + { + error = wsrep::e_append_fragment_error; + } + } + + if (ret == 0) + { + ret = storage_service.append_fragment( + server_id, id(), flags(), + wsrep::const_buffer(data.data(), data.size()), xid()); + if (ret) + { + error = wsrep::e_append_fragment_error; + storage_service.rollback(wsrep::ws_handle(), wsrep::ws_meta()); + } + } + + if (ret == 0) + { + client_service_.debug_crash( + "crash_replicate_fragment_before_certify"); + + wsrep::ws_meta sr_ws_meta; + cert_ret = provider().certify(client_state_.id(), + ws_handle_, + flags(), + sr_ws_meta); + client_service_.debug_crash( + "crash_replicate_fragment_after_certify"); + + switch (cert_ret) + { + case wsrep::provider::success: + ++fragments_certified_for_statement_; + assert(sr_ws_meta.seqno().is_undefined() == false); + streaming_context_.certified(); + if (storage_service.update_fragment_meta(sr_ws_meta)) + { + storage_service.rollback(wsrep::ws_handle(), + wsrep::ws_meta()); + ret = 1; + error = wsrep::e_deadlock_error; + break; + } + if (storage_service.commit(ws_handle_, sr_ws_meta)) + { + ret = 1; + error = wsrep::e_deadlock_error; + } + else + { + streaming_context_.stored(sr_ws_meta.seqno()); + } + client_service_.debug_crash( + "crash_replicate_fragment_success"); + break; + case wsrep::provider::error_bf_abort: + case wsrep::provider::error_certification_failed: + // Streaming transcation got BF aborted, so it must roll + // back. Roll back the fragment storage operation out of + // order as the commit order will be grabbed later on + // during rollback process. Mark the fragment as certified + // though in streaming context in order to enter streaming + // rollback codepath. + // + // Note that despite we handle error_certification_failed + // here, we mark the transaction as streaming. Apparently + // the provider may return status corresponding to certification + // failure even if the fragment has passed certification. + // This may be a bug in provider implementation or a limitation + // of error codes defined in wsrep-API. In order to make + // sure that the transaction will be cleaned on other servers, + // we take a risk of sending one rollback fragment for nothing. + storage_service.rollback(wsrep::ws_handle(), + wsrep::ws_meta()); + streaming_context_.certified(); + ret = 1; + error = wsrep::e_deadlock_error; + break; + default: + // Storage service rollback must be done out of order, + // otherwise there may be a deadlock between BF aborter + // and the rollback process. + storage_service.rollback(wsrep::ws_handle(), wsrep::ws_meta()); + ret = 1; + error = (cert_ret == wsrep::provider::error_size_exceeded ? + wsrep::e_size_exceeded_error : + wsrep::e_deadlock_error); + break; + } + } + } + + // Note: This does not release the handle in the provider + // since streaming is still on. However it is needed to + // make provider internal state to transition for the + // next fragment. If any of the operations above failed, + // the handle needs to be left unreleased for the following + // rollback process. + if (ret == 0) + { + assert(error == wsrep::e_success); + ret = provider().release(ws_handle_); + if (ret) + { + error = wsrep::e_deadlock_error; + } + } + lock.lock(); + if (ret) + { + assert(error != wsrep::e_success); + if (is_streaming() == false) + { + lock.unlock(); + client_state_.server_state_.stop_streaming_client(&client_state_); + lock.lock(); + } + else + { + streaming_rollback(lock); + } + if (state_ != s_must_abort) + { + state(lock, s_must_abort); + } + client_state_.override_error(error, cert_ret); + } + else if (state_ == s_must_abort) + { + if (is_streaming()) + { + streaming_rollback(lock); + } + client_state_.override_error(wsrep::e_deadlock_error, cert_ret); + ret = 1; + } + else + { + assert(state_ == s_certifying); + state(lock, s_executing); + flags(flags() & ~wsrep::provider::flag::start_transaction); + flags(flags() & ~wsrep::provider::flag::pa_unsafe); + } + return ret; +} + +int wsrep::transaction::certify_commit( + wsrep::unique_lock<wsrep::mutex>& lock) +{ + assert(lock.owns_lock()); + assert(active()); + client_service_.wait_for_replayers(lock); + + assert(lock.owns_lock()); + + if (abort_or_interrupt(lock)) + { + if (is_xa() && state() == s_must_abort) + { + state(lock, s_must_replay); + } + return 1; + } + + state(lock, s_certifying); + lock.unlock(); + + enum wsrep::provider::status status( + client_state_.server_state_.send_pending_rollback_events()); + if (status) + { + wsrep::log_warning() + << "Failed to replicate pending rollback events: " + << status << " (" + << wsrep::provider::to_string(status) << ")"; + + // We failed to replicate some pending rollback fragment. + // Meaning that some transaction that was rolled back + // locally might still be active out there in the cluster. + // To avoid a potential BF-BF conflict, we need to abort + // and give up on this one. + // Notice that we can't abort a prepared XA that wants to + // commit. Fortunately, there is no need to in this case: + // the commit fragment for XA does not cause any changes and + // can't possibly conflict with other transactions out there. + if (!is_xa()) + { + lock.lock(); + state(lock, s_must_abort); + return 1; + } + } + + if (is_streaming()) + { + if (!is_xa()) + { + append_sr_keys_for_commit(); + } + pa_unsafe(true); + } + + if (implicit_deps()) + { + flags(flags() | wsrep::provider::flag::implicit_deps); + } + + flags(flags() | wsrep::provider::flag::commit); + flags(flags() & ~wsrep::provider::flag::prepare); + + if (client_service_.prepare_data_for_replication()) + { + lock.lock(); + // Here we fake that the size exceeded error came from provider, + // even though it came from the client service. This requires + // some consideration how to get meaningful error codes from + // the client service. + client_state_.override_error(wsrep::e_size_exceeded_error, + wsrep::provider::error_size_exceeded); + if (state_ != s_must_abort) + { + state(lock, s_must_abort); + } + return 1; + } + + client_service_.debug_sync("wsrep_before_certification"); + enum wsrep::provider::status + cert_ret(provider().certify(client_state_.id(), + ws_handle_, + flags(), + ws_meta_)); + client_service_.debug_sync("wsrep_after_certification"); + + lock.lock(); + + assert(state() == s_certifying || state() == s_must_abort); + + int ret(1); + switch (cert_ret) + { + case wsrep::provider::success: + assert(ordered()); + certified_ = true; + ++fragments_certified_for_statement_; + switch (state()) + { + case s_certifying: + if (is_xa()) + { + state(lock, s_committing); + } + else + { + state(lock, s_preparing); + } + ret = 0; + break; + case s_must_abort: + // We got BF aborted after succesful certification + // and before acquiring client state lock. The trasaction + // must be replayed. + state(lock, s_must_replay); + break; + default: + assert(0); + break; + } + break; + case wsrep::provider::error_warning: + assert(ordered() == false); + state(lock, s_must_abort); + client_state_.override_error(wsrep::e_error_during_commit, cert_ret); + break; + case wsrep::provider::error_transaction_missing: + state(lock, s_must_abort); + // The execution should never reach this point if the + // transaction has not generated any keys or data. + wsrep::log_warning() << "Transaction was missing in provider"; + client_state_.override_error(wsrep::e_error_during_commit, cert_ret); + break; + case wsrep::provider::error_bf_abort: + // Transaction was replicated successfully and it was either + // certified successfully or the result of certifying is not + // yet known. Therefore the transaction must roll back + // and go through replay either to replay and commit the whole + // transaction or to determine failed certification status. + if (state() != s_must_abort) + { + state(lock, s_must_abort); + } + state(lock, s_must_replay); + break; + case wsrep::provider::error_certification_failed: + state(lock, s_cert_failed); + client_state_.override_error(wsrep::e_deadlock_error); + break; + case wsrep::provider::error_size_exceeded: + state(lock, s_must_abort); + client_state_.override_error(wsrep::e_error_during_commit, cert_ret); + break; + case wsrep::provider::error_connection_failed: + // Galera provider may return CONN_FAIL if the trx is + // BF aborted O_o. If we see here that the trx was BF aborted, + // return deadlock error instead of error during commit + // to reduce number of error state combinations elsewhere. + if (state() == s_must_abort) + { + if (is_xa()) + { + state(lock, s_must_replay); + } + client_state_.override_error(wsrep::e_deadlock_error); + } + else + { + client_state_.override_error(wsrep::e_error_during_commit, + cert_ret); + if (is_xa()) + { + state(lock, s_prepared); + } + else + { + state(lock, s_must_abort); + } + } + break; + case wsrep::provider::error_provider_failed: + if (state() != s_must_abort) + { + state(lock, s_must_abort); + } + client_state_.override_error(wsrep::e_error_during_commit, cert_ret); + break; + case wsrep::provider::error_fatal: + client_state_.override_error(wsrep::e_error_during_commit, cert_ret); + state(lock, s_must_abort); + client_service_.emergency_shutdown(); + break; + case wsrep::provider::error_not_implemented: + case wsrep::provider::error_not_allowed: + client_state_.override_error(wsrep::e_error_during_commit, cert_ret); + state(lock, s_must_abort); + wsrep::log_warning() << "Certification operation was not allowed: " + << "id: " << id().get() + << " flags: " << std::hex << flags() << std::dec; + break; + default: + state(lock, s_must_abort); + client_state_.override_error(wsrep::e_error_during_commit, cert_ret); + break; + } + + return ret; +} + +int wsrep::transaction::append_sr_keys_for_commit() +{ + int ret(0); + assert(client_state_.mode() == wsrep::client_state::m_local); + for (wsrep::sr_key_set::branch_type::const_iterator + i(sr_keys_.root().begin()); + ret == 0 && i != sr_keys_.root().end(); ++i) + { + for (wsrep::sr_key_set::leaf_type::const_iterator + j(i->second.begin()); + ret == 0 && j != i->second.end(); ++j) + { + wsrep::key key(wsrep::key::shared); + key.append_key_part(i->first.data(), i->first.size()); + key.append_key_part(j->data(), j->size()); + ret = provider().append_key(ws_handle_, key); + } + } + return ret; +} + +void wsrep::transaction::streaming_rollback( + wsrep::unique_lock<wsrep::mutex>& lock) +{ + debug_log_state("streaming_rollback enter"); + assert(state_ != s_must_replay); + assert(is_streaming()); + assert(lock.owns_lock()); + + // Prevent streaming_rollback() to be executed simultaneously. + // Notice that lock is unlocked when calling into server_state + // methods, to avoid violating lock order. + // The condition variable below prevents a thread to go + // through streaming_rollback() while another thread is busy + // stopping or converting the streaming_client(). + // This would be problematic if a thread is performing BF abort, + // while the original client manages to complete its rollback + // and therefore change the state of the transaction, causing + // assertions to fire. + while (streaming_rollback_in_progress_) + client_state_.cond_.wait(lock); + streaming_rollback_in_progress_ = true; + + if (streaming_context_.rolled_back() == false) + { + // Note that streaming_context_ must not be cleaned up in this + // method. This is because the owning thread may still be executing + // fragment removal on commit, which will access fragment + // vector in streaming context. Clearing streaming context + // here may cause owning thread to access memory which was + // already freed. Cleanup for streaming_context_ will happen + // in after_rollback(). + + if (bf_aborted_in_total_order_) + { + lock.unlock(); + server_service_.debug_sync("wsrep_streaming_rollback"); + client_state_.server_state_.stop_streaming_client(&client_state_); + lock.lock(); + } + else + { + // Create a high priority applier which will handle the + // rollback fragment or clean up on configuration change. + // Adopt transaction will copy fragment set and appropriate + // meta data. + lock.unlock(); + server_service_.debug_sync("wsrep_streaming_rollback"); + client_state_.server_state_.convert_streaming_client_to_applier( + &client_state_); + lock.lock(); + + enum wsrep::provider::status status(provider().rollback(id_)); + if (status) + { + lock.unlock(); + client_state_.server_state_.queue_rollback_event(id_); + lock.lock(); + wsrep::log_debug() + << "Failed to replicate rollback fragment for " << id_ + << ": " << status << " ( " + << wsrep::provider::to_string(status) << ")"; + } + } + + // Mark the streaming context as rolled back, + // so that this block is executed once. + streaming_context_.rolled_back(id_); + } + + debug_log_state("streaming_rollback leave"); + streaming_rollback_in_progress_ = false; + client_state_.cond_.notify_all(); +} + +int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock) +{ + int ret(0); + state(lock, s_replaying); + // Need to remember streaming state before replay, entering + // after_commit() after succesful replay will clear + // fragments. + const bool was_streaming(is_streaming()); + lock.unlock(); + client_service_.debug_sync("wsrep_before_replay"); + enum wsrep::provider::status replay_ret(client_service_.replay()); + client_service_.signal_replayed(); + if (was_streaming) + { + client_state_.server_state_.stop_streaming_client(&client_state_); + } + lock.lock(); + switch (replay_ret) + { + case wsrep::provider::success: + if (state() == s_replaying) + { + // Replay was done by using different client state, adjust state + // to committed. + state(lock, s_committed); + } + if (is_streaming()) + { + streaming_context_.cleanup(); + } + provider().release(ws_handle_); + break; + case wsrep::provider::error_certification_failed: + client_state_.override_error( + wsrep::e_deadlock_error); + if (is_streaming()) + { + lock.unlock(); + client_service_.remove_fragments(); + lock.lock(); + streaming_context_.cleanup(); + } + state(lock, s_aborted); + ret = 1; + break; + default: + client_service_.emergency_shutdown(); + break; + } + + WSREP_LOG_DEBUG( + client_state_.debug_log_level(), wsrep::log::debug_level_transaction, + "replay returned: " << replay_ret << " (" + << wsrep::provider::to_string(replay_ret) << ")"); + return ret; +} + +void wsrep::transaction::cleanup() +{ + debug_log_state("cleanup_enter"); + assert(state() == s_committed || state() == s_aborted); + id_ = wsrep::transaction_id::undefined(); + ws_handle_ = wsrep::ws_handle(); + // Keep the state history for troubleshooting. Reset + // at start_transaction(). + // state_hist_.clear(); + if (ordered()) + { + client_state_.update_last_written_gtid(ws_meta_.gtid()); + } + bf_abort_state_ = s_executing; + bf_abort_provider_status_ = wsrep::provider::success; + bf_abort_client_state_ = 0; + bf_aborted_in_total_order_ = false; + ws_meta_ = wsrep::ws_meta(); + flags_ = 0; + certified_ = false; + implicit_deps_ = false; + sr_keys_.clear(); + streaming_context_.cleanup(); + client_service_.cleanup_transaction(); + apply_error_buf_.clear(); + xid_.clear(); + is_bf_immutable_ = false; + debug_log_state("cleanup_leave"); +} + +void wsrep::transaction::debug_log_state( + const char* context) const +{ + WSREP_LOG_DEBUG( + client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + context + << "\n server: " << server_id_ + << ", client: " << int64_t(client_state_.id().get()) + << ", state: " << wsrep::to_c_string(client_state_.state()) + << ", mode: " << wsrep::to_c_string(client_state_.mode()) + << "\n trx_id: " << int64_t(id_.get()) + << ", seqno: " << ws_meta_.seqno().get() + << ", flags: " << flags() + << "\n" + << " state: " << wsrep::to_c_string(state_) + << ", bfa_state: " << wsrep::to_c_string(bf_abort_state_) + << ", error: " << wsrep::to_c_string(client_state_.current_error()) + << ", status: " << client_state_.current_error_status() + << "\n" + << " is_sr: " << is_streaming() + << ", frags: " << streaming_context_.fragments_certified() + << ", frags size: " << streaming_context_.fragments().size() + << ", unit: " << streaming_context_.fragment_unit() + << ", size: " << streaming_context_.fragment_size() + << ", counter: " << streaming_context_.unit_counter() + << ", log_pos: " << streaming_context_.log_position() + << ", sr_rb: " << streaming_context_.rolled_back() + << "\n own: " << (client_state_.owning_thread_id_ == wsrep::this_thread::get_id()) + << " thread_id: " << client_state_.owning_thread_id_ + << ""); +} + +void wsrep::transaction::debug_log_key_append(const wsrep::key& key) const +{ + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "key_append: " + << "trx_id: " + << int64_t(id().get()) + << " append key:\n" << key); +} + +std::ostream& wsrep::operator<<(std::ostream& os, + enum wsrep::transaction::state state) +{ + return (os << to_c_string(state)); +} |