From a68fb2d8219f6bccc573009600e9f23e89226a5e Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 4 May 2024 20:04:16 +0200 Subject: Adding upstream version 1:10.6.11. Signed-off-by: Daniel Baumann --- wsrep-lib/src/server_state.cpp | 1612 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1612 insertions(+) create mode 100644 wsrep-lib/src/server_state.cpp (limited to 'wsrep-lib/src/server_state.cpp') diff --git a/wsrep-lib/src/server_state.cpp b/wsrep-lib/src/server_state.cpp new file mode 100644 index 00000000..f07de3d9 --- /dev/null +++ b/wsrep-lib/src/server_state.cpp @@ -0,0 +1,1612 @@ +/* + * Copyright (C) 2018-2019 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/server_state.hpp" +#include "wsrep/client_state.hpp" +#include "wsrep/server_service.hpp" +#include "wsrep/high_priority_service.hpp" +#include "wsrep/transaction.hpp" +#include "wsrep/view.hpp" +#include "wsrep/logger.hpp" +#include "wsrep/compiler.hpp" +#include "wsrep/id.hpp" + +#include +#include +#include + + +////////////////////////////////////////////////////////////////////////////// +// Helpers // +////////////////////////////////////////////////////////////////////////////// + + +// +// This method is used to deal with historical burden of several +// ways to bootstrap the cluster. Bootstrap happens if +// +// * bootstrap option is given +// * cluster_address is "gcomm://" (Galera provider) +// +static bool is_bootstrap(const std::string& cluster_address, bool bootstrap) +{ + return (bootstrap || cluster_address == "gcomm://"); +} + +// Helper method to provide detailed error message if transaction +// adopt for fragment removal fails. +static void log_adopt_error(const wsrep::transaction& transaction) +{ + wsrep::log_warning() << "Adopting a transaction (" + << transaction.server_id() << "," << transaction.id() + << ") for rollback failed, " + << "this may leave stale entries to streaming log " + << "which may need to be removed manually."; +} + +// resolve which of the two errors return to caller +static inline int resolve_return_error(bool const vote, + int const vote_err, + int const apply_err) +{ + if (vote) return vote_err; + return vote_err != 0 ? vote_err : apply_err; +} + +static void +discard_streaming_applier(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + wsrep::high_priority_service* streaming_applier, + const wsrep::ws_meta& ws_meta) +{ + server_state.stop_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id()); + server_state.server_service().release_high_priority_service( + streaming_applier); + high_priority_service.store_globals(); +} + +static int apply_fragment(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + wsrep::high_priority_service* streaming_applier, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) +{ + int ret(0); + int apply_err; + wsrep::mutable_buffer err; + { + wsrep::high_priority_switch sw(high_priority_service, + *streaming_applier); + apply_err = streaming_applier->apply_write_set(ws_meta, data, err); + if (!apply_err) + { + assert(err.size() == 0); + streaming_applier->after_apply(); + } + else + { + bool const remove_fragments(streaming_applier->transaction( + ).streaming_context().fragments().size() > 0); + ret = streaming_applier->rollback(ws_handle, ws_meta); + ret = ret || (streaming_applier->after_apply(), 0); + + if (remove_fragments) + { + ret = ret || streaming_applier->start_transaction(ws_handle, + ws_meta); + ret = ret || (streaming_applier->adopt_apply_error(err), 0); + ret = ret || streaming_applier->remove_fragments(ws_meta); + ret = ret || streaming_applier->commit(ws_handle, ws_meta); + ret = ret || (streaming_applier->after_apply(), 0); + } + else + { + ret = streaming_applier->log_dummy_write_set(ws_handle, + ws_meta, err); + } + } + } + + if (!ret) + { + if (!apply_err) + { + high_priority_service.debug_crash("crash_apply_cb_before_append_frag"); + const wsrep::xid xid(streaming_applier->transaction().xid()); + ret = high_priority_service.append_fragment_and_commit( + ws_handle, ws_meta, data, xid); + high_priority_service.debug_crash("crash_apply_cb_after_append_frag"); + ret = ret || (high_priority_service.after_apply(), 0); + } + else + { + discard_streaming_applier(server_state, + high_priority_service, + streaming_applier, + ws_meta); + ret = resolve_return_error(err.size() > 0, ret, apply_err); + } + } + + return ret; +} + +static int commit_fragment(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + wsrep::high_priority_service* streaming_applier, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) +{ + int ret(0); + { + wsrep::high_priority_switch sw( + high_priority_service, *streaming_applier); + wsrep::mutable_buffer err; + int const apply_err( + streaming_applier->apply_write_set(ws_meta, data, err)); + if (apply_err) + { + assert(streaming_applier->transaction( + ).streaming_context().fragments().size() > 0); + ret = streaming_applier->rollback(ws_handle, ws_meta); + ret = ret || (streaming_applier->after_apply(), 0); + ret = ret || streaming_applier->start_transaction( + ws_handle, ws_meta); + ret = ret || (streaming_applier->adopt_apply_error(err),0); + } + else + { + assert(err.size() == 0); + } + + const wsrep::transaction& trx(streaming_applier->transaction()); + // Fragment removal for XA is going to happen in after_commit + if (trx.state() != wsrep::transaction::s_prepared) + { + streaming_applier->debug_crash( + "crash_apply_cb_before_fragment_removal"); + + ret = ret || streaming_applier->remove_fragments(ws_meta); + + streaming_applier->debug_crash( + "crash_apply_cb_after_fragment_removal"); + } + + streaming_applier->debug_crash( + "crash_commit_cb_before_last_fragment_commit"); + ret = ret || streaming_applier->commit(ws_handle, ws_meta); + streaming_applier->debug_crash( + "crash_commit_cb_last_fragment_commit_success"); + ret = ret || (streaming_applier->after_apply(), 0); + ret = resolve_return_error(err.size() > 0, ret, apply_err); + } + + if (!ret) + { + discard_streaming_applier(server_state, high_priority_service, + streaming_applier, ws_meta); + } + + return ret; +} + +static int rollback_fragment(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + wsrep::high_priority_service* streaming_applier, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) +{ + int ret(0); + int adopt_error(0); + bool const remove_fragments(streaming_applier->transaction(). + streaming_context().fragments().size() > 0); + // If fragment removal is needed, adopt transaction state + // and start a transaction for it. + if (remove_fragments && + (adopt_error = high_priority_service.adopt_transaction( + streaming_applier->transaction()))) + { + log_adopt_error(streaming_applier->transaction()); + } + // Even if the adopt above fails we roll back the streaming transaction. + // Adopt failure will leave stale entries in streaming log which can + // be removed manually. + wsrep::const_buffer no_error; + { + wsrep::high_priority_switch ws( + high_priority_service, *streaming_applier); + // Streaming applier rolls back out of order. Fragment + // removal grabs commit order below. + ret = streaming_applier->rollback(wsrep::ws_handle(), wsrep::ws_meta()); + ret = ret || (streaming_applier->after_apply(), 0); + } + + if (!ret) + { + discard_streaming_applier(server_state, high_priority_service, + streaming_applier, ws_meta); + + if (adopt_error == 0) + { + if (remove_fragments) + { + ret = high_priority_service.remove_fragments(ws_meta); + ret = ret || high_priority_service.commit(ws_handle, ws_meta); + if (ret) + { + high_priority_service.rollback(ws_handle, ws_meta); + } + high_priority_service.after_apply(); + } + else + { + if (ws_meta.ordered()) + { + wsrep::mutable_buffer no_error; + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta, no_error); + } + } + } + } + return ret; +} + +static int apply_write_set(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) +{ + int ret(0); + if (wsrep::rolls_back_transaction(ws_meta.flags())) + { + wsrep::mutable_buffer no_error; + if (wsrep::starts_transaction(ws_meta.flags())) + { + // No transaction existed before, log a dummy write set + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta, no_error); + } + else + { + wsrep::high_priority_service* sa( + server_state.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id())); + if (sa == 0) + { + // It is a known limitation that galera provider + // cannot always determine if certification test + // for interrupted transaction will pass or fail + // (see comments in transaction::certify_fragment()). + // As a consequence, unnecessary rollback fragments + // may be delivered here. The message below has + // been intentionally turned into a debug message, + // rather than warning. + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_server_state, + "Could not find applier context for " + << ws_meta.server_id() + << ": " << ws_meta.transaction_id()); + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta, no_error); + } + else + { + // rollback_fragment() consumes sa + ret = rollback_fragment(server_state, + high_priority_service, + sa, + ws_handle, + ws_meta); + } + } + } + else if (wsrep::starts_transaction(ws_meta.flags()) && + wsrep::commits_transaction(ws_meta.flags())) + { + ret = high_priority_service.start_transaction(ws_handle, ws_meta); + if (!ret) + { + wsrep::mutable_buffer err; + int const apply_err(high_priority_service.apply_write_set( + ws_meta, data, err)); + if (!apply_err) + { + assert(err.size() == 0); + ret = high_priority_service.commit(ws_handle, ws_meta); + ret = ret || (high_priority_service.after_apply(), 0); + } + else + { + ret = high_priority_service.rollback(ws_handle, ws_meta); + ret = ret || (high_priority_service.after_apply(), 0); + ret = ret || high_priority_service.log_dummy_write_set( + ws_handle, ws_meta, err); + ret = resolve_return_error(err.size() > 0, ret, apply_err); + } + } + } + else if (wsrep::starts_transaction(ws_meta.flags())) + { + assert(server_state.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id()) == 0); + wsrep::high_priority_service* sa( + server_state.server_service().streaming_applier_service( + high_priority_service)); + server_state.start_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id(), sa); + sa->start_transaction(ws_handle, ws_meta); + ret = apply_fragment(server_state, + high_priority_service, + sa, + ws_handle, + ws_meta, + data); + } + else if (ws_meta.flags() == 0 || ws_meta.flags() == wsrep::provider::flag::pa_unsafe || + wsrep::prepares_transaction(ws_meta.flags())) + { + wsrep::high_priority_service* sa( + server_state.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id())); + if (sa == 0) + { + // It is possible that rapid group membership changes + // may cause streaming transaction be rolled back before + // commit fragment comes in. Although this is a valid + // situation, log a warning if a sac cannot be found as + // it may be an indication of a bug too. + wsrep::log_warning() << "Could not find applier context for " + << ws_meta.server_id() + << ": " << ws_meta.transaction_id(); + wsrep::mutable_buffer no_error; + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta, no_error); + } + else + { + sa->next_fragment(ws_meta); + ret = apply_fragment(server_state, + high_priority_service, + sa, + ws_handle, + ws_meta, + data); + } + } + else if (wsrep::commits_transaction(ws_meta.flags())) + { + if (high_priority_service.is_replaying()) + { + wsrep::mutable_buffer unused; + ret = high_priority_service.start_transaction( + ws_handle, ws_meta) || + high_priority_service.apply_write_set(ws_meta, data, unused) || + high_priority_service.commit(ws_handle, ws_meta); + } + else + { + wsrep::high_priority_service* sa( + server_state.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id())); + if (sa == 0) + { + // It is possible that rapid group membership changes + // may cause streaming transaction be rolled back before + // commit fragment comes in. Although this is a valid + // situation, log a warning if a sac cannot be found as + // it may be an indication of a bug too. + wsrep::log_warning() + << "Could not find applier context for " + << ws_meta.server_id() + << ": " << ws_meta.transaction_id(); + wsrep::mutable_buffer no_error; + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta, no_error); + } + else + { + // Commit fragment consumes sa + sa->next_fragment(ws_meta); + ret = commit_fragment(server_state, + high_priority_service, + sa, + ws_handle, + ws_meta, + data); + } + } + } + else + { + assert(0); + } + if (ret) + { + wsrep::log_error() << "Failed to apply write set: " << ws_meta; + } + return ret; +} + +static int apply_toi(wsrep::provider& provider, + wsrep::high_priority_service& high_priority_service, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) +{ + if (wsrep::starts_transaction(ws_meta.flags()) && + wsrep::commits_transaction(ws_meta.flags())) + { + // + // Regular TOI. + // + provider.commit_order_enter(ws_handle, ws_meta); + wsrep::mutable_buffer err; + int const apply_err(high_priority_service.apply_toi(ws_meta,data,err)); + int const vote_err(provider.commit_order_leave(ws_handle, ws_meta,err)); + return resolve_return_error(err.size() > 0, vote_err, apply_err); + } + else if (wsrep::starts_transaction(ws_meta.flags())) + { + provider.commit_order_enter(ws_handle, ws_meta); + wsrep::mutable_buffer err; + int const apply_err(high_priority_service.apply_nbo_begin(ws_meta, data, err)); + int const vote_err(provider.commit_order_leave(ws_handle, ws_meta, err)); + return resolve_return_error(err.size() > 0, vote_err, apply_err); + } + else if (wsrep::commits_transaction(ws_meta.flags())) + { + // NBO end event is ignored here, both local and applied + // have NBO end handled via local TOI calls. + provider.commit_order_enter(ws_handle, ws_meta); + wsrep::mutable_buffer err; + provider.commit_order_leave(ws_handle, ws_meta, err); + return 0; + } + else + { + assert(0); + return 0; + } +} + +////////////////////////////////////////////////////////////////////////////// +// Server State // +////////////////////////////////////////////////////////////////////////////// + +int wsrep::server_state::load_provider( + const std::string& provider_spec, const std::string& provider_options, + const wsrep::provider::services& services) +{ + wsrep::log_info() << "Loading provider " << provider_spec + << " initial position: " << initial_position_; + + provider_ = wsrep::provider::make_provider(*this, + provider_spec, + provider_options, + services); + return (provider_ ? 0 : 1); +} + +void wsrep::server_state::unload_provider() +{ + delete provider_; + provider_ = 0; +} + +int wsrep::server_state::connect(const std::string& cluster_name, + const std::string& cluster_address, + const std::string& state_donor, + bool bootstrap) +{ + bootstrap_ = is_bootstrap(cluster_address, bootstrap); + wsrep::log_info() << "Connecting with bootstrap option: " << bootstrap_; + return provider().connect(cluster_name, cluster_address, state_donor, + bootstrap_); +} + +int wsrep::server_state::disconnect() +{ + { + wsrep::unique_lock lock(mutex_); + // In case of failure situations which are caused by provider + // being shut down some failing operation may also try to shut + // down the replication. Check the state here and + // return success if the provider disconnect is already in progress + // or has completed. + if (state(lock) == s_disconnecting || state(lock) == s_disconnected) + { + return 0; + } + state(lock, s_disconnecting); + interrupt_state_waiters(lock); + } + return provider().disconnect(); +} + +wsrep::server_state::~server_state() +{ + delete provider_; +} + +std::vector +wsrep::server_state::status() const +{ + return provider().status(); +} + + +wsrep::seqno wsrep::server_state::pause() +{ + wsrep::unique_lock lock(mutex_); + // Disallow concurrent calls to pause to in order to have non-concurrent + // access to desynced_on_pause_ which is checked in resume() call. + wsrep::log_info() << "pause"; + while (pause_count_ > 0) + { + cond_.wait(lock); + } + ++pause_count_; + assert(pause_seqno_.is_undefined()); + lock.unlock(); + pause_seqno_ = provider().pause(); + lock.lock(); + if (pause_seqno_.is_undefined()) + { + --pause_count_; + } + return pause_seqno_; +} + +void wsrep::server_state::resume() +{ + wsrep::unique_lock lock(mutex_); + wsrep::log_info() << "resume"; + assert(pause_seqno_.is_undefined() == false); + assert(pause_count_ == 1); + if (provider().resume()) + { + throw wsrep::runtime_error("Failed to resume provider"); + } + pause_seqno_ = wsrep::seqno::undefined(); + --pause_count_; + cond_.notify_all(); +} + +wsrep::seqno wsrep::server_state::desync_and_pause() +{ + wsrep::log_info() << "Desyncing and pausing the provider"; + // Temporary variable to store desync() return status. This will be + // assigned to desynced_on_pause_ after pause() call to prevent + // concurrent access to member variable desynced_on_pause_. + bool desync_successful; + if (desync()) + { + // Desync may give transient error if the provider cannot + // communicate with the rest of the cluster. However, this + // error can be tolerated because if the provider can be + // paused successfully below. + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_server_state, + "Failed to desync server before pause"); + desync_successful = false; + } + else + { + desync_successful = true; + } + wsrep::seqno ret(pause()); + if (ret.is_undefined()) + { + wsrep::log_warning() << "Failed to pause provider"; + resync(); + return wsrep::seqno::undefined(); + } + else + { + desynced_on_pause_ = desync_successful; + } + wsrep::log_info() << "Provider paused at: " << ret; + return ret; +} + +void wsrep::server_state::resume_and_resync() +{ + wsrep::log_info() << "Resuming and resyncing the provider"; + try + { + // Assign desynced_on_pause_ to local variable before resuming + // in order to avoid concurrent access to desynced_on_pause_ member + // variable. + bool do_resync = desynced_on_pause_; + desynced_on_pause_ = false; + resume(); + if (do_resync) + { + resync(); + } + } + catch (const wsrep::runtime_error& e) + { + wsrep::log_warning() + << "Resume and resync failed, server may have to be restarted"; + } +} + +std::string wsrep::server_state::prepare_for_sst() +{ + wsrep::unique_lock lock(mutex_); + state(lock, s_joiner); + lock.unlock(); + return server_service_.sst_request(); +} + +int wsrep::server_state::start_sst(const std::string& sst_request, + const wsrep::gtid& gtid, + bool bypass) +{ + wsrep::unique_lock lock(mutex_); + state(lock, s_donor); + int ret(0); + lock.unlock(); + if (server_service_.start_sst(sst_request, gtid, bypass)) + { + lock.lock(); + wsrep::log_warning() << "SST preparation failed"; + // v26 API does not have JOINED event, so in anticipation of SYNCED + // we must do it here. + state(lock, s_joined); + ret = 1; + } + return ret; +} + +void wsrep::server_state::sst_sent(const wsrep::gtid& gtid, int error) +{ + if (0 == error) + wsrep::log_info() << "SST sent: " << gtid; + else + wsrep::log_info() << "SST sending failed: " << error; + + wsrep::unique_lock lock(mutex_); + // v26 API does not have JOINED event, so in anticipation of SYNCED + // we must do it here. + state(lock, s_joined); + lock.unlock(); + enum provider::status const retval(provider().sst_sent(gtid, error)); + if (retval != provider::success) + { + std::string msg("wsrep::sst_sent() returned an error: "); + msg += wsrep::provider::to_string(retval); + server_service_.log_message(wsrep::log::warning, msg.c_str()); + } +} + +void wsrep::server_state::sst_received(wsrep::client_service& cs, + int const error) +{ + wsrep::log_info() << "SST received"; + wsrep::gtid gtid(wsrep::gtid::undefined()); + wsrep::unique_lock lock(mutex_); + assert(state_ == s_joiner || state_ == s_initialized); + + // Run initialization only if the SST was successful. + // In case of SST failure the system is in undefined state + // may not be recoverable. + if (error == 0) + { + if (server_service_.sst_before_init()) + { + if (init_initialized_ == false) + { + state(lock, s_initializing); + lock.unlock(); + server_service_.debug_sync("on_view_wait_initialized"); + lock.lock(); + wait_until_state(lock, s_initialized); + assert(init_initialized_); + } + } + lock.unlock(); + + if (id_.is_undefined()) + { + assert(0); + throw wsrep::runtime_error( + "wsrep::sst_received() called before connection to cluster"); + } + + gtid = server_service_.get_position(cs); + wsrep::log_info() << "Recovered position from storage: " << gtid; + + lock.lock(); + if (gtid.seqno() >= connected_gtid().seqno()) + { + /* Now the node has all the data the cluster has: part in + * storage, part in replication event queue. */ + state(lock, s_joined); + } + lock.unlock(); + + wsrep::view const v(server_service_.get_view(cs, id_)); + wsrep::log_info() << "Recovered view from SST:\n" << v; + + /* + * If the state id from recovered view has undefined ID, we may + * be upgrading from earlier version which does not provide + * view stored in stable storage. In this case we skip + * sanity checks and assigning the current view and wait + * until the first view delivery. + */ + if (v.state_id().id().is_undefined() == false) + { + if (v.state_id().id() != gtid.id() || + v.state_id().seqno() > gtid.seqno()) + { + /* Since IN GENERAL we may not be able to recover SST GTID from + * the state data, we have to rely on SST script passing the + * GTID value explicitly. + * Here we check if the passed GTID makes any sense: it should + * have the same UUID and greater or equal seqno than the last + * logged view. */ + std::ostringstream msg; + msg << "SST script passed bogus GTID: " << gtid + << ". Preceding view GTID: " << v.state_id(); + throw wsrep::runtime_error(msg.str()); + } + + if (current_view_.status() == wsrep::view::primary) + { + previous_primary_view_ = current_view_; + } + current_view_ = v; + server_service_.log_view(NULL /* this view is stored already */, v); + } + else + { + wsrep::log_warning() + << "View recovered from stable storage was empty. If the " + << "server is doing rolling upgrade from previous version " + << "which does not support storing view info into stable " + << "storage, this is ok. Otherwise this may be a sign of " + << "malfunction."; + } + lock.lock(); + recover_streaming_appliers_if_not_recovered(lock, cs); + lock.unlock(); + } + + enum provider::status const retval(provider().sst_received(gtid, error)); + if (retval != provider::success) + { + std::string msg("wsrep::sst_received() failed: "); + msg += wsrep::provider::to_string(retval); + throw wsrep::runtime_error(msg); + } +} + +void wsrep::server_state::initialized() +{ + wsrep::unique_lock lock(mutex_); + wsrep::log_info() << "Server initialized"; + init_initialized_ = true; + if (server_service_.sst_before_init()) + { + state(lock, s_initialized); + } + else + { + state(lock, s_initializing); + state(lock, s_initialized); + } +} + +enum wsrep::provider::status +wsrep::server_state::wait_for_gtid(const wsrep::gtid& gtid, int timeout) + const +{ + return provider().wait_for_gtid(gtid, timeout); +} + +int +wsrep::server_state::set_encryption_key(std::vector& key) +{ + encryption_key_ = key; + if (state_ != s_disconnected) + { + wsrep::const_buffer const key(encryption_key_.data(), + encryption_key_.size()); + enum provider::status const retval(provider_->enc_set_key(key)); + if (retval != provider::success) + { + wsrep::log_error() << "Failed to set encryption key: " + << provider::to_string(retval); + return 1; + } + } + return 0; +} + +std::pair +wsrep::server_state::causal_read(int timeout) const +{ + return provider().causal_read(timeout); +} + +void wsrep::server_state::on_connect(const wsrep::view& view) +{ + // Sanity checks + if (view.own_index() < 0 || + size_t(view.own_index()) >= view.members().size()) + { + std::ostringstream os; + os << "Invalid view on connect: own index out of range: " << view; +#ifndef NDEBUG + wsrep::log_error() << os.str(); + assert(0); +#endif + throw wsrep::runtime_error(os.str()); + } + + const size_t own_index(static_cast(view.own_index())); + if (id_.is_undefined() == false && id_ != view.members()[own_index].id()) + { + std::ostringstream os; + os << "Connection in connected state.\n" + << "Connected view:\n" << view + << "Previous view:\n" << current_view_ + << "Current own ID: " << id_; +#ifndef NDEBUG + wsrep::log_error() << os.str(); + assert(0); +#endif + throw wsrep::runtime_error(os.str()); + } + else + { + id_ = view.members()[own_index].id(); + } + + wsrep::log_info() << "Server " + << name_ + << " connected to cluster at position " + << view.state_id() + << " with ID " + << id_; + + wsrep::unique_lock lock(mutex_); + connected_gtid_ = view.state_id(); + state(lock, s_connected); +} + +void wsrep::server_state::on_primary_view( + const wsrep::view& view, + wsrep::high_priority_service* high_priority_service) +{ + wsrep::unique_lock lock(mutex_); + assert(view.final() == false); + // + // Reached primary from connected state. This may mean the following + // + // 1) Server was joined to the cluster and got SST transfer + // 2) Server was partitioned from the cluster and got back + // 3) A new cluster was bootstrapped from non-prim cluster + // + // There is no enough information here what was the cause + // of the primary component, so we need to walk through + // all states leading to joined to notify possible state + // waiters in other threads. + // + if (server_service_.sst_before_init()) + { + if (state_ == s_connected) + { + state(lock, s_joiner); + // We need to assign init_initialized_ here to local + // variable. If the value here was false, we need to skip + // the initializing -> initialized -> joined state cycle + // below. However, if we don't assign the value to + // local, it is possible that the main thread gets control + // between changing the state to initializing and checking + // initialized flag, which may cause the initialzing -> initialized + // state change to be executed even if it should not be. + const bool was_initialized(init_initialized_); + state(lock, s_initializing); + if (was_initialized) + { + // If server side has already been initialized, + // skip directly to s_joined. + state(lock, s_initialized); + } + } + } + else + { + if (state_ == s_connected) + { + state(lock, s_joiner); + } + } + if (init_initialized_ == false) + { + lock.unlock(); + server_service_.debug_sync("on_view_wait_initialized"); + lock.lock(); + wait_until_state(lock, s_initialized); + } + assert(init_initialized_); + + if (bootstrap_) + { + server_service_.bootstrap(); + bootstrap_ = false; + } + + assert(high_priority_service); + + if (high_priority_service) + { + recover_streaming_appliers_if_not_recovered(lock, + *high_priority_service); + close_orphaned_sr_transactions(lock, *high_priority_service); + } + + if (state(lock) < s_joined && + view.state_id().seqno() >= connected_gtid().seqno()) + { + // If we progressed beyond connected seqno, it means we have full state + state(lock, s_joined); + } +} + +void wsrep::server_state::on_non_primary_view( + const wsrep::view& view, + wsrep::high_priority_service* high_priority_service) +{ + wsrep::unique_lock lock(mutex_); + wsrep::log_info() << "Non-primary view"; + if (view.final()) + { + go_final(lock, view, high_priority_service); + } + else if (state_ != s_disconnecting) + { + state(lock, s_connected); + } +} + +void wsrep::server_state::go_final(wsrep::unique_lock& lock, + const wsrep::view& view, + wsrep::high_priority_service* hps) +{ + (void)view; // avoid compiler warning "unused parameter 'view'" + assert(view.final()); + assert(hps); + if (hps) + { + close_transactions_at_disconnect(*hps); + } + state(lock, s_disconnected); + id_ = wsrep::id::undefined(); +} + +void wsrep::server_state::on_view(const wsrep::view& view, + wsrep::high_priority_service* high_priority_service) +{ + wsrep::log_info() + << "================================================\nView:\n" + << view + << "================================================="; + if (current_view_.status() == wsrep::view::primary) + { + previous_primary_view_ = current_view_; + } + current_view_ = view; + switch (view.status()) + { + case wsrep::view::primary: + on_primary_view(view, high_priority_service); + break; + case wsrep::view::non_primary: + on_non_primary_view(view, high_priority_service); + break; + case wsrep::view::disconnected: + { + wsrep::unique_lock lock(mutex_); + go_final(lock, view, high_priority_service); + break; + } + default: + wsrep::log_warning() << "Unrecognized view status: " << view.status(); + assert(0); + } + + server_service_.log_view(high_priority_service, view); +} + +void wsrep::server_state::on_sync() +{ + wsrep::log_info() << "Server " << name_ << " synced with group"; + wsrep::unique_lock lock(mutex_); + + // Initial sync + if (server_service_.sst_before_init() && init_synced_ == false) + { + switch (state_) + { + case s_synced: + break; + case s_connected: // Seed node path: provider becomes + state(lock, s_joiner); // synced with itself before anything + WSREP_FALLTHROUGH; // else. Then goes DB initialization. + case s_joiner: // | + state(lock, s_initializing); // V + break; + case s_donor: + assert(false); // this should never happen + state(lock, s_joined); + state(lock, s_synced); + break; + case s_initialized: + state(lock, s_joined); + WSREP_FALLTHROUGH; + default: + /* State */ + state(lock, s_synced); + }; + } + else + { + // Calls to on_sync() in synced state are possible if + // server desyncs itself from the group. Provider does not + // inform about this through callbacks. + if (state_ != s_synced) + { + state(lock, s_synced); + } + } + init_synced_ = true; + + enum wsrep::provider::status status(send_pending_rollback_events(lock)); + if (status) + { + // TODO should be retried? + wsrep::log_warning() + << "Failed to flush rollback event cache: " << status; + } +} + +int wsrep::server_state::on_apply( + wsrep::high_priority_service& high_priority_service, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) +{ + if (is_toi(ws_meta.flags())) + { + return apply_toi(provider(), high_priority_service, + ws_handle, ws_meta, data); + } + else if (is_commutative(ws_meta.flags()) || is_native(ws_meta.flags())) + { + // Not implemented yet. + assert(0); + return 0; + } + else + { + return apply_write_set(*this, high_priority_service, + ws_handle, ws_meta, data); + } +} + +void wsrep::server_state::start_streaming_client( + wsrep::client_state* client_state) +{ + wsrep::unique_lock lock(mutex_); + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_server_state, + "Start streaming client: " << client_state->id()); + if (streaming_clients_.insert( + std::make_pair(client_state->id(), client_state)).second == false) + { + wsrep::log_warning() << "Failed to insert streaming client " + << client_state->id(); + assert(0); + } +} + +void wsrep::server_state::convert_streaming_client_to_applier( + wsrep::client_state* client_state) +{ + // create streaming_applier beforehand as server_state lock should + // not be held when calling server_service methods + wsrep::high_priority_service* streaming_applier( + server_service_.streaming_applier_service( + client_state->client_service())); + + wsrep::unique_lock lock(mutex_); + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_server_state, + "Convert streaming client to applier " + << client_state->id()); + streaming_clients_map::iterator i( + streaming_clients_.find(client_state->id())); + assert(i != streaming_clients_.end()); + if (i == streaming_clients_.end()) + { + wsrep::log_warning() << "Unable to find streaming client " + << client_state->id(); + assert(0); + } + else + { + streaming_clients_.erase(i); + } + + // Convert to applier only if the state is not disconnected. In + // disconnected state the applier map is supposed to be empty + // and it will be reconstructed from fragment storage when + // joining back to cluster. + if (state(lock) != s_disconnected) + { + if (streaming_applier->adopt_transaction(client_state->transaction())) + { + log_adopt_error(client_state->transaction()); + streaming_applier->after_apply(); + server_service_.release_high_priority_service(streaming_applier); + return; + } + if (streaming_appliers_.insert( + std::make_pair( + std::make_pair(client_state->transaction().server_id(), + client_state->transaction().id()), + streaming_applier)).second == false) + { + wsrep::log_warning() << "Could not insert streaming applier " + << id_ + << ", " + << client_state->transaction().id(); + assert(0); + } + } + else + { + server_service_.release_high_priority_service(streaming_applier); + client_state->client_service().store_globals(); + } +} + + +void wsrep::server_state::stop_streaming_client( + wsrep::client_state* client_state) +{ + wsrep::unique_lock lock(mutex_); + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_server_state, + "Stop streaming client: " << client_state->id()); + streaming_clients_map::iterator i( + streaming_clients_.find(client_state->id())); + assert(i != streaming_clients_.end()); + if (i == streaming_clients_.end()) + { + wsrep::log_warning() << "Unable to find streaming client " + << client_state->id(); + assert(0); + return; + } + else + { + streaming_clients_.erase(i); + cond_.notify_all(); + } +} + +void wsrep::server_state::start_streaming_applier( + const wsrep::id& server_id, + const wsrep::transaction_id& transaction_id, + wsrep::high_priority_service* sa) +{ + wsrep::unique_lock lock(mutex_); + if (streaming_appliers_.insert( + std::make_pair(std::make_pair(server_id, transaction_id), + sa)).second == false) + { + wsrep::log_error() << "Could not insert streaming applier"; + throw wsrep::fatal_error(); + } +} + +void wsrep::server_state::stop_streaming_applier( + const wsrep::id& server_id, + const wsrep::transaction_id& transaction_id) +{ + wsrep::unique_lock lock(mutex_); + streaming_appliers_map::iterator i( + streaming_appliers_.find(std::make_pair(server_id, transaction_id))); + assert(i != streaming_appliers_.end()); + if (i == streaming_appliers_.end()) + { + wsrep::log_warning() << "Could not find streaming applier for " + << server_id << ":" << transaction_id; + } + else + { + streaming_appliers_.erase(i); + cond_.notify_all(); + } +} + +wsrep::high_priority_service* wsrep::server_state::find_streaming_applier( + const wsrep::id& server_id, + const wsrep::transaction_id& transaction_id) const +{ + wsrep::unique_lock lock(mutex_); + streaming_appliers_map::const_iterator i( + streaming_appliers_.find(std::make_pair(server_id, transaction_id))); + return (i == streaming_appliers_.end() ? 0 : i->second); +} + +wsrep::high_priority_service* wsrep::server_state::find_streaming_applier( + const wsrep::xid& xid) const +{ + wsrep::unique_lock lock(mutex_); + streaming_appliers_map::const_iterator i(streaming_appliers_.begin()); + while (i != streaming_appliers_.end()) + { + wsrep::high_priority_service* sa(i->second); + if (sa->transaction().xid() == xid) + { + return sa; + } + i++; + } + return NULL; +} + +////////////////////////////////////////////////////////////////////////////// +// Private // +////////////////////////////////////////////////////////////////////////////// + +int wsrep::server_state::desync(wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + ++desync_count_; + lock.unlock(); + int ret(provider().desync()); + lock.lock(); + if (ret) + { + --desync_count_; + } + return ret; +} + +void wsrep::server_state::resync(wsrep::unique_lock& + lock WSREP_UNUSED) +{ + assert(lock.owns_lock()); + assert(desync_count_ > 0); + if (desync_count_ > 0) + { + --desync_count_; + if (provider().resync()) + { + throw wsrep::runtime_error("Failed to resync"); + } + } + else + { + wsrep::log_warning() << "desync_count " << desync_count_ + << " on resync"; + } +} + + +void wsrep::server_state::state( + wsrep::unique_lock& lock WSREP_UNUSED, + enum wsrep::server_state::state state) +{ + assert(lock.owns_lock()); + static const char allowed[n_states_][n_states_] = + { + /* dis, ing, ized, cted, jer, jed, dor, sed, ding to/from */ + { 0, 1, 0, 1, 0, 0, 0, 0, 0}, /* dis */ + { 1, 0, 1, 0, 0, 0, 0, 0, 1}, /* ing */ + { 1, 0, 0, 1, 0, 1, 0, 0, 1}, /* ized */ + { 1, 0, 0, 1, 1, 0, 0, 1, 1}, /* cted */ + { 1, 1, 0, 0, 0, 1, 0, 0, 1}, /* jer */ + { 1, 0, 0, 1, 0, 0, 1, 1, 1}, /* jed */ + { 1, 0, 0, 1, 0, 1, 0, 0, 1}, /* dor */ + { 1, 0, 0, 1, 0, 1, 1, 0, 1}, /* sed */ + { 1, 0, 0, 0, 0, 0, 0, 0, 0} /* ding */ + }; + + if (allowed[state_][state] == false) + { + std::ostringstream os; + os << "server: " << name_ << " unallowed state transition: " + << wsrep::to_string(state_) << " -> " << wsrep::to_string(state); + wsrep::log_warning() << os.str() << "\n"; + assert(0); + } + + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_server_state, + "server " << name_ << " state change: " + << to_c_string(state_) << " -> " + << to_c_string(state)); + state_hist_.push_back(state_); + server_service_.log_state_change(state_, state); + state_ = state; + cond_.notify_all(); + while (state_waiters_[state_]) + { + cond_.wait(lock); + } +} + +void wsrep::server_state::wait_until_state( + wsrep::unique_lock& lock, + enum wsrep::server_state::state state) const +{ + ++state_waiters_[state]; + while (state_ != state) + { + cond_.wait(lock); + // If the waiter waits for any other state than disconnecting + // or disconnected and the state has been changed to disconnecting, + // this usually means that some error was encountered + if (state != s_disconnecting && state != s_disconnected + && state_ == s_disconnecting) + { + throw wsrep::runtime_error("State wait was interrupted"); + } + } + --state_waiters_[state]; + cond_.notify_all(); +} + +void wsrep::server_state::interrupt_state_waiters( + wsrep::unique_lock& lock WSREP_UNUSED) +{ + assert(lock.owns_lock()); + cond_.notify_all(); +} + +template +void wsrep::server_state::recover_streaming_appliers_if_not_recovered( + wsrep::unique_lock& lock, C& c) +{ + assert(lock.owns_lock()); + if (streaming_appliers_recovered_ == false) + { + lock.unlock(); + server_service_.recover_streaming_appliers(c); + lock.lock(); + } + streaming_appliers_recovered_ = true; +} + +class transaction_state_cmp +{ +public: + transaction_state_cmp(const enum wsrep::transaction::state s) + : state_(s) { } + bool operator()(const std::pair& vt) const + { + return vt.second->transaction().state() == state_; + } +private: + enum wsrep::transaction::state state_; +}; + +void wsrep::server_state::close_orphaned_sr_transactions( + wsrep::unique_lock& lock, + wsrep::high_priority_service& high_priority_service) +{ + assert(lock.owns_lock()); + + // When the originator of an SR transaction leaves the primary + // component of the cluster, that SR must be rolled back. When two + // consecutive primary views have the same membership, the system + // may have been in a state with no primary components. + // Example with 2 node cluster: + // - (1,2 primary) + // - (1 non-primary) and (2 non-primary) + // - (1,2 primary) + // We need to rollback SRs owned by both 1 and 2. + // Notice that since the introduction of rollback_event_queue_, + // checking for equal consecutive views is no longer needed. + // However, we must keep it here for the time being, for backwards + // compatibility. + const bool equal_consecutive_views = + current_view_.equal_membership(previous_primary_view_); + + if (current_view_.own_index() == -1 || equal_consecutive_views) + { + streaming_clients_map::iterator i; + transaction_state_cmp prepared_state_cmp(wsrep::transaction::s_prepared); + while ((i = std::find_if_not(streaming_clients_.begin(), + streaming_clients_.end(), + prepared_state_cmp)) + != streaming_clients_.end()) + { + wsrep::client_id client_id(i->first); + wsrep::transaction_id transaction_id(i->second->transaction().id()); + // It is safe to unlock the server state temporarily here. + // The processing happens inside view handler which is + // protected by the provider commit ordering critical + // section. The lock must be unlocked temporarily to + // allow converting the current client to streaming + // applier in transaction::streaming_rollback(). + // The iterator i may be invalidated when the server state + // remains unlocked, so it should not be accessed after + // the bf abort call. + lock.unlock(); + i->second->total_order_bf_abort(current_view_.view_seqno()); + lock.lock(); + streaming_clients_map::const_iterator found_i; + while ((found_i = streaming_clients_.find(client_id)) != + streaming_clients_.end() && + found_i->second->transaction().id() == transaction_id) + { + cond_.wait(lock); + } + } + } + + streaming_appliers_map::iterator i(streaming_appliers_.begin()); + while (i != streaming_appliers_.end()) + { + wsrep::high_priority_service* streaming_applier(i->second); + + // Rollback SR on equal consecutive primary views or if its + // originator is not in the current view. + // Transactions in prepared state must be committed or + // rolled back explicitly, those are never rolled back here. + if ((streaming_applier->transaction().state() != + wsrep::transaction::s_prepared) && + (equal_consecutive_views || + not current_view_.is_member( + streaming_applier->transaction().server_id()))) + { + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_server_state, + "Removing SR fragments for " + << i->first.first + << ", " << i->first.second); + wsrep::id server_id(i->first.first); + wsrep::transaction_id transaction_id(i->first.second); + int adopt_error; + if ((adopt_error = high_priority_service.adopt_transaction( + streaming_applier->transaction()))) + { + log_adopt_error(streaming_applier->transaction()); + } + // Even if the transaction adopt above fails, we roll back + // the transaction. Adopt error will leave stale entries + // in the streaming log which can be removed manually. + { + wsrep::high_priority_switch sw(high_priority_service, + *streaming_applier); + streaming_applier->rollback( + wsrep::ws_handle(), wsrep::ws_meta()); + streaming_applier->after_apply(); + } + + streaming_appliers_.erase(i++); + server_service_.release_high_priority_service(streaming_applier); + high_priority_service.store_globals(); + wsrep::ws_meta ws_meta( + wsrep::gtid(), + wsrep::stid(server_id, transaction_id, wsrep::client_id()), + wsrep::seqno::undefined(), 0); + lock.unlock(); + if (adopt_error == 0) + { + high_priority_service.remove_fragments(ws_meta); + high_priority_service.commit(wsrep::ws_handle(transaction_id, 0), + ws_meta); + } + high_priority_service.after_apply(); + lock.lock(); + } + else + { + ++i; + } + } +} + +void wsrep::server_state::close_transactions_at_disconnect( + wsrep::high_priority_service& high_priority_service) +{ + // Close streaming applier without removing fragments + // from fragment storage. When the server is started again, + // it must be able to recover ongoing streaming transactions. + streaming_appliers_map::iterator i(streaming_appliers_.begin()); + while (i != streaming_appliers_.end()) + { + wsrep::high_priority_service* streaming_applier(i->second); + { + wsrep::high_priority_switch sw(high_priority_service, + *streaming_applier); + streaming_applier->rollback( + wsrep::ws_handle(), wsrep::ws_meta()); + streaming_applier->after_apply(); + } + streaming_appliers_.erase(i++); + server_service_.release_high_priority_service(streaming_applier); + high_priority_service.store_globals(); + } + streaming_appliers_recovered_ = false; +} + +// +// Rollback event queue +// + +void wsrep::server_state::queue_rollback_event( + const wsrep::transaction_id& id) +{ + wsrep::unique_lock lock(mutex_); +#ifndef NDEBUG + // Make sure we don't have duplicate + // transaction ids in rollback event queue. + // There is no need to do this in release + // build given that caller (streaming_rollback()) + // should avoid duplicates. + for (auto i : rollback_event_queue_) + { + assert(id != i); + } +#endif + rollback_event_queue_.push_back(id); +} + +enum wsrep::provider::status +wsrep::server_state::send_pending_rollback_events( + wsrep::unique_lock& lock WSREP_UNUSED) +{ + assert(lock.owns_lock()); + while (not rollback_event_queue_.empty()) + { + const wsrep::transaction_id& id(rollback_event_queue_.front()); + const enum wsrep::provider::status status(provider().rollback(id)); + if (status) + { + return status; + } + rollback_event_queue_.pop_front(); + } + return wsrep::provider::success; +} + +enum wsrep::provider::status +wsrep::server_state::send_pending_rollback_events() +{ + wsrep::unique_lock lock(mutex_); + return send_pending_rollback_events(lock); +} -- cgit v1.2.3