summaryrefslogtreecommitdiffstats
path: root/src/crimson/net
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/crimson/net
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/net')
-rw-r--r--src/crimson/net/Connection.h175
-rw-r--r--src/crimson/net/Dispatcher.h46
-rw-r--r--src/crimson/net/Errors.cc51
-rw-r--r--src/crimson/net/Errors.h53
-rw-r--r--src/crimson/net/Fwd.h50
-rw-r--r--src/crimson/net/Interceptor.h165
-rw-r--r--src/crimson/net/Messenger.cc17
-rw-r--r--src/crimson/net/Messenger.h154
-rw-r--r--src/crimson/net/Protocol.cc323
-rw-r--r--src/crimson/net/Protocol.h173
-rw-r--r--src/crimson/net/ProtocolV1.cc1014
-rw-r--r--src/crimson/net/ProtocolV1.h137
-rw-r--r--src/crimson/net/ProtocolV2.cc2139
-rw-r--r--src/crimson/net/ProtocolV2.h225
-rw-r--r--src/crimson/net/Socket.cc276
-rw-r--r--src/crimson/net/Socket.h268
-rw-r--r--src/crimson/net/SocketConnection.cc150
-rw-r--r--src/crimson/net/SocketConnection.h106
-rw-r--r--src/crimson/net/SocketMessenger.cc351
-rw-r--r--src/crimson/net/SocketMessenger.h122
-rw-r--r--src/crimson/net/chained_dispatchers.cc93
-rw-r--r--src/crimson/net/chained_dispatchers.h36
22 files changed, 6124 insertions, 0 deletions
diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h
new file mode 100644
index 000000000..6af12692e
--- /dev/null
+++ b/src/crimson/net/Connection.h
@@ -0,0 +1,175 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <queue>
+#include <seastar/core/future.hh>
+#include <seastar/core/shared_ptr.hh>
+
+#include "Fwd.h"
+
+namespace crimson::net {
+
+#ifdef UNIT_TESTS_BUILT
+class Interceptor;
+#endif
+
+using seq_num_t = uint64_t;
+
+class Connection : public seastar::enable_shared_from_this<Connection> {
+ entity_name_t peer_name = {0, entity_name_t::NEW};
+
+ protected:
+ entity_addr_t peer_addr;
+
+ // which of the peer_addrs we're connecting to (as client)
+ // or should reconnect to (as peer)
+ entity_addr_t target_addr;
+
+ using clock_t = seastar::lowres_system_clock;
+ clock_t::time_point last_keepalive;
+ clock_t::time_point last_keepalive_ack;
+
+ void set_peer_type(entity_type_t peer_type) {
+ // it is not allowed to assign an unknown value when the current
+ // value is known
+ assert(!(peer_type == 0 &&
+ peer_name.type() != 0));
+ // it is not allowed to assign a different known value when the
+ // current value is also known.
+ assert(!(peer_type != 0 &&
+ peer_name.type() != 0 &&
+ peer_type != peer_name.type()));
+ peer_name._type = peer_type;
+ }
+ void set_peer_id(int64_t peer_id) {
+ // it is not allowed to assign an unknown value when the current
+ // value is known
+ assert(!(peer_id == entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW));
+ // it is not allowed to assign a different known value when the
+ // current value is also known.
+ assert(!(peer_id != entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW &&
+ peer_id != peer_name.num()));
+ peer_name._num = peer_id;
+ }
+ void set_peer_name(entity_name_t name) {
+ set_peer_type(name.type());
+ set_peer_id(name.num());
+ }
+
+ public:
+ uint64_t peer_global_id = 0;
+
+ protected:
+ uint64_t features = 0;
+
+ public:
+ void set_features(uint64_t new_features) {
+ features = new_features;
+ }
+ auto get_features() const {
+ return features;
+ }
+ bool has_feature(uint64_t f) const {
+ return features & f;
+ }
+
+ public:
+ Connection() {}
+ virtual ~Connection() {}
+
+#ifdef UNIT_TESTS_BUILT
+ Interceptor *interceptor = nullptr;
+#endif
+
+ virtual Messenger* get_messenger() const = 0;
+ const entity_addr_t& get_peer_addr() const { return peer_addr; }
+ const entity_addrvec_t get_peer_addrs() const {
+ return entity_addrvec_t(peer_addr);
+ }
+ const auto& get_peer_socket_addr() const {
+ return target_addr;
+ }
+ const entity_name_t& get_peer_name() const { return peer_name; }
+ entity_type_t get_peer_type() const { return peer_name.type(); }
+ int64_t get_peer_id() const { return peer_name.num(); }
+
+ bool peer_is_mon() const { return peer_name.is_mon(); }
+ bool peer_is_mgr() const { return peer_name.is_mgr(); }
+ bool peer_is_mds() const { return peer_name.is_mds(); }
+ bool peer_is_osd() const { return peer_name.is_osd(); }
+ bool peer_is_client() const { return peer_name.is_client(); }
+
+ /// true if the handshake has completed and no errors have been encountered
+ virtual bool is_connected() const = 0;
+
+#ifdef UNIT_TESTS_BUILT
+ virtual bool is_closed() const = 0;
+
+ virtual bool is_closed_clean() const = 0;
+
+ virtual bool peer_wins() const = 0;
+#endif
+
+ /// send a message over a connection that has completed its handshake
+ virtual seastar::future<> send(MessageRef msg) = 0;
+
+ /// send a keepalive message over a connection that has completed its
+ /// handshake
+ virtual seastar::future<> keepalive() = 0;
+
+ // close the connection and cancel any any pending futures from read/send,
+ // without dispatching any reset event
+ virtual void mark_down() = 0;
+
+ virtual void print(ostream& out) const = 0;
+
+ void set_last_keepalive(clock_t::time_point when) {
+ last_keepalive = when;
+ }
+ void set_last_keepalive_ack(clock_t::time_point when) {
+ last_keepalive_ack = when;
+ }
+ auto get_last_keepalive() const { return last_keepalive; }
+ auto get_last_keepalive_ack() const { return last_keepalive_ack; }
+
+ struct user_private_t {
+ virtual ~user_private_t() = default;
+ };
+private:
+ unique_ptr<user_private_t> user_private;
+public:
+ bool has_user_private() const {
+ return user_private != nullptr;
+ }
+ void set_user_private(unique_ptr<user_private_t> new_user_private) {
+ user_private = std::move(new_user_private);
+ }
+ user_private_t &get_user_private() {
+ ceph_assert(user_private);
+ return *user_private;
+ }
+};
+
+inline ostream& operator<<(ostream& out, const Connection& conn) {
+ out << "[";
+ conn.print(out);
+ out << "]";
+ return out;
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h
new file mode 100644
index 000000000..cc6fd4574
--- /dev/null
+++ b/src/crimson/net/Dispatcher.h
@@ -0,0 +1,46 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "Fwd.h"
+
+class AuthAuthorizer;
+
+namespace crimson::net {
+
+class Dispatcher {
+ public:
+ virtual ~Dispatcher() {}
+
+ // Dispatchers are put into a chain as described by chain-of-responsibility
+ // pattern. If any of the dispatchers claims this message, it returns a valid
+ // future to prevent other dispatchers from processing it, and this is also
+ // used to throttle the connection if it's too busy.
+ virtual std::optional<seastar::future<>> ms_dispatch(ConnectionRef, MessageRef) = 0;
+
+ virtual void ms_handle_accept(ConnectionRef conn) {}
+
+ virtual void ms_handle_connect(ConnectionRef conn) {}
+
+ // a reset event is dispatched when the connection is closed unexpectedly.
+ // is_replace=true means the reset connection is going to be replaced by
+ // another accepting connection with the same peer_addr, which currently only
+ // happens under lossy policy when both sides wish to connect to each other.
+ virtual void ms_handle_reset(ConnectionRef conn, bool is_replace) {}
+
+ virtual void ms_handle_remote_reset(ConnectionRef conn) {}
+};
+
+} // namespace crimson::net
diff --git a/src/crimson/net/Errors.cc b/src/crimson/net/Errors.cc
new file mode 100644
index 000000000..d07c090db
--- /dev/null
+++ b/src/crimson/net/Errors.cc
@@ -0,0 +1,51 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "Errors.h"
+
+namespace crimson::net {
+
+const std::error_category& net_category()
+{
+ struct category : public std::error_category {
+ const char* name() const noexcept override {
+ return "crimson::net";
+ }
+
+ std::string message(int ev) const override {
+ switch (static_cast<error>(ev)) {
+ case error::success:
+ return "success";
+ case error::bad_connect_banner:
+ return "bad connect banner";
+ case error::bad_peer_address:
+ return "bad peer address";
+ case error::negotiation_failure:
+ return "negotiation failure";
+ case error::read_eof:
+ return "read eof";
+ case error::corrupted_message:
+ return "corrupted message";
+ case error::protocol_aborted:
+ return "protocol aborted";
+ default:
+ return "unknown";
+ }
+ }
+ };
+ static category instance;
+ return instance;
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/Errors.h b/src/crimson/net/Errors.h
new file mode 100644
index 000000000..3a17a103a
--- /dev/null
+++ b/src/crimson/net/Errors.h
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <system_error>
+
+namespace crimson::net {
+
+/// net error codes
+enum class error {
+ success = 0,
+ bad_connect_banner,
+ bad_peer_address,
+ negotiation_failure,
+ read_eof,
+ corrupted_message,
+ protocol_aborted,
+};
+
+/// net error category
+const std::error_category& net_category();
+
+inline std::error_code make_error_code(error e)
+{
+ return {static_cast<int>(e), net_category()};
+}
+
+inline std::error_condition make_error_condition(error e)
+{
+ return {static_cast<int>(e), net_category()};
+}
+
+} // namespace crimson::net
+
+namespace std {
+
+/// enables implicit conversion to std::error_condition
+template <>
+struct is_error_condition_enum<crimson::net::error> : public true_type {};
+
+} // namespace std
diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h
new file mode 100644
index 000000000..e10120571
--- /dev/null
+++ b/src/crimson/net/Fwd.h
@@ -0,0 +1,50 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <boost/container/small_vector.hpp>
+#include <seastar/core/future.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/sharded.hh>
+
+#include "msg/Connection.h"
+#include "msg/MessageRef.h"
+#include "msg/msg_types.h"
+
+#include "crimson/common/errorator.h"
+
+using auth_proto_t = int;
+
+class AuthConnectionMeta;
+using AuthConnectionMetaRef = seastar::lw_shared_ptr<AuthConnectionMeta>;
+
+namespace crimson::net {
+
+using msgr_tag_t = uint8_t;
+using stop_t = seastar::stop_iteration;
+
+class Connection;
+using ConnectionRef = seastar::shared_ptr<Connection>;
+
+class Dispatcher;
+class ChainedDispatchers;
+constexpr std::size_t NUM_DISPATCHERS = 4u;
+using dispatchers_t = boost::container::small_vector<Dispatcher*, NUM_DISPATCHERS>;
+
+class Messenger;
+using MessengerRef = seastar::shared_ptr<Messenger>;
+
+} // namespace crimson::net
diff --git a/src/crimson/net/Interceptor.h b/src/crimson/net/Interceptor.h
new file mode 100644
index 000000000..dfa2183ec
--- /dev/null
+++ b/src/crimson/net/Interceptor.h
@@ -0,0 +1,165 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <variant>
+#include <seastar/core/sharded.hh>
+#include <seastar/core/sleep.hh>
+
+#include "Fwd.h"
+#include "msg/async/frames_v2.h"
+
+namespace crimson::net {
+
+enum class custom_bp_t : uint8_t {
+ BANNER_WRITE = 0,
+ BANNER_READ,
+ BANNER_PAYLOAD_READ,
+ SOCKET_CONNECTING,
+ SOCKET_ACCEPTED
+};
+inline const char* get_bp_name(custom_bp_t bp) {
+ uint8_t index = static_cast<uint8_t>(bp);
+ static const char *const bp_names[] = {"BANNER_WRITE",
+ "BANNER_READ",
+ "BANNER_PAYLOAD_READ",
+ "SOCKET_CONNECTING",
+ "SOCKET_ACCEPTED"};
+ assert(index < std::size(bp_names));
+ return bp_names[index];
+}
+
+enum class bp_type_t {
+ READ = 0,
+ WRITE
+};
+
+enum class bp_action_t {
+ CONTINUE = 0,
+ FAULT,
+ BLOCK,
+ STALL
+};
+
+inline std::ostream& operator<<(std::ostream& out, const bp_action_t& action) {
+ static const char *const action_names[] = {"CONTINUE",
+ "FAULT",
+ "BLOCK",
+ "STALL"};
+ assert(static_cast<size_t>(action) < std::size(action_names));
+ return out << action_names[static_cast<size_t>(action)];
+}
+
+class socket_blocker {
+ std::optional<seastar::abort_source> p_blocked;
+ std::optional<seastar::abort_source> p_unblocked;
+
+ public:
+ seastar::future<> wait_blocked() {
+ ceph_assert(!p_blocked);
+ if (p_unblocked) {
+ return seastar::make_ready_future<>();
+ } else {
+ p_blocked = seastar::abort_source();
+ return seastar::sleep_abortable(10s, *p_blocked).then([] {
+ throw std::runtime_error(
+ "Timeout (10s) in socket_blocker::wait_blocked()");
+ }).handle_exception_type([] (const seastar::sleep_aborted& e) {
+ // wait done!
+ });
+ }
+ }
+
+ seastar::future<> block() {
+ if (p_blocked) {
+ p_blocked->request_abort();
+ p_blocked = std::nullopt;
+ }
+ ceph_assert(!p_unblocked);
+ p_unblocked = seastar::abort_source();
+ return seastar::sleep_abortable(10s, *p_unblocked).then([] {
+ ceph_abort("Timeout (10s) in socket_blocker::block()");
+ }).handle_exception_type([] (const seastar::sleep_aborted& e) {
+ // wait done!
+ });
+ }
+
+ void unblock() {
+ ceph_assert(!p_blocked);
+ ceph_assert(p_unblocked);
+ p_unblocked->request_abort();
+ p_unblocked = std::nullopt;
+ }
+};
+
+struct tag_bp_t {
+ ceph::msgr::v2::Tag tag;
+ bp_type_t type;
+ bool operator==(const tag_bp_t& x) const {
+ return tag == x.tag && type == x.type;
+ }
+ bool operator!=(const tag_bp_t& x) const { return !operator==(x); }
+ bool operator<(const tag_bp_t& x) const {
+ return std::tie(tag, type) < std::tie(x.tag, x.type);
+ }
+};
+
+struct Breakpoint {
+ using var_t = std::variant<custom_bp_t, tag_bp_t>;
+ var_t bp;
+ Breakpoint(custom_bp_t bp) : bp(bp) { }
+ Breakpoint(ceph::msgr::v2::Tag tag, bp_type_t type)
+ : bp(tag_bp_t{tag, type}) { }
+ bool operator==(const Breakpoint& x) const { return bp == x.bp; }
+ bool operator!=(const Breakpoint& x) const { return !operator==(x); }
+ bool operator==(const custom_bp_t& x) const { return bp == var_t(x); }
+ bool operator!=(const custom_bp_t& x) const { return !operator==(x); }
+ bool operator==(const tag_bp_t& x) const { return bp == var_t(x); }
+ bool operator!=(const tag_bp_t& x) const { return !operator==(x); }
+ bool operator<(const Breakpoint& x) const { return bp < x.bp; }
+};
+
+inline std::ostream& operator<<(std::ostream& out, const Breakpoint& bp) {
+ if (auto custom_bp = std::get_if<custom_bp_t>(&bp.bp)) {
+ return out << get_bp_name(*custom_bp);
+ } else {
+ auto tag_bp = std::get<tag_bp_t>(bp.bp);
+ static const char *const tag_names[] = {"NONE",
+ "HELLO",
+ "AUTH_REQUEST",
+ "AUTH_BAD_METHOD",
+ "AUTH_REPLY_MORE",
+ "AUTH_REQUEST_MORE",
+ "AUTH_DONE",
+ "AUTH_SIGNATURE",
+ "CLIENT_IDENT",
+ "SERVER_IDENT",
+ "IDENT_MISSING_FEATURES",
+ "SESSION_RECONNECT",
+ "SESSION_RESET",
+ "SESSION_RETRY",
+ "SESSION_RETRY_GLOBAL",
+ "SESSION_RECONNECT_OK",
+ "WAIT",
+ "MESSAGE",
+ "KEEPALIVE2",
+ "KEEPALIVE2_ACK",
+ "ACK"};
+ assert(static_cast<size_t>(tag_bp.tag) < std::size(tag_names));
+ return out << tag_names[static_cast<size_t>(tag_bp.tag)]
+ << (tag_bp.type == bp_type_t::WRITE ? "_WRITE" : "_READ");
+ }
+}
+
+struct Interceptor {
+ socket_blocker blocker;
+ virtual ~Interceptor() {}
+ virtual void register_conn(Connection& conn) = 0;
+ virtual void register_conn_ready(Connection& conn) = 0;
+ virtual void register_conn_closed(Connection& conn) = 0;
+ virtual void register_conn_replaced(Connection& conn) = 0;
+ virtual bp_action_t intercept(Connection& conn, Breakpoint bp) = 0;
+};
+
+} // namespace crimson::net
diff --git a/src/crimson/net/Messenger.cc b/src/crimson/net/Messenger.cc
new file mode 100644
index 000000000..aab476f7a
--- /dev/null
+++ b/src/crimson/net/Messenger.cc
@@ -0,0 +1,17 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Messenger.h"
+#include "SocketMessenger.h"
+
+namespace crimson::net {
+
+MessengerRef
+Messenger::create(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce)
+{
+ return seastar::make_shared<SocketMessenger>(name, lname, nonce);
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h
new file mode 100644
index 000000000..2b39fbf63
--- /dev/null
+++ b/src/crimson/net/Messenger.h
@@ -0,0 +1,154 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "Fwd.h"
+#include "crimson/common/throttle.h"
+#include "msg/Message.h"
+#include "msg/Policy.h"
+
+class AuthAuthorizer;
+
+namespace crimson::auth {
+class AuthClient;
+class AuthServer;
+}
+
+namespace crimson::net {
+
+#ifdef UNIT_TESTS_BUILT
+class Interceptor;
+#endif
+
+using Throttle = crimson::common::Throttle;
+using SocketPolicy = ceph::net::Policy<Throttle>;
+
+class Messenger {
+ entity_name_t my_name;
+ entity_addrvec_t my_addrs;
+ uint32_t crc_flags = 0;
+ crimson::auth::AuthClient* auth_client = nullptr;
+ crimson::auth::AuthServer* auth_server = nullptr;
+ bool require_authorizer = true;
+
+public:
+ Messenger(const entity_name_t& name)
+ : my_name(name)
+ {}
+ virtual ~Messenger() {}
+
+#ifdef UNIT_TESTS_BUILT
+ Interceptor *interceptor = nullptr;
+#endif
+
+ entity_type_t get_mytype() const { return my_name.type(); }
+ const entity_name_t& get_myname() const { return my_name; }
+ const entity_addrvec_t& get_myaddrs() const { return my_addrs; }
+ entity_addr_t get_myaddr() const { return my_addrs.front(); }
+ virtual seastar::future<> set_myaddrs(const entity_addrvec_t& addrs) {
+ my_addrs = addrs;
+ return seastar::now();
+ }
+
+ using bind_ertr = crimson::errorator<
+ crimson::ct_error::address_in_use // The address (range) is already bound
+ >;
+ /// bind to the given address
+ virtual bind_ertr::future<> bind(const entity_addrvec_t& addr) = 0;
+
+ /// try to bind to the first unused port of given address
+ virtual bind_ertr::future<> try_bind(const entity_addrvec_t& addr,
+ uint32_t min_port, uint32_t max_port) = 0;
+
+ /// start the messenger
+ virtual seastar::future<> start(const dispatchers_t&) = 0;
+
+ /// either return an existing connection to the peer,
+ /// or a new pending connection
+ virtual ConnectionRef
+ connect(const entity_addr_t& peer_addr,
+ const entity_name_t& peer_name) = 0;
+
+ ConnectionRef
+ connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type) {
+ return connect(peer_addr, entity_name_t(peer_type, -1));
+ }
+
+ // wait for messenger shutdown
+ virtual seastar::future<> wait() = 0;
+
+ // stop dispatching events and messages
+ virtual void stop() = 0;
+
+ virtual bool is_started() const = 0;
+
+ // free internal resources before destruction, must be called after stopped,
+ // and must be called if is bound.
+ virtual seastar::future<> shutdown() = 0;
+
+ uint32_t get_crc_flags() const {
+ return crc_flags;
+ }
+ void set_crc_data() {
+ crc_flags |= MSG_CRC_DATA;
+ }
+ void set_crc_header() {
+ crc_flags |= MSG_CRC_HEADER;
+ }
+
+ crimson::auth::AuthClient* get_auth_client() const { return auth_client; }
+ void set_auth_client(crimson::auth::AuthClient *ac) {
+ auth_client = ac;
+ }
+ crimson::auth::AuthServer* get_auth_server() const { return auth_server; }
+ void set_auth_server(crimson::auth::AuthServer *as) {
+ auth_server = as;
+ }
+
+ virtual void print(ostream& out) const = 0;
+
+ virtual SocketPolicy get_policy(entity_type_t peer_type) const = 0;
+
+ virtual SocketPolicy get_default_policy() const = 0;
+
+ virtual void set_default_policy(const SocketPolicy& p) = 0;
+
+ virtual void set_policy(entity_type_t peer_type, const SocketPolicy& p) = 0;
+
+ virtual void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) = 0;
+
+ // allow unauthenticated connections. This is needed for compatibility with
+ // pre-nautilus OSDs, which do not authenticate the heartbeat sessions.
+ bool get_require_authorizer() const {
+ return require_authorizer;
+ }
+ void set_require_authorizer(bool r) {
+ require_authorizer = r;
+ }
+ static MessengerRef
+ create(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce);
+};
+
+inline ostream& operator<<(ostream& out, const Messenger& msgr) {
+ out << "[";
+ msgr.print(out);
+ out << "]";
+ return out;
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc
new file mode 100644
index 000000000..50b5c45a3
--- /dev/null
+++ b/src/crimson/net/Protocol.cc
@@ -0,0 +1,323 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Protocol.h"
+
+#include "auth/Auth.h"
+
+#include "crimson/common/log.h"
+#include "crimson/net/Errors.h"
+#include "crimson/net/chained_dispatchers.h"
+#include "crimson/net/Socket.h"
+#include "crimson/net/SocketConnection.h"
+#include "msg/Message.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+ }
+}
+
+namespace crimson::net {
+
+Protocol::Protocol(proto_t type,
+ ChainedDispatchers& dispatchers,
+ SocketConnection& conn)
+ : proto_type(type),
+ dispatchers(dispatchers),
+ conn(conn),
+ auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()}
+{}
+
+Protocol::~Protocol()
+{
+ ceph_assert(gate.is_closed());
+ assert(!exit_open);
+}
+
+void Protocol::close(bool dispatch_reset,
+ std::optional<std::function<void()>> f_accept_new)
+{
+ if (closed) {
+ // already closing
+ return;
+ }
+
+ bool is_replace = f_accept_new ? true : false;
+ logger().info("{} closing: reset {}, replace {}", conn,
+ dispatch_reset ? "yes" : "no",
+ is_replace ? "yes" : "no");
+
+ // atomic operations
+ closed = true;
+ trigger_close();
+ if (f_accept_new) {
+ (*f_accept_new)();
+ }
+ if (socket) {
+ socket->shutdown();
+ }
+ set_write_state(write_state_t::drop);
+ assert(!gate.is_closed());
+ auto gate_closed = gate.close();
+
+ if (dispatch_reset) {
+ dispatchers.ms_handle_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
+ is_replace);
+ }
+
+ // asynchronous operations
+ assert(!close_ready.valid());
+ close_ready = std::move(gate_closed).then([this] {
+ if (socket) {
+ return socket->close();
+ } else {
+ return seastar::now();
+ }
+ }).then([this] {
+ logger().debug("{} closed!", conn);
+ on_closed();
+#ifdef UNIT_TESTS_BUILT
+ is_closed_clean = true;
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_closed(conn);
+ }
+#endif
+ }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
+ logger().error("{} closing: close_ready got unexpected exception {}", conn, eptr);
+ ceph_abort();
+ });
+}
+
+seastar::future<> Protocol::send(MessageRef msg)
+{
+ if (write_state != write_state_t::drop) {
+ conn.out_q.push_back(std::move(msg));
+ write_event();
+ }
+ return seastar::now();
+}
+
+seastar::future<> Protocol::keepalive()
+{
+ if (!need_keepalive) {
+ need_keepalive = true;
+ write_event();
+ }
+ return seastar::now();
+}
+
+void Protocol::notify_keepalive_ack(utime_t _keepalive_ack)
+{
+ logger().trace("{} got keepalive ack {}", conn, _keepalive_ack);
+ keepalive_ack = _keepalive_ack;
+ write_event();
+}
+
+void Protocol::notify_ack()
+{
+ if (!conn.policy.lossy) {
+ ++ack_left;
+ write_event();
+ }
+}
+
+void Protocol::requeue_sent()
+{
+ assert(write_state != write_state_t::open);
+ if (conn.sent.empty()) {
+ return;
+ }
+
+ conn.out_seq -= conn.sent.size();
+ logger().debug("{} requeue {} items, revert out_seq to {}",
+ conn, conn.sent.size(), conn.out_seq);
+ for (MessageRef& msg : conn.sent) {
+ msg->clear_payload();
+ msg->set_seq(0);
+ }
+ conn.out_q.insert(conn.out_q.begin(),
+ std::make_move_iterator(conn.sent.begin()),
+ std::make_move_iterator(conn.sent.end()));
+ conn.sent.clear();
+ write_event();
+}
+
+void Protocol::requeue_up_to(seq_num_t seq)
+{
+ assert(write_state != write_state_t::open);
+ if (conn.sent.empty() && conn.out_q.empty()) {
+ logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
+ conn, conn.out_seq, seq);
+ conn.out_seq = seq;
+ return;
+ }
+ logger().debug("{} discarding sent items by seq {} (sent_len={}, out_seq={})",
+ conn, seq, conn.sent.size(), conn.out_seq);
+ while (!conn.sent.empty()) {
+ auto cur_seq = conn.sent.front()->get_seq();
+ if (cur_seq == 0 || cur_seq > seq) {
+ break;
+ } else {
+ conn.sent.pop_front();
+ }
+ }
+ requeue_sent();
+}
+
+void Protocol::reset_write()
+{
+ assert(write_state != write_state_t::open);
+ conn.out_seq = 0;
+ conn.out_q.clear();
+ conn.sent.clear();
+ need_keepalive = false;
+ keepalive_ack = std::nullopt;
+ ack_left = 0;
+}
+
+void Protocol::ack_writes(seq_num_t seq)
+{
+ if (conn.policy.lossy) { // lossy connections don't keep sent messages
+ return;
+ }
+ while (!conn.sent.empty() && conn.sent.front()->get_seq() <= seq) {
+ logger().trace("{} got ack seq {} >= {}, pop {}",
+ conn, seq, conn.sent.front()->get_seq(), conn.sent.front());
+ conn.sent.pop_front();
+ }
+}
+
+seastar::future<stop_t> Protocol::try_exit_sweep() {
+ assert(!is_queued());
+ return socket->flush().then([this] {
+ if (!is_queued()) {
+ // still nothing pending to send after flush,
+ // the dispatching can ONLY stop now
+ ceph_assert(write_dispatching);
+ write_dispatching = false;
+ if (unlikely(exit_open.has_value())) {
+ exit_open->set_value();
+ exit_open = std::nullopt;
+ logger().info("{} write_event: nothing queued at {},"
+ " set exit_open",
+ conn, get_state_name(write_state));
+ }
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ } else {
+ // something is pending to send during flushing
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ }
+ });
+}
+
+seastar::future<> Protocol::do_write_dispatch_sweep()
+{
+ return seastar::repeat([this] {
+ switch (write_state) {
+ case write_state_t::open: {
+ size_t num_msgs = conn.out_q.size();
+ bool still_queued = is_queued();
+ if (unlikely(!still_queued)) {
+ return try_exit_sweep();
+ }
+ conn.pending_q.clear();
+ conn.pending_q.swap(conn.out_q);
+ if (!conn.policy.lossy) {
+ conn.sent.insert(conn.sent.end(),
+ conn.pending_q.begin(),
+ conn.pending_q.end());
+ }
+ auto acked = ack_left;
+ assert(acked == 0 || conn.in_seq > 0);
+ // sweep all pending writes with the concrete Protocol
+ return socket->write(do_sweep_messages(
+ conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0)
+ ).then([this, prv_keepalive_ack=keepalive_ack, acked] {
+ need_keepalive = false;
+ if (keepalive_ack == prv_keepalive_ack) {
+ keepalive_ack = std::nullopt;
+ }
+ assert(ack_left >= acked);
+ ack_left -= acked;
+ if (!is_queued()) {
+ return try_exit_sweep();
+ } else {
+ // messages were enqueued during socket write
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ }
+ });
+ }
+ case write_state_t::delay:
+ // delay dispatching writes until open
+ if (exit_open) {
+ exit_open->set_value();
+ exit_open = std::nullopt;
+ logger().info("{} write_event: delay and set exit_open ...", conn);
+ } else {
+ logger().info("{} write_event: delay ...", conn);
+ }
+ return state_changed.get_shared_future()
+ .then([] { return stop_t::no; });
+ case write_state_t::drop:
+ ceph_assert(write_dispatching);
+ write_dispatching = false;
+ if (exit_open) {
+ exit_open->set_value();
+ exit_open = std::nullopt;
+ logger().info("{} write_event: dropped and set exit_open", conn);
+ } else {
+ logger().info("{} write_event: dropped", conn);
+ }
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ default:
+ ceph_assert(false);
+ }
+ }).handle_exception_type([this] (const std::system_error& e) {
+ if (e.code() != std::errc::broken_pipe &&
+ e.code() != std::errc::connection_reset &&
+ e.code() != error::negotiation_failure) {
+ logger().error("{} write_event(): unexpected error at {} -- {}",
+ conn, get_state_name(write_state), e);
+ ceph_abort();
+ }
+ socket->shutdown();
+ if (write_state == write_state_t::open) {
+ logger().info("{} write_event(): fault at {}, going to delay -- {}",
+ conn, get_state_name(write_state), e);
+ write_state = write_state_t::delay;
+ } else {
+ logger().info("{} write_event(): fault at {} -- {}",
+ conn, get_state_name(write_state), e);
+ }
+ return do_write_dispatch_sweep();
+ });
+}
+
+void Protocol::write_event()
+{
+ notify_write();
+ if (write_dispatching) {
+ // already dispatching
+ return;
+ }
+ write_dispatching = true;
+ switch (write_state) {
+ case write_state_t::open:
+ [[fallthrough]];
+ case write_state_t::delay:
+ assert(!gate.is_closed());
+ gate.dispatch_in_background("do_write_dispatch_sweep", *this, [this] {
+ return do_write_dispatch_sweep();
+ });
+ return;
+ case write_state_t::drop:
+ write_dispatching = false;
+ return;
+ default:
+ ceph_assert(false);
+ }
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h
new file mode 100644
index 000000000..dc4e4f2af
--- /dev/null
+++ b/src/crimson/net/Protocol.h
@@ -0,0 +1,173 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/gate.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "crimson/common/gated.h"
+#include "crimson/common/log.h"
+#include "Fwd.h"
+#include "SocketConnection.h"
+
+namespace crimson::net {
+
+class Protocol {
+ public:
+ enum class proto_t {
+ none,
+ v1,
+ v2
+ };
+
+ Protocol(Protocol&&) = delete;
+ virtual ~Protocol();
+
+ virtual bool is_connected() const = 0;
+
+#ifdef UNIT_TESTS_BUILT
+ bool is_closed_clean = false;
+ bool is_closed() const { return closed; }
+#endif
+
+ // Reentrant closing
+ void close(bool dispatch_reset, std::optional<std::function<void()>> f_accept_new=std::nullopt);
+ seastar::future<> close_clean(bool dispatch_reset) {
+ close(dispatch_reset);
+ // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset()
+ // which will otherwise result in deadlock
+ assert(close_ready.valid());
+ return close_ready.get_future();
+ }
+
+ virtual void start_connect(const entity_addr_t& peer_addr,
+ const entity_name_t& peer_name) = 0;
+
+ virtual void start_accept(SocketRef&& socket,
+ const entity_addr_t& peer_addr) = 0;
+
+ virtual void print(std::ostream&) const = 0;
+ protected:
+ Protocol(proto_t type,
+ ChainedDispatchers& dispatchers,
+ SocketConnection& conn);
+
+ virtual void trigger_close() = 0;
+
+ virtual ceph::bufferlist do_sweep_messages(
+ const std::deque<MessageRef>& msgs,
+ size_t num_msgs,
+ bool require_keepalive,
+ std::optional<utime_t> keepalive_ack,
+ bool require_ack) = 0;
+
+ virtual void notify_write() {};
+
+ virtual void on_closed() {}
+
+ public:
+ const proto_t proto_type;
+ SocketRef socket;
+
+ protected:
+ ChainedDispatchers& dispatchers;
+ SocketConnection &conn;
+
+ AuthConnectionMetaRef auth_meta;
+
+ private:
+ bool closed = false;
+ // become valid only after closed == true
+ seastar::shared_future<> close_ready;
+
+// the write state-machine
+ public:
+ seastar::future<> send(MessageRef msg);
+ seastar::future<> keepalive();
+
+// TODO: encapsulate a SessionedSender class
+ protected:
+ // write_state is changed with state atomically, indicating the write
+ // behavior of the according state.
+ enum class write_state_t : uint8_t {
+ none,
+ delay,
+ open,
+ drop
+ };
+
+ static const char* get_state_name(write_state_t state) {
+ uint8_t index = static_cast<uint8_t>(state);
+ static const char *const state_names[] = {"none",
+ "delay",
+ "open",
+ "drop"};
+ assert(index < std::size(state_names));
+ return state_names[index];
+ }
+
+ void set_write_state(const write_state_t& state) {
+ if (write_state == write_state_t::open &&
+ state != write_state_t::open &&
+ write_dispatching) {
+ exit_open = seastar::shared_promise<>();
+ }
+ write_state = state;
+ state_changed.set_value();
+ state_changed = seastar::shared_promise<>();
+ }
+
+ seastar::future<> wait_write_exit() {
+ if (exit_open) {
+ return exit_open->get_shared_future();
+ }
+ return seastar::now();
+ }
+
+ void notify_keepalive_ack(utime_t keepalive_ack);
+
+ void notify_ack();
+
+ void requeue_up_to(seq_num_t seq);
+
+ void requeue_sent();
+
+ void reset_write();
+
+ bool is_queued() const {
+ return (!conn.out_q.empty() ||
+ ack_left > 0 ||
+ need_keepalive ||
+ keepalive_ack.has_value());
+ }
+
+ void ack_writes(seq_num_t seq);
+ crimson::common::Gated gate;
+
+ private:
+ write_state_t write_state = write_state_t::none;
+ // wait until current state changed
+ seastar::shared_promise<> state_changed;
+
+ bool need_keepalive = false;
+ std::optional<utime_t> keepalive_ack = std::nullopt;
+ uint64_t ack_left = 0;
+ bool write_dispatching = false;
+ // If another continuation is trying to close or replace socket when
+ // write_dispatching is true and write_state is open,
+ // it needs to wait for exit_open until writing is stopped or failed.
+ std::optional<seastar::shared_promise<>> exit_open;
+
+ seastar::future<stop_t> try_exit_sweep();
+ seastar::future<> do_write_dispatch_sweep();
+ void write_event();
+};
+
+inline std::ostream& operator<<(std::ostream& out, const Protocol& proto) {
+ proto.print(out);
+ return out;
+}
+
+
+} // namespace crimson::net
diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc
new file mode 100644
index 000000000..3c604240d
--- /dev/null
+++ b/src/crimson/net/ProtocolV1.cc
@@ -0,0 +1,1014 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ProtocolV1.h"
+
+#include <seastar/core/shared_future.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/net/packet.hh>
+
+#include "include/msgr.h"
+#include "include/random.h"
+#include "auth/Auth.h"
+#include "auth/AuthSessionHandler.h"
+
+#include "crimson/auth/AuthClient.h"
+#include "crimson/auth/AuthServer.h"
+#include "crimson/common/log.h"
+#include "chained_dispatchers.h"
+#include "Errors.h"
+#include "Socket.h"
+#include "SocketConnection.h"
+#include "SocketMessenger.h"
+
+WRITE_RAW_ENCODER(ceph_msg_connect);
+WRITE_RAW_ENCODER(ceph_msg_connect_reply);
+
+using crimson::common::local_conf;
+
+std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c)
+{
+ return out << "connect{features=" << std::hex << c.features << std::dec
+ << " host_type=" << c.host_type
+ << " global_seq=" << c.global_seq
+ << " connect_seq=" << c.connect_seq
+ << " protocol_version=" << c.protocol_version
+ << " authorizer_protocol=" << c.authorizer_protocol
+ << " authorizer_len=" << c.authorizer_len
+ << " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}';
+}
+
+std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r)
+{
+ return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag)
+ << " features=" << std::hex << r.features << std::dec
+ << " global_seq=" << r.global_seq
+ << " connect_seq=" << r.connect_seq
+ << " protocol_version=" << r.protocol_version
+ << " authorizer_len=" << r.authorizer_len
+ << " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}';
+}
+
+namespace {
+
+seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+}
+
+template <typename T>
+seastar::net::packet make_static_packet(const T& value) {
+ return { reinterpret_cast<const char*>(&value), sizeof(value) };
+}
+
+// store the banner in a non-const string for buffer::create_static()
+char banner[] = CEPH_BANNER;
+constexpr size_t banner_size = sizeof(CEPH_BANNER)-1;
+
+constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr);
+constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr);
+
+// check that the buffer starts with a valid banner without requiring it to
+// be contiguous in memory
+void validate_banner(bufferlist::const_iterator& p)
+{
+ auto b = std::cbegin(banner);
+ auto end = b + banner_size;
+ while (b != end) {
+ const char *buf{nullptr};
+ auto remaining = std::distance(b, end);
+ auto len = p.get_ptr_and_advance(remaining, &buf);
+ if (!std::equal(buf, buf + len, b)) {
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_connect_banner));
+ }
+ b += len;
+ }
+}
+
+// return a static bufferptr to the given object
+template <typename T>
+bufferptr create_static(T& obj)
+{
+ return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj));
+}
+
+uint32_t get_proto_version(entity_type_t peer_type, bool connect)
+{
+ constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD;
+ // see also OSD.h, unlike other connection of simple/async messenger,
+ // crimson msgr is only used by osd
+ constexpr uint32_t CEPH_OSD_PROTOCOL = 10;
+ if (peer_type == my_type) {
+ // internal
+ return CEPH_OSD_PROTOCOL;
+ } else {
+ // public
+ switch (connect ? peer_type : my_type) {
+ case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
+ case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
+ case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
+ default: return 0;
+ }
+ }
+}
+
+void discard_up_to(std::deque<MessageRef>* queue,
+ crimson::net::seq_num_t seq)
+{
+ while (!queue->empty() &&
+ queue->front()->get_seq() < seq) {
+ queue->pop_front();
+ }
+}
+
+} // namespace anonymous
+
+namespace crimson::net {
+
+ProtocolV1::ProtocolV1(ChainedDispatchers& dispatchers,
+ SocketConnection& conn,
+ SocketMessenger& messenger)
+ : Protocol(proto_t::v1, dispatchers, conn), messenger{messenger} {}
+
+ProtocolV1::~ProtocolV1() {}
+
+bool ProtocolV1::is_connected() const
+{
+ return state == state_t::open;
+}
+
+// connecting state
+
+void ProtocolV1::reset_session()
+{
+ conn.out_q = {};
+ conn.sent = {};
+ conn.in_seq = 0;
+ h.connect_seq = 0;
+ if (HAVE_FEATURE(conn.features, MSG_AUTH)) {
+ // Set out_seq to a random value, so CRC won't be predictable.
+ // Constant to limit starting sequence number to 2^31. Nothing special
+ // about it, just a big number.
+ constexpr uint64_t SEQ_MASK = 0x7fffffff;
+ conn.out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
+ } else {
+ // previously, seq #'s always started at 0.
+ conn.out_seq = 0;
+ }
+}
+
+seastar::future<stop_t>
+ProtocolV1::handle_connect_reply(msgr_tag_t tag)
+{
+ if (h.auth_payload.length() && !conn.peer_is_mon()) {
+ if (tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { // more
+ h.auth_more = messenger.get_auth_client()->handle_auth_reply_more(
+ conn.shared_from_this(), auth_meta, h.auth_payload);
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ } else {
+ int ret = messenger.get_auth_client()->handle_auth_done(
+ conn.shared_from_this(), auth_meta, 0, 0, h.auth_payload);
+ if (ret < 0) {
+ // fault
+ logger().warn("{} AuthClient::handle_auth_done() return {}", conn, ret);
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ }
+ }
+ }
+
+ switch (tag) {
+ case CEPH_MSGR_TAG_FEATURES:
+ logger().error("{} connect protocol feature mispatch", __func__);
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ case CEPH_MSGR_TAG_BADPROTOVER:
+ logger().error("{} connect protocol version mispatch", __func__);
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ case CEPH_MSGR_TAG_BADAUTHORIZER:
+ logger().error("{} got bad authorizer", __func__);
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ case CEPH_MSGR_TAG_RESETSESSION:
+ reset_session();
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ case CEPH_MSGR_TAG_RETRY_GLOBAL:
+ return messenger.get_global_seq(h.reply.global_seq).then([this] (auto gs) {
+ h.global_seq = gs;
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ });
+ case CEPH_MSGR_TAG_RETRY_SESSION:
+ ceph_assert(h.reply.connect_seq > h.connect_seq);
+ h.connect_seq = h.reply.connect_seq;
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ case CEPH_MSGR_TAG_WAIT:
+ // TODO: state wait
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ case CEPH_MSGR_TAG_SEQ:
+ case CEPH_MSGR_TAG_READY:
+ if (auto missing = (conn.policy.features_required & ~(uint64_t)h.reply.features);
+ missing) {
+ logger().error("{} missing required features", __func__);
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ }
+ return seastar::futurize_invoke([this, tag] {
+ if (tag == CEPH_MSGR_TAG_SEQ) {
+ return socket->read_exactly(sizeof(seq_num_t))
+ .then([this] (auto buf) {
+ auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
+ discard_up_to(&conn.out_q, *acked_seq);
+ return socket->write_flush(make_static_packet(conn.in_seq));
+ });
+ }
+ // tag CEPH_MSGR_TAG_READY
+ return seastar::now();
+ }).then([this] {
+ // hooray!
+ h.peer_global_seq = h.reply.global_seq;
+ conn.policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY;
+ h.connect_seq++;
+ h.backoff = 0ms;
+ conn.set_features(h.reply.features & h.connect.features);
+ if (auth_meta->authorizer) {
+ session_security.reset(
+ get_auth_session_handler(nullptr,
+ auth_meta->authorizer->protocol,
+ auth_meta->session_key,
+ conn.features));
+ } else {
+ session_security.reset();
+ }
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ });
+ break;
+ default:
+ // unknown tag
+ logger().error("{} got unknown tag", __func__, int(tag));
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ }
+}
+
+ceph::bufferlist ProtocolV1::get_auth_payload()
+{
+ // only non-mons connectings to mons use MAuth messages
+ if (conn.peer_is_mon() &&
+ messenger.get_mytype() != CEPH_ENTITY_TYPE_MON) {
+ return {};
+ } else {
+ if (h.auth_more.length()) {
+ logger().info("using augmented (challenge) auth payload");
+ return std::move(h.auth_more);
+ } else {
+ auto [auth_method, preferred_modes, auth_bl] =
+ messenger.get_auth_client()->get_auth_request(
+ conn.shared_from_this(), auth_meta);
+ auth_meta->auth_method = auth_method;
+ return auth_bl;
+ }
+ }
+}
+
+seastar::future<stop_t>
+ProtocolV1::repeat_connect()
+{
+ // encode ceph_msg_connect
+ memset(&h.connect, 0, sizeof(h.connect));
+ h.connect.features = conn.policy.features_supported;
+ h.connect.host_type = messenger.get_myname().type();
+ h.connect.global_seq = h.global_seq;
+ h.connect.connect_seq = h.connect_seq;
+ h.connect.protocol_version = get_proto_version(conn.get_peer_type(), true);
+ // this is fyi, actually, server decides!
+ h.connect.flags = conn.policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;
+
+ ceph_assert(messenger.get_auth_client());
+
+ bufferlist bl;
+ bufferlist auth_bl = get_auth_payload();
+ if (auth_bl.length()) {
+ h.connect.authorizer_protocol = auth_meta->auth_method;
+ h.connect.authorizer_len = auth_bl.length();
+ bl.append(create_static(h.connect));
+ bl.claim_append(auth_bl);
+ } else {
+ h.connect.authorizer_protocol = 0;
+ h.connect.authorizer_len = 0;
+ bl.append(create_static(h.connect));
+ };
+ return socket->write_flush(std::move(bl))
+ .then([this] {
+ // read the reply
+ return socket->read(sizeof(h.reply));
+ }).then([this] (bufferlist bl) {
+ auto p = bl.cbegin();
+ ::decode(h.reply, p);
+ ceph_assert(p.end());
+ return socket->read(h.reply.authorizer_len);
+ }).then([this] (bufferlist bl) {
+ h.auth_payload = std::move(bl);
+ return handle_connect_reply(h.reply.tag);
+ });
+}
+
+void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
+ const entity_name_t& _peer_name)
+{
+ ceph_assert(state == state_t::none);
+ logger().trace("{} trigger connecting, was {}", conn, static_cast<int>(state));
+ state = state_t::connecting;
+ set_write_state(write_state_t::delay);
+
+ ceph_assert(!socket);
+ ceph_assert(!gate.is_closed());
+ conn.peer_addr = _peer_addr;
+ conn.target_addr = _peer_addr;
+ conn.set_peer_name(_peer_name);
+ conn.policy = messenger.get_policy(_peer_name.type());
+ messenger.register_conn(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ gate.dispatch_in_background("start_connect", *this, [this] {
+ return Socket::connect(conn.peer_addr)
+ .then([this](SocketRef sock) {
+ socket = std::move(sock);
+ if (state != state_t::connecting) {
+ assert(state == state_t::closing);
+ return socket->close().then([] {
+ throw std::system_error(make_error_code(error::protocol_aborted));
+ });
+ }
+ return seastar::now();
+ }).then([this] {
+ return messenger.get_global_seq();
+ }).then([this] (auto gs) {
+ h.global_seq = gs;
+ // read server's handshake header
+ return socket->read(server_header_size);
+ }).then([this] (bufferlist headerbl) {
+ auto p = headerbl.cbegin();
+ validate_banner(p);
+ entity_addr_t saddr, caddr;
+ ::decode(saddr, p);
+ ::decode(caddr, p);
+ ceph_assert(p.end());
+ if (saddr != conn.peer_addr) {
+ logger().error("{} my peer_addr {} doesn't match what peer advertized {}",
+ conn, conn.peer_addr, saddr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (state != state_t::connecting) {
+ assert(state == state_t::closing);
+ throw std::system_error(make_error_code(error::protocol_aborted));
+ }
+ socket->learn_ephemeral_port_as_connector(caddr.get_port());
+ if (unlikely(caddr.is_msgr2())) {
+ logger().warn("{} peer sent a v2 address for me: {}",
+ conn, caddr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ caddr.set_type(entity_addr_t::TYPE_LEGACY);
+ return messenger.learned_addr(caddr, conn);
+ }).then([this] {
+ // encode/send client's handshake header
+ bufferlist bl;
+ bl.append(buffer::create_static(banner_size, banner));
+ ::encode(messenger.get_myaddr(), bl, 0);
+ return socket->write_flush(std::move(bl));
+ }).then([=] {
+ return seastar::repeat([this] {
+ return repeat_connect();
+ });
+ }).then([this] {
+ if (state != state_t::connecting) {
+ assert(state == state_t::closing);
+ throw std::system_error(make_error_code(error::protocol_aborted));
+ }
+ execute_open(open_t::connected);
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ // TODO: handle fault in the connecting state
+ logger().warn("{} connecting fault: {}", conn, eptr);
+ close(true);
+ });
+ });
+}
+
+// accepting state
+
+seastar::future<stop_t> ProtocolV1::send_connect_reply(
+ msgr_tag_t tag, bufferlist&& authorizer_reply)
+{
+ h.reply.tag = tag;
+ h.reply.features = static_cast<uint64_t>((h.connect.features &
+ conn.policy.features_supported) |
+ conn.policy.features_required);
+ h.reply.authorizer_len = authorizer_reply.length();
+ return socket->write(make_static_packet(h.reply))
+ .then([this, reply=std::move(authorizer_reply)]() mutable {
+ return socket->write_flush(std::move(reply));
+ }).then([] {
+ return stop_t::no;
+ });
+}
+
+seastar::future<stop_t> ProtocolV1::send_connect_reply_ready(
+ msgr_tag_t tag, bufferlist&& authorizer_reply)
+{
+ return messenger.get_global_seq(
+ ).then([this, tag, auth_len = authorizer_reply.length()] (auto gs) {
+ h.global_seq = gs;
+ h.reply.tag = tag;
+ h.reply.features = conn.policy.features_supported;
+ h.reply.global_seq = h.global_seq;
+ h.reply.connect_seq = h.connect_seq;
+ h.reply.flags = 0;
+ if (conn.policy.lossy) {
+ h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
+ }
+ h.reply.authorizer_len = auth_len;
+
+ session_security.reset(
+ get_auth_session_handler(nullptr,
+ auth_meta->auth_method,
+ auth_meta->session_key,
+ conn.features));
+
+ return socket->write(make_static_packet(h.reply));
+ }).then([this, reply=std::move(authorizer_reply)]() mutable {
+ if (reply.length()) {
+ return socket->write(std::move(reply));
+ } else {
+ return seastar::now();
+ }
+ }).then([this] {
+ if (h.reply.tag == CEPH_MSGR_TAG_SEQ) {
+ return socket->write_flush(make_static_packet(conn.in_seq))
+ .then([this] {
+ return socket->read_exactly(sizeof(seq_num_t));
+ }).then([this] (auto buf) {
+ auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
+ discard_up_to(&conn.out_q, *acked_seq);
+ });
+ } else {
+ return socket->flush();
+ }
+ }).then([] {
+ return stop_t::yes;
+ });
+}
+
+seastar::future<stop_t> ProtocolV1::replace_existing(
+ SocketConnectionRef existing,
+ bufferlist&& authorizer_reply,
+ bool is_reset_from_peer)
+{
+ msgr_tag_t reply_tag;
+ if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) &&
+ !is_reset_from_peer) {
+ reply_tag = CEPH_MSGR_TAG_SEQ;
+ } else {
+ reply_tag = CEPH_MSGR_TAG_READY;
+ }
+ if (!existing->is_lossy()) {
+ // XXX: we decided not to support lossless connection in v1. as the
+ // client's default policy is
+ // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is
+ // lossy. And by the time
+ // will all be performed using v2 protocol.
+ ceph_abort("lossless policy not supported for v1");
+ }
+ existing->protocol->close(true);
+ return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
+}
+
+seastar::future<stop_t> ProtocolV1::handle_connect_with_existing(
+ SocketConnectionRef existing, bufferlist&& authorizer_reply)
+{
+ ProtocolV1 *exproto = dynamic_cast<ProtocolV1*>(existing->protocol.get());
+
+ if (h.connect.global_seq < exproto->peer_global_seq()) {
+ h.reply.global_seq = exproto->peer_global_seq();
+ return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL);
+ } else if (existing->is_lossy()) {
+ return replace_existing(existing, std::move(authorizer_reply));
+ } else if (h.connect.connect_seq == 0 && exproto->connect_seq() > 0) {
+ return replace_existing(existing, std::move(authorizer_reply), true);
+ } else if (h.connect.connect_seq < exproto->connect_seq()) {
+ // old attempt, or we sent READY but they didn't get it.
+ h.reply.connect_seq = exproto->connect_seq() + 1;
+ return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
+ } else if (h.connect.connect_seq == exproto->connect_seq()) {
+ // if the existing connection successfully opened, and/or
+ // subsequently went to standby, then the peer should bump
+ // their connect_seq and retry: this is not a connection race
+ // we need to resolve here.
+ if (exproto->get_state() == state_t::open ||
+ exproto->get_state() == state_t::standby) {
+ if (conn.policy.resetcheck && exproto->connect_seq() == 0) {
+ return replace_existing(existing, std::move(authorizer_reply));
+ } else {
+ h.reply.connect_seq = exproto->connect_seq() + 1;
+ return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
+ }
+ } else if (existing->peer_wins()) {
+ return replace_existing(existing, std::move(authorizer_reply));
+ } else {
+ return send_connect_reply(CEPH_MSGR_TAG_WAIT);
+ }
+ } else if (conn.policy.resetcheck &&
+ exproto->connect_seq() == 0) {
+ return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION);
+ } else {
+ return replace_existing(existing, std::move(authorizer_reply));
+ }
+}
+
+bool ProtocolV1::require_auth_feature() const
+{
+ if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) {
+ return false;
+ }
+ if (local_conf()->cephx_require_signatures) {
+ return true;
+ }
+ if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD ||
+ h.connect.host_type == CEPH_ENTITY_TYPE_MDS ||
+ h.connect.host_type == CEPH_ENTITY_TYPE_MGR) {
+ return local_conf()->cephx_cluster_require_signatures;
+ } else {
+ return local_conf()->cephx_service_require_signatures;
+ }
+}
+
+bool ProtocolV1::require_cephx_v2_feature() const
+{
+ if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) {
+ return false;
+ }
+ if (local_conf()->cephx_require_version >= 2) {
+ return true;
+ }
+ if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD ||
+ h.connect.host_type == CEPH_ENTITY_TYPE_MDS ||
+ h.connect.host_type == CEPH_ENTITY_TYPE_MGR) {
+ return local_conf()->cephx_cluster_require_version >= 2;
+ } else {
+ return local_conf()->cephx_service_require_version >= 2;
+ }
+}
+
+seastar::future<stop_t> ProtocolV1::repeat_handle_connect()
+{
+ return socket->read(sizeof(h.connect))
+ .then([this](bufferlist bl) {
+ auto p = bl.cbegin();
+ ::decode(h.connect, p);
+ if (conn.get_peer_type() != 0 &&
+ conn.get_peer_type() != h.connect.host_type) {
+ logger().error("{} repeat_handle_connect(): my peer type does not match"
+ " what peer advertises {} != {}",
+ conn, conn.get_peer_type(), h.connect.host_type);
+ throw std::system_error(make_error_code(error::protocol_aborted));
+ }
+ conn.set_peer_type(h.connect.host_type);
+ conn.policy = messenger.get_policy(h.connect.host_type);
+ if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
+ logger().error("{} we don't know how to reconnect to peer {}",
+ conn, conn.target_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ return socket->read(h.connect.authorizer_len);
+ }).then([this] (bufferlist authorizer) {
+ memset(&h.reply, 0, sizeof(h.reply));
+ // TODO: set reply.protocol_version
+ if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) {
+ return send_connect_reply(
+ CEPH_MSGR_TAG_BADPROTOVER, bufferlist{});
+ }
+ if (require_auth_feature()) {
+ conn.policy.features_required |= CEPH_FEATURE_MSG_AUTH;
+ }
+ if (require_cephx_v2_feature()) {
+ conn.policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
+ }
+ if (auto feat_missing = conn.policy.features_required & ~(uint64_t)h.connect.features;
+ feat_missing != 0) {
+ return send_connect_reply(
+ CEPH_MSGR_TAG_FEATURES, bufferlist{});
+ }
+
+ bufferlist authorizer_reply;
+ auth_meta->auth_method = h.connect.authorizer_protocol;
+ if (!HAVE_FEATURE((uint64_t)h.connect.features, CEPHX_V2)) {
+ // peer doesn't support it and we won't get here if we require it
+ auth_meta->skip_authorizer_challenge = true;
+ }
+ auto more = static_cast<bool>(auth_meta->authorizer_challenge);
+ ceph_assert(messenger.get_auth_server());
+ int r = messenger.get_auth_server()->handle_auth_request(
+ conn.shared_from_this(), auth_meta, more, auth_meta->auth_method, authorizer,
+ &authorizer_reply);
+
+ if (r < 0) {
+ session_security.reset();
+ return send_connect_reply(
+ CEPH_MSGR_TAG_BADAUTHORIZER, std::move(authorizer_reply));
+ } else if (r == 0) {
+ ceph_assert(authorizer_reply.length());
+ return send_connect_reply(
+ CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER, std::move(authorizer_reply));
+ }
+
+ // r > 0
+ if (auto existing = messenger.lookup_conn(conn.peer_addr); existing) {
+ if (existing->protocol->proto_type != proto_t::v1) {
+ logger().warn("{} existing {} proto version is {} not 1, close existing",
+ conn, *existing,
+ static_cast<int>(existing->protocol->proto_type));
+ // NOTE: this is following async messenger logic, but we may miss the reset event.
+ existing->mark_down();
+ } else {
+ return handle_connect_with_existing(existing, std::move(authorizer_reply));
+ }
+ }
+ if (h.connect.connect_seq > 0) {
+ return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION,
+ std::move(authorizer_reply));
+ }
+ h.connect_seq = h.connect.connect_seq + 1;
+ h.peer_global_seq = h.connect.global_seq;
+ conn.set_features((uint64_t)conn.policy.features_supported & (uint64_t)h.connect.features);
+ // TODO: cct
+ return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply));
+ });
+}
+
+void ProtocolV1::start_accept(SocketRef&& sock,
+ const entity_addr_t& _peer_addr)
+{
+ ceph_assert(state == state_t::none);
+ logger().trace("{} trigger accepting, was {}",
+ conn, static_cast<int>(state));
+ state = state_t::accepting;
+ set_write_state(write_state_t::delay);
+
+ ceph_assert(!socket);
+ // until we know better
+ conn.target_addr = _peer_addr;
+ socket = std::move(sock);
+ messenger.accept_conn(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ gate.dispatch_in_background("start_accept", *this, [this] {
+ // stop learning my_addr before sending it out, so it won't change
+ return messenger.learned_addr(messenger.get_myaddr(), conn).then([this] {
+ // encode/send server's handshake header
+ bufferlist bl;
+ bl.append(buffer::create_static(banner_size, banner));
+ ::encode(messenger.get_myaddr(), bl, 0);
+ ::encode(conn.target_addr, bl, 0);
+ return socket->write_flush(std::move(bl));
+ }).then([this] {
+ // read client's handshake header and connect request
+ return socket->read(client_header_size);
+ }).then([this] (bufferlist bl) {
+ auto p = bl.cbegin();
+ validate_banner(p);
+ entity_addr_t addr;
+ ::decode(addr, p);
+ ceph_assert(p.end());
+ if ((addr.is_legacy() || addr.is_any()) &&
+ addr.is_same_host(conn.target_addr)) {
+ // good
+ } else {
+ logger().error("{} peer advertized an invalid peer_addr: {},"
+ " which should be v1 and the same host with {}.",
+ conn, addr, conn.peer_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ conn.peer_addr = addr;
+ conn.target_addr = conn.peer_addr;
+ return seastar::repeat([this] {
+ return repeat_handle_connect();
+ });
+ }).then([this] {
+ if (state != state_t::accepting) {
+ assert(state == state_t::closing);
+ throw std::system_error(make_error_code(error::protocol_aborted));
+ }
+ messenger.register_conn(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ messenger.unaccept_conn(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ execute_open(open_t::accepted);
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ // TODO: handle fault in the accepting state
+ logger().warn("{} accepting fault: {}", conn, eptr);
+ close(false);
+ });
+ });
+}
+
+// open state
+
+ceph::bufferlist ProtocolV1::do_sweep_messages(
+ const std::deque<MessageRef>& msgs,
+ size_t num_msgs,
+ bool require_keepalive,
+ std::optional<utime_t> _keepalive_ack,
+ bool require_ack)
+{
+ static const size_t RESERVE_MSG_SIZE = sizeof(CEPH_MSGR_TAG_MSG) +
+ sizeof(ceph_msg_header) +
+ sizeof(ceph_msg_footer);
+ static const size_t RESERVE_MSG_SIZE_OLD = sizeof(CEPH_MSGR_TAG_MSG) +
+ sizeof(ceph_msg_header) +
+ sizeof(ceph_msg_footer_old);
+
+ ceph::bufferlist bl;
+ if (likely(num_msgs)) {
+ if (HAVE_FEATURE(conn.features, MSG_AUTH)) {
+ bl.reserve(num_msgs * RESERVE_MSG_SIZE);
+ } else {
+ bl.reserve(num_msgs * RESERVE_MSG_SIZE_OLD);
+ }
+ }
+
+ if (unlikely(require_keepalive)) {
+ k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
+ ceph::coarse_real_clock::now());
+ logger().trace("{} write keepalive2 {}", conn, k.req.stamp.tv_sec);
+ bl.append(create_static(k.req));
+ }
+
+ if (unlikely(_keepalive_ack.has_value())) {
+ logger().trace("{} write keepalive2 ack {}", conn, *_keepalive_ack);
+ k.ack.stamp = ceph_timespec(*_keepalive_ack);
+ bl.append(create_static(k.ack));
+ }
+
+ if (require_ack) {
+ // XXX: we decided not to support lossless connection in v1. as the
+ // client's default policy is
+ // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is
+ // lossy. And by the time of crimson-osd's GA, the in-cluster communication
+ // will all be performed using v2 protocol.
+ ceph_abort("lossless policy not supported for v1");
+ }
+
+ std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
+ ceph_assert(!msg->get_seq() && "message already has seq");
+ msg->set_seq(++conn.out_seq);
+ auto& header = msg->get_header();
+ header.src = messenger.get_myname();
+ msg->encode(conn.features, messenger.get_crc_flags());
+ if (session_security) {
+ session_security->sign_message(msg.get());
+ }
+ logger().debug("{} --> #{} === {} ({})",
+ conn, msg->get_seq(), *msg, msg->get_type());
+ bl.append(CEPH_MSGR_TAG_MSG);
+ bl.append((const char*)&header, sizeof(header));
+ bl.append(msg->get_payload());
+ bl.append(msg->get_middle());
+ bl.append(msg->get_data());
+ auto& footer = msg->get_footer();
+ if (HAVE_FEATURE(conn.features, MSG_AUTH)) {
+ bl.append((const char*)&footer, sizeof(footer));
+ } else {
+ ceph_msg_footer_old old_footer;
+ if (messenger.get_crc_flags() & MSG_CRC_HEADER) {
+ old_footer.front_crc = footer.front_crc;
+ old_footer.middle_crc = footer.middle_crc;
+ } else {
+ old_footer.front_crc = old_footer.middle_crc = 0;
+ }
+ if (messenger.get_crc_flags() & MSG_CRC_DATA) {
+ old_footer.data_crc = footer.data_crc;
+ } else {
+ old_footer.data_crc = 0;
+ }
+ old_footer.flags = footer.flags;
+ bl.append((const char*)&old_footer, sizeof(old_footer));
+ }
+ });
+
+ return bl;
+}
+
+seastar::future<> ProtocolV1::handle_keepalive2_ack()
+{
+ return socket->read_exactly(sizeof(ceph_timespec))
+ .then([this] (auto buf) {
+ auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
+ k.ack_stamp = *t;
+ logger().trace("{} got keepalive2 ack {}", conn, t->tv_sec);
+ });
+}
+
+seastar::future<> ProtocolV1::handle_keepalive2()
+{
+ return socket->read_exactly(sizeof(ceph_timespec))
+ .then([this] (auto buf) {
+ utime_t ack{*reinterpret_cast<const ceph_timespec*>(buf.get())};
+ notify_keepalive_ack(ack);
+ });
+}
+
+seastar::future<> ProtocolV1::handle_ack()
+{
+ return socket->read_exactly(sizeof(ceph_le64))
+ .then([this] (auto buf) {
+ auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
+ discard_up_to(&conn.sent, *seq);
+ });
+}
+
+seastar::future<> ProtocolV1::maybe_throttle()
+{
+ if (!conn.policy.throttler_bytes) {
+ return seastar::now();
+ }
+ const auto to_read = (m.header.front_len +
+ m.header.middle_len +
+ m.header.data_len);
+ return conn.policy.throttler_bytes->get(to_read);
+}
+
+seastar::future<> ProtocolV1::read_message()
+{
+ return socket->read(sizeof(m.header))
+ .then([this] (bufferlist bl) {
+ // throttle the traffic, maybe
+ auto p = bl.cbegin();
+ ::decode(m.header, p);
+ return maybe_throttle();
+ }).then([this] {
+ // read front
+ return socket->read(m.header.front_len);
+ }).then([this] (bufferlist bl) {
+ m.front = std::move(bl);
+ // read middle
+ return socket->read(m.header.middle_len);
+ }).then([this] (bufferlist bl) {
+ m.middle = std::move(bl);
+ // read data
+ return socket->read(m.header.data_len);
+ }).then([this] (bufferlist bl) {
+ m.data = std::move(bl);
+ // read footer
+ return socket->read(sizeof(m.footer));
+ }).then([this] (bufferlist bl) {
+ auto p = bl.cbegin();
+ ::decode(m.footer, p);
+ auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this());
+ auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
+ m.front, m.middle, m.data, conn_ref);
+ if (unlikely(!msg)) {
+ logger().warn("{} decode message failed", conn);
+ throw std::system_error{make_error_code(error::corrupted_message)};
+ }
+ constexpr bool add_ref = false; // Message starts with 1 ref
+ // TODO: change MessageRef with foreign_ptr
+ auto msg_ref = MessageRef{msg, add_ref};
+
+ if (session_security) {
+ if (unlikely(session_security->check_message_signature(msg))) {
+ logger().warn("{} message signature check failed", conn);
+ throw std::system_error{make_error_code(error::corrupted_message)};
+ }
+ }
+ // TODO: set time stamps
+ msg->set_byte_throttler(conn.policy.throttler_bytes);
+
+ if (unlikely(!conn.update_rx_seq(msg->get_seq()))) {
+ // skip this message
+ return seastar::now();
+ }
+
+ logger().debug("{} <== #{} === {} ({})",
+ conn, msg_ref->get_seq(), *msg_ref, msg_ref->get_type());
+ // throttle the reading process by the returned future
+ return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
+ });
+}
+
+seastar::future<> ProtocolV1::handle_tags()
+{
+ return seastar::keep_doing([this] {
+ // read the next tag
+ return socket->read_exactly(1)
+ .then([this] (auto buf) {
+ switch (buf[0]) {
+ case CEPH_MSGR_TAG_MSG:
+ return read_message();
+ case CEPH_MSGR_TAG_ACK:
+ return handle_ack();
+ case CEPH_MSGR_TAG_KEEPALIVE:
+ return seastar::now();
+ case CEPH_MSGR_TAG_KEEPALIVE2:
+ return handle_keepalive2();
+ case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
+ return handle_keepalive2_ack();
+ case CEPH_MSGR_TAG_CLOSE:
+ logger().info("{} got tag close", conn);
+ throw std::system_error(make_error_code(error::protocol_aborted));
+ default:
+ logger().error("{} got unknown msgr tag {}",
+ conn, static_cast<int>(buf[0]));
+ throw std::system_error(make_error_code(error::read_eof));
+ }
+ });
+ });
+}
+
+void ProtocolV1::execute_open(open_t type)
+{
+ logger().trace("{} trigger open, was {}", conn, static_cast<int>(state));
+ state = state_t::open;
+ set_write_state(write_state_t::open);
+
+ if (type == open_t::connected) {
+ dispatchers.ms_handle_connect(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ } else { // type == open_t::accepted
+ dispatchers.ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ }
+
+ gate.dispatch_in_background("execute_open", *this, [this] {
+ // start background processing of tags
+ return handle_tags()
+ .handle_exception_type([this] (const std::system_error& e) {
+ logger().warn("{} open fault: {}", conn, e);
+ if (e.code() == error::protocol_aborted ||
+ e.code() == std::errc::connection_reset ||
+ e.code() == error::read_eof) {
+ close(true);
+ return seastar::now();
+ } else {
+ throw e;
+ }
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ // TODO: handle fault in the open state
+ logger().warn("{} open fault: {}", conn, eptr);
+ close(true);
+ });
+ });
+}
+
+// closing state
+
+void ProtocolV1::trigger_close()
+{
+ logger().trace("{} trigger closing, was {}",
+ conn, static_cast<int>(state));
+ messenger.closing_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+
+ if (state == state_t::accepting) {
+ messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ } else if (state >= state_t::connecting && state < state_t::closing) {
+ messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ } else {
+ // cannot happen
+ ceph_assert(false);
+ }
+
+ if (!socket) {
+ ceph_assert(state == state_t::connecting);
+ }
+
+ state = state_t::closing;
+}
+
+void ProtocolV1::on_closed()
+{
+ messenger.closed_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+}
+
+seastar::future<> ProtocolV1::fault()
+{
+ if (conn.policy.lossy) {
+ messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ }
+ // XXX: we decided not to support lossless connection in v1. as the
+ // client's default policy is
+ // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is
+ // lossy. And by the time of crimson-osd's GA, the in-cluster communication
+ // will all be performed using v2 protocol.
+ ceph_abort("lossless policy not supported for v1");
+ return seastar::now();
+}
+
+void ProtocolV1::print(std::ostream& out) const
+{
+ out << conn;
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/ProtocolV1.h b/src/crimson/net/ProtocolV1.h
new file mode 100644
index 000000000..ed6df8954
--- /dev/null
+++ b/src/crimson/net/ProtocolV1.h
@@ -0,0 +1,137 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "Protocol.h"
+
+class AuthAuthorizer;
+class AuthSessionHandler;
+
+namespace crimson::net {
+
+class ProtocolV1 final : public Protocol {
+ public:
+ ProtocolV1(ChainedDispatchers& dispatchers,
+ SocketConnection& conn,
+ SocketMessenger& messenger);
+ ~ProtocolV1() override;
+ void print(std::ostream&) const final;
+ private:
+ void on_closed() override;
+ bool is_connected() const override;
+
+ void start_connect(const entity_addr_t& peer_addr,
+ const entity_name_t& peer_name) override;
+
+ void start_accept(SocketRef&& socket,
+ const entity_addr_t& peer_addr) override;
+
+ void trigger_close() override;
+
+ ceph::bufferlist do_sweep_messages(
+ const std::deque<MessageRef>& msgs,
+ size_t num_msgs,
+ bool require_keepalive,
+ std::optional<utime_t> keepalive_ack,
+ bool require_ack) override;
+
+ private:
+ SocketMessenger &messenger;
+
+ enum class state_t {
+ none,
+ accepting,
+ connecting,
+ open,
+ standby,
+ wait,
+ closing
+ };
+ state_t state = state_t::none;
+
+ // state for handshake
+ struct Handshake {
+ ceph_msg_connect connect;
+ ceph_msg_connect_reply reply;
+ ceph::bufferlist auth_payload; // auth(orizer) payload read off the wire
+ ceph::bufferlist auth_more; // connect-side auth retry (we added challenge)
+ std::chrono::milliseconds backoff;
+ uint32_t connect_seq = 0;
+ uint32_t peer_global_seq = 0;
+ uint32_t global_seq;
+ } h;
+
+ std::unique_ptr<AuthSessionHandler> session_security;
+
+ // state for an incoming message
+ struct MessageReader {
+ ceph_msg_header header;
+ ceph_msg_footer footer;
+ bufferlist front;
+ bufferlist middle;
+ bufferlist data;
+ } m;
+
+ struct Keepalive {
+ struct {
+ const char tag = CEPH_MSGR_TAG_KEEPALIVE2;
+ ceph_timespec stamp;
+ } __attribute__((packed)) req;
+ struct {
+ const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
+ ceph_timespec stamp;
+ } __attribute__((packed)) ack;
+ ceph_timespec ack_stamp;
+ } k;
+
+ private:
+ // connecting
+ void reset_session();
+ seastar::future<stop_t> handle_connect_reply(crimson::net::msgr_tag_t tag);
+ seastar::future<stop_t> repeat_connect();
+ ceph::bufferlist get_auth_payload();
+
+ // accepting
+ seastar::future<stop_t> send_connect_reply(
+ msgr_tag_t tag, bufferlist&& authorizer_reply = {});
+ seastar::future<stop_t> send_connect_reply_ready(
+ msgr_tag_t tag, bufferlist&& authorizer_reply);
+ seastar::future<stop_t> replace_existing(
+ SocketConnectionRef existing,
+ bufferlist&& authorizer_reply,
+ bool is_reset_from_peer = false);
+ seastar::future<stop_t> handle_connect_with_existing(
+ SocketConnectionRef existing, bufferlist&& authorizer_reply);
+ bool require_auth_feature() const;
+ bool require_cephx_v2_feature() const;
+ seastar::future<stop_t> repeat_handle_connect();
+
+ // open
+ seastar::future<> handle_keepalive2_ack();
+ seastar::future<> handle_keepalive2();
+ seastar::future<> handle_ack();
+ seastar::future<> maybe_throttle();
+ seastar::future<> read_message();
+ seastar::future<> handle_tags();
+
+ enum class open_t {
+ connected,
+ accepted
+ };
+ void execute_open(open_t type);
+
+ // replacing
+ // the number of connections initiated in this session, increment when a
+ // new connection is established
+ uint32_t connect_seq() const { return h.connect_seq; }
+ // the client side should connect us with a gseq. it will be reset with
+ // the one of exsting connection if it's greater.
+ uint32_t peer_global_seq() const { return h.peer_global_seq; }
+ // current state of ProtocolV1
+ state_t get_state() const { return state; }
+
+ seastar::future<> fault();
+};
+
+} // namespace crimson::net
diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc
new file mode 100644
index 000000000..b7137b8b8
--- /dev/null
+++ b/src/crimson/net/ProtocolV2.cc
@@ -0,0 +1,2139 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ProtocolV2.h"
+
+#include <seastar/core/lowres_clock.hh>
+#include <fmt/format.h>
+#include "include/msgr.h"
+#include "include/random.h"
+
+#include "crimson/auth/AuthClient.h"
+#include "crimson/auth/AuthServer.h"
+#include "crimson/common/formatter.h"
+
+#include "chained_dispatchers.h"
+#include "Errors.h"
+#include "Socket.h"
+#include "SocketConnection.h"
+#include "SocketMessenger.h"
+
+#ifdef UNIT_TESTS_BUILT
+#include "Interceptor.h"
+#endif
+
+using namespace ceph::msgr::v2;
+using crimson::common::local_conf;
+
+namespace {
+
+// TODO: apply the same logging policy to Protocol V1
+// Log levels in V2 Protocol:
+// * error level, something error that cause connection to terminate:
+// - fatal errors;
+// - bugs;
+// * warn level: something unusual that identifies connection fault or replacement:
+// - unstable network;
+// - incompatible peer;
+// - auth failure;
+// - connection race;
+// - connection reset;
+// * info level, something very important to show connection lifecycle,
+// which doesn't happen very frequently;
+// * debug level, important logs for debugging, including:
+// - all the messages sent/received (-->/<==);
+// - all the frames exchanged (WRITE/GOT);
+// - important fields updated (UPDATE);
+// - connection state transitions (TRIGGER);
+// * trace level, trivial logs showing:
+// - the exact bytes being sent/received (SEND/RECV(bytes));
+// - detailed information of sub-frames;
+// - integrity checks;
+// - etc.
+seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+}
+
+[[noreturn]] void abort_in_fault() {
+ throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+}
+
+[[noreturn]] void abort_protocol() {
+ throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
+}
+
+[[noreturn]] void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) {
+ proto.close(dispatch_reset);
+ abort_protocol();
+}
+
+inline void expect_tag(const Tag& expected,
+ const Tag& actual,
+ crimson::net::SocketConnection& conn,
+ const char *where) {
+ if (actual != expected) {
+ logger().warn("{} {} received wrong tag: {}, expected {}",
+ conn, where,
+ static_cast<uint32_t>(actual),
+ static_cast<uint32_t>(expected));
+ abort_in_fault();
+ }
+}
+
+inline void unexpected_tag(const Tag& unexpected,
+ crimson::net::SocketConnection& conn,
+ const char *where) {
+ logger().warn("{} {} received unexpected tag: {}",
+ conn, where, static_cast<uint32_t>(unexpected));
+ abort_in_fault();
+}
+
+inline uint64_t generate_client_cookie() {
+ return ceph::util::generate_random_number<uint64_t>(
+ 1, std::numeric_limits<uint64_t>::max());
+}
+
+} // namespace anonymous
+
+namespace crimson::net {
+
+#ifdef UNIT_TESTS_BUILT
+void intercept(Breakpoint bp, bp_type_t type,
+ SocketConnection& conn, SocketRef& socket) {
+ if (conn.interceptor) {
+ auto action = conn.interceptor->intercept(conn, Breakpoint(bp));
+ socket->set_trap(type, action, &conn.interceptor->blocker);
+ }
+}
+
+#define INTERCEPT_CUSTOM(bp, type) \
+intercept({bp}, type, conn, socket)
+
+#define INTERCEPT_FRAME(tag, type) \
+intercept({static_cast<Tag>(tag), type}, \
+ type, conn, socket)
+
+#define INTERCEPT_N_RW(bp) \
+if (conn.interceptor) { \
+ auto action = conn.interceptor->intercept(conn, {bp}); \
+ ceph_assert(action != bp_action_t::BLOCK); \
+ if (action == bp_action_t::FAULT) { \
+ abort_in_fault(); \
+ } \
+}
+
+#else
+#define INTERCEPT_CUSTOM(bp, type)
+#define INTERCEPT_FRAME(tag, type)
+#define INTERCEPT_N_RW(bp)
+#endif
+
+seastar::future<> ProtocolV2::Timer::backoff(double seconds)
+{
+ logger().warn("{} waiting {} seconds ...", conn, seconds);
+ cancel();
+ last_dur_ = seconds;
+ as = seastar::abort_source();
+ auto dur = std::chrono::duration_cast<seastar::lowres_clock::duration>(
+ std::chrono::duration<double>(seconds));
+ return seastar::sleep_abortable(dur, *as
+ ).handle_exception_type([this] (const seastar::sleep_aborted& e) {
+ logger().debug("{} wait aborted", conn);
+ abort_protocol();
+ });
+}
+
+ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers,
+ SocketConnection& conn,
+ SocketMessenger& messenger)
+ : Protocol(proto_t::v2, dispatchers, conn),
+ messenger{messenger},
+ protocol_timer{conn}
+{}
+
+ProtocolV2::~ProtocolV2() {}
+
+bool ProtocolV2::is_connected() const {
+ return state == state_t::READY ||
+ state == state_t::ESTABLISHING ||
+ state == state_t::REPLACING;
+}
+
+void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
+ const entity_name_t& _peer_name)
+{
+ ceph_assert(state == state_t::NONE);
+ ceph_assert(!socket);
+ ceph_assert(!gate.is_closed());
+ conn.peer_addr = _peer_addr;
+ conn.target_addr = _peer_addr;
+ conn.set_peer_name(_peer_name);
+ conn.policy = messenger.get_policy(_peer_name.type());
+ client_cookie = generate_client_cookie();
+ logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}"
+ " policy(lossy={}, server={}, standby={}, resetcheck={})",
+ conn, _peer_addr, _peer_name, client_cookie,
+ conn.policy.lossy, conn.policy.server,
+ conn.policy.standby, conn.policy.resetcheck);
+ messenger.register_conn(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ execute_connecting();
+}
+
+void ProtocolV2::start_accept(SocketRef&& sock,
+ const entity_addr_t& _peer_addr)
+{
+ ceph_assert(state == state_t::NONE);
+ ceph_assert(!socket);
+ // until we know better
+ conn.target_addr = _peer_addr;
+ socket = std::move(sock);
+ logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
+ messenger.accept_conn(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ execute_accepting();
+}
+
+// TODO: Frame related implementations, probably to a separate class.
+
+void ProtocolV2::enable_recording()
+{
+ rxbuf.clear();
+ txbuf.clear();
+ record_io = true;
+}
+
+seastar::future<Socket::tmp_buf> ProtocolV2::read_exactly(size_t bytes)
+{
+ if (unlikely(record_io)) {
+ return socket->read_exactly(bytes)
+ .then([this] (auto bl) {
+ rxbuf.append(buffer::create(bl.share()));
+ return bl;
+ });
+ } else {
+ return socket->read_exactly(bytes);
+ };
+}
+
+seastar::future<bufferlist> ProtocolV2::read(size_t bytes)
+{
+ if (unlikely(record_io)) {
+ return socket->read(bytes)
+ .then([this] (auto buf) {
+ rxbuf.append(buf);
+ return buf;
+ });
+ } else {
+ return socket->read(bytes);
+ }
+}
+
+seastar::future<> ProtocolV2::write(bufferlist&& buf)
+{
+ if (unlikely(record_io)) {
+ txbuf.append(buf);
+ }
+ return socket->write(std::move(buf));
+}
+
+seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
+{
+ if (unlikely(record_io)) {
+ txbuf.append(buf);
+ }
+ return socket->write_flush(std::move(buf));
+}
+
+size_t ProtocolV2::get_current_msg_size() const
+{
+ ceph_assert(rx_frame_asm.get_num_segments() > 0);
+ size_t sum = 0;
+ // we don't include SegmentIndex::Msg::HEADER.
+ for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
+ sum += rx_frame_asm.get_segment_logical_len(idx);
+ }
+ return sum;
+}
+
+seastar::future<Tag> ProtocolV2::read_main_preamble()
+{
+ rx_preamble.clear();
+ return read_exactly(rx_frame_asm.get_preamble_onwire_len())
+ .then([this] (auto bl) {
+ rx_segments_data.clear();
+ try {
+ rx_preamble.append(buffer::create(std::move(bl)));
+ const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
+ INTERCEPT_FRAME(tag, bp_type_t::READ);
+ return tag;
+ } catch (FrameError& e) {
+ logger().warn("{} read_main_preamble: {}", conn, e.what());
+ abort_in_fault();
+ }
+ });
+}
+
+seastar::future<> ProtocolV2::read_frame_payload()
+{
+ ceph_assert(rx_segments_data.empty());
+
+ return seastar::do_until(
+ [this] { return rx_frame_asm.get_num_segments() == rx_segments_data.size(); },
+ [this] {
+ // TODO: create aligned and contiguous buffer from socket
+ const size_t seg_idx = rx_segments_data.size();
+ if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx);
+ alignment != segment_t::DEFAULT_ALIGNMENT) {
+ logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
+ conn, alignment, rx_segments_data.size());
+ }
+ uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
+ // TODO: create aligned and contiguous buffer from socket
+ return read_exactly(onwire_len).then([this] (auto tmp_bl) {
+ logger().trace("{} RECV({}) frame segment[{}]",
+ conn, tmp_bl.size(), rx_segments_data.size());
+ bufferlist segment;
+ segment.append(buffer::create(std::move(tmp_bl)));
+ rx_segments_data.emplace_back(std::move(segment));
+ });
+ }
+ ).then([this] {
+ return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
+ }).then([this] (auto bl) {
+ logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
+ bool ok = false;
+ try {
+ rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]);
+ bufferlist rx_epilogue;
+ rx_epilogue.append(buffer::create(std::move(bl)));
+ ok = rx_frame_asm.disassemble_remaining_segments(rx_segments_data.data(), rx_epilogue);
+ } catch (FrameError& e) {
+ logger().error("read_frame_payload: {} {}", conn, e.what());
+ abort_in_fault();
+ } catch (ceph::crypto::onwire::MsgAuthError&) {
+ logger().error("read_frame_payload: {} bad auth tag", conn);
+ abort_in_fault();
+ }
+ // we do have a mechanism that allows transmitter to start sending message
+ // and abort after putting entire data field on wire. This will be used by
+ // the kernel client to avoid unnecessary buffering.
+ if (!ok) {
+ // TODO
+ ceph_assert(false);
+ }
+ });
+}
+
+template <class F>
+seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
+{
+ auto bl = frame.get_buffer(tx_frame_asm);
+ const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
+ logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
+ conn, bl.length(), (int)main_preamble->tag,
+ (int)main_preamble->num_segments, main_preamble->crc);
+ INTERCEPT_FRAME(main_preamble->tag, bp_type_t::WRITE);
+ if (flush) {
+ return write_flush(std::move(bl));
+ } else {
+ return write(std::move(bl));
+ }
+}
+
+void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant)
+{
+ if (!reentrant && _state == state) {
+ logger().error("{} is not allowed to re-trigger state {}",
+ conn, get_state_name(state));
+ ceph_assert(false);
+ }
+ logger().debug("{} TRIGGER {}, was {}",
+ conn, get_state_name(_state), get_state_name(state));
+ state = _state;
+ set_write_state(_write_state);
+}
+
+void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr eptr)
+{
+ if (conn.policy.lossy) {
+ logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
+ conn, func_name, get_state_name(state), eptr);
+ close(true);
+ } else if (conn.policy.server ||
+ (conn.policy.standby &&
+ (!is_queued() && conn.sent.empty()))) {
+ logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
+ conn, func_name, get_state_name(state), eptr);
+ execute_standby();
+ } else if (backoff) {
+ logger().info("{} {}: fault at {}, going to WAIT -- {}",
+ conn, func_name, get_state_name(state), eptr);
+ execute_wait(false);
+ } else {
+ logger().info("{} {}: fault at {}, going to CONNECTING -- {}",
+ conn, func_name, get_state_name(state), eptr);
+ execute_connecting();
+ }
+}
+
+void ProtocolV2::reset_session(bool full)
+{
+ server_cookie = 0;
+ connect_seq = 0;
+ conn.in_seq = 0;
+ if (full) {
+ client_cookie = generate_client_cookie();
+ peer_global_seq = 0;
+ reset_write();
+ dispatchers.ms_handle_remote_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ }
+}
+
+seastar::future<std::tuple<entity_type_t, entity_addr_t>>
+ProtocolV2::banner_exchange(bool is_connect)
+{
+ // 1. prepare and send banner
+ bufferlist banner_payload;
+ encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
+ encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
+
+ bufferlist bl;
+ bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
+ auto len_payload = static_cast<uint16_t>(banner_payload.length());
+ encode(len_payload, bl, 0);
+ bl.claim_append(banner_payload);
+ logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
+ "required={}, banner=\"{}\"",
+ conn, bl.length(), len_payload,
+ CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES,
+ CEPH_BANNER_V2_PREFIX);
+ INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
+ return write_flush(std::move(bl)).then([this] {
+ // 2. read peer banner
+ unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
+ INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
+ return read_exactly(banner_len); // or read exactly?
+ }).then([this] (auto bl) {
+ // 3. process peer banner and read banner_payload
+ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
+ logger().debug("{} RECV({}) banner: \"{}\"",
+ conn, bl.size(),
+ std::string((const char*)bl.get(), banner_prefix_len));
+
+ if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
+ if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
+ logger().warn("{} peer is using V1 protocol", conn);
+ } else {
+ logger().warn("{} peer sent bad banner", conn);
+ }
+ abort_in_fault();
+ }
+ bl.trim_front(banner_prefix_len);
+
+ uint16_t payload_len;
+ bufferlist buf;
+ buf.append(buffer::create(std::move(bl)));
+ auto ti = buf.cbegin();
+ try {
+ decode(payload_len, ti);
+ } catch (const buffer::error &e) {
+ logger().warn("{} decode banner payload len failed", conn);
+ abort_in_fault();
+ }
+ logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
+ INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
+ return read(payload_len);
+ }).then([this, is_connect] (bufferlist bl) {
+ // 4. process peer banner_payload and send HelloFrame
+ auto p = bl.cbegin();
+ uint64_t peer_supported_features;
+ uint64_t peer_required_features;
+ try {
+ decode(peer_supported_features, p);
+ decode(peer_required_features, p);
+ } catch (const buffer::error &e) {
+ logger().warn("{} decode banner payload failed", conn);
+ abort_in_fault();
+ }
+ logger().debug("{} RECV({}) banner features: supported={} required={}",
+ conn, bl.length(),
+ peer_supported_features, peer_required_features);
+
+ // Check feature bit compatibility
+ uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
+ uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
+ if ((required_features & peer_supported_features) != required_features) {
+ logger().error("{} peer does not support all required features"
+ " required={} peer_supported={}",
+ conn, required_features, peer_supported_features);
+ abort_in_close(*this, is_connect);
+ }
+ if ((supported_features & peer_required_features) != peer_required_features) {
+ logger().error("{} we do not support all peer required features"
+ " peer_required={} supported={}",
+ conn, peer_required_features, supported_features);
+ abort_in_close(*this, is_connect);
+ }
+ this->peer_required_features = peer_required_features;
+ if (this->peer_required_features == 0) {
+ this->connection_features = msgr2_required;
+ }
+ const bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ tx_frame_asm.set_is_rev1(is_rev1);
+ rx_frame_asm.set_is_rev1(is_rev1);
+
+ auto hello = HelloFrame::Encode(messenger.get_mytype(),
+ conn.target_addr);
+ logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
+ conn, ceph_entity_type_name(messenger.get_mytype()),
+ conn.target_addr);
+ return write_frame(hello);
+ }).then([this] {
+ //5. read peer HelloFrame
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ expect_tag(Tag::HELLO, tag, conn, __func__);
+ return read_frame_payload();
+ }).then([this] {
+ // 6. process peer HelloFrame
+ auto hello = HelloFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
+ conn, ceph_entity_type_name(hello.entity_type()),
+ hello.peer_addr());
+ return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
+ std::make_tuple(hello.entity_type(), hello.peer_addr()));
+ });
+}
+
+// CONNECTING state
+
+seastar::future<> ProtocolV2::handle_auth_reply()
+{
+ return read_main_preamble()
+ .then([this] (Tag tag) {
+ switch (tag) {
+ case Tag::AUTH_BAD_METHOD:
+ return read_frame_payload().then([this] {
+ // handle_auth_bad_method() logic
+ auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back());
+ logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
+ "allowed_methods={}, allowed_modes={}",
+ conn, bad_method.method(), cpp_strerror(bad_method.result()),
+ bad_method.allowed_methods(), bad_method.allowed_modes());
+ ceph_assert(messenger.get_auth_client());
+ int r = messenger.get_auth_client()->handle_auth_bad_method(
+ conn.shared_from_this(), auth_meta,
+ bad_method.method(), bad_method.result(),
+ bad_method.allowed_methods(), bad_method.allowed_modes());
+ if (r < 0) {
+ logger().warn("{} auth_client handle_auth_bad_method returned {}",
+ conn, r);
+ abort_in_fault();
+ }
+ return client_auth(bad_method.allowed_methods());
+ });
+ case Tag::AUTH_REPLY_MORE:
+ return read_frame_payload().then([this] {
+ // handle_auth_reply_more() logic
+ auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
+ conn, auth_more.auth_payload().length());
+ ceph_assert(messenger.get_auth_client());
+ // let execute_connecting() take care of the thrown exception
+ auto reply = messenger.get_auth_client()->handle_auth_reply_more(
+ conn.shared_from_this(), auth_meta, auth_more.auth_payload());
+ auto more_reply = AuthRequestMoreFrame::Encode(reply);
+ logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
+ conn, reply.length());
+ return write_frame(more_reply);
+ }).then([this] {
+ return handle_auth_reply();
+ });
+ case Tag::AUTH_DONE:
+ return read_frame_payload().then([this] {
+ // handle_auth_done() logic
+ auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
+ conn, auth_done.global_id(),
+ ceph_con_mode_name(auth_done.con_mode()),
+ auth_done.auth_payload().length());
+ ceph_assert(messenger.get_auth_client());
+ int r = messenger.get_auth_client()->handle_auth_done(
+ conn.shared_from_this(), auth_meta,
+ auth_done.global_id(),
+ auth_done.con_mode(),
+ auth_done.auth_payload());
+ if (r < 0) {
+ logger().warn("{} auth_client handle_auth_done returned {}", conn, r);
+ abort_in_fault();
+ }
+ auth_meta->con_mode = auth_done.con_mode();
+ session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+ nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), false);
+ return finish_auth();
+ });
+ default: {
+ unexpected_tag(tag, conn, __func__);
+ return seastar::now();
+ }
+ }
+ });
+}
+
+seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods)
+{
+ // send_auth_request() logic
+ ceph_assert(messenger.get_auth_client());
+
+ try {
+ auto [auth_method, preferred_modes, bl] =
+ messenger.get_auth_client()->get_auth_request(conn.shared_from_this(), auth_meta);
+ auth_meta->auth_method = auth_method;
+ auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl);
+ logger().debug("{} WRITE AuthRequestFrame: method={},"
+ " preferred_modes={}, payload_len={}",
+ conn, auth_method, preferred_modes, bl.length());
+ return write_frame(frame).then([this] {
+ return handle_auth_reply();
+ });
+ } catch (const crimson::auth::error& e) {
+ logger().error("{} get_initial_auth_request returned {}", conn, e);
+ abort_in_close(*this, true);
+ return seastar::now();
+ }
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::process_wait()
+{
+ return read_frame_payload().then([this] {
+ // handle_wait() logic
+ logger().debug("{} GOT WaitFrame", conn);
+ WaitFrame::Decode(rx_segments_data.back());
+ return next_step_t::wait;
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::client_connect()
+{
+ // send_client_ident() logic
+ uint64_t flags = 0;
+ if (conn.policy.lossy) {
+ flags |= CEPH_MSG_CONNECT_LOSSY;
+ }
+
+ auto client_ident = ClientIdentFrame::Encode(
+ messenger.get_myaddrs(),
+ conn.target_addr,
+ messenger.get_myname().num(),
+ global_seq,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required, flags,
+ client_cookie);
+
+ logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
+ " gs={}, features_supported={}, features_required={},"
+ " flags={}, cookie={}",
+ conn, messenger.get_myaddrs(), conn.target_addr,
+ messenger.get_myname().num(), global_seq,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags, client_cookie);
+ return write_frame(client_ident).then([this] {
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ switch (tag) {
+ case Tag::IDENT_MISSING_FEATURES:
+ return read_frame_payload().then([this] {
+ // handle_ident_missing_features() logic
+ auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back());
+ logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
+ " (client does not support all server features)",
+ conn, ident_missing.features());
+ abort_in_fault();
+ return next_step_t::none;
+ });
+ case Tag::WAIT:
+ return process_wait();
+ case Tag::SERVER_IDENT:
+ return read_frame_payload().then([this] {
+ // handle_server_ident() logic
+ requeue_sent();
+ auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT ServerIdentFrame:"
+ " addrs={}, gid={}, gs={},"
+ " features_supported={}, features_required={},"
+ " flags={}, cookie={}",
+ conn,
+ server_ident.addrs(), server_ident.gid(),
+ server_ident.global_seq(),
+ server_ident.supported_features(),
+ server_ident.required_features(),
+ server_ident.flags(), server_ident.cookie());
+
+ // is this who we intended to talk to?
+ // be a bit forgiving here, since we may be connecting based on addresses parsed out
+ // of mon_host or something.
+ if (!server_ident.addrs().contains(conn.target_addr)) {
+ logger().warn("{} peer identifies as {}, does not include {}",
+ conn, server_ident.addrs(), conn.target_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+
+ server_cookie = server_ident.cookie();
+
+ // TODO: change peer_addr to entity_addrvec_t
+ if (server_ident.addrs().front() != conn.peer_addr) {
+ logger().warn("{} peer advertises as {}, does not match {}",
+ conn, server_ident.addrs(), conn.peer_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (conn.get_peer_id() != entity_name_t::NEW &&
+ conn.get_peer_id() != server_ident.gid()) {
+ logger().error("{} connection peer id ({}) does not match "
+ "what it should be ({}) during connecting, close",
+ conn, server_ident.gid(), conn.get_peer_id());
+ abort_in_close(*this, true);
+ }
+ conn.set_peer_id(server_ident.gid());
+ conn.set_features(server_ident.supported_features() &
+ conn.policy.features_supported);
+ peer_global_seq = server_ident.global_seq();
+
+ bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
+ if (lossy != conn.policy.lossy) {
+ logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn, lossy);
+ conn.policy.lossy = lossy;
+ }
+ if (lossy && (connect_seq != 0 || server_cookie != 0)) {
+ logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy",
+ conn, connect_seq, server_cookie);
+ connect_seq = 0;
+ server_cookie = 0;
+ }
+
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ });
+ default: {
+ unexpected_tag(tag, conn, "post_client_connect");
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+ }
+ }
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::client_reconnect()
+{
+ // send_reconnect() logic
+ auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(),
+ client_cookie,
+ server_cookie,
+ global_seq,
+ connect_seq,
+ conn.in_seq);
+ logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
+ " server_cookie={}, gs={}, cs={}, msg_seq={}",
+ conn, messenger.get_myaddrs(),
+ client_cookie, server_cookie,
+ global_seq, connect_seq, conn.in_seq);
+ return write_frame(reconnect).then([this] {
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ switch (tag) {
+ case Tag::SESSION_RETRY_GLOBAL:
+ return read_frame_payload().then([this] {
+ // handle_session_retry_global() logic
+ auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
+ logger().warn("{} GOT RetryGlobalFrame: gs={}",
+ conn, retry.global_seq());
+ return messenger.get_global_seq(retry.global_seq()).then([this] (auto gs) {
+ global_seq = gs;
+ logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq);
+ return client_reconnect();
+ });
+ });
+ case Tag::SESSION_RETRY:
+ return read_frame_payload().then([this] {
+ // handle_session_retry() logic
+ auto retry = RetryFrame::Decode(rx_segments_data.back());
+ logger().warn("{} GOT RetryFrame: cs={}",
+ conn, retry.connect_seq());
+ connect_seq = retry.connect_seq() + 1;
+ logger().warn("{} UPDATE: cs={}", conn, connect_seq);
+ return client_reconnect();
+ });
+ case Tag::SESSION_RESET:
+ return read_frame_payload().then([this] {
+ // handle_session_reset() logic
+ auto reset = ResetFrame::Decode(rx_segments_data.back());
+ logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
+ reset_session(reset.full());
+ return client_connect();
+ });
+ case Tag::WAIT:
+ return process_wait();
+ case Tag::SESSION_RECONNECT_OK:
+ return read_frame_payload().then([this] {
+ // handle_reconnect_ok() logic
+ auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
+ conn, reconnect_ok.msg_seq());
+ requeue_up_to(reconnect_ok.msg_seq());
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ });
+ default: {
+ unexpected_tag(tag, conn, "post_client_reconnect");
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+ }
+ }
+ });
+}
+
+void ProtocolV2::execute_connecting()
+{
+ trigger_state(state_t::CONNECTING, write_state_t::delay, true);
+ if (socket) {
+ socket->shutdown();
+ }
+ gated_execute("execute_connecting", [this] {
+ return messenger.get_global_seq().then([this] (auto gs) {
+ global_seq = gs;
+ assert(client_cookie != 0);
+ if (!conn.policy.lossy && server_cookie != 0) {
+ ++connect_seq;
+ logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
+ conn, global_seq, connect_seq);
+ } else { // conn.policy.lossy || server_cookie == 0
+ assert(connect_seq == 0);
+ assert(server_cookie == 0);
+ logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
+ }
+
+ return wait_write_exit();
+ }).then([this] {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} before Socket::connect()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ if (socket) {
+ gate.dispatch_in_background("close_sockect_connecting", *this,
+ [sock = std::move(socket)] () mutable {
+ return sock->close().then([sock = std::move(sock)] {});
+ });
+ }
+ INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING);
+ return Socket::connect(conn.peer_addr);
+ }).then([this](SocketRef sock) {
+ logger().debug("{} socket connected", conn);
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} during Socket::connect()",
+ conn, get_state_name(state));
+ return sock->close().then([sock = std::move(sock)] {
+ abort_protocol();
+ });
+ }
+ socket = std::move(sock);
+ return seastar::now();
+ }).then([this] {
+ auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
+ session_stream_handlers = { nullptr, nullptr };
+ enable_recording();
+ return banner_exchange(true);
+ }).then([this] (auto&& ret) {
+ auto [_peer_type, _my_addr_from_peer] = std::move(ret);
+ if (conn.get_peer_type() != _peer_type) {
+ logger().warn("{} connection peer type does not match what peer advertises {} != {}",
+ conn, ceph_entity_type_name(conn.get_peer_type()),
+ ceph_entity_type_name(_peer_type));
+ abort_in_close(*this, true);
+ }
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} during banner_exchange(), abort",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port());
+ if (unlikely(_my_addr_from_peer.is_legacy())) {
+ logger().warn("{} peer sent a legacy address for me: {}",
+ conn, _my_addr_from_peer);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
+ return messenger.learned_addr(_my_addr_from_peer, conn);
+ }).then([this] {
+ return client_auth();
+ }).then([this] {
+ if (server_cookie == 0) {
+ ceph_assert(connect_seq == 0);
+ return client_connect();
+ } else {
+ ceph_assert(connect_seq > 0);
+ return client_reconnect();
+ }
+ }).then([this] (next_step_t next) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} at the end of execute_connecting()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ switch (next) {
+ case next_step_t::ready: {
+ logger().info("{} connected:"
+ " gs={}, pgs={}, cs={}, client_cookie={},"
+ " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, conn.in_seq,
+ conn.out_seq, conn.out_q.size());
+ execute_ready(true);
+ break;
+ }
+ case next_step_t::wait: {
+ logger().info("{} execute_connecting(): going to WAIT", conn);
+ execute_wait(true);
+ break;
+ }
+ default: {
+ ceph_abort("impossible next step");
+ }
+ }
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ if (state != state_t::CONNECTING) {
+ logger().info("{} execute_connecting(): protocol aborted at {} -- {}",
+ conn, get_state_name(state), eptr);
+ assert(state == state_t::CLOSING ||
+ state == state_t::REPLACING);
+ return;
+ }
+
+ if (conn.policy.server ||
+ (conn.policy.standby &&
+ (!is_queued() && conn.sent.empty()))) {
+ logger().info("{} execute_connecting(): fault at {} with nothing to send,"
+ " going to STANDBY -- {}",
+ conn, get_state_name(state), eptr);
+ execute_standby();
+ } else {
+ logger().info("{} execute_connecting(): fault at {}, going to WAIT -- {}",
+ conn, get_state_name(state), eptr);
+ execute_wait(false);
+ }
+ });
+ });
+}
+
+// ACCEPTING state
+
+seastar::future<> ProtocolV2::_auth_bad_method(int r)
+{
+ // _auth_bad_method() logic
+ ceph_assert(r < 0);
+ auto [allowed_methods, allowed_modes] =
+ messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type());
+ auto bad_method = AuthBadMethodFrame::Encode(
+ auth_meta->auth_method, r, allowed_methods, allowed_modes);
+ logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
+ "allowed_methods={}, allowed_modes={})",
+ conn, auth_meta->auth_method, cpp_strerror(r),
+ allowed_methods, allowed_modes);
+ return write_frame(bad_method).then([this] {
+ return server_auth();
+ });
+}
+
+seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
+{
+ // _handle_auth_request() logic
+ ceph_assert(messenger.get_auth_server());
+ bufferlist reply;
+ int r = messenger.get_auth_server()->handle_auth_request(
+ conn.shared_from_this(), auth_meta,
+ more, auth_meta->auth_method, auth_payload,
+ &reply);
+ switch (r) {
+ // successful
+ case 1: {
+ auto auth_done = AuthDoneFrame::Encode(
+ conn.peer_global_id, auth_meta->con_mode, reply);
+ logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
+ conn, conn.peer_global_id,
+ ceph_con_mode_name(auth_meta->con_mode), reply.length());
+ return write_frame(auth_done).then([this] {
+ ceph_assert(auth_meta);
+ session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+ nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), true);
+ return finish_auth();
+ });
+ }
+ // auth more
+ case 0: {
+ auto more = AuthReplyMoreFrame::Encode(reply);
+ logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
+ conn, reply.length());
+ return write_frame(more).then([this] {
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ expect_tag(Tag::AUTH_REQUEST_MORE, tag, conn, __func__);
+ return read_frame_payload();
+ }).then([this] {
+ auto auth_more = AuthRequestMoreFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
+ conn, auth_more.auth_payload().length());
+ return _handle_auth_request(auth_more.auth_payload(), true);
+ });
+ }
+ case -EBUSY: {
+ logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn);
+ abort_in_fault();
+ return seastar::now();
+ }
+ default: {
+ logger().warn("{} auth_server handle_auth_request returned {}", conn, r);
+ return _auth_bad_method(r);
+ }
+ }
+}
+
+seastar::future<> ProtocolV2::server_auth()
+{
+ return read_main_preamble()
+ .then([this] (Tag tag) {
+ expect_tag(Tag::AUTH_REQUEST, tag, conn, __func__);
+ return read_frame_payload();
+ }).then([this] {
+ // handle_auth_request() logic
+ auto request = AuthRequestFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
+ " payload_len={}",
+ conn, request.method(), request.preferred_modes(),
+ request.auth_payload().length());
+ auth_meta->auth_method = request.method();
+ auth_meta->con_mode = messenger.get_auth_server()->pick_con_mode(
+ conn.get_peer_type(), auth_meta->auth_method,
+ request.preferred_modes());
+ if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
+ logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn);
+ return _auth_bad_method(-EOPNOTSUPP);
+ }
+ return _handle_auth_request(request.auth_payload(), false);
+ });
+}
+
+bool ProtocolV2::validate_peer_name(const entity_name_t& peer_name) const
+{
+ auto my_peer_name = conn.get_peer_name();
+ if (my_peer_name.type() != peer_name.type()) {
+ return false;
+ }
+ if (my_peer_name.num() != entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW &&
+ my_peer_name.num() != peer_name.num()) {
+ return false;
+ }
+ return true;
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_wait()
+{
+ auto wait = WaitFrame::Encode();
+ logger().debug("{} WRITE WaitFrame", conn);
+ return write_frame(wait).then([] {
+ return next_step_t::wait;
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::reuse_connection(
+ ProtocolV2* existing_proto, bool do_reset,
+ bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
+{
+ existing_proto->trigger_replacing(reconnect,
+ do_reset,
+ std::move(socket),
+ std::move(auth_meta),
+ std::move(session_stream_handlers),
+ peer_global_seq,
+ client_cookie,
+ conn.get_peer_name(),
+ connection_features,
+ tx_frame_asm.get_is_rev1(),
+ rx_frame_asm.get_is_rev1(),
+ conn_seq,
+ msg_seq);
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_replaced(conn);
+ }
+#endif
+ // close this connection because all the necessary information is delivered
+ // to the exisiting connection, and jump to error handling code to abort the
+ // current state.
+ abort_in_close(*this, false);
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
+{
+ // handle_existing_connection() logic
+ ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+ existing_conn->protocol.get());
+ ceph_assert(existing_proto);
+ logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
+ " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie,
+ existing_conn, get_state_name(existing_proto->state),
+ existing_proto->global_seq,
+ existing_proto->peer_global_seq,
+ existing_proto->connect_seq,
+ existing_proto->client_cookie,
+ existing_proto->server_cookie);
+
+ if (!validate_peer_name(existing_conn->get_peer_name())) {
+ logger().error("{} server_connect: my peer_name doesn't match"
+ " the existing connection {}, abort", conn, existing_conn);
+ abort_in_fault();
+ }
+
+ if (existing_proto->state == state_t::REPLACING) {
+ logger().warn("{} server_connect: racing replace happened while"
+ " replacing existing connection {}, send wait.",
+ conn, *existing_conn);
+ return send_wait();
+ }
+
+ if (existing_proto->peer_global_seq > peer_global_seq) {
+ logger().warn("{} server_connect:"
+ " this is a stale connection, because peer_global_seq({})"
+ " < existing->peer_global_seq({}), close this connection"
+ " in favor of existing connection {}",
+ conn, peer_global_seq,
+ existing_proto->peer_global_seq, *existing_conn);
+ abort_in_fault();
+ }
+
+ if (existing_conn->policy.lossy) {
+ // existing connection can be thrown out in favor of this one
+ logger().warn("{} server_connect:"
+ " existing connection {} is a lossy channel. Close existing in favor of"
+ " this connection", conn, *existing_conn);
+ execute_establishing(existing_conn, true);
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ }
+
+ if (existing_proto->server_cookie != 0) {
+ if (existing_proto->client_cookie != client_cookie) {
+ // Found previous session
+ // peer has reset and we're going to reuse the existing connection
+ // by replacing the socket
+ logger().warn("{} server_connect:"
+ " found new session (cs={})"
+ " when existing {} is with stale session (cs={}, ss={}),"
+ " peer must have reset",
+ conn, client_cookie,
+ *existing_conn, existing_proto->client_cookie,
+ existing_proto->server_cookie);
+ return reuse_connection(existing_proto, conn.policy.resetcheck);
+ } else {
+ // session establishment interrupted between client_ident and server_ident,
+ // continuing...
+ logger().warn("{} server_connect: found client session with existing {}"
+ " matched (cs={}, ss={}), continuing session establishment",
+ conn, *existing_conn, client_cookie, existing_proto->server_cookie);
+ return reuse_connection(existing_proto);
+ }
+ } else {
+ // Looks like a connection race: server and client are both connecting to
+ // each other at the same time.
+ if (existing_proto->client_cookie != client_cookie) {
+ if (existing_conn->peer_wins()) {
+ logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
+ " and win, reusing existing {}",
+ conn, client_cookie, existing_proto->client_cookie, *existing_conn);
+ return reuse_connection(existing_proto);
+ } else {
+ logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
+ " and lose to existing {}, ask client to wait",
+ conn, client_cookie, existing_proto->client_cookie, *existing_conn);
+ return existing_conn->keepalive().then([this] {
+ return send_wait();
+ });
+ }
+ } else {
+ logger().warn("{} server_connect: found client session with existing {}"
+ " matched (cs={}, ss={}), continuing session establishment",
+ conn, *existing_conn, client_cookie, existing_proto->server_cookie);
+ return reuse_connection(existing_proto);
+ }
+ }
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::server_connect()
+{
+ return read_frame_payload().then([this] {
+ // handle_client_ident() logic
+ auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
+ " gid={}, gs={}, features_supported={},"
+ " features_required={}, flags={}, cookie={}",
+ conn, client_ident.addrs(), client_ident.target_addr(),
+ client_ident.gid(), client_ident.global_seq(),
+ client_ident.supported_features(),
+ client_ident.required_features(),
+ client_ident.flags(), client_ident.cookie());
+
+ if (client_ident.addrs().empty() ||
+ client_ident.addrs().front() == entity_addr_t()) {
+ logger().warn("{} oops, client_ident.addrs() is empty", conn);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (!messenger.get_myaddrs().contains(client_ident.target_addr())) {
+ logger().warn("{} peer is trying to reach {} which is not us ({})",
+ conn, client_ident.target_addr(), messenger.get_myaddrs());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ // TODO: change peer_addr to entity_addrvec_t
+ entity_addr_t paddr = client_ident.addrs().front();
+ if ((paddr.is_msgr2() || paddr.is_any()) &&
+ paddr.is_same_host(conn.target_addr)) {
+ // good
+ } else {
+ logger().warn("{} peer's address {} is not v2 or not the same host with {}",
+ conn, paddr, conn.target_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ conn.peer_addr = paddr;
+ logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
+ conn.target_addr = conn.peer_addr;
+ if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
+ logger().warn("{} we don't know how to reconnect to peer {}",
+ conn, conn.target_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+
+ if (conn.get_peer_id() != entity_name_t::NEW &&
+ conn.get_peer_id() != client_ident.gid()) {
+ logger().error("{} client_ident peer_id ({}) does not match"
+ " what it should be ({}) during accepting, abort",
+ conn, client_ident.gid(), conn.get_peer_id());
+ abort_in_fault();
+ }
+ conn.set_peer_id(client_ident.gid());
+ client_cookie = client_ident.cookie();
+
+ uint64_t feat_missing =
+ (conn.policy.features_required | msgr2_required) &
+ ~(uint64_t)client_ident.supported_features();
+ if (feat_missing) {
+ auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
+ logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
+ conn, feat_missing);
+ return write_frame(ident_missing_features).then([] {
+ return next_step_t::wait;
+ });
+ }
+ connection_features =
+ client_ident.supported_features() & conn.policy.features_supported;
+ logger().debug("{} UPDATE: connection_features={}", conn, connection_features);
+
+ peer_global_seq = client_ident.global_seq();
+
+ // Looks good so far, let's check if there is already an existing connection
+ // to this peer.
+
+ SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
+
+ if (existing_conn) {
+ if (existing_conn->protocol->proto_type != proto_t::v2) {
+ logger().warn("{} existing connection {} proto version is {}, close existing",
+ conn, *existing_conn,
+ static_cast<int>(existing_conn->protocol->proto_type));
+ // should unregister the existing from msgr atomically
+ // NOTE: this is following async messenger logic, but we may miss the reset event.
+ execute_establishing(existing_conn, false);
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ } else {
+ return handle_existing_connection(existing_conn);
+ }
+ } else {
+ execute_establishing(nullptr, true);
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
+ }
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::read_reconnect()
+{
+ return read_main_preamble()
+ .then([this] (Tag tag) {
+ expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect");
+ return server_reconnect();
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_retry(uint64_t connect_seq)
+{
+ auto retry = RetryFrame::Encode(connect_seq);
+ logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
+ return write_frame(retry).then([this] {
+ return read_reconnect();
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_retry_global(uint64_t global_seq)
+{
+ auto retry = RetryGlobalFrame::Encode(global_seq);
+ logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
+ return write_frame(retry).then([this] {
+ return read_reconnect();
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_reset(bool full)
+{
+ auto reset = ResetFrame::Encode(full);
+ logger().warn("{} WRITE ResetFrame: full={}", conn, full);
+ return write_frame(reset).then([this] {
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset");
+ return server_connect();
+ });
+}
+
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::server_reconnect()
+{
+ return read_frame_payload().then([this] {
+ // handle_reconnect() logic
+ auto reconnect = ReconnectFrame::Decode(rx_segments_data.back());
+
+ logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
+ " server_cookie={}, gs={}, cs={}, msg_seq={}",
+ conn, reconnect.addrs(),
+ reconnect.client_cookie(), reconnect.server_cookie(),
+ reconnect.global_seq(), reconnect.connect_seq(),
+ reconnect.msg_seq());
+
+ // can peer_addrs be changed on-the-fly?
+ // TODO: change peer_addr to entity_addrvec_t
+ entity_addr_t paddr = reconnect.addrs().front();
+ if (paddr.is_msgr2() || paddr.is_any()) {
+ // good
+ } else {
+ logger().warn("{} peer's address {} is not v2", conn, paddr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (conn.peer_addr == entity_addr_t()) {
+ conn.peer_addr = paddr;
+ } else if (conn.peer_addr != paddr) {
+ logger().error("{} peer identifies as {}, while conn.peer_addr={},"
+ " reconnect failed",
+ conn, paddr, conn.peer_addr);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ peer_global_seq = reconnect.global_seq();
+
+ SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
+
+ if (!existing_conn) {
+ // there is no existing connection therefore cannot reconnect to previous
+ // session
+ logger().warn("{} server_reconnect: no existing connection from address {},"
+ " reseting client", conn, conn.peer_addr);
+ return send_reset(true);
+ }
+
+ if (existing_conn->protocol->proto_type != proto_t::v2) {
+ logger().warn("{} server_reconnect: existing connection {} proto version is {},"
+ "close existing and reset client.",
+ conn, *existing_conn,
+ static_cast<int>(existing_conn->protocol->proto_type));
+ // NOTE: this is following async messenger logic, but we may miss the reset event.
+ existing_conn->mark_down();
+ return send_reset(true);
+ }
+
+ ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+ existing_conn->protocol.get());
+ ceph_assert(existing_proto);
+ logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
+ " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
+ conn, global_seq, peer_global_seq, reconnect.connect_seq(),
+ reconnect.client_cookie(), reconnect.server_cookie(),
+ existing_conn,
+ get_state_name(existing_proto->state),
+ existing_proto->global_seq,
+ existing_proto->peer_global_seq,
+ existing_proto->connect_seq,
+ existing_proto->client_cookie,
+ existing_proto->server_cookie);
+
+ if (!validate_peer_name(existing_conn->get_peer_name())) {
+ logger().error("{} server_reconnect: my peer_name doesn't match"
+ " the existing connection {}, abort", conn, existing_conn);
+ abort_in_fault();
+ }
+
+ if (existing_proto->state == state_t::REPLACING) {
+ logger().warn("{} server_reconnect: racing replace happened while "
+ " replacing existing connection {}, retry global.",
+ conn, *existing_conn);
+ return send_retry_global(existing_proto->peer_global_seq);
+ }
+
+ if (existing_proto->client_cookie != reconnect.client_cookie()) {
+ logger().warn("{} server_reconnect:"
+ " client_cookie mismatch with existing connection {},"
+ " cc={} rcc={}. I must have reset, reseting client.",
+ conn, *existing_conn,
+ existing_proto->client_cookie, reconnect.client_cookie());
+ return send_reset(conn.policy.resetcheck);
+ } else if (existing_proto->server_cookie == 0) {
+ // this happens when:
+ // - a connects to b
+ // - a sends client_ident
+ // - b gets client_ident, sends server_ident and sets cookie X
+ // - connection fault
+ // - b reconnects to a with cookie X, connect_seq=1
+ // - a has cookie==0
+ logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the"
+ " server_ident with existing connection {}."
+ " Asking peer to resume session establishment",
+ conn, existing_proto->client_cookie, *existing_conn);
+ return send_reset(false);
+ }
+
+ if (existing_proto->peer_global_seq > reconnect.global_seq()) {
+ logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({}),"
+ " with existing connection {},"
+ " ask client to retry global",
+ conn, existing_proto->peer_global_seq,
+ reconnect.global_seq(), *existing_conn);
+ return send_retry_global(existing_proto->peer_global_seq);
+ }
+
+ if (existing_proto->connect_seq > reconnect.connect_seq()) {
+ logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({}),"
+ " with existing connection {}, ask client to retry",
+ conn, reconnect.connect_seq(),
+ existing_proto->connect_seq, *existing_conn);
+ return send_retry(existing_proto->connect_seq);
+ } else if (existing_proto->connect_seq == reconnect.connect_seq()) {
+ // reconnect race: both peers are sending reconnect messages
+ if (existing_conn->peer_wins()) {
+ logger().warn("{} server_reconnect: reconnect race detected (cs={})"
+ " and win, reusing existing {}",
+ conn, reconnect.connect_seq(), *existing_conn);
+ return reuse_connection(
+ existing_proto, false,
+ true, reconnect.connect_seq(), reconnect.msg_seq());
+ } else {
+ logger().warn("{} server_reconnect: reconnect race detected (cs={})"
+ " and lose to existing {}, ask client to wait",
+ conn, reconnect.connect_seq(), *existing_conn);
+ return send_wait();
+ }
+ } else { // existing_proto->connect_seq < reconnect.connect_seq()
+ logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
+ " reusing existing {}",
+ conn, existing_proto->connect_seq,
+ reconnect.connect_seq(), *existing_conn);
+ return reuse_connection(
+ existing_proto, false,
+ true, reconnect.connect_seq(), reconnect.msg_seq());
+ }
+ });
+}
+
+void ProtocolV2::execute_accepting()
+{
+ trigger_state(state_t::ACCEPTING, write_state_t::none, false);
+ gate.dispatch_in_background("execute_accepting", *this, [this] {
+ return seastar::futurize_invoke([this] {
+ INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
+ auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
+ session_stream_handlers = { nullptr, nullptr };
+ enable_recording();
+ return banner_exchange(false);
+ }).then([this] (auto&& ret) {
+ auto [_peer_type, _my_addr_from_peer] = std::move(ret);
+ ceph_assert(conn.get_peer_type() == 0);
+ conn.set_peer_type(_peer_type);
+
+ conn.policy = messenger.get_policy(_peer_type);
+ logger().info("{} UPDATE: peer_type={},"
+ " policy(lossy={} server={} standby={} resetcheck={})",
+ conn, ceph_entity_type_name(_peer_type),
+ conn.policy.lossy, conn.policy.server,
+ conn.policy.standby, conn.policy.resetcheck);
+ if (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
+ messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce()) {
+ logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
+ conn, _my_addr_from_peer, messenger.get_myaddr());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ return messenger.learned_addr(_my_addr_from_peer, conn);
+ }).then([this] {
+ return server_auth();
+ }).then([this] {
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ switch (tag) {
+ case Tag::CLIENT_IDENT:
+ return server_connect();
+ case Tag::SESSION_RECONNECT:
+ return server_reconnect();
+ default: {
+ unexpected_tag(tag, conn, "post_server_auth");
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+ }
+ }
+ }).then([this] (next_step_t next) {
+ switch (next) {
+ case next_step_t::ready:
+ assert(state != state_t::ACCEPTING);
+ break;
+ case next_step_t::wait:
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} at the end of execute_accepting()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
+ execute_server_wait();
+ break;
+ default:
+ ceph_abort("impossible next step");
+ }
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
+ conn, get_state_name(state), eptr);
+ close(false);
+ });
+ });
+}
+
+// CONNECTING or ACCEPTING state
+
+seastar::future<> ProtocolV2::finish_auth()
+{
+ ceph_assert(auth_meta);
+
+ const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
+ auth_meta->session_key.hmac_sha256(nullptr, rxbuf);
+ auto sig_frame = AuthSignatureFrame::Encode(sig);
+ ceph_assert(record_io);
+ record_io = false;
+ rxbuf.clear();
+ logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
+ return write_frame(sig_frame).then([this] {
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ expect_tag(Tag::AUTH_SIGNATURE, tag, conn, "post_finish_auth");
+ return read_frame_payload();
+ }).then([this] {
+ // handle_auth_signature() logic
+ auto sig_frame = AuthSignatureFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature());
+
+ const auto actual_tx_sig = auth_meta->session_key.empty() ?
+ sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf);
+ if (sig_frame.signature() != actual_tx_sig) {
+ logger().warn("{} pre-auth signature mismatch actual_tx_sig={}"
+ " sig_frame.signature()={}",
+ conn, actual_tx_sig, sig_frame.signature());
+ abort_in_fault();
+ }
+ txbuf.clear();
+ });
+}
+
+// ESTABLISHING
+
+void ProtocolV2::execute_establishing(
+ SocketConnectionRef existing_conn, bool dispatch_reset) {
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} before execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
+ auto accept_me = [this] {
+ messenger.register_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ messenger.unaccept_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ };
+
+ trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
+ if (existing_conn) {
+ existing_conn->protocol->close(dispatch_reset, std::move(accept_me));
+ if (unlikely(state != state_t::ESTABLISHING)) {
+ logger().warn("{} triggered {} during execute_establishing(), "
+ "the accept event will not be delivered!",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ } else {
+ accept_me();
+ }
+
+ dispatchers.ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+
+ gated_execute("execute_establishing", [this] {
+ return seastar::futurize_invoke([this] {
+ return send_server_ident();
+ }).then([this] {
+ if (unlikely(state != state_t::ESTABLISHING)) {
+ logger().debug("{} triggered {} at the end of execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ logger().info("{} established: gs={}, pgs={}, cs={}, client_cookie={},"
+ " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, conn.in_seq,
+ conn.out_seq, conn.out_q.size());
+ execute_ready(false);
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ if (state != state_t::ESTABLISHING) {
+ logger().info("{} execute_establishing() protocol aborted at {} -- {}",
+ conn, get_state_name(state), eptr);
+ assert(state == state_t::CLOSING ||
+ state == state_t::REPLACING);
+ return;
+ }
+ fault(false, "execute_establishing()", eptr);
+ });
+ });
+}
+
+// ESTABLISHING or REPLACING state
+
+seastar::future<>
+ProtocolV2::send_server_ident()
+{
+ // send_server_ident() logic
+
+ // refered to async-conn v2: not assign gs to global_seq
+ return messenger.get_global_seq().then([this] (auto gs) {
+ logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
+
+ // this is required for the case when this connection is being replaced
+ requeue_up_to(0);
+ conn.in_seq = 0;
+
+ if (!conn.policy.lossy) {
+ server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
+ }
+
+ uint64_t flags = 0;
+ if (conn.policy.lossy) {
+ flags = flags | CEPH_MSG_CONNECT_LOSSY;
+ }
+
+ auto server_ident = ServerIdentFrame::Encode(
+ messenger.get_myaddrs(),
+ messenger.get_myname().num(),
+ gs,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags,
+ server_cookie);
+
+ logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
+ " gs={}, features_supported={}, features_required={},"
+ " flags={}, cookie={}",
+ conn, messenger.get_myaddrs(), messenger.get_myname().num(),
+ gs, conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags, server_cookie);
+
+ conn.set_features(connection_features);
+
+ return write_frame(server_ident);
+ });
+}
+
+// REPLACING state
+
+void ProtocolV2::trigger_replacing(bool reconnect,
+ bool do_reset,
+ SocketRef&& new_socket,
+ AuthConnectionMetaRef&& new_auth_meta,
+ ceph::crypto::onwire::rxtx_t new_rxtx,
+ uint64_t new_peer_global_seq,
+ uint64_t new_client_cookie,
+ entity_name_t new_peer_name,
+ uint64_t new_conn_features,
+ bool tx_is_rev1,
+ bool rx_is_rev1,
+ uint64_t new_connect_seq,
+ uint64_t new_msg_seq)
+{
+ trigger_state(state_t::REPLACING, write_state_t::delay, false);
+ if (socket) {
+ socket->shutdown();
+ }
+ dispatchers.ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ gate.dispatch_in_background("trigger_replacing", *this,
+ [this,
+ reconnect,
+ do_reset,
+ new_socket = std::move(new_socket),
+ new_auth_meta = std::move(new_auth_meta),
+ new_rxtx = std::move(new_rxtx),
+ tx_is_rev1, rx_is_rev1,
+ new_client_cookie, new_peer_name,
+ new_conn_features, new_peer_global_seq,
+ new_connect_seq, new_msg_seq] () mutable {
+ return wait_write_exit().then([this, do_reset] {
+ if (do_reset) {
+ reset_session(true);
+ }
+ protocol_timer.cancel();
+ return execution_done.get_future();
+ }).then([this,
+ reconnect,
+ new_socket = std::move(new_socket),
+ new_auth_meta = std::move(new_auth_meta),
+ new_rxtx = std::move(new_rxtx),
+ tx_is_rev1, rx_is_rev1,
+ new_client_cookie, new_peer_name,
+ new_conn_features, new_peer_global_seq,
+ new_connect_seq, new_msg_seq] () mutable {
+ if (unlikely(state != state_t::REPLACING)) {
+ return new_socket->close().then([sock = std::move(new_socket)] {
+ abort_protocol();
+ });
+ }
+
+ if (socket) {
+ gate.dispatch_in_background("close_socket_replacing", *this,
+ [sock = std::move(socket)] () mutable {
+ return sock->close().then([sock = std::move(sock)] {});
+ });
+ }
+ socket = std::move(new_socket);
+ auth_meta = std::move(new_auth_meta);
+ session_stream_handlers = std::move(new_rxtx);
+ record_io = false;
+ peer_global_seq = new_peer_global_seq;
+
+ if (reconnect) {
+ connect_seq = new_connect_seq;
+ // send_reconnect_ok() logic
+ requeue_up_to(new_msg_seq);
+ auto reconnect_ok = ReconnectOkFrame::Encode(conn.in_seq);
+ logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, conn.in_seq);
+ return write_frame(reconnect_ok);
+ } else {
+ client_cookie = new_client_cookie;
+ assert(conn.get_peer_type() == new_peer_name.type());
+ if (conn.get_peer_id() == entity_name_t::NEW) {
+ conn.set_peer_id(new_peer_name.num());
+ }
+ connection_features = new_conn_features;
+ tx_frame_asm.set_is_rev1(tx_is_rev1);
+ rx_frame_asm.set_is_rev1(rx_is_rev1);
+ return send_server_ident();
+ }
+ }).then([this, reconnect] {
+ if (unlikely(state != state_t::REPLACING)) {
+ logger().debug("{} triggered {} at the end of trigger_replacing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ logger().info("{} replaced ({}):"
+ " gs={}, pgs={}, cs={}, client_cookie={}, server_cookie={},"
+ " in_seq={}, out_seq={}, out_q={}",
+ conn, reconnect ? "reconnected" : "connected",
+ global_seq, peer_global_seq, connect_seq, client_cookie,
+ server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size());
+ execute_ready(false);
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ if (state != state_t::REPLACING) {
+ logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
+ conn, get_state_name(state), eptr);
+ assert(state == state_t::CLOSING);
+ return;
+ }
+ fault(true, "trigger_replacing()", eptr);
+ });
+ });
+}
+
+// READY state
+
+ceph::bufferlist ProtocolV2::do_sweep_messages(
+ const std::deque<MessageRef>& msgs,
+ size_t num_msgs,
+ bool require_keepalive,
+ std::optional<utime_t> _keepalive_ack,
+ bool require_ack)
+{
+ ceph::bufferlist bl;
+
+ if (unlikely(require_keepalive)) {
+ auto keepalive_frame = KeepAliveFrame::Encode();
+ bl.append(keepalive_frame.get_buffer(tx_frame_asm));
+ INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
+ }
+
+ if (unlikely(_keepalive_ack.has_value())) {
+ auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
+ bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm));
+ INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
+ }
+
+ if (require_ack && !num_msgs) {
+ auto ack_frame = AckFrame::Encode(conn.in_seq);
+ bl.append(ack_frame.get_buffer(tx_frame_asm));
+ INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
+ }
+
+ std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
+ // TODO: move to common code
+ // set priority
+ msg->get_header().src = messenger.get_myname();
+
+ msg->encode(conn.features, 0);
+
+ ceph_assert(!msg->get_seq() && "message already has seq");
+ msg->set_seq(++conn.out_seq);
+
+ ceph_msg_header &header = msg->get_header();
+ ceph_msg_footer &footer = msg->get_footer();
+
+ ceph_msg_header2 header2{header.seq, header.tid,
+ header.type, header.priority,
+ header.version,
+ init_le32(0), header.data_off,
+ init_le64(conn.in_seq),
+ footer.flags, header.compat_version,
+ header.reserved};
+
+ auto message = MessageFrame::Encode(header2,
+ msg->get_payload(), msg->get_middle(), msg->get_data());
+ logger().debug("{} --> #{} === {} ({})",
+ conn, msg->get_seq(), *msg, msg->get_type());
+ bl.append(message.get_buffer(tx_frame_asm));
+ INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
+ });
+
+ return bl;
+}
+
+seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
+{
+ return read_frame_payload()
+ .then([this, throttle_stamp] {
+ utime_t recv_stamp{seastar::lowres_system_clock::now()};
+
+ // we need to get the size before std::moving segments data
+ const size_t cur_msg_size = get_current_msg_size();
+ auto msg_frame = MessageFrame::Decode(rx_segments_data);
+ // XXX: paranoid copy just to avoid oops
+ ceph_msg_header2 current_header = msg_frame.header();
+
+ logger().trace("{} got {} + {} + {} byte message,"
+ " envelope type={} src={} off={} seq={}",
+ conn, msg_frame.front_len(), msg_frame.middle_len(),
+ msg_frame.data_len(), current_header.type, conn.get_peer_name(),
+ current_header.data_off, current_header.seq);
+
+ ceph_msg_header header{current_header.seq,
+ current_header.tid,
+ current_header.type,
+ current_header.priority,
+ current_header.version,
+ init_le32(msg_frame.front_len()),
+ init_le32(msg_frame.middle_len()),
+ init_le32(msg_frame.data_len()),
+ current_header.data_off,
+ conn.get_peer_name(),
+ current_header.compat_version,
+ current_header.reserved,
+ init_le32(0)};
+ ceph_msg_footer footer{init_le32(0), init_le32(0),
+ init_le32(0), init_le64(0), current_header.flags};
+
+ auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this());
+ Message *message = decode_message(nullptr, 0, header, footer,
+ msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
+ if (!message) {
+ logger().warn("{} decode message failed", conn);
+ abort_in_fault();
+ }
+
+ // store reservation size in message, so we don't get confused
+ // by messages entering the dispatch queue through other paths.
+ message->set_dispatch_throttle_size(cur_msg_size);
+
+ message->set_throttle_stamp(throttle_stamp);
+ message->set_recv_stamp(recv_stamp);
+ message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
+
+ // check received seq#. if it is old, drop the message.
+ // note that incoming messages may skip ahead. this is convenient for the
+ // client side queueing because messages can't be renumbered, but the (kernel)
+ // client will occasionally pull a message out of the sent queue to send
+ // elsewhere. in that case it doesn't matter if we "got" it or not.
+ uint64_t cur_seq = conn.in_seq;
+ if (message->get_seq() <= cur_seq) {
+ logger().error("{} got old message {} <= {} {}, discarding",
+ conn, message->get_seq(), cur_seq, *message);
+ if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
+ local_conf()->ms_die_on_old_message) {
+ ceph_assert(0 == "old msgs despite reconnect_seq feature");
+ }
+ return seastar::now();
+ } else if (message->get_seq() > cur_seq + 1) {
+ logger().error("{} missed message? skipped from seq {} to {}",
+ conn, cur_seq, message->get_seq());
+ if (local_conf()->ms_die_on_skipped_message) {
+ ceph_assert(0 == "skipped incoming seq");
+ }
+ }
+
+ // note last received message.
+ conn.in_seq = message->get_seq();
+ logger().debug("{} <== #{} === {} ({})",
+ conn, message->get_seq(), *message, message->get_type());
+ notify_ack();
+ ack_writes(current_header.ack_seq);
+
+ // TODO: change MessageRef with seastar::shared_ptr
+ auto msg_ref = MessageRef{message, false};
+ // throttle the reading process by the returned future
+ return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
+ });
+}
+
+void ProtocolV2::execute_ready(bool dispatch_connect)
+{
+ assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
+ trigger_state(state_t::READY, write_state_t::open, false);
+ if (dispatch_connect) {
+ dispatchers.ms_handle_connect(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ }
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_ready(conn);
+ }
+#endif
+ gated_execute("execute_ready", [this] {
+ protocol_timer.cancel();
+ return seastar::keep_doing([this] {
+ return read_main_preamble()
+ .then([this] (Tag tag) {
+ switch (tag) {
+ case Tag::MESSAGE: {
+ return seastar::futurize_invoke([this] {
+ // throttle_message() logic
+ if (!conn.policy.throttler_messages) {
+ return seastar::now();
+ }
+ // TODO: message throttler
+ ceph_assert(false);
+ return seastar::now();
+ }).then([this] {
+ // throttle_bytes() logic
+ if (!conn.policy.throttler_bytes) {
+ return seastar::now();
+ }
+ size_t cur_msg_size = get_current_msg_size();
+ if (!cur_msg_size) {
+ return seastar::now();
+ }
+ logger().trace("{} wants {} bytes from policy throttler {}/{}",
+ conn, cur_msg_size,
+ conn.policy.throttler_bytes->get_current(),
+ conn.policy.throttler_bytes->get_max());
+ return conn.policy.throttler_bytes->get(cur_msg_size);
+ }).then([this] {
+ // TODO: throttle_dispatch_queue() logic
+ utime_t throttle_stamp{seastar::lowres_system_clock::now()};
+ return read_message(throttle_stamp);
+ });
+ }
+ case Tag::ACK:
+ return read_frame_payload().then([this] {
+ // handle_message_ack() logic
+ auto ack = AckFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
+ ack_writes(ack.seq());
+ });
+ case Tag::KEEPALIVE2:
+ return read_frame_payload().then([this] {
+ // handle_keepalive2() logic
+ auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT KeepAliveFrame: timestamp={}",
+ conn, keepalive_frame.timestamp());
+ notify_keepalive_ack(keepalive_frame.timestamp());
+ conn.set_last_keepalive(seastar::lowres_system_clock::now());
+ });
+ case Tag::KEEPALIVE2_ACK:
+ return read_frame_payload().then([this] {
+ // handle_keepalive2_ack() logic
+ auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
+ conn.set_last_keepalive_ack(
+ seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()});
+ logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
+ conn, conn.last_keepalive_ack);
+ });
+ default: {
+ unexpected_tag(tag, conn, "execute_ready");
+ return seastar::now();
+ }
+ }
+ });
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ if (state != state_t::READY) {
+ logger().info("{} execute_ready(): protocol aborted at {} -- {}",
+ conn, get_state_name(state), eptr);
+ assert(state == state_t::REPLACING ||
+ state == state_t::CLOSING);
+ return;
+ }
+ fault(false, "execute_ready()", eptr);
+ });
+ });
+}
+
+// STANDBY state
+
+void ProtocolV2::execute_standby()
+{
+ trigger_state(state_t::STANDBY, write_state_t::delay, true);
+ if (socket) {
+ socket->shutdown();
+ }
+}
+
+void ProtocolV2::notify_write()
+{
+ if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
+ logger().info("{} notify_write(): at {}, going to CONNECTING",
+ conn, get_state_name(state));
+ execute_connecting();
+ }
+}
+
+// WAIT state
+
+void ProtocolV2::execute_wait(bool max_backoff)
+{
+ trigger_state(state_t::WAIT, write_state_t::delay, true);
+ if (socket) {
+ socket->shutdown();
+ }
+ gated_execute("execute_wait", [this, max_backoff] {
+ double backoff = protocol_timer.last_dur();
+ if (max_backoff) {
+ backoff = local_conf().get_val<double>("ms_max_backoff");
+ } else if (backoff > 0) {
+ backoff = std::min(local_conf().get_val<double>("ms_max_backoff"), 2 * backoff);
+ } else {
+ backoff = local_conf().get_val<double>("ms_initial_backoff");
+ }
+ return protocol_timer.backoff(backoff).then([this] {
+ if (unlikely(state != state_t::WAIT)) {
+ logger().debug("{} triggered {} at the end of execute_wait()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ logger().info("{} execute_wait(): going to CONNECTING", conn);
+ execute_connecting();
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ logger().info("{} execute_wait(): protocol aborted at {} -- {}",
+ conn, get_state_name(state), eptr);
+ assert(state == state_t::REPLACING ||
+ state == state_t::CLOSING);
+ });
+ });
+}
+
+// SERVER_WAIT state
+
+void ProtocolV2::execute_server_wait()
+{
+ trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false);
+ gated_execute("execute_server_wait", [this] {
+ return read_exactly(1).then([this] (auto bl) {
+ logger().warn("{} SERVER_WAIT got read, abort", conn);
+ abort_in_fault();
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
+ conn, get_state_name(state), eptr);
+ close(false);
+ });
+ });
+}
+
+// CLOSING state
+
+void ProtocolV2::trigger_close()
+{
+ messenger.closing_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+
+ if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) {
+ messenger.unaccept_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) {
+ messenger.unregister_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ } else {
+ // cannot happen
+ ceph_assert(false);
+ }
+
+ protocol_timer.cancel();
+ trigger_state(state_t::CLOSING, write_state_t::drop, false);
+}
+
+void ProtocolV2::on_closed()
+{
+ messenger.closed_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+}
+
+void ProtocolV2::print(std::ostream& out) const
+{
+ out << conn;
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h
new file mode 100644
index 000000000..be9a22816
--- /dev/null
+++ b/src/crimson/net/ProtocolV2.h
@@ -0,0 +1,225 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/sleep.hh>
+
+#include "Protocol.h"
+#include "msg/async/frames_v2.h"
+#include "msg/async/crypto_onwire.h"
+
+namespace crimson::net {
+
+class ProtocolV2 final : public Protocol {
+ public:
+ ProtocolV2(ChainedDispatchers& dispatchers,
+ SocketConnection& conn,
+ SocketMessenger& messenger);
+ ~ProtocolV2() override;
+ void print(std::ostream&) const final;
+ private:
+ void on_closed() override;
+ bool is_connected() const override;
+
+ void start_connect(const entity_addr_t& peer_addr,
+ const entity_name_t& peer_name) override;
+
+ void start_accept(SocketRef&& socket,
+ const entity_addr_t& peer_addr) override;
+
+ void trigger_close() override;
+
+ ceph::bufferlist do_sweep_messages(
+ const std::deque<MessageRef>& msgs,
+ size_t num_msgs,
+ bool require_keepalive,
+ std::optional<utime_t> keepalive_ack,
+ bool require_ack) override;
+
+ void notify_write() override;
+
+ private:
+ SocketMessenger &messenger;
+
+ enum class state_t {
+ NONE = 0,
+ ACCEPTING,
+ SERVER_WAIT,
+ ESTABLISHING,
+ CONNECTING,
+ READY,
+ STANDBY,
+ WAIT,
+ REPLACING,
+ CLOSING
+ };
+ state_t state = state_t::NONE;
+
+ static const char *get_state_name(state_t state) {
+ const char *const statenames[] = {"NONE",
+ "ACCEPTING",
+ "SERVER_WAIT",
+ "ESTABLISHING",
+ "CONNECTING",
+ "READY",
+ "STANDBY",
+ "WAIT",
+ "REPLACING",
+ "CLOSING"};
+ return statenames[static_cast<int>(state)];
+ }
+
+ void trigger_state(state_t state, write_state_t write_state, bool reentrant);
+
+ uint64_t connection_features = 0;
+ uint64_t peer_required_features = 0;
+
+ uint64_t client_cookie = 0;
+ uint64_t server_cookie = 0;
+ uint64_t global_seq = 0;
+ uint64_t peer_global_seq = 0;
+ uint64_t connect_seq = 0;
+
+ seastar::shared_future<> execution_done = seastar::now();
+
+ template <typename Func>
+ void gated_execute(const char* what, Func&& func) {
+ gate.dispatch_in_background(what, *this, [this, &func] {
+ execution_done = seastar::futurize_invoke(std::forward<Func>(func));
+ return execution_done.get_future();
+ });
+ }
+
+ class Timer {
+ double last_dur_ = 0.0;
+ const SocketConnection& conn;
+ std::optional<seastar::abort_source> as;
+ public:
+ Timer(SocketConnection& conn) : conn(conn) {}
+ double last_dur() const { return last_dur_; }
+ seastar::future<> backoff(double seconds);
+ void cancel() {
+ last_dur_ = 0.0;
+ if (as) {
+ as->request_abort();
+ as = std::nullopt;
+ }
+ }
+ };
+ Timer protocol_timer;
+
+ // TODO: Frame related implementations, probably to a separate class.
+ private:
+ bool record_io = false;
+ ceph::bufferlist rxbuf;
+ ceph::bufferlist txbuf;
+
+ void enable_recording();
+ seastar::future<Socket::tmp_buf> read_exactly(size_t bytes);
+ seastar::future<bufferlist> read(size_t bytes);
+ seastar::future<> write(bufferlist&& buf);
+ seastar::future<> write_flush(bufferlist&& buf);
+
+ ceph::crypto::onwire::rxtx_t session_stream_handlers;
+ ceph::msgr::v2::FrameAssembler tx_frame_asm{&session_stream_handlers, false};
+ ceph::msgr::v2::FrameAssembler rx_frame_asm{&session_stream_handlers, false};
+ ceph::bufferlist rx_preamble;
+ ceph::msgr::v2::segment_bls_t rx_segments_data;
+
+ size_t get_current_msg_size() const;
+ seastar::future<ceph::msgr::v2::Tag> read_main_preamble();
+ seastar::future<> read_frame_payload();
+ template <class F>
+ seastar::future<> write_frame(F &frame, bool flush=true);
+
+ private:
+ void fault(bool backoff, const char* func_name, std::exception_ptr eptr);
+ void reset_session(bool full);
+ seastar::future<std::tuple<entity_type_t, entity_addr_t>>
+ banner_exchange(bool is_connect);
+
+ enum class next_step_t {
+ ready,
+ wait,
+ none, // protocol should have been aborted or failed
+ };
+
+ // CONNECTING (client)
+ seastar::future<> handle_auth_reply();
+ inline seastar::future<> client_auth() {
+ std::vector<uint32_t> empty;
+ return client_auth(empty);
+ }
+ seastar::future<> client_auth(std::vector<uint32_t> &allowed_methods);
+
+ seastar::future<next_step_t> process_wait();
+ seastar::future<next_step_t> client_connect();
+ seastar::future<next_step_t> client_reconnect();
+ void execute_connecting();
+
+ // ACCEPTING (server)
+ seastar::future<> _auth_bad_method(int r);
+ seastar::future<> _handle_auth_request(bufferlist& auth_payload, bool more);
+ seastar::future<> server_auth();
+
+ bool validate_peer_name(const entity_name_t& peer_name) const;
+ seastar::future<next_step_t> send_wait();
+ seastar::future<next_step_t> reuse_connection(ProtocolV2* existing_proto,
+ bool do_reset=false,
+ bool reconnect=false,
+ uint64_t conn_seq=0,
+ uint64_t msg_seq=0);
+
+ seastar::future<next_step_t> handle_existing_connection(SocketConnectionRef existing_conn);
+ seastar::future<next_step_t> server_connect();
+
+ seastar::future<next_step_t> read_reconnect();
+ seastar::future<next_step_t> send_retry(uint64_t connect_seq);
+ seastar::future<next_step_t> send_retry_global(uint64_t global_seq);
+ seastar::future<next_step_t> send_reset(bool full);
+ seastar::future<next_step_t> server_reconnect();
+
+ void execute_accepting();
+
+ // CONNECTING/ACCEPTING
+ seastar::future<> finish_auth();
+
+ // ESTABLISHING
+ void execute_establishing(SocketConnectionRef existing_conn, bool dispatch_reset);
+
+ // ESTABLISHING/REPLACING (server)
+ seastar::future<> send_server_ident();
+
+ // REPLACING (server)
+ void trigger_replacing(bool reconnect,
+ bool do_reset,
+ SocketRef&& new_socket,
+ AuthConnectionMetaRef&& new_auth_meta,
+ ceph::crypto::onwire::rxtx_t new_rxtx,
+ uint64_t new_peer_global_seq,
+ // !reconnect
+ uint64_t new_client_cookie,
+ entity_name_t new_peer_name,
+ uint64_t new_conn_features,
+ bool tx_is_rev1,
+ bool rx_is_rev1,
+ // reconnect
+ uint64_t new_connect_seq,
+ uint64_t new_msg_seq);
+
+ // READY
+ seastar::future<> read_message(utime_t throttle_stamp);
+ void execute_ready(bool dispatch_connect);
+
+ // STANDBY
+ void execute_standby();
+
+ // WAIT
+ void execute_wait(bool max_backoff);
+
+ // SERVER_WAIT
+ void execute_server_wait();
+};
+
+} // namespace crimson::net
diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc
new file mode 100644
index 000000000..8ad106dbd
--- /dev/null
+++ b/src/crimson/net/Socket.cc
@@ -0,0 +1,276 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Socket.h"
+
+#include <seastar/core/when_all.hh>
+
+#include "crimson/common/log.h"
+#include "Errors.h"
+
+namespace crimson::net {
+
+namespace {
+
+seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+}
+
+// an input_stream consumer that reads buffer segments into a bufferlist up to
+// the given number of remaining bytes
+struct bufferlist_consumer {
+ bufferlist& bl;
+ size_t& remaining;
+
+ bufferlist_consumer(bufferlist& bl, size_t& remaining)
+ : bl(bl), remaining(remaining) {}
+
+ using tmp_buf = seastar::temporary_buffer<char>;
+ using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type;
+
+ // consume some or all of a buffer segment
+ seastar::future<consumption_result_type> operator()(tmp_buf&& data) {
+ if (remaining >= data.size()) {
+ // consume the whole buffer
+ remaining -= data.size();
+ bl.append(buffer::create_foreign(std::move(data)));
+ if (remaining > 0) {
+ // return none to request more segments
+ return seastar::make_ready_future<consumption_result_type>(
+ seastar::continue_consuming{});
+ } else {
+ // return an empty buffer to singal that we're done
+ return seastar::make_ready_future<consumption_result_type>(
+ consumption_result_type::stop_consuming_type({}));
+ }
+ }
+ if (remaining > 0) {
+ // consume the front
+ bl.append(buffer::create_foreign(data.share(0, remaining)));
+ data.trim_front(remaining);
+ remaining = 0;
+ }
+ // give the rest back to signal that we're done
+ return seastar::make_ready_future<consumption_result_type>(
+ consumption_result_type::stop_consuming_type{std::move(data)});
+ };
+};
+
+} // anonymous namespace
+
+seastar::future<bufferlist> Socket::read(size_t bytes)
+{
+#ifdef UNIT_TESTS_BUILT
+ return try_trap_pre(next_trap_read).then([bytes, this] {
+#endif
+ if (bytes == 0) {
+ return seastar::make_ready_future<bufferlist>();
+ }
+ r.buffer.clear();
+ r.remaining = bytes;
+ return in.consume(bufferlist_consumer{r.buffer, r.remaining}).then([this] {
+ if (r.remaining) { // throw on short reads
+ throw std::system_error(make_error_code(error::read_eof));
+ }
+ return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
+ });
+#ifdef UNIT_TESTS_BUILT
+ }).then([this] (auto buf) {
+ return try_trap_post(next_trap_read
+ ).then([buf = std::move(buf)] () mutable {
+ return std::move(buf);
+ });
+ });
+#endif
+}
+
+seastar::future<seastar::temporary_buffer<char>>
+Socket::read_exactly(size_t bytes) {
+#ifdef UNIT_TESTS_BUILT
+ return try_trap_pre(next_trap_read).then([bytes, this] {
+#endif
+ if (bytes == 0) {
+ return seastar::make_ready_future<seastar::temporary_buffer<char>>();
+ }
+ return in.read_exactly(bytes).then([](auto buf) {
+ if (buf.empty()) {
+ throw std::system_error(make_error_code(error::read_eof));
+ }
+ return seastar::make_ready_future<tmp_buf>(std::move(buf));
+ });
+#ifdef UNIT_TESTS_BUILT
+ }).then([this] (auto buf) {
+ return try_trap_post(next_trap_read
+ ).then([buf = std::move(buf)] () mutable {
+ return std::move(buf);
+ });
+ });
+#endif
+}
+
+void Socket::shutdown() {
+ socket.shutdown_input();
+ socket.shutdown_output();
+}
+
+static inline seastar::future<>
+close_and_handle_errors(seastar::output_stream<char>& out)
+{
+ return out.close().handle_exception_type([] (const std::system_error& e) {
+ if (e.code() != std::errc::broken_pipe &&
+ e.code() != std::errc::connection_reset) {
+ logger().error("Socket::close(): unexpected error {}", e);
+ ceph_abort();
+ }
+ // can happen when out is already shutdown, ignore
+ });
+}
+
+seastar::future<> Socket::close() {
+#ifndef NDEBUG
+ ceph_assert(!closed);
+ closed = true;
+#endif
+ return seastar::when_all_succeed(
+ in.close(),
+ close_and_handle_errors(out)
+ ).then_unpack([] {
+ return seastar::make_ready_future<>();
+ }).handle_exception([] (auto eptr) {
+ logger().error("Socket::close(): unexpected exception {}", eptr);
+ ceph_abort();
+ });
+}
+
+#ifdef UNIT_TESTS_BUILT
+seastar::future<> Socket::try_trap_pre(bp_action_t& trap) {
+ auto action = trap;
+ trap = bp_action_t::CONTINUE;
+ switch (action) {
+ case bp_action_t::CONTINUE:
+ break;
+ case bp_action_t::FAULT:
+ logger().info("[Test] got FAULT");
+ throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+ case bp_action_t::BLOCK:
+ logger().info("[Test] got BLOCK");
+ return blocker->block();
+ case bp_action_t::STALL:
+ trap = action;
+ break;
+ default:
+ ceph_abort("unexpected action from trap");
+ }
+ return seastar::make_ready_future<>();
+}
+
+seastar::future<> Socket::try_trap_post(bp_action_t& trap) {
+ auto action = trap;
+ trap = bp_action_t::CONTINUE;
+ switch (action) {
+ case bp_action_t::CONTINUE:
+ break;
+ case bp_action_t::STALL:
+ logger().info("[Test] got STALL and block");
+ shutdown();
+ return blocker->block();
+ default:
+ ceph_abort("unexpected action from trap");
+ }
+ return seastar::make_ready_future<>();
+}
+
+void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) {
+ blocker = blocker_;
+ if (type == bp_type_t::READ) {
+ ceph_assert(next_trap_read == bp_action_t::CONTINUE);
+ next_trap_read = action;
+ } else { // type == bp_type_t::WRITE
+ if (next_trap_write == bp_action_t::CONTINUE) {
+ next_trap_write = action;
+ } else if (next_trap_write == bp_action_t::FAULT) {
+ // do_sweep_messages() may combine multiple write events into one socket write
+ ceph_assert(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE);
+ } else {
+ ceph_abort();
+ }
+ }
+}
+#endif
+
+FixedCPUServerSocket::listen_ertr::future<>
+FixedCPUServerSocket::listen(entity_addr_t addr)
+{
+ assert(seastar::this_shard_id() == cpu);
+ logger().trace("FixedCPUServerSocket::listen({})...", addr);
+ return container().invoke_on_all([addr] (auto& ss) {
+ ss.addr = addr;
+ seastar::socket_address s_addr(addr.in4_addr());
+ seastar::listen_options lo;
+ lo.reuse_address = true;
+ lo.set_fixed_cpu(ss.cpu);
+ ss.listener = seastar::listen(s_addr, lo);
+ }).then([] {
+ return true;
+ }).handle_exception_type([addr] (const std::system_error& e) {
+ if (e.code() == std::errc::address_in_use) {
+ logger().trace("FixedCPUServerSocket::listen({}): address in use", addr);
+ } else {
+ logger().error("FixedCPUServerSocket::listen({}): "
+ "got unexpeted error {}", addr, e);
+ ceph_abort();
+ }
+ return false;
+ }).then([] (bool success) -> listen_ertr::future<> {
+ if (success) {
+ return listen_ertr::now();
+ } else {
+ return crimson::ct_error::address_in_use::make();
+ }
+ });
+}
+
+seastar::future<> FixedCPUServerSocket::shutdown()
+{
+ assert(seastar::this_shard_id() == cpu);
+ logger().trace("FixedCPUServerSocket({})::shutdown()...", addr);
+ return container().invoke_on_all([] (auto& ss) {
+ if (ss.listener) {
+ ss.listener->abort_accept();
+ }
+ return ss.shutdown_gate.close();
+ }).then([this] {
+ return reset();
+ });
+}
+
+seastar::future<> FixedCPUServerSocket::destroy()
+{
+ assert(seastar::this_shard_id() == cpu);
+ return shutdown().then([this] {
+ // we should only construct/stop shards on #0
+ return container().invoke_on(0, [] (auto& ss) {
+ assert(ss.service);
+ return ss.service->stop().finally([cleanup = std::move(ss.service)] {});
+ });
+ });
+}
+
+seastar::future<FixedCPUServerSocket*> FixedCPUServerSocket::create()
+{
+ auto cpu = seastar::this_shard_id();
+ // we should only construct/stop shards on #0
+ return seastar::smp::submit_to(0, [cpu] {
+ auto service = std::make_unique<sharded_service_t>();
+ return service->start(cpu, construct_tag{}
+ ).then([service = std::move(service)] () mutable {
+ auto p_shard = service.get();
+ p_shard->local().service = std::move(service);
+ return p_shard;
+ });
+ }).then([] (auto p_shard) {
+ return &p_shard->local();
+ });
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h
new file mode 100644
index 000000000..d39a2517f
--- /dev/null
+++ b/src/crimson/net/Socket.h
@@ -0,0 +1,268 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sharded.hh>
+#include <seastar/net/packet.hh>
+
+#include "include/buffer.h"
+
+#include "crimson/common/log.h"
+#include "Errors.h"
+#include "Fwd.h"
+
+#ifdef UNIT_TESTS_BUILT
+#include "Interceptor.h"
+#endif
+
+namespace crimson::net {
+
+class Socket;
+using SocketRef = std::unique_ptr<Socket>;
+
+class Socket
+{
+ struct construct_tag {};
+
+ public:
+ // if acceptor side, peer is using a different port (ephemeral_port)
+ // if connector side, I'm using a different port (ephemeral_port)
+ enum class side_t {
+ acceptor,
+ connector
+ };
+
+ Socket(seastar::connected_socket&& _socket, side_t _side, uint16_t e_port, construct_tag)
+ : sid{seastar::this_shard_id()},
+ socket(std::move(_socket)),
+ in(socket.input()),
+ // the default buffer size 8192 is too small that may impact our write
+ // performance. see seastar::net::connected_socket::output()
+ out(socket.output(65536)),
+ side(_side),
+ ephemeral_port(e_port) {}
+
+ ~Socket() {
+#ifndef NDEBUG
+ assert(closed);
+#endif
+ }
+
+ Socket(Socket&& o) = delete;
+
+ static seastar::future<SocketRef>
+ connect(const entity_addr_t& peer_addr) {
+ return seastar::connect(peer_addr.in4_addr()
+ ).then([] (seastar::connected_socket socket) {
+ return std::make_unique<Socket>(
+ std::move(socket), side_t::connector, 0, construct_tag{});
+ });
+ }
+
+ /// read the requested number of bytes into a bufferlist
+ seastar::future<bufferlist> read(size_t bytes);
+ using tmp_buf = seastar::temporary_buffer<char>;
+ using packet = seastar::net::packet;
+ seastar::future<tmp_buf> read_exactly(size_t bytes);
+
+ seastar::future<> write(packet&& buf) {
+#ifdef UNIT_TESTS_BUILT
+ return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
+#endif
+ return out.write(std::move(buf));
+#ifdef UNIT_TESTS_BUILT
+ }).then([this] {
+ return try_trap_post(next_trap_write);
+ });
+#endif
+ }
+ seastar::future<> flush() {
+ return out.flush();
+ }
+ seastar::future<> write_flush(packet&& buf) {
+#ifdef UNIT_TESTS_BUILT
+ return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
+#endif
+ return out.write(std::move(buf)).then([this] { return out.flush(); });
+#ifdef UNIT_TESTS_BUILT
+ }).then([this] {
+ return try_trap_post(next_trap_write);
+ });
+#endif
+ }
+
+ // preemptively disable further reads or writes, can only be shutdown once.
+ void shutdown();
+
+ /// Socket can only be closed once.
+ seastar::future<> close();
+
+ // shutdown input_stream only, for tests
+ void force_shutdown_in() {
+ socket.shutdown_input();
+ }
+
+ // shutdown output_stream only, for tests
+ void force_shutdown_out() {
+ socket.shutdown_output();
+ }
+
+ side_t get_side() const {
+ return side;
+ }
+
+ uint16_t get_ephemeral_port() const {
+ return ephemeral_port;
+ }
+
+ // learn my ephemeral_port as connector.
+ // unfortunately, there's no way to identify which port I'm using as
+ // connector with current seastar interface.
+ void learn_ephemeral_port_as_connector(uint16_t port) {
+ assert(side == side_t::connector &&
+ (ephemeral_port == 0 || ephemeral_port == port));
+ ephemeral_port = port;
+ }
+
+ private:
+ const seastar::shard_id sid;
+ seastar::connected_socket socket;
+ seastar::input_stream<char> in;
+ seastar::output_stream<char> out;
+ side_t side;
+ uint16_t ephemeral_port;
+
+#ifndef NDEBUG
+ bool closed = false;
+#endif
+
+ /// buffer state for read()
+ struct {
+ bufferlist buffer;
+ size_t remaining;
+ } r;
+
+#ifdef UNIT_TESTS_BUILT
+ public:
+ void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
+
+ private:
+ bp_action_t next_trap_read = bp_action_t::CONTINUE;
+ bp_action_t next_trap_write = bp_action_t::CONTINUE;
+ socket_blocker* blocker = nullptr;
+ seastar::future<> try_trap_pre(bp_action_t& trap);
+ seastar::future<> try_trap_post(bp_action_t& trap);
+
+#endif
+ friend class FixedCPUServerSocket;
+};
+
+class FixedCPUServerSocket
+ : public seastar::peering_sharded_service<FixedCPUServerSocket> {
+ const seastar::shard_id cpu;
+ entity_addr_t addr;
+ std::optional<seastar::server_socket> listener;
+ seastar::gate shutdown_gate;
+
+ using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
+ std::unique_ptr<sharded_service_t> service;
+
+ struct construct_tag {};
+
+ static seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+ }
+
+ seastar::future<> reset() {
+ return container().invoke_on_all([] (auto& ss) {
+ assert(ss.shutdown_gate.is_closed());
+ ss.shutdown_gate = seastar::gate();
+ ss.addr = entity_addr_t();
+ ss.listener.reset();
+ });
+ }
+
+public:
+ FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {}
+ ~FixedCPUServerSocket() {
+ assert(!listener);
+ // detect whether user have called destroy() properly
+ ceph_assert(!service);
+ }
+
+ FixedCPUServerSocket(FixedCPUServerSocket&&) = delete;
+ FixedCPUServerSocket(const FixedCPUServerSocket&) = delete;
+ FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete;
+
+ using listen_ertr = crimson::errorator<
+ crimson::ct_error::address_in_use // The address is already bound
+ >;
+ listen_ertr::future<> listen(entity_addr_t addr);
+
+ // fn_accept should be a nothrow function of type
+ // seastar::future<>(SocketRef, entity_addr_t)
+ template <typename Func>
+ seastar::future<> accept(Func&& fn_accept) {
+ assert(seastar::this_shard_id() == cpu);
+ logger().trace("FixedCPUServerSocket({})::accept()...", addr);
+ return container().invoke_on_all(
+ [fn_accept = std::move(fn_accept)] (auto& ss) mutable {
+ assert(ss.listener);
+ // gate accepting
+ // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
+ // so ignore the returned future
+ std::ignore = seastar::with_gate(ss.shutdown_gate,
+ [&ss, fn_accept = std::move(fn_accept)] () mutable {
+ return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable {
+ return ss.listener->accept().then(
+ [&ss, fn_accept = std::move(fn_accept)]
+ (seastar::accept_result accept_result) mutable {
+ // assert seastar::listen_options::set_fixed_cpu() works
+ assert(seastar::this_shard_id() == ss.cpu);
+ auto [socket, paddr] = std::move(accept_result);
+ entity_addr_t peer_addr;
+ peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
+ peer_addr.set_type(entity_addr_t::TYPE_ANY);
+ SocketRef _socket = std::make_unique<Socket>(
+ std::move(socket), Socket::side_t::acceptor,
+ peer_addr.get_port(), Socket::construct_tag{});
+ std::ignore = seastar::with_gate(ss.shutdown_gate,
+ [socket = std::move(_socket), peer_addr,
+ &ss, fn_accept = std::move(fn_accept)] () mutable {
+ logger().trace("FixedCPUServerSocket({})::accept(): "
+ "accepted peer {}", ss.addr, peer_addr);
+ return fn_accept(std::move(socket), peer_addr
+ ).handle_exception([&ss, peer_addr] (auto eptr) {
+ logger().error("FixedCPUServerSocket({})::accept(): "
+ "fn_accept(s, {}) got unexpected exception {}",
+ ss.addr, peer_addr, eptr);
+ ceph_abort();
+ });
+ });
+ });
+ }).handle_exception_type([&ss] (const std::system_error& e) {
+ if (e.code() == std::errc::connection_aborted ||
+ e.code() == std::errc::invalid_argument) {
+ logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})",
+ ss.addr, e);
+ } else {
+ throw;
+ }
+ }).handle_exception([&ss] (auto eptr) {
+ logger().error("FixedCPUServerSocket({})::accept(): "
+ "got unexpected exception {}", ss.addr, eptr);
+ ceph_abort();
+ });
+ });
+ });
+ }
+
+ seastar::future<> shutdown();
+ seastar::future<> destroy();
+ static seastar::future<FixedCPUServerSocket*> create();
+};
+
+} // namespace crimson::net
diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc
new file mode 100644
index 000000000..623dca32f
--- /dev/null
+++ b/src/crimson/net/SocketConnection.cc
@@ -0,0 +1,150 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "SocketConnection.h"
+
+#include "ProtocolV1.h"
+#include "ProtocolV2.h"
+#include "SocketMessenger.h"
+
+#ifdef UNIT_TESTS_BUILT
+#include "Interceptor.h"
+#endif
+
+using namespace crimson::net;
+using crimson::common::local_conf;
+
+SocketConnection::SocketConnection(SocketMessenger& messenger,
+ ChainedDispatchers& dispatchers,
+ bool is_msgr2)
+ : messenger(messenger)
+{
+ if (is_msgr2) {
+ protocol = std::make_unique<ProtocolV2>(dispatchers, *this, messenger);
+ } else {
+ protocol = std::make_unique<ProtocolV1>(dispatchers, *this, messenger);
+ }
+#ifdef UNIT_TESTS_BUILT
+ if (messenger.interceptor) {
+ interceptor = messenger.interceptor;
+ interceptor->register_conn(*this);
+ }
+#endif
+}
+
+SocketConnection::~SocketConnection() {}
+
+crimson::net::Messenger*
+SocketConnection::get_messenger() const {
+ return &messenger;
+}
+
+bool SocketConnection::is_connected() const
+{
+ assert(seastar::this_shard_id() == shard_id());
+ return protocol->is_connected();
+}
+
+#ifdef UNIT_TESTS_BUILT
+bool SocketConnection::is_closed() const
+{
+ assert(seastar::this_shard_id() == shard_id());
+ return protocol->is_closed();
+}
+
+bool SocketConnection::is_closed_clean() const
+{
+ assert(seastar::this_shard_id() == shard_id());
+ return protocol->is_closed_clean;
+}
+
+#endif
+bool SocketConnection::peer_wins() const
+{
+ return (messenger.get_myaddr() > peer_addr || policy.server);
+}
+
+seastar::future<> SocketConnection::send(MessageRef msg)
+{
+ assert(seastar::this_shard_id() == shard_id());
+ return protocol->send(std::move(msg));
+}
+
+seastar::future<> SocketConnection::keepalive()
+{
+ assert(seastar::this_shard_id() == shard_id());
+ return protocol->keepalive();
+}
+
+void SocketConnection::mark_down()
+{
+ assert(seastar::this_shard_id() == shard_id());
+ protocol->close(false);
+}
+
+bool SocketConnection::update_rx_seq(seq_num_t seq)
+{
+ if (seq <= in_seq) {
+ if (HAVE_FEATURE(features, RECONNECT_SEQ) &&
+ local_conf()->ms_die_on_old_message) {
+ ceph_abort_msg("old msgs despite reconnect_seq feature");
+ }
+ return false;
+ } else if (seq > in_seq + 1) {
+ if (local_conf()->ms_die_on_skipped_message) {
+ ceph_abort_msg("skipped incoming seq");
+ }
+ return false;
+ } else {
+ in_seq = seq;
+ return true;
+ }
+}
+
+void
+SocketConnection::start_connect(const entity_addr_t& _peer_addr,
+ const entity_name_t& _peer_name)
+{
+ protocol->start_connect(_peer_addr, _peer_name);
+}
+
+void
+SocketConnection::start_accept(SocketRef&& sock,
+ const entity_addr_t& _peer_addr)
+{
+ protocol->start_accept(std::move(sock), _peer_addr);
+}
+
+seastar::future<>
+SocketConnection::close_clean(bool dispatch_reset)
+{
+ return protocol->close_clean(dispatch_reset);
+}
+
+seastar::shard_id SocketConnection::shard_id() const {
+ return messenger.shard_id();
+}
+
+void SocketConnection::print(ostream& out) const {
+ messenger.print(out);
+ if (!protocol->socket) {
+ out << " >> " << get_peer_name() << " " << peer_addr;
+ } else if (protocol->socket->get_side() == Socket::side_t::acceptor) {
+ out << " >> " << get_peer_name() << " " << peer_addr
+ << "@" << protocol->socket->get_ephemeral_port();
+ } else { // protocol->socket->get_side() == Socket::side_t::connector
+ out << "@" << protocol->socket->get_ephemeral_port()
+ << " >> " << get_peer_name() << " " << peer_addr;
+ }
+}
diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h
new file mode 100644
index 000000000..9c977c7cf
--- /dev/null
+++ b/src/crimson/net/SocketConnection.h
@@ -0,0 +1,106 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <seastar/core/sharded.hh>
+
+#include "msg/Policy.h"
+#include "crimson/common/throttle.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Socket.h"
+
+namespace crimson::net {
+
+class Protocol;
+class SocketMessenger;
+class SocketConnection;
+using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
+
+class SocketConnection : public Connection {
+ SocketMessenger& messenger;
+ std::unique_ptr<Protocol> protocol;
+
+ ceph::net::Policy<crimson::common::Throttle> policy;
+
+ /// the seq num of the last transmitted message
+ seq_num_t out_seq = 0;
+ /// the seq num of the last received message
+ seq_num_t in_seq = 0;
+ /// update the seq num of last received message
+ /// @returns true if the @c seq is valid, and @c in_seq is updated,
+ /// false otherwise.
+ bool update_rx_seq(seq_num_t seq);
+
+ // messages to be resent after connection gets reset
+ std::deque<MessageRef> out_q;
+ std::deque<MessageRef> pending_q;
+ // messages sent, but not yet acked by peer
+ std::deque<MessageRef> sent;
+
+ seastar::shard_id shard_id() const;
+
+ public:
+ SocketConnection(SocketMessenger& messenger,
+ ChainedDispatchers& dispatchers,
+ bool is_msgr2);
+ ~SocketConnection() override;
+
+ Messenger* get_messenger() const override;
+
+ bool is_connected() const override;
+
+#ifdef UNIT_TESTS_BUILT
+ bool is_closed_clean() const override;
+
+ bool is_closed() const override;
+
+ bool peer_wins() const override;
+#else
+ bool peer_wins() const;
+#endif
+
+ seastar::future<> send(MessageRef msg) override;
+
+ seastar::future<> keepalive() override;
+
+ void mark_down() override;
+
+ void print(ostream& out) const override;
+
+ /// start a handshake from the client's perspective,
+ /// only call when SocketConnection first construct
+ void start_connect(const entity_addr_t& peer_addr,
+ const entity_name_t& peer_name);
+ /// start a handshake from the server's perspective,
+ /// only call when SocketConnection first construct
+ void start_accept(SocketRef&& socket,
+ const entity_addr_t& peer_addr);
+
+ seastar::future<> close_clean(bool dispatch_reset);
+
+ bool is_server_side() const {
+ return policy.server;
+ }
+
+ bool is_lossy() const {
+ return policy.lossy;
+ }
+
+ friend class Protocol;
+ friend class ProtocolV1;
+ friend class ProtocolV2;
+};
+
+} // namespace crimson::net
diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc
new file mode 100644
index 000000000..db9421e79
--- /dev/null
+++ b/src/crimson/net/SocketMessenger.cc
@@ -0,0 +1,351 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "SocketMessenger.h"
+
+#include <tuple>
+#include <boost/functional/hash.hpp>
+
+#include "auth/Auth.h"
+#include "Errors.h"
+#include "Socket.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+ }
+}
+
+namespace crimson::net {
+
+SocketMessenger::SocketMessenger(const entity_name_t& myname,
+ const std::string& logic_name,
+ uint32_t nonce)
+ : Messenger{myname},
+ master_sid{seastar::this_shard_id()},
+ logic_name{logic_name},
+ nonce{nonce}
+{}
+
+seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
+{
+ assert(seastar::this_shard_id() == master_sid);
+ auto my_addrs = addrs;
+ for (auto& addr : my_addrs.v) {
+ addr.nonce = nonce;
+ }
+ return Messenger::set_myaddrs(my_addrs);
+}
+
+SocketMessenger::bind_ertr::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs)
+{
+ assert(seastar::this_shard_id() == master_sid);
+ ceph_assert(addrs.front().get_family() == AF_INET);
+ return set_myaddrs(addrs).then([this] {
+ if (!listener) {
+ return FixedCPUServerSocket::create().then([this] (auto _listener) {
+ listener = _listener;
+ });
+ } else {
+ return seastar::now();
+ }
+ }).then([this] () -> bind_ertr::future<> {
+ const entity_addr_t listen_addr = get_myaddr();
+ logger().debug("{} do_bind: try listen {}...", *this, listen_addr);
+ if (!listener) {
+ logger().warn("{} do_bind: listener doesn't exist", *this);
+ return bind_ertr::now();
+ }
+ return listener->listen(listen_addr);
+ });
+}
+
+SocketMessenger::bind_ertr::future<>
+SocketMessenger::bind(const entity_addrvec_t& addrs)
+{
+ return do_bind(addrs).safe_then([this] {
+ logger().info("{} bind: done", *this);
+ });
+}
+
+SocketMessenger::bind_ertr::future<>
+SocketMessenger::try_bind(const entity_addrvec_t& addrs,
+ uint32_t min_port, uint32_t max_port)
+{
+ auto addr = addrs.front();
+ if (addr.get_port() != 0) {
+ return do_bind(addrs).safe_then([this] {
+ logger().info("{} try_bind: done", *this);
+ });
+ }
+ ceph_assert(min_port <= max_port);
+ return seastar::do_with(uint32_t(min_port),
+ [this, max_port, addr] (auto& port) {
+ return seastar::repeat_until_value([this, max_port, addr, &port] {
+ auto to_bind = addr;
+ to_bind.set_port(port);
+ return do_bind(entity_addrvec_t{to_bind}
+ ).safe_then([this] () -> seastar::future<std::optional<bool>> {
+ logger().info("{} try_bind: done", *this);
+ return seastar::make_ready_future<std::optional<bool>>(
+ std::make_optional<bool>(true));
+ }, bind_ertr::all_same_way([this, max_port, &port]
+ (const std::error_code& e) mutable
+ -> seastar::future<std::optional<bool>> {
+ assert(e == std::errc::address_in_use);
+ logger().trace("{} try_bind: {} already used", *this, port);
+ if (port == max_port) {
+ return seastar::make_ready_future<std::optional<bool>>(
+ std::make_optional<bool>(false));
+ }
+ ++port;
+ return seastar::make_ready_future<std::optional<bool>>();
+ }));
+ }).then([] (bool success) -> bind_ertr::future<> {
+ if (success) {
+ return bind_ertr::now();
+ } else {
+ return crimson::ct_error::address_in_use::make();
+ }
+ });
+ });
+}
+
+seastar::future<> SocketMessenger::start(
+ const dispatchers_t& _dispatchers) {
+ assert(seastar::this_shard_id() == master_sid);
+
+ dispatchers.assign(_dispatchers);
+ if (listener) {
+ // make sure we have already bound to a valid address
+ ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2());
+ ceph_assert(get_myaddr().get_port() > 0);
+
+ return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) {
+ assert(seastar::this_shard_id() == master_sid);
+ SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
+ *this, dispatchers, get_myaddr().is_msgr2());
+ conn->start_accept(std::move(socket), peer_addr);
+ return seastar::now();
+ });
+ }
+ return seastar::now();
+}
+
+crimson::net::ConnectionRef
+SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name)
+{
+ assert(seastar::this_shard_id() == master_sid);
+
+ // make sure we connect to a valid peer_addr
+ ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2());
+ ceph_assert(peer_addr.get_port() > 0);
+
+ if (auto found = lookup_conn(peer_addr); found) {
+ logger().debug("{} connect to existing", *found);
+ return found->shared_from_this();
+ }
+ SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
+ *this, dispatchers, peer_addr.is_msgr2());
+ conn->start_connect(peer_addr, peer_name);
+ return conn->shared_from_this();
+}
+
+seastar::future<> SocketMessenger::shutdown()
+{
+ assert(seastar::this_shard_id() == master_sid);
+ return seastar::futurize_invoke([this] {
+ assert(dispatchers.empty());
+ if (listener) {
+ auto d_listener = listener;
+ listener = nullptr;
+ return d_listener->destroy();
+ } else {
+ return seastar::now();
+ }
+ // close all connections
+ }).then([this] {
+ return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
+ return conn->close_clean(false);
+ });
+ }).then([this] {
+ ceph_assert(accepting_conns.empty());
+ return seastar::parallel_for_each(connections, [] (auto conn) {
+ return conn.second->close_clean(false);
+ });
+ }).then([this] {
+ return seastar::parallel_for_each(closing_conns, [] (auto conn) {
+ return conn->close_clean(false);
+ });
+ }).then([this] {
+ ceph_assert(connections.empty());
+ shutdown_promise.set_value();
+ });
+}
+
+seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me, const SocketConnection& conn)
+{
+ assert(seastar::this_shard_id() == master_sid);
+ if (!need_addr) {
+ if ((!get_myaddr().is_any() &&
+ get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
+ get_myaddr().get_family() != peer_addr_for_me.get_family() ||
+ !get_myaddr().is_same_host(peer_addr_for_me)) {
+ logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}",
+ conn, peer_addr_for_me, get_myaddr());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ return seastar::now();
+ }
+
+ if (get_myaddr().get_type() == entity_addr_t::TYPE_NONE) {
+ // Not bound
+ entity_addr_t addr = peer_addr_for_me;
+ addr.set_type(entity_addr_t::TYPE_ANY);
+ addr.set_port(0);
+ need_addr = false;
+ return set_myaddrs(entity_addrvec_t{addr}
+ ).then([this, &conn, peer_addr_for_me] {
+ logger().info("{} learned myaddr={} (unbound) from {}",
+ conn, get_myaddr(), peer_addr_for_me);
+ });
+ } else {
+ // Already bound
+ if (!get_myaddr().is_any() &&
+ get_myaddr().get_type() != peer_addr_for_me.get_type()) {
+ logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}",
+ conn, peer_addr_for_me, get_myaddr());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (get_myaddr().get_family() != peer_addr_for_me.get_family()) {
+ logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}",
+ conn, peer_addr_for_me, get_myaddr());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ if (get_myaddr().is_blank_ip()) {
+ entity_addr_t addr = peer_addr_for_me;
+ addr.set_type(get_myaddr().get_type());
+ addr.set_port(get_myaddr().get_port());
+ need_addr = false;
+ return set_myaddrs(entity_addrvec_t{addr}
+ ).then([this, &conn, peer_addr_for_me] {
+ logger().info("{} learned myaddr={} (blank IP) from {}",
+ conn, get_myaddr(), peer_addr_for_me);
+ });
+ } else if (!get_myaddr().is_same_host(peer_addr_for_me)) {
+ logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}",
+ conn, peer_addr_for_me, get_myaddr());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ } else {
+ need_addr = false;
+ return seastar::now();
+ }
+ }
+}
+
+SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const
+{
+ return policy_set.get(peer_type);
+}
+
+SocketPolicy SocketMessenger::get_default_policy() const
+{
+ return policy_set.get_default();
+}
+
+void SocketMessenger::set_default_policy(const SocketPolicy& p)
+{
+ policy_set.set_default(p);
+}
+
+void SocketMessenger::set_policy(entity_type_t peer_type,
+ const SocketPolicy& p)
+{
+ policy_set.set(peer_type, p);
+}
+
+void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
+ Throttle* throttle)
+{
+ // only byte throttler is used in OSD
+ policy_set.set_throttlers(peer_type, throttle, nullptr);
+}
+
+crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
+{
+ if (auto found = connections.find(addr);
+ found != connections.end()) {
+ return found->second;
+ } else {
+ return nullptr;
+ }
+}
+
+void SocketMessenger::accept_conn(SocketConnectionRef conn)
+{
+ accepting_conns.insert(conn);
+}
+
+void SocketMessenger::unaccept_conn(SocketConnectionRef conn)
+{
+ accepting_conns.erase(conn);
+}
+
+void SocketMessenger::register_conn(SocketConnectionRef conn)
+{
+ auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
+ std::ignore = i;
+ ceph_assert(added);
+}
+
+void SocketMessenger::unregister_conn(SocketConnectionRef conn)
+{
+ ceph_assert(conn);
+ auto found = connections.find(conn->get_peer_addr());
+ ceph_assert(found != connections.end());
+ ceph_assert(found->second == conn);
+ connections.erase(found);
+}
+
+void SocketMessenger::closing_conn(SocketConnectionRef conn)
+{
+ closing_conns.push_back(conn);
+}
+
+void SocketMessenger::closed_conn(SocketConnectionRef conn)
+{
+ for (auto it = closing_conns.begin();
+ it != closing_conns.end();) {
+ if (*it == conn) {
+ it = closing_conns.erase(it);
+ } else {
+ it++;
+ }
+ }
+}
+
+seastar::future<uint32_t>
+SocketMessenger::get_global_seq(uint32_t old)
+{
+ if (old > global_seq) {
+ global_seq = old;
+ }
+ return seastar::make_ready_future<uint32_t>(++global_seq);
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h
new file mode 100644
index 000000000..44c1d3c21
--- /dev/null
+++ b/src/crimson/net/SocketMessenger.h
@@ -0,0 +1,122 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <map>
+#include <set>
+#include <vector>
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sharded.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "crimson/net/chained_dispatchers.h"
+#include "Messenger.h"
+#include "SocketConnection.h"
+
+namespace crimson::net {
+
+class FixedCPUServerSocket;
+
+class SocketMessenger final : public Messenger {
+ const seastar::shard_id master_sid;
+ seastar::promise<> shutdown_promise;
+
+ FixedCPUServerSocket* listener = nullptr;
+ ChainedDispatchers dispatchers;
+ std::map<entity_addr_t, SocketConnectionRef> connections;
+ std::set<SocketConnectionRef> accepting_conns;
+ std::vector<SocketConnectionRef> closing_conns;
+ ceph::net::PolicySet<Throttle> policy_set;
+ // Distinguish messengers with meaningful names for debugging
+ const std::string logic_name;
+ const uint32_t nonce;
+ // specifying we haven't learned our addr; set false when we find it.
+ bool need_addr = true;
+ uint32_t global_seq = 0;
+ bool started = false;
+
+ bind_ertr::future<> do_bind(const entity_addrvec_t& addr);
+
+ public:
+ SocketMessenger(const entity_name_t& myname,
+ const std::string& logic_name,
+ uint32_t nonce);
+ ~SocketMessenger() override { ceph_assert(!listener); }
+
+ seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override;
+
+ // Messenger interfaces are assumed to be called from its own shard, but its
+ // behavior should be symmetric when called from any shard.
+ bind_ertr::future<> bind(const entity_addrvec_t& addr) override;
+
+ bind_ertr::future<> try_bind(const entity_addrvec_t& addr,
+ uint32_t min_port, uint32_t max_port) override;
+
+ seastar::future<> start(const dispatchers_t& dispatchers) override;
+
+ ConnectionRef connect(const entity_addr_t& peer_addr,
+ const entity_name_t& peer_name) override;
+ // can only wait once
+ seastar::future<> wait() override {
+ assert(seastar::this_shard_id() == master_sid);
+ return shutdown_promise.get_future();
+ }
+
+ void stop() override {
+ dispatchers.clear();
+ }
+
+ bool is_started() const override {
+ return !dispatchers.empty();
+ }
+
+ seastar::future<> shutdown() override;
+
+ void print(ostream& out) const override {
+ out << get_myname()
+ << "(" << logic_name
+ << ") " << get_myaddr();
+ }
+
+ SocketPolicy get_policy(entity_type_t peer_type) const override;
+
+ SocketPolicy get_default_policy() const override;
+
+ void set_default_policy(const SocketPolicy& p) override;
+
+ void set_policy(entity_type_t peer_type, const SocketPolicy& p) override;
+
+ void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;
+
+ public:
+ seastar::future<uint32_t> get_global_seq(uint32_t old=0);
+ seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me,
+ const SocketConnection& conn);
+
+ SocketConnectionRef lookup_conn(const entity_addr_t& addr);
+ void accept_conn(SocketConnectionRef);
+ void unaccept_conn(SocketConnectionRef);
+ void register_conn(SocketConnectionRef);
+ void unregister_conn(SocketConnectionRef);
+ void closing_conn(SocketConnectionRef);
+ void closed_conn(SocketConnectionRef);
+ seastar::shard_id shard_id() const {
+ assert(seastar::this_shard_id() == master_sid);
+ return master_sid;
+ }
+};
+
+} // namespace crimson::net
diff --git a/src/crimson/net/chained_dispatchers.cc b/src/crimson/net/chained_dispatchers.cc
new file mode 100644
index 000000000..b13d40c8f
--- /dev/null
+++ b/src/crimson/net/chained_dispatchers.cc
@@ -0,0 +1,93 @@
+#include "crimson/common/log.h"
+#include "crimson/net/chained_dispatchers.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Dispatcher.h"
+#include "msg/Message.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+ }
+}
+
+namespace crimson::net {
+
+seastar::future<>
+ChainedDispatchers::ms_dispatch(crimson::net::ConnectionRef conn,
+ MessageRef m) {
+ try {
+ for (auto& dispatcher : dispatchers) {
+ auto dispatched = dispatcher->ms_dispatch(conn, m);
+ if (dispatched.has_value()) {
+ return std::move(*dispatched
+ ).handle_exception([conn] (std::exception_ptr eptr) {
+ logger().error("{} got unexpected exception in ms_dispatch() throttling {}",
+ *conn, eptr);
+ ceph_abort();
+ });
+ }
+ }
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_dispatch() {}",
+ *conn, std::current_exception());
+ ceph_abort();
+ }
+ if (!dispatchers.empty()) {
+ logger().error("ms_dispatch unhandled message {}", *m);
+ }
+ return seastar::now();
+}
+
+void
+ChainedDispatchers::ms_handle_accept(crimson::net::ConnectionRef conn) {
+ try {
+ for (auto& dispatcher : dispatchers) {
+ dispatcher->ms_handle_accept(conn);
+ }
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_handle_accept() {}",
+ *conn, std::current_exception());
+ ceph_abort();
+ }
+}
+
+void
+ChainedDispatchers::ms_handle_connect(crimson::net::ConnectionRef conn) {
+ try {
+ for(auto& dispatcher : dispatchers) {
+ dispatcher->ms_handle_connect(conn);
+ }
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_handle_connect() {}",
+ *conn, std::current_exception());
+ ceph_abort();
+ }
+}
+
+void
+ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
+ try {
+ for (auto& dispatcher : dispatchers) {
+ dispatcher->ms_handle_reset(conn, is_replace);
+ }
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_handle_reset() {}",
+ *conn, std::current_exception());
+ ceph_abort();
+ }
+}
+
+void
+ChainedDispatchers::ms_handle_remote_reset(crimson::net::ConnectionRef conn) {
+ try {
+ for (auto& dispatcher : dispatchers) {
+ dispatcher->ms_handle_remote_reset(conn);
+ }
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_handle_remote_reset() {}",
+ *conn, std::current_exception());
+ ceph_abort();
+ }
+}
+
+}
diff --git a/src/crimson/net/chained_dispatchers.h b/src/crimson/net/chained_dispatchers.h
new file mode 100644
index 000000000..712b0894b
--- /dev/null
+++ b/src/crimson/net/chained_dispatchers.h
@@ -0,0 +1,36 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "Fwd.h"
+#include "crimson/common/log.h"
+
+namespace crimson::net {
+
+class Dispatcher;
+
+class ChainedDispatchers {
+public:
+ void assign(const dispatchers_t& _dispatchers) {
+ assert(empty());
+ assert(!_dispatchers.empty());
+ dispatchers = _dispatchers;
+ }
+ void clear() {
+ dispatchers.clear();
+ }
+ bool empty() const {
+ return dispatchers.empty();
+ }
+ seastar::future<> ms_dispatch(crimson::net::ConnectionRef, MessageRef);
+ void ms_handle_accept(crimson::net::ConnectionRef conn);
+ void ms_handle_connect(crimson::net::ConnectionRef conn);
+ void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace);
+ void ms_handle_remote_reset(crimson::net::ConnectionRef conn);
+
+ private:
+ dispatchers_t dispatchers;
+};
+
+}