summaryrefslogtreecommitdiffstats
path: root/src/test/crimson/test_socket.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/crimson/test_socket.cc')
-rw-r--r--src/test/crimson/test_socket.cc490
1 files changed, 490 insertions, 0 deletions
diff --git a/src/test/crimson/test_socket.cc b/src/test/crimson/test_socket.cc
new file mode 100644
index 000000000..bfdeeea2a
--- /dev/null
+++ b/src/test/crimson/test_socket.cc
@@ -0,0 +1,490 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/sharded.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/when_all.hh>
+#include <seastar/util/later.hh>
+
+#include "crimson/common/log.h"
+#include "crimson/net/Errors.h"
+#include "crimson/net/Fwd.h"
+#include "crimson/net/Socket.h"
+
+namespace {
+
+using seastar::engine;
+using seastar::future;
+using crimson::net::error;
+using crimson::net::FixedCPUServerSocket;
+using crimson::net::Socket;
+using crimson::net::SocketRef;
+using crimson::net::stop_t;
+
+using SocketFRef = seastar::foreign_ptr<SocketRef>;
+
+static seastar::logger logger{"crimsontest"};
+static entity_addr_t get_server_addr() {
+ static int port = 9020;
+ ++port;
+ ceph_assert(port < 9030 && "socket and messenger test ports should not overlap");
+ entity_addr_t saddr;
+ saddr.parse("127.0.0.1", nullptr);
+ saddr.set_port(port);
+ return saddr;
+}
+
+future<SocketRef> socket_connect(const entity_addr_t& saddr) {
+ logger.debug("socket_connect() to {} ...", saddr);
+ return Socket::connect(saddr).then([] (auto socket) {
+ logger.debug("socket_connect() connected");
+ return socket;
+ });
+}
+
+future<> test_refused() {
+ logger.info("test_refused()...");
+ auto saddr = get_server_addr();
+ return socket_connect(saddr).discard_result().then([saddr] {
+ logger.error("test_refused(): connection to {} is not refused", saddr);
+ ceph_abort();
+ }).handle_exception_type([] (const std::system_error& e) {
+ if (e.code() != std::errc::connection_refused) {
+ logger.error("test_refused() got unexpeted error {}", e);
+ ceph_abort();
+ } else {
+ logger.info("test_refused() ok\n");
+ }
+ }).handle_exception([] (auto eptr) {
+ logger.error("test_refused() got unexpeted exception {}", eptr);
+ ceph_abort();
+ });
+}
+
+future<> test_bind_same() {
+ logger.info("test_bind_same()...");
+ return FixedCPUServerSocket::create().then([] (auto pss1) {
+ auto saddr = get_server_addr();
+ return pss1->listen(saddr).safe_then([saddr] {
+ // try to bind the same address
+ return FixedCPUServerSocket::create().then([saddr] (auto pss2) {
+ return pss2->listen(saddr).safe_then([] {
+ logger.error("test_bind_same() should raise address_in_use");
+ ceph_abort();
+ }, FixedCPUServerSocket::listen_ertr::all_same_way(
+ [] (const std::error_code& e) {
+ if (e == std::errc::address_in_use) {
+ // successful!
+ logger.info("test_bind_same() ok\n");
+ } else {
+ logger.error("test_bind_same() got unexpected error {}", e);
+ ceph_abort();
+ }
+ // Note: need to return a explicit ready future, or there will be a
+ // runtime error: member access within null pointer of type 'struct promise_base'
+ return seastar::now();
+ })).then([pss2] {
+ return pss2->destroy();
+ });
+ });
+ }, FixedCPUServerSocket::listen_ertr::all_same_way(
+ [saddr] (const std::error_code& e) {
+ logger.error("test_bind_same(): there is another instance running at {}",
+ saddr);
+ ceph_abort();
+ })).then([pss1] {
+ return pss1->destroy();
+ }).handle_exception([] (auto eptr) {
+ logger.error("test_bind_same() got unexpeted exception {}", eptr);
+ ceph_abort();
+ });
+ });
+}
+
+future<> test_accept() {
+ logger.info("test_accept()");
+ return FixedCPUServerSocket::create().then([] (auto pss) {
+ auto saddr = get_server_addr();
+ return pss->listen(saddr).safe_then([pss] {
+ return pss->accept([] (auto socket, auto paddr) {
+ // simple accept
+ return seastar::sleep(100ms).then([socket = std::move(socket)] () mutable {
+ return socket->close().finally([cleanup = std::move(socket)] {});
+ });
+ });
+ }, FixedCPUServerSocket::listen_ertr::all_same_way(
+ [saddr] (const std::error_code& e) {
+ logger.error("test_accept(): there is another instance running at {}",
+ saddr);
+ ceph_abort();
+ })).then([saddr] {
+ return seastar::when_all(
+ socket_connect(saddr).then([] (auto socket) {
+ return socket->close().finally([cleanup = std::move(socket)] {}); }),
+ socket_connect(saddr).then([] (auto socket) {
+ return socket->close().finally([cleanup = std::move(socket)] {}); }),
+ socket_connect(saddr).then([] (auto socket) {
+ return socket->close().finally([cleanup = std::move(socket)] {}); })
+ ).discard_result();
+ }).then([] {
+ // should be enough to be connected locally
+ return seastar::sleep(50ms);
+ }).then([] {
+ logger.info("test_accept() ok\n");
+ }).then([pss] {
+ return pss->destroy();
+ }).handle_exception([] (auto eptr) {
+ logger.error("test_accept() got unexpeted exception {}", eptr);
+ ceph_abort();
+ });
+ });
+}
+
+class SocketFactory {
+ SocketRef client_socket;
+ SocketFRef server_socket;
+ FixedCPUServerSocket *pss = nullptr;
+ seastar::promise<> server_connected;
+
+ public:
+ // cb_client() on CPU#0, cb_server() on CPU#1
+ template <typename FuncC, typename FuncS>
+ static future<> dispatch_sockets(FuncC&& cb_client, FuncS&& cb_server) {
+ assert(seastar::this_shard_id() == 0u);
+ auto owner = std::make_unique<SocketFactory>();
+ auto psf = owner.get();
+ auto saddr = get_server_addr();
+ return seastar::smp::submit_to(1u, [psf, saddr] {
+ return FixedCPUServerSocket::create().then([psf, saddr] (auto pss) {
+ psf->pss = pss;
+ return pss->listen(saddr
+ ).safe_then([]{}, FixedCPUServerSocket::listen_ertr::all_same_way(
+ [saddr] (const std::error_code& e) {
+ logger.error("dispatch_sockets(): there is another instance running at {}",
+ saddr);
+ ceph_abort();
+ }));
+ });
+ }).then([psf, saddr] {
+ return seastar::when_all_succeed(
+ seastar::smp::submit_to(0u, [psf, saddr] {
+ return socket_connect(saddr).then([psf] (auto socket) {
+ psf->client_socket = std::move(socket);
+ });
+ }),
+ seastar::smp::submit_to(1u, [psf] {
+ return psf->pss->accept([psf] (auto socket, auto paddr) {
+ psf->server_socket = seastar::make_foreign(std::move(socket));
+ return seastar::smp::submit_to(0u, [psf] {
+ psf->server_connected.set_value();
+ });
+ });
+ })
+ );
+ }).then_unpack([] {
+ return seastar::now();
+ }).then([psf] {
+ return psf->server_connected.get_future();
+ }).then([psf] {
+ if (psf->pss) {
+ return seastar::smp::submit_to(1u, [psf] {
+ return psf->pss->destroy();
+ });
+ }
+ return seastar::now();
+ }).then([psf,
+ cb_client = std::move(cb_client),
+ cb_server = std::move(cb_server)] () mutable {
+ logger.debug("dispatch_sockets(): client/server socket are ready");
+ return seastar::when_all_succeed(
+ seastar::smp::submit_to(0u, [socket = psf->client_socket.get(),
+ cb_client = std::move(cb_client)] {
+ return cb_client(socket).then([socket] {
+ logger.debug("closing client socket...");
+ return socket->close();
+ }).handle_exception([] (auto eptr) {
+ logger.error("dispatch_sockets():"
+ " cb_client() got unexpeted exception {}", eptr);
+ ceph_abort();
+ });
+ }),
+ seastar::smp::submit_to(1u, [socket = psf->server_socket.get(),
+ cb_server = std::move(cb_server)] {
+ return cb_server(socket).then([socket] {
+ logger.debug("closing server socket...");
+ return socket->close();
+ }).handle_exception([] (auto eptr) {
+ logger.error("dispatch_sockets():"
+ " cb_server() got unexpeted exception {}", eptr);
+ ceph_abort();
+ });
+ })
+ );
+ }).then_unpack([] {
+ return seastar::now();
+ }).finally([cleanup = std::move(owner)] {});
+ }
+};
+
+class Connection {
+ static const uint64_t DATA_TAIL = 5327;
+ static const unsigned DATA_SIZE = 4096;
+ std::array<uint64_t, DATA_SIZE> data = {0};
+
+ void verify_data_read(const uint64_t read_data[]) {
+ ceph_assert(read_data[0] == read_count);
+ ceph_assert(data[DATA_SIZE - 1] = DATA_TAIL);
+ }
+
+ Socket* socket = nullptr;
+ uint64_t write_count = 0;
+ uint64_t read_count = 0;
+
+ Connection(Socket* socket) : socket{socket} {
+ assert(socket);
+ data[DATA_SIZE - 1] = DATA_TAIL;
+ }
+
+ future<> dispatch_write(unsigned round = 0, bool force_shut = false) {
+ logger.debug("dispatch_write(round={}, force_shut={})...", round, force_shut);
+ return seastar::repeat([this, round, force_shut] {
+ if (round != 0 && round <= write_count) {
+ return seastar::futurize_invoke([this, force_shut] {
+ if (force_shut) {
+ logger.debug("dispatch_write() done, force shutdown output");
+ socket->force_shutdown_out();
+ } else {
+ logger.debug("dispatch_write() done");
+ }
+ }).then([] {
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ });
+ } else {
+ data[0] = write_count;
+ return socket->write(seastar::net::packet(
+ reinterpret_cast<const char*>(&data), sizeof(data))
+ ).then([this] {
+ return socket->flush();
+ }).then([this] {
+ write_count += 1;
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ });
+ }
+ });
+ }
+
+ future<> dispatch_write_unbounded() {
+ return dispatch_write(
+ ).then([] {
+ ceph_abort();
+ }).handle_exception_type([this] (const std::system_error& e) {
+ if (e.code() != std::errc::broken_pipe &&
+ e.code() != std::errc::connection_reset) {
+ logger.error("dispatch_write_unbounded(): "
+ "unexpected error {}", e);
+ throw;
+ }
+ // successful
+ logger.debug("dispatch_write_unbounded(): "
+ "expected error {}", e);
+ shutdown();
+ });
+ }
+
+ future<> dispatch_read(unsigned round = 0, bool force_shut = false) {
+ logger.debug("dispatch_read(round={}, force_shut={})...", round, force_shut);
+ return seastar::repeat([this, round, force_shut] {
+ if (round != 0 && round <= read_count) {
+ return seastar::futurize_invoke([this, force_shut] {
+ if (force_shut) {
+ logger.debug("dispatch_read() done, force shutdown input");
+ socket->force_shutdown_in();
+ } else {
+ logger.debug("dispatch_read() done");
+ }
+ }).then([] {
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ });
+ } else {
+ return seastar::futurize_invoke([this] {
+ // we want to test both Socket::read() and Socket::read_exactly()
+ if (read_count % 2) {
+ return socket->read(DATA_SIZE * sizeof(uint64_t)
+ ).then([this] (ceph::bufferlist bl) {
+ uint64_t read_data[DATA_SIZE];
+ auto p = bl.cbegin();
+ ::ceph::decode_raw(read_data, p);
+ verify_data_read(read_data);
+ });
+ } else {
+ return socket->read_exactly(DATA_SIZE * sizeof(uint64_t)
+ ).then([this] (auto buf) {
+ auto read_data = reinterpret_cast<const uint64_t*>(buf.get());
+ verify_data_read(read_data);
+ });
+ }
+ }).then([this] {
+ ++read_count;
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ });
+ }
+ });
+ }
+
+ future<> dispatch_read_unbounded() {
+ return dispatch_read(
+ ).then([] {
+ ceph_abort();
+ }).handle_exception_type([this] (const std::system_error& e) {
+ if (e.code() != error::read_eof
+ && e.code() != std::errc::connection_reset) {
+ logger.error("dispatch_read_unbounded(): "
+ "unexpected error {}", e);
+ throw;
+ }
+ // successful
+ logger.debug("dispatch_read_unbounded(): "
+ "expected error {}", e);
+ shutdown();
+ });
+ }
+
+ void shutdown() {
+ socket->shutdown();
+ }
+
+ public:
+ static future<> dispatch_rw_bounded(Socket* socket, unsigned round,
+ bool force_shut = false) {
+ logger.debug("dispatch_rw_bounded(round={}, force_shut={})...",
+ round, force_shut);
+ return seastar::do_with(Connection{socket},
+ [round, force_shut] (auto& conn) {
+ ceph_assert(round != 0);
+ return seastar::when_all_succeed(
+ conn.dispatch_write(round, force_shut),
+ conn.dispatch_read(round, force_shut)
+ ).then_unpack([] {
+ return seastar::now();
+ });
+ });
+ }
+
+ static future<> dispatch_rw_unbounded(Socket* socket, bool preemptive_shut = false) {
+ logger.debug("dispatch_rw_unbounded(preemptive_shut={})...", preemptive_shut);
+ return seastar::do_with(Connection{socket}, [preemptive_shut] (auto& conn) {
+ return seastar::when_all_succeed(
+ conn.dispatch_write_unbounded(),
+ conn.dispatch_read_unbounded(),
+ seastar::futurize_invoke([&conn, preemptive_shut] {
+ if (preemptive_shut) {
+ return seastar::sleep(100ms).then([&conn] {
+ logger.debug("dispatch_rw_unbounded() shutdown socket preemptively(100ms)");
+ conn.shutdown();
+ });
+ } else {
+ return seastar::now();
+ }
+ })
+ ).then_unpack([] {
+ return seastar::now();
+ });
+ });
+ }
+};
+
+future<> test_read_write() {
+ logger.info("test_read_write()...");
+ return SocketFactory::dispatch_sockets(
+ [] (auto cs) { return Connection::dispatch_rw_bounded(cs, 128); },
+ [] (auto ss) { return Connection::dispatch_rw_bounded(ss, 128); }
+ ).then([] {
+ logger.info("test_read_write() ok\n");
+ }).handle_exception([] (auto eptr) {
+ logger.error("test_read_write() got unexpeted exception {}", eptr);
+ ceph_abort();
+ });
+}
+
+future<> test_unexpected_down() {
+ logger.info("test_unexpected_down()...");
+ return SocketFactory::dispatch_sockets(
+ [] (auto cs) {
+ return Connection::dispatch_rw_bounded(cs, 128, true
+ ).handle_exception_type([] (const std::system_error& e) {
+ logger.debug("test_unexpected_down(): client get error {}", e);
+ ceph_assert(e.code() == error::read_eof);
+ });
+ },
+ [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); }
+ ).then([] {
+ logger.info("test_unexpected_down() ok\n");
+ }).handle_exception([] (auto eptr) {
+ logger.error("test_unexpected_down() got unexpeted exception {}", eptr);
+ ceph_abort();
+ });
+}
+
+future<> test_shutdown_propagated() {
+ logger.info("test_shutdown_propagated()...");
+ return SocketFactory::dispatch_sockets(
+ [] (auto cs) {
+ logger.debug("test_shutdown_propagated() shutdown client socket");
+ cs->shutdown();
+ return seastar::now();
+ },
+ [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); }
+ ).then([] {
+ logger.info("test_shutdown_propagated() ok\n");
+ }).handle_exception([] (auto eptr) {
+ logger.error("test_shutdown_propagated() got unexpeted exception {}", eptr);
+ ceph_abort();
+ });
+}
+
+future<> test_preemptive_down() {
+ logger.info("test_preemptive_down()...");
+ return SocketFactory::dispatch_sockets(
+ [] (auto cs) { return Connection::dispatch_rw_unbounded(cs, true); },
+ [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); }
+ ).then([] {
+ logger.info("test_preemptive_down() ok\n");
+ }).handle_exception([] (auto eptr) {
+ logger.error("test_preemptive_down() got unexpeted exception {}", eptr);
+ ceph_abort();
+ });
+}
+
+}
+
+int main(int argc, char** argv)
+{
+ seastar::app_template app;
+ return app.run(argc, argv, [] {
+ return seastar::futurize_invoke([] {
+ return test_refused();
+ }).then([] {
+ return test_bind_same();
+ }).then([] {
+ return test_accept();
+ }).then([] {
+ return test_read_write();
+ }).then([] {
+ return test_unexpected_down();
+ }).then([] {
+ return test_shutdown_propagated();
+ }).then([] {
+ return test_preemptive_down();
+ }).then([] {
+ logger.info("All tests succeeded");
+ // Seastar has bugs to have events undispatched during shutdown,
+ // which will result in memory leak and thus fail LeakSanitizer.
+ return seastar::sleep(100ms);
+ }).handle_exception([] (auto eptr) {
+ std::cout << "Test failure" << std::endl;
+ return seastar::make_exception_future<>(eptr);
+ });
+ });
+}