summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/ProtocolV2.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/net/ProtocolV2.cc')
-rw-r--r--src/crimson/net/ProtocolV2.cc2139
1 files changed, 2139 insertions, 0 deletions
diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc
new file mode 100644
index 000000000..b7137b8b8
--- /dev/null
+++ b/src/crimson/net/ProtocolV2.cc
@@ -0,0 +1,2139 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ProtocolV2.h"
+
+#include <seastar/core/lowres_clock.hh>
+#include <fmt/format.h>
+#include "include/msgr.h"
+#include "include/random.h"
+
+#include "crimson/auth/AuthClient.h"
+#include "crimson/auth/AuthServer.h"
+#include "crimson/common/formatter.h"
+
+#include "chained_dispatchers.h"
+#include "Errors.h"
+#include "Socket.h"
+#include "SocketConnection.h"
+#include "SocketMessenger.h"
+
+#ifdef UNIT_TESTS_BUILT
+#include "Interceptor.h"
+#endif
+
+using namespace ceph::msgr::v2;
+using crimson::common::local_conf;
+
+namespace {
+
+// TODO: apply the same logging policy to Protocol V1
+// Log levels in V2 Protocol:
+// * error level, something error that cause connection to terminate:
+// - fatal errors;
+// - bugs;
+// * warn level: something unusual that identifies connection fault or replacement:
+// - unstable network;
+// - incompatible peer;
+// - auth failure;
+// - connection race;
+// - connection reset;
+// * info level, something very important to show connection lifecycle,
+// which doesn't happen very frequently;
+// * debug level, important logs for debugging, including:
+// - all the messages sent/received (-->/<==);
+// - all the frames exchanged (WRITE/GOT);
+// - important fields updated (UPDATE);
+// - connection state transitions (TRIGGER);
+// * trace level, trivial logs showing:
+// - the exact bytes being sent/received (SEND/RECV(bytes));
+// - detailed information of sub-frames;
+// - integrity checks;
+// - etc.
+seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+}
+
+[[noreturn]] void abort_in_fault() {
+ throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+}
+
+[[noreturn]] void abort_protocol() {
+ throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
+}
+
+[[noreturn]] void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) {
+ proto.close(dispatch_reset);
+ abort_protocol();
+}
+
+inline void expect_tag(const Tag& expected,
+ const Tag& actual,
+ crimson::net::SocketConnection& conn,
+ const char *where) {
+ if (actual != expected) {
+ logger().warn("{} {} received wrong tag: {}, expected {}",
+ conn, where,
+ static_cast<uint32_t>(actual),
+ static_cast<uint32_t>(expected));
+ abort_in_fault();
+ }
+}
+
+inline void unexpected_tag(const Tag& unexpected,
+ crimson::net::SocketConnection& conn,
+ const char *where) {
+ logger().warn("{} {} received unexpected tag: {}",
+ conn, where, static_cast<uint32_t>(unexpected));
+ abort_in_fault();
+}
+
+inline uint64_t generate_client_cookie() {
+ return ceph::util::generate_random_number<uint64_t>(
+ 1, std::numeric_limits<uint64_t>::max());
+}
+
+} // namespace anonymous
+
+namespace crimson::net {
+
+#ifdef UNIT_TESTS_BUILT
+void intercept(Breakpoint bp, bp_type_t type,
+ SocketConnection& conn, SocketRef& socket) {
+ if (conn.interceptor) {
+ auto action = conn.interceptor->intercept(conn, Breakpoint(bp));
+ socket->set_trap(type, action, &conn.interceptor->blocker);
+ }
+}
+
+#define INTERCEPT_CUSTOM(bp, type) \
+intercept({bp}, type, conn, socket)
+
+#define INTERCEPT_FRAME(tag, type) \
+intercept({static_cast<Tag>(tag), type}, \
+ type, conn, socket)
+
+#define INTERCEPT_N_RW(bp) \
+if (conn.interceptor) { \
+ auto action = conn.interceptor->intercept(conn, {bp}); \
+ ceph_assert(action != bp_action_t::BLOCK); \
+ if (action == bp_action_t::FAULT) { \
+ abort_in_fault(); \
+ } \
+}
+
+#else
+#define INTERCEPT_CUSTOM(bp, type)
+#define INTERCEPT_FRAME(tag, type)
+#define INTERCEPT_N_RW(bp)
+#endif
+
+seastar::future<> ProtocolV2::Timer::backoff(double seconds)
+{
+ logger().warn("{} waiting {} seconds ...", conn, seconds);
+ cancel();
+ last_dur_ = seconds;
+ as = seastar::abort_source();
+ auto dur = std::chrono::duration_cast<seastar::lowres_clock::duration>(
+ std::chrono::duration<double>(seconds));
+ return seastar::sleep_abortable(dur, *as
+ ).handle_exception_type([this] (const seastar::sleep_aborted& e) {
+ logger().debug("{} wait aborted", conn);
+ abort_protocol();
+ });
+}
+
+ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers,
+ SocketConnection& conn,
+ SocketMessenger& messenger)
+ : Protocol(proto_t::v2, dispatchers, conn),
+ messenger{messenger},
+ protocol_timer{conn}
+{}
+
+ProtocolV2::~ProtocolV2() {}
+
+bool ProtocolV2::is_connected() const {
+ return state == state_t::READY ||
+ state == state_t::ESTABLISHING ||
+ state == state_t::REPLACING;
+}
+
+void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
+ const entity_name_t& _peer_name)
+{
+ ceph_assert(state == state_t::NONE);
+ ceph_assert(!socket);
+ ceph_assert(!gate.is_closed());
+ conn.peer_addr = _peer_addr;
+ conn.target_addr = _peer_addr;
+ conn.set_peer_name(_peer_name);
+ conn.policy = messenger.get_policy(_peer_name.type());
+ client_cookie = generate_client_cookie();
+ logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}"
+ " policy(lossy={}, server={}, standby={}, resetcheck={})",
+ conn, _peer_addr, _peer_name, client_cookie,
+ conn.policy.lossy, conn.policy.server,
+ conn.policy.standby, conn.policy.resetcheck);
+ messenger.register_conn(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ execute_connecting();
+}
+
+void ProtocolV2::start_accept(SocketRef&& sock,
+ const entity_addr_t& _peer_addr)
+{
+ ceph_assert(state == state_t::NONE);
+ ceph_assert(!socket);
+ // until we know better
+ conn.target_addr = _peer_addr;
+ socket = std::move(sock);
+ logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
+ messenger.accept_conn(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ execute_accepting();
+}
+
+// TODO: Frame related implementations, probably to a separate class.
+
+void ProtocolV2::enable_recording()
+{
+ rxbuf.clear();
+ txbuf.clear();
+ record_io = true;
+}
+
+seastar::future<Socket::tmp_buf> ProtocolV2::read_exactly(size_t bytes)
+{
+ if (unlikely(record_io)) {
+ return socket->read_exactly(bytes)
+ .then([this] (auto bl) {
+ rxbuf.append(buffer::create(bl.share()));
+ return bl;
+ });
+ } else {
+ return socket->read_exactly(bytes);
+ };
+}
+
+seastar::future<bufferlist> ProtocolV2::read(size_t bytes)
+{
+ if (unlikely(record_io)) {
+ return socket->read(bytes)
+ .then([this] (auto buf) {
+ rxbuf.append(buf);
+ return buf;
+ });
+ } else {
+ return socket->read(bytes);
+ }
+}
+
+seastar::future<> ProtocolV2::write(bufferlist&& buf)
+{
+ if (unlikely(record_io)) {
+ txbuf.append(buf);
+ }
+ return socket->write(std::move(buf));
+}
+
+seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
+{
+ if (unlikely(record_io)) {
+ txbuf.append(buf);
+ }
+ return socket->write_flush(std::move(buf));
+}
+
+size_t ProtocolV2::get_current_msg_size() const
+{
+ ceph_assert(rx_frame_asm.get_num_segments() > 0);
+ size_t sum = 0;
+ // we don't include SegmentIndex::Msg::HEADER.
+ for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
+ sum += rx_frame_asm.get_segment_logical_len(idx);
+ }
+ return sum;
+}
+
+seastar::future<Tag> ProtocolV2::read_main_preamble()
+{
+ rx_preamble.clear();
+ return read_exactly(rx_frame_asm.get_preamble_onwire_len())
+ .then([this] (auto bl) {
+ rx_segments_data.clear();
+ try {
+ rx_preamble.append(buffer::create(std::move(bl)));
+ const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
+ INTERCEPT_FRAME(tag, bp_type_t::READ);
+ return tag;
+ } catch (FrameError& e) {
+ logger().warn("{} read_main_preamble: {}", conn, e.what());
+ abort_in_fault();
+ }
+ });
+}
+
+seastar::future<> ProtocolV2::read_frame_payload()
+{
+ ceph_assert(rx_segments_data.empty());
+
+ return seastar::do_until(
+ [this] { return rx_frame_asm.get_num_segments() == rx_segments_data.size(); },
+ [this] {
+ // TODO: create aligned and contiguous buffer from socket
+ const size_t seg_idx = rx_segments_data.size();
+ if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx);
+ alignment != segment_t::DEFAULT_ALIGNMENT) {
+ logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
+ conn, alignment, rx_segments_data.size());
+ }
+ uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
+ // TODO: create aligned and contiguous buffer from socket
+ return read_exactly(onwire_len).then([this] (auto tmp_bl) {
+ logger().trace("{} RECV({}) frame segment[{}]",
+ conn, tmp_bl.size(), rx_segments_data.size());
+ bufferlist segment;
+ segment.append(buffer::create(std::move(tmp_bl)));
+ rx_segments_data.emplace_back(std::move(segment));
+ });
+ }
+ ).then([this] {
+ return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
+ }).then([this] (auto bl) {
+ logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
+ bool ok = false;
+ try {
+ rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]);
+ bufferlist rx_epilogue;
+ rx_epilogue.append(buffer::create(std::move(bl)));
+ ok = rx_frame_asm.disassemble_remaining_segments(rx_segments_data.data(), rx_epilogue);
+ } catch (FrameError& e) {
+ logger().error("read_frame_payload: {} {}", conn, e.what());
+ abort_in_fault();
+ } catch (ceph::crypto::onwire::MsgAuthError&) {
+ logger().error("read_frame_payload: {} bad auth tag", conn);
+ abort_in_fault();
+ }
+ // we do have a mechanism that allows transmitter to start sending message
+ // and abort after putting entire data field on wire. This will be used by
+ // the kernel client to avoid unnecessary buffering.
+ if (!ok) {
+ // TODO
+ ceph_assert(false);
+ }
+ });
+}
+
+template <class F>
+seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
+{
+ auto bl = frame.get_buffer(tx_frame_asm);
+ const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
+ logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
+ conn, bl.length(), (int)main_preamble->tag,
+ (int)main_preamble->num_segments, main_preamble->crc);
+ INTERCEPT_FRAME(main_preamble->tag, bp_type_t::WRITE);
+ if (flush) {
+ return write_flush(std::move(bl));
+ } else {
+ return write(std::move(bl));
+ }
+}
+
+void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant)
+{
+ if (!reentrant && _state == state) {
+ logger().error("{} is not allowed to re-trigger state {}",
+ conn, get_state_name(state));
+ ceph_assert(false);
+ }
+ logger().debug("{} TRIGGER {}, was {}",
+ conn, get_state_name(_state), get_state_name(state));
+ state = _state;
+ set_write_state(_write_state);
+}
+
+void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr eptr)
+{
+ if (conn.policy.lossy) {
+ logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
+ conn, func_name, get_state_name(state), eptr);
+ close(true);
+ } else if (conn.policy.server ||
+ (conn.policy.standby &&
+ (!is_queued() && conn.sent.empty()))) {
+ logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
+ conn, func_name, get_state_name(state), eptr);
+ execute_standby();
+ } else if (backoff) {
+ logger().info("{} {}: fault at {}, going to WAIT -- {}",
+ conn, func_name, get_state_name(state), eptr);
+ execute_wait(false);
+ } else {
+ logger().info("{} {}: fault at {}, going to CONNECTING -- {}",
+ conn, func_name, get_state_name(state), eptr);
+ execute_connecting();
+ }
+}
+
+void ProtocolV2::reset_session(bool full)
+{
+ server_cookie = 0;
+ connect_seq = 0;
+ conn.in_seq = 0;
+ if (full) {
+ client_cookie = generate_client_cookie();
+ peer_global_seq = 0;
+ reset_write();
+ dispatchers.ms_handle_remote_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ }
+}
+
+seastar::future<std::tuple<entity_type_t, entity_addr_t>>
+ProtocolV2::banner_exchange(bool is_connect)
+{
+ // 1. prepare and send banner
+ bufferlist banner_payload;
+ encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
+ encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
+
+ bufferlist bl;
+ bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
+ auto len_payload = static_cast<uint16_t>(banner_payload.length());
+ encode(len_payload, bl, 0);
+ bl.claim_append(banner_payload);
+ logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
+ "required={}, banner=\"{}\"",
+ conn, bl.length(), len_payload,
+ CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES,
+ CEPH_BANNER_V2_PREFIX);
+ INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
+ return write_flush(std::move(bl)).then([this] {
+ // 2. read peer banner
+ unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
+ INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
+ return read_exactly(banner_len); // or read exactly?
+ }).then([this] (auto bl) {
+ // 3. process peer banner and read banner_payload
+ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
+ logger().debug("{} RECV({}) banner: \"{}\"",
+ conn, bl.size(),
+ std::string((const char*)bl.get(), banner_prefix_len));
+
+ if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
+ if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
+ logger().warn("{} peer is using V1 protocol", conn);
+ } else {
+ logger().warn("{} peer sent bad banner", conn);
+ }
+ abort_in_fault();
+ }
+ bl.trim_front(banner_prefix_len);
+
+ uint16_t payload_len;
+ bufferlist buf;
+ buf.append(buffer::create(std::move(bl)));
+ auto ti = buf.cbegin();
+ try {
+ decode(payload_len, ti);
+ } catch (const buffer::error &e) {
+ logger().warn("{} decode banner payload len failed", conn);
+ abort_in_fault();
+ }
+ logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
+ INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
+ return read(payload_len);
+ }).then([this, is_connect] (bufferlist bl) {
+ // 4. process peer banner_payload and send HelloFrame
+ auto p = bl.cbegin();
+ uint64_t peer_supported_features;
+ uint64_t peer_required_features;
+ try {
+ decode(peer_supported_features, p);
+ decode(peer_required_features, p);
+ } catch (const buffer::error &e) {
+ logger().warn("{} decode banner payload failed", conn);
+ abort_in_fault();
+ }
+ logger().debug("{} RECV({}) banner features: supported={} required={}",
+ conn, bl.length(),
+ peer_supported_features, peer_required_features);
+
+ // Check feature bit compatibility
+ uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
+ uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
+ if ((required_features & peer_supported_features) != required_features) {
+ logger().error("{} peer does not support all required features"
+ " required={} peer_supported={}",
+ conn, required_features, peer_supported_features);
+ abort_in_close(*this, is_connect);
+ }
+ if ((supported_features & peer_required_features) != peer_required_features) {
+ logger().error("{} we do not support all peer required features"
+ " peer_required={} supported={}",
+ conn, peer_required_features, supported_features);
+ abort_in_close(*this, is_connect);
+ }
+ this->peer_required_features = peer_required_features;
+ if (this->peer_required_features == 0) {
+ this->connection_features = msgr2_required;
+ }
+ const bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ tx_frame_asm.set_is_rev1(is_rev1);
+ rx_frame_asm.set_is_rev1(is_rev1);
+
+ auto hello = HelloFrame::Encode(messenger.get_mytype(),
+ conn.target_addr);
+ logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
+ conn, ceph_entity_type_name(messenger.get_mytype()),
+ conn.target_addr);
+ return write_frame(hello);
+ }).then([this] {
+ //5. read peer HelloFrame
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ expect_tag(Tag::HELLO, tag, conn, __func__);
+ return read_frame_payload();
+ }).then([this] {
+ // 6. process peer HelloFrame
+ auto hello = HelloFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
+ conn, ceph_entity_type_name(hello.entity_type()),
+ hello.peer_addr());
+ return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
+ std::make_tuple(hello.entity_type(), hello.peer_addr()));
+ });
+}
+
+// CONNECTING state
+
+seastar::future<> ProtocolV2::handle_auth_reply()
+{
+ return read_main_preamble()
+ .then([this] (Tag tag) {
+ switch (tag) {
+ case Tag::AUTH_BAD_METHOD:
+ return read_frame_payload().then([this] {
+ // handle_auth_bad_method() logic
+ auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back());
+ logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
+ "allowed_methods={}, allowed_modes={}",
+ conn, bad_method.method(), cpp_strerror(bad_method.result()),
+ bad_method.allowed_methods(), bad_method.allowed_modes());
+ ceph_assert(messenger.get_auth_client());
+ int r = messenger.get_auth_client()->handle_auth_bad_method(
+ conn.shared_from_this(), auth_meta,
+ bad_method.method(), bad_method.result(),
+ bad_method.allowed_methods(), bad_method.allowed_modes());
+ if (r < 0) {
+ logger().warn("{} auth_client handle_auth_bad_method returned {}",
+ conn, r);
+ abort_in_fault();
+ }
+ return client_auth(bad_method.allowed_methods());
+ });
+ case Tag::AUTH_REPLY_MORE:
+ return read_frame_payload().then([this] {
+ // handle_auth_reply_more() logic
+ auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
+ conn, auth_more.auth_payload().length());
+ ceph_assert(messenger.get_auth_client());
+ // let execute_connecting() take care of the thrown exception
+ auto reply = messenger.get_auth_client()->handle_auth_reply_more(
+ conn.shared_from_this(), auth_meta, auth_more.auth_payload());
+ auto more_reply = AuthRequestMoreFrame::Encode(reply);
+ logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
+ conn, reply.length());
+ return write_frame(more_reply);
+ }).then([this] {
+ return handle_auth_reply();
+ });
+ case Tag::AUTH_DONE:
+ return read_frame_payload().then([this] {
+ // handle_auth_done() logic
+ auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
+ conn, auth_done.global_id(),
+ ceph_con_mode_name(auth_done.con_mode()),
+ auth_done.auth_payload().length());
+ ceph_assert(messenger.get_auth_client());
+ int r = messenger.get_auth_client()->handle_auth_done(
+ conn.shared_from_this(), auth_meta,
+ auth_done.global_id(),
+ auth_done.con_mode(),
+ auth_done.auth_payload());
+ if (r < 0) {
+ logger().warn("{} auth_client handle_auth_done returned {}", conn, r);
+ abort_in_fault();
+ }
+ auth_meta->con_mode = auth_done.con_mode();
+ session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+ nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), false);
+ return finish_auth();
+ });
+ default: {
+ unexpected_tag(tag, conn, __func__);
+ return seastar::now();
+ }
+ }
+ });
+}
+
+seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods)
+{
+ // send_auth_request() logic
+ ceph_assert(messenger.get_auth_client());
+
+ try {
+ auto [auth_method, preferred_modes, bl] =
+ messenger.get_auth_client()->get_auth_request(conn.shared_from_this(), auth_meta);
+ auth_meta->auth_method = auth_method;
+ auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl);
+ logger().debug("{} WRITE AuthRequestFrame: method={},"
+ " preferred_modes={}, payload_len={}",
+ conn, auth_method, preferred_modes, bl.length());
+ return write_frame(frame).then([this] {
+ return handle_auth_reply();
+ });
+ } catch (const crimson::auth::error& e) {
+ logger().error("{} get_initial_auth_request returned {}", conn, e);
+ abort_in_close(*this, true);
+ return seastar::now();
+ }
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::process_wait()
+{
+ return read_frame_payload().then([this] {
+ // handle_wait() logic
+ logger().debug("{} GOT WaitFrame", conn);
+ WaitFrame::Decode(rx_segments_data.back());
+ return next_step_t::wait;
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::client_connect()
+{
+ // send_client_ident() logic
+ uint64_t flags = 0;
+ if (conn.policy.lossy) {
+ flags |= CEPH_MSG_CONNECT_LOSSY;
+ }
+
+ auto client_ident = ClientIdentFrame::Encode(
+ messenger.get_myaddrs(),
+ conn.target_addr,
+ messenger.get_myname().num(),
+ global_seq,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required, flags,
+ client_cookie);
+
+ logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
+ " gs={}, features_supported={}, features_required={},"
+ " flags={}, cookie={}",
+ conn, messenger.get_myaddrs(), conn.target_addr,
+ messenger.get_myname().num(), global_seq,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags, client_cookie);
+ return write_frame(client_ident).then([this] {
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ switch (tag) {
+ case Tag::IDENT_MISSING_FEATURES:
+ return read_frame_payload().then([this] {
+ // handle_ident_missing_features() logic
+ auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back());
+ logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
+ " (client does not support all server features)",
+ conn, ident_missing.features());
+ abort_in_fault();
+ return next_step_t::none;
+ });
+ case Tag::WAIT:
+ return process_wait();
+ case Tag::SERVER_IDENT:
+ return read_frame_payload().then([this] {
+ // handle_server_ident() logic
+ requeue_sent();
+ auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT ServerIdentFrame:"
+ " addrs={}, gid={}, gs={},"
+ " features_supported={}, features_required={},"
+ " flags={}, cookie={}",
+ conn,
+ server_ident.addrs(), server_ident.gid(),
+ server_ident.global_seq(),
+ server_ident.supported_features(),
+ server_ident.required_features(),
+ server_ident.flags(), server_ident.cookie());
+
+ // is this who we intended to talk to?
+ // be a bit forgiving here, since we may be connecting based on addresses parsed out
+ // of mon_host or something.
+ if (!server_ident.addrs().contains(conn.target_addr)) {
+ logger().warn("{} peer identifies as {}, does not include {}",
+ conn, server_ident.addrs(), conn.target_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+
+ server_cookie = server_ident.cookie();
+
+ // TODO: change peer_addr to entity_addrvec_t
+ if (server_ident.addrs().front() != conn.peer_addr) {
+ logger().warn("{} peer advertises as {}, does not match {}",
+ conn, server_ident.addrs(), conn.peer_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (conn.get_peer_id() != entity_name_t::NEW &&
+ conn.get_peer_id() != server_ident.gid()) {
+ logger().error("{} connection peer id ({}) does not match "
+ "what it should be ({}) during connecting, close",
+ conn, server_ident.gid(), conn.get_peer_id());
+ abort_in_close(*this, true);
+ }
+ conn.set_peer_id(server_ident.gid());
+ conn.set_features(server_ident.supported_features() &
+ conn.policy.features_supported);
+ peer_global_seq = server_ident.global_seq();
+
+ bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
+ if (lossy != conn.policy.lossy) {
+ logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn, lossy);
+ conn.policy.lossy = lossy;
+ }
+ if (lossy && (connect_seq != 0 || server_cookie != 0)) {
+ logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy",
+ conn, connect_seq, server_cookie);
+ connect_seq = 0;
+ server_cookie = 0;
+ }
+
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ });
+ default: {
+ unexpected_tag(tag, conn, "post_client_connect");
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+ }
+ }
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::client_reconnect()
+{
+ // send_reconnect() logic
+ auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(),
+ client_cookie,
+ server_cookie,
+ global_seq,
+ connect_seq,
+ conn.in_seq);
+ logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
+ " server_cookie={}, gs={}, cs={}, msg_seq={}",
+ conn, messenger.get_myaddrs(),
+ client_cookie, server_cookie,
+ global_seq, connect_seq, conn.in_seq);
+ return write_frame(reconnect).then([this] {
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ switch (tag) {
+ case Tag::SESSION_RETRY_GLOBAL:
+ return read_frame_payload().then([this] {
+ // handle_session_retry_global() logic
+ auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
+ logger().warn("{} GOT RetryGlobalFrame: gs={}",
+ conn, retry.global_seq());
+ return messenger.get_global_seq(retry.global_seq()).then([this] (auto gs) {
+ global_seq = gs;
+ logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq);
+ return client_reconnect();
+ });
+ });
+ case Tag::SESSION_RETRY:
+ return read_frame_payload().then([this] {
+ // handle_session_retry() logic
+ auto retry = RetryFrame::Decode(rx_segments_data.back());
+ logger().warn("{} GOT RetryFrame: cs={}",
+ conn, retry.connect_seq());
+ connect_seq = retry.connect_seq() + 1;
+ logger().warn("{} UPDATE: cs={}", conn, connect_seq);
+ return client_reconnect();
+ });
+ case Tag::SESSION_RESET:
+ return read_frame_payload().then([this] {
+ // handle_session_reset() logic
+ auto reset = ResetFrame::Decode(rx_segments_data.back());
+ logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
+ reset_session(reset.full());
+ return client_connect();
+ });
+ case Tag::WAIT:
+ return process_wait();
+ case Tag::SESSION_RECONNECT_OK:
+ return read_frame_payload().then([this] {
+ // handle_reconnect_ok() logic
+ auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
+ conn, reconnect_ok.msg_seq());
+ requeue_up_to(reconnect_ok.msg_seq());
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ });
+ default: {
+ unexpected_tag(tag, conn, "post_client_reconnect");
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+ }
+ }
+ });
+}
+
+void ProtocolV2::execute_connecting()
+{
+ trigger_state(state_t::CONNECTING, write_state_t::delay, true);
+ if (socket) {
+ socket->shutdown();
+ }
+ gated_execute("execute_connecting", [this] {
+ return messenger.get_global_seq().then([this] (auto gs) {
+ global_seq = gs;
+ assert(client_cookie != 0);
+ if (!conn.policy.lossy && server_cookie != 0) {
+ ++connect_seq;
+ logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
+ conn, global_seq, connect_seq);
+ } else { // conn.policy.lossy || server_cookie == 0
+ assert(connect_seq == 0);
+ assert(server_cookie == 0);
+ logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
+ }
+
+ return wait_write_exit();
+ }).then([this] {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} before Socket::connect()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ if (socket) {
+ gate.dispatch_in_background("close_sockect_connecting", *this,
+ [sock = std::move(socket)] () mutable {
+ return sock->close().then([sock = std::move(sock)] {});
+ });
+ }
+ INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING);
+ return Socket::connect(conn.peer_addr);
+ }).then([this](SocketRef sock) {
+ logger().debug("{} socket connected", conn);
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} during Socket::connect()",
+ conn, get_state_name(state));
+ return sock->close().then([sock = std::move(sock)] {
+ abort_protocol();
+ });
+ }
+ socket = std::move(sock);
+ return seastar::now();
+ }).then([this] {
+ auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
+ session_stream_handlers = { nullptr, nullptr };
+ enable_recording();
+ return banner_exchange(true);
+ }).then([this] (auto&& ret) {
+ auto [_peer_type, _my_addr_from_peer] = std::move(ret);
+ if (conn.get_peer_type() != _peer_type) {
+ logger().warn("{} connection peer type does not match what peer advertises {} != {}",
+ conn, ceph_entity_type_name(conn.get_peer_type()),
+ ceph_entity_type_name(_peer_type));
+ abort_in_close(*this, true);
+ }
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} during banner_exchange(), abort",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port());
+ if (unlikely(_my_addr_from_peer.is_legacy())) {
+ logger().warn("{} peer sent a legacy address for me: {}",
+ conn, _my_addr_from_peer);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
+ return messenger.learned_addr(_my_addr_from_peer, conn);
+ }).then([this] {
+ return client_auth();
+ }).then([this] {
+ if (server_cookie == 0) {
+ ceph_assert(connect_seq == 0);
+ return client_connect();
+ } else {
+ ceph_assert(connect_seq > 0);
+ return client_reconnect();
+ }
+ }).then([this] (next_step_t next) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} at the end of execute_connecting()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ switch (next) {
+ case next_step_t::ready: {
+ logger().info("{} connected:"
+ " gs={}, pgs={}, cs={}, client_cookie={},"
+ " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, conn.in_seq,
+ conn.out_seq, conn.out_q.size());
+ execute_ready(true);
+ break;
+ }
+ case next_step_t::wait: {
+ logger().info("{} execute_connecting(): going to WAIT", conn);
+ execute_wait(true);
+ break;
+ }
+ default: {
+ ceph_abort("impossible next step");
+ }
+ }
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ if (state != state_t::CONNECTING) {
+ logger().info("{} execute_connecting(): protocol aborted at {} -- {}",
+ conn, get_state_name(state), eptr);
+ assert(state == state_t::CLOSING ||
+ state == state_t::REPLACING);
+ return;
+ }
+
+ if (conn.policy.server ||
+ (conn.policy.standby &&
+ (!is_queued() && conn.sent.empty()))) {
+ logger().info("{} execute_connecting(): fault at {} with nothing to send,"
+ " going to STANDBY -- {}",
+ conn, get_state_name(state), eptr);
+ execute_standby();
+ } else {
+ logger().info("{} execute_connecting(): fault at {}, going to WAIT -- {}",
+ conn, get_state_name(state), eptr);
+ execute_wait(false);
+ }
+ });
+ });
+}
+
+// ACCEPTING state
+
+seastar::future<> ProtocolV2::_auth_bad_method(int r)
+{
+ // _auth_bad_method() logic
+ ceph_assert(r < 0);
+ auto [allowed_methods, allowed_modes] =
+ messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type());
+ auto bad_method = AuthBadMethodFrame::Encode(
+ auth_meta->auth_method, r, allowed_methods, allowed_modes);
+ logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
+ "allowed_methods={}, allowed_modes={})",
+ conn, auth_meta->auth_method, cpp_strerror(r),
+ allowed_methods, allowed_modes);
+ return write_frame(bad_method).then([this] {
+ return server_auth();
+ });
+}
+
+seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
+{
+ // _handle_auth_request() logic
+ ceph_assert(messenger.get_auth_server());
+ bufferlist reply;
+ int r = messenger.get_auth_server()->handle_auth_request(
+ conn.shared_from_this(), auth_meta,
+ more, auth_meta->auth_method, auth_payload,
+ &reply);
+ switch (r) {
+ // successful
+ case 1: {
+ auto auth_done = AuthDoneFrame::Encode(
+ conn.peer_global_id, auth_meta->con_mode, reply);
+ logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
+ conn, conn.peer_global_id,
+ ceph_con_mode_name(auth_meta->con_mode), reply.length());
+ return write_frame(auth_done).then([this] {
+ ceph_assert(auth_meta);
+ session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+ nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), true);
+ return finish_auth();
+ });
+ }
+ // auth more
+ case 0: {
+ auto more = AuthReplyMoreFrame::Encode(reply);
+ logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
+ conn, reply.length());
+ return write_frame(more).then([this] {
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ expect_tag(Tag::AUTH_REQUEST_MORE, tag, conn, __func__);
+ return read_frame_payload();
+ }).then([this] {
+ auto auth_more = AuthRequestMoreFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
+ conn, auth_more.auth_payload().length());
+ return _handle_auth_request(auth_more.auth_payload(), true);
+ });
+ }
+ case -EBUSY: {
+ logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn);
+ abort_in_fault();
+ return seastar::now();
+ }
+ default: {
+ logger().warn("{} auth_server handle_auth_request returned {}", conn, r);
+ return _auth_bad_method(r);
+ }
+ }
+}
+
+seastar::future<> ProtocolV2::server_auth()
+{
+ return read_main_preamble()
+ .then([this] (Tag tag) {
+ expect_tag(Tag::AUTH_REQUEST, tag, conn, __func__);
+ return read_frame_payload();
+ }).then([this] {
+ // handle_auth_request() logic
+ auto request = AuthRequestFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
+ " payload_len={}",
+ conn, request.method(), request.preferred_modes(),
+ request.auth_payload().length());
+ auth_meta->auth_method = request.method();
+ auth_meta->con_mode = messenger.get_auth_server()->pick_con_mode(
+ conn.get_peer_type(), auth_meta->auth_method,
+ request.preferred_modes());
+ if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
+ logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn);
+ return _auth_bad_method(-EOPNOTSUPP);
+ }
+ return _handle_auth_request(request.auth_payload(), false);
+ });
+}
+
+bool ProtocolV2::validate_peer_name(const entity_name_t& peer_name) const
+{
+ auto my_peer_name = conn.get_peer_name();
+ if (my_peer_name.type() != peer_name.type()) {
+ return false;
+ }
+ if (my_peer_name.num() != entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW &&
+ my_peer_name.num() != peer_name.num()) {
+ return false;
+ }
+ return true;
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_wait()
+{
+ auto wait = WaitFrame::Encode();
+ logger().debug("{} WRITE WaitFrame", conn);
+ return write_frame(wait).then([] {
+ return next_step_t::wait;
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::reuse_connection(
+ ProtocolV2* existing_proto, bool do_reset,
+ bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
+{
+ existing_proto->trigger_replacing(reconnect,
+ do_reset,
+ std::move(socket),
+ std::move(auth_meta),
+ std::move(session_stream_handlers),
+ peer_global_seq,
+ client_cookie,
+ conn.get_peer_name(),
+ connection_features,
+ tx_frame_asm.get_is_rev1(),
+ rx_frame_asm.get_is_rev1(),
+ conn_seq,
+ msg_seq);
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_replaced(conn);
+ }
+#endif
+ // close this connection because all the necessary information is delivered
+ // to the exisiting connection, and jump to error handling code to abort the
+ // current state.
+ abort_in_close(*this, false);
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
+{
+ // handle_existing_connection() logic
+ ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+ existing_conn->protocol.get());
+ ceph_assert(existing_proto);
+ logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
+ " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie,
+ existing_conn, get_state_name(existing_proto->state),
+ existing_proto->global_seq,
+ existing_proto->peer_global_seq,
+ existing_proto->connect_seq,
+ existing_proto->client_cookie,
+ existing_proto->server_cookie);
+
+ if (!validate_peer_name(existing_conn->get_peer_name())) {
+ logger().error("{} server_connect: my peer_name doesn't match"
+ " the existing connection {}, abort", conn, existing_conn);
+ abort_in_fault();
+ }
+
+ if (existing_proto->state == state_t::REPLACING) {
+ logger().warn("{} server_connect: racing replace happened while"
+ " replacing existing connection {}, send wait.",
+ conn, *existing_conn);
+ return send_wait();
+ }
+
+ if (existing_proto->peer_global_seq > peer_global_seq) {
+ logger().warn("{} server_connect:"
+ " this is a stale connection, because peer_global_seq({})"
+ " < existing->peer_global_seq({}), close this connection"
+ " in favor of existing connection {}",
+ conn, peer_global_seq,
+ existing_proto->peer_global_seq, *existing_conn);
+ abort_in_fault();
+ }
+
+ if (existing_conn->policy.lossy) {
+ // existing connection can be thrown out in favor of this one
+ logger().warn("{} server_connect:"
+ " existing connection {} is a lossy channel. Close existing in favor of"
+ " this connection", conn, *existing_conn);
+ execute_establishing(existing_conn, true);
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ }
+
+ if (existing_proto->server_cookie != 0) {
+ if (existing_proto->client_cookie != client_cookie) {
+ // Found previous session
+ // peer has reset and we're going to reuse the existing connection
+ // by replacing the socket
+ logger().warn("{} server_connect:"
+ " found new session (cs={})"
+ " when existing {} is with stale session (cs={}, ss={}),"
+ " peer must have reset",
+ conn, client_cookie,
+ *existing_conn, existing_proto->client_cookie,
+ existing_proto->server_cookie);
+ return reuse_connection(existing_proto, conn.policy.resetcheck);
+ } else {
+ // session establishment interrupted between client_ident and server_ident,
+ // continuing...
+ logger().warn("{} server_connect: found client session with existing {}"
+ " matched (cs={}, ss={}), continuing session establishment",
+ conn, *existing_conn, client_cookie, existing_proto->server_cookie);
+ return reuse_connection(existing_proto);
+ }
+ } else {
+ // Looks like a connection race: server and client are both connecting to
+ // each other at the same time.
+ if (existing_proto->client_cookie != client_cookie) {
+ if (existing_conn->peer_wins()) {
+ logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
+ " and win, reusing existing {}",
+ conn, client_cookie, existing_proto->client_cookie, *existing_conn);
+ return reuse_connection(existing_proto);
+ } else {
+ logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
+ " and lose to existing {}, ask client to wait",
+ conn, client_cookie, existing_proto->client_cookie, *existing_conn);
+ return existing_conn->keepalive().then([this] {
+ return send_wait();
+ });
+ }
+ } else {
+ logger().warn("{} server_connect: found client session with existing {}"
+ " matched (cs={}, ss={}), continuing session establishment",
+ conn, *existing_conn, client_cookie, existing_proto->server_cookie);
+ return reuse_connection(existing_proto);
+ }
+ }
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::server_connect()
+{
+ return read_frame_payload().then([this] {
+ // handle_client_ident() logic
+ auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
+ " gid={}, gs={}, features_supported={},"
+ " features_required={}, flags={}, cookie={}",
+ conn, client_ident.addrs(), client_ident.target_addr(),
+ client_ident.gid(), client_ident.global_seq(),
+ client_ident.supported_features(),
+ client_ident.required_features(),
+ client_ident.flags(), client_ident.cookie());
+
+ if (client_ident.addrs().empty() ||
+ client_ident.addrs().front() == entity_addr_t()) {
+ logger().warn("{} oops, client_ident.addrs() is empty", conn);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (!messenger.get_myaddrs().contains(client_ident.target_addr())) {
+ logger().warn("{} peer is trying to reach {} which is not us ({})",
+ conn, client_ident.target_addr(), messenger.get_myaddrs());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ // TODO: change peer_addr to entity_addrvec_t
+ entity_addr_t paddr = client_ident.addrs().front();
+ if ((paddr.is_msgr2() || paddr.is_any()) &&
+ paddr.is_same_host(conn.target_addr)) {
+ // good
+ } else {
+ logger().warn("{} peer's address {} is not v2 or not the same host with {}",
+ conn, paddr, conn.target_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ conn.peer_addr = paddr;
+ logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
+ conn.target_addr = conn.peer_addr;
+ if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
+ logger().warn("{} we don't know how to reconnect to peer {}",
+ conn, conn.target_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+
+ if (conn.get_peer_id() != entity_name_t::NEW &&
+ conn.get_peer_id() != client_ident.gid()) {
+ logger().error("{} client_ident peer_id ({}) does not match"
+ " what it should be ({}) during accepting, abort",
+ conn, client_ident.gid(), conn.get_peer_id());
+ abort_in_fault();
+ }
+ conn.set_peer_id(client_ident.gid());
+ client_cookie = client_ident.cookie();
+
+ uint64_t feat_missing =
+ (conn.policy.features_required | msgr2_required) &
+ ~(uint64_t)client_ident.supported_features();
+ if (feat_missing) {
+ auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
+ logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
+ conn, feat_missing);
+ return write_frame(ident_missing_features).then([] {
+ return next_step_t::wait;
+ });
+ }
+ connection_features =
+ client_ident.supported_features() & conn.policy.features_supported;
+ logger().debug("{} UPDATE: connection_features={}", conn, connection_features);
+
+ peer_global_seq = client_ident.global_seq();
+
+ // Looks good so far, let's check if there is already an existing connection
+ // to this peer.
+
+ SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
+
+ if (existing_conn) {
+ if (existing_conn->protocol->proto_type != proto_t::v2) {
+ logger().warn("{} existing connection {} proto version is {}, close existing",
+ conn, *existing_conn,
+ static_cast<int>(existing_conn->protocol->proto_type));
+ // should unregister the existing from msgr atomically
+ // NOTE: this is following async messenger logic, but we may miss the reset event.
+ execute_establishing(existing_conn, false);
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ } else {
+ return handle_existing_connection(existing_conn);
+ }
+ } else {
+ execute_establishing(nullptr, true);
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ }
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::read_reconnect()
+{
+ return read_main_preamble()
+ .then([this] (Tag tag) {
+ expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect");
+ return server_reconnect();
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_retry(uint64_t connect_seq)
+{
+ auto retry = RetryFrame::Encode(connect_seq);
+ logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
+ return write_frame(retry).then([this] {
+ return read_reconnect();
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_retry_global(uint64_t global_seq)
+{
+ auto retry = RetryGlobalFrame::Encode(global_seq);
+ logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
+ return write_frame(retry).then([this] {
+ return read_reconnect();
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_reset(bool full)
+{
+ auto reset = ResetFrame::Encode(full);
+ logger().warn("{} WRITE ResetFrame: full={}", conn, full);
+ return write_frame(reset).then([this] {
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset");
+ return server_connect();
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::server_reconnect()
+{
+ return read_frame_payload().then([this] {
+ // handle_reconnect() logic
+ auto reconnect = ReconnectFrame::Decode(rx_segments_data.back());
+
+ logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
+ " server_cookie={}, gs={}, cs={}, msg_seq={}",
+ conn, reconnect.addrs(),
+ reconnect.client_cookie(), reconnect.server_cookie(),
+ reconnect.global_seq(), reconnect.connect_seq(),
+ reconnect.msg_seq());
+
+ // can peer_addrs be changed on-the-fly?
+ // TODO: change peer_addr to entity_addrvec_t
+ entity_addr_t paddr = reconnect.addrs().front();
+ if (paddr.is_msgr2() || paddr.is_any()) {
+ // good
+ } else {
+ logger().warn("{} peer's address {} is not v2", conn, paddr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (conn.peer_addr == entity_addr_t()) {
+ conn.peer_addr = paddr;
+ } else if (conn.peer_addr != paddr) {
+ logger().error("{} peer identifies as {}, while conn.peer_addr={},"
+ " reconnect failed",
+ conn, paddr, conn.peer_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ peer_global_seq = reconnect.global_seq();
+
+ SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
+
+ if (!existing_conn) {
+ // there is no existing connection therefore cannot reconnect to previous
+ // session
+ logger().warn("{} server_reconnect: no existing connection from address {},"
+ " reseting client", conn, conn.peer_addr);
+ return send_reset(true);
+ }
+
+ if (existing_conn->protocol->proto_type != proto_t::v2) {
+ logger().warn("{} server_reconnect: existing connection {} proto version is {},"
+ "close existing and reset client.",
+ conn, *existing_conn,
+ static_cast<int>(existing_conn->protocol->proto_type));
+ // NOTE: this is following async messenger logic, but we may miss the reset event.
+ existing_conn->mark_down();
+ return send_reset(true);
+ }
+
+ ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+ existing_conn->protocol.get());
+ ceph_assert(existing_proto);
+ logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
+ " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
+ conn, global_seq, peer_global_seq, reconnect.connect_seq(),
+ reconnect.client_cookie(), reconnect.server_cookie(),
+ existing_conn,
+ get_state_name(existing_proto->state),
+ existing_proto->global_seq,
+ existing_proto->peer_global_seq,
+ existing_proto->connect_seq,
+ existing_proto->client_cookie,
+ existing_proto->server_cookie);
+
+ if (!validate_peer_name(existing_conn->get_peer_name())) {
+ logger().error("{} server_reconnect: my peer_name doesn't match"
+ " the existing connection {}, abort", conn, existing_conn);
+ abort_in_fault();
+ }
+
+ if (existing_proto->state == state_t::REPLACING) {
+ logger().warn("{} server_reconnect: racing replace happened while "
+ " replacing existing connection {}, retry global.",
+ conn, *existing_conn);
+ return send_retry_global(existing_proto->peer_global_seq);
+ }
+
+ if (existing_proto->client_cookie != reconnect.client_cookie()) {
+ logger().warn("{} server_reconnect:"
+ " client_cookie mismatch with existing connection {},"
+ " cc={} rcc={}. I must have reset, reseting client.",
+ conn, *existing_conn,
+ existing_proto->client_cookie, reconnect.client_cookie());
+ return send_reset(conn.policy.resetcheck);
+ } else if (existing_proto->server_cookie == 0) {
+ // this happens when:
+ // - a connects to b
+ // - a sends client_ident
+ // - b gets client_ident, sends server_ident and sets cookie X
+ // - connection fault
+ // - b reconnects to a with cookie X, connect_seq=1
+ // - a has cookie==0
+ logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the"
+ " server_ident with existing connection {}."
+ " Asking peer to resume session establishment",
+ conn, existing_proto->client_cookie, *existing_conn);
+ return send_reset(false);
+ }
+
+ if (existing_proto->peer_global_seq > reconnect.global_seq()) {
+ logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({}),"
+ " with existing connection {},"
+ " ask client to retry global",
+ conn, existing_proto->peer_global_seq,
+ reconnect.global_seq(), *existing_conn);
+ return send_retry_global(existing_proto->peer_global_seq);
+ }
+
+ if (existing_proto->connect_seq > reconnect.connect_seq()) {
+ logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({}),"
+ " with existing connection {}, ask client to retry",
+ conn, reconnect.connect_seq(),
+ existing_proto->connect_seq, *existing_conn);
+ return send_retry(existing_proto->connect_seq);
+ } else if (existing_proto->connect_seq == reconnect.connect_seq()) {
+ // reconnect race: both peers are sending reconnect messages
+ if (existing_conn->peer_wins()) {
+ logger().warn("{} server_reconnect: reconnect race detected (cs={})"
+ " and win, reusing existing {}",
+ conn, reconnect.connect_seq(), *existing_conn);
+ return reuse_connection(
+ existing_proto, false,
+ true, reconnect.connect_seq(), reconnect.msg_seq());
+ } else {
+ logger().warn("{} server_reconnect: reconnect race detected (cs={})"
+ " and lose to existing {}, ask client to wait",
+ conn, reconnect.connect_seq(), *existing_conn);
+ return send_wait();
+ }
+ } else { // existing_proto->connect_seq < reconnect.connect_seq()
+ logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
+ " reusing existing {}",
+ conn, existing_proto->connect_seq,
+ reconnect.connect_seq(), *existing_conn);
+ return reuse_connection(
+ existing_proto, false,
+ true, reconnect.connect_seq(), reconnect.msg_seq());
+ }
+ });
+}
+
+void ProtocolV2::execute_accepting()
+{
+ trigger_state(state_t::ACCEPTING, write_state_t::none, false);
+ gate.dispatch_in_background("execute_accepting", *this, [this] {
+ return seastar::futurize_invoke([this] {
+ INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
+ auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
+ session_stream_handlers = { nullptr, nullptr };
+ enable_recording();
+ return banner_exchange(false);
+ }).then([this] (auto&& ret) {
+ auto [_peer_type, _my_addr_from_peer] = std::move(ret);
+ ceph_assert(conn.get_peer_type() == 0);
+ conn.set_peer_type(_peer_type);
+
+ conn.policy = messenger.get_policy(_peer_type);
+ logger().info("{} UPDATE: peer_type={},"
+ " policy(lossy={} server={} standby={} resetcheck={})",
+ conn, ceph_entity_type_name(_peer_type),
+ conn.policy.lossy, conn.policy.server,
+ conn.policy.standby, conn.policy.resetcheck);
+ if (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
+ messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce()) {
+ logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
+ conn, _my_addr_from_peer, messenger.get_myaddr());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ return messenger.learned_addr(_my_addr_from_peer, conn);
+ }).then([this] {
+ return server_auth();
+ }).then([this] {
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ switch (tag) {
+ case Tag::CLIENT_IDENT:
+ return server_connect();
+ case Tag::SESSION_RECONNECT:
+ return server_reconnect();
+ default: {
+ unexpected_tag(tag, conn, "post_server_auth");
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+ }
+ }
+ }).then([this] (next_step_t next) {
+ switch (next) {
+ case next_step_t::ready:
+ assert(state != state_t::ACCEPTING);
+ break;
+ case next_step_t::wait:
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} at the end of execute_accepting()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
+ execute_server_wait();
+ break;
+ default:
+ ceph_abort("impossible next step");
+ }
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
+ conn, get_state_name(state), eptr);
+ close(false);
+ });
+ });
+}
+
+// CONNECTING or ACCEPTING state
+
+seastar::future<> ProtocolV2::finish_auth()
+{
+ ceph_assert(auth_meta);
+
+ const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
+ auth_meta->session_key.hmac_sha256(nullptr, rxbuf);
+ auto sig_frame = AuthSignatureFrame::Encode(sig);
+ ceph_assert(record_io);
+ record_io = false;
+ rxbuf.clear();
+ logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
+ return write_frame(sig_frame).then([this] {
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ expect_tag(Tag::AUTH_SIGNATURE, tag, conn, "post_finish_auth");
+ return read_frame_payload();
+ }).then([this] {
+ // handle_auth_signature() logic
+ auto sig_frame = AuthSignatureFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature());
+
+ const auto actual_tx_sig = auth_meta->session_key.empty() ?
+ sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf);
+ if (sig_frame.signature() != actual_tx_sig) {
+ logger().warn("{} pre-auth signature mismatch actual_tx_sig={}"
+ " sig_frame.signature()={}",
+ conn, actual_tx_sig, sig_frame.signature());
+ abort_in_fault();
+ }
+ txbuf.clear();
+ });
+}
+
+// ESTABLISHING
+
+void ProtocolV2::execute_establishing(
+ SocketConnectionRef existing_conn, bool dispatch_reset) {
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} before execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
+ auto accept_me = [this] {
+ messenger.register_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ messenger.unaccept_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ };
+
+ trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
+ if (existing_conn) {
+ existing_conn->protocol->close(dispatch_reset, std::move(accept_me));
+ if (unlikely(state != state_t::ESTABLISHING)) {
+ logger().warn("{} triggered {} during execute_establishing(), "
+ "the accept event will not be delivered!",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ } else {
+ accept_me();
+ }
+
+ dispatchers.ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+
+ gated_execute("execute_establishing", [this] {
+ return seastar::futurize_invoke([this] {
+ return send_server_ident();
+ }).then([this] {
+ if (unlikely(state != state_t::ESTABLISHING)) {
+ logger().debug("{} triggered {} at the end of execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ logger().info("{} established: gs={}, pgs={}, cs={}, client_cookie={},"
+ " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, conn.in_seq,
+ conn.out_seq, conn.out_q.size());
+ execute_ready(false);
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ if (state != state_t::ESTABLISHING) {
+ logger().info("{} execute_establishing() protocol aborted at {} -- {}",
+ conn, get_state_name(state), eptr);
+ assert(state == state_t::CLOSING ||
+ state == state_t::REPLACING);
+ return;
+ }
+ fault(false, "execute_establishing()", eptr);
+ });
+ });
+}
+
+// ESTABLISHING or REPLACING state
+
+seastar::future<>
+ProtocolV2::send_server_ident()
+{
+ // send_server_ident() logic
+
+ // refered to async-conn v2: not assign gs to global_seq
+ return messenger.get_global_seq().then([this] (auto gs) {
+ logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
+
+ // this is required for the case when this connection is being replaced
+ requeue_up_to(0);
+ conn.in_seq = 0;
+
+ if (!conn.policy.lossy) {
+ server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
+ }
+
+ uint64_t flags = 0;
+ if (conn.policy.lossy) {
+ flags = flags | CEPH_MSG_CONNECT_LOSSY;
+ }
+
+ auto server_ident = ServerIdentFrame::Encode(
+ messenger.get_myaddrs(),
+ messenger.get_myname().num(),
+ gs,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags,
+ server_cookie);
+
+ logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
+ " gs={}, features_supported={}, features_required={},"
+ " flags={}, cookie={}",
+ conn, messenger.get_myaddrs(), messenger.get_myname().num(),
+ gs, conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags, server_cookie);
+
+ conn.set_features(connection_features);
+
+ return write_frame(server_ident);
+ });
+}
+
+// REPLACING state
+
+void ProtocolV2::trigger_replacing(bool reconnect,
+ bool do_reset,
+ SocketRef&& new_socket,
+ AuthConnectionMetaRef&& new_auth_meta,
+ ceph::crypto::onwire::rxtx_t new_rxtx,
+ uint64_t new_peer_global_seq,
+ uint64_t new_client_cookie,
+ entity_name_t new_peer_name,
+ uint64_t new_conn_features,
+ bool tx_is_rev1,
+ bool rx_is_rev1,
+ uint64_t new_connect_seq,
+ uint64_t new_msg_seq)
+{
+ trigger_state(state_t::REPLACING, write_state_t::delay, false);
+ if (socket) {
+ socket->shutdown();
+ }
+ dispatchers.ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ gate.dispatch_in_background("trigger_replacing", *this,
+ [this,
+ reconnect,
+ do_reset,
+ new_socket = std::move(new_socket),
+ new_auth_meta = std::move(new_auth_meta),
+ new_rxtx = std::move(new_rxtx),
+ tx_is_rev1, rx_is_rev1,
+ new_client_cookie, new_peer_name,
+ new_conn_features, new_peer_global_seq,
+ new_connect_seq, new_msg_seq] () mutable {
+ return wait_write_exit().then([this, do_reset] {
+ if (do_reset) {
+ reset_session(true);
+ }
+ protocol_timer.cancel();
+ return execution_done.get_future();
+ }).then([this,
+ reconnect,
+ new_socket = std::move(new_socket),
+ new_auth_meta = std::move(new_auth_meta),
+ new_rxtx = std::move(new_rxtx),
+ tx_is_rev1, rx_is_rev1,
+ new_client_cookie, new_peer_name,
+ new_conn_features, new_peer_global_seq,
+ new_connect_seq, new_msg_seq] () mutable {
+ if (unlikely(state != state_t::REPLACING)) {
+ return new_socket->close().then([sock = std::move(new_socket)] {
+ abort_protocol();
+ });
+ }
+
+ if (socket) {
+ gate.dispatch_in_background("close_socket_replacing", *this,
+ [sock = std::move(socket)] () mutable {
+ return sock->close().then([sock = std::move(sock)] {});
+ });
+ }
+ socket = std::move(new_socket);
+ auth_meta = std::move(new_auth_meta);
+ session_stream_handlers = std::move(new_rxtx);
+ record_io = false;
+ peer_global_seq = new_peer_global_seq;
+
+ if (reconnect) {
+ connect_seq = new_connect_seq;
+ // send_reconnect_ok() logic
+ requeue_up_to(new_msg_seq);
+ auto reconnect_ok = ReconnectOkFrame::Encode(conn.in_seq);
+ logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, conn.in_seq);
+ return write_frame(reconnect_ok);
+ } else {
+ client_cookie = new_client_cookie;
+ assert(conn.get_peer_type() == new_peer_name.type());
+ if (conn.get_peer_id() == entity_name_t::NEW) {
+ conn.set_peer_id(new_peer_name.num());
+ }
+ connection_features = new_conn_features;
+ tx_frame_asm.set_is_rev1(tx_is_rev1);
+ rx_frame_asm.set_is_rev1(rx_is_rev1);
+ return send_server_ident();
+ }
+ }).then([this, reconnect] {
+ if (unlikely(state != state_t::REPLACING)) {
+ logger().debug("{} triggered {} at the end of trigger_replacing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ logger().info("{} replaced ({}):"
+ " gs={}, pgs={}, cs={}, client_cookie={}, server_cookie={},"
+ " in_seq={}, out_seq={}, out_q={}",
+ conn, reconnect ? "reconnected" : "connected",
+ global_seq, peer_global_seq, connect_seq, client_cookie,
+ server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size());
+ execute_ready(false);
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ if (state != state_t::REPLACING) {
+ logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
+ conn, get_state_name(state), eptr);
+ assert(state == state_t::CLOSING);
+ return;
+ }
+ fault(true, "trigger_replacing()", eptr);
+ });
+ });
+}
+
+// READY state
+
+ceph::bufferlist ProtocolV2::do_sweep_messages(
+ const std::deque<MessageRef>& msgs,
+ size_t num_msgs,
+ bool require_keepalive,
+ std::optional<utime_t> _keepalive_ack,
+ bool require_ack)
+{
+ ceph::bufferlist bl;
+
+ if (unlikely(require_keepalive)) {
+ auto keepalive_frame = KeepAliveFrame::Encode();
+ bl.append(keepalive_frame.get_buffer(tx_frame_asm));
+ INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
+ }
+
+ if (unlikely(_keepalive_ack.has_value())) {
+ auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
+ bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm));
+ INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
+ }
+
+ if (require_ack && !num_msgs) {
+ auto ack_frame = AckFrame::Encode(conn.in_seq);
+ bl.append(ack_frame.get_buffer(tx_frame_asm));
+ INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
+ }
+
+ std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
+ // TODO: move to common code
+ // set priority
+ msg->get_header().src = messenger.get_myname();
+
+ msg->encode(conn.features, 0);
+
+ ceph_assert(!msg->get_seq() && "message already has seq");
+ msg->set_seq(++conn.out_seq);
+
+ ceph_msg_header &header = msg->get_header();
+ ceph_msg_footer &footer = msg->get_footer();
+
+ ceph_msg_header2 header2{header.seq, header.tid,
+ header.type, header.priority,
+ header.version,
+ init_le32(0), header.data_off,
+ init_le64(conn.in_seq),
+ footer.flags, header.compat_version,
+ header.reserved};
+
+ auto message = MessageFrame::Encode(header2,
+ msg->get_payload(), msg->get_middle(), msg->get_data());
+ logger().debug("{} --> #{} === {} ({})",
+ conn, msg->get_seq(), *msg, msg->get_type());
+ bl.append(message.get_buffer(tx_frame_asm));
+ INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
+ });
+
+ return bl;
+}
+
+seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
+{
+ return read_frame_payload()
+ .then([this, throttle_stamp] {
+ utime_t recv_stamp{seastar::lowres_system_clock::now()};
+
+ // we need to get the size before std::moving segments data
+ const size_t cur_msg_size = get_current_msg_size();
+ auto msg_frame = MessageFrame::Decode(rx_segments_data);
+ // XXX: paranoid copy just to avoid oops
+ ceph_msg_header2 current_header = msg_frame.header();
+
+ logger().trace("{} got {} + {} + {} byte message,"
+ " envelope type={} src={} off={} seq={}",
+ conn, msg_frame.front_len(), msg_frame.middle_len(),
+ msg_frame.data_len(), current_header.type, conn.get_peer_name(),
+ current_header.data_off, current_header.seq);
+
+ ceph_msg_header header{current_header.seq,
+ current_header.tid,
+ current_header.type,
+ current_header.priority,
+ current_header.version,
+ init_le32(msg_frame.front_len()),
+ init_le32(msg_frame.middle_len()),
+ init_le32(msg_frame.data_len()),
+ current_header.data_off,
+ conn.get_peer_name(),
+ current_header.compat_version,
+ current_header.reserved,
+ init_le32(0)};
+ ceph_msg_footer footer{init_le32(0), init_le32(0),
+ init_le32(0), init_le64(0), current_header.flags};
+
+ auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this());
+ Message *message = decode_message(nullptr, 0, header, footer,
+ msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
+ if (!message) {
+ logger().warn("{} decode message failed", conn);
+ abort_in_fault();
+ }
+
+ // store reservation size in message, so we don't get confused
+ // by messages entering the dispatch queue through other paths.
+ message->set_dispatch_throttle_size(cur_msg_size);
+
+ message->set_throttle_stamp(throttle_stamp);
+ message->set_recv_stamp(recv_stamp);
+ message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
+
+ // check received seq#. if it is old, drop the message.
+ // note that incoming messages may skip ahead. this is convenient for the
+ // client side queueing because messages can't be renumbered, but the (kernel)
+ // client will occasionally pull a message out of the sent queue to send
+ // elsewhere. in that case it doesn't matter if we "got" it or not.
+ uint64_t cur_seq = conn.in_seq;
+ if (message->get_seq() <= cur_seq) {
+ logger().error("{} got old message {} <= {} {}, discarding",
+ conn, message->get_seq(), cur_seq, *message);
+ if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
+ local_conf()->ms_die_on_old_message) {
+ ceph_assert(0 == "old msgs despite reconnect_seq feature");
+ }
+ return seastar::now();
+ } else if (message->get_seq() > cur_seq + 1) {
+ logger().error("{} missed message? skipped from seq {} to {}",
+ conn, cur_seq, message->get_seq());
+ if (local_conf()->ms_die_on_skipped_message) {
+ ceph_assert(0 == "skipped incoming seq");
+ }
+ }
+
+ // note last received message.
+ conn.in_seq = message->get_seq();
+ logger().debug("{} <== #{} === {} ({})",
+ conn, message->get_seq(), *message, message->get_type());
+ notify_ack();
+ ack_writes(current_header.ack_seq);
+
+ // TODO: change MessageRef with seastar::shared_ptr
+ auto msg_ref = MessageRef{message, false};
+ // throttle the reading process by the returned future
+ return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
+ });
+}
+
+void ProtocolV2::execute_ready(bool dispatch_connect)
+{
+ assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
+ trigger_state(state_t::READY, write_state_t::open, false);
+ if (dispatch_connect) {
+ dispatchers.ms_handle_connect(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ }
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_ready(conn);
+ }
+#endif
+ gated_execute("execute_ready", [this] {
+ protocol_timer.cancel();
+ return seastar::keep_doing([this] {
+ return read_main_preamble()
+ .then([this] (Tag tag) {
+ switch (tag) {
+ case Tag::MESSAGE: {
+ return seastar::futurize_invoke([this] {
+ // throttle_message() logic
+ if (!conn.policy.throttler_messages) {
+ return seastar::now();
+ }
+ // TODO: message throttler
+ ceph_assert(false);
+ return seastar::now();
+ }).then([this] {
+ // throttle_bytes() logic
+ if (!conn.policy.throttler_bytes) {
+ return seastar::now();
+ }
+ size_t cur_msg_size = get_current_msg_size();
+ if (!cur_msg_size) {
+ return seastar::now();
+ }
+ logger().trace("{} wants {} bytes from policy throttler {}/{}",
+ conn, cur_msg_size,
+ conn.policy.throttler_bytes->get_current(),
+ conn.policy.throttler_bytes->get_max());
+ return conn.policy.throttler_bytes->get(cur_msg_size);
+ }).then([this] {
+ // TODO: throttle_dispatch_queue() logic
+ utime_t throttle_stamp{seastar::lowres_system_clock::now()};
+ return read_message(throttle_stamp);
+ });
+ }
+ case Tag::ACK:
+ return read_frame_payload().then([this] {
+ // handle_message_ack() logic
+ auto ack = AckFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
+ ack_writes(ack.seq());
+ });
+ case Tag::KEEPALIVE2:
+ return read_frame_payload().then([this] {
+ // handle_keepalive2() logic
+ auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT KeepAliveFrame: timestamp={}",
+ conn, keepalive_frame.timestamp());
+ notify_keepalive_ack(keepalive_frame.timestamp());
+ conn.set_last_keepalive(seastar::lowres_system_clock::now());
+ });
+ case Tag::KEEPALIVE2_ACK:
+ return read_frame_payload().then([this] {
+ // handle_keepalive2_ack() logic
+ auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
+ conn.set_last_keepalive_ack(
+ seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()});
+ logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
+ conn, conn.last_keepalive_ack);
+ });
+ default: {
+ unexpected_tag(tag, conn, "execute_ready");
+ return seastar::now();
+ }
+ }
+ });
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ if (state != state_t::READY) {
+ logger().info("{} execute_ready(): protocol aborted at {} -- {}",
+ conn, get_state_name(state), eptr);
+ assert(state == state_t::REPLACING ||
+ state == state_t::CLOSING);
+ return;
+ }
+ fault(false, "execute_ready()", eptr);
+ });
+ });
+}
+
+// STANDBY state
+
+void ProtocolV2::execute_standby()
+{
+ trigger_state(state_t::STANDBY, write_state_t::delay, true);
+ if (socket) {
+ socket->shutdown();
+ }
+}
+
+void ProtocolV2::notify_write()
+{
+ if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
+ logger().info("{} notify_write(): at {}, going to CONNECTING",
+ conn, get_state_name(state));
+ execute_connecting();
+ }
+}
+
+// WAIT state
+
+void ProtocolV2::execute_wait(bool max_backoff)
+{
+ trigger_state(state_t::WAIT, write_state_t::delay, true);
+ if (socket) {
+ socket->shutdown();
+ }
+ gated_execute("execute_wait", [this, max_backoff] {
+ double backoff = protocol_timer.last_dur();
+ if (max_backoff) {
+ backoff = local_conf().get_val<double>("ms_max_backoff");
+ } else if (backoff > 0) {
+ backoff = std::min(local_conf().get_val<double>("ms_max_backoff"), 2 * backoff);
+ } else {
+ backoff = local_conf().get_val<double>("ms_initial_backoff");
+ }
+ return protocol_timer.backoff(backoff).then([this] {
+ if (unlikely(state != state_t::WAIT)) {
+ logger().debug("{} triggered {} at the end of execute_wait()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ logger().info("{} execute_wait(): going to CONNECTING", conn);
+ execute_connecting();
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ logger().info("{} execute_wait(): protocol aborted at {} -- {}",
+ conn, get_state_name(state), eptr);
+ assert(state == state_t::REPLACING ||
+ state == state_t::CLOSING);
+ });
+ });
+}
+
+// SERVER_WAIT state
+
+void ProtocolV2::execute_server_wait()
+{
+ trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false);
+ gated_execute("execute_server_wait", [this] {
+ return read_exactly(1).then([this] (auto bl) {
+ logger().warn("{} SERVER_WAIT got read, abort", conn);
+ abort_in_fault();
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
+ conn, get_state_name(state), eptr);
+ close(false);
+ });
+ });
+}
+
+// CLOSING state
+
+void ProtocolV2::trigger_close()
+{
+ messenger.closing_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+
+ if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) {
+ messenger.unaccept_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) {
+ messenger.unregister_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ } else {
+ // cannot happen
+ ceph_assert(false);
+ }
+
+ protocol_timer.cancel();
+ trigger_state(state_t::CLOSING, write_state_t::drop, false);
+}
+
+void ProtocolV2::on_closed()
+{
+ messenger.closed_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+}
+
+void ProtocolV2::print(std::ostream& out) const
+{
+ out << conn;
+}
+
+} // namespace crimson::net