// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "common/ceph_argparse.h" #include "common/ceph_time.h" #include "messages/MPing.h" #include "messages/MCommand.h" #include "messages/MCommandReply.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" #include "crimson/auth/DummyAuth.h" #include "crimson/common/log.h" #include "crimson/net/Connection.h" #include "crimson/net/Dispatcher.h" #include "crimson/net/Messenger.h" #include "crimson/net/Interceptor.h" #include #include #include #include #include #include #include #include #include #include #include #include #include "test_messenger.h" using namespace std::chrono_literals; namespace bpo = boost::program_options; using crimson::common::local_conf; namespace { seastar::logger& logger() { return crimson::get_logger(ceph_subsys_test); } static std::random_device rd; static std::default_random_engine rng{rd()}; static bool verbose = false; static entity_addr_t get_server_addr() { static int port = 9030; ++port; entity_addr_t saddr; saddr.parse("127.0.0.1", nullptr); saddr.set_port(port); return saddr; } template seastar::future create_sharded(Args... args) { // we should only construct/stop shards on #0 return seastar::smp::submit_to(0, [=] { auto sharded_obj = seastar::make_lw_shared>(); return sharded_obj->start(args... ).then([sharded_obj] { seastar::engine().at_exit([sharded_obj] { return sharded_obj->stop().then([sharded_obj] {}); }); return sharded_obj.get(); }); }).then([](seastar::sharded *ptr_shard) { return &ptr_shard->local(); }); } class ShardedGates : public seastar::peering_sharded_service { public: ShardedGates() = default; ~ShardedGates() { assert(gate.is_closed()); } template void dispatch_in_background(const char *what, Func &&f) { std::ignore = seastar::with_gate( container().local().gate, std::forward(f) ).handle_exception([what](std::exception_ptr eptr) { try { std::rethrow_exception(eptr); } catch (std::exception &e) { logger().error("ShardedGates::dispatch_in_background: " "{} got exxception {}", what, e.what()); } }); } seastar::future<> close() { return container().invoke_on_all([](auto &local) { return local.gate.close(); }); } static seastar::future create() { return create_sharded(); } // seastar::future<> stop() is intentially not implemented private: seastar::gate gate; }; static seastar::future<> test_echo(unsigned rounds, double keepalive_ratio) { struct test_state { struct Server final : public crimson::net::Dispatcher { ShardedGates &gates; crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; Server(ShardedGates &gates) : gates{gates} {} void ms_handle_accept( crimson::net::ConnectionRef conn, seastar::shard_id prv_shard, bool is_replace) override { logger().info("server accepted {}", *conn); ceph_assert(prv_shard == seastar::this_shard_id()); ceph_assert(!is_replace); } std::optional> ms_dispatch( crimson::net::ConnectionRef c, MessageRef m) override { if (verbose) { logger().info("server got {}", *m); } // reply with a pong gates.dispatch_in_background("echo_send_pong", [c] { return c->send(crimson::make_message()); }); return {seastar::now()}; } seastar::future<> init(const entity_name_t& name, const std::string& lname, const uint64_t nonce, const entity_addr_t& addr) { msgr = crimson::net::Messenger::create( name, lname, nonce, false); msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); return msgr->bind(entity_addrvec_t{addr}).safe_then([this] { return msgr->start({this}); }, crimson::net::Messenger::bind_ertr::all_same_way( [addr] (const std::error_code& e) { logger().error("test_echo(): " "there is another instance running at {}", addr); ceph_abort(); })); } seastar::future<> shutdown() { ceph_assert(msgr); msgr->stop(); return msgr->shutdown(); } }; class Client final : public crimson::net::Dispatcher, public seastar::peering_sharded_service { public: Client(seastar::shard_id primary_sid, unsigned rounds, double keepalive_ratio, ShardedGates *gates) : primary_sid{primary_sid}, keepalive_dist(std::bernoulli_distribution{keepalive_ratio}), rounds(rounds), gates{*gates} {} seastar::future<> init(const entity_name_t& name, const std::string& lname, const uint64_t nonce) { assert(seastar::this_shard_id() == primary_sid); msgr = crimson::net::Messenger::create( name, lname, nonce, false); msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); return msgr->start({this}); } seastar::future<> shutdown() { assert(seastar::this_shard_id() == primary_sid); ceph_assert(msgr); msgr->stop(); return msgr->shutdown(); } seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) { assert(seastar::this_shard_id() == primary_sid); mono_time start_time = mono_clock::now(); auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD); return seastar::futurize_invoke([this, conn] { return do_dispatch_pingpong(conn); }).then([] { // 500ms should be enough to establish the connection return seastar::sleep(500ms); }).then([this, conn, start_time] { return container().invoke_on( conn->get_shard_id(), [pconn=&*conn, start_time](auto &local) { assert(pconn->is_connected()); auto session = local.find_session(pconn); std::chrono::duration dur_handshake = session->connected_time - start_time; std::chrono::duration dur_pingpong = session->finish_time - session->connected_time; logger().info("{}: handshake {}, pingpong {}", *pconn, dur_handshake.count(), dur_pingpong.count()); }).then([conn] {}); }); } static seastar::future create( unsigned rounds, double keepalive_ratio, ShardedGates *gates) { return create_sharded( seastar::this_shard_id(), rounds, keepalive_ratio, gates); } private: struct PingSession : public seastar::enable_shared_from_this { unsigned count = 0u; mono_time connected_time; mono_time finish_time; }; using PingSessionRef = seastar::shared_ptr; void ms_handle_connect( crimson::net::ConnectionRef conn, seastar::shard_id prv_shard) override { auto &local = container().local(); assert(prv_shard == seastar::this_shard_id()); auto session = seastar::make_shared(); auto [i, added] = local.sessions.emplace(&*conn, session); std::ignore = i; ceph_assert(added); session->connected_time = mono_clock::now(); } std::optional> ms_dispatch( crimson::net::ConnectionRef c, MessageRef m) override { auto &local = container().local(); auto session = local.find_session(&*c); ++(session->count); if (verbose) { logger().info("client ms_dispatch {}", session->count); } if (session->count > rounds) { logger().error("{}: got {} pongs, more than expected {}", *c, session->count, rounds); ceph_abort(); } else if (session->count == rounds) { logger().info("{}: finished receiving {} pongs", *c, session->count); session->finish_time = mono_clock::now(); gates.dispatch_in_background("echo_notify_done", [c, this] { return container().invoke_on(primary_sid, [pconn=&*c](auto &local) { auto found = local.pending_conns.find(pconn); ceph_assert(found != local.pending_conns.end()); found->second.set_value(); }).then([c] {}); }); } return {seastar::now()}; } PingSessionRef find_session(crimson::net::Connection *c) { auto found = sessions.find(c); if (found == sessions.end()) { ceph_assert(false); } return found->second; } seastar::future<> do_dispatch_pingpong(crimson::net::ConnectionRef conn) { auto [i, added] = pending_conns.emplace(&*conn, seastar::promise<>()); std::ignore = i; ceph_assert(added); return seastar::do_with(0u, 0u, [this, conn](auto &count_ping, auto &count_keepalive) { return seastar::do_until( [this, conn, &count_ping, &count_keepalive] { bool stop = (count_ping == rounds); if (stop) { logger().info("{}: finished sending {} pings with {} keepalives", *conn, count_ping, count_keepalive); } return stop; }, [this, conn, &count_ping, &count_keepalive] { return seastar::repeat([this, conn, &count_ping, &count_keepalive] { if (keepalive_dist(rng)) { return conn->send_keepalive( ).then([&count_keepalive] { count_keepalive += 1; return seastar::make_ready_future( seastar::stop_iteration::no); }); } else { return conn->send(crimson::make_message() ).then([&count_ping] { count_ping += 1; return seastar::make_ready_future( seastar::stop_iteration::yes); }); } }); }).then([this, conn] { auto found = pending_conns.find(&*conn); assert(found != pending_conns.end()); return found->second.get_future(); } ); }); } private: // primary shard only const seastar::shard_id primary_sid; std::bernoulli_distribution keepalive_dist; crimson::net::MessengerRef msgr; std::map> pending_conns; crimson::auth::DummyAuthClientServer dummy_auth; // per shard const unsigned rounds; std::map sessions; ShardedGates &gates; }; }; logger().info("test_echo(rounds={}, keepalive_ratio={}):", rounds, keepalive_ratio); return ShardedGates::create( ).then([rounds, keepalive_ratio](auto *gates) { return seastar::when_all_succeed( test_state::Client::create(rounds, keepalive_ratio, gates), test_state::Client::create(rounds, keepalive_ratio, gates), seastar::make_ready_future(gates)); }).then_unpack([](auto *client1, auto *client2, auto *gates) { auto server1 = seastar::make_shared(*gates); auto server2 = seastar::make_shared(*gates); // start servers and clients auto addr1 = get_server_addr(); auto addr2 = get_server_addr(); addr1.set_type(entity_addr_t::TYPE_MSGR2); addr2.set_type(entity_addr_t::TYPE_MSGR2); return seastar::when_all_succeed( server1->init(entity_name_t::OSD(0), "server1", 1, addr1), server2->init(entity_name_t::OSD(1), "server2", 2, addr2), client1->init(entity_name_t::OSD(2), "client1", 3), client2->init(entity_name_t::OSD(3), "client2", 4) // dispatch pingpoing ).then_unpack([client1, client2, server1, server2] { return seastar::when_all_succeed( // test connecting in parallel, accepting in parallel client1->dispatch_pingpong(server1->msgr->get_myaddr()), client1->dispatch_pingpong(server2->msgr->get_myaddr()), client2->dispatch_pingpong(server1->msgr->get_myaddr()), client2->dispatch_pingpong(server2->msgr->get_myaddr())); // shutdown }).then_unpack([client1] { logger().info("client1 shutdown..."); return client1->shutdown(); }).then([client2] { logger().info("client2 shutdown..."); return client2->shutdown(); }).then([server1] { logger().info("server1 shutdown..."); return server1->shutdown(); }).then([server2] { logger().info("server2 shutdown..."); return server2->shutdown(); }).then([] { logger().info("test_echo() done!\n"); }).handle_exception([](auto eptr) { logger().error("test_echo() failed: got exception {}", eptr); throw; }).finally([gates, server1, server2] { return gates->close(); }); }); } seastar::future<> test_preemptive_shutdown() { struct test_state { class Server final : public crimson::net::Dispatcher { crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; std::optional> ms_dispatch( crimson::net::ConnectionRef c, MessageRef m) override { std::ignore = c->send(crimson::make_message()); return {seastar::now()}; } public: seastar::future<> init(const entity_name_t& name, const std::string& lname, const uint64_t nonce, const entity_addr_t& addr) { msgr = crimson::net::Messenger::create( name, lname, nonce, true); msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); return msgr->bind(entity_addrvec_t{addr}).safe_then([this] { return msgr->start({this}); }, crimson::net::Messenger::bind_ertr::all_same_way( [addr] (const std::error_code& e) { logger().error("test_preemptive_shutdown(): " "there is another instance running at {}", addr); ceph_abort(); })); } entity_addr_t get_addr() const { return msgr->get_myaddr(); } seastar::future<> shutdown() { msgr->stop(); return msgr->shutdown(); } }; class Client final : public crimson::net::Dispatcher { crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; bool stop_send = false; seastar::promise<> stopped_send_promise; std::optional> ms_dispatch( crimson::net::ConnectionRef, MessageRef m) override { return {seastar::now()}; } public: seastar::future<> init(const entity_name_t& name, const std::string& lname, const uint64_t nonce) { msgr = crimson::net::Messenger::create( name, lname, nonce, true); msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); return msgr->start({this}); } void send_pings(const entity_addr_t& addr) { auto conn = msgr->connect(addr, entity_name_t::TYPE_OSD); // forwarded to stopped_send_promise (void) seastar::do_until( [this] { return stop_send; }, [conn] { return conn->send(crimson::make_message()).then([] { return seastar::sleep(0ms); }); } ).then_wrapped([this, conn] (auto fut) { fut.forward_to(std::move(stopped_send_promise)); }); } seastar::future<> shutdown() { msgr->stop(); return msgr->shutdown().then([this] { stop_send = true; return stopped_send_promise.get_future(); }); } }; }; logger().info("test_preemptive_shutdown():"); auto server = seastar::make_shared(); auto client = seastar::make_shared(); auto addr = get_server_addr(); addr.set_type(entity_addr_t::TYPE_MSGR2); addr.set_family(AF_INET); return seastar::when_all_succeed( server->init(entity_name_t::OSD(6), "server4", 7, addr), client->init(entity_name_t::OSD(7), "client4", 8) ).then_unpack([server, client] { client->send_pings(server->get_addr()); return seastar::sleep(100ms); }).then([client] { logger().info("client shutdown..."); return client->shutdown(); }).then([server] { logger().info("server shutdown..."); return server->shutdown(); }).then([] { logger().info("test_preemptive_shutdown() done!\n"); }).handle_exception([server, client] (auto eptr) { logger().error("test_preemptive_shutdown() failed: got exception {}", eptr); throw; }); } using ceph::msgr::v2::Tag; using crimson::net::bp_action_t; using crimson::net::bp_type_t; using crimson::net::Breakpoint; using crimson::net::Connection; using crimson::net::ConnectionRef; using crimson::net::custom_bp_t; using crimson::net::Dispatcher; using crimson::net::Interceptor; using crimson::net::Messenger; using crimson::net::MessengerRef; using crimson::net::SocketPolicy; using crimson::net::tag_bp_t; using namespace ceph::net::test; struct counter_t { unsigned counter = 0; }; enum class conn_state_t { unknown = 0, established, closed, replaced, }; std::ostream& operator<<(std::ostream& out, const conn_state_t& state) { switch(state) { case conn_state_t::unknown: return out << "unknown"; case conn_state_t::established: return out << "established"; case conn_state_t::closed: return out << "closed"; case conn_state_t::replaced: return out << "replaced"; default: ceph_abort(); } } } // anonymous namespace #if FMT_VERSION >= 90000 template<> struct fmt::formatter : fmt::ostream_formatter {}; #endif namespace { struct ConnResult { ConnectionRef conn; unsigned index; conn_state_t state = conn_state_t::unknown; unsigned connect_attempts = 0; unsigned client_connect_attempts = 0; unsigned client_reconnect_attempts = 0; unsigned cnt_connect_dispatched = 0; unsigned accept_attempts = 0; unsigned server_connect_attempts = 0; unsigned server_reconnect_attempts = 0; unsigned cnt_accept_dispatched = 0; unsigned cnt_reset_dispatched = 0; unsigned cnt_remote_reset_dispatched = 0; ConnResult(ConnectionRef conn, unsigned index) : conn(conn), index(index) {} template void _assert_eq(const char* expr_actual, T actual, const char* expr_expected, T expected) const { if (actual != expected) { throw std::runtime_error(fmt::format( "[{}] {} '{}' is actually {}, not the expected '{}' {}", index, *conn, expr_actual, actual, expr_expected, expected)); } } #define ASSERT_EQUAL(actual, expected) \ _assert_eq(#actual, actual, #expected, expected) void assert_state_at(conn_state_t expected) const { ASSERT_EQUAL(state, expected); } void assert_connect(unsigned attempts, unsigned connects, unsigned reconnects, unsigned dispatched) const { ASSERT_EQUAL(connect_attempts, attempts); ASSERT_EQUAL(client_connect_attempts, connects); ASSERT_EQUAL(client_reconnect_attempts, reconnects); ASSERT_EQUAL(cnt_connect_dispatched, dispatched); } void assert_connect(unsigned attempts, unsigned dispatched) const { ASSERT_EQUAL(connect_attempts, attempts); ASSERT_EQUAL(cnt_connect_dispatched, dispatched); } void assert_accept(unsigned attempts, unsigned accepts, unsigned reaccepts, unsigned dispatched) const { ASSERT_EQUAL(accept_attempts, attempts); ASSERT_EQUAL(server_connect_attempts, accepts); ASSERT_EQUAL(server_reconnect_attempts, reaccepts); ASSERT_EQUAL(cnt_accept_dispatched, dispatched); } void assert_accept(unsigned attempts, unsigned dispatched) const { ASSERT_EQUAL(accept_attempts, attempts); ASSERT_EQUAL(cnt_accept_dispatched, dispatched); } void assert_reset(unsigned local, unsigned remote) const { ASSERT_EQUAL(cnt_reset_dispatched, local); ASSERT_EQUAL(cnt_remote_reset_dispatched, remote); } void dump() const { logger().info("\nResult({}):\n" " conn: [{}] {}:\n" " state: {}\n" " connect_attempts: {}\n" " client_connect_attempts: {}\n" " client_reconnect_attempts: {}\n" " cnt_connect_dispatched: {}\n" " accept_attempts: {}\n" " server_connect_attempts: {}\n" " server_reconnect_attempts: {}\n" " cnt_accept_dispatched: {}\n" " cnt_reset_dispatched: {}\n" " cnt_remote_reset_dispatched: {}\n", static_cast(this), index, *conn, state, connect_attempts, client_connect_attempts, client_reconnect_attempts, cnt_connect_dispatched, accept_attempts, server_connect_attempts, server_reconnect_attempts, cnt_accept_dispatched, cnt_reset_dispatched, cnt_remote_reset_dispatched); } }; using ConnResults = std::vector; struct TestInterceptor : public Interceptor { std::map> breakpoints; std::map breakpoints_counter; std::map conns; ConnResults results; std::optional signal; const seastar::shard_id primary_sid; TestInterceptor() : primary_sid{seastar::this_shard_id()} {} // only used for copy breakpoint configurations TestInterceptor(const TestInterceptor& other) : primary_sid{other.primary_sid} { assert(other.breakpoints_counter.empty()); assert(other.conns.empty()); assert(other.results.empty()); breakpoints = other.breakpoints; assert(!other.signal); assert(seastar::this_shard_id() == primary_sid); } void make_fault(Breakpoint bp, unsigned round = 1) { assert(round >= 1); breakpoints[bp][round] = bp_action_t::FAULT; } void make_block(Breakpoint bp, unsigned round = 1) { assert(round >= 1); breakpoints[bp][round] = bp_action_t::BLOCK; } void make_stall(Breakpoint bp, unsigned round = 1) { assert(round >= 1); breakpoints[bp][round] = bp_action_t::STALL; } ConnResult* find_result(Connection *conn) { assert(seastar::this_shard_id() == primary_sid); auto it = conns.find(conn); if (it == conns.end()) { return nullptr; } else { return &results[it->second]; } } seastar::future<> wait() { assert(seastar::this_shard_id() == primary_sid); assert(!signal); signal = seastar::abort_source(); return seastar::sleep_abortable(10s, *signal).then([] { throw std::runtime_error("Timeout (10s) in TestInterceptor::wait()"); }).handle_exception_type([] (const seastar::sleep_aborted& e) { // wait done! }); } void notify() { assert(seastar::this_shard_id() == primary_sid); if (signal) { signal->request_abort(); signal = std::nullopt; } } private: void register_conn(ConnectionRef conn) override { auto result = find_result(&*conn); if (result != nullptr) { logger().error("The connection [{}] {} already exists when register {}", result->index, *result->conn, *conn); ceph_abort(); } unsigned index = results.size(); results.emplace_back(conn, index); conns[&*conn] = index; notify(); logger().info("[{}] {} new connection registered", index, *conn); } void register_conn_closed(ConnectionRef conn) override { auto result = find_result(&*conn); if (result == nullptr) { logger().error("Untracked closed connection: {}", *conn); ceph_abort(); } if (result->state != conn_state_t::replaced) { result->state = conn_state_t::closed; } notify(); logger().info("[{}] {} closed({})", result->index, *conn, result->state); } void register_conn_ready(ConnectionRef conn) override { auto result = find_result(&*conn); if (result == nullptr) { logger().error("Untracked ready connection: {}", *conn); ceph_abort(); } ceph_assert(conn->is_protocol_ready()); notify(); logger().info("[{}] {} ready", result->index, *conn); } void register_conn_replaced(ConnectionRef conn) override { auto result = find_result(&*conn); if (result == nullptr) { logger().error("Untracked replaced connection: {}", *conn); ceph_abort(); } result->state = conn_state_t::replaced; logger().info("[{}] {} {}", result->index, *conn, result->state); } seastar::future intercept(Connection &_conn, std::vector bps) override { assert(bps.size() >= 1); Connection *conn = &_conn; return seastar::smp::submit_to(primary_sid, [conn, bps, this] { std::vector actions; for (const Breakpoint &bp : bps) { ++breakpoints_counter[bp].counter; auto result = find_result(&*conn); if (result == nullptr) { logger().error("Untracked intercepted connection: {}, at breakpoint {}({})", *conn, bp, breakpoints_counter[bp].counter); ceph_abort(); } if (bp == custom_bp_t::SOCKET_CONNECTING) { ++result->connect_attempts; logger().info("[Test] connect_attempts={}", result->connect_attempts); } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) { ++result->client_connect_attempts; logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts); } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) { ++result->client_reconnect_attempts; logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts); } else if (bp == custom_bp_t::SOCKET_ACCEPTED) { ++result->accept_attempts; logger().info("[Test] accept_attempts={}", result->accept_attempts); } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) { ++result->server_connect_attempts; logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts); } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) { ++result->server_reconnect_attempts; logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts); } auto it_bp = breakpoints.find(bp); if (it_bp != breakpoints.end()) { auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter); if (it_cnt != it_bp->second.end()) { logger().info("[{}] {} intercepted {}({}) => {}", result->index, *conn, bp, breakpoints_counter[bp].counter, it_cnt->second); actions.emplace_back(it_cnt->second); continue; } } logger().info("[{}] {} intercepted {}({})", result->index, *conn, bp, breakpoints_counter[bp].counter); actions.emplace_back(bp_action_t::CONTINUE); } bp_action_t action = bp_action_t::CONTINUE; for (bp_action_t &a : actions) { if (a != bp_action_t::CONTINUE) { if (action == bp_action_t::CONTINUE) { action = a; } else { ceph_abort("got multiple incompatible actions"); } } } return seastar::make_ready_future(action); }); } }; SocketPolicy to_socket_policy(policy_t policy) { switch (policy) { case policy_t::stateful_server: return SocketPolicy::stateful_server(0); case policy_t::stateless_server: return SocketPolicy::stateless_server(0); case policy_t::lossless_peer: return SocketPolicy::lossless_peer(0); case policy_t::lossless_peer_reuse: return SocketPolicy::lossless_peer_reuse(0); case policy_t::lossy_client: return SocketPolicy::lossy_client(0); case policy_t::lossless_client: return SocketPolicy::lossless_client(0); default: logger().error("unexpected policy type"); ceph_abort(); } } class FailoverSuite : public Dispatcher { crimson::auth::DummyAuthClientServer dummy_auth; MessengerRef test_msgr; const entity_addr_t test_peer_addr; TestInterceptor interceptor; unsigned tracked_index = 0; Connection *tracked_conn = nullptr; unsigned pending_send = 0; unsigned pending_peer_receive = 0; unsigned pending_receive = 0; ShardedGates &gates; const seastar::shard_id primary_sid; std::optional> ms_dispatch( ConnectionRef conn_ref, MessageRef m) override { ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); Connection *conn = &*conn_ref; gates.dispatch_in_background("TestSuite_ms_dispatch", [this, conn, conn_ref] { return seastar::smp::submit_to(primary_sid, [this, conn] { auto result = interceptor.find_result(&*conn); if (result == nullptr) { logger().error("Untracked ms dispatched connection: {}", *conn); ceph_abort(); } if (tracked_conn != &*conn) { logger().warn("[{}] {} got op, but doesn't match tracked_conn [{}] {}", result->index, *conn, tracked_index, *tracked_conn); } else { ceph_assert(result->index == tracked_index); } ceph_assert(pending_receive > 0); --pending_receive; if (pending_receive == 0) { interceptor.notify(); } logger().info("[Test] got op, left {} ops -- [{}] {}", pending_receive, result->index, *conn); }).then([conn_ref] {}); }); return {seastar::now()}; } void ms_handle_accept( ConnectionRef conn_ref, seastar::shard_id prv_shard, bool is_replace) override { Connection *conn = &*conn_ref; gates.dispatch_in_background("TestSuite_ms_dispatch", [this, conn, conn_ref] { return seastar::smp::submit_to(primary_sid, [this, conn] { auto result = interceptor.find_result(&*conn); if (result == nullptr) { logger().error("Untracked accepted connection: {}", *conn); ceph_abort(); } if (tracked_conn && !tracked_conn->is_protocol_closed() && tracked_conn != &*conn) { logger().error("[{}] {} got accepted, but there's already a valid traced_conn [{}] {}", result->index, *conn, tracked_index, *tracked_conn); ceph_abort(); } tracked_index = result->index; tracked_conn = &*conn; ++result->cnt_accept_dispatched; logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}", result->cnt_accept_dispatched, result->index, *conn); return flush_pending_send(); }).then([conn_ref] {}); }); } void ms_handle_connect( ConnectionRef conn_ref, seastar::shard_id prv_shard) override { Connection *conn = &*conn_ref; gates.dispatch_in_background("TestSuite_ms_dispatch", [this, conn, conn_ref] { return seastar::smp::submit_to(primary_sid, [this, conn] { auto result = interceptor.find_result(&*conn); if (result == nullptr) { logger().error("Untracked connected connection: {}", *conn); ceph_abort(); } if (tracked_conn && !tracked_conn->is_protocol_closed() && tracked_conn != &*conn) { logger().error("[{}] {} got connected, but there's already a avlid tracked_conn [{}] {}", result->index, *conn, tracked_index, *tracked_conn); ceph_abort(); } if (tracked_conn == &*conn) { ceph_assert(result->index == tracked_index); } ++result->cnt_connect_dispatched; logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}", result->cnt_connect_dispatched, result->index, *conn); }).then([conn_ref] {}); }); } void ms_handle_reset( ConnectionRef conn_ref, bool is_replace) override { Connection *conn = &*conn_ref; gates.dispatch_in_background("TestSuite_ms_dispatch", [this, conn, conn_ref] { return seastar::smp::submit_to(primary_sid, [this, conn] { auto result = interceptor.find_result(&*conn); if (result == nullptr) { logger().error("Untracked reset connection: {}", *conn); ceph_abort(); } if (tracked_conn != &*conn) { logger().warn("[{}] {} got reset, but doesn't match tracked_conn [{}] {}", result->index, *conn, tracked_index, *tracked_conn); } else { ceph_assert(result->index == tracked_index); tracked_index = 0; tracked_conn = nullptr; } ++result->cnt_reset_dispatched; logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}", result->cnt_reset_dispatched, result->index, *conn); }).then([conn_ref] {}); }); } void ms_handle_remote_reset( ConnectionRef conn_ref) override { Connection *conn = &*conn_ref; gates.dispatch_in_background("TestSuite_ms_dispatch", [this, conn, conn_ref] { return seastar::smp::submit_to(primary_sid, [this, conn] { auto result = interceptor.find_result(&*conn); if (result == nullptr) { logger().error("Untracked remotely reset connection: {}", *conn); ceph_abort(); } if (tracked_conn != &*conn) { logger().warn("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}", result->index, *conn, tracked_index, *tracked_conn); } else { ceph_assert(result->index == tracked_index); } ++result->cnt_remote_reset_dispatched; logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}", result->cnt_remote_reset_dispatched, result->index, *conn); }).then([conn_ref] {}); }); } private: seastar::future<> init(entity_addr_t test_addr, SocketPolicy policy) { test_msgr->set_default_policy(policy); test_msgr->set_auth_client(&dummy_auth); test_msgr->set_auth_server(&dummy_auth); test_msgr->set_interceptor(&interceptor); return test_msgr->bind(entity_addrvec_t{test_addr}).safe_then([this] { return test_msgr->start({this}); }, Messenger::bind_ertr::all_same_way([test_addr] (const std::error_code& e) { logger().error("FailoverSuite: " "there is another instance running at {}", test_addr); ceph_abort(); })); } seastar::future<> send_op(bool expect_reply=true) { ceph_assert(tracked_conn); ceph_assert(!tracked_conn->is_protocol_closed()); if (expect_reply) { ++pending_peer_receive; } pg_t pgid; object_locator_t oloc; hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), pgid.pool(), oloc.nspace); spg_t spgid(pgid); return tracked_conn->send(crimson::make_message(0, 0, hobj, spgid, 0, 0, 0)); } seastar::future<> flush_pending_send() { if (pending_send != 0) { logger().info("[Test] flush sending {} ops", pending_send); } ceph_assert(tracked_conn); ceph_assert(!tracked_conn->is_protocol_closed()); return seastar::do_until( [this] { return pending_send == 0; }, [this] { --pending_send; return send_op(); }); } seastar::future<> wait_ready(unsigned num_ready_conns, unsigned num_replaced, bool wait_received) { assert(seastar::this_shard_id() == primary_sid); unsigned pending_conns = 0; unsigned pending_establish = 0; unsigned replaced_conns = 0; for (auto& result : interceptor.results) { if (result.conn->is_protocol_closed_clean()) { if (result.state == conn_state_t::replaced) { ++replaced_conns; } } else if (result.conn->is_protocol_ready()) { if (pending_send == 0 && pending_peer_receive == 0 && pending_receive == 0) { result.state = conn_state_t::established; } else { ++pending_establish; } } else { ++pending_conns; } } bool do_wait = false; if (num_ready_conns > 0) { if (interceptor.results.size() > num_ready_conns) { throw std::runtime_error(fmt::format( "{} connections, more than expected: {}", interceptor.results.size(), num_ready_conns)); } else if (interceptor.results.size() < num_ready_conns || pending_conns > 0) { logger().info("[Test] wait_ready(): wait for connections," " currently {} out of {}, pending {} ready ...", interceptor.results.size(), num_ready_conns, pending_conns); do_wait = true; } } if (wait_received) { if (pending_send || pending_peer_receive || pending_receive) { if (pending_conns || pending_establish) { logger().info("[Test] wait_ready(): wait for pending_send={}," " pending_peer_receive={}, pending_receive={}," " pending {}/{} ready/establish connections ...", pending_send, pending_peer_receive, pending_receive, pending_conns, pending_establish); do_wait = true; } else { // If there are pending messages, stop waiting if there are // no longer pending connections. } } else { // Stop waiting if there are no pending messages. Pending connections // should not be important. } } if (num_replaced > 0) { if (replaced_conns > num_replaced) { throw std::runtime_error(fmt::format( "{} replaced connections, more than expected: {}", replaced_conns, num_replaced)); } if (replaced_conns < num_replaced) { logger().info("[Test] wait_ready(): wait for {} replaced connections," " currently {} ...", num_replaced, replaced_conns); do_wait = true; } } if (do_wait) { return interceptor.wait( ).then([this, num_ready_conns, num_replaced, wait_received] { return wait_ready(num_ready_conns, num_replaced, wait_received); }); } else { logger().info("[Test] wait_ready(): wait done!"); return seastar::now(); } } // called by FailoverTest public: FailoverSuite(MessengerRef test_msgr, entity_addr_t test_peer_addr, const TestInterceptor& interceptor, ShardedGates &gates) : test_msgr(test_msgr), test_peer_addr(test_peer_addr), interceptor(interceptor), gates{gates}, primary_sid{seastar::this_shard_id()} { } entity_addr_t get_addr() const { return test_msgr->get_myaddr(); } seastar::future<> shutdown() { test_msgr->stop(); return test_msgr->shutdown(); } void needs_receive() { ++pending_receive; } void notify_peer_reply() { ceph_assert(pending_peer_receive > 0); --pending_peer_receive; logger().info("[Test] TestPeer said got op, left {} ops", pending_peer_receive); if (pending_peer_receive == 0) { interceptor.notify(); } } void post_check() const { // make sure all breakpoints were hit for (auto& kv : interceptor.breakpoints) { auto it = interceptor.breakpoints_counter.find(kv.first); if (it == interceptor.breakpoints_counter.end()) { throw std::runtime_error(fmt::format("{} was missed", kv.first)); } auto expected = kv.second.rbegin()->first; if (expected > it->second.counter) { throw std::runtime_error(fmt::format( "{} only triggered {} times, not the expected {}", kv.first, it->second.counter, expected)); } } } void dump_results() const { for (auto& result : interceptor.results) { result.dump(); } } static seastar::future> create(entity_addr_t test_addr, SocketPolicy test_policy, entity_addr_t test_peer_addr, const TestInterceptor& interceptor, ShardedGates &gates) { auto suite = std::make_unique( Messenger::create( entity_name_t::OSD(TEST_OSD), "Test", TEST_NONCE, false), test_peer_addr, interceptor, gates); return suite->init(test_addr, test_policy ).then([suite = std::move(suite)] () mutable { return std::move(suite); }); } // called by tests public: seastar::future<> connect_peer() { logger().info("[Test] connect_peer({})", test_peer_addr); assert(seastar::this_shard_id() == primary_sid); auto conn = test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD); auto result = interceptor.find_result(&*conn); ceph_assert(result != nullptr); if (tracked_conn) { if (tracked_conn->is_protocol_closed()) { logger().info("[Test] this is a new session" " replacing an closed one"); ceph_assert(tracked_conn != &*conn); } else { logger().info("[Test] this is not a new session"); ceph_assert(tracked_index == result->index); ceph_assert(tracked_conn == &*conn); } } else { logger().info("[Test] this is a new session"); } tracked_index = result->index; tracked_conn = &*conn; return flush_pending_send(); } seastar::future<> send_peer() { assert(seastar::this_shard_id() == primary_sid); if (tracked_conn) { logger().info("[Test] send_peer()"); ceph_assert(!tracked_conn->is_protocol_closed()); ceph_assert(!pending_send); return send_op(); } else { ++pending_send; logger().info("[Test] send_peer() (pending {})", pending_send); return seastar::now(); } } seastar::future<> keepalive_peer() { logger().info("[Test] keepalive_peer()"); assert(seastar::this_shard_id() == primary_sid); ceph_assert(tracked_conn); ceph_assert(!tracked_conn->is_protocol_closed()); return tracked_conn->send_keepalive(); } seastar::future<> try_send_peer() { logger().info("[Test] try_send_peer()"); assert(seastar::this_shard_id() == primary_sid); ceph_assert(tracked_conn); ceph_assert(!tracked_conn->is_protocol_closed()); return send_op(false); } seastar::future<> markdown() { logger().info("[Test] markdown() in 100ms ..."); assert(seastar::this_shard_id() == primary_sid); ceph_assert(tracked_conn); // sleep to propagate potential remaining acks return seastar::sleep(50ms ).then([this] { return seastar::smp::submit_to( tracked_conn->get_shard_id(), [tracked_conn=tracked_conn] { assert(tracked_conn->get_shard_id() == seastar::this_shard_id()); tracked_conn->mark_down(); }); }).then([] { // sleep to wait for markdown propagate to the primary sid return seastar::sleep(100ms); }); } seastar::future<> wait_blocked() { logger().info("[Test] wait_blocked() ..."); assert(seastar::this_shard_id() == primary_sid); return interceptor.blocker.wait_blocked(); } void unblock() { logger().info("[Test] unblock()"); assert(seastar::this_shard_id() == primary_sid); return interceptor.blocker.unblock(); } seastar::future<> wait_replaced(unsigned count) { logger().info("[Test] wait_replaced({}) ...", count); return wait_ready(0, count, false); } seastar::future<> wait_established() { logger().info("[Test] wait_established() ..."); return wait_ready(0, 0, true); } seastar::future> wait_results(unsigned count) { logger().info("[Test] wait_result({}) ...", count); return wait_ready(count, 0, true).then([this] { return std::reference_wrapper(interceptor.results); }); } bool is_standby() { assert(seastar::this_shard_id() == primary_sid); ceph_assert(tracked_conn); return tracked_conn->is_protocol_standby(); } }; class FailoverTest : public Dispatcher { crimson::auth::DummyAuthClientServer dummy_auth; MessengerRef cmd_msgr; ConnectionRef cmd_conn; const entity_addr_t test_addr; const entity_addr_t test_peer_addr; std::optional> recv_pong; std::optional> recv_cmdreply; std::unique_ptr test_suite; std::optional> ms_dispatch(ConnectionRef c, MessageRef m) override { switch (m->get_type()) { case CEPH_MSG_PING: ceph_assert(recv_pong); recv_pong->set_value(); recv_pong = std::nullopt; break; case MSG_COMMAND_REPLY: ceph_assert(recv_cmdreply); recv_cmdreply->set_value(); recv_cmdreply = std::nullopt; break; case MSG_COMMAND: { auto m_cmd = boost::static_pointer_cast(m); ceph_assert(static_cast(m_cmd->cmd[0][0]) == cmd_t::suite_recv_op); ceph_assert(test_suite); test_suite->notify_peer_reply(); break; } default: logger().error("{} got unexpected msg from cmd server: {}", *c, *m); ceph_abort(); } return {seastar::now()}; } private: seastar::future<> prepare_cmd( cmd_t cmd, std::function f_prepare = [] (auto& m) { return; }) { assert(!recv_cmdreply); recv_cmdreply = seastar::promise<>(); auto fut = recv_cmdreply->get_future(); auto m = crimson::make_message(); m->cmd.emplace_back(1, static_cast(cmd)); f_prepare(*m); return cmd_conn->send(std::move(m)).then([fut = std::move(fut)] () mutable { return std::move(fut); }); } seastar::future<> start_peer(policy_t peer_policy) { return prepare_cmd(cmd_t::suite_start, [peer_policy] (auto& m) { m.cmd.emplace_back(1, static_cast(peer_policy)); }); } seastar::future<> stop_peer() { return prepare_cmd(cmd_t::suite_stop); } seastar::future<> pingpong() { assert(!recv_pong); recv_pong = seastar::promise<>(); auto fut = recv_pong->get_future(); return cmd_conn->send(crimson::make_message() ).then([fut = std::move(fut)] () mutable { return std::move(fut); }); } seastar::future<> init(entity_addr_t cmd_peer_addr) { cmd_msgr->set_default_policy(SocketPolicy::lossy_client(0)); cmd_msgr->set_auth_client(&dummy_auth); cmd_msgr->set_auth_server(&dummy_auth); return cmd_msgr->start({this}).then([this, cmd_peer_addr] { logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr); cmd_conn = cmd_msgr->connect(cmd_peer_addr, entity_name_t::TYPE_OSD); return pingpong(); }); } public: FailoverTest(MessengerRef cmd_msgr, entity_addr_t test_addr, entity_addr_t test_peer_addr) : cmd_msgr(cmd_msgr), test_addr(test_addr), test_peer_addr(test_peer_addr) { } seastar::future<> shutdown() { logger().info("CmdCli shutdown..."); assert(!recv_cmdreply); auto m = crimson::make_message(); m->cmd.emplace_back(1, static_cast(cmd_t::shutdown)); return cmd_conn->send(std::move(m)).then([] { return seastar::sleep(200ms); }).then([this] { cmd_msgr->stop(); return cmd_msgr->shutdown(); }); } static seastar::future> create(entity_addr_t test_addr, entity_addr_t cmd_peer_addr, entity_addr_t test_peer_addr) { auto test = seastar::make_lw_shared( Messenger::create( entity_name_t::OSD(CMD_CLI_OSD), "CmdCli", CMD_CLI_NONCE, true), test_addr, test_peer_addr); return test->init(cmd_peer_addr).then([test] { logger().info("CmdCli ready"); return test; }); } // called by tests public: seastar::future<> run_suite( std::string name, const TestInterceptor& interceptor, policy_t test_policy, policy_t peer_policy, std::function(FailoverSuite&)>&& f) { logger().info("\n\n[{}]", name); ceph_assert(!test_suite); SocketPolicy test_policy_ = to_socket_policy(test_policy); return ShardedGates::create( ).then([this, test_policy_, peer_policy, interceptor, f=std::move(f)](auto *gates) mutable { return FailoverSuite::create( test_addr, test_policy_, test_peer_addr, interceptor, *gates ).then([this, peer_policy, f = std::move(f)](auto suite) mutable { ceph_assert(suite->get_addr() == test_addr); test_suite.swap(suite); return start_peer(peer_policy ).then([this, f = std::move(f)] { return f(*test_suite); }).then([this] { test_suite->post_check(); logger().info("\n[SUCCESS]"); }).handle_exception([this](auto eptr) { logger().info("\n[FAIL: {}]", eptr); test_suite->dump_results(); throw; }).then([this] { return stop_peer(); }).then([this] { return test_suite->shutdown( ).then([this] { test_suite.reset(); }); }); }).then([gates] { return gates->close(); }); }); } seastar::future<> peer_connect_me() { logger().info("[Test] peer_connect_me({})", test_addr); return prepare_cmd(cmd_t::suite_connect_me, [this] (auto& m) { m.cmd.emplace_back(fmt::format("{}", test_addr)); }); } seastar::future<> peer_send_me() { logger().info("[Test] peer_send_me()"); ceph_assert(test_suite); test_suite->needs_receive(); return prepare_cmd(cmd_t::suite_send_me); } seastar::future<> try_peer_send_me() { logger().info("[Test] try_peer_send_me()"); ceph_assert(test_suite); return prepare_cmd(cmd_t::suite_send_me); } seastar::future<> send_bidirectional() { ceph_assert(test_suite); return test_suite->send_peer().then([this] { return peer_send_me(); }); } seastar::future<> peer_keepalive_me() { logger().info("[Test] peer_keepalive_me()"); ceph_assert(test_suite); return prepare_cmd(cmd_t::suite_keepalive_me); } seastar::future<> markdown_peer() { logger().info("[Test] markdown_peer() in 150ms ..."); // sleep to propagate potential remaining acks return seastar::sleep(50ms ).then([this] { return prepare_cmd(cmd_t::suite_markdown); }).then([] { // sleep awhile for peer markdown propagated return seastar::sleep(100ms); }); } }; class FailoverSuitePeer : public Dispatcher { using cb_t = std::function()>; crimson::auth::DummyAuthClientServer dummy_auth; MessengerRef peer_msgr; cb_t op_callback; ConnectionRef tracked_conn; unsigned pending_send = 0; std::optional> ms_dispatch(ConnectionRef conn, MessageRef m) override { logger().info("[TestPeer] got op from Test"); ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); std::ignore = op_callback(); return {seastar::now()}; } void ms_handle_accept( ConnectionRef conn, seastar::shard_id prv_shard, bool is_replace) override { assert(prv_shard == seastar::this_shard_id()); logger().info("[TestPeer] got accept from Test"); if (tracked_conn && !tracked_conn->is_protocol_closed() && tracked_conn != conn) { logger().error("[TestPeer] {} got accepted, but there's already a valid traced_conn {}", *conn, *tracked_conn); } tracked_conn = conn; std::ignore = flush_pending_send(); } void ms_handle_reset(ConnectionRef conn, bool is_replace) override { logger().info("[TestPeer] got reset from Test"); } private: seastar::future<> init(entity_addr_t test_peer_addr, SocketPolicy policy) { peer_msgr->set_default_policy(policy); peer_msgr->set_auth_client(&dummy_auth); peer_msgr->set_auth_server(&dummy_auth); return peer_msgr->bind(entity_addrvec_t{test_peer_addr}).safe_then([this] { return peer_msgr->start({this}); }, Messenger::bind_ertr::all_same_way([test_peer_addr] (const std::error_code& e) { logger().error("FailoverSuitePeer: " "there is another instance running at {}", test_peer_addr); ceph_abort(); })); } seastar::future<> send_op() { ceph_assert(tracked_conn); if (tracked_conn->is_protocol_closed()) { logger().error("[TestPeer] send op but the connection is closed -- {}", *tracked_conn); } pg_t pgid; object_locator_t oloc; hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), pgid.pool(), oloc.nspace); spg_t spgid(pgid); return tracked_conn->send(crimson::make_message(0, 0, hobj, spgid, 0, 0, 0)); } seastar::future<> flush_pending_send() { if (pending_send != 0) { logger().info("[TestPeer] flush sending {} ops", pending_send); } ceph_assert(tracked_conn); return seastar::do_until( [this] { return pending_send == 0; }, [this] { --pending_send; return send_op(); }); } public: FailoverSuitePeer(MessengerRef peer_msgr, cb_t op_callback) : peer_msgr(peer_msgr), op_callback(op_callback) { } seastar::future<> shutdown() { peer_msgr->stop(); return peer_msgr->shutdown(); } seastar::future<> connect_peer(entity_addr_t test_addr_decoded) { logger().info("[TestPeer] connect_peer({})", test_addr_decoded); auto conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD); if (tracked_conn) { if (tracked_conn->is_protocol_closed()) { logger().info("[TestPeer] this is a new session" " replacing an closed one"); ceph_assert(tracked_conn != conn); } else { logger().info("[TestPeer] this is not a new session"); ceph_assert(tracked_conn == conn); } } else { logger().info("[TestPeer] this is a new session"); } tracked_conn = conn; return flush_pending_send(); } seastar::future<> send_peer() { if (tracked_conn) { logger().info("[TestPeer] send_peer()"); ceph_assert(!pending_send); return send_op(); } else { ++pending_send; logger().info("[TestPeer] send_peer() (pending {})", pending_send); return seastar::now(); } } seastar::future<> keepalive_peer() { logger().info("[TestPeer] keepalive_peer()"); ceph_assert(tracked_conn); return tracked_conn->send_keepalive(); } seastar::future<> markdown() { logger().info("[TestPeer] markdown()"); ceph_assert(tracked_conn); tracked_conn->mark_down(); return seastar::now(); } static seastar::future> create(entity_addr_t test_peer_addr, const SocketPolicy& policy, cb_t op_callback) { auto suite = std::make_unique( Messenger::create( entity_name_t::OSD(TEST_PEER_OSD), "TestPeer", TEST_PEER_NONCE, true), op_callback ); return suite->init(test_peer_addr, policy ).then([suite = std::move(suite)] () mutable { return std::move(suite); }); } }; class FailoverTestPeer : public Dispatcher { crimson::auth::DummyAuthClientServer dummy_auth; MessengerRef cmd_msgr; ConnectionRef cmd_conn; const entity_addr_t test_peer_addr; std::unique_ptr test_suite; std::optional> ms_dispatch(ConnectionRef c, MessageRef m) override { ceph_assert(cmd_conn == c); switch (m->get_type()) { case CEPH_MSG_PING: std::ignore = c->send(crimson::make_message()); break; case MSG_COMMAND: { auto m_cmd = boost::static_pointer_cast(m); auto cmd = static_cast(m_cmd->cmd[0][0]); if (cmd == cmd_t::shutdown) { logger().info("CmdSrv shutdown..."); // forwarded to FailoverTestPeer::wait() cmd_msgr->stop(); std::ignore = cmd_msgr->shutdown(); } else { std::ignore = handle_cmd(cmd, m_cmd).then([c] { return c->send(crimson::make_message()); }); } break; } default: logger().error("{} got unexpected msg from cmd client: {}", *c, *m); ceph_abort(); } return {seastar::now()}; } void ms_handle_accept( ConnectionRef conn, seastar::shard_id prv_shard, bool is_replace) override { assert(prv_shard == seastar::this_shard_id()); cmd_conn = conn; } private: seastar::future<> notify_recv_op() { ceph_assert(cmd_conn); auto m = crimson::make_message(); m->cmd.emplace_back(1, static_cast(cmd_t::suite_recv_op)); return cmd_conn->send(std::move(m)); } seastar::future<> handle_cmd(cmd_t cmd, MRef m_cmd) { switch (cmd) { case cmd_t::suite_start: { ceph_assert(!test_suite); auto policy = to_socket_policy(static_cast(m_cmd->cmd[1][0])); return FailoverSuitePeer::create( test_peer_addr, policy, [this] { return notify_recv_op(); } ).then([this] (auto suite) { test_suite.swap(suite); }); } case cmd_t::suite_stop: ceph_assert(test_suite); return test_suite->shutdown().then([this] { test_suite.reset(); }); case cmd_t::suite_connect_me: { ceph_assert(test_suite); entity_addr_t test_addr_decoded = entity_addr_t(); test_addr_decoded.parse(m_cmd->cmd[1].c_str(), nullptr); return test_suite->connect_peer(test_addr_decoded); } case cmd_t::suite_send_me: ceph_assert(test_suite); return test_suite->send_peer(); case cmd_t::suite_keepalive_me: ceph_assert(test_suite); return test_suite->keepalive_peer(); case cmd_t::suite_markdown: ceph_assert(test_suite); return test_suite->markdown(); default: logger().error("TestPeer got unexpected command {} from Test", fmt::ptr(m_cmd.get())); ceph_abort(); return seastar::now(); } } seastar::future<> init(entity_addr_t cmd_peer_addr) { cmd_msgr->set_default_policy(SocketPolicy::stateless_server(0)); cmd_msgr->set_auth_client(&dummy_auth); cmd_msgr->set_auth_server(&dummy_auth); return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).safe_then([this] { return cmd_msgr->start({this}); }, Messenger::bind_ertr::all_same_way([cmd_peer_addr] (const std::error_code& e) { logger().error("FailoverTestPeer: " "there is another instance running at {}", cmd_peer_addr); ceph_abort(); })); } public: FailoverTestPeer(MessengerRef cmd_msgr, entity_addr_t test_peer_addr) : cmd_msgr(cmd_msgr), test_peer_addr(test_peer_addr) { } seastar::future<> wait() { return cmd_msgr->wait(); } static seastar::future> create(entity_addr_t cmd_peer_addr, entity_addr_t test_peer_addr) { auto test_peer = std::make_unique( Messenger::create( entity_name_t::OSD(CMD_SRV_OSD), "CmdSrv", CMD_SRV_NONCE, true), test_peer_addr); return test_peer->init(cmd_peer_addr ).then([test_peer = std::move(test_peer)] () mutable { logger().info("CmdSrv ready"); return std::move(test_peer); }); } }; seastar::future<> test_v2_lossy_early_connect_fault(FailoverTest& test) { return seastar::do_with(std::vector{ {custom_bp_t::SOCKET_CONNECTING}, {custom_bp_t::BANNER_WRITE}, {custom_bp_t::BANNER_READ}, {custom_bp_t::BANNER_PAYLOAD_READ}, {Tag::HELLO, bp_type_t::WRITE}, {Tag::HELLO, bp_type_t::READ}, {Tag::AUTH_REQUEST, bp_type_t::WRITE}, {Tag::AUTH_DONE, bp_type_t::READ}, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}, {Tag::AUTH_SIGNATURE, bp_type_t::READ}, }, [&test] (auto& failure_cases) { return seastar::do_for_each(failure_cases, [&test] (auto bp) { TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_lossy_early_connect_fault -- {}", bp), interceptor, policy_t::lossy_client, policy_t::stateless_server, [] (FailoverSuite& suite) { return seastar::futurize_invoke([&suite] { return suite.send_peer(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(2, 1, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); }); }); }); }); } seastar::future<> test_v2_lossy_connect_fault(FailoverTest& test) { return seastar::do_with(std::vector{ {Tag::CLIENT_IDENT, bp_type_t::WRITE}, {Tag::SERVER_IDENT, bp_type_t::READ}, }, [&test] (auto& failure_cases) { return seastar::do_for_each(failure_cases, [&test] (auto bp) { TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_lossy_connect_fault -- {}", bp), interceptor, policy_t::lossy_client, policy_t::stateless_server, [] (FailoverSuite& suite) { return seastar::futurize_invoke([&suite] { return suite.send_peer(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(2, 2, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); }); }); }); }); } seastar::future<> test_v2_lossy_connected_fault(FailoverTest& test) { return seastar::do_with(std::vector{ {Tag::MESSAGE, bp_type_t::WRITE}, {Tag::MESSAGE, bp_type_t::READ}, }, [&test] (auto& failure_cases) { return seastar::do_for_each(failure_cases, [&test] (auto bp) { TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_lossy_connected_fault -- {}", bp), interceptor, policy_t::lossy_client, policy_t::stateless_server, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.send_bidirectional(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(1, 1, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(1, 0); }); }); }); }); } seastar::future<> test_v2_lossy_early_accept_fault(FailoverTest& test) { return seastar::do_with(std::vector{ {custom_bp_t::BANNER_WRITE}, {custom_bp_t::BANNER_READ}, {custom_bp_t::BANNER_PAYLOAD_READ}, {Tag::HELLO, bp_type_t::WRITE}, {Tag::HELLO, bp_type_t::READ}, {Tag::AUTH_REQUEST, bp_type_t::READ}, {Tag::AUTH_DONE, bp_type_t::WRITE}, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}, {Tag::AUTH_SIGNATURE, bp_type_t::READ}, }, [&test] (auto& failure_cases) { return seastar::do_for_each(failure_cases, [&test] (auto bp) { TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_lossy_early_accept_fault -- {}", bp), interceptor, policy_t::stateless_server, policy_t::lossy_client, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.peer_send_me(); }).then([&test] { return test.peer_connect_me(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 0, 0, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::established); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 1); results[1].assert_reset(0, 0); }); }); }); }); } seastar::future<> test_v2_lossy_accept_fault(FailoverTest& test) { auto bp = Breakpoint{Tag::CLIENT_IDENT, bp_type_t::READ}; TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_lossy_accept_fault -- {}", bp), interceptor, policy_t::stateless_server, policy_t::lossy_client, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.peer_send_me(); }).then([&test] { return test.peer_connect_me(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::established); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 1); results[1].assert_reset(0, 0); }); }); } seastar::future<> test_v2_lossy_establishing_fault(FailoverTest& test) { auto bp = Breakpoint{Tag::SERVER_IDENT, bp_type_t::WRITE}; TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_lossy_establishing_fault -- {}", bp), interceptor, policy_t::stateless_server, policy_t::lossy_client, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.peer_send_me(); }).then([&test] { return test.peer_connect_me(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 1); results[0].assert_reset(1, 0); results[1].assert_state_at(conn_state_t::established); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 1); results[1].assert_reset(0, 0); }); }); } seastar::future<> test_v2_lossy_accepted_fault(FailoverTest& test) { return seastar::do_with(std::vector{ {Tag::MESSAGE, bp_type_t::WRITE}, {Tag::MESSAGE, bp_type_t::READ}, }, [&test] (auto& failure_cases) { return seastar::do_for_each(failure_cases, [&test] (auto bp) { TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_lossy_accepted_fault -- {}", bp), interceptor, policy_t::stateless_server, policy_t::lossy_client, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.send_bidirectional(); }).then([&test] { return test.peer_connect_me(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 1); results[0].assert_reset(1, 0); }); }); }); }); } seastar::future<> test_v2_lossless_connect_fault(FailoverTest& test) { return seastar::do_with(std::vector{ {Tag::CLIENT_IDENT, bp_type_t::WRITE}, {Tag::SERVER_IDENT, bp_type_t::READ}, }, [&test] (auto& failure_cases) { return seastar::do_for_each(failure_cases, [&test] (auto bp) { TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_lossless_connect_fault -- {}", bp), interceptor, policy_t::lossless_client, policy_t::stateful_server, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.send_bidirectional(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(2, 2, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); }); }); }); }); } seastar::future<> test_v2_lossless_connected_fault(FailoverTest& test) { return seastar::do_with(std::vector{ {Tag::MESSAGE, bp_type_t::WRITE}, {Tag::MESSAGE, bp_type_t::READ}, }, [&test] (auto& failure_cases) { return seastar::do_for_each(failure_cases, [&test] (auto bp) { TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_lossless_connected_fault -- {}", bp), interceptor, policy_t::lossless_client, policy_t::stateful_server, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.send_bidirectional(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(2, 1, 1, 2); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); }); }); }); }); } seastar::future<> test_v2_lossless_connected_fault2(FailoverTest& test) { return seastar::do_with(std::vector{ {Tag::ACK, bp_type_t::READ}, {Tag::ACK, bp_type_t::WRITE}, {Tag::KEEPALIVE2, bp_type_t::READ}, {Tag::KEEPALIVE2, bp_type_t::WRITE}, {Tag::KEEPALIVE2_ACK, bp_type_t::READ}, {Tag::KEEPALIVE2_ACK, bp_type_t::WRITE}, }, [&test] (auto& failure_cases) { return seastar::do_for_each(failure_cases, [&test] (auto bp) { TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_lossless_connected_fault2 -- {}", bp), interceptor, policy_t::lossless_client, policy_t::stateful_server, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_established(); }).then([&suite] { return suite.send_peer(); }).then([&suite] { return suite.keepalive_peer(); }).then([&suite] { return suite.wait_established(); }).then([&test] { return test.peer_send_me(); }).then([&test] { return test.peer_keepalive_me(); }).then([&suite] { return suite.wait_established(); }).then([&suite] { return suite.send_peer(); }).then([&suite] { return suite.wait_established(); }).then([&test] { return test.peer_send_me(); }).then([&suite] { return suite.wait_established(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(2, 1, 1, 2); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); }); }); }); }); } seastar::future<> test_v2_lossless_reconnect_fault(FailoverTest& test) { return seastar::do_with(std::vector>{ {{Tag::MESSAGE, bp_type_t::WRITE}, {Tag::SESSION_RECONNECT, bp_type_t::WRITE}}, {{Tag::MESSAGE, bp_type_t::WRITE}, {Tag::SESSION_RECONNECT_OK, bp_type_t::READ}}, }, [&test] (auto& failure_cases) { return seastar::do_for_each(failure_cases, [&test] (auto bp_pair) { TestInterceptor interceptor; interceptor.make_fault(bp_pair.first); interceptor.make_fault(bp_pair.second); return test.run_suite( fmt::format("test_v2_lossless_reconnect_fault -- {}, {}", bp_pair.first, bp_pair.second), interceptor, policy_t::lossless_client, policy_t::stateful_server, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.send_bidirectional(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(3, 1, 2, 2); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); }); }); }); }); } seastar::future<> test_v2_lossless_accept_fault(FailoverTest& test) { auto bp = Breakpoint{Tag::CLIENT_IDENT, bp_type_t::READ}; TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_lossless_accept_fault -- {}", bp), interceptor, policy_t::stateful_server, policy_t::lossless_client, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.send_bidirectional(); }).then([&test] { return test.peer_connect_me(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::established); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 1); results[1].assert_reset(0, 0); }); }); } seastar::future<> test_v2_lossless_establishing_fault(FailoverTest& test) { auto bp = Breakpoint{Tag::SERVER_IDENT, bp_type_t::WRITE}; TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_lossless_establishing_fault -- {}", bp), interceptor, policy_t::stateful_server, policy_t::lossless_client, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.send_bidirectional(); }).then([&test] { return test.peer_connect_me(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 2); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 0); results[1].assert_reset(0, 0); }); }); } seastar::future<> test_v2_lossless_accepted_fault(FailoverTest& test) { return seastar::do_with(std::vector{ {Tag::MESSAGE, bp_type_t::WRITE}, {Tag::MESSAGE, bp_type_t::READ}, }, [&test] (auto& failure_cases) { return seastar::do_for_each(failure_cases, [&test] (auto bp) { TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_lossless_accepted_fault -- {}", bp), interceptor, policy_t::stateful_server, policy_t::lossless_client, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.send_bidirectional(); }).then([&test] { return test.peer_connect_me(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 2); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 0); results[1].assert_reset(0, 0); }); }); }); }); } seastar::future<> test_v2_lossless_reaccept_fault(FailoverTest& test) { return seastar::do_with(std::vector>{ {{Tag::MESSAGE, bp_type_t::READ}, {Tag::SESSION_RECONNECT, bp_type_t::READ}}, {{Tag::MESSAGE, bp_type_t::READ}, {Tag::SESSION_RECONNECT_OK, bp_type_t::WRITE}}, }, [&test] (auto& failure_cases) { return seastar::do_for_each(failure_cases, [&test] (auto bp_pair) { TestInterceptor interceptor; interceptor.make_fault(bp_pair.first); interceptor.make_fault(bp_pair.second); return test.run_suite( fmt::format("test_v2_lossless_reaccept_fault -- {}, {}", bp_pair.first, bp_pair.second), interceptor, policy_t::stateful_server, policy_t::lossless_client, [&test, bp = bp_pair.second] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.send_bidirectional(); }).then([&test] { return test.peer_connect_me(); }).then([&suite] { return suite.wait_results(3); }).then([bp] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(0, 0, 0, 0); if (bp == Breakpoint{Tag::SESSION_RECONNECT, bp_type_t::READ}) { results[0].assert_accept(1, 1, 0, 2); } else { results[0].assert_accept(1, 1, 0, 3); } results[0].assert_reset(0, 0); if (bp == Breakpoint{Tag::SESSION_RECONNECT, bp_type_t::READ}) { results[1].assert_state_at(conn_state_t::closed); } else { results[1].assert_state_at(conn_state_t::replaced); } results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 0, 1, 0); results[1].assert_reset(0, 0); results[2].assert_state_at(conn_state_t::replaced); results[2].assert_connect(0, 0, 0, 0); results[2].assert_accept(1, 0, 1, 0); results[2].assert_reset(0, 0); }); }); }); }); } seastar::future<> test_v2_peer_connect_fault(FailoverTest& test) { return seastar::do_with(std::vector{ {Tag::CLIENT_IDENT, bp_type_t::WRITE}, {Tag::SERVER_IDENT, bp_type_t::READ}, }, [&test] (auto& failure_cases) { return seastar::do_for_each(failure_cases, [&test] (auto bp) { TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_peer_connect_fault -- {}", bp), interceptor, policy_t::lossless_peer, policy_t::lossless_peer, [] (FailoverSuite& suite) { return seastar::futurize_invoke([&suite] { return suite.send_peer(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(2, 2, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); }); }); }); }); } seastar::future<> test_v2_peer_accept_fault(FailoverTest& test) { auto bp = Breakpoint{Tag::CLIENT_IDENT, bp_type_t::READ}; TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_peer_accept_fault -- {}", bp), interceptor, policy_t::lossless_peer, policy_t::lossless_peer, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.peer_send_me(); }).then([&test] { return test.peer_connect_me(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::established); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 1); results[1].assert_reset(0, 0); }); }); } seastar::future<> test_v2_peer_establishing_fault(FailoverTest& test) { auto bp = Breakpoint{Tag::SERVER_IDENT, bp_type_t::WRITE}; TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_peer_establishing_fault -- {}", bp), interceptor, policy_t::lossless_peer, policy_t::lossless_peer, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.peer_send_me(); }).then([&test] { return test.peer_connect_me(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 2); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 0); results[1].assert_reset(0, 0); }); }); } seastar::future<> test_v2_peer_connected_fault_reconnect(FailoverTest& test) { auto bp = Breakpoint{Tag::MESSAGE, bp_type_t::WRITE}; TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_peer_connected_fault_reconnect -- {}", bp), interceptor, policy_t::lossless_peer, policy_t::lossless_peer, [] (FailoverSuite& suite) { return seastar::futurize_invoke([&suite] { return suite.send_peer(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(2, 1, 1, 2); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); }); }); } seastar::future<> test_v2_peer_connected_fault_reaccept(FailoverTest& test) { auto bp = Breakpoint{Tag::MESSAGE, bp_type_t::READ}; TestInterceptor interceptor; interceptor.make_fault(bp); return test.run_suite( fmt::format("test_v2_peer_connected_fault_reaccept -- {}", bp), interceptor, policy_t::lossless_peer, policy_t::lossless_peer, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.peer_send_me(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(1, 1, 0, 1); results[0].assert_accept(0, 0, 0, 1); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 0, 1, 0); results[1].assert_reset(0, 0); }); }); } seastar::future check_peer_wins(FailoverTest& test) { return seastar::do_with(bool(), [&test] (auto& ret) { return test.run_suite("check_peer_wins", TestInterceptor(), policy_t::lossy_client, policy_t::stateless_server, [&ret] (FailoverSuite& suite) { return suite.connect_peer().then([&suite] { return suite.wait_results(1); }).then([&ret] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); ret = results[0].conn->peer_wins(); logger().info("check_peer_wins: {}", ret); }); }).then([&ret] { return ret; }); }); } seastar::future<> test_v2_racing_reconnect_acceptor_lose(FailoverTest& test) { return seastar::do_with(std::vector>{ {1, {Tag::SESSION_RECONNECT, bp_type_t::READ}}, {2, {custom_bp_t::BANNER_WRITE}}, {2, {custom_bp_t::BANNER_READ}}, {2, {custom_bp_t::BANNER_PAYLOAD_READ}}, {2, {Tag::HELLO, bp_type_t::WRITE}}, {2, {Tag::HELLO, bp_type_t::READ}}, {2, {Tag::AUTH_REQUEST, bp_type_t::READ}}, {2, {Tag::AUTH_DONE, bp_type_t::WRITE}}, {2, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}}, {2, {Tag::AUTH_SIGNATURE, bp_type_t::READ}}, }, [&test] (auto& failure_cases) { return seastar::do_for_each(failure_cases, [&test] (auto bp) { TestInterceptor interceptor; // fault acceptor interceptor.make_fault({Tag::MESSAGE, bp_type_t::READ}); // block acceptor interceptor.make_block(bp.second, bp.first); return test.run_suite( fmt::format("test_v2_racing_reconnect_acceptor_lose -- {}({})", bp.second, bp.first), interceptor, policy_t::lossless_peer, policy_t::lossless_peer, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.peer_send_me(); }).then([&test] { return test.peer_connect_me(); }).then([&suite] { return suite.wait_blocked(); }).then([&suite] { return suite.send_peer(); }).then([&suite] { return suite.wait_established(); }).then([&suite] { suite.unblock(); return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(1, 0, 1, 1); results[0].assert_accept(1, 1, 0, 1); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::closed); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 0); results[1].assert_reset(0, 0); }); }); }); }); } seastar::future<> test_v2_racing_reconnect_acceptor_win(FailoverTest& test) { return seastar::do_with(std::vector>{ {1, {Tag::SESSION_RECONNECT, bp_type_t::WRITE}}, {2, {custom_bp_t::SOCKET_CONNECTING}}, {2, {custom_bp_t::BANNER_WRITE}}, {2, {custom_bp_t::BANNER_READ}}, {2, {custom_bp_t::BANNER_PAYLOAD_READ}}, {2, {Tag::HELLO, bp_type_t::WRITE}}, {2, {Tag::HELLO, bp_type_t::READ}}, {2, {Tag::AUTH_REQUEST, bp_type_t::WRITE}}, {2, {Tag::AUTH_DONE, bp_type_t::READ}}, {2, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}}, {2, {Tag::AUTH_SIGNATURE, bp_type_t::READ}}, }, [&test] (auto& failure_cases) { return seastar::do_for_each(failure_cases, [&test] (auto bp) { TestInterceptor interceptor; // fault connector interceptor.make_fault({Tag::MESSAGE, bp_type_t::WRITE}); // block connector interceptor.make_block(bp.second, bp.first); return test.run_suite( fmt::format("test_v2_racing_reconnect_acceptor_win -- {}({})", bp.second, bp.first), interceptor, policy_t::lossless_peer, policy_t::lossless_peer, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&suite] { return suite.send_peer(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_blocked(); }).then([&test] { return test.peer_send_me(); }).then([&suite] { return suite.wait_replaced(1); }).then([&suite] { suite.unblock(); return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(2, 1); results[0].assert_accept(0, 0, 0, 1); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 0, 1, 0); results[1].assert_reset(0, 0); }); }); }); }); } seastar::future<> test_v2_racing_connect_acceptor_lose(FailoverTest& test) { return seastar::do_with(std::vector{ {custom_bp_t::BANNER_WRITE}, {custom_bp_t::BANNER_READ}, {custom_bp_t::BANNER_PAYLOAD_READ}, {Tag::HELLO, bp_type_t::WRITE}, {Tag::HELLO, bp_type_t::READ}, {Tag::AUTH_REQUEST, bp_type_t::READ}, {Tag::AUTH_DONE, bp_type_t::WRITE}, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}, {Tag::AUTH_SIGNATURE, bp_type_t::READ}, {Tag::CLIENT_IDENT, bp_type_t::READ}, }, [&test] (auto& failure_cases) { return seastar::do_for_each(failure_cases, [&test] (auto bp) { TestInterceptor interceptor; // block acceptor interceptor.make_block(bp); return test.run_suite( fmt::format("test_v2_racing_connect_acceptor_lose -- {}", bp), interceptor, policy_t::lossless_peer, policy_t::lossless_peer, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.peer_send_me(); }).then([&test] { return test.peer_connect_me(); }).then([&suite] { return suite.wait_blocked(); }).then([&suite] { return suite.send_peer(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_established(); }).then([&suite] { suite.unblock(); return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::established); results[1].assert_connect(1, 1, 0, 1); results[1].assert_accept(0, 0, 0, 0); results[1].assert_reset(0, 0); }); }); }); }); } seastar::future<> test_v2_racing_connect_acceptor_win(FailoverTest& test) { return seastar::do_with(std::vector{ {custom_bp_t::SOCKET_CONNECTING}, {custom_bp_t::BANNER_WRITE}, {custom_bp_t::BANNER_READ}, {custom_bp_t::BANNER_PAYLOAD_READ}, {Tag::HELLO, bp_type_t::WRITE}, {Tag::HELLO, bp_type_t::READ}, {Tag::AUTH_REQUEST, bp_type_t::WRITE}, {Tag::AUTH_DONE, bp_type_t::READ}, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}, {Tag::AUTH_SIGNATURE, bp_type_t::READ}, {Tag::CLIENT_IDENT, bp_type_t::WRITE}, }, [&test] (auto& failure_cases) { return seastar::do_for_each(failure_cases, [&test] (auto bp) { TestInterceptor interceptor; // block connector interceptor.make_block(bp); return test.run_suite( fmt::format("test_v2_racing_connect_acceptor_win -- {}", bp), interceptor, policy_t::lossless_peer, policy_t::lossless_peer, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&suite] { return suite.send_peer(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_blocked(); }).then([&test] { return test.peer_send_me(); }).then([&test] { return test.peer_connect_me(); }).then([&suite] { return suite.wait_replaced(1); }).then([&suite] { suite.unblock(); return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(1, 0); results[0].assert_accept(0, 0, 0, 1); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 0); results[1].assert_reset(0, 0); }); }); }); }); } seastar::future<> test_v2_racing_connect_reconnect_lose(FailoverTest& test) { TestInterceptor interceptor; interceptor.make_fault({Tag::SERVER_IDENT, bp_type_t::READ}); interceptor.make_block({Tag::CLIENT_IDENT, bp_type_t::WRITE}, 2); return test.run_suite("test_v2_racing_connect_reconnect_lose", interceptor, policy_t::lossless_peer, policy_t::lossless_peer, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&suite] { return suite.send_peer(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_blocked(); }).then([&test] { return test.peer_send_me(); }).then([&suite] { return suite.wait_replaced(1); }).then([&suite] { suite.unblock(); return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(2, 2, 0, 0); results[0].assert_accept(0, 0, 0, 1); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 1, 0); results[1].assert_reset(0, 0); }); }); } seastar::future<> test_v2_racing_connect_reconnect_win(FailoverTest& test) { TestInterceptor interceptor; interceptor.make_fault({Tag::SERVER_IDENT, bp_type_t::READ}); interceptor.make_block({Tag::SESSION_RECONNECT, bp_type_t::READ}); return test.run_suite("test_v2_racing_connect_reconnect_win", interceptor, policy_t::lossless_peer, policy_t::lossless_peer, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.peer_send_me(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_blocked(); }).then([&suite] { return suite.send_peer(); }).then([&suite] { return suite.wait_established(); }).then([&suite] { suite.unblock(); return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(2, 2, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::closed); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 0, 1, 0); results[1].assert_reset(0, 0); }); }); } seastar::future<> test_v2_stale_connect(FailoverTest& test) { auto bp = Breakpoint{Tag::SERVER_IDENT, bp_type_t::READ}; TestInterceptor interceptor; interceptor.make_stall(bp); return test.run_suite( fmt::format("test_v2_stale_connect -- {}", bp), interceptor, policy_t::lossless_peer, policy_t::lossless_peer, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_blocked(); }).then([&test] { return test.peer_send_me(); }).then([&suite] { return suite.wait_replaced(1); }).then([&suite] { suite.unblock(); return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(1, 1, 0, 0); results[0].assert_accept(0, 0, 0, 1); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 1, 0); results[1].assert_reset(0, 0); }); }); } seastar::future<> test_v2_stale_reconnect(FailoverTest& test) { auto bp = Breakpoint{Tag::SESSION_RECONNECT_OK, bp_type_t::READ}; TestInterceptor interceptor; interceptor.make_fault({Tag::MESSAGE, bp_type_t::WRITE}); interceptor.make_stall(bp); return test.run_suite( fmt::format("test_v2_stale_reconnect -- {}", bp), interceptor, policy_t::lossless_peer, policy_t::lossless_peer, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&suite] { return suite.send_peer(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.wait_blocked(); }).then([&test] { return test.peer_send_me(); }).then([&suite] { return suite.wait_replaced(1); }).then([&suite] { suite.unblock(); return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(2, 1, 1, 1); results[0].assert_accept(0, 0, 0, 1); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 0, 1, 0); results[1].assert_reset(0, 0); }); }); } seastar::future<> test_v2_stale_accept(FailoverTest& test) { auto bp = Breakpoint{Tag::CLIENT_IDENT, bp_type_t::READ}; TestInterceptor interceptor; interceptor.make_stall(bp); return test.run_suite( fmt::format("test_v2_stale_accept -- {}", bp), interceptor, policy_t::lossless_peer, policy_t::lossless_peer, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.peer_connect_me(); }).then([&suite] { return suite.wait_blocked(); }).then([&test] { return test.peer_send_me(); }).then([&suite] { return suite.wait_established(); }).then([&suite] { suite.unblock(); return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::established); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 1); results[1].assert_reset(0, 0); }); }); } seastar::future<> test_v2_stale_establishing(FailoverTest& test) { auto bp = Breakpoint{Tag::SERVER_IDENT, bp_type_t::WRITE}; TestInterceptor interceptor; interceptor.make_stall(bp); return test.run_suite( fmt::format("test_v2_stale_establishing -- {}", bp), interceptor, policy_t::lossless_peer, policy_t::lossless_peer, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.peer_connect_me(); }).then([&suite] { return suite.wait_blocked(); }).then([&test] { return test.peer_send_me(); }).then([&suite] { return suite.wait_replaced(1); }).then([&suite] { suite.unblock(); return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 2); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 0); results[1].assert_reset(0, 0); }); }); } seastar::future<> test_v2_stale_reaccept(FailoverTest& test) { auto bp = Breakpoint{Tag::SESSION_RECONNECT_OK, bp_type_t::WRITE}; TestInterceptor interceptor; interceptor.make_fault({Tag::MESSAGE, bp_type_t::READ}); interceptor.make_stall(bp); return test.run_suite( fmt::format("test_v2_stale_reaccept -- {}", bp), interceptor, policy_t::lossless_peer, policy_t::lossless_peer, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { return test.peer_send_me(); }).then([&test] { return test.peer_connect_me(); }).then([&suite] { return suite.wait_blocked(); }).then([] { logger().info("[Test] block the broken REPLACING for 210ms..."); return seastar::sleep(210ms); }).then([&suite] { suite.unblock(); return suite.wait_results(3); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 3); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 0, 1, 0); results[1].assert_reset(0, 0); results[2].assert_state_at(conn_state_t::replaced); results[2].assert_connect(0, 0, 0, 0); results[2].assert_accept(1, 0); results[2].assert_reset(0, 0); ceph_assert(results[2].server_reconnect_attempts >= 1); }); }); } seastar::future<> test_v2_lossy_client(FailoverTest& test) { return test.run_suite( "test_v2_lossy_client", TestInterceptor(), policy_t::lossy_client, policy_t::stateless_server, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&suite] { logger().info("-- 0 --"); logger().info("[Test] setup connection..."); return suite.connect_peer(); }).then([&test] { return test.send_bidirectional(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(1, 1, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); }).then([&suite] { logger().info("-- 1 --"); logger().info("[Test] client markdown..."); return suite.markdown(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.send_peer(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(1, 1, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::established); results[1].assert_connect(1, 1, 0, 1); results[1].assert_accept(0, 0, 0, 0); results[1].assert_reset(0, 0); }).then([&test] { logger().info("-- 2 --"); logger().info("[Test] server markdown..."); return test.markdown_peer(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(1, 1, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::closed); results[1].assert_connect(1, 1, 0, 1); results[1].assert_accept(0, 0, 0, 0); results[1].assert_reset(1, 0); }).then([&suite] { logger().info("-- 3 --"); logger().info("[Test] client reconnect..."); return suite.connect_peer(); }).then([&suite] { return suite.send_peer(); }).then([&suite] { return suite.wait_results(3); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(1, 1, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::closed); results[1].assert_connect(1, 1, 0, 1); results[1].assert_accept(0, 0, 0, 0); results[1].assert_reset(1, 0); results[2].assert_state_at(conn_state_t::established); results[2].assert_connect(1, 1, 0, 1); results[2].assert_accept(0, 0, 0, 0); results[2].assert_reset(0, 0); }); }); } seastar::future<> test_v2_stateless_server(FailoverTest& test) { return test.run_suite( "test_v2_stateless_server", TestInterceptor(), policy_t::stateless_server, policy_t::lossy_client, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { logger().info("-- 0 --"); logger().info("[Test] setup connection..."); return test.peer_connect_me(); }).then([&test] { return test.send_bidirectional(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 1); results[0].assert_reset(0, 0); }).then([&test] { logger().info("-- 1 --"); logger().info("[Test] client markdown..."); return test.markdown_peer(); }).then([&test] { return test.peer_connect_me(); }).then([&test] { return test.peer_send_me(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 1); results[0].assert_reset(1, 0); results[1].assert_state_at(conn_state_t::established); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 1); results[1].assert_reset(0, 0); }).then([&suite] { logger().info("-- 2 --"); logger().info("[Test] server markdown..."); return suite.markdown(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 1); results[0].assert_reset(1, 0); results[1].assert_state_at(conn_state_t::closed); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 1); results[1].assert_reset(0, 0); }).then([&test] { logger().info("-- 3 --"); logger().info("[Test] client reconnect..."); return test.peer_connect_me(); }).then([&test] { return test.peer_send_me(); }).then([&suite] { return suite.wait_results(3); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 1); results[0].assert_reset(1, 0); results[1].assert_state_at(conn_state_t::closed); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 1); results[1].assert_reset(0, 0); results[2].assert_state_at(conn_state_t::established); results[2].assert_connect(0, 0, 0, 0); results[2].assert_accept(1, 1, 0, 1); results[2].assert_reset(0, 0); }); }); } seastar::future<> test_v2_lossless_client(FailoverTest& test) { return test.run_suite( "test_v2_lossless_client", TestInterceptor(), policy_t::lossless_client, policy_t::stateful_server, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&suite] { logger().info("-- 0 --"); logger().info("[Test] setup connection..."); return suite.connect_peer(); }).then([&test] { return test.send_bidirectional(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(1, 1, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); }).then([&suite] { logger().info("-- 1 --"); logger().info("[Test] client markdown..."); return suite.markdown(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.send_peer(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(1, 1, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::established); results[1].assert_connect(1, 1, 0, 1); results[1].assert_accept(0, 0, 0, 0); results[1].assert_reset(0, 0); }).then([&test] { logger().info("-- 2 --"); logger().info("[Test] server markdown..."); return test.markdown_peer(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(1, 1, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::established); results[1].assert_connect(2, 2, 1, 2); results[1].assert_accept(0, 0, 0, 0); results[1].assert_reset(0, 1); }).then([&suite] { logger().info("-- 3 --"); logger().info("[Test] client reconnect..."); return suite.connect_peer(); }).then([&suite] { return suite.send_peer(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(1, 1, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::established); results[1].assert_connect(2, 2, 1, 2); results[1].assert_accept(0, 0, 0, 0); results[1].assert_reset(0, 1); }); }); } seastar::future<> test_v2_stateful_server(FailoverTest& test) { return test.run_suite( "test_v2_stateful_server", TestInterceptor(), policy_t::stateful_server, policy_t::lossless_client, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { logger().info("-- 0 --"); logger().info("[Test] setup connection..."); return test.peer_connect_me(); }).then([&test] { return test.send_bidirectional(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 1); results[0].assert_reset(0, 0); }).then([&test] { logger().info("-- 1 --"); logger().info("[Test] client markdown..."); return test.markdown_peer(); }).then([&test] { return test.peer_connect_me(); }).then([&test] { return test.peer_send_me(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 2); results[0].assert_reset(0, 1); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 0); results[1].assert_reset(0, 0); }).then([&suite] { logger().info("-- 2 --"); logger().info("[Test] server markdown..."); return suite.markdown(); }).then([&suite] { return suite.wait_results(3); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 2); results[0].assert_reset(0, 1); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 0); results[1].assert_reset(0, 0); results[2].assert_state_at(conn_state_t::established); results[2].assert_connect(0, 0, 0, 0); results[2].assert_accept(1, 1, 1, 1); results[2].assert_reset(0, 0); }).then([&test] { logger().info("-- 3 --"); logger().info("[Test] client reconnect..."); return test.peer_connect_me(); }).then([&test] { return test.peer_send_me(); }).then([&suite] { return suite.wait_results(3); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 2); results[0].assert_reset(0, 1); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 0); results[1].assert_reset(0, 0); results[2].assert_state_at(conn_state_t::established); results[2].assert_connect(0, 0, 0, 0); results[2].assert_accept(1, 1, 1, 1); results[2].assert_reset(0, 0); }); }); } seastar::future<> test_v2_peer_reuse_connector(FailoverTest& test) { return test.run_suite( "test_v2_peer_reuse_connector", TestInterceptor(), policy_t::lossless_peer_reuse, policy_t::lossless_peer_reuse, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&suite] { logger().info("-- 0 --"); logger().info("[Test] setup connection..."); return suite.connect_peer(); }).then([&test] { return test.send_bidirectional(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(1, 1, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); }).then([&suite] { logger().info("-- 1 --"); logger().info("[Test] connector markdown..."); return suite.markdown(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.send_peer(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(1, 1, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::established); results[1].assert_connect(1, 1, 0, 1); results[1].assert_accept(0, 0, 0, 0); results[1].assert_reset(0, 0); }).then([&test] { logger().info("-- 2 --"); logger().info("[Test] acceptor markdown..."); return test.markdown_peer(); }).then([&suite] { ceph_assert(suite.is_standby()); logger().info("-- 3 --"); logger().info("[Test] connector reconnect..."); return suite.connect_peer(); }).then([&suite] { return suite.try_send_peer(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(1, 1, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::established); results[1].assert_connect(2, 2, 1, 2); results[1].assert_accept(0, 0, 0, 0); results[1].assert_reset(0, 1); }); }); } seastar::future<> test_v2_peer_reuse_acceptor(FailoverTest& test) { return test.run_suite( "test_v2_peer_reuse_acceptor", TestInterceptor(), policy_t::lossless_peer_reuse, policy_t::lossless_peer_reuse, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { logger().info("-- 0 --"); logger().info("[Test] setup connection..."); return test.peer_connect_me(); }).then([&test] { return test.send_bidirectional(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 1); results[0].assert_reset(0, 0); }).then([&test] { logger().info("-- 1 --"); logger().info("[Test] connector markdown..."); return test.markdown_peer(); }).then([&test] { return test.peer_connect_me(); }).then([&test] { return test.peer_send_me(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 2); results[0].assert_reset(0, 1); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 0); results[1].assert_reset(0, 0); }).then([&suite] { logger().info("-- 2 --"); logger().info("[Test] acceptor markdown..."); return suite.markdown(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 2); results[0].assert_reset(0, 1); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 0); results[1].assert_reset(0, 0); }).then([&test] { logger().info("-- 3 --"); logger().info("[Test] connector reconnect..."); return test.peer_connect_me(); }).then([&test] { return test.try_peer_send_me(); }).then([&suite] { return suite.wait_results(3); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 2); results[0].assert_reset(0, 1); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 0); results[1].assert_reset(0, 0); results[2].assert_state_at(conn_state_t::established); results[2].assert_connect(0, 0, 0, 0); results[2].assert_accept(1, 1, 1, 1); results[2].assert_reset(0, 0); }); }); } seastar::future<> test_v2_lossless_peer_connector(FailoverTest& test) { return test.run_suite( "test_v2_lossless_peer_connector", TestInterceptor(), policy_t::lossless_peer, policy_t::lossless_peer, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&suite] { logger().info("-- 0 --"); logger().info("[Test] setup connection..."); return suite.connect_peer(); }).then([&test] { return test.send_bidirectional(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(1, 1, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); }).then([&suite] { logger().info("-- 1 --"); logger().info("[Test] connector markdown..."); return suite.markdown(); }).then([&suite] { return suite.connect_peer(); }).then([&suite] { return suite.send_peer(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(1, 1, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::established); results[1].assert_connect(1, 1, 0, 1); results[1].assert_accept(0, 0, 0, 0); results[1].assert_reset(0, 0); }).then([&test] { logger().info("-- 2 --"); logger().info("[Test] acceptor markdown..."); return test.markdown_peer(); }).then([&suite] { ceph_assert(suite.is_standby()); logger().info("-- 3 --"); logger().info("[Test] connector reconnect..."); return suite.connect_peer(); }).then([&suite] { return suite.try_send_peer(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(1, 1, 0, 1); results[0].assert_accept(0, 0, 0, 0); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::established); results[1].assert_connect(2, 2, 1, 2); results[1].assert_accept(0, 0, 0, 0); results[1].assert_reset(0, 1); }); }); } seastar::future<> test_v2_lossless_peer_acceptor(FailoverTest& test) { return test.run_suite( "test_v2_lossless_peer_acceptor", TestInterceptor(), policy_t::lossless_peer, policy_t::lossless_peer, [&test] (FailoverSuite& suite) { return seastar::futurize_invoke([&test] { logger().info("-- 0 --"); logger().info("[Test] setup connection..."); return test.peer_connect_me(); }).then([&test] { return test.send_bidirectional(); }).then([&suite] { return suite.wait_results(1); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 1); results[0].assert_reset(0, 0); }).then([&test] { logger().info("-- 1 --"); logger().info("[Test] connector markdown..."); return test.markdown_peer(); }).then([&test] { return test.peer_connect_me(); }).then([&test] { return test.peer_send_me(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::established); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 2); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 0); results[1].assert_reset(0, 0); }).then([&suite] { logger().info("-- 2 --"); logger().info("[Test] acceptor markdown..."); return suite.markdown(); }).then([&suite] { return suite.wait_results(2); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 2); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 0); results[1].assert_reset(0, 0); }).then([&test] { logger().info("-- 3 --"); logger().info("[Test] connector reconnect..."); return test.peer_connect_me(); }).then([&test] { return test.try_peer_send_me(); }).then([&suite] { return suite.wait_results(3); }).then([] (ConnResults& results) { results[0].assert_state_at(conn_state_t::closed); results[0].assert_connect(0, 0, 0, 0); results[0].assert_accept(1, 1, 0, 2); results[0].assert_reset(0, 0); results[1].assert_state_at(conn_state_t::replaced); results[1].assert_connect(0, 0, 0, 0); results[1].assert_accept(1, 1, 0, 0); results[1].assert_reset(0, 0); results[2].assert_state_at(conn_state_t::established); results[2].assert_connect(0, 0, 0, 0); results[2].assert_accept(1, 1, 1, 1); results[2].assert_reset(0, 0); }); }); } seastar::future<> test_v2_protocol(entity_addr_t test_addr, entity_addr_t cmd_peer_addr, entity_addr_t test_peer_addr, bool test_peer_islocal, bool peer_wins) { ceph_assert_always(test_addr.is_msgr2()); ceph_assert_always(cmd_peer_addr.is_msgr2()); ceph_assert_always(test_peer_addr.is_msgr2()); if (test_peer_islocal) { // initiate crimson test peer locally logger().info("test_v2_protocol: start local TestPeer at {}...", cmd_peer_addr); return FailoverTestPeer::create(cmd_peer_addr, test_peer_addr ).then([test_addr, cmd_peer_addr, test_peer_addr, peer_wins](auto peer) { return test_v2_protocol( test_addr, cmd_peer_addr, test_peer_addr, false, peer_wins ).then([peer = std::move(peer)] () mutable { return peer->wait().then([peer = std::move(peer)] {}); }); }).handle_exception([] (auto eptr) { logger().error("FailoverTestPeer failed: got exception {}", eptr); throw; }); } return FailoverTest::create(test_addr, cmd_peer_addr, test_peer_addr ).then([peer_wins](auto test) { return seastar::futurize_invoke([test] { return test_v2_lossy_early_connect_fault(*test); }).then([test] { return test_v2_lossy_connect_fault(*test); }).then([test] { return test_v2_lossy_connected_fault(*test); }).then([test] { return test_v2_lossy_early_accept_fault(*test); }).then([test] { return test_v2_lossy_accept_fault(*test); }).then([test] { return test_v2_lossy_establishing_fault(*test); }).then([test] { return test_v2_lossy_accepted_fault(*test); }).then([test] { return test_v2_lossless_connect_fault(*test); }).then([test] { return test_v2_lossless_connected_fault(*test); }).then([test] { return test_v2_lossless_connected_fault2(*test); }).then([test] { return test_v2_lossless_reconnect_fault(*test); }).then([test] { return test_v2_lossless_accept_fault(*test); }).then([test] { return test_v2_lossless_establishing_fault(*test); }).then([test] { return test_v2_lossless_accepted_fault(*test); }).then([test] { return test_v2_lossless_reaccept_fault(*test); }).then([test] { return test_v2_peer_connect_fault(*test); }).then([test] { return test_v2_peer_accept_fault(*test); }).then([test] { return test_v2_peer_establishing_fault(*test); }).then([test] { return test_v2_peer_connected_fault_reconnect(*test); }).then([test] { return test_v2_peer_connected_fault_reaccept(*test); }).then([test] { return check_peer_wins(*test); }).then([test, peer_wins](bool ret_peer_wins) { ceph_assert(peer_wins == ret_peer_wins); if (ret_peer_wins) { return seastar::futurize_invoke([test] { return test_v2_racing_connect_acceptor_win(*test); }).then([test] { return test_v2_racing_reconnect_acceptor_win(*test); }); } else { return seastar::futurize_invoke([test] { return test_v2_racing_connect_acceptor_lose(*test); }).then([test] { return test_v2_racing_reconnect_acceptor_lose(*test); }); } }).then([test] { return test_v2_racing_connect_reconnect_win(*test); }).then([test] { return test_v2_racing_connect_reconnect_lose(*test); }).then([test] { return test_v2_stale_connect(*test); }).then([test] { return test_v2_stale_reconnect(*test); }).then([test] { return test_v2_stale_accept(*test); }).then([test] { return test_v2_stale_establishing(*test); }).then([test] { return test_v2_stale_reaccept(*test); }).then([test] { return test_v2_lossy_client(*test); }).then([test] { return test_v2_stateless_server(*test); }).then([test] { return test_v2_lossless_client(*test); }).then([test] { return test_v2_stateful_server(*test); }).then([test] { return test_v2_peer_reuse_connector(*test); }).then([test] { return test_v2_peer_reuse_acceptor(*test); }).then([test] { return test_v2_lossless_peer_connector(*test); }).then([test] { return test_v2_lossless_peer_acceptor(*test); }).then([test] { return test->shutdown().then([test] {}); }); }).handle_exception([] (auto eptr) { logger().error("FailoverTest failed: got exception {}", eptr); throw; }); } } seastar::future do_test(seastar::app_template& app) { std::vector args; std::string cluster; std::string conf_file_list; auto init_params = ceph_argparse_early_args(args, CEPH_ENTITY_TYPE_CLIENT, &cluster, &conf_file_list); return crimson::common::sharded_conf().start( init_params.name, cluster ).then([] { return local_conf().start(); }).then([conf_file_list] { return local_conf().parse_config_files(conf_file_list); }).then([&app] { auto&& config = app.configuration(); verbose = config["verbose"].as(); auto rounds = config["rounds"].as(); auto keepalive_ratio = config["keepalive-ratio"].as(); auto testpeer_islocal = config["testpeer-islocal"].as(); entity_addr_t test_addr; ceph_assert(test_addr.parse( config["test-addr"].as().c_str(), nullptr)); test_addr.set_nonce(TEST_NONCE); entity_addr_t cmd_peer_addr; ceph_assert(cmd_peer_addr.parse( config["testpeer-addr"].as().c_str(), nullptr)); cmd_peer_addr.set_nonce(CMD_SRV_NONCE); entity_addr_t test_peer_addr = get_test_peer_addr(cmd_peer_addr); bool peer_wins = (test_addr > test_peer_addr); logger().info("test configuration: verbose={}, rounds={}, keepalive_ratio={}, " "test_addr={}, cmd_peer_addr={}, test_peer_addr={}, " "testpeer_islocal={}, peer_wins={}, smp={}", verbose, rounds, keepalive_ratio, test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal, peer_wins, seastar::smp::count); return test_echo(rounds, keepalive_ratio ).then([] { return test_preemptive_shutdown(); }).then([test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal, peer_wins] { return test_v2_protocol( test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal, peer_wins); }).then([] { logger().info("All tests succeeded"); // Seastar has bugs to have events undispatched during shutdown, // which will result in memory leak and thus fail LeakSanitizer. return seastar::sleep(100ms); }); }).then([] { return crimson::common::sharded_conf().stop(); }).then([] { return 0; }).handle_exception([] (auto eptr) { logger().error("Test failed: got exception {}", eptr); return 1; }); } int main(int argc, char** argv) { seastar::app_template app; app.add_options() ("verbose,v", bpo::value()->default_value(false), "chatty if true") ("rounds", bpo::value()->default_value(512), "number of pingpong rounds") ("keepalive-ratio", bpo::value()->default_value(0.1), "ratio of keepalive in ping messages") ("test-addr", bpo::value()->default_value("v2:127.0.0.1:9014"), "address of v2 failover tests") ("testpeer-addr", bpo::value()->default_value("v2:127.0.0.1:9012"), "addresses of v2 failover testpeer" " (This is CmdSrv address, and TestPeer address is at port+=1)") ("testpeer-islocal", bpo::value()->default_value(true), "create a local crimson testpeer, or connect to a remote testpeer"); return app.run(argc, argv, [&app] { // This test normally succeeds within 60 seconds, so kill it after 300 // seconds in case it is blocked forever due to unaddressed bugs. return seastar::with_timeout(seastar::lowres_clock::now() + 300s, do_test(app)) .handle_exception_type([](seastar::timed_out_error&) { logger().error("test_messenger timeout after 300s, abort! " "Consider to extend the period if the test is still running."); // use the retcode of timeout(1) return 124; }); }); }