summaryrefslogtreecommitdiffstats
path: root/src/test/direct_messenger/test_direct_messenger.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/direct_messenger/test_direct_messenger.cc')
-rw-r--r--src/test/direct_messenger/test_direct_messenger.cc437
1 files changed, 437 insertions, 0 deletions
diff --git a/src/test/direct_messenger/test_direct_messenger.cc b/src/test/direct_messenger/test_direct_messenger.cc
new file mode 100644
index 00000000..2789e422
--- /dev/null
+++ b/src/test/direct_messenger/test_direct_messenger.cc
@@ -0,0 +1,437 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "global/global_init.h"
+#include "common/ceph_argparse.h"
+
+#include "DirectMessenger.h"
+#include "msg/FastStrategy.h"
+#include "msg/QueueStrategy.h"
+#include "messages/MPing.h"
+
+
+/// mock dispatcher that calls the given callback
+class MockDispatcher : public Dispatcher {
+ std::function<void(Message*)> callback;
+ public:
+ MockDispatcher(CephContext *cct, std::function<void(Message*)> callback)
+ : Dispatcher(cct), callback(std::move(callback)) {}
+ bool ms_handle_reset(Connection *con) override { return false; }
+ void ms_handle_remote_reset(Connection *con) override {}
+ bool ms_handle_refused(Connection *con) override { return false; }
+ bool ms_dispatch(Message *m) override {
+ callback(m);
+ m->put();
+ return true;
+ }
+};
+
+/// test synchronous dispatch of messenger and connection interfaces
+TEST(DirectMessenger, SyncDispatch)
+{
+ auto cct = g_ceph_context;
+
+ // use FastStrategy for synchronous dispatch
+ DirectMessenger client(cct, entity_name_t::CLIENT(1),
+ "client", 0, new FastStrategy());
+ DirectMessenger server(cct, entity_name_t::CLIENT(2),
+ "server", 0, new FastStrategy());
+
+ ASSERT_EQ(0, client.set_direct_peer(&server));
+ ASSERT_EQ(0, server.set_direct_peer(&client));
+
+ bool got_request = false;
+ bool got_reply = false;
+
+ MockDispatcher client_dispatcher(cct, [&] (Message *m) {
+ got_reply = true;
+ });
+ client.add_dispatcher_head(&client_dispatcher);
+
+ MockDispatcher server_dispatcher(cct, [&] (Message *m) {
+ got_request = true;
+ ASSERT_EQ(0, m->get_connection()->send_message(new MPing()));
+ });
+ server.add_dispatcher_head(&server_dispatcher);
+
+ ASSERT_EQ(0, client.start());
+ ASSERT_EQ(0, server.start());
+
+ // test DirectMessenger::send_message()
+ ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst()));
+ ASSERT_TRUE(got_request);
+ ASSERT_TRUE(got_reply);
+
+ // test DirectConnection::send_message()
+ {
+ got_request = false;
+ got_reply = false;
+ auto conn = client.get_connection(server.get_myinst());
+ ASSERT_EQ(0, conn->send_message(new MPing()));
+ ASSERT_TRUE(got_request);
+ ASSERT_TRUE(got_reply);
+ }
+
+ // test DirectMessenger::send_message() with loopback address
+ got_request = false;
+ got_reply = false;
+ ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst()));
+ ASSERT_FALSE(got_request); // server should never see this
+ ASSERT_TRUE(got_reply);
+
+ // test DirectConnection::send_message() with loopback address
+ {
+ got_request = false;
+ got_reply = false;
+ auto conn = client.get_connection(client.get_myinst());
+ ASSERT_EQ(0, conn->send_message(new MPing()));
+ ASSERT_FALSE(got_request); // server should never see this
+ ASSERT_TRUE(got_reply);
+ }
+
+ // test DirectConnection::send_message() with loopback connection
+ {
+ got_request = false;
+ got_reply = false;
+ auto conn = client.get_loopback_connection();
+ ASSERT_EQ(0, conn->send_message(new MPing()));
+ ASSERT_FALSE(got_request); // server should never see this
+ ASSERT_TRUE(got_reply);
+ }
+
+ ASSERT_EQ(0, client.shutdown());
+ client.wait();
+
+ ASSERT_EQ(0, server.shutdown());
+ server.wait();
+}
+
+/// test asynchronous dispatch of messenger and connection interfaces
+TEST(DirectMessenger, AsyncDispatch)
+{
+ auto cct = g_ceph_context;
+
+ // use QueueStrategy for async replies
+ DirectMessenger client(cct, entity_name_t::CLIENT(1),
+ "client", 0, new QueueStrategy(1));
+ DirectMessenger server(cct, entity_name_t::CLIENT(2),
+ "server", 0, new FastStrategy());
+
+ ASSERT_EQ(0, client.set_direct_peer(&server));
+ ASSERT_EQ(0, server.set_direct_peer(&client));
+
+ // condition variable to wait on ping reply
+ std::mutex mutex;
+ std::condition_variable cond;
+ bool done = false;
+
+ auto wait_for_reply = [&] {
+ std::unique_lock<std::mutex> lock(mutex);
+ while (!done) {
+ cond.wait(lock);
+ }
+ done = false; // clear for reuse
+ };
+
+ // client dispatcher signals the condition variable on reply
+ MockDispatcher client_dispatcher(cct, [&] (Message *m) {
+ std::lock_guard<std::mutex> lock(mutex);
+ done = true;
+ cond.notify_one();
+ });
+ client.add_dispatcher_head(&client_dispatcher);
+
+ MockDispatcher server_dispatcher(cct, [&] (Message *m) {
+ // hold the lock over the call to send_message() to prove that the client's
+ // dispatch is asynchronous. if it isn't, it will deadlock
+ std::lock_guard<std::mutex> lock(mutex);
+ ASSERT_EQ(0, m->get_connection()->send_message(new MPing()));
+ });
+ server.add_dispatcher_head(&server_dispatcher);
+
+ ASSERT_EQ(0, client.start());
+ ASSERT_EQ(0, server.start());
+
+ // test DirectMessenger::send_message()
+ ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst()));
+ wait_for_reply();
+
+ // test DirectConnection::send_message()
+ {
+ auto conn = client.get_connection(server.get_myinst());
+ ASSERT_EQ(0, conn->send_message(new MPing()));
+ }
+ wait_for_reply();
+
+ // test DirectMessenger::send_message() with loopback address
+ {
+ // hold the lock to test that loopback dispatch is asynchronous
+ std::lock_guard<std::mutex> lock(mutex);
+ ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst()));
+ }
+ wait_for_reply();
+
+ // test DirectConnection::send_message() with loopback address
+ {
+ auto conn = client.get_connection(client.get_myinst());
+ // hold the lock to test that loopback dispatch is asynchronous
+ std::lock_guard<std::mutex> lock(mutex);
+ ASSERT_EQ(0, conn->send_message(new MPing()));
+ }
+ wait_for_reply();
+
+ // test DirectConnection::send_message() with loopback connection
+ {
+ auto conn = client.get_loopback_connection();
+ // hold the lock to test that loopback dispatch is asynchronous
+ std::lock_guard<std::mutex> lock(mutex);
+ ASSERT_EQ(0, conn->send_message(new MPing()));
+ }
+ wait_for_reply();
+
+ ASSERT_EQ(0, client.shutdown());
+ client.wait();
+
+ ASSERT_EQ(0, server.shutdown());
+ server.wait();
+}
+
+/// test that wait() blocks until shutdown()
+TEST(DirectMessenger, WaitShutdown)
+{
+ auto cct = g_ceph_context;
+
+ // test wait() with both Queue- and FastStrategy
+ DirectMessenger client(cct, entity_name_t::CLIENT(1),
+ "client", 0, new QueueStrategy(1));
+ DirectMessenger server(cct, entity_name_t::CLIENT(2),
+ "server", 0, new FastStrategy());
+
+ ASSERT_EQ(0, client.set_direct_peer(&server));
+ ASSERT_EQ(0, server.set_direct_peer(&client));
+
+ ASSERT_EQ(0, client.start());
+ ASSERT_EQ(0, server.start());
+
+ std::atomic<bool> client_waiting{false};
+ std::atomic<bool> server_waiting{false};
+
+ // spawn threads to wait() on each of the messengers
+ std::thread client_thread([&] {
+ client_waiting = true;
+ client.wait();
+ client_waiting = false;
+ });
+ std::thread server_thread([&] {
+ server_waiting = true;
+ server.wait();
+ server_waiting = false;
+ });
+
+ // give them time to start
+ std::this_thread::sleep_for(std::chrono::milliseconds(50));
+
+ ASSERT_TRUE(client_waiting);
+ ASSERT_TRUE(server_waiting);
+
+ // call shutdown to unblock the waiting threads
+ ASSERT_EQ(0, client.shutdown());
+ ASSERT_EQ(0, server.shutdown());
+
+ client_thread.join();
+ server_thread.join();
+
+ ASSERT_FALSE(client_waiting);
+ ASSERT_FALSE(server_waiting);
+}
+
+/// test connection and messenger interfaces after mark_down()
+TEST(DirectMessenger, MarkDown)
+{
+ auto cct = g_ceph_context;
+
+ DirectMessenger client(cct, entity_name_t::CLIENT(1),
+ "client", 0, new FastStrategy());
+ DirectMessenger server(cct, entity_name_t::CLIENT(2),
+ "server", 0, new FastStrategy());
+
+ ASSERT_EQ(0, client.set_direct_peer(&server));
+ ASSERT_EQ(0, server.set_direct_peer(&client));
+
+ ASSERT_EQ(0, client.start());
+ ASSERT_EQ(0, server.start());
+
+ auto client_to_server = client.get_connection(server.get_myinst());
+ auto server_to_client = server.get_connection(client.get_myinst());
+
+ ASSERT_TRUE(client_to_server->is_connected());
+ ASSERT_TRUE(server_to_client->is_connected());
+
+ // mark_down() breaks the connection on both sides
+ client_to_server->mark_down();
+
+ ASSERT_FALSE(client_to_server->is_connected());
+ ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing()));
+ ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst()));
+
+ ASSERT_FALSE(server_to_client->is_connected());
+ ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing()));
+ ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client.get_myinst()));
+
+ ASSERT_EQ(0, client.shutdown());
+ client.wait();
+
+ ASSERT_EQ(0, server.shutdown());
+ server.wait();
+}
+
+/// test connection and messenger interfaces after shutdown()
+TEST(DirectMessenger, SendShutdown)
+{
+ auto cct = g_ceph_context;
+
+ // put client on the heap so we can free it early
+ std::unique_ptr<DirectMessenger> client{
+ new DirectMessenger(cct, entity_name_t::CLIENT(1),
+ "client", 0, new FastStrategy())};
+ DirectMessenger server(cct, entity_name_t::CLIENT(2),
+ "server", 0, new FastStrategy());
+
+ ASSERT_EQ(0, client->set_direct_peer(&server));
+ ASSERT_EQ(0, server.set_direct_peer(client.get()));
+
+ ASSERT_EQ(0, client->start());
+ ASSERT_EQ(0, server.start());
+
+ const auto client_inst = client->get_myinst();
+ const auto server_inst = server.get_myinst();
+
+ auto client_to_server = client->get_connection(server_inst);
+ auto server_to_client = server.get_connection(client_inst);
+
+ ASSERT_TRUE(client_to_server->is_connected());
+ ASSERT_TRUE(server_to_client->is_connected());
+
+ // shut down the client to break connections
+ ASSERT_EQ(0, client->shutdown());
+ client->wait();
+
+ ASSERT_FALSE(client_to_server->is_connected());
+ ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing()));
+ ASSERT_EQ(-ENOTCONN, client->send_message(new MPing(), server_inst));
+
+ // free the client connection/messenger to test that calls to the server no
+ // longer try to dereference them
+ client_to_server.reset();
+ client.reset();
+
+ ASSERT_FALSE(server_to_client->is_connected());
+ ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing()));
+ ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client_inst));
+
+ ASSERT_EQ(0, server.shutdown());
+ server.wait();
+}
+
+/// test connection and messenger interfaces after bind()
+TEST(DirectMessenger, Bind)
+{
+ auto cct = g_ceph_context;
+
+ DirectMessenger client(cct, entity_name_t::CLIENT(1),
+ "client", 0, new FastStrategy());
+ DirectMessenger server(cct, entity_name_t::CLIENT(2),
+ "server", 0, new FastStrategy());
+
+ entity_addr_t client_addr;
+ client_addr.set_family(AF_INET);
+ client_addr.set_port(1);
+
+ // client bind succeeds before set_direct_peer()
+ ASSERT_EQ(0, client.bind(client_addr));
+
+ ASSERT_EQ(0, client.set_direct_peer(&server));
+ ASSERT_EQ(0, server.set_direct_peer(&client));
+
+ // server bind fails after set_direct_peer()
+ entity_addr_t empty_addr;
+ ASSERT_EQ(-EINVAL, server.bind(empty_addr));
+
+ ASSERT_EQ(0, client.start());
+ ASSERT_EQ(0, server.start());
+
+ auto client_to_server = client.get_connection(server.get_myinst());
+ auto server_to_client = server.get_connection(client.get_myinst());
+
+ ASSERT_TRUE(client_to_server->is_connected());
+ ASSERT_TRUE(server_to_client->is_connected());
+
+ // no address in connection to server
+ ASSERT_EQ(empty_addr, client_to_server->get_peer_addr());
+ // bind address is reflected in connection to client
+ ASSERT_EQ(client_addr, server_to_client->get_peer_addr());
+
+ // mark_down() with bind address breaks the connection
+ server.mark_down(client_addr);
+
+ ASSERT_FALSE(client_to_server->is_connected());
+ ASSERT_FALSE(server_to_client->is_connected());
+
+ ASSERT_EQ(0, client.shutdown());
+ client.wait();
+
+ ASSERT_EQ(0, server.shutdown());
+ server.wait();
+}
+
+/// test connection and messenger interfaces before calls to set_direct_peer()
+TEST(DirectMessenger, StartWithoutPeer)
+{
+ auto cct = g_ceph_context;
+
+ DirectMessenger client(cct, entity_name_t::CLIENT(1),
+ "client", 0, new FastStrategy());
+ DirectMessenger server(cct, entity_name_t::CLIENT(2),
+ "server", 0, new FastStrategy());
+
+ // can't start until set_direct_peer()
+ ASSERT_EQ(-EINVAL, client.start());
+ ASSERT_EQ(-EINVAL, server.start());
+
+ ASSERT_EQ(0, client.set_direct_peer(&server));
+
+ // only client can start
+ ASSERT_EQ(0, client.start());
+ ASSERT_EQ(-EINVAL, server.start());
+
+ // client has a connection but can't send
+ auto conn = client.get_connection(server.get_myinst());
+ ASSERT_NE(nullptr, conn);
+ ASSERT_FALSE(conn->is_connected());
+ ASSERT_EQ(-ENOTCONN, conn->send_message(new MPing()));
+ ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst()));
+
+ ASSERT_EQ(0, client.shutdown());
+ client.wait();
+}
+
+int main(int argc, char **argv)
+{
+ // command-line arguments
+ vector<const char*> args;
+ argv_to_vec(argc, (const char **)argv, args);
+
+ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_ANY,
+ CODE_ENVIRONMENT_DAEMON,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+ common_init_finish(cct.get());
+
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}