From a175314c3e5827eb193872241446f2f8f5c9d33c Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 4 May 2024 20:07:14 +0200 Subject: Adding upstream version 1:10.5.12. Signed-off-by: Daniel Baumann --- wsrep-lib/src/CMakeLists.txt | 23 + wsrep-lib/src/client_state.cpp | 881 +++++++++++++++ wsrep-lib/src/exception.cpp | 22 + wsrep-lib/src/gtid.cpp | 90 ++ wsrep-lib/src/id.cpp | 81 ++ wsrep-lib/src/key.cpp | 65 ++ wsrep-lib/src/logger.cpp | 43 + wsrep-lib/src/provider.cpp | 170 +++ wsrep-lib/src/seqno.cpp | 26 + wsrep-lib/src/server_state.cpp | 1582 +++++++++++++++++++++++++++ wsrep-lib/src/service_helpers.hpp | 106 ++ wsrep-lib/src/thread.cpp | 30 + wsrep-lib/src/thread_service_v1.cpp | 285 +++++ wsrep-lib/src/thread_service_v1.hpp | 55 + wsrep-lib/src/tls_service_v1.cpp | 232 ++++ wsrep-lib/src/tls_service_v1.hpp | 54 + wsrep-lib/src/transaction.cpp | 2003 ++++++++++++++++++++++++++++++++++ wsrep-lib/src/uuid.cpp | 74 ++ wsrep-lib/src/uuid.hpp | 79 ++ wsrep-lib/src/view.cpp | 71 ++ wsrep-lib/src/wsrep_provider_v26.cpp | 1121 +++++++++++++++++++ wsrep-lib/src/wsrep_provider_v26.hpp | 116 ++ wsrep-lib/src/xid.cpp | 31 + 23 files changed, 7240 insertions(+) create mode 100644 wsrep-lib/src/CMakeLists.txt create mode 100644 wsrep-lib/src/client_state.cpp create mode 100644 wsrep-lib/src/exception.cpp create mode 100644 wsrep-lib/src/gtid.cpp create mode 100644 wsrep-lib/src/id.cpp create mode 100644 wsrep-lib/src/key.cpp create mode 100644 wsrep-lib/src/logger.cpp create mode 100644 wsrep-lib/src/provider.cpp create mode 100644 wsrep-lib/src/seqno.cpp create mode 100644 wsrep-lib/src/server_state.cpp create mode 100644 wsrep-lib/src/service_helpers.hpp create mode 100644 wsrep-lib/src/thread.cpp create mode 100644 wsrep-lib/src/thread_service_v1.cpp create mode 100644 wsrep-lib/src/thread_service_v1.hpp create mode 100644 wsrep-lib/src/tls_service_v1.cpp create mode 100644 wsrep-lib/src/tls_service_v1.hpp create mode 100644 wsrep-lib/src/transaction.cpp create mode 100644 wsrep-lib/src/uuid.cpp create mode 100644 wsrep-lib/src/uuid.hpp create mode 100644 wsrep-lib/src/view.cpp create mode 100644 wsrep-lib/src/wsrep_provider_v26.cpp create mode 100644 wsrep-lib/src/wsrep_provider_v26.hpp create mode 100644 wsrep-lib/src/xid.cpp (limited to 'wsrep-lib/src') diff --git a/wsrep-lib/src/CMakeLists.txt b/wsrep-lib/src/CMakeLists.txt new file mode 100644 index 00000000..0401494b --- /dev/null +++ b/wsrep-lib/src/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright (C) 2018 Codership Oy +# + +add_library(wsrep-lib + client_state.cpp + exception.cpp + gtid.cpp + id.cpp + xid.cpp + key.cpp + logger.cpp + provider.cpp + seqno.cpp + view.cpp + server_state.cpp + thread.cpp + thread_service_v1.cpp + tls_service_v1.cpp + transaction.cpp + uuid.cpp + wsrep_provider_v26.cpp) +target_link_libraries(wsrep-lib wsrep_api_v26 pthread ${WSREP_LIB_LIBDL}) diff --git a/wsrep-lib/src/client_state.cpp b/wsrep-lib/src/client_state.cpp new file mode 100644 index 00000000..68573254 --- /dev/null +++ b/wsrep-lib/src/client_state.cpp @@ -0,0 +1,881 @@ +/* + * Copyright (C) 2018-2019 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/client_state.hpp" +#include "wsrep/compiler.hpp" +#include "wsrep/logger.hpp" + +#include // usleep() +#include +#include + +wsrep::provider& wsrep::client_state::provider() const +{ + return server_state_.provider(); +} + +void wsrep::client_state::open(wsrep::client_id id) +{ + wsrep::unique_lock lock(mutex_); + assert(state_ == s_none); + assert(keep_command_error_ == false); + debug_log_state("open: enter"); + owning_thread_id_ = wsrep::this_thread::get_id(); + rollbacker_active_ = false; + sync_wait_gtid_ = wsrep::gtid::undefined(); + last_written_gtid_ = wsrep::gtid::undefined(); + state(lock, s_idle); + id_ = id; + debug_log_state("open: leave"); +} + +void wsrep::client_state::close() +{ + wsrep::unique_lock lock(mutex_); + debug_log_state("close: enter"); + state(lock, s_quitting); + keep_command_error_ = false; + lock.unlock(); + if (transaction_.active() && + (mode_ != m_local || + transaction_.state() != wsrep::transaction::s_prepared)) + { + client_service_.bf_rollback(); + transaction_.after_statement(); + } + if (mode_ == m_local) + { + disable_streaming(); + } + debug_log_state("close: leave"); +} + +void wsrep::client_state::cleanup() +{ + wsrep::unique_lock lock(mutex_); + cleanup(lock); +} + +void wsrep::client_state::cleanup(wsrep::unique_lock& lock) +{ + debug_log_state("cleanup: enter"); + state(lock, s_none); + debug_log_state("cleanup: leave"); +} + +void wsrep::client_state::override_error(enum wsrep::client_error error, + enum wsrep::provider::status status) +{ + assert(wsrep::this_thread::get_id() == owning_thread_id_); + // Error state should not be cleared with success code without + // explicit reset_error() call. + assert(current_error_ == wsrep::e_success || + error != wsrep::e_success); + current_error_ = error; + current_error_status_ = status; +} + +int wsrep::client_state::before_command(bool keep_command_error) +{ + wsrep::unique_lock lock(mutex_); + debug_log_state("before_command: enter"); + // If the state is s_exec, the processing thread has already grabbed + // control with wait_rollback_complete_and_acquire_ownership() + if (state_ != s_exec) + { + assert(state_ == s_idle); + do_wait_rollback_complete_and_acquire_ownership(lock); + assert(state_ == s_exec); + client_service_.store_globals(); + } + else + { + // This thread must have acquired control by other means, + // for example via wait_rollback_complete_and_acquire_ownership(). + assert(wsrep::this_thread::get_id() == owning_thread_id_); + } + + keep_command_error_ = keep_command_error; + + // If the transaction is active, it must be either executing, + // aborted as rolled back by rollbacker, or must_abort if the + // client thread gained control via + // wait_rollback_complete_and_acquire_ownership() + // just before BF abort happened. + assert(transaction_.active() == false || + (transaction_.state() == wsrep::transaction::s_executing || + transaction_.state() == wsrep::transaction::s_prepared || + transaction_.state() == wsrep::transaction::s_aborted || + transaction_.state() == wsrep::transaction::s_must_abort)); + + if (transaction_.active()) + { + if (transaction_.state() == wsrep::transaction::s_must_abort || + transaction_.state() == wsrep::transaction::s_aborted) + { + if (transaction_.is_xa()) + { + // Client will rollback explicitly, return error. + debug_log_state("before_command: error"); + return 1; + } + + override_error(wsrep::e_deadlock_error); + if (transaction_.state() == wsrep::transaction::s_must_abort) + { + lock.unlock(); + client_service_.bf_rollback(); + lock.lock(); + + } + + if (keep_command_error_) + { + // Keep the error for the next command + debug_log_state("before_command: keep error"); + return 0; + } + + // Clean up the transaction and return error. + lock.unlock(); + (void)transaction_.after_statement(); + lock.lock(); + + assert(transaction_.active() == false); + assert(transaction_.state() == wsrep::transaction::s_aborted); + assert(current_error() != wsrep::e_success); + + debug_log_state("before_command: error"); + return 1; + } + } + debug_log_state("before_command: success"); + return 0; +} + +void wsrep::client_state::after_command_before_result() +{ + wsrep::unique_lock lock(mutex_); + debug_log_state("after_command_before_result: enter"); + assert(state() == s_exec); + if (transaction_.active() && + transaction_.state() == wsrep::transaction::s_must_abort) + { + override_error(wsrep::e_deadlock_error); + lock.unlock(); + client_service_.bf_rollback(); + // If keep current error is set, the result will be propagated + // back to client with some future command, so keep the transaction + // open here so that error handling can happen in before_command() + // hook. + if (not keep_command_error_) + { + (void)transaction_.after_statement(); + } + lock.lock(); + assert(transaction_.state() == wsrep::transaction::s_aborted); + assert(current_error() != wsrep::e_success); + } + state(lock, s_result); + debug_log_state("after_command_before_result: leave"); +} + +void wsrep::client_state::after_command_after_result() +{ + wsrep::unique_lock lock(mutex_); + debug_log_state("after_command_after_result_enter"); + assert(state() == s_result); + assert(transaction_.state() != wsrep::transaction::s_aborting); + if (transaction_.active() && + transaction_.state() == wsrep::transaction::s_must_abort) + { + lock.unlock(); + client_service_.bf_rollback(); + lock.lock(); + assert(transaction_.state() == wsrep::transaction::s_aborted); + override_error(wsrep::e_deadlock_error); + } + else if (transaction_.active() == false && not keep_command_error_) + { + current_error_ = wsrep::e_success; + current_error_status_ = wsrep::provider::success; + } + keep_command_error_ = false; + sync_wait_gtid_ = wsrep::gtid::undefined(); + state(lock, s_idle); + debug_log_state("after_command_after_result: leave"); +} + +int wsrep::client_state::before_statement() +{ + wsrep::unique_lock lock(mutex_); + debug_log_state("before_statement: enter"); +#if 0 + /** + * @todo It might be beneficial to implement timed wait for + * server synced state. + */ + if (allow_dirty_reads_ == false && + server_state_.state() != wsrep::server_state::s_synced) + { + return 1; + } +#endif // 0 + + if (transaction_.active() && + transaction_.state() == wsrep::transaction::s_must_abort) + { + // Rollback and cleanup will happen in after_command_before_result() + debug_log_state("before_statement_error"); + return 1; + } + debug_log_state("before_statement: success"); + return 0; +} + +int wsrep::client_state::after_statement() +{ + wsrep::unique_lock lock(mutex_); + debug_log_state("after_statement: enter"); + assert(state() == s_exec); + assert(mode() == m_local); + + if (transaction_.active() && + transaction_.state() == wsrep::transaction::s_must_abort) + { + lock.unlock(); + client_service_.bf_rollback(); + lock.lock(); + assert(transaction_.state() == wsrep::transaction::s_aborted); + // Error may be set already. For example, if fragment size + // exceeded the maximum size in certify_fragment(), then + // we already have wsrep::e_error_during_commit + if (current_error() == wsrep::e_success) + { + override_error(wsrep::e_deadlock_error); + } + } + lock.unlock(); + + (void)transaction_.after_statement(); + if (current_error() == wsrep::e_deadlock_error) + { + if (mode_ == m_local) + { + debug_log_state("after_statement: may_retry"); + return 1; + } + else + { + debug_log_state("after_statement: error"); + return 1; + } + } + debug_log_state("after_statement: success"); + return 0; +} + +////////////////////////////////////////////////////////////////////////////// +// Rollbacker synchronization // +////////////////////////////////////////////////////////////////////////////// + +void wsrep::client_state::sync_rollback_complete() +{ + wsrep::unique_lock lock(mutex_); + debug_log_state("sync_rollback_complete: enter"); + assert(state_ == s_idle && mode_ == m_local && + transaction_.state() == wsrep::transaction::s_aborted); + set_rollbacker_active(false); + cond_.notify_all(); + debug_log_state("sync_rollback_complete: leave"); +} + +void wsrep::client_state::wait_rollback_complete_and_acquire_ownership() +{ + wsrep::unique_lock lock(mutex_); + debug_log_state("wait_rollback_complete_and_acquire_ownership: enter"); + if (state_ == s_idle) + { + do_wait_rollback_complete_and_acquire_ownership(lock); + } + assert(state_ == s_exec); + debug_log_state("wait_rollback_complete_and_acquire_ownership: leave"); +} + +////////////////////////////////////////////////////////////////////////////// +// Streaming // +////////////////////////////////////////////////////////////////////////////// + +void wsrep::client_state::streaming_params( + enum wsrep::streaming_context::fragment_unit fragment_unit, + size_t fragment_size) +{ + assert(mode_ == m_local); + transaction_.streaming_context().params(fragment_unit, fragment_size); +} + +int wsrep::client_state::enable_streaming( + enum wsrep::streaming_context::fragment_unit + fragment_unit, + size_t fragment_size) +{ + assert(mode_ == m_local); + if (transaction_.is_streaming() && + transaction_.streaming_context().fragment_unit() != + fragment_unit) + { + wsrep::log_error() + << "Changing fragment unit for active streaming transaction " + << "not allowed"; + return 1; + } + transaction_.streaming_context().enable(fragment_unit, fragment_size); + return 0; +} + +void wsrep::client_state::disable_streaming() +{ + assert(mode_ == m_local); + assert(state_ == s_exec || state_ == s_quitting); + transaction_.streaming_context().disable(); +} + +////////////////////////////////////////////////////////////////////////////// +// TOI // +////////////////////////////////////////////////////////////////////////////// + +enum wsrep::provider::status +wsrep::client_state::poll_enter_toi( + wsrep::unique_lock& lock, + const wsrep::key_array& keys, + const wsrep::const_buffer& buffer, + wsrep::ws_meta& meta, + int flags, + std::chrono::time_point wait_until, + bool& timed_out) +{ + WSREP_LOG_DEBUG(debug_log_level(), + wsrep::log::debug_level_client_state, + "poll_enter_toi: " + << flags + << "," + << wait_until.time_since_epoch().count()); + enum wsrep::provider::status status; + timed_out = false; + wsrep::ws_meta poll_meta; // tmp var for polling, as enter_toi may clear meta arg on errors + do + { + lock.unlock(); + poll_meta = meta; + status = provider().enter_toi(id_, keys, buffer, poll_meta, flags); + if (status != wsrep::provider::success && + not poll_meta.gtid().is_undefined()) + { + // Successfully entered TOI, but the provider reported failure. + // This may happen for example if certification fails. + // Leave TOI before proceeding. + if (provider().leave_toi(id_, wsrep::mutable_buffer())) + { + wsrep::log_warning() + << "Failed to leave TOI after failure in " + << "poll_enter_toi()"; + } + poll_meta = wsrep::ws_meta(); + } + if (status == wsrep::provider::error_certification_failed || + status == wsrep::provider::error_connection_failed) + { + ::usleep(300000); + } + lock.lock(); + timed_out = !(wait_until.time_since_epoch().count() && + wsrep::clock::now() < wait_until); + } + while ((status == wsrep::provider::error_certification_failed || + status == wsrep::provider::error_connection_failed) && + not timed_out && + not client_service_.interrupted(lock)); + meta = poll_meta; + return status; +} + +void wsrep::client_state::enter_toi_common( + wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + toi_mode_ = mode_; + mode(lock, m_toi); +} + +int wsrep::client_state::enter_toi_local(const wsrep::key_array& keys, + const wsrep::const_buffer& buffer, + std::chrono::time_point wait_until) +{ + debug_log_state("enter_toi_local: enter"); + assert(state_ == s_exec); + assert(mode_ == m_local); + int ret; + + wsrep::unique_lock lock(mutex_); + + bool timed_out; + auto const status(poll_enter_toi( + lock, keys, buffer, + toi_meta_, + wsrep::provider::flag::start_transaction | + wsrep::provider::flag::commit, + wait_until, + timed_out)); + switch (status) + { + case wsrep::provider::success: + { + enter_toi_common(lock); + ret = 0; + break; + } + case wsrep::provider::error_certification_failed: + override_error(e_deadlock_error, status); + ret = 1; + break; + default: + if (timed_out) { + override_error(e_timeout_error); + } else { + override_error(e_error_during_commit, status); + } + ret = 1; + break; + } + + debug_log_state("enter_toi_local: leave"); + return ret; +} + +void wsrep::client_state::enter_toi_mode(const wsrep::ws_meta& ws_meta) +{ + debug_log_state("enter_toi_mode: enter"); + wsrep::unique_lock lock(mutex_); + assert(mode_ == m_high_priority); + enter_toi_common(lock); + toi_meta_ = ws_meta; + debug_log_state("enter_toi_mode: leave"); +} + +void wsrep::client_state::leave_toi_common() +{ + wsrep::unique_lock lock(mutex_); + mode(lock, toi_mode_); + toi_mode_ = m_undefined; + if (toi_meta_.gtid().is_undefined() == false) + { + update_last_written_gtid(toi_meta_.gtid()); + } + toi_meta_ = wsrep::ws_meta(); +} + +int wsrep::client_state::leave_toi_local(const wsrep::mutable_buffer& err) +{ + debug_log_state("leave_toi_local: enter"); + assert(toi_mode_ == m_local); + leave_toi_common(); + + debug_log_state("leave_toi_local: leave"); + return (provider().leave_toi(id_, err) == provider::success ? 0 : 1); +} + +void wsrep::client_state::leave_toi_mode() +{ + debug_log_state("leave_toi_mode: enter"); + assert(toi_mode_ == m_high_priority); + leave_toi_common(); + debug_log_state("leave_toi_mode: leave"); +} + +/////////////////////////////////////////////////////////////////////////////// +// RSU // +/////////////////////////////////////////////////////////////////////////////// + +int wsrep::client_state::begin_rsu(int timeout) +{ + if (server_state_.desync()) + { + wsrep::log_warning() << "Failed to desync server"; + return 1; + } + if (server_state_.server_service().wait_committing_transactions(timeout)) + { + wsrep::log_warning() << "RSU failed due to pending transactions"; + server_state_.resync(); + return 1; + } + wsrep::seqno pause_seqno(server_state_.pause()); + if (pause_seqno.is_undefined()) + { + wsrep::log_warning() << "Failed to pause provider"; + server_state_.resync(); + return 1; + } + wsrep::log_info() << "Provider paused at: " << pause_seqno; + wsrep::unique_lock lock(mutex_); + toi_mode_ = mode_; + mode(lock, m_rsu); + return 0; +} + +int wsrep::client_state::end_rsu() +{ + int ret(0); + try + { + server_state_.resume(); + server_state_.resync(); + } + catch (const wsrep::runtime_error& e) + { + wsrep::log_warning() << "End RSU failed: " << e.what(); + ret = 1; + } + wsrep::unique_lock lock(mutex_); + mode(lock, toi_mode_); + toi_mode_ = m_undefined; + return ret; +} + +/////////////////////////////////////////////////////////////////////////////// +// NBO // +/////////////////////////////////////////////////////////////////////////////// + +int wsrep::client_state::begin_nbo_phase_one( + const wsrep::key_array& keys, + const wsrep::const_buffer& buffer, + std::chrono::time_point wait_until) +{ + debug_log_state("begin_nbo_phase_one: enter"); + debug_log_keys(keys); + wsrep::unique_lock lock(mutex_); + assert(state_ == s_exec); + assert(mode_ == m_local); + assert(toi_mode_ == m_undefined); + + int ret; + bool timed_out; + auto const status(poll_enter_toi( + lock, keys, buffer, + toi_meta_, + wsrep::provider::flag::start_transaction, + wait_until, + timed_out)); + switch (status) + { + case wsrep::provider::success: + toi_mode_ = mode_; + mode(lock, m_nbo); + ret= 0; + break; + case wsrep::provider::error_certification_failed: + override_error(e_deadlock_error, status); + ret = 1; + break; + default: + if (timed_out) { + override_error(e_timeout_error); + } else { + override_error(e_error_during_commit, status); + } + ret = 1; + break; + } + + debug_log_state("begin_nbo_phase_one: leave"); + return ret; +} + +int wsrep::client_state::end_nbo_phase_one(const wsrep::mutable_buffer& err) +{ + debug_log_state("end_nbo_phase_one: enter"); + assert(state_ == s_exec); + assert(mode_ == m_nbo); + assert(in_toi()); + + enum wsrep::provider::status status(provider().leave_toi(id_, err)); + wsrep::unique_lock lock(mutex_); + int ret; + switch (status) + { + case wsrep::provider::success: + ret = 0; + break; + default: + override_error(e_error_during_commit, status); + ret = 1; + break; + } + nbo_meta_ = toi_meta_; + toi_meta_ = wsrep::ws_meta(); + toi_mode_ = m_undefined; + debug_log_state("end_nbo_phase_one: leave"); + return ret; +} + +int wsrep::client_state::enter_nbo_mode(const wsrep::ws_meta& ws_meta) +{ + assert(state_ == s_exec); + assert(mode_ == m_local); + assert(toi_mode_ == m_undefined); + wsrep::unique_lock lock(mutex_); + nbo_meta_ = ws_meta; + mode(lock, m_nbo); + return 0; +} + +int wsrep::client_state::begin_nbo_phase_two( + const wsrep::key_array& keys, + std::chrono::time_point wait_until) +{ + debug_log_state("begin_nbo_phase_two: enter"); + debug_log_keys(keys); + assert(state_ == s_exec); + assert(mode_ == m_nbo); + assert(toi_mode_ == m_undefined); + assert(!in_toi()); + + wsrep::unique_lock lock(mutex_); + // Note: nbo_meta_ is passed to enter_toi() as it is + // an input param containing gtid of NBO begin. + // Output stored in nbo_meta_ is copied to toi_meta_ for + // phase two end. + bool timed_out; + enum wsrep::provider::status status( + poll_enter_toi(lock, keys, + wsrep::const_buffer(), + nbo_meta_, + wsrep::provider::flag::commit, + wait_until, + timed_out)); + int ret; + switch (status) + { + case wsrep::provider::success: + ret= 0; + toi_meta_ = nbo_meta_; + toi_mode_ = m_local; + break; + case wsrep::provider::error_provider_failed: + override_error(e_interrupted_error, status); + ret= 1; + break; + default: + if (timed_out) + { + override_error(e_timeout_error, status); + } + else + { + override_error(e_error_during_commit, status); + } + ret= 1; + break; + } + + // Failed to grab TOI for completing NBO in order. This means that + // the operation cannot be ended in total order, so we end the + // NBO mode and let the DBMS to deal with the error. + if (ret) + { + mode(lock, m_local); + nbo_meta_ = wsrep::ws_meta(); + } + + debug_log_state("begin_nbo_phase_two: leave"); + return ret; +} + +int wsrep::client_state::end_nbo_phase_two(const wsrep::mutable_buffer& err) +{ + debug_log_state("end_nbo_phase_two: enter"); + assert(state_ == s_exec); + assert(mode_ == m_nbo); + assert(toi_mode_ == m_local); + assert(in_toi()); + enum wsrep::provider::status status( + provider().leave_toi(id_, err)); + wsrep::unique_lock lock(mutex_); + int ret; + switch (status) + { + case wsrep::provider::success: + ret = 0; + break; + default: + override_error(e_error_during_commit, status); + ret = 1; + break; + } + toi_meta_ = wsrep::ws_meta(); + toi_mode_ = m_undefined; + nbo_meta_ = wsrep::ws_meta(); + mode(lock, m_local); + debug_log_state("end_nbo_phase_two: leave"); + return ret; +} +/////////////////////////////////////////////////////////////////////////////// +// Misc // +/////////////////////////////////////////////////////////////////////////////// + +int wsrep::client_state::sync_wait(int timeout) +{ + std::pair result( + server_state_.causal_read(timeout)); + int ret(1); + switch (result.second) + { + case wsrep::provider::success: + sync_wait_gtid_ = result.first; + ret = 0; + break; + case wsrep::provider::error_not_implemented: + override_error(wsrep::e_not_supported_error); + break; + default: + override_error(wsrep::e_timeout_error); + break; + } + return ret; +} + +/////////////////////////////////////////////////////////////////////////////// +// Private // +/////////////////////////////////////////////////////////////////////////////// + +void wsrep::client_state::do_acquire_ownership( + wsrep::unique_lock& lock WSREP_UNUSED) +{ + assert(lock.owns_lock()); + // Be strict about client state for clients in local mode. The + // owning_thread_id_ is used to detect bugs which are caused by + // more than one thread operating the client state at the time, + // for example thread handling the client session and background + // rollbacker. + assert(state_ == s_idle || mode_ != m_local); + owning_thread_id_ = wsrep::this_thread::get_id(); +} + +void wsrep::client_state::do_wait_rollback_complete_and_acquire_ownership( + wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + assert(state_ == s_idle); + while (is_rollbacker_active()) + { + cond_.wait(lock); + } + do_acquire_ownership(lock); + state(lock, s_exec); +} + +void wsrep::client_state::update_last_written_gtid(const wsrep::gtid& gtid) +{ + assert(last_written_gtid_.is_undefined() || + (last_written_gtid_.id() == gtid.id() && + !(last_written_gtid_.seqno() > gtid.seqno()))); + last_written_gtid_ = gtid; +} + +void wsrep::client_state::debug_log_state(const char* context) const +{ + WSREP_LOG_DEBUG(debug_log_level(), + wsrep::log::debug_level_client_state, + context + << "(" << id_.get() + << "," << to_c_string(state_) + << "," << to_c_string(mode_) + << "," << wsrep::to_string(current_error_) + << "," << current_error_status_ + << ",toi: " << toi_meta_.seqno() + << ",nbo: " << nbo_meta_.seqno() << ")"); +} + +void wsrep::client_state::debug_log_keys(const wsrep::key_array& keys) const +{ + for (size_t i(0); i < keys.size(); ++i) + { + WSREP_LOG_DEBUG(debug_log_level(), + wsrep::log::debug_level_client_state, + "TOI keys: " + << " id: " << id_ + << "key: " << keys[i]); + } +} + +void wsrep::client_state::state( + wsrep::unique_lock& lock WSREP_UNUSED, + enum wsrep::client_state::state state) +{ + // Verify that the current thread has gained control to the + // connection by calling before_command() + assert(wsrep::this_thread::get_id() == owning_thread_id_); + assert(lock.owns_lock()); + static const char allowed[state_max_][state_max_] = + { + /* none idle exec result quit */ + { 0, 1, 0, 0, 0}, /* none */ + { 0, 0, 1, 0, 1}, /* idle */ + { 0, 0, 0, 1, 0}, /* exec */ + { 0, 1, 0, 0, 0}, /* result */ + { 1, 0, 0, 0, 0} /* quit */ + }; + if (!allowed[state_][state]) + { + wsrep::log_warning() << "client_state: Unallowed state transition: " + << state_ << " -> " << state; + assert(0); + } + state_hist_.push_back(state_); + state_ = state; + if (state_hist_.size() > 10) + { + state_hist_.erase(state_hist_.begin()); + } + +} + +void wsrep::client_state::mode( + wsrep::unique_lock& lock WSREP_UNUSED, + enum mode mode) +{ + assert(lock.owns_lock()); + + static const char allowed[n_modes_][n_modes_] = + { /* u l h t r n */ + { 0, 0, 0, 0, 0, 0 }, /* undefined */ + { 0, 0, 1, 1, 1, 1 }, /* local */ + { 0, 1, 0, 1, 0, 1 }, /* high prio */ + { 0, 1, 1, 0, 0, 0 }, /* toi */ + { 0, 1, 0, 0, 0, 0 }, /* rsu */ + { 0, 1, 1, 0, 0, 0 } /* nbo */ + }; + if (!allowed[mode_][mode]) + { + wsrep::log_warning() << "client_state: Unallowed mode transition: " + << mode_ << " -> " << mode; + assert(0); + } + mode_ = mode; +} diff --git a/wsrep-lib/src/exception.cpp b/wsrep-lib/src/exception.cpp new file mode 100644 index 00000000..c55ed641 --- /dev/null +++ b/wsrep-lib/src/exception.cpp @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2018 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/exception.hpp" + +bool wsrep::abort_on_exception(false); diff --git a/wsrep-lib/src/gtid.cpp b/wsrep-lib/src/gtid.cpp new file mode 100644 index 00000000..af32d524 --- /dev/null +++ b/wsrep-lib/src/gtid.cpp @@ -0,0 +1,90 @@ +/* + * Copyright (C) 2018 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/gtid.hpp" + +#include +#include +#include + +const wsrep::gtid wsrep::gtid::undefined_ = wsrep::gtid(); + +std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::gtid& gtid) +{ + return (os << gtid.id() << ":" << gtid.seqno()); +} + +std::istream& wsrep::operator>>(std::istream& is, wsrep::gtid& gtid) +{ + std::string id_str; + std::getline(is, id_str, ':'); + long long seq; + is >> seq; + if (!is) + { + is.clear(std::ios_base::failbit); + return is; + } + try + { + // wsrep::id constructor will throw if it cannot parse the + // id_str. + gtid = wsrep::gtid(wsrep::id(id_str), wsrep::seqno(seq)); + } + catch (const wsrep::runtime_error& e) + { + // Formatting or extraction error. Clear the istream state and + // set failibit. + is.clear(std::ios_base::failbit); + } + return is; +} + +ssize_t wsrep::scan_from_c_str( + const char* buf, size_t buf_len, wsrep::gtid& gtid) +{ + std::istringstream is(std::string(buf, buf_len)); + is >> gtid; + // Some failure occurred + if (!is) + { + return -EINVAL; + } + // Whole string was consumed without failures + if (is.eof()) + { + return static_cast(buf_len); + } + // The string was not consumed completely, return current position + // of the istream. + return static_cast(is.tellg()); +} + +ssize_t wsrep::print_to_c_str( + const wsrep::gtid& gtid, char* buf, size_t buf_len) +{ + std::ostringstream os; + os << gtid; + if (os.str().size() > buf_len) + { + return -ENOBUFS; + } + std::strncpy(buf, os.str().c_str(), os.str().size()); + return static_cast(os.str().size()); +} diff --git a/wsrep-lib/src/id.cpp b/wsrep-lib/src/id.cpp new file mode 100644 index 00000000..2da188fc --- /dev/null +++ b/wsrep-lib/src/id.cpp @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2018 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/id.hpp" +#include "uuid.hpp" + +#include +#include +#include + +const wsrep::id wsrep::id::undefined_ = wsrep::id(); + +wsrep::id::id(const std::string& str) + : data_() +{ + wsrep::uuid_t wsrep_uuid; + + if (str.size() == WSREP_LIB_UUID_STR_LEN && + wsrep::uuid_scan(str.c_str(), str.size(), &wsrep_uuid) == + WSREP_LIB_UUID_STR_LEN) + { + std::memcpy(data_.buf, wsrep_uuid.data, sizeof(data_.buf)); + } + else if (str.size() <= 16) + { + std::memcpy(data_.buf, str.c_str(), str.size()); + } + else + { + std::ostringstream os; + os << "String '" << str + << "' does not contain UUID or is longer thatn 16 bytes"; + throw wsrep::runtime_error(os.str()); + } +} + +std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::id& id) +{ + const char* ptr(static_cast(id.data())); + size_t size(id.size()); + if (static_cast(std::count_if(ptr, ptr + size, ::isalnum)) == size) + { + return (os << std::string(ptr, size)); + } + else + { + char uuid_str[WSREP_LIB_UUID_STR_LEN + 1]; + wsrep::uuid_t uuid; + std::memcpy(uuid.data, ptr, sizeof(uuid.data)); + if (wsrep::uuid_print(&uuid, uuid_str, sizeof(uuid_str)) < 0) + { + throw wsrep::runtime_error("Could not print uuid"); + } + uuid_str[WSREP_LIB_UUID_STR_LEN] = '\0'; + return (os << uuid_str); + } +} + +std::istream& wsrep::operator>>(std::istream& is, wsrep::id& id) +{ + std::string id_str; + std::getline(is, id_str); + id = wsrep::id(id_str); + return is; +} diff --git a/wsrep-lib/src/key.cpp b/wsrep-lib/src/key.cpp new file mode 100644 index 00000000..fb94fefc --- /dev/null +++ b/wsrep-lib/src/key.cpp @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2018 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/key.hpp" +#include +#include + +namespace +{ + void print_key_part(std::ostream& os, const void* ptr, size_t len) + { + std::ios::fmtflags flags_save(os.flags()); + os << len << ": "; + for (size_t i(0); i < len; ++i) + { + os << std::hex + << std::setfill('0') + << std::setw(2) + << static_cast( + *(reinterpret_cast(ptr) + i)) << " "; + } + os.flags(flags_save); + } +} + +std::ostream& wsrep::operator<<(std::ostream& os, + enum wsrep::key::type key_type) +{ + switch (key_type) + { + case wsrep::key::shared: os << "shared"; break; + case wsrep::key::reference: os << "reference"; break; + case wsrep::key::update: os << "update"; break; + case wsrep::key::exclusive: os << "exclusive"; break; + default: os << "unknown"; break; + } + return os; +} + +std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::key& key) +{ + os << "type: " << key.type(); + for (size_t i(0); i < key.size(); ++i) + { + os << "\n "; + print_key_part(os, key.key_parts()[i].data(), key.key_parts()[i].size()); + } + return os; +} diff --git a/wsrep-lib/src/logger.cpp b/wsrep-lib/src/logger.cpp new file mode 100644 index 00000000..058a42a0 --- /dev/null +++ b/wsrep-lib/src/logger.cpp @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2018 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/logger.hpp" + +#include + +std::ostream& wsrep::log::os_ = std::cout; +static wsrep::default_mutex log_mutex_; +wsrep::mutex& wsrep::log::mutex_ = log_mutex_; +wsrep::log::logger_fn_type wsrep::log::logger_fn_ = 0; +std::atomic_int wsrep::log::debug_log_level_(0); + +void wsrep::log::logger_fn(wsrep::log::logger_fn_type logger_fn) +{ + logger_fn_ = logger_fn; +} + +void wsrep::log::debug_log_level(int debug_log_level) +{ + debug_log_level_.store(debug_log_level, std::memory_order_relaxed); +} + +int wsrep::log::debug_log_level() +{ + return debug_log_level_.load(std::memory_order_relaxed); +} diff --git a/wsrep-lib/src/provider.cpp b/wsrep-lib/src/provider.cpp new file mode 100644 index 00000000..43ded86d --- /dev/null +++ b/wsrep-lib/src/provider.cpp @@ -0,0 +1,170 @@ +/* + * Copyright (C) 2018 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/provider.hpp" +#include "wsrep/logger.hpp" + +#include "wsrep_provider_v26.hpp" + +#include +#include + +wsrep::provider* wsrep::provider::make_provider( + wsrep::server_state& server_state, + const std::string& provider_spec, + const std::string& provider_options, + const wsrep::provider::services& services) +{ + try + { + return new wsrep::wsrep_provider_v26( + server_state, provider_options, provider_spec, services); + } + catch (const wsrep::runtime_error& e) + { + wsrep::log_error() << "Failed to create a new provider '" + << provider_spec << "'" + << " with options '" << provider_options + << "': " << e.what(); + } + catch (...) + { + wsrep::log_error() << "Caught unknown exception when trying to " + << "create a new provider '" + << provider_spec << "'" + << " with options '" << provider_options; + } + return 0; +} + +std::string +wsrep::provider::to_string(enum wsrep::provider::status const val) +{ + switch(val) + { + case success: + return "Success"; + case error_warning: + return "Warning"; + case error_transaction_missing: + return "Transaction not registered with provider"; + case error_certification_failed: + return "Certification failed"; + case error_bf_abort: + return "Transaction was BF aborted"; + case error_size_exceeded: + return "Transaction size exceeded"; + case error_connection_failed: + return "Not connected to Primary Component"; + case error_provider_failed: + return "Provider in bad state, needs to be reinitialized."; + case error_fatal: + return "Fatal error, must abort."; + case error_not_implemented: + return "Function not implemented"; + case error_not_allowed: + return "Operation not allowed"; + case error_unknown: + return "Unknown error"; + } + + assert(0); + + std::ostringstream os; + os << "Invalid error code: " << val; + return os.str(); +} + +std::string wsrep::provider::capability::str(int caps) +{ + std::ostringstream os; + +#define WSREP_PRINT_CAPABILITY(cap_value, cap_string) \ + if (caps & cap_value) { \ + os << cap_string ", "; \ + caps &= ~cap_value; \ + } + + WSREP_PRINT_CAPABILITY(multi_master, "MULTI-MASTER"); + WSREP_PRINT_CAPABILITY(certification, "CERTIFICATION"); + WSREP_PRINT_CAPABILITY(parallel_applying, "PARALLEL_APPLYING"); + WSREP_PRINT_CAPABILITY(transaction_replay, "REPLAY"); + WSREP_PRINT_CAPABILITY(isolation, "ISOLATION"); + WSREP_PRINT_CAPABILITY(pause, "PAUSE"); + WSREP_PRINT_CAPABILITY(causal_reads, "CAUSAL_READ"); + WSREP_PRINT_CAPABILITY(causal_transaction, "CAUSAL_TRX"); + WSREP_PRINT_CAPABILITY(incremental_writeset, "INCREMENTAL_WS"); + WSREP_PRINT_CAPABILITY(session_locks, "SESSION_LOCK"); + WSREP_PRINT_CAPABILITY(distributed_locks, "DISTRIBUTED_LOCK"); + WSREP_PRINT_CAPABILITY(consistency_check, "CONSISTENCY_CHECK"); + WSREP_PRINT_CAPABILITY(unordered, "UNORDERED"); + WSREP_PRINT_CAPABILITY(annotation, "ANNOTATION"); + WSREP_PRINT_CAPABILITY(preordered, "PREORDERED"); + WSREP_PRINT_CAPABILITY(streaming, "STREAMING"); + WSREP_PRINT_CAPABILITY(snapshot, "READ_VIEW"); + WSREP_PRINT_CAPABILITY(nbo, "NBO"); + +#undef WSREP_PRINT_CAPABILITY + + if (caps) + { + assert(caps == 0); // to catch missed capabilities + os << "UNKNOWN(" << caps << ") "; + } + + std::string ret(os.str()); + if (ret.size() > 2) ret.erase(ret.size() - 2); + return ret; +} + +std::string wsrep::flags_to_string(int flags) +{ + std::ostringstream oss; + if (flags & provider::flag::start_transaction) + oss << "start_transaction | "; + if (flags & provider::flag::commit) + oss << "commit | "; + if (flags & provider::flag::rollback) + oss << "rollback | "; + if (flags & provider::flag::isolation) + oss << "isolation | "; + if (flags & provider::flag::pa_unsafe) + oss << "pa_unsafe | "; + if (flags & provider::flag::prepare) + oss << "prepare | "; + if (flags & provider::flag::snapshot) + oss << "read_view | "; + if (flags & provider::flag::implicit_deps) + oss << "implicit_deps | "; + + std::string ret(oss.str()); + if (ret.size() > 3) ret.erase(ret.size() - 3); + return ret; +} + +std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::ws_meta& ws_meta) +{ + os << "gtid: " << ws_meta.gtid() + << " server_id: " << ws_meta.server_id() + << " client_id: " << ws_meta.client_id() + << " trx_id: " << ws_meta.transaction_id() + << " flags: " << ws_meta.flags() + << " (" << wsrep::flags_to_string(ws_meta.flags()) << ")"; + return os; +} diff --git a/wsrep-lib/src/seqno.cpp b/wsrep-lib/src/seqno.cpp new file mode 100644 index 00000000..19ff567d --- /dev/null +++ b/wsrep-lib/src/seqno.cpp @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2018 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/seqno.hpp" +#include + +std::ostream& wsrep::operator<<(std::ostream& os, wsrep::seqno seqno) +{ + return (os << seqno.get()); +} diff --git a/wsrep-lib/src/server_state.cpp b/wsrep-lib/src/server_state.cpp new file mode 100644 index 00000000..246f7142 --- /dev/null +++ b/wsrep-lib/src/server_state.cpp @@ -0,0 +1,1582 @@ +/* + * Copyright (C) 2018-2019 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/server_state.hpp" +#include "wsrep/client_state.hpp" +#include "wsrep/server_service.hpp" +#include "wsrep/high_priority_service.hpp" +#include "wsrep/transaction.hpp" +#include "wsrep/view.hpp" +#include "wsrep/logger.hpp" +#include "wsrep/compiler.hpp" +#include "wsrep/id.hpp" + +#include +#include +#include + + +////////////////////////////////////////////////////////////////////////////// +// Helpers // +////////////////////////////////////////////////////////////////////////////// + + +// +// This method is used to deal with historical burden of several +// ways to bootstrap the cluster. Bootstrap happens if +// +// * bootstrap option is given +// * cluster_address is "gcomm://" (Galera provider) +// +static bool is_bootstrap(const std::string& cluster_address, bool bootstrap) +{ + return (bootstrap || cluster_address == "gcomm://"); +} + +// Helper method to provide detailed error message if transaction +// adopt for fragment removal fails. +static void log_adopt_error(const wsrep::transaction& transaction) +{ + wsrep::log_warning() << "Adopting a transaction (" + << transaction.server_id() << "," << transaction.id() + << ") for rollback failed, " + << "this may leave stale entries to streaming log " + << "which may need to be removed manually."; +} + +// resolve which of the two errors return to caller +static inline int resolve_return_error(bool const vote, + int const vote_err, + int const apply_err) +{ + if (vote) return vote_err; + return vote_err != 0 ? vote_err : apply_err; +} + +static void +discard_streaming_applier(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + wsrep::high_priority_service* streaming_applier, + const wsrep::ws_meta& ws_meta) +{ + server_state.stop_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id()); + server_state.server_service().release_high_priority_service( + streaming_applier); + high_priority_service.store_globals(); +} + +static int apply_fragment(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + wsrep::high_priority_service* streaming_applier, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) +{ + int ret(0); + int apply_err; + wsrep::mutable_buffer err; + { + wsrep::high_priority_switch sw(high_priority_service, + *streaming_applier); + apply_err = streaming_applier->apply_write_set(ws_meta, data, err); + if (!apply_err) + { + assert(err.size() == 0); + streaming_applier->after_apply(); + } + else + { + bool const remove_fragments(streaming_applier->transaction( + ).streaming_context().fragments().size() > 0); + ret = streaming_applier->rollback(ws_handle, ws_meta); + ret = ret || (streaming_applier->after_apply(), 0); + + if (remove_fragments) + { + ret = ret || streaming_applier->start_transaction(ws_handle, + ws_meta); + ret = ret || (streaming_applier->adopt_apply_error(err), 0); + ret = ret || streaming_applier->remove_fragments(ws_meta); + ret = ret || streaming_applier->commit(ws_handle, ws_meta); + ret = ret || (streaming_applier->after_apply(), 0); + } + else + { + ret = streaming_applier->log_dummy_write_set(ws_handle, + ws_meta, err); + } + } + } + + if (!ret) + { + if (!apply_err) + { + high_priority_service.debug_crash("crash_apply_cb_before_append_frag"); + const wsrep::xid xid(streaming_applier->transaction().xid()); + ret = high_priority_service.append_fragment_and_commit( + ws_handle, ws_meta, data, xid); + high_priority_service.debug_crash("crash_apply_cb_after_append_frag"); + ret = ret || (high_priority_service.after_apply(), 0); + } + else + { + discard_streaming_applier(server_state, + high_priority_service, + streaming_applier, + ws_meta); + ret = resolve_return_error(err.size() > 0, ret, apply_err); + } + } + + return ret; +} + +static int commit_fragment(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + wsrep::high_priority_service* streaming_applier, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) +{ + int ret(0); + { + wsrep::high_priority_switch sw( + high_priority_service, *streaming_applier); + wsrep::mutable_buffer err; + int const apply_err( + streaming_applier->apply_write_set(ws_meta, data, err)); + if (apply_err) + { + assert(streaming_applier->transaction( + ).streaming_context().fragments().size() > 0); + ret = streaming_applier->rollback(ws_handle, ws_meta); + ret = ret || (streaming_applier->after_apply(), 0); + ret = ret || streaming_applier->start_transaction( + ws_handle, ws_meta); + ret = ret || (streaming_applier->adopt_apply_error(err),0); + } + else + { + assert(err.size() == 0); + } + + const wsrep::transaction& trx(streaming_applier->transaction()); + // Fragment removal for XA is going to happen in after_commit + if (trx.state() != wsrep::transaction::s_prepared) + { + streaming_applier->debug_crash( + "crash_apply_cb_before_fragment_removal"); + + ret = ret || streaming_applier->remove_fragments(ws_meta); + + streaming_applier->debug_crash( + "crash_apply_cb_after_fragment_removal"); + } + + streaming_applier->debug_crash( + "crash_commit_cb_before_last_fragment_commit"); + ret = ret || streaming_applier->commit(ws_handle, ws_meta); + streaming_applier->debug_crash( + "crash_commit_cb_last_fragment_commit_success"); + ret = ret || (streaming_applier->after_apply(), 0); + ret = resolve_return_error(err.size() > 0, ret, apply_err); + } + + if (!ret) + { + discard_streaming_applier(server_state, high_priority_service, + streaming_applier, ws_meta); + } + + return ret; +} + +static int rollback_fragment(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + wsrep::high_priority_service* streaming_applier, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) +{ + int ret(0); + int adopt_error(0); + bool const remove_fragments(streaming_applier->transaction(). + streaming_context().fragments().size() > 0); + // If fragment removal is needed, adopt transaction state + // and start a transaction for it. + if (remove_fragments && + (adopt_error = high_priority_service.adopt_transaction( + streaming_applier->transaction()))) + { + log_adopt_error(streaming_applier->transaction()); + } + // Even if the adopt above fails we roll back the streaming transaction. + // Adopt failure will leave stale entries in streaming log which can + // be removed manually. + wsrep::const_buffer no_error; + { + wsrep::high_priority_switch ws( + high_priority_service, *streaming_applier); + // Streaming applier rolls back out of order. Fragment + // removal grabs commit order below. + ret = streaming_applier->rollback(wsrep::ws_handle(), wsrep::ws_meta()); + ret = ret || (streaming_applier->after_apply(), 0); + } + + if (!ret) + { + discard_streaming_applier(server_state, high_priority_service, + streaming_applier, ws_meta); + + if (adopt_error == 0) + { + if (remove_fragments) + { + ret = high_priority_service.remove_fragments(ws_meta); + ret = ret || high_priority_service.commit(ws_handle, ws_meta); + ret = ret || (high_priority_service.after_apply(), 0); + } + else + { + if (ws_meta.ordered()) + { + wsrep::mutable_buffer no_error; + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta, no_error); + } + } + } + } + return ret; +} + +static int apply_write_set(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) +{ + int ret(0); + if (wsrep::rolls_back_transaction(ws_meta.flags())) + { + wsrep::mutable_buffer no_error; + if (wsrep::starts_transaction(ws_meta.flags())) + { + // No transaction existed before, log a dummy write set + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta, no_error); + } + else + { + wsrep::high_priority_service* sa( + server_state.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id())); + if (sa == 0) + { + // It is a known limitation that galera provider + // cannot always determine if certification test + // for interrupted transaction will pass or fail + // (see comments in transaction::certify_fragment()). + // As a consequence, unnecessary rollback fragments + // may be delivered here. The message below has + // been intentionally turned into a debug message, + // rather than warning. + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_server_state, + "Could not find applier context for " + << ws_meta.server_id() + << ": " << ws_meta.transaction_id()); + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta, no_error); + } + else + { + // rollback_fragment() consumes sa + ret = rollback_fragment(server_state, + high_priority_service, + sa, + ws_handle, + ws_meta); + } + } + } + else if (wsrep::starts_transaction(ws_meta.flags()) && + wsrep::commits_transaction(ws_meta.flags())) + { + ret = high_priority_service.start_transaction(ws_handle, ws_meta); + if (!ret) + { + wsrep::mutable_buffer err; + int const apply_err(high_priority_service.apply_write_set( + ws_meta, data, err)); + if (!apply_err) + { + assert(err.size() == 0); + ret = high_priority_service.commit(ws_handle, ws_meta); + ret = ret || (high_priority_service.after_apply(), 0); + } + else + { + ret = high_priority_service.rollback(ws_handle, ws_meta); + ret = ret || (high_priority_service.after_apply(), 0); + ret = ret || high_priority_service.log_dummy_write_set( + ws_handle, ws_meta, err); + ret = resolve_return_error(err.size() > 0, ret, apply_err); + } + } + } + else if (wsrep::starts_transaction(ws_meta.flags())) + { + assert(server_state.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id()) == 0); + wsrep::high_priority_service* sa( + server_state.server_service().streaming_applier_service( + high_priority_service)); + server_state.start_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id(), sa); + sa->start_transaction(ws_handle, ws_meta); + ret = apply_fragment(server_state, + high_priority_service, + sa, + ws_handle, + ws_meta, + data); + } + else if (ws_meta.flags() == 0 || ws_meta.flags() == wsrep::provider::flag::pa_unsafe || + wsrep::prepares_transaction(ws_meta.flags())) + { + wsrep::high_priority_service* sa( + server_state.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id())); + if (sa == 0) + { + // It is possible that rapid group membership changes + // may cause streaming transaction be rolled back before + // commit fragment comes in. Although this is a valid + // situation, log a warning if a sac cannot be found as + // it may be an indication of a bug too. + wsrep::log_warning() << "Could not find applier context for " + << ws_meta.server_id() + << ": " << ws_meta.transaction_id(); + wsrep::mutable_buffer no_error; + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta, no_error); + } + else + { + sa->next_fragment(ws_meta); + ret = apply_fragment(server_state, + high_priority_service, + sa, + ws_handle, + ws_meta, + data); + } + } + else if (wsrep::commits_transaction(ws_meta.flags())) + { + if (high_priority_service.is_replaying()) + { + wsrep::mutable_buffer unused; + ret = high_priority_service.start_transaction( + ws_handle, ws_meta) || + high_priority_service.apply_write_set(ws_meta, data, unused) || + high_priority_service.commit(ws_handle, ws_meta); + } + else + { + wsrep::high_priority_service* sa( + server_state.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id())); + if (sa == 0) + { + // It is possible that rapid group membership changes + // may cause streaming transaction be rolled back before + // commit fragment comes in. Although this is a valid + // situation, log a warning if a sac cannot be found as + // it may be an indication of a bug too. + wsrep::log_warning() + << "Could not find applier context for " + << ws_meta.server_id() + << ": " << ws_meta.transaction_id(); + wsrep::mutable_buffer no_error; + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta, no_error); + } + else + { + // Commit fragment consumes sa + sa->next_fragment(ws_meta); + ret = commit_fragment(server_state, + high_priority_service, + sa, + ws_handle, + ws_meta, + data); + } + } + } + else + { + assert(0); + } + if (ret) + { + wsrep::log_error() << "Failed to apply write set: " << ws_meta; + } + return ret; +} + +static int apply_toi(wsrep::provider& provider, + wsrep::high_priority_service& high_priority_service, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) +{ + if (wsrep::starts_transaction(ws_meta.flags()) && + wsrep::commits_transaction(ws_meta.flags())) + { + // + // Regular TOI. + // + provider.commit_order_enter(ws_handle, ws_meta); + wsrep::mutable_buffer err; + int const apply_err(high_priority_service.apply_toi(ws_meta,data,err)); + int const vote_err(provider.commit_order_leave(ws_handle, ws_meta,err)); + return resolve_return_error(err.size() > 0, vote_err, apply_err); + } + else if (wsrep::starts_transaction(ws_meta.flags())) + { + provider.commit_order_enter(ws_handle, ws_meta); + wsrep::mutable_buffer err; + int const apply_err(high_priority_service.apply_nbo_begin(ws_meta, data, err)); + int const vote_err(provider.commit_order_leave(ws_handle, ws_meta, err)); + return resolve_return_error(err.size() > 0, vote_err, apply_err); + } + else if (wsrep::commits_transaction(ws_meta.flags())) + { + // NBO end event is ignored here, both local and applied + // have NBO end handled via local TOI calls. + provider.commit_order_enter(ws_handle, ws_meta); + wsrep::mutable_buffer err; + provider.commit_order_leave(ws_handle, ws_meta, err); + return 0; + } + else + { + assert(0); + return 0; + } +} + +////////////////////////////////////////////////////////////////////////////// +// Server State // +////////////////////////////////////////////////////////////////////////////// + +int wsrep::server_state::load_provider( + const std::string& provider_spec, const std::string& provider_options, + const wsrep::provider::services& services) +{ + wsrep::log_info() << "Loading provider " << provider_spec + << " initial position: " << initial_position_; + + provider_ = wsrep::provider::make_provider(*this, + provider_spec, + provider_options, + services); + return (provider_ ? 0 : 1); +} + +void wsrep::server_state::unload_provider() +{ + delete provider_; + provider_ = 0; +} + +int wsrep::server_state::connect(const std::string& cluster_name, + const std::string& cluster_address, + const std::string& state_donor, + bool bootstrap) +{ + bootstrap_ = is_bootstrap(cluster_address, bootstrap); + wsrep::log_info() << "Connecting with bootstrap option: " << bootstrap_; + return provider().connect(cluster_name, cluster_address, state_donor, + bootstrap_); +} + +int wsrep::server_state::disconnect() +{ + { + wsrep::unique_lock lock(mutex_); + // In case of failure situations which are caused by provider + // being shut down some failing operation may also try to shut + // down the replication. Check the state here and + // return success if the provider disconnect is already in progress + // or has completed. + if (state(lock) == s_disconnecting || state(lock) == s_disconnected) + { + return 0; + } + state(lock, s_disconnecting); + interrupt_state_waiters(lock); + } + return provider().disconnect(); +} + +wsrep::server_state::~server_state() +{ + delete provider_; +} + +std::vector +wsrep::server_state::status() const +{ + return provider().status(); +} + + +wsrep::seqno wsrep::server_state::pause() +{ + wsrep::unique_lock lock(mutex_); + // Disallow concurrent calls to pause to in order to have non-concurrent + // access to desynced_on_pause_ which is checked in resume() call. + wsrep::log_info() << "pause"; + while (pause_count_ > 0) + { + cond_.wait(lock); + } + ++pause_count_; + assert(pause_seqno_.is_undefined()); + lock.unlock(); + pause_seqno_ = provider().pause(); + lock.lock(); + if (pause_seqno_.is_undefined()) + { + --pause_count_; + } + return pause_seqno_; +} + +void wsrep::server_state::resume() +{ + wsrep::unique_lock lock(mutex_); + wsrep::log_info() << "resume"; + assert(pause_seqno_.is_undefined() == false); + assert(pause_count_ == 1); + if (provider().resume()) + { + throw wsrep::runtime_error("Failed to resume provider"); + } + pause_seqno_ = wsrep::seqno::undefined(); + --pause_count_; + cond_.notify_all(); +} + +wsrep::seqno wsrep::server_state::desync_and_pause() +{ + wsrep::log_info() << "Desyncing and pausing the provider"; + // Temporary variable to store desync() return status. This will be + // assigned to desynced_on_pause_ after pause() call to prevent + // concurrent access to member variable desynced_on_pause_. + bool desync_successful; + if (desync()) + { + // Desync may give transient error if the provider cannot + // communicate with the rest of the cluster. However, this + // error can be tolerated because if the provider can be + // paused successfully below. + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_server_state, + "Failed to desync server before pause"); + desync_successful = false; + } + else + { + desync_successful = true; + } + wsrep::seqno ret(pause()); + if (ret.is_undefined()) + { + wsrep::log_warning() << "Failed to pause provider"; + resync(); + return wsrep::seqno::undefined(); + } + else + { + desynced_on_pause_ = desync_successful; + } + wsrep::log_info() << "Provider paused at: " << ret; + return ret; +} + +void wsrep::server_state::resume_and_resync() +{ + wsrep::log_info() << "Resuming and resyncing the provider"; + try + { + // Assign desynced_on_pause_ to local variable before resuming + // in order to avoid concurrent access to desynced_on_pause_ member + // variable. + bool do_resync = desynced_on_pause_; + desynced_on_pause_ = false; + resume(); + if (do_resync) + { + resync(); + } + } + catch (const wsrep::runtime_error& e) + { + wsrep::log_warning() + << "Resume and resync failed, server may have to be restarted"; + } +} + +std::string wsrep::server_state::prepare_for_sst() +{ + wsrep::unique_lock lock(mutex_); + state(lock, s_joiner); + lock.unlock(); + return server_service_.sst_request(); +} + +int wsrep::server_state::start_sst(const std::string& sst_request, + const wsrep::gtid& gtid, + bool bypass) +{ + wsrep::unique_lock lock(mutex_); + state(lock, s_donor); + int ret(0); + lock.unlock(); + if (server_service_.start_sst(sst_request, gtid, bypass)) + { + lock.lock(); + wsrep::log_warning() << "SST start failed"; + state(lock, s_synced); + ret = 1; + } + return ret; +} + +void wsrep::server_state::sst_sent(const wsrep::gtid& gtid, int error) +{ + if (0 == error) + wsrep::log_info() << "SST sent: " << gtid; + else + wsrep::log_info() << "SST sending failed: " << error; + + wsrep::unique_lock lock(mutex_); + state(lock, s_joined); + lock.unlock(); + enum provider::status const retval(provider().sst_sent(gtid, error)); + if (retval != provider::success) + { + std::string msg("wsrep::sst_sent() returned an error: "); + msg += wsrep::provider::to_string(retval); + server_service_.log_message(wsrep::log::warning, msg.c_str()); + } +} + +void wsrep::server_state::sst_received(wsrep::client_service& cs, + int const error) +{ + wsrep::log_info() << "SST received"; + wsrep::gtid gtid(wsrep::gtid::undefined()); + wsrep::unique_lock lock(mutex_); + assert(state_ == s_joiner || state_ == s_initialized); + + // Run initialization only if the SST was successful. + // In case of SST failure the system is in undefined state + // may not be recoverable. + if (error == 0) + { + if (server_service_.sst_before_init()) + { + if (init_initialized_ == false) + { + state(lock, s_initializing); + lock.unlock(); + server_service_.debug_sync("on_view_wait_initialized"); + lock.lock(); + wait_until_state(lock, s_initialized); + assert(init_initialized_); + } + } + state(lock, s_joined); + lock.unlock(); + + if (id_.is_undefined()) + { + assert(0); + throw wsrep::runtime_error( + "wsrep::sst_received() called before connection to cluster"); + } + + gtid = server_service_.get_position(cs); + wsrep::log_info() << "Recovered position from storage: " << gtid; + wsrep::view const v(server_service_.get_view(cs, id_)); + wsrep::log_info() << "Recovered view from SST:\n" << v; + + /* + * If the state id from recovered view has undefined ID, we may + * be upgrading from earlier version which does not provide + * view stored in stable storage. In this case we skip + * sanity checks and assigning the current view and wait + * until the first view delivery. + */ + if (v.state_id().id().is_undefined() == false) + { + if (v.state_id().id() != gtid.id() || + v.state_id().seqno() > gtid.seqno()) + { + /* Since IN GENERAL we may not be able to recover SST GTID from + * the state data, we have to rely on SST script passing the + * GTID value explicitly. + * Here we check if the passed GTID makes any sense: it should + * have the same UUID and greater or equal seqno than the last + * logged view. */ + std::ostringstream msg; + msg << "SST script passed bogus GTID: " << gtid + << ". Preceding view GTID: " << v.state_id(); + throw wsrep::runtime_error(msg.str()); + } + + if (current_view_.status() == wsrep::view::primary) + { + previous_primary_view_ = current_view_; + } + current_view_ = v; + server_service_.log_view(NULL /* this view is stored already */, v); + } + else + { + wsrep::log_warning() + << "View recovered from stable storage was empty. If the " + << "server is doing rolling upgrade from previous version " + << "which does not support storing view info into stable " + << "storage, this is ok. Otherwise this may be a sign of " + << "malfunction."; + } + lock.lock(); + recover_streaming_appliers_if_not_recovered(lock, cs); + lock.unlock(); + } + + enum provider::status const retval(provider().sst_received(gtid, error)); + if (retval != provider::success) + { + std::string msg("wsrep::sst_received() failed: "); + msg += wsrep::provider::to_string(retval); + throw wsrep::runtime_error(msg); + } +} + +void wsrep::server_state::initialized() +{ + wsrep::unique_lock lock(mutex_); + wsrep::log_info() << "Server initialized"; + init_initialized_ = true; + if (server_service_.sst_before_init()) + { + state(lock, s_initialized); + } + else + { + state(lock, s_initializing); + state(lock, s_initialized); + } +} + +void wsrep::server_state::last_committed_gtid(const wsrep::gtid& gtid) +{ + wsrep::unique_lock lock(mutex_); + assert(last_committed_gtid_.is_undefined() || + last_committed_gtid_.seqno() + 1 == gtid.seqno()); + last_committed_gtid_ = gtid; + cond_.notify_all(); +} + +wsrep::gtid wsrep::server_state::last_committed_gtid() const +{ + wsrep::unique_lock lock(mutex_); + return last_committed_gtid_; +} + +enum wsrep::provider::status +wsrep::server_state::wait_for_gtid(const wsrep::gtid& gtid, int timeout) + const +{ + return provider().wait_for_gtid(gtid, timeout); +} + +int +wsrep::server_state::set_encryption_key(std::vector& key) +{ + encryption_key_ = key; + if (state_ != s_disconnected) + { + wsrep::const_buffer const key(encryption_key_.data(), + encryption_key_.size()); + enum provider::status const retval(provider_->enc_set_key(key)); + if (retval != provider::success) + { + wsrep::log_error() << "Failed to set encryption key: " + << provider::to_string(retval); + return 1; + } + } + return 0; +} + +std::pair +wsrep::server_state::causal_read(int timeout) const +{ + return provider().causal_read(timeout); +} + +void wsrep::server_state::on_connect(const wsrep::view& view) +{ + // Sanity checks + if (view.own_index() < 0 || + size_t(view.own_index()) >= view.members().size()) + { + std::ostringstream os; + os << "Invalid view on connect: own index out of range: " << view; +#ifndef NDEBUG + wsrep::log_error() << os.str(); + assert(0); +#endif + throw wsrep::runtime_error(os.str()); + } + + const size_t own_index(static_cast(view.own_index())); + if (id_.is_undefined() == false && id_ != view.members()[own_index].id()) + { + std::ostringstream os; + os << "Connection in connected state.\n" + << "Connected view:\n" << view + << "Previous view:\n" << current_view_ + << "Current own ID: " << id_; +#ifndef NDEBUG + wsrep::log_error() << os.str(); + assert(0); +#endif + throw wsrep::runtime_error(os.str()); + } + else + { + id_ = view.members()[own_index].id(); + } + + wsrep::log_info() << "Server " + << name_ + << " connected to cluster at position " + << view.state_id() + << " with ID " + << id_; + + wsrep::unique_lock lock(mutex_); + connected_gtid_ = view.state_id(); + state(lock, s_connected); +} + +void wsrep::server_state::on_primary_view( + const wsrep::view& view WSREP_UNUSED, + wsrep::high_priority_service* high_priority_service) +{ + wsrep::unique_lock lock(mutex_); + assert(view.final() == false); + // + // Reached primary from connected state. This may mean the following + // + // 1) Server was joined to the cluster and got SST transfer + // 2) Server was partitioned from the cluster and got back + // 3) A new cluster was bootstrapped from non-prim cluster + // + // There is no enough information here what was the cause + // of the primary component, so we need to walk through + // all states leading to joined to notify possible state + // waiters in other threads. + // + if (server_service_.sst_before_init()) + { + if (state_ == s_connected) + { + state(lock, s_joiner); + // We need to assign init_initialized_ here to local + // variable. If the value here was false, we need to skip + // the initializing -> initialized -> joined state cycle + // below. However, if we don't assign the value to + // local, it is possible that the main thread gets control + // between changing the state to initializing and checking + // initialized flag, which may cause the initialzing -> initialized + // state change to be executed even if it should not be. + const bool was_initialized(init_initialized_); + state(lock, s_initializing); + if (was_initialized) + { + // If server side has already been initialized, + // skip directly to s_joined. + state(lock, s_initialized); + state(lock, s_joined); + } + } + else if (state_ == s_joiner) + { + // Got partiioned from the cluster, got IST and + // started applying actions. + state(lock, s_joined); + } + } + else + { + if (state_ == s_connected) + { + state(lock, s_joiner); + } + if (init_initialized_ && state_ != s_joined) + { + // If server side has already been initialized, + // skip directly to s_joined. + state(lock, s_joined); + } + } + + if (init_initialized_ == false) + { + lock.unlock(); + server_service_.debug_sync("on_view_wait_initialized"); + lock.lock(); + wait_until_state(lock, s_initialized); + } + assert(init_initialized_); + + if (bootstrap_) + { + server_service_.bootstrap(); + bootstrap_ = false; + } + + assert(high_priority_service); + + if (high_priority_service) + { + recover_streaming_appliers_if_not_recovered(lock, + *high_priority_service); + close_orphaned_sr_transactions(lock, *high_priority_service); + } + + if (server_service_.sst_before_init()) + { + if (state_ == s_initialized) + { + state(lock, s_joined); + if (init_synced_) + { + state(lock, s_synced); + } + } + } + else + { + if (state_ == s_joiner) + { + state(lock, s_joined); + if (init_synced_) + { + state(lock, s_synced); + } + } + } +} + +void wsrep::server_state::on_non_primary_view( + const wsrep::view& view, + wsrep::high_priority_service* high_priority_service) +{ + wsrep::unique_lock lock(mutex_); + wsrep::log_info() << "Non-primary view"; + if (view.final()) + { + go_final(lock, view, high_priority_service); + } + else if (state_ != s_disconnecting) + { + state(lock, s_connected); + } +} + +void wsrep::server_state::go_final(wsrep::unique_lock& lock, + const wsrep::view& view, + wsrep::high_priority_service* hps) +{ + (void)view; // avoid compiler warning "unused parameter 'view'" + assert(view.final()); + assert(hps); + if (hps) + { + close_transactions_at_disconnect(*hps); + } + state(lock, s_disconnected); + id_ = wsrep::id::undefined(); +} + +void wsrep::server_state::on_view(const wsrep::view& view, + wsrep::high_priority_service* high_priority_service) +{ + wsrep::log_info() + << "================================================\nView:\n" + << view + << "================================================="; + if (current_view_.status() == wsrep::view::primary) + { + previous_primary_view_ = current_view_; + } + current_view_ = view; + switch (view.status()) + { + case wsrep::view::primary: + on_primary_view(view, high_priority_service); + break; + case wsrep::view::non_primary: + on_non_primary_view(view, high_priority_service); + break; + case wsrep::view::disconnected: + { + wsrep::unique_lock lock(mutex_); + go_final(lock, view, high_priority_service); + break; + } + default: + wsrep::log_warning() << "Unrecognized view status: " << view.status(); + assert(0); + } + + server_service_.log_view(high_priority_service, view); +} + +void wsrep::server_state::on_sync() +{ + wsrep::log_info() << "Server " << name_ << " synced with group"; + wsrep::unique_lock lock(mutex_); + + // Initial sync + if (server_service_.sst_before_init() && init_synced_ == false) + { + switch (state_) + { + case s_synced: + break; + case s_connected: + state(lock, s_joiner); + // fall through + case s_joiner: + state(lock, s_initializing); + break; + case s_donor: + state(lock, s_joined); + state(lock, s_synced); + break; + case s_initialized: + state(lock, s_joined); + // fall through + default: + /* State */ + state(lock, s_synced); + }; + } + else + { + // Calls to on_sync() in synced state are possible if + // server desyncs itself from the group. Provider does not + // inform about this through callbacks. + if (state_ != s_synced) + { + state(lock, s_synced); + } + } + init_synced_ = true; +} + +int wsrep::server_state::on_apply( + wsrep::high_priority_service& high_priority_service, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) +{ + if (is_toi(ws_meta.flags())) + { + return apply_toi(provider(), high_priority_service, + ws_handle, ws_meta, data); + } + else if (is_commutative(ws_meta.flags()) || is_native(ws_meta.flags())) + { + // Not implemented yet. + assert(0); + return 0; + } + else + { + return apply_write_set(*this, high_priority_service, + ws_handle, ws_meta, data); + } +} + +void wsrep::server_state::start_streaming_client( + wsrep::client_state* client_state) +{ + wsrep::unique_lock lock(mutex_); + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_server_state, + "Start streaming client: " << client_state->id()); + if (streaming_clients_.insert( + std::make_pair(client_state->id(), client_state)).second == false) + { + wsrep::log_warning() << "Failed to insert streaming client " + << client_state->id(); + assert(0); + } +} + +void wsrep::server_state::convert_streaming_client_to_applier( + wsrep::client_state* client_state) +{ + // create streaming_applier beforehand as server_state lock should + // not be held when calling server_service methods + wsrep::high_priority_service* streaming_applier( + server_service_.streaming_applier_service( + client_state->client_service())); + + wsrep::unique_lock lock(mutex_); + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_server_state, + "Convert streaming client to applier " + << client_state->id()); + streaming_clients_map::iterator i( + streaming_clients_.find(client_state->id())); + assert(i != streaming_clients_.end()); + if (i == streaming_clients_.end()) + { + wsrep::log_warning() << "Unable to find streaming client " + << client_state->id(); + assert(0); + } + else + { + streaming_clients_.erase(i); + } + + // Convert to applier only if the state is not disconnected. In + // disconnected state the applier map is supposed to be empty + // and it will be reconstructed from fragment storage when + // joining back to cluster. + if (state(lock) != s_disconnected) + { + if (streaming_applier->adopt_transaction(client_state->transaction())) + { + log_adopt_error(client_state->transaction()); + streaming_applier->after_apply(); + server_service_.release_high_priority_service(streaming_applier); + return; + } + if (streaming_appliers_.insert( + std::make_pair( + std::make_pair(client_state->transaction().server_id(), + client_state->transaction().id()), + streaming_applier)).second == false) + { + wsrep::log_warning() << "Could not insert streaming applier " + << id_ + << ", " + << client_state->transaction().id(); + assert(0); + } + } + else + { + server_service_.release_high_priority_service(streaming_applier); + client_state->client_service().store_globals(); + } +} + + +void wsrep::server_state::stop_streaming_client( + wsrep::client_state* client_state) +{ + wsrep::unique_lock lock(mutex_); + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_server_state, + "Stop streaming client: " << client_state->id()); + streaming_clients_map::iterator i( + streaming_clients_.find(client_state->id())); + assert(i != streaming_clients_.end()); + if (i == streaming_clients_.end()) + { + wsrep::log_warning() << "Unable to find streaming client " + << client_state->id(); + assert(0); + return; + } + else + { + streaming_clients_.erase(i); + cond_.notify_all(); + } +} + +void wsrep::server_state::start_streaming_applier( + const wsrep::id& server_id, + const wsrep::transaction_id& transaction_id, + wsrep::high_priority_service* sa) +{ + wsrep::unique_lock lock(mutex_); + if (streaming_appliers_.insert( + std::make_pair(std::make_pair(server_id, transaction_id), + sa)).second == false) + { + wsrep::log_error() << "Could not insert streaming applier"; + throw wsrep::fatal_error(); + } +} + +void wsrep::server_state::stop_streaming_applier( + const wsrep::id& server_id, + const wsrep::transaction_id& transaction_id) +{ + wsrep::unique_lock lock(mutex_); + streaming_appliers_map::iterator i( + streaming_appliers_.find(std::make_pair(server_id, transaction_id))); + assert(i != streaming_appliers_.end()); + if (i == streaming_appliers_.end()) + { + wsrep::log_warning() << "Could not find streaming applier for " + << server_id << ":" << transaction_id; + } + else + { + streaming_appliers_.erase(i); + cond_.notify_all(); + } +} + +wsrep::high_priority_service* wsrep::server_state::find_streaming_applier( + const wsrep::id& server_id, + const wsrep::transaction_id& transaction_id) const +{ + wsrep::unique_lock lock(mutex_); + streaming_appliers_map::const_iterator i( + streaming_appliers_.find(std::make_pair(server_id, transaction_id))); + return (i == streaming_appliers_.end() ? 0 : i->second); +} + +wsrep::high_priority_service* wsrep::server_state::find_streaming_applier( + const wsrep::xid& xid) const +{ + wsrep::unique_lock lock(mutex_); + streaming_appliers_map::const_iterator i(streaming_appliers_.begin()); + while (i != streaming_appliers_.end()) + { + wsrep::high_priority_service* sa(i->second); + if (sa->transaction().xid() == xid) + { + return sa; + } + i++; + } + return NULL; +} + +////////////////////////////////////////////////////////////////////////////// +// Private // +////////////////////////////////////////////////////////////////////////////// + +int wsrep::server_state::desync(wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + ++desync_count_; + lock.unlock(); + int ret(provider().desync()); + lock.lock(); + if (ret) + { + --desync_count_; + } + return ret; +} + +void wsrep::server_state::resync(wsrep::unique_lock& + lock WSREP_UNUSED) +{ + assert(lock.owns_lock()); + assert(desync_count_ > 0); + if (desync_count_ > 0) + { + --desync_count_; + if (provider().resync()) + { + throw wsrep::runtime_error("Failed to resync"); + } + } + else + { + wsrep::log_warning() << "desync_count " << desync_count_ + << " on resync"; + } +} + + +void wsrep::server_state::state( + wsrep::unique_lock& lock WSREP_UNUSED, + enum wsrep::server_state::state state) +{ + assert(lock.owns_lock()); + static const char allowed[n_states_][n_states_] = + { + /* dis, ing, ized, cted, jer, jed, dor, sed, ding */ + { 0, 1, 0, 1, 0, 0, 0, 0, 0}, /* dis */ + { 1, 0, 1, 0, 0, 0, 0, 0, 1}, /* ing */ + { 1, 0, 0, 1, 0, 1, 0, 0, 1}, /* ized */ + { 1, 0, 0, 1, 1, 0, 0, 1, 1}, /* cted */ + { 1, 1, 0, 0, 0, 1, 0, 0, 1}, /* jer */ + { 1, 0, 0, 1, 0, 0, 1, 1, 1}, /* jed */ + { 1, 0, 0, 1, 0, 1, 0, 1, 1}, /* dor */ + { 1, 0, 0, 1, 0, 1, 1, 0, 1}, /* sed */ + { 1, 0, 0, 0, 0, 0, 0, 0, 0} /* ding */ + }; + + if (allowed[state_][state] == false) + { + std::ostringstream os; + os << "server: " << name_ << " unallowed state transition: " + << wsrep::to_string(state_) << " -> " << wsrep::to_string(state); + wsrep::log_warning() << os.str() << "\n"; + assert(0); + } + + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_server_state, + "server " << name_ << " state change: " + << to_c_string(state_) << " -> " + << to_c_string(state)); + state_hist_.push_back(state_); + server_service_.log_state_change(state_, state); + state_ = state; + cond_.notify_all(); + while (state_waiters_[state_]) + { + cond_.wait(lock); + } +} + +void wsrep::server_state::wait_until_state( + wsrep::unique_lock& lock, + enum wsrep::server_state::state state) const +{ + ++state_waiters_[state]; + while (state_ != state) + { + cond_.wait(lock); + // If the waiter waits for any other state than disconnecting + // or disconnected and the state has been changed to disconnecting, + // this usually means that some error was encountered + if (state != s_disconnecting && state != s_disconnected + && state_ == s_disconnecting) + { + throw wsrep::runtime_error("State wait was interrupted"); + } + } + --state_waiters_[state]; + cond_.notify_all(); +} + +void wsrep::server_state::interrupt_state_waiters( + wsrep::unique_lock& lock WSREP_UNUSED) +{ + assert(lock.owns_lock()); + cond_.notify_all(); +} + +template +void wsrep::server_state::recover_streaming_appliers_if_not_recovered( + wsrep::unique_lock& lock, C& c) +{ + assert(lock.owns_lock()); + if (streaming_appliers_recovered_ == false) + { + lock.unlock(); + server_service_.recover_streaming_appliers(c); + lock.lock(); + } + streaming_appliers_recovered_ = true; +} + +class transaction_state_cmp +{ +public: + transaction_state_cmp(const enum wsrep::transaction::state s) + : state_(s) { } + bool operator()(const std::pair& vt) const + { + return vt.second->transaction().state() == state_; + } +private: + enum wsrep::transaction::state state_; +}; + +void wsrep::server_state::close_orphaned_sr_transactions( + wsrep::unique_lock& lock, + wsrep::high_priority_service& high_priority_service) +{ + assert(lock.owns_lock()); + + // When the originator of an SR transaction leaves the primary + // component of the cluster, that SR must be rolled back. When two + // consecutive primary views have the same membership, the system + // may have been in a state with no primary components. + // Example with 2 node cluster: + // - (1,2 primary) + // - (1 non-primary) and (2 non-primary) + // - (1,2 primary) + // We need to rollback SRs owned by both 1 and 2. + const bool equal_consecutive_views = + current_view_.equal_membership(previous_primary_view_); + + if (current_view_.own_index() == -1 || equal_consecutive_views) + { + streaming_clients_map::iterator i; + transaction_state_cmp prepared_state_cmp(wsrep::transaction::s_prepared); + while ((i = std::find_if_not(streaming_clients_.begin(), + streaming_clients_.end(), + prepared_state_cmp)) + != streaming_clients_.end()) + { + wsrep::client_id client_id(i->first); + wsrep::transaction_id transaction_id(i->second->transaction().id()); + // It is safe to unlock the server state temporarily here. + // The processing happens inside view handler which is + // protected by the provider commit ordering critical + // section. The lock must be unlocked temporarily to + // allow converting the current client to streaming + // applier in transaction::streaming_rollback(). + // The iterator i may be invalidated when the server state + // remains unlocked, so it should not be accessed after + // the bf abort call. + lock.unlock(); + i->second->total_order_bf_abort(current_view_.view_seqno()); + lock.lock(); + streaming_clients_map::const_iterator found_i; + while ((found_i = streaming_clients_.find(client_id)) != + streaming_clients_.end() && + found_i->second->transaction().id() == transaction_id) + { + cond_.wait(lock); + } + } + } + + streaming_appliers_map::iterator i(streaming_appliers_.begin()); + while (i != streaming_appliers_.end()) + { + wsrep::high_priority_service* streaming_applier(i->second); + + // Rollback SR on equal consecutive primary views or if its + // originator is not in the current view. + // Transactions in prepared state must be committed or + // rolled back explicitly, those are never rolled back here. + if ((streaming_applier->transaction().state() != + wsrep::transaction::s_prepared) && + (equal_consecutive_views || + (std::find_if(current_view_.members().begin(), + current_view_.members().end(), + server_id_cmp(i->first.first)) == + current_view_.members().end()))) + { + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_server_state, + "Removing SR fragments for " + << i->first.first + << ", " << i->first.second); + wsrep::id server_id(i->first.first); + wsrep::transaction_id transaction_id(i->first.second); + int adopt_error; + if ((adopt_error = high_priority_service.adopt_transaction( + streaming_applier->transaction()))) + { + log_adopt_error(streaming_applier->transaction()); + } + // Even if the transaction adopt above fails, we roll back + // the transaction. Adopt error will leave stale entries + // in the streaming log which can be removed manually. + { + wsrep::high_priority_switch sw(high_priority_service, + *streaming_applier); + streaming_applier->rollback( + wsrep::ws_handle(), wsrep::ws_meta()); + streaming_applier->after_apply(); + } + + streaming_appliers_.erase(i++); + server_service_.release_high_priority_service(streaming_applier); + high_priority_service.store_globals(); + wsrep::ws_meta ws_meta( + wsrep::gtid(), + wsrep::stid(server_id, transaction_id, wsrep::client_id()), + wsrep::seqno::undefined(), 0); + lock.unlock(); + if (adopt_error == 0) + { + high_priority_service.remove_fragments(ws_meta); + high_priority_service.commit(wsrep::ws_handle(transaction_id, 0), + ws_meta); + } + high_priority_service.after_apply(); + lock.lock(); + } + else + { + ++i; + } + } +} + +void wsrep::server_state::close_transactions_at_disconnect( + wsrep::high_priority_service& high_priority_service) +{ + // Close streaming applier without removing fragments + // from fragment storage. When the server is started again, + // it must be able to recover ongoing streaming transactions. + streaming_appliers_map::iterator i(streaming_appliers_.begin()); + while (i != streaming_appliers_.end()) + { + wsrep::high_priority_service* streaming_applier(i->second); + { + wsrep::high_priority_switch sw(high_priority_service, + *streaming_applier); + streaming_applier->rollback( + wsrep::ws_handle(), wsrep::ws_meta()); + streaming_applier->after_apply(); + } + streaming_appliers_.erase(i++); + server_service_.release_high_priority_service(streaming_applier); + high_priority_service.store_globals(); + } + streaming_appliers_recovered_ = false; +} diff --git a/wsrep-lib/src/service_helpers.hpp b/wsrep-lib/src/service_helpers.hpp new file mode 100644 index 00000000..6e9f9ca3 --- /dev/null +++ b/wsrep-lib/src/service_helpers.hpp @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2020 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#ifndef WSREP_SERVICE_HELPERS_HPP +#define WSREP_SERVICE_HELPERS_HPP + +#include "wsrep/logger.hpp" + +#include +#include + +namespace wsrep_impl +{ + template + int service_probe(void* dlh, const char* symbol, const char* service_name) + { + union { + InitFn dlfun; + void* obj; + } alias; + // Clear previous errors + (void)dlerror(); + alias.obj = dlsym(dlh, symbol); + if (alias.obj) + { + return 0; + } + else + { + wsrep::log_warning() << "Support for " << service_name + << " " << symbol + << " not found from provider: " + << dlerror(); + return ENOTSUP; + } + } + + + template + int service_init(void* dlh, + const char* symbol, + ServiceCallbacks service_callbacks, + const char* service_name) + { + union { + InitFn dlfun; + void* obj; + } alias; + // Clear previous errors + (void)dlerror(); + alias.obj = dlsym(dlh, symbol); + if (alias.obj) + { + wsrep::log_info() << "Initializing " << service_name; + return (*alias.dlfun)(service_callbacks); + } + else + { + wsrep::log_info() + << "Provider does not support " << service_name; + return ENOTSUP; + } + } + + template + void service_deinit(void* dlh, const char* symbol, const char* service_name) + { + union { + DeinitFn dlfun; + void* obj; + } alias; + // Clear previous errors + (void)dlerror(); + alias.obj = dlsym(dlh, symbol); + if (alias.obj) + { + wsrep::log_info() << "Deinitializing " << service_name; + (*alias.dlfun)(); + } + else + { + wsrep::log_info() + << "Provider does not support deinitializing of " + << service_name; + } + } +} + +#endif // WSREP_SERVICE_HELPERS_HPP + diff --git a/wsrep-lib/src/thread.cpp b/wsrep-lib/src/thread.cpp new file mode 100644 index 00000000..5fec0ff5 --- /dev/null +++ b/wsrep-lib/src/thread.cpp @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2019 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/thread.hpp" + +#include + +std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::thread::id& id) +{ + std::ios_base::fmtflags orig_flags(os.flags()); + os << std::hex << id.thread_; + os.flags(orig_flags); + return os; +} diff --git a/wsrep-lib/src/thread_service_v1.cpp b/wsrep-lib/src/thread_service_v1.cpp new file mode 100644 index 00000000..ee81ba31 --- /dev/null +++ b/wsrep-lib/src/thread_service_v1.cpp @@ -0,0 +1,285 @@ +/* + * Copyright (C) 2019-2020 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "thread_service_v1.hpp" +#include "service_helpers.hpp" + +#include "wsrep/thread_service.hpp" +#include "wsrep/logger.hpp" +#include "v26/wsrep_thread_service.h" + +#include +#include +#include + +namespace wsrep_thread_service_v1 +{ + // + // Thread service callbacks + // + + // Pointer to thread service implementation provided by + // the application. + static wsrep::thread_service* thread_service_impl{ 0 }; + static std::atomic use_count; + + static const wsrep_thread_key_t* thread_key_create_cb(const char* name) + { + assert(thread_service_impl); + return reinterpret_cast( + thread_service_impl->create_thread_key(name)); + ; + } + + static int thread_create_cb(const wsrep_thread_key_t* key, + wsrep_thread_t** thread, + void* (*fn)(void*), void* args) + { + assert(thread_service_impl); + return thread_service_impl->create_thread( + reinterpret_cast(key), + reinterpret_cast(thread), fn, + args); + } + + int thread_detach_cb(wsrep_thread_t* thread) + { + assert(thread_service_impl); + return thread_service_impl->detach( + reinterpret_cast(thread)); + } + + int thread_equal_cb(wsrep_thread_t* thread_1, wsrep_thread_t* thread_2) + { + assert(thread_service_impl); + return thread_service_impl->equal( + reinterpret_cast(thread_1), + reinterpret_cast(thread_2)); + } + + __attribute__((noreturn)) + void thread_exit_cb(wsrep_thread_t* thread, void* retval) + { + assert(thread_service_impl); + thread_service_impl->exit( + reinterpret_cast(thread), retval); + throw; // Implementation broke the contract and returned. + } + + int thread_join_cb(wsrep_thread_t* thread, void** retval) + { + assert(thread_service_impl); + return thread_service_impl->join( + reinterpret_cast(thread), retval); + } + + wsrep_thread_t* thread_self_cb(void) + { + assert(thread_service_impl); + return reinterpret_cast(thread_service_impl->self()); + } + + int thread_setschedparam_cb(wsrep_thread_t* thread, int policy, + const struct sched_param* sp) + { + assert(thread_service_impl); + return thread_service_impl->setschedparam( + reinterpret_cast(thread), + policy, sp); + } + + int thread_getschedparam_cb(wsrep_thread_t* thread, int* policy, + struct sched_param* sp) + { + assert(thread_service_impl); + return thread_service_impl->getschedparam( + reinterpret_cast(thread), policy, + sp); + } + + const wsrep_mutex_key_t* mutex_key_create_cb(const char* name) + { + assert(thread_service_impl); + return reinterpret_cast( + thread_service_impl->create_mutex_key(name)); + } + + wsrep_mutex_t* mutex_init_cb(const wsrep_mutex_key_t* key, void* memblock, + size_t memblock_size) + { + assert(thread_service_impl); + return reinterpret_cast( + thread_service_impl->init_mutex( + reinterpret_cast(key), + memblock, memblock_size)); + } + + int mutex_destroy_cb(wsrep_mutex_t* mutex) + { + assert(thread_service_impl); + return thread_service_impl->destroy( + reinterpret_cast(mutex)); + } + int mutex_lock_cb(wsrep_mutex_t* mutex) + { + assert(thread_service_impl); + return thread_service_impl->lock( + reinterpret_cast(mutex)); + } + + int mutex_trylock_cb(wsrep_mutex_t* mutex) + { + assert(thread_service_impl); + return thread_service_impl->trylock( + reinterpret_cast(mutex)); + } + + int mutex_unlock_cb(wsrep_mutex_t* mutex) + { + assert(thread_service_impl); + return thread_service_impl->unlock( + reinterpret_cast(mutex)); + } + + const wsrep_cond_key_t* cond_key_create_cb(const char* name) + { + assert(thread_service_impl); + return reinterpret_cast( + thread_service_impl->create_cond_key(name)); + } + + wsrep_cond_t* cond_init_cb(const wsrep_cond_key_t* key, void* memblock, + size_t memblock_size) + { + assert(thread_service_impl); + return reinterpret_cast(thread_service_impl->init_cond( + reinterpret_cast(key), + memblock, memblock_size)); + } + + int cond_destroy_cb(wsrep_cond_t* cond) + { + assert(thread_service_impl); + return thread_service_impl->destroy( + reinterpret_cast(cond)); + } + + int cond_wait_cb(wsrep_cond_t* cond, wsrep_mutex_t* mutex) + { + assert(thread_service_impl); + return thread_service_impl->wait( + reinterpret_cast(cond), + reinterpret_cast(mutex)); + } + + int cond_timedwait_cb(wsrep_cond_t* cond, wsrep_mutex_t* mutex, + const struct timespec* ts) + { + assert(thread_service_impl); + return thread_service_impl->timedwait( + reinterpret_cast(cond), + reinterpret_cast(mutex), ts); + } + + int cond_signal_cb(wsrep_cond_t* cond) + { + assert(thread_service_impl); + return thread_service_impl->signal( + reinterpret_cast(cond)); + } + + int cond_broadcast_cb(wsrep_cond_t* cond) + { + assert(thread_service_impl); + return thread_service_impl->broadcast( + reinterpret_cast(cond)); + } + + static wsrep_thread_service_v1_t thread_service_callbacks + = { thread_key_create_cb, + thread_create_cb, + thread_detach_cb, + thread_equal_cb, + thread_exit_cb, + thread_join_cb, + thread_self_cb, + thread_setschedparam_cb, + thread_getschedparam_cb, + mutex_key_create_cb, + mutex_init_cb, + mutex_destroy_cb, + mutex_lock_cb, + mutex_trylock_cb, + mutex_unlock_cb, + cond_key_create_cb, + cond_init_cb, + cond_destroy_cb, + cond_wait_cb, + cond_timedwait_cb, + cond_signal_cb, + cond_broadcast_cb }; +} + +int wsrep::thread_service_v1_probe(void* dlh) +{ + typedef int (*init_fn)(wsrep_thread_service_v1_t*); + typedef void (*deinit_fn)(); + if (wsrep_impl::service_probe( + dlh, WSREP_THREAD_SERVICE_INIT_FUNC_V1, "thread service v1") || + wsrep_impl::service_probe( + dlh, WSREP_THREAD_SERVICE_DEINIT_FUNC_V1, "thread service v1")) + { + wsrep::log_warning() << "Provider does not support thread service v1"; + return 1; + } + return 0; +} + +int wsrep::thread_service_v1_init(void* dlh, + wsrep::thread_service* thread_service) +{ + if (not (dlh && thread_service)) return EINVAL; + typedef int (*init_fn)(wsrep_thread_service_v1_t*); + wsrep_thread_service_v1::thread_service_impl = thread_service; + int ret(0); + if ((ret = wsrep_impl::service_init( + dlh, WSREP_THREAD_SERVICE_INIT_FUNC_V1, + &wsrep_thread_service_v1::thread_service_callbacks, + "thread service v1"))) + { + wsrep_thread_service_v1::thread_service_impl = 0; + } + else + { + ++wsrep_thread_service_v1::use_count; + } + return ret; +} + +void wsrep::thread_service_v1_deinit(void* dlh) +{ + typedef int (*deinit_fn)(); + wsrep_impl::service_deinit( + dlh, WSREP_THREAD_SERVICE_DEINIT_FUNC_V1, "thread service v1"); + --wsrep_thread_service_v1::use_count; + if (wsrep_thread_service_v1::use_count == 0) + { + wsrep_thread_service_v1::thread_service_impl = 0; + } +} diff --git a/wsrep-lib/src/thread_service_v1.hpp b/wsrep-lib/src/thread_service_v1.hpp new file mode 100644 index 00000000..b8300041 --- /dev/null +++ b/wsrep-lib/src/thread_service_v1.hpp @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2019 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#ifndef WSREP_THREAD_SERVICE_V1_HPP +#define WSREP_THREAD_SERVICE_V1_HPP + +namespace wsrep +{ + class thread_service; + /** + * Probe thread_service_v1 support in loaded library. + * + * @param dlh Handle returned by dlopen(). + * + * @return Zero on success, non-zero system error code on failure. + */ + int thread_service_v1_probe(void *dlh); + + /** + * Initialize the thread service. + * + * @param dlh Handle returned by dlopen(). + * @params thread_service Pointer to wsrep::thread_service implementation. + * + * @return Zero on success, non-zero system error code on failure. + */ + int thread_service_v1_init(void* dlh, + wsrep::thread_service* thread_service); + + /** + * Deinitialize the thread service. + * + * @params dlh Handler returned by dlopen(). + */ + void thread_service_v1_deinit(void* dlh); + +} + +#endif // WSREP_THREAD_SERVICE_V1_HPP diff --git a/wsrep-lib/src/tls_service_v1.cpp b/wsrep-lib/src/tls_service_v1.cpp new file mode 100644 index 00000000..8746a767 --- /dev/null +++ b/wsrep-lib/src/tls_service_v1.cpp @@ -0,0 +1,232 @@ +/* + * Copyright (C) 2020 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "tls_service_v1.hpp" + +#include "wsrep/tls_service.hpp" +#include "wsrep/logger.hpp" +#include "v26/wsrep_tls_service.h" +#include "service_helpers.hpp" + +#include + +namespace wsrep_tls_service_v1 +{ + static wsrep::tls_service* tls_service_impl{0}; + static std::atomic use_count; + + static int tls_stream_init_cb( + wsrep_tls_context_t*, + wsrep_tls_stream_t* stream) + { + assert(tls_service_impl); + stream->opaque = + tls_service_impl->create_tls_stream(stream->fd); + if (not stream->opaque) + { + return ENOMEM; + } + return 0; + } + + static void tls_stream_deinit_cb( + wsrep_tls_context_t*, + wsrep_tls_stream_t* stream) + { + assert(tls_service_impl); + tls_service_impl->destroy( + reinterpret_cast(stream->opaque)); + } + + static int tls_stream_get_error_number_cb( + wsrep_tls_context_t*, + const wsrep_tls_stream_t* stream) + { + assert(tls_service_impl); + return tls_service_impl->get_error_number( + reinterpret_cast(stream->opaque)); + } + + static const void* tls_stream_get_error_category_cb( + wsrep_tls_context_t*, + const wsrep_tls_stream_t* stream) + { + assert(tls_service_impl); + return tls_service_impl->get_error_category( + reinterpret_cast(stream->opaque)); + } + + static const char* tls_error_message_get_cb( + wsrep_tls_context_t*, + const wsrep_tls_stream_t* stream, + int value, const void* category) + { + assert(tls_service_impl); + return tls_service_impl->get_error_message( + reinterpret_cast(stream->opaque), value, category); + } + + static enum wsrep_tls_result map_return_value(ssize_t status) + { + switch (status) + { + case wsrep::tls_service::success: + return wsrep_tls_result_success; + case wsrep::tls_service::want_read: + return wsrep_tls_result_want_read; + case wsrep::tls_service::want_write: + return wsrep_tls_result_want_write; + case wsrep::tls_service::eof: + return wsrep_tls_result_eof; + case wsrep::tls_service::error: + return wsrep_tls_result_error; + default: + assert(status < 0); + return wsrep_tls_result_error; + } + } + + + static enum wsrep_tls_result + tls_stream_client_handshake_cb( + wsrep_tls_context_t*, + wsrep_tls_stream_t* stream) + { + assert(tls_service_impl); + return map_return_value( + tls_service_impl->client_handshake( + reinterpret_cast(stream->opaque))); + } + + static enum wsrep_tls_result + tls_stream_server_handshake_cb( + wsrep_tls_context_t*, + wsrep_tls_stream_t* stream) + { + assert(tls_service_impl); + return map_return_value( + tls_service_impl->server_handshake( + reinterpret_cast(stream->opaque))); + } + + static enum wsrep_tls_result tls_stream_read_cb( + wsrep_tls_context_t*, + wsrep_tls_stream_t* stream, + void* buf, + size_t max_count, + size_t* bytes_transferred) + { + assert(tls_service_impl); + auto result(tls_service_impl->read( + reinterpret_cast(stream->opaque), + buf, max_count)); + *bytes_transferred = result.bytes_transferred; + return map_return_value(result.status); + } + + static enum wsrep_tls_result tls_stream_write_cb( + wsrep_tls_context_t*, + wsrep_tls_stream_t* stream, + const void* buf, + size_t count, + size_t* bytes_transferred) + { + assert(tls_service_impl); + auto result(tls_service_impl->write( + reinterpret_cast(stream->opaque), + buf, count)); + *bytes_transferred = result.bytes_transferred; + return map_return_value(result.status); + } + + static enum wsrep_tls_result + tls_stream_shutdown_cb(wsrep_tls_context_t*, + wsrep_tls_stream_t* stream) + { + assert(tls_service_impl); + // @todo Handle other values than success. + return map_return_value( + tls_service_impl->shutdown( + reinterpret_cast(stream->opaque))); + } + + static wsrep_tls_service_v1_t tls_service_callbacks = + { + tls_stream_init_cb, + tls_stream_deinit_cb, + tls_stream_get_error_number_cb, + tls_stream_get_error_category_cb, + tls_stream_client_handshake_cb, + tls_stream_server_handshake_cb, + tls_stream_read_cb, + tls_stream_write_cb, + tls_stream_shutdown_cb, + tls_error_message_get_cb, + 0 // we pass NULL context for now. + }; +} + +int wsrep::tls_service_v1_probe(void* dlh) +{ + typedef int (*init_fn)(wsrep_tls_service_v1_t*); + typedef void (*deinit_fn)(); + if (wsrep_impl::service_probe( + dlh, WSREP_TLS_SERVICE_INIT_FUNC_V1, "tls service v1") || + wsrep_impl::service_probe( + dlh, WSREP_TLS_SERVICE_DEINIT_FUNC_V1, "tls service v1")) + { + wsrep::log_warning() << "Provider does not support tls service v1"; + return 1; + } + return 0; +} + +int wsrep::tls_service_v1_init(void* dlh, + wsrep::tls_service* tls_service) +{ + if (not (dlh && tls_service)) return EINVAL; + + typedef int (*init_fn)(wsrep_tls_service_v1_t*); + wsrep_tls_service_v1::tls_service_impl = tls_service; + int ret(0); + if ((ret = wsrep_impl::service_init( + dlh, WSREP_TLS_SERVICE_INIT_FUNC_V1, + &wsrep_tls_service_v1::tls_service_callbacks, + "tls service v1"))) + { + wsrep_tls_service_v1::tls_service_impl = 0; + } + else + { + ++wsrep_tls_service_v1::use_count; + } + return ret; +} + +void wsrep::tls_service_v1_deinit(void* dlh) +{ + typedef int (*deinit_fn)(); + wsrep_impl::service_deinit( + dlh, WSREP_TLS_SERVICE_DEINIT_FUNC_V1, "tls service v1"); + --wsrep_tls_service_v1::use_count; + if (wsrep_tls_service_v1::use_count == 0) + { + wsrep_tls_service_v1::tls_service_impl = 0; + } +} diff --git a/wsrep-lib/src/tls_service_v1.hpp b/wsrep-lib/src/tls_service_v1.hpp new file mode 100644 index 00000000..ee9a14be --- /dev/null +++ b/wsrep-lib/src/tls_service_v1.hpp @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2020 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#ifndef WSREP_TLS_SERVICE_V1_HPP +#define WSREP_TLS_SERVICE_V1_HPP + +namespace wsrep +{ + class tls_service; + /** + * Probe thread_service_v1 support in loaded library. + * + * @param dlh Handle returned by dlopen(). + * + * @return Zero on success, non-zero system error code on failure. + */ + int tls_service_v1_probe(void *dlh); + + /** + * Initialize TLS service. + * + * @param dlh Handle returned by dlopen(). + * @params thread_service Pointer to wsrep::thread_service implementation. + * + * @return Zero on success, non-zero system error code on failure. + */ + int tls_service_v1_init(void* dlh, + wsrep::tls_service* thread_service); + + /** + * Deinitialize TLS service. + * + * @param dlh Handler returned by dlopen(). + */ + void tls_service_v1_deinit(void* dlh); +} + +#endif // WSREP_TLS_SERVICE_V1_HPP diff --git a/wsrep-lib/src/transaction.cpp b/wsrep-lib/src/transaction.cpp new file mode 100644 index 00000000..2738271a --- /dev/null +++ b/wsrep-lib/src/transaction.cpp @@ -0,0 +1,2003 @@ +/* + * Copyright (C) 2018 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/transaction.hpp" +#include "wsrep/client_state.hpp" +#include "wsrep/server_state.hpp" +#include "wsrep/storage_service.hpp" +#include "wsrep/high_priority_service.hpp" +#include "wsrep/key.hpp" +#include "wsrep/logger.hpp" +#include "wsrep/compiler.hpp" + +#include +#include + +namespace +{ + class storage_service_deleter + { + public: + storage_service_deleter(wsrep::server_service& server_service) + : server_service_(server_service) + { } + void operator()(wsrep::storage_service* storage_service) + { + server_service_.release_storage_service(storage_service); + } + private: + wsrep::server_service& server_service_; + }; + + template + class scoped_storage_service + { + public: + scoped_storage_service(wsrep::client_service& client_service, + wsrep::storage_service* storage_service, + D deleter) + : client_service_(client_service) + , storage_service_(storage_service) + , deleter_(deleter) + { + if (storage_service_ == 0) + { + throw wsrep::runtime_error("Null client_state provided"); + } + client_service_.reset_globals(); + storage_service_->store_globals(); + } + + wsrep::storage_service& storage_service() + { + return *storage_service_; + } + + ~scoped_storage_service() + { + deleter_(storage_service_); + client_service_.store_globals(); + } + private: + scoped_storage_service(const scoped_storage_service&); + scoped_storage_service& operator=(const scoped_storage_service&); + wsrep::client_service& client_service_; + wsrep::storage_service* storage_service_; + D deleter_; + }; +} + +// Public + +wsrep::transaction::transaction( + wsrep::client_state& client_state) + : server_service_(client_state.server_state().server_service()) + , client_service_(client_state.client_service()) + , client_state_(client_state) + , server_id_() + , id_(transaction_id::undefined()) + , state_(s_executing) + , state_hist_() + , bf_abort_state_(s_executing) + , bf_abort_provider_status_() + , bf_abort_client_state_() + , bf_aborted_in_total_order_() + , ws_handle_() + , ws_meta_() + , flags_() + , implicit_deps_(false) + , certified_(false) + , fragments_certified_for_statement_() + , streaming_context_() + , sr_keys_() + , apply_error_buf_() + , xid_() +{ } + + +wsrep::transaction::~transaction() +{ +} + +int wsrep::transaction::start_transaction( + const wsrep::transaction_id& id) +{ + debug_log_state("start_transaction enter"); + assert(active() == false); + assert(is_xa() == false); + assert(flags() == 0); + server_id_ = client_state_.server_state().id(); + id_ = id; + state_ = s_executing; + state_hist_.clear(); + ws_handle_ = wsrep::ws_handle(id); + flags(wsrep::provider::flag::start_transaction); + switch (client_state_.mode()) + { + case wsrep::client_state::m_high_priority: + debug_log_state("start_transaction success"); + return 0; + case wsrep::client_state::m_local: + debug_log_state("start_transaction success"); + return provider().start_transaction(ws_handle_); + default: + debug_log_state("start_transaction error"); + assert(0); + return 1; + } +} + +int wsrep::transaction::start_transaction( + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) +{ + debug_log_state("start_transaction enter"); + if (state() != s_replaying) + { + // assert(ws_meta.flags()); + assert(active() == false); + assert(flags() == 0); + server_id_ = ws_meta.server_id(); + id_ = ws_meta.transaction_id(); + assert(client_state_.mode() == wsrep::client_state::m_high_priority); + state_ = s_executing; + state_hist_.clear(); + ws_handle_ = ws_handle; + ws_meta_ = ws_meta; + flags(wsrep::provider::flag::start_transaction); + certified_ = true; + } + else + { + ws_meta_ = ws_meta; + assert(ws_meta_.flags() & wsrep::provider::flag::commit); + assert(active()); + assert(client_state_.mode() == wsrep::client_state::m_high_priority); + assert(state() == s_replaying); + assert(ws_meta_.seqno().is_undefined() == false); + certified_ = true; + } + debug_log_state("start_transaction leave"); + return 0; +} + +int wsrep::transaction::next_fragment( + const wsrep::ws_meta& ws_meta) +{ + debug_log_state("next_fragment enter"); + ws_meta_ = ws_meta; + debug_log_state("next_fragment leave"); + return 0; +} + +void wsrep::transaction::adopt(const wsrep::transaction& transaction) +{ + debug_log_state("adopt enter"); + assert(transaction.is_streaming()); + start_transaction(transaction.id()); + server_id_ = transaction.server_id_; + flags_ = transaction.flags(); + streaming_context_ = transaction.streaming_context(); + debug_log_state("adopt leave"); +} + +void wsrep::transaction::fragment_applied(wsrep::seqno seqno) +{ + assert(active()); + streaming_context_.applied(seqno); +} + +int wsrep::transaction::prepare_for_ordering( + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + bool is_commit) +{ + assert(active()); + + if (state_ != s_replaying) + { + ws_handle_ = ws_handle; + ws_meta_ = ws_meta; + certified_ = is_commit; + } + return 0; +} + +int wsrep::transaction::assign_read_view(const wsrep::gtid* const gtid) +{ + try + { + return provider().assign_read_view(ws_handle_, gtid); + } + catch (...) + { + wsrep::log_error() << "Failed to assign read view"; + return 1; + } +} + +int wsrep::transaction::append_key(const wsrep::key& key) +{ + try + { + debug_log_key_append(key); + sr_keys_.insert(key); + return provider().append_key(ws_handle_, key); + } + catch (...) + { + wsrep::log_error() << "Failed to append key"; + return 1; + } +} + +int wsrep::transaction::append_data(const wsrep::const_buffer& data) +{ + + return provider().append_data(ws_handle_, data); +} + +int wsrep::transaction::after_row() +{ + wsrep::unique_lock lock(client_state_.mutex()); + debug_log_state("after_row_enter"); + int ret(0); + if (streaming_context_.fragment_size() && + streaming_context_.fragment_unit() != streaming_context::statement) + { + ret = streaming_step(lock); + } + debug_log_state("after_row_leave"); + return ret; +} + +int wsrep::transaction::before_prepare( + wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + int ret(0); + debug_log_state("before_prepare_enter"); + assert(state() == s_executing || state() == s_must_abort || + state() == s_replaying); + + if (state() == s_must_abort) + { + assert(client_state_.mode() == wsrep::client_state::m_local); + client_state_.override_error(wsrep::e_deadlock_error); + return 1; + } + + switch (client_state_.mode()) + { + case wsrep::client_state::m_local: + if (is_streaming()) + { + client_service_.debug_crash( + "crash_last_fragment_commit_before_fragment_removal"); + lock.unlock(); + if (client_service_.statement_allowed_for_streaming() == false) + { + client_state_.override_error( + wsrep::e_error_during_commit, + wsrep::provider::error_not_allowed); + ret = 1; + } + else if (!is_xa()) + { + // Note: we can't remove fragments here for XA, + // the transaction has already issued XA END and + // is in IDLE state, no more changes allowed! + ret = client_service_.remove_fragments(); + if (ret) + { + client_state_.override_error(wsrep::e_deadlock_error); + } + } + lock.lock(); + client_service_.debug_crash( + "crash_last_fragment_commit_after_fragment_removal"); + if (state() == s_must_abort) + { + client_state_.override_error(wsrep::e_deadlock_error); + ret = 1; + } + } + + if (ret == 0) + { + if (is_xa()) + { + // Force fragment replication on XA prepare + flags(flags() | wsrep::provider::flag::prepare); + pa_unsafe(true); + append_sr_keys_for_commit(); + const bool force_streaming_step = true; + ret = streaming_step(lock, force_streaming_step); + if (ret == 0) + { + assert(state() == s_executing); + state(lock, s_preparing); + } + } + else + { + ret = certify_commit(lock); + } + + assert((ret == 0 && state() == s_preparing) || + (state() == s_must_abort || + state() == s_must_replay || + state() == s_cert_failed)); + + if (ret) + { + assert(state() == s_must_replay || + client_state_.current_error()); + ret = 1; + } + } + + break; + case wsrep::client_state::m_high_priority: + // Note: fragment removal is done from applying + // context for high priority mode. + if (is_xa()) + { + assert(state() == s_executing || + state() == s_replaying); + if (state() == s_replaying) + { + break; + } + } + state(lock, s_preparing); + break; + default: + assert(0); + break; + } + + assert(state() == s_preparing || + (is_xa() && state() == s_replaying) || + (ret && (state() == s_must_abort || + state() == s_must_replay || + state() == s_cert_failed || + state() == s_aborted))); + debug_log_state("before_prepare_leave"); + return ret; +} + +int wsrep::transaction::after_prepare( + wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + + int ret = 0; + debug_log_state("after_prepare_enter"); + if (is_xa()) + { + switch (state()) + { + case s_preparing: + assert(client_state_.mode() == wsrep::client_state::m_local || + (certified() && ordered())); + state(lock, s_prepared); + break; + case s_must_abort: + // prepared locally, but client has not received + // a result yet. We can still abort. + assert(client_state_.mode() == wsrep::client_state::m_local); + client_state_.override_error(wsrep::e_deadlock_error); + ret = 1; + break; + case s_replaying: + assert(client_state_.mode() == wsrep::client_state::m_high_priority); + break; + default: + assert(0); + } + } + else + { + assert(certified() && ordered()); + assert(state() == s_preparing || state() == s_must_abort); + + if (state() == s_must_abort) + { + assert(client_state_.mode() == wsrep::client_state::m_local); + state(lock, s_must_replay); + ret = 1; + } + else + { + state(lock, s_committing); + } + } + debug_log_state("after_prepare_leave"); + return ret; +} + +int wsrep::transaction::before_commit() +{ + int ret(1); + + wsrep::unique_lock lock(client_state_.mutex()); + debug_log_state("before_commit_enter"); + assert(client_state_.mode() != wsrep::client_state::m_toi); + assert(state() == s_executing || + state() == s_prepared || + state() == s_committing || + state() == s_must_abort || + state() == s_replaying); + assert((state() != s_committing && state() != s_replaying) || + certified()); + + switch (client_state_.mode()) + { + case wsrep::client_state::m_local: + if (state() == s_executing) + { + ret = before_prepare(lock) || after_prepare(lock); + assert((ret == 0 && + (state() == s_committing || state() == s_prepared)) + || + (state() == s_must_abort || + state() == s_must_replay || + state() == s_cert_failed || + state() == s_aborted)); + } + else if (state() != s_committing && state() != s_prepared) + { + assert(state() == s_must_abort); + if (certified() || + (is_xa() && is_streaming())) + { + state(lock, s_must_replay); + } + else + { + client_state_.override_error(wsrep::e_deadlock_error); + } + } + else + { + // 2PC commit, prepare was done before + ret = 0; + } + + if (ret == 0 && state() == s_prepared) + { + ret = certify_commit(lock); + assert((ret == 0 && state() == s_committing) || + (state() == s_must_abort || + state() == s_must_replay || + state() == s_cert_failed || + state() == s_prepared)); + } + + if (ret == 0) + { + assert(certified()); + assert(ordered()); + lock.unlock(); + client_service_.debug_sync("wsrep_before_commit_order_enter"); + enum wsrep::provider::status + status(provider().commit_order_enter(ws_handle_, ws_meta_)); + lock.lock(); + switch (status) + { + case wsrep::provider::success: + break; + case wsrep::provider::error_bf_abort: + if (state() != s_must_abort) + { + state(lock, s_must_abort); + } + state(lock, s_must_replay); + ret = 1; + break; + default: + ret = 1; + assert(0); + break; + } + } + break; + case wsrep::client_state::m_high_priority: + assert(certified()); + assert(ordered()); + if (is_xa()) + { + assert(state() == s_prepared || + state() == s_replaying); + state(lock, s_committing); + ret = 0; + } + else if (state() == s_executing || state() == s_replaying) + { + ret = before_prepare(lock) || after_prepare(lock); + } + else + { + ret = 0; + } + lock.unlock(); + ret = ret || provider().commit_order_enter(ws_handle_, ws_meta_); + lock.lock(); + if (ret) + { + state(lock, s_must_abort); + state(lock, s_aborting); + } + break; + default: + assert(0); + break; + } + debug_log_state("before_commit_leave"); + return ret; +} + +int wsrep::transaction::ordered_commit() +{ + wsrep::unique_lock lock(client_state_.mutex()); + debug_log_state("ordered_commit_enter"); + assert(state() == s_committing); + assert(ordered()); + client_service_.debug_sync("wsrep_before_commit_order_leave"); + int ret(provider().commit_order_leave(ws_handle_, ws_meta_, + apply_error_buf_)); + client_service_.debug_sync("wsrep_after_commit_order_leave"); + // Should always succeed: + // 1) If before commit before succeeds, the transaction handle + // in the provider is guaranteed to exist and the commit + // has been ordered + // 2) The transaction which has been ordered for commit cannot be BF + // aborted anymore + // 3) The provider should always guarantee that the transactions which + // have been ordered for commit can finish committing. + // + // The exception here is a storage service transaction which is running + // in high priority mode. The fragment storage commit may get BF + // aborted in the provider after commit ordering has been + // established since the transaction is operating in streaming + // mode. + if (ret) + { + assert(client_state_.mode() == wsrep::client_state::m_high_priority); + state(lock, s_must_abort); + state(lock, s_aborting); + } + else + { + state(lock, s_ordered_commit); + } + debug_log_state("ordered_commit_leave"); + return ret; +} + +int wsrep::transaction::after_commit() +{ + int ret(0); + + wsrep::unique_lock lock(client_state_.mutex()); + debug_log_state("after_commit_enter"); + assert(state() == s_ordered_commit); + + if (is_streaming()) + { + assert(client_state_.mode() == wsrep::client_state::m_local || + client_state_.mode() == wsrep::client_state::m_high_priority); + + if (is_xa()) + { + // XA fragment removal happens here, + // see comment in before_prepare + lock.unlock(); + scoped_storage_service + sr_scope( + client_service_, + server_service_.storage_service(client_service_), + storage_service_deleter(server_service_)); + wsrep::storage_service& storage_service( + sr_scope.storage_service()); + storage_service.adopt_transaction(*this); + storage_service.remove_fragments(); + storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta()); + lock.lock(); + } + + if (client_state_.mode() == wsrep::client_state::m_local) + { + lock.unlock(); + client_state_.server_state_.stop_streaming_client(&client_state_); + lock.lock(); + } + streaming_context_.cleanup(); + } + + switch (client_state_.mode()) + { + case wsrep::client_state::m_local: + ret = provider().release(ws_handle_); + break; + case wsrep::client_state::m_high_priority: + break; + default: + assert(0); + break; + } + assert(ret == 0); + state(lock, s_committed); + + // client_state_.server_state().last_committed_gtid(ws_meta.gitd()); + debug_log_state("after_commit_leave"); + return ret; +} + +int wsrep::transaction::before_rollback() +{ + wsrep::unique_lock lock(client_state_.mutex()); + debug_log_state("before_rollback_enter"); + assert(state() == s_executing || + state() == s_preparing || + state() == s_prepared || + state() == s_must_abort || + // Background rollbacker or rollback initiated from SE + state() == s_aborting || + state() == s_cert_failed || + state() == s_must_replay); + + switch (client_state_.mode()) + { + case wsrep::client_state::m_local: + if (is_streaming()) + { + client_service_.debug_sync("wsrep_before_SR_rollback"); + } + switch (state()) + { + case s_preparing: + // Error detected during prepare phase + state(lock, s_must_abort); + // fall through + case s_prepared: + // fall through + case s_executing: + // Voluntary rollback + if (is_streaming()) + { + streaming_rollback(lock); + } + state(lock, s_aborting); + break; + case s_must_abort: + if (certified()) + { + state(lock, s_must_replay); + } + else + { + if (is_streaming()) + { + streaming_rollback(lock); + } + state(lock, s_aborting); + } + break; + case s_cert_failed: + if (is_streaming()) + { + streaming_rollback(lock); + } + state(lock, s_aborting); + break; + case s_aborting: + if (is_streaming()) + { + streaming_rollback(lock); + } + break; + case s_must_replay: + break; + default: + assert(0); + break; + } + break; + case wsrep::client_state::m_high_priority: + // Rollback by rollback write set or BF abort + assert(state_ == s_executing || state_ == s_prepared || state_ == s_aborting); + if (state_ != s_aborting) + { + state(lock, s_aborting); + } + break; + default: + assert(0); + break; + } + + debug_log_state("before_rollback_leave"); + return 0; +} + +int wsrep::transaction::after_rollback() +{ + wsrep::unique_lock lock(client_state_.mutex()); + debug_log_state("after_rollback_enter"); + assert(state() == s_aborting || + state() == s_must_replay); + + if (is_streaming() && bf_aborted_in_total_order_) + { + lock.unlock(); + // Storage service scope + { + scoped_storage_service + sr_scope( + client_service_, + server_service_.storage_service(client_service_), + storage_service_deleter(server_service_)); + wsrep::storage_service& storage_service( + sr_scope.storage_service()); + storage_service.adopt_transaction(*this); + storage_service.remove_fragments(); + storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta()); + } + lock.lock(); + streaming_context_.cleanup(); + } + + if (is_streaming() && state() != s_must_replay) + { + streaming_context_.cleanup(); + } + + if (state() == s_aborting) + { + state(lock, s_aborted); + } + + // Releasing the transaction from provider is postponed into + // after_statement() hook. Depending on DBMS system all the + // resources acquired by transaction may or may not be released + // during actual rollback. If the transaction has been ordered, + // releasing the commit ordering critical section should be + // also postponed until all resources have been released. + debug_log_state("after_rollback_leave"); + return 0; +} + +int wsrep::transaction::release_commit_order( + wsrep::unique_lock& lock) +{ + lock.unlock(); + int ret(provider().commit_order_enter(ws_handle_, ws_meta_)); + lock.lock(); + if (!ret) + { + server_service_.set_position(client_service_, ws_meta_.gtid()); + ret = provider().commit_order_leave(ws_handle_, ws_meta_, + apply_error_buf_); + } + return ret; +} + +int wsrep::transaction::after_statement() +{ + int ret(0); + wsrep::unique_lock lock(client_state_.mutex()); + debug_log_state("after_statement_enter"); + assert(client_state_.mode() == wsrep::client_state::m_local); + assert(state() == s_executing || + state() == s_prepared || + state() == s_committed || + state() == s_aborted || + state() == s_must_abort || + state() == s_cert_failed || + state() == s_must_replay); + + if (state() == s_executing && + streaming_context_.fragment_size() && + streaming_context_.fragment_unit() == streaming_context::statement) + { + ret = streaming_step(lock); + } + + switch (state()) + { + case s_executing: + // ? + break; + case s_prepared: + assert(is_xa()); + break; + case s_committed: + assert(is_streaming() == false); + break; + case s_must_abort: + case s_cert_failed: + client_state_.override_error(wsrep::e_deadlock_error); + lock.unlock(); + ret = client_service_.bf_rollback(); + lock.lock(); + if (state() != s_must_replay) + { + break; + } + // Continue to replay if rollback() changed the state to s_must_replay + // Fall through + case s_must_replay: + { + if (is_xa() && !ordered()) + { + ret = xa_replay(lock); + } + else + { + ret = replay(lock); + } + break; + } + case s_aborted: + // Raise a deadlock error if the transaction was BF aborted and + // rolled back by client outside of transaction hooks. + if (bf_aborted() && client_state_.current_error() == wsrep::e_success && + !client_service_.is_xa_rollback()) + { + client_state_.override_error(wsrep::e_deadlock_error); + } + break; + default: + assert(0); + break; + } + + assert(state() == s_executing || + state() == s_prepared || + state() == s_committed || + state() == s_aborted || + state() == s_must_replay); + + if (state() == s_aborted) + { + if (ordered()) + { + ret = release_commit_order(lock); + } + lock.unlock(); + provider().release(ws_handle_); + lock.lock(); + } + + if (state() != s_executing && + (!client_service_.is_explicit_xa() || + client_state_.state() == wsrep::client_state::s_quitting)) + { + cleanup(); + } + fragments_certified_for_statement_ = 0; + debug_log_state("after_statement_leave"); + assert(ret == 0 || state() == s_aborted); + return ret; +} + +void wsrep::transaction::after_applying() +{ + wsrep::unique_lock lock(client_state_.mutex_); + debug_log_state("after_applying enter"); + assert(state_ == s_executing || + state_ == s_prepared || + state_ == s_committed || + state_ == s_aborted || + state_ == s_replaying); + + if (state_ != s_executing && state_ != s_prepared) + { + if (state_ == s_replaying) state(lock, s_aborted); + cleanup(); + } + else + { + // State remains executing or prepared, so this is a streaming applier. + // Reset the meta data to avoid releasing commit order + // critical section above if the next fragment is rollback + // fragment. Rollback fragment ordering will be handled by + // another instance while removing the fragments from + // storage. + ws_meta_ = wsrep::ws_meta(); + } + debug_log_state("after_applying leave"); +} + +bool wsrep::transaction::bf_abort( + wsrep::unique_lock& lock, + wsrep::seqno bf_seqno) +{ + bool ret(false); + const enum wsrep::transaction::state state_at_enter(state()); + assert(lock.owns_lock()); + + if (active() == false) + { + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "Transaction not active, skipping bf abort"); + } + else + { + switch (state_at_enter) + { + case s_executing: + case s_preparing: + case s_prepared: + case s_certifying: + case s_committing: + { + wsrep::seqno victim_seqno; + enum wsrep::provider::status + status(client_state_.provider().bf_abort( + bf_seqno, id_, victim_seqno)); + switch (status) + { + case wsrep::provider::success: + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "Seqno " << bf_seqno + << " successfully BF aborted " << id_ + << " victim_seqno " << victim_seqno); + bf_abort_state_ = state_at_enter; + state(lock, s_must_abort); + ret = true; + break; + default: + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "Seqno " << bf_seqno + << " failed to BF abort " << id_ + << " with status " << status + << " victim_seqno " << victim_seqno); + break; + } + break; + } + default: + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "BF abort not allowed in state " + << wsrep::to_string(state_at_enter)); + break; + } + } + + if (ret) + { + bf_abort_client_state_ = client_state_.state(); + // If the transaction is in executing state, we must initiate + // streaming rollback to ensure that the rollback fragment gets + // replicated before the victim starts to roll back and release locks. + // In other states the BF abort will be detected outside of + // storage engine operations and streaming rollback will be + // handled from before_rollback() call. + if (client_state_.mode() == wsrep::client_state::m_local && + is_streaming() && state_at_enter == s_executing) + { + streaming_rollback(lock); + } + + if ((client_state_.state() == wsrep::client_state::s_idle && + client_state_.server_state().rollback_mode() == + wsrep::server_state::rm_sync) // locally processing idle + || + // high priority streaming + (client_state_.mode() == wsrep::client_state::m_high_priority && + is_streaming())) + { + // We need to change the state to aborting under the + // lock protection to avoid a race between client thread, + // otherwise it could happen that the client gains control + // between releasing the lock and before background + // rollbacker gets control. + if (is_xa() && state_at_enter == s_prepared) + { + state(lock, s_must_replay); + client_state_.set_rollbacker_active(true); + } + else + { + state(lock, s_aborting); + client_state_.set_rollbacker_active(true); + if (client_state_.mode() == wsrep::client_state::m_high_priority) + { + lock.unlock(); + client_state_.server_state().stop_streaming_applier( + server_id_, id_); + lock.lock(); + } + } + + lock.unlock(); + server_service_.background_rollback(client_state_); + } + } + return ret; +} + +bool wsrep::transaction::total_order_bf_abort( + wsrep::unique_lock& lock WSREP_UNUSED, + wsrep::seqno bf_seqno) +{ + bool ret(bf_abort(lock, bf_seqno)); + if (ret) + { + bf_aborted_in_total_order_ = true; + } + return ret; +} + +void wsrep::transaction::clone_for_replay(const wsrep::transaction& other) +{ + assert(other.state() == s_replaying); + id_ = other.id_; + xid_ = other.xid_; + server_id_ = other.server_id_; + ws_handle_ = other.ws_handle_; + ws_meta_ = other.ws_meta_; + streaming_context_ = other.streaming_context_; + state_ = s_replaying; +} + +void wsrep::transaction::assign_xid(const wsrep::xid& xid) +{ + assert(active()); + assert(!is_xa()); + xid_ = xid; +} + +int wsrep::transaction::restore_to_prepared_state(const wsrep::xid& xid) +{ + wsrep::unique_lock lock(client_state_.mutex_); + assert(active()); + assert(is_empty()); + assert(state() == s_executing || state() == s_replaying); + flags(flags() & ~wsrep::provider::flag::start_transaction); + if (state() == s_executing) + { + state(lock, s_certifying); + } + else + { + state(lock, s_preparing); + } + state(lock, s_prepared); + xid_ = xid; + return 0; +} + +int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid, + bool commit) +{ + debug_log_state("commit_or_rollback_by_xid enter"); + wsrep::unique_lock lock(client_state_.mutex_); + wsrep::server_state& server_state(client_state_.server_state()); + wsrep::high_priority_service* sa(server_state.find_streaming_applier(xid)); + + if (!sa) + { + assert(sa); + client_state_.override_error(wsrep::e_error_during_commit); + return 1; + } + + if (commit) + { + flags(wsrep::provider::flag::commit); + } + else + { + flags(wsrep::provider::flag::rollback); + } + pa_unsafe(true); + wsrep::stid stid(sa->transaction().server_id(), + sa->transaction().id(), + client_state_.id()); + wsrep::ws_meta meta(stid); + + const enum wsrep::provider::status cert_ret( + provider().certify(client_state_.id(), + ws_handle_, + flags(), + meta)); + + int ret; + if (cert_ret == wsrep::provider::success) + { + if (commit) + { + state(lock, s_certifying); + state(lock, s_committing); + state(lock, s_committed); + } + else + { + state(lock, s_aborting); + state(lock, s_aborted); + } + ret = 0; + } + else + { + client_state_.override_error(wsrep::e_error_during_commit, + cert_ret); + wsrep::log_error() << "Failed to commit_or_rollback_by_xid," + << " xid: " << xid + << " error: " << cert_ret; + ret = 1; + } + debug_log_state("commit_or_rollback_by_xid leave"); + return ret; +} + +void wsrep::transaction::xa_detach() +{ + debug_log_state("xa_detach enter"); + assert(state() == s_prepared); + wsrep::server_state& server_state(client_state_.server_state()); + server_state.convert_streaming_client_to_applier(&client_state_); + client_service_.store_globals(); + client_service_.cleanup_transaction(); + wsrep::unique_lock lock(client_state_.mutex_); + streaming_context_.cleanup(); + state(lock, s_aborting); + state(lock, s_aborted); + provider().release(ws_handle_); + cleanup(); + debug_log_state("xa_detach leave"); +} + +int wsrep::transaction::xa_replay(wsrep::unique_lock& lock) +{ + debug_log_state("xa_replay enter"); + assert(lock.owns_lock()); + assert(is_xa()); + assert(is_streaming()); + assert(state() == s_must_replay); + assert(bf_aborted()); + + state(lock, s_replaying); + + enum wsrep::provider::status status; + wsrep::server_state& server_state(client_state_.server_state()); + + lock.unlock(); + server_state.convert_streaming_client_to_applier(&client_state_); + status = client_service_.replay_unordered(); + client_service_.store_globals(); + lock.lock(); + + if (status != wsrep::provider::success) + { + client_service_.emergency_shutdown(); + } + + int ret(1); + if (bf_abort_client_state_ == wsrep::client_state::s_idle) + { + state(lock, s_aborted); + streaming_context_.cleanup(); + provider().release(ws_handle_); + cleanup(); + ret = 0; + } + else + { + lock.unlock(); + enum wsrep::provider::status status(client_service_.commit_by_xid()); + lock.lock(); + switch (status) + { + case wsrep::provider::success: + state(lock, s_committed); + streaming_context_.cleanup(); + provider().release(ws_handle_); + cleanup(); + ret = 0; + break; + default: + log_warning() << "Failed to commit by xid during replay"; + // Commit by xid failed, return a commit + // error and let the client retry + state(lock, s_preparing); + state(lock, s_prepared); + client_state_.override_error(wsrep::e_error_during_commit, status); + } + } + + client_service_.signal_replayed(); + debug_log_state("xa_replay leave"); + return ret; +} + +//////////////////////////////////////////////////////////////////////////////// +// Private // +//////////////////////////////////////////////////////////////////////////////// + +inline wsrep::provider& wsrep::transaction::provider() +{ + return client_state_.server_state().provider(); +} + +void wsrep::transaction::state( + wsrep::unique_lock& lock __attribute__((unused)), + enum wsrep::transaction::state next_state) +{ + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "client: " << client_state_.id().get() + << " txc: " << id().get() + << " state: " << to_string(state_) + << " -> " << to_string(next_state)); + + assert(lock.owns_lock()); + // BF aborter is allowed to change the state without gaining control + // to the state if the next state is s_must_abort, s_aborting or + // s_must_replay (for xa idle replay). + assert(client_state_.owning_thread_id_ == wsrep::this_thread::get_id() || + next_state == s_must_abort || + next_state == s_must_replay || + next_state == s_aborting); + + static const char allowed[n_states][n_states] = + { /* ex pg pd ce co oc ct cf ma ab ad mr re */ + { 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */ + { 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0}, /* pg */ + { 0, 0, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 0}, /* pd */ + { 1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */ + { 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */ + { 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* cf */ + { 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0}, /* ma */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */ + { 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0} /* re */ + }; + + if (!allowed[state_][next_state]) + { + wsrep::log_debug() << "unallowed state transition for transaction " + << id_ << ": " << wsrep::to_string(state_) + << " -> " << wsrep::to_string(next_state); + assert(0); + } + + state_hist_.push_back(state_); + if (state_hist_.size() == 12) + { + state_hist_.erase(state_hist_.begin()); + } + state_ = next_state; + + if (state_ == s_must_replay) + { + client_service_.will_replay(); + } +} + +bool wsrep::transaction::abort_or_interrupt( + wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + if (state() == s_must_abort) + { + client_state_.override_error(wsrep::e_deadlock_error); + return true; + } + else if (state() == s_aborting || state() == s_aborted) + { + // Somehow we got there after BF abort without setting error + // status. This may happen if the DBMS side aborts the transaction + // but still tries to continue processing rows. Raise the error here + // and assert so that the error will be caught in debug builds. + if (bf_abort_client_state_ && + client_state_.current_error() == wsrep::e_success) + { + client_state_.override_error(wsrep::e_deadlock_error); + assert(0); + } + return true; + } + else if (client_service_.interrupted(lock)) + { + client_state_.override_error(wsrep::e_interrupted_error); + if (state() != s_must_abort) + { + state(lock, s_must_abort); + } + return true; + } + return false; +} + +int wsrep::transaction::streaming_step(wsrep::unique_lock& lock, + bool force) +{ + assert(lock.owns_lock()); + assert(streaming_context_.fragment_size() || is_xa()); + + if (client_service_.bytes_generated() < + streaming_context_.log_position()) + { + /* Something went wrong on DBMS side in keeping track of + generated bytes. Return an error to abort the transaction. */ + wsrep::log_warning() << "Bytes generated " + << client_service_.bytes_generated() + << " less than bytes certified " + << streaming_context_.log_position() + << ", aborting streaming transaction"; + return 1; + } + int ret(0); + + const size_t bytes_to_replicate(client_service_.bytes_generated() - + streaming_context_.log_position()); + + switch (streaming_context_.fragment_unit()) + { + case streaming_context::row: + // fall through + case streaming_context::statement: + streaming_context_.increment_unit_counter(1); + break; + case streaming_context::bytes: + streaming_context_.set_unit_counter(bytes_to_replicate); + break; + } + + // Some statements have no effect. Do not atttempt to + // replicate a fragment if no data has been generated since + // last fragment replication. + // We use `force = true` on XA prepare: a fragment will be + // generated even if no data is pending replication. + if (bytes_to_replicate <= 0 && !force) + { + assert(bytes_to_replicate == 0); + return ret; + } + + if (streaming_context_.fragment_size_exceeded() || force) + { + streaming_context_.reset_unit_counter(); + ret = certify_fragment(lock); + } + + return ret; +} + +int wsrep::transaction::certify_fragment( + wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + + assert(client_state_.mode() == wsrep::client_state::m_local); + assert(streaming_context_.rolled_back() == false || + state() == s_must_abort); + + client_service_.wait_for_replayers(lock); + if (abort_or_interrupt(lock)) + { + return 1; + } + + state(lock, s_certifying); + lock.unlock(); + client_service_.debug_sync("wsrep_before_fragment_certification"); + + wsrep::mutable_buffer data; + size_t log_position(0); + if (client_service_.prepare_fragment_for_replication(data, log_position)) + { + lock.lock(); + state(lock, s_must_abort); + client_state_.override_error(wsrep::e_error_during_commit); + return 1; + } + streaming_context_.set_log_position(log_position); + + if (data.size() == 0) + { + wsrep::log_warning() << "Attempt to replicate empty data buffer"; + lock.lock(); + state(lock, s_executing); + return 0; + } + + if (provider().append_data(ws_handle_, + wsrep::const_buffer(data.data(), data.size()))) + { + lock.lock(); + state(lock, s_must_abort); + client_state_.override_error(wsrep::e_error_during_commit); + return 1; + } + + if (is_xa()) + { + // One more check to see if the transaction + // has been aborted. This is necessary because + // until the append_data above will make sure + // that the transaction exists in provider. + lock.lock(); + if (abort_or_interrupt(lock)) + { + return 1; + } + lock.unlock(); + } + + if (is_streaming() == false) + { + client_state_.server_state_.start_streaming_client(&client_state_); + } + + if (implicit_deps()) + { + flags(flags() | wsrep::provider::flag::implicit_deps); + } + + int ret(0); + enum wsrep::client_error error(wsrep::e_success); + enum wsrep::provider::status cert_ret(wsrep::provider::success); + // Storage service scope + { + scoped_storage_service + sr_scope( + client_service_, + server_service_.storage_service(client_service_), + storage_service_deleter(server_service_)); + wsrep::storage_service& storage_service( + sr_scope.storage_service()); + + // First the fragment is appended to the stable storage. + // This is done to ensure that there is enough capacity + // available to store the fragment. The fragment meta data + // is updated after certification. + wsrep::id server_id(client_state_.server_state().id()); + assert(server_id.is_undefined() == false); + if (storage_service.start_transaction(ws_handle_) || + storage_service.append_fragment( + server_id, + id(), + flags(), + wsrep::const_buffer(data.data(), data.size()), + xid())) + { + ret = 1; + error = wsrep::e_append_fragment_error; + } + + if (ret == 0) + { + client_service_.debug_crash( + "crash_replicate_fragment_before_certify"); + + wsrep::ws_meta sr_ws_meta; + cert_ret = provider().certify(client_state_.id(), + ws_handle_, + flags(), + sr_ws_meta); + client_service_.debug_crash( + "crash_replicate_fragment_after_certify"); + + switch (cert_ret) + { + case wsrep::provider::success: + ++fragments_certified_for_statement_; + assert(sr_ws_meta.seqno().is_undefined() == false); + streaming_context_.certified(); + if (storage_service.update_fragment_meta(sr_ws_meta)) + { + storage_service.rollback(wsrep::ws_handle(), + wsrep::ws_meta()); + ret = 1; + error = wsrep::e_deadlock_error; + break; + } + if (storage_service.commit(ws_handle_, sr_ws_meta)) + { + ret = 1; + error = wsrep::e_deadlock_error; + } + else + { + streaming_context_.stored(sr_ws_meta.seqno()); + } + client_service_.debug_crash( + "crash_replicate_fragment_success"); + break; + case wsrep::provider::error_bf_abort: + case wsrep::provider::error_certification_failed: + // Streaming transcation got BF aborted, so it must roll + // back. Roll back the fragment storage operation out of + // order as the commit order will be grabbed later on + // during rollback process. Mark the fragment as certified + // though in streaming context in order to enter streaming + // rollback codepath. + // + // Note that despite we handle error_certification_failed + // here, we mark the transaction as streaming. Apparently + // the provider may return status corresponding to certification + // failure even if the fragment has passed certification. + // This may be a bug in provider implementation or a limitation + // of error codes defined in wsrep-API. In order to make + // sure that the transaction will be cleaned on other servers, + // we take a risk of sending one rollback fragment for nothing. + storage_service.rollback(wsrep::ws_handle(), + wsrep::ws_meta()); + streaming_context_.certified(); + ret = 1; + error = wsrep::e_deadlock_error; + break; + default: + // Storage service rollback must be done out of order, + // otherwise there may be a deadlock between BF aborter + // and the rollback process. + storage_service.rollback(wsrep::ws_handle(), wsrep::ws_meta()); + ret = 1; + error = wsrep::e_deadlock_error; + break; + } + } + } + + // Note: This does not release the handle in the provider + // since streaming is still on. However it is needed to + // make provider internal state to transition for the + // next fragment. If any of the operations above failed, + // the handle needs to be left unreleased for the following + // rollback process. + if (ret == 0) + { + assert(error == wsrep::e_success); + ret = provider().release(ws_handle_); + if (ret) + { + error = wsrep::e_deadlock_error; + } + } + lock.lock(); + if (ret) + { + assert(error != wsrep::e_success); + if (is_streaming() == false) + { + lock.unlock(); + client_state_.server_state_.stop_streaming_client(&client_state_); + lock.lock(); + } + else + { + streaming_rollback(lock); + } + if (state_ != s_must_abort) + { + state(lock, s_must_abort); + } + client_state_.override_error(error, cert_ret); + } + else if (state_ == s_must_abort) + { + if (is_streaming()) + { + streaming_rollback(lock); + } + client_state_.override_error(wsrep::e_deadlock_error, cert_ret); + ret = 1; + } + else + { + assert(state_ == s_certifying); + state(lock, s_executing); + flags(flags() & ~wsrep::provider::flag::start_transaction); + flags(flags() & ~wsrep::provider::flag::pa_unsafe); + } + return ret; +} + +int wsrep::transaction::certify_commit( + wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + assert(active()); + client_service_.wait_for_replayers(lock); + + assert(lock.owns_lock()); + + if (abort_or_interrupt(lock)) + { + if (is_xa() && state() == s_must_abort) + { + state(lock, s_must_replay); + } + return 1; + } + + state(lock, s_certifying); + lock.unlock(); + + if (is_streaming()) + { + if (!is_xa()) + { + append_sr_keys_for_commit(); + } + pa_unsafe(true); + } + + if (implicit_deps()) + { + flags(flags() | wsrep::provider::flag::implicit_deps); + } + + flags(flags() | wsrep::provider::flag::commit); + flags(flags() & ~wsrep::provider::flag::prepare); + + if (client_service_.prepare_data_for_replication()) + { + lock.lock(); + // Here we fake that the size exceeded error came from provider, + // even though it came from the client service. This requires + // some consideration how to get meaningful error codes from + // the client service. + client_state_.override_error(wsrep::e_size_exceeded_error, + wsrep::provider::error_size_exceeded); + if (state_ != s_must_abort) + { + state(lock, s_must_abort); + } + return 1; + } + + client_service_.debug_sync("wsrep_before_certification"); + enum wsrep::provider::status + cert_ret(provider().certify(client_state_.id(), + ws_handle_, + flags(), + ws_meta_)); + client_service_.debug_sync("wsrep_after_certification"); + + lock.lock(); + + assert(state() == s_certifying || state() == s_must_abort); + + int ret(1); + switch (cert_ret) + { + case wsrep::provider::success: + assert(ordered()); + certified_ = true; + ++fragments_certified_for_statement_; + switch (state()) + { + case s_certifying: + if (is_xa()) + { + state(lock, s_committing); + } + else + { + state(lock, s_preparing); + } + ret = 0; + break; + case s_must_abort: + // We got BF aborted after succesful certification + // and before acquiring client state lock. The trasaction + // must be replayed. + state(lock, s_must_replay); + break; + default: + assert(0); + break; + } + break; + case wsrep::provider::error_warning: + assert(ordered() == false); + state(lock, s_must_abort); + client_state_.override_error(wsrep::e_error_during_commit, cert_ret); + break; + case wsrep::provider::error_transaction_missing: + state(lock, s_must_abort); + // The execution should never reach this point if the + // transaction has not generated any keys or data. + wsrep::log_warning() << "Transaction was missing in provider"; + client_state_.override_error(wsrep::e_error_during_commit, cert_ret); + break; + case wsrep::provider::error_bf_abort: + // Transaction was replicated successfully and it was either + // certified successfully or the result of certifying is not + // yet known. Therefore the transaction must roll back + // and go through replay either to replay and commit the whole + // transaction or to determine failed certification status. + if (state() != s_must_abort) + { + state(lock, s_must_abort); + } + state(lock, s_must_replay); + break; + case wsrep::provider::error_certification_failed: + state(lock, s_cert_failed); + client_state_.override_error(wsrep::e_deadlock_error); + break; + case wsrep::provider::error_size_exceeded: + state(lock, s_must_abort); + client_state_.override_error(wsrep::e_error_during_commit, cert_ret); + break; + case wsrep::provider::error_connection_failed: + // Galera provider may return CONN_FAIL if the trx is + // BF aborted O_o. If we see here that the trx was BF aborted, + // return deadlock error instead of error during commit + // to reduce number of error state combinations elsewhere. + if (state() == s_must_abort) + { + if (is_xa()) + { + state(lock, s_must_replay); + } + client_state_.override_error(wsrep::e_deadlock_error); + } + else + { + client_state_.override_error(wsrep::e_error_during_commit, + cert_ret); + if (is_xa()) + { + state(lock, s_prepared); + } + else + { + state(lock, s_must_abort); + } + } + break; + case wsrep::provider::error_provider_failed: + if (state() != s_must_abort) + { + state(lock, s_must_abort); + } + client_state_.override_error(wsrep::e_error_during_commit, cert_ret); + break; + case wsrep::provider::error_fatal: + client_state_.override_error(wsrep::e_error_during_commit, cert_ret); + state(lock, s_must_abort); + client_service_.emergency_shutdown(); + break; + case wsrep::provider::error_not_implemented: + case wsrep::provider::error_not_allowed: + client_state_.override_error(wsrep::e_error_during_commit, cert_ret); + state(lock, s_must_abort); + wsrep::log_warning() << "Certification operation was not allowed: " + << "id: " << id().get() + << " flags: " << std::hex << flags() << std::dec; + break; + default: + state(lock, s_must_abort); + client_state_.override_error(wsrep::e_error_during_commit, cert_ret); + break; + } + + return ret; +} + +int wsrep::transaction::append_sr_keys_for_commit() +{ + int ret(0); + assert(client_state_.mode() == wsrep::client_state::m_local); + for (wsrep::sr_key_set::branch_type::const_iterator + i(sr_keys_.root().begin()); + ret == 0 && i != sr_keys_.root().end(); ++i) + { + for (wsrep::sr_key_set::leaf_type::const_iterator + j(i->second.begin()); + ret == 0 && j != i->second.end(); ++j) + { + wsrep::key key(wsrep::key::shared); + key.append_key_part(i->first.data(), i->first.size()); + key.append_key_part(j->data(), j->size()); + ret = provider().append_key(ws_handle_, key); + } + } + return ret; +} + +void wsrep::transaction::streaming_rollback(wsrep::unique_lock& lock) +{ + debug_log_state("streaming_rollback enter"); + assert(state_ != s_must_replay); + assert(is_streaming()); + if (streaming_context_.rolled_back() == false) + { + // We must set rolled_back id before stopping streaming client + // or converting to applier. Accessing server_state requires + // releasing the client_state lock in order to avoid violating + // locking order, and this will open up a possibility for two + // threads accessing this block simultaneously. + streaming_context_.rolled_back(id_); + if (bf_aborted_in_total_order_) + { + lock.unlock(); + client_state_.server_state_.stop_streaming_client(&client_state_); + lock.lock(); + } + else + { + // Create a high priority applier which will handle the + // rollback fragment or clean up on configuration change. + // Adopt transaction will copy fragment set and appropriate + // meta data. Mark current transaction streaming context + // rolled back. + lock.unlock(); + client_state_.server_state_.convert_streaming_client_to_applier( + &client_state_); + lock.lock(); + streaming_context_.cleanup(); + // Cleanup cleans rolled_back_for from streaming context, but + // we want to preserve it to avoid executing this block + // more than once. + streaming_context_.rolled_back(id_); + enum wsrep::provider::status ret; + if ((ret = provider().rollback(id_))) + { + wsrep::log_debug() + << "Failed to replicate rollback fragment for " + << id_ << ": " << ret; + } + } + } + debug_log_state("streaming_rollback leave"); +} + +int wsrep::transaction::replay(wsrep::unique_lock& lock) +{ + int ret(0); + state(lock, s_replaying); + // Need to remember streaming state before replay, entering + // after_commit() after succesful replay will clear + // fragments. + const bool was_streaming(is_streaming()); + lock.unlock(); + client_service_.debug_sync("wsrep_before_replay"); + enum wsrep::provider::status replay_ret(client_service_.replay()); + client_service_.signal_replayed(); + if (was_streaming) + { + client_state_.server_state_.stop_streaming_client(&client_state_); + } + lock.lock(); + switch (replay_ret) + { + case wsrep::provider::success: + if (state() == s_replaying) + { + // Replay was done by using different client state, adjust state + // to committed. + state(lock, s_committed); + } + if (is_streaming()) + { + streaming_context_.cleanup(); + } + provider().release(ws_handle_); + break; + case wsrep::provider::error_certification_failed: + client_state_.override_error( + wsrep::e_deadlock_error); + if (is_streaming()) + { + client_service_.remove_fragments(); + streaming_context_.cleanup(); + } + state(lock, s_aborted); + ret = 1; + break; + default: + client_service_.emergency_shutdown(); + break; + } + + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "replay returned" << replay_ret); + return ret; +} + +void wsrep::transaction::cleanup() +{ + debug_log_state("cleanup_enter"); + assert(is_streaming() == false); + assert(state() == s_committed || state() == s_aborted); + id_ = wsrep::transaction_id::undefined(); + ws_handle_ = wsrep::ws_handle(); + // Keep the state history for troubleshooting. Reset + // at start_transaction(). + // state_hist_.clear(); + if (ordered()) + { + client_state_.update_last_written_gtid(ws_meta_.gtid()); + } + bf_abort_state_ = s_executing; + bf_abort_provider_status_ = wsrep::provider::success; + bf_abort_client_state_ = 0; + bf_aborted_in_total_order_ = false; + ws_meta_ = wsrep::ws_meta(); + flags_ = 0; + certified_ = false; + implicit_deps_ = false; + sr_keys_.clear(); + streaming_context_.cleanup(); + client_service_.cleanup_transaction(); + apply_error_buf_.clear(); + xid_.clear(); + debug_log_state("cleanup_leave"); +} + +void wsrep::transaction::debug_log_state( + const char* context) const +{ + WSREP_LOG_DEBUG( + client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + context + << "\n server: " << server_id_ + << ", client: " << int64_t(client_state_.id().get()) + << ", state: " << wsrep::to_c_string(client_state_.state()) + << ", mode: " << wsrep::to_c_string(client_state_.mode()) + << "\n trx_id: " << int64_t(id_.get()) + << ", seqno: " << ws_meta_.seqno().get() + << ", flags: " << flags() + << "\n" + << " state: " << wsrep::to_c_string(state_) + << ", bfa_state: " << wsrep::to_c_string(bf_abort_state_) + << ", error: " << wsrep::to_c_string(client_state_.current_error()) + << ", status: " << client_state_.current_error_status() + << "\n" + << " is_sr: " << is_streaming() + << ", frags: " << streaming_context_.fragments_certified() + << ", frags size: " << streaming_context_.fragments().size() + << ", unit: " << streaming_context_.fragment_unit() + << ", size: " << streaming_context_.fragment_size() + << ", counter: " << streaming_context_.unit_counter() + << ", log_pos: " << streaming_context_.log_position() + << ", sr_rb: " << streaming_context_.rolled_back() + << "\n own: " << (client_state_.owning_thread_id_ == wsrep::this_thread::get_id()) + << " thread_id: " << client_state_.owning_thread_id_ + << ""); +} + +void wsrep::transaction::debug_log_key_append(const wsrep::key& key) const +{ + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "key_append: " + << "trx_id: " + << int64_t(id().get()) + << " append key:\n" << key); +} diff --git a/wsrep-lib/src/uuid.cpp b/wsrep-lib/src/uuid.cpp new file mode 100644 index 00000000..9fe41779 --- /dev/null +++ b/wsrep-lib/src/uuid.cpp @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2019 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "uuid.hpp" + +#include +#include +#include +#include + +int wsrep::uuid_scan (const char* str, size_t str_len, wsrep::uuid_t* uuid) +{ + unsigned int uuid_len = 0; + unsigned int uuid_offt = 0; + + while (uuid_len + 1 < str_len) { + /* We are skipping potential '-' after uuid_offt == 4, 6, 8, 10 + * which means + * (uuid_offt >> 1) == 2, 3, 4, 5, + * which in turn means + * (uuid_offt >> 1) - 2 <= 3 + * since it is always >= 0, because uuid_offt is unsigned */ + if (((uuid_offt >> 1) - 2) <= 3 && str[uuid_len] == '-') { + // skip dashes after 4th, 6th, 8th and 10th positions + uuid_len += 1; + continue; + } + + if (isxdigit(str[uuid_len]) && isxdigit(str[uuid_len + 1])) { + // got hex digit, scan another byte to uuid, increment uuid_offt + sscanf (str + uuid_len, "%2hhx", uuid->data + uuid_offt); + uuid_len += 2; + uuid_offt += 1; + if (sizeof (uuid->data) == uuid_offt) + return static_cast(uuid_len); + } + else { + break; + } + } + + *uuid = wsrep::uuid_initializer; + return -EINVAL; +} + +int wsrep::uuid_print (const wsrep::uuid_t* uuid, char* str, size_t str_len) +{ + if (str_len > WSREP_LIB_UUID_STR_LEN) { + const unsigned char* u = uuid->data; + return snprintf(str, str_len, "%02x%02x%02x%02x-%02x%02x-%02x%02x-" + "%02x%02x-%02x%02x%02x%02x%02x%02x", + u[ 0], u[ 1], u[ 2], u[ 3], u[ 4], u[ 5], u[ 6], u[ 7], + u[ 8], u[ 9], u[10], u[11], u[12], u[13], u[14], u[15]); + } + else { + return -EMSGSIZE; + } +} diff --git a/wsrep-lib/src/uuid.hpp b/wsrep-lib/src/uuid.hpp new file mode 100644 index 00000000..72812a76 --- /dev/null +++ b/wsrep-lib/src/uuid.hpp @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2019 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +/** @file uuid.hpp + * + * Helper methods to parse and print UUIDs, intended to use + * internally in wsrep-lib. + * + * The implementation is copied from wsrep-API v26. + */ + +#ifndef WSREP_UUID_HPP +#define WSREP_UUID_HPP + +#include "wsrep/compiler.hpp" + +#include + +/** + * Length of UUID string representation, not including terminating '\0'. + */ +#define WSREP_LIB_UUID_STR_LEN 36 + +namespace wsrep +{ + /** + * UUID type. + */ + typedef union uuid_ + { + unsigned char data[16]; + size_t alignment; + } uuid_t; + + static const wsrep::uuid_t uuid_initializer = {{0, }}; + + /** + * Read UUID from string. + * + * @param str String to read from + * @param str_len Length of string + * @param[out] UUID to read to + * + * @return Number of bytes read or negative error code in case + * of error. + */ + int uuid_scan(const char* str, size_t str_len, wsrep::uuid_t* uuid); + + /** + * Write UUID to string. The caller must allocate at least + * WSREP_LIB_UUID_STR_LEN + 1 space for the output str parameter. + * + * @param uuid UUID to print + * @param str[out] Output buffer + * @param str_len Size of output buffer + * + * @return Number of chars printerd, negative error code in case of + * error. + */ + int uuid_print(const wsrep::uuid_t* uuid, char* str, size_t str_len); +} + +#endif // WSREP_UUID_HPP diff --git a/wsrep-lib/src/view.cpp b/wsrep-lib/src/view.cpp new file mode 100644 index 00000000..d5bff099 --- /dev/null +++ b/wsrep-lib/src/view.cpp @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2018 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/view.hpp" +#include "wsrep/provider.hpp" + +int wsrep::view::member_index(const wsrep::id& member_id) const +{ + for (std::vector::const_iterator i(members_.begin()); + i != members_.end(); ++i) + { + if (i->id() == member_id) + { + return static_cast(i - members_.begin()); + } + } + return -1; +} + +bool wsrep::view::equal_membership(const wsrep::view& other) const +{ + if (members_.size() != other.members_.size()) + { + return false; + } + // we can't assume members ordering + for (std::vector::const_iterator i(members_.begin()); + i != members_.end(); ++i) + { + if (other.member_index(i->id()) == -1) + { + return false; + } + } + return true; +} + +void wsrep::view::print(std::ostream& os) const +{ + os << " id: " << state_id() << "\n" + << " status: " << to_c_string(status()) << "\n" + << " protocol_version: " << protocol_version() << "\n" + << " capabilities: " << provider::capability::str(capabilities())<<"\n" + << " final: " << (final() ? "yes" : "no") << "\n" + << " own_index: " << own_index() << "\n" + << " members(" << members().size() << "):\n"; + + for (std::vector::const_iterator i(members().begin()); + i != members().end(); ++i) + { + os << "\t" << (i - members().begin()) /* ordinal index */ + << ": " << i->id() + << ", " << i->name() << "\n"; + } +} diff --git a/wsrep-lib/src/wsrep_provider_v26.cpp b/wsrep-lib/src/wsrep_provider_v26.cpp new file mode 100644 index 00000000..a579dff2 --- /dev/null +++ b/wsrep-lib/src/wsrep_provider_v26.cpp @@ -0,0 +1,1121 @@ +/* + * Copyright (C) 2018 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep_provider_v26.hpp" + +#include "wsrep/encryption_service.hpp" +#include "wsrep/server_state.hpp" +#include "wsrep/high_priority_service.hpp" +#include "wsrep/view.hpp" +#include "wsrep/exception.hpp" +#include "wsrep/logger.hpp" +#include "wsrep/thread_service.hpp" +#include "wsrep/tls_service.hpp" + +#include "thread_service_v1.hpp" +#include "tls_service_v1.hpp" +#include "v26/wsrep_api.h" + + +#include +#include +#include + +#include +#include +#include // strerror() + +namespace +{ + ///////////////////////////////////////////////////////////////////// + // Helpers // + ///////////////////////////////////////////////////////////////////// + + enum wsrep::provider::status map_return_value(wsrep_status_t status) + { + switch (status) + { + case WSREP_OK: + return wsrep::provider::success; + case WSREP_WARNING: + return wsrep::provider::error_warning; + case WSREP_TRX_MISSING: + return wsrep::provider::error_transaction_missing; + case WSREP_TRX_FAIL: + return wsrep::provider::error_certification_failed; + case WSREP_BF_ABORT: + return wsrep::provider::error_bf_abort; + case WSREP_SIZE_EXCEEDED: + return wsrep::provider::error_size_exceeded; + case WSREP_CONN_FAIL: + return wsrep::provider::error_connection_failed; + case WSREP_NODE_FAIL: + return wsrep::provider::error_provider_failed; + case WSREP_FATAL: + return wsrep::provider::error_fatal; + case WSREP_NOT_IMPLEMENTED: + return wsrep::provider::error_not_implemented; + case WSREP_NOT_ALLOWED: + return wsrep::provider::error_not_allowed; + } + + wsrep::log_warning() << "Unexpected value for wsrep_status_t: " + << status << " (" + << (status < 0 ? strerror(-status) : "") << ')'; + + return wsrep::provider::error_unknown; + } + + wsrep_key_type_t map_key_type(enum wsrep::key::type type) + { + switch (type) + { + case wsrep::key::shared: return WSREP_KEY_SHARED; + case wsrep::key::reference: return WSREP_KEY_REFERENCE; + case wsrep::key::update: return WSREP_KEY_UPDATE; + case wsrep::key::exclusive: return WSREP_KEY_EXCLUSIVE; + } + assert(0); + throw wsrep::runtime_error("Invalid key type"); + } + + static inline wsrep_seqno_t seqno_to_native(wsrep::seqno seqno) + { + return seqno.get(); + } + + static inline wsrep::seqno seqno_from_native(wsrep_seqno_t seqno) + { + return wsrep::seqno(seqno); + } + + template + inline uint32_t map_one(const int flags, const F from, + const T to) + { + // INT_MAX because GCC 4.4 does not eat numeric_limits::max() + // in static_assert + static_assert(WSREP_FLAGS_LAST < INT_MAX, + "WSREP_FLAGS_LAST < INT_MAX"); + return static_cast((flags & static_cast(from)) ? + static_cast(to) : 0); + } + + uint32_t map_flags_to_native(int flags) + { + using wsrep::provider; + return static_cast( + map_one(flags, provider::flag::start_transaction, + WSREP_FLAG_TRX_START) | + map_one(flags, provider::flag::commit, WSREP_FLAG_TRX_END) | + map_one(flags, provider::flag::rollback, WSREP_FLAG_ROLLBACK) | + map_one(flags, provider::flag::isolation, WSREP_FLAG_ISOLATION) | + map_one(flags, provider::flag::pa_unsafe, WSREP_FLAG_PA_UNSAFE) | + // map_one(flags, provider::flag::commutative, WSREP_FLAG_COMMUTATIVE) + // | + // map_one(flags, provider::flag::native, WSREP_FLAG_NATIVE) | + map_one(flags, provider::flag::prepare, WSREP_FLAG_TRX_PREPARE) | + map_one(flags, provider::flag::snapshot, WSREP_FLAG_SNAPSHOT) | + map_one(flags, provider::flag::implicit_deps, + WSREP_FLAG_IMPLICIT_DEPS)); + } + + int map_flags_from_native(uint32_t flags_u32) + { + using wsrep::provider; + const int flags(static_cast(flags_u32)); + return static_cast( + map_one(flags, WSREP_FLAG_TRX_START, + provider::flag::start_transaction) | + map_one(flags, WSREP_FLAG_TRX_END, provider::flag::commit) | + map_one(flags, WSREP_FLAG_ROLLBACK, provider::flag::rollback) | + map_one(flags, WSREP_FLAG_ISOLATION, provider::flag::isolation) | + map_one(flags, WSREP_FLAG_PA_UNSAFE, provider::flag::pa_unsafe) | + // map_one(flags, provider::flag::commutative, WSREP_FLAG_COMMUTATIVE) + // | + // map_one(flags, provider::flag::native, WSREP_FLAG_NATIVE) | + map_one(flags, WSREP_FLAG_TRX_PREPARE, provider::flag::prepare) | + map_one(flags, WSREP_FLAG_SNAPSHOT, provider::flag::snapshot) | + map_one(flags, WSREP_FLAG_IMPLICIT_DEPS, + provider::flag::implicit_deps)); + } + + class mutable_ws_handle + { + public: + mutable_ws_handle(wsrep::ws_handle& ws_handle) + : ws_handle_(ws_handle) + , native_((wsrep_ws_handle_t) + { + ws_handle_.transaction_id().get(), + ws_handle_.opaque() + }) + { } + + ~mutable_ws_handle() + { + ws_handle_ = wsrep::ws_handle( + wsrep::transaction_id(native_.trx_id), native_.opaque); + } + + wsrep_ws_handle_t* native() + { + return &native_; + } + private: + wsrep::ws_handle& ws_handle_; + wsrep_ws_handle_t native_; + }; + + class const_ws_handle + { + public: + const_ws_handle(const wsrep::ws_handle& ws_handle) + : ws_handle_(ws_handle) + , native_((wsrep_ws_handle_t) + { + ws_handle_.transaction_id().get(), + ws_handle_.opaque() + }) + { } + + ~const_ws_handle() + { + assert(ws_handle_.transaction_id().get() == native_.trx_id); + assert(ws_handle_.opaque() == native_.opaque); + } + + const wsrep_ws_handle_t* native() const + { + return &native_; + } + private: + const wsrep::ws_handle& ws_handle_; + const wsrep_ws_handle_t native_; + }; + + class mutable_ws_meta + { + public: + mutable_ws_meta(wsrep::ws_meta& ws_meta, int flags) + : ws_meta_(ws_meta) + , trx_meta_() + , flags_(flags) + { + std::memcpy(trx_meta_.gtid.uuid.data, ws_meta.group_id().data(), + sizeof(trx_meta_.gtid.uuid.data)); + trx_meta_.gtid.seqno = seqno_to_native(ws_meta.seqno()); + std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(), + sizeof(trx_meta_.stid.node.data)); + trx_meta_.stid.conn = ws_meta.client_id().get(); + trx_meta_.stid.trx = ws_meta.transaction_id().get(); + trx_meta_.depends_on = seqno_to_native(ws_meta.depends_on()); + } + + ~mutable_ws_meta() + { + ws_meta_ = wsrep::ws_meta( + wsrep::gtid( + wsrep::id(trx_meta_.gtid.uuid.data, + sizeof(trx_meta_.gtid.uuid.data)), + seqno_from_native(trx_meta_.gtid.seqno)), + wsrep::stid(wsrep::id(trx_meta_.stid.node.data, + sizeof(trx_meta_.stid.node.data)), + wsrep::transaction_id(trx_meta_.stid.trx), + wsrep::client_id(trx_meta_.stid.conn)), + seqno_from_native(trx_meta_.depends_on), flags_); + } + + wsrep_trx_meta* native() { return &trx_meta_; } + uint32_t native_flags() const { return map_flags_to_native(flags_); } + private: + wsrep::ws_meta& ws_meta_; + wsrep_trx_meta_t trx_meta_; + int flags_; + }; + + class const_ws_meta + { + public: + const_ws_meta(const wsrep::ws_meta& ws_meta) + : trx_meta_() + { + std::memcpy(trx_meta_.gtid.uuid.data, ws_meta.group_id().data(), + sizeof(trx_meta_.gtid.uuid.data)); + trx_meta_.gtid.seqno = seqno_to_native(ws_meta.seqno()); + std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(), + sizeof(trx_meta_.stid.node.data)); + trx_meta_.stid.conn = ws_meta.client_id().get(); + trx_meta_.stid.trx = ws_meta.transaction_id().get(); + trx_meta_.depends_on = seqno_to_native(ws_meta.depends_on()); + } + + ~const_ws_meta() + { + } + + const wsrep_trx_meta* native() const { return &trx_meta_; } + private: + wsrep_trx_meta_t trx_meta_; + }; + + enum wsrep::view::status map_view_status_from_native( + wsrep_view_status_t status) + { + switch (status) + { + case WSREP_VIEW_PRIMARY: return wsrep::view::primary; + case WSREP_VIEW_NON_PRIMARY: return wsrep::view::non_primary; + case WSREP_VIEW_DISCONNECTED: return wsrep::view::disconnected; + default: throw wsrep::runtime_error("Unknown view status"); + } + } + + /** @todo Currently capabilities defined in provider.hpp + * are one to one with wsrep_api.h. However, the mapping should + * be made explicit. */ + int map_capabilities_from_native(wsrep_cap_t capabilities) + { + return static_cast(capabilities); + } + wsrep::view view_from_native(const wsrep_view_info& view_info, + const wsrep::id& own_id) + { + std::vector members; + for (int i(0); i < view_info.memb_num; ++i) + { + wsrep::id id(view_info.members[i].id.data, sizeof(view_info.members[i].id.data)); + std::string name( + view_info.members[i].name, + strnlen(view_info.members[i].name, + sizeof(view_info.members[i].name))); + std::string incoming( + view_info.members[i].incoming, + strnlen(view_info.members[i].incoming, + sizeof(view_info.members[i].incoming))); + members.push_back(wsrep::view::member(id, name, incoming)); + } + + int own_idx(-1); + if (own_id.is_undefined()) + { + // If own ID is undefined, obtain it from the view. This is + // the case on the initial connect to cluster. + own_idx = view_info.my_idx; + } + else + { + // If the node has already obtained its ID from cluster, + // its position in the view (or lack thereof) must be determined + // by the ID. + for (size_t i(0); i < members.size(); ++i) + { + if (own_id == members[i].id()) + { + own_idx = static_cast(i); + break; + } + } + } + + return wsrep::view( + wsrep::gtid( + wsrep::id(view_info.state_id.uuid.data, + sizeof(view_info.state_id.uuid.data)), + wsrep::seqno(view_info.state_id.seqno)), + wsrep::seqno(view_info.view), + map_view_status_from_native(view_info.status), + map_capabilities_from_native(view_info.capabilities), + own_idx, + view_info.proto_ver, + members); + } + + ///////////////////////////////////////////////////////////////////// + // Callbacks // + ///////////////////////////////////////////////////////////////////// + + wsrep_cb_status_t connected_cb( + void* app_ctx, + const wsrep_view_info_t* view_info) + { + assert(app_ctx); + wsrep::server_state& server_state( + *reinterpret_cast(app_ctx)); + wsrep::view view(view_from_native(*view_info, server_state.id())); + const ssize_t own_index(view.own_index()); + assert(own_index >= 0); + if (own_index < 0) + { + wsrep::log_error() << "Connected without being in reported view"; + return WSREP_CB_FAILURE; + } + assert(// first connect + server_state.id().is_undefined() || + // reconnect to primary component + server_state.id() == + view.members()[static_cast(own_index)].id()); + try + { + server_state.on_connect(view); + return WSREP_CB_SUCCESS; + } + catch (const wsrep::runtime_error& e) + { + wsrep::log_error() << "Exception: " << e.what(); + return WSREP_CB_FAILURE; + } + } + + wsrep_cb_status_t view_cb(void* app_ctx, + void* recv_ctx, + const wsrep_view_info_t* view_info, + const char*, + size_t) + { + assert(app_ctx); + assert(view_info); + wsrep::server_state& server_state( + *reinterpret_cast(app_ctx)); + wsrep::high_priority_service* high_priority_service( + reinterpret_cast(recv_ctx)); + try + { + wsrep::view view(view_from_native(*view_info, server_state.id())); + server_state.on_view(view, high_priority_service); + return WSREP_CB_SUCCESS; + } + catch (const wsrep::runtime_error& e) + { + wsrep::log_error() << "Exception: " << e.what(); + return WSREP_CB_FAILURE; + } + } + + wsrep_cb_status_t sst_request_cb(void* app_ctx, + void **sst_req, size_t* sst_req_len) + { + assert(app_ctx); + wsrep::server_state& server_state( + *reinterpret_cast(app_ctx)); + + try + { + std::string req(server_state.prepare_for_sst()); + if (req.size() > 0) + { + *sst_req = ::malloc(req.size() + 1); + memcpy(*sst_req, req.data(), req.size() + 1); + *sst_req_len = req.size() + 1; + } + else + { + *sst_req = 0; + *sst_req_len = 0; + } + return WSREP_CB_SUCCESS; + } + catch (const wsrep::runtime_error& e) + { + return WSREP_CB_FAILURE; + } + } + + int encrypt_cb(void* app_ctx, + wsrep_enc_ctx_t* enc_ctx, + const wsrep_buf_t* input, + void* output, + wsrep_enc_direction_t direction, + bool last) + { + assert(app_ctx); + wsrep::server_state& server_state( + *static_cast(app_ctx)); + + assert(server_state.encryption_service()); + if (server_state.encryption_service() == 0) + { + wsrep::log_error() << "Encryption service not defined in encrypt_cb()"; + return -1; + } + + wsrep::const_buffer key(enc_ctx->key->ptr, enc_ctx->key->len); + wsrep::const_buffer in(input->ptr, input->len); + try + { + return server_state.encryption_service()->do_crypt(&enc_ctx->ctx, + key, + enc_ctx->iv, + in, + output, + direction == WSREP_ENC, + last); + } + catch (const wsrep::runtime_error& e) + { + free(enc_ctx->ctx); + // Return negative value in case of callback error + return -1; + } + } + + wsrep_cb_status_t apply_cb(void* ctx, + const wsrep_ws_handle_t* wsh, + uint32_t flags, + const wsrep_buf_t* buf, + const wsrep_trx_meta_t* meta, + wsrep_bool_t* exit_loop) + { + wsrep::high_priority_service* high_priority_service( + reinterpret_cast(ctx)); + assert(high_priority_service); + + wsrep::const_buffer data(buf->ptr, buf->len); + wsrep::ws_handle ws_handle(wsrep::transaction_id(wsh->trx_id), + wsh->opaque); + wsrep::ws_meta ws_meta( + wsrep::gtid(wsrep::id(meta->gtid.uuid.data, + sizeof(meta->gtid.uuid.data)), + wsrep::seqno(meta->gtid.seqno)), + wsrep::stid(wsrep::id(meta->stid.node.data, + sizeof(meta->stid.node.data)), + wsrep::transaction_id(meta->stid.trx), + wsrep::client_id(meta->stid.conn)), + wsrep::seqno(seqno_from_native(meta->depends_on)), + map_flags_from_native(flags)); + try + { + if (high_priority_service->apply(ws_handle, ws_meta, data)) + { + return WSREP_CB_FAILURE; + } + *exit_loop = high_priority_service->must_exit(); + return WSREP_CB_SUCCESS; + } + catch (const wsrep::runtime_error& e) + { + wsrep::log_error() << "Caught runtime error while applying " + << ws_meta.flags() << ": " + << e.what(); + return WSREP_CB_FAILURE; + } + } + + wsrep_cb_status_t synced_cb(void* app_ctx) + { + assert(app_ctx); + wsrep::server_state& server_state( + *reinterpret_cast(app_ctx)); + try + { + server_state.on_sync(); + return WSREP_CB_SUCCESS; + } + catch (const wsrep::runtime_error& e) + { + std::cerr << "On sync failed: " << e.what() << "\n"; + return WSREP_CB_FAILURE; + } + } + + + wsrep_cb_status_t sst_donate_cb(void* app_ctx, + void* , + const wsrep_buf_t* req_buf, + const wsrep_gtid_t* req_gtid, + const wsrep_buf_t*, + bool bypass) + { + assert(app_ctx); + wsrep::server_state& server_state( + *reinterpret_cast(app_ctx)); + try + { + std::string req(reinterpret_cast(req_buf->ptr), + req_buf->len); + wsrep::gtid gtid(wsrep::id(req_gtid->uuid.data, + sizeof(req_gtid->uuid.data)), + wsrep::seqno(req_gtid->seqno)); + if (server_state.start_sst(req, gtid, bypass)) + { + return WSREP_CB_FAILURE; + } + return WSREP_CB_SUCCESS; + } + catch (const wsrep::runtime_error& e) + { + return WSREP_CB_FAILURE; + } + } + + void logger_cb(wsrep_log_level_t level, const char* msg) + { + static const char* const pfx("P:"); // "provider" + wsrep::log::level ll(wsrep::log::unknown); + switch (level) + { + case WSREP_LOG_FATAL: + case WSREP_LOG_ERROR: + ll = wsrep::log::error; + break; + case WSREP_LOG_WARN: + ll = wsrep::log::warning; + break; + case WSREP_LOG_INFO: + ll = wsrep::log::info; + break; + case WSREP_LOG_DEBUG: + ll = wsrep::log::debug; + break; + } + wsrep::log(ll, pfx) << msg; + } + + static int init_thread_service(void* dlh, + wsrep::thread_service* thread_service) + { + assert(thread_service); + if (wsrep::thread_service_v1_probe(dlh)) + { + // No support in library. + return 1; + } + else + { + if (thread_service->before_init()) + { + wsrep::log_error() << "Thread service before init failed"; + return 1; + } + wsrep::thread_service_v1_init(dlh, thread_service); + if (thread_service->after_init()) + { + wsrep::log_error() << "Thread service after init failed"; + return 1; + } + } + return 0; + } + + static void deinit_thread_service(void* dlh) + { + // assert(not wsrep::thread_service_v1_probe(dlh)); + wsrep::thread_service_v1_deinit(dlh); + } + + static int init_tls_service(void* dlh, + wsrep::tls_service* tls_service) + { + assert(tls_service); + if (not wsrep::tls_service_v1_probe(dlh)) + { + return wsrep::tls_service_v1_init(dlh, tls_service); + } + return 1; + } + + static void deinit_tls_service(void* dlh) + { + // assert(not wsrep::tls_service_v1_probe(dlh)); + wsrep::tls_service_v1_deinit(dlh); + } +} + +void wsrep::wsrep_provider_v26::init_services( + const wsrep::provider::services& services) +{ + if (services.thread_service) + { + if (init_thread_service(wsrep_->dlh, services.thread_service)) + { + throw wsrep::runtime_error("Failed to initialize thread service"); + } + services_enabled_.thread_service = services.thread_service; + } + if (services.tls_service) + { + if (init_tls_service(wsrep_->dlh, services.tls_service)) + { + throw wsrep::runtime_error("Failed to initialze TLS service"); + } + services_enabled_.tls_service = services.tls_service; + } +} + +void wsrep::wsrep_provider_v26::deinit_services() +{ + if (services_enabled_.tls_service) + deinit_tls_service(wsrep_->dlh); + if (services_enabled_.thread_service) + deinit_thread_service(wsrep_->dlh); +} + +wsrep::wsrep_provider_v26::wsrep_provider_v26( + wsrep::server_state& server_state, + const std::string& provider_options, + const std::string& provider_spec, + const wsrep::provider::services& services) + : provider(server_state) + , wsrep_() + , services_enabled_() +{ + wsrep_gtid_t state_id; + bool encryption_enabled = server_state.encryption_service() && + server_state.encryption_service()->encryption_enabled(); + std::memcpy(state_id.uuid.data, + server_state.initial_position().id().data(), + sizeof(state_id.uuid.data)); + state_id.seqno = server_state.initial_position().seqno().get(); + struct wsrep_init_args init_args; + memset(&init_args, 0, sizeof(init_args)); + init_args.app_ctx = &server_state; + init_args.node_name = server_state_.name().c_str(); + init_args.node_address = server_state_.address().c_str(); + init_args.node_incoming = server_state_.incoming_address().c_str(); + init_args.data_dir = server_state_.working_dir().c_str(); + init_args.options = provider_options.c_str(); + init_args.proto_ver = server_state.max_protocol_version(); + init_args.state_id = &state_id; + init_args.state = 0; + init_args.logger_cb = &logger_cb; + init_args.connected_cb = &connected_cb; + init_args.view_cb = &view_cb; + init_args.sst_request_cb = &sst_request_cb; + init_args.encrypt_cb = encryption_enabled ? encrypt_cb : NULL; + init_args.apply_cb = &apply_cb; + init_args.unordered_cb = 0; + init_args.sst_donate_cb = &sst_donate_cb; + init_args.synced_cb = &synced_cb; + + if (wsrep_load(provider_spec.c_str(), &wsrep_, logger_cb)) + { + throw wsrep::runtime_error("Failed to load wsrep library"); + } + + init_services(services); + + if (wsrep_->init(wsrep_, &init_args) != WSREP_OK) + { + throw wsrep::runtime_error("Failed to initialize wsrep provider"); + } + + if (encryption_enabled) + { + const std::vector& key = server_state.get_encryption_key(); + if (key.size()) + { + wsrep::const_buffer const_key(key.data(), key.size()); + enum status const retval(enc_set_key(const_key)); + if (retval != success) + { + std::string msg("Failed to set encryption key: "); + msg += to_string(retval); + throw wsrep::runtime_error(msg); + } + } + } +} + +wsrep::wsrep_provider_v26::~wsrep_provider_v26() +{ + wsrep_->free(wsrep_); + deinit_services(); + wsrep_unload(wsrep_); +} + +enum wsrep::provider::status wsrep::wsrep_provider_v26::connect( + const std::string& cluster_name, + const std::string& cluster_url, + const std::string& state_donor, + bool bootstrap) +{ + return map_return_value(wsrep_->connect(wsrep_, + cluster_name.c_str(), + cluster_url.c_str(), + state_donor.c_str(), + bootstrap)); +} + +int wsrep::wsrep_provider_v26::disconnect() +{ + int ret(0); + wsrep_status_t wret; + if ((wret = wsrep_->disconnect(wsrep_)) != WSREP_OK) + { + std::cerr << "Failed to disconnect from cluster: " + << wret << "\n"; + ret = 1; + } + return ret; +} + +int wsrep::wsrep_provider_v26::capabilities() const +{ + return map_capabilities_from_native(wsrep_->capabilities(wsrep_)); +} +int wsrep::wsrep_provider_v26::desync() +{ + return (wsrep_->desync(wsrep_) != WSREP_OK); +} + +int wsrep::wsrep_provider_v26::resync() +{ + return (wsrep_->resync(wsrep_) != WSREP_OK); +} + +wsrep::seqno wsrep::wsrep_provider_v26::pause() +{ + return wsrep::seqno(wsrep_->pause(wsrep_)); +} + +int wsrep::wsrep_provider_v26::resume() +{ + return (wsrep_->resume(wsrep_) != WSREP_OK); +} + +enum wsrep::provider::status +wsrep::wsrep_provider_v26::run_applier( + wsrep::high_priority_service *applier_ctx) +{ + return map_return_value(wsrep_->recv(wsrep_, applier_ctx)); +} + +enum wsrep::provider::status +wsrep::wsrep_provider_v26::assign_read_view(wsrep::ws_handle& ws_handle, + const wsrep::gtid* gtid) +{ + const wsrep_gtid_t* gtid_ptr(NULL); + wsrep_gtid_t tmp; + + if (gtid) + { + ::memcpy(&tmp.uuid, gtid->id().data(), sizeof(tmp.uuid)); + tmp.seqno = gtid->seqno().get(); + gtid_ptr = &tmp; + } + + mutable_ws_handle mwsh(ws_handle); + return map_return_value(wsrep_->assign_read_view(wsrep_, mwsh.native(), + gtid_ptr)); +} + +int wsrep::wsrep_provider_v26::append_key(wsrep::ws_handle& ws_handle, + const wsrep::key& key) +{ + if (key.size() > 3) + { + assert(0); + return 1; + } + wsrep_buf_t key_parts[3]; + for (size_t i(0); i < key.size(); ++i) + { + key_parts[i].ptr = key.key_parts()[i].ptr(); + key_parts[i].len = key.key_parts()[i].size(); + } + wsrep_key_t wsrep_key = {key_parts, key.size()}; + mutable_ws_handle mwsh(ws_handle); + return (wsrep_->append_key( + wsrep_, mwsh.native(), + &wsrep_key, 1, map_key_type(key.type()), true) + != WSREP_OK); +} + +enum wsrep::provider::status +wsrep::wsrep_provider_v26::append_data(wsrep::ws_handle& ws_handle, + const wsrep::const_buffer& data) +{ + const wsrep_buf_t wsrep_buf = {data.data(), data.size()}; + mutable_ws_handle mwsh(ws_handle); + return map_return_value( + wsrep_->append_data(wsrep_, mwsh.native(), &wsrep_buf, + 1, WSREP_DATA_ORDERED, true)); +} + +enum wsrep::provider::status +wsrep::wsrep_provider_v26::certify(wsrep::client_id client_id, + wsrep::ws_handle& ws_handle, + int flags, + wsrep::ws_meta& ws_meta) +{ + mutable_ws_handle mwsh(ws_handle); + mutable_ws_meta mmeta(ws_meta, flags); + return map_return_value( + wsrep_->certify(wsrep_, client_id.get(), mwsh.native(), + mmeta.native_flags(), + mmeta.native())); +} + +enum wsrep::provider::status +wsrep::wsrep_provider_v26::bf_abort( + wsrep::seqno bf_seqno, + wsrep::transaction_id victim_id, + wsrep::seqno& victim_seqno) +{ + wsrep_seqno_t wsrep_victim_seqno; + wsrep_status_t ret( + wsrep_->abort_certification( + wsrep_, seqno_to_native(bf_seqno), + victim_id.get(), &wsrep_victim_seqno)); + victim_seqno = seqno_from_native(wsrep_victim_seqno); + return map_return_value(ret); +} + +enum wsrep::provider::status +wsrep::wsrep_provider_v26::rollback(wsrep::transaction_id id) +{ + return map_return_value(wsrep_->rollback(wsrep_, id.get(), 0)); +} + +enum wsrep::provider::status +wsrep::wsrep_provider_v26::commit_order_enter( + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) +{ + const_ws_handle cwsh(ws_handle); + const_ws_meta cwsm(ws_meta); + return map_return_value( + wsrep_->commit_order_enter(wsrep_, cwsh.native(), cwsm.native())); +} + +int +wsrep::wsrep_provider_v26::commit_order_leave( + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::mutable_buffer& err) +{ + const_ws_handle cwsh(ws_handle); + const_ws_meta cwsm(ws_meta); + wsrep_buf_t const err_buf = { err.data(), err.size() }; + int ret(wsrep_->commit_order_leave( + wsrep_, cwsh.native(), cwsm.native(), &err_buf) != WSREP_OK); + return ret; +} + +int wsrep::wsrep_provider_v26::release(wsrep::ws_handle& ws_handle) +{ + mutable_ws_handle mwsh(ws_handle); + return (wsrep_->release(wsrep_, mwsh.native()) != WSREP_OK); +} + +enum wsrep::provider::status +wsrep::wsrep_provider_v26::replay(const wsrep::ws_handle& ws_handle, + wsrep::high_priority_service* reply_service) +{ + const_ws_handle mwsh(ws_handle); + return map_return_value( + wsrep_->replay_trx(wsrep_, mwsh.native(), reply_service)); +} + +enum wsrep::provider::status +wsrep::wsrep_provider_v26::enter_toi( + wsrep::client_id client_id, + const wsrep::key_array& keys, + const wsrep::const_buffer& buffer, + wsrep::ws_meta& ws_meta, + int flags) +{ + mutable_ws_meta mmeta(ws_meta, flags); + std::vector > key_parts; + std::vector wsrep_keys; + wsrep_buf_t wsrep_buf = {buffer.data(), buffer.size()}; + for (size_t i(0); i < keys.size(); ++i) + { + key_parts.push_back(std::vector()); + for (size_t kp(0); kp < keys[i].size(); ++kp) + { + wsrep_buf_t buf = {keys[i].key_parts()[kp].data(), + keys[i].key_parts()[kp].size()}; + key_parts[i].push_back(buf); + } + } + for (size_t i(0); i < key_parts.size(); ++i) + { + wsrep_key_t key = {key_parts[i].data(), key_parts[i].size()}; + wsrep_keys.push_back(key); + } + return map_return_value(wsrep_->to_execute_start( + wsrep_, + client_id.get(), + wsrep_keys.data(), + wsrep_keys.size(), + &wsrep_buf, + buffer.size() ? 1 : 0, + mmeta.native_flags(), + mmeta.native())); +} + +enum wsrep::provider::status +wsrep::wsrep_provider_v26::leave_toi(wsrep::client_id client_id, + const wsrep::mutable_buffer& err) +{ + const wsrep_buf_t err_buf = { err.data(), err.size() }; + return map_return_value(wsrep_->to_execute_end( + wsrep_, client_id.get(), &err_buf)); +} + +std::pair +wsrep::wsrep_provider_v26::causal_read(int timeout) const +{ + wsrep_gtid_t wsrep_gtid; + wsrep_status_t ret(wsrep_->sync_wait(wsrep_, 0, timeout, &wsrep_gtid)); + wsrep::gtid gtid(ret == WSREP_OK ? + wsrep::gtid(wsrep::id(wsrep_gtid.uuid.data, + sizeof(wsrep_gtid.uuid.data)), + wsrep::seqno(wsrep_gtid.seqno)) : + wsrep::gtid::undefined()); + return std::make_pair(gtid, map_return_value(ret)); +} + +enum wsrep::provider::status +wsrep::wsrep_provider_v26::wait_for_gtid(const wsrep::gtid& gtid, int timeout) + const +{ + wsrep_gtid_t wsrep_gtid; + std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(), + sizeof(wsrep_gtid.uuid.data)); + wsrep_gtid.seqno = gtid.seqno().get(); + return map_return_value(wsrep_->sync_wait(wsrep_, &wsrep_gtid, timeout, 0)); +} + +wsrep::gtid wsrep::wsrep_provider_v26::last_committed_gtid() const +{ + wsrep_gtid_t wsrep_gtid; + if (wsrep_->last_committed_id(wsrep_, &wsrep_gtid) != WSREP_OK) + { + throw wsrep::runtime_error("Failed to read last committed id"); + } + return wsrep::gtid( + wsrep::id(wsrep_gtid.uuid.data, sizeof(wsrep_gtid.uuid.data)), + wsrep::seqno(wsrep_gtid.seqno)); +} + +enum wsrep::provider::status +wsrep::wsrep_provider_v26::sst_sent(const wsrep::gtid& gtid, int err) +{ + wsrep_gtid_t wsrep_gtid; + std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(), + sizeof(wsrep_gtid.uuid.data)); + wsrep_gtid.seqno = gtid.seqno().get(); + return map_return_value(wsrep_->sst_sent(wsrep_, &wsrep_gtid, err)); +} + +enum wsrep::provider::status +wsrep::wsrep_provider_v26::sst_received(const wsrep::gtid& gtid, int err) +{ + wsrep_gtid_t wsrep_gtid; + std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(), + sizeof(wsrep_gtid.uuid.data)); + wsrep_gtid.seqno = gtid.seqno().get(); + return map_return_value(wsrep_->sst_received(wsrep_, &wsrep_gtid, 0, err)); +} + +enum wsrep::provider::status +wsrep::wsrep_provider_v26::enc_set_key(const wsrep::const_buffer& key) +{ + wsrep_enc_key_t enc_key = {key.data(), key.size()}; + return map_return_value(wsrep_->enc_set_key(wsrep_, &enc_key)); +} + +std::vector +wsrep::wsrep_provider_v26::status() const +{ + std::vector ret; + wsrep_stats_var* const stats(wsrep_->stats_get(wsrep_)); + wsrep_stats_var* i(stats); + if (i) + { + while (i->name) + { + switch (i->type) + { + case WSREP_VAR_STRING: + ret.push_back(status_variable(i->name, i->value._string)); + break; + case WSREP_VAR_INT64: + { + std::ostringstream os; + os << i->value._int64; + ret.push_back(status_variable(i->name, os.str())); + break; + } + case WSREP_VAR_DOUBLE: + { + std::ostringstream os; + os << i->value._double; + ret.push_back(status_variable(i->name, os.str())); + break; + } + default: + assert(0); + break; + } + ++i; + } + wsrep_->stats_free(wsrep_, stats); + } + return ret; +} + +void wsrep::wsrep_provider_v26::reset_status() +{ + wsrep_->stats_reset(wsrep_); +} + +std::string wsrep::wsrep_provider_v26::options() const +{ + std::string ret; + char* opts; + if ((opts = wsrep_->options_get(wsrep_))) + { + ret = opts; + free(opts); + } + else + { + throw wsrep::runtime_error("Failed to get provider options"); + } + return ret; +} + +enum wsrep::provider::status +wsrep::wsrep_provider_v26::options(const std::string& opts) +{ + return map_return_value(wsrep_->options_set(wsrep_, opts.c_str())); +} + +std::string wsrep::wsrep_provider_v26::name() const +{ + return (wsrep_->provider_name ? wsrep_->provider_name : "unknown"); +} + +std::string wsrep::wsrep_provider_v26::version() const +{ + return (wsrep_->provider_version ? wsrep_->provider_version : "unknown"); +} + +std::string wsrep::wsrep_provider_v26::vendor() const +{ + return (wsrep_->provider_vendor ? wsrep_->provider_vendor : "unknown"); +} + +void* wsrep::wsrep_provider_v26::native() const +{ + return wsrep_; +} diff --git a/wsrep-lib/src/wsrep_provider_v26.hpp b/wsrep-lib/src/wsrep_provider_v26.hpp new file mode 100644 index 00000000..1859b679 --- /dev/null +++ b/wsrep-lib/src/wsrep_provider_v26.hpp @@ -0,0 +1,116 @@ +/* + * Copyright (C) 2018 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#ifndef WSREP_WSREP_PROVIDER_V26_HPP +#define WSREP_WSREP_PROVIDER_V26_HPP + +#include "wsrep/provider.hpp" + +struct wsrep_st; + +namespace wsrep +{ + class thread_service; + class wsrep_provider_v26 : public wsrep::provider + { + public: + void init_services(const wsrep::provider::services& services); + void deinit_services(); + wsrep_provider_v26(wsrep::server_state&, const std::string&, + const std::string&, + const wsrep::provider::services& services); + ~wsrep_provider_v26(); + enum wsrep::provider::status + connect(const std::string&, const std::string&, const std::string&, + bool) WSREP_OVERRIDE; + int disconnect() WSREP_OVERRIDE; + int capabilities() const WSREP_OVERRIDE; + + int desync() WSREP_OVERRIDE; + int resync() WSREP_OVERRIDE; + wsrep::seqno pause() WSREP_OVERRIDE; + int resume() WSREP_OVERRIDE; + + enum wsrep::provider::status + run_applier(wsrep::high_priority_service*) WSREP_OVERRIDE; + int start_transaction(wsrep::ws_handle&) WSREP_OVERRIDE { return 0; } + enum wsrep::provider::status + assign_read_view(wsrep::ws_handle&, const wsrep::gtid*) WSREP_OVERRIDE; + int append_key(wsrep::ws_handle&, const wsrep::key&) WSREP_OVERRIDE; + enum wsrep::provider::status + append_data(wsrep::ws_handle&, const wsrep::const_buffer&) + WSREP_OVERRIDE; + enum wsrep::provider::status + certify(wsrep::client_id, wsrep::ws_handle&, + int, + wsrep::ws_meta&) WSREP_OVERRIDE; + enum wsrep::provider::status + bf_abort(wsrep::seqno, + wsrep::transaction_id, + wsrep::seqno&) WSREP_OVERRIDE; + enum wsrep::provider::status + rollback(const wsrep::transaction_id) WSREP_OVERRIDE; + enum wsrep::provider::status + commit_order_enter(const wsrep::ws_handle&, + const wsrep::ws_meta&) WSREP_OVERRIDE; + int commit_order_leave(const wsrep::ws_handle&, + const wsrep::ws_meta&, + const wsrep::mutable_buffer&) WSREP_OVERRIDE; + int release(wsrep::ws_handle&) WSREP_OVERRIDE; + enum wsrep::provider::status replay(const wsrep::ws_handle&, + wsrep::high_priority_service*) + WSREP_OVERRIDE; + enum wsrep::provider::status enter_toi(wsrep::client_id, + const wsrep::key_array&, + const wsrep::const_buffer&, + wsrep::ws_meta&, + int) + WSREP_OVERRIDE; + enum wsrep::provider::status leave_toi(wsrep::client_id, + const wsrep::mutable_buffer&) + WSREP_OVERRIDE; + std::pair + causal_read(int) const WSREP_OVERRIDE; + enum wsrep::provider::status wait_for_gtid(const wsrep::gtid&, int) + const WSREP_OVERRIDE; + wsrep::gtid last_committed_gtid() const WSREP_OVERRIDE; + enum wsrep::provider::status sst_sent(const wsrep::gtid&, int) + WSREP_OVERRIDE; + enum wsrep::provider::status sst_received(const wsrep::gtid& gtid, int) + WSREP_OVERRIDE; + enum wsrep::provider::status enc_set_key(const wsrep::const_buffer& key) + WSREP_OVERRIDE; + std::vector status() const WSREP_OVERRIDE; + void reset_status() WSREP_OVERRIDE; + std::string options() const WSREP_OVERRIDE; + enum wsrep::provider::status options(const std::string&) WSREP_OVERRIDE; + std::string name() const WSREP_OVERRIDE; + std::string version() const WSREP_OVERRIDE; + std::string vendor() const WSREP_OVERRIDE; + void* native() const WSREP_OVERRIDE; + private: + wsrep_provider_v26(const wsrep_provider_v26&); + wsrep_provider_v26& operator=(const wsrep_provider_v26); + struct wsrep_st* wsrep_; + services services_enabled_; + }; +} + + +#endif // WSREP_WSREP_PROVIDER_V26_HPP diff --git a/wsrep-lib/src/xid.cpp b/wsrep-lib/src/xid.cpp new file mode 100644 index 00000000..7757e445 --- /dev/null +++ b/wsrep-lib/src/xid.cpp @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2019 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/xid.hpp" +#include + +std::string wsrep::to_string(const wsrep::xid& xid) +{ + return std::string(xid.data_.data(), xid.data_.size()); +} + +std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::xid& xid) +{ + return os << to_string(xid); +} -- cgit v1.2.3