summaryrefslogtreecommitdiffstats
path: root/wsrep-lib/src/wsrep_provider_v26.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'wsrep-lib/src/wsrep_provider_v26.cpp')
-rw-r--r--wsrep-lib/src/wsrep_provider_v26.cpp1121
1 files changed, 1121 insertions, 0 deletions
diff --git a/wsrep-lib/src/wsrep_provider_v26.cpp b/wsrep-lib/src/wsrep_provider_v26.cpp
new file mode 100644
index 00000000..a579dff2
--- /dev/null
+++ b/wsrep-lib/src/wsrep_provider_v26.cpp
@@ -0,0 +1,1121 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep_provider_v26.hpp"
+
+#include "wsrep/encryption_service.hpp"
+#include "wsrep/server_state.hpp"
+#include "wsrep/high_priority_service.hpp"
+#include "wsrep/view.hpp"
+#include "wsrep/exception.hpp"
+#include "wsrep/logger.hpp"
+#include "wsrep/thread_service.hpp"
+#include "wsrep/tls_service.hpp"
+
+#include "thread_service_v1.hpp"
+#include "tls_service_v1.hpp"
+#include "v26/wsrep_api.h"
+
+
+#include <dlfcn.h>
+#include <cassert>
+#include <climits>
+
+#include <iostream>
+#include <sstream>
+#include <cstring> // strerror()
+
+namespace
+{
+ /////////////////////////////////////////////////////////////////////
+ // Helpers //
+ /////////////////////////////////////////////////////////////////////
+
+ enum wsrep::provider::status map_return_value(wsrep_status_t status)
+ {
+ switch (status)
+ {
+ case WSREP_OK:
+ return wsrep::provider::success;
+ case WSREP_WARNING:
+ return wsrep::provider::error_warning;
+ case WSREP_TRX_MISSING:
+ return wsrep::provider::error_transaction_missing;
+ case WSREP_TRX_FAIL:
+ return wsrep::provider::error_certification_failed;
+ case WSREP_BF_ABORT:
+ return wsrep::provider::error_bf_abort;
+ case WSREP_SIZE_EXCEEDED:
+ return wsrep::provider::error_size_exceeded;
+ case WSREP_CONN_FAIL:
+ return wsrep::provider::error_connection_failed;
+ case WSREP_NODE_FAIL:
+ return wsrep::provider::error_provider_failed;
+ case WSREP_FATAL:
+ return wsrep::provider::error_fatal;
+ case WSREP_NOT_IMPLEMENTED:
+ return wsrep::provider::error_not_implemented;
+ case WSREP_NOT_ALLOWED:
+ return wsrep::provider::error_not_allowed;
+ }
+
+ wsrep::log_warning() << "Unexpected value for wsrep_status_t: "
+ << status << " ("
+ << (status < 0 ? strerror(-status) : "") << ')';
+
+ return wsrep::provider::error_unknown;
+ }
+
+ wsrep_key_type_t map_key_type(enum wsrep::key::type type)
+ {
+ switch (type)
+ {
+ case wsrep::key::shared: return WSREP_KEY_SHARED;
+ case wsrep::key::reference: return WSREP_KEY_REFERENCE;
+ case wsrep::key::update: return WSREP_KEY_UPDATE;
+ case wsrep::key::exclusive: return WSREP_KEY_EXCLUSIVE;
+ }
+ assert(0);
+ throw wsrep::runtime_error("Invalid key type");
+ }
+
+ static inline wsrep_seqno_t seqno_to_native(wsrep::seqno seqno)
+ {
+ return seqno.get();
+ }
+
+ static inline wsrep::seqno seqno_from_native(wsrep_seqno_t seqno)
+ {
+ return wsrep::seqno(seqno);
+ }
+
+ template <typename F, typename T>
+ inline uint32_t map_one(const int flags, const F from,
+ const T to)
+ {
+ // INT_MAX because GCC 4.4 does not eat numeric_limits<int>::max()
+ // in static_assert
+ static_assert(WSREP_FLAGS_LAST < INT_MAX,
+ "WSREP_FLAGS_LAST < INT_MAX");
+ return static_cast<uint32_t>((flags & static_cast<int>(from)) ?
+ static_cast<int>(to) : 0);
+ }
+
+ uint32_t map_flags_to_native(int flags)
+ {
+ using wsrep::provider;
+ return static_cast<uint32_t>(
+ map_one(flags, provider::flag::start_transaction,
+ WSREP_FLAG_TRX_START) |
+ map_one(flags, provider::flag::commit, WSREP_FLAG_TRX_END) |
+ map_one(flags, provider::flag::rollback, WSREP_FLAG_ROLLBACK) |
+ map_one(flags, provider::flag::isolation, WSREP_FLAG_ISOLATION) |
+ map_one(flags, provider::flag::pa_unsafe, WSREP_FLAG_PA_UNSAFE) |
+ // map_one(flags, provider::flag::commutative, WSREP_FLAG_COMMUTATIVE)
+ // |
+ // map_one(flags, provider::flag::native, WSREP_FLAG_NATIVE) |
+ map_one(flags, provider::flag::prepare, WSREP_FLAG_TRX_PREPARE) |
+ map_one(flags, provider::flag::snapshot, WSREP_FLAG_SNAPSHOT) |
+ map_one(flags, provider::flag::implicit_deps,
+ WSREP_FLAG_IMPLICIT_DEPS));
+ }
+
+ int map_flags_from_native(uint32_t flags_u32)
+ {
+ using wsrep::provider;
+ const int flags(static_cast<int>(flags_u32));
+ return static_cast<int>(
+ map_one(flags, WSREP_FLAG_TRX_START,
+ provider::flag::start_transaction) |
+ map_one(flags, WSREP_FLAG_TRX_END, provider::flag::commit) |
+ map_one(flags, WSREP_FLAG_ROLLBACK, provider::flag::rollback) |
+ map_one(flags, WSREP_FLAG_ISOLATION, provider::flag::isolation) |
+ map_one(flags, WSREP_FLAG_PA_UNSAFE, provider::flag::pa_unsafe) |
+ // map_one(flags, provider::flag::commutative, WSREP_FLAG_COMMUTATIVE)
+ // |
+ // map_one(flags, provider::flag::native, WSREP_FLAG_NATIVE) |
+ map_one(flags, WSREP_FLAG_TRX_PREPARE, provider::flag::prepare) |
+ map_one(flags, WSREP_FLAG_SNAPSHOT, provider::flag::snapshot) |
+ map_one(flags, WSREP_FLAG_IMPLICIT_DEPS,
+ provider::flag::implicit_deps));
+ }
+
+ class mutable_ws_handle
+ {
+ public:
+ mutable_ws_handle(wsrep::ws_handle& ws_handle)
+ : ws_handle_(ws_handle)
+ , native_((wsrep_ws_handle_t)
+ {
+ ws_handle_.transaction_id().get(),
+ ws_handle_.opaque()
+ })
+ { }
+
+ ~mutable_ws_handle()
+ {
+ ws_handle_ = wsrep::ws_handle(
+ wsrep::transaction_id(native_.trx_id), native_.opaque);
+ }
+
+ wsrep_ws_handle_t* native()
+ {
+ return &native_;
+ }
+ private:
+ wsrep::ws_handle& ws_handle_;
+ wsrep_ws_handle_t native_;
+ };
+
+ class const_ws_handle
+ {
+ public:
+ const_ws_handle(const wsrep::ws_handle& ws_handle)
+ : ws_handle_(ws_handle)
+ , native_((wsrep_ws_handle_t)
+ {
+ ws_handle_.transaction_id().get(),
+ ws_handle_.opaque()
+ })
+ { }
+
+ ~const_ws_handle()
+ {
+ assert(ws_handle_.transaction_id().get() == native_.trx_id);
+ assert(ws_handle_.opaque() == native_.opaque);
+ }
+
+ const wsrep_ws_handle_t* native() const
+ {
+ return &native_;
+ }
+ private:
+ const wsrep::ws_handle& ws_handle_;
+ const wsrep_ws_handle_t native_;
+ };
+
+ class mutable_ws_meta
+ {
+ public:
+ mutable_ws_meta(wsrep::ws_meta& ws_meta, int flags)
+ : ws_meta_(ws_meta)
+ , trx_meta_()
+ , flags_(flags)
+ {
+ std::memcpy(trx_meta_.gtid.uuid.data, ws_meta.group_id().data(),
+ sizeof(trx_meta_.gtid.uuid.data));
+ trx_meta_.gtid.seqno = seqno_to_native(ws_meta.seqno());
+ std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(),
+ sizeof(trx_meta_.stid.node.data));
+ trx_meta_.stid.conn = ws_meta.client_id().get();
+ trx_meta_.stid.trx = ws_meta.transaction_id().get();
+ trx_meta_.depends_on = seqno_to_native(ws_meta.depends_on());
+ }
+
+ ~mutable_ws_meta()
+ {
+ ws_meta_ = wsrep::ws_meta(
+ wsrep::gtid(
+ wsrep::id(trx_meta_.gtid.uuid.data,
+ sizeof(trx_meta_.gtid.uuid.data)),
+ seqno_from_native(trx_meta_.gtid.seqno)),
+ wsrep::stid(wsrep::id(trx_meta_.stid.node.data,
+ sizeof(trx_meta_.stid.node.data)),
+ wsrep::transaction_id(trx_meta_.stid.trx),
+ wsrep::client_id(trx_meta_.stid.conn)),
+ seqno_from_native(trx_meta_.depends_on), flags_);
+ }
+
+ wsrep_trx_meta* native() { return &trx_meta_; }
+ uint32_t native_flags() const { return map_flags_to_native(flags_); }
+ private:
+ wsrep::ws_meta& ws_meta_;
+ wsrep_trx_meta_t trx_meta_;
+ int flags_;
+ };
+
+ class const_ws_meta
+ {
+ public:
+ const_ws_meta(const wsrep::ws_meta& ws_meta)
+ : trx_meta_()
+ {
+ std::memcpy(trx_meta_.gtid.uuid.data, ws_meta.group_id().data(),
+ sizeof(trx_meta_.gtid.uuid.data));
+ trx_meta_.gtid.seqno = seqno_to_native(ws_meta.seqno());
+ std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(),
+ sizeof(trx_meta_.stid.node.data));
+ trx_meta_.stid.conn = ws_meta.client_id().get();
+ trx_meta_.stid.trx = ws_meta.transaction_id().get();
+ trx_meta_.depends_on = seqno_to_native(ws_meta.depends_on());
+ }
+
+ ~const_ws_meta()
+ {
+ }
+
+ const wsrep_trx_meta* native() const { return &trx_meta_; }
+ private:
+ wsrep_trx_meta_t trx_meta_;
+ };
+
+ enum wsrep::view::status map_view_status_from_native(
+ wsrep_view_status_t status)
+ {
+ switch (status)
+ {
+ case WSREP_VIEW_PRIMARY: return wsrep::view::primary;
+ case WSREP_VIEW_NON_PRIMARY: return wsrep::view::non_primary;
+ case WSREP_VIEW_DISCONNECTED: return wsrep::view::disconnected;
+ default: throw wsrep::runtime_error("Unknown view status");
+ }
+ }
+
+ /** @todo Currently capabilities defined in provider.hpp
+ * are one to one with wsrep_api.h. However, the mapping should
+ * be made explicit. */
+ int map_capabilities_from_native(wsrep_cap_t capabilities)
+ {
+ return static_cast<int>(capabilities);
+ }
+ wsrep::view view_from_native(const wsrep_view_info& view_info,
+ const wsrep::id& own_id)
+ {
+ std::vector<wsrep::view::member> members;
+ for (int i(0); i < view_info.memb_num; ++i)
+ {
+ wsrep::id id(view_info.members[i].id.data, sizeof(view_info.members[i].id.data));
+ std::string name(
+ view_info.members[i].name,
+ strnlen(view_info.members[i].name,
+ sizeof(view_info.members[i].name)));
+ std::string incoming(
+ view_info.members[i].incoming,
+ strnlen(view_info.members[i].incoming,
+ sizeof(view_info.members[i].incoming)));
+ members.push_back(wsrep::view::member(id, name, incoming));
+ }
+
+ int own_idx(-1);
+ if (own_id.is_undefined())
+ {
+ // If own ID is undefined, obtain it from the view. This is
+ // the case on the initial connect to cluster.
+ own_idx = view_info.my_idx;
+ }
+ else
+ {
+ // If the node has already obtained its ID from cluster,
+ // its position in the view (or lack thereof) must be determined
+ // by the ID.
+ for (size_t i(0); i < members.size(); ++i)
+ {
+ if (own_id == members[i].id())
+ {
+ own_idx = static_cast<int>(i);
+ break;
+ }
+ }
+ }
+
+ return wsrep::view(
+ wsrep::gtid(
+ wsrep::id(view_info.state_id.uuid.data,
+ sizeof(view_info.state_id.uuid.data)),
+ wsrep::seqno(view_info.state_id.seqno)),
+ wsrep::seqno(view_info.view),
+ map_view_status_from_native(view_info.status),
+ map_capabilities_from_native(view_info.capabilities),
+ own_idx,
+ view_info.proto_ver,
+ members);
+ }
+
+ /////////////////////////////////////////////////////////////////////
+ // Callbacks //
+ /////////////////////////////////////////////////////////////////////
+
+ wsrep_cb_status_t connected_cb(
+ void* app_ctx,
+ const wsrep_view_info_t* view_info)
+ {
+ assert(app_ctx);
+ wsrep::server_state& server_state(
+ *reinterpret_cast<wsrep::server_state*>(app_ctx));
+ wsrep::view view(view_from_native(*view_info, server_state.id()));
+ const ssize_t own_index(view.own_index());
+ assert(own_index >= 0);
+ if (own_index < 0)
+ {
+ wsrep::log_error() << "Connected without being in reported view";
+ return WSREP_CB_FAILURE;
+ }
+ assert(// first connect
+ server_state.id().is_undefined() ||
+ // reconnect to primary component
+ server_state.id() ==
+ view.members()[static_cast<size_t>(own_index)].id());
+ try
+ {
+ server_state.on_connect(view);
+ return WSREP_CB_SUCCESS;
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ wsrep::log_error() << "Exception: " << e.what();
+ return WSREP_CB_FAILURE;
+ }
+ }
+
+ wsrep_cb_status_t view_cb(void* app_ctx,
+ void* recv_ctx,
+ const wsrep_view_info_t* view_info,
+ const char*,
+ size_t)
+ {
+ assert(app_ctx);
+ assert(view_info);
+ wsrep::server_state& server_state(
+ *reinterpret_cast<wsrep::server_state*>(app_ctx));
+ wsrep::high_priority_service* high_priority_service(
+ reinterpret_cast<wsrep::high_priority_service*>(recv_ctx));
+ try
+ {
+ wsrep::view view(view_from_native(*view_info, server_state.id()));
+ server_state.on_view(view, high_priority_service);
+ return WSREP_CB_SUCCESS;
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ wsrep::log_error() << "Exception: " << e.what();
+ return WSREP_CB_FAILURE;
+ }
+ }
+
+ wsrep_cb_status_t sst_request_cb(void* app_ctx,
+ void **sst_req, size_t* sst_req_len)
+ {
+ assert(app_ctx);
+ wsrep::server_state& server_state(
+ *reinterpret_cast<wsrep::server_state*>(app_ctx));
+
+ try
+ {
+ std::string req(server_state.prepare_for_sst());
+ if (req.size() > 0)
+ {
+ *sst_req = ::malloc(req.size() + 1);
+ memcpy(*sst_req, req.data(), req.size() + 1);
+ *sst_req_len = req.size() + 1;
+ }
+ else
+ {
+ *sst_req = 0;
+ *sst_req_len = 0;
+ }
+ return WSREP_CB_SUCCESS;
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ return WSREP_CB_FAILURE;
+ }
+ }
+
+ int encrypt_cb(void* app_ctx,
+ wsrep_enc_ctx_t* enc_ctx,
+ const wsrep_buf_t* input,
+ void* output,
+ wsrep_enc_direction_t direction,
+ bool last)
+ {
+ assert(app_ctx);
+ wsrep::server_state& server_state(
+ *static_cast<wsrep::server_state*>(app_ctx));
+
+ assert(server_state.encryption_service());
+ if (server_state.encryption_service() == 0)
+ {
+ wsrep::log_error() << "Encryption service not defined in encrypt_cb()";
+ return -1;
+ }
+
+ wsrep::const_buffer key(enc_ctx->key->ptr, enc_ctx->key->len);
+ wsrep::const_buffer in(input->ptr, input->len);
+ try
+ {
+ return server_state.encryption_service()->do_crypt(&enc_ctx->ctx,
+ key,
+ enc_ctx->iv,
+ in,
+ output,
+ direction == WSREP_ENC,
+ last);
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ free(enc_ctx->ctx);
+ // Return negative value in case of callback error
+ return -1;
+ }
+ }
+
+ wsrep_cb_status_t apply_cb(void* ctx,
+ const wsrep_ws_handle_t* wsh,
+ uint32_t flags,
+ const wsrep_buf_t* buf,
+ const wsrep_trx_meta_t* meta,
+ wsrep_bool_t* exit_loop)
+ {
+ wsrep::high_priority_service* high_priority_service(
+ reinterpret_cast<wsrep::high_priority_service*>(ctx));
+ assert(high_priority_service);
+
+ wsrep::const_buffer data(buf->ptr, buf->len);
+ wsrep::ws_handle ws_handle(wsrep::transaction_id(wsh->trx_id),
+ wsh->opaque);
+ wsrep::ws_meta ws_meta(
+ wsrep::gtid(wsrep::id(meta->gtid.uuid.data,
+ sizeof(meta->gtid.uuid.data)),
+ wsrep::seqno(meta->gtid.seqno)),
+ wsrep::stid(wsrep::id(meta->stid.node.data,
+ sizeof(meta->stid.node.data)),
+ wsrep::transaction_id(meta->stid.trx),
+ wsrep::client_id(meta->stid.conn)),
+ wsrep::seqno(seqno_from_native(meta->depends_on)),
+ map_flags_from_native(flags));
+ try
+ {
+ if (high_priority_service->apply(ws_handle, ws_meta, data))
+ {
+ return WSREP_CB_FAILURE;
+ }
+ *exit_loop = high_priority_service->must_exit();
+ return WSREP_CB_SUCCESS;
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ wsrep::log_error() << "Caught runtime error while applying "
+ << ws_meta.flags() << ": "
+ << e.what();
+ return WSREP_CB_FAILURE;
+ }
+ }
+
+ wsrep_cb_status_t synced_cb(void* app_ctx)
+ {
+ assert(app_ctx);
+ wsrep::server_state& server_state(
+ *reinterpret_cast<wsrep::server_state*>(app_ctx));
+ try
+ {
+ server_state.on_sync();
+ return WSREP_CB_SUCCESS;
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ std::cerr << "On sync failed: " << e.what() << "\n";
+ return WSREP_CB_FAILURE;
+ }
+ }
+
+
+ wsrep_cb_status_t sst_donate_cb(void* app_ctx,
+ void* ,
+ const wsrep_buf_t* req_buf,
+ const wsrep_gtid_t* req_gtid,
+ const wsrep_buf_t*,
+ bool bypass)
+ {
+ assert(app_ctx);
+ wsrep::server_state& server_state(
+ *reinterpret_cast<wsrep::server_state*>(app_ctx));
+ try
+ {
+ std::string req(reinterpret_cast<const char*>(req_buf->ptr),
+ req_buf->len);
+ wsrep::gtid gtid(wsrep::id(req_gtid->uuid.data,
+ sizeof(req_gtid->uuid.data)),
+ wsrep::seqno(req_gtid->seqno));
+ if (server_state.start_sst(req, gtid, bypass))
+ {
+ return WSREP_CB_FAILURE;
+ }
+ return WSREP_CB_SUCCESS;
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ return WSREP_CB_FAILURE;
+ }
+ }
+
+ void logger_cb(wsrep_log_level_t level, const char* msg)
+ {
+ static const char* const pfx("P:"); // "provider"
+ wsrep::log::level ll(wsrep::log::unknown);
+ switch (level)
+ {
+ case WSREP_LOG_FATAL:
+ case WSREP_LOG_ERROR:
+ ll = wsrep::log::error;
+ break;
+ case WSREP_LOG_WARN:
+ ll = wsrep::log::warning;
+ break;
+ case WSREP_LOG_INFO:
+ ll = wsrep::log::info;
+ break;
+ case WSREP_LOG_DEBUG:
+ ll = wsrep::log::debug;
+ break;
+ }
+ wsrep::log(ll, pfx) << msg;
+ }
+
+ static int init_thread_service(void* dlh,
+ wsrep::thread_service* thread_service)
+ {
+ assert(thread_service);
+ if (wsrep::thread_service_v1_probe(dlh))
+ {
+ // No support in library.
+ return 1;
+ }
+ else
+ {
+ if (thread_service->before_init())
+ {
+ wsrep::log_error() << "Thread service before init failed";
+ return 1;
+ }
+ wsrep::thread_service_v1_init(dlh, thread_service);
+ if (thread_service->after_init())
+ {
+ wsrep::log_error() << "Thread service after init failed";
+ return 1;
+ }
+ }
+ return 0;
+ }
+
+ static void deinit_thread_service(void* dlh)
+ {
+ // assert(not wsrep::thread_service_v1_probe(dlh));
+ wsrep::thread_service_v1_deinit(dlh);
+ }
+
+ static int init_tls_service(void* dlh,
+ wsrep::tls_service* tls_service)
+ {
+ assert(tls_service);
+ if (not wsrep::tls_service_v1_probe(dlh))
+ {
+ return wsrep::tls_service_v1_init(dlh, tls_service);
+ }
+ return 1;
+ }
+
+ static void deinit_tls_service(void* dlh)
+ {
+ // assert(not wsrep::tls_service_v1_probe(dlh));
+ wsrep::tls_service_v1_deinit(dlh);
+ }
+}
+
+void wsrep::wsrep_provider_v26::init_services(
+ const wsrep::provider::services& services)
+{
+ if (services.thread_service)
+ {
+ if (init_thread_service(wsrep_->dlh, services.thread_service))
+ {
+ throw wsrep::runtime_error("Failed to initialize thread service");
+ }
+ services_enabled_.thread_service = services.thread_service;
+ }
+ if (services.tls_service)
+ {
+ if (init_tls_service(wsrep_->dlh, services.tls_service))
+ {
+ throw wsrep::runtime_error("Failed to initialze TLS service");
+ }
+ services_enabled_.tls_service = services.tls_service;
+ }
+}
+
+void wsrep::wsrep_provider_v26::deinit_services()
+{
+ if (services_enabled_.tls_service)
+ deinit_tls_service(wsrep_->dlh);
+ if (services_enabled_.thread_service)
+ deinit_thread_service(wsrep_->dlh);
+}
+
+wsrep::wsrep_provider_v26::wsrep_provider_v26(
+ wsrep::server_state& server_state,
+ const std::string& provider_options,
+ const std::string& provider_spec,
+ const wsrep::provider::services& services)
+ : provider(server_state)
+ , wsrep_()
+ , services_enabled_()
+{
+ wsrep_gtid_t state_id;
+ bool encryption_enabled = server_state.encryption_service() &&
+ server_state.encryption_service()->encryption_enabled();
+ std::memcpy(state_id.uuid.data,
+ server_state.initial_position().id().data(),
+ sizeof(state_id.uuid.data));
+ state_id.seqno = server_state.initial_position().seqno().get();
+ struct wsrep_init_args init_args;
+ memset(&init_args, 0, sizeof(init_args));
+ init_args.app_ctx = &server_state;
+ init_args.node_name = server_state_.name().c_str();
+ init_args.node_address = server_state_.address().c_str();
+ init_args.node_incoming = server_state_.incoming_address().c_str();
+ init_args.data_dir = server_state_.working_dir().c_str();
+ init_args.options = provider_options.c_str();
+ init_args.proto_ver = server_state.max_protocol_version();
+ init_args.state_id = &state_id;
+ init_args.state = 0;
+ init_args.logger_cb = &logger_cb;
+ init_args.connected_cb = &connected_cb;
+ init_args.view_cb = &view_cb;
+ init_args.sst_request_cb = &sst_request_cb;
+ init_args.encrypt_cb = encryption_enabled ? encrypt_cb : NULL;
+ init_args.apply_cb = &apply_cb;
+ init_args.unordered_cb = 0;
+ init_args.sst_donate_cb = &sst_donate_cb;
+ init_args.synced_cb = &synced_cb;
+
+ if (wsrep_load(provider_spec.c_str(), &wsrep_, logger_cb))
+ {
+ throw wsrep::runtime_error("Failed to load wsrep library");
+ }
+
+ init_services(services);
+
+ if (wsrep_->init(wsrep_, &init_args) != WSREP_OK)
+ {
+ throw wsrep::runtime_error("Failed to initialize wsrep provider");
+ }
+
+ if (encryption_enabled)
+ {
+ const std::vector<unsigned char>& key = server_state.get_encryption_key();
+ if (key.size())
+ {
+ wsrep::const_buffer const_key(key.data(), key.size());
+ enum status const retval(enc_set_key(const_key));
+ if (retval != success)
+ {
+ std::string msg("Failed to set encryption key: ");
+ msg += to_string(retval);
+ throw wsrep::runtime_error(msg);
+ }
+ }
+ }
+}
+
+wsrep::wsrep_provider_v26::~wsrep_provider_v26()
+{
+ wsrep_->free(wsrep_);
+ deinit_services();
+ wsrep_unload(wsrep_);
+}
+
+enum wsrep::provider::status wsrep::wsrep_provider_v26::connect(
+ const std::string& cluster_name,
+ const std::string& cluster_url,
+ const std::string& state_donor,
+ bool bootstrap)
+{
+ return map_return_value(wsrep_->connect(wsrep_,
+ cluster_name.c_str(),
+ cluster_url.c_str(),
+ state_donor.c_str(),
+ bootstrap));
+}
+
+int wsrep::wsrep_provider_v26::disconnect()
+{
+ int ret(0);
+ wsrep_status_t wret;
+ if ((wret = wsrep_->disconnect(wsrep_)) != WSREP_OK)
+ {
+ std::cerr << "Failed to disconnect from cluster: "
+ << wret << "\n";
+ ret = 1;
+ }
+ return ret;
+}
+
+int wsrep::wsrep_provider_v26::capabilities() const
+{
+ return map_capabilities_from_native(wsrep_->capabilities(wsrep_));
+}
+int wsrep::wsrep_provider_v26::desync()
+{
+ return (wsrep_->desync(wsrep_) != WSREP_OK);
+}
+
+int wsrep::wsrep_provider_v26::resync()
+{
+ return (wsrep_->resync(wsrep_) != WSREP_OK);
+}
+
+wsrep::seqno wsrep::wsrep_provider_v26::pause()
+{
+ return wsrep::seqno(wsrep_->pause(wsrep_));
+}
+
+int wsrep::wsrep_provider_v26::resume()
+{
+ return (wsrep_->resume(wsrep_) != WSREP_OK);
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::run_applier(
+ wsrep::high_priority_service *applier_ctx)
+{
+ return map_return_value(wsrep_->recv(wsrep_, applier_ctx));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::assign_read_view(wsrep::ws_handle& ws_handle,
+ const wsrep::gtid* gtid)
+{
+ const wsrep_gtid_t* gtid_ptr(NULL);
+ wsrep_gtid_t tmp;
+
+ if (gtid)
+ {
+ ::memcpy(&tmp.uuid, gtid->id().data(), sizeof(tmp.uuid));
+ tmp.seqno = gtid->seqno().get();
+ gtid_ptr = &tmp;
+ }
+
+ mutable_ws_handle mwsh(ws_handle);
+ return map_return_value(wsrep_->assign_read_view(wsrep_, mwsh.native(),
+ gtid_ptr));
+}
+
+int wsrep::wsrep_provider_v26::append_key(wsrep::ws_handle& ws_handle,
+ const wsrep::key& key)
+{
+ if (key.size() > 3)
+ {
+ assert(0);
+ return 1;
+ }
+ wsrep_buf_t key_parts[3];
+ for (size_t i(0); i < key.size(); ++i)
+ {
+ key_parts[i].ptr = key.key_parts()[i].ptr();
+ key_parts[i].len = key.key_parts()[i].size();
+ }
+ wsrep_key_t wsrep_key = {key_parts, key.size()};
+ mutable_ws_handle mwsh(ws_handle);
+ return (wsrep_->append_key(
+ wsrep_, mwsh.native(),
+ &wsrep_key, 1, map_key_type(key.type()), true)
+ != WSREP_OK);
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::append_data(wsrep::ws_handle& ws_handle,
+ const wsrep::const_buffer& data)
+{
+ const wsrep_buf_t wsrep_buf = {data.data(), data.size()};
+ mutable_ws_handle mwsh(ws_handle);
+ return map_return_value(
+ wsrep_->append_data(wsrep_, mwsh.native(), &wsrep_buf,
+ 1, WSREP_DATA_ORDERED, true));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::certify(wsrep::client_id client_id,
+ wsrep::ws_handle& ws_handle,
+ int flags,
+ wsrep::ws_meta& ws_meta)
+{
+ mutable_ws_handle mwsh(ws_handle);
+ mutable_ws_meta mmeta(ws_meta, flags);
+ return map_return_value(
+ wsrep_->certify(wsrep_, client_id.get(), mwsh.native(),
+ mmeta.native_flags(),
+ mmeta.native()));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::bf_abort(
+ wsrep::seqno bf_seqno,
+ wsrep::transaction_id victim_id,
+ wsrep::seqno& victim_seqno)
+{
+ wsrep_seqno_t wsrep_victim_seqno;
+ wsrep_status_t ret(
+ wsrep_->abort_certification(
+ wsrep_, seqno_to_native(bf_seqno),
+ victim_id.get(), &wsrep_victim_seqno));
+ victim_seqno = seqno_from_native(wsrep_victim_seqno);
+ return map_return_value(ret);
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::rollback(wsrep::transaction_id id)
+{
+ return map_return_value(wsrep_->rollback(wsrep_, id.get(), 0));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::commit_order_enter(
+ const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta)
+{
+ const_ws_handle cwsh(ws_handle);
+ const_ws_meta cwsm(ws_meta);
+ return map_return_value(
+ wsrep_->commit_order_enter(wsrep_, cwsh.native(), cwsm.native()));
+}
+
+int
+wsrep::wsrep_provider_v26::commit_order_leave(
+ const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta,
+ const wsrep::mutable_buffer& err)
+{
+ const_ws_handle cwsh(ws_handle);
+ const_ws_meta cwsm(ws_meta);
+ wsrep_buf_t const err_buf = { err.data(), err.size() };
+ int ret(wsrep_->commit_order_leave(
+ wsrep_, cwsh.native(), cwsm.native(), &err_buf) != WSREP_OK);
+ return ret;
+}
+
+int wsrep::wsrep_provider_v26::release(wsrep::ws_handle& ws_handle)
+{
+ mutable_ws_handle mwsh(ws_handle);
+ return (wsrep_->release(wsrep_, mwsh.native()) != WSREP_OK);
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::replay(const wsrep::ws_handle& ws_handle,
+ wsrep::high_priority_service* reply_service)
+{
+ const_ws_handle mwsh(ws_handle);
+ return map_return_value(
+ wsrep_->replay_trx(wsrep_, mwsh.native(), reply_service));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::enter_toi(
+ wsrep::client_id client_id,
+ const wsrep::key_array& keys,
+ const wsrep::const_buffer& buffer,
+ wsrep::ws_meta& ws_meta,
+ int flags)
+{
+ mutable_ws_meta mmeta(ws_meta, flags);
+ std::vector<std::vector<wsrep_buf_t> > key_parts;
+ std::vector<wsrep_key_t> wsrep_keys;
+ wsrep_buf_t wsrep_buf = {buffer.data(), buffer.size()};
+ for (size_t i(0); i < keys.size(); ++i)
+ {
+ key_parts.push_back(std::vector<wsrep_buf_t>());
+ for (size_t kp(0); kp < keys[i].size(); ++kp)
+ {
+ wsrep_buf_t buf = {keys[i].key_parts()[kp].data(),
+ keys[i].key_parts()[kp].size()};
+ key_parts[i].push_back(buf);
+ }
+ }
+ for (size_t i(0); i < key_parts.size(); ++i)
+ {
+ wsrep_key_t key = {key_parts[i].data(), key_parts[i].size()};
+ wsrep_keys.push_back(key);
+ }
+ return map_return_value(wsrep_->to_execute_start(
+ wsrep_,
+ client_id.get(),
+ wsrep_keys.data(),
+ wsrep_keys.size(),
+ &wsrep_buf,
+ buffer.size() ? 1 : 0,
+ mmeta.native_flags(),
+ mmeta.native()));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::leave_toi(wsrep::client_id client_id,
+ const wsrep::mutable_buffer& err)
+{
+ const wsrep_buf_t err_buf = { err.data(), err.size() };
+ return map_return_value(wsrep_->to_execute_end(
+ wsrep_, client_id.get(), &err_buf));
+}
+
+std::pair<wsrep::gtid, enum wsrep::provider::status>
+wsrep::wsrep_provider_v26::causal_read(int timeout) const
+{
+ wsrep_gtid_t wsrep_gtid;
+ wsrep_status_t ret(wsrep_->sync_wait(wsrep_, 0, timeout, &wsrep_gtid));
+ wsrep::gtid gtid(ret == WSREP_OK ?
+ wsrep::gtid(wsrep::id(wsrep_gtid.uuid.data,
+ sizeof(wsrep_gtid.uuid.data)),
+ wsrep::seqno(wsrep_gtid.seqno)) :
+ wsrep::gtid::undefined());
+ return std::make_pair(gtid, map_return_value(ret));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::wait_for_gtid(const wsrep::gtid& gtid, int timeout)
+ const
+{
+ wsrep_gtid_t wsrep_gtid;
+ std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
+ sizeof(wsrep_gtid.uuid.data));
+ wsrep_gtid.seqno = gtid.seqno().get();
+ return map_return_value(wsrep_->sync_wait(wsrep_, &wsrep_gtid, timeout, 0));
+}
+
+wsrep::gtid wsrep::wsrep_provider_v26::last_committed_gtid() const
+{
+ wsrep_gtid_t wsrep_gtid;
+ if (wsrep_->last_committed_id(wsrep_, &wsrep_gtid) != WSREP_OK)
+ {
+ throw wsrep::runtime_error("Failed to read last committed id");
+ }
+ return wsrep::gtid(
+ wsrep::id(wsrep_gtid.uuid.data, sizeof(wsrep_gtid.uuid.data)),
+ wsrep::seqno(wsrep_gtid.seqno));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::sst_sent(const wsrep::gtid& gtid, int err)
+{
+ wsrep_gtid_t wsrep_gtid;
+ std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
+ sizeof(wsrep_gtid.uuid.data));
+ wsrep_gtid.seqno = gtid.seqno().get();
+ return map_return_value(wsrep_->sst_sent(wsrep_, &wsrep_gtid, err));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::sst_received(const wsrep::gtid& gtid, int err)
+{
+ wsrep_gtid_t wsrep_gtid;
+ std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
+ sizeof(wsrep_gtid.uuid.data));
+ wsrep_gtid.seqno = gtid.seqno().get();
+ return map_return_value(wsrep_->sst_received(wsrep_, &wsrep_gtid, 0, err));
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::enc_set_key(const wsrep::const_buffer& key)
+{
+ wsrep_enc_key_t enc_key = {key.data(), key.size()};
+ return map_return_value(wsrep_->enc_set_key(wsrep_, &enc_key));
+}
+
+std::vector<wsrep::provider::status_variable>
+wsrep::wsrep_provider_v26::status() const
+{
+ std::vector<status_variable> ret;
+ wsrep_stats_var* const stats(wsrep_->stats_get(wsrep_));
+ wsrep_stats_var* i(stats);
+ if (i)
+ {
+ while (i->name)
+ {
+ switch (i->type)
+ {
+ case WSREP_VAR_STRING:
+ ret.push_back(status_variable(i->name, i->value._string));
+ break;
+ case WSREP_VAR_INT64:
+ {
+ std::ostringstream os;
+ os << i->value._int64;
+ ret.push_back(status_variable(i->name, os.str()));
+ break;
+ }
+ case WSREP_VAR_DOUBLE:
+ {
+ std::ostringstream os;
+ os << i->value._double;
+ ret.push_back(status_variable(i->name, os.str()));
+ break;
+ }
+ default:
+ assert(0);
+ break;
+ }
+ ++i;
+ }
+ wsrep_->stats_free(wsrep_, stats);
+ }
+ return ret;
+}
+
+void wsrep::wsrep_provider_v26::reset_status()
+{
+ wsrep_->stats_reset(wsrep_);
+}
+
+std::string wsrep::wsrep_provider_v26::options() const
+{
+ std::string ret;
+ char* opts;
+ if ((opts = wsrep_->options_get(wsrep_)))
+ {
+ ret = opts;
+ free(opts);
+ }
+ else
+ {
+ throw wsrep::runtime_error("Failed to get provider options");
+ }
+ return ret;
+}
+
+enum wsrep::provider::status
+wsrep::wsrep_provider_v26::options(const std::string& opts)
+{
+ return map_return_value(wsrep_->options_set(wsrep_, opts.c_str()));
+}
+
+std::string wsrep::wsrep_provider_v26::name() const
+{
+ return (wsrep_->provider_name ? wsrep_->provider_name : "unknown");
+}
+
+std::string wsrep::wsrep_provider_v26::version() const
+{
+ return (wsrep_->provider_version ? wsrep_->provider_version : "unknown");
+}
+
+std::string wsrep::wsrep_provider_v26::vendor() const
+{
+ return (wsrep_->provider_vendor ? wsrep_->provider_vendor : "unknown");
+}
+
+void* wsrep::wsrep_provider_v26::native() const
+{
+ return wsrep_;
+}