summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/ProtocolV2.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/crimson/net/ProtocolV2.cc
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/net/ProtocolV2.cc')
-rw-r--r--src/crimson/net/ProtocolV2.cc2348
1 files changed, 2348 insertions, 0 deletions
diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc
new file mode 100644
index 000000000..55b669384
--- /dev/null
+++ b/src/crimson/net/ProtocolV2.cc
@@ -0,0 +1,2348 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ProtocolV2.h"
+
+#include <fmt/format.h>
+#include <fmt/ranges.h>
+#include "include/msgr.h"
+#include "include/random.h"
+#include "msg/msg_fmt.h"
+
+#include "crimson/auth/AuthClient.h"
+#include "crimson/auth/AuthServer.h"
+#include "crimson/common/formatter.h"
+#include "crimson/common/log.h"
+
+#include "Errors.h"
+#include "SocketMessenger.h"
+
+using namespace ceph::msgr::v2;
+using crimson::common::local_conf;
+
+namespace {
+
+// TODO: CEPH_MSGR2_FEATURE_COMPRESSION
+const uint64_t CRIMSON_MSGR2_SUPPORTED_FEATURES =
+ (CEPH_MSGR2_FEATURE_REVISION_1 |
+ // CEPH_MSGR2_FEATURE_COMPRESSION |
+ UINT64_C(0));
+
+// Log levels in V2 Protocol:
+// * error level, something error that cause connection to terminate:
+// - fatal errors;
+// - bugs;
+// * warn level: something unusual that identifies connection fault or replacement:
+// - unstable network;
+// - incompatible peer;
+// - auth failure;
+// - connection race;
+// - connection reset;
+// * info level, something very important to show connection lifecycle,
+// which doesn't happen very frequently;
+// * debug level, important logs for debugging, including:
+// - all the messages sent/received (-->/<==);
+// - all the frames exchanged (WRITE/GOT);
+// - important fields updated (UPDATE);
+// - connection state transitions (TRIGGER);
+// * trace level, trivial logs showing:
+// - the exact bytes being sent/received (SEND/RECV(bytes));
+// - detailed information of sub-frames;
+// - integrity checks;
+// - etc.
+seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+}
+
+[[noreturn]] void abort_in_fault() {
+ throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+}
+
+[[noreturn]] void abort_protocol() {
+ throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
+}
+
+#define ABORT_IN_CLOSE(is_dispatch_reset) { \
+ do_close(is_dispatch_reset); \
+ abort_protocol(); \
+}
+
+inline void expect_tag(const Tag& expected,
+ const Tag& actual,
+ crimson::net::SocketConnection& conn,
+ const char *where) {
+ if (actual != expected) {
+ logger().warn("{} {} received wrong tag: {}, expected {}",
+ conn, where,
+ static_cast<uint32_t>(actual),
+ static_cast<uint32_t>(expected));
+ abort_in_fault();
+ }
+}
+
+inline void unexpected_tag(const Tag& unexpected,
+ crimson::net::SocketConnection& conn,
+ const char *where) {
+ logger().warn("{} {} received unexpected tag: {}",
+ conn, where, static_cast<uint32_t>(unexpected));
+ abort_in_fault();
+}
+
+inline uint64_t generate_client_cookie() {
+ return ceph::util::generate_random_number<uint64_t>(
+ 1, std::numeric_limits<uint64_t>::max());
+}
+
+} // namespace anonymous
+
+namespace crimson::net {
+
+seastar::future<> ProtocolV2::Timer::backoff(double seconds)
+{
+ logger().warn("{} waiting {} seconds ...", conn, seconds);
+ cancel();
+ last_dur_ = seconds;
+ as = seastar::abort_source();
+ auto dur = std::chrono::duration_cast<seastar::lowres_clock::duration>(
+ std::chrono::duration<double>(seconds));
+ return seastar::sleep_abortable(dur, *as
+ ).handle_exception_type([this] (const seastar::sleep_aborted& e) {
+ logger().debug("{} wait aborted", conn);
+ abort_protocol();
+ });
+}
+
+ProtocolV2::ProtocolV2(SocketConnection& conn,
+ IOHandler &io_handler)
+ : conn{conn},
+ messenger{conn.messenger},
+ io_handler{io_handler},
+ frame_assembler{FrameAssemblerV2::create(conn)},
+ auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()},
+ protocol_timer{conn}
+{
+ io_states = io_handler.get_states();
+}
+
+ProtocolV2::~ProtocolV2() {}
+
+void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
+ const entity_name_t& _peer_name)
+{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ ceph_assert(state == state_t::NONE);
+ ceph_assert(!gate.is_closed());
+ conn.peer_addr = _peer_addr;
+ conn.target_addr = _peer_addr;
+ conn.set_peer_name(_peer_name);
+ conn.policy = messenger.get_policy(_peer_name.type());
+ client_cookie = generate_client_cookie();
+ logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}"
+ " policy(lossy={}, server={}, standby={}, resetcheck={})",
+ conn, _peer_addr, _peer_name, client_cookie,
+ conn.policy.lossy, conn.policy.server,
+ conn.policy.standby, conn.policy.resetcheck);
+ messenger.register_conn(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ execute_connecting();
+}
+
+void ProtocolV2::start_accept(SocketFRef&& new_socket,
+ const entity_addr_t& _peer_addr)
+{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ ceph_assert(state == state_t::NONE);
+ // until we know better
+ conn.target_addr = _peer_addr;
+ frame_assembler->set_socket(std::move(new_socket));
+ has_socket = true;
+ is_socket_valid = true;
+ logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
+ messenger.accept_conn(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+
+ auto cc_seq = crosscore.prepare_submit();
+ gate.dispatch_in_background("set_accepted_sid", conn, [this, cc_seq] {
+ return io_handler.set_accepted_sid(
+ cc_seq,
+ frame_assembler->get_socket_shard_id(),
+ seastar::make_foreign(conn.shared_from_this()));
+ });
+
+ execute_accepting();
+}
+
+void ProtocolV2::trigger_state_phase1(state_t new_state)
+{
+ ceph_assert_always(!gate.is_closed());
+ if (new_state == state) {
+ logger().error("{} is not allowed to re-trigger state {}",
+ conn, get_state_name(state));
+ ceph_abort();
+ }
+ if (state == state_t::CLOSING) {
+ logger().error("{} CLOSING is not allowed to trigger state {}",
+ conn, get_state_name(new_state));
+ ceph_abort();
+ }
+ logger().debug("{} TRIGGER {}, was {}",
+ conn, get_state_name(new_state), get_state_name(state));
+
+ if (state == state_t::READY) {
+ // from READY
+ ceph_assert_always(!need_exit_io);
+ ceph_assert_always(!pr_exit_io.has_value());
+ need_exit_io = true;
+ pr_exit_io = seastar::shared_promise<>();
+ }
+
+ if (new_state == state_t::STANDBY && !conn.policy.server) {
+ need_notify_out = true;
+ } else {
+ need_notify_out = false;
+ }
+
+ state = new_state;
+}
+
+void ProtocolV2::trigger_state_phase2(
+ state_t new_state, io_state_t new_io_state)
+{
+ ceph_assert_always(new_state == state);
+ ceph_assert_always(!gate.is_closed());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+
+ FrameAssemblerV2Ref fa;
+ if (new_state == state_t::READY) {
+ assert(new_io_state == io_state_t::open);
+ assert(io_handler.get_shard_id() ==
+ frame_assembler->get_socket_shard_id());
+ frame_assembler->set_shard_id(io_handler.get_shard_id());
+ fa = std::move(frame_assembler);
+ } else {
+ assert(new_io_state != io_state_t::open);
+ }
+
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::set_io_state(): new_state={}, new_io_state={}, "
+ "fa={}, set_notify_out={}",
+ conn, cc_seq, get_state_name(new_state), new_io_state,
+ fa ? fmt::format("(sid={})", fa->get_shard_id()) : "N/A",
+ need_notify_out);
+ gate.dispatch_in_background(
+ "set_io_state", conn,
+ [this, cc_seq, new_io_state, fa=std::move(fa)]() mutable {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, cc_seq, new_io_state,
+ fa=std::move(fa), set_notify_out=need_notify_out]() mutable {
+ return io_handler.set_io_state(
+ cc_seq, new_io_state, std::move(fa), set_notify_out);
+ });
+ });
+
+ if (need_exit_io) {
+ // from READY
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::wait_io_exit_dispatching() ...",
+ conn, cc_seq);
+ assert(pr_exit_io.has_value());
+ assert(new_io_state != io_state_t::open);
+ need_exit_io = false;
+ gate.dispatch_in_background("exit_io", conn, [this, cc_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq] {
+ return io_handler.wait_io_exit_dispatching(cc_seq);
+ }).then([this, cc_seq](auto ret) {
+ logger().debug("{} finish {} IOHandler::wait_io_exit_dispatching(), {}",
+ conn, cc_seq, ret.io_states);
+ frame_assembler = std::move(ret.frame_assembler);
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ ceph_assert_always(
+ seastar::this_shard_id() == frame_assembler->get_shard_id());
+ ceph_assert_always(!frame_assembler->is_socket_valid());
+ assert(!need_exit_io);
+ io_states = ret.io_states;
+ pr_exit_io->set_value();
+ pr_exit_io = std::nullopt;
+ });
+ });
+ }
+}
+
+void ProtocolV2::fault(
+ state_t expected_state,
+ const char *where,
+ std::exception_ptr eptr)
+{
+ assert(expected_state == state_t::CONNECTING ||
+ expected_state == state_t::ESTABLISHING ||
+ expected_state == state_t::REPLACING ||
+ expected_state == state_t::READY);
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+
+ if (state != expected_state) {
+ logger().info("{} protocol {} {} is aborted at inconsistent {} -- {}",
+ conn,
+ get_state_name(expected_state),
+ where,
+ get_state_name(state),
+ e_what);
+#ifndef NDEBUG
+ if (expected_state == state_t::REPLACING) {
+ assert(state == state_t::CLOSING);
+ } else if (expected_state == state_t::READY) {
+ assert(state == state_t::CLOSING ||
+ state == state_t::REPLACING ||
+ state == state_t::CONNECTING ||
+ state == state_t::STANDBY);
+ } else {
+ assert(state == state_t::CLOSING ||
+ state == state_t::REPLACING);
+ }
+#endif
+ return;
+ }
+ assert(state == expected_state);
+
+ if (state != state_t::CONNECTING && conn.policy.lossy) {
+ // socket will be shutdown in do_close()
+ logger().info("{} protocol {} {} fault on lossy channel, going to CLOSING -- {}",
+ conn, get_state_name(state), where, e_what);
+ do_close(true);
+ return;
+ }
+
+ if (likely(has_socket)) {
+ if (likely(is_socket_valid)) {
+ ceph_assert_always(state != state_t::READY);
+ frame_assembler->shutdown_socket<true>(&gate);
+ is_socket_valid = false;
+ } else {
+ ceph_assert_always(state != state_t::ESTABLISHING);
+ }
+ } else { // !has_socket
+ ceph_assert_always(state == state_t::CONNECTING);
+ assert(!is_socket_valid);
+ }
+
+ if (conn.policy.server ||
+ (conn.policy.standby && !io_states.is_out_queued_or_sent())) {
+ if (conn.policy.server) {
+ logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}",
+ conn,
+ get_state_name(state),
+ where,
+ io_states,
+ e_what);
+ } else {
+ logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}",
+ conn,
+ get_state_name(state),
+ where,
+ io_states,
+ e_what);
+ }
+ execute_standby();
+ } else if (state == state_t::CONNECTING ||
+ state == state_t::REPLACING) {
+ logger().info("{} protocol {} {} fault, going to WAIT {} -- {}",
+ conn,
+ get_state_name(state),
+ where,
+ io_states,
+ e_what);
+ execute_wait(false);
+ } else {
+ assert(state == state_t::READY ||
+ state == state_t::ESTABLISHING);
+ logger().info("{} protocol {} {} fault, going to CONNECTING {} -- {}",
+ conn,
+ get_state_name(state),
+ where,
+ io_states,
+ e_what);
+ execute_connecting();
+ }
+}
+
+void ProtocolV2::reset_session(bool full)
+{
+ server_cookie = 0;
+ connect_seq = 0;
+ if (full) {
+ client_cookie = generate_client_cookie();
+ peer_global_seq = 0;
+ }
+
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::reset_session({})",
+ conn, cc_seq, full);
+ io_states.reset_session(full);
+ gate.dispatch_in_background(
+ "reset_session", conn, [this, cc_seq, full] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq, full] {
+ return io_handler.reset_session(cc_seq, full);
+ });
+ });
+ // user can make changes
+}
+
+seastar::future<std::tuple<entity_type_t, entity_addr_t>>
+ProtocolV2::banner_exchange(bool is_connect)
+{
+ // 1. prepare and send banner
+ bufferlist banner_payload;
+ encode((uint64_t)CRIMSON_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
+ encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
+
+ bufferlist bl;
+ bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
+ auto len_payload = static_cast<uint16_t>(banner_payload.length());
+ encode(len_payload, bl, 0);
+ bl.claim_append(banner_payload);
+ logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
+ "required={}, banner=\"{}\"",
+ conn, bl.length(), len_payload,
+ CRIMSON_MSGR2_SUPPORTED_FEATURES,
+ CEPH_MSGR2_REQUIRED_FEATURES,
+ CEPH_BANNER_V2_PREFIX);
+#ifdef UNIT_TESTS_BUILT
+ return frame_assembler->intercept_frame(custom_bp_t::BANNER_WRITE, true
+ ).then([this, bl=std::move(bl)]() mutable {
+ return frame_assembler->write_flush(std::move(bl));
+ }
+#else
+ return frame_assembler->write_flush(std::move(bl)
+#endif
+ ).then([this] {
+ // 2. read peer banner
+ unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
+#ifdef UNIT_TESTS_BUILT
+ return frame_assembler->intercept_frame(custom_bp_t::BANNER_READ, false
+ ).then([this, banner_len] {
+ return frame_assembler->read_exactly(banner_len);
+ });
+#else
+ return frame_assembler->read_exactly(banner_len);
+#endif
+ }).then([this](auto bptr) {
+ // 3. process peer banner and read banner_payload
+ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
+ logger().debug("{} RECV({}) banner: \"{}\"",
+ conn, bptr.length(),
+ std::string(bptr.c_str(), banner_prefix_len));
+
+ if (memcmp(bptr.c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
+ if (memcmp(bptr.c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
+ logger().warn("{} peer is using V1 protocol", conn);
+ } else {
+ logger().warn("{} peer sent bad banner", conn);
+ }
+ abort_in_fault();
+ }
+
+ bptr.set_offset(bptr.offset() + banner_prefix_len);
+ bptr.set_length(bptr.length() - banner_prefix_len);
+ assert(bptr.length() == sizeof(ceph_le16));
+
+ uint16_t payload_len;
+ bufferlist buf;
+ buf.append(std::move(bptr));
+ auto ti = buf.cbegin();
+ try {
+ decode(payload_len, ti);
+ } catch (const buffer::error &e) {
+ logger().warn("{} decode banner payload len failed", conn);
+ abort_in_fault();
+ }
+ logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
+#ifdef UNIT_TESTS_BUILT
+ return frame_assembler->intercept_frame(
+ custom_bp_t::BANNER_PAYLOAD_READ, false
+ ).then([this, payload_len] {
+ return frame_assembler->read(payload_len);
+ });
+#else
+ return frame_assembler->read(payload_len);
+#endif
+ }).then([this, is_connect] (bufferlist bl) {
+ // 4. process peer banner_payload and send HelloFrame
+ auto p = bl.cbegin();
+ uint64_t _peer_supported_features;
+ uint64_t _peer_required_features;
+ try {
+ decode(_peer_supported_features, p);
+ decode(_peer_required_features, p);
+ } catch (const buffer::error &e) {
+ logger().warn("{} decode banner payload failed", conn);
+ abort_in_fault();
+ }
+ logger().debug("{} RECV({}) banner features: supported={} required={}",
+ conn, bl.length(),
+ _peer_supported_features, _peer_required_features);
+
+ // Check feature bit compatibility
+ uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
+ uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
+ if ((required_features & _peer_supported_features) != required_features) {
+ logger().error("{} peer does not support all required features"
+ " required={} peer_supported={}",
+ conn, required_features, _peer_supported_features);
+ ABORT_IN_CLOSE(is_connect);
+ }
+ if ((supported_features & _peer_required_features) != _peer_required_features) {
+ logger().error("{} we do not support all peer required features"
+ " peer_required={} supported={}",
+ conn, _peer_required_features, supported_features);
+ ABORT_IN_CLOSE(is_connect);
+ }
+ peer_supported_features = _peer_supported_features;
+ bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ frame_assembler->set_is_rev1(is_rev1);
+
+ auto hello = HelloFrame::Encode(messenger.get_mytype(),
+ conn.target_addr);
+ logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
+ conn, ceph_entity_type_name(messenger.get_mytype()),
+ conn.target_addr);
+ return frame_assembler->write_flush_frame(hello);
+ }).then([this] {
+ //5. read peer HelloFrame
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame");
+ return frame_assembler->read_frame_payload();
+ }).then([this](auto payload) {
+ // 6. process peer HelloFrame
+ auto hello = HelloFrame::Decode(payload->back());
+ logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
+ conn, ceph_entity_type_name(hello.entity_type()),
+ hello.peer_addr());
+ return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
+ std::make_tuple(hello.entity_type(), hello.peer_addr()));
+ });
+}
+
+// CONNECTING state
+
+seastar::future<> ProtocolV2::handle_auth_reply()
+{
+ return frame_assembler->read_main_preamble(
+ ).then([this](auto ret) {
+ switch (ret.tag) {
+ case Tag::AUTH_BAD_METHOD:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_auth_bad_method() logic
+ auto bad_method = AuthBadMethodFrame::Decode(payload->back());
+ logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
+ "allowed_methods={}, allowed_modes={}",
+ conn, bad_method.method(), cpp_strerror(bad_method.result()),
+ bad_method.allowed_methods(), bad_method.allowed_modes());
+ ceph_assert(messenger.get_auth_client());
+ int r = messenger.get_auth_client()->handle_auth_bad_method(
+ conn, *auth_meta,
+ bad_method.method(), bad_method.result(),
+ bad_method.allowed_methods(), bad_method.allowed_modes());
+ if (r < 0) {
+ logger().warn("{} auth_client handle_auth_bad_method returned {}",
+ conn, r);
+ abort_in_fault();
+ }
+ return client_auth(bad_method.allowed_methods());
+ });
+ case Tag::AUTH_REPLY_MORE:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_auth_reply_more() logic
+ auto auth_more = AuthReplyMoreFrame::Decode(payload->back());
+ logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
+ conn, auth_more.auth_payload().length());
+ ceph_assert(messenger.get_auth_client());
+ // let execute_connecting() take care of the thrown exception
+ auto reply = messenger.get_auth_client()->handle_auth_reply_more(
+ conn, *auth_meta, auth_more.auth_payload());
+ auto more_reply = AuthRequestMoreFrame::Encode(reply);
+ logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
+ conn, reply.length());
+ return frame_assembler->write_flush_frame(more_reply);
+ }).then([this] {
+ return handle_auth_reply();
+ });
+ case Tag::AUTH_DONE:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_auth_done() logic
+ auto auth_done = AuthDoneFrame::Decode(payload->back());
+ logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
+ conn, auth_done.global_id(),
+ ceph_con_mode_name(auth_done.con_mode()),
+ auth_done.auth_payload().length());
+ ceph_assert(messenger.get_auth_client());
+ int r = messenger.get_auth_client()->handle_auth_done(
+ conn,
+ *auth_meta,
+ auth_done.global_id(),
+ auth_done.con_mode(),
+ auth_done.auth_payload());
+ if (r < 0) {
+ logger().warn("{} auth_client handle_auth_done returned {}", conn, r);
+ abort_in_fault();
+ }
+ auth_meta->con_mode = auth_done.con_mode();
+ frame_assembler->create_session_stream_handlers(*auth_meta, false);
+ return finish_auth();
+ });
+ default: {
+ unexpected_tag(ret.tag, conn, "handle_auth_reply");
+ return seastar::now();
+ }
+ }
+ });
+}
+
+seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods)
+{
+ // send_auth_request() logic
+ ceph_assert(messenger.get_auth_client());
+
+ try {
+ auto [auth_method, preferred_modes, bl] =
+ messenger.get_auth_client()->get_auth_request(conn, *auth_meta);
+ auth_meta->auth_method = auth_method;
+ auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl);
+ logger().debug("{} WRITE AuthRequestFrame: method={},"
+ " preferred_modes={}, payload_len={}",
+ conn, auth_method, preferred_modes, bl.length());
+ return frame_assembler->write_flush_frame(frame
+ ).then([this] {
+ return handle_auth_reply();
+ });
+ } catch (const crimson::auth::error& e) {
+ logger().error("{} get_initial_auth_request returned {}", conn, e.what());
+ ABORT_IN_CLOSE(true);
+ return seastar::now();
+ }
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::process_wait()
+{
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_wait() logic
+ logger().debug("{} GOT WaitFrame", conn);
+ WaitFrame::Decode(payload->back());
+ return next_step_t::wait;
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::client_connect()
+{
+ // send_client_ident() logic
+ uint64_t flags = 0;
+ if (conn.policy.lossy) {
+ flags |= CEPH_MSG_CONNECT_LOSSY;
+ }
+
+ auto client_ident = ClientIdentFrame::Encode(
+ messenger.get_myaddrs(),
+ conn.target_addr,
+ messenger.get_myname().num(),
+ global_seq,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required, flags,
+ client_cookie);
+
+ logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
+ " gs={}, features_supported={}, features_required={},"
+ " flags={}, cookie={}",
+ conn, messenger.get_myaddrs(), conn.target_addr,
+ messenger.get_myname().num(), global_seq,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags, client_cookie);
+ return frame_assembler->write_flush_frame(client_ident
+ ).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ switch (ret.tag) {
+ case Tag::IDENT_MISSING_FEATURES:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_ident_missing_features() logic
+ auto ident_missing = IdentMissingFeaturesFrame::Decode(payload->back());
+ logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
+ " (client does not support all server features)",
+ conn, ident_missing.features());
+ abort_in_fault();
+ return next_step_t::none;
+ });
+ case Tag::WAIT:
+ return process_wait();
+ case Tag::SERVER_IDENT:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} at receiving SERVER_IDENT",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
+ // handle_server_ident() logic
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::requeue_out_sent()",
+ conn, cc_seq);
+ io_states.requeue_out_sent();
+ gate.dispatch_in_background(
+ "requeue_out_sent", conn, [this, cc_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq] {
+ return io_handler.requeue_out_sent(cc_seq);
+ });
+ });
+
+ auto server_ident = ServerIdentFrame::Decode(payload->back());
+ logger().debug("{} GOT ServerIdentFrame:"
+ " addrs={}, gid={}, gs={},"
+ " features_supported={}, features_required={},"
+ " flags={}, cookie={}",
+ conn,
+ server_ident.addrs(), server_ident.gid(),
+ server_ident.global_seq(),
+ server_ident.supported_features(),
+ server_ident.required_features(),
+ server_ident.flags(), server_ident.cookie());
+
+ // is this who we intended to talk to?
+ // be a bit forgiving here, since we may be connecting based on addresses parsed out
+ // of mon_host or something.
+ if (!server_ident.addrs().contains(conn.target_addr)) {
+ logger().warn("{} peer identifies as {}, does not include {}",
+ conn, server_ident.addrs(), conn.target_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+
+ server_cookie = server_ident.cookie();
+
+ // TODO: change peer_addr to entity_addrvec_t
+ if (server_ident.addrs().front() != conn.peer_addr) {
+ logger().warn("{} peer advertises as {}, does not match {}",
+ conn, server_ident.addrs(), conn.peer_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (conn.get_peer_id() != entity_name_t::NEW &&
+ conn.get_peer_id() != server_ident.gid()) {
+ logger().error("{} connection peer id ({}) does not match "
+ "what it should be ({}) during connecting, close",
+ conn, server_ident.gid(), conn.get_peer_id());
+ ABORT_IN_CLOSE(true);
+ }
+ conn.set_peer_id(server_ident.gid());
+ conn.set_features(server_ident.supported_features() &
+ conn.policy.features_supported);
+ logger().debug("{} UPDATE: features={}", conn, conn.get_features());
+ peer_global_seq = server_ident.global_seq();
+
+ bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
+ if (lossy != conn.policy.lossy) {
+ logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn, lossy);
+ conn.policy.lossy = lossy;
+ }
+ if (lossy && (connect_seq != 0 || server_cookie != 0)) {
+ logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy",
+ conn, connect_seq, server_cookie);
+ connect_seq = 0;
+ server_cookie = 0;
+ }
+
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ });
+ default: {
+ unexpected_tag(ret.tag, conn, "post_client_connect");
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+ }
+ }
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::client_reconnect()
+{
+ // send_reconnect() logic
+ auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(),
+ client_cookie,
+ server_cookie,
+ global_seq,
+ connect_seq,
+ io_states.in_seq);
+ logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
+ " server_cookie={}, gs={}, cs={}, in_seq={}",
+ conn, messenger.get_myaddrs(),
+ client_cookie, server_cookie,
+ global_seq, connect_seq, io_states.in_seq);
+ return frame_assembler->write_flush_frame(reconnect).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ switch (ret.tag) {
+ case Tag::SESSION_RETRY_GLOBAL:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_session_retry_global() logic
+ auto retry = RetryGlobalFrame::Decode(payload->back());
+ logger().warn("{} GOT RetryGlobalFrame: gs={}",
+ conn, retry.global_seq());
+ global_seq = messenger.get_global_seq(retry.global_seq());
+ logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq);
+ return client_reconnect();
+ });
+ case Tag::SESSION_RETRY:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_session_retry() logic
+ auto retry = RetryFrame::Decode(payload->back());
+ logger().warn("{} GOT RetryFrame: cs={}",
+ conn, retry.connect_seq());
+ connect_seq = retry.connect_seq() + 1;
+ logger().warn("{} UPDATE: cs={}", conn, connect_seq);
+ return client_reconnect();
+ });
+ case Tag::SESSION_RESET:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} before reset_session()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ // handle_session_reset() logic
+ auto reset = ResetFrame::Decode(payload->back());
+ logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
+
+ reset_session(reset.full());
+ // user can make changes
+
+ return client_connect();
+ });
+ case Tag::WAIT:
+ return process_wait();
+ case Tag::SESSION_RECONNECT_OK:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} at receiving RECONNECT_OK",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
+ // handle_reconnect_ok() logic
+ auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} GOT ReconnectOkFrame: msg_seq={}, "
+ "send {} IOHandler::requeue_out_sent_up_to()",
+ conn, reconnect_ok.msg_seq(), cc_seq);
+
+ io_states.requeue_out_sent_up_to();
+ auto msg_seq = reconnect_ok.msg_seq();
+ gate.dispatch_in_background(
+ "requeue_out_reconnecting", conn, [this, cc_seq, msg_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq, msg_seq] {
+ return io_handler.requeue_out_sent_up_to(cc_seq, msg_seq);
+ });
+ });
+
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ });
+ default: {
+ unexpected_tag(ret.tag, conn, "post_client_reconnect");
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+ }
+ }
+ });
+}
+
+void ProtocolV2::execute_connecting()
+{
+ ceph_assert_always(!is_socket_valid);
+ trigger_state(state_t::CONNECTING, io_state_t::delay);
+ gated_execute("execute_connecting", conn, [this] {
+ global_seq = messenger.get_global_seq();
+ assert(client_cookie != 0);
+ if (!conn.policy.lossy && server_cookie != 0) {
+ ++connect_seq;
+ logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
+ conn, global_seq, connect_seq);
+ } else { // conn.policy.lossy || server_cookie == 0
+ assert(connect_seq == 0);
+ assert(server_cookie == 0);
+ logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
+ }
+ return wait_exit_io().then([this] {
+#ifdef UNIT_TESTS_BUILT
+ // process custom_bp_t::SOCKET_CONNECTING
+ // supports CONTINUE/FAULT/BLOCK
+ if (!conn.interceptor) {
+ return seastar::now();
+ }
+ return conn.interceptor->intercept(
+ conn, {Breakpoint{custom_bp_t::SOCKET_CONNECTING}}
+ ).then([this](bp_action_t action) {
+ switch (action) {
+ case bp_action_t::CONTINUE:
+ return seastar::now();
+ case bp_action_t::FAULT:
+ logger().info("[Test] got FAULT");
+ abort_in_fault();
+ case bp_action_t::BLOCK:
+ logger().info("[Test] got BLOCK");
+ return conn.interceptor->blocker.block();
+ default:
+ ceph_abort("unexpected action from trap");
+ return seastar::now();
+ }
+ });;
+ }).then([this] {
+#endif
+ ceph_assert_always(frame_assembler);
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} before Socket::connect()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ return Socket::connect(conn.peer_addr);
+ }).then([this](SocketRef _new_socket) {
+ logger().debug("{} socket connected", conn);
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} during Socket::connect()",
+ conn, get_state_name(state));
+ return _new_socket->close().then([sock=std::move(_new_socket)] {
+ abort_protocol();
+ });
+ }
+ SocketFRef new_socket = seastar::make_foreign(std::move(_new_socket));
+ if (!has_socket) {
+ frame_assembler->set_socket(std::move(new_socket));
+ has_socket = true;
+ } else {
+ gate.dispatch_in_background(
+ "replace_socket_connecting",
+ conn,
+ [this, new_socket=std::move(new_socket)]() mutable {
+ return frame_assembler->replace_shutdown_socket(std::move(new_socket));
+ }
+ );
+ }
+ is_socket_valid = true;
+ return seastar::now();
+ }).then([this] {
+ auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
+ frame_assembler->reset_handlers();
+ frame_assembler->start_recording();
+ return banner_exchange(true);
+ }).then([this] (auto&& ret) {
+ auto [_peer_type, _my_addr_from_peer] = std::move(ret);
+ if (conn.get_peer_type() != _peer_type) {
+ logger().warn("{} connection peer type does not match what peer advertises {} != {}",
+ conn, ceph_entity_type_name(conn.get_peer_type()),
+ ceph_entity_type_name(_peer_type));
+ ABORT_IN_CLOSE(true);
+ }
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} during banner_exchange(), abort",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ frame_assembler->learn_socket_ephemeral_port_as_connector(
+ _my_addr_from_peer.get_port());
+ if (unlikely(_my_addr_from_peer.is_legacy())) {
+ logger().warn("{} peer sent a legacy address for me: {}",
+ conn, _my_addr_from_peer);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
+ messenger.learned_addr(_my_addr_from_peer, conn);
+ return client_auth();
+ }).then([this] {
+ if (server_cookie == 0) {
+ ceph_assert(connect_seq == 0);
+ return client_connect();
+ } else {
+ ceph_assert(connect_seq > 0);
+ return client_reconnect();
+ }
+ }).then([this] (next_step_t next) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} at the end of execute_connecting()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ switch (next) {
+ case next_step_t::ready: {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} before dispatch_connect(), abort",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
+ auto cc_seq = crosscore.prepare_submit();
+ // there are 2 hops with dispatch_connect()
+ crosscore.prepare_submit();
+ logger().info("{} connected: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+ "send {} IOHandler::dispatch_connect()",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, io_states,
+ frame_assembler->get_socket_shard_id(), cc_seq);
+
+ // set io_handler to a new shard
+ auto new_io_shard = frame_assembler->get_socket_shard_id();
+ ConnectionFRef conn_fref = seastar::make_foreign(
+ conn.shared_from_this());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+ pr_switch_io_shard = seastar::shared_promise<>();
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, cc_seq, new_io_shard,
+ conn_fref=std::move(conn_fref)]() mutable {
+ return io_handler.dispatch_connect(
+ cc_seq, new_io_shard, std::move(conn_fref));
+ }).then([this, new_io_shard] {
+ ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+ pr_switch_io_shard->set_value();
+ pr_switch_io_shard = std::nullopt;
+ // user can make changes
+
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} after dispatch_connect(), abort",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ execute_ready();
+ });
+ }
+ case next_step_t::wait: {
+ logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
+ ceph_assert_always(is_socket_valid);
+ frame_assembler->shutdown_socket<true>(&gate);
+ is_socket_valid = false;
+ execute_wait(true);
+ return seastar::now();
+ }
+ default: {
+ ceph_abort("impossible next step");
+ }
+ }
+ }).handle_exception([this](std::exception_ptr eptr) {
+ fault(state_t::CONNECTING, "execute_connecting", eptr);
+ });
+ });
+}
+
+// ACCEPTING state
+
+seastar::future<> ProtocolV2::_auth_bad_method(int r)
+{
+ // _auth_bad_method() logic
+ ceph_assert(r < 0);
+ auto [allowed_methods, allowed_modes] =
+ messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type());
+ auto bad_method = AuthBadMethodFrame::Encode(
+ auth_meta->auth_method, r, allowed_methods, allowed_modes);
+ logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
+ "allowed_methods={}, allowed_modes={})",
+ conn, auth_meta->auth_method, cpp_strerror(r),
+ allowed_methods, allowed_modes);
+ return frame_assembler->write_flush_frame(bad_method
+ ).then([this] {
+ return server_auth();
+ });
+}
+
+seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
+{
+ // _handle_auth_request() logic
+ ceph_assert(messenger.get_auth_server());
+ bufferlist reply;
+ int r = messenger.get_auth_server()->handle_auth_request(
+ conn,
+ *auth_meta,
+ more,
+ auth_meta->auth_method,
+ auth_payload,
+ &conn.peer_global_id,
+ &reply);
+ switch (r) {
+ // successful
+ case 1: {
+ auto auth_done = AuthDoneFrame::Encode(
+ conn.peer_global_id, auth_meta->con_mode, reply);
+ logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
+ conn, conn.peer_global_id,
+ ceph_con_mode_name(auth_meta->con_mode), reply.length());
+ return frame_assembler->write_flush_frame(auth_done
+ ).then([this] {
+ ceph_assert(auth_meta);
+ frame_assembler->create_session_stream_handlers(*auth_meta, true);
+ return finish_auth();
+ });
+ }
+ // auth more
+ case 0: {
+ auto more = AuthReplyMoreFrame::Encode(reply);
+ logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
+ conn, reply.length());
+ return frame_assembler->write_flush_frame(more
+ ).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, "read_auth_request_more");
+ return frame_assembler->read_frame_payload();
+ }).then([this](auto payload) {
+ auto auth_more = AuthRequestMoreFrame::Decode(payload->back());
+ logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
+ conn, auth_more.auth_payload().length());
+ return _handle_auth_request(auth_more.auth_payload(), true);
+ });
+ }
+ case -EBUSY: {
+ logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn);
+ abort_in_fault();
+ return seastar::now();
+ }
+ default: {
+ logger().warn("{} auth_server handle_auth_request returned {}", conn, r);
+ return _auth_bad_method(r);
+ }
+ }
+}
+
+seastar::future<> ProtocolV2::server_auth()
+{
+ return frame_assembler->read_main_preamble(
+ ).then([this](auto ret) {
+ expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, "read_auth_request");
+ return frame_assembler->read_frame_payload();
+ }).then([this](auto payload) {
+ // handle_auth_request() logic
+ auto request = AuthRequestFrame::Decode(payload->back());
+ logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
+ " payload_len={}",
+ conn, request.method(), request.preferred_modes(),
+ request.auth_payload().length());
+ auth_meta->auth_method = request.method();
+ auth_meta->con_mode = messenger.get_auth_server()->pick_con_mode(
+ conn.get_peer_type(), auth_meta->auth_method,
+ request.preferred_modes());
+ if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
+ logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn);
+ return _auth_bad_method(-EOPNOTSUPP);
+ }
+ return _handle_auth_request(request.auth_payload(), false);
+ });
+}
+
+bool ProtocolV2::validate_peer_name(const entity_name_t& peer_name) const
+{
+ auto my_peer_name = conn.get_peer_name();
+ if (my_peer_name.type() != peer_name.type()) {
+ return false;
+ }
+ if (my_peer_name.num() != entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW &&
+ my_peer_name.num() != peer_name.num()) {
+ return false;
+ }
+ return true;
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_wait()
+{
+ auto wait = WaitFrame::Encode();
+ logger().debug("{} WRITE WaitFrame", conn);
+ return frame_assembler->write_flush_frame(wait
+ ).then([] {
+ return next_step_t::wait;
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::reuse_connection(
+ ProtocolV2* existing_proto, bool do_reset,
+ bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
+{
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} before trigger_replacing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
+ existing_proto->trigger_replacing(reconnect,
+ do_reset,
+ frame_assembler->to_replace(),
+ std::move(auth_meta),
+ peer_global_seq,
+ client_cookie,
+ conn.get_peer_name(),
+ conn.get_features(),
+ peer_supported_features,
+ conn_seq,
+ msg_seq);
+ ceph_assert_always(has_socket && is_socket_valid);
+ is_socket_valid = false;
+ has_socket = false;
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_replaced(
+ conn.get_local_shared_foreign_from_this());
+ }
+#endif
+ // close this connection because all the necessary information is delivered
+ // to the exisiting connection, and jump to error handling code to abort the
+ // current state.
+ ABORT_IN_CLOSE(false);
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
+{
+ // handle_existing_connection() logic
+ ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+ existing_conn->protocol.get());
+ ceph_assert(existing_proto);
+ logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
+ " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie,
+ fmt::ptr(existing_conn.get()), get_state_name(existing_proto->state),
+ existing_proto->global_seq,
+ existing_proto->peer_global_seq,
+ existing_proto->connect_seq,
+ existing_proto->client_cookie,
+ existing_proto->server_cookie);
+
+ if (!validate_peer_name(existing_conn->get_peer_name())) {
+ logger().error("{} server_connect: my peer_name doesn't match"
+ " the existing connection {}, abort", conn, fmt::ptr(existing_conn.get()));
+ abort_in_fault();
+ }
+
+ if (existing_proto->state == state_t::REPLACING) {
+ logger().warn("{} server_connect: racing replace happened while"
+ " replacing existing connection {}, send wait.",
+ conn, *existing_conn);
+ return send_wait();
+ }
+
+ if (existing_proto->peer_global_seq > peer_global_seq) {
+ logger().warn("{} server_connect:"
+ " this is a stale connection, because peer_global_seq({})"
+ " < existing->peer_global_seq({}), close this connection"
+ " in favor of existing connection {}",
+ conn, peer_global_seq,
+ existing_proto->peer_global_seq, *existing_conn);
+ abort_in_fault();
+ }
+
+ if (existing_conn->policy.lossy) {
+ // existing connection can be thrown out in favor of this one
+ logger().warn("{} server_connect:"
+ " existing connection {} is a lossy channel. Close existing in favor of"
+ " this connection", conn, *existing_conn);
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} before execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ execute_establishing(existing_conn);
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ }
+
+ if (existing_proto->server_cookie != 0) {
+ if (existing_proto->client_cookie != client_cookie) {
+ // Found previous session
+ // peer has reset and we're going to reuse the existing connection
+ // by replacing the socket
+ logger().warn("{} server_connect:"
+ " found new session (cs={})"
+ " when existing {} {} is with stale session (cs={}, ss={}),"
+ " peer must have reset",
+ conn,
+ client_cookie,
+ get_state_name(existing_proto->state),
+ *existing_conn,
+ existing_proto->client_cookie,
+ existing_proto->server_cookie);
+ return reuse_connection(existing_proto, conn.policy.resetcheck);
+ } else {
+ // session establishment interrupted between client_ident and server_ident,
+ // continuing...
+ logger().warn("{} server_connect: found client session with existing {} {}"
+ " matched (cs={}, ss={}), continuing session establishment",
+ conn,
+ get_state_name(existing_proto->state),
+ *existing_conn,
+ client_cookie,
+ existing_proto->server_cookie);
+ return reuse_connection(existing_proto);
+ }
+ } else {
+ // Looks like a connection race: server and client are both connecting to
+ // each other at the same time.
+ if (existing_proto->client_cookie != client_cookie) {
+ if (existing_conn->peer_wins()) {
+ // acceptor (this connection, the peer) wins
+ logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
+ " and win, reusing existing {} {}",
+ conn,
+ client_cookie,
+ existing_proto->client_cookie,
+ get_state_name(existing_proto->state),
+ *existing_conn);
+ return reuse_connection(existing_proto);
+ } else {
+ // acceptor (this connection, the peer) loses
+ logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
+ " and lose to existing {}, ask client to wait",
+ conn, client_cookie, existing_proto->client_cookie, *existing_conn);
+ return existing_conn->send_keepalive().then([this] {
+ return send_wait();
+ });
+ }
+ } else {
+ logger().warn("{} server_connect: found client session with existing {} {}"
+ " matched (cs={}, ss={}), continuing session establishment",
+ conn,
+ get_state_name(existing_proto->state),
+ *existing_conn,
+ client_cookie,
+ existing_proto->server_cookie);
+ return reuse_connection(existing_proto);
+ }
+ }
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::server_connect()
+{
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_client_ident() logic
+ auto client_ident = ClientIdentFrame::Decode(payload->back());
+ logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
+ " gid={}, gs={}, features_supported={},"
+ " features_required={}, flags={}, cookie={}",
+ conn, client_ident.addrs(), client_ident.target_addr(),
+ client_ident.gid(), client_ident.global_seq(),
+ client_ident.supported_features(),
+ client_ident.required_features(),
+ client_ident.flags(), client_ident.cookie());
+
+ if (client_ident.addrs().empty() ||
+ client_ident.addrs().front() == entity_addr_t()) {
+ logger().warn("{} oops, client_ident.addrs() is empty", conn);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (!messenger.get_myaddrs().contains(client_ident.target_addr())) {
+ logger().warn("{} peer is trying to reach {} which is not us ({})",
+ conn, client_ident.target_addr(), messenger.get_myaddrs());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ conn.peer_addr = client_ident.addrs().front();
+ logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
+ conn.target_addr = conn.peer_addr;
+ if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
+ logger().warn("{} we don't know how to reconnect to peer {}",
+ conn, conn.target_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+
+ if (conn.get_peer_id() != entity_name_t::NEW &&
+ conn.get_peer_id() != client_ident.gid()) {
+ logger().error("{} client_ident peer_id ({}) does not match"
+ " what it should be ({}) during accepting, abort",
+ conn, client_ident.gid(), conn.get_peer_id());
+ abort_in_fault();
+ }
+ conn.set_peer_id(client_ident.gid());
+ client_cookie = client_ident.cookie();
+
+ uint64_t feat_missing =
+ (conn.policy.features_required | msgr2_required) &
+ ~(uint64_t)client_ident.supported_features();
+ if (feat_missing) {
+ auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
+ logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
+ conn, feat_missing);
+ return frame_assembler->write_flush_frame(ident_missing_features
+ ).then([] {
+ return next_step_t::wait;
+ });
+ }
+ conn.set_features(client_ident.supported_features() &
+ conn.policy.features_supported);
+ logger().debug("{} UPDATE: features={}", conn, conn.get_features());
+
+ peer_global_seq = client_ident.global_seq();
+
+ bool lossy = client_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
+ if (lossy != conn.policy.lossy) {
+ logger().warn("{} my lossy policy {} doesn't match client {}, ignore",
+ conn, conn.policy.lossy, lossy);
+ }
+
+ // Looks good so far, let's check if there is already an existing connection
+ // to this peer.
+
+ SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
+
+ if (existing_conn) {
+ return handle_existing_connection(existing_conn);
+ } else {
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} before execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ execute_establishing(nullptr);
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ }
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::read_reconnect()
+{
+ return frame_assembler->read_main_preamble(
+ ).then([this](auto ret) {
+ expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_session_reconnect");
+ return server_reconnect();
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_retry(uint64_t connect_seq)
+{
+ auto retry = RetryFrame::Encode(connect_seq);
+ logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
+ return frame_assembler->write_flush_frame(retry
+ ).then([this] {
+ return read_reconnect();
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_retry_global(uint64_t global_seq)
+{
+ auto retry = RetryGlobalFrame::Encode(global_seq);
+ logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
+ return frame_assembler->write_flush_frame(retry
+ ).then([this] {
+ return read_reconnect();
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_reset(bool full)
+{
+ auto reset = ResetFrame::Encode(full);
+ logger().warn("{} WRITE ResetFrame: full={}", conn, full);
+ return frame_assembler->write_flush_frame(reset
+ ).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset");
+ return server_connect();
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::server_reconnect()
+{
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_reconnect() logic
+ auto reconnect = ReconnectFrame::Decode(payload->back());
+
+ logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
+ " server_cookie={}, gs={}, cs={}, msg_seq={}",
+ conn, reconnect.addrs(),
+ reconnect.client_cookie(), reconnect.server_cookie(),
+ reconnect.global_seq(), reconnect.connect_seq(),
+ reconnect.msg_seq());
+
+ // can peer_addrs be changed on-the-fly?
+ // TODO: change peer_addr to entity_addrvec_t
+ entity_addr_t paddr = reconnect.addrs().front();
+ if (paddr.is_msgr2() || paddr.is_any()) {
+ // good
+ } else {
+ logger().warn("{} peer's address {} is not v2", conn, paddr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (conn.peer_addr == entity_addr_t()) {
+ conn.peer_addr = paddr;
+ } else if (conn.peer_addr != paddr) {
+ logger().error("{} peer identifies as {}, while conn.peer_addr={},"
+ " reconnect failed",
+ conn, paddr, conn.peer_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ peer_global_seq = reconnect.global_seq();
+
+ SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
+
+ if (!existing_conn) {
+ // there is no existing connection therefore cannot reconnect to previous
+ // session
+ logger().warn("{} server_reconnect: no existing connection from address {},"
+ " reseting client", conn, conn.peer_addr);
+ return send_reset(true);
+ }
+
+ ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+ existing_conn->protocol.get());
+ ceph_assert(existing_proto);
+ logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
+ " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
+ conn, global_seq, peer_global_seq, reconnect.connect_seq(),
+ reconnect.client_cookie(), reconnect.server_cookie(),
+ fmt::ptr(existing_conn.get()),
+ get_state_name(existing_proto->state),
+ existing_proto->global_seq,
+ existing_proto->peer_global_seq,
+ existing_proto->connect_seq,
+ existing_proto->client_cookie,
+ existing_proto->server_cookie);
+
+ if (!validate_peer_name(existing_conn->get_peer_name())) {
+ logger().error("{} server_reconnect: my peer_name doesn't match"
+ " the existing connection {}, abort", conn, fmt::ptr(existing_conn.get()));
+ abort_in_fault();
+ }
+
+ if (existing_proto->state == state_t::REPLACING) {
+ logger().warn("{} server_reconnect: racing replace happened while "
+ " replacing existing connection {}, retry global.",
+ conn, *existing_conn);
+ return send_retry_global(existing_proto->peer_global_seq);
+ }
+
+ if (existing_proto->client_cookie != reconnect.client_cookie()) {
+ logger().warn("{} server_reconnect:"
+ " client_cookie mismatch with existing connection {},"
+ " cc={} rcc={}. I must have reset, reseting client.",
+ conn, *existing_conn,
+ existing_proto->client_cookie, reconnect.client_cookie());
+ return send_reset(conn.policy.resetcheck);
+ } else if (existing_proto->server_cookie == 0) {
+ // this happens when:
+ // - a connects to b
+ // - a sends client_ident
+ // - b gets client_ident, sends server_ident and sets cookie X
+ // - connection fault
+ // - b reconnects to a with cookie X, connect_seq=1
+ // - a has cookie==0
+ logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the"
+ " server_ident with existing connection {}."
+ " Asking peer to resume session establishment",
+ conn, existing_proto->client_cookie, *existing_conn);
+ return send_reset(false);
+ }
+
+ if (existing_proto->peer_global_seq > reconnect.global_seq()) {
+ logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({}),"
+ " with existing connection {},"
+ " ask client to retry global",
+ conn, existing_proto->peer_global_seq,
+ reconnect.global_seq(), *existing_conn);
+ return send_retry_global(existing_proto->peer_global_seq);
+ }
+
+ if (existing_proto->connect_seq > reconnect.connect_seq()) {
+ logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({}),"
+ " with existing connection {}, ask client to retry",
+ conn, reconnect.connect_seq(),
+ existing_proto->connect_seq, *existing_conn);
+ return send_retry(existing_proto->connect_seq);
+ } else if (existing_proto->connect_seq == reconnect.connect_seq()) {
+ // reconnect race: both peers are sending reconnect messages
+ if (existing_conn->peer_wins()) {
+ // acceptor (this connection, the peer) wins
+ logger().warn("{} server_reconnect: reconnect race detected (cs={})"
+ " and win, reusing existing {} {}",
+ conn,
+ reconnect.connect_seq(),
+ get_state_name(existing_proto->state),
+ *existing_conn);
+ return reuse_connection(
+ existing_proto, false,
+ true, reconnect.connect_seq(), reconnect.msg_seq());
+ } else {
+ // acceptor (this connection, the peer) loses
+ logger().warn("{} server_reconnect: reconnect race detected (cs={})"
+ " and lose to existing {}, ask client to wait",
+ conn, reconnect.connect_seq(), *existing_conn);
+ return send_wait();
+ }
+ } else { // existing_proto->connect_seq < reconnect.connect_seq()
+ logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
+ " reusing existing {} {}",
+ conn,
+ existing_proto->connect_seq,
+ reconnect.connect_seq(),
+ get_state_name(existing_proto->state),
+ *existing_conn);
+ return reuse_connection(
+ existing_proto, false,
+ true, reconnect.connect_seq(), reconnect.msg_seq());
+ }
+ });
+}
+
+void ProtocolV2::execute_accepting()
+{
+ assert(is_socket_valid);
+ trigger_state(state_t::ACCEPTING, io_state_t::none);
+ gate.dispatch_in_background("execute_accepting", conn, [this] {
+ return seastar::futurize_invoke([this] {
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ // only notify socket accepted
+ gate.dispatch_in_background(
+ "test_intercept_socket_accepted", conn, [this] {
+ return conn.interceptor->intercept(
+ conn, {Breakpoint{custom_bp_t::SOCKET_ACCEPTED}}
+ ).then([](bp_action_t action) {
+ ceph_assert(action == bp_action_t::CONTINUE);
+ });
+ });
+ }
+#endif
+ auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
+ frame_assembler->reset_handlers();
+ frame_assembler->start_recording();
+ return banner_exchange(false);
+ }).then([this] (auto&& ret) {
+ auto [_peer_type, _my_addr_from_peer] = std::move(ret);
+ ceph_assert(conn.get_peer_type() == 0);
+ conn.set_peer_type(_peer_type);
+
+ conn.policy = messenger.get_policy(_peer_type);
+ logger().info("{} UPDATE: peer_type={},"
+ " policy(lossy={} server={} standby={} resetcheck={})",
+ conn, ceph_entity_type_name(_peer_type),
+ conn.policy.lossy, conn.policy.server,
+ conn.policy.standby, conn.policy.resetcheck);
+ if (!messenger.get_myaddr().is_blank_ip() &&
+ (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
+ messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
+ logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
+ conn, _my_addr_from_peer, messenger.get_myaddr());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ messenger.learned_addr(_my_addr_from_peer, conn);
+ return server_auth();
+ }).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ switch (ret.tag) {
+ case Tag::CLIENT_IDENT:
+ return server_connect();
+ case Tag::SESSION_RECONNECT:
+ return server_reconnect();
+ default: {
+ unexpected_tag(ret.tag, conn, "post_server_auth");
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+ }
+ }
+ }).then([this] (next_step_t next) {
+ switch (next) {
+ case next_step_t::ready:
+ assert(state != state_t::ACCEPTING);
+ break;
+ case next_step_t::wait:
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} at the end of execute_accepting()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
+ execute_server_wait();
+ break;
+ default:
+ ceph_abort("impossible next step");
+ }
+ }).handle_exception([this](std::exception_ptr eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
+ conn, get_state_name(state), e_what);
+ do_close(false);
+ });
+ });
+}
+
+// CONNECTING or ACCEPTING state
+
+seastar::future<> ProtocolV2::finish_auth()
+{
+ ceph_assert(auth_meta);
+
+ auto records = frame_assembler->stop_recording();
+ const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
+ auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf);
+ auto sig_frame = AuthSignatureFrame::Encode(sig);
+ logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
+ return frame_assembler->write_flush_frame(sig_frame
+ ).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth");
+ return frame_assembler->read_frame_payload();
+ }).then([this, txbuf=std::move(records.txbuf)](auto payload) {
+ // handle_auth_signature() logic
+ auto sig_frame = AuthSignatureFrame::Decode(payload->back());
+ logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature());
+
+ const auto actual_tx_sig = auth_meta->session_key.empty() ?
+ sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf);
+ if (sig_frame.signature() != actual_tx_sig) {
+ logger().warn("{} pre-auth signature mismatch actual_tx_sig={}"
+ " sig_frame.signature()={}",
+ conn, actual_tx_sig, sig_frame.signature());
+ abort_in_fault();
+ }
+ });
+}
+
+// ESTABLISHING
+
+void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
+ auto accept_me = [this] {
+ messenger.register_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ messenger.unaccept_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ };
+
+ ceph_assert_always(is_socket_valid);
+ trigger_state(state_t::ESTABLISHING, io_state_t::delay);
+ bool is_replace;
+ if (existing_conn) {
+ logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+ "close existing {}",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie,
+ io_states, frame_assembler->get_socket_shard_id(),
+ *existing_conn);
+ is_replace = true;
+ ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+ existing_conn->protocol.get());
+ existing_proto->do_close(
+ true, // is_dispatch_reset
+ std::move(accept_me));
+ if (unlikely(state != state_t::ESTABLISHING)) {
+ logger().warn("{} triggered {} during execute_establishing(), "
+ "the accept event will not be delivered!",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ } else {
+ logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+ "no existing",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, io_states,
+ frame_assembler->get_socket_shard_id());
+ is_replace = false;
+ accept_me();
+ }
+
+ gated_execute("execute_establishing", conn, [this, is_replace] {
+ ceph_assert_always(state == state_t::ESTABLISHING);
+
+ // set io_handler to a new shard
+ auto cc_seq = crosscore.prepare_submit();
+ // there are 2 hops with dispatch_accept()
+ crosscore.prepare_submit();
+ auto new_io_shard = frame_assembler->get_socket_shard_id();
+ logger().debug("{} send {} IOHandler::dispatch_accept({})",
+ conn, cc_seq, new_io_shard);
+ ConnectionFRef conn_fref = seastar::make_foreign(
+ conn.shared_from_this());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+ pr_switch_io_shard = seastar::shared_promise<>();
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, cc_seq, new_io_shard, is_replace,
+ conn_fref=std::move(conn_fref)]() mutable {
+ return io_handler.dispatch_accept(
+ cc_seq, new_io_shard, std::move(conn_fref), is_replace);
+ }).then([this, new_io_shard] {
+ ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+ pr_switch_io_shard->set_value();
+ pr_switch_io_shard = std::nullopt;
+ // user can make changes
+
+ if (unlikely(state != state_t::ESTABLISHING)) {
+ logger().debug("{} triggered {} after dispatch_accept() during execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
+ return send_server_ident();
+ }).then([this] {
+ if (unlikely(state != state_t::ESTABLISHING)) {
+ logger().debug("{} triggered {} at the end of execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ logger().info("{} established, going to ready", conn);
+ execute_ready();
+ }).handle_exception([this](std::exception_ptr eptr) {
+ fault(state_t::ESTABLISHING, "execute_establishing", eptr);
+ });
+ });
+}
+
+// ESTABLISHING or REPLACING state
+
+seastar::future<>
+ProtocolV2::send_server_ident()
+{
+ ceph_assert_always(state == state_t::ESTABLISHING ||
+ state == state_t::REPLACING);
+ // send_server_ident() logic
+
+ // refered to async-conn v2: not assign gs to global_seq
+ global_seq = messenger.get_global_seq();
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} UPDATE: gs={} for server ident, "
+ "send {} IOHandler::reset_peer_state()",
+ conn, global_seq, cc_seq);
+
+ // this is required for the case when this connection is being replaced
+ io_states.reset_peer_state();
+ gate.dispatch_in_background(
+ "reset_peer_state", conn, [this, cc_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq] {
+ return io_handler.reset_peer_state(cc_seq);
+ });
+ });
+
+ if (!conn.policy.lossy) {
+ server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
+ }
+
+ uint64_t flags = 0;
+ if (conn.policy.lossy) {
+ flags = flags | CEPH_MSG_CONNECT_LOSSY;
+ }
+
+ auto server_ident = ServerIdentFrame::Encode(
+ messenger.get_myaddrs(),
+ messenger.get_myname().num(),
+ global_seq,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags,
+ server_cookie);
+
+ logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
+ " gs={}, features_supported={}, features_required={},"
+ " flags={}, cookie={}",
+ conn, messenger.get_myaddrs(), messenger.get_myname().num(),
+ global_seq, conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags, server_cookie);
+
+ return frame_assembler->write_flush_frame(server_ident);
+}
+
+// REPLACING state
+
+void ProtocolV2::trigger_replacing(bool reconnect,
+ bool do_reset,
+ FrameAssemblerV2::mover_t &&mover,
+ AuthConnectionMetaRef&& new_auth_meta,
+ uint64_t new_peer_global_seq,
+ uint64_t new_client_cookie,
+ entity_name_t new_peer_name,
+ uint64_t new_conn_features,
+ uint64_t new_peer_supported_features,
+ uint64_t new_connect_seq,
+ uint64_t new_msg_seq)
+{
+ ceph_assert_always(state >= state_t::ESTABLISHING);
+ ceph_assert_always(state <= state_t::WAIT);
+ ceph_assert_always(has_socket || state == state_t::CONNECTING);
+ // mover.socket shouldn't be shutdown
+
+ logger().info("{} start replacing ({}): pgs was {}, cs was {}, "
+ "client_cookie was {}, {}, new_sid={}",
+ conn, reconnect ? "reconnected" : "connected",
+ peer_global_seq, connect_seq, client_cookie,
+ io_states, mover.socket->get_shard_id());
+ if (is_socket_valid) {
+ frame_assembler->shutdown_socket<true>(&gate);
+ is_socket_valid = false;
+ }
+ trigger_state_phase1(state_t::REPLACING);
+ gate.dispatch_in_background(
+ "trigger_replacing",
+ conn,
+ [this,
+ reconnect,
+ do_reset,
+ mover = std::move(mover),
+ new_auth_meta = std::move(new_auth_meta),
+ new_client_cookie, new_peer_name,
+ new_conn_features, new_peer_supported_features,
+ new_peer_global_seq,
+ new_connect_seq, new_msg_seq] () mutable {
+ ceph_assert_always(state == state_t::REPLACING);
+ auto new_io_shard = mover.socket->get_shard_id();
+ // state may become CLOSING below, but we cannot abort the chain until
+ // mover.socket is correctly handled (closed or replaced).
+
+ // this is preemptive
+ return wait_switch_io_shard(
+ ).then([this] {
+ if (unlikely(state != state_t::REPLACING)) {
+ ceph_assert_always(state == state_t::CLOSING);
+ return seastar::now();
+ }
+
+ trigger_state_phase2(state_t::REPLACING, io_state_t::delay);
+ return wait_exit_io();
+ }).then([this] {
+ if (unlikely(state != state_t::REPLACING)) {
+ ceph_assert_always(state == state_t::CLOSING);
+ return seastar::now();
+ }
+
+ ceph_assert_always(frame_assembler);
+ protocol_timer.cancel();
+ auto done = std::move(execution_done);
+ execution_done = seastar::now();
+ return done;
+ }).then([this, new_io_shard] {
+ if (unlikely(state != state_t::REPLACING)) {
+ ceph_assert_always(state == state_t::CLOSING);
+ return seastar::now();
+ }
+
+ // set io_handler to a new shard
+ // we should prevent parallel switching core attemps
+ auto cc_seq = crosscore.prepare_submit();
+ // there are 2 hops with dispatch_accept()
+ crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::dispatch_accept({})",
+ conn, cc_seq, new_io_shard);
+ ConnectionFRef conn_fref = seastar::make_foreign(
+ conn.shared_from_this());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+ pr_switch_io_shard = seastar::shared_promise<>();
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, cc_seq, new_io_shard,
+ conn_fref=std::move(conn_fref)]() mutable {
+ return io_handler.dispatch_accept(
+ cc_seq, new_io_shard, std::move(conn_fref), false);
+ }).then([this, new_io_shard] {
+ ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+ pr_switch_io_shard->set_value();
+ pr_switch_io_shard = std::nullopt;
+ // user can make changes
+ });
+ }).then([this,
+ reconnect,
+ do_reset,
+ mover = std::move(mover),
+ new_auth_meta = std::move(new_auth_meta),
+ new_client_cookie, new_peer_name,
+ new_conn_features, new_peer_supported_features,
+ new_peer_global_seq,
+ new_connect_seq, new_msg_seq] () mutable {
+ if (state == state_t::REPLACING && do_reset) {
+ reset_session(true);
+ // user can make changes
+ }
+
+ if (unlikely(state != state_t::REPLACING)) {
+ logger().debug("{} triggered {} in the middle of trigger_replacing(), abort",
+ conn, get_state_name(state));
+ ceph_assert_always(state == state_t::CLOSING);
+ return mover.socket->close(
+ ).then([sock = std::move(mover.socket)] {
+ abort_protocol();
+ });
+ }
+
+ auth_meta = std::move(new_auth_meta);
+ peer_global_seq = new_peer_global_seq;
+ gate.dispatch_in_background(
+ "replace_frame_assembler",
+ conn,
+ [this, mover=std::move(mover)]() mutable {
+ return frame_assembler->replace_by(std::move(mover));
+ }
+ );
+ is_socket_valid = true;
+ has_socket = true;
+
+ if (reconnect) {
+ connect_seq = new_connect_seq;
+ // send_reconnect_ok() logic
+
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::requeue_out_sent_up_to({})",
+ conn, cc_seq, new_msg_seq);
+ io_states.requeue_out_sent_up_to();
+ gate.dispatch_in_background(
+ "requeue_out_replacing", conn, [this, cc_seq, new_msg_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq, new_msg_seq] {
+ return io_handler.requeue_out_sent_up_to(cc_seq, new_msg_seq);
+ });
+ });
+
+ auto reconnect_ok = ReconnectOkFrame::Encode(io_states.in_seq);
+ logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_states.in_seq);
+ return frame_assembler->write_flush_frame(reconnect_ok);
+ } else {
+ client_cookie = new_client_cookie;
+ assert(conn.get_peer_type() == new_peer_name.type());
+ if (conn.get_peer_id() == entity_name_t::NEW) {
+ conn.set_peer_id(new_peer_name.num());
+ }
+ conn.set_features(new_conn_features);
+ peer_supported_features = new_peer_supported_features;
+ bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ frame_assembler->set_is_rev1(is_rev1);
+ return send_server_ident();
+ }
+ }).then([this, reconnect] {
+ if (unlikely(state != state_t::REPLACING)) {
+ logger().debug("{} triggered {} at the end of trigger_replacing(), abort",
+ conn, get_state_name(state));
+ ceph_assert_always(state == state_t::CLOSING);
+ abort_protocol();
+ }
+ logger().info("{} replaced ({}), going to ready: "
+ "gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}",
+ conn, reconnect ? "reconnected" : "connected",
+ global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, io_states);
+ execute_ready();
+ }).handle_exception([this](std::exception_ptr eptr) {
+ fault(state_t::REPLACING, "trigger_replacing", eptr);
+ });
+ });
+}
+
+// READY state
+
+seastar::future<> ProtocolV2::notify_out_fault(
+ crosscore_t::seq_t cc_seq,
+ const char *where,
+ std::exception_ptr eptr,
+ io_handler_state _io_states)
+{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} notify_out_fault(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, where, eptr, _io_states] {
+ return notify_out_fault(cc_seq, where, eptr, _io_states);
+ });
+ }
+
+ io_states = _io_states;
+ logger().debug("{} got {} notify_out_fault(): io_states={}",
+ conn, cc_seq, io_states);
+ fault(state_t::READY, where, eptr);
+ return seastar::now();
+}
+
+void ProtocolV2::execute_ready()
+{
+ assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
+ protocol_timer.cancel();
+ ceph_assert_always(is_socket_valid);
+ // I'm not responsible to shutdown the socket at READY
+ is_socket_valid = false;
+ trigger_state(state_t::READY, io_state_t::open);
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ // FIXME: doesn't support cross-core
+ conn.interceptor->register_conn_ready(
+ conn.get_local_shared_foreign_from_this());
+ }
+#endif
+}
+
+// STANDBY state
+
+void ProtocolV2::execute_standby()
+{
+ ceph_assert_always(!is_socket_valid);
+ trigger_state(state_t::STANDBY, io_state_t::delay);
+}
+
+seastar::future<> ProtocolV2::notify_out(
+ crosscore_t::seq_t cc_seq)
+{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} notify_out(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq] {
+ return notify_out(cc_seq);
+ });
+ }
+
+ logger().debug("{} got {} notify_out(): at {}",
+ conn, cc_seq, get_state_name(state));
+ io_states.is_out_queued = true;
+ if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
+ logger().info("{} notify_out(): at {}, going to CONNECTING",
+ conn, get_state_name(state));
+ execute_connecting();
+ }
+ return seastar::now();
+}
+
+// WAIT state
+
+void ProtocolV2::execute_wait(bool max_backoff)
+{
+ ceph_assert_always(!is_socket_valid);
+ trigger_state(state_t::WAIT, io_state_t::delay);
+ gated_execute("execute_wait", conn, [this, max_backoff] {
+ double backoff = protocol_timer.last_dur();
+ if (max_backoff) {
+ backoff = local_conf().get_val<double>("ms_max_backoff");
+ } else if (backoff > 0) {
+ backoff = std::min(local_conf().get_val<double>("ms_max_backoff"), 2 * backoff);
+ } else {
+ backoff = local_conf().get_val<double>("ms_initial_backoff");
+ }
+ return protocol_timer.backoff(backoff).then([this] {
+ if (unlikely(state != state_t::WAIT)) {
+ logger().debug("{} triggered {} at the end of execute_wait()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ logger().info("{} execute_wait(): going to CONNECTING", conn);
+ execute_connecting();
+ }).handle_exception([this](std::exception_ptr eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().info("{} execute_wait(): protocol aborted at {} -- {}",
+ conn, get_state_name(state), e_what);
+ assert(state == state_t::REPLACING ||
+ state == state_t::CLOSING);
+ });
+ });
+}
+
+// SERVER_WAIT state
+
+void ProtocolV2::execute_server_wait()
+{
+ ceph_assert_always(is_socket_valid);
+ trigger_state(state_t::SERVER_WAIT, io_state_t::none);
+ gated_execute("execute_server_wait", conn, [this] {
+ return frame_assembler->read_exactly(1
+ ).then([this](auto bptr) {
+ logger().warn("{} SERVER_WAIT got read, abort", conn);
+ abort_in_fault();
+ }).handle_exception([this](std::exception_ptr eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
+ conn, get_state_name(state), e_what);
+ do_close(false);
+ });
+ });
+}
+
+// CLOSING state
+
+seastar::future<> ProtocolV2::notify_mark_down(
+ crosscore_t::seq_t cc_seq)
+{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} notify_mark_down(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq] {
+ return notify_mark_down(cc_seq);
+ });
+ }
+
+ logger().debug("{} got {} notify_mark_down()",
+ conn, cc_seq);
+ do_close(false);
+ return seastar::now();
+}
+
+seastar::future<> ProtocolV2::close_clean_yielded()
+{
+ // yield() so that do_close() can be called *after* close_clean_yielded() is
+ // applied to all connections in a container using
+ // seastar::parallel_for_each(). otherwise, we could erase a connection in
+ // the container when seastar::parallel_for_each() is still iterating in it.
+ // that'd lead to a segfault.
+ return seastar::yield(
+ ).then([this] {
+ do_close(false);
+ return pr_closed_clean.get_shared_future();
+
+ // connection may be unreferenced from the messenger,
+ // so need to hold the additional reference.
+ }).finally([conn_ref = conn.shared_from_this()] {});;
+}
+
+void ProtocolV2::do_close(
+ bool is_dispatch_reset,
+ std::optional<std::function<void()>> f_accept_new)
+{
+ if (state == state_t::CLOSING) {
+ // already closing
+ return;
+ }
+
+ bool is_replace = f_accept_new ? true : false;
+ logger().info("{} closing: reset {}, replace {}", conn,
+ is_dispatch_reset ? "yes" : "no",
+ is_replace ? "yes" : "no");
+
+ /*
+ * atomic operations
+ */
+
+ ceph_assert_always(!gate.is_closed());
+
+ // messenger registrations, must before user events
+ messenger.closing_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) {
+ messenger.unaccept_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) {
+ messenger.unregister_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ } else {
+ // cannot happen
+ ceph_assert(false);
+ }
+ if (f_accept_new) {
+ // the replacing connection must be registerred after the replaced
+ // connection is unreigsterred.
+ (*f_accept_new)();
+ }
+
+ protocol_timer.cancel();
+ if (is_socket_valid) {
+ frame_assembler->shutdown_socket<true>(&gate);
+ is_socket_valid = false;
+ }
+
+ trigger_state_phase1(state_t::CLOSING);
+ gate.dispatch_in_background(
+ "close_io", conn, [this, is_dispatch_reset, is_replace] {
+ // this is preemptive
+ return wait_switch_io_shard(
+ ).then([this, is_dispatch_reset, is_replace] {
+ trigger_state_phase2(state_t::CLOSING, io_state_t::drop);
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::close_io(reset={}, replace={})",
+ conn, cc_seq, is_dispatch_reset, is_replace);
+
+ std::ignore = gate.close(
+ ).then([this] {
+ ceph_assert_always(!need_exit_io);
+ ceph_assert_always(!pr_exit_io.has_value());
+ if (has_socket) {
+ ceph_assert_always(frame_assembler);
+ return frame_assembler->close_shutdown_socket();
+ } else {
+ return seastar::now();
+ }
+ }).then([this] {
+ logger().debug("{} closed!", conn);
+ messenger.closed_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ pr_closed_clean.set_value();
+#ifdef UNIT_TESTS_BUILT
+ closed_clean = true;
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_closed(
+ conn.get_local_shared_foreign_from_this());
+ }
+#endif
+ // connection is unreferenced from the messenger,
+ // so need to hold the additional reference.
+ }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
+ logger().error("{} closing got unexpected exception {}",
+ conn, eptr);
+ ceph_abort();
+ });
+
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, cc_seq, is_dispatch_reset, is_replace] {
+ return io_handler.close_io(cc_seq, is_dispatch_reset, is_replace);
+ });
+ // user can make changes
+ });
+ });
+}
+
+} // namespace crimson::net