/*
 * Copyright (C) 2018-2021 Codership Oy <info@codership.com>
 *
 * This file is part of wsrep-lib.
 *
 * Wsrep-lib is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 2 of the License, or
 * (at your option) any later version.
 *
 * Wsrep-lib is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with wsrep-lib.  If not, see <https://www.gnu.org/licenses/>.
 */

#include "wsrep_provider_v26.hpp"

#include "wsrep/encryption_service.hpp"
#include "wsrep/server_state.hpp"
#include "wsrep/high_priority_service.hpp"
#include "wsrep/view.hpp"
#include "wsrep/exception.hpp"
#include "wsrep/logger.hpp"
#include "wsrep/thread_service.hpp"
#include "wsrep/tls_service.hpp"
#include "wsrep/allowlist_service.hpp"

#include "thread_service_v1.hpp"
#include "tls_service_v1.hpp"
#include "allowlist_service_v1.hpp"
#include "event_service_v1.hpp"
#include "v26/wsrep_api.h"


#include <dlfcn.h>
#include <cassert>
#include <climits>

#include <iostream>
#include <sstream>
#include <cstring> // strerror()

namespace
{
    /////////////////////////////////////////////////////////////////////
    //                           Helpers                               //
    /////////////////////////////////////////////////////////////////////

    enum wsrep::provider::status map_return_value(wsrep_status_t status)
    {
        switch (status)
        {
        case WSREP_OK:
            return wsrep::provider::success;
        case WSREP_WARNING:
            return wsrep::provider::error_warning;
        case WSREP_TRX_MISSING:
            return wsrep::provider::error_transaction_missing;
        case WSREP_TRX_FAIL:
            return wsrep::provider::error_certification_failed;
        case WSREP_BF_ABORT:
            return wsrep::provider::error_bf_abort;
        case WSREP_SIZE_EXCEEDED:
            return wsrep::provider::error_size_exceeded;
        case WSREP_CONN_FAIL:
            return wsrep::provider::error_connection_failed;
        case WSREP_NODE_FAIL:
            return wsrep::provider::error_provider_failed;
        case WSREP_FATAL:
            return wsrep::provider::error_fatal;
        case WSREP_NOT_IMPLEMENTED:
            return wsrep::provider::error_not_implemented;
        case WSREP_NOT_ALLOWED:
            return wsrep::provider::error_not_allowed;
        }

        wsrep::log_warning() << "Unexpected value for wsrep_status_t: "
                             << status << " ("
                             << (status < 0 ? strerror(-status) : "") << ')';

        return wsrep::provider::error_unknown;
    }

    wsrep_key_type_t map_key_type(enum wsrep::key::type type)
    {
        switch (type)
        {
        case wsrep::key::shared:        return WSREP_KEY_SHARED;
        case wsrep::key::reference:     return WSREP_KEY_REFERENCE;
        case wsrep::key::update:        return WSREP_KEY_UPDATE;
        case wsrep::key::exclusive:     return WSREP_KEY_EXCLUSIVE;
        }
        assert(0);
        throw wsrep::runtime_error("Invalid key type");
    }

    static inline wsrep_seqno_t seqno_to_native(wsrep::seqno seqno)
    {
        return seqno.get();
    }

    static inline wsrep::seqno seqno_from_native(wsrep_seqno_t seqno)
    {
        return wsrep::seqno(seqno);
    }

    template <typename F, typename T>
    inline uint32_t map_one(const int flags, const F from,
                            const T to)
    {
        // INT_MAX because GCC 4.4 does not eat numeric_limits<int>::max()
        // in static_assert
        static_assert(WSREP_FLAGS_LAST < INT_MAX,
                      "WSREP_FLAGS_LAST < INT_MAX");
        return static_cast<uint32_t>((flags & static_cast<int>(from)) ?
                                     static_cast<int>(to) : 0);
    }

    uint32_t map_flags_to_native(int flags)
    {
      using wsrep::provider;
      return static_cast<uint32_t>(
          map_one(flags, provider::flag::start_transaction,
                  WSREP_FLAG_TRX_START) |
          map_one(flags, provider::flag::commit, WSREP_FLAG_TRX_END) |
          map_one(flags, provider::flag::rollback, WSREP_FLAG_ROLLBACK) |
          map_one(flags, provider::flag::isolation, WSREP_FLAG_ISOLATION) |
          map_one(flags, provider::flag::pa_unsafe, WSREP_FLAG_PA_UNSAFE) |
          // map_one(flags, provider::flag::commutative, WSREP_FLAG_COMMUTATIVE)
          // |
          // map_one(flags, provider::flag::native, WSREP_FLAG_NATIVE) |
          map_one(flags, provider::flag::prepare, WSREP_FLAG_TRX_PREPARE) |
          map_one(flags, provider::flag::snapshot, WSREP_FLAG_SNAPSHOT) |
          map_one(flags, provider::flag::implicit_deps,
                  WSREP_FLAG_IMPLICIT_DEPS));
    }

    int map_flags_from_native(uint32_t flags_u32)
    {
      using wsrep::provider;
      const int flags(static_cast<int>(flags_u32));
      return static_cast<int>(
          map_one(flags, WSREP_FLAG_TRX_START,
                  provider::flag::start_transaction) |
          map_one(flags, WSREP_FLAG_TRX_END, provider::flag::commit) |
          map_one(flags, WSREP_FLAG_ROLLBACK, provider::flag::rollback) |
          map_one(flags, WSREP_FLAG_ISOLATION, provider::flag::isolation) |
          map_one(flags, WSREP_FLAG_PA_UNSAFE, provider::flag::pa_unsafe) |
          // map_one(flags, provider::flag::commutative, WSREP_FLAG_COMMUTATIVE)
          // |
          // map_one(flags, provider::flag::native, WSREP_FLAG_NATIVE) |
          map_one(flags, WSREP_FLAG_TRX_PREPARE, provider::flag::prepare) |
          map_one(flags, WSREP_FLAG_SNAPSHOT, provider::flag::snapshot) |
          map_one(flags, WSREP_FLAG_IMPLICIT_DEPS,
                  provider::flag::implicit_deps));
    }

    class mutable_ws_handle
    {
    public:
        mutable_ws_handle(wsrep::ws_handle& ws_handle)
            : ws_handle_(ws_handle)
            , native_((wsrep_ws_handle_t)
                      {
                          ws_handle_.transaction_id().get(),
                          ws_handle_.opaque()
                      })
        { }

        ~mutable_ws_handle()
        {
            ws_handle_ = wsrep::ws_handle(
                wsrep::transaction_id(native_.trx_id), native_.opaque);
        }

        wsrep_ws_handle_t* native()
        {
            return &native_;
        }
    private:
        wsrep::ws_handle& ws_handle_;
        wsrep_ws_handle_t native_;
    };

    class const_ws_handle
    {
    public:
        const_ws_handle(const wsrep::ws_handle& ws_handle)
            : ws_handle_(ws_handle)
            , native_((wsrep_ws_handle_t)
                      {
                          ws_handle_.transaction_id().get(),
                          ws_handle_.opaque()
                      })
        { }

        ~const_ws_handle()
        {
            assert(ws_handle_.transaction_id().get() == native_.trx_id);
            assert(ws_handle_.opaque() == native_.opaque);
        }

        const wsrep_ws_handle_t* native() const
        {
            return &native_;
        }
    private:
        const wsrep::ws_handle& ws_handle_;
        const wsrep_ws_handle_t native_;
    };

    class mutable_ws_meta
    {
    public:
        mutable_ws_meta(wsrep::ws_meta& ws_meta, int flags)
            : ws_meta_(ws_meta)
            , trx_meta_()
            , flags_(flags)
        {
            std::memcpy(trx_meta_.gtid.uuid.data, ws_meta.group_id().data(),
                        sizeof(trx_meta_.gtid.uuid.data));
            trx_meta_.gtid.seqno = seqno_to_native(ws_meta.seqno());
            std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(),
                        sizeof(trx_meta_.stid.node.data));
            trx_meta_.stid.conn = ws_meta.client_id().get();
            trx_meta_.stid.trx = ws_meta.transaction_id().get();
            trx_meta_.depends_on = seqno_to_native(ws_meta.depends_on());
        }

        ~mutable_ws_meta()
        {
            ws_meta_ = wsrep::ws_meta(
                wsrep::gtid(
                    wsrep::id(trx_meta_.gtid.uuid.data,
                              sizeof(trx_meta_.gtid.uuid.data)),
                    seqno_from_native(trx_meta_.gtid.seqno)),
                wsrep::stid(wsrep::id(trx_meta_.stid.node.data,
                                      sizeof(trx_meta_.stid.node.data)),
                            wsrep::transaction_id(trx_meta_.stid.trx),
                            wsrep::client_id(trx_meta_.stid.conn)),
                seqno_from_native(trx_meta_.depends_on), flags_);
        }

        wsrep_trx_meta* native() { return &trx_meta_; }
        uint32_t native_flags() const { return map_flags_to_native(flags_); }
    private:
        wsrep::ws_meta& ws_meta_;
        wsrep_trx_meta_t trx_meta_;
        int flags_;
    };

    class const_ws_meta
    {
    public:
        const_ws_meta(const wsrep::ws_meta& ws_meta)
            : trx_meta_()
        {
            std::memcpy(trx_meta_.gtid.uuid.data, ws_meta.group_id().data(),
                        sizeof(trx_meta_.gtid.uuid.data));
            trx_meta_.gtid.seqno = seqno_to_native(ws_meta.seqno());
            std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(),
                        sizeof(trx_meta_.stid.node.data));
            trx_meta_.stid.conn = ws_meta.client_id().get();
            trx_meta_.stid.trx = ws_meta.transaction_id().get();
            trx_meta_.depends_on = seqno_to_native(ws_meta.depends_on());
        }

        ~const_ws_meta()
        {
        }

        const wsrep_trx_meta* native() const { return &trx_meta_; }
    private:
        wsrep_trx_meta_t trx_meta_;
    };

    enum wsrep::view::status map_view_status_from_native(
        wsrep_view_status_t status)
    {
        switch (status)
        {
        case WSREP_VIEW_PRIMARY: return wsrep::view::primary;
        case WSREP_VIEW_NON_PRIMARY: return wsrep::view::non_primary;
        case WSREP_VIEW_DISCONNECTED: return wsrep::view::disconnected;
        default: throw wsrep::runtime_error("Unknown view status");
        }
    }

    /** @todo Currently capabilities defined in provider.hpp
     * are one to one with wsrep_api.h. However, the mapping should
     * be made explicit. */
    int map_capabilities_from_native(wsrep_cap_t capabilities)
    {
        return static_cast<int>(capabilities);
    }
    wsrep::view view_from_native(const wsrep_view_info& view_info,
                                 const wsrep::id& own_id)
    {
        std::vector<wsrep::view::member> members;
        for (int i(0); i < view_info.memb_num; ++i)
        {
            wsrep::id id(view_info.members[i].id.data, sizeof(view_info.members[i].id.data));
            std::string name(
                view_info.members[i].name,
                strnlen(view_info.members[i].name,
                        sizeof(view_info.members[i].name)));
            std::string incoming(
                view_info.members[i].incoming,
                strnlen(view_info.members[i].incoming,
                        sizeof(view_info.members[i].incoming)));
            members.push_back(wsrep::view::member(id, name, incoming));
        }

        int own_idx(-1);
        if (own_id.is_undefined())
        {
            // If own ID is undefined, obtain it from the view. This is
            // the case on the initial connect to cluster.
            own_idx = view_info.my_idx;
        }
        else
        {
            // If the node has already obtained its ID from cluster,
            // its position in the view (or lack thereof) must be determined
            // by the ID.
            for (size_t i(0); i < members.size(); ++i)
            {
                if (own_id == members[i].id())
                {
                    own_idx = static_cast<int>(i);
                    break;
                }
            }
        }

        return wsrep::view(
            wsrep::gtid(
                wsrep::id(view_info.state_id.uuid.data,
                          sizeof(view_info.state_id.uuid.data)),
                wsrep::seqno(view_info.state_id.seqno)),
            wsrep::seqno(view_info.view),
            map_view_status_from_native(view_info.status),
            map_capabilities_from_native(view_info.capabilities),
            own_idx,
            view_info.proto_ver,
            members);
    }

    /////////////////////////////////////////////////////////////////////
    //                         Callbacks                               //
    /////////////////////////////////////////////////////////////////////

    wsrep_cb_status_t connected_cb(
        void* app_ctx,
        const wsrep_view_info_t* view_info)
    {
        assert(app_ctx);
        wsrep::server_state& server_state(
            *reinterpret_cast<wsrep::server_state*>(app_ctx));
        wsrep::view view(view_from_native(*view_info, server_state.id()));
        const ssize_t own_index(view.own_index());
        assert(own_index >= 0);
        if (own_index < 0)
        {
            wsrep::log_error() << "Connected without being in reported view";
            return WSREP_CB_FAILURE;
        }
        assert(// first connect
            server_state.id().is_undefined() ||
            // reconnect to primary component
            server_state.id() ==
            view.members()[static_cast<size_t>(own_index)].id());
        try
        {
            server_state.on_connect(view);
            return WSREP_CB_SUCCESS;
        }
        catch (const wsrep::runtime_error& e)
        {
            wsrep::log_error() << "Exception: " << e.what();
            return WSREP_CB_FAILURE;
        }
    }

    wsrep_cb_status_t view_cb(void* app_ctx,
                              void* recv_ctx,
                              const wsrep_view_info_t* view_info,
                              const char*,
                              size_t)
    {
        assert(app_ctx);
        assert(view_info);
        wsrep::server_state& server_state(
            *reinterpret_cast<wsrep::server_state*>(app_ctx));
        wsrep::high_priority_service* high_priority_service(
            reinterpret_cast<wsrep::high_priority_service*>(recv_ctx));
        try
        {
            wsrep::view view(view_from_native(*view_info, server_state.id()));
            server_state.on_view(view, high_priority_service);
            return WSREP_CB_SUCCESS;
        }
        catch (const wsrep::runtime_error& e)
        {
            wsrep::log_error() << "Exception: " << e.what();
            return WSREP_CB_FAILURE;
        }
    }

    wsrep_cb_status_t sst_request_cb(void* app_ctx,
                                     void **sst_req, size_t* sst_req_len)
    {
        assert(app_ctx);
        wsrep::server_state& server_state(
            *reinterpret_cast<wsrep::server_state*>(app_ctx));

        try
        {
            std::string req(server_state.prepare_for_sst());
            if (req.size() > 0)
            {
                *sst_req = ::malloc(req.size() + 1);
                memcpy(*sst_req, req.data(), req.size() + 1);
                *sst_req_len = req.size() + 1;
            }
            else
            {
                *sst_req = 0;
                *sst_req_len = 0;
            }
            return WSREP_CB_SUCCESS;
        }
        catch (const wsrep::runtime_error& e)
        {
            return WSREP_CB_FAILURE;
        }
    }

    int encrypt_cb(void*                 app_ctx,
                   wsrep_enc_ctx_t*      enc_ctx,
                   const wsrep_buf_t*    input,
                   void*                 output,
                   wsrep_enc_direction_t direction,
                   bool                  last)
    {
        assert(app_ctx);
        wsrep::server_state& server_state(
            *static_cast<wsrep::server_state*>(app_ctx));

        assert(server_state.encryption_service());
        if (server_state.encryption_service() == 0)
        {
            wsrep::log_error() << "Encryption service not defined in encrypt_cb()";
            return -1;
        }

        wsrep::const_buffer key(enc_ctx->key->ptr, enc_ctx->key->len);
        wsrep::const_buffer in(input->ptr, input->len);
        try
        {
            return server_state.encryption_service()->do_crypt(&enc_ctx->ctx,
                                                              key,
                                                              enc_ctx->iv,
                                                              in,
                                                              output,
                                                              direction == WSREP_ENC,
                                                              last);
        }
        catch (const wsrep::runtime_error& e)
        {
            free(enc_ctx->ctx);
            // Return negative value in case of callback error
            return -1;
        }
    }

    wsrep_cb_status_t apply_cb(void* ctx,
                               const wsrep_ws_handle_t* wsh,
                               uint32_t flags,
                               const wsrep_buf_t* buf,
                               const wsrep_trx_meta_t* meta,
                               wsrep_bool_t* exit_loop)
    {
        wsrep::high_priority_service* high_priority_service(
            reinterpret_cast<wsrep::high_priority_service*>(ctx));
        assert(high_priority_service);

        wsrep::const_buffer data(buf->ptr, buf->len);
        wsrep::ws_handle ws_handle(wsrep::transaction_id(wsh->trx_id),
                                   wsh->opaque);
        wsrep::ws_meta ws_meta(
            wsrep::gtid(wsrep::id(meta->gtid.uuid.data,
                                  sizeof(meta->gtid.uuid.data)),
                        wsrep::seqno(meta->gtid.seqno)),
            wsrep::stid(wsrep::id(meta->stid.node.data,
                                  sizeof(meta->stid.node.data)),
                        wsrep::transaction_id(meta->stid.trx),
                        wsrep::client_id(meta->stid.conn)),
            wsrep::seqno(seqno_from_native(meta->depends_on)),
            map_flags_from_native(flags));
        try
        {
            if (high_priority_service->apply(ws_handle, ws_meta, data))
            {
                return WSREP_CB_FAILURE;
            }
            *exit_loop = high_priority_service->must_exit();
            return WSREP_CB_SUCCESS;
        }
        catch (const wsrep::runtime_error& e)
        {
            wsrep::log_error() << "Caught runtime error while applying "
                               << ws_meta.flags() << ": "
                               << e.what();
            return WSREP_CB_FAILURE;
        }
    }

    wsrep_cb_status_t synced_cb(void* app_ctx)
    {
        assert(app_ctx);
        wsrep::server_state& server_state(
            *reinterpret_cast<wsrep::server_state*>(app_ctx));
        try
        {
            server_state.on_sync();
            return WSREP_CB_SUCCESS;
        }
        catch (const wsrep::runtime_error& e)
        {
            std::cerr << "On sync failed: " << e.what() << "\n";
            return WSREP_CB_FAILURE;
        }
    }


    wsrep_cb_status_t sst_donate_cb(void* app_ctx,
                                    void* ,
                                    const wsrep_buf_t* req_buf,
                                    const wsrep_gtid_t* req_gtid,
                                    const wsrep_buf_t*,
                                    bool bypass)
    {
        assert(app_ctx);
        wsrep::server_state& server_state(
            *reinterpret_cast<wsrep::server_state*>(app_ctx));
        try
        {
            std::string req(reinterpret_cast<const char*>(req_buf->ptr),
                            req_buf->len);
            wsrep::gtid gtid(wsrep::id(req_gtid->uuid.data,
                                       sizeof(req_gtid->uuid.data)),
                             wsrep::seqno(req_gtid->seqno));
            if (server_state.start_sst(req, gtid, bypass))
            {
                return WSREP_CB_FAILURE;
            }
            return WSREP_CB_SUCCESS;
        }
        catch (const wsrep::runtime_error& e)
        {
            return WSREP_CB_FAILURE;
        }
    }

    void logger_cb(wsrep_log_level_t level, const char* msg)
    {
        static const char* const pfx("P:"); // "provider"
        wsrep::log::level ll(wsrep::log::unknown);
        switch (level)
        {
        case WSREP_LOG_FATAL:
        case WSREP_LOG_ERROR:
            ll = wsrep::log::error;
            break;
        case WSREP_LOG_WARN:
            ll = wsrep::log::warning;
            break;
        case WSREP_LOG_INFO:
            ll = wsrep::log::info;
            break;
        case WSREP_LOG_DEBUG:
            ll = wsrep::log::debug;
            break;
        }
        wsrep::log(ll, pfx) << msg;
    }

    static int init_thread_service(void* dlh,
                                   wsrep::thread_service* thread_service)
    {
        assert(thread_service);
        if (wsrep::thread_service_v1_probe(dlh))
        {
            // No support in library.
            return 1;
        }
        else
        {
            if (thread_service->before_init())
            {
                wsrep::log_error() << "Thread service before init failed";
                return 1;
            }
            wsrep::thread_service_v1_init(dlh, thread_service);
            if (thread_service->after_init())
            {
                wsrep::log_error() << "Thread service after init failed";
                return 1;
            }
        }
        return 0;
    }

    static void deinit_thread_service(void* dlh)
    {
        // assert(not wsrep::thread_service_v1_probe(dlh));
        wsrep::thread_service_v1_deinit(dlh);
    }

    static int init_tls_service(void* dlh,
                                wsrep::tls_service* tls_service)
    {
        assert(tls_service);
        if (not wsrep::tls_service_v1_probe(dlh))
        {
            return wsrep::tls_service_v1_init(dlh, tls_service);
        }
        return 1;
    }

    static void deinit_tls_service(void* dlh)
    {
        // assert(not wsrep::tls_service_v1_probe(dlh));
        wsrep::tls_service_v1_deinit(dlh);
    }

    static int init_allowlist_service(void* dlh,
                                      wsrep::allowlist_service* allowlist_service)
    {
        assert(allowlist_service);
        if (not wsrep::allowlist_service_v1_probe(dlh))
        {
            return wsrep::allowlist_service_v1_init(dlh, allowlist_service);
        }
        return 1;
    }

    static void deinit_allowlist_service(void* dlh)
    {
        // assert(not wsrep::allowlist_service_v1_probe(dlh));
        wsrep::allowlist_service_v1_deinit(dlh);
    }

    static int init_event_service(void* dlh,
                                  wsrep::event_service* service)
    {
        assert(service);
        if (not wsrep::event_service_v1_probe(dlh))
        {
            return wsrep::event_service_v1_init(dlh, service);
        }
        return 1;
    }

    static void deinit_event_service(void* dlh)
    {
        wsrep::event_service_v1_deinit(dlh);
    }
}



void wsrep::wsrep_provider_v26::init_services(
    const wsrep::provider::services& services)
{
    if (services.thread_service)
    {
        if (init_thread_service(wsrep_->dlh, services.thread_service))
        {
            throw wsrep::runtime_error("Failed to initialize thread service");
        }
        services_enabled_.thread_service = services.thread_service;
    }
    if (services.tls_service)
    {
        if (init_tls_service(wsrep_->dlh, services.tls_service))
        {
            throw wsrep::runtime_error("Failed to initialize TLS service");
        }
        services_enabled_.tls_service = services.tls_service;
    }
    if (services.allowlist_service)
    {
        if (init_allowlist_service(wsrep_->dlh, services.allowlist_service))
        {
            throw wsrep::runtime_error("Failed to initialize allowlist service");
        }
        services_enabled_.allowlist_service = services.allowlist_service;
    }
    if (services.event_service)
    {
        if (init_event_service(wsrep_->dlh, services.event_service))
        {
            wsrep::log_warning() << "Failed to initialize event service";
            // provider does not produce events, ignore
        }
        else
        {
            services_enabled_.event_service = services.event_service;
        }
    }
}

void wsrep::wsrep_provider_v26::deinit_services()
{
    if (services_enabled_.event_service)
        deinit_event_service(wsrep_->dlh);
    if (services_enabled_.tls_service)
        deinit_tls_service(wsrep_->dlh);
    if (services_enabled_.thread_service)
        deinit_thread_service(wsrep_->dlh);
    if (services_enabled_.allowlist_service)
        deinit_allowlist_service(wsrep_->dlh);
}

wsrep::wsrep_provider_v26::wsrep_provider_v26(
    wsrep::server_state& server_state,
    const std::string& provider_options,
    const std::string& provider_spec,
    const wsrep::provider::services& services)
    : provider(server_state)
    , wsrep_()
    , services_enabled_()
{
    wsrep_gtid_t state_id;
    bool encryption_enabled = server_state.encryption_service() &&
                              server_state.encryption_service()->encryption_enabled();
    std::memcpy(state_id.uuid.data,
                server_state.initial_position().id().data(),
                sizeof(state_id.uuid.data));
    state_id.seqno = server_state.initial_position().seqno().get();
    struct wsrep_init_args init_args;
    memset(&init_args, 0, sizeof(init_args));
    init_args.app_ctx = &server_state;
    init_args.node_name = server_state_.name().c_str();
    init_args.node_address = server_state_.address().c_str();
    init_args.node_incoming = server_state_.incoming_address().c_str();
    init_args.data_dir = server_state_.working_dir().c_str();
    init_args.options = provider_options.c_str();
    init_args.proto_ver = server_state.max_protocol_version();
    init_args.state_id = &state_id;
    init_args.state = 0;
    init_args.logger_cb = &logger_cb;
    init_args.connected_cb = &connected_cb;
    init_args.view_cb = &view_cb;
    init_args.sst_request_cb = &sst_request_cb;
    init_args.encrypt_cb = encryption_enabled ? encrypt_cb : NULL;
    init_args.apply_cb = &apply_cb;
    init_args.unordered_cb = 0;
    init_args.sst_donate_cb = &sst_donate_cb;
    init_args.synced_cb = &synced_cb;

    if (wsrep_load(provider_spec.c_str(), &wsrep_, logger_cb))
    {
        throw wsrep::runtime_error("Failed to load wsrep library");
    }

    init_services(services);

    if (wsrep_->init(wsrep_, &init_args) != WSREP_OK)
    {
        throw wsrep::runtime_error("Failed to initialize wsrep provider");
    }

    if (encryption_enabled)
    {
        const std::vector<unsigned char>& key = server_state.get_encryption_key();
        if (key.size())
        {
            wsrep::const_buffer const_key(key.data(), key.size());
            enum status const retval(enc_set_key(const_key));
            if (retval != success)
            {
                std::string msg("Failed to set encryption key: ");
                msg += to_string(retval);
                throw wsrep::runtime_error(msg);
            }
        }
    }
}

wsrep::wsrep_provider_v26::~wsrep_provider_v26()
{
    wsrep_->free(wsrep_);
    deinit_services();
    wsrep_unload(wsrep_);
}

enum wsrep::provider::status wsrep::wsrep_provider_v26::connect(
    const std::string& cluster_name,
    const std::string& cluster_url,
    const std::string& state_donor,
    bool bootstrap)
{
    return map_return_value(wsrep_->connect(wsrep_,
                                            cluster_name.c_str(),
                                            cluster_url.c_str(),
                                            state_donor.c_str(),
                                            bootstrap));
}

int wsrep::wsrep_provider_v26::disconnect()
{
    int ret(0);
    wsrep_status_t wret;
    if ((wret = wsrep_->disconnect(wsrep_)) != WSREP_OK)
    {
        std::cerr << "Failed to disconnect from cluster: "
                  << wret << "\n";
        ret = 1;
    }
    return ret;
}

int wsrep::wsrep_provider_v26::capabilities() const
{
    return map_capabilities_from_native(wsrep_->capabilities(wsrep_));
}
int wsrep::wsrep_provider_v26::desync()
{
    return (wsrep_->desync(wsrep_) != WSREP_OK);
}

int wsrep::wsrep_provider_v26::resync()
{
    return (wsrep_->resync(wsrep_) != WSREP_OK);
}

wsrep::seqno wsrep::wsrep_provider_v26::pause()
{
    return wsrep::seqno(wsrep_->pause(wsrep_));
}

int wsrep::wsrep_provider_v26::resume()
{
    return (wsrep_->resume(wsrep_) != WSREP_OK);
}

enum wsrep::provider::status
wsrep::wsrep_provider_v26::run_applier(
    wsrep::high_priority_service *applier_ctx)
{
    return map_return_value(wsrep_->recv(wsrep_, applier_ctx));
}

enum wsrep::provider::status
wsrep::wsrep_provider_v26::assign_read_view(wsrep::ws_handle& ws_handle,
                                            const wsrep::gtid* gtid)
{
    const wsrep_gtid_t* gtid_ptr(NULL);
    wsrep_gtid_t tmp;

    if (gtid)
    {
        ::memcpy(&tmp.uuid, gtid->id().data(), sizeof(tmp.uuid));
        tmp.seqno = gtid->seqno().get();
        gtid_ptr = &tmp;
    }

    mutable_ws_handle mwsh(ws_handle);
    return map_return_value(wsrep_->assign_read_view(wsrep_, mwsh.native(),
                                                     gtid_ptr));
}

int wsrep::wsrep_provider_v26::append_key(wsrep::ws_handle& ws_handle,
                                          const wsrep::key& key)
{
    if (key.size() > 3)
    {
        assert(0);
        return 1;
    }
    wsrep_buf_t key_parts[3];
    for (size_t i(0); i < key.size(); ++i)
    {
        key_parts[i].ptr = key.key_parts()[i].ptr();
        key_parts[i].len = key.key_parts()[i].size();
    }
    wsrep_key_t wsrep_key = {key_parts, key.size()};
    mutable_ws_handle mwsh(ws_handle);
    return (wsrep_->append_key(
                wsrep_, mwsh.native(),
                &wsrep_key, 1, map_key_type(key.type()), true)
            != WSREP_OK);
}

enum wsrep::provider::status
wsrep::wsrep_provider_v26::append_data(wsrep::ws_handle& ws_handle,
                                       const wsrep::const_buffer& data)
{
    const wsrep_buf_t wsrep_buf = {data.data(), data.size()};
    mutable_ws_handle mwsh(ws_handle);
    return map_return_value(
        wsrep_->append_data(wsrep_, mwsh.native(), &wsrep_buf,
                            1, WSREP_DATA_ORDERED, true));
}

enum wsrep::provider::status
wsrep::wsrep_provider_v26::certify(wsrep::client_id client_id,
                                   wsrep::ws_handle& ws_handle,
                                   int flags,
                                   wsrep::ws_meta& ws_meta)
{
    mutable_ws_handle mwsh(ws_handle);
    mutable_ws_meta mmeta(ws_meta, flags);
    return map_return_value(
        wsrep_->certify(wsrep_, client_id.get(), mwsh.native(),
                        mmeta.native_flags(),
                        mmeta.native()));
}

enum wsrep::provider::status
wsrep::wsrep_provider_v26::bf_abort(
    wsrep::seqno bf_seqno,
    wsrep::transaction_id victim_id,
    wsrep::seqno& victim_seqno)
{
    wsrep_seqno_t wsrep_victim_seqno;
    wsrep_status_t ret(
        wsrep_->abort_certification(
            wsrep_, seqno_to_native(bf_seqno),
            victim_id.get(), &wsrep_victim_seqno));
    victim_seqno = seqno_from_native(wsrep_victim_seqno);
    return map_return_value(ret);
}

enum wsrep::provider::status
wsrep::wsrep_provider_v26::rollback(wsrep::transaction_id id)
{
    return map_return_value(wsrep_->rollback(wsrep_, id.get(), 0));
}

enum wsrep::provider::status
wsrep::wsrep_provider_v26::commit_order_enter(
    const wsrep::ws_handle& ws_handle,
    const wsrep::ws_meta& ws_meta)
{
    const_ws_handle cwsh(ws_handle);
    const_ws_meta cwsm(ws_meta);
    return map_return_value(
        wsrep_->commit_order_enter(wsrep_, cwsh.native(), cwsm.native()));
}

int
wsrep::wsrep_provider_v26::commit_order_leave(
    const wsrep::ws_handle& ws_handle,
    const wsrep::ws_meta& ws_meta,
    const wsrep::mutable_buffer& err)
{
    const_ws_handle cwsh(ws_handle);
    const_ws_meta cwsm(ws_meta);
    wsrep_buf_t const err_buf = { err.data(), err.size() };
    int ret(wsrep_->commit_order_leave(
         wsrep_, cwsh.native(), cwsm.native(), &err_buf) != WSREP_OK);
    return ret;
}

int wsrep::wsrep_provider_v26::release(wsrep::ws_handle& ws_handle)
{
    mutable_ws_handle mwsh(ws_handle);
    return (wsrep_->release(wsrep_, mwsh.native()) != WSREP_OK);
}

enum wsrep::provider::status
wsrep::wsrep_provider_v26::replay(const wsrep::ws_handle& ws_handle,
                                  wsrep::high_priority_service* reply_service)
{
    const_ws_handle mwsh(ws_handle);
    return map_return_value(
        wsrep_->replay_trx(wsrep_, mwsh.native(), reply_service));
}

enum wsrep::provider::status
wsrep::wsrep_provider_v26::enter_toi(
    wsrep::client_id client_id,
    const wsrep::key_array& keys,
    const wsrep::const_buffer& buffer,
    wsrep::ws_meta& ws_meta,
    int flags)
{
    mutable_ws_meta mmeta(ws_meta, flags);
    std::vector<std::vector<wsrep_buf_t> > key_parts;
    std::vector<wsrep_key_t> wsrep_keys;
    wsrep_buf_t wsrep_buf = {buffer.data(), buffer.size()};
    for (size_t i(0); i < keys.size(); ++i)
    {
        key_parts.push_back(std::vector<wsrep_buf_t>());
        for (size_t kp(0); kp < keys[i].size(); ++kp)
        {
            wsrep_buf_t buf = {keys[i].key_parts()[kp].data(),
                               keys[i].key_parts()[kp].size()};
            key_parts[i].push_back(buf);
        }
    }
    for (size_t i(0); i < key_parts.size(); ++i)
    {
        wsrep_key_t key = {key_parts[i].data(), key_parts[i].size()};
        wsrep_keys.push_back(key);
    }
    return map_return_value(wsrep_->to_execute_start(
                                wsrep_,
                                client_id.get(),
                                wsrep_keys.data(),
                                wsrep_keys.size(),
                                &wsrep_buf,
                                buffer.size() ? 1 : 0,
                                mmeta.native_flags(),
                                mmeta.native()));
}

enum wsrep::provider::status
wsrep::wsrep_provider_v26::leave_toi(wsrep::client_id client_id,
                                     const wsrep::mutable_buffer& err)
{
    const wsrep_buf_t err_buf = { err.data(), err.size() };
    return map_return_value(wsrep_->to_execute_end(
                                wsrep_, client_id.get(), &err_buf));
}

std::pair<wsrep::gtid, enum wsrep::provider::status>
wsrep::wsrep_provider_v26::causal_read(int timeout) const
{
    wsrep_gtid_t wsrep_gtid;
    wsrep_status_t ret(wsrep_->sync_wait(wsrep_, 0, timeout, &wsrep_gtid));
    wsrep::gtid gtid(ret == WSREP_OK ?
                     wsrep::gtid(wsrep::id(wsrep_gtid.uuid.data,
                                           sizeof(wsrep_gtid.uuid.data)),
                                 wsrep::seqno(wsrep_gtid.seqno)) :
                     wsrep::gtid::undefined());
    return std::make_pair(gtid, map_return_value(ret));
}

enum wsrep::provider::status
wsrep::wsrep_provider_v26::wait_for_gtid(const wsrep::gtid& gtid, int timeout)
    const
{
    wsrep_gtid_t wsrep_gtid;
    std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
                sizeof(wsrep_gtid.uuid.data));
    wsrep_gtid.seqno = gtid.seqno().get();
    return map_return_value(wsrep_->sync_wait(wsrep_, &wsrep_gtid, timeout, 0));
}

wsrep::gtid wsrep::wsrep_provider_v26::last_committed_gtid() const
{
    wsrep_gtid_t wsrep_gtid;
    if (wsrep_->last_committed_id(wsrep_, &wsrep_gtid) != WSREP_OK)
    {
        throw wsrep::runtime_error("Failed to read last committed id");
    }
    return wsrep::gtid(
        wsrep::id(wsrep_gtid.uuid.data, sizeof(wsrep_gtid.uuid.data)),
        wsrep::seqno(wsrep_gtid.seqno));
}

enum wsrep::provider::status
wsrep::wsrep_provider_v26::sst_sent(const wsrep::gtid& gtid, int err)
{
    wsrep_gtid_t wsrep_gtid;
    std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
                sizeof(wsrep_gtid.uuid.data));
    wsrep_gtid.seqno = gtid.seqno().get();
    return map_return_value(wsrep_->sst_sent(wsrep_, &wsrep_gtid, err));
}

enum wsrep::provider::status
wsrep::wsrep_provider_v26::sst_received(const wsrep::gtid& gtid, int err)
{
    wsrep_gtid_t wsrep_gtid;
    std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
                sizeof(wsrep_gtid.uuid.data));
    wsrep_gtid.seqno = gtid.seqno().get();
    return map_return_value(wsrep_->sst_received(wsrep_, &wsrep_gtid, 0, err));
}

enum wsrep::provider::status
wsrep::wsrep_provider_v26::enc_set_key(const wsrep::const_buffer& key)
{
    wsrep_enc_key_t enc_key = {key.data(), key.size()};
    return map_return_value(wsrep_->enc_set_key(wsrep_, &enc_key));
}

std::vector<wsrep::provider::status_variable>
wsrep::wsrep_provider_v26::status() const
{
    std::vector<status_variable> ret;
    wsrep_stats_var* const stats(wsrep_->stats_get(wsrep_));
    wsrep_stats_var* i(stats);
    if (i)
    {
        while (i->name)
        {
            switch (i->type)
            {
            case WSREP_VAR_STRING:
                ret.push_back(status_variable(i->name, i->value._string));
                break;
            case WSREP_VAR_INT64:
            {
                std::ostringstream os;
                os << i->value._int64;
                ret.push_back(status_variable(i->name, os.str()));
                break;
            }
            case WSREP_VAR_DOUBLE:
            {
                std::ostringstream os;
                os << i->value._double;
                ret.push_back(status_variable(i->name, os.str()));
                break;
            }
            default:
                assert(0);
                break;
            }
            ++i;
        }
        wsrep_->stats_free(wsrep_, stats);
    }
    return ret;
}

void wsrep::wsrep_provider_v26::reset_status()
{
    wsrep_->stats_reset(wsrep_);
}

std::string wsrep::wsrep_provider_v26::options() const
{
    std::string ret;
    char* opts;
    if ((opts = wsrep_->options_get(wsrep_)))
    {
        ret = opts;
        free(opts);
    }
    else
    {
        throw wsrep::runtime_error("Failed to get provider options");
    }
    return ret;
}

enum wsrep::provider::status
wsrep::wsrep_provider_v26::options(const std::string& opts)
{
    return map_return_value(wsrep_->options_set(wsrep_, opts.c_str()));
}

std::string wsrep::wsrep_provider_v26::name() const
{
    return (wsrep_->provider_name ? wsrep_->provider_name : "unknown");
}

std::string wsrep::wsrep_provider_v26::version() const
{
    return (wsrep_->provider_version ? wsrep_->provider_version : "unknown");
}

std::string wsrep::wsrep_provider_v26::vendor() const
{
    return (wsrep_->provider_vendor ? wsrep_->provider_vendor : "unknown");
}

void* wsrep::wsrep_provider_v26::native() const
{
    return wsrep_;
}