diff options
Diffstat (limited to 'src/tools/crimson')
-rw-r--r-- | src/tools/crimson/CMakeLists.txt | 8 | ||||
-rw-r--r-- | src/tools/crimson/perf_async_msgr.cc | 140 | ||||
-rw-r--r-- | src/tools/crimson/perf_crimson_msgr.cc | 746 | ||||
-rw-r--r-- | src/tools/crimson/perf_staged_fltree.cc | 129 |
4 files changed, 1023 insertions, 0 deletions
diff --git a/src/tools/crimson/CMakeLists.txt b/src/tools/crimson/CMakeLists.txt new file mode 100644 index 000000000..19a2cfa91 --- /dev/null +++ b/src/tools/crimson/CMakeLists.txt @@ -0,0 +1,8 @@ +add_executable(perf-crimson-msgr perf_crimson_msgr.cc) +target_link_libraries(perf-crimson-msgr crimson) + +add_executable(perf-async-msgr perf_async_msgr.cc) +target_link_libraries(perf-async-msgr ceph-common global ${ALLOC_LIBS}) + +add_executable(perf-staged-fltree perf_staged_fltree.cc) +target_link_libraries(perf-staged-fltree crimson-seastore) diff --git a/src/tools/crimson/perf_async_msgr.cc b/src/tools/crimson/perf_async_msgr.cc new file mode 100644 index 000000000..25d1d410e --- /dev/null +++ b/src/tools/crimson/perf_async_msgr.cc @@ -0,0 +1,140 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- + +#include <boost/program_options/variables_map.hpp> +#include <boost/program_options/parsers.hpp> + +#include "auth/Auth.h" +#include "global/global_init.h" +#include "msg/Dispatcher.h" +#include "msg/Messenger.h" +#include "messages/MOSDOp.h" + +#include "auth/DummyAuth.h" + +namespace { + +constexpr int CEPH_OSD_PROTOCOL = 10; + +struct Server { + Server(CephContext* cct, unsigned msg_len) + : dummy_auth(cct), dispatcher(cct, msg_len) + { + msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(0), "server", 0)); + dummy_auth.auth_registry.refresh_config(); + msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); + msgr->set_default_policy(Messenger::Policy::stateless_server(0)); + msgr->set_auth_client(&dummy_auth); + msgr->set_auth_server(&dummy_auth); + msgr->set_require_authorizer(false); + } + DummyAuthClientServer dummy_auth; + unique_ptr<Messenger> msgr; + struct ServerDispatcher : Dispatcher { + unsigned msg_len = 0; + bufferlist msg_data; + + ServerDispatcher(CephContext* cct, unsigned msg_len) + : Dispatcher(cct), msg_len(msg_len) + { + msg_data.append_zero(msg_len); + } + bool ms_can_fast_dispatch_any() const override { + return true; + } + bool ms_can_fast_dispatch(const Message* m) const override { + return m->get_type() == CEPH_MSG_OSD_OP; + } + void ms_fast_dispatch(Message* m) override { + ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); + const static pg_t pgid; + const static object_locator_t oloc; + const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), + pgid.pool(), oloc.nspace); + static spg_t spgid(pgid); + MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0); + bufferlist data(msg_data); + rep->write(0, msg_len, data); + rep->set_tid(m->get_tid()); + m->get_connection()->send_message(rep); + m->put(); + } + bool ms_dispatch(Message*) override { + ceph_abort(); + } + bool ms_handle_reset(Connection*) override { + return true; + } + void ms_handle_remote_reset(Connection*) override { + } + bool ms_handle_refused(Connection*) override { + return true; + } + } dispatcher; +}; + +} + +static void run(CephContext* cct, entity_addr_t addr, unsigned bs) +{ + std::cout << "async server listening at " << addr << std::endl; + Server server{cct, bs}; + server.msgr->bind(addr); + server.msgr->add_dispatcher_head(&server.dispatcher); + server.msgr->start(); + server.msgr->wait(); +} + +int main(int argc, char** argv) +{ + namespace po = boost::program_options; + po::options_description desc{"Allowed options"}; + desc.add_options() + ("help,h", "show help message") + ("addr", po::value<std::string>()->default_value("v1:127.0.0.1:9010"), + "server address") + ("bs", po::value<unsigned>()->default_value(0), + "server block size") + ("v1-crc-enabled", po::value<bool>()->default_value(false), + "enable v1 CRC checks"); + po::variables_map vm; + std::vector<std::string> unrecognized_options; + try { + auto parsed = po::command_line_parser(argc, argv) + .options(desc) + .allow_unregistered() + .run(); + po::store(parsed, vm); + if (vm.count("help")) { + std::cout << desc << std::endl; + return 0; + } + po::notify(vm); + unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional); + } catch(const po::error& e) { + std::cerr << "error: " << e.what() << std::endl; + return 1; + } + + auto addr = vm["addr"].as<std::string>(); + entity_addr_t target_addr; + target_addr.parse(addr.c_str(), nullptr); + auto bs = vm["bs"].as<unsigned>(); + auto v1_crc_enabled = vm["v1-crc-enabled"].as<bool>(); + + std::vector<const char*> args(argv, argv + argc); + auto cct = global_init(nullptr, args, + CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_MON_CONFIG); + common_init_finish(cct.get()); + + if (v1_crc_enabled) { + cct->_conf.set_val("ms_crc_header", "true"); + cct->_conf.set_val("ms_crc_data", "true"); + } else { + cct->_conf.set_val("ms_crc_header", "false"); + cct->_conf.set_val("ms_crc_data", "false"); + } + + run(cct.get(), target_addr, bs); +} diff --git a/src/tools/crimson/perf_crimson_msgr.cc b/src/tools/crimson/perf_crimson_msgr.cc new file mode 100644 index 000000000..e76f273a9 --- /dev/null +++ b/src/tools/crimson/perf_crimson_msgr.cc @@ -0,0 +1,746 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <map> +#include <random> +#include <boost/program_options.hpp> + +#include <seastar/core/app-template.hh> +#include <seastar/core/do_with.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/sleep.hh> +#include <seastar/core/semaphore.hh> +#include <seastar/core/smp.hh> + +#include "common/ceph_time.h" +#include "messages/MOSDOp.h" + +#include "crimson/auth/DummyAuth.h" +#include "crimson/common/log.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Dispatcher.h" +#include "crimson/net/Messenger.h" + +namespace bpo = boost::program_options; + +namespace { + +template<typename Message> +using Ref = boost::intrusive_ptr<Message>; + +seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); +} + +template <typename T, typename... Args> +seastar::future<T*> create_sharded(Args... args) { + // seems we should only construct/stop shards on #0 + return seastar::smp::submit_to(0, [=] { + auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>(); + return sharded_obj->start(args...).then([sharded_obj]() { + seastar::engine().at_exit([sharded_obj]() { + return sharded_obj->stop().then([sharded_obj] {}); + }); + return sharded_obj.get(); + }); + }).then([] (seastar::sharded<T> *ptr_shard) { + // return the pointer valid for the caller CPU + return &ptr_shard->local(); + }); +} + +enum class perf_mode_t { + both, + client, + server +}; + +struct client_config { + entity_addr_t server_addr; + unsigned block_size; + unsigned ramptime; + unsigned msgtime; + unsigned jobs; + unsigned depth; + bool v1_crc_enabled; + + std::string str() const { + std::ostringstream out; + out << "client[>> " << server_addr + << "](bs=" << block_size + << ", ramptime=" << ramptime + << ", msgtime=" << msgtime + << ", jobs=" << jobs + << ", depth=" << depth + << ", v1-crc-enabled=" << v1_crc_enabled + << ")"; + return out.str(); + } + + static client_config load(bpo::variables_map& options) { + client_config conf; + entity_addr_t addr; + ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr)); + + conf.server_addr = addr; + conf.block_size = options["cbs"].as<unsigned>(); + conf.ramptime = options["ramptime"].as<unsigned>(); + conf.msgtime = options["msgtime"].as<unsigned>(); + conf.jobs = options["jobs"].as<unsigned>(); + conf.depth = options["depth"].as<unsigned>(); + ceph_assert(conf.depth % conf.jobs == 0); + conf.v1_crc_enabled = options["v1-crc-enabled"].as<bool>(); + return conf; + } +}; + +struct server_config { + entity_addr_t addr; + unsigned block_size; + unsigned core; + bool v1_crc_enabled; + + std::string str() const { + std::ostringstream out; + out << "server[" << addr + << "](bs=" << block_size + << ", core=" << core + << ", v1-crc-enabled=" << v1_crc_enabled + << ")"; + return out.str(); + } + + static server_config load(bpo::variables_map& options) { + server_config conf; + entity_addr_t addr; + ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr)); + + conf.addr = addr; + conf.block_size = options["sbs"].as<unsigned>(); + conf.core = options["core"].as<unsigned>(); + conf.v1_crc_enabled = options["v1-crc-enabled"].as<bool>(); + return conf; + } +}; + +const unsigned SAMPLE_RATE = 7; + +static seastar::future<> run( + perf_mode_t mode, + const client_config& client_conf, + const server_config& server_conf) +{ + struct test_state { + struct Server; + using ServerFRef = seastar::foreign_ptr<std::unique_ptr<Server>>; + + struct Server final + : public crimson::net::Dispatcher { + crimson::net::MessengerRef msgr; + crimson::auth::DummyAuthClientServer dummy_auth; + const seastar::shard_id msgr_sid; + std::string lname; + unsigned msg_len; + bufferlist msg_data; + + Server(unsigned msg_len) + : msgr_sid{seastar::this_shard_id()}, + msg_len{msg_len} { + lname = "server#"; + lname += std::to_string(msgr_sid); + msg_data.append_zero(msg_len); + } + + std::optional<seastar::future<>> ms_dispatch( + crimson::net::ConnectionRef c, MessageRef m) override { + ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); + + // server replies with MOSDOp to generate server-side write workload + const static pg_t pgid; + const static object_locator_t oloc; + const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), + pgid.pool(), oloc.nspace); + static spg_t spgid(pgid); + auto rep = make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0); + bufferlist data(msg_data); + rep->write(0, msg_len, data); + rep->set_tid(m->get_tid()); + std::ignore = c->send(std::move(rep)); + return {seastar::now()}; + } + + seastar::future<> init(bool v1_crc_enabled, const entity_addr_t& addr) { + return seastar::smp::submit_to(msgr_sid, [v1_crc_enabled, addr, this] { + // server msgr is always with nonce 0 + msgr = crimson::net::Messenger::create(entity_name_t::OSD(msgr_sid), lname, 0); + msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); + msgr->set_auth_client(&dummy_auth); + msgr->set_auth_server(&dummy_auth); + if (v1_crc_enabled) { + msgr->set_crc_header(); + msgr->set_crc_data(); + } + return msgr->bind(entity_addrvec_t{addr}).safe_then([this] { + return msgr->start({this}); + }, crimson::net::Messenger::bind_ertr::all_same_way( + [addr] (const std::error_code& e) { + logger().error("Server: " + "there is another instance running at {}", addr); + ceph_abort(); + })); + }); + } + seastar::future<> shutdown() { + logger().info("{} shutdown...", lname); + return seastar::smp::submit_to(msgr_sid, [this] { + ceph_assert(msgr); + msgr->stop(); + return msgr->shutdown(); + }); + } + seastar::future<> wait() { + return seastar::smp::submit_to(msgr_sid, [this] { + ceph_assert(msgr); + return msgr->wait(); + }); + } + + static seastar::future<ServerFRef> create(seastar::shard_id msgr_sid, unsigned msg_len) { + return seastar::smp::submit_to(msgr_sid, [msg_len] { + return seastar::make_foreign(std::make_unique<Server>(msg_len)); + }); + } + }; + + struct Client final + : public crimson::net::Dispatcher, + public seastar::peering_sharded_service<Client> { + + struct ConnStats { + mono_time connecting_time = mono_clock::zero(); + mono_time connected_time = mono_clock::zero(); + unsigned received_count = 0u; + + mono_time start_time = mono_clock::zero(); + unsigned start_count = 0u; + + unsigned sampled_count = 0u; + double total_lat_s = 0.0; + + // for reporting only + mono_time finish_time = mono_clock::zero(); + + void start() { + start_time = mono_clock::now(); + start_count = received_count; + sampled_count = 0u; + total_lat_s = 0.0; + finish_time = mono_clock::zero(); + } + }; + ConnStats conn_stats; + + struct PeriodStats { + mono_time start_time = mono_clock::zero(); + unsigned start_count = 0u; + unsigned sampled_count = 0u; + double total_lat_s = 0.0; + + // for reporting only + mono_time finish_time = mono_clock::zero(); + unsigned finish_count = 0u; + unsigned depth = 0u; + + void reset(unsigned received_count, PeriodStats* snap = nullptr) { + if (snap) { + snap->start_time = start_time; + snap->start_count = start_count; + snap->sampled_count = sampled_count; + snap->total_lat_s = total_lat_s; + snap->finish_time = mono_clock::now(); + snap->finish_count = received_count; + } + start_time = mono_clock::now(); + start_count = received_count; + sampled_count = 0u; + total_lat_s = 0.0; + } + }; + PeriodStats period_stats; + + const seastar::shard_id sid; + std::string lname; + + const unsigned jobs; + crimson::net::MessengerRef msgr; + const unsigned msg_len; + bufferlist msg_data; + const unsigned nr_depth; + seastar::semaphore depth; + std::vector<mono_time> time_msgs_sent; + crimson::auth::DummyAuthClientServer dummy_auth; + + unsigned sent_count = 0u; + crimson::net::ConnectionRef active_conn = nullptr; + + bool stop_send = false; + seastar::promise<> stopped_send_promise; + + Client(unsigned jobs, unsigned msg_len, unsigned depth) + : sid{seastar::this_shard_id()}, + jobs{jobs}, + msg_len{msg_len}, + nr_depth{depth/jobs}, + depth{nr_depth}, + time_msgs_sent{depth/jobs, mono_clock::zero()} { + lname = "client#"; + lname += std::to_string(sid); + msg_data.append_zero(msg_len); + } + + unsigned get_current_depth() const { + ceph_assert(depth.available_units() >= 0); + return nr_depth - depth.current(); + } + + void ms_handle_connect(crimson::net::ConnectionRef conn) override { + conn_stats.connected_time = mono_clock::now(); + } + std::optional<seastar::future<>> ms_dispatch( + crimson::net::ConnectionRef, MessageRef m) override { + // server replies with MOSDOp to generate server-side write workload + ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); + + auto msg_id = m->get_tid(); + if (msg_id % SAMPLE_RATE == 0) { + auto index = msg_id % time_msgs_sent.size(); + ceph_assert(time_msgs_sent[index] != mono_clock::zero()); + std::chrono::duration<double> cur_latency = mono_clock::now() - time_msgs_sent[index]; + conn_stats.total_lat_s += cur_latency.count(); + ++(conn_stats.sampled_count); + period_stats.total_lat_s += cur_latency.count(); + ++(period_stats.sampled_count); + time_msgs_sent[index] = mono_clock::zero(); + } + + ++(conn_stats.received_count); + depth.signal(1); + + return {seastar::now()}; + } + + // should start messenger at this shard? + bool is_active() { + ceph_assert(seastar::this_shard_id() == sid); + return sid != 0 && sid <= jobs; + } + + seastar::future<> init(bool v1_crc_enabled) { + return container().invoke_on_all([v1_crc_enabled] (auto& client) { + if (client.is_active()) { + client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid); + client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); + client.msgr->set_require_authorizer(false); + client.msgr->set_auth_client(&client.dummy_auth); + client.msgr->set_auth_server(&client.dummy_auth); + if (v1_crc_enabled) { + client.msgr->set_crc_header(); + client.msgr->set_crc_data(); + } + return client.msgr->start({&client}); + } + return seastar::now(); + }); + } + + seastar::future<> shutdown() { + return container().invoke_on_all([] (auto& client) { + if (client.is_active()) { + logger().info("{} shutdown...", client.lname); + ceph_assert(client.msgr); + client.msgr->stop(); + return client.msgr->shutdown().then([&client] { + return client.stop_dispatch_messages(); + }); + } + return seastar::now(); + }); + } + + seastar::future<> connect_wait_verify(const entity_addr_t& peer_addr) { + return container().invoke_on_all([peer_addr] (auto& client) { + // start clients in active cores (#1 ~ #jobs) + if (client.is_active()) { + mono_time start_time = mono_clock::now(); + client.active_conn = client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD); + // make sure handshake won't hurt the performance + return seastar::sleep(1s).then([&client, start_time] { + if (client.conn_stats.connected_time == mono_clock::zero()) { + logger().error("\n{} not connected after 1s!\n", client.lname); + ceph_assert(false); + } + client.conn_stats.connecting_time = start_time; + }); + } + return seastar::now(); + }); + } + + private: + class TimerReport { + private: + const unsigned jobs; + const unsigned msgtime; + const unsigned bytes_of_block; + + unsigned elapsed = 0u; + std::vector<mono_time> start_times; + std::vector<PeriodStats> snaps; + std::vector<ConnStats> summaries; + + public: + TimerReport(unsigned jobs, unsigned msgtime, unsigned bs) + : jobs{jobs}, + msgtime{msgtime}, + bytes_of_block{bs}, + start_times{jobs, mono_clock::zero()}, + snaps{jobs}, + summaries{jobs} {} + + unsigned get_elapsed() const { return elapsed; } + + PeriodStats& get_snap_by_job(seastar::shard_id sid) { + ceph_assert(sid >= 1 && sid <= jobs); + return snaps[sid - 1]; + } + + ConnStats& get_summary_by_job(seastar::shard_id sid) { + ceph_assert(sid >= 1 && sid <= jobs); + return summaries[sid - 1]; + } + + bool should_stop() const { + return elapsed >= msgtime; + } + + seastar::future<> ticktock() { + return seastar::sleep(1s).then([this] { + ++elapsed; + }); + } + + void report_header() { + std::ostringstream sout; + sout << std::setfill(' ') + << std::setw(7) << "sec" + << std::setw(6) << "depth" + << std::setw(8) << "IOPS" + << std::setw(8) << "MB/s" + << std::setw(8) << "lat(ms)"; + std::cout << sout.str() << std::endl; + } + + void report_period() { + if (elapsed == 1) { + // init this->start_times at the first period + for (unsigned i=0; i<jobs; ++i) { + start_times[i] = snaps[i].start_time; + } + } + std::chrono::duration<double> elapsed_d = 0s; + unsigned depth = 0u; + unsigned ops = 0u; + unsigned sampled_count = 0u; + double total_lat_s = 0.0; + for (const auto& snap: snaps) { + elapsed_d += (snap.finish_time - snap.start_time); + depth += snap.depth; + ops += (snap.finish_count - snap.start_count); + sampled_count += snap.sampled_count; + total_lat_s += snap.total_lat_s; + } + double elapsed_s = elapsed_d.count() / jobs; + double iops = ops/elapsed_s; + std::ostringstream sout; + sout << setfill(' ') + << std::setw(7) << elapsed_s + << std::setw(6) << depth + << std::setw(8) << iops + << std::setw(8) << iops * bytes_of_block / 1048576 + << std::setw(8) << (total_lat_s / sampled_count * 1000); + std::cout << sout.str() << std::endl; + } + + void report_summary() const { + std::chrono::duration<double> elapsed_d = 0s; + unsigned ops = 0u; + unsigned sampled_count = 0u; + double total_lat_s = 0.0; + for (const auto& summary: summaries) { + elapsed_d += (summary.finish_time - summary.start_time); + ops += (summary.received_count - summary.start_count); + sampled_count += summary.sampled_count; + total_lat_s += summary.total_lat_s; + } + double elapsed_s = elapsed_d.count() / jobs; + double iops = ops / elapsed_s; + std::ostringstream sout; + sout << "--------------" + << " summary " + << "--------------\n" + << setfill(' ') + << std::setw(7) << elapsed_s + << std::setw(6) << "-" + << std::setw(8) << iops + << std::setw(8) << iops * bytes_of_block / 1048576 + << std::setw(8) << (total_lat_s / sampled_count * 1000) + << "\n"; + std::cout << sout.str() << std::endl; + } + }; + + seastar::future<> report_period(TimerReport& report) { + return container().invoke_on_all([&report] (auto& client) { + if (client.is_active()) { + PeriodStats& snap = report.get_snap_by_job(client.sid); + client.period_stats.reset(client.conn_stats.received_count, + &snap); + snap.depth = client.get_current_depth(); + } + }).then([&report] { + report.report_period(); + }); + } + + seastar::future<> report_summary(TimerReport& report) { + return container().invoke_on_all([&report] (auto& client) { + if (client.is_active()) { + ConnStats& summary = report.get_summary_by_job(client.sid); + summary = client.conn_stats; + summary.finish_time = mono_clock::now(); + } + }).then([&report] { + report.report_summary(); + }); + } + + public: + seastar::future<> dispatch_with_timer(unsigned ramptime, unsigned msgtime) { + logger().info("[all clients]: start sending MOSDOps from {} clients", jobs); + return container().invoke_on_all([] (auto& client) { + if (client.is_active()) { + client.do_dispatch_messages(client.active_conn.get()); + } + }).then([this, ramptime] { + logger().info("[all clients]: ramping up {} seconds...", ramptime); + return seastar::sleep(std::chrono::seconds(ramptime)); + }).then([this] { + return container().invoke_on_all([] (auto& client) { + if (client.is_active()) { + client.conn_stats.start(); + client.period_stats.reset(client.conn_stats.received_count); + } + }); + }).then([this, msgtime] { + logger().info("[all clients]: reporting {} seconds...\n", msgtime); + return seastar::do_with( + TimerReport(jobs, msgtime, msg_len), [this] (auto& report) { + report.report_header(); + return seastar::do_until( + [&report] { return report.should_stop(); }, + [&report, this] { + return report.ticktock().then([&report, this] { + // report period every 1s + return report_period(report); + }).then([&report, this] { + // report summary every 10s + if (report.get_elapsed() % 10 == 0) { + return report_summary(report); + } else { + return seastar::now(); + } + }); + } + ).then([&report, this] { + // report the final summary + if (report.get_elapsed() % 10 != 0) { + return report_summary(report); + } else { + return seastar::now(); + } + }); + }); + }); + } + + private: + seastar::future<> send_msg(crimson::net::Connection* conn) { + ceph_assert(seastar::this_shard_id() == sid); + return depth.wait(1).then([this, conn] { + const static pg_t pgid; + const static object_locator_t oloc; + const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), + pgid.pool(), oloc.nspace); + static spg_t spgid(pgid); + auto m = make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0); + bufferlist data(msg_data); + m->write(0, msg_len, data); + // use tid as the identity of each round + m->set_tid(sent_count); + + // sample message latency + if (sent_count % SAMPLE_RATE == 0) { + auto index = sent_count % time_msgs_sent.size(); + ceph_assert(time_msgs_sent[index] == mono_clock::zero()); + time_msgs_sent[index] = mono_clock::now(); + } + + return conn->send(std::move(m)); + }); + } + + class DepthBroken: public std::exception {}; + + seastar::future<> stop_dispatch_messages() { + stop_send = true; + depth.broken(DepthBroken()); + return stopped_send_promise.get_future(); + } + + void do_dispatch_messages(crimson::net::Connection* conn) { + ceph_assert(seastar::this_shard_id() == sid); + ceph_assert(sent_count == 0); + conn_stats.start_time = mono_clock::now(); + // forwarded to stopped_send_promise + (void) seastar::do_until( + [this] { return stop_send; }, + [this, conn] { + sent_count += 1; + return send_msg(conn); + } + ).handle_exception_type([] (const DepthBroken& e) { + // ok, stopped by stop_dispatch_messages() + }).then([this, conn] { + std::chrono::duration<double> dur_conn = conn_stats.connected_time - conn_stats.connecting_time; + std::chrono::duration<double> dur_msg = mono_clock::now() - conn_stats.start_time; + unsigned ops = conn_stats.received_count - conn_stats.start_count; + logger().info("{}: stopped sending OSDOPs.\n" + "{}(depth={}):\n" + " connect time: {}s\n" + " messages received: {}\n" + " messaging time: {}s\n" + " latency: {}ms\n" + " IOPS: {}\n" + " throughput: {}MB/s\n", + *conn, + lname, + nr_depth, + dur_conn.count(), + ops, + dur_msg.count(), + conn_stats.total_lat_s / conn_stats.sampled_count * 1000, + ops / dur_msg.count(), + ops / dur_msg.count() * msg_len / 1048576); + stopped_send_promise.set_value(); + }); + } + }; + }; + + return seastar::when_all( + test_state::Server::create(server_conf.core, server_conf.block_size), + create_sharded<test_state::Client>(client_conf.jobs, client_conf.block_size, client_conf.depth) + ).then([=](auto&& ret) { + auto fp_server = std::move(std::get<0>(ret).get0()); + auto client = std::move(std::get<1>(ret).get0()); + test_state::Server* server = fp_server.get(); + if (mode == perf_mode_t::both) { + logger().info("\nperf settings:\n {}\n {}\n", + client_conf.str(), server_conf.str()); + ceph_assert(seastar::smp::count >= 1+client_conf.jobs); + ceph_assert(client_conf.jobs > 0); + ceph_assert(seastar::smp::count >= 1+server_conf.core); + ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs); + return seastar::when_all_succeed( + server->init(server_conf.v1_crc_enabled, server_conf.addr), + client->init(client_conf.v1_crc_enabled) + ).then_unpack([client, addr = client_conf.server_addr] { + return client->connect_wait_verify(addr); + }).then([client, ramptime = client_conf.ramptime, + msgtime = client_conf.msgtime] { + return client->dispatch_with_timer(ramptime, msgtime); + }).then([client] { + return client->shutdown(); + }).then([server, fp_server = std::move(fp_server)] () mutable { + return server->shutdown().then([cleanup = std::move(fp_server)] {}); + }); + } else if (mode == perf_mode_t::client) { + logger().info("\nperf settings:\n {}\n", client_conf.str()); + ceph_assert(seastar::smp::count >= 1+client_conf.jobs); + ceph_assert(client_conf.jobs > 0); + return client->init(client_conf.v1_crc_enabled + ).then([client, addr = client_conf.server_addr] { + return client->connect_wait_verify(addr); + }).then([client, ramptime = client_conf.ramptime, + msgtime = client_conf.msgtime] { + return client->dispatch_with_timer(ramptime, msgtime); + }).then([client] { + return client->shutdown(); + }); + } else { // mode == perf_mode_t::server + ceph_assert(seastar::smp::count >= 1+server_conf.core); + logger().info("\nperf settings:\n {}\n", server_conf.str()); + return server->init(server_conf.v1_crc_enabled, server_conf.addr + // dispatch ops + ).then([server] { + return server->wait(); + // shutdown + }).then([server, fp_server = std::move(fp_server)] () mutable { + return server->shutdown().then([cleanup = std::move(fp_server)] {}); + }); + } + }); +} + +} + +int main(int argc, char** argv) +{ + seastar::app_template app; + app.add_options() + ("mode", bpo::value<unsigned>()->default_value(0), + "0: both, 1:client, 2:server") + ("addr", bpo::value<std::string>()->default_value("v1:127.0.0.1:9010"), + "server address") + ("ramptime", bpo::value<unsigned>()->default_value(5), + "seconds of client ramp-up time") + ("msgtime", bpo::value<unsigned>()->default_value(15), + "seconds of client messaging time") + ("jobs", bpo::value<unsigned>()->default_value(1), + "number of client jobs (messengers)") + ("cbs", bpo::value<unsigned>()->default_value(4096), + "client block size") + ("depth", bpo::value<unsigned>()->default_value(512), + "client io depth") + ("core", bpo::value<unsigned>()->default_value(0), + "server running core") + ("sbs", bpo::value<unsigned>()->default_value(0), + "server block size") + ("v1-crc-enabled", bpo::value<bool>()->default_value(false), + "enable v1 CRC checks"); + return app.run(argc, argv, [&app] { + auto&& config = app.configuration(); + auto mode = config["mode"].as<unsigned>(); + ceph_assert(mode <= 2); + auto _mode = static_cast<perf_mode_t>(mode); + auto server_conf = server_config::load(config); + auto client_conf = client_config::load(config); + return run(_mode, client_conf, server_conf).then([] { + logger().info("\nsuccessful!\n"); + }).handle_exception([] (auto eptr) { + logger().info("\nfailed!\n"); + return seastar::make_exception_future<>(eptr); + }); + }); +} diff --git a/src/tools/crimson/perf_staged_fltree.cc b/src/tools/crimson/perf_staged_fltree.cc new file mode 100644 index 000000000..14f863508 --- /dev/null +++ b/src/tools/crimson/perf_staged_fltree.cc @@ -0,0 +1,129 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#include <boost/program_options.hpp> + +#include <seastar/core/app-template.hh> +#include <seastar/core/thread.hh> + +#include "crimson/common/log.h" +#include "crimson/os/seastore/onode_manager/staged-fltree/tree_utils.h" +#include "crimson/os/seastore/onode_manager/staged-fltree/node_extent_manager.h" +#include "test/crimson/seastore/transaction_manager_test_state.h" + +using namespace crimson::os::seastore::onode; +namespace bpo = boost::program_options; + +seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_test); +} + +template <bool TRACK> +class PerfTree : public TMTestState { + public: + PerfTree(bool is_dummy) : is_dummy{is_dummy} {} + + seastar::future<> run(KVPool& kvs) { + return tm_setup().then([this, &kvs] { + return seastar::async([this, &kvs] { + auto tree = std::make_unique<TreeBuilder<TRACK>>(kvs, + (is_dummy ? NodeExtentManager::create_dummy(true) + : NodeExtentManager::create_seastore(*tm))); + { + auto t = tm->create_transaction(); + tree->bootstrap(*t).unsafe_get(); + tm->submit_transaction(std::move(t)).unsafe_get(); + } + { + auto t = tm->create_transaction(); + tree->insert(*t).unsafe_get(); + auto start_time = mono_clock::now(); + tm->submit_transaction(std::move(t)).unsafe_get(); + std::chrono::duration<double> duration = mono_clock::now() - start_time; + logger().warn("submit_transaction() done! {}s", duration.count()); + } + { + auto t = tm->create_transaction(); + tree->get_stats(*t).unsafe_get(); + tm->submit_transaction(std::move(t)).unsafe_get(); + } + { + // Note: tm->create_weak_transaction() can also work, but too slow. + auto t = tm->create_transaction(); + tree->validate(*t).unsafe_get(); + } + tree.reset(); + }); + }).then([this] { + return tm_teardown(); + }); + } + + private: + bool is_dummy; +}; + +template <bool TRACK> +seastar::future<> run(const bpo::variables_map& config) { + return seastar::async([&config] { + auto backend = config["backend"].as<std::string>(); + bool is_dummy; + if (backend == "dummy") { + is_dummy = true; + } else if (backend == "seastore") { + is_dummy = false; + } else { + ceph_abort(false && "invalid backend"); + } + auto str_sizes = config["str-sizes"].as<std::vector<size_t>>(); + auto onode_sizes = config["onode-sizes"].as<std::vector<size_t>>(); + auto range2 = config["range2"].as<std::vector<int>>(); + ceph_assert(range2.size() == 2); + auto range1 = config["range1"].as<std::vector<unsigned>>(); + ceph_assert(range1.size() == 2); + auto range0 = config["range0"].as<std::vector<unsigned>>(); + ceph_assert(range0.size() == 2); + + KVPool kvs{str_sizes, onode_sizes, + {range2[0], range2[1]}, + {range1[0], range1[1]}, + {range0[0], range0[1]}}; + PerfTree<TRACK> perf{is_dummy}; + perf.run(kvs).get0(); + }); +} + + +int main(int argc, char** argv) +{ + seastar::app_template app; + app.add_options() + ("backend", bpo::value<std::string>()->default_value("dummy"), + "tree backend: dummy, seastore") + ("tracked", bpo::value<bool>()->default_value(false), + "track inserted cursors") + ("str-sizes", bpo::value<std::vector<size_t>>()->default_value( + {8, 11, 64, 256, 301, 320}), + "sizes of ns/oid strings") + ("onode-sizes", bpo::value<std::vector<size_t>>()->default_value( + {8, 16, 128, 512, 576, 640}), + "sizes of onode") + ("range2", bpo::value<std::vector<int>>()->default_value( + {0, 128}), + "range of shard-pool-crush [a, b)") + ("range1", bpo::value<std::vector<unsigned>>()->default_value( + {0, 10}), + "range of ns-oid strings [a, b)") + ("range0", bpo::value<std::vector<unsigned>>()->default_value( + {0, 4}), + "range of snap-gen [a, b)"); + return app.run(argc, argv, [&app] { + auto&& config = app.configuration(); + auto tracked = config["tracked"].as<bool>(); + if (tracked) { + return run<true>(config); + } else { + return run<false>(config); + } + }); +} |