summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/io_handler.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/crimson/net/io_handler.h
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/net/io_handler.h')
-rw-r--r--src/crimson/net/io_handler.h623
1 files changed, 623 insertions, 0 deletions
diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h
new file mode 100644
index 000000000..f53c2ba64
--- /dev/null
+++ b/src/crimson/net/io_handler.h
@@ -0,0 +1,623 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <vector>
+
+#include <seastar/core/shared_future.hh>
+#include <seastar/util/later.hh>
+
+#include "crimson/common/gated.h"
+#include "Fwd.h"
+#include "SocketConnection.h"
+#include "FrameAssemblerV2.h"
+
+namespace crimson::net {
+
+/**
+ * crosscore_t
+ *
+ * To preserve the event order across cores.
+ */
+class crosscore_t {
+public:
+ using seq_t = uint64_t;
+
+ crosscore_t() = default;
+ ~crosscore_t() = default;
+
+ seq_t get_in_seq() const {
+ return in_seq;
+ }
+
+ seq_t prepare_submit() {
+ ++out_seq;
+ return out_seq;
+ }
+
+ bool proceed_or_wait(seq_t seq) {
+ if (seq == in_seq + 1) {
+ ++in_seq;
+ if (unlikely(in_pr_wait.has_value())) {
+ in_pr_wait->set_value();
+ in_pr_wait = std::nullopt;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ seastar::future<> wait(seq_t seq) {
+ assert(seq != in_seq + 1);
+ if (!in_pr_wait.has_value()) {
+ in_pr_wait = seastar::shared_promise<>();
+ }
+ return in_pr_wait->get_shared_future();
+ }
+
+private:
+ seq_t out_seq = 0;
+ seq_t in_seq = 0;
+ std::optional<seastar::shared_promise<>> in_pr_wait;
+};
+
+/**
+ * io_handler_state
+ *
+ * It is required to populate the states from IOHandler to ProtocolV2
+ * asynchronously.
+ */
+struct io_handler_state {
+ seq_num_t in_seq;
+ bool is_out_queued;
+ bool has_out_sent;
+
+ bool is_out_queued_or_sent() const {
+ return is_out_queued || has_out_sent;
+ }
+
+ /*
+ * should be consistent with the accroding interfaces in IOHandler
+ */
+
+ void reset_session(bool full) {
+ in_seq = 0;
+ if (full) {
+ is_out_queued = false;
+ has_out_sent = false;
+ }
+ }
+
+ void reset_peer_state() {
+ in_seq = 0;
+ is_out_queued = is_out_queued_or_sent();
+ has_out_sent = false;
+ }
+
+ void requeue_out_sent_up_to() {
+ // noop since the information is insufficient
+ }
+
+ void requeue_out_sent() {
+ if (has_out_sent) {
+ has_out_sent = false;
+ is_out_queued = true;
+ }
+ }
+};
+
+/**
+ * HandshakeListener
+ *
+ * The interface class for IOHandler to notify the ProtocolV2.
+ *
+ * The notifications may be cross-core and must be sent to
+ * SocketConnection::get_messenger_shard_id()
+ */
+class HandshakeListener {
+public:
+ virtual ~HandshakeListener() = default;
+
+ HandshakeListener(const HandshakeListener&) = delete;
+ HandshakeListener(HandshakeListener &&) = delete;
+ HandshakeListener &operator=(const HandshakeListener &) = delete;
+ HandshakeListener &operator=(HandshakeListener &&) = delete;
+
+ virtual seastar::future<> notify_out(
+ crosscore_t::seq_t cc_seq) = 0;
+
+ virtual seastar::future<> notify_out_fault(
+ crosscore_t::seq_t cc_seq,
+ const char *where,
+ std::exception_ptr,
+ io_handler_state) = 0;
+
+ virtual seastar::future<> notify_mark_down(
+ crosscore_t::seq_t cc_seq) = 0;
+
+protected:
+ HandshakeListener() = default;
+};
+
+/**
+ * IOHandler
+ *
+ * Implements the message read and write paths after the handshake, and also be
+ * responsible to dispatch events. It is supposed to be working on the same
+ * core with the underlying socket and the FrameAssemblerV2 class.
+ */
+class IOHandler final : public ConnectionHandler {
+public:
+ IOHandler(ChainedDispatchers &,
+ SocketConnection &);
+
+ ~IOHandler() final;
+
+ IOHandler(const IOHandler &) = delete;
+ IOHandler(IOHandler &&) = delete;
+ IOHandler &operator=(const IOHandler &) = delete;
+ IOHandler &operator=(IOHandler &&) = delete;
+
+/*
+ * as ConnectionHandler
+ */
+public:
+ seastar::shard_id get_shard_id() const final {
+ return shard_states->get_shard_id();
+ }
+
+ bool is_connected() const final {
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ return protocol_is_connected;
+ }
+
+ seastar::future<> send(MessageFRef msg) final;
+
+ seastar::future<> send_keepalive() final;
+
+ clock_t::time_point get_last_keepalive() const final {
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ return last_keepalive;
+ }
+
+ clock_t::time_point get_last_keepalive_ack() const final {
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ return last_keepalive_ack;
+ }
+
+ void set_last_keepalive_ack(clock_t::time_point when) final {
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ last_keepalive_ack = when;
+ }
+
+ void mark_down() final;
+
+/*
+ * as IOHandler to be called by ProtocolV2 handshake
+ *
+ * The calls may be cross-core and asynchronous
+ */
+public:
+ /*
+ * should not be called cross-core
+ */
+
+ void set_handshake_listener(HandshakeListener &hl) {
+ assert(seastar::this_shard_id() == get_shard_id());
+ ceph_assert_always(handshake_listener == nullptr);
+ handshake_listener = &hl;
+ }
+
+ io_handler_state get_states() const {
+ // might be called from prv_sid during wait_io_exit_dispatching()
+ return {in_seq, is_out_queued(), has_out_sent()};
+ }
+
+ struct io_stat_printer {
+ const IOHandler &io_handler;
+ };
+ void print_io_stat(std::ostream &out) const;
+
+ seastar::future<> set_accepted_sid(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id sid,
+ ConnectionFRef conn_fref);
+
+ /*
+ * may be called cross-core
+ */
+
+ seastar::future<> close_io(
+ crosscore_t::seq_t cc_seq,
+ bool is_dispatch_reset,
+ bool is_replace);
+
+ /**
+ * io_state_t
+ *
+ * The io_state is changed with the protocol state, to control the
+ * io behavior accordingly.
+ */
+ enum class io_state_t : uint8_t {
+ none, // no IO is possible as the connection is not available to the user yet.
+ delay, // IO is delayed until open.
+ open, // Dispatch In and Out concurrently.
+ drop, // Drop IO as the connection is closed.
+ switched // IO is switched to a different core
+ // (is moved to maybe_prv_shard_states)
+ };
+ friend class fmt::formatter<io_state_t>;
+
+ seastar::future<> set_io_state(
+ crosscore_t::seq_t cc_seq,
+ io_state_t new_state,
+ FrameAssemblerV2Ref fa,
+ bool set_notify_out);
+
+ struct exit_dispatching_ret {
+ FrameAssemblerV2Ref frame_assembler;
+ io_handler_state io_states;
+ };
+ seastar::future<exit_dispatching_ret>
+ wait_io_exit_dispatching(
+ crosscore_t::seq_t cc_seq);
+
+ seastar::future<> reset_session(
+ crosscore_t::seq_t cc_seq,
+ bool full);
+
+ seastar::future<> reset_peer_state(
+ crosscore_t::seq_t cc_seq);
+
+ seastar::future<> requeue_out_sent_up_to(
+ crosscore_t::seq_t cc_seq,
+ seq_num_t msg_seq);
+
+ seastar::future<> requeue_out_sent(
+ crosscore_t::seq_t cc_seq);
+
+ seastar::future<> dispatch_accept(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef,
+ bool is_replace);
+
+ seastar::future<> dispatch_connect(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef);
+
+ private:
+ class shard_states_t;
+ using shard_states_ref_t = std::unique_ptr<shard_states_t>;
+
+ class shard_states_t {
+ public:
+ shard_states_t(seastar::shard_id _sid, io_state_t state)
+ : sid{_sid}, io_state{state} {}
+
+ seastar::shard_id get_shard_id() const {
+ return sid;
+ }
+
+ io_state_t get_io_state() const {
+ assert(seastar::this_shard_id() == sid);
+ return io_state;
+ }
+
+ void set_io_state(io_state_t new_state) {
+ assert(seastar::this_shard_id() == sid);
+ assert(io_state != new_state);
+ pr_io_state_changed.set_value();
+ pr_io_state_changed = seastar::promise<>();
+ if (io_state == io_state_t::open) {
+ // from open
+ if (out_dispatching) {
+ ceph_assert_always(!out_exit_dispatching.has_value());
+ out_exit_dispatching = seastar::promise<>();
+ }
+ }
+ io_state = new_state;
+ }
+
+ seastar::future<> wait_state_change() {
+ assert(seastar::this_shard_id() == sid);
+ return pr_io_state_changed.get_future();
+ }
+
+ template <typename Func>
+ void dispatch_in_background(
+ const char *what, SocketConnection &who, Func &&func) {
+ assert(seastar::this_shard_id() == sid);
+ ceph_assert_always(!gate.is_closed());
+ gate.dispatch_in_background(what, who, std::move(func));
+ }
+
+ void enter_in_dispatching() {
+ assert(seastar::this_shard_id() == sid);
+ assert(io_state == io_state_t::open);
+ ceph_assert_always(!in_exit_dispatching.has_value());
+ in_exit_dispatching = seastar::promise<>();
+ }
+
+ void exit_in_dispatching() {
+ assert(seastar::this_shard_id() == sid);
+ assert(io_state != io_state_t::open);
+ ceph_assert_always(in_exit_dispatching.has_value());
+ in_exit_dispatching->set_value();
+ in_exit_dispatching = std::nullopt;
+ }
+
+ bool try_enter_out_dispatching() {
+ assert(seastar::this_shard_id() == sid);
+ if (out_dispatching) {
+ // already dispatching out
+ return false;
+ }
+ switch (io_state) {
+ case io_state_t::open:
+ [[fallthrough]];
+ case io_state_t::delay:
+ out_dispatching = true;
+ return true;
+ case io_state_t::drop:
+ [[fallthrough]];
+ case io_state_t::switched:
+ // do not dispatch out
+ return false;
+ default:
+ ceph_abort("impossible");
+ }
+ }
+
+ void notify_out_dispatching_stopped(
+ const char *what, SocketConnection &conn);
+
+ void exit_out_dispatching(
+ const char *what, SocketConnection &conn) {
+ assert(seastar::this_shard_id() == sid);
+ ceph_assert_always(out_dispatching);
+ out_dispatching = false;
+ notify_out_dispatching_stopped(what, conn);
+ }
+
+ seastar::future<> wait_io_exit_dispatching();
+
+ seastar::future<> close() {
+ assert(seastar::this_shard_id() == sid);
+ assert(!gate.is_closed());
+ return gate.close();
+ }
+
+ bool assert_closed_and_exit() const {
+ assert(seastar::this_shard_id() == sid);
+ if (gate.is_closed()) {
+ ceph_assert_always(io_state == io_state_t::drop ||
+ io_state == io_state_t::switched);
+ ceph_assert_always(!out_dispatching);
+ ceph_assert_always(!out_exit_dispatching);
+ ceph_assert_always(!in_exit_dispatching);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ static shard_states_ref_t create(
+ seastar::shard_id sid, io_state_t state) {
+ return std::make_unique<shard_states_t>(sid, state);
+ }
+
+ static shard_states_ref_t create_from_previous(
+ shard_states_t &prv_states, seastar::shard_id new_sid);
+
+ private:
+ const seastar::shard_id sid;
+ io_state_t io_state;
+
+ crimson::common::Gated gate;
+ seastar::promise<> pr_io_state_changed;
+ bool out_dispatching = false;
+ std::optional<seastar::promise<>> out_exit_dispatching;
+ std::optional<seastar::promise<>> in_exit_dispatching;
+ };
+
+ void do_set_io_state(
+ io_state_t new_state,
+ std::optional<crosscore_t::seq_t> cc_seq = std::nullopt,
+ FrameAssemblerV2Ref fa = nullptr,
+ bool set_notify_out = false);
+
+ io_state_t get_io_state() const {
+ return shard_states->get_io_state();
+ }
+
+ void do_requeue_out_sent();
+
+ void do_requeue_out_sent_up_to(seq_num_t seq);
+
+ void assign_frame_assembler(FrameAssemblerV2Ref);
+
+ seastar::future<> send_redirected(MessageFRef msg);
+
+ seastar::future<> do_send(MessageFRef msg);
+
+ seastar::future<> send_keepalive_redirected();
+
+ seastar::future<> do_send_keepalive();
+
+ seastar::future<> to_new_sid(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef,
+ std::optional<bool> is_replace);
+
+ void dispatch_reset(bool is_replace);
+
+ void dispatch_remote_reset();
+
+ bool is_out_queued() const {
+ return (!out_pending_msgs.empty() ||
+ ack_left > 0 ||
+ need_keepalive ||
+ next_keepalive_ack.has_value());
+ }
+
+ bool has_out_sent() const {
+ return !out_sent_msgs.empty();
+ }
+
+ void reset_in();
+
+ void reset_out();
+
+ void discard_out_sent();
+
+ seastar::future<> do_out_dispatch(shard_states_t &ctx);
+
+#ifdef UNIT_TESTS_BUILT
+ struct sweep_ret {
+ ceph::bufferlist bl;
+ std::vector<ceph::msgr::v2::Tag> tags;
+ };
+ sweep_ret
+#else
+ ceph::bufferlist
+#endif
+ sweep_out_pending_msgs_to_sent(
+ bool require_keepalive,
+ std::optional<utime_t> maybe_keepalive_ack,
+ bool require_ack);
+
+ void maybe_notify_out_dispatch();
+
+ void notify_out_dispatch();
+
+ void ack_out_sent(seq_num_t seq);
+
+ seastar::future<> read_message(
+ shard_states_t &ctx,
+ utime_t throttle_stamp,
+ std::size_t msg_size);
+
+ void do_in_dispatch();
+
+ seastar::future<> cleanup_prv_shard(seastar::shard_id prv_sid);
+
+private:
+ shard_states_ref_t shard_states;
+
+ crosscore_t crosscore;
+
+ // drop was happening in the previous sid
+ std::optional<seastar::shard_id> maybe_dropped_sid;
+
+ // the remaining states in the previous sid for cleanup, see to_new_sid()
+ shard_states_ref_t maybe_prv_shard_states;
+
+ ChainedDispatchers &dispatchers;
+
+ SocketConnection &conn;
+
+ // core local reference for dispatching, valid until reset/close
+ ConnectionRef conn_ref;
+
+ HandshakeListener *handshake_listener = nullptr;
+
+ FrameAssemblerV2Ref frame_assembler;
+
+ bool protocol_is_connected = false;
+
+ bool need_dispatch_reset = true;
+
+ /*
+ * out states for writing
+ */
+
+ /// the seq num of the last transmitted message
+ seq_num_t out_seq = 0;
+
+ // messages to be resent after connection gets reset
+ std::deque<MessageFRef> out_pending_msgs;
+
+ // messages sent, but not yet acked by peer
+ std::deque<MessageFRef> out_sent_msgs;
+
+ bool need_keepalive = false;
+
+ std::optional<utime_t> next_keepalive_ack = std::nullopt;
+
+ uint64_t ack_left = 0;
+
+ bool need_notify_out = false;
+
+ /*
+ * in states for reading
+ */
+
+ /// the seq num of the last received message
+ seq_num_t in_seq = 0;
+
+ clock_t::time_point last_keepalive;
+
+ clock_t::time_point last_keepalive_ack;
+};
+
+inline std::ostream& operator<<(
+ std::ostream& out, IOHandler::io_stat_printer stat) {
+ stat.io_handler.print_io_stat(out);
+ return out;
+}
+
+} // namespace crimson::net
+
+template <>
+struct fmt::formatter<crimson::net::io_handler_state> {
+ constexpr auto parse(format_parse_context& ctx) {
+ return ctx.begin();
+ }
+
+ template <typename FormatContext>
+ auto format(crimson::net::io_handler_state state, FormatContext& ctx) {
+ return fmt::format_to(
+ ctx.out(),
+ "io(in_seq={}, is_out_queued={}, has_out_sent={})",
+ state.in_seq,
+ state.is_out_queued,
+ state.has_out_sent);
+ }
+};
+
+template <>
+struct fmt::formatter<crimson::net::IOHandler::io_state_t>
+ : fmt::formatter<std::string_view> {
+ template <typename FormatContext>
+ auto format(crimson::net::IOHandler::io_state_t state, FormatContext& ctx) {
+ using enum crimson::net::IOHandler::io_state_t;
+ std::string_view name;
+ switch (state) {
+ case none:
+ name = "none";
+ break;
+ case delay:
+ name = "delay";
+ break;
+ case open:
+ name = "open";
+ break;
+ case drop:
+ name = "drop";
+ break;
+ case switched:
+ name = "switched";
+ break;
+ }
+ return formatter<string_view>::format(name, ctx);
+ }
+};
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::net::IOHandler::io_stat_printer> : fmt::ostream_formatter {};
+#endif