summaryrefslogtreecommitdiffstats
path: root/src/crimson/tools
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/crimson/tools
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/tools')
-rw-r--r--src/crimson/tools/CMakeLists.txt22
-rw-r--r--src/crimson/tools/perf_async_msgr.cc151
-rw-r--r--src/crimson/tools/perf_crimson_msgr.cc1222
-rw-r--r--src/crimson/tools/perf_staged_fltree.cc178
-rw-r--r--src/crimson/tools/store_nbd/block_driver.cc19
-rw-r--r--src/crimson/tools/store_nbd/block_driver.h134
-rw-r--r--src/crimson/tools/store_nbd/fs_driver.cc310
-rw-r--r--src/crimson/tools/store_nbd/fs_driver.h72
-rw-r--r--src/crimson/tools/store_nbd/store-nbd.cc456
-rw-r--r--src/crimson/tools/store_nbd/tm_driver.cc222
-rw-r--r--src/crimson/tools/store_nbd/tm_driver.h56
11 files changed, 2842 insertions, 0 deletions
diff --git a/src/crimson/tools/CMakeLists.txt b/src/crimson/tools/CMakeLists.txt
new file mode 100644
index 000000000..fc18ff90b
--- /dev/null
+++ b/src/crimson/tools/CMakeLists.txt
@@ -0,0 +1,22 @@
+add_executable(crimson-store-nbd
+ store_nbd/store-nbd.cc
+ store_nbd/tm_driver.cc
+ store_nbd/fs_driver.cc
+ store_nbd/block_driver.cc
+ )
+target_link_libraries(crimson-store-nbd
+ crimson-os)
+install(TARGETS crimson-store-nbd DESTINATION bin)
+
+add_executable(perf-crimson-msgr perf_crimson_msgr.cc)
+target_link_libraries(perf-crimson-msgr crimson)
+
+add_executable(perf-async-msgr perf_async_msgr.cc)
+target_link_libraries(perf-async-msgr ceph-common global ${ALLOC_LIBS})
+
+add_executable(perf-staged-fltree perf_staged_fltree.cc)
+if(WITH_TESTS)
+target_link_libraries(perf-staged-fltree crimson-seastore crimson::gtest)
+else()
+target_link_libraries(perf-staged-fltree crimson-seastore)
+endif()
diff --git a/src/crimson/tools/perf_async_msgr.cc b/src/crimson/tools/perf_async_msgr.cc
new file mode 100644
index 000000000..38cc84fbb
--- /dev/null
+++ b/src/crimson/tools/perf_async_msgr.cc
@@ -0,0 +1,151 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+
+#include <boost/program_options/variables_map.hpp>
+#include <boost/program_options/parsers.hpp>
+
+#include "auth/Auth.h"
+#include "global/global_init.h"
+#include "msg/Dispatcher.h"
+#include "msg/Messenger.h"
+#include "messages/MOSDOp.h"
+
+#include "auth/DummyAuth.h"
+
+namespace {
+
+constexpr int CEPH_OSD_PROTOCOL = 10;
+
+struct Server {
+ Server(CephContext* cct, unsigned msg_len)
+ : dummy_auth(cct), dispatcher(cct, msg_len)
+ {
+ msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(0), "server", 0));
+ dummy_auth.auth_registry.refresh_config();
+ msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
+ msgr->set_default_policy(Messenger::Policy::stateless_server(0));
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+ }
+ DummyAuthClientServer dummy_auth;
+ std::unique_ptr<Messenger> msgr;
+ struct ServerDispatcher : Dispatcher {
+ unsigned msg_len = 0;
+ bufferlist msg_data;
+
+ ServerDispatcher(CephContext* cct, unsigned msg_len)
+ : Dispatcher(cct), msg_len(msg_len)
+ {
+ msg_data.append_zero(msg_len);
+ }
+ bool ms_can_fast_dispatch_any() const override {
+ return true;
+ }
+ bool ms_can_fast_dispatch(const Message* m) const override {
+ return m->get_type() == CEPH_MSG_OSD_OP;
+ }
+ void ms_fast_dispatch(Message* m) override {
+ ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+ const static pg_t pgid;
+ const static object_locator_t oloc;
+ const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
+ pgid.pool(), oloc.nspace);
+ static spg_t spgid(pgid);
+ MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
+ bufferlist data(msg_data);
+ rep->write(0, msg_len, data);
+ rep->set_tid(m->get_tid());
+ m->get_connection()->send_message(rep);
+ m->put();
+ }
+ bool ms_dispatch(Message*) override {
+ ceph_abort();
+ }
+ bool ms_handle_reset(Connection*) override {
+ return true;
+ }
+ void ms_handle_remote_reset(Connection*) override {
+ }
+ bool ms_handle_refused(Connection*) override {
+ return true;
+ }
+ } dispatcher;
+};
+
+}
+
+static void run(CephContext* cct, entity_addr_t addr, unsigned bs)
+{
+ std::cout << "async server listening at " << addr << std::endl;
+ Server server{cct, bs};
+ server.msgr->bind(addr);
+ server.msgr->add_dispatcher_head(&server.dispatcher);
+ server.msgr->start();
+ server.msgr->wait();
+}
+
+int main(int argc, char** argv)
+{
+ namespace po = boost::program_options;
+ po::options_description desc{"Allowed options"};
+ desc.add_options()
+ ("help,h", "show help message")
+ ("addr", po::value<std::string>()->default_value("v2:127.0.0.1:9010"),
+ "server address(crimson only supports msgr v2 protocol)")
+ ("bs", po::value<unsigned>()->default_value(0),
+ "server block size")
+ ("crc-enabled", po::value<bool>()->default_value(false),
+ "enable CRC checks")
+ ("threads", po::value<unsigned>()->default_value(3),
+ "async messenger worker threads");
+ po::variables_map vm;
+ std::vector<std::string> unrecognized_options;
+ try {
+ auto parsed = po::command_line_parser(argc, argv)
+ .options(desc)
+ .allow_unregistered()
+ .run();
+ po::store(parsed, vm);
+ if (vm.count("help")) {
+ std::cout << desc << std::endl;
+ return 0;
+ }
+ po::notify(vm);
+ unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
+ } catch(const po::error& e) {
+ std::cerr << "error: " << e.what() << std::endl;
+ return 1;
+ }
+
+ auto addr = vm["addr"].as<std::string>();
+ entity_addr_t target_addr;
+ target_addr.parse(addr.c_str(), nullptr);
+ ceph_assert_always(target_addr.is_msgr2());
+ auto bs = vm["bs"].as<unsigned>();
+ auto crc_enabled = vm["crc-enabled"].as<bool>();
+ auto worker_threads = vm["threads"].as<unsigned>();
+
+ std::vector<const char*> args(argv, argv + argc);
+ auto cct = global_init(nullptr, args,
+ CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_MON_CONFIG);
+ common_init_finish(cct.get());
+
+ if (crc_enabled) {
+ cct->_conf.set_val("ms_crc_header", "true");
+ cct->_conf.set_val("ms_crc_data", "true");
+ } else {
+ cct->_conf.set_val("ms_crc_header", "false");
+ cct->_conf.set_val("ms_crc_data", "false");
+ }
+
+ cct->_conf.set_val("ms_async_op_threads", fmt::format("{}", worker_threads));
+
+ std::cout << "server[" << addr
+ << "](bs=" << bs
+ << ", crc_enabled=" << crc_enabled
+ << ", worker_threads=" << worker_threads
+ << std::endl;
+
+ run(cct.get(), target_addr, bs);
+}
diff --git a/src/crimson/tools/perf_crimson_msgr.cc b/src/crimson/tools/perf_crimson_msgr.cc
new file mode 100644
index 000000000..aa5753442
--- /dev/null
+++ b/src/crimson/tools/perf_crimson_msgr.cc
@@ -0,0 +1,1222 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <map>
+#include <boost/program_options.hpp>
+#include <boost/iterator/counting_iterator.hpp>
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/do_with.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/lowres_clock.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/semaphore.hh>
+#include <seastar/core/smp.hh>
+#include <seastar/core/thread.hh>
+
+#include "common/ceph_time.h"
+#include "messages/MOSDOp.h"
+#include "include/random.h"
+
+#include "crimson/auth/DummyAuth.h"
+#include "crimson/common/log.h"
+#include "crimson/common/config_proxy.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Dispatcher.h"
+#include "crimson/net/Messenger.h"
+#include "crimson/osd/stop_signal.h"
+
+using namespace std;
+using namespace std::chrono_literals;
+
+using lowres_clock_t = seastar::lowres_system_clock;
+
+namespace bpo = boost::program_options;
+
+namespace {
+
+template<typename Message>
+using Ref = boost::intrusive_ptr<Message>;
+
+seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+}
+
+template <typename T, typename... Args>
+seastar::future<T*> create_sharded(Args... args) {
+ // seems we should only construct/stop shards on #0
+ return seastar::smp::submit_to(0, [=] {
+ auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
+ return sharded_obj->start(args...).then([sharded_obj]() {
+ seastar::engine().at_exit([sharded_obj]() {
+ return sharded_obj->stop().then([sharded_obj] {});
+ });
+ return sharded_obj.get();
+ });
+ }).then([] (seastar::sharded<T> *ptr_shard) {
+ // return the pointer valid for the caller CPU
+ return &ptr_shard->local();
+ });
+}
+
+double get_reactor_utilization() {
+ auto &value_map = seastar::metrics::impl::get_value_map();
+ auto found = value_map.find("reactor_utilization");
+ assert(found != value_map.end());
+ auto &[full_name, metric_family] = *found;
+ std::ignore = full_name;
+ assert(metric_family.size() == 1);
+ const auto& [labels, metric] = *metric_family.begin();
+ std::ignore = labels;
+ auto value = (*metric)();
+ return value.ui();
+}
+
+enum class perf_mode_t {
+ both,
+ client,
+ server
+};
+
+struct client_config {
+ entity_addr_t server_addr;
+ unsigned block_size;
+ unsigned ramptime;
+ unsigned msgtime;
+ unsigned num_clients;
+ unsigned num_conns;
+ unsigned depth;
+ bool skip_core_0;
+
+ std::string str() const {
+ std::ostringstream out;
+ out << "client[>> " << server_addr
+ << "](bs=" << block_size
+ << ", ramptime=" << ramptime
+ << ", msgtime=" << msgtime
+ << ", num_clients=" << num_clients
+ << ", num_conns=" << num_conns
+ << ", depth=" << depth
+ << ", skip_core_0=" << skip_core_0
+ << ")";
+ return out.str();
+ }
+
+ static client_config load(bpo::variables_map& options) {
+ client_config conf;
+ entity_addr_t addr;
+ ceph_assert(addr.parse(options["server-addr"].as<std::string>().c_str(), nullptr));
+ ceph_assert_always(addr.is_msgr2());
+
+ conf.server_addr = addr;
+ conf.block_size = options["client-bs"].as<unsigned>();
+ conf.ramptime = options["ramptime"].as<unsigned>();
+ conf.msgtime = options["msgtime"].as<unsigned>();
+ conf.num_clients = options["clients"].as<unsigned>();
+ ceph_assert_always(conf.num_clients > 0);
+ conf.num_conns = options["conns-per-client"].as<unsigned>();
+ ceph_assert_always(conf.num_conns > 0);
+ conf.depth = options["depth"].as<unsigned>();
+ conf.skip_core_0 = options["client-skip-core-0"].as<bool>();
+ return conf;
+ }
+};
+
+struct server_config {
+ entity_addr_t addr;
+ unsigned block_size;
+ bool is_fixed_cpu;
+ unsigned core;
+
+ std::string str() const {
+ std::ostringstream out;
+ out << "server[" << addr
+ << "](bs=" << block_size
+ << ", is_fixed_cpu=" << is_fixed_cpu
+ << ", core=" << core
+ << ")";
+ return out.str();
+ }
+
+ static server_config load(bpo::variables_map& options) {
+ server_config conf;
+ entity_addr_t addr;
+ ceph_assert(addr.parse(options["server-addr"].as<std::string>().c_str(), nullptr));
+ ceph_assert_always(addr.is_msgr2());
+
+ conf.addr = addr;
+ conf.block_size = options["server-bs"].as<unsigned>();
+ conf.is_fixed_cpu = options["server-fixed-cpu"].as<bool>();
+ conf.core = options["server-core"].as<unsigned>();
+ return conf;
+ }
+};
+
+const unsigned SAMPLE_RATE = 256;
+
+static seastar::future<> run(
+ perf_mode_t mode,
+ const client_config& client_conf,
+ const server_config& server_conf,
+ bool crc_enabled)
+{
+ struct test_state {
+ struct Server final
+ : public crimson::net::Dispatcher,
+ public seastar::peering_sharded_service<Server> {
+ // available only in msgr_sid
+ crimson::net::MessengerRef msgr;
+ crimson::auth::DummyAuthClientServer dummy_auth;
+ const seastar::shard_id msgr_sid;
+ std::string lname;
+
+ bool is_fixed_cpu = true;
+ bool is_stopped = false;
+ std::optional<seastar::future<>> fut_report;
+
+ unsigned conn_count = 0;
+ unsigned msg_count = 0;
+ MessageRef last_msg;
+
+ // available in all shards
+ unsigned msg_len;
+ bufferlist msg_data;
+
+ Server(seastar::shard_id msgr_sid, unsigned msg_len, bool needs_report)
+ : msgr_sid{msgr_sid},
+ msg_len{msg_len} {
+ lname = fmt::format("server@{}", msgr_sid);
+ msg_data.append_zero(msg_len);
+
+ if (seastar::this_shard_id() == msgr_sid &&
+ needs_report) {
+ start_report();
+ }
+ }
+
+ void ms_handle_connect(
+ crimson::net::ConnectionRef,
+ seastar::shard_id) override {
+ ceph_abort("impossible, server won't connect");
+ }
+
+ void ms_handle_accept(
+ crimson::net::ConnectionRef,
+ seastar::shard_id new_shard,
+ bool is_replace) override {
+ ceph_assert_always(new_shard == seastar::this_shard_id());
+ auto &server = container().local();
+ ++server.conn_count;
+ }
+
+ void ms_handle_reset(
+ crimson::net::ConnectionRef,
+ bool) override {
+ auto &server = container().local();
+ --server.conn_count;
+ }
+
+ std::optional<seastar::future<>> ms_dispatch(
+ crimson::net::ConnectionRef c, MessageRef m) override {
+ assert(c->get_shard_id() == seastar::this_shard_id());
+ ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+
+ auto &server = container().local();
+
+ // server replies with MOSDOp to generate server-side write workload
+ const static pg_t pgid;
+ const static object_locator_t oloc;
+ const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
+ pgid.pool(), oloc.nspace);
+ static spg_t spgid(pgid);
+ auto rep = crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0);
+ bufferlist data(server.msg_data);
+ rep->write(0, server.msg_len, data);
+ rep->set_tid(m->get_tid());
+ ++server.msg_count;
+ std::ignore = c->send(std::move(rep));
+
+ if (server.msg_count % 16 == 0) {
+ server.last_msg = std::move(m);
+ }
+ return {seastar::now()};
+ }
+
+ seastar::future<> init(const entity_addr_t& addr, bool is_fixed_cpu) {
+ return container().invoke_on(
+ msgr_sid, [addr, is_fixed_cpu](auto &server) {
+ // server msgr is always with nonce 0
+ server.msgr = crimson::net::Messenger::create(
+ entity_name_t::OSD(server.msgr_sid),
+ server.lname, 0, is_fixed_cpu);
+ server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+ server.msgr->set_auth_client(&server.dummy_auth);
+ server.msgr->set_auth_server(&server.dummy_auth);
+ server.is_fixed_cpu = is_fixed_cpu;
+ return server.msgr->bind(entity_addrvec_t{addr}
+ ).safe_then([&server] {
+ return server.msgr->start({&server});
+ }, crimson::net::Messenger::bind_ertr::all_same_way(
+ [addr] (const std::error_code& e) {
+ logger().error("Server: "
+ "there is another instance running at {}", addr);
+ ceph_abort();
+ }));
+ });
+ }
+
+ seastar::future<> shutdown() {
+ logger().info("{} shutdown...", lname);
+ return container().invoke_on(
+ msgr_sid, [](auto &server) {
+ server.is_stopped = true;
+ ceph_assert(server.msgr);
+ server.msgr->stop();
+ return server.msgr->shutdown(
+ ).then([&server] {
+ if (server.fut_report.has_value()) {
+ return std::move(server.fut_report.value());
+ } else {
+ return seastar::now();
+ }
+ });
+ });
+ }
+
+ private:
+ struct ShardReport {
+ unsigned msg_count = 0;
+
+ // per-interval metrics
+ double reactor_utilization;
+ unsigned conn_count = 0;
+ int msg_size = 0;
+ unsigned msg_count_interval = 0;
+ };
+
+ // should not be called frequently to impact performance
+ void get_report(ShardReport& last) {
+ unsigned last_msg_count = last.msg_count;
+ int msg_size = -1;
+ if (last_msg) {
+ auto msg = boost::static_pointer_cast<MOSDOp>(last_msg);
+ msg->finish_decode();
+ ceph_assert_always(msg->ops.size() == 1);
+ msg_size = msg->ops[0].op.extent.length;
+ last_msg.reset();
+ }
+
+ last.msg_count = msg_count;
+ last.reactor_utilization = get_reactor_utilization();
+ last.conn_count = conn_count;
+ last.msg_size = msg_size;
+ last.msg_count_interval = msg_count - last_msg_count;
+ }
+
+ struct TimerReport {
+ unsigned elapsed = 0u;
+ mono_time start_time = mono_clock::zero();
+ std::vector<ShardReport> reports;
+
+ TimerReport(unsigned shards) : reports(shards) {}
+ };
+
+ void start_report() {
+ seastar::promise<> pr_report;
+ fut_report = pr_report.get_future();
+ seastar::do_with(
+ TimerReport(seastar::smp::count),
+ [this](auto &report) {
+ return seastar::do_until(
+ [this] { return is_stopped; },
+ [&report, this] {
+ return seastar::sleep(2s
+ ).then([&report, this] {
+ report.elapsed += 2;
+ if (is_fixed_cpu) {
+ return seastar::smp::submit_to(msgr_sid,
+ [&report, this] {
+ auto &server = container().local();
+ server.get_report(report.reports[seastar::this_shard_id()]);
+ }).then([&report, this] {
+ auto now = mono_clock::now();
+ auto prv = report.start_time;
+ report.start_time = now;
+ if (prv == mono_clock::zero()) {
+ // cannot compute duration
+ return;
+ }
+ std::chrono::duration<double> duration_d = now - prv;
+ double duration = duration_d.count();
+ auto &ireport = report.reports[msgr_sid];
+ double iops = ireport.msg_count_interval / duration;
+ double throughput_MB = -1;
+ if (ireport.msg_size >= 0) {
+ throughput_MB = iops * ireport.msg_size / 1048576;
+ }
+ std::ostringstream sout;
+ sout << setfill(' ')
+ << report.elapsed
+ << "(" << std::setw(5) << duration << ") "
+ << std::setw(9) << iops << "IOPS "
+ << std::setw(8) << throughput_MB << "MiB/s "
+ << ireport.reactor_utilization
+ << "(" << ireport.conn_count << ")";
+ std::cout << sout.str() << std::endl;
+ });
+ } else {
+ return seastar::smp::invoke_on_all([&report, this] {
+ auto &server = container().local();
+ server.get_report(report.reports[seastar::this_shard_id()]);
+ }).then([&report, this] {
+ auto now = mono_clock::now();
+ auto prv = report.start_time;
+ report.start_time = now;
+ if (prv == mono_clock::zero()) {
+ // cannot compute duration
+ return;
+ }
+ std::chrono::duration<double> duration_d = now - prv;
+ double duration = duration_d.count();
+ unsigned num_msgs = 0;
+ // -1 means unavailable, -2 means mismatch
+ int msg_size = -1;
+ for (auto &i : report.reports) {
+ if (i.msg_size >= 0) {
+ if (msg_size == -2) {
+ // pass
+ } else if (msg_size == -1) {
+ msg_size = i.msg_size;
+ } else {
+ if (msg_size != i.msg_size) {
+ msg_size = -2;
+ }
+ }
+ }
+ num_msgs += i.msg_count_interval;
+ }
+ double iops = num_msgs / duration;
+ double throughput_MB = msg_size;
+ if (msg_size >= 0) {
+ throughput_MB = iops * msg_size / 1048576;
+ }
+ std::ostringstream sout;
+ sout << setfill(' ')
+ << report.elapsed
+ << "(" << std::setw(5) << duration << ") "
+ << std::setw(9) << iops << "IOPS "
+ << std::setw(8) << throughput_MB << "MiB/s ";
+ for (auto &i : report.reports) {
+ sout << i.reactor_utilization
+ << "(" << i.conn_count << ") ";
+ }
+ std::cout << sout.str() << std::endl;
+ });
+ }
+ });
+ }
+ );
+ }).then([this] {
+ logger().info("report is stopped!");
+ }).forward_to(std::move(pr_report));
+ }
+ };
+
+ struct Client final
+ : public crimson::net::Dispatcher,
+ public seastar::peering_sharded_service<Client> {
+
+ struct ConnStats {
+ mono_time connecting_time = mono_clock::zero();
+ mono_time connected_time = mono_clock::zero();
+ unsigned received_count = 0u;
+
+ mono_time start_time = mono_clock::zero();
+ unsigned start_count = 0u;
+
+ unsigned sampled_count = 0u;
+ double sampled_total_lat_s = 0.0;
+
+ // for reporting only
+ mono_time finish_time = mono_clock::zero();
+
+ void start_connecting() {
+ connecting_time = mono_clock::now();
+ }
+
+ void finish_connecting() {
+ ceph_assert_always(connected_time == mono_clock::zero());
+ connected_time = mono_clock::now();
+ }
+
+ void start_collect() {
+ ceph_assert_always(connected_time != mono_clock::zero());
+ start_time = mono_clock::now();
+ start_count = received_count;
+ sampled_count = 0u;
+ sampled_total_lat_s = 0.0;
+ finish_time = mono_clock::zero();
+ }
+
+ void prepare_summary(const ConnStats &current) {
+ *this = current;
+ finish_time = mono_clock::now();
+ }
+ };
+
+ struct PeriodStats {
+ mono_time start_time = mono_clock::zero();
+ unsigned start_count = 0u;
+ unsigned sampled_count = 0u;
+ double sampled_total_lat_s = 0.0;
+
+ // for reporting only
+ mono_time finish_time = mono_clock::zero();
+ unsigned finish_count = 0u;
+ unsigned depth = 0u;
+
+ void start_collect(unsigned received_count) {
+ start_time = mono_clock::now();
+ start_count = received_count;
+ sampled_count = 0u;
+ sampled_total_lat_s = 0.0;
+ }
+
+ void reset_period(
+ unsigned received_count, unsigned _depth, PeriodStats &snapshot) {
+ snapshot.start_time = start_time;
+ snapshot.start_count = start_count;
+ snapshot.sampled_count = sampled_count;
+ snapshot.sampled_total_lat_s = sampled_total_lat_s;
+ snapshot.finish_time = mono_clock::now();
+ snapshot.finish_count = received_count;
+ snapshot.depth = _depth;
+
+ start_collect(received_count);
+ }
+ };
+
+ struct JobReport {
+ std::string name;
+ unsigned depth = 0;
+ double connect_time_s = 0;
+ unsigned total_msgs = 0;
+ double messaging_time_s = 0;
+ double latency_ms = 0;
+ double iops = 0;
+ double throughput_mbps = 0;
+
+ void account(const JobReport &stats) {
+ depth += stats.depth;
+ connect_time_s += stats.connect_time_s;
+ total_msgs += stats.total_msgs;
+ messaging_time_s += stats.messaging_time_s;
+ latency_ms += stats.latency_ms;
+ iops += stats.iops;
+ throughput_mbps += stats.throughput_mbps;
+ }
+
+ void report() const {
+ auto str = fmt::format(
+ "{}(depth={}):\n"
+ " connect time: {:08f}s\n"
+ " messages received: {}\n"
+ " messaging time: {:08f}s\n"
+ " latency: {:08f}ms\n"
+ " IOPS: {:08f}\n"
+ " out throughput: {:08f}MB/s",
+ name, depth, connect_time_s,
+ total_msgs, messaging_time_s,
+ latency_ms, iops,
+ throughput_mbps);
+ std::cout << str << std::endl;
+ }
+ };
+
+ struct ConnectionPriv : public crimson::net::Connection::user_private_t {
+ unsigned index;
+ ConnectionPriv(unsigned i) : index{i} {}
+ };
+
+ struct ConnState {
+ crimson::net::MessengerRef msgr;
+ ConnStats conn_stats;
+ PeriodStats period_stats;
+ seastar::semaphore depth;
+ std::vector<lowres_clock_t::time_point> time_msgs_sent;
+ unsigned sent_count = 0u;
+ crimson::net::ConnectionRef active_conn;
+ bool stop_send = false;
+ seastar::promise<JobReport> stopped_send_promise;
+
+ ConnState(std::size_t _depth)
+ : depth{_depth},
+ time_msgs_sent{_depth, lowres_clock_t::time_point::min()} {}
+
+ unsigned get_current_units() const {
+ ceph_assert(depth.available_units() >= 0);
+ return depth.current();
+ }
+
+ seastar::future<JobReport> stop_dispatch_messages() {
+ stop_send = true;
+ depth.broken(DepthBroken());
+ return stopped_send_promise.get_future();
+ }
+ };
+
+ const seastar::shard_id sid;
+ const unsigned id;
+ const std::optional<unsigned> server_sid;
+
+ const unsigned num_clients;
+ const unsigned num_conns;
+ const unsigned msg_len;
+ bufferlist msg_data;
+ const unsigned nr_depth;
+ const unsigned nonce_base;
+ crimson::auth::DummyAuthClientServer dummy_auth;
+
+ std::vector<ConnState> conn_states;
+
+ Client(unsigned num_clients,
+ unsigned num_conns,
+ unsigned msg_len,
+ unsigned _depth,
+ unsigned nonce_base,
+ std::optional<unsigned> server_sid)
+ : sid{seastar::this_shard_id()},
+ id{sid + num_clients - seastar::smp::count},
+ server_sid{server_sid},
+ num_clients{num_clients},
+ num_conns{num_conns},
+ msg_len{msg_len},
+ nr_depth{_depth},
+ nonce_base{nonce_base} {
+ if (is_active()) {
+ for (unsigned i = 0; i < num_conns; ++i) {
+ conn_states.emplace_back(nr_depth);
+ }
+ }
+ msg_data.append_zero(msg_len);
+ }
+
+ std::string get_name(unsigned i) {
+ return fmt::format("client{}Conn{}@{}", id, i, sid);
+ }
+
+ void ms_handle_connect(
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id prv_shard) override {
+ ceph_assert_always(prv_shard == seastar::this_shard_id());
+ assert(is_active());
+ unsigned index = static_cast<ConnectionPriv&>(conn->get_user_private()).index;
+ auto &conn_state = conn_states[index];
+ conn_state.conn_stats.finish_connecting();
+ }
+
+ std::optional<seastar::future<>> ms_dispatch(
+ crimson::net::ConnectionRef conn, MessageRef m) override {
+ assert(is_active());
+ // server replies with MOSDOp to generate server-side write workload
+ ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+
+ unsigned index = static_cast<ConnectionPriv&>(conn->get_user_private()).index;
+ assert(index < num_conns);
+ auto &conn_state = conn_states[index];
+
+ auto msg_id = m->get_tid();
+ if (msg_id % SAMPLE_RATE == 0) {
+ auto msg_index = msg_id % conn_state.time_msgs_sent.size();
+ ceph_assert(conn_state.time_msgs_sent[msg_index] !=
+ lowres_clock_t::time_point::min());
+ std::chrono::duration<double> cur_latency =
+ lowres_clock_t::now() - conn_state.time_msgs_sent[msg_index];
+ conn_state.conn_stats.sampled_total_lat_s += cur_latency.count();
+ ++(conn_state.conn_stats.sampled_count);
+ conn_state.period_stats.sampled_total_lat_s += cur_latency.count();
+ ++(conn_state.period_stats.sampled_count);
+ conn_state.time_msgs_sent[msg_index] = lowres_clock_t::time_point::min();
+ }
+
+ ++(conn_state.conn_stats.received_count);
+ conn_state.depth.signal(1);
+
+ return {seastar::now()};
+ }
+
+ // should start messenger at this shard?
+ bool is_active() {
+ ceph_assert(seastar::this_shard_id() == sid);
+ return sid + num_clients >= seastar::smp::count;
+ }
+
+ seastar::future<> init() {
+ return container().invoke_on_all([](auto& client) {
+ if (client.is_active()) {
+ return seastar::do_for_each(
+ boost::make_counting_iterator(0u),
+ boost::make_counting_iterator(client.num_conns),
+ [&client](auto i) {
+ auto &conn_state = client.conn_states[i];
+ std::string name = client.get_name(i);
+ conn_state.msgr = crimson::net::Messenger::create(
+ entity_name_t::OSD(client.id * client.num_conns + i),
+ name, client.nonce_base + client.id * client.num_conns + i, true);
+ conn_state.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+ conn_state.msgr->set_auth_client(&client.dummy_auth);
+ conn_state.msgr->set_auth_server(&client.dummy_auth);
+ return conn_state.msgr->start({&client});
+ });
+ }
+ return seastar::now();
+ });
+ }
+
+ seastar::future<> shutdown() {
+ return seastar::do_with(
+ std::vector<JobReport>(num_clients * num_conns),
+ [this](auto &all_stats) {
+ return container().invoke_on_all([&all_stats](auto& client) {
+ if (!client.is_active()) {
+ return seastar::now();
+ }
+
+ return seastar::parallel_for_each(
+ boost::make_counting_iterator(0u),
+ boost::make_counting_iterator(client.num_conns),
+ [&all_stats, &client](auto i) {
+ logger().info("{} shutdown...", client.get_name(i));
+ auto &conn_state = client.conn_states[i];
+ return conn_state.stop_dispatch_messages(
+ ).then([&all_stats, &client, i](auto stats) {
+ all_stats[client.id * client.num_conns + i] = stats;
+ });
+ }).then([&client] {
+ return seastar::do_for_each(
+ boost::make_counting_iterator(0u),
+ boost::make_counting_iterator(client.num_conns),
+ [&client](auto i) {
+ auto &conn_state = client.conn_states[i];
+ ceph_assert(conn_state.msgr);
+ conn_state.msgr->stop();
+ return conn_state.msgr->shutdown();
+ });
+ });
+ }).then([&all_stats, this] {
+ auto nr_jobs = all_stats.size();
+ JobReport summary;
+ std::vector<JobReport> clients(num_clients);
+
+ for (unsigned i = 0; i < nr_jobs; ++i) {
+ auto &stats = all_stats[i];
+ stats.report();
+ clients[i / num_conns].account(stats);
+ summary.account(stats);
+ }
+
+ std::cout << std::endl;
+ std::cout << "per client:" << std::endl;
+ for (unsigned i = 0; i < num_clients; ++i) {
+ auto &stats = clients[i];
+ stats.name = fmt::format("client{}", i);
+ stats.connect_time_s /= num_conns;
+ stats.messaging_time_s /= num_conns;
+ stats.latency_ms /= num_conns;
+ stats.report();
+ }
+
+ std::cout << std::endl;
+ summary.name = fmt::format("all", nr_jobs);
+ summary.connect_time_s /= nr_jobs;
+ summary.messaging_time_s /= nr_jobs;
+ summary.latency_ms /= nr_jobs;
+ summary.report();
+ });
+ });
+ }
+
+ seastar::future<> connect_wait_verify(const entity_addr_t& peer_addr) {
+ return container().invoke_on_all([peer_addr](auto& client) {
+ // start clients in active cores
+ if (client.is_active()) {
+ for (unsigned i = 0; i < client.num_conns; ++i) {
+ auto &conn_state = client.conn_states[i];
+ conn_state.conn_stats.start_connecting();
+ conn_state.active_conn = conn_state.msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+ conn_state.active_conn->set_user_private(
+ std::make_unique<ConnectionPriv>(i));
+ }
+ // make sure handshake won't hurt the performance
+ return seastar::sleep(1s).then([&client] {
+ for (unsigned i = 0; i < client.num_conns; ++i) {
+ auto &conn_state = client.conn_states[i];
+ if (conn_state.conn_stats.connected_time == mono_clock::zero()) {
+ logger().error("\n{} not connected after 1s!\n",
+ client.get_name(i));
+ ceph_assert(false);
+ }
+ }
+ });
+ }
+ return seastar::now();
+ });
+ }
+
+ private:
+ class TimerReport {
+ private:
+ const unsigned num_clients;
+ const unsigned num_conns;
+ const unsigned msgtime;
+ const unsigned bytes_of_block;
+
+ unsigned elapsed = 0u;
+ std::vector<PeriodStats> snaps;
+ std::vector<ConnStats> summaries;
+ std::vector<double> client_reactor_utilizations;
+ std::optional<double> server_reactor_utilization;
+
+ public:
+ TimerReport(unsigned num_clients, unsigned num_conns, unsigned msgtime, unsigned bs)
+ : num_clients{num_clients},
+ num_conns{num_conns},
+ msgtime{msgtime},
+ bytes_of_block{bs},
+ snaps{num_clients * num_conns},
+ summaries{num_clients * num_conns},
+ client_reactor_utilizations(num_clients) {}
+
+ unsigned get_elapsed() const { return elapsed; }
+
+ PeriodStats& get_snap(unsigned client_id, unsigned i) {
+ return snaps[client_id * num_conns + i];
+ }
+
+ ConnStats& get_summary(unsigned client_id, unsigned i) {
+ return summaries[client_id * num_conns + i];
+ }
+
+ void set_client_reactor_utilization(unsigned client_id, double ru) {
+ client_reactor_utilizations[client_id] = ru;
+ }
+
+ void set_server_reactor_utilization(double ru) {
+ server_reactor_utilization = ru;
+ }
+
+ bool should_stop() const {
+ return elapsed >= msgtime;
+ }
+
+ seastar::future<> ticktock() {
+ return seastar::sleep(1s).then([this] {
+ ++elapsed;
+ });
+ }
+
+ void report_header() const {
+ std::ostringstream sout;
+ sout << std::setfill(' ')
+ << std::setw(6) << "sec"
+ << std::setw(7) << "depth"
+ << std::setw(10) << "IOPS"
+ << std::setw(9) << "MB/s"
+ << std::setw(9) << "lat(ms)";
+ std::cout << sout.str() << std::endl;
+ }
+
+ void report_period() {
+ std::chrono::duration<double> elapsed_d = 0s;
+ unsigned depth = 0u;
+ unsigned ops = 0u;
+ unsigned sampled_count = 0u;
+ double sampled_total_lat_s = 0.0;
+ for (const auto& snap: snaps) {
+ elapsed_d += (snap.finish_time - snap.start_time);
+ depth += snap.depth;
+ ops += (snap.finish_count - snap.start_count);
+ sampled_count += snap.sampled_count;
+ sampled_total_lat_s += snap.sampled_total_lat_s;
+ }
+ double elapsed_s = elapsed_d.count() / (num_clients * num_conns);
+ double iops = ops/elapsed_s;
+ std::ostringstream sout;
+ sout << setfill(' ')
+ << std::setw(5) << elapsed_s
+ << " "
+ << std::setw(6) << depth
+ << " "
+ << std::setw(9) << iops
+ << " "
+ << std::setw(8) << iops * bytes_of_block / 1048576
+ << " "
+ << std::setw(8) << (sampled_total_lat_s / sampled_count * 1000)
+ << " -- ";
+ if (server_reactor_utilization.has_value()) {
+ sout << *server_reactor_utilization << " -- ";
+ }
+ for (double cru : client_reactor_utilizations) {
+ sout << cru << ",";
+ }
+ std::cout << sout.str() << std::endl;
+ }
+
+ void report_summary() const {
+ std::chrono::duration<double> elapsed_d = 0s;
+ unsigned ops = 0u;
+ unsigned sampled_count = 0u;
+ double sampled_total_lat_s = 0.0;
+ for (const auto& summary: summaries) {
+ elapsed_d += (summary.finish_time - summary.start_time);
+ ops += (summary.received_count - summary.start_count);
+ sampled_count += summary.sampled_count;
+ sampled_total_lat_s += summary.sampled_total_lat_s;
+ }
+ double elapsed_s = elapsed_d.count() / (num_clients * num_conns);
+ double iops = ops / elapsed_s;
+ std::ostringstream sout;
+ sout << "--------------"
+ << " summary "
+ << "--------------\n"
+ << setfill(' ')
+ << std::setw(7) << elapsed_s
+ << std::setw(6) << "-"
+ << std::setw(8) << iops
+ << std::setw(8) << iops * bytes_of_block / 1048576
+ << std::setw(8) << (sampled_total_lat_s / sampled_count * 1000)
+ << "\n";
+ std::cout << sout.str() << std::endl;
+ }
+ };
+
+ seastar::future<> report_period(TimerReport& report) {
+ return container().invoke_on_all([&report] (auto& client) {
+ if (client.is_active()) {
+ for (unsigned i = 0; i < client.num_conns; ++i) {
+ auto &conn_state = client.conn_states[i];
+ PeriodStats& snap = report.get_snap(client.id, i);
+ conn_state.period_stats.reset_period(
+ conn_state.conn_stats.received_count,
+ client.nr_depth - conn_state.get_current_units(),
+ snap);
+ }
+ report.set_client_reactor_utilization(client.id, get_reactor_utilization());
+ }
+ if (client.server_sid.has_value() &&
+ seastar::this_shard_id() == *client.server_sid) {
+ assert(!client.is_active());
+ report.set_server_reactor_utilization(get_reactor_utilization());
+ }
+ }).then([&report] {
+ report.report_period();
+ });
+ }
+
+ seastar::future<> report_summary(TimerReport& report) {
+ return container().invoke_on_all([&report] (auto& client) {
+ if (client.is_active()) {
+ for (unsigned i = 0; i < client.num_conns; ++i) {
+ auto &conn_state = client.conn_states[i];
+ ConnStats& summary = report.get_summary(client.id, i);
+ summary.prepare_summary(conn_state.conn_stats);
+ }
+ }
+ }).then([&report] {
+ report.report_summary();
+ });
+ }
+
+ public:
+ seastar::future<> dispatch_with_timer(unsigned ramptime, unsigned msgtime) {
+ logger().info("[all clients]: start sending MOSDOps from {} clients * {} conns",
+ num_clients, num_conns);
+ return container().invoke_on_all([] (auto& client) {
+ if (client.is_active()) {
+ for (unsigned i = 0; i < client.num_conns; ++i) {
+ client.do_dispatch_messages(i);
+ }
+ }
+ }).then([ramptime] {
+ logger().info("[all clients]: ramping up {} seconds...", ramptime);
+ return seastar::sleep(std::chrono::seconds(ramptime));
+ }).then([this] {
+ return container().invoke_on_all([] (auto& client) {
+ if (client.is_active()) {
+ for (unsigned i = 0; i < client.num_conns; ++i) {
+ auto &conn_state = client.conn_states[i];
+ conn_state.conn_stats.start_collect();
+ conn_state.period_stats.start_collect(conn_state.conn_stats.received_count);
+ }
+ }
+ });
+ }).then([this, msgtime] {
+ logger().info("[all clients]: reporting {} seconds...\n", msgtime);
+ return seastar::do_with(
+ TimerReport(num_clients, num_conns, msgtime, msg_len),
+ [this](auto& report) {
+ report.report_header();
+ return seastar::do_until(
+ [&report] { return report.should_stop(); },
+ [&report, this] {
+ return report.ticktock().then([&report, this] {
+ // report period every 1s
+ return report_period(report);
+ }).then([&report, this] {
+ // report summary every 10s
+ if (report.get_elapsed() % 10 == 0) {
+ return report_summary(report);
+ } else {
+ return seastar::now();
+ }
+ });
+ }
+ ).then([&report, this] {
+ // report the final summary
+ if (report.get_elapsed() % 10 != 0) {
+ return report_summary(report);
+ } else {
+ return seastar::now();
+ }
+ });
+ });
+ });
+ }
+
+ private:
+ seastar::future<> send_msg(ConnState &conn_state) {
+ ceph_assert(seastar::this_shard_id() == sid);
+ conn_state.sent_count += 1;
+ return conn_state.depth.wait(1
+ ).then([this, &conn_state] {
+ const static pg_t pgid;
+ const static object_locator_t oloc;
+ const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
+ pgid.pool(), oloc.nspace);
+ static spg_t spgid(pgid);
+ auto m = crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0);
+ bufferlist data(msg_data);
+ m->write(0, msg_len, data);
+ // use tid as the identity of each round
+ m->set_tid(conn_state.sent_count);
+
+ // sample message latency
+ if (unlikely(conn_state.sent_count % SAMPLE_RATE == 0)) {
+ auto index = conn_state.sent_count % conn_state.time_msgs_sent.size();
+ ceph_assert(conn_state.time_msgs_sent[index] ==
+ lowres_clock_t::time_point::min());
+ conn_state.time_msgs_sent[index] = lowres_clock_t::now();
+ }
+
+ return conn_state.active_conn->send(std::move(m));
+ });
+ }
+
+ class DepthBroken: public std::exception {};
+
+ seastar::future<JobReport> stop_dispatch_messages(unsigned i) {
+ auto &conn_state = conn_states[i];
+ conn_state.stop_send = true;
+ conn_state.depth.broken(DepthBroken());
+ return conn_state.stopped_send_promise.get_future();
+ }
+
+ void do_dispatch_messages(unsigned i) {
+ ceph_assert(seastar::this_shard_id() == sid);
+ auto &conn_state = conn_states[i];
+ ceph_assert(conn_state.sent_count == 0);
+ conn_state.conn_stats.start_time = mono_clock::now();
+ // forwarded to stopped_send_promise
+ (void) seastar::do_until(
+ [&conn_state] { return conn_state.stop_send; },
+ [this, &conn_state] { return send_msg(conn_state); }
+ ).handle_exception_type([] (const DepthBroken& e) {
+ // ok, stopped by stop_dispatch_messages()
+ }).then([this, &conn_state, i] {
+ std::string name = get_name(i);
+ logger().info("{} {}: stopped sending OSDOPs",
+ name, *conn_state.active_conn);
+
+ std::chrono::duration<double> dur_conn =
+ conn_state.conn_stats.connected_time -
+ conn_state.conn_stats.connecting_time;
+ std::chrono::duration<double> dur_msg =
+ mono_clock::now() - conn_state.conn_stats.start_time;
+ unsigned ops =
+ conn_state.conn_stats.received_count -
+ conn_state.conn_stats.start_count;
+
+ JobReport stats;
+ stats.name = name;
+ stats.depth = nr_depth;
+ stats.connect_time_s = dur_conn.count();
+ stats.total_msgs = ops;
+ stats.messaging_time_s = dur_msg.count();
+ stats.latency_ms =
+ conn_state.conn_stats.sampled_total_lat_s /
+ conn_state.conn_stats.sampled_count * 1000;
+ stats.iops = ops / dur_msg.count();
+ stats.throughput_mbps = ops / dur_msg.count() * msg_len / 1048576;
+
+ conn_state.stopped_send_promise.set_value(stats);
+ });
+ }
+ };
+ };
+
+ std::optional<unsigned> server_sid;
+ bool server_needs_report = false;
+ if (mode == perf_mode_t::both) {
+ ceph_assert(server_conf.is_fixed_cpu == true);
+ server_sid = server_conf.core;
+ } else if (mode == perf_mode_t::server) {
+ server_needs_report = true;
+ }
+ return seastar::when_all(
+ seastar::futurize_invoke([mode, server_conf, server_needs_report] {
+ if (mode == perf_mode_t::client) {
+ return seastar::make_ready_future<test_state::Server*>(nullptr);
+ } else {
+ return create_sharded<test_state::Server>(
+ server_conf.core,
+ server_conf.block_size,
+ server_needs_report);
+ }
+ }),
+ seastar::futurize_invoke([mode, client_conf, server_sid] {
+ if (mode == perf_mode_t::server) {
+ return seastar::make_ready_future<test_state::Client*>(nullptr);
+ } else {
+ unsigned nonce_base = ceph::util::generate_random_number<unsigned>();
+ logger().info("client nonce_base={}", nonce_base);
+ return create_sharded<test_state::Client>(
+ client_conf.num_clients,
+ client_conf.num_conns,
+ client_conf.block_size,
+ client_conf.depth,
+ nonce_base,
+ server_sid);
+ }
+ }),
+ crimson::common::sharded_conf().start(
+ EntityName{}, std::string_view{"ceph"}
+ ).then([] {
+ return crimson::common::local_conf().start();
+ }).then([crc_enabled] {
+ return crimson::common::local_conf().set_val(
+ "ms_crc_data", crc_enabled ? "true" : "false");
+ })
+ ).then([=](auto&& ret) {
+ auto server = std::move(std::get<0>(ret).get0());
+ auto client = std::move(std::get<1>(ret).get0());
+ // reserve core 0 for potentially better performance
+ if (mode == perf_mode_t::both) {
+ logger().info("\nperf settings:\n smp={}\n {}\n {}\n",
+ seastar::smp::count, client_conf.str(), server_conf.str());
+ if (client_conf.skip_core_0) {
+ ceph_assert(seastar::smp::count > client_conf.num_clients);
+ } else {
+ ceph_assert(seastar::smp::count >= client_conf.num_clients);
+ }
+ ceph_assert(client_conf.num_clients > 0);
+ ceph_assert(seastar::smp::count > server_conf.core + client_conf.num_clients);
+ return seastar::when_all_succeed(
+ // it is not reasonable to allow server/client to shared cores for
+ // performance benchmarking purposes.
+ server->init(server_conf.addr, server_conf.is_fixed_cpu),
+ client->init()
+ ).then_unpack([client, addr = client_conf.server_addr] {
+ return client->connect_wait_verify(addr);
+ }).then([client, ramptime = client_conf.ramptime,
+ msgtime = client_conf.msgtime] {
+ return client->dispatch_with_timer(ramptime, msgtime);
+ }).then([client] {
+ return client->shutdown();
+ }).then([server] {
+ return server->shutdown();
+ });
+ } else if (mode == perf_mode_t::client) {
+ logger().info("\nperf settings:\n smp={}\n {}\n",
+ seastar::smp::count, client_conf.str());
+ if (client_conf.skip_core_0) {
+ ceph_assert(seastar::smp::count > client_conf.num_clients);
+ } else {
+ ceph_assert(seastar::smp::count >= client_conf.num_clients);
+ }
+ ceph_assert(client_conf.num_clients > 0);
+ return client->init(
+ ).then([client, addr = client_conf.server_addr] {
+ return client->connect_wait_verify(addr);
+ }).then([client, ramptime = client_conf.ramptime,
+ msgtime = client_conf.msgtime] {
+ return client->dispatch_with_timer(ramptime, msgtime);
+ }).then([client] {
+ return client->shutdown();
+ });
+ } else { // mode == perf_mode_t::server
+ ceph_assert(seastar::smp::count > server_conf.core);
+ logger().info("\nperf settings:\n smp={}\n {}\n",
+ seastar::smp::count, server_conf.str());
+ return seastar::async([server, server_conf] {
+ // FIXME: SIGINT is not received by stop_signal
+ seastar_apps_lib::stop_signal should_stop;
+ server->init(server_conf.addr, server_conf.is_fixed_cpu).get();
+ should_stop.wait().get();
+ server->shutdown().get();
+ });
+ }
+ }).finally([] {
+ return crimson::common::sharded_conf().stop();
+ });
+}
+
+}
+
+int main(int argc, char** argv)
+{
+ seastar::app_template app;
+ app.add_options()
+ ("mode", bpo::value<unsigned>()->default_value(0),
+ "0: both, 1:client, 2:server")
+ ("server-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9010"),
+ "server address(only support msgr v2 protocol)")
+ ("ramptime", bpo::value<unsigned>()->default_value(5),
+ "seconds of client ramp-up time")
+ ("msgtime", bpo::value<unsigned>()->default_value(15),
+ "seconds of client messaging time")
+ ("clients", bpo::value<unsigned>()->default_value(1),
+ "number of client messengers")
+ ("conns-per-client", bpo::value<unsigned>()->default_value(1),
+ "number of connections per client")
+ ("client-bs", bpo::value<unsigned>()->default_value(4096),
+ "client block size")
+ ("depth", bpo::value<unsigned>()->default_value(512),
+ "client io depth per job")
+ ("client-skip-core-0", bpo::value<bool>()->default_value(true),
+ "client skip core 0")
+ ("server-fixed-cpu", bpo::value<bool>()->default_value(true),
+ "server is in the fixed cpu mode, non-fixed doesn't support the mode both")
+ ("server-core", bpo::value<unsigned>()->default_value(1),
+ "server messenger running core")
+ ("server-bs", bpo::value<unsigned>()->default_value(0),
+ "server block size")
+ ("crc-enabled", bpo::value<bool>()->default_value(false),
+ "enable CRC checks");
+ return app.run(argc, argv, [&app] {
+ auto&& config = app.configuration();
+ auto mode = config["mode"].as<unsigned>();
+ ceph_assert(mode <= 2);
+ auto _mode = static_cast<perf_mode_t>(mode);
+ bool crc_enabled = config["crc-enabled"].as<bool>();
+ auto server_conf = server_config::load(config);
+ auto client_conf = client_config::load(config);
+ return run(_mode, client_conf, server_conf, crc_enabled
+ ).then([] {
+ logger().info("\nsuccessful!\n");
+ }).handle_exception([] (auto eptr) {
+ logger().info("\nfailed!\n");
+ return seastar::make_exception_future<>(eptr);
+ });
+ });
+}
diff --git a/src/crimson/tools/perf_staged_fltree.cc b/src/crimson/tools/perf_staged_fltree.cc
new file mode 100644
index 000000000..81b621750
--- /dev/null
+++ b/src/crimson/tools/perf_staged_fltree.cc
@@ -0,0 +1,178 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <boost/program_options.hpp>
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/thread.hh>
+
+#include "crimson/common/config_proxy.h"
+#include "crimson/common/log.h"
+#include "crimson/common/perf_counters_collection.h"
+#include "crimson/os/seastore/onode_manager/staged-fltree/tree_utils.h"
+#include "crimson/os/seastore/onode_manager/staged-fltree/node_extent_manager.h"
+
+#include "test/crimson/seastore/onode_tree/test_value.h"
+#include "test/crimson/seastore/transaction_manager_test_state.h"
+
+using namespace crimson::os::seastore::onode;
+namespace bpo = boost::program_options;
+
+seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_test);
+}
+
+template <bool TRACK>
+class PerfTree : public TMTestState {
+ public:
+ PerfTree(bool is_dummy) : is_dummy{is_dummy} {}
+
+ seastar::future<> run(KVPool<test_item_t>& kvs, double erase_ratio) {
+ return tm_setup().then([this, &kvs, erase_ratio] {
+ return seastar::async([this, &kvs, erase_ratio] {
+ auto tree = std::make_unique<TreeBuilder<TRACK, ExtendedValue>>(kvs,
+ (is_dummy ? NodeExtentManager::create_dummy(true)
+ : NodeExtentManager::create_seastore(*tm)));
+ {
+ auto t = create_mutate_transaction();
+ with_trans_intr(*t, [&](auto &tr){
+ return tree->bootstrap(tr);
+ }).unsafe_get();
+ submit_transaction(std::move(t));
+ }
+ {
+ auto t = create_mutate_transaction();
+ with_trans_intr(*t, [&](auto &tr){
+ return tree->insert(tr);
+ }).unsafe_get();
+ auto start_time = mono_clock::now();
+ submit_transaction(std::move(t));
+ std::chrono::duration<double> duration = mono_clock::now() - start_time;
+ logger().warn("submit_transaction() done! {}s", duration.count());
+ }
+ {
+ // Note: create_weak_transaction() can also work, but too slow.
+ auto t = create_read_transaction();
+ with_trans_intr(*t, [&](auto &tr){
+ return tree->get_stats(tr);
+ }).unsafe_get();
+
+ with_trans_intr(*t, [&](auto &tr){
+ return tree->validate(tr);
+ }).unsafe_get();
+ }
+ {
+ auto t = create_mutate_transaction();
+ with_trans_intr(*t, [&](auto &tr){
+ return tree->erase(tr, kvs.size() * erase_ratio);
+ }).unsafe_get();
+ submit_transaction(std::move(t));
+ }
+ {
+ auto t = create_read_transaction();
+ with_trans_intr(*t, [&](auto &tr){
+ return tree->get_stats(tr);
+ }).unsafe_get();
+
+ with_trans_intr(*t, [&](auto &tr){
+ return tree->validate(tr);
+ }).unsafe_get();
+ }
+ tree.reset();
+ });
+ }).then([this] {
+ return tm_teardown();
+ });
+ }
+
+ private:
+ bool is_dummy;
+};
+
+template <bool TRACK>
+seastar::future<> run(const bpo::variables_map& config) {
+ return seastar::async([&config] {
+ auto backend = config["backend"].as<std::string>();
+ bool is_dummy;
+ if (backend == "dummy") {
+ is_dummy = true;
+ } else if (backend == "seastore") {
+ is_dummy = false;
+ } else {
+ ceph_abort(false && "invalid backend");
+ }
+ auto ns_sizes = config["ns-sizes"].as<std::vector<size_t>>();
+ auto oid_sizes = config["oid-sizes"].as<std::vector<size_t>>();
+ auto onode_sizes = config["onode-sizes"].as<std::vector<size_t>>();
+ auto range2 = config["range2"].as<std::vector<int>>();
+ ceph_assert(range2.size() == 2);
+ auto range1 = config["range1"].as<std::vector<unsigned>>();
+ ceph_assert(range1.size() == 2);
+ auto range0 = config["range0"].as<std::vector<unsigned>>();
+ ceph_assert(range0.size() == 2);
+ auto erase_ratio = config["erase-ratio"].as<double>();
+ ceph_assert(erase_ratio >= 0);
+ ceph_assert(erase_ratio <= 1);
+
+ using crimson::common::sharded_conf;
+ sharded_conf().start(EntityName{}, std::string_view{"ceph"}).get();
+ seastar::engine().at_exit([] {
+ return sharded_conf().stop();
+ });
+
+ using crimson::common::sharded_perf_coll;
+ sharded_perf_coll().start().get();
+ seastar::engine().at_exit([] {
+ return sharded_perf_coll().stop();
+ });
+
+ auto kvs = KVPool<test_item_t>::create_raw_range(
+ ns_sizes, oid_sizes, onode_sizes,
+ {range2[0], range2[1]},
+ {range1[0], range1[1]},
+ {range0[0], range0[1]});
+ PerfTree<TRACK> perf{is_dummy};
+ perf.run(kvs, erase_ratio).get0();
+ });
+}
+
+
+int main(int argc, char** argv)
+{
+ seastar::app_template app;
+ app.add_options()
+ ("backend", bpo::value<std::string>()->default_value("dummy"),
+ "tree backend: dummy, seastore")
+ ("tracked", bpo::value<bool>()->default_value(false),
+ "track inserted cursors")
+ ("ns-sizes", bpo::value<std::vector<size_t>>()->default_value(
+ {8, 11, 64, 128, 255, 256}),
+ "sizes of ns strings")
+ ("oid-sizes", bpo::value<std::vector<size_t>>()->default_value(
+ {8, 13, 64, 512, 2035, 2048}),
+ "sizes of oid strings")
+ ("onode-sizes", bpo::value<std::vector<size_t>>()->default_value(
+ {8, 16, 128, 576, 992, 1200}),
+ "sizes of onode")
+ ("range2", bpo::value<std::vector<int>>()->default_value(
+ {0, 128}),
+ "range of shard-pool-crush [a, b)")
+ ("range1", bpo::value<std::vector<unsigned>>()->default_value(
+ {0, 10}),
+ "range of ns-oid strings [a, b)")
+ ("range0", bpo::value<std::vector<unsigned>>()->default_value(
+ {0, 4}),
+ "range of snap-gen [a, b)")
+ ("erase-ratio", bpo::value<double>()->default_value(
+ 0.8),
+ "erase-ratio of all the inserted onodes");
+ return app.run(argc, argv, [&app] {
+ auto&& config = app.configuration();
+ auto tracked = config["tracked"].as<bool>();
+ if (tracked) {
+ return run<true>(config);
+ } else {
+ return run<false>(config);
+ }
+ });
+}
diff --git a/src/crimson/tools/store_nbd/block_driver.cc b/src/crimson/tools/store_nbd/block_driver.cc
new file mode 100644
index 000000000..10e77a34b
--- /dev/null
+++ b/src/crimson/tools/store_nbd/block_driver.cc
@@ -0,0 +1,19 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "fs_driver.h"
+#include "block_driver.h"
+
+#include "tm_driver.h"
+
+BlockDriverRef get_backend(BlockDriver::config_t config)
+{
+ if (config.type == "transaction_manager") {
+ return std::make_unique<TMDriver>(config);
+ } else if (config.is_futurized_store()) {
+ return std::make_unique<FSDriver>(config);
+ } else {
+ ceph_assert(0 == "invalid option");
+ return BlockDriverRef();
+ }
+}
diff --git a/src/crimson/tools/store_nbd/block_driver.h b/src/crimson/tools/store_nbd/block_driver.h
new file mode 100644
index 000000000..ea3453ef3
--- /dev/null
+++ b/src/crimson/tools/store_nbd/block_driver.h
@@ -0,0 +1,134 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/program_options/variables_map.hpp>
+#include <boost/program_options/parsers.hpp>
+
+#include <seastar/core/future.hh>
+
+#include <string>
+#include <optional>
+
+#include "include/buffer.h"
+
+/**
+ * BlockDriver
+ *
+ * Simple interface to enable throughput test to compare raw disk to
+ * transaction_manager, etc
+ */
+class BlockDriver {
+public:
+ struct config_t {
+ std::string type;
+ bool mkfs = false;
+ unsigned num_pgs = 128;
+ unsigned log_size = 1000;
+ unsigned object_size = 4<<20 /* 4MB, rbd default */;
+ unsigned oi_size = 1<<9 /* 512b */;
+ unsigned log_entry_size = 1<<9 /* 512b */;
+ bool prepopulate_log = false;
+ std::optional<std::string> path;
+
+ bool is_futurized_store() const {
+ return type == "seastore" || type == "bluestore";
+ }
+
+ std::string get_fs_type() const {
+ ceph_assert(is_futurized_store());
+ return type;
+ }
+
+ bool oi_enabled() const {
+ return oi_size > 0;
+ }
+
+ bool log_enabled() const {
+ return log_entry_size > 0 && log_size > 0;
+ }
+
+ bool prepopulate_log_enabled() const {
+ return prepopulate_log;
+ }
+
+ void populate_options(
+ boost::program_options::options_description &desc)
+ {
+ namespace po = boost::program_options;
+ desc.add_options()
+ ("type",
+ po::value<std::string>()
+ ->default_value("transaction_manager")
+ ->notifier([this](auto s) { type = s; }),
+ "Backend to use, options are transaction_manager, seastore"
+ )
+ ("device-path",
+ po::value<std::string>()
+ ->required()
+ ->notifier([this](auto s) { path = s; }),
+ "Path to device for backend"
+ )
+ ("num-pgs",
+ po::value<unsigned>()
+ ->notifier([this](auto s) { num_pgs = s; }),
+ "Number of pgs to use for futurized_store backends"
+ )
+ ("log-size",
+ po::value<unsigned>()
+ ->notifier([this](auto s) { log_size = s; }),
+ "Number of log entries per pg to use for futurized_store backends"
+ ", 0 to disable"
+ )
+ ("log-entry-size",
+ po::value<unsigned>()
+ ->notifier([this](auto s) { log_entry_size = s; }),
+ "Size of each log entry per pg to use for futurized_store backends"
+ ", 0 to disable"
+ )
+ ("prepopulate-log",
+ po::value<bool>()
+ ->notifier([this](auto s) { prepopulate_log = s; }),
+ "Prepopulate log on mount"
+ )
+ ("object-info-size",
+ po::value<unsigned>()
+ ->notifier([this](auto s) { log_entry_size = s; }),
+ "Size of each log entry per pg to use for futurized_store backends"
+ ", 0 to disable"
+ )
+ ("object-size",
+ po::value<unsigned>()
+ ->notifier([this](auto s) { object_size = s; }),
+ "Object size to use for futurized_store backends"
+ )
+ ("mkfs",
+ po::value<bool>()
+ ->default_value(false)
+ ->notifier([this](auto s) { mkfs = s; }),
+ "Do mkfs first"
+ );
+ }
+ };
+
+ virtual ceph::bufferptr get_buffer(size_t size) = 0;
+
+ virtual seastar::future<> write(
+ off_t offset,
+ ceph::bufferptr ptr) = 0;
+
+ virtual seastar::future<ceph::bufferlist> read(
+ off_t offset,
+ size_t size) = 0;
+
+ virtual size_t get_size() const = 0;
+
+ virtual seastar::future<> mount() = 0;
+ virtual seastar::future<> close() = 0;
+
+ virtual ~BlockDriver() {}
+};
+using BlockDriverRef = std::unique_ptr<BlockDriver>;
+
+BlockDriverRef get_backend(BlockDriver::config_t config);
diff --git a/src/crimson/tools/store_nbd/fs_driver.cc b/src/crimson/tools/store_nbd/fs_driver.cc
new file mode 100644
index 000000000..18f836766
--- /dev/null
+++ b/src/crimson/tools/store_nbd/fs_driver.cc
@@ -0,0 +1,310 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <boost/iterator/counting_iterator.hpp>
+#include <fmt/format.h>
+
+#include "os/Transaction.h"
+#include "fs_driver.h"
+
+using namespace crimson;
+using namespace crimson::os;
+
+coll_t get_coll(unsigned num) {
+ return coll_t(spg_t(pg_t(0, num)));
+}
+
+ghobject_t get_log_object(unsigned coll)
+{
+ return ghobject_t(
+ shard_id_t::NO_SHARD,
+ 0,
+ (coll << 16),
+ "",
+ "",
+ 0,
+ ghobject_t::NO_GEN);
+}
+
+std::string make_log_key(
+ unsigned i)
+{
+ return fmt::format("log_entry_{}", i);
+}
+
+void add_log_entry(
+ unsigned i,
+ unsigned entry_size,
+ std::map<std::string, ceph::buffer::list> *omap)
+{
+ assert(omap);
+ bufferlist bl;
+ bl.append(ceph::buffer::create('0', entry_size));
+
+ omap->emplace(std::make_pair(make_log_key(i), bl));
+}
+
+void populate_log(
+ ceph::os::Transaction &t,
+ FSDriver::pg_analogue_t &pg,
+ unsigned entry_size,
+ unsigned entries)
+{
+ t.touch(pg.collection->get_cid(), pg.log_object);
+ // omap_clear not yet implemented, TODO
+ // t.omap_clear(pg.collection->get_cid(), pg.log_object);
+
+ std::map<std::string, ceph::buffer::list> omap;
+ for (unsigned i = 0; i < entries; ++i) {
+ add_log_entry(i, entry_size, &omap);
+ }
+
+ t.omap_setkeys(
+ pg.collection->get_cid(),
+ pg.log_object,
+ omap);
+
+ pg.log_head = entries;
+}
+
+void update_log(
+ ceph::os::Transaction &t,
+ FSDriver::pg_analogue_t &pg,
+ unsigned entry_size,
+ unsigned entries)
+{
+ ++pg.log_head;
+ std::map<std::string, ceph::buffer::list> key;
+ add_log_entry(pg.log_head, entry_size, &key);
+
+ t.omap_setkeys(
+ pg.collection->get_cid(),
+ pg.log_object,
+ key);
+
+
+ while ((pg.log_head - pg.log_tail) > entries) {
+ t.omap_rmkey(
+ pg.collection->get_cid(),
+ pg.log_object,
+ make_log_key(pg.log_tail));
+ ++pg.log_tail;
+ }
+}
+
+FSDriver::offset_mapping_t FSDriver::map_offset(off_t offset)
+{
+ uint32_t objid = offset / config.object_size;
+ uint32_t collid = objid % config.num_pgs;
+ return offset_mapping_t{
+ collections[collid],
+ ghobject_t(
+ shard_id_t::NO_SHARD,
+ 0,
+ (collid << 16) | (objid + 1),
+ "",
+ "",
+ 0,
+ ghobject_t::NO_GEN),
+ offset % config.object_size
+ };
+}
+
+seastar::future<> FSDriver::write(
+ off_t offset,
+ bufferptr ptr)
+{
+ auto mapping = map_offset(offset);
+ ceph_assert(mapping.offset + ptr.length() <= config.object_size);
+ ceph::os::Transaction t;
+ bufferlist bl;
+ bl.append(ptr);
+ t.write(
+ mapping.pg.collection->get_cid(),
+ mapping.object,
+ mapping.offset,
+ ptr.length(),
+ bl,
+ 0);
+
+ if (config.oi_enabled() ) {
+ bufferlist attr;
+ attr.append(ceph::buffer::create(config.oi_size, '0'));
+ t.setattr(
+ mapping.pg.collection->get_cid(),
+ mapping.object,
+ "_",
+ attr);
+ }
+
+ if (config.log_enabled()) {
+ update_log(
+ t,
+ mapping.pg,
+ config.log_entry_size,
+ config.log_size);
+ }
+
+ return sharded_fs->do_transaction(
+ mapping.pg.collection,
+ std::move(t));
+}
+
+seastar::future<bufferlist> FSDriver::read(
+ off_t offset,
+ size_t size)
+{
+ auto mapping = map_offset(offset);
+ ceph_assert((mapping.offset + size) <= config.object_size);
+ return sharded_fs->read(
+ mapping.pg.collection,
+ mapping.object,
+ mapping.offset,
+ size,
+ 0
+ ).handle_error(
+ crimson::ct_error::enoent::handle([size](auto &e) {
+ bufferlist bl;
+ bl.append_zero(size);
+ return seastar::make_ready_future<bufferlist>(std::move(bl));
+ }),
+ crimson::ct_error::assert_all{"Unrecoverable error in FSDriver::read"}
+ ).then([size](auto &&bl) {
+ if (bl.length() < size) {
+ bl.append_zero(size - bl.length());
+ }
+ return seastar::make_ready_future<bufferlist>(std::move(bl));
+ });
+}
+
+seastar::future<> FSDriver::mkfs()
+{
+ return init(
+ ).then([this] {
+ assert(fs);
+ uuid_d uuid;
+ uuid.generate_random();
+ return fs->mkfs(uuid).handle_error(
+ crimson::stateful_ec::handle([] (const auto& ec) {
+ crimson::get_logger(ceph_subsys_test)
+ .error("error creating empty object store in {}: ({}) {}",
+ crimson::common::local_conf().get_val<std::string>("osd_data"),
+ ec.value(), ec.message());
+ std::exit(EXIT_FAILURE);
+ }));
+ }).then([this] {
+ return fs->stop();
+ }).then([this] {
+ return init();
+ }).then([this] {
+ return fs->mount(
+ ).handle_error(
+ crimson::stateful_ec::handle([] (const auto& ec) {
+ crimson::get_logger(
+ ceph_subsys_test
+ ).error(
+ "error mounting object store in {}: ({}) {}",
+ crimson::common::local_conf().get_val<std::string>("osd_data"),
+ ec.value(),
+ ec.message());
+ std::exit(EXIT_FAILURE);
+ }));
+ }).then([this] {
+ return seastar::do_for_each(
+ boost::counting_iterator<unsigned>(0),
+ boost::counting_iterator<unsigned>(config.num_pgs),
+ [this](auto i) {
+ return sharded_fs->create_new_collection(get_coll(i)
+ ).then([this, i](auto coll) {
+ ceph::os::Transaction t;
+ t.create_collection(get_coll(i), 0);
+ return sharded_fs->do_transaction(coll, std::move(t));
+ });
+ });
+ }).then([this] {
+ return fs->umount();
+ }).then([this] {
+ return fs->stop();
+ }).then([this] {
+ fs.reset();
+ return seastar::now();
+ });
+}
+
+seastar::future<> FSDriver::mount()
+{
+ ceph_assert(config.path);
+ return (
+ config.mkfs ? mkfs() : seastar::now()
+ ).then([this] {
+ return init();
+ }).then([this] {
+ return fs->mount(
+ ).handle_error(
+ crimson::stateful_ec::handle([] (const auto& ec) {
+ crimson::get_logger(
+ ceph_subsys_test
+ ).error(
+ "error mounting object store in {}: ({}) {}",
+ crimson::common::local_conf().get_val<std::string>("osd_data"),
+ ec.value(),
+ ec.message());
+ std::exit(EXIT_FAILURE);
+ }));
+ }).then([this] {
+ return seastar::do_for_each(
+ boost::counting_iterator<unsigned>(0),
+ boost::counting_iterator<unsigned>(config.num_pgs),
+ [this](auto i) {
+ return sharded_fs->open_collection(get_coll(i)
+ ).then([this, i](auto ref) {
+ collections[i].collection = ref;
+ collections[i].log_object = get_log_object(i);
+ if (config.log_enabled()) {
+ ceph::os::Transaction t;
+ if (config.prepopulate_log_enabled()) {
+ populate_log(
+ t,
+ collections[i],
+ config.log_entry_size,
+ config.log_size);
+ }
+ return sharded_fs->do_transaction(
+ collections[i].collection,
+ std::move(t));
+ } else {
+ return seastar::now();
+ }
+ });
+ });
+ }).then([this] {
+ return fs->stat();
+ }).then([this](auto s) {
+ size = s.total;
+ });
+};
+
+seastar::future<> FSDriver::close()
+{
+ collections.clear();
+ return fs->umount(
+ ).then([this] {
+ return fs->stop();
+ }).then([this] {
+ fs.reset();
+ return seastar::now();
+ });
+}
+
+seastar::future<> FSDriver::init()
+{
+ fs.reset();
+ fs = FuturizedStore::create(
+ config.get_fs_type(),
+ *config.path,
+ crimson::common::local_conf().get_config_values()
+ );
+ return fs->start().then([this] {
+ sharded_fs = &(fs->get_sharded_store());
+ });
+}
diff --git a/src/crimson/tools/store_nbd/fs_driver.h b/src/crimson/tools/store_nbd/fs_driver.h
new file mode 100644
index 000000000..89aca075f
--- /dev/null
+++ b/src/crimson/tools/store_nbd/fs_driver.h
@@ -0,0 +1,72 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "block_driver.h"
+
+#include "crimson/os/futurized_collection.h"
+#include "crimson/os/futurized_store.h"
+
+class FSDriver final : public BlockDriver {
+public:
+ FSDriver(config_t config)
+ : config(config)
+ {}
+ ~FSDriver() final {}
+
+ bufferptr get_buffer(size_t size) final {
+ return ceph::buffer::create_page_aligned(size);
+ }
+
+ seastar::future<> write(
+ off_t offset,
+ bufferptr ptr) final;
+
+ seastar::future<bufferlist> read(
+ off_t offset,
+ size_t size) final;
+
+ size_t get_size() const {
+ return size;
+ }
+
+ seastar::future<> mount() final;
+
+ seastar::future<> close() final;
+
+private:
+ size_t size = 0;
+ const config_t config;
+ std::unique_ptr<crimson::os::FuturizedStore> fs;
+ crimson::os::FuturizedStore::Shard* sharded_fs;
+
+ struct pg_analogue_t {
+ crimson::os::CollectionRef collection;
+
+ ghobject_t log_object;
+ unsigned log_tail = 0;
+ unsigned log_head = 0;
+ };
+ std::map<unsigned, pg_analogue_t> collections;
+
+ struct offset_mapping_t {
+ pg_analogue_t &pg;
+ ghobject_t object;
+ off_t offset;
+ };
+ offset_mapping_t map_offset(off_t offset);
+
+ seastar::future<> mkfs();
+ seastar::future<> init();
+
+ friend void populate_log(
+ ceph::os::Transaction &,
+ pg_analogue_t &,
+ unsigned,
+ unsigned);
+
+ friend void update_log(
+ ceph::os::Transaction &,
+ FSDriver::pg_analogue_t &,
+ unsigned,
+ unsigned);
+};
diff --git a/src/crimson/tools/store_nbd/store-nbd.cc b/src/crimson/tools/store_nbd/store-nbd.cc
new file mode 100644
index 000000000..9f80c3b2c
--- /dev/null
+++ b/src/crimson/tools/store_nbd/store-nbd.cc
@@ -0,0 +1,456 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+
+/**
+ * crimson-store-nbd
+ *
+ * This tool exposes crimson object store internals as an nbd server
+ * for use with fio in basic benchmarking.
+ *
+ * Example usage:
+ *
+ * $ ./bin/crimson-store-nbd --device-path /dev/nvme1n1 -c 1 --mkfs true --uds-path /tmp/store_nbd_socket.sock
+ *
+ * $ cat nbd.fio
+ * [global]
+ * ioengine=nbd
+ * uri=nbd+unix:///?socket=/tmp/store_nbd_socket.sock
+ * rw=randrw
+ * time_based
+ * runtime=120
+ * group_reporting
+ * iodepth=1
+ * size=500G
+ *
+ * [job0]
+ * offset=0
+ *
+ * $ fio nbd.fio
+ */
+
+#include <random>
+
+#include <boost/program_options/variables_map.hpp>
+#include <boost/program_options/parsers.hpp>
+
+#include <linux/nbd.h>
+#include <linux/fs.h>
+
+#include <seastar/apps/lib/stop_signal.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/byteorder.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/rwlock.hh>
+#include <seastar/core/thread.hh>
+#include <seastar/util/defer.hh>
+
+#include "crimson/common/config_proxy.h"
+#include "crimson/common/log.h"
+
+#include "block_driver.h"
+
+namespace po = boost::program_options;
+
+using namespace ceph;
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_test);
+ }
+}
+
+struct request_context_t {
+ uint32_t magic = 0;
+ uint32_t type = 0;
+
+ char handle[8] = {0};
+
+ uint64_t from = 0;
+ uint32_t len = 0;
+
+ unsigned err = 0;
+ std::optional<bufferptr> in_buffer;
+ std::optional<bufferlist> out_buffer;
+
+ using ref = std::unique_ptr<request_context_t>;
+ static ref make_ref() {
+ return std::make_unique<request_context_t>();
+ }
+
+ bool check_magic() const {
+ auto ret = magic == NBD_REQUEST_MAGIC;
+ if (!ret) {
+ logger().error(
+ "Invalid magic {} should be {}",
+ magic,
+ NBD_REQUEST_MAGIC);
+ }
+ return ret;
+ }
+
+ uint32_t get_command() const {
+ return type & 0xff;
+ }
+
+ bool has_input_buffer() const {
+ return get_command() == NBD_CMD_WRITE;
+ }
+
+ seastar::future<> read_request(seastar::input_stream<char> &in) {
+ return in.read_exactly(sizeof(struct nbd_request)
+ ).then([this, &in](auto buf) {
+ if (buf.size() < sizeof(struct nbd_request)) {
+ throw std::system_error(
+ std::make_error_code(
+ std::errc::connection_reset));
+ }
+ auto p = buf.get();
+ magic = seastar::consume_be<uint32_t>(p);
+ type = seastar::consume_be<uint32_t>(p);
+ memcpy(handle, p, sizeof(handle));
+ p += sizeof(handle);
+ from = seastar::consume_be<uint64_t>(p);
+ len = seastar::consume_be<uint32_t>(p);
+ logger().debug(
+ "Got request, magic {}, type {}, from {}, len {}",
+ magic, type, from, len);
+
+ if (!check_magic()) {
+ throw std::system_error(
+ std::make_error_code(
+ std::errc::invalid_argument));
+ }
+
+ if (has_input_buffer()) {
+ return in.read_exactly(len).then([this](auto buf) {
+ in_buffer = ceph::buffer::create_page_aligned(len);
+ in_buffer->copy_in(0, len, buf.get());
+ return seastar::now();
+ });
+ } else {
+ return seastar::now();
+ }
+ });
+ }
+
+ seastar::future<> write_reply(seastar::output_stream<char> &out) {
+ seastar::temporary_buffer<char> buffer{sizeof(struct nbd_reply)};
+ auto p = buffer.get_write();
+ seastar::produce_be<uint32_t>(p, NBD_REPLY_MAGIC);
+ seastar::produce_be<uint32_t>(p, err);
+ logger().debug("write_reply writing err {}", err);
+ memcpy(p, handle, sizeof(handle));
+ return out.write(std::move(buffer)).then([this, &out] {
+ if (out_buffer) {
+ return seastar::do_for_each(
+ out_buffer->mut_buffers(),
+ [&out](bufferptr &ptr) {
+ logger().debug("write_reply writing {}", ptr.length());
+ return out.write(
+ seastar::temporary_buffer<char>(
+ ptr.c_str(),
+ ptr.length(),
+ seastar::make_deleter([ptr](){}))
+ );
+ });
+ } else {
+ return seastar::now();
+ }
+ }).then([&out] {
+ return out.flush();
+ });
+ }
+};
+
+struct RequestWriter {
+ seastar::rwlock lock;
+ seastar::output_stream<char> stream;
+ seastar::gate gate;
+
+ RequestWriter(
+ seastar::output_stream<char> &&stream) : stream(std::move(stream)) {}
+ RequestWriter(RequestWriter &&) = default;
+
+ seastar::future<> complete(request_context_t::ref &&req) {
+ auto &request = *req;
+ return lock.write_lock(
+ ).then([&request, this] {
+ return request.write_reply(stream);
+ }).finally([&, this, req=std::move(req)] {
+ lock.write_unlock();
+ logger().debug("complete");
+ return seastar::now();
+ });
+ }
+
+ seastar::future<> close() {
+ return gate.close().then([this] {
+ return stream.close();
+ });
+ }
+};
+
+/**
+ * NBDHandler
+ *
+ * Simple throughput test for concurrent, single threaded
+ * writes to an BlockDriver.
+ */
+class NBDHandler {
+ BlockDriver &backend;
+ std::string uds_path;
+ std::optional<seastar::server_socket> server_socket;
+ std::optional<seastar::connected_socket> connected_socket;
+ seastar::gate gate;
+public:
+ struct config_t {
+ std::string uds_path;
+
+ void populate_options(
+ po::options_description &desc)
+ {
+ desc.add_options()
+ ("uds-path",
+ po::value<std::string>()
+ ->default_value("/tmp/store_nbd_socket.sock")
+ ->notifier([this](auto s) {
+ uds_path = s;
+ }),
+ "Path to domain socket for nbd"
+ );
+ }
+ };
+
+ NBDHandler(
+ BlockDriver &backend,
+ config_t config) :
+ backend(backend),
+ uds_path(config.uds_path)
+ {}
+
+ void run();
+ seastar::future<> stop();
+};
+
+int main(int argc, char** argv)
+{
+ po::options_description desc{"Allowed options"};
+ bool debug = false;
+ desc.add_options()
+ ("help,h", "show help message")
+ ("debug", po::value<bool>(&debug)->default_value(false),
+ "enable debugging");
+
+ po::options_description nbd_pattern_options{"NBD Pattern Options"};
+ NBDHandler::config_t nbd_config;
+ nbd_config.populate_options(nbd_pattern_options);
+ desc.add(nbd_pattern_options);
+
+ po::options_description backend_pattern_options{"Backend Options"};
+ BlockDriver::config_t backend_config;
+ backend_config.populate_options(backend_pattern_options);
+ desc.add(backend_pattern_options);
+
+ po::variables_map vm;
+ std::vector<std::string> unrecognized_options;
+ try {
+ auto parsed = po::command_line_parser(argc, argv)
+ .options(desc)
+ .allow_unregistered()
+ .run();
+ po::store(parsed, vm);
+ if (vm.count("help")) {
+ std::cout << desc << std::endl;
+ return 0;
+ }
+
+ po::notify(vm);
+ unrecognized_options =
+ po::collect_unrecognized(parsed.options, po::include_positional);
+ } catch(const po::error& e) {
+ std::cerr << "error: " << e.what() << std::endl;
+ return 1;
+ }
+ std::vector<const char*> args(argv, argv + argc);
+
+ seastar::app_template::config app_cfg;
+ app_cfg.name = "crimson-store-nbd";
+ app_cfg.auto_handle_sigint_sigterm = false;
+ seastar::app_template app(std::move(app_cfg));
+
+ std::vector<char*> av{argv[0]};
+ std::transform(begin(unrecognized_options),
+ end(unrecognized_options),
+ std::back_inserter(av),
+ [](auto& s) {
+ return const_cast<char*>(s.c_str());
+ });
+ return app.run(av.size(), av.data(), [&] {
+ if (debug) {
+ seastar::global_logger_registry().set_all_loggers_level(
+ seastar::log_level::debug
+ );
+ }
+ return seastar::async([&] {
+ seastar_apps_lib::stop_signal should_stop;
+ crimson::common::sharded_conf()
+ .start(EntityName{}, std::string_view{"ceph"}).get();
+ auto stop_conf = seastar::defer([] {
+ crimson::common::sharded_conf().stop().get();
+ });
+
+ auto backend = get_backend(backend_config);
+ NBDHandler nbd(*backend, nbd_config);
+ backend->mount().get();
+ auto close_backend = seastar::defer([&] {
+ backend->close().get();
+ });
+
+ logger().debug("Running nbd server...");
+ nbd.run();
+ auto stop_nbd = seastar::defer([&] {
+ nbd.stop().get();
+ });
+ should_stop.wait().get();
+ return 0;
+ });
+ });
+}
+
+class nbd_oldstyle_negotiation_t {
+ uint64_t magic = seastar::cpu_to_be(0x4e42444d41474943); // "NBDMAGIC"
+ uint64_t magic2 = seastar::cpu_to_be(0x00420281861253); // "IHAVEOPT"
+ uint64_t size = 0;
+ uint32_t flags = seastar::cpu_to_be(0);
+ char reserved[124] = {0};
+
+public:
+ nbd_oldstyle_negotiation_t(uint64_t size, uint32_t flags)
+ : size(seastar::cpu_to_be(size)), flags(seastar::cpu_to_be(flags)) {}
+} __attribute__((packed));
+
+seastar::future<> send_negotiation(
+ size_t size,
+ seastar::output_stream<char>& out)
+{
+ seastar::temporary_buffer<char> buf{sizeof(nbd_oldstyle_negotiation_t)};
+ new (buf.get_write()) nbd_oldstyle_negotiation_t(size, 1);
+ return out.write(std::move(buf)
+ ).then([&out] {
+ return out.flush();
+ });
+}
+
+seastar::future<> handle_command(
+ BlockDriver &backend,
+ request_context_t::ref request_ref,
+ RequestWriter &out)
+{
+ auto &request = *request_ref;
+ logger().debug("got command {}", request.get_command());
+ return ([&] {
+ switch (request.get_command()) {
+ case NBD_CMD_WRITE:
+ return backend.write(
+ request.from,
+ *request.in_buffer);
+ case NBD_CMD_READ:
+ return backend.read(
+ request.from,
+ request.len).then([&] (auto buffer) {
+ logger().debug("read returned buffer len {}", buffer.length());
+ request.out_buffer = buffer;
+ });
+ case NBD_CMD_DISC:
+ throw std::system_error(std::make_error_code(std::errc::bad_message));
+ case NBD_CMD_TRIM:
+ throw std::system_error(std::make_error_code(std::errc::bad_message));
+ default:
+ throw std::system_error(std::make_error_code(std::errc::bad_message));
+ }
+ })().then([&, request_ref=std::move(request_ref)]() mutable {
+ logger().debug("handle_command complete");
+ return out.complete(std::move(request_ref));
+ });
+}
+
+
+seastar::future<> handle_commands(
+ BlockDriver &backend,
+ seastar::input_stream<char>& in,
+ RequestWriter &out)
+{
+ logger().debug("handle_commands");
+ return seastar::keep_doing([&] {
+ logger().debug("waiting for command");
+ auto request_ref = request_context_t::make_ref();
+ auto &request = *request_ref;
+ return request.read_request(in).then(
+ [&, request_ref=std::move(request_ref)]() mutable {
+ // keep running in background
+ (void)seastar::try_with_gate(out.gate,
+ [&backend, &out, request_ref=std::move(request_ref)]() mutable {
+ return handle_command(backend, std::move(request_ref), out);
+ });
+ logger().debug("handle_commands after fork");
+ });
+ }).handle_exception_type([](const seastar::gate_closed_exception&) {});
+}
+
+void NBDHandler::run()
+{
+ logger().debug("About to listen on {}", uds_path);
+ server_socket = seastar::engine().listen(
+ seastar::socket_address{
+ seastar::unix_domain_addr{uds_path}});
+
+ // keep running in background
+ (void)seastar::keep_doing([this] {
+ return seastar::try_with_gate(gate, [this] {
+ return server_socket->accept().then([this](auto acc) {
+ logger().debug("Accepted");
+ connected_socket = std::move(acc.connection);
+ return seastar::do_with(
+ connected_socket->input(),
+ RequestWriter{connected_socket->output()},
+ [&, this](auto &input, auto &output) {
+ return send_negotiation(
+ backend.get_size(),
+ output.stream
+ ).then([&, this] {
+ return handle_commands(backend, input, output);
+ }).finally([&] {
+ std::cout << "closing input and output" << std::endl;
+ return seastar::when_all(input.close(),
+ output.close());
+ }).discard_result().handle_exception([](auto e) {
+ logger().error("NBDHandler::run saw exception {}", e);
+ });
+ });
+ }).handle_exception_type([] (const std::system_error &e) {
+ // an ECONNABORTED is expected when we are being stopped.
+ if (e.code() != std::errc::connection_aborted) {
+ logger().error("accept failed: {}", e);
+ }
+ });
+ });
+ }).handle_exception_type([](const seastar::gate_closed_exception&) {});
+}
+
+seastar::future<> NBDHandler::stop()
+{
+ if (server_socket) {
+ server_socket->abort_accept();
+ }
+ if (connected_socket) {
+ connected_socket->shutdown_input();
+ connected_socket->shutdown_output();
+ }
+ return gate.close().then([this] {
+ if (!server_socket.has_value()) {
+ return seastar::now();
+ }
+ return seastar::remove_file(uds_path);
+ });
+}
diff --git a/src/crimson/tools/store_nbd/tm_driver.cc b/src/crimson/tools/store_nbd/tm_driver.cc
new file mode 100644
index 000000000..bd216fd58
--- /dev/null
+++ b/src/crimson/tools/store_nbd/tm_driver.cc
@@ -0,0 +1,222 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "tm_driver.h"
+
+using namespace crimson;
+using namespace crimson::os;
+using namespace crimson::os::seastore;
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_test);
+ }
+}
+
+seastar::future<> TMDriver::write(
+ off_t offset,
+ bufferptr ptr)
+{
+ logger().debug("Writing offset {}", offset);
+ assert(offset % device->get_block_size() == 0);
+ assert((ptr.length() % device->get_block_size()) == 0);
+ return seastar::do_with(ptr, [this, offset](auto& ptr) {
+ return repeat_eagain([this, offset, &ptr] {
+ return tm->with_transaction_intr(
+ Transaction::src_t::MUTATE,
+ "write",
+ [this, offset, &ptr](auto& t)
+ {
+ return tm->dec_ref(t, offset
+ ).si_then([](auto){}).handle_error_interruptible(
+ crimson::ct_error::enoent::handle([](auto) { return seastar::now(); }),
+ crimson::ct_error::pass_further_all{}
+ ).si_then([this, offset, &t, &ptr] {
+ logger().debug("dec_ref complete");
+ return tm->alloc_extent<TestBlock>(t, offset, ptr.length());
+ }).si_then([this, offset, &t, &ptr](auto ext) {
+ boost::ignore_unused(offset); // avoid clang warning;
+ assert(ext->get_laddr() == (size_t)offset);
+ assert(ext->get_bptr().length() == ptr.length());
+ ext->get_bptr().swap(ptr);
+ logger().debug("submitting transaction");
+ return tm->submit_transaction(t);
+ });
+ });
+ });
+ }).handle_error(
+ crimson::ct_error::assert_all{"store-nbd write"}
+ );
+}
+
+TMDriver::read_extents_ret TMDriver::read_extents(
+ Transaction &t,
+ laddr_t offset,
+ extent_len_t length)
+{
+ return seastar::do_with(
+ lba_pin_list_t(),
+ lextent_list_t<TestBlock>(),
+ [this, &t, offset, length](auto &pins, auto &ret) {
+ return tm->get_pins(
+ t, offset, length
+ ).si_then([this, &t, &pins, &ret](auto _pins) {
+ _pins.swap(pins);
+ logger().debug("read_extents: mappings {}", pins);
+ return trans_intr::do_for_each(
+ pins.begin(),
+ pins.end(),
+ [this, &t, &ret](auto &&pin) {
+ logger().debug(
+ "read_extents: get_extent {}~{}",
+ pin->get_val(),
+ pin->get_length());
+ return tm->read_pin<TestBlock>(
+ t,
+ std::move(pin)
+ ).si_then([&ret](auto ref) mutable {
+ ret.push_back(std::make_pair(ref->get_laddr(), ref));
+ logger().debug(
+ "read_extents: got extent {}",
+ *ref);
+ return seastar::now();
+ });
+ }).si_then([&ret] {
+ return std::move(ret);
+ });
+ });
+ });
+}
+
+seastar::future<bufferlist> TMDriver::read(
+ off_t offset,
+ size_t size)
+{
+ logger().debug("Reading offset {}", offset);
+ assert(offset % device->get_block_size() == 0);
+ assert(size % device->get_block_size() == 0);
+ auto blptrret = std::make_unique<bufferlist>();
+ auto &blret = *blptrret;
+ return repeat_eagain([=, &blret, this] {
+ return tm->with_transaction_intr(
+ Transaction::src_t::READ,
+ "read",
+ [=, &blret, this](auto& t)
+ {
+ return read_extents(t, offset, size
+ ).si_then([=, &blret](auto ext_list) {
+ size_t cur = offset;
+ for (auto &i: ext_list) {
+ if (cur != i.first) {
+ assert(cur < i.first);
+ blret.append_zero(i.first - cur);
+ cur = i.first;
+ }
+ blret.append(i.second->get_bptr());
+ cur += i.second->get_bptr().length();
+ }
+ if (blret.length() != size) {
+ assert(blret.length() < size);
+ blret.append_zero(size - blret.length());
+ }
+ });
+ });
+ }).handle_error(
+ crimson::ct_error::assert_all{"store-nbd read"}
+ ).then([blptrret=std::move(blptrret)]() mutable {
+ logger().debug("read complete");
+ return std::move(*blptrret);
+ });
+}
+
+void TMDriver::init()
+{
+ std::vector<Device*> sec_devices;
+#ifndef NDEBUG
+ tm = make_transaction_manager(device.get(), sec_devices, true);
+#else
+ tm = make_transaction_manager(device.get(), sec_devices, false);
+#endif
+}
+
+void TMDriver::clear()
+{
+ tm.reset();
+}
+
+size_t TMDriver::get_size() const
+{
+ return device->get_available_size() * .5;
+}
+
+seastar::future<> TMDriver::mkfs()
+{
+ assert(config.path);
+ logger().debug("mkfs");
+ return Device::make_device(*config.path, device_type_t::SSD
+ ).then([this](DeviceRef dev) {
+ device = std::move(dev);
+ seastore_meta_t meta;
+ meta.seastore_id.generate_random();
+ return device->mkfs(
+ device_config_t{
+ true,
+ (magic_t)std::rand(),
+ device_type_t::SSD,
+ 0,
+ meta,
+ secondary_device_set_t()});
+ }).safe_then([this] {
+ logger().debug("device mkfs done");
+ return device->mount();
+ }).safe_then([this] {
+ init();
+ logger().debug("tm mkfs");
+ return tm->mkfs();
+ }).safe_then([this] {
+ logger().debug("tm close");
+ return tm->close();
+ }).safe_then([this] {
+ logger().debug("sm close");
+ return device->close();
+ }).safe_then([this] {
+ clear();
+ device.reset();
+ logger().debug("mkfs complete");
+ return TransactionManager::mkfs_ertr::now();
+ }).handle_error(
+ crimson::ct_error::assert_all{
+ "Invalid errror during TMDriver::mkfs"
+ }
+ );
+}
+
+seastar::future<> TMDriver::mount()
+{
+ return (config.mkfs ? mkfs() : seastar::now()
+ ).then([this] {
+ return Device::make_device(*config.path, device_type_t::SSD);
+ }).then([this](DeviceRef dev) {
+ device = std::move(dev);
+ return device->mount();
+ }).safe_then([this] {
+ init();
+ return tm->mount();
+ }).handle_error(
+ crimson::ct_error::assert_all{
+ "Invalid errror during TMDriver::mount"
+ }
+ );
+};
+
+seastar::future<> TMDriver::close()
+{
+ return tm->close().safe_then([this] {
+ clear();
+ return device->close();
+ }).handle_error(
+ crimson::ct_error::assert_all{
+ "Invalid errror during TMDriver::close"
+ }
+ );
+}
diff --git a/src/crimson/tools/store_nbd/tm_driver.h b/src/crimson/tools/store_nbd/tm_driver.h
new file mode 100644
index 000000000..24aabdeb6
--- /dev/null
+++ b/src/crimson/tools/store_nbd/tm_driver.h
@@ -0,0 +1,56 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "block_driver.h"
+
+#include "crimson/os/seastore/cache.h"
+#include "crimson/os/seastore/device.h"
+#include "crimson/os/seastore/transaction_manager.h"
+#include "test/crimson/seastore/test_block.h"
+
+class TMDriver final : public BlockDriver {
+public:
+ TMDriver(config_t config) : config(config) {}
+ ~TMDriver() final {}
+
+ bufferptr get_buffer(size_t size) final {
+ return ceph::buffer::create_page_aligned(size);
+ }
+
+ seastar::future<> write(
+ off_t offset,
+ bufferptr ptr) final;
+
+ seastar::future<bufferlist> read(
+ off_t offset,
+ size_t size) final;
+
+ size_t get_size() const final;
+
+ seastar::future<> mount() final;
+
+ seastar::future<> close() final;
+
+private:
+ const config_t config;
+
+ using DeviceRef = crimson::os::seastore::DeviceRef;
+ DeviceRef device;
+
+ using TransactionManager = crimson::os::seastore::TransactionManager;
+ using TransactionManagerRef = crimson::os::seastore::TransactionManagerRef;
+ TransactionManagerRef tm;
+
+ seastar::future<> mkfs();
+ void init();
+ void clear();
+
+ using read_extents_iertr = TransactionManager::read_extent_iertr;
+ using read_extents_ret = read_extents_iertr::future<
+ crimson::os::seastore::lextent_list_t<crimson::os::seastore::TestBlock>
+ >;
+ read_extents_ret read_extents(
+ crimson::os::seastore::Transaction &t,
+ crimson::os::seastore::laddr_t offset,
+ crimson::os::seastore::extent_len_t length);
+};