diff options
Diffstat (limited to '')
26 files changed, 3476 insertions, 0 deletions
diff --git a/wsrep-lib/dbsim/CMakeLists.txt b/wsrep-lib/dbsim/CMakeLists.txt new file mode 100644 index 00000000..aee964cd --- /dev/null +++ b/wsrep-lib/dbsim/CMakeLists.txt @@ -0,0 +1,21 @@ +# +# Copyright (C) 2018 Codership Oy <info@codership.com> +# + +add_executable(dbsim + db_client.cpp + db_client_service.cpp + db_high_priority_service.cpp + db_params.cpp + db_server.cpp + db_server_service.cpp + db_server_state.cpp + db_simulator.cpp + db_storage_engine.cpp + db_threads.cpp + db_tls.cpp + dbsim.cpp +) + +target_link_libraries(dbsim wsrep-lib ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_THREAD_LIBRARY}) +set_property(TARGET dbsim PROPERTY CXX_STANDARD 14) diff --git a/wsrep-lib/dbsim/db_client.cpp b/wsrep-lib/dbsim/db_client.cpp new file mode 100644 index 00000000..b5d56d37 --- /dev/null +++ b/wsrep-lib/dbsim/db_client.cpp @@ -0,0 +1,195 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "db_client.hpp" +#include "db_server.hpp" + +#include "wsrep/logger.hpp" + +db::client::client(db::server& server, + wsrep::client_id client_id, + enum wsrep::client_state::mode mode, + const db::params& params) + : mutex_() + , cond_() + , params_(params) + , server_(server) + , server_state_(server.server_state()) + , client_state_(mutex_, cond_, server_state_, client_service_, client_id, mode) + , client_service_(*this) + , se_trx_(server.storage_engine()) + , data_() + , random_device_() + , random_engine_(random_device_()) + , stats_() +{ + data_.resize(params.max_data_size); +} + +void db::client::start() +{ + client_state_.open(client_state_.id()); + for (size_t i(0); i < params_.n_transactions; ++i) + { + run_one_transaction(); + report_progress(i + 1); + } + client_state_.close(); + client_state_.cleanup(); +} + +bool db::client::bf_abort(wsrep::seqno seqno) +{ + return client_state_.bf_abort(seqno); +} + +//////////////////////////////////////////////////////////////////////////////// +// Private // +//////////////////////////////////////////////////////////////////////////////// + +template <class F> +int db::client::client_command(F f) +{ + int err(client_state_.before_command()); + // wsrep::log_debug() << "before_command: " << err; + // If err != 0, transaction was BF aborted while client idle + if (err == 0) + { + err = client_state_.before_statement(); + if (err == 0) + { + err = f(); + } + client_state_.after_statement(); + } + client_state_.after_command_before_result(); + if (client_state_.current_error()) + { + // wsrep::log_info() << "Current error"; + assert(client_state_.transaction().state() == + wsrep::transaction::s_aborted); + err = 1; + } + client_state_.after_command_after_result(); + // wsrep::log_info() << "client_command(): " << err; + return err; +} + +void db::client::run_one_transaction() +{ + if (params_.sync_wait) + { + if (client_state_.sync_wait(5)) + { + throw wsrep::runtime_error("Sync wait failed"); + } + } + client_state_.reset_error(); + int err = client_command( + [&]() + { + // wsrep::log_debug() << "Start transaction"; + err = client_state_.start_transaction( + wsrep::transaction_id(server_.next_transaction_id())); + assert(err == 0); + se_trx_.start(this); + return err; + }); + + const wsrep::transaction& transaction( + client_state_.transaction()); + + err = err || client_command( + [&]() + { + // wsrep::log_debug() << "Generate write set"; + assert(transaction.active()); + assert(err == 0); + std::uniform_int_distribution<size_t> uniform_dist(0, params_.n_rows); + const size_t randkey(uniform_dist(random_engine_)); + ::memcpy(data_.data(), &randkey, + std::min(sizeof(randkey), data_.size())); + wsrep::key key(wsrep::key::exclusive); + key.append_key_part("dbms", 4); + unsigned long long client_key(client_state_.id().get()); + key.append_key_part(&client_key, sizeof(client_key)); + key.append_key_part(&randkey, sizeof(randkey)); + err = client_state_.append_key(key); + size_t bytes_to_append(data_.size()); + if (params_.random_data_size) + { + bytes_to_append = std::uniform_int_distribution<size_t>( + 1, data_.size())(random_engine_); + } + err = err || client_state_.append_data( + wsrep::const_buffer(data_.data(), bytes_to_append)); + return err; + }); + + err = err || client_command( + [&]() + { + // wsrep::log_debug() << "Commit"; + assert(err == 0); + if (do_2pc()) + { + err = err || client_state_.before_prepare(); + err = err || client_state_.after_prepare(); + } + err = err || client_state_.before_commit(); + if (err == 0) se_trx_.commit(transaction.ws_meta().gtid()); + err = err || client_state_.ordered_commit(); + err = err || client_state_.after_commit(); + if (err) + { + client_state_.before_rollback(); + se_trx_.rollback(); + client_state_.after_rollback(); + } + return err; + }); + + assert(err || + transaction.state() == wsrep::transaction::s_aborted || + transaction.state() == wsrep::transaction::s_committed); + assert(se_trx_.active() == false); + assert(transaction.active() == false); + + switch (transaction.state()) + { + case wsrep::transaction::s_committed: + ++stats_.commits; + break; + case wsrep::transaction::s_aborted: + ++stats_.rollbacks; + break; + default: + assert(0); + } +} + +void db::client::report_progress(size_t i) const +{ + if ((i % 1000) == 0) + { + wsrep::log_info() << "client: " << client_state_.id().get() + << " transactions: " << i + << " " << 100*double(i)/double(params_.n_transactions) << "%"; + } +} diff --git a/wsrep-lib/dbsim/db_client.hpp b/wsrep-lib/dbsim/db_client.hpp new file mode 100644 index 00000000..5536a449 --- /dev/null +++ b/wsrep-lib/dbsim/db_client.hpp @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file db_client.hpp */ + +#ifndef WSREP_DB_CLIENT_HPP +#define WSREP_DB_CLIENT_HPP + +#include "db_server_state.hpp" +#include "db_storage_engine.hpp" +#include "db_client_state.hpp" +#include "db_client_service.hpp" +#include "db_high_priority_service.hpp" + +#include <random> + +namespace db +{ + class server; + class client + { + public: + struct stats + { + long long commits; + long long rollbacks; + long long replays; + stats() + : commits(0) + , rollbacks(0) + , replays(0) + { } + }; + client(db::server&, + wsrep::client_id, + enum wsrep::client_state::mode, + const db::params&); + bool bf_abort(wsrep::seqno); + const struct stats stats() const { return stats_; } + void store_globals() + { + client_state_.store_globals(); + } + void reset_globals() + { } + void start(); + wsrep::client_state& client_state() { return client_state_; } + wsrep::client_service& client_service() { return client_service_; } + bool do_2pc() const { return false; } + private: + friend class db::server_state; + friend class db::client_service; + friend class db::high_priority_service; + template <class F> int client_command(F f); + void run_one_transaction(); + void reset_error(); + void report_progress(size_t) const; + wsrep::default_mutex mutex_; + wsrep::default_condition_variable cond_; + const db::params& params_; + db::server& server_; + db::server_state& server_state_; + db::client_state client_state_; + db::client_service client_service_; + db::storage_engine::transaction se_trx_; + wsrep::mutable_buffer data_; + std::random_device random_device_; + std::default_random_engine random_engine_; + struct stats stats_; + }; +} + +#endif // WSREP_DB_CLIENT_HPP diff --git a/wsrep-lib/dbsim/db_client_service.cpp b/wsrep-lib/dbsim/db_client_service.cpp new file mode 100644 index 00000000..bb40017c --- /dev/null +++ b/wsrep-lib/dbsim/db_client_service.cpp @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "db_client_service.hpp" +#include "db_high_priority_service.hpp" +#include "db_client.hpp" + +#include <cassert> + +db::client_service::client_service(db::client& client) + : wsrep::client_service() + , client_(client) + , client_state_(client_.client_state()) +{ } + +int db::client_service::bf_rollback() +{ + int ret(client_state_.before_rollback()); + assert(ret == 0); + client_.se_trx_.rollback(); + ret = client_state_.after_rollback(); + assert(ret == 0); + return ret; +} + +enum wsrep::provider::status +db::client_service::replay() +{ + wsrep::high_priority_context high_priority_context(client_state_); + db::replayer_service replayer_service( + client_.server_, client_); + auto ret(client_state_.provider().replay( + client_state_.transaction().ws_handle(), + &replayer_service)); + if (ret == wsrep::provider::success) + { + ++client_.stats_.replays; + } + return ret; +} diff --git a/wsrep-lib/dbsim/db_client_service.hpp b/wsrep-lib/dbsim/db_client_service.hpp new file mode 100644 index 00000000..be6f9ad8 --- /dev/null +++ b/wsrep-lib/dbsim/db_client_service.hpp @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_DB_CLIENT_SERVICE_HPP +#define WSREP_DB_CLIENT_SERVICE_HPP + +#include "wsrep/client_service.hpp" +#include "wsrep/transaction.hpp" + +namespace db +{ + class client; + class client_state; + + class client_service : public wsrep::client_service + { + public: + client_service(db::client& client); + + bool interrupted(wsrep::unique_lock<wsrep::mutex>&) + const override + { return false; } + void reset_globals() override { } + void store_globals() override { } + int prepare_data_for_replication() override + { + return 0; + } + void cleanup_transaction() override { } + size_t bytes_generated() const override + { + return 0; + } + bool statement_allowed_for_streaming() const override + { + return true; + } + int prepare_fragment_for_replication(wsrep::mutable_buffer&, + size_t& position) override + { + position = 0; + return 0; + } + int remove_fragments() override { return 0; } + int bf_rollback() override; + void will_replay() override { } + void signal_replayed() override { } + void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) override { } + enum wsrep::provider::status replay() + override; + + enum wsrep::provider::status replay_unordered() override + { + return wsrep::provider::success; + } + + void emergency_shutdown() override { ::abort(); } + + enum wsrep::provider::status commit_by_xid() override + { + return wsrep::provider::success; + } + + bool is_explicit_xa() override + { + return false; + } + + bool is_xa_rollback() override + { + return false; + } + + void debug_sync(const char*) override { } + void debug_crash(const char*) override { } + private: + db::client& client_; + wsrep::client_state& client_state_; + }; +} + +#endif // WSREP_DB_CLIENT_SERVICE_HPP diff --git a/wsrep-lib/dbsim/db_client_state.hpp b/wsrep-lib/dbsim/db_client_state.hpp new file mode 100644 index 00000000..a5cf71b2 --- /dev/null +++ b/wsrep-lib/dbsim/db_client_state.hpp @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_DB_CLIENT_CONTEXT_HPP +#define WSREP_DB_CLIENT_CONTEXT_HPP + +#include "wsrep/client_state.hpp" +#include "db_server_state.hpp" + +namespace db +{ + class client; + class client_state : public wsrep::client_state + { + public: + client_state(wsrep::mutex& mutex, + wsrep::condition_variable& cond, + db::server_state& server_state, + wsrep::client_service& client_service, + const wsrep::client_id& client_id, + enum wsrep::client_state::mode mode) + : wsrep::client_state(mutex, + cond, + server_state, + client_service, + client_id, + mode) + { } + + private: + client_state(const client_state&); + client_state& operator=(const client_state&); + }; +} + +#endif // WSREP_DB_CLIENT_CONTEXT diff --git a/wsrep-lib/dbsim/db_high_priority_service.cpp b/wsrep-lib/dbsim/db_high_priority_service.cpp new file mode 100644 index 00000000..669fe502 --- /dev/null +++ b/wsrep-lib/dbsim/db_high_priority_service.cpp @@ -0,0 +1,137 @@ +/* + * Copyright (C) 2018-2019 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "db_high_priority_service.hpp" +#include "db_server.hpp" +#include "db_client.hpp" + +db::high_priority_service::high_priority_service( + db::server& server, db::client& client) + : wsrep::high_priority_service(server.server_state()) + , server_(server) + , client_(client) +{ } + +int db::high_priority_service::start_transaction( + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) +{ + return client_.client_state().start_transaction(ws_handle, ws_meta); +} + +int db::high_priority_service::next_fragment(const wsrep::ws_meta& ws_meta) +{ + return client_.client_state().next_fragment(ws_meta); +} + +const wsrep::transaction& db::high_priority_service::transaction() const +{ + return client_.client_state().transaction(); +} + +int db::high_priority_service::adopt_transaction(const wsrep::transaction&) +{ + throw wsrep::not_implemented_error(); +} + +int db::high_priority_service::apply_write_set( + const wsrep::ws_meta&, + const wsrep::const_buffer&, + wsrep::mutable_buffer&) +{ + client_.se_trx_.start(&client_); + client_.se_trx_.apply(client_.client_state().transaction()); + return 0; +} + +int db::high_priority_service::apply_toi( + const wsrep::ws_meta&, + const wsrep::const_buffer&, + wsrep::mutable_buffer&) +{ + throw wsrep::not_implemented_error(); +} + +int db::high_priority_service::apply_nbo_begin( + const wsrep::ws_meta&, + const wsrep::const_buffer&, + wsrep::mutable_buffer&) +{ + throw wsrep::not_implemented_error(); +} + +int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) +{ + client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true); + int ret(client_.client_state_.before_commit()); + if (ret == 0) client_.se_trx_.commit(ws_meta.gtid()); + ret = ret || client_.client_state_.ordered_commit(); + ret = ret || client_.client_state_.after_commit(); + return ret; +} + +int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) +{ + client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, false); + int ret(client_.client_state_.before_rollback()); + assert(ret == 0); + client_.se_trx_.rollback(); + ret = client_.client_state_.after_rollback(); + assert(ret == 0); + return ret; +} + +void db::high_priority_service::adopt_apply_error(wsrep::mutable_buffer& err) +{ + client_.client_state_.adopt_apply_error(err); +} + +void db::high_priority_service::after_apply() +{ + client_.client_state_.after_applying(); +} + +int db::high_priority_service::log_dummy_write_set( + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + wsrep::mutable_buffer& err) +{ + int ret(client_.client_state_.start_transaction(ws_handle, ws_meta)); + assert(ret == 0); + if (ws_meta.ordered()) + { + client_.client_state_.adopt_apply_error(err); + client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true); + ret = client_.client_state_.before_commit(); + assert(ret == 0); + ret = client_.client_state_.ordered_commit(); + assert(ret == 0); + ret = client_.client_state_.after_commit(); + assert(ret == 0); + } + client_.client_state_.after_applying(); + return ret; +} + +bool db::high_priority_service::is_replaying() const +{ + return false; +} diff --git a/wsrep-lib/dbsim/db_high_priority_service.hpp b/wsrep-lib/dbsim/db_high_priority_service.hpp new file mode 100644 index 00000000..d4a80f1b --- /dev/null +++ b/wsrep-lib/dbsim/db_high_priority_service.hpp @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2018-2019 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_DB_HIGH_PRIORITY_SERVICE_HPP +#define WSREP_DB_HIGH_PRIORITY_SERVICE_HPP + +#include "wsrep/high_priority_service.hpp" + +namespace db +{ + class server; + class client; + class high_priority_service : public wsrep::high_priority_service + { + public: + high_priority_service(db::server& server, db::client& client); + int start_transaction(const wsrep::ws_handle&, + const wsrep::ws_meta&) override; + int next_fragment(const wsrep::ws_meta&) override; + const wsrep::transaction& transaction() const override; + int adopt_transaction(const wsrep::transaction&) override; + int apply_write_set(const wsrep::ws_meta&, + const wsrep::const_buffer&, + wsrep::mutable_buffer&) override; + int append_fragment_and_commit( + const wsrep::ws_handle&, + const wsrep::ws_meta&, + const wsrep::const_buffer&, + const wsrep::xid&) override + { return 0; } + int remove_fragments(const wsrep::ws_meta&) override + { return 0; } + int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override; + int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) override; + int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&, + wsrep::mutable_buffer&) override; + int apply_nbo_begin(const wsrep::ws_meta&, const wsrep::const_buffer&, + wsrep::mutable_buffer&) + override; + void adopt_apply_error(wsrep::mutable_buffer&) override; + virtual void after_apply() override; + void store_globals() override { } + void reset_globals() override { } + void switch_execution_context(wsrep::high_priority_service&) override + { } + int log_dummy_write_set(const wsrep::ws_handle&, + const wsrep::ws_meta&, + wsrep::mutable_buffer&) override; + virtual bool is_replaying() const override; + void debug_crash(const char*) override { } + private: + high_priority_service(const high_priority_service&); + high_priority_service& operator=(const high_priority_service&); + db::server& server_; + db::client& client_; + }; + + class replayer_service : public db::high_priority_service + { + public: + replayer_service(db::server& server, db::client& client) + : db::high_priority_service(server, client) + { } + // After apply is empty for replayer to keep the transaction + // context available for the client session after replaying + // is over. + void after_apply() override {} + bool is_replaying() const override { return true; } + }; + +} + +#endif // WSREP_DB_HIGH_PRIORITY_SERVICE_HPP diff --git a/wsrep-lib/dbsim/db_params.cpp b/wsrep-lib/dbsim/db_params.cpp new file mode 100644 index 00000000..40433f6c --- /dev/null +++ b/wsrep-lib/dbsim/db_params.cpp @@ -0,0 +1,124 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "db_params.hpp" + +#include <boost/program_options.hpp> +#include <iostream> +#include <stdexcept> + +namespace +{ + void validate_params(const db::params& params) + { + std::ostringstream os; + if (params.n_servers != params.topology.size()) + { + if (params.topology.size() > 0) + { + os << "Error: --topology=" << params.topology << " does not " + << "match the number of server --servers=" + << params.n_servers << "\n"; + } + } + if (os.str().size()) + { + throw std::invalid_argument(os.str()); + } + } +} + +db::params db::parse_args(int argc, char** argv) +{ + namespace po = boost::program_options; + db::params params; + po::options_description desc("Allowed options"); + desc.add_options() + ("help", "produce help message") + ("wsrep-provider", + po::value<std::string>(¶ms.wsrep_provider)->required(), + "wsrep provider to load") + ("wsrep-provider-options", + po::value<std::string>(¶ms.wsrep_provider_options), + "wsrep provider options") + ("status-file", + po::value<std::string>(¶ms.status_file), + "status output file") + ("servers", po::value<size_t>(¶ms.n_servers)->required(), + "number of servers to start") + ("topology", po::value<std::string>(¶ms.topology), + "replication topology (e.g. mm for multi master, ms for master/slave") + ("clients", po::value<size_t>(¶ms.n_clients)->required(), + "number of clients to start per master") + ("transactions", po::value<size_t>(¶ms.n_transactions), + "number of transactions run by a client") + ("rows", po::value<size_t>(¶ms.n_rows), + "number of rows per table") + ("max-data-size", po::value<size_t>(¶ms.max_data_size), + "maximum size of data payload (default 8)") + ("random-data-size", po::value<bool>(¶ms.random_data_size), + "randomized payload data size (default 0)") + ("alg-freq", po::value<size_t>(¶ms.alg_freq), + "ALG frequency") + ("sync-wait", po::value<bool>(¶ms.sync_wait), + "Turn on sync wait for each transaction") + ("debug-log-level", po::value<int>(¶ms.debug_log_level), + "debug logging level: 0 - none, 1 - verbose") + ("fast-exit", po::value<int>(¶ms.fast_exit), + "exit from simulation without graceful shutdown") + ("ti", + po::value<int>(¶ms.thread_instrumentation), + "use instrumentation for threads/mutexes/condition variables" + "(0 default disabled, 1 total counts, 2 per object)") + ("ti-cond-checks", + po::value<bool>(¶ms.cond_checks), + "Enable checks for correct condition variable use. " + " Effective only if thread-instrumentation is enabled") + ("tls-service", + po::value<int>(¶ms.tls_service), + "Configure TLS service stubs.\n0 default disabled\n1 enabled\n" + "2 enabled with short read/write and renegotiation simulation\n" + "3 enabled with error simulation.") + ; + try + { + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, desc), vm); + if (vm.count("help")) + { + std::cerr << desc << "\n"; + exit(0); + } + po::notify(vm); + validate_params(params); + } + catch (const po::error& e) + { + std::cerr << "Error parsing arguments: " << e.what() << "\n"; + std::cerr << desc << "\n"; + exit(1); + } + catch (...) + { + std::cerr << "Error parsing arguments\n"; + std::cerr << desc << "\n"; + exit(1); + } + return params; +} diff --git a/wsrep-lib/dbsim/db_params.hpp b/wsrep-lib/dbsim/db_params.hpp new file mode 100644 index 00000000..e5df8062 --- /dev/null +++ b/wsrep-lib/dbsim/db_params.hpp @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_DB_PARAMS_HPP +#define WSREP_DB_PARAMS_HPP + +#include <cstddef> +#include <string> + +namespace db +{ + struct params + { + size_t n_servers; + size_t n_clients; + size_t n_transactions; + size_t n_rows; + size_t max_data_size; // Maximum size of write set data payload. + bool random_data_size; // If true, randomize data payload size. + size_t alg_freq; + bool sync_wait; + std::string topology; + std::string wsrep_provider; + std::string wsrep_provider_options; + std::string status_file; + int debug_log_level; + int fast_exit; + int thread_instrumentation; + bool cond_checks; + int tls_service; + params() + : n_servers(0) + , n_clients(0) + , n_transactions(0) + , n_rows(1000) + , max_data_size(8) + , random_data_size(false) + , alg_freq(0) + , sync_wait(false) + , topology() + , wsrep_provider() + , wsrep_provider_options() + , status_file("status.json") + , debug_log_level(0) + , fast_exit(0) + , thread_instrumentation() + , cond_checks() + , tls_service() + { } + }; + + params parse_args(int argc, char** argv); +} + +#endif // WSREP_DB_PARAMS_HPP diff --git a/wsrep-lib/dbsim/db_server.cpp b/wsrep-lib/dbsim/db_server.cpp new file mode 100644 index 00000000..a54610d1 --- /dev/null +++ b/wsrep-lib/dbsim/db_server.cpp @@ -0,0 +1,167 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "db_server.hpp" +#include "db_server_service.hpp" +#include "db_high_priority_service.hpp" +#include "db_client.hpp" +#include "db_simulator.hpp" + +#include "wsrep/logger.hpp" + +#include <ostream> +#include <cstdio> + +static wsrep::default_mutex logger_mtx; + +static void +logger_fn(wsrep::log::level l, const char* pfx, const char* msg) +{ + wsrep::unique_lock<wsrep::mutex> lock(logger_mtx); + + struct timespec time; + clock_gettime(CLOCK_REALTIME, &time); + + time_t const tt(time.tv_sec); + struct tm date; + localtime_r(&tt, &date); + + char date_str[85] = { '\0', }; + snprintf(date_str, sizeof(date_str) - 1, + "%04d-%02d-%02d %02d:%02d:%02d.%03d", + date.tm_year + 1900, date.tm_mon + 1, date.tm_mday, + date.tm_hour, date.tm_min, date.tm_sec, (int)time.tv_nsec/1000000); + + std::cerr << date_str << ' ' << pfx << wsrep::log::to_c_string(l) << ' ' + << msg << std::endl; +} + +db::server::server(simulator& simulator, + const std::string& name, + const std::string& address) + : simulator_(simulator) + , storage_engine_(simulator_.params()) + , mutex_() + , cond_() + , server_service_(*this) + , reporter_(mutex_, name + ".json", 4) + , server_state_(server_service_, + name, address, "dbsim_" + name + "_data") + , last_client_id_(0) + , last_transaction_id_(0) + , appliers_() + , clients_() + , client_threads_() +{ + wsrep::log::logger_fn(logger_fn); +} + +void db::server::applier_thread() +{ + wsrep::client_id client_id(last_client_id_.fetch_add(1) + 1); + db::client applier(*this, client_id, + wsrep::client_state::m_high_priority, + simulator_.params()); + wsrep::client_state* cc(static_cast<wsrep::client_state*>( + &applier.client_state())); + db::high_priority_service hps(*this, applier); + cc->open(cc->id()); + cc->before_command(); + enum wsrep::provider::status ret( + server_state_.provider().run_applier(&hps)); + wsrep::log_info() << "Applier thread exited with error code " << ret; + cc->after_command_before_result(); + cc->after_command_after_result(); + cc->close(); + cc->cleanup(); +} + +void db::server::start_applier() +{ + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + appliers_.push_back(boost::thread(&server::applier_thread, this)); +} + +void db::server::stop_applier() +{ + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + appliers_.front().join(); + appliers_.erase(appliers_.begin()); +} + + +void db::server::start_clients() +{ + size_t n_clients(simulator_.params().n_clients); + for (size_t i(0); i < n_clients; ++i) + { + start_client(i + 1); + } +} + +void db::server::stop_clients() +{ + for (auto& i : client_threads_) + { + i.join(); + } + for (const auto& i : clients_) + { + const struct db::client::stats& stats(i->stats()); + simulator_.stats_.commits += stats.commits; + simulator_.stats_.rollbacks += stats.rollbacks; + simulator_.stats_.replays += stats.replays; + } +} + +void db::server::client_thread(const std::shared_ptr<db::client>& client) +{ + client->start(); +} + +void db::server::start_client(size_t id) +{ + auto client(std::make_shared<db::client>( + *this, wsrep::client_id(id), + wsrep::client_state::m_local, + simulator_.params())); + clients_.push_back(client); + client_threads_.push_back( + boost::thread(&db::server::client_thread, this, client)); +} + +void db::server::donate_sst(const std::string& req, + const wsrep::gtid& gtid, + bool bypass) +{ + simulator_.sst(*this, req, gtid, bypass); +} + + +wsrep::high_priority_service* db::server::streaming_applier_service() +{ + throw wsrep::not_implemented_error(); +} + +void db::server::log_state_change(enum wsrep::server_state::state from, + enum wsrep::server_state::state to) +{ + wsrep::log_info() << "State changed " << from << " -> " << to; + reporter_.report_state(to); +} diff --git a/wsrep-lib/dbsim/db_server.hpp b/wsrep-lib/dbsim/db_server.hpp new file mode 100644 index 00000000..98b9a837 --- /dev/null +++ b/wsrep-lib/dbsim/db_server.hpp @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_DB_SERVER_HPP +#define WSREP_DB_SERVER_HPP + +#include "wsrep/gtid.hpp" +#include "wsrep/client_state.hpp" +#include "wsrep/reporter.hpp" + +#include "db_storage_engine.hpp" +#include "db_server_state.hpp" +#include "db_server_service.hpp" + +#include <boost/thread.hpp> + +#include <string> +#include <memory> + +namespace db +{ + class simulator; + class client; + class server + { + public: + server(simulator& simulator, + const std::string& name, + const std::string& address); + void applier_thread(); + void start_applier(); + void stop_applier(); + void start_clients(); + void stop_clients(); + void client_thread(const std::shared_ptr<db::client>& client); + db::storage_engine& storage_engine() { return storage_engine_; } + db::server_state& server_state() { return server_state_; } + wsrep::transaction_id next_transaction_id() + { + return wsrep::transaction_id(last_transaction_id_.fetch_add(1) + 1); + } + void donate_sst(const std::string&, const wsrep::gtid&, bool); + wsrep::client_state* local_client_state(); + void release_client_state(wsrep::client_state*); + wsrep::high_priority_service* streaming_applier_service(); + void log_state_change(enum wsrep::server_state::state, + enum wsrep::server_state::state); + private: + void start_client(size_t id); + + db::simulator& simulator_; + db::storage_engine storage_engine_; + wsrep::default_mutex mutex_; + wsrep::default_condition_variable cond_; + db::server_service server_service_; + wsrep::reporter reporter_; + db::server_state server_state_; + std::atomic<size_t> last_client_id_; + std::atomic<size_t> last_transaction_id_; + std::vector<boost::thread> appliers_; + std::vector<std::shared_ptr<db::client>> clients_; + std::vector<boost::thread> client_threads_; + }; +} + +#endif // WSREP_DB_SERVER_HPP diff --git a/wsrep-lib/dbsim/db_server_service.cpp b/wsrep-lib/dbsim/db_server_service.cpp new file mode 100644 index 00000000..d9b1cf90 --- /dev/null +++ b/wsrep-lib/dbsim/db_server_service.cpp @@ -0,0 +1,169 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "db_server_service.hpp" +#include "db_server.hpp" +#include "db_storage_service.hpp" + +#include "wsrep/logger.hpp" +#include "wsrep/high_priority_service.hpp" + +db::server_service::server_service(db::server& server) + : server_(server) +{ } + +wsrep::storage_service* db::server_service::storage_service( + wsrep::client_service&) +{ + return new db::storage_service(); +} + +wsrep::storage_service* db::server_service::storage_service( + wsrep::high_priority_service&) +{ + return new db::storage_service(); +} + +void db::server_service::release_storage_service( + wsrep::storage_service* storage_service) +{ + delete storage_service; +} + +wsrep::high_priority_service* db::server_service::streaming_applier_service( + wsrep::client_service&) +{ + return server_.streaming_applier_service(); +} + +wsrep::high_priority_service* db::server_service::streaming_applier_service( + wsrep::high_priority_service&) +{ + return server_.streaming_applier_service(); +} + +void db::server_service::release_high_priority_service( + wsrep::high_priority_service *high_priority_service) +{ + delete high_priority_service; +} + +bool db::server_service::sst_before_init() const +{ + return true; +} + +std::string db::server_service::sst_request() +{ + std::ostringstream os; + os << server_.server_state().name(); + wsrep::log_info() << "SST request: " + << server_.server_state().name(); + + return os.str(); +} + +int db::server_service::start_sst( + const std::string& request, const wsrep::gtid& gtid, bool bypass) +{ + server_.donate_sst(request, gtid, bypass); + return 0; +} + +void db::server_service::background_rollback(wsrep::unique_lock<wsrep::mutex>&, + wsrep::client_state&) +{ +} + +void db::server_service::bootstrap() +{ +} + +void db::server_service::log_message(enum wsrep::log::level level, + const char* message) +{ + wsrep::log(level, server_.server_state().name().c_str()) << message; +} +void db::server_service::log_dummy_write_set( + wsrep::client_state&, const wsrep::ws_meta& meta) +{ + wsrep::log_info() << "Dummy write set: " << meta.seqno(); +} + +void db::server_service::log_view(wsrep::high_priority_service*, + const wsrep::view& v) +{ + wsrep::log_info() << "View:\n" << v; + server_.storage_engine().store_view(v); +} + +void db::server_service::recover_streaming_appliers( + wsrep::client_service&) +{ +} + +void db::server_service::recover_streaming_appliers( + wsrep::high_priority_service&) +{ +} + +wsrep::view db::server_service::get_view(wsrep::client_service&, + const wsrep::id& own_id) +{ + wsrep::view stored_view(server_.storage_engine().get_view()); + int const my_idx(stored_view.member_index(own_id)); + wsrep::view my_view( + stored_view.state_id(), + stored_view.view_seqno(), + stored_view.status(), + stored_view.capabilities(), + my_idx, + stored_view.protocol_version(), + stored_view.members() + ); + return my_view; +} + +wsrep::gtid db::server_service::get_position(wsrep::client_service&) +{ + return server_.storage_engine().get_position(); +} + +void db::server_service::set_position(wsrep::client_service&, + const wsrep::gtid& gtid) +{ + return server_.storage_engine().store_position(gtid); +} + +void db::server_service::log_state_change( + enum wsrep::server_state::state prev_state, + enum wsrep::server_state::state current_state) +{ + server_.log_state_change(prev_state, current_state); +} + +int db::server_service::wait_committing_transactions(int) +{ + throw wsrep::not_implemented_error(); +} + +void db::server_service::debug_sync(const char*) +{ + +} diff --git a/wsrep-lib/dbsim/db_server_service.hpp b/wsrep-lib/dbsim/db_server_service.hpp new file mode 100644 index 00000000..d62ff11f --- /dev/null +++ b/wsrep-lib/dbsim/db_server_service.hpp @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_DB_SERVER_SERVICE_HPP +#define WSREP_DB_SERVER_SERVICE_HPP + +#include "wsrep/server_service.hpp" +#include <string> + +namespace db +{ + class server; + class server_service : public wsrep::server_service + { + public: + server_service(db::server& server); + wsrep::storage_service* storage_service(wsrep::client_service&) override; + wsrep::storage_service* storage_service(wsrep::high_priority_service&) override; + + void release_storage_service(wsrep::storage_service*) override; + wsrep::high_priority_service* streaming_applier_service(wsrep::client_service&) override; + wsrep::high_priority_service* streaming_applier_service(wsrep::high_priority_service&) override; + void release_high_priority_service(wsrep::high_priority_service*) override; + + bool sst_before_init() const override; + int start_sst(const std::string&, const wsrep::gtid&, bool) override; + std::string sst_request() override; + void background_rollback(wsrep::unique_lock<wsrep::mutex>&, + wsrep::client_state&) override; + void bootstrap() override; + void log_message(enum wsrep::log::level, const char* message) override; + void log_dummy_write_set(wsrep::client_state&, const wsrep::ws_meta&) + override; + void log_view(wsrep::high_priority_service*, + const wsrep::view&) override; + void recover_streaming_appliers(wsrep::client_service&) override; + void recover_streaming_appliers(wsrep::high_priority_service&) override; + wsrep::view get_view(wsrep::client_service&, const wsrep::id&) + override; + wsrep::gtid get_position(wsrep::client_service&) override; + void set_position(wsrep::client_service&, const wsrep::gtid&) override; + void log_state_change(enum wsrep::server_state::state, + enum wsrep::server_state::state) override; + int wait_committing_transactions(int) override; + void debug_sync(const char*) override; + private: + db::server& server_; + }; +} + +#endif // WSREP_DB_SERVER_SERVICE_HPP diff --git a/wsrep-lib/dbsim/db_server_state.cpp b/wsrep-lib/dbsim/db_server_state.cpp new file mode 100644 index 00000000..2bc17d29 --- /dev/null +++ b/wsrep-lib/dbsim/db_server_state.cpp @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "db_server_state.hpp" +#include "db_server.hpp" + +#include "wsrep/logger.hpp" + diff --git a/wsrep-lib/dbsim/db_server_state.hpp b/wsrep-lib/dbsim/db_server_state.hpp new file mode 100644 index 00000000..49d4499e --- /dev/null +++ b/wsrep-lib/dbsim/db_server_state.hpp @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_DB_SERVER_CONTEXT_HPP +#define WSREP_DB_SERVER_CONTEXT_HPP + +#include "wsrep/server_state.hpp" +#include "wsrep/server_service.hpp" +#include "wsrep/client_state.hpp" + +#include <atomic> + +namespace db +{ + class server; + class server_state : public wsrep::server_state + { + public: + server_state(wsrep::server_service& server_service, + const std::string& name, + const std::string& address, + const std::string& working_dir) + : wsrep::server_state( + mutex_, + cond_, + server_service, + nullptr, + name, + "", + address, + working_dir, + wsrep::gtid::undefined(), + 1, + wsrep::server_state::rm_async) + , mutex_() + , cond_() + { } + private: + wsrep::default_mutex mutex_; + wsrep::default_condition_variable cond_; + }; +} + +#endif // WSREP_DB_SERVER_CONTEXT_HPP diff --git a/wsrep-lib/dbsim/db_simulator.cpp b/wsrep-lib/dbsim/db_simulator.cpp new file mode 100644 index 00000000..97a18187 --- /dev/null +++ b/wsrep-lib/dbsim/db_simulator.cpp @@ -0,0 +1,267 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "db_simulator.hpp" +#include "db_client.hpp" +#include "db_threads.hpp" +#include "db_tls.hpp" + +#include "wsrep/logger.hpp" + +#include <boost/filesystem.hpp> +#include <sstream> + +static db::ti thread_instrumentation; +static db::tls tls_service; + +void db::simulator::run() +{ + start(); + stop(); + std::flush(std::cerr); + std::cout << "Results:\n"; + std::cout << stats() << std::endl; + std::cout << db::ti::stats() << std::endl; + std::cout << db::tls::stats() << std::endl; +} + +void db::simulator::sst(db::server& server, + const std::string& request, + const wsrep::gtid& gtid, + bool bypass) +{ + // The request may contain extra trailing '\0' after it goes + // through the provider, strip it first. + std::string name(request); + name.erase(std::find(name.begin(), name.end(), '\0'), name.end()); + + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + auto i(servers_.find(name)); + wsrep::log_info() << "SST request '" << name << "'"; + if (i == servers_.end()) + { + wsrep::log_error() << "Server " << request << " not found"; + wsrep::log_info() << "servers:"; + for (const auto& s : servers_) + { + wsrep::log_info() << "server: " << s.first; + } + throw wsrep::runtime_error("Server " + request + " not found"); + } + if (bypass == false) + { + wsrep::log_info() << "SST " + << server.server_state().name() + << " -> " << request; + i->second->storage_engine().store_position(gtid); + i->second->storage_engine().store_view( + server.storage_engine().get_view()); + } + + db::client dummy(*(i->second), wsrep::client_id(-1), + wsrep::client_state::m_local, params()); + + if (i->second->server_state().sst_received(dummy.client_service(), 0)) + { + throw wsrep::runtime_error("Call to SST received failed"); + } + server.server_state().sst_sent(gtid, 0); +} + +std::string db::simulator::stats() const +{ + auto duration(std::chrono::duration<double>( + clients_stop_ - clients_start_).count()); + long long transactions(stats_.commits + stats_.rollbacks); + long long bf_aborts(0); + for (const auto& s : servers_) + { + bf_aborts += s.second->storage_engine().bf_aborts(); + } + std::ostringstream os; + os << "Number of transactions: " << transactions + << "\n" + << "Seconds: " << duration + << " \n" + << "Transactions per second: " << double(transactions)/double(duration) + << "\n" + << "BF aborts: " + << bf_aborts + << "\n" + << "Client commits: " << stats_.commits + << "\n" + << "Client rollbacks: " << stats_.rollbacks + << "\n" + << "Client replays: " << stats_.replays; + return os.str(); +} + +//////////////////////////////////////////////////////////////////////////////// +// Private // +//////////////////////////////////////////////////////////////////////////////// + +void db::simulator::start() +{ + thread_instrumentation.level(params_.thread_instrumentation); + thread_instrumentation.cond_checks(params_.cond_checks); + tls_service.init(params_.tls_service); + wsrep::log_info() << "Provider: " << params_.wsrep_provider; + + std::string cluster_address(build_cluster_address()); + wsrep::log_info() << "Cluster address: " << cluster_address; + for (size_t i(0); i < params_.n_servers; ++i) + { + std::ostringstream name_os; + name_os << (i + 1); + std::ostringstream id_os; + id_os << (i + 1); + std::ostringstream address_os; + address_os << "127.0.0.1:" << server_port(i); + wsrep::id server_id(id_os.str()); + auto it(servers_.insert( + std::make_pair( + name_os.str(), + std::make_unique<db::server>( + *this, + name_os.str(), + address_os.str())))); + if (it.second == false) + { + throw wsrep::runtime_error("Failed to add server"); + } + boost::filesystem::path dir("dbsim_" + id_os.str() + "_data"); + boost::filesystem::create_directory(dir); + + db::server& server(*it.first->second); + server.server_state().debug_log_level(params_.debug_log_level); + std::string server_options(params_.wsrep_provider_options); + + wsrep::provider::services services; + services.thread_service = params_.thread_instrumentation + ? &thread_instrumentation + : nullptr; + services.tls_service = params_.tls_service + ? &tls_service + : nullptr; + if (server.server_state().load_provider(params_.wsrep_provider, + server_options, services)) + { + throw wsrep::runtime_error("Failed to load provider"); + } + if (server.server_state().connect("sim_cluster", cluster_address, "", + i == 0)) + { + throw wsrep::runtime_error("Failed to connect"); + } + wsrep::log_debug() << "main: Starting applier"; + server.start_applier(); + wsrep::log_debug() << "main: Waiting initializing state"; + if (server.server_state().wait_until_state( + wsrep::server_state::s_initializing)) + { + throw wsrep::runtime_error("Failed to reach initializing state"); + } + wsrep::log_debug() << "main: Calling initialized"; + server.server_state().initialized(); + wsrep::log_debug() << "main: Waiting for synced state"; + if (server.server_state().wait_until_state( + wsrep::server_state::s_synced)) + { + throw wsrep::runtime_error("Failed to reach synced state"); + } + wsrep::log_debug() << "main: Server synced"; + } + + // Start client threads + wsrep::log_info() << "####################### Starting client load"; + clients_start_ = std::chrono::steady_clock::now(); + size_t index(0); + for (auto& i : servers_) + { + if (params_.topology.size() == 0 || params_.topology[index] == 'm') + { + i.second->start_clients(); + } + ++index; + } +} + +void db::simulator::stop() +{ + for (auto& i : servers_) + { + db::server& server(*i.second); + server.stop_clients(); + } + clients_stop_ = std::chrono::steady_clock::now(); + wsrep::log_info() << "######## Stats ############"; + wsrep::log_info() << stats(); + std::cout << db::ti::stats() << std::endl; + wsrep::log_info() << "######## Stats ############"; + if (params_.fast_exit) + { + exit(0); + } + for (auto& i : servers_) + { + db::server& server(*i.second); + wsrep::log_info() << "Status for server: " + << server.server_state().id(); + auto status(server.server_state().provider().status()); + for_each(status.begin(), status.end(), + [](const wsrep::provider::status_variable& sv) + { + wsrep::log_info() << sv.name() << " = " << sv.value(); + }); + server.server_state().disconnect(); + if (server.server_state().wait_until_state( + wsrep::server_state::s_disconnected)) + { + throw wsrep::runtime_error("Failed to reach disconnected state"); + } + server.stop_applier(); + server.server_state().unload_provider(); + } +} + +std::string db::simulator::server_port(size_t i) const +{ + std::ostringstream os; + os << (10000 + (i + 1)*10); + return os.str(); +} + +std::string db::simulator::build_cluster_address() const +{ + std::string ret; + if (params_.wsrep_provider.find("galera_smm") != std::string::npos) + { + ret += "gcomm://"; + } + + for (size_t i(0); i < params_.n_servers; ++i) + { + std::ostringstream sa_os; + sa_os << "127.0.0.1:"; + sa_os << server_port(i); + ret += sa_os.str(); + if (i < params_.n_servers - 1) ret += ","; + } + return ret; +} diff --git a/wsrep-lib/dbsim/db_simulator.hpp b/wsrep-lib/dbsim/db_simulator.hpp new file mode 100644 index 00000000..693645e3 --- /dev/null +++ b/wsrep-lib/dbsim/db_simulator.hpp @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_DB_SIMULATOR_HPP +#define WSREP_DB_SIMULATOR_HPP + +#include "wsrep/gtid.hpp" +#include "wsrep/mutex.hpp" +#include "wsrep/lock.hpp" + +#include "db_params.hpp" +#include "db_server.hpp" + +#include <memory> +#include <chrono> +#include <unordered_set> +#include <map> + +namespace db +{ + class server; + class simulator + { + public: + simulator(const params& params) + : mutex_() + , params_(params) + , servers_() + , clients_start_() + , clients_stop_() + , stats_() + { } + + void run(); + void sst(db::server&, + const std::string&, const wsrep::gtid&, bool); + const db::params& params() const + { return params_; } + std::string stats() const; + private: + void start(); + void stop(); + std::string server_port(size_t i) const; + std::string build_cluster_address() const; + + wsrep::default_mutex mutex_; + const db::params& params_; + std::map<std::string, std::unique_ptr<db::server>> servers_; + std::chrono::time_point<std::chrono::steady_clock> clients_start_; + std::chrono::time_point<std::chrono::steady_clock> clients_stop_; + public: + struct stats + { + long long commits; + long long rollbacks; + long long replays; + stats() + : commits(0) + , rollbacks(0) + , replays(0) + { } + } stats_; + }; +} +#endif // WSRE_DB_SIMULATOR_HPP diff --git a/wsrep-lib/dbsim/db_storage_engine.cpp b/wsrep-lib/dbsim/db_storage_engine.cpp new file mode 100644 index 00000000..4278b81d --- /dev/null +++ b/wsrep-lib/dbsim/db_storage_engine.cpp @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "db_storage_engine.hpp" +#include "db_client.hpp" + +#include <cassert> + +void db::storage_engine::transaction::start(db::client* cc) +{ + wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_); + if (se_.transactions_.insert(cc).second == false) + { + ::abort(); + } + cc_ = cc; +} + +void db::storage_engine::transaction::apply( + const wsrep::transaction& transaction) +{ + assert(cc_); + se_.bf_abort_some(transaction); +} + +void db::storage_engine::transaction::commit(const wsrep::gtid& gtid) +{ + if (cc_) + { + wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_); + se_.transactions_.erase(cc_); + se_.store_position(gtid); + } + cc_ = nullptr; +} + + +void db::storage_engine::transaction::rollback() +{ + if (cc_) + { + wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_); + se_.transactions_.erase(cc_); + } + cc_ = nullptr; +} + +void db::storage_engine::bf_abort_some(const wsrep::transaction& txc) +{ + std::uniform_int_distribution<size_t> uniform_dist(0, alg_freq_); + wsrep::unique_lock<wsrep::mutex> lock(mutex_); + if (alg_freq_ && uniform_dist(random_engine_) == 0) + { + if (transactions_.empty() == false) + { + for (auto victim : transactions_) + { + wsrep::client_state& cc(victim->client_state()); + if (cc.mode() == wsrep::client_state::m_local) + { + if (victim->bf_abort(txc.seqno())) + { + ++bf_aborts_; + } + break; + } + } + } + } +} + +void db::storage_engine::store_position(const wsrep::gtid& gtid) +{ + validate_position(gtid); + position_ = gtid; +} + +wsrep::gtid db::storage_engine::get_position() const +{ + return position_; +} + +void db::storage_engine::store_view(const wsrep::view& view) +{ + view_ = view; +} + +wsrep::view db::storage_engine::get_view() const +{ + return view_; +} + +void db::storage_engine::validate_position(const wsrep::gtid& gtid) const +{ + if (position_.id() == gtid.id() && gtid.seqno() <= position_.seqno()) + { + std::ostringstream os; + os << "Invalid position submitted, position seqno " + << position_.seqno() + << " is greater than submitted seqno " + << gtid.seqno(); + throw wsrep::runtime_error(os.str()); + } +} diff --git a/wsrep-lib/dbsim/db_storage_engine.hpp b/wsrep-lib/dbsim/db_storage_engine.hpp new file mode 100644 index 00000000..de5080cf --- /dev/null +++ b/wsrep-lib/dbsim/db_storage_engine.hpp @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_DB_STORAGE_ENGINE_HPP +#define WSREP_DB_STORAGE_ENGINE_HPP + +#include "db_params.hpp" + +#include "wsrep/mutex.hpp" +#include "wsrep/view.hpp" +#include "wsrep/transaction.hpp" + +#include <atomic> +#include <unordered_set> +#include <random> + +namespace db +{ + class client; + class storage_engine + { + public: + storage_engine(const params& params) + : mutex_() + , transactions_() + , alg_freq_(params.alg_freq) + , bf_aborts_() + , position_() + , view_() + , random_device_() + , random_engine_(random_device_()) + { } + + class transaction + { + public: + transaction(storage_engine& se) + : se_(se) + , cc_() + { } + ~transaction() + { + rollback(); + } + bool active() const { return cc_ != nullptr; } + void start(client* cc); + void apply(const wsrep::transaction&); + void commit(const wsrep::gtid&); + void rollback(); + db::client* client() { return cc_; } + transaction(const transaction&) = delete; + transaction& operator=(const transaction&) = delete; + private: + db::storage_engine& se_; + db::client* cc_; + }; + void bf_abort_some(const wsrep::transaction& tc); + long long bf_aborts() const { return bf_aborts_; } + void store_position(const wsrep::gtid& gtid); + wsrep::gtid get_position() const; + void store_view(const wsrep::view& view); + wsrep::view get_view() const; + private: + void validate_position(const wsrep::gtid& gtid) const; + wsrep::default_mutex mutex_; + std::unordered_set<db::client*> transactions_; + size_t alg_freq_; + std::atomic<long long> bf_aborts_; + wsrep::gtid position_; + wsrep::view view_; + std::random_device random_device_; + std::default_random_engine random_engine_; + }; +} + +#endif // WSREP_DB_STORAGE_ENGINE_HPP diff --git a/wsrep-lib/dbsim/db_storage_service.hpp b/wsrep-lib/dbsim/db_storage_service.hpp new file mode 100644 index 00000000..839253db --- /dev/null +++ b/wsrep-lib/dbsim/db_storage_service.hpp @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_DB_STORAGE_SERVICE_HPP +#define WSREP_DB_STORAGE_SERVICE_HPP + +#include "wsrep/storage_service.hpp" +#include "wsrep/exception.hpp" + +namespace db +{ + class storage_service : public wsrep::storage_service + { + int start_transaction(const wsrep::ws_handle&) override + { throw wsrep::not_implemented_error(); } + void adopt_transaction(const wsrep::transaction&) override + { throw wsrep::not_implemented_error(); } + int append_fragment(const wsrep::id&, + wsrep::transaction_id, + int, + const wsrep::const_buffer&, + const wsrep::xid&) override + { throw wsrep::not_implemented_error(); } + int update_fragment_meta(const wsrep::ws_meta&) override + { throw wsrep::not_implemented_error(); } + int remove_fragments() override + { throw wsrep::not_implemented_error(); } + int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override + { throw wsrep::not_implemented_error(); } + int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) + override + { throw wsrep::not_implemented_error(); } + void store_globals() override { } + void reset_globals() override { } + }; +} + +#endif // WSREP_DB_STORAGE_SERVICE_HPP diff --git a/wsrep-lib/dbsim/db_threads.cpp b/wsrep-lib/dbsim/db_threads.cpp new file mode 100644 index 00000000..10b580dd --- /dev/null +++ b/wsrep-lib/dbsim/db_threads.cpp @@ -0,0 +1,729 @@ +/* + * Copyright (C) 2019 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "db_threads.hpp" +#include "wsrep/compiler.hpp" +#include "wsrep/logger.hpp" + +#include <cassert> +#include <cstdint> +#include <pthread.h> + +#include <algorithm> +#include <atomic> +#include <array> +#include <chrono> +#include <map> +#include <mutex> +#include <ostream> +#include <sstream> +#include <thread> +#include <unordered_map> +#include <vector> + +extern "C" { static void* start_thread(void* args_ptr); } +namespace +{ + struct ti_obj + { + }; + enum ti_opcode + { + oc_thread_create, + oc_thread_destroy, + oc_mutex_create, + oc_mutex_destroy, + oc_mutex_lock, + oc_mutex_trylock, + oc_mutex_unlock, + oc_cond_create, + oc_cond_destroy, + oc_cond_wait, + oc_cond_timedwait, + oc_cond_signal, + oc_cond_broadcast, + oc_max // must be the last + }; + + static const char* ti_opstring(enum ti_opcode op) + { + switch (op) + { + case oc_thread_create: return "thread_create"; + case oc_thread_destroy: return "thread_destroy"; + case oc_mutex_create: return "mutex_create"; + case oc_mutex_destroy: return "mutex_destroy"; + case oc_mutex_lock: return "mutex_lock"; + case oc_mutex_trylock: return "mutex_trylock"; + case oc_mutex_unlock: return "mutex_unlock"; + case oc_cond_create: return "cond_create"; + case oc_cond_destroy: return "cond_destroy"; + case oc_cond_wait: return "cond_wait"; + case oc_cond_timedwait: return "cond_timedwait"; + case oc_cond_signal: return "cond_signal"; + case oc_cond_broadcast: return "cond_broadcast"; + default: return "unknown"; + } + } + + static std::vector<std::string> key_vec; + static std::atomic<int> key_cnt; + static std::vector<std::vector<size_t>> ops_map; + static std::vector<std::mutex*> ops_map_sync; + static struct ops_map_sync_deleter + { + ~ops_map_sync_deleter() + { + std::for_each(ops_map_sync.begin(), ops_map_sync.end(), + [](auto entry) { delete entry; }); + } + } ops_map_sync_deleter; + static std::array<std::atomic<size_t>, oc_max> total_ops; + static std::atomic<size_t> total_allocations; + static std::atomic<size_t> mutex_contention; + static std::unordered_map<std::string, size_t> mutex_contention_counts; + static int op_level; + // Check correct condition variable usage: + // - Associated mutex must be locked when waiting for cond + // - There must be at least one waiter when signalling for condition + static bool cond_checks; + static inline void cond_check(bool condition, const char* name, + const char* message) + { + if (cond_checks && !condition) + { + wsrep::log_error() << "Condition variable check failed for '" + << name << "': " << message; + ::abort(); + } + } + static inline int append_key(const char* name, const char* type) + { + + key_vec.push_back(std::string(name) + "_" + type); + wsrep::log_info() << "Register key " << name << "_" << type + << " with index " << (key_cnt + 1); + ops_map.push_back(std::vector<size_t>()); + ops_map_sync.push_back(new std::mutex()); + ops_map.back().resize(oc_max); + return ++key_cnt; + } + + template <class Key> static inline size_t get_key_index(const Key* key) + { + size_t index(reinterpret_cast<const size_t>(key) - 1); + assert(index < key_vec.size()); + return index; + } + + template <class Key> + static inline const char* get_key_name(const Key* key) + { + return key_vec[get_key_index(key)].c_str(); + } + + static inline const std::string& get_key_name_by_index(size_t index) + { + assert(index < key_vec.size()); + return key_vec[index]; + } + + // Note: Do not refer the obj pointer in this function, it may + // have been deleted before the call. + template <class Key> + static inline void update_ops(const ti_obj* obj, + const Key* key, + enum ti_opcode op) + { + if (op_level < 1) + return; + total_ops[op] += 1; + if (op_level < 2) + return; + if (false && op == oc_mutex_destroy) + { + wsrep::log_info() << "thread: " << std::this_thread::get_id() + << " object: " << obj + << ": name: " << get_key_name(key) + << " op: " << ti_opstring(op); + } + + std::lock_guard<std::mutex> lock(*ops_map_sync[get_key_index(key)]); + ops_map[get_key_index(key)][op] += 1; + } + + struct thread_args + { + void* this_thread; + void* (*fn)(void*); + void* args; + }; + + pthread_key_t this_thread_key; + struct this_thread_key_initializer + { + this_thread_key_initializer() + { + pthread_key_create(&this_thread_key, nullptr); + } + + ~this_thread_key_initializer() + { + pthread_key_delete(this_thread_key); + } + }; + + + class ti_thread : public ti_obj + { + public: + ti_thread(const wsrep::thread_service::thread_key* key) + : key_(key) + , th_() + , retval_() + , detached_() + { + update_ops(this, key_, oc_thread_create); + } + ~ti_thread() + { + update_ops(this, key_, oc_thread_destroy); + } + + ti_thread(const ti_thread&) = delete; + ti_thread& operator=(const ti_thread&) = delete; + int run(void* (*fn)(void *), void* args) + { + auto ta(new thread_args{this, fn, args}); + return pthread_create(&th_, nullptr, start_thread, ta); + } + + int detach() + { + detached_ = true; + return pthread_detach(th_); + } + + int join(void** retval) + { + return pthread_join(th_, retval); + } + + bool detached() const { return detached_; } + + void retval(void* retval) { retval_ = retval; } + + static ti_thread* self() + { + return reinterpret_cast<ti_thread*>( + pthread_getspecific(this_thread_key)); + } + + int setschedparam(int policy, const struct sched_param* param) + { + return pthread_setschedparam(th_, policy, param); + } + + int getschedparam(int* policy, struct sched_param* param) + { + return pthread_getschedparam(th_, policy, param); + } + + int equal(ti_thread* other) + { + return pthread_equal(th_, other->th_); + } + private: + const wsrep::thread_service::thread_key* key_; + pthread_t th_; + void* retval_; + bool detached_; + }; + + class ti_mutex : public ti_obj + { + public: + ti_mutex(const wsrep::thread_service::mutex_key* key, bool inplace) + : mutex_(PTHREAD_MUTEX_INITIALIZER) + , key_(key) + , inplace_(inplace) +#ifndef NDEBUG + , locked_() + , owner_() +#endif // ! NDEBUG + { + update_ops(this, key_, oc_mutex_create); + if (not inplace) total_allocations++; + } + + ~ti_mutex() { update_ops(this, key_, oc_mutex_destroy); } + + ti_mutex& operator=(const ti_mutex&) = delete; + ti_mutex(const ti_mutex&) = delete; + + int lock() + { + update_ops(this, key_, oc_mutex_lock); + int ret(pthread_mutex_trylock(&mutex_)); + if (ret == EBUSY) + { + mutex_contention++; + { + std::lock_guard<std::mutex> lock(*ops_map_sync[get_key_index(key_)]); + mutex_contention_counts[get_key_name(key_)] += 1; + } + ret = pthread_mutex_lock(&mutex_); + } +#ifndef NDEBUG + if (ret == 0) + { + assert(owner_ == std::thread::id()); + locked_ = true; + owner_ = std::this_thread::get_id(); + } +#endif // ! NDEBUG + return ret; + } + int trylock() + { + update_ops(this, key_, oc_mutex_trylock); + int ret(pthread_mutex_trylock(&mutex_)); +#ifndef NDEBUG + if (ret == 0) + { + assert(owner_ == std::thread::id()); + locked_ = true; + owner_ = std::this_thread::get_id(); + } +#endif // ! NDEBUG + return ret; + } + + int unlock() + { + assert(locked_); +#ifndef NDEBUG + assert(owner_ == std::this_thread::get_id()); + owner_ = std::thread::id(); +#endif // ! NDEBUG + // Use temporary object. After mutex is unlocked it may be + // destroyed before this update_ops() finishes. + auto key(key_); + int ret(pthread_mutex_unlock(&mutex_)); + update_ops(this, key, oc_mutex_unlock); + return ret; + } + + struct condwait_context + { +#ifndef NDEBUG + bool locked; + std::thread::id owner; +#endif // ! NDEBUG + }; + + condwait_context save_for_condwait() + { +#ifndef NDEBUG + return condwait_context{ locked_, owner_ }; +#else + return condwait_context{}; +#endif // ! NDEBUG + } + + void reset() + { +#ifndef NDEBUG + locked_ = false; + owner_ = std::thread::id(); +#endif // ! NDEBUG + } + + void restore_from_condwait(const condwait_context& ctx WSREP_UNUSED) + { +#ifndef NDEBUG + locked_ = ctx.locked; + owner_ = ctx.owner; +#endif // ! NDEBUG + } + + pthread_mutex_t* native_handle() { return &mutex_; } + const wsrep::thread_service::mutex_key* key() const { return key_; } + + bool inplace() const { return inplace_; } + private: + pthread_mutex_t mutex_; + const wsrep::thread_service::mutex_key* key_; + const bool inplace_; +#ifndef NDEBUG + bool locked_; + std::atomic<std::thread::id> owner_; +#endif // ! NDEBU + }; + + class ti_cond : public ti_obj + { + public: + ti_cond(const wsrep::thread_service::cond_key* key, bool inplace) + : cond_(PTHREAD_COND_INITIALIZER) + , key_(key) + , inplace_(inplace) + , waiter_() + { + update_ops(this, key_, oc_cond_create); + if (not inplace) total_allocations++; + } + + ~ti_cond() { update_ops(this, key_, oc_cond_destroy); } + + ti_cond& operator=(const ti_cond&) = delete; + ti_cond(const ti_cond&) = delete; + + int wait(ti_mutex& mutex) + { + cond_check(pthread_mutex_trylock(mutex.native_handle()), + get_key_name(key_), "Mutex not locked in cond wait"); + waiter_ = true; + update_ops(this, key_, oc_cond_wait); + // update_ops(&mutex, mutex.key(), oc_mutex_unlock); + auto condwait_ctx(mutex.save_for_condwait()); + mutex.reset(); + int ret(pthread_cond_wait(&cond_, mutex.native_handle())); + // update_ops(&mutex, mutex.key(), oc_mutex_lock); + mutex.restore_from_condwait(condwait_ctx); + waiter_ = false; + return ret; + } + + int timedwait(ti_mutex& mutex, const struct timespec* ts) + { + cond_check(pthread_mutex_trylock(mutex.native_handle()), + get_key_name(key_), "Mutex not locked in cond wait"); + waiter_ = true; + update_ops(this, key_, oc_cond_timedwait); + // update_ops(&mutex, mutex.key(), oc_mutex_unlock); + auto condwait_ctx(mutex.save_for_condwait()); + mutex.reset(); + int ret(pthread_cond_timedwait(&cond_, mutex.native_handle(), ts)); + // update_ops(&mutex, mutex.key(), oc_mutex_lock); + mutex.restore_from_condwait(condwait_ctx); + waiter_ = false; + return ret; + } + + int signal() + { + update_ops(this, key_, oc_cond_signal); + cond_check(waiter_, get_key_name(key_), + "Signalling condition variable without waiter"); + return pthread_cond_signal(&cond_); + } + + int broadcast() + { + update_ops(this, key_, oc_cond_broadcast); + return pthread_cond_broadcast(&cond_); + } + + bool inplace() const { return inplace_; } + private: + pthread_cond_t cond_; + const wsrep::thread_service::cond_key* key_; + const bool inplace_; + bool waiter_; + }; +} + +int db::ti::before_init() +{ + wsrep::log_info() << "db::ti::before_init()"; + return 0; +} + +int db::ti::after_init() +{ + wsrep::log_info() << "db::ti::after_init()"; + return 0; +} + +////////////////////////////////////////////////////////////////////////////// +// Thread // +////////////////////////////////////////////////////////////////////////////// + +extern "C" +{ +static void* start_thread(void* args_ptr) +{ + thread_args* ta(reinterpret_cast<thread_args*>(args_ptr)); + ti_thread* thread = reinterpret_cast<ti_thread*>(ta->this_thread); + pthread_setspecific(this_thread_key, thread); + void* (*fn)(void*) = ta->fn; + void* args = ta->args; + delete ta; + void* ret = (*fn)(args); + pthread_setspecific(this_thread_key, nullptr); + // If we end here the thread returned instead of calling + // pthread_exit() + if (thread->detached()) + delete thread; + return ret; +} + +WSREP_NORETURN +static void exit_thread(wsrep::thread_service::thread* thread, void* retval) +{ + pthread_setspecific(this_thread_key, nullptr); + ti_thread* th(reinterpret_cast<ti_thread*>(thread)); + th->retval(retval); + if (th->detached()) + delete th; + pthread_exit(retval); +} +} // extern "C" + +db::ti::ti() +{ + thread_service::exit = exit_thread; +} + +const wsrep::thread_service::thread_key* +db::ti::create_thread_key(const char* name) WSREP_NOEXCEPT +{ + assert(name); + return reinterpret_cast<const wsrep::thread_service::thread_key*>( + append_key(name, "thread")); +} + +int db::ti::create_thread(const wsrep::thread_service::thread_key* key, + wsrep::thread_service::thread** thread, + void* (*fn)(void*), void* args) WSREP_NOEXCEPT +{ + auto pit(new ti_thread(key)); + total_allocations++; + int ret; + if ((ret = pit->run(fn, args))) + { + delete pit; + } + else + { + *thread = reinterpret_cast<wsrep::thread_service::thread*>(pit); + } + return ret; +} + +int db::ti::detach(wsrep::thread_service::thread* thread) WSREP_NOEXCEPT +{ + return reinterpret_cast<ti_thread*>(thread)->detach(); +} + +int db::ti::equal(wsrep::thread_service::thread* thread_1, + wsrep::thread_service::thread* thread_2) WSREP_NOEXCEPT +{ + return (reinterpret_cast<ti_thread*>(thread_1)->equal( + reinterpret_cast<ti_thread*>(thread_2))); +} + +int db::ti::join(wsrep::thread_service::thread* thread, void** retval) WSREP_NOEXCEPT +{ + ti_thread* th(reinterpret_cast<ti_thread*>(thread)); + int ret(th->join(retval)); + if (not th->detached()) + { + delete th; + } + return ret; +} + +wsrep::thread_service::thread* db::ti::self() WSREP_NOEXCEPT +{ + return reinterpret_cast<wsrep::thread_service::thread*>(ti_thread::self()); +} + +int db::ti::setschedparam(wsrep::thread_service::thread* thread, + int policy, const struct sched_param* param) WSREP_NOEXCEPT +{ + return reinterpret_cast<ti_thread*>(thread)->setschedparam(policy, param); +} + +int db::ti::getschedparam(wsrep::thread_service::thread* thread, + int* policy, struct sched_param* param) WSREP_NOEXCEPT +{ + return reinterpret_cast<ti_thread*>(thread)->getschedparam(policy, param); +} + +////////////////////////////////////////////////////////////////////////////// +// Mutex // +////////////////////////////////////////////////////////////////////////////// + +const wsrep::thread_service::mutex_key* +db::ti::create_mutex_key(const char* name) WSREP_NOEXCEPT +{ + assert(name); + return reinterpret_cast<const wsrep::thread_service::mutex_key*>( + append_key(name, "mutex")); +} + +wsrep::thread_service::mutex* +db::ti::init_mutex(const wsrep::thread_service::mutex_key* key, void* memblock, + size_t memblock_size) WSREP_NOEXCEPT +{ + return reinterpret_cast<wsrep::thread_service::mutex*>( + memblock_size >= sizeof(ti_mutex) ? new (memblock) ti_mutex(key, true) + : new ti_mutex(key, false)); +} + +int db::ti::destroy(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT +{ + ti_mutex* m(reinterpret_cast<ti_mutex*>(mutex)); + if (m->inplace()) + { + m->~ti_mutex(); + } + else + { + delete m; + } + return 0; +} + +int db::ti::lock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT +{ + return reinterpret_cast<ti_mutex*>(mutex)->lock(); +} + +int db::ti::trylock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT +{ + return reinterpret_cast<ti_mutex*>(mutex)->trylock(); +} + +int db::ti::unlock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT +{ + return reinterpret_cast<ti_mutex*>(mutex)->unlock(); +} + +////////////////////////////////////////////////////////////////////////////// +// Cond // +////////////////////////////////////////////////////////////////////////////// + +const wsrep::thread_service::cond_key* db::ti::create_cond_key(const char* name) WSREP_NOEXCEPT +{ + assert(name); + return reinterpret_cast<const wsrep::thread_service::cond_key*>( + append_key(name, "cond")); +} + +wsrep::thread_service::cond* +db::ti::init_cond(const wsrep::thread_service::cond_key* key, void* memblock, + size_t memblock_size) WSREP_NOEXCEPT +{ + return reinterpret_cast<wsrep::thread_service::cond*>( + memblock_size >= sizeof(ti_cond) ? new (memblock) ti_cond(key, true) + : new ti_cond(key, false)); +} + +int db::ti::destroy(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT +{ + ti_cond* c(reinterpret_cast<ti_cond*>(cond)); + if (c->inplace()) + { + c->~ti_cond(); + } + else + { + delete c; + } + return 0; +} + +int db::ti::wait(wsrep::thread_service::cond* cond, + wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT +{ + return reinterpret_cast<ti_cond*>(cond)->wait( + *reinterpret_cast<ti_mutex*>(mutex)); +} + +int db::ti::timedwait(wsrep::thread_service::cond* cond, + wsrep::thread_service::mutex* mutex, + const struct timespec* ts) WSREP_NOEXCEPT +{ + return reinterpret_cast<ti_cond*>(cond)->timedwait( + *reinterpret_cast<ti_mutex*>(mutex), ts); +} + +int db::ti::signal(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT +{ + return reinterpret_cast<ti_cond*>(cond)->signal(); +} + +int db::ti::broadcast(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT +{ + return reinterpret_cast<ti_cond*>(cond)->broadcast(); +} + +void db::ti::level(int level) +{ + ::op_level = level; +} + +void db::ti::cond_checks(bool cond_checks) +{ + if (cond_checks) + wsrep::log_info() << "Enabling condition variable checking"; + ::cond_checks = cond_checks; +} + +std::string db::ti::stats() +{ + std::ostringstream os; + os << "Totals:\n"; + for (size_t i(0); i < total_ops.size(); ++i) + { + if (total_ops[i] > 0) + { + os << " " << ti_opstring(static_cast<enum ti_opcode>(i)) << ": " + << total_ops[i] << "\n"; + } + } + os << "Total allocations: " << total_allocations << "\n"; + os << "Mutex contention: " << mutex_contention << "\n"; + for (auto i : mutex_contention_counts) + { + os << " " << i.first << ": " << i.second << "\n"; + } + os << "Per key:\n"; + std::map<std::string, std::vector<size_t>> sorted; + for (size_t i(0); i < ops_map.size(); ++i) + { + sorted.insert(std::make_pair(get_key_name_by_index(i), ops_map[i])); + } + for (auto i : sorted) + { + for (size_t j(0); j < i.second.size(); ++j) + { + if (i.second[j]) + { + os << " " << i.first << ": " + << ti_opstring(static_cast<enum ti_opcode>(j)) << ": " + << i.second[j] << "\n"; + } + } + } + return os.str(); +} diff --git a/wsrep-lib/dbsim/db_threads.hpp b/wsrep-lib/dbsim/db_threads.hpp new file mode 100644 index 00000000..32d44c10 --- /dev/null +++ b/wsrep-lib/dbsim/db_threads.hpp @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2019 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_DB_THREADS_HPP +#define WSREP_DB_THREADS_HPP + +#include "wsrep/thread_service.hpp" +#include <string> + +namespace db +{ + class ti : public wsrep::thread_service + { + public: + ti(); + int before_init() override; + int after_init() override; + + /* Thread */ + const wsrep::thread_service::thread_key* + create_thread_key(const char* name) WSREP_NOEXCEPT override; + int create_thread(const wsrep::thread_service::thread_key* key, + wsrep::thread_service::thread**, + void* (*fn)(void*), void*) WSREP_NOEXCEPT override; + int detach(wsrep::thread_service::thread*) WSREP_NOEXCEPT override; + int equal(wsrep::thread_service::thread*, + wsrep::thread_service::thread*) WSREP_NOEXCEPT override; + int join(wsrep::thread_service::thread*, void**) WSREP_NOEXCEPT override; + wsrep::thread_service::thread* self() WSREP_NOEXCEPT override; + int setschedparam(wsrep::thread_service::thread*, int, + const struct sched_param*) WSREP_NOEXCEPT override; + int getschedparam(wsrep::thread_service::thread*, int*, + struct sched_param*) WSREP_NOEXCEPT override; + + /* Mutex */ + const wsrep::thread_service::mutex_key* + create_mutex_key(const char* name) WSREP_NOEXCEPT override; + wsrep::thread_service::mutex* + init_mutex(const wsrep::thread_service::mutex_key* key, void* memblock, + size_t memblock_size) WSREP_NOEXCEPT override; + int destroy(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT override; + int lock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT override; + int trylock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT override; + int unlock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT override; + /* Cond */ + const wsrep::thread_service::cond_key* + create_cond_key(const char* name) WSREP_NOEXCEPT override; + wsrep::thread_service::cond* + init_cond(const wsrep::thread_service::cond_key* key, void* memblock, + size_t memblock_size) WSREP_NOEXCEPT override; + int destroy(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT override; + int wait(wsrep::thread_service::cond* cond, + wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT override; + int timedwait(wsrep::thread_service::cond* cond, + wsrep::thread_service::mutex* mutex, + const struct timespec* ts) WSREP_NOEXCEPT override; + int signal(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT override; + int broadcast(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT override; + + static void level(int level); + static void cond_checks(bool cond_checks); + static std::string stats(); + }; + + +} + +#endif // WSREP_DB_THREADS_HPP diff --git a/wsrep-lib/dbsim/db_tls.cpp b/wsrep-lib/dbsim/db_tls.cpp new file mode 100644 index 00000000..c242668c --- /dev/null +++ b/wsrep-lib/dbsim/db_tls.cpp @@ -0,0 +1,451 @@ +/* + * Copyright (C) 2020 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file db_tls.cpp + * + * This file demonstrates the use of TLS service. It does not implement + * real encryption, but may manipulate stream bytes for testing purposes. + */ + +#include "db_tls.hpp" + +#include "wsrep/logger.hpp" + +#include <unistd.h> // read() +#include <fcntl.h> +#include <sys/types.h> +#include <sys/socket.h> // send() +#include <cassert> +#include <cerrno> +#include <cstring> + +#include <mutex> +#include <string> + +namespace +{ + class db_stream : public wsrep::tls_stream + { + public: + db_stream(int fd, int mode) + : fd_(fd) + , state_(s_initialized) + , last_error_() + , mode_(mode) + , stats_() + , is_blocking_() + { + int val(fcntl(fd_, F_GETFL, 0)); + is_blocking_ = not (val & O_NONBLOCK); + } + struct stats + { + size_t bytes_read{0}; + size_t bytes_written{0}; + }; + + /* + * in idle --| + * |-> ch -| ^ | -> want_read --| + * |-> sh -| ---- |--> | -> want_write --| + * |----------------------| + */ + enum state + { + s_initialized, + s_client_handshake, + s_server_handshake, + s_idle, + s_want_read, + s_want_write + }; + + int get_error_number() const { return last_error_; } + const void* get_error_category() const + { + return reinterpret_cast<const void*>(1); + } + + static char* get_error_message(int value, const void*) + { + return ::strerror(value); + } + + enum wsrep::tls_service::status client_handshake(); + + enum wsrep::tls_service::status server_handshake(); + + wsrep::tls_service::op_result read(void*, size_t); + + wsrep::tls_service::op_result write(const void*, size_t); + + enum state state() const { return state_; } + + int fd() const { return fd_; } + void inc_reads(size_t val) { stats_.bytes_read += val; } + void inc_writes(size_t val) { stats_.bytes_written += val; } + const stats& get_stats() const { return stats_; } + private: + enum wsrep::tls_service::status handle_handshake_read(const char* expect); + size_t determine_read_count(size_t max_count) + { + if (is_blocking_ || mode_ < 2) return max_count; + else if (::rand() % 100 == 0) return std::min(size_t(42), max_count); + else return max_count; + } + size_t determine_write_count(size_t count) + { + if (is_blocking_ || mode_ < 2) return count; + else if (::rand() % 100 == 0) return std::min(size_t(43), count); + else return count; + } + + ssize_t do_read(void* buf, size_t max_count) + { + if (is_blocking_ || mode_ < 3 ) + return ::read(fd_, buf, max_count); + else if (::rand() % 1000 == 0) return EINTR; + else return ::read(fd_, buf, max_count); + } + + ssize_t do_write(const void* buf, size_t count) + { + if (is_blocking_ || mode_ < 3) + return ::send(fd_, buf, count, MSG_NOSIGNAL); + else if (::rand() % 1000 == 0) return EINTR; + else return ::send(fd_, buf, count, MSG_NOSIGNAL); + } + + wsrep::tls_service::op_result map_success(ssize_t result) + { + if (is_blocking_ || mode_ < 2) + { + return wsrep::tls_service::op_result{ + wsrep::tls_service::success, size_t(result)}; + } + else if (::rand() % 1000 == 0) + { + wsrep::log_info() << "Success want extra read"; + state_ = s_want_read; + return wsrep::tls_service::op_result{ + wsrep::tls_service::want_read, size_t(result)}; + } + else if (::rand() % 1000 == 0) + { + wsrep::log_info() << "Success want extra write"; + state_ = s_want_write; + return wsrep::tls_service::op_result{ + wsrep::tls_service::want_write, size_t(result)}; + } + else + { + return wsrep::tls_service::op_result{ + wsrep::tls_service::success, size_t(result)}; + } + } + + wsrep::tls_service::op_result map_result(ssize_t result) + { + if (result > 0) + { + return map_success(result); + } + else if (result == 0) + { + return wsrep::tls_service::op_result{ + wsrep::tls_service::eof, 0}; + } + else if (errno == EAGAIN || errno == EWOULDBLOCK) + { + return wsrep::tls_service::op_result{ + wsrep::tls_service::want_read, 0}; + } + else + { + last_error_ = errno; + return wsrep::tls_service::op_result{ + wsrep::tls_service::error, 0}; + } + } + + void clear_error() { last_error_ = 0; } + + int fd_; + enum state state_; + int last_error_; + // Operation mode: + // 1 - simulate handshake exchange + // 2 - simulate errors and short reads + int mode_; + stats stats_; + bool is_blocking_; + }; + + enum wsrep::tls_service::status db_stream::client_handshake() + { + clear_error(); + enum wsrep::tls_service::status ret; + assert(state_ == s_initialized || + state_ == s_client_handshake || + state_ == s_want_write); + if (state_ == s_initialized) + { + (void)::send(fd_, "clie", 4, MSG_NOSIGNAL); + ret = wsrep::tls_service::want_read; + state_ = s_client_handshake; + wsrep::log_info() << this << " client handshake sent"; + stats_.bytes_written += 4; + if (not is_blocking_) return ret; + } + + if (state_ == s_client_handshake) + { + if ((ret = handle_handshake_read("serv")) == + wsrep::tls_service::success) + { + state_ = s_want_write; + ret = wsrep::tls_service::want_write; + } + if (not is_blocking_) return ret; + } + + if (state_ == s_want_write) + { + state_ = s_idle; + ret = wsrep::tls_service::success; + if (not is_blocking_) return ret; + } + + if (not is_blocking_) + { + last_error_ = EPROTO; + ret = wsrep::tls_service::error; + } + return ret; + } + + + + enum wsrep::tls_service::status db_stream::server_handshake() + { + enum wsrep::tls_service::status ret; + assert(state_ == s_initialized || + state_ == s_server_handshake || + state_ == s_want_write); + + if (state_ == s_initialized) + { + ::send(fd_, "serv", 4, MSG_NOSIGNAL); + ret = wsrep::tls_service::want_read; + state_ = s_server_handshake; + stats_.bytes_written += 4; + if (not is_blocking_) return ret; + } + + if (state_ == s_server_handshake) + { + if ((ret = handle_handshake_read("clie")) == + wsrep::tls_service::success) + { + state_ = s_want_write; + ret = wsrep::tls_service::want_write; + } + if (not is_blocking_) return ret; + } + + if (state_ == s_want_write) + { + state_ = s_idle; + ret = wsrep::tls_service::success; + if (not is_blocking_) return ret; + } + + if (not is_blocking_) + { + last_error_ = EPROTO; + ret = wsrep::tls_service::error; + } + return ret; + } + + enum wsrep::tls_service::status db_stream::handle_handshake_read( + const char* expect) + { + assert(::strlen(expect) >= 4); + char buf[4] = { }; + ssize_t read_result(::read(fd_, buf, sizeof(buf))); + if (read_result > 0) stats_.bytes_read += size_t(read_result); + enum wsrep::tls_service::status ret; + if (read_result == -1 && + (errno == EWOULDBLOCK || errno == EAGAIN)) + { + ret = wsrep::tls_service::want_read; + } + else if (read_result == 0) + { + ret = wsrep::tls_service::eof; + } + else if (read_result != 4 || ::memcmp(buf, expect, 4)) + { + last_error_ = EPROTO; + ret = wsrep::tls_service::error; + } + else + { + wsrep::log_info() << "Handshake success: " << std::string(buf, 4); + ret = wsrep::tls_service::success; + } + return ret; + } + + wsrep::tls_service::op_result db_stream::read(void* buf, size_t max_count) + { + clear_error(); + if (state_ == s_want_read) + { + state_ = s_idle; + if (max_count == 0) + return wsrep::tls_service::op_result{ + wsrep::tls_service::success, 0}; + } + max_count = determine_read_count(max_count); + ssize_t read_result(do_read(buf, max_count)); + if (read_result > 0) + { + inc_reads(size_t(read_result)); + } + return map_result(read_result); + } + + wsrep::tls_service::op_result db_stream::write( + const void* buf, size_t count) + { + clear_error(); + if (state_ == s_want_write) + { + state_ = s_idle; + if (count == 0) + return wsrep::tls_service::op_result{ + wsrep::tls_service::success, 0}; + } + count = determine_write_count(count); + ssize_t write_result(do_write(buf, count)); + if (write_result > 0) + { + inc_writes(size_t(write_result)); + } + return map_result(write_result); + } +} + + +static db_stream::stats global_stats; +std::mutex global_stats_lock; +static int global_mode; + +static void merge_to_global_stats(const db_stream::stats& stats) +{ + std::lock_guard<std::mutex> lock(global_stats_lock); + global_stats.bytes_read += stats.bytes_read; + global_stats.bytes_written += stats.bytes_written; +} + +wsrep::tls_stream* db::tls::create_tls_stream(int fd) WSREP_NOEXCEPT +{ + auto ret(new db_stream(fd, global_mode)); + wsrep::log_debug() << "New DB stream: " << ret; + return ret; +} + +void db::tls::destroy(wsrep::tls_stream* stream) WSREP_NOEXCEPT +{ + auto dbs(static_cast<db_stream*>(stream)); + merge_to_global_stats(dbs->get_stats()); + wsrep::log_debug() << "Stream destroy: " << dbs->get_stats().bytes_read + << " " << dbs->get_stats().bytes_written; + wsrep::log_debug() << "Stream destroy" << dbs; + delete dbs; +} + +int db::tls::get_error_number(const wsrep::tls_stream* stream) + const WSREP_NOEXCEPT +{ + return static_cast<const db_stream*>(stream)->get_error_number(); +} + +const void* db::tls::get_error_category(const wsrep::tls_stream* stream) + const WSREP_NOEXCEPT +{ + return static_cast<const db_stream*>(stream)->get_error_category(); +} + +const char* db::tls::get_error_message(const wsrep::tls_stream*, + int value, const void* category) + const WSREP_NOEXCEPT +{ + return db_stream::get_error_message(value, category); +} + +enum wsrep::tls_service::status +db::tls::client_handshake(wsrep::tls_stream* stream) WSREP_NOEXCEPT +{ + return static_cast<db_stream*>(stream)->client_handshake(); +} + +enum wsrep::tls_service::status +db::tls::server_handshake(wsrep::tls_stream* stream) WSREP_NOEXCEPT +{ + return static_cast<db_stream*>(stream)->server_handshake(); +} + +wsrep::tls_service::op_result db::tls::read( + wsrep::tls_stream* stream, + void* buf, size_t max_count) WSREP_NOEXCEPT +{ + return static_cast<db_stream*>(stream)->read(buf, max_count); +} + +wsrep::tls_service::op_result db::tls::write( + wsrep::tls_stream* stream, + const void* buf, size_t count) WSREP_NOEXCEPT +{ + return static_cast<db_stream*>(stream)->write(buf, count); +} + +wsrep::tls_service::status +db::tls::shutdown(wsrep::tls_stream*) WSREP_NOEXCEPT +{ + // @todo error simulation + return wsrep::tls_service::success; +} + + +void db::tls::init(int mode) +{ + global_mode = mode; +} + +std::string db::tls::stats() +{ + std::ostringstream oss; + oss << "Transport stats:\n" + << " bytes_read: " << global_stats.bytes_read << "\n" + << " bytes_written: " << global_stats.bytes_written << "\n"; + return oss.str(); +} diff --git a/wsrep-lib/dbsim/db_tls.hpp b/wsrep-lib/dbsim/db_tls.hpp new file mode 100644 index 00000000..02051dce --- /dev/null +++ b/wsrep-lib/dbsim/db_tls.hpp @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2020 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef WSREP_DB_TLS_HPP +#define WSREP_DB_TLS_HPP + +#include "wsrep/tls_service.hpp" + +#include <string> + +namespace db +{ + class tls : public wsrep::tls_service + { + public: + virtual wsrep::tls_stream* create_tls_stream(int) + WSREP_NOEXCEPT override; + virtual void destroy(wsrep::tls_stream*) WSREP_NOEXCEPT override; + virtual int get_error_number(const wsrep::tls_stream*) const + WSREP_NOEXCEPT override; + virtual const void* get_error_category(const wsrep::tls_stream*) const + WSREP_NOEXCEPT override; + virtual const char* get_error_message(const wsrep::tls_stream*, + int, const void*) const + WSREP_NOEXCEPT override; + virtual enum wsrep::tls_service::status + client_handshake(wsrep::tls_stream*) + WSREP_NOEXCEPT override; + virtual enum wsrep::tls_service::status + server_handshake(wsrep::tls_stream*) + WSREP_NOEXCEPT override; + virtual wsrep::tls_service::op_result + read(wsrep::tls_stream*, void* buf, size_t max_count) + WSREP_NOEXCEPT override; + virtual wsrep::tls_service::op_result + write(wsrep::tls_stream*, const void* buf, size_t count) + WSREP_NOEXCEPT override; + virtual wsrep::tls_service::status + shutdown(wsrep::tls_stream*) WSREP_NOEXCEPT override; + + static void init(int mode); + static std::string stats(); + }; +} + +#endif // WSREP_DB_TLS_HPP diff --git a/wsrep-lib/dbsim/dbsim.cpp b/wsrep-lib/dbsim/dbsim.cpp new file mode 100644 index 00000000..070f83b4 --- /dev/null +++ b/wsrep-lib/dbsim/dbsim.cpp @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "db_params.hpp" +#include "db_simulator.hpp" + +int main(int argc, char** argv) +{ + try + { + db::simulator(db::parse_args(argc, argv)).run(); + } + catch (const std::exception& e) + { + std::cerr << e.what() << std::endl; + return 1; + } + return 0; +} |