summaryrefslogtreecommitdiffstats
path: root/wsrep-lib/src
diff options
context:
space:
mode:
Diffstat (limited to 'wsrep-lib/src')
-rw-r--r--wsrep-lib/src/CMakeLists.txt31
-rw-r--r--wsrep-lib/src/allowlist_service_v1.cpp119
-rw-r--r--wsrep-lib/src/allowlist_service_v1.hpp55
-rw-r--r--wsrep-lib/src/client_state.cpp1096
-rw-r--r--wsrep-lib/src/config_service_v1.cpp176
-rw-r--r--wsrep-lib/src/config_service_v1.hpp30
-rw-r--r--wsrep-lib/src/event_service_v1.cpp104
-rw-r--r--wsrep-lib/src/event_service_v1.hpp54
-rw-r--r--wsrep-lib/src/exception.cpp22
-rw-r--r--wsrep-lib/src/gtid.cpp90
-rw-r--r--wsrep-lib/src/id.cpp81
-rw-r--r--wsrep-lib/src/key.cpp65
-rw-r--r--wsrep-lib/src/logger.cpp43
-rw-r--r--wsrep-lib/src/provider.cpp171
-rw-r--r--wsrep-lib/src/provider_options.cpp168
-rw-r--r--wsrep-lib/src/reporter.cpp370
-rw-r--r--wsrep-lib/src/seqno.cpp26
-rw-r--r--wsrep-lib/src/server_state.cpp1654
-rw-r--r--wsrep-lib/src/service_helpers.hpp106
-rw-r--r--wsrep-lib/src/sr_key_set.cpp43
-rw-r--r--wsrep-lib/src/streaming_context.cpp95
-rw-r--r--wsrep-lib/src/thread.cpp30
-rw-r--r--wsrep-lib/src/thread_service_v1.cpp285
-rw-r--r--wsrep-lib/src/thread_service_v1.hpp55
-rw-r--r--wsrep-lib/src/tls_service_v1.cpp232
-rw-r--r--wsrep-lib/src/tls_service_v1.hpp54
-rw-r--r--wsrep-lib/src/transaction.cpp2171
-rw-r--r--wsrep-lib/src/uuid.cpp74
-rw-r--r--wsrep-lib/src/uuid.hpp79
-rw-r--r--wsrep-lib/src/view.cpp71
-rw-r--r--wsrep-lib/src/wsrep_provider_v26.cpp1183
-rw-r--r--wsrep-lib/src/wsrep_provider_v26.hpp116
-rw-r--r--wsrep-lib/src/xid.cpp31
33 files changed, 8980 insertions, 0 deletions
diff --git a/wsrep-lib/src/CMakeLists.txt b/wsrep-lib/src/CMakeLists.txt
new file mode 100644
index 00000000..85524cea
--- /dev/null
+++ b/wsrep-lib/src/CMakeLists.txt
@@ -0,0 +1,31 @@
+#
+# Copyright (C) 2018 Codership Oy <info@codership.com>
+#
+
+add_library(wsrep-lib
+ allowlist_service_v1.cpp
+ client_state.cpp
+ config_service_v1.cpp
+ event_service_v1.cpp
+ exception.cpp
+ gtid.cpp
+ id.cpp
+ key.cpp
+ logger.cpp
+ provider.cpp
+ provider_options.cpp
+ reporter.cpp
+ seqno.cpp
+ server_state.cpp
+ sr_key_set.cpp
+ streaming_context.cpp
+ thread.cpp
+ thread_service_v1.cpp
+ tls_service_v1.cpp
+ transaction.cpp
+ uuid.cpp
+ view.cpp
+ wsrep_provider_v26.cpp
+ xid.cpp
+ )
+target_link_libraries(wsrep-lib wsrep_api_v26 pthread ${WSREP_LIB_LIBDL})
diff --git a/wsrep-lib/src/allowlist_service_v1.cpp b/wsrep-lib/src/allowlist_service_v1.cpp
new file mode 100644
index 00000000..9e7bf2ae
--- /dev/null
+++ b/wsrep-lib/src/allowlist_service_v1.cpp
@@ -0,0 +1,119 @@
+/*
+ * Copyright (C) 2021 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "allowlist_service_v1.hpp"
+#include "service_helpers.hpp"
+
+#include "wsrep/allowlist_service.hpp"
+#include "wsrep/buffer.hpp"
+#include "v26/wsrep_allowlist_service.h"
+
+#include <cassert>
+#include <dlfcn.h>
+#include <cerrno>
+
+namespace wsrep_allowlist_service_v1
+{
+ // Pointer to allowlist service implementation provided by
+ // the application.
+ static wsrep::allowlist_service* allowlist_service_impl{ 0 };
+ static std::atomic<size_t> use_count;
+
+ enum wsrep::allowlist_service::allowlist_key allowlist_key_from_native(wsrep_allowlist_key_t key)
+ {
+ switch (key)
+ {
+ case WSREP_ALLOWLIST_KEY_IP: return wsrep::allowlist_service::allowlist_key::allowlist_ip;
+ case WSREP_ALLOWLIST_KEY_SSL: return wsrep::allowlist_service::allowlist_key::allowlist_ssl;
+ default: throw wsrep::runtime_error("Unknown allowlist key");
+ }
+ }
+
+ //
+ // allowlist service callbacks
+ //
+
+ wsrep_status_t allowlist_cb(
+ wsrep_allowlist_context_t*,
+ wsrep_allowlist_key_t key,
+ const wsrep_buf_t* value
+ )
+ {
+ assert(allowlist_service_impl);
+ wsrep::const_buffer allowlist_value(value->ptr, value->len);
+ if (allowlist_service_impl->allowlist_cb(allowlist_key_from_native(key),
+ allowlist_value))
+ {
+ return WSREP_OK;
+ }
+ return WSREP_NOT_ALLOWED;
+ }
+
+ static wsrep_allowlist_service_v1_t allowlist_service_callbacks
+ = { allowlist_cb,
+ 0 };
+}
+
+int wsrep::allowlist_service_v1_probe(void* dlh)
+{
+ typedef int (*init_fn)(wsrep_allowlist_service_v1_t*);
+ typedef void (*deinit_fn)();
+ if (wsrep_impl::service_probe<init_fn>(
+ dlh, WSREP_ALLOWLIST_SERVICE_INIT_FUNC_V1, "allowlist service v1") ||
+ wsrep_impl::service_probe<deinit_fn>(
+ dlh, WSREP_ALLOWLIST_SERVICE_DEINIT_FUNC_V1, "allowlist service v1"))
+ {
+ wsrep::log_warning() << "Provider does not support allowlist service v1";
+ return 1;
+ }
+ return 0;
+}
+
+int wsrep::allowlist_service_v1_init(void* dlh,
+ wsrep::allowlist_service* allowlist_service)
+{
+ if (not (dlh && allowlist_service)) return EINVAL;
+ typedef int (*init_fn)(wsrep_allowlist_service_v1_t*);
+ wsrep_allowlist_service_v1::allowlist_service_impl = allowlist_service;
+ int ret(0);
+ if ((ret = wsrep_impl::service_init<init_fn>(
+ dlh, WSREP_ALLOWLIST_SERVICE_INIT_FUNC_V1,
+ &wsrep_allowlist_service_v1::allowlist_service_callbacks,
+ "allowlist service v1")))
+ {
+ wsrep_allowlist_service_v1::allowlist_service_impl = 0;
+ }
+ else
+ {
+ ++wsrep_allowlist_service_v1::use_count;
+ }
+ return ret;
+}
+
+void wsrep::allowlist_service_v1_deinit(void* dlh)
+{
+ typedef int (*deinit_fn)();
+ wsrep_impl::service_deinit<deinit_fn>(
+ dlh, WSREP_ALLOWLIST_SERVICE_DEINIT_FUNC_V1, "allowlist service v1");
+ --wsrep_allowlist_service_v1::use_count;
+ if (wsrep_allowlist_service_v1::use_count == 0)
+ {
+ wsrep_allowlist_service_v1::allowlist_service_impl = 0;
+ }
+}
diff --git a/wsrep-lib/src/allowlist_service_v1.hpp b/wsrep-lib/src/allowlist_service_v1.hpp
new file mode 100644
index 00000000..eb73bd9c
--- /dev/null
+++ b/wsrep-lib/src/allowlist_service_v1.hpp
@@ -0,0 +1,55 @@
+/*
+ * Copyright (C) 2021 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_ALLOWLIST_SERVICE_V1_HPP
+#define WSREP_ALLOWLIST_SERVICE_V1_HPP
+
+namespace wsrep
+{
+ class allowlist_service;
+ /**
+ * Probe allowlist_service_v1 support in loaded library.
+ *
+ * @param dlh Handle returned by dlopen().
+ *
+ * @return Zero on success, non-zero system error code on failure.
+ */
+ int allowlist_service_v1_probe(void *dlh);
+
+ /**
+ * Initialize the allowlist service.
+ *
+ * @param dlh Handle returned by dlopen().
+ * @param allowlist_service Pointer to wsrep::allowlist_service implementation.
+ *
+ * @return Zero on success, non-zero system error code on failure.
+ */
+ int allowlist_service_v1_init(void* dlh,
+ wsrep::allowlist_service* allowlist_service);
+
+ /**
+ * Deinitialize the allowlist service.
+ *
+ * @params dlh Handler returned by dlopen().
+ */
+ void allowlist_service_v1_deinit(void* dlh);
+
+}
+
+#endif // WSREP_allowlist_SERVICE_V1_HPP
diff --git a/wsrep-lib/src/client_state.cpp b/wsrep-lib/src/client_state.cpp
new file mode 100644
index 00000000..99c4222f
--- /dev/null
+++ b/wsrep-lib/src/client_state.cpp
@@ -0,0 +1,1096 @@
+/*
+ * Copyright (C) 2018-2019 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/client_state.hpp"
+#include "wsrep/compiler.hpp"
+#include "wsrep/logger.hpp"
+#include "wsrep/server_state.hpp"
+#include "wsrep/server_service.hpp"
+#include "wsrep/client_service.hpp"
+
+#include <unistd.h> // usleep()
+#include <cassert>
+#include <sstream>
+#include <iostream>
+
+wsrep::client_state::~client_state()
+{
+ assert(transaction_.active() == false);
+}
+
+wsrep::provider& wsrep::client_state::provider() const
+{
+ return server_state_.provider();
+}
+
+void wsrep::client_state::open(wsrep::client_id id)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(state_ == s_none);
+ assert(keep_command_error_ == false);
+ debug_log_state("open: enter");
+ owning_thread_id_ = wsrep::this_thread::get_id();
+ rollbacker_active_ = false;
+ sync_wait_gtid_ = wsrep::gtid::undefined();
+ last_written_gtid_ = wsrep::gtid::undefined();
+ state(lock, s_idle);
+ id_ = id;
+ debug_log_state("open: leave");
+}
+
+void wsrep::client_state::close()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ debug_log_state("close: enter");
+
+ while (mode_ == m_local && is_rollbacker_active()) {
+ cond_.wait(lock);
+ }
+ do_acquire_ownership(lock);
+
+ state(lock, s_quitting);
+ keep_command_error_ = false;
+ lock.unlock();
+ if (transaction_.active() &&
+ (mode_ != m_local ||
+ transaction_.state() != wsrep::transaction::s_prepared))
+ {
+ client_service_.bf_rollback();
+ transaction_.after_statement();
+ }
+ if (mode_ == m_local)
+ {
+ disable_streaming();
+ }
+ debug_log_state("close: leave");
+}
+
+void wsrep::client_state::cleanup()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ cleanup(lock);
+}
+
+void wsrep::client_state::cleanup(wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ debug_log_state("cleanup: enter");
+ state(lock, s_none);
+ debug_log_state("cleanup: leave");
+}
+
+void wsrep::client_state::override_error(enum wsrep::client_error error,
+ enum wsrep::provider::status status)
+{
+ assert(wsrep::this_thread::get_id() == owning_thread_id_);
+ // Error state should not be cleared with success code without
+ // explicit reset_error() call.
+ assert(current_error_ == wsrep::e_success ||
+ error != wsrep::e_success);
+ current_error_ = error;
+ current_error_status_ = status;
+}
+
+int wsrep::client_state::before_command(bool keep_command_error)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ debug_log_state("before_command: enter");
+ // If the state is s_exec, the processing thread has already grabbed
+ // control with wait_rollback_complete_and_acquire_ownership()
+ if (state_ != s_exec)
+ {
+ assert(state_ == s_idle);
+ do_wait_rollback_complete_and_acquire_ownership(lock);
+ assert(state_ == s_exec);
+ client_service_.store_globals();
+ }
+ else
+ {
+ // This thread must have acquired control by other means,
+ // for example via wait_rollback_complete_and_acquire_ownership().
+ assert(wsrep::this_thread::get_id() == owning_thread_id_);
+ }
+
+ keep_command_error_ = keep_command_error;
+
+ // If the transaction is active, it must be either executing,
+ // aborted as rolled back by rollbacker, or must_abort if the
+ // client thread gained control via
+ // wait_rollback_complete_and_acquire_ownership()
+ // just before BF abort happened.
+ assert(transaction_.active() == false ||
+ (transaction_.state() == wsrep::transaction::s_executing ||
+ transaction_.state() == wsrep::transaction::s_prepared ||
+ transaction_.state() == wsrep::transaction::s_aborted ||
+ transaction_.state() == wsrep::transaction::s_must_abort));
+
+ if (transaction_.active())
+ {
+ if (transaction_.state() == wsrep::transaction::s_must_abort ||
+ transaction_.state() == wsrep::transaction::s_aborted)
+ {
+ if (transaction_.is_xa())
+ {
+ // Client will rollback explicitly, return error.
+ debug_log_state("before_command: error");
+ return 1;
+ }
+
+ override_error(wsrep::e_deadlock_error);
+ if (transaction_.state() == wsrep::transaction::s_must_abort)
+ {
+ lock.unlock();
+ client_service_.bf_rollback();
+ lock.lock();
+
+ }
+
+ if (keep_command_error_)
+ {
+ // Keep the error for the next command
+ debug_log_state("before_command: keep error");
+ return 0;
+ }
+
+ // Clean up the transaction and return error.
+ (void)transaction_.after_statement(lock);
+
+ assert(transaction_.active() == false);
+ assert(transaction_.state() == wsrep::transaction::s_aborted);
+ assert(current_error() != wsrep::e_success);
+
+ debug_log_state("before_command: error");
+ return 1;
+ }
+ }
+ debug_log_state("before_command: success");
+ return 0;
+}
+
+void wsrep::client_state::after_command_before_result()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ debug_log_state("after_command_before_result: enter");
+ assert(state() == s_exec);
+ if (transaction_.active() &&
+ transaction_.state() == wsrep::transaction::s_must_abort)
+ {
+ transaction_.after_command_must_abort(lock);
+ // If keep current error is set, the result will be propagated
+ // back to client with some future command, so keep the transaction
+ // open here so that error handling can happen in before_command()
+ // hook.
+ if (not keep_command_error_)
+ {
+ (void)transaction_.after_statement(lock);
+ }
+
+ assert(transaction_.state() == wsrep::transaction::s_aborted);
+ }
+ state(lock, s_result);
+ debug_log_state("after_command_before_result: leave");
+}
+
+void wsrep::client_state::after_command_after_result()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ debug_log_state("after_command_after_result_enter");
+ assert(state() == s_result);
+ assert(transaction_.state() != wsrep::transaction::s_aborting);
+ if (transaction_.active() &&
+ transaction_.state() == wsrep::transaction::s_must_abort)
+ {
+ transaction_.after_command_must_abort(lock);
+ assert(transaction_.state() == wsrep::transaction::s_aborted);
+ }
+ else if (transaction_.active() == false && not keep_command_error_)
+ {
+ current_error_ = wsrep::e_success;
+ current_error_status_ = wsrep::provider::success;
+ }
+ keep_command_error_ = false;
+ sync_wait_gtid_ = wsrep::gtid::undefined();
+ state(lock, s_idle);
+ debug_log_state("after_command_after_result: leave");
+}
+
+int wsrep::client_state::before_statement()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ debug_log_state("before_statement: enter");
+#if 0
+ /**
+ * @todo It might be beneficial to implement timed wait for
+ * server synced state.
+ */
+ if (allow_dirty_reads_ == false &&
+ server_state_.state() != wsrep::server_state::s_synced)
+ {
+ return 1;
+ }
+#endif // 0
+
+ if (transaction_.active() &&
+ transaction_.state() == wsrep::transaction::s_must_abort)
+ {
+ // Rollback and cleanup will happen in after_command_before_result()
+ debug_log_state("before_statement_error");
+ return 1;
+ }
+ debug_log_state("before_statement: success");
+ return 0;
+}
+
+int wsrep::client_state::after_statement()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ debug_log_state("after_statement: enter");
+ assert(state() == s_exec);
+ assert(mode() == m_local);
+ (void)transaction_.after_statement(lock);
+ if (current_error() == wsrep::e_deadlock_error)
+ {
+ if (mode_ == m_local)
+ {
+ debug_log_state("after_statement: may_retry");
+ }
+ else
+ {
+ debug_log_state("after_statement: error");
+ }
+ return 1;
+ }
+ debug_log_state("after_statement: success");
+ return 0;
+}
+
+void wsrep::client_state::after_applying()
+{
+ assert(mode_ == m_high_priority);
+ transaction_.after_applying();
+}
+
+int wsrep::client_state::start_transaction(const wsrep::transaction_id& id)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(state_ == s_exec);
+ return transaction_.start_transaction(id);
+}
+
+int wsrep::client_state::assign_read_view(const wsrep::gtid* const gtid)
+{
+ assert(mode_ == m_local);
+ assert(state_ == s_exec);
+ return transaction_.assign_read_view(gtid);
+}
+
+int wsrep::client_state::append_key(const wsrep::key& key)
+{
+ assert(mode_ == m_local);
+ assert(state_ == s_exec);
+ return transaction_.append_key(key);
+}
+
+int wsrep::client_state::append_keys(const wsrep::key_array& keys)
+{
+ assert(mode_ == m_local || mode_ == m_toi);
+ assert(state_ == s_exec);
+ for (auto i(keys.begin()); i != keys.end(); ++i)
+ {
+ if (transaction_.append_key(*i))
+ {
+ return 1;
+ }
+ }
+ return 0;
+}
+
+int wsrep::client_state::append_data(const wsrep::const_buffer& data)
+{
+ assert(mode_ == m_local);
+ assert(state_ == s_exec);
+ return transaction_.append_data(data);
+}
+
+int wsrep::client_state::after_row()
+{
+ assert(mode_ == m_local);
+ assert(state_ == s_exec);
+ return (transaction_.streaming_context().fragment_size()
+ ? transaction_.after_row()
+ : 0);
+}
+
+void wsrep::client_state::fragment_applied(wsrep::seqno seqno)
+{
+ assert(mode_ == m_high_priority);
+ transaction_.fragment_applied(seqno);
+}
+
+int wsrep::client_state::prepare_for_ordering(const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta,
+ bool is_commit)
+{
+ assert(state_ == s_exec);
+ return transaction_.prepare_for_ordering(ws_handle, ws_meta, is_commit);
+}
+
+int wsrep::client_state::start_transaction(const wsrep::ws_handle& wsh,
+ const wsrep::ws_meta& meta)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(owning_thread_id_ == wsrep::this_thread::get_id());
+ assert(mode_ == m_high_priority);
+ return transaction_.start_transaction(wsh, meta);
+}
+
+int wsrep::client_state::next_fragment(const wsrep::ws_meta& meta)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(mode_ == m_high_priority);
+ return transaction_.next_fragment(meta);
+}
+
+int wsrep::client_state::before_prepare()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(owning_thread_id_ == wsrep::this_thread::get_id());
+ assert(state_ == s_exec);
+ return transaction_.before_prepare(lock);
+}
+
+int wsrep::client_state::after_prepare()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(owning_thread_id_ == wsrep::this_thread::get_id());
+ assert(state_ == s_exec);
+ return transaction_.after_prepare(lock);
+}
+
+int wsrep::client_state::before_commit()
+{
+ assert(owning_thread_id_ == wsrep::this_thread::get_id());
+ assert(state_ == s_exec || mode_ == m_local);
+ return transaction_.before_commit();
+}
+
+int wsrep::client_state::ordered_commit()
+{
+ assert(owning_thread_id_ == wsrep::this_thread::get_id());
+ assert(state_ == s_exec || mode_ == m_local);
+ return transaction_.ordered_commit();
+}
+
+int wsrep::client_state::after_commit()
+{
+ assert(owning_thread_id_ == wsrep::this_thread::get_id());
+ assert(state_ == s_exec || mode_ == m_local);
+ return transaction_.after_commit();
+}
+
+int wsrep::client_state::before_rollback()
+{
+ assert(owning_thread_id_ == wsrep::this_thread::get_id());
+ assert(state_ == s_idle || state_ == s_exec || state_ == s_result
+ || state_ == s_quitting);
+ return transaction_.before_rollback();
+}
+
+int wsrep::client_state::after_rollback()
+{
+ assert(owning_thread_id_ == wsrep::this_thread::get_id());
+ assert(state_ == s_idle || state_ == s_exec || state_ == s_result
+ || state_ == s_quitting);
+ return transaction_.after_rollback();
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// Rollbacker synchronization //
+//////////////////////////////////////////////////////////////////////////////
+
+void wsrep::client_state::sync_rollback_complete()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ debug_log_state("sync_rollback_complete: enter");
+ assert(state_ == s_idle && mode_ == m_local &&
+ transaction_.state() == wsrep::transaction::s_aborted);
+ set_rollbacker_active(false);
+ cond_.notify_all();
+ debug_log_state("sync_rollback_complete: leave");
+}
+
+void wsrep::client_state::wait_rollback_complete_and_acquire_ownership()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ debug_log_state("wait_rollback_complete_and_acquire_ownership: enter");
+ if (state_ == s_idle)
+ {
+ do_wait_rollback_complete_and_acquire_ownership(lock);
+ }
+ assert(state_ == s_exec);
+ debug_log_state("wait_rollback_complete_and_acquire_ownership: leave");
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// Streaming //
+//////////////////////////////////////////////////////////////////////////////
+
+void wsrep::client_state::streaming_params(
+ enum wsrep::streaming_context::fragment_unit fragment_unit,
+ size_t fragment_size)
+{
+ assert(mode_ == m_local);
+ transaction_.streaming_context().params(fragment_unit, fragment_size);
+}
+
+int wsrep::client_state::enable_streaming(
+ enum wsrep::streaming_context::fragment_unit
+ fragment_unit,
+ size_t fragment_size)
+{
+ assert(mode_ == m_local);
+ if (transaction_.is_streaming() &&
+ transaction_.streaming_context().fragment_unit() !=
+ fragment_unit)
+ {
+ wsrep::log_error()
+ << "Changing fragment unit for active streaming transaction "
+ << "not allowed";
+ return 1;
+ }
+ transaction_.streaming_context().enable(fragment_unit, fragment_size);
+ return 0;
+}
+
+void wsrep::client_state::disable_streaming()
+{
+ assert(mode_ == m_local);
+ assert(state_ == s_exec || state_ == s_quitting);
+ transaction_.streaming_context().disable();
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// XA //
+//////////////////////////////////////////////////////////////////////////////
+
+void wsrep::client_state::xa_detach()
+{
+ assert(mode_ == m_local);
+ assert(state_ == s_none || state_ == s_exec || state_ == s_quitting);
+ transaction_.xa_detach();
+}
+
+void wsrep::client_state::xa_replay()
+{
+ assert(mode_ == m_local);
+ assert(state_ == s_idle);
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ transaction_.xa_replay(lock);
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// BF //
+//////////////////////////////////////////////////////////////////////////////
+
+int wsrep::client_state::bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
+ wsrep::seqno bf_seqno)
+{
+ assert(lock.owns_lock());
+ assert(mode_ == m_local || transaction_.is_streaming());
+ auto ret = transaction_.bf_abort(lock, bf_seqno);
+ assert(lock.owns_lock());
+ return ret;
+}
+
+int wsrep::client_state::bf_abort(wsrep::seqno bf_seqno)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ return bf_abort(lock, bf_seqno);
+}
+
+int wsrep::client_state::total_order_bf_abort(
+ wsrep::unique_lock<wsrep::mutex>& lock, wsrep::seqno bf_seqno)
+{
+ assert(lock.owns_lock());
+ assert(mode_ == m_local || transaction_.is_streaming());
+ auto ret = transaction_.total_order_bf_abort(lock, bf_seqno);
+ assert(lock.owns_lock());
+ return ret;
+}
+
+int wsrep::client_state::total_order_bf_abort(wsrep::seqno bf_seqno)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ return total_order_bf_abort(lock, bf_seqno);
+}
+
+void wsrep::client_state::adopt_transaction(
+ const wsrep::transaction& transaction)
+{
+ assert(mode_ == m_high_priority);
+ transaction_.adopt(transaction);
+}
+
+void wsrep::client_state::adopt_apply_error(wsrep::mutable_buffer& err)
+{
+ assert(mode_ == m_high_priority);
+ transaction_.adopt_apply_error(err);
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// TOI //
+//////////////////////////////////////////////////////////////////////////////
+
+enum wsrep::provider::status
+wsrep::client_state::poll_enter_toi(
+ wsrep::unique_lock<wsrep::mutex>& lock,
+ const wsrep::key_array& keys,
+ const wsrep::const_buffer& buffer,
+ wsrep::ws_meta& meta,
+ int flags,
+ std::chrono::time_point<wsrep::clock> wait_until,
+ bool& timed_out)
+{
+ WSREP_LOG_DEBUG(debug_log_level(),
+ wsrep::log::debug_level_client_state,
+ "poll_enter_toi: "
+ << flags
+ << ","
+ << wait_until.time_since_epoch().count());
+ enum wsrep::provider::status status;
+ timed_out = false;
+ wsrep::ws_meta poll_meta; // tmp var for polling, as enter_toi may clear meta arg on errors
+ do
+ {
+ lock.unlock();
+ poll_meta = meta;
+ status = provider().enter_toi(id_, keys, buffer, poll_meta, flags);
+ if (status != wsrep::provider::success &&
+ not poll_meta.gtid().is_undefined())
+ {
+ // Successfully entered TOI, but the provider reported failure.
+ // This may happen for example if certification fails.
+ // Leave TOI before proceeding.
+ if (provider().leave_toi(id_, wsrep::mutable_buffer()))
+ {
+ wsrep::log_warning()
+ << "Failed to leave TOI after failure in "
+ << "poll_enter_toi()";
+ }
+ poll_meta = wsrep::ws_meta();
+ }
+ if (status == wsrep::provider::error_certification_failed ||
+ status == wsrep::provider::error_connection_failed)
+ {
+ ::usleep(300000);
+ }
+ lock.lock();
+ timed_out = !(wait_until.time_since_epoch().count() &&
+ wsrep::clock::now() < wait_until);
+ }
+ while ((status == wsrep::provider::error_certification_failed ||
+ status == wsrep::provider::error_connection_failed) &&
+ not timed_out &&
+ not client_service_.interrupted(lock));
+ meta = poll_meta;
+ return status;
+}
+
+void wsrep::client_state::enter_toi_common(
+ wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ assert(lock.owns_lock());
+ toi_mode_ = mode_;
+ mode(lock, m_toi);
+}
+
+int wsrep::client_state::enter_toi_local(const wsrep::key_array& keys,
+ const wsrep::const_buffer& buffer,
+ std::chrono::time_point<wsrep::clock> wait_until)
+{
+ debug_log_state("enter_toi_local: enter");
+ assert(state_ == s_exec);
+ assert(mode_ == m_local);
+ int ret;
+
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+
+ bool timed_out;
+ auto const status(poll_enter_toi(
+ lock, keys, buffer,
+ toi_meta_,
+ wsrep::provider::flag::start_transaction |
+ wsrep::provider::flag::commit,
+ wait_until,
+ timed_out));
+ switch (status)
+ {
+ case wsrep::provider::success:
+ {
+ enter_toi_common(lock);
+ ret = 0;
+ break;
+ }
+ case wsrep::provider::error_certification_failed:
+ override_error(e_deadlock_error, status);
+ ret = 1;
+ break;
+ default:
+ if (timed_out) {
+ override_error(e_timeout_error);
+ } else {
+ override_error(e_error_during_commit, status);
+ }
+ ret = 1;
+ break;
+ }
+
+ debug_log_state("enter_toi_local: leave");
+ return ret;
+}
+
+void wsrep::client_state::enter_toi_mode(const wsrep::ws_meta& ws_meta)
+{
+ debug_log_state("enter_toi_mode: enter");
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(mode_ == m_high_priority);
+ enter_toi_common(lock);
+ toi_meta_ = ws_meta;
+ debug_log_state("enter_toi_mode: leave");
+}
+
+void wsrep::client_state::leave_toi_common()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ mode(lock, toi_mode_);
+ toi_mode_ = m_undefined;
+ if (toi_meta_.gtid().is_undefined() == false)
+ {
+ update_last_written_gtid(toi_meta_.gtid());
+ }
+ toi_meta_ = wsrep::ws_meta();
+}
+
+int wsrep::client_state::leave_toi_local(const wsrep::mutable_buffer& err)
+{
+ debug_log_state("leave_toi_local: enter");
+ assert(toi_mode_ == m_local);
+ leave_toi_common();
+
+ debug_log_state("leave_toi_local: leave");
+ return (provider().leave_toi(id_, err) == provider::success ? 0 : 1);
+}
+
+void wsrep::client_state::leave_toi_mode()
+{
+ debug_log_state("leave_toi_mode: enter");
+ assert(toi_mode_ == m_high_priority);
+ leave_toi_common();
+ debug_log_state("leave_toi_mode: leave");
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// RSU //
+///////////////////////////////////////////////////////////////////////////////
+
+int wsrep::client_state::begin_rsu(int timeout)
+{
+ if (server_state_.desync())
+ {
+ wsrep::log_warning() << "Failed to desync server";
+ return 1;
+ }
+ if (server_state_.server_service().wait_committing_transactions(timeout))
+ {
+ wsrep::log_warning() << "RSU failed due to pending transactions";
+ server_state_.resync();
+ return 1;
+ }
+ wsrep::seqno pause_seqno(server_state_.pause());
+ if (pause_seqno.is_undefined())
+ {
+ wsrep::log_warning() << "Failed to pause provider";
+ server_state_.resync();
+ return 1;
+ }
+ wsrep::log_info() << "Provider paused at: " << pause_seqno;
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ toi_mode_ = mode_;
+ mode(lock, m_rsu);
+ return 0;
+}
+
+int wsrep::client_state::end_rsu()
+{
+ int ret(0);
+ try
+ {
+ server_state_.resume();
+ server_state_.resync();
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ wsrep::log_warning() << "End RSU failed: " << e.what();
+ ret = 1;
+ }
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ mode(lock, toi_mode_);
+ toi_mode_ = m_undefined;
+ return ret;
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// NBO //
+///////////////////////////////////////////////////////////////////////////////
+
+int wsrep::client_state::begin_nbo_phase_one(
+ const wsrep::key_array& keys,
+ const wsrep::const_buffer& buffer,
+ std::chrono::time_point<wsrep::clock> wait_until)
+{
+ debug_log_state("begin_nbo_phase_one: enter");
+ debug_log_keys(keys);
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(state_ == s_exec);
+ assert(mode_ == m_local);
+ assert(toi_mode_ == m_undefined);
+
+ int ret;
+ bool timed_out;
+ auto const status(poll_enter_toi(
+ lock, keys, buffer,
+ toi_meta_,
+ wsrep::provider::flag::start_transaction,
+ wait_until,
+ timed_out));
+ switch (status)
+ {
+ case wsrep::provider::success:
+ toi_mode_ = mode_;
+ mode(lock, m_nbo);
+ ret= 0;
+ break;
+ case wsrep::provider::error_certification_failed:
+ override_error(e_deadlock_error, status);
+ ret = 1;
+ break;
+ default:
+ if (timed_out) {
+ override_error(e_timeout_error);
+ } else {
+ override_error(e_error_during_commit, status);
+ }
+ ret = 1;
+ break;
+ }
+
+ debug_log_state("begin_nbo_phase_one: leave");
+ return ret;
+}
+
+int wsrep::client_state::end_nbo_phase_one(const wsrep::mutable_buffer& err)
+{
+ debug_log_state("end_nbo_phase_one: enter");
+ assert(state_ == s_exec);
+ assert(mode_ == m_nbo);
+ assert(in_toi());
+
+ enum wsrep::provider::status status(provider().leave_toi(id_, err));
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ int ret;
+ switch (status)
+ {
+ case wsrep::provider::success:
+ ret = 0;
+ break;
+ default:
+ override_error(e_error_during_commit, status);
+ ret = 1;
+ break;
+ }
+ nbo_meta_ = toi_meta_;
+ toi_meta_ = wsrep::ws_meta();
+ toi_mode_ = m_undefined;
+ debug_log_state("end_nbo_phase_one: leave");
+ return ret;
+}
+
+int wsrep::client_state::enter_nbo_mode(const wsrep::ws_meta& ws_meta)
+{
+ assert(state_ == s_exec);
+ assert(mode_ == m_local);
+ assert(toi_mode_ == m_undefined);
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ nbo_meta_ = ws_meta;
+ mode(lock, m_nbo);
+ return 0;
+}
+
+int wsrep::client_state::begin_nbo_phase_two(
+ const wsrep::key_array& keys,
+ std::chrono::time_point<wsrep::clock> wait_until)
+{
+ debug_log_state("begin_nbo_phase_two: enter");
+ debug_log_keys(keys);
+ assert(state_ == s_exec);
+ assert(mode_ == m_nbo);
+ assert(toi_mode_ == m_undefined);
+ assert(!in_toi());
+
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ // Note: nbo_meta_ is passed to enter_toi() as it is
+ // an input param containing gtid of NBO begin.
+ // Output stored in nbo_meta_ is copied to toi_meta_ for
+ // phase two end.
+ bool timed_out;
+ enum wsrep::provider::status status(
+ poll_enter_toi(lock, keys,
+ wsrep::const_buffer(),
+ nbo_meta_,
+ wsrep::provider::flag::commit,
+ wait_until,
+ timed_out));
+ int ret;
+ switch (status)
+ {
+ case wsrep::provider::success:
+ ret= 0;
+ toi_meta_ = nbo_meta_;
+ toi_mode_ = m_local;
+ break;
+ case wsrep::provider::error_provider_failed:
+ override_error(e_interrupted_error, status);
+ ret= 1;
+ break;
+ default:
+ if (timed_out)
+ {
+ override_error(e_timeout_error, status);
+ }
+ else
+ {
+ override_error(e_error_during_commit, status);
+ }
+ ret= 1;
+ break;
+ }
+
+ // Failed to grab TOI for completing NBO in order. This means that
+ // the operation cannot be ended in total order, so we end the
+ // NBO mode and let the DBMS to deal with the error.
+ if (ret)
+ {
+ mode(lock, m_local);
+ nbo_meta_ = wsrep::ws_meta();
+ }
+
+ debug_log_state("begin_nbo_phase_two: leave");
+ return ret;
+}
+
+int wsrep::client_state::end_nbo_phase_two(const wsrep::mutable_buffer& err)
+{
+ debug_log_state("end_nbo_phase_two: enter");
+ assert(state_ == s_exec);
+ assert(mode_ == m_nbo);
+ assert(toi_mode_ == m_local);
+ assert(in_toi());
+ enum wsrep::provider::status status(
+ provider().leave_toi(id_, err));
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ int ret;
+ switch (status)
+ {
+ case wsrep::provider::success:
+ ret = 0;
+ break;
+ default:
+ override_error(e_error_during_commit, status);
+ ret = 1;
+ break;
+ }
+ toi_meta_ = wsrep::ws_meta();
+ toi_mode_ = m_undefined;
+ nbo_meta_ = wsrep::ws_meta();
+ mode(lock, m_local);
+ debug_log_state("end_nbo_phase_two: leave");
+ return ret;
+}
+///////////////////////////////////////////////////////////////////////////////
+// Misc //
+///////////////////////////////////////////////////////////////////////////////
+
+int wsrep::client_state::sync_wait(int timeout)
+{
+ std::pair<wsrep::gtid, enum wsrep::provider::status> result(
+ server_state_.causal_read(timeout));
+ int ret(1);
+ switch (result.second)
+ {
+ case wsrep::provider::success:
+ sync_wait_gtid_ = result.first;
+ ret = 0;
+ break;
+ case wsrep::provider::error_not_implemented:
+ override_error(wsrep::e_not_supported_error);
+ break;
+ default:
+ override_error(wsrep::e_timeout_error);
+ break;
+ }
+ return ret;
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// Private //
+///////////////////////////////////////////////////////////////////////////////
+
+void wsrep::client_state::do_acquire_ownership(
+ wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED)
+{
+ assert(lock.owns_lock());
+ // Be strict about client state for clients in local mode. The
+ // owning_thread_id_ is used to detect bugs which are caused by
+ // more than one thread operating the client state at the time,
+ // for example thread handling the client session and background
+ // rollbacker.
+ assert(state_ == s_idle || mode_ != m_local);
+ owning_thread_id_ = wsrep::this_thread::get_id();
+}
+
+void wsrep::client_state::do_wait_rollback_complete_and_acquire_ownership(
+ wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ assert(lock.owns_lock());
+ assert(state_ == s_idle);
+ while (is_rollbacker_active())
+ {
+ cond_.wait(lock);
+ }
+ do_acquire_ownership(lock);
+ state(lock, s_exec);
+}
+
+void wsrep::client_state::update_last_written_gtid(const wsrep::gtid& gtid)
+{
+ assert(last_written_gtid_.is_undefined() ||
+ (last_written_gtid_.id() == gtid.id() &&
+ !(last_written_gtid_.seqno() > gtid.seqno())));
+ last_written_gtid_ = gtid;
+}
+
+void wsrep::client_state::debug_log_state(const char* context) const
+{
+ WSREP_LOG_DEBUG(debug_log_level(),
+ wsrep::log::debug_level_client_state,
+ context
+ << "(" << id_.get()
+ << "," << to_c_string(state_)
+ << "," << to_c_string(mode_)
+ << "," << wsrep::to_string(current_error_)
+ << "," << current_error_status_
+ << ",toi: " << toi_meta_.seqno()
+ << ",nbo: " << nbo_meta_.seqno() << ")");
+}
+
+void wsrep::client_state::debug_log_keys(const wsrep::key_array& keys) const
+{
+ for (size_t i(0); i < keys.size(); ++i)
+ {
+ WSREP_LOG_DEBUG(debug_log_level(),
+ wsrep::log::debug_level_client_state,
+ "TOI keys: "
+ << " id: " << id_
+ << "key: " << keys[i]);
+ }
+}
+
+void wsrep::client_state::state(
+ wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
+ enum wsrep::client_state::state state)
+{
+ // Verify that the current thread has gained control to the
+ // connection by calling before_command()
+ assert(wsrep::this_thread::get_id() == owning_thread_id_);
+ assert(lock.owns_lock());
+ static const char allowed[state_max_][state_max_] =
+ {
+ /* none idle exec result quit */
+ { 0, 1, 0, 0, 0}, /* none */
+ { 0, 0, 1, 0, 1}, /* idle */
+ { 0, 0, 0, 1, 0}, /* exec */
+ { 0, 1, 0, 0, 0}, /* result */
+ { 1, 0, 0, 0, 0} /* quit */
+ };
+ if (!allowed[state_][state])
+ {
+ wsrep::log_warning() << "client_state: Unallowed state transition: "
+ << state_ << " -> " << state;
+ assert(0);
+ }
+ state_hist_.push_back(state_);
+ state_ = state;
+ if (state_hist_.size() > 10)
+ {
+ state_hist_.erase(state_hist_.begin());
+ }
+
+}
+
+void wsrep::client_state::mode(
+ wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
+ enum mode mode)
+{
+ assert(lock.owns_lock());
+
+ static const char allowed[n_modes_][n_modes_] =
+ { /* u l h t r n */
+ { 0, 0, 0, 0, 0, 0 }, /* undefined */
+ { 0, 0, 1, 1, 1, 1 }, /* local */
+ { 0, 1, 0, 1, 0, 1 }, /* high prio */
+ { 0, 1, 1, 0, 0, 0 }, /* toi */
+ { 0, 1, 0, 0, 0, 0 }, /* rsu */
+ { 0, 1, 1, 0, 0, 0 } /* nbo */
+ };
+ if (!allowed[mode_][mode])
+ {
+ wsrep::log_warning() << "client_state: Unallowed mode transition: "
+ << mode_ << " -> " << mode;
+ assert(0);
+ }
+ mode_ = mode;
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// High Priority Context //
+///////////////////////////////////////////////////////////////////////////////
+
+wsrep::high_priority_context::high_priority_context(wsrep::client_state& client)
+ : client_(client)
+ , orig_mode_(client.mode_)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(client.mutex_);
+ client.mode(lock, wsrep::client_state::m_high_priority);
+}
+
+wsrep::high_priority_context::~high_priority_context()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(client_.mutex_);
+ assert(client_.mode() == wsrep::client_state::m_high_priority);
+ client_.mode(lock, orig_mode_);
+}
diff --git a/wsrep-lib/src/config_service_v1.cpp b/wsrep-lib/src/config_service_v1.cpp
new file mode 100644
index 00000000..ace61427
--- /dev/null
+++ b/wsrep-lib/src/config_service_v1.cpp
@@ -0,0 +1,176 @@
+/*
+ * Copyright (C) 2022 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "config_service_v1.hpp"
+#include "service_helpers.hpp"
+#include "v26/wsrep_config_service.h"
+#include "wsrep/logger.hpp"
+#include "wsrep/provider_options.hpp"
+
+#include <cassert>
+
+namespace wsrep_config_service_v1
+{
+ wsrep_config_service_v1_t service{ 0 };
+
+ static int map_flags(int flags)
+ {
+ int option_flags = 0;
+ if (flags & WSREP_PARAM_DEPRECATED)
+ option_flags |= wsrep::provider_options::flag::deprecated;
+ if (flags & WSREP_PARAM_READONLY)
+ option_flags |= wsrep::provider_options::flag::readonly;
+ if (flags & WSREP_PARAM_TYPE_BOOL)
+ option_flags |= wsrep::provider_options::flag::type_bool;
+ if (flags & WSREP_PARAM_TYPE_INTEGER)
+ option_flags |= wsrep::provider_options::flag::type_integer;
+ if (flags & WSREP_PARAM_TYPE_DOUBLE)
+ option_flags |= wsrep::provider_options::flag::type_double;
+ return option_flags;
+ }
+
+ static enum wsrep::provider::status
+ make_option(wsrep::provider_options* opt, const char* name, const char* val,
+ int flags)
+ {
+ std::unique_ptr<wsrep::provider_options::option_value> value(
+ new wsrep::provider_options::option_value_string(val));
+ std::unique_ptr<wsrep::provider_options::option_value> default_value(
+ new wsrep::provider_options::option_value_string(val));
+ return opt->set_default(name, std::move(value),
+ std::move(default_value), flags);
+ }
+
+ static enum wsrep::provider::status
+ make_option(wsrep::provider_options* opt, const char* name, int64_t val,
+ int flags)
+ {
+ std::unique_ptr<wsrep::provider_options::option_value> value(
+ new wsrep::provider_options::option_value_int(val));
+ std::unique_ptr<wsrep::provider_options::option_value> default_value(
+ new wsrep::provider_options::option_value_int(val));
+ return opt->set_default(name, std::move(value),
+ std::move(default_value), flags);
+ }
+
+ static enum wsrep::provider::status
+ make_option(wsrep::provider_options* opt, const char* name, bool val,
+ int flags)
+ {
+ std::unique_ptr<wsrep::provider_options::option_value> value(
+ new wsrep::provider_options::option_value_bool(val));
+ std::unique_ptr<wsrep::provider_options::option_value> default_value(
+ new wsrep::provider_options::option_value_bool(val));
+ return opt->set_default(name, std::move(value),
+ std::move(default_value), flags);
+ }
+
+ static enum wsrep::provider::status
+ make_option(wsrep::provider_options* opt, const char* name, double val,
+ int flags)
+ {
+ std::unique_ptr<wsrep::provider_options::option_value> value(
+ new wsrep::provider_options::option_value_double(val));
+ std::unique_ptr<wsrep::provider_options::option_value> default_value(
+ new wsrep::provider_options::option_value_double(val));
+ return opt->set_default(name, std::move(value),
+ std::move(default_value), flags);
+ }
+
+ wsrep_status_t service_callback(const wsrep_parameter* p, void* context)
+ {
+ const int flags = map_flags(p->flags);
+ enum wsrep::provider::status ret(wsrep::provider::error_unknown);
+ wsrep::provider_options* options = (wsrep::provider_options*)context;
+ switch (p->flags & WSREP_PARAM_TYPE_MASK)
+ {
+ case WSREP_PARAM_TYPE_BOOL:
+ ret = make_option(options, p->name, p->value.as_bool, flags);
+ break;
+ case WSREP_PARAM_TYPE_INTEGER:
+ ret = make_option(options, p->name, p->value.as_integer, flags);
+ break;
+ case WSREP_PARAM_TYPE_DOUBLE:
+ ret = make_option(options, p->name, p->value.as_double, flags);
+ break;
+ default:
+ assert((p->flags & WSREP_PARAM_TYPE_MASK) == 0);
+ ret = make_option(options, p->name, p->value.as_string, flags);
+ break;
+ }
+
+ if (ret == wsrep::provider::success)
+ return WSREP_OK;
+ else
+ return WSREP_FATAL;
+ }
+} // namespace wsrep_config_service_v1
+
+static int config_service_v1_probe(void* dlh)
+{
+ typedef int (*init_fn)(wsrep_config_service_v1_t*);
+ typedef void (*deinit_fn)();
+ return wsrep_impl::service_probe<init_fn>(
+ dlh, WSREP_CONFIG_SERVICE_INIT_FUNC_V1, "config service v1")
+ || wsrep_impl::service_probe<deinit_fn>(
+ dlh, WSREP_CONFIG_SERVICE_DEINIT_FUNC_V1, "config service v1");
+}
+
+static int config_service_v1_init(void* dlh)
+{
+ typedef int (*init_fn)(wsrep_config_service_v1_t*);
+ return wsrep_impl::service_init<init_fn>(
+ dlh, WSREP_CONFIG_SERVICE_INIT_FUNC_V1,
+ &wsrep_config_service_v1::service, "config service v1");
+}
+
+static void config_service_v1_deinit(void* dlh)
+{
+ typedef int (*deinit_fn)();
+ wsrep_impl::service_deinit<deinit_fn>(
+ dlh, WSREP_CONFIG_SERVICE_DEINIT_FUNC_V1, "config service v1");
+}
+
+int wsrep::config_service_v1_fetch(wsrep::provider& provider,
+ wsrep::provider_options* options)
+{
+ struct wsrep_st* wsrep = (struct wsrep_st*)provider.native();
+ if (config_service_v1_probe(wsrep->dlh))
+ {
+ wsrep::log_warning() << "Provider does not support config service v1";
+ return 1;
+ }
+ if (config_service_v1_init(wsrep->dlh))
+ {
+ wsrep::log_warning() << "Failed to initialize config service v1";
+ return 1;
+ }
+
+ wsrep_status_t ret = wsrep_config_service_v1::service.get_parameters(
+ wsrep, &wsrep_config_service_v1::service_callback, options);
+
+ config_service_v1_deinit(wsrep->dlh);
+
+ if (ret != WSREP_OK)
+ {
+ return 1;
+ }
+
+ return 0;
+}
diff --git a/wsrep-lib/src/config_service_v1.hpp b/wsrep-lib/src/config_service_v1.hpp
new file mode 100644
index 00000000..49532cde
--- /dev/null
+++ b/wsrep-lib/src/config_service_v1.hpp
@@ -0,0 +1,30 @@
+/*
+ * Copyright (C) 2022 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_CONFIG_SERVICE_V1_HPP
+#define WSREP_CONFIG_SERVICE_V1_HPP
+
+namespace wsrep
+{
+ class provider;
+ class provider_options;
+ int config_service_v1_fetch(provider& provider, provider_options* opts);
+} // namespace wsrep
+
+#endif // WSREP_CONFIG_SERVICE_V1_HPP
diff --git a/wsrep-lib/src/event_service_v1.cpp b/wsrep-lib/src/event_service_v1.cpp
new file mode 100644
index 00000000..ffaf4ec4
--- /dev/null
+++ b/wsrep-lib/src/event_service_v1.cpp
@@ -0,0 +1,104 @@
+/*
+ * Copyright (C) 2020 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "event_service_v1.hpp"
+
+#include "wsrep/event_service.hpp"
+#include "wsrep/reporter.hpp"
+#include "wsrep/logger.hpp"
+#include "v26/wsrep_event_service.h"
+#include "service_helpers.hpp"
+
+#include <cassert>
+
+namespace wsrep_event_service_v1
+{
+ static std::atomic_flag initialized = ATOMIC_FLAG_INIT;
+
+ static void callback(
+ wsrep_event_context_t* ctx,
+ const char* name,
+ const char* value)
+ {
+ if (ctx)
+ {
+ wsrep::event_service* const impl
+ (reinterpret_cast<wsrep::event_service*>(ctx));
+ impl->process_event(name, value);
+ }
+ }
+
+ static const char* const log_string = "event service v1";
+}
+
+int wsrep::event_service_v1_probe(void* dlh)
+{
+ typedef int (*init_fn)(wsrep_event_service_v1_t*);
+ typedef void (*deinit_fn)();
+ if (wsrep_impl::service_probe<init_fn>(
+ dlh, WSREP_EVENT_SERVICE_INIT_FUNC_V1,
+ wsrep_event_service_v1::log_string) ||
+ wsrep_impl::service_probe<deinit_fn>(
+ dlh, WSREP_EVENT_SERVICE_DEINIT_FUNC_V1,
+ wsrep_event_service_v1::log_string))
+ {
+ // diagnostic message was logged by wsrep_impl::service_probe()
+ return 1;
+ }
+ return 0;
+}
+
+int wsrep::event_service_v1_init(void* dlh,
+ wsrep::event_service* event_service)
+{
+ if (not (dlh && event_service)) return EINVAL;
+
+ if (wsrep_event_service_v1::initialized.test_and_set()) return EALREADY;
+
+ wsrep_event_service_v1_t service =
+ {
+ wsrep_event_service_v1::callback,
+ reinterpret_cast<wsrep_event_context_t*>(event_service)
+ };
+
+ typedef int (*init_fn)(wsrep_event_service_v1_t*);
+ int const ret(wsrep_impl::service_init<init_fn>(
+ dlh, WSREP_EVENT_SERVICE_INIT_FUNC_V1, &service,
+ wsrep_event_service_v1::log_string));
+ if (ret)
+ {
+ wsrep_event_service_v1::initialized.clear();
+ }
+
+ return ret;
+}
+
+void wsrep::event_service_v1_deinit(void* dlh)
+{
+ if (wsrep_event_service_v1::initialized.test_and_set())
+ {
+ // service was initialized
+ typedef int (*deinit_fn)();
+ wsrep_impl::service_deinit<deinit_fn>(
+ dlh, WSREP_EVENT_SERVICE_DEINIT_FUNC_V1,
+ wsrep_event_service_v1::log_string);
+ }
+
+ wsrep_event_service_v1::initialized.clear();
+}
diff --git a/wsrep-lib/src/event_service_v1.hpp b/wsrep-lib/src/event_service_v1.hpp
new file mode 100644
index 00000000..1c85b01e
--- /dev/null
+++ b/wsrep-lib/src/event_service_v1.hpp
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2020 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_EVENT_SERVICE_V1_HPP
+#define WSREP_EVENT_SERVICE_V1_HPP
+
+namespace wsrep
+{
+ class event_service;
+ /**
+ * Probe event_service_v1 support in loaded library.
+ *
+ * @param dlh Handle returned by dlopen().
+ *
+ * @return Zero on success, non-zero system error code on failure.
+ */
+ int event_service_v1_probe(void *dlh);
+
+ /**
+ * Initialize event service.
+ *
+ * @param dlh Handle returned by dlopen().
+ * @params event_service Pointer to wsrep::event_service implementation.
+ *
+ * @return Zero on success, non-zero system error code on failure.
+ */
+ int event_service_v1_init(void* dlh,
+ wsrep::event_service* event_service);
+
+ /**
+ * Deinitialize event service.
+ *
+ * @param dlh Handler returned by dlopen().
+ */
+ void event_service_v1_deinit(void* dlh);
+}
+
+#endif // WSREP_EVENT_SERVICE_V1_HPP
diff --git a/wsrep-lib/src/exception.cpp b/wsrep-lib/src/exception.cpp
new file mode 100644
index 00000000..c55ed641
--- /dev/null
+++ b/wsrep-lib/src/exception.cpp
@@ -0,0 +1,22 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/exception.hpp"
+
+bool wsrep::abort_on_exception(false);
diff --git a/wsrep-lib/src/gtid.cpp b/wsrep-lib/src/gtid.cpp
new file mode 100644
index 00000000..af32d524
--- /dev/null
+++ b/wsrep-lib/src/gtid.cpp
@@ -0,0 +1,90 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/gtid.hpp"
+
+#include <cerrno>
+#include <iostream>
+#include <sstream>
+
+const wsrep::gtid wsrep::gtid::undefined_ = wsrep::gtid();
+
+std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::gtid& gtid)
+{
+ return (os << gtid.id() << ":" << gtid.seqno());
+}
+
+std::istream& wsrep::operator>>(std::istream& is, wsrep::gtid& gtid)
+{
+ std::string id_str;
+ std::getline(is, id_str, ':');
+ long long seq;
+ is >> seq;
+ if (!is)
+ {
+ is.clear(std::ios_base::failbit);
+ return is;
+ }
+ try
+ {
+ // wsrep::id constructor will throw if it cannot parse the
+ // id_str.
+ gtid = wsrep::gtid(wsrep::id(id_str), wsrep::seqno(seq));
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ // Formatting or extraction error. Clear the istream state and
+ // set failibit.
+ is.clear(std::ios_base::failbit);
+ }
+ return is;
+}
+
+ssize_t wsrep::scan_from_c_str(
+ const char* buf, size_t buf_len, wsrep::gtid& gtid)
+{
+ std::istringstream is(std::string(buf, buf_len));
+ is >> gtid;
+ // Some failure occurred
+ if (!is)
+ {
+ return -EINVAL;
+ }
+ // Whole string was consumed without failures
+ if (is.eof())
+ {
+ return static_cast<ssize_t>(buf_len);
+ }
+ // The string was not consumed completely, return current position
+ // of the istream.
+ return static_cast<ssize_t>(is.tellg());
+}
+
+ssize_t wsrep::print_to_c_str(
+ const wsrep::gtid& gtid, char* buf, size_t buf_len)
+{
+ std::ostringstream os;
+ os << gtid;
+ if (os.str().size() > buf_len)
+ {
+ return -ENOBUFS;
+ }
+ std::strncpy(buf, os.str().c_str(), os.str().size());
+ return static_cast<ssize_t>(os.str().size());
+}
diff --git a/wsrep-lib/src/id.cpp b/wsrep-lib/src/id.cpp
new file mode 100644
index 00000000..2da188fc
--- /dev/null
+++ b/wsrep-lib/src/id.cpp
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/id.hpp"
+#include "uuid.hpp"
+
+#include <cctype>
+#include <sstream>
+#include <algorithm>
+
+const wsrep::id wsrep::id::undefined_ = wsrep::id();
+
+wsrep::id::id(const std::string& str)
+ : data_()
+{
+ wsrep::uuid_t wsrep_uuid;
+
+ if (str.size() == WSREP_LIB_UUID_STR_LEN &&
+ wsrep::uuid_scan(str.c_str(), str.size(), &wsrep_uuid) ==
+ WSREP_LIB_UUID_STR_LEN)
+ {
+ std::memcpy(data_.buf, wsrep_uuid.data, sizeof(data_.buf));
+ }
+ else if (str.size() <= 16)
+ {
+ std::memcpy(data_.buf, str.c_str(), str.size());
+ }
+ else
+ {
+ std::ostringstream os;
+ os << "String '" << str
+ << "' does not contain UUID or is longer thatn 16 bytes";
+ throw wsrep::runtime_error(os.str());
+ }
+}
+
+std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::id& id)
+{
+ const char* ptr(static_cast<const char*>(id.data()));
+ size_t size(id.size());
+ if (static_cast<size_t>(std::count_if(ptr, ptr + size, ::isalnum)) == size)
+ {
+ return (os << std::string(ptr, size));
+ }
+ else
+ {
+ char uuid_str[WSREP_LIB_UUID_STR_LEN + 1];
+ wsrep::uuid_t uuid;
+ std::memcpy(uuid.data, ptr, sizeof(uuid.data));
+ if (wsrep::uuid_print(&uuid, uuid_str, sizeof(uuid_str)) < 0)
+ {
+ throw wsrep::runtime_error("Could not print uuid");
+ }
+ uuid_str[WSREP_LIB_UUID_STR_LEN] = '\0';
+ return (os << uuid_str);
+ }
+}
+
+std::istream& wsrep::operator>>(std::istream& is, wsrep::id& id)
+{
+ std::string id_str;
+ std::getline(is, id_str);
+ id = wsrep::id(id_str);
+ return is;
+}
diff --git a/wsrep-lib/src/key.cpp b/wsrep-lib/src/key.cpp
new file mode 100644
index 00000000..fb94fefc
--- /dev/null
+++ b/wsrep-lib/src/key.cpp
@@ -0,0 +1,65 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/key.hpp"
+#include <ostream>
+#include <iomanip>
+
+namespace
+{
+ void print_key_part(std::ostream& os, const void* ptr, size_t len)
+ {
+ std::ios::fmtflags flags_save(os.flags());
+ os << len << ": ";
+ for (size_t i(0); i < len; ++i)
+ {
+ os << std::hex
+ << std::setfill('0')
+ << std::setw(2)
+ << static_cast<int>(
+ *(reinterpret_cast<const unsigned char*>(ptr) + i)) << " ";
+ }
+ os.flags(flags_save);
+ }
+}
+
+std::ostream& wsrep::operator<<(std::ostream& os,
+ enum wsrep::key::type key_type)
+{
+ switch (key_type)
+ {
+ case wsrep::key::shared: os << "shared"; break;
+ case wsrep::key::reference: os << "reference"; break;
+ case wsrep::key::update: os << "update"; break;
+ case wsrep::key::exclusive: os << "exclusive"; break;
+ default: os << "unknown"; break;
+ }
+ return os;
+}
+
+std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::key& key)
+{
+ os << "type: " << key.type();
+ for (size_t i(0); i < key.size(); ++i)
+ {
+ os << "\n ";
+ print_key_part(os, key.key_parts()[i].data(), key.key_parts()[i].size());
+ }
+ return os;
+}
diff --git a/wsrep-lib/src/logger.cpp b/wsrep-lib/src/logger.cpp
new file mode 100644
index 00000000..058a42a0
--- /dev/null
+++ b/wsrep-lib/src/logger.cpp
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/logger.hpp"
+
+#include <iostream>
+
+std::ostream& wsrep::log::os_ = std::cout;
+static wsrep::default_mutex log_mutex_;
+wsrep::mutex& wsrep::log::mutex_ = log_mutex_;
+wsrep::log::logger_fn_type wsrep::log::logger_fn_ = 0;
+std::atomic_int wsrep::log::debug_log_level_(0);
+
+void wsrep::log::logger_fn(wsrep::log::logger_fn_type logger_fn)
+{
+ logger_fn_ = logger_fn;
+}
+
+void wsrep::log::debug_log_level(int debug_log_level)
+{
+ debug_log_level_.store(debug_log_level, std::memory_order_relaxed);
+}
+
+int wsrep::log::debug_log_level()
+{
+ return debug_log_level_.load(std::memory_order_relaxed);
+}
diff --git a/wsrep-lib/src/provider.cpp b/wsrep-lib/src/provider.cpp
new file mode 100644
index 00000000..9e99f7fd
--- /dev/null
+++ b/wsrep-lib/src/provider.cpp
@@ -0,0 +1,171 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/provider.hpp"
+#include "wsrep/logger.hpp"
+
+#include "wsrep_provider_v26.hpp"
+
+#include <dlfcn.h>
+#include <cassert>
+#include <memory>
+
+wsrep::provider* wsrep::provider::make_provider(
+ wsrep::server_state& server_state,
+ const std::string& provider_spec,
+ const std::string& provider_options,
+ const wsrep::provider::services& services)
+{
+ try
+ {
+ return new wsrep::wsrep_provider_v26(
+ server_state, provider_options, provider_spec, services);
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ wsrep::log_error() << "Failed to create a new provider '"
+ << provider_spec << "'"
+ << " with options '" << provider_options
+ << "': " << e.what();
+ }
+ catch (...)
+ {
+ wsrep::log_error() << "Caught unknown exception when trying to "
+ << "create a new provider '"
+ << provider_spec << "'"
+ << " with options '" << provider_options;
+ }
+ return 0;
+}
+
+std::string
+wsrep::provider::to_string(enum wsrep::provider::status const val)
+{
+ switch(val)
+ {
+ case success:
+ return "Success";
+ case error_warning:
+ return "Warning";
+ case error_transaction_missing:
+ return "Transaction not registered with provider";
+ case error_certification_failed:
+ return "Certification failed";
+ case error_bf_abort:
+ return "Transaction was BF aborted";
+ case error_size_exceeded:
+ return "Transaction size exceeded";
+ case error_connection_failed:
+ return "Not connected to Primary Component";
+ case error_provider_failed:
+ return "Provider in bad state, needs to be reinitialized.";
+ case error_fatal:
+ return "Fatal error, must abort.";
+ case error_not_implemented:
+ return "Function not implemented";
+ case error_not_allowed:
+ return "Operation not allowed";
+ case error_unknown:
+ return "Unknown error";
+ }
+
+ assert(0);
+
+ std::ostringstream os;
+ os << "Invalid error code: " << val;
+ return os.str();
+}
+
+std::string wsrep::provider::capability::str(int caps)
+{
+ std::ostringstream os;
+
+#define WSREP_PRINT_CAPABILITY(cap_value, cap_string) \
+ if (caps & cap_value) { \
+ os << cap_string ", "; \
+ caps &= ~cap_value; \
+ }
+
+ WSREP_PRINT_CAPABILITY(multi_master, "MULTI-MASTER");
+ WSREP_PRINT_CAPABILITY(certification, "CERTIFICATION");
+ WSREP_PRINT_CAPABILITY(parallel_applying, "PARALLEL_APPLYING");
+ WSREP_PRINT_CAPABILITY(transaction_replay, "REPLAY");
+ WSREP_PRINT_CAPABILITY(isolation, "ISOLATION");
+ WSREP_PRINT_CAPABILITY(pause, "PAUSE");
+ WSREP_PRINT_CAPABILITY(causal_reads, "CAUSAL_READ");
+ WSREP_PRINT_CAPABILITY(causal_transaction, "CAUSAL_TRX");
+ WSREP_PRINT_CAPABILITY(incremental_writeset, "INCREMENTAL_WS");
+ WSREP_PRINT_CAPABILITY(session_locks, "SESSION_LOCK");
+ WSREP_PRINT_CAPABILITY(distributed_locks, "DISTRIBUTED_LOCK");
+ WSREP_PRINT_CAPABILITY(consistency_check, "CONSISTENCY_CHECK");
+ WSREP_PRINT_CAPABILITY(unordered, "UNORDERED");
+ WSREP_PRINT_CAPABILITY(annotation, "ANNOTATION");
+ WSREP_PRINT_CAPABILITY(preordered, "PREORDERED");
+ WSREP_PRINT_CAPABILITY(streaming, "STREAMING");
+ WSREP_PRINT_CAPABILITY(snapshot, "READ_VIEW");
+ WSREP_PRINT_CAPABILITY(nbo, "NBO");
+
+#undef WSREP_PRINT_CAPABILITY
+
+ if (caps)
+ {
+ assert(caps == 0); // to catch missed capabilities
+ os << "UNKNOWN(" << caps << ") ";
+ }
+
+ std::string ret(os.str());
+ if (ret.size() > 2) ret.erase(ret.size() - 2);
+ return ret;
+}
+
+std::string wsrep::flags_to_string(int flags)
+{
+ std::ostringstream oss;
+ if (flags & provider::flag::start_transaction)
+ oss << "start_transaction | ";
+ if (flags & provider::flag::commit)
+ oss << "commit | ";
+ if (flags & provider::flag::rollback)
+ oss << "rollback | ";
+ if (flags & provider::flag::isolation)
+ oss << "isolation | ";
+ if (flags & provider::flag::pa_unsafe)
+ oss << "pa_unsafe | ";
+ if (flags & provider::flag::prepare)
+ oss << "prepare | ";
+ if (flags & provider::flag::snapshot)
+ oss << "read_view | ";
+ if (flags & provider::flag::implicit_deps)
+ oss << "implicit_deps | ";
+
+ std::string ret(oss.str());
+ if (ret.size() > 3) ret.erase(ret.size() - 3);
+ return ret;
+}
+
+std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::ws_meta& ws_meta)
+{
+ os << "gtid: " << ws_meta.gtid()
+ << " server_id: " << ws_meta.server_id()
+ << " client_id: " << ws_meta.client_id()
+ << " trx_id: " << ws_meta.transaction_id()
+ << " flags: " << ws_meta.flags()
+ << " (" << wsrep::flags_to_string(ws_meta.flags()) << ")";
+ return os;
+}
diff --git a/wsrep-lib/src/provider_options.cpp b/wsrep-lib/src/provider_options.cpp
new file mode 100644
index 00000000..fcd7a959
--- /dev/null
+++ b/wsrep-lib/src/provider_options.cpp
@@ -0,0 +1,168 @@
+/*
+ * Copyright (C) 2021 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/provider_options.hpp"
+#include "config_service_v1.hpp"
+#include "wsrep/logger.hpp"
+
+#include <algorithm>
+#include <cassert>
+#include <cctype>
+#include <cstring>
+
+/**
+ * Provider options string separators.
+ */
+struct provider_options_sep
+{
+ /** Parameter separator. */
+ char param{ ';' };
+ /** Key value separator. */
+ char key_value{ '=' };
+};
+
+// Replace dots in option name with underscores
+static void sanitize_name(std::string& name)
+{
+ std::transform(name.begin(), name.end(), name.begin(),
+ [](std::string::value_type c) {
+ if (c == '.')
+ return '_';
+ return c;
+ });
+}
+
+bool wsrep::operator==(const wsrep::provider_options::option& left,
+ const wsrep::provider_options::option& right)
+{
+ return (std::strcmp(left.name(), right.name()) == 0);
+}
+
+// wsrep-API lacks better error code for not found, and this is
+// what Galera returns when parameter is not recogized, so we
+// got with it.
+static enum wsrep::provider::status not_found_error{
+ wsrep::provider::error_warning
+};
+
+wsrep::provider_options::option::option()
+ : name_{}
+ , real_name_{}
+ , value_{}
+ , default_value_{}
+ , flags_{ 0 }
+{
+}
+
+wsrep::provider_options::option::option(
+ const std::string& name,
+ std::unique_ptr<wsrep::provider_options::option_value> value,
+ std::unique_ptr<wsrep::provider_options::option_value> default_value,
+ int flags)
+ : name_{ name }
+ , real_name_{ name }
+ , value_{ std::move(value) }
+ , default_value_{ std::move(default_value) }
+ , flags_{ flags }
+{
+ sanitize_name(name_);
+}
+
+void wsrep::provider_options::option::update_value(
+ std::unique_ptr<wsrep::provider_options::option_value> value)
+{
+ value_ = std::move(value);
+}
+
+wsrep::provider_options::option::~option() {}
+
+wsrep::provider_options::provider_options(wsrep::provider& provider)
+ : provider_(provider)
+ , options_()
+{
+}
+
+enum wsrep::provider::status wsrep::provider_options::initial_options()
+{
+ options_.clear();
+ if (config_service_v1_fetch(provider_, this))
+ {
+ return wsrep::provider::error_not_implemented;
+ }
+ else
+ {
+ return wsrep::provider::success;
+ }
+}
+
+const wsrep::provider_options::option*
+wsrep::provider_options::get_option(const std::string& name) const
+{
+ auto ret(options_.find(name));
+ if (ret == options_.end())
+ {
+ return nullptr;
+ }
+ return ret->second.get();
+}
+
+enum wsrep::provider::status wsrep::provider_options::set(
+ const std::string& name,
+ std::unique_ptr<wsrep::provider_options::option_value> value)
+{
+ auto option(options_.find(name));
+ if (option == options_.end())
+ {
+ return not_found_error;
+ }
+ provider_options_sep sep;
+ auto ret(provider_.options(std::string(option->second->real_name())
+ + sep.key_value + value->as_string()
+ + sep.param));
+ if (ret == provider::success)
+ {
+ option->second->update_value(std::move(value));
+ }
+ return ret;
+}
+
+enum wsrep::provider::status wsrep::provider_options::set_default(
+ const std::string& name,
+ std::unique_ptr<wsrep::provider_options::option_value> value,
+ std::unique_ptr<wsrep::provider_options::option_value> default_value,
+ int flags)
+{
+ auto found(options_.find(name));
+ auto opt(std::unique_ptr<provider_options::option>(
+ new option{ name, std::move(value), std::move(default_value), flags }));
+ if (found != options_.end())
+ {
+ assert(0);
+ return wsrep::provider::error_not_allowed;
+ }
+ options_.emplace(std::string(opt->name()), std::move(opt));
+ return wsrep::provider::success;
+}
+
+void wsrep::provider_options::for_each(const std::function<void(option*)>& fn)
+{
+ std::for_each(
+ options_.begin(), options_.end(),
+ [&fn](const options_map::value_type& opt) { fn(opt.second.get()); });
+}
diff --git a/wsrep-lib/src/reporter.cpp b/wsrep-lib/src/reporter.cpp
new file mode 100644
index 00000000..511ef819
--- /dev/null
+++ b/wsrep-lib/src/reporter.cpp
@@ -0,0 +1,370 @@
+/*
+ * Copyright (C) 2021 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/reporter.hpp"
+#include "wsrep/logger.hpp"
+
+#include <sstream>
+#include <iomanip>
+
+#include <cassert>
+#include <cstring> // strerror()
+#include <cstdlib> // mkstemp()
+#include <cerrno> // errno
+#include <unistd.h> // write()
+#include <cstdio> // rename(), snprintf()
+#include <ctime> // clock_gettime()
+#include <cmath> // floor()
+
+static std::string const TEMP_EXTENSION(".XXXXXX");
+
+static std::string make_progress_string(int const from, int const to,
+ int const total,int const done,
+ int const indefinite)
+{
+ std::ostringstream os;
+
+ os << "{ \"from\": " << from << ", "
+ << "\"to\": " << to << ", "
+ << "\"total\": " << total << ", "
+ << "\"done\": " << done << ", "
+ << "\"indefinite\": " << indefinite << " }";
+
+ return os.str();
+}
+
+static std::string const indefinite_progress
+ (make_progress_string(-1, -1, -1, -1, -1));
+static std::string const steady_state
+ (make_progress_string(-1, -1, 0, 0, -1));
+
+static inline double
+timestamp()
+{
+ struct timespec time;
+ clock_gettime(CLOCK_REALTIME, &time);
+ return (double(time.tv_sec) + double(time.tv_nsec)*1.0e-9);
+}
+
+wsrep::reporter::reporter(wsrep::mutex& mutex,
+ const std::string& file_name,
+ size_t const max_msg)
+ : mutex_(mutex)
+ , file_name_(file_name)
+ , progress_(indefinite_progress)
+ , template_(new char [file_name_.length() + TEMP_EXTENSION.length() + 1])
+ , state_(wsrep::reporter::s_disconnected_disconnected)
+ , initialized_(false)
+ , err_msg_()
+ , warn_msg_()
+ , events_()
+ , max_msg_(max_msg)
+{
+ template_[file_name_.length() + TEMP_EXTENSION.length()] = '\0';
+ write_file(timestamp());
+}
+
+wsrep::reporter::~reporter()
+{
+ delete [] template_;
+}
+
+wsrep::reporter::substates
+wsrep::reporter::substate_map(enum wsrep::server_state::state const state)
+{
+ switch (state)
+ {
+ case wsrep::server_state::s_disconnected:
+ initialized_ = false;
+ return s_disconnected_disconnected;
+ case wsrep::server_state::s_initializing:
+ if (s_disconnected_disconnected == state_)
+ return s_disconnected_initializing;
+ else if (s_joining_sst == state_)
+ return s_joining_initializing;
+ else if (s_joining_initializing == state_)
+ return s_joining_initializing; // continuation
+ else
+ {
+ assert(0);
+ return state_;
+ }
+ case wsrep::server_state::s_initialized:
+ initialized_ = true;
+ if (s_disconnected_initializing >= state_)
+ return s_disconnected_initialized;
+ else if (s_joining_initializing == state_)
+ return s_joining_ist;
+ else if (s_joining_ist == state_)
+ return s_joining_ist; // continuation
+ else
+ {
+ assert(0);
+ return state_;
+ }
+ case wsrep::server_state::s_connected:
+ return s_connected_waiting;
+ case wsrep::server_state::s_joiner:
+ if (initialized_)
+ return s_joining_initialized;
+ else
+ return s_joining_sst;
+ case wsrep::server_state::s_joined:
+ return s_joined_syncing;
+ case wsrep::server_state::s_donor:
+ return s_donor_sending;
+ case wsrep::server_state::s_synced:
+ return s_synced_running;
+ case wsrep::server_state::s_disconnecting:
+ return s_disconnecting_disconnecting;
+ default:
+ assert(0);
+ return state_;
+ }
+}
+
+// See https://www.ietf.org/rfc/rfc4627.txt
+static std::string escape_json(const std::string& str)
+{
+ std::ostringstream os;
+ for (auto c = str.cbegin(); c != str.cend(); ++c)
+ {
+ switch (*c)
+ {
+ case '"': os << "\\\""; break;
+ case '\\': os << "\\\\"; break;
+ case '/': os << "\\/"; break;
+ case '\b': os << "\\b"; break;
+ case '\f': os << "\\f"; break;
+ case '\n': os << "\\n"; break;
+ case '\r': os << "\\r"; break;
+ case '\t': os << "\\t"; break;
+ default:
+ // std::iscntrl() returns non-zero for [0x00, 0x1f] and
+ // backspace character. Argument type is int so cast to
+ // unsigned char is needed to make it safe, see
+ // https://en.cppreference.com/w/cpp/string/byte/iscntrl
+ if (std::iscntrl(static_cast<unsigned char>(*c)))
+ {
+ os << "\\u" << std::hex << std::setw(4) <<
+ std::setfill('0') << static_cast<int>(*c);
+ }
+ else
+ {
+ os << *c;
+ }
+ }
+ }
+ return os.str();
+}
+
+void
+wsrep::reporter::write_log_msg(std::ostream& os,
+ const log_msg& msg)
+{
+ os << "\t\t{\n";
+ os << "\t\t\t\"timestamp\": " << std::showpoint << std::setprecision(18)
+ << msg.tstamp << ",\n";
+ os << "\t\t\t\"msg\": \"" << msg.msg << "\"\n";
+ os << "\t\t}";
+}
+
+void
+wsrep::reporter::write_event(std::ostream& os,
+ const log_msg& msg)
+{
+ os << "\t\t{\n";
+ os << "\t\t\t\"timestamp\": " << std::showpoint << std::setprecision(18)
+ << msg.tstamp << ",\n";
+ os << "\t\t\t\"event\": " << msg.msg << "\n";
+ os << "\t\t}";
+}
+
+void
+wsrep::reporter::write_array(std::ostream& os,
+ const std::string& label,
+ const std::deque<log_msg>& msgs,
+ void (*element_writer)(std::ostream& os,
+ const log_msg& msg))
+{
+ os << "\t\"" << label << "\": [\n";
+ for (size_t i(0); i < msgs.size(); ++i)
+ {
+ element_writer(os, msgs[i]);
+ os << (i+1 < msgs.size() ? ",\n" : "\n");
+ }
+ os << "\t],\n";
+}
+
+// write data to temporary file and then rename it to target file for atomicity
+void
+wsrep::reporter::write_file(double const tstamp)
+{
+ enum progress_type {
+ t_indefinite = -1, // indefinite wait
+ t_progressive, // measurable progress
+ t_final // final state
+ };
+
+ struct strings {
+ const char* state;
+ const char* comment;
+ progress_type type;
+ };
+
+ static const struct strings strings[substates_max] =
+ {
+ { "DISCONNECTED", "Disconnected", t_indefinite },
+ { "DISCONNECTED", "Initializing", t_indefinite },
+ { "DISCONNECTED", "Connecting", t_indefinite },
+ { "CONNECTED", "Waiting", t_indefinite },
+ { "JOINING", "Receiving state", t_progressive },
+ { "JOINING", "Receiving SST", t_progressive },
+ { "JOINING", "Initializing", t_progressive },
+ { "JOINING", "Receiving IST", t_progressive },
+ { "JOINED", "Syncing", t_progressive },
+ { "SYNCED", "Operational", t_final },
+ { "DONOR", "Donating SST", t_progressive },
+ { "DISCONNECTING", "Disconnecting", t_indefinite }
+ };
+
+ double const seconds(floor(tstamp));
+ time_t const tt = time_t(seconds);
+ struct tm date;
+ localtime_r(&tt, &date);
+
+ char date_str[85] = { '\0', };
+ snprintf(date_str, sizeof(date_str) - 1,
+ "%04d-%02d-%02d %02d:%02d:%02d.%03d",
+ date.tm_year + 1900, date.tm_mon + 1, date.tm_mday,
+ date.tm_hour, date.tm_min, date.tm_sec,
+ (int)((tstamp-seconds)*1000));
+
+ std::ostringstream os;
+ os << "{\n";
+ os << "\t\"date\": \"" << date_str << "\",\n";
+ os << "\t\"timestamp\": " << std::showpoint << std::setprecision(18)
+ << tstamp << ",\n";
+ write_array(os, "errors", err_msg_, write_log_msg);
+ write_array(os, "warnings", warn_msg_, write_log_msg);
+ write_array(os, "events", events_, write_event);
+ os << "\t\"status\": {\n";
+ os << "\t\t\"state\": \"" << strings[state_].state << "\",\n";
+ os << "\t\t\"comment\": \"" << strings[state_].comment << "\",\n";
+ os << "\t\t\"progress\": " << progress_ << "\n";
+ os << "\t}\n";
+ os << "}\n";
+
+ std::string const str(os.str());
+
+ // prepare template for mkstemp()
+ file_name_.copy(template_, file_name_.length());
+ TEMP_EXTENSION.copy(template_ +file_name_.length(),TEMP_EXTENSION.length());
+
+ int const fd(mkstemp(template_));
+ if (fd < 0)
+ {
+ std::cerr << "Reporter could not open temporary file `" << template_
+ << "': " << strerror(errno) << " (" << errno << ")\n";
+ return;
+ }
+ ssize_t err(write(fd, str.c_str(), str.length()));
+ close(fd);
+ if (err < 0)
+ {
+ std::cerr << "Could not write " << str.length()
+ << " bytes to temporary file '"
+ << template_ << "': " << strerror(errno)
+ << " (" << errno << ")\n";
+ return;
+ }
+
+ rename(template_, file_name_.c_str());
+}
+
+void
+wsrep::reporter::report_state(enum server_state::state const s)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+
+ substates const state(substate_map(s));
+
+ if (state != state_)
+ {
+ state_ = state;
+
+ if (state_ == s_synced_running)
+ progress_ = steady_state;
+ else
+ progress_ = indefinite_progress;
+
+ write_file(timestamp());
+ }
+}
+
+void
+wsrep::reporter::report_progress(const std::string& json)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+
+ if (json != progress_)
+ {
+ if (state_ != s_synced_running)
+ {
+ // ignore any progress in SYNCED state
+ progress_ = json;
+ write_file(timestamp());
+ }
+ }
+}
+
+void
+wsrep::reporter::report_event(const std::string& json)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ if (events_.size() == max_msg_)
+ {
+ events_.pop_front();
+ }
+ events_.push_back({timestamp(), json});
+ write_file(timestamp());
+}
+
+void
+wsrep::reporter::report_log_msg(log_level const lvl,
+ const std::string& msg,
+ double tstamp)
+{
+ std::deque<log_msg>& deque(lvl == error ? err_msg_ : warn_msg_);
+
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+
+ if (deque.empty() || msg != deque.back().msg)
+ {
+ if (deque.size() == max_msg_) deque.pop_front();
+
+ if (tstamp <= undefined) tstamp = timestamp();
+
+ /* Log messages are not expected to be json formatted, so we escape
+ the message strings here to keep the report file well formatted. */
+ log_msg entry({tstamp, escape_json(msg)});
+ deque.push_back(entry);
+ write_file(tstamp);
+ }
+}
diff --git a/wsrep-lib/src/seqno.cpp b/wsrep-lib/src/seqno.cpp
new file mode 100644
index 00000000..19ff567d
--- /dev/null
+++ b/wsrep-lib/src/seqno.cpp
@@ -0,0 +1,26 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/seqno.hpp"
+#include <ostream>
+
+std::ostream& wsrep::operator<<(std::ostream& os, wsrep::seqno seqno)
+{
+ return (os << seqno.get());
+}
diff --git a/wsrep-lib/src/server_state.cpp b/wsrep-lib/src/server_state.cpp
new file mode 100644
index 00000000..2fc9b199
--- /dev/null
+++ b/wsrep-lib/src/server_state.cpp
@@ -0,0 +1,1654 @@
+/*
+ * Copyright (C) 2018-2023 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/server_state.hpp"
+#include "wsrep/client_state.hpp"
+#include "wsrep/server_service.hpp"
+#include "wsrep/client_service.hpp"
+#include "wsrep/high_priority_service.hpp"
+#include "wsrep/transaction.hpp"
+#include "wsrep/view.hpp"
+#include "wsrep/logger.hpp"
+#include "wsrep/compiler.hpp"
+#include "wsrep/id.hpp"
+
+#include <cassert>
+#include <sstream>
+#include <algorithm>
+
+
+//////////////////////////////////////////////////////////////////////////////
+// Helpers //
+//////////////////////////////////////////////////////////////////////////////
+
+
+//
+// This method is used to deal with historical burden of several
+// ways to bootstrap the cluster. Bootstrap happens if
+//
+// * bootstrap option is given
+// * cluster_address is "gcomm://" (Galera provider)
+//
+static bool is_bootstrap(const std::string& cluster_address, bool bootstrap)
+{
+ return (bootstrap || cluster_address == "gcomm://");
+}
+
+// Helper method to provide detailed error message if transaction
+// adopt for fragment removal fails.
+static void log_adopt_error(const wsrep::transaction& transaction)
+{
+ wsrep::log_warning() << "Adopting a transaction ("
+ << transaction.server_id() << "," << transaction.id()
+ << ") for rollback failed, "
+ << "this may leave stale entries to streaming log "
+ << "which may need to be removed manually.";
+}
+
+// resolve which of the two errors return to caller
+static inline int resolve_return_error(bool const vote,
+ int const vote_err,
+ int const apply_err)
+{
+ if (vote) return vote_err;
+ return vote_err != 0 ? vote_err : apply_err;
+}
+
+static void
+discard_streaming_applier(wsrep::server_state& server_state,
+ wsrep::high_priority_service& high_priority_service,
+ wsrep::high_priority_service* streaming_applier,
+ const wsrep::ws_meta& ws_meta)
+{
+ server_state.stop_streaming_applier(
+ ws_meta.server_id(), ws_meta.transaction_id());
+ server_state.server_service().release_high_priority_service(
+ streaming_applier);
+ high_priority_service.store_globals();
+}
+
+static int apply_fragment(wsrep::server_state& server_state,
+ wsrep::high_priority_service& high_priority_service,
+ wsrep::high_priority_service* streaming_applier,
+ const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta,
+ const wsrep::const_buffer& data)
+{
+ int ret(0);
+ int apply_err;
+ wsrep::mutable_buffer err;
+ {
+ wsrep::high_priority_switch sw(high_priority_service,
+ *streaming_applier);
+ apply_err = streaming_applier->apply_write_set(ws_meta, data, err);
+ if (!apply_err)
+ {
+ assert(err.size() == 0);
+ streaming_applier->after_apply();
+ }
+ else
+ {
+ bool const remove_fragments(streaming_applier->transaction(
+ ).streaming_context().fragments().size() > 0);
+ ret = streaming_applier->rollback(ws_handle, ws_meta);
+ ret = ret || (streaming_applier->after_apply(), 0);
+
+ if (remove_fragments)
+ {
+ ret = ret || streaming_applier->start_transaction(ws_handle,
+ ws_meta);
+ ret = ret || (streaming_applier->adopt_apply_error(err), 0);
+ ret = ret || streaming_applier->remove_fragments(ws_meta);
+ ret = ret || streaming_applier->commit(ws_handle, ws_meta);
+ ret = ret || (streaming_applier->after_apply(), 0);
+ }
+ else
+ {
+ ret = streaming_applier->log_dummy_write_set(ws_handle,
+ ws_meta, err);
+ }
+ }
+ }
+
+ if (!ret)
+ {
+ if (!apply_err)
+ {
+ high_priority_service.debug_crash("crash_apply_cb_before_append_frag");
+ const wsrep::xid xid(streaming_applier->transaction().xid());
+ ret = high_priority_service.append_fragment_and_commit(
+ ws_handle, ws_meta, data, xid);
+ high_priority_service.debug_crash("crash_apply_cb_after_append_frag");
+ ret = ret || (high_priority_service.after_apply(), 0);
+ }
+ else
+ {
+ discard_streaming_applier(server_state,
+ high_priority_service,
+ streaming_applier,
+ ws_meta);
+ ret = resolve_return_error(err.size() > 0, ret, apply_err);
+ }
+ }
+
+ return ret;
+}
+
+static int commit_fragment(wsrep::server_state& server_state,
+ wsrep::high_priority_service& high_priority_service,
+ wsrep::high_priority_service* streaming_applier,
+ const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta,
+ const wsrep::const_buffer& data)
+{
+ int ret(0);
+ {
+ wsrep::high_priority_switch sw(
+ high_priority_service, *streaming_applier);
+ wsrep::mutable_buffer err;
+ int const apply_err(
+ streaming_applier->apply_write_set(ws_meta, data, err));
+ if (apply_err)
+ {
+ assert(streaming_applier->transaction(
+ ).streaming_context().fragments().size() > 0);
+ ret = streaming_applier->rollback(ws_handle, ws_meta);
+ ret = ret || (streaming_applier->after_apply(), 0);
+ ret = ret || streaming_applier->start_transaction(
+ ws_handle, ws_meta);
+ ret = ret || (streaming_applier->adopt_apply_error(err),0);
+ }
+ else
+ {
+ assert(err.size() == 0);
+ }
+
+ const wsrep::transaction& trx(streaming_applier->transaction());
+ // Fragment removal for XA is going to happen in after_commit
+ if (trx.state() != wsrep::transaction::s_prepared)
+ {
+ streaming_applier->debug_crash(
+ "crash_apply_cb_before_fragment_removal");
+
+ ret = ret || streaming_applier->remove_fragments(ws_meta);
+
+ streaming_applier->debug_crash(
+ "crash_apply_cb_after_fragment_removal");
+ }
+
+ streaming_applier->debug_crash(
+ "crash_commit_cb_before_last_fragment_commit");
+ ret = ret || streaming_applier->commit(ws_handle, ws_meta);
+ streaming_applier->debug_crash(
+ "crash_commit_cb_last_fragment_commit_success");
+ ret = ret || (streaming_applier->after_apply(), 0);
+ ret = resolve_return_error(err.size() > 0, ret, apply_err);
+ }
+
+ if (!ret)
+ {
+ discard_streaming_applier(server_state, high_priority_service,
+ streaming_applier, ws_meta);
+ }
+
+ return ret;
+}
+
+static int rollback_fragment(wsrep::server_state& server_state,
+ wsrep::high_priority_service& high_priority_service,
+ wsrep::high_priority_service* streaming_applier,
+ const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta)
+{
+ int ret(0);
+ int adopt_error(0);
+ bool const remove_fragments(streaming_applier->transaction().
+ streaming_context().fragments().size() > 0);
+ // If fragment removal is needed, adopt transaction state
+ // and start a transaction for it.
+ if (remove_fragments &&
+ (adopt_error = high_priority_service.adopt_transaction(
+ streaming_applier->transaction())))
+ {
+ log_adopt_error(streaming_applier->transaction());
+ }
+ // Even if the adopt above fails we roll back the streaming transaction.
+ // Adopt failure will leave stale entries in streaming log which can
+ // be removed manually.
+ wsrep::const_buffer no_error;
+ {
+ wsrep::high_priority_switch ws(
+ high_priority_service, *streaming_applier);
+ // Streaming applier rolls back out of order. Fragment
+ // removal grabs commit order below.
+ ret = streaming_applier->rollback(wsrep::ws_handle(), wsrep::ws_meta());
+ ret = ret || (streaming_applier->after_apply(), 0);
+ }
+
+ if (!ret)
+ {
+ discard_streaming_applier(server_state, high_priority_service,
+ streaming_applier, ws_meta);
+
+ if (adopt_error == 0)
+ {
+ if (remove_fragments)
+ {
+ ret = high_priority_service.remove_fragments(ws_meta);
+ ret = ret || high_priority_service.commit(ws_handle, ws_meta);
+ if (ret)
+ {
+ high_priority_service.rollback(ws_handle, ws_meta);
+ }
+ high_priority_service.after_apply();
+ }
+ else
+ {
+ if (ws_meta.ordered())
+ {
+ wsrep::mutable_buffer no_error;
+ ret = high_priority_service.log_dummy_write_set(
+ ws_handle, ws_meta, no_error);
+ }
+ }
+ }
+ }
+ return ret;
+}
+
+static int apply_write_set(wsrep::server_state& server_state,
+ wsrep::high_priority_service& high_priority_service,
+ const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta,
+ const wsrep::const_buffer& data)
+{
+ int ret(0);
+ if (wsrep::rolls_back_transaction(ws_meta.flags()))
+ {
+ wsrep::mutable_buffer no_error;
+ if (wsrep::starts_transaction(ws_meta.flags()))
+ {
+ // No transaction existed before, log a dummy write set
+ ret = high_priority_service.log_dummy_write_set(
+ ws_handle, ws_meta, no_error);
+ }
+ else
+ {
+ wsrep::high_priority_service* sa(
+ server_state.find_streaming_applier(
+ ws_meta.server_id(), ws_meta.transaction_id()));
+ if (sa == 0)
+ {
+ // It is a known limitation that galera provider
+ // cannot always determine if certification test
+ // for interrupted transaction will pass or fail
+ // (see comments in transaction::certify_fragment()).
+ // As a consequence, unnecessary rollback fragments
+ // may be delivered here. The message below has
+ // been intentionally turned into a debug message,
+ // rather than warning.
+ WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
+ wsrep::log::debug_level_server_state,
+ "Could not find applier context for "
+ << ws_meta.server_id()
+ << ": " << ws_meta.transaction_id());
+ ret = high_priority_service.log_dummy_write_set(
+ ws_handle, ws_meta, no_error);
+ }
+ else
+ {
+ // rollback_fragment() consumes sa
+ ret = rollback_fragment(server_state,
+ high_priority_service,
+ sa,
+ ws_handle,
+ ws_meta);
+ }
+ }
+ }
+ else if (wsrep::starts_transaction(ws_meta.flags()) &&
+ wsrep::commits_transaction(ws_meta.flags()))
+ {
+ ret = high_priority_service.start_transaction(ws_handle, ws_meta);
+ if (!ret)
+ {
+ wsrep::mutable_buffer err;
+ int const apply_err(high_priority_service.apply_write_set(
+ ws_meta, data, err));
+ if (!apply_err)
+ {
+ assert(err.size() == 0);
+ ret = high_priority_service.commit(ws_handle, ws_meta);
+ ret = ret || (high_priority_service.after_apply(), 0);
+ }
+ else
+ {
+ ret = high_priority_service.rollback(ws_handle, ws_meta);
+ ret = ret || (high_priority_service.after_apply(), 0);
+ ret = ret || high_priority_service.log_dummy_write_set(
+ ws_handle, ws_meta, err);
+ ret = resolve_return_error(err.size() > 0, ret, apply_err);
+ }
+ }
+ }
+ else if (wsrep::starts_transaction(ws_meta.flags()))
+ {
+ assert(server_state.find_streaming_applier(
+ ws_meta.server_id(), ws_meta.transaction_id()) == 0);
+ wsrep::high_priority_service* sa(
+ server_state.server_service().streaming_applier_service(
+ high_priority_service));
+ server_state.start_streaming_applier(
+ ws_meta.server_id(), ws_meta.transaction_id(), sa);
+ sa->start_transaction(ws_handle, ws_meta);
+ ret = apply_fragment(server_state,
+ high_priority_service,
+ sa,
+ ws_handle,
+ ws_meta,
+ data);
+ }
+ else if (ws_meta.flags() == 0 || ws_meta.flags() == wsrep::provider::flag::pa_unsafe ||
+ wsrep::prepares_transaction(ws_meta.flags()))
+ {
+ wsrep::high_priority_service* sa(
+ server_state.find_streaming_applier(
+ ws_meta.server_id(), ws_meta.transaction_id()));
+ if (sa == 0)
+ {
+ // It is possible that rapid group membership changes
+ // may cause streaming transaction be rolled back before
+ // commit fragment comes in. Although this is a valid
+ // situation, log a warning if a sac cannot be found as
+ // it may be an indication of a bug too.
+ wsrep::log_warning() << "Could not find applier context for "
+ << ws_meta.server_id()
+ << ": " << ws_meta.transaction_id();
+ wsrep::mutable_buffer no_error;
+ ret = high_priority_service.log_dummy_write_set(
+ ws_handle, ws_meta, no_error);
+ }
+ else
+ {
+ sa->next_fragment(ws_meta);
+ ret = apply_fragment(server_state,
+ high_priority_service,
+ sa,
+ ws_handle,
+ ws_meta,
+ data);
+ }
+ }
+ else if (wsrep::commits_transaction(ws_meta.flags()))
+ {
+ if (high_priority_service.is_replaying())
+ {
+ wsrep::mutable_buffer unused;
+ ret = high_priority_service.start_transaction(
+ ws_handle, ws_meta) ||
+ high_priority_service.apply_write_set(ws_meta, data, unused) ||
+ high_priority_service.commit(ws_handle, ws_meta);
+ }
+ else
+ {
+ wsrep::high_priority_service* sa(
+ server_state.find_streaming_applier(
+ ws_meta.server_id(), ws_meta.transaction_id()));
+ if (sa == 0)
+ {
+ // It is possible that rapid group membership changes
+ // may cause streaming transaction be rolled back before
+ // commit fragment comes in. Although this is a valid
+ // situation, log a warning if a sac cannot be found as
+ // it may be an indication of a bug too.
+ wsrep::log_warning()
+ << "Could not find applier context for "
+ << ws_meta.server_id()
+ << ": " << ws_meta.transaction_id();
+ wsrep::mutable_buffer no_error;
+ ret = high_priority_service.log_dummy_write_set(
+ ws_handle, ws_meta, no_error);
+ }
+ else
+ {
+ // Commit fragment consumes sa
+ sa->next_fragment(ws_meta);
+ ret = commit_fragment(server_state,
+ high_priority_service,
+ sa,
+ ws_handle,
+ ws_meta,
+ data);
+ }
+ }
+ }
+ else
+ {
+ assert(0);
+ }
+ if (ret)
+ {
+ wsrep::log_error() << "Failed to apply write set: " << ws_meta;
+ }
+ return ret;
+}
+
+static int apply_toi(wsrep::provider& provider,
+ wsrep::high_priority_service& high_priority_service,
+ const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta,
+ const wsrep::const_buffer& data)
+{
+ if (wsrep::starts_transaction(ws_meta.flags()) &&
+ wsrep::commits_transaction(ws_meta.flags()))
+ {
+ //
+ // Regular TOI.
+ //
+ provider.commit_order_enter(ws_handle, ws_meta);
+ wsrep::mutable_buffer err;
+ int const apply_err(high_priority_service.apply_toi(ws_meta,data,err));
+ int const vote_err(provider.commit_order_leave(ws_handle, ws_meta,err));
+ return resolve_return_error(err.size() > 0, vote_err, apply_err);
+ }
+ else if (wsrep::starts_transaction(ws_meta.flags()))
+ {
+ provider.commit_order_enter(ws_handle, ws_meta);
+ wsrep::mutable_buffer err;
+ int const apply_err(high_priority_service.apply_nbo_begin(ws_meta, data, err));
+ int const vote_err(provider.commit_order_leave(ws_handle, ws_meta, err));
+ return resolve_return_error(err.size() > 0, vote_err, apply_err);
+ }
+ else if (wsrep::commits_transaction(ws_meta.flags()))
+ {
+ // NBO end event is ignored here, both local and applied
+ // have NBO end handled via local TOI calls.
+ provider.commit_order_enter(ws_handle, ws_meta);
+ wsrep::mutable_buffer err;
+ provider.commit_order_leave(ws_handle, ws_meta, err);
+ return 0;
+ }
+ else
+ {
+ assert(0);
+ return 0;
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// Server State //
+//////////////////////////////////////////////////////////////////////////////
+
+int wsrep::server_state::load_provider(
+ const std::string& provider_spec, const std::string& provider_options,
+ const wsrep::provider::services& services)
+{
+ wsrep::log_info() << "Loading provider " << provider_spec
+ << " initial position: " << initial_position_;
+
+ provider_ = wsrep::provider::make_provider(*this,
+ provider_spec,
+ provider_options,
+ services);
+ return (provider_ ? 0 : 1);
+}
+
+void wsrep::server_state::unload_provider()
+{
+ delete provider_;
+ provider_ = 0;
+}
+
+int wsrep::server_state::connect(const std::string& cluster_name,
+ const std::string& cluster_address,
+ const std::string& state_donor,
+ bool bootstrap)
+{
+ bootstrap_ = is_bootstrap(cluster_address, bootstrap);
+ wsrep::log_info() << "Connecting with bootstrap option: " << bootstrap_;
+ return provider().connect(cluster_name, cluster_address, state_donor,
+ bootstrap_);
+}
+
+int wsrep::server_state::disconnect()
+{
+ {
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ // In case of failure situations which are caused by provider
+ // being shut down some failing operation may also try to shut
+ // down the replication. Check the state here and
+ // return success if the provider disconnect is already in progress
+ // or has completed.
+ if (state(lock) == s_disconnecting || state(lock) == s_disconnected)
+ {
+ return 0;
+ }
+ state(lock, s_disconnecting);
+ interrupt_state_waiters(lock);
+ }
+ return provider().disconnect();
+}
+
+wsrep::server_state::~server_state()
+{
+ delete provider_;
+}
+
+std::vector<wsrep::provider::status_variable>
+wsrep::server_state::status() const
+{
+ return provider().status();
+}
+
+
+wsrep::seqno wsrep::server_state::pause()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ // Disallow concurrent calls to pause to in order to have non-concurrent
+ // access to desynced_on_pause_ which is checked in resume() call.
+ wsrep::log_info() << "pause";
+ while (pause_count_ > 0)
+ {
+ cond_.wait(lock);
+ }
+ ++pause_count_;
+ assert(pause_seqno_.is_undefined());
+ lock.unlock();
+ pause_seqno_ = provider().pause();
+ lock.lock();
+ if (pause_seqno_.is_undefined())
+ {
+ --pause_count_;
+ }
+ return pause_seqno_;
+}
+
+void wsrep::server_state::resume()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ wsrep::log_info() << "resume";
+ assert(pause_seqno_.is_undefined() == false);
+ assert(pause_count_ == 1);
+ if (provider().resume())
+ {
+ throw wsrep::runtime_error("Failed to resume provider");
+ }
+ pause_seqno_ = wsrep::seqno::undefined();
+ --pause_count_;
+ cond_.notify_all();
+}
+
+wsrep::seqno wsrep::server_state::desync_and_pause()
+{
+ wsrep::log_info() << "Desyncing and pausing the provider";
+ // Temporary variable to store desync() return status. This will be
+ // assigned to desynced_on_pause_ after pause() call to prevent
+ // concurrent access to member variable desynced_on_pause_.
+ bool desync_successful;
+ if (desync())
+ {
+ // Desync may give transient error if the provider cannot
+ // communicate with the rest of the cluster. However, this
+ // error can be tolerated because if the provider can be
+ // paused successfully below.
+ WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
+ wsrep::log::debug_level_server_state,
+ "Failed to desync server before pause");
+ desync_successful = false;
+ }
+ else
+ {
+ desync_successful = true;
+ }
+ wsrep::seqno ret(pause());
+ if (ret.is_undefined())
+ {
+ wsrep::log_warning() << "Failed to pause provider";
+ resync();
+ return wsrep::seqno::undefined();
+ }
+ else
+ {
+ desynced_on_pause_ = desync_successful;
+ }
+ wsrep::log_info() << "Provider paused at: " << ret;
+ return ret;
+}
+
+void wsrep::server_state::resume_and_resync()
+{
+ wsrep::log_info() << "Resuming and resyncing the provider";
+ try
+ {
+ // Assign desynced_on_pause_ to local variable before resuming
+ // in order to avoid concurrent access to desynced_on_pause_ member
+ // variable.
+ bool do_resync = desynced_on_pause_;
+ desynced_on_pause_ = false;
+ resume();
+ if (do_resync)
+ {
+ resync();
+ }
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ wsrep::log_warning()
+ << "Resume and resync failed, server may have to be restarted";
+ }
+}
+
+std::string wsrep::server_state::prepare_for_sst()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ state(lock, s_joiner);
+ lock.unlock();
+ return server_service_.sst_request();
+}
+
+int wsrep::server_state::start_sst(const std::string& sst_request,
+ const wsrep::gtid& gtid,
+ bool bypass)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ state(lock, s_donor);
+ int ret(0);
+ lock.unlock();
+ if (server_service_.start_sst(sst_request, gtid, bypass))
+ {
+ lock.lock();
+ wsrep::log_warning() << "SST preparation failed";
+ return_from_donor_state(lock);
+ ret = 1;
+ }
+ return ret;
+}
+
+void wsrep::server_state::sst_sent(const wsrep::gtid& gtid, int error)
+{
+ if (0 == error)
+ wsrep::log_info() << "SST sent: " << gtid;
+ else
+ wsrep::log_info() << "SST sending failed: " << error;
+
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+
+ return_from_donor_state(lock);
+
+ lock.unlock();
+ enum provider::status const retval(provider().sst_sent(gtid, error));
+ if (retval != provider::success)
+ {
+ std::string msg("wsrep::sst_sent() returned an error: ");
+ msg += wsrep::provider::to_string(retval);
+ server_service_.log_message(wsrep::log::warning, msg.c_str());
+ }
+}
+
+int wsrep::server_state::sst_received(wsrep::client_service& cs,
+ int const error)
+try
+{
+ wsrep::log_info() << "SST received";
+ wsrep::gtid gtid(wsrep::gtid::undefined());
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(state_ == s_joiner || state_ == s_initialized);
+
+ // Run initialization only if the SST was successful.
+ // In case of SST failure the system is in undefined state
+ // may not be recoverable.
+ if (error == 0)
+ {
+ if (server_service_.sst_before_init())
+ {
+ if (init_initialized_ == false)
+ {
+ state(lock, s_initializing);
+ lock.unlock();
+ server_service_.debug_sync("on_view_wait_initialized");
+ lock.lock();
+ wait_until_state(lock, s_initialized);
+ assert(init_initialized_);
+ }
+ }
+ lock.unlock();
+
+ if (id_.is_undefined())
+ {
+ assert(0);
+ throw wsrep::runtime_error(
+ "wsrep::sst_received() called before connection to cluster");
+ }
+
+ gtid = server_service_.get_position(cs);
+ wsrep::log_info() << "Recovered position from storage: " << gtid;
+
+ lock.lock();
+ if (gtid.seqno() >= connected_gtid().seqno())
+ {
+ /* Now the node has all the data the cluster has: part in
+ * storage, part in replication event queue. */
+ state(lock, s_joined);
+ }
+ lock.unlock();
+
+ wsrep::view const v(server_service_.get_view(cs, id_));
+ wsrep::log_info() << "Recovered view from SST:\n" << v;
+
+ /*
+ * If the state id from recovered view has undefined ID, we may
+ * be upgrading from earlier version which does not provide
+ * view stored in stable storage. In this case we skip
+ * sanity checks and assigning the current view and wait
+ * until the first view delivery.
+ */
+ if (v.state_id().id().is_undefined() == false)
+ {
+ if (v.state_id().id() != gtid.id() ||
+ v.state_id().seqno() > gtid.seqno())
+ {
+ /* Since IN GENERAL we may not be able to recover SST GTID from
+ * the state data, we have to rely on SST script passing the
+ * GTID value explicitly.
+ * Here we check if the passed GTID makes any sense: it should
+ * have the same UUID and greater or equal seqno than the last
+ * logged view. */
+ std::ostringstream msg;
+ msg << "SST script passed bogus GTID: " << gtid
+ << ". Preceding view GTID: " << v.state_id();
+ throw wsrep::runtime_error(msg.str());
+ }
+
+ if (current_view_.status() == wsrep::view::primary)
+ {
+ previous_primary_view_ = current_view_;
+ }
+ current_view_ = v;
+ server_service_.log_view(NULL /* this view is stored already */, v);
+ }
+ else
+ {
+ wsrep::log_warning()
+ << "View recovered from stable storage was empty. If the "
+ << "server is doing rolling upgrade from previous version "
+ << "which does not support storing view info into stable "
+ << "storage, this is ok. Otherwise this may be a sign of "
+ << "malfunction.";
+ }
+ lock.lock();
+ recover_streaming_appliers_if_not_recovered(lock, cs);
+ lock.unlock();
+ }
+
+ enum provider::status const retval(provider().sst_received(gtid, error));
+ if (retval != provider::success)
+ {
+ wsrep::log_error() << "provider.sst_received() failed: "
+ << wsrep::provider::to_string(retval);
+ return 1;
+ }
+ return 0;
+}
+catch (const wsrep::runtime_error& e)
+{
+ wsrep::log_error() << "sst_received failed: " << e.what();
+ if (provider_)
+ {
+ provider_->sst_received(wsrep::gtid::undefined(), -EINTR);
+ }
+ return 1;
+}
+
+void wsrep::server_state::initialized()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ wsrep::log_info() << "Server initialized";
+ init_initialized_ = true;
+ if (server_service_.sst_before_init())
+ {
+ state(lock, s_initialized);
+ }
+ else
+ {
+ state(lock, s_initializing);
+ state(lock, s_initialized);
+ }
+}
+
+enum wsrep::provider::status
+wsrep::server_state::wait_for_gtid(const wsrep::gtid& gtid, int timeout)
+ const
+{
+ return provider().wait_for_gtid(gtid, timeout);
+}
+
+int
+wsrep::server_state::set_encryption_key(std::vector<unsigned char>& key)
+{
+ encryption_key_ = key;
+ if (provider_)
+ {
+ wsrep::const_buffer const key(encryption_key_.data(),
+ encryption_key_.size());
+ enum provider::status const retval(provider_->enc_set_key(key));
+ if (retval != provider::success)
+ {
+ wsrep::log_error() << "Failed to set encryption key: "
+ << provider::to_string(retval);
+ return 1;
+ }
+ }
+ return 0;
+}
+
+std::pair<wsrep::gtid, enum wsrep::provider::status>
+wsrep::server_state::causal_read(int timeout) const
+{
+ return provider().causal_read(timeout);
+}
+
+void wsrep::server_state::on_connect(const wsrep::view& view)
+{
+ // Sanity checks
+ if (view.own_index() < 0 ||
+ size_t(view.own_index()) >= view.members().size())
+ {
+ std::ostringstream os;
+ os << "Invalid view on connect: own index out of range: " << view;
+#ifndef NDEBUG
+ wsrep::log_error() << os.str();
+ assert(0);
+#endif
+ throw wsrep::runtime_error(os.str());
+ }
+
+ const size_t own_index(static_cast<size_t>(view.own_index()));
+ if (id_.is_undefined() == false && id_ != view.members()[own_index].id())
+ {
+ std::ostringstream os;
+ os << "Connection in connected state.\n"
+ << "Connected view:\n" << view
+ << "Previous view:\n" << current_view_
+ << "Current own ID: " << id_;
+#ifndef NDEBUG
+ wsrep::log_error() << os.str();
+ assert(0);
+#endif
+ throw wsrep::runtime_error(os.str());
+ }
+ else
+ {
+ id_ = view.members()[own_index].id();
+ }
+
+ wsrep::log_info() << "Server "
+ << name_
+ << " connected to cluster at position "
+ << view.state_id()
+ << " with ID "
+ << id_;
+
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ connected_gtid_ = view.state_id();
+ state(lock, s_connected);
+}
+
+void wsrep::server_state::on_primary_view(
+ const wsrep::view& view,
+ wsrep::high_priority_service* high_priority_service)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ assert(view.final() == false);
+ //
+ // Reached primary from connected state. This may mean the following
+ //
+ // 1) Server was joined to the cluster and got SST transfer
+ // 2) Server was partitioned from the cluster and got back
+ // 3) A new cluster was bootstrapped from non-prim cluster
+ //
+ // There is no enough information here what was the cause
+ // of the primary component, so we need to walk through
+ // all states leading to joined to notify possible state
+ // waiters in other threads.
+ //
+ if (server_service_.sst_before_init())
+ {
+ if (state_ == s_connected)
+ {
+ state(lock, s_joiner);
+ // We need to assign init_initialized_ here to local
+ // variable. If the value here was false, we need to skip
+ // the initializing -> initialized -> joined state cycle
+ // below. However, if we don't assign the value to
+ // local, it is possible that the main thread gets control
+ // between changing the state to initializing and checking
+ // initialized flag, which may cause the initialzing -> initialized
+ // state change to be executed even if it should not be.
+ const bool was_initialized(init_initialized_);
+ state(lock, s_initializing);
+ if (was_initialized)
+ {
+ // If server side has already been initialized,
+ // skip directly to s_joined.
+ state(lock, s_initialized);
+ }
+ }
+ }
+ else
+ {
+ if (state_ == s_connected)
+ {
+ state(lock, s_joiner);
+ }
+ }
+ if (init_initialized_ == false)
+ {
+ lock.unlock();
+ server_service_.debug_sync("on_view_wait_initialized");
+ lock.lock();
+ wait_until_state(lock, s_initialized);
+ }
+ assert(init_initialized_);
+
+ if (bootstrap_)
+ {
+ server_service_.bootstrap();
+ bootstrap_ = false;
+ }
+
+ assert(high_priority_service);
+
+ if (high_priority_service)
+ {
+ recover_streaming_appliers_if_not_recovered(lock,
+ *high_priority_service);
+ close_orphaned_sr_transactions(lock, *high_priority_service);
+ }
+
+ if (state(lock) < s_joined &&
+ view.state_id().seqno() >= connected_gtid().seqno())
+ {
+ // If we progressed beyond connected seqno, it means we have full state
+ state(lock, s_joined);
+ }
+}
+
+void wsrep::server_state::on_non_primary_view(
+ const wsrep::view& view,
+ wsrep::high_priority_service* high_priority_service)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ wsrep::log_info() << "Non-primary view";
+ if (view.final())
+ {
+ go_final(lock, view, high_priority_service);
+ }
+ else if (state_ != s_disconnecting)
+ {
+ state(lock, s_connected);
+ }
+}
+
+void wsrep::server_state::go_final(wsrep::unique_lock<wsrep::mutex>& lock,
+ const wsrep::view& view,
+ wsrep::high_priority_service* hps)
+{
+ (void)view; // avoid compiler warning "unused parameter 'view'"
+ assert(view.final());
+ assert(hps);
+ if (hps)
+ {
+ close_transactions_at_disconnect(*hps);
+ }
+ state(lock, s_disconnected);
+ id_ = wsrep::id::undefined();
+}
+
+void wsrep::server_state::on_view(const wsrep::view& view,
+ wsrep::high_priority_service* high_priority_service)
+{
+ wsrep::log_info()
+ << "================================================\nView:\n"
+ << view
+ << "=================================================";
+ if (current_view_.status() == wsrep::view::primary)
+ {
+ previous_primary_view_ = current_view_;
+ }
+ current_view_ = view;
+ switch (view.status())
+ {
+ case wsrep::view::primary:
+ on_primary_view(view, high_priority_service);
+ break;
+ case wsrep::view::non_primary:
+ on_non_primary_view(view, high_priority_service);
+ break;
+ case wsrep::view::disconnected:
+ {
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ go_final(lock, view, high_priority_service);
+ break;
+ }
+ default:
+ wsrep::log_warning() << "Unrecognized view status: " << view.status();
+ assert(0);
+ }
+
+ server_service_.log_view(high_priority_service, view);
+}
+
+void wsrep::server_state::on_sync()
+{
+ wsrep::log_info() << "Server " << name_ << " synced with group";
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+
+ // Initial sync
+ if (server_service_.sst_before_init() && init_synced_ == false)
+ {
+ switch (state_)
+ {
+ case s_synced:
+ break;
+ case s_connected: // Seed node path: provider becomes
+ state(lock, s_joiner); // synced with itself before anything
+ WSREP_FALLTHROUGH; // else. Then goes DB initialization.
+ case s_joiner: // |
+ state(lock, s_initializing); // V
+ break;
+ case s_donor:
+ assert(false); // this should never happen
+ state(lock, s_joined);
+ state(lock, s_synced);
+ break;
+ case s_initialized:
+ state(lock, s_joined);
+ WSREP_FALLTHROUGH;
+ default:
+ /* State */
+ state(lock, s_synced);
+ };
+ }
+ else
+ {
+ // Calls to on_sync() in synced state are possible if
+ // server desyncs itself from the group. Provider does not
+ // inform about this through callbacks.
+ if (state_ != s_synced)
+ {
+ state(lock, s_synced);
+ }
+ }
+ init_synced_ = true;
+
+ enum wsrep::provider::status status(send_pending_rollback_events(lock));
+ if (status)
+ {
+ // TODO should be retried?
+ wsrep::log_warning()
+ << "Failed to flush rollback event cache: " << status;
+ }
+}
+
+int wsrep::server_state::on_apply(
+ wsrep::high_priority_service& high_priority_service,
+ const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta,
+ const wsrep::const_buffer& data)
+{
+ if (is_toi(ws_meta.flags()))
+ {
+ return apply_toi(provider(), high_priority_service,
+ ws_handle, ws_meta, data);
+ }
+ else if (is_commutative(ws_meta.flags()) || is_native(ws_meta.flags()))
+ {
+ // Not implemented yet.
+ assert(0);
+ return 0;
+ }
+ else
+ {
+ return apply_write_set(*this, high_priority_service,
+ ws_handle, ws_meta, data);
+ }
+}
+
+enum wsrep::server_state::state wsrep::server_state::state(
+ wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED) const
+{
+ assert(lock.owns_lock());
+ return state_;
+}
+
+void wsrep::server_state::start_streaming_client(
+ wsrep::client_state* client_state)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
+ wsrep::log::debug_level_server_state,
+ "Start streaming client: " << client_state->id());
+ if (streaming_clients_.insert(
+ std::make_pair(client_state->id(), client_state)).second == false)
+ {
+ wsrep::log_warning() << "Failed to insert streaming client "
+ << client_state->id();
+ assert(0);
+ }
+}
+
+void wsrep::server_state::convert_streaming_client_to_applier(
+ wsrep::client_state* client_state)
+{
+ // create streaming_applier beforehand as server_state lock should
+ // not be held when calling server_service methods
+ wsrep::high_priority_service* streaming_applier(
+ server_service_.streaming_applier_service(
+ client_state->client_service()));
+
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
+ wsrep::log::debug_level_server_state,
+ "Convert streaming client to applier "
+ << client_state->id());
+ streaming_clients_map::iterator i(
+ streaming_clients_.find(client_state->id()));
+ assert(i != streaming_clients_.end());
+ if (i == streaming_clients_.end())
+ {
+ wsrep::log_warning() << "Unable to find streaming client "
+ << client_state->id();
+ assert(0);
+ }
+ else
+ {
+ streaming_clients_.erase(i);
+ }
+
+ // Convert to applier only if the state is not disconnected. In
+ // disconnected state the applier map is supposed to be empty
+ // and it will be reconstructed from fragment storage when
+ // joining back to cluster.
+ if (state(lock) != s_disconnected)
+ {
+ if (streaming_applier->adopt_transaction(client_state->transaction()))
+ {
+ log_adopt_error(client_state->transaction());
+ streaming_applier->after_apply();
+ server_service_.release_high_priority_service(streaming_applier);
+ return;
+ }
+ if (streaming_appliers_.insert(
+ std::make_pair(
+ std::make_pair(client_state->transaction().server_id(),
+ client_state->transaction().id()),
+ streaming_applier)).second == false)
+ {
+ wsrep::log_warning() << "Could not insert streaming applier "
+ << id_
+ << ", "
+ << client_state->transaction().id();
+ assert(0);
+ }
+ }
+ else
+ {
+ server_service_.release_high_priority_service(streaming_applier);
+ client_state->client_service().store_globals();
+ }
+}
+
+
+void wsrep::server_state::stop_streaming_client(
+ wsrep::client_state* client_state)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
+ wsrep::log::debug_level_server_state,
+ "Stop streaming client: " << client_state->id());
+ streaming_clients_map::iterator i(
+ streaming_clients_.find(client_state->id()));
+ assert(i != streaming_clients_.end());
+ if (i == streaming_clients_.end())
+ {
+ wsrep::log_warning() << "Unable to find streaming client "
+ << client_state->id();
+ assert(0);
+ return;
+ }
+ else
+ {
+ streaming_clients_.erase(i);
+ cond_.notify_all();
+ }
+}
+
+void wsrep::server_state::start_streaming_applier(
+ const wsrep::id& server_id,
+ const wsrep::transaction_id& transaction_id,
+ wsrep::high_priority_service* sa)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ if (streaming_appliers_.insert(
+ std::make_pair(std::make_pair(server_id, transaction_id),
+ sa)).second == false)
+ {
+ wsrep::log_error() << "Could not insert streaming applier";
+ throw wsrep::fatal_error();
+ }
+}
+
+void wsrep::server_state::stop_streaming_applier(
+ const wsrep::id& server_id,
+ const wsrep::transaction_id& transaction_id)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ streaming_appliers_map::iterator i(
+ streaming_appliers_.find(std::make_pair(server_id, transaction_id)));
+ assert(i != streaming_appliers_.end());
+ if (i == streaming_appliers_.end())
+ {
+ wsrep::log_warning() << "Could not find streaming applier for "
+ << server_id << ":" << transaction_id;
+ }
+ else
+ {
+ streaming_appliers_.erase(i);
+ cond_.notify_all();
+ }
+}
+
+wsrep::high_priority_service* wsrep::server_state::find_streaming_applier(
+ const wsrep::id& server_id,
+ const wsrep::transaction_id& transaction_id) const
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ streaming_appliers_map::const_iterator i(
+ streaming_appliers_.find(std::make_pair(server_id, transaction_id)));
+ return (i == streaming_appliers_.end() ? 0 : i->second);
+}
+
+wsrep::high_priority_service* wsrep::server_state::find_streaming_applier(
+ const wsrep::xid& xid) const
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ streaming_appliers_map::const_iterator i(streaming_appliers_.begin());
+ while (i != streaming_appliers_.end())
+ {
+ wsrep::high_priority_service* sa(i->second);
+ if (sa->transaction().xid() == xid)
+ {
+ return sa;
+ }
+ i++;
+ }
+ return NULL;
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// Private //
+//////////////////////////////////////////////////////////////////////////////
+
+int wsrep::server_state::desync(wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ assert(lock.owns_lock());
+ ++desync_count_;
+ lock.unlock();
+ int ret(provider().desync());
+ lock.lock();
+ if (ret)
+ {
+ --desync_count_;
+ }
+ return ret;
+}
+
+void wsrep::server_state::resync(wsrep::unique_lock<wsrep::mutex>&
+ lock WSREP_UNUSED)
+{
+ assert(lock.owns_lock());
+ assert(desync_count_ > 0);
+ if (desync_count_ > 0)
+ {
+ --desync_count_;
+ if (provider().resync())
+ {
+ throw wsrep::runtime_error("Failed to resync");
+ }
+ }
+ else
+ {
+ wsrep::log_warning() << "desync_count " << desync_count_
+ << " on resync";
+ }
+}
+
+
+void wsrep::server_state::state(
+ wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
+ enum wsrep::server_state::state state)
+{
+ assert(lock.owns_lock());
+ static const char allowed[n_states_][n_states_] =
+ {
+ /* dis, ing, ized, cted, jer, jed, dor, sed, ding to/from */
+ { 0, 1, 0, 1, 0, 0, 0, 0, 0}, /* dis */
+ { 1, 0, 1, 0, 0, 0, 0, 0, 1}, /* ing */
+ { 1, 0, 0, 1, 0, 1, 0, 0, 1}, /* ized */
+ { 1, 0, 0, 1, 1, 0, 0, 1, 1}, /* cted */
+ { 1, 1, 0, 0, 0, 1, 0, 0, 1}, /* jer */
+ { 1, 0, 0, 1, 0, 0, 1, 1, 1}, /* jed */
+ { 1, 0, 0, 1, 0, 1, 0, 0, 1}, /* dor */
+ { 1, 0, 0, 1, 0, 1, 1, 0, 1}, /* sed */
+ { 1, 0, 0, 0, 0, 0, 0, 0, 0} /* ding */
+ };
+
+ if (allowed[state_][state] == false)
+ {
+ std::ostringstream os;
+ os << "server: " << name_ << " unallowed state transition: "
+ << wsrep::to_string(state_) << " -> " << wsrep::to_string(state);
+ wsrep::log_warning() << os.str() << "\n";
+ assert(0);
+ }
+
+ WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
+ wsrep::log::debug_level_server_state,
+ "server " << name_ << " state change: "
+ << to_c_string(state_) << " -> "
+ << to_c_string(state));
+ state_hist_.push_back(state_);
+ server_service_.log_state_change(state_, state);
+ state_ = state;
+ cond_.notify_all();
+ while (state_waiters_[state_])
+ {
+ cond_.wait(lock);
+ }
+}
+
+void wsrep::server_state::wait_until_state(
+ wsrep::unique_lock<wsrep::mutex>& lock,
+ enum wsrep::server_state::state state) const
+{
+ ++state_waiters_[state];
+ while (state_ != state)
+ {
+ cond_.wait(lock);
+ // If the waiter waits for any other state than disconnecting
+ // or disconnected and the state has been changed to disconnecting,
+ // this usually means that some error was encountered
+ if (state != s_disconnecting && state != s_disconnected
+ && state_ == s_disconnecting)
+ {
+ throw wsrep::runtime_error("State wait was interrupted");
+ }
+ }
+ --state_waiters_[state];
+ cond_.notify_all();
+}
+
+int wsrep::server_state::wait_until_state(enum state state) const
+try
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ wait_until_state(lock, state);
+ return 0;
+}
+catch (...)
+{
+ return 1;
+}
+
+void wsrep::server_state::interrupt_state_waiters(
+ wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED)
+{
+ assert(lock.owns_lock());
+ cond_.notify_all();
+}
+
+template <class C>
+void wsrep::server_state::recover_streaming_appliers_if_not_recovered(
+ wsrep::unique_lock<wsrep::mutex>& lock, C& c)
+{
+ assert(lock.owns_lock());
+ if (streaming_appliers_recovered_ == false)
+ {
+ lock.unlock();
+ server_service_.recover_streaming_appliers(c);
+ lock.lock();
+ }
+ streaming_appliers_recovered_ = true;
+}
+
+class transaction_state_cmp
+{
+public:
+ transaction_state_cmp(const enum wsrep::transaction::state s)
+ : state_(s) { }
+ bool operator()(const std::pair<wsrep::client_id,
+ wsrep::client_state*>& vt) const
+ {
+ return vt.second->transaction().state() == state_;
+ }
+private:
+ enum wsrep::transaction::state state_;
+};
+
+void wsrep::server_state::close_orphaned_sr_transactions(
+ wsrep::unique_lock<wsrep::mutex>& lock,
+ wsrep::high_priority_service& high_priority_service)
+{
+ assert(lock.owns_lock());
+
+ // When the originator of an SR transaction leaves the primary
+ // component of the cluster, that SR must be rolled back. When two
+ // consecutive primary views have the same membership, the system
+ // may have been in a state with no primary components.
+ // Example with 2 node cluster:
+ // - (1,2 primary)
+ // - (1 non-primary) and (2 non-primary)
+ // - (1,2 primary)
+ // We need to rollback SRs owned by both 1 and 2.
+ // Notice that since the introduction of rollback_event_queue_,
+ // checking for equal consecutive views is no longer needed.
+ // However, we must keep it here for the time being, for backwards
+ // compatibility.
+ const bool equal_consecutive_views =
+ current_view_.equal_membership(previous_primary_view_);
+
+ if (current_view_.own_index() == -1 || equal_consecutive_views)
+ {
+ streaming_clients_map::iterator i;
+ transaction_state_cmp prepared_state_cmp(wsrep::transaction::s_prepared);
+ while ((i = std::find_if_not(streaming_clients_.begin(),
+ streaming_clients_.end(),
+ prepared_state_cmp))
+ != streaming_clients_.end())
+ {
+ wsrep::client_id client_id(i->first);
+ wsrep::transaction_id transaction_id(i->second->transaction().id());
+ // It is safe to unlock the server state temporarily here.
+ // The processing happens inside view handler which is
+ // protected by the provider commit ordering critical
+ // section. The lock must be unlocked temporarily to
+ // allow converting the current client to streaming
+ // applier in transaction::streaming_rollback().
+ // The iterator i may be invalidated when the server state
+ // remains unlocked, so it should not be accessed after
+ // the bf abort call.
+ lock.unlock();
+ i->second->total_order_bf_abort(current_view_.view_seqno());
+ lock.lock();
+ streaming_clients_map::const_iterator found_i;
+ while ((found_i = streaming_clients_.find(client_id)) !=
+ streaming_clients_.end() &&
+ found_i->second->transaction().id() == transaction_id)
+ {
+ cond_.wait(lock);
+ }
+ }
+ }
+
+ streaming_appliers_map::iterator i(streaming_appliers_.begin());
+ while (i != streaming_appliers_.end())
+ {
+ wsrep::high_priority_service* streaming_applier(i->second);
+
+ // Rollback SR on equal consecutive primary views or if its
+ // originator is not in the current view.
+ // Transactions in prepared state must be committed or
+ // rolled back explicitly, those are never rolled back here.
+ if ((streaming_applier->transaction().state() !=
+ wsrep::transaction::s_prepared) &&
+ (equal_consecutive_views ||
+ not current_view_.is_member(
+ streaming_applier->transaction().server_id())))
+ {
+ WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
+ wsrep::log::debug_level_server_state,
+ "Removing SR fragments for "
+ << i->first.first
+ << ", " << i->first.second);
+ wsrep::id server_id(i->first.first);
+ wsrep::transaction_id transaction_id(i->first.second);
+ int adopt_error;
+ if ((adopt_error = high_priority_service.adopt_transaction(
+ streaming_applier->transaction())))
+ {
+ log_adopt_error(streaming_applier->transaction());
+ }
+ // Even if the transaction adopt above fails, we roll back
+ // the transaction. Adopt error will leave stale entries
+ // in the streaming log which can be removed manually.
+ {
+ wsrep::high_priority_switch sw(high_priority_service,
+ *streaming_applier);
+ streaming_applier->rollback(
+ wsrep::ws_handle(), wsrep::ws_meta());
+ streaming_applier->after_apply();
+ }
+
+ streaming_appliers_.erase(i++);
+ server_service_.release_high_priority_service(streaming_applier);
+ high_priority_service.store_globals();
+ wsrep::ws_meta ws_meta(
+ wsrep::gtid(),
+ wsrep::stid(server_id, transaction_id, wsrep::client_id()),
+ wsrep::seqno::undefined(), 0);
+ lock.unlock();
+ if (adopt_error == 0)
+ {
+ high_priority_service.remove_fragments(ws_meta);
+ high_priority_service.commit(wsrep::ws_handle(transaction_id, 0),
+ ws_meta);
+ }
+ high_priority_service.after_apply();
+ lock.lock();
+ }
+ else
+ {
+ ++i;
+ }
+ }
+}
+
+void wsrep::server_state::close_transactions_at_disconnect(
+ wsrep::high_priority_service& high_priority_service)
+{
+ // Close streaming applier without removing fragments
+ // from fragment storage. When the server is started again,
+ // it must be able to recover ongoing streaming transactions.
+ streaming_appliers_map::iterator i(streaming_appliers_.begin());
+ while (i != streaming_appliers_.end())
+ {
+ wsrep::high_priority_service* streaming_applier(i->second);
+ {
+ wsrep::high_priority_switch sw(high_priority_service,
+ *streaming_applier);
+ streaming_applier->rollback(
+ wsrep::ws_handle(), wsrep::ws_meta());
+ streaming_applier->after_apply();
+ }
+ streaming_appliers_.erase(i++);
+ server_service_.release_high_priority_service(streaming_applier);
+ high_priority_service.store_globals();
+ }
+ streaming_appliers_recovered_ = false;
+}
+
+//
+// Rollback event queue
+//
+
+void wsrep::server_state::queue_rollback_event(
+ const wsrep::transaction_id& id)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+#ifndef NDEBUG
+ // Make sure we don't have duplicate
+ // transaction ids in rollback event queue.
+ // There is no need to do this in release
+ // build given that caller (streaming_rollback())
+ // should avoid duplicates.
+ for (auto i : rollback_event_queue_)
+ {
+ assert(id != i);
+ }
+#endif
+ rollback_event_queue_.push_back(id);
+}
+
+enum wsrep::provider::status
+wsrep::server_state::send_pending_rollback_events(
+ wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED)
+{
+ assert(lock.owns_lock());
+ while (not rollback_event_queue_.empty())
+ {
+ const wsrep::transaction_id& id(rollback_event_queue_.front());
+ const enum wsrep::provider::status status(provider().rollback(id));
+ if (status)
+ {
+ return status;
+ }
+ rollback_event_queue_.pop_front();
+ }
+ return wsrep::provider::success;
+}
+
+enum wsrep::provider::status
+wsrep::server_state::send_pending_rollback_events()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ return send_pending_rollback_events(lock);
+}
+
+void wsrep::server_state::return_from_donor_state(
+ wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ assert(lock.owns_lock());
+ // v26 API does not have JOINED event, so in anticipation of SYNCED
+ // we must do it here. Do not modify the state if donor lost the
+ // donor state e.g. due to cluster partitioning.
+ if (state(lock) == s_donor)
+ {
+ state(lock, s_joined);
+ }
+}
diff --git a/wsrep-lib/src/service_helpers.hpp b/wsrep-lib/src/service_helpers.hpp
new file mode 100644
index 00000000..6e9f9ca3
--- /dev/null
+++ b/wsrep-lib/src/service_helpers.hpp
@@ -0,0 +1,106 @@
+/*
+ * Copyright (C) 2020 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_SERVICE_HELPERS_HPP
+#define WSREP_SERVICE_HELPERS_HPP
+
+#include "wsrep/logger.hpp"
+
+#include <dlfcn.h>
+#include <cerrno>
+
+namespace wsrep_impl
+{
+ template <typename InitFn>
+ int service_probe(void* dlh, const char* symbol, const char* service_name)
+ {
+ union {
+ InitFn dlfun;
+ void* obj;
+ } alias;
+ // Clear previous errors
+ (void)dlerror();
+ alias.obj = dlsym(dlh, symbol);
+ if (alias.obj)
+ {
+ return 0;
+ }
+ else
+ {
+ wsrep::log_warning() << "Support for " << service_name
+ << " " << symbol
+ << " not found from provider: "
+ << dlerror();
+ return ENOTSUP;
+ }
+ }
+
+
+ template <typename InitFn, typename ServiceCallbacks>
+ int service_init(void* dlh,
+ const char* symbol,
+ ServiceCallbacks service_callbacks,
+ const char* service_name)
+ {
+ union {
+ InitFn dlfun;
+ void* obj;
+ } alias;
+ // Clear previous errors
+ (void)dlerror();
+ alias.obj = dlsym(dlh, symbol);
+ if (alias.obj)
+ {
+ wsrep::log_info() << "Initializing " << service_name;
+ return (*alias.dlfun)(service_callbacks);
+ }
+ else
+ {
+ wsrep::log_info()
+ << "Provider does not support " << service_name;
+ return ENOTSUP;
+ }
+ }
+
+ template <typename DeinitFn>
+ void service_deinit(void* dlh, const char* symbol, const char* service_name)
+ {
+ union {
+ DeinitFn dlfun;
+ void* obj;
+ } alias;
+ // Clear previous errors
+ (void)dlerror();
+ alias.obj = dlsym(dlh, symbol);
+ if (alias.obj)
+ {
+ wsrep::log_info() << "Deinitializing " << service_name;
+ (*alias.dlfun)();
+ }
+ else
+ {
+ wsrep::log_info()
+ << "Provider does not support deinitializing of "
+ << service_name;
+ }
+ }
+}
+
+#endif // WSREP_SERVICE_HELPERS_HPP
+
diff --git a/wsrep-lib/src/sr_key_set.cpp b/wsrep-lib/src/sr_key_set.cpp
new file mode 100644
index 00000000..31b6e692
--- /dev/null
+++ b/wsrep-lib/src/sr_key_set.cpp
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2023 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/sr_key_set.hpp"
+
+#include "wsrep/key.hpp"
+
+#include <cassert>
+
+void wsrep::sr_key_set::insert(const wsrep::key& key)
+{
+ assert(key.size() >= 2);
+ if (key.size() < 2)
+ {
+ throw wsrep::runtime_error("Invalid key size");
+ }
+
+ root_[std::string(static_cast<const char*>(key.key_parts()[0].data()),
+ key.key_parts()[0].size())]
+ .insert(std::string(static_cast<const char*>(key.key_parts()[1].data()),
+ key.key_parts()[1].size()));
+}
+
+void wsrep::sr_key_set::clear()
+{
+ root_.clear();
+}
diff --git a/wsrep-lib/src/streaming_context.cpp b/wsrep-lib/src/streaming_context.cpp
new file mode 100644
index 00000000..c5423079
--- /dev/null
+++ b/wsrep-lib/src/streaming_context.cpp
@@ -0,0 +1,95 @@
+/*
+ * Copyright (C) 2023 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/streaming_context.hpp"
+
+#include <cassert>
+
+void wsrep::streaming_context::params(enum fragment_unit fragment_unit,
+ size_t fragment_size)
+{
+ if (fragment_size)
+ {
+ WSREP_LOG_DEBUG(
+ wsrep::log::debug_log_level(), wsrep::log::debug_level_streaming,
+ "Enabling streaming: " << fragment_unit << " " << fragment_size);
+ }
+ else
+ {
+ WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
+ wsrep::log::debug_level_streaming,
+ "Disabling streaming");
+ }
+ fragment_unit_ = fragment_unit;
+ fragment_size_ = fragment_size;
+ reset_unit_counter();
+}
+
+void wsrep::streaming_context::enable(enum fragment_unit fragment_unit,
+ size_t fragment_size)
+{
+ WSREP_LOG_DEBUG(
+ wsrep::log::debug_log_level(), wsrep::log::debug_level_streaming,
+ "Enabling streaming: " << fragment_unit << " " << fragment_size);
+ assert(fragment_size > 0);
+ fragment_unit_ = fragment_unit;
+ fragment_size_ = fragment_size;
+}
+
+void wsrep::streaming_context::disable()
+{
+ WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
+ wsrep::log::debug_level_streaming, "Disabling streaming");
+ fragment_size_ = 0;
+}
+
+void wsrep::streaming_context::stored(wsrep::seqno seqno)
+{
+ check_fragment_seqno(seqno);
+ fragments_.push_back(seqno);
+}
+
+void wsrep::streaming_context::applied(wsrep::seqno seqno)
+{
+ check_fragment_seqno(seqno);
+ ++fragments_certified_;
+ fragments_.push_back(seqno);
+}
+
+void wsrep::streaming_context::rolled_back(wsrep::transaction_id id)
+{
+ assert(rollback_replicated_for_ == wsrep::transaction_id::undefined());
+ rollback_replicated_for_ = id;
+}
+
+void wsrep::streaming_context::cleanup()
+{
+ fragments_certified_ = 0;
+ fragments_.clear();
+ rollback_replicated_for_ = wsrep::transaction_id::undefined();
+ unit_counter_ = 0;
+ log_position_ = 0;
+}
+
+void wsrep::streaming_context::check_fragment_seqno(
+ wsrep::seqno seqno WSREP_UNUSED)
+{
+ assert(seqno.is_undefined() == false);
+ assert(fragments_.empty() || fragments_.back() < seqno);
+}
diff --git a/wsrep-lib/src/thread.cpp b/wsrep-lib/src/thread.cpp
new file mode 100644
index 00000000..5fec0ff5
--- /dev/null
+++ b/wsrep-lib/src/thread.cpp
@@ -0,0 +1,30 @@
+/*
+ * Copyright (C) 2019 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/thread.hpp"
+
+#include <ostream>
+
+std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::thread::id& id)
+{
+ std::ios_base::fmtflags orig_flags(os.flags());
+ os << std::hex << id.thread_;
+ os.flags(orig_flags);
+ return os;
+}
diff --git a/wsrep-lib/src/thread_service_v1.cpp b/wsrep-lib/src/thread_service_v1.cpp
new file mode 100644
index 00000000..ee81ba31
--- /dev/null
+++ b/wsrep-lib/src/thread_service_v1.cpp
@@ -0,0 +1,285 @@
+/*
+ * Copyright (C) 2019-2020 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "thread_service_v1.hpp"
+#include "service_helpers.hpp"
+
+#include "wsrep/thread_service.hpp"
+#include "wsrep/logger.hpp"
+#include "v26/wsrep_thread_service.h"
+
+#include <cassert>
+#include <dlfcn.h>
+#include <cerrno>
+
+namespace wsrep_thread_service_v1
+{
+ //
+ // Thread service callbacks
+ //
+
+ // Pointer to thread service implementation provided by
+ // the application.
+ static wsrep::thread_service* thread_service_impl{ 0 };
+ static std::atomic<size_t> use_count;
+
+ static const wsrep_thread_key_t* thread_key_create_cb(const char* name)
+ {
+ assert(thread_service_impl);
+ return reinterpret_cast<const wsrep_thread_key_t*>(
+ thread_service_impl->create_thread_key(name));
+ ;
+ }
+
+ static int thread_create_cb(const wsrep_thread_key_t* key,
+ wsrep_thread_t** thread,
+ void* (*fn)(void*), void* args)
+ {
+ assert(thread_service_impl);
+ return thread_service_impl->create_thread(
+ reinterpret_cast<const wsrep::thread_service::thread_key*>(key),
+ reinterpret_cast<wsrep::thread_service::thread**>(thread), fn,
+ args);
+ }
+
+ int thread_detach_cb(wsrep_thread_t* thread)
+ {
+ assert(thread_service_impl);
+ return thread_service_impl->detach(
+ reinterpret_cast<wsrep::thread_service::thread*>(thread));
+ }
+
+ int thread_equal_cb(wsrep_thread_t* thread_1, wsrep_thread_t* thread_2)
+ {
+ assert(thread_service_impl);
+ return thread_service_impl->equal(
+ reinterpret_cast<wsrep::thread_service::thread*>(thread_1),
+ reinterpret_cast<wsrep::thread_service::thread*>(thread_2));
+ }
+
+ __attribute__((noreturn))
+ void thread_exit_cb(wsrep_thread_t* thread, void* retval)
+ {
+ assert(thread_service_impl);
+ thread_service_impl->exit(
+ reinterpret_cast<wsrep::thread_service::thread*>(thread), retval);
+ throw; // Implementation broke the contract and returned.
+ }
+
+ int thread_join_cb(wsrep_thread_t* thread, void** retval)
+ {
+ assert(thread_service_impl);
+ return thread_service_impl->join(
+ reinterpret_cast<wsrep::thread_service::thread*>(thread), retval);
+ }
+
+ wsrep_thread_t* thread_self_cb(void)
+ {
+ assert(thread_service_impl);
+ return reinterpret_cast<wsrep_thread_t*>(thread_service_impl->self());
+ }
+
+ int thread_setschedparam_cb(wsrep_thread_t* thread, int policy,
+ const struct sched_param* sp)
+ {
+ assert(thread_service_impl);
+ return thread_service_impl->setschedparam(
+ reinterpret_cast<wsrep::thread_service::thread*>(thread),
+ policy, sp);
+ }
+
+ int thread_getschedparam_cb(wsrep_thread_t* thread, int* policy,
+ struct sched_param* sp)
+ {
+ assert(thread_service_impl);
+ return thread_service_impl->getschedparam(
+ reinterpret_cast<wsrep::thread_service::thread*>(thread), policy,
+ sp);
+ }
+
+ const wsrep_mutex_key_t* mutex_key_create_cb(const char* name)
+ {
+ assert(thread_service_impl);
+ return reinterpret_cast<const wsrep_mutex_key_t*>(
+ thread_service_impl->create_mutex_key(name));
+ }
+
+ wsrep_mutex_t* mutex_init_cb(const wsrep_mutex_key_t* key, void* memblock,
+ size_t memblock_size)
+ {
+ assert(thread_service_impl);
+ return reinterpret_cast<wsrep_mutex_t*>(
+ thread_service_impl->init_mutex(
+ reinterpret_cast<const wsrep::thread_service::mutex_key*>(key),
+ memblock, memblock_size));
+ }
+
+ int mutex_destroy_cb(wsrep_mutex_t* mutex)
+ {
+ assert(thread_service_impl);
+ return thread_service_impl->destroy(
+ reinterpret_cast<wsrep::thread_service::mutex*>(mutex));
+ }
+ int mutex_lock_cb(wsrep_mutex_t* mutex)
+ {
+ assert(thread_service_impl);
+ return thread_service_impl->lock(
+ reinterpret_cast<wsrep::thread_service::mutex*>(mutex));
+ }
+
+ int mutex_trylock_cb(wsrep_mutex_t* mutex)
+ {
+ assert(thread_service_impl);
+ return thread_service_impl->trylock(
+ reinterpret_cast<wsrep::thread_service::mutex*>(mutex));
+ }
+
+ int mutex_unlock_cb(wsrep_mutex_t* mutex)
+ {
+ assert(thread_service_impl);
+ return thread_service_impl->unlock(
+ reinterpret_cast<wsrep::thread_service::mutex*>(mutex));
+ }
+
+ const wsrep_cond_key_t* cond_key_create_cb(const char* name)
+ {
+ assert(thread_service_impl);
+ return reinterpret_cast<const wsrep_cond_key_t*>(
+ thread_service_impl->create_cond_key(name));
+ }
+
+ wsrep_cond_t* cond_init_cb(const wsrep_cond_key_t* key, void* memblock,
+ size_t memblock_size)
+ {
+ assert(thread_service_impl);
+ return reinterpret_cast<wsrep_cond_t*>(thread_service_impl->init_cond(
+ reinterpret_cast<const wsrep::thread_service::cond_key*>(key),
+ memblock, memblock_size));
+ }
+
+ int cond_destroy_cb(wsrep_cond_t* cond)
+ {
+ assert(thread_service_impl);
+ return thread_service_impl->destroy(
+ reinterpret_cast<wsrep::thread_service::cond*>(cond));
+ }
+
+ int cond_wait_cb(wsrep_cond_t* cond, wsrep_mutex_t* mutex)
+ {
+ assert(thread_service_impl);
+ return thread_service_impl->wait(
+ reinterpret_cast<wsrep::thread_service::cond*>(cond),
+ reinterpret_cast<wsrep::thread_service::mutex*>(mutex));
+ }
+
+ int cond_timedwait_cb(wsrep_cond_t* cond, wsrep_mutex_t* mutex,
+ const struct timespec* ts)
+ {
+ assert(thread_service_impl);
+ return thread_service_impl->timedwait(
+ reinterpret_cast<wsrep::thread_service::cond*>(cond),
+ reinterpret_cast<wsrep::thread_service::mutex*>(mutex), ts);
+ }
+
+ int cond_signal_cb(wsrep_cond_t* cond)
+ {
+ assert(thread_service_impl);
+ return thread_service_impl->signal(
+ reinterpret_cast<wsrep::thread_service::cond*>(cond));
+ }
+
+ int cond_broadcast_cb(wsrep_cond_t* cond)
+ {
+ assert(thread_service_impl);
+ return thread_service_impl->broadcast(
+ reinterpret_cast<wsrep::thread_service::cond*>(cond));
+ }
+
+ static wsrep_thread_service_v1_t thread_service_callbacks
+ = { thread_key_create_cb,
+ thread_create_cb,
+ thread_detach_cb,
+ thread_equal_cb,
+ thread_exit_cb,
+ thread_join_cb,
+ thread_self_cb,
+ thread_setschedparam_cb,
+ thread_getschedparam_cb,
+ mutex_key_create_cb,
+ mutex_init_cb,
+ mutex_destroy_cb,
+ mutex_lock_cb,
+ mutex_trylock_cb,
+ mutex_unlock_cb,
+ cond_key_create_cb,
+ cond_init_cb,
+ cond_destroy_cb,
+ cond_wait_cb,
+ cond_timedwait_cb,
+ cond_signal_cb,
+ cond_broadcast_cb };
+}
+
+int wsrep::thread_service_v1_probe(void* dlh)
+{
+ typedef int (*init_fn)(wsrep_thread_service_v1_t*);
+ typedef void (*deinit_fn)();
+ if (wsrep_impl::service_probe<init_fn>(
+ dlh, WSREP_THREAD_SERVICE_INIT_FUNC_V1, "thread service v1") ||
+ wsrep_impl::service_probe<deinit_fn>(
+ dlh, WSREP_THREAD_SERVICE_DEINIT_FUNC_V1, "thread service v1"))
+ {
+ wsrep::log_warning() << "Provider does not support thread service v1";
+ return 1;
+ }
+ return 0;
+}
+
+int wsrep::thread_service_v1_init(void* dlh,
+ wsrep::thread_service* thread_service)
+{
+ if (not (dlh && thread_service)) return EINVAL;
+ typedef int (*init_fn)(wsrep_thread_service_v1_t*);
+ wsrep_thread_service_v1::thread_service_impl = thread_service;
+ int ret(0);
+ if ((ret = wsrep_impl::service_init<init_fn>(
+ dlh, WSREP_THREAD_SERVICE_INIT_FUNC_V1,
+ &wsrep_thread_service_v1::thread_service_callbacks,
+ "thread service v1")))
+ {
+ wsrep_thread_service_v1::thread_service_impl = 0;
+ }
+ else
+ {
+ ++wsrep_thread_service_v1::use_count;
+ }
+ return ret;
+}
+
+void wsrep::thread_service_v1_deinit(void* dlh)
+{
+ typedef int (*deinit_fn)();
+ wsrep_impl::service_deinit<deinit_fn>(
+ dlh, WSREP_THREAD_SERVICE_DEINIT_FUNC_V1, "thread service v1");
+ --wsrep_thread_service_v1::use_count;
+ if (wsrep_thread_service_v1::use_count == 0)
+ {
+ wsrep_thread_service_v1::thread_service_impl = 0;
+ }
+}
diff --git a/wsrep-lib/src/thread_service_v1.hpp b/wsrep-lib/src/thread_service_v1.hpp
new file mode 100644
index 00000000..b8300041
--- /dev/null
+++ b/wsrep-lib/src/thread_service_v1.hpp
@@ -0,0 +1,55 @@
+/*
+ * Copyright (C) 2019 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_THREAD_SERVICE_V1_HPP
+#define WSREP_THREAD_SERVICE_V1_HPP
+
+namespace wsrep
+{
+ class thread_service;
+ /**
+ * Probe thread_service_v1 support in loaded library.
+ *
+ * @param dlh Handle returned by dlopen().
+ *
+ * @return Zero on success, non-zero system error code on failure.
+ */
+ int thread_service_v1_probe(void *dlh);
+
+ /**
+ * Initialize the thread service.
+ *
+ * @param dlh Handle returned by dlopen().
+ * @params thread_service Pointer to wsrep::thread_service implementation.
+ *
+ * @return Zero on success, non-zero system error code on failure.
+ */
+ int thread_service_v1_init(void* dlh,
+ wsrep::thread_service* thread_service);
+
+ /**
+ * Deinitialize the thread service.
+ *
+ * @params dlh Handler returned by dlopen().
+ */
+ void thread_service_v1_deinit(void* dlh);
+
+}
+
+#endif // WSREP_THREAD_SERVICE_V1_HPP
diff --git a/wsrep-lib/src/tls_service_v1.cpp b/wsrep-lib/src/tls_service_v1.cpp
new file mode 100644
index 00000000..8746a767
--- /dev/null
+++ b/wsrep-lib/src/tls_service_v1.cpp
@@ -0,0 +1,232 @@
+/*
+ * Copyright (C) 2020 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "tls_service_v1.hpp"
+
+#include "wsrep/tls_service.hpp"
+#include "wsrep/logger.hpp"
+#include "v26/wsrep_tls_service.h"
+#include "service_helpers.hpp"
+
+#include <cassert>
+
+namespace wsrep_tls_service_v1
+{
+ static wsrep::tls_service* tls_service_impl{0};
+ static std::atomic<size_t> use_count;
+
+ static int tls_stream_init_cb(
+ wsrep_tls_context_t*,
+ wsrep_tls_stream_t* stream)
+ {
+ assert(tls_service_impl);
+ stream->opaque =
+ tls_service_impl->create_tls_stream(stream->fd);
+ if (not stream->opaque)
+ {
+ return ENOMEM;
+ }
+ return 0;
+ }
+
+ static void tls_stream_deinit_cb(
+ wsrep_tls_context_t*,
+ wsrep_tls_stream_t* stream)
+ {
+ assert(tls_service_impl);
+ tls_service_impl->destroy(
+ reinterpret_cast<wsrep::tls_stream*>(stream->opaque));
+ }
+
+ static int tls_stream_get_error_number_cb(
+ wsrep_tls_context_t*,
+ const wsrep_tls_stream_t* stream)
+ {
+ assert(tls_service_impl);
+ return tls_service_impl->get_error_number(
+ reinterpret_cast<const wsrep::tls_stream*>(stream->opaque));
+ }
+
+ static const void* tls_stream_get_error_category_cb(
+ wsrep_tls_context_t*,
+ const wsrep_tls_stream_t* stream)
+ {
+ assert(tls_service_impl);
+ return tls_service_impl->get_error_category(
+ reinterpret_cast<const wsrep::tls_stream*>(stream->opaque));
+ }
+
+ static const char* tls_error_message_get_cb(
+ wsrep_tls_context_t*,
+ const wsrep_tls_stream_t* stream,
+ int value, const void* category)
+ {
+ assert(tls_service_impl);
+ return tls_service_impl->get_error_message(
+ reinterpret_cast<const wsrep::tls_stream*>(stream->opaque), value, category);
+ }
+
+ static enum wsrep_tls_result map_return_value(ssize_t status)
+ {
+ switch (status)
+ {
+ case wsrep::tls_service::success:
+ return wsrep_tls_result_success;
+ case wsrep::tls_service::want_read:
+ return wsrep_tls_result_want_read;
+ case wsrep::tls_service::want_write:
+ return wsrep_tls_result_want_write;
+ case wsrep::tls_service::eof:
+ return wsrep_tls_result_eof;
+ case wsrep::tls_service::error:
+ return wsrep_tls_result_error;
+ default:
+ assert(status < 0);
+ return wsrep_tls_result_error;
+ }
+ }
+
+
+ static enum wsrep_tls_result
+ tls_stream_client_handshake_cb(
+ wsrep_tls_context_t*,
+ wsrep_tls_stream_t* stream)
+ {
+ assert(tls_service_impl);
+ return map_return_value(
+ tls_service_impl->client_handshake(
+ reinterpret_cast<wsrep::tls_stream*>(stream->opaque)));
+ }
+
+ static enum wsrep_tls_result
+ tls_stream_server_handshake_cb(
+ wsrep_tls_context_t*,
+ wsrep_tls_stream_t* stream)
+ {
+ assert(tls_service_impl);
+ return map_return_value(
+ tls_service_impl->server_handshake(
+ reinterpret_cast<wsrep::tls_stream*>(stream->opaque)));
+ }
+
+ static enum wsrep_tls_result tls_stream_read_cb(
+ wsrep_tls_context_t*,
+ wsrep_tls_stream_t* stream,
+ void* buf,
+ size_t max_count,
+ size_t* bytes_transferred)
+ {
+ assert(tls_service_impl);
+ auto result(tls_service_impl->read(
+ reinterpret_cast<wsrep::tls_stream*>(stream->opaque),
+ buf, max_count));
+ *bytes_transferred = result.bytes_transferred;
+ return map_return_value(result.status);
+ }
+
+ static enum wsrep_tls_result tls_stream_write_cb(
+ wsrep_tls_context_t*,
+ wsrep_tls_stream_t* stream,
+ const void* buf,
+ size_t count,
+ size_t* bytes_transferred)
+ {
+ assert(tls_service_impl);
+ auto result(tls_service_impl->write(
+ reinterpret_cast<wsrep::tls_stream*>(stream->opaque),
+ buf, count));
+ *bytes_transferred = result.bytes_transferred;
+ return map_return_value(result.status);
+ }
+
+ static enum wsrep_tls_result
+ tls_stream_shutdown_cb(wsrep_tls_context_t*,
+ wsrep_tls_stream_t* stream)
+ {
+ assert(tls_service_impl);
+ // @todo Handle other values than success.
+ return map_return_value(
+ tls_service_impl->shutdown(
+ reinterpret_cast<wsrep::tls_stream*>(stream->opaque)));
+ }
+
+ static wsrep_tls_service_v1_t tls_service_callbacks =
+ {
+ tls_stream_init_cb,
+ tls_stream_deinit_cb,
+ tls_stream_get_error_number_cb,
+ tls_stream_get_error_category_cb,
+ tls_stream_client_handshake_cb,
+ tls_stream_server_handshake_cb,
+ tls_stream_read_cb,
+ tls_stream_write_cb,
+ tls_stream_shutdown_cb,
+ tls_error_message_get_cb,
+ 0 // we pass NULL context for now.
+ };
+}
+
+int wsrep::tls_service_v1_probe(void* dlh)
+{
+ typedef int (*init_fn)(wsrep_tls_service_v1_t*);
+ typedef void (*deinit_fn)();
+ if (wsrep_impl::service_probe<init_fn>(
+ dlh, WSREP_TLS_SERVICE_INIT_FUNC_V1, "tls service v1") ||
+ wsrep_impl::service_probe<deinit_fn>(
+ dlh, WSREP_TLS_SERVICE_DEINIT_FUNC_V1, "tls service v1"))
+ {
+ wsrep::log_warning() << "Provider does not support tls service v1";
+ return 1;
+ }
+ return 0;
+}
+
+int wsrep::tls_service_v1_init(void* dlh,
+ wsrep::tls_service* tls_service)
+{
+ if (not (dlh && tls_service)) return EINVAL;
+
+ typedef int (*init_fn)(wsrep_tls_service_v1_t*);
+ wsrep_tls_service_v1::tls_service_impl = tls_service;
+ int ret(0);
+ if ((ret = wsrep_impl::service_init<init_fn>(
+ dlh, WSREP_TLS_SERVICE_INIT_FUNC_V1,
+ &wsrep_tls_service_v1::tls_service_callbacks,
+ "tls service v1")))
+ {
+ wsrep_tls_service_v1::tls_service_impl = 0;
+ }
+ else
+ {
+ ++wsrep_tls_service_v1::use_count;
+ }
+ return ret;
+}
+
+void wsrep::tls_service_v1_deinit(void* dlh)
+{
+ typedef int (*deinit_fn)();
+ wsrep_impl::service_deinit<deinit_fn>(
+ dlh, WSREP_TLS_SERVICE_DEINIT_FUNC_V1, "tls service v1");
+ --wsrep_tls_service_v1::use_count;
+ if (wsrep_tls_service_v1::use_count == 0)
+ {
+ wsrep_tls_service_v1::tls_service_impl = 0;
+ }
+}
diff --git a/wsrep-lib/src/tls_service_v1.hpp b/wsrep-lib/src/tls_service_v1.hpp
new file mode 100644
index 00000000..fe17e206
--- /dev/null
+++ b/wsrep-lib/src/tls_service_v1.hpp
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2020 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_TLS_SERVICE_V1_HPP
+#define WSREP_TLS_SERVICE_V1_HPP
+
+namespace wsrep
+{
+ class tls_service;
+ /**
+ * Probe tls_service_v1 support in loaded library.
+ *
+ * @param dlh Handle returned by dlopen().
+ *
+ * @return Zero on success, non-zero system error code on failure.
+ */
+ int tls_service_v1_probe(void *dlh);
+
+ /**
+ * Initialize TLS service.
+ *
+ * @param dlh Handle returned by dlopen().
+ * @params tls_service Pointer to wsrep::thread_service implementation.
+ *
+ * @return Zero on success, non-zero system error code on failure.
+ */
+ int tls_service_v1_init(void* dlh,
+ wsrep::tls_service* tls_service);
+
+ /**
+ * Deinitialize TLS service.
+ *
+ * @param dlh Handler returned by dlopen().
+ */
+ void tls_service_v1_deinit(void* dlh);
+}
+
+#endif // WSREP_TLS_SERVICE_V1_HPP
diff --git a/wsrep-lib/src/transaction.cpp b/wsrep-lib/src/transaction.cpp
new file mode 100644
index 00000000..451e94dd
--- /dev/null
+++ b/wsrep-lib/src/transaction.cpp
@@ -0,0 +1,2171 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/transaction.hpp"
+#include "wsrep/client_state.hpp"
+#include "wsrep/server_state.hpp"
+#include "wsrep/storage_service.hpp"
+#include "wsrep/high_priority_service.hpp"
+#include "wsrep/key.hpp"
+#include "wsrep/logger.hpp"
+#include "wsrep/compiler.hpp"
+#include "wsrep/server_service.hpp"
+#include "wsrep/client_service.hpp"
+
+#include <cassert>
+#include <sstream>
+#include <memory>
+
+namespace
+{
+ class storage_service_deleter
+ {
+ public:
+ storage_service_deleter(wsrep::server_service& server_service)
+ : server_service_(server_service)
+ { }
+ void operator()(wsrep::storage_service* storage_service)
+ {
+ server_service_.release_storage_service(storage_service);
+ }
+ private:
+ wsrep::server_service& server_service_;
+ };
+
+ template <class D>
+ class scoped_storage_service
+ {
+ public:
+ scoped_storage_service(wsrep::client_service& client_service,
+ wsrep::storage_service* storage_service,
+ D deleter)
+ : client_service_(client_service)
+ , storage_service_(storage_service)
+ , deleter_(deleter)
+ {
+ if (storage_service_ == 0)
+ {
+ throw wsrep::runtime_error("Null client_state provided");
+ }
+ client_service_.reset_globals();
+ storage_service_->store_globals();
+ }
+
+ wsrep::storage_service& storage_service()
+ {
+ return *storage_service_;
+ }
+
+ ~scoped_storage_service()
+ {
+ deleter_(storage_service_);
+ client_service_.store_globals();
+ }
+ private:
+ scoped_storage_service(const scoped_storage_service&);
+ scoped_storage_service& operator=(const scoped_storage_service&);
+ wsrep::client_service& client_service_;
+ wsrep::storage_service* storage_service_;
+ D deleter_;
+ };
+}
+
+// Public
+
+wsrep::transaction::transaction(
+ wsrep::client_state& client_state)
+ : server_service_(client_state.server_state().server_service())
+ , client_service_(client_state.client_service())
+ , client_state_(client_state)
+ , server_id_()
+ , id_(transaction_id::undefined())
+ , state_(s_executing)
+ , state_hist_()
+ , bf_abort_state_(s_executing)
+ , bf_abort_provider_status_()
+ , bf_abort_client_state_()
+ , bf_aborted_in_total_order_()
+ , ws_handle_()
+ , ws_meta_()
+ , flags_()
+ , implicit_deps_(false)
+ , certified_(false)
+ , fragments_certified_for_statement_()
+ , streaming_context_()
+ , sr_keys_()
+ , apply_error_buf_()
+ , xid_()
+ , streaming_rollback_in_progress_(false)
+ , is_bf_immutable_(false)
+{ }
+
+
+wsrep::transaction::~transaction()
+{
+}
+
+int wsrep::transaction::start_transaction(
+ const wsrep::transaction_id& id)
+{
+ debug_log_state("start_transaction enter");
+ assert(active() == false);
+ assert(is_xa() == false);
+ assert(flags() == 0);
+ server_id_ = client_state_.server_state().id();
+ id_ = id;
+ state_ = s_executing;
+ state_hist_.clear();
+ ws_handle_ = wsrep::ws_handle(id);
+ flags(wsrep::provider::flag::start_transaction);
+ switch (client_state_.mode())
+ {
+ case wsrep::client_state::m_high_priority:
+ debug_log_state("start_transaction success");
+ return 0;
+ case wsrep::client_state::m_local:
+ debug_log_state("start_transaction success");
+ return provider().start_transaction(ws_handle_);
+ default:
+ debug_log_state("start_transaction error");
+ assert(0);
+ return 1;
+ }
+}
+
+int wsrep::transaction::start_transaction(
+ const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta)
+{
+ debug_log_state("start_transaction enter");
+ if (state() != s_replaying)
+ {
+ // assert(ws_meta.flags());
+ assert(active() == false);
+ assert(flags() == 0);
+ server_id_ = ws_meta.server_id();
+ id_ = ws_meta.transaction_id();
+ assert(client_state_.mode() == wsrep::client_state::m_high_priority);
+ state_ = s_executing;
+ state_hist_.clear();
+ ws_handle_ = ws_handle;
+ ws_meta_ = ws_meta;
+ flags(wsrep::provider::flag::start_transaction);
+ certified_ = true;
+ }
+ else
+ {
+ ws_meta_ = ws_meta;
+ assert(ws_meta_.flags() & wsrep::provider::flag::commit);
+ assert(active());
+ assert(client_state_.mode() == wsrep::client_state::m_high_priority);
+ assert(state() == s_replaying);
+ assert(ws_meta_.seqno().is_undefined() == false);
+ certified_ = true;
+ }
+ debug_log_state("start_transaction leave");
+ return 0;
+}
+
+int wsrep::transaction::next_fragment(
+ const wsrep::ws_meta& ws_meta)
+{
+ debug_log_state("next_fragment enter");
+ ws_meta_ = ws_meta;
+ debug_log_state("next_fragment leave");
+ return 0;
+}
+
+void wsrep::transaction::adopt(const wsrep::transaction& transaction)
+{
+ debug_log_state("adopt enter");
+ assert(transaction.is_streaming());
+ start_transaction(transaction.id());
+ server_id_ = transaction.server_id_;
+ flags_ = transaction.flags();
+ streaming_context_ = transaction.streaming_context();
+ debug_log_state("adopt leave");
+}
+
+void wsrep::transaction::fragment_applied(wsrep::seqno seqno)
+{
+ assert(active());
+ streaming_context_.applied(seqno);
+}
+
+int wsrep::transaction::prepare_for_ordering(
+ const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta,
+ bool is_commit)
+{
+ assert(active());
+
+ if (state_ != s_replaying)
+ {
+ ws_handle_ = ws_handle;
+ ws_meta_ = ws_meta;
+ certified_ = is_commit;
+ }
+ return 0;
+}
+
+int wsrep::transaction::assign_read_view(const wsrep::gtid* const gtid)
+{
+ try
+ {
+ return provider().assign_read_view(ws_handle_, gtid);
+ }
+ catch (...)
+ {
+ wsrep::log_error() << "Failed to assign read view";
+ return 1;
+ }
+}
+
+int wsrep::transaction::append_key(const wsrep::key& key)
+{
+ assert(active());
+ try
+ {
+ debug_log_key_append(key);
+ sr_keys_.insert(key);
+ return provider().append_key(ws_handle_, key);
+ }
+ catch (...)
+ {
+ wsrep::log_error() << "Failed to append key";
+ return 1;
+ }
+}
+
+int wsrep::transaction::append_data(const wsrep::const_buffer& data)
+{
+ assert(active());
+ return provider().append_data(ws_handle_, data);
+}
+
+int wsrep::transaction::after_row()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
+ debug_log_state("after_row_enter");
+ int ret(0);
+ if (streaming_context_.fragment_size() &&
+ streaming_context_.fragment_unit() != streaming_context::statement)
+ {
+ ret = streaming_step(lock);
+ }
+ debug_log_state("after_row_leave");
+ return ret;
+}
+
+int wsrep::transaction::before_prepare(
+ wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ assert(lock.owns_lock());
+ int ret(0);
+ debug_log_state("before_prepare_enter");
+ assert(state() == s_executing || state() == s_must_abort ||
+ state() == s_replaying);
+
+ if (state() == s_must_abort)
+ {
+ assert(client_state_.mode() == wsrep::client_state::m_local);
+ client_state_.override_error(wsrep::e_deadlock_error);
+ return 1;
+ }
+
+ switch (client_state_.mode())
+ {
+ case wsrep::client_state::m_local:
+ if (is_streaming())
+ {
+ client_service_.debug_crash(
+ "crash_last_fragment_commit_before_fragment_removal");
+ lock.unlock();
+ if (client_service_.statement_allowed_for_streaming() == false)
+ {
+ client_state_.override_error(
+ wsrep::e_error_during_commit,
+ wsrep::provider::error_not_allowed);
+ ret = 1;
+ }
+ else if (!is_xa())
+ {
+ // Note: we can't remove fragments here for XA,
+ // the transaction has already issued XA END and
+ // is in IDLE state, no more changes allowed!
+ ret = client_service_.remove_fragments();
+ if (ret)
+ {
+ client_state_.override_error(wsrep::e_deadlock_error);
+ }
+ }
+ lock.lock();
+ client_service_.debug_crash(
+ "crash_last_fragment_commit_after_fragment_removal");
+ if (state() == s_must_abort)
+ {
+ client_state_.override_error(wsrep::e_deadlock_error);
+ ret = 1;
+ }
+ }
+
+ if (ret == 0)
+ {
+ if (is_xa())
+ {
+ // Force fragment replication on XA prepare
+ flags(flags() | wsrep::provider::flag::prepare);
+ pa_unsafe(true);
+ append_sr_keys_for_commit();
+ const bool force_streaming_step = true;
+ ret = streaming_step(lock, force_streaming_step);
+ if (ret == 0)
+ {
+ assert(state() == s_executing);
+ state(lock, s_preparing);
+ }
+ }
+ else
+ {
+ ret = certify_commit(lock);
+ }
+
+ assert((ret == 0 && state() == s_preparing) ||
+ (state() == s_must_abort ||
+ state() == s_must_replay ||
+ state() == s_cert_failed));
+
+ if (ret)
+ {
+ assert(state() == s_must_replay ||
+ client_state_.current_error());
+ ret = 1;
+ }
+ }
+
+ break;
+ case wsrep::client_state::m_high_priority:
+ // Note: fragment removal is done from applying
+ // context for high priority mode.
+ if (is_xa())
+ {
+ assert(state() == s_executing ||
+ state() == s_replaying);
+ if (state() == s_replaying)
+ {
+ break;
+ }
+ }
+ state(lock, s_preparing);
+ break;
+ default:
+ assert(0);
+ break;
+ }
+
+ assert(state() == s_preparing ||
+ (is_xa() && state() == s_replaying) ||
+ (ret && (state() == s_must_abort ||
+ state() == s_must_replay ||
+ state() == s_cert_failed ||
+ state() == s_aborted)));
+ debug_log_state("before_prepare_leave");
+ return ret;
+}
+
+int wsrep::transaction::after_prepare(
+ wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ assert(lock.owns_lock());
+
+ int ret = 0;
+ debug_log_state("after_prepare_enter");
+ if (is_xa())
+ {
+ switch (state())
+ {
+ case s_preparing:
+ assert(client_state_.mode() == wsrep::client_state::m_local ||
+ (certified() && ordered()));
+ state(lock, s_prepared);
+ break;
+ case s_must_abort:
+ // prepared locally, but client has not received
+ // a result yet. We can still abort.
+ assert(client_state_.mode() == wsrep::client_state::m_local);
+ client_state_.override_error(wsrep::e_deadlock_error);
+ ret = 1;
+ break;
+ case s_replaying:
+ assert(client_state_.mode() == wsrep::client_state::m_high_priority);
+ break;
+ default:
+ assert(0);
+ }
+ }
+ else
+ {
+ assert(certified() && ordered());
+ assert(state() == s_preparing || state() == s_must_abort);
+
+ if (state() == s_must_abort)
+ {
+ assert(client_state_.mode() == wsrep::client_state::m_local);
+ state(lock, s_must_replay);
+ ret = 1;
+ }
+ else
+ {
+ state(lock, s_committing);
+ }
+ }
+ debug_log_state("after_prepare_leave");
+ return ret;
+}
+
+int wsrep::transaction::before_commit()
+{
+ int ret(1);
+
+ wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
+ debug_log_state("before_commit_enter");
+ assert(client_state_.mode() != wsrep::client_state::m_toi);
+ assert(state() == s_executing ||
+ state() == s_prepared ||
+ state() == s_committing ||
+ state() == s_must_abort ||
+ state() == s_replaying);
+ assert((state() != s_committing && state() != s_replaying) ||
+ certified());
+
+ switch (client_state_.mode())
+ {
+ case wsrep::client_state::m_local:
+ if (state() == s_executing)
+ {
+ ret = before_prepare(lock) || after_prepare(lock);
+ assert((ret == 0 &&
+ (state() == s_committing || state() == s_prepared))
+ ||
+ (state() == s_must_abort ||
+ state() == s_must_replay ||
+ state() == s_cert_failed ||
+ state() == s_aborted));
+ }
+ else if (state() != s_committing && state() != s_prepared)
+ {
+ assert(state() == s_must_abort);
+ if (certified() ||
+ (is_xa() && is_streaming()))
+ {
+ state(lock, s_must_replay);
+ }
+ else
+ {
+ client_state_.override_error(wsrep::e_deadlock_error);
+ }
+ }
+ else
+ {
+ // 2PC commit, prepare was done before
+ ret = 0;
+ }
+
+ if (ret == 0 && state() == s_prepared)
+ {
+ ret = certify_commit(lock);
+ assert((ret == 0 && state() == s_committing) ||
+ (state() == s_must_abort ||
+ state() == s_must_replay ||
+ state() == s_cert_failed ||
+ state() == s_prepared));
+ }
+
+ if (ret == 0)
+ {
+ assert(certified());
+ assert(ordered());
+ lock.unlock();
+ client_service_.debug_sync("wsrep_before_commit_order_enter");
+ enum wsrep::provider::status
+ status(provider().commit_order_enter(ws_handle_, ws_meta_));
+ lock.lock();
+ switch (status)
+ {
+ case wsrep::provider::success:
+ break;
+ case wsrep::provider::error_bf_abort:
+ if (state() != s_must_abort)
+ {
+ state(lock, s_must_abort);
+ }
+ state(lock, s_must_replay);
+ ret = 1;
+ break;
+ default:
+ ret = 1;
+ assert(0);
+ break;
+ }
+ }
+ break;
+ case wsrep::client_state::m_high_priority:
+ assert(certified());
+ assert(ordered());
+ if (is_xa())
+ {
+ assert(state() == s_prepared ||
+ state() == s_replaying);
+ state(lock, s_committing);
+ ret = 0;
+ }
+ else if (state() == s_executing || state() == s_replaying)
+ {
+ ret = before_prepare(lock) || after_prepare(lock);
+ }
+ else
+ {
+ ret = 0;
+ }
+ lock.unlock();
+ ret = ret || provider().commit_order_enter(ws_handle_, ws_meta_);
+ lock.lock();
+ if (ret)
+ {
+ state(lock, s_must_abort);
+ state(lock, s_aborting);
+ }
+ break;
+ default:
+ assert(0);
+ break;
+ }
+
+ if (ret == 0 && state() == s_committing)
+ {
+ is_bf_immutable_ = true;
+ }
+
+ debug_log_state("before_commit_leave");
+ return ret;
+}
+
+int wsrep::transaction::ordered_commit()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
+ debug_log_state("ordered_commit_enter");
+ assert(state() == s_committing);
+ assert(is_bf_immutable_);
+ assert(ordered());
+ client_service_.debug_sync("wsrep_before_commit_order_leave");
+ int ret(provider().commit_order_leave(ws_handle_, ws_meta_,
+ apply_error_buf_));
+ client_service_.debug_sync("wsrep_after_commit_order_leave");
+ // Should always succeed:
+ // 1) If before commit before succeeds, the transaction handle
+ // in the provider is guaranteed to exist and the commit
+ // has been ordered
+ // 2) The transaction which has been ordered for commit cannot be BF
+ // aborted anymore
+ // 3) The provider should always guarantee that the transactions which
+ // have been ordered for commit can finish committing.
+ //
+ // The exception here is a storage service transaction which is running
+ // in high priority mode. The fragment storage commit may get BF
+ // aborted in the provider after commit ordering has been
+ // established since the transaction is operating in streaming
+ // mode.
+ if (ret)
+ {
+ assert(client_state_.mode() == wsrep::client_state::m_high_priority);
+ state(lock, s_must_abort);
+ state(lock, s_aborting);
+ }
+ else
+ {
+ state(lock, s_ordered_commit);
+ }
+ debug_log_state("ordered_commit_leave");
+ return ret;
+}
+
+int wsrep::transaction::after_commit()
+{
+ int ret(0);
+
+ wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
+ assert(is_bf_immutable_);
+ debug_log_state("after_commit_enter");
+ assert(state() == s_ordered_commit);
+
+ if (is_streaming())
+ {
+ assert(client_state_.mode() == wsrep::client_state::m_local ||
+ client_state_.mode() == wsrep::client_state::m_high_priority);
+
+ if (is_xa())
+ {
+ // XA fragment removal happens here,
+ // see comment in before_prepare
+ remove_fragments_in_storage_service_scope(lock);
+ }
+
+ if (client_state_.mode() == wsrep::client_state::m_local)
+ {
+ lock.unlock();
+ client_state_.server_state_.stop_streaming_client(&client_state_);
+ lock.lock();
+ }
+ streaming_context_.cleanup();
+ }
+
+ switch (client_state_.mode())
+ {
+ case wsrep::client_state::m_local:
+ ret = provider().release(ws_handle_);
+ break;
+ case wsrep::client_state::m_high_priority:
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ assert(ret == 0);
+ state(lock, s_committed);
+ debug_log_state("after_commit_leave");
+ return ret;
+}
+
+int wsrep::transaction::before_rollback()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
+ debug_log_state("before_rollback_enter");
+ assert(state() == s_executing ||
+ state() == s_preparing ||
+ state() == s_prepared ||
+ state() == s_must_abort ||
+ // Background rollbacker or rollback initiated from SE
+ state() == s_aborting ||
+ state() == s_cert_failed ||
+ state() == s_must_replay);
+
+ switch (client_state_.mode())
+ {
+ case wsrep::client_state::m_local:
+ if (is_streaming())
+ {
+ client_service_.debug_sync("wsrep_before_SR_rollback");
+ }
+ switch (state())
+ {
+ case s_preparing:
+ // Error detected during prepare phase
+ state(lock, s_must_abort);
+ WSREP_FALLTHROUGH;
+ case s_prepared:
+ WSREP_FALLTHROUGH;
+ case s_executing:
+ // Voluntary rollback
+ if (is_streaming())
+ {
+ streaming_rollback(lock);
+ }
+ state(lock, s_aborting);
+ break;
+ case s_must_abort:
+ if (certified())
+ {
+ state(lock, s_must_replay);
+ }
+ else
+ {
+ if (is_streaming())
+ {
+ streaming_rollback(lock);
+ }
+ state(lock, s_aborting);
+ }
+ break;
+ case s_cert_failed:
+ if (is_streaming())
+ {
+ streaming_rollback(lock);
+ }
+ state(lock, s_aborting);
+ break;
+ case s_aborting:
+ if (is_streaming())
+ {
+ streaming_rollback(lock);
+ }
+ break;
+ case s_must_replay:
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ break;
+ case wsrep::client_state::m_high_priority:
+ // Rollback by rollback write set or BF abort
+ assert(state_ == s_executing || state_ == s_prepared || state_ == s_aborting);
+ if (state_ != s_aborting)
+ {
+ state(lock, s_aborting);
+ }
+ break;
+ default:
+ assert(0);
+ break;
+ }
+
+ debug_log_state("before_rollback_leave");
+ return 0;
+}
+
+int wsrep::transaction::after_rollback()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
+ debug_log_state("after_rollback_enter");
+ assert(state() == s_aborting || state() == s_must_replay);
+
+ // Note that it would be technically more correct to
+ // remove fragments after TOI BF abort in before_rollback(),
+ // it seems to cause deadlocks and is done here instead.
+ // We assume that the application does not let the TOI
+ // to proceed until this method returns, e.g. by holding
+ // MDL locks. It is not clear how to enforce that though.
+ if (is_streaming() && bf_aborted_in_total_order_)
+ {
+ remove_fragments_in_storage_service_scope(lock);
+ }
+
+ if (state() == s_aborting)
+ {
+ if (is_streaming())
+ {
+ // We skip streaming context cleanup for replay because
+ // we want to remember if the transaction was streaming.
+ // See transaction::replay()
+ streaming_context_.cleanup();
+ }
+
+ state(lock, s_aborted);
+ }
+
+ // Releasing the transaction from provider is postponed into
+ // after_statement() hook. Depending on DBMS system all the
+ // resources acquired by transaction may or may not be released
+ // during actual rollback. If the transaction has been ordered,
+ // releasing the commit ordering critical section should be
+ // also postponed until all resources have been released.
+ debug_log_state("after_rollback_leave");
+ return 0;
+}
+
+int wsrep::transaction::release_commit_order(
+ wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ lock.unlock();
+ int ret(provider().commit_order_enter(ws_handle_, ws_meta_));
+ if (!ret)
+ {
+ server_service_.set_position(client_service_, ws_meta_.gtid());
+ ret = provider().commit_order_leave(ws_handle_, ws_meta_,
+ apply_error_buf_);
+ }
+ // grabbing lock here, as set_position may call for sync wait in galera side
+ lock.lock();
+ return ret;
+}
+
+void wsrep::transaction::remove_fragments_in_storage_service_scope(
+ wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ assert(lock.owns_lock());
+ lock.unlock();
+ {
+ scoped_storage_service<storage_service_deleter>
+ sr_scope(
+ client_service_,
+ server_service_.storage_service(client_service_),
+ storage_service_deleter(server_service_));
+ wsrep::storage_service& storage_service(
+ sr_scope.storage_service());
+ storage_service.adopt_transaction(*this);
+ storage_service.remove_fragments();
+ storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta());
+ }
+ lock.lock();
+}
+
+int wsrep::transaction::after_statement()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
+ return after_statement(lock);
+}
+
+int wsrep::transaction::after_statement(wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ int ret(0);
+ debug_log_state("after_statement_enter");
+ assert(lock.owns_lock());
+ assert(client_state_.mode() == wsrep::client_state::m_local);
+ assert(state() == s_executing ||
+ state() == s_prepared ||
+ state() == s_committed ||
+ state() == s_aborted ||
+ state() == s_must_abort ||
+ state() == s_cert_failed ||
+ state() == s_must_replay);
+
+ if (state() == s_executing &&
+ streaming_context_.fragment_size() &&
+ streaming_context_.fragment_unit() == streaming_context::statement)
+ {
+ ret = streaming_step(lock);
+ }
+
+ switch (state())
+ {
+ case s_executing:
+ // ?
+ break;
+ case s_prepared:
+ assert(is_xa());
+ break;
+ case s_committed:
+ assert(is_streaming() == false);
+ break;
+ case s_must_abort:
+ case s_cert_failed:
+ // Error may be set already. For example, if fragment size
+ // exceeded the maximum size in certify_fragment(), then
+ // we already have wsrep::e_error_during_commit
+ if (client_state_.current_error() == wsrep::e_success)
+ {
+ client_state_.override_error(wsrep::e_deadlock_error);
+ }
+ lock.unlock();
+ ret = client_service_.bf_rollback();
+ lock.lock();
+ if (state() != s_must_replay)
+ {
+ break;
+ }
+ // Continue to replay if rollback() changed the state to s_must_replay
+ WSREP_FALLTHROUGH;
+ case s_must_replay:
+ {
+ if (is_xa() && !ordered())
+ {
+ ret = xa_replay_commit(lock);
+ }
+ else
+ {
+ ret = replay(lock);
+ }
+ break;
+ }
+ case s_aborted:
+ // Raise a deadlock error if the transaction was BF aborted and
+ // rolled back by client outside of transaction hooks.
+ if (bf_aborted() && client_state_.current_error() == wsrep::e_success &&
+ !client_service_.is_xa_rollback())
+ {
+ client_state_.override_error(wsrep::e_deadlock_error);
+ }
+ break;
+ default:
+ assert(0);
+ break;
+ }
+
+ assert(state() == s_executing ||
+ state() == s_prepared ||
+ state() == s_committed ||
+ state() == s_aborted ||
+ state() == s_must_replay);
+
+ if (state() == s_aborted)
+ {
+ if (ordered())
+ {
+ ret = release_commit_order(lock);
+ }
+ lock.unlock();
+ provider().release(ws_handle_);
+ lock.lock();
+ }
+
+ if (state() != s_executing &&
+ (!client_service_.is_explicit_xa() ||
+ client_state_.state() == wsrep::client_state::s_quitting))
+ {
+ cleanup();
+ }
+ fragments_certified_for_statement_ = 0;
+ debug_log_state("after_statement_leave");
+ assert(ret == 0 || state() == s_aborted);
+ return ret;
+}
+
+void wsrep::transaction::after_command_must_abort(
+ wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ debug_log_state("after_command_must_abort enter");
+ assert(active());
+ assert(state_ == s_must_abort);
+
+ if (is_xa() && is_streaming())
+ {
+ state(lock, s_must_replay);
+ }
+
+ lock.unlock();
+ client_service_.bf_rollback();
+ lock.lock();
+
+ if (is_xa() && is_streaming())
+ {
+ xa_replay(lock);
+ }
+ else
+ {
+ client_state_.override_error(wsrep::e_deadlock_error);
+ }
+
+ debug_log_state("after_command_must_abort leave");
+}
+
+void wsrep::transaction::after_applying()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
+ debug_log_state("after_applying enter");
+ assert(state_ == s_executing ||
+ state_ == s_prepared ||
+ state_ == s_committed ||
+ state_ == s_aborted ||
+ state_ == s_replaying);
+
+ if (state_ != s_executing && state_ != s_prepared)
+ {
+ if (state_ == s_replaying) state(lock, s_aborted);
+ cleanup();
+ }
+ else
+ {
+ // State remains executing or prepared, so this is a streaming applier.
+ // Reset the meta data to avoid releasing commit order
+ // critical section above if the next fragment is rollback
+ // fragment. Rollback fragment ordering will be handled by
+ // another instance while removing the fragments from
+ // storage.
+ ws_meta_ = wsrep::ws_meta();
+ }
+ debug_log_state("after_applying leave");
+}
+
+bool wsrep::transaction::bf_abort(
+ wsrep::unique_lock<wsrep::mutex>& lock,
+ wsrep::seqno bf_seqno)
+{
+ bool ret(false);
+ const enum wsrep::transaction::state state_at_enter(state());
+ assert(lock.owns_lock());
+
+ if (active() == false)
+ {
+ WSREP_LOG_DEBUG(client_state_.debug_log_level(),
+ wsrep::log::debug_level_transaction,
+ "Transaction not active, skipping bf abort");
+ }
+ else if (is_bf_immutable_)
+ {
+ WSREP_LOG_DEBUG(client_state_.debug_log_level(),
+ wsrep::log::debug_level_transaction,
+ "Transaction has become immutable for BF abort");
+ }
+ else
+ {
+ switch (state_at_enter)
+ {
+ case s_executing:
+ case s_preparing:
+ case s_prepared:
+ case s_certifying:
+ case s_committing:
+ {
+ wsrep::seqno victim_seqno;
+ enum wsrep::provider::status
+ status(client_state_.provider().bf_abort(
+ bf_seqno, id_, victim_seqno));
+ switch (status)
+ {
+ case wsrep::provider::success:
+ WSREP_LOG_DEBUG(client_state_.debug_log_level(),
+ wsrep::log::debug_level_transaction,
+ "Seqno " << bf_seqno
+ << " successfully BF aborted " << id_
+ << " victim_seqno " << victim_seqno);
+ bf_abort_state_ = state_at_enter;
+ state(lock, s_must_abort);
+ ret = true;
+ break;
+ default:
+ WSREP_LOG_DEBUG(client_state_.debug_log_level(),
+ wsrep::log::debug_level_transaction,
+ "Seqno " << bf_seqno
+ << " failed to BF abort " << id_
+ << " with status " << status
+ << " victim_seqno " << victim_seqno);
+ break;
+ }
+ break;
+ }
+ default:
+ WSREP_LOG_DEBUG(client_state_.debug_log_level(),
+ wsrep::log::debug_level_transaction,
+ "BF abort not allowed in state "
+ << wsrep::to_string(state_at_enter));
+ break;
+ }
+ }
+
+ if (ret)
+ {
+ bf_abort_client_state_ = client_state_.state();
+ // If the transaction is in executing state, we must initiate
+ // streaming rollback to ensure that the rollback fragment gets
+ // replicated before the victim starts to roll back and release locks.
+ // In other states the BF abort will be detected outside of
+ // storage engine operations and streaming rollback will be
+ // handled from before_rollback() call.
+ if (client_state_.mode() == wsrep::client_state::m_local &&
+ is_streaming() && state_at_enter == s_executing)
+ {
+ streaming_rollback(lock);
+ }
+
+ if ((client_state_.state() == wsrep::client_state::s_idle &&
+ client_state_.server_state().rollback_mode() ==
+ wsrep::server_state::rm_sync) // locally processing idle
+ ||
+ // high priority streaming
+ (client_state_.mode() == wsrep::client_state::m_high_priority &&
+ is_streaming()))
+ {
+ // We need to change the state to aborting under the
+ // lock protection to avoid a race between client thread,
+ // otherwise it could happen that the client gains control
+ // between releasing the lock and before background
+ // rollbacker gets control.
+ if (is_xa() && state_at_enter == s_prepared)
+ {
+ state(lock, s_must_replay);
+ client_state_.set_rollbacker_active(true);
+ }
+ else
+ {
+ state(lock, s_aborting);
+ client_state_.set_rollbacker_active(true);
+ if (client_state_.mode() == wsrep::client_state::m_high_priority)
+ {
+ lock.unlock();
+ client_state_.server_state().stop_streaming_applier(
+ server_id_, id_);
+ lock.lock();
+ }
+ }
+
+ server_service_.background_rollback(lock, client_state_);
+ }
+ }
+ return ret;
+}
+
+bool wsrep::transaction::total_order_bf_abort(
+ wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
+ wsrep::seqno bf_seqno)
+{
+ /* We must set this flag before entering bf_abort() in order
+ * to streaming_rollback() work correctly. The flag will be
+ * unset if BF abort was not allowed. Note that we rely in
+ * bf_abort() not to release lock if the BF abort is not allowed. */
+ bf_aborted_in_total_order_ = true;
+ bool ret(bf_abort(lock, bf_seqno));
+ if (not ret)
+ {
+ bf_aborted_in_total_order_ = false;
+ }
+ return ret;
+}
+
+void wsrep::transaction::clone_for_replay(const wsrep::transaction& other)
+{
+ assert(other.state() == s_replaying);
+ id_ = other.id_;
+ xid_ = other.xid_;
+ server_id_ = other.server_id_;
+ ws_handle_ = other.ws_handle_;
+ ws_meta_ = other.ws_meta_;
+ streaming_context_ = other.streaming_context_;
+ state_ = s_replaying;
+}
+
+void wsrep::transaction::assign_xid(const wsrep::xid& xid)
+{
+ assert(active());
+ assert(!is_xa());
+ xid_ = xid;
+}
+
+int wsrep::transaction::restore_to_prepared_state(const wsrep::xid& xid)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
+ assert(active());
+ assert(is_empty());
+ assert(state() == s_executing || state() == s_replaying);
+ flags(flags() & ~wsrep::provider::flag::start_transaction);
+ if (state() == s_executing)
+ {
+ state(lock, s_certifying);
+ }
+ else
+ {
+ state(lock, s_preparing);
+ }
+ state(lock, s_prepared);
+ xid_ = xid;
+ return 0;
+}
+
+int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid,
+ bool commit)
+{
+ debug_log_state("commit_or_rollback_by_xid enter");
+ wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
+ wsrep::server_state& server_state(client_state_.server_state());
+ wsrep::high_priority_service* sa(server_state.find_streaming_applier(xid));
+
+ if (!sa)
+ {
+ assert(sa);
+ client_state_.override_error(wsrep::e_error_during_commit);
+ return 1;
+ }
+
+ if (commit)
+ {
+ flags(wsrep::provider::flag::commit);
+ }
+ else
+ {
+ flags(wsrep::provider::flag::rollback);
+ }
+ pa_unsafe(true);
+ wsrep::stid stid(sa->transaction().server_id(),
+ sa->transaction().id(),
+ client_state_.id());
+ wsrep::ws_meta meta(stid);
+
+ const enum wsrep::provider::status cert_ret(
+ provider().certify(client_state_.id(),
+ ws_handle_,
+ flags(),
+ meta));
+
+ int ret;
+ if (cert_ret == wsrep::provider::success)
+ {
+ if (commit)
+ {
+ state(lock, s_certifying);
+ state(lock, s_committing);
+ state(lock, s_committed);
+ }
+ else
+ {
+ state(lock, s_aborting);
+ state(lock, s_aborted);
+ }
+ ret = 0;
+ }
+ else
+ {
+ client_state_.override_error(wsrep::e_error_during_commit,
+ cert_ret);
+ wsrep::log_error() << "Failed to commit_or_rollback_by_xid,"
+ << " xid: " << xid
+ << " error: " << cert_ret;
+ ret = 1;
+ }
+ debug_log_state("commit_or_rollback_by_xid leave");
+ return ret;
+}
+
+void wsrep::transaction::xa_detach()
+{
+ debug_log_state("xa_detach enter");
+ assert(state() == s_prepared ||
+ client_state_.state() == wsrep::client_state::s_quitting);
+ if (state() == s_prepared)
+ {
+ wsrep::server_state& server_state(client_state_.server_state());
+ server_state.convert_streaming_client_to_applier(&client_state_);
+ client_service_.store_globals();
+ client_service_.cleanup_transaction();
+ }
+ wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
+ streaming_context_.cleanup();
+ state(lock, s_aborting);
+ state(lock, s_aborted);
+ provider().release(ws_handle_);
+ cleanup();
+ debug_log_state("xa_detach leave");
+}
+
+void wsrep::transaction::xa_replay_common(wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ assert(lock.owns_lock());
+ assert(is_xa());
+ assert(is_streaming());
+ assert(state() == s_must_replay);
+ assert(bf_aborted());
+
+ state(lock, s_replaying);
+
+ enum wsrep::provider::status status;
+ wsrep::server_state& server_state(client_state_.server_state());
+
+ lock.unlock();
+ server_state.convert_streaming_client_to_applier(&client_state_);
+ status = client_service_.replay_unordered();
+ client_service_.store_globals();
+ lock.lock();
+
+ if (status != wsrep::provider::success)
+ {
+ client_service_.emergency_shutdown();
+ }
+}
+
+int wsrep::transaction::xa_replay(wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ debug_log_state("xa_replay enter");
+ xa_replay_common(lock);
+ state(lock, s_aborted);
+ streaming_context_.cleanup();
+ provider().release(ws_handle_);
+ cleanup();
+ client_service_.signal_replayed();
+ debug_log_state("xa_replay leave");
+ return 0;
+}
+
+int wsrep::transaction::xa_replay_commit(wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ debug_log_state("xa_replay_commit enter");
+ xa_replay_common(lock);
+ lock.unlock();
+ enum wsrep::provider::status status(client_service_.commit_by_xid());
+ lock.lock();
+ int ret(1);
+ switch (status)
+ {
+ case wsrep::provider::success:
+ state(lock, s_committed);
+ streaming_context_.cleanup();
+ provider().release(ws_handle_);
+ cleanup();
+ ret = 0;
+ break;
+ default:
+ log_warning() << "Failed to commit by xid during replay";
+ // Commit by xid failed, return a commit
+ // error and let the client retry
+ state(lock, s_preparing);
+ state(lock, s_prepared);
+ client_state_.override_error(wsrep::e_error_during_commit, status);
+ }
+
+ client_service_.signal_replayed();
+ debug_log_state("xa_replay_commit leave");
+ return ret;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Private //
+////////////////////////////////////////////////////////////////////////////////
+
+inline wsrep::provider& wsrep::transaction::provider()
+{
+ return client_state_.server_state().provider();
+}
+
+void wsrep::transaction::state(
+ wsrep::unique_lock<wsrep::mutex>& lock __attribute__((unused)),
+ enum wsrep::transaction::state next_state)
+{
+ WSREP_LOG_DEBUG(client_state_.debug_log_level(),
+ wsrep::log::debug_level_transaction,
+ "client: " << client_state_.id().get()
+ << " txc: " << id().get()
+ << " state: " << to_string(state_)
+ << " -> " << to_string(next_state));
+
+ assert(lock.owns_lock());
+ // BF aborter is allowed to change the state without gaining control
+ // to the state if the next state is s_must_abort, s_aborting or
+ // s_must_replay (for xa idle replay).
+ assert(client_state_.owning_thread_id_ == wsrep::this_thread::get_id() ||
+ next_state == s_must_abort ||
+ next_state == s_must_replay ||
+ next_state == s_aborting);
+
+ static const char allowed[n_states][n_states] =
+ { /* ex pg pd ce co oc ct cf ma ab ad mr re */
+ { 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */
+ { 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0}, /* pg */
+ { 0, 0, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 0}, /* pd */
+ { 1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */
+ { 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */
+ { 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */
+ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */
+ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* cf */
+ { 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0}, /* ma */
+ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */
+ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */
+ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */
+ { 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0} /* re */
+ };
+
+ if (!allowed[state_][next_state])
+ {
+ wsrep::log_debug() << "unallowed state transition for transaction "
+ << id_ << ": " << wsrep::to_string(state_)
+ << " -> " << wsrep::to_string(next_state);
+ assert(0);
+ }
+
+ state_hist_.push_back(state_);
+ if (state_hist_.size() == 12)
+ {
+ state_hist_.erase(state_hist_.begin());
+ }
+ state_ = next_state;
+
+ if (state_ == s_must_replay)
+ {
+ client_service_.will_replay();
+ }
+}
+
+bool wsrep::transaction::abort_or_interrupt(
+ wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ assert(lock.owns_lock());
+ if (state() == s_must_abort)
+ {
+ client_state_.override_error(wsrep::e_deadlock_error);
+ return true;
+ }
+ else if (state() == s_aborting || state() == s_aborted)
+ {
+ // Somehow we got there after BF abort without setting error
+ // status. This may happen if the DBMS side aborts the transaction
+ // but still tries to continue processing rows. Raise the error here
+ // and assert so that the error will be caught in debug builds.
+ if (bf_abort_client_state_ &&
+ client_state_.current_error() == wsrep::e_success)
+ {
+ client_state_.override_error(wsrep::e_deadlock_error);
+ assert(0);
+ }
+ return true;
+ }
+ else if (client_service_.interrupted(lock))
+ {
+ client_state_.override_error(wsrep::e_interrupted_error);
+ if (state() != s_must_abort)
+ {
+ state(lock, s_must_abort);
+ }
+ return true;
+ }
+ return false;
+}
+
+int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock,
+ bool force)
+{
+ assert(lock.owns_lock());
+ assert(streaming_context_.fragment_size() || is_xa());
+
+ if (client_service_.bytes_generated() <
+ streaming_context_.log_position())
+ {
+ /* Something went wrong on DBMS side in keeping track of
+ generated bytes. Return an error to abort the transaction. */
+ wsrep::log_warning() << "Bytes generated "
+ << client_service_.bytes_generated()
+ << " less than bytes certified "
+ << streaming_context_.log_position()
+ << ", aborting streaming transaction";
+ return 1;
+ }
+ int ret(0);
+
+ const size_t bytes_to_replicate(client_service_.bytes_generated() -
+ streaming_context_.log_position());
+
+ switch (streaming_context_.fragment_unit())
+ {
+ case streaming_context::row:
+ WSREP_FALLTHROUGH;
+ case streaming_context::statement:
+ streaming_context_.increment_unit_counter(1);
+ break;
+ case streaming_context::bytes:
+ streaming_context_.set_unit_counter(bytes_to_replicate);
+ break;
+ }
+
+ // Some statements have no effect. Do not atttempt to
+ // replicate a fragment if no data has been generated since
+ // last fragment replication.
+ // We use `force = true` on XA prepare: a fragment will be
+ // generated even if no data is pending replication.
+ if (bytes_to_replicate <= 0 && !force)
+ {
+ assert(bytes_to_replicate == 0);
+ return ret;
+ }
+
+ if (streaming_context_.fragment_size_exceeded() || force)
+ {
+ streaming_context_.reset_unit_counter();
+ ret = certify_fragment(lock);
+ }
+
+ return ret;
+}
+
+int wsrep::transaction::certify_fragment(
+ wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ assert(lock.owns_lock());
+
+ assert(client_state_.mode() == wsrep::client_state::m_local);
+ assert(streaming_context_.rolled_back() == false ||
+ state() == s_must_abort);
+
+ client_service_.wait_for_replayers(lock);
+ if (abort_or_interrupt(lock))
+ {
+ return 1;
+ }
+
+ state(lock, s_certifying);
+ lock.unlock();
+ client_service_.debug_sync("wsrep_before_fragment_certification");
+
+ enum wsrep::provider::status status(
+ client_state_.server_state_.send_pending_rollback_events());
+ if (status)
+ {
+ wsrep::log_warning()
+ << "Failed to replicate pending rollback events: "
+ << status << " ("
+ << wsrep::provider::to_string(status) << ")";
+ lock.lock();
+ state(lock, s_must_abort);
+ return 1;
+ }
+
+ wsrep::mutable_buffer data;
+ size_t log_position(0);
+ if (client_service_.prepare_fragment_for_replication(data, log_position))
+ {
+ lock.lock();
+ state(lock, s_must_abort);
+ client_state_.override_error(wsrep::e_error_during_commit);
+ return 1;
+ }
+ streaming_context_.set_log_position(log_position);
+
+ if (data.size() == 0)
+ {
+ wsrep::log_warning() << "Attempt to replicate empty data buffer";
+ lock.lock();
+ state(lock, s_executing);
+ return 0;
+ }
+
+ if (provider().append_data(ws_handle_,
+ wsrep::const_buffer(data.data(), data.size())))
+ {
+ lock.lock();
+ state(lock, s_must_abort);
+ client_state_.override_error(wsrep::e_error_during_commit);
+ return 1;
+ }
+
+ if (is_xa())
+ {
+ // One more check to see if the transaction
+ // has been aborted. This is necessary because
+ // until the append_data above will make sure
+ // that the transaction exists in provider.
+ lock.lock();
+ if (abort_or_interrupt(lock))
+ {
+ return 1;
+ }
+ lock.unlock();
+ }
+
+ if (is_streaming() == false)
+ {
+ client_state_.server_state_.start_streaming_client(&client_state_);
+ }
+
+ if (implicit_deps())
+ {
+ flags(flags() | wsrep::provider::flag::implicit_deps);
+ }
+
+ int ret(0);
+ enum wsrep::client_error error(wsrep::e_success);
+ enum wsrep::provider::status cert_ret(wsrep::provider::success);
+ // Storage service scope
+ {
+ scoped_storage_service<storage_service_deleter>
+ sr_scope(
+ client_service_,
+ server_service_.storage_service(client_service_),
+ storage_service_deleter(server_service_));
+ wsrep::storage_service& storage_service(
+ sr_scope.storage_service());
+
+ // First the fragment is appended to the stable storage.
+ // This is done to ensure that there is enough capacity
+ // available to store the fragment. The fragment meta data
+ // is updated after certification.
+ wsrep::id server_id(client_state_.server_state().id());
+
+ if (server_id.is_undefined()) {
+ // Server disconnected from cluster, do not
+ // append a fragment with undefined server_id.
+ ret = 1;
+ error = wsrep::e_append_fragment_error;
+ }
+
+ if (ret == 0)
+ {
+ ret = storage_service.start_transaction(ws_handle_);
+ if (ret)
+ {
+ error = wsrep::e_append_fragment_error;
+ }
+ }
+
+ if (ret == 0)
+ {
+ ret = storage_service.append_fragment(
+ server_id, id(), flags(),
+ wsrep::const_buffer(data.data(), data.size()), xid());
+ if (ret)
+ {
+ error = wsrep::e_append_fragment_error;
+ storage_service.rollback(wsrep::ws_handle(), wsrep::ws_meta());
+ }
+ }
+
+ if (ret == 0)
+ {
+ client_service_.debug_crash(
+ "crash_replicate_fragment_before_certify");
+
+ wsrep::ws_meta sr_ws_meta;
+ cert_ret = provider().certify(client_state_.id(),
+ ws_handle_,
+ flags(),
+ sr_ws_meta);
+ client_service_.debug_crash(
+ "crash_replicate_fragment_after_certify");
+
+ switch (cert_ret)
+ {
+ case wsrep::provider::success:
+ ++fragments_certified_for_statement_;
+ assert(sr_ws_meta.seqno().is_undefined() == false);
+ streaming_context_.certified();
+ if (storage_service.update_fragment_meta(sr_ws_meta))
+ {
+ storage_service.rollback(wsrep::ws_handle(),
+ wsrep::ws_meta());
+ ret = 1;
+ error = wsrep::e_deadlock_error;
+ break;
+ }
+ if (storage_service.commit(ws_handle_, sr_ws_meta))
+ {
+ ret = 1;
+ error = wsrep::e_deadlock_error;
+ }
+ else
+ {
+ streaming_context_.stored(sr_ws_meta.seqno());
+ }
+ client_service_.debug_crash(
+ "crash_replicate_fragment_success");
+ break;
+ case wsrep::provider::error_bf_abort:
+ case wsrep::provider::error_certification_failed:
+ // Streaming transcation got BF aborted, so it must roll
+ // back. Roll back the fragment storage operation out of
+ // order as the commit order will be grabbed later on
+ // during rollback process. Mark the fragment as certified
+ // though in streaming context in order to enter streaming
+ // rollback codepath.
+ //
+ // Note that despite we handle error_certification_failed
+ // here, we mark the transaction as streaming. Apparently
+ // the provider may return status corresponding to certification
+ // failure even if the fragment has passed certification.
+ // This may be a bug in provider implementation or a limitation
+ // of error codes defined in wsrep-API. In order to make
+ // sure that the transaction will be cleaned on other servers,
+ // we take a risk of sending one rollback fragment for nothing.
+ storage_service.rollback(wsrep::ws_handle(),
+ wsrep::ws_meta());
+ streaming_context_.certified();
+ ret = 1;
+ error = wsrep::e_deadlock_error;
+ break;
+ default:
+ // Storage service rollback must be done out of order,
+ // otherwise there may be a deadlock between BF aborter
+ // and the rollback process.
+ storage_service.rollback(wsrep::ws_handle(), wsrep::ws_meta());
+ ret = 1;
+ error = (cert_ret == wsrep::provider::error_size_exceeded ?
+ wsrep::e_size_exceeded_error :
+ wsrep::e_deadlock_error);
+ break;
+ }
+ }
+ }
+
+ // Note: This does not release the handle in the provider
+ // since streaming is still on. However it is needed to
+ // make provider internal state to transition for the
+ // next fragment. If any of the operations above failed,
+ // the handle needs to be left unreleased for the following
+ // rollback process.
+ if (ret == 0)
+ {
+ assert(error == wsrep::e_success);
+ ret = provider().release(ws_handle_);
+ if (ret)
+ {
+ error = wsrep::e_deadlock_error;
+ }
+ }
+ lock.lock();
+ if (ret)
+ {
+ assert(error != wsrep::e_success);
+ if (is_streaming() == false)
+ {
+ lock.unlock();
+ client_state_.server_state_.stop_streaming_client(&client_state_);
+ lock.lock();
+ }
+ else
+ {
+ streaming_rollback(lock);
+ }
+ if (state_ != s_must_abort)
+ {
+ state(lock, s_must_abort);
+ }
+ client_state_.override_error(error, cert_ret);
+ }
+ else if (state_ == s_must_abort)
+ {
+ if (is_streaming())
+ {
+ streaming_rollback(lock);
+ }
+ client_state_.override_error(wsrep::e_deadlock_error, cert_ret);
+ ret = 1;
+ }
+ else
+ {
+ assert(state_ == s_certifying);
+ state(lock, s_executing);
+ flags(flags() & ~wsrep::provider::flag::start_transaction);
+ flags(flags() & ~wsrep::provider::flag::pa_unsafe);
+ }
+ return ret;
+}
+
+int wsrep::transaction::certify_commit(
+ wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ assert(lock.owns_lock());
+ assert(active());
+ client_service_.wait_for_replayers(lock);
+
+ assert(lock.owns_lock());
+
+ if (abort_or_interrupt(lock))
+ {
+ if (is_xa() && state() == s_must_abort)
+ {
+ state(lock, s_must_replay);
+ }
+ return 1;
+ }
+
+ state(lock, s_certifying);
+ lock.unlock();
+
+ enum wsrep::provider::status status(
+ client_state_.server_state_.send_pending_rollback_events());
+ if (status)
+ {
+ wsrep::log_warning()
+ << "Failed to replicate pending rollback events: "
+ << status << " ("
+ << wsrep::provider::to_string(status) << ")";
+
+ // We failed to replicate some pending rollback fragment.
+ // Meaning that some transaction that was rolled back
+ // locally might still be active out there in the cluster.
+ // To avoid a potential BF-BF conflict, we need to abort
+ // and give up on this one.
+ // Notice that we can't abort a prepared XA that wants to
+ // commit. Fortunately, there is no need to in this case:
+ // the commit fragment for XA does not cause any changes and
+ // can't possibly conflict with other transactions out there.
+ if (!is_xa())
+ {
+ lock.lock();
+ state(lock, s_must_abort);
+ return 1;
+ }
+ }
+
+ if (is_streaming())
+ {
+ if (!is_xa())
+ {
+ append_sr_keys_for_commit();
+ }
+ pa_unsafe(true);
+ }
+
+ if (implicit_deps())
+ {
+ flags(flags() | wsrep::provider::flag::implicit_deps);
+ }
+
+ flags(flags() | wsrep::provider::flag::commit);
+ flags(flags() & ~wsrep::provider::flag::prepare);
+
+ if (client_service_.prepare_data_for_replication())
+ {
+ lock.lock();
+ // Here we fake that the size exceeded error came from provider,
+ // even though it came from the client service. This requires
+ // some consideration how to get meaningful error codes from
+ // the client service.
+ client_state_.override_error(wsrep::e_size_exceeded_error,
+ wsrep::provider::error_size_exceeded);
+ if (state_ != s_must_abort)
+ {
+ state(lock, s_must_abort);
+ }
+ return 1;
+ }
+
+ client_service_.debug_sync("wsrep_before_certification");
+ enum wsrep::provider::status
+ cert_ret(provider().certify(client_state_.id(),
+ ws_handle_,
+ flags(),
+ ws_meta_));
+ client_service_.debug_sync("wsrep_after_certification");
+
+ lock.lock();
+
+ assert(state() == s_certifying || state() == s_must_abort);
+
+ int ret(1);
+ switch (cert_ret)
+ {
+ case wsrep::provider::success:
+ assert(ordered());
+ certified_ = true;
+ ++fragments_certified_for_statement_;
+ switch (state())
+ {
+ case s_certifying:
+ if (is_xa())
+ {
+ state(lock, s_committing);
+ }
+ else
+ {
+ state(lock, s_preparing);
+ }
+ ret = 0;
+ break;
+ case s_must_abort:
+ // We got BF aborted after succesful certification
+ // and before acquiring client state lock. The trasaction
+ // must be replayed.
+ state(lock, s_must_replay);
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ break;
+ case wsrep::provider::error_warning:
+ assert(ordered() == false);
+ state(lock, s_must_abort);
+ client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
+ break;
+ case wsrep::provider::error_transaction_missing:
+ state(lock, s_must_abort);
+ // The execution should never reach this point if the
+ // transaction has not generated any keys or data.
+ wsrep::log_warning() << "Transaction was missing in provider";
+ client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
+ break;
+ case wsrep::provider::error_bf_abort:
+ // Transaction was replicated successfully and it was either
+ // certified successfully or the result of certifying is not
+ // yet known. Therefore the transaction must roll back
+ // and go through replay either to replay and commit the whole
+ // transaction or to determine failed certification status.
+ if (state() != s_must_abort)
+ {
+ state(lock, s_must_abort);
+ }
+ state(lock, s_must_replay);
+ break;
+ case wsrep::provider::error_certification_failed:
+ state(lock, s_cert_failed);
+ client_state_.override_error(wsrep::e_deadlock_error);
+ break;
+ case wsrep::provider::error_size_exceeded:
+ state(lock, s_must_abort);
+ client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
+ break;
+ case wsrep::provider::error_connection_failed:
+ // Galera provider may return CONN_FAIL if the trx is
+ // BF aborted O_o. If we see here that the trx was BF aborted,
+ // return deadlock error instead of error during commit
+ // to reduce number of error state combinations elsewhere.
+ if (state() == s_must_abort)
+ {
+ if (is_xa())
+ {
+ state(lock, s_must_replay);
+ }
+ client_state_.override_error(wsrep::e_deadlock_error);
+ }
+ else
+ {
+ client_state_.override_error(wsrep::e_error_during_commit,
+ cert_ret);
+ if (is_xa())
+ {
+ state(lock, s_prepared);
+ }
+ else
+ {
+ state(lock, s_must_abort);
+ }
+ }
+ break;
+ case wsrep::provider::error_provider_failed:
+ if (state() != s_must_abort)
+ {
+ state(lock, s_must_abort);
+ }
+ client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
+ break;
+ case wsrep::provider::error_fatal:
+ client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
+ state(lock, s_must_abort);
+ client_service_.emergency_shutdown();
+ break;
+ case wsrep::provider::error_not_implemented:
+ case wsrep::provider::error_not_allowed:
+ client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
+ state(lock, s_must_abort);
+ wsrep::log_warning() << "Certification operation was not allowed: "
+ << "id: " << id().get()
+ << " flags: " << std::hex << flags() << std::dec;
+ break;
+ default:
+ state(lock, s_must_abort);
+ client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
+ break;
+ }
+
+ return ret;
+}
+
+int wsrep::transaction::append_sr_keys_for_commit()
+{
+ int ret(0);
+ assert(client_state_.mode() == wsrep::client_state::m_local);
+ for (wsrep::sr_key_set::branch_type::const_iterator
+ i(sr_keys_.root().begin());
+ ret == 0 && i != sr_keys_.root().end(); ++i)
+ {
+ for (wsrep::sr_key_set::leaf_type::const_iterator
+ j(i->second.begin());
+ ret == 0 && j != i->second.end(); ++j)
+ {
+ wsrep::key key(wsrep::key::shared);
+ key.append_key_part(i->first.data(), i->first.size());
+ key.append_key_part(j->data(), j->size());
+ ret = provider().append_key(ws_handle_, key);
+ }
+ }
+ return ret;
+}
+
+void wsrep::transaction::streaming_rollback(
+ wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ debug_log_state("streaming_rollback enter");
+ assert(state_ != s_must_replay);
+ assert(is_streaming());
+ assert(lock.owns_lock());
+
+ // Prevent streaming_rollback() to be executed simultaneously.
+ // Notice that lock is unlocked when calling into server_state
+ // methods, to avoid violating lock order.
+ // The condition variable below prevents a thread to go
+ // through streaming_rollback() while another thread is busy
+ // stopping or converting the streaming_client().
+ // This would be problematic if a thread is performing BF abort,
+ // while the original client manages to complete its rollback
+ // and therefore change the state of the transaction, causing
+ // assertions to fire.
+ while (streaming_rollback_in_progress_)
+ client_state_.cond_.wait(lock);
+ streaming_rollback_in_progress_ = true;
+
+ if (streaming_context_.rolled_back() == false)
+ {
+ // Note that streaming_context_ must not be cleaned up in this
+ // method. This is because the owning thread may still be executing
+ // fragment removal on commit, which will access fragment
+ // vector in streaming context. Clearing streaming context
+ // here may cause owning thread to access memory which was
+ // already freed. Cleanup for streaming_context_ will happen
+ // in after_rollback().
+
+ if (bf_aborted_in_total_order_)
+ {
+ lock.unlock();
+ server_service_.debug_sync("wsrep_streaming_rollback");
+ client_state_.server_state_.stop_streaming_client(&client_state_);
+ lock.lock();
+ }
+ else
+ {
+ // Create a high priority applier which will handle the
+ // rollback fragment or clean up on configuration change.
+ // Adopt transaction will copy fragment set and appropriate
+ // meta data.
+ lock.unlock();
+ server_service_.debug_sync("wsrep_streaming_rollback");
+ client_state_.server_state_.convert_streaming_client_to_applier(
+ &client_state_);
+ lock.lock();
+
+ enum wsrep::provider::status status(provider().rollback(id_));
+ if (status)
+ {
+ lock.unlock();
+ client_state_.server_state_.queue_rollback_event(id_);
+ lock.lock();
+ wsrep::log_debug()
+ << "Failed to replicate rollback fragment for " << id_
+ << ": " << status << " ( "
+ << wsrep::provider::to_string(status) << ")";
+ }
+ }
+
+ // Mark the streaming context as rolled back,
+ // so that this block is executed once.
+ streaming_context_.rolled_back(id_);
+ }
+
+ debug_log_state("streaming_rollback leave");
+ streaming_rollback_in_progress_ = false;
+ client_state_.cond_.notify_all();
+}
+
+int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
+{
+ int ret(0);
+ state(lock, s_replaying);
+ // Need to remember streaming state before replay, entering
+ // after_commit() after succesful replay will clear
+ // fragments.
+ const bool was_streaming(is_streaming());
+ lock.unlock();
+ client_service_.debug_sync("wsrep_before_replay");
+ enum wsrep::provider::status replay_ret(client_service_.replay());
+ client_service_.signal_replayed();
+ if (was_streaming)
+ {
+ client_state_.server_state_.stop_streaming_client(&client_state_);
+ }
+ lock.lock();
+ switch (replay_ret)
+ {
+ case wsrep::provider::success:
+ if (state() == s_replaying)
+ {
+ // Replay was done by using different client state, adjust state
+ // to committed.
+ state(lock, s_committed);
+ }
+ if (is_streaming())
+ {
+ streaming_context_.cleanup();
+ }
+ provider().release(ws_handle_);
+ break;
+ case wsrep::provider::error_certification_failed:
+ client_state_.override_error(
+ wsrep::e_deadlock_error);
+ if (is_streaming())
+ {
+ lock.unlock();
+ client_service_.remove_fragments();
+ lock.lock();
+ streaming_context_.cleanup();
+ }
+ state(lock, s_aborted);
+ ret = 1;
+ break;
+ default:
+ client_service_.emergency_shutdown();
+ break;
+ }
+
+ WSREP_LOG_DEBUG(
+ client_state_.debug_log_level(), wsrep::log::debug_level_transaction,
+ "replay returned: " << replay_ret << " ("
+ << wsrep::provider::to_string(replay_ret) << ")");
+ return ret;
+}
+
+void wsrep::transaction::cleanup()
+{
+ debug_log_state("cleanup_enter");
+ assert(state() == s_committed || state() == s_aborted);
+ id_ = wsrep::transaction_id::undefined();
+ ws_handle_ = wsrep::ws_handle();
+ // Keep the state history for troubleshooting. Reset
+ // at start_transaction().
+ // state_hist_.clear();
+ if (ordered())
+ {
+ client_state_.update_last_written_gtid(ws_meta_.gtid());
+ }
+ bf_abort_state_ = s_executing;
+ bf_abort_provider_status_ = wsrep::provider::success;
+ bf_abort_client_state_ = 0;
+ bf_aborted_in_total_order_ = false;
+ ws_meta_ = wsrep::ws_meta();
+ flags_ = 0;
+ certified_ = false;
+ implicit_deps_ = false;
+ sr_keys_.clear();
+ streaming_context_.cleanup();
+ client_service_.cleanup_transaction();
+ apply_error_buf_.clear();
+ xid_.clear();
+ is_bf_immutable_ = false;
+ debug_log_state("cleanup_leave");
+}
+
+void wsrep::transaction::debug_log_state(
+ const char* context) const
+{
+ WSREP_LOG_DEBUG(
+ client_state_.debug_log_level(),
+ wsrep::log::debug_level_transaction,
+ context
+ << "\n server: " << server_id_
+ << ", client: " << int64_t(client_state_.id().get())
+ << ", state: " << wsrep::to_c_string(client_state_.state())
+ << ", mode: " << wsrep::to_c_string(client_state_.mode())
+ << "\n trx_id: " << int64_t(id_.get())
+ << ", seqno: " << ws_meta_.seqno().get()
+ << ", flags: " << flags()
+ << "\n"
+ << " state: " << wsrep::to_c_string(state_)
+ << ", bfa_state: " << wsrep::to_c_string(bf_abort_state_)
+ << ", error: " << wsrep::to_c_string(client_state_.current_error())
+ << ", status: " << client_state_.current_error_status()
+ << "\n"
+ << " is_sr: " << is_streaming()
+ << ", frags: " << streaming_context_.fragments_certified()
+ << ", frags size: " << streaming_context_.fragments().size()
+ << ", unit: " << streaming_context_.fragment_unit()
+ << ", size: " << streaming_context_.fragment_size()
+ << ", counter: " << streaming_context_.unit_counter()
+ << ", log_pos: " << streaming_context_.log_position()
+ << ", sr_rb: " << streaming_context_.rolled_back()
+ << "\n own: " << (client_state_.owning_thread_id_ == wsrep::this_thread::get_id())
+ << " thread_id: " << client_state_.owning_thread_id_
+ << "");
+}
+
+void wsrep::transaction::debug_log_key_append(const wsrep::key& key) const
+{
+ WSREP_LOG_DEBUG(client_state_.debug_log_level(),
+ wsrep::log::debug_level_transaction,
+ "key_append: "
+ << "trx_id: "
+ << int64_t(id().get())
+ << " append key:\n" << key);
+}
+
+std::ostream& wsrep::operator<<(std::ostream& os,
+ enum wsrep::transaction::state state)
+{
+ return (os << to_c_string(state));
+}
diff --git a/wsrep-lib/src/uuid.cpp b/wsrep-lib/src/uuid.cpp
new file mode 100644
index 00000000..9fe41779
--- /dev/null
+++ b/wsrep-lib/src/uuid.cpp
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2019 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "uuid.hpp"
+
+#include <cstring>
+#include <cerrno>
+#include <cstdio>
+#include <cctype>
+
+int wsrep::uuid_scan (const char* str, size_t str_len, wsrep::uuid_t* uuid)
+{
+ unsigned int uuid_len = 0;
+ unsigned int uuid_offt = 0;
+
+ while (uuid_len + 1 < str_len) {
+ /* We are skipping potential '-' after uuid_offt == 4, 6, 8, 10
+ * which means
+ * (uuid_offt >> 1) == 2, 3, 4, 5,
+ * which in turn means
+ * (uuid_offt >> 1) - 2 <= 3
+ * since it is always >= 0, because uuid_offt is unsigned */
+ if (((uuid_offt >> 1) - 2) <= 3 && str[uuid_len] == '-') {
+ // skip dashes after 4th, 6th, 8th and 10th positions
+ uuid_len += 1;
+ continue;
+ }
+
+ if (isxdigit(str[uuid_len]) && isxdigit(str[uuid_len + 1])) {
+ // got hex digit, scan another byte to uuid, increment uuid_offt
+ sscanf (str + uuid_len, "%2hhx", uuid->data + uuid_offt);
+ uuid_len += 2;
+ uuid_offt += 1;
+ if (sizeof (uuid->data) == uuid_offt)
+ return static_cast<int>(uuid_len);
+ }
+ else {
+ break;
+ }
+ }
+
+ *uuid = wsrep::uuid_initializer;
+ return -EINVAL;
+}
+
+int wsrep::uuid_print (const wsrep::uuid_t* uuid, char* str, size_t str_len)
+{
+ if (str_len > WSREP_LIB_UUID_STR_LEN) {
+ const unsigned char* u = uuid->data;
+ return snprintf(str, str_len, "%02x%02x%02x%02x-%02x%02x-%02x%02x-"
+ "%02x%02x-%02x%02x%02x%02x%02x%02x",
+ u[ 0], u[ 1], u[ 2], u[ 3], u[ 4], u[ 5], u[ 6], u[ 7],
+ u[ 8], u[ 9], u[10], u[11], u[12], u[13], u[14], u[15]);
+ }
+ else {
+ return -EMSGSIZE;
+ }
+}
diff --git a/wsrep-lib/src/uuid.hpp b/wsrep-lib/src/uuid.hpp
new file mode 100644
index 00000000..72812a76
--- /dev/null
+++ b/wsrep-lib/src/uuid.hpp
@@ -0,0 +1,79 @@
+/*
+ * Copyright (C) 2019 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+/** @file uuid.hpp
+ *
+ * Helper methods to parse and print UUIDs, intended to use
+ * internally in wsrep-lib.
+ *
+ * The implementation is copied from wsrep-API v26.
+ */
+
+#ifndef WSREP_UUID_HPP
+#define WSREP_UUID_HPP
+
+#include "wsrep/compiler.hpp"
+
+#include <cstddef>
+
+/**
+ * Length of UUID string representation, not including terminating '\0'.
+ */
+#define WSREP_LIB_UUID_STR_LEN 36
+
+namespace wsrep
+{
+ /**
+ * UUID type.
+ */
+ typedef union uuid_
+ {
+ unsigned char data[16];
+ size_t alignment;
+ } uuid_t;
+
+ static const wsrep::uuid_t uuid_initializer = {{0, }};
+
+ /**
+ * Read UUID from string.
+ *
+ * @param str String to read from
+ * @param str_len Length of string
+ * @param[out] UUID to read to
+ *
+ * @return Number of bytes read or negative error code in case
+ * of error.
+ */
+ int uuid_scan(const char* str, size_t str_len, wsrep::uuid_t* uuid);
+
+ /**
+ * Write UUID to string. The caller must allocate at least
+ * WSREP_LIB_UUID_STR_LEN + 1 space for the output str parameter.
+ *
+ * @param uuid UUID to print
+ * @param str[out] Output buffer
+ * @param str_len Size of output buffer
+ *
+ * @return Number of chars printerd, negative error code in case of
+ * error.
+ */
+ int uuid_print(const wsrep::uuid_t* uuid, char* str, size_t str_len);
+}
+
+#endif // WSREP_UUID_HPP
diff --git a/wsrep-lib/src/view.cpp b/wsrep-lib/src/view.cpp
new file mode 100644
index 00000000..d5bff099
--- /dev/null
+++ b/wsrep-lib/src/view.cpp
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/view.hpp"
+#include "wsrep/provider.hpp"
+
+int wsrep::view::member_index(const wsrep::id& member_id) const
+{
+ for (std::vector<member>::const_iterator i(members_.begin());
+ i != members_.end(); ++i)
+ {
+ if (i->id() == member_id)
+ {
+ return static_cast<int>(i - members_.begin());
+ }
+ }
+ return -1;
+}
+
+bool wsrep::view::equal_membership(const wsrep::view& other) const
+{
+ if (members_.size() != other.members_.size())
+ {
+ return false;
+ }
+ // we can't assume members ordering
+ for (std::vector<member>::const_iterator i(members_.begin());
+ i != members_.end(); ++i)
+ {
+ if (other.member_index(i->id()) == -1)
+ {
+ return false;
+ }
+ }
+ return true;
+}
+
+void wsrep::view::print(std::ostream& os) const
+{
+ os << " id: " << state_id() << "\n"
+ << " status: " << to_c_string(status()) << "\n"
+ << " protocol_version: " << protocol_version() << "\n"
+ << " capabilities: " << provider::capability::str(capabilities())<<"\n"
+ << " final: " << (final() ? "yes" : "no") << "\n"
+ << " own_index: " << own_index() << "\n"
+ << " members(" << members().size() << "):\n";
+
+ for (std::vector<wsrep::view::member>::const_iterator i(members().begin());
+ i != members().end(); ++i)
+ {
+ os << "\t" << (i - members().begin()) /* ordinal index */
+ << ": " << i->id()
+ << ", " << i->name() << "\n";
+ }
+}
diff --git a/wsrep-lib/src/wsrep_provider_v26.cpp b/wsrep-lib/src/wsrep_provider_v26.cpp
new file mode 100644
index 00000000..eb8679c7
--- /dev/null
+++ b/wsrep-lib/src/wsrep_provider_v26.cpp
@@ -0,0 +1,1183 @@
+/*
+ * Copyright (C) 2018-2021 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep_provider_v26.hpp"
+
+#include "wsrep/encryption_service.hpp"
+#include "wsrep/server_state.hpp"
+#include "wsrep/high_priority_service.hpp"
+#include "wsrep/view.hpp"
+#include "wsrep/exception.hpp"
+#include "wsrep/logger.hpp"
+#include "wsrep/thread_service.hpp"
+#include "wsrep/tls_service.hpp"
+#include "wsrep/allowlist_service.hpp"
+
+#include "thread_service_v1.hpp"
+#include "tls_service_v1.hpp"
+#include "allowlist_service_v1.hpp"
+#include "event_service_v1.hpp"
+#include "v26/wsrep_api.h"
+
+
+#include <dlfcn.h>
+#include <cassert>
+#include <climits>
+
+#include <iostream>
+#include <sstream>
+#include <cstring> // strerror()
+
+namespace
+{
+ /////////////////////////////////////////////////////////////////////
+ // Helpers //
+ /////////////////////////////////////////////////////////////////////
+
+ enum wsrep::provider::status map_return_value(wsrep_status_t status)
+ {
+ switch (status)
+ {
+ case WSREP_OK:
+ return wsrep::provider::success;
+ case WSREP_WARNING:
+ return wsrep::provider::error_warning;
+ case WSREP_TRX_MISSING:
+ return wsrep::provider::error_transaction_missing;
+ case WSREP_TRX_FAIL:
+ return wsrep::provider::error_certification_failed;
+ case WSREP_BF_ABORT:
+ return wsrep::provider::error_bf_abort;
+ case WSREP_SIZE_EXCEEDED:
+ return wsrep::provider::error_size_exceeded;
+ case WSREP_CONN_FAIL:
+ return wsrep::provider::error_connection_failed;
+ case WSREP_NODE_FAIL:
+ return wsrep::provider::error_provider_failed;
+ case WSREP_FATAL:
+ return wsrep::provider::error_fatal;
+ case WSREP_NOT_IMPLEMENTED:
+ return wsrep::provider::error_not_implemented;
+ case WSREP_NOT_ALLOWED:
+ return wsrep::provider::error_not_allowed;
+ }
+
+ wsrep::log_warning() << "Unexpected value for wsrep_status_t: "
+ << status << " ("
+ << (status < 0 ? strerror(-status) : "") << ')';
+
+ return wsrep::provider::error_unknown;
+ }
+
+ wsrep_key_type_t map_key_type(enum wsrep::key::type type)
+ {
+ switch (type)
+ {
+ case wsrep::key::shared: return WSREP_KEY_SHARED;
+ case wsrep::key::reference: return WSREP_KEY_REFERENCE;
+ case wsrep::key::update: return WSREP_KEY_UPDATE;
+ case wsrep::key::exclusive: return WSREP_KEY_EXCLUSIVE;
+ }
+ assert(0);
+ throw wsrep::runtime_error("Invalid key type");
+ }
+
+ static inline wsrep_seqno_t seqno_to_native(wsrep::seqno seqno)
+ {
+ return seqno.get();
+ }
+
+ static inline wsrep::seqno seqno_from_native(wsrep_seqno_t seqno)
+ {
+ return wsrep::seqno(seqno);
+ }
+
+ template <typename F, typename T>
+ inline uint32_t map_one(const int flags, const F from,
+ const T to)
+ {
+ // INT_MAX because GCC 4.4 does not eat numeric_limits<int>::max()
+ // in static_assert
+ static_assert(WSREP_FLAGS_LAST < INT_MAX,
+ "WSREP_FLAGS_LAST < INT_MAX");
+ return static_cast<uint32_t>((flags & static_cast<int>(from)) ?
+ static_cast<int>(to) : 0);
+ }
+
+ uint32_t map_flags_to_native(int flags)
+ {
+ using wsrep::provider;
+ return static_cast<uint32_t>(
+ map_one(flags, provider::flag::start_transaction,
+ WSREP_FLAG_TRX_START) |
+ map_one(flags, provider::flag::commit, WSREP_FLAG_TRX_END) |
+ map_one(flags, provider::flag::rollback, WSREP_FLAG_ROLLBACK) |
+ map_one(flags, provider::flag::isolation, WSREP_FLAG_ISOLATION) |
+ map_one(flags, provider::flag::pa_unsafe, WSREP_FLAG_PA_UNSAFE) |
+ // map_one(flags, provider::flag::commutative, WSREP_FLAG_COMMUTATIVE)
+ // |
+ // map_one(flags, provider::flag::native, WSREP_FLAG_NATIVE) |
+ map_one(flags, provider::flag::prepare, WSREP_FLAG_TRX_PREPARE) |
+ map_one(flags, provider::flag::snapshot, WSREP_FLAG_SNAPSHOT) |
+ map_one(flags, provider::flag::implicit_deps,
+ WSREP_FLAG_IMPLICIT_DEPS));
+ }
+
+ int map_flags_from_native(uint32_t flags_u32)
+ {
+ using wsrep::provider;
+ const int flags(static_cast<int>(flags_u32));
+ return static_cast<int>(
+ map_one(flags, WSREP_FLAG_TRX_START,
+ provider::flag::start_transaction) |
+ map_one(flags, WSREP_FLAG_TRX_END, provider::flag::commit) |
+ map_one(flags, WSREP_FLAG_ROLLBACK, provider::flag::rollback) |
+ map_one(flags, WSREP_FLAG_ISOLATION, provider::flag::isolation) |
+ map_one(flags, WSREP_FLAG_PA_UNSAFE, provider::flag::pa_unsafe) |
+ // map_one(flags, provider::flag::commutative, WSREP_FLAG_COMMUTATIVE)
+ // |
+ // map_one(flags, provider::flag::native, WSREP_FLAG_NATIVE) |
+ map_one(flags, WSREP_FLAG_TRX_PREPARE, provider::flag::prepare) |
+ map_one(flags, WSREP_FLAG_SNAPSHOT, provider::flag::snapshot) |
+ map_one(flags, WSREP_FLAG_IMPLICIT_DEPS,
+ provider::flag::implicit_deps));
+ }
+
+ class mutable_ws_handle
+ {
+ public:
+ mutable_ws_handle(wsrep::ws_handle& ws_handle)
+ : ws_handle_(ws_handle)
+ , native_((wsrep_ws_handle_t)
+ {
+ ws_handle_.transaction_id().get(),
+ ws_handle_.opaque()
+ })
+ { }
+
+ ~mutable_ws_handle()
+ {
+ ws_handle_ = wsrep::ws_handle(
+ wsrep::transaction_id(native_.trx_id), native_.opaque);
+ }
+
+ wsrep_ws_handle_t* native()
+ {
+ return &native_;
+ }
+ private:
+ wsrep::ws_handle& ws_handle_;
+ wsrep_ws_handle_t native_;
+ };
+
+ class const_ws_handle
+ {
+ public:
+ const_ws_handle(const wsrep::ws_handle& ws_handle)
+ : ws_handle_(ws_handle)
+ , native_((wsrep_ws_handle_t)
+ {
+ ws_handle_.transaction_id().get(),
+ ws_handle_.opaque()
+ })
+ { }
+
+ ~const_ws_handle()
+ {
+ assert(ws_handle_.transaction_id().get() == native_.trx_id);
+ assert(ws_handle_.opaque() == native_.opaque);
+ }
+
+ const wsrep_ws_handle_t* native() const
+ {
+ return &native_;
+ }
+ private:
+ const wsrep::ws_handle& ws_handle_;
+ const wsrep_ws_handle_t native_;
+ };
+
+ class mutable_ws_meta
+ {
+ public:
+ mutable_ws_meta(wsrep::ws_meta& ws_meta, int flags)
+ : ws_meta_(ws_meta)
+ , trx_meta_()
+ , flags_(flags)
+ {
+ std::memcpy(trx_meta_.gtid.uuid.data, ws_meta.group_id().data(),
+ sizeof(trx_meta_.gtid.uuid.data));
+ trx_meta_.gtid.seqno = seqno_to_native(ws_meta.seqno());
+ std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(),
+ sizeof(trx_meta_.stid.node.data));
+ trx_meta_.stid.conn = ws_meta.client_id().get();
+ trx_meta_.stid.trx = ws_meta.transaction_id().get();
+ trx_meta_.depends_on = seqno_to_native(ws_meta.depends_on());
+ }
+
+ ~mutable_ws_meta()
+ {
+ ws_meta_ = wsrep::ws_meta(
+ wsrep::gtid(
+ wsrep::id(trx_meta_.gtid.uuid.data,
+ sizeof(trx_meta_.gtid.uuid.data)),
+ seqno_from_native(trx_meta_.gtid.seqno)),
+ wsrep::stid(wsrep::id(trx_meta_.stid.node.data,
+ sizeof(trx_meta_.stid.node.data)),
+ wsrep::transaction_id(trx_meta_.stid.trx),
+ wsrep::client_id(trx_meta_.stid.conn)),
+ seqno_from_native(trx_meta_.depends_on), flags_);
+ }
+
+ wsrep_trx_meta* native() { return &trx_meta_; }
+ uint32_t native_flags() const { return map_flags_to_native(flags_); }
+ private:
+ wsrep::ws_meta& ws_meta_;
+ wsrep_trx_meta_t trx_meta_;
+ int flags_;
+ };
+
+ class const_ws_meta
+ {
+ public:
+ const_ws_meta(const wsrep::ws_meta& ws_meta)
+ : trx_meta_()
+ {
+ std::memcpy(trx_meta_.gtid.uuid.data, ws_meta.group_id().data(),
+ sizeof(trx_meta_.gtid.uuid.data));
+ trx_meta_.gtid.seqno = seqno_to_native(ws_meta.seqno());
+ std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(),
+ sizeof(trx_meta_.stid.node.data));
+ trx_meta_.stid.conn = ws_meta.client_id().get();
+ trx_meta_.stid.trx = ws_meta.transaction_id().get();
+ trx_meta_.depends_on = seqno_to_native(ws_meta.depends_on());
+ }
+
+ ~const_ws_meta()
+ {
+ }
+
+ const wsrep_trx_meta* native() const { return &trx_meta_; }
+ private:
+ wsrep_trx_meta_t trx_meta_;
+ };
+
+ enum wsrep::view::status map_view_status_from_native(
+ wsrep_view_status_t status)
+ {
+ switch (status)
+ {
+ case WSREP_VIEW_PRIMARY: return wsrep::view::primary;
+ case WSREP_VIEW_NON_PRIMARY: return wsrep::view::non_primary;
+ case WSREP_VIEW_DISCONNECTED: return wsrep::view::disconnected;
+ default: throw wsrep::runtime_error("Unknown view status");
+ }
+ }
+
+ /** @todo Currently capabilities defined in provider.hpp
+ * are one to one with wsrep_api.h. However, the mapping should
+ * be made explicit. */
+ int map_capabilities_from_native(wsrep_cap_t capabilities)
+ {
+ return static_cast<int>(capabilities);
+ }
+ wsrep::view view_from_native(const wsrep_view_info& view_info,
+ const wsrep::id& own_id)
+ {
+ std::vector<wsrep::view::member> members;
+ for (int i(0); i < view_info.memb_num; ++i)
+ {
+ wsrep::id id(view_info.members[i].id.data, sizeof(view_info.members[i].id.data));
+ std::string name(
+ view_info.members[i].name,
+ strnlen(view_info.members[i].name,
+ sizeof(view_info.members[i].name)));
+ std::string incoming(
+ view_info.members[i].incoming,
+ strnlen(view_info.members[i].incoming,
+ sizeof(view_info.members[i].incoming)));
+ members.push_back(wsrep::view::member(id, name, incoming));
+ }
+
+ int own_idx(-1);
+ if (own_id.is_undefined())
+ {
+ // If own ID is undefined, obtain it from the view. This is
+ // the case on the initial connect to cluster.
+ own_idx = view_info.my_idx;
+ }
+ else
+ {
+ // If the node has already obtained its ID from cluster,
+ // its position in the view (or lack thereof) must be determined
+ // by the ID.
+ for (size_t i(0); i < members.size(); ++i)
+ {
+ if (own_id == members[i].id())
+ {
+ own_idx = static_cast<int>(i);
+ break;
+ }
+ }
+ }
+
+ return wsrep::view(
+ wsrep::gtid(
+ wsrep::id(view_info.state_id.uuid.data,
+ sizeof(view_info.state_id.uuid.data)),
+ wsrep::seqno(view_info.state_id.seqno)),
+ wsrep::seqno(view_info.view),
+ map_view_status_from_native(view_info.status),
+ map_capabilities_from_native(view_info.capabilities),
+ own_idx,
+ view_info.proto_ver,
+ members);
+ }
+
+ /////////////////////////////////////////////////////////////////////
+ // Callbacks //
+ /////////////////////////////////////////////////////////////////////
+
+ wsrep_cb_status_t connected_cb(
+ void* app_ctx,
+ const wsrep_view_info_t* view_info)
+ {
+ assert(app_ctx);
+ wsrep::server_state& server_state(
+ *reinterpret_cast<wsrep::server_state*>(app_ctx));
+ wsrep::view view(view_from_native(*view_info, server_state.id()));
+ const ssize_t own_index(view.own_index());
+ assert(own_index >= 0);
+ if (own_index < 0)
+ {
+ wsrep::log_error() << "Connected without being in reported view";
+ return WSREP_CB_FAILURE;
+ }
+ assert(// first connect
+ server_state.id().is_undefined() ||
+ // reconnect to primary component
+ server_state.id() ==
+ view.members()[static_cast<size_t>(own_index)].id());
+ try
+ {
+ server_state.on_connect(view);
+ return WSREP_CB_SUCCESS;
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ wsrep::log_error() << "Exception: " << e.what();
+ return WSREP_CB_FAILURE;
+ }
+ }
+
+ wsrep_cb_status_t view_cb(void* app_ctx,
+ void* recv_ctx,
+ const wsrep_view_info_t* view_info,
+ const char*,
+ size_t)
+ {
+ assert(app_ctx);
+ assert(view_info);
+ wsrep::server_state& server_state(
+ *reinterpret_cast<wsrep::server_state*>(app_ctx));
+ wsrep::high_priority_service* high_priority_service(
+ reinterpret_cast<wsrep::high_priority_service*>(recv_ctx));
+ try
+ {
+ wsrep::view view(view_from_native(*view_info, server_state.id()));
+ server_state.on_view(view, high_priority_service);
+ return WSREP_CB_SUCCESS;
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ wsrep::log_error() << "Exception: " << e.what();
+ return WSREP_CB_FAILURE;
+ }
+ }
+
+ wsrep_cb_status_t sst_request_cb(void* app_ctx,
+ void **sst_req, size_t* sst_req_len)
+ {
+ assert(app_ctx);
+ wsrep::server_state& server_state(
+ *reinterpret_cast<wsrep::server_state*>(app_ctx));
+
+ try
+ {
+ std::string req(server_state.prepare_for_sst());
+ if (req.size() > 0)
+ {
+ *sst_req = ::malloc(req.size() + 1);
+ memcpy(*sst_req, req.data(), req.size() + 1);
+ *sst_req_len = req.size() + 1;
+ }
+ else
+ {
+ *sst_req = 0;
+ *sst_req_len = 0;
+ }
+ return WSREP_CB_SUCCESS;
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ return WSREP_CB_FAILURE;
+ }
+ }
+
+ int encrypt_cb(void* app_ctx,
+ wsrep_enc_ctx_t* enc_ctx,
+ const wsrep_buf_t* input,
+ void* output,
+ wsrep_enc_direction_t direction,
+ bool last)
+ {
+ assert(app_ctx);
+ wsrep::server_state& server_state(
+ *static_cast<wsrep::server_state*>(app_ctx));
+
+ assert(server_state.encryption_service());
+ if (server_state.encryption_service() == 0)
+ {
+ wsrep::log_error() << "Encryption service not defined in encrypt_cb()";
+ return -1;
+ }
+
+ wsrep::const_buffer key(enc_ctx->key->ptr, enc_ctx->key->len);
+ wsrep::const_buffer in(input->ptr, input->len);
+ try
+ {
+ return server_state.encryption_service()->do_crypt(&enc_ctx->ctx,
+ key,
+ enc_ctx->iv,
+ in,
+ output,
+ direction == WSREP_ENC,
+ last);
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ free(enc_ctx->ctx);
+ // Return negative value in case of callback error
+ return -1;
+ }
+ }
+
+ wsrep_cb_status_t apply_cb(void* ctx,
+ const wsrep_ws_handle_t* wsh,
+ uint32_t flags,
+ const wsrep_buf_t* buf,
+ const wsrep_trx_meta_t* meta,
+ wsrep_bool_t* exit_loop)
+ {
+ wsrep::high_priority_service* high_priority_service(
+ reinterpret_cast<wsrep::high_priority_service*>(ctx));
+ assert(high_priority_service);
+
+ wsrep::const_buffer data(buf->ptr, buf->len);
+ wsrep::ws_handle ws_handle(wsrep::transaction_id(wsh->trx_id),
+ wsh->opaque);
+ wsrep::ws_meta ws_meta(
+ wsrep::gtid(wsrep::id(meta->gtid.uuid.data,
+ sizeof(meta->gtid.uuid.data)),
+ wsrep::seqno(meta->gtid.seqno)),
+ wsrep::stid(wsrep::id(meta->stid.node.data,
+ sizeof(meta->stid.node.data)),
+ wsrep::transaction_id(meta->stid.trx),
+ wsrep::client_id(meta->stid.conn)),
+ wsrep::seqno(seqno_from_native(meta->depends_on)),
+ map_flags_from_native(flags));
+ try
+ {
+ if (high_priority_service->apply(ws_handle, ws_meta, data))
+ {
+ return WSREP_CB_FAILURE;
+ }
+ *exit_loop = high_priority_service->must_exit();
+ return WSREP_CB_SUCCESS;
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ wsrep::log_error() << "Caught runtime error while applying "
+ << ws_meta.flags() << ": "
+ << e.what();
+ return WSREP_CB_FAILURE;
+ }
+ }
+
+ wsrep_cb_status_t synced_cb(void* app_ctx)
+ {
+ assert(app_ctx);
+ wsrep::server_state& server_state(
+ *reinterpret_cast<wsrep::server_state*>(app_ctx));
+ try
+ {
+ server_state.on_sync();
+ return WSREP_CB_SUCCESS;
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ std::cerr << "On sync failed: " << e.what() << "\n";
+ return WSREP_CB_FAILURE;
+ }
+ }
+
+
+ wsrep_cb_status_t sst_donate_cb(void* app_ctx,
+ void* ,
+ const wsrep_buf_t* req_buf,
+ const wsrep_gtid_t* req_gtid,
+ const wsrep_buf_t*,
+ bool bypass)
+ {
+ assert(app_ctx);
+ wsrep::server_state& server_state(
+ *reinterpret_cast<wsrep::server_state*>(app_ctx));
+ try
+ {
+ std::string req(reinterpret_cast<const char*>(req_buf->ptr),
+ req_buf->len);
+ wsrep::gtid gtid(wsrep::id(req_gtid->uuid.data,
+ sizeof(req_gtid->uuid.data)),
+ wsrep::seqno(req_gtid->seqno));
+ if (server_state.start_sst(req, gtid, bypass))
+ {
+ return WSREP_CB_FAILURE;
+ }
+ return WSREP_CB_SUCCESS;
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ return WSREP_CB_FAILURE;
+ }
+ }
+
+ void logger_cb(wsrep_log_level_t level, const char* msg)
+ {
+ static const char* const pfx("P:"); // "provider"
+ wsrep::log::level ll(wsrep::log::unknown);
+ switch (level)
+ {
+ case WSREP_LOG_FATAL:
+ case WSREP_LOG_ERROR:
+ ll = wsrep::log::error;
+ break;
+ case WSREP_LOG_WARN:
+ ll = wsrep::log::warning;
+ break;
+ case WSREP_LOG_INFO:
+ ll = wsrep::log::info;
+ break;
+ case WSREP_LOG_DEBUG:
+ ll = wsrep::log::debug;
+ break;
+ }
+ wsrep::log(ll, pfx) << msg;
+ }
+
+ static int init_thread_service(void* dlh,
+ wsrep::thread_service* thread_service)
+ {
+ assert(thread_service);
+ if (wsrep::thread_service_v1_probe(dlh))
+ {
+ // No support in library.
+ return 1;
+ }
+ else
+ {
+ if (thread_service->before_init())
+ {
+ wsrep::log_error() << "Thread service before init failed";
+ return 1;
+ }
+ wsrep::thread_service_v1_init(dlh, thread_service);
+ if (thread_service->after_init())
+ {
+ wsrep::log_error() << "Thread service after init failed";
+ return 1;
+ }
+ }
+ return 0;
+ }
+
+ static void deinit_thread_service(void* dlh)
+ {
+ // assert(not wsrep::thread_service_v1_probe(dlh));
+ wsrep::thread_service_v1_deinit(dlh);
+ }
+
+ static int init_tls_service(void* dlh,
+ wsrep::tls_service* tls_service)
+ {
+ assert(tls_service);
+ if (not wsrep::tls_service_v1_probe(dlh))
+ {
+ return wsrep::tls_service_v1_init(dlh, tls_service);
+ }
+ return 1;
+ }
+
+ static void deinit_tls_service(void* dlh)
+ {
+ // assert(not wsrep::tls_service_v1_probe(dlh));
+ wsrep::tls_service_v1_deinit(dlh);
+ }
+
+ static int init_allowlist_service(void* dlh,
+ wsrep::allowlist_service* allowlist_service)
+ {
+ assert(allowlist_service);
+ if (not wsrep::allowlist_service_v1_probe(dlh))
+ {
+ return wsrep::allowlist_service_v1_init(dlh, allowlist_service);
+ }
+ return 1;
+ }
+
+ static void deinit_allowlist_service(void* dlh)
+ {
+ // assert(not wsrep::allowlist_service_v1_probe(dlh));
+ wsrep::allowlist_service_v1_deinit(dlh);
+ }
+
+ static int init_event_service(void* dlh,
+ wsrep::event_service* service)
+ {
+ assert(service);
+ if (not wsrep::event_service_v1_probe(dlh))
+ {
+ return wsrep::event_service_v1_init(dlh, service);
+ }
+ return 1;
+ }
+
+ static void deinit_event_service(void* dlh)
+ {
+ wsrep::event_service_v1_deinit(dlh);
+ }
+}
+
+
+
+void wsrep::wsrep_provider_v26::init_services(
+ const wsrep::provider::services& services)
+{
+ if (services.thread_service)
+ {
+ if (init_thread_service(wsrep_->dlh, services.thread_service))
+ {
+ throw wsrep::runtime_error("Failed to initialize thread service");
+ }
+ services_enabled_.thread_service = services.thread_service;
+ }
+ if (services.tls_service)
+ {
+ if (init_tls_service(wsrep_->dlh, services.tls_service))
+ {
+ throw wsrep::runtime_error("Failed to initialize TLS service");
+ }
+ services_enabled_.tls_service = services.tls_service;
+ }
+ if (services.allowlist_service)
+ {
+ if (init_allowlist_service(wsrep_->dlh, services.allowlist_service))
+ {
+ throw wsrep::runtime_error("Failed to initialize allowlist service");
+ }
+ services_enabled_.allowlist_service = services.allowlist_service;
+ }
+ if (services.event_service)
+ {
+ if (init_event_service(wsrep_->dlh, services.event_service))
+ {
+ wsrep::log_warning() << "Failed to initialize event service";
+ // provider does not produce events, ignore
+ }
+ else
+ {
+ services_enabled_.event_service = services.event_service;
+ }
+ }
+}
+
+void wsrep::wsrep_provider_v26::deinit_services()
+{
+ if (services_enabled_.event_service)
+ deinit_event_service(wsrep_->dlh);
+ if (services_enabled_.tls_service)
+ deinit_tls_service(wsrep_->dlh);
+ if (services_enabled_.thread_service)
+ deinit_thread_service(wsrep_->dlh);
+ if (services_enabled_.allowlist_service)
+ deinit_allowlist_service(wsrep_->dlh);
+}
+
+wsrep::wsrep_provider_v26::wsrep_provider_v26(
+ wsrep::server_state& server_state,
+ const std::string& provider_options,
+ const std::string& provider_spec,
+ const wsrep::provider::services& services)
+ : provider(server_state)
+ , wsrep_()
+ , services_enabled_()
+{
+ wsrep_gtid_t state_id;
+ bool encryption_enabled = server_state.encryption_service() &&
+ server_state.encryption_service()->encryption_enabled();
+ std::memcpy(state_id.uuid.data,
+ server_state.initial_position().id().data(),
+ sizeof(state_id.uuid.data));
+ state_id.seqno = server_state.initial_position().seqno().get();
+ struct wsrep_init_args init_args;
+ memset(&init_args, 0, sizeof(init_args));
+ init_args.app_ctx = &server_state;
+ init_args.node_name = server_state_.name().c_str();
+ init_args.node_address = server_state_.address().c_str();
+ init_args.node_incoming = server_state_.incoming_address().c_str();
+ init_args.data_dir = server_state_.working_dir().c_str();
+ init_args.options = provider_options.c_str();
+ init_args.proto_ver = server_state.max_protocol_version();
+ init_args.state_id = &state_id;
+ init_args.state = 0;
+ init_args.logger_cb = &logger_cb;
+ init_args.connected_cb = &connected_cb;
+ init_args.view_cb = &view_cb;
+ init_args.sst_request_cb = &sst_request_cb;
+ init_args.encrypt_cb = encryption_enabled ? encrypt_cb : NULL;
+ init_args.apply_cb = &apply_cb;
+ init_args.unordered_cb = 0;
+ init_args.sst_donate_cb = &sst_donate_cb;
+ init_args.synced_cb = &synced_cb;
+
+ if (wsrep_load(provider_spec.c_str(), &wsrep_, logger_cb))
+ {
+ throw wsrep::runtime_error("Failed to load wsrep library");
+ }
+
+ init_services(services);
+
+ if (wsrep_->init(wsrep_, &init_args) != WSREP_OK)
+ {
+ throw wsrep::runtime_error("Failed to initialize wsrep provider");
+ }
+
+ if (encryption_enabled)
+ {
+ const std::vector<unsigned char>& key = server_state.get_encryption_key();
+ if (key.size())
+ {
+ wsrep::const_buffer const_key(key.data(), key.size());
+ enum status const retval(enc_set_key(const_key));
+ if (retval != success)
+ {
+ std::string msg("Failed to set encryption key: ");
+ msg += to_string(retval);
+ throw wsrep::runtime_error(msg);
+ }
+ }
+ }
+}
+
+wsrep::wsrep_provider_v26::~wsrep_provider_v26()
+{
+ wsrep_->free(wsrep_);
+ deinit_services();
+ wsrep_unload(wsrep_);
+}
+
+enum wsrep::provider::status wsrep::wsrep_provider_v26::connect(
+ const std::string& cluster_name,
+ const std::string& cluster_url,
+ const std::string& state_donor,
+ bool bootstrap)
+{
+ return map_return_value(wsrep_->connect(wsrep_,
+ cluster_name.c_str(),
+ cluster_url.c_str(),
+ state_donor.c_str(),
+ bootstrap));
+}
+
+int wsrep::wsrep_provider_v26::disconnect()
+{
+ int ret(0);
+ wsrep_status_t wret;
+ if ((wret = wsrep_->disconnect(wsrep_)) != WSREP_OK)
+ {
+ std::cerr << "Failed to disconnect from cluster: "
+ << wret << "\n";
+ ret = 1;
+ }
+ return ret;
+}
+
+int wsrep::wsrep_provider_v26::capabilities() const
+{
+ return map_capabilities_from_native(wsrep_->capabilities(wsrep_));
+}
+int wsrep::wsrep_provider_v26::desync()
+{
+ return (wsrep_->desync(wsrep_) != WSREP_OK);
+}
+
+int wsrep::wsrep_provider_v26::resync()
+{
+ return (wsrep_->resync(wsrep_) != WSREP_OK);
+}
+
+wsrep::seqno wsrep::wsrep_provider_v26::pause()
+{
+ return wsrep::seqno(wsrep_->pause(wsrep_));
+}
+
+int wsrep::wsrep_provider_v26::resume()
+{
+ return (wsrep_->resume(wsrep_) != WSREP_OK);
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::run_applier(
+ wsrep::high_priority_service *applier_ctx)
+{
+ return map_return_value(wsrep_->recv(wsrep_, applier_ctx));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::assign_read_view(wsrep::ws_handle& ws_handle,
+ const wsrep::gtid* gtid)
+{
+ const wsrep_gtid_t* gtid_ptr(NULL);
+ wsrep_gtid_t tmp;
+
+ if (gtid)
+ {
+ ::memcpy(&tmp.uuid, gtid->id().data(), sizeof(tmp.uuid));
+ tmp.seqno = gtid->seqno().get();
+ gtid_ptr = &tmp;
+ }
+
+ mutable_ws_handle mwsh(ws_handle);
+ return map_return_value(wsrep_->assign_read_view(wsrep_, mwsh.native(),
+ gtid_ptr));
+}
+
+int wsrep::wsrep_provider_v26::append_key(wsrep::ws_handle& ws_handle,
+ const wsrep::key& key)
+{
+ if (key.size() > 3)
+ {
+ assert(0);
+ return 1;
+ }
+ wsrep_buf_t key_parts[3];
+ for (size_t i(0); i < key.size(); ++i)
+ {
+ key_parts[i].ptr = key.key_parts()[i].ptr();
+ key_parts[i].len = key.key_parts()[i].size();
+ }
+ wsrep_key_t wsrep_key = {key_parts, key.size()};
+ mutable_ws_handle mwsh(ws_handle);
+ return (wsrep_->append_key(
+ wsrep_, mwsh.native(),
+ &wsrep_key, 1, map_key_type(key.type()), true)
+ != WSREP_OK);
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::append_data(wsrep::ws_handle& ws_handle,
+ const wsrep::const_buffer& data)
+{
+ const wsrep_buf_t wsrep_buf = {data.data(), data.size()};
+ mutable_ws_handle mwsh(ws_handle);
+ return map_return_value(
+ wsrep_->append_data(wsrep_, mwsh.native(), &wsrep_buf,
+ 1, WSREP_DATA_ORDERED, true));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::certify(wsrep::client_id client_id,
+ wsrep::ws_handle& ws_handle,
+ int flags,
+ wsrep::ws_meta& ws_meta)
+{
+ mutable_ws_handle mwsh(ws_handle);
+ mutable_ws_meta mmeta(ws_meta, flags);
+ return map_return_value(
+ wsrep_->certify(wsrep_, client_id.get(), mwsh.native(),
+ mmeta.native_flags(),
+ mmeta.native()));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::bf_abort(
+ wsrep::seqno bf_seqno,
+ wsrep::transaction_id victim_id,
+ wsrep::seqno& victim_seqno)
+{
+ wsrep_seqno_t wsrep_victim_seqno;
+ wsrep_status_t ret(
+ wsrep_->abort_certification(
+ wsrep_, seqno_to_native(bf_seqno),
+ victim_id.get(), &wsrep_victim_seqno));
+ victim_seqno = seqno_from_native(wsrep_victim_seqno);
+ return map_return_value(ret);
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::rollback(wsrep::transaction_id id)
+{
+ return map_return_value(wsrep_->rollback(wsrep_, id.get(), 0));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::commit_order_enter(
+ const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta)
+{
+ const_ws_handle cwsh(ws_handle);
+ const_ws_meta cwsm(ws_meta);
+ return map_return_value(
+ wsrep_->commit_order_enter(wsrep_, cwsh.native(), cwsm.native()));
+}
+
+int
+wsrep::wsrep_provider_v26::commit_order_leave(
+ const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta,
+ const wsrep::mutable_buffer& err)
+{
+ const_ws_handle cwsh(ws_handle);
+ const_ws_meta cwsm(ws_meta);
+ wsrep_buf_t const err_buf = { err.data(), err.size() };
+ int ret(wsrep_->commit_order_leave(
+ wsrep_, cwsh.native(), cwsm.native(), &err_buf) != WSREP_OK);
+ return ret;
+}
+
+int wsrep::wsrep_provider_v26::release(wsrep::ws_handle& ws_handle)
+{
+ mutable_ws_handle mwsh(ws_handle);
+ return (wsrep_->release(wsrep_, mwsh.native()) != WSREP_OK);
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::replay(const wsrep::ws_handle& ws_handle,
+ wsrep::high_priority_service* reply_service)
+{
+ const_ws_handle mwsh(ws_handle);
+ return map_return_value(
+ wsrep_->replay_trx(wsrep_, mwsh.native(), reply_service));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::enter_toi(
+ wsrep::client_id client_id,
+ const wsrep::key_array& keys,
+ const wsrep::const_buffer& buffer,
+ wsrep::ws_meta& ws_meta,
+ int flags)
+{
+ mutable_ws_meta mmeta(ws_meta, flags);
+ std::vector<std::vector<wsrep_buf_t> > key_parts;
+ std::vector<wsrep_key_t> wsrep_keys;
+ wsrep_buf_t wsrep_buf = {buffer.data(), buffer.size()};
+ for (size_t i(0); i < keys.size(); ++i)
+ {
+ key_parts.push_back(std::vector<wsrep_buf_t>());
+ for (size_t kp(0); kp < keys[i].size(); ++kp)
+ {
+ wsrep_buf_t buf = {keys[i].key_parts()[kp].data(),
+ keys[i].key_parts()[kp].size()};
+ key_parts[i].push_back(buf);
+ }
+ }
+ for (size_t i(0); i < key_parts.size(); ++i)
+ {
+ wsrep_key_t key = {key_parts[i].data(), key_parts[i].size()};
+ wsrep_keys.push_back(key);
+ }
+ return map_return_value(wsrep_->to_execute_start(
+ wsrep_,
+ client_id.get(),
+ wsrep_keys.data(),
+ wsrep_keys.size(),
+ &wsrep_buf,
+ buffer.size() ? 1 : 0,
+ mmeta.native_flags(),
+ mmeta.native()));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::leave_toi(wsrep::client_id client_id,
+ const wsrep::mutable_buffer& err)
+{
+ const wsrep_buf_t err_buf = { err.data(), err.size() };
+ return map_return_value(wsrep_->to_execute_end(
+ wsrep_, client_id.get(), &err_buf));
+}
+
+std::pair<wsrep::gtid, enum wsrep::provider::status>
+wsrep::wsrep_provider_v26::causal_read(int timeout) const
+{
+ wsrep_gtid_t wsrep_gtid;
+ wsrep_status_t ret(wsrep_->sync_wait(wsrep_, 0, timeout, &wsrep_gtid));
+ wsrep::gtid gtid(ret == WSREP_OK ?
+ wsrep::gtid(wsrep::id(wsrep_gtid.uuid.data,
+ sizeof(wsrep_gtid.uuid.data)),
+ wsrep::seqno(wsrep_gtid.seqno)) :
+ wsrep::gtid::undefined());
+ return std::make_pair(gtid, map_return_value(ret));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::wait_for_gtid(const wsrep::gtid& gtid, int timeout)
+ const
+{
+ wsrep_gtid_t wsrep_gtid;
+ std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
+ sizeof(wsrep_gtid.uuid.data));
+ wsrep_gtid.seqno = gtid.seqno().get();
+ return map_return_value(wsrep_->sync_wait(wsrep_, &wsrep_gtid, timeout, 0));
+}
+
+wsrep::gtid wsrep::wsrep_provider_v26::last_committed_gtid() const
+{
+ wsrep_gtid_t wsrep_gtid;
+ if (wsrep_->last_committed_id(wsrep_, &wsrep_gtid) != WSREP_OK)
+ {
+ throw wsrep::runtime_error("Failed to read last committed id");
+ }
+ return wsrep::gtid(
+ wsrep::id(wsrep_gtid.uuid.data, sizeof(wsrep_gtid.uuid.data)),
+ wsrep::seqno(wsrep_gtid.seqno));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::sst_sent(const wsrep::gtid& gtid, int err)
+{
+ wsrep_gtid_t wsrep_gtid;
+ std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
+ sizeof(wsrep_gtid.uuid.data));
+ wsrep_gtid.seqno = gtid.seqno().get();
+ return map_return_value(wsrep_->sst_sent(wsrep_, &wsrep_gtid, err));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::sst_received(const wsrep::gtid& gtid, int err)
+{
+ wsrep_gtid_t wsrep_gtid;
+ std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
+ sizeof(wsrep_gtid.uuid.data));
+ wsrep_gtid.seqno = gtid.seqno().get();
+ return map_return_value(wsrep_->sst_received(wsrep_, &wsrep_gtid, 0, err));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::enc_set_key(const wsrep::const_buffer& key)
+{
+ wsrep_enc_key_t enc_key = {key.data(), key.size()};
+ return map_return_value(wsrep_->enc_set_key(wsrep_, &enc_key));
+}
+
+std::vector<wsrep::provider::status_variable>
+wsrep::wsrep_provider_v26::status() const
+{
+ std::vector<status_variable> ret;
+ wsrep_stats_var* const stats(wsrep_->stats_get(wsrep_));
+ wsrep_stats_var* i(stats);
+ if (i)
+ {
+ while (i->name)
+ {
+ switch (i->type)
+ {
+ case WSREP_VAR_STRING:
+ ret.push_back(status_variable(i->name, i->value._string));
+ break;
+ case WSREP_VAR_INT64:
+ {
+ std::ostringstream os;
+ os << i->value._int64;
+ ret.push_back(status_variable(i->name, os.str()));
+ break;
+ }
+ case WSREP_VAR_DOUBLE:
+ {
+ std::ostringstream os;
+ os << i->value._double;
+ ret.push_back(status_variable(i->name, os.str()));
+ break;
+ }
+ default:
+ assert(0);
+ break;
+ }
+ ++i;
+ }
+ wsrep_->stats_free(wsrep_, stats);
+ }
+ return ret;
+}
+
+void wsrep::wsrep_provider_v26::reset_status()
+{
+ wsrep_->stats_reset(wsrep_);
+}
+
+std::string wsrep::wsrep_provider_v26::options() const
+{
+ std::string ret;
+ char* opts;
+ if ((opts = wsrep_->options_get(wsrep_)))
+ {
+ ret = opts;
+ free(opts);
+ }
+ else
+ {
+ throw wsrep::runtime_error("Failed to get provider options");
+ }
+ return ret;
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::options(const std::string& opts)
+{
+ return map_return_value(wsrep_->options_set(wsrep_, opts.c_str()));
+}
+
+std::string wsrep::wsrep_provider_v26::name() const
+{
+ return (wsrep_->provider_name ? wsrep_->provider_name : "unknown");
+}
+
+std::string wsrep::wsrep_provider_v26::version() const
+{
+ return (wsrep_->provider_version ? wsrep_->provider_version : "unknown");
+}
+
+std::string wsrep::wsrep_provider_v26::vendor() const
+{
+ return (wsrep_->provider_vendor ? wsrep_->provider_vendor : "unknown");
+}
+
+void* wsrep::wsrep_provider_v26::native() const
+{
+ return wsrep_;
+}
diff --git a/wsrep-lib/src/wsrep_provider_v26.hpp b/wsrep-lib/src/wsrep_provider_v26.hpp
new file mode 100644
index 00000000..608b7c9b
--- /dev/null
+++ b/wsrep-lib/src/wsrep_provider_v26.hpp
@@ -0,0 +1,116 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_WSREP_PROVIDER_V26_HPP
+#define WSREP_WSREP_PROVIDER_V26_HPP
+
+#include "wsrep/provider.hpp"
+
+struct wsrep_st;
+
+namespace wsrep
+{
+ class thread_service;
+ class wsrep_provider_v26 : public wsrep::provider
+ {
+ public:
+ void init_services(const wsrep::provider::services& services);
+ void deinit_services();
+ wsrep_provider_v26(wsrep::server_state&, const std::string&,
+ const std::string&,
+ const wsrep::provider::services& services);
+ ~wsrep_provider_v26() WSREP_OVERRIDE;
+ enum wsrep::provider::status
+ connect(const std::string&, const std::string&, const std::string&,
+ bool) WSREP_OVERRIDE;
+ int disconnect() WSREP_OVERRIDE;
+ int capabilities() const WSREP_OVERRIDE;
+
+ int desync() WSREP_OVERRIDE;
+ int resync() WSREP_OVERRIDE;
+ wsrep::seqno pause() WSREP_OVERRIDE;
+ int resume() WSREP_OVERRIDE;
+
+ enum wsrep::provider::status
+ run_applier(wsrep::high_priority_service*) WSREP_OVERRIDE;
+ int start_transaction(wsrep::ws_handle&) WSREP_OVERRIDE { return 0; }
+ enum wsrep::provider::status
+ assign_read_view(wsrep::ws_handle&, const wsrep::gtid*) WSREP_OVERRIDE;
+ int append_key(wsrep::ws_handle&, const wsrep::key&) WSREP_OVERRIDE;
+ enum wsrep::provider::status
+ append_data(wsrep::ws_handle&, const wsrep::const_buffer&)
+ WSREP_OVERRIDE;
+ enum wsrep::provider::status
+ certify(wsrep::client_id, wsrep::ws_handle&,
+ int,
+ wsrep::ws_meta&) WSREP_OVERRIDE;
+ enum wsrep::provider::status
+ bf_abort(wsrep::seqno,
+ wsrep::transaction_id,
+ wsrep::seqno&) WSREP_OVERRIDE;
+ enum wsrep::provider::status
+ rollback(const wsrep::transaction_id) WSREP_OVERRIDE;
+ enum wsrep::provider::status
+ commit_order_enter(const wsrep::ws_handle&,
+ const wsrep::ws_meta&) WSREP_OVERRIDE;
+ int commit_order_leave(const wsrep::ws_handle&,
+ const wsrep::ws_meta&,
+ const wsrep::mutable_buffer&) WSREP_OVERRIDE;
+ int release(wsrep::ws_handle&) WSREP_OVERRIDE;
+ enum wsrep::provider::status replay(const wsrep::ws_handle&,
+ wsrep::high_priority_service*)
+ WSREP_OVERRIDE;
+ enum wsrep::provider::status enter_toi(wsrep::client_id,
+ const wsrep::key_array&,
+ const wsrep::const_buffer&,
+ wsrep::ws_meta&,
+ int)
+ WSREP_OVERRIDE;
+ enum wsrep::provider::status leave_toi(wsrep::client_id,
+ const wsrep::mutable_buffer&)
+ WSREP_OVERRIDE;
+ std::pair<wsrep::gtid, enum wsrep::provider::status>
+ causal_read(int) const WSREP_OVERRIDE;
+ enum wsrep::provider::status wait_for_gtid(const wsrep::gtid&, int)
+ const WSREP_OVERRIDE;
+ wsrep::gtid last_committed_gtid() const WSREP_OVERRIDE;
+ enum wsrep::provider::status sst_sent(const wsrep::gtid&, int)
+ WSREP_OVERRIDE;
+ enum wsrep::provider::status sst_received(const wsrep::gtid& gtid, int)
+ WSREP_OVERRIDE;
+ enum wsrep::provider::status enc_set_key(const wsrep::const_buffer& key)
+ WSREP_OVERRIDE;
+ std::vector<status_variable> status() const WSREP_OVERRIDE;
+ void reset_status() WSREP_OVERRIDE;
+ std::string options() const WSREP_OVERRIDE;
+ enum wsrep::provider::status options(const std::string&) WSREP_OVERRIDE;
+ std::string name() const WSREP_OVERRIDE;
+ std::string version() const WSREP_OVERRIDE;
+ std::string vendor() const WSREP_OVERRIDE;
+ void* native() const WSREP_OVERRIDE;
+ private:
+ wsrep_provider_v26(const wsrep_provider_v26&);
+ wsrep_provider_v26& operator=(const wsrep_provider_v26);
+ struct wsrep_st* wsrep_;
+ services services_enabled_;
+ };
+}
+
+
+#endif // WSREP_WSREP_PROVIDER_V26_HPP
diff --git a/wsrep-lib/src/xid.cpp b/wsrep-lib/src/xid.cpp
new file mode 100644
index 00000000..7757e445
--- /dev/null
+++ b/wsrep-lib/src/xid.cpp
@@ -0,0 +1,31 @@
+/*
+ * Copyright (C) 2019 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/xid.hpp"
+#include <ostream>
+
+std::string wsrep::to_string(const wsrep::xid& xid)
+{
+ return std::string(xid.data_.data(), xid.data_.size());
+}
+
+std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::xid& xid)
+{
+ return os << to_string(xid);
+}