summaryrefslogtreecommitdiffstats
path: root/wsrep-lib/include/wsrep/server_state.hpp
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:00:34 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:00:34 +0000
commit3f619478f796eddbba6e39502fe941b285dd97b1 (patch)
treee2c7b5777f728320e5b5542b6213fd3591ba51e2 /wsrep-lib/include/wsrep/server_state.hpp
parentInitial commit. (diff)
downloadmariadb-upstream.tar.xz
mariadb-upstream.zip
Adding upstream version 1:10.11.6.upstream/1%10.11.6upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'wsrep-lib/include/wsrep/server_state.hpp')
-rw-r--r--wsrep-lib/include/wsrep/server_state.hpp745
1 files changed, 745 insertions, 0 deletions
diff --git a/wsrep-lib/include/wsrep/server_state.hpp b/wsrep-lib/include/wsrep/server_state.hpp
new file mode 100644
index 00000000..b280b55d
--- /dev/null
+++ b/wsrep-lib/include/wsrep/server_state.hpp
@@ -0,0 +1,745 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+/** @file server_state.hpp
+ *
+ * Server State Abstraction
+ * ==========================
+ *
+ * This file defines an interface for WSREP Server State.
+ * The Server State will encapsulate server identification,
+ * server state and server capabilities. The class also
+ * defines an interface for manipulating server state, applying
+ * of remote transaction write sets, processing SST requests,
+ * creating local client connections for local storage access
+ * operations.
+ *
+ * Concepts
+ * ========
+ *
+ * State Snapshot Transfer
+ * -----------------------
+ *
+ * Depending on SST type (physical or logical), the server storage
+ * engine initialization must be done before or after SST happens.
+ * In case of physical SST method (typically rsync, filesystem snapshot)
+ * the SST happens before the storage engine is initialized, in case
+ * of logical backup typically after the storage engine initialization.
+ *
+ * Rollback Mode
+ * -------------
+ *
+ * When High Priority Transaction (HPT) write set is applied, it
+ * may be required that the HPT Brute Force Aborts (BFA) locally
+ * executing transaction. As HPT must be able to apply all its
+ * write sets without interruption, the locally executing transaction
+ * must yield immediately, otherwise a transaction processing
+ * may stop or even deadlock. Depending on DBMS implementation,
+ * the local transaction may need to be rolled back immediately
+ * (synchronous mode) or the rollback may happen later on
+ * (asynchronous mode). The Server Context implementation
+ * which derives from Server Context base class must provide
+ * the base class for the rollback mode which server operates on.
+ *
+ * ### Synchronous
+ *
+ * If the DBMS server implementation does not allow asynchronous rollback,
+ * the victim transaction must be rolled back immediately in order to
+ * allow transaction processing to proceed. Depending on DBMS process model,
+ * there may be either background thread which processes the rollback
+ * or the rollback can be done by the HTP applier.
+ *
+ * ### Asynchronous
+ *
+ * In asynchronous mode the BFA victim transaction is just marked
+ * to be aborted or in case of fully optimistic concurrency control,
+ * the conflict is detected at commit.
+ *
+ *
+ * # Return value conventions
+ *
+ * The calls which are proxies to corresponding provider functionality
+ * will return wsrep::provider::status enum as a result. Otherwise
+ * the return value is generally zero on success, non zero on failure.
+ */
+
+#ifndef WSREP_SERVER_STATE_HPP
+#define WSREP_SERVER_STATE_HPP
+
+#include "mutex.hpp"
+#include "condition_variable.hpp"
+#include "id.hpp"
+#include "view.hpp"
+#include "transaction_id.hpp"
+#include "logger.hpp"
+#include "provider.hpp"
+#include "compiler.hpp"
+#include "xid.hpp"
+
+#include <deque>
+#include <vector>
+#include <string>
+#include <map>
+
+/**
+ * Magic string to tell provider to engage into trivial (empty)
+ * state transfer. No data will be passed, but the node shall be
+ * considered joined.
+ */
+#define WSREP_LIB_SST_TRIVIAL "trivial"
+
+namespace wsrep
+{
+ // Forward declarations
+ class ws_handle;
+ class ws_meta;
+ class client_state;
+ class transaction;
+ class const_buffer;
+ class server_service;
+ class client_service;
+ class encryption_service;
+
+ /** @class Server Context
+ *
+ *
+ */
+ class server_state
+ {
+ public:
+ /**
+ * Server state enumeration.
+ *
+ * @todo Fix UML generation
+ *
+ * Server state diagram if initialization happens before SST.
+ *
+ * [*] --> disconnected
+ * disconnected --> initializing
+ * initializing --> initialized
+ * initialized --> connected
+ * connected --> joiner
+ * joiner --> joined
+ * joined --> synced
+ * synced --> donor
+ * donor --> joined
+ *
+ * Server state diagram if SST happens before initialization.
+ *
+ * [*] --> disconnected
+ * disconnected --> connected
+ * connected --> joiner
+ * joiner --> initializing
+ * initializing --> initialized
+ * initialized --> joined
+ * joined --> synced
+ * synced --> donor
+ * donor --> joined
+ */
+ enum state
+ {
+ /** Server is in disconnected state. */
+ s_disconnected,
+ /** Server is initializing */
+ s_initializing,
+ /** Server has been initialized */
+ s_initialized,
+ /** Server is connected to the cluster */
+ s_connected,
+ /** Server is receiving SST */
+ s_joiner,
+ /** Server has received SST successfully but has not synced
+ with rest of the cluster yet. */
+ s_joined,
+ /** Server is donating state snapshot transfer */
+ s_donor,
+ /** Server has synced with the cluster */
+ s_synced,
+ /** Server is disconnecting from group */
+ s_disconnecting
+ };
+
+ static const int n_states_ = s_disconnecting + 1;
+
+ /**
+ * Rollback Mode enumeration
+ */
+ enum rollback_mode
+ {
+ /** Asynchronous rollback mode */
+ rm_async,
+ /** Synchronous rollback mode */
+ rm_sync
+ };
+
+ virtual ~server_state();
+
+ wsrep::encryption_service* encryption_service()
+ { return encryption_service_; }
+
+ wsrep::server_service& server_service() { return server_service_; }
+
+ /**
+ * Return human readable server name.
+ *
+ * @return Human readable server name string.
+ */
+ const std::string& name() const { return name_; }
+
+ /**
+ * Return Server identifier.
+ *
+ * @return Server identifier.
+ */
+ const wsrep::id& id() const { return id_; }
+
+ const std::string& incoming_address() const
+ { return incoming_address_; }
+ /**
+ * Return server group communication address.
+ *
+ * @return Return server group communication address.
+ */
+ const std::string& address() const { return address_; }
+
+ /**
+ * Return working directory
+ *
+ * @return String containing path to working directory.
+ */
+ const std::string& working_dir() const { return working_dir_; }
+
+ /**
+ * Return initial position for server.
+ */
+ const wsrep::gtid& initial_position() const
+ { return initial_position_; }
+ /**
+ * Return maximum protocol version.
+ */
+ int max_protocol_version() const { return max_protocol_version_;}
+
+ /**
+ * Get the rollback mode which server is operating in.
+ *
+ * @return Rollback mode.
+ */
+ enum rollback_mode rollback_mode() const { return rollback_mode_; }
+
+ /**
+ * Registers a streaming client.
+ */
+ void start_streaming_client(wsrep::client_state* client_state);
+
+ void convert_streaming_client_to_applier(
+ wsrep::client_state* client_state);
+ void stop_streaming_client(wsrep::client_state* client_state);
+
+ void start_streaming_applier(
+ const wsrep::id&,
+ const wsrep::transaction_id&,
+ wsrep::high_priority_service*);
+
+ void stop_streaming_applier(
+ const wsrep::id&, const wsrep::transaction_id&);
+
+ /**
+ * Find a streaming applier matching server and transaction ids
+ */
+ wsrep::high_priority_service* find_streaming_applier(
+ const wsrep::id&,
+ const wsrep::transaction_id&) const;
+
+ /**
+ * Find a streaming applier matching xid
+ */
+ wsrep::high_priority_service* find_streaming_applier(
+ const wsrep::xid& xid) const;
+
+ /**
+ * Queue a rollback fragment the transaction with given id
+ */
+ void queue_rollback_event(const wsrep::transaction_id& id);
+
+ /**
+ * Send rollback fragments for previously queued events via
+ * queue_rollback_event()
+ */
+ enum wsrep::provider::status send_pending_rollback_events();
+
+ /**
+ * Load WSRep provider.
+ *
+ * @param provider WSRep provider library to be loaded.
+ * @param provider_options Provider specific options string
+ * to be passed for provider during initialization.
+ * @param services Application defined services passed to
+ * the provider.
+ *
+ * @return Zero on success, non-zero on error.
+ */
+ int load_provider(const std::string& provider,
+ const std::string& provider_options,
+ const wsrep::provider::services& services
+ = wsrep::provider::services());
+
+ void unload_provider();
+
+ bool is_provider_loaded() const { return provider_ != 0; }
+
+ /**
+ * Return reference to provider.
+ *
+ * @return Reference to provider
+ *
+ * @throw wsrep::runtime_error if provider has not been loaded
+ *
+ * @todo This should not be virtual. However, currently there
+ * is no mechanism for tests and integrations to provide
+ * their own provider implementations, so this is kept virtual
+ * for time being.
+ */
+ virtual wsrep::provider& provider() const
+ {
+ if (provider_ == 0)
+ {
+ throw wsrep::runtime_error("provider not loaded");
+ }
+ return *provider_;
+ }
+
+ /**
+ * Initialize connection to cluster.
+ *
+ * @param cluster_name A string containing the name of the cluster
+ * @param cluster_address Cluster address string
+ * @param state_donor String containing a list of desired donors
+ * @param bootstrap Bootstrap option
+ *
+ * @return Zero in case of success, non-zero on error.
+ */
+ int connect(const std::string& cluster_name,
+ const std::string& cluster_address,
+ const std::string& state_donor,
+ bool bootstrap);
+
+ int disconnect();
+
+ /**
+ * A method which will be called when the server
+ * has been joined to the cluster
+ */
+ void on_connect(const wsrep::view& view);
+
+ /**
+ * A method which will be called when a view
+ * notification event has been delivered by the
+ * provider.
+ *
+ * @params view wsrep::view object which holds the new view
+ * information.
+ */
+ void on_view(const wsrep::view& view,
+ wsrep::high_priority_service*);
+
+ /**
+ * A method which will be called when the server
+ * has been synchronized with the cluster.
+ *
+ * This will have a side effect of changing the Server Context
+ * state to s_synced.
+ */
+ void on_sync();
+
+ /**
+ * Wait until server reaches given state.
+ *
+ * @return Zero in case of success, non-zero if the
+ * wait was interrupted.
+ */
+ int wait_until_state(enum state state) const;
+
+ /**
+ * Return GTID at the position when server connected to
+ * the cluster.
+ */
+ wsrep::gtid connected_gtid() const { return connected_gtid_; }
+
+ /**
+ * Return current view
+ */
+ const wsrep::view& current_view() const { return current_view_; }
+
+ /**
+ * Wait until all the write sets up to given GTID have been
+ * committed.
+ *
+ * @return Zero on success, non-zero on failure.
+ */
+ enum wsrep::provider::status
+ wait_for_gtid(const wsrep::gtid&, int timeout) const;
+
+ /**
+ * Set encryption key
+ *
+ * @param key Encryption key
+ *
+ * @return Zero on success, non-zero on failure.
+ */
+ int set_encryption_key(std::vector<unsigned char>& key);
+
+ /**
+ * Return encryption key.
+ */
+ const std::vector<unsigned char>& get_encryption_key() const
+ { return encryption_key_; }
+
+ /**
+ * Perform a causal read in the cluster. After the call returns,
+ * all the causally preceding write sets have been committed
+ * or the error is returned.
+ *
+ * This operation may require communication with other processes
+ * in the DBMS cluster, so it may be relatively heavy operation.
+ * Method wait_for_gtid() should be used whenever possible.
+ *
+ * @param timeout Timeout in seconds
+ *
+ * @return Pair of GTID and result status from provider.
+ */
+
+ std::pair<wsrep::gtid, enum wsrep::provider::status>
+ causal_read(int timeout) const;
+
+ /**
+ * Desynchronize the server.
+ *
+ * If the server state is synced, this call will desynchronize
+ * the server from the cluster.
+ */
+ int desync()
+ {
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ return desync(lock);
+ }
+
+ /**
+ * Resynchronize the server.
+ */
+ void resync()
+ {
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ resync(lock);
+ }
+
+ wsrep::seqno pause();
+
+ wsrep::seqno pause_seqno() const { return pause_seqno_; }
+
+ void resume();
+
+ /**
+ * Desync and pause the provider on one go. Will return
+ * pause seqno if successful. In case of failure,
+ * undefined seqno will be returned.
+ */
+ wsrep::seqno desync_and_pause();
+
+ /**
+ * Resume and resync the provider on one go. Prior this
+ * call the provider must have been both desynced and paused,
+ * by either desync_and_pause() or separate calls to desync()
+ * and pause().
+ */
+ void resume_and_resync();
+
+ /**
+ * True if server has issued and active desync and pause in one go,
+ * false otherwise.
+ */
+ bool desynced_on_pause() const { return desynced_on_pause_; }
+
+ /**
+ * Prepares server state for SST.
+ *
+ * @return String containing a SST request
+ */
+ std::string prepare_for_sst();
+
+ /**
+ * Start a state snapshot transfer.
+ *
+ * @param sst_requets SST request string
+ * @param gtid Current GTID
+ * @param bypass Bypass flag
+ *
+ * @return Zero in case of success, non-zero otherwise
+ */
+ int start_sst(const std::string& sst_request,
+ const wsrep::gtid& gtid,
+ bool bypass);
+
+ /**
+ *
+ */
+ void sst_sent(const wsrep::gtid& gtid, int error);
+
+ /**
+ * This method must be called by the joiner after the SST
+ * transfer has been received. If the DBMS state has not been
+ * initialized, the call will shift the state to initializing
+ * and will wait until the initialization is complete.
+ *
+ * @param client_service
+ * @param error code of the SST operation
+ *
+ * @return Zero in case of success, non-zero on error.
+ */
+ int sst_received(wsrep::client_service& cs, int error);
+
+ /**
+ * This method must be called after the server initialization
+ * has been completed. The call has a side effect of changing
+ * the Server Context state to s_initialized.
+ */
+ void initialized();
+
+ /**
+ * Return true if the server has been initialized.
+ */
+ bool is_initialized() const
+ {
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ return init_initialized_;
+ }
+
+ /**
+ * This method will be called by the provider when
+ * a remote write set is being applied. It is the responsibility
+ * of the caller to set up transaction context and data properly.
+ *
+ * @todo Make this private, allow calls for provider implementations
+ * only.
+ * @param high_priority_service High priority applier service.
+ * @param transaction Transaction context.
+ * @param data Write set data
+ *
+ * @return Zero on success, non-zero on failure.
+ */
+ int on_apply(wsrep::high_priority_service& high_priority_service,
+ const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta,
+ const wsrep::const_buffer& data);
+
+ enum state state() const
+ {
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ return state(lock);
+ }
+
+ enum state state(wsrep::unique_lock<wsrep::mutex>& lock) const;
+
+ /**
+ * Get provider status variables.
+ */
+ std::vector<wsrep::provider::status_variable> status() const;
+
+ /**
+ * Set server wide wsrep debug logging level.
+ *
+ * Log levels are
+ * - 0 - No debug logging.
+ * - 1..n - Debug logging with increasing verbosity.
+ */
+ void debug_log_level(int level)
+ {
+ wsrep::log::debug_log_level(level);
+ }
+
+ wsrep::mutex& mutex() { return mutex_; }
+
+ protected:
+ /** Server state constructor
+ *
+ * @param mutex Mutex provided by the DBMS implementation.
+ * @param name Human Readable Server Name.
+ * @param id Server Identifier String, UUID or some unique
+ * identifier.
+ * @param address Server address in form of IPv4 address, IPv6 address
+ * or hostname.
+ * @param working_dir Working directory for replication specific
+ * data files.
+ * @param rollback_mode Rollback mode which server operates on.
+ */
+ server_state(wsrep::mutex& mutex,
+ wsrep::condition_variable& cond,
+ wsrep::server_service& server_service,
+ wsrep::encryption_service* encryption_service,
+ const std::string& name,
+ const std::string& incoming_address,
+ const std::string& address,
+ const std::string& working_dir,
+ const wsrep::gtid& initial_position,
+ int max_protocol_version,
+ enum rollback_mode rollback_mode)
+ : mutex_(mutex)
+ , cond_(cond)
+ , server_service_(server_service)
+ , encryption_service_(encryption_service)
+ , state_(s_disconnected)
+ , state_hist_()
+ , state_waiters_(n_states_)
+ , bootstrap_()
+ , initial_position_(initial_position)
+ , init_initialized_()
+ , init_synced_()
+ , sst_gtid_()
+ , desync_count_()
+ , desynced_on_pause_()
+ , pause_count_()
+ , pause_seqno_()
+ , streaming_clients_()
+ , streaming_appliers_()
+ , streaming_appliers_recovered_()
+ , provider_()
+ , name_(name)
+ , id_(wsrep::id::undefined())
+ , incoming_address_(incoming_address)
+ , address_(address)
+ , working_dir_(working_dir)
+ , encryption_key_()
+ , max_protocol_version_(max_protocol_version)
+ , rollback_mode_(rollback_mode)
+ , connected_gtid_()
+ , previous_primary_view_()
+ , current_view_()
+ , rollback_event_queue_()
+ { }
+
+ private:
+
+ server_state(const server_state&);
+ server_state& operator=(const server_state&);
+
+ int desync(wsrep::unique_lock<wsrep::mutex>&);
+ void resync(wsrep::unique_lock<wsrep::mutex>&);
+ void state(wsrep::unique_lock<wsrep::mutex>&, enum state);
+ void wait_until_state(wsrep::unique_lock<wsrep::mutex>&,
+ enum state) const;
+ // Interrupt all threads which are waiting for state
+ void interrupt_state_waiters(wsrep::unique_lock<wsrep::mutex>&);
+
+ // Recover streaming appliers if not already recoverd
+ template <class C>
+ void recover_streaming_appliers_if_not_recovered(
+ wsrep::unique_lock<wsrep::mutex>&, C&);
+
+ // Close SR transcations whose origin is outside of current
+ // cluster view.
+ void close_orphaned_sr_transactions(
+ wsrep::unique_lock<wsrep::mutex>&,
+ wsrep::high_priority_service&);
+
+ // Close transactions when handling disconnect from the group.
+ void close_transactions_at_disconnect(wsrep::high_priority_service&);
+
+ // Handle primary view
+ void on_primary_view(const wsrep::view&,
+ wsrep::high_priority_service*);
+ // Handle non-primary view
+ void on_non_primary_view(const wsrep::view&,
+ wsrep::high_priority_service*);
+ // Common actions on final view
+ void go_final(wsrep::unique_lock<wsrep::mutex>&,
+ const wsrep::view&, wsrep::high_priority_service*);
+
+ // Send rollback fragments for all transactions in
+ // rollback_event_queue_
+ enum wsrep::provider::status send_pending_rollback_events(
+ wsrep::unique_lock<wsrep::mutex>& lock);
+
+ // Handle returning from donor state.
+ void return_from_donor_state(wsrep::unique_lock<wsrep::mutex>& lock);
+
+ wsrep::mutex& mutex_;
+ wsrep::condition_variable& cond_;
+ wsrep::server_service& server_service_;
+ wsrep::encryption_service* encryption_service_;
+ enum state state_;
+ std::vector<enum state> state_hist_;
+ mutable std::vector<int> state_waiters_;
+ bool bootstrap_;
+ const wsrep::gtid initial_position_;
+ bool init_initialized_;
+ bool init_synced_;
+ wsrep::gtid sst_gtid_;
+ size_t desync_count_;
+ // Boolean to denote if desync was succesfull when desyncing
+ // and pausing the provider on one go.
+ bool desynced_on_pause_;
+ size_t pause_count_;
+ wsrep::seqno pause_seqno_;
+ typedef std::map<wsrep::client_id, wsrep::client_state*>
+ streaming_clients_map;
+ streaming_clients_map streaming_clients_;
+ typedef std::map<std::pair<wsrep::id, wsrep::transaction_id>,
+ wsrep::high_priority_service*> streaming_appliers_map;
+ streaming_appliers_map streaming_appliers_;
+ bool streaming_appliers_recovered_;
+ wsrep::provider* provider_;
+ std::string name_;
+ wsrep::id id_;
+ std::string incoming_address_;
+ std::string address_;
+ std::string working_dir_;
+ std::vector<unsigned char> encryption_key_;
+ int max_protocol_version_;
+ enum rollback_mode rollback_mode_;
+ wsrep::gtid connected_gtid_;
+ wsrep::view previous_primary_view_;
+ wsrep::view current_view_;
+ std::deque<wsrep::transaction_id> rollback_event_queue_;
+ };
+
+ static inline const char* to_c_string(
+ enum wsrep::server_state::state state)
+ {
+ switch (state)
+ {
+ case wsrep::server_state::s_disconnected: return "disconnected";
+ case wsrep::server_state::s_initializing: return "initializing";
+ case wsrep::server_state::s_initialized: return "initialized";
+ case wsrep::server_state::s_connected: return "connected";
+ case wsrep::server_state::s_joiner: return "joiner";
+ case wsrep::server_state::s_joined: return "joined";
+ case wsrep::server_state::s_donor: return "donor";
+ case wsrep::server_state::s_synced: return "synced";
+ case wsrep::server_state::s_disconnecting: return "disconnecting";
+ }
+ return "unknown";
+ }
+
+ static inline std::string to_string(enum wsrep::server_state::state state)
+ {
+ return (to_c_string(state));
+ }
+
+}
+
+#endif // WSREP_SERVER_STATE_HPP