// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 UnitedStack
 *
 * Author: Haomai Wang
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

// (system and third-party header names below are inferred from usage in this file)
#include <atomic>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <vector>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <boost/random/mersenne_twister.hpp>
#include <boost/random/uniform_int.hpp>
#include <gtest/gtest.h>

#define MSG_POLICY_UNIT_TESTING

#include "common/ceph_argparse.h"
#include "common/ceph_mutex.h"
#include "global/global_init.h"
#include "messages/MCommand.h"
#include "messages/MPing.h"
#include "msg/Connection.h"
#include "msg/Dispatcher.h"
#include "msg/Message.h"
#include "msg/Messenger.h"
#include "msg/msg_types.h"

typedef boost::mt11213b gen_type;

#include "common/dout.h"
#include "include/ceph_assert.h"

#include "auth/DummyAuth.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << " ceph_test_msgr "

// Poll `expr` for up to ~1 second (1000 x 1ms) before giving up.
#define CHECK_AND_WAIT_TRUE(expr) do {  \
  int n = 1000;                         \
  while (--n) {                         \
    if (expr)                           \
      break;                            \
    usleep(1000);                       \
  }                                     \
} while(0);

using namespace std;

class MessengerTest : public ::testing::TestWithParam<const char*> {
 public:
  DummyAuthClientServer dummy_auth;
  Messenger *server_msgr;
  Messenger *client_msgr;

  MessengerTest()
    : dummy_auth(g_ceph_context),
      server_msgr(NULL),
      client_msgr(NULL) {
    dummy_auth.auth_registry.refresh_config();
  }

  void SetUp() override {
    lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
    server_msgr = Messenger::create(g_ceph_context, string(GetParam()),
                                    entity_name_t::OSD(0), "server", getpid());
    client_msgr = Messenger::create(g_ceph_context, string(GetParam()),
                                    entity_name_t::CLIENT(-1), "client", getpid());
    server_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
    client_msgr->set_default_policy(Messenger::Policy::lossy_client(0));
    server_msgr->set_auth_client(&dummy_auth);
    server_msgr->set_auth_server(&dummy_auth);
    client_msgr->set_auth_client(&dummy_auth);
    client_msgr->set_auth_server(&dummy_auth);
    server_msgr->set_require_authorizer(false);
  }

  void TearDown() override {
    ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0);
    ASSERT_EQ(client_msgr->get_dispatch_queue_len(), 0);
    delete server_msgr;
    delete client_msgr;
  }
};


class FakeDispatcher : public Dispatcher {
 public:
  struct Session : public RefCountedObject {
    atomic<uint64_t> count;
    ConnectionRef con;

    explicit Session(ConnectionRef c)
      : RefCountedObject(g_ceph_context), count(0), con(c) {
    }
    uint64_t get_count() { return count; }
  };

  ceph::mutex lock = ceph::make_mutex("FakeDispatcher::lock");
  ceph::condition_variable cond;
  bool is_server;
  bool got_new;
  bool got_remote_reset;
  bool got_connect;
  bool loopback;
  entity_addrvec_t last_accept;
  ConnectionRef *last_accept_con_ptr = nullptr;

  explicit FakeDispatcher(bool s)
    : Dispatcher(g_ceph_context),
      is_server(s), got_new(false), got_remote_reset(false),
      got_connect(false), loopback(false) {
  }

  bool ms_can_fast_dispatch_any() const override { return true; }
  bool ms_can_fast_dispatch(const Message *m) const override {
    switch (m->get_type()) {
    case CEPH_MSG_PING:
      return true;
    default:
      return false;
    }
  }

  void ms_handle_fast_connect(Connection *con) override {
    std::scoped_lock l{lock};
    lderr(g_ceph_context) << __func__ << " " << con << dendl;
    auto s = con->get_priv();
    if (!s) {
      auto session = new Session(con);
      con->set_priv(RefCountedPtr{session, false});
      lderr(g_ceph_context) << __func__ << " con: " << con
                            << " count: " << session->count << dendl;
    }
    got_connect = true;
    cond.notify_all();
  }

  void ms_handle_fast_accept(Connection *con) override {
    last_accept = con->get_peer_addrs();
    if (last_accept_con_ptr) {
      *last_accept_con_ptr = con;
    }
    if (!con->get_priv()) {
      con->set_priv(RefCountedPtr{new Session(con), false});
    }
  }

  bool ms_dispatch(Message *m) override {
    auto priv = m->get_connection()->get_priv();
    auto s = static_cast<Session*>(priv.get());
    if (!s) {
      s = new Session(m->get_connection());
      priv.reset(s, false);
      m->get_connection()->set_priv(priv);
    }
    s->count++;
    lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection()
                          << " session " << s << " count: " << s->count << dendl;
    if (is_server) {
      reply_message(m);
    }
    std::lock_guard l{lock};
    got_new = true;
    cond.notify_all();
    m->put();
    return true;
  }

  bool ms_handle_reset(Connection *con) override {
    std::lock_guard l{lock};
    lderr(g_ceph_context) << __func__ << " " << con << dendl;
    auto priv = con->get_priv();
    if (auto s = static_cast<Session*>(priv.get()); s) {
      s->con.reset();          // break con <-> session ref cycle
      con->set_priv(nullptr);  // break ref <-> session cycle, if any
    }
    return true;
  }

  void ms_handle_remote_reset(Connection *con) override {
    std::lock_guard l{lock};
    lderr(g_ceph_context) << __func__ << " " << con << dendl;
    auto priv = con->get_priv();
    if (auto s = static_cast<Session*>(priv.get()); s) {
      s->con.reset();          // break con <-> session ref cycle
      con->set_priv(nullptr);  // break ref <-> session cycle, if any
    }
    got_remote_reset = true;
    cond.notify_all();
  }

  bool ms_handle_refused(Connection *con) override {
    return false;
  }

  void ms_fast_dispatch(Message *m) override {
    auto priv = m->get_connection()->get_priv();
    auto s = static_cast<Session*>(priv.get());
    if (!s) {
      s = new Session(m->get_connection());
      priv.reset(s, false);
      m->get_connection()->set_priv(priv);
    }
    s->count++;
    lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection()
                          << " session " << s << " count: " << s->count << dendl;
    if (is_server) {
      if (loopback)
        ceph_assert(m->get_source().is_osd());
      else
        reply_message(m);
    } else if (loopback) {
      ceph_assert(m->get_source().is_client());
    }
    m->put();
    std::lock_guard l{lock};
    got_new = true;
    cond.notify_all();
  }

  int ms_handle_fast_authentication(Connection *con) override {
    return 1;
  }

  void reply_message(Message *m) {
    MPing *rm = new MPing();
    m->get_connection()->send_message(rm);
  }
};

typedef FakeDispatcher::Session Session;

// Test hook that lets a test pause a connection's protocol state machine at a
// given STEP and decide how it should proceed (CONTINUE, FAIL, ...).
struct TestInterceptor : public Interceptor {
  bool step_waiting = false;
  bool waiting = true;
  std::map<Connection *, uint32_t> current_step;
  std::map<Connection *, std::list<uint32_t>> step_history;
  std::map<uint32_t, std::optional<ACTION>> decisions;
  std::set<uint32_t> breakpoints;

  uint32_t count_step(Connection *conn, uint32_t step) {
    uint32_t count = 0;
    for (auto s : step_history[conn]) {
      if (s == step) {
        count++;
      }
    }
    return count;
  }

  void breakpoint(uint32_t step) {
    breakpoints.insert(step);
  }

  void remove_bp(uint32_t step) {
    breakpoints.erase(step);
  }

  Connection *wait(uint32_t step, Connection *conn=nullptr) {
    std::unique_lock l(lock);
    while (true) {
      if (conn) {
        auto it = current_step.find(conn);
        if (it != current_step.end()) {
          if (it->second == step) {
            break;
          }
        }
      } else {
        for (auto it : current_step) {
          if (it.second == step) {
            conn = it.first;
            break;
          }
        }
        if (conn) {
          break;
        }
      }
      step_waiting = true;
      cond_var.wait(l);
    }
    step_waiting = false;
    return conn;
  }

  ACTION wait_for_decision(uint32_t step, std::unique_lock<std::mutex> &l) {
    if (decisions[step]) {
      return *(decisions[step]);
    }
    waiting = true;
    cond_var.wait(l, [this] { return !waiting; });
    return *(decisions[step]);
  }

  void proceed(uint32_t step, ACTION decision) {
    std::unique_lock l(lock);
decisions[step] = decision; if (waiting) { waiting = false; cond_var.notify_one(); } } ACTION intercept(Connection *conn, uint32_t step) override { lderr(g_ceph_context) << __func__ << " conn(" << conn << ") intercept called on step=" << step << dendl; { std::unique_lock l(lock); step_history[conn].push_back(step); current_step[conn] = step; if (step_waiting) { cond_var.notify_one(); } } std::unique_lock l(lock); ACTION decision = ACTION::CONTINUE; if (breakpoints.find(step) != breakpoints.end()) { lderr(g_ceph_context) << __func__ << " conn(" << conn << ") pausing on step=" << step << dendl; decision = wait_for_decision(step, l); } else { if (decisions[step]) { decision = *(decisions[step]); } } lderr(g_ceph_context) << __func__ << " conn(" << conn << ") resuming step=" << step << " with decision=" << decision << dendl; decisions[step].reset(); return decision; } }; /** * Scenario: A connects to B, and B connects to A at the same time. */ TEST_P(MessengerTest, ConnectionRaceTest) { FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); TestInterceptor *cli_interceptor = new TestInterceptor(); TestInterceptor *srv_interceptor = new TestInterceptor(); server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer_reuse(0)); server_msgr->interceptor = srv_interceptor; client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer_reuse(0)); client_msgr->interceptor = cli_interceptor; entity_addr_t bind_addr; bind_addr.parse("v2:127.0.0.1:3300"); server_msgr->bind(bind_addr); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->start(); bind_addr.parse("v2:127.0.0.1:3301"); client_msgr->bind(bind_addr); client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->start(); // pause before sending client_ident message cli_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY); // pause before sending client_ident message srv_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY); ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); MPing *m1 = new MPing(); ASSERT_EQ(c2s->send_message(m1), 0); ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(), client_msgr->get_myaddrs()); MPing *m2 = new MPing(); ASSERT_EQ(s2c->send_message(m2), 0); cli_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY, c2s.get()); srv_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY, s2c.get()); // at this point both connections (A->B, B->A) are paused just before sending // the client_ident message. 
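  // Both breakpoints are removed and both sides are told to CONTINUE, letting the
  // two racing connection attempts finish.  The waits and assertions that follow
  // expect each side's ConnectionRef to end up connected and to have delivered
  // exactly one MPing (session count == 1).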
cli_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY); srv_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY); cli_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE); srv_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE); { std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } { std::unique_lock l{srv_dispatcher.lock}; srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); srv_dispatcher.got_new = false; } ASSERT_TRUE(s2c->is_connected()); ASSERT_EQ(1u, static_cast(s2c->get_priv().get())->get_count()); ASSERT_TRUE(s2c->peer_is_client()); ASSERT_TRUE(c2s->is_connected()); ASSERT_EQ(1u, static_cast(c2s->get_priv().get())->get_count()); ASSERT_TRUE(c2s->peer_is_osd()); client_msgr->shutdown(); client_msgr->wait(); server_msgr->shutdown(); server_msgr->wait(); delete cli_interceptor; delete srv_interceptor; } /** * Scenario: A connects to B, and B connects to A at the same time. * The first (A -> B) connection gets to message flow handshake, the * second (B -> A) connection is stuck waiting for a banner from A. * After A sends client_ident to B, the first connection wins and B * calls reuse_connection() to replace the second connection's socket * while the second connection is still in BANNER_CONNECTING. */ TEST_P(MessengerTest, ConnectionRaceReuseBannerTest) { FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); auto cli_interceptor = std::make_unique(); auto srv_interceptor = std::make_unique(); server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer_reuse(0)); server_msgr->interceptor = srv_interceptor.get(); client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer_reuse(0)); client_msgr->interceptor = cli_interceptor.get(); entity_addr_t bind_addr; bind_addr.parse("v2:127.0.0.1:3300"); server_msgr->bind(bind_addr); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->start(); bind_addr.parse("v2:127.0.0.1:3301"); client_msgr->bind(bind_addr); client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->start(); // pause before sending client_ident message srv_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY); ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(), client_msgr->get_myaddrs()); MPing *m1 = new MPing(); ASSERT_EQ(s2c->send_message(m1), 0); srv_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY); srv_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY); // pause before sending banner cli_interceptor->breakpoint(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING); ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); MPing *m2 = new MPing(); ASSERT_EQ(c2s->send_message(m2), 0); cli_interceptor->wait(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING); cli_interceptor->remove_bp(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING); // second connection is in BANNER_CONNECTING, ensure it stays so // and send client_ident srv_interceptor->breakpoint(Interceptor::STEP::BANNER_EXCHANGE); srv_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE); // handle client_ident -- triggers reuse_connection() with exproto // in BANNER_CONNECTING cli_interceptor->breakpoint(Interceptor::STEP::READY); 
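  // Note: the READY breakpoint is armed before the paused banner exchange is
  // released below, so the client-side connection can be caught again the moment
  // it reaches READY (see the wait() that follows).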
cli_interceptor->proceed(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING, Interceptor::ACTION::CONTINUE); cli_interceptor->wait(Interceptor::STEP::READY); cli_interceptor->remove_bp(Interceptor::STEP::READY); // first connection is in READY Connection *s2c_accepter = srv_interceptor->wait(Interceptor::STEP::BANNER_EXCHANGE); srv_interceptor->remove_bp(Interceptor::STEP::BANNER_EXCHANGE); srv_interceptor->proceed(Interceptor::STEP::BANNER_EXCHANGE, Interceptor::ACTION::CONTINUE); cli_interceptor->proceed(Interceptor::STEP::READY, Interceptor::ACTION::CONTINUE); { std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } { std::unique_lock l{srv_dispatcher.lock}; srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); srv_dispatcher.got_new = false; } EXPECT_TRUE(s2c->is_connected()); EXPECT_EQ(1u, static_cast(s2c->get_priv().get())->get_count()); EXPECT_TRUE(s2c->peer_is_client()); EXPECT_TRUE(c2s->is_connected()); EXPECT_EQ(1u, static_cast(c2s->get_priv().get())->get_count()); EXPECT_TRUE(c2s->peer_is_osd()); // closed in reuse_connection() -- EPIPE when writing banner/hello EXPECT_FALSE(s2c_accepter->is_connected()); // established exactly once, never faulted and reconnected EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::START_CLIENT_BANNER_EXCHANGE), 1u); EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 0u); EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::READY), 1u); client_msgr->shutdown(); client_msgr->wait(); server_msgr->shutdown(); server_msgr->wait(); } /** * Scenario: * - A connects to B * - A sends client_ident to B * - B fails before sending server_ident to A * - A reconnects */ TEST_P(MessengerTest, MissingServerIdenTest) { FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); TestInterceptor *cli_interceptor = new TestInterceptor(); TestInterceptor *srv_interceptor = new TestInterceptor(); server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0)); server_msgr->interceptor = srv_interceptor; client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossy_client(0)); client_msgr->interceptor = cli_interceptor; entity_addr_t bind_addr; bind_addr.parse("v2:127.0.0.1:3300"); server_msgr->bind(bind_addr); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->start(); bind_addr.parse("v2:127.0.0.1:3301"); client_msgr->bind(bind_addr); client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->start(); // pause before sending server_ident message srv_interceptor->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY); ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); MPing *m1 = new MPing(); ASSERT_EQ(c2s->send_message(m1), 0); Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::SEND_SERVER_IDENTITY); srv_interceptor->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY); // We inject a message from this side of the connection to force it to be // in standby when we inject the failure below MPing *m2 = new MPing(); ASSERT_EQ(c2s_accepter->send_message(m2), 0); srv_interceptor->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY, Interceptor::ACTION::FAIL); { std::unique_lock l{srv_dispatcher.lock}; srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); srv_dispatcher.got_new = false; } { std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return 
cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_TRUE(c2s->is_connected()); ASSERT_EQ(1u, static_cast(c2s->get_priv().get())->get_count()); ASSERT_TRUE(c2s->peer_is_osd()); ASSERT_TRUE(c2s_accepter->is_connected()); ASSERT_EQ(1u, static_cast(c2s_accepter->get_priv().get())->get_count()); ASSERT_TRUE(c2s_accepter->peer_is_client()); client_msgr->shutdown(); client_msgr->wait(); server_msgr->shutdown(); server_msgr->wait(); delete cli_interceptor; delete srv_interceptor; } /** * Scenario: * - A connects to B * - A sends client_ident to B * - B fails before sending server_ident to A * - A goes to standby * - B reconnects to A */ TEST_P(MessengerTest, MissingServerIdenTest2) { FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); TestInterceptor *cli_interceptor = new TestInterceptor(); TestInterceptor *srv_interceptor = new TestInterceptor(); server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0)); server_msgr->interceptor = srv_interceptor; client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0)); client_msgr->interceptor = cli_interceptor; entity_addr_t bind_addr; bind_addr.parse("v2:127.0.0.1:3300"); server_msgr->bind(bind_addr); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->start(); bind_addr.parse("v2:127.0.0.1:3301"); client_msgr->bind(bind_addr); client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->start(); // pause before sending server_ident message srv_interceptor->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY); ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::SEND_SERVER_IDENTITY); srv_interceptor->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY); // We inject a message from this side of the connection to force it to be // in standby when we inject the failure below MPing *m2 = new MPing(); ASSERT_EQ(c2s_accepter->send_message(m2), 0); srv_interceptor->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY, Interceptor::ACTION::FAIL); { std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_TRUE(c2s->is_connected()); ASSERT_EQ(1u, static_cast(c2s->get_priv().get())->get_count()); ASSERT_TRUE(c2s->peer_is_osd()); ASSERT_TRUE(c2s_accepter->is_connected()); ASSERT_EQ(0u, static_cast(c2s_accepter->get_priv().get())->get_count()); ASSERT_TRUE(c2s_accepter->peer_is_client()); client_msgr->shutdown(); client_msgr->wait(); server_msgr->shutdown(); server_msgr->wait(); delete cli_interceptor; delete srv_interceptor; } /** * Scenario: * - A connects to B * - A and B exchange messages * - A fails * - B goes into standby * - A reconnects */ TEST_P(MessengerTest, ReconnectTest) { FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); TestInterceptor *cli_interceptor = new TestInterceptor(); TestInterceptor *srv_interceptor = new TestInterceptor(); server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0)); server_msgr->interceptor = srv_interceptor; client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0)); client_msgr->interceptor = cli_interceptor; entity_addr_t bind_addr; bind_addr.parse("v2:127.0.0.1:3300"); server_msgr->bind(bind_addr); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->start(); bind_addr.parse("v2:127.0.0.1:3301"); client_msgr->bind(bind_addr); 
client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->start(); ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); MPing *m1 = new MPing(); ASSERT_EQ(c2s->send_message(m1), 0); { std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_TRUE(c2s->is_connected()); ASSERT_EQ(1u, static_cast(c2s->get_priv().get())->get_count()); ASSERT_TRUE(c2s->peer_is_osd()); cli_interceptor->breakpoint(Interceptor::STEP::HANDLE_MESSAGE); MPing *m2 = new MPing(); ASSERT_EQ(c2s->send_message(m2), 0); cli_interceptor->wait(Interceptor::STEP::HANDLE_MESSAGE, c2s.get()); cli_interceptor->remove_bp(Interceptor::STEP::HANDLE_MESSAGE); // at this point client and server are connected together srv_interceptor->breakpoint(Interceptor::STEP::READY); // failing client cli_interceptor->proceed(Interceptor::STEP::HANDLE_MESSAGE, Interceptor::ACTION::FAIL); MPing *m3 = new MPing(); ASSERT_EQ(c2s->send_message(m3), 0); Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::READY); // the srv end of theconnection is now paused at ready // this means that the reconnect was successful srv_interceptor->remove_bp(Interceptor::STEP::READY); ASSERT_TRUE(c2s_accepter->peer_is_client()); // c2s_accepter sent 0 reconnect messages ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 0u); // c2s_accepter sent 1 reconnect_ok messages ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT_OK), 1u); // c2s sent 1 reconnect messages ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 1u); // c2s sent 0 reconnect_ok messages ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT_OK), 0u); srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE); { std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } client_msgr->shutdown(); client_msgr->wait(); server_msgr->shutdown(); server_msgr->wait(); delete cli_interceptor; delete srv_interceptor; } /** * Scenario: * - A connects to B * - A and B exchange messages * - A fails * - A reconnects // B reconnects */ TEST_P(MessengerTest, ReconnectRaceTest) { FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); TestInterceptor *cli_interceptor = new TestInterceptor(); TestInterceptor *srv_interceptor = new TestInterceptor(); server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0)); server_msgr->interceptor = srv_interceptor; client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0)); client_msgr->interceptor = cli_interceptor; entity_addr_t bind_addr; bind_addr.parse("v2:127.0.0.1:3300"); server_msgr->bind(bind_addr); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->start(); bind_addr.parse("v2:127.0.0.1:3301"); client_msgr->bind(bind_addr); client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->start(); ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); MPing *m1 = new MPing(); ASSERT_EQ(c2s->send_message(m1), 0); { std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_TRUE(c2s->is_connected()); ASSERT_EQ(1u, static_cast(c2s->get_priv().get())->get_count()); ASSERT_TRUE(c2s->peer_is_osd()); 
cli_interceptor->breakpoint(Interceptor::STEP::HANDLE_MESSAGE); MPing *m2 = new MPing(); ASSERT_EQ(c2s->send_message(m2), 0); cli_interceptor->wait(Interceptor::STEP::HANDLE_MESSAGE, c2s.get()); cli_interceptor->remove_bp(Interceptor::STEP::HANDLE_MESSAGE); // at this point client and server are connected together // force both client and server to race on reconnect cli_interceptor->breakpoint(Interceptor::STEP::SEND_RECONNECT); srv_interceptor->breakpoint(Interceptor::STEP::SEND_RECONNECT); // failing client // this will cause both client and server to reconnect at the same time cli_interceptor->proceed(Interceptor::STEP::HANDLE_MESSAGE, Interceptor::ACTION::FAIL); MPing *m3 = new MPing(); ASSERT_EQ(c2s->send_message(m3), 0); cli_interceptor->wait(Interceptor::STEP::SEND_RECONNECT, c2s.get()); srv_interceptor->wait(Interceptor::STEP::SEND_RECONNECT); cli_interceptor->remove_bp(Interceptor::STEP::SEND_RECONNECT); srv_interceptor->remove_bp(Interceptor::STEP::SEND_RECONNECT); // pause on "ready" srv_interceptor->breakpoint(Interceptor::STEP::READY); cli_interceptor->proceed(Interceptor::STEP::SEND_RECONNECT, Interceptor::ACTION::CONTINUE); srv_interceptor->proceed(Interceptor::STEP::SEND_RECONNECT, Interceptor::ACTION::CONTINUE); Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::READY); // the server has reconnected and is "ready" srv_interceptor->remove_bp(Interceptor::STEP::READY); ASSERT_TRUE(c2s_accepter->peer_is_client()); ASSERT_TRUE(c2s->peer_is_osd()); // the server should win the reconnect race // c2s_accepter sent 1 or 2 reconnect messages ASSERT_LT(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 3u); ASSERT_GT(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 0u); // c2s_accepter sent 0 reconnect_ok messages ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT_OK), 0u); // c2s sent 1 reconnect messages ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 1u); // c2s sent 1 reconnect_ok messages ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT_OK), 1u); if (srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT) == 2) { // if the server send the reconnect message two times then // the client must have sent a session retry message to the server ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SESSION_RETRY), 1u); } else { ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SESSION_RETRY), 0u); } srv_interceptor->proceed(Interceptor::STEP::READY, Interceptor::ACTION::CONTINUE); { std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } client_msgr->shutdown(); client_msgr->wait(); server_msgr->shutdown(); server_msgr->wait(); delete cli_interceptor; delete srv_interceptor; } TEST_P(MessengerTest, SimpleTest) { FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); entity_addr_t bind_addr; bind_addr.parse("v2:127.0.0.1"); server_msgr->bind(bind_addr); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->start(); client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->start(); // 1. 
simple round trip MPing *m = new MPing(); ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); { ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); ASSERT_EQ(1u, static_cast(conn->get_priv().get())->get_count()); ASSERT_TRUE(conn->peer_is_osd()); // 2. test rebind port set avoid_ports; for (int i = 0; i < 10 ; i++) { for (auto a : server_msgr->get_myaddrs().v) { avoid_ports.insert(a.get_port() + i); } } server_msgr->rebind(avoid_ports); for (auto a : server_msgr->get_myaddrs().v) { ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0); } conn = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); // 3. test markdown connection conn->mark_down(); ASSERT_FALSE(conn->is_connected()); // 4. test failed connection server_msgr->shutdown(); server_msgr->wait(); m = new MPing(); conn->send_message(m); CHECK_AND_WAIT_TRUE(!conn->is_connected()); ASSERT_FALSE(conn->is_connected()); // 5. loopback connection srv_dispatcher.loopback = true; conn = client_msgr->get_loopback_connection(); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } srv_dispatcher.loopback = false; ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); client_msgr->shutdown(); client_msgr->wait(); server_msgr->shutdown(); server_msgr->wait(); } TEST_P(MessengerTest, SimpleMsgr2Test) { FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); entity_addr_t legacy_addr; legacy_addr.parse("v1:127.0.0.1"); entity_addr_t msgr2_addr; msgr2_addr.parse("v2:127.0.0.1"); entity_addrvec_t bind_addrs; bind_addrs.v.push_back(legacy_addr); bind_addrs.v.push_back(msgr2_addr); server_msgr->bindv(bind_addrs); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->start(); client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->start(); // 1. simple round trip MPing *m = new MPing(); ConnectionRef conn = client_msgr->connect_to( server_msgr->get_mytype(), server_msgr->get_myaddrs()); { ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); ASSERT_EQ(1u, static_cast(conn->get_priv().get())->get_count()); ASSERT_TRUE(conn->peer_is_osd()); // 2. test rebind port set avoid_ports; for (int i = 0; i < 10 ; i++) { for (auto a : server_msgr->get_myaddrs().v) { avoid_ports.insert(a.get_port() + i); } } server_msgr->rebind(avoid_ports); for (auto a : server_msgr->get_myaddrs().v) { ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0); } conn = client_msgr->connect_to( server_msgr->get_mytype(), server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); // 3. 
test markdown connection conn->mark_down(); ASSERT_FALSE(conn->is_connected()); // 4. test failed connection server_msgr->shutdown(); server_msgr->wait(); m = new MPing(); conn->send_message(m); CHECK_AND_WAIT_TRUE(!conn->is_connected()); ASSERT_FALSE(conn->is_connected()); // 5. loopback connection srv_dispatcher.loopback = true; conn = client_msgr->get_loopback_connection(); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } srv_dispatcher.loopback = false; ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); client_msgr->shutdown(); client_msgr->wait(); server_msgr->shutdown(); server_msgr->wait(); } TEST_P(MessengerTest, FeatureTest) { FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); entity_addr_t bind_addr; bind_addr.parse("v2:127.0.0.1"); uint64_t all_feature_supported, feature_required, feature_supported = 0; for (int i = 0; i < 10; i++) feature_supported |= 1ULL << i; feature_supported |= CEPH_FEATUREMASK_MSG_ADDR2; feature_supported |= CEPH_FEATUREMASK_SERVER_NAUTILUS; feature_required = feature_supported | 1ULL << 13; all_feature_supported = feature_required | 1ULL << 14; Messenger::Policy p = server_msgr->get_policy(entity_name_t::TYPE_CLIENT); p.features_required = feature_required; server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); server_msgr->bind(bind_addr); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->start(); // 1. Suppose if only support less than required p = client_msgr->get_policy(entity_name_t::TYPE_OSD); p.features_supported = feature_supported; client_msgr->set_policy(entity_name_t::TYPE_OSD, p); client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->start(); MPing *m = new MPing(); ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); conn->send_message(m); CHECK_AND_WAIT_TRUE(!conn->is_connected()); // should failed build a connection ASSERT_FALSE(conn->is_connected()); client_msgr->shutdown(); client_msgr->wait(); // 2. supported met required p = client_msgr->get_policy(entity_name_t::TYPE_OSD); p.features_supported = all_feature_supported; client_msgr->set_policy(entity_name_t::TYPE_OSD, p); client_msgr->start(); conn = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_msgr->shutdown(); client_msgr->shutdown(); server_msgr->wait(); client_msgr->wait(); } TEST_P(MessengerTest, TimeoutTest) { g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "1"); FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); entity_addr_t bind_addr; bind_addr.parse("v2:127.0.0.1"); server_msgr->bind(bind_addr); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->start(); client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->start(); // 1. 
build the connection MPing *m = new MPing(); ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); { ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); ASSERT_TRUE(conn->peer_is_osd()); // 2. wait for idle usleep(2500*1000); ASSERT_FALSE(conn->is_connected()); server_msgr->shutdown(); server_msgr->wait(); client_msgr->shutdown(); client_msgr->wait(); g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "900"); } TEST_P(MessengerTest, StatefulTest) { Message *m; FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); entity_addr_t bind_addr; bind_addr.parse("v2:127.0.0.1"); Messenger::Policy p = Messenger::Policy::stateful_server(0); server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); p = Messenger::Policy::lossless_client(0); client_msgr->set_policy(entity_name_t::TYPE_OSD, p); server_msgr->bind(bind_addr); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->start(); client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->start(); // 1. test for server standby ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); conn->mark_down(); ASSERT_FALSE(conn->is_connected()); ConnectionRef server_conn = server_msgr->connect_to( client_msgr->get_mytype(), srv_dispatcher.last_accept); // don't lose state ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); srv_dispatcher.got_new = false; conn = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_conn = server_msgr->connect_to(client_msgr->get_mytype(), srv_dispatcher.last_accept); { std::unique_lock l{srv_dispatcher.lock}; srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_remote_reset; }); } // 2. 
test for client reconnect ASSERT_FALSE(cli_dispatcher.got_remote_reset); cli_dispatcher.got_connect = false; cli_dispatcher.got_new = false; cli_dispatcher.got_remote_reset = false; server_conn->mark_down(); ASSERT_FALSE(server_conn->is_connected()); // ensure client detect server socket closed { std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; }); cli_dispatcher.got_remote_reset = false; } { std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; }); cli_dispatcher.got_connect = false; } CHECK_AND_WAIT_TRUE(conn->is_connected()); ASSERT_TRUE(conn->is_connected()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); ASSERT_TRUE(conn->is_connected()); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } // resetcheck happen ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_conn = server_msgr->connect_to(client_msgr->get_mytype(), srv_dispatcher.last_accept); ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); cli_dispatcher.got_remote_reset = false; server_msgr->shutdown(); client_msgr->shutdown(); server_msgr->wait(); client_msgr->wait(); } TEST_P(MessengerTest, StatelessTest) { Message *m; FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); entity_addr_t bind_addr; bind_addr.parse("v2:127.0.0.1"); Messenger::Policy p = Messenger::Policy::stateless_server(0); server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); p = Messenger::Policy::lossy_client(0); client_msgr->set_policy(entity_name_t::TYPE_OSD, p); server_msgr->bind(bind_addr); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->start(); client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->start(); // 1. test for server lose state ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); conn->mark_down(); ASSERT_FALSE(conn->is_connected()); srv_dispatcher.got_new = false; ConnectionRef server_conn; srv_dispatcher.last_accept_con_ptr = &server_conn; conn = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); ASSERT_TRUE(server_conn); // server lose state { std::unique_lock l{srv_dispatcher.lock}; srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); } ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); // 2. 
test for client lossy server_conn->mark_down(); ASSERT_FALSE(server_conn->is_connected()); conn->send_keepalive(); CHECK_AND_WAIT_TRUE(!conn->is_connected()); ASSERT_FALSE(conn->is_connected()); conn = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_msgr->shutdown(); client_msgr->shutdown(); server_msgr->wait(); client_msgr->wait(); } TEST_P(MessengerTest, AnonTest) { Message *m; FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); entity_addr_t bind_addr; bind_addr.parse("v2:127.0.0.1"); Messenger::Policy p = Messenger::Policy::stateless_server(0); server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); p = Messenger::Policy::lossy_client(0); client_msgr->set_policy(entity_name_t::TYPE_OSD, p); server_msgr->bind(bind_addr); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->start(); client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->start(); ConnectionRef server_con_a, server_con_b; // a srv_dispatcher.last_accept_con_ptr = &server_con_a; ConnectionRef con_a = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs(), true); { m = new MPing(); ASSERT_EQ(con_a->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(con_a->get_priv().get())->get_count()); // b srv_dispatcher.last_accept_con_ptr = &server_con_b; ConnectionRef con_b = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs(), true); { m = new MPing(); ASSERT_EQ(con_b->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(con_b->get_priv().get())->get_count()); // these should be distinct ASSERT_NE(con_a, con_b); ASSERT_NE(server_con_a, server_con_b); // and both connected { m = new MPing(); ASSERT_EQ(con_a->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } { m = new MPing(); ASSERT_EQ(con_b->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } // clean up con_a->mark_down(); ASSERT_FALSE(con_a->is_connected()); con_b->mark_down(); ASSERT_FALSE(con_b->is_connected()); server_msgr->shutdown(); client_msgr->shutdown(); server_msgr->wait(); client_msgr->wait(); } TEST_P(MessengerTest, ClientStandbyTest) { Message *m; FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); entity_addr_t bind_addr; bind_addr.parse("v2:127.0.0.1"); Messenger::Policy p = Messenger::Policy::stateful_server(0); server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); p = Messenger::Policy::lossless_peer(0); client_msgr->set_policy(entity_name_t::TYPE_OSD, p); server_msgr->bind(bind_addr); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->start(); client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->start(); // 1. 
test for client standby, resetcheck ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); ConnectionRef server_conn = server_msgr->connect_to( client_msgr->get_mytype(), srv_dispatcher.last_accept); ASSERT_FALSE(cli_dispatcher.got_remote_reset); cli_dispatcher.got_connect = false; server_conn->mark_down(); ASSERT_FALSE(server_conn->is_connected()); // client should be standby usleep(300*1000); // client should be standby, so we use original connection { // Try send message to verify got remote reset callback m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); { std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; }); cli_dispatcher.got_remote_reset = false; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; }); cli_dispatcher.got_connect = false; } CHECK_AND_WAIT_TRUE(conn->is_connected()); ASSERT_TRUE(conn->is_connected()); m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_conn = server_msgr->connect_to(client_msgr->get_mytype(), srv_dispatcher.last_accept); ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); server_msgr->shutdown(); client_msgr->shutdown(); server_msgr->wait(); client_msgr->wait(); } TEST_P(MessengerTest, AuthTest) { g_ceph_context->_conf.set_val("auth_cluster_required", "cephx"); g_ceph_context->_conf.set_val("auth_service_required", "cephx"); g_ceph_context->_conf.set_val("auth_client_required", "cephx"); FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); entity_addr_t bind_addr; bind_addr.parse("v2:127.0.0.1"); server_msgr->bind(bind_addr); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->start(); client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->start(); // 1. simple auth round trip MPing *m = new MPing(); ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); { ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); // 2. 
mix auth g_ceph_context->_conf.set_val("auth_cluster_required", "none"); g_ceph_context->_conf.set_val("auth_service_required", "none"); g_ceph_context->_conf.set_val("auth_client_required", "none"); conn->mark_down(); ASSERT_FALSE(conn->is_connected()); conn = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); { MPing *m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_msgr->shutdown(); client_msgr->shutdown(); server_msgr->wait(); client_msgr->wait(); } TEST_P(MessengerTest, MessageTest) { FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); entity_addr_t bind_addr; bind_addr.parse("v2:127.0.0.1"); Messenger::Policy p = Messenger::Policy::stateful_server(0); server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); p = Messenger::Policy::lossless_peer(0); client_msgr->set_policy(entity_name_t::TYPE_OSD, p); server_msgr->bind(bind_addr); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->start(); client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->start(); // 1. A very large "front"(as well as "payload") // Because a external message need to invade Messenger::decode_message, // here we only use existing message class(MCommand) ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); { uuid_d uuid; uuid.generate_random(); vector cmds; string s("abcdefghijklmnopqrstuvwxyz"); for (int i = 0; i < 1024*30; i++) cmds.push_back(s); MCommand *m = new MCommand(uuid); m->cmd = cmds; conn->send_message(m); std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait_for(l, 500s, [&] { return cli_dispatcher.got_new; }); ASSERT_TRUE(cli_dispatcher.got_new); cli_dispatcher.got_new = false; } // 2. 
A very large "data" { bufferlist bl; string s("abcdefghijklmnopqrstuvwxyz"); for (int i = 0; i < 1024*30; i++) bl.append(s); MPing *m = new MPing(); m->set_data(bl); conn->send_message(m); utime_t t; t += 1000*1000*500; std::unique_lock l{cli_dispatcher.lock}; cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); ASSERT_TRUE(cli_dispatcher.got_new); cli_dispatcher.got_new = false; } server_msgr->shutdown(); client_msgr->shutdown(); server_msgr->wait(); client_msgr->wait(); } class SyntheticWorkload; struct Payload { enum Who : uint8_t { PING = 0, PONG = 1, }; uint8_t who = 0; uint64_t seq = 0; bufferlist data; Payload(Who who, uint64_t seq, const bufferlist& data) : who(who), seq(seq), data(data) {} Payload() = default; DENC(Payload, v, p) { DENC_START(1, 1, p); denc(v.who, p); denc(v.seq, p); denc(v.data, p); DENC_FINISH(p); } }; WRITE_CLASS_DENC(Payload) ostream& operator<<(ostream& out, const Payload &pl) { return out << "reply=" << pl.who << " i = " << pl.seq; } class SyntheticDispatcher : public Dispatcher { public: ceph::mutex lock = ceph::make_mutex("SyntheticDispatcher::lock"); ceph::condition_variable cond; bool is_server; bool got_new; bool got_remote_reset; bool got_connect; map > conn_sent; map sent; atomic index; SyntheticWorkload *workload; SyntheticDispatcher(bool s, SyntheticWorkload *wl): Dispatcher(g_ceph_context), is_server(s), got_new(false), got_remote_reset(false), got_connect(false), index(0), workload(wl) { } bool ms_can_fast_dispatch_any() const override { return true; } bool ms_can_fast_dispatch(const Message *m) const override { switch (m->get_type()) { case CEPH_MSG_PING: case MSG_COMMAND: return true; default: return false; } } void ms_handle_fast_connect(Connection *con) override { std::lock_guard l{lock}; list c = conn_sent[con]; for (list::iterator it = c.begin(); it != c.end(); ++it) sent.erase(*it); conn_sent.erase(con); got_connect = true; cond.notify_all(); } void ms_handle_fast_accept(Connection *con) override { std::lock_guard l{lock}; list c = conn_sent[con]; for (list::iterator it = c.begin(); it != c.end(); ++it) sent.erase(*it); conn_sent.erase(con); cond.notify_all(); } bool ms_dispatch(Message *m) override { ceph_abort(); } bool ms_handle_reset(Connection *con) override; void ms_handle_remote_reset(Connection *con) override { std::lock_guard l{lock}; list c = conn_sent[con]; for (list::iterator it = c.begin(); it != c.end(); ++it) sent.erase(*it); conn_sent.erase(con); got_remote_reset = true; } bool ms_handle_refused(Connection *con) override { return false; } void ms_fast_dispatch(Message *m) override { // MSG_COMMAND is used to disorganize regular message flow if (m->get_type() == MSG_COMMAND) { m->put(); return ; } Payload pl; auto p = m->get_data().cbegin(); decode(pl, p); if (pl.who == Payload::PING) { lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl; reply_message(m, pl); m->put(); std::lock_guard l{lock}; got_new = true; cond.notify_all(); } else { std::lock_guard l{lock}; if (sent.count(pl.seq)) { lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl; ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq); ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq])); conn_sent[m->get_connection()].pop_front(); sent.erase(pl.seq); } m->put(); got_new = true; cond.notify_all(); } } int ms_handle_fast_authentication(Connection *con) override { return 1; } void reply_message(const Message *m, Payload& pl) { pl.who = Payload::PONG; bufferlist bl; encode(pl, bl); MPing *rm 
= new MPing(); rm->set_data(bl); m->get_connection()->send_message(rm); lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl; } void send_message_wrap(ConnectionRef con, const bufferlist& data) { Message *m = new MPing(); Payload pl{Payload::PING, index++, data}; bufferlist bl; encode(pl, bl); m->set_data(bl); if (!con->get_messenger()->get_default_policy().lossy) { std::lock_guard l{lock}; sent[pl.seq] = pl.data; conn_sent[con].push_back(pl.seq); } lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl; ASSERT_EQ(0, con->send_message(m)); } uint64_t get_pending() { std::lock_guard l{lock}; return sent.size(); } void clear_pending(ConnectionRef con) { std::lock_guard l{lock}; for (list::iterator it = conn_sent[con].begin(); it != conn_sent[con].end(); ++it) sent.erase(*it); conn_sent.erase(con); } void print() { for (auto && p : conn_sent) { if (!p.second.empty()) { lderr(g_ceph_context) << __func__ << " " << p.first << " wait " << p.second.size() << dendl; } } } }; class SyntheticWorkload { ceph::mutex lock = ceph::make_mutex("SyntheticWorkload::lock"); ceph::condition_variable cond; set available_servers; set available_clients; Messenger::Policy client_policy; map > available_connections; SyntheticDispatcher dispatcher; gen_type rng; vector rand_data; DummyAuthClientServer dummy_auth; public: const unsigned max_in_flight = 0; const unsigned max_connections = 0; static const unsigned max_message_len = 1024 * 1024 * 4; SyntheticWorkload(int servers, int clients, string type, int random_num, Messenger::Policy srv_policy, Messenger::Policy cli_policy, int _max_in_flight = 64, int _max_connections = 128) : client_policy(cli_policy), dispatcher(false, this), rng(time(NULL)), dummy_auth(g_ceph_context), max_in_flight(_max_in_flight), max_connections(_max_connections) { dummy_auth.auth_registry.refresh_config(); Messenger *msgr; int base_port = 16800; entity_addr_t bind_addr; char addr[64]; for (int i = 0; i < servers; ++i) { msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", getpid()+i); snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d", base_port+i); bind_addr.parse(addr); msgr->bind(bind_addr); msgr->add_dispatcher_head(&dispatcher); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); ceph_assert(msgr); msgr->set_default_policy(srv_policy); available_servers.insert(msgr); msgr->start(); } for (int i = 0; i < clients; ++i) { msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1), "client", getpid()+i+servers); if (cli_policy.standby) { snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d", base_port+i+servers); bind_addr.parse(addr); msgr->bind(bind_addr); } msgr->add_dispatcher_head(&dispatcher); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); ceph_assert(msgr); msgr->set_default_policy(cli_policy); available_clients.insert(msgr); msgr->start(); } for (int i = 0; i < random_num; i++) { bufferlist bl; boost::uniform_int<> u(32, max_message_len); uint64_t value_len = u(rng); bufferptr bp(value_len); bp.zero(); for (uint64_t j = 0; j < value_len-sizeof(i); ) { memcpy(bp.c_str()+j, &i, sizeof(i)); j += 4096; } bl.append(bp); rand_data.push_back(bl); } } ConnectionRef _get_random_connection() { while (dispatcher.get_pending() > max_in_flight) { lock.unlock(); usleep(500); lock.lock(); } ceph_assert(ceph_mutex_is_locked(lock)); boost::uniform_int<> choose(0, available_connections.size() - 1); int index = 
choose(rng);
    map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i =
      available_connections.begin();
    for (; index > 0; --index, ++i) ;
    return i->first;
  }

  bool can_create_connection() {
    return available_connections.size() < max_connections;
  }

  void generate_connection() {
    std::lock_guard l{lock};
    if (!can_create_connection())
      return;

    Messenger *server, *client;
    {
      boost::uniform_int<> choose(0, available_servers.size() - 1);
      int index = choose(rng);
      set<Messenger*>::iterator i = available_servers.begin();
      for (; index > 0; --index, ++i) ;
      server = *i;
    }
    {
      boost::uniform_int<> choose(0, available_clients.size() - 1);
      int index = choose(rng);
      set<Messenger*>::iterator i = available_clients.begin();
      for (; index > 0; --index, ++i) ;
      client = *i;
    }

    pair<Messenger*, Messenger*> p;
    {
      boost::uniform_int<> choose(0, available_servers.size() - 1);
      if (server->get_default_policy().server) {
        p = make_pair(client, server);
        ConnectionRef conn = client->connect_to(server->get_mytype(),
                                                server->get_myaddrs());
        available_connections[conn] = p;
      } else {
        ConnectionRef conn = client->connect_to(server->get_mytype(),
                                                server->get_myaddrs());
        p = make_pair(client, server);
        available_connections[conn] = p;
      }
    }
  }

  void send_message() {
    std::lock_guard l{lock};
    ConnectionRef conn = _get_random_connection();
    boost::uniform_int<> true_false(0, 99);
    int val = true_false(rng);
    if (val >= 95) {
      uuid_d uuid;
      uuid.generate_random();
      MCommand *m = new MCommand(uuid);
      vector<string> cmds;
      cmds.push_back("command");
      m->cmd = cmds;
      m->set_priority(200);
      conn->send_message(m);
    } else {
      boost::uniform_int<> u(0, rand_data.size() - 1);
      dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
    }
  }

  void send_large_message(bool inject_network_congestion=false) {
    std::lock_guard l{lock};
    ConnectionRef conn = _get_random_connection();
    uuid_d uuid;
    uuid.generate_random();
    MCommand *m = new MCommand(uuid);
    vector<string> cmds;
    cmds.push_back("command");
    // set the random data to make the large message
    bufferlist bl;
    string s("abcdefghijklmnopqrstuvwxyz");
    for (int i = 0; i < 1024*256; i++)
      bl.append(s);
    // bl is around 6M
    m->set_data(bl);
    m->cmd = cmds;
    m->set_priority(200);
    // setup after connection is ready
    if (inject_network_congestion && conn->is_connected()) {
      g_ceph_context->_conf.set_val("ms_inject_network_congestion", "100");
    } else {
      g_ceph_context->_conf.set_val("ms_inject_network_congestion", "0");
    }
    conn->send_message(m);
  }

  void drop_connection() {
    std::lock_guard l{lock};
    if (available_connections.size() < 10)
      return;
    ConnectionRef conn = _get_random_connection();
    dispatcher.clear_pending(conn);
    conn->mark_down();
    if (!client_policy.server &&
        !client_policy.lossy &&
        client_policy.standby) {
      // it's a lossless policy, so we need to mark down each side
      pair<Messenger*, Messenger*> &p = available_connections[conn];
      if (!p.first->get_default_policy().server &&
          !p.second->get_default_policy().server) {
        ASSERT_EQ(conn->get_messenger(), p.first);
        ConnectionRef peer = p.second->connect_to(p.first->get_mytype(),
                                                  p.first->get_myaddrs());
        peer->mark_down();
        dispatcher.clear_pending(peer);
        available_connections.erase(peer);
      }
    }
    ASSERT_EQ(available_connections.erase(conn), 1U);
  }

  void print_internal_state(bool detail=false) {
    std::lock_guard l{lock};
    lderr(g_ceph_context) << "available_connections: " << available_connections.size()
                          << " inflight messages: " << dispatcher.get_pending() << dendl;
    if (detail && !available_connections.empty()) {
      dispatcher.print();
    }
  }

  void wait_for_done() {
    int64_t tick_us = 1000 * 100; // 100ms
    int64_t timeout_us = 5 * 60 * 1000 * 1000; // 5 mins
    int i = 0;
    while (dispatcher.get_pending()) {
      usleep(tick_us);
      timeout_us -= tick_us;
      if (i++ % 50 == 0)
        print_internal_state(true);
      if (timeout_us < 0)
        ceph_abort_msg("loop time exceeded 5 mins, it looks like we are stuck on some problem!");
    }
    for (set<Messenger*>::iterator it = available_servers.begin();
         it != available_servers.end(); ++it) {
      (*it)->shutdown();
      (*it)->wait();
      ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
      delete (*it);
    }
    available_servers.clear();

    for (set<Messenger*>::iterator it = available_clients.begin();
         it != available_clients.end(); ++it) {
      (*it)->shutdown();
      (*it)->wait();
      ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
      delete (*it);
    }
    available_clients.clear();
  }

  void handle_reset(Connection *con) {
    std::lock_guard l{lock};
    available_connections.erase(con);
    dispatcher.clear_pending(con);
  }
};

bool SyntheticDispatcher::ms_handle_reset(Connection *con) {
  workload->handle_reset(con);
  return true;
}

TEST_P(MessengerTest, SyntheticStressTest) {
  SyntheticWorkload test_msg(8, 32, GetParam(), 100,
                             Messenger::Policy::stateful_server(0),
                             Messenger::Policy::lossless_client(0));
  for (int i = 0; i < 100; ++i) {
    if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
    test_msg.generate_connection();
  }
  gen_type rng(time(NULL));
  for (int i = 0; i < 5000; ++i) {
    if (!(i % 10)) {
      lderr(g_ceph_context) << "Op " << i << ": " << dendl;
      test_msg.print_internal_state();
    }
    boost::uniform_int<> true_false(0, 99);
    int val = true_false(rng);
    if (val > 90) {
      test_msg.generate_connection();
    } else if (val > 80) {
      test_msg.drop_connection();
    } else if (val > 10) {
      test_msg.send_message();
    } else {
      usleep(rand() % 1000 + 500);
    }
  }
  test_msg.wait_for_done();
}

TEST_P(MessengerTest, SyntheticStressTest1) {
  SyntheticWorkload test_msg(16, 32, GetParam(), 100,
                             Messenger::Policy::lossless_peer_reuse(0),
                             Messenger::Policy::lossless_peer_reuse(0));
  for (int i = 0; i < 10; ++i) {
    if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
    test_msg.generate_connection();
  }
  gen_type rng(time(NULL));
  for (int i = 0; i < 10000; ++i) {
    if (!(i % 10)) {
      lderr(g_ceph_context) << "Op " << i << ": " << dendl;
      test_msg.print_internal_state();
    }
    boost::uniform_int<> true_false(0, 99);
    int val = true_false(rng);
    if (val > 80) {
      test_msg.generate_connection();
    } else if (val > 60) {
      test_msg.drop_connection();
    } else if (val > 10) {
      test_msg.send_message();
    } else {
      usleep(rand() % 1000 + 500);
    }
  }
  test_msg.wait_for_done();
}

TEST_P(MessengerTest, SyntheticInjectTest) {
  uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes;
  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
  g_ceph_context->_conf.set_val("ms_dispatch_throttle_bytes", "16777216");
  SyntheticWorkload test_msg(8, 32, GetParam(), 100,
                             Messenger::Policy::stateful_server(0),
                             Messenger::Policy::lossless_client(0));
  for (int i = 0; i < 100; ++i) {
    if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
    test_msg.generate_connection();
  }
  gen_type rng(time(NULL));
  for (int i = 0; i < 1000; ++i) {
    if (!(i % 10)) {
      lderr(g_ceph_context) << "Op " << i << ": " << dendl;
      test_msg.print_internal_state();
    }
    boost::uniform_int<> true_false(0, 99);
    int val = true_false(rng);
    if (val > 90) {
      test_msg.generate_connection();
    } else if (val > 80) {
      test_msg.drop_connection();
    } else if (val > 10) {
      test_msg.send_message();
    } else {
      usleep(rand() % 500 + 100);
    }
  }
  test_msg.wait_for_done();
  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
  g_ceph_context->_conf.set_val(
    "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes));
}
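// The SyntheticInject tests in this file all run the same randomized workload
// (seed some connections, then a weighted mix of connect/drop/send/sleep
// operations, then wait_for_done()); they only vary the messenger policies and
// the ms_inject_* options.  As a hedged reading of those options,
// ms_inject_socket_failures is meant to make roughly one in N socket
// operations fail and ms_inject_internal_delays to add short random stalls
// inside the messenger, so a failure in one of these tests points at a
// particular policy/injection combination rather than at the workload itself.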
TEST_P(MessengerTest, SyntheticInjectTest2) {
  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
  SyntheticWorkload test_msg(8, 16, GetParam(), 100,
                             Messenger::Policy::lossless_peer_reuse(0),
                             Messenger::Policy::lossless_peer_reuse(0));
  for (int i = 0; i < 100; ++i) {
    if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
    test_msg.generate_connection();
  }
  gen_type rng(time(NULL));
  for (int i = 0; i < 1000; ++i) {
    if (!(i % 10)) {
      lderr(g_ceph_context) << "Op " << i << ": " << dendl;
      test_msg.print_internal_state();
    }
    boost::uniform_int<> true_false(0, 99);
    int val = true_false(rng);
    if (val > 90) {
      test_msg.generate_connection();
    } else if (val > 80) {
      test_msg.drop_connection();
    } else if (val > 10) {
      test_msg.send_message();
    } else {
      usleep(rand() % 500 + 100);
    }
  }
  test_msg.wait_for_done();
  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
}

TEST_P(MessengerTest, SyntheticInjectTest3) {
  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "600");
  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
  SyntheticWorkload test_msg(8, 16, GetParam(), 100,
                             Messenger::Policy::stateless_server(0),
                             Messenger::Policy::lossy_client(0));
  for (int i = 0; i < 100; ++i) {
    if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
    test_msg.generate_connection();
  }
  gen_type rng(time(NULL));
  for (int i = 0; i < 1000; ++i) {
    if (!(i % 10)) {
      lderr(g_ceph_context) << "Op " << i << ": " << dendl;
      test_msg.print_internal_state();
    }
    boost::uniform_int<> true_false(0, 99);
    int val = true_false(rng);
    if (val > 90) {
      test_msg.generate_connection();
    } else if (val > 80) {
      test_msg.drop_connection();
    } else if (val > 10) {
      test_msg.send_message();
    } else {
      usleep(rand() % 500 + 100);
    }
  }
  test_msg.wait_for_done();
  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
}

TEST_P(MessengerTest, SyntheticInjectTest4) {
  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
  g_ceph_context->_conf.set_val("ms_inject_delay_probability", "1");
  g_ceph_context->_conf.set_val("ms_inject_delay_type", "client osd");
  g_ceph_context->_conf.set_val("ms_inject_delay_max", "5");
  SyntheticWorkload test_msg(16, 32, GetParam(), 100,
                             Messenger::Policy::lossless_peer(0),
                             Messenger::Policy::lossless_peer(0));
  for (int i = 0; i < 100; ++i) {
    if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
    test_msg.generate_connection();
  }
  gen_type rng(time(NULL));
  for (int i = 0; i < 1000; ++i) {
    if (!(i % 10)) {
      lderr(g_ceph_context) << "Op " << i << ": " << dendl;
      test_msg.print_internal_state();
    }
    boost::uniform_int<> true_false(0, 99);
    int val = true_false(rng);
    if (val > 95) {
      test_msg.generate_connection();
    } else if (val > 80) {
      // test_msg.drop_connection();
    } else if (val > 10) {
      test_msg.send_message();
    } else {
      usleep(rand() % 500 + 100);
    }
  }
  test_msg.wait_for_done();
  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
  g_ceph_context->_conf.set_val("ms_inject_delay_probability", "0");
  g_ceph_context->_conf.set_val("ms_inject_delay_type", "");
  g_ceph_context->_conf.set_val("ms_inject_delay_max", "0");
}
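// A minimal sketch, not used by the tests above: each SyntheticInject test
// restores the fault-injection options it touched by hand, and a hypothetical
// helper like the one below could centralize that cleanup.  It assumes
// nothing beyond the config options already set elsewhere in this file.
[[maybe_unused]] static void reset_failure_injection_sketch()
{
  // Return every ms_inject_* knob used by the SyntheticInject tests to its
  // non-injecting value.
  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
  g_ceph_context->_conf.set_val("ms_inject_delay_probability", "0");
  g_ceph_context->_conf.set_val("ms_inject_delay_type", "");
  g_ceph_context->_conf.set_val("ms_inject_delay_max", "0");
}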
g_ceph_context->_conf.set_val("ms_inject_delay_max", "0"); } // This is test for network block, means ::send return EAGAIN TEST_P(MessengerTest, SyntheticInjectTest5) { SyntheticWorkload test_msg(1, 8, GetParam(), 100, Messenger::Policy::stateful_server(0), Messenger::Policy::lossless_client(0), 64, 2); bool simulate_network_congestion = true; for (int i = 0; i < 2; ++i) test_msg.generate_connection(); for (int i = 0; i < 5000; ++i) { if (!(i % 10)) { ldout(g_ceph_context, 0) << "Op " << i << ": " << dendl; test_msg.print_internal_state(); } if (i < 1600) { // means that we would stuck 1600 * 6M (9.6G) around with 2 connections test_msg.send_large_message(simulate_network_congestion); } else { simulate_network_congestion = false; test_msg.send_large_message(simulate_network_congestion); } } test_msg.wait_for_done(); } class MarkdownDispatcher : public Dispatcher { ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock"); set conns; bool last_mark; public: std::atomic count = { 0 }; explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), last_mark(false) { } bool ms_can_fast_dispatch_any() const override { return false; } bool ms_can_fast_dispatch(const Message *m) const override { switch (m->get_type()) { case CEPH_MSG_PING: return true; default: return false; } } void ms_handle_fast_connect(Connection *con) override { lderr(g_ceph_context) << __func__ << " " << con << dendl; std::lock_guard l{lock}; conns.insert(con); } void ms_handle_fast_accept(Connection *con) override { std::lock_guard l{lock}; conns.insert(con); } bool ms_dispatch(Message *m) override { lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl; std::lock_guard l{lock}; count++; conns.insert(m->get_connection()); if (conns.size() < 2 && !last_mark) { m->put(); return true; } last_mark = true; usleep(rand() % 500); for (set::iterator it = conns.begin(); it != conns.end(); ++it) { if ((*it) != m->get_connection().get()) { (*it)->mark_down(); conns.erase(it); break; } } if (conns.empty()) last_mark = false; m->put(); return true; } bool ms_handle_reset(Connection *con) override { lderr(g_ceph_context) << __func__ << " " << con << dendl; std::lock_guard l{lock}; conns.erase(con); usleep(rand() % 500); return true; } void ms_handle_remote_reset(Connection *con) override { std::lock_guard l{lock}; conns.erase(con); lderr(g_ceph_context) << __func__ << " " << con << dendl; } bool ms_handle_refused(Connection *con) override { return false; } void ms_fast_dispatch(Message *m) override { ceph_abort(); } int ms_handle_fast_authentication(Connection *con) override { return 1; } }; // Markdown with external lock TEST_P(MessengerTest, MarkdownTest) { Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid()); MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true); DummyAuthClientServer dummy_auth(g_ceph_context); dummy_auth.auth_registry.refresh_config(); entity_addr_t bind_addr; bind_addr.parse("v2:127.0.0.1:16800"); server_msgr->bind(bind_addr); server_msgr->add_dispatcher_head(&srv_dispatcher); server_msgr->set_auth_client(&dummy_auth); server_msgr->set_auth_server(&dummy_auth); server_msgr->start(); bind_addr.parse("v2:127.0.0.1:16801"); server_msgr2->bind(bind_addr); server_msgr2->add_dispatcher_head(&srv_dispatcher); server_msgr2->set_auth_client(&dummy_auth); server_msgr2->set_auth_server(&dummy_auth); server_msgr2->start(); client_msgr->add_dispatcher_head(&cli_dispatcher); client_msgr->set_auth_client(&dummy_auth); 
// Markdown with external lock
TEST_P(MessengerTest, MarkdownTest) {
  Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()),
                                              entity_name_t::OSD(0), "server", getpid());
  MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
  DummyAuthClientServer dummy_auth(g_ceph_context);
  dummy_auth.auth_registry.refresh_config();
  entity_addr_t bind_addr;
  bind_addr.parse("v2:127.0.0.1:16800");
  server_msgr->bind(bind_addr);
  server_msgr->add_dispatcher_head(&srv_dispatcher);
  server_msgr->set_auth_client(&dummy_auth);
  server_msgr->set_auth_server(&dummy_auth);
  server_msgr->start();
  bind_addr.parse("v2:127.0.0.1:16801");
  server_msgr2->bind(bind_addr);
  server_msgr2->add_dispatcher_head(&srv_dispatcher);
  server_msgr2->set_auth_client(&dummy_auth);
  server_msgr2->set_auth_server(&dummy_auth);
  server_msgr2->start();

  client_msgr->add_dispatcher_head(&cli_dispatcher);
  client_msgr->set_auth_client(&dummy_auth);
  client_msgr->set_auth_server(&dummy_auth);
  client_msgr->start();

  int i = 1000;
  uint64_t last = 0;
  bool equal = false;
  uint64_t equal_count = 0;
  while (i--) {
    ConnectionRef conn1 = client_msgr->connect_to(server_msgr->get_mytype(),
                                                  server_msgr->get_myaddrs());
    ConnectionRef conn2 = client_msgr->connect_to(server_msgr2->get_mytype(),
                                                  server_msgr2->get_myaddrs());
    MPing *m = new MPing();
    ASSERT_EQ(conn1->send_message(m), 0);
    m = new MPing();
    ASSERT_EQ(conn2->send_message(m), 0);
    CHECK_AND_WAIT_TRUE(srv_dispatcher.count > last + 1);
    if (srv_dispatcher.count == last) {
      lderr(g_ceph_context) << __func__ << " last is " << last << dendl;
      equal = true;
      equal_count++;
    } else {
      equal = false;
      equal_count = 0;
    }
    last = srv_dispatcher.count;
    if (equal_count)
      usleep(1000 * 500);
    ASSERT_FALSE(equal && equal_count > 3);
  }
  server_msgr->shutdown();
  client_msgr->shutdown();
  server_msgr2->shutdown();
  server_msgr->wait();
  client_msgr->wait();
  server_msgr2->wait();
  delete server_msgr2;
}

INSTANTIATE_TEST_SUITE_P(
  Messenger,
  MessengerTest,
  ::testing::Values(
    "async+posix"
  )
);

int main(int argc, char **argv) {
  auto args = argv_to_vec(argc, argv);
  auto cct = global_init(NULL, args,
                         CEPH_ENTITY_TYPE_CLIENT,
                         CODE_ENVIRONMENT_UTILITY,
                         CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
  g_ceph_context->_conf.set_val("auth_cluster_required", "none");
  g_ceph_context->_conf.set_val("auth_service_required", "none");
  g_ceph_context->_conf.set_val("auth_client_required", "none");
  g_ceph_context->_conf.set_val("keyring", "/dev/null");
  g_ceph_context->_conf.set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
  g_ceph_context->_conf.set_val("ms_die_on_bad_msg", "true");
  g_ceph_context->_conf.set_val("ms_die_on_old_message", "true");
  g_ceph_context->_conf.set_val("ms_max_backoff", "1");
  common_init_finish(g_ceph_context);
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

/*
 * Local Variables:
 * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"
 * End:
 */