diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/crimson/net | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/net')
-rw-r--r-- | src/crimson/net/Connection.h | 147 | ||||
-rw-r--r-- | src/crimson/net/Dispatcher.h | 62 | ||||
-rw-r--r-- | src/crimson/net/Errors.cc | 51 | ||||
-rw-r--r-- | src/crimson/net/Errors.h | 53 | ||||
-rw-r--r-- | src/crimson/net/FrameAssemblerV2.cc | 461 | ||||
-rw-r--r-- | src/crimson/net/FrameAssemblerV2.h | 257 | ||||
-rw-r--r-- | src/crimson/net/Fwd.h | 52 | ||||
-rw-r--r-- | src/crimson/net/Interceptor.h | 186 | ||||
-rw-r--r-- | src/crimson/net/Messenger.cc | 19 | ||||
-rw-r--r-- | src/crimson/net/Messenger.h | 130 | ||||
-rw-r--r-- | src/crimson/net/ProtocolV2.cc | 2348 | ||||
-rw-r--r-- | src/crimson/net/ProtocolV2.h | 328 | ||||
-rw-r--r-- | src/crimson/net/Socket.cc | 523 | ||||
-rw-r--r-- | src/crimson/net/Socket.h | 201 | ||||
-rw-r--r-- | src/crimson/net/SocketConnection.cc | 220 | ||||
-rw-r--r-- | src/crimson/net/SocketConnection.h | 236 | ||||
-rw-r--r-- | src/crimson/net/SocketMessenger.cc | 485 | ||||
-rw-r--r-- | src/crimson/net/SocketMessenger.h | 192 | ||||
-rw-r--r-- | src/crimson/net/chained_dispatchers.cc | 114 | ||||
-rw-r--r-- | src/crimson/net/chained_dispatchers.h | 39 | ||||
-rw-r--r-- | src/crimson/net/io_handler.cc | 1287 | ||||
-rw-r--r-- | src/crimson/net/io_handler.h | 623 |
22 files changed, 8014 insertions, 0 deletions
diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h new file mode 100644 index 000000000..7141e20f4 --- /dev/null +++ b/src/crimson/net/Connection.h @@ -0,0 +1,147 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <queue> +#include <seastar/core/future.hh> +#include <seastar/core/shared_ptr.hh> + +#include "Fwd.h" + +namespace crimson::net { + +using seq_num_t = uint64_t; + +/** + * Connection + * + * Abstraction for messenger connections. + * + * Except when otherwise specified, methods must be invoked from the core on which + * the connection originates. + */ +class Connection : public seastar::enable_shared_from_this<Connection> { + public: + using clock_t = seastar::lowres_system_clock; + + Connection() {} + + virtual ~Connection() {} + + /** + * get_shard_id + * + * The shard id where the Connection is dispatching events and handling I/O. + * + * May be changed with the accept/connect events. + */ + virtual const seastar::shard_id get_shard_id() const = 0; + + virtual const entity_name_t &get_peer_name() const = 0; + + entity_type_t get_peer_type() const { return get_peer_name().type(); } + int64_t get_peer_id() const { return get_peer_name().num(); } + bool peer_is_mon() const { return get_peer_name().is_mon(); } + bool peer_is_mgr() const { return get_peer_name().is_mgr(); } + bool peer_is_mds() const { return get_peer_name().is_mds(); } + bool peer_is_osd() const { return get_peer_name().is_osd(); } + bool peer_is_client() const { return get_peer_name().is_client(); } + + virtual const entity_addr_t &get_peer_addr() const = 0; + + const entity_addrvec_t get_peer_addrs() const { + return entity_addrvec_t(get_peer_addr()); + } + + virtual const entity_addr_t &get_peer_socket_addr() const = 0; + + virtual uint64_t get_features() const = 0; + + bool has_feature(uint64_t f) const { + return get_features() & f; + } + + /// true if the handshake has completed and no errors have been encountered + virtual bool is_connected() const = 0; + + /** + * send + * + * Send a message over a connection that has completed its handshake. + * + * May be invoked from any core, but that requires to chain the returned + * future to preserve ordering. + */ + virtual seastar::future<> send(MessageURef msg) = 0; + + /** + * send_keepalive + * + * Send a keepalive message over a connection that has completed its + * handshake. + * + * May be invoked from any core, but that requires to chain the returned + * future to preserve ordering. + */ + virtual seastar::future<> send_keepalive() = 0; + + virtual clock_t::time_point get_last_keepalive() const = 0; + + virtual clock_t::time_point get_last_keepalive_ack() const = 0; + + // workaround for the monitor client + virtual void set_last_keepalive_ack(clock_t::time_point when) = 0; + + // close the connection and cancel any any pending futures from read/send, + // without dispatching any reset event + virtual void mark_down() = 0; + + struct user_private_t { + virtual ~user_private_t() = default; + }; + + virtual bool has_user_private() const = 0; + + virtual user_private_t &get_user_private() = 0; + + virtual void set_user_private(std::unique_ptr<user_private_t>) = 0; + + virtual void print(std::ostream& out) const = 0; + +#ifdef UNIT_TESTS_BUILT + virtual bool is_protocol_ready() const = 0; + + virtual bool is_protocol_standby() const = 0; + + virtual bool is_protocol_closed() const = 0; + + virtual bool is_protocol_closed_clean() const = 0; + + virtual bool peer_wins() const = 0; +#endif +}; + +inline std::ostream& operator<<(std::ostream& out, const Connection& conn) { + out << "["; + conn.print(out); + out << "]"; + return out; +} + +} // namespace crimson::net + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::net::Connection> : fmt::ostream_formatter {}; +#endif diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h new file mode 100644 index 000000000..9eea0a858 --- /dev/null +++ b/src/crimson/net/Dispatcher.h @@ -0,0 +1,62 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include "Fwd.h" + +class AuthAuthorizer; + +namespace crimson::net { + +class Dispatcher { + public: + virtual ~Dispatcher() {} + + // Dispatchers are put into a chain as described by chain-of-responsibility + // pattern. If any of the dispatchers claims this message, it returns a valid + // future to prevent other dispatchers from processing it, and this is also + // used to throttle the connection if it's too busy. + virtual std::optional<seastar::future<>> ms_dispatch(ConnectionRef, MessageRef) = 0; + + // The connection is moving to the new_shard under accept/connect. + // User should not operate conn in this shard thereafter. + virtual void ms_handle_shard_change( + ConnectionRef conn, + seastar::shard_id new_shard, + bool is_accept_or_connect) {} + + // The connection is accepted or recoverred(lossless), all the followup + // events and messages will be dispatched to this shard. + // + // is_replace=true means the accepted connection has replaced + // another connecting connection with the same peer_addr, which currently only + // happens under lossy policy when both sides wish to connect to each other. + virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id prv_shard, bool is_replace) {} + + // The connection is (re)connected, all the followup events and messages will + // be dispatched to this shard. + virtual void ms_handle_connect(ConnectionRef conn, seastar::shard_id prv_shard) {} + + // a reset event is dispatched when the connection is closed unexpectedly. + // + // is_replace=true means the reset connection is going to be replaced by + // another accepting connection with the same peer_addr, which currently only + // happens under lossy policy when both sides wish to connect to each other. + virtual void ms_handle_reset(ConnectionRef conn, bool is_replace) {} + + virtual void ms_handle_remote_reset(ConnectionRef conn) {} +}; + +} // namespace crimson::net diff --git a/src/crimson/net/Errors.cc b/src/crimson/net/Errors.cc new file mode 100644 index 000000000..d07c090db --- /dev/null +++ b/src/crimson/net/Errors.cc @@ -0,0 +1,51 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "Errors.h" + +namespace crimson::net { + +const std::error_category& net_category() +{ + struct category : public std::error_category { + const char* name() const noexcept override { + return "crimson::net"; + } + + std::string message(int ev) const override { + switch (static_cast<error>(ev)) { + case error::success: + return "success"; + case error::bad_connect_banner: + return "bad connect banner"; + case error::bad_peer_address: + return "bad peer address"; + case error::negotiation_failure: + return "negotiation failure"; + case error::read_eof: + return "read eof"; + case error::corrupted_message: + return "corrupted message"; + case error::protocol_aborted: + return "protocol aborted"; + default: + return "unknown"; + } + } + }; + static category instance; + return instance; +} + +} // namespace crimson::net diff --git a/src/crimson/net/Errors.h b/src/crimson/net/Errors.h new file mode 100644 index 000000000..3a17a103a --- /dev/null +++ b/src/crimson/net/Errors.h @@ -0,0 +1,53 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <system_error> + +namespace crimson::net { + +/// net error codes +enum class error { + success = 0, + bad_connect_banner, + bad_peer_address, + negotiation_failure, + read_eof, + corrupted_message, + protocol_aborted, +}; + +/// net error category +const std::error_category& net_category(); + +inline std::error_code make_error_code(error e) +{ + return {static_cast<int>(e), net_category()}; +} + +inline std::error_condition make_error_condition(error e) +{ + return {static_cast<int>(e), net_category()}; +} + +} // namespace crimson::net + +namespace std { + +/// enables implicit conversion to std::error_condition +template <> +struct is_error_condition_enum<crimson::net::error> : public true_type {}; + +} // namespace std diff --git a/src/crimson/net/FrameAssemblerV2.cc b/src/crimson/net/FrameAssemblerV2.cc new file mode 100644 index 000000000..273a6350d --- /dev/null +++ b/src/crimson/net/FrameAssemblerV2.cc @@ -0,0 +1,461 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#include "FrameAssemblerV2.h" + +#include "Errors.h" +#include "SocketConnection.h" + +using ceph::msgr::v2::FrameAssembler; +using ceph::msgr::v2::FrameError; +using ceph::msgr::v2::preamble_block_t; +using ceph::msgr::v2::segment_t; +using ceph::msgr::v2::Tag; + +namespace { + +seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); +} + +} // namespace anonymous + +namespace crimson::net { + +FrameAssemblerV2::FrameAssemblerV2(SocketConnection &_conn) + : conn{_conn}, sid{seastar::this_shard_id()} +{ + assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); +} + +FrameAssemblerV2::~FrameAssemblerV2() +{ + assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); + assert(seastar::this_shard_id() == sid); + if (has_socket()) { + std::ignore = move_socket(); + } +} + +#ifdef UNIT_TESTS_BUILT +// should be consistent to intercept() in ProtocolV2.cc +seastar::future<> FrameAssemblerV2::intercept_frames( + std::vector<Breakpoint> bps, + bp_type_t type) +{ + assert(seastar::this_shard_id() == sid); + assert(has_socket()); + if (!conn.interceptor) { + return seastar::now(); + } + return conn.interceptor->intercept(conn, bps + ).then([this, type](bp_action_t action) { + return seastar::smp::submit_to( + socket->get_shard_id(), + [this, type, action] { + socket->set_trap(type, action, &conn.interceptor->blocker); + }); + }); +} +#endif + +void FrameAssemblerV2::set_is_rev1(bool _is_rev1) +{ + assert(seastar::this_shard_id() == sid); + is_rev1 = _is_rev1; + tx_frame_asm.set_is_rev1(_is_rev1); + rx_frame_asm.set_is_rev1(_is_rev1); +} + +void FrameAssemblerV2::create_session_stream_handlers( + const AuthConnectionMeta &auth_meta, + bool crossed) +{ + assert(seastar::this_shard_id() == sid); + session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair( + nullptr, auth_meta, is_rev1, crossed); +} + +void FrameAssemblerV2::reset_handlers() +{ + assert(seastar::this_shard_id() == sid); + session_stream_handlers = { nullptr, nullptr }; + session_comp_handlers = { nullptr, nullptr }; +} + +FrameAssemblerV2::mover_t +FrameAssemblerV2::to_replace() +{ + assert(seastar::this_shard_id() == sid); + assert(is_socket_valid()); + + clear(); + + return mover_t{ + move_socket(), + std::move(session_stream_handlers), + std::move(session_comp_handlers)}; +} + +seastar::future<> FrameAssemblerV2::replace_by(FrameAssemblerV2::mover_t &&mover) +{ + assert(seastar::this_shard_id() == sid); + + clear(); + + session_stream_handlers = std::move(mover.session_stream_handlers); + session_comp_handlers = std::move(mover.session_comp_handlers); + if (has_socket()) { + return replace_shutdown_socket(std::move(mover.socket)); + } else { + set_socket(std::move(mover.socket)); + return seastar::now(); + } +} + +void FrameAssemblerV2::start_recording() +{ + assert(seastar::this_shard_id() == sid); + record_io = true; + rxbuf.clear(); + txbuf.clear(); +} + +FrameAssemblerV2::record_bufs_t +FrameAssemblerV2::stop_recording() +{ + assert(seastar::this_shard_id() == sid); + ceph_assert_always(record_io == true); + record_io = false; + return record_bufs_t{std::move(rxbuf), std::move(txbuf)}; +} + +bool FrameAssemblerV2::has_socket() const +{ + assert((socket && conn.socket) || (!socket && !conn.socket)); + return bool(socket); +} + +bool FrameAssemblerV2::is_socket_valid() const +{ + assert(seastar::this_shard_id() == sid); +#ifndef NDEBUG + if (has_socket() && socket->get_shard_id() == sid) { + assert(socket->is_shutdown() == is_socket_shutdown); + } +#endif + return has_socket() && !is_socket_shutdown; +} + +seastar::shard_id +FrameAssemblerV2::get_socket_shard_id() const +{ + assert(seastar::this_shard_id() == sid); + assert(is_socket_valid()); + return socket->get_shard_id(); +} + +SocketFRef FrameAssemblerV2::move_socket() +{ + assert(has_socket()); + conn.set_socket(nullptr); + return std::move(socket); +} + +void FrameAssemblerV2::set_socket(SocketFRef &&new_socket) +{ + assert(seastar::this_shard_id() == sid); + assert(!has_socket()); + assert(new_socket); + socket = std::move(new_socket); + conn.set_socket(socket.get()); + is_socket_shutdown = false; + assert(is_socket_valid()); +} + +void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port) +{ + assert(seastar::this_shard_id() == sid); + assert(has_socket()); + // Note: may not invoke on the socket core + socket->learn_ephemeral_port_as_connector(port); +} + +template <bool may_cross_core> +void FrameAssemblerV2::shutdown_socket(crimson::common::Gated *gate) +{ + assert(seastar::this_shard_id() == sid); + assert(is_socket_valid()); + is_socket_shutdown = true; + if constexpr (may_cross_core) { + assert(conn.get_messenger_shard_id() == sid); + assert(gate); + gate->dispatch_in_background("shutdown_socket", conn, [this] { + return seastar::smp::submit_to( + socket->get_shard_id(), [this] { + socket->shutdown(); + }); + }); + } else { + assert(socket->get_shard_id() == sid); + assert(!gate); + socket->shutdown(); + } +} +template void FrameAssemblerV2::shutdown_socket<true>(crimson::common::Gated *); +template void FrameAssemblerV2::shutdown_socket<false>(crimson::common::Gated *); + +seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketFRef &&new_socket) +{ + assert(seastar::this_shard_id() == sid); + assert(has_socket()); + assert(!is_socket_valid()); + auto old_socket = move_socket(); + auto old_socket_shard_id = old_socket->get_shard_id(); + set_socket(std::move(new_socket)); + return seastar::smp::submit_to( + old_socket_shard_id, + [old_socket = std::move(old_socket)]() mutable { + return old_socket->close( + ).then([sock = std::move(old_socket)] {}); + }); +} + +seastar::future<> FrameAssemblerV2::close_shutdown_socket() +{ + assert(seastar::this_shard_id() == sid); + assert(has_socket()); + assert(!is_socket_valid()); + return seastar::smp::submit_to( + socket->get_shard_id(), [this] { + return socket->close(); + }); +} + +template <bool may_cross_core> +seastar::future<ceph::bufferptr> +FrameAssemblerV2::read_exactly(std::size_t bytes) +{ + assert(seastar::this_shard_id() == sid); + assert(has_socket()); + if constexpr (may_cross_core) { + assert(conn.get_messenger_shard_id() == sid); + return seastar::smp::submit_to( + socket->get_shard_id(), [this, bytes] { + return socket->read_exactly(bytes); + }).then([this](auto bptr) { + if (record_io) { + rxbuf.append(bptr); + } + return bptr; + }); + } else { + assert(socket->get_shard_id() == sid); + return socket->read_exactly(bytes); + } +} +template seastar::future<ceph::bufferptr> FrameAssemblerV2::read_exactly<true>(std::size_t); +template seastar::future<ceph::bufferptr> FrameAssemblerV2::read_exactly<false>(std::size_t); + +template <bool may_cross_core> +seastar::future<ceph::bufferlist> +FrameAssemblerV2::read(std::size_t bytes) +{ + assert(seastar::this_shard_id() == sid); + assert(has_socket()); + if constexpr (may_cross_core) { + assert(conn.get_messenger_shard_id() == sid); + return seastar::smp::submit_to( + socket->get_shard_id(), [this, bytes] { + return socket->read(bytes); + }).then([this](auto buf) { + if (record_io) { + rxbuf.append(buf); + } + return buf; + }); + } else { + assert(socket->get_shard_id() == sid); + return socket->read(bytes); + } +} +template seastar::future<ceph::bufferlist> FrameAssemblerV2::read<true>(std::size_t); +template seastar::future<ceph::bufferlist> FrameAssemblerV2::read<false>(std::size_t); + +template <bool may_cross_core> +seastar::future<> +FrameAssemblerV2::write(ceph::bufferlist buf) +{ + assert(seastar::this_shard_id() == sid); + assert(has_socket()); + if constexpr (may_cross_core) { + assert(conn.get_messenger_shard_id() == sid); + if (record_io) { + txbuf.append(buf); + } + return seastar::smp::submit_to( + socket->get_shard_id(), [this, buf = std::move(buf)]() mutable { + return socket->write(std::move(buf)); + }); + } else { + assert(socket->get_shard_id() == sid); + return socket->write(std::move(buf)); + } +} +template seastar::future<> FrameAssemblerV2::write<true>(ceph::bufferlist); +template seastar::future<> FrameAssemblerV2::write<false>(ceph::bufferlist); + +template <bool may_cross_core> +seastar::future<> +FrameAssemblerV2::flush() +{ + assert(seastar::this_shard_id() == sid); + assert(has_socket()); + if constexpr (may_cross_core) { + assert(conn.get_messenger_shard_id() == sid); + return seastar::smp::submit_to( + socket->get_shard_id(), [this] { + return socket->flush(); + }); + } else { + assert(socket->get_shard_id() == sid); + return socket->flush(); + } +} +template seastar::future<> FrameAssemblerV2::flush<true>(); +template seastar::future<> FrameAssemblerV2::flush<false>(); + +template <bool may_cross_core> +seastar::future<> +FrameAssemblerV2::write_flush(ceph::bufferlist buf) +{ + assert(seastar::this_shard_id() == sid); + assert(has_socket()); + if constexpr (may_cross_core) { + assert(conn.get_messenger_shard_id() == sid); + if (unlikely(record_io)) { + txbuf.append(buf); + } + return seastar::smp::submit_to( + socket->get_shard_id(), [this, buf = std::move(buf)]() mutable { + return socket->write_flush(std::move(buf)); + }); + } else { + assert(socket->get_shard_id() == sid); + return socket->write_flush(std::move(buf)); + } +} +template seastar::future<> FrameAssemblerV2::write_flush<true>(ceph::bufferlist); +template seastar::future<> FrameAssemblerV2::write_flush<false>(ceph::bufferlist); + +template <bool may_cross_core> +seastar::future<FrameAssemblerV2::read_main_t> +FrameAssemblerV2::read_main_preamble() +{ + assert(seastar::this_shard_id() == sid); + rx_preamble.clear(); + return read_exactly<may_cross_core>( + rx_frame_asm.get_preamble_onwire_len() + ).then([this](auto bptr) { + rx_preamble.append(std::move(bptr)); + Tag tag; + try { + tag = rx_frame_asm.disassemble_preamble(rx_preamble); + } catch (FrameError& e) { + logger().warn("{} read_main_preamble: {}", conn, e.what()); + throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); + } +#ifdef UNIT_TESTS_BUILT + return intercept_frame(tag, false + ).then([this, tag] { + return read_main_t{tag, &rx_frame_asm}; + }); +#else + return read_main_t{tag, &rx_frame_asm}; +#endif + }); +} +template seastar::future<FrameAssemblerV2::read_main_t> FrameAssemblerV2::read_main_preamble<true>(); +template seastar::future<FrameAssemblerV2::read_main_t> FrameAssemblerV2::read_main_preamble<false>(); + +template <bool may_cross_core> +seastar::future<FrameAssemblerV2::read_payload_t*> +FrameAssemblerV2::read_frame_payload() +{ + assert(seastar::this_shard_id() == sid); + rx_segments_data.clear(); + return seastar::do_until( + [this] { + return rx_frame_asm.get_num_segments() == rx_segments_data.size(); + }, + [this] { + // TODO: create aligned and contiguous buffer from socket + const size_t seg_idx = rx_segments_data.size(); + if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx); + alignment != segment_t::DEFAULT_ALIGNMENT) { + logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}", + conn, alignment, rx_segments_data.size()); + } + uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx); + // TODO: create aligned and contiguous buffer from socket + return read_exactly<may_cross_core>(onwire_len + ).then([this](auto bptr) { + logger().trace("{} RECV({}) frame segment[{}]", + conn, bptr.length(), rx_segments_data.size()); + bufferlist segment; + segment.append(std::move(bptr)); + rx_segments_data.emplace_back(std::move(segment)); + }); + } + ).then([this] { + return read_exactly<may_cross_core>(rx_frame_asm.get_epilogue_onwire_len()); + }).then([this](auto bptr) { + logger().trace("{} RECV({}) frame epilogue", conn, bptr.length()); + bool ok = false; + try { + bufferlist rx_epilogue; + rx_epilogue.append(std::move(bptr)); + ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue); + } catch (FrameError& e) { + logger().error("read_frame_payload: {} {}", conn, e.what()); + throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); + } catch (ceph::crypto::onwire::MsgAuthError&) { + logger().error("read_frame_payload: {} bad auth tag", conn); + throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); + } + // we do have a mechanism that allows transmitter to start sending message + // and abort after putting entire data field on wire. This will be used by + // the kernel client to avoid unnecessary buffering. + if (!ok) { + ceph_abort("TODO"); + } + return &rx_segments_data; + }); +} +template seastar::future<FrameAssemblerV2::read_payload_t*> FrameAssemblerV2::read_frame_payload<true>(); +template seastar::future<FrameAssemblerV2::read_payload_t*> FrameAssemblerV2::read_frame_payload<false>(); + +void FrameAssemblerV2::log_main_preamble(const ceph::bufferlist &bl) +{ + const auto main_preamble = + reinterpret_cast<const preamble_block_t*>(bl.front().c_str()); + logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}", + conn, bl.length(), (int)main_preamble->tag, + (int)main_preamble->num_segments, main_preamble->crc); +} + +FrameAssemblerV2Ref FrameAssemblerV2::create(SocketConnection &conn) +{ + return std::make_unique<FrameAssemblerV2>(conn); +} + +void FrameAssemblerV2::clear() +{ + record_io = false; + rxbuf.clear(); + txbuf.clear(); + rx_preamble.clear(); + rx_segments_data.clear(); +} + +} // namespace crimson::net diff --git a/src/crimson/net/FrameAssemblerV2.h b/src/crimson/net/FrameAssemblerV2.h new file mode 100644 index 000000000..9c89c144e --- /dev/null +++ b/src/crimson/net/FrameAssemblerV2.h @@ -0,0 +1,257 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "msg/async/frames_v2.h" +#include "msg/async/crypto_onwire.h" +#include "msg/async/compression_onwire.h" + +#include "crimson/common/gated.h" +#include "crimson/net/Socket.h" + +#ifdef UNIT_TESTS_BUILT +#include "Interceptor.h" +#endif + +namespace crimson::net { + +class SocketConnection; +class FrameAssemblerV2; +using FrameAssemblerV2Ref = std::unique_ptr<FrameAssemblerV2>; + +class FrameAssemblerV2 { +public: + FrameAssemblerV2(SocketConnection &conn); + + ~FrameAssemblerV2(); + + FrameAssemblerV2(const FrameAssemblerV2 &) = delete; + + FrameAssemblerV2(FrameAssemblerV2 &&) = delete; + + void set_shard_id(seastar::shard_id _sid) { + assert(seastar::this_shard_id() == sid); + clear(); + sid = _sid; + } + + seastar::shard_id get_shard_id() const { + return sid; + } + + void set_is_rev1(bool is_rev1); + + void create_session_stream_handlers( + const AuthConnectionMeta &auth_meta, + bool crossed); + + void reset_handlers(); + + /* + * replacing + */ + + struct mover_t { + SocketFRef socket; + ceph::crypto::onwire::rxtx_t session_stream_handlers; + ceph::compression::onwire::rxtx_t session_comp_handlers; + }; + + mover_t to_replace(); + + seastar::future<> replace_by(mover_t &&); + + /* + * auth signature interfaces + */ + + void start_recording(); + + struct record_bufs_t { + ceph::bufferlist rxbuf; + ceph::bufferlist txbuf; + }; + record_bufs_t stop_recording(); + + /* + * socket maintainence interfaces + */ + + // the socket exists and not shutdown + bool is_socket_valid() const; + + seastar::shard_id get_socket_shard_id() const; + + void set_socket(SocketFRef &&); + + void learn_socket_ephemeral_port_as_connector(uint16_t port); + + // if may_cross_core == true, gate is required for cross-core shutdown + template <bool may_cross_core> + void shutdown_socket(crimson::common::Gated *gate); + + seastar::future<> replace_shutdown_socket(SocketFRef &&); + + seastar::future<> close_shutdown_socket(); + + /* + * socket read and write interfaces + */ + + template <bool may_cross_core = true> + seastar::future<ceph::bufferptr> read_exactly(std::size_t bytes); + + template <bool may_cross_core = true> + seastar::future<ceph::bufferlist> read(std::size_t bytes); + + template <bool may_cross_core = true> + seastar::future<> write(ceph::bufferlist); + + template <bool may_cross_core = true> + seastar::future<> flush(); + + template <bool may_cross_core = true> + seastar::future<> write_flush(ceph::bufferlist); + + /* + * frame read and write interfaces + */ + + /// may throw negotiation_failure as fault + struct read_main_t { + ceph::msgr::v2::Tag tag; + const ceph::msgr::v2::FrameAssembler *rx_frame_asm; + }; + template <bool may_cross_core = true> + seastar::future<read_main_t> read_main_preamble(); + + /// may throw negotiation_failure as fault + using read_payload_t = ceph::msgr::v2::segment_bls_t; + // FIXME: read_payload_t cannot be no-throw move constructible + template <bool may_cross_core = true> + seastar::future<read_payload_t*> read_frame_payload(); + + template <class F> + ceph::bufferlist get_buffer(F &tx_frame) { + assert(seastar::this_shard_id() == sid); + auto bl = tx_frame.get_buffer(tx_frame_asm); + log_main_preamble(bl); + return bl; + } + + template <class F, bool may_cross_core = true> + seastar::future<> write_flush_frame(F &tx_frame) { + assert(seastar::this_shard_id() == sid); + auto bl = get_buffer(tx_frame); +#ifdef UNIT_TESTS_BUILT + return intercept_frame(F::tag, true + ).then([this, bl=std::move(bl)]() mutable { + return write_flush<may_cross_core>(std::move(bl)); + }); +#else + return write_flush<may_cross_core>(std::move(bl)); +#endif + } + + static FrameAssemblerV2Ref create(SocketConnection &conn); + +#ifdef UNIT_TESTS_BUILT + seastar::future<> intercept_frames( + std::vector<ceph::msgr::v2::Tag> tags, + bool is_write) { + auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ; + std::vector<Breakpoint> bps; + for (auto &tag : tags) { + bps.emplace_back(Breakpoint{tag, type}); + } + return intercept_frames(bps, type); + } + + seastar::future<> intercept_frame( + ceph::msgr::v2::Tag tag, + bool is_write) { + auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ; + std::vector<Breakpoint> bps; + bps.emplace_back(Breakpoint{tag, type}); + return intercept_frames(bps, type); + } + + seastar::future<> intercept_frame( + custom_bp_t bp, + bool is_write) { + auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ; + std::vector<Breakpoint> bps; + bps.emplace_back(Breakpoint{bp}); + return intercept_frames(bps, type); + } +#endif + +private: +#ifdef UNIT_TESTS_BUILT + seastar::future<> intercept_frames( + std::vector<Breakpoint> bps, + bp_type_t type); +#endif + + bool has_socket() const; + + SocketFRef move_socket(); + + void clear(); + + void log_main_preamble(const ceph::bufferlist &bl); + + SocketConnection &conn; + + SocketFRef socket; + + // checking Socket::is_shutdown() synchronously is impossible when sid is + // different from the socket sid. + bool is_socket_shutdown = false; + + // the current working shard, can be messenger or socket shard. + // if is messenger shard, should call interfaces with may_cross_core = true. + seastar::shard_id sid; + + /* + * auth signature + * + * only in the messenger core + */ + + bool record_io = false; + + ceph::bufferlist rxbuf; + + ceph::bufferlist txbuf; + + /* + * frame data and handlers + */ + + ceph::crypto::onwire::rxtx_t session_stream_handlers = { nullptr, nullptr }; + + // TODO + ceph::compression::onwire::rxtx_t session_comp_handlers = { nullptr, nullptr }; + + bool is_rev1 = false; + + ceph::msgr::v2::FrameAssembler tx_frame_asm{ + &session_stream_handlers, is_rev1, common::local_conf()->ms_crc_data, + &session_comp_handlers}; + + ceph::msgr::v2::FrameAssembler rx_frame_asm{ + &session_stream_handlers, is_rev1, common::local_conf()->ms_crc_data, + &session_comp_handlers}; + + // in the messenger core during handshake, + // and in the socket core during open, + // must be cleaned before switching cores. + + ceph::bufferlist rx_preamble; + + read_payload_t rx_segments_data; +}; + +} // namespace crimson::net diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h new file mode 100644 index 000000000..2b1595141 --- /dev/null +++ b/src/crimson/net/Fwd.h @@ -0,0 +1,52 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <boost/container/small_vector.hpp> +#include <seastar/core/future.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/sharded.hh> + +#include "msg/Connection.h" +#include "msg/MessageRef.h" +#include "msg/msg_types.h" + +#include "crimson/common/errorator.h" +#include "crimson/common/local_shared_foreign_ptr.h" + +class AuthConnectionMeta; + +namespace crimson::net { + +using msgr_tag_t = uint8_t; +using stop_t = seastar::stop_iteration; + +class Connection; +using ConnectionLRef = seastar::shared_ptr<Connection>; +using ConnectionFRef = seastar::foreign_ptr<ConnectionLRef>; +using ConnectionRef = ::crimson::local_shared_foreign_ptr<ConnectionLRef>; + +class Dispatcher; +class ChainedDispatchers; +constexpr std::size_t NUM_DISPATCHERS = 4u; +using dispatchers_t = boost::container::small_vector<Dispatcher*, NUM_DISPATCHERS>; + +class Messenger; +using MessengerRef = seastar::shared_ptr<Messenger>; + +using MessageFRef = seastar::foreign_ptr<MessageURef>; + +} // namespace crimson::net diff --git a/src/crimson/net/Interceptor.h b/src/crimson/net/Interceptor.h new file mode 100644 index 000000000..35b74e243 --- /dev/null +++ b/src/crimson/net/Interceptor.h @@ -0,0 +1,186 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <variant> +#include <seastar/core/sharded.hh> +#include <seastar/core/sleep.hh> + +#include "Fwd.h" +#include "msg/async/frames_v2.h" + +namespace crimson::net { + +enum class custom_bp_t : uint8_t { + BANNER_WRITE = 0, + BANNER_READ, + BANNER_PAYLOAD_READ, + SOCKET_CONNECTING, + SOCKET_ACCEPTED +}; +inline const char* get_bp_name(custom_bp_t bp) { + uint8_t index = static_cast<uint8_t>(bp); + static const char *const bp_names[] = {"BANNER_WRITE", + "BANNER_READ", + "BANNER_PAYLOAD_READ", + "SOCKET_CONNECTING", + "SOCKET_ACCEPTED"}; + assert(index < std::size(bp_names)); + return bp_names[index]; +} + +enum class bp_type_t { + READ = 0, + WRITE +}; + +enum class bp_action_t { + CONTINUE = 0, + FAULT, + BLOCK, + STALL +}; + +class socket_blocker { + std::optional<seastar::abort_source> p_blocked; + std::optional<seastar::abort_source> p_unblocked; + const seastar::shard_id primary_sid; + + public: + socket_blocker() : primary_sid{seastar::this_shard_id()} {} + + seastar::future<> wait_blocked() { + ceph_assert(seastar::this_shard_id() == primary_sid); + ceph_assert(!p_blocked); + if (p_unblocked) { + return seastar::make_ready_future<>(); + } else { + p_blocked = seastar::abort_source(); + return seastar::sleep_abortable( + std::chrono::seconds(10), *p_blocked + ).then([] { + throw std::runtime_error( + "Timeout (10s) in socket_blocker::wait_blocked()"); + }).handle_exception_type([] (const seastar::sleep_aborted& e) { + // wait done! + }); + } + } + + seastar::future<> block() { + return seastar::smp::submit_to(primary_sid, [this] { + if (p_blocked) { + p_blocked->request_abort(); + p_blocked = std::nullopt; + } + ceph_assert(!p_unblocked); + p_unblocked = seastar::abort_source(); + return seastar::sleep_abortable( + std::chrono::seconds(10), *p_unblocked + ).then([] { + ceph_abort("Timeout (10s) in socket_blocker::block()"); + }).handle_exception_type([] (const seastar::sleep_aborted& e) { + // wait done! + }); + }); + } + + void unblock() { + ceph_assert(seastar::this_shard_id() == primary_sid); + ceph_assert(!p_blocked); + ceph_assert(p_unblocked); + p_unblocked->request_abort(); + p_unblocked = std::nullopt; + } +}; + +struct tag_bp_t { + ceph::msgr::v2::Tag tag; + bp_type_t type; + bool operator==(const tag_bp_t& x) const { + return tag == x.tag && type == x.type; + } + bool operator!=(const tag_bp_t& x) const { return !operator==(x); } + bool operator<(const tag_bp_t& x) const { + return std::tie(tag, type) < std::tie(x.tag, x.type); + } +}; + +struct Breakpoint { + using var_t = std::variant<custom_bp_t, tag_bp_t>; + var_t bp; + Breakpoint(custom_bp_t bp) : bp(bp) { } + Breakpoint(ceph::msgr::v2::Tag tag, bp_type_t type) + : bp(tag_bp_t{tag, type}) { } + bool operator==(const Breakpoint& x) const { return bp == x.bp; } + bool operator!=(const Breakpoint& x) const { return !operator==(x); } + bool operator==(const custom_bp_t& x) const { return bp == var_t(x); } + bool operator!=(const custom_bp_t& x) const { return !operator==(x); } + bool operator==(const tag_bp_t& x) const { return bp == var_t(x); } + bool operator!=(const tag_bp_t& x) const { return !operator==(x); } + bool operator<(const Breakpoint& x) const { return bp < x.bp; } +}; + +struct Interceptor { + socket_blocker blocker; + virtual ~Interceptor() {} + virtual void register_conn(ConnectionRef) = 0; + virtual void register_conn_ready(ConnectionRef) = 0; + virtual void register_conn_closed(ConnectionRef) = 0; + virtual void register_conn_replaced(ConnectionRef) = 0; + + virtual seastar::future<bp_action_t> + intercept(Connection&, std::vector<Breakpoint> bp) = 0; +}; + +} // namespace crimson::net + +template<> +struct fmt::formatter<crimson::net::bp_action_t> : fmt::formatter<std::string_view> { + template <typename FormatContext> + auto format(const crimson::net::bp_action_t& action, FormatContext& ctx) const { + static const char *const action_names[] = {"CONTINUE", + "FAULT", + "BLOCK", + "STALL"}; + assert(static_cast<size_t>(action) < std::size(action_names)); + return formatter<std::string_view>::format(action_names[static_cast<size_t>(action)], ctx); + } +}; + +template<> +struct fmt::formatter<crimson::net::Breakpoint> : fmt::formatter<std::string_view> { + template <typename FormatContext> + auto format(const crimson::net::Breakpoint& bp, FormatContext& ctx) const { + if (auto custom_bp = std::get_if<crimson::net::custom_bp_t>(&bp.bp)) { + return formatter<std::string_view>::format(crimson::net::get_bp_name(*custom_bp), ctx); + } + auto tag_bp = std::get<crimson::net::tag_bp_t>(bp.bp); + static const char *const tag_names[] = {"NONE", + "HELLO", + "AUTH_REQUEST", + "AUTH_BAD_METHOD", + "AUTH_REPLY_MORE", + "AUTH_REQUEST_MORE", + "AUTH_DONE", + "AUTH_SIGNATURE", + "CLIENT_IDENT", + "SERVER_IDENT", + "IDENT_MISSING_FEATURES", + "SESSION_RECONNECT", + "SESSION_RESET", + "SESSION_RETRY", + "SESSION_RETRY_GLOBAL", + "SESSION_RECONNECT_OK", + "WAIT", + "MESSAGE", + "KEEPALIVE2", + "KEEPALIVE2_ACK", + "ACK"}; + assert(static_cast<size_t>(tag_bp.tag) < std::size(tag_names)); + return fmt::format_to(ctx.out(), "{}_{}", + tag_names[static_cast<size_t>(tag_bp.tag)], + tag_bp.type == crimson::net::bp_type_t::WRITE ? "WRITE" : "READ"); + } +}; diff --git a/src/crimson/net/Messenger.cc b/src/crimson/net/Messenger.cc new file mode 100644 index 000000000..1af198589 --- /dev/null +++ b/src/crimson/net/Messenger.cc @@ -0,0 +1,19 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Messenger.h" +#include "SocketMessenger.h" + +namespace crimson::net { + +MessengerRef +Messenger::create(const entity_name_t& name, + const std::string& lname, + uint64_t nonce, + bool dispatch_only_on_this_shard) +{ + return seastar::make_shared<SocketMessenger>( + name, lname, nonce, dispatch_only_on_this_shard); +} + +} // namespace crimson::net diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h new file mode 100644 index 000000000..74df062d8 --- /dev/null +++ b/src/crimson/net/Messenger.h @@ -0,0 +1,130 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include "Fwd.h" +#include "crimson/common/throttle.h" +#include "msg/Message.h" +#include "msg/Policy.h" + +class AuthAuthorizer; + +namespace crimson::auth { +class AuthClient; +class AuthServer; +} + +namespace crimson::net { + +#ifdef UNIT_TESTS_BUILT +class Interceptor; +#endif + +using Throttle = crimson::common::Throttle; +using SocketPolicy = ceph::net::Policy<Throttle>; + +class Messenger { +public: + Messenger() {} + + virtual ~Messenger() {} + + virtual const entity_name_t& get_myname() const = 0; + + entity_type_t get_mytype() const { return get_myname().type(); } + + virtual const entity_addrvec_t &get_myaddrs() const = 0; + + entity_addr_t get_myaddr() const { return get_myaddrs().front(); } + + virtual void set_myaddrs(const entity_addrvec_t& addrs) = 0; + + virtual bool set_addr_unknowns(const entity_addrvec_t &addrs) = 0; + + virtual void set_auth_client(crimson::auth::AuthClient *) = 0; + + virtual void set_auth_server(crimson::auth::AuthServer *) = 0; + + using bind_ertr = crimson::errorator< + crimson::ct_error::address_in_use, // The address (range) is already bound + crimson::ct_error::address_not_available + >; + /// bind to the given address + virtual bind_ertr::future<> bind(const entity_addrvec_t& addr) = 0; + + /// start the messenger + virtual seastar::future<> start(const dispatchers_t&) = 0; + + /// either return an existing connection to the peer, + /// or a new pending connection + virtual ConnectionRef + connect(const entity_addr_t& peer_addr, + const entity_name_t& peer_name) = 0; + + ConnectionRef + connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type) { + return connect(peer_addr, entity_name_t(peer_type, -1)); + } + + virtual bool owns_connection(Connection &) const = 0; + + // wait for messenger shutdown + virtual seastar::future<> wait() = 0; + + // stop dispatching events and messages + virtual void stop() = 0; + + virtual bool is_started() const = 0; + + // free internal resources before destruction, must be called after stopped, + // and must be called if is bound. + virtual seastar::future<> shutdown() = 0; + + virtual void print(std::ostream& out) const = 0; + + virtual SocketPolicy get_policy(entity_type_t peer_type) const = 0; + + virtual SocketPolicy get_default_policy() const = 0; + + virtual void set_default_policy(const SocketPolicy& p) = 0; + + virtual void set_policy(entity_type_t peer_type, const SocketPolicy& p) = 0; + + virtual void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) = 0; + + static MessengerRef + create(const entity_name_t& name, + const std::string& lname, + uint64_t nonce, + bool dispatch_only_on_this_shard); + +#ifdef UNIT_TESTS_BUILT + virtual void set_interceptor(Interceptor *) = 0; +#endif +}; + +inline std::ostream& operator<<(std::ostream& out, const Messenger& msgr) { + out << "["; + msgr.print(out); + out << "]"; + return out; +} + +} // namespace crimson::net + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::net::Messenger> : fmt::ostream_formatter {}; +#endif diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc new file mode 100644 index 000000000..55b669384 --- /dev/null +++ b/src/crimson/net/ProtocolV2.cc @@ -0,0 +1,2348 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "ProtocolV2.h" + +#include <fmt/format.h> +#include <fmt/ranges.h> +#include "include/msgr.h" +#include "include/random.h" +#include "msg/msg_fmt.h" + +#include "crimson/auth/AuthClient.h" +#include "crimson/auth/AuthServer.h" +#include "crimson/common/formatter.h" +#include "crimson/common/log.h" + +#include "Errors.h" +#include "SocketMessenger.h" + +using namespace ceph::msgr::v2; +using crimson::common::local_conf; + +namespace { + +// TODO: CEPH_MSGR2_FEATURE_COMPRESSION +const uint64_t CRIMSON_MSGR2_SUPPORTED_FEATURES = + (CEPH_MSGR2_FEATURE_REVISION_1 | + // CEPH_MSGR2_FEATURE_COMPRESSION | + UINT64_C(0)); + +// Log levels in V2 Protocol: +// * error level, something error that cause connection to terminate: +// - fatal errors; +// - bugs; +// * warn level: something unusual that identifies connection fault or replacement: +// - unstable network; +// - incompatible peer; +// - auth failure; +// - connection race; +// - connection reset; +// * info level, something very important to show connection lifecycle, +// which doesn't happen very frequently; +// * debug level, important logs for debugging, including: +// - all the messages sent/received (-->/<==); +// - all the frames exchanged (WRITE/GOT); +// - important fields updated (UPDATE); +// - connection state transitions (TRIGGER); +// * trace level, trivial logs showing: +// - the exact bytes being sent/received (SEND/RECV(bytes)); +// - detailed information of sub-frames; +// - integrity checks; +// - etc. +seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); +} + +[[noreturn]] void abort_in_fault() { + throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); +} + +[[noreturn]] void abort_protocol() { + throw std::system_error(make_error_code(crimson::net::error::protocol_aborted)); +} + +#define ABORT_IN_CLOSE(is_dispatch_reset) { \ + do_close(is_dispatch_reset); \ + abort_protocol(); \ +} + +inline void expect_tag(const Tag& expected, + const Tag& actual, + crimson::net::SocketConnection& conn, + const char *where) { + if (actual != expected) { + logger().warn("{} {} received wrong tag: {}, expected {}", + conn, where, + static_cast<uint32_t>(actual), + static_cast<uint32_t>(expected)); + abort_in_fault(); + } +} + +inline void unexpected_tag(const Tag& unexpected, + crimson::net::SocketConnection& conn, + const char *where) { + logger().warn("{} {} received unexpected tag: {}", + conn, where, static_cast<uint32_t>(unexpected)); + abort_in_fault(); +} + +inline uint64_t generate_client_cookie() { + return ceph::util::generate_random_number<uint64_t>( + 1, std::numeric_limits<uint64_t>::max()); +} + +} // namespace anonymous + +namespace crimson::net { + +seastar::future<> ProtocolV2::Timer::backoff(double seconds) +{ + logger().warn("{} waiting {} seconds ...", conn, seconds); + cancel(); + last_dur_ = seconds; + as = seastar::abort_source(); + auto dur = std::chrono::duration_cast<seastar::lowres_clock::duration>( + std::chrono::duration<double>(seconds)); + return seastar::sleep_abortable(dur, *as + ).handle_exception_type([this] (const seastar::sleep_aborted& e) { + logger().debug("{} wait aborted", conn); + abort_protocol(); + }); +} + +ProtocolV2::ProtocolV2(SocketConnection& conn, + IOHandler &io_handler) + : conn{conn}, + messenger{conn.messenger}, + io_handler{io_handler}, + frame_assembler{FrameAssemblerV2::create(conn)}, + auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()}, + protocol_timer{conn} +{ + io_states = io_handler.get_states(); +} + +ProtocolV2::~ProtocolV2() {} + +void ProtocolV2::start_connect(const entity_addr_t& _peer_addr, + const entity_name_t& _peer_name) +{ + assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); + ceph_assert(state == state_t::NONE); + ceph_assert(!gate.is_closed()); + conn.peer_addr = _peer_addr; + conn.target_addr = _peer_addr; + conn.set_peer_name(_peer_name); + conn.policy = messenger.get_policy(_peer_name.type()); + client_cookie = generate_client_cookie(); + logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}" + " policy(lossy={}, server={}, standby={}, resetcheck={})", + conn, _peer_addr, _peer_name, client_cookie, + conn.policy.lossy, conn.policy.server, + conn.policy.standby, conn.policy.resetcheck); + messenger.register_conn( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + execute_connecting(); +} + +void ProtocolV2::start_accept(SocketFRef&& new_socket, + const entity_addr_t& _peer_addr) +{ + assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); + ceph_assert(state == state_t::NONE); + // until we know better + conn.target_addr = _peer_addr; + frame_assembler->set_socket(std::move(new_socket)); + has_socket = true; + is_socket_valid = true; + logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr); + messenger.accept_conn( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + + auto cc_seq = crosscore.prepare_submit(); + gate.dispatch_in_background("set_accepted_sid", conn, [this, cc_seq] { + return io_handler.set_accepted_sid( + cc_seq, + frame_assembler->get_socket_shard_id(), + seastar::make_foreign(conn.shared_from_this())); + }); + + execute_accepting(); +} + +void ProtocolV2::trigger_state_phase1(state_t new_state) +{ + ceph_assert_always(!gate.is_closed()); + if (new_state == state) { + logger().error("{} is not allowed to re-trigger state {}", + conn, get_state_name(state)); + ceph_abort(); + } + if (state == state_t::CLOSING) { + logger().error("{} CLOSING is not allowed to trigger state {}", + conn, get_state_name(new_state)); + ceph_abort(); + } + logger().debug("{} TRIGGER {}, was {}", + conn, get_state_name(new_state), get_state_name(state)); + + if (state == state_t::READY) { + // from READY + ceph_assert_always(!need_exit_io); + ceph_assert_always(!pr_exit_io.has_value()); + need_exit_io = true; + pr_exit_io = seastar::shared_promise<>(); + } + + if (new_state == state_t::STANDBY && !conn.policy.server) { + need_notify_out = true; + } else { + need_notify_out = false; + } + + state = new_state; +} + +void ProtocolV2::trigger_state_phase2( + state_t new_state, io_state_t new_io_state) +{ + ceph_assert_always(new_state == state); + ceph_assert_always(!gate.is_closed()); + ceph_assert_always(!pr_switch_io_shard.has_value()); + + FrameAssemblerV2Ref fa; + if (new_state == state_t::READY) { + assert(new_io_state == io_state_t::open); + assert(io_handler.get_shard_id() == + frame_assembler->get_socket_shard_id()); + frame_assembler->set_shard_id(io_handler.get_shard_id()); + fa = std::move(frame_assembler); + } else { + assert(new_io_state != io_state_t::open); + } + + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} send {} IOHandler::set_io_state(): new_state={}, new_io_state={}, " + "fa={}, set_notify_out={}", + conn, cc_seq, get_state_name(new_state), new_io_state, + fa ? fmt::format("(sid={})", fa->get_shard_id()) : "N/A", + need_notify_out); + gate.dispatch_in_background( + "set_io_state", conn, + [this, cc_seq, new_io_state, fa=std::move(fa)]() mutable { + return seastar::smp::submit_to( + io_handler.get_shard_id(), + [this, cc_seq, new_io_state, + fa=std::move(fa), set_notify_out=need_notify_out]() mutable { + return io_handler.set_io_state( + cc_seq, new_io_state, std::move(fa), set_notify_out); + }); + }); + + if (need_exit_io) { + // from READY + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} send {} IOHandler::wait_io_exit_dispatching() ...", + conn, cc_seq); + assert(pr_exit_io.has_value()); + assert(new_io_state != io_state_t::open); + need_exit_io = false; + gate.dispatch_in_background("exit_io", conn, [this, cc_seq] { + return seastar::smp::submit_to( + io_handler.get_shard_id(), [this, cc_seq] { + return io_handler.wait_io_exit_dispatching(cc_seq); + }).then([this, cc_seq](auto ret) { + logger().debug("{} finish {} IOHandler::wait_io_exit_dispatching(), {}", + conn, cc_seq, ret.io_states); + frame_assembler = std::move(ret.frame_assembler); + assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); + ceph_assert_always( + seastar::this_shard_id() == frame_assembler->get_shard_id()); + ceph_assert_always(!frame_assembler->is_socket_valid()); + assert(!need_exit_io); + io_states = ret.io_states; + pr_exit_io->set_value(); + pr_exit_io = std::nullopt; + }); + }); + } +} + +void ProtocolV2::fault( + state_t expected_state, + const char *where, + std::exception_ptr eptr) +{ + assert(expected_state == state_t::CONNECTING || + expected_state == state_t::ESTABLISHING || + expected_state == state_t::REPLACING || + expected_state == state_t::READY); + const char *e_what; + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + e_what = e.what(); + } + + if (state != expected_state) { + logger().info("{} protocol {} {} is aborted at inconsistent {} -- {}", + conn, + get_state_name(expected_state), + where, + get_state_name(state), + e_what); +#ifndef NDEBUG + if (expected_state == state_t::REPLACING) { + assert(state == state_t::CLOSING); + } else if (expected_state == state_t::READY) { + assert(state == state_t::CLOSING || + state == state_t::REPLACING || + state == state_t::CONNECTING || + state == state_t::STANDBY); + } else { + assert(state == state_t::CLOSING || + state == state_t::REPLACING); + } +#endif + return; + } + assert(state == expected_state); + + if (state != state_t::CONNECTING && conn.policy.lossy) { + // socket will be shutdown in do_close() + logger().info("{} protocol {} {} fault on lossy channel, going to CLOSING -- {}", + conn, get_state_name(state), where, e_what); + do_close(true); + return; + } + + if (likely(has_socket)) { + if (likely(is_socket_valid)) { + ceph_assert_always(state != state_t::READY); + frame_assembler->shutdown_socket<true>(&gate); + is_socket_valid = false; + } else { + ceph_assert_always(state != state_t::ESTABLISHING); + } + } else { // !has_socket + ceph_assert_always(state == state_t::CONNECTING); + assert(!is_socket_valid); + } + + if (conn.policy.server || + (conn.policy.standby && !io_states.is_out_queued_or_sent())) { + if (conn.policy.server) { + logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}", + conn, + get_state_name(state), + where, + io_states, + e_what); + } else { + logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}", + conn, + get_state_name(state), + where, + io_states, + e_what); + } + execute_standby(); + } else if (state == state_t::CONNECTING || + state == state_t::REPLACING) { + logger().info("{} protocol {} {} fault, going to WAIT {} -- {}", + conn, + get_state_name(state), + where, + io_states, + e_what); + execute_wait(false); + } else { + assert(state == state_t::READY || + state == state_t::ESTABLISHING); + logger().info("{} protocol {} {} fault, going to CONNECTING {} -- {}", + conn, + get_state_name(state), + where, + io_states, + e_what); + execute_connecting(); + } +} + +void ProtocolV2::reset_session(bool full) +{ + server_cookie = 0; + connect_seq = 0; + if (full) { + client_cookie = generate_client_cookie(); + peer_global_seq = 0; + } + + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} send {} IOHandler::reset_session({})", + conn, cc_seq, full); + io_states.reset_session(full); + gate.dispatch_in_background( + "reset_session", conn, [this, cc_seq, full] { + return seastar::smp::submit_to( + io_handler.get_shard_id(), [this, cc_seq, full] { + return io_handler.reset_session(cc_seq, full); + }); + }); + // user can make changes +} + +seastar::future<std::tuple<entity_type_t, entity_addr_t>> +ProtocolV2::banner_exchange(bool is_connect) +{ + // 1. prepare and send banner + bufferlist banner_payload; + encode((uint64_t)CRIMSON_MSGR2_SUPPORTED_FEATURES, banner_payload, 0); + encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0); + + bufferlist bl; + bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX)); + auto len_payload = static_cast<uint16_t>(banner_payload.length()); + encode(len_payload, bl, 0); + bl.claim_append(banner_payload); + logger().debug("{} SEND({}) banner: len_payload={}, supported={}, " + "required={}, banner=\"{}\"", + conn, bl.length(), len_payload, + CRIMSON_MSGR2_SUPPORTED_FEATURES, + CEPH_MSGR2_REQUIRED_FEATURES, + CEPH_BANNER_V2_PREFIX); +#ifdef UNIT_TESTS_BUILT + return frame_assembler->intercept_frame(custom_bp_t::BANNER_WRITE, true + ).then([this, bl=std::move(bl)]() mutable { + return frame_assembler->write_flush(std::move(bl)); + } +#else + return frame_assembler->write_flush(std::move(bl) +#endif + ).then([this] { + // 2. read peer banner + unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16); +#ifdef UNIT_TESTS_BUILT + return frame_assembler->intercept_frame(custom_bp_t::BANNER_READ, false + ).then([this, banner_len] { + return frame_assembler->read_exactly(banner_len); + }); +#else + return frame_assembler->read_exactly(banner_len); +#endif + }).then([this](auto bptr) { + // 3. process peer banner and read banner_payload + unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); + logger().debug("{} RECV({}) banner: \"{}\"", + conn, bptr.length(), + std::string(bptr.c_str(), banner_prefix_len)); + + if (memcmp(bptr.c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) { + if (memcmp(bptr.c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) { + logger().warn("{} peer is using V1 protocol", conn); + } else { + logger().warn("{} peer sent bad banner", conn); + } + abort_in_fault(); + } + + bptr.set_offset(bptr.offset() + banner_prefix_len); + bptr.set_length(bptr.length() - banner_prefix_len); + assert(bptr.length() == sizeof(ceph_le16)); + + uint16_t payload_len; + bufferlist buf; + buf.append(std::move(bptr)); + auto ti = buf.cbegin(); + try { + decode(payload_len, ti); + } catch (const buffer::error &e) { + logger().warn("{} decode banner payload len failed", conn); + abort_in_fault(); + } + logger().debug("{} GOT banner: payload_len={}", conn, payload_len); +#ifdef UNIT_TESTS_BUILT + return frame_assembler->intercept_frame( + custom_bp_t::BANNER_PAYLOAD_READ, false + ).then([this, payload_len] { + return frame_assembler->read(payload_len); + }); +#else + return frame_assembler->read(payload_len); +#endif + }).then([this, is_connect] (bufferlist bl) { + // 4. process peer banner_payload and send HelloFrame + auto p = bl.cbegin(); + uint64_t _peer_supported_features; + uint64_t _peer_required_features; + try { + decode(_peer_supported_features, p); + decode(_peer_required_features, p); + } catch (const buffer::error &e) { + logger().warn("{} decode banner payload failed", conn); + abort_in_fault(); + } + logger().debug("{} RECV({}) banner features: supported={} required={}", + conn, bl.length(), + _peer_supported_features, _peer_required_features); + + // Check feature bit compatibility + uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES; + uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES; + if ((required_features & _peer_supported_features) != required_features) { + logger().error("{} peer does not support all required features" + " required={} peer_supported={}", + conn, required_features, _peer_supported_features); + ABORT_IN_CLOSE(is_connect); + } + if ((supported_features & _peer_required_features) != _peer_required_features) { + logger().error("{} we do not support all peer required features" + " peer_required={} supported={}", + conn, _peer_required_features, supported_features); + ABORT_IN_CLOSE(is_connect); + } + peer_supported_features = _peer_supported_features; + bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1); + frame_assembler->set_is_rev1(is_rev1); + + auto hello = HelloFrame::Encode(messenger.get_mytype(), + conn.target_addr); + logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}", + conn, ceph_entity_type_name(messenger.get_mytype()), + conn.target_addr); + return frame_assembler->write_flush_frame(hello); + }).then([this] { + //5. read peer HelloFrame + return frame_assembler->read_main_preamble(); + }).then([this](auto ret) { + expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame"); + return frame_assembler->read_frame_payload(); + }).then([this](auto payload) { + // 6. process peer HelloFrame + auto hello = HelloFrame::Decode(payload->back()); + logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}", + conn, ceph_entity_type_name(hello.entity_type()), + hello.peer_addr()); + return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>( + std::make_tuple(hello.entity_type(), hello.peer_addr())); + }); +} + +// CONNECTING state + +seastar::future<> ProtocolV2::handle_auth_reply() +{ + return frame_assembler->read_main_preamble( + ).then([this](auto ret) { + switch (ret.tag) { + case Tag::AUTH_BAD_METHOD: + return frame_assembler->read_frame_payload( + ).then([this](auto payload) { + // handle_auth_bad_method() logic + auto bad_method = AuthBadMethodFrame::Decode(payload->back()); + logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, " + "allowed_methods={}, allowed_modes={}", + conn, bad_method.method(), cpp_strerror(bad_method.result()), + bad_method.allowed_methods(), bad_method.allowed_modes()); + ceph_assert(messenger.get_auth_client()); + int r = messenger.get_auth_client()->handle_auth_bad_method( + conn, *auth_meta, + bad_method.method(), bad_method.result(), + bad_method.allowed_methods(), bad_method.allowed_modes()); + if (r < 0) { + logger().warn("{} auth_client handle_auth_bad_method returned {}", + conn, r); + abort_in_fault(); + } + return client_auth(bad_method.allowed_methods()); + }); + case Tag::AUTH_REPLY_MORE: + return frame_assembler->read_frame_payload( + ).then([this](auto payload) { + // handle_auth_reply_more() logic + auto auth_more = AuthReplyMoreFrame::Decode(payload->back()); + logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}", + conn, auth_more.auth_payload().length()); + ceph_assert(messenger.get_auth_client()); + // let execute_connecting() take care of the thrown exception + auto reply = messenger.get_auth_client()->handle_auth_reply_more( + conn, *auth_meta, auth_more.auth_payload()); + auto more_reply = AuthRequestMoreFrame::Encode(reply); + logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}", + conn, reply.length()); + return frame_assembler->write_flush_frame(more_reply); + }).then([this] { + return handle_auth_reply(); + }); + case Tag::AUTH_DONE: + return frame_assembler->read_frame_payload( + ).then([this](auto payload) { + // handle_auth_done() logic + auto auth_done = AuthDoneFrame::Decode(payload->back()); + logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}", + conn, auth_done.global_id(), + ceph_con_mode_name(auth_done.con_mode()), + auth_done.auth_payload().length()); + ceph_assert(messenger.get_auth_client()); + int r = messenger.get_auth_client()->handle_auth_done( + conn, + *auth_meta, + auth_done.global_id(), + auth_done.con_mode(), + auth_done.auth_payload()); + if (r < 0) { + logger().warn("{} auth_client handle_auth_done returned {}", conn, r); + abort_in_fault(); + } + auth_meta->con_mode = auth_done.con_mode(); + frame_assembler->create_session_stream_handlers(*auth_meta, false); + return finish_auth(); + }); + default: { + unexpected_tag(ret.tag, conn, "handle_auth_reply"); + return seastar::now(); + } + } + }); +} + +seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods) +{ + // send_auth_request() logic + ceph_assert(messenger.get_auth_client()); + + try { + auto [auth_method, preferred_modes, bl] = + messenger.get_auth_client()->get_auth_request(conn, *auth_meta); + auth_meta->auth_method = auth_method; + auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl); + logger().debug("{} WRITE AuthRequestFrame: method={}," + " preferred_modes={}, payload_len={}", + conn, auth_method, preferred_modes, bl.length()); + return frame_assembler->write_flush_frame(frame + ).then([this] { + return handle_auth_reply(); + }); + } catch (const crimson::auth::error& e) { + logger().error("{} get_initial_auth_request returned {}", conn, e.what()); + ABORT_IN_CLOSE(true); + return seastar::now(); + } +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::process_wait() +{ + return frame_assembler->read_frame_payload( + ).then([this](auto payload) { + // handle_wait() logic + logger().debug("{} GOT WaitFrame", conn); + WaitFrame::Decode(payload->back()); + return next_step_t::wait; + }); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::client_connect() +{ + // send_client_ident() logic + uint64_t flags = 0; + if (conn.policy.lossy) { + flags |= CEPH_MSG_CONNECT_LOSSY; + } + + auto client_ident = ClientIdentFrame::Encode( + messenger.get_myaddrs(), + conn.target_addr, + messenger.get_myname().num(), + global_seq, + conn.policy.features_supported, + conn.policy.features_required | msgr2_required, flags, + client_cookie); + + logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={}," + " gs={}, features_supported={}, features_required={}," + " flags={}, cookie={}", + conn, messenger.get_myaddrs(), conn.target_addr, + messenger.get_myname().num(), global_seq, + conn.policy.features_supported, + conn.policy.features_required | msgr2_required, + flags, client_cookie); + return frame_assembler->write_flush_frame(client_ident + ).then([this] { + return frame_assembler->read_main_preamble(); + }).then([this](auto ret) { + switch (ret.tag) { + case Tag::IDENT_MISSING_FEATURES: + return frame_assembler->read_frame_payload( + ).then([this](auto payload) { + // handle_ident_missing_features() logic + auto ident_missing = IdentMissingFeaturesFrame::Decode(payload->back()); + logger().warn("{} GOT IdentMissingFeaturesFrame: features={}" + " (client does not support all server features)", + conn, ident_missing.features()); + abort_in_fault(); + return next_step_t::none; + }); + case Tag::WAIT: + return process_wait(); + case Tag::SERVER_IDENT: + return frame_assembler->read_frame_payload( + ).then([this](auto payload) { + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} at receiving SERVER_IDENT", + conn, get_state_name(state)); + abort_protocol(); + } + + // handle_server_ident() logic + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} send {} IOHandler::requeue_out_sent()", + conn, cc_seq); + io_states.requeue_out_sent(); + gate.dispatch_in_background( + "requeue_out_sent", conn, [this, cc_seq] { + return seastar::smp::submit_to( + io_handler.get_shard_id(), [this, cc_seq] { + return io_handler.requeue_out_sent(cc_seq); + }); + }); + + auto server_ident = ServerIdentFrame::Decode(payload->back()); + logger().debug("{} GOT ServerIdentFrame:" + " addrs={}, gid={}, gs={}," + " features_supported={}, features_required={}," + " flags={}, cookie={}", + conn, + server_ident.addrs(), server_ident.gid(), + server_ident.global_seq(), + server_ident.supported_features(), + server_ident.required_features(), + server_ident.flags(), server_ident.cookie()); + + // is this who we intended to talk to? + // be a bit forgiving here, since we may be connecting based on addresses parsed out + // of mon_host or something. + if (!server_ident.addrs().contains(conn.target_addr)) { + logger().warn("{} peer identifies as {}, does not include {}", + conn, server_ident.addrs(), conn.target_addr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + + server_cookie = server_ident.cookie(); + + // TODO: change peer_addr to entity_addrvec_t + if (server_ident.addrs().front() != conn.peer_addr) { + logger().warn("{} peer advertises as {}, does not match {}", + conn, server_ident.addrs(), conn.peer_addr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + if (conn.get_peer_id() != entity_name_t::NEW && + conn.get_peer_id() != server_ident.gid()) { + logger().error("{} connection peer id ({}) does not match " + "what it should be ({}) during connecting, close", + conn, server_ident.gid(), conn.get_peer_id()); + ABORT_IN_CLOSE(true); + } + conn.set_peer_id(server_ident.gid()); + conn.set_features(server_ident.supported_features() & + conn.policy.features_supported); + logger().debug("{} UPDATE: features={}", conn, conn.get_features()); + peer_global_seq = server_ident.global_seq(); + + bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY; + if (lossy != conn.policy.lossy) { + logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn, lossy); + conn.policy.lossy = lossy; + } + if (lossy && (connect_seq != 0 || server_cookie != 0)) { + logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy", + conn, connect_seq, server_cookie); + connect_seq = 0; + server_cookie = 0; + } + + return seastar::make_ready_future<next_step_t>(next_step_t::ready); + }); + default: { + unexpected_tag(ret.tag, conn, "post_client_connect"); + return seastar::make_ready_future<next_step_t>(next_step_t::none); + } + } + }); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::client_reconnect() +{ + // send_reconnect() logic + auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(), + client_cookie, + server_cookie, + global_seq, + connect_seq, + io_states.in_seq); + logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={}," + " server_cookie={}, gs={}, cs={}, in_seq={}", + conn, messenger.get_myaddrs(), + client_cookie, server_cookie, + global_seq, connect_seq, io_states.in_seq); + return frame_assembler->write_flush_frame(reconnect).then([this] { + return frame_assembler->read_main_preamble(); + }).then([this](auto ret) { + switch (ret.tag) { + case Tag::SESSION_RETRY_GLOBAL: + return frame_assembler->read_frame_payload( + ).then([this](auto payload) { + // handle_session_retry_global() logic + auto retry = RetryGlobalFrame::Decode(payload->back()); + logger().warn("{} GOT RetryGlobalFrame: gs={}", + conn, retry.global_seq()); + global_seq = messenger.get_global_seq(retry.global_seq()); + logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq); + return client_reconnect(); + }); + case Tag::SESSION_RETRY: + return frame_assembler->read_frame_payload( + ).then([this](auto payload) { + // handle_session_retry() logic + auto retry = RetryFrame::Decode(payload->back()); + logger().warn("{} GOT RetryFrame: cs={}", + conn, retry.connect_seq()); + connect_seq = retry.connect_seq() + 1; + logger().warn("{} UPDATE: cs={}", conn, connect_seq); + return client_reconnect(); + }); + case Tag::SESSION_RESET: + return frame_assembler->read_frame_payload( + ).then([this](auto payload) { + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} before reset_session()", + conn, get_state_name(state)); + abort_protocol(); + } + // handle_session_reset() logic + auto reset = ResetFrame::Decode(payload->back()); + logger().warn("{} GOT ResetFrame: full={}", conn, reset.full()); + + reset_session(reset.full()); + // user can make changes + + return client_connect(); + }); + case Tag::WAIT: + return process_wait(); + case Tag::SESSION_RECONNECT_OK: + return frame_assembler->read_frame_payload( + ).then([this](auto payload) { + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} at receiving RECONNECT_OK", + conn, get_state_name(state)); + abort_protocol(); + } + + // handle_reconnect_ok() logic + auto reconnect_ok = ReconnectOkFrame::Decode(payload->back()); + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} GOT ReconnectOkFrame: msg_seq={}, " + "send {} IOHandler::requeue_out_sent_up_to()", + conn, reconnect_ok.msg_seq(), cc_seq); + + io_states.requeue_out_sent_up_to(); + auto msg_seq = reconnect_ok.msg_seq(); + gate.dispatch_in_background( + "requeue_out_reconnecting", conn, [this, cc_seq, msg_seq] { + return seastar::smp::submit_to( + io_handler.get_shard_id(), [this, cc_seq, msg_seq] { + return io_handler.requeue_out_sent_up_to(cc_seq, msg_seq); + }); + }); + + return seastar::make_ready_future<next_step_t>(next_step_t::ready); + }); + default: { + unexpected_tag(ret.tag, conn, "post_client_reconnect"); + return seastar::make_ready_future<next_step_t>(next_step_t::none); + } + } + }); +} + +void ProtocolV2::execute_connecting() +{ + ceph_assert_always(!is_socket_valid); + trigger_state(state_t::CONNECTING, io_state_t::delay); + gated_execute("execute_connecting", conn, [this] { + global_seq = messenger.get_global_seq(); + assert(client_cookie != 0); + if (!conn.policy.lossy && server_cookie != 0) { + ++connect_seq; + logger().debug("{} UPDATE: gs={}, cs={} for reconnect", + conn, global_seq, connect_seq); + } else { // conn.policy.lossy || server_cookie == 0 + assert(connect_seq == 0); + assert(server_cookie == 0); + logger().debug("{} UPDATE: gs={} for connect", conn, global_seq); + } + return wait_exit_io().then([this] { +#ifdef UNIT_TESTS_BUILT + // process custom_bp_t::SOCKET_CONNECTING + // supports CONTINUE/FAULT/BLOCK + if (!conn.interceptor) { + return seastar::now(); + } + return conn.interceptor->intercept( + conn, {Breakpoint{custom_bp_t::SOCKET_CONNECTING}} + ).then([this](bp_action_t action) { + switch (action) { + case bp_action_t::CONTINUE: + return seastar::now(); + case bp_action_t::FAULT: + logger().info("[Test] got FAULT"); + abort_in_fault(); + case bp_action_t::BLOCK: + logger().info("[Test] got BLOCK"); + return conn.interceptor->blocker.block(); + default: + ceph_abort("unexpected action from trap"); + return seastar::now(); + } + });; + }).then([this] { +#endif + ceph_assert_always(frame_assembler); + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} before Socket::connect()", + conn, get_state_name(state)); + abort_protocol(); + } + return Socket::connect(conn.peer_addr); + }).then([this](SocketRef _new_socket) { + logger().debug("{} socket connected", conn); + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} during Socket::connect()", + conn, get_state_name(state)); + return _new_socket->close().then([sock=std::move(_new_socket)] { + abort_protocol(); + }); + } + SocketFRef new_socket = seastar::make_foreign(std::move(_new_socket)); + if (!has_socket) { + frame_assembler->set_socket(std::move(new_socket)); + has_socket = true; + } else { + gate.dispatch_in_background( + "replace_socket_connecting", + conn, + [this, new_socket=std::move(new_socket)]() mutable { + return frame_assembler->replace_shutdown_socket(std::move(new_socket)); + } + ); + } + is_socket_valid = true; + return seastar::now(); + }).then([this] { + auth_meta = seastar::make_lw_shared<AuthConnectionMeta>(); + frame_assembler->reset_handlers(); + frame_assembler->start_recording(); + return banner_exchange(true); + }).then([this] (auto&& ret) { + auto [_peer_type, _my_addr_from_peer] = std::move(ret); + if (conn.get_peer_type() != _peer_type) { + logger().warn("{} connection peer type does not match what peer advertises {} != {}", + conn, ceph_entity_type_name(conn.get_peer_type()), + ceph_entity_type_name(_peer_type)); + ABORT_IN_CLOSE(true); + } + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} during banner_exchange(), abort", + conn, get_state_name(state)); + abort_protocol(); + } + frame_assembler->learn_socket_ephemeral_port_as_connector( + _my_addr_from_peer.get_port()); + if (unlikely(_my_addr_from_peer.is_legacy())) { + logger().warn("{} peer sent a legacy address for me: {}", + conn, _my_addr_from_peer); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2); + messenger.learned_addr(_my_addr_from_peer, conn); + return client_auth(); + }).then([this] { + if (server_cookie == 0) { + ceph_assert(connect_seq == 0); + return client_connect(); + } else { + ceph_assert(connect_seq > 0); + return client_reconnect(); + } + }).then([this] (next_step_t next) { + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} at the end of execute_connecting()", + conn, get_state_name(state)); + abort_protocol(); + } + switch (next) { + case next_step_t::ready: { + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} before dispatch_connect(), abort", + conn, get_state_name(state)); + abort_protocol(); + } + + auto cc_seq = crosscore.prepare_submit(); + // there are 2 hops with dispatch_connect() + crosscore.prepare_submit(); + logger().info("{} connected: gs={}, pgs={}, cs={}, " + "client_cookie={}, server_cookie={}, {}, new_sid={}, " + "send {} IOHandler::dispatch_connect()", + conn, global_seq, peer_global_seq, connect_seq, + client_cookie, server_cookie, io_states, + frame_assembler->get_socket_shard_id(), cc_seq); + + // set io_handler to a new shard + auto new_io_shard = frame_assembler->get_socket_shard_id(); + ConnectionFRef conn_fref = seastar::make_foreign( + conn.shared_from_this()); + ceph_assert_always(!pr_switch_io_shard.has_value()); + pr_switch_io_shard = seastar::shared_promise<>(); + return seastar::smp::submit_to( + io_handler.get_shard_id(), + [this, cc_seq, new_io_shard, + conn_fref=std::move(conn_fref)]() mutable { + return io_handler.dispatch_connect( + cc_seq, new_io_shard, std::move(conn_fref)); + }).then([this, new_io_shard] { + ceph_assert_always(io_handler.get_shard_id() == new_io_shard); + pr_switch_io_shard->set_value(); + pr_switch_io_shard = std::nullopt; + // user can make changes + + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} after dispatch_connect(), abort", + conn, get_state_name(state)); + abort_protocol(); + } + execute_ready(); + }); + } + case next_step_t::wait: { + logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn); + ceph_assert_always(is_socket_valid); + frame_assembler->shutdown_socket<true>(&gate); + is_socket_valid = false; + execute_wait(true); + return seastar::now(); + } + default: { + ceph_abort("impossible next step"); + } + } + }).handle_exception([this](std::exception_ptr eptr) { + fault(state_t::CONNECTING, "execute_connecting", eptr); + }); + }); +} + +// ACCEPTING state + +seastar::future<> ProtocolV2::_auth_bad_method(int r) +{ + // _auth_bad_method() logic + ceph_assert(r < 0); + auto [allowed_methods, allowed_modes] = + messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type()); + auto bad_method = AuthBadMethodFrame::Encode( + auth_meta->auth_method, r, allowed_methods, allowed_modes); + logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, " + "allowed_methods={}, allowed_modes={})", + conn, auth_meta->auth_method, cpp_strerror(r), + allowed_methods, allowed_modes); + return frame_assembler->write_flush_frame(bad_method + ).then([this] { + return server_auth(); + }); +} + +seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more) +{ + // _handle_auth_request() logic + ceph_assert(messenger.get_auth_server()); + bufferlist reply; + int r = messenger.get_auth_server()->handle_auth_request( + conn, + *auth_meta, + more, + auth_meta->auth_method, + auth_payload, + &conn.peer_global_id, + &reply); + switch (r) { + // successful + case 1: { + auto auth_done = AuthDoneFrame::Encode( + conn.peer_global_id, auth_meta->con_mode, reply); + logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}", + conn, conn.peer_global_id, + ceph_con_mode_name(auth_meta->con_mode), reply.length()); + return frame_assembler->write_flush_frame(auth_done + ).then([this] { + ceph_assert(auth_meta); + frame_assembler->create_session_stream_handlers(*auth_meta, true); + return finish_auth(); + }); + } + // auth more + case 0: { + auto more = AuthReplyMoreFrame::Encode(reply); + logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}", + conn, reply.length()); + return frame_assembler->write_flush_frame(more + ).then([this] { + return frame_assembler->read_main_preamble(); + }).then([this](auto ret) { + expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, "read_auth_request_more"); + return frame_assembler->read_frame_payload(); + }).then([this](auto payload) { + auto auth_more = AuthRequestMoreFrame::Decode(payload->back()); + logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}", + conn, auth_more.auth_payload().length()); + return _handle_auth_request(auth_more.auth_payload(), true); + }); + } + case -EBUSY: { + logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn); + abort_in_fault(); + return seastar::now(); + } + default: { + logger().warn("{} auth_server handle_auth_request returned {}", conn, r); + return _auth_bad_method(r); + } + } +} + +seastar::future<> ProtocolV2::server_auth() +{ + return frame_assembler->read_main_preamble( + ).then([this](auto ret) { + expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, "read_auth_request"); + return frame_assembler->read_frame_payload(); + }).then([this](auto payload) { + // handle_auth_request() logic + auto request = AuthRequestFrame::Decode(payload->back()); + logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={}," + " payload_len={}", + conn, request.method(), request.preferred_modes(), + request.auth_payload().length()); + auth_meta->auth_method = request.method(); + auth_meta->con_mode = messenger.get_auth_server()->pick_con_mode( + conn.get_peer_type(), auth_meta->auth_method, + request.preferred_modes()); + if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) { + logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn); + return _auth_bad_method(-EOPNOTSUPP); + } + return _handle_auth_request(request.auth_payload(), false); + }); +} + +bool ProtocolV2::validate_peer_name(const entity_name_t& peer_name) const +{ + auto my_peer_name = conn.get_peer_name(); + if (my_peer_name.type() != peer_name.type()) { + return false; + } + if (my_peer_name.num() != entity_name_t::NEW && + peer_name.num() != entity_name_t::NEW && + my_peer_name.num() != peer_name.num()) { + return false; + } + return true; +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::send_wait() +{ + auto wait = WaitFrame::Encode(); + logger().debug("{} WRITE WaitFrame", conn); + return frame_assembler->write_flush_frame(wait + ).then([] { + return next_step_t::wait; + }); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::reuse_connection( + ProtocolV2* existing_proto, bool do_reset, + bool reconnect, uint64_t conn_seq, uint64_t msg_seq) +{ + if (unlikely(state != state_t::ACCEPTING)) { + logger().debug("{} triggered {} before trigger_replacing()", + conn, get_state_name(state)); + abort_protocol(); + } + + existing_proto->trigger_replacing(reconnect, + do_reset, + frame_assembler->to_replace(), + std::move(auth_meta), + peer_global_seq, + client_cookie, + conn.get_peer_name(), + conn.get_features(), + peer_supported_features, + conn_seq, + msg_seq); + ceph_assert_always(has_socket && is_socket_valid); + is_socket_valid = false; + has_socket = false; +#ifdef UNIT_TESTS_BUILT + if (conn.interceptor) { + conn.interceptor->register_conn_replaced( + conn.get_local_shared_foreign_from_this()); + } +#endif + // close this connection because all the necessary information is delivered + // to the exisiting connection, and jump to error handling code to abort the + // current state. + ABORT_IN_CLOSE(false); + return seastar::make_ready_future<next_step_t>(next_step_t::none); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn) +{ + // handle_existing_connection() logic + ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>( + existing_conn->protocol.get()); + ceph_assert(existing_proto); + logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting," + " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})", + conn, global_seq, peer_global_seq, connect_seq, + client_cookie, server_cookie, + fmt::ptr(existing_conn.get()), get_state_name(existing_proto->state), + existing_proto->global_seq, + existing_proto->peer_global_seq, + existing_proto->connect_seq, + existing_proto->client_cookie, + existing_proto->server_cookie); + + if (!validate_peer_name(existing_conn->get_peer_name())) { + logger().error("{} server_connect: my peer_name doesn't match" + " the existing connection {}, abort", conn, fmt::ptr(existing_conn.get())); + abort_in_fault(); + } + + if (existing_proto->state == state_t::REPLACING) { + logger().warn("{} server_connect: racing replace happened while" + " replacing existing connection {}, send wait.", + conn, *existing_conn); + return send_wait(); + } + + if (existing_proto->peer_global_seq > peer_global_seq) { + logger().warn("{} server_connect:" + " this is a stale connection, because peer_global_seq({})" + " < existing->peer_global_seq({}), close this connection" + " in favor of existing connection {}", + conn, peer_global_seq, + existing_proto->peer_global_seq, *existing_conn); + abort_in_fault(); + } + + if (existing_conn->policy.lossy) { + // existing connection can be thrown out in favor of this one + logger().warn("{} server_connect:" + " existing connection {} is a lossy channel. Close existing in favor of" + " this connection", conn, *existing_conn); + if (unlikely(state != state_t::ACCEPTING)) { + logger().debug("{} triggered {} before execute_establishing()", + conn, get_state_name(state)); + abort_protocol(); + } + execute_establishing(existing_conn); + return seastar::make_ready_future<next_step_t>(next_step_t::ready); + } + + if (existing_proto->server_cookie != 0) { + if (existing_proto->client_cookie != client_cookie) { + // Found previous session + // peer has reset and we're going to reuse the existing connection + // by replacing the socket + logger().warn("{} server_connect:" + " found new session (cs={})" + " when existing {} {} is with stale session (cs={}, ss={})," + " peer must have reset", + conn, + client_cookie, + get_state_name(existing_proto->state), + *existing_conn, + existing_proto->client_cookie, + existing_proto->server_cookie); + return reuse_connection(existing_proto, conn.policy.resetcheck); + } else { + // session establishment interrupted between client_ident and server_ident, + // continuing... + logger().warn("{} server_connect: found client session with existing {} {}" + " matched (cs={}, ss={}), continuing session establishment", + conn, + get_state_name(existing_proto->state), + *existing_conn, + client_cookie, + existing_proto->server_cookie); + return reuse_connection(existing_proto); + } + } else { + // Looks like a connection race: server and client are both connecting to + // each other at the same time. + if (existing_proto->client_cookie != client_cookie) { + if (existing_conn->peer_wins()) { + // acceptor (this connection, the peer) wins + logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)" + " and win, reusing existing {} {}", + conn, + client_cookie, + existing_proto->client_cookie, + get_state_name(existing_proto->state), + *existing_conn); + return reuse_connection(existing_proto); + } else { + // acceptor (this connection, the peer) loses + logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)" + " and lose to existing {}, ask client to wait", + conn, client_cookie, existing_proto->client_cookie, *existing_conn); + return existing_conn->send_keepalive().then([this] { + return send_wait(); + }); + } + } else { + logger().warn("{} server_connect: found client session with existing {} {}" + " matched (cs={}, ss={}), continuing session establishment", + conn, + get_state_name(existing_proto->state), + *existing_conn, + client_cookie, + existing_proto->server_cookie); + return reuse_connection(existing_proto); + } + } +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::server_connect() +{ + return frame_assembler->read_frame_payload( + ).then([this](auto payload) { + // handle_client_ident() logic + auto client_ident = ClientIdentFrame::Decode(payload->back()); + logger().debug("{} GOT ClientIdentFrame: addrs={}, target={}," + " gid={}, gs={}, features_supported={}," + " features_required={}, flags={}, cookie={}", + conn, client_ident.addrs(), client_ident.target_addr(), + client_ident.gid(), client_ident.global_seq(), + client_ident.supported_features(), + client_ident.required_features(), + client_ident.flags(), client_ident.cookie()); + + if (client_ident.addrs().empty() || + client_ident.addrs().front() == entity_addr_t()) { + logger().warn("{} oops, client_ident.addrs() is empty", conn); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + if (!messenger.get_myaddrs().contains(client_ident.target_addr())) { + logger().warn("{} peer is trying to reach {} which is not us ({})", + conn, client_ident.target_addr(), messenger.get_myaddrs()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + conn.peer_addr = client_ident.addrs().front(); + logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr); + conn.target_addr = conn.peer_addr; + if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) { + logger().warn("{} we don't know how to reconnect to peer {}", + conn, conn.target_addr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + + if (conn.get_peer_id() != entity_name_t::NEW && + conn.get_peer_id() != client_ident.gid()) { + logger().error("{} client_ident peer_id ({}) does not match" + " what it should be ({}) during accepting, abort", + conn, client_ident.gid(), conn.get_peer_id()); + abort_in_fault(); + } + conn.set_peer_id(client_ident.gid()); + client_cookie = client_ident.cookie(); + + uint64_t feat_missing = + (conn.policy.features_required | msgr2_required) & + ~(uint64_t)client_ident.supported_features(); + if (feat_missing) { + auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing); + logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)", + conn, feat_missing); + return frame_assembler->write_flush_frame(ident_missing_features + ).then([] { + return next_step_t::wait; + }); + } + conn.set_features(client_ident.supported_features() & + conn.policy.features_supported); + logger().debug("{} UPDATE: features={}", conn, conn.get_features()); + + peer_global_seq = client_ident.global_seq(); + + bool lossy = client_ident.flags() & CEPH_MSG_CONNECT_LOSSY; + if (lossy != conn.policy.lossy) { + logger().warn("{} my lossy policy {} doesn't match client {}, ignore", + conn, conn.policy.lossy, lossy); + } + + // Looks good so far, let's check if there is already an existing connection + // to this peer. + + SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr); + + if (existing_conn) { + return handle_existing_connection(existing_conn); + } else { + if (unlikely(state != state_t::ACCEPTING)) { + logger().debug("{} triggered {} before execute_establishing()", + conn, get_state_name(state)); + abort_protocol(); + } + execute_establishing(nullptr); + return seastar::make_ready_future<next_step_t>(next_step_t::ready); + } + }); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::read_reconnect() +{ + return frame_assembler->read_main_preamble( + ).then([this](auto ret) { + expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_session_reconnect"); + return server_reconnect(); + }); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::send_retry(uint64_t connect_seq) +{ + auto retry = RetryFrame::Encode(connect_seq); + logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq); + return frame_assembler->write_flush_frame(retry + ).then([this] { + return read_reconnect(); + }); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::send_retry_global(uint64_t global_seq) +{ + auto retry = RetryGlobalFrame::Encode(global_seq); + logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq); + return frame_assembler->write_flush_frame(retry + ).then([this] { + return read_reconnect(); + }); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::send_reset(bool full) +{ + auto reset = ResetFrame::Encode(full); + logger().warn("{} WRITE ResetFrame: full={}", conn, full); + return frame_assembler->write_flush_frame(reset + ).then([this] { + return frame_assembler->read_main_preamble(); + }).then([this](auto ret) { + expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset"); + return server_connect(); + }); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::server_reconnect() +{ + return frame_assembler->read_frame_payload( + ).then([this](auto payload) { + // handle_reconnect() logic + auto reconnect = ReconnectFrame::Decode(payload->back()); + + logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={}," + " server_cookie={}, gs={}, cs={}, msg_seq={}", + conn, reconnect.addrs(), + reconnect.client_cookie(), reconnect.server_cookie(), + reconnect.global_seq(), reconnect.connect_seq(), + reconnect.msg_seq()); + + // can peer_addrs be changed on-the-fly? + // TODO: change peer_addr to entity_addrvec_t + entity_addr_t paddr = reconnect.addrs().front(); + if (paddr.is_msgr2() || paddr.is_any()) { + // good + } else { + logger().warn("{} peer's address {} is not v2", conn, paddr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + if (conn.peer_addr == entity_addr_t()) { + conn.peer_addr = paddr; + } else if (conn.peer_addr != paddr) { + logger().error("{} peer identifies as {}, while conn.peer_addr={}," + " reconnect failed", + conn, paddr, conn.peer_addr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + peer_global_seq = reconnect.global_seq(); + + SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr); + + if (!existing_conn) { + // there is no existing connection therefore cannot reconnect to previous + // session + logger().warn("{} server_reconnect: no existing connection from address {}," + " reseting client", conn, conn.peer_addr); + return send_reset(true); + } + + ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>( + existing_conn->protocol.get()); + ceph_assert(existing_proto); + logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting," + " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})", + conn, global_seq, peer_global_seq, reconnect.connect_seq(), + reconnect.client_cookie(), reconnect.server_cookie(), + fmt::ptr(existing_conn.get()), + get_state_name(existing_proto->state), + existing_proto->global_seq, + existing_proto->peer_global_seq, + existing_proto->connect_seq, + existing_proto->client_cookie, + existing_proto->server_cookie); + + if (!validate_peer_name(existing_conn->get_peer_name())) { + logger().error("{} server_reconnect: my peer_name doesn't match" + " the existing connection {}, abort", conn, fmt::ptr(existing_conn.get())); + abort_in_fault(); + } + + if (existing_proto->state == state_t::REPLACING) { + logger().warn("{} server_reconnect: racing replace happened while " + " replacing existing connection {}, retry global.", + conn, *existing_conn); + return send_retry_global(existing_proto->peer_global_seq); + } + + if (existing_proto->client_cookie != reconnect.client_cookie()) { + logger().warn("{} server_reconnect:" + " client_cookie mismatch with existing connection {}," + " cc={} rcc={}. I must have reset, reseting client.", + conn, *existing_conn, + existing_proto->client_cookie, reconnect.client_cookie()); + return send_reset(conn.policy.resetcheck); + } else if (existing_proto->server_cookie == 0) { + // this happens when: + // - a connects to b + // - a sends client_ident + // - b gets client_ident, sends server_ident and sets cookie X + // - connection fault + // - b reconnects to a with cookie X, connect_seq=1 + // - a has cookie==0 + logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the" + " server_ident with existing connection {}." + " Asking peer to resume session establishment", + conn, existing_proto->client_cookie, *existing_conn); + return send_reset(false); + } + + if (existing_proto->peer_global_seq > reconnect.global_seq()) { + logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({})," + " with existing connection {}," + " ask client to retry global", + conn, existing_proto->peer_global_seq, + reconnect.global_seq(), *existing_conn); + return send_retry_global(existing_proto->peer_global_seq); + } + + if (existing_proto->connect_seq > reconnect.connect_seq()) { + logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({})," + " with existing connection {}, ask client to retry", + conn, reconnect.connect_seq(), + existing_proto->connect_seq, *existing_conn); + return send_retry(existing_proto->connect_seq); + } else if (existing_proto->connect_seq == reconnect.connect_seq()) { + // reconnect race: both peers are sending reconnect messages + if (existing_conn->peer_wins()) { + // acceptor (this connection, the peer) wins + logger().warn("{} server_reconnect: reconnect race detected (cs={})" + " and win, reusing existing {} {}", + conn, + reconnect.connect_seq(), + get_state_name(existing_proto->state), + *existing_conn); + return reuse_connection( + existing_proto, false, + true, reconnect.connect_seq(), reconnect.msg_seq()); + } else { + // acceptor (this connection, the peer) loses + logger().warn("{} server_reconnect: reconnect race detected (cs={})" + " and lose to existing {}, ask client to wait", + conn, reconnect.connect_seq(), *existing_conn); + return send_wait(); + } + } else { // existing_proto->connect_seq < reconnect.connect_seq() + logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({})," + " reusing existing {} {}", + conn, + existing_proto->connect_seq, + reconnect.connect_seq(), + get_state_name(existing_proto->state), + *existing_conn); + return reuse_connection( + existing_proto, false, + true, reconnect.connect_seq(), reconnect.msg_seq()); + } + }); +} + +void ProtocolV2::execute_accepting() +{ + assert(is_socket_valid); + trigger_state(state_t::ACCEPTING, io_state_t::none); + gate.dispatch_in_background("execute_accepting", conn, [this] { + return seastar::futurize_invoke([this] { +#ifdef UNIT_TESTS_BUILT + if (conn.interceptor) { + // only notify socket accepted + gate.dispatch_in_background( + "test_intercept_socket_accepted", conn, [this] { + return conn.interceptor->intercept( + conn, {Breakpoint{custom_bp_t::SOCKET_ACCEPTED}} + ).then([](bp_action_t action) { + ceph_assert(action == bp_action_t::CONTINUE); + }); + }); + } +#endif + auth_meta = seastar::make_lw_shared<AuthConnectionMeta>(); + frame_assembler->reset_handlers(); + frame_assembler->start_recording(); + return banner_exchange(false); + }).then([this] (auto&& ret) { + auto [_peer_type, _my_addr_from_peer] = std::move(ret); + ceph_assert(conn.get_peer_type() == 0); + conn.set_peer_type(_peer_type); + + conn.policy = messenger.get_policy(_peer_type); + logger().info("{} UPDATE: peer_type={}," + " policy(lossy={} server={} standby={} resetcheck={})", + conn, ceph_entity_type_name(_peer_type), + conn.policy.lossy, conn.policy.server, + conn.policy.standby, conn.policy.resetcheck); + if (!messenger.get_myaddr().is_blank_ip() && + (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() || + messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) { + logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}", + conn, _my_addr_from_peer, messenger.get_myaddr()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + messenger.learned_addr(_my_addr_from_peer, conn); + return server_auth(); + }).then([this] { + return frame_assembler->read_main_preamble(); + }).then([this](auto ret) { + switch (ret.tag) { + case Tag::CLIENT_IDENT: + return server_connect(); + case Tag::SESSION_RECONNECT: + return server_reconnect(); + default: { + unexpected_tag(ret.tag, conn, "post_server_auth"); + return seastar::make_ready_future<next_step_t>(next_step_t::none); + } + } + }).then([this] (next_step_t next) { + switch (next) { + case next_step_t::ready: + assert(state != state_t::ACCEPTING); + break; + case next_step_t::wait: + if (unlikely(state != state_t::ACCEPTING)) { + logger().debug("{} triggered {} at the end of execute_accepting()", + conn, get_state_name(state)); + abort_protocol(); + } + logger().info("{} execute_accepting(): going to SERVER_WAIT", conn); + execute_server_wait(); + break; + default: + ceph_abort("impossible next step"); + } + }).handle_exception([this](std::exception_ptr eptr) { + const char *e_what; + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + e_what = e.what(); + } + logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}", + conn, get_state_name(state), e_what); + do_close(false); + }); + }); +} + +// CONNECTING or ACCEPTING state + +seastar::future<> ProtocolV2::finish_auth() +{ + ceph_assert(auth_meta); + + auto records = frame_assembler->stop_recording(); + const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() : + auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf); + auto sig_frame = AuthSignatureFrame::Encode(sig); + logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig); + return frame_assembler->write_flush_frame(sig_frame + ).then([this] { + return frame_assembler->read_main_preamble(); + }).then([this](auto ret) { + expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth"); + return frame_assembler->read_frame_payload(); + }).then([this, txbuf=std::move(records.txbuf)](auto payload) { + // handle_auth_signature() logic + auto sig_frame = AuthSignatureFrame::Decode(payload->back()); + logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature()); + + const auto actual_tx_sig = auth_meta->session_key.empty() ? + sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf); + if (sig_frame.signature() != actual_tx_sig) { + logger().warn("{} pre-auth signature mismatch actual_tx_sig={}" + " sig_frame.signature()={}", + conn, actual_tx_sig, sig_frame.signature()); + abort_in_fault(); + } + }); +} + +// ESTABLISHING + +void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { + auto accept_me = [this] { + messenger.register_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + messenger.unaccept_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + }; + + ceph_assert_always(is_socket_valid); + trigger_state(state_t::ESTABLISHING, io_state_t::delay); + bool is_replace; + if (existing_conn) { + logger().info("{} start establishing: gs={}, pgs={}, cs={}, " + "client_cookie={}, server_cookie={}, {}, new_sid={}, " + "close existing {}", + conn, global_seq, peer_global_seq, connect_seq, + client_cookie, server_cookie, + io_states, frame_assembler->get_socket_shard_id(), + *existing_conn); + is_replace = true; + ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>( + existing_conn->protocol.get()); + existing_proto->do_close( + true, // is_dispatch_reset + std::move(accept_me)); + if (unlikely(state != state_t::ESTABLISHING)) { + logger().warn("{} triggered {} during execute_establishing(), " + "the accept event will not be delivered!", + conn, get_state_name(state)); + abort_protocol(); + } + } else { + logger().info("{} start establishing: gs={}, pgs={}, cs={}, " + "client_cookie={}, server_cookie={}, {}, new_sid={}, " + "no existing", + conn, global_seq, peer_global_seq, connect_seq, + client_cookie, server_cookie, io_states, + frame_assembler->get_socket_shard_id()); + is_replace = false; + accept_me(); + } + + gated_execute("execute_establishing", conn, [this, is_replace] { + ceph_assert_always(state == state_t::ESTABLISHING); + + // set io_handler to a new shard + auto cc_seq = crosscore.prepare_submit(); + // there are 2 hops with dispatch_accept() + crosscore.prepare_submit(); + auto new_io_shard = frame_assembler->get_socket_shard_id(); + logger().debug("{} send {} IOHandler::dispatch_accept({})", + conn, cc_seq, new_io_shard); + ConnectionFRef conn_fref = seastar::make_foreign( + conn.shared_from_this()); + ceph_assert_always(!pr_switch_io_shard.has_value()); + pr_switch_io_shard = seastar::shared_promise<>(); + return seastar::smp::submit_to( + io_handler.get_shard_id(), + [this, cc_seq, new_io_shard, is_replace, + conn_fref=std::move(conn_fref)]() mutable { + return io_handler.dispatch_accept( + cc_seq, new_io_shard, std::move(conn_fref), is_replace); + }).then([this, new_io_shard] { + ceph_assert_always(io_handler.get_shard_id() == new_io_shard); + pr_switch_io_shard->set_value(); + pr_switch_io_shard = std::nullopt; + // user can make changes + + if (unlikely(state != state_t::ESTABLISHING)) { + logger().debug("{} triggered {} after dispatch_accept() during execute_establishing()", + conn, get_state_name(state)); + abort_protocol(); + } + + return send_server_ident(); + }).then([this] { + if (unlikely(state != state_t::ESTABLISHING)) { + logger().debug("{} triggered {} at the end of execute_establishing()", + conn, get_state_name(state)); + abort_protocol(); + } + logger().info("{} established, going to ready", conn); + execute_ready(); + }).handle_exception([this](std::exception_ptr eptr) { + fault(state_t::ESTABLISHING, "execute_establishing", eptr); + }); + }); +} + +// ESTABLISHING or REPLACING state + +seastar::future<> +ProtocolV2::send_server_ident() +{ + ceph_assert_always(state == state_t::ESTABLISHING || + state == state_t::REPLACING); + // send_server_ident() logic + + // refered to async-conn v2: not assign gs to global_seq + global_seq = messenger.get_global_seq(); + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} UPDATE: gs={} for server ident, " + "send {} IOHandler::reset_peer_state()", + conn, global_seq, cc_seq); + + // this is required for the case when this connection is being replaced + io_states.reset_peer_state(); + gate.dispatch_in_background( + "reset_peer_state", conn, [this, cc_seq] { + return seastar::smp::submit_to( + io_handler.get_shard_id(), [this, cc_seq] { + return io_handler.reset_peer_state(cc_seq); + }); + }); + + if (!conn.policy.lossy) { + server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll); + } + + uint64_t flags = 0; + if (conn.policy.lossy) { + flags = flags | CEPH_MSG_CONNECT_LOSSY; + } + + auto server_ident = ServerIdentFrame::Encode( + messenger.get_myaddrs(), + messenger.get_myname().num(), + global_seq, + conn.policy.features_supported, + conn.policy.features_required | msgr2_required, + flags, + server_cookie); + + logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={}," + " gs={}, features_supported={}, features_required={}," + " flags={}, cookie={}", + conn, messenger.get_myaddrs(), messenger.get_myname().num(), + global_seq, conn.policy.features_supported, + conn.policy.features_required | msgr2_required, + flags, server_cookie); + + return frame_assembler->write_flush_frame(server_ident); +} + +// REPLACING state + +void ProtocolV2::trigger_replacing(bool reconnect, + bool do_reset, + FrameAssemblerV2::mover_t &&mover, + AuthConnectionMetaRef&& new_auth_meta, + uint64_t new_peer_global_seq, + uint64_t new_client_cookie, + entity_name_t new_peer_name, + uint64_t new_conn_features, + uint64_t new_peer_supported_features, + uint64_t new_connect_seq, + uint64_t new_msg_seq) +{ + ceph_assert_always(state >= state_t::ESTABLISHING); + ceph_assert_always(state <= state_t::WAIT); + ceph_assert_always(has_socket || state == state_t::CONNECTING); + // mover.socket shouldn't be shutdown + + logger().info("{} start replacing ({}): pgs was {}, cs was {}, " + "client_cookie was {}, {}, new_sid={}", + conn, reconnect ? "reconnected" : "connected", + peer_global_seq, connect_seq, client_cookie, + io_states, mover.socket->get_shard_id()); + if (is_socket_valid) { + frame_assembler->shutdown_socket<true>(&gate); + is_socket_valid = false; + } + trigger_state_phase1(state_t::REPLACING); + gate.dispatch_in_background( + "trigger_replacing", + conn, + [this, + reconnect, + do_reset, + mover = std::move(mover), + new_auth_meta = std::move(new_auth_meta), + new_client_cookie, new_peer_name, + new_conn_features, new_peer_supported_features, + new_peer_global_seq, + new_connect_seq, new_msg_seq] () mutable { + ceph_assert_always(state == state_t::REPLACING); + auto new_io_shard = mover.socket->get_shard_id(); + // state may become CLOSING below, but we cannot abort the chain until + // mover.socket is correctly handled (closed or replaced). + + // this is preemptive + return wait_switch_io_shard( + ).then([this] { + if (unlikely(state != state_t::REPLACING)) { + ceph_assert_always(state == state_t::CLOSING); + return seastar::now(); + } + + trigger_state_phase2(state_t::REPLACING, io_state_t::delay); + return wait_exit_io(); + }).then([this] { + if (unlikely(state != state_t::REPLACING)) { + ceph_assert_always(state == state_t::CLOSING); + return seastar::now(); + } + + ceph_assert_always(frame_assembler); + protocol_timer.cancel(); + auto done = std::move(execution_done); + execution_done = seastar::now(); + return done; + }).then([this, new_io_shard] { + if (unlikely(state != state_t::REPLACING)) { + ceph_assert_always(state == state_t::CLOSING); + return seastar::now(); + } + + // set io_handler to a new shard + // we should prevent parallel switching core attemps + auto cc_seq = crosscore.prepare_submit(); + // there are 2 hops with dispatch_accept() + crosscore.prepare_submit(); + logger().debug("{} send {} IOHandler::dispatch_accept({})", + conn, cc_seq, new_io_shard); + ConnectionFRef conn_fref = seastar::make_foreign( + conn.shared_from_this()); + ceph_assert_always(!pr_switch_io_shard.has_value()); + pr_switch_io_shard = seastar::shared_promise<>(); + return seastar::smp::submit_to( + io_handler.get_shard_id(), + [this, cc_seq, new_io_shard, + conn_fref=std::move(conn_fref)]() mutable { + return io_handler.dispatch_accept( + cc_seq, new_io_shard, std::move(conn_fref), false); + }).then([this, new_io_shard] { + ceph_assert_always(io_handler.get_shard_id() == new_io_shard); + pr_switch_io_shard->set_value(); + pr_switch_io_shard = std::nullopt; + // user can make changes + }); + }).then([this, + reconnect, + do_reset, + mover = std::move(mover), + new_auth_meta = std::move(new_auth_meta), + new_client_cookie, new_peer_name, + new_conn_features, new_peer_supported_features, + new_peer_global_seq, + new_connect_seq, new_msg_seq] () mutable { + if (state == state_t::REPLACING && do_reset) { + reset_session(true); + // user can make changes + } + + if (unlikely(state != state_t::REPLACING)) { + logger().debug("{} triggered {} in the middle of trigger_replacing(), abort", + conn, get_state_name(state)); + ceph_assert_always(state == state_t::CLOSING); + return mover.socket->close( + ).then([sock = std::move(mover.socket)] { + abort_protocol(); + }); + } + + auth_meta = std::move(new_auth_meta); + peer_global_seq = new_peer_global_seq; + gate.dispatch_in_background( + "replace_frame_assembler", + conn, + [this, mover=std::move(mover)]() mutable { + return frame_assembler->replace_by(std::move(mover)); + } + ); + is_socket_valid = true; + has_socket = true; + + if (reconnect) { + connect_seq = new_connect_seq; + // send_reconnect_ok() logic + + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} send {} IOHandler::requeue_out_sent_up_to({})", + conn, cc_seq, new_msg_seq); + io_states.requeue_out_sent_up_to(); + gate.dispatch_in_background( + "requeue_out_replacing", conn, [this, cc_seq, new_msg_seq] { + return seastar::smp::submit_to( + io_handler.get_shard_id(), [this, cc_seq, new_msg_seq] { + return io_handler.requeue_out_sent_up_to(cc_seq, new_msg_seq); + }); + }); + + auto reconnect_ok = ReconnectOkFrame::Encode(io_states.in_seq); + logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_states.in_seq); + return frame_assembler->write_flush_frame(reconnect_ok); + } else { + client_cookie = new_client_cookie; + assert(conn.get_peer_type() == new_peer_name.type()); + if (conn.get_peer_id() == entity_name_t::NEW) { + conn.set_peer_id(new_peer_name.num()); + } + conn.set_features(new_conn_features); + peer_supported_features = new_peer_supported_features; + bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1); + frame_assembler->set_is_rev1(is_rev1); + return send_server_ident(); + } + }).then([this, reconnect] { + if (unlikely(state != state_t::REPLACING)) { + logger().debug("{} triggered {} at the end of trigger_replacing(), abort", + conn, get_state_name(state)); + ceph_assert_always(state == state_t::CLOSING); + abort_protocol(); + } + logger().info("{} replaced ({}), going to ready: " + "gs={}, pgs={}, cs={}, " + "client_cookie={}, server_cookie={}, {}", + conn, reconnect ? "reconnected" : "connected", + global_seq, peer_global_seq, connect_seq, + client_cookie, server_cookie, io_states); + execute_ready(); + }).handle_exception([this](std::exception_ptr eptr) { + fault(state_t::REPLACING, "trigger_replacing", eptr); + }); + }); +} + +// READY state + +seastar::future<> ProtocolV2::notify_out_fault( + crosscore_t::seq_t cc_seq, + const char *where, + std::exception_ptr eptr, + io_handler_state _io_states) +{ + assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} notify_out_fault(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq, where, eptr, _io_states] { + return notify_out_fault(cc_seq, where, eptr, _io_states); + }); + } + + io_states = _io_states; + logger().debug("{} got {} notify_out_fault(): io_states={}", + conn, cc_seq, io_states); + fault(state_t::READY, where, eptr); + return seastar::now(); +} + +void ProtocolV2::execute_ready() +{ + assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0)); + protocol_timer.cancel(); + ceph_assert_always(is_socket_valid); + // I'm not responsible to shutdown the socket at READY + is_socket_valid = false; + trigger_state(state_t::READY, io_state_t::open); +#ifdef UNIT_TESTS_BUILT + if (conn.interceptor) { + // FIXME: doesn't support cross-core + conn.interceptor->register_conn_ready( + conn.get_local_shared_foreign_from_this()); + } +#endif +} + +// STANDBY state + +void ProtocolV2::execute_standby() +{ + ceph_assert_always(!is_socket_valid); + trigger_state(state_t::STANDBY, io_state_t::delay); +} + +seastar::future<> ProtocolV2::notify_out( + crosscore_t::seq_t cc_seq) +{ + assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} notify_out(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq] { + return notify_out(cc_seq); + }); + } + + logger().debug("{} got {} notify_out(): at {}", + conn, cc_seq, get_state_name(state)); + io_states.is_out_queued = true; + if (unlikely(state == state_t::STANDBY && !conn.policy.server)) { + logger().info("{} notify_out(): at {}, going to CONNECTING", + conn, get_state_name(state)); + execute_connecting(); + } + return seastar::now(); +} + +// WAIT state + +void ProtocolV2::execute_wait(bool max_backoff) +{ + ceph_assert_always(!is_socket_valid); + trigger_state(state_t::WAIT, io_state_t::delay); + gated_execute("execute_wait", conn, [this, max_backoff] { + double backoff = protocol_timer.last_dur(); + if (max_backoff) { + backoff = local_conf().get_val<double>("ms_max_backoff"); + } else if (backoff > 0) { + backoff = std::min(local_conf().get_val<double>("ms_max_backoff"), 2 * backoff); + } else { + backoff = local_conf().get_val<double>("ms_initial_backoff"); + } + return protocol_timer.backoff(backoff).then([this] { + if (unlikely(state != state_t::WAIT)) { + logger().debug("{} triggered {} at the end of execute_wait()", + conn, get_state_name(state)); + abort_protocol(); + } + logger().info("{} execute_wait(): going to CONNECTING", conn); + execute_connecting(); + }).handle_exception([this](std::exception_ptr eptr) { + const char *e_what; + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + e_what = e.what(); + } + logger().info("{} execute_wait(): protocol aborted at {} -- {}", + conn, get_state_name(state), e_what); + assert(state == state_t::REPLACING || + state == state_t::CLOSING); + }); + }); +} + +// SERVER_WAIT state + +void ProtocolV2::execute_server_wait() +{ + ceph_assert_always(is_socket_valid); + trigger_state(state_t::SERVER_WAIT, io_state_t::none); + gated_execute("execute_server_wait", conn, [this] { + return frame_assembler->read_exactly(1 + ).then([this](auto bptr) { + logger().warn("{} SERVER_WAIT got read, abort", conn); + abort_in_fault(); + }).handle_exception([this](std::exception_ptr eptr) { + const char *e_what; + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + e_what = e.what(); + } + logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}", + conn, get_state_name(state), e_what); + do_close(false); + }); + }); +} + +// CLOSING state + +seastar::future<> ProtocolV2::notify_mark_down( + crosscore_t::seq_t cc_seq) +{ + assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} notify_mark_down(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq] { + return notify_mark_down(cc_seq); + }); + } + + logger().debug("{} got {} notify_mark_down()", + conn, cc_seq); + do_close(false); + return seastar::now(); +} + +seastar::future<> ProtocolV2::close_clean_yielded() +{ + // yield() so that do_close() can be called *after* close_clean_yielded() is + // applied to all connections in a container using + // seastar::parallel_for_each(). otherwise, we could erase a connection in + // the container when seastar::parallel_for_each() is still iterating in it. + // that'd lead to a segfault. + return seastar::yield( + ).then([this] { + do_close(false); + return pr_closed_clean.get_shared_future(); + + // connection may be unreferenced from the messenger, + // so need to hold the additional reference. + }).finally([conn_ref = conn.shared_from_this()] {});; +} + +void ProtocolV2::do_close( + bool is_dispatch_reset, + std::optional<std::function<void()>> f_accept_new) +{ + if (state == state_t::CLOSING) { + // already closing + return; + } + + bool is_replace = f_accept_new ? true : false; + logger().info("{} closing: reset {}, replace {}", conn, + is_dispatch_reset ? "yes" : "no", + is_replace ? "yes" : "no"); + + /* + * atomic operations + */ + + ceph_assert_always(!gate.is_closed()); + + // messenger registrations, must before user events + messenger.closing_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) { + messenger.unaccept_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) { + messenger.unregister_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + } else { + // cannot happen + ceph_assert(false); + } + if (f_accept_new) { + // the replacing connection must be registerred after the replaced + // connection is unreigsterred. + (*f_accept_new)(); + } + + protocol_timer.cancel(); + if (is_socket_valid) { + frame_assembler->shutdown_socket<true>(&gate); + is_socket_valid = false; + } + + trigger_state_phase1(state_t::CLOSING); + gate.dispatch_in_background( + "close_io", conn, [this, is_dispatch_reset, is_replace] { + // this is preemptive + return wait_switch_io_shard( + ).then([this, is_dispatch_reset, is_replace] { + trigger_state_phase2(state_t::CLOSING, io_state_t::drop); + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} send {} IOHandler::close_io(reset={}, replace={})", + conn, cc_seq, is_dispatch_reset, is_replace); + + std::ignore = gate.close( + ).then([this] { + ceph_assert_always(!need_exit_io); + ceph_assert_always(!pr_exit_io.has_value()); + if (has_socket) { + ceph_assert_always(frame_assembler); + return frame_assembler->close_shutdown_socket(); + } else { + return seastar::now(); + } + }).then([this] { + logger().debug("{} closed!", conn); + messenger.closed_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + pr_closed_clean.set_value(); +#ifdef UNIT_TESTS_BUILT + closed_clean = true; + if (conn.interceptor) { + conn.interceptor->register_conn_closed( + conn.get_local_shared_foreign_from_this()); + } +#endif + // connection is unreferenced from the messenger, + // so need to hold the additional reference. + }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) { + logger().error("{} closing got unexpected exception {}", + conn, eptr); + ceph_abort(); + }); + + return seastar::smp::submit_to( + io_handler.get_shard_id(), + [this, cc_seq, is_dispatch_reset, is_replace] { + return io_handler.close_io(cc_seq, is_dispatch_reset, is_replace); + }); + // user can make changes + }); + }); +} + +} // namespace crimson::net diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h new file mode 100644 index 000000000..dd7a1e703 --- /dev/null +++ b/src/crimson/net/ProtocolV2.h @@ -0,0 +1,328 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/shared_future.hh> +#include <seastar/core/sleep.hh> + +#include "io_handler.h" + +namespace crimson::net { + +class ProtocolV2 final : public HandshakeListener { + using AuthConnectionMetaRef = seastar::lw_shared_ptr<AuthConnectionMeta>; + +public: + ProtocolV2(SocketConnection &, + IOHandler &); + + ~ProtocolV2() final; + + ProtocolV2(const ProtocolV2 &) = delete; + ProtocolV2(ProtocolV2 &&) = delete; + ProtocolV2 &operator=(const ProtocolV2 &) = delete; + ProtocolV2 &operator=(ProtocolV2 &&) = delete; + +/** + * as HandshakeListener + */ +private: + seastar::future<> notify_out( + crosscore_t::seq_t cc_seq) final; + + seastar::future<> notify_out_fault( + crosscore_t::seq_t cc_seq, + const char *where, + std::exception_ptr, + io_handler_state) final; + + seastar::future<> notify_mark_down( + crosscore_t::seq_t cc_seq) final; + +/* +* as ProtocolV2 to be called by SocketConnection +*/ +public: + void start_connect(const entity_addr_t& peer_addr, + const entity_name_t& peer_name); + + void start_accept(SocketFRef&& socket, + const entity_addr_t& peer_addr); + + seastar::future<> close_clean_yielded(); + +#ifdef UNIT_TESTS_BUILT + bool is_ready() const { + return state == state_t::READY; + } + + bool is_standby() const { + return state == state_t::STANDBY; + } + + bool is_closed_clean() const { + return closed_clean; + } + + bool is_closed() const { + return state == state_t::CLOSING; + } + +#endif +private: + using io_state_t = IOHandler::io_state_t; + + seastar::future<> wait_switch_io_shard() { + if (pr_switch_io_shard.has_value()) { + return pr_switch_io_shard->get_shared_future(); + } else { + return seastar::now(); + } + } + + seastar::future<> wait_exit_io() { + if (pr_exit_io.has_value()) { + return pr_exit_io->get_shared_future(); + } else { + assert(!need_exit_io); + return seastar::now(); + } + } + + enum class state_t { + NONE = 0, + ACCEPTING, + SERVER_WAIT, + ESTABLISHING, + CONNECTING, + READY, + STANDBY, + WAIT, + REPLACING, + CLOSING + }; + + static const char *get_state_name(state_t state) { + const char *const statenames[] = {"NONE", + "ACCEPTING", + "SERVER_WAIT", + "ESTABLISHING", + "CONNECTING", + "READY", + "STANDBY", + "WAIT", + "REPLACING", + "CLOSING"}; + return statenames[static_cast<int>(state)]; + } + + void trigger_state_phase1(state_t new_state); + + void trigger_state_phase2(state_t new_state, io_state_t new_io_state); + + void trigger_state(state_t new_state, io_state_t new_io_state) { + ceph_assert_always(!pr_switch_io_shard.has_value()); + trigger_state_phase1(new_state); + trigger_state_phase2(new_state, new_io_state); + } + + template <typename Func, typename T> + void gated_execute(const char *what, T &who, Func &&func) { + gate.dispatch_in_background(what, who, [this, &who, &func] { + if (!execution_done.available()) { + // discard the unready future + gate.dispatch_in_background( + "gated_execute_abandon", + who, + [fut=std::move(execution_done)]() mutable { + return std::move(fut); + } + ); + } + seastar::promise<> pr; + execution_done = pr.get_future(); + return seastar::futurize_invoke(std::forward<Func>(func) + ).finally([pr=std::move(pr)]() mutable { + pr.set_value(); + }); + }); + } + + void fault(state_t expected_state, + const char *where, + std::exception_ptr eptr); + + void reset_session(bool is_full); + seastar::future<std::tuple<entity_type_t, entity_addr_t>> + banner_exchange(bool is_connect); + + enum class next_step_t { + ready, + wait, + none, // protocol should have been aborted or failed + }; + + // CONNECTING (client) + seastar::future<> handle_auth_reply(); + inline seastar::future<> client_auth() { + std::vector<uint32_t> empty; + return client_auth(empty); + } + seastar::future<> client_auth(std::vector<uint32_t> &allowed_methods); + + seastar::future<next_step_t> process_wait(); + seastar::future<next_step_t> client_connect(); + seastar::future<next_step_t> client_reconnect(); + void execute_connecting(); + + // ACCEPTING (server) + seastar::future<> _auth_bad_method(int r); + seastar::future<> _handle_auth_request(bufferlist& auth_payload, bool more); + seastar::future<> server_auth(); + + bool validate_peer_name(const entity_name_t& peer_name) const; + seastar::future<next_step_t> send_wait(); + seastar::future<next_step_t> reuse_connection(ProtocolV2* existing_proto, + bool do_reset=false, + bool reconnect=false, + uint64_t conn_seq=0, + uint64_t msg_seq=0); + + seastar::future<next_step_t> handle_existing_connection(SocketConnectionRef existing_conn); + seastar::future<next_step_t> server_connect(); + + seastar::future<next_step_t> read_reconnect(); + seastar::future<next_step_t> send_retry(uint64_t connect_seq); + seastar::future<next_step_t> send_retry_global(uint64_t global_seq); + seastar::future<next_step_t> send_reset(bool full); + seastar::future<next_step_t> server_reconnect(); + + void execute_accepting(); + + // CONNECTING/ACCEPTING + seastar::future<> finish_auth(); + + // ESTABLISHING + void execute_establishing(SocketConnectionRef existing_conn); + + // ESTABLISHING/REPLACING (server) + seastar::future<> send_server_ident(); + + // REPLACING (server) + void trigger_replacing(bool reconnect, + bool do_reset, + FrameAssemblerV2::mover_t &&mover, + AuthConnectionMetaRef&& new_auth_meta, + uint64_t new_peer_global_seq, + // !reconnect + uint64_t new_client_cookie, + entity_name_t new_peer_name, + uint64_t new_conn_features, + uint64_t new_peer_supported_features, + // reconnect + uint64_t new_connect_seq, + uint64_t new_msg_seq); + + // READY + void execute_ready(); + + // STANDBY + void execute_standby(); + + // WAIT + void execute_wait(bool max_backoff); + + // SERVER_WAIT + void execute_server_wait(); + + // CLOSING + // reentrant + void do_close(bool is_dispatch_reset, + std::optional<std::function<void()>> f_accept_new=std::nullopt); + +private: + SocketConnection &conn; + + SocketMessenger &messenger; + + IOHandler &io_handler; + + // asynchronously populated from io_handler + io_handler_state io_states; + + crosscore_t crosscore; + + bool has_socket = false; + + // the socket exists and it is not shutdown + bool is_socket_valid = false; + + FrameAssemblerV2Ref frame_assembler; + + bool need_notify_out = false; + + std::optional<seastar::shared_promise<>> pr_switch_io_shard; + + bool need_exit_io = false; + + std::optional<seastar::shared_promise<>> pr_exit_io; + + AuthConnectionMetaRef auth_meta; + + crimson::common::Gated gate; + + seastar::shared_promise<> pr_closed_clean; + +#ifdef UNIT_TESTS_BUILT + bool closed_clean = false; + +#endif + state_t state = state_t::NONE; + + uint64_t peer_supported_features = 0; + + uint64_t client_cookie = 0; + uint64_t server_cookie = 0; + uint64_t global_seq = 0; + uint64_t peer_global_seq = 0; + uint64_t connect_seq = 0; + + seastar::future<> execution_done = seastar::now(); + + class Timer { + double last_dur_ = 0.0; + const SocketConnection& conn; + std::optional<seastar::abort_source> as; + public: + Timer(SocketConnection& conn) : conn(conn) {} + double last_dur() const { return last_dur_; } + seastar::future<> backoff(double seconds); + void cancel() { + last_dur_ = 0.0; + if (as) { + as->request_abort(); + as = std::nullopt; + } + } + }; + Timer protocol_timer; +}; + +struct create_handlers_ret { + std::unique_ptr<ConnectionHandler> io_handler; + std::unique_ptr<ProtocolV2> protocol; +}; +inline create_handlers_ret create_handlers(ChainedDispatchers &dispatchers, SocketConnection &conn) { + std::unique_ptr<ConnectionHandler> io_handler = std::make_unique<IOHandler>(dispatchers, conn); + IOHandler &io_handler_concrete = static_cast<IOHandler&>(*io_handler); + auto protocol = std::make_unique<ProtocolV2>(conn, io_handler_concrete); + io_handler_concrete.set_handshake_listener(*protocol); + return {std::move(io_handler), std::move(protocol)}; +} + +} // namespace crimson::net + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::net::ProtocolV2> : fmt::ostream_formatter {}; +#endif diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc new file mode 100644 index 000000000..95b1e2250 --- /dev/null +++ b/src/crimson/net/Socket.cc @@ -0,0 +1,523 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Socket.h" + +#include <seastar/core/sleep.hh> +#include <seastar/core/when_all.hh> +#include <seastar/net/packet.hh> + +#include "crimson/common/log.h" +#include "Errors.h" + +using crimson::common::local_conf; + +namespace crimson::net { + +namespace { + +seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); +} + +using tmp_buf = seastar::temporary_buffer<char>; +using packet = seastar::net::packet; + +// an input_stream consumer that reads buffer segments into a bufferlist up to +// the given number of remaining bytes +struct bufferlist_consumer { + bufferlist& bl; + size_t& remaining; + + bufferlist_consumer(bufferlist& bl, size_t& remaining) + : bl(bl), remaining(remaining) {} + + using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type; + + // consume some or all of a buffer segment + seastar::future<consumption_result_type> operator()(tmp_buf&& data) { + if (remaining >= data.size()) { + // consume the whole buffer + remaining -= data.size(); + bl.append(buffer::create(std::move(data))); + if (remaining > 0) { + // return none to request more segments + return seastar::make_ready_future<consumption_result_type>( + seastar::continue_consuming{}); + } else { + // return an empty buffer to singal that we're done + return seastar::make_ready_future<consumption_result_type>( + consumption_result_type::stop_consuming_type({})); + } + } + if (remaining > 0) { + // consume the front + bl.append(buffer::create(data.share(0, remaining))); + data.trim_front(remaining); + remaining = 0; + } + // give the rest back to signal that we're done + return seastar::make_ready_future<consumption_result_type>( + consumption_result_type::stop_consuming_type{std::move(data)}); + }; +}; + +seastar::future<> inject_delay() +{ + if (float delay_period = local_conf()->ms_inject_internal_delays; + delay_period) { + logger().debug("Socket::inject_delay: sleep for {}", delay_period); + return seastar::sleep( + std::chrono::milliseconds((int)(delay_period * 1000.0))); + } + return seastar::now(); +} + +void inject_failure() +{ + if (local_conf()->ms_inject_socket_failures) { + uint64_t rand = + ceph::util::generate_random_number<uint64_t>(1, RAND_MAX); + if (rand % local_conf()->ms_inject_socket_failures == 0) { + logger().warn("Socket::inject_failure: injecting socket failure"); + throw std::system_error(make_error_code( + error::negotiation_failure)); + } + } +} + +} // anonymous namespace + +Socket::Socket( + seastar::connected_socket &&_socket, + side_t _side, + uint16_t e_port, + construct_tag) + : sid{seastar::this_shard_id()}, + socket(std::move(_socket)), + in(socket.input()), + // the default buffer size 8192 is too small that may impact our write + // performance. see seastar::net::connected_socket::output() + out(socket.output(65536)), + socket_is_shutdown(false), + side(_side), + ephemeral_port(e_port) +{ + if (local_conf()->ms_tcp_nodelay) { + socket.set_nodelay(true); + } +} + +Socket::~Socket() +{ + assert(seastar::this_shard_id() == sid); +#ifndef NDEBUG + assert(closed); +#endif +} + +seastar::future<bufferlist> +Socket::read(size_t bytes) +{ + assert(seastar::this_shard_id() == sid); +#ifdef UNIT_TESTS_BUILT + return try_trap_pre(next_trap_read).then([bytes, this] { +#endif + if (bytes == 0) { + return seastar::make_ready_future<bufferlist>(); + } + r.buffer.clear(); + r.remaining = bytes; + return in.consume(bufferlist_consumer{r.buffer, r.remaining}).then([this] { + if (r.remaining) { // throw on short reads + throw std::system_error(make_error_code(error::read_eof)); + } + inject_failure(); + return inject_delay().then([this] { + return seastar::make_ready_future<bufferlist>(std::move(r.buffer)); + }); + }); +#ifdef UNIT_TESTS_BUILT + }).then([this](auto buf) { + return try_trap_post(next_trap_read + ).then([buf = std::move(buf)]() mutable { + return std::move(buf); + }); + }); +#endif +} + +seastar::future<bufferptr> +Socket::read_exactly(size_t bytes) { + assert(seastar::this_shard_id() == sid); +#ifdef UNIT_TESTS_BUILT + return try_trap_pre(next_trap_read).then([bytes, this] { +#endif + if (bytes == 0) { + return seastar::make_ready_future<bufferptr>(); + } + return in.read_exactly(bytes).then([bytes](auto buf) { + bufferptr ptr(buffer::create(buf.share())); + if (ptr.length() < bytes) { + throw std::system_error(make_error_code(error::read_eof)); + } + inject_failure(); + return inject_delay( + ).then([ptr = std::move(ptr)]() mutable { + return seastar::make_ready_future<bufferptr>(std::move(ptr)); + }); + }); +#ifdef UNIT_TESTS_BUILT + }).then([this](auto ptr) { + return try_trap_post(next_trap_read + ).then([ptr = std::move(ptr)]() mutable { + return std::move(ptr); + }); + }); +#endif +} + +seastar::future<> +Socket::write(bufferlist buf) +{ + assert(seastar::this_shard_id() == sid); +#ifdef UNIT_TESTS_BUILT + return try_trap_pre(next_trap_write + ).then([buf = std::move(buf), this]() mutable { +#endif + inject_failure(); + return inject_delay( + ).then([buf = std::move(buf), this]() mutable { + packet p(std::move(buf)); + return out.write(std::move(p)); + }); +#ifdef UNIT_TESTS_BUILT + }).then([this] { + return try_trap_post(next_trap_write); + }); +#endif +} + +seastar::future<> +Socket::flush() +{ + assert(seastar::this_shard_id() == sid); + inject_failure(); + return inject_delay().then([this] { + return out.flush(); + }); +} + +seastar::future<> +Socket::write_flush(bufferlist buf) +{ + assert(seastar::this_shard_id() == sid); +#ifdef UNIT_TESTS_BUILT + return try_trap_pre(next_trap_write + ).then([buf = std::move(buf), this]() mutable { +#endif + inject_failure(); + return inject_delay( + ).then([buf = std::move(buf), this]() mutable { + packet p(std::move(buf)); + return out.write(std::move(p) + ).then([this] { + return out.flush(); + }); + }); +#ifdef UNIT_TESTS_BUILT + }).then([this] { + return try_trap_post(next_trap_write); + }); +#endif +} + +void Socket::shutdown() +{ + assert(seastar::this_shard_id() == sid); + socket_is_shutdown = true; + socket.shutdown_input(); + socket.shutdown_output(); +} + +static inline seastar::future<> +close_and_handle_errors(seastar::output_stream<char>& out) +{ + return out.close().handle_exception_type([](const std::system_error& e) { + if (e.code() != std::errc::broken_pipe && + e.code() != std::errc::connection_reset) { + logger().error("Socket::close(): unexpected error {}", e.what()); + ceph_abort(); + } + // can happen when out is already shutdown, ignore + }); +} + +seastar::future<> +Socket::close() +{ + assert(seastar::this_shard_id() == sid); +#ifndef NDEBUG + ceph_assert_always(!closed); + closed = true; +#endif + return seastar::when_all_succeed( + inject_delay(), + in.close(), + close_and_handle_errors(out) + ).then_unpack([] { + return seastar::make_ready_future<>(); + }).handle_exception([](auto eptr) { + const char *e_what; + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + e_what = e.what(); + } + logger().error("Socket::close(): unexpected exception {}", e_what); + ceph_abort(); + }); +} + +seastar::future<SocketRef> +Socket::connect(const entity_addr_t &peer_addr) +{ + inject_failure(); + return inject_delay( + ).then([peer_addr] { + return seastar::connect(peer_addr.in4_addr()); + }).then([peer_addr](seastar::connected_socket socket) { + auto ret = std::make_unique<Socket>( + std::move(socket), side_t::connector, 0, construct_tag{}); + logger().debug("Socket::connect(): connected to {}, socket {}", + peer_addr, fmt::ptr(ret)); + return ret; + }); +} + +#ifdef UNIT_TESTS_BUILT +void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) { + assert(seastar::this_shard_id() == sid); + blocker = blocker_; + if (type == bp_type_t::READ) { + ceph_assert_always(next_trap_read == bp_action_t::CONTINUE); + next_trap_read = action; + } else { // type == bp_type_t::WRITE + if (next_trap_write == bp_action_t::CONTINUE) { + next_trap_write = action; + } else if (next_trap_write == bp_action_t::FAULT) { + // do_sweep_messages() may combine multiple write events into one socket write + ceph_assert_always(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE); + } else { + ceph_abort(); + } + } +} + +seastar::future<> +Socket::try_trap_pre(bp_action_t& trap) { + auto action = trap; + trap = bp_action_t::CONTINUE; + switch (action) { + case bp_action_t::CONTINUE: + break; + case bp_action_t::FAULT: + logger().info("[Test] got FAULT"); + throw std::system_error(make_error_code(error::negotiation_failure)); + case bp_action_t::BLOCK: + logger().info("[Test] got BLOCK"); + return blocker->block(); + case bp_action_t::STALL: + trap = action; + break; + default: + ceph_abort("unexpected action from trap"); + } + return seastar::make_ready_future<>(); +} + +seastar::future<> +Socket::try_trap_post(bp_action_t& trap) { + auto action = trap; + trap = bp_action_t::CONTINUE; + switch (action) { + case bp_action_t::CONTINUE: + break; + case bp_action_t::STALL: + logger().info("[Test] got STALL and block"); + force_shutdown(); + return blocker->block(); + default: + ceph_abort("unexpected action from trap"); + } + return seastar::make_ready_future<>(); +} +#endif + +ShardedServerSocket::ShardedServerSocket( + seastar::shard_id sid, + bool dispatch_only_on_primary_sid, + construct_tag) + : primary_sid{sid}, dispatch_only_on_primary_sid{dispatch_only_on_primary_sid} +{ +} + +ShardedServerSocket::~ShardedServerSocket() +{ + assert(!listener); + // detect whether user have called destroy() properly + ceph_assert_always(!service); +} + +listen_ertr::future<> +ShardedServerSocket::listen(entity_addr_t addr) +{ + ceph_assert_always(seastar::this_shard_id() == primary_sid); + logger().debug("ShardedServerSocket({})::listen()...", addr); + return this->container().invoke_on_all([addr](auto& ss) { + ss.listen_addr = addr; + seastar::socket_address s_addr(addr.in4_addr()); + seastar::listen_options lo; + lo.reuse_address = true; + if (ss.dispatch_only_on_primary_sid) { + lo.set_fixed_cpu(ss.primary_sid); + } + ss.listener = seastar::listen(s_addr, lo); + }).then([] { + return listen_ertr::now(); + }).handle_exception_type( + [addr](const std::system_error& e) -> listen_ertr::future<> { + if (e.code() == std::errc::address_in_use) { + logger().debug("ShardedServerSocket({})::listen(): address in use", addr); + return crimson::ct_error::address_in_use::make(); + } else if (e.code() == std::errc::address_not_available) { + logger().debug("ShardedServerSocket({})::listen(): address not available", + addr); + return crimson::ct_error::address_not_available::make(); + } + logger().error("ShardedServerSocket({})::listen(): " + "got unexpeted error {}", addr, e.what()); + ceph_abort(); + }); +} + +seastar::future<> +ShardedServerSocket::accept(accept_func_t &&_fn_accept) +{ + ceph_assert_always(seastar::this_shard_id() == primary_sid); + logger().debug("ShardedServerSocket({})::accept()...", listen_addr); + return this->container().invoke_on_all([_fn_accept](auto &ss) { + assert(ss.listener); + ss.fn_accept = _fn_accept; + // gate accepting + // ShardedServerSocket::shutdown() will drain the continuations in the gate + // so ignore the returned future + std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] { + return seastar::keep_doing([&ss] { + return ss.listener->accept( + ).then([&ss](seastar::accept_result accept_result) { +#ifndef NDEBUG + if (ss.dispatch_only_on_primary_sid) { + // see seastar::listen_options::set_fixed_cpu() + ceph_assert_always(seastar::this_shard_id() == ss.primary_sid); + } +#endif + auto [socket, paddr] = std::move(accept_result); + entity_addr_t peer_addr; + peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); + peer_addr.set_type(ss.listen_addr.get_type()); + SocketRef _socket = std::make_unique<Socket>( + std::move(socket), Socket::side_t::acceptor, + peer_addr.get_port(), Socket::construct_tag{}); + logger().debug("ShardedServerSocket({})::accept(): accepted peer {}, " + "socket {}, dispatch_only_on_primary_sid = {}", + ss.listen_addr, peer_addr, fmt::ptr(_socket), + ss.dispatch_only_on_primary_sid); + std::ignore = seastar::with_gate( + ss.shutdown_gate, + [socket=std::move(_socket), peer_addr, &ss]() mutable { + return ss.fn_accept(std::move(socket), peer_addr + ).handle_exception([&ss, peer_addr](auto eptr) { + const char *e_what; + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + e_what = e.what(); + } + logger().error("ShardedServerSocket({})::accept(): " + "fn_accept(s, {}) got unexpected exception {}", + ss.listen_addr, peer_addr, e_what); + ceph_abort(); + }); + }); + }); + }).handle_exception_type([&ss](const std::system_error& e) { + if (e.code() == std::errc::connection_aborted || + e.code() == std::errc::invalid_argument) { + logger().debug("ShardedServerSocket({})::accept(): stopped ({})", + ss.listen_addr, e.what()); + } else { + throw; + } + }).handle_exception([&ss](auto eptr) { + const char *e_what; + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + e_what = e.what(); + } + logger().error("ShardedServerSocket({})::accept(): " + "got unexpected exception {}", ss.listen_addr, e_what); + ceph_abort(); + }); + }); + }); +} + +seastar::future<> +ShardedServerSocket::shutdown_destroy() +{ + assert(seastar::this_shard_id() == primary_sid); + logger().debug("ShardedServerSocket({})::shutdown_destroy()...", listen_addr); + // shutdown shards + return this->container().invoke_on_all([](auto& ss) { + if (ss.listener) { + ss.listener->abort_accept(); + } + return ss.shutdown_gate.close(); + }).then([this] { + // destroy shards + return this->container().invoke_on_all([](auto& ss) { + assert(ss.shutdown_gate.is_closed()); + ss.listen_addr = entity_addr_t(); + ss.listener.reset(); + }); + }).then([this] { + // stop the sharded service: we should only construct/stop shards on #0 + return this->container().invoke_on(0, [](auto& ss) { + assert(ss.service); + return ss.service->stop().finally([cleanup = std::move(ss.service)] {}); + }); + }); +} + +seastar::future<ShardedServerSocket*> +ShardedServerSocket::create(bool dispatch_only_on_this_shard) +{ + auto primary_sid = seastar::this_shard_id(); + // start the sharded service: we should only construct/stop shards on #0 + return seastar::smp::submit_to(0, [primary_sid, dispatch_only_on_this_shard] { + auto service = std::make_unique<sharded_service_t>(); + return service->start( + primary_sid, dispatch_only_on_this_shard, construct_tag{} + ).then([service = std::move(service)]() mutable { + auto p_shard = service.get(); + p_shard->local().service = std::move(service); + return p_shard; + }); + }).then([](auto p_shard) { + return &p_shard->local(); + }); +} + +} // namespace crimson::net diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h new file mode 100644 index 000000000..478f2d630 --- /dev/null +++ b/src/crimson/net/Socket.h @@ -0,0 +1,201 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/gate.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/sharded.hh> + +#include "include/buffer.h" + +#include "crimson/common/log.h" +#include "Errors.h" +#include "Fwd.h" + +#ifdef UNIT_TESTS_BUILT +#include "Interceptor.h" +#endif + +namespace crimson::net { + +class Socket; +using SocketRef = std::unique_ptr<Socket>; +using SocketFRef = seastar::foreign_ptr<SocketRef>; + +class Socket { + struct construct_tag {}; + +public: + // if acceptor side, peer is using a different port (ephemeral_port) + // if connector side, I'm using a different port (ephemeral_port) + enum class side_t { + acceptor, + connector + }; + Socket(seastar::connected_socket &&, side_t, uint16_t e_port, construct_tag); + + ~Socket(); + + Socket(Socket&& o) = delete; + + seastar::shard_id get_shard_id() const { + return sid; + } + + side_t get_side() const { + return side; + } + + uint16_t get_ephemeral_port() const { + return ephemeral_port; + } + + seastar::socket_address get_local_address() const { + return socket.local_address(); + } + + bool is_shutdown() const { + assert(seastar::this_shard_id() == sid); + return socket_is_shutdown; + } + + // learn my ephemeral_port as connector. + // unfortunately, there's no way to identify which port I'm using as + // connector with current seastar interface. + void learn_ephemeral_port_as_connector(uint16_t port) { + assert(side == side_t::connector && + (ephemeral_port == 0 || ephemeral_port == port)); + ephemeral_port = port; + } + + /// read the requested number of bytes into a bufferlist + seastar::future<bufferlist> read(size_t bytes); + + seastar::future<bufferptr> read_exactly(size_t bytes); + + seastar::future<> write(bufferlist); + + seastar::future<> flush(); + + seastar::future<> write_flush(bufferlist); + + // preemptively disable further reads or writes, can only be shutdown once. + void shutdown(); + + /// Socket can only be closed once. + seastar::future<> close(); + + static seastar::future<SocketRef> + connect(const entity_addr_t& peer_addr); + + /* + * test interfaces + */ + + // shutdown for tests + void force_shutdown() { + assert(seastar::this_shard_id() == sid); + socket.shutdown_input(); + socket.shutdown_output(); + } + + // shutdown input_stream only, for tests + void force_shutdown_in() { + assert(seastar::this_shard_id() == sid); + socket.shutdown_input(); + } + + // shutdown output_stream only, for tests + void force_shutdown_out() { + assert(seastar::this_shard_id() == sid); + socket.shutdown_output(); + } + +private: + const seastar::shard_id sid; + seastar::connected_socket socket; + seastar::input_stream<char> in; + seastar::output_stream<char> out; + bool socket_is_shutdown; + side_t side; + uint16_t ephemeral_port; + +#ifndef NDEBUG + bool closed = false; +#endif + + /// buffer state for read() + struct { + bufferlist buffer; + size_t remaining; + } r; + +#ifdef UNIT_TESTS_BUILT +public: + void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_); + +private: + seastar::future<> try_trap_pre(bp_action_t& trap); + + seastar::future<> try_trap_post(bp_action_t& trap); + + bp_action_t next_trap_read = bp_action_t::CONTINUE; + bp_action_t next_trap_write = bp_action_t::CONTINUE; + socket_blocker* blocker = nullptr; + +#endif + friend class ShardedServerSocket; +}; + +using listen_ertr = crimson::errorator< + crimson::ct_error::address_in_use, // The address is already bound + crimson::ct_error::address_not_available // https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/ + >; + +class ShardedServerSocket + : public seastar::peering_sharded_service<ShardedServerSocket> { + struct construct_tag {}; + +public: + ShardedServerSocket( + seastar::shard_id sid, + bool dispatch_only_on_primary_sid, + construct_tag); + + ~ShardedServerSocket(); + + ShardedServerSocket(ShardedServerSocket&&) = delete; + ShardedServerSocket(const ShardedServerSocket&) = delete; + ShardedServerSocket& operator=(ShardedServerSocket&&) = delete; + ShardedServerSocket& operator=(const ShardedServerSocket&) = delete; + + bool is_fixed_shard_dispatching() const { + return dispatch_only_on_primary_sid; + } + + listen_ertr::future<> listen(entity_addr_t addr); + + using accept_func_t = + std::function<seastar::future<>(SocketRef, entity_addr_t)>; + seastar::future<> accept(accept_func_t &&_fn_accept); + + seastar::future<> shutdown_destroy(); + + static seastar::future<ShardedServerSocket*> create( + bool dispatch_only_on_this_shard); + +private: + const seastar::shard_id primary_sid; + /// XXX: Remove once all infrastructure uses multi-core messenger + const bool dispatch_only_on_primary_sid; + entity_addr_t listen_addr; + std::optional<seastar::server_socket> listener; + seastar::gate shutdown_gate; + accept_func_t fn_accept; + + using sharded_service_t = seastar::sharded<ShardedServerSocket>; + std::unique_ptr<sharded_service_t> service; +}; + +} // namespace crimson::net diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc new file mode 100644 index 000000000..57e5c12c1 --- /dev/null +++ b/src/crimson/net/SocketConnection.cc @@ -0,0 +1,220 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "SocketConnection.h" + +#include "ProtocolV2.h" +#include "SocketMessenger.h" + +#ifdef UNIT_TESTS_BUILT +#include "Interceptor.h" +#endif + +using std::ostream; +using crimson::common::local_conf; + +namespace crimson::net { + +SocketConnection::SocketConnection(SocketMessenger& messenger, + ChainedDispatchers& dispatchers) + : msgr_sid{messenger.get_shard_id()}, messenger(messenger) +{ + auto ret = create_handlers(dispatchers, *this); + io_handler = std::move(ret.io_handler); + protocol = std::move(ret.protocol); +#ifdef UNIT_TESTS_BUILT + if (messenger.interceptor) { + interceptor = messenger.interceptor; + interceptor->register_conn(this->get_local_shared_foreign_from_this()); + } +#endif +} + +SocketConnection::~SocketConnection() {} + +bool SocketConnection::is_connected() const +{ + return io_handler->is_connected(); +} + +#ifdef UNIT_TESTS_BUILT +bool SocketConnection::is_protocol_ready() const +{ + assert(seastar::this_shard_id() == msgr_sid); + return protocol->is_ready(); +} + +bool SocketConnection::is_protocol_standby() const { + assert(seastar::this_shard_id() == msgr_sid); + return protocol->is_standby(); +} + +bool SocketConnection::is_protocol_closed() const +{ + assert(seastar::this_shard_id() == msgr_sid); + return protocol->is_closed(); +} + +bool SocketConnection::is_protocol_closed_clean() const +{ + assert(seastar::this_shard_id() == msgr_sid); + return protocol->is_closed_clean(); +} + +#endif +bool SocketConnection::peer_wins() const +{ + assert(seastar::this_shard_id() == msgr_sid); + return (messenger.get_myaddr() > peer_addr || policy.server); +} + +seastar::future<> SocketConnection::send(MessageURef _msg) +{ + // may be invoked from any core + MessageFRef msg = seastar::make_foreign(std::move(_msg)); + return io_handler->send(std::move(msg)); +} + +seastar::future<> SocketConnection::send_keepalive() +{ + // may be invoked from any core + return io_handler->send_keepalive(); +} + +SocketConnection::clock_t::time_point +SocketConnection::get_last_keepalive() const +{ + return io_handler->get_last_keepalive(); +} + +SocketConnection::clock_t::time_point +SocketConnection::get_last_keepalive_ack() const +{ + return io_handler->get_last_keepalive_ack(); +} + +void SocketConnection::set_last_keepalive_ack(clock_t::time_point when) +{ + io_handler->set_last_keepalive_ack(when); +} + +void SocketConnection::mark_down() +{ + io_handler->mark_down(); +} + +void +SocketConnection::start_connect(const entity_addr_t& _peer_addr, + const entity_name_t& _peer_name) +{ + assert(seastar::this_shard_id() == msgr_sid); + protocol->start_connect(_peer_addr, _peer_name); +} + +void +SocketConnection::start_accept(SocketFRef&& sock, + const entity_addr_t& _peer_addr) +{ + assert(seastar::this_shard_id() == msgr_sid); + protocol->start_accept(std::move(sock), _peer_addr); +} + +seastar::future<> +SocketConnection::close_clean_yielded() +{ + assert(seastar::this_shard_id() == msgr_sid); + return protocol->close_clean_yielded(); +} + +seastar::socket_address SocketConnection::get_local_address() const { + assert(seastar::this_shard_id() == msgr_sid); + return socket->get_local_address(); +} + +ConnectionRef +SocketConnection::get_local_shared_foreign_from_this() +{ + assert(seastar::this_shard_id() == msgr_sid); + return make_local_shared_foreign( + seastar::make_foreign(shared_from_this())); +} + +SocketMessenger & +SocketConnection::get_messenger() const +{ + assert(seastar::this_shard_id() == msgr_sid); + return messenger; +} + +seastar::shard_id +SocketConnection::get_messenger_shard_id() const +{ + return msgr_sid; +} + +void SocketConnection::set_peer_type(entity_type_t peer_type) { + assert(seastar::this_shard_id() == msgr_sid); + // it is not allowed to assign an unknown value when the current + // value is known + assert(!(peer_type == 0 && + peer_name.type() != 0)); + // it is not allowed to assign a different known value when the + // current value is also known. + assert(!(peer_type != 0 && + peer_name.type() != 0 && + peer_type != peer_name.type())); + peer_name._type = peer_type; +} + +void SocketConnection::set_peer_id(int64_t peer_id) { + assert(seastar::this_shard_id() == msgr_sid); + // it is not allowed to assign an unknown value when the current + // value is known + assert(!(peer_id == entity_name_t::NEW && + peer_name.num() != entity_name_t::NEW)); + // it is not allowed to assign a different known value when the + // current value is also known. + assert(!(peer_id != entity_name_t::NEW && + peer_name.num() != entity_name_t::NEW && + peer_id != peer_name.num())); + peer_name._num = peer_id; +} + +void SocketConnection::set_features(uint64_t f) { + assert(seastar::this_shard_id() == msgr_sid); + features = f; +} + +void SocketConnection::set_socket(Socket *s) { + assert(seastar::this_shard_id() == msgr_sid); + socket = s; +} + +void SocketConnection::print(ostream& out) const { + out << (void*)this << " "; + messenger.print(out); + if (seastar::this_shard_id() != msgr_sid) { + out << " >> " << get_peer_name() << " " << peer_addr; + } else if (!socket) { + out << " >> " << get_peer_name() << " " << peer_addr; + } else if (socket->get_side() == Socket::side_t::acceptor) { + out << " >> " << get_peer_name() << " " << peer_addr + << "@" << socket->get_ephemeral_port(); + } else { // socket->get_side() == Socket::side_t::connector + out << "@" << socket->get_ephemeral_port() + << " >> " << get_peer_name() << " " << peer_addr; + } +} + +} // namespace crimson::net diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h new file mode 100644 index 000000000..823d6c574 --- /dev/null +++ b/src/crimson/net/SocketConnection.h @@ -0,0 +1,236 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <seastar/core/sharded.hh> + +#include "msg/Policy.h" +#include "crimson/common/throttle.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Socket.h" + +namespace crimson::net { + +class ProtocolV2; +class SocketMessenger; +class SocketConnection; +using SocketConnectionRef = seastar::shared_ptr<SocketConnection>; + +#ifdef UNIT_TESTS_BUILT +class Interceptor; +#endif + +/** + * ConnectionHandler + * + * The interface class to implement Connection, called by SocketConnection. + * + * The operations must be done in get_shard_id(). + */ +class ConnectionHandler { +public: + using clock_t = seastar::lowres_system_clock; + + virtual ~ConnectionHandler() = default; + + ConnectionHandler(const ConnectionHandler &) = delete; + ConnectionHandler(ConnectionHandler &&) = delete; + ConnectionHandler &operator=(const ConnectionHandler &) = delete; + ConnectionHandler &operator=(ConnectionHandler &&) = delete; + + virtual seastar::shard_id get_shard_id() const = 0; + + virtual bool is_connected() const = 0; + + virtual seastar::future<> send(MessageFRef) = 0; + + virtual seastar::future<> send_keepalive() = 0; + + virtual clock_t::time_point get_last_keepalive() const = 0; + + virtual clock_t::time_point get_last_keepalive_ack() const = 0; + + virtual void set_last_keepalive_ack(clock_t::time_point) = 0; + + virtual void mark_down() = 0; + +protected: + ConnectionHandler() = default; +}; + +class SocketConnection : public Connection { + /* + * Connection interfaces, public to users + * Working in ConnectionHandler::get_shard_id() + */ + public: + SocketConnection(SocketMessenger& messenger, + ChainedDispatchers& dispatchers); + + ~SocketConnection() override; + + const seastar::shard_id get_shard_id() const override { + return io_handler->get_shard_id(); + } + + const entity_name_t &get_peer_name() const override { + return peer_name; + } + + const entity_addr_t &get_peer_addr() const override { + return peer_addr; + } + + const entity_addr_t &get_peer_socket_addr() const override { + return target_addr; + } + + uint64_t get_features() const override { + return features; + } + + bool is_connected() const override; + + seastar::future<> send(MessageURef msg) override; + + seastar::future<> send_keepalive() override; + + clock_t::time_point get_last_keepalive() const override; + + clock_t::time_point get_last_keepalive_ack() const override; + + void set_last_keepalive_ack(clock_t::time_point when) override; + + void mark_down() override; + + bool has_user_private() const override { + return user_private != nullptr; + } + + user_private_t &get_user_private() override { + assert(has_user_private()); + return *user_private; + } + + void set_user_private(std::unique_ptr<user_private_t> new_user_private) override { + assert(!has_user_private()); + user_private = std::move(new_user_private); + } + + void print(std::ostream& out) const override; + + /* + * Public to SocketMessenger + * Working in SocketMessenger::get_shard_id(); + */ + public: + /// start a handshake from the client's perspective, + /// only call when SocketConnection first construct + void start_connect(const entity_addr_t& peer_addr, + const entity_name_t& peer_name); + + /// start a handshake from the server's perspective, + /// only call when SocketConnection first construct + void start_accept(SocketFRef&& socket, + const entity_addr_t& peer_addr); + + seastar::future<> close_clean_yielded(); + + seastar::socket_address get_local_address() const; + + seastar::shard_id get_messenger_shard_id() const; + + SocketMessenger &get_messenger() const; + + ConnectionRef get_local_shared_foreign_from_this(); + +private: + void set_peer_type(entity_type_t peer_type); + + void set_peer_id(int64_t peer_id); + + void set_peer_name(entity_name_t name) { + set_peer_type(name.type()); + set_peer_id(name.num()); + } + + void set_features(uint64_t f); + + void set_socket(Socket *s); + +#ifdef UNIT_TESTS_BUILT + bool is_protocol_ready() const override; + + bool is_protocol_standby() const override; + + bool is_protocol_closed_clean() const override; + + bool is_protocol_closed() const override; + + // peer wins if myaddr > peeraddr + bool peer_wins() const override; + + Interceptor *interceptor = nullptr; +#else + // peer wins if myaddr > peeraddr + bool peer_wins() const; +#endif + +private: + const seastar::shard_id msgr_sid; + + /* + * Core owner is messenger core, may allow to access from the I/O core. + */ + SocketMessenger& messenger; + + std::unique_ptr<ProtocolV2> protocol; + + Socket *socket = nullptr; + + entity_name_t peer_name = {0, entity_name_t::NEW}; + + entity_addr_t peer_addr; + + // which of the peer_addrs we're connecting to (as client) + // or should reconnect to (as peer) + entity_addr_t target_addr; + + uint64_t features = 0; + + ceph::net::Policy<crimson::common::Throttle> policy; + + uint64_t peer_global_id = 0; + + /* + * Core owner is I/O core (mutable). + */ + std::unique_ptr<ConnectionHandler> io_handler; + + /* + * Core owner is up to the connection user. + */ + std::unique_ptr<user_private_t> user_private; + + friend class IOHandler; + friend class ProtocolV2; + friend class FrameAssemblerV2; +}; + +} // namespace crimson::net + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::net::SocketConnection> : fmt::ostream_formatter {}; +#endif diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc new file mode 100644 index 000000000..382d08f98 --- /dev/null +++ b/src/crimson/net/SocketMessenger.cc @@ -0,0 +1,485 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "SocketMessenger.h" + +#include <seastar/core/sleep.hh> + +#include <tuple> +#include <boost/functional/hash.hpp> +#include <fmt/os.h> + +#include "auth/Auth.h" +#include "Errors.h" +#include "Socket.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); + } +} + +namespace crimson::net { + +SocketMessenger::SocketMessenger(const entity_name_t& myname, + const std::string& logic_name, + uint32_t nonce, + bool dispatch_only_on_this_shard) + : sid{seastar::this_shard_id()}, + logic_name{logic_name}, + nonce{nonce}, + dispatch_only_on_sid{dispatch_only_on_this_shard}, + my_name{myname} +{} + +SocketMessenger::~SocketMessenger() +{ + logger().debug("~SocketMessenger: {}", logic_name); + ceph_assert_always(seastar::this_shard_id() == sid); + ceph_assert(!listener); +} + +bool SocketMessenger::set_addr_unknowns(const entity_addrvec_t &addrs) +{ + assert(seastar::this_shard_id() == sid); + bool ret = false; + + entity_addrvec_t newaddrs = my_addrs; + for (auto& a : newaddrs.v) { + if (a.is_blank_ip()) { + int type = a.get_type(); + int port = a.get_port(); + uint32_t nonce = a.get_nonce(); + for (auto& b : addrs.v) { + if (a.get_family() == b.get_family()) { + logger().debug(" assuming my addr {} matches provided addr {}", a, b); + a = b; + a.set_nonce(nonce); + a.set_type(type); + a.set_port(port); + ret = true; + break; + } + } + } + } + my_addrs = newaddrs; + return ret; +} + +void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs) +{ + assert(seastar::this_shard_id() == sid); + my_addrs = addrs; + for (auto& addr : my_addrs.v) { + addr.nonce = nonce; + } +} + +crimson::net::listen_ertr::future<> +SocketMessenger::do_listen(const entity_addrvec_t& addrs) +{ + ceph_assert(addrs.front().get_family() == AF_INET); + set_myaddrs(addrs); + return seastar::futurize_invoke([this] { + if (!listener) { + return ShardedServerSocket::create(dispatch_only_on_sid + ).then([this] (auto _listener) { + listener = _listener; + }); + } else { + return seastar::now(); + } + }).then([this] () -> listen_ertr::future<> { + const entity_addr_t listen_addr = get_myaddr(); + logger().debug("{} do_listen: try listen {}...", *this, listen_addr); + if (!listener) { + logger().warn("{} do_listen: listener doesn't exist", *this); + return listen_ertr::now(); + } + return listener->listen(listen_addr); + }); +} + +SocketMessenger::bind_ertr::future<> +SocketMessenger::try_bind(const entity_addrvec_t& addrs, + uint32_t min_port, uint32_t max_port) +{ + // the classical OSD iterates over the addrvec and tries to listen on each + // addr. crimson doesn't need to follow as there is a consensus we need to + // worry only about proto v2. + assert(addrs.size() == 1); + auto addr = addrs.msgr2_addr(); + if (addr.get_port() != 0) { + return do_listen(addrs).safe_then([this] { + logger().info("{} try_bind: done", *this); + }); + } + ceph_assert(min_port <= max_port); + return seastar::do_with(uint32_t(min_port), + [this, max_port, addr] (auto& port) { + return seastar::repeat_until_value([this, max_port, addr, &port] { + auto to_bind = addr; + to_bind.set_port(port); + return do_listen(entity_addrvec_t{to_bind} + ).safe_then([this] () -> seastar::future<std::optional<std::error_code>> { + logger().info("{} try_bind: done", *this); + return seastar::make_ready_future<std::optional<std::error_code>>( + std::make_optional<std::error_code>(std::error_code{/* success! */})); + }, listen_ertr::all_same_way([this, max_port, &port] + (const std::error_code& e) mutable + -> seastar::future<std::optional<std::error_code>> { + logger().trace("{} try_bind: {} got error {}", *this, port, e); + if (port == max_port) { + return seastar::make_ready_future<std::optional<std::error_code>>( + std::make_optional<std::error_code>(e)); + } + ++port; + return seastar::make_ready_future<std::optional<std::error_code>>( + std::optional<std::error_code>{std::nullopt}); + })); + }).then([] (const std::error_code e) -> bind_ertr::future<> { + if (!e) { + return bind_ertr::now(); // success! + } else if (e == std::errc::address_in_use) { + return crimson::ct_error::address_in_use::make(); + } else if (e == std::errc::address_not_available) { + return crimson::ct_error::address_not_available::make(); + } + ceph_abort(); + }); + }); +} + +SocketMessenger::bind_ertr::future<> +SocketMessenger::bind(const entity_addrvec_t& addrs) +{ + assert(seastar::this_shard_id() == sid); + using crimson::common::local_conf; + return seastar::do_with(int64_t{local_conf()->ms_bind_retry_count}, + [this, addrs] (auto& count) { + return seastar::repeat_until_value([this, addrs, &count] { + assert(count >= 0); + return try_bind(addrs, + local_conf()->ms_bind_port_min, + local_conf()->ms_bind_port_max) + .safe_then([this] { + logger().info("{} try_bind: done", *this); + return seastar::make_ready_future<std::optional<std::error_code>>( + std::make_optional<std::error_code>(std::error_code{/* success! */})); + }, bind_ertr::all_same_way([this, &count] (const std::error_code error) { + if (count-- > 0) { + logger().info("{} was unable to bind. Trying again in {} seconds", + *this, local_conf()->ms_bind_retry_delay); + return seastar::sleep( + std::chrono::seconds(local_conf()->ms_bind_retry_delay) + ).then([] { + // one more time, please + return seastar::make_ready_future<std::optional<std::error_code>>( + std::optional<std::error_code>{std::nullopt}); + }); + } else { + logger().info("{} was unable to bind after {} attempts: {}", + *this, local_conf()->ms_bind_retry_count, error); + return seastar::make_ready_future<std::optional<std::error_code>>( + std::make_optional<std::error_code>(error)); + } + })); + }).then([] (const std::error_code error) -> bind_ertr::future<> { + if (!error) { + return bind_ertr::now(); // success! + } else if (error == std::errc::address_in_use) { + return crimson::ct_error::address_in_use::make(); + } else if (error == std::errc::address_not_available) { + return crimson::ct_error::address_not_available::make(); + } + ceph_abort(); + }); + }); +} + +seastar::future<> SocketMessenger::accept( + SocketFRef &&socket, const entity_addr_t &peer_addr) +{ + assert(seastar::this_shard_id() == sid); + SocketConnectionRef conn = + seastar::make_shared<SocketConnection>(*this, dispatchers); + conn->start_accept(std::move(socket), peer_addr); + return seastar::now(); +} + +seastar::future<> SocketMessenger::start( + const dispatchers_t& _dispatchers) { + assert(seastar::this_shard_id() == sid); + + dispatchers.assign(_dispatchers); + if (listener) { + // make sure we have already bound to a valid address + ceph_assert(get_myaddr().is_msgr2()); + ceph_assert(get_myaddr().get_port() > 0); + + return listener->accept([this](SocketRef _socket, entity_addr_t peer_addr) { + assert(get_myaddr().is_msgr2()); + SocketFRef socket = seastar::make_foreign(std::move(_socket)); + if (listener->is_fixed_shard_dispatching()) { + return accept(std::move(socket), peer_addr); + } else { + return seastar::smp::submit_to(sid, + [this, peer_addr, socket = std::move(socket)]() mutable { + return accept(std::move(socket), peer_addr); + }); + } + }); + } + return seastar::now(); +} + +crimson::net::ConnectionRef +SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name) +{ + assert(seastar::this_shard_id() == sid); + + // make sure we connect to a valid peer_addr + if (!peer_addr.is_msgr2()) { + ceph_abort_msg("ProtocolV1 is no longer supported"); + } + ceph_assert(peer_addr.get_port() > 0); + + if (auto found = lookup_conn(peer_addr); found) { + logger().debug("{} connect to existing", *found); + return found->get_local_shared_foreign_from_this(); + } + SocketConnectionRef conn = + seastar::make_shared<SocketConnection>(*this, dispatchers); + conn->start_connect(peer_addr, peer_name); + return conn->get_local_shared_foreign_from_this(); +} + +seastar::future<> SocketMessenger::shutdown() +{ + assert(seastar::this_shard_id() == sid); + return seastar::futurize_invoke([this] { + assert(dispatchers.empty()); + if (listener) { + auto d_listener = listener; + listener = nullptr; + return d_listener->shutdown_destroy(); + } else { + return seastar::now(); + } + // close all connections + }).then([this] { + return seastar::parallel_for_each(accepting_conns, [] (auto conn) { + return conn->close_clean_yielded(); + }); + }).then([this] { + ceph_assert(accepting_conns.empty()); + return seastar::parallel_for_each(connections, [] (auto conn) { + return conn.second->close_clean_yielded(); + }); + }).then([this] { + return seastar::parallel_for_each(closing_conns, [] (auto conn) { + return conn->close_clean_yielded(); + }); + }).then([this] { + ceph_assert(connections.empty()); + shutdown_promise.set_value(); + }); +} + +static entity_addr_t choose_addr( + const entity_addr_t &peer_addr_for_me, + const SocketConnection& conn) +{ + using crimson::common::local_conf; + // XXX: a syscall is here + if (const auto local_addr = conn.get_local_address(); + local_conf()->ms_learn_addr_from_peer) { + logger().info("{} peer {} says I am {} (socket says {})", + conn, conn.get_peer_socket_addr(), peer_addr_for_me, + local_addr); + return peer_addr_for_me; + } else { + const auto local_addr_for_me = conn.get_local_address(); + logger().info("{} socket to {} says I am {} (peer says {})", + conn, conn.get_peer_socket_addr(), + local_addr, peer_addr_for_me); + entity_addr_t addr; + addr.set_sockaddr(&local_addr_for_me.as_posix_sockaddr()); + return addr; + } +} + +void SocketMessenger::learned_addr( + const entity_addr_t &peer_addr_for_me, + const SocketConnection& conn) +{ + assert(seastar::this_shard_id() == sid); + if (!need_addr) { + if ((!get_myaddr().is_any() && + get_myaddr().get_type() != peer_addr_for_me.get_type()) || + get_myaddr().get_family() != peer_addr_for_me.get_family() || + !get_myaddr().is_same_host(peer_addr_for_me)) { + logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}", + conn, peer_addr_for_me, get_myaddr()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + return; + } + + if (get_myaddr().get_type() == entity_addr_t::TYPE_NONE) { + // Not bound + auto addr = choose_addr(peer_addr_for_me, conn); + addr.set_type(entity_addr_t::TYPE_ANY); + addr.set_port(0); + need_addr = false; + set_myaddrs(entity_addrvec_t{addr}); + logger().info("{} learned myaddr={} (unbound)", conn, get_myaddr()); + } else { + // Already bound + if (!get_myaddr().is_any() && + get_myaddr().get_type() != peer_addr_for_me.get_type()) { + logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}", + conn, peer_addr_for_me, get_myaddr()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + if (get_myaddr().get_family() != peer_addr_for_me.get_family()) { + logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}", + conn, peer_addr_for_me, get_myaddr()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + if (get_myaddr().is_blank_ip()) { + auto addr = choose_addr(peer_addr_for_me, conn); + addr.set_type(get_myaddr().get_type()); + addr.set_port(get_myaddr().get_port()); + need_addr = false; + set_myaddrs(entity_addrvec_t{addr}); + logger().info("{} learned myaddr={} (blank IP)", conn, get_myaddr()); + } else if (!get_myaddr().is_same_host(peer_addr_for_me)) { + logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}", + conn, peer_addr_for_me, get_myaddr()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } else { + need_addr = false; + } + } +} + +SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const +{ + assert(seastar::this_shard_id() == sid); + return policy_set.get(peer_type); +} + +SocketPolicy SocketMessenger::get_default_policy() const +{ + assert(seastar::this_shard_id() == sid); + return policy_set.get_default(); +} + +void SocketMessenger::set_default_policy(const SocketPolicy& p) +{ + assert(seastar::this_shard_id() == sid); + policy_set.set_default(p); +} + +void SocketMessenger::set_policy(entity_type_t peer_type, + const SocketPolicy& p) +{ + assert(seastar::this_shard_id() == sid); + policy_set.set(peer_type, p); +} + +void SocketMessenger::set_policy_throttler(entity_type_t peer_type, + Throttle* throttle) +{ + assert(seastar::this_shard_id() == sid); + // only byte throttler is used in OSD + policy_set.set_throttlers(peer_type, throttle, nullptr); +} + +crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) +{ + assert(seastar::this_shard_id() == sid); + if (auto found = connections.find(addr); + found != connections.end()) { + return found->second; + } else { + return nullptr; + } +} + +void SocketMessenger::accept_conn(SocketConnectionRef conn) +{ + assert(seastar::this_shard_id() == sid); + accepting_conns.insert(conn); +} + +void SocketMessenger::unaccept_conn(SocketConnectionRef conn) +{ + assert(seastar::this_shard_id() == sid); + accepting_conns.erase(conn); +} + +void SocketMessenger::register_conn(SocketConnectionRef conn) +{ + assert(seastar::this_shard_id() == sid); + auto [i, added] = connections.emplace(conn->get_peer_addr(), conn); + std::ignore = i; + ceph_assert(added); +} + +void SocketMessenger::unregister_conn(SocketConnectionRef conn) +{ + assert(seastar::this_shard_id() == sid); + ceph_assert(conn); + auto found = connections.find(conn->get_peer_addr()); + ceph_assert(found != connections.end()); + ceph_assert(found->second == conn); + connections.erase(found); +} + +void SocketMessenger::closing_conn(SocketConnectionRef conn) +{ + assert(seastar::this_shard_id() == sid); + closing_conns.push_back(conn); +} + +void SocketMessenger::closed_conn(SocketConnectionRef conn) +{ + assert(seastar::this_shard_id() == sid); + for (auto it = closing_conns.begin(); + it != closing_conns.end();) { + if (*it == conn) { + it = closing_conns.erase(it); + } else { + it++; + } + } +} + +uint32_t SocketMessenger::get_global_seq(uint32_t old) +{ + assert(seastar::this_shard_id() == sid); + if (old > global_seq) { + global_seq = old; + } + return ++global_seq; +} + +} // namespace crimson::net diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h new file mode 100644 index 000000000..e4ac63184 --- /dev/null +++ b/src/crimson/net/SocketMessenger.h @@ -0,0 +1,192 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <map> +#include <set> +#include <vector> +#include <seastar/core/gate.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/sharded.hh> +#include <seastar/core/shared_future.hh> + +#include "crimson/net/chained_dispatchers.h" +#include "Messenger.h" +#include "Socket.h" +#include "SocketConnection.h" + +namespace crimson::net { + +class ShardedServerSocket; + +class SocketMessenger final : public Messenger { +// Messenger public interfaces +public: + SocketMessenger(const entity_name_t& myname, + const std::string& logic_name, + uint32_t nonce, + bool dispatch_only_on_this_shard); + + ~SocketMessenger() override; + + const entity_name_t &get_myname() const override { + return my_name; + } + + const entity_addrvec_t &get_myaddrs() const override { + return my_addrs; + } + + void set_myaddrs(const entity_addrvec_t& addr) override; + + bool set_addr_unknowns(const entity_addrvec_t &addr) override; + + void set_auth_client(crimson::auth::AuthClient *ac) override { + assert(seastar::this_shard_id() == sid); + auth_client = ac; + } + + void set_auth_server(crimson::auth::AuthServer *as) override { + assert(seastar::this_shard_id() == sid); + auth_server = as; + } + + bind_ertr::future<> bind(const entity_addrvec_t& addr) override; + + seastar::future<> start(const dispatchers_t& dispatchers) override; + + ConnectionRef connect(const entity_addr_t& peer_addr, + const entity_name_t& peer_name) override; + + bool owns_connection(Connection &conn) const override { + assert(seastar::this_shard_id() == sid); + return this == &static_cast<SocketConnection&>(conn).get_messenger(); + } + + // can only wait once + seastar::future<> wait() override { + assert(seastar::this_shard_id() == sid); + return shutdown_promise.get_future(); + } + + void stop() override { + assert(seastar::this_shard_id() == sid); + dispatchers.clear(); + } + + bool is_started() const override { + assert(seastar::this_shard_id() == sid); + return !dispatchers.empty(); + } + + seastar::future<> shutdown() override; + + void print(std::ostream& out) const override { + out << get_myname() + << "(" << logic_name + << ") " << get_myaddr(); + } + + SocketPolicy get_policy(entity_type_t peer_type) const override; + + SocketPolicy get_default_policy() const override; + + void set_default_policy(const SocketPolicy& p) override; + + void set_policy(entity_type_t peer_type, const SocketPolicy& p) override; + + void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override; + +// SocketMessenger public interfaces +public: + crimson::auth::AuthClient* get_auth_client() const { + assert(seastar::this_shard_id() == sid); + return auth_client; + } + + crimson::auth::AuthServer* get_auth_server() const { + assert(seastar::this_shard_id() == sid); + return auth_server; + } + + uint32_t get_global_seq(uint32_t old=0); + + void learned_addr(const entity_addr_t &peer_addr_for_me, + const SocketConnection& conn); + + SocketConnectionRef lookup_conn(const entity_addr_t& addr); + + void accept_conn(SocketConnectionRef); + + void unaccept_conn(SocketConnectionRef); + + void register_conn(SocketConnectionRef); + + void unregister_conn(SocketConnectionRef); + + void closing_conn(SocketConnectionRef); + + void closed_conn(SocketConnectionRef); + + seastar::shard_id get_shard_id() const { + return sid; + } + +#ifdef UNIT_TESTS_BUILT + void set_interceptor(Interceptor *i) override { + interceptor = i; + } + + Interceptor *interceptor = nullptr; +#endif + +private: + seastar::future<> accept(SocketFRef &&, const entity_addr_t &); + + listen_ertr::future<> do_listen(const entity_addrvec_t& addr); + + /// try to bind to the first unused port of given address + bind_ertr::future<> try_bind(const entity_addrvec_t& addr, + uint32_t min_port, uint32_t max_port); + + const seastar::shard_id sid; + // Distinguish messengers with meaningful names for debugging + const std::string logic_name; + const uint32_t nonce; + const bool dispatch_only_on_sid; + + entity_name_t my_name; + entity_addrvec_t my_addrs; + crimson::auth::AuthClient* auth_client = nullptr; + crimson::auth::AuthServer* auth_server = nullptr; + + ShardedServerSocket *listener = nullptr; + ChainedDispatchers dispatchers; + std::map<entity_addr_t, SocketConnectionRef> connections; + std::set<SocketConnectionRef> accepting_conns; + std::vector<SocketConnectionRef> closing_conns; + ceph::net::PolicySet<Throttle> policy_set; + // specifying we haven't learned our addr; set false when we find it. + bool need_addr = true; + uint32_t global_seq = 0; + bool started = false; + seastar::promise<> shutdown_promise; +}; + +} // namespace crimson::net + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::net::SocketMessenger> : fmt::ostream_formatter {}; +#endif diff --git a/src/crimson/net/chained_dispatchers.cc b/src/crimson/net/chained_dispatchers.cc new file mode 100644 index 000000000..1e4af3baa --- /dev/null +++ b/src/crimson/net/chained_dispatchers.cc @@ -0,0 +1,114 @@ +#include "crimson/common/log.h" +#include "crimson/net/chained_dispatchers.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Dispatcher.h" +#include "msg/Message.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); + } +} + +namespace crimson::net { + +seastar::future<> +ChainedDispatchers::ms_dispatch(ConnectionRef conn, + MessageRef m) { + try { + for (auto& dispatcher : dispatchers) { + auto dispatched = dispatcher->ms_dispatch(conn, m); + if (dispatched.has_value()) { + return std::move(*dispatched + ).handle_exception([conn] (std::exception_ptr eptr) { + logger().error("{} got unexpected exception in ms_dispatch() throttling {}", + *conn, eptr); + ceph_abort(); + }); + } + } + } catch (...) { + logger().error("{} got unexpected exception in ms_dispatch() {}", + *conn, std::current_exception()); + ceph_abort(); + } + if (!dispatchers.empty()) { + logger().error("ms_dispatch unhandled message {}", *m); + } + return seastar::now(); +} + +void +ChainedDispatchers::ms_handle_shard_change( + ConnectionRef conn, + seastar::shard_id new_shard, + bool ac) { + try { + for (auto& dispatcher : dispatchers) { + dispatcher->ms_handle_shard_change(conn, new_shard, ac); + } + } catch (...) { + logger().error("{} got unexpected exception in ms_handle_shard_change() {}", + *conn, std::current_exception()); + ceph_abort(); + } +} + +void +ChainedDispatchers::ms_handle_accept( + ConnectionRef conn, + seastar::shard_id prv_shard, + bool is_replace) { + try { + for (auto& dispatcher : dispatchers) { + dispatcher->ms_handle_accept(conn, prv_shard, is_replace); + } + } catch (...) { + logger().error("{} got unexpected exception in ms_handle_accept() {}", + *conn, std::current_exception()); + ceph_abort(); + } +} + +void +ChainedDispatchers::ms_handle_connect( + ConnectionRef conn, + seastar::shard_id prv_shard) { + try { + for(auto& dispatcher : dispatchers) { + dispatcher->ms_handle_connect(conn, prv_shard); + } + } catch (...) { + logger().error("{} got unexpected exception in ms_handle_connect() {}", + *conn, std::current_exception()); + ceph_abort(); + } +} + +void +ChainedDispatchers::ms_handle_reset(ConnectionRef conn, bool is_replace) { + try { + for (auto& dispatcher : dispatchers) { + dispatcher->ms_handle_reset(conn, is_replace); + } + } catch (...) { + logger().error("{} got unexpected exception in ms_handle_reset() {}", + *conn, std::current_exception()); + ceph_abort(); + } +} + +void +ChainedDispatchers::ms_handle_remote_reset(ConnectionRef conn) { + try { + for (auto& dispatcher : dispatchers) { + dispatcher->ms_handle_remote_reset(conn); + } + } catch (...) { + logger().error("{} got unexpected exception in ms_handle_remote_reset() {}", + *conn, std::current_exception()); + ceph_abort(); + } +} + +} diff --git a/src/crimson/net/chained_dispatchers.h b/src/crimson/net/chained_dispatchers.h new file mode 100644 index 000000000..ec085864f --- /dev/null +++ b/src/crimson/net/chained_dispatchers.h @@ -0,0 +1,39 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/smp.hh> + +#include "Fwd.h" +#include "crimson/common/log.h" + +namespace crimson::net { + +class Dispatcher; + +class ChainedDispatchers { +public: + void assign(const dispatchers_t& _dispatchers) { + assert(empty()); + assert(!_dispatchers.empty()); + dispatchers = _dispatchers; + } + void clear() { + dispatchers.clear(); + } + bool empty() const { + return dispatchers.empty(); + } + seastar::future<> ms_dispatch(ConnectionRef, MessageRef); + void ms_handle_shard_change(ConnectionRef, seastar::shard_id, bool); + void ms_handle_accept(ConnectionRef conn, seastar::shard_id, bool is_replace); + void ms_handle_connect(ConnectionRef conn, seastar::shard_id); + void ms_handle_reset(ConnectionRef conn, bool is_replace); + void ms_handle_remote_reset(ConnectionRef conn); + + private: + dispatchers_t dispatchers; +}; + +} diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc new file mode 100644 index 000000000..c414c48e1 --- /dev/null +++ b/src/crimson/net/io_handler.cc @@ -0,0 +1,1287 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "io_handler.h" + +#include "auth/Auth.h" + +#include "crimson/common/formatter.h" +#include "crimson/common/log.h" +#include "crimson/net/Errors.h" +#include "crimson/net/chained_dispatchers.h" +#include "crimson/net/SocketMessenger.h" +#include "msg/Message.h" +#include "msg/msg_fmt.h" + +using namespace ceph::msgr::v2; +using crimson::common::local_conf; + +namespace { + +seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); +} + +[[noreturn]] void abort_in_fault() { + throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); +} + +[[noreturn]] void abort_protocol() { + throw std::system_error(make_error_code(crimson::net::error::protocol_aborted)); +} + +std::size_t get_msg_size(const FrameAssembler &rx_frame_asm) +{ + ceph_assert(rx_frame_asm.get_num_segments() > 0); + size_t sum = 0; + // we don't include SegmentIndex::Msg::HEADER. + for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) { + sum += rx_frame_asm.get_segment_logical_len(idx); + } + return sum; +} + +} // namespace anonymous + +namespace crimson::net { + +IOHandler::IOHandler(ChainedDispatchers &dispatchers, + SocketConnection &conn) + : shard_states(shard_states_t::create( + seastar::this_shard_id(), io_state_t::none)), + dispatchers(dispatchers), + conn(conn), + conn_ref(conn.get_local_shared_foreign_from_this()) +{} + +IOHandler::~IOHandler() +{ + // close_io() must be finished + ceph_assert_always(maybe_prv_shard_states == nullptr); + // should be true in the according shard + // ceph_assert_always(shard_states->assert_closed_and_exit()); + assert(!conn_ref); +} + +#ifdef UNIT_TESTS_BUILT +IOHandler::sweep_ret +#else +ceph::bufferlist +#endif +IOHandler::sweep_out_pending_msgs_to_sent( + bool require_keepalive, + std::optional<utime_t> maybe_keepalive_ack, + bool require_ack) +{ + std::size_t num_msgs = out_pending_msgs.size(); + ceph::bufferlist bl; + +#ifdef UNIT_TESTS_BUILT + std::vector<Tag> tags; +#endif + + if (unlikely(require_keepalive)) { + auto keepalive_frame = KeepAliveFrame::Encode(); + bl.append(frame_assembler->get_buffer(keepalive_frame)); +#ifdef UNIT_TESTS_BUILT + auto tag = KeepAliveFrame::tag; + tags.push_back(tag); +#endif + } + + if (unlikely(maybe_keepalive_ack.has_value())) { + auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack); + bl.append(frame_assembler->get_buffer(keepalive_ack_frame)); +#ifdef UNIT_TESTS_BUILT + auto tag = KeepAliveFrameAck::tag; + tags.push_back(tag); +#endif + } + + if (require_ack && num_msgs == 0u) { + auto ack_frame = AckFrame::Encode(in_seq); + bl.append(frame_assembler->get_buffer(ack_frame)); +#ifdef UNIT_TESTS_BUILT + auto tag = AckFrame::tag; + tags.push_back(tag); +#endif + } + + std::for_each( + out_pending_msgs.begin(), + out_pending_msgs.begin()+num_msgs, + [this, &bl +#ifdef UNIT_TESTS_BUILT + , &tags +#endif + ](const MessageFRef& msg) { + // set priority + msg->get_header().src = conn.messenger.get_myname(); + + msg->encode(conn.features, 0); + + ceph_assert(!msg->get_seq() && "message already has seq"); + msg->set_seq(++out_seq); + + ceph_msg_header &header = msg->get_header(); + ceph_msg_footer &footer = msg->get_footer(); + + ceph_msg_header2 header2{header.seq, header.tid, + header.type, header.priority, + header.version, + ceph_le32(0), header.data_off, + ceph_le64(in_seq), + footer.flags, header.compat_version, + header.reserved}; + + auto message = MessageFrame::Encode(header2, + msg->get_payload(), msg->get_middle(), msg->get_data()); + logger().debug("{} --> #{} === {} ({})", + conn, msg->get_seq(), *msg, msg->get_type()); + bl.append(frame_assembler->get_buffer(message)); +#ifdef UNIT_TESTS_BUILT + auto tag = MessageFrame::tag; + tags.push_back(tag); +#endif + }); + + if (!conn.policy.lossy) { + out_sent_msgs.insert( + out_sent_msgs.end(), + std::make_move_iterator(out_pending_msgs.begin()), + std::make_move_iterator(out_pending_msgs.end())); + } + out_pending_msgs.clear(); + +#ifdef UNIT_TESTS_BUILT + return sweep_ret{std::move(bl), tags}; +#else + return bl; +#endif +} + +seastar::future<> IOHandler::send(MessageFRef msg) +{ + // sid may be changed on-the-fly during the submission + if (seastar::this_shard_id() == get_shard_id()) { + return do_send(std::move(msg)); + } else { + logger().trace("{} send() is directed to {} -- {}", + conn, get_shard_id(), *msg); + return seastar::smp::submit_to( + get_shard_id(), [this, msg=std::move(msg)]() mutable { + return send_redirected(std::move(msg)); + }); + } +} + +seastar::future<> IOHandler::send_redirected(MessageFRef msg) +{ + // sid may be changed on-the-fly during the submission + if (seastar::this_shard_id() == get_shard_id()) { + return do_send(std::move(msg)); + } else { + logger().debug("{} send() is redirected to {} -- {}", + conn, get_shard_id(), *msg); + return seastar::smp::submit_to( + get_shard_id(), [this, msg=std::move(msg)]() mutable { + return send_redirected(std::move(msg)); + }); + } +} + +seastar::future<> IOHandler::do_send(MessageFRef msg) +{ + assert(seastar::this_shard_id() == get_shard_id()); + logger().trace("{} do_send() got message -- {}", conn, *msg); + if (get_io_state() != io_state_t::drop) { + out_pending_msgs.push_back(std::move(msg)); + notify_out_dispatch(); + } + return seastar::now(); +} + +seastar::future<> IOHandler::send_keepalive() +{ + // sid may be changed on-the-fly during the submission + if (seastar::this_shard_id() == get_shard_id()) { + return do_send_keepalive(); + } else { + logger().trace("{} send_keepalive() is directed to {}", conn, get_shard_id()); + return seastar::smp::submit_to( + get_shard_id(), [this] { + return send_keepalive_redirected(); + }); + } +} + +seastar::future<> IOHandler::send_keepalive_redirected() +{ + // sid may be changed on-the-fly during the submission + if (seastar::this_shard_id() == get_shard_id()) { + return do_send_keepalive(); + } else { + logger().debug("{} send_keepalive() is redirected to {}", conn, get_shard_id()); + return seastar::smp::submit_to( + get_shard_id(), [this] { + return send_keepalive_redirected(); + }); + } +} + +seastar::future<> IOHandler::do_send_keepalive() +{ + assert(seastar::this_shard_id() == get_shard_id()); + logger().trace("{} do_send_keeplive(): need_keepalive={}", conn, need_keepalive); + if (!need_keepalive) { + need_keepalive = true; + notify_out_dispatch(); + } + return seastar::now(); +} + +void IOHandler::mark_down() +{ + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + ceph_assert_always(get_io_state() != io_state_t::none); + need_dispatch_reset = false; + if (get_io_state() == io_state_t::drop) { + return; + } + + auto cc_seq = crosscore.prepare_submit(); + logger().info("{} mark_down() at {}, send {} notify_mark_down()", + conn, io_stat_printer{*this}, cc_seq); + do_set_io_state(io_state_t::drop); + shard_states->dispatch_in_background( + "notify_mark_down", conn, [this, cc_seq] { + return seastar::smp::submit_to( + conn.get_messenger_shard_id(), [this, cc_seq] { + return handshake_listener->notify_mark_down(cc_seq); + }); + }); +} + +void IOHandler::print_io_stat(std::ostream &out) const +{ + assert(seastar::this_shard_id() == get_shard_id()); + out << "io_stat(" + << "io_state=" << fmt::format("{}", get_io_state()) + << ", in_seq=" << in_seq + << ", out_seq=" << out_seq + << ", out_pending_msgs_size=" << out_pending_msgs.size() + << ", out_sent_msgs_size=" << out_sent_msgs.size() + << ", need_ack=" << (ack_left > 0) + << ", need_keepalive=" << need_keepalive + << ", need_keepalive_ack=" << bool(next_keepalive_ack) + << ")"; +} + +void IOHandler::assign_frame_assembler(FrameAssemblerV2Ref fa) +{ + assert(fa != nullptr); + ceph_assert_always(frame_assembler == nullptr); + frame_assembler = std::move(fa); + ceph_assert_always( + frame_assembler->get_shard_id() == get_shard_id()); + // should have been set through dispatch_accept/connect() + ceph_assert_always( + frame_assembler->get_socket_shard_id() == get_shard_id()); + ceph_assert_always(frame_assembler->is_socket_valid()); +} + +void IOHandler::do_set_io_state( + io_state_t new_state, + std::optional<crosscore_t::seq_t> cc_seq, + FrameAssemblerV2Ref fa, + bool set_notify_out) +{ + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + auto prv_state = get_io_state(); + logger().debug("{} got {}do_set_io_state(): prv_state={}, new_state={}, " + "fa={}, set_notify_out={}, at {}", + conn, + cc_seq.has_value() ? fmt::format("{} ", *cc_seq) : "", + prv_state, new_state, + fa ? "present" : "N/A", set_notify_out, + io_stat_printer{*this}); + ceph_assert_always(!( + (new_state == io_state_t::none && prv_state != io_state_t::none) || + (new_state == io_state_t::open && prv_state == io_state_t::open) + )); + + if (prv_state == io_state_t::drop) { + // only possible due to a racing mark_down() from user + if (new_state == io_state_t::open) { + assign_frame_assembler(std::move(fa)); + frame_assembler->shutdown_socket<false>(nullptr); + } else { + assert(fa == nullptr); + } + return; + } + + bool dispatch_in = false; + if (new_state == io_state_t::open) { + // to open + ceph_assert_always(protocol_is_connected == true); + assign_frame_assembler(std::move(fa)); + dispatch_in = true; + } else if (prv_state == io_state_t::open) { + // from open + ceph_assert_always(protocol_is_connected == true); + protocol_is_connected = false; + assert(fa == nullptr); + ceph_assert_always(frame_assembler->is_socket_valid()); + frame_assembler->shutdown_socket<false>(nullptr); + } else { + assert(fa == nullptr); + } + + if (new_state == io_state_t::delay) { + need_notify_out = set_notify_out; + if (need_notify_out) { + maybe_notify_out_dispatch(); + } + } else { + assert(set_notify_out == false); + need_notify_out = false; + } + + // FIXME: simplify and drop the prv_state == new_state case + if (prv_state != new_state) { + shard_states->set_io_state(new_state); + } + + /* + * not atomic below + */ + + if (dispatch_in) { + do_in_dispatch(); + } +} + +seastar::future<> IOHandler::set_io_state( + crosscore_t::seq_t cc_seq, + io_state_t new_state, + FrameAssemblerV2Ref fa, + bool set_notify_out) +{ + assert(seastar::this_shard_id() == get_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} set_io_state(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq, new_state, + fa=std::move(fa), set_notify_out]() mutable { + return set_io_state(cc_seq, new_state, std::move(fa), set_notify_out); + }); + } + + do_set_io_state(new_state, cc_seq, std::move(fa), set_notify_out); + return seastar::now(); +} + +seastar::future<IOHandler::exit_dispatching_ret> +IOHandler::wait_io_exit_dispatching( + crosscore_t::seq_t cc_seq) +{ + assert(seastar::this_shard_id() == get_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} wait_io_exit_dispatching(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq] { + return wait_io_exit_dispatching(cc_seq); + }); + } + + logger().debug("{} got {} wait_io_exit_dispatching()", + conn, cc_seq); + ceph_assert_always(get_io_state() != io_state_t::open); + ceph_assert_always(frame_assembler != nullptr); + ceph_assert_always(!frame_assembler->is_socket_valid()); + return seastar::futurize_invoke([this] { + // cannot be running in parallel with to_new_sid() + if (maybe_dropped_sid.has_value()) { + ceph_assert_always(get_io_state() == io_state_t::drop); + assert(shard_states->assert_closed_and_exit()); + auto prv_sid = *maybe_dropped_sid; + return seastar::smp::submit_to(prv_sid, [this] { + logger().debug("{} got wait_io_exit_dispatching from prv_sid", conn); + assert(maybe_prv_shard_states != nullptr); + return maybe_prv_shard_states->wait_io_exit_dispatching(); + }); + } else { + return shard_states->wait_io_exit_dispatching(); + } + }).then([this] { + logger().debug("{} finish wait_io_exit_dispatching at {}", + conn, io_stat_printer{*this}); + ceph_assert_always(frame_assembler != nullptr); + ceph_assert_always(!frame_assembler->is_socket_valid()); + frame_assembler->set_shard_id(conn.get_messenger_shard_id()); + return exit_dispatching_ret{ + std::move(frame_assembler), + get_states()}; + }); +} + +seastar::future<> IOHandler::reset_session( + crosscore_t::seq_t cc_seq, + bool full) +{ + assert(seastar::this_shard_id() == get_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} reset_session(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq, full] { + return reset_session(cc_seq, full); + }); + } + + logger().debug("{} got {} reset_session({})", + conn, cc_seq, full); + assert(get_io_state() != io_state_t::open); + reset_in(); + if (full) { + reset_out(); + dispatch_remote_reset(); + } + return seastar::now(); +} + +seastar::future<> IOHandler::reset_peer_state( + crosscore_t::seq_t cc_seq) +{ + assert(seastar::this_shard_id() == get_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} reset_peer_state(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq] { + return reset_peer_state(cc_seq); + }); + } + + logger().debug("{} got {} reset_peer_state()", + conn, cc_seq); + assert(get_io_state() != io_state_t::open); + reset_in(); + do_requeue_out_sent_up_to(0); + discard_out_sent(); + return seastar::now(); +} + +seastar::future<> IOHandler::requeue_out_sent( + crosscore_t::seq_t cc_seq) +{ + assert(seastar::this_shard_id() == get_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} requeue_out_sent(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq] { + return requeue_out_sent(cc_seq); + }); + } + + logger().debug("{} got {} requeue_out_sent()", + conn, cc_seq); + do_requeue_out_sent(); + return seastar::now(); +} + +void IOHandler::do_requeue_out_sent() +{ + assert(get_io_state() != io_state_t::open); + if (out_sent_msgs.empty()) { + return; + } + + out_seq -= out_sent_msgs.size(); + logger().debug("{} requeue {} items, revert out_seq to {}", + conn, out_sent_msgs.size(), out_seq); + for (MessageFRef& msg : out_sent_msgs) { + msg->clear_payload(); + msg->set_seq(0); + } + out_pending_msgs.insert( + out_pending_msgs.begin(), + std::make_move_iterator(out_sent_msgs.begin()), + std::make_move_iterator(out_sent_msgs.end())); + out_sent_msgs.clear(); + maybe_notify_out_dispatch(); +} + +seastar::future<> IOHandler::requeue_out_sent_up_to( + crosscore_t::seq_t cc_seq, + seq_num_t msg_seq) +{ + assert(seastar::this_shard_id() == get_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} requeue_out_sent_up_to(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq, msg_seq] { + return requeue_out_sent_up_to(cc_seq, msg_seq); + }); + } + + logger().debug("{} got {} requeue_out_sent_up_to({})", + conn, cc_seq, msg_seq); + do_requeue_out_sent_up_to(msg_seq); + return seastar::now(); +} + +void IOHandler::do_requeue_out_sent_up_to(seq_num_t seq) +{ + assert(get_io_state() != io_state_t::open); + if (out_sent_msgs.empty() && out_pending_msgs.empty()) { + logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}", + conn, out_seq, seq); + out_seq = seq; + return; + } + logger().debug("{} discarding sent msgs by seq {} (sent_len={}, out_seq={})", + conn, seq, out_sent_msgs.size(), out_seq); + while (!out_sent_msgs.empty()) { + auto cur_seq = out_sent_msgs.front()->get_seq(); + if (cur_seq == 0 || cur_seq > seq) { + break; + } else { + out_sent_msgs.pop_front(); + } + } + do_requeue_out_sent(); +} + +void IOHandler::reset_in() +{ + assert(get_io_state() != io_state_t::open); + in_seq = 0; +} + +void IOHandler::reset_out() +{ + assert(get_io_state() != io_state_t::open); + discard_out_sent(); + out_pending_msgs.clear(); + need_keepalive = false; + next_keepalive_ack = std::nullopt; + ack_left = 0; +} + +void IOHandler::discard_out_sent() +{ + assert(get_io_state() != io_state_t::open); + out_seq = 0; + out_sent_msgs.clear(); +} + +seastar::future<> +IOHandler::dispatch_accept( + crosscore_t::seq_t cc_seq, + seastar::shard_id new_sid, + ConnectionFRef conn_fref, + bool is_replace) +{ + return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace); +} + +seastar::future<> +IOHandler::dispatch_connect( + crosscore_t::seq_t cc_seq, + seastar::shard_id new_sid, + ConnectionFRef conn_fref) +{ + return to_new_sid(cc_seq, new_sid, std::move(conn_fref), std::nullopt); +} + +seastar::future<> +IOHandler::cleanup_prv_shard(seastar::shard_id prv_sid) +{ + assert(seastar::this_shard_id() == get_shard_id()); + return seastar::smp::submit_to(prv_sid, [this] { + logger().debug("{} got cleanup_prv_shard()", conn); + assert(maybe_prv_shard_states != nullptr); + auto ref_prv_states = std::move(maybe_prv_shard_states); + auto &prv_states = *ref_prv_states; + return prv_states.close( + ).then([ref_prv_states=std::move(ref_prv_states)] { + ceph_assert_always(ref_prv_states->assert_closed_and_exit()); + }); + }).then([this] { + ceph_assert_always(maybe_prv_shard_states == nullptr); + }); +} + +seastar::future<> +IOHandler::to_new_sid( + crosscore_t::seq_t cc_seq, + seastar::shard_id new_sid, + ConnectionFRef conn_fref, + std::optional<bool> is_replace) +{ + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} to_new_sid(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq, new_sid, is_replace, + conn_fref=std::move(conn_fref)]() mutable { + return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace); + }); + } + + bool is_accept_or_connect = is_replace.has_value(); + logger().debug("{} got {} to_new_sid_1(new_sid={}, {}) at {}", + conn, cc_seq, new_sid, + fmt::format("{}", + is_accept_or_connect ? + (*is_replace ? "accept(replace)" : "accept(!replace)") : + "connect"), + io_stat_printer{*this}); + auto next_cc_seq = ++cc_seq; + + if (get_io_state() != io_state_t::drop) { + ceph_assert_always(conn_ref); + if (new_sid != seastar::this_shard_id()) { + dispatchers.ms_handle_shard_change(conn_ref, new_sid, is_accept_or_connect); + // user can make changes + } + } else { + // it is possible that both io_handler and protocolv2 are + // trying to close each other from different cores simultaneously. + assert(!protocol_is_connected); + } + + if (get_io_state() != io_state_t::drop) { + if (is_accept_or_connect) { + // protocol_is_connected can be from true to true here if the replacing is + // happening to a connected connection. + } else { + ceph_assert_always(protocol_is_connected == false); + } + protocol_is_connected = true; + } else { + assert(!protocol_is_connected); + } + + bool is_dropped = false; + if (get_io_state() == io_state_t::drop) { + is_dropped = true; + } + ceph_assert_always(get_io_state() != io_state_t::open); + + // apply the switching atomically + ceph_assert_always(conn_ref); + conn_ref.reset(); + auto prv_sid = get_shard_id(); + ceph_assert_always(maybe_prv_shard_states == nullptr); + maybe_prv_shard_states = std::move(shard_states); + shard_states = shard_states_t::create_from_previous( + *maybe_prv_shard_states, new_sid); + assert(new_sid == get_shard_id()); + + return seastar::smp::submit_to(new_sid, + [this, next_cc_seq, is_dropped, prv_sid, is_replace, conn_fref=std::move(conn_fref)]() mutable { + logger().debug("{} got {} to_new_sid_2(prv_sid={}, is_dropped={}, {}) at {}", + conn, next_cc_seq, prv_sid, is_dropped, + fmt::format("{}", + is_replace.has_value() ? + (*is_replace ? "accept(replace)" : "accept(!replace)") : + "connect"), + io_stat_printer{*this}); + + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + ceph_assert_always(get_io_state() != io_state_t::open); + ceph_assert_always(!maybe_dropped_sid.has_value()); + ceph_assert_always(crosscore.proceed_or_wait(next_cc_seq)); + + if (is_dropped) { + ceph_assert_always(get_io_state() == io_state_t::drop); + ceph_assert_always(shard_states->assert_closed_and_exit()); + maybe_dropped_sid = prv_sid; + // cleanup_prv_shard() will be done in a follow-up close_io() + } else { + // possible at io_state_t::drop + + // previous shard is not cleaned, + // but close_io() is responsible to clean up the current shard, + // so cleanup the previous shard here. + shard_states->dispatch_in_background( + "cleanup_prv_sid", conn, [this, prv_sid] { + return cleanup_prv_shard(prv_sid); + }); + maybe_notify_out_dispatch(); + } + + ceph_assert_always(!conn_ref); + // assign even if already dropping + conn_ref = make_local_shared_foreign(std::move(conn_fref)); + + if (get_io_state() != io_state_t::drop) { + if (is_replace.has_value()) { + dispatchers.ms_handle_accept(conn_ref, prv_sid, *is_replace); + } else { + dispatchers.ms_handle_connect(conn_ref, prv_sid); + } + // user can make changes + } + }); +} + +seastar::future<> IOHandler::set_accepted_sid( + crosscore_t::seq_t cc_seq, + seastar::shard_id sid, + ConnectionFRef conn_fref) +{ + assert(seastar::this_shard_id() == get_shard_id()); + assert(get_io_state() == io_state_t::none); + ceph_assert_always(conn_ref); + conn_ref.reset(); + assert(maybe_prv_shard_states == nullptr); + shard_states.reset(); + shard_states = shard_states_t::create(sid, io_state_t::none); + return seastar::smp::submit_to(sid, + [this, cc_seq, conn_fref=std::move(conn_fref)]() mutable { + // must be the first to proceed + ceph_assert_always(crosscore.proceed_or_wait(cc_seq)); + + logger().debug("{} set accepted sid", conn); + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + ceph_assert_always(get_io_state() == io_state_t::none); + assert(maybe_prv_shard_states == nullptr); + ceph_assert_always(!conn_ref); + conn_ref = make_local_shared_foreign(std::move(conn_fref)); + }); +} + +void IOHandler::dispatch_reset(bool is_replace) +{ + ceph_assert_always(get_io_state() == io_state_t::drop); + if (!need_dispatch_reset) { + return; + } + need_dispatch_reset = false; + ceph_assert_always(conn_ref); + + dispatchers.ms_handle_reset(conn_ref, is_replace); + // user can make changes +} + +void IOHandler::dispatch_remote_reset() +{ + if (get_io_state() == io_state_t::drop) { + return; + } + ceph_assert_always(conn_ref); + + dispatchers.ms_handle_remote_reset(conn_ref); + // user can make changes +} + +void IOHandler::ack_out_sent(seq_num_t seq) +{ + if (conn.policy.lossy) { // lossy connections don't keep sent messages + return; + } + while (!out_sent_msgs.empty() && + out_sent_msgs.front()->get_seq() <= seq) { + logger().trace("{} got ack seq {} >= {}, pop {}", + conn, seq, out_sent_msgs.front()->get_seq(), + *out_sent_msgs.front()); + out_sent_msgs.pop_front(); + } +} + +seastar::future<> +IOHandler::do_out_dispatch(shard_states_t &ctx) +{ + return seastar::repeat([this, &ctx] { + switch (ctx.get_io_state()) { + case io_state_t::open: { + if (unlikely(!is_out_queued())) { + // try exit open dispatching + return frame_assembler->flush<false>( + ).then([this, &ctx] { + if (ctx.get_io_state() != io_state_t::open || is_out_queued()) { + return seastar::make_ready_future<stop_t>(stop_t::no); + } + // still nothing pending to send after flush, + // open dispatching can ONLY stop now + ctx.exit_out_dispatching("exit-open", conn); + return seastar::make_ready_future<stop_t>(stop_t::yes); + }); + } + + auto require_keepalive = need_keepalive; + need_keepalive = false; + auto maybe_keepalive_ack = next_keepalive_ack; + next_keepalive_ack = std::nullopt; + auto to_ack = ack_left; + assert(to_ack == 0 || in_seq > 0); + ack_left = 0; +#ifdef UNIT_TESTS_BUILT + auto ret = sweep_out_pending_msgs_to_sent( + require_keepalive, maybe_keepalive_ack, to_ack > 0); + return frame_assembler->intercept_frames(ret.tags, true + ).then([this, bl=std::move(ret.bl)]() mutable { + return frame_assembler->write<false>(std::move(bl)); + } +#else + auto bl = sweep_out_pending_msgs_to_sent( + require_keepalive, maybe_keepalive_ack, to_ack > 0); + return frame_assembler->write<false>(std::move(bl) +#endif + ).then([this, &ctx] { + if (ctx.get_io_state() != io_state_t::open) { + return frame_assembler->flush<false>( + ).then([] { + return seastar::make_ready_future<stop_t>(stop_t::no); + }); + } + + // FIXME: may leak a flush if state is changed after return and before + // the next repeat body. + return seastar::make_ready_future<stop_t>(stop_t::no); + }); + } + case io_state_t::delay: + // delay out dispatching until open + ctx.notify_out_dispatching_stopped("delay...", conn); + return ctx.wait_state_change( + ).then([] { return stop_t::no; }); + case io_state_t::drop: + ctx.exit_out_dispatching("dropped", conn); + return seastar::make_ready_future<stop_t>(stop_t::yes); + case io_state_t::switched: + ctx.exit_out_dispatching("switched", conn); + return seastar::make_ready_future<stop_t>(stop_t::yes); + default: + ceph_abort("impossible"); + } + }).handle_exception_type([this, &ctx](const std::system_error& e) { + auto io_state = ctx.get_io_state(); + if (e.code() != std::errc::broken_pipe && + e.code() != std::errc::connection_reset && + e.code() != error::negotiation_failure) { + logger().error("{} do_out_dispatch(): unexpected error at {} -- {}", + conn, io_state, e.what()); + ceph_abort(); + } + + if (io_state == io_state_t::open) { + auto cc_seq = crosscore.prepare_submit(); + logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, " + "send {} notify_out_fault()", + conn, io_state, io_stat_printer{*this}, e.what(), cc_seq); + std::exception_ptr eptr; + try { + throw e; + } catch(...) { + eptr = std::current_exception(); + } + do_set_io_state(io_state_t::delay); + shard_states->dispatch_in_background( + "notify_out_fault(out)", conn, [this, cc_seq, eptr] { + auto states = get_states(); + return seastar::smp::submit_to( + conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] { + return handshake_listener->notify_out_fault( + cc_seq, "do_out_dispatch", eptr, states); + }); + }); + } else { + if (io_state != io_state_t::switched) { + logger().info("{} do_out_dispatch(): fault at {}, {} -- {}", + conn, io_state, io_stat_printer{*this}, e.what()); + } else { + logger().info("{} do_out_dispatch(): fault at {} -- {}", + conn, io_state, e.what()); + } + } + + return do_out_dispatch(ctx); + }); +} + +void IOHandler::maybe_notify_out_dispatch() +{ + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + if (is_out_queued()) { + notify_out_dispatch(); + } +} + +void IOHandler::notify_out_dispatch() +{ + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + assert(is_out_queued()); + if (need_notify_out) { + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} send {} notify_out()", + conn, cc_seq); + shard_states->dispatch_in_background( + "notify_out", conn, [this, cc_seq] { + return seastar::smp::submit_to( + conn.get_messenger_shard_id(), [this, cc_seq] { + return handshake_listener->notify_out(cc_seq); + }); + }); + } + if (shard_states->try_enter_out_dispatching()) { + shard_states->dispatch_in_background( + "do_out_dispatch", conn, [this] { + return do_out_dispatch(*shard_states); + }); + } +} + +seastar::future<> +IOHandler::read_message( + shard_states_t &ctx, + utime_t throttle_stamp, + std::size_t msg_size) +{ + return frame_assembler->read_frame_payload<false>( + ).then([this, throttle_stamp, msg_size, &ctx](auto payload) { + if (unlikely(ctx.get_io_state() != io_state_t::open)) { + logger().debug("{} triggered {} during read_message()", + conn, ctx.get_io_state()); + abort_protocol(); + } + + utime_t recv_stamp{seastar::lowres_system_clock::now()}; + + // we need to get the size before std::moving segments data + auto msg_frame = MessageFrame::Decode(*payload); + // XXX: paranoid copy just to avoid oops + ceph_msg_header2 current_header = msg_frame.header(); + + logger().trace("{} got {} + {} + {} byte message," + " envelope type={} src={} off={} seq={}", + conn, + msg_frame.front_len(), + msg_frame.middle_len(), + msg_frame.data_len(), + current_header.type, + conn.get_peer_name(), + current_header.data_off, + current_header.seq); + + ceph_msg_header header{current_header.seq, + current_header.tid, + current_header.type, + current_header.priority, + current_header.version, + ceph_le32(msg_frame.front_len()), + ceph_le32(msg_frame.middle_len()), + ceph_le32(msg_frame.data_len()), + current_header.data_off, + conn.get_peer_name(), + current_header.compat_version, + current_header.reserved, + ceph_le32(0)}; + ceph_msg_footer footer{ceph_le32(0), ceph_le32(0), + ceph_le32(0), ceph_le64(0), current_header.flags}; + + Message *message = decode_message(nullptr, 0, header, footer, + msg_frame.front(), msg_frame.middle(), msg_frame.data(), nullptr); + if (!message) { + logger().warn("{} decode message failed", conn); + abort_in_fault(); + } + + // store reservation size in message, so we don't get confused + // by messages entering the dispatch queue through other paths. + message->set_dispatch_throttle_size(msg_size); + + message->set_throttle_stamp(throttle_stamp); + message->set_recv_stamp(recv_stamp); + message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()}); + + // check received seq#. if it is old, drop the message. + // note that incoming messages may skip ahead. this is convenient for the + // client side queueing because messages can't be renumbered, but the (kernel) + // client will occasionally pull a message out of the sent queue to send + // elsewhere. in that case it doesn't matter if we "got" it or not. + uint64_t cur_seq = in_seq; + if (message->get_seq() <= cur_seq) { + logger().error("{} got old message {} <= {} {}, discarding", + conn, message->get_seq(), cur_seq, *message); + if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) && + local_conf()->ms_die_on_old_message) { + ceph_assert(0 == "old msgs despite reconnect_seq feature"); + } + return seastar::now(); + } else if (message->get_seq() > cur_seq + 1) { + logger().error("{} missed message? skipped from seq {} to {}", + conn, cur_seq, message->get_seq()); + if (local_conf()->ms_die_on_skipped_message) { + ceph_assert(0 == "skipped incoming seq"); + } + } + + // note last received message. + in_seq = message->get_seq(); + if (conn.policy.lossy) { + logger().debug("{} <== #{} === {} ({})", + conn, + message->get_seq(), + *message, + message->get_type()); + } else { + logger().debug("{} <== #{},{} === {} ({})", + conn, + message->get_seq(), + current_header.ack_seq, + *message, + message->get_type()); + } + + // notify ack + if (!conn.policy.lossy) { + ++ack_left; + notify_out_dispatch(); + } + + ack_out_sent(current_header.ack_seq); + + // TODO: change MessageRef with seastar::shared_ptr + auto msg_ref = MessageRef{message, false}; + assert(ctx.get_io_state() == io_state_t::open); + assert(get_io_state() == io_state_t::open); + ceph_assert_always(conn_ref); + + // throttle the reading process by the returned future + return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref)); + // user can make changes + }); +} + +void IOHandler::do_in_dispatch() +{ + shard_states->enter_in_dispatching(); + shard_states->dispatch_in_background( + "do_in_dispatch", conn, [this, &ctx=*shard_states] { + return seastar::keep_doing([this, &ctx] { + return frame_assembler->read_main_preamble<false>( + ).then([this, &ctx](auto ret) { + switch (ret.tag) { + case Tag::MESSAGE: { + size_t msg_size = get_msg_size(*ret.rx_frame_asm); + return seastar::futurize_invoke([this] { + // throttle_message() logic + if (!conn.policy.throttler_messages) { + return seastar::now(); + } + // TODO: message throttler + ceph_abort("TODO"); + return seastar::now(); + }).then([this, msg_size] { + // throttle_bytes() logic + if (!conn.policy.throttler_bytes) { + return seastar::now(); + } + if (!msg_size) { + return seastar::now(); + } + logger().trace("{} wants {} bytes from policy throttler {}/{}", + conn, msg_size, + conn.policy.throttler_bytes->get_current(), + conn.policy.throttler_bytes->get_max()); + return conn.policy.throttler_bytes->get(msg_size); + }).then([this, msg_size, &ctx] { + // TODO: throttle_dispatch_queue() logic + utime_t throttle_stamp{seastar::lowres_system_clock::now()}; + return read_message(ctx, throttle_stamp, msg_size); + }); + } + case Tag::ACK: + return frame_assembler->read_frame_payload<false>( + ).then([this](auto payload) { + // handle_message_ack() logic + auto ack = AckFrame::Decode(payload->back()); + logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq()); + ack_out_sent(ack.seq()); + }); + case Tag::KEEPALIVE2: + return frame_assembler->read_frame_payload<false>( + ).then([this](auto payload) { + // handle_keepalive2() logic + auto keepalive_frame = KeepAliveFrame::Decode(payload->back()); + logger().debug("{} GOT KeepAliveFrame: timestamp={}", + conn, keepalive_frame.timestamp()); + // notify keepalive ack + next_keepalive_ack = keepalive_frame.timestamp(); + if (seastar::this_shard_id() == get_shard_id()) { + notify_out_dispatch(); + } + + last_keepalive = seastar::lowres_system_clock::now(); + }); + case Tag::KEEPALIVE2_ACK: + return frame_assembler->read_frame_payload<false>( + ).then([this](auto payload) { + // handle_keepalive2_ack() logic + auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back()); + auto _last_keepalive_ack = + seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()}; + set_last_keepalive_ack(_last_keepalive_ack); + logger().debug("{} GOT KeepAliveFrameAck: timestamp={}", + conn, _last_keepalive_ack); + }); + default: { + logger().warn("{} do_in_dispatch() received unexpected tag: {}", + conn, static_cast<uint32_t>(ret.tag)); + abort_in_fault(); + } + } + }); + }).handle_exception([this, &ctx](std::exception_ptr eptr) { + const char *e_what; + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + e_what = e.what(); + } + + auto io_state = ctx.get_io_state(); + if (io_state == io_state_t::open) { + auto cc_seq = crosscore.prepare_submit(); + logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, " + "send {} notify_out_fault()", + conn, io_state, io_stat_printer{*this}, e_what, cc_seq); + do_set_io_state(io_state_t::delay); + shard_states->dispatch_in_background( + "notify_out_fault(in)", conn, [this, cc_seq, eptr] { + auto states = get_states(); + return seastar::smp::submit_to( + conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] { + return handshake_listener->notify_out_fault( + cc_seq, "do_in_dispatch", eptr, states); + }); + }); + } else { + if (io_state != io_state_t::switched) { + logger().info("{} do_in_dispatch(): fault at {}, {} -- {}", + conn, io_state, io_stat_printer{*this}, e_what); + } else { + logger().info("{} do_in_dispatch(): fault at {} -- {}", + conn, io_state, e_what); + } + } + }).finally([&ctx] { + ctx.exit_in_dispatching(); + }); + }); +} + +seastar::future<> +IOHandler::close_io( + crosscore_t::seq_t cc_seq, + bool is_dispatch_reset, + bool is_replace) +{ + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} close_io(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq, is_dispatch_reset, is_replace] { + return close_io(cc_seq, is_dispatch_reset, is_replace); + }); + } + + logger().debug("{} got {} close_io(reset={}, replace={})", + conn, cc_seq, is_dispatch_reset, is_replace); + ceph_assert_always(get_io_state() == io_state_t::drop); + + if (is_dispatch_reset) { + dispatch_reset(is_replace); + } + + ceph_assert_always(conn_ref); + conn_ref.reset(); + + // cannot be running in parallel with to_new_sid() + if (maybe_dropped_sid.has_value()) { + assert(shard_states->assert_closed_and_exit()); + auto prv_sid = *maybe_dropped_sid; + return cleanup_prv_shard(prv_sid); + } else { + return shard_states->close( + ).then([this] { + assert(shard_states->assert_closed_and_exit()); + }); + } +} + +/* + * IOHandler::shard_states_t + */ + +void +IOHandler::shard_states_t::notify_out_dispatching_stopped( + const char *what, SocketConnection &conn) +{ + assert(seastar::this_shard_id() == sid); + if (unlikely(out_exit_dispatching.has_value())) { + out_exit_dispatching->set_value(); + out_exit_dispatching = std::nullopt; + logger().info("{} do_out_dispatch: stop({}) at {}, set out_exit_dispatching", + conn, what, io_state); + } else { + if (unlikely(io_state != io_state_t::open)) { + logger().info("{} do_out_dispatch: stop({}) at {}, no out_exit_dispatching", + conn, what, io_state); + } + } +} + +seastar::future<> +IOHandler::shard_states_t::wait_io_exit_dispatching() +{ + assert(seastar::this_shard_id() == sid); + assert(io_state != io_state_t::open); + assert(!gate.is_closed()); + return seastar::when_all( + [this] { + if (out_exit_dispatching) { + return out_exit_dispatching->get_future(); + } else { + return seastar::now(); + } + }(), + [this] { + if (in_exit_dispatching) { + return in_exit_dispatching->get_future(); + } else { + return seastar::now(); + } + }() + ).discard_result(); +} + +IOHandler::shard_states_ref_t +IOHandler::shard_states_t::create_from_previous( + shard_states_t &prv_states, + seastar::shard_id new_sid) +{ + auto io_state = prv_states.io_state; + assert(io_state != io_state_t::open); + auto ret = shard_states_t::create(new_sid, io_state); + if (io_state == io_state_t::drop) { + // the new gate should not never be used + auto fut = ret->gate.close(); + ceph_assert_always(fut.available()); + } + prv_states.set_io_state(io_state_t::switched); + return ret; +} + +} // namespace crimson::net diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h new file mode 100644 index 000000000..f53c2ba64 --- /dev/null +++ b/src/crimson/net/io_handler.h @@ -0,0 +1,623 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <vector> + +#include <seastar/core/shared_future.hh> +#include <seastar/util/later.hh> + +#include "crimson/common/gated.h" +#include "Fwd.h" +#include "SocketConnection.h" +#include "FrameAssemblerV2.h" + +namespace crimson::net { + +/** + * crosscore_t + * + * To preserve the event order across cores. + */ +class crosscore_t { +public: + using seq_t = uint64_t; + + crosscore_t() = default; + ~crosscore_t() = default; + + seq_t get_in_seq() const { + return in_seq; + } + + seq_t prepare_submit() { + ++out_seq; + return out_seq; + } + + bool proceed_or_wait(seq_t seq) { + if (seq == in_seq + 1) { + ++in_seq; + if (unlikely(in_pr_wait.has_value())) { + in_pr_wait->set_value(); + in_pr_wait = std::nullopt; + } + return true; + } else { + return false; + } + } + + seastar::future<> wait(seq_t seq) { + assert(seq != in_seq + 1); + if (!in_pr_wait.has_value()) { + in_pr_wait = seastar::shared_promise<>(); + } + return in_pr_wait->get_shared_future(); + } + +private: + seq_t out_seq = 0; + seq_t in_seq = 0; + std::optional<seastar::shared_promise<>> in_pr_wait; +}; + +/** + * io_handler_state + * + * It is required to populate the states from IOHandler to ProtocolV2 + * asynchronously. + */ +struct io_handler_state { + seq_num_t in_seq; + bool is_out_queued; + bool has_out_sent; + + bool is_out_queued_or_sent() const { + return is_out_queued || has_out_sent; + } + + /* + * should be consistent with the accroding interfaces in IOHandler + */ + + void reset_session(bool full) { + in_seq = 0; + if (full) { + is_out_queued = false; + has_out_sent = false; + } + } + + void reset_peer_state() { + in_seq = 0; + is_out_queued = is_out_queued_or_sent(); + has_out_sent = false; + } + + void requeue_out_sent_up_to() { + // noop since the information is insufficient + } + + void requeue_out_sent() { + if (has_out_sent) { + has_out_sent = false; + is_out_queued = true; + } + } +}; + +/** + * HandshakeListener + * + * The interface class for IOHandler to notify the ProtocolV2. + * + * The notifications may be cross-core and must be sent to + * SocketConnection::get_messenger_shard_id() + */ +class HandshakeListener { +public: + virtual ~HandshakeListener() = default; + + HandshakeListener(const HandshakeListener&) = delete; + HandshakeListener(HandshakeListener &&) = delete; + HandshakeListener &operator=(const HandshakeListener &) = delete; + HandshakeListener &operator=(HandshakeListener &&) = delete; + + virtual seastar::future<> notify_out( + crosscore_t::seq_t cc_seq) = 0; + + virtual seastar::future<> notify_out_fault( + crosscore_t::seq_t cc_seq, + const char *where, + std::exception_ptr, + io_handler_state) = 0; + + virtual seastar::future<> notify_mark_down( + crosscore_t::seq_t cc_seq) = 0; + +protected: + HandshakeListener() = default; +}; + +/** + * IOHandler + * + * Implements the message read and write paths after the handshake, and also be + * responsible to dispatch events. It is supposed to be working on the same + * core with the underlying socket and the FrameAssemblerV2 class. + */ +class IOHandler final : public ConnectionHandler { +public: + IOHandler(ChainedDispatchers &, + SocketConnection &); + + ~IOHandler() final; + + IOHandler(const IOHandler &) = delete; + IOHandler(IOHandler &&) = delete; + IOHandler &operator=(const IOHandler &) = delete; + IOHandler &operator=(IOHandler &&) = delete; + +/* + * as ConnectionHandler + */ +public: + seastar::shard_id get_shard_id() const final { + return shard_states->get_shard_id(); + } + + bool is_connected() const final { + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + return protocol_is_connected; + } + + seastar::future<> send(MessageFRef msg) final; + + seastar::future<> send_keepalive() final; + + clock_t::time_point get_last_keepalive() const final { + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + return last_keepalive; + } + + clock_t::time_point get_last_keepalive_ack() const final { + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + return last_keepalive_ack; + } + + void set_last_keepalive_ack(clock_t::time_point when) final { + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + last_keepalive_ack = when; + } + + void mark_down() final; + +/* + * as IOHandler to be called by ProtocolV2 handshake + * + * The calls may be cross-core and asynchronous + */ +public: + /* + * should not be called cross-core + */ + + void set_handshake_listener(HandshakeListener &hl) { + assert(seastar::this_shard_id() == get_shard_id()); + ceph_assert_always(handshake_listener == nullptr); + handshake_listener = &hl; + } + + io_handler_state get_states() const { + // might be called from prv_sid during wait_io_exit_dispatching() + return {in_seq, is_out_queued(), has_out_sent()}; + } + + struct io_stat_printer { + const IOHandler &io_handler; + }; + void print_io_stat(std::ostream &out) const; + + seastar::future<> set_accepted_sid( + crosscore_t::seq_t cc_seq, + seastar::shard_id sid, + ConnectionFRef conn_fref); + + /* + * may be called cross-core + */ + + seastar::future<> close_io( + crosscore_t::seq_t cc_seq, + bool is_dispatch_reset, + bool is_replace); + + /** + * io_state_t + * + * The io_state is changed with the protocol state, to control the + * io behavior accordingly. + */ + enum class io_state_t : uint8_t { + none, // no IO is possible as the connection is not available to the user yet. + delay, // IO is delayed until open. + open, // Dispatch In and Out concurrently. + drop, // Drop IO as the connection is closed. + switched // IO is switched to a different core + // (is moved to maybe_prv_shard_states) + }; + friend class fmt::formatter<io_state_t>; + + seastar::future<> set_io_state( + crosscore_t::seq_t cc_seq, + io_state_t new_state, + FrameAssemblerV2Ref fa, + bool set_notify_out); + + struct exit_dispatching_ret { + FrameAssemblerV2Ref frame_assembler; + io_handler_state io_states; + }; + seastar::future<exit_dispatching_ret> + wait_io_exit_dispatching( + crosscore_t::seq_t cc_seq); + + seastar::future<> reset_session( + crosscore_t::seq_t cc_seq, + bool full); + + seastar::future<> reset_peer_state( + crosscore_t::seq_t cc_seq); + + seastar::future<> requeue_out_sent_up_to( + crosscore_t::seq_t cc_seq, + seq_num_t msg_seq); + + seastar::future<> requeue_out_sent( + crosscore_t::seq_t cc_seq); + + seastar::future<> dispatch_accept( + crosscore_t::seq_t cc_seq, + seastar::shard_id new_sid, + ConnectionFRef, + bool is_replace); + + seastar::future<> dispatch_connect( + crosscore_t::seq_t cc_seq, + seastar::shard_id new_sid, + ConnectionFRef); + + private: + class shard_states_t; + using shard_states_ref_t = std::unique_ptr<shard_states_t>; + + class shard_states_t { + public: + shard_states_t(seastar::shard_id _sid, io_state_t state) + : sid{_sid}, io_state{state} {} + + seastar::shard_id get_shard_id() const { + return sid; + } + + io_state_t get_io_state() const { + assert(seastar::this_shard_id() == sid); + return io_state; + } + + void set_io_state(io_state_t new_state) { + assert(seastar::this_shard_id() == sid); + assert(io_state != new_state); + pr_io_state_changed.set_value(); + pr_io_state_changed = seastar::promise<>(); + if (io_state == io_state_t::open) { + // from open + if (out_dispatching) { + ceph_assert_always(!out_exit_dispatching.has_value()); + out_exit_dispatching = seastar::promise<>(); + } + } + io_state = new_state; + } + + seastar::future<> wait_state_change() { + assert(seastar::this_shard_id() == sid); + return pr_io_state_changed.get_future(); + } + + template <typename Func> + void dispatch_in_background( + const char *what, SocketConnection &who, Func &&func) { + assert(seastar::this_shard_id() == sid); + ceph_assert_always(!gate.is_closed()); + gate.dispatch_in_background(what, who, std::move(func)); + } + + void enter_in_dispatching() { + assert(seastar::this_shard_id() == sid); + assert(io_state == io_state_t::open); + ceph_assert_always(!in_exit_dispatching.has_value()); + in_exit_dispatching = seastar::promise<>(); + } + + void exit_in_dispatching() { + assert(seastar::this_shard_id() == sid); + assert(io_state != io_state_t::open); + ceph_assert_always(in_exit_dispatching.has_value()); + in_exit_dispatching->set_value(); + in_exit_dispatching = std::nullopt; + } + + bool try_enter_out_dispatching() { + assert(seastar::this_shard_id() == sid); + if (out_dispatching) { + // already dispatching out + return false; + } + switch (io_state) { + case io_state_t::open: + [[fallthrough]]; + case io_state_t::delay: + out_dispatching = true; + return true; + case io_state_t::drop: + [[fallthrough]]; + case io_state_t::switched: + // do not dispatch out + return false; + default: + ceph_abort("impossible"); + } + } + + void notify_out_dispatching_stopped( + const char *what, SocketConnection &conn); + + void exit_out_dispatching( + const char *what, SocketConnection &conn) { + assert(seastar::this_shard_id() == sid); + ceph_assert_always(out_dispatching); + out_dispatching = false; + notify_out_dispatching_stopped(what, conn); + } + + seastar::future<> wait_io_exit_dispatching(); + + seastar::future<> close() { + assert(seastar::this_shard_id() == sid); + assert(!gate.is_closed()); + return gate.close(); + } + + bool assert_closed_and_exit() const { + assert(seastar::this_shard_id() == sid); + if (gate.is_closed()) { + ceph_assert_always(io_state == io_state_t::drop || + io_state == io_state_t::switched); + ceph_assert_always(!out_dispatching); + ceph_assert_always(!out_exit_dispatching); + ceph_assert_always(!in_exit_dispatching); + return true; + } else { + return false; + } + } + + static shard_states_ref_t create( + seastar::shard_id sid, io_state_t state) { + return std::make_unique<shard_states_t>(sid, state); + } + + static shard_states_ref_t create_from_previous( + shard_states_t &prv_states, seastar::shard_id new_sid); + + private: + const seastar::shard_id sid; + io_state_t io_state; + + crimson::common::Gated gate; + seastar::promise<> pr_io_state_changed; + bool out_dispatching = false; + std::optional<seastar::promise<>> out_exit_dispatching; + std::optional<seastar::promise<>> in_exit_dispatching; + }; + + void do_set_io_state( + io_state_t new_state, + std::optional<crosscore_t::seq_t> cc_seq = std::nullopt, + FrameAssemblerV2Ref fa = nullptr, + bool set_notify_out = false); + + io_state_t get_io_state() const { + return shard_states->get_io_state(); + } + + void do_requeue_out_sent(); + + void do_requeue_out_sent_up_to(seq_num_t seq); + + void assign_frame_assembler(FrameAssemblerV2Ref); + + seastar::future<> send_redirected(MessageFRef msg); + + seastar::future<> do_send(MessageFRef msg); + + seastar::future<> send_keepalive_redirected(); + + seastar::future<> do_send_keepalive(); + + seastar::future<> to_new_sid( + crosscore_t::seq_t cc_seq, + seastar::shard_id new_sid, + ConnectionFRef, + std::optional<bool> is_replace); + + void dispatch_reset(bool is_replace); + + void dispatch_remote_reset(); + + bool is_out_queued() const { + return (!out_pending_msgs.empty() || + ack_left > 0 || + need_keepalive || + next_keepalive_ack.has_value()); + } + + bool has_out_sent() const { + return !out_sent_msgs.empty(); + } + + void reset_in(); + + void reset_out(); + + void discard_out_sent(); + + seastar::future<> do_out_dispatch(shard_states_t &ctx); + +#ifdef UNIT_TESTS_BUILT + struct sweep_ret { + ceph::bufferlist bl; + std::vector<ceph::msgr::v2::Tag> tags; + }; + sweep_ret +#else + ceph::bufferlist +#endif + sweep_out_pending_msgs_to_sent( + bool require_keepalive, + std::optional<utime_t> maybe_keepalive_ack, + bool require_ack); + + void maybe_notify_out_dispatch(); + + void notify_out_dispatch(); + + void ack_out_sent(seq_num_t seq); + + seastar::future<> read_message( + shard_states_t &ctx, + utime_t throttle_stamp, + std::size_t msg_size); + + void do_in_dispatch(); + + seastar::future<> cleanup_prv_shard(seastar::shard_id prv_sid); + +private: + shard_states_ref_t shard_states; + + crosscore_t crosscore; + + // drop was happening in the previous sid + std::optional<seastar::shard_id> maybe_dropped_sid; + + // the remaining states in the previous sid for cleanup, see to_new_sid() + shard_states_ref_t maybe_prv_shard_states; + + ChainedDispatchers &dispatchers; + + SocketConnection &conn; + + // core local reference for dispatching, valid until reset/close + ConnectionRef conn_ref; + + HandshakeListener *handshake_listener = nullptr; + + FrameAssemblerV2Ref frame_assembler; + + bool protocol_is_connected = false; + + bool need_dispatch_reset = true; + + /* + * out states for writing + */ + + /// the seq num of the last transmitted message + seq_num_t out_seq = 0; + + // messages to be resent after connection gets reset + std::deque<MessageFRef> out_pending_msgs; + + // messages sent, but not yet acked by peer + std::deque<MessageFRef> out_sent_msgs; + + bool need_keepalive = false; + + std::optional<utime_t> next_keepalive_ack = std::nullopt; + + uint64_t ack_left = 0; + + bool need_notify_out = false; + + /* + * in states for reading + */ + + /// the seq num of the last received message + seq_num_t in_seq = 0; + + clock_t::time_point last_keepalive; + + clock_t::time_point last_keepalive_ack; +}; + +inline std::ostream& operator<<( + std::ostream& out, IOHandler::io_stat_printer stat) { + stat.io_handler.print_io_stat(out); + return out; +} + +} // namespace crimson::net + +template <> +struct fmt::formatter<crimson::net::io_handler_state> { + constexpr auto parse(format_parse_context& ctx) { + return ctx.begin(); + } + + template <typename FormatContext> + auto format(crimson::net::io_handler_state state, FormatContext& ctx) { + return fmt::format_to( + ctx.out(), + "io(in_seq={}, is_out_queued={}, has_out_sent={})", + state.in_seq, + state.is_out_queued, + state.has_out_sent); + } +}; + +template <> +struct fmt::formatter<crimson::net::IOHandler::io_state_t> + : fmt::formatter<std::string_view> { + template <typename FormatContext> + auto format(crimson::net::IOHandler::io_state_t state, FormatContext& ctx) { + using enum crimson::net::IOHandler::io_state_t; + std::string_view name; + switch (state) { + case none: + name = "none"; + break; + case delay: + name = "delay"; + break; + case open: + name = "open"; + break; + case drop: + name = "drop"; + break; + case switched: + name = "switched"; + break; + } + return formatter<string_view>::format(name, ctx); + } +}; + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::net::IOHandler::io_stat_printer> : fmt::ostream_formatter {}; +#endif |