From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 21 Apr 2024 13:54:28 +0200
Subject: Adding upstream version 18.2.2.

Signed-off-by: Daniel Baumann
---
 src/crimson/tools/CMakeLists.txt            |   22 +
 src/crimson/tools/perf_async_msgr.cc        |  151 ++++
 src/crimson/tools/perf_crimson_msgr.cc      | 1222 +++++++++++++++++++++++++++
 src/crimson/tools/perf_staged_fltree.cc     |  178 ++++
 src/crimson/tools/store_nbd/block_driver.cc |   19 +
 src/crimson/tools/store_nbd/block_driver.h  |  134 +++
 src/crimson/tools/store_nbd/fs_driver.cc    |  310 +++
 src/crimson/tools/store_nbd/fs_driver.h     |   72 ++
 src/crimson/tools/store_nbd/store-nbd.cc    |  456 ++++
 src/crimson/tools/store_nbd/tm_driver.cc    |  222 +++
 src/crimson/tools/store_nbd/tm_driver.h     |   56 ++
 11 files changed, 2842 insertions(+)
 create mode 100644 src/crimson/tools/CMakeLists.txt
 create mode 100644 src/crimson/tools/perf_async_msgr.cc
 create mode 100644 src/crimson/tools/perf_crimson_msgr.cc
 create mode 100644 src/crimson/tools/perf_staged_fltree.cc
 create mode 100644 src/crimson/tools/store_nbd/block_driver.cc
 create mode 100644 src/crimson/tools/store_nbd/block_driver.h
 create mode 100644 src/crimson/tools/store_nbd/fs_driver.cc
 create mode 100644 src/crimson/tools/store_nbd/fs_driver.h
 create mode 100644 src/crimson/tools/store_nbd/store-nbd.cc
 create mode 100644 src/crimson/tools/store_nbd/tm_driver.cc
 create mode 100644 src/crimson/tools/store_nbd/tm_driver.h

diff --git a/src/crimson/tools/CMakeLists.txt b/src/crimson/tools/CMakeLists.txt
new file mode 100644
index 000000000..fc18ff90b
--- /dev/null
+++ b/src/crimson/tools/CMakeLists.txt
@@ -0,0 +1,22 @@
+add_executable(crimson-store-nbd
+  store_nbd/store-nbd.cc
+  store_nbd/tm_driver.cc
+  store_nbd/fs_driver.cc
+  store_nbd/block_driver.cc
+  )
+target_link_libraries(crimson-store-nbd
+  crimson-os)
+install(TARGETS crimson-store-nbd DESTINATION bin)
+
+add_executable(perf-crimson-msgr perf_crimson_msgr.cc)
+target_link_libraries(perf-crimson-msgr crimson)
+
+add_executable(perf-async-msgr perf_async_msgr.cc)
+target_link_libraries(perf-async-msgr ceph-common global ${ALLOC_LIBS})
+
+add_executable(perf-staged-fltree perf_staged_fltree.cc)
+if(WITH_TESTS)
+target_link_libraries(perf-staged-fltree crimson-seastore crimson::gtest)
+else()
+target_link_libraries(perf-staged-fltree crimson-seastore)
+endif()
diff --git a/src/crimson/tools/perf_async_msgr.cc b/src/crimson/tools/perf_async_msgr.cc
new file mode 100644
index 000000000..38cc84fbb
--- /dev/null
+++ b/src/crimson/tools/perf_async_msgr.cc
@@ -0,0 +1,151 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+
+#include <boost/program_options.hpp>
+#include <fmt/format.h>
+
+#include "auth/Auth.h"
+#include "global/global_init.h"
+#include "msg/Dispatcher.h"
+#include "msg/Messenger.h"
+#include "messages/MOSDOp.h"
+
+#include "auth/DummyAuth.h"
+
+namespace {
+
+constexpr int CEPH_OSD_PROTOCOL = 10;
+
+struct Server {
+  Server(CephContext* cct, unsigned msg_len)
+    : dummy_auth(cct), dispatcher(cct, msg_len)
+  {
+    msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(0), "server", 0));
+    dummy_auth.auth_registry.refresh_config();
+    msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
+    msgr->set_default_policy(Messenger::Policy::stateless_server(0));
+    msgr->set_auth_client(&dummy_auth);
+    msgr->set_auth_server(&dummy_auth);
+  }
+  DummyAuthClientServer dummy_auth;
+  std::unique_ptr<Messenger> msgr;
+  struct ServerDispatcher : Dispatcher {
+    unsigned msg_len = 0;
+    bufferlist msg_data;
+
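+    // Replies to every CEPH_MSG_OSD_OP received with an MOSDOp carrying
+    // msg_len zeroed bytes (see ms_fast_dispatch below), so the server
+    // mirrors the client's write workload back at it.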
+    ServerDispatcher(CephContext* cct, unsigned msg_len)
+      : Dispatcher(cct), msg_len(msg_len)
+    {
+      msg_data.append_zero(msg_len);
+    }
+    bool ms_can_fast_dispatch_any() const override {
+      return true;
+    }
+    bool ms_can_fast_dispatch(const Message* m) const override {
+      return m->get_type() == CEPH_MSG_OSD_OP;
+    }
+    void ms_fast_dispatch(Message* m) override {
+      ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+      const static pg_t pgid;
+      const static object_locator_t oloc;
+      const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
+                                  pgid.pool(), oloc.nspace);
+      static spg_t spgid(pgid);
+      MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
+      bufferlist data(msg_data);
+      rep->write(0, msg_len, data);
+      rep->set_tid(m->get_tid());
+      m->get_connection()->send_message(rep);
+      m->put();
+    }
+    bool ms_dispatch(Message*) override {
+      ceph_abort();
+    }
+    bool ms_handle_reset(Connection*) override {
+      return true;
+    }
+    void ms_handle_remote_reset(Connection*) override {
+    }
+    bool ms_handle_refused(Connection*) override {
+      return true;
+    }
+  } dispatcher;
+};
+
+}
+
+static void run(CephContext* cct, entity_addr_t addr, unsigned bs)
+{
+  std::cout << "async server listening at " << addr << std::endl;
+  Server server{cct, bs};
+  server.msgr->bind(addr);
+  server.msgr->add_dispatcher_head(&server.dispatcher);
+  server.msgr->start();
+  server.msgr->wait();
+}
+
+int main(int argc, char** argv)
+{
+  namespace po = boost::program_options;
+  po::options_description desc{"Allowed options"};
+  desc.add_options()
+    ("help,h", "show help message")
+    ("addr", po::value<std::string>()->default_value("v2:127.0.0.1:9010"),
+     "server address (crimson only supports msgr v2 protocol)")
+    ("bs", po::value<unsigned>()->default_value(0),
+     "server block size")
+    ("crc-enabled", po::value<bool>()->default_value(false),
+     "enable CRC checks")
+    ("threads", po::value<unsigned>()->default_value(3),
+     "async messenger worker threads");
+  po::variables_map vm;
+  std::vector<std::string> unrecognized_options;
+  try {
+    auto parsed = po::command_line_parser(argc, argv)
+      .options(desc)
+      .allow_unregistered()
+      .run();
+    po::store(parsed, vm);
+    if (vm.count("help")) {
+      std::cout << desc << std::endl;
+      return 0;
+    }
+    po::notify(vm);
+    unrecognized_options =
+      po::collect_unrecognized(parsed.options, po::include_positional);
+  } catch(const po::error& e) {
+    std::cerr << "error: " << e.what() << std::endl;
+    return 1;
+  }
+
+  auto addr = vm["addr"].as<std::string>();
+  entity_addr_t target_addr;
+  target_addr.parse(addr.c_str(), nullptr);
+  ceph_assert_always(target_addr.is_msgr2());
+  auto bs = vm["bs"].as<unsigned>();
+  auto crc_enabled = vm["crc-enabled"].as<bool>();
+  auto worker_threads = vm["threads"].as<unsigned>();
+
+  std::vector<const char*> args(argv, argv + argc);
+  auto cct = global_init(nullptr, args,
+                         CEPH_ENTITY_TYPE_CLIENT,
+                         CODE_ENVIRONMENT_UTILITY,
+                         CINIT_FLAG_NO_MON_CONFIG);
+  common_init_finish(cct.get());
+
+  if (crc_enabled) {
+    cct->_conf.set_val("ms_crc_header", "true");
+    cct->_conf.set_val("ms_crc_data", "true");
+  } else {
+    cct->_conf.set_val("ms_crc_header", "false");
+    cct->_conf.set_val("ms_crc_data", "false");
+  }
+
+  cct->_conf.set_val("ms_async_op_threads", fmt::format("{}", worker_threads));
+
+  std::cout << "server[" << addr
+            << "](bs=" << bs
+            << ", crc_enabled=" << crc_enabled
+            << ", worker_threads=" << worker_threads
+            << ")" << std::endl;
+
+  run(cct.get(), target_addr, bs);
+}
diff --git a/src/crimson/tools/perf_crimson_msgr.cc b/src/crimson/tools/perf_crimson_msgr.cc
new file mode 100644
index 000000000..aa5753442
--- /dev/null
+++ b/src/crimson/tools/perf_crimson_msgr.cc
@@ -0,0 +1,1222 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/ceph_time.h" +#include "messages/MOSDOp.h" +#include "include/random.h" + +#include "crimson/auth/DummyAuth.h" +#include "crimson/common/log.h" +#include "crimson/common/config_proxy.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Dispatcher.h" +#include "crimson/net/Messenger.h" +#include "crimson/osd/stop_signal.h" + +using namespace std; +using namespace std::chrono_literals; + +using lowres_clock_t = seastar::lowres_system_clock; + +namespace bpo = boost::program_options; + +namespace { + +template +using Ref = boost::intrusive_ptr; + +seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); +} + +template +seastar::future create_sharded(Args... args) { + // seems we should only construct/stop shards on #0 + return seastar::smp::submit_to(0, [=] { + auto sharded_obj = seastar::make_lw_shared>(); + return sharded_obj->start(args...).then([sharded_obj]() { + seastar::engine().at_exit([sharded_obj]() { + return sharded_obj->stop().then([sharded_obj] {}); + }); + return sharded_obj.get(); + }); + }).then([] (seastar::sharded *ptr_shard) { + // return the pointer valid for the caller CPU + return &ptr_shard->local(); + }); +} + +double get_reactor_utilization() { + auto &value_map = seastar::metrics::impl::get_value_map(); + auto found = value_map.find("reactor_utilization"); + assert(found != value_map.end()); + auto &[full_name, metric_family] = *found; + std::ignore = full_name; + assert(metric_family.size() == 1); + const auto& [labels, metric] = *metric_family.begin(); + std::ignore = labels; + auto value = (*metric)(); + return value.ui(); +} + +enum class perf_mode_t { + both, + client, + server +}; + +struct client_config { + entity_addr_t server_addr; + unsigned block_size; + unsigned ramptime; + unsigned msgtime; + unsigned num_clients; + unsigned num_conns; + unsigned depth; + bool skip_core_0; + + std::string str() const { + std::ostringstream out; + out << "client[>> " << server_addr + << "](bs=" << block_size + << ", ramptime=" << ramptime + << ", msgtime=" << msgtime + << ", num_clients=" << num_clients + << ", num_conns=" << num_conns + << ", depth=" << depth + << ", skip_core_0=" << skip_core_0 + << ")"; + return out.str(); + } + + static client_config load(bpo::variables_map& options) { + client_config conf; + entity_addr_t addr; + ceph_assert(addr.parse(options["server-addr"].as().c_str(), nullptr)); + ceph_assert_always(addr.is_msgr2()); + + conf.server_addr = addr; + conf.block_size = options["client-bs"].as(); + conf.ramptime = options["ramptime"].as(); + conf.msgtime = options["msgtime"].as(); + conf.num_clients = options["clients"].as(); + ceph_assert_always(conf.num_clients > 0); + conf.num_conns = options["conns-per-client"].as(); + ceph_assert_always(conf.num_conns > 0); + conf.depth = options["depth"].as(); + conf.skip_core_0 = options["client-skip-core-0"].as(); + return conf; + } +}; + +struct server_config { + entity_addr_t addr; + unsigned block_size; + bool is_fixed_cpu; + unsigned core; + + std::string str() const { + std::ostringstream out; + out << "server[" << addr + << "](bs=" << block_size + << ", is_fixed_cpu=" << is_fixed_cpu + << ", core=" << core + << ")"; + return out.str(); + } + + static server_config load(bpo::variables_map& options) { + server_config 
conf; + entity_addr_t addr; + ceph_assert(addr.parse(options["server-addr"].as().c_str(), nullptr)); + ceph_assert_always(addr.is_msgr2()); + + conf.addr = addr; + conf.block_size = options["server-bs"].as(); + conf.is_fixed_cpu = options["server-fixed-cpu"].as(); + conf.core = options["server-core"].as(); + return conf; + } +}; + +const unsigned SAMPLE_RATE = 256; + +static seastar::future<> run( + perf_mode_t mode, + const client_config& client_conf, + const server_config& server_conf, + bool crc_enabled) +{ + struct test_state { + struct Server final + : public crimson::net::Dispatcher, + public seastar::peering_sharded_service { + // available only in msgr_sid + crimson::net::MessengerRef msgr; + crimson::auth::DummyAuthClientServer dummy_auth; + const seastar::shard_id msgr_sid; + std::string lname; + + bool is_fixed_cpu = true; + bool is_stopped = false; + std::optional> fut_report; + + unsigned conn_count = 0; + unsigned msg_count = 0; + MessageRef last_msg; + + // available in all shards + unsigned msg_len; + bufferlist msg_data; + + Server(seastar::shard_id msgr_sid, unsigned msg_len, bool needs_report) + : msgr_sid{msgr_sid}, + msg_len{msg_len} { + lname = fmt::format("server@{}", msgr_sid); + msg_data.append_zero(msg_len); + + if (seastar::this_shard_id() == msgr_sid && + needs_report) { + start_report(); + } + } + + void ms_handle_connect( + crimson::net::ConnectionRef, + seastar::shard_id) override { + ceph_abort("impossible, server won't connect"); + } + + void ms_handle_accept( + crimson::net::ConnectionRef, + seastar::shard_id new_shard, + bool is_replace) override { + ceph_assert_always(new_shard == seastar::this_shard_id()); + auto &server = container().local(); + ++server.conn_count; + } + + void ms_handle_reset( + crimson::net::ConnectionRef, + bool) override { + auto &server = container().local(); + --server.conn_count; + } + + std::optional> ms_dispatch( + crimson::net::ConnectionRef c, MessageRef m) override { + assert(c->get_shard_id() == seastar::this_shard_id()); + ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); + + auto &server = container().local(); + + // server replies with MOSDOp to generate server-side write workload + const static pg_t pgid; + const static object_locator_t oloc; + const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), + pgid.pool(), oloc.nspace); + static spg_t spgid(pgid); + auto rep = crimson::make_message(0, 0, hobj, spgid, 0, 0, 0); + bufferlist data(server.msg_data); + rep->write(0, server.msg_len, data); + rep->set_tid(m->get_tid()); + ++server.msg_count; + std::ignore = c->send(std::move(rep)); + + if (server.msg_count % 16 == 0) { + server.last_msg = std::move(m); + } + return {seastar::now()}; + } + + seastar::future<> init(const entity_addr_t& addr, bool is_fixed_cpu) { + return container().invoke_on( + msgr_sid, [addr, is_fixed_cpu](auto &server) { + // server msgr is always with nonce 0 + server.msgr = crimson::net::Messenger::create( + entity_name_t::OSD(server.msgr_sid), + server.lname, 0, is_fixed_cpu); + server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); + server.msgr->set_auth_client(&server.dummy_auth); + server.msgr->set_auth_server(&server.dummy_auth); + server.is_fixed_cpu = is_fixed_cpu; + return server.msgr->bind(entity_addrvec_t{addr} + ).safe_then([&server] { + return server.msgr->start({&server}); + }, crimson::net::Messenger::bind_ertr::all_same_way( + [addr] (const std::error_code& e) { + logger().error("Server: " + "there is another instance running at {}", addr); 
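+          // the address is occupied by another instance; retrying cannot
+          // succeed, so give up immediately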
+ ceph_abort(); + })); + }); + } + + seastar::future<> shutdown() { + logger().info("{} shutdown...", lname); + return container().invoke_on( + msgr_sid, [](auto &server) { + server.is_stopped = true; + ceph_assert(server.msgr); + server.msgr->stop(); + return server.msgr->shutdown( + ).then([&server] { + if (server.fut_report.has_value()) { + return std::move(server.fut_report.value()); + } else { + return seastar::now(); + } + }); + }); + } + + private: + struct ShardReport { + unsigned msg_count = 0; + + // per-interval metrics + double reactor_utilization; + unsigned conn_count = 0; + int msg_size = 0; + unsigned msg_count_interval = 0; + }; + + // should not be called frequently to impact performance + void get_report(ShardReport& last) { + unsigned last_msg_count = last.msg_count; + int msg_size = -1; + if (last_msg) { + auto msg = boost::static_pointer_cast(last_msg); + msg->finish_decode(); + ceph_assert_always(msg->ops.size() == 1); + msg_size = msg->ops[0].op.extent.length; + last_msg.reset(); + } + + last.msg_count = msg_count; + last.reactor_utilization = get_reactor_utilization(); + last.conn_count = conn_count; + last.msg_size = msg_size; + last.msg_count_interval = msg_count - last_msg_count; + } + + struct TimerReport { + unsigned elapsed = 0u; + mono_time start_time = mono_clock::zero(); + std::vector reports; + + TimerReport(unsigned shards) : reports(shards) {} + }; + + void start_report() { + seastar::promise<> pr_report; + fut_report = pr_report.get_future(); + seastar::do_with( + TimerReport(seastar::smp::count), + [this](auto &report) { + return seastar::do_until( + [this] { return is_stopped; }, + [&report, this] { + return seastar::sleep(2s + ).then([&report, this] { + report.elapsed += 2; + if (is_fixed_cpu) { + return seastar::smp::submit_to(msgr_sid, + [&report, this] { + auto &server = container().local(); + server.get_report(report.reports[seastar::this_shard_id()]); + }).then([&report, this] { + auto now = mono_clock::now(); + auto prv = report.start_time; + report.start_time = now; + if (prv == mono_clock::zero()) { + // cannot compute duration + return; + } + std::chrono::duration duration_d = now - prv; + double duration = duration_d.count(); + auto &ireport = report.reports[msgr_sid]; + double iops = ireport.msg_count_interval / duration; + double throughput_MB = -1; + if (ireport.msg_size >= 0) { + throughput_MB = iops * ireport.msg_size / 1048576; + } + std::ostringstream sout; + sout << setfill(' ') + << report.elapsed + << "(" << std::setw(5) << duration << ") " + << std::setw(9) << iops << "IOPS " + << std::setw(8) << throughput_MB << "MiB/s " + << ireport.reactor_utilization + << "(" << ireport.conn_count << ")"; + std::cout << sout.str() << std::endl; + }); + } else { + return seastar::smp::invoke_on_all([&report, this] { + auto &server = container().local(); + server.get_report(report.reports[seastar::this_shard_id()]); + }).then([&report, this] { + auto now = mono_clock::now(); + auto prv = report.start_time; + report.start_time = now; + if (prv == mono_clock::zero()) { + // cannot compute duration + return; + } + std::chrono::duration duration_d = now - prv; + double duration = duration_d.count(); + unsigned num_msgs = 0; + // -1 means unavailable, -2 means mismatch + int msg_size = -1; + for (auto &i : report.reports) { + if (i.msg_size >= 0) { + if (msg_size == -2) { + // pass + } else if (msg_size == -1) { + msg_size = i.msg_size; + } else { + if (msg_size != i.msg_size) { + msg_size = -2; + } + } + } + num_msgs += i.msg_count_interval; + } + 
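+                // aggregate the per-shard counts gathered by get_report()
+                // into one whole-process figure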
double iops = num_msgs / duration; + double throughput_MB = msg_size; + if (msg_size >= 0) { + throughput_MB = iops * msg_size / 1048576; + } + std::ostringstream sout; + sout << setfill(' ') + << report.elapsed + << "(" << std::setw(5) << duration << ") " + << std::setw(9) << iops << "IOPS " + << std::setw(8) << throughput_MB << "MiB/s "; + for (auto &i : report.reports) { + sout << i.reactor_utilization + << "(" << i.conn_count << ") "; + } + std::cout << sout.str() << std::endl; + }); + } + }); + } + ); + }).then([this] { + logger().info("report is stopped!"); + }).forward_to(std::move(pr_report)); + } + }; + + struct Client final + : public crimson::net::Dispatcher, + public seastar::peering_sharded_service { + + struct ConnStats { + mono_time connecting_time = mono_clock::zero(); + mono_time connected_time = mono_clock::zero(); + unsigned received_count = 0u; + + mono_time start_time = mono_clock::zero(); + unsigned start_count = 0u; + + unsigned sampled_count = 0u; + double sampled_total_lat_s = 0.0; + + // for reporting only + mono_time finish_time = mono_clock::zero(); + + void start_connecting() { + connecting_time = mono_clock::now(); + } + + void finish_connecting() { + ceph_assert_always(connected_time == mono_clock::zero()); + connected_time = mono_clock::now(); + } + + void start_collect() { + ceph_assert_always(connected_time != mono_clock::zero()); + start_time = mono_clock::now(); + start_count = received_count; + sampled_count = 0u; + sampled_total_lat_s = 0.0; + finish_time = mono_clock::zero(); + } + + void prepare_summary(const ConnStats ¤t) { + *this = current; + finish_time = mono_clock::now(); + } + }; + + struct PeriodStats { + mono_time start_time = mono_clock::zero(); + unsigned start_count = 0u; + unsigned sampled_count = 0u; + double sampled_total_lat_s = 0.0; + + // for reporting only + mono_time finish_time = mono_clock::zero(); + unsigned finish_count = 0u; + unsigned depth = 0u; + + void start_collect(unsigned received_count) { + start_time = mono_clock::now(); + start_count = received_count; + sampled_count = 0u; + sampled_total_lat_s = 0.0; + } + + void reset_period( + unsigned received_count, unsigned _depth, PeriodStats &snapshot) { + snapshot.start_time = start_time; + snapshot.start_count = start_count; + snapshot.sampled_count = sampled_count; + snapshot.sampled_total_lat_s = sampled_total_lat_s; + snapshot.finish_time = mono_clock::now(); + snapshot.finish_count = received_count; + snapshot.depth = _depth; + + start_collect(received_count); + } + }; + + struct JobReport { + std::string name; + unsigned depth = 0; + double connect_time_s = 0; + unsigned total_msgs = 0; + double messaging_time_s = 0; + double latency_ms = 0; + double iops = 0; + double throughput_mbps = 0; + + void account(const JobReport &stats) { + depth += stats.depth; + connect_time_s += stats.connect_time_s; + total_msgs += stats.total_msgs; + messaging_time_s += stats.messaging_time_s; + latency_ms += stats.latency_ms; + iops += stats.iops; + throughput_mbps += stats.throughput_mbps; + } + + void report() const { + auto str = fmt::format( + "{}(depth={}):\n" + " connect time: {:08f}s\n" + " messages received: {}\n" + " messaging time: {:08f}s\n" + " latency: {:08f}ms\n" + " IOPS: {:08f}\n" + " out throughput: {:08f}MB/s", + name, depth, connect_time_s, + total_msgs, messaging_time_s, + latency_ms, iops, + throughput_mbps); + std::cout << str << std::endl; + } + }; + + struct ConnectionPriv : public crimson::net::Connection::user_private_t { + unsigned index; + 
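+      // position of the owning ConnState within Client::conn_states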
ConnectionPriv(unsigned i) : index{i} {} + }; + + struct ConnState { + crimson::net::MessengerRef msgr; + ConnStats conn_stats; + PeriodStats period_stats; + seastar::semaphore depth; + std::vector time_msgs_sent; + unsigned sent_count = 0u; + crimson::net::ConnectionRef active_conn; + bool stop_send = false; + seastar::promise stopped_send_promise; + + ConnState(std::size_t _depth) + : depth{_depth}, + time_msgs_sent{_depth, lowres_clock_t::time_point::min()} {} + + unsigned get_current_units() const { + ceph_assert(depth.available_units() >= 0); + return depth.current(); + } + + seastar::future stop_dispatch_messages() { + stop_send = true; + depth.broken(DepthBroken()); + return stopped_send_promise.get_future(); + } + }; + + const seastar::shard_id sid; + const unsigned id; + const std::optional server_sid; + + const unsigned num_clients; + const unsigned num_conns; + const unsigned msg_len; + bufferlist msg_data; + const unsigned nr_depth; + const unsigned nonce_base; + crimson::auth::DummyAuthClientServer dummy_auth; + + std::vector conn_states; + + Client(unsigned num_clients, + unsigned num_conns, + unsigned msg_len, + unsigned _depth, + unsigned nonce_base, + std::optional server_sid) + : sid{seastar::this_shard_id()}, + id{sid + num_clients - seastar::smp::count}, + server_sid{server_sid}, + num_clients{num_clients}, + num_conns{num_conns}, + msg_len{msg_len}, + nr_depth{_depth}, + nonce_base{nonce_base} { + if (is_active()) { + for (unsigned i = 0; i < num_conns; ++i) { + conn_states.emplace_back(nr_depth); + } + } + msg_data.append_zero(msg_len); + } + + std::string get_name(unsigned i) { + return fmt::format("client{}Conn{}@{}", id, i, sid); + } + + void ms_handle_connect( + crimson::net::ConnectionRef conn, + seastar::shard_id prv_shard) override { + ceph_assert_always(prv_shard == seastar::this_shard_id()); + assert(is_active()); + unsigned index = static_cast(conn->get_user_private()).index; + auto &conn_state = conn_states[index]; + conn_state.conn_stats.finish_connecting(); + } + + std::optional> ms_dispatch( + crimson::net::ConnectionRef conn, MessageRef m) override { + assert(is_active()); + // server replies with MOSDOp to generate server-side write workload + ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); + + unsigned index = static_cast(conn->get_user_private()).index; + assert(index < num_conns); + auto &conn_state = conn_states[index]; + + auto msg_id = m->get_tid(); + if (msg_id % SAMPLE_RATE == 0) { + auto msg_index = msg_id % conn_state.time_msgs_sent.size(); + ceph_assert(conn_state.time_msgs_sent[msg_index] != + lowres_clock_t::time_point::min()); + std::chrono::duration cur_latency = + lowres_clock_t::now() - conn_state.time_msgs_sent[msg_index]; + conn_state.conn_stats.sampled_total_lat_s += cur_latency.count(); + ++(conn_state.conn_stats.sampled_count); + conn_state.period_stats.sampled_total_lat_s += cur_latency.count(); + ++(conn_state.period_stats.sampled_count); + conn_state.time_msgs_sent[msg_index] = lowres_clock_t::time_point::min(); + } + + ++(conn_state.conn_stats.received_count); + conn_state.depth.signal(1); + + return {seastar::now()}; + } + + // should start messenger at this shard? 
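+  // Yes iff this shard is one of the trailing num_clients shards; together
+  // with the assertions in run(), client-skip-core-0 keeps shard 0 free.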
+ bool is_active() { + ceph_assert(seastar::this_shard_id() == sid); + return sid + num_clients >= seastar::smp::count; + } + + seastar::future<> init() { + return container().invoke_on_all([](auto& client) { + if (client.is_active()) { + return seastar::do_for_each( + boost::make_counting_iterator(0u), + boost::make_counting_iterator(client.num_conns), + [&client](auto i) { + auto &conn_state = client.conn_states[i]; + std::string name = client.get_name(i); + conn_state.msgr = crimson::net::Messenger::create( + entity_name_t::OSD(client.id * client.num_conns + i), + name, client.nonce_base + client.id * client.num_conns + i, true); + conn_state.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); + conn_state.msgr->set_auth_client(&client.dummy_auth); + conn_state.msgr->set_auth_server(&client.dummy_auth); + return conn_state.msgr->start({&client}); + }); + } + return seastar::now(); + }); + } + + seastar::future<> shutdown() { + return seastar::do_with( + std::vector(num_clients * num_conns), + [this](auto &all_stats) { + return container().invoke_on_all([&all_stats](auto& client) { + if (!client.is_active()) { + return seastar::now(); + } + + return seastar::parallel_for_each( + boost::make_counting_iterator(0u), + boost::make_counting_iterator(client.num_conns), + [&all_stats, &client](auto i) { + logger().info("{} shutdown...", client.get_name(i)); + auto &conn_state = client.conn_states[i]; + return conn_state.stop_dispatch_messages( + ).then([&all_stats, &client, i](auto stats) { + all_stats[client.id * client.num_conns + i] = stats; + }); + }).then([&client] { + return seastar::do_for_each( + boost::make_counting_iterator(0u), + boost::make_counting_iterator(client.num_conns), + [&client](auto i) { + auto &conn_state = client.conn_states[i]; + ceph_assert(conn_state.msgr); + conn_state.msgr->stop(); + return conn_state.msgr->shutdown(); + }); + }); + }).then([&all_stats, this] { + auto nr_jobs = all_stats.size(); + JobReport summary; + std::vector clients(num_clients); + + for (unsigned i = 0; i < nr_jobs; ++i) { + auto &stats = all_stats[i]; + stats.report(); + clients[i / num_conns].account(stats); + summary.account(stats); + } + + std::cout << std::endl; + std::cout << "per client:" << std::endl; + for (unsigned i = 0; i < num_clients; ++i) { + auto &stats = clients[i]; + stats.name = fmt::format("client{}", i); + stats.connect_time_s /= num_conns; + stats.messaging_time_s /= num_conns; + stats.latency_ms /= num_conns; + stats.report(); + } + + std::cout << std::endl; + summary.name = fmt::format("all", nr_jobs); + summary.connect_time_s /= nr_jobs; + summary.messaging_time_s /= nr_jobs; + summary.latency_ms /= nr_jobs; + summary.report(); + }); + }); + } + + seastar::future<> connect_wait_verify(const entity_addr_t& peer_addr) { + return container().invoke_on_all([peer_addr](auto& client) { + // start clients in active cores + if (client.is_active()) { + for (unsigned i = 0; i < client.num_conns; ++i) { + auto &conn_state = client.conn_states[i]; + conn_state.conn_stats.start_connecting(); + conn_state.active_conn = conn_state.msgr->connect(peer_addr, entity_name_t::TYPE_OSD); + conn_state.active_conn->set_user_private( + std::make_unique(i)); + } + // make sure handshake won't hurt the performance + return seastar::sleep(1s).then([&client] { + for (unsigned i = 0; i < client.num_conns; ++i) { + auto &conn_state = client.conn_states[i]; + if (conn_state.conn_stats.connected_time == mono_clock::zero()) { + logger().error("\n{} not connected after 1s!\n", + 
client.get_name(i)); + ceph_assert(false); + } + } + }); + } + return seastar::now(); + }); + } + + private: + class TimerReport { + private: + const unsigned num_clients; + const unsigned num_conns; + const unsigned msgtime; + const unsigned bytes_of_block; + + unsigned elapsed = 0u; + std::vector snaps; + std::vector summaries; + std::vector client_reactor_utilizations; + std::optional server_reactor_utilization; + + public: + TimerReport(unsigned num_clients, unsigned num_conns, unsigned msgtime, unsigned bs) + : num_clients{num_clients}, + num_conns{num_conns}, + msgtime{msgtime}, + bytes_of_block{bs}, + snaps{num_clients * num_conns}, + summaries{num_clients * num_conns}, + client_reactor_utilizations(num_clients) {} + + unsigned get_elapsed() const { return elapsed; } + + PeriodStats& get_snap(unsigned client_id, unsigned i) { + return snaps[client_id * num_conns + i]; + } + + ConnStats& get_summary(unsigned client_id, unsigned i) { + return summaries[client_id * num_conns + i]; + } + + void set_client_reactor_utilization(unsigned client_id, double ru) { + client_reactor_utilizations[client_id] = ru; + } + + void set_server_reactor_utilization(double ru) { + server_reactor_utilization = ru; + } + + bool should_stop() const { + return elapsed >= msgtime; + } + + seastar::future<> ticktock() { + return seastar::sleep(1s).then([this] { + ++elapsed; + }); + } + + void report_header() const { + std::ostringstream sout; + sout << std::setfill(' ') + << std::setw(6) << "sec" + << std::setw(7) << "depth" + << std::setw(10) << "IOPS" + << std::setw(9) << "MB/s" + << std::setw(9) << "lat(ms)"; + std::cout << sout.str() << std::endl; + } + + void report_period() { + std::chrono::duration elapsed_d = 0s; + unsigned depth = 0u; + unsigned ops = 0u; + unsigned sampled_count = 0u; + double sampled_total_lat_s = 0.0; + for (const auto& snap: snaps) { + elapsed_d += (snap.finish_time - snap.start_time); + depth += snap.depth; + ops += (snap.finish_count - snap.start_count); + sampled_count += snap.sampled_count; + sampled_total_lat_s += snap.sampled_total_lat_s; + } + double elapsed_s = elapsed_d.count() / (num_clients * num_conns); + double iops = ops/elapsed_s; + std::ostringstream sout; + sout << setfill(' ') + << std::setw(5) << elapsed_s + << " " + << std::setw(6) << depth + << " " + << std::setw(9) << iops + << " " + << std::setw(8) << iops * bytes_of_block / 1048576 + << " " + << std::setw(8) << (sampled_total_lat_s / sampled_count * 1000) + << " -- "; + if (server_reactor_utilization.has_value()) { + sout << *server_reactor_utilization << " -- "; + } + for (double cru : client_reactor_utilizations) { + sout << cru << ","; + } + std::cout << sout.str() << std::endl; + } + + void report_summary() const { + std::chrono::duration elapsed_d = 0s; + unsigned ops = 0u; + unsigned sampled_count = 0u; + double sampled_total_lat_s = 0.0; + for (const auto& summary: summaries) { + elapsed_d += (summary.finish_time - summary.start_time); + ops += (summary.received_count - summary.start_count); + sampled_count += summary.sampled_count; + sampled_total_lat_s += summary.sampled_total_lat_s; + } + double elapsed_s = elapsed_d.count() / (num_clients * num_conns); + double iops = ops / elapsed_s; + std::ostringstream sout; + sout << "--------------" + << " summary " + << "--------------\n" + << setfill(' ') + << std::setw(7) << elapsed_s + << std::setw(6) << "-" + << std::setw(8) << iops + << std::setw(8) << iops * bytes_of_block / 1048576 + << std::setw(8) << (sampled_total_lat_s / sampled_count * 1000) + << 
"\n"; + std::cout << sout.str() << std::endl; + } + }; + + seastar::future<> report_period(TimerReport& report) { + return container().invoke_on_all([&report] (auto& client) { + if (client.is_active()) { + for (unsigned i = 0; i < client.num_conns; ++i) { + auto &conn_state = client.conn_states[i]; + PeriodStats& snap = report.get_snap(client.id, i); + conn_state.period_stats.reset_period( + conn_state.conn_stats.received_count, + client.nr_depth - conn_state.get_current_units(), + snap); + } + report.set_client_reactor_utilization(client.id, get_reactor_utilization()); + } + if (client.server_sid.has_value() && + seastar::this_shard_id() == *client.server_sid) { + assert(!client.is_active()); + report.set_server_reactor_utilization(get_reactor_utilization()); + } + }).then([&report] { + report.report_period(); + }); + } + + seastar::future<> report_summary(TimerReport& report) { + return container().invoke_on_all([&report] (auto& client) { + if (client.is_active()) { + for (unsigned i = 0; i < client.num_conns; ++i) { + auto &conn_state = client.conn_states[i]; + ConnStats& summary = report.get_summary(client.id, i); + summary.prepare_summary(conn_state.conn_stats); + } + } + }).then([&report] { + report.report_summary(); + }); + } + + public: + seastar::future<> dispatch_with_timer(unsigned ramptime, unsigned msgtime) { + logger().info("[all clients]: start sending MOSDOps from {} clients * {} conns", + num_clients, num_conns); + return container().invoke_on_all([] (auto& client) { + if (client.is_active()) { + for (unsigned i = 0; i < client.num_conns; ++i) { + client.do_dispatch_messages(i); + } + } + }).then([ramptime] { + logger().info("[all clients]: ramping up {} seconds...", ramptime); + return seastar::sleep(std::chrono::seconds(ramptime)); + }).then([this] { + return container().invoke_on_all([] (auto& client) { + if (client.is_active()) { + for (unsigned i = 0; i < client.num_conns; ++i) { + auto &conn_state = client.conn_states[i]; + conn_state.conn_stats.start_collect(); + conn_state.period_stats.start_collect(conn_state.conn_stats.received_count); + } + } + }); + }).then([this, msgtime] { + logger().info("[all clients]: reporting {} seconds...\n", msgtime); + return seastar::do_with( + TimerReport(num_clients, num_conns, msgtime, msg_len), + [this](auto& report) { + report.report_header(); + return seastar::do_until( + [&report] { return report.should_stop(); }, + [&report, this] { + return report.ticktock().then([&report, this] { + // report period every 1s + return report_period(report); + }).then([&report, this] { + // report summary every 10s + if (report.get_elapsed() % 10 == 0) { + return report_summary(report); + } else { + return seastar::now(); + } + }); + } + ).then([&report, this] { + // report the final summary + if (report.get_elapsed() % 10 != 0) { + return report_summary(report); + } else { + return seastar::now(); + } + }); + }); + }); + } + + private: + seastar::future<> send_msg(ConnState &conn_state) { + ceph_assert(seastar::this_shard_id() == sid); + conn_state.sent_count += 1; + return conn_state.depth.wait(1 + ).then([this, &conn_state] { + const static pg_t pgid; + const static object_locator_t oloc; + const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), + pgid.pool(), oloc.nspace); + static spg_t spgid(pgid); + auto m = crimson::make_message(0, 0, hobj, spgid, 0, 0, 0); + bufferlist data(msg_data); + m->write(0, msg_len, data); + // use tid as the identity of each round + m->set_tid(conn_state.sent_count); + + // sample message 
latency + if (unlikely(conn_state.sent_count % SAMPLE_RATE == 0)) { + auto index = conn_state.sent_count % conn_state.time_msgs_sent.size(); + ceph_assert(conn_state.time_msgs_sent[index] == + lowres_clock_t::time_point::min()); + conn_state.time_msgs_sent[index] = lowres_clock_t::now(); + } + + return conn_state.active_conn->send(std::move(m)); + }); + } + + class DepthBroken: public std::exception {}; + + seastar::future stop_dispatch_messages(unsigned i) { + auto &conn_state = conn_states[i]; + conn_state.stop_send = true; + conn_state.depth.broken(DepthBroken()); + return conn_state.stopped_send_promise.get_future(); + } + + void do_dispatch_messages(unsigned i) { + ceph_assert(seastar::this_shard_id() == sid); + auto &conn_state = conn_states[i]; + ceph_assert(conn_state.sent_count == 0); + conn_state.conn_stats.start_time = mono_clock::now(); + // forwarded to stopped_send_promise + (void) seastar::do_until( + [&conn_state] { return conn_state.stop_send; }, + [this, &conn_state] { return send_msg(conn_state); } + ).handle_exception_type([] (const DepthBroken& e) { + // ok, stopped by stop_dispatch_messages() + }).then([this, &conn_state, i] { + std::string name = get_name(i); + logger().info("{} {}: stopped sending OSDOPs", + name, *conn_state.active_conn); + + std::chrono::duration dur_conn = + conn_state.conn_stats.connected_time - + conn_state.conn_stats.connecting_time; + std::chrono::duration dur_msg = + mono_clock::now() - conn_state.conn_stats.start_time; + unsigned ops = + conn_state.conn_stats.received_count - + conn_state.conn_stats.start_count; + + JobReport stats; + stats.name = name; + stats.depth = nr_depth; + stats.connect_time_s = dur_conn.count(); + stats.total_msgs = ops; + stats.messaging_time_s = dur_msg.count(); + stats.latency_ms = + conn_state.conn_stats.sampled_total_lat_s / + conn_state.conn_stats.sampled_count * 1000; + stats.iops = ops / dur_msg.count(); + stats.throughput_mbps = ops / dur_msg.count() * msg_len / 1048576; + + conn_state.stopped_send_promise.set_value(stats); + }); + } + }; + }; + + std::optional server_sid; + bool server_needs_report = false; + if (mode == perf_mode_t::both) { + ceph_assert(server_conf.is_fixed_cpu == true); + server_sid = server_conf.core; + } else if (mode == perf_mode_t::server) { + server_needs_report = true; + } + return seastar::when_all( + seastar::futurize_invoke([mode, server_conf, server_needs_report] { + if (mode == perf_mode_t::client) { + return seastar::make_ready_future(nullptr); + } else { + return create_sharded( + server_conf.core, + server_conf.block_size, + server_needs_report); + } + }), + seastar::futurize_invoke([mode, client_conf, server_sid] { + if (mode == perf_mode_t::server) { + return seastar::make_ready_future(nullptr); + } else { + unsigned nonce_base = ceph::util::generate_random_number(); + logger().info("client nonce_base={}", nonce_base); + return create_sharded( + client_conf.num_clients, + client_conf.num_conns, + client_conf.block_size, + client_conf.depth, + nonce_base, + server_sid); + } + }), + crimson::common::sharded_conf().start( + EntityName{}, std::string_view{"ceph"} + ).then([] { + return crimson::common::local_conf().start(); + }).then([crc_enabled] { + return crimson::common::local_conf().set_val( + "ms_crc_data", crc_enabled ? 
"true" : "false"); + }) + ).then([=](auto&& ret) { + auto server = std::move(std::get<0>(ret).get0()); + auto client = std::move(std::get<1>(ret).get0()); + // reserve core 0 for potentially better performance + if (mode == perf_mode_t::both) { + logger().info("\nperf settings:\n smp={}\n {}\n {}\n", + seastar::smp::count, client_conf.str(), server_conf.str()); + if (client_conf.skip_core_0) { + ceph_assert(seastar::smp::count > client_conf.num_clients); + } else { + ceph_assert(seastar::smp::count >= client_conf.num_clients); + } + ceph_assert(client_conf.num_clients > 0); + ceph_assert(seastar::smp::count > server_conf.core + client_conf.num_clients); + return seastar::when_all_succeed( + // it is not reasonable to allow server/client to shared cores for + // performance benchmarking purposes. + server->init(server_conf.addr, server_conf.is_fixed_cpu), + client->init() + ).then_unpack([client, addr = client_conf.server_addr] { + return client->connect_wait_verify(addr); + }).then([client, ramptime = client_conf.ramptime, + msgtime = client_conf.msgtime] { + return client->dispatch_with_timer(ramptime, msgtime); + }).then([client] { + return client->shutdown(); + }).then([server] { + return server->shutdown(); + }); + } else if (mode == perf_mode_t::client) { + logger().info("\nperf settings:\n smp={}\n {}\n", + seastar::smp::count, client_conf.str()); + if (client_conf.skip_core_0) { + ceph_assert(seastar::smp::count > client_conf.num_clients); + } else { + ceph_assert(seastar::smp::count >= client_conf.num_clients); + } + ceph_assert(client_conf.num_clients > 0); + return client->init( + ).then([client, addr = client_conf.server_addr] { + return client->connect_wait_verify(addr); + }).then([client, ramptime = client_conf.ramptime, + msgtime = client_conf.msgtime] { + return client->dispatch_with_timer(ramptime, msgtime); + }).then([client] { + return client->shutdown(); + }); + } else { // mode == perf_mode_t::server + ceph_assert(seastar::smp::count > server_conf.core); + logger().info("\nperf settings:\n smp={}\n {}\n", + seastar::smp::count, server_conf.str()); + return seastar::async([server, server_conf] { + // FIXME: SIGINT is not received by stop_signal + seastar_apps_lib::stop_signal should_stop; + server->init(server_conf.addr, server_conf.is_fixed_cpu).get(); + should_stop.wait().get(); + server->shutdown().get(); + }); + } + }).finally([] { + return crimson::common::sharded_conf().stop(); + }); +} + +} + +int main(int argc, char** argv) +{ + seastar::app_template app; + app.add_options() + ("mode", bpo::value()->default_value(0), + "0: both, 1:client, 2:server") + ("server-addr", bpo::value()->default_value("v2:127.0.0.1:9010"), + "server address(only support msgr v2 protocol)") + ("ramptime", bpo::value()->default_value(5), + "seconds of client ramp-up time") + ("msgtime", bpo::value()->default_value(15), + "seconds of client messaging time") + ("clients", bpo::value()->default_value(1), + "number of client messengers") + ("conns-per-client", bpo::value()->default_value(1), + "number of connections per client") + ("client-bs", bpo::value()->default_value(4096), + "client block size") + ("depth", bpo::value()->default_value(512), + "client io depth per job") + ("client-skip-core-0", bpo::value()->default_value(true), + "client skip core 0") + ("server-fixed-cpu", bpo::value()->default_value(true), + "server is in the fixed cpu mode, non-fixed doesn't support the mode both") + ("server-core", bpo::value()->default_value(1), + "server messenger running core") + ("server-bs", 
bpo::value()->default_value(0), + "server block size") + ("crc-enabled", bpo::value()->default_value(false), + "enable CRC checks"); + return app.run(argc, argv, [&app] { + auto&& config = app.configuration(); + auto mode = config["mode"].as(); + ceph_assert(mode <= 2); + auto _mode = static_cast(mode); + bool crc_enabled = config["crc-enabled"].as(); + auto server_conf = server_config::load(config); + auto client_conf = client_config::load(config); + return run(_mode, client_conf, server_conf, crc_enabled + ).then([] { + logger().info("\nsuccessful!\n"); + }).handle_exception([] (auto eptr) { + logger().info("\nfailed!\n"); + return seastar::make_exception_future<>(eptr); + }); + }); +} diff --git a/src/crimson/tools/perf_staged_fltree.cc b/src/crimson/tools/perf_staged_fltree.cc new file mode 100644 index 000000000..81b621750 --- /dev/null +++ b/src/crimson/tools/perf_staged_fltree.cc @@ -0,0 +1,178 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#include + +#include +#include + +#include "crimson/common/config_proxy.h" +#include "crimson/common/log.h" +#include "crimson/common/perf_counters_collection.h" +#include "crimson/os/seastore/onode_manager/staged-fltree/tree_utils.h" +#include "crimson/os/seastore/onode_manager/staged-fltree/node_extent_manager.h" + +#include "test/crimson/seastore/onode_tree/test_value.h" +#include "test/crimson/seastore/transaction_manager_test_state.h" + +using namespace crimson::os::seastore::onode; +namespace bpo = boost::program_options; + +seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_test); +} + +template +class PerfTree : public TMTestState { + public: + PerfTree(bool is_dummy) : is_dummy{is_dummy} {} + + seastar::future<> run(KVPool& kvs, double erase_ratio) { + return tm_setup().then([this, &kvs, erase_ratio] { + return seastar::async([this, &kvs, erase_ratio] { + auto tree = std::make_unique>(kvs, + (is_dummy ? NodeExtentManager::create_dummy(true) + : NodeExtentManager::create_seastore(*tm))); + { + auto t = create_mutate_transaction(); + with_trans_intr(*t, [&](auto &tr){ + return tree->bootstrap(tr); + }).unsafe_get(); + submit_transaction(std::move(t)); + } + { + auto t = create_mutate_transaction(); + with_trans_intr(*t, [&](auto &tr){ + return tree->insert(tr); + }).unsafe_get(); + auto start_time = mono_clock::now(); + submit_transaction(std::move(t)); + std::chrono::duration duration = mono_clock::now() - start_time; + logger().warn("submit_transaction() done! {}s", duration.count()); + } + { + // Note: create_weak_transaction() can also work, but too slow. 
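+        // (a read transaction is sufficient here: the get_stats() and
+        // validate() calls below do not mutate the tree)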
+ auto t = create_read_transaction(); + with_trans_intr(*t, [&](auto &tr){ + return tree->get_stats(tr); + }).unsafe_get(); + + with_trans_intr(*t, [&](auto &tr){ + return tree->validate(tr); + }).unsafe_get(); + } + { + auto t = create_mutate_transaction(); + with_trans_intr(*t, [&](auto &tr){ + return tree->erase(tr, kvs.size() * erase_ratio); + }).unsafe_get(); + submit_transaction(std::move(t)); + } + { + auto t = create_read_transaction(); + with_trans_intr(*t, [&](auto &tr){ + return tree->get_stats(tr); + }).unsafe_get(); + + with_trans_intr(*t, [&](auto &tr){ + return tree->validate(tr); + }).unsafe_get(); + } + tree.reset(); + }); + }).then([this] { + return tm_teardown(); + }); + } + + private: + bool is_dummy; +}; + +template +seastar::future<> run(const bpo::variables_map& config) { + return seastar::async([&config] { + auto backend = config["backend"].as(); + bool is_dummy; + if (backend == "dummy") { + is_dummy = true; + } else if (backend == "seastore") { + is_dummy = false; + } else { + ceph_abort(false && "invalid backend"); + } + auto ns_sizes = config["ns-sizes"].as>(); + auto oid_sizes = config["oid-sizes"].as>(); + auto onode_sizes = config["onode-sizes"].as>(); + auto range2 = config["range2"].as>(); + ceph_assert(range2.size() == 2); + auto range1 = config["range1"].as>(); + ceph_assert(range1.size() == 2); + auto range0 = config["range0"].as>(); + ceph_assert(range0.size() == 2); + auto erase_ratio = config["erase-ratio"].as(); + ceph_assert(erase_ratio >= 0); + ceph_assert(erase_ratio <= 1); + + using crimson::common::sharded_conf; + sharded_conf().start(EntityName{}, std::string_view{"ceph"}).get(); + seastar::engine().at_exit([] { + return sharded_conf().stop(); + }); + + using crimson::common::sharded_perf_coll; + sharded_perf_coll().start().get(); + seastar::engine().at_exit([] { + return sharded_perf_coll().stop(); + }); + + auto kvs = KVPool::create_raw_range( + ns_sizes, oid_sizes, onode_sizes, + {range2[0], range2[1]}, + {range1[0], range1[1]}, + {range0[0], range0[1]}); + PerfTree perf{is_dummy}; + perf.run(kvs, erase_ratio).get0(); + }); +} + + +int main(int argc, char** argv) +{ + seastar::app_template app; + app.add_options() + ("backend", bpo::value()->default_value("dummy"), + "tree backend: dummy, seastore") + ("tracked", bpo::value()->default_value(false), + "track inserted cursors") + ("ns-sizes", bpo::value>()->default_value( + {8, 11, 64, 128, 255, 256}), + "sizes of ns strings") + ("oid-sizes", bpo::value>()->default_value( + {8, 13, 64, 512, 2035, 2048}), + "sizes of oid strings") + ("onode-sizes", bpo::value>()->default_value( + {8, 16, 128, 576, 992, 1200}), + "sizes of onode") + ("range2", bpo::value>()->default_value( + {0, 128}), + "range of shard-pool-crush [a, b)") + ("range1", bpo::value>()->default_value( + {0, 10}), + "range of ns-oid strings [a, b)") + ("range0", bpo::value>()->default_value( + {0, 4}), + "range of snap-gen [a, b)") + ("erase-ratio", bpo::value()->default_value( + 0.8), + "erase-ratio of all the inserted onodes"); + return app.run(argc, argv, [&app] { + auto&& config = app.configuration(); + auto tracked = config["tracked"].as(); + if (tracked) { + return run(config); + } else { + return run(config); + } + }); +} diff --git a/src/crimson/tools/store_nbd/block_driver.cc b/src/crimson/tools/store_nbd/block_driver.cc new file mode 100644 index 000000000..10e77a34b --- /dev/null +++ b/src/crimson/tools/store_nbd/block_driver.cc @@ -0,0 +1,19 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// 
vim: ts=8 sw=2 smarttab
+
+#include "fs_driver.h"
+#include "block_driver.h"
+
+#include "tm_driver.h"
+
+BlockDriverRef get_backend(BlockDriver::config_t config)
+{
+  if (config.type == "transaction_manager") {
+    return std::make_unique<TMDriver>(config);
+  } else if (config.is_futurized_store()) {
+    return std::make_unique<FSDriver>(config);
+  } else {
+    ceph_assert(0 == "invalid option");
+    return BlockDriverRef();
+  }
+}
diff --git a/src/crimson/tools/store_nbd/block_driver.h b/src/crimson/tools/store_nbd/block_driver.h
new file mode 100644
index 000000000..ea3453ef3
--- /dev/null
+++ b/src/crimson/tools/store_nbd/block_driver.h
@@ -0,0 +1,134 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/program_options/options_description.hpp>
+#include <boost/program_options/variables_map.hpp>
+
+#include <seastar/core/future.hh>
+
+#include <string>
+#include <optional>
+
+#include "include/buffer.h"
+
+/**
+ * BlockDriver
+ *
+ * Simple interface to enable throughput test to compare raw disk to
+ * transaction_manager, etc
+ */
+class BlockDriver {
+public:
+  struct config_t {
+    std::string type;
+    bool mkfs = false;
+    unsigned num_pgs = 128;
+    unsigned log_size = 1000;
+    unsigned object_size = 4<<20 /* 4MB, rbd default */;
+    unsigned oi_size = 1<<9 /* 512b */;
+    unsigned log_entry_size = 1<<9 /* 512b */;
+    bool prepopulate_log = false;
+    std::optional<std::string> path;
+
+    bool is_futurized_store() const {
+      return type == "seastore" || type == "bluestore";
+    }
+
+    std::string get_fs_type() const {
+      ceph_assert(is_futurized_store());
+      return type;
+    }
+
+    bool oi_enabled() const {
+      return oi_size > 0;
+    }
+
+    bool log_enabled() const {
+      return log_entry_size > 0 && log_size > 0;
+    }
+
+    bool prepopulate_log_enabled() const {
+      return prepopulate_log;
+    }
+
+    void populate_options(
+      boost::program_options::options_description &desc)
+    {
+      namespace po = boost::program_options;
+      desc.add_options()
+        ("type",
+         po::value<std::string>()
+         ->default_value("transaction_manager")
+         ->notifier([this](auto s) { type = s; }),
+         "Backend to use, options are transaction_manager, seastore"
+        )
+        ("device-path",
+         po::value<std::string>()
+         ->required()
+         ->notifier([this](auto s) { path = s; }),
+         "Path to device for backend"
+        )
+        ("num-pgs",
+         po::value<unsigned>()
+         ->notifier([this](auto s) { num_pgs = s; }),
+         "Number of pgs to use for futurized_store backends"
+        )
+        ("log-size",
+         po::value<unsigned>()
+         ->notifier([this](auto s) { log_size = s; }),
+         "Number of log entries per pg to use for futurized_store backends"
+         ", 0 to disable"
+        )
+        ("log-entry-size",
+         po::value<unsigned>()
+         ->notifier([this](auto s) { log_entry_size = s; }),
+         "Size of each log entry per pg to use for futurized_store backends"
+         ", 0 to disable"
+        )
+        ("prepopulate-log",
+         po::value<bool>()
+         ->notifier([this](auto s) { prepopulate_log = s; }),
+         "Prepopulate log on mount"
+        )
+        ("object-info-size",
+         po::value<unsigned>()
+         ->notifier([this](auto s) { oi_size = s; }),
+         "Size of object info attr per object to use for futurized_store"
+         " backends, 0 to disable"
+        )
+        ("object-size",
+         po::value<unsigned>()
+         ->notifier([this](auto s) { object_size = s; }),
+         "Object size to use for futurized_store backends"
+        )
+        ("mkfs",
+         po::value<bool>()
+         ->default_value(false)
+         ->notifier([this](auto s) { mkfs = s; }),
+         "Do mkfs first"
+        );
+    }
+  };
+
+  virtual ceph::bufferptr get_buffer(size_t size) = 0;
+
+  virtual seastar::future<> write(
+    off_t offset,
+    ceph::bufferptr ptr) = 0;
+
+  virtual seastar::future<ceph::bufferlist> read(
+    off_t offset,
+    size_t size) = 0;
+
+  virtual size_t get_size() const = 0;
+
+  virtual seastar::future<> mount() = 0;
+  virtual seastar::future<> close() = 0;
+
virtual ~BlockDriver() {} +}; +using BlockDriverRef = std::unique_ptr; + +BlockDriverRef get_backend(BlockDriver::config_t config); diff --git a/src/crimson/tools/store_nbd/fs_driver.cc b/src/crimson/tools/store_nbd/fs_driver.cc new file mode 100644 index 000000000..18f836766 --- /dev/null +++ b/src/crimson/tools/store_nbd/fs_driver.cc @@ -0,0 +1,310 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include +#include + +#include "os/Transaction.h" +#include "fs_driver.h" + +using namespace crimson; +using namespace crimson::os; + +coll_t get_coll(unsigned num) { + return coll_t(spg_t(pg_t(0, num))); +} + +ghobject_t get_log_object(unsigned coll) +{ + return ghobject_t( + shard_id_t::NO_SHARD, + 0, + (coll << 16), + "", + "", + 0, + ghobject_t::NO_GEN); +} + +std::string make_log_key( + unsigned i) +{ + return fmt::format("log_entry_{}", i); +} + +void add_log_entry( + unsigned i, + unsigned entry_size, + std::map *omap) +{ + assert(omap); + bufferlist bl; + bl.append(ceph::buffer::create('0', entry_size)); + + omap->emplace(std::make_pair(make_log_key(i), bl)); +} + +void populate_log( + ceph::os::Transaction &t, + FSDriver::pg_analogue_t &pg, + unsigned entry_size, + unsigned entries) +{ + t.touch(pg.collection->get_cid(), pg.log_object); + // omap_clear not yet implemented, TODO + // t.omap_clear(pg.collection->get_cid(), pg.log_object); + + std::map omap; + for (unsigned i = 0; i < entries; ++i) { + add_log_entry(i, entry_size, &omap); + } + + t.omap_setkeys( + pg.collection->get_cid(), + pg.log_object, + omap); + + pg.log_head = entries; +} + +void update_log( + ceph::os::Transaction &t, + FSDriver::pg_analogue_t &pg, + unsigned entry_size, + unsigned entries) +{ + ++pg.log_head; + std::map key; + add_log_entry(pg.log_head, entry_size, &key); + + t.omap_setkeys( + pg.collection->get_cid(), + pg.log_object, + key); + + + while ((pg.log_head - pg.log_tail) > entries) { + t.omap_rmkey( + pg.collection->get_cid(), + pg.log_object, + make_log_key(pg.log_tail)); + ++pg.log_tail; + } +} + +FSDriver::offset_mapping_t FSDriver::map_offset(off_t offset) +{ + uint32_t objid = offset / config.object_size; + uint32_t collid = objid % config.num_pgs; + return offset_mapping_t{ + collections[collid], + ghobject_t( + shard_id_t::NO_SHARD, + 0, + (collid << 16) | (objid + 1), + "", + "", + 0, + ghobject_t::NO_GEN), + offset % config.object_size + }; +} + +seastar::future<> FSDriver::write( + off_t offset, + bufferptr ptr) +{ + auto mapping = map_offset(offset); + ceph_assert(mapping.offset + ptr.length() <= config.object_size); + ceph::os::Transaction t; + bufferlist bl; + bl.append(ptr); + t.write( + mapping.pg.collection->get_cid(), + mapping.object, + mapping.offset, + ptr.length(), + bl, + 0); + + if (config.oi_enabled() ) { + bufferlist attr; + attr.append(ceph::buffer::create(config.oi_size, '0')); + t.setattr( + mapping.pg.collection->get_cid(), + mapping.object, + "_", + attr); + } + + if (config.log_enabled()) { + update_log( + t, + mapping.pg, + config.log_entry_size, + config.log_size); + } + + return sharded_fs->do_transaction( + mapping.pg.collection, + std::move(t)); +} + +seastar::future FSDriver::read( + off_t offset, + size_t size) +{ + auto mapping = map_offset(offset); + ceph_assert((mapping.offset + size) <= config.object_size); + return sharded_fs->read( + mapping.pg.collection, + mapping.object, + mapping.offset, + size, + 0 + ).handle_error( + crimson::ct_error::enoent::handle([size](auto &e) { + bufferlist bl; + 
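+      // the backing object was never created, so the read is served
+      // entirely from zeros of the requested size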
bl.append_zero(size); + return seastar::make_ready_future(std::move(bl)); + }), + crimson::ct_error::assert_all{"Unrecoverable error in FSDriver::read"} + ).then([size](auto &&bl) { + if (bl.length() < size) { + bl.append_zero(size - bl.length()); + } + return seastar::make_ready_future(std::move(bl)); + }); +} + +seastar::future<> FSDriver::mkfs() +{ + return init( + ).then([this] { + assert(fs); + uuid_d uuid; + uuid.generate_random(); + return fs->mkfs(uuid).handle_error( + crimson::stateful_ec::handle([] (const auto& ec) { + crimson::get_logger(ceph_subsys_test) + .error("error creating empty object store in {}: ({}) {}", + crimson::common::local_conf().get_val("osd_data"), + ec.value(), ec.message()); + std::exit(EXIT_FAILURE); + })); + }).then([this] { + return fs->stop(); + }).then([this] { + return init(); + }).then([this] { + return fs->mount( + ).handle_error( + crimson::stateful_ec::handle([] (const auto& ec) { + crimson::get_logger( + ceph_subsys_test + ).error( + "error mounting object store in {}: ({}) {}", + crimson::common::local_conf().get_val("osd_data"), + ec.value(), + ec.message()); + std::exit(EXIT_FAILURE); + })); + }).then([this] { + return seastar::do_for_each( + boost::counting_iterator(0), + boost::counting_iterator(config.num_pgs), + [this](auto i) { + return sharded_fs->create_new_collection(get_coll(i) + ).then([this, i](auto coll) { + ceph::os::Transaction t; + t.create_collection(get_coll(i), 0); + return sharded_fs->do_transaction(coll, std::move(t)); + }); + }); + }).then([this] { + return fs->umount(); + }).then([this] { + return fs->stop(); + }).then([this] { + fs.reset(); + return seastar::now(); + }); +} + +seastar::future<> FSDriver::mount() +{ + ceph_assert(config.path); + return ( + config.mkfs ? mkfs() : seastar::now() + ).then([this] { + return init(); + }).then([this] { + return fs->mount( + ).handle_error( + crimson::stateful_ec::handle([] (const auto& ec) { + crimson::get_logger( + ceph_subsys_test + ).error( + "error mounting object store in {}: ({}) {}", + crimson::common::local_conf().get_val("osd_data"), + ec.value(), + ec.message()); + std::exit(EXIT_FAILURE); + })); + }).then([this] { + return seastar::do_for_each( + boost::counting_iterator(0), + boost::counting_iterator(config.num_pgs), + [this](auto i) { + return sharded_fs->open_collection(get_coll(i) + ).then([this, i](auto ref) { + collections[i].collection = ref; + collections[i].log_object = get_log_object(i); + if (config.log_enabled()) { + ceph::os::Transaction t; + if (config.prepopulate_log_enabled()) { + populate_log( + t, + collections[i], + config.log_entry_size, + config.log_size); + } + return sharded_fs->do_transaction( + collections[i].collection, + std::move(t)); + } else { + return seastar::now(); + } + }); + }); + }).then([this] { + return fs->stat(); + }).then([this](auto s) { + size = s.total; + }); +}; + +seastar::future<> FSDriver::close() +{ + collections.clear(); + return fs->umount( + ).then([this] { + return fs->stop(); + }).then([this] { + fs.reset(); + return seastar::now(); + }); +} + +seastar::future<> FSDriver::init() +{ + fs.reset(); + fs = FuturizedStore::create( + config.get_fs_type(), + *config.path, + crimson::common::local_conf().get_config_values() + ); + return fs->start().then([this] { + sharded_fs = &(fs->get_sharded_store()); + }); +} diff --git a/src/crimson/tools/store_nbd/fs_driver.h b/src/crimson/tools/store_nbd/fs_driver.h new file mode 100644 index 000000000..89aca075f --- /dev/null +++ b/src/crimson/tools/store_nbd/fs_driver.h @@ -0,0 
+1,72 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "block_driver.h" + +#include "crimson/os/futurized_collection.h" +#include "crimson/os/futurized_store.h" + +class FSDriver final : public BlockDriver { +public: + FSDriver(config_t config) + : config(config) + {} + ~FSDriver() final {} + + bufferptr get_buffer(size_t size) final { + return ceph::buffer::create_page_aligned(size); + } + + seastar::future<> write( + off_t offset, + bufferptr ptr) final; + + seastar::future read( + off_t offset, + size_t size) final; + + size_t get_size() const { + return size; + } + + seastar::future<> mount() final; + + seastar::future<> close() final; + +private: + size_t size = 0; + const config_t config; + std::unique_ptr fs; + crimson::os::FuturizedStore::Shard* sharded_fs; + + struct pg_analogue_t { + crimson::os::CollectionRef collection; + + ghobject_t log_object; + unsigned log_tail = 0; + unsigned log_head = 0; + }; + std::map collections; + + struct offset_mapping_t { + pg_analogue_t &pg; + ghobject_t object; + off_t offset; + }; + offset_mapping_t map_offset(off_t offset); + + seastar::future<> mkfs(); + seastar::future<> init(); + + friend void populate_log( + ceph::os::Transaction &, + pg_analogue_t &, + unsigned, + unsigned); + + friend void update_log( + ceph::os::Transaction &, + FSDriver::pg_analogue_t &, + unsigned, + unsigned); +}; diff --git a/src/crimson/tools/store_nbd/store-nbd.cc b/src/crimson/tools/store_nbd/store-nbd.cc new file mode 100644 index 000000000..9f80c3b2c --- /dev/null +++ b/src/crimson/tools/store_nbd/store-nbd.cc @@ -0,0 +1,456 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- + +/** + * crimson-store-nbd + * + * This tool exposes crimson object store internals as an nbd server + * for use with fio in basic benchmarking. 
+ *
+ * Example usage:
+ *
+ *  $ ./bin/crimson-store-nbd --device-path /dev/nvme1n1 -c 1 --mkfs true --uds-path /tmp/store_nbd_socket.sock
+ *
+ *  $ cat nbd.fio
+ *  [global]
+ *  ioengine=nbd
+ *  uri=nbd+unix:///?socket=/tmp/store_nbd_socket.sock
+ *  rw=randrw
+ *  time_based
+ *  runtime=120
+ *  group_reporting
+ *  iodepth=1
+ *  size=500G
+ *
+ *  [job0]
+ *  offset=0
+ *
+ *  $ fio nbd.fio
+ */
+
+#include <linux/nbd.h>
+
+#include <boost/program_options/variables_map.hpp>
+#include <boost/program_options/parsers.hpp>
+
+#include <seastar/apps/lib/stop_signal.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/byteorder.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/rwlock.hh>
+#include <seastar/core/thread.hh>
+#include <seastar/util/defer.hh>
+
+#include "crimson/common/config_proxy.h"
+#include "crimson/common/log.h"
+
+#include "block_driver.h"
+
+namespace po = boost::program_options;
+
+using namespace ceph;
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_test);
+  }
+}
+
+struct request_context_t {
+  uint32_t magic = 0;
+  uint32_t type = 0;
+
+  char handle[8] = {0};
+
+  uint64_t from = 0;
+  uint32_t len = 0;
+
+  unsigned err = 0;
+  std::optional<bufferptr> in_buffer;
+  std::optional<bufferlist> out_buffer;
+
+  using ref = std::unique_ptr<request_context_t>;
+  static ref make_ref() {
+    return std::make_unique<request_context_t>();
+  }
+
+  bool check_magic() const {
+    auto ret = magic == NBD_REQUEST_MAGIC;
+    if (!ret) {
+      logger().error(
+        "Invalid magic {} should be {}",
+        magic,
+        NBD_REQUEST_MAGIC);
+    }
+    return ret;
+  }
+
+  uint32_t get_command() const {
+    return type & 0xff;
+  }
+
+  bool has_input_buffer() const {
+    return get_command() == NBD_CMD_WRITE;
+  }
+
+  seastar::future<> read_request(seastar::input_stream<char> &in) {
+    return in.read_exactly(sizeof(struct nbd_request)
+    ).then([this, &in](auto buf) {
+      if (buf.size() < sizeof(struct nbd_request)) {
+        throw std::system_error(
+          std::make_error_code(
+            std::errc::connection_reset));
+      }
+      auto p = buf.get();
+      magic = seastar::consume_be<uint32_t>(p);
+      type = seastar::consume_be<uint32_t>(p);
+      memcpy(handle, p, sizeof(handle));
+      p += sizeof(handle);
+      from = seastar::consume_be<uint64_t>(p);
+      len = seastar::consume_be<uint32_t>(p);
+      logger().debug(
+        "Got request, magic {}, type {}, from {}, len {}",
+        magic, type, from, len);
+
+      if (!check_magic()) {
+        throw std::system_error(
+          std::make_error_code(
+            std::errc::invalid_argument));
+      }
+
+      if (has_input_buffer()) {
+        return in.read_exactly(len).then([this](auto buf) {
+          in_buffer = ceph::buffer::create_page_aligned(len);
+          in_buffer->copy_in(0, len, buf.get());
+          return seastar::now();
+        });
+      } else {
+        return seastar::now();
+      }
+    });
+  }
+
+  seastar::future<> write_reply(seastar::output_stream<char> &out) {
+    seastar::temporary_buffer<char> buffer{sizeof(struct nbd_reply)};
+    auto p = buffer.get_write();
+    seastar::produce_be<uint32_t>(p, NBD_REPLY_MAGIC);
+    seastar::produce_be<uint32_t>(p, err);
+    logger().debug("write_reply writing err {}", err);
+    memcpy(p, handle, sizeof(handle));
+    return out.write(std::move(buffer)).then([this, &out] {
+      if (out_buffer) {
+        return seastar::do_for_each(
+          out_buffer->mut_buffers(),
+          [&out](bufferptr &ptr) {
+            logger().debug("write_reply writing {}", ptr.length());
+            return out.write(
+              seastar::temporary_buffer<char>(
+                ptr.c_str(),
+                ptr.length(),
+                seastar::make_deleter([ptr](){}))
+            );
+          });
+      } else {
+        return seastar::now();
+      }
+    }).then([&out] {
+      return out.flush();
+    });
+  }
+};
+
+struct RequestWriter {
+  seastar::rwlock lock;
+  seastar::output_stream<char> stream;
+  seastar::gate gate;
+
+  RequestWriter(
+    seastar::output_stream<char> &&stream) : stream(std::move(stream)) {}
+  RequestWriter(RequestWriter &&) = default;
+
+  seastar::future<> complete(request_context_t::ref &&req) {
+    auto &request = *req;
+    return lock.write_lock(
+    ).then([&request, this] {
+      return request.write_reply(stream);
+    }).finally([&, this, req=std::move(req)] {
+      lock.write_unlock();
+      logger().debug("complete");
+      return seastar::now();
+    });
+  }
+
+  seastar::future<> close() {
+    return gate.close().then([this] {
+      return stream.close();
+    });
+  }
+};
+
+/**
+ * NBDHandler
+ *
+ * Simple throughput test for concurrent, single threaded
+ * writes to a BlockDriver.
+ */
+class NBDHandler {
+  BlockDriver &backend;
+  std::string uds_path;
+  std::optional<seastar::server_socket> server_socket;
+  std::optional<seastar::connected_socket> connected_socket;
+  seastar::gate gate;
+public:
+  struct config_t {
+    std::string uds_path;
+
+    void populate_options(
+      po::options_description &desc)
+    {
+      desc.add_options()
+        ("uds-path",
+         po::value<std::string>()
+           ->default_value("/tmp/store_nbd_socket.sock")
+           ->notifier([this](auto s) {
+             uds_path = s;
+           }),
+         "Path to domain socket for nbd"
+        );
+    }
+  };
+
+  NBDHandler(
+    BlockDriver &backend,
+    config_t config) :
+    backend(backend),
+    uds_path(config.uds_path)
+  {}
+
+  void run();
+  seastar::future<> stop();
+};
+
+int main(int argc, char** argv)
+{
+  po::options_description desc{"Allowed options"};
+  bool debug = false;
+  desc.add_options()
+    ("help,h", "show help message")
+    ("debug", po::value(&debug)->default_value(false),
+     "enable debugging");
+
+  po::options_description nbd_pattern_options{"NBD Pattern Options"};
+  NBDHandler::config_t nbd_config;
+  nbd_config.populate_options(nbd_pattern_options);
+  desc.add(nbd_pattern_options);
+
+  po::options_description backend_pattern_options{"Backend Options"};
+  BlockDriver::config_t backend_config;
+  backend_config.populate_options(backend_pattern_options);
+  desc.add(backend_pattern_options);
+
+  po::variables_map vm;
+  std::vector<std::string> unrecognized_options;
+  try {
+    auto parsed = po::command_line_parser(argc, argv)
+      .options(desc)
+      .allow_unregistered()
+      .run();
+    po::store(parsed, vm);
+    if (vm.count("help")) {
+      std::cout << desc << std::endl;
+      return 0;
+    }
+
+    po::notify(vm);
+    unrecognized_options =
+      po::collect_unrecognized(parsed.options, po::include_positional);
+  } catch(const po::error& e) {
+    std::cerr << "error: " << e.what() << std::endl;
+    return 1;
+  }
+  std::vector<const char*> args(argv, argv + argc);
+
+  seastar::app_template::config app_cfg;
+  app_cfg.name = "crimson-store-nbd";
+  app_cfg.auto_handle_sigint_sigterm = false;
+  seastar::app_template app(std::move(app_cfg));
+
+  std::vector<char*> av{argv[0]};
+  std::transform(begin(unrecognized_options),
+                 end(unrecognized_options),
+                 std::back_inserter(av),
+                 [](auto& s) {
+                   return const_cast<char*>(s.c_str());
+                 });
+  return app.run(av.size(), av.data(), [&] {
+    if (debug) {
+      seastar::global_logger_registry().set_all_loggers_level(
+        seastar::log_level::debug
+      );
+    }
+    return seastar::async([&] {
+      seastar_apps_lib::stop_signal should_stop;
+      crimson::common::sharded_conf()
+        .start(EntityName{}, std::string_view{"ceph"}).get();
+      auto stop_conf = seastar::defer([] {
+        crimson::common::sharded_conf().stop().get();
+      });
+
+      auto backend = get_backend(backend_config);
+      NBDHandler nbd(*backend, nbd_config);
+      backend->mount().get();
+      auto close_backend = seastar::defer([&] {
+        backend->close().get();
+      });
+
+      logger().debug("Running nbd server...");
+      nbd.run();
+      auto stop_nbd = seastar::defer([&] {
+        nbd.stop().get();
+      });
+      should_stop.wait().get();
+      return 0;
+    });
+  });
+}
+
+class nbd_oldstyle_negotiation_t {
+  uint64_t magic = seastar::cpu_to_be(0x4e42444d41474943); // "NBDMAGIC"
+  uint64_t magic2 = seastar::cpu_to_be(0x00420281861253);  // "IHAVEOPT"
+  uint64_t size = 0;
+  uint32_t flags = seastar::cpu_to_be(0);
+  char reserved[124] = {0};
+
+public:
+  nbd_oldstyle_negotiation_t(uint64_t size, uint32_t flags)
+    : size(seastar::cpu_to_be(size)), flags(seastar::cpu_to_be(flags)) {}
+} __attribute__((packed));
+
+seastar::future<> send_negotiation(
+  size_t size,
+  seastar::output_stream<char>& out)
+{
+  seastar::temporary_buffer<char> buf{sizeof(nbd_oldstyle_negotiation_t)};
+  new (buf.get_write()) nbd_oldstyle_negotiation_t(size, 1);
+  return out.write(std::move(buf)
+  ).then([&out] {
+    return out.flush();
+  });
+}
+
+seastar::future<> handle_command(
+  BlockDriver &backend,
+  request_context_t::ref request_ref,
+  RequestWriter &out)
+{
+  auto &request = *request_ref;
+  logger().debug("got command {}", request.get_command());
+  return ([&] {
+    switch (request.get_command()) {
+    case NBD_CMD_WRITE:
+      return backend.write(
+        request.from,
+        *request.in_buffer);
+    case NBD_CMD_READ:
+      return backend.read(
+        request.from,
+        request.len).then([&] (auto buffer) {
+          logger().debug("read returned buffer len {}", buffer.length());
+          request.out_buffer = buffer;
+        });
+    case NBD_CMD_DISC:
+      throw std::system_error(std::make_error_code(std::errc::bad_message));
+    case NBD_CMD_TRIM:
+      throw std::system_error(std::make_error_code(std::errc::bad_message));
+    default:
+      throw std::system_error(std::make_error_code(std::errc::bad_message));
+    }
+  })().then([&, request_ref=std::move(request_ref)]() mutable {
+    logger().debug("handle_command complete");
+    return out.complete(std::move(request_ref));
+  });
+}
+
+
+seastar::future<> handle_commands(
+  BlockDriver &backend,
+  seastar::input_stream<char>& in,
+  RequestWriter &out)
+{
+  logger().debug("handle_commands");
+  return seastar::keep_doing([&] {
+    logger().debug("waiting for command");
+    auto request_ref = request_context_t::make_ref();
+    auto &request = *request_ref;
+    return request.read_request(in).then(
+      [&, request_ref=std::move(request_ref)]() mutable {
+        // keep running in background
+        (void)seastar::try_with_gate(out.gate,
+          [&backend, &out, request_ref=std::move(request_ref)]() mutable {
+            return handle_command(backend, std::move(request_ref), out);
+          });
+        logger().debug("handle_commands after fork");
+      });
+  }).handle_exception_type([](const seastar::gate_closed_exception&) {});
+}
+
+void NBDHandler::run()
+{
+  logger().debug("About to listen on {}", uds_path);
+  server_socket = seastar::engine().listen(
+    seastar::socket_address{
+      seastar::unix_domain_addr{uds_path}});
+
+  // keep running in background
+  (void)seastar::keep_doing([this] {
+    return seastar::try_with_gate(gate, [this] {
+      return server_socket->accept().then([this](auto acc) {
+        logger().debug("Accepted");
+        connected_socket = std::move(acc.connection);
+        return seastar::do_with(
+          connected_socket->input(),
+          RequestWriter{connected_socket->output()},
+          [&, this](auto &input, auto &output) {
+            return send_negotiation(
+              backend.get_size(),
+              output.stream
+            ).then([&, this] {
+              return handle_commands(backend, input, output);
+            }).finally([&] {
+              std::cout << "closing input and output" << std::endl;
+              return seastar::when_all(input.close(),
+                                       output.close());
+            }).discard_result().handle_exception([](auto e) {
+              logger().error("NBDHandler::run saw exception {}", e);
+            });
+          });
+      }).handle_exception_type([] (const std::system_error &e) {
+        // an ECONNABORTED is expected when we are being stopped.
+        if (e.code() != std::errc::connection_aborted) {
+          logger().error("accept failed: {}", e);
+        }
+      });
+    });
+  }).handle_exception_type([](const seastar::gate_closed_exception&) {});
+}
+
+seastar::future<> NBDHandler::stop()
+{
+  if (server_socket) {
+    server_socket->abort_accept();
+  }
+  if (connected_socket) {
+    connected_socket->shutdown_input();
+    connected_socket->shutdown_output();
+  }
+  return gate.close().then([this] {
+    if (!server_socket.has_value()) {
+      return seastar::now();
+    }
+    return seastar::remove_file(uds_path);
+  });
+}
diff --git a/src/crimson/tools/store_nbd/tm_driver.cc b/src/crimson/tools/store_nbd/tm_driver.cc
new file mode 100644
index 000000000..bd216fd58
--- /dev/null
+++ b/src/crimson/tools/store_nbd/tm_driver.cc
@@ -0,0 +1,222 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "tm_driver.h"
+
+using namespace crimson;
+using namespace crimson::os;
+using namespace crimson::os::seastore;
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_test);
+  }
+}
+
+seastar::future<> TMDriver::write(
+  off_t offset,
+  bufferptr ptr)
+{
+  logger().debug("Writing offset {}", offset);
+  assert(offset % device->get_block_size() == 0);
+  assert((ptr.length() % device->get_block_size()) == 0);
+  return seastar::do_with(ptr, [this, offset](auto& ptr) {
+    return repeat_eagain([this, offset, &ptr] {
+      return tm->with_transaction_intr(
+        Transaction::src_t::MUTATE,
+        "write",
+        [this, offset, &ptr](auto& t)
+      {
+        return tm->dec_ref(t, offset
+        ).si_then([](auto){}).handle_error_interruptible(
+          crimson::ct_error::enoent::handle([](auto) { return seastar::now(); }),
+          crimson::ct_error::pass_further_all{}
+        ).si_then([this, offset, &t, &ptr] {
+          logger().debug("dec_ref complete");
+          return tm->alloc_extent<TestBlock>(t, offset, ptr.length());
+        }).si_then([this, offset, &t, &ptr](auto ext) {
+          boost::ignore_unused(offset);  // avoid clang warning
+          assert(ext->get_laddr() == (size_t)offset);
+          assert(ext->get_bptr().length() == ptr.length());
+          ext->get_bptr().swap(ptr);
+          logger().debug("submitting transaction");
+          return tm->submit_transaction(t);
+        });
+      });
+    });
+  }).handle_error(
+    crimson::ct_error::assert_all{"store-nbd write"}
+  );
+}
+
+TMDriver::read_extents_ret TMDriver::read_extents(
+  Transaction &t,
+  laddr_t offset,
+  extent_len_t length)
+{
+  return seastar::do_with(
+    lba_pin_list_t(),
+    lextent_list_t<TestBlock>(),
+    [this, &t, offset, length](auto &pins, auto &ret) {
+      return tm->get_pins(
+        t, offset, length
+      ).si_then([this, &t, &pins, &ret](auto _pins) {
+        _pins.swap(pins);
+        logger().debug("read_extents: mappings {}", pins);
+        return trans_intr::do_for_each(
+          pins.begin(),
+          pins.end(),
+          [this, &t, &ret](auto &&pin) {
+            logger().debug(
+              "read_extents: get_extent {}~{}",
+              pin->get_val(),
+              pin->get_length());
+            return tm->read_pin<TestBlock>(
+              t,
+              std::move(pin)
+            ).si_then([&ret](auto ref) mutable {
+              ret.push_back(std::make_pair(ref->get_laddr(), ref));
+              logger().debug(
+                "read_extents: got extent {}",
+                *ref);
+              return seastar::now();
+            });
+          }).si_then([&ret] {
+            return std::move(ret);
+          });
+      });
+    });
+}
+
+seastar::future<bufferlist> TMDriver::read(
+  off_t offset,
+  size_t size)
+{
+  logger().debug("Reading offset {}", offset);
+  assert(offset % device->get_block_size() == 0);
+  assert(size % device->get_block_size() == 0);
+  auto blptrret = std::make_unique<bufferlist>();
+  auto &blret = *blptrret;
+  return repeat_eagain([=, &blret, this] {
+    return tm->with_transaction_intr(
+      Transaction::src_t::READ,
+      "read",
+      [=, &blret, this](auto& t)
+    {
+      return read_extents(t, offset, size
+      ).si_then([=, &blret](auto ext_list) {
+        size_t cur = offset;
+        for (auto &i: ext_list) {
+          if (cur != i.first) {
+            assert(cur < i.first);
+            blret.append_zero(i.first - cur);
+            cur = i.first;
+          }
+          blret.append(i.second->get_bptr());
+          cur += i.second->get_bptr().length();
+        }
+        if (blret.length() != size) {
+          assert(blret.length() < size);
+          blret.append_zero(size - blret.length());
+        }
+      });
+    });
+  }).handle_error(
+    crimson::ct_error::assert_all{"store-nbd read"}
+  ).then([blptrret=std::move(blptrret)]() mutable {
+    logger().debug("read complete");
+    return std::move(*blptrret);
+  });
+}
+
+void TMDriver::init()
+{
+  std::vector<Device*> sec_devices;
+#ifndef NDEBUG
+  tm = make_transaction_manager(device.get(), sec_devices, true);
+#else
+  tm = make_transaction_manager(device.get(), sec_devices, false);
+#endif
+}
+
+void TMDriver::clear()
+{
+  tm.reset();
+}
+
+size_t TMDriver::get_size() const
+{
+  return device->get_available_size() * .5;
+}
+
+seastar::future<> TMDriver::mkfs()
+{
+  assert(config.path);
+  logger().debug("mkfs");
+  return Device::make_device(*config.path, device_type_t::SSD
+  ).then([this](DeviceRef dev) {
+    device = std::move(dev);
+    seastore_meta_t meta;
+    meta.seastore_id.generate_random();
+    return device->mkfs(
+      device_config_t{
+        true,
+        (magic_t)std::rand(),
+        device_type_t::SSD,
+        0,
+        meta,
+        secondary_device_set_t()});
+  }).safe_then([this] {
+    logger().debug("device mkfs done");
+    return device->mount();
+  }).safe_then([this] {
+    init();
+    logger().debug("tm mkfs");
+    return tm->mkfs();
+  }).safe_then([this] {
+    logger().debug("tm close");
+    return tm->close();
+  }).safe_then([this] {
+    logger().debug("sm close");
+    return device->close();
+  }).safe_then([this] {
+    clear();
+    device.reset();
+    logger().debug("mkfs complete");
+    return TransactionManager::mkfs_ertr::now();
+  }).handle_error(
+    crimson::ct_error::assert_all{
+      "Invalid error during TMDriver::mkfs"
+    }
+  );
+}
+
+seastar::future<> TMDriver::mount()
+{
+  return (config.mkfs ?
+    mkfs() : seastar::now()
+  ).then([this] {
+    return Device::make_device(*config.path, device_type_t::SSD);
+  }).then([this](DeviceRef dev) {
+    device = std::move(dev);
+    return device->mount();
+  }).safe_then([this] {
+    init();
+    return tm->mount();
+  }).handle_error(
+    crimson::ct_error::assert_all{
      "Invalid error during TMDriver::mount"
+    }
+  );
+};
+
+seastar::future<> TMDriver::close()
+{
+  return tm->close().safe_then([this] {
+    clear();
+    return device->close();
+  }).handle_error(
+    crimson::ct_error::assert_all{
+      "Invalid error during TMDriver::close"
+    }
+  );
+}
diff --git a/src/crimson/tools/store_nbd/tm_driver.h b/src/crimson/tools/store_nbd/tm_driver.h
new file mode 100644
index 000000000..24aabdeb6
--- /dev/null
+++ b/src/crimson/tools/store_nbd/tm_driver.h
@@ -0,0 +1,56 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "block_driver.h"
+
+#include "crimson/os/seastore/cache.h"
+#include "crimson/os/seastore/device.h"
+#include "crimson/os/seastore/transaction_manager.h"
+#include "test/crimson/seastore/test_block.h"
+
+class TMDriver final : public BlockDriver {
+public:
+  TMDriver(config_t config) : config(config) {}
+  ~TMDriver() final {}
+
+  bufferptr get_buffer(size_t size) final {
+    return ceph::buffer::create_page_aligned(size);
+  }
+
+  seastar::future<> write(
+    off_t offset,
+    bufferptr ptr) final;
+
+  seastar::future<bufferlist> read(
+    off_t offset,
+    size_t size) final;
+
+  size_t get_size() const final;
+
+  seastar::future<> mount() final;
+
+  seastar::future<> close() final;
+
+private:
+  const config_t config;
+
+  using DeviceRef = crimson::os::seastore::DeviceRef;
+  DeviceRef device;
+
+  using TransactionManager = crimson::os::seastore::TransactionManager;
+  using TransactionManagerRef = crimson::os::seastore::TransactionManagerRef;
+  TransactionManagerRef tm;
+
+  seastar::future<> mkfs();
+  void init();
+  void clear();
+
+  using read_extents_iertr = TransactionManager::read_extent_iertr;
+  using read_extents_ret = read_extents_iertr::future<
+    crimson::os::seastore::lextent_list_t<TestBlock>
+  >;
+  read_extents_ret read_extents(
+    crimson::os::seastore::Transaction &t,
+    crimson::os::seastore::laddr_t offset,
+    crimson::os::seastore::extent_len_t length);
+};
--
cgit v1.2.3
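A minimal standalone sketch of the offset-to-object arithmetic used by FSDriver::map_offset in fs_driver.cc above. The object_size and num_pgs values here are illustrative assumptions; in the tool they come from BlockDriver::config_t at runtime.

// Sketch of FSDriver::map_offset's arithmetic, with assumed config values.
#include <cstdint>
#include <cstdio>

int main() {
  const uint64_t object_size = 4 << 20;  // assumed: 4 MiB backing objects
  const uint32_t num_pgs = 8;            // assumed: 8 pg analogues

  const uint64_t offset = (17 << 20) + 4096;   // sample byte offset into the device
  const uint32_t objid = offset / object_size; // which backing object
  const uint32_t collid = objid % num_pgs;     // objects round-robin over collections
  const uint64_t intra = offset % object_size; // offset within that object
  // fs_driver.cc derives the object hash as (collid << 16) | (objid + 1)
  const uint32_t hash = (collid << 16) | (objid + 1);

  // Prints objid=4 collid=4 intra=1052672 hash=0x40005
  std::printf("objid=%u collid=%u intra=%llu hash=0x%x\n",
              objid, collid, static_cast<unsigned long long>(intra), hash);
  return 0;
}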
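The omap-based log ring maintained by populate_log/update_log in fs_driver.cc trims keys from the tail as the head advances, bounding the number of live entries. A self-contained sketch of that bookkeeping, with a plain std::map standing in for the object's omap and an assumed log size of 3 entries:

// Sketch of update_log()'s head/tail trimming; std::map stands in for the
// omap_setkeys/omap_rmkey calls made against the per-collection log object.
#include <cassert>
#include <map>
#include <string>

int main() {
  std::map<std::string, std::string> omap;
  unsigned log_head = 0, log_tail = 0;
  const unsigned entries = 3;  // assumed log_size

  for (int i = 0; i < 10; ++i) {
    ++log_head;
    omap.emplace("log_entry_" + std::to_string(log_head), "payload");
    // Trim from the tail until at most `entries` separate head from tail,
    // mirroring the while loop in update_log().
    while ((log_head - log_tail) > entries) {
      omap.erase("log_entry_" + std::to_string(log_tail));  // no-op if absent
      ++log_tail;
    }
  }
  // Keys log_entry_7 .. log_entry_10 survive.
  assert(log_head - log_tail == entries);
  assert(omap.size() == entries + 1);
  return 0;
}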
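TMDriver::read in tm_driver.cc assembles a contiguous buffer from whatever extents exist in the requested logical range, zero-filling holes between extents and past the last one so the reply is exactly the requested size. A sketch of that reassembly loop, where the (laddr, data) pairs and std::string are hypothetical stand-ins for the extent list returned by read_extents and the bufferlist being built:

// Sketch of the gap-filling loop in TMDriver::read().
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

int main() {
  using extent_t = std::pair<std::size_t, std::string>;  // (laddr, data) stand-in
  const std::size_t offset = 0, size = 16;
  // Two 4-byte extents at laddr 4 and 12; bytes 0-3 and 8-11 were never written.
  const std::vector<extent_t> ext_list = {{4, "AAAA"}, {12, "BBBB"}};

  std::string ret;  // stands in for the bufferlist
  std::size_t cur = offset;
  for (const auto &[laddr, data] : ext_list) {
    if (cur != laddr) {
      assert(cur < laddr);
      ret.append(laddr - cur, '\0');  // zero-fill the hole before this extent
      cur = laddr;
    }
    ret.append(data);
    cur += data.size();
  }
  if (ret.size() != size) {
    assert(ret.size() < size);
    ret.append(size - ret.size(), '\0');  // zero-fill past the last extent
  }
  assert(ret.size() == size);
  return 0;
}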