summaryrefslogtreecommitdiffstats
path: root/src/test/crimson/test_messenger.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/test/crimson/test_messenger.cc
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/crimson/test_messenger.cc')
-rw-r--r--src/test/crimson/test_messenger.cc420
1 files changed, 420 insertions, 0 deletions
diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc
new file mode 100644
index 00000000..6ec23f6d
--- /dev/null
+++ b/src/test/crimson/test_messenger.cc
@@ -0,0 +1,420 @@
+#include "common/ceph_time.h"
+#include "messages/MPing.h"
+#include "crimson/common/log.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Dispatcher.h"
+#include "crimson/net/Messenger.h"
+
+#include <map>
+#include <random>
+#include <boost/program_options.hpp>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/do_with.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sleep.hh>
+
+namespace bpo = boost::program_options;
+
+namespace {
+
+seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_ms);
+}
+
+static std::random_device rd;
+static std::default_random_engine rng{rd()};
+static bool verbose = false;
+
+static seastar::future<> test_echo(unsigned rounds,
+ double keepalive_ratio)
+{
+ struct test_state {
+ struct Server final
+ : public ceph::net::Dispatcher,
+ public seastar::peering_sharded_service<Server> {
+ ceph::net::Messenger *msgr = nullptr;
+ MessageRef msg_pong{new MPing(), false};
+
+ Dispatcher* get_local_shard() override {
+ return &(container().local());
+ }
+ seastar::future<> stop() {
+ return seastar::make_ready_future<>();
+ }
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ MessageRef m) override {
+ if (verbose) {
+ logger().info("server got {}", *m);
+ }
+ // reply with a pong
+ return c->send(msg_pong);
+ }
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce,
+ const entity_addr_t& addr) {
+ auto&& fut = ceph::net::Messenger::create(name, lname, nonce);
+ return fut.then([this, addr](ceph::net::Messenger *messenger) {
+ return container().invoke_on_all([messenger](auto& server) {
+ server.msgr = messenger->get_local_shard();
+ }).then([messenger, addr] {
+ return messenger->bind(entity_addrvec_t{addr});
+ }).then([this, messenger] {
+ return messenger->start(this);
+ });
+ });
+ }
+ seastar::future<> shutdown() {
+ ceph_assert(msgr);
+ return msgr->shutdown();
+ }
+ };
+
+ struct Client final
+ : public ceph::net::Dispatcher,
+ public seastar::peering_sharded_service<Client> {
+
+ struct PingSession : public seastar::enable_shared_from_this<PingSession> {
+ unsigned count = 0u;
+ mono_time connected_time;
+ mono_time finish_time;
+ };
+ using PingSessionRef = seastar::shared_ptr<PingSession>;
+
+ unsigned rounds;
+ std::bernoulli_distribution keepalive_dist;
+ ceph::net::Messenger *msgr = nullptr;
+ std::map<ceph::net::Connection*, seastar::promise<>> pending_conns;
+ std::map<ceph::net::ConnectionRef, PingSessionRef> sessions;
+ MessageRef msg_ping{new MPing(), false};
+
+ Client(unsigned rounds, double keepalive_ratio)
+ : rounds(rounds),
+ keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {}
+
+ PingSessionRef find_session(ceph::net::ConnectionRef c) {
+ auto found = sessions.find(c);
+ if (found == sessions.end()) {
+ ceph_assert(false);
+ }
+ return found->second;
+ }
+
+ Dispatcher* get_local_shard() override {
+ return &(container().local());
+ }
+ seastar::future<> stop() {
+ return seastar::now();
+ }
+ seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
+ logger().info("{}: connected to {}", *conn, conn->get_peer_addr());
+ auto session = seastar::make_shared<PingSession>();
+ auto [i, added] = sessions.emplace(conn, session);
+ std::ignore = i;
+ ceph_assert(added);
+ session->connected_time = mono_clock::now();
+ return seastar::now();
+ }
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ MessageRef m) override {
+ auto session = find_session(c);
+ ++(session->count);
+ if (verbose) {
+ logger().info("client ms_dispatch {}", session->count);
+ }
+
+ if (session->count == rounds) {
+ logger().info("{}: finished receiving {} pongs", *c.get(), session->count);
+ session->finish_time = mono_clock::now();
+ return container().invoke_on_all([conn = c.get()](auto &client) {
+ auto found = client.pending_conns.find(conn);
+ ceph_assert(found != client.pending_conns.end());
+ found->second.set_value();
+ });
+ } else {
+ return seastar::now();
+ }
+ }
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce) {
+ return ceph::net::Messenger::create(name, lname, nonce)
+ .then([this](ceph::net::Messenger *messenger) {
+ return container().invoke_on_all([messenger](auto& client) {
+ client.msgr = messenger->get_local_shard();
+ }).then([this, messenger] {
+ return messenger->start(this);
+ });
+ });
+ }
+
+ seastar::future<> shutdown() {
+ ceph_assert(msgr);
+ return msgr->shutdown();
+ }
+
+ seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr, bool foreign_dispatch=true) {
+ mono_time start_time = mono_clock::now();
+ return msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
+ .then([this, foreign_dispatch, start_time](auto conn) {
+ return seastar::futurize_apply([this, conn, foreign_dispatch] {
+ if (foreign_dispatch) {
+ return do_dispatch_pingpong(&**conn);
+ } else {
+ // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong().
+ return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) {
+ return client.do_dispatch_pingpong(conn);
+ });
+ }
+ }).finally([this, conn, start_time] {
+ return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) {
+ auto session = client.find_session((*conn)->shared_from_this());
+ std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
+ std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
+ logger().info("{}: handshake {}, pingpong {}",
+ **conn, dur_handshake.count(), dur_pingpong.count());
+ });
+ });
+ });
+ }
+
+ private:
+ seastar::future<> do_dispatch_pingpong(ceph::net::Connection* conn) {
+ return container().invoke_on_all([conn](auto& client) {
+ auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>());
+ std::ignore = i;
+ ceph_assert(added);
+ }).then([this, conn] {
+ return seastar::do_with(0u, 0u,
+ [this, conn](auto &count_ping, auto &count_keepalive) {
+ return seastar::do_until(
+ [this, conn, &count_ping, &count_keepalive] {
+ bool stop = (count_ping == rounds);
+ if (stop) {
+ logger().info("{}: finished sending {} pings with {} keepalives",
+ *conn, count_ping, count_keepalive);
+ }
+ return stop;
+ },
+ [this, conn, &count_ping, &count_keepalive] {
+ return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
+ if (keepalive_dist(rng)) {
+ return conn->keepalive()
+ .then([&count_keepalive] {
+ count_keepalive += 1;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ });
+ } else {
+ return conn->send(msg_ping)
+ .then([&count_ping] {
+ count_ping += 1;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ });
+ }
+ });
+ }).then([this, conn] {
+ auto found = pending_conns.find(conn);
+ return found->second.get_future();
+ });
+ });
+ });
+ }
+ };
+ };
+
+ logger().info("test_echo():");
+ return seastar::when_all_succeed(
+ ceph::net::create_sharded<test_state::Server>(),
+ ceph::net::create_sharded<test_state::Server>(),
+ ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio),
+ ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio))
+ .then([rounds, keepalive_ratio](test_state::Server *server1,
+ test_state::Server *server2,
+ test_state::Client *client1,
+ test_state::Client *client2) {
+ // start servers and clients
+ entity_addr_t addr1;
+ addr1.parse("127.0.0.1:9010", nullptr);
+ addr1.set_type(entity_addr_t::TYPE_LEGACY);
+ entity_addr_t addr2;
+ addr2.parse("127.0.0.1:9011", nullptr);
+ addr2.set_type(entity_addr_t::TYPE_LEGACY);
+ return seastar::when_all_succeed(
+ server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
+ server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
+ client1->init(entity_name_t::OSD(2), "client1", 3),
+ client2->init(entity_name_t::OSD(3), "client2", 4))
+ // dispatch pingpoing
+ .then([client1, client2, server1, server2] {
+ return seastar::when_all_succeed(
+ // test connecting in parallel, accepting in parallel,
+ // and operating the connection reference from a foreign/local core
+ client1->dispatch_pingpong(server1->msgr->get_myaddr(), true),
+ client1->dispatch_pingpong(server2->msgr->get_myaddr(), false),
+ client2->dispatch_pingpong(server1->msgr->get_myaddr(), false),
+ client2->dispatch_pingpong(server2->msgr->get_myaddr(), true));
+ // shutdown
+ }).finally([client1] {
+ logger().info("client1 shutdown...");
+ return client1->shutdown();
+ }).finally([client2] {
+ logger().info("client2 shutdown...");
+ return client2->shutdown();
+ }).finally([server1] {
+ logger().info("server1 shutdown...");
+ return server1->shutdown();
+ }).finally([server2] {
+ logger().info("server2 shutdown...");
+ return server2->shutdown();
+ });
+ });
+}
+
+static seastar::future<> test_concurrent_dispatch()
+{
+ struct test_state {
+ struct Server final
+ : public ceph::net::Dispatcher,
+ public seastar::peering_sharded_service<Server> {
+ ceph::net::Messenger *msgr = nullptr;
+ int count = 0;
+ seastar::promise<> on_second; // satisfied on second dispatch
+ seastar::promise<> on_done; // satisfied when first dispatch unblocks
+
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ MessageRef m) override {
+ switch (++count) {
+ case 1:
+ // block on the first request until we reenter with the second
+ return on_second.get_future()
+ .then([this] {
+ return container().invoke_on_all([](Server& server) {
+ server.on_done.set_value();
+ });
+ });
+ case 2:
+ on_second.set_value();
+ return seastar::now();
+ default:
+ throw std::runtime_error("unexpected count");
+ }
+ }
+
+ seastar::future<> wait() { return on_done.get_future(); }
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce,
+ const entity_addr_t& addr) {
+ return ceph::net::Messenger::create(name, lname, nonce)
+ .then([this, addr](ceph::net::Messenger *messenger) {
+ return container().invoke_on_all([messenger](auto& server) {
+ server.msgr = messenger->get_local_shard();
+ }).then([messenger, addr] {
+ return messenger->bind(entity_addrvec_t{addr});
+ }).then([this, messenger] {
+ return messenger->start(this);
+ });
+ });
+ }
+
+ Dispatcher* get_local_shard() override {
+ return &(container().local());
+ }
+ seastar::future<> stop() {
+ return seastar::make_ready_future<>();
+ }
+ };
+
+ struct Client final
+ : public ceph::net::Dispatcher,
+ public seastar::peering_sharded_service<Client> {
+ ceph::net::Messenger *msgr = nullptr;
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce) {
+ return ceph::net::Messenger::create(name, lname, nonce)
+ .then([this](ceph::net::Messenger *messenger) {
+ return container().invoke_on_all([messenger](auto& client) {
+ client.msgr = messenger->get_local_shard();
+ }).then([this, messenger] {
+ return messenger->start(this);
+ });
+ });
+ }
+
+ Dispatcher* get_local_shard() override {
+ return &(container().local());
+ }
+ seastar::future<> stop() {
+ return seastar::make_ready_future<>();
+ }
+ };
+ };
+
+ logger().info("test_concurrent_dispatch():");
+ return seastar::when_all_succeed(
+ ceph::net::create_sharded<test_state::Server>(),
+ ceph::net::create_sharded<test_state::Client>())
+ .then([](test_state::Server *server,
+ test_state::Client *client) {
+ entity_addr_t addr;
+ addr.parse("127.0.0.1:9010", nullptr);
+ addr.set_type(entity_addr_t::TYPE_LEGACY);
+ addr.set_family(AF_INET);
+ return seastar::when_all_succeed(
+ server->init(entity_name_t::OSD(4), "server3", 5, addr),
+ client->init(entity_name_t::OSD(5), "client3", 6))
+ .then([server, client] {
+ return client->msgr->connect(server->msgr->get_myaddr(),
+ entity_name_t::TYPE_OSD);
+ }).then([](ceph::net::ConnectionXRef conn) {
+ // send two messages
+ (*conn)->send(MessageRef{new MPing, false});
+ (*conn)->send(MessageRef{new MPing, false});
+ }).then([server] {
+ server->wait();
+ }).finally([client] {
+ logger().info("client shutdown...");
+ return client->msgr->shutdown();
+ }).finally([server] {
+ logger().info("server shutdown...");
+ return server->msgr->shutdown();
+ });
+ });
+}
+
+}
+
+int main(int argc, char** argv)
+{
+ seastar::app_template app;
+ app.add_options()
+ ("verbose,v", bpo::value<bool>()->default_value(false),
+ "chatty if true")
+ ("rounds", bpo::value<unsigned>()->default_value(512),
+ "number of pingpong rounds")
+ ("keepalive-ratio", bpo::value<double>()->default_value(0.1),
+ "ratio of keepalive in ping messages");
+ return app.run(argc, argv, [&app] {
+ auto&& config = app.configuration();
+ verbose = config["verbose"].as<bool>();
+ auto rounds = config["rounds"].as<unsigned>();
+ auto keepalive_ratio = config["keepalive-ratio"].as<double>();
+ return test_echo(rounds, keepalive_ratio)
+ .then([] {
+ return test_concurrent_dispatch();
+ }).then([] {
+ std::cout << "All tests succeeded" << std::endl;
+ }).handle_exception([] (auto eptr) {
+ std::cout << "Test failure" << std::endl;
+ return seastar::make_exception_future<>(eptr);
+ });
+ });
+}