summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/Socket.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/crimson/net/Socket.cc
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/net/Socket.cc')
-rw-r--r--src/crimson/net/Socket.cc523
1 files changed, 523 insertions, 0 deletions
diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc
new file mode 100644
index 000000000..95b1e2250
--- /dev/null
+++ b/src/crimson/net/Socket.cc
@@ -0,0 +1,523 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Socket.h"
+
+#include <seastar/core/sleep.hh>
+#include <seastar/core/when_all.hh>
+#include <seastar/net/packet.hh>
+
+#include "crimson/common/log.h"
+#include "Errors.h"
+
+using crimson::common::local_conf;
+
+namespace crimson::net {
+
+namespace {
+
+seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+}
+
+using tmp_buf = seastar::temporary_buffer<char>;
+using packet = seastar::net::packet;
+
+// an input_stream consumer that reads buffer segments into a bufferlist up to
+// the given number of remaining bytes
+struct bufferlist_consumer {
+ bufferlist& bl;
+ size_t& remaining;
+
+ bufferlist_consumer(bufferlist& bl, size_t& remaining)
+ : bl(bl), remaining(remaining) {}
+
+ using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type;
+
+ // consume some or all of a buffer segment
+ seastar::future<consumption_result_type> operator()(tmp_buf&& data) {
+ if (remaining >= data.size()) {
+ // consume the whole buffer
+ remaining -= data.size();
+ bl.append(buffer::create(std::move(data)));
+ if (remaining > 0) {
+ // return none to request more segments
+ return seastar::make_ready_future<consumption_result_type>(
+ seastar::continue_consuming{});
+ } else {
+ // return an empty buffer to singal that we're done
+ return seastar::make_ready_future<consumption_result_type>(
+ consumption_result_type::stop_consuming_type({}));
+ }
+ }
+ if (remaining > 0) {
+ // consume the front
+ bl.append(buffer::create(data.share(0, remaining)));
+ data.trim_front(remaining);
+ remaining = 0;
+ }
+ // give the rest back to signal that we're done
+ return seastar::make_ready_future<consumption_result_type>(
+ consumption_result_type::stop_consuming_type{std::move(data)});
+ };
+};
+
+seastar::future<> inject_delay()
+{
+ if (float delay_period = local_conf()->ms_inject_internal_delays;
+ delay_period) {
+ logger().debug("Socket::inject_delay: sleep for {}", delay_period);
+ return seastar::sleep(
+ std::chrono::milliseconds((int)(delay_period * 1000.0)));
+ }
+ return seastar::now();
+}
+
+void inject_failure()
+{
+ if (local_conf()->ms_inject_socket_failures) {
+ uint64_t rand =
+ ceph::util::generate_random_number<uint64_t>(1, RAND_MAX);
+ if (rand % local_conf()->ms_inject_socket_failures == 0) {
+ logger().warn("Socket::inject_failure: injecting socket failure");
+ throw std::system_error(make_error_code(
+ error::negotiation_failure));
+ }
+ }
+}
+
+} // anonymous namespace
+
+Socket::Socket(
+ seastar::connected_socket &&_socket,
+ side_t _side,
+ uint16_t e_port,
+ construct_tag)
+ : sid{seastar::this_shard_id()},
+ socket(std::move(_socket)),
+ in(socket.input()),
+ // the default buffer size 8192 is too small that may impact our write
+ // performance. see seastar::net::connected_socket::output()
+ out(socket.output(65536)),
+ socket_is_shutdown(false),
+ side(_side),
+ ephemeral_port(e_port)
+{
+ if (local_conf()->ms_tcp_nodelay) {
+ socket.set_nodelay(true);
+ }
+}
+
+Socket::~Socket()
+{
+ assert(seastar::this_shard_id() == sid);
+#ifndef NDEBUG
+ assert(closed);
+#endif
+}
+
+seastar::future<bufferlist>
+Socket::read(size_t bytes)
+{
+ assert(seastar::this_shard_id() == sid);
+#ifdef UNIT_TESTS_BUILT
+ return try_trap_pre(next_trap_read).then([bytes, this] {
+#endif
+ if (bytes == 0) {
+ return seastar::make_ready_future<bufferlist>();
+ }
+ r.buffer.clear();
+ r.remaining = bytes;
+ return in.consume(bufferlist_consumer{r.buffer, r.remaining}).then([this] {
+ if (r.remaining) { // throw on short reads
+ throw std::system_error(make_error_code(error::read_eof));
+ }
+ inject_failure();
+ return inject_delay().then([this] {
+ return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
+ });
+ });
+#ifdef UNIT_TESTS_BUILT
+ }).then([this](auto buf) {
+ return try_trap_post(next_trap_read
+ ).then([buf = std::move(buf)]() mutable {
+ return std::move(buf);
+ });
+ });
+#endif
+}
+
+seastar::future<bufferptr>
+Socket::read_exactly(size_t bytes) {
+ assert(seastar::this_shard_id() == sid);
+#ifdef UNIT_TESTS_BUILT
+ return try_trap_pre(next_trap_read).then([bytes, this] {
+#endif
+ if (bytes == 0) {
+ return seastar::make_ready_future<bufferptr>();
+ }
+ return in.read_exactly(bytes).then([bytes](auto buf) {
+ bufferptr ptr(buffer::create(buf.share()));
+ if (ptr.length() < bytes) {
+ throw std::system_error(make_error_code(error::read_eof));
+ }
+ inject_failure();
+ return inject_delay(
+ ).then([ptr = std::move(ptr)]() mutable {
+ return seastar::make_ready_future<bufferptr>(std::move(ptr));
+ });
+ });
+#ifdef UNIT_TESTS_BUILT
+ }).then([this](auto ptr) {
+ return try_trap_post(next_trap_read
+ ).then([ptr = std::move(ptr)]() mutable {
+ return std::move(ptr);
+ });
+ });
+#endif
+}
+
+seastar::future<>
+Socket::write(bufferlist buf)
+{
+ assert(seastar::this_shard_id() == sid);
+#ifdef UNIT_TESTS_BUILT
+ return try_trap_pre(next_trap_write
+ ).then([buf = std::move(buf), this]() mutable {
+#endif
+ inject_failure();
+ return inject_delay(
+ ).then([buf = std::move(buf), this]() mutable {
+ packet p(std::move(buf));
+ return out.write(std::move(p));
+ });
+#ifdef UNIT_TESTS_BUILT
+ }).then([this] {
+ return try_trap_post(next_trap_write);
+ });
+#endif
+}
+
+seastar::future<>
+Socket::flush()
+{
+ assert(seastar::this_shard_id() == sid);
+ inject_failure();
+ return inject_delay().then([this] {
+ return out.flush();
+ });
+}
+
+seastar::future<>
+Socket::write_flush(bufferlist buf)
+{
+ assert(seastar::this_shard_id() == sid);
+#ifdef UNIT_TESTS_BUILT
+ return try_trap_pre(next_trap_write
+ ).then([buf = std::move(buf), this]() mutable {
+#endif
+ inject_failure();
+ return inject_delay(
+ ).then([buf = std::move(buf), this]() mutable {
+ packet p(std::move(buf));
+ return out.write(std::move(p)
+ ).then([this] {
+ return out.flush();
+ });
+ });
+#ifdef UNIT_TESTS_BUILT
+ }).then([this] {
+ return try_trap_post(next_trap_write);
+ });
+#endif
+}
+
+void Socket::shutdown()
+{
+ assert(seastar::this_shard_id() == sid);
+ socket_is_shutdown = true;
+ socket.shutdown_input();
+ socket.shutdown_output();
+}
+
+static inline seastar::future<>
+close_and_handle_errors(seastar::output_stream<char>& out)
+{
+ return out.close().handle_exception_type([](const std::system_error& e) {
+ if (e.code() != std::errc::broken_pipe &&
+ e.code() != std::errc::connection_reset) {
+ logger().error("Socket::close(): unexpected error {}", e.what());
+ ceph_abort();
+ }
+ // can happen when out is already shutdown, ignore
+ });
+}
+
+seastar::future<>
+Socket::close()
+{
+ assert(seastar::this_shard_id() == sid);
+#ifndef NDEBUG
+ ceph_assert_always(!closed);
+ closed = true;
+#endif
+ return seastar::when_all_succeed(
+ inject_delay(),
+ in.close(),
+ close_and_handle_errors(out)
+ ).then_unpack([] {
+ return seastar::make_ready_future<>();
+ }).handle_exception([](auto eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().error("Socket::close(): unexpected exception {}", e_what);
+ ceph_abort();
+ });
+}
+
+seastar::future<SocketRef>
+Socket::connect(const entity_addr_t &peer_addr)
+{
+ inject_failure();
+ return inject_delay(
+ ).then([peer_addr] {
+ return seastar::connect(peer_addr.in4_addr());
+ }).then([peer_addr](seastar::connected_socket socket) {
+ auto ret = std::make_unique<Socket>(
+ std::move(socket), side_t::connector, 0, construct_tag{});
+ logger().debug("Socket::connect(): connected to {}, socket {}",
+ peer_addr, fmt::ptr(ret));
+ return ret;
+ });
+}
+
+#ifdef UNIT_TESTS_BUILT
+void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) {
+ assert(seastar::this_shard_id() == sid);
+ blocker = blocker_;
+ if (type == bp_type_t::READ) {
+ ceph_assert_always(next_trap_read == bp_action_t::CONTINUE);
+ next_trap_read = action;
+ } else { // type == bp_type_t::WRITE
+ if (next_trap_write == bp_action_t::CONTINUE) {
+ next_trap_write = action;
+ } else if (next_trap_write == bp_action_t::FAULT) {
+ // do_sweep_messages() may combine multiple write events into one socket write
+ ceph_assert_always(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE);
+ } else {
+ ceph_abort();
+ }
+ }
+}
+
+seastar::future<>
+Socket::try_trap_pre(bp_action_t& trap) {
+ auto action = trap;
+ trap = bp_action_t::CONTINUE;
+ switch (action) {
+ case bp_action_t::CONTINUE:
+ break;
+ case bp_action_t::FAULT:
+ logger().info("[Test] got FAULT");
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ case bp_action_t::BLOCK:
+ logger().info("[Test] got BLOCK");
+ return blocker->block();
+ case bp_action_t::STALL:
+ trap = action;
+ break;
+ default:
+ ceph_abort("unexpected action from trap");
+ }
+ return seastar::make_ready_future<>();
+}
+
+seastar::future<>
+Socket::try_trap_post(bp_action_t& trap) {
+ auto action = trap;
+ trap = bp_action_t::CONTINUE;
+ switch (action) {
+ case bp_action_t::CONTINUE:
+ break;
+ case bp_action_t::STALL:
+ logger().info("[Test] got STALL and block");
+ force_shutdown();
+ return blocker->block();
+ default:
+ ceph_abort("unexpected action from trap");
+ }
+ return seastar::make_ready_future<>();
+}
+#endif
+
+ShardedServerSocket::ShardedServerSocket(
+ seastar::shard_id sid,
+ bool dispatch_only_on_primary_sid,
+ construct_tag)
+ : primary_sid{sid}, dispatch_only_on_primary_sid{dispatch_only_on_primary_sid}
+{
+}
+
+ShardedServerSocket::~ShardedServerSocket()
+{
+ assert(!listener);
+ // detect whether user have called destroy() properly
+ ceph_assert_always(!service);
+}
+
+listen_ertr::future<>
+ShardedServerSocket::listen(entity_addr_t addr)
+{
+ ceph_assert_always(seastar::this_shard_id() == primary_sid);
+ logger().debug("ShardedServerSocket({})::listen()...", addr);
+ return this->container().invoke_on_all([addr](auto& ss) {
+ ss.listen_addr = addr;
+ seastar::socket_address s_addr(addr.in4_addr());
+ seastar::listen_options lo;
+ lo.reuse_address = true;
+ if (ss.dispatch_only_on_primary_sid) {
+ lo.set_fixed_cpu(ss.primary_sid);
+ }
+ ss.listener = seastar::listen(s_addr, lo);
+ }).then([] {
+ return listen_ertr::now();
+ }).handle_exception_type(
+ [addr](const std::system_error& e) -> listen_ertr::future<> {
+ if (e.code() == std::errc::address_in_use) {
+ logger().debug("ShardedServerSocket({})::listen(): address in use", addr);
+ return crimson::ct_error::address_in_use::make();
+ } else if (e.code() == std::errc::address_not_available) {
+ logger().debug("ShardedServerSocket({})::listen(): address not available",
+ addr);
+ return crimson::ct_error::address_not_available::make();
+ }
+ logger().error("ShardedServerSocket({})::listen(): "
+ "got unexpeted error {}", addr, e.what());
+ ceph_abort();
+ });
+}
+
+seastar::future<>
+ShardedServerSocket::accept(accept_func_t &&_fn_accept)
+{
+ ceph_assert_always(seastar::this_shard_id() == primary_sid);
+ logger().debug("ShardedServerSocket({})::accept()...", listen_addr);
+ return this->container().invoke_on_all([_fn_accept](auto &ss) {
+ assert(ss.listener);
+ ss.fn_accept = _fn_accept;
+ // gate accepting
+ // ShardedServerSocket::shutdown() will drain the continuations in the gate
+ // so ignore the returned future
+ std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] {
+ return seastar::keep_doing([&ss] {
+ return ss.listener->accept(
+ ).then([&ss](seastar::accept_result accept_result) {
+#ifndef NDEBUG
+ if (ss.dispatch_only_on_primary_sid) {
+ // see seastar::listen_options::set_fixed_cpu()
+ ceph_assert_always(seastar::this_shard_id() == ss.primary_sid);
+ }
+#endif
+ auto [socket, paddr] = std::move(accept_result);
+ entity_addr_t peer_addr;
+ peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
+ peer_addr.set_type(ss.listen_addr.get_type());
+ SocketRef _socket = std::make_unique<Socket>(
+ std::move(socket), Socket::side_t::acceptor,
+ peer_addr.get_port(), Socket::construct_tag{});
+ logger().debug("ShardedServerSocket({})::accept(): accepted peer {}, "
+ "socket {}, dispatch_only_on_primary_sid = {}",
+ ss.listen_addr, peer_addr, fmt::ptr(_socket),
+ ss.dispatch_only_on_primary_sid);
+ std::ignore = seastar::with_gate(
+ ss.shutdown_gate,
+ [socket=std::move(_socket), peer_addr, &ss]() mutable {
+ return ss.fn_accept(std::move(socket), peer_addr
+ ).handle_exception([&ss, peer_addr](auto eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().error("ShardedServerSocket({})::accept(): "
+ "fn_accept(s, {}) got unexpected exception {}",
+ ss.listen_addr, peer_addr, e_what);
+ ceph_abort();
+ });
+ });
+ });
+ }).handle_exception_type([&ss](const std::system_error& e) {
+ if (e.code() == std::errc::connection_aborted ||
+ e.code() == std::errc::invalid_argument) {
+ logger().debug("ShardedServerSocket({})::accept(): stopped ({})",
+ ss.listen_addr, e.what());
+ } else {
+ throw;
+ }
+ }).handle_exception([&ss](auto eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().error("ShardedServerSocket({})::accept(): "
+ "got unexpected exception {}", ss.listen_addr, e_what);
+ ceph_abort();
+ });
+ });
+ });
+}
+
+seastar::future<>
+ShardedServerSocket::shutdown_destroy()
+{
+ assert(seastar::this_shard_id() == primary_sid);
+ logger().debug("ShardedServerSocket({})::shutdown_destroy()...", listen_addr);
+ // shutdown shards
+ return this->container().invoke_on_all([](auto& ss) {
+ if (ss.listener) {
+ ss.listener->abort_accept();
+ }
+ return ss.shutdown_gate.close();
+ }).then([this] {
+ // destroy shards
+ return this->container().invoke_on_all([](auto& ss) {
+ assert(ss.shutdown_gate.is_closed());
+ ss.listen_addr = entity_addr_t();
+ ss.listener.reset();
+ });
+ }).then([this] {
+ // stop the sharded service: we should only construct/stop shards on #0
+ return this->container().invoke_on(0, [](auto& ss) {
+ assert(ss.service);
+ return ss.service->stop().finally([cleanup = std::move(ss.service)] {});
+ });
+ });
+}
+
+seastar::future<ShardedServerSocket*>
+ShardedServerSocket::create(bool dispatch_only_on_this_shard)
+{
+ auto primary_sid = seastar::this_shard_id();
+ // start the sharded service: we should only construct/stop shards on #0
+ return seastar::smp::submit_to(0, [primary_sid, dispatch_only_on_this_shard] {
+ auto service = std::make_unique<sharded_service_t>();
+ return service->start(
+ primary_sid, dispatch_only_on_this_shard, construct_tag{}
+ ).then([service = std::move(service)]() mutable {
+ auto p_shard = service.get();
+ p_shard->local().service = std::move(service);
+ return p_shard;
+ });
+ }).then([](auto p_shard) {
+ return &p_shard->local();
+ });
+}
+
+} // namespace crimson::net