author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit     e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree       64f88b554b444a49f656b6c656111a145cbbaa28 /src/crimson/net/io_handler.cc
parent     Initial commit. (diff)
download   ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
           ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2. (upstream/18.2.2)

Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/net/io_handler.cc')
-rw-r--r--  src/crimson/net/io_handler.cc  1287
1 file changed, 1287 insertions(+), 0 deletions(-)
diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc
new file mode 100644
index 000000000..c414c48e1
--- /dev/null
+++ b/src/crimson/net/io_handler.cc
@@ -0,0 +1,1287 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "io_handler.h"
+
+#include "auth/Auth.h"
+
+#include "crimson/common/formatter.h"
+#include "crimson/common/log.h"
+#include "crimson/net/Errors.h"
+#include "crimson/net/chained_dispatchers.h"
+#include "crimson/net/SocketMessenger.h"
+#include "msg/Message.h"
+#include "msg/msg_fmt.h"
+
+using namespace ceph::msgr::v2;
+using crimson::common::local_conf;
+
+namespace {
+
+seastar::logger& logger() {
+  return crimson::get_logger(ceph_subsys_ms);
+}
+
+[[noreturn]] void abort_in_fault() {
+  throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+}
+
+[[noreturn]] void abort_protocol() {
+  throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
+}
+
+std::size_t get_msg_size(const FrameAssembler &rx_frame_asm)
+{
+  ceph_assert(rx_frame_asm.get_num_segments() > 0);
+  size_t sum = 0;
+  // we don't include SegmentIndex::Msg::HEADER.
+  for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
+    sum += rx_frame_asm.get_segment_logical_len(idx);
+  }
+  return sum;
+}
+
+} // namespace anonymous
+
+namespace crimson::net {
+
+IOHandler::IOHandler(ChainedDispatchers &dispatchers,
+                     SocketConnection &conn)
+  : shard_states(shard_states_t::create(
+        seastar::this_shard_id(), io_state_t::none)),
+    dispatchers(dispatchers),
+    conn(conn),
+    conn_ref(conn.get_local_shared_foreign_from_this())
+{}
+
+IOHandler::~IOHandler()
+{
+  // close_io() must be finished
+  ceph_assert_always(maybe_prv_shard_states == nullptr);
+  // should be true in the according shard
+  // ceph_assert_always(shard_states->assert_closed_and_exit());
+  assert(!conn_ref);
+}
+
+#ifdef UNIT_TESTS_BUILT
+IOHandler::sweep_ret
+#else
+ceph::bufferlist
+#endif
+IOHandler::sweep_out_pending_msgs_to_sent(
+  bool require_keepalive,
+  std::optional<utime_t> maybe_keepalive_ack,
+  bool require_ack)
+{
+  std::size_t num_msgs = out_pending_msgs.size();
+  ceph::bufferlist bl;
+
+#ifdef UNIT_TESTS_BUILT
+  std::vector<Tag> tags;
+#endif
+
+  if (unlikely(require_keepalive)) {
+    auto keepalive_frame = KeepAliveFrame::Encode();
+    bl.append(frame_assembler->get_buffer(keepalive_frame));
+#ifdef UNIT_TESTS_BUILT
+    auto tag = KeepAliveFrame::tag;
+    tags.push_back(tag);
+#endif
+  }
+
+  if (unlikely(maybe_keepalive_ack.has_value())) {
+    auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
+    bl.append(frame_assembler->get_buffer(keepalive_ack_frame));
+#ifdef UNIT_TESTS_BUILT
+    auto tag = KeepAliveFrameAck::tag;
+    tags.push_back(tag);
+#endif
+  }
+
+  if (require_ack && num_msgs == 0u) {
+    auto ack_frame = AckFrame::Encode(in_seq);
+    bl.append(frame_assembler->get_buffer(ack_frame));
+#ifdef UNIT_TESTS_BUILT
+    auto tag = AckFrame::tag;
+    tags.push_back(tag);
+#endif
+  }
+
+  std::for_each(
+      out_pending_msgs.begin(),
+      out_pending_msgs.begin()+num_msgs,
+      [this, &bl
+#ifdef UNIT_TESTS_BUILT
+       , &tags
+#endif
+      ](const MessageFRef& msg) {
+    // set priority
+    msg->get_header().src = conn.messenger.get_myname();
+
+    msg->encode(conn.features, 0);
+
+    ceph_assert(!msg->get_seq() && "message already has seq");
+    msg->set_seq(++out_seq);
+
+    ceph_msg_header &header = msg->get_header();
+    ceph_msg_footer &footer = msg->get_footer();
+
+    ceph_msg_header2 header2{header.seq, header.tid,
+                             header.type, header.priority,
+                             header.version,
+                             ceph_le32(0), header.data_off,
+                             ceph_le64(in_seq),
+                             footer.flags, header.compat_version,
+                             header.reserved};
+
+    auto message = MessageFrame::Encode(header2,
+        msg->get_payload(), msg->get_middle(), msg->get_data());
+    logger().debug("{} --> #{} === {} ({})",
+                   conn, msg->get_seq(), *msg, msg->get_type());
+    bl.append(frame_assembler->get_buffer(message));
+#ifdef UNIT_TESTS_BUILT
+    auto tag = MessageFrame::tag;
+    tags.push_back(tag);
+#endif
+  });
+
+  if (!conn.policy.lossy) {
+    out_sent_msgs.insert(
+        out_sent_msgs.end(),
+        std::make_move_iterator(out_pending_msgs.begin()),
+        std::make_move_iterator(out_pending_msgs.end()));
+  }
+  out_pending_msgs.clear();
+
+#ifdef UNIT_TESTS_BUILT
+  return sweep_ret{std::move(bl), tags};
+#else
+  return bl;
+#endif
+}
+
+seastar::future<> IOHandler::send(MessageFRef msg)
+{
+  // sid may be changed on-the-fly during the submission
+  if (seastar::this_shard_id() == get_shard_id()) {
+    return do_send(std::move(msg));
+  } else {
+    logger().trace("{} send() is directed to {} -- {}",
+                   conn, get_shard_id(), *msg);
+    return seastar::smp::submit_to(
+        get_shard_id(), [this, msg=std::move(msg)]() mutable {
+      return send_redirected(std::move(msg));
+    });
+  }
+}
+
+seastar::future<> IOHandler::send_redirected(MessageFRef msg)
+{
+  // sid may be changed on-the-fly during the submission
+  if (seastar::this_shard_id() == get_shard_id()) {
+    return do_send(std::move(msg));
+  } else {
+    logger().debug("{} send() is redirected to {} -- {}",
+                   conn, get_shard_id(), *msg);
+    return seastar::smp::submit_to(
+        get_shard_id(), [this, msg=std::move(msg)]() mutable {
+      return send_redirected(std::move(msg));
+    });
+  }
+}
+
+seastar::future<> IOHandler::do_send(MessageFRef msg)
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  logger().trace("{} do_send() got message -- {}", conn, *msg);
+  if (get_io_state() != io_state_t::drop) {
+    out_pending_msgs.push_back(std::move(msg));
+    notify_out_dispatch();
+  }
+  return seastar::now();
+}
+
+seastar::future<> IOHandler::send_keepalive()
+{
+  // sid may be changed on-the-fly during the submission
+  if (seastar::this_shard_id() == get_shard_id()) {
+    return do_send_keepalive();
+  } else {
+    logger().trace("{} send_keepalive() is directed to {}", conn, get_shard_id());
+    return seastar::smp::submit_to(
+        get_shard_id(), [this] {
+      return send_keepalive_redirected();
+    });
+  }
+}
+
+seastar::future<> IOHandler::send_keepalive_redirected()
+{
+  // sid may be changed on-the-fly during the submission
+  if (seastar::this_shard_id() == get_shard_id()) {
+    return do_send_keepalive();
+  } else {
+    logger().debug("{} send_keepalive() is redirected to {}", conn, get_shard_id());
+    return seastar::smp::submit_to(
+        get_shard_id(), [this] {
+      return send_keepalive_redirected();
+    });
+  }
+}
+
+seastar::future<> IOHandler::do_send_keepalive()
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  logger().trace("{} do_send_keepalive(): need_keepalive={}", conn, need_keepalive);
+  if (!need_keepalive) {
+    need_keepalive = true;
+    notify_out_dispatch();
+  }
+  return seastar::now();
+}
+
+void IOHandler::mark_down()
+{
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  ceph_assert_always(get_io_state() != io_state_t::none);
+  need_dispatch_reset = false;
+  if (get_io_state() == io_state_t::drop) {
+    return;
+  }
+
+  auto cc_seq = crosscore.prepare_submit();
+  logger().info("{} mark_down() at {}, send {} notify_mark_down()",
+                conn, io_stat_printer{*this}, cc_seq);
+  do_set_io_state(io_state_t::drop);
+  shard_states->dispatch_in_background(
+      "notify_mark_down", conn, [this, cc_seq] {
+    return seastar::smp::submit_to(
+        conn.get_messenger_shard_id(), [this, cc_seq] {
+      return handshake_listener->notify_mark_down(cc_seq);
+    });
+  });
+}
+
+void IOHandler::print_io_stat(std::ostream &out) const
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  out << "io_stat("
+      << "io_state=" << fmt::format("{}", get_io_state())
+      << ", in_seq=" << in_seq
+      << ", out_seq=" << out_seq
+      << ", out_pending_msgs_size=" << out_pending_msgs.size()
+      << ", out_sent_msgs_size=" << out_sent_msgs.size()
+      << ", need_ack=" << (ack_left > 0)
+      << ", need_keepalive=" << need_keepalive
+      << ", need_keepalive_ack=" << bool(next_keepalive_ack)
+      << ")";
+}
+
+void IOHandler::assign_frame_assembler(FrameAssemblerV2Ref fa)
+{
+  assert(fa != nullptr);
+  ceph_assert_always(frame_assembler == nullptr);
+  frame_assembler = std::move(fa);
+  ceph_assert_always(
+      frame_assembler->get_shard_id() == get_shard_id());
+  // should have been set through dispatch_accept/connect()
+  ceph_assert_always(
+      frame_assembler->get_socket_shard_id() == get_shard_id());
+  ceph_assert_always(frame_assembler->is_socket_valid());
+}
+
+void IOHandler::do_set_io_state(
+    io_state_t new_state,
+    std::optional<crosscore_t::seq_t> cc_seq,
+    FrameAssemblerV2Ref fa,
+    bool set_notify_out)
+{
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  auto prv_state = get_io_state();
+  logger().debug("{} got {}do_set_io_state(): prv_state={}, new_state={}, "
+                 "fa={}, set_notify_out={}, at {}",
+                 conn,
+                 cc_seq.has_value() ? fmt::format("{} ", *cc_seq) : "",
+                 prv_state, new_state,
+                 fa ? "present" : "N/A", set_notify_out,
+                 io_stat_printer{*this});
+  ceph_assert_always(!(
+    (new_state == io_state_t::none && prv_state != io_state_t::none) ||
+    (new_state == io_state_t::open && prv_state == io_state_t::open)
+  ));
+
+  if (prv_state == io_state_t::drop) {
+    // only possible due to a racing mark_down() from user
+    if (new_state == io_state_t::open) {
+      assign_frame_assembler(std::move(fa));
+      frame_assembler->shutdown_socket<false>(nullptr);
+    } else {
+      assert(fa == nullptr);
+    }
+    return;
+  }
+
+  bool dispatch_in = false;
+  if (new_state == io_state_t::open) {
+    // to open
+    ceph_assert_always(protocol_is_connected == true);
+    assign_frame_assembler(std::move(fa));
+    dispatch_in = true;
+  } else if (prv_state == io_state_t::open) {
+    // from open
+    ceph_assert_always(protocol_is_connected == true);
+    protocol_is_connected = false;
+    assert(fa == nullptr);
+    ceph_assert_always(frame_assembler->is_socket_valid());
+    frame_assembler->shutdown_socket<false>(nullptr);
+  } else {
+    assert(fa == nullptr);
+  }
+
+  if (new_state == io_state_t::delay) {
+    need_notify_out = set_notify_out;
+    if (need_notify_out) {
+      maybe_notify_out_dispatch();
+    }
+  } else {
+    assert(set_notify_out == false);
+    need_notify_out = false;
+  }
+
+  // FIXME: simplify and drop the prv_state == new_state case
+  if (prv_state != new_state) {
+    shard_states->set_io_state(new_state);
+  }
+
+  /*
+   * not atomic below
+   */
+
+  if (dispatch_in) {
+    do_in_dispatch();
+  }
+}
+
+seastar::future<> IOHandler::set_io_state(
+    crosscore_t::seq_t cc_seq,
+    io_state_t new_state,
+    FrameAssemblerV2Ref fa,
+    bool set_notify_out)
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} set_io_state(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, new_state,
+            fa=std::move(fa), set_notify_out]() mutable {
+      return set_io_state(cc_seq, new_state, std::move(fa), set_notify_out);
+    });
+  }
+
+  do_set_io_state(new_state, cc_seq, std::move(fa), set_notify_out);
+  return seastar::now();
+}
+
+seastar::future<IOHandler::exit_dispatching_ret>
+IOHandler::wait_io_exit_dispatching(
+    crosscore_t::seq_t cc_seq)
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} wait_io_exit_dispatching(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return wait_io_exit_dispatching(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} wait_io_exit_dispatching()",
+                 conn, cc_seq);
+  ceph_assert_always(get_io_state() != io_state_t::open);
+  ceph_assert_always(frame_assembler != nullptr);
+  ceph_assert_always(!frame_assembler->is_socket_valid());
+  return seastar::futurize_invoke([this] {
+    // cannot be running in parallel with to_new_sid()
+    if (maybe_dropped_sid.has_value()) {
+      ceph_assert_always(get_io_state() == io_state_t::drop);
+      assert(shard_states->assert_closed_and_exit());
+      auto prv_sid = *maybe_dropped_sid;
+      return seastar::smp::submit_to(prv_sid, [this] {
+        logger().debug("{} got wait_io_exit_dispatching from prv_sid", conn);
+        assert(maybe_prv_shard_states != nullptr);
+        return maybe_prv_shard_states->wait_io_exit_dispatching();
+      });
+    } else {
+      return shard_states->wait_io_exit_dispatching();
+    }
+  }).then([this] {
+    logger().debug("{} finish wait_io_exit_dispatching at {}",
+                   conn, io_stat_printer{*this});
+    ceph_assert_always(frame_assembler != nullptr);
+    ceph_assert_always(!frame_assembler->is_socket_valid());
+    frame_assembler->set_shard_id(conn.get_messenger_shard_id());
+    return exit_dispatching_ret{
+      std::move(frame_assembler),
+      get_states()};
+  });
+}
+
+seastar::future<> IOHandler::reset_session(
+    crosscore_t::seq_t cc_seq,
+    bool full)
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} reset_session(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, full] {
+      return reset_session(cc_seq, full);
+    });
+  }
+
+  logger().debug("{} got {} reset_session({})",
+                 conn, cc_seq, full);
+  assert(get_io_state() != io_state_t::open);
+  reset_in();
+  if (full) {
+    reset_out();
+    dispatch_remote_reset();
+  }
+  return seastar::now();
+}
+
+seastar::future<> IOHandler::reset_peer_state(
+    crosscore_t::seq_t cc_seq)
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} reset_peer_state(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return reset_peer_state(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} reset_peer_state()",
+                 conn, cc_seq);
+  assert(get_io_state() != io_state_t::open);
+  reset_in();
+  do_requeue_out_sent_up_to(0);
+  discard_out_sent();
+  return seastar::now();
+}
+
+seastar::future<> IOHandler::requeue_out_sent(
+    crosscore_t::seq_t cc_seq)
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} requeue_out_sent(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return requeue_out_sent(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} requeue_out_sent()",
requeue_out_sent()", + conn, cc_seq); + do_requeue_out_sent(); + return seastar::now(); +} + +void IOHandler::do_requeue_out_sent() +{ + assert(get_io_state() != io_state_t::open); + if (out_sent_msgs.empty()) { + return; + } + + out_seq -= out_sent_msgs.size(); + logger().debug("{} requeue {} items, revert out_seq to {}", + conn, out_sent_msgs.size(), out_seq); + for (MessageFRef& msg : out_sent_msgs) { + msg->clear_payload(); + msg->set_seq(0); + } + out_pending_msgs.insert( + out_pending_msgs.begin(), + std::make_move_iterator(out_sent_msgs.begin()), + std::make_move_iterator(out_sent_msgs.end())); + out_sent_msgs.clear(); + maybe_notify_out_dispatch(); +} + +seastar::future<> IOHandler::requeue_out_sent_up_to( + crosscore_t::seq_t cc_seq, + seq_num_t msg_seq) +{ + assert(seastar::this_shard_id() == get_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} requeue_out_sent_up_to(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq, msg_seq] { + return requeue_out_sent_up_to(cc_seq, msg_seq); + }); + } + + logger().debug("{} got {} requeue_out_sent_up_to({})", + conn, cc_seq, msg_seq); + do_requeue_out_sent_up_to(msg_seq); + return seastar::now(); +} + +void IOHandler::do_requeue_out_sent_up_to(seq_num_t seq) +{ + assert(get_io_state() != io_state_t::open); + if (out_sent_msgs.empty() && out_pending_msgs.empty()) { + logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}", + conn, out_seq, seq); + out_seq = seq; + return; + } + logger().debug("{} discarding sent msgs by seq {} (sent_len={}, out_seq={})", + conn, seq, out_sent_msgs.size(), out_seq); + while (!out_sent_msgs.empty()) { + auto cur_seq = out_sent_msgs.front()->get_seq(); + if (cur_seq == 0 || cur_seq > seq) { + break; + } else { + out_sent_msgs.pop_front(); + } + } + do_requeue_out_sent(); +} + +void IOHandler::reset_in() +{ + assert(get_io_state() != io_state_t::open); + in_seq = 0; +} + +void IOHandler::reset_out() +{ + assert(get_io_state() != io_state_t::open); + discard_out_sent(); + out_pending_msgs.clear(); + need_keepalive = false; + next_keepalive_ack = std::nullopt; + ack_left = 0; +} + +void IOHandler::discard_out_sent() +{ + assert(get_io_state() != io_state_t::open); + out_seq = 0; + out_sent_msgs.clear(); +} + +seastar::future<> +IOHandler::dispatch_accept( + crosscore_t::seq_t cc_seq, + seastar::shard_id new_sid, + ConnectionFRef conn_fref, + bool is_replace) +{ + return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace); +} + +seastar::future<> +IOHandler::dispatch_connect( + crosscore_t::seq_t cc_seq, + seastar::shard_id new_sid, + ConnectionFRef conn_fref) +{ + return to_new_sid(cc_seq, new_sid, std::move(conn_fref), std::nullopt); +} + +seastar::future<> +IOHandler::cleanup_prv_shard(seastar::shard_id prv_sid) +{ + assert(seastar::this_shard_id() == get_shard_id()); + return seastar::smp::submit_to(prv_sid, [this] { + logger().debug("{} got cleanup_prv_shard()", conn); + assert(maybe_prv_shard_states != nullptr); + auto ref_prv_states = std::move(maybe_prv_shard_states); + auto &prv_states = *ref_prv_states; + return prv_states.close( + ).then([ref_prv_states=std::move(ref_prv_states)] { + ceph_assert_always(ref_prv_states->assert_closed_and_exit()); + }); + }).then([this] { + ceph_assert_always(maybe_prv_shard_states == nullptr); + }); +} + +seastar::future<> +IOHandler::to_new_sid( + crosscore_t::seq_t cc_seq, + seastar::shard_id new_sid, + ConnectionFRef conn_fref, + std::optional<bool> 
+{
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} to_new_sid(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, new_sid, is_replace,
+            conn_fref=std::move(conn_fref)]() mutable {
+      return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
+    });
+  }
+
+  bool is_accept_or_connect = is_replace.has_value();
+  logger().debug("{} got {} to_new_sid_1(new_sid={}, {}) at {}",
+                 conn, cc_seq, new_sid,
+                 fmt::format("{}",
+                   is_accept_or_connect ?
+                   (*is_replace ? "accept(replace)" : "accept(!replace)") :
+                   "connect"),
+                 io_stat_printer{*this});
+  auto next_cc_seq = ++cc_seq;
+
+  if (get_io_state() != io_state_t::drop) {
+    ceph_assert_always(conn_ref);
+    if (new_sid != seastar::this_shard_id()) {
+      dispatchers.ms_handle_shard_change(conn_ref, new_sid, is_accept_or_connect);
+      // user can make changes
+    }
+  } else {
+    // it is possible that both io_handler and protocolv2 are
+    // trying to close each other from different cores simultaneously.
+    assert(!protocol_is_connected);
+  }
+
+  if (get_io_state() != io_state_t::drop) {
+    if (is_accept_or_connect) {
+      // protocol_is_connected can be from true to true here if the replacing is
+      // happening to a connected connection.
+    } else {
+      ceph_assert_always(protocol_is_connected == false);
+    }
+    protocol_is_connected = true;
+  } else {
+    assert(!protocol_is_connected);
+  }
+
+  bool is_dropped = false;
+  if (get_io_state() == io_state_t::drop) {
+    is_dropped = true;
+  }
+  ceph_assert_always(get_io_state() != io_state_t::open);
+
+  // apply the switching atomically
+  ceph_assert_always(conn_ref);
+  conn_ref.reset();
+  auto prv_sid = get_shard_id();
+  ceph_assert_always(maybe_prv_shard_states == nullptr);
+  maybe_prv_shard_states = std::move(shard_states);
+  shard_states = shard_states_t::create_from_previous(
+      *maybe_prv_shard_states, new_sid);
+  assert(new_sid == get_shard_id());
+
+  return seastar::smp::submit_to(new_sid,
+      [this, next_cc_seq, is_dropped, prv_sid, is_replace, conn_fref=std::move(conn_fref)]() mutable {
+    logger().debug("{} got {} to_new_sid_2(prv_sid={}, is_dropped={}, {}) at {}",
+                   conn, next_cc_seq, prv_sid, is_dropped,
+                   fmt::format("{}",
+                     is_replace.has_value() ?
+                     (*is_replace ? "accept(replace)" : "accept(!replace)") :
+                     "connect"),
+                   io_stat_printer{*this});
+
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+    ceph_assert_always(get_io_state() != io_state_t::open);
+    ceph_assert_always(!maybe_dropped_sid.has_value());
+    ceph_assert_always(crosscore.proceed_or_wait(next_cc_seq));
+
+    if (is_dropped) {
+      ceph_assert_always(get_io_state() == io_state_t::drop);
+      ceph_assert_always(shard_states->assert_closed_and_exit());
+      maybe_dropped_sid = prv_sid;
+      // cleanup_prv_shard() will be done in a follow-up close_io()
+    } else {
+      // possible at io_state_t::drop
+
+      // previous shard is not cleaned,
+      // but close_io() is responsible to clean up the current shard,
+      // so cleanup the previous shard here.
+      shard_states->dispatch_in_background(
+          "cleanup_prv_sid", conn, [this, prv_sid] {
+        return cleanup_prv_shard(prv_sid);
+      });
+      maybe_notify_out_dispatch();
+    }
+
+    ceph_assert_always(!conn_ref);
+    // assign even if already dropping
+    conn_ref = make_local_shared_foreign(std::move(conn_fref));
+
+    if (get_io_state() != io_state_t::drop) {
+      if (is_replace.has_value()) {
+        dispatchers.ms_handle_accept(conn_ref, prv_sid, *is_replace);
+      } else {
+        dispatchers.ms_handle_connect(conn_ref, prv_sid);
+      }
+      // user can make changes
+    }
+  });
+}
+
+seastar::future<> IOHandler::set_accepted_sid(
+    crosscore_t::seq_t cc_seq,
+    seastar::shard_id sid,
+    ConnectionFRef conn_fref)
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  assert(get_io_state() == io_state_t::none);
+  ceph_assert_always(conn_ref);
+  conn_ref.reset();
+  assert(maybe_prv_shard_states == nullptr);
+  shard_states.reset();
+  shard_states = shard_states_t::create(sid, io_state_t::none);
+  return seastar::smp::submit_to(sid,
+      [this, cc_seq, conn_fref=std::move(conn_fref)]() mutable {
+    // must be the first to proceed
+    ceph_assert_always(crosscore.proceed_or_wait(cc_seq));
+
+    logger().debug("{} set accepted sid", conn);
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+    ceph_assert_always(get_io_state() == io_state_t::none);
+    assert(maybe_prv_shard_states == nullptr);
+    ceph_assert_always(!conn_ref);
+    conn_ref = make_local_shared_foreign(std::move(conn_fref));
+  });
+}
+
+void IOHandler::dispatch_reset(bool is_replace)
+{
+  ceph_assert_always(get_io_state() == io_state_t::drop);
+  if (!need_dispatch_reset) {
+    return;
+  }
+  need_dispatch_reset = false;
+  ceph_assert_always(conn_ref);
+
+  dispatchers.ms_handle_reset(conn_ref, is_replace);
+  // user can make changes
+}
+
+void IOHandler::dispatch_remote_reset()
+{
+  if (get_io_state() == io_state_t::drop) {
+    return;
+  }
+  ceph_assert_always(conn_ref);
+
+  dispatchers.ms_handle_remote_reset(conn_ref);
+  // user can make changes
+}
+
+void IOHandler::ack_out_sent(seq_num_t seq)
+{
+  if (conn.policy.lossy) { // lossy connections don't keep sent messages
+    return;
+  }
+  while (!out_sent_msgs.empty() &&
+         out_sent_msgs.front()->get_seq() <= seq) {
+    logger().trace("{} got ack seq {} >= {}, pop {}",
+                   conn, seq, out_sent_msgs.front()->get_seq(),
+                   *out_sent_msgs.front());
+    out_sent_msgs.pop_front();
+  }
+}
+
+seastar::future<>
+IOHandler::do_out_dispatch(shard_states_t &ctx)
+{
+  return seastar::repeat([this, &ctx] {
+    switch (ctx.get_io_state()) {
+    case io_state_t::open: {
+      if (unlikely(!is_out_queued())) {
+        // try exit open dispatching
+        return frame_assembler->flush<false>(
+        ).then([this, &ctx] {
+          if (ctx.get_io_state() != io_state_t::open || is_out_queued()) {
+            return seastar::make_ready_future<stop_t>(stop_t::no);
+          }
+          // still nothing pending to send after flush,
+          // open dispatching can ONLY stop now
+          ctx.exit_out_dispatching("exit-open", conn);
+          return seastar::make_ready_future<stop_t>(stop_t::yes);
+        });
+      }
+
+      auto require_keepalive = need_keepalive;
+      need_keepalive = false;
+      auto maybe_keepalive_ack = next_keepalive_ack;
+      next_keepalive_ack = std::nullopt;
+      auto to_ack = ack_left;
+      assert(to_ack == 0 || in_seq > 0);
+      ack_left = 0;
+#ifdef UNIT_TESTS_BUILT
+      auto ret = sweep_out_pending_msgs_to_sent(
+          require_keepalive, maybe_keepalive_ack, to_ack > 0);
+      return frame_assembler->intercept_frames(ret.tags, true
+      ).then([this, bl=std::move(ret.bl)]() mutable {
+        return frame_assembler->write<false>(std::move(bl));
+      }
+#else
+      auto bl = sweep_out_pending_msgs_to_sent(
+          require_keepalive, maybe_keepalive_ack, to_ack > 0);
+      return frame_assembler->write<false>(std::move(bl)
+#endif
+      ).then([this, &ctx] {
+        if (ctx.get_io_state() != io_state_t::open) {
+          return frame_assembler->flush<false>(
+          ).then([] {
+            return seastar::make_ready_future<stop_t>(stop_t::no);
+          });
+        }
+
+        // FIXME: may leak a flush if state is changed after return and before
+        // the next repeat body.
+        return seastar::make_ready_future<stop_t>(stop_t::no);
+      });
+    }
+    case io_state_t::delay:
+      // delay out dispatching until open
+      ctx.notify_out_dispatching_stopped("delay...", conn);
+      return ctx.wait_state_change(
+      ).then([] { return stop_t::no; });
+    case io_state_t::drop:
+      ctx.exit_out_dispatching("dropped", conn);
+      return seastar::make_ready_future<stop_t>(stop_t::yes);
+    case io_state_t::switched:
+      ctx.exit_out_dispatching("switched", conn);
+      return seastar::make_ready_future<stop_t>(stop_t::yes);
+    default:
+      ceph_abort("impossible");
+    }
+  }).handle_exception_type([this, &ctx](const std::system_error& e) {
+    auto io_state = ctx.get_io_state();
+    if (e.code() != std::errc::broken_pipe &&
+        e.code() != std::errc::connection_reset &&
+        e.code() != error::negotiation_failure) {
+      logger().error("{} do_out_dispatch(): unexpected error at {} -- {}",
+                     conn, io_state, e.what());
+      ceph_abort();
+    }
+
+    if (io_state == io_state_t::open) {
+      auto cc_seq = crosscore.prepare_submit();
+      logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, "
+                    "send {} notify_out_fault()",
+                    conn, io_state, io_stat_printer{*this}, e.what(), cc_seq);
+      std::exception_ptr eptr;
+      try {
+        throw e;
+      } catch(...) {
+        eptr = std::current_exception();
+      }
+      do_set_io_state(io_state_t::delay);
+      shard_states->dispatch_in_background(
+          "notify_out_fault(out)", conn, [this, cc_seq, eptr] {
+        auto states = get_states();
+        return seastar::smp::submit_to(
+            conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
+          return handshake_listener->notify_out_fault(
+              cc_seq, "do_out_dispatch", eptr, states);
+        });
+      });
+    } else {
+      if (io_state != io_state_t::switched) {
+        logger().info("{} do_out_dispatch(): fault at {}, {} -- {}",
+                      conn, io_state, io_stat_printer{*this}, e.what());
+      } else {
+        logger().info("{} do_out_dispatch(): fault at {} -- {}",
+                      conn, io_state, e.what());
+      }
+    }
+
+    return do_out_dispatch(ctx);
+  });
+}
+
+void IOHandler::maybe_notify_out_dispatch()
+{
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  if (is_out_queued()) {
+    notify_out_dispatch();
+  }
+}
+
+void IOHandler::notify_out_dispatch()
+{
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  assert(is_out_queued());
+  if (need_notify_out) {
+    auto cc_seq = crosscore.prepare_submit();
+    logger().debug("{} send {} notify_out()",
+                   conn, cc_seq);
+    shard_states->dispatch_in_background(
+        "notify_out", conn, [this, cc_seq] {
+      return seastar::smp::submit_to(
+          conn.get_messenger_shard_id(), [this, cc_seq] {
+        return handshake_listener->notify_out(cc_seq);
+      });
+    });
+  }
+  if (shard_states->try_enter_out_dispatching()) {
+    shard_states->dispatch_in_background(
+        "do_out_dispatch", conn, [this] {
+      return do_out_dispatch(*shard_states);
+    });
+  }
+}
+
+seastar::future<>
+IOHandler::read_message(
+    shard_states_t &ctx,
+    utime_t throttle_stamp,
+    std::size_t msg_size)
+{
+  return frame_assembler->read_frame_payload<false>(
+  ).then([this, throttle_stamp, msg_size, &ctx](auto payload) {
+    if (unlikely(ctx.get_io_state() != io_state_t::open)) {
+      logger().debug("{} triggered {} during read_message()",
+                     conn, ctx.get_io_state());
+      abort_protocol();
+    }
+
+    utime_t recv_stamp{seastar::lowres_system_clock::now()};
+
+    // we need to get the size before std::moving segments data
+    auto msg_frame = MessageFrame::Decode(*payload);
+    // XXX: paranoid copy just to avoid oops
+    ceph_msg_header2 current_header = msg_frame.header();
+
+    logger().trace("{} got {} + {} + {} byte message,"
+                   " envelope type={} src={} off={} seq={}",
+                   conn,
+                   msg_frame.front_len(),
+                   msg_frame.middle_len(),
+                   msg_frame.data_len(),
+                   current_header.type,
+                   conn.get_peer_name(),
+                   current_header.data_off,
+                   current_header.seq);
+
+    ceph_msg_header header{current_header.seq,
+                           current_header.tid,
+                           current_header.type,
+                           current_header.priority,
+                           current_header.version,
+                           ceph_le32(msg_frame.front_len()),
+                           ceph_le32(msg_frame.middle_len()),
+                           ceph_le32(msg_frame.data_len()),
+                           current_header.data_off,
+                           conn.get_peer_name(),
+                           current_header.compat_version,
+                           current_header.reserved,
+                           ceph_le32(0)};
+    ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
+                           ceph_le32(0), ceph_le64(0), current_header.flags};
+
+    Message *message = decode_message(nullptr, 0, header, footer,
+        msg_frame.front(), msg_frame.middle(), msg_frame.data(), nullptr);
+    if (!message) {
+      logger().warn("{} decode message failed", conn);
+      abort_in_fault();
+    }
+
+    // store reservation size in message, so we don't get confused
+    // by messages entering the dispatch queue through other paths.
+    message->set_dispatch_throttle_size(msg_size);
+
+    message->set_throttle_stamp(throttle_stamp);
+    message->set_recv_stamp(recv_stamp);
+    message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
+
+    // check received seq#. if it is old, drop the message.
+    // note that incoming messages may skip ahead. this is convenient for the
+    // client side queueing because messages can't be renumbered, but the (kernel)
+    // client will occasionally pull a message out of the sent queue to send
+    // elsewhere. in that case it doesn't matter if we "got" it or not.
+    uint64_t cur_seq = in_seq;
+    if (message->get_seq() <= cur_seq) {
+      logger().error("{} got old message {} <= {} {}, discarding",
+                     conn, message->get_seq(), cur_seq, *message);
+      if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
+          local_conf()->ms_die_on_old_message) {
+        ceph_assert(0 == "old msgs despite reconnect_seq feature");
+      }
+      return seastar::now();
+    } else if (message->get_seq() > cur_seq + 1) {
+      logger().error("{} missed message? skipped from seq {} to {}",
+                     conn, cur_seq, message->get_seq());
+      if (local_conf()->ms_die_on_skipped_message) {
+        ceph_assert(0 == "skipped incoming seq");
+      }
+    }
+
+    // note last received message.
+    in_seq = message->get_seq();
+    if (conn.policy.lossy) {
+      logger().debug("{} <== #{} === {} ({})",
+                     conn,
+                     message->get_seq(),
+                     *message,
+                     message->get_type());
+    } else {
+      logger().debug("{} <== #{},{} === {} ({})",
+                     conn,
+                     message->get_seq(),
+                     current_header.ack_seq,
+                     *message,
+                     message->get_type());
+    }
+
+    // notify ack
+    if (!conn.policy.lossy) {
+      ++ack_left;
+      notify_out_dispatch();
+    }
+
+    ack_out_sent(current_header.ack_seq);
+
+    // TODO: change MessageRef with seastar::shared_ptr
+    auto msg_ref = MessageRef{message, false};
+    assert(ctx.get_io_state() == io_state_t::open);
+    assert(get_io_state() == io_state_t::open);
+    ceph_assert_always(conn_ref);
+
+    // throttle the reading process by the returned future
+    return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
+    // user can make changes
+  });
+}
+
+void IOHandler::do_in_dispatch()
+{
+  shard_states->enter_in_dispatching();
+  shard_states->dispatch_in_background(
+      "do_in_dispatch", conn, [this, &ctx=*shard_states] {
+    return seastar::keep_doing([this, &ctx] {
+      return frame_assembler->read_main_preamble<false>(
+      ).then([this, &ctx](auto ret) {
+        switch (ret.tag) {
+        case Tag::MESSAGE: {
+          size_t msg_size = get_msg_size(*ret.rx_frame_asm);
+          return seastar::futurize_invoke([this] {
+            // throttle_message() logic
+            if (!conn.policy.throttler_messages) {
+              return seastar::now();
+            }
+            // TODO: message throttler
+            ceph_abort("TODO");
+            return seastar::now();
+          }).then([this, msg_size] {
+            // throttle_bytes() logic
+            if (!conn.policy.throttler_bytes) {
+              return seastar::now();
+            }
+            if (!msg_size) {
+              return seastar::now();
+            }
+            logger().trace("{} wants {} bytes from policy throttler {}/{}",
+                           conn, msg_size,
+                           conn.policy.throttler_bytes->get_current(),
+                           conn.policy.throttler_bytes->get_max());
+            return conn.policy.throttler_bytes->get(msg_size);
+          }).then([this, msg_size, &ctx] {
+            // TODO: throttle_dispatch_queue() logic
+            utime_t throttle_stamp{seastar::lowres_system_clock::now()};
+            return read_message(ctx, throttle_stamp, msg_size);
+          });
+        }
+        case Tag::ACK:
+          return frame_assembler->read_frame_payload<false>(
+          ).then([this](auto payload) {
+            // handle_message_ack() logic
+            auto ack = AckFrame::Decode(payload->back());
+            logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
+            ack_out_sent(ack.seq());
+          });
+        case Tag::KEEPALIVE2:
+          return frame_assembler->read_frame_payload<false>(
+          ).then([this](auto payload) {
+            // handle_keepalive2() logic
+            auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
+            logger().debug("{} GOT KeepAliveFrame: timestamp={}",
+                           conn, keepalive_frame.timestamp());
+            // notify keepalive ack
+            next_keepalive_ack = keepalive_frame.timestamp();
+            if (seastar::this_shard_id() == get_shard_id()) {
+              notify_out_dispatch();
+            }
+
+            last_keepalive = seastar::lowres_system_clock::now();
+          });
+        case Tag::KEEPALIVE2_ACK:
+          return frame_assembler->read_frame_payload<false>(
+          ).then([this](auto payload) {
+            // handle_keepalive2_ack() logic
+            auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
+            auto _last_keepalive_ack =
+              seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
+            set_last_keepalive_ack(_last_keepalive_ack);
+            logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
+                           conn, _last_keepalive_ack);
+          });
+        default: {
+          logger().warn("{} do_in_dispatch() received unexpected tag: {}",
+                        conn, static_cast<uint32_t>(ret.tag));
+          abort_in_fault();
+        }
+        }
+      });
+    }).handle_exception([this, &ctx](std::exception_ptr eptr) {
+      const char *e_what;
+      try {
+        std::rethrow_exception(eptr);
+      } catch (std::exception &e) {
+        e_what = e.what();
+      }
+
+      auto io_state = ctx.get_io_state();
+      if (io_state == io_state_t::open) {
+        auto cc_seq = crosscore.prepare_submit();
+        logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, "
+                      "send {} notify_out_fault()",
+                      conn, io_state, io_stat_printer{*this}, e_what, cc_seq);
+        do_set_io_state(io_state_t::delay);
+        shard_states->dispatch_in_background(
+            "notify_out_fault(in)", conn, [this, cc_seq, eptr] {
+          auto states = get_states();
+          return seastar::smp::submit_to(
+              conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
+            return handshake_listener->notify_out_fault(
+                cc_seq, "do_in_dispatch", eptr, states);
+          });
+        });
+      } else {
+        if (io_state != io_state_t::switched) {
+          logger().info("{} do_in_dispatch(): fault at {}, {} -- {}",
+                        conn, io_state, io_stat_printer{*this}, e_what);
+        } else {
+          logger().info("{} do_in_dispatch(): fault at {} -- {}",
+                        conn, io_state, e_what);
+        }
+      }
+    }).finally([&ctx] {
+      ctx.exit_in_dispatching();
+    });
+  });
+}
+
+seastar::future<>
+IOHandler::close_io(
+    crosscore_t::seq_t cc_seq,
+    bool is_dispatch_reset,
+    bool is_replace)
+{
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} close_io(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, is_dispatch_reset, is_replace] {
+      return close_io(cc_seq, is_dispatch_reset, is_replace);
+    });
+  }
+
+  logger().debug("{} got {} close_io(reset={}, replace={})",
+                 conn, cc_seq, is_dispatch_reset, is_replace);
+  ceph_assert_always(get_io_state() == io_state_t::drop);
+
+  if (is_dispatch_reset) {
+    dispatch_reset(is_replace);
+  }
+
+  ceph_assert_always(conn_ref);
+  conn_ref.reset();
+
+  // cannot be running in parallel with to_new_sid()
+  if (maybe_dropped_sid.has_value()) {
+    assert(shard_states->assert_closed_and_exit());
+    auto prv_sid = *maybe_dropped_sid;
+    return cleanup_prv_shard(prv_sid);
+  } else {
+    return shard_states->close(
+    ).then([this] {
+      assert(shard_states->assert_closed_and_exit());
+    });
+  }
+}
+
+/*
+ * IOHandler::shard_states_t
+ */
+
+void
+IOHandler::shard_states_t::notify_out_dispatching_stopped(
+    const char *what, SocketConnection &conn)
+{
+  assert(seastar::this_shard_id() == sid);
+  if (unlikely(out_exit_dispatching.has_value())) {
+    out_exit_dispatching->set_value();
+    out_exit_dispatching = std::nullopt;
+    logger().info("{} do_out_dispatch: stop({}) at {}, set out_exit_dispatching",
+                  conn, what, io_state);
+  } else {
+    if (unlikely(io_state != io_state_t::open)) {
+      logger().info("{} do_out_dispatch: stop({}) at {}, no out_exit_dispatching",
+                    conn, what, io_state);
+    }
+  }
+}
+
+seastar::future<>
+IOHandler::shard_states_t::wait_io_exit_dispatching()
+{
+  assert(seastar::this_shard_id() == sid);
+  assert(io_state != io_state_t::open);
+  assert(!gate.is_closed());
+  return seastar::when_all(
+    [this] {
+      if (out_exit_dispatching) {
+        return out_exit_dispatching->get_future();
+      } else {
+        return seastar::now();
+      }
+    }(),
+    [this] {
+      if (in_exit_dispatching) {
+        return in_exit_dispatching->get_future();
+      } else {
+        return seastar::now();
+      }
+    }()
+  ).discard_result();
+}
+
+IOHandler::shard_states_ref_t
+IOHandler::shard_states_t::create_from_previous(
+    shard_states_t &prv_states,
+    seastar::shard_id new_sid)
+{
+  auto io_state = prv_states.io_state;
+  assert(io_state != io_state_t::open);
+  auto ret = shard_states_t::create(new_sid, io_state);
+  if (io_state == io_state_t::drop) {
+    // the new gate should never be used
+    auto fut = ret->gate.close();
+    ceph_assert_always(fut.available());
+  }
+  prv_states.set_io_state(io_state_t::switched);
+  return ret;
+}
+
+} // namespace crimson::net
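
The cross-core entry points above (set_io_state(), reset_session(), reset_peer_state(), requeue_out_sent(), to_new_sid(), close_io()) all open with the same guard: crosscore.proceed_or_wait(cc_seq), falling back to crosscore.wait(cc_seq) plus a retry. The effect is that calls stamped on the protocol core by crosscore.prepare_submit() are replayed on the I/O core in exactly the stamped order, even when the smp::submit_to hops deliver them out of order. A minimal single-threaded sketch of that ordering contract in plain standard C++ follows; crosscore_model and its members are hypothetical illustration names, not the Ceph crosscore_t API.

// Model of the crosscore sequencing: the receiver runs a call only when its
// stamp matches the next expected sequence number, parking early arrivals.
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>

struct crosscore_model {
  using seq_t = uint64_t;
  seq_t out_seq = 0;  // sender side: prepare_submit()
  seq_t in_seq = 0;   // receiver side: next stamp allowed to proceed
  std::map<seq_t, std::function<void()>> parked;

  seq_t prepare_submit() { return out_seq++; }

  // Receiver entry point: run now if it is this call's turn, otherwise park
  // it (the analogue of !proceed_or_wait() followed by crosscore.wait()).
  void submit(seq_t cc_seq, std::function<void()> fn) {
    if (cc_seq != in_seq) {
      parked.emplace(cc_seq, std::move(fn));
      return;
    }
    fn();
    ++in_seq;
    // drain any parked calls whose turn has now come (wait() resolving)
    for (auto it = parked.find(in_seq); it != parked.end();
         it = parked.find(in_seq)) {
      it->second();
      ++in_seq;
      parked.erase(it);
    }
  }
};

int main() {
  crosscore_model cc;
  auto s0 = cc.prepare_submit();
  auto s1 = cc.prepare_submit();
  // deliver out of order: #1 is parked until #0 has run
  cc.submit(s1, [] { std::cout << "set_io_state #1\n"; });
  cc.submit(s0, [] { std::cout << "set_io_state #0\n"; });
}

Running it prints "#0" before "#1" even though "#1" arrived first, which is the invariant the real guard preserves across cores.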
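send() and send_keepalive() show the companion pattern on the submission side: the owning shard id ("sid") can change while a submit_to hop is in flight, so send_redirected() re-checks get_shard_id() on arrival and hops again until it lands on the current owner. A toy model of that re-check loop, with shards reduced to plain integers (conn_model is a hypothetical name, not Seastar or Ceph code):

#include <iostream>

struct conn_model {
  int owner_shard = 2;  // stands in for get_shard_id(); may move between hops
  int hops = 0;

  void do_send(int on_shard) {
    std::cout << "queued on shard " << on_shard
              << " after " << hops << " hop(s)\n";
  }

  // send() / send_redirected(): hop until this_shard == owner_shard
  void send_from(int this_shard) {
    if (this_shard == owner_shard) {
      do_send(this_shard);
      return;
    }
    ++hops;                          // stands in for smp::submit_to(owner, ...)
    int landed = owner_shard;
    if (hops == 1) owner_shard = 3;  // simulate a migration mid-flight
    send_from(landed);               // re-check on arrival, maybe hop again
  }
};

int main() {
  conn_model c;
  c.send_from(0);  // hops 0 -> 2, finds the owner moved to 3, hops once more
}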
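On the output path, sweep_out_pending_msgs_to_sent(), ack_out_sent() and do_requeue_out_sent() together implement the lossless-policy bookkeeping: sweeping stamps each pending message with ++out_seq and moves it to out_sent_msgs, a peer ack trims the front of the sent queue, and a fault reverts the seq counter and puts the unacknowledged tail back in front of the pending queue. A compact model of just that bookkeeping (out_queue_model is a hypothetical name; the real code also encodes frames and clears payloads):

#include <cstdint>
#include <deque>
#include <iostream>
#include <iterator>
#include <string>
#include <utility>

struct out_queue_model {
  uint64_t out_seq = 0;
  std::deque<std::pair<uint64_t, std::string>> pending, sent;

  void sweep() {  // sweep_out_pending_msgs_to_sent(), minus the wire encoding
    for (auto &m : pending) m.first = ++out_seq;
    sent.insert(sent.end(),
                std::make_move_iterator(pending.begin()),
                std::make_move_iterator(pending.end()));
    pending.clear();
  }

  void ack(uint64_t seq) {  // ack_out_sent(): drop everything acked so far
    while (!sent.empty() && sent.front().first <= seq) sent.pop_front();
  }

  void requeue() {  // do_requeue_out_sent(): revert seq, keep message order
    out_seq -= sent.size();
    for (auto &m : sent) m.first = 0;
    pending.insert(pending.begin(),
                   std::make_move_iterator(sent.begin()),
                   std::make_move_iterator(sent.end()));
    sent.clear();
  }
};

int main() {
  out_queue_model q;
  q.pending = {{0, "a"}, {0, "b"}, {0, "c"}};
  q.sweep();    // a=1 b=2 c=3 move to sent
  q.ack(1);     // peer acked up to seq 1: "a" is dropped for good
  q.requeue();  // fault: "b" and "c" return to pending, out_seq reverts to 1
  for (auto &m : q.pending) std::cout << m.second << '\n';  // prints b, c
}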
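On the input path, read_message() enforces the seq discipline its long comment describes: messages at or below in_seq are duplicates from a reconnect and are discarded, while a gap above in_seq + 1 is tolerated (the peer may legitimately renumber around pulled messages) unless ms_die_on_skipped_message is set. A stripped-down model of that check (in_seq_model is a hypothetical name; config-driven aborts are reduced to log lines):

#include <cstdint>
#include <iostream>

struct in_seq_model {
  uint64_t in_seq = 0;

  bool accept(uint64_t seq) {
    if (seq <= in_seq) {
      std::cout << "drop old message " << seq << " <= " << in_seq << '\n';
      return false;  // discarded, like the early return in read_message()
    }
    if (seq > in_seq + 1) {
      std::cout << "missed message? skipped from " << in_seq
                << " to " << seq << '\n';  // warned, still accepted
    }
    in_seq = seq;  // note last received message
    return true;
  }
};

int main() {
  in_seq_model rx;
  rx.accept(1);  // ok
  rx.accept(1);  // duplicate: dropped
  rx.accept(3);  // gap: warned, accepted
}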