summaryrefslogtreecommitdiffstats
path: root/src/crimson/net
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/net')
-rw-r--r--src/crimson/net/Connection.h147
-rw-r--r--src/crimson/net/Dispatcher.h62
-rw-r--r--src/crimson/net/Errors.cc51
-rw-r--r--src/crimson/net/Errors.h53
-rw-r--r--src/crimson/net/FrameAssemblerV2.cc461
-rw-r--r--src/crimson/net/FrameAssemblerV2.h257
-rw-r--r--src/crimson/net/Fwd.h52
-rw-r--r--src/crimson/net/Interceptor.h186
-rw-r--r--src/crimson/net/Messenger.cc19
-rw-r--r--src/crimson/net/Messenger.h130
-rw-r--r--src/crimson/net/ProtocolV2.cc2348
-rw-r--r--src/crimson/net/ProtocolV2.h328
-rw-r--r--src/crimson/net/Socket.cc523
-rw-r--r--src/crimson/net/Socket.h201
-rw-r--r--src/crimson/net/SocketConnection.cc220
-rw-r--r--src/crimson/net/SocketConnection.h236
-rw-r--r--src/crimson/net/SocketMessenger.cc485
-rw-r--r--src/crimson/net/SocketMessenger.h192
-rw-r--r--src/crimson/net/chained_dispatchers.cc114
-rw-r--r--src/crimson/net/chained_dispatchers.h39
-rw-r--r--src/crimson/net/io_handler.cc1287
-rw-r--r--src/crimson/net/io_handler.h623
22 files changed, 8014 insertions, 0 deletions
diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h
new file mode 100644
index 000000000..7141e20f4
--- /dev/null
+++ b/src/crimson/net/Connection.h
@@ -0,0 +1,147 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <queue>
+#include <seastar/core/future.hh>
+#include <seastar/core/shared_ptr.hh>
+
+#include "Fwd.h"
+
+namespace crimson::net {
+
+using seq_num_t = uint64_t;
+
+/**
+ * Connection
+ *
+ * Abstraction for messenger connections.
+ *
+ * Except when otherwise specified, methods must be invoked from the core on which
+ * the connection originates.
+ */
+class Connection : public seastar::enable_shared_from_this<Connection> {
+ public:
+ using clock_t = seastar::lowres_system_clock;
+
+ Connection() {}
+
+ virtual ~Connection() {}
+
+ /**
+ * get_shard_id
+ *
+ * The shard id where the Connection is dispatching events and handling I/O.
+ *
+ * May be changed with the accept/connect events.
+ */
+ virtual const seastar::shard_id get_shard_id() const = 0;
+
+ virtual const entity_name_t &get_peer_name() const = 0;
+
+ entity_type_t get_peer_type() const { return get_peer_name().type(); }
+ int64_t get_peer_id() const { return get_peer_name().num(); }
+ bool peer_is_mon() const { return get_peer_name().is_mon(); }
+ bool peer_is_mgr() const { return get_peer_name().is_mgr(); }
+ bool peer_is_mds() const { return get_peer_name().is_mds(); }
+ bool peer_is_osd() const { return get_peer_name().is_osd(); }
+ bool peer_is_client() const { return get_peer_name().is_client(); }
+
+ virtual const entity_addr_t &get_peer_addr() const = 0;
+
+ const entity_addrvec_t get_peer_addrs() const {
+ return entity_addrvec_t(get_peer_addr());
+ }
+
+ virtual const entity_addr_t &get_peer_socket_addr() const = 0;
+
+ virtual uint64_t get_features() const = 0;
+
+ bool has_feature(uint64_t f) const {
+ return get_features() & f;
+ }
+
+ /// true if the handshake has completed and no errors have been encountered
+ virtual bool is_connected() const = 0;
+
+ /**
+ * send
+ *
+ * Send a message over a connection that has completed its handshake.
+ *
+ * May be invoked from any core, but that requires to chain the returned
+ * future to preserve ordering.
+ */
+ virtual seastar::future<> send(MessageURef msg) = 0;
+
+ /**
+ * send_keepalive
+ *
+ * Send a keepalive message over a connection that has completed its
+ * handshake.
+ *
+ * May be invoked from any core, but that requires to chain the returned
+ * future to preserve ordering.
+ */
+ virtual seastar::future<> send_keepalive() = 0;
+
+ virtual clock_t::time_point get_last_keepalive() const = 0;
+
+ virtual clock_t::time_point get_last_keepalive_ack() const = 0;
+
+ // workaround for the monitor client
+ virtual void set_last_keepalive_ack(clock_t::time_point when) = 0;
+
+ // close the connection and cancel any any pending futures from read/send,
+ // without dispatching any reset event
+ virtual void mark_down() = 0;
+
+ struct user_private_t {
+ virtual ~user_private_t() = default;
+ };
+
+ virtual bool has_user_private() const = 0;
+
+ virtual user_private_t &get_user_private() = 0;
+
+ virtual void set_user_private(std::unique_ptr<user_private_t>) = 0;
+
+ virtual void print(std::ostream& out) const = 0;
+
+#ifdef UNIT_TESTS_BUILT
+ virtual bool is_protocol_ready() const = 0;
+
+ virtual bool is_protocol_standby() const = 0;
+
+ virtual bool is_protocol_closed() const = 0;
+
+ virtual bool is_protocol_closed_clean() const = 0;
+
+ virtual bool peer_wins() const = 0;
+#endif
+};
+
+inline std::ostream& operator<<(std::ostream& out, const Connection& conn) {
+ out << "[";
+ conn.print(out);
+ out << "]";
+ return out;
+}
+
+} // namespace crimson::net
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::net::Connection> : fmt::ostream_formatter {};
+#endif
diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h
new file mode 100644
index 000000000..9eea0a858
--- /dev/null
+++ b/src/crimson/net/Dispatcher.h
@@ -0,0 +1,62 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "Fwd.h"
+
+class AuthAuthorizer;
+
+namespace crimson::net {
+
+class Dispatcher {
+ public:
+ virtual ~Dispatcher() {}
+
+ // Dispatchers are put into a chain as described by chain-of-responsibility
+ // pattern. If any of the dispatchers claims this message, it returns a valid
+ // future to prevent other dispatchers from processing it, and this is also
+ // used to throttle the connection if it's too busy.
+ virtual std::optional<seastar::future<>> ms_dispatch(ConnectionRef, MessageRef) = 0;
+
+ // The connection is moving to the new_shard under accept/connect.
+ // User should not operate conn in this shard thereafter.
+ virtual void ms_handle_shard_change(
+ ConnectionRef conn,
+ seastar::shard_id new_shard,
+ bool is_accept_or_connect) {}
+
+ // The connection is accepted or recoverred(lossless), all the followup
+ // events and messages will be dispatched to this shard.
+ //
+ // is_replace=true means the accepted connection has replaced
+ // another connecting connection with the same peer_addr, which currently only
+ // happens under lossy policy when both sides wish to connect to each other.
+ virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id prv_shard, bool is_replace) {}
+
+ // The connection is (re)connected, all the followup events and messages will
+ // be dispatched to this shard.
+ virtual void ms_handle_connect(ConnectionRef conn, seastar::shard_id prv_shard) {}
+
+ // a reset event is dispatched when the connection is closed unexpectedly.
+ //
+ // is_replace=true means the reset connection is going to be replaced by
+ // another accepting connection with the same peer_addr, which currently only
+ // happens under lossy policy when both sides wish to connect to each other.
+ virtual void ms_handle_reset(ConnectionRef conn, bool is_replace) {}
+
+ virtual void ms_handle_remote_reset(ConnectionRef conn) {}
+};
+
+} // namespace crimson::net
diff --git a/src/crimson/net/Errors.cc b/src/crimson/net/Errors.cc
new file mode 100644
index 000000000..d07c090db
--- /dev/null
+++ b/src/crimson/net/Errors.cc
@@ -0,0 +1,51 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "Errors.h"
+
+namespace crimson::net {
+
+const std::error_category& net_category()
+{
+ struct category : public std::error_category {
+ const char* name() const noexcept override {
+ return "crimson::net";
+ }
+
+ std::string message(int ev) const override {
+ switch (static_cast<error>(ev)) {
+ case error::success:
+ return "success";
+ case error::bad_connect_banner:
+ return "bad connect banner";
+ case error::bad_peer_address:
+ return "bad peer address";
+ case error::negotiation_failure:
+ return "negotiation failure";
+ case error::read_eof:
+ return "read eof";
+ case error::corrupted_message:
+ return "corrupted message";
+ case error::protocol_aborted:
+ return "protocol aborted";
+ default:
+ return "unknown";
+ }
+ }
+ };
+ static category instance;
+ return instance;
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/Errors.h b/src/crimson/net/Errors.h
new file mode 100644
index 000000000..3a17a103a
--- /dev/null
+++ b/src/crimson/net/Errors.h
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <system_error>
+
+namespace crimson::net {
+
+/// net error codes
+enum class error {
+ success = 0,
+ bad_connect_banner,
+ bad_peer_address,
+ negotiation_failure,
+ read_eof,
+ corrupted_message,
+ protocol_aborted,
+};
+
+/// net error category
+const std::error_category& net_category();
+
+inline std::error_code make_error_code(error e)
+{
+ return {static_cast<int>(e), net_category()};
+}
+
+inline std::error_condition make_error_condition(error e)
+{
+ return {static_cast<int>(e), net_category()};
+}
+
+} // namespace crimson::net
+
+namespace std {
+
+/// enables implicit conversion to std::error_condition
+template <>
+struct is_error_condition_enum<crimson::net::error> : public true_type {};
+
+} // namespace std
diff --git a/src/crimson/net/FrameAssemblerV2.cc b/src/crimson/net/FrameAssemblerV2.cc
new file mode 100644
index 000000000..273a6350d
--- /dev/null
+++ b/src/crimson/net/FrameAssemblerV2.cc
@@ -0,0 +1,461 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "FrameAssemblerV2.h"
+
+#include "Errors.h"
+#include "SocketConnection.h"
+
+using ceph::msgr::v2::FrameAssembler;
+using ceph::msgr::v2::FrameError;
+using ceph::msgr::v2::preamble_block_t;
+using ceph::msgr::v2::segment_t;
+using ceph::msgr::v2::Tag;
+
+namespace {
+
+seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+}
+
+} // namespace anonymous
+
+namespace crimson::net {
+
+FrameAssemblerV2::FrameAssemblerV2(SocketConnection &_conn)
+ : conn{_conn}, sid{seastar::this_shard_id()}
+{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+}
+
+FrameAssemblerV2::~FrameAssemblerV2()
+{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ assert(seastar::this_shard_id() == sid);
+ if (has_socket()) {
+ std::ignore = move_socket();
+ }
+}
+
+#ifdef UNIT_TESTS_BUILT
+// should be consistent to intercept() in ProtocolV2.cc
+seastar::future<> FrameAssemblerV2::intercept_frames(
+ std::vector<Breakpoint> bps,
+ bp_type_t type)
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(has_socket());
+ if (!conn.interceptor) {
+ return seastar::now();
+ }
+ return conn.interceptor->intercept(conn, bps
+ ).then([this, type](bp_action_t action) {
+ return seastar::smp::submit_to(
+ socket->get_shard_id(),
+ [this, type, action] {
+ socket->set_trap(type, action, &conn.interceptor->blocker);
+ });
+ });
+}
+#endif
+
+void FrameAssemblerV2::set_is_rev1(bool _is_rev1)
+{
+ assert(seastar::this_shard_id() == sid);
+ is_rev1 = _is_rev1;
+ tx_frame_asm.set_is_rev1(_is_rev1);
+ rx_frame_asm.set_is_rev1(_is_rev1);
+}
+
+void FrameAssemblerV2::create_session_stream_handlers(
+ const AuthConnectionMeta &auth_meta,
+ bool crossed)
+{
+ assert(seastar::this_shard_id() == sid);
+ session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+ nullptr, auth_meta, is_rev1, crossed);
+}
+
+void FrameAssemblerV2::reset_handlers()
+{
+ assert(seastar::this_shard_id() == sid);
+ session_stream_handlers = { nullptr, nullptr };
+ session_comp_handlers = { nullptr, nullptr };
+}
+
+FrameAssemblerV2::mover_t
+FrameAssemblerV2::to_replace()
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(is_socket_valid());
+
+ clear();
+
+ return mover_t{
+ move_socket(),
+ std::move(session_stream_handlers),
+ std::move(session_comp_handlers)};
+}
+
+seastar::future<> FrameAssemblerV2::replace_by(FrameAssemblerV2::mover_t &&mover)
+{
+ assert(seastar::this_shard_id() == sid);
+
+ clear();
+
+ session_stream_handlers = std::move(mover.session_stream_handlers);
+ session_comp_handlers = std::move(mover.session_comp_handlers);
+ if (has_socket()) {
+ return replace_shutdown_socket(std::move(mover.socket));
+ } else {
+ set_socket(std::move(mover.socket));
+ return seastar::now();
+ }
+}
+
+void FrameAssemblerV2::start_recording()
+{
+ assert(seastar::this_shard_id() == sid);
+ record_io = true;
+ rxbuf.clear();
+ txbuf.clear();
+}
+
+FrameAssemblerV2::record_bufs_t
+FrameAssemblerV2::stop_recording()
+{
+ assert(seastar::this_shard_id() == sid);
+ ceph_assert_always(record_io == true);
+ record_io = false;
+ return record_bufs_t{std::move(rxbuf), std::move(txbuf)};
+}
+
+bool FrameAssemblerV2::has_socket() const
+{
+ assert((socket && conn.socket) || (!socket && !conn.socket));
+ return bool(socket);
+}
+
+bool FrameAssemblerV2::is_socket_valid() const
+{
+ assert(seastar::this_shard_id() == sid);
+#ifndef NDEBUG
+ if (has_socket() && socket->get_shard_id() == sid) {
+ assert(socket->is_shutdown() == is_socket_shutdown);
+ }
+#endif
+ return has_socket() && !is_socket_shutdown;
+}
+
+seastar::shard_id
+FrameAssemblerV2::get_socket_shard_id() const
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(is_socket_valid());
+ return socket->get_shard_id();
+}
+
+SocketFRef FrameAssemblerV2::move_socket()
+{
+ assert(has_socket());
+ conn.set_socket(nullptr);
+ return std::move(socket);
+}
+
+void FrameAssemblerV2::set_socket(SocketFRef &&new_socket)
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(!has_socket());
+ assert(new_socket);
+ socket = std::move(new_socket);
+ conn.set_socket(socket.get());
+ is_socket_shutdown = false;
+ assert(is_socket_valid());
+}
+
+void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port)
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(has_socket());
+ // Note: may not invoke on the socket core
+ socket->learn_ephemeral_port_as_connector(port);
+}
+
+template <bool may_cross_core>
+void FrameAssemblerV2::shutdown_socket(crimson::common::Gated *gate)
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(is_socket_valid());
+ is_socket_shutdown = true;
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ assert(gate);
+ gate->dispatch_in_background("shutdown_socket", conn, [this] {
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this] {
+ socket->shutdown();
+ });
+ });
+ } else {
+ assert(socket->get_shard_id() == sid);
+ assert(!gate);
+ socket->shutdown();
+ }
+}
+template void FrameAssemblerV2::shutdown_socket<true>(crimson::common::Gated *);
+template void FrameAssemblerV2::shutdown_socket<false>(crimson::common::Gated *);
+
+seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketFRef &&new_socket)
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(has_socket());
+ assert(!is_socket_valid());
+ auto old_socket = move_socket();
+ auto old_socket_shard_id = old_socket->get_shard_id();
+ set_socket(std::move(new_socket));
+ return seastar::smp::submit_to(
+ old_socket_shard_id,
+ [old_socket = std::move(old_socket)]() mutable {
+ return old_socket->close(
+ ).then([sock = std::move(old_socket)] {});
+ });
+}
+
+seastar::future<> FrameAssemblerV2::close_shutdown_socket()
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(has_socket());
+ assert(!is_socket_valid());
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this] {
+ return socket->close();
+ });
+}
+
+template <bool may_cross_core>
+seastar::future<ceph::bufferptr>
+FrameAssemblerV2::read_exactly(std::size_t bytes)
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(has_socket());
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this, bytes] {
+ return socket->read_exactly(bytes);
+ }).then([this](auto bptr) {
+ if (record_io) {
+ rxbuf.append(bptr);
+ }
+ return bptr;
+ });
+ } else {
+ assert(socket->get_shard_id() == sid);
+ return socket->read_exactly(bytes);
+ }
+}
+template seastar::future<ceph::bufferptr> FrameAssemblerV2::read_exactly<true>(std::size_t);
+template seastar::future<ceph::bufferptr> FrameAssemblerV2::read_exactly<false>(std::size_t);
+
+template <bool may_cross_core>
+seastar::future<ceph::bufferlist>
+FrameAssemblerV2::read(std::size_t bytes)
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(has_socket());
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this, bytes] {
+ return socket->read(bytes);
+ }).then([this](auto buf) {
+ if (record_io) {
+ rxbuf.append(buf);
+ }
+ return buf;
+ });
+ } else {
+ assert(socket->get_shard_id() == sid);
+ return socket->read(bytes);
+ }
+}
+template seastar::future<ceph::bufferlist> FrameAssemblerV2::read<true>(std::size_t);
+template seastar::future<ceph::bufferlist> FrameAssemblerV2::read<false>(std::size_t);
+
+template <bool may_cross_core>
+seastar::future<>
+FrameAssemblerV2::write(ceph::bufferlist buf)
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(has_socket());
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ if (record_io) {
+ txbuf.append(buf);
+ }
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this, buf = std::move(buf)]() mutable {
+ return socket->write(std::move(buf));
+ });
+ } else {
+ assert(socket->get_shard_id() == sid);
+ return socket->write(std::move(buf));
+ }
+}
+template seastar::future<> FrameAssemblerV2::write<true>(ceph::bufferlist);
+template seastar::future<> FrameAssemblerV2::write<false>(ceph::bufferlist);
+
+template <bool may_cross_core>
+seastar::future<>
+FrameAssemblerV2::flush()
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(has_socket());
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this] {
+ return socket->flush();
+ });
+ } else {
+ assert(socket->get_shard_id() == sid);
+ return socket->flush();
+ }
+}
+template seastar::future<> FrameAssemblerV2::flush<true>();
+template seastar::future<> FrameAssemblerV2::flush<false>();
+
+template <bool may_cross_core>
+seastar::future<>
+FrameAssemblerV2::write_flush(ceph::bufferlist buf)
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(has_socket());
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ if (unlikely(record_io)) {
+ txbuf.append(buf);
+ }
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this, buf = std::move(buf)]() mutable {
+ return socket->write_flush(std::move(buf));
+ });
+ } else {
+ assert(socket->get_shard_id() == sid);
+ return socket->write_flush(std::move(buf));
+ }
+}
+template seastar::future<> FrameAssemblerV2::write_flush<true>(ceph::bufferlist);
+template seastar::future<> FrameAssemblerV2::write_flush<false>(ceph::bufferlist);
+
+template <bool may_cross_core>
+seastar::future<FrameAssemblerV2::read_main_t>
+FrameAssemblerV2::read_main_preamble()
+{
+ assert(seastar::this_shard_id() == sid);
+ rx_preamble.clear();
+ return read_exactly<may_cross_core>(
+ rx_frame_asm.get_preamble_onwire_len()
+ ).then([this](auto bptr) {
+ rx_preamble.append(std::move(bptr));
+ Tag tag;
+ try {
+ tag = rx_frame_asm.disassemble_preamble(rx_preamble);
+ } catch (FrameError& e) {
+ logger().warn("{} read_main_preamble: {}", conn, e.what());
+ throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+ }
+#ifdef UNIT_TESTS_BUILT
+ return intercept_frame(tag, false
+ ).then([this, tag] {
+ return read_main_t{tag, &rx_frame_asm};
+ });
+#else
+ return read_main_t{tag, &rx_frame_asm};
+#endif
+ });
+}
+template seastar::future<FrameAssemblerV2::read_main_t> FrameAssemblerV2::read_main_preamble<true>();
+template seastar::future<FrameAssemblerV2::read_main_t> FrameAssemblerV2::read_main_preamble<false>();
+
+template <bool may_cross_core>
+seastar::future<FrameAssemblerV2::read_payload_t*>
+FrameAssemblerV2::read_frame_payload()
+{
+ assert(seastar::this_shard_id() == sid);
+ rx_segments_data.clear();
+ return seastar::do_until(
+ [this] {
+ return rx_frame_asm.get_num_segments() == rx_segments_data.size();
+ },
+ [this] {
+ // TODO: create aligned and contiguous buffer from socket
+ const size_t seg_idx = rx_segments_data.size();
+ if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx);
+ alignment != segment_t::DEFAULT_ALIGNMENT) {
+ logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
+ conn, alignment, rx_segments_data.size());
+ }
+ uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
+ // TODO: create aligned and contiguous buffer from socket
+ return read_exactly<may_cross_core>(onwire_len
+ ).then([this](auto bptr) {
+ logger().trace("{} RECV({}) frame segment[{}]",
+ conn, bptr.length(), rx_segments_data.size());
+ bufferlist segment;
+ segment.append(std::move(bptr));
+ rx_segments_data.emplace_back(std::move(segment));
+ });
+ }
+ ).then([this] {
+ return read_exactly<may_cross_core>(rx_frame_asm.get_epilogue_onwire_len());
+ }).then([this](auto bptr) {
+ logger().trace("{} RECV({}) frame epilogue", conn, bptr.length());
+ bool ok = false;
+ try {
+ bufferlist rx_epilogue;
+ rx_epilogue.append(std::move(bptr));
+ ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
+ } catch (FrameError& e) {
+ logger().error("read_frame_payload: {} {}", conn, e.what());
+ throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+ } catch (ceph::crypto::onwire::MsgAuthError&) {
+ logger().error("read_frame_payload: {} bad auth tag", conn);
+ throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+ }
+ // we do have a mechanism that allows transmitter to start sending message
+ // and abort after putting entire data field on wire. This will be used by
+ // the kernel client to avoid unnecessary buffering.
+ if (!ok) {
+ ceph_abort("TODO");
+ }
+ return &rx_segments_data;
+ });
+}
+template seastar::future<FrameAssemblerV2::read_payload_t*> FrameAssemblerV2::read_frame_payload<true>();
+template seastar::future<FrameAssemblerV2::read_payload_t*> FrameAssemblerV2::read_frame_payload<false>();
+
+void FrameAssemblerV2::log_main_preamble(const ceph::bufferlist &bl)
+{
+ const auto main_preamble =
+ reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
+ logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
+ conn, bl.length(), (int)main_preamble->tag,
+ (int)main_preamble->num_segments, main_preamble->crc);
+}
+
+FrameAssemblerV2Ref FrameAssemblerV2::create(SocketConnection &conn)
+{
+ return std::make_unique<FrameAssemblerV2>(conn);
+}
+
+void FrameAssemblerV2::clear()
+{
+ record_io = false;
+ rxbuf.clear();
+ txbuf.clear();
+ rx_preamble.clear();
+ rx_segments_data.clear();
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/FrameAssemblerV2.h b/src/crimson/net/FrameAssemblerV2.h
new file mode 100644
index 000000000..9c89c144e
--- /dev/null
+++ b/src/crimson/net/FrameAssemblerV2.h
@@ -0,0 +1,257 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "msg/async/frames_v2.h"
+#include "msg/async/crypto_onwire.h"
+#include "msg/async/compression_onwire.h"
+
+#include "crimson/common/gated.h"
+#include "crimson/net/Socket.h"
+
+#ifdef UNIT_TESTS_BUILT
+#include "Interceptor.h"
+#endif
+
+namespace crimson::net {
+
+class SocketConnection;
+class FrameAssemblerV2;
+using FrameAssemblerV2Ref = std::unique_ptr<FrameAssemblerV2>;
+
+class FrameAssemblerV2 {
+public:
+ FrameAssemblerV2(SocketConnection &conn);
+
+ ~FrameAssemblerV2();
+
+ FrameAssemblerV2(const FrameAssemblerV2 &) = delete;
+
+ FrameAssemblerV2(FrameAssemblerV2 &&) = delete;
+
+ void set_shard_id(seastar::shard_id _sid) {
+ assert(seastar::this_shard_id() == sid);
+ clear();
+ sid = _sid;
+ }
+
+ seastar::shard_id get_shard_id() const {
+ return sid;
+ }
+
+ void set_is_rev1(bool is_rev1);
+
+ void create_session_stream_handlers(
+ const AuthConnectionMeta &auth_meta,
+ bool crossed);
+
+ void reset_handlers();
+
+ /*
+ * replacing
+ */
+
+ struct mover_t {
+ SocketFRef socket;
+ ceph::crypto::onwire::rxtx_t session_stream_handlers;
+ ceph::compression::onwire::rxtx_t session_comp_handlers;
+ };
+
+ mover_t to_replace();
+
+ seastar::future<> replace_by(mover_t &&);
+
+ /*
+ * auth signature interfaces
+ */
+
+ void start_recording();
+
+ struct record_bufs_t {
+ ceph::bufferlist rxbuf;
+ ceph::bufferlist txbuf;
+ };
+ record_bufs_t stop_recording();
+
+ /*
+ * socket maintainence interfaces
+ */
+
+ // the socket exists and not shutdown
+ bool is_socket_valid() const;
+
+ seastar::shard_id get_socket_shard_id() const;
+
+ void set_socket(SocketFRef &&);
+
+ void learn_socket_ephemeral_port_as_connector(uint16_t port);
+
+ // if may_cross_core == true, gate is required for cross-core shutdown
+ template <bool may_cross_core>
+ void shutdown_socket(crimson::common::Gated *gate);
+
+ seastar::future<> replace_shutdown_socket(SocketFRef &&);
+
+ seastar::future<> close_shutdown_socket();
+
+ /*
+ * socket read and write interfaces
+ */
+
+ template <bool may_cross_core = true>
+ seastar::future<ceph::bufferptr> read_exactly(std::size_t bytes);
+
+ template <bool may_cross_core = true>
+ seastar::future<ceph::bufferlist> read(std::size_t bytes);
+
+ template <bool may_cross_core = true>
+ seastar::future<> write(ceph::bufferlist);
+
+ template <bool may_cross_core = true>
+ seastar::future<> flush();
+
+ template <bool may_cross_core = true>
+ seastar::future<> write_flush(ceph::bufferlist);
+
+ /*
+ * frame read and write interfaces
+ */
+
+ /// may throw negotiation_failure as fault
+ struct read_main_t {
+ ceph::msgr::v2::Tag tag;
+ const ceph::msgr::v2::FrameAssembler *rx_frame_asm;
+ };
+ template <bool may_cross_core = true>
+ seastar::future<read_main_t> read_main_preamble();
+
+ /// may throw negotiation_failure as fault
+ using read_payload_t = ceph::msgr::v2::segment_bls_t;
+ // FIXME: read_payload_t cannot be no-throw move constructible
+ template <bool may_cross_core = true>
+ seastar::future<read_payload_t*> read_frame_payload();
+
+ template <class F>
+ ceph::bufferlist get_buffer(F &tx_frame) {
+ assert(seastar::this_shard_id() == sid);
+ auto bl = tx_frame.get_buffer(tx_frame_asm);
+ log_main_preamble(bl);
+ return bl;
+ }
+
+ template <class F, bool may_cross_core = true>
+ seastar::future<> write_flush_frame(F &tx_frame) {
+ assert(seastar::this_shard_id() == sid);
+ auto bl = get_buffer(tx_frame);
+#ifdef UNIT_TESTS_BUILT
+ return intercept_frame(F::tag, true
+ ).then([this, bl=std::move(bl)]() mutable {
+ return write_flush<may_cross_core>(std::move(bl));
+ });
+#else
+ return write_flush<may_cross_core>(std::move(bl));
+#endif
+ }
+
+ static FrameAssemblerV2Ref create(SocketConnection &conn);
+
+#ifdef UNIT_TESTS_BUILT
+ seastar::future<> intercept_frames(
+ std::vector<ceph::msgr::v2::Tag> tags,
+ bool is_write) {
+ auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ;
+ std::vector<Breakpoint> bps;
+ for (auto &tag : tags) {
+ bps.emplace_back(Breakpoint{tag, type});
+ }
+ return intercept_frames(bps, type);
+ }
+
+ seastar::future<> intercept_frame(
+ ceph::msgr::v2::Tag tag,
+ bool is_write) {
+ auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ;
+ std::vector<Breakpoint> bps;
+ bps.emplace_back(Breakpoint{tag, type});
+ return intercept_frames(bps, type);
+ }
+
+ seastar::future<> intercept_frame(
+ custom_bp_t bp,
+ bool is_write) {
+ auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ;
+ std::vector<Breakpoint> bps;
+ bps.emplace_back(Breakpoint{bp});
+ return intercept_frames(bps, type);
+ }
+#endif
+
+private:
+#ifdef UNIT_TESTS_BUILT
+ seastar::future<> intercept_frames(
+ std::vector<Breakpoint> bps,
+ bp_type_t type);
+#endif
+
+ bool has_socket() const;
+
+ SocketFRef move_socket();
+
+ void clear();
+
+ void log_main_preamble(const ceph::bufferlist &bl);
+
+ SocketConnection &conn;
+
+ SocketFRef socket;
+
+ // checking Socket::is_shutdown() synchronously is impossible when sid is
+ // different from the socket sid.
+ bool is_socket_shutdown = false;
+
+ // the current working shard, can be messenger or socket shard.
+ // if is messenger shard, should call interfaces with may_cross_core = true.
+ seastar::shard_id sid;
+
+ /*
+ * auth signature
+ *
+ * only in the messenger core
+ */
+
+ bool record_io = false;
+
+ ceph::bufferlist rxbuf;
+
+ ceph::bufferlist txbuf;
+
+ /*
+ * frame data and handlers
+ */
+
+ ceph::crypto::onwire::rxtx_t session_stream_handlers = { nullptr, nullptr };
+
+ // TODO
+ ceph::compression::onwire::rxtx_t session_comp_handlers = { nullptr, nullptr };
+
+ bool is_rev1 = false;
+
+ ceph::msgr::v2::FrameAssembler tx_frame_asm{
+ &session_stream_handlers, is_rev1, common::local_conf()->ms_crc_data,
+ &session_comp_handlers};
+
+ ceph::msgr::v2::FrameAssembler rx_frame_asm{
+ &session_stream_handlers, is_rev1, common::local_conf()->ms_crc_data,
+ &session_comp_handlers};
+
+ // in the messenger core during handshake,
+ // and in the socket core during open,
+ // must be cleaned before switching cores.
+
+ ceph::bufferlist rx_preamble;
+
+ read_payload_t rx_segments_data;
+};
+
+} // namespace crimson::net
diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h
new file mode 100644
index 000000000..2b1595141
--- /dev/null
+++ b/src/crimson/net/Fwd.h
@@ -0,0 +1,52 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <boost/container/small_vector.hpp>
+#include <seastar/core/future.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/sharded.hh>
+
+#include "msg/Connection.h"
+#include "msg/MessageRef.h"
+#include "msg/msg_types.h"
+
+#include "crimson/common/errorator.h"
+#include "crimson/common/local_shared_foreign_ptr.h"
+
+class AuthConnectionMeta;
+
+namespace crimson::net {
+
+using msgr_tag_t = uint8_t;
+using stop_t = seastar::stop_iteration;
+
+class Connection;
+using ConnectionLRef = seastar::shared_ptr<Connection>;
+using ConnectionFRef = seastar::foreign_ptr<ConnectionLRef>;
+using ConnectionRef = ::crimson::local_shared_foreign_ptr<ConnectionLRef>;
+
+class Dispatcher;
+class ChainedDispatchers;
+constexpr std::size_t NUM_DISPATCHERS = 4u;
+using dispatchers_t = boost::container::small_vector<Dispatcher*, NUM_DISPATCHERS>;
+
+class Messenger;
+using MessengerRef = seastar::shared_ptr<Messenger>;
+
+using MessageFRef = seastar::foreign_ptr<MessageURef>;
+
+} // namespace crimson::net
diff --git a/src/crimson/net/Interceptor.h b/src/crimson/net/Interceptor.h
new file mode 100644
index 000000000..35b74e243
--- /dev/null
+++ b/src/crimson/net/Interceptor.h
@@ -0,0 +1,186 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <variant>
+#include <seastar/core/sharded.hh>
+#include <seastar/core/sleep.hh>
+
+#include "Fwd.h"
+#include "msg/async/frames_v2.h"
+
+namespace crimson::net {
+
+enum class custom_bp_t : uint8_t {
+ BANNER_WRITE = 0,
+ BANNER_READ,
+ BANNER_PAYLOAD_READ,
+ SOCKET_CONNECTING,
+ SOCKET_ACCEPTED
+};
+inline const char* get_bp_name(custom_bp_t bp) {
+ uint8_t index = static_cast<uint8_t>(bp);
+ static const char *const bp_names[] = {"BANNER_WRITE",
+ "BANNER_READ",
+ "BANNER_PAYLOAD_READ",
+ "SOCKET_CONNECTING",
+ "SOCKET_ACCEPTED"};
+ assert(index < std::size(bp_names));
+ return bp_names[index];
+}
+
+enum class bp_type_t {
+ READ = 0,
+ WRITE
+};
+
+enum class bp_action_t {
+ CONTINUE = 0,
+ FAULT,
+ BLOCK,
+ STALL
+};
+
+class socket_blocker {
+ std::optional<seastar::abort_source> p_blocked;
+ std::optional<seastar::abort_source> p_unblocked;
+ const seastar::shard_id primary_sid;
+
+ public:
+ socket_blocker() : primary_sid{seastar::this_shard_id()} {}
+
+ seastar::future<> wait_blocked() {
+ ceph_assert(seastar::this_shard_id() == primary_sid);
+ ceph_assert(!p_blocked);
+ if (p_unblocked) {
+ return seastar::make_ready_future<>();
+ } else {
+ p_blocked = seastar::abort_source();
+ return seastar::sleep_abortable(
+ std::chrono::seconds(10), *p_blocked
+ ).then([] {
+ throw std::runtime_error(
+ "Timeout (10s) in socket_blocker::wait_blocked()");
+ }).handle_exception_type([] (const seastar::sleep_aborted& e) {
+ // wait done!
+ });
+ }
+ }
+
+ seastar::future<> block() {
+ return seastar::smp::submit_to(primary_sid, [this] {
+ if (p_blocked) {
+ p_blocked->request_abort();
+ p_blocked = std::nullopt;
+ }
+ ceph_assert(!p_unblocked);
+ p_unblocked = seastar::abort_source();
+ return seastar::sleep_abortable(
+ std::chrono::seconds(10), *p_unblocked
+ ).then([] {
+ ceph_abort("Timeout (10s) in socket_blocker::block()");
+ }).handle_exception_type([] (const seastar::sleep_aborted& e) {
+ // wait done!
+ });
+ });
+ }
+
+ void unblock() {
+ ceph_assert(seastar::this_shard_id() == primary_sid);
+ ceph_assert(!p_blocked);
+ ceph_assert(p_unblocked);
+ p_unblocked->request_abort();
+ p_unblocked = std::nullopt;
+ }
+};
+
+struct tag_bp_t {
+ ceph::msgr::v2::Tag tag;
+ bp_type_t type;
+ bool operator==(const tag_bp_t& x) const {
+ return tag == x.tag && type == x.type;
+ }
+ bool operator!=(const tag_bp_t& x) const { return !operator==(x); }
+ bool operator<(const tag_bp_t& x) const {
+ return std::tie(tag, type) < std::tie(x.tag, x.type);
+ }
+};
+
+struct Breakpoint {
+ using var_t = std::variant<custom_bp_t, tag_bp_t>;
+ var_t bp;
+ Breakpoint(custom_bp_t bp) : bp(bp) { }
+ Breakpoint(ceph::msgr::v2::Tag tag, bp_type_t type)
+ : bp(tag_bp_t{tag, type}) { }
+ bool operator==(const Breakpoint& x) const { return bp == x.bp; }
+ bool operator!=(const Breakpoint& x) const { return !operator==(x); }
+ bool operator==(const custom_bp_t& x) const { return bp == var_t(x); }
+ bool operator!=(const custom_bp_t& x) const { return !operator==(x); }
+ bool operator==(const tag_bp_t& x) const { return bp == var_t(x); }
+ bool operator!=(const tag_bp_t& x) const { return !operator==(x); }
+ bool operator<(const Breakpoint& x) const { return bp < x.bp; }
+};
+
+struct Interceptor {
+ socket_blocker blocker;
+ virtual ~Interceptor() {}
+ virtual void register_conn(ConnectionRef) = 0;
+ virtual void register_conn_ready(ConnectionRef) = 0;
+ virtual void register_conn_closed(ConnectionRef) = 0;
+ virtual void register_conn_replaced(ConnectionRef) = 0;
+
+ virtual seastar::future<bp_action_t>
+ intercept(Connection&, std::vector<Breakpoint> bp) = 0;
+};
+
+} // namespace crimson::net
+
+template<>
+struct fmt::formatter<crimson::net::bp_action_t> : fmt::formatter<std::string_view> {
+ template <typename FormatContext>
+ auto format(const crimson::net::bp_action_t& action, FormatContext& ctx) const {
+ static const char *const action_names[] = {"CONTINUE",
+ "FAULT",
+ "BLOCK",
+ "STALL"};
+ assert(static_cast<size_t>(action) < std::size(action_names));
+ return formatter<std::string_view>::format(action_names[static_cast<size_t>(action)], ctx);
+ }
+};
+
+template<>
+struct fmt::formatter<crimson::net::Breakpoint> : fmt::formatter<std::string_view> {
+ template <typename FormatContext>
+ auto format(const crimson::net::Breakpoint& bp, FormatContext& ctx) const {
+ if (auto custom_bp = std::get_if<crimson::net::custom_bp_t>(&bp.bp)) {
+ return formatter<std::string_view>::format(crimson::net::get_bp_name(*custom_bp), ctx);
+ }
+ auto tag_bp = std::get<crimson::net::tag_bp_t>(bp.bp);
+ static const char *const tag_names[] = {"NONE",
+ "HELLO",
+ "AUTH_REQUEST",
+ "AUTH_BAD_METHOD",
+ "AUTH_REPLY_MORE",
+ "AUTH_REQUEST_MORE",
+ "AUTH_DONE",
+ "AUTH_SIGNATURE",
+ "CLIENT_IDENT",
+ "SERVER_IDENT",
+ "IDENT_MISSING_FEATURES",
+ "SESSION_RECONNECT",
+ "SESSION_RESET",
+ "SESSION_RETRY",
+ "SESSION_RETRY_GLOBAL",
+ "SESSION_RECONNECT_OK",
+ "WAIT",
+ "MESSAGE",
+ "KEEPALIVE2",
+ "KEEPALIVE2_ACK",
+ "ACK"};
+ assert(static_cast<size_t>(tag_bp.tag) < std::size(tag_names));
+ return fmt::format_to(ctx.out(), "{}_{}",
+ tag_names[static_cast<size_t>(tag_bp.tag)],
+ tag_bp.type == crimson::net::bp_type_t::WRITE ? "WRITE" : "READ");
+ }
+};
diff --git a/src/crimson/net/Messenger.cc b/src/crimson/net/Messenger.cc
new file mode 100644
index 000000000..1af198589
--- /dev/null
+++ b/src/crimson/net/Messenger.cc
@@ -0,0 +1,19 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Messenger.h"
+#include "SocketMessenger.h"
+
+namespace crimson::net {
+
+MessengerRef
+Messenger::create(const entity_name_t& name,
+ const std::string& lname,
+ uint64_t nonce,
+ bool dispatch_only_on_this_shard)
+{
+ return seastar::make_shared<SocketMessenger>(
+ name, lname, nonce, dispatch_only_on_this_shard);
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h
new file mode 100644
index 000000000..74df062d8
--- /dev/null
+++ b/src/crimson/net/Messenger.h
@@ -0,0 +1,130 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "Fwd.h"
+#include "crimson/common/throttle.h"
+#include "msg/Message.h"
+#include "msg/Policy.h"
+
+class AuthAuthorizer;
+
+namespace crimson::auth {
+class AuthClient;
+class AuthServer;
+}
+
+namespace crimson::net {
+
+#ifdef UNIT_TESTS_BUILT
+class Interceptor;
+#endif
+
+using Throttle = crimson::common::Throttle;
+using SocketPolicy = ceph::net::Policy<Throttle>;
+
+class Messenger {
+public:
+ Messenger() {}
+
+ virtual ~Messenger() {}
+
+ virtual const entity_name_t& get_myname() const = 0;
+
+ entity_type_t get_mytype() const { return get_myname().type(); }
+
+ virtual const entity_addrvec_t &get_myaddrs() const = 0;
+
+ entity_addr_t get_myaddr() const { return get_myaddrs().front(); }
+
+ virtual void set_myaddrs(const entity_addrvec_t& addrs) = 0;
+
+ virtual bool set_addr_unknowns(const entity_addrvec_t &addrs) = 0;
+
+ virtual void set_auth_client(crimson::auth::AuthClient *) = 0;
+
+ virtual void set_auth_server(crimson::auth::AuthServer *) = 0;
+
+ using bind_ertr = crimson::errorator<
+ crimson::ct_error::address_in_use, // The address (range) is already bound
+ crimson::ct_error::address_not_available
+ >;
+ /// bind to the given address
+ virtual bind_ertr::future<> bind(const entity_addrvec_t& addr) = 0;
+
+ /// start the messenger
+ virtual seastar::future<> start(const dispatchers_t&) = 0;
+
+ /// either return an existing connection to the peer,
+ /// or a new pending connection
+ virtual ConnectionRef
+ connect(const entity_addr_t& peer_addr,
+ const entity_name_t& peer_name) = 0;
+
+ ConnectionRef
+ connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type) {
+ return connect(peer_addr, entity_name_t(peer_type, -1));
+ }
+
+ virtual bool owns_connection(Connection &) const = 0;
+
+ // wait for messenger shutdown
+ virtual seastar::future<> wait() = 0;
+
+ // stop dispatching events and messages
+ virtual void stop() = 0;
+
+ virtual bool is_started() const = 0;
+
+ // free internal resources before destruction, must be called after stopped,
+ // and must be called if is bound.
+ virtual seastar::future<> shutdown() = 0;
+
+ virtual void print(std::ostream& out) const = 0;
+
+ virtual SocketPolicy get_policy(entity_type_t peer_type) const = 0;
+
+ virtual SocketPolicy get_default_policy() const = 0;
+
+ virtual void set_default_policy(const SocketPolicy& p) = 0;
+
+ virtual void set_policy(entity_type_t peer_type, const SocketPolicy& p) = 0;
+
+ virtual void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) = 0;
+
+ static MessengerRef
+ create(const entity_name_t& name,
+ const std::string& lname,
+ uint64_t nonce,
+ bool dispatch_only_on_this_shard);
+
+#ifdef UNIT_TESTS_BUILT
+ virtual void set_interceptor(Interceptor *) = 0;
+#endif
+};
+
+inline std::ostream& operator<<(std::ostream& out, const Messenger& msgr) {
+ out << "[";
+ msgr.print(out);
+ out << "]";
+ return out;
+}
+
+} // namespace crimson::net
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::net::Messenger> : fmt::ostream_formatter {};
+#endif
diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc
new file mode 100644
index 000000000..55b669384
--- /dev/null
+++ b/src/crimson/net/ProtocolV2.cc
@@ -0,0 +1,2348 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ProtocolV2.h"
+
+#include <fmt/format.h>
+#include <fmt/ranges.h>
+#include "include/msgr.h"
+#include "include/random.h"
+#include "msg/msg_fmt.h"
+
+#include "crimson/auth/AuthClient.h"
+#include "crimson/auth/AuthServer.h"
+#include "crimson/common/formatter.h"
+#include "crimson/common/log.h"
+
+#include "Errors.h"
+#include "SocketMessenger.h"
+
+using namespace ceph::msgr::v2;
+using crimson::common::local_conf;
+
+namespace {
+
+// TODO: CEPH_MSGR2_FEATURE_COMPRESSION
+const uint64_t CRIMSON_MSGR2_SUPPORTED_FEATURES =
+ (CEPH_MSGR2_FEATURE_REVISION_1 |
+ // CEPH_MSGR2_FEATURE_COMPRESSION |
+ UINT64_C(0));
+
+// Log levels in V2 Protocol:
+// * error level, something error that cause connection to terminate:
+// - fatal errors;
+// - bugs;
+// * warn level: something unusual that identifies connection fault or replacement:
+// - unstable network;
+// - incompatible peer;
+// - auth failure;
+// - connection race;
+// - connection reset;
+// * info level, something very important to show connection lifecycle,
+// which doesn't happen very frequently;
+// * debug level, important logs for debugging, including:
+// - all the messages sent/received (-->/<==);
+// - all the frames exchanged (WRITE/GOT);
+// - important fields updated (UPDATE);
+// - connection state transitions (TRIGGER);
+// * trace level, trivial logs showing:
+// - the exact bytes being sent/received (SEND/RECV(bytes));
+// - detailed information of sub-frames;
+// - integrity checks;
+// - etc.
+seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+}
+
+[[noreturn]] void abort_in_fault() {
+ throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+}
+
+[[noreturn]] void abort_protocol() {
+ throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
+}
+
+#define ABORT_IN_CLOSE(is_dispatch_reset) { \
+ do_close(is_dispatch_reset); \
+ abort_protocol(); \
+}
+
+inline void expect_tag(const Tag& expected,
+ const Tag& actual,
+ crimson::net::SocketConnection& conn,
+ const char *where) {
+ if (actual != expected) {
+ logger().warn("{} {} received wrong tag: {}, expected {}",
+ conn, where,
+ static_cast<uint32_t>(actual),
+ static_cast<uint32_t>(expected));
+ abort_in_fault();
+ }
+}
+
+inline void unexpected_tag(const Tag& unexpected,
+ crimson::net::SocketConnection& conn,
+ const char *where) {
+ logger().warn("{} {} received unexpected tag: {}",
+ conn, where, static_cast<uint32_t>(unexpected));
+ abort_in_fault();
+}
+
+inline uint64_t generate_client_cookie() {
+ return ceph::util::generate_random_number<uint64_t>(
+ 1, std::numeric_limits<uint64_t>::max());
+}
+
+} // namespace anonymous
+
+namespace crimson::net {
+
+seastar::future<> ProtocolV2::Timer::backoff(double seconds)
+{
+ logger().warn("{} waiting {} seconds ...", conn, seconds);
+ cancel();
+ last_dur_ = seconds;
+ as = seastar::abort_source();
+ auto dur = std::chrono::duration_cast<seastar::lowres_clock::duration>(
+ std::chrono::duration<double>(seconds));
+ return seastar::sleep_abortable(dur, *as
+ ).handle_exception_type([this] (const seastar::sleep_aborted& e) {
+ logger().debug("{} wait aborted", conn);
+ abort_protocol();
+ });
+}
+
+ProtocolV2::ProtocolV2(SocketConnection& conn,
+ IOHandler &io_handler)
+ : conn{conn},
+ messenger{conn.messenger},
+ io_handler{io_handler},
+ frame_assembler{FrameAssemblerV2::create(conn)},
+ auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()},
+ protocol_timer{conn}
+{
+ io_states = io_handler.get_states();
+}
+
+ProtocolV2::~ProtocolV2() {}
+
+void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
+ const entity_name_t& _peer_name)
+{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ ceph_assert(state == state_t::NONE);
+ ceph_assert(!gate.is_closed());
+ conn.peer_addr = _peer_addr;
+ conn.target_addr = _peer_addr;
+ conn.set_peer_name(_peer_name);
+ conn.policy = messenger.get_policy(_peer_name.type());
+ client_cookie = generate_client_cookie();
+ logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}"
+ " policy(lossy={}, server={}, standby={}, resetcheck={})",
+ conn, _peer_addr, _peer_name, client_cookie,
+ conn.policy.lossy, conn.policy.server,
+ conn.policy.standby, conn.policy.resetcheck);
+ messenger.register_conn(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ execute_connecting();
+}
+
+void ProtocolV2::start_accept(SocketFRef&& new_socket,
+ const entity_addr_t& _peer_addr)
+{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ ceph_assert(state == state_t::NONE);
+ // until we know better
+ conn.target_addr = _peer_addr;
+ frame_assembler->set_socket(std::move(new_socket));
+ has_socket = true;
+ is_socket_valid = true;
+ logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
+ messenger.accept_conn(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+
+ auto cc_seq = crosscore.prepare_submit();
+ gate.dispatch_in_background("set_accepted_sid", conn, [this, cc_seq] {
+ return io_handler.set_accepted_sid(
+ cc_seq,
+ frame_assembler->get_socket_shard_id(),
+ seastar::make_foreign(conn.shared_from_this()));
+ });
+
+ execute_accepting();
+}
+
+void ProtocolV2::trigger_state_phase1(state_t new_state)
+{
+ ceph_assert_always(!gate.is_closed());
+ if (new_state == state) {
+ logger().error("{} is not allowed to re-trigger state {}",
+ conn, get_state_name(state));
+ ceph_abort();
+ }
+ if (state == state_t::CLOSING) {
+ logger().error("{} CLOSING is not allowed to trigger state {}",
+ conn, get_state_name(new_state));
+ ceph_abort();
+ }
+ logger().debug("{} TRIGGER {}, was {}",
+ conn, get_state_name(new_state), get_state_name(state));
+
+ if (state == state_t::READY) {
+ // from READY
+ ceph_assert_always(!need_exit_io);
+ ceph_assert_always(!pr_exit_io.has_value());
+ need_exit_io = true;
+ pr_exit_io = seastar::shared_promise<>();
+ }
+
+ if (new_state == state_t::STANDBY && !conn.policy.server) {
+ need_notify_out = true;
+ } else {
+ need_notify_out = false;
+ }
+
+ state = new_state;
+}
+
+void ProtocolV2::trigger_state_phase2(
+ state_t new_state, io_state_t new_io_state)
+{
+ ceph_assert_always(new_state == state);
+ ceph_assert_always(!gate.is_closed());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+
+ FrameAssemblerV2Ref fa;
+ if (new_state == state_t::READY) {
+ assert(new_io_state == io_state_t::open);
+ assert(io_handler.get_shard_id() ==
+ frame_assembler->get_socket_shard_id());
+ frame_assembler->set_shard_id(io_handler.get_shard_id());
+ fa = std::move(frame_assembler);
+ } else {
+ assert(new_io_state != io_state_t::open);
+ }
+
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::set_io_state(): new_state={}, new_io_state={}, "
+ "fa={}, set_notify_out={}",
+ conn, cc_seq, get_state_name(new_state), new_io_state,
+ fa ? fmt::format("(sid={})", fa->get_shard_id()) : "N/A",
+ need_notify_out);
+ gate.dispatch_in_background(
+ "set_io_state", conn,
+ [this, cc_seq, new_io_state, fa=std::move(fa)]() mutable {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, cc_seq, new_io_state,
+ fa=std::move(fa), set_notify_out=need_notify_out]() mutable {
+ return io_handler.set_io_state(
+ cc_seq, new_io_state, std::move(fa), set_notify_out);
+ });
+ });
+
+ if (need_exit_io) {
+ // from READY
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::wait_io_exit_dispatching() ...",
+ conn, cc_seq);
+ assert(pr_exit_io.has_value());
+ assert(new_io_state != io_state_t::open);
+ need_exit_io = false;
+ gate.dispatch_in_background("exit_io", conn, [this, cc_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq] {
+ return io_handler.wait_io_exit_dispatching(cc_seq);
+ }).then([this, cc_seq](auto ret) {
+ logger().debug("{} finish {} IOHandler::wait_io_exit_dispatching(), {}",
+ conn, cc_seq, ret.io_states);
+ frame_assembler = std::move(ret.frame_assembler);
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ ceph_assert_always(
+ seastar::this_shard_id() == frame_assembler->get_shard_id());
+ ceph_assert_always(!frame_assembler->is_socket_valid());
+ assert(!need_exit_io);
+ io_states = ret.io_states;
+ pr_exit_io->set_value();
+ pr_exit_io = std::nullopt;
+ });
+ });
+ }
+}
+
+void ProtocolV2::fault(
+ state_t expected_state,
+ const char *where,
+ std::exception_ptr eptr)
+{
+ assert(expected_state == state_t::CONNECTING ||
+ expected_state == state_t::ESTABLISHING ||
+ expected_state == state_t::REPLACING ||
+ expected_state == state_t::READY);
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+
+ if (state != expected_state) {
+ logger().info("{} protocol {} {} is aborted at inconsistent {} -- {}",
+ conn,
+ get_state_name(expected_state),
+ where,
+ get_state_name(state),
+ e_what);
+#ifndef NDEBUG
+ if (expected_state == state_t::REPLACING) {
+ assert(state == state_t::CLOSING);
+ } else if (expected_state == state_t::READY) {
+ assert(state == state_t::CLOSING ||
+ state == state_t::REPLACING ||
+ state == state_t::CONNECTING ||
+ state == state_t::STANDBY);
+ } else {
+ assert(state == state_t::CLOSING ||
+ state == state_t::REPLACING);
+ }
+#endif
+ return;
+ }
+ assert(state == expected_state);
+
+ if (state != state_t::CONNECTING && conn.policy.lossy) {
+ // socket will be shutdown in do_close()
+ logger().info("{} protocol {} {} fault on lossy channel, going to CLOSING -- {}",
+ conn, get_state_name(state), where, e_what);
+ do_close(true);
+ return;
+ }
+
+ if (likely(has_socket)) {
+ if (likely(is_socket_valid)) {
+ ceph_assert_always(state != state_t::READY);
+ frame_assembler->shutdown_socket<true>(&gate);
+ is_socket_valid = false;
+ } else {
+ ceph_assert_always(state != state_t::ESTABLISHING);
+ }
+ } else { // !has_socket
+ ceph_assert_always(state == state_t::CONNECTING);
+ assert(!is_socket_valid);
+ }
+
+ if (conn.policy.server ||
+ (conn.policy.standby && !io_states.is_out_queued_or_sent())) {
+ if (conn.policy.server) {
+ logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}",
+ conn,
+ get_state_name(state),
+ where,
+ io_states,
+ e_what);
+ } else {
+ logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}",
+ conn,
+ get_state_name(state),
+ where,
+ io_states,
+ e_what);
+ }
+ execute_standby();
+ } else if (state == state_t::CONNECTING ||
+ state == state_t::REPLACING) {
+ logger().info("{} protocol {} {} fault, going to WAIT {} -- {}",
+ conn,
+ get_state_name(state),
+ where,
+ io_states,
+ e_what);
+ execute_wait(false);
+ } else {
+ assert(state == state_t::READY ||
+ state == state_t::ESTABLISHING);
+ logger().info("{} protocol {} {} fault, going to CONNECTING {} -- {}",
+ conn,
+ get_state_name(state),
+ where,
+ io_states,
+ e_what);
+ execute_connecting();
+ }
+}
+
+void ProtocolV2::reset_session(bool full)
+{
+ server_cookie = 0;
+ connect_seq = 0;
+ if (full) {
+ client_cookie = generate_client_cookie();
+ peer_global_seq = 0;
+ }
+
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::reset_session({})",
+ conn, cc_seq, full);
+ io_states.reset_session(full);
+ gate.dispatch_in_background(
+ "reset_session", conn, [this, cc_seq, full] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq, full] {
+ return io_handler.reset_session(cc_seq, full);
+ });
+ });
+ // user can make changes
+}
+
+seastar::future<std::tuple<entity_type_t, entity_addr_t>>
+ProtocolV2::banner_exchange(bool is_connect)
+{
+ // 1. prepare and send banner
+ bufferlist banner_payload;
+ encode((uint64_t)CRIMSON_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
+ encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
+
+ bufferlist bl;
+ bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
+ auto len_payload = static_cast<uint16_t>(banner_payload.length());
+ encode(len_payload, bl, 0);
+ bl.claim_append(banner_payload);
+ logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
+ "required={}, banner=\"{}\"",
+ conn, bl.length(), len_payload,
+ CRIMSON_MSGR2_SUPPORTED_FEATURES,
+ CEPH_MSGR2_REQUIRED_FEATURES,
+ CEPH_BANNER_V2_PREFIX);
+#ifdef UNIT_TESTS_BUILT
+ return frame_assembler->intercept_frame(custom_bp_t::BANNER_WRITE, true
+ ).then([this, bl=std::move(bl)]() mutable {
+ return frame_assembler->write_flush(std::move(bl));
+ }
+#else
+ return frame_assembler->write_flush(std::move(bl)
+#endif
+ ).then([this] {
+ // 2. read peer banner
+ unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
+#ifdef UNIT_TESTS_BUILT
+ return frame_assembler->intercept_frame(custom_bp_t::BANNER_READ, false
+ ).then([this, banner_len] {
+ return frame_assembler->read_exactly(banner_len);
+ });
+#else
+ return frame_assembler->read_exactly(banner_len);
+#endif
+ }).then([this](auto bptr) {
+ // 3. process peer banner and read banner_payload
+ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
+ logger().debug("{} RECV({}) banner: \"{}\"",
+ conn, bptr.length(),
+ std::string(bptr.c_str(), banner_prefix_len));
+
+ if (memcmp(bptr.c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
+ if (memcmp(bptr.c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
+ logger().warn("{} peer is using V1 protocol", conn);
+ } else {
+ logger().warn("{} peer sent bad banner", conn);
+ }
+ abort_in_fault();
+ }
+
+ bptr.set_offset(bptr.offset() + banner_prefix_len);
+ bptr.set_length(bptr.length() - banner_prefix_len);
+ assert(bptr.length() == sizeof(ceph_le16));
+
+ uint16_t payload_len;
+ bufferlist buf;
+ buf.append(std::move(bptr));
+ auto ti = buf.cbegin();
+ try {
+ decode(payload_len, ti);
+ } catch (const buffer::error &e) {
+ logger().warn("{} decode banner payload len failed", conn);
+ abort_in_fault();
+ }
+ logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
+#ifdef UNIT_TESTS_BUILT
+ return frame_assembler->intercept_frame(
+ custom_bp_t::BANNER_PAYLOAD_READ, false
+ ).then([this, payload_len] {
+ return frame_assembler->read(payload_len);
+ });
+#else
+ return frame_assembler->read(payload_len);
+#endif
+ }).then([this, is_connect] (bufferlist bl) {
+ // 4. process peer banner_payload and send HelloFrame
+ auto p = bl.cbegin();
+ uint64_t _peer_supported_features;
+ uint64_t _peer_required_features;
+ try {
+ decode(_peer_supported_features, p);
+ decode(_peer_required_features, p);
+ } catch (const buffer::error &e) {
+ logger().warn("{} decode banner payload failed", conn);
+ abort_in_fault();
+ }
+ logger().debug("{} RECV({}) banner features: supported={} required={}",
+ conn, bl.length(),
+ _peer_supported_features, _peer_required_features);
+
+ // Check feature bit compatibility
+ uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
+ uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
+ if ((required_features & _peer_supported_features) != required_features) {
+ logger().error("{} peer does not support all required features"
+ " required={} peer_supported={}",
+ conn, required_features, _peer_supported_features);
+ ABORT_IN_CLOSE(is_connect);
+ }
+ if ((supported_features & _peer_required_features) != _peer_required_features) {
+ logger().error("{} we do not support all peer required features"
+ " peer_required={} supported={}",
+ conn, _peer_required_features, supported_features);
+ ABORT_IN_CLOSE(is_connect);
+ }
+ peer_supported_features = _peer_supported_features;
+ bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ frame_assembler->set_is_rev1(is_rev1);
+
+ auto hello = HelloFrame::Encode(messenger.get_mytype(),
+ conn.target_addr);
+ logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
+ conn, ceph_entity_type_name(messenger.get_mytype()),
+ conn.target_addr);
+ return frame_assembler->write_flush_frame(hello);
+ }).then([this] {
+ //5. read peer HelloFrame
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame");
+ return frame_assembler->read_frame_payload();
+ }).then([this](auto payload) {
+ // 6. process peer HelloFrame
+ auto hello = HelloFrame::Decode(payload->back());
+ logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
+ conn, ceph_entity_type_name(hello.entity_type()),
+ hello.peer_addr());
+ return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
+ std::make_tuple(hello.entity_type(), hello.peer_addr()));
+ });
+}
+
+// CONNECTING state
+
+seastar::future<> ProtocolV2::handle_auth_reply()
+{
+ return frame_assembler->read_main_preamble(
+ ).then([this](auto ret) {
+ switch (ret.tag) {
+ case Tag::AUTH_BAD_METHOD:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_auth_bad_method() logic
+ auto bad_method = AuthBadMethodFrame::Decode(payload->back());
+ logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
+ "allowed_methods={}, allowed_modes={}",
+ conn, bad_method.method(), cpp_strerror(bad_method.result()),
+ bad_method.allowed_methods(), bad_method.allowed_modes());
+ ceph_assert(messenger.get_auth_client());
+ int r = messenger.get_auth_client()->handle_auth_bad_method(
+ conn, *auth_meta,
+ bad_method.method(), bad_method.result(),
+ bad_method.allowed_methods(), bad_method.allowed_modes());
+ if (r < 0) {
+ logger().warn("{} auth_client handle_auth_bad_method returned {}",
+ conn, r);
+ abort_in_fault();
+ }
+ return client_auth(bad_method.allowed_methods());
+ });
+ case Tag::AUTH_REPLY_MORE:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_auth_reply_more() logic
+ auto auth_more = AuthReplyMoreFrame::Decode(payload->back());
+ logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
+ conn, auth_more.auth_payload().length());
+ ceph_assert(messenger.get_auth_client());
+ // let execute_connecting() take care of the thrown exception
+ auto reply = messenger.get_auth_client()->handle_auth_reply_more(
+ conn, *auth_meta, auth_more.auth_payload());
+ auto more_reply = AuthRequestMoreFrame::Encode(reply);
+ logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
+ conn, reply.length());
+ return frame_assembler->write_flush_frame(more_reply);
+ }).then([this] {
+ return handle_auth_reply();
+ });
+ case Tag::AUTH_DONE:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_auth_done() logic
+ auto auth_done = AuthDoneFrame::Decode(payload->back());
+ logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
+ conn, auth_done.global_id(),
+ ceph_con_mode_name(auth_done.con_mode()),
+ auth_done.auth_payload().length());
+ ceph_assert(messenger.get_auth_client());
+ int r = messenger.get_auth_client()->handle_auth_done(
+ conn,
+ *auth_meta,
+ auth_done.global_id(),
+ auth_done.con_mode(),
+ auth_done.auth_payload());
+ if (r < 0) {
+ logger().warn("{} auth_client handle_auth_done returned {}", conn, r);
+ abort_in_fault();
+ }
+ auth_meta->con_mode = auth_done.con_mode();
+ frame_assembler->create_session_stream_handlers(*auth_meta, false);
+ return finish_auth();
+ });
+ default: {
+ unexpected_tag(ret.tag, conn, "handle_auth_reply");
+ return seastar::now();
+ }
+ }
+ });
+}
+
+seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods)
+{
+ // send_auth_request() logic
+ ceph_assert(messenger.get_auth_client());
+
+ try {
+ auto [auth_method, preferred_modes, bl] =
+ messenger.get_auth_client()->get_auth_request(conn, *auth_meta);
+ auth_meta->auth_method = auth_method;
+ auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl);
+ logger().debug("{} WRITE AuthRequestFrame: method={},"
+ " preferred_modes={}, payload_len={}",
+ conn, auth_method, preferred_modes, bl.length());
+ return frame_assembler->write_flush_frame(frame
+ ).then([this] {
+ return handle_auth_reply();
+ });
+ } catch (const crimson::auth::error& e) {
+ logger().error("{} get_initial_auth_request returned {}", conn, e.what());
+ ABORT_IN_CLOSE(true);
+ return seastar::now();
+ }
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::process_wait()
+{
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_wait() logic
+ logger().debug("{} GOT WaitFrame", conn);
+ WaitFrame::Decode(payload->back());
+ return next_step_t::wait;
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::client_connect()
+{
+ // send_client_ident() logic
+ uint64_t flags = 0;
+ if (conn.policy.lossy) {
+ flags |= CEPH_MSG_CONNECT_LOSSY;
+ }
+
+ auto client_ident = ClientIdentFrame::Encode(
+ messenger.get_myaddrs(),
+ conn.target_addr,
+ messenger.get_myname().num(),
+ global_seq,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required, flags,
+ client_cookie);
+
+ logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
+ " gs={}, features_supported={}, features_required={},"
+ " flags={}, cookie={}",
+ conn, messenger.get_myaddrs(), conn.target_addr,
+ messenger.get_myname().num(), global_seq,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags, client_cookie);
+ return frame_assembler->write_flush_frame(client_ident
+ ).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ switch (ret.tag) {
+ case Tag::IDENT_MISSING_FEATURES:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_ident_missing_features() logic
+ auto ident_missing = IdentMissingFeaturesFrame::Decode(payload->back());
+ logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
+ " (client does not support all server features)",
+ conn, ident_missing.features());
+ abort_in_fault();
+ return next_step_t::none;
+ });
+ case Tag::WAIT:
+ return process_wait();
+ case Tag::SERVER_IDENT:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} at receiving SERVER_IDENT",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
+ // handle_server_ident() logic
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::requeue_out_sent()",
+ conn, cc_seq);
+ io_states.requeue_out_sent();
+ gate.dispatch_in_background(
+ "requeue_out_sent", conn, [this, cc_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq] {
+ return io_handler.requeue_out_sent(cc_seq);
+ });
+ });
+
+ auto server_ident = ServerIdentFrame::Decode(payload->back());
+ logger().debug("{} GOT ServerIdentFrame:"
+ " addrs={}, gid={}, gs={},"
+ " features_supported={}, features_required={},"
+ " flags={}, cookie={}",
+ conn,
+ server_ident.addrs(), server_ident.gid(),
+ server_ident.global_seq(),
+ server_ident.supported_features(),
+ server_ident.required_features(),
+ server_ident.flags(), server_ident.cookie());
+
+ // is this who we intended to talk to?
+ // be a bit forgiving here, since we may be connecting based on addresses parsed out
+ // of mon_host or something.
+ if (!server_ident.addrs().contains(conn.target_addr)) {
+ logger().warn("{} peer identifies as {}, does not include {}",
+ conn, server_ident.addrs(), conn.target_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+
+ server_cookie = server_ident.cookie();
+
+ // TODO: change peer_addr to entity_addrvec_t
+ if (server_ident.addrs().front() != conn.peer_addr) {
+ logger().warn("{} peer advertises as {}, does not match {}",
+ conn, server_ident.addrs(), conn.peer_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (conn.get_peer_id() != entity_name_t::NEW &&
+ conn.get_peer_id() != server_ident.gid()) {
+ logger().error("{} connection peer id ({}) does not match "
+ "what it should be ({}) during connecting, close",
+ conn, server_ident.gid(), conn.get_peer_id());
+ ABORT_IN_CLOSE(true);
+ }
+ conn.set_peer_id(server_ident.gid());
+ conn.set_features(server_ident.supported_features() &
+ conn.policy.features_supported);
+ logger().debug("{} UPDATE: features={}", conn, conn.get_features());
+ peer_global_seq = server_ident.global_seq();
+
+ bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
+ if (lossy != conn.policy.lossy) {
+ logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn, lossy);
+ conn.policy.lossy = lossy;
+ }
+ if (lossy && (connect_seq != 0 || server_cookie != 0)) {
+ logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy",
+ conn, connect_seq, server_cookie);
+ connect_seq = 0;
+ server_cookie = 0;
+ }
+
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ });
+ default: {
+ unexpected_tag(ret.tag, conn, "post_client_connect");
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+ }
+ }
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::client_reconnect()
+{
+ // send_reconnect() logic
+ auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(),
+ client_cookie,
+ server_cookie,
+ global_seq,
+ connect_seq,
+ io_states.in_seq);
+ logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
+ " server_cookie={}, gs={}, cs={}, in_seq={}",
+ conn, messenger.get_myaddrs(),
+ client_cookie, server_cookie,
+ global_seq, connect_seq, io_states.in_seq);
+ return frame_assembler->write_flush_frame(reconnect).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ switch (ret.tag) {
+ case Tag::SESSION_RETRY_GLOBAL:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_session_retry_global() logic
+ auto retry = RetryGlobalFrame::Decode(payload->back());
+ logger().warn("{} GOT RetryGlobalFrame: gs={}",
+ conn, retry.global_seq());
+ global_seq = messenger.get_global_seq(retry.global_seq());
+ logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq);
+ return client_reconnect();
+ });
+ case Tag::SESSION_RETRY:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_session_retry() logic
+ auto retry = RetryFrame::Decode(payload->back());
+ logger().warn("{} GOT RetryFrame: cs={}",
+ conn, retry.connect_seq());
+ connect_seq = retry.connect_seq() + 1;
+ logger().warn("{} UPDATE: cs={}", conn, connect_seq);
+ return client_reconnect();
+ });
+ case Tag::SESSION_RESET:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} before reset_session()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ // handle_session_reset() logic
+ auto reset = ResetFrame::Decode(payload->back());
+ logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
+
+ reset_session(reset.full());
+ // user can make changes
+
+ return client_connect();
+ });
+ case Tag::WAIT:
+ return process_wait();
+ case Tag::SESSION_RECONNECT_OK:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} at receiving RECONNECT_OK",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
+ // handle_reconnect_ok() logic
+ auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} GOT ReconnectOkFrame: msg_seq={}, "
+ "send {} IOHandler::requeue_out_sent_up_to()",
+ conn, reconnect_ok.msg_seq(), cc_seq);
+
+ io_states.requeue_out_sent_up_to();
+ auto msg_seq = reconnect_ok.msg_seq();
+ gate.dispatch_in_background(
+ "requeue_out_reconnecting", conn, [this, cc_seq, msg_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq, msg_seq] {
+ return io_handler.requeue_out_sent_up_to(cc_seq, msg_seq);
+ });
+ });
+
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ });
+ default: {
+ unexpected_tag(ret.tag, conn, "post_client_reconnect");
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+ }
+ }
+ });
+}
+
+void ProtocolV2::execute_connecting()
+{
+ ceph_assert_always(!is_socket_valid);
+ trigger_state(state_t::CONNECTING, io_state_t::delay);
+ gated_execute("execute_connecting", conn, [this] {
+ global_seq = messenger.get_global_seq();
+ assert(client_cookie != 0);
+ if (!conn.policy.lossy && server_cookie != 0) {
+ ++connect_seq;
+ logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
+ conn, global_seq, connect_seq);
+ } else { // conn.policy.lossy || server_cookie == 0
+ assert(connect_seq == 0);
+ assert(server_cookie == 0);
+ logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
+ }
+ return wait_exit_io().then([this] {
+#ifdef UNIT_TESTS_BUILT
+ // process custom_bp_t::SOCKET_CONNECTING
+ // supports CONTINUE/FAULT/BLOCK
+ if (!conn.interceptor) {
+ return seastar::now();
+ }
+ return conn.interceptor->intercept(
+ conn, {Breakpoint{custom_bp_t::SOCKET_CONNECTING}}
+ ).then([this](bp_action_t action) {
+ switch (action) {
+ case bp_action_t::CONTINUE:
+ return seastar::now();
+ case bp_action_t::FAULT:
+ logger().info("[Test] got FAULT");
+ abort_in_fault();
+ case bp_action_t::BLOCK:
+ logger().info("[Test] got BLOCK");
+ return conn.interceptor->blocker.block();
+ default:
+ ceph_abort("unexpected action from trap");
+ return seastar::now();
+ }
+ });;
+ }).then([this] {
+#endif
+ ceph_assert_always(frame_assembler);
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} before Socket::connect()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ return Socket::connect(conn.peer_addr);
+ }).then([this](SocketRef _new_socket) {
+ logger().debug("{} socket connected", conn);
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} during Socket::connect()",
+ conn, get_state_name(state));
+ return _new_socket->close().then([sock=std::move(_new_socket)] {
+ abort_protocol();
+ });
+ }
+ SocketFRef new_socket = seastar::make_foreign(std::move(_new_socket));
+ if (!has_socket) {
+ frame_assembler->set_socket(std::move(new_socket));
+ has_socket = true;
+ } else {
+ gate.dispatch_in_background(
+ "replace_socket_connecting",
+ conn,
+ [this, new_socket=std::move(new_socket)]() mutable {
+ return frame_assembler->replace_shutdown_socket(std::move(new_socket));
+ }
+ );
+ }
+ is_socket_valid = true;
+ return seastar::now();
+ }).then([this] {
+ auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
+ frame_assembler->reset_handlers();
+ frame_assembler->start_recording();
+ return banner_exchange(true);
+ }).then([this] (auto&& ret) {
+ auto [_peer_type, _my_addr_from_peer] = std::move(ret);
+ if (conn.get_peer_type() != _peer_type) {
+ logger().warn("{} connection peer type does not match what peer advertises {} != {}",
+ conn, ceph_entity_type_name(conn.get_peer_type()),
+ ceph_entity_type_name(_peer_type));
+ ABORT_IN_CLOSE(true);
+ }
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} during banner_exchange(), abort",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ frame_assembler->learn_socket_ephemeral_port_as_connector(
+ _my_addr_from_peer.get_port());
+ if (unlikely(_my_addr_from_peer.is_legacy())) {
+ logger().warn("{} peer sent a legacy address for me: {}",
+ conn, _my_addr_from_peer);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
+ messenger.learned_addr(_my_addr_from_peer, conn);
+ return client_auth();
+ }).then([this] {
+ if (server_cookie == 0) {
+ ceph_assert(connect_seq == 0);
+ return client_connect();
+ } else {
+ ceph_assert(connect_seq > 0);
+ return client_reconnect();
+ }
+ }).then([this] (next_step_t next) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} at the end of execute_connecting()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ switch (next) {
+ case next_step_t::ready: {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} before dispatch_connect(), abort",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
+ auto cc_seq = crosscore.prepare_submit();
+ // there are 2 hops with dispatch_connect()
+ crosscore.prepare_submit();
+ logger().info("{} connected: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+ "send {} IOHandler::dispatch_connect()",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, io_states,
+ frame_assembler->get_socket_shard_id(), cc_seq);
+
+ // set io_handler to a new shard
+ auto new_io_shard = frame_assembler->get_socket_shard_id();
+ ConnectionFRef conn_fref = seastar::make_foreign(
+ conn.shared_from_this());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+ pr_switch_io_shard = seastar::shared_promise<>();
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, cc_seq, new_io_shard,
+ conn_fref=std::move(conn_fref)]() mutable {
+ return io_handler.dispatch_connect(
+ cc_seq, new_io_shard, std::move(conn_fref));
+ }).then([this, new_io_shard] {
+ ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+ pr_switch_io_shard->set_value();
+ pr_switch_io_shard = std::nullopt;
+ // user can make changes
+
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} after dispatch_connect(), abort",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ execute_ready();
+ });
+ }
+ case next_step_t::wait: {
+ logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
+ ceph_assert_always(is_socket_valid);
+ frame_assembler->shutdown_socket<true>(&gate);
+ is_socket_valid = false;
+ execute_wait(true);
+ return seastar::now();
+ }
+ default: {
+ ceph_abort("impossible next step");
+ }
+ }
+ }).handle_exception([this](std::exception_ptr eptr) {
+ fault(state_t::CONNECTING, "execute_connecting", eptr);
+ });
+ });
+}
+
+// ACCEPTING state
+
+seastar::future<> ProtocolV2::_auth_bad_method(int r)
+{
+ // _auth_bad_method() logic
+ ceph_assert(r < 0);
+ auto [allowed_methods, allowed_modes] =
+ messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type());
+ auto bad_method = AuthBadMethodFrame::Encode(
+ auth_meta->auth_method, r, allowed_methods, allowed_modes);
+ logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
+ "allowed_methods={}, allowed_modes={})",
+ conn, auth_meta->auth_method, cpp_strerror(r),
+ allowed_methods, allowed_modes);
+ return frame_assembler->write_flush_frame(bad_method
+ ).then([this] {
+ return server_auth();
+ });
+}
+
+seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
+{
+ // _handle_auth_request() logic
+ ceph_assert(messenger.get_auth_server());
+ bufferlist reply;
+ int r = messenger.get_auth_server()->handle_auth_request(
+ conn,
+ *auth_meta,
+ more,
+ auth_meta->auth_method,
+ auth_payload,
+ &conn.peer_global_id,
+ &reply);
+ switch (r) {
+ // successful
+ case 1: {
+ auto auth_done = AuthDoneFrame::Encode(
+ conn.peer_global_id, auth_meta->con_mode, reply);
+ logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
+ conn, conn.peer_global_id,
+ ceph_con_mode_name(auth_meta->con_mode), reply.length());
+ return frame_assembler->write_flush_frame(auth_done
+ ).then([this] {
+ ceph_assert(auth_meta);
+ frame_assembler->create_session_stream_handlers(*auth_meta, true);
+ return finish_auth();
+ });
+ }
+ // auth more
+ case 0: {
+ auto more = AuthReplyMoreFrame::Encode(reply);
+ logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
+ conn, reply.length());
+ return frame_assembler->write_flush_frame(more
+ ).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, "read_auth_request_more");
+ return frame_assembler->read_frame_payload();
+ }).then([this](auto payload) {
+ auto auth_more = AuthRequestMoreFrame::Decode(payload->back());
+ logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
+ conn, auth_more.auth_payload().length());
+ return _handle_auth_request(auth_more.auth_payload(), true);
+ });
+ }
+ case -EBUSY: {
+ logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn);
+ abort_in_fault();
+ return seastar::now();
+ }
+ default: {
+ logger().warn("{} auth_server handle_auth_request returned {}", conn, r);
+ return _auth_bad_method(r);
+ }
+ }
+}
+
+seastar::future<> ProtocolV2::server_auth()
+{
+ return frame_assembler->read_main_preamble(
+ ).then([this](auto ret) {
+ expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, "read_auth_request");
+ return frame_assembler->read_frame_payload();
+ }).then([this](auto payload) {
+ // handle_auth_request() logic
+ auto request = AuthRequestFrame::Decode(payload->back());
+ logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
+ " payload_len={}",
+ conn, request.method(), request.preferred_modes(),
+ request.auth_payload().length());
+ auth_meta->auth_method = request.method();
+ auth_meta->con_mode = messenger.get_auth_server()->pick_con_mode(
+ conn.get_peer_type(), auth_meta->auth_method,
+ request.preferred_modes());
+ if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
+ logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn);
+ return _auth_bad_method(-EOPNOTSUPP);
+ }
+ return _handle_auth_request(request.auth_payload(), false);
+ });
+}
+
+bool ProtocolV2::validate_peer_name(const entity_name_t& peer_name) const
+{
+ auto my_peer_name = conn.get_peer_name();
+ if (my_peer_name.type() != peer_name.type()) {
+ return false;
+ }
+ if (my_peer_name.num() != entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW &&
+ my_peer_name.num() != peer_name.num()) {
+ return false;
+ }
+ return true;
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_wait()
+{
+ auto wait = WaitFrame::Encode();
+ logger().debug("{} WRITE WaitFrame", conn);
+ return frame_assembler->write_flush_frame(wait
+ ).then([] {
+ return next_step_t::wait;
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::reuse_connection(
+ ProtocolV2* existing_proto, bool do_reset,
+ bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
+{
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} before trigger_replacing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
+ existing_proto->trigger_replacing(reconnect,
+ do_reset,
+ frame_assembler->to_replace(),
+ std::move(auth_meta),
+ peer_global_seq,
+ client_cookie,
+ conn.get_peer_name(),
+ conn.get_features(),
+ peer_supported_features,
+ conn_seq,
+ msg_seq);
+ ceph_assert_always(has_socket && is_socket_valid);
+ is_socket_valid = false;
+ has_socket = false;
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_replaced(
+ conn.get_local_shared_foreign_from_this());
+ }
+#endif
+ // close this connection because all the necessary information is delivered
+ // to the exisiting connection, and jump to error handling code to abort the
+ // current state.
+ ABORT_IN_CLOSE(false);
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
+{
+ // handle_existing_connection() logic
+ ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+ existing_conn->protocol.get());
+ ceph_assert(existing_proto);
+ logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
+ " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie,
+ fmt::ptr(existing_conn.get()), get_state_name(existing_proto->state),
+ existing_proto->global_seq,
+ existing_proto->peer_global_seq,
+ existing_proto->connect_seq,
+ existing_proto->client_cookie,
+ existing_proto->server_cookie);
+
+ if (!validate_peer_name(existing_conn->get_peer_name())) {
+ logger().error("{} server_connect: my peer_name doesn't match"
+ " the existing connection {}, abort", conn, fmt::ptr(existing_conn.get()));
+ abort_in_fault();
+ }
+
+ if (existing_proto->state == state_t::REPLACING) {
+ logger().warn("{} server_connect: racing replace happened while"
+ " replacing existing connection {}, send wait.",
+ conn, *existing_conn);
+ return send_wait();
+ }
+
+ if (existing_proto->peer_global_seq > peer_global_seq) {
+ logger().warn("{} server_connect:"
+ " this is a stale connection, because peer_global_seq({})"
+ " < existing->peer_global_seq({}), close this connection"
+ " in favor of existing connection {}",
+ conn, peer_global_seq,
+ existing_proto->peer_global_seq, *existing_conn);
+ abort_in_fault();
+ }
+
+ if (existing_conn->policy.lossy) {
+ // existing connection can be thrown out in favor of this one
+ logger().warn("{} server_connect:"
+ " existing connection {} is a lossy channel. Close existing in favor of"
+ " this connection", conn, *existing_conn);
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} before execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ execute_establishing(existing_conn);
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ }
+
+ if (existing_proto->server_cookie != 0) {
+ if (existing_proto->client_cookie != client_cookie) {
+ // Found previous session
+ // peer has reset and we're going to reuse the existing connection
+ // by replacing the socket
+ logger().warn("{} server_connect:"
+ " found new session (cs={})"
+ " when existing {} {} is with stale session (cs={}, ss={}),"
+ " peer must have reset",
+ conn,
+ client_cookie,
+ get_state_name(existing_proto->state),
+ *existing_conn,
+ existing_proto->client_cookie,
+ existing_proto->server_cookie);
+ return reuse_connection(existing_proto, conn.policy.resetcheck);
+ } else {
+ // session establishment interrupted between client_ident and server_ident,
+ // continuing...
+ logger().warn("{} server_connect: found client session with existing {} {}"
+ " matched (cs={}, ss={}), continuing session establishment",
+ conn,
+ get_state_name(existing_proto->state),
+ *existing_conn,
+ client_cookie,
+ existing_proto->server_cookie);
+ return reuse_connection(existing_proto);
+ }
+ } else {
+ // Looks like a connection race: server and client are both connecting to
+ // each other at the same time.
+ if (existing_proto->client_cookie != client_cookie) {
+ if (existing_conn->peer_wins()) {
+ // acceptor (this connection, the peer) wins
+ logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
+ " and win, reusing existing {} {}",
+ conn,
+ client_cookie,
+ existing_proto->client_cookie,
+ get_state_name(existing_proto->state),
+ *existing_conn);
+ return reuse_connection(existing_proto);
+ } else {
+ // acceptor (this connection, the peer) loses
+ logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
+ " and lose to existing {}, ask client to wait",
+ conn, client_cookie, existing_proto->client_cookie, *existing_conn);
+ return existing_conn->send_keepalive().then([this] {
+ return send_wait();
+ });
+ }
+ } else {
+ logger().warn("{} server_connect: found client session with existing {} {}"
+ " matched (cs={}, ss={}), continuing session establishment",
+ conn,
+ get_state_name(existing_proto->state),
+ *existing_conn,
+ client_cookie,
+ existing_proto->server_cookie);
+ return reuse_connection(existing_proto);
+ }
+ }
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::server_connect()
+{
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_client_ident() logic
+ auto client_ident = ClientIdentFrame::Decode(payload->back());
+ logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
+ " gid={}, gs={}, features_supported={},"
+ " features_required={}, flags={}, cookie={}",
+ conn, client_ident.addrs(), client_ident.target_addr(),
+ client_ident.gid(), client_ident.global_seq(),
+ client_ident.supported_features(),
+ client_ident.required_features(),
+ client_ident.flags(), client_ident.cookie());
+
+ if (client_ident.addrs().empty() ||
+ client_ident.addrs().front() == entity_addr_t()) {
+ logger().warn("{} oops, client_ident.addrs() is empty", conn);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (!messenger.get_myaddrs().contains(client_ident.target_addr())) {
+ logger().warn("{} peer is trying to reach {} which is not us ({})",
+ conn, client_ident.target_addr(), messenger.get_myaddrs());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ conn.peer_addr = client_ident.addrs().front();
+ logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
+ conn.target_addr = conn.peer_addr;
+ if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
+ logger().warn("{} we don't know how to reconnect to peer {}",
+ conn, conn.target_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+
+ if (conn.get_peer_id() != entity_name_t::NEW &&
+ conn.get_peer_id() != client_ident.gid()) {
+ logger().error("{} client_ident peer_id ({}) does not match"
+ " what it should be ({}) during accepting, abort",
+ conn, client_ident.gid(), conn.get_peer_id());
+ abort_in_fault();
+ }
+ conn.set_peer_id(client_ident.gid());
+ client_cookie = client_ident.cookie();
+
+ uint64_t feat_missing =
+ (conn.policy.features_required | msgr2_required) &
+ ~(uint64_t)client_ident.supported_features();
+ if (feat_missing) {
+ auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
+ logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
+ conn, feat_missing);
+ return frame_assembler->write_flush_frame(ident_missing_features
+ ).then([] {
+ return next_step_t::wait;
+ });
+ }
+ conn.set_features(client_ident.supported_features() &
+ conn.policy.features_supported);
+ logger().debug("{} UPDATE: features={}", conn, conn.get_features());
+
+ peer_global_seq = client_ident.global_seq();
+
+ bool lossy = client_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
+ if (lossy != conn.policy.lossy) {
+ logger().warn("{} my lossy policy {} doesn't match client {}, ignore",
+ conn, conn.policy.lossy, lossy);
+ }
+
+ // Looks good so far, let's check if there is already an existing connection
+ // to this peer.
+
+ SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
+
+ if (existing_conn) {
+ return handle_existing_connection(existing_conn);
+ } else {
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} before execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ execute_establishing(nullptr);
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ }
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::read_reconnect()
+{
+ return frame_assembler->read_main_preamble(
+ ).then([this](auto ret) {
+ expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_session_reconnect");
+ return server_reconnect();
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_retry(uint64_t connect_seq)
+{
+ auto retry = RetryFrame::Encode(connect_seq);
+ logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
+ return frame_assembler->write_flush_frame(retry
+ ).then([this] {
+ return read_reconnect();
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_retry_global(uint64_t global_seq)
+{
+ auto retry = RetryGlobalFrame::Encode(global_seq);
+ logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
+ return frame_assembler->write_flush_frame(retry
+ ).then([this] {
+ return read_reconnect();
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_reset(bool full)
+{
+ auto reset = ResetFrame::Encode(full);
+ logger().warn("{} WRITE ResetFrame: full={}", conn, full);
+ return frame_assembler->write_flush_frame(reset
+ ).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset");
+ return server_connect();
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::server_reconnect()
+{
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_reconnect() logic
+ auto reconnect = ReconnectFrame::Decode(payload->back());
+
+ logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
+ " server_cookie={}, gs={}, cs={}, msg_seq={}",
+ conn, reconnect.addrs(),
+ reconnect.client_cookie(), reconnect.server_cookie(),
+ reconnect.global_seq(), reconnect.connect_seq(),
+ reconnect.msg_seq());
+
+ // can peer_addrs be changed on-the-fly?
+ // TODO: change peer_addr to entity_addrvec_t
+ entity_addr_t paddr = reconnect.addrs().front();
+ if (paddr.is_msgr2() || paddr.is_any()) {
+ // good
+ } else {
+ logger().warn("{} peer's address {} is not v2", conn, paddr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (conn.peer_addr == entity_addr_t()) {
+ conn.peer_addr = paddr;
+ } else if (conn.peer_addr != paddr) {
+ logger().error("{} peer identifies as {}, while conn.peer_addr={},"
+ " reconnect failed",
+ conn, paddr, conn.peer_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ peer_global_seq = reconnect.global_seq();
+
+ SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
+
+ if (!existing_conn) {
+ // there is no existing connection therefore cannot reconnect to previous
+ // session
+ logger().warn("{} server_reconnect: no existing connection from address {},"
+ " reseting client", conn, conn.peer_addr);
+ return send_reset(true);
+ }
+
+ ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+ existing_conn->protocol.get());
+ ceph_assert(existing_proto);
+ logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
+ " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
+ conn, global_seq, peer_global_seq, reconnect.connect_seq(),
+ reconnect.client_cookie(), reconnect.server_cookie(),
+ fmt::ptr(existing_conn.get()),
+ get_state_name(existing_proto->state),
+ existing_proto->global_seq,
+ existing_proto->peer_global_seq,
+ existing_proto->connect_seq,
+ existing_proto->client_cookie,
+ existing_proto->server_cookie);
+
+ if (!validate_peer_name(existing_conn->get_peer_name())) {
+ logger().error("{} server_reconnect: my peer_name doesn't match"
+ " the existing connection {}, abort", conn, fmt::ptr(existing_conn.get()));
+ abort_in_fault();
+ }
+
+ if (existing_proto->state == state_t::REPLACING) {
+ logger().warn("{} server_reconnect: racing replace happened while "
+ " replacing existing connection {}, retry global.",
+ conn, *existing_conn);
+ return send_retry_global(existing_proto->peer_global_seq);
+ }
+
+ if (existing_proto->client_cookie != reconnect.client_cookie()) {
+ logger().warn("{} server_reconnect:"
+ " client_cookie mismatch with existing connection {},"
+ " cc={} rcc={}. I must have reset, reseting client.",
+ conn, *existing_conn,
+ existing_proto->client_cookie, reconnect.client_cookie());
+ return send_reset(conn.policy.resetcheck);
+ } else if (existing_proto->server_cookie == 0) {
+ // this happens when:
+ // - a connects to b
+ // - a sends client_ident
+ // - b gets client_ident, sends server_ident and sets cookie X
+ // - connection fault
+ // - b reconnects to a with cookie X, connect_seq=1
+ // - a has cookie==0
+ logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the"
+ " server_ident with existing connection {}."
+ " Asking peer to resume session establishment",
+ conn, existing_proto->client_cookie, *existing_conn);
+ return send_reset(false);
+ }
+
+ if (existing_proto->peer_global_seq > reconnect.global_seq()) {
+ logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({}),"
+ " with existing connection {},"
+ " ask client to retry global",
+ conn, existing_proto->peer_global_seq,
+ reconnect.global_seq(), *existing_conn);
+ return send_retry_global(existing_proto->peer_global_seq);
+ }
+
+ if (existing_proto->connect_seq > reconnect.connect_seq()) {
+ logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({}),"
+ " with existing connection {}, ask client to retry",
+ conn, reconnect.connect_seq(),
+ existing_proto->connect_seq, *existing_conn);
+ return send_retry(existing_proto->connect_seq);
+ } else if (existing_proto->connect_seq == reconnect.connect_seq()) {
+ // reconnect race: both peers are sending reconnect messages
+ if (existing_conn->peer_wins()) {
+ // acceptor (this connection, the peer) wins
+ logger().warn("{} server_reconnect: reconnect race detected (cs={})"
+ " and win, reusing existing {} {}",
+ conn,
+ reconnect.connect_seq(),
+ get_state_name(existing_proto->state),
+ *existing_conn);
+ return reuse_connection(
+ existing_proto, false,
+ true, reconnect.connect_seq(), reconnect.msg_seq());
+ } else {
+ // acceptor (this connection, the peer) loses
+ logger().warn("{} server_reconnect: reconnect race detected (cs={})"
+ " and lose to existing {}, ask client to wait",
+ conn, reconnect.connect_seq(), *existing_conn);
+ return send_wait();
+ }
+ } else { // existing_proto->connect_seq < reconnect.connect_seq()
+ logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
+ " reusing existing {} {}",
+ conn,
+ existing_proto->connect_seq,
+ reconnect.connect_seq(),
+ get_state_name(existing_proto->state),
+ *existing_conn);
+ return reuse_connection(
+ existing_proto, false,
+ true, reconnect.connect_seq(), reconnect.msg_seq());
+ }
+ });
+}
+
+void ProtocolV2::execute_accepting()
+{
+ assert(is_socket_valid);
+ trigger_state(state_t::ACCEPTING, io_state_t::none);
+ gate.dispatch_in_background("execute_accepting", conn, [this] {
+ return seastar::futurize_invoke([this] {
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ // only notify socket accepted
+ gate.dispatch_in_background(
+ "test_intercept_socket_accepted", conn, [this] {
+ return conn.interceptor->intercept(
+ conn, {Breakpoint{custom_bp_t::SOCKET_ACCEPTED}}
+ ).then([](bp_action_t action) {
+ ceph_assert(action == bp_action_t::CONTINUE);
+ });
+ });
+ }
+#endif
+ auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
+ frame_assembler->reset_handlers();
+ frame_assembler->start_recording();
+ return banner_exchange(false);
+ }).then([this] (auto&& ret) {
+ auto [_peer_type, _my_addr_from_peer] = std::move(ret);
+ ceph_assert(conn.get_peer_type() == 0);
+ conn.set_peer_type(_peer_type);
+
+ conn.policy = messenger.get_policy(_peer_type);
+ logger().info("{} UPDATE: peer_type={},"
+ " policy(lossy={} server={} standby={} resetcheck={})",
+ conn, ceph_entity_type_name(_peer_type),
+ conn.policy.lossy, conn.policy.server,
+ conn.policy.standby, conn.policy.resetcheck);
+ if (!messenger.get_myaddr().is_blank_ip() &&
+ (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
+ messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
+ logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
+ conn, _my_addr_from_peer, messenger.get_myaddr());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ messenger.learned_addr(_my_addr_from_peer, conn);
+ return server_auth();
+ }).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ switch (ret.tag) {
+ case Tag::CLIENT_IDENT:
+ return server_connect();
+ case Tag::SESSION_RECONNECT:
+ return server_reconnect();
+ default: {
+ unexpected_tag(ret.tag, conn, "post_server_auth");
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+ }
+ }
+ }).then([this] (next_step_t next) {
+ switch (next) {
+ case next_step_t::ready:
+ assert(state != state_t::ACCEPTING);
+ break;
+ case next_step_t::wait:
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} at the end of execute_accepting()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
+ execute_server_wait();
+ break;
+ default:
+ ceph_abort("impossible next step");
+ }
+ }).handle_exception([this](std::exception_ptr eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
+ conn, get_state_name(state), e_what);
+ do_close(false);
+ });
+ });
+}
+
+// CONNECTING or ACCEPTING state
+
+seastar::future<> ProtocolV2::finish_auth()
+{
+ ceph_assert(auth_meta);
+
+ auto records = frame_assembler->stop_recording();
+ const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
+ auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf);
+ auto sig_frame = AuthSignatureFrame::Encode(sig);
+ logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
+ return frame_assembler->write_flush_frame(sig_frame
+ ).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth");
+ return frame_assembler->read_frame_payload();
+ }).then([this, txbuf=std::move(records.txbuf)](auto payload) {
+ // handle_auth_signature() logic
+ auto sig_frame = AuthSignatureFrame::Decode(payload->back());
+ logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature());
+
+ const auto actual_tx_sig = auth_meta->session_key.empty() ?
+ sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf);
+ if (sig_frame.signature() != actual_tx_sig) {
+ logger().warn("{} pre-auth signature mismatch actual_tx_sig={}"
+ " sig_frame.signature()={}",
+ conn, actual_tx_sig, sig_frame.signature());
+ abort_in_fault();
+ }
+ });
+}
+
+// ESTABLISHING
+
+void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
+ auto accept_me = [this] {
+ messenger.register_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ messenger.unaccept_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ };
+
+ ceph_assert_always(is_socket_valid);
+ trigger_state(state_t::ESTABLISHING, io_state_t::delay);
+ bool is_replace;
+ if (existing_conn) {
+ logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+ "close existing {}",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie,
+ io_states, frame_assembler->get_socket_shard_id(),
+ *existing_conn);
+ is_replace = true;
+ ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+ existing_conn->protocol.get());
+ existing_proto->do_close(
+ true, // is_dispatch_reset
+ std::move(accept_me));
+ if (unlikely(state != state_t::ESTABLISHING)) {
+ logger().warn("{} triggered {} during execute_establishing(), "
+ "the accept event will not be delivered!",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ } else {
+ logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+ "no existing",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, io_states,
+ frame_assembler->get_socket_shard_id());
+ is_replace = false;
+ accept_me();
+ }
+
+ gated_execute("execute_establishing", conn, [this, is_replace] {
+ ceph_assert_always(state == state_t::ESTABLISHING);
+
+ // set io_handler to a new shard
+ auto cc_seq = crosscore.prepare_submit();
+ // there are 2 hops with dispatch_accept()
+ crosscore.prepare_submit();
+ auto new_io_shard = frame_assembler->get_socket_shard_id();
+ logger().debug("{} send {} IOHandler::dispatch_accept({})",
+ conn, cc_seq, new_io_shard);
+ ConnectionFRef conn_fref = seastar::make_foreign(
+ conn.shared_from_this());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+ pr_switch_io_shard = seastar::shared_promise<>();
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, cc_seq, new_io_shard, is_replace,
+ conn_fref=std::move(conn_fref)]() mutable {
+ return io_handler.dispatch_accept(
+ cc_seq, new_io_shard, std::move(conn_fref), is_replace);
+ }).then([this, new_io_shard] {
+ ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+ pr_switch_io_shard->set_value();
+ pr_switch_io_shard = std::nullopt;
+ // user can make changes
+
+ if (unlikely(state != state_t::ESTABLISHING)) {
+ logger().debug("{} triggered {} after dispatch_accept() during execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
+ return send_server_ident();
+ }).then([this] {
+ if (unlikely(state != state_t::ESTABLISHING)) {
+ logger().debug("{} triggered {} at the end of execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ logger().info("{} established, going to ready", conn);
+ execute_ready();
+ }).handle_exception([this](std::exception_ptr eptr) {
+ fault(state_t::ESTABLISHING, "execute_establishing", eptr);
+ });
+ });
+}
+
+// ESTABLISHING or REPLACING state
+
+seastar::future<>
+ProtocolV2::send_server_ident()
+{
+ ceph_assert_always(state == state_t::ESTABLISHING ||
+ state == state_t::REPLACING);
+ // send_server_ident() logic
+
+ // refered to async-conn v2: not assign gs to global_seq
+ global_seq = messenger.get_global_seq();
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} UPDATE: gs={} for server ident, "
+ "send {} IOHandler::reset_peer_state()",
+ conn, global_seq, cc_seq);
+
+ // this is required for the case when this connection is being replaced
+ io_states.reset_peer_state();
+ gate.dispatch_in_background(
+ "reset_peer_state", conn, [this, cc_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq] {
+ return io_handler.reset_peer_state(cc_seq);
+ });
+ });
+
+ if (!conn.policy.lossy) {
+ server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
+ }
+
+ uint64_t flags = 0;
+ if (conn.policy.lossy) {
+ flags = flags | CEPH_MSG_CONNECT_LOSSY;
+ }
+
+ auto server_ident = ServerIdentFrame::Encode(
+ messenger.get_myaddrs(),
+ messenger.get_myname().num(),
+ global_seq,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags,
+ server_cookie);
+
+ logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
+ " gs={}, features_supported={}, features_required={},"
+ " flags={}, cookie={}",
+ conn, messenger.get_myaddrs(), messenger.get_myname().num(),
+ global_seq, conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags, server_cookie);
+
+ return frame_assembler->write_flush_frame(server_ident);
+}
+
+// REPLACING state
+
+void ProtocolV2::trigger_replacing(bool reconnect,
+ bool do_reset,
+ FrameAssemblerV2::mover_t &&mover,
+ AuthConnectionMetaRef&& new_auth_meta,
+ uint64_t new_peer_global_seq,
+ uint64_t new_client_cookie,
+ entity_name_t new_peer_name,
+ uint64_t new_conn_features,
+ uint64_t new_peer_supported_features,
+ uint64_t new_connect_seq,
+ uint64_t new_msg_seq)
+{
+ ceph_assert_always(state >= state_t::ESTABLISHING);
+ ceph_assert_always(state <= state_t::WAIT);
+ ceph_assert_always(has_socket || state == state_t::CONNECTING);
+ // mover.socket shouldn't be shutdown
+
+ logger().info("{} start replacing ({}): pgs was {}, cs was {}, "
+ "client_cookie was {}, {}, new_sid={}",
+ conn, reconnect ? "reconnected" : "connected",
+ peer_global_seq, connect_seq, client_cookie,
+ io_states, mover.socket->get_shard_id());
+ if (is_socket_valid) {
+ frame_assembler->shutdown_socket<true>(&gate);
+ is_socket_valid = false;
+ }
+ trigger_state_phase1(state_t::REPLACING);
+ gate.dispatch_in_background(
+ "trigger_replacing",
+ conn,
+ [this,
+ reconnect,
+ do_reset,
+ mover = std::move(mover),
+ new_auth_meta = std::move(new_auth_meta),
+ new_client_cookie, new_peer_name,
+ new_conn_features, new_peer_supported_features,
+ new_peer_global_seq,
+ new_connect_seq, new_msg_seq] () mutable {
+ ceph_assert_always(state == state_t::REPLACING);
+ auto new_io_shard = mover.socket->get_shard_id();
+ // state may become CLOSING below, but we cannot abort the chain until
+ // mover.socket is correctly handled (closed or replaced).
+
+ // this is preemptive
+ return wait_switch_io_shard(
+ ).then([this] {
+ if (unlikely(state != state_t::REPLACING)) {
+ ceph_assert_always(state == state_t::CLOSING);
+ return seastar::now();
+ }
+
+ trigger_state_phase2(state_t::REPLACING, io_state_t::delay);
+ return wait_exit_io();
+ }).then([this] {
+ if (unlikely(state != state_t::REPLACING)) {
+ ceph_assert_always(state == state_t::CLOSING);
+ return seastar::now();
+ }
+
+ ceph_assert_always(frame_assembler);
+ protocol_timer.cancel();
+ auto done = std::move(execution_done);
+ execution_done = seastar::now();
+ return done;
+ }).then([this, new_io_shard] {
+ if (unlikely(state != state_t::REPLACING)) {
+ ceph_assert_always(state == state_t::CLOSING);
+ return seastar::now();
+ }
+
+ // set io_handler to a new shard
+ // we should prevent parallel switching core attemps
+ auto cc_seq = crosscore.prepare_submit();
+ // there are 2 hops with dispatch_accept()
+ crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::dispatch_accept({})",
+ conn, cc_seq, new_io_shard);
+ ConnectionFRef conn_fref = seastar::make_foreign(
+ conn.shared_from_this());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+ pr_switch_io_shard = seastar::shared_promise<>();
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, cc_seq, new_io_shard,
+ conn_fref=std::move(conn_fref)]() mutable {
+ return io_handler.dispatch_accept(
+ cc_seq, new_io_shard, std::move(conn_fref), false);
+ }).then([this, new_io_shard] {
+ ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+ pr_switch_io_shard->set_value();
+ pr_switch_io_shard = std::nullopt;
+ // user can make changes
+ });
+ }).then([this,
+ reconnect,
+ do_reset,
+ mover = std::move(mover),
+ new_auth_meta = std::move(new_auth_meta),
+ new_client_cookie, new_peer_name,
+ new_conn_features, new_peer_supported_features,
+ new_peer_global_seq,
+ new_connect_seq, new_msg_seq] () mutable {
+ if (state == state_t::REPLACING && do_reset) {
+ reset_session(true);
+ // user can make changes
+ }
+
+ if (unlikely(state != state_t::REPLACING)) {
+ logger().debug("{} triggered {} in the middle of trigger_replacing(), abort",
+ conn, get_state_name(state));
+ ceph_assert_always(state == state_t::CLOSING);
+ return mover.socket->close(
+ ).then([sock = std::move(mover.socket)] {
+ abort_protocol();
+ });
+ }
+
+ auth_meta = std::move(new_auth_meta);
+ peer_global_seq = new_peer_global_seq;
+ gate.dispatch_in_background(
+ "replace_frame_assembler",
+ conn,
+ [this, mover=std::move(mover)]() mutable {
+ return frame_assembler->replace_by(std::move(mover));
+ }
+ );
+ is_socket_valid = true;
+ has_socket = true;
+
+ if (reconnect) {
+ connect_seq = new_connect_seq;
+ // send_reconnect_ok() logic
+
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::requeue_out_sent_up_to({})",
+ conn, cc_seq, new_msg_seq);
+ io_states.requeue_out_sent_up_to();
+ gate.dispatch_in_background(
+ "requeue_out_replacing", conn, [this, cc_seq, new_msg_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq, new_msg_seq] {
+ return io_handler.requeue_out_sent_up_to(cc_seq, new_msg_seq);
+ });
+ });
+
+ auto reconnect_ok = ReconnectOkFrame::Encode(io_states.in_seq);
+ logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_states.in_seq);
+ return frame_assembler->write_flush_frame(reconnect_ok);
+ } else {
+ client_cookie = new_client_cookie;
+ assert(conn.get_peer_type() == new_peer_name.type());
+ if (conn.get_peer_id() == entity_name_t::NEW) {
+ conn.set_peer_id(new_peer_name.num());
+ }
+ conn.set_features(new_conn_features);
+ peer_supported_features = new_peer_supported_features;
+ bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ frame_assembler->set_is_rev1(is_rev1);
+ return send_server_ident();
+ }
+ }).then([this, reconnect] {
+ if (unlikely(state != state_t::REPLACING)) {
+ logger().debug("{} triggered {} at the end of trigger_replacing(), abort",
+ conn, get_state_name(state));
+ ceph_assert_always(state == state_t::CLOSING);
+ abort_protocol();
+ }
+ logger().info("{} replaced ({}), going to ready: "
+ "gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}",
+ conn, reconnect ? "reconnected" : "connected",
+ global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, io_states);
+ execute_ready();
+ }).handle_exception([this](std::exception_ptr eptr) {
+ fault(state_t::REPLACING, "trigger_replacing", eptr);
+ });
+ });
+}
+
+// READY state
+
+seastar::future<> ProtocolV2::notify_out_fault(
+ crosscore_t::seq_t cc_seq,
+ const char *where,
+ std::exception_ptr eptr,
+ io_handler_state _io_states)
+{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} notify_out_fault(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, where, eptr, _io_states] {
+ return notify_out_fault(cc_seq, where, eptr, _io_states);
+ });
+ }
+
+ io_states = _io_states;
+ logger().debug("{} got {} notify_out_fault(): io_states={}",
+ conn, cc_seq, io_states);
+ fault(state_t::READY, where, eptr);
+ return seastar::now();
+}
+
+void ProtocolV2::execute_ready()
+{
+ assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
+ protocol_timer.cancel();
+ ceph_assert_always(is_socket_valid);
+ // I'm not responsible to shutdown the socket at READY
+ is_socket_valid = false;
+ trigger_state(state_t::READY, io_state_t::open);
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ // FIXME: doesn't support cross-core
+ conn.interceptor->register_conn_ready(
+ conn.get_local_shared_foreign_from_this());
+ }
+#endif
+}
+
+// STANDBY state
+
+void ProtocolV2::execute_standby()
+{
+ ceph_assert_always(!is_socket_valid);
+ trigger_state(state_t::STANDBY, io_state_t::delay);
+}
+
+seastar::future<> ProtocolV2::notify_out(
+ crosscore_t::seq_t cc_seq)
+{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} notify_out(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq] {
+ return notify_out(cc_seq);
+ });
+ }
+
+ logger().debug("{} got {} notify_out(): at {}",
+ conn, cc_seq, get_state_name(state));
+ io_states.is_out_queued = true;
+ if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
+ logger().info("{} notify_out(): at {}, going to CONNECTING",
+ conn, get_state_name(state));
+ execute_connecting();
+ }
+ return seastar::now();
+}
+
+// WAIT state
+
+void ProtocolV2::execute_wait(bool max_backoff)
+{
+ ceph_assert_always(!is_socket_valid);
+ trigger_state(state_t::WAIT, io_state_t::delay);
+ gated_execute("execute_wait", conn, [this, max_backoff] {
+ double backoff = protocol_timer.last_dur();
+ if (max_backoff) {
+ backoff = local_conf().get_val<double>("ms_max_backoff");
+ } else if (backoff > 0) {
+ backoff = std::min(local_conf().get_val<double>("ms_max_backoff"), 2 * backoff);
+ } else {
+ backoff = local_conf().get_val<double>("ms_initial_backoff");
+ }
+ return protocol_timer.backoff(backoff).then([this] {
+ if (unlikely(state != state_t::WAIT)) {
+ logger().debug("{} triggered {} at the end of execute_wait()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ logger().info("{} execute_wait(): going to CONNECTING", conn);
+ execute_connecting();
+ }).handle_exception([this](std::exception_ptr eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().info("{} execute_wait(): protocol aborted at {} -- {}",
+ conn, get_state_name(state), e_what);
+ assert(state == state_t::REPLACING ||
+ state == state_t::CLOSING);
+ });
+ });
+}
+
+// SERVER_WAIT state
+
+void ProtocolV2::execute_server_wait()
+{
+ ceph_assert_always(is_socket_valid);
+ trigger_state(state_t::SERVER_WAIT, io_state_t::none);
+ gated_execute("execute_server_wait", conn, [this] {
+ return frame_assembler->read_exactly(1
+ ).then([this](auto bptr) {
+ logger().warn("{} SERVER_WAIT got read, abort", conn);
+ abort_in_fault();
+ }).handle_exception([this](std::exception_ptr eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
+ conn, get_state_name(state), e_what);
+ do_close(false);
+ });
+ });
+}
+
+// CLOSING state
+
+seastar::future<> ProtocolV2::notify_mark_down(
+ crosscore_t::seq_t cc_seq)
+{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} notify_mark_down(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq] {
+ return notify_mark_down(cc_seq);
+ });
+ }
+
+ logger().debug("{} got {} notify_mark_down()",
+ conn, cc_seq);
+ do_close(false);
+ return seastar::now();
+}
+
+seastar::future<> ProtocolV2::close_clean_yielded()
+{
+ // yield() so that do_close() can be called *after* close_clean_yielded() is
+ // applied to all connections in a container using
+ // seastar::parallel_for_each(). otherwise, we could erase a connection in
+ // the container when seastar::parallel_for_each() is still iterating in it.
+ // that'd lead to a segfault.
+ return seastar::yield(
+ ).then([this] {
+ do_close(false);
+ return pr_closed_clean.get_shared_future();
+
+ // connection may be unreferenced from the messenger,
+ // so need to hold the additional reference.
+ }).finally([conn_ref = conn.shared_from_this()] {});;
+}
+
+void ProtocolV2::do_close(
+ bool is_dispatch_reset,
+ std::optional<std::function<void()>> f_accept_new)
+{
+ if (state == state_t::CLOSING) {
+ // already closing
+ return;
+ }
+
+ bool is_replace = f_accept_new ? true : false;
+ logger().info("{} closing: reset {}, replace {}", conn,
+ is_dispatch_reset ? "yes" : "no",
+ is_replace ? "yes" : "no");
+
+ /*
+ * atomic operations
+ */
+
+ ceph_assert_always(!gate.is_closed());
+
+ // messenger registrations, must before user events
+ messenger.closing_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) {
+ messenger.unaccept_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) {
+ messenger.unregister_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ } else {
+ // cannot happen
+ ceph_assert(false);
+ }
+ if (f_accept_new) {
+ // the replacing connection must be registerred after the replaced
+ // connection is unreigsterred.
+ (*f_accept_new)();
+ }
+
+ protocol_timer.cancel();
+ if (is_socket_valid) {
+ frame_assembler->shutdown_socket<true>(&gate);
+ is_socket_valid = false;
+ }
+
+ trigger_state_phase1(state_t::CLOSING);
+ gate.dispatch_in_background(
+ "close_io", conn, [this, is_dispatch_reset, is_replace] {
+ // this is preemptive
+ return wait_switch_io_shard(
+ ).then([this, is_dispatch_reset, is_replace] {
+ trigger_state_phase2(state_t::CLOSING, io_state_t::drop);
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::close_io(reset={}, replace={})",
+ conn, cc_seq, is_dispatch_reset, is_replace);
+
+ std::ignore = gate.close(
+ ).then([this] {
+ ceph_assert_always(!need_exit_io);
+ ceph_assert_always(!pr_exit_io.has_value());
+ if (has_socket) {
+ ceph_assert_always(frame_assembler);
+ return frame_assembler->close_shutdown_socket();
+ } else {
+ return seastar::now();
+ }
+ }).then([this] {
+ logger().debug("{} closed!", conn);
+ messenger.closed_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ pr_closed_clean.set_value();
+#ifdef UNIT_TESTS_BUILT
+ closed_clean = true;
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_closed(
+ conn.get_local_shared_foreign_from_this());
+ }
+#endif
+ // connection is unreferenced from the messenger,
+ // so need to hold the additional reference.
+ }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
+ logger().error("{} closing got unexpected exception {}",
+ conn, eptr);
+ ceph_abort();
+ });
+
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, cc_seq, is_dispatch_reset, is_replace] {
+ return io_handler.close_io(cc_seq, is_dispatch_reset, is_replace);
+ });
+ // user can make changes
+ });
+ });
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h
new file mode 100644
index 000000000..dd7a1e703
--- /dev/null
+++ b/src/crimson/net/ProtocolV2.h
@@ -0,0 +1,328 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/shared_future.hh>
+#include <seastar/core/sleep.hh>
+
+#include "io_handler.h"
+
+namespace crimson::net {
+
+class ProtocolV2 final : public HandshakeListener {
+ using AuthConnectionMetaRef = seastar::lw_shared_ptr<AuthConnectionMeta>;
+
+public:
+ ProtocolV2(SocketConnection &,
+ IOHandler &);
+
+ ~ProtocolV2() final;
+
+ ProtocolV2(const ProtocolV2 &) = delete;
+ ProtocolV2(ProtocolV2 &&) = delete;
+ ProtocolV2 &operator=(const ProtocolV2 &) = delete;
+ ProtocolV2 &operator=(ProtocolV2 &&) = delete;
+
+/**
+ * as HandshakeListener
+ */
+private:
+ seastar::future<> notify_out(
+ crosscore_t::seq_t cc_seq) final;
+
+ seastar::future<> notify_out_fault(
+ crosscore_t::seq_t cc_seq,
+ const char *where,
+ std::exception_ptr,
+ io_handler_state) final;
+
+ seastar::future<> notify_mark_down(
+ crosscore_t::seq_t cc_seq) final;
+
+/*
+* as ProtocolV2 to be called by SocketConnection
+*/
+public:
+ void start_connect(const entity_addr_t& peer_addr,
+ const entity_name_t& peer_name);
+
+ void start_accept(SocketFRef&& socket,
+ const entity_addr_t& peer_addr);
+
+ seastar::future<> close_clean_yielded();
+
+#ifdef UNIT_TESTS_BUILT
+ bool is_ready() const {
+ return state == state_t::READY;
+ }
+
+ bool is_standby() const {
+ return state == state_t::STANDBY;
+ }
+
+ bool is_closed_clean() const {
+ return closed_clean;
+ }
+
+ bool is_closed() const {
+ return state == state_t::CLOSING;
+ }
+
+#endif
+private:
+ using io_state_t = IOHandler::io_state_t;
+
+ seastar::future<> wait_switch_io_shard() {
+ if (pr_switch_io_shard.has_value()) {
+ return pr_switch_io_shard->get_shared_future();
+ } else {
+ return seastar::now();
+ }
+ }
+
+ seastar::future<> wait_exit_io() {
+ if (pr_exit_io.has_value()) {
+ return pr_exit_io->get_shared_future();
+ } else {
+ assert(!need_exit_io);
+ return seastar::now();
+ }
+ }
+
+ enum class state_t {
+ NONE = 0,
+ ACCEPTING,
+ SERVER_WAIT,
+ ESTABLISHING,
+ CONNECTING,
+ READY,
+ STANDBY,
+ WAIT,
+ REPLACING,
+ CLOSING
+ };
+
+ static const char *get_state_name(state_t state) {
+ const char *const statenames[] = {"NONE",
+ "ACCEPTING",
+ "SERVER_WAIT",
+ "ESTABLISHING",
+ "CONNECTING",
+ "READY",
+ "STANDBY",
+ "WAIT",
+ "REPLACING",
+ "CLOSING"};
+ return statenames[static_cast<int>(state)];
+ }
+
+ void trigger_state_phase1(state_t new_state);
+
+ void trigger_state_phase2(state_t new_state, io_state_t new_io_state);
+
+ void trigger_state(state_t new_state, io_state_t new_io_state) {
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+ trigger_state_phase1(new_state);
+ trigger_state_phase2(new_state, new_io_state);
+ }
+
+ template <typename Func, typename T>
+ void gated_execute(const char *what, T &who, Func &&func) {
+ gate.dispatch_in_background(what, who, [this, &who, &func] {
+ if (!execution_done.available()) {
+ // discard the unready future
+ gate.dispatch_in_background(
+ "gated_execute_abandon",
+ who,
+ [fut=std::move(execution_done)]() mutable {
+ return std::move(fut);
+ }
+ );
+ }
+ seastar::promise<> pr;
+ execution_done = pr.get_future();
+ return seastar::futurize_invoke(std::forward<Func>(func)
+ ).finally([pr=std::move(pr)]() mutable {
+ pr.set_value();
+ });
+ });
+ }
+
+ void fault(state_t expected_state,
+ const char *where,
+ std::exception_ptr eptr);
+
+ void reset_session(bool is_full);
+ seastar::future<std::tuple<entity_type_t, entity_addr_t>>
+ banner_exchange(bool is_connect);
+
+ enum class next_step_t {
+ ready,
+ wait,
+ none, // protocol should have been aborted or failed
+ };
+
+ // CONNECTING (client)
+ seastar::future<> handle_auth_reply();
+ inline seastar::future<> client_auth() {
+ std::vector<uint32_t> empty;
+ return client_auth(empty);
+ }
+ seastar::future<> client_auth(std::vector<uint32_t> &allowed_methods);
+
+ seastar::future<next_step_t> process_wait();
+ seastar::future<next_step_t> client_connect();
+ seastar::future<next_step_t> client_reconnect();
+ void execute_connecting();
+
+ // ACCEPTING (server)
+ seastar::future<> _auth_bad_method(int r);
+ seastar::future<> _handle_auth_request(bufferlist& auth_payload, bool more);
+ seastar::future<> server_auth();
+
+ bool validate_peer_name(const entity_name_t& peer_name) const;
+ seastar::future<next_step_t> send_wait();
+ seastar::future<next_step_t> reuse_connection(ProtocolV2* existing_proto,
+ bool do_reset=false,
+ bool reconnect=false,
+ uint64_t conn_seq=0,
+ uint64_t msg_seq=0);
+
+ seastar::future<next_step_t> handle_existing_connection(SocketConnectionRef existing_conn);
+ seastar::future<next_step_t> server_connect();
+
+ seastar::future<next_step_t> read_reconnect();
+ seastar::future<next_step_t> send_retry(uint64_t connect_seq);
+ seastar::future<next_step_t> send_retry_global(uint64_t global_seq);
+ seastar::future<next_step_t> send_reset(bool full);
+ seastar::future<next_step_t> server_reconnect();
+
+ void execute_accepting();
+
+ // CONNECTING/ACCEPTING
+ seastar::future<> finish_auth();
+
+ // ESTABLISHING
+ void execute_establishing(SocketConnectionRef existing_conn);
+
+ // ESTABLISHING/REPLACING (server)
+ seastar::future<> send_server_ident();
+
+ // REPLACING (server)
+ void trigger_replacing(bool reconnect,
+ bool do_reset,
+ FrameAssemblerV2::mover_t &&mover,
+ AuthConnectionMetaRef&& new_auth_meta,
+ uint64_t new_peer_global_seq,
+ // !reconnect
+ uint64_t new_client_cookie,
+ entity_name_t new_peer_name,
+ uint64_t new_conn_features,
+ uint64_t new_peer_supported_features,
+ // reconnect
+ uint64_t new_connect_seq,
+ uint64_t new_msg_seq);
+
+ // READY
+ void execute_ready();
+
+ // STANDBY
+ void execute_standby();
+
+ // WAIT
+ void execute_wait(bool max_backoff);
+
+ // SERVER_WAIT
+ void execute_server_wait();
+
+ // CLOSING
+ // reentrant
+ void do_close(bool is_dispatch_reset,
+ std::optional<std::function<void()>> f_accept_new=std::nullopt);
+
+private:
+ SocketConnection &conn;
+
+ SocketMessenger &messenger;
+
+ IOHandler &io_handler;
+
+ // asynchronously populated from io_handler
+ io_handler_state io_states;
+
+ crosscore_t crosscore;
+
+ bool has_socket = false;
+
+ // the socket exists and it is not shutdown
+ bool is_socket_valid = false;
+
+ FrameAssemblerV2Ref frame_assembler;
+
+ bool need_notify_out = false;
+
+ std::optional<seastar::shared_promise<>> pr_switch_io_shard;
+
+ bool need_exit_io = false;
+
+ std::optional<seastar::shared_promise<>> pr_exit_io;
+
+ AuthConnectionMetaRef auth_meta;
+
+ crimson::common::Gated gate;
+
+ seastar::shared_promise<> pr_closed_clean;
+
+#ifdef UNIT_TESTS_BUILT
+ bool closed_clean = false;
+
+#endif
+ state_t state = state_t::NONE;
+
+ uint64_t peer_supported_features = 0;
+
+ uint64_t client_cookie = 0;
+ uint64_t server_cookie = 0;
+ uint64_t global_seq = 0;
+ uint64_t peer_global_seq = 0;
+ uint64_t connect_seq = 0;
+
+ seastar::future<> execution_done = seastar::now();
+
+ class Timer {
+ double last_dur_ = 0.0;
+ const SocketConnection& conn;
+ std::optional<seastar::abort_source> as;
+ public:
+ Timer(SocketConnection& conn) : conn(conn) {}
+ double last_dur() const { return last_dur_; }
+ seastar::future<> backoff(double seconds);
+ void cancel() {
+ last_dur_ = 0.0;
+ if (as) {
+ as->request_abort();
+ as = std::nullopt;
+ }
+ }
+ };
+ Timer protocol_timer;
+};
+
+struct create_handlers_ret {
+ std::unique_ptr<ConnectionHandler> io_handler;
+ std::unique_ptr<ProtocolV2> protocol;
+};
+inline create_handlers_ret create_handlers(ChainedDispatchers &dispatchers, SocketConnection &conn) {
+ std::unique_ptr<ConnectionHandler> io_handler = std::make_unique<IOHandler>(dispatchers, conn);
+ IOHandler &io_handler_concrete = static_cast<IOHandler&>(*io_handler);
+ auto protocol = std::make_unique<ProtocolV2>(conn, io_handler_concrete);
+ io_handler_concrete.set_handshake_listener(*protocol);
+ return {std::move(io_handler), std::move(protocol)};
+}
+
+} // namespace crimson::net
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::net::ProtocolV2> : fmt::ostream_formatter {};
+#endif
diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc
new file mode 100644
index 000000000..95b1e2250
--- /dev/null
+++ b/src/crimson/net/Socket.cc
@@ -0,0 +1,523 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Socket.h"
+
+#include <seastar/core/sleep.hh>
+#include <seastar/core/when_all.hh>
+#include <seastar/net/packet.hh>
+
+#include "crimson/common/log.h"
+#include "Errors.h"
+
+using crimson::common::local_conf;
+
+namespace crimson::net {
+
+namespace {
+
+seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+}
+
+using tmp_buf = seastar::temporary_buffer<char>;
+using packet = seastar::net::packet;
+
+// an input_stream consumer that reads buffer segments into a bufferlist up to
+// the given number of remaining bytes
+struct bufferlist_consumer {
+ bufferlist& bl;
+ size_t& remaining;
+
+ bufferlist_consumer(bufferlist& bl, size_t& remaining)
+ : bl(bl), remaining(remaining) {}
+
+ using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type;
+
+ // consume some or all of a buffer segment
+ seastar::future<consumption_result_type> operator()(tmp_buf&& data) {
+ if (remaining >= data.size()) {
+ // consume the whole buffer
+ remaining -= data.size();
+ bl.append(buffer::create(std::move(data)));
+ if (remaining > 0) {
+ // return none to request more segments
+ return seastar::make_ready_future<consumption_result_type>(
+ seastar::continue_consuming{});
+ } else {
+ // return an empty buffer to singal that we're done
+ return seastar::make_ready_future<consumption_result_type>(
+ consumption_result_type::stop_consuming_type({}));
+ }
+ }
+ if (remaining > 0) {
+ // consume the front
+ bl.append(buffer::create(data.share(0, remaining)));
+ data.trim_front(remaining);
+ remaining = 0;
+ }
+ // give the rest back to signal that we're done
+ return seastar::make_ready_future<consumption_result_type>(
+ consumption_result_type::stop_consuming_type{std::move(data)});
+ };
+};
+
+seastar::future<> inject_delay()
+{
+ if (float delay_period = local_conf()->ms_inject_internal_delays;
+ delay_period) {
+ logger().debug("Socket::inject_delay: sleep for {}", delay_period);
+ return seastar::sleep(
+ std::chrono::milliseconds((int)(delay_period * 1000.0)));
+ }
+ return seastar::now();
+}
+
+void inject_failure()
+{
+ if (local_conf()->ms_inject_socket_failures) {
+ uint64_t rand =
+ ceph::util::generate_random_number<uint64_t>(1, RAND_MAX);
+ if (rand % local_conf()->ms_inject_socket_failures == 0) {
+ logger().warn("Socket::inject_failure: injecting socket failure");
+ throw std::system_error(make_error_code(
+ error::negotiation_failure));
+ }
+ }
+}
+
+} // anonymous namespace
+
+Socket::Socket(
+ seastar::connected_socket &&_socket,
+ side_t _side,
+ uint16_t e_port,
+ construct_tag)
+ : sid{seastar::this_shard_id()},
+ socket(std::move(_socket)),
+ in(socket.input()),
+ // the default buffer size 8192 is too small that may impact our write
+ // performance. see seastar::net::connected_socket::output()
+ out(socket.output(65536)),
+ socket_is_shutdown(false),
+ side(_side),
+ ephemeral_port(e_port)
+{
+ if (local_conf()->ms_tcp_nodelay) {
+ socket.set_nodelay(true);
+ }
+}
+
+Socket::~Socket()
+{
+ assert(seastar::this_shard_id() == sid);
+#ifndef NDEBUG
+ assert(closed);
+#endif
+}
+
+seastar::future<bufferlist>
+Socket::read(size_t bytes)
+{
+ assert(seastar::this_shard_id() == sid);
+#ifdef UNIT_TESTS_BUILT
+ return try_trap_pre(next_trap_read).then([bytes, this] {
+#endif
+ if (bytes == 0) {
+ return seastar::make_ready_future<bufferlist>();
+ }
+ r.buffer.clear();
+ r.remaining = bytes;
+ return in.consume(bufferlist_consumer{r.buffer, r.remaining}).then([this] {
+ if (r.remaining) { // throw on short reads
+ throw std::system_error(make_error_code(error::read_eof));
+ }
+ inject_failure();
+ return inject_delay().then([this] {
+ return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
+ });
+ });
+#ifdef UNIT_TESTS_BUILT
+ }).then([this](auto buf) {
+ return try_trap_post(next_trap_read
+ ).then([buf = std::move(buf)]() mutable {
+ return std::move(buf);
+ });
+ });
+#endif
+}
+
+seastar::future<bufferptr>
+Socket::read_exactly(size_t bytes) {
+ assert(seastar::this_shard_id() == sid);
+#ifdef UNIT_TESTS_BUILT
+ return try_trap_pre(next_trap_read).then([bytes, this] {
+#endif
+ if (bytes == 0) {
+ return seastar::make_ready_future<bufferptr>();
+ }
+ return in.read_exactly(bytes).then([bytes](auto buf) {
+ bufferptr ptr(buffer::create(buf.share()));
+ if (ptr.length() < bytes) {
+ throw std::system_error(make_error_code(error::read_eof));
+ }
+ inject_failure();
+ return inject_delay(
+ ).then([ptr = std::move(ptr)]() mutable {
+ return seastar::make_ready_future<bufferptr>(std::move(ptr));
+ });
+ });
+#ifdef UNIT_TESTS_BUILT
+ }).then([this](auto ptr) {
+ return try_trap_post(next_trap_read
+ ).then([ptr = std::move(ptr)]() mutable {
+ return std::move(ptr);
+ });
+ });
+#endif
+}
+
+seastar::future<>
+Socket::write(bufferlist buf)
+{
+ assert(seastar::this_shard_id() == sid);
+#ifdef UNIT_TESTS_BUILT
+ return try_trap_pre(next_trap_write
+ ).then([buf = std::move(buf), this]() mutable {
+#endif
+ inject_failure();
+ return inject_delay(
+ ).then([buf = std::move(buf), this]() mutable {
+ packet p(std::move(buf));
+ return out.write(std::move(p));
+ });
+#ifdef UNIT_TESTS_BUILT
+ }).then([this] {
+ return try_trap_post(next_trap_write);
+ });
+#endif
+}
+
+seastar::future<>
+Socket::flush()
+{
+ assert(seastar::this_shard_id() == sid);
+ inject_failure();
+ return inject_delay().then([this] {
+ return out.flush();
+ });
+}
+
+seastar::future<>
+Socket::write_flush(bufferlist buf)
+{
+ assert(seastar::this_shard_id() == sid);
+#ifdef UNIT_TESTS_BUILT
+ return try_trap_pre(next_trap_write
+ ).then([buf = std::move(buf), this]() mutable {
+#endif
+ inject_failure();
+ return inject_delay(
+ ).then([buf = std::move(buf), this]() mutable {
+ packet p(std::move(buf));
+ return out.write(std::move(p)
+ ).then([this] {
+ return out.flush();
+ });
+ });
+#ifdef UNIT_TESTS_BUILT
+ }).then([this] {
+ return try_trap_post(next_trap_write);
+ });
+#endif
+}
+
+void Socket::shutdown()
+{
+ assert(seastar::this_shard_id() == sid);
+ socket_is_shutdown = true;
+ socket.shutdown_input();
+ socket.shutdown_output();
+}
+
+static inline seastar::future<>
+close_and_handle_errors(seastar::output_stream<char>& out)
+{
+ return out.close().handle_exception_type([](const std::system_error& e) {
+ if (e.code() != std::errc::broken_pipe &&
+ e.code() != std::errc::connection_reset) {
+ logger().error("Socket::close(): unexpected error {}", e.what());
+ ceph_abort();
+ }
+ // can happen when out is already shutdown, ignore
+ });
+}
+
+seastar::future<>
+Socket::close()
+{
+ assert(seastar::this_shard_id() == sid);
+#ifndef NDEBUG
+ ceph_assert_always(!closed);
+ closed = true;
+#endif
+ return seastar::when_all_succeed(
+ inject_delay(),
+ in.close(),
+ close_and_handle_errors(out)
+ ).then_unpack([] {
+ return seastar::make_ready_future<>();
+ }).handle_exception([](auto eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().error("Socket::close(): unexpected exception {}", e_what);
+ ceph_abort();
+ });
+}
+
+seastar::future<SocketRef>
+Socket::connect(const entity_addr_t &peer_addr)
+{
+ inject_failure();
+ return inject_delay(
+ ).then([peer_addr] {
+ return seastar::connect(peer_addr.in4_addr());
+ }).then([peer_addr](seastar::connected_socket socket) {
+ auto ret = std::make_unique<Socket>(
+ std::move(socket), side_t::connector, 0, construct_tag{});
+ logger().debug("Socket::connect(): connected to {}, socket {}",
+ peer_addr, fmt::ptr(ret));
+ return ret;
+ });
+}
+
+#ifdef UNIT_TESTS_BUILT
+void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) {
+ assert(seastar::this_shard_id() == sid);
+ blocker = blocker_;
+ if (type == bp_type_t::READ) {
+ ceph_assert_always(next_trap_read == bp_action_t::CONTINUE);
+ next_trap_read = action;
+ } else { // type == bp_type_t::WRITE
+ if (next_trap_write == bp_action_t::CONTINUE) {
+ next_trap_write = action;
+ } else if (next_trap_write == bp_action_t::FAULT) {
+ // do_sweep_messages() may combine multiple write events into one socket write
+ ceph_assert_always(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE);
+ } else {
+ ceph_abort();
+ }
+ }
+}
+
+seastar::future<>
+Socket::try_trap_pre(bp_action_t& trap) {
+ auto action = trap;
+ trap = bp_action_t::CONTINUE;
+ switch (action) {
+ case bp_action_t::CONTINUE:
+ break;
+ case bp_action_t::FAULT:
+ logger().info("[Test] got FAULT");
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ case bp_action_t::BLOCK:
+ logger().info("[Test] got BLOCK");
+ return blocker->block();
+ case bp_action_t::STALL:
+ trap = action;
+ break;
+ default:
+ ceph_abort("unexpected action from trap");
+ }
+ return seastar::make_ready_future<>();
+}
+
+seastar::future<>
+Socket::try_trap_post(bp_action_t& trap) {
+ auto action = trap;
+ trap = bp_action_t::CONTINUE;
+ switch (action) {
+ case bp_action_t::CONTINUE:
+ break;
+ case bp_action_t::STALL:
+ logger().info("[Test] got STALL and block");
+ force_shutdown();
+ return blocker->block();
+ default:
+ ceph_abort("unexpected action from trap");
+ }
+ return seastar::make_ready_future<>();
+}
+#endif
+
+ShardedServerSocket::ShardedServerSocket(
+ seastar::shard_id sid,
+ bool dispatch_only_on_primary_sid,
+ construct_tag)
+ : primary_sid{sid}, dispatch_only_on_primary_sid{dispatch_only_on_primary_sid}
+{
+}
+
+ShardedServerSocket::~ShardedServerSocket()
+{
+ assert(!listener);
+ // detect whether user have called destroy() properly
+ ceph_assert_always(!service);
+}
+
+listen_ertr::future<>
+ShardedServerSocket::listen(entity_addr_t addr)
+{
+ ceph_assert_always(seastar::this_shard_id() == primary_sid);
+ logger().debug("ShardedServerSocket({})::listen()...", addr);
+ return this->container().invoke_on_all([addr](auto& ss) {
+ ss.listen_addr = addr;
+ seastar::socket_address s_addr(addr.in4_addr());
+ seastar::listen_options lo;
+ lo.reuse_address = true;
+ if (ss.dispatch_only_on_primary_sid) {
+ lo.set_fixed_cpu(ss.primary_sid);
+ }
+ ss.listener = seastar::listen(s_addr, lo);
+ }).then([] {
+ return listen_ertr::now();
+ }).handle_exception_type(
+ [addr](const std::system_error& e) -> listen_ertr::future<> {
+ if (e.code() == std::errc::address_in_use) {
+ logger().debug("ShardedServerSocket({})::listen(): address in use", addr);
+ return crimson::ct_error::address_in_use::make();
+ } else if (e.code() == std::errc::address_not_available) {
+ logger().debug("ShardedServerSocket({})::listen(): address not available",
+ addr);
+ return crimson::ct_error::address_not_available::make();
+ }
+ logger().error("ShardedServerSocket({})::listen(): "
+ "got unexpeted error {}", addr, e.what());
+ ceph_abort();
+ });
+}
+
+seastar::future<>
+ShardedServerSocket::accept(accept_func_t &&_fn_accept)
+{
+ ceph_assert_always(seastar::this_shard_id() == primary_sid);
+ logger().debug("ShardedServerSocket({})::accept()...", listen_addr);
+ return this->container().invoke_on_all([_fn_accept](auto &ss) {
+ assert(ss.listener);
+ ss.fn_accept = _fn_accept;
+ // gate accepting
+ // ShardedServerSocket::shutdown() will drain the continuations in the gate
+ // so ignore the returned future
+ std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] {
+ return seastar::keep_doing([&ss] {
+ return ss.listener->accept(
+ ).then([&ss](seastar::accept_result accept_result) {
+#ifndef NDEBUG
+ if (ss.dispatch_only_on_primary_sid) {
+ // see seastar::listen_options::set_fixed_cpu()
+ ceph_assert_always(seastar::this_shard_id() == ss.primary_sid);
+ }
+#endif
+ auto [socket, paddr] = std::move(accept_result);
+ entity_addr_t peer_addr;
+ peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
+ peer_addr.set_type(ss.listen_addr.get_type());
+ SocketRef _socket = std::make_unique<Socket>(
+ std::move(socket), Socket::side_t::acceptor,
+ peer_addr.get_port(), Socket::construct_tag{});
+ logger().debug("ShardedServerSocket({})::accept(): accepted peer {}, "
+ "socket {}, dispatch_only_on_primary_sid = {}",
+ ss.listen_addr, peer_addr, fmt::ptr(_socket),
+ ss.dispatch_only_on_primary_sid);
+ std::ignore = seastar::with_gate(
+ ss.shutdown_gate,
+ [socket=std::move(_socket), peer_addr, &ss]() mutable {
+ return ss.fn_accept(std::move(socket), peer_addr
+ ).handle_exception([&ss, peer_addr](auto eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().error("ShardedServerSocket({})::accept(): "
+ "fn_accept(s, {}) got unexpected exception {}",
+ ss.listen_addr, peer_addr, e_what);
+ ceph_abort();
+ });
+ });
+ });
+ }).handle_exception_type([&ss](const std::system_error& e) {
+ if (e.code() == std::errc::connection_aborted ||
+ e.code() == std::errc::invalid_argument) {
+ logger().debug("ShardedServerSocket({})::accept(): stopped ({})",
+ ss.listen_addr, e.what());
+ } else {
+ throw;
+ }
+ }).handle_exception([&ss](auto eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().error("ShardedServerSocket({})::accept(): "
+ "got unexpected exception {}", ss.listen_addr, e_what);
+ ceph_abort();
+ });
+ });
+ });
+}
+
+seastar::future<>
+ShardedServerSocket::shutdown_destroy()
+{
+ assert(seastar::this_shard_id() == primary_sid);
+ logger().debug("ShardedServerSocket({})::shutdown_destroy()...", listen_addr);
+ // shutdown shards
+ return this->container().invoke_on_all([](auto& ss) {
+ if (ss.listener) {
+ ss.listener->abort_accept();
+ }
+ return ss.shutdown_gate.close();
+ }).then([this] {
+ // destroy shards
+ return this->container().invoke_on_all([](auto& ss) {
+ assert(ss.shutdown_gate.is_closed());
+ ss.listen_addr = entity_addr_t();
+ ss.listener.reset();
+ });
+ }).then([this] {
+ // stop the sharded service: we should only construct/stop shards on #0
+ return this->container().invoke_on(0, [](auto& ss) {
+ assert(ss.service);
+ return ss.service->stop().finally([cleanup = std::move(ss.service)] {});
+ });
+ });
+}
+
+seastar::future<ShardedServerSocket*>
+ShardedServerSocket::create(bool dispatch_only_on_this_shard)
+{
+ auto primary_sid = seastar::this_shard_id();
+ // start the sharded service: we should only construct/stop shards on #0
+ return seastar::smp::submit_to(0, [primary_sid, dispatch_only_on_this_shard] {
+ auto service = std::make_unique<sharded_service_t>();
+ return service->start(
+ primary_sid, dispatch_only_on_this_shard, construct_tag{}
+ ).then([service = std::move(service)]() mutable {
+ auto p_shard = service.get();
+ p_shard->local().service = std::move(service);
+ return p_shard;
+ });
+ }).then([](auto p_shard) {
+ return &p_shard->local();
+ });
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h
new file mode 100644
index 000000000..478f2d630
--- /dev/null
+++ b/src/crimson/net/Socket.h
@@ -0,0 +1,201 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sharded.hh>
+
+#include "include/buffer.h"
+
+#include "crimson/common/log.h"
+#include "Errors.h"
+#include "Fwd.h"
+
+#ifdef UNIT_TESTS_BUILT
+#include "Interceptor.h"
+#endif
+
+namespace crimson::net {
+
+class Socket;
+using SocketRef = std::unique_ptr<Socket>;
+using SocketFRef = seastar::foreign_ptr<SocketRef>;
+
+class Socket {
+ struct construct_tag {};
+
+public:
+ // if acceptor side, peer is using a different port (ephemeral_port)
+ // if connector side, I'm using a different port (ephemeral_port)
+ enum class side_t {
+ acceptor,
+ connector
+ };
+ Socket(seastar::connected_socket &&, side_t, uint16_t e_port, construct_tag);
+
+ ~Socket();
+
+ Socket(Socket&& o) = delete;
+
+ seastar::shard_id get_shard_id() const {
+ return sid;
+ }
+
+ side_t get_side() const {
+ return side;
+ }
+
+ uint16_t get_ephemeral_port() const {
+ return ephemeral_port;
+ }
+
+ seastar::socket_address get_local_address() const {
+ return socket.local_address();
+ }
+
+ bool is_shutdown() const {
+ assert(seastar::this_shard_id() == sid);
+ return socket_is_shutdown;
+ }
+
+ // learn my ephemeral_port as connector.
+ // unfortunately, there's no way to identify which port I'm using as
+ // connector with current seastar interface.
+ void learn_ephemeral_port_as_connector(uint16_t port) {
+ assert(side == side_t::connector &&
+ (ephemeral_port == 0 || ephemeral_port == port));
+ ephemeral_port = port;
+ }
+
+ /// read the requested number of bytes into a bufferlist
+ seastar::future<bufferlist> read(size_t bytes);
+
+ seastar::future<bufferptr> read_exactly(size_t bytes);
+
+ seastar::future<> write(bufferlist);
+
+ seastar::future<> flush();
+
+ seastar::future<> write_flush(bufferlist);
+
+ // preemptively disable further reads or writes, can only be shutdown once.
+ void shutdown();
+
+ /// Socket can only be closed once.
+ seastar::future<> close();
+
+ static seastar::future<SocketRef>
+ connect(const entity_addr_t& peer_addr);
+
+ /*
+ * test interfaces
+ */
+
+ // shutdown for tests
+ void force_shutdown() {
+ assert(seastar::this_shard_id() == sid);
+ socket.shutdown_input();
+ socket.shutdown_output();
+ }
+
+ // shutdown input_stream only, for tests
+ void force_shutdown_in() {
+ assert(seastar::this_shard_id() == sid);
+ socket.shutdown_input();
+ }
+
+ // shutdown output_stream only, for tests
+ void force_shutdown_out() {
+ assert(seastar::this_shard_id() == sid);
+ socket.shutdown_output();
+ }
+
+private:
+ const seastar::shard_id sid;
+ seastar::connected_socket socket;
+ seastar::input_stream<char> in;
+ seastar::output_stream<char> out;
+ bool socket_is_shutdown;
+ side_t side;
+ uint16_t ephemeral_port;
+
+#ifndef NDEBUG
+ bool closed = false;
+#endif
+
+ /// buffer state for read()
+ struct {
+ bufferlist buffer;
+ size_t remaining;
+ } r;
+
+#ifdef UNIT_TESTS_BUILT
+public:
+ void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
+
+private:
+ seastar::future<> try_trap_pre(bp_action_t& trap);
+
+ seastar::future<> try_trap_post(bp_action_t& trap);
+
+ bp_action_t next_trap_read = bp_action_t::CONTINUE;
+ bp_action_t next_trap_write = bp_action_t::CONTINUE;
+ socket_blocker* blocker = nullptr;
+
+#endif
+ friend class ShardedServerSocket;
+};
+
+using listen_ertr = crimson::errorator<
+ crimson::ct_error::address_in_use, // The address is already bound
+ crimson::ct_error::address_not_available // https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/
+ >;
+
+class ShardedServerSocket
+ : public seastar::peering_sharded_service<ShardedServerSocket> {
+ struct construct_tag {};
+
+public:
+ ShardedServerSocket(
+ seastar::shard_id sid,
+ bool dispatch_only_on_primary_sid,
+ construct_tag);
+
+ ~ShardedServerSocket();
+
+ ShardedServerSocket(ShardedServerSocket&&) = delete;
+ ShardedServerSocket(const ShardedServerSocket&) = delete;
+ ShardedServerSocket& operator=(ShardedServerSocket&&) = delete;
+ ShardedServerSocket& operator=(const ShardedServerSocket&) = delete;
+
+ bool is_fixed_shard_dispatching() const {
+ return dispatch_only_on_primary_sid;
+ }
+
+ listen_ertr::future<> listen(entity_addr_t addr);
+
+ using accept_func_t =
+ std::function<seastar::future<>(SocketRef, entity_addr_t)>;
+ seastar::future<> accept(accept_func_t &&_fn_accept);
+
+ seastar::future<> shutdown_destroy();
+
+ static seastar::future<ShardedServerSocket*> create(
+ bool dispatch_only_on_this_shard);
+
+private:
+ const seastar::shard_id primary_sid;
+ /// XXX: Remove once all infrastructure uses multi-core messenger
+ const bool dispatch_only_on_primary_sid;
+ entity_addr_t listen_addr;
+ std::optional<seastar::server_socket> listener;
+ seastar::gate shutdown_gate;
+ accept_func_t fn_accept;
+
+ using sharded_service_t = seastar::sharded<ShardedServerSocket>;
+ std::unique_ptr<sharded_service_t> service;
+};
+
+} // namespace crimson::net
diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc
new file mode 100644
index 000000000..57e5c12c1
--- /dev/null
+++ b/src/crimson/net/SocketConnection.cc
@@ -0,0 +1,220 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "SocketConnection.h"
+
+#include "ProtocolV2.h"
+#include "SocketMessenger.h"
+
+#ifdef UNIT_TESTS_BUILT
+#include "Interceptor.h"
+#endif
+
+using std::ostream;
+using crimson::common::local_conf;
+
+namespace crimson::net {
+
+SocketConnection::SocketConnection(SocketMessenger& messenger,
+ ChainedDispatchers& dispatchers)
+ : msgr_sid{messenger.get_shard_id()}, messenger(messenger)
+{
+ auto ret = create_handlers(dispatchers, *this);
+ io_handler = std::move(ret.io_handler);
+ protocol = std::move(ret.protocol);
+#ifdef UNIT_TESTS_BUILT
+ if (messenger.interceptor) {
+ interceptor = messenger.interceptor;
+ interceptor->register_conn(this->get_local_shared_foreign_from_this());
+ }
+#endif
+}
+
+SocketConnection::~SocketConnection() {}
+
+bool SocketConnection::is_connected() const
+{
+ return io_handler->is_connected();
+}
+
+#ifdef UNIT_TESTS_BUILT
+bool SocketConnection::is_protocol_ready() const
+{
+ assert(seastar::this_shard_id() == msgr_sid);
+ return protocol->is_ready();
+}
+
+bool SocketConnection::is_protocol_standby() const {
+ assert(seastar::this_shard_id() == msgr_sid);
+ return protocol->is_standby();
+}
+
+bool SocketConnection::is_protocol_closed() const
+{
+ assert(seastar::this_shard_id() == msgr_sid);
+ return protocol->is_closed();
+}
+
+bool SocketConnection::is_protocol_closed_clean() const
+{
+ assert(seastar::this_shard_id() == msgr_sid);
+ return protocol->is_closed_clean();
+}
+
+#endif
+bool SocketConnection::peer_wins() const
+{
+ assert(seastar::this_shard_id() == msgr_sid);
+ return (messenger.get_myaddr() > peer_addr || policy.server);
+}
+
+seastar::future<> SocketConnection::send(MessageURef _msg)
+{
+ // may be invoked from any core
+ MessageFRef msg = seastar::make_foreign(std::move(_msg));
+ return io_handler->send(std::move(msg));
+}
+
+seastar::future<> SocketConnection::send_keepalive()
+{
+ // may be invoked from any core
+ return io_handler->send_keepalive();
+}
+
+SocketConnection::clock_t::time_point
+SocketConnection::get_last_keepalive() const
+{
+ return io_handler->get_last_keepalive();
+}
+
+SocketConnection::clock_t::time_point
+SocketConnection::get_last_keepalive_ack() const
+{
+ return io_handler->get_last_keepalive_ack();
+}
+
+void SocketConnection::set_last_keepalive_ack(clock_t::time_point when)
+{
+ io_handler->set_last_keepalive_ack(when);
+}
+
+void SocketConnection::mark_down()
+{
+ io_handler->mark_down();
+}
+
+void
+SocketConnection::start_connect(const entity_addr_t& _peer_addr,
+ const entity_name_t& _peer_name)
+{
+ assert(seastar::this_shard_id() == msgr_sid);
+ protocol->start_connect(_peer_addr, _peer_name);
+}
+
+void
+SocketConnection::start_accept(SocketFRef&& sock,
+ const entity_addr_t& _peer_addr)
+{
+ assert(seastar::this_shard_id() == msgr_sid);
+ protocol->start_accept(std::move(sock), _peer_addr);
+}
+
+seastar::future<>
+SocketConnection::close_clean_yielded()
+{
+ assert(seastar::this_shard_id() == msgr_sid);
+ return protocol->close_clean_yielded();
+}
+
+seastar::socket_address SocketConnection::get_local_address() const {
+ assert(seastar::this_shard_id() == msgr_sid);
+ return socket->get_local_address();
+}
+
+ConnectionRef
+SocketConnection::get_local_shared_foreign_from_this()
+{
+ assert(seastar::this_shard_id() == msgr_sid);
+ return make_local_shared_foreign(
+ seastar::make_foreign(shared_from_this()));
+}
+
+SocketMessenger &
+SocketConnection::get_messenger() const
+{
+ assert(seastar::this_shard_id() == msgr_sid);
+ return messenger;
+}
+
+seastar::shard_id
+SocketConnection::get_messenger_shard_id() const
+{
+ return msgr_sid;
+}
+
+void SocketConnection::set_peer_type(entity_type_t peer_type) {
+ assert(seastar::this_shard_id() == msgr_sid);
+ // it is not allowed to assign an unknown value when the current
+ // value is known
+ assert(!(peer_type == 0 &&
+ peer_name.type() != 0));
+ // it is not allowed to assign a different known value when the
+ // current value is also known.
+ assert(!(peer_type != 0 &&
+ peer_name.type() != 0 &&
+ peer_type != peer_name.type()));
+ peer_name._type = peer_type;
+}
+
+void SocketConnection::set_peer_id(int64_t peer_id) {
+ assert(seastar::this_shard_id() == msgr_sid);
+ // it is not allowed to assign an unknown value when the current
+ // value is known
+ assert(!(peer_id == entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW));
+ // it is not allowed to assign a different known value when the
+ // current value is also known.
+ assert(!(peer_id != entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW &&
+ peer_id != peer_name.num()));
+ peer_name._num = peer_id;
+}
+
+void SocketConnection::set_features(uint64_t f) {
+ assert(seastar::this_shard_id() == msgr_sid);
+ features = f;
+}
+
+void SocketConnection::set_socket(Socket *s) {
+ assert(seastar::this_shard_id() == msgr_sid);
+ socket = s;
+}
+
+void SocketConnection::print(ostream& out) const {
+ out << (void*)this << " ";
+ messenger.print(out);
+ if (seastar::this_shard_id() != msgr_sid) {
+ out << " >> " << get_peer_name() << " " << peer_addr;
+ } else if (!socket) {
+ out << " >> " << get_peer_name() << " " << peer_addr;
+ } else if (socket->get_side() == Socket::side_t::acceptor) {
+ out << " >> " << get_peer_name() << " " << peer_addr
+ << "@" << socket->get_ephemeral_port();
+ } else { // socket->get_side() == Socket::side_t::connector
+ out << "@" << socket->get_ephemeral_port()
+ << " >> " << get_peer_name() << " " << peer_addr;
+ }
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h
new file mode 100644
index 000000000..823d6c574
--- /dev/null
+++ b/src/crimson/net/SocketConnection.h
@@ -0,0 +1,236 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <seastar/core/sharded.hh>
+
+#include "msg/Policy.h"
+#include "crimson/common/throttle.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Socket.h"
+
+namespace crimson::net {
+
+class ProtocolV2;
+class SocketMessenger;
+class SocketConnection;
+using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
+
+#ifdef UNIT_TESTS_BUILT
+class Interceptor;
+#endif
+
+/**
+ * ConnectionHandler
+ *
+ * The interface class to implement Connection, called by SocketConnection.
+ *
+ * The operations must be done in get_shard_id().
+ */
+class ConnectionHandler {
+public:
+ using clock_t = seastar::lowres_system_clock;
+
+ virtual ~ConnectionHandler() = default;
+
+ ConnectionHandler(const ConnectionHandler &) = delete;
+ ConnectionHandler(ConnectionHandler &&) = delete;
+ ConnectionHandler &operator=(const ConnectionHandler &) = delete;
+ ConnectionHandler &operator=(ConnectionHandler &&) = delete;
+
+ virtual seastar::shard_id get_shard_id() const = 0;
+
+ virtual bool is_connected() const = 0;
+
+ virtual seastar::future<> send(MessageFRef) = 0;
+
+ virtual seastar::future<> send_keepalive() = 0;
+
+ virtual clock_t::time_point get_last_keepalive() const = 0;
+
+ virtual clock_t::time_point get_last_keepalive_ack() const = 0;
+
+ virtual void set_last_keepalive_ack(clock_t::time_point) = 0;
+
+ virtual void mark_down() = 0;
+
+protected:
+ ConnectionHandler() = default;
+};
+
+class SocketConnection : public Connection {
+ /*
+ * Connection interfaces, public to users
+ * Working in ConnectionHandler::get_shard_id()
+ */
+ public:
+ SocketConnection(SocketMessenger& messenger,
+ ChainedDispatchers& dispatchers);
+
+ ~SocketConnection() override;
+
+ const seastar::shard_id get_shard_id() const override {
+ return io_handler->get_shard_id();
+ }
+
+ const entity_name_t &get_peer_name() const override {
+ return peer_name;
+ }
+
+ const entity_addr_t &get_peer_addr() const override {
+ return peer_addr;
+ }
+
+ const entity_addr_t &get_peer_socket_addr() const override {
+ return target_addr;
+ }
+
+ uint64_t get_features() const override {
+ return features;
+ }
+
+ bool is_connected() const override;
+
+ seastar::future<> send(MessageURef msg) override;
+
+ seastar::future<> send_keepalive() override;
+
+ clock_t::time_point get_last_keepalive() const override;
+
+ clock_t::time_point get_last_keepalive_ack() const override;
+
+ void set_last_keepalive_ack(clock_t::time_point when) override;
+
+ void mark_down() override;
+
+ bool has_user_private() const override {
+ return user_private != nullptr;
+ }
+
+ user_private_t &get_user_private() override {
+ assert(has_user_private());
+ return *user_private;
+ }
+
+ void set_user_private(std::unique_ptr<user_private_t> new_user_private) override {
+ assert(!has_user_private());
+ user_private = std::move(new_user_private);
+ }
+
+ void print(std::ostream& out) const override;
+
+ /*
+ * Public to SocketMessenger
+ * Working in SocketMessenger::get_shard_id();
+ */
+ public:
+ /// start a handshake from the client's perspective,
+ /// only call when SocketConnection first construct
+ void start_connect(const entity_addr_t& peer_addr,
+ const entity_name_t& peer_name);
+
+ /// start a handshake from the server's perspective,
+ /// only call when SocketConnection first construct
+ void start_accept(SocketFRef&& socket,
+ const entity_addr_t& peer_addr);
+
+ seastar::future<> close_clean_yielded();
+
+ seastar::socket_address get_local_address() const;
+
+ seastar::shard_id get_messenger_shard_id() const;
+
+ SocketMessenger &get_messenger() const;
+
+ ConnectionRef get_local_shared_foreign_from_this();
+
+private:
+ void set_peer_type(entity_type_t peer_type);
+
+ void set_peer_id(int64_t peer_id);
+
+ void set_peer_name(entity_name_t name) {
+ set_peer_type(name.type());
+ set_peer_id(name.num());
+ }
+
+ void set_features(uint64_t f);
+
+ void set_socket(Socket *s);
+
+#ifdef UNIT_TESTS_BUILT
+ bool is_protocol_ready() const override;
+
+ bool is_protocol_standby() const override;
+
+ bool is_protocol_closed_clean() const override;
+
+ bool is_protocol_closed() const override;
+
+ // peer wins if myaddr > peeraddr
+ bool peer_wins() const override;
+
+ Interceptor *interceptor = nullptr;
+#else
+ // peer wins if myaddr > peeraddr
+ bool peer_wins() const;
+#endif
+
+private:
+ const seastar::shard_id msgr_sid;
+
+ /*
+ * Core owner is messenger core, may allow to access from the I/O core.
+ */
+ SocketMessenger& messenger;
+
+ std::unique_ptr<ProtocolV2> protocol;
+
+ Socket *socket = nullptr;
+
+ entity_name_t peer_name = {0, entity_name_t::NEW};
+
+ entity_addr_t peer_addr;
+
+ // which of the peer_addrs we're connecting to (as client)
+ // or should reconnect to (as peer)
+ entity_addr_t target_addr;
+
+ uint64_t features = 0;
+
+ ceph::net::Policy<crimson::common::Throttle> policy;
+
+ uint64_t peer_global_id = 0;
+
+ /*
+ * Core owner is I/O core (mutable).
+ */
+ std::unique_ptr<ConnectionHandler> io_handler;
+
+ /*
+ * Core owner is up to the connection user.
+ */
+ std::unique_ptr<user_private_t> user_private;
+
+ friend class IOHandler;
+ friend class ProtocolV2;
+ friend class FrameAssemblerV2;
+};
+
+} // namespace crimson::net
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::net::SocketConnection> : fmt::ostream_formatter {};
+#endif
diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc
new file mode 100644
index 000000000..382d08f98
--- /dev/null
+++ b/src/crimson/net/SocketMessenger.cc
@@ -0,0 +1,485 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "SocketMessenger.h"
+
+#include <seastar/core/sleep.hh>
+
+#include <tuple>
+#include <boost/functional/hash.hpp>
+#include <fmt/os.h>
+
+#include "auth/Auth.h"
+#include "Errors.h"
+#include "Socket.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+ }
+}
+
+namespace crimson::net {
+
+SocketMessenger::SocketMessenger(const entity_name_t& myname,
+ const std::string& logic_name,
+ uint32_t nonce,
+ bool dispatch_only_on_this_shard)
+ : sid{seastar::this_shard_id()},
+ logic_name{logic_name},
+ nonce{nonce},
+ dispatch_only_on_sid{dispatch_only_on_this_shard},
+ my_name{myname}
+{}
+
+SocketMessenger::~SocketMessenger()
+{
+ logger().debug("~SocketMessenger: {}", logic_name);
+ ceph_assert_always(seastar::this_shard_id() == sid);
+ ceph_assert(!listener);
+}
+
+bool SocketMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
+{
+ assert(seastar::this_shard_id() == sid);
+ bool ret = false;
+
+ entity_addrvec_t newaddrs = my_addrs;
+ for (auto& a : newaddrs.v) {
+ if (a.is_blank_ip()) {
+ int type = a.get_type();
+ int port = a.get_port();
+ uint32_t nonce = a.get_nonce();
+ for (auto& b : addrs.v) {
+ if (a.get_family() == b.get_family()) {
+ logger().debug(" assuming my addr {} matches provided addr {}", a, b);
+ a = b;
+ a.set_nonce(nonce);
+ a.set_type(type);
+ a.set_port(port);
+ ret = true;
+ break;
+ }
+ }
+ }
+ }
+ my_addrs = newaddrs;
+ return ret;
+}
+
+void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
+{
+ assert(seastar::this_shard_id() == sid);
+ my_addrs = addrs;
+ for (auto& addr : my_addrs.v) {
+ addr.nonce = nonce;
+ }
+}
+
+crimson::net::listen_ertr::future<>
+SocketMessenger::do_listen(const entity_addrvec_t& addrs)
+{
+ ceph_assert(addrs.front().get_family() == AF_INET);
+ set_myaddrs(addrs);
+ return seastar::futurize_invoke([this] {
+ if (!listener) {
+ return ShardedServerSocket::create(dispatch_only_on_sid
+ ).then([this] (auto _listener) {
+ listener = _listener;
+ });
+ } else {
+ return seastar::now();
+ }
+ }).then([this] () -> listen_ertr::future<> {
+ const entity_addr_t listen_addr = get_myaddr();
+ logger().debug("{} do_listen: try listen {}...", *this, listen_addr);
+ if (!listener) {
+ logger().warn("{} do_listen: listener doesn't exist", *this);
+ return listen_ertr::now();
+ }
+ return listener->listen(listen_addr);
+ });
+}
+
+SocketMessenger::bind_ertr::future<>
+SocketMessenger::try_bind(const entity_addrvec_t& addrs,
+ uint32_t min_port, uint32_t max_port)
+{
+ // the classical OSD iterates over the addrvec and tries to listen on each
+ // addr. crimson doesn't need to follow as there is a consensus we need to
+ // worry only about proto v2.
+ assert(addrs.size() == 1);
+ auto addr = addrs.msgr2_addr();
+ if (addr.get_port() != 0) {
+ return do_listen(addrs).safe_then([this] {
+ logger().info("{} try_bind: done", *this);
+ });
+ }
+ ceph_assert(min_port <= max_port);
+ return seastar::do_with(uint32_t(min_port),
+ [this, max_port, addr] (auto& port) {
+ return seastar::repeat_until_value([this, max_port, addr, &port] {
+ auto to_bind = addr;
+ to_bind.set_port(port);
+ return do_listen(entity_addrvec_t{to_bind}
+ ).safe_then([this] () -> seastar::future<std::optional<std::error_code>> {
+ logger().info("{} try_bind: done", *this);
+ return seastar::make_ready_future<std::optional<std::error_code>>(
+ std::make_optional<std::error_code>(std::error_code{/* success! */}));
+ }, listen_ertr::all_same_way([this, max_port, &port]
+ (const std::error_code& e) mutable
+ -> seastar::future<std::optional<std::error_code>> {
+ logger().trace("{} try_bind: {} got error {}", *this, port, e);
+ if (port == max_port) {
+ return seastar::make_ready_future<std::optional<std::error_code>>(
+ std::make_optional<std::error_code>(e));
+ }
+ ++port;
+ return seastar::make_ready_future<std::optional<std::error_code>>(
+ std::optional<std::error_code>{std::nullopt});
+ }));
+ }).then([] (const std::error_code e) -> bind_ertr::future<> {
+ if (!e) {
+ return bind_ertr::now(); // success!
+ } else if (e == std::errc::address_in_use) {
+ return crimson::ct_error::address_in_use::make();
+ } else if (e == std::errc::address_not_available) {
+ return crimson::ct_error::address_not_available::make();
+ }
+ ceph_abort();
+ });
+ });
+}
+
+SocketMessenger::bind_ertr::future<>
+SocketMessenger::bind(const entity_addrvec_t& addrs)
+{
+ assert(seastar::this_shard_id() == sid);
+ using crimson::common::local_conf;
+ return seastar::do_with(int64_t{local_conf()->ms_bind_retry_count},
+ [this, addrs] (auto& count) {
+ return seastar::repeat_until_value([this, addrs, &count] {
+ assert(count >= 0);
+ return try_bind(addrs,
+ local_conf()->ms_bind_port_min,
+ local_conf()->ms_bind_port_max)
+ .safe_then([this] {
+ logger().info("{} try_bind: done", *this);
+ return seastar::make_ready_future<std::optional<std::error_code>>(
+ std::make_optional<std::error_code>(std::error_code{/* success! */}));
+ }, bind_ertr::all_same_way([this, &count] (const std::error_code error) {
+ if (count-- > 0) {
+ logger().info("{} was unable to bind. Trying again in {} seconds",
+ *this, local_conf()->ms_bind_retry_delay);
+ return seastar::sleep(
+ std::chrono::seconds(local_conf()->ms_bind_retry_delay)
+ ).then([] {
+ // one more time, please
+ return seastar::make_ready_future<std::optional<std::error_code>>(
+ std::optional<std::error_code>{std::nullopt});
+ });
+ } else {
+ logger().info("{} was unable to bind after {} attempts: {}",
+ *this, local_conf()->ms_bind_retry_count, error);
+ return seastar::make_ready_future<std::optional<std::error_code>>(
+ std::make_optional<std::error_code>(error));
+ }
+ }));
+ }).then([] (const std::error_code error) -> bind_ertr::future<> {
+ if (!error) {
+ return bind_ertr::now(); // success!
+ } else if (error == std::errc::address_in_use) {
+ return crimson::ct_error::address_in_use::make();
+ } else if (error == std::errc::address_not_available) {
+ return crimson::ct_error::address_not_available::make();
+ }
+ ceph_abort();
+ });
+ });
+}
+
+seastar::future<> SocketMessenger::accept(
+ SocketFRef &&socket, const entity_addr_t &peer_addr)
+{
+ assert(seastar::this_shard_id() == sid);
+ SocketConnectionRef conn =
+ seastar::make_shared<SocketConnection>(*this, dispatchers);
+ conn->start_accept(std::move(socket), peer_addr);
+ return seastar::now();
+}
+
+seastar::future<> SocketMessenger::start(
+ const dispatchers_t& _dispatchers) {
+ assert(seastar::this_shard_id() == sid);
+
+ dispatchers.assign(_dispatchers);
+ if (listener) {
+ // make sure we have already bound to a valid address
+ ceph_assert(get_myaddr().is_msgr2());
+ ceph_assert(get_myaddr().get_port() > 0);
+
+ return listener->accept([this](SocketRef _socket, entity_addr_t peer_addr) {
+ assert(get_myaddr().is_msgr2());
+ SocketFRef socket = seastar::make_foreign(std::move(_socket));
+ if (listener->is_fixed_shard_dispatching()) {
+ return accept(std::move(socket), peer_addr);
+ } else {
+ return seastar::smp::submit_to(sid,
+ [this, peer_addr, socket = std::move(socket)]() mutable {
+ return accept(std::move(socket), peer_addr);
+ });
+ }
+ });
+ }
+ return seastar::now();
+}
+
+crimson::net::ConnectionRef
+SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name)
+{
+ assert(seastar::this_shard_id() == sid);
+
+ // make sure we connect to a valid peer_addr
+ if (!peer_addr.is_msgr2()) {
+ ceph_abort_msg("ProtocolV1 is no longer supported");
+ }
+ ceph_assert(peer_addr.get_port() > 0);
+
+ if (auto found = lookup_conn(peer_addr); found) {
+ logger().debug("{} connect to existing", *found);
+ return found->get_local_shared_foreign_from_this();
+ }
+ SocketConnectionRef conn =
+ seastar::make_shared<SocketConnection>(*this, dispatchers);
+ conn->start_connect(peer_addr, peer_name);
+ return conn->get_local_shared_foreign_from_this();
+}
+
+seastar::future<> SocketMessenger::shutdown()
+{
+ assert(seastar::this_shard_id() == sid);
+ return seastar::futurize_invoke([this] {
+ assert(dispatchers.empty());
+ if (listener) {
+ auto d_listener = listener;
+ listener = nullptr;
+ return d_listener->shutdown_destroy();
+ } else {
+ return seastar::now();
+ }
+ // close all connections
+ }).then([this] {
+ return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
+ return conn->close_clean_yielded();
+ });
+ }).then([this] {
+ ceph_assert(accepting_conns.empty());
+ return seastar::parallel_for_each(connections, [] (auto conn) {
+ return conn.second->close_clean_yielded();
+ });
+ }).then([this] {
+ return seastar::parallel_for_each(closing_conns, [] (auto conn) {
+ return conn->close_clean_yielded();
+ });
+ }).then([this] {
+ ceph_assert(connections.empty());
+ shutdown_promise.set_value();
+ });
+}
+
+static entity_addr_t choose_addr(
+ const entity_addr_t &peer_addr_for_me,
+ const SocketConnection& conn)
+{
+ using crimson::common::local_conf;
+ // XXX: a syscall is here
+ if (const auto local_addr = conn.get_local_address();
+ local_conf()->ms_learn_addr_from_peer) {
+ logger().info("{} peer {} says I am {} (socket says {})",
+ conn, conn.get_peer_socket_addr(), peer_addr_for_me,
+ local_addr);
+ return peer_addr_for_me;
+ } else {
+ const auto local_addr_for_me = conn.get_local_address();
+ logger().info("{} socket to {} says I am {} (peer says {})",
+ conn, conn.get_peer_socket_addr(),
+ local_addr, peer_addr_for_me);
+ entity_addr_t addr;
+ addr.set_sockaddr(&local_addr_for_me.as_posix_sockaddr());
+ return addr;
+ }
+}
+
+void SocketMessenger::learned_addr(
+ const entity_addr_t &peer_addr_for_me,
+ const SocketConnection& conn)
+{
+ assert(seastar::this_shard_id() == sid);
+ if (!need_addr) {
+ if ((!get_myaddr().is_any() &&
+ get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
+ get_myaddr().get_family() != peer_addr_for_me.get_family() ||
+ !get_myaddr().is_same_host(peer_addr_for_me)) {
+ logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}",
+ conn, peer_addr_for_me, get_myaddr());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ return;
+ }
+
+ if (get_myaddr().get_type() == entity_addr_t::TYPE_NONE) {
+ // Not bound
+ auto addr = choose_addr(peer_addr_for_me, conn);
+ addr.set_type(entity_addr_t::TYPE_ANY);
+ addr.set_port(0);
+ need_addr = false;
+ set_myaddrs(entity_addrvec_t{addr});
+ logger().info("{} learned myaddr={} (unbound)", conn, get_myaddr());
+ } else {
+ // Already bound
+ if (!get_myaddr().is_any() &&
+ get_myaddr().get_type() != peer_addr_for_me.get_type()) {
+ logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}",
+ conn, peer_addr_for_me, get_myaddr());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (get_myaddr().get_family() != peer_addr_for_me.get_family()) {
+ logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}",
+ conn, peer_addr_for_me, get_myaddr());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (get_myaddr().is_blank_ip()) {
+ auto addr = choose_addr(peer_addr_for_me, conn);
+ addr.set_type(get_myaddr().get_type());
+ addr.set_port(get_myaddr().get_port());
+ need_addr = false;
+ set_myaddrs(entity_addrvec_t{addr});
+ logger().info("{} learned myaddr={} (blank IP)", conn, get_myaddr());
+ } else if (!get_myaddr().is_same_host(peer_addr_for_me)) {
+ logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}",
+ conn, peer_addr_for_me, get_myaddr());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ } else {
+ need_addr = false;
+ }
+ }
+}
+
+SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const
+{
+ assert(seastar::this_shard_id() == sid);
+ return policy_set.get(peer_type);
+}
+
+SocketPolicy SocketMessenger::get_default_policy() const
+{
+ assert(seastar::this_shard_id() == sid);
+ return policy_set.get_default();
+}
+
+void SocketMessenger::set_default_policy(const SocketPolicy& p)
+{
+ assert(seastar::this_shard_id() == sid);
+ policy_set.set_default(p);
+}
+
+void SocketMessenger::set_policy(entity_type_t peer_type,
+ const SocketPolicy& p)
+{
+ assert(seastar::this_shard_id() == sid);
+ policy_set.set(peer_type, p);
+}
+
+void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
+ Throttle* throttle)
+{
+ assert(seastar::this_shard_id() == sid);
+ // only byte throttler is used in OSD
+ policy_set.set_throttlers(peer_type, throttle, nullptr);
+}
+
+crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
+{
+ assert(seastar::this_shard_id() == sid);
+ if (auto found = connections.find(addr);
+ found != connections.end()) {
+ return found->second;
+ } else {
+ return nullptr;
+ }
+}
+
+void SocketMessenger::accept_conn(SocketConnectionRef conn)
+{
+ assert(seastar::this_shard_id() == sid);
+ accepting_conns.insert(conn);
+}
+
+void SocketMessenger::unaccept_conn(SocketConnectionRef conn)
+{
+ assert(seastar::this_shard_id() == sid);
+ accepting_conns.erase(conn);
+}
+
+void SocketMessenger::register_conn(SocketConnectionRef conn)
+{
+ assert(seastar::this_shard_id() == sid);
+ auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
+ std::ignore = i;
+ ceph_assert(added);
+}
+
+void SocketMessenger::unregister_conn(SocketConnectionRef conn)
+{
+ assert(seastar::this_shard_id() == sid);
+ ceph_assert(conn);
+ auto found = connections.find(conn->get_peer_addr());
+ ceph_assert(found != connections.end());
+ ceph_assert(found->second == conn);
+ connections.erase(found);
+}
+
+void SocketMessenger::closing_conn(SocketConnectionRef conn)
+{
+ assert(seastar::this_shard_id() == sid);
+ closing_conns.push_back(conn);
+}
+
+void SocketMessenger::closed_conn(SocketConnectionRef conn)
+{
+ assert(seastar::this_shard_id() == sid);
+ for (auto it = closing_conns.begin();
+ it != closing_conns.end();) {
+ if (*it == conn) {
+ it = closing_conns.erase(it);
+ } else {
+ it++;
+ }
+ }
+}
+
+uint32_t SocketMessenger::get_global_seq(uint32_t old)
+{
+ assert(seastar::this_shard_id() == sid);
+ if (old > global_seq) {
+ global_seq = old;
+ }
+ return ++global_seq;
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h
new file mode 100644
index 000000000..e4ac63184
--- /dev/null
+++ b/src/crimson/net/SocketMessenger.h
@@ -0,0 +1,192 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <map>
+#include <set>
+#include <vector>
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sharded.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "crimson/net/chained_dispatchers.h"
+#include "Messenger.h"
+#include "Socket.h"
+#include "SocketConnection.h"
+
+namespace crimson::net {
+
+class ShardedServerSocket;
+
+class SocketMessenger final : public Messenger {
+// Messenger public interfaces
+public:
+ SocketMessenger(const entity_name_t& myname,
+ const std::string& logic_name,
+ uint32_t nonce,
+ bool dispatch_only_on_this_shard);
+
+ ~SocketMessenger() override;
+
+ const entity_name_t &get_myname() const override {
+ return my_name;
+ }
+
+ const entity_addrvec_t &get_myaddrs() const override {
+ return my_addrs;
+ }
+
+ void set_myaddrs(const entity_addrvec_t& addr) override;
+
+ bool set_addr_unknowns(const entity_addrvec_t &addr) override;
+
+ void set_auth_client(crimson::auth::AuthClient *ac) override {
+ assert(seastar::this_shard_id() == sid);
+ auth_client = ac;
+ }
+
+ void set_auth_server(crimson::auth::AuthServer *as) override {
+ assert(seastar::this_shard_id() == sid);
+ auth_server = as;
+ }
+
+ bind_ertr::future<> bind(const entity_addrvec_t& addr) override;
+
+ seastar::future<> start(const dispatchers_t& dispatchers) override;
+
+ ConnectionRef connect(const entity_addr_t& peer_addr,
+ const entity_name_t& peer_name) override;
+
+ bool owns_connection(Connection &conn) const override {
+ assert(seastar::this_shard_id() == sid);
+ return this == &static_cast<SocketConnection&>(conn).get_messenger();
+ }
+
+ // can only wait once
+ seastar::future<> wait() override {
+ assert(seastar::this_shard_id() == sid);
+ return shutdown_promise.get_future();
+ }
+
+ void stop() override {
+ assert(seastar::this_shard_id() == sid);
+ dispatchers.clear();
+ }
+
+ bool is_started() const override {
+ assert(seastar::this_shard_id() == sid);
+ return !dispatchers.empty();
+ }
+
+ seastar::future<> shutdown() override;
+
+ void print(std::ostream& out) const override {
+ out << get_myname()
+ << "(" << logic_name
+ << ") " << get_myaddr();
+ }
+
+ SocketPolicy get_policy(entity_type_t peer_type) const override;
+
+ SocketPolicy get_default_policy() const override;
+
+ void set_default_policy(const SocketPolicy& p) override;
+
+ void set_policy(entity_type_t peer_type, const SocketPolicy& p) override;
+
+ void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;
+
+// SocketMessenger public interfaces
+public:
+ crimson::auth::AuthClient* get_auth_client() const {
+ assert(seastar::this_shard_id() == sid);
+ return auth_client;
+ }
+
+ crimson::auth::AuthServer* get_auth_server() const {
+ assert(seastar::this_shard_id() == sid);
+ return auth_server;
+ }
+
+ uint32_t get_global_seq(uint32_t old=0);
+
+ void learned_addr(const entity_addr_t &peer_addr_for_me,
+ const SocketConnection& conn);
+
+ SocketConnectionRef lookup_conn(const entity_addr_t& addr);
+
+ void accept_conn(SocketConnectionRef);
+
+ void unaccept_conn(SocketConnectionRef);
+
+ void register_conn(SocketConnectionRef);
+
+ void unregister_conn(SocketConnectionRef);
+
+ void closing_conn(SocketConnectionRef);
+
+ void closed_conn(SocketConnectionRef);
+
+ seastar::shard_id get_shard_id() const {
+ return sid;
+ }
+
+#ifdef UNIT_TESTS_BUILT
+ void set_interceptor(Interceptor *i) override {
+ interceptor = i;
+ }
+
+ Interceptor *interceptor = nullptr;
+#endif
+
+private:
+ seastar::future<> accept(SocketFRef &&, const entity_addr_t &);
+
+ listen_ertr::future<> do_listen(const entity_addrvec_t& addr);
+
+ /// try to bind to the first unused port of given address
+ bind_ertr::future<> try_bind(const entity_addrvec_t& addr,
+ uint32_t min_port, uint32_t max_port);
+
+ const seastar::shard_id sid;
+ // Distinguish messengers with meaningful names for debugging
+ const std::string logic_name;
+ const uint32_t nonce;
+ const bool dispatch_only_on_sid;
+
+ entity_name_t my_name;
+ entity_addrvec_t my_addrs;
+ crimson::auth::AuthClient* auth_client = nullptr;
+ crimson::auth::AuthServer* auth_server = nullptr;
+
+ ShardedServerSocket *listener = nullptr;
+ ChainedDispatchers dispatchers;
+ std::map<entity_addr_t, SocketConnectionRef> connections;
+ std::set<SocketConnectionRef> accepting_conns;
+ std::vector<SocketConnectionRef> closing_conns;
+ ceph::net::PolicySet<Throttle> policy_set;
+ // specifying we haven't learned our addr; set false when we find it.
+ bool need_addr = true;
+ uint32_t global_seq = 0;
+ bool started = false;
+ seastar::promise<> shutdown_promise;
+};
+
+} // namespace crimson::net
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::net::SocketMessenger> : fmt::ostream_formatter {};
+#endif
diff --git a/src/crimson/net/chained_dispatchers.cc b/src/crimson/net/chained_dispatchers.cc
new file mode 100644
index 000000000..1e4af3baa
--- /dev/null
+++ b/src/crimson/net/chained_dispatchers.cc
@@ -0,0 +1,114 @@
+#include "crimson/common/log.h"
+#include "crimson/net/chained_dispatchers.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Dispatcher.h"
+#include "msg/Message.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+ }
+}
+
+namespace crimson::net {
+
+seastar::future<>
+ChainedDispatchers::ms_dispatch(ConnectionRef conn,
+ MessageRef m) {
+ try {
+ for (auto& dispatcher : dispatchers) {
+ auto dispatched = dispatcher->ms_dispatch(conn, m);
+ if (dispatched.has_value()) {
+ return std::move(*dispatched
+ ).handle_exception([conn] (std::exception_ptr eptr) {
+ logger().error("{} got unexpected exception in ms_dispatch() throttling {}",
+ *conn, eptr);
+ ceph_abort();
+ });
+ }
+ }
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_dispatch() {}",
+ *conn, std::current_exception());
+ ceph_abort();
+ }
+ if (!dispatchers.empty()) {
+ logger().error("ms_dispatch unhandled message {}", *m);
+ }
+ return seastar::now();
+}
+
+void
+ChainedDispatchers::ms_handle_shard_change(
+ ConnectionRef conn,
+ seastar::shard_id new_shard,
+ bool ac) {
+ try {
+ for (auto& dispatcher : dispatchers) {
+ dispatcher->ms_handle_shard_change(conn, new_shard, ac);
+ }
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_handle_shard_change() {}",
+ *conn, std::current_exception());
+ ceph_abort();
+ }
+}
+
+void
+ChainedDispatchers::ms_handle_accept(
+ ConnectionRef conn,
+ seastar::shard_id prv_shard,
+ bool is_replace) {
+ try {
+ for (auto& dispatcher : dispatchers) {
+ dispatcher->ms_handle_accept(conn, prv_shard, is_replace);
+ }
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_handle_accept() {}",
+ *conn, std::current_exception());
+ ceph_abort();
+ }
+}
+
+void
+ChainedDispatchers::ms_handle_connect(
+ ConnectionRef conn,
+ seastar::shard_id prv_shard) {
+ try {
+ for(auto& dispatcher : dispatchers) {
+ dispatcher->ms_handle_connect(conn, prv_shard);
+ }
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_handle_connect() {}",
+ *conn, std::current_exception());
+ ceph_abort();
+ }
+}
+
+void
+ChainedDispatchers::ms_handle_reset(ConnectionRef conn, bool is_replace) {
+ try {
+ for (auto& dispatcher : dispatchers) {
+ dispatcher->ms_handle_reset(conn, is_replace);
+ }
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_handle_reset() {}",
+ *conn, std::current_exception());
+ ceph_abort();
+ }
+}
+
+void
+ChainedDispatchers::ms_handle_remote_reset(ConnectionRef conn) {
+ try {
+ for (auto& dispatcher : dispatchers) {
+ dispatcher->ms_handle_remote_reset(conn);
+ }
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_handle_remote_reset() {}",
+ *conn, std::current_exception());
+ ceph_abort();
+ }
+}
+
+}
diff --git a/src/crimson/net/chained_dispatchers.h b/src/crimson/net/chained_dispatchers.h
new file mode 100644
index 000000000..ec085864f
--- /dev/null
+++ b/src/crimson/net/chained_dispatchers.h
@@ -0,0 +1,39 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/smp.hh>
+
+#include "Fwd.h"
+#include "crimson/common/log.h"
+
+namespace crimson::net {
+
+class Dispatcher;
+
+class ChainedDispatchers {
+public:
+ void assign(const dispatchers_t& _dispatchers) {
+ assert(empty());
+ assert(!_dispatchers.empty());
+ dispatchers = _dispatchers;
+ }
+ void clear() {
+ dispatchers.clear();
+ }
+ bool empty() const {
+ return dispatchers.empty();
+ }
+ seastar::future<> ms_dispatch(ConnectionRef, MessageRef);
+ void ms_handle_shard_change(ConnectionRef, seastar::shard_id, bool);
+ void ms_handle_accept(ConnectionRef conn, seastar::shard_id, bool is_replace);
+ void ms_handle_connect(ConnectionRef conn, seastar::shard_id);
+ void ms_handle_reset(ConnectionRef conn, bool is_replace);
+ void ms_handle_remote_reset(ConnectionRef conn);
+
+ private:
+ dispatchers_t dispatchers;
+};
+
+}
diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc
new file mode 100644
index 000000000..c414c48e1
--- /dev/null
+++ b/src/crimson/net/io_handler.cc
@@ -0,0 +1,1287 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "io_handler.h"
+
+#include "auth/Auth.h"
+
+#include "crimson/common/formatter.h"
+#include "crimson/common/log.h"
+#include "crimson/net/Errors.h"
+#include "crimson/net/chained_dispatchers.h"
+#include "crimson/net/SocketMessenger.h"
+#include "msg/Message.h"
+#include "msg/msg_fmt.h"
+
+using namespace ceph::msgr::v2;
+using crimson::common::local_conf;
+
+namespace {
+
+seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+}
+
+[[noreturn]] void abort_in_fault() {
+ throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+}
+
+[[noreturn]] void abort_protocol() {
+ throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
+}
+
+std::size_t get_msg_size(const FrameAssembler &rx_frame_asm)
+{
+ ceph_assert(rx_frame_asm.get_num_segments() > 0);
+ size_t sum = 0;
+ // we don't include SegmentIndex::Msg::HEADER.
+ for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
+ sum += rx_frame_asm.get_segment_logical_len(idx);
+ }
+ return sum;
+}
+
+} // namespace anonymous
+
+namespace crimson::net {
+
+IOHandler::IOHandler(ChainedDispatchers &dispatchers,
+ SocketConnection &conn)
+ : shard_states(shard_states_t::create(
+ seastar::this_shard_id(), io_state_t::none)),
+ dispatchers(dispatchers),
+ conn(conn),
+ conn_ref(conn.get_local_shared_foreign_from_this())
+{}
+
+IOHandler::~IOHandler()
+{
+ // close_io() must be finished
+ ceph_assert_always(maybe_prv_shard_states == nullptr);
+ // should be true in the according shard
+ // ceph_assert_always(shard_states->assert_closed_and_exit());
+ assert(!conn_ref);
+}
+
+#ifdef UNIT_TESTS_BUILT
+IOHandler::sweep_ret
+#else
+ceph::bufferlist
+#endif
+IOHandler::sweep_out_pending_msgs_to_sent(
+ bool require_keepalive,
+ std::optional<utime_t> maybe_keepalive_ack,
+ bool require_ack)
+{
+ std::size_t num_msgs = out_pending_msgs.size();
+ ceph::bufferlist bl;
+
+#ifdef UNIT_TESTS_BUILT
+ std::vector<Tag> tags;
+#endif
+
+ if (unlikely(require_keepalive)) {
+ auto keepalive_frame = KeepAliveFrame::Encode();
+ bl.append(frame_assembler->get_buffer(keepalive_frame));
+#ifdef UNIT_TESTS_BUILT
+ auto tag = KeepAliveFrame::tag;
+ tags.push_back(tag);
+#endif
+ }
+
+ if (unlikely(maybe_keepalive_ack.has_value())) {
+ auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
+ bl.append(frame_assembler->get_buffer(keepalive_ack_frame));
+#ifdef UNIT_TESTS_BUILT
+ auto tag = KeepAliveFrameAck::tag;
+ tags.push_back(tag);
+#endif
+ }
+
+ if (require_ack && num_msgs == 0u) {
+ auto ack_frame = AckFrame::Encode(in_seq);
+ bl.append(frame_assembler->get_buffer(ack_frame));
+#ifdef UNIT_TESTS_BUILT
+ auto tag = AckFrame::tag;
+ tags.push_back(tag);
+#endif
+ }
+
+ std::for_each(
+ out_pending_msgs.begin(),
+ out_pending_msgs.begin()+num_msgs,
+ [this, &bl
+#ifdef UNIT_TESTS_BUILT
+ , &tags
+#endif
+ ](const MessageFRef& msg) {
+ // set priority
+ msg->get_header().src = conn.messenger.get_myname();
+
+ msg->encode(conn.features, 0);
+
+ ceph_assert(!msg->get_seq() && "message already has seq");
+ msg->set_seq(++out_seq);
+
+ ceph_msg_header &header = msg->get_header();
+ ceph_msg_footer &footer = msg->get_footer();
+
+ ceph_msg_header2 header2{header.seq, header.tid,
+ header.type, header.priority,
+ header.version,
+ ceph_le32(0), header.data_off,
+ ceph_le64(in_seq),
+ footer.flags, header.compat_version,
+ header.reserved};
+
+ auto message = MessageFrame::Encode(header2,
+ msg->get_payload(), msg->get_middle(), msg->get_data());
+ logger().debug("{} --> #{} === {} ({})",
+ conn, msg->get_seq(), *msg, msg->get_type());
+ bl.append(frame_assembler->get_buffer(message));
+#ifdef UNIT_TESTS_BUILT
+ auto tag = MessageFrame::tag;
+ tags.push_back(tag);
+#endif
+ });
+
+ if (!conn.policy.lossy) {
+ out_sent_msgs.insert(
+ out_sent_msgs.end(),
+ std::make_move_iterator(out_pending_msgs.begin()),
+ std::make_move_iterator(out_pending_msgs.end()));
+ }
+ out_pending_msgs.clear();
+
+#ifdef UNIT_TESTS_BUILT
+ return sweep_ret{std::move(bl), tags};
+#else
+ return bl;
+#endif
+}
+
+seastar::future<> IOHandler::send(MessageFRef msg)
+{
+ // sid may be changed on-the-fly during the submission
+ if (seastar::this_shard_id() == get_shard_id()) {
+ return do_send(std::move(msg));
+ } else {
+ logger().trace("{} send() is directed to {} -- {}",
+ conn, get_shard_id(), *msg);
+ return seastar::smp::submit_to(
+ get_shard_id(), [this, msg=std::move(msg)]() mutable {
+ return send_redirected(std::move(msg));
+ });
+ }
+}
+
+seastar::future<> IOHandler::send_redirected(MessageFRef msg)
+{
+ // sid may be changed on-the-fly during the submission
+ if (seastar::this_shard_id() == get_shard_id()) {
+ return do_send(std::move(msg));
+ } else {
+ logger().debug("{} send() is redirected to {} -- {}",
+ conn, get_shard_id(), *msg);
+ return seastar::smp::submit_to(
+ get_shard_id(), [this, msg=std::move(msg)]() mutable {
+ return send_redirected(std::move(msg));
+ });
+ }
+}
+
+seastar::future<> IOHandler::do_send(MessageFRef msg)
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ logger().trace("{} do_send() got message -- {}", conn, *msg);
+ if (get_io_state() != io_state_t::drop) {
+ out_pending_msgs.push_back(std::move(msg));
+ notify_out_dispatch();
+ }
+ return seastar::now();
+}
+
+seastar::future<> IOHandler::send_keepalive()
+{
+ // sid may be changed on-the-fly during the submission
+ if (seastar::this_shard_id() == get_shard_id()) {
+ return do_send_keepalive();
+ } else {
+ logger().trace("{} send_keepalive() is directed to {}", conn, get_shard_id());
+ return seastar::smp::submit_to(
+ get_shard_id(), [this] {
+ return send_keepalive_redirected();
+ });
+ }
+}
+
+seastar::future<> IOHandler::send_keepalive_redirected()
+{
+ // sid may be changed on-the-fly during the submission
+ if (seastar::this_shard_id() == get_shard_id()) {
+ return do_send_keepalive();
+ } else {
+ logger().debug("{} send_keepalive() is redirected to {}", conn, get_shard_id());
+ return seastar::smp::submit_to(
+ get_shard_id(), [this] {
+ return send_keepalive_redirected();
+ });
+ }
+}
+
+seastar::future<> IOHandler::do_send_keepalive()
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ logger().trace("{} do_send_keeplive(): need_keepalive={}", conn, need_keepalive);
+ if (!need_keepalive) {
+ need_keepalive = true;
+ notify_out_dispatch();
+ }
+ return seastar::now();
+}
+
+void IOHandler::mark_down()
+{
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ ceph_assert_always(get_io_state() != io_state_t::none);
+ need_dispatch_reset = false;
+ if (get_io_state() == io_state_t::drop) {
+ return;
+ }
+
+ auto cc_seq = crosscore.prepare_submit();
+ logger().info("{} mark_down() at {}, send {} notify_mark_down()",
+ conn, io_stat_printer{*this}, cc_seq);
+ do_set_io_state(io_state_t::drop);
+ shard_states->dispatch_in_background(
+ "notify_mark_down", conn, [this, cc_seq] {
+ return seastar::smp::submit_to(
+ conn.get_messenger_shard_id(), [this, cc_seq] {
+ return handshake_listener->notify_mark_down(cc_seq);
+ });
+ });
+}
+
+void IOHandler::print_io_stat(std::ostream &out) const
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ out << "io_stat("
+ << "io_state=" << fmt::format("{}", get_io_state())
+ << ", in_seq=" << in_seq
+ << ", out_seq=" << out_seq
+ << ", out_pending_msgs_size=" << out_pending_msgs.size()
+ << ", out_sent_msgs_size=" << out_sent_msgs.size()
+ << ", need_ack=" << (ack_left > 0)
+ << ", need_keepalive=" << need_keepalive
+ << ", need_keepalive_ack=" << bool(next_keepalive_ack)
+ << ")";
+}
+
+void IOHandler::assign_frame_assembler(FrameAssemblerV2Ref fa)
+{
+ assert(fa != nullptr);
+ ceph_assert_always(frame_assembler == nullptr);
+ frame_assembler = std::move(fa);
+ ceph_assert_always(
+ frame_assembler->get_shard_id() == get_shard_id());
+ // should have been set through dispatch_accept/connect()
+ ceph_assert_always(
+ frame_assembler->get_socket_shard_id() == get_shard_id());
+ ceph_assert_always(frame_assembler->is_socket_valid());
+}
+
+void IOHandler::do_set_io_state(
+ io_state_t new_state,
+ std::optional<crosscore_t::seq_t> cc_seq,
+ FrameAssemblerV2Ref fa,
+ bool set_notify_out)
+{
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ auto prv_state = get_io_state();
+ logger().debug("{} got {}do_set_io_state(): prv_state={}, new_state={}, "
+ "fa={}, set_notify_out={}, at {}",
+ conn,
+ cc_seq.has_value() ? fmt::format("{} ", *cc_seq) : "",
+ prv_state, new_state,
+ fa ? "present" : "N/A", set_notify_out,
+ io_stat_printer{*this});
+ ceph_assert_always(!(
+ (new_state == io_state_t::none && prv_state != io_state_t::none) ||
+ (new_state == io_state_t::open && prv_state == io_state_t::open)
+ ));
+
+ if (prv_state == io_state_t::drop) {
+ // only possible due to a racing mark_down() from user
+ if (new_state == io_state_t::open) {
+ assign_frame_assembler(std::move(fa));
+ frame_assembler->shutdown_socket<false>(nullptr);
+ } else {
+ assert(fa == nullptr);
+ }
+ return;
+ }
+
+ bool dispatch_in = false;
+ if (new_state == io_state_t::open) {
+ // to open
+ ceph_assert_always(protocol_is_connected == true);
+ assign_frame_assembler(std::move(fa));
+ dispatch_in = true;
+ } else if (prv_state == io_state_t::open) {
+ // from open
+ ceph_assert_always(protocol_is_connected == true);
+ protocol_is_connected = false;
+ assert(fa == nullptr);
+ ceph_assert_always(frame_assembler->is_socket_valid());
+ frame_assembler->shutdown_socket<false>(nullptr);
+ } else {
+ assert(fa == nullptr);
+ }
+
+ if (new_state == io_state_t::delay) {
+ need_notify_out = set_notify_out;
+ if (need_notify_out) {
+ maybe_notify_out_dispatch();
+ }
+ } else {
+ assert(set_notify_out == false);
+ need_notify_out = false;
+ }
+
+ // FIXME: simplify and drop the prv_state == new_state case
+ if (prv_state != new_state) {
+ shard_states->set_io_state(new_state);
+ }
+
+ /*
+ * not atomic below
+ */
+
+ if (dispatch_in) {
+ do_in_dispatch();
+ }
+}
+
+seastar::future<> IOHandler::set_io_state(
+ crosscore_t::seq_t cc_seq,
+ io_state_t new_state,
+ FrameAssemblerV2Ref fa,
+ bool set_notify_out)
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} set_io_state(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, new_state,
+ fa=std::move(fa), set_notify_out]() mutable {
+ return set_io_state(cc_seq, new_state, std::move(fa), set_notify_out);
+ });
+ }
+
+ do_set_io_state(new_state, cc_seq, std::move(fa), set_notify_out);
+ return seastar::now();
+}
+
+seastar::future<IOHandler::exit_dispatching_ret>
+IOHandler::wait_io_exit_dispatching(
+ crosscore_t::seq_t cc_seq)
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} wait_io_exit_dispatching(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq] {
+ return wait_io_exit_dispatching(cc_seq);
+ });
+ }
+
+ logger().debug("{} got {} wait_io_exit_dispatching()",
+ conn, cc_seq);
+ ceph_assert_always(get_io_state() != io_state_t::open);
+ ceph_assert_always(frame_assembler != nullptr);
+ ceph_assert_always(!frame_assembler->is_socket_valid());
+ return seastar::futurize_invoke([this] {
+ // cannot be running in parallel with to_new_sid()
+ if (maybe_dropped_sid.has_value()) {
+ ceph_assert_always(get_io_state() == io_state_t::drop);
+ assert(shard_states->assert_closed_and_exit());
+ auto prv_sid = *maybe_dropped_sid;
+ return seastar::smp::submit_to(prv_sid, [this] {
+ logger().debug("{} got wait_io_exit_dispatching from prv_sid", conn);
+ assert(maybe_prv_shard_states != nullptr);
+ return maybe_prv_shard_states->wait_io_exit_dispatching();
+ });
+ } else {
+ return shard_states->wait_io_exit_dispatching();
+ }
+ }).then([this] {
+ logger().debug("{} finish wait_io_exit_dispatching at {}",
+ conn, io_stat_printer{*this});
+ ceph_assert_always(frame_assembler != nullptr);
+ ceph_assert_always(!frame_assembler->is_socket_valid());
+ frame_assembler->set_shard_id(conn.get_messenger_shard_id());
+ return exit_dispatching_ret{
+ std::move(frame_assembler),
+ get_states()};
+ });
+}
+
+seastar::future<> IOHandler::reset_session(
+ crosscore_t::seq_t cc_seq,
+ bool full)
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} reset_session(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, full] {
+ return reset_session(cc_seq, full);
+ });
+ }
+
+ logger().debug("{} got {} reset_session({})",
+ conn, cc_seq, full);
+ assert(get_io_state() != io_state_t::open);
+ reset_in();
+ if (full) {
+ reset_out();
+ dispatch_remote_reset();
+ }
+ return seastar::now();
+}
+
+seastar::future<> IOHandler::reset_peer_state(
+ crosscore_t::seq_t cc_seq)
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} reset_peer_state(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq] {
+ return reset_peer_state(cc_seq);
+ });
+ }
+
+ logger().debug("{} got {} reset_peer_state()",
+ conn, cc_seq);
+ assert(get_io_state() != io_state_t::open);
+ reset_in();
+ do_requeue_out_sent_up_to(0);
+ discard_out_sent();
+ return seastar::now();
+}
+
+seastar::future<> IOHandler::requeue_out_sent(
+ crosscore_t::seq_t cc_seq)
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} requeue_out_sent(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq] {
+ return requeue_out_sent(cc_seq);
+ });
+ }
+
+ logger().debug("{} got {} requeue_out_sent()",
+ conn, cc_seq);
+ do_requeue_out_sent();
+ return seastar::now();
+}
+
+void IOHandler::do_requeue_out_sent()
+{
+ assert(get_io_state() != io_state_t::open);
+ if (out_sent_msgs.empty()) {
+ return;
+ }
+
+ out_seq -= out_sent_msgs.size();
+ logger().debug("{} requeue {} items, revert out_seq to {}",
+ conn, out_sent_msgs.size(), out_seq);
+ for (MessageFRef& msg : out_sent_msgs) {
+ msg->clear_payload();
+ msg->set_seq(0);
+ }
+ out_pending_msgs.insert(
+ out_pending_msgs.begin(),
+ std::make_move_iterator(out_sent_msgs.begin()),
+ std::make_move_iterator(out_sent_msgs.end()));
+ out_sent_msgs.clear();
+ maybe_notify_out_dispatch();
+}
+
+seastar::future<> IOHandler::requeue_out_sent_up_to(
+ crosscore_t::seq_t cc_seq,
+ seq_num_t msg_seq)
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} requeue_out_sent_up_to(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, msg_seq] {
+ return requeue_out_sent_up_to(cc_seq, msg_seq);
+ });
+ }
+
+ logger().debug("{} got {} requeue_out_sent_up_to({})",
+ conn, cc_seq, msg_seq);
+ do_requeue_out_sent_up_to(msg_seq);
+ return seastar::now();
+}
+
+void IOHandler::do_requeue_out_sent_up_to(seq_num_t seq)
+{
+ assert(get_io_state() != io_state_t::open);
+ if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
+ logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
+ conn, out_seq, seq);
+ out_seq = seq;
+ return;
+ }
+ logger().debug("{} discarding sent msgs by seq {} (sent_len={}, out_seq={})",
+ conn, seq, out_sent_msgs.size(), out_seq);
+ while (!out_sent_msgs.empty()) {
+ auto cur_seq = out_sent_msgs.front()->get_seq();
+ if (cur_seq == 0 || cur_seq > seq) {
+ break;
+ } else {
+ out_sent_msgs.pop_front();
+ }
+ }
+ do_requeue_out_sent();
+}
+
+void IOHandler::reset_in()
+{
+ assert(get_io_state() != io_state_t::open);
+ in_seq = 0;
+}
+
+void IOHandler::reset_out()
+{
+ assert(get_io_state() != io_state_t::open);
+ discard_out_sent();
+ out_pending_msgs.clear();
+ need_keepalive = false;
+ next_keepalive_ack = std::nullopt;
+ ack_left = 0;
+}
+
+void IOHandler::discard_out_sent()
+{
+ assert(get_io_state() != io_state_t::open);
+ out_seq = 0;
+ out_sent_msgs.clear();
+}
+
+seastar::future<>
+IOHandler::dispatch_accept(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef conn_fref,
+ bool is_replace)
+{
+ return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
+}
+
+seastar::future<>
+IOHandler::dispatch_connect(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef conn_fref)
+{
+ return to_new_sid(cc_seq, new_sid, std::move(conn_fref), std::nullopt);
+}
+
+seastar::future<>
+IOHandler::cleanup_prv_shard(seastar::shard_id prv_sid)
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ return seastar::smp::submit_to(prv_sid, [this] {
+ logger().debug("{} got cleanup_prv_shard()", conn);
+ assert(maybe_prv_shard_states != nullptr);
+ auto ref_prv_states = std::move(maybe_prv_shard_states);
+ auto &prv_states = *ref_prv_states;
+ return prv_states.close(
+ ).then([ref_prv_states=std::move(ref_prv_states)] {
+ ceph_assert_always(ref_prv_states->assert_closed_and_exit());
+ });
+ }).then([this] {
+ ceph_assert_always(maybe_prv_shard_states == nullptr);
+ });
+}
+
+seastar::future<>
+IOHandler::to_new_sid(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef conn_fref,
+ std::optional<bool> is_replace)
+{
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} to_new_sid(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, new_sid, is_replace,
+ conn_fref=std::move(conn_fref)]() mutable {
+ return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
+ });
+ }
+
+ bool is_accept_or_connect = is_replace.has_value();
+ logger().debug("{} got {} to_new_sid_1(new_sid={}, {}) at {}",
+ conn, cc_seq, new_sid,
+ fmt::format("{}",
+ is_accept_or_connect ?
+ (*is_replace ? "accept(replace)" : "accept(!replace)") :
+ "connect"),
+ io_stat_printer{*this});
+ auto next_cc_seq = ++cc_seq;
+
+ if (get_io_state() != io_state_t::drop) {
+ ceph_assert_always(conn_ref);
+ if (new_sid != seastar::this_shard_id()) {
+ dispatchers.ms_handle_shard_change(conn_ref, new_sid, is_accept_or_connect);
+ // user can make changes
+ }
+ } else {
+ // it is possible that both io_handler and protocolv2 are
+ // trying to close each other from different cores simultaneously.
+ assert(!protocol_is_connected);
+ }
+
+ if (get_io_state() != io_state_t::drop) {
+ if (is_accept_or_connect) {
+ // protocol_is_connected can be from true to true here if the replacing is
+ // happening to a connected connection.
+ } else {
+ ceph_assert_always(protocol_is_connected == false);
+ }
+ protocol_is_connected = true;
+ } else {
+ assert(!protocol_is_connected);
+ }
+
+ bool is_dropped = false;
+ if (get_io_state() == io_state_t::drop) {
+ is_dropped = true;
+ }
+ ceph_assert_always(get_io_state() != io_state_t::open);
+
+ // apply the switching atomically
+ ceph_assert_always(conn_ref);
+ conn_ref.reset();
+ auto prv_sid = get_shard_id();
+ ceph_assert_always(maybe_prv_shard_states == nullptr);
+ maybe_prv_shard_states = std::move(shard_states);
+ shard_states = shard_states_t::create_from_previous(
+ *maybe_prv_shard_states, new_sid);
+ assert(new_sid == get_shard_id());
+
+ return seastar::smp::submit_to(new_sid,
+ [this, next_cc_seq, is_dropped, prv_sid, is_replace, conn_fref=std::move(conn_fref)]() mutable {
+ logger().debug("{} got {} to_new_sid_2(prv_sid={}, is_dropped={}, {}) at {}",
+ conn, next_cc_seq, prv_sid, is_dropped,
+ fmt::format("{}",
+ is_replace.has_value() ?
+ (*is_replace ? "accept(replace)" : "accept(!replace)") :
+ "connect"),
+ io_stat_printer{*this});
+
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ ceph_assert_always(get_io_state() != io_state_t::open);
+ ceph_assert_always(!maybe_dropped_sid.has_value());
+ ceph_assert_always(crosscore.proceed_or_wait(next_cc_seq));
+
+ if (is_dropped) {
+ ceph_assert_always(get_io_state() == io_state_t::drop);
+ ceph_assert_always(shard_states->assert_closed_and_exit());
+ maybe_dropped_sid = prv_sid;
+ // cleanup_prv_shard() will be done in a follow-up close_io()
+ } else {
+ // possible at io_state_t::drop
+
+ // previous shard is not cleaned,
+ // but close_io() is responsible to clean up the current shard,
+ // so cleanup the previous shard here.
+ shard_states->dispatch_in_background(
+ "cleanup_prv_sid", conn, [this, prv_sid] {
+ return cleanup_prv_shard(prv_sid);
+ });
+ maybe_notify_out_dispatch();
+ }
+
+ ceph_assert_always(!conn_ref);
+ // assign even if already dropping
+ conn_ref = make_local_shared_foreign(std::move(conn_fref));
+
+ if (get_io_state() != io_state_t::drop) {
+ if (is_replace.has_value()) {
+ dispatchers.ms_handle_accept(conn_ref, prv_sid, *is_replace);
+ } else {
+ dispatchers.ms_handle_connect(conn_ref, prv_sid);
+ }
+ // user can make changes
+ }
+ });
+}
+
+seastar::future<> IOHandler::set_accepted_sid(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id sid,
+ ConnectionFRef conn_fref)
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ assert(get_io_state() == io_state_t::none);
+ ceph_assert_always(conn_ref);
+ conn_ref.reset();
+ assert(maybe_prv_shard_states == nullptr);
+ shard_states.reset();
+ shard_states = shard_states_t::create(sid, io_state_t::none);
+ return seastar::smp::submit_to(sid,
+ [this, cc_seq, conn_fref=std::move(conn_fref)]() mutable {
+ // must be the first to proceed
+ ceph_assert_always(crosscore.proceed_or_wait(cc_seq));
+
+ logger().debug("{} set accepted sid", conn);
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ ceph_assert_always(get_io_state() == io_state_t::none);
+ assert(maybe_prv_shard_states == nullptr);
+ ceph_assert_always(!conn_ref);
+ conn_ref = make_local_shared_foreign(std::move(conn_fref));
+ });
+}
+
+void IOHandler::dispatch_reset(bool is_replace)
+{
+ ceph_assert_always(get_io_state() == io_state_t::drop);
+ if (!need_dispatch_reset) {
+ return;
+ }
+ need_dispatch_reset = false;
+ ceph_assert_always(conn_ref);
+
+ dispatchers.ms_handle_reset(conn_ref, is_replace);
+ // user can make changes
+}
+
+void IOHandler::dispatch_remote_reset()
+{
+ if (get_io_state() == io_state_t::drop) {
+ return;
+ }
+ ceph_assert_always(conn_ref);
+
+ dispatchers.ms_handle_remote_reset(conn_ref);
+ // user can make changes
+}
+
+void IOHandler::ack_out_sent(seq_num_t seq)
+{
+ if (conn.policy.lossy) { // lossy connections don't keep sent messages
+ return;
+ }
+ while (!out_sent_msgs.empty() &&
+ out_sent_msgs.front()->get_seq() <= seq) {
+ logger().trace("{} got ack seq {} >= {}, pop {}",
+ conn, seq, out_sent_msgs.front()->get_seq(),
+ *out_sent_msgs.front());
+ out_sent_msgs.pop_front();
+ }
+}
+
+seastar::future<>
+IOHandler::do_out_dispatch(shard_states_t &ctx)
+{
+ return seastar::repeat([this, &ctx] {
+ switch (ctx.get_io_state()) {
+ case io_state_t::open: {
+ if (unlikely(!is_out_queued())) {
+ // try exit open dispatching
+ return frame_assembler->flush<false>(
+ ).then([this, &ctx] {
+ if (ctx.get_io_state() != io_state_t::open || is_out_queued()) {
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ }
+ // still nothing pending to send after flush,
+ // open dispatching can ONLY stop now
+ ctx.exit_out_dispatching("exit-open", conn);
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ });
+ }
+
+ auto require_keepalive = need_keepalive;
+ need_keepalive = false;
+ auto maybe_keepalive_ack = next_keepalive_ack;
+ next_keepalive_ack = std::nullopt;
+ auto to_ack = ack_left;
+ assert(to_ack == 0 || in_seq > 0);
+ ack_left = 0;
+#ifdef UNIT_TESTS_BUILT
+ auto ret = sweep_out_pending_msgs_to_sent(
+ require_keepalive, maybe_keepalive_ack, to_ack > 0);
+ return frame_assembler->intercept_frames(ret.tags, true
+ ).then([this, bl=std::move(ret.bl)]() mutable {
+ return frame_assembler->write<false>(std::move(bl));
+ }
+#else
+ auto bl = sweep_out_pending_msgs_to_sent(
+ require_keepalive, maybe_keepalive_ack, to_ack > 0);
+ return frame_assembler->write<false>(std::move(bl)
+#endif
+ ).then([this, &ctx] {
+ if (ctx.get_io_state() != io_state_t::open) {
+ return frame_assembler->flush<false>(
+ ).then([] {
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ });
+ }
+
+ // FIXME: may leak a flush if state is changed after return and before
+ // the next repeat body.
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ });
+ }
+ case io_state_t::delay:
+ // delay out dispatching until open
+ ctx.notify_out_dispatching_stopped("delay...", conn);
+ return ctx.wait_state_change(
+ ).then([] { return stop_t::no; });
+ case io_state_t::drop:
+ ctx.exit_out_dispatching("dropped", conn);
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ case io_state_t::switched:
+ ctx.exit_out_dispatching("switched", conn);
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ default:
+ ceph_abort("impossible");
+ }
+ }).handle_exception_type([this, &ctx](const std::system_error& e) {
+ auto io_state = ctx.get_io_state();
+ if (e.code() != std::errc::broken_pipe &&
+ e.code() != std::errc::connection_reset &&
+ e.code() != error::negotiation_failure) {
+ logger().error("{} do_out_dispatch(): unexpected error at {} -- {}",
+ conn, io_state, e.what());
+ ceph_abort();
+ }
+
+ if (io_state == io_state_t::open) {
+ auto cc_seq = crosscore.prepare_submit();
+ logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, "
+ "send {} notify_out_fault()",
+ conn, io_state, io_stat_printer{*this}, e.what(), cc_seq);
+ std::exception_ptr eptr;
+ try {
+ throw e;
+ } catch(...) {
+ eptr = std::current_exception();
+ }
+ do_set_io_state(io_state_t::delay);
+ shard_states->dispatch_in_background(
+ "notify_out_fault(out)", conn, [this, cc_seq, eptr] {
+ auto states = get_states();
+ return seastar::smp::submit_to(
+ conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
+ return handshake_listener->notify_out_fault(
+ cc_seq, "do_out_dispatch", eptr, states);
+ });
+ });
+ } else {
+ if (io_state != io_state_t::switched) {
+ logger().info("{} do_out_dispatch(): fault at {}, {} -- {}",
+ conn, io_state, io_stat_printer{*this}, e.what());
+ } else {
+ logger().info("{} do_out_dispatch(): fault at {} -- {}",
+ conn, io_state, e.what());
+ }
+ }
+
+ return do_out_dispatch(ctx);
+ });
+}
+
+void IOHandler::maybe_notify_out_dispatch()
+{
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ if (is_out_queued()) {
+ notify_out_dispatch();
+ }
+}
+
+void IOHandler::notify_out_dispatch()
+{
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ assert(is_out_queued());
+ if (need_notify_out) {
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} notify_out()",
+ conn, cc_seq);
+ shard_states->dispatch_in_background(
+ "notify_out", conn, [this, cc_seq] {
+ return seastar::smp::submit_to(
+ conn.get_messenger_shard_id(), [this, cc_seq] {
+ return handshake_listener->notify_out(cc_seq);
+ });
+ });
+ }
+ if (shard_states->try_enter_out_dispatching()) {
+ shard_states->dispatch_in_background(
+ "do_out_dispatch", conn, [this] {
+ return do_out_dispatch(*shard_states);
+ });
+ }
+}
+
+seastar::future<>
+IOHandler::read_message(
+ shard_states_t &ctx,
+ utime_t throttle_stamp,
+ std::size_t msg_size)
+{
+ return frame_assembler->read_frame_payload<false>(
+ ).then([this, throttle_stamp, msg_size, &ctx](auto payload) {
+ if (unlikely(ctx.get_io_state() != io_state_t::open)) {
+ logger().debug("{} triggered {} during read_message()",
+ conn, ctx.get_io_state());
+ abort_protocol();
+ }
+
+ utime_t recv_stamp{seastar::lowres_system_clock::now()};
+
+ // we need to get the size before std::moving segments data
+ auto msg_frame = MessageFrame::Decode(*payload);
+ // XXX: paranoid copy just to avoid oops
+ ceph_msg_header2 current_header = msg_frame.header();
+
+ logger().trace("{} got {} + {} + {} byte message,"
+ " envelope type={} src={} off={} seq={}",
+ conn,
+ msg_frame.front_len(),
+ msg_frame.middle_len(),
+ msg_frame.data_len(),
+ current_header.type,
+ conn.get_peer_name(),
+ current_header.data_off,
+ current_header.seq);
+
+ ceph_msg_header header{current_header.seq,
+ current_header.tid,
+ current_header.type,
+ current_header.priority,
+ current_header.version,
+ ceph_le32(msg_frame.front_len()),
+ ceph_le32(msg_frame.middle_len()),
+ ceph_le32(msg_frame.data_len()),
+ current_header.data_off,
+ conn.get_peer_name(),
+ current_header.compat_version,
+ current_header.reserved,
+ ceph_le32(0)};
+ ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
+ ceph_le32(0), ceph_le64(0), current_header.flags};
+
+ Message *message = decode_message(nullptr, 0, header, footer,
+ msg_frame.front(), msg_frame.middle(), msg_frame.data(), nullptr);
+ if (!message) {
+ logger().warn("{} decode message failed", conn);
+ abort_in_fault();
+ }
+
+ // store reservation size in message, so we don't get confused
+ // by messages entering the dispatch queue through other paths.
+ message->set_dispatch_throttle_size(msg_size);
+
+ message->set_throttle_stamp(throttle_stamp);
+ message->set_recv_stamp(recv_stamp);
+ message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
+
+ // check received seq#. if it is old, drop the message.
+ // note that incoming messages may skip ahead. this is convenient for the
+ // client side queueing because messages can't be renumbered, but the (kernel)
+ // client will occasionally pull a message out of the sent queue to send
+ // elsewhere. in that case it doesn't matter if we "got" it or not.
+ uint64_t cur_seq = in_seq;
+ if (message->get_seq() <= cur_seq) {
+ logger().error("{} got old message {} <= {} {}, discarding",
+ conn, message->get_seq(), cur_seq, *message);
+ if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
+ local_conf()->ms_die_on_old_message) {
+ ceph_assert(0 == "old msgs despite reconnect_seq feature");
+ }
+ return seastar::now();
+ } else if (message->get_seq() > cur_seq + 1) {
+ logger().error("{} missed message? skipped from seq {} to {}",
+ conn, cur_seq, message->get_seq());
+ if (local_conf()->ms_die_on_skipped_message) {
+ ceph_assert(0 == "skipped incoming seq");
+ }
+ }
+
+ // note last received message.
+ in_seq = message->get_seq();
+ if (conn.policy.lossy) {
+ logger().debug("{} <== #{} === {} ({})",
+ conn,
+ message->get_seq(),
+ *message,
+ message->get_type());
+ } else {
+ logger().debug("{} <== #{},{} === {} ({})",
+ conn,
+ message->get_seq(),
+ current_header.ack_seq,
+ *message,
+ message->get_type());
+ }
+
+ // notify ack
+ if (!conn.policy.lossy) {
+ ++ack_left;
+ notify_out_dispatch();
+ }
+
+ ack_out_sent(current_header.ack_seq);
+
+ // TODO: change MessageRef with seastar::shared_ptr
+ auto msg_ref = MessageRef{message, false};
+ assert(ctx.get_io_state() == io_state_t::open);
+ assert(get_io_state() == io_state_t::open);
+ ceph_assert_always(conn_ref);
+
+ // throttle the reading process by the returned future
+ return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
+ // user can make changes
+ });
+}
+
+void IOHandler::do_in_dispatch()
+{
+ shard_states->enter_in_dispatching();
+ shard_states->dispatch_in_background(
+ "do_in_dispatch", conn, [this, &ctx=*shard_states] {
+ return seastar::keep_doing([this, &ctx] {
+ return frame_assembler->read_main_preamble<false>(
+ ).then([this, &ctx](auto ret) {
+ switch (ret.tag) {
+ case Tag::MESSAGE: {
+ size_t msg_size = get_msg_size(*ret.rx_frame_asm);
+ return seastar::futurize_invoke([this] {
+ // throttle_message() logic
+ if (!conn.policy.throttler_messages) {
+ return seastar::now();
+ }
+ // TODO: message throttler
+ ceph_abort("TODO");
+ return seastar::now();
+ }).then([this, msg_size] {
+ // throttle_bytes() logic
+ if (!conn.policy.throttler_bytes) {
+ return seastar::now();
+ }
+ if (!msg_size) {
+ return seastar::now();
+ }
+ logger().trace("{} wants {} bytes from policy throttler {}/{}",
+ conn, msg_size,
+ conn.policy.throttler_bytes->get_current(),
+ conn.policy.throttler_bytes->get_max());
+ return conn.policy.throttler_bytes->get(msg_size);
+ }).then([this, msg_size, &ctx] {
+ // TODO: throttle_dispatch_queue() logic
+ utime_t throttle_stamp{seastar::lowres_system_clock::now()};
+ return read_message(ctx, throttle_stamp, msg_size);
+ });
+ }
+ case Tag::ACK:
+ return frame_assembler->read_frame_payload<false>(
+ ).then([this](auto payload) {
+ // handle_message_ack() logic
+ auto ack = AckFrame::Decode(payload->back());
+ logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
+ ack_out_sent(ack.seq());
+ });
+ case Tag::KEEPALIVE2:
+ return frame_assembler->read_frame_payload<false>(
+ ).then([this](auto payload) {
+ // handle_keepalive2() logic
+ auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
+ logger().debug("{} GOT KeepAliveFrame: timestamp={}",
+ conn, keepalive_frame.timestamp());
+ // notify keepalive ack
+ next_keepalive_ack = keepalive_frame.timestamp();
+ if (seastar::this_shard_id() == get_shard_id()) {
+ notify_out_dispatch();
+ }
+
+ last_keepalive = seastar::lowres_system_clock::now();
+ });
+ case Tag::KEEPALIVE2_ACK:
+ return frame_assembler->read_frame_payload<false>(
+ ).then([this](auto payload) {
+ // handle_keepalive2_ack() logic
+ auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
+ auto _last_keepalive_ack =
+ seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
+ set_last_keepalive_ack(_last_keepalive_ack);
+ logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
+ conn, _last_keepalive_ack);
+ });
+ default: {
+ logger().warn("{} do_in_dispatch() received unexpected tag: {}",
+ conn, static_cast<uint32_t>(ret.tag));
+ abort_in_fault();
+ }
+ }
+ });
+ }).handle_exception([this, &ctx](std::exception_ptr eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+
+ auto io_state = ctx.get_io_state();
+ if (io_state == io_state_t::open) {
+ auto cc_seq = crosscore.prepare_submit();
+ logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, "
+ "send {} notify_out_fault()",
+ conn, io_state, io_stat_printer{*this}, e_what, cc_seq);
+ do_set_io_state(io_state_t::delay);
+ shard_states->dispatch_in_background(
+ "notify_out_fault(in)", conn, [this, cc_seq, eptr] {
+ auto states = get_states();
+ return seastar::smp::submit_to(
+ conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
+ return handshake_listener->notify_out_fault(
+ cc_seq, "do_in_dispatch", eptr, states);
+ });
+ });
+ } else {
+ if (io_state != io_state_t::switched) {
+ logger().info("{} do_in_dispatch(): fault at {}, {} -- {}",
+ conn, io_state, io_stat_printer{*this}, e_what);
+ } else {
+ logger().info("{} do_in_dispatch(): fault at {} -- {}",
+ conn, io_state, e_what);
+ }
+ }
+ }).finally([&ctx] {
+ ctx.exit_in_dispatching();
+ });
+ });
+}
+
+seastar::future<>
+IOHandler::close_io(
+ crosscore_t::seq_t cc_seq,
+ bool is_dispatch_reset,
+ bool is_replace)
+{
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} close_io(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, is_dispatch_reset, is_replace] {
+ return close_io(cc_seq, is_dispatch_reset, is_replace);
+ });
+ }
+
+ logger().debug("{} got {} close_io(reset={}, replace={})",
+ conn, cc_seq, is_dispatch_reset, is_replace);
+ ceph_assert_always(get_io_state() == io_state_t::drop);
+
+ if (is_dispatch_reset) {
+ dispatch_reset(is_replace);
+ }
+
+ ceph_assert_always(conn_ref);
+ conn_ref.reset();
+
+ // cannot be running in parallel with to_new_sid()
+ if (maybe_dropped_sid.has_value()) {
+ assert(shard_states->assert_closed_and_exit());
+ auto prv_sid = *maybe_dropped_sid;
+ return cleanup_prv_shard(prv_sid);
+ } else {
+ return shard_states->close(
+ ).then([this] {
+ assert(shard_states->assert_closed_and_exit());
+ });
+ }
+}
+
+/*
+ * IOHandler::shard_states_t
+ */
+
+void
+IOHandler::shard_states_t::notify_out_dispatching_stopped(
+ const char *what, SocketConnection &conn)
+{
+ assert(seastar::this_shard_id() == sid);
+ if (unlikely(out_exit_dispatching.has_value())) {
+ out_exit_dispatching->set_value();
+ out_exit_dispatching = std::nullopt;
+ logger().info("{} do_out_dispatch: stop({}) at {}, set out_exit_dispatching",
+ conn, what, io_state);
+ } else {
+ if (unlikely(io_state != io_state_t::open)) {
+ logger().info("{} do_out_dispatch: stop({}) at {}, no out_exit_dispatching",
+ conn, what, io_state);
+ }
+ }
+}
+
+seastar::future<>
+IOHandler::shard_states_t::wait_io_exit_dispatching()
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(io_state != io_state_t::open);
+ assert(!gate.is_closed());
+ return seastar::when_all(
+ [this] {
+ if (out_exit_dispatching) {
+ return out_exit_dispatching->get_future();
+ } else {
+ return seastar::now();
+ }
+ }(),
+ [this] {
+ if (in_exit_dispatching) {
+ return in_exit_dispatching->get_future();
+ } else {
+ return seastar::now();
+ }
+ }()
+ ).discard_result();
+}
+
+IOHandler::shard_states_ref_t
+IOHandler::shard_states_t::create_from_previous(
+ shard_states_t &prv_states,
+ seastar::shard_id new_sid)
+{
+ auto io_state = prv_states.io_state;
+ assert(io_state != io_state_t::open);
+ auto ret = shard_states_t::create(new_sid, io_state);
+ if (io_state == io_state_t::drop) {
+ // the new gate should not never be used
+ auto fut = ret->gate.close();
+ ceph_assert_always(fut.available());
+ }
+ prv_states.set_io_state(io_state_t::switched);
+ return ret;
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h
new file mode 100644
index 000000000..f53c2ba64
--- /dev/null
+++ b/src/crimson/net/io_handler.h
@@ -0,0 +1,623 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <vector>
+
+#include <seastar/core/shared_future.hh>
+#include <seastar/util/later.hh>
+
+#include "crimson/common/gated.h"
+#include "Fwd.h"
+#include "SocketConnection.h"
+#include "FrameAssemblerV2.h"
+
+namespace crimson::net {
+
+/**
+ * crosscore_t
+ *
+ * To preserve the event order across cores.
+ */
+class crosscore_t {
+public:
+ using seq_t = uint64_t;
+
+ crosscore_t() = default;
+ ~crosscore_t() = default;
+
+ seq_t get_in_seq() const {
+ return in_seq;
+ }
+
+ seq_t prepare_submit() {
+ ++out_seq;
+ return out_seq;
+ }
+
+ bool proceed_or_wait(seq_t seq) {
+ if (seq == in_seq + 1) {
+ ++in_seq;
+ if (unlikely(in_pr_wait.has_value())) {
+ in_pr_wait->set_value();
+ in_pr_wait = std::nullopt;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ seastar::future<> wait(seq_t seq) {
+ assert(seq != in_seq + 1);
+ if (!in_pr_wait.has_value()) {
+ in_pr_wait = seastar::shared_promise<>();
+ }
+ return in_pr_wait->get_shared_future();
+ }
+
+private:
+ seq_t out_seq = 0;
+ seq_t in_seq = 0;
+ std::optional<seastar::shared_promise<>> in_pr_wait;
+};
+
+/**
+ * io_handler_state
+ *
+ * It is required to populate the states from IOHandler to ProtocolV2
+ * asynchronously.
+ */
+struct io_handler_state {
+ seq_num_t in_seq;
+ bool is_out_queued;
+ bool has_out_sent;
+
+ bool is_out_queued_or_sent() const {
+ return is_out_queued || has_out_sent;
+ }
+
+ /*
+ * should be consistent with the accroding interfaces in IOHandler
+ */
+
+ void reset_session(bool full) {
+ in_seq = 0;
+ if (full) {
+ is_out_queued = false;
+ has_out_sent = false;
+ }
+ }
+
+ void reset_peer_state() {
+ in_seq = 0;
+ is_out_queued = is_out_queued_or_sent();
+ has_out_sent = false;
+ }
+
+ void requeue_out_sent_up_to() {
+ // noop since the information is insufficient
+ }
+
+ void requeue_out_sent() {
+ if (has_out_sent) {
+ has_out_sent = false;
+ is_out_queued = true;
+ }
+ }
+};
+
+/**
+ * HandshakeListener
+ *
+ * The interface class for IOHandler to notify the ProtocolV2.
+ *
+ * The notifications may be cross-core and must be sent to
+ * SocketConnection::get_messenger_shard_id()
+ */
+class HandshakeListener {
+public:
+ virtual ~HandshakeListener() = default;
+
+ HandshakeListener(const HandshakeListener&) = delete;
+ HandshakeListener(HandshakeListener &&) = delete;
+ HandshakeListener &operator=(const HandshakeListener &) = delete;
+ HandshakeListener &operator=(HandshakeListener &&) = delete;
+
+ virtual seastar::future<> notify_out(
+ crosscore_t::seq_t cc_seq) = 0;
+
+ virtual seastar::future<> notify_out_fault(
+ crosscore_t::seq_t cc_seq,
+ const char *where,
+ std::exception_ptr,
+ io_handler_state) = 0;
+
+ virtual seastar::future<> notify_mark_down(
+ crosscore_t::seq_t cc_seq) = 0;
+
+protected:
+ HandshakeListener() = default;
+};
+
+/**
+ * IOHandler
+ *
+ * Implements the message read and write paths after the handshake, and also be
+ * responsible to dispatch events. It is supposed to be working on the same
+ * core with the underlying socket and the FrameAssemblerV2 class.
+ */
+class IOHandler final : public ConnectionHandler {
+public:
+ IOHandler(ChainedDispatchers &,
+ SocketConnection &);
+
+ ~IOHandler() final;
+
+ IOHandler(const IOHandler &) = delete;
+ IOHandler(IOHandler &&) = delete;
+ IOHandler &operator=(const IOHandler &) = delete;
+ IOHandler &operator=(IOHandler &&) = delete;
+
+/*
+ * as ConnectionHandler
+ */
+public:
+ seastar::shard_id get_shard_id() const final {
+ return shard_states->get_shard_id();
+ }
+
+ bool is_connected() const final {
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ return protocol_is_connected;
+ }
+
+ seastar::future<> send(MessageFRef msg) final;
+
+ seastar::future<> send_keepalive() final;
+
+ clock_t::time_point get_last_keepalive() const final {
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ return last_keepalive;
+ }
+
+ clock_t::time_point get_last_keepalive_ack() const final {
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ return last_keepalive_ack;
+ }
+
+ void set_last_keepalive_ack(clock_t::time_point when) final {
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ last_keepalive_ack = when;
+ }
+
+ void mark_down() final;
+
+/*
+ * as IOHandler to be called by ProtocolV2 handshake
+ *
+ * The calls may be cross-core and asynchronous
+ */
+public:
+ /*
+ * should not be called cross-core
+ */
+
+ void set_handshake_listener(HandshakeListener &hl) {
+ assert(seastar::this_shard_id() == get_shard_id());
+ ceph_assert_always(handshake_listener == nullptr);
+ handshake_listener = &hl;
+ }
+
+ io_handler_state get_states() const {
+ // might be called from prv_sid during wait_io_exit_dispatching()
+ return {in_seq, is_out_queued(), has_out_sent()};
+ }
+
+ struct io_stat_printer {
+ const IOHandler &io_handler;
+ };
+ void print_io_stat(std::ostream &out) const;
+
+ seastar::future<> set_accepted_sid(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id sid,
+ ConnectionFRef conn_fref);
+
+ /*
+ * may be called cross-core
+ */
+
+ seastar::future<> close_io(
+ crosscore_t::seq_t cc_seq,
+ bool is_dispatch_reset,
+ bool is_replace);
+
+ /**
+ * io_state_t
+ *
+ * The io_state is changed with the protocol state, to control the
+ * io behavior accordingly.
+ */
+ enum class io_state_t : uint8_t {
+ none, // no IO is possible as the connection is not available to the user yet.
+ delay, // IO is delayed until open.
+ open, // Dispatch In and Out concurrently.
+ drop, // Drop IO as the connection is closed.
+ switched // IO is switched to a different core
+ // (is moved to maybe_prv_shard_states)
+ };
+ friend class fmt::formatter<io_state_t>;
+
+ seastar::future<> set_io_state(
+ crosscore_t::seq_t cc_seq,
+ io_state_t new_state,
+ FrameAssemblerV2Ref fa,
+ bool set_notify_out);
+
+ struct exit_dispatching_ret {
+ FrameAssemblerV2Ref frame_assembler;
+ io_handler_state io_states;
+ };
+ seastar::future<exit_dispatching_ret>
+ wait_io_exit_dispatching(
+ crosscore_t::seq_t cc_seq);
+
+ seastar::future<> reset_session(
+ crosscore_t::seq_t cc_seq,
+ bool full);
+
+ seastar::future<> reset_peer_state(
+ crosscore_t::seq_t cc_seq);
+
+ seastar::future<> requeue_out_sent_up_to(
+ crosscore_t::seq_t cc_seq,
+ seq_num_t msg_seq);
+
+ seastar::future<> requeue_out_sent(
+ crosscore_t::seq_t cc_seq);
+
+ seastar::future<> dispatch_accept(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef,
+ bool is_replace);
+
+ seastar::future<> dispatch_connect(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef);
+
+ private:
+ class shard_states_t;
+ using shard_states_ref_t = std::unique_ptr<shard_states_t>;
+
+ class shard_states_t {
+ public:
+ shard_states_t(seastar::shard_id _sid, io_state_t state)
+ : sid{_sid}, io_state{state} {}
+
+ seastar::shard_id get_shard_id() const {
+ return sid;
+ }
+
+ io_state_t get_io_state() const {
+ assert(seastar::this_shard_id() == sid);
+ return io_state;
+ }
+
+ void set_io_state(io_state_t new_state) {
+ assert(seastar::this_shard_id() == sid);
+ assert(io_state != new_state);
+ pr_io_state_changed.set_value();
+ pr_io_state_changed = seastar::promise<>();
+ if (io_state == io_state_t::open) {
+ // from open
+ if (out_dispatching) {
+ ceph_assert_always(!out_exit_dispatching.has_value());
+ out_exit_dispatching = seastar::promise<>();
+ }
+ }
+ io_state = new_state;
+ }
+
+ seastar::future<> wait_state_change() {
+ assert(seastar::this_shard_id() == sid);
+ return pr_io_state_changed.get_future();
+ }
+
+ template <typename Func>
+ void dispatch_in_background(
+ const char *what, SocketConnection &who, Func &&func) {
+ assert(seastar::this_shard_id() == sid);
+ ceph_assert_always(!gate.is_closed());
+ gate.dispatch_in_background(what, who, std::move(func));
+ }
+
+ void enter_in_dispatching() {
+ assert(seastar::this_shard_id() == sid);
+ assert(io_state == io_state_t::open);
+ ceph_assert_always(!in_exit_dispatching.has_value());
+ in_exit_dispatching = seastar::promise<>();
+ }
+
+ void exit_in_dispatching() {
+ assert(seastar::this_shard_id() == sid);
+ assert(io_state != io_state_t::open);
+ ceph_assert_always(in_exit_dispatching.has_value());
+ in_exit_dispatching->set_value();
+ in_exit_dispatching = std::nullopt;
+ }
+
+ bool try_enter_out_dispatching() {
+ assert(seastar::this_shard_id() == sid);
+ if (out_dispatching) {
+ // already dispatching out
+ return false;
+ }
+ switch (io_state) {
+ case io_state_t::open:
+ [[fallthrough]];
+ case io_state_t::delay:
+ out_dispatching = true;
+ return true;
+ case io_state_t::drop:
+ [[fallthrough]];
+ case io_state_t::switched:
+ // do not dispatch out
+ return false;
+ default:
+ ceph_abort("impossible");
+ }
+ }
+
+ void notify_out_dispatching_stopped(
+ const char *what, SocketConnection &conn);
+
+ void exit_out_dispatching(
+ const char *what, SocketConnection &conn) {
+ assert(seastar::this_shard_id() == sid);
+ ceph_assert_always(out_dispatching);
+ out_dispatching = false;
+ notify_out_dispatching_stopped(what, conn);
+ }
+
+ seastar::future<> wait_io_exit_dispatching();
+
+ seastar::future<> close() {
+ assert(seastar::this_shard_id() == sid);
+ assert(!gate.is_closed());
+ return gate.close();
+ }
+
+ bool assert_closed_and_exit() const {
+ assert(seastar::this_shard_id() == sid);
+ if (gate.is_closed()) {
+ ceph_assert_always(io_state == io_state_t::drop ||
+ io_state == io_state_t::switched);
+ ceph_assert_always(!out_dispatching);
+ ceph_assert_always(!out_exit_dispatching);
+ ceph_assert_always(!in_exit_dispatching);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ static shard_states_ref_t create(
+ seastar::shard_id sid, io_state_t state) {
+ return std::make_unique<shard_states_t>(sid, state);
+ }
+
+ static shard_states_ref_t create_from_previous(
+ shard_states_t &prv_states, seastar::shard_id new_sid);
+
+ private:
+ const seastar::shard_id sid;
+ io_state_t io_state;
+
+ crimson::common::Gated gate;
+ seastar::promise<> pr_io_state_changed;
+ bool out_dispatching = false;
+ std::optional<seastar::promise<>> out_exit_dispatching;
+ std::optional<seastar::promise<>> in_exit_dispatching;
+ };
+
+ void do_set_io_state(
+ io_state_t new_state,
+ std::optional<crosscore_t::seq_t> cc_seq = std::nullopt,
+ FrameAssemblerV2Ref fa = nullptr,
+ bool set_notify_out = false);
+
+ io_state_t get_io_state() const {
+ return shard_states->get_io_state();
+ }
+
+ void do_requeue_out_sent();
+
+ void do_requeue_out_sent_up_to(seq_num_t seq);
+
+ void assign_frame_assembler(FrameAssemblerV2Ref);
+
+ seastar::future<> send_redirected(MessageFRef msg);
+
+ seastar::future<> do_send(MessageFRef msg);
+
+ seastar::future<> send_keepalive_redirected();
+
+ seastar::future<> do_send_keepalive();
+
+ seastar::future<> to_new_sid(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef,
+ std::optional<bool> is_replace);
+
+ void dispatch_reset(bool is_replace);
+
+ void dispatch_remote_reset();
+
+ bool is_out_queued() const {
+ return (!out_pending_msgs.empty() ||
+ ack_left > 0 ||
+ need_keepalive ||
+ next_keepalive_ack.has_value());
+ }
+
+ bool has_out_sent() const {
+ return !out_sent_msgs.empty();
+ }
+
+ void reset_in();
+
+ void reset_out();
+
+ void discard_out_sent();
+
+ seastar::future<> do_out_dispatch(shard_states_t &ctx);
+
+#ifdef UNIT_TESTS_BUILT
+ struct sweep_ret {
+ ceph::bufferlist bl;
+ std::vector<ceph::msgr::v2::Tag> tags;
+ };
+ sweep_ret
+#else
+ ceph::bufferlist
+#endif
+ sweep_out_pending_msgs_to_sent(
+ bool require_keepalive,
+ std::optional<utime_t> maybe_keepalive_ack,
+ bool require_ack);
+
+ void maybe_notify_out_dispatch();
+
+ void notify_out_dispatch();
+
+ void ack_out_sent(seq_num_t seq);
+
+ seastar::future<> read_message(
+ shard_states_t &ctx,
+ utime_t throttle_stamp,
+ std::size_t msg_size);
+
+ void do_in_dispatch();
+
+ seastar::future<> cleanup_prv_shard(seastar::shard_id prv_sid);
+
+private:
+ shard_states_ref_t shard_states;
+
+ crosscore_t crosscore;
+
+ // drop was happening in the previous sid
+ std::optional<seastar::shard_id> maybe_dropped_sid;
+
+ // the remaining states in the previous sid for cleanup, see to_new_sid()
+ shard_states_ref_t maybe_prv_shard_states;
+
+ ChainedDispatchers &dispatchers;
+
+ SocketConnection &conn;
+
+ // core local reference for dispatching, valid until reset/close
+ ConnectionRef conn_ref;
+
+ HandshakeListener *handshake_listener = nullptr;
+
+ FrameAssemblerV2Ref frame_assembler;
+
+ bool protocol_is_connected = false;
+
+ bool need_dispatch_reset = true;
+
+ /*
+ * out states for writing
+ */
+
+ /// the seq num of the last transmitted message
+ seq_num_t out_seq = 0;
+
+ // messages to be resent after connection gets reset
+ std::deque<MessageFRef> out_pending_msgs;
+
+ // messages sent, but not yet acked by peer
+ std::deque<MessageFRef> out_sent_msgs;
+
+ bool need_keepalive = false;
+
+ std::optional<utime_t> next_keepalive_ack = std::nullopt;
+
+ uint64_t ack_left = 0;
+
+ bool need_notify_out = false;
+
+ /*
+ * in states for reading
+ */
+
+ /// the seq num of the last received message
+ seq_num_t in_seq = 0;
+
+ clock_t::time_point last_keepalive;
+
+ clock_t::time_point last_keepalive_ack;
+};
+
+inline std::ostream& operator<<(
+ std::ostream& out, IOHandler::io_stat_printer stat) {
+ stat.io_handler.print_io_stat(out);
+ return out;
+}
+
+} // namespace crimson::net
+
+template <>
+struct fmt::formatter<crimson::net::io_handler_state> {
+ constexpr auto parse(format_parse_context& ctx) {
+ return ctx.begin();
+ }
+
+ template <typename FormatContext>
+ auto format(crimson::net::io_handler_state state, FormatContext& ctx) {
+ return fmt::format_to(
+ ctx.out(),
+ "io(in_seq={}, is_out_queued={}, has_out_sent={})",
+ state.in_seq,
+ state.is_out_queued,
+ state.has_out_sent);
+ }
+};
+
+template <>
+struct fmt::formatter<crimson::net::IOHandler::io_state_t>
+ : fmt::formatter<std::string_view> {
+ template <typename FormatContext>
+ auto format(crimson::net::IOHandler::io_state_t state, FormatContext& ctx) {
+ using enum crimson::net::IOHandler::io_state_t;
+ std::string_view name;
+ switch (state) {
+ case none:
+ name = "none";
+ break;
+ case delay:
+ name = "delay";
+ break;
+ case open:
+ name = "open";
+ break;
+ case drop:
+ name = "drop";
+ break;
+ case switched:
+ name = "switched";
+ break;
+ }
+ return formatter<string_view>::format(name, ctx);
+ }
+};
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::net::IOHandler::io_stat_printer> : fmt::ostream_formatter {};
+#endif