diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/test/direct_messenger | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/test/direct_messenger/CMakeLists.txt | 4 | ||||
-rw-r--r-- | src/test/direct_messenger/DirectMessenger.cc | 252 | ||||
-rw-r--r-- | src/test/direct_messenger/DirectMessenger.h | 98 | ||||
-rw-r--r-- | src/test/direct_messenger/test_direct_messenger.cc | 437 |
4 files changed, 791 insertions, 0 deletions
diff --git a/src/test/direct_messenger/CMakeLists.txt b/src/test/direct_messenger/CMakeLists.txt new file mode 100644 index 00000000..2e7f2780 --- /dev/null +++ b/src/test/direct_messenger/CMakeLists.txt @@ -0,0 +1,4 @@ +# unittest_direct_messenger +#add_executable(unittest_direct_messenger test_direct_messenger.cc DirectMessenger.cc) +#add_ceph_unittest(unittest_direct_messenger) +#target_link_libraries(unittest_direct_messenger global) diff --git a/src/test/direct_messenger/DirectMessenger.cc b/src/test/direct_messenger/DirectMessenger.cc new file mode 100644 index 00000000..076f5fc3 --- /dev/null +++ b/src/test/direct_messenger/DirectMessenger.cc @@ -0,0 +1,252 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "DirectMessenger.h" +#include "msg/DispatchStrategy.h" + + +class DirectConnection : public Connection { + /// sent messages are dispatched here + DispatchStrategy *const dispatchers; + + /// the connection that will be attached to outgoing messages, so that replies + /// can be dispatched back to the sender. the pointer is atomic for + /// thread-safety between mark_down() and send_message(). no reference is held + /// on this Connection to avoid cyclical refs. we don't need a reference + /// because its owning DirectMessenger will mark both connections down (and + /// clear this pointer) before dropping its own reference + std::atomic<Connection*> reply_connection{nullptr}; + + public: + DirectConnection(CephContext *cct, DirectMessenger *m, + DispatchStrategy *dispatchers) + : Connection(cct, m), + dispatchers(dispatchers) + {} + + /// sets the Connection that will receive replies to outgoing messages + void set_direct_reply_connection(ConnectionRef conn); + + /// return true if a peer connection exists + bool is_connected() override; + + /// pass the given message directly to our dispatchers + int send_message(Message *m) override; + + /// release our pointer to the peer connection. later calls to is_connected() + /// will return false, and send_message() will fail with -ENOTCONN + void mark_down() override; + + /// noop - keepalive messages are not needed within a process + void send_keepalive() override {} + + /// noop - reconnect/recovery semantics are not needed within a process + void mark_disposable() override {} +}; + +void DirectConnection::set_direct_reply_connection(ConnectionRef conn) +{ + reply_connection.store(conn.get()); +} + +bool DirectConnection::is_connected() +{ + // true between calls to set_direct_reply_connection() and mark_down() + return reply_connection.load() != nullptr; +} + +int DirectConnection::send_message(Message *m) +{ + // read reply_connection atomically and take a reference + ConnectionRef conn = reply_connection.load(); + if (!conn) { + m->put(); + return -ENOTCONN; + } + // attach reply_connection to the Message, so that calls to + // m->get_connection()->send_message() can be dispatched back to the sender + m->set_connection(conn); + + dispatchers->ds_dispatch(m); + return 0; +} + +void DirectConnection::mark_down() +{ + Connection *conn = reply_connection.load(); + if (!conn) { + return; // already marked down + } + if (!reply_connection.compare_exchange_weak(conn, nullptr)) { + return; // lost the race to mark down + } + // called only once to avoid loops + conn->mark_down(); +} + + +static ConnectionRef create_loopback(DirectMessenger *m, + entity_name_t name, + DispatchStrategy *dispatchers) +{ + auto loopback = boost::intrusive_ptr<DirectConnection>( + new DirectConnection(m->cct, m, dispatchers)); + // loopback replies go to itself + loopback->set_direct_reply_connection(loopback); + loopback->set_peer_type(name.type()); + loopback->set_features(CEPH_FEATURES_ALL); + return loopback; +} + +DirectMessenger::DirectMessenger(CephContext *cct, entity_name_t name, + string mname, uint64_t nonce, + DispatchStrategy *dispatchers) + : SimplePolicyMessenger(cct, name, mname, nonce), + dispatchers(dispatchers), + loopback_connection(create_loopback(this, name, dispatchers)) +{ + dispatchers->set_messenger(this); +} + +DirectMessenger::~DirectMessenger() +{ +} + +int DirectMessenger::set_direct_peer(DirectMessenger *peer) +{ + if (get_myinst() == peer->get_myinst()) { + return -EADDRINUSE; // must have a different entity instance + } + peer_inst = peer->get_myinst(); + + // allocate a Connection that dispatches to the peer messenger + auto direct_connection = boost::intrusive_ptr<DirectConnection>( + new DirectConnection(cct, peer, peer->dispatchers.get())); + + direct_connection->set_peer_addr(peer_inst.addr); + direct_connection->set_peer_type(peer_inst.name.type()); + direct_connection->set_features(CEPH_FEATURES_ALL); + + // if set_direct_peer() was already called on the peer messenger, we can + // finish by attaching their connections. if not, the later call to + // peer->set_direct_peer() will attach their connection to ours + auto connection = peer->get_connection(get_myinst()); + if (connection) { + auto p = static_cast<DirectConnection*>(connection.get()); + + p->set_direct_reply_connection(direct_connection); + direct_connection->set_direct_reply_connection(p); + } + + peer_connection = std::move(direct_connection); + return 0; +} + +int DirectMessenger::bind(const entity_addr_t &bind_addr) +{ + if (peer_connection) { + return -EINVAL; // can't change address after sharing it with the peer + } + set_myaddr(bind_addr); + loopback_connection->set_peer_addr(bind_addr); + return 0; +} + +int DirectMessenger::client_bind(const entity_addr_t &bind_addr) +{ + // same as bind + return bind(bind_addr); +} + +int DirectMessenger::start() +{ + if (!peer_connection) { + return -EINVAL; // did not connect to a peer + } + if (started) { + return -EINVAL; // already started + } + + dispatchers->start(); + return SimplePolicyMessenger::start(); +} + +int DirectMessenger::shutdown() +{ + if (!started) { + return -EINVAL; // not started + } + + mark_down_all(); + peer_connection.reset(); + loopback_connection.reset(); + + dispatchers->shutdown(); + SimplePolicyMessenger::shutdown(); + sem.Put(); // signal wait() + return 0; +} + +void DirectMessenger::wait() +{ + sem.Get(); // wait on signal from shutdown() + dispatchers->wait(); +} + +ConnectionRef DirectMessenger::get_connection(const entity_inst_t& dst) +{ + if (dst == peer_inst) { + return peer_connection; + } + if (dst == get_myinst()) { + return loopback_connection; + } + return nullptr; +} + +ConnectionRef DirectMessenger::get_loopback_connection() +{ + return loopback_connection; +} + +int DirectMessenger::send_message(Message *m, const entity_inst_t& dst) +{ + auto conn = get_connection(dst); + if (!conn) { + m->put(); + return -ENOTCONN; + } + return conn->send_message(m); +} + +void DirectMessenger::mark_down(const entity_addr_t& addr) +{ + ConnectionRef conn; + if (addr == peer_inst.addr) { + conn = peer_connection; + } else if (addr == get_myaddr_legacy()) { + conn = loopback_connection; + } + if (conn) { + conn->mark_down(); + } +} + +void DirectMessenger::mark_down_all() +{ + if (peer_connection) { + peer_connection->mark_down(); + } + loopback_connection->mark_down(); +} diff --git a/src/test/direct_messenger/DirectMessenger.h b/src/test/direct_messenger/DirectMessenger.h new file mode 100644 index 00000000..710fcfb3 --- /dev/null +++ b/src/test/direct_messenger/DirectMessenger.h @@ -0,0 +1,98 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MSG_DIRECTMESSENGER_H +#define CEPH_MSG_DIRECTMESSENGER_H + +#include "msg/SimplePolicyMessenger.h" +#include "common/Semaphore.h" + + +class DispatchStrategy; + +/** + * DirectMessenger provides a direct path between two messengers + * within a process. A pair of DirectMessengers share their + * DispatchStrategy with each other, and calls to send_message() + * forward the message directly to the other. + * + * This is for testing and i/o injection only, and cannot be used + * for normal messengers with ms_type. + */ +class DirectMessenger : public SimplePolicyMessenger { + private: + /// strategy for local dispatch + std::unique_ptr<DispatchStrategy> dispatchers; + /// peer instance for comparison in get_connection() + entity_inst_t peer_inst; + /// connection that sends to the peer's dispatchers + ConnectionRef peer_connection; + /// connection that sends to my own dispatchers + ConnectionRef loopback_connection; + /// semaphore for signalling wait() from shutdown() + Semaphore sem; + + public: + DirectMessenger(CephContext *cct, entity_name_t name, + string mname, uint64_t nonce, + DispatchStrategy *dispatchers); + ~DirectMessenger(); + + /// attach to a peer messenger. must be called before start() + int set_direct_peer(DirectMessenger *peer); + + + // Messenger interface + + /// sets the addr. must not be called after set_direct_peer() or start() + int bind(const entity_addr_t& bind_addr) override; + + /// sets the addr. must not be called after set_direct_peer() or start() + int client_bind(const entity_addr_t& bind_addr) override; + + /// starts dispatchers + int start() override; + + /// breaks connections, stops dispatchers, and unblocks callers of wait() + int shutdown() override; + + /// blocks until shutdown() completes + void wait() override; + + /// returns a connection to the peer instance, a loopback connection to our + /// own instance, or null if not connected + ConnectionRef get_connection(const entity_inst_t& dst) override; + + /// returns a loopback connection that dispatches to this messenger + ConnectionRef get_loopback_connection() override; + + /// dispatches a message to the peer instance if connected + int send_message(Message *m, const entity_inst_t& dst) override; + + /// mark down the connection for the given address + void mark_down(const entity_addr_t& a) override; + + /// mark down all connections + void mark_down_all() override; + + + // unimplemented Messenger interface + void set_addr_unknowns(const entity_addr_t &addr) override {} + void set_addr(const entity_addr_t &addr) override {} + int get_dispatch_queue_len() override { return 0; } + double get_dispatch_queue_max_age(utime_t now) override { return 0; } + void set_cluster_protocol(int p) override {} +}; + +#endif diff --git a/src/test/direct_messenger/test_direct_messenger.cc b/src/test/direct_messenger/test_direct_messenger.cc new file mode 100644 index 00000000..2789e422 --- /dev/null +++ b/src/test/direct_messenger/test_direct_messenger.cc @@ -0,0 +1,437 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <condition_variable> +#include <mutex> +#include <thread> + +#include <gtest/gtest.h> + +#include "global/global_init.h" +#include "common/ceph_argparse.h" + +#include "DirectMessenger.h" +#include "msg/FastStrategy.h" +#include "msg/QueueStrategy.h" +#include "messages/MPing.h" + + +/// mock dispatcher that calls the given callback +class MockDispatcher : public Dispatcher { + std::function<void(Message*)> callback; + public: + MockDispatcher(CephContext *cct, std::function<void(Message*)> callback) + : Dispatcher(cct), callback(std::move(callback)) {} + bool ms_handle_reset(Connection *con) override { return false; } + void ms_handle_remote_reset(Connection *con) override {} + bool ms_handle_refused(Connection *con) override { return false; } + bool ms_dispatch(Message *m) override { + callback(m); + m->put(); + return true; + } +}; + +/// test synchronous dispatch of messenger and connection interfaces +TEST(DirectMessenger, SyncDispatch) +{ + auto cct = g_ceph_context; + + // use FastStrategy for synchronous dispatch + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new FastStrategy()); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(&client)); + + bool got_request = false; + bool got_reply = false; + + MockDispatcher client_dispatcher(cct, [&] (Message *m) { + got_reply = true; + }); + client.add_dispatcher_head(&client_dispatcher); + + MockDispatcher server_dispatcher(cct, [&] (Message *m) { + got_request = true; + ASSERT_EQ(0, m->get_connection()->send_message(new MPing())); + }); + server.add_dispatcher_head(&server_dispatcher); + + ASSERT_EQ(0, client.start()); + ASSERT_EQ(0, server.start()); + + // test DirectMessenger::send_message() + ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst())); + ASSERT_TRUE(got_request); + ASSERT_TRUE(got_reply); + + // test DirectConnection::send_message() + { + got_request = false; + got_reply = false; + auto conn = client.get_connection(server.get_myinst()); + ASSERT_EQ(0, conn->send_message(new MPing())); + ASSERT_TRUE(got_request); + ASSERT_TRUE(got_reply); + } + + // test DirectMessenger::send_message() with loopback address + got_request = false; + got_reply = false; + ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst())); + ASSERT_FALSE(got_request); // server should never see this + ASSERT_TRUE(got_reply); + + // test DirectConnection::send_message() with loopback address + { + got_request = false; + got_reply = false; + auto conn = client.get_connection(client.get_myinst()); + ASSERT_EQ(0, conn->send_message(new MPing())); + ASSERT_FALSE(got_request); // server should never see this + ASSERT_TRUE(got_reply); + } + + // test DirectConnection::send_message() with loopback connection + { + got_request = false; + got_reply = false; + auto conn = client.get_loopback_connection(); + ASSERT_EQ(0, conn->send_message(new MPing())); + ASSERT_FALSE(got_request); // server should never see this + ASSERT_TRUE(got_reply); + } + + ASSERT_EQ(0, client.shutdown()); + client.wait(); + + ASSERT_EQ(0, server.shutdown()); + server.wait(); +} + +/// test asynchronous dispatch of messenger and connection interfaces +TEST(DirectMessenger, AsyncDispatch) +{ + auto cct = g_ceph_context; + + // use QueueStrategy for async replies + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new QueueStrategy(1)); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(&client)); + + // condition variable to wait on ping reply + std::mutex mutex; + std::condition_variable cond; + bool done = false; + + auto wait_for_reply = [&] { + std::unique_lock<std::mutex> lock(mutex); + while (!done) { + cond.wait(lock); + } + done = false; // clear for reuse + }; + + // client dispatcher signals the condition variable on reply + MockDispatcher client_dispatcher(cct, [&] (Message *m) { + std::lock_guard<std::mutex> lock(mutex); + done = true; + cond.notify_one(); + }); + client.add_dispatcher_head(&client_dispatcher); + + MockDispatcher server_dispatcher(cct, [&] (Message *m) { + // hold the lock over the call to send_message() to prove that the client's + // dispatch is asynchronous. if it isn't, it will deadlock + std::lock_guard<std::mutex> lock(mutex); + ASSERT_EQ(0, m->get_connection()->send_message(new MPing())); + }); + server.add_dispatcher_head(&server_dispatcher); + + ASSERT_EQ(0, client.start()); + ASSERT_EQ(0, server.start()); + + // test DirectMessenger::send_message() + ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst())); + wait_for_reply(); + + // test DirectConnection::send_message() + { + auto conn = client.get_connection(server.get_myinst()); + ASSERT_EQ(0, conn->send_message(new MPing())); + } + wait_for_reply(); + + // test DirectMessenger::send_message() with loopback address + { + // hold the lock to test that loopback dispatch is asynchronous + std::lock_guard<std::mutex> lock(mutex); + ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst())); + } + wait_for_reply(); + + // test DirectConnection::send_message() with loopback address + { + auto conn = client.get_connection(client.get_myinst()); + // hold the lock to test that loopback dispatch is asynchronous + std::lock_guard<std::mutex> lock(mutex); + ASSERT_EQ(0, conn->send_message(new MPing())); + } + wait_for_reply(); + + // test DirectConnection::send_message() with loopback connection + { + auto conn = client.get_loopback_connection(); + // hold the lock to test that loopback dispatch is asynchronous + std::lock_guard<std::mutex> lock(mutex); + ASSERT_EQ(0, conn->send_message(new MPing())); + } + wait_for_reply(); + + ASSERT_EQ(0, client.shutdown()); + client.wait(); + + ASSERT_EQ(0, server.shutdown()); + server.wait(); +} + +/// test that wait() blocks until shutdown() +TEST(DirectMessenger, WaitShutdown) +{ + auto cct = g_ceph_context; + + // test wait() with both Queue- and FastStrategy + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new QueueStrategy(1)); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(&client)); + + ASSERT_EQ(0, client.start()); + ASSERT_EQ(0, server.start()); + + std::atomic<bool> client_waiting{false}; + std::atomic<bool> server_waiting{false}; + + // spawn threads to wait() on each of the messengers + std::thread client_thread([&] { + client_waiting = true; + client.wait(); + client_waiting = false; + }); + std::thread server_thread([&] { + server_waiting = true; + server.wait(); + server_waiting = false; + }); + + // give them time to start + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + ASSERT_TRUE(client_waiting); + ASSERT_TRUE(server_waiting); + + // call shutdown to unblock the waiting threads + ASSERT_EQ(0, client.shutdown()); + ASSERT_EQ(0, server.shutdown()); + + client_thread.join(); + server_thread.join(); + + ASSERT_FALSE(client_waiting); + ASSERT_FALSE(server_waiting); +} + +/// test connection and messenger interfaces after mark_down() +TEST(DirectMessenger, MarkDown) +{ + auto cct = g_ceph_context; + + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new FastStrategy()); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(&client)); + + ASSERT_EQ(0, client.start()); + ASSERT_EQ(0, server.start()); + + auto client_to_server = client.get_connection(server.get_myinst()); + auto server_to_client = server.get_connection(client.get_myinst()); + + ASSERT_TRUE(client_to_server->is_connected()); + ASSERT_TRUE(server_to_client->is_connected()); + + // mark_down() breaks the connection on both sides + client_to_server->mark_down(); + + ASSERT_FALSE(client_to_server->is_connected()); + ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing())); + ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst())); + + ASSERT_FALSE(server_to_client->is_connected()); + ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing())); + ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client.get_myinst())); + + ASSERT_EQ(0, client.shutdown()); + client.wait(); + + ASSERT_EQ(0, server.shutdown()); + server.wait(); +} + +/// test connection and messenger interfaces after shutdown() +TEST(DirectMessenger, SendShutdown) +{ + auto cct = g_ceph_context; + + // put client on the heap so we can free it early + std::unique_ptr<DirectMessenger> client{ + new DirectMessenger(cct, entity_name_t::CLIENT(1), + "client", 0, new FastStrategy())}; + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + ASSERT_EQ(0, client->set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(client.get())); + + ASSERT_EQ(0, client->start()); + ASSERT_EQ(0, server.start()); + + const auto client_inst = client->get_myinst(); + const auto server_inst = server.get_myinst(); + + auto client_to_server = client->get_connection(server_inst); + auto server_to_client = server.get_connection(client_inst); + + ASSERT_TRUE(client_to_server->is_connected()); + ASSERT_TRUE(server_to_client->is_connected()); + + // shut down the client to break connections + ASSERT_EQ(0, client->shutdown()); + client->wait(); + + ASSERT_FALSE(client_to_server->is_connected()); + ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing())); + ASSERT_EQ(-ENOTCONN, client->send_message(new MPing(), server_inst)); + + // free the client connection/messenger to test that calls to the server no + // longer try to dereference them + client_to_server.reset(); + client.reset(); + + ASSERT_FALSE(server_to_client->is_connected()); + ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing())); + ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client_inst)); + + ASSERT_EQ(0, server.shutdown()); + server.wait(); +} + +/// test connection and messenger interfaces after bind() +TEST(DirectMessenger, Bind) +{ + auto cct = g_ceph_context; + + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new FastStrategy()); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + entity_addr_t client_addr; + client_addr.set_family(AF_INET); + client_addr.set_port(1); + + // client bind succeeds before set_direct_peer() + ASSERT_EQ(0, client.bind(client_addr)); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(&client)); + + // server bind fails after set_direct_peer() + entity_addr_t empty_addr; + ASSERT_EQ(-EINVAL, server.bind(empty_addr)); + + ASSERT_EQ(0, client.start()); + ASSERT_EQ(0, server.start()); + + auto client_to_server = client.get_connection(server.get_myinst()); + auto server_to_client = server.get_connection(client.get_myinst()); + + ASSERT_TRUE(client_to_server->is_connected()); + ASSERT_TRUE(server_to_client->is_connected()); + + // no address in connection to server + ASSERT_EQ(empty_addr, client_to_server->get_peer_addr()); + // bind address is reflected in connection to client + ASSERT_EQ(client_addr, server_to_client->get_peer_addr()); + + // mark_down() with bind address breaks the connection + server.mark_down(client_addr); + + ASSERT_FALSE(client_to_server->is_connected()); + ASSERT_FALSE(server_to_client->is_connected()); + + ASSERT_EQ(0, client.shutdown()); + client.wait(); + + ASSERT_EQ(0, server.shutdown()); + server.wait(); +} + +/// test connection and messenger interfaces before calls to set_direct_peer() +TEST(DirectMessenger, StartWithoutPeer) +{ + auto cct = g_ceph_context; + + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new FastStrategy()); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + // can't start until set_direct_peer() + ASSERT_EQ(-EINVAL, client.start()); + ASSERT_EQ(-EINVAL, server.start()); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + + // only client can start + ASSERT_EQ(0, client.start()); + ASSERT_EQ(-EINVAL, server.start()); + + // client has a connection but can't send + auto conn = client.get_connection(server.get_myinst()); + ASSERT_NE(nullptr, conn); + ASSERT_FALSE(conn->is_connected()); + ASSERT_EQ(-ENOTCONN, conn->send_message(new MPing())); + ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst())); + + ASSERT_EQ(0, client.shutdown()); + client.wait(); +} + +int main(int argc, char **argv) +{ + // command-line arguments + vector<const char*> args; + argv_to_vec(argc, (const char **)argv, args); + + auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_ANY, + CODE_ENVIRONMENT_DAEMON, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + common_init_finish(cct.get()); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |