From a68fb2d8219f6bccc573009600e9f23e89226a5e Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 4 May 2024 20:04:16 +0200 Subject: Adding upstream version 1:10.6.11. Signed-off-by: Daniel Baumann --- wsrep-lib/dbsim/db_server.cpp | 167 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 167 insertions(+) create mode 100644 wsrep-lib/dbsim/db_server.cpp (limited to 'wsrep-lib/dbsim/db_server.cpp') diff --git a/wsrep-lib/dbsim/db_server.cpp b/wsrep-lib/dbsim/db_server.cpp new file mode 100644 index 00000000..a54610d1 --- /dev/null +++ b/wsrep-lib/dbsim/db_server.cpp @@ -0,0 +1,167 @@ +/* + * Copyright (C) 2018 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "db_server.hpp" +#include "db_server_service.hpp" +#include "db_high_priority_service.hpp" +#include "db_client.hpp" +#include "db_simulator.hpp" + +#include "wsrep/logger.hpp" + +#include +#include + +static wsrep::default_mutex logger_mtx; + +static void +logger_fn(wsrep::log::level l, const char* pfx, const char* msg) +{ + wsrep::unique_lock lock(logger_mtx); + + struct timespec time; + clock_gettime(CLOCK_REALTIME, &time); + + time_t const tt(time.tv_sec); + struct tm date; + localtime_r(&tt, &date); + + char date_str[85] = { '\0', }; + snprintf(date_str, sizeof(date_str) - 1, + "%04d-%02d-%02d %02d:%02d:%02d.%03d", + date.tm_year + 1900, date.tm_mon + 1, date.tm_mday, + date.tm_hour, date.tm_min, date.tm_sec, (int)time.tv_nsec/1000000); + + std::cerr << date_str << ' ' << pfx << wsrep::log::to_c_string(l) << ' ' + << msg << std::endl; +} + +db::server::server(simulator& simulator, + const std::string& name, + const std::string& address) + : simulator_(simulator) + , storage_engine_(simulator_.params()) + , mutex_() + , cond_() + , server_service_(*this) + , reporter_(mutex_, name + ".json", 4) + , server_state_(server_service_, + name, address, "dbsim_" + name + "_data") + , last_client_id_(0) + , last_transaction_id_(0) + , appliers_() + , clients_() + , client_threads_() +{ + wsrep::log::logger_fn(logger_fn); +} + +void db::server::applier_thread() +{ + wsrep::client_id client_id(last_client_id_.fetch_add(1) + 1); + db::client applier(*this, client_id, + wsrep::client_state::m_high_priority, + simulator_.params()); + wsrep::client_state* cc(static_cast( + &applier.client_state())); + db::high_priority_service hps(*this, applier); + cc->open(cc->id()); + cc->before_command(); + enum wsrep::provider::status ret( + server_state_.provider().run_applier(&hps)); + wsrep::log_info() << "Applier thread exited with error code " << ret; + cc->after_command_before_result(); + cc->after_command_after_result(); + cc->close(); + cc->cleanup(); +} + +void db::server::start_applier() +{ + wsrep::unique_lock lock(mutex_); + appliers_.push_back(boost::thread(&server::applier_thread, this)); +} + +void db::server::stop_applier() +{ + wsrep::unique_lock lock(mutex_); + appliers_.front().join(); + appliers_.erase(appliers_.begin()); +} + + +void db::server::start_clients() +{ + size_t n_clients(simulator_.params().n_clients); + for (size_t i(0); i < n_clients; ++i) + { + start_client(i + 1); + } +} + +void db::server::stop_clients() +{ + for (auto& i : client_threads_) + { + i.join(); + } + for (const auto& i : clients_) + { + const struct db::client::stats& stats(i->stats()); + simulator_.stats_.commits += stats.commits; + simulator_.stats_.rollbacks += stats.rollbacks; + simulator_.stats_.replays += stats.replays; + } +} + +void db::server::client_thread(const std::shared_ptr& client) +{ + client->start(); +} + +void db::server::start_client(size_t id) +{ + auto client(std::make_shared( + *this, wsrep::client_id(id), + wsrep::client_state::m_local, + simulator_.params())); + clients_.push_back(client); + client_threads_.push_back( + boost::thread(&db::server::client_thread, this, client)); +} + +void db::server::donate_sst(const std::string& req, + const wsrep::gtid& gtid, + bool bypass) +{ + simulator_.sst(*this, req, gtid, bypass); +} + + +wsrep::high_priority_service* db::server::streaming_applier_service() +{ + throw wsrep::not_implemented_error(); +} + +void db::server::log_state_change(enum wsrep::server_state::state from, + enum wsrep::server_state::state to) +{ + wsrep::log_info() << "State changed " << from << " -> " << to; + reporter_.report_state(to); +} -- cgit v1.2.3