summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/Protocol.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/net/Protocol.cc')
-rw-r--r--src/crimson/net/Protocol.cc323
1 files changed, 323 insertions, 0 deletions
diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc
new file mode 100644
index 000000000..50b5c45a3
--- /dev/null
+++ b/src/crimson/net/Protocol.cc
@@ -0,0 +1,323 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Protocol.h"
+
+#include "auth/Auth.h"
+
+#include "crimson/common/log.h"
+#include "crimson/net/Errors.h"
+#include "crimson/net/chained_dispatchers.h"
+#include "crimson/net/Socket.h"
+#include "crimson/net/SocketConnection.h"
+#include "msg/Message.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+ }
+}
+
+namespace crimson::net {
+
+Protocol::Protocol(proto_t type,
+ ChainedDispatchers& dispatchers,
+ SocketConnection& conn)
+ : proto_type(type),
+ dispatchers(dispatchers),
+ conn(conn),
+ auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()}
+{}
+
+Protocol::~Protocol()
+{
+ ceph_assert(gate.is_closed());
+ assert(!exit_open);
+}
+
+void Protocol::close(bool dispatch_reset,
+ std::optional<std::function<void()>> f_accept_new)
+{
+ if (closed) {
+ // already closing
+ return;
+ }
+
+ bool is_replace = f_accept_new ? true : false;
+ logger().info("{} closing: reset {}, replace {}", conn,
+ dispatch_reset ? "yes" : "no",
+ is_replace ? "yes" : "no");
+
+ // atomic operations
+ closed = true;
+ trigger_close();
+ if (f_accept_new) {
+ (*f_accept_new)();
+ }
+ if (socket) {
+ socket->shutdown();
+ }
+ set_write_state(write_state_t::drop);
+ assert(!gate.is_closed());
+ auto gate_closed = gate.close();
+
+ if (dispatch_reset) {
+ dispatchers.ms_handle_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
+ is_replace);
+ }
+
+ // asynchronous operations
+ assert(!close_ready.valid());
+ close_ready = std::move(gate_closed).then([this] {
+ if (socket) {
+ return socket->close();
+ } else {
+ return seastar::now();
+ }
+ }).then([this] {
+ logger().debug("{} closed!", conn);
+ on_closed();
+#ifdef UNIT_TESTS_BUILT
+ is_closed_clean = true;
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_closed(conn);
+ }
+#endif
+ }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
+ logger().error("{} closing: close_ready got unexpected exception {}", conn, eptr);
+ ceph_abort();
+ });
+}
+
+seastar::future<> Protocol::send(MessageRef msg)
+{
+ if (write_state != write_state_t::drop) {
+ conn.out_q.push_back(std::move(msg));
+ write_event();
+ }
+ return seastar::now();
+}
+
+seastar::future<> Protocol::keepalive()
+{
+ if (!need_keepalive) {
+ need_keepalive = true;
+ write_event();
+ }
+ return seastar::now();
+}
+
+void Protocol::notify_keepalive_ack(utime_t _keepalive_ack)
+{
+ logger().trace("{} got keepalive ack {}", conn, _keepalive_ack);
+ keepalive_ack = _keepalive_ack;
+ write_event();
+}
+
+void Protocol::notify_ack()
+{
+ if (!conn.policy.lossy) {
+ ++ack_left;
+ write_event();
+ }
+}
+
+void Protocol::requeue_sent()
+{
+ assert(write_state != write_state_t::open);
+ if (conn.sent.empty()) {
+ return;
+ }
+
+ conn.out_seq -= conn.sent.size();
+ logger().debug("{} requeue {} items, revert out_seq to {}",
+ conn, conn.sent.size(), conn.out_seq);
+ for (MessageRef& msg : conn.sent) {
+ msg->clear_payload();
+ msg->set_seq(0);
+ }
+ conn.out_q.insert(conn.out_q.begin(),
+ std::make_move_iterator(conn.sent.begin()),
+ std::make_move_iterator(conn.sent.end()));
+ conn.sent.clear();
+ write_event();
+}
+
+void Protocol::requeue_up_to(seq_num_t seq)
+{
+ assert(write_state != write_state_t::open);
+ if (conn.sent.empty() && conn.out_q.empty()) {
+ logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
+ conn, conn.out_seq, seq);
+ conn.out_seq = seq;
+ return;
+ }
+ logger().debug("{} discarding sent items by seq {} (sent_len={}, out_seq={})",
+ conn, seq, conn.sent.size(), conn.out_seq);
+ while (!conn.sent.empty()) {
+ auto cur_seq = conn.sent.front()->get_seq();
+ if (cur_seq == 0 || cur_seq > seq) {
+ break;
+ } else {
+ conn.sent.pop_front();
+ }
+ }
+ requeue_sent();
+}
+
+void Protocol::reset_write()
+{
+ assert(write_state != write_state_t::open);
+ conn.out_seq = 0;
+ conn.out_q.clear();
+ conn.sent.clear();
+ need_keepalive = false;
+ keepalive_ack = std::nullopt;
+ ack_left = 0;
+}
+
+void Protocol::ack_writes(seq_num_t seq)
+{
+ if (conn.policy.lossy) { // lossy connections don't keep sent messages
+ return;
+ }
+ while (!conn.sent.empty() && conn.sent.front()->get_seq() <= seq) {
+ logger().trace("{} got ack seq {} >= {}, pop {}",
+ conn, seq, conn.sent.front()->get_seq(), conn.sent.front());
+ conn.sent.pop_front();
+ }
+}
+
+seastar::future<stop_t> Protocol::try_exit_sweep() {
+ assert(!is_queued());
+ return socket->flush().then([this] {
+ if (!is_queued()) {
+ // still nothing pending to send after flush,
+ // the dispatching can ONLY stop now
+ ceph_assert(write_dispatching);
+ write_dispatching = false;
+ if (unlikely(exit_open.has_value())) {
+ exit_open->set_value();
+ exit_open = std::nullopt;
+ logger().info("{} write_event: nothing queued at {},"
+ " set exit_open",
+ conn, get_state_name(write_state));
+ }
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ } else {
+ // something is pending to send during flushing
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ }
+ });
+}
+
+seastar::future<> Protocol::do_write_dispatch_sweep()
+{
+ return seastar::repeat([this] {
+ switch (write_state) {
+ case write_state_t::open: {
+ size_t num_msgs = conn.out_q.size();
+ bool still_queued = is_queued();
+ if (unlikely(!still_queued)) {
+ return try_exit_sweep();
+ }
+ conn.pending_q.clear();
+ conn.pending_q.swap(conn.out_q);
+ if (!conn.policy.lossy) {
+ conn.sent.insert(conn.sent.end(),
+ conn.pending_q.begin(),
+ conn.pending_q.end());
+ }
+ auto acked = ack_left;
+ assert(acked == 0 || conn.in_seq > 0);
+ // sweep all pending writes with the concrete Protocol
+ return socket->write(do_sweep_messages(
+ conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0)
+ ).then([this, prv_keepalive_ack=keepalive_ack, acked] {
+ need_keepalive = false;
+ if (keepalive_ack == prv_keepalive_ack) {
+ keepalive_ack = std::nullopt;
+ }
+ assert(ack_left >= acked);
+ ack_left -= acked;
+ if (!is_queued()) {
+ return try_exit_sweep();
+ } else {
+ // messages were enqueued during socket write
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ }
+ });
+ }
+ case write_state_t::delay:
+ // delay dispatching writes until open
+ if (exit_open) {
+ exit_open->set_value();
+ exit_open = std::nullopt;
+ logger().info("{} write_event: delay and set exit_open ...", conn);
+ } else {
+ logger().info("{} write_event: delay ...", conn);
+ }
+ return state_changed.get_shared_future()
+ .then([] { return stop_t::no; });
+ case write_state_t::drop:
+ ceph_assert(write_dispatching);
+ write_dispatching = false;
+ if (exit_open) {
+ exit_open->set_value();
+ exit_open = std::nullopt;
+ logger().info("{} write_event: dropped and set exit_open", conn);
+ } else {
+ logger().info("{} write_event: dropped", conn);
+ }
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ default:
+ ceph_assert(false);
+ }
+ }).handle_exception_type([this] (const std::system_error& e) {
+ if (e.code() != std::errc::broken_pipe &&
+ e.code() != std::errc::connection_reset &&
+ e.code() != error::negotiation_failure) {
+ logger().error("{} write_event(): unexpected error at {} -- {}",
+ conn, get_state_name(write_state), e);
+ ceph_abort();
+ }
+ socket->shutdown();
+ if (write_state == write_state_t::open) {
+ logger().info("{} write_event(): fault at {}, going to delay -- {}",
+ conn, get_state_name(write_state), e);
+ write_state = write_state_t::delay;
+ } else {
+ logger().info("{} write_event(): fault at {} -- {}",
+ conn, get_state_name(write_state), e);
+ }
+ return do_write_dispatch_sweep();
+ });
+}
+
+void Protocol::write_event()
+{
+ notify_write();
+ if (write_dispatching) {
+ // already dispatching
+ return;
+ }
+ write_dispatching = true;
+ switch (write_state) {
+ case write_state_t::open:
+ [[fallthrough]];
+ case write_state_t::delay:
+ assert(!gate.is_closed());
+ gate.dispatch_in_background("do_write_dispatch_sweep", *this, [this] {
+ return do_write_dispatch_sweep();
+ });
+ return;
+ case write_state_t::drop:
+ write_dispatching = false;
+ return;
+ default:
+ ceph_assert(false);
+ }
+}
+
+} // namespace crimson::net