diff options
Diffstat (limited to '')
35 files changed, 6059 insertions, 0 deletions
diff --git a/wsrep-lib/include/wsrep/allowlist_service.hpp b/wsrep-lib/include/wsrep/allowlist_service.hpp new file mode 100644 index 00000000..1e619ce8 --- /dev/null +++ b/wsrep-lib/include/wsrep/allowlist_service.hpp @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2021 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + + +/** @file allowlist_service.hpp + * + * Service interface for interacting with DBMS provided + * allowlist callback. + */ + +#ifndef WSREP_ALLOWLIST_SERVICE_HPP +#define WSREP_ALLOWLIST_SERVICE_HPP + +#include "compiler.hpp" +#include "wsrep/buffer.hpp" + +#include <sys/types.h> // ssize_t + +namespace wsrep +{ + class allowlist_service + { + public: + enum allowlist_key + { + /** IP allowlist check */ + allowlist_ip, + /** SSL allowlist check */ + allowlist_ssl + }; + + virtual ~allowlist_service() { } + + /** + * Allowlist callback. + */ + virtual bool allowlist_cb(allowlist_key key, + const wsrep::const_buffer& value) WSREP_NOEXCEPT = 0; + }; +} + +#endif // WSREP_ALLOWLIST_SERVICE_HPP diff --git a/wsrep-lib/include/wsrep/atomic.hpp b/wsrep-lib/include/wsrep/atomic.hpp new file mode 100644 index 00000000..6d92c167 --- /dev/null +++ b/wsrep-lib/include/wsrep/atomic.hpp @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2019 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_ATOMIC_HPP +#define WSREP_ATOMIC_HPP + +#if defined(__GNUG__) && (__GNUC__ == 4 && __GNUC_MINOR__ == 4) +#include <cstdatomic> +#else +#include <atomic> +#endif // defined(__GNUG__) && (__GNUC__ == 4 && __GNUC_MINOR__ == 4) + +#endif // WSREP_ATOMIC_HPP diff --git a/wsrep-lib/include/wsrep/buffer.hpp b/wsrep-lib/include/wsrep/buffer.hpp new file mode 100644 index 00000000..1c1ac599 --- /dev/null +++ b/wsrep-lib/include/wsrep/buffer.hpp @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2018-2019 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_BUFFER_HPP +#define WSREP_BUFFER_HPP + +#include <cstddef> +#include <vector> + +namespace wsrep +{ + class const_buffer + { + public: + const_buffer() + : ptr_() + , size_() + { } + + const_buffer(const void* ptr, size_t size) + : ptr_(ptr) + , size_(size) + { } + + const_buffer(const const_buffer& b) + : ptr_(b.ptr()) + , size_(b.size()) + { } + + const void* ptr() const { return ptr_; } + const char* data() const { return static_cast<const char*>(ptr_); } + size_t size() const { return size_; } + + const_buffer& operator=(const const_buffer& b) + { + ptr_ = b.ptr(); + size_ = b.size(); + return *this; + } + private: + const void* ptr_; + size_t size_; + }; + + + class mutable_buffer + { + public: + mutable_buffer() + : buffer_() + { } + + mutable_buffer(const mutable_buffer& b) + : buffer_(b.buffer_) + { } + + void resize(size_t s) { buffer_.resize(s); } + + void clear() + { + // using swap to ensure deallocation + std::vector<char>().swap(buffer_); + } + + void push_back(const char* begin, const char* end) + { + buffer_.insert(buffer_.end(), begin, end); + } + + template <class C> void push_back(const C& c) + { + std::copy(c.begin(), c.end(), std::back_inserter(buffer_)); + } + + size_t size() const { return buffer_.size(); } + + /** + * Return pointer to underlying data array. The returned pointer + * may or may not be null in case of empty buffer, it is up to + * user to check the size of the array before dereferencing the + * pointer. + * + * @return Pointer to underlying data array. + */ + char* data() { return buffer_.data(); } + + /** + * Return const pointer to underlying data array. The returned pointer + * may or may not be null in case of empty buffer, it is up to + * user to check the size of the array before dereferencing the + * pointer. + * + * @return Const pointer to underlying data array. + */ + const char* data() const { return buffer_.data(); } + + mutable_buffer& operator= (const mutable_buffer& other) + { + buffer_ = other.buffer_; + return *this; + } + + bool operator==(const mutable_buffer& other) const + { + return buffer_ == other.buffer_; + } + private: + std::vector<char> buffer_; + }; +} + +#endif // WSREP_BUFFER_HPP diff --git a/wsrep-lib/include/wsrep/chrono.hpp b/wsrep-lib/include/wsrep/chrono.hpp new file mode 100644 index 00000000..14961d77 --- /dev/null +++ b/wsrep-lib/include/wsrep/chrono.hpp @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2019 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file chrono.hpp + * + * Type definitions to work around GCC 4.4 incompatibilities with + * C++11 chrono. + */ + +#ifndef WSREP_CHRONO_HPP +#define WSREP_CHRONO_HPP + +#include <chrono> + +namespace wsrep +{ + /* wsrep::clock - clock type compatible with std::chrono::steady_clock. */ +#if defined(__GNUG__) && (__GNUC__ == 4 && __GNUC_MINOR__ == 4) + typedef std::chrono::monotonic_clock clock; +#else + using clock = std::chrono::steady_clock; +#endif // defined(__GNUG__) && (__GNUC__ == 4 && __GNUC_MINOR__ == 4) + +} + +#endif // WSREP_CHRONO_HPP diff --git a/wsrep-lib/include/wsrep/client_id.hpp b/wsrep-lib/include/wsrep/client_id.hpp new file mode 100644 index 00000000..f7597c88 --- /dev/null +++ b/wsrep-lib/include/wsrep/client_id.hpp @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_CLIENT_ID_HPP +#define WSREP_CLIENT_ID_HPP + +#include <ostream> +#include <limits> + +namespace wsrep +{ + class client_id + { + public: + typedef unsigned long long type; + client_id() + : id_(std::numeric_limits<type>::max()) + { } + template <typename I> + explicit client_id(I id) + : id_(static_cast<type>(id)) + { } + type get() const { return id_; } + static type undefined() { return std::numeric_limits<type>::max(); } + bool operator<(const client_id& other) const + { + return (id_ < other.id_); + } + bool operator==(const client_id& other) const + { + return (id_ == other.id_); + } + private: + type id_; + }; + static inline std::ostream& operator<<( + std::ostream& os, const wsrep::client_id& client_id) + { + return (os << client_id.get()); + } +} + + +#endif // WSREP_CLIENT_ID_HPP diff --git a/wsrep-lib/include/wsrep/client_service.hpp b/wsrep-lib/include/wsrep/client_service.hpp new file mode 100644 index 00000000..d47396df --- /dev/null +++ b/wsrep-lib/include/wsrep/client_service.hpp @@ -0,0 +1,223 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file client_service.hpp + * + * This file will define a `callback` abstract interface for a + * DBMS client session service. The interface will define methods + * which will be called by the wsrep-lib. + */ + +#ifndef WSREP_CLIENT_SERVICE_HPP +#define WSREP_CLIENT_SERVICE_HPP + +#include "buffer.hpp" +#include "provider.hpp" +#include "mutex.hpp" +#include "lock.hpp" + +namespace wsrep +{ + class client_service + { + public: + client_service() { } + virtual ~client_service() { } + + /** + * Return true if the current transaction has been interrupted + * by the DBMS. The lock which is passed to interrupted call + * will always have underlying mutex locked. + * + * @param lock Lock object grabbed by the client_state + */ + virtual bool interrupted(wsrep::unique_lock<wsrep::mutex>& lock) const = 0; + + /** + * Reset possible global or thread local parameters associated + * to the thread. + */ + virtual void reset_globals() = 0; + + /** + * Store possible global or thread local parameters associated + * to the thread. + */ + virtual void store_globals() = 0; + + /** + * Set up a data for replication. + */ + virtual int prepare_data_for_replication() = 0; + + /** + * Clean up after transcation has been terminated. + */ + virtual void cleanup_transaction() = 0; + + // + // Streaming + // + /** + * Return true if current statement is allowed for streaming, + * otherwise false. + */ + virtual bool statement_allowed_for_streaming() const = 0; + + /** + * Return the total number of bytes generated by the transaction + * context. + */ + virtual size_t bytes_generated() const = 0; + + /** + * Prepare a buffer containing data for the next fragment to replicate. + * The caller may set log_position to record the database specific + * position corresponding to changes contained in the buffer. + * When the call returns, the log_position will be available to read + * from streaming_context::log_position(). + * + * @return Zero in case of success, non-zero on failure. + * If there is no data to replicate, the method shall return + * zero and leave the buffer empty. + */ + virtual int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer, + size_t& log_position) = 0; + + /** + * Remove fragments from the storage within current transaction. + * Fragment removal will be committed once the current transaction + * commits. + * + * @return Zero in case of success, non-zero on failure. + */ + virtual int remove_fragments() = 0; + + // + // Rollback + // + /** + * Perform brute force rollback. + * + * This method may be called from two contexts, either from + * client state methods when the BF abort condition is detected, + * or from the background rollbacker thread. The task for this + * method is to release all reasources held by the client + * after BF abort so that the high priority thread can continue + * applying. + */ + virtual int bf_rollback() = 0; + + // + // Interface to global server state + // + /** + * Forcefully shut down the DBMS process or replication system. + * This may be called in situations where + * the process may encounter a situation where data integrity + * may not be guaranteed or other unrecoverable condition is + * encontered. + */ + virtual void emergency_shutdown() = 0; + + // Replaying + /** + * Notify that the client will replay. + * + * @todo This should not be visible to DBMS level, should be + * handled internally by wsrep-lib. + */ + virtual void will_replay() = 0; + + /** + * Signal that replay is done. + */ + virtual void signal_replayed() = 0; + + /** + * Replay the current transaction. The implementation must put + * the caller Client Context into applying mode and call + * client_state::replay(). + * + * @todo This should not be visible to DBMS level, should be + * handled internally by wsrep-lib. + */ + virtual enum wsrep::provider::status replay() = 0; + + /** + * Replay the current transaction. This is used for replaying + * prepared XA transactions, which are BF aborted but not + * while orderding commit / rollback. + */ + virtual enum wsrep::provider::status replay_unordered() = 0; + + /** + * Wait until all replaying transactions have been finished + * replaying. + * + * @todo This should not be visible to DBMS level, should be + * handled internally by wsrep-lib. + */ + virtual void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) = 0; + + // + // XA + // + /** + * Send a commit by xid + */ + virtual enum wsrep::provider::status commit_by_xid() = 0; + + /** + * Returns true if the client has an ongoing XA transaction. + * This method is used to determine when to cleanup the + * corresponding wsrep-lib transaction object. + * This method should return false when the XA transaction + * is over, and the wsrep-lib transaction object can be + * cleaned up. + */ + virtual bool is_explicit_xa() = 0; + + /** + * Returns true if the currently executing command is + * a rollback for XA. This is used to avoid setting a + * a deadlock error rollback as it may be unexpected + * by the DBMS. + */ + virtual bool is_xa_rollback() = 0; + + // + // Debug interface + // + /** + * Enter debug sync point. + * + * @params sync_point Name of the debug sync point. + */ + virtual void debug_sync(const char* sync_point) = 0; + + /** + * Forcefully kill the process if the crash_point has + * been enabled. + */ + virtual void debug_crash(const char* crash_point) = 0; + }; +} + +#endif // WSREP_CLIENT_SERVICE_HPP diff --git a/wsrep-lib/include/wsrep/client_state.hpp b/wsrep-lib/include/wsrep/client_state.hpp new file mode 100644 index 00000000..138bf5f0 --- /dev/null +++ b/wsrep-lib/include/wsrep/client_state.hpp @@ -0,0 +1,1209 @@ +/* + * Copyright (C) 2018-2019 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file client_state.hpp + * + * + * Return value conventions: + * + * The calls which may alter either client_state or associated + * transaction state will generally return zero on success and + * non-zero on failure. More detailed error information is stored + * into client state and persisted there until explicitly cleared. + */ + +#ifndef WSREP_CLIENT_STATE_HPP +#define WSREP_CLIENT_STATE_HPP + +#include "server_state.hpp" +#include "server_service.hpp" +#include "provider.hpp" +#include "transaction.hpp" +#include "client_id.hpp" +#include "client_service.hpp" +#include "mutex.hpp" +#include "lock.hpp" +#include "buffer.hpp" +#include "thread.hpp" +#include "xid.hpp" +#include "chrono.hpp" + +namespace wsrep +{ + class server_state; + class provider; + + enum client_error + { + e_success, + e_error_during_commit, + e_deadlock_error, + e_interrupted_error, + e_size_exceeded_error, + e_append_fragment_error, + e_not_supported_error, + e_timeout_error + }; + + static inline const char* to_c_string(enum client_error error) + { + switch (error) + { + case e_success: return "success"; + case e_error_during_commit: return "commit_error"; + case e_deadlock_error: return "deadlock_error"; + case e_interrupted_error: return "interrupted_error"; + case e_size_exceeded_error: return "size_exceeded_error"; + case e_append_fragment_error: return "append_fragment_error"; + case e_not_supported_error: return "not_supported_error"; + case e_timeout_error: return "timeout_error"; + } + return "unknown"; + } + + static inline std::string to_string(enum client_error error) + { + return to_c_string(error); + } + /** + * Client State + */ + class client_state + { + public: + /** + * Client mode enumeration. + */ + enum mode + { + /** undefined mode */ + m_undefined, + /** Locally operating client session. */ + m_local, + /** High priority mode */ + m_high_priority, + /** Client is in total order isolation mode */ + m_toi, + /** Client is executing rolling schema upgrade */ + m_rsu, + /** Client is executing NBO */ + m_nbo + }; + + static const int n_modes_ = m_nbo + 1; + /** + * Client state enumeration. + * + */ + enum state + { + /** + * Client session has not been initialized yet. + */ + s_none, + /** + * Client is idle, the control is in the application which + * uses the DBMS system. + */ + s_idle, + /** + * The control of the client processing is inside the DBMS + * system. + */ + s_exec, + /** + * Client handler is sending result to client. + */ + s_result, + /** + * The client session is terminating. + */ + s_quitting + }; + + static const int state_max_ = s_quitting + 1; + + /** + * Aqcuire ownership on the thread. + * + * This method should be called every time the thread + * operating the client state changes. This method is called + * implicitly from before_command() and + * wait_rollback_complete_and_acquire_ownership(). + */ + void acquire_ownership() + { + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + do_acquire_ownership(lock); + } + + /** + * @deprecated Use acquire_ownership() instead. + */ + void store_globals() + { + acquire_ownership(); + } + + /** + * Destructor. + */ + virtual ~client_state() + { + assert(transaction_.active() == false); + } + + /** @name Client session handling */ + /** @{ */ + /** + * This method should be called when opening the client session. + * + * Initializes client id and changes the state to s_idle. + */ + void open(wsrep::client_id); + + /** + * This method should be called before closing the client session. + * + * The state is changed to s_quitting and any open transactions + * are rolled back. + */ + void close(); + + /** + * This method should be called after closing the client session + * to clean up. + * + * The state is changed to s_none. + */ + void cleanup(); + + /** + * Overload of cleanup() method which takes lock as argument. + * This method does not release the lock during execution, but + * the lock is needed for debug build sanity checks. + */ + void cleanup(wsrep::unique_lock<wsrep::mutex>& lock); + /** @} */ + + /** @name Client command handling */ + /** @{ */ + /** + * This mehod should be called before the processing of command + * received from DBMS client starts. + * + * This method will wait until the possible synchronous + * rollback for associated transaction has finished unless + * wait_rollback_complete_and_acquire_ownership() has been + * called before. + * + * The method has a side effect of changing the client + * context state to executing. + * + * The value set by keep_command_error has an effect on + * how before_command() behaves when it is entered after + * background rollback has been processed: + * + * - If keep_command_error is set true, the current error + * is set and success will be returned. + * - If keep_command_error is set false, the transaction is + * cleaned up and the return value will be non-zero to + * indicate error. + * + * @param keep_command_error Make client state to preserve error + * state in command hooks. + * This is needed if a current command is not supposed to + * return an error status to the client and the protocol must + * advance until the next client command to return error status. + * + * @return Zero in case of success, non-zero in case of the + * associated transaction was BF aborted. + */ + int before_command(bool keep_command_error); + + int before_command() + { + return before_command(false); + } + + /** + * This method should be called before returning + * a result to DBMS client. + * + * The method will check if the transaction associated to + * the connection has been aborted. Rollback is performed + * if needed. After the call, current_error() will return an error + * code associated to the client state. If the error code is + * not success, the transaction associated to the client state + * has been aborted and rolled back. + */ + void after_command_before_result(); + + /** + * Method which should be called after returning the + * control back to DBMS client.. + * + * The method will do the check if the transaction associated + * to the connection has been aborted. If so, rollback is + * performed and the transaction is left to aborted state. + * The next call to before_command() will return an error and + * the error state can be examined after after_command_before_resul() + * is called. + * + * This method has a side effect of changing state to + * idle. + */ + void after_command_after_result(); + /** @} */ + + /** @name Statement level operations */ + /** @{ */ + /** + * Before statement execution operations. + * + * Check if server is synced and if dirty reads are allowed. + * + * @return Zero in case of success, non-zero if the statement + * is not allowed to be executed due to read or write + * isolation requirements. + */ + int before_statement(); + + /** + * After statement execution operations. + * + * * Check for must_replay state + * * Do rollback if requested + */ + int after_statement(); + /** @} */ + + /** + * Perform cleanup after applying a transaction. + * + * @param err Applying error (empty for no error) + */ + void after_applying() + { + assert(mode_ == m_high_priority); + transaction_.after_applying(); + } + + /** @name Replication interface */ + /** @{ */ + /** + * Start a new transaction with a transaction id. + * + * @todo This method should + * - Register the transaction on server level for bookkeeping + * - Isolation levels? Or part of the transaction? + */ + int start_transaction(const wsrep::transaction_id& id) + { + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + assert(state_ == s_exec); + return transaction_.start_transaction(id); + } + + /** + * Establish read view ID of the transaction. + * + * This method should be preferably called immediately before any + * first read or write operation in the transaction is performed, + * Then it can be called with default NULL parameter and will use + * the current last committed GTID. + * Alternatively it can be called at any time before commit with an + * explicit GTID that corresponds to transaction read view. + * + * @param gtid optional explicit GTID of the transaction read view. + */ + int assign_read_view(const wsrep::gtid* const gtid = NULL) + { + assert(mode_ == m_local); + assert(state_ == s_exec); + return transaction_.assign_read_view(gtid); + } + + /** + * Append a key into transaction write set. + * + * @param key Key to be appended + * + * @return Zero on success, non-zero on failure. + */ + int append_key(const wsrep::key& key) + { + assert(mode_ == m_local); + assert(state_ == s_exec); + return transaction_.append_key(key); + } + + /** + * Append keys in key_array into transaction write set. + * + * @param keys Array of keys to be appended + * + * @return Zero in case of success, non-zero on failure. + */ + int append_keys(const wsrep::key_array& keys) + { + assert(mode_ == m_local || mode_ == m_toi); + assert(state_ == s_exec); + for (auto i(keys.begin()); i != keys.end(); ++i) + { + if (transaction_.append_key(*i)) + { + return 1; + } + } + return 0; + } + + /** + * Append data into transaction write set. + */ + int append_data(const wsrep::const_buffer& data) + { + assert(mode_ == m_local); + assert(state_ == s_exec); + return transaction_.append_data(data); + } + + /** @} */ + + /** @name Streaming replication interface */ + /** @{ */ + /** + * This method should be called after every row operation. + */ + int after_row() + { + assert(mode_ == m_local); + assert(state_ == s_exec); + return (transaction_.streaming_context().fragment_size() ? + transaction_.after_row() : 0); + } + + /** + * Set streaming parameters. + * + * @param fragment_unit Desired fragment unit + * @param fragment_size Desired fragment size + */ + void streaming_params(enum wsrep::streaming_context::fragment_unit + fragment_unit, + size_t fragment_size); + + /** + * Enable streaming replication. + * + * Currently it is not possible to change the fragment unit + * for active streaming transaction. + * + * @param fragment_unit Desired fragment unit + * @param fragment_size Desired fragment size + * + * @return Zero on success, non-zero if the streaming cannot be + * enabled. + */ + int enable_streaming( + enum wsrep::streaming_context::fragment_unit + fragment_unit, + size_t fragment_size); + + /** + * Disable streaming for context. + */ + void disable_streaming(); + + void fragment_applied(wsrep::seqno seqno) + { + assert(mode_ == m_high_priority); + transaction_.fragment_applied(seqno); + } + + /** + * Prepare write set meta data for ordering. + * This method should be called before ordered commit or + * rollback if the commit time meta data was not known + * at the time of the start of the transaction. + * This mostly applies to streaming replication. + * + * @param ws_handle Write set handle + * @param ws_meta Write set meta data + * @param is_commit Boolean to denote whether the operation + * is commit or rollback. + */ + int prepare_for_ordering(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + bool is_commit) + { + assert(state_ == s_exec); + return transaction_.prepare_for_ordering( + ws_handle, ws_meta, is_commit); + } + /** @} */ + + /** @name Applying interface */ + /** @{ */ + int start_transaction(const wsrep::ws_handle& wsh, + const wsrep::ws_meta& meta) + { + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + assert(owning_thread_id_ == wsrep::this_thread::get_id()); + assert(mode_ == m_high_priority); + return transaction_.start_transaction(wsh, meta); + } + + int next_fragment(const wsrep::ws_meta& meta) + { + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + assert(mode_ == m_high_priority); + return transaction_.next_fragment(meta); + } + + /** @name Commit ordering interface */ + /** @{ */ + int before_prepare() + { + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + assert(owning_thread_id_ == wsrep::this_thread::get_id()); + assert(state_ == s_exec); + return transaction_.before_prepare(lock); + } + + int after_prepare() + { + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + assert(owning_thread_id_ == wsrep::this_thread::get_id()); + assert(state_ == s_exec); + return transaction_.after_prepare(lock); + } + + int before_commit() + { + assert(owning_thread_id_ == wsrep::this_thread::get_id()); + assert(state_ == s_exec || mode_ == m_local); + return transaction_.before_commit(); + } + + int ordered_commit() + { + assert(owning_thread_id_ == wsrep::this_thread::get_id()); + assert(state_ == s_exec || mode_ == m_local); + return transaction_.ordered_commit(); + } + + int after_commit() + { + assert(owning_thread_id_ == wsrep::this_thread::get_id()); + assert(state_ == s_exec || mode_ == m_local); + return transaction_.after_commit(); + } + /** @} */ + int before_rollback() + { + assert(owning_thread_id_ == wsrep::this_thread::get_id()); + assert(state_ == s_idle || + state_ == s_exec || + state_ == s_result || + state_ == s_quitting); + return transaction_.before_rollback(); + } + + int after_rollback() + { + assert(owning_thread_id_ == wsrep::this_thread::get_id()); + assert(state_ == s_idle || + state_ == s_exec || + state_ == s_result || + state_ == s_quitting); + return transaction_.after_rollback(); + } + + /** + * This method should be called by the background rollbacker + * thread after the rollback is complete. This will allow + * the client to proceed through before_command() and + * wait_rollback_complete_and_acquire_ownership(). + */ + void sync_rollback_complete(); + + /** + * Wait for background rollback to complete. This method can + * be called before before_command() to verify that the + * background rollback has been finished. After the call returns, + * it is guaranteed that BF abort does not launch background + * rollback process before after_command_after_result() is called. + * This method is idempotent, it can be called many times + * by the same thread before before_command() is called. + */ + void wait_rollback_complete_and_acquire_ownership(); + /** @} */ + + // + // XA + // + /** + * Assign transaction external id. + * + * Other than storing the xid, the transaction is marked as XA. + * This should be called when XA transaction is started. + * + * @param xid transaction id + */ + void assign_xid(const wsrep::xid& xid) + { + transaction_.assign_xid(xid); + } + + /** + * Restores the client's transaction to prepared state + * + * The purpose of this method is to restore transaction state + * during recovery of a prepared XA transaction. + */ + int restore_xid(const wsrep::xid& xid) + { + return transaction_.restore_to_prepared_state(xid); + } + + /** + * Commit transaction with the given xid + * + * Sends a commit fragment to terminate the transaction with + * the given xid. For the fragment to be sent, a streaming + * applier for the transaction must exist, and the transaction + * must be in prepared state. + * + * @param xid the xid of the the transaction to commit + * + * @return Zero on success, non-zero on error. In case of error + * the client_state's current_error is set + */ + int commit_by_xid(const wsrep::xid& xid) + { + return transaction_.commit_or_rollback_by_xid(xid, true); + } + + /** + * Rollback transaction with the given xid + * + * Sends a rollback fragment to terminate the transaction with + * the given xid. For the fragment to be sent, a streaming + * applier for the transaction must exist, and the transaction + * must be in prepared state. + * + * @param xid the xid of the the transaction to commit + * + * @return Zero on success, non-zero on error. In case of error + * the client_state's current_error is set + */ + int rollback_by_xid(const wsrep::xid& xid) + { + return transaction_.commit_or_rollback_by_xid(xid, false); + } + + /** + * Detach a prepared XA transaction + * + * This method cleans up a local XA transaction in prepared state + * and converts it to high priority mode. + * This can be used to handle the case where the client of a XA + * transaction disconnects, and the transaction must not rollback. + * After this call, a different client may later attempt to terminate + * the transaction by calling method commit_by_xid() or rollback_by_xid(). + */ + void xa_detach() + { + assert(mode_ == m_local); + assert(state_ == s_none || state_ == s_exec || state_ == s_quitting); + transaction_.xa_detach(); + } + + /** + * Replay a XA transaction + * + * Replay a XA transaction that is in s_idle state. + * This may happen if the transaction is BF aborted + * between prepare and commit. + * Since the victim is idle, this method can be called + * by the BF aborter or the backround rollbacker. + */ + void xa_replay() + { + assert(mode_ == m_local); + assert(state_ == s_idle); + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + transaction_.xa_replay(lock); + } + + // + // BF aborting + // + /** + * Brute force abort a transaction. This method should be + * called by a transaction which needs to BF abort a conflicting + * locally processing transaction. + */ + int bf_abort(wsrep::seqno bf_seqno) + { + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + assert(mode_ == m_local || transaction_.is_streaming()); + return transaction_.bf_abort(lock, bf_seqno); + } + /** + * Brute force abort a transaction in total order. This method + * should be called by the TOI operation which needs to + * BF abort a transaction. + */ + int total_order_bf_abort(wsrep::seqno bf_seqno) + { + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + assert(mode_ == m_local || transaction_.is_streaming()); + return transaction_.total_order_bf_abort(lock, bf_seqno); + } + + /** + * Adopt a streaming transaction state. This is must be + * called from high_priority_service::adopt_transaction() + * during streaming transaction rollback. The call will + * set up enough context for handling the rollback + * fragment. + */ + void adopt_transaction(const wsrep::transaction& transaction) + { + assert(mode_ == m_high_priority); + transaction_.adopt(transaction); + } + + /** + * Adopt (store) transaction applying error for further processing. + */ + void adopt_apply_error(wsrep::mutable_buffer& err) + { + assert(mode_ == m_high_priority); + transaction_.adopt_apply_error(err); + } + + /** + * Clone enough state from another transaction so that replaing will + * be possible with a transaction contained in this client state. + * + * @param transaction Transaction which is to be replied in this + * client state + */ + void clone_transaction_for_replay(const wsrep::transaction& transaction) + { + // assert(mode_ == m_high_priority); + transaction_.clone_for_replay(transaction); + } + + /** @name Non-transactional operations */ + /** @{*/ + + /** + * Enter total order isolation critical section. If the wait_until + * is given non-default value, the operation is retried until + * successful, the given time point is reached or the client is + * interrupted. + * + * @param key_array Array of keys + * @param buffer Buffer containing the action to execute inside + * total order isolation section + * @param flags Provider flags for TOI operation + * @param wait_until Time point to wait until for successful + * certification. + * + * @return Zero on success, non-zero otherwise. + */ + int enter_toi_local( + const wsrep::key_array& key_array, + const wsrep::const_buffer& buffer, + std::chrono::time_point<wsrep::clock> + wait_until = + std::chrono::time_point<wsrep::clock>()); + /** + * Enter applier TOI mode + * + * @param ws_meta Write set meta data + */ + void enter_toi_mode(const wsrep::ws_meta& ws_meta); + + /** + * Return true if the client_state is under TOI operation. + */ + bool in_toi() const + { + return (toi_meta_.seqno().is_undefined() == false); + } + + /** + * Return the mode where client entered into TOI mode. + * The return value can be either m_local or + * m_high_priority. + */ + enum mode toi_mode() const + { + return toi_mode_; + } + + /** + * Leave total order isolation critical section. + * (for local mode clients) + * + * @param err definition of the error that happened during the + * execution of TOI operation (empty for no error) + */ + int leave_toi_local(const wsrep::mutable_buffer& err); + + /** + * Leave applier TOI mode. + */ + void leave_toi_mode(); + + /** + * Begin rolling schema upgrade operation. + * + * @param timeout Timeout in seconds to wait for committing + * connections to finish. + */ + int begin_rsu(int timeout); + + /** + * End rolling schema upgrade operation. + */ + int end_rsu(); + + /** + * Begin non-blocking operation. + * + * The NBO operation is started by grabbing TOI critical + * section. The keys and buffer are certifed as in TOI + * operation. If the call fails due to error returned by + * the provider, the provider error code can be retrieved + * by current_error_status() call. + * + * If the wait_until is given non-default value, the operation is + * retried until successful, the given time point is reached or the + * client is interrupted. + * + * @param keys Array of keys for NBO operation. + * @param buffer NBO write set + * @param wait_until Time point to wait until for successful certification. + * @return Zero in case of success, non-zero in case of failure. + */ + int begin_nbo_phase_one( + const wsrep::key_array& keys, + const wsrep::const_buffer& buffer, + std::chrono::time_point<wsrep::clock> + wait_until = + std::chrono::time_point<wsrep::clock>()); + + /** + * End non-blocking operation phase after aquiring required + * resources for operation. + * + * @param err definition of the error that happened during the + * execution of phase one (empty for no error) + */ + int end_nbo_phase_one(const wsrep::mutable_buffer& err); + + /** + * Enter in NBO mode. This method should be called when the + * applier launches the asynchronous process to perform the + * operation. The purpose of the call is to adjust + * the state and set write set meta data. + * + * @param ws_meta Write set meta data. + * + * @return Zero in case of success, non-zero on failure. + */ + int enter_nbo_mode(const wsrep::ws_meta& ws_meta); + + /** + * Begin non-blocking operation phase two. The keys argument + * passed to this call must contain the same keys which were + * passed to begin_nbo_phase_one(). + * + * If the wait_until is given non-default value, the operation is + * retried until successful, the given time point is reached or the + * client is interrupted. + * + * @param keys Key array. + * @param wait_until Time point to wait until for entering TOI for + * phase two. + */ + int begin_nbo_phase_two(const wsrep::key_array& keys, + std::chrono::time_point<wsrep::clock> + wait_until = + std::chrono::time_point<wsrep::clock>()); + + /** + * End non-blocking operation phase two. This call will + * release TOI critical section and set the mode to m_local. + * + * @param err definition of the error that happened during the + * execution of phase two (empty for no error) + */ + int end_nbo_phase_two(const wsrep::mutable_buffer& err); + + /** + * Get reference to the client mutex. + * + * @return Reference to the client mutex. + */ + wsrep::mutex& mutex() { return mutex_; } + + /** + * Get server context associated the the client session. + * + * @return Reference to server context. + */ + wsrep::server_state& server_state() const + { return server_state_; } + + wsrep::client_service& client_service() const + { return client_service_; } + /** + * Get reference to the Provider which is associated + * with the client context. + * + * @return Reference to the provider. + * @throw wsrep::runtime_error if no providers are associated + * with the client context. + * + * @todo Should be removed. + */ + wsrep::provider& provider() const; + + /** + * Get Client identifier. + * + * @return Client Identifier + */ + client_id id() const { return id_; } + + /** + * Get Client mode. + * + * @todo Enforce mutex protection if called from other threads. + * + * @return Client mode. + */ + enum mode mode() const { return mode_; } + + /** + * Get Client state. + * + * @todo Enforce mutex protection if called from other threads. + * + * @return Client state + */ + enum state state() const { return state_; } + + /** + * Return a const reference to the transaction associated + * with the client state. + */ + const wsrep::transaction& transaction() const + { + return transaction_; + } + + /** + * Mark the transaction associated with the client state + * (if any), as unsafe for parallel applying + * + * @return Zero on success, non-zero on error. + */ + int mark_transaction_pa_unsafe() + { + if (transaction_.active()) + { + transaction_.pa_unsafe(true); + return 0; + } + return 1; + } + + const wsrep::ws_meta& toi_meta() const + { + return toi_meta_; + } + + /** + * Do sync wait operation. If the method fails, current_error() + * can be inspected about the reason of error. + * + * @param Sync wait timeout in seconds. + * + * @return Zero on success, non-zero on error. + */ + int sync_wait(int timeout); + + /** + * Return the current sync wait GTID. + * + * Sync wait GTID is updated on each sync_wait() call and + * reset to wsrep::gtid::undefined() in after_command_after_result() + * method. The variable can thus be used to check if a sync wait + * has been performend for the current client command. + */ + const wsrep::gtid& sync_wait_gtid() const + { + return sync_wait_gtid_; + } + /** + * Return the last written GTID. + */ + const wsrep::gtid& last_written_gtid() const + { + return last_written_gtid_; + } + + /** + * Set debug logging level. + * + * Levels: + * 0 - Debug logging is disabled + * 1..n - Debug logging with increasing verbosity. + */ + void debug_log_level(int level) { debug_log_level_ = level; } + + /** + * Return current debug logging level. The return value + * is a maximum of client state and server state debug log + * levels. + * + * @return Current debug log level. + */ + int debug_log_level() const + { + return std::max(debug_log_level_, + wsrep::log::debug_log_level()); + } + + // + // Error handling + // + + /** + * Reset the current error state. + * + * @todo There should be some protection about when this can + * be done. + */ + void reset_error() + { + current_error_ = wsrep::e_success; + } + + /** + * Return current error code. + * + * @return Current error code. + */ + enum wsrep::client_error current_error() const + { + return current_error_; + } + + enum wsrep::provider::status current_error_status() const + { + return current_error_status_; + } + protected: + /** + * Client context constuctor. This is protected so that it + * can be called from derived class constructors only. + */ + client_state(wsrep::mutex& mutex, + wsrep::condition_variable& cond, + wsrep::server_state& server_state, + wsrep::client_service& client_service, + const client_id& id, + enum mode mode) + : owning_thread_id_(wsrep::this_thread::get_id()) + , rollbacker_active_(false) + , mutex_(mutex) + , cond_(cond) + , server_state_(server_state) + , client_service_(client_service) + , id_(id) + , mode_(mode) + , toi_mode_(m_undefined) + , state_(s_none) + , state_hist_() + , transaction_(*this) + , toi_meta_() + , nbo_meta_() + , allow_dirty_reads_() + , sync_wait_gtid_() + , last_written_gtid_() + , debug_log_level_(0) + , current_error_(wsrep::e_success) + , current_error_status_(wsrep::provider::success) + , keep_command_error_() + { } + + private: + client_state(const client_state&); + client_state& operator=(client_state&); + + friend class client_state_switch; + friend class high_priority_context; + friend class transaction; + + void do_acquire_ownership(wsrep::unique_lock<wsrep::mutex>& lock); + // Wait for sync rollbacker to finish, with lock. Changes state + // to exec. + void do_wait_rollback_complete_and_acquire_ownership( + wsrep::unique_lock<wsrep::mutex>& lock); + void update_last_written_gtid(const wsrep::gtid&); + void debug_log_state(const char*) const; + void debug_log_keys(const wsrep::key_array& keys) const; + void state(wsrep::unique_lock<wsrep::mutex>& lock, enum state state); + void mode(wsrep::unique_lock<wsrep::mutex>& lock, enum mode mode); + + // Override current client error status. Optionally provide + // an error status from the provider if the error was caused + // by the provider call. + void override_error(enum wsrep::client_error error, + enum wsrep::provider::status status = + wsrep::provider::success); + + // Poll provider::enter_toi() until return status from provider + // does not indicate certification failure, timeout expires + // or client is interrupted. + enum wsrep::provider::status + poll_enter_toi(wsrep::unique_lock<wsrep::mutex>& lock, + const wsrep::key_array& keys, + const wsrep::const_buffer& buffer, + wsrep::ws_meta& meta, + int flags, + std::chrono::time_point<wsrep::clock> wait_until, + bool& timed_out); + void enter_toi_common(wsrep::unique_lock<wsrep::mutex>&); + void leave_toi_common(); + + wsrep::thread::id owning_thread_id_; + bool rollbacker_active_; + wsrep::mutex& mutex_; + wsrep::condition_variable& cond_; + wsrep::server_state& server_state_; + wsrep::client_service& client_service_; + wsrep::client_id id_; + enum mode mode_; + enum mode toi_mode_; + enum state state_; + std::vector<enum state> state_hist_; + wsrep::transaction transaction_; + wsrep::ws_meta toi_meta_; + wsrep::ws_meta nbo_meta_; + bool allow_dirty_reads_; + wsrep::gtid sync_wait_gtid_; + wsrep::gtid last_written_gtid_; + int debug_log_level_; + enum wsrep::client_error current_error_; + enum wsrep::provider::status current_error_status_; + bool keep_command_error_; + + /** + * Marks external rollbacker thread for the client + * as active. This will block client in before_command(), until + * rolbacker has released the client. + */ + void set_rollbacker_active(bool value) + { + rollbacker_active_ = value; + } + + bool is_rollbacker_active() + { + return rollbacker_active_; + } + }; + + static inline const char* to_c_string( + enum wsrep::client_state::state state) + { + switch (state) + { + case wsrep::client_state::s_none: return "none"; + case wsrep::client_state::s_idle: return "idle"; + case wsrep::client_state::s_exec: return "exec"; + case wsrep::client_state::s_result: return "result"; + case wsrep::client_state::s_quitting: return "quit"; + } + return "unknown"; + } + + static inline std::string to_string(enum wsrep::client_state::state state) + { + return to_c_string(state); + } + + static inline const char* to_c_string(enum wsrep::client_state::mode mode) + { + switch (mode) + { + case wsrep::client_state::m_undefined: return "undefined"; + case wsrep::client_state::m_local: return "local"; + case wsrep::client_state::m_high_priority: return "high priority"; + case wsrep::client_state::m_toi: return "toi"; + case wsrep::client_state::m_rsu: return "rsu"; + case wsrep::client_state::m_nbo: return "nbo"; + } + return "unknown"; + } + + static inline std::string to_string(enum wsrep::client_state::mode mode) + { + return to_c_string(mode); + } + + /** + * Utility class to switch the client state to high priority + * mode. The client is switched back to the original mode + * when the high priority context goes out of scope. + */ + class high_priority_context + { + public: + high_priority_context(wsrep::client_state& client) + : client_(client) + , orig_mode_(client.mode_) + { + wsrep::unique_lock<wsrep::mutex> lock(client.mutex_); + client.mode(lock, wsrep::client_state::m_high_priority); + } + virtual ~high_priority_context() + { + wsrep::unique_lock<wsrep::mutex> lock(client_.mutex_); + assert(client_.mode() == wsrep::client_state::m_high_priority); + client_.mode(lock, orig_mode_); + } + private: + wsrep::client_state& client_; + enum wsrep::client_state::mode orig_mode_; + }; +} + +#endif // WSREP_CLIENT_STATE_HPP diff --git a/wsrep-lib/include/wsrep/compiler.hpp b/wsrep-lib/include/wsrep/compiler.hpp new file mode 100644 index 00000000..5f8e1ce0 --- /dev/null +++ b/wsrep-lib/include/wsrep/compiler.hpp @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + + +/** @file compiler.hpp + * + * Compiler specific macro definitions. + * + * WSREP_NOEXCEPT - Specifies that the method/function does not throw. If + * and exception is thrown inside, std::terminate is called + * without propagating the exception. + * Set to "noexcept" if the compiler supports it, otherwise + * left empty. + * WSREP_NORETURN - Indicates that the method/function does not return. + * Set to attribute "[[noreturn]]" if the compiler supports, + * it, otherwise "__attribute__((noreturn))". + * WSREP_OVERRIDE - Set to "override" if the compiler supports it, otherwise + * left empty. + * WSREP_UNUSED - Can be used to mark variables which may be present in + * debug builds but not in release builds. + * WSREP_FALLTHROUGH - Silence implicit fallthrough warning. + */ + +#ifndef WSREP_LIB_COMPILER_HPP +#define WSREP_LIB_COMPILER_HPP + +#if __cplusplus >= 201103L && !(__GNUC__ == 4 && __GNUG_MINOR__ < 8) +#define WSREP_NORETURN [[noreturn]] +#else +#define WSREP_NORETURN __attribute__((noreturn)) +#endif // __cplusplus >= 201103L && !(__GNUC__ == 4 && __GNUG_MINOR__ < 8) + +#if __cplusplus >= 201103L +#define WSREP_NOEXCEPT noexcept +#define WSREP_OVERRIDE override +#else +#define WSREP_NOEXCEPT +#define WSREP_OVERRIDE +#endif // __cplusplus >= 201103L +#define WSREP_UNUSED __attribute__((unused)) + +#if __GNUC__ >= 7 +#define WSREP_FALLTHROUGH __attribute__((fallthrough)) +#elif defined(__clang__) +# if defined(__has_warning) +# if __has_feature(cxx_attributes) && __has_warning("-Wimplicit-fallthrough") +# define WSREP_FALLTHROUGH [[clang::fallthrough]] +# endif +# endif +#else // __clang __ +#define WSREP_FALLTHROUGH ((void)0) +#endif // __GNUC__ >= 7 || (__clang__ && __clang_major__ >= 10) + +#endif // WSREP_LIB_COMPILER_HPP diff --git a/wsrep-lib/include/wsrep/condition_variable.hpp b/wsrep-lib/include/wsrep/condition_variable.hpp new file mode 100644 index 00000000..25a0e16e --- /dev/null +++ b/wsrep-lib/include/wsrep/condition_variable.hpp @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_CONDITION_VARIABLE_HPP +#define WSREP_CONDITION_VARIABLE_HPP + +#include "compiler.hpp" +#include "lock.hpp" + +#include <cstdlib> + +namespace wsrep +{ + class condition_variable + { + public: + condition_variable() { } + virtual ~condition_variable() { } + virtual void notify_one() = 0; + virtual void notify_all() = 0; + virtual void wait(wsrep::unique_lock<wsrep::mutex>& lock) = 0; + private: + condition_variable(const condition_variable&); + condition_variable& operator=(const condition_variable&); + }; + + // Default pthreads based condition variable implementation + class default_condition_variable : public condition_variable + { + public: + default_condition_variable() + : cond_() + { + if (pthread_cond_init(&cond_, 0)) + { + throw wsrep::runtime_error("Failed to initialized condvar"); + } + } + + ~default_condition_variable() + { + if (pthread_cond_destroy(&cond_)) + { + ::abort(); + } + } + void notify_one() WSREP_OVERRIDE + { + (void)pthread_cond_signal(&cond_); + } + + void notify_all() WSREP_OVERRIDE + { + (void)pthread_cond_broadcast(&cond_); + } + + void wait(wsrep::unique_lock<wsrep::mutex>& lock) WSREP_OVERRIDE + { + if (pthread_cond_wait( + &cond_, + reinterpret_cast<pthread_mutex_t*>(lock.mutex()->native()))) + { + throw wsrep::runtime_error("Cond wait failed"); + } + } + + private: + pthread_cond_t cond_; + }; + +} + +#endif // WSREP_CONDITION_VARIABLE_HPP diff --git a/wsrep-lib/include/wsrep/encryption_service.hpp b/wsrep-lib/include/wsrep/encryption_service.hpp new file mode 100644 index 00000000..0efcde06 --- /dev/null +++ b/wsrep-lib/include/wsrep/encryption_service.hpp @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2019 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_ENCRYPTION_SERVICE_HPP +#define WSREP_ENCRYPTION_SERVICE_HPP + +#include "buffer.hpp" + +namespace wsrep +{ + /** + * Encryption service. + */ + class encryption_service + { + public: + + virtual ~encryption_service() { } + + /** + * Encryption/decryption callback. Can be NULL for no encryption. + * + * @param ctx Encryption context + * @param key Key used in encryption/decryption + * @param iv IV vector + * @param input Input data buffer + * @param output An output buffer, must be at least the size of the input + * data plus unwritten bytes from the previous call(s). + * @param encrypt Flag used to either encrypt or decrypt data + * @param last true if this is the last buffer to encrypt/decrypt + * + * @return Number of bytes written to output or a negative error code. + */ + virtual int do_crypt(void** ctx, + wsrep::const_buffer& key, + const char (*iv)[32], + wsrep::const_buffer& input, + void* output, + bool encrypt, + bool last) = 0; + + /** + * Is encryption enabled on server. + * + * @return True if encryption is enabled. False otherwise + */ + virtual bool encryption_enabled() = 0; + }; +} + +#endif // WSREP_ENCRYPTION_SERVICE_HPP diff --git a/wsrep-lib/include/wsrep/event_service.hpp b/wsrep-lib/include/wsrep/event_service.hpp new file mode 100644 index 00000000..9cc8465f --- /dev/null +++ b/wsrep-lib/include/wsrep/event_service.hpp @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2020 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + + +/** @file event_service.hpp + * + * Service interface for providing events to DBMS. + */ + +#ifndef WSREP_EVENT_SERVICE_HPP +#define WSREP_EVENT_SERVICE_HPP + +#include <string> + +namespace wsrep +{ + + /** @class event_service + * + * Event service interface. This provides an interface corresponding + * to wsrep-API event service. For details see + * wsrep-API/wsrep_event_service.h + */ + class event_service + { + public: + virtual ~event_service() { } + + /** + * Process event with name name and value value. + */ + virtual void process_event(const std::string& name, + const std::string& value) = 0; + }; +} + +#endif // WSREP_EVENT_SERVICE_HPP diff --git a/wsrep-lib/include/wsrep/exception.hpp b/wsrep-lib/include/wsrep/exception.hpp new file mode 100644 index 00000000..ef19de9a --- /dev/null +++ b/wsrep-lib/include/wsrep/exception.hpp @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_EXCEPTION_HPP +#define WSREP_EXCEPTION_HPP + +#include <stdexcept> +#include <cstdlib> + +namespace wsrep +{ + extern bool abort_on_exception; + + class runtime_error : public std::runtime_error + { + public: + runtime_error(const char* msg) + : std::runtime_error(msg) + { + if (abort_on_exception) + { + ::abort(); + } + } + + runtime_error(const std::string& msg) + : std::runtime_error(msg) + { + if (abort_on_exception) + { + ::abort(); + } + } + }; + + class not_implemented_error : public std::exception + { + public: + not_implemented_error() + : std::exception() + { + ::abort(); + } + }; + + class fatal_error : public std::exception + { + }; +} + + +#endif // WSREP_EXCEPTION_HPP diff --git a/wsrep-lib/include/wsrep/gtid.hpp b/wsrep-lib/include/wsrep/gtid.hpp new file mode 100644 index 00000000..0d49c58d --- /dev/null +++ b/wsrep-lib/include/wsrep/gtid.hpp @@ -0,0 +1,130 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_GTID_HPP +#define WSREP_GTID_HPP + +#include "id.hpp" +#include "seqno.hpp" +#include "compiler.hpp" + +#include <iosfwd> + +/** + * Minimum number of bytes guaratneed to store GTID string representation, + * terminating '\0' not included (36 + 1 + 20). + */ +#define WSREP_LIB_GTID_C_STR_LEN 57 + +namespace wsrep +{ + class gtid + { + public: + gtid() + : id_() + , seqno_() + { } + gtid(const wsrep::id& id, wsrep::seqno seqno) + : id_(id) + , seqno_(seqno) + { } + const wsrep::id& id() const { return id_; } + wsrep::seqno seqno() const { return seqno_ ; } + bool is_undefined() const + { + return (seqno_.is_undefined() && id_.is_undefined()); + } + static const wsrep::gtid& undefined() + { + return undefined_; + } + bool operator==(const gtid& other) const + { + return ( + seqno_ == other.seqno_ && + id_ == other.id_ + ); + } + private: + static const wsrep::gtid undefined_; + wsrep::id id_; + wsrep::seqno seqno_; + }; + + /** + * Scan a GTID from C string. + * + * @param buf Buffer containing the string + * @param len Length of buffer + * @param[out] gtid Gtid to be printed to + * + * @return Number of bytes scanned, negative value on error. + */ + ssize_t scan_from_c_str(const char* buf, size_t buf_len, + wsrep::gtid& gtid); + + /* + * Deprecated version of the above for backwards compatibility. + * Will be removed when all the superprojects have been updated. + */ + static inline ssize_t gtid_scan_from_c_str(const char* buf, size_t buf_len, + wsrep::gtid& gtid) + { + return scan_from_c_str(buf, buf_len, gtid); + } + + /** + * Print a GTID into character buffer. + * + * @param gtid GTID to be printed. + * @param buf Pointer to the beginning of the buffer + * @param buf_len Buffer length + * + * @return Number of characters printed or negative value for error + */ + ssize_t print_to_c_str(const wsrep::gtid& gtid, char* buf, size_t buf_len); + + /* + * Deprecated version of the above for backwards compatibility. + * Will be removed when all the superprojects have been updated. + */ + static inline ssize_t gtid_print_to_c_str(const wsrep::gtid& gtid, + char* buf, size_t buf_len) + { + return print_to_c_str(gtid, buf, buf_len); + } + + /** + * Return minimum number of chars required to store any GTID. + */ + static inline size_t gtid_c_str_len() { return WSREP_LIB_GTID_C_STR_LEN; } + + /** + * Overload for ostream operator<<. + */ + std::ostream& operator<<(std::ostream&, const wsrep::gtid&); + + /** + * Overload for istream operator>>. + */ + std::istream& operator>>(std::istream&, wsrep::gtid&); +} + +#endif // WSREP_GTID_HPP diff --git a/wsrep-lib/include/wsrep/high_priority_service.hpp b/wsrep-lib/include/wsrep/high_priority_service.hpp new file mode 100644 index 00000000..f1d011ec --- /dev/null +++ b/wsrep-lib/include/wsrep/high_priority_service.hpp @@ -0,0 +1,266 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file high_priority_service.hpp + * + * Interface for services for applying high priority transactions. + */ +#ifndef WSREP_HIGH_PRIORITY_SERVICE_HPP +#define WSREP_HIGH_PRIORITY_SERVICE_HPP + +#include "xid.hpp" +#include "server_state.hpp" + +namespace wsrep +{ + class ws_handle; + class ws_meta; + class const_buffer; + class transaction; + class high_priority_service + { + public: + high_priority_service(wsrep::server_state& server_state) + : server_state_(server_state) + , must_exit_() { } + virtual ~high_priority_service() { } + + int apply(const ws_handle& ws_handle, const ws_meta& ws_meta, + const const_buffer& data) + { + return server_state_.on_apply(*this, ws_handle, ws_meta, data); + } + /** + * Start a new transaction + */ + virtual int start_transaction(const wsrep::ws_handle&, + const wsrep::ws_meta&) = 0; + + /** + * Start the next fragment of current transaction + */ + virtual int next_fragment(const wsrep::ws_meta&) = 0; + + /** + * Return transaction object associated to high priority + * service state. + */ + virtual const wsrep::transaction& transaction() const = 0; + + /** + * Adopt a transaction. + */ + virtual int adopt_transaction(const wsrep::transaction&) = 0; + + /** + * Apply a write set. + * + * A write set applying happens always + * as a part of the transaction. The caller must start a + * new transaction before applying a write set and must + * either commit to make changes persistent or roll back. + * + * @params ws_meta Write set meta data + * @params ws Write set buffer + * @params err Buffer to store error data + */ + virtual int apply_write_set(const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& ws, + wsrep::mutable_buffer& err) = 0; + + /** + * Append a fragment into fragment storage. This will be + * called after a non-committing fragment belonging to + * streaming transaction has been applied. The call will + * not be done within an open transaction, the implementation + * must start a new transaction and commit. + * + * Note that the call is not done from streaming transaction + * context, but from applier context. + */ + virtual int append_fragment_and_commit( + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data, + const wsrep::xid& xid) = 0; + + /** + * Remove fragments belonging to streaming transaction. + * This method will be called within the streaming transaction + * before commit. The implementation must not commit the + * whole transaction. The call will be done from streaming + * transaction context. + * + * @param ws_meta Write set meta data for commit fragment. + * + * @return Zero on success, non-zero on failure. + */ + virtual int remove_fragments(const wsrep::ws_meta& ws_meta) = 0; + + /** + * Commit a transaction. + * An implementation must call + * wsrep::client_state::prepare_for_ordering() to set + * the ws_handle and ws_meta before the commit if the + * commit process will go through client state commit + * processing. Otherwise the implementation must release + * commit order explicitly via provider. + * + * @param ws_handle Write set handle + * @param ws_meta Write set meta + * + * @return Zero in case of success, non-zero in case of failure + */ + virtual int commit(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) = 0; + /** + * Roll back a transaction + * + * An implementation must call + * wsrep::client_state::prepare_for_ordering() to set + * the ws_handle and ws_meta before the rollback if + * the rollback process will go through client state + * rollback processing. Otherwise the implementation + * must release commit order explicitly via provider. + * + * @param ws_handle Write set handle + * @param ws_meta Write set meta + * + * @return Zero in case of success, non-zero in case of failure + */ + virtual int rollback(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) = 0; + + /** + * Apply a TOI operation. + * + * TOI operation is a standalone operation and should not + * be executed as a part of a transaction. + * + * @params ws_meta Write set meta data + * @params ws Write set buffer + * @params err Buffer to store error data + */ + virtual int apply_toi(const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& ws, + wsrep::mutable_buffer& err) = 0; + + /** + * Apply NBO begin event. + * + * The responsibility of the implementation is to start + * an asynchronous process which will complete the operation. + * The call is done under total order isolation, and the + * isolation is released by the caller after the method + * returns. It is a responsibility of the asynchronous process + * to complete the second phase of NBO. + * + * @param ws_meta Write set meta data. + * @param data Buffer containing the command to execute. + * @params err Buffer to store error data + * + * @return Zero in case of success, non-zero if the asynchronous + * process could not be started. + */ + virtual int apply_nbo_begin(const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data, + wsrep::mutable_buffer& err) = 0; + + /** + * Actions to take after applying a write set was completed. + */ + virtual void after_apply() = 0; + + /** + * Store global execution context for high priority service. + */ + virtual void store_globals() = 0; + + /** + * Reset global execution context for high priority service. + */ + virtual void reset_globals() = 0; + + /** + * Switch exection context to context of orig_hps. + * + * @param orig_hps Original high priority service. + */ + virtual void switch_execution_context( + wsrep::high_priority_service& orig_hps) = 0; + + /** + * Log a dummy write set which is either SR transaction fragment + * or roll back fragment. The implementation must release + * commit order inside the call. + * + * @params ws_handle Write set handle + * @params ws_meta Write set meta data + * @params err Optional applying error data buffer, may be modified + * + * @return Zero in case of success, non-zero on failure + */ + virtual int log_dummy_write_set(const ws_handle& ws_handle, + const ws_meta& ws_meta, + wsrep::mutable_buffer& err) = 0; + + /** + * Adopt (store) apply error description for further reporting + * to provider, source buffer may be modified. + */ + virtual void adopt_apply_error(wsrep::mutable_buffer& err) = 0; + + virtual bool is_replaying() const = 0; + + bool must_exit() const { return must_exit_; } + + /** + * Debug facility to crash the server at given point. + */ + virtual void debug_crash(const char* crash_point) = 0; + + protected: + wsrep::server_state& server_state_; + bool must_exit_; + }; + + class high_priority_switch + { + public: + high_priority_switch(high_priority_service& orig_service, + high_priority_service& current_service) + : orig_service_(orig_service) + , current_service_(current_service) + { + orig_service_.reset_globals(); + current_service_.switch_execution_context(orig_service_); + current_service_.store_globals(); + } + ~high_priority_switch() + { + current_service_.reset_globals(); + orig_service_.store_globals(); + } + private: + high_priority_service& orig_service_; + high_priority_service& current_service_; + }; +} + +#endif // WSREP_HIGH_PRIORITY_SERVICE_HPP diff --git a/wsrep-lib/include/wsrep/id.hpp b/wsrep-lib/include/wsrep/id.hpp new file mode 100644 index 00000000..fc1e82b2 --- /dev/null +++ b/wsrep-lib/include/wsrep/id.hpp @@ -0,0 +1,105 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file id.hpp + * + * A generic identifier utility class. + */ +#ifndef WSREP_ID_HPP +#define WSREP_ID_HPP + +#include "exception.hpp" + +#include <iosfwd> +#include <cstring> // std::memset() + +namespace wsrep +{ + /** + * The idientifier class stores identifiers either in UUID + * format or in string format. The storage format is decided + * upon construction. If the given string contains a valid + * UUID, the storage format will be binary. Otherwise the + * string will be copied verbatim. If the string format is used, + * the maximum length of the identifier is limited to 16 bytes. + */ + class id + { + public: + typedef struct native_type { unsigned char buf[16]; } native_type; + /** + * Default constructor. Constructs an empty identifier. + */ + id() : data_() { std::memset(data_.buf, 0, sizeof(data_.buf)); } + + /** + * Construct from string. The input string may contain either + * valid UUID or a string with maximum 16 bytes length. + */ + id(const std::string&); + + /** + * Construct from void pointer. + */ + id (const void* data, size_t size) : data_() + { + if (size > 16) + { + throw wsrep::runtime_error("Too long identifier"); + } + std::memset(data_.buf, 0, sizeof(data_.buf)); + std::memcpy(data_.buf, data, size); + } + + bool operator<(const id& other) const + { + return (std::memcmp(data_.buf, other.data_.buf, sizeof(data_.buf)) < 0); + } + + bool operator==(const id& other) const + { + return (std::memcmp(data_.buf, other.data_.buf, sizeof(data_.buf)) == 0); + } + bool operator!=(const id& other) const + { + return !(*this == other); + } + const void* data() const { return data_.buf; } + + size_t size() const { return sizeof(data_); } + + bool is_undefined() const + { + return (*this == undefined()); + } + + static const wsrep::id& undefined() + { + return undefined_; + } + private: + static const wsrep::id undefined_; + native_type data_; + }; + + std::ostream& operator<<(std::ostream&, const wsrep::id& id); + std::istream& operator>>(std::istream&, wsrep::id& id); +} + +#endif // WSREP_ID_HPP diff --git a/wsrep-lib/include/wsrep/key.hpp b/wsrep-lib/include/wsrep/key.hpp new file mode 100644 index 00000000..85c266c6 --- /dev/null +++ b/wsrep-lib/include/wsrep/key.hpp @@ -0,0 +1,96 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_KEY_HPP +#define WSREP_KEY_HPP + +#include "exception.hpp" +#include "buffer.hpp" + +#include <iosfwd> + +namespace wsrep +{ + /** @class key + * + * Certification key type. + */ + class key + { + public: + enum type + { + shared, + reference, + update, + exclusive + }; + + key(enum type type) + : type_(type) + , key_parts_() + , key_parts_len_() + { } + + /** + * Append key part to key. + * + * @param ptr Pointer to key part data. The caller is supposed to take + * care that the pointer remains valid over the lifetime + * if the key object. + * @param len Length of the key part data. + */ + void append_key_part(const void* ptr, size_t len) + { + if (key_parts_len_ == 3) + { + throw wsrep::runtime_error("key parts exceed maximum of 3"); + } + key_parts_[key_parts_len_] = wsrep::const_buffer(ptr, len); + ++key_parts_len_; + } + + enum type type() const + { + return type_; + } + + size_t size() const + { + return key_parts_len_; + } + + const wsrep::const_buffer* key_parts() const + { + return key_parts_; + } + private: + + enum type type_; + wsrep::const_buffer key_parts_[3]; + size_t key_parts_len_; + }; + + typedef std::vector<wsrep::key> key_array; + + std::ostream& operator<<(std::ostream&, enum wsrep::key::type); + std::ostream& operator<<(std::ostream&, const wsrep::key&); +} + +#endif // WSREP_KEY_HPP diff --git a/wsrep-lib/include/wsrep/lock.hpp b/wsrep-lib/include/wsrep/lock.hpp new file mode 100644 index 00000000..6537901a --- /dev/null +++ b/wsrep-lib/include/wsrep/lock.hpp @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_LOCK_HPP +#define WSREP_LOCK_HPP + +#include <mutex> + +namespace wsrep +{ + template <class C> + using unique_lock = std::unique_lock<C>; +} + +#endif // WSREP_LOCK_HPP diff --git a/wsrep-lib/include/wsrep/logger.hpp b/wsrep-lib/include/wsrep/logger.hpp new file mode 100644 index 00000000..a15873c2 --- /dev/null +++ b/wsrep-lib/include/wsrep/logger.hpp @@ -0,0 +1,160 @@ +/* + * Copyright (C) 2018-2019 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_LOGGER_HPP +#define WSREP_LOGGER_HPP + +#include "mutex.hpp" +#include "lock.hpp" +#include "atomic.hpp" + +#include <iosfwd> +#include <sstream> + +#define WSREP_LOG_DEBUG(debug_level_fn, debug_level, msg) \ + do { \ + if (debug_level_fn >= debug_level) wsrep::log_debug() << msg; \ + } while (0) + +namespace wsrep +{ + class log + { + public: + enum level + { + debug, + info, + warning, + error, + unknown + }; + + enum debug_level + { + debug_level_server_state = 1, + debug_level_transaction, + debug_level_streaming, + debug_level_client_state + }; + + /** + * Signature for user defined logger callback function. + * + * @param pfx optional internally defined prefix for the message + * @param msg message to log + */ + typedef void (*logger_fn_type)(level l, + const char* pfx, const char* msg); + + static const char* to_c_string(enum level level) + { + switch (level) + { + case debug: return "debug"; + case info: return "info"; + case warning: return "warning"; + case error: return "error"; + case unknown: break; + }; + return "unknown"; + } + + log(enum wsrep::log::level level, const char* prefix = "L:") + : level_(level) + , prefix_(prefix) + , oss_() + { } + + ~log() + { + if (logger_fn_) + { + logger_fn_(level_, prefix_, oss_.str().c_str()); + } + else + { + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + os_ << prefix_ << oss_.str() << std::endl; + } + } + + template <typename T> + std::ostream& operator<<(const T& val) + { + return (oss_ << val); + } + + /** + * Set user defined logger callback function. + */ + static void logger_fn(logger_fn_type); + + /** + * Set debug log level from client + */ + static void debug_log_level(int debug_level); + + /** + * Get current debug log level + */ + static int debug_log_level(); + + private: + log(const log&); + log& operator=(const log&); + enum level level_; + const char* prefix_; + std::ostringstream oss_; + static wsrep::mutex& mutex_; + static std::ostream& os_; + static logger_fn_type logger_fn_; + static std::atomic_int debug_log_level_; + }; + + class log_error : public log + { + public: + log_error() + : log(error) { } + }; + + class log_warning : public log + { + public: + log_warning() + : log(warning) { } + }; + + class log_info : public log + { + public: + log_info() + : log(info) { } + }; + + class log_debug : public log + { + public: + log_debug() + : log(debug) { } + }; +} + +#endif // WSREP_LOGGER_HPP diff --git a/wsrep-lib/include/wsrep/mutex.hpp b/wsrep-lib/include/wsrep/mutex.hpp new file mode 100644 index 00000000..9b0173bf --- /dev/null +++ b/wsrep-lib/include/wsrep/mutex.hpp @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_MUTEX_HPP +#define WSREP_MUTEX_HPP + +#include "compiler.hpp" +#include "exception.hpp" + + +#include <pthread.h> + +namespace wsrep +{ + /** + * Mutex interface. + */ + class mutex + { + public: + mutex() { } + virtual ~mutex() { } + virtual void lock() = 0; + virtual void unlock() = 0; + /* Return native handle */ + virtual void* native() = 0; + private: + mutex(const mutex& other); + mutex& operator=(const mutex& other); + }; + + // Default pthread implementation + class default_mutex : public wsrep::mutex + { + public: + default_mutex() + : wsrep::mutex(), + mutex_() + { + if (pthread_mutex_init(&mutex_, 0)) + { + throw wsrep::runtime_error("mutex init failed"); + } + } + ~default_mutex() + { + if (pthread_mutex_destroy(&mutex_)) ::abort(); + } + + void lock() WSREP_OVERRIDE + { + if (pthread_mutex_lock(&mutex_)) + { + throw wsrep::runtime_error("mutex lock failed"); + } + } + + void unlock() WSREP_OVERRIDE + { + if (pthread_mutex_unlock(&mutex_)) + { + throw wsrep::runtime_error("mutex unlock failed"); + } + } + + void* native() WSREP_OVERRIDE + { + return &mutex_; + } + + private: + pthread_mutex_t mutex_; + }; +} + +#endif // WSREP_MUTEX_HPP diff --git a/wsrep-lib/include/wsrep/provider.hpp b/wsrep-lib/include/wsrep/provider.hpp new file mode 100644 index 00000000..ea7e97fe --- /dev/null +++ b/wsrep-lib/include/wsrep/provider.hpp @@ -0,0 +1,507 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_PROVIDER_HPP +#define WSREP_PROVIDER_HPP + +#include "gtid.hpp" +#include "key.hpp" +#include "buffer.hpp" +#include "client_id.hpp" +#include "transaction_id.hpp" +#include "compiler.hpp" + +#include <cassert> +#include <cstring> + +#include <string> +#include <vector> +#include <ostream> + +/** + * Empty provider magic. If none provider is passed to make_provider(), + * a dummy provider is loaded. + */ +#define WSREP_LIB_PROVIDER_NONE "none" + +namespace wsrep +{ + class server_state; + class high_priority_service; + class thread_service; + class tls_service; + class allowlist_service; + class event_service; + + class stid + { + public: + stid() + : server_id_() + , transaction_id_() + , client_id_() + { } + stid(const wsrep::id& server_id, + wsrep::transaction_id transaction_id, + wsrep::client_id client_id) + : server_id_(server_id) + , transaction_id_(transaction_id) + , client_id_(client_id) + { } + const wsrep::id& server_id() const { return server_id_; } + wsrep::transaction_id transaction_id() const + { return transaction_id_; } + wsrep::client_id client_id() const { return client_id_; } + bool operator==(const stid& other) const + { + return ( + server_id_ == other.server_id_ && + transaction_id_ == other.transaction_id_ && + client_id_ == other.client_id_ + ); + } + private: + wsrep::id server_id_; + wsrep::transaction_id transaction_id_; + wsrep::client_id client_id_; + }; + + class ws_handle + { + public: + ws_handle() + : transaction_id_() + , opaque_() + { } + ws_handle(wsrep::transaction_id id) + : transaction_id_(id) + , opaque_() + { } + ws_handle(wsrep::transaction_id id, + void* opaque) + : transaction_id_(id) + , opaque_(opaque) + { } + + wsrep::transaction_id transaction_id() const + { return transaction_id_; } + + void* opaque() const { return opaque_; } + + bool operator==(const ws_handle& other) const + { + return ( + transaction_id_ == other.transaction_id_ && + opaque_ == other.opaque_ + ); + } + private: + wsrep::transaction_id transaction_id_; + void* opaque_; + }; + + class ws_meta + { + public: + ws_meta() + : gtid_() + , stid_() + , depends_on_() + , flags_() + { } + ws_meta(const wsrep::gtid& gtid, + const wsrep::stid& stid, + wsrep::seqno depends_on, + int flags) + : gtid_(gtid) + , stid_(stid) + , depends_on_(depends_on) + , flags_(flags) + { } + ws_meta(const wsrep::stid& stid) + : gtid_() + , stid_(stid) + , depends_on_() + , flags_() + { } + const wsrep::gtid& gtid() const { return gtid_; } + const wsrep::id& group_id() const + { + return gtid_.id(); + } + + wsrep::seqno seqno() const + { + return gtid_.seqno(); + } + + const wsrep::id& server_id() const + { + return stid_.server_id(); + } + + wsrep::client_id client_id() const + { + return stid_.client_id(); + } + + wsrep::transaction_id transaction_id() const + { + return stid_.transaction_id(); + } + + bool ordered() const { return !gtid_.is_undefined(); } + + wsrep::seqno depends_on() const { return depends_on_; } + + int flags() const { return flags_; } + + bool operator==(const ws_meta& other) const + { + return ( + gtid_ == other.gtid_ && + stid_ == other.stid_ && + depends_on_ == other.depends_on_ && + flags_ == other.flags_ + ); + } + private: + wsrep::gtid gtid_; + wsrep::stid stid_; + wsrep::seqno depends_on_; + int flags_; + }; + + std::string flags_to_string(int flags); + + std::ostream& operator<<(std::ostream& os, const wsrep::ws_meta& ws_meta); + + // Abstract interface for provider implementations + class provider + { + public: + class status_variable + { + public: + status_variable(const std::string& name, + const std::string& value) + : name_(name) + , value_(value) + { } + const std::string& name() const { return name_; } + const std::string& value() const { return value_; } + private: + std::string name_; + std::string value_; + }; + + /** + * Return value enumeration + * + * @todo Convert this to struct ec, get rid of prefixes. + */ + enum status + { + /** Success */ + success, + /** Warning*/ + error_warning, + /** Transaction was not found from provider */ + error_transaction_missing, + /** Certification failed */ + error_certification_failed, + /** Transaction was BF aborted */ + error_bf_abort, + /** Transaction size exceeded */ + error_size_exceeded, + /** Connectivity to cluster lost */ + error_connection_failed, + /** Internal provider failure or provider was closed, + provider must be reinitialized */ + error_provider_failed, + /** Fatal error, server must abort */ + error_fatal, + /** Requested functionality is not implemented by the provider */ + error_not_implemented, + /** Operation is not allowed */ + error_not_allowed, + /** Unknown error code from the provider */ + error_unknown + }; + + static std::string to_string(enum status); + + struct flag + { + static const int start_transaction = (1 << 0); + static const int commit = (1 << 1); + static const int rollback = (1 << 2); + static const int isolation = (1 << 3); + static const int pa_unsafe = (1 << 4); + static const int commutative = (1 << 5); + static const int native = (1 << 6); + static const int prepare = (1 << 7); + static const int snapshot = (1 << 8); + static const int implicit_deps = (1 << 9); + }; + + /** + * Provider capabilities. + */ + struct capability + { + static const int multi_master = (1 << 0); + static const int certification = (1 << 1); + static const int parallel_applying = (1 << 2); + static const int transaction_replay = (1 << 3); + static const int isolation = (1 << 4); + static const int pause = (1 << 5); + static const int causal_reads = (1 << 6); + static const int causal_transaction = (1 << 7); + static const int incremental_writeset = (1 << 8); + static const int session_locks = (1 << 9); + static const int distributed_locks = (1 << 10); + static const int consistency_check = (1 << 11); + static const int unordered = (1 << 12); + static const int annotation = (1 << 13); + static const int preordered = (1 << 14); + static const int streaming = (1 << 15); + static const int snapshot = (1 << 16); + static const int nbo = (1 << 17); + + /** decipher capability bitmask */ + static std::string str(int); + }; + + provider(wsrep::server_state& server_state) + : server_state_(server_state) + { } + virtual ~provider() { } + // Provider state management + virtual enum status connect(const std::string& cluster_name, + const std::string& cluster_url, + const std::string& state_donor, + bool bootstrap) = 0; + virtual int disconnect() = 0; + + virtual int capabilities() const = 0; + virtual int desync() = 0; + virtual int resync() = 0; + + virtual wsrep::seqno pause() = 0; + virtual int resume() = 0; + + // Applier interface + virtual enum status run_applier(wsrep::high_priority_service* + applier_ctx) = 0; + // Write set replication + // TODO: Rename to assing_read_view() + virtual int start_transaction(wsrep::ws_handle&) = 0; + virtual enum status assign_read_view( + wsrep::ws_handle&, const wsrep::gtid*) = 0; + virtual int append_key(wsrep::ws_handle&, const wsrep::key&) = 0; + virtual enum status append_data( + wsrep::ws_handle&, const wsrep::const_buffer&) = 0; + virtual enum status + certify(wsrep::client_id, wsrep::ws_handle&, + int, + wsrep::ws_meta&) = 0; + /** + * BF abort a transaction inside provider. + * + * @param[in] bf_seqno Seqno of the aborter transaction + * @param[in] victim_txt Transaction identifier of the victim + * @param[out] victim_seqno Sequence number of the victim transaction + * or WSREP_SEQNO_UNDEFINED if the victim was not ordered + * + * @return wsrep_status_t + */ + virtual enum status bf_abort(wsrep::seqno bf_seqno, + wsrep::transaction_id victim_trx, + wsrep::seqno& victim_seqno) = 0; + virtual enum status rollback(wsrep::transaction_id) = 0; + virtual enum status commit_order_enter(const wsrep::ws_handle&, + const wsrep::ws_meta&) = 0; + virtual int commit_order_leave(const wsrep::ws_handle&, + const wsrep::ws_meta&, + const wsrep::mutable_buffer& err) = 0; + virtual int release(wsrep::ws_handle&) = 0; + + /** + * Replay a transaction. + * + * @todo Inspect if the ws_handle could be made const + * + * @return Zero in case of success, non-zero on failure. + */ + virtual enum status replay( + const wsrep::ws_handle& ws_handle, + wsrep::high_priority_service* applier_ctx) = 0; + + /** + * Enter total order isolation critical section + */ + virtual enum status enter_toi(wsrep::client_id, + const wsrep::key_array& keys, + const wsrep::const_buffer& buffer, + wsrep::ws_meta& ws_meta, + int flags) = 0; + /** + * Leave total order isolation critical section + */ + virtual enum status leave_toi(wsrep::client_id, + const wsrep::mutable_buffer& err) = 0; + + /** + * Perform a causal read on cluster. + * + * @param timeout Timeout in seconds + * + * @return Provider status indicating the result of the call. + */ + virtual std::pair<wsrep::gtid, enum status> + causal_read(int timeout) const = 0; + virtual enum status wait_for_gtid(const wsrep::gtid&, int timeout) const = 0; + /** + * Return last committed GTID. + */ + virtual wsrep::gtid last_committed_gtid() const = 0; + virtual enum status sst_sent(const wsrep::gtid&, int) = 0; + virtual enum status sst_received(const wsrep::gtid&, int) = 0; + virtual enum status enc_set_key(const wsrep::const_buffer& key) = 0; + virtual std::vector<status_variable> status() const = 0; + virtual void reset_status() = 0; + + virtual std::string options() const = 0; + virtual enum status options(const std::string&) = 0; + /** + * Get provider name. + * + * @return Provider name string. + */ + virtual std::string name() const = 0; + + /** + * Get provider version. + * + * @return Provider version string. + */ + virtual std::string version() const = 0; + + /** + * Get provider vendor. + * + * @return Provider vendor string. + */ + virtual std::string vendor() const = 0; + + /** + * Return pointer to native provider handle. + */ + virtual void* native() const = 0; + + /** + * Services argument passed to make_provider. This struct contains + * optional services which are passed to the provider. + */ + struct services + { + wsrep::thread_service* thread_service; + wsrep::tls_service* tls_service; + wsrep::allowlist_service* allowlist_service; + wsrep::event_service* event_service; + + // some GCC and clang versions don't support C++11 default + // initializers fully, so we need to use explicit constructors + // instead: + // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=88165 + // https://bugs.llvm.org/show_bug.cgi?id=36684 + services() + : thread_service() + , tls_service() + , allowlist_service() + , event_service() + { + } + + services(wsrep::thread_service* thr, + wsrep::tls_service* tls, + wsrep::allowlist_service* all, + wsrep::event_service* event) + : thread_service(thr) + , tls_service(tls) + , allowlist_service(all) + , event_service(event) + { + } + }; + /** + * Create a new provider. + * + * @param provider_spec Provider specification + * @param provider_options Initial options to provider + * @param thread_service Optional thread service implementation. + */ + static provider* make_provider(wsrep::server_state&, + const std::string& provider_spec, + const std::string& provider_options, + const wsrep::provider::services& services + = wsrep::provider::services()); + + protected: + wsrep::server_state& server_state_; + }; + + static inline bool starts_transaction(int flags) + { + return (flags & wsrep::provider::flag::start_transaction); + } + + static inline bool commits_transaction(int flags) + { + return (flags & wsrep::provider::flag::commit); + } + + static inline bool rolls_back_transaction(int flags) + { + return (flags & wsrep::provider::flag::rollback); + } + + static inline bool prepares_transaction(int flags) + { + return (flags & wsrep::provider::flag::prepare); + } + + static inline bool is_toi(int flags) + { + return (flags & wsrep::provider::flag::isolation); + } + + static inline bool is_commutative(int flags) + { + return (flags & wsrep::provider::flag::commutative); + } + + static inline bool is_native(int flags) + { + return (flags & wsrep::provider::flag::native); + } +} + +#endif // WSREP_PROVIDER_HPP diff --git a/wsrep-lib/include/wsrep/reporter.hpp b/wsrep-lib/include/wsrep/reporter.hpp new file mode 100644 index 00000000..75ed0389 --- /dev/null +++ b/wsrep-lib/include/wsrep/reporter.hpp @@ -0,0 +1,122 @@ +/* + * Copyright (C) 2021 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file info.hpp + * + * Interface to report application status to external programs + * via JSON file. + */ + +#ifndef WSREP_REPORTER_HPP +#define WSREP_REPORTER_HPP + +#include "mutex.hpp" +#include "server_state.hpp" + +#include <string> +#include <deque> + +namespace wsrep +{ + class reporter + { + public: + reporter(mutex& mutex, + const std::string& file_name, + size_t max_msg); + + virtual ~reporter(); + + void report_state(enum server_state::state state); + + /** + * Report progres in the form of a JSON string (all values integers): + * { + * "from": FROM, // from wsrep API state number + * "to": TO, // to wsrep API state number + * "total": TOTAL, // total work to do + * "done": DONE, // work already done + * "indefinite": INDEFINITE // indefinite value of work constant + * } + */ + void report_progress(const std::string& json); + + enum log_level + { + error, + warning + }; + + // undefined timestamp value + static double constexpr undefined = 0.0; + + void report_log_msg(log_level, const std::string& msg, + double timestamp = undefined); + + private: + enum substates { + s_disconnected_disconnected, + s_disconnected_initializing, + s_disconnected_initialized, + s_connected_waiting, // to become joiner + s_joining_initialized, + s_joining_sst, + s_joining_initializing, + s_joining_ist, + s_joined_syncing, + s_synced_running, + s_donor_sending, + s_disconnecting_disconnecting, + substates_max + }; + + wsrep::mutex& mutex_; + std::string const file_name_; + std::string progress_; + char* template_; + substates state_; + bool initialized_; + + typedef struct { + double tstamp; + std::string msg; + } log_msg; + + std::deque<log_msg> err_msg_; + std::deque<log_msg> warn_msg_; + size_t const max_msg_; + + static void write_log_msgs(std::ostream& os, + const std::string& label, + const std::deque<log_msg>& msgs); + static void write_log_msg(std::ostream& os, + const log_msg& msg); + + substates substate_map(enum server_state::state state); + float progress_map(float progress) const; + void write_file(double timestamp); + + // make uncopyable + reporter(const wsrep::reporter&); + void operator=(const wsrep::reporter&); + }; /* reporter */ + +} /* wsrep */ + +#endif /* WSREP_REPORTER_HPP */ diff --git a/wsrep-lib/include/wsrep/seqno.hpp b/wsrep-lib/include/wsrep/seqno.hpp new file mode 100644 index 00000000..9d8cedbc --- /dev/null +++ b/wsrep-lib/include/wsrep/seqno.hpp @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_SEQNO_HPP +#define WSREP_SEQNO_HPP + +#include <iosfwd> + +namespace wsrep +{ + /** @class seqno + * + * Sequence number type. + */ + class seqno + { + public: + typedef long long native_type; + + seqno() + : seqno_(-1) + { } + + explicit seqno(long long seqno) + : seqno_(seqno) + { } + + long long get() const + { + return seqno_; + } + + bool is_undefined() const + { + return (seqno_ == -1); + } + + bool operator<(seqno other) const + { + return (seqno_ < other.seqno_); + } + + bool operator>(seqno other) const + { + return (seqno_ > other.seqno_); + } + + bool operator<=(seqno other) const + { + return !(seqno_ > other.seqno_); + } + + bool operator>=(seqno other) const + { + return !(seqno_ < other.seqno_); + } + + bool operator==(seqno other) const + { + return (seqno_ == other.seqno_); + } + bool operator!=(seqno other) const + { + return !(seqno_ == other.seqno_); + } + seqno operator+(seqno other) const + { + return (seqno(seqno_ + other.seqno_)); + } + seqno operator+(long long other) const + { + return (*this + seqno(other)); + } + static seqno undefined() { return seqno(-1); } + + private: + native_type seqno_; + }; + + std::ostream& operator<<(std::ostream& os, wsrep::seqno seqno); +} + +#endif // WSREP_SEQNO_HPP diff --git a/wsrep-lib/include/wsrep/server_service.hpp b/wsrep-lib/include/wsrep/server_service.hpp new file mode 100644 index 00000000..2021e66d --- /dev/null +++ b/wsrep-lib/include/wsrep/server_service.hpp @@ -0,0 +1,258 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + + +/** @file server_service.hpp + * + * An abstract interface for a DBMS server services. + * The interface will define methods which will be called from + * the wsrep-lib. + */ + +#ifndef WSREP_SERVER_SERVICE_HPP +#define WSREP_SERVER_SERVICE_HPP + +#include "logger.hpp" +#include "server_state.hpp" + +#include <string> + +namespace wsrep +{ + class client_service; + class storage_service; + class high_priority_service; + class ws_meta; + class gtid; + class view; + class server_service + { + public: + + virtual ~server_service() { } + virtual wsrep::storage_service* storage_service( + wsrep::client_service&) = 0; + virtual wsrep::storage_service* storage_service( + wsrep::high_priority_service&) = 0; + virtual void release_storage_service(wsrep::storage_service*) = 0; + + /** + * Create an applier state for streaming transaction applying. + * + * @param orig_cs Reference to client service which is + * requesting a new streaming applier service + * instance. + * + * @return Pointer to streaming applier client state. + */ + virtual wsrep::high_priority_service* + streaming_applier_service(wsrep::client_service& orig_cs) = 0; + + /** + * Create an applier state for streaming transaction applying. + * + * @param orig_hps Reference to high priority service which is + * requesting a new streaming applier service + * instance. + * + * @return Pointer to streaming applier client state. + */ + virtual wsrep::high_priority_service* + streaming_applier_service(wsrep::high_priority_service& orig_hps) = 0; + + /** + * Release a client state allocated by either local_client_state() + * or streaming_applier_client_state(). + */ + virtual void release_high_priority_service( + wsrep::high_priority_service*) = 0; + + /** + * Perform a background rollback for a transaction. + */ + virtual void background_rollback(wsrep::client_state&) = 0; + + /** + * Bootstrap a DBMS state for a new cluster. + * + * This method is called by the wsrep lib after the + * new cluster is bootstrapped and the server has reached + * initialized state. From this call the DBMS should initialize + * environment for the new cluster. + */ + virtual void bootstrap() = 0; + + /** + * Log message + * + * @param level Requested logging level + * @param message Message + */ + virtual void log_message(enum wsrep::log::level level, + const char* message) = 0; + /** + * Log a dummy write set. A dummy write set is usually either + * a remotely generated write set which failed certification in + * provider and had a GTID assigned or a streaming replication + * rollback write set. If the DBMS implements logging for + * applied transactions, logging dummy write sets which do not + * commit any transaction is neeeded to keep the GTID sequence + * continuous in the server. + */ + virtual void log_dummy_write_set(wsrep::client_state& client_state, + const wsrep::ws_meta& ws_meta) = 0; + + /** + * Log a cluster view change event. The method takes + * as an argument a pointer to high priority service associated + * to an applier thread if one is available. + * + * @param high_priority_service Pointer to high priority service + * @param view Reference to view object + */ + virtual void log_view( + wsrep::high_priority_service* high_priority_service, + const wsrep::view& view) = 0; + + /** + * Recover streaming appliers from the streaming log. + * The implementation must scan through log of stored streaming + * fragments and reconstruct the streaming applier service + * objects. + * + * This is overload for calls which are done from client context, + * e.g. after SST has been received. + * + * @param client_service Reference to client service object + */ + virtual void recover_streaming_appliers( + wsrep::client_service& client_service) = 0; + + /** + * Recover streaming appliers from the streaming log. + * The implementation must scan through log of stored streaming + * fragments and reconstruct the streaming applier service + * objects. + * + * This is overload for calls which are done from high priority + * context, e.g. when handling cluster view change events. + * + * @param high_priority_service Reference to high priority service + * object. + */ + virtual void recover_streaming_appliers( + wsrep::high_priority_service& high_priority_service) = 0; + + /** + * Recover a cluster view change event. + * The method takes own node ID. + * + * @param client_service Reference to client_service + * @param own_id this node ID obtained on connection to cluster + */ + virtual wsrep::view get_view( + wsrep::client_service& client_service, + const wsrep::id& own_id) = 0; + + /** + * Get the current replication position from the server + * storage. + * + * @param client_service Reference to client_service + * + * @return Current position GTID. + */ + virtual wsrep::gtid get_position( + wsrep::client_service& client_service) = 0; + + /** + * Set the current replication position of the server + * storage. + * + * @param client_service Reference to client_service + * @param gtid Reference to position to be set + */ + virtual void set_position( + wsrep::client_service& client_service, + const wsrep::gtid& gtid) = 0; + + /** + * Log a state change event. + * + * Note that this method may be called with server_state + * mutex locked, so calling server_state public methods + * should be avoided from within this call. + * + * @param prev_state Previous state server was in + * @param current_state Current state + */ + virtual void log_state_change( + enum wsrep::server_state::state prev_state, + enum wsrep::server_state::state current_state) = 0; + + /** + * Determine if the configured SST method requires SST to be + * performed before DBMS storage engine initialization. + * + * @return True if the SST must happen before storage engine init, + * otherwise false. + */ + virtual bool sst_before_init() const = 0; + + /** + * Return SST request which provides the donor server enough + * information how to donate the snapshot. + * + * @return A string containing a SST request. + */ + virtual std::string sst_request() = 0; + + /** + * Start a SST process. + * + * @param sst_request A string containing the SST request from + * the joiner + * @param gtid A GTID denoting the current replication position + * @param bypass Boolean bypass flag. + * + * @return Zero if the SST transfer was successfully started, + * non-zero otherwise. + */ + virtual int start_sst(const std::string& sst_request, + const wsrep::gtid& gtid, + bool bypass) = 0; + + + /** + * Wait until committing transactions have completed. + * Prior calling this method the server should have been + * desynced from the group to disallow further transactions + * to start committing. + */ + virtual int wait_committing_transactions(int timeout) = 0; + + /** + * Provide a server level debug sync point for a caller. + */ + virtual void debug_sync(const char* sync_point) = 0; + + }; +} + +#endif // WSREP_SERVER_SERVICE_HPP diff --git a/wsrep-lib/include/wsrep/server_state.hpp b/wsrep-lib/include/wsrep/server_state.hpp new file mode 100644 index 00000000..0fa6420b --- /dev/null +++ b/wsrep-lib/include/wsrep/server_state.hpp @@ -0,0 +1,745 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file server_state.hpp + * + * Server State Abstraction + * ========================== + * + * This file defines an interface for WSREP Server State. + * The Server State will encapsulate server identification, + * server state and server capabilities. The class also + * defines an interface for manipulating server state, applying + * of remote transaction write sets, processing SST requests, + * creating local client connections for local storage access + * operations. + * + * Concepts + * ======== + * + * State Snapshot Transfer + * ----------------------- + * + * Depending on SST type (physical or logical), the server storage + * engine initialization must be done before or after SST happens. + * In case of physical SST method (typically rsync, filesystem snapshot) + * the SST happens before the storage engine is initialized, in case + * of logical backup typically after the storage engine initialization. + * + * Rollback Mode + * ------------- + * + * When High Priority Transaction (HPT) write set is applied, it + * may be required that the HPT Brute Force Aborts (BFA) locally + * executing transaction. As HPT must be able to apply all its + * write sets without interruption, the locally executing transaction + * must yield immediately, otherwise a transaction processing + * may stop or even deadlock. Depending on DBMS implementation, + * the local transaction may need to be rolled back immediately + * (synchronous mode) or the rollback may happen later on + * (asynchronous mode). The Server Context implementation + * which derives from Server Context base class must provide + * the base class for the rollback mode which server operates on. + * + * ### Synchronous + * + * If the DBMS server implementation does not allow asynchronous rollback, + * the victim transaction must be rolled back immediately in order to + * allow transaction processing to proceed. Depending on DBMS process model, + * there may be either background thread which processes the rollback + * or the rollback can be done by the HTP applier. + * + * ### Asynchronous + * + * In asynchronous mode the BFA victim transaction is just marked + * to be aborted or in case of fully optimistic concurrency control, + * the conflict is detected at commit. + * + * + * # Return value conventions + * + * The calls which are proxies to corresponding provider functionality + * will return wsrep::provider::status enum as a result. Otherwise + * the return value is generally zero on success, non zero on failure. + */ + +#ifndef WSREP_SERVER_STATE_HPP +#define WSREP_SERVER_STATE_HPP + +#include "mutex.hpp" +#include "condition_variable.hpp" +#include "id.hpp" +#include "view.hpp" +#include "transaction_id.hpp" +#include "logger.hpp" +#include "provider.hpp" +#include "compiler.hpp" +#include "xid.hpp" + +#include <deque> +#include <vector> +#include <string> +#include <map> + +/** + * Magic string to tell provider to engage into trivial (empty) + * state transfer. No data will be passed, but the node shall be + * considered joined. + */ +#define WSREP_LIB_SST_TRIVIAL "trivial" + +namespace wsrep +{ + // Forward declarations + class ws_handle; + class ws_meta; + class client_state; + class transaction; + class const_buffer; + class server_service; + class client_service; + class encryption_service; + + /** @class Server Context + * + * + */ + class server_state + { + public: + /** + * Server state enumeration. + * + * @todo Fix UML generation + * + * Server state diagram if initialization happens before SST. + * + * [*] --> disconnected + * disconnected --> initializing + * initializing --> initialized + * initialized --> connected + * connected --> joiner + * joiner --> joined + * joined --> synced + * synced --> donor + * donor --> joined + * + * Server state diagram if SST happens before initialization. + * + * [*] --> disconnected + * disconnected --> connected + * connected --> joiner + * joiner --> initializing + * initializing --> initialized + * initialized --> joined + * joined --> synced + * synced --> donor + * donor --> joined + */ + enum state + { + /** Server is in disconnected state. */ + s_disconnected, + /** Server is initializing */ + s_initializing, + /** Server has been initialized */ + s_initialized, + /** Server is connected to the cluster */ + s_connected, + /** Server is receiving SST */ + s_joiner, + /** Server has received SST successfully but has not synced + with rest of the cluster yet. */ + s_joined, + /** Server is donating state snapshot transfer */ + s_donor, + /** Server has synced with the cluster */ + s_synced, + /** Server is disconnecting from group */ + s_disconnecting + }; + + static const int n_states_ = s_disconnecting + 1; + + /** + * Rollback Mode enumeration + */ + enum rollback_mode + { + /** Asynchronous rollback mode */ + rm_async, + /** Synchronous rollback mode */ + rm_sync + }; + + virtual ~server_state(); + + wsrep::encryption_service* encryption_service() + { return encryption_service_; } + + wsrep::server_service& server_service() { return server_service_; } + + /** + * Return human readable server name. + * + * @return Human readable server name string. + */ + const std::string& name() const { return name_; } + + /** + * Return Server identifier. + * + * @return Server identifier. + */ + const wsrep::id& id() const { return id_; } + + const std::string& incoming_address() const + { return incoming_address_; } + /** + * Return server group communication address. + * + * @return Return server group communication address. + */ + const std::string& address() const { return address_; } + + /** + * Return working directory + * + * @return String containing path to working directory. + */ + const std::string& working_dir() const { return working_dir_; } + + /** + * Return initial position for server. + */ + const wsrep::gtid& initial_position() const + { return initial_position_; } + /** + * Return maximum protocol version. + */ + int max_protocol_version() const { return max_protocol_version_;} + + /** + * Get the rollback mode which server is operating in. + * + * @return Rollback mode. + */ + enum rollback_mode rollback_mode() const { return rollback_mode_; } + + /** + * Registers a streaming client. + */ + void start_streaming_client(wsrep::client_state* client_state); + + void convert_streaming_client_to_applier( + wsrep::client_state* client_state); + void stop_streaming_client(wsrep::client_state* client_state); + + void start_streaming_applier( + const wsrep::id&, + const wsrep::transaction_id&, + wsrep::high_priority_service*); + + void stop_streaming_applier( + const wsrep::id&, const wsrep::transaction_id&); + + /** + * Find a streaming applier matching server and transaction ids + */ + wsrep::high_priority_service* find_streaming_applier( + const wsrep::id&, + const wsrep::transaction_id&) const; + + /** + * Find a streaming applier matching xid + */ + wsrep::high_priority_service* find_streaming_applier( + const wsrep::xid& xid) const; + + /** + * Queue a rollback fragment the transaction with given id + */ + void queue_rollback_event(const wsrep::transaction_id& id); + + /** + * Send rollback fragments for previously queued events via + * queue_rollback_event() + */ + enum wsrep::provider::status send_pending_rollback_events(); + + /** + * Load WSRep provider. + * + * @param provider WSRep provider library to be loaded. + * @param provider_options Provider specific options string + * to be passed for provider during initialization. + * @param services Application defined services passed to + * the provider. + * + * @return Zero on success, non-zero on error. + */ + int load_provider(const std::string& provider, + const std::string& provider_options, + const wsrep::provider::services& services + = wsrep::provider::services()); + + void unload_provider(); + + bool is_provider_loaded() const { return provider_ != 0; } + + /** + * Return reference to provider. + * + * @return Reference to provider + * + * @throw wsrep::runtime_error if provider has not been loaded + * + * @todo This should not be virtual. However, currently there + * is no mechanism for tests and integrations to provide + * their own provider implementations, so this is kept virtual + * for time being. + */ + virtual wsrep::provider& provider() const + { + if (provider_ == 0) + { + throw wsrep::runtime_error("provider not loaded"); + } + return *provider_; + } + + /** + * Initialize connection to cluster. + * + * @param cluster_name A string containing the name of the cluster + * @param cluster_address Cluster address string + * @param state_donor String containing a list of desired donors + * @param bootstrap Bootstrap option + * + * @return Zero in case of success, non-zero on error. + */ + int connect(const std::string& cluster_name, + const std::string& cluster_address, + const std::string& state_donor, + bool bootstrap); + + int disconnect(); + + /** + * A method which will be called when the server + * has been joined to the cluster + */ + void on_connect(const wsrep::view& view); + + /** + * A method which will be called when a view + * notification event has been delivered by the + * provider. + * + * @params view wsrep::view object which holds the new view + * information. + */ + void on_view(const wsrep::view& view, + wsrep::high_priority_service*); + + /** + * A method which will be called when the server + * has been synchronized with the cluster. + * + * This will have a side effect of changing the Server Context + * state to s_synced. + */ + void on_sync(); + + /** + * Wait until server reaches given state. + */ + void wait_until_state(enum state state) const + { + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + wait_until_state(lock, state); + } + + /** + * Return GTID at the position when server connected to + * the cluster. + */ + wsrep::gtid connected_gtid() const { return connected_gtid_; } + + /** + * Return current view + */ + const wsrep::view& current_view() const { return current_view_; } + + /** + * Wait until all the write sets up to given GTID have been + * committed. + * + * @return Zero on success, non-zero on failure. + */ + enum wsrep::provider::status + wait_for_gtid(const wsrep::gtid&, int timeout) const; + + /** + * Set encryption key + * + * @param key Encryption key + * + * @return Zero on success, non-zero on failure. + */ + int set_encryption_key(std::vector<unsigned char>& key); + + /** + * Return encryption key. + */ + const std::vector<unsigned char>& get_encryption_key() const + { return encryption_key_; } + + /** + * Perform a causal read in the cluster. After the call returns, + * all the causally preceding write sets have been committed + * or the error is returned. + * + * This operation may require communication with other processes + * in the DBMS cluster, so it may be relatively heavy operation. + * Method wait_for_gtid() should be used whenever possible. + * + * @param timeout Timeout in seconds + * + * @return Pair of GTID and result status from provider. + */ + + std::pair<wsrep::gtid, enum wsrep::provider::status> + causal_read(int timeout) const; + + /** + * Desynchronize the server. + * + * If the server state is synced, this call will desynchronize + * the server from the cluster. + */ + int desync() + { + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + return desync(lock); + } + + /** + * Resynchronize the server. + */ + void resync() + { + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + resync(lock); + } + + wsrep::seqno pause(); + + wsrep::seqno pause_seqno() const { return pause_seqno_; } + + void resume(); + + /** + * Desync and pause the provider on one go. Will return + * pause seqno if successful. In case of failure, + * undefined seqno will be returned. + */ + wsrep::seqno desync_and_pause(); + + /** + * Resume and resync the provider on one go. Prior this + * call the provider must have been both desynced and paused, + * by either desync_and_pause() or separate calls to desync() + * and pause(). + */ + void resume_and_resync(); + + /** + * True if server has issued and active desync and pause in one go, + * false otherwise. + */ + bool desynced_on_pause() const { return desynced_on_pause_; } + + /** + * Prepares server state for SST. + * + * @return String containing a SST request + */ + std::string prepare_for_sst(); + + /** + * Start a state snapshot transfer. + * + * @param sst_requets SST request string + * @param gtid Current GTID + * @param bypass Bypass flag + * + * @return Zero in case of success, non-zero otherwise + */ + int start_sst(const std::string& sst_request, + const wsrep::gtid& gtid, + bool bypass); + + /** + * + */ + void sst_sent(const wsrep::gtid& gtid, int error); + + /** + * This method must be called by the joiner after the SST + * transfer has been received. If the DBMS state has not been + * initialized, the call will shift the state to initializing + * and will wait until the initialization is complete. + * + * @param client_service + * @param error code of the SST operation + */ + void sst_received(wsrep::client_service& cs, int error); + + /** + * This method must be called after the server initialization + * has been completed. The call has a side effect of changing + * the Server Context state to s_initialized. + */ + void initialized(); + + /** + * Return true if the server has been initialized. + */ + bool is_initialized() const + { + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + return init_initialized_; + } + + /** + * This method will be called by the provider when + * a remote write set is being applied. It is the responsibility + * of the caller to set up transaction context and data properly. + * + * @todo Make this private, allow calls for provider implementations + * only. + * @param high_priority_service High priority applier service. + * @param transaction Transaction context. + * @param data Write set data + * + * @return Zero on success, non-zero on failure. + */ + int on_apply(wsrep::high_priority_service& high_priority_service, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data); + + enum state state() const + { + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + return state(lock); + } + + enum state state(wsrep::unique_lock<wsrep::mutex>& + lock WSREP_UNUSED) const + { + assert(lock.owns_lock()); + return state_; + } + + /** + * Get provider status variables. + */ + std::vector<wsrep::provider::status_variable> status() const; + + /** + * Set server wide wsrep debug logging level. + * + * Log levels are + * - 0 - No debug logging. + * - 1..n - Debug logging with increasing verbosity. + */ + void debug_log_level(int level) + { + wsrep::log::debug_log_level(level); + } + + wsrep::mutex& mutex() { return mutex_; } + + protected: + /** Server state constructor + * + * @param mutex Mutex provided by the DBMS implementation. + * @param name Human Readable Server Name. + * @param id Server Identifier String, UUID or some unique + * identifier. + * @param address Server address in form of IPv4 address, IPv6 address + * or hostname. + * @param working_dir Working directory for replication specific + * data files. + * @param rollback_mode Rollback mode which server operates on. + */ + server_state(wsrep::mutex& mutex, + wsrep::condition_variable& cond, + wsrep::server_service& server_service, + wsrep::encryption_service* encryption_service, + const std::string& name, + const std::string& incoming_address, + const std::string& address, + const std::string& working_dir, + const wsrep::gtid& initial_position, + int max_protocol_version, + enum rollback_mode rollback_mode) + : mutex_(mutex) + , cond_(cond) + , server_service_(server_service) + , encryption_service_(encryption_service) + , state_(s_disconnected) + , state_hist_() + , state_waiters_(n_states_) + , bootstrap_() + , initial_position_(initial_position) + , init_initialized_() + , init_synced_() + , sst_gtid_() + , desync_count_() + , desynced_on_pause_() + , pause_count_() + , pause_seqno_() + , streaming_clients_() + , streaming_appliers_() + , streaming_appliers_recovered_() + , provider_() + , name_(name) + , id_(wsrep::id::undefined()) + , incoming_address_(incoming_address) + , address_(address) + , working_dir_(working_dir) + , encryption_key_() + , max_protocol_version_(max_protocol_version) + , rollback_mode_(rollback_mode) + , connected_gtid_() + , previous_primary_view_() + , current_view_() + , rollback_event_queue_() + { } + + private: + + server_state(const server_state&); + server_state& operator=(const server_state&); + + int desync(wsrep::unique_lock<wsrep::mutex>&); + void resync(wsrep::unique_lock<wsrep::mutex>&); + void state(wsrep::unique_lock<wsrep::mutex>&, enum state); + void wait_until_state(wsrep::unique_lock<wsrep::mutex>&, enum state) const; + // Interrupt all threads which are waiting for state + void interrupt_state_waiters(wsrep::unique_lock<wsrep::mutex>&); + + // Recover streaming appliers if not already recoverd + template <class C> + void recover_streaming_appliers_if_not_recovered( + wsrep::unique_lock<wsrep::mutex>&, C&); + + // Close SR transcations whose origin is outside of current + // cluster view. + void close_orphaned_sr_transactions( + wsrep::unique_lock<wsrep::mutex>&, + wsrep::high_priority_service&); + + // Close transactions when handling disconnect from the group. + void close_transactions_at_disconnect(wsrep::high_priority_service&); + + // Handle primary view + void on_primary_view(const wsrep::view&, + wsrep::high_priority_service*); + // Handle non-primary view + void on_non_primary_view(const wsrep::view&, + wsrep::high_priority_service*); + // Common actions on final view + void go_final(wsrep::unique_lock<wsrep::mutex>&, + const wsrep::view&, wsrep::high_priority_service*); + + // Send rollback fragments for all transactions in + // rollback_event_queue_ + enum wsrep::provider::status send_pending_rollback_events( + wsrep::unique_lock<wsrep::mutex>& lock); + + wsrep::mutex& mutex_; + wsrep::condition_variable& cond_; + wsrep::server_service& server_service_; + wsrep::encryption_service* encryption_service_; + enum state state_; + std::vector<enum state> state_hist_; + mutable std::vector<int> state_waiters_; + bool bootstrap_; + const wsrep::gtid initial_position_; + bool init_initialized_; + bool init_synced_; + wsrep::gtid sst_gtid_; + size_t desync_count_; + // Boolean to denote if desync was succesfull when desyncing + // and pausing the provider on one go. + bool desynced_on_pause_; + size_t pause_count_; + wsrep::seqno pause_seqno_; + typedef std::map<wsrep::client_id, wsrep::client_state*> + streaming_clients_map; + streaming_clients_map streaming_clients_; + typedef std::map<std::pair<wsrep::id, wsrep::transaction_id>, + wsrep::high_priority_service*> streaming_appliers_map; + streaming_appliers_map streaming_appliers_; + bool streaming_appliers_recovered_; + wsrep::provider* provider_; + std::string name_; + wsrep::id id_; + std::string incoming_address_; + std::string address_; + std::string working_dir_; + std::vector<unsigned char> encryption_key_; + int max_protocol_version_; + enum rollback_mode rollback_mode_; + wsrep::gtid connected_gtid_; + wsrep::view previous_primary_view_; + wsrep::view current_view_; + std::deque<wsrep::transaction_id> rollback_event_queue_; + }; + + static inline const char* to_c_string( + enum wsrep::server_state::state state) + { + switch (state) + { + case wsrep::server_state::s_disconnected: return "disconnected"; + case wsrep::server_state::s_initializing: return "initializing"; + case wsrep::server_state::s_initialized: return "initialized"; + case wsrep::server_state::s_connected: return "connected"; + case wsrep::server_state::s_joiner: return "joiner"; + case wsrep::server_state::s_joined: return "joined"; + case wsrep::server_state::s_donor: return "donor"; + case wsrep::server_state::s_synced: return "synced"; + case wsrep::server_state::s_disconnecting: return "disconnecting"; + } + return "unknown"; + } + + static inline std::string to_string(enum wsrep::server_state::state state) + { + return (to_c_string(state)); + } + +} + +#endif // WSREP_SERVER_STATE_HPP diff --git a/wsrep-lib/include/wsrep/sr_key_set.hpp b/wsrep-lib/include/wsrep/sr_key_set.hpp new file mode 100644 index 00000000..abdb1e04 --- /dev/null +++ b/wsrep-lib/include/wsrep/sr_key_set.hpp @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_SR_KEY_SET_HPP +#define WSREP_SR_KEY_SET_HPP + +#include <set> +#include <map> + +namespace wsrep +{ + class sr_key_set + { + public: + typedef std::set<std::string> leaf_type; + typedef std::map<std::string, leaf_type > branch_type; + sr_key_set() + : root_() + { } + + void insert(const wsrep::key& key) + { + assert(key.size() >= 2); + if (key.size() < 2) + { + throw wsrep::runtime_error("Invalid key size"); + } + + root_[std::string( + static_cast<const char*>(key.key_parts()[0].data()), + key.key_parts()[0].size())].insert( + std::string(static_cast<const char*>(key.key_parts()[1].data()), + key.key_parts()[1].size())); + } + + const branch_type& root() const { return root_; } + void clear() { root_.clear(); } + bool empty() const { return root_.empty(); } + private: + branch_type root_; + }; +} + +#endif // WSREP_KEY_SET_HPP diff --git a/wsrep-lib/include/wsrep/storage_service.hpp b/wsrep-lib/include/wsrep/storage_service.hpp new file mode 100644 index 00000000..e68548b7 --- /dev/null +++ b/wsrep-lib/include/wsrep/storage_service.hpp @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file storage_service.hpp + * + * Abstract interface which defines required access to DBMS storage + * service. The service is used for storing streaming replication + * write set fragments into stable storage. The interface is used + * from locally processing transaction context only. Corresponding + * operations for high priority processing can be found from + * wsrep::high_priority_service interface. + */ +#ifndef WSREP_STORAGE_SERVICE_HPP +#define WSREP_STORAGE_SERVICE_HPP + +#include "transaction_id.hpp" +#include "id.hpp" +#include "buffer.hpp" +#include "xid.hpp" + +namespace wsrep +{ + // Forward declarations + class ws_handle; + class ws_meta; + class transaction; + + /** + * Storage service abstract interface. + */ + class storage_service + { + public: + virtual ~storage_service() { } + /** + * Start a new transaction for storage access. + * + * @param[out] ws_handle Write set handle for a new transaction + * + * @return Zero in case of success, non-zero on error. + */ + virtual int start_transaction(const wsrep::ws_handle&) = 0; + + virtual void adopt_transaction(const wsrep::transaction&) = 0; + /** + * Append fragment into stable storage. + */ + virtual int append_fragment(const wsrep::id& server_id, + wsrep::transaction_id client_id, + int flags, + const wsrep::const_buffer& data, + const wsrep::xid& xid) = 0; + + /** + * Update fragment meta data after certification process. + */ + virtual int update_fragment_meta(const wsrep::ws_meta&) = 0; + + /** + * Remove fragments from storage. The storage service must have + * adopted a transaction prior this call. + */ + virtual int remove_fragments() = 0; + + /** + * Commit the transaction. + */ + virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0; + + /** + * Roll back the transaction. + */ + virtual int rollback(const wsrep::ws_handle&, + const wsrep::ws_meta&) = 0; + + + virtual void store_globals() = 0; + virtual void reset_globals() = 0; + }; +} + +#endif // WSREP_STORAGE_SERVICE_HPP diff --git a/wsrep-lib/include/wsrep/streaming_context.hpp b/wsrep-lib/include/wsrep/streaming_context.hpp new file mode 100644 index 00000000..9b205c5b --- /dev/null +++ b/wsrep-lib/include/wsrep/streaming_context.hpp @@ -0,0 +1,210 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_STREAMING_CONTEXT_HPP +#define WSREP_STREAMING_CONTEXT_HPP + +#include "compiler.hpp" +#include "logger.hpp" +#include "seqno.hpp" +#include "transaction_id.hpp" + +#include <vector> + +namespace wsrep +{ + class streaming_context + { + public: + enum fragment_unit + { + bytes, + row, + statement + }; + + streaming_context() + : fragments_certified_() + , fragments_() + , rollback_replicated_for_() + , fragment_unit_() + , fragment_size_() + , unit_counter_() + , log_position_() + { } + + /** + * Set streaming parameters. + * + * Calling this method has a side effect of resetting unit + * counter. + * + * @param fragment_unit Desired fragment unit. + * @param fragment_size Desired fragment size. + */ + void params(enum fragment_unit fragment_unit, size_t fragment_size) + { + if (fragment_size) + { + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_streaming, + "Enabling streaming: " + << fragment_unit << " " << fragment_size); + } + else + { + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_streaming, + "Disabling streaming"); + } + fragment_unit_ = fragment_unit; + fragment_size_ = fragment_size; + reset_unit_counter(); + } + + void enable(enum fragment_unit fragment_unit, size_t fragment_size) + { + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_streaming, + "Enabling streaming: " + << fragment_unit << " " << fragment_size); + assert(fragment_size > 0); + fragment_unit_ = fragment_unit; + fragment_size_ = fragment_size; + } + + enum fragment_unit fragment_unit() const { return fragment_unit_; } + + size_t fragment_size() const { return fragment_size_; } + + void disable() + { + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_streaming, + "Disabling streaming"); + fragment_size_ = 0; + } + + void certified() + { + ++fragments_certified_; + } + + size_t fragments_certified() const + { + return fragments_certified_; + } + + void stored(wsrep::seqno seqno) + { + check_fragment_seqno(seqno); + fragments_.push_back(seqno); + } + + size_t fragments_stored() const + { + return fragments_.size(); + } + + void applied(wsrep::seqno seqno) + { + check_fragment_seqno(seqno); + ++fragments_certified_; + fragments_.push_back(seqno); + } + + void rolled_back(wsrep::transaction_id id) + { + assert(rollback_replicated_for_ == wsrep::transaction_id::undefined()); + rollback_replicated_for_ = id; + } + + bool rolled_back() const + { + return (rollback_replicated_for_ != + wsrep::transaction_id::undefined()); + } + + size_t unit_counter() const + { + return unit_counter_; + } + + void set_unit_counter(size_t count) + { + unit_counter_ = count; + } + + void increment_unit_counter(size_t inc) + { + unit_counter_ += inc; + } + + void reset_unit_counter() + { + unit_counter_ = 0; + } + + size_t log_position() const + { + return log_position_; + } + + void set_log_position(size_t position) + { + log_position_ = position; + } + + const std::vector<wsrep::seqno>& fragments() const + { + return fragments_; + } + + bool fragment_size_exceeded() const + { + return unit_counter_ >= fragment_size_; + } + + void cleanup() + { + fragments_certified_ = 0; + fragments_.clear(); + rollback_replicated_for_ = wsrep::transaction_id::undefined(); + unit_counter_ = 0; + log_position_ = 0; + } + private: + + void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED) + { + assert(seqno.is_undefined() == false); + assert(fragments_.empty() || fragments_.back() < seqno); + } + + size_t fragments_certified_; + std::vector<wsrep::seqno> fragments_; + wsrep::transaction_id rollback_replicated_for_; + enum fragment_unit fragment_unit_; + size_t fragment_size_; + size_t unit_counter_; + size_t log_position_; + }; +} + +#endif // WSREP_STREAMING_CONTEXT_HPP diff --git a/wsrep-lib/include/wsrep/thread.hpp b/wsrep-lib/include/wsrep/thread.hpp new file mode 100644 index 00000000..99b8cff5 --- /dev/null +++ b/wsrep-lib/include/wsrep/thread.hpp @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2018-2019 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#include <pthread.h> +#include <iosfwd> + +namespace wsrep +{ + class thread + { + public: + class id + { + public: + id() : thread_() { } + explicit id(pthread_t thread) : thread_(thread) { } + private: + friend bool operator==(thread::id left, thread::id right) + { + return (pthread_equal(left.thread_, right.thread_)); + } + friend std::ostream& operator<<(std::ostream&, const id&); + pthread_t thread_; + }; + + thread() + : id_(pthread_self()) + { } + private: + id id_; + }; + + namespace this_thread + { + static inline thread::id get_id() { return thread::id(pthread_self()); } + } + + std::ostream& operator<<(std::ostream&, const thread::id&); +}; diff --git a/wsrep-lib/include/wsrep/thread_service.hpp b/wsrep-lib/include/wsrep/thread_service.hpp new file mode 100644 index 00000000..702d71c1 --- /dev/null +++ b/wsrep-lib/include/wsrep/thread_service.hpp @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2019 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + + +/** @file thread_service.hpp + * + * Service interface for threads and synchronization primitives. + * The purpose of this interface is to provider provider implementations + * means to integrate with application thread implementation. + * + * Interface is designed to resemble POSIX threads, mutexes and + * condition variables. + */ + +#ifndef WSREP_THREAD_SERVICE_HPP +#define WSREP_THREAD_SERVICE_HPP + +#include <cstddef> // size_t +#include "compiler.hpp" + +struct timespec; +struct sched_param; + +namespace wsrep +{ + + class thread_service + { + public: + + thread_service() : exit() { } + virtual ~thread_service() { } + struct thread_key { }; + struct thread { }; + struct mutex_key { }; + struct mutex { }; + struct cond_key { }; + struct cond { }; + + /** + * Method will be called before library side thread + * service initialization. + */ + virtual int before_init() = 0; + + /** + * Method will be called after library side thread service + * has been initialized. + */ + virtual int after_init() = 0; + + /* Thread */ + virtual const thread_key* create_thread_key(const char* name) WSREP_NOEXCEPT + = 0; + virtual int create_thread(const thread_key*, thread**, + void* (*fn)(void*), void*) WSREP_NOEXCEPT + = 0; + virtual int detach(thread*) WSREP_NOEXCEPT = 0; + virtual int equal(thread*, thread*) WSREP_NOEXCEPT = 0; + + /* + * This unlike others is a function pointer to + * avoid having C++ methods on thread exit codepath. + */ + WSREP_NORETURN void (*exit)(thread*, void* retval); + virtual int join(thread*, void** retval) WSREP_NOEXCEPT = 0; + virtual thread* self() WSREP_NOEXCEPT = 0; + virtual int setschedparam(thread*, int, + const struct sched_param*) WSREP_NOEXCEPT + = 0; + virtual int getschedparam(thread*, int*, struct sched_param*) WSREP_NOEXCEPT + = 0; + + /* Mutex */ + virtual const mutex_key* create_mutex_key(const char* name) WSREP_NOEXCEPT + = 0; + virtual mutex* init_mutex(const mutex_key*, void*, size_t) WSREP_NOEXCEPT = 0; + virtual int destroy(mutex*) WSREP_NOEXCEPT = 0; + virtual int lock(mutex*) WSREP_NOEXCEPT = 0; + virtual int trylock(mutex*) WSREP_NOEXCEPT = 0; + virtual int unlock(mutex*) WSREP_NOEXCEPT = 0; + + /* Condition variable */ + virtual const cond_key* create_cond_key(const char* name) WSREP_NOEXCEPT = 0; + virtual cond* init_cond(const cond_key*, void*, size_t) WSREP_NOEXCEPT = 0; + virtual int destroy(cond*) WSREP_NOEXCEPT = 0; + virtual int wait(cond*, mutex*) WSREP_NOEXCEPT = 0; + virtual int timedwait(cond*, mutex*, const struct timespec*) WSREP_NOEXCEPT + = 0; + virtual int signal(cond*) WSREP_NOEXCEPT = 0; + virtual int broadcast(cond*) WSREP_NOEXCEPT = 0; + }; +} // namespace wsrep + +#endif // WSREP_THREAD_SERVICE_HPP diff --git a/wsrep-lib/include/wsrep/tls_service.hpp b/wsrep-lib/include/wsrep/tls_service.hpp new file mode 100644 index 00000000..07d20642 --- /dev/null +++ b/wsrep-lib/include/wsrep/tls_service.hpp @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2020 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + + +/** @file tls_service.hpp + * + * Service interface for interacting with DBMS provided + * TLS and encryption facilities. + */ + +#ifndef WSREP_TLS_SERVICE_HPP +#define WSREP_TLS_SERVICE_HPP + +#include "compiler.hpp" + +#include <sys/types.h> // ssize_t + +namespace wsrep +{ + + /* Type tags for TLS context and TLS stream. */ + struct tls_context { }; + struct tls_stream { }; + + /** @class tls_service + * + * TLS service interface. This provides an interface corresponding + * to wsrep-API TLS service. For details see wsrep-API/wsrep_tls_service.h + */ + class tls_service + { + public: + enum status + { + success = 0, + want_read, + want_write, + eof, + error + }; + + struct op_result + { + /** Status code of the operation of negative system error number. */ + ssize_t status; + /** Bytes transferred from/to given buffer during the operation. */ + size_t bytes_transferred; + }; + + virtual ~tls_service() { } + /** + * @return Zero on success, system error code on failure. + */ + virtual tls_stream* create_tls_stream(int fd) WSREP_NOEXCEPT = 0; + virtual void destroy(tls_stream*) WSREP_NOEXCEPT = 0; + + virtual int get_error_number(const tls_stream*) const WSREP_NOEXCEPT = 0; + virtual const void* get_error_category(const tls_stream*) const WSREP_NOEXCEPT = 0; + virtual const char* get_error_message(const tls_stream*, + int value, const void* category) + const WSREP_NOEXCEPT = 0; + /** + * @return Status enum. + */ + virtual status client_handshake(tls_stream*) WSREP_NOEXCEPT = 0; + + /** + * @return Status enum or negative error code. + */ + virtual status server_handshake(tls_stream*) WSREP_NOEXCEPT = 0; + + /** + * Read at most max_count bytes into buf. + */ + virtual op_result read(tls_stream*, + void* buf, size_t max_count) WSREP_NOEXCEPT = 0; + + /** + * Write at most count bytes from buf. + */ + virtual op_result write(tls_stream*, + const void* buf, size_t count) WSREP_NOEXCEPT = 0; + + /** + * Shutdown TLS stream. + */ + virtual status shutdown(tls_stream*) WSREP_NOEXCEPT = 0; + }; +} + +#endif // WSREP_TLS_SERVICE_HPP diff --git a/wsrep-lib/include/wsrep/transaction.hpp b/wsrep-lib/include/wsrep/transaction.hpp new file mode 100644 index 00000000..76835fd9 --- /dev/null +++ b/wsrep-lib/include/wsrep/transaction.hpp @@ -0,0 +1,313 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file transaction.hpp */ +#ifndef WSREP_TRANSACTION_HPP +#define WSREP_TRANSACTION_HPP + +#include "provider.hpp" +#include "server_state.hpp" +#include "transaction_id.hpp" +#include "streaming_context.hpp" +#include "lock.hpp" +#include "sr_key_set.hpp" +#include "buffer.hpp" +#include "client_service.hpp" +#include "xid.hpp" + +#include <cassert> +#include <vector> + +namespace wsrep +{ + class client_service; + class client_state; + class key; + class const_buffer; + + class transaction + { + public: + enum state + { + s_executing, + s_preparing, + s_prepared, + s_certifying, + s_committing, + s_ordered_commit, + s_committed, + s_cert_failed, + s_must_abort, + s_aborting, + s_aborted, + s_must_replay, + s_replaying + }; + static const int n_states = s_replaying + 1; + enum state state() const + { return state_; } + + transaction(wsrep::client_state& client_state); + ~transaction(); + // Accessors + wsrep::transaction_id id() const + { return id_; } + + const wsrep::id& server_id() const + { return server_id_; } + + bool active() const + { return (id_ != wsrep::transaction_id::undefined()); } + + + void state(wsrep::unique_lock<wsrep::mutex>&, enum state); + + // Return true if the certification of the last + // fragment succeeded + bool certified() const { return certified_; } + + wsrep::seqno seqno() const + { + return ws_meta_.seqno(); + } + // Return true if the last fragment was ordered by the + // provider + bool ordered() const + { return (ws_meta_.seqno().is_undefined() == false); } + + /** + * Return true if any fragments have been successfully certified + * for the transaction. + */ + bool is_streaming() const + { + return (streaming_context_.fragments_certified() > 0); + } + + /** + * Return number of fragments certified for current statement. + * + * This counts fragments which have been successfully certified + * since the construction of object or last after_statement() + * call. + * + * @return Number of fragments certified for current statement. + */ + size_t fragments_certified_for_statement() const + { + return fragments_certified_for_statement_; + } + + /** + * Return true if transaction has not generated any changes. + */ + bool is_empty() const + { + return sr_keys_.empty(); + } + + bool is_xa() const + { + return !xid_.is_null(); + } + + void assign_xid(const wsrep::xid& xid); + + const wsrep::xid& xid() const + { + return xid_; + } + + int restore_to_prepared_state(const wsrep::xid& xid); + + int commit_or_rollback_by_xid(const wsrep::xid& xid, bool commit); + + void xa_detach(); + + int xa_replay(wsrep::unique_lock<wsrep::mutex>&); + + bool pa_unsafe() const { return (flags() & wsrep::provider::flag::pa_unsafe); } + void pa_unsafe(bool pa_unsafe) { + if (pa_unsafe) { + flags(flags() | wsrep::provider::flag::pa_unsafe); + } else { + flags(flags() & ~wsrep::provider::flag::pa_unsafe); + } + } + bool implicit_deps() const { return implicit_deps_; } + void implicit_deps(bool implicit) { implicit_deps_ = implicit; } + + int start_transaction(const wsrep::transaction_id& id); + + int start_transaction(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta); + + int next_fragment(const wsrep::ws_meta& ws_meta); + + void adopt(const transaction& transaction); + void fragment_applied(wsrep::seqno seqno); + + int prepare_for_ordering(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + bool is_commit); + + int assign_read_view(const wsrep::gtid* gtid); + + int append_key(const wsrep::key&); + + int append_data(const wsrep::const_buffer&); + + int after_row(); + + int before_prepare(wsrep::unique_lock<wsrep::mutex>&); + + int after_prepare(wsrep::unique_lock<wsrep::mutex>&); + + int before_commit(); + + int ordered_commit(); + + int after_commit(); + + int before_rollback(); + + int after_rollback(); + + int before_statement(); + + int after_statement(); + + void after_command_must_abort(wsrep::unique_lock<wsrep::mutex>&); + + void after_applying(); + + bool bf_abort(wsrep::unique_lock<wsrep::mutex>& lock, + wsrep::seqno bf_seqno); + bool total_order_bf_abort(wsrep::unique_lock<wsrep::mutex>&, + wsrep::seqno bf_seqno); + + void clone_for_replay(const wsrep::transaction& other); + + bool bf_aborted() const + { + return (bf_abort_client_state_ != 0); + } + + bool bf_aborted_in_total_order() const + { + return bf_aborted_in_total_order_; + } + + int flags() const + { + return flags_; + } + + // wsrep::mutex& mutex(); + wsrep::ws_handle& ws_handle() { return ws_handle_; } + const wsrep::ws_handle& ws_handle() const { return ws_handle_; } + const wsrep::ws_meta& ws_meta() const { return ws_meta_; } + const wsrep::streaming_context& streaming_context() const + { return streaming_context_; } + wsrep::streaming_context& streaming_context() + { return streaming_context_; } + void adopt_apply_error(wsrep::mutable_buffer& buf) + { + apply_error_buf_ = std::move(buf); + } + const wsrep::mutable_buffer& apply_error() const + { return apply_error_buf_; } + private: + transaction(const transaction&); + transaction operator=(const transaction&); + + wsrep::provider& provider(); + void flags(int flags) { flags_ = flags; } + // Return true if the transaction must abort, is aborting, + // or has been aborted, or has been interrupted by DBMS + // as indicated by client_service::interrupted() call. + // The call will adjust transaction state and set client_state + // error status accordingly. + bool abort_or_interrupt(wsrep::unique_lock<wsrep::mutex>&); + int streaming_step(wsrep::unique_lock<wsrep::mutex>&, bool force = false); + int certify_fragment(wsrep::unique_lock<wsrep::mutex>&); + int certify_commit(wsrep::unique_lock<wsrep::mutex>&); + int append_sr_keys_for_commit(); + int release_commit_order(wsrep::unique_lock<wsrep::mutex>&); + void streaming_rollback(wsrep::unique_lock<wsrep::mutex>&); + int replay(wsrep::unique_lock<wsrep::mutex>&); + void xa_replay_common(wsrep::unique_lock<wsrep::mutex>&); + int xa_replay_commit(wsrep::unique_lock<wsrep::mutex>&); + void cleanup(); + void debug_log_state(const char*) const; + void debug_log_key_append(const wsrep::key& key) const; + + wsrep::server_service& server_service_; + wsrep::client_service& client_service_; + wsrep::client_state& client_state_; + wsrep::id server_id_; + wsrep::transaction_id id_; + enum state state_; + std::vector<enum state> state_hist_; + enum state bf_abort_state_; + enum wsrep::provider::status bf_abort_provider_status_; + int bf_abort_client_state_; + bool bf_aborted_in_total_order_; + wsrep::ws_handle ws_handle_; + wsrep::ws_meta ws_meta_; + int flags_; + bool implicit_deps_; + bool certified_; + size_t fragments_certified_for_statement_; + wsrep::streaming_context streaming_context_; + wsrep::sr_key_set sr_keys_; + wsrep::mutable_buffer apply_error_buf_; + wsrep::xid xid_; + bool streaming_rollback_in_progress_; + }; + + static inline const char* to_c_string(enum wsrep::transaction::state state) + { + switch (state) + { + case wsrep::transaction::s_executing: return "executing"; + case wsrep::transaction::s_preparing: return "preparing"; + case wsrep::transaction::s_prepared: return "prepared"; + case wsrep::transaction::s_certifying: return "certifying"; + case wsrep::transaction::s_committing: return "committing"; + case wsrep::transaction::s_ordered_commit: return "ordered_commit"; + case wsrep::transaction::s_committed: return "committed"; + case wsrep::transaction::s_cert_failed: return "cert_failed"; + case wsrep::transaction::s_must_abort: return "must_abort"; + case wsrep::transaction::s_aborting: return "aborting"; + case wsrep::transaction::s_aborted: return "aborted"; + case wsrep::transaction::s_must_replay: return "must_replay"; + case wsrep::transaction::s_replaying: return "replaying"; + } + return "unknown"; + } + static inline std::string to_string(enum wsrep::transaction::state state) + { + return to_c_string(state); + } + +} + +#endif // WSREP_TRANSACTION_HPP diff --git a/wsrep-lib/include/wsrep/transaction_id.hpp b/wsrep-lib/include/wsrep/transaction_id.hpp new file mode 100644 index 00000000..f5fb5dbc --- /dev/null +++ b/wsrep-lib/include/wsrep/transaction_id.hpp @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_TRANSACTION_ID_HPP +#define WSREP_TRANSACTION_ID_HPP + +#include <iostream> +#include <limits> + +namespace wsrep +{ + class transaction_id + { + public: + typedef unsigned long long type; + + + transaction_id() + : id_(std::numeric_limits<type>::max()) + { } + + template <typename I> + explicit transaction_id(I id) + : id_(static_cast<type>(id)) + { } + type get() const { return id_; } + static transaction_id undefined() { return transaction_id(-1); } + bool is_undefined() const { return (id_ == type(-1)); } + bool operator<(const transaction_id& other) const + { + return (id_ < other.id_); + } + bool operator==(const transaction_id& other) const + { return (id_ == other.id_); } + bool operator!=(const transaction_id& other) const + { return (id_ != other.id_); } + private: + type id_; + }; + + static inline std::ostream& operator<<(std::ostream& os, transaction_id id) + { + return (os << id.get()); + } +} + +#endif // WSREP_TRANSACTION_ID_HPP diff --git a/wsrep-lib/include/wsrep/version.hpp b/wsrep-lib/include/wsrep/version.hpp new file mode 100644 index 00000000..94312cf0 --- /dev/null +++ b/wsrep-lib/include/wsrep/version.hpp @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2019 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_VERSION_HPP +#define WSREP_VERSION_HPP + +/** @file version.hpp + * + * Wsrep library version numbers. The versioning follows Semantic + * Versioning 2.0.0 (https://semver.org/). + */ + +/** + * Major version number. + */ +#define WSREP_LIB_VERSION_MAJOR 1 +/** + * Minor version number. + */ +#define WSREP_LIB_VERSION_MINOR 0 +/** + * Patch version number. + */ +#define WSREP_LIB_VERSION_PATCH 0 + +// Range of supported wsrep-API versions. + +/** + * Lowest supported wsrep-API version. + */ +#define WSREP_LIB_MIN_API_VERSION 26 +/** + * Highest supported wsrep-API version. + */ +#define WSREP_LIB_MAX_API_VERSION 26 + +#endif // WSREP_VERSION_HPP diff --git a/wsrep-lib/include/wsrep/view.hpp b/wsrep-lib/include/wsrep/view.hpp new file mode 100644 index 00000000..d17c27f1 --- /dev/null +++ b/wsrep-lib/include/wsrep/view.hpp @@ -0,0 +1,171 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file view.hpp + * + * + */ + + +#ifndef WSREP_VIEW_HPP +#define WSREP_VIEW_HPP + +#include "id.hpp" +#include "seqno.hpp" +#include "gtid.hpp" +#include <vector> +#include <iostream> + +namespace wsrep +{ + class view + { + public: + enum status + { + primary, + non_primary, + disconnected + }; + class member + { + public: + member(const wsrep::id& id, + const std::string& name, + const std::string& incoming) + : id_(id) + , name_(name) + , incoming_(incoming) + { + } + const wsrep::id& id() const { return id_; } + const std::string& name() const { return name_; } + const std::string& incoming() const { return incoming_; } + private: + wsrep::id id_; + std::string name_; + std::string incoming_; + }; + + view() + : state_id_() + , view_seqno_() + , status_(disconnected) + , capabilities_() + , own_index_(-1) + , protocol_version_(0) + , members_() + { } + view(const wsrep::gtid& state_id, + wsrep::seqno view_seqno, + enum wsrep::view::status status, + int capabilities, + ssize_t own_index, + int protocol_version, + const std::vector<wsrep::view::member>& members) + : state_id_(state_id) + , view_seqno_(view_seqno) + , status_(status) + , capabilities_(capabilities) + , own_index_(own_index) + , protocol_version_(protocol_version) + , members_(members) + { } + + wsrep::gtid state_id() const + { return state_id_; } + + wsrep::seqno view_seqno() const + { return view_seqno_; } + + wsrep::view::status status() const + { return status_; } + + int capabilities() const + { return capabilities_; } + + ssize_t own_index() const + { return own_index_; } + + /** + * Return true if the two views have the same membership + */ + bool equal_membership(const wsrep::view& other) const; + + int protocol_version() const + { return protocol_version_; } + + const std::vector<member>& members() const + { return members_; } + + /** + * Return true if the view is final + */ + bool final() const + { + return (members_.empty() && own_index_ == -1); + } + + /** + * Return member index in the view. + * + * @return Member index if found, -1 if member is not present + * in the view. + */ + int member_index(const wsrep::id& member_id) const; + + /** + * Return true if id is member of this view + */ + bool is_member(const wsrep::id& id) const + { + return member_index(id) != -1; + } + + void print(std::ostream& os) const; + + private: + wsrep::gtid state_id_; + wsrep::seqno view_seqno_; + enum wsrep::view::status status_; + int capabilities_; + ssize_t own_index_; + int protocol_version_; + std::vector<wsrep::view::member> members_; + }; + + static inline + std::ostream& operator<<(std::ostream& os, const wsrep::view& v) + { + v.print(os); return os; + } + + static inline const char* to_c_string(enum wsrep::view::status status) + { + switch(status) + { + case wsrep::view::primary: return "primary"; + case wsrep::view::non_primary: return "non-primary"; + case wsrep::view::disconnected: return "disconnected"; + } + return "invalid status"; + } +} + +#endif // WSREP_VIEW diff --git a/wsrep-lib/include/wsrep/xid.hpp b/wsrep-lib/include/wsrep/xid.hpp new file mode 100644 index 00000000..c500d63b --- /dev/null +++ b/wsrep-lib/include/wsrep/xid.hpp @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2019 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_XID_HPP +#define WSREP_XID_HPP + +#include <iosfwd> +#include "buffer.hpp" +#include "exception.hpp" + +namespace wsrep +{ + class xid + { + public: + xid() + : format_id_(-1) + , gtrid_len_(0) + , bqual_len_(0) + , data_() + { } + + xid(long format_id, long gtrid_len, + long bqual_len, const char* data) + : format_id_(format_id) + , gtrid_len_(gtrid_len) + , bqual_len_(bqual_len) + , data_() + { + if (gtrid_len_ > 64 || bqual_len_ > 64) + { + throw wsrep::runtime_error("maximum wsrep::xid size exceeded"); + } + const long len = gtrid_len_ + bqual_len_; + if (len > 0) + { + data_.push_back(data, data + len); + } + } + + xid(const xid& xid) + : format_id_(xid.format_id_) + , gtrid_len_(xid.gtrid_len_) + , bqual_len_(xid.bqual_len_) + , data_(xid.data_) + { } + + bool is_null() const + { + return format_id_ == -1; + } + + void clear() + { + format_id_ = -1; + gtrid_len_ = 0; + bqual_len_ = 0; + data_.clear(); + } + + xid& operator= (const xid& other) + { + format_id_ = other.format_id_; + gtrid_len_ = other.gtrid_len_; + bqual_len_ = other.bqual_len_; + data_ = other.data_; + return *this; + } + + bool operator==(const xid& other) const + { + if (format_id_ != other.format_id_ || + gtrid_len_ != other.gtrid_len_ || + bqual_len_ != other.bqual_len_ || + data_.size() != other.data_.size()) + { + return false; + } + return data_ == other.data_; + } + + friend std::string to_string(const wsrep::xid& xid); + friend std::ostream& operator<<(std::ostream& os, const wsrep::xid& xid); + protected: + long format_id_; + long gtrid_len_; + long bqual_len_; + mutable_buffer data_; + }; + + std::string to_string(const wsrep::xid& xid); + std::ostream& operator<<(std::ostream& os, const wsrep::xid& xid); +} + +#endif // WSREP_XID_HPP |