summaryrefslogtreecommitdiffstats
path: root/src/test/crimson/test_messenger.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/test/crimson/test_messenger.cc3874
1 files changed, 3874 insertions, 0 deletions
diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc
new file mode 100644
index 000000000..a42572246
--- /dev/null
+++ b/src/test/crimson/test_messenger.cc
@@ -0,0 +1,3874 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_argparse.h"
+#include "common/ceph_time.h"
+#include "messages/MPing.h"
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+#include "crimson/auth/DummyAuth.h"
+#include "crimson/common/log.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Dispatcher.h"
+#include "crimson/net/Messenger.h"
+#include "crimson/net/Interceptor.h"
+
+#include <map>
+#include <random>
+#include <boost/program_options.hpp>
+#include <fmt/format.h>
+#include <fmt/ostream.h>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/do_with.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/with_timeout.hh>
+
+#include "test_messenger.h"
+
+using namespace std::chrono_literals;
+namespace bpo = boost::program_options;
+using crimson::common::local_conf;
+
+namespace {
+
+seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_test);
+}
+
+static std::random_device rd;
+static std::default_random_engine rng{rd()};
+static bool verbose = false;
+
+static entity_addr_t get_server_addr() {
+ static int port = 9030;
+ ++port;
+ entity_addr_t saddr;
+ saddr.parse("127.0.0.1", nullptr);
+ saddr.set_port(port);
+ return saddr;
+}
+
+template <typename T, typename... Args>
+seastar::future<T*> create_sharded(Args... args) {
+ // we should only construct/stop shards on #0
+ return seastar::smp::submit_to(0, [=] {
+ auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
+ return sharded_obj->start(args...
+ ).then([sharded_obj] {
+ seastar::engine().at_exit([sharded_obj] {
+ return sharded_obj->stop().then([sharded_obj] {});
+ });
+ return sharded_obj.get();
+ });
+ }).then([](seastar::sharded<T> *ptr_shard) {
+ return &ptr_shard->local();
+ });
+}
+
+class ShardedGates
+ : public seastar::peering_sharded_service<ShardedGates> {
+public:
+ ShardedGates() = default;
+ ~ShardedGates() {
+ assert(gate.is_closed());
+ }
+
+ template <typename Func>
+ void dispatch_in_background(const char *what, Func &&f) {
+ std::ignore = seastar::with_gate(
+ container().local().gate, std::forward<Func>(f)
+ ).handle_exception([what](std::exception_ptr eptr) {
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ logger().error("ShardedGates::dispatch_in_background: "
+ "{} got exxception {}", what, e.what());
+ }
+ });
+ }
+
+ seastar::future<> close() {
+ return container().invoke_on_all([](auto &local) {
+ return local.gate.close();
+ });
+ }
+
+ static seastar::future<ShardedGates*> create() {
+ return create_sharded<ShardedGates>();
+ }
+
+ // seastar::future<> stop() is intentially not implemented
+
+private:
+ seastar::gate gate;
+};
+
+static seastar::future<> test_echo(unsigned rounds,
+ double keepalive_ratio)
+{
+ struct test_state {
+ struct Server final
+ : public crimson::net::Dispatcher {
+ ShardedGates &gates;
+ crimson::net::MessengerRef msgr;
+ crimson::auth::DummyAuthClientServer dummy_auth;
+
+ Server(ShardedGates &gates) : gates{gates} {}
+
+ void ms_handle_accept(
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id prv_shard,
+ bool is_replace) override {
+ logger().info("server accepted {}", *conn);
+ ceph_assert(prv_shard == seastar::this_shard_id());
+ ceph_assert(!is_replace);
+ }
+
+ std::optional<seastar::future<>> ms_dispatch(
+ crimson::net::ConnectionRef c, MessageRef m) override {
+ if (verbose) {
+ logger().info("server got {}", *m);
+ }
+ // reply with a pong
+ gates.dispatch_in_background("echo_send_pong", [c] {
+ return c->send(crimson::make_message<MPing>());
+ });
+ return {seastar::now()};
+ }
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce,
+ const entity_addr_t& addr) {
+ msgr = crimson::net::Messenger::create(
+ name, lname, nonce, false);
+ msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+ return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
+ return msgr->start({this});
+ }, crimson::net::Messenger::bind_ertr::all_same_way(
+ [addr] (const std::error_code& e) {
+ logger().error("test_echo(): "
+ "there is another instance running at {}", addr);
+ ceph_abort();
+ }));
+ }
+ seastar::future<> shutdown() {
+ ceph_assert(msgr);
+ msgr->stop();
+ return msgr->shutdown();
+ }
+ };
+
+ class Client final
+ : public crimson::net::Dispatcher,
+ public seastar::peering_sharded_service<Client> {
+ public:
+ Client(seastar::shard_id primary_sid,
+ unsigned rounds,
+ double keepalive_ratio,
+ ShardedGates *gates)
+ : primary_sid{primary_sid},
+ keepalive_dist(std::bernoulli_distribution{keepalive_ratio}),
+ rounds(rounds),
+ gates{*gates} {}
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce) {
+ assert(seastar::this_shard_id() == primary_sid);
+ msgr = crimson::net::Messenger::create(
+ name, lname, nonce, false);
+ msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+ return msgr->start({this});
+ }
+
+ seastar::future<> shutdown() {
+ assert(seastar::this_shard_id() == primary_sid);
+ ceph_assert(msgr);
+ msgr->stop();
+ return msgr->shutdown();
+ }
+
+ seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) {
+ assert(seastar::this_shard_id() == primary_sid);
+ mono_time start_time = mono_clock::now();
+ auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+ return seastar::futurize_invoke([this, conn] {
+ return do_dispatch_pingpong(conn);
+ }).then([] {
+ // 500ms should be enough to establish the connection
+ return seastar::sleep(500ms);
+ }).then([this, conn, start_time] {
+ return container().invoke_on(
+ conn->get_shard_id(),
+ [pconn=&*conn, start_time](auto &local) {
+ assert(pconn->is_connected());
+ auto session = local.find_session(pconn);
+ std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
+ std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
+ logger().info("{}: handshake {}, pingpong {}",
+ *pconn, dur_handshake.count(), dur_pingpong.count());
+ }).then([conn] {});
+ });
+ }
+
+ static seastar::future<Client*> create(
+ unsigned rounds,
+ double keepalive_ratio,
+ ShardedGates *gates) {
+ return create_sharded<Client>(
+ seastar::this_shard_id(),
+ rounds,
+ keepalive_ratio,
+ gates);
+ }
+
+ private:
+ struct PingSession : public seastar::enable_shared_from_this<PingSession> {
+ unsigned count = 0u;
+ mono_time connected_time;
+ mono_time finish_time;
+ };
+ using PingSessionRef = seastar::shared_ptr<PingSession>;
+
+ void ms_handle_connect(
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id prv_shard) override {
+ auto &local = container().local();
+ assert(prv_shard == seastar::this_shard_id());
+ auto session = seastar::make_shared<PingSession>();
+ auto [i, added] = local.sessions.emplace(&*conn, session);
+ std::ignore = i;
+ ceph_assert(added);
+ session->connected_time = mono_clock::now();
+ }
+
+ std::optional<seastar::future<>> ms_dispatch(
+ crimson::net::ConnectionRef c, MessageRef m) override {
+ auto &local = container().local();
+ auto session = local.find_session(&*c);
+ ++(session->count);
+ if (verbose) {
+ logger().info("client ms_dispatch {}", session->count);
+ }
+
+ if (session->count > rounds) {
+ logger().error("{}: got {} pongs, more than expected {}", *c, session->count, rounds);
+ ceph_abort();
+ } else if (session->count == rounds) {
+ logger().info("{}: finished receiving {} pongs", *c, session->count);
+ session->finish_time = mono_clock::now();
+ gates.dispatch_in_background("echo_notify_done", [c, this] {
+ return container().invoke_on(primary_sid, [pconn=&*c](auto &local) {
+ auto found = local.pending_conns.find(pconn);
+ ceph_assert(found != local.pending_conns.end());
+ found->second.set_value();
+ }).then([c] {});
+ });
+ }
+ return {seastar::now()};
+ }
+
+ PingSessionRef find_session(crimson::net::Connection *c) {
+ auto found = sessions.find(c);
+ if (found == sessions.end()) {
+ ceph_assert(false);
+ }
+ return found->second;
+ }
+
+ seastar::future<> do_dispatch_pingpong(crimson::net::ConnectionRef conn) {
+ auto [i, added] = pending_conns.emplace(&*conn, seastar::promise<>());
+ std::ignore = i;
+ ceph_assert(added);
+ return seastar::do_with(0u, 0u,
+ [this, conn](auto &count_ping, auto &count_keepalive) {
+ return seastar::do_until(
+ [this, conn, &count_ping, &count_keepalive] {
+ bool stop = (count_ping == rounds);
+ if (stop) {
+ logger().info("{}: finished sending {} pings with {} keepalives",
+ *conn, count_ping, count_keepalive);
+ }
+ return stop;
+ },
+ [this, conn, &count_ping, &count_keepalive] {
+ return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
+ if (keepalive_dist(rng)) {
+ return conn->send_keepalive(
+ ).then([&count_keepalive] {
+ count_keepalive += 1;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ });
+ } else {
+ return conn->send(crimson::make_message<MPing>()
+ ).then([&count_ping] {
+ count_ping += 1;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ });
+ }
+ });
+ }).then([this, conn] {
+ auto found = pending_conns.find(&*conn);
+ assert(found != pending_conns.end());
+ return found->second.get_future();
+ }
+ );
+ });
+ }
+
+ private:
+ // primary shard only
+ const seastar::shard_id primary_sid;
+ std::bernoulli_distribution keepalive_dist;
+ crimson::net::MessengerRef msgr;
+ std::map<crimson::net::Connection*, seastar::promise<>> pending_conns;
+ crimson::auth::DummyAuthClientServer dummy_auth;
+
+ // per shard
+ const unsigned rounds;
+ std::map<crimson::net::Connection*, PingSessionRef> sessions;
+ ShardedGates &gates;
+ };
+ };
+
+ logger().info("test_echo(rounds={}, keepalive_ratio={}):",
+ rounds, keepalive_ratio);
+ return ShardedGates::create(
+ ).then([rounds, keepalive_ratio](auto *gates) {
+ return seastar::when_all_succeed(
+ test_state::Client::create(rounds, keepalive_ratio, gates),
+ test_state::Client::create(rounds, keepalive_ratio, gates),
+ seastar::make_ready_future<ShardedGates*>(gates));
+ }).then_unpack([](auto *client1, auto *client2, auto *gates) {
+ auto server1 = seastar::make_shared<test_state::Server>(*gates);
+ auto server2 = seastar::make_shared<test_state::Server>(*gates);
+ // start servers and clients
+ auto addr1 = get_server_addr();
+ auto addr2 = get_server_addr();
+ addr1.set_type(entity_addr_t::TYPE_MSGR2);
+ addr2.set_type(entity_addr_t::TYPE_MSGR2);
+ return seastar::when_all_succeed(
+ server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
+ server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
+ client1->init(entity_name_t::OSD(2), "client1", 3),
+ client2->init(entity_name_t::OSD(3), "client2", 4)
+ // dispatch pingpoing
+ ).then_unpack([client1, client2, server1, server2] {
+ return seastar::when_all_succeed(
+ // test connecting in parallel, accepting in parallel
+ client1->dispatch_pingpong(server1->msgr->get_myaddr()),
+ client1->dispatch_pingpong(server2->msgr->get_myaddr()),
+ client2->dispatch_pingpong(server1->msgr->get_myaddr()),
+ client2->dispatch_pingpong(server2->msgr->get_myaddr()));
+ // shutdown
+ }).then_unpack([client1] {
+ logger().info("client1 shutdown...");
+ return client1->shutdown();
+ }).then([client2] {
+ logger().info("client2 shutdown...");
+ return client2->shutdown();
+ }).then([server1] {
+ logger().info("server1 shutdown...");
+ return server1->shutdown();
+ }).then([server2] {
+ logger().info("server2 shutdown...");
+ return server2->shutdown();
+ }).then([] {
+ logger().info("test_echo() done!\n");
+ }).handle_exception([](auto eptr) {
+ logger().error("test_echo() failed: got exception {}", eptr);
+ throw;
+ }).finally([gates, server1, server2] {
+ return gates->close();
+ });
+ });
+}
+
+seastar::future<> test_preemptive_shutdown() {
+ struct test_state {
+ class Server final
+ : public crimson::net::Dispatcher {
+ crimson::net::MessengerRef msgr;
+ crimson::auth::DummyAuthClientServer dummy_auth;
+
+ std::optional<seastar::future<>> ms_dispatch(
+ crimson::net::ConnectionRef c, MessageRef m) override {
+ std::ignore = c->send(crimson::make_message<MPing>());
+ return {seastar::now()};
+ }
+
+ public:
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce,
+ const entity_addr_t& addr) {
+ msgr = crimson::net::Messenger::create(
+ name, lname, nonce, true);
+ msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+ return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
+ return msgr->start({this});
+ }, crimson::net::Messenger::bind_ertr::all_same_way(
+ [addr] (const std::error_code& e) {
+ logger().error("test_preemptive_shutdown(): "
+ "there is another instance running at {}", addr);
+ ceph_abort();
+ }));
+ }
+ entity_addr_t get_addr() const {
+ return msgr->get_myaddr();
+ }
+ seastar::future<> shutdown() {
+ msgr->stop();
+ return msgr->shutdown();
+ }
+ };
+
+ class Client final
+ : public crimson::net::Dispatcher {
+ crimson::net::MessengerRef msgr;
+ crimson::auth::DummyAuthClientServer dummy_auth;
+
+ bool stop_send = false;
+ seastar::promise<> stopped_send_promise;
+
+ std::optional<seastar::future<>> ms_dispatch(
+ crimson::net::ConnectionRef, MessageRef m) override {
+ return {seastar::now()};
+ }
+
+ public:
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce) {
+ msgr = crimson::net::Messenger::create(
+ name, lname, nonce, true);
+ msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+ return msgr->start({this});
+ }
+ void send_pings(const entity_addr_t& addr) {
+ auto conn = msgr->connect(addr, entity_name_t::TYPE_OSD);
+ // forwarded to stopped_send_promise
+ (void) seastar::do_until(
+ [this] { return stop_send; },
+ [conn] {
+ return conn->send(crimson::make_message<MPing>()).then([] {
+ return seastar::sleep(0ms);
+ });
+ }
+ ).then_wrapped([this, conn] (auto fut) {
+ fut.forward_to(std::move(stopped_send_promise));
+ });
+ }
+ seastar::future<> shutdown() {
+ msgr->stop();
+ return msgr->shutdown().then([this] {
+ stop_send = true;
+ return stopped_send_promise.get_future();
+ });
+ }
+ };
+ };
+
+ logger().info("test_preemptive_shutdown():");
+ auto server = seastar::make_shared<test_state::Server>();
+ auto client = seastar::make_shared<test_state::Client>();
+ auto addr = get_server_addr();
+ addr.set_type(entity_addr_t::TYPE_MSGR2);
+ addr.set_family(AF_INET);
+ return seastar::when_all_succeed(
+ server->init(entity_name_t::OSD(6), "server4", 7, addr),
+ client->init(entity_name_t::OSD(7), "client4", 8)
+ ).then_unpack([server, client] {
+ client->send_pings(server->get_addr());
+ return seastar::sleep(100ms);
+ }).then([client] {
+ logger().info("client shutdown...");
+ return client->shutdown();
+ }).then([server] {
+ logger().info("server shutdown...");
+ return server->shutdown();
+ }).then([] {
+ logger().info("test_preemptive_shutdown() done!\n");
+ }).handle_exception([server, client] (auto eptr) {
+ logger().error("test_preemptive_shutdown() failed: got exception {}", eptr);
+ throw;
+ });
+}
+
+using ceph::msgr::v2::Tag;
+using crimson::net::bp_action_t;
+using crimson::net::bp_type_t;
+using crimson::net::Breakpoint;
+using crimson::net::Connection;
+using crimson::net::ConnectionRef;
+using crimson::net::custom_bp_t;
+using crimson::net::Dispatcher;
+using crimson::net::Interceptor;
+using crimson::net::Messenger;
+using crimson::net::MessengerRef;
+using crimson::net::SocketPolicy;
+using crimson::net::tag_bp_t;
+using namespace ceph::net::test;
+
+struct counter_t { unsigned counter = 0; };
+
+enum class conn_state_t {
+ unknown = 0,
+ established,
+ closed,
+ replaced,
+};
+
+std::ostream& operator<<(std::ostream& out, const conn_state_t& state) {
+ switch(state) {
+ case conn_state_t::unknown:
+ return out << "unknown";
+ case conn_state_t::established:
+ return out << "established";
+ case conn_state_t::closed:
+ return out << "closed";
+ case conn_state_t::replaced:
+ return out << "replaced";
+ default:
+ ceph_abort();
+ }
+}
+
+} // anonymous namespace
+
+#if FMT_VERSION >= 90000
+template<>
+struct fmt::formatter<conn_state_t> : fmt::ostream_formatter {};
+#endif
+
+namespace {
+
+struct ConnResult {
+ ConnectionRef conn;
+ unsigned index;
+ conn_state_t state = conn_state_t::unknown;
+
+ unsigned connect_attempts = 0;
+ unsigned client_connect_attempts = 0;
+ unsigned client_reconnect_attempts = 0;
+ unsigned cnt_connect_dispatched = 0;
+
+ unsigned accept_attempts = 0;
+ unsigned server_connect_attempts = 0;
+ unsigned server_reconnect_attempts = 0;
+ unsigned cnt_accept_dispatched = 0;
+
+ unsigned cnt_reset_dispatched = 0;
+ unsigned cnt_remote_reset_dispatched = 0;
+
+ ConnResult(ConnectionRef conn, unsigned index)
+ : conn(conn), index(index) {}
+
+ template <typename T>
+ void _assert_eq(const char* expr_actual, T actual,
+ const char* expr_expected, T expected) const {
+ if (actual != expected) {
+ throw std::runtime_error(fmt::format(
+ "[{}] {} '{}' is actually {}, not the expected '{}' {}",
+ index, *conn, expr_actual, actual, expr_expected, expected));
+ }
+ }
+
+#define ASSERT_EQUAL(actual, expected) \
+ _assert_eq(#actual, actual, #expected, expected)
+
+ void assert_state_at(conn_state_t expected) const {
+ ASSERT_EQUAL(state, expected);
+ }
+
+ void assert_connect(unsigned attempts,
+ unsigned connects,
+ unsigned reconnects,
+ unsigned dispatched) const {
+ ASSERT_EQUAL(connect_attempts, attempts);
+ ASSERT_EQUAL(client_connect_attempts, connects);
+ ASSERT_EQUAL(client_reconnect_attempts, reconnects);
+ ASSERT_EQUAL(cnt_connect_dispatched, dispatched);
+ }
+
+ void assert_connect(unsigned attempts,
+ unsigned dispatched) const {
+ ASSERT_EQUAL(connect_attempts, attempts);
+ ASSERT_EQUAL(cnt_connect_dispatched, dispatched);
+ }
+
+ void assert_accept(unsigned attempts,
+ unsigned accepts,
+ unsigned reaccepts,
+ unsigned dispatched) const {
+ ASSERT_EQUAL(accept_attempts, attempts);
+ ASSERT_EQUAL(server_connect_attempts, accepts);
+ ASSERT_EQUAL(server_reconnect_attempts, reaccepts);
+ ASSERT_EQUAL(cnt_accept_dispatched, dispatched);
+ }
+
+ void assert_accept(unsigned attempts,
+ unsigned dispatched) const {
+ ASSERT_EQUAL(accept_attempts, attempts);
+ ASSERT_EQUAL(cnt_accept_dispatched, dispatched);
+ }
+
+ void assert_reset(unsigned local, unsigned remote) const {
+ ASSERT_EQUAL(cnt_reset_dispatched, local);
+ ASSERT_EQUAL(cnt_remote_reset_dispatched, remote);
+ }
+
+ void dump() const {
+ logger().info("\nResult({}):\n"
+ " conn: [{}] {}:\n"
+ " state: {}\n"
+ " connect_attempts: {}\n"
+ " client_connect_attempts: {}\n"
+ " client_reconnect_attempts: {}\n"
+ " cnt_connect_dispatched: {}\n"
+ " accept_attempts: {}\n"
+ " server_connect_attempts: {}\n"
+ " server_reconnect_attempts: {}\n"
+ " cnt_accept_dispatched: {}\n"
+ " cnt_reset_dispatched: {}\n"
+ " cnt_remote_reset_dispatched: {}\n",
+ static_cast<const void*>(this),
+ index, *conn,
+ state,
+ connect_attempts,
+ client_connect_attempts,
+ client_reconnect_attempts,
+ cnt_connect_dispatched,
+ accept_attempts,
+ server_connect_attempts,
+ server_reconnect_attempts,
+ cnt_accept_dispatched,
+ cnt_reset_dispatched,
+ cnt_remote_reset_dispatched);
+ }
+};
+using ConnResults = std::vector<ConnResult>;
+
+struct TestInterceptor : public Interceptor {
+ std::map<Breakpoint, std::map<unsigned, bp_action_t>> breakpoints;
+ std::map<Breakpoint, counter_t> breakpoints_counter;
+ std::map<Connection*, unsigned> conns;
+ ConnResults results;
+ std::optional<seastar::abort_source> signal;
+ const seastar::shard_id primary_sid;
+
+ TestInterceptor() : primary_sid{seastar::this_shard_id()} {}
+
+ // only used for copy breakpoint configurations
+ TestInterceptor(const TestInterceptor& other) : primary_sid{other.primary_sid} {
+ assert(other.breakpoints_counter.empty());
+ assert(other.conns.empty());
+ assert(other.results.empty());
+ breakpoints = other.breakpoints;
+ assert(!other.signal);
+ assert(seastar::this_shard_id() == primary_sid);
+ }
+
+ void make_fault(Breakpoint bp, unsigned round = 1) {
+ assert(round >= 1);
+ breakpoints[bp][round] = bp_action_t::FAULT;
+ }
+
+ void make_block(Breakpoint bp, unsigned round = 1) {
+ assert(round >= 1);
+ breakpoints[bp][round] = bp_action_t::BLOCK;
+ }
+
+ void make_stall(Breakpoint bp, unsigned round = 1) {
+ assert(round >= 1);
+ breakpoints[bp][round] = bp_action_t::STALL;
+ }
+
+ ConnResult* find_result(Connection *conn) {
+ assert(seastar::this_shard_id() == primary_sid);
+ auto it = conns.find(conn);
+ if (it == conns.end()) {
+ return nullptr;
+ } else {
+ return &results[it->second];
+ }
+ }
+
+ seastar::future<> wait() {
+ assert(seastar::this_shard_id() == primary_sid);
+ assert(!signal);
+ signal = seastar::abort_source();
+ return seastar::sleep_abortable(10s, *signal).then([] {
+ throw std::runtime_error("Timeout (10s) in TestInterceptor::wait()");
+ }).handle_exception_type([] (const seastar::sleep_aborted& e) {
+ // wait done!
+ });
+ }
+
+ void notify() {
+ assert(seastar::this_shard_id() == primary_sid);
+ if (signal) {
+ signal->request_abort();
+ signal = std::nullopt;
+ }
+ }
+
+ private:
+ void register_conn(ConnectionRef conn) override {
+ auto result = find_result(&*conn);
+ if (result != nullptr) {
+ logger().error("The connection [{}] {} already exists when register {}",
+ result->index, *result->conn, *conn);
+ ceph_abort();
+ }
+ unsigned index = results.size();
+ results.emplace_back(conn, index);
+ conns[&*conn] = index;
+ notify();
+ logger().info("[{}] {} new connection registered", index, *conn);
+ }
+
+ void register_conn_closed(ConnectionRef conn) override {
+ auto result = find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked closed connection: {}", *conn);
+ ceph_abort();
+ }
+
+ if (result->state != conn_state_t::replaced) {
+ result->state = conn_state_t::closed;
+ }
+ notify();
+ logger().info("[{}] {} closed({})", result->index, *conn, result->state);
+ }
+
+ void register_conn_ready(ConnectionRef conn) override {
+ auto result = find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked ready connection: {}", *conn);
+ ceph_abort();
+ }
+
+ ceph_assert(conn->is_protocol_ready());
+ notify();
+ logger().info("[{}] {} ready", result->index, *conn);
+ }
+
+ void register_conn_replaced(ConnectionRef conn) override {
+ auto result = find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked replaced connection: {}", *conn);
+ ceph_abort();
+ }
+
+ result->state = conn_state_t::replaced;
+ logger().info("[{}] {} {}", result->index, *conn, result->state);
+ }
+
+ seastar::future<bp_action_t>
+ intercept(Connection &_conn, std::vector<Breakpoint> bps) override {
+ assert(bps.size() >= 1);
+ Connection *conn = &_conn;
+
+ return seastar::smp::submit_to(primary_sid, [conn, bps, this] {
+ std::vector<bp_action_t> actions;
+ for (const Breakpoint &bp : bps) {
+ ++breakpoints_counter[bp].counter;
+
+ auto result = find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
+ *conn, bp, breakpoints_counter[bp].counter);
+ ceph_abort();
+ }
+
+ if (bp == custom_bp_t::SOCKET_CONNECTING) {
+ ++result->connect_attempts;
+ logger().info("[Test] connect_attempts={}", result->connect_attempts);
+ } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) {
+ ++result->client_connect_attempts;
+ logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts);
+ } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) {
+ ++result->client_reconnect_attempts;
+ logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts);
+ } else if (bp == custom_bp_t::SOCKET_ACCEPTED) {
+ ++result->accept_attempts;
+ logger().info("[Test] accept_attempts={}", result->accept_attempts);
+ } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) {
+ ++result->server_connect_attempts;
+ logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts);
+ } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
+ ++result->server_reconnect_attempts;
+ logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts);
+ }
+
+ auto it_bp = breakpoints.find(bp);
+ if (it_bp != breakpoints.end()) {
+ auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter);
+ if (it_cnt != it_bp->second.end()) {
+ logger().info("[{}] {} intercepted {}({}) => {}",
+ result->index, *conn, bp,
+ breakpoints_counter[bp].counter, it_cnt->second);
+ actions.emplace_back(it_cnt->second);
+ continue;
+ }
+ }
+ logger().info("[{}] {} intercepted {}({})",
+ result->index, *conn, bp, breakpoints_counter[bp].counter);
+ actions.emplace_back(bp_action_t::CONTINUE);
+ }
+
+ bp_action_t action = bp_action_t::CONTINUE;
+ for (bp_action_t &a : actions) {
+ if (a != bp_action_t::CONTINUE) {
+ if (action == bp_action_t::CONTINUE) {
+ action = a;
+ } else {
+ ceph_abort("got multiple incompatible actions");
+ }
+ }
+ }
+ return seastar::make_ready_future<bp_action_t>(action);
+ });
+ }
+};
+
+SocketPolicy to_socket_policy(policy_t policy) {
+ switch (policy) {
+ case policy_t::stateful_server:
+ return SocketPolicy::stateful_server(0);
+ case policy_t::stateless_server:
+ return SocketPolicy::stateless_server(0);
+ case policy_t::lossless_peer:
+ return SocketPolicy::lossless_peer(0);
+ case policy_t::lossless_peer_reuse:
+ return SocketPolicy::lossless_peer_reuse(0);
+ case policy_t::lossy_client:
+ return SocketPolicy::lossy_client(0);
+ case policy_t::lossless_client:
+ return SocketPolicy::lossless_client(0);
+ default:
+ logger().error("unexpected policy type");
+ ceph_abort();
+ }
+}
+
+class FailoverSuite : public Dispatcher {
+ crimson::auth::DummyAuthClientServer dummy_auth;
+ MessengerRef test_msgr;
+ const entity_addr_t test_peer_addr;
+ TestInterceptor interceptor;
+
+ unsigned tracked_index = 0;
+ Connection *tracked_conn = nullptr;
+ unsigned pending_send = 0;
+ unsigned pending_peer_receive = 0;
+ unsigned pending_receive = 0;
+
+ ShardedGates &gates;
+ const seastar::shard_id primary_sid;
+
+ std::optional<seastar::future<>> ms_dispatch(
+ ConnectionRef conn_ref, MessageRef m) override {
+ ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+ Connection *conn = &*conn_ref;
+ gates.dispatch_in_background("TestSuite_ms_dispatch",
+ [this, conn, conn_ref] {
+ return seastar::smp::submit_to(primary_sid, [this, conn] {
+ auto result = interceptor.find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked ms dispatched connection: {}", *conn);
+ ceph_abort();
+ }
+
+ if (tracked_conn != &*conn) {
+ logger().warn("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ } else {
+ ceph_assert(result->index == tracked_index);
+ }
+
+ ceph_assert(pending_receive > 0);
+ --pending_receive;
+ if (pending_receive == 0) {
+ interceptor.notify();
+ }
+ logger().info("[Test] got op, left {} ops -- [{}] {}",
+ pending_receive, result->index, *conn);
+ }).then([conn_ref] {});
+ });
+ return {seastar::now()};
+ }
+
+ void ms_handle_accept(
+ ConnectionRef conn_ref,
+ seastar::shard_id prv_shard,
+ bool is_replace) override {
+ Connection *conn = &*conn_ref;
+ gates.dispatch_in_background("TestSuite_ms_dispatch",
+ [this, conn, conn_ref] {
+ return seastar::smp::submit_to(primary_sid, [this, conn] {
+ auto result = interceptor.find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked accepted connection: {}", *conn);
+ ceph_abort();
+ }
+
+ if (tracked_conn &&
+ !tracked_conn->is_protocol_closed() &&
+ tracked_conn != &*conn) {
+ logger().error("[{}] {} got accepted, but there's already a valid traced_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ ceph_abort();
+ }
+
+ tracked_index = result->index;
+ tracked_conn = &*conn;
+ ++result->cnt_accept_dispatched;
+ logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}",
+ result->cnt_accept_dispatched, result->index, *conn);
+ return flush_pending_send();
+ }).then([conn_ref] {});
+ });
+ }
+
+ void ms_handle_connect(
+ ConnectionRef conn_ref,
+ seastar::shard_id prv_shard) override {
+ Connection *conn = &*conn_ref;
+ gates.dispatch_in_background("TestSuite_ms_dispatch",
+ [this, conn, conn_ref] {
+ return seastar::smp::submit_to(primary_sid, [this, conn] {
+ auto result = interceptor.find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked connected connection: {}", *conn);
+ ceph_abort();
+ }
+
+ if (tracked_conn &&
+ !tracked_conn->is_protocol_closed() &&
+ tracked_conn != &*conn) {
+ logger().error("[{}] {} got connected, but there's already a avlid tracked_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ ceph_abort();
+ }
+
+ if (tracked_conn == &*conn) {
+ ceph_assert(result->index == tracked_index);
+ }
+
+ ++result->cnt_connect_dispatched;
+ logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}",
+ result->cnt_connect_dispatched, result->index, *conn);
+ }).then([conn_ref] {});
+ });
+ }
+
+ void ms_handle_reset(
+ ConnectionRef conn_ref,
+ bool is_replace) override {
+ Connection *conn = &*conn_ref;
+ gates.dispatch_in_background("TestSuite_ms_dispatch",
+ [this, conn, conn_ref] {
+ return seastar::smp::submit_to(primary_sid, [this, conn] {
+ auto result = interceptor.find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked reset connection: {}", *conn);
+ ceph_abort();
+ }
+
+ if (tracked_conn != &*conn) {
+ logger().warn("[{}] {} got reset, but doesn't match tracked_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ } else {
+ ceph_assert(result->index == tracked_index);
+ tracked_index = 0;
+ tracked_conn = nullptr;
+ }
+
+ ++result->cnt_reset_dispatched;
+ logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}",
+ result->cnt_reset_dispatched, result->index, *conn);
+ }).then([conn_ref] {});
+ });
+ }
+
+ void ms_handle_remote_reset(
+ ConnectionRef conn_ref) override {
+ Connection *conn = &*conn_ref;
+ gates.dispatch_in_background("TestSuite_ms_dispatch",
+ [this, conn, conn_ref] {
+ return seastar::smp::submit_to(primary_sid, [this, conn] {
+ auto result = interceptor.find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked remotely reset connection: {}", *conn);
+ ceph_abort();
+ }
+
+ if (tracked_conn != &*conn) {
+ logger().warn("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ } else {
+ ceph_assert(result->index == tracked_index);
+ }
+
+ ++result->cnt_remote_reset_dispatched;
+ logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}",
+ result->cnt_remote_reset_dispatched, result->index, *conn);
+ }).then([conn_ref] {});
+ });
+ }
+
+ private:
+ seastar::future<> init(entity_addr_t test_addr, SocketPolicy policy) {
+ test_msgr->set_default_policy(policy);
+ test_msgr->set_auth_client(&dummy_auth);
+ test_msgr->set_auth_server(&dummy_auth);
+ test_msgr->set_interceptor(&interceptor);
+ return test_msgr->bind(entity_addrvec_t{test_addr}).safe_then([this] {
+ return test_msgr->start({this});
+ }, Messenger::bind_ertr::all_same_way([test_addr] (const std::error_code& e) {
+ logger().error("FailoverSuite: "
+ "there is another instance running at {}", test_addr);
+ ceph_abort();
+ }));
+ }
+
+ seastar::future<> send_op(bool expect_reply=true) {
+ ceph_assert(tracked_conn);
+ ceph_assert(!tracked_conn->is_protocol_closed());
+ if (expect_reply) {
+ ++pending_peer_receive;
+ }
+ pg_t pgid;
+ object_locator_t oloc;
+ hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
+ pgid.pool(), oloc.nspace);
+ spg_t spgid(pgid);
+ return tracked_conn->send(crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0));
+ }
+
+ seastar::future<> flush_pending_send() {
+ if (pending_send != 0) {
+ logger().info("[Test] flush sending {} ops", pending_send);
+ }
+ ceph_assert(tracked_conn);
+ ceph_assert(!tracked_conn->is_protocol_closed());
+ return seastar::do_until(
+ [this] { return pending_send == 0; },
+ [this] {
+ --pending_send;
+ return send_op();
+ });
+ }
+
+ seastar::future<> wait_ready(unsigned num_ready_conns,
+ unsigned num_replaced,
+ bool wait_received) {
+ assert(seastar::this_shard_id() == primary_sid);
+ unsigned pending_conns = 0;
+ unsigned pending_establish = 0;
+ unsigned replaced_conns = 0;
+ for (auto& result : interceptor.results) {
+ if (result.conn->is_protocol_closed_clean()) {
+ if (result.state == conn_state_t::replaced) {
+ ++replaced_conns;
+ }
+ } else if (result.conn->is_protocol_ready()) {
+ if (pending_send == 0 && pending_peer_receive == 0 && pending_receive == 0) {
+ result.state = conn_state_t::established;
+ } else {
+ ++pending_establish;
+ }
+ } else {
+ ++pending_conns;
+ }
+ }
+
+ bool do_wait = false;
+ if (num_ready_conns > 0) {
+ if (interceptor.results.size() > num_ready_conns) {
+ throw std::runtime_error(fmt::format(
+ "{} connections, more than expected: {}",
+ interceptor.results.size(), num_ready_conns));
+ } else if (interceptor.results.size() < num_ready_conns || pending_conns > 0) {
+ logger().info("[Test] wait_ready(): wait for connections,"
+ " currently {} out of {}, pending {} ready ...",
+ interceptor.results.size(), num_ready_conns, pending_conns);
+ do_wait = true;
+ }
+ }
+ if (wait_received) {
+ if (pending_send || pending_peer_receive || pending_receive) {
+ if (pending_conns || pending_establish) {
+ logger().info("[Test] wait_ready(): wait for pending_send={},"
+ " pending_peer_receive={}, pending_receive={},"
+ " pending {}/{} ready/establish connections ...",
+ pending_send, pending_peer_receive, pending_receive,
+ pending_conns, pending_establish);
+ do_wait = true;
+ } else {
+ // If there are pending messages, stop waiting if there are
+ // no longer pending connections.
+ }
+ } else {
+ // Stop waiting if there are no pending messages. Pending connections
+ // should not be important.
+ }
+ }
+ if (num_replaced > 0) {
+ if (replaced_conns > num_replaced) {
+ throw std::runtime_error(fmt::format(
+ "{} replaced connections, more than expected: {}",
+ replaced_conns, num_replaced));
+ }
+ if (replaced_conns < num_replaced) {
+ logger().info("[Test] wait_ready(): wait for {} replaced connections,"
+ " currently {} ...",
+ num_replaced, replaced_conns);
+ do_wait = true;
+ }
+ }
+
+ if (do_wait) {
+ return interceptor.wait(
+ ).then([this, num_ready_conns, num_replaced, wait_received] {
+ return wait_ready(num_ready_conns, num_replaced, wait_received);
+ });
+ } else {
+ logger().info("[Test] wait_ready(): wait done!");
+ return seastar::now();
+ }
+ }
+
+ // called by FailoverTest
+ public:
+ FailoverSuite(MessengerRef test_msgr,
+ entity_addr_t test_peer_addr,
+ const TestInterceptor& interceptor,
+ ShardedGates &gates)
+ : test_msgr(test_msgr),
+ test_peer_addr(test_peer_addr),
+ interceptor(interceptor),
+ gates{gates},
+ primary_sid{seastar::this_shard_id()} { }
+
+ entity_addr_t get_addr() const {
+ return test_msgr->get_myaddr();
+ }
+
+ seastar::future<> shutdown() {
+ test_msgr->stop();
+ return test_msgr->shutdown();
+ }
+
+ void needs_receive() {
+ ++pending_receive;
+ }
+
+ void notify_peer_reply() {
+ ceph_assert(pending_peer_receive > 0);
+ --pending_peer_receive;
+ logger().info("[Test] TestPeer said got op, left {} ops",
+ pending_peer_receive);
+ if (pending_peer_receive == 0) {
+ interceptor.notify();
+ }
+ }
+
+ void post_check() const {
+ // make sure all breakpoints were hit
+ for (auto& kv : interceptor.breakpoints) {
+ auto it = interceptor.breakpoints_counter.find(kv.first);
+ if (it == interceptor.breakpoints_counter.end()) {
+ throw std::runtime_error(fmt::format("{} was missed", kv.first));
+ }
+ auto expected = kv.second.rbegin()->first;
+ if (expected > it->second.counter) {
+ throw std::runtime_error(fmt::format(
+ "{} only triggered {} times, not the expected {}",
+ kv.first, it->second.counter, expected));
+ }
+ }
+ }
+
+ void dump_results() const {
+ for (auto& result : interceptor.results) {
+ result.dump();
+ }
+ }
+
+ static seastar::future<std::unique_ptr<FailoverSuite>>
+ create(entity_addr_t test_addr,
+ SocketPolicy test_policy,
+ entity_addr_t test_peer_addr,
+ const TestInterceptor& interceptor,
+ ShardedGates &gates) {
+ auto suite = std::make_unique<FailoverSuite>(
+ Messenger::create(
+ entity_name_t::OSD(TEST_OSD),
+ "Test",
+ TEST_NONCE,
+ false),
+ test_peer_addr,
+ interceptor,
+ gates);
+ return suite->init(test_addr, test_policy
+ ).then([suite = std::move(suite)] () mutable {
+ return std::move(suite);
+ });
+ }
+
+ // called by tests
+ public:
+ seastar::future<> connect_peer() {
+ logger().info("[Test] connect_peer({})", test_peer_addr);
+ assert(seastar::this_shard_id() == primary_sid);
+ auto conn = test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD);
+ auto result = interceptor.find_result(&*conn);
+ ceph_assert(result != nullptr);
+
+ if (tracked_conn) {
+ if (tracked_conn->is_protocol_closed()) {
+ logger().info("[Test] this is a new session"
+ " replacing an closed one");
+ ceph_assert(tracked_conn != &*conn);
+ } else {
+ logger().info("[Test] this is not a new session");
+ ceph_assert(tracked_index == result->index);
+ ceph_assert(tracked_conn == &*conn);
+ }
+ } else {
+ logger().info("[Test] this is a new session");
+ }
+ tracked_index = result->index;
+ tracked_conn = &*conn;
+
+ return flush_pending_send();
+ }
+
+ seastar::future<> send_peer() {
+ assert(seastar::this_shard_id() == primary_sid);
+ if (tracked_conn) {
+ logger().info("[Test] send_peer()");
+ ceph_assert(!tracked_conn->is_protocol_closed());
+ ceph_assert(!pending_send);
+ return send_op();
+ } else {
+ ++pending_send;
+ logger().info("[Test] send_peer() (pending {})", pending_send);
+ return seastar::now();
+ }
+ }
+
+ seastar::future<> keepalive_peer() {
+ logger().info("[Test] keepalive_peer()");
+ assert(seastar::this_shard_id() == primary_sid);
+ ceph_assert(tracked_conn);
+ ceph_assert(!tracked_conn->is_protocol_closed());
+ return tracked_conn->send_keepalive();
+ }
+
+ seastar::future<> try_send_peer() {
+ logger().info("[Test] try_send_peer()");
+ assert(seastar::this_shard_id() == primary_sid);
+ ceph_assert(tracked_conn);
+ ceph_assert(!tracked_conn->is_protocol_closed());
+ return send_op(false);
+ }
+
+ seastar::future<> markdown() {
+ logger().info("[Test] markdown() in 100ms ...");
+ assert(seastar::this_shard_id() == primary_sid);
+ ceph_assert(tracked_conn);
+ // sleep to propagate potential remaining acks
+ return seastar::sleep(50ms
+ ).then([this] {
+ return seastar::smp::submit_to(
+ tracked_conn->get_shard_id(), [tracked_conn=tracked_conn] {
+ assert(tracked_conn->get_shard_id() == seastar::this_shard_id());
+ tracked_conn->mark_down();
+ });
+ }).then([] {
+ // sleep to wait for markdown propagate to the primary sid
+ return seastar::sleep(100ms);
+ });
+ }
+
+ seastar::future<> wait_blocked() {
+ logger().info("[Test] wait_blocked() ...");
+ assert(seastar::this_shard_id() == primary_sid);
+ return interceptor.blocker.wait_blocked();
+ }
+
+ void unblock() {
+ logger().info("[Test] unblock()");
+ assert(seastar::this_shard_id() == primary_sid);
+ return interceptor.blocker.unblock();
+ }
+
+ seastar::future<> wait_replaced(unsigned count) {
+ logger().info("[Test] wait_replaced({}) ...", count);
+ return wait_ready(0, count, false);
+ }
+
+ seastar::future<> wait_established() {
+ logger().info("[Test] wait_established() ...");
+ return wait_ready(0, 0, true);
+ }
+
+ seastar::future<std::reference_wrapper<ConnResults>>
+ wait_results(unsigned count) {
+ logger().info("[Test] wait_result({}) ...", count);
+ return wait_ready(count, 0, true).then([this] {
+ return std::reference_wrapper<ConnResults>(interceptor.results);
+ });
+ }
+
+ bool is_standby() {
+ assert(seastar::this_shard_id() == primary_sid);
+ ceph_assert(tracked_conn);
+ return tracked_conn->is_protocol_standby();
+ }
+};
+
+class FailoverTest : public Dispatcher {
+ crimson::auth::DummyAuthClientServer dummy_auth;
+ MessengerRef cmd_msgr;
+ ConnectionRef cmd_conn;
+ const entity_addr_t test_addr;
+ const entity_addr_t test_peer_addr;
+
+ std::optional<seastar::promise<>> recv_pong;
+ std::optional<seastar::promise<>> recv_cmdreply;
+
+ std::unique_ptr<FailoverSuite> test_suite;
+
+ std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
+ switch (m->get_type()) {
+ case CEPH_MSG_PING:
+ ceph_assert(recv_pong);
+ recv_pong->set_value();
+ recv_pong = std::nullopt;
+ break;
+ case MSG_COMMAND_REPLY:
+ ceph_assert(recv_cmdreply);
+ recv_cmdreply->set_value();
+ recv_cmdreply = std::nullopt;
+ break;
+ case MSG_COMMAND: {
+ auto m_cmd = boost::static_pointer_cast<MCommand>(m);
+ ceph_assert(static_cast<cmd_t>(m_cmd->cmd[0][0]) == cmd_t::suite_recv_op);
+ ceph_assert(test_suite);
+ test_suite->notify_peer_reply();
+ break;
+ }
+ default:
+ logger().error("{} got unexpected msg from cmd server: {}", *c, *m);
+ ceph_abort();
+ }
+ return {seastar::now()};
+ }
+
+ private:
+ seastar::future<> prepare_cmd(
+ cmd_t cmd,
+ std::function<void(MCommand&)>
+ f_prepare = [] (auto& m) { return; }) {
+ assert(!recv_cmdreply);
+ recv_cmdreply = seastar::promise<>();
+ auto fut = recv_cmdreply->get_future();
+ auto m = crimson::make_message<MCommand>();
+ m->cmd.emplace_back(1, static_cast<char>(cmd));
+ f_prepare(*m);
+ return cmd_conn->send(std::move(m)).then([fut = std::move(fut)] () mutable {
+ return std::move(fut);
+ });
+ }
+
+ seastar::future<> start_peer(policy_t peer_policy) {
+ return prepare_cmd(cmd_t::suite_start,
+ [peer_policy] (auto& m) {
+ m.cmd.emplace_back(1, static_cast<char>(peer_policy));
+ });
+ }
+
+ seastar::future<> stop_peer() {
+ return prepare_cmd(cmd_t::suite_stop);
+ }
+
+ seastar::future<> pingpong() {
+ assert(!recv_pong);
+ recv_pong = seastar::promise<>();
+ auto fut = recv_pong->get_future();
+ return cmd_conn->send(crimson::make_message<MPing>()
+ ).then([fut = std::move(fut)] () mutable {
+ return std::move(fut);
+ });
+ }
+
+ seastar::future<> init(entity_addr_t cmd_peer_addr) {
+ cmd_msgr->set_default_policy(SocketPolicy::lossy_client(0));
+ cmd_msgr->set_auth_client(&dummy_auth);
+ cmd_msgr->set_auth_server(&dummy_auth);
+ return cmd_msgr->start({this}).then([this, cmd_peer_addr] {
+ logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr);
+ cmd_conn = cmd_msgr->connect(cmd_peer_addr, entity_name_t::TYPE_OSD);
+ return pingpong();
+ });
+ }
+
+ public:
+ FailoverTest(MessengerRef cmd_msgr,
+ entity_addr_t test_addr,
+ entity_addr_t test_peer_addr)
+ : cmd_msgr(cmd_msgr),
+ test_addr(test_addr),
+ test_peer_addr(test_peer_addr) { }
+
+ seastar::future<> shutdown() {
+ logger().info("CmdCli shutdown...");
+ assert(!recv_cmdreply);
+ auto m = crimson::make_message<MCommand>();
+ m->cmd.emplace_back(1, static_cast<char>(cmd_t::shutdown));
+ return cmd_conn->send(std::move(m)).then([] {
+ return seastar::sleep(200ms);
+ }).then([this] {
+ cmd_msgr->stop();
+ return cmd_msgr->shutdown();
+ });
+ }
+
+ static seastar::future<seastar::lw_shared_ptr<FailoverTest>>
+ create(entity_addr_t test_addr,
+ entity_addr_t cmd_peer_addr,
+ entity_addr_t test_peer_addr) {
+ auto test = seastar::make_lw_shared<FailoverTest>(
+ Messenger::create(
+ entity_name_t::OSD(CMD_CLI_OSD),
+ "CmdCli",
+ CMD_CLI_NONCE,
+ true),
+ test_addr, test_peer_addr);
+ return test->init(cmd_peer_addr).then([test] {
+ logger().info("CmdCli ready");
+ return test;
+ });
+ }
+
+ // called by tests
+ public:
+ seastar::future<> run_suite(
+ std::string name,
+ const TestInterceptor& interceptor,
+ policy_t test_policy,
+ policy_t peer_policy,
+ std::function<seastar::future<>(FailoverSuite&)>&& f) {
+ logger().info("\n\n[{}]", name);
+ ceph_assert(!test_suite);
+ SocketPolicy test_policy_ = to_socket_policy(test_policy);
+ return ShardedGates::create(
+ ).then([this, test_policy_, peer_policy, interceptor,
+ f=std::move(f)](auto *gates) mutable {
+ return FailoverSuite::create(
+ test_addr, test_policy_, test_peer_addr, interceptor, *gates
+ ).then([this, peer_policy, f = std::move(f)](auto suite) mutable {
+ ceph_assert(suite->get_addr() == test_addr);
+ test_suite.swap(suite);
+ return start_peer(peer_policy
+ ).then([this, f = std::move(f)] {
+ return f(*test_suite);
+ }).then([this] {
+ test_suite->post_check();
+ logger().info("\n[SUCCESS]");
+ }).handle_exception([this](auto eptr) {
+ logger().info("\n[FAIL: {}]", eptr);
+ test_suite->dump_results();
+ throw;
+ }).then([this] {
+ return stop_peer();
+ }).then([this] {
+ return test_suite->shutdown(
+ ).then([this] {
+ test_suite.reset();
+ });
+ });
+ }).then([gates] {
+ return gates->close();
+ });
+ });
+ }
+
+ seastar::future<> peer_connect_me() {
+ logger().info("[Test] peer_connect_me({})", test_addr);
+ return prepare_cmd(cmd_t::suite_connect_me,
+ [this] (auto& m) {
+ m.cmd.emplace_back(fmt::format("{}", test_addr));
+ });
+ }
+
+ seastar::future<> peer_send_me() {
+ logger().info("[Test] peer_send_me()");
+ ceph_assert(test_suite);
+ test_suite->needs_receive();
+ return prepare_cmd(cmd_t::suite_send_me);
+ }
+
+ seastar::future<> try_peer_send_me() {
+ logger().info("[Test] try_peer_send_me()");
+ ceph_assert(test_suite);
+ return prepare_cmd(cmd_t::suite_send_me);
+ }
+
+ seastar::future<> send_bidirectional() {
+ ceph_assert(test_suite);
+ return test_suite->send_peer().then([this] {
+ return peer_send_me();
+ });
+ }
+
+ seastar::future<> peer_keepalive_me() {
+ logger().info("[Test] peer_keepalive_me()");
+ ceph_assert(test_suite);
+ return prepare_cmd(cmd_t::suite_keepalive_me);
+ }
+
+ seastar::future<> markdown_peer() {
+ logger().info("[Test] markdown_peer() in 150ms ...");
+ // sleep to propagate potential remaining acks
+ return seastar::sleep(50ms
+ ).then([this] {
+ return prepare_cmd(cmd_t::suite_markdown);
+ }).then([] {
+ // sleep awhile for peer markdown propagated
+ return seastar::sleep(100ms);
+ });
+ }
+};
+
+class FailoverSuitePeer : public Dispatcher {
+ using cb_t = std::function<seastar::future<>()>;
+ crimson::auth::DummyAuthClientServer dummy_auth;
+ MessengerRef peer_msgr;
+ cb_t op_callback;
+
+ ConnectionRef tracked_conn;
+ unsigned pending_send = 0;
+
+ std::optional<seastar::future<>> ms_dispatch(ConnectionRef conn, MessageRef m) override {
+ logger().info("[TestPeer] got op from Test");
+ ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+ std::ignore = op_callback();
+ return {seastar::now()};
+ }
+
+ void ms_handle_accept(
+ ConnectionRef conn,
+ seastar::shard_id prv_shard,
+ bool is_replace) override {
+ assert(prv_shard == seastar::this_shard_id());
+ logger().info("[TestPeer] got accept from Test");
+
+ if (tracked_conn &&
+ !tracked_conn->is_protocol_closed() &&
+ tracked_conn != conn) {
+ logger().error("[TestPeer] {} got accepted, but there's already a valid traced_conn {}",
+ *conn, *tracked_conn);
+ }
+ tracked_conn = conn;
+ std::ignore = flush_pending_send();
+ }
+
+ void ms_handle_reset(ConnectionRef conn, bool is_replace) override {
+ logger().info("[TestPeer] got reset from Test");
+ }
+
+ private:
+ seastar::future<> init(entity_addr_t test_peer_addr, SocketPolicy policy) {
+ peer_msgr->set_default_policy(policy);
+ peer_msgr->set_auth_client(&dummy_auth);
+ peer_msgr->set_auth_server(&dummy_auth);
+ return peer_msgr->bind(entity_addrvec_t{test_peer_addr}).safe_then([this] {
+ return peer_msgr->start({this});
+ }, Messenger::bind_ertr::all_same_way([test_peer_addr] (const std::error_code& e) {
+ logger().error("FailoverSuitePeer: "
+ "there is another instance running at {}", test_peer_addr);
+ ceph_abort();
+ }));
+ }
+
+ seastar::future<> send_op() {
+ ceph_assert(tracked_conn);
+ if (tracked_conn->is_protocol_closed()) {
+ logger().error("[TestPeer] send op but the connection is closed -- {}",
+ *tracked_conn);
+ }
+
+ pg_t pgid;
+ object_locator_t oloc;
+ hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
+ pgid.pool(), oloc.nspace);
+ spg_t spgid(pgid);
+ return tracked_conn->send(crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0));
+ }
+
+ seastar::future<> flush_pending_send() {
+ if (pending_send != 0) {
+ logger().info("[TestPeer] flush sending {} ops", pending_send);
+ }
+ ceph_assert(tracked_conn);
+ return seastar::do_until(
+ [this] { return pending_send == 0; },
+ [this] {
+ --pending_send;
+ return send_op();
+ });
+ }
+
+ public:
+ FailoverSuitePeer(MessengerRef peer_msgr, cb_t op_callback)
+ : peer_msgr(peer_msgr),
+ op_callback(op_callback) { }
+
+ seastar::future<> shutdown() {
+ peer_msgr->stop();
+ return peer_msgr->shutdown();
+ }
+
+ seastar::future<> connect_peer(entity_addr_t test_addr_decoded) {
+ logger().info("[TestPeer] connect_peer({})", test_addr_decoded);
+ auto conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD);
+
+ if (tracked_conn) {
+ if (tracked_conn->is_protocol_closed()) {
+ logger().info("[TestPeer] this is a new session"
+ " replacing an closed one");
+ ceph_assert(tracked_conn != conn);
+ } else {
+ logger().info("[TestPeer] this is not a new session");
+ ceph_assert(tracked_conn == conn);
+ }
+ } else {
+ logger().info("[TestPeer] this is a new session");
+ }
+ tracked_conn = conn;
+
+ return flush_pending_send();
+ }
+
+ seastar::future<> send_peer() {
+ if (tracked_conn) {
+ logger().info("[TestPeer] send_peer()");
+ ceph_assert(!pending_send);
+ return send_op();
+ } else {
+ ++pending_send;
+ logger().info("[TestPeer] send_peer() (pending {})", pending_send);
+ return seastar::now();
+ }
+ }
+
+ seastar::future<> keepalive_peer() {
+ logger().info("[TestPeer] keepalive_peer()");
+ ceph_assert(tracked_conn);
+ return tracked_conn->send_keepalive();
+ }
+
+ seastar::future<> markdown() {
+ logger().info("[TestPeer] markdown()");
+ ceph_assert(tracked_conn);
+ tracked_conn->mark_down();
+ return seastar::now();
+ }
+
+ static seastar::future<std::unique_ptr<FailoverSuitePeer>>
+ create(entity_addr_t test_peer_addr, const SocketPolicy& policy, cb_t op_callback) {
+ auto suite = std::make_unique<FailoverSuitePeer>(
+ Messenger::create(
+ entity_name_t::OSD(TEST_PEER_OSD),
+ "TestPeer",
+ TEST_PEER_NONCE,
+ true),
+ op_callback
+ );
+ return suite->init(test_peer_addr, policy
+ ).then([suite = std::move(suite)] () mutable {
+ return std::move(suite);
+ });
+ }
+};
+
+class FailoverTestPeer : public Dispatcher {
+ crimson::auth::DummyAuthClientServer dummy_auth;
+ MessengerRef cmd_msgr;
+ ConnectionRef cmd_conn;
+ const entity_addr_t test_peer_addr;
+ std::unique_ptr<FailoverSuitePeer> test_suite;
+
+ std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
+ ceph_assert(cmd_conn == c);
+ switch (m->get_type()) {
+ case CEPH_MSG_PING:
+ std::ignore = c->send(crimson::make_message<MPing>());
+ break;
+ case MSG_COMMAND: {
+ auto m_cmd = boost::static_pointer_cast<MCommand>(m);
+ auto cmd = static_cast<cmd_t>(m_cmd->cmd[0][0]);
+ if (cmd == cmd_t::shutdown) {
+ logger().info("CmdSrv shutdown...");
+ // forwarded to FailoverTestPeer::wait()
+ cmd_msgr->stop();
+ std::ignore = cmd_msgr->shutdown();
+ } else {
+ std::ignore = handle_cmd(cmd, m_cmd).then([c] {
+ return c->send(crimson::make_message<MCommandReply>());
+ });
+ }
+ break;
+ }
+ default:
+ logger().error("{} got unexpected msg from cmd client: {}", *c, *m);
+ ceph_abort();
+ }
+ return {seastar::now()};
+ }
+
+ void ms_handle_accept(
+ ConnectionRef conn,
+ seastar::shard_id prv_shard,
+ bool is_replace) override {
+ assert(prv_shard == seastar::this_shard_id());
+ cmd_conn = conn;
+ }
+
+ private:
+ seastar::future<> notify_recv_op() {
+ ceph_assert(cmd_conn);
+ auto m = crimson::make_message<MCommand>();
+ m->cmd.emplace_back(1, static_cast<char>(cmd_t::suite_recv_op));
+ return cmd_conn->send(std::move(m));
+ }
+
+ seastar::future<> handle_cmd(cmd_t cmd, MRef<MCommand> m_cmd) {
+ switch (cmd) {
+ case cmd_t::suite_start: {
+ ceph_assert(!test_suite);
+ auto policy = to_socket_policy(static_cast<policy_t>(m_cmd->cmd[1][0]));
+ return FailoverSuitePeer::create(
+ test_peer_addr, policy, [this] { return notify_recv_op(); }
+ ).then([this] (auto suite) {
+ test_suite.swap(suite);
+ });
+ }
+ case cmd_t::suite_stop:
+ ceph_assert(test_suite);
+ return test_suite->shutdown().then([this] {
+ test_suite.reset();
+ });
+ case cmd_t::suite_connect_me: {
+ ceph_assert(test_suite);
+ entity_addr_t test_addr_decoded = entity_addr_t();
+ test_addr_decoded.parse(m_cmd->cmd[1].c_str(), nullptr);
+ return test_suite->connect_peer(test_addr_decoded);
+ }
+ case cmd_t::suite_send_me:
+ ceph_assert(test_suite);
+ return test_suite->send_peer();
+ case cmd_t::suite_keepalive_me:
+ ceph_assert(test_suite);
+ return test_suite->keepalive_peer();
+ case cmd_t::suite_markdown:
+ ceph_assert(test_suite);
+ return test_suite->markdown();
+ default:
+ logger().error("TestPeer got unexpected command {} from Test",
+ fmt::ptr(m_cmd.get()));
+ ceph_abort();
+ return seastar::now();
+ }
+ }
+
+ seastar::future<> init(entity_addr_t cmd_peer_addr) {
+ cmd_msgr->set_default_policy(SocketPolicy::stateless_server(0));
+ cmd_msgr->set_auth_client(&dummy_auth);
+ cmd_msgr->set_auth_server(&dummy_auth);
+ return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).safe_then([this] {
+ return cmd_msgr->start({this});
+ }, Messenger::bind_ertr::all_same_way([cmd_peer_addr] (const std::error_code& e) {
+ logger().error("FailoverTestPeer: "
+ "there is another instance running at {}", cmd_peer_addr);
+ ceph_abort();
+ }));
+ }
+
+ public:
+ FailoverTestPeer(MessengerRef cmd_msgr,
+ entity_addr_t test_peer_addr)
+ : cmd_msgr(cmd_msgr),
+ test_peer_addr(test_peer_addr) { }
+
+ seastar::future<> wait() {
+ return cmd_msgr->wait();
+ }
+
+ static seastar::future<std::unique_ptr<FailoverTestPeer>>
+ create(entity_addr_t cmd_peer_addr, entity_addr_t test_peer_addr) {
+ auto test_peer = std::make_unique<FailoverTestPeer>(
+ Messenger::create(
+ entity_name_t::OSD(CMD_SRV_OSD),
+ "CmdSrv",
+ CMD_SRV_NONCE,
+ true),
+ test_peer_addr);
+ return test_peer->init(cmd_peer_addr
+ ).then([test_peer = std::move(test_peer)] () mutable {
+ logger().info("CmdSrv ready");
+ return std::move(test_peer);
+ });
+ }
+};
+
+seastar::future<>
+test_v2_lossy_early_connect_fault(FailoverTest& test) {
+ return seastar::do_with(std::vector<Breakpoint>{
+ {custom_bp_t::SOCKET_CONNECTING},
+ {custom_bp_t::BANNER_WRITE},
+ {custom_bp_t::BANNER_READ},
+ {custom_bp_t::BANNER_PAYLOAD_READ},
+ {Tag::HELLO, bp_type_t::WRITE},
+ {Tag::HELLO, bp_type_t::READ},
+ {Tag::AUTH_REQUEST, bp_type_t::WRITE},
+ {Tag::AUTH_DONE, bp_type_t::READ},
+ {Tag::AUTH_SIGNATURE, bp_type_t::WRITE},
+ {Tag::AUTH_SIGNATURE, bp_type_t::READ},
+ }, [&test] (auto& failure_cases) {
+ return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_lossy_early_connect_fault -- {}", bp),
+ interceptor,
+ policy_t::lossy_client,
+ policy_t::stateless_server,
+ [] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(2, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ });
+ });
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossy_connect_fault(FailoverTest& test) {
+ return seastar::do_with(std::vector<Breakpoint>{
+ {Tag::CLIENT_IDENT, bp_type_t::WRITE},
+ {Tag::SERVER_IDENT, bp_type_t::READ},
+ }, [&test] (auto& failure_cases) {
+ return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_lossy_connect_fault -- {}", bp),
+ interceptor,
+ policy_t::lossy_client,
+ policy_t::stateless_server,
+ [] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(2, 2, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ });
+ });
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossy_connected_fault(FailoverTest& test) {
+ return seastar::do_with(std::vector<Breakpoint>{
+ {Tag::MESSAGE, bp_type_t::WRITE},
+ {Tag::MESSAGE, bp_type_t::READ},
+ }, [&test] (auto& failure_cases) {
+ return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_lossy_connected_fault -- {}", bp),
+ interceptor,
+ policy_t::lossy_client,
+ policy_t::stateless_server,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.send_bidirectional();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(1, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(1, 0);
+ });
+ });
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossy_early_accept_fault(FailoverTest& test) {
+ return seastar::do_with(std::vector<Breakpoint>{
+ {custom_bp_t::BANNER_WRITE},
+ {custom_bp_t::BANNER_READ},
+ {custom_bp_t::BANNER_PAYLOAD_READ},
+ {Tag::HELLO, bp_type_t::WRITE},
+ {Tag::HELLO, bp_type_t::READ},
+ {Tag::AUTH_REQUEST, bp_type_t::READ},
+ {Tag::AUTH_DONE, bp_type_t::WRITE},
+ {Tag::AUTH_SIGNATURE, bp_type_t::WRITE},
+ {Tag::AUTH_SIGNATURE, bp_type_t::READ},
+ }, [&test] (auto& failure_cases) {
+ return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_lossy_early_accept_fault -- {}", bp),
+ interceptor,
+ policy_t::stateless_server,
+ policy_t::lossy_client,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.peer_send_me();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::established);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 1);
+ results[1].assert_reset(0, 0);
+ });
+ });
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossy_accept_fault(FailoverTest& test) {
+ auto bp = Breakpoint{Tag::CLIENT_IDENT, bp_type_t::READ};
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_lossy_accept_fault -- {}", bp),
+ interceptor,
+ policy_t::stateless_server,
+ policy_t::lossy_client,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.peer_send_me();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::established);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 1);
+ results[1].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossy_establishing_fault(FailoverTest& test) {
+ auto bp = Breakpoint{Tag::SERVER_IDENT, bp_type_t::WRITE};
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_lossy_establishing_fault -- {}", bp),
+ interceptor,
+ policy_t::stateless_server,
+ policy_t::lossy_client,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.peer_send_me();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 1);
+ results[0].assert_reset(1, 0);
+ results[1].assert_state_at(conn_state_t::established);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 1);
+ results[1].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossy_accepted_fault(FailoverTest& test) {
+ return seastar::do_with(std::vector<Breakpoint>{
+ {Tag::MESSAGE, bp_type_t::WRITE},
+ {Tag::MESSAGE, bp_type_t::READ},
+ }, [&test] (auto& failure_cases) {
+ return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_lossy_accepted_fault -- {}", bp),
+ interceptor,
+ policy_t::stateless_server,
+ policy_t::lossy_client,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.send_bidirectional();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 1);
+ results[0].assert_reset(1, 0);
+ });
+ });
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossless_connect_fault(FailoverTest& test) {
+ return seastar::do_with(std::vector<Breakpoint>{
+ {Tag::CLIENT_IDENT, bp_type_t::WRITE},
+ {Tag::SERVER_IDENT, bp_type_t::READ},
+ }, [&test] (auto& failure_cases) {
+ return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_lossless_connect_fault -- {}", bp),
+ interceptor,
+ policy_t::lossless_client,
+ policy_t::stateful_server,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.send_bidirectional();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(2, 2, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ });
+ });
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossless_connected_fault(FailoverTest& test) {
+ return seastar::do_with(std::vector<Breakpoint>{
+ {Tag::MESSAGE, bp_type_t::WRITE},
+ {Tag::MESSAGE, bp_type_t::READ},
+ }, [&test] (auto& failure_cases) {
+ return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_lossless_connected_fault -- {}", bp),
+ interceptor,
+ policy_t::lossless_client,
+ policy_t::stateful_server,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.send_bidirectional();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(2, 1, 1, 2);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ });
+ });
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossless_connected_fault2(FailoverTest& test) {
+ return seastar::do_with(std::vector<Breakpoint>{
+ {Tag::ACK, bp_type_t::READ},
+ {Tag::ACK, bp_type_t::WRITE},
+ {Tag::KEEPALIVE2, bp_type_t::READ},
+ {Tag::KEEPALIVE2, bp_type_t::WRITE},
+ {Tag::KEEPALIVE2_ACK, bp_type_t::READ},
+ {Tag::KEEPALIVE2_ACK, bp_type_t::WRITE},
+ }, [&test] (auto& failure_cases) {
+ return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_lossless_connected_fault2 -- {}", bp),
+ interceptor,
+ policy_t::lossless_client,
+ policy_t::stateful_server,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_established();
+ }).then([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.keepalive_peer();
+ }).then([&suite] {
+ return suite.wait_established();
+ }).then([&test] {
+ return test.peer_send_me();
+ }).then([&test] {
+ return test.peer_keepalive_me();
+ }).then([&suite] {
+ return suite.wait_established();
+ }).then([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.wait_established();
+ }).then([&test] {
+ return test.peer_send_me();
+ }).then([&suite] {
+ return suite.wait_established();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(2, 1, 1, 2);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ });
+ });
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossless_reconnect_fault(FailoverTest& test) {
+ return seastar::do_with(std::vector<std::pair<Breakpoint, Breakpoint>>{
+ {{Tag::MESSAGE, bp_type_t::WRITE},
+ {Tag::SESSION_RECONNECT, bp_type_t::WRITE}},
+ {{Tag::MESSAGE, bp_type_t::WRITE},
+ {Tag::SESSION_RECONNECT_OK, bp_type_t::READ}},
+ }, [&test] (auto& failure_cases) {
+ return seastar::do_for_each(failure_cases, [&test] (auto bp_pair) {
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp_pair.first);
+ interceptor.make_fault(bp_pair.second);
+ return test.run_suite(
+ fmt::format("test_v2_lossless_reconnect_fault -- {}, {}",
+ bp_pair.first, bp_pair.second),
+ interceptor,
+ policy_t::lossless_client,
+ policy_t::stateful_server,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.send_bidirectional();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(3, 1, 2, 2);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ });
+ });
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossless_accept_fault(FailoverTest& test) {
+ auto bp = Breakpoint{Tag::CLIENT_IDENT, bp_type_t::READ};
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_lossless_accept_fault -- {}", bp),
+ interceptor,
+ policy_t::stateful_server,
+ policy_t::lossless_client,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.send_bidirectional();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::established);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 1);
+ results[1].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossless_establishing_fault(FailoverTest& test) {
+ auto bp = Breakpoint{Tag::SERVER_IDENT, bp_type_t::WRITE};
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_lossless_establishing_fault -- {}", bp),
+ interceptor,
+ policy_t::stateful_server,
+ policy_t::lossless_client,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.send_bidirectional();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 2);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 0);
+ results[1].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossless_accepted_fault(FailoverTest& test) {
+ return seastar::do_with(std::vector<Breakpoint>{
+ {Tag::MESSAGE, bp_type_t::WRITE},
+ {Tag::MESSAGE, bp_type_t::READ},
+ }, [&test] (auto& failure_cases) {
+ return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_lossless_accepted_fault -- {}", bp),
+ interceptor,
+ policy_t::stateful_server,
+ policy_t::lossless_client,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.send_bidirectional();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 2);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 0);
+ results[1].assert_reset(0, 0);
+ });
+ });
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossless_reaccept_fault(FailoverTest& test) {
+ return seastar::do_with(std::vector<std::pair<Breakpoint, Breakpoint>>{
+ {{Tag::MESSAGE, bp_type_t::READ},
+ {Tag::SESSION_RECONNECT, bp_type_t::READ}},
+ {{Tag::MESSAGE, bp_type_t::READ},
+ {Tag::SESSION_RECONNECT_OK, bp_type_t::WRITE}},
+ }, [&test] (auto& failure_cases) {
+ return seastar::do_for_each(failure_cases, [&test] (auto bp_pair) {
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp_pair.first);
+ interceptor.make_fault(bp_pair.second);
+ return test.run_suite(
+ fmt::format("test_v2_lossless_reaccept_fault -- {}, {}",
+ bp_pair.first, bp_pair.second),
+ interceptor,
+ policy_t::stateful_server,
+ policy_t::lossless_client,
+ [&test, bp = bp_pair.second] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.send_bidirectional();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&suite] {
+ return suite.wait_results(3);
+ }).then([bp] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(0, 0, 0, 0);
+ if (bp == Breakpoint{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
+ results[0].assert_accept(1, 1, 0, 2);
+ } else {
+ results[0].assert_accept(1, 1, 0, 3);
+ }
+ results[0].assert_reset(0, 0);
+ if (bp == Breakpoint{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
+ results[1].assert_state_at(conn_state_t::closed);
+ } else {
+ results[1].assert_state_at(conn_state_t::replaced);
+ }
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 0, 1, 0);
+ results[1].assert_reset(0, 0);
+ results[2].assert_state_at(conn_state_t::replaced);
+ results[2].assert_connect(0, 0, 0, 0);
+ results[2].assert_accept(1, 0, 1, 0);
+ results[2].assert_reset(0, 0);
+ });
+ });
+ });
+ });
+}
+
+seastar::future<>
+test_v2_peer_connect_fault(FailoverTest& test) {
+ return seastar::do_with(std::vector<Breakpoint>{
+ {Tag::CLIENT_IDENT, bp_type_t::WRITE},
+ {Tag::SERVER_IDENT, bp_type_t::READ},
+ }, [&test] (auto& failure_cases) {
+ return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_peer_connect_fault -- {}", bp),
+ interceptor,
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(2, 2, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ });
+ });
+ });
+ });
+}
+
+seastar::future<>
+test_v2_peer_accept_fault(FailoverTest& test) {
+ auto bp = Breakpoint{Tag::CLIENT_IDENT, bp_type_t::READ};
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_peer_accept_fault -- {}", bp),
+ interceptor,
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.peer_send_me();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::established);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 1);
+ results[1].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_peer_establishing_fault(FailoverTest& test) {
+ auto bp = Breakpoint{Tag::SERVER_IDENT, bp_type_t::WRITE};
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_peer_establishing_fault -- {}", bp),
+ interceptor,
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.peer_send_me();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 2);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 0);
+ results[1].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_peer_connected_fault_reconnect(FailoverTest& test) {
+ auto bp = Breakpoint{Tag::MESSAGE, bp_type_t::WRITE};
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_peer_connected_fault_reconnect -- {}", bp),
+ interceptor,
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(2, 1, 1, 2);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_peer_connected_fault_reaccept(FailoverTest& test) {
+ auto bp = Breakpoint{Tag::MESSAGE, bp_type_t::READ};
+ TestInterceptor interceptor;
+ interceptor.make_fault(bp);
+ return test.run_suite(
+ fmt::format("test_v2_peer_connected_fault_reaccept -- {}", bp),
+ interceptor,
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.peer_send_me();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(1, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 1);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 0, 1, 0);
+ results[1].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<bool>
+check_peer_wins(FailoverTest& test) {
+ return seastar::do_with(bool(), [&test] (auto& ret) {
+ return test.run_suite("check_peer_wins",
+ TestInterceptor(),
+ policy_t::lossy_client,
+ policy_t::stateless_server,
+ [&ret] (FailoverSuite& suite) {
+ return suite.connect_peer().then([&suite] {
+ return suite.wait_results(1);
+ }).then([&ret] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ ret = results[0].conn->peer_wins();
+ logger().info("check_peer_wins: {}", ret);
+ });
+ }).then([&ret] {
+ return ret;
+ });
+ });
+}
+
+seastar::future<>
+test_v2_racing_reconnect_acceptor_lose(FailoverTest& test) {
+ return seastar::do_with(std::vector<std::pair<unsigned, Breakpoint>>{
+ {1, {Tag::SESSION_RECONNECT, bp_type_t::READ}},
+ {2, {custom_bp_t::BANNER_WRITE}},
+ {2, {custom_bp_t::BANNER_READ}},
+ {2, {custom_bp_t::BANNER_PAYLOAD_READ}},
+ {2, {Tag::HELLO, bp_type_t::WRITE}},
+ {2, {Tag::HELLO, bp_type_t::READ}},
+ {2, {Tag::AUTH_REQUEST, bp_type_t::READ}},
+ {2, {Tag::AUTH_DONE, bp_type_t::WRITE}},
+ {2, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}},
+ {2, {Tag::AUTH_SIGNATURE, bp_type_t::READ}},
+ }, [&test] (auto& failure_cases) {
+ return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+ TestInterceptor interceptor;
+ // fault acceptor
+ interceptor.make_fault({Tag::MESSAGE, bp_type_t::READ});
+ // block acceptor
+ interceptor.make_block(bp.second, bp.first);
+ return test.run_suite(
+ fmt::format("test_v2_racing_reconnect_acceptor_lose -- {}({})",
+ bp.second, bp.first),
+ interceptor,
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.peer_send_me();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&suite] {
+ return suite.wait_blocked();
+ }).then([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.wait_established();
+ }).then([&suite] {
+ suite.unblock();
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(1, 0, 1, 1);
+ results[0].assert_accept(1, 1, 0, 1);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::closed);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 0);
+ results[1].assert_reset(0, 0);
+ });
+ });
+ });
+ });
+}
+
+seastar::future<>
+test_v2_racing_reconnect_acceptor_win(FailoverTest& test) {
+ return seastar::do_with(std::vector<std::pair<unsigned, Breakpoint>>{
+ {1, {Tag::SESSION_RECONNECT, bp_type_t::WRITE}},
+ {2, {custom_bp_t::SOCKET_CONNECTING}},
+ {2, {custom_bp_t::BANNER_WRITE}},
+ {2, {custom_bp_t::BANNER_READ}},
+ {2, {custom_bp_t::BANNER_PAYLOAD_READ}},
+ {2, {Tag::HELLO, bp_type_t::WRITE}},
+ {2, {Tag::HELLO, bp_type_t::READ}},
+ {2, {Tag::AUTH_REQUEST, bp_type_t::WRITE}},
+ {2, {Tag::AUTH_DONE, bp_type_t::READ}},
+ {2, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}},
+ {2, {Tag::AUTH_SIGNATURE, bp_type_t::READ}},
+ }, [&test] (auto& failure_cases) {
+ return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+ TestInterceptor interceptor;
+ // fault connector
+ interceptor.make_fault({Tag::MESSAGE, bp_type_t::WRITE});
+ // block connector
+ interceptor.make_block(bp.second, bp.first);
+ return test.run_suite(
+ fmt::format("test_v2_racing_reconnect_acceptor_win -- {}({})",
+ bp.second, bp.first),
+ interceptor,
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_blocked();
+ }).then([&test] {
+ return test.peer_send_me();
+ }).then([&suite] {
+ return suite.wait_replaced(1);
+ }).then([&suite] {
+ suite.unblock();
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(2, 1);
+ results[0].assert_accept(0, 0, 0, 1);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 0, 1, 0);
+ results[1].assert_reset(0, 0);
+ });
+ });
+ });
+ });
+}
+
+seastar::future<>
+test_v2_racing_connect_acceptor_lose(FailoverTest& test) {
+ return seastar::do_with(std::vector<Breakpoint>{
+ {custom_bp_t::BANNER_WRITE},
+ {custom_bp_t::BANNER_READ},
+ {custom_bp_t::BANNER_PAYLOAD_READ},
+ {Tag::HELLO, bp_type_t::WRITE},
+ {Tag::HELLO, bp_type_t::READ},
+ {Tag::AUTH_REQUEST, bp_type_t::READ},
+ {Tag::AUTH_DONE, bp_type_t::WRITE},
+ {Tag::AUTH_SIGNATURE, bp_type_t::WRITE},
+ {Tag::AUTH_SIGNATURE, bp_type_t::READ},
+ {Tag::CLIENT_IDENT, bp_type_t::READ},
+ }, [&test] (auto& failure_cases) {
+ return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+ TestInterceptor interceptor;
+ // block acceptor
+ interceptor.make_block(bp);
+ return test.run_suite(
+ fmt::format("test_v2_racing_connect_acceptor_lose -- {}", bp),
+ interceptor,
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.peer_send_me();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&suite] {
+ return suite.wait_blocked();
+ }).then([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_established();
+ }).then([&suite] {
+ suite.unblock();
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::established);
+ results[1].assert_connect(1, 1, 0, 1);
+ results[1].assert_accept(0, 0, 0, 0);
+ results[1].assert_reset(0, 0);
+ });
+ });
+ });
+ });
+}
+
+seastar::future<>
+test_v2_racing_connect_acceptor_win(FailoverTest& test) {
+ return seastar::do_with(std::vector<Breakpoint>{
+ {custom_bp_t::SOCKET_CONNECTING},
+ {custom_bp_t::BANNER_WRITE},
+ {custom_bp_t::BANNER_READ},
+ {custom_bp_t::BANNER_PAYLOAD_READ},
+ {Tag::HELLO, bp_type_t::WRITE},
+ {Tag::HELLO, bp_type_t::READ},
+ {Tag::AUTH_REQUEST, bp_type_t::WRITE},
+ {Tag::AUTH_DONE, bp_type_t::READ},
+ {Tag::AUTH_SIGNATURE, bp_type_t::WRITE},
+ {Tag::AUTH_SIGNATURE, bp_type_t::READ},
+ {Tag::CLIENT_IDENT, bp_type_t::WRITE},
+ }, [&test] (auto& failure_cases) {
+ return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+ TestInterceptor interceptor;
+ // block connector
+ interceptor.make_block(bp);
+ return test.run_suite(
+ fmt::format("test_v2_racing_connect_acceptor_win -- {}", bp),
+ interceptor,
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_blocked();
+ }).then([&test] {
+ return test.peer_send_me();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&suite] {
+ return suite.wait_replaced(1);
+ }).then([&suite] {
+ suite.unblock();
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(1, 0);
+ results[0].assert_accept(0, 0, 0, 1);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 0);
+ results[1].assert_reset(0, 0);
+ });
+ });
+ });
+ });
+}
+
+seastar::future<>
+test_v2_racing_connect_reconnect_lose(FailoverTest& test) {
+ TestInterceptor interceptor;
+ interceptor.make_fault({Tag::SERVER_IDENT, bp_type_t::READ});
+ interceptor.make_block({Tag::CLIENT_IDENT, bp_type_t::WRITE}, 2);
+ return test.run_suite("test_v2_racing_connect_reconnect_lose",
+ interceptor,
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_blocked();
+ }).then([&test] {
+ return test.peer_send_me();
+ }).then([&suite] {
+ return suite.wait_replaced(1);
+ }).then([&suite] {
+ suite.unblock();
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(2, 2, 0, 0);
+ results[0].assert_accept(0, 0, 0, 1);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 1, 0);
+ results[1].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_racing_connect_reconnect_win(FailoverTest& test) {
+ TestInterceptor interceptor;
+ interceptor.make_fault({Tag::SERVER_IDENT, bp_type_t::READ});
+ interceptor.make_block({Tag::SESSION_RECONNECT, bp_type_t::READ});
+ return test.run_suite("test_v2_racing_connect_reconnect_win",
+ interceptor,
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.peer_send_me();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_blocked();
+ }).then([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.wait_established();
+ }).then([&suite] {
+ suite.unblock();
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(2, 2, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::closed);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 0, 1, 0);
+ results[1].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_stale_connect(FailoverTest& test) {
+ auto bp = Breakpoint{Tag::SERVER_IDENT, bp_type_t::READ};
+ TestInterceptor interceptor;
+ interceptor.make_stall(bp);
+ return test.run_suite(
+ fmt::format("test_v2_stale_connect -- {}", bp),
+ interceptor,
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_blocked();
+ }).then([&test] {
+ return test.peer_send_me();
+ }).then([&suite] {
+ return suite.wait_replaced(1);
+ }).then([&suite] {
+ suite.unblock();
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(1, 1, 0, 0);
+ results[0].assert_accept(0, 0, 0, 1);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 1, 0);
+ results[1].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_stale_reconnect(FailoverTest& test) {
+ auto bp = Breakpoint{Tag::SESSION_RECONNECT_OK, bp_type_t::READ};
+ TestInterceptor interceptor;
+ interceptor.make_fault({Tag::MESSAGE, bp_type_t::WRITE});
+ interceptor.make_stall(bp);
+ return test.run_suite(
+ fmt::format("test_v2_stale_reconnect -- {}", bp),
+ interceptor,
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.wait_blocked();
+ }).then([&test] {
+ return test.peer_send_me();
+ }).then([&suite] {
+ return suite.wait_replaced(1);
+ }).then([&suite] {
+ suite.unblock();
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(2, 1, 1, 1);
+ results[0].assert_accept(0, 0, 0, 1);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 0, 1, 0);
+ results[1].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_stale_accept(FailoverTest& test) {
+ auto bp = Breakpoint{Tag::CLIENT_IDENT, bp_type_t::READ};
+ TestInterceptor interceptor;
+ interceptor.make_stall(bp);
+ return test.run_suite(
+ fmt::format("test_v2_stale_accept -- {}", bp),
+ interceptor,
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.peer_connect_me();
+ }).then([&suite] {
+ return suite.wait_blocked();
+ }).then([&test] {
+ return test.peer_send_me();
+ }).then([&suite] {
+ return suite.wait_established();
+ }).then([&suite] {
+ suite.unblock();
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::established);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 1);
+ results[1].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_stale_establishing(FailoverTest& test) {
+ auto bp = Breakpoint{Tag::SERVER_IDENT, bp_type_t::WRITE};
+ TestInterceptor interceptor;
+ interceptor.make_stall(bp);
+ return test.run_suite(
+ fmt::format("test_v2_stale_establishing -- {}", bp),
+ interceptor,
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.peer_connect_me();
+ }).then([&suite] {
+ return suite.wait_blocked();
+ }).then([&test] {
+ return test.peer_send_me();
+ }).then([&suite] {
+ return suite.wait_replaced(1);
+ }).then([&suite] {
+ suite.unblock();
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 2);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 0);
+ results[1].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_stale_reaccept(FailoverTest& test) {
+ auto bp = Breakpoint{Tag::SESSION_RECONNECT_OK, bp_type_t::WRITE};
+ TestInterceptor interceptor;
+ interceptor.make_fault({Tag::MESSAGE, bp_type_t::READ});
+ interceptor.make_stall(bp);
+ return test.run_suite(
+ fmt::format("test_v2_stale_reaccept -- {}", bp),
+ interceptor,
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ return test.peer_send_me();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&suite] {
+ return suite.wait_blocked();
+ }).then([] {
+ logger().info("[Test] block the broken REPLACING for 210ms...");
+ return seastar::sleep(210ms);
+ }).then([&suite] {
+ suite.unblock();
+ return suite.wait_results(3);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 3);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 0, 1, 0);
+ results[1].assert_reset(0, 0);
+ results[2].assert_state_at(conn_state_t::replaced);
+ results[2].assert_connect(0, 0, 0, 0);
+ results[2].assert_accept(1, 0);
+ results[2].assert_reset(0, 0);
+ ceph_assert(results[2].server_reconnect_attempts >= 1);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossy_client(FailoverTest& test) {
+ return test.run_suite(
+ "test_v2_lossy_client",
+ TestInterceptor(),
+ policy_t::lossy_client,
+ policy_t::stateless_server,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&suite] {
+ logger().info("-- 0 --");
+ logger().info("[Test] setup connection...");
+ return suite.connect_peer();
+ }).then([&test] {
+ return test.send_bidirectional();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(1, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ }).then([&suite] {
+ logger().info("-- 1 --");
+ logger().info("[Test] client markdown...");
+ return suite.markdown();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(1, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::established);
+ results[1].assert_connect(1, 1, 0, 1);
+ results[1].assert_accept(0, 0, 0, 0);
+ results[1].assert_reset(0, 0);
+ }).then([&test] {
+ logger().info("-- 2 --");
+ logger().info("[Test] server markdown...");
+ return test.markdown_peer();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(1, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::closed);
+ results[1].assert_connect(1, 1, 0, 1);
+ results[1].assert_accept(0, 0, 0, 0);
+ results[1].assert_reset(1, 0);
+ }).then([&suite] {
+ logger().info("-- 3 --");
+ logger().info("[Test] client reconnect...");
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.wait_results(3);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(1, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::closed);
+ results[1].assert_connect(1, 1, 0, 1);
+ results[1].assert_accept(0, 0, 0, 0);
+ results[1].assert_reset(1, 0);
+ results[2].assert_state_at(conn_state_t::established);
+ results[2].assert_connect(1, 1, 0, 1);
+ results[2].assert_accept(0, 0, 0, 0);
+ results[2].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_stateless_server(FailoverTest& test) {
+ return test.run_suite(
+ "test_v2_stateless_server",
+ TestInterceptor(),
+ policy_t::stateless_server,
+ policy_t::lossy_client,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ logger().info("-- 0 --");
+ logger().info("[Test] setup connection...");
+ return test.peer_connect_me();
+ }).then([&test] {
+ return test.send_bidirectional();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 1);
+ results[0].assert_reset(0, 0);
+ }).then([&test] {
+ logger().info("-- 1 --");
+ logger().info("[Test] client markdown...");
+ return test.markdown_peer();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&test] {
+ return test.peer_send_me();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 1);
+ results[0].assert_reset(1, 0);
+ results[1].assert_state_at(conn_state_t::established);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 1);
+ results[1].assert_reset(0, 0);
+ }).then([&suite] {
+ logger().info("-- 2 --");
+ logger().info("[Test] server markdown...");
+ return suite.markdown();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 1);
+ results[0].assert_reset(1, 0);
+ results[1].assert_state_at(conn_state_t::closed);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 1);
+ results[1].assert_reset(0, 0);
+ }).then([&test] {
+ logger().info("-- 3 --");
+ logger().info("[Test] client reconnect...");
+ return test.peer_connect_me();
+ }).then([&test] {
+ return test.peer_send_me();
+ }).then([&suite] {
+ return suite.wait_results(3);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 1);
+ results[0].assert_reset(1, 0);
+ results[1].assert_state_at(conn_state_t::closed);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 1);
+ results[1].assert_reset(0, 0);
+ results[2].assert_state_at(conn_state_t::established);
+ results[2].assert_connect(0, 0, 0, 0);
+ results[2].assert_accept(1, 1, 0, 1);
+ results[2].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossless_client(FailoverTest& test) {
+ return test.run_suite(
+ "test_v2_lossless_client",
+ TestInterceptor(),
+ policy_t::lossless_client,
+ policy_t::stateful_server,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&suite] {
+ logger().info("-- 0 --");
+ logger().info("[Test] setup connection...");
+ return suite.connect_peer();
+ }).then([&test] {
+ return test.send_bidirectional();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(1, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ }).then([&suite] {
+ logger().info("-- 1 --");
+ logger().info("[Test] client markdown...");
+ return suite.markdown();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(1, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::established);
+ results[1].assert_connect(1, 1, 0, 1);
+ results[1].assert_accept(0, 0, 0, 0);
+ results[1].assert_reset(0, 0);
+ }).then([&test] {
+ logger().info("-- 2 --");
+ logger().info("[Test] server markdown...");
+ return test.markdown_peer();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(1, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::established);
+ results[1].assert_connect(2, 2, 1, 2);
+ results[1].assert_accept(0, 0, 0, 0);
+ results[1].assert_reset(0, 1);
+ }).then([&suite] {
+ logger().info("-- 3 --");
+ logger().info("[Test] client reconnect...");
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(1, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::established);
+ results[1].assert_connect(2, 2, 1, 2);
+ results[1].assert_accept(0, 0, 0, 0);
+ results[1].assert_reset(0, 1);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_stateful_server(FailoverTest& test) {
+ return test.run_suite(
+ "test_v2_stateful_server",
+ TestInterceptor(),
+ policy_t::stateful_server,
+ policy_t::lossless_client,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ logger().info("-- 0 --");
+ logger().info("[Test] setup connection...");
+ return test.peer_connect_me();
+ }).then([&test] {
+ return test.send_bidirectional();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 1);
+ results[0].assert_reset(0, 0);
+ }).then([&test] {
+ logger().info("-- 1 --");
+ logger().info("[Test] client markdown...");
+ return test.markdown_peer();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&test] {
+ return test.peer_send_me();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 2);
+ results[0].assert_reset(0, 1);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 0);
+ results[1].assert_reset(0, 0);
+ }).then([&suite] {
+ logger().info("-- 2 --");
+ logger().info("[Test] server markdown...");
+ return suite.markdown();
+ }).then([&suite] {
+ return suite.wait_results(3);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 2);
+ results[0].assert_reset(0, 1);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 0);
+ results[1].assert_reset(0, 0);
+ results[2].assert_state_at(conn_state_t::established);
+ results[2].assert_connect(0, 0, 0, 0);
+ results[2].assert_accept(1, 1, 1, 1);
+ results[2].assert_reset(0, 0);
+ }).then([&test] {
+ logger().info("-- 3 --");
+ logger().info("[Test] client reconnect...");
+ return test.peer_connect_me();
+ }).then([&test] {
+ return test.peer_send_me();
+ }).then([&suite] {
+ return suite.wait_results(3);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 2);
+ results[0].assert_reset(0, 1);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 0);
+ results[1].assert_reset(0, 0);
+ results[2].assert_state_at(conn_state_t::established);
+ results[2].assert_connect(0, 0, 0, 0);
+ results[2].assert_accept(1, 1, 1, 1);
+ results[2].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_peer_reuse_connector(FailoverTest& test) {
+ return test.run_suite(
+ "test_v2_peer_reuse_connector",
+ TestInterceptor(),
+ policy_t::lossless_peer_reuse,
+ policy_t::lossless_peer_reuse,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&suite] {
+ logger().info("-- 0 --");
+ logger().info("[Test] setup connection...");
+ return suite.connect_peer();
+ }).then([&test] {
+ return test.send_bidirectional();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(1, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ }).then([&suite] {
+ logger().info("-- 1 --");
+ logger().info("[Test] connector markdown...");
+ return suite.markdown();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(1, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::established);
+ results[1].assert_connect(1, 1, 0, 1);
+ results[1].assert_accept(0, 0, 0, 0);
+ results[1].assert_reset(0, 0);
+ }).then([&test] {
+ logger().info("-- 2 --");
+ logger().info("[Test] acceptor markdown...");
+ return test.markdown_peer();
+ }).then([&suite] {
+ ceph_assert(suite.is_standby());
+ logger().info("-- 3 --");
+ logger().info("[Test] connector reconnect...");
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.try_send_peer();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(1, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::established);
+ results[1].assert_connect(2, 2, 1, 2);
+ results[1].assert_accept(0, 0, 0, 0);
+ results[1].assert_reset(0, 1);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_peer_reuse_acceptor(FailoverTest& test) {
+ return test.run_suite(
+ "test_v2_peer_reuse_acceptor",
+ TestInterceptor(),
+ policy_t::lossless_peer_reuse,
+ policy_t::lossless_peer_reuse,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ logger().info("-- 0 --");
+ logger().info("[Test] setup connection...");
+ return test.peer_connect_me();
+ }).then([&test] {
+ return test.send_bidirectional();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 1);
+ results[0].assert_reset(0, 0);
+ }).then([&test] {
+ logger().info("-- 1 --");
+ logger().info("[Test] connector markdown...");
+ return test.markdown_peer();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&test] {
+ return test.peer_send_me();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 2);
+ results[0].assert_reset(0, 1);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 0);
+ results[1].assert_reset(0, 0);
+ }).then([&suite] {
+ logger().info("-- 2 --");
+ logger().info("[Test] acceptor markdown...");
+ return suite.markdown();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 2);
+ results[0].assert_reset(0, 1);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 0);
+ results[1].assert_reset(0, 0);
+ }).then([&test] {
+ logger().info("-- 3 --");
+ logger().info("[Test] connector reconnect...");
+ return test.peer_connect_me();
+ }).then([&test] {
+ return test.try_peer_send_me();
+ }).then([&suite] {
+ return suite.wait_results(3);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 2);
+ results[0].assert_reset(0, 1);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 0);
+ results[1].assert_reset(0, 0);
+ results[2].assert_state_at(conn_state_t::established);
+ results[2].assert_connect(0, 0, 0, 0);
+ results[2].assert_accept(1, 1, 1, 1);
+ results[2].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossless_peer_connector(FailoverTest& test) {
+ return test.run_suite(
+ "test_v2_lossless_peer_connector",
+ TestInterceptor(),
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&suite] {
+ logger().info("-- 0 --");
+ logger().info("[Test] setup connection...");
+ return suite.connect_peer();
+ }).then([&test] {
+ return test.send_bidirectional();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(1, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ }).then([&suite] {
+ logger().info("-- 1 --");
+ logger().info("[Test] connector markdown...");
+ return suite.markdown();
+ }).then([&suite] {
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.send_peer();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(1, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::established);
+ results[1].assert_connect(1, 1, 0, 1);
+ results[1].assert_accept(0, 0, 0, 0);
+ results[1].assert_reset(0, 0);
+ }).then([&test] {
+ logger().info("-- 2 --");
+ logger().info("[Test] acceptor markdown...");
+ return test.markdown_peer();
+ }).then([&suite] {
+ ceph_assert(suite.is_standby());
+ logger().info("-- 3 --");
+ logger().info("[Test] connector reconnect...");
+ return suite.connect_peer();
+ }).then([&suite] {
+ return suite.try_send_peer();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(1, 1, 0, 1);
+ results[0].assert_accept(0, 0, 0, 0);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::established);
+ results[1].assert_connect(2, 2, 1, 2);
+ results[1].assert_accept(0, 0, 0, 0);
+ results[1].assert_reset(0, 1);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_lossless_peer_acceptor(FailoverTest& test) {
+ return test.run_suite(
+ "test_v2_lossless_peer_acceptor",
+ TestInterceptor(),
+ policy_t::lossless_peer,
+ policy_t::lossless_peer,
+ [&test] (FailoverSuite& suite) {
+ return seastar::futurize_invoke([&test] {
+ logger().info("-- 0 --");
+ logger().info("[Test] setup connection...");
+ return test.peer_connect_me();
+ }).then([&test] {
+ return test.send_bidirectional();
+ }).then([&suite] {
+ return suite.wait_results(1);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 1);
+ results[0].assert_reset(0, 0);
+ }).then([&test] {
+ logger().info("-- 1 --");
+ logger().info("[Test] connector markdown...");
+ return test.markdown_peer();
+ }).then([&test] {
+ return test.peer_connect_me();
+ }).then([&test] {
+ return test.peer_send_me();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::established);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 2);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 0);
+ results[1].assert_reset(0, 0);
+ }).then([&suite] {
+ logger().info("-- 2 --");
+ logger().info("[Test] acceptor markdown...");
+ return suite.markdown();
+ }).then([&suite] {
+ return suite.wait_results(2);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 2);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 0);
+ results[1].assert_reset(0, 0);
+ }).then([&test] {
+ logger().info("-- 3 --");
+ logger().info("[Test] connector reconnect...");
+ return test.peer_connect_me();
+ }).then([&test] {
+ return test.try_peer_send_me();
+ }).then([&suite] {
+ return suite.wait_results(3);
+ }).then([] (ConnResults& results) {
+ results[0].assert_state_at(conn_state_t::closed);
+ results[0].assert_connect(0, 0, 0, 0);
+ results[0].assert_accept(1, 1, 0, 2);
+ results[0].assert_reset(0, 0);
+ results[1].assert_state_at(conn_state_t::replaced);
+ results[1].assert_connect(0, 0, 0, 0);
+ results[1].assert_accept(1, 1, 0, 0);
+ results[1].assert_reset(0, 0);
+ results[2].assert_state_at(conn_state_t::established);
+ results[2].assert_connect(0, 0, 0, 0);
+ results[2].assert_accept(1, 1, 1, 1);
+ results[2].assert_reset(0, 0);
+ });
+ });
+}
+
+seastar::future<>
+test_v2_protocol(entity_addr_t test_addr,
+ entity_addr_t cmd_peer_addr,
+ entity_addr_t test_peer_addr,
+ bool test_peer_islocal,
+ bool peer_wins) {
+ ceph_assert_always(test_addr.is_msgr2());
+ ceph_assert_always(cmd_peer_addr.is_msgr2());
+ ceph_assert_always(test_peer_addr.is_msgr2());
+
+ if (test_peer_islocal) {
+ // initiate crimson test peer locally
+ logger().info("test_v2_protocol: start local TestPeer at {}...", cmd_peer_addr);
+ return FailoverTestPeer::create(cmd_peer_addr, test_peer_addr
+ ).then([test_addr, cmd_peer_addr, test_peer_addr, peer_wins](auto peer) {
+ return test_v2_protocol(
+ test_addr,
+ cmd_peer_addr,
+ test_peer_addr,
+ false,
+ peer_wins
+ ).then([peer = std::move(peer)] () mutable {
+ return peer->wait().then([peer = std::move(peer)] {});
+ });
+ }).handle_exception([] (auto eptr) {
+ logger().error("FailoverTestPeer failed: got exception {}", eptr);
+ throw;
+ });
+ }
+
+ return FailoverTest::create(test_addr, cmd_peer_addr, test_peer_addr
+ ).then([peer_wins](auto test) {
+ return seastar::futurize_invoke([test] {
+ return test_v2_lossy_early_connect_fault(*test);
+ }).then([test] {
+ return test_v2_lossy_connect_fault(*test);
+ }).then([test] {
+ return test_v2_lossy_connected_fault(*test);
+ }).then([test] {
+ return test_v2_lossy_early_accept_fault(*test);
+ }).then([test] {
+ return test_v2_lossy_accept_fault(*test);
+ }).then([test] {
+ return test_v2_lossy_establishing_fault(*test);
+ }).then([test] {
+ return test_v2_lossy_accepted_fault(*test);
+ }).then([test] {
+ return test_v2_lossless_connect_fault(*test);
+ }).then([test] {
+ return test_v2_lossless_connected_fault(*test);
+ }).then([test] {
+ return test_v2_lossless_connected_fault2(*test);
+ }).then([test] {
+ return test_v2_lossless_reconnect_fault(*test);
+ }).then([test] {
+ return test_v2_lossless_accept_fault(*test);
+ }).then([test] {
+ return test_v2_lossless_establishing_fault(*test);
+ }).then([test] {
+ return test_v2_lossless_accepted_fault(*test);
+ }).then([test] {
+ return test_v2_lossless_reaccept_fault(*test);
+ }).then([test] {
+ return test_v2_peer_connect_fault(*test);
+ }).then([test] {
+ return test_v2_peer_accept_fault(*test);
+ }).then([test] {
+ return test_v2_peer_establishing_fault(*test);
+ }).then([test] {
+ return test_v2_peer_connected_fault_reconnect(*test);
+ }).then([test] {
+ return test_v2_peer_connected_fault_reaccept(*test);
+ }).then([test] {
+ return check_peer_wins(*test);
+ }).then([test, peer_wins](bool ret_peer_wins) {
+ ceph_assert(peer_wins == ret_peer_wins);
+ if (ret_peer_wins) {
+ return seastar::futurize_invoke([test] {
+ return test_v2_racing_connect_acceptor_win(*test);
+ }).then([test] {
+ return test_v2_racing_reconnect_acceptor_win(*test);
+ });
+ } else {
+ return seastar::futurize_invoke([test] {
+ return test_v2_racing_connect_acceptor_lose(*test);
+ }).then([test] {
+ return test_v2_racing_reconnect_acceptor_lose(*test);
+ });
+ }
+ }).then([test] {
+ return test_v2_racing_connect_reconnect_win(*test);
+ }).then([test] {
+ return test_v2_racing_connect_reconnect_lose(*test);
+ }).then([test] {
+ return test_v2_stale_connect(*test);
+ }).then([test] {
+ return test_v2_stale_reconnect(*test);
+ }).then([test] {
+ return test_v2_stale_accept(*test);
+ }).then([test] {
+ return test_v2_stale_establishing(*test);
+ }).then([test] {
+ return test_v2_stale_reaccept(*test);
+ }).then([test] {
+ return test_v2_lossy_client(*test);
+ }).then([test] {
+ return test_v2_stateless_server(*test);
+ }).then([test] {
+ return test_v2_lossless_client(*test);
+ }).then([test] {
+ return test_v2_stateful_server(*test);
+ }).then([test] {
+ return test_v2_peer_reuse_connector(*test);
+ }).then([test] {
+ return test_v2_peer_reuse_acceptor(*test);
+ }).then([test] {
+ return test_v2_lossless_peer_connector(*test);
+ }).then([test] {
+ return test_v2_lossless_peer_acceptor(*test);
+ }).then([test] {
+ return test->shutdown().then([test] {});
+ });
+ }).handle_exception([] (auto eptr) {
+ logger().error("FailoverTest failed: got exception {}", eptr);
+ throw;
+ });
+}
+
+}
+
+seastar::future<int> do_test(seastar::app_template& app)
+{
+ std::vector<const char*> args;
+ std::string cluster;
+ std::string conf_file_list;
+ auto init_params = ceph_argparse_early_args(args,
+ CEPH_ENTITY_TYPE_CLIENT,
+ &cluster,
+ &conf_file_list);
+ return crimson::common::sharded_conf().start(
+ init_params.name, cluster
+ ).then([] {
+ return local_conf().start();
+ }).then([conf_file_list] {
+ return local_conf().parse_config_files(conf_file_list);
+ }).then([&app] {
+ auto&& config = app.configuration();
+ verbose = config["verbose"].as<bool>();
+ auto rounds = config["rounds"].as<unsigned>();
+ auto keepalive_ratio = config["keepalive-ratio"].as<double>();
+ auto testpeer_islocal = config["testpeer-islocal"].as<bool>();
+
+ entity_addr_t test_addr;
+ ceph_assert(test_addr.parse(
+ config["test-addr"].as<std::string>().c_str(), nullptr));
+ test_addr.set_nonce(TEST_NONCE);
+
+ entity_addr_t cmd_peer_addr;
+ ceph_assert(cmd_peer_addr.parse(
+ config["testpeer-addr"].as<std::string>().c_str(), nullptr));
+ cmd_peer_addr.set_nonce(CMD_SRV_NONCE);
+
+ entity_addr_t test_peer_addr = get_test_peer_addr(cmd_peer_addr);
+ bool peer_wins = (test_addr > test_peer_addr);
+
+ logger().info("test configuration: verbose={}, rounds={}, keepalive_ratio={}, "
+ "test_addr={}, cmd_peer_addr={}, test_peer_addr={}, "
+ "testpeer_islocal={}, peer_wins={}, smp={}",
+ verbose, rounds, keepalive_ratio,
+ test_addr, cmd_peer_addr, test_peer_addr,
+ testpeer_islocal, peer_wins,
+ seastar::smp::count);
+ return test_echo(rounds, keepalive_ratio
+ ).then([] {
+ return test_preemptive_shutdown();
+ }).then([test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal, peer_wins] {
+ return test_v2_protocol(
+ test_addr,
+ cmd_peer_addr,
+ test_peer_addr,
+ testpeer_islocal,
+ peer_wins);
+ }).then([] {
+ logger().info("All tests succeeded");
+ // Seastar has bugs to have events undispatched during shutdown,
+ // which will result in memory leak and thus fail LeakSanitizer.
+ return seastar::sleep(100ms);
+ });
+ }).then([] {
+ return crimson::common::sharded_conf().stop();
+ }).then([] {
+ return 0;
+ }).handle_exception([] (auto eptr) {
+ logger().error("Test failed: got exception {}", eptr);
+ return 1;
+ });
+}
+
+int main(int argc, char** argv)
+{
+ seastar::app_template app;
+ app.add_options()
+ ("verbose,v", bpo::value<bool>()->default_value(false),
+ "chatty if true")
+ ("rounds", bpo::value<unsigned>()->default_value(512),
+ "number of pingpong rounds")
+ ("keepalive-ratio", bpo::value<double>()->default_value(0.1),
+ "ratio of keepalive in ping messages")
+ ("test-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9014"),
+ "address of v2 failover tests")
+ ("testpeer-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9012"),
+ "addresses of v2 failover testpeer"
+ " (This is CmdSrv address, and TestPeer address is at port+=1)")
+ ("testpeer-islocal", bpo::value<bool>()->default_value(true),
+ "create a local crimson testpeer, or connect to a remote testpeer");
+ return app.run(argc, argv, [&app] {
+ // This test normally succeeds within 60 seconds, so kill it after 300
+ // seconds in case it is blocked forever due to unaddressed bugs.
+ return seastar::with_timeout(seastar::lowres_clock::now() + 300s, do_test(app))
+ .handle_exception_type([](seastar::timed_out_error&) {
+ logger().error("test_messenger timeout after 300s, abort! "
+ "Consider to extend the period if the test is still running.");
+ // use the retcode of timeout(1)
+ return 124;
+ });
+ });
+}