diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/test/crimson | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/crimson')
-rw-r--r-- | src/test/crimson/CMakeLists.txt | 42 | ||||
-rw-r--r-- | src/test/crimson/perf_crimson_msgr.cc | 357 | ||||
-rw-r--r-- | src/test/crimson/test_alien_echo.cc | 281 | ||||
-rw-r--r-- | src/test/crimson/test_async_echo.cc | 238 | ||||
-rw-r--r-- | src/test/crimson/test_buffer.cc | 49 | ||||
-rw-r--r-- | src/test/crimson/test_config.cc | 110 | ||||
-rw-r--r-- | src/test/crimson/test_denc.cc | 53 | ||||
-rw-r--r-- | src/test/crimson/test_lru.cc | 213 | ||||
-rw-r--r-- | src/test/crimson/test_messenger.cc | 420 | ||||
-rw-r--r-- | src/test/crimson/test_monc.cc | 78 | ||||
-rw-r--r-- | src/test/crimson/test_perfcounters.cc | 62 | ||||
-rw-r--r-- | src/test/crimson/test_thread_pool.cc | 49 |
12 files changed, 1952 insertions, 0 deletions
diff --git a/src/test/crimson/CMakeLists.txt b/src/test/crimson/CMakeLists.txt new file mode 100644 index 00000000..625610c2 --- /dev/null +++ b/src/test/crimson/CMakeLists.txt @@ -0,0 +1,42 @@ +add_executable(unittest_seastar_buffer + test_buffer.cc) +target_link_libraries(unittest_seastar_buffer ceph-common crimson) + +add_executable(unittest_seastar_denc + test_denc.cc) +target_link_libraries(unittest_seastar_denc crimson GTest::Main) + +add_executable(unittest_seastar_messenger test_messenger.cc) +target_link_libraries(unittest_seastar_messenger ceph-common crimson) + +add_executable(perf_crimson_msgr perf_crimson_msgr.cc) +target_link_libraries(perf_crimson_msgr ceph-common crimson) + +add_executable(unittest_seastar_echo + test_alien_echo.cc) +target_link_libraries(unittest_seastar_echo crimson) + +add_executable(unittest_async_echo + test_async_echo.cc) +target_link_libraries(unittest_async_echo ceph-common global) + +add_executable(unittest_seastar_thread_pool + test_thread_pool.cc) +target_link_libraries(unittest_seastar_thread_pool crimson) + +add_executable(unittest_seastar_config + test_config.cc) +target_link_libraries(unittest_seastar_config crimson) + +add_executable(unittest_seastar_monc + test_monc.cc) +target_link_libraries(unittest_seastar_monc crimson) + +add_executable(unittest_seastar_perfcounters + test_perfcounters.cc) +target_link_libraries(unittest_seastar_perfcounters crimson) + +add_executable(unittest_seastar_lru + test_lru.cc) +target_link_libraries(unittest_seastar_lru crimson GTest::Main) + diff --git a/src/test/crimson/perf_crimson_msgr.cc b/src/test/crimson/perf_crimson_msgr.cc new file mode 100644 index 00000000..f1973845 --- /dev/null +++ b/src/test/crimson/perf_crimson_msgr.cc @@ -0,0 +1,357 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <map> +#include <random> +#include <boost/program_options.hpp> + +#include <seastar/core/app-template.hh> +#include <seastar/core/do_with.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/sleep.hh> +#include <seastar/core/semaphore.hh> + +#include "common/ceph_time.h" +#include "messages/MOSDOp.h" +#include "messages/MOSDOpReply.h" + +#include "crimson/common/log.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Dispatcher.h" +#include "crimson/net/Messenger.h" + +namespace bpo = boost::program_options; + +namespace { + +template<typename Message> +using Ref = boost::intrusive_ptr<Message>; + +seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_ms); +} + + +enum class perf_mode_t { + both, + client, + server +}; + +static std::random_device rd; +static std::default_random_engine rng{rd()}; + +static seastar::future<> run(unsigned rounds, + double keepalive_ratio, + int bs, + int depth, + std::string addr, + perf_mode_t mode) +{ + struct test_state { + struct Server final + : public ceph::net::Dispatcher, + public seastar::peering_sharded_service<Server> { + ceph::net::Messenger *msgr = nullptr; + + Dispatcher* get_local_shard() override { + return &(container().local()); + } + seastar::future<> stop() { + return seastar::make_ready_future<>(); + } + seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + MessageRef m) override { + ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); + // reply + Ref<MOSDOp> req = boost::static_pointer_cast<MOSDOp>(m); + req->finish_decode(); + return c->send(MessageRef{ new MOSDOpReply(req.get(), 0, 0, 0, false), false }); + } + + seastar::future<> init(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce, + const entity_addr_t& addr) { + auto&& fut = ceph::net::Messenger::create(name, lname, nonce, 1); + return fut.then([this, addr](ceph::net::Messenger *messenger) { + return container().invoke_on_all([messenger](auto& server) { + server.msgr = messenger->get_local_shard(); + server.msgr->set_crc_header(); + }).then([messenger, addr] { + return messenger->bind(entity_addrvec_t{addr}); + }).then([this, messenger] { + return messenger->start(this); + }); + }); + } + seastar::future<> shutdown() { + ceph_assert(msgr); + return msgr->shutdown(); + } + }; + + struct Client final + : public ceph::net::Dispatcher, + public seastar::peering_sharded_service<Client> { + + struct PingSession : public seastar::enable_shared_from_this<PingSession> { + unsigned count = 0u; + mono_time connected_time; + mono_time finish_time; + }; + using PingSessionRef = seastar::shared_ptr<PingSession>; + + unsigned rounds; + std::bernoulli_distribution keepalive_dist; + ceph::net::Messenger *msgr = nullptr; + std::map<ceph::net::Connection*, seastar::promise<>> pending_conns; + std::map<ceph::net::ConnectionRef, PingSessionRef> sessions; + int msg_len; + bufferlist msg_data; + seastar::semaphore depth; + + Client(unsigned rounds, double keepalive_ratio, int msg_len, int depth) + : rounds(rounds), + keepalive_dist(std::bernoulli_distribution{keepalive_ratio}), + depth(depth) { + bufferptr ptr(msg_len); + memset(ptr.c_str(), 0, msg_len); + msg_data.append(ptr); + } + + PingSessionRef find_session(ceph::net::ConnectionRef c) { + auto found = sessions.find(c); + if (found == sessions.end()) { + ceph_assert(false); + } + return found->second; + } + + Dispatcher* get_local_shard() override { + return &(container().local()); + } + seastar::future<> stop() { + return seastar::now(); + } + seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override { + logger().info("{}: connected to {}", *conn, conn->get_peer_addr()); + auto session = seastar::make_shared<PingSession>(); + auto [i, added] = sessions.emplace(conn, session); + std::ignore = i; + ceph_assert(added); + session->connected_time = mono_clock::now(); + return seastar::now(); + } + seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + MessageRef m) override { + ceph_assert(m->get_type() == CEPH_MSG_OSD_OPREPLY); + depth.signal(1); + auto session = find_session(c); + ++(session->count); + + if (session->count == rounds) { + logger().info("{}: finished receiving {} OPREPLYs", *c.get(), session->count); + session->finish_time = mono_clock::now(); + return container().invoke_on_all([conn = c.get()](auto &client) { + auto found = client.pending_conns.find(conn); + ceph_assert(found != client.pending_conns.end()); + found->second.set_value(); + }); + } else { + return seastar::now(); + } + } + + seastar::future<> init(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce) { + return ceph::net::Messenger::create(name, lname, nonce, 2) + .then([this](ceph::net::Messenger *messenger) { + return container().invoke_on_all([messenger](auto& client) { + client.msgr = messenger->get_local_shard(); + client.msgr->set_crc_header(); + }).then([this, messenger] { + return messenger->start(this); + }); + }); + } + + seastar::future<> shutdown() { + ceph_assert(msgr); + return msgr->shutdown(); + } + + seastar::future<> dispatch_messages(const entity_addr_t& peer_addr, bool foreign_dispatch=true) { + mono_time start_time = mono_clock::now(); + return msgr->connect(peer_addr, entity_name_t::TYPE_OSD) + .then([this, foreign_dispatch, start_time](auto conn) { + return seastar::futurize_apply([this, conn, foreign_dispatch] { + if (foreign_dispatch) { + return do_dispatch_messages(&**conn); + } else { + // NOTE: this could be faster if we don't switch cores in do_dispatch_messages(). + return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) { + return client.do_dispatch_messages(conn); + }); + } + }).finally([this, conn, start_time] { + return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) { + auto session = client.find_session((*conn)->shared_from_this()); + std::chrono::duration<double> dur_handshake = session->connected_time - start_time; + std::chrono::duration<double> dur_messaging = session->finish_time - session->connected_time; + logger().info("{}: handshake {}, messaging {}", + **conn, dur_handshake.count(), dur_messaging.count()); + }); + }); + }); + } + + private: + seastar::future<> send_msg(ceph::net::Connection* conn) { + return depth.wait(1).then([this, conn] { + const static pg_t pgid; + const static object_locator_t oloc; + const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), + pgid.pool(), oloc.nspace); + static spg_t spgid(pgid); + MOSDOp *m = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0); + bufferlist data(msg_data); + m->write(0, msg_len, data); + MessageRef msg = {m, false}; + return conn->send(msg); + }); + } + + seastar::future<> do_dispatch_messages(ceph::net::Connection* conn) { + return container().invoke_on_all([conn](auto& client) { + auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>()); + std::ignore = i; + ceph_assert(added); + }).then([this, conn] { + return seastar::do_with(0u, 0u, + [this, conn](auto &count_ping, auto &count_keepalive) { + return seastar::do_until( + [this, conn, &count_ping, &count_keepalive] { + bool stop = (count_ping == rounds); + if (stop) { + logger().info("{}: finished sending {} OSDOPs with {} keepalives", + *conn, count_ping, count_keepalive); + } + return stop; + }, + [this, conn, &count_ping, &count_keepalive] { + return seastar::repeat([this, conn, &count_ping, &count_keepalive] { + if (keepalive_dist(rng)) { + return conn->keepalive() + .then([&count_keepalive] { + count_keepalive += 1; + return seastar::make_ready_future<seastar::stop_iteration>( + seastar::stop_iteration::no); + }); + } else { + return send_msg(conn) + .then([&count_ping] { + count_ping += 1; + return seastar::make_ready_future<seastar::stop_iteration>( + seastar::stop_iteration::yes); + }); + } + }); + }).then([this, conn] { + auto found = pending_conns.find(conn); + return found->second.get_future(); + }); + }); + }); + } + }; + }; + + return seastar::when_all_succeed( + ceph::net::create_sharded<test_state::Server>(), + ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio, bs, depth)) + .then([rounds, keepalive_ratio, addr, mode](test_state::Server *server, + test_state::Client *client) { + entity_addr_t target_addr; + target_addr.parse(addr.c_str(), nullptr); + target_addr.set_type(entity_addr_t::TYPE_LEGACY); + if (mode == perf_mode_t::both) { + return seastar::when_all_succeed( + server->init(entity_name_t::OSD(0), "server", 0, target_addr), + client->init(entity_name_t::OSD(1), "client", 0)) + // dispatch pingpoing + .then([client, target_addr] { + return client->dispatch_messages(target_addr, false); + // shutdown + }).finally([client] { + logger().info("client shutdown..."); + return client->shutdown(); + }).finally([server] { + logger().info("server shutdown..."); + return server->shutdown(); + }); + } else if (mode == perf_mode_t::client) { + return client->init(entity_name_t::OSD(1), "client", 0) + // dispatch pingpoing + .then([client, target_addr] { + return client->dispatch_messages(target_addr, false); + // shutdown + }).finally([client] { + logger().info("client shutdown..."); + return client->shutdown(); + }); + } else { // mode == perf_mode_t::server + return server->init(entity_name_t::OSD(0), "server", 0, target_addr) + // dispatch pingpoing + .then([server] { + return server->msgr->wait(); + // shutdown + }).finally([server] { + logger().info("server shutdown..."); + return server->shutdown(); + }); + } + }); +} + +} + +int main(int argc, char** argv) +{ + seastar::app_template app; + app.add_options() + ("addr", bpo::value<std::string>()->default_value("0.0.0.0:9010"), + "start server") + ("mode", bpo::value<int>()->default_value(0), + "0: both, 1:client, 2:server") + ("rounds", bpo::value<unsigned>()->default_value(65536), + "number of messaging rounds") + ("keepalive-ratio", bpo::value<double>()->default_value(0), + "ratio of keepalive in ping messages") + ("bs", bpo::value<int>()->default_value(4096), + "block size") + ("depth", bpo::value<int>()->default_value(512), + "io depth"); + return app.run(argc, argv, [&app] { + auto&& config = app.configuration(); + auto rounds = config["rounds"].as<unsigned>(); + auto keepalive_ratio = config["keepalive-ratio"].as<double>(); + auto bs = config["bs"].as<int>(); + auto depth = config["depth"].as<int>(); + auto addr = config["addr"].as<std::string>(); + auto mode = config["mode"].as<int>(); + logger().info("\nsettings:\n addr={}\n mode={}\n rounds={}\n keepalive-ratio={}\n bs={}\n depth={}", + addr, mode, rounds, keepalive_ratio, bs, depth); + ceph_assert(mode >= 0 && mode <= 2); + auto _mode = static_cast<perf_mode_t>(mode); + return run(rounds, keepalive_ratio, bs, depth, addr, _mode) + .then([] { + std::cout << "successful" << std::endl; + }).handle_exception([] (auto eptr) { + std::cout << "failed" << std::endl; + return seastar::make_exception_future<>(eptr); + }); + }); +} diff --git a/src/test/crimson/test_alien_echo.cc b/src/test/crimson/test_alien_echo.cc new file mode 100644 index 00000000..1dbe8133 --- /dev/null +++ b/src/test/crimson/test_alien_echo.cc @@ -0,0 +1,281 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- + +#include "auth/Auth.h" +#include "messages/MPing.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Dispatcher.h" +#include "crimson/net/Messenger.h" +#include "crimson/net/Config.h" +#include "crimson/thread/Condition.h" +#include "crimson/thread/Throttle.h" + +#include <seastar/core/alien.hh> +#include <seastar/core/app-template.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/reactor.hh> + + +enum class echo_role { + as_server, + as_client, +}; + +namespace seastar_pingpong { +struct DummyAuthAuthorizer : public AuthAuthorizer { + DummyAuthAuthorizer() + : AuthAuthorizer(CEPH_AUTH_CEPHX) + {} + bool verify_reply(bufferlist::const_iterator&, + std::string *connection_secret) override { + return true; + } + bool add_challenge(CephContext*, const bufferlist&) override { + return true; + } +}; + +struct Server { + ceph::thread::Throttle byte_throttler; + ceph::net::Messenger& msgr; + struct ServerDispatcher : ceph::net::Dispatcher { + unsigned count = 0; + seastar::condition_variable on_reply; + seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + MessageRef m) override { + std::cout << "server got ping " << *m << std::endl; + // reply with a pong + return c->send(MessageRef{new MPing(), false}).then([this] { + ++count; + on_reply.signal(); + }); + } + seastar::future<ceph::net::msgr_tag_t, bufferlist> + ms_verify_authorizer(peer_type_t peer_type, + auth_proto_t protocol, + bufferlist& auth) override { + return seastar::make_ready_future<ceph::net::msgr_tag_t, bufferlist>( + 0, bufferlist{}); + } + seastar::future<std::unique_ptr<AuthAuthorizer>> + ms_get_authorizer(peer_type_t) override { + return seastar::make_ready_future<std::unique_ptr<AuthAuthorizer>>( + new DummyAuthAuthorizer{}); + } + } dispatcher; + Server(ceph::net::Messenger& msgr) + : byte_throttler(ceph::net::conf.osd_client_message_size_cap), + msgr{msgr} + { + msgr.set_crc_header(); + msgr.set_crc_data(); + } +}; + +struct Client { + ceph::thread::Throttle byte_throttler; + ceph::net::Messenger& msgr; + struct ClientDispatcher : ceph::net::Dispatcher { + unsigned count = 0; + seastar::condition_variable on_reply; + seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + MessageRef m) override { + std::cout << "client got pong " << *m << std::endl; + ++count; + on_reply.signal(); + return seastar::now(); + } + } dispatcher; + Client(ceph::net::Messenger& msgr) + : byte_throttler(ceph::net::conf.osd_client_message_size_cap), + msgr{msgr} + { + msgr.set_crc_header(); + msgr.set_crc_data(); + } +}; +} // namespace seastar_pingpong + +class SeastarContext { + seastar::file_desc begin_fd; + ceph::thread::Condition on_end; + +public: + SeastarContext() + : begin_fd{seastar::file_desc::eventfd(0, 0)} + {} + + template<class Func> + std::thread with_seastar(Func&& func) { + return std::thread{[this, func = std::forward<Func>(func)] { + // alien: are you ready? + wait_for_seastar(); + // alien: could you help me apply(func)? + func(); + // alien: i've sent my request. have you replied it? + // wait_for_seastar(); + // alien: you are free to go! + on_end.notify(); + }}; + } + + void run(seastar::app_template& app, int argc, char** argv) { + app.run(argc, argv, [this] { + return seastar::now().then([this] { + return set_seastar_ready(); + }).then([this] { + // seastar: let me know once i am free to leave. + return on_end.wait(); + }).handle_exception([](auto ep) { + std::cerr << "Error: " << ep << std::endl; + }).finally([] { + seastar::engine().exit(0); + }); + }); + } + + seastar::future<> set_seastar_ready() { + // seastar: i am ready to serve! + ::eventfd_write(begin_fd.get(), 1); + return seastar::now(); + } + +private: + void wait_for_seastar() { + eventfd_t result = 0; + if (int r = ::eventfd_read(begin_fd.get(), &result); r < 0) { + std::cerr << "unable to eventfd_read():" << errno << std::endl; + } + } +}; + +static seastar::future<> +seastar_echo(const entity_addr_t addr, echo_role role, unsigned count) +{ + std::cout << "seastar/"; + if (role == echo_role::as_server) { + return ceph::net::Messenger::create(entity_name_t::OSD(0), "server", + addr.get_nonce(), 0) + .then([addr, count] (auto msgr) { + return seastar::do_with(seastar_pingpong::Server{*msgr}, + [addr, count](auto& server) mutable { + std::cout << "server listening at " << addr << std::endl; + // bind the server + server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD, + &server.byte_throttler); + return server.msgr.bind(entity_addrvec_t{addr}) + .then([&server] { + return server.msgr.start(&server.dispatcher); + }).then([&dispatcher=server.dispatcher, count] { + return dispatcher.on_reply.wait([&dispatcher, count] { + return dispatcher.count >= count; + }); + }).finally([&server] { + std::cout << "server shutting down" << std::endl; + return server.msgr.shutdown(); + }); + }); + }); + } else { + return ceph::net::Messenger::create(entity_name_t::OSD(1), "client", + addr.get_nonce(), 0) + .then([addr, count] (auto msgr) { + return seastar::do_with(seastar_pingpong::Client{*msgr}, + [addr, count](auto& client) { + std::cout << "client sending to " << addr << std::endl; + client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD, + &client.byte_throttler); + return client.msgr.start(&client.dispatcher) + .then([addr, &client] { + return client.msgr.connect(addr, entity_name_t::TYPE_OSD); + }).then([&disp=client.dispatcher, count](ceph::net::ConnectionXRef conn) { + return seastar::do_until( + [&disp,count] { return disp.count >= count; }, + [&disp,conn] { return (*conn)->send(MessageRef{new MPing(), false}) + .then([&] { return disp.on_reply.wait(); }); + }); + }).finally([&client] { + std::cout << "client shutting down" << std::endl; + return client.msgr.shutdown(); + }); + }); + }); + } +} + +int main(int argc, char** argv) +{ + namespace po = boost::program_options; + po::options_description desc{"Allowed options"}; + desc.add_options() + ("help,h", "show help message") + ("role", po::value<std::string>()->default_value("pong"), + "role to play (ping | pong)") + ("port", po::value<uint16_t>()->default_value(9010), + "port #") + ("nonce", po::value<uint32_t>()->default_value(42), + "a unique number to identify the pong server") + ("count", po::value<unsigned>()->default_value(10), + "stop after sending/echoing <count> MPing messages") + ("v2", po::value<bool>()->default_value(false), + "using msgr v2 protocol"); + po::variables_map vm; + std::vector<std::string> unrecognized_options; + try { + auto parsed = po::command_line_parser(argc, argv) + .options(desc) + .allow_unregistered() + .run(); + po::store(parsed, vm); + if (vm.count("help")) { + std::cout << desc << std::endl; + return 0; + } + po::notify(vm); + unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional); + } catch(const po::error& e) { + std::cerr << "error: " << e.what() << std::endl; + return 1; + } + + entity_addr_t addr; + if (vm["v2"].as<bool>()) { + addr.set_type(entity_addr_t::TYPE_MSGR2); + } else { + addr.set_type(entity_addr_t::TYPE_LEGACY); + } + addr.set_family(AF_INET); + addr.set_port(vm["port"].as<std::uint16_t>()); + addr.set_nonce(vm["nonce"].as<std::uint32_t>()); + + echo_role role = echo_role::as_server; + if (vm["role"].as<std::string>() == "ping") { + role = echo_role::as_client; + } + + auto count = vm["count"].as<unsigned>(); + seastar::app_template app; + SeastarContext sc; + auto job = sc.with_seastar([&] { + auto fut = seastar::alien::submit_to(0, [addr, role, count] { + return seastar_echo(addr, role, count); + }); + fut.wait(); + }); + std::vector<char*> av{argv[0]}; + std::transform(begin(unrecognized_options), + end(unrecognized_options), + std::back_inserter(av), + [](auto& s) { + return const_cast<char*>(s.c_str()); + }); + sc.run(app, av.size(), av.data()); + job.join(); +} + +/* + * Local Variables: + * compile-command: "make -j4 \ + * -C ../../../build \ + * unittest_seastar_echo" + * End: + */ diff --git a/src/test/crimson/test_async_echo.cc b/src/test/crimson/test_async_echo.cc new file mode 100644 index 00000000..7a7323f1 --- /dev/null +++ b/src/test/crimson/test_async_echo.cc @@ -0,0 +1,238 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- + +#include <boost/program_options/variables_map.hpp> +#include <boost/program_options/parsers.hpp> + +#include "auth/Auth.h" +#include "global/global_init.h" +#include "messages/MPing.h" +#include "msg/Dispatcher.h" +#include "msg/Messenger.h" + +#include "auth/DummyAuth.h" + +enum class echo_role { + as_server, + as_client, +}; + +namespace native_pingpong { + +constexpr int CEPH_OSD_PROTOCOL = 10; + +struct Server { + Server(CephContext* cct, const entity_inst_t& entity) + : dummy_auth(cct), dispatcher(cct) + { + msgr.reset(Messenger::create(cct, "async", + entity.name, "pong", entity.addr.get_nonce(), 0)); + dummy_auth.auth_registry.refresh_config(); + msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); + msgr->set_default_policy(Messenger::Policy::stateless_server(0)); + msgr->set_auth_client(&dummy_auth); + msgr->set_auth_server(&dummy_auth); + dispatcher.ms_set_require_authorizer(false); + } + DummyAuthClientServer dummy_auth; + unique_ptr<Messenger> msgr; + struct ServerDispatcher : Dispatcher { + std::mutex mutex; + std::condition_variable on_reply; + bool replied = false; + ServerDispatcher(CephContext* cct) + : Dispatcher(cct) + {} + bool ms_can_fast_dispatch_any() const override { + return true; + } + bool ms_can_fast_dispatch(const Message* m) const override { + return m->get_type() == CEPH_MSG_PING; + } + void ms_fast_dispatch(Message* m) override { + m->get_connection()->send_message(new MPing); + m->put(); + { + std::lock_guard lock{mutex}; + replied = true; + } + on_reply.notify_one(); + } + bool ms_dispatch(Message*) override { + ceph_abort(); + } + bool ms_handle_reset(Connection*) override { + return true; + } + void ms_handle_remote_reset(Connection*) override { + } + bool ms_handle_refused(Connection*) override { + return true; + } + void echo() { + replied = false; + std::unique_lock lock{mutex}; + return on_reply.wait(lock, [this] { return replied; }); + } + } dispatcher; + void echo() { + dispatcher.echo(); + } +}; + +struct Client { + unique_ptr<Messenger> msgr; + Client(CephContext *cct) + : dummy_auth(cct), dispatcher(cct) + { + msgr.reset(Messenger::create(cct, "async", + entity_name_t::CLIENT(-1), "ping", + getpid(), 0)); + dummy_auth.auth_registry.refresh_config(); + msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); + msgr->set_default_policy(Messenger::Policy::lossy_client(0)); + msgr->set_auth_client(&dummy_auth); + msgr->set_auth_server(&dummy_auth); + dispatcher.ms_set_require_authorizer(false); + } + DummyAuthClientServer dummy_auth; + struct ClientDispatcher : Dispatcher { + std::mutex mutex; + std::condition_variable on_reply; + bool replied = false; + + ClientDispatcher(CephContext* cct) + : Dispatcher(cct) + {} + bool ms_can_fast_dispatch_any() const override { + return true; + } + bool ms_can_fast_dispatch(const Message* m) const override { + return m->get_type() == CEPH_MSG_PING; + } + void ms_fast_dispatch(Message* m) override { + m->put(); + { + std::lock_guard lock{mutex}; + replied = true; + } + on_reply.notify_one(); + } + bool ms_dispatch(Message*) override { + ceph_abort(); + } + bool ms_handle_reset(Connection *) override { + return true; + } + void ms_handle_remote_reset(Connection*) override { + } + bool ms_handle_refused(Connection*) override { + return true; + } + bool ping(Messenger* msgr, const entity_inst_t& peer) { + auto conn = msgr->connect_to(peer.name.type(), + entity_addrvec_t{peer.addr}); + replied = false; + conn->send_message(new MPing); + std::unique_lock lock{mutex}; + return on_reply.wait_for(lock, 500ms, [&] { + return replied; + }); + } + } dispatcher; + void ping(const entity_inst_t& peer) { + dispatcher.ping(msgr.get(), peer); + } +}; +} // namespace native_pingpong + +static void ceph_echo(CephContext* cct, + entity_addr_t addr, echo_role role, unsigned count) +{ + std::cout << "ceph/"; + entity_inst_t entity{entity_name_t::OSD(0), addr}; + if (role == echo_role::as_server) { + std::cout << "server listening at " << addr << std::endl; + native_pingpong::Server server{cct, entity}; + server.msgr->bind(addr); + server.msgr->add_dispatcher_head(&server.dispatcher); + server.msgr->start(); + for (unsigned i = 0; i < count; i++) { + server.echo(); + } + server.msgr->shutdown(); + server.msgr->wait(); + } else { + std::cout << "client sending to " << addr << std::endl; + native_pingpong::Client client{cct}; + client.msgr->add_dispatcher_head(&client.dispatcher); + client.msgr->start(); + auto conn = client.msgr->connect_to(entity.name.type(), + entity_addrvec_t{entity.addr}); + for (unsigned i = 0; i < count; i++) { + std::cout << "seq=" << i << std::endl; + client.ping(entity); + } + client.msgr->shutdown(); + client.msgr->wait(); + } +} + +int main(int argc, char** argv) +{ + namespace po = boost::program_options; + po::options_description desc{"Allowed options"}; + desc.add_options() + ("help,h", "show help message") + ("role", po::value<std::string>()->default_value("pong"), + "role to play (ping | pong)") + ("port", po::value<uint16_t>()->default_value(9010), + "port #") + ("nonce", po::value<uint32_t>()->default_value(42), + "a unique number to identify the pong server") + ("count", po::value<unsigned>()->default_value(10), + "stop after sending/echoing <count> MPing messages") + ("v2", po::value<bool>()->default_value(false), + "using msgr v2 protocol"); + po::variables_map vm; + std::vector<std::string> unrecognized_options; + try { + auto parsed = po::command_line_parser(argc, argv) + .options(desc) + .allow_unregistered() + .run(); + po::store(parsed, vm); + if (vm.count("help")) { + std::cout << desc << std::endl; + return 0; + } + po::notify(vm); + unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional); + } catch(const po::error& e) { + std::cerr << "error: " << e.what() << std::endl; + return 1; + } + + entity_addr_t addr; + if (vm["v2"].as<bool>()) { + addr.set_type(entity_addr_t::TYPE_MSGR2); + } else { + addr.set_type(entity_addr_t::TYPE_LEGACY); + } + addr.set_family(AF_INET); + addr.set_port(vm["port"].as<std::uint16_t>()); + addr.set_nonce(vm["nonce"].as<std::uint32_t>()); + + echo_role role = echo_role::as_server; + if (vm["role"].as<std::string>() == "ping") { + role = echo_role::as_client; + } + + auto count = vm["count"].as<unsigned>(); + std::vector<const char*> args(argv, argv + argc); + auto cct = global_init(nullptr, args, + CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_MON_CONFIG); + common_init_finish(cct.get()); + ceph_echo(cct.get(), addr, role, count); +} diff --git a/src/test/crimson/test_buffer.cc b/src/test/crimson/test_buffer.cc new file mode 100644 index 00000000..1093b688 --- /dev/null +++ b/src/test/crimson/test_buffer.cc @@ -0,0 +1,49 @@ +#include "include/buffer.h" +#include <seastar/core/app-template.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/reactor.hh> + +// allocate a foreign buffer on each cpu, collect them all into a bufferlist, +// and destruct it on this cpu +seastar::future<> test_foreign_bufferlist() +{ + auto make_foreign_buffer = [] (unsigned cpu) { + return seastar::smp::submit_to(cpu, [=] { + bufferlist bl; + seastar::temporary_buffer<char> buf("abcd", 4); + bl.append(buffer::create(std::move(buf))); + return bl; + }); + }; + auto reduce = [] (bufferlist&& lhs, bufferlist&& rhs) { + bufferlist bl; + bl.claim_append(lhs); + bl.claim_append(rhs); + return bl; + }; + return seastar::map_reduce(seastar::smp::all_cpus(), make_foreign_buffer, + bufferlist(), reduce).then( + [] (bufferlist&& bl) { + if (bl.length() != 4 * seastar::smp::count) { + auto e = std::make_exception_ptr(std::runtime_error("wrong buffer size")); + return seastar::make_exception_future<>(e); + } + bl.clear(); + return seastar::make_ready_future<>(); + }); +} + +int main(int argc, char** argv) +{ + seastar::app_template app; + return app.run(argc, argv, [] { + return seastar::now().then( + &test_foreign_bufferlist + ).then([] { + std::cout << "All tests succeeded" << std::endl; + }).handle_exception([] (auto eptr) { + std::cout << "Test failure" << std::endl; + return seastar::make_exception_future<>(eptr); + }); + }); +} diff --git a/src/test/crimson/test_config.cc b/src/test/crimson/test_config.cc new file mode 100644 index 00000000..35b258f6 --- /dev/null +++ b/src/test/crimson/test_config.cc @@ -0,0 +1,110 @@ +#include <chrono> +#include <string> +#include <numeric> +#include <seastar/core/app-template.hh> +#include <seastar/core/sharded.hh> +#include "common/ceph_argparse.h" +#include "common/config_obs.h" +#include "crimson/common/config_proxy.h" + +using Config = ceph::common::ConfigProxy; +const std::string test_uint_option = "osd_max_pgls"; +const uint64_t INVALID_VALUE = (uint64_t)(-1); + +class ConfigObs : public ceph::md_config_obs_impl<Config> { + uint64_t last_change = INVALID_VALUE; + uint64_t num_changes = 0; + + const char** get_tracked_conf_keys() const override { + static const char* keys[] = { + test_uint_option.c_str(), + nullptr, + }; + return keys; + } + void handle_conf_change(const Config& conf, + const std::set <std::string> &changes) override{ + if (changes.count(test_uint_option)) { + last_change = conf.get_val<uint64_t>(test_uint_option); + num_changes += 1; + } + } +public: + ConfigObs() { + ceph::common::local_conf().add_observer(this); + } + + uint64_t get_last_change() const { return last_change; } + uint64_t get_num_changes() const { return num_changes; } + seastar::future<> stop() { + ceph::common::local_conf().remove_observer(this); + return seastar::now(); + } +}; + +seastar::sharded<ConfigObs> sharded_cobs; + +static seastar::future<> test_config() +{ + return ceph::common::sharded_conf().start(EntityName{}, string_view{"ceph"}).then([] { + std::vector<const char*> args; + std::string cluster; + std::string conf_file_list; + auto init_params = ceph_argparse_early_args(args, + CEPH_ENTITY_TYPE_CLIENT, + &cluster, + &conf_file_list); + auto& conf = ceph::common::local_conf(); + conf->name = init_params.name; + conf->cluster = cluster; + return conf.parse_config_files(conf_file_list); + }).then([] { + return ceph::common::sharded_conf().invoke_on(0, &Config::start); + }).then([] { + return sharded_cobs.start(); + }).then([] { + return ceph::common::sharded_conf().invoke_on_all([](Config& config) { + return config.set_val(test_uint_option, + std::to_string(seastar::engine().cpu_id())); + }); + }).then([] { + auto expected = ceph::common::local_conf().get_val<uint64_t>(test_uint_option); + return ceph::common::sharded_conf().invoke_on_all([expected](Config& config) { + if (expected != config.get_val<uint64_t>(test_uint_option)) { + throw std::runtime_error("configurations don't match"); + } + if (expected != sharded_cobs.local().get_last_change()) { + throw std::runtime_error("last applied changes don't match the latest config"); + } + if (seastar::smp::count != sharded_cobs.local().get_num_changes()) { + throw std::runtime_error("num changes don't match actual changes"); + } + }); + }).finally([] { + return sharded_cobs.stop(); + }).finally([] { + return ceph::common::sharded_conf().stop(); + }); +} + +int main(int argc, char** argv) +{ + seastar::app_template app; + return app.run(argc, argv, [&] { + return test_config().then([] { + std::cout << "All tests succeeded" << std::endl; + }).handle_exception([] (auto eptr) { + std::cout << "Test failure" << std::endl; + return seastar::make_exception_future<>(eptr); + }); + }); +} + + +/* + * Local Variables: + * compile-command: "make -j4 \ + * -C ../../../build \ + * unittest_seastar_config" + * End: + */ diff --git a/src/test/crimson/test_denc.cc b/src/test/crimson/test_denc.cc new file mode 100644 index 00000000..10ebd6dc --- /dev/null +++ b/src/test/crimson/test_denc.cc @@ -0,0 +1,53 @@ +#include <string> +#include <seastar/core/temporary_buffer.hh> +#include <gtest/gtest.h> +#include "include/denc.h" +#include "common/buffer_seastar.h" + +using temporary_buffer = seastar::temporary_buffer<char>; +using buffer_iterator = seastar_buffer_iterator; +using const_buffer_iterator = const_seastar_buffer_iterator; + +template<typename T> +void test_denc(T v) { + // estimate + size_t s = 0; + denc(v, s); + ASSERT_NE(s, 0u); + + // encode + temporary_buffer buf{s}; + buffer_iterator enc{buf}; + denc(v, enc); + size_t len = enc.get() - buf.begin(); + ASSERT_LE(len, s); + + // decode + T out; + temporary_buffer encoded = buf.share(); + encoded.trim(len); + const_buffer_iterator dec{encoded}; + denc(out, dec); + ASSERT_EQ(v, out); + ASSERT_EQ(dec.get(), enc.get()); +} + +TEST(denc, simple) +{ + test_denc((uint8_t)4); + test_denc((int8_t)-5); + test_denc((uint16_t)6); + test_denc((int16_t)-7); + test_denc((uint32_t)8); + test_denc((int32_t)-9); + test_denc((uint64_t)10); + test_denc((int64_t)-11); +} + +TEST(denc, string) +{ + std::string a, b("hi"), c("multi\nline\n"); + test_denc(a); + test_denc(b); + test_denc(c); +} diff --git a/src/test/crimson/test_lru.cc b/src/test/crimson/test_lru.cc new file mode 100644 index 00000000..40ab4153 --- /dev/null +++ b/src/test/crimson/test_lru.cc @@ -0,0 +1,213 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com> + * + * Author: Loic Dachary <loic@dachary.org> + * Cheng Cheng <ccheng.leo@gmail.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Library Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Library Public License for more details. + * + */ + +#include <stdio.h> +#include "gtest/gtest.h" +#include "crimson/common/shared_lru.h" + +class LRUTest : public SharedLRU<unsigned int, int> { +public: + auto add(unsigned int key, int value, bool* existed = nullptr) { + auto pv = new int{value}; + auto ptr = insert(key, std::unique_ptr<int>{pv}); + if (existed) { + *existed = (ptr.get() != pv); + } + return ptr; + } +}; + +TEST(LRU, add) { + LRUTest cache; + unsigned int key = 1; + int value1 = 2; + bool existed = false; + { + auto ptr = cache.add(key, value1, &existed); + ASSERT_TRUE(ptr); + ASSERT_TRUE(ptr.get()); + ASSERT_EQ(value1, *ptr); + ASSERT_FALSE(existed); + } + { + auto ptr = cache.add(key, 3, &existed); + ASSERT_EQ(value1, *ptr); + ASSERT_TRUE(existed); + } +} + +TEST(LRU, empty) { + LRUTest cache; + unsigned int key = 1; + bool existed = false; + + ASSERT_TRUE(cache.empty()); + { + int value1 = 2; + auto ptr = cache.add(key, value1, &existed); + ASSERT_EQ(value1, *ptr); + ASSERT_FALSE(existed); + } + ASSERT_FALSE(cache.empty()); + + cache.clear(); + ASSERT_TRUE(cache.empty()); +} + +TEST(LRU, lookup) { + LRUTest cache; + unsigned int key = 1; + { + int value = 2; + auto ptr = cache.add(key, value); + ASSERT_TRUE(ptr); + ASSERT_TRUE(ptr.get()); + ASSERT_TRUE(cache.find(key).get()); + ASSERT_EQ(value, *cache.find(key)); + } + ASSERT_TRUE(cache.find(key).get()); +} + +TEST(LRU, lookup_or_create) { + LRUTest cache; + { + int value = 2; + unsigned int key = 1; + ASSERT_TRUE(cache.add(key, value).get()); + ASSERT_TRUE(cache[key].get()); + ASSERT_EQ(value, *cache.find(key)); + } + { + unsigned int key = 2; + ASSERT_TRUE(cache[key].get()); + ASSERT_EQ(0, *cache.find(key)); + } + ASSERT_TRUE(cache.find(1).get()); + ASSERT_TRUE(cache.find(2).get()); +} + +TEST(LRU, lower_bound) { + LRUTest cache; + + { + unsigned int key = 1; + ASSERT_FALSE(cache.lower_bound(key)); + int value = 2; + + ASSERT_TRUE(cache.add(key, value).get()); + ASSERT_TRUE(cache.lower_bound(key).get()); + EXPECT_EQ(value, *cache.lower_bound(key)); + } +} + +TEST(LRU, get_next) { + + { + LRUTest cache; + const unsigned int key = 0; + EXPECT_FALSE(cache.upper_bound(key)); + } + { + LRUTest cache; + const unsigned int key1 = 111; + auto ptr1 = cache[key1]; + const unsigned int key2 = 222; + auto ptr2 = cache[key2]; + + auto i = cache.upper_bound(0); + ASSERT_TRUE(i); + EXPECT_EQ(i->first, key1); + auto j = cache.upper_bound(i->first); + ASSERT_TRUE(j); + EXPECT_EQ(j->first, key2); + } +} + +TEST(LRU, clear) { + LRUTest cache; + unsigned int key = 1; + int value = 2; + cache.add(key, value); + { + auto found = cache.find(key); + ASSERT_TRUE(found); + ASSERT_EQ(value, *found); + } + ASSERT_TRUE(cache.find(key).get()); + cache.clear(); + ASSERT_FALSE(cache.find(key)); + ASSERT_TRUE(cache.empty()); +} + +TEST(LRU, eviction) { + LRUTest cache{5}; + bool existed; + // add a bunch of elements, some of them will be evicted + for (size_t i = 0; i < 2 * cache.capacity(); ++i) { + cache.add(i, i, &existed); + ASSERT_FALSE(existed); + } + size_t i = 0; + for (; i < cache.capacity(); ++i) { + ASSERT_FALSE(cache.find(i)); + } + for (; i < 2 * cache.capacity(); ++i) { + ASSERT_TRUE(cache.find(i)); + } +} + +TEST(LRU, track_weak) { + constexpr int SIZE = 5; + LRUTest cache{SIZE}; + + bool existed = false; + // strong reference to keep 0 alive + auto ptr = cache.add(0, 0, &existed); + ASSERT_FALSE(existed); + + // add a bunch of elements to get 0 evicted + for (size_t i = 1; i < 2 * cache.capacity(); ++i) { + cache.add(i, i, &existed); + ASSERT_FALSE(existed); + } + // 0 is still reachable via the cache + ASSERT_TRUE(cache.find(0)); + ASSERT_TRUE(cache.find(0).get()); + ASSERT_EQ(0, *cache.find(0)); + + // [0..SIZE) are evicted when adding [SIZE..2*SIZE) + // [SIZE..SIZE * 2) were still in the cache before accessing 0, + // but SIZE got evicted when accessing 0 + ASSERT_FALSE(cache.find(SIZE-1)); + ASSERT_FALSE(cache.find(SIZE)); + ASSERT_TRUE(cache.find(SIZE+1)); + ASSERT_TRUE(cache.find(SIZE+1).get()); + ASSERT_EQ((int)SIZE+1, *cache.find(SIZE+1)); + + ptr.reset(); + // 0 is still reachable, as it is now put back into LRU cache + ASSERT_TRUE(cache.find(0)); +} + +// Local Variables: +// compile-command: "cmake --build ../../../build -j 8 --target unittest_seastar_lru && ctest -R unittest_seastar_lru # --gtest_filter=*.* --log-to-stderr=true" +// End: diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc new file mode 100644 index 00000000..6ec23f6d --- /dev/null +++ b/src/test/crimson/test_messenger.cc @@ -0,0 +1,420 @@ +#include "common/ceph_time.h" +#include "messages/MPing.h" +#include "crimson/common/log.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Dispatcher.h" +#include "crimson/net/Messenger.h" + +#include <map> +#include <random> +#include <boost/program_options.hpp> +#include <seastar/core/app-template.hh> +#include <seastar/core/do_with.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/sleep.hh> + +namespace bpo = boost::program_options; + +namespace { + +seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_ms); +} + +static std::random_device rd; +static std::default_random_engine rng{rd()}; +static bool verbose = false; + +static seastar::future<> test_echo(unsigned rounds, + double keepalive_ratio) +{ + struct test_state { + struct Server final + : public ceph::net::Dispatcher, + public seastar::peering_sharded_service<Server> { + ceph::net::Messenger *msgr = nullptr; + MessageRef msg_pong{new MPing(), false}; + + Dispatcher* get_local_shard() override { + return &(container().local()); + } + seastar::future<> stop() { + return seastar::make_ready_future<>(); + } + seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + MessageRef m) override { + if (verbose) { + logger().info("server got {}", *m); + } + // reply with a pong + return c->send(msg_pong); + } + + seastar::future<> init(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce, + const entity_addr_t& addr) { + auto&& fut = ceph::net::Messenger::create(name, lname, nonce); + return fut.then([this, addr](ceph::net::Messenger *messenger) { + return container().invoke_on_all([messenger](auto& server) { + server.msgr = messenger->get_local_shard(); + }).then([messenger, addr] { + return messenger->bind(entity_addrvec_t{addr}); + }).then([this, messenger] { + return messenger->start(this); + }); + }); + } + seastar::future<> shutdown() { + ceph_assert(msgr); + return msgr->shutdown(); + } + }; + + struct Client final + : public ceph::net::Dispatcher, + public seastar::peering_sharded_service<Client> { + + struct PingSession : public seastar::enable_shared_from_this<PingSession> { + unsigned count = 0u; + mono_time connected_time; + mono_time finish_time; + }; + using PingSessionRef = seastar::shared_ptr<PingSession>; + + unsigned rounds; + std::bernoulli_distribution keepalive_dist; + ceph::net::Messenger *msgr = nullptr; + std::map<ceph::net::Connection*, seastar::promise<>> pending_conns; + std::map<ceph::net::ConnectionRef, PingSessionRef> sessions; + MessageRef msg_ping{new MPing(), false}; + + Client(unsigned rounds, double keepalive_ratio) + : rounds(rounds), + keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {} + + PingSessionRef find_session(ceph::net::ConnectionRef c) { + auto found = sessions.find(c); + if (found == sessions.end()) { + ceph_assert(false); + } + return found->second; + } + + Dispatcher* get_local_shard() override { + return &(container().local()); + } + seastar::future<> stop() { + return seastar::now(); + } + seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override { + logger().info("{}: connected to {}", *conn, conn->get_peer_addr()); + auto session = seastar::make_shared<PingSession>(); + auto [i, added] = sessions.emplace(conn, session); + std::ignore = i; + ceph_assert(added); + session->connected_time = mono_clock::now(); + return seastar::now(); + } + seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + MessageRef m) override { + auto session = find_session(c); + ++(session->count); + if (verbose) { + logger().info("client ms_dispatch {}", session->count); + } + + if (session->count == rounds) { + logger().info("{}: finished receiving {} pongs", *c.get(), session->count); + session->finish_time = mono_clock::now(); + return container().invoke_on_all([conn = c.get()](auto &client) { + auto found = client.pending_conns.find(conn); + ceph_assert(found != client.pending_conns.end()); + found->second.set_value(); + }); + } else { + return seastar::now(); + } + } + + seastar::future<> init(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce) { + return ceph::net::Messenger::create(name, lname, nonce) + .then([this](ceph::net::Messenger *messenger) { + return container().invoke_on_all([messenger](auto& client) { + client.msgr = messenger->get_local_shard(); + }).then([this, messenger] { + return messenger->start(this); + }); + }); + } + + seastar::future<> shutdown() { + ceph_assert(msgr); + return msgr->shutdown(); + } + + seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr, bool foreign_dispatch=true) { + mono_time start_time = mono_clock::now(); + return msgr->connect(peer_addr, entity_name_t::TYPE_OSD) + .then([this, foreign_dispatch, start_time](auto conn) { + return seastar::futurize_apply([this, conn, foreign_dispatch] { + if (foreign_dispatch) { + return do_dispatch_pingpong(&**conn); + } else { + // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong(). + return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) { + return client.do_dispatch_pingpong(conn); + }); + } + }).finally([this, conn, start_time] { + return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) { + auto session = client.find_session((*conn)->shared_from_this()); + std::chrono::duration<double> dur_handshake = session->connected_time - start_time; + std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time; + logger().info("{}: handshake {}, pingpong {}", + **conn, dur_handshake.count(), dur_pingpong.count()); + }); + }); + }); + } + + private: + seastar::future<> do_dispatch_pingpong(ceph::net::Connection* conn) { + return container().invoke_on_all([conn](auto& client) { + auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>()); + std::ignore = i; + ceph_assert(added); + }).then([this, conn] { + return seastar::do_with(0u, 0u, + [this, conn](auto &count_ping, auto &count_keepalive) { + return seastar::do_until( + [this, conn, &count_ping, &count_keepalive] { + bool stop = (count_ping == rounds); + if (stop) { + logger().info("{}: finished sending {} pings with {} keepalives", + *conn, count_ping, count_keepalive); + } + return stop; + }, + [this, conn, &count_ping, &count_keepalive] { + return seastar::repeat([this, conn, &count_ping, &count_keepalive] { + if (keepalive_dist(rng)) { + return conn->keepalive() + .then([&count_keepalive] { + count_keepalive += 1; + return seastar::make_ready_future<seastar::stop_iteration>( + seastar::stop_iteration::no); + }); + } else { + return conn->send(msg_ping) + .then([&count_ping] { + count_ping += 1; + return seastar::make_ready_future<seastar::stop_iteration>( + seastar::stop_iteration::yes); + }); + } + }); + }).then([this, conn] { + auto found = pending_conns.find(conn); + return found->second.get_future(); + }); + }); + }); + } + }; + }; + + logger().info("test_echo():"); + return seastar::when_all_succeed( + ceph::net::create_sharded<test_state::Server>(), + ceph::net::create_sharded<test_state::Server>(), + ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio), + ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio)) + .then([rounds, keepalive_ratio](test_state::Server *server1, + test_state::Server *server2, + test_state::Client *client1, + test_state::Client *client2) { + // start servers and clients + entity_addr_t addr1; + addr1.parse("127.0.0.1:9010", nullptr); + addr1.set_type(entity_addr_t::TYPE_LEGACY); + entity_addr_t addr2; + addr2.parse("127.0.0.1:9011", nullptr); + addr2.set_type(entity_addr_t::TYPE_LEGACY); + return seastar::when_all_succeed( + server1->init(entity_name_t::OSD(0), "server1", 1, addr1), + server2->init(entity_name_t::OSD(1), "server2", 2, addr2), + client1->init(entity_name_t::OSD(2), "client1", 3), + client2->init(entity_name_t::OSD(3), "client2", 4)) + // dispatch pingpoing + .then([client1, client2, server1, server2] { + return seastar::when_all_succeed( + // test connecting in parallel, accepting in parallel, + // and operating the connection reference from a foreign/local core + client1->dispatch_pingpong(server1->msgr->get_myaddr(), true), + client1->dispatch_pingpong(server2->msgr->get_myaddr(), false), + client2->dispatch_pingpong(server1->msgr->get_myaddr(), false), + client2->dispatch_pingpong(server2->msgr->get_myaddr(), true)); + // shutdown + }).finally([client1] { + logger().info("client1 shutdown..."); + return client1->shutdown(); + }).finally([client2] { + logger().info("client2 shutdown..."); + return client2->shutdown(); + }).finally([server1] { + logger().info("server1 shutdown..."); + return server1->shutdown(); + }).finally([server2] { + logger().info("server2 shutdown..."); + return server2->shutdown(); + }); + }); +} + +static seastar::future<> test_concurrent_dispatch() +{ + struct test_state { + struct Server final + : public ceph::net::Dispatcher, + public seastar::peering_sharded_service<Server> { + ceph::net::Messenger *msgr = nullptr; + int count = 0; + seastar::promise<> on_second; // satisfied on second dispatch + seastar::promise<> on_done; // satisfied when first dispatch unblocks + + seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + MessageRef m) override { + switch (++count) { + case 1: + // block on the first request until we reenter with the second + return on_second.get_future() + .then([this] { + return container().invoke_on_all([](Server& server) { + server.on_done.set_value(); + }); + }); + case 2: + on_second.set_value(); + return seastar::now(); + default: + throw std::runtime_error("unexpected count"); + } + } + + seastar::future<> wait() { return on_done.get_future(); } + + seastar::future<> init(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce, + const entity_addr_t& addr) { + return ceph::net::Messenger::create(name, lname, nonce) + .then([this, addr](ceph::net::Messenger *messenger) { + return container().invoke_on_all([messenger](auto& server) { + server.msgr = messenger->get_local_shard(); + }).then([messenger, addr] { + return messenger->bind(entity_addrvec_t{addr}); + }).then([this, messenger] { + return messenger->start(this); + }); + }); + } + + Dispatcher* get_local_shard() override { + return &(container().local()); + } + seastar::future<> stop() { + return seastar::make_ready_future<>(); + } + }; + + struct Client final + : public ceph::net::Dispatcher, + public seastar::peering_sharded_service<Client> { + ceph::net::Messenger *msgr = nullptr; + + seastar::future<> init(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce) { + return ceph::net::Messenger::create(name, lname, nonce) + .then([this](ceph::net::Messenger *messenger) { + return container().invoke_on_all([messenger](auto& client) { + client.msgr = messenger->get_local_shard(); + }).then([this, messenger] { + return messenger->start(this); + }); + }); + } + + Dispatcher* get_local_shard() override { + return &(container().local()); + } + seastar::future<> stop() { + return seastar::make_ready_future<>(); + } + }; + }; + + logger().info("test_concurrent_dispatch():"); + return seastar::when_all_succeed( + ceph::net::create_sharded<test_state::Server>(), + ceph::net::create_sharded<test_state::Client>()) + .then([](test_state::Server *server, + test_state::Client *client) { + entity_addr_t addr; + addr.parse("127.0.0.1:9010", nullptr); + addr.set_type(entity_addr_t::TYPE_LEGACY); + addr.set_family(AF_INET); + return seastar::when_all_succeed( + server->init(entity_name_t::OSD(4), "server3", 5, addr), + client->init(entity_name_t::OSD(5), "client3", 6)) + .then([server, client] { + return client->msgr->connect(server->msgr->get_myaddr(), + entity_name_t::TYPE_OSD); + }).then([](ceph::net::ConnectionXRef conn) { + // send two messages + (*conn)->send(MessageRef{new MPing, false}); + (*conn)->send(MessageRef{new MPing, false}); + }).then([server] { + server->wait(); + }).finally([client] { + logger().info("client shutdown..."); + return client->msgr->shutdown(); + }).finally([server] { + logger().info("server shutdown..."); + return server->msgr->shutdown(); + }); + }); +} + +} + +int main(int argc, char** argv) +{ + seastar::app_template app; + app.add_options() + ("verbose,v", bpo::value<bool>()->default_value(false), + "chatty if true") + ("rounds", bpo::value<unsigned>()->default_value(512), + "number of pingpong rounds") + ("keepalive-ratio", bpo::value<double>()->default_value(0.1), + "ratio of keepalive in ping messages"); + return app.run(argc, argv, [&app] { + auto&& config = app.configuration(); + verbose = config["verbose"].as<bool>(); + auto rounds = config["rounds"].as<unsigned>(); + auto keepalive_ratio = config["keepalive-ratio"].as<double>(); + return test_echo(rounds, keepalive_ratio) + .then([] { + return test_concurrent_dispatch(); + }).then([] { + std::cout << "All tests succeeded" << std::endl; + }).handle_exception([] (auto eptr) { + std::cout << "Test failure" << std::endl; + return seastar::make_exception_future<>(eptr); + }); + }); +} diff --git a/src/test/crimson/test_monc.cc b/src/test/crimson/test_monc.cc new file mode 100644 index 00000000..671aa644 --- /dev/null +++ b/src/test/crimson/test_monc.cc @@ -0,0 +1,78 @@ +#include <seastar/core/app-template.hh> +#include "common/ceph_argparse.h" +#include "crimson/common/config_proxy.h" +#include "crimson/mon/MonClient.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Messenger.h" + +using Config = ceph::common::ConfigProxy; +using MonClient = ceph::mon::Client; + +static seastar::future<> test_monc() +{ + return ceph::common::sharded_conf().start(EntityName{}, string_view{"ceph"}).then([] { + std::vector<const char*> args; + std::string cluster; + std::string conf_file_list; + auto init_params = ceph_argparse_early_args(args, + CEPH_ENTITY_TYPE_CLIENT, + &cluster, + &conf_file_list); + auto& conf = ceph::common::local_conf(); + conf->name = init_params.name; + conf->cluster = cluster; + return conf.parse_config_files(conf_file_list); + }).then([] { + return ceph::common::sharded_perf_coll().start(); + }).then([] { + return ceph::net::Messenger::create(entity_name_t::OSD(0), "monc", 0, + seastar::engine().cpu_id()) + .then([] (ceph::net::Messenger *msgr) { + auto& conf = ceph::common::local_conf(); + if (conf->ms_crc_data) { + msgr->set_crc_data(); + } + if (conf->ms_crc_header) { + msgr->set_crc_header(); + } + return seastar::do_with(MonClient{*msgr}, + [msgr](auto& monc) { + return msgr->start(&monc).then([&monc] { + return seastar::with_timeout( + seastar::lowres_clock::now() + std::chrono::seconds{10}, + monc.start()); + }).then([&monc] { + return monc.stop(); + }); + }).finally([msgr] { + return msgr->shutdown(); + }); + }); + }).finally([] { + return ceph::common::sharded_perf_coll().stop().then([] { + return ceph::common::sharded_conf().stop(); + }); + }); +} + +int main(int argc, char** argv) +{ + seastar::app_template app; + return app.run(argc, argv, [&] { + return test_monc().then([] { + std::cout << "All tests succeeded" << std::endl; + }).handle_exception([] (auto eptr) { + std::cout << "Test failure" << std::endl; + return seastar::make_exception_future<>(eptr); + }); + }); +} + + +/* + * Local Variables: + * compile-command: "make -j4 \ + * -C ../../../build \ + * unittest_seastar_monc" + * End: + */ diff --git a/src/test/crimson/test_perfcounters.cc b/src/test/crimson/test_perfcounters.cc new file mode 100644 index 00000000..d5c8bb8b --- /dev/null +++ b/src/test/crimson/test_perfcounters.cc @@ -0,0 +1,62 @@ +#include <pthread.h> +#include <stdlib.h> +#include <iostream> +#include <fmt/format.h> + +#include "common/Formatter.h" +#include "common/perf_counters.h" +#include "crimson/common/perf_counters_collection.h" + +#include <seastar/core/app-template.hh> +#include <seastar/core/sharded.hh> + +enum { + PERFTEST_FIRST = 1000000, + PERFTEST_INDEX, + PERFTEST_LAST, +}; + +static constexpr uint64_t PERF_VAL = 42; + +static seastar::future<> test_perfcounters(){ + return ceph::common::sharded_perf_coll().start().then([] { + return ceph::common::sharded_perf_coll().invoke_on_all([] (auto& s){ + std::string name =fmt::format("seastar-osd::shard-{}",seastar::engine().cpu_id()); + PerfCountersBuilder plb(NULL, name, PERFTEST_FIRST,PERFTEST_LAST); + plb.add_u64_counter(PERFTEST_INDEX, "perftest_count", "count perftest"); + auto perf_logger = plb.create_perf_counters(); + perf_logger->inc(PERFTEST_INDEX,PERF_VAL); + s.get_perf_collection()->add(perf_logger); + }); + }).then([]{ + return ceph::common::sharded_perf_coll().invoke_on_all([] (auto& s){ + auto pcc = s.get_perf_collection(); + pcc->with_counters([](auto& by_path){ + for (auto& perf_counter : by_path) { + if (PERF_VAL != perf_counter.second.perf_counters->get(PERFTEST_INDEX)) { + throw std::runtime_error("perf counter does not match"); + } + } + }); + }); + }).finally([] { + return ceph::common::sharded_perf_coll().stop(); + }); + +} + +int main(int argc, char** argv) +{ + seastar::app_template app; + return app.run(argc, argv, [&] { + return test_perfcounters().then([] { + std::cout << "All tests succeeded" << std::endl; + }).handle_exception([] (auto eptr) { + std::cout << "Test failure" << std::endl; + return seastar::make_exception_future<>(eptr); + }); + }); + +} + + diff --git a/src/test/crimson/test_thread_pool.cc b/src/test/crimson/test_thread_pool.cc new file mode 100644 index 00000000..48be1ffe --- /dev/null +++ b/src/test/crimson/test_thread_pool.cc @@ -0,0 +1,49 @@ +#include <chrono> +#include <numeric> +#include <seastar/core/app-template.hh> +#include "crimson/thread/ThreadPool.h" + +using namespace std::chrono_literals; +using ThreadPool = ceph::thread::ThreadPool; + +seastar::future<> test_accumulate(ThreadPool& tp) { + static constexpr auto N = 5; + static constexpr auto M = 1; + auto slow_plus = [&tp](int i) { + return tp.submit([=] { + std::this_thread::sleep_for(10ns); + return i + M; + }); + }; + return seastar::map_reduce( + boost::irange(0, N), slow_plus, 0, std::plus{}).then([] (int sum) { + auto r = boost::irange(0 + M, N + M); + if (sum != std::accumulate(r.begin(), r.end(), 0)) { + throw std::runtime_error("test_accumulate failed"); + } + }); +} + +int main(int argc, char** argv) +{ + ThreadPool tp{2, 128, 0}; + seastar::app_template app; + return app.run(argc, argv, [&tp] { + return tp.start().then([&tp] { + return test_accumulate(tp); + }).handle_exception([](auto e) { + std::cerr << "Error: " << e << std::endl; + seastar::engine().exit(1); + }).finally([&tp] { + return tp.stop(); + }); + }); +} + +/* + * Local Variables: + * compile-command: "make -j4 \ + * -C ../../../build \ + * unittest_seastar_thread_pool" + * End: + */ |