summaryrefslogtreecommitdiffstats
path: root/wsrep-lib/dbsim
diff options
context:
space:
mode:
Diffstat (limited to 'wsrep-lib/dbsim')
-rw-r--r--wsrep-lib/dbsim/CMakeLists.txt21
-rw-r--r--wsrep-lib/dbsim/db_client.cpp195
-rw-r--r--wsrep-lib/dbsim/db_client.hpp89
-rw-r--r--wsrep-lib/dbsim/db_client_service.cpp54
-rw-r--r--wsrep-lib/dbsim/db_client_service.hpp98
-rw-r--r--wsrep-lib/dbsim/db_client_state.hpp52
-rw-r--r--wsrep-lib/dbsim/db_high_priority_service.cpp137
-rw-r--r--wsrep-lib/dbsim/db_high_priority_service.hpp89
-rw-r--r--wsrep-lib/dbsim/db_params.cpp121
-rw-r--r--wsrep-lib/dbsim/db_params.hpp69
-rw-r--r--wsrep-lib/dbsim/db_server.cpp131
-rw-r--r--wsrep-lib/dbsim/db_server.hpp78
-rw-r--r--wsrep-lib/dbsim/db_server_service.cpp169
-rw-r--r--wsrep-lib/dbsim/db_server_service.hpp66
-rw-r--r--wsrep-lib/dbsim/db_server_state.cpp24
-rw-r--r--wsrep-lib/dbsim/db_server_state.hpp60
-rw-r--r--wsrep-lib/dbsim/db_simulator.cpp254
-rw-r--r--wsrep-lib/dbsim/db_simulator.hpp81
-rw-r--r--wsrep-lib/dbsim/db_storage_engine.cpp119
-rw-r--r--wsrep-lib/dbsim/db_storage_engine.hpp91
-rw-r--r--wsrep-lib/dbsim/db_storage_service.hpp54
-rw-r--r--wsrep-lib/dbsim/db_threads.cpp727
-rw-r--r--wsrep-lib/dbsim/db_threads.hpp84
-rw-r--r--wsrep-lib/dbsim/db_tls.cpp451
-rw-r--r--wsrep-lib/dbsim/db_tls.hpp62
-rw-r--r--wsrep-lib/dbsim/dbsim.cpp35
26 files changed, 3411 insertions, 0 deletions
diff --git a/wsrep-lib/dbsim/CMakeLists.txt b/wsrep-lib/dbsim/CMakeLists.txt
new file mode 100644
index 00000000..aee964cd
--- /dev/null
+++ b/wsrep-lib/dbsim/CMakeLists.txt
@@ -0,0 +1,21 @@
+#
+# Copyright (C) 2018 Codership Oy <info@codership.com>
+#
+
+add_executable(dbsim
+ db_client.cpp
+ db_client_service.cpp
+ db_high_priority_service.cpp
+ db_params.cpp
+ db_server.cpp
+ db_server_service.cpp
+ db_server_state.cpp
+ db_simulator.cpp
+ db_storage_engine.cpp
+ db_threads.cpp
+ db_tls.cpp
+ dbsim.cpp
+)
+
+target_link_libraries(dbsim wsrep-lib ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_THREAD_LIBRARY})
+set_property(TARGET dbsim PROPERTY CXX_STANDARD 14)
diff --git a/wsrep-lib/dbsim/db_client.cpp b/wsrep-lib/dbsim/db_client.cpp
new file mode 100644
index 00000000..b5d56d37
--- /dev/null
+++ b/wsrep-lib/dbsim/db_client.cpp
@@ -0,0 +1,195 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "db_client.hpp"
+#include "db_server.hpp"
+
+#include "wsrep/logger.hpp"
+
+db::client::client(db::server& server,
+ wsrep::client_id client_id,
+ enum wsrep::client_state::mode mode,
+ const db::params& params)
+ : mutex_()
+ , cond_()
+ , params_(params)
+ , server_(server)
+ , server_state_(server.server_state())
+ , client_state_(mutex_, cond_, server_state_, client_service_, client_id, mode)
+ , client_service_(*this)
+ , se_trx_(server.storage_engine())
+ , data_()
+ , random_device_()
+ , random_engine_(random_device_())
+ , stats_()
+{
+ data_.resize(params.max_data_size);
+}
+
+void db::client::start()
+{
+ client_state_.open(client_state_.id());
+ for (size_t i(0); i < params_.n_transactions; ++i)
+ {
+ run_one_transaction();
+ report_progress(i + 1);
+ }
+ client_state_.close();
+ client_state_.cleanup();
+}
+
+bool db::client::bf_abort(wsrep::seqno seqno)
+{
+ return client_state_.bf_abort(seqno);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Private //
+////////////////////////////////////////////////////////////////////////////////
+
+template <class F>
+int db::client::client_command(F f)
+{
+ int err(client_state_.before_command());
+ // wsrep::log_debug() << "before_command: " << err;
+ // If err != 0, transaction was BF aborted while client idle
+ if (err == 0)
+ {
+ err = client_state_.before_statement();
+ if (err == 0)
+ {
+ err = f();
+ }
+ client_state_.after_statement();
+ }
+ client_state_.after_command_before_result();
+ if (client_state_.current_error())
+ {
+ // wsrep::log_info() << "Current error";
+ assert(client_state_.transaction().state() ==
+ wsrep::transaction::s_aborted);
+ err = 1;
+ }
+ client_state_.after_command_after_result();
+ // wsrep::log_info() << "client_command(): " << err;
+ return err;
+}
+
+void db::client::run_one_transaction()
+{
+ if (params_.sync_wait)
+ {
+ if (client_state_.sync_wait(5))
+ {
+ throw wsrep::runtime_error("Sync wait failed");
+ }
+ }
+ client_state_.reset_error();
+ int err = client_command(
+ [&]()
+ {
+ // wsrep::log_debug() << "Start transaction";
+ err = client_state_.start_transaction(
+ wsrep::transaction_id(server_.next_transaction_id()));
+ assert(err == 0);
+ se_trx_.start(this);
+ return err;
+ });
+
+ const wsrep::transaction& transaction(
+ client_state_.transaction());
+
+ err = err || client_command(
+ [&]()
+ {
+ // wsrep::log_debug() << "Generate write set";
+ assert(transaction.active());
+ assert(err == 0);
+ std::uniform_int_distribution<size_t> uniform_dist(0, params_.n_rows);
+ const size_t randkey(uniform_dist(random_engine_));
+ ::memcpy(data_.data(), &randkey,
+ std::min(sizeof(randkey), data_.size()));
+ wsrep::key key(wsrep::key::exclusive);
+ key.append_key_part("dbms", 4);
+ unsigned long long client_key(client_state_.id().get());
+ key.append_key_part(&client_key, sizeof(client_key));
+ key.append_key_part(&randkey, sizeof(randkey));
+ err = client_state_.append_key(key);
+ size_t bytes_to_append(data_.size());
+ if (params_.random_data_size)
+ {
+ bytes_to_append = std::uniform_int_distribution<size_t>(
+ 1, data_.size())(random_engine_);
+ }
+ err = err || client_state_.append_data(
+ wsrep::const_buffer(data_.data(), bytes_to_append));
+ return err;
+ });
+
+ err = err || client_command(
+ [&]()
+ {
+ // wsrep::log_debug() << "Commit";
+ assert(err == 0);
+ if (do_2pc())
+ {
+ err = err || client_state_.before_prepare();
+ err = err || client_state_.after_prepare();
+ }
+ err = err || client_state_.before_commit();
+ if (err == 0) se_trx_.commit(transaction.ws_meta().gtid());
+ err = err || client_state_.ordered_commit();
+ err = err || client_state_.after_commit();
+ if (err)
+ {
+ client_state_.before_rollback();
+ se_trx_.rollback();
+ client_state_.after_rollback();
+ }
+ return err;
+ });
+
+ assert(err ||
+ transaction.state() == wsrep::transaction::s_aborted ||
+ transaction.state() == wsrep::transaction::s_committed);
+ assert(se_trx_.active() == false);
+ assert(transaction.active() == false);
+
+ switch (transaction.state())
+ {
+ case wsrep::transaction::s_committed:
+ ++stats_.commits;
+ break;
+ case wsrep::transaction::s_aborted:
+ ++stats_.rollbacks;
+ break;
+ default:
+ assert(0);
+ }
+}
+
+void db::client::report_progress(size_t i) const
+{
+ if ((i % 1000) == 0)
+ {
+ wsrep::log_info() << "client: " << client_state_.id().get()
+ << " transactions: " << i
+ << " " << 100*double(i)/double(params_.n_transactions) << "%";
+ }
+}
diff --git a/wsrep-lib/dbsim/db_client.hpp b/wsrep-lib/dbsim/db_client.hpp
new file mode 100644
index 00000000..5536a449
--- /dev/null
+++ b/wsrep-lib/dbsim/db_client.hpp
@@ -0,0 +1,89 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+/** @file db_client.hpp */
+
+#ifndef WSREP_DB_CLIENT_HPP
+#define WSREP_DB_CLIENT_HPP
+
+#include "db_server_state.hpp"
+#include "db_storage_engine.hpp"
+#include "db_client_state.hpp"
+#include "db_client_service.hpp"
+#include "db_high_priority_service.hpp"
+
+#include <random>
+
+namespace db
+{
+ class server;
+ class client
+ {
+ public:
+ struct stats
+ {
+ long long commits;
+ long long rollbacks;
+ long long replays;
+ stats()
+ : commits(0)
+ , rollbacks(0)
+ , replays(0)
+ { }
+ };
+ client(db::server&,
+ wsrep::client_id,
+ enum wsrep::client_state::mode,
+ const db::params&);
+ bool bf_abort(wsrep::seqno);
+ const struct stats stats() const { return stats_; }
+ void store_globals()
+ {
+ client_state_.store_globals();
+ }
+ void reset_globals()
+ { }
+ void start();
+ wsrep::client_state& client_state() { return client_state_; }
+ wsrep::client_service& client_service() { return client_service_; }
+ bool do_2pc() const { return false; }
+ private:
+ friend class db::server_state;
+ friend class db::client_service;
+ friend class db::high_priority_service;
+ template <class F> int client_command(F f);
+ void run_one_transaction();
+ void reset_error();
+ void report_progress(size_t) const;
+ wsrep::default_mutex mutex_;
+ wsrep::default_condition_variable cond_;
+ const db::params& params_;
+ db::server& server_;
+ db::server_state& server_state_;
+ db::client_state client_state_;
+ db::client_service client_service_;
+ db::storage_engine::transaction se_trx_;
+ wsrep::mutable_buffer data_;
+ std::random_device random_device_;
+ std::default_random_engine random_engine_;
+ struct stats stats_;
+ };
+}
+
+#endif // WSREP_DB_CLIENT_HPP
diff --git a/wsrep-lib/dbsim/db_client_service.cpp b/wsrep-lib/dbsim/db_client_service.cpp
new file mode 100644
index 00000000..edf7df8f
--- /dev/null
+++ b/wsrep-lib/dbsim/db_client_service.cpp
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "db_client_service.hpp"
+#include "db_high_priority_service.hpp"
+#include "db_client.hpp"
+
+db::client_service::client_service(db::client& client)
+ : wsrep::client_service()
+ , client_(client)
+ , client_state_(client_.client_state())
+{ }
+
+int db::client_service::bf_rollback()
+{
+ int ret(client_state_.before_rollback());
+ assert(ret == 0);
+ client_.se_trx_.rollback();
+ ret = client_state_.after_rollback();
+ assert(ret == 0);
+ return ret;
+}
+
+enum wsrep::provider::status
+db::client_service::replay()
+{
+ wsrep::high_priority_context high_priority_context(client_state_);
+ db::replayer_service replayer_service(
+ client_.server_, client_);
+ auto ret(client_state_.provider().replay(
+ client_state_.transaction().ws_handle(),
+ &replayer_service));
+ if (ret == wsrep::provider::success)
+ {
+ ++client_.stats_.replays;
+ }
+ return ret;
+}
diff --git a/wsrep-lib/dbsim/db_client_service.hpp b/wsrep-lib/dbsim/db_client_service.hpp
new file mode 100644
index 00000000..be6f9ad8
--- /dev/null
+++ b/wsrep-lib/dbsim/db_client_service.hpp
@@ -0,0 +1,98 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_DB_CLIENT_SERVICE_HPP
+#define WSREP_DB_CLIENT_SERVICE_HPP
+
+#include "wsrep/client_service.hpp"
+#include "wsrep/transaction.hpp"
+
+namespace db
+{
+ class client;
+ class client_state;
+
+ class client_service : public wsrep::client_service
+ {
+ public:
+ client_service(db::client& client);
+
+ bool interrupted(wsrep::unique_lock<wsrep::mutex>&)
+ const override
+ { return false; }
+ void reset_globals() override { }
+ void store_globals() override { }
+ int prepare_data_for_replication() override
+ {
+ return 0;
+ }
+ void cleanup_transaction() override { }
+ size_t bytes_generated() const override
+ {
+ return 0;
+ }
+ bool statement_allowed_for_streaming() const override
+ {
+ return true;
+ }
+ int prepare_fragment_for_replication(wsrep::mutable_buffer&,
+ size_t& position) override
+ {
+ position = 0;
+ return 0;
+ }
+ int remove_fragments() override { return 0; }
+ int bf_rollback() override;
+ void will_replay() override { }
+ void signal_replayed() override { }
+ void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) override { }
+ enum wsrep::provider::status replay()
+ override;
+
+ enum wsrep::provider::status replay_unordered() override
+ {
+ return wsrep::provider::success;
+ }
+
+ void emergency_shutdown() override { ::abort(); }
+
+ enum wsrep::provider::status commit_by_xid() override
+ {
+ return wsrep::provider::success;
+ }
+
+ bool is_explicit_xa() override
+ {
+ return false;
+ }
+
+ bool is_xa_rollback() override
+ {
+ return false;
+ }
+
+ void debug_sync(const char*) override { }
+ void debug_crash(const char*) override { }
+ private:
+ db::client& client_;
+ wsrep::client_state& client_state_;
+ };
+}
+
+#endif // WSREP_DB_CLIENT_SERVICE_HPP
diff --git a/wsrep-lib/dbsim/db_client_state.hpp b/wsrep-lib/dbsim/db_client_state.hpp
new file mode 100644
index 00000000..a5cf71b2
--- /dev/null
+++ b/wsrep-lib/dbsim/db_client_state.hpp
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_DB_CLIENT_CONTEXT_HPP
+#define WSREP_DB_CLIENT_CONTEXT_HPP
+
+#include "wsrep/client_state.hpp"
+#include "db_server_state.hpp"
+
+namespace db
+{
+ class client;
+ class client_state : public wsrep::client_state
+ {
+ public:
+ client_state(wsrep::mutex& mutex,
+ wsrep::condition_variable& cond,
+ db::server_state& server_state,
+ wsrep::client_service& client_service,
+ const wsrep::client_id& client_id,
+ enum wsrep::client_state::mode mode)
+ : wsrep::client_state(mutex,
+ cond,
+ server_state,
+ client_service,
+ client_id,
+ mode)
+ { }
+
+ private:
+ client_state(const client_state&);
+ client_state& operator=(const client_state&);
+ };
+}
+
+#endif // WSREP_DB_CLIENT_CONTEXT
diff --git a/wsrep-lib/dbsim/db_high_priority_service.cpp b/wsrep-lib/dbsim/db_high_priority_service.cpp
new file mode 100644
index 00000000..669fe502
--- /dev/null
+++ b/wsrep-lib/dbsim/db_high_priority_service.cpp
@@ -0,0 +1,137 @@
+/*
+ * Copyright (C) 2018-2019 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "db_high_priority_service.hpp"
+#include "db_server.hpp"
+#include "db_client.hpp"
+
+db::high_priority_service::high_priority_service(
+ db::server& server, db::client& client)
+ : wsrep::high_priority_service(server.server_state())
+ , server_(server)
+ , client_(client)
+{ }
+
+int db::high_priority_service::start_transaction(
+ const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta)
+{
+ return client_.client_state().start_transaction(ws_handle, ws_meta);
+}
+
+int db::high_priority_service::next_fragment(const wsrep::ws_meta& ws_meta)
+{
+ return client_.client_state().next_fragment(ws_meta);
+}
+
+const wsrep::transaction& db::high_priority_service::transaction() const
+{
+ return client_.client_state().transaction();
+}
+
+int db::high_priority_service::adopt_transaction(const wsrep::transaction&)
+{
+ throw wsrep::not_implemented_error();
+}
+
+int db::high_priority_service::apply_write_set(
+ const wsrep::ws_meta&,
+ const wsrep::const_buffer&,
+ wsrep::mutable_buffer&)
+{
+ client_.se_trx_.start(&client_);
+ client_.se_trx_.apply(client_.client_state().transaction());
+ return 0;
+}
+
+int db::high_priority_service::apply_toi(
+ const wsrep::ws_meta&,
+ const wsrep::const_buffer&,
+ wsrep::mutable_buffer&)
+{
+ throw wsrep::not_implemented_error();
+}
+
+int db::high_priority_service::apply_nbo_begin(
+ const wsrep::ws_meta&,
+ const wsrep::const_buffer&,
+ wsrep::mutable_buffer&)
+{
+ throw wsrep::not_implemented_error();
+}
+
+int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta)
+{
+ client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true);
+ int ret(client_.client_state_.before_commit());
+ if (ret == 0) client_.se_trx_.commit(ws_meta.gtid());
+ ret = ret || client_.client_state_.ordered_commit();
+ ret = ret || client_.client_state_.after_commit();
+ return ret;
+}
+
+int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta)
+{
+ client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, false);
+ int ret(client_.client_state_.before_rollback());
+ assert(ret == 0);
+ client_.se_trx_.rollback();
+ ret = client_.client_state_.after_rollback();
+ assert(ret == 0);
+ return ret;
+}
+
+void db::high_priority_service::adopt_apply_error(wsrep::mutable_buffer& err)
+{
+ client_.client_state_.adopt_apply_error(err);
+}
+
+void db::high_priority_service::after_apply()
+{
+ client_.client_state_.after_applying();
+}
+
+int db::high_priority_service::log_dummy_write_set(
+ const wsrep::ws_handle& ws_handle,
+ const wsrep::ws_meta& ws_meta,
+ wsrep::mutable_buffer& err)
+{
+ int ret(client_.client_state_.start_transaction(ws_handle, ws_meta));
+ assert(ret == 0);
+ if (ws_meta.ordered())
+ {
+ client_.client_state_.adopt_apply_error(err);
+ client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true);
+ ret = client_.client_state_.before_commit();
+ assert(ret == 0);
+ ret = client_.client_state_.ordered_commit();
+ assert(ret == 0);
+ ret = client_.client_state_.after_commit();
+ assert(ret == 0);
+ }
+ client_.client_state_.after_applying();
+ return ret;
+}
+
+bool db::high_priority_service::is_replaying() const
+{
+ return false;
+}
diff --git a/wsrep-lib/dbsim/db_high_priority_service.hpp b/wsrep-lib/dbsim/db_high_priority_service.hpp
new file mode 100644
index 00000000..d4a80f1b
--- /dev/null
+++ b/wsrep-lib/dbsim/db_high_priority_service.hpp
@@ -0,0 +1,89 @@
+/*
+ * Copyright (C) 2018-2019 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_DB_HIGH_PRIORITY_SERVICE_HPP
+#define WSREP_DB_HIGH_PRIORITY_SERVICE_HPP
+
+#include "wsrep/high_priority_service.hpp"
+
+namespace db
+{
+ class server;
+ class client;
+ class high_priority_service : public wsrep::high_priority_service
+ {
+ public:
+ high_priority_service(db::server& server, db::client& client);
+ int start_transaction(const wsrep::ws_handle&,
+ const wsrep::ws_meta&) override;
+ int next_fragment(const wsrep::ws_meta&) override;
+ const wsrep::transaction& transaction() const override;
+ int adopt_transaction(const wsrep::transaction&) override;
+ int apply_write_set(const wsrep::ws_meta&,
+ const wsrep::const_buffer&,
+ wsrep::mutable_buffer&) override;
+ int append_fragment_and_commit(
+ const wsrep::ws_handle&,
+ const wsrep::ws_meta&,
+ const wsrep::const_buffer&,
+ const wsrep::xid&) override
+ { return 0; }
+ int remove_fragments(const wsrep::ws_meta&) override
+ { return 0; }
+ int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
+ int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
+ int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&,
+ wsrep::mutable_buffer&) override;
+ int apply_nbo_begin(const wsrep::ws_meta&, const wsrep::const_buffer&,
+ wsrep::mutable_buffer&)
+ override;
+ void adopt_apply_error(wsrep::mutable_buffer&) override;
+ virtual void after_apply() override;
+ void store_globals() override { }
+ void reset_globals() override { }
+ void switch_execution_context(wsrep::high_priority_service&) override
+ { }
+ int log_dummy_write_set(const wsrep::ws_handle&,
+ const wsrep::ws_meta&,
+ wsrep::mutable_buffer&) override;
+ virtual bool is_replaying() const override;
+ void debug_crash(const char*) override { }
+ private:
+ high_priority_service(const high_priority_service&);
+ high_priority_service& operator=(const high_priority_service&);
+ db::server& server_;
+ db::client& client_;
+ };
+
+ class replayer_service : public db::high_priority_service
+ {
+ public:
+ replayer_service(db::server& server, db::client& client)
+ : db::high_priority_service(server, client)
+ { }
+ // After apply is empty for replayer to keep the transaction
+ // context available for the client session after replaying
+ // is over.
+ void after_apply() override {}
+ bool is_replaying() const override { return true; }
+ };
+
+}
+
+#endif // WSREP_DB_HIGH_PRIORITY_SERVICE_HPP
diff --git a/wsrep-lib/dbsim/db_params.cpp b/wsrep-lib/dbsim/db_params.cpp
new file mode 100644
index 00000000..4fbefd01
--- /dev/null
+++ b/wsrep-lib/dbsim/db_params.cpp
@@ -0,0 +1,121 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "db_params.hpp"
+
+#include <boost/program_options.hpp>
+#include <iostream>
+#include <stdexcept>
+
+namespace
+{
+ void validate_params(const db::params& params)
+ {
+ std::ostringstream os;
+ if (params.n_servers != params.topology.size())
+ {
+ if (params.topology.size() > 0)
+ {
+ os << "Error: --topology=" << params.topology << " does not "
+ << "match the number of server --servers="
+ << params.n_servers << "\n";
+ }
+ }
+ if (os.str().size())
+ {
+ throw std::invalid_argument(os.str());
+ }
+ }
+}
+
+db::params db::parse_args(int argc, char** argv)
+{
+ namespace po = boost::program_options;
+ db::params params;
+ po::options_description desc("Allowed options");
+ desc.add_options()
+ ("help", "produce help message")
+ ("wsrep-provider",
+ po::value<std::string>(&params.wsrep_provider)->required(),
+ "wsrep provider to load")
+ ("wsrep-provider-options",
+ po::value<std::string>(&params.wsrep_provider_options),
+ "wsrep provider options")
+ ("servers", po::value<size_t>(&params.n_servers)->required(),
+ "number of servers to start")
+ ("topology", po::value<std::string>(&params.topology),
+ "replication topology (e.g. mm for multi master, ms for master/slave")
+ ("clients", po::value<size_t>(&params.n_clients)->required(),
+ "number of clients to start per master")
+ ("transactions", po::value<size_t>(&params.n_transactions),
+ "number of transactions run by a client")
+ ("rows", po::value<size_t>(&params.n_rows),
+ "number of rows per table")
+ ("max-data-size", po::value<size_t>(&params.max_data_size),
+ "maximum size of data payload (default 8)")
+ ("random-data-size", po::value<bool>(&params.random_data_size),
+ "randomized payload data size (default 0)")
+ ("alg-freq", po::value<size_t>(&params.alg_freq),
+ "ALG frequency")
+ ("sync-wait", po::value<bool>(&params.sync_wait),
+ "Turn on sync wait for each transaction")
+ ("debug-log-level", po::value<int>(&params.debug_log_level),
+ "debug logging level: 0 - none, 1 - verbose")
+ ("fast-exit", po::value<int>(&params.fast_exit),
+ "exit from simulation without graceful shutdown")
+ ("ti",
+ po::value<int>(&params.thread_instrumentation),
+ "use instrumentation for threads/mutexes/condition variables"
+ "(0 default disabled, 1 total counts, 2 per object)")
+ ("ti-cond-checks",
+ po::value<bool>(&params.cond_checks),
+ "Enable checks for correct condition variable use. "
+ " Effective only if thread-instrumentation is enabled")
+ ("tls-service",
+ po::value<int>(&params.tls_service),
+ "Configure TLS service stubs.\n0 default disabled\n1 enabled\n"
+ "2 enabled with short read/write and renegotiation simulation\n"
+ "3 enabled with error simulation.")
+ ;
+ try
+ {
+ po::variables_map vm;
+ po::store(po::parse_command_line(argc, argv, desc), vm);
+ if (vm.count("help"))
+ {
+ std::cerr << desc << "\n";
+ exit(0);
+ }
+ po::notify(vm);
+ validate_params(params);
+ }
+ catch (const po::error& e)
+ {
+ std::cerr << "Error parsing arguments: " << e.what() << "\n";
+ std::cerr << desc << "\n";
+ exit(1);
+ }
+ catch (...)
+ {
+ std::cerr << "Error parsing arguments\n";
+ std::cerr << desc << "\n";
+ exit(1);
+ }
+ return params;
+}
diff --git a/wsrep-lib/dbsim/db_params.hpp b/wsrep-lib/dbsim/db_params.hpp
new file mode 100644
index 00000000..6443e130
--- /dev/null
+++ b/wsrep-lib/dbsim/db_params.hpp
@@ -0,0 +1,69 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_DB_PARAMS_HPP
+#define WSREP_DB_PARAMS_HPP
+
+#include <cstddef>
+#include <string>
+
+namespace db
+{
+ struct params
+ {
+ size_t n_servers;
+ size_t n_clients;
+ size_t n_transactions;
+ size_t n_rows;
+ size_t max_data_size; // Maximum size of write set data payload.
+ bool random_data_size; // If true, randomize data payload size.
+ size_t alg_freq;
+ bool sync_wait;
+ std::string topology;
+ std::string wsrep_provider;
+ std::string wsrep_provider_options;
+ int debug_log_level;
+ int fast_exit;
+ int thread_instrumentation;
+ bool cond_checks;
+ int tls_service;
+ params()
+ : n_servers(0)
+ , n_clients(0)
+ , n_transactions(0)
+ , n_rows(1000)
+ , max_data_size(8)
+ , random_data_size(false)
+ , alg_freq(0)
+ , sync_wait(false)
+ , topology()
+ , wsrep_provider()
+ , wsrep_provider_options()
+ , debug_log_level(0)
+ , fast_exit(0)
+ , thread_instrumentation()
+ , cond_checks()
+ , tls_service()
+ { }
+ };
+
+ params parse_args(int argc, char** argv);
+}
+
+#endif // WSREP_DB_PARAMS_HPP
diff --git a/wsrep-lib/dbsim/db_server.cpp b/wsrep-lib/dbsim/db_server.cpp
new file mode 100644
index 00000000..51ed2b4c
--- /dev/null
+++ b/wsrep-lib/dbsim/db_server.cpp
@@ -0,0 +1,131 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "db_server.hpp"
+#include "db_server_service.hpp"
+#include "db_high_priority_service.hpp"
+#include "db_client.hpp"
+#include "db_simulator.hpp"
+
+#include "wsrep/logger.hpp"
+
+db::server::server(simulator& simulator,
+ const std::string& name,
+ const std::string& address)
+ : simulator_(simulator)
+ , storage_engine_(simulator_.params())
+ , mutex_()
+ , cond_()
+ , server_service_(*this)
+ , server_state_(server_service_,
+ name, address, "dbsim_" + name + "_data")
+ , last_client_id_(0)
+ , last_transaction_id_(0)
+ , appliers_()
+ , clients_()
+ , client_threads_()
+{ }
+
+void db::server::applier_thread()
+{
+ wsrep::client_id client_id(last_client_id_.fetch_add(1) + 1);
+ db::client applier(*this, client_id,
+ wsrep::client_state::m_high_priority,
+ simulator_.params());
+ wsrep::client_state* cc(static_cast<wsrep::client_state*>(
+ &applier.client_state()));
+ db::high_priority_service hps(*this, applier);
+ cc->open(cc->id());
+ cc->before_command();
+ enum wsrep::provider::status ret(
+ server_state_.provider().run_applier(&hps));
+ wsrep::log_info() << "Applier thread exited with error code " << ret;
+ cc->after_command_before_result();
+ cc->after_command_after_result();
+ cc->close();
+ cc->cleanup();
+}
+
+void db::server::start_applier()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ appliers_.push_back(boost::thread(&server::applier_thread, this));
+}
+
+void db::server::stop_applier()
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ appliers_.front().join();
+ appliers_.erase(appliers_.begin());
+}
+
+
+void db::server::start_clients()
+{
+ size_t n_clients(simulator_.params().n_clients);
+ for (size_t i(0); i < n_clients; ++i)
+ {
+ start_client(i + 1);
+ }
+}
+
+void db::server::stop_clients()
+{
+ for (auto& i : client_threads_)
+ {
+ i.join();
+ }
+ for (const auto& i : clients_)
+ {
+ const struct db::client::stats& stats(i->stats());
+ simulator_.stats_.commits += stats.commits;
+ simulator_.stats_.rollbacks += stats.rollbacks;
+ simulator_.stats_.replays += stats.replays;
+ }
+}
+
+void db::server::client_thread(const std::shared_ptr<db::client>& client)
+{
+ client->start();
+}
+
+void db::server::start_client(size_t id)
+{
+ auto client(std::make_shared<db::client>(
+ *this, wsrep::client_id(id),
+ wsrep::client_state::m_local,
+ simulator_.params()));
+ clients_.push_back(client);
+ client_threads_.push_back(
+ boost::thread(&db::server::client_thread, this, client));
+}
+
+void db::server::donate_sst(const std::string& req,
+ const wsrep::gtid& gtid,
+ bool bypass)
+{
+ simulator_.sst(*this, req, gtid, bypass);
+}
+
+
+wsrep::high_priority_service* db::server::streaming_applier_service()
+{
+ throw wsrep::not_implemented_error();
+}
+
diff --git a/wsrep-lib/dbsim/db_server.hpp b/wsrep-lib/dbsim/db_server.hpp
new file mode 100644
index 00000000..b64947cd
--- /dev/null
+++ b/wsrep-lib/dbsim/db_server.hpp
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_DB_SERVER_HPP
+#define WSREP_DB_SERVER_HPP
+
+#include "wsrep/gtid.hpp"
+#include "wsrep/client_state.hpp"
+
+#include "db_storage_engine.hpp"
+#include "db_server_state.hpp"
+#include "db_server_service.hpp"
+
+#include <boost/thread.hpp>
+
+#include <string>
+#include <memory>
+
+namespace db
+{
+ class simulator;
+ class client;
+ class server
+ {
+ public:
+ server(simulator& simulator,
+ const std::string& name,
+ const std::string& address);
+ void applier_thread();
+ void start_applier();
+ void stop_applier();
+ void start_clients();
+ void stop_clients();
+ void client_thread(const std::shared_ptr<db::client>& client);
+ db::storage_engine& storage_engine() { return storage_engine_; }
+ db::server_state& server_state() { return server_state_; }
+ wsrep::transaction_id next_transaction_id()
+ {
+ return wsrep::transaction_id(last_transaction_id_.fetch_add(1) + 1);
+ }
+ void donate_sst(const std::string&, const wsrep::gtid&, bool);
+ wsrep::client_state* local_client_state();
+ void release_client_state(wsrep::client_state*);
+ wsrep::high_priority_service* streaming_applier_service();
+ private:
+ void start_client(size_t id);
+
+ db::simulator& simulator_;
+ db::storage_engine storage_engine_;
+ wsrep::default_mutex mutex_;
+ wsrep::default_condition_variable cond_;
+ db::server_service server_service_;
+ db::server_state server_state_;
+ std::atomic<size_t> last_client_id_;
+ std::atomic<size_t> last_transaction_id_;
+ std::vector<boost::thread> appliers_;
+ std::vector<std::shared_ptr<db::client>> clients_;
+ std::vector<boost::thread> client_threads_;
+ };
+};
+
+#endif // WSREP_DB_SERVER_HPP
diff --git a/wsrep-lib/dbsim/db_server_service.cpp b/wsrep-lib/dbsim/db_server_service.cpp
new file mode 100644
index 00000000..4fbc6667
--- /dev/null
+++ b/wsrep-lib/dbsim/db_server_service.cpp
@@ -0,0 +1,169 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "db_server_service.hpp"
+#include "db_server.hpp"
+#include "db_storage_service.hpp"
+
+#include "wsrep/logger.hpp"
+#include "wsrep/high_priority_service.hpp"
+
+db::server_service::server_service(db::server& server)
+ : server_(server)
+{ }
+
+wsrep::storage_service* db::server_service::storage_service(
+ wsrep::client_service&)
+{
+ return new db::storage_service();
+}
+
+wsrep::storage_service* db::server_service::storage_service(
+ wsrep::high_priority_service&)
+{
+ return new db::storage_service();
+}
+
+void db::server_service::release_storage_service(
+ wsrep::storage_service* storage_service)
+{
+ delete storage_service;
+}
+
+wsrep::high_priority_service* db::server_service::streaming_applier_service(
+ wsrep::client_service&)
+{
+ return server_.streaming_applier_service();
+}
+
+wsrep::high_priority_service* db::server_service::streaming_applier_service(
+ wsrep::high_priority_service&)
+{
+ return server_.streaming_applier_service();
+}
+
+void db::server_service::release_high_priority_service(
+ wsrep::high_priority_service *high_priority_service)
+{
+ delete high_priority_service;
+}
+
+bool db::server_service::sst_before_init() const
+{
+ return true;
+}
+
+std::string db::server_service::sst_request()
+{
+ std::ostringstream os;
+ os << server_.server_state().name();
+ wsrep::log_info() << "SST request: "
+ << server_.server_state().name();
+
+ return os.str();
+}
+
+int db::server_service::start_sst(
+ const std::string& request, const wsrep::gtid& gtid, bool bypass)
+{
+ server_.donate_sst(request, gtid, bypass);
+ return 0;
+}
+
+void db::server_service::background_rollback(wsrep::client_state&)
+{
+}
+
+void db::server_service::bootstrap()
+{
+}
+
+void db::server_service::log_message(enum wsrep::log::level level,
+ const char* message)
+{
+ wsrep::log(level, server_.server_state().name().c_str()) << message;
+}
+void db::server_service::log_dummy_write_set(
+ wsrep::client_state&, const wsrep::ws_meta& meta)
+{
+ wsrep::log_info() << "Dummy write set: " << meta.seqno();
+}
+
+void db::server_service::log_view(wsrep::high_priority_service*,
+ const wsrep::view& v)
+{
+ wsrep::log_info() << "View:\n" << v;
+ server_.storage_engine().store_view(v);
+}
+
+void db::server_service::recover_streaming_appliers(
+ wsrep::client_service&)
+{
+}
+
+void db::server_service::recover_streaming_appliers(
+ wsrep::high_priority_service&)
+{
+}
+
+wsrep::view db::server_service::get_view(wsrep::client_service&,
+ const wsrep::id& own_id)
+{
+ wsrep::view stored_view(server_.storage_engine().get_view());
+ int const my_idx(stored_view.member_index(own_id));
+ wsrep::view my_view(
+ stored_view.state_id(),
+ stored_view.view_seqno(),
+ stored_view.status(),
+ stored_view.capabilities(),
+ my_idx,
+ stored_view.protocol_version(),
+ stored_view.members()
+ );
+ return my_view;
+}
+
+wsrep::gtid db::server_service::get_position(wsrep::client_service&)
+{
+ return server_.storage_engine().get_position();
+}
+
+void db::server_service::set_position(wsrep::client_service&,
+ const wsrep::gtid& gtid)
+{
+ return server_.storage_engine().store_position(gtid);
+}
+
+void db::server_service::log_state_change(
+ enum wsrep::server_state::state prev_state,
+ enum wsrep::server_state::state current_state)
+{
+
+ wsrep::log_info() << "State changed "
+ << prev_state << " -> " << current_state;
+}
+int db::server_service::wait_committing_transactions(int)
+{
+ throw wsrep::not_implemented_error();
+}
+
+void db::server_service::debug_sync(const char*)
+{
+
+}
diff --git a/wsrep-lib/dbsim/db_server_service.hpp b/wsrep-lib/dbsim/db_server_service.hpp
new file mode 100644
index 00000000..e5cd70d2
--- /dev/null
+++ b/wsrep-lib/dbsim/db_server_service.hpp
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_DB_SERVER_SERVICE_HPP
+#define WSREP_DB_SERVER_SERVICE_HPP
+
+#include "wsrep/server_service.hpp"
+#include <string>
+
+namespace db
+{
+ class server;
+ class server_service : public wsrep::server_service
+ {
+ public:
+ server_service(db::server& server);
+ wsrep::storage_service* storage_service(wsrep::client_service&) override;
+ wsrep::storage_service* storage_service(wsrep::high_priority_service&) override;
+
+ void release_storage_service(wsrep::storage_service*) override;
+ wsrep::high_priority_service* streaming_applier_service(wsrep::client_service&) override;
+ wsrep::high_priority_service* streaming_applier_service(wsrep::high_priority_service&) override;
+ void release_high_priority_service(wsrep::high_priority_service*) override;
+
+ bool sst_before_init() const override;
+ int start_sst(const std::string&, const wsrep::gtid&, bool) override;
+ std::string sst_request() override;
+ void background_rollback(wsrep::client_state&) override;
+ void bootstrap() override;
+ void log_message(enum wsrep::log::level, const char* message) override;
+ void log_dummy_write_set(wsrep::client_state&, const wsrep::ws_meta&)
+ override;
+ void log_view(wsrep::high_priority_service*,
+ const wsrep::view&) override;
+ void recover_streaming_appliers(wsrep::client_service&) override;
+ void recover_streaming_appliers(wsrep::high_priority_service&) override;
+ wsrep::view get_view(wsrep::client_service&, const wsrep::id&)
+ override;
+ wsrep::gtid get_position(wsrep::client_service&) override;
+ void set_position(wsrep::client_service&, const wsrep::gtid&) override;
+ void log_state_change(enum wsrep::server_state::state,
+ enum wsrep::server_state::state) override;
+ int wait_committing_transactions(int) override;
+ void debug_sync(const char*) override;
+ private:
+ db::server& server_;
+ };
+}
+
+#endif // WSREP_DB_SERVER_SERVICE_HPP
diff --git a/wsrep-lib/dbsim/db_server_state.cpp b/wsrep-lib/dbsim/db_server_state.cpp
new file mode 100644
index 00000000..2bc17d29
--- /dev/null
+++ b/wsrep-lib/dbsim/db_server_state.cpp
@@ -0,0 +1,24 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "db_server_state.hpp"
+#include "db_server.hpp"
+
+#include "wsrep/logger.hpp"
+
diff --git a/wsrep-lib/dbsim/db_server_state.hpp b/wsrep-lib/dbsim/db_server_state.hpp
new file mode 100644
index 00000000..49d4499e
--- /dev/null
+++ b/wsrep-lib/dbsim/db_server_state.hpp
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_DB_SERVER_CONTEXT_HPP
+#define WSREP_DB_SERVER_CONTEXT_HPP
+
+#include "wsrep/server_state.hpp"
+#include "wsrep/server_service.hpp"
+#include "wsrep/client_state.hpp"
+
+#include <atomic>
+
+namespace db
+{
+ class server;
+ class server_state : public wsrep::server_state
+ {
+ public:
+ server_state(wsrep::server_service& server_service,
+ const std::string& name,
+ const std::string& address,
+ const std::string& working_dir)
+ : wsrep::server_state(
+ mutex_,
+ cond_,
+ server_service,
+ nullptr,
+ name,
+ "",
+ address,
+ working_dir,
+ wsrep::gtid::undefined(),
+ 1,
+ wsrep::server_state::rm_async)
+ , mutex_()
+ , cond_()
+ { }
+ private:
+ wsrep::default_mutex mutex_;
+ wsrep::default_condition_variable cond_;
+ };
+}
+
+#endif // WSREP_DB_SERVER_CONTEXT_HPP
diff --git a/wsrep-lib/dbsim/db_simulator.cpp b/wsrep-lib/dbsim/db_simulator.cpp
new file mode 100644
index 00000000..0971b110
--- /dev/null
+++ b/wsrep-lib/dbsim/db_simulator.cpp
@@ -0,0 +1,254 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "db_simulator.hpp"
+#include "db_client.hpp"
+#include "db_threads.hpp"
+#include "db_tls.hpp"
+
+#include "wsrep/logger.hpp"
+
+#include <boost/filesystem.hpp>
+#include <sstream>
+
+static db::ti thread_instrumentation;
+static db::tls tls_service;
+
+void db::simulator::run()
+{
+ start();
+ stop();
+ std::flush(std::cerr);
+ std::cout << "Results:\n";
+ std::cout << stats() << std::endl;
+ std::cout << db::ti::stats() << std::endl;
+ std::cout << db::tls::stats() << std::endl;
+}
+
+void db::simulator::sst(db::server& server,
+ const std::string& request,
+ const wsrep::gtid& gtid,
+ bool bypass)
+{
+ // The request may contain extra trailing '\0' after it goes
+ // through the provider, strip it first.
+ std::string name(request);
+ name.erase(std::find(name.begin(), name.end(), '\0'), name.end());
+
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ auto i(servers_.find(name));
+ wsrep::log_info() << "SST request '" << name << "'";
+ if (i == servers_.end())
+ {
+ wsrep::log_error() << "Server " << request << " not found";
+ wsrep::log_info() << "servers:";
+ for (const auto& s : servers_)
+ {
+ wsrep::log_info() << "server: " << s.first;
+ }
+ throw wsrep::runtime_error("Server " + request + " not found");
+ }
+ if (bypass == false)
+ {
+ wsrep::log_info() << "SST "
+ << server.server_state().name()
+ << " -> " << request;
+ i->second->storage_engine().store_position(gtid);
+ i->second->storage_engine().store_view(
+ server.storage_engine().get_view());
+ }
+
+ db::client dummy(*(i->second), wsrep::client_id(-1),
+ wsrep::client_state::m_local, params());
+
+ i->second->server_state().sst_received(dummy.client_service(), 0);
+ server.server_state().sst_sent(gtid, 0);
+}
+
+std::string db::simulator::stats() const
+{
+ auto duration(std::chrono::duration<double>(
+ clients_stop_ - clients_start_).count());
+ long long transactions(stats_.commits + stats_.rollbacks);
+ long long bf_aborts(0);
+ for (const auto& s : servers_)
+ {
+ bf_aborts += s.second->storage_engine().bf_aborts();
+ }
+ std::ostringstream os;
+ os << "Number of transactions: " << transactions
+ << "\n"
+ << "Seconds: " << duration
+ << " \n"
+ << "Transactions per second: " << double(transactions)/double(duration)
+ << "\n"
+ << "BF aborts: "
+ << bf_aborts
+ << "\n"
+ << "Client commits: " << stats_.commits
+ << "\n"
+ << "Client rollbacks: " << stats_.rollbacks
+ << "\n"
+ << "Client replays: " << stats_.replays;
+ return os.str();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Private //
+////////////////////////////////////////////////////////////////////////////////
+
+void db::simulator::start()
+{
+ thread_instrumentation.level(params_.thread_instrumentation);
+ thread_instrumentation.cond_checks(params_.cond_checks);
+ tls_service.init(params_.tls_service);
+ wsrep::log_info() << "Provider: " << params_.wsrep_provider;
+
+ std::string cluster_address(build_cluster_address());
+ wsrep::log_info() << "Cluster address: " << cluster_address;
+ for (size_t i(0); i < params_.n_servers; ++i)
+ {
+ std::ostringstream name_os;
+ name_os << (i + 1);
+ std::ostringstream id_os;
+ id_os << (i + 1);
+ std::ostringstream address_os;
+ address_os << "127.0.0.1:" << server_port(i);
+ wsrep::id server_id(id_os.str());
+ auto it(servers_.insert(
+ std::make_pair(
+ name_os.str(),
+ std::make_unique<db::server>(
+ *this,
+ name_os.str(),
+ address_os.str()))));
+ if (it.second == false)
+ {
+ throw wsrep::runtime_error("Failed to add server");
+ }
+ boost::filesystem::path dir("dbsim_" + id_os.str() + "_data");
+ boost::filesystem::create_directory(dir);
+
+ db::server& server(*it.first->second);
+ server.server_state().debug_log_level(params_.debug_log_level);
+ std::string server_options(params_.wsrep_provider_options);
+
+ wsrep::provider::services services;
+ services.thread_service = params_.thread_instrumentation
+ ? &thread_instrumentation
+ : nullptr;
+ services.tls_service = params_.tls_service
+ ? &tls_service
+ : nullptr;
+ if (server.server_state().load_provider(params_.wsrep_provider,
+ server_options, services))
+ {
+ throw wsrep::runtime_error("Failed to load provider");
+ }
+ if (server.server_state().connect("sim_cluster", cluster_address, "",
+ i == 0))
+ {
+ throw wsrep::runtime_error("Failed to connect");
+ }
+ wsrep::log_debug() << "main: Starting applier";
+ server.start_applier();
+ wsrep::log_debug() << "main: Waiting initializing state";
+ server.server_state().wait_until_state(wsrep::server_state::s_initializing);
+ wsrep::log_debug() << "main: Calling initialized";
+ server.server_state().initialized();
+ wsrep::log_debug() << "main: Waiting for synced state";
+ server.server_state().wait_until_state(
+ wsrep::server_state::s_synced);
+ wsrep::log_debug() << "main: Server synced";
+ }
+
+ // Start client threads
+ wsrep::log_info() << "####################### Starting client load";
+ clients_start_ = std::chrono::steady_clock::now();
+ size_t index(0);
+ for (auto& i : servers_)
+ {
+ if (params_.topology.size() == 0 || params_.topology[index] == 'm')
+ {
+ i.second->start_clients();
+ }
+ ++index;
+ }
+}
+
+void db::simulator::stop()
+{
+ for (auto& i : servers_)
+ {
+ db::server& server(*i.second);
+ server.stop_clients();
+ }
+ clients_stop_ = std::chrono::steady_clock::now();
+ wsrep::log_info() << "######## Stats ############";
+ wsrep::log_info() << stats();
+ std::cout << db::ti::stats() << std::endl;
+ wsrep::log_info() << "######## Stats ############";
+ if (params_.fast_exit)
+ {
+ exit(0);
+ }
+ for (auto& i : servers_)
+ {
+ db::server& server(*i.second);
+ wsrep::log_info() << "Status for server: "
+ << server.server_state().id();
+ auto status(server.server_state().provider().status());
+ for_each(status.begin(), status.end(),
+ [](const wsrep::provider::status_variable& sv)
+ {
+ wsrep::log_info() << sv.name() << " = " << sv.value();
+ });
+ server.server_state().disconnect();
+ server.server_state().wait_until_state(
+ wsrep::server_state::s_disconnected);
+ server.stop_applier();
+ server.server_state().unload_provider();
+ }
+}
+
+std::string db::simulator::server_port(size_t i) const
+{
+ std::ostringstream os;
+ os << (10000 + (i + 1)*10);
+ return os.str();
+}
+
+std::string db::simulator::build_cluster_address() const
+{
+ std::string ret;
+ if (params_.wsrep_provider.find("galera_smm") != std::string::npos)
+ {
+ ret += "gcomm://";
+ }
+
+ for (size_t i(0); i < params_.n_servers; ++i)
+ {
+ std::ostringstream sa_os;
+ sa_os << "127.0.0.1:";
+ sa_os << server_port(i);
+ ret += sa_os.str();
+ if (i < params_.n_servers - 1) ret += ",";
+ }
+ return ret;
+}
diff --git a/wsrep-lib/dbsim/db_simulator.hpp b/wsrep-lib/dbsim/db_simulator.hpp
new file mode 100644
index 00000000..693645e3
--- /dev/null
+++ b/wsrep-lib/dbsim/db_simulator.hpp
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_DB_SIMULATOR_HPP
+#define WSREP_DB_SIMULATOR_HPP
+
+#include "wsrep/gtid.hpp"
+#include "wsrep/mutex.hpp"
+#include "wsrep/lock.hpp"
+
+#include "db_params.hpp"
+#include "db_server.hpp"
+
+#include <memory>
+#include <chrono>
+#include <unordered_set>
+#include <map>
+
+namespace db
+{
+ class server;
+ class simulator
+ {
+ public:
+ simulator(const params& params)
+ : mutex_()
+ , params_(params)
+ , servers_()
+ , clients_start_()
+ , clients_stop_()
+ , stats_()
+ { }
+
+ void run();
+ void sst(db::server&,
+ const std::string&, const wsrep::gtid&, bool);
+ const db::params& params() const
+ { return params_; }
+ std::string stats() const;
+ private:
+ void start();
+ void stop();
+ std::string server_port(size_t i) const;
+ std::string build_cluster_address() const;
+
+ wsrep::default_mutex mutex_;
+ const db::params& params_;
+ std::map<std::string, std::unique_ptr<db::server>> servers_;
+ std::chrono::time_point<std::chrono::steady_clock> clients_start_;
+ std::chrono::time_point<std::chrono::steady_clock> clients_stop_;
+ public:
+ struct stats
+ {
+ long long commits;
+ long long rollbacks;
+ long long replays;
+ stats()
+ : commits(0)
+ , rollbacks(0)
+ , replays(0)
+ { }
+ } stats_;
+ };
+}
+#endif // WSRE_DB_SIMULATOR_HPP
diff --git a/wsrep-lib/dbsim/db_storage_engine.cpp b/wsrep-lib/dbsim/db_storage_engine.cpp
new file mode 100644
index 00000000..c102783a
--- /dev/null
+++ b/wsrep-lib/dbsim/db_storage_engine.cpp
@@ -0,0 +1,119 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "db_storage_engine.hpp"
+#include "db_client.hpp"
+
+void db::storage_engine::transaction::start(db::client* cc)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_);
+ if (se_.transactions_.insert(cc).second == false)
+ {
+ ::abort();
+ }
+ cc_ = cc;
+}
+
+void db::storage_engine::transaction::apply(
+ const wsrep::transaction& transaction)
+{
+ assert(cc_);
+ se_.bf_abort_some(transaction);
+}
+
+void db::storage_engine::transaction::commit(const wsrep::gtid& gtid)
+{
+ if (cc_)
+ {
+ wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_);
+ se_.transactions_.erase(cc_);
+ se_.store_position(gtid);
+ }
+ cc_ = nullptr;
+}
+
+
+void db::storage_engine::transaction::rollback()
+{
+ if (cc_)
+ {
+ wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_);
+ se_.transactions_.erase(cc_);
+ }
+ cc_ = nullptr;
+}
+
+void db::storage_engine::bf_abort_some(const wsrep::transaction& txc)
+{
+ std::uniform_int_distribution<size_t> uniform_dist(0, alg_freq_);
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+ if (alg_freq_ && uniform_dist(random_engine_) == 0)
+ {
+ if (transactions_.empty() == false)
+ {
+ for (auto victim : transactions_)
+ {
+ wsrep::client_state& cc(victim->client_state());
+ if (cc.mode() == wsrep::client_state::m_local)
+ {
+ if (victim->bf_abort(txc.seqno()))
+ {
+ ++bf_aborts_;
+ }
+ break;
+ }
+ }
+ }
+ }
+}
+
+void db::storage_engine::store_position(const wsrep::gtid& gtid)
+{
+ validate_position(gtid);
+ position_ = gtid;
+}
+
+wsrep::gtid db::storage_engine::get_position() const
+{
+ return position_;
+}
+
+void db::storage_engine::store_view(const wsrep::view& view)
+{
+ view_ = view;
+}
+
+wsrep::view db::storage_engine::get_view() const
+{
+ return view_;
+}
+
+void db::storage_engine::validate_position(const wsrep::gtid& gtid) const
+{
+ using std::rel_ops::operator<=;
+ if (position_.id() == gtid.id() && gtid.seqno() <= position_.seqno())
+ {
+ std::ostringstream os;
+ os << "Invalid position submitted, position seqno "
+ << position_.seqno()
+ << " is greater than submitted seqno "
+ << gtid.seqno();
+ throw wsrep::runtime_error(os.str());
+ }
+}
diff --git a/wsrep-lib/dbsim/db_storage_engine.hpp b/wsrep-lib/dbsim/db_storage_engine.hpp
new file mode 100644
index 00000000..0721a2ad
--- /dev/null
+++ b/wsrep-lib/dbsim/db_storage_engine.hpp
@@ -0,0 +1,91 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_DB_STORAGE_ENGINE_HPP
+#define WSREP_DB_STORAGE_ENGINE_HPP
+
+#include "db_params.hpp"
+
+#include "wsrep/mutex.hpp"
+#include "wsrep/client_state.hpp"
+
+#include <atomic>
+#include <unordered_set>
+#include <random>
+
+namespace db
+{
+ class client;
+ class storage_engine
+ {
+ public:
+ storage_engine(const params& params)
+ : mutex_()
+ , transactions_()
+ , alg_freq_(params.alg_freq)
+ , bf_aborts_()
+ , position_()
+ , view_()
+ , random_device_()
+ , random_engine_(random_device_())
+ { }
+
+ class transaction
+ {
+ public:
+ transaction(storage_engine& se)
+ : se_(se)
+ , cc_()
+ { }
+ ~transaction()
+ {
+ rollback();
+ }
+ bool active() const { return cc_ != nullptr; }
+ void start(client* cc);
+ void apply(const wsrep::transaction&);
+ void commit(const wsrep::gtid&);
+ void rollback();
+ db::client* client() { return cc_; }
+ transaction(const transaction&) = delete;
+ transaction& operator=(const transaction&) = delete;
+ private:
+ db::storage_engine& se_;
+ db::client* cc_;
+ };
+ void bf_abort_some(const wsrep::transaction& tc);
+ long long bf_aborts() const { return bf_aborts_; }
+ void store_position(const wsrep::gtid& gtid);
+ wsrep::gtid get_position() const;
+ void store_view(const wsrep::view& view);
+ wsrep::view get_view() const;
+ private:
+ void validate_position(const wsrep::gtid& gtid) const;
+ wsrep::default_mutex mutex_;
+ std::unordered_set<db::client*> transactions_;
+ size_t alg_freq_;
+ std::atomic<long long> bf_aborts_;
+ wsrep::gtid position_;
+ wsrep::view view_;
+ std::random_device random_device_;
+ std::default_random_engine random_engine_;
+ };
+}
+
+#endif // WSREP_DB_STORAGE_ENGINE_HPP
diff --git a/wsrep-lib/dbsim/db_storage_service.hpp b/wsrep-lib/dbsim/db_storage_service.hpp
new file mode 100644
index 00000000..839253db
--- /dev/null
+++ b/wsrep-lib/dbsim/db_storage_service.hpp
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_DB_STORAGE_SERVICE_HPP
+#define WSREP_DB_STORAGE_SERVICE_HPP
+
+#include "wsrep/storage_service.hpp"
+#include "wsrep/exception.hpp"
+
+namespace db
+{
+ class storage_service : public wsrep::storage_service
+ {
+ int start_transaction(const wsrep::ws_handle&) override
+ { throw wsrep::not_implemented_error(); }
+ void adopt_transaction(const wsrep::transaction&) override
+ { throw wsrep::not_implemented_error(); }
+ int append_fragment(const wsrep::id&,
+ wsrep::transaction_id,
+ int,
+ const wsrep::const_buffer&,
+ const wsrep::xid&) override
+ { throw wsrep::not_implemented_error(); }
+ int update_fragment_meta(const wsrep::ws_meta&) override
+ { throw wsrep::not_implemented_error(); }
+ int remove_fragments() override
+ { throw wsrep::not_implemented_error(); }
+ int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override
+ { throw wsrep::not_implemented_error(); }
+ int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&)
+ override
+ { throw wsrep::not_implemented_error(); }
+ void store_globals() override { }
+ void reset_globals() override { }
+ };
+}
+
+#endif // WSREP_DB_STORAGE_SERVICE_HPP
diff --git a/wsrep-lib/dbsim/db_threads.cpp b/wsrep-lib/dbsim/db_threads.cpp
new file mode 100644
index 00000000..d066c65a
--- /dev/null
+++ b/wsrep-lib/dbsim/db_threads.cpp
@@ -0,0 +1,727 @@
+/*
+ * Copyright (C) 2019 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "db_threads.hpp"
+#include "wsrep/compiler.hpp"
+#include "wsrep/logger.hpp"
+
+#include <cassert>
+#include <pthread.h>
+
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <map>
+#include <mutex>
+#include <ostream>
+#include <sstream>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+extern "C" { static void* start_thread(void* args_ptr); }
+namespace
+{
+ struct ti_obj
+ {
+ };
+ enum ti_opcode
+ {
+ oc_thread_create,
+ oc_thread_destroy,
+ oc_mutex_create,
+ oc_mutex_destroy,
+ oc_mutex_lock,
+ oc_mutex_trylock,
+ oc_mutex_unlock,
+ oc_cond_create,
+ oc_cond_destroy,
+ oc_cond_wait,
+ oc_cond_timedwait,
+ oc_cond_signal,
+ oc_cond_broadcast,
+ oc_max // must be the last
+ };
+
+ static const char* ti_opstring(enum ti_opcode op)
+ {
+ switch (op)
+ {
+ case oc_thread_create: return "thread_create";
+ case oc_thread_destroy: return "thread_destroy";
+ case oc_mutex_create: return "mutex_create";
+ case oc_mutex_destroy: return "mutex_destroy";
+ case oc_mutex_lock: return "mutex_lock";
+ case oc_mutex_trylock: return "mutex_trylock";
+ case oc_mutex_unlock: return "mutex_unlock";
+ case oc_cond_create: return "cond_create";
+ case oc_cond_destroy: return "cond_destroy";
+ case oc_cond_wait: return "cond_wait";
+ case oc_cond_timedwait: return "cond_timedwait";
+ case oc_cond_signal: return "cond_signal";
+ case oc_cond_broadcast: return "cond_broadcast";
+ default: return "unknown";
+ }
+ }
+
+ static std::vector<std::string> key_vec;
+ static std::atomic<int> key_cnt;
+ static std::vector<std::vector<size_t>> ops_map;
+ static std::vector<std::mutex*> ops_map_sync;
+ static struct ops_map_sync_deleter
+ {
+ ~ops_map_sync_deleter()
+ {
+ std::for_each(ops_map_sync.begin(), ops_map_sync.end(),
+ [](auto entry) { delete entry; });
+ }
+ } ops_map_sync_deleter;
+ static std::array<std::atomic<size_t>, oc_max> total_ops;
+ static std::atomic<size_t> total_allocations;
+ static std::atomic<size_t> mutex_contention;
+ static std::unordered_map<std::string, size_t> mutex_contention_counts;
+ static int op_level;
+ // Check correct condition variable usage:
+ // - Associated mutex must be locked when waiting for cond
+ // - There must be at least one waiter when signalling for condition
+ static bool cond_checks;
+ static inline void cond_check(bool condition, const char* name,
+ const char* message)
+ {
+ if (cond_checks && !condition)
+ {
+ wsrep::log_error() << "Condition variable check failed for '"
+ << name << "': " << message;
+ ::abort();
+ }
+ }
+ static inline int append_key(const char* name, const char* type)
+ {
+
+ key_vec.push_back(std::string(name) + "_" + type);
+ wsrep::log_info() << "Register key " << name << "_" << type
+ << " with index " << (key_cnt + 1);
+ ops_map.push_back(std::vector<size_t>());
+ ops_map_sync.push_back(new std::mutex());
+ ops_map.back().resize(oc_max);
+ return ++key_cnt;
+ }
+
+ template <class Key> static inline size_t get_key_index(const Key* key)
+ {
+ size_t index(reinterpret_cast<const size_t>(key) - 1);
+ assert(index < key_vec.size());
+ return index;
+ }
+
+ template <class Key>
+ static inline const char* get_key_name(const Key* key)
+ {
+ return key_vec[get_key_index(key)].c_str();
+ }
+
+ static inline const std::string& get_key_name_by_index(size_t index)
+ {
+ assert(index < key_vec.size());
+ return key_vec[index];
+ }
+
+ // Note: Do not refer the obj pointer in this function, it may
+ // have been deleted before the call.
+ template <class Key>
+ static inline void update_ops(const ti_obj* obj,
+ const Key* key,
+ enum ti_opcode op)
+ {
+ if (op_level < 1)
+ return;
+ total_ops[op] += 1;
+ if (op_level < 2)
+ return;
+ if (false && op == oc_mutex_destroy)
+ {
+ wsrep::log_info() << "thread: " << std::this_thread::get_id()
+ << " object: " << obj
+ << ": name: " << get_key_name(key)
+ << " op: " << ti_opstring(op);
+ }
+
+ std::lock_guard<std::mutex> lock(*ops_map_sync[get_key_index(key)]);
+ ops_map[get_key_index(key)][op] += 1;
+ }
+
+ struct thread_args
+ {
+ void* this_thread;
+ void* (*fn)(void*);
+ void* args;
+ };
+
+ pthread_key_t this_thread_key;
+ struct this_thread_key_initializer
+ {
+ this_thread_key_initializer()
+ {
+ pthread_key_create(&this_thread_key, nullptr);
+ }
+
+ ~this_thread_key_initializer()
+ {
+ pthread_key_delete(this_thread_key);
+ }
+ };
+
+
+ class ti_thread : public ti_obj
+ {
+ public:
+ ti_thread(const wsrep::thread_service::thread_key* key)
+ : key_(key)
+ , th_()
+ , retval_()
+ , detached_()
+ {
+ update_ops(this, key_, oc_thread_create);
+ }
+ ~ti_thread()
+ {
+ update_ops(this, key_, oc_thread_destroy);
+ }
+
+ ti_thread(const ti_thread&) = delete;
+ ti_thread& operator=(const ti_thread&) = delete;
+ int run(void* (*fn)(void *), void* args)
+ {
+ auto ta(new thread_args{this, fn, args});
+ return pthread_create(&th_, nullptr, start_thread, ta);
+ }
+
+ int detach()
+ {
+ detached_ = true;
+ return pthread_detach(th_);
+ }
+
+ int join(void** retval)
+ {
+ return pthread_join(th_, retval);
+ }
+
+ bool detached() const { return detached_; }
+
+ void retval(void* retval) { retval_ = retval; }
+
+ static ti_thread* self()
+ {
+ return reinterpret_cast<ti_thread*>(
+ pthread_getspecific(this_thread_key));
+ }
+
+ int setschedparam(int policy, const struct sched_param* param)
+ {
+ return pthread_setschedparam(th_, policy, param);
+ }
+
+ int getschedparam(int* policy, struct sched_param* param)
+ {
+ return pthread_getschedparam(th_, policy, param);
+ }
+
+ int equal(ti_thread* other)
+ {
+ return pthread_equal(th_, other->th_);
+ }
+ private:
+ const wsrep::thread_service::thread_key* key_;
+ pthread_t th_;
+ void* retval_;
+ bool detached_;
+ };
+
+ class ti_mutex : public ti_obj
+ {
+ public:
+ ti_mutex(const wsrep::thread_service::mutex_key* key, bool inplace)
+ : mutex_(PTHREAD_MUTEX_INITIALIZER)
+ , key_(key)
+ , inplace_(inplace)
+#ifndef NDEBUG
+ , locked_()
+ , owner_()
+#endif // ! NDEBUG
+ {
+ update_ops(this, key_, oc_mutex_create);
+ if (not inplace) total_allocations++;
+ }
+
+ ~ti_mutex() { update_ops(this, key_, oc_mutex_destroy); }
+
+ ti_mutex& operator=(const ti_mutex&) = delete;
+ ti_mutex(const ti_mutex&) = delete;
+
+ int lock()
+ {
+ update_ops(this, key_, oc_mutex_lock);
+ int ret(pthread_mutex_trylock(&mutex_));
+ if (ret == EBUSY)
+ {
+ mutex_contention++;
+ {
+ std::lock_guard<std::mutex> lock(*ops_map_sync[get_key_index(key_)]);
+ mutex_contention_counts[get_key_name(key_)] += 1;
+ }
+ ret = pthread_mutex_lock(&mutex_);
+ }
+#ifndef NDEBUG
+ if (ret == 0)
+ {
+ assert(owner_ == std::thread::id());
+ locked_ = true;
+ owner_ = std::this_thread::get_id();
+ }
+#endif // ! NDEBUG
+ return ret;
+ }
+ int trylock()
+ {
+ update_ops(this, key_, oc_mutex_trylock);
+ int ret(pthread_mutex_trylock(&mutex_));
+#ifndef NDEBUG
+ if (ret == 0)
+ {
+ assert(owner_ == std::thread::id());
+ locked_ = true;
+ owner_ = std::this_thread::get_id();
+ }
+#endif // ! NDEBUG
+ return ret;
+ }
+
+ int unlock()
+ {
+ assert(locked_);
+#ifndef NDEBUG
+ assert(owner_ == std::this_thread::get_id());
+ owner_ = std::thread::id();
+#endif // ! NDEBUG
+ // Use temporary object. After mutex is unlocked it may be
+ // destroyed before this update_ops() finishes.
+ auto key(key_);
+ int ret(pthread_mutex_unlock(&mutex_));
+ update_ops(this, key, oc_mutex_unlock);
+ return ret;
+ }
+
+ struct condwait_context
+ {
+#ifndef NDEBUG
+ bool locked;
+ std::thread::id owner;
+#endif // ! NDEBUG
+ };
+
+ condwait_context save_for_condwait()
+ {
+#ifndef NDEBUG
+ return condwait_context{ locked_, owner_ };
+#else
+ return condwait_context{};
+#endif // ! NDEBUG
+ }
+
+ void reset()
+ {
+#ifndef NDEBUG
+ locked_ = false;
+ owner_ = std::thread::id();
+#endif // ! NDEBUG
+ }
+
+ void restore_from_condwait(const condwait_context& ctx WSREP_UNUSED)
+ {
+#ifndef NDEBUG
+ locked_ = ctx.locked;
+ owner_ = ctx.owner;
+#endif // ! NDEBUG
+ }
+
+ pthread_mutex_t* native_handle() { return &mutex_; }
+ const wsrep::thread_service::mutex_key* key() const { return key_; }
+
+ bool inplace() const { return inplace_; }
+ private:
+ pthread_mutex_t mutex_;
+ const wsrep::thread_service::mutex_key* key_;
+ const bool inplace_;
+#ifndef NDEBUG
+ bool locked_;
+ std::atomic<std::thread::id> owner_;
+#endif // ! NDEBU
+ };
+
+ class ti_cond : public ti_obj
+ {
+ public:
+ ti_cond(const wsrep::thread_service::cond_key* key, bool inplace)
+ : cond_(PTHREAD_COND_INITIALIZER)
+ , key_(key)
+ , inplace_(inplace)
+ , waiter_()
+ {
+ update_ops(this, key_, oc_cond_create);
+ if (not inplace) total_allocations++;
+ }
+
+ ~ti_cond() { update_ops(this, key_, oc_cond_destroy); }
+
+ ti_cond& operator=(const ti_cond&) = delete;
+ ti_cond(const ti_cond&) = delete;
+
+ int wait(ti_mutex& mutex)
+ {
+ cond_check(pthread_mutex_trylock(mutex.native_handle()),
+ get_key_name(key_), "Mutex not locked in cond wait");
+ waiter_ = true;
+ update_ops(this, key_, oc_cond_wait);
+ // update_ops(&mutex, mutex.key(), oc_mutex_unlock);
+ auto condwait_ctx(mutex.save_for_condwait());
+ mutex.reset();
+ int ret(pthread_cond_wait(&cond_, mutex.native_handle()));
+ // update_ops(&mutex, mutex.key(), oc_mutex_lock);
+ mutex.restore_from_condwait(condwait_ctx);
+ waiter_ = false;
+ return ret;
+ }
+
+ int timedwait(ti_mutex& mutex, const struct timespec* ts)
+ {
+ cond_check(pthread_mutex_trylock(mutex.native_handle()),
+ get_key_name(key_), "Mutex not locked in cond wait");
+ waiter_ = true;
+ update_ops(this, key_, oc_cond_timedwait);
+ // update_ops(&mutex, mutex.key(), oc_mutex_unlock);
+ auto condwait_ctx(mutex.save_for_condwait());
+ mutex.reset();
+ int ret(pthread_cond_timedwait(&cond_, mutex.native_handle(), ts));
+ // update_ops(&mutex, mutex.key(), oc_mutex_lock);
+ mutex.restore_from_condwait(condwait_ctx);
+ waiter_ = false;
+ return ret;
+ }
+
+ int signal()
+ {
+ update_ops(this, key_, oc_cond_signal);
+ cond_check(waiter_, get_key_name(key_),
+ "Signalling condition variable without waiter");
+ return pthread_cond_signal(&cond_);
+ }
+
+ int broadcast()
+ {
+ update_ops(this, key_, oc_cond_broadcast);
+ return pthread_cond_broadcast(&cond_);
+ }
+
+ bool inplace() const { return inplace_; }
+ private:
+ pthread_cond_t cond_;
+ const wsrep::thread_service::cond_key* key_;
+ const bool inplace_;
+ bool waiter_;
+ };
+}
+
+int db::ti::before_init()
+{
+ wsrep::log_info() << "db::ti::before_init()";
+ return 0;
+}
+
+int db::ti::after_init()
+{
+ wsrep::log_info() << "db::ti::after_init()";
+ return 0;
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// Thread //
+//////////////////////////////////////////////////////////////////////////////
+
+extern "C"
+{
+static void* start_thread(void* args_ptr)
+{
+ thread_args* ta(reinterpret_cast<thread_args*>(args_ptr));
+ ti_thread* thread = reinterpret_cast<ti_thread*>(ta->this_thread);
+ pthread_setspecific(this_thread_key, thread);
+ void* (*fn)(void*) = ta->fn;
+ void* args = ta->args;
+ delete ta;
+ void* ret = (*fn)(args);
+ pthread_setspecific(this_thread_key, nullptr);
+ // If we end here the thread returned instead of calling
+ // pthread_exit()
+ if (thread->detached())
+ delete thread;
+ return ret;
+}
+
+WSREP_NORETURN
+static void exit_thread(wsrep::thread_service::thread* thread, void* retval)
+{
+ pthread_setspecific(this_thread_key, nullptr);
+ ti_thread* th(reinterpret_cast<ti_thread*>(thread));
+ th->retval(retval);
+ if (th->detached())
+ delete th;
+ pthread_exit(retval);
+}
+} // extern "C"
+
+db::ti::ti()
+{
+ thread_service::exit = exit_thread;
+}
+
+const wsrep::thread_service::thread_key*
+db::ti::create_thread_key(const char* name) WSREP_NOEXCEPT
+{
+ assert(name);
+ return reinterpret_cast<const wsrep::thread_service::thread_key*>(
+ append_key(name, "thread"));
+}
+
+int db::ti::create_thread(const wsrep::thread_service::thread_key* key,
+ wsrep::thread_service::thread** thread,
+ void* (*fn)(void*), void* args) WSREP_NOEXCEPT
+{
+ auto pit(new ti_thread(key));
+ total_allocations++;
+ int ret;
+ if ((ret = pit->run(fn, args)))
+ {
+ delete pit;
+ }
+ else
+ {
+ *thread = reinterpret_cast<wsrep::thread_service::thread*>(pit);
+ }
+ return ret;
+}
+
+int db::ti::detach(wsrep::thread_service::thread* thread) WSREP_NOEXCEPT
+{
+ return reinterpret_cast<ti_thread*>(thread)->detach();
+}
+
+int db::ti::equal(wsrep::thread_service::thread* thread_1,
+ wsrep::thread_service::thread* thread_2) WSREP_NOEXCEPT
+{
+ return (reinterpret_cast<ti_thread*>(thread_1)->equal(
+ reinterpret_cast<ti_thread*>(thread_2)));
+}
+
+int db::ti::join(wsrep::thread_service::thread* thread, void** retval) WSREP_NOEXCEPT
+{
+ ti_thread* th(reinterpret_cast<ti_thread*>(thread));
+ int ret(th->join(retval));
+ if (not th->detached())
+ {
+ delete th;
+ }
+ return ret;
+}
+
+wsrep::thread_service::thread* db::ti::self() WSREP_NOEXCEPT
+{
+ return reinterpret_cast<wsrep::thread_service::thread*>(ti_thread::self());
+}
+
+int db::ti::setschedparam(wsrep::thread_service::thread* thread,
+ int policy, const struct sched_param* param) WSREP_NOEXCEPT
+{
+ return reinterpret_cast<ti_thread*>(thread)->setschedparam(policy, param);
+}
+
+int db::ti::getschedparam(wsrep::thread_service::thread* thread,
+ int* policy, struct sched_param* param) WSREP_NOEXCEPT
+{
+ return reinterpret_cast<ti_thread*>(thread)->getschedparam(policy, param);
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// Mutex //
+//////////////////////////////////////////////////////////////////////////////
+
+const wsrep::thread_service::mutex_key*
+db::ti::create_mutex_key(const char* name) WSREP_NOEXCEPT
+{
+ assert(name);
+ return reinterpret_cast<const wsrep::thread_service::mutex_key*>(
+ append_key(name, "mutex"));
+}
+
+wsrep::thread_service::mutex*
+db::ti::init_mutex(const wsrep::thread_service::mutex_key* key, void* memblock,
+ size_t memblock_size) WSREP_NOEXCEPT
+{
+ return reinterpret_cast<wsrep::thread_service::mutex*>(
+ memblock_size >= sizeof(ti_mutex) ? new (memblock) ti_mutex(key, true)
+ : new ti_mutex(key, false));
+}
+
+int db::ti::destroy(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT
+{
+ ti_mutex* m(reinterpret_cast<ti_mutex*>(mutex));
+ if (m->inplace())
+ {
+ m->~ti_mutex();
+ }
+ else
+ {
+ delete m;
+ }
+ return 0;
+}
+
+int db::ti::lock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT
+{
+ return reinterpret_cast<ti_mutex*>(mutex)->lock();
+}
+
+int db::ti::trylock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT
+{
+ return reinterpret_cast<ti_mutex*>(mutex)->trylock();
+}
+
+int db::ti::unlock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT
+{
+ return reinterpret_cast<ti_mutex*>(mutex)->unlock();
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// Cond //
+//////////////////////////////////////////////////////////////////////////////
+
+const wsrep::thread_service::cond_key* db::ti::create_cond_key(const char* name) WSREP_NOEXCEPT
+{
+ assert(name);
+ return reinterpret_cast<const wsrep::thread_service::cond_key*>(
+ append_key(name, "cond"));
+}
+
+wsrep::thread_service::cond*
+db::ti::init_cond(const wsrep::thread_service::cond_key* key, void* memblock,
+ size_t memblock_size) WSREP_NOEXCEPT
+{
+ return reinterpret_cast<wsrep::thread_service::cond*>(
+ memblock_size >= sizeof(ti_cond) ? new (memblock) ti_cond(key, true)
+ : new ti_cond(key, false));
+}
+
+int db::ti::destroy(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT
+{
+ ti_cond* c(reinterpret_cast<ti_cond*>(cond));
+ if (c->inplace())
+ {
+ c->~ti_cond();
+ }
+ else
+ {
+ delete c;
+ }
+ return 0;
+}
+
+int db::ti::wait(wsrep::thread_service::cond* cond,
+ wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT
+{
+ return reinterpret_cast<ti_cond*>(cond)->wait(
+ *reinterpret_cast<ti_mutex*>(mutex));
+}
+
+int db::ti::timedwait(wsrep::thread_service::cond* cond,
+ wsrep::thread_service::mutex* mutex,
+ const struct timespec* ts) WSREP_NOEXCEPT
+{
+ return reinterpret_cast<ti_cond*>(cond)->timedwait(
+ *reinterpret_cast<ti_mutex*>(mutex), ts);
+}
+
+int db::ti::signal(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT
+{
+ return reinterpret_cast<ti_cond*>(cond)->signal();
+}
+
+int db::ti::broadcast(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT
+{
+ return reinterpret_cast<ti_cond*>(cond)->broadcast();
+}
+
+void db::ti::level(int level)
+{
+ ::op_level = level;
+}
+
+void db::ti::cond_checks(bool cond_checks)
+{
+ if (cond_checks)
+ wsrep::log_info() << "Enabling condition variable checking";
+ ::cond_checks = cond_checks;
+}
+
+std::string db::ti::stats()
+{
+ std::ostringstream os;
+ os << "Totals:\n";
+ for (size_t i(0); i < total_ops.size(); ++i)
+ {
+ if (total_ops[i] > 0)
+ {
+ os << " " << ti_opstring(static_cast<enum ti_opcode>(i)) << ": "
+ << total_ops[i] << "\n";
+ }
+ }
+ os << "Total allocations: " << total_allocations << "\n";
+ os << "Mutex contention: " << mutex_contention << "\n";
+ for (auto i : mutex_contention_counts)
+ {
+ os << " " << i.first << ": " << i.second << "\n";
+ }
+ os << "Per key:\n";
+ std::map<std::string, std::vector<size_t>> sorted;
+ for (size_t i(0); i < ops_map.size(); ++i)
+ {
+ sorted.insert(std::make_pair(get_key_name_by_index(i), ops_map[i]));
+ }
+ for (auto i : sorted)
+ {
+ for (size_t j(0); j < i.second.size(); ++j)
+ {
+ if (i.second[j])
+ {
+ os << " " << i.first << ": "
+ << ti_opstring(static_cast<enum ti_opcode>(j)) << ": "
+ << i.second[j] << "\n";
+ }
+ }
+ }
+ return os.str();
+}
diff --git a/wsrep-lib/dbsim/db_threads.hpp b/wsrep-lib/dbsim/db_threads.hpp
new file mode 100644
index 00000000..32d44c10
--- /dev/null
+++ b/wsrep-lib/dbsim/db_threads.hpp
@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2019 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_DB_THREADS_HPP
+#define WSREP_DB_THREADS_HPP
+
+#include "wsrep/thread_service.hpp"
+#include <string>
+
+namespace db
+{
+ class ti : public wsrep::thread_service
+ {
+ public:
+ ti();
+ int before_init() override;
+ int after_init() override;
+
+ /* Thread */
+ const wsrep::thread_service::thread_key*
+ create_thread_key(const char* name) WSREP_NOEXCEPT override;
+ int create_thread(const wsrep::thread_service::thread_key* key,
+ wsrep::thread_service::thread**,
+ void* (*fn)(void*), void*) WSREP_NOEXCEPT override;
+ int detach(wsrep::thread_service::thread*) WSREP_NOEXCEPT override;
+ int equal(wsrep::thread_service::thread*,
+ wsrep::thread_service::thread*) WSREP_NOEXCEPT override;
+ int join(wsrep::thread_service::thread*, void**) WSREP_NOEXCEPT override;
+ wsrep::thread_service::thread* self() WSREP_NOEXCEPT override;
+ int setschedparam(wsrep::thread_service::thread*, int,
+ const struct sched_param*) WSREP_NOEXCEPT override;
+ int getschedparam(wsrep::thread_service::thread*, int*,
+ struct sched_param*) WSREP_NOEXCEPT override;
+
+ /* Mutex */
+ const wsrep::thread_service::mutex_key*
+ create_mutex_key(const char* name) WSREP_NOEXCEPT override;
+ wsrep::thread_service::mutex*
+ init_mutex(const wsrep::thread_service::mutex_key* key, void* memblock,
+ size_t memblock_size) WSREP_NOEXCEPT override;
+ int destroy(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT override;
+ int lock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT override;
+ int trylock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT override;
+ int unlock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT override;
+ /* Cond */
+ const wsrep::thread_service::cond_key*
+ create_cond_key(const char* name) WSREP_NOEXCEPT override;
+ wsrep::thread_service::cond*
+ init_cond(const wsrep::thread_service::cond_key* key, void* memblock,
+ size_t memblock_size) WSREP_NOEXCEPT override;
+ int destroy(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT override;
+ int wait(wsrep::thread_service::cond* cond,
+ wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT override;
+ int timedwait(wsrep::thread_service::cond* cond,
+ wsrep::thread_service::mutex* mutex,
+ const struct timespec* ts) WSREP_NOEXCEPT override;
+ int signal(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT override;
+ int broadcast(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT override;
+
+ static void level(int level);
+ static void cond_checks(bool cond_checks);
+ static std::string stats();
+ };
+
+
+}
+
+#endif // WSREP_DB_THREADS_HPP
diff --git a/wsrep-lib/dbsim/db_tls.cpp b/wsrep-lib/dbsim/db_tls.cpp
new file mode 100644
index 00000000..c242668c
--- /dev/null
+++ b/wsrep-lib/dbsim/db_tls.cpp
@@ -0,0 +1,451 @@
+/*
+ * Copyright (C) 2020 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+/** @file db_tls.cpp
+ *
+ * This file demonstrates the use of TLS service. It does not implement
+ * real encryption, but may manipulate stream bytes for testing purposes.
+ */
+
+#include "db_tls.hpp"
+
+#include "wsrep/logger.hpp"
+
+#include <unistd.h> // read()
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h> // send()
+#include <cassert>
+#include <cerrno>
+#include <cstring>
+
+#include <mutex>
+#include <string>
+
+namespace
+{
+ class db_stream : public wsrep::tls_stream
+ {
+ public:
+ db_stream(int fd, int mode)
+ : fd_(fd)
+ , state_(s_initialized)
+ , last_error_()
+ , mode_(mode)
+ , stats_()
+ , is_blocking_()
+ {
+ int val(fcntl(fd_, F_GETFL, 0));
+ is_blocking_ = not (val & O_NONBLOCK);
+ }
+ struct stats
+ {
+ size_t bytes_read{0};
+ size_t bytes_written{0};
+ };
+
+ /*
+ * in idle --|
+ * |-> ch -| ^ | -> want_read --|
+ * |-> sh -| ---- |--> | -> want_write --|
+ * |----------------------|
+ */
+ enum state
+ {
+ s_initialized,
+ s_client_handshake,
+ s_server_handshake,
+ s_idle,
+ s_want_read,
+ s_want_write
+ };
+
+ int get_error_number() const { return last_error_; }
+ const void* get_error_category() const
+ {
+ return reinterpret_cast<const void*>(1);
+ }
+
+ static char* get_error_message(int value, const void*)
+ {
+ return ::strerror(value);
+ }
+
+ enum wsrep::tls_service::status client_handshake();
+
+ enum wsrep::tls_service::status server_handshake();
+
+ wsrep::tls_service::op_result read(void*, size_t);
+
+ wsrep::tls_service::op_result write(const void*, size_t);
+
+ enum state state() const { return state_; }
+
+ int fd() const { return fd_; }
+ void inc_reads(size_t val) { stats_.bytes_read += val; }
+ void inc_writes(size_t val) { stats_.bytes_written += val; }
+ const stats& get_stats() const { return stats_; }
+ private:
+ enum wsrep::tls_service::status handle_handshake_read(const char* expect);
+ size_t determine_read_count(size_t max_count)
+ {
+ if (is_blocking_ || mode_ < 2) return max_count;
+ else if (::rand() % 100 == 0) return std::min(size_t(42), max_count);
+ else return max_count;
+ }
+ size_t determine_write_count(size_t count)
+ {
+ if (is_blocking_ || mode_ < 2) return count;
+ else if (::rand() % 100 == 0) return std::min(size_t(43), count);
+ else return count;
+ }
+
+ ssize_t do_read(void* buf, size_t max_count)
+ {
+ if (is_blocking_ || mode_ < 3 )
+ return ::read(fd_, buf, max_count);
+ else if (::rand() % 1000 == 0) return EINTR;
+ else return ::read(fd_, buf, max_count);
+ }
+
+ ssize_t do_write(const void* buf, size_t count)
+ {
+ if (is_blocking_ || mode_ < 3)
+ return ::send(fd_, buf, count, MSG_NOSIGNAL);
+ else if (::rand() % 1000 == 0) return EINTR;
+ else return ::send(fd_, buf, count, MSG_NOSIGNAL);
+ }
+
+ wsrep::tls_service::op_result map_success(ssize_t result)
+ {
+ if (is_blocking_ || mode_ < 2)
+ {
+ return wsrep::tls_service::op_result{
+ wsrep::tls_service::success, size_t(result)};
+ }
+ else if (::rand() % 1000 == 0)
+ {
+ wsrep::log_info() << "Success want extra read";
+ state_ = s_want_read;
+ return wsrep::tls_service::op_result{
+ wsrep::tls_service::want_read, size_t(result)};
+ }
+ else if (::rand() % 1000 == 0)
+ {
+ wsrep::log_info() << "Success want extra write";
+ state_ = s_want_write;
+ return wsrep::tls_service::op_result{
+ wsrep::tls_service::want_write, size_t(result)};
+ }
+ else
+ {
+ return wsrep::tls_service::op_result{
+ wsrep::tls_service::success, size_t(result)};
+ }
+ }
+
+ wsrep::tls_service::op_result map_result(ssize_t result)
+ {
+ if (result > 0)
+ {
+ return map_success(result);
+ }
+ else if (result == 0)
+ {
+ return wsrep::tls_service::op_result{
+ wsrep::tls_service::eof, 0};
+ }
+ else if (errno == EAGAIN || errno == EWOULDBLOCK)
+ {
+ return wsrep::tls_service::op_result{
+ wsrep::tls_service::want_read, 0};
+ }
+ else
+ {
+ last_error_ = errno;
+ return wsrep::tls_service::op_result{
+ wsrep::tls_service::error, 0};
+ }
+ }
+
+ void clear_error() { last_error_ = 0; }
+
+ int fd_;
+ enum state state_;
+ int last_error_;
+ // Operation mode:
+ // 1 - simulate handshake exchange
+ // 2 - simulate errors and short reads
+ int mode_;
+ stats stats_;
+ bool is_blocking_;
+ };
+
+ enum wsrep::tls_service::status db_stream::client_handshake()
+ {
+ clear_error();
+ enum wsrep::tls_service::status ret;
+ assert(state_ == s_initialized ||
+ state_ == s_client_handshake ||
+ state_ == s_want_write);
+ if (state_ == s_initialized)
+ {
+ (void)::send(fd_, "clie", 4, MSG_NOSIGNAL);
+ ret = wsrep::tls_service::want_read;
+ state_ = s_client_handshake;
+ wsrep::log_info() << this << " client handshake sent";
+ stats_.bytes_written += 4;
+ if (not is_blocking_) return ret;
+ }
+
+ if (state_ == s_client_handshake)
+ {
+ if ((ret = handle_handshake_read("serv")) ==
+ wsrep::tls_service::success)
+ {
+ state_ = s_want_write;
+ ret = wsrep::tls_service::want_write;
+ }
+ if (not is_blocking_) return ret;
+ }
+
+ if (state_ == s_want_write)
+ {
+ state_ = s_idle;
+ ret = wsrep::tls_service::success;
+ if (not is_blocking_) return ret;
+ }
+
+ if (not is_blocking_)
+ {
+ last_error_ = EPROTO;
+ ret = wsrep::tls_service::error;
+ }
+ return ret;
+ }
+
+
+
+ enum wsrep::tls_service::status db_stream::server_handshake()
+ {
+ enum wsrep::tls_service::status ret;
+ assert(state_ == s_initialized ||
+ state_ == s_server_handshake ||
+ state_ == s_want_write);
+
+ if (state_ == s_initialized)
+ {
+ ::send(fd_, "serv", 4, MSG_NOSIGNAL);
+ ret = wsrep::tls_service::want_read;
+ state_ = s_server_handshake;
+ stats_.bytes_written += 4;
+ if (not is_blocking_) return ret;
+ }
+
+ if (state_ == s_server_handshake)
+ {
+ if ((ret = handle_handshake_read("clie")) ==
+ wsrep::tls_service::success)
+ {
+ state_ = s_want_write;
+ ret = wsrep::tls_service::want_write;
+ }
+ if (not is_blocking_) return ret;
+ }
+
+ if (state_ == s_want_write)
+ {
+ state_ = s_idle;
+ ret = wsrep::tls_service::success;
+ if (not is_blocking_) return ret;
+ }
+
+ if (not is_blocking_)
+ {
+ last_error_ = EPROTO;
+ ret = wsrep::tls_service::error;
+ }
+ return ret;
+ }
+
+ enum wsrep::tls_service::status db_stream::handle_handshake_read(
+ const char* expect)
+ {
+ assert(::strlen(expect) >= 4);
+ char buf[4] = { };
+ ssize_t read_result(::read(fd_, buf, sizeof(buf)));
+ if (read_result > 0) stats_.bytes_read += size_t(read_result);
+ enum wsrep::tls_service::status ret;
+ if (read_result == -1 &&
+ (errno == EWOULDBLOCK || errno == EAGAIN))
+ {
+ ret = wsrep::tls_service::want_read;
+ }
+ else if (read_result == 0)
+ {
+ ret = wsrep::tls_service::eof;
+ }
+ else if (read_result != 4 || ::memcmp(buf, expect, 4))
+ {
+ last_error_ = EPROTO;
+ ret = wsrep::tls_service::error;
+ }
+ else
+ {
+ wsrep::log_info() << "Handshake success: " << std::string(buf, 4);
+ ret = wsrep::tls_service::success;
+ }
+ return ret;
+ }
+
+ wsrep::tls_service::op_result db_stream::read(void* buf, size_t max_count)
+ {
+ clear_error();
+ if (state_ == s_want_read)
+ {
+ state_ = s_idle;
+ if (max_count == 0)
+ return wsrep::tls_service::op_result{
+ wsrep::tls_service::success, 0};
+ }
+ max_count = determine_read_count(max_count);
+ ssize_t read_result(do_read(buf, max_count));
+ if (read_result > 0)
+ {
+ inc_reads(size_t(read_result));
+ }
+ return map_result(read_result);
+ }
+
+ wsrep::tls_service::op_result db_stream::write(
+ const void* buf, size_t count)
+ {
+ clear_error();
+ if (state_ == s_want_write)
+ {
+ state_ = s_idle;
+ if (count == 0)
+ return wsrep::tls_service::op_result{
+ wsrep::tls_service::success, 0};
+ }
+ count = determine_write_count(count);
+ ssize_t write_result(do_write(buf, count));
+ if (write_result > 0)
+ {
+ inc_writes(size_t(write_result));
+ }
+ return map_result(write_result);
+ }
+}
+
+
+static db_stream::stats global_stats;
+std::mutex global_stats_lock;
+static int global_mode;
+
+static void merge_to_global_stats(const db_stream::stats& stats)
+{
+ std::lock_guard<std::mutex> lock(global_stats_lock);
+ global_stats.bytes_read += stats.bytes_read;
+ global_stats.bytes_written += stats.bytes_written;
+}
+
+wsrep::tls_stream* db::tls::create_tls_stream(int fd) WSREP_NOEXCEPT
+{
+ auto ret(new db_stream(fd, global_mode));
+ wsrep::log_debug() << "New DB stream: " << ret;
+ return ret;
+}
+
+void db::tls::destroy(wsrep::tls_stream* stream) WSREP_NOEXCEPT
+{
+ auto dbs(static_cast<db_stream*>(stream));
+ merge_to_global_stats(dbs->get_stats());
+ wsrep::log_debug() << "Stream destroy: " << dbs->get_stats().bytes_read
+ << " " << dbs->get_stats().bytes_written;
+ wsrep::log_debug() << "Stream destroy" << dbs;
+ delete dbs;
+}
+
+int db::tls::get_error_number(const wsrep::tls_stream* stream)
+ const WSREP_NOEXCEPT
+{
+ return static_cast<const db_stream*>(stream)->get_error_number();
+}
+
+const void* db::tls::get_error_category(const wsrep::tls_stream* stream)
+ const WSREP_NOEXCEPT
+{
+ return static_cast<const db_stream*>(stream)->get_error_category();
+}
+
+const char* db::tls::get_error_message(const wsrep::tls_stream*,
+ int value, const void* category)
+ const WSREP_NOEXCEPT
+{
+ return db_stream::get_error_message(value, category);
+}
+
+enum wsrep::tls_service::status
+db::tls::client_handshake(wsrep::tls_stream* stream) WSREP_NOEXCEPT
+{
+ return static_cast<db_stream*>(stream)->client_handshake();
+}
+
+enum wsrep::tls_service::status
+db::tls::server_handshake(wsrep::tls_stream* stream) WSREP_NOEXCEPT
+{
+ return static_cast<db_stream*>(stream)->server_handshake();
+}
+
+wsrep::tls_service::op_result db::tls::read(
+ wsrep::tls_stream* stream,
+ void* buf, size_t max_count) WSREP_NOEXCEPT
+{
+ return static_cast<db_stream*>(stream)->read(buf, max_count);
+}
+
+wsrep::tls_service::op_result db::tls::write(
+ wsrep::tls_stream* stream,
+ const void* buf, size_t count) WSREP_NOEXCEPT
+{
+ return static_cast<db_stream*>(stream)->write(buf, count);
+}
+
+wsrep::tls_service::status
+db::tls::shutdown(wsrep::tls_stream*) WSREP_NOEXCEPT
+{
+ // @todo error simulation
+ return wsrep::tls_service::success;
+}
+
+
+void db::tls::init(int mode)
+{
+ global_mode = mode;
+}
+
+std::string db::tls::stats()
+{
+ std::ostringstream oss;
+ oss << "Transport stats:\n"
+ << " bytes_read: " << global_stats.bytes_read << "\n"
+ << " bytes_written: " << global_stats.bytes_written << "\n";
+ return oss.str();
+}
diff --git a/wsrep-lib/dbsim/db_tls.hpp b/wsrep-lib/dbsim/db_tls.hpp
new file mode 100644
index 00000000..97c4387e
--- /dev/null
+++ b/wsrep-lib/dbsim/db_tls.hpp
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2020 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef WSREP_DB_TLS_HPP
+#define WSREP_DB_TLS_HPP
+
+#include "wsrep/tls_service.hpp"
+
+#include <string>
+
+namespace db
+{
+ class tls : public wsrep::tls_service
+ {
+ public:
+ virtual wsrep::tls_stream* create_tls_stream(int)
+ WSREP_NOEXCEPT override;
+ virtual void destroy(wsrep::tls_stream*) WSREP_NOEXCEPT override;
+ virtual int get_error_number(const wsrep::tls_stream*) const
+ WSREP_NOEXCEPT override;
+ virtual const void* get_error_category(const wsrep::tls_stream*) const
+ WSREP_NOEXCEPT override;
+ virtual const char* get_error_message(const wsrep::tls_stream*,
+ int, const void*) const
+ WSREP_NOEXCEPT override;
+ virtual enum wsrep::tls_service::status
+ client_handshake(wsrep::tls_stream*)
+ WSREP_NOEXCEPT override;
+ virtual enum wsrep::tls_service::status
+ server_handshake(wsrep::tls_stream*)
+ WSREP_NOEXCEPT override;
+ virtual wsrep::tls_service::op_result
+ read(wsrep::tls_stream*, void* buf, size_t max_count)
+ WSREP_NOEXCEPT override;
+ virtual wsrep::tls_service::op_result
+ write(wsrep::tls_stream*, const void* buf, size_t count)
+ WSREP_NOEXCEPT override;
+ virtual wsrep::tls_service::status
+ shutdown(wsrep::tls_stream*) WSREP_NOEXCEPT override;
+
+ static void init(int mode);
+ static std::string stats();
+ };
+};
+
+#endif // WSREP_DB_TLS_HPP
diff --git a/wsrep-lib/dbsim/dbsim.cpp b/wsrep-lib/dbsim/dbsim.cpp
new file mode 100644
index 00000000..070f83b4
--- /dev/null
+++ b/wsrep-lib/dbsim/dbsim.cpp
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2018 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "db_params.hpp"
+#include "db_simulator.hpp"
+
+int main(int argc, char** argv)
+{
+ try
+ {
+ db::simulator(db::parse_args(argc, argv)).run();
+ }
+ catch (const std::exception& e)
+ {
+ std::cerr << e.what() << std::endl;
+ return 1;
+ }
+ return 0;
+}