Diffstat
-rw-r--r-- | src/crimson/net/Connection.h          |  175
-rw-r--r-- | src/crimson/net/Dispatcher.h          |   46
-rw-r--r-- | src/crimson/net/Errors.cc             |   51
-rw-r--r-- | src/crimson/net/Errors.h              |   53
-rw-r--r-- | src/crimson/net/Fwd.h                 |   50
-rw-r--r-- | src/crimson/net/Interceptor.h         |  165
-rw-r--r-- | src/crimson/net/Messenger.cc          |   17
-rw-r--r-- | src/crimson/net/Messenger.h           |  154
-rw-r--r-- | src/crimson/net/Protocol.cc           |  323
-rw-r--r-- | src/crimson/net/Protocol.h            |  173
-rw-r--r-- | src/crimson/net/ProtocolV1.cc         | 1014
-rw-r--r-- | src/crimson/net/ProtocolV1.h          |  137
-rw-r--r-- | src/crimson/net/ProtocolV2.cc         | 2139
-rw-r--r-- | src/crimson/net/ProtocolV2.h          |  225
-rw-r--r-- | src/crimson/net/Socket.cc             |  276
-rw-r--r-- | src/crimson/net/Socket.h              |  268
-rw-r--r-- | src/crimson/net/SocketConnection.cc   |  150
-rw-r--r-- | src/crimson/net/SocketConnection.h    |  106
-rw-r--r-- | src/crimson/net/SocketMessenger.cc    |  351
-rw-r--r-- | src/crimson/net/SocketMessenger.h     |  122
-rw-r--r-- | src/crimson/net/chained_dispatchers.cc|   93
-rw-r--r-- | src/crimson/net/chained_dispatchers.h |   36
22 files changed, 6124 insertions, 0 deletions
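
Dispatcher.h and Messenger.h in the diff below define the surface that crimson services implement to receive messages: a Dispatcher subclass overrides ms_dispatch() (returning an engaged future claims the message and throttles the connection) plus the connection event hooks, and the dispatchers are handed to Messenger::start() as a chain. The following is a minimal sketch of that wiring, using only declarations added by this change; the EchoDispatcher name, the OSD entity name, the local name and the nonce are illustrative assumptions, not part of the patch.

#include <optional>
#include <seastar/core/future.hh>
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Messenger.h"

// Hypothetical dispatcher: claims every message and ignores reset events.
class EchoDispatcher final : public crimson::net::Dispatcher {
 public:
  std::optional<seastar::future<>> ms_dispatch(
      crimson::net::ConnectionRef, MessageRef) override {
    // An engaged return value claims the message, so dispatchers later in
    // the chain are skipped; the returned future is also used to throttle
    // further reads on this connection until it resolves.
    return seastar::now();
  }
  void ms_handle_reset(crimson::net::ConnectionRef, bool is_replace) override {
    // the peer closed the session unexpectedly; nothing to clean up here
  }
};

// Create a messenger and start dispatching to a one-element chain.
seastar::future<> start_echo_messenger(EchoDispatcher& disp) {
  auto msgr = crimson::net::Messenger::create(
      entity_name_t::OSD(0), "echo-msgr" /* lname */, 1 /* nonce */);
  return msgr->start({&disp}).then([msgr] { /* keep msgr alive */ });
}
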
diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h new file mode 100644 index 000000000..6af12692e --- /dev/null +++ b/src/crimson/net/Connection.h @@ -0,0 +1,175 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <queue> +#include <seastar/core/future.hh> +#include <seastar/core/shared_ptr.hh> + +#include "Fwd.h" + +namespace crimson::net { + +#ifdef UNIT_TESTS_BUILT +class Interceptor; +#endif + +using seq_num_t = uint64_t; + +class Connection : public seastar::enable_shared_from_this<Connection> { + entity_name_t peer_name = {0, entity_name_t::NEW}; + + protected: + entity_addr_t peer_addr; + + // which of the peer_addrs we're connecting to (as client) + // or should reconnect to (as peer) + entity_addr_t target_addr; + + using clock_t = seastar::lowres_system_clock; + clock_t::time_point last_keepalive; + clock_t::time_point last_keepalive_ack; + + void set_peer_type(entity_type_t peer_type) { + // it is not allowed to assign an unknown value when the current + // value is known + assert(!(peer_type == 0 && + peer_name.type() != 0)); + // it is not allowed to assign a different known value when the + // current value is also known. + assert(!(peer_type != 0 && + peer_name.type() != 0 && + peer_type != peer_name.type())); + peer_name._type = peer_type; + } + void set_peer_id(int64_t peer_id) { + // it is not allowed to assign an unknown value when the current + // value is known + assert(!(peer_id == entity_name_t::NEW && + peer_name.num() != entity_name_t::NEW)); + // it is not allowed to assign a different known value when the + // current value is also known. 
+ assert(!(peer_id != entity_name_t::NEW && + peer_name.num() != entity_name_t::NEW && + peer_id != peer_name.num())); + peer_name._num = peer_id; + } + void set_peer_name(entity_name_t name) { + set_peer_type(name.type()); + set_peer_id(name.num()); + } + + public: + uint64_t peer_global_id = 0; + + protected: + uint64_t features = 0; + + public: + void set_features(uint64_t new_features) { + features = new_features; + } + auto get_features() const { + return features; + } + bool has_feature(uint64_t f) const { + return features & f; + } + + public: + Connection() {} + virtual ~Connection() {} + +#ifdef UNIT_TESTS_BUILT + Interceptor *interceptor = nullptr; +#endif + + virtual Messenger* get_messenger() const = 0; + const entity_addr_t& get_peer_addr() const { return peer_addr; } + const entity_addrvec_t get_peer_addrs() const { + return entity_addrvec_t(peer_addr); + } + const auto& get_peer_socket_addr() const { + return target_addr; + } + const entity_name_t& get_peer_name() const { return peer_name; } + entity_type_t get_peer_type() const { return peer_name.type(); } + int64_t get_peer_id() const { return peer_name.num(); } + + bool peer_is_mon() const { return peer_name.is_mon(); } + bool peer_is_mgr() const { return peer_name.is_mgr(); } + bool peer_is_mds() const { return peer_name.is_mds(); } + bool peer_is_osd() const { return peer_name.is_osd(); } + bool peer_is_client() const { return peer_name.is_client(); } + + /// true if the handshake has completed and no errors have been encountered + virtual bool is_connected() const = 0; + +#ifdef UNIT_TESTS_BUILT + virtual bool is_closed() const = 0; + + virtual bool is_closed_clean() const = 0; + + virtual bool peer_wins() const = 0; +#endif + + /// send a message over a connection that has completed its handshake + virtual seastar::future<> send(MessageRef msg) = 0; + + /// send a keepalive message over a connection that has completed its + /// handshake + virtual seastar::future<> keepalive() = 0; + + // close the connection and cancel any any pending futures from read/send, + // without dispatching any reset event + virtual void mark_down() = 0; + + virtual void print(ostream& out) const = 0; + + void set_last_keepalive(clock_t::time_point when) { + last_keepalive = when; + } + void set_last_keepalive_ack(clock_t::time_point when) { + last_keepalive_ack = when; + } + auto get_last_keepalive() const { return last_keepalive; } + auto get_last_keepalive_ack() const { return last_keepalive_ack; } + + struct user_private_t { + virtual ~user_private_t() = default; + }; +private: + unique_ptr<user_private_t> user_private; +public: + bool has_user_private() const { + return user_private != nullptr; + } + void set_user_private(unique_ptr<user_private_t> new_user_private) { + user_private = std::move(new_user_private); + } + user_private_t &get_user_private() { + ceph_assert(user_private); + return *user_private; + } +}; + +inline ostream& operator<<(ostream& out, const Connection& conn) { + out << "["; + conn.print(out); + out << "]"; + return out; +} + +} // namespace crimson::net diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h new file mode 100644 index 000000000..cc6fd4574 --- /dev/null +++ b/src/crimson/net/Dispatcher.h @@ -0,0 +1,46 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms 
of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include "Fwd.h" + +class AuthAuthorizer; + +namespace crimson::net { + +class Dispatcher { + public: + virtual ~Dispatcher() {} + + // Dispatchers are put into a chain as described by chain-of-responsibility + // pattern. If any of the dispatchers claims this message, it returns a valid + // future to prevent other dispatchers from processing it, and this is also + // used to throttle the connection if it's too busy. + virtual std::optional<seastar::future<>> ms_dispatch(ConnectionRef, MessageRef) = 0; + + virtual void ms_handle_accept(ConnectionRef conn) {} + + virtual void ms_handle_connect(ConnectionRef conn) {} + + // a reset event is dispatched when the connection is closed unexpectedly. + // is_replace=true means the reset connection is going to be replaced by + // another accepting connection with the same peer_addr, which currently only + // happens under lossy policy when both sides wish to connect to each other. + virtual void ms_handle_reset(ConnectionRef conn, bool is_replace) {} + + virtual void ms_handle_remote_reset(ConnectionRef conn) {} +}; + +} // namespace crimson::net diff --git a/src/crimson/net/Errors.cc b/src/crimson/net/Errors.cc new file mode 100644 index 000000000..d07c090db --- /dev/null +++ b/src/crimson/net/Errors.cc @@ -0,0 +1,51 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "Errors.h" + +namespace crimson::net { + +const std::error_category& net_category() +{ + struct category : public std::error_category { + const char* name() const noexcept override { + return "crimson::net"; + } + + std::string message(int ev) const override { + switch (static_cast<error>(ev)) { + case error::success: + return "success"; + case error::bad_connect_banner: + return "bad connect banner"; + case error::bad_peer_address: + return "bad peer address"; + case error::negotiation_failure: + return "negotiation failure"; + case error::read_eof: + return "read eof"; + case error::corrupted_message: + return "corrupted message"; + case error::protocol_aborted: + return "protocol aborted"; + default: + return "unknown"; + } + } + }; + static category instance; + return instance; +} + +} // namespace crimson::net diff --git a/src/crimson/net/Errors.h b/src/crimson/net/Errors.h new file mode 100644 index 000000000..3a17a103a --- /dev/null +++ b/src/crimson/net/Errors.h @@ -0,0 +1,53 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#pragma once + +#include <system_error> + +namespace crimson::net { + +/// net error codes +enum class error { + success = 0, + bad_connect_banner, + bad_peer_address, + negotiation_failure, + read_eof, + corrupted_message, + protocol_aborted, +}; + +/// net error category +const std::error_category& net_category(); + +inline std::error_code make_error_code(error e) +{ + return {static_cast<int>(e), net_category()}; +} + +inline std::error_condition make_error_condition(error e) +{ + return {static_cast<int>(e), net_category()}; +} + +} // namespace crimson::net + +namespace std { + +/// enables implicit conversion to std::error_condition +template <> +struct is_error_condition_enum<crimson::net::error> : public true_type {}; + +} // namespace std diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h new file mode 100644 index 000000000..e10120571 --- /dev/null +++ b/src/crimson/net/Fwd.h @@ -0,0 +1,50 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <boost/container/small_vector.hpp> +#include <seastar/core/future.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/sharded.hh> + +#include "msg/Connection.h" +#include "msg/MessageRef.h" +#include "msg/msg_types.h" + +#include "crimson/common/errorator.h" + +using auth_proto_t = int; + +class AuthConnectionMeta; +using AuthConnectionMetaRef = seastar::lw_shared_ptr<AuthConnectionMeta>; + +namespace crimson::net { + +using msgr_tag_t = uint8_t; +using stop_t = seastar::stop_iteration; + +class Connection; +using ConnectionRef = seastar::shared_ptr<Connection>; + +class Dispatcher; +class ChainedDispatchers; +constexpr std::size_t NUM_DISPATCHERS = 4u; +using dispatchers_t = boost::container::small_vector<Dispatcher*, NUM_DISPATCHERS>; + +class Messenger; +using MessengerRef = seastar::shared_ptr<Messenger>; + +} // namespace crimson::net diff --git a/src/crimson/net/Interceptor.h b/src/crimson/net/Interceptor.h new file mode 100644 index 000000000..dfa2183ec --- /dev/null +++ b/src/crimson/net/Interceptor.h @@ -0,0 +1,165 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <variant> +#include <seastar/core/sharded.hh> +#include <seastar/core/sleep.hh> + +#include "Fwd.h" +#include "msg/async/frames_v2.h" + +namespace crimson::net { + +enum class custom_bp_t : uint8_t { + BANNER_WRITE = 0, + BANNER_READ, + BANNER_PAYLOAD_READ, + SOCKET_CONNECTING, + SOCKET_ACCEPTED +}; +inline const char* get_bp_name(custom_bp_t bp) { + uint8_t index = static_cast<uint8_t>(bp); + static const char *const bp_names[] = {"BANNER_WRITE", + "BANNER_READ", + "BANNER_PAYLOAD_READ", + "SOCKET_CONNECTING", + "SOCKET_ACCEPTED"}; + assert(index < std::size(bp_names)); + return bp_names[index]; +} + +enum class bp_type_t { + READ = 0, + WRITE +}; + +enum class bp_action_t { + CONTINUE = 0, + FAULT, + BLOCK, + STALL +}; + +inline std::ostream& operator<<(std::ostream& out, const bp_action_t& action) { + static const char *const action_names[] = {"CONTINUE", + "FAULT", + "BLOCK", + "STALL"}; + assert(static_cast<size_t>(action) < 
std::size(action_names)); + return out << action_names[static_cast<size_t>(action)]; +} + +class socket_blocker { + std::optional<seastar::abort_source> p_blocked; + std::optional<seastar::abort_source> p_unblocked; + + public: + seastar::future<> wait_blocked() { + ceph_assert(!p_blocked); + if (p_unblocked) { + return seastar::make_ready_future<>(); + } else { + p_blocked = seastar::abort_source(); + return seastar::sleep_abortable(10s, *p_blocked).then([] { + throw std::runtime_error( + "Timeout (10s) in socket_blocker::wait_blocked()"); + }).handle_exception_type([] (const seastar::sleep_aborted& e) { + // wait done! + }); + } + } + + seastar::future<> block() { + if (p_blocked) { + p_blocked->request_abort(); + p_blocked = std::nullopt; + } + ceph_assert(!p_unblocked); + p_unblocked = seastar::abort_source(); + return seastar::sleep_abortable(10s, *p_unblocked).then([] { + ceph_abort("Timeout (10s) in socket_blocker::block()"); + }).handle_exception_type([] (const seastar::sleep_aborted& e) { + // wait done! + }); + } + + void unblock() { + ceph_assert(!p_blocked); + ceph_assert(p_unblocked); + p_unblocked->request_abort(); + p_unblocked = std::nullopt; + } +}; + +struct tag_bp_t { + ceph::msgr::v2::Tag tag; + bp_type_t type; + bool operator==(const tag_bp_t& x) const { + return tag == x.tag && type == x.type; + } + bool operator!=(const tag_bp_t& x) const { return !operator==(x); } + bool operator<(const tag_bp_t& x) const { + return std::tie(tag, type) < std::tie(x.tag, x.type); + } +}; + +struct Breakpoint { + using var_t = std::variant<custom_bp_t, tag_bp_t>; + var_t bp; + Breakpoint(custom_bp_t bp) : bp(bp) { } + Breakpoint(ceph::msgr::v2::Tag tag, bp_type_t type) + : bp(tag_bp_t{tag, type}) { } + bool operator==(const Breakpoint& x) const { return bp == x.bp; } + bool operator!=(const Breakpoint& x) const { return !operator==(x); } + bool operator==(const custom_bp_t& x) const { return bp == var_t(x); } + bool operator!=(const custom_bp_t& x) const { return !operator==(x); } + bool operator==(const tag_bp_t& x) const { return bp == var_t(x); } + bool operator!=(const tag_bp_t& x) const { return !operator==(x); } + bool operator<(const Breakpoint& x) const { return bp < x.bp; } +}; + +inline std::ostream& operator<<(std::ostream& out, const Breakpoint& bp) { + if (auto custom_bp = std::get_if<custom_bp_t>(&bp.bp)) { + return out << get_bp_name(*custom_bp); + } else { + auto tag_bp = std::get<tag_bp_t>(bp.bp); + static const char *const tag_names[] = {"NONE", + "HELLO", + "AUTH_REQUEST", + "AUTH_BAD_METHOD", + "AUTH_REPLY_MORE", + "AUTH_REQUEST_MORE", + "AUTH_DONE", + "AUTH_SIGNATURE", + "CLIENT_IDENT", + "SERVER_IDENT", + "IDENT_MISSING_FEATURES", + "SESSION_RECONNECT", + "SESSION_RESET", + "SESSION_RETRY", + "SESSION_RETRY_GLOBAL", + "SESSION_RECONNECT_OK", + "WAIT", + "MESSAGE", + "KEEPALIVE2", + "KEEPALIVE2_ACK", + "ACK"}; + assert(static_cast<size_t>(tag_bp.tag) < std::size(tag_names)); + return out << tag_names[static_cast<size_t>(tag_bp.tag)] + << (tag_bp.type == bp_type_t::WRITE ? 
"_WRITE" : "_READ"); + } +} + +struct Interceptor { + socket_blocker blocker; + virtual ~Interceptor() {} + virtual void register_conn(Connection& conn) = 0; + virtual void register_conn_ready(Connection& conn) = 0; + virtual void register_conn_closed(Connection& conn) = 0; + virtual void register_conn_replaced(Connection& conn) = 0; + virtual bp_action_t intercept(Connection& conn, Breakpoint bp) = 0; +}; + +} // namespace crimson::net diff --git a/src/crimson/net/Messenger.cc b/src/crimson/net/Messenger.cc new file mode 100644 index 000000000..aab476f7a --- /dev/null +++ b/src/crimson/net/Messenger.cc @@ -0,0 +1,17 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Messenger.h" +#include "SocketMessenger.h" + +namespace crimson::net { + +MessengerRef +Messenger::create(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce) +{ + return seastar::make_shared<SocketMessenger>(name, lname, nonce); +} + +} // namespace crimson::net diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h new file mode 100644 index 000000000..2b39fbf63 --- /dev/null +++ b/src/crimson/net/Messenger.h @@ -0,0 +1,154 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include "Fwd.h" +#include "crimson/common/throttle.h" +#include "msg/Message.h" +#include "msg/Policy.h" + +class AuthAuthorizer; + +namespace crimson::auth { +class AuthClient; +class AuthServer; +} + +namespace crimson::net { + +#ifdef UNIT_TESTS_BUILT +class Interceptor; +#endif + +using Throttle = crimson::common::Throttle; +using SocketPolicy = ceph::net::Policy<Throttle>; + +class Messenger { + entity_name_t my_name; + entity_addrvec_t my_addrs; + uint32_t crc_flags = 0; + crimson::auth::AuthClient* auth_client = nullptr; + crimson::auth::AuthServer* auth_server = nullptr; + bool require_authorizer = true; + +public: + Messenger(const entity_name_t& name) + : my_name(name) + {} + virtual ~Messenger() {} + +#ifdef UNIT_TESTS_BUILT + Interceptor *interceptor = nullptr; +#endif + + entity_type_t get_mytype() const { return my_name.type(); } + const entity_name_t& get_myname() const { return my_name; } + const entity_addrvec_t& get_myaddrs() const { return my_addrs; } + entity_addr_t get_myaddr() const { return my_addrs.front(); } + virtual seastar::future<> set_myaddrs(const entity_addrvec_t& addrs) { + my_addrs = addrs; + return seastar::now(); + } + + using bind_ertr = crimson::errorator< + crimson::ct_error::address_in_use // The address (range) is already bound + >; + /// bind to the given address + virtual bind_ertr::future<> bind(const entity_addrvec_t& addr) = 0; + + /// try to bind to the first unused port of given address + virtual bind_ertr::future<> try_bind(const entity_addrvec_t& addr, + uint32_t min_port, uint32_t max_port) = 0; + + /// start the messenger + virtual seastar::future<> start(const dispatchers_t&) = 0; + + /// either return an existing connection to the peer, + /// or a new pending connection + virtual ConnectionRef + connect(const entity_addr_t& peer_addr, + const entity_name_t& peer_name) = 0; + + ConnectionRef + connect(const 
entity_addr_t& peer_addr, + const entity_type_t& peer_type) { + return connect(peer_addr, entity_name_t(peer_type, -1)); + } + + // wait for messenger shutdown + virtual seastar::future<> wait() = 0; + + // stop dispatching events and messages + virtual void stop() = 0; + + virtual bool is_started() const = 0; + + // free internal resources before destruction, must be called after stopped, + // and must be called if is bound. + virtual seastar::future<> shutdown() = 0; + + uint32_t get_crc_flags() const { + return crc_flags; + } + void set_crc_data() { + crc_flags |= MSG_CRC_DATA; + } + void set_crc_header() { + crc_flags |= MSG_CRC_HEADER; + } + + crimson::auth::AuthClient* get_auth_client() const { return auth_client; } + void set_auth_client(crimson::auth::AuthClient *ac) { + auth_client = ac; + } + crimson::auth::AuthServer* get_auth_server() const { return auth_server; } + void set_auth_server(crimson::auth::AuthServer *as) { + auth_server = as; + } + + virtual void print(ostream& out) const = 0; + + virtual SocketPolicy get_policy(entity_type_t peer_type) const = 0; + + virtual SocketPolicy get_default_policy() const = 0; + + virtual void set_default_policy(const SocketPolicy& p) = 0; + + virtual void set_policy(entity_type_t peer_type, const SocketPolicy& p) = 0; + + virtual void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) = 0; + + // allow unauthenticated connections. This is needed for compatibility with + // pre-nautilus OSDs, which do not authenticate the heartbeat sessions. + bool get_require_authorizer() const { + return require_authorizer; + } + void set_require_authorizer(bool r) { + require_authorizer = r; + } + static MessengerRef + create(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce); +}; + +inline ostream& operator<<(ostream& out, const Messenger& msgr) { + out << "["; + msgr.print(out); + out << "]"; + return out; +} + +} // namespace crimson::net diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc new file mode 100644 index 000000000..50b5c45a3 --- /dev/null +++ b/src/crimson/net/Protocol.cc @@ -0,0 +1,323 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Protocol.h" + +#include "auth/Auth.h" + +#include "crimson/common/log.h" +#include "crimson/net/Errors.h" +#include "crimson/net/chained_dispatchers.h" +#include "crimson/net/Socket.h" +#include "crimson/net/SocketConnection.h" +#include "msg/Message.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); + } +} + +namespace crimson::net { + +Protocol::Protocol(proto_t type, + ChainedDispatchers& dispatchers, + SocketConnection& conn) + : proto_type(type), + dispatchers(dispatchers), + conn(conn), + auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()} +{} + +Protocol::~Protocol() +{ + ceph_assert(gate.is_closed()); + assert(!exit_open); +} + +void Protocol::close(bool dispatch_reset, + std::optional<std::function<void()>> f_accept_new) +{ + if (closed) { + // already closing + return; + } + + bool is_replace = f_accept_new ? true : false; + logger().info("{} closing: reset {}, replace {}", conn, + dispatch_reset ? "yes" : "no", + is_replace ? 
"yes" : "no"); + + // atomic operations + closed = true; + trigger_close(); + if (f_accept_new) { + (*f_accept_new)(); + } + if (socket) { + socket->shutdown(); + } + set_write_state(write_state_t::drop); + assert(!gate.is_closed()); + auto gate_closed = gate.close(); + + if (dispatch_reset) { + dispatchers.ms_handle_reset( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()), + is_replace); + } + + // asynchronous operations + assert(!close_ready.valid()); + close_ready = std::move(gate_closed).then([this] { + if (socket) { + return socket->close(); + } else { + return seastar::now(); + } + }).then([this] { + logger().debug("{} closed!", conn); + on_closed(); +#ifdef UNIT_TESTS_BUILT + is_closed_clean = true; + if (conn.interceptor) { + conn.interceptor->register_conn_closed(conn); + } +#endif + }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) { + logger().error("{} closing: close_ready got unexpected exception {}", conn, eptr); + ceph_abort(); + }); +} + +seastar::future<> Protocol::send(MessageRef msg) +{ + if (write_state != write_state_t::drop) { + conn.out_q.push_back(std::move(msg)); + write_event(); + } + return seastar::now(); +} + +seastar::future<> Protocol::keepalive() +{ + if (!need_keepalive) { + need_keepalive = true; + write_event(); + } + return seastar::now(); +} + +void Protocol::notify_keepalive_ack(utime_t _keepalive_ack) +{ + logger().trace("{} got keepalive ack {}", conn, _keepalive_ack); + keepalive_ack = _keepalive_ack; + write_event(); +} + +void Protocol::notify_ack() +{ + if (!conn.policy.lossy) { + ++ack_left; + write_event(); + } +} + +void Protocol::requeue_sent() +{ + assert(write_state != write_state_t::open); + if (conn.sent.empty()) { + return; + } + + conn.out_seq -= conn.sent.size(); + logger().debug("{} requeue {} items, revert out_seq to {}", + conn, conn.sent.size(), conn.out_seq); + for (MessageRef& msg : conn.sent) { + msg->clear_payload(); + msg->set_seq(0); + } + conn.out_q.insert(conn.out_q.begin(), + std::make_move_iterator(conn.sent.begin()), + std::make_move_iterator(conn.sent.end())); + conn.sent.clear(); + write_event(); +} + +void Protocol::requeue_up_to(seq_num_t seq) +{ + assert(write_state != write_state_t::open); + if (conn.sent.empty() && conn.out_q.empty()) { + logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}", + conn, conn.out_seq, seq); + conn.out_seq = seq; + return; + } + logger().debug("{} discarding sent items by seq {} (sent_len={}, out_seq={})", + conn, seq, conn.sent.size(), conn.out_seq); + while (!conn.sent.empty()) { + auto cur_seq = conn.sent.front()->get_seq(); + if (cur_seq == 0 || cur_seq > seq) { + break; + } else { + conn.sent.pop_front(); + } + } + requeue_sent(); +} + +void Protocol::reset_write() +{ + assert(write_state != write_state_t::open); + conn.out_seq = 0; + conn.out_q.clear(); + conn.sent.clear(); + need_keepalive = false; + keepalive_ack = std::nullopt; + ack_left = 0; +} + +void Protocol::ack_writes(seq_num_t seq) +{ + if (conn.policy.lossy) { // lossy connections don't keep sent messages + return; + } + while (!conn.sent.empty() && conn.sent.front()->get_seq() <= seq) { + logger().trace("{} got ack seq {} >= {}, pop {}", + conn, seq, conn.sent.front()->get_seq(), conn.sent.front()); + conn.sent.pop_front(); + } +} + +seastar::future<stop_t> Protocol::try_exit_sweep() { + assert(!is_queued()); + return socket->flush().then([this] { + if (!is_queued()) { + // still nothing pending to send after flush, + // the dispatching can ONLY stop 
now + ceph_assert(write_dispatching); + write_dispatching = false; + if (unlikely(exit_open.has_value())) { + exit_open->set_value(); + exit_open = std::nullopt; + logger().info("{} write_event: nothing queued at {}," + " set exit_open", + conn, get_state_name(write_state)); + } + return seastar::make_ready_future<stop_t>(stop_t::yes); + } else { + // something is pending to send during flushing + return seastar::make_ready_future<stop_t>(stop_t::no); + } + }); +} + +seastar::future<> Protocol::do_write_dispatch_sweep() +{ + return seastar::repeat([this] { + switch (write_state) { + case write_state_t::open: { + size_t num_msgs = conn.out_q.size(); + bool still_queued = is_queued(); + if (unlikely(!still_queued)) { + return try_exit_sweep(); + } + conn.pending_q.clear(); + conn.pending_q.swap(conn.out_q); + if (!conn.policy.lossy) { + conn.sent.insert(conn.sent.end(), + conn.pending_q.begin(), + conn.pending_q.end()); + } + auto acked = ack_left; + assert(acked == 0 || conn.in_seq > 0); + // sweep all pending writes with the concrete Protocol + return socket->write(do_sweep_messages( + conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0) + ).then([this, prv_keepalive_ack=keepalive_ack, acked] { + need_keepalive = false; + if (keepalive_ack == prv_keepalive_ack) { + keepalive_ack = std::nullopt; + } + assert(ack_left >= acked); + ack_left -= acked; + if (!is_queued()) { + return try_exit_sweep(); + } else { + // messages were enqueued during socket write + return seastar::make_ready_future<stop_t>(stop_t::no); + } + }); + } + case write_state_t::delay: + // delay dispatching writes until open + if (exit_open) { + exit_open->set_value(); + exit_open = std::nullopt; + logger().info("{} write_event: delay and set exit_open ...", conn); + } else { + logger().info("{} write_event: delay ...", conn); + } + return state_changed.get_shared_future() + .then([] { return stop_t::no; }); + case write_state_t::drop: + ceph_assert(write_dispatching); + write_dispatching = false; + if (exit_open) { + exit_open->set_value(); + exit_open = std::nullopt; + logger().info("{} write_event: dropped and set exit_open", conn); + } else { + logger().info("{} write_event: dropped", conn); + } + return seastar::make_ready_future<stop_t>(stop_t::yes); + default: + ceph_assert(false); + } + }).handle_exception_type([this] (const std::system_error& e) { + if (e.code() != std::errc::broken_pipe && + e.code() != std::errc::connection_reset && + e.code() != error::negotiation_failure) { + logger().error("{} write_event(): unexpected error at {} -- {}", + conn, get_state_name(write_state), e); + ceph_abort(); + } + socket->shutdown(); + if (write_state == write_state_t::open) { + logger().info("{} write_event(): fault at {}, going to delay -- {}", + conn, get_state_name(write_state), e); + write_state = write_state_t::delay; + } else { + logger().info("{} write_event(): fault at {} -- {}", + conn, get_state_name(write_state), e); + } + return do_write_dispatch_sweep(); + }); +} + +void Protocol::write_event() +{ + notify_write(); + if (write_dispatching) { + // already dispatching + return; + } + write_dispatching = true; + switch (write_state) { + case write_state_t::open: + [[fallthrough]]; + case write_state_t::delay: + assert(!gate.is_closed()); + gate.dispatch_in_background("do_write_dispatch_sweep", *this, [this] { + return do_write_dispatch_sweep(); + }); + return; + case write_state_t::drop: + write_dispatching = false; + return; + default: + ceph_assert(false); + } +} + +} // namespace crimson::net 
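
Protocol::do_write_dispatch_sweep() above filters the caught std::system_error against both std::errc values and crimson::net::error values; the latter comparison compiles because Errors.h specializes std::is_error_condition_enum for the enum and provides make_error_condition(). A small sketch of the same pattern follows; the helper name is hypothetical and not part of this change.

#include <system_error>
#include "crimson/net/Errors.h"

// Returns true for the faults the write path above treats as expected:
// the peer went away, or protocol negotiation failed.
bool is_expected_net_fault(const std::system_error& e) {
  return e.code() == std::errc::broken_pipe ||
         e.code() == std::errc::connection_reset ||
         e.code() == crimson::net::error::negotiation_failure;
}

The enum values compare against an std::error_code because operator== converts them to an std::error_condition in the crimson::net category, which is exactly what the handle_exception_type() block in Protocol.cc relies on.
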
diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h new file mode 100644 index 000000000..dc4e4f2af --- /dev/null +++ b/src/crimson/net/Protocol.h @@ -0,0 +1,173 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/gate.hh> +#include <seastar/core/shared_future.hh> + +#include "crimson/common/gated.h" +#include "crimson/common/log.h" +#include "Fwd.h" +#include "SocketConnection.h" + +namespace crimson::net { + +class Protocol { + public: + enum class proto_t { + none, + v1, + v2 + }; + + Protocol(Protocol&&) = delete; + virtual ~Protocol(); + + virtual bool is_connected() const = 0; + +#ifdef UNIT_TESTS_BUILT + bool is_closed_clean = false; + bool is_closed() const { return closed; } +#endif + + // Reentrant closing + void close(bool dispatch_reset, std::optional<std::function<void()>> f_accept_new=std::nullopt); + seastar::future<> close_clean(bool dispatch_reset) { + close(dispatch_reset); + // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset() + // which will otherwise result in deadlock + assert(close_ready.valid()); + return close_ready.get_future(); + } + + virtual void start_connect(const entity_addr_t& peer_addr, + const entity_name_t& peer_name) = 0; + + virtual void start_accept(SocketRef&& socket, + const entity_addr_t& peer_addr) = 0; + + virtual void print(std::ostream&) const = 0; + protected: + Protocol(proto_t type, + ChainedDispatchers& dispatchers, + SocketConnection& conn); + + virtual void trigger_close() = 0; + + virtual ceph::bufferlist do_sweep_messages( + const std::deque<MessageRef>& msgs, + size_t num_msgs, + bool require_keepalive, + std::optional<utime_t> keepalive_ack, + bool require_ack) = 0; + + virtual void notify_write() {}; + + virtual void on_closed() {} + + public: + const proto_t proto_type; + SocketRef socket; + + protected: + ChainedDispatchers& dispatchers; + SocketConnection &conn; + + AuthConnectionMetaRef auth_meta; + + private: + bool closed = false; + // become valid only after closed == true + seastar::shared_future<> close_ready; + +// the write state-machine + public: + seastar::future<> send(MessageRef msg); + seastar::future<> keepalive(); + +// TODO: encapsulate a SessionedSender class + protected: + // write_state is changed with state atomically, indicating the write + // behavior of the according state. 
+ enum class write_state_t : uint8_t { + none, + delay, + open, + drop + }; + + static const char* get_state_name(write_state_t state) { + uint8_t index = static_cast<uint8_t>(state); + static const char *const state_names[] = {"none", + "delay", + "open", + "drop"}; + assert(index < std::size(state_names)); + return state_names[index]; + } + + void set_write_state(const write_state_t& state) { + if (write_state == write_state_t::open && + state != write_state_t::open && + write_dispatching) { + exit_open = seastar::shared_promise<>(); + } + write_state = state; + state_changed.set_value(); + state_changed = seastar::shared_promise<>(); + } + + seastar::future<> wait_write_exit() { + if (exit_open) { + return exit_open->get_shared_future(); + } + return seastar::now(); + } + + void notify_keepalive_ack(utime_t keepalive_ack); + + void notify_ack(); + + void requeue_up_to(seq_num_t seq); + + void requeue_sent(); + + void reset_write(); + + bool is_queued() const { + return (!conn.out_q.empty() || + ack_left > 0 || + need_keepalive || + keepalive_ack.has_value()); + } + + void ack_writes(seq_num_t seq); + crimson::common::Gated gate; + + private: + write_state_t write_state = write_state_t::none; + // wait until current state changed + seastar::shared_promise<> state_changed; + + bool need_keepalive = false; + std::optional<utime_t> keepalive_ack = std::nullopt; + uint64_t ack_left = 0; + bool write_dispatching = false; + // If another continuation is trying to close or replace socket when + // write_dispatching is true and write_state is open, + // it needs to wait for exit_open until writing is stopped or failed. + std::optional<seastar::shared_promise<>> exit_open; + + seastar::future<stop_t> try_exit_sweep(); + seastar::future<> do_write_dispatch_sweep(); + void write_event(); +}; + +inline std::ostream& operator<<(std::ostream& out, const Protocol& proto) { + proto.print(out); + return out; +} + + +} // namespace crimson::net diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc new file mode 100644 index 000000000..3c604240d --- /dev/null +++ b/src/crimson/net/ProtocolV1.cc @@ -0,0 +1,1014 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "ProtocolV1.h" + +#include <seastar/core/shared_future.hh> +#include <seastar/core/sleep.hh> +#include <seastar/net/packet.hh> + +#include "include/msgr.h" +#include "include/random.h" +#include "auth/Auth.h" +#include "auth/AuthSessionHandler.h" + +#include "crimson/auth/AuthClient.h" +#include "crimson/auth/AuthServer.h" +#include "crimson/common/log.h" +#include "chained_dispatchers.h" +#include "Errors.h" +#include "Socket.h" +#include "SocketConnection.h" +#include "SocketMessenger.h" + +WRITE_RAW_ENCODER(ceph_msg_connect); +WRITE_RAW_ENCODER(ceph_msg_connect_reply); + +using crimson::common::local_conf; + +std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c) +{ + return out << "connect{features=" << std::hex << c.features << std::dec + << " host_type=" << c.host_type + << " global_seq=" << c.global_seq + << " connect_seq=" << c.connect_seq + << " protocol_version=" << c.protocol_version + << " authorizer_protocol=" << c.authorizer_protocol + << " authorizer_len=" << c.authorizer_len + << " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}'; +} + +std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r) +{ + return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag) + << " features=" << std::hex 
<< r.features << std::dec + << " global_seq=" << r.global_seq + << " connect_seq=" << r.connect_seq + << " protocol_version=" << r.protocol_version + << " authorizer_len=" << r.authorizer_len + << " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}'; +} + +namespace { + +seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); +} + +template <typename T> +seastar::net::packet make_static_packet(const T& value) { + return { reinterpret_cast<const char*>(&value), sizeof(value) }; +} + +// store the banner in a non-const string for buffer::create_static() +char banner[] = CEPH_BANNER; +constexpr size_t banner_size = sizeof(CEPH_BANNER)-1; + +constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr); +constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr); + +// check that the buffer starts with a valid banner without requiring it to +// be contiguous in memory +void validate_banner(bufferlist::const_iterator& p) +{ + auto b = std::cbegin(banner); + auto end = b + banner_size; + while (b != end) { + const char *buf{nullptr}; + auto remaining = std::distance(b, end); + auto len = p.get_ptr_and_advance(remaining, &buf); + if (!std::equal(buf, buf + len, b)) { + throw std::system_error( + make_error_code(crimson::net::error::bad_connect_banner)); + } + b += len; + } +} + +// return a static bufferptr to the given object +template <typename T> +bufferptr create_static(T& obj) +{ + return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj)); +} + +uint32_t get_proto_version(entity_type_t peer_type, bool connect) +{ + constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD; + // see also OSD.h, unlike other connection of simple/async messenger, + // crimson msgr is only used by osd + constexpr uint32_t CEPH_OSD_PROTOCOL = 10; + if (peer_type == my_type) { + // internal + return CEPH_OSD_PROTOCOL; + } else { + // public + switch (connect ? peer_type : my_type) { + case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; + case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; + case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; + default: return 0; + } + } +} + +void discard_up_to(std::deque<MessageRef>* queue, + crimson::net::seq_num_t seq) +{ + while (!queue->empty() && + queue->front()->get_seq() < seq) { + queue->pop_front(); + } +} + +} // namespace anonymous + +namespace crimson::net { + +ProtocolV1::ProtocolV1(ChainedDispatchers& dispatchers, + SocketConnection& conn, + SocketMessenger& messenger) + : Protocol(proto_t::v1, dispatchers, conn), messenger{messenger} {} + +ProtocolV1::~ProtocolV1() {} + +bool ProtocolV1::is_connected() const +{ + return state == state_t::open; +} + +// connecting state + +void ProtocolV1::reset_session() +{ + conn.out_q = {}; + conn.sent = {}; + conn.in_seq = 0; + h.connect_seq = 0; + if (HAVE_FEATURE(conn.features, MSG_AUTH)) { + // Set out_seq to a random value, so CRC won't be predictable. + // Constant to limit starting sequence number to 2^31. Nothing special + // about it, just a big number. + constexpr uint64_t SEQ_MASK = 0x7fffffff; + conn.out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK); + } else { + // previously, seq #'s always started at 0. 
+ conn.out_seq = 0; + } +} + +seastar::future<stop_t> +ProtocolV1::handle_connect_reply(msgr_tag_t tag) +{ + if (h.auth_payload.length() && !conn.peer_is_mon()) { + if (tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { // more + h.auth_more = messenger.get_auth_client()->handle_auth_reply_more( + conn.shared_from_this(), auth_meta, h.auth_payload); + return seastar::make_ready_future<stop_t>(stop_t::no); + } else { + int ret = messenger.get_auth_client()->handle_auth_done( + conn.shared_from_this(), auth_meta, 0, 0, h.auth_payload); + if (ret < 0) { + // fault + logger().warn("{} AuthClient::handle_auth_done() return {}", conn, ret); + throw std::system_error(make_error_code(error::negotiation_failure)); + } + } + } + + switch (tag) { + case CEPH_MSGR_TAG_FEATURES: + logger().error("{} connect protocol feature mispatch", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_BADPROTOVER: + logger().error("{} connect protocol version mispatch", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_BADAUTHORIZER: + logger().error("{} got bad authorizer", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_RESETSESSION: + reset_session(); + return seastar::make_ready_future<stop_t>(stop_t::no); + case CEPH_MSGR_TAG_RETRY_GLOBAL: + return messenger.get_global_seq(h.reply.global_seq).then([this] (auto gs) { + h.global_seq = gs; + return seastar::make_ready_future<stop_t>(stop_t::no); + }); + case CEPH_MSGR_TAG_RETRY_SESSION: + ceph_assert(h.reply.connect_seq > h.connect_seq); + h.connect_seq = h.reply.connect_seq; + return seastar::make_ready_future<stop_t>(stop_t::no); + case CEPH_MSGR_TAG_WAIT: + // TODO: state wait + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_SEQ: + case CEPH_MSGR_TAG_READY: + if (auto missing = (conn.policy.features_required & ~(uint64_t)h.reply.features); + missing) { + logger().error("{} missing required features", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + } + return seastar::futurize_invoke([this, tag] { + if (tag == CEPH_MSGR_TAG_SEQ) { + return socket->read_exactly(sizeof(seq_num_t)) + .then([this] (auto buf) { + auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get()); + discard_up_to(&conn.out_q, *acked_seq); + return socket->write_flush(make_static_packet(conn.in_seq)); + }); + } + // tag CEPH_MSGR_TAG_READY + return seastar::now(); + }).then([this] { + // hooray! 
+ h.peer_global_seq = h.reply.global_seq; + conn.policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY; + h.connect_seq++; + h.backoff = 0ms; + conn.set_features(h.reply.features & h.connect.features); + if (auth_meta->authorizer) { + session_security.reset( + get_auth_session_handler(nullptr, + auth_meta->authorizer->protocol, + auth_meta->session_key, + conn.features)); + } else { + session_security.reset(); + } + return seastar::make_ready_future<stop_t>(stop_t::yes); + }); + break; + default: + // unknown tag + logger().error("{} got unknown tag", __func__, int(tag)); + throw std::system_error(make_error_code(error::negotiation_failure)); + } +} + +ceph::bufferlist ProtocolV1::get_auth_payload() +{ + // only non-mons connectings to mons use MAuth messages + if (conn.peer_is_mon() && + messenger.get_mytype() != CEPH_ENTITY_TYPE_MON) { + return {}; + } else { + if (h.auth_more.length()) { + logger().info("using augmented (challenge) auth payload"); + return std::move(h.auth_more); + } else { + auto [auth_method, preferred_modes, auth_bl] = + messenger.get_auth_client()->get_auth_request( + conn.shared_from_this(), auth_meta); + auth_meta->auth_method = auth_method; + return auth_bl; + } + } +} + +seastar::future<stop_t> +ProtocolV1::repeat_connect() +{ + // encode ceph_msg_connect + memset(&h.connect, 0, sizeof(h.connect)); + h.connect.features = conn.policy.features_supported; + h.connect.host_type = messenger.get_myname().type(); + h.connect.global_seq = h.global_seq; + h.connect.connect_seq = h.connect_seq; + h.connect.protocol_version = get_proto_version(conn.get_peer_type(), true); + // this is fyi, actually, server decides! + h.connect.flags = conn.policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0; + + ceph_assert(messenger.get_auth_client()); + + bufferlist bl; + bufferlist auth_bl = get_auth_payload(); + if (auth_bl.length()) { + h.connect.authorizer_protocol = auth_meta->auth_method; + h.connect.authorizer_len = auth_bl.length(); + bl.append(create_static(h.connect)); + bl.claim_append(auth_bl); + } else { + h.connect.authorizer_protocol = 0; + h.connect.authorizer_len = 0; + bl.append(create_static(h.connect)); + }; + return socket->write_flush(std::move(bl)) + .then([this] { + // read the reply + return socket->read(sizeof(h.reply)); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + ::decode(h.reply, p); + ceph_assert(p.end()); + return socket->read(h.reply.authorizer_len); + }).then([this] (bufferlist bl) { + h.auth_payload = std::move(bl); + return handle_connect_reply(h.reply.tag); + }); +} + +void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, + const entity_name_t& _peer_name) +{ + ceph_assert(state == state_t::none); + logger().trace("{} trigger connecting, was {}", conn, static_cast<int>(state)); + state = state_t::connecting; + set_write_state(write_state_t::delay); + + ceph_assert(!socket); + ceph_assert(!gate.is_closed()); + conn.peer_addr = _peer_addr; + conn.target_addr = _peer_addr; + conn.set_peer_name(_peer_name); + conn.policy = messenger.get_policy(_peer_name.type()); + messenger.register_conn( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + gate.dispatch_in_background("start_connect", *this, [this] { + return Socket::connect(conn.peer_addr) + .then([this](SocketRef sock) { + socket = std::move(sock); + if (state != state_t::connecting) { + assert(state == state_t::closing); + return socket->close().then([] { + throw std::system_error(make_error_code(error::protocol_aborted)); + }); + } + return seastar::now(); + 
}).then([this] { + return messenger.get_global_seq(); + }).then([this] (auto gs) { + h.global_seq = gs; + // read server's handshake header + return socket->read(server_header_size); + }).then([this] (bufferlist headerbl) { + auto p = headerbl.cbegin(); + validate_banner(p); + entity_addr_t saddr, caddr; + ::decode(saddr, p); + ::decode(caddr, p); + ceph_assert(p.end()); + if (saddr != conn.peer_addr) { + logger().error("{} my peer_addr {} doesn't match what peer advertized {}", + conn, conn.peer_addr, saddr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + if (state != state_t::connecting) { + assert(state == state_t::closing); + throw std::system_error(make_error_code(error::protocol_aborted)); + } + socket->learn_ephemeral_port_as_connector(caddr.get_port()); + if (unlikely(caddr.is_msgr2())) { + logger().warn("{} peer sent a v2 address for me: {}", + conn, caddr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + caddr.set_type(entity_addr_t::TYPE_LEGACY); + return messenger.learned_addr(caddr, conn); + }).then([this] { + // encode/send client's handshake header + bufferlist bl; + bl.append(buffer::create_static(banner_size, banner)); + ::encode(messenger.get_myaddr(), bl, 0); + return socket->write_flush(std::move(bl)); + }).then([=] { + return seastar::repeat([this] { + return repeat_connect(); + }); + }).then([this] { + if (state != state_t::connecting) { + assert(state == state_t::closing); + throw std::system_error(make_error_code(error::protocol_aborted)); + } + execute_open(open_t::connected); + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the connecting state + logger().warn("{} connecting fault: {}", conn, eptr); + close(true); + }); + }); +} + +// accepting state + +seastar::future<stop_t> ProtocolV1::send_connect_reply( + msgr_tag_t tag, bufferlist&& authorizer_reply) +{ + h.reply.tag = tag; + h.reply.features = static_cast<uint64_t>((h.connect.features & + conn.policy.features_supported) | + conn.policy.features_required); + h.reply.authorizer_len = authorizer_reply.length(); + return socket->write(make_static_packet(h.reply)) + .then([this, reply=std::move(authorizer_reply)]() mutable { + return socket->write_flush(std::move(reply)); + }).then([] { + return stop_t::no; + }); +} + +seastar::future<stop_t> ProtocolV1::send_connect_reply_ready( + msgr_tag_t tag, bufferlist&& authorizer_reply) +{ + return messenger.get_global_seq( + ).then([this, tag, auth_len = authorizer_reply.length()] (auto gs) { + h.global_seq = gs; + h.reply.tag = tag; + h.reply.features = conn.policy.features_supported; + h.reply.global_seq = h.global_seq; + h.reply.connect_seq = h.connect_seq; + h.reply.flags = 0; + if (conn.policy.lossy) { + h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY; + } + h.reply.authorizer_len = auth_len; + + session_security.reset( + get_auth_session_handler(nullptr, + auth_meta->auth_method, + auth_meta->session_key, + conn.features)); + + return socket->write(make_static_packet(h.reply)); + }).then([this, reply=std::move(authorizer_reply)]() mutable { + if (reply.length()) { + return socket->write(std::move(reply)); + } else { + return seastar::now(); + } + }).then([this] { + if (h.reply.tag == CEPH_MSGR_TAG_SEQ) { + return socket->write_flush(make_static_packet(conn.in_seq)) + .then([this] { + return socket->read_exactly(sizeof(seq_num_t)); + }).then([this] (auto buf) { + auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get()); + 
discard_up_to(&conn.out_q, *acked_seq); + }); + } else { + return socket->flush(); + } + }).then([] { + return stop_t::yes; + }); +} + +seastar::future<stop_t> ProtocolV1::replace_existing( + SocketConnectionRef existing, + bufferlist&& authorizer_reply, + bool is_reset_from_peer) +{ + msgr_tag_t reply_tag; + if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) && + !is_reset_from_peer) { + reply_tag = CEPH_MSGR_TAG_SEQ; + } else { + reply_tag = CEPH_MSGR_TAG_READY; + } + if (!existing->is_lossy()) { + // XXX: we decided not to support lossless connection in v1. as the + // client's default policy is + // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is + // lossy. And by the time + // will all be performed using v2 protocol. + ceph_abort("lossless policy not supported for v1"); + } + existing->protocol->close(true); + return send_connect_reply_ready(reply_tag, std::move(authorizer_reply)); +} + +seastar::future<stop_t> ProtocolV1::handle_connect_with_existing( + SocketConnectionRef existing, bufferlist&& authorizer_reply) +{ + ProtocolV1 *exproto = dynamic_cast<ProtocolV1*>(existing->protocol.get()); + + if (h.connect.global_seq < exproto->peer_global_seq()) { + h.reply.global_seq = exproto->peer_global_seq(); + return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL); + } else if (existing->is_lossy()) { + return replace_existing(existing, std::move(authorizer_reply)); + } else if (h.connect.connect_seq == 0 && exproto->connect_seq() > 0) { + return replace_existing(existing, std::move(authorizer_reply), true); + } else if (h.connect.connect_seq < exproto->connect_seq()) { + // old attempt, or we sent READY but they didn't get it. + h.reply.connect_seq = exproto->connect_seq() + 1; + return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); + } else if (h.connect.connect_seq == exproto->connect_seq()) { + // if the existing connection successfully opened, and/or + // subsequently went to standby, then the peer should bump + // their connect_seq and retry: this is not a connection race + // we need to resolve here. 
+ if (exproto->get_state() == state_t::open || + exproto->get_state() == state_t::standby) { + if (conn.policy.resetcheck && exproto->connect_seq() == 0) { + return replace_existing(existing, std::move(authorizer_reply)); + } else { + h.reply.connect_seq = exproto->connect_seq() + 1; + return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); + } + } else if (existing->peer_wins()) { + return replace_existing(existing, std::move(authorizer_reply)); + } else { + return send_connect_reply(CEPH_MSGR_TAG_WAIT); + } + } else if (conn.policy.resetcheck && + exproto->connect_seq() == 0) { + return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION); + } else { + return replace_existing(existing, std::move(authorizer_reply)); + } +} + +bool ProtocolV1::require_auth_feature() const +{ + if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) { + return false; + } + if (local_conf()->cephx_require_signatures) { + return true; + } + if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD || + h.connect.host_type == CEPH_ENTITY_TYPE_MDS || + h.connect.host_type == CEPH_ENTITY_TYPE_MGR) { + return local_conf()->cephx_cluster_require_signatures; + } else { + return local_conf()->cephx_service_require_signatures; + } +} + +bool ProtocolV1::require_cephx_v2_feature() const +{ + if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) { + return false; + } + if (local_conf()->cephx_require_version >= 2) { + return true; + } + if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD || + h.connect.host_type == CEPH_ENTITY_TYPE_MDS || + h.connect.host_type == CEPH_ENTITY_TYPE_MGR) { + return local_conf()->cephx_cluster_require_version >= 2; + } else { + return local_conf()->cephx_service_require_version >= 2; + } +} + +seastar::future<stop_t> ProtocolV1::repeat_handle_connect() +{ + return socket->read(sizeof(h.connect)) + .then([this](bufferlist bl) { + auto p = bl.cbegin(); + ::decode(h.connect, p); + if (conn.get_peer_type() != 0 && + conn.get_peer_type() != h.connect.host_type) { + logger().error("{} repeat_handle_connect(): my peer type does not match" + " what peer advertises {} != {}", + conn, conn.get_peer_type(), h.connect.host_type); + throw std::system_error(make_error_code(error::protocol_aborted)); + } + conn.set_peer_type(h.connect.host_type); + conn.policy = messenger.get_policy(h.connect.host_type); + if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) { + logger().error("{} we don't know how to reconnect to peer {}", + conn, conn.target_addr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + return socket->read(h.connect.authorizer_len); + }).then([this] (bufferlist authorizer) { + memset(&h.reply, 0, sizeof(h.reply)); + // TODO: set reply.protocol_version + if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) { + return send_connect_reply( + CEPH_MSGR_TAG_BADPROTOVER, bufferlist{}); + } + if (require_auth_feature()) { + conn.policy.features_required |= CEPH_FEATURE_MSG_AUTH; + } + if (require_cephx_v2_feature()) { + conn.policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2; + } + if (auto feat_missing = conn.policy.features_required & ~(uint64_t)h.connect.features; + feat_missing != 0) { + return send_connect_reply( + CEPH_MSGR_TAG_FEATURES, bufferlist{}); + } + + bufferlist authorizer_reply; + auth_meta->auth_method = h.connect.authorizer_protocol; + if (!HAVE_FEATURE((uint64_t)h.connect.features, CEPHX_V2)) { + // peer doesn't support it and we won't get here if we require it + auth_meta->skip_authorizer_challenge = 
true; + } + auto more = static_cast<bool>(auth_meta->authorizer_challenge); + ceph_assert(messenger.get_auth_server()); + int r = messenger.get_auth_server()->handle_auth_request( + conn.shared_from_this(), auth_meta, more, auth_meta->auth_method, authorizer, + &authorizer_reply); + + if (r < 0) { + session_security.reset(); + return send_connect_reply( + CEPH_MSGR_TAG_BADAUTHORIZER, std::move(authorizer_reply)); + } else if (r == 0) { + ceph_assert(authorizer_reply.length()); + return send_connect_reply( + CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER, std::move(authorizer_reply)); + } + + // r > 0 + if (auto existing = messenger.lookup_conn(conn.peer_addr); existing) { + if (existing->protocol->proto_type != proto_t::v1) { + logger().warn("{} existing {} proto version is {} not 1, close existing", + conn, *existing, + static_cast<int>(existing->protocol->proto_type)); + // NOTE: this is following async messenger logic, but we may miss the reset event. + existing->mark_down(); + } else { + return handle_connect_with_existing(existing, std::move(authorizer_reply)); + } + } + if (h.connect.connect_seq > 0) { + return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION, + std::move(authorizer_reply)); + } + h.connect_seq = h.connect.connect_seq + 1; + h.peer_global_seq = h.connect.global_seq; + conn.set_features((uint64_t)conn.policy.features_supported & (uint64_t)h.connect.features); + // TODO: cct + return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply)); + }); +} + +void ProtocolV1::start_accept(SocketRef&& sock, + const entity_addr_t& _peer_addr) +{ + ceph_assert(state == state_t::none); + logger().trace("{} trigger accepting, was {}", + conn, static_cast<int>(state)); + state = state_t::accepting; + set_write_state(write_state_t::delay); + + ceph_assert(!socket); + // until we know better + conn.target_addr = _peer_addr; + socket = std::move(sock); + messenger.accept_conn( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + gate.dispatch_in_background("start_accept", *this, [this] { + // stop learning my_addr before sending it out, so it won't change + return messenger.learned_addr(messenger.get_myaddr(), conn).then([this] { + // encode/send server's handshake header + bufferlist bl; + bl.append(buffer::create_static(banner_size, banner)); + ::encode(messenger.get_myaddr(), bl, 0); + ::encode(conn.target_addr, bl, 0); + return socket->write_flush(std::move(bl)); + }).then([this] { + // read client's handshake header and connect request + return socket->read(client_header_size); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + validate_banner(p); + entity_addr_t addr; + ::decode(addr, p); + ceph_assert(p.end()); + if ((addr.is_legacy() || addr.is_any()) && + addr.is_same_host(conn.target_addr)) { + // good + } else { + logger().error("{} peer advertized an invalid peer_addr: {}," + " which should be v1 and the same host with {}.", + conn, addr, conn.peer_addr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + conn.peer_addr = addr; + conn.target_addr = conn.peer_addr; + return seastar::repeat([this] { + return repeat_handle_connect(); + }); + }).then([this] { + if (state != state_t::accepting) { + assert(state == state_t::closing); + throw std::system_error(make_error_code(error::protocol_aborted)); + } + messenger.register_conn( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + messenger.unaccept_conn( + 
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + execute_open(open_t::accepted); + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the accepting state + logger().warn("{} accepting fault: {}", conn, eptr); + close(false); + }); + }); +} + +// open state + +ceph::bufferlist ProtocolV1::do_sweep_messages( + const std::deque<MessageRef>& msgs, + size_t num_msgs, + bool require_keepalive, + std::optional<utime_t> _keepalive_ack, + bool require_ack) +{ + static const size_t RESERVE_MSG_SIZE = sizeof(CEPH_MSGR_TAG_MSG) + + sizeof(ceph_msg_header) + + sizeof(ceph_msg_footer); + static const size_t RESERVE_MSG_SIZE_OLD = sizeof(CEPH_MSGR_TAG_MSG) + + sizeof(ceph_msg_header) + + sizeof(ceph_msg_footer_old); + + ceph::bufferlist bl; + if (likely(num_msgs)) { + if (HAVE_FEATURE(conn.features, MSG_AUTH)) { + bl.reserve(num_msgs * RESERVE_MSG_SIZE); + } else { + bl.reserve(num_msgs * RESERVE_MSG_SIZE_OLD); + } + } + + if (unlikely(require_keepalive)) { + k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec( + ceph::coarse_real_clock::now()); + logger().trace("{} write keepalive2 {}", conn, k.req.stamp.tv_sec); + bl.append(create_static(k.req)); + } + + if (unlikely(_keepalive_ack.has_value())) { + logger().trace("{} write keepalive2 ack {}", conn, *_keepalive_ack); + k.ack.stamp = ceph_timespec(*_keepalive_ack); + bl.append(create_static(k.ack)); + } + + if (require_ack) { + // XXX: we decided not to support lossless connection in v1. as the + // client's default policy is + // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is + // lossy. And by the time of crimson-osd's GA, the in-cluster communication + // will all be performed using v2 protocol. + ceph_abort("lossless policy not supported for v1"); + } + + std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) { + ceph_assert(!msg->get_seq() && "message already has seq"); + msg->set_seq(++conn.out_seq); + auto& header = msg->get_header(); + header.src = messenger.get_myname(); + msg->encode(conn.features, messenger.get_crc_flags()); + if (session_security) { + session_security->sign_message(msg.get()); + } + logger().debug("{} --> #{} === {} ({})", + conn, msg->get_seq(), *msg, msg->get_type()); + bl.append(CEPH_MSGR_TAG_MSG); + bl.append((const char*)&header, sizeof(header)); + bl.append(msg->get_payload()); + bl.append(msg->get_middle()); + bl.append(msg->get_data()); + auto& footer = msg->get_footer(); + if (HAVE_FEATURE(conn.features, MSG_AUTH)) { + bl.append((const char*)&footer, sizeof(footer)); + } else { + ceph_msg_footer_old old_footer; + if (messenger.get_crc_flags() & MSG_CRC_HEADER) { + old_footer.front_crc = footer.front_crc; + old_footer.middle_crc = footer.middle_crc; + } else { + old_footer.front_crc = old_footer.middle_crc = 0; + } + if (messenger.get_crc_flags() & MSG_CRC_DATA) { + old_footer.data_crc = footer.data_crc; + } else { + old_footer.data_crc = 0; + } + old_footer.flags = footer.flags; + bl.append((const char*)&old_footer, sizeof(old_footer)); + } + }); + + return bl; +} + +seastar::future<> ProtocolV1::handle_keepalive2_ack() +{ + return socket->read_exactly(sizeof(ceph_timespec)) + .then([this] (auto buf) { + auto t = reinterpret_cast<const ceph_timespec*>(buf.get()); + k.ack_stamp = *t; + logger().trace("{} got keepalive2 ack {}", conn, t->tv_sec); + }); +} + +seastar::future<> ProtocolV1::handle_keepalive2() +{ + return socket->read_exactly(sizeof(ceph_timespec)) + .then([this] (auto buf) { + utime_t 
ack{*reinterpret_cast<const ceph_timespec*>(buf.get())}; + notify_keepalive_ack(ack); + }); +} + +seastar::future<> ProtocolV1::handle_ack() +{ + return socket->read_exactly(sizeof(ceph_le64)) + .then([this] (auto buf) { + auto seq = reinterpret_cast<const ceph_le64*>(buf.get()); + discard_up_to(&conn.sent, *seq); + }); +} + +seastar::future<> ProtocolV1::maybe_throttle() +{ + if (!conn.policy.throttler_bytes) { + return seastar::now(); + } + const auto to_read = (m.header.front_len + + m.header.middle_len + + m.header.data_len); + return conn.policy.throttler_bytes->get(to_read); +} + +seastar::future<> ProtocolV1::read_message() +{ + return socket->read(sizeof(m.header)) + .then([this] (bufferlist bl) { + // throttle the traffic, maybe + auto p = bl.cbegin(); + ::decode(m.header, p); + return maybe_throttle(); + }).then([this] { + // read front + return socket->read(m.header.front_len); + }).then([this] (bufferlist bl) { + m.front = std::move(bl); + // read middle + return socket->read(m.header.middle_len); + }).then([this] (bufferlist bl) { + m.middle = std::move(bl); + // read data + return socket->read(m.header.data_len); + }).then([this] (bufferlist bl) { + m.data = std::move(bl); + // read footer + return socket->read(sizeof(m.footer)); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + ::decode(m.footer, p); + auto conn_ref = seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this()); + auto msg = ::decode_message(nullptr, 0, m.header, m.footer, + m.front, m.middle, m.data, conn_ref); + if (unlikely(!msg)) { + logger().warn("{} decode message failed", conn); + throw std::system_error{make_error_code(error::corrupted_message)}; + } + constexpr bool add_ref = false; // Message starts with 1 ref + // TODO: change MessageRef with foreign_ptr + auto msg_ref = MessageRef{msg, add_ref}; + + if (session_security) { + if (unlikely(session_security->check_message_signature(msg))) { + logger().warn("{} message signature check failed", conn); + throw std::system_error{make_error_code(error::corrupted_message)}; + } + } + // TODO: set time stamps + msg->set_byte_throttler(conn.policy.throttler_bytes); + + if (unlikely(!conn.update_rx_seq(msg->get_seq()))) { + // skip this message + return seastar::now(); + } + + logger().debug("{} <== #{} === {} ({})", + conn, msg_ref->get_seq(), *msg_ref, msg_ref->get_type()); + // throttle the reading process by the returned future + return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref)); + }); +} + +seastar::future<> ProtocolV1::handle_tags() +{ + return seastar::keep_doing([this] { + // read the next tag + return socket->read_exactly(1) + .then([this] (auto buf) { + switch (buf[0]) { + case CEPH_MSGR_TAG_MSG: + return read_message(); + case CEPH_MSGR_TAG_ACK: + return handle_ack(); + case CEPH_MSGR_TAG_KEEPALIVE: + return seastar::now(); + case CEPH_MSGR_TAG_KEEPALIVE2: + return handle_keepalive2(); + case CEPH_MSGR_TAG_KEEPALIVE2_ACK: + return handle_keepalive2_ack(); + case CEPH_MSGR_TAG_CLOSE: + logger().info("{} got tag close", conn); + throw std::system_error(make_error_code(error::protocol_aborted)); + default: + logger().error("{} got unknown msgr tag {}", + conn, static_cast<int>(buf[0])); + throw std::system_error(make_error_code(error::read_eof)); + } + }); + }); +} + +void ProtocolV1::execute_open(open_t type) +{ + logger().trace("{} trigger open, was {}", conn, static_cast<int>(state)); + state = state_t::open; + set_write_state(write_state_t::open); + + if (type == open_t::connected) { + 
dispatchers.ms_handle_connect( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + } else { // type == open_t::accepted + dispatchers.ms_handle_accept( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + } + + gate.dispatch_in_background("execute_open", *this, [this] { + // start background processing of tags + return handle_tags() + .handle_exception_type([this] (const std::system_error& e) { + logger().warn("{} open fault: {}", conn, e); + if (e.code() == error::protocol_aborted || + e.code() == std::errc::connection_reset || + e.code() == error::read_eof) { + close(true); + return seastar::now(); + } else { + throw e; + } + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the open state + logger().warn("{} open fault: {}", conn, eptr); + close(true); + }); + }); +} + +// closing state + +void ProtocolV1::trigger_close() +{ + logger().trace("{} trigger closing, was {}", + conn, static_cast<int>(state)); + messenger.closing_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + + if (state == state_t::accepting) { + messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + } else if (state >= state_t::connecting && state < state_t::closing) { + messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + } else { + // cannot happen + ceph_assert(false); + } + + if (!socket) { + ceph_assert(state == state_t::connecting); + } + + state = state_t::closing; +} + +void ProtocolV1::on_closed() +{ + messenger.closed_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); +} + +seastar::future<> ProtocolV1::fault() +{ + if (conn.policy.lossy) { + messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + } + // XXX: we decided not to support lossless connection in v1. as the + // client's default policy is + // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is + // lossy. And by the time of crimson-osd's GA, the in-cluster communication + // will all be performed using v2 protocol. 
+ ceph_abort("lossless policy not supported for v1"); + return seastar::now(); +} + +void ProtocolV1::print(std::ostream& out) const +{ + out << conn; +} + +} // namespace crimson::net diff --git a/src/crimson/net/ProtocolV1.h b/src/crimson/net/ProtocolV1.h new file mode 100644 index 000000000..ed6df8954 --- /dev/null +++ b/src/crimson/net/ProtocolV1.h @@ -0,0 +1,137 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "Protocol.h" + +class AuthAuthorizer; +class AuthSessionHandler; + +namespace crimson::net { + +class ProtocolV1 final : public Protocol { + public: + ProtocolV1(ChainedDispatchers& dispatchers, + SocketConnection& conn, + SocketMessenger& messenger); + ~ProtocolV1() override; + void print(std::ostream&) const final; + private: + void on_closed() override; + bool is_connected() const override; + + void start_connect(const entity_addr_t& peer_addr, + const entity_name_t& peer_name) override; + + void start_accept(SocketRef&& socket, + const entity_addr_t& peer_addr) override; + + void trigger_close() override; + + ceph::bufferlist do_sweep_messages( + const std::deque<MessageRef>& msgs, + size_t num_msgs, + bool require_keepalive, + std::optional<utime_t> keepalive_ack, + bool require_ack) override; + + private: + SocketMessenger &messenger; + + enum class state_t { + none, + accepting, + connecting, + open, + standby, + wait, + closing + }; + state_t state = state_t::none; + + // state for handshake + struct Handshake { + ceph_msg_connect connect; + ceph_msg_connect_reply reply; + ceph::bufferlist auth_payload; // auth(orizer) payload read off the wire + ceph::bufferlist auth_more; // connect-side auth retry (we added challenge) + std::chrono::milliseconds backoff; + uint32_t connect_seq = 0; + uint32_t peer_global_seq = 0; + uint32_t global_seq; + } h; + + std::unique_ptr<AuthSessionHandler> session_security; + + // state for an incoming message + struct MessageReader { + ceph_msg_header header; + ceph_msg_footer footer; + bufferlist front; + bufferlist middle; + bufferlist data; + } m; + + struct Keepalive { + struct { + const char tag = CEPH_MSGR_TAG_KEEPALIVE2; + ceph_timespec stamp; + } __attribute__((packed)) req; + struct { + const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK; + ceph_timespec stamp; + } __attribute__((packed)) ack; + ceph_timespec ack_stamp; + } k; + + private: + // connecting + void reset_session(); + seastar::future<stop_t> handle_connect_reply(crimson::net::msgr_tag_t tag); + seastar::future<stop_t> repeat_connect(); + ceph::bufferlist get_auth_payload(); + + // accepting + seastar::future<stop_t> send_connect_reply( + msgr_tag_t tag, bufferlist&& authorizer_reply = {}); + seastar::future<stop_t> send_connect_reply_ready( + msgr_tag_t tag, bufferlist&& authorizer_reply); + seastar::future<stop_t> replace_existing( + SocketConnectionRef existing, + bufferlist&& authorizer_reply, + bool is_reset_from_peer = false); + seastar::future<stop_t> handle_connect_with_existing( + SocketConnectionRef existing, bufferlist&& authorizer_reply); + bool require_auth_feature() const; + bool require_cephx_v2_feature() const; + seastar::future<stop_t> repeat_handle_connect(); + + // open + seastar::future<> handle_keepalive2_ack(); + seastar::future<> handle_keepalive2(); + seastar::future<> handle_ack(); + seastar::future<> maybe_throttle(); + seastar::future<> read_message(); + seastar::future<> handle_tags(); + + enum class open_t { + connected, + accepted + }; + void execute_open(open_t 
type); + + // replacing + // the number of connections initiated in this session, increment when a + // new connection is established + uint32_t connect_seq() const { return h.connect_seq; } + // the client side should connect us with a gseq. it will be reset with + // the one of exsting connection if it's greater. + uint32_t peer_global_seq() const { return h.peer_global_seq; } + // current state of ProtocolV1 + state_t get_state() const { return state; } + + seastar::future<> fault(); +}; + +} // namespace crimson::net diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc new file mode 100644 index 000000000..b7137b8b8 --- /dev/null +++ b/src/crimson/net/ProtocolV2.cc @@ -0,0 +1,2139 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "ProtocolV2.h" + +#include <seastar/core/lowres_clock.hh> +#include <fmt/format.h> +#include "include/msgr.h" +#include "include/random.h" + +#include "crimson/auth/AuthClient.h" +#include "crimson/auth/AuthServer.h" +#include "crimson/common/formatter.h" + +#include "chained_dispatchers.h" +#include "Errors.h" +#include "Socket.h" +#include "SocketConnection.h" +#include "SocketMessenger.h" + +#ifdef UNIT_TESTS_BUILT +#include "Interceptor.h" +#endif + +using namespace ceph::msgr::v2; +using crimson::common::local_conf; + +namespace { + +// TODO: apply the same logging policy to Protocol V1 +// Log levels in V2 Protocol: +// * error level, something error that cause connection to terminate: +// - fatal errors; +// - bugs; +// * warn level: something unusual that identifies connection fault or replacement: +// - unstable network; +// - incompatible peer; +// - auth failure; +// - connection race; +// - connection reset; +// * info level, something very important to show connection lifecycle, +// which doesn't happen very frequently; +// * debug level, important logs for debugging, including: +// - all the messages sent/received (-->/<==); +// - all the frames exchanged (WRITE/GOT); +// - important fields updated (UPDATE); +// - connection state transitions (TRIGGER); +// * trace level, trivial logs showing: +// - the exact bytes being sent/received (SEND/RECV(bytes)); +// - detailed information of sub-frames; +// - integrity checks; +// - etc. 
+seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); +} + +[[noreturn]] void abort_in_fault() { + throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); +} + +[[noreturn]] void abort_protocol() { + throw std::system_error(make_error_code(crimson::net::error::protocol_aborted)); +} + +[[noreturn]] void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) { + proto.close(dispatch_reset); + abort_protocol(); +} + +inline void expect_tag(const Tag& expected, + const Tag& actual, + crimson::net::SocketConnection& conn, + const char *where) { + if (actual != expected) { + logger().warn("{} {} received wrong tag: {}, expected {}", + conn, where, + static_cast<uint32_t>(actual), + static_cast<uint32_t>(expected)); + abort_in_fault(); + } +} + +inline void unexpected_tag(const Tag& unexpected, + crimson::net::SocketConnection& conn, + const char *where) { + logger().warn("{} {} received unexpected tag: {}", + conn, where, static_cast<uint32_t>(unexpected)); + abort_in_fault(); +} + +inline uint64_t generate_client_cookie() { + return ceph::util::generate_random_number<uint64_t>( + 1, std::numeric_limits<uint64_t>::max()); +} + +} // namespace anonymous + +namespace crimson::net { + +#ifdef UNIT_TESTS_BUILT +void intercept(Breakpoint bp, bp_type_t type, + SocketConnection& conn, SocketRef& socket) { + if (conn.interceptor) { + auto action = conn.interceptor->intercept(conn, Breakpoint(bp)); + socket->set_trap(type, action, &conn.interceptor->blocker); + } +} + +#define INTERCEPT_CUSTOM(bp, type) \ +intercept({bp}, type, conn, socket) + +#define INTERCEPT_FRAME(tag, type) \ +intercept({static_cast<Tag>(tag), type}, \ + type, conn, socket) + +#define INTERCEPT_N_RW(bp) \ +if (conn.interceptor) { \ + auto action = conn.interceptor->intercept(conn, {bp}); \ + ceph_assert(action != bp_action_t::BLOCK); \ + if (action == bp_action_t::FAULT) { \ + abort_in_fault(); \ + } \ +} + +#else +#define INTERCEPT_CUSTOM(bp, type) +#define INTERCEPT_FRAME(tag, type) +#define INTERCEPT_N_RW(bp) +#endif + +seastar::future<> ProtocolV2::Timer::backoff(double seconds) +{ + logger().warn("{} waiting {} seconds ...", conn, seconds); + cancel(); + last_dur_ = seconds; + as = seastar::abort_source(); + auto dur = std::chrono::duration_cast<seastar::lowres_clock::duration>( + std::chrono::duration<double>(seconds)); + return seastar::sleep_abortable(dur, *as + ).handle_exception_type([this] (const seastar::sleep_aborted& e) { + logger().debug("{} wait aborted", conn); + abort_protocol(); + }); +} + +ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers, + SocketConnection& conn, + SocketMessenger& messenger) + : Protocol(proto_t::v2, dispatchers, conn), + messenger{messenger}, + protocol_timer{conn} +{} + +ProtocolV2::~ProtocolV2() {} + +bool ProtocolV2::is_connected() const { + return state == state_t::READY || + state == state_t::ESTABLISHING || + state == state_t::REPLACING; +} + +void ProtocolV2::start_connect(const entity_addr_t& _peer_addr, + const entity_name_t& _peer_name) +{ + ceph_assert(state == state_t::NONE); + ceph_assert(!socket); + ceph_assert(!gate.is_closed()); + conn.peer_addr = _peer_addr; + conn.target_addr = _peer_addr; + conn.set_peer_name(_peer_name); + conn.policy = messenger.get_policy(_peer_name.type()); + client_cookie = generate_client_cookie(); + logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}" + " policy(lossy={}, server={}, standby={}, resetcheck={})", + conn, _peer_addr, _peer_name, 
client_cookie, + conn.policy.lossy, conn.policy.server, + conn.policy.standby, conn.policy.resetcheck); + messenger.register_conn( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + execute_connecting(); +} + +void ProtocolV2::start_accept(SocketRef&& sock, + const entity_addr_t& _peer_addr) +{ + ceph_assert(state == state_t::NONE); + ceph_assert(!socket); + // until we know better + conn.target_addr = _peer_addr; + socket = std::move(sock); + logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr); + messenger.accept_conn( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + execute_accepting(); +} + +// TODO: Frame related implementations, probably to a separate class. + +void ProtocolV2::enable_recording() +{ + rxbuf.clear(); + txbuf.clear(); + record_io = true; +} + +seastar::future<Socket::tmp_buf> ProtocolV2::read_exactly(size_t bytes) +{ + if (unlikely(record_io)) { + return socket->read_exactly(bytes) + .then([this] (auto bl) { + rxbuf.append(buffer::create(bl.share())); + return bl; + }); + } else { + return socket->read_exactly(bytes); + }; +} + +seastar::future<bufferlist> ProtocolV2::read(size_t bytes) +{ + if (unlikely(record_io)) { + return socket->read(bytes) + .then([this] (auto buf) { + rxbuf.append(buf); + return buf; + }); + } else { + return socket->read(bytes); + } +} + +seastar::future<> ProtocolV2::write(bufferlist&& buf) +{ + if (unlikely(record_io)) { + txbuf.append(buf); + } + return socket->write(std::move(buf)); +} + +seastar::future<> ProtocolV2::write_flush(bufferlist&& buf) +{ + if (unlikely(record_io)) { + txbuf.append(buf); + } + return socket->write_flush(std::move(buf)); +} + +size_t ProtocolV2::get_current_msg_size() const +{ + ceph_assert(rx_frame_asm.get_num_segments() > 0); + size_t sum = 0; + // we don't include SegmentIndex::Msg::HEADER. 
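+ // e.g. a message frame carries [header, front, middle, data] segments, so
+ // this is expected to sum the logical lengths of front, middle and data only.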
+ for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) { + sum += rx_frame_asm.get_segment_logical_len(idx); + } + return sum; +} + +seastar::future<Tag> ProtocolV2::read_main_preamble() +{ + rx_preamble.clear(); + return read_exactly(rx_frame_asm.get_preamble_onwire_len()) + .then([this] (auto bl) { + rx_segments_data.clear(); + try { + rx_preamble.append(buffer::create(std::move(bl))); + const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble); + INTERCEPT_FRAME(tag, bp_type_t::READ); + return tag; + } catch (FrameError& e) { + logger().warn("{} read_main_preamble: {}", conn, e.what()); + abort_in_fault(); + } + }); +} + +seastar::future<> ProtocolV2::read_frame_payload() +{ + ceph_assert(rx_segments_data.empty()); + + return seastar::do_until( + [this] { return rx_frame_asm.get_num_segments() == rx_segments_data.size(); }, + [this] { + // TODO: create aligned and contiguous buffer from socket + const size_t seg_idx = rx_segments_data.size(); + if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx); + alignment != segment_t::DEFAULT_ALIGNMENT) { + logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}", + conn, alignment, rx_segments_data.size()); + } + uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx); + // TODO: create aligned and contiguous buffer from socket + return read_exactly(onwire_len).then([this] (auto tmp_bl) { + logger().trace("{} RECV({}) frame segment[{}]", + conn, tmp_bl.size(), rx_segments_data.size()); + bufferlist segment; + segment.append(buffer::create(std::move(tmp_bl))); + rx_segments_data.emplace_back(std::move(segment)); + }); + } + ).then([this] { + return read_exactly(rx_frame_asm.get_epilogue_onwire_len()); + }).then([this] (auto bl) { + logger().trace("{} RECV({}) frame epilogue", conn, bl.size()); + bool ok = false; + try { + rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]); + bufferlist rx_epilogue; + rx_epilogue.append(buffer::create(std::move(bl))); + ok = rx_frame_asm.disassemble_remaining_segments(rx_segments_data.data(), rx_epilogue); + } catch (FrameError& e) { + logger().error("read_frame_payload: {} {}", conn, e.what()); + abort_in_fault(); + } catch (ceph::crypto::onwire::MsgAuthError&) { + logger().error("read_frame_payload: {} bad auth tag", conn); + abort_in_fault(); + } + // we do have a mechanism that allows transmitter to start sending message + // and abort after putting entire data field on wire. This will be used by + // the kernel client to avoid unnecessary buffering. 
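+ // presumably, ok == false is how such a late abort shows up here: the frame
+ // should simply be dropped, which is what the TODO below still has to do.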
+ if (!ok) { + // TODO + ceph_assert(false); + } + }); +} + +template <class F> +seastar::future<> ProtocolV2::write_frame(F &frame, bool flush) +{ + auto bl = frame.get_buffer(tx_frame_asm); + const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str()); + logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}", + conn, bl.length(), (int)main_preamble->tag, + (int)main_preamble->num_segments, main_preamble->crc); + INTERCEPT_FRAME(main_preamble->tag, bp_type_t::WRITE); + if (flush) { + return write_flush(std::move(bl)); + } else { + return write(std::move(bl)); + } +} + +void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant) +{ + if (!reentrant && _state == state) { + logger().error("{} is not allowed to re-trigger state {}", + conn, get_state_name(state)); + ceph_assert(false); + } + logger().debug("{} TRIGGER {}, was {}", + conn, get_state_name(_state), get_state_name(state)); + state = _state; + set_write_state(_write_state); +} + +void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr eptr) +{ + if (conn.policy.lossy) { + logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}", + conn, func_name, get_state_name(state), eptr); + close(true); + } else if (conn.policy.server || + (conn.policy.standby && + (!is_queued() && conn.sent.empty()))) { + logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}", + conn, func_name, get_state_name(state), eptr); + execute_standby(); + } else if (backoff) { + logger().info("{} {}: fault at {}, going to WAIT -- {}", + conn, func_name, get_state_name(state), eptr); + execute_wait(false); + } else { + logger().info("{} {}: fault at {}, going to CONNECTING -- {}", + conn, func_name, get_state_name(state), eptr); + execute_connecting(); + } +} + +void ProtocolV2::reset_session(bool full) +{ + server_cookie = 0; + connect_seq = 0; + conn.in_seq = 0; + if (full) { + client_cookie = generate_client_cookie(); + peer_global_seq = 0; + reset_write(); + dispatchers.ms_handle_remote_reset( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + } +} + +seastar::future<std::tuple<entity_type_t, entity_addr_t>> +ProtocolV2::banner_exchange(bool is_connect) +{ + // 1. prepare and send banner + bufferlist banner_payload; + encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0); + encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0); + + bufferlist bl; + bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX)); + auto len_payload = static_cast<uint16_t>(banner_payload.length()); + encode(len_payload, bl, 0); + bl.claim_append(banner_payload); + logger().debug("{} SEND({}) banner: len_payload={}, supported={}, " + "required={}, banner=\"{}\"", + conn, bl.length(), len_payload, + CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES, + CEPH_BANNER_V2_PREFIX); + INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE); + return write_flush(std::move(bl)).then([this] { + // 2. read peer banner + unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16); + INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ); + return read_exactly(banner_len); // or read exactly? + }).then([this] (auto bl) { + // 3. 
process peer banner and read banner_payload + unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); + logger().debug("{} RECV({}) banner: \"{}\"", + conn, bl.size(), + std::string((const char*)bl.get(), banner_prefix_len)); + + if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) { + if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) { + logger().warn("{} peer is using V1 protocol", conn); + } else { + logger().warn("{} peer sent bad banner", conn); + } + abort_in_fault(); + } + bl.trim_front(banner_prefix_len); + + uint16_t payload_len; + bufferlist buf; + buf.append(buffer::create(std::move(bl))); + auto ti = buf.cbegin(); + try { + decode(payload_len, ti); + } catch (const buffer::error &e) { + logger().warn("{} decode banner payload len failed", conn); + abort_in_fault(); + } + logger().debug("{} GOT banner: payload_len={}", conn, payload_len); + INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ); + return read(payload_len); + }).then([this, is_connect] (bufferlist bl) { + // 4. process peer banner_payload and send HelloFrame + auto p = bl.cbegin(); + uint64_t peer_supported_features; + uint64_t peer_required_features; + try { + decode(peer_supported_features, p); + decode(peer_required_features, p); + } catch (const buffer::error &e) { + logger().warn("{} decode banner payload failed", conn); + abort_in_fault(); + } + logger().debug("{} RECV({}) banner features: supported={} required={}", + conn, bl.length(), + peer_supported_features, peer_required_features); + + // Check feature bit compatibility + uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES; + uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES; + if ((required_features & peer_supported_features) != required_features) { + logger().error("{} peer does not support all required features" + " required={} peer_supported={}", + conn, required_features, peer_supported_features); + abort_in_close(*this, is_connect); + } + if ((supported_features & peer_required_features) != peer_required_features) { + logger().error("{} we do not support all peer required features" + " peer_required={} supported={}", + conn, peer_required_features, supported_features); + abort_in_close(*this, is_connect); + } + this->peer_required_features = peer_required_features; + if (this->peer_required_features == 0) { + this->connection_features = msgr2_required; + } + const bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1); + tx_frame_asm.set_is_rev1(is_rev1); + rx_frame_asm.set_is_rev1(is_rev1); + + auto hello = HelloFrame::Encode(messenger.get_mytype(), + conn.target_addr); + logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}", + conn, ceph_entity_type_name(messenger.get_mytype()), + conn.target_addr); + return write_frame(hello); + }).then([this] { + //5. read peer HelloFrame + return read_main_preamble(); + }).then([this] (Tag tag) { + expect_tag(Tag::HELLO, tag, conn, __func__); + return read_frame_payload(); + }).then([this] { + // 6. 
process peer HelloFrame + auto hello = HelloFrame::Decode(rx_segments_data.back()); + logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}", + conn, ceph_entity_type_name(hello.entity_type()), + hello.peer_addr()); + return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>( + std::make_tuple(hello.entity_type(), hello.peer_addr())); + }); +} + +// CONNECTING state + +seastar::future<> ProtocolV2::handle_auth_reply() +{ + return read_main_preamble() + .then([this] (Tag tag) { + switch (tag) { + case Tag::AUTH_BAD_METHOD: + return read_frame_payload().then([this] { + // handle_auth_bad_method() logic + auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back()); + logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, " + "allowed_methods={}, allowed_modes={}", + conn, bad_method.method(), cpp_strerror(bad_method.result()), + bad_method.allowed_methods(), bad_method.allowed_modes()); + ceph_assert(messenger.get_auth_client()); + int r = messenger.get_auth_client()->handle_auth_bad_method( + conn.shared_from_this(), auth_meta, + bad_method.method(), bad_method.result(), + bad_method.allowed_methods(), bad_method.allowed_modes()); + if (r < 0) { + logger().warn("{} auth_client handle_auth_bad_method returned {}", + conn, r); + abort_in_fault(); + } + return client_auth(bad_method.allowed_methods()); + }); + case Tag::AUTH_REPLY_MORE: + return read_frame_payload().then([this] { + // handle_auth_reply_more() logic + auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back()); + logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}", + conn, auth_more.auth_payload().length()); + ceph_assert(messenger.get_auth_client()); + // let execute_connecting() take care of the thrown exception + auto reply = messenger.get_auth_client()->handle_auth_reply_more( + conn.shared_from_this(), auth_meta, auth_more.auth_payload()); + auto more_reply = AuthRequestMoreFrame::Encode(reply); + logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}", + conn, reply.length()); + return write_frame(more_reply); + }).then([this] { + return handle_auth_reply(); + }); + case Tag::AUTH_DONE: + return read_frame_payload().then([this] { + // handle_auth_done() logic + auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back()); + logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}", + conn, auth_done.global_id(), + ceph_con_mode_name(auth_done.con_mode()), + auth_done.auth_payload().length()); + ceph_assert(messenger.get_auth_client()); + int r = messenger.get_auth_client()->handle_auth_done( + conn.shared_from_this(), auth_meta, + auth_done.global_id(), + auth_done.con_mode(), + auth_done.auth_payload()); + if (r < 0) { + logger().warn("{} auth_client handle_auth_done returned {}", conn, r); + abort_in_fault(); + } + auth_meta->con_mode = auth_done.con_mode(); + session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair( + nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), false); + return finish_auth(); + }); + default: { + unexpected_tag(tag, conn, __func__); + return seastar::now(); + } + } + }); +} + +seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods) +{ + // send_auth_request() logic + ceph_assert(messenger.get_auth_client()); + + try { + auto [auth_method, preferred_modes, bl] = + messenger.get_auth_client()->get_auth_request(conn.shared_from_this(), auth_meta); + auth_meta->auth_method = auth_method; + auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl); + 
logger().debug("{} WRITE AuthRequestFrame: method={}," + " preferred_modes={}, payload_len={}", + conn, auth_method, preferred_modes, bl.length()); + return write_frame(frame).then([this] { + return handle_auth_reply(); + }); + } catch (const crimson::auth::error& e) { + logger().error("{} get_initial_auth_request returned {}", conn, e); + abort_in_close(*this, true); + return seastar::now(); + } +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::process_wait() +{ + return read_frame_payload().then([this] { + // handle_wait() logic + logger().debug("{} GOT WaitFrame", conn); + WaitFrame::Decode(rx_segments_data.back()); + return next_step_t::wait; + }); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::client_connect() +{ + // send_client_ident() logic + uint64_t flags = 0; + if (conn.policy.lossy) { + flags |= CEPH_MSG_CONNECT_LOSSY; + } + + auto client_ident = ClientIdentFrame::Encode( + messenger.get_myaddrs(), + conn.target_addr, + messenger.get_myname().num(), + global_seq, + conn.policy.features_supported, + conn.policy.features_required | msgr2_required, flags, + client_cookie); + + logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={}," + " gs={}, features_supported={}, features_required={}," + " flags={}, cookie={}", + conn, messenger.get_myaddrs(), conn.target_addr, + messenger.get_myname().num(), global_seq, + conn.policy.features_supported, + conn.policy.features_required | msgr2_required, + flags, client_cookie); + return write_frame(client_ident).then([this] { + return read_main_preamble(); + }).then([this] (Tag tag) { + switch (tag) { + case Tag::IDENT_MISSING_FEATURES: + return read_frame_payload().then([this] { + // handle_ident_missing_features() logic + auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back()); + logger().warn("{} GOT IdentMissingFeaturesFrame: features={}" + " (client does not support all server features)", + conn, ident_missing.features()); + abort_in_fault(); + return next_step_t::none; + }); + case Tag::WAIT: + return process_wait(); + case Tag::SERVER_IDENT: + return read_frame_payload().then([this] { + // handle_server_ident() logic + requeue_sent(); + auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back()); + logger().debug("{} GOT ServerIdentFrame:" + " addrs={}, gid={}, gs={}," + " features_supported={}, features_required={}," + " flags={}, cookie={}", + conn, + server_ident.addrs(), server_ident.gid(), + server_ident.global_seq(), + server_ident.supported_features(), + server_ident.required_features(), + server_ident.flags(), server_ident.cookie()); + + // is this who we intended to talk to? + // be a bit forgiving here, since we may be connecting based on addresses parsed out + // of mon_host or something. 
+ if (!server_ident.addrs().contains(conn.target_addr)) { + logger().warn("{} peer identifies as {}, does not include {}", + conn, server_ident.addrs(), conn.target_addr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + + server_cookie = server_ident.cookie(); + + // TODO: change peer_addr to entity_addrvec_t + if (server_ident.addrs().front() != conn.peer_addr) { + logger().warn("{} peer advertises as {}, does not match {}", + conn, server_ident.addrs(), conn.peer_addr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + if (conn.get_peer_id() != entity_name_t::NEW && + conn.get_peer_id() != server_ident.gid()) { + logger().error("{} connection peer id ({}) does not match " + "what it should be ({}) during connecting, close", + conn, server_ident.gid(), conn.get_peer_id()); + abort_in_close(*this, true); + } + conn.set_peer_id(server_ident.gid()); + conn.set_features(server_ident.supported_features() & + conn.policy.features_supported); + peer_global_seq = server_ident.global_seq(); + + bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY; + if (lossy != conn.policy.lossy) { + logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn, lossy); + conn.policy.lossy = lossy; + } + if (lossy && (connect_seq != 0 || server_cookie != 0)) { + logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy", + conn, connect_seq, server_cookie); + connect_seq = 0; + server_cookie = 0; + } + + return seastar::make_ready_future<next_step_t>(next_step_t::ready); + }); + default: { + unexpected_tag(tag, conn, "post_client_connect"); + return seastar::make_ready_future<next_step_t>(next_step_t::none); + } + } + }); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::client_reconnect() +{ + // send_reconnect() logic + auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(), + client_cookie, + server_cookie, + global_seq, + connect_seq, + conn.in_seq); + logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={}," + " server_cookie={}, gs={}, cs={}, msg_seq={}", + conn, messenger.get_myaddrs(), + client_cookie, server_cookie, + global_seq, connect_seq, conn.in_seq); + return write_frame(reconnect).then([this] { + return read_main_preamble(); + }).then([this] (Tag tag) { + switch (tag) { + case Tag::SESSION_RETRY_GLOBAL: + return read_frame_payload().then([this] { + // handle_session_retry_global() logic + auto retry = RetryGlobalFrame::Decode(rx_segments_data.back()); + logger().warn("{} GOT RetryGlobalFrame: gs={}", + conn, retry.global_seq()); + return messenger.get_global_seq(retry.global_seq()).then([this] (auto gs) { + global_seq = gs; + logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq); + return client_reconnect(); + }); + }); + case Tag::SESSION_RETRY: + return read_frame_payload().then([this] { + // handle_session_retry() logic + auto retry = RetryFrame::Decode(rx_segments_data.back()); + logger().warn("{} GOT RetryFrame: cs={}", + conn, retry.connect_seq()); + connect_seq = retry.connect_seq() + 1; + logger().warn("{} UPDATE: cs={}", conn, connect_seq); + return client_reconnect(); + }); + case Tag::SESSION_RESET: + return read_frame_payload().then([this] { + // handle_session_reset() logic + auto reset = ResetFrame::Decode(rx_segments_data.back()); + logger().warn("{} GOT ResetFrame: full={}", conn, reset.full()); + reset_session(reset.full()); + return client_connect(); + }); + case Tag::WAIT: + return process_wait(); + case Tag::SESSION_RECONNECT_OK: + 
return read_frame_payload().then([this] { + // handle_reconnect_ok() logic + auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back()); + logger().debug("{} GOT ReconnectOkFrame: msg_seq={}", + conn, reconnect_ok.msg_seq()); + requeue_up_to(reconnect_ok.msg_seq()); + return seastar::make_ready_future<next_step_t>(next_step_t::ready); + }); + default: { + unexpected_tag(tag, conn, "post_client_reconnect"); + return seastar::make_ready_future<next_step_t>(next_step_t::none); + } + } + }); +} + +void ProtocolV2::execute_connecting() +{ + trigger_state(state_t::CONNECTING, write_state_t::delay, true); + if (socket) { + socket->shutdown(); + } + gated_execute("execute_connecting", [this] { + return messenger.get_global_seq().then([this] (auto gs) { + global_seq = gs; + assert(client_cookie != 0); + if (!conn.policy.lossy && server_cookie != 0) { + ++connect_seq; + logger().debug("{} UPDATE: gs={}, cs={} for reconnect", + conn, global_seq, connect_seq); + } else { // conn.policy.lossy || server_cookie == 0 + assert(connect_seq == 0); + assert(server_cookie == 0); + logger().debug("{} UPDATE: gs={} for connect", conn, global_seq); + } + + return wait_write_exit(); + }).then([this] { + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} before Socket::connect()", + conn, get_state_name(state)); + abort_protocol(); + } + if (socket) { + gate.dispatch_in_background("close_sockect_connecting", *this, + [sock = std::move(socket)] () mutable { + return sock->close().then([sock = std::move(sock)] {}); + }); + } + INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING); + return Socket::connect(conn.peer_addr); + }).then([this](SocketRef sock) { + logger().debug("{} socket connected", conn); + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} during Socket::connect()", + conn, get_state_name(state)); + return sock->close().then([sock = std::move(sock)] { + abort_protocol(); + }); + } + socket = std::move(sock); + return seastar::now(); + }).then([this] { + auth_meta = seastar::make_lw_shared<AuthConnectionMeta>(); + session_stream_handlers = { nullptr, nullptr }; + enable_recording(); + return banner_exchange(true); + }).then([this] (auto&& ret) { + auto [_peer_type, _my_addr_from_peer] = std::move(ret); + if (conn.get_peer_type() != _peer_type) { + logger().warn("{} connection peer type does not match what peer advertises {} != {}", + conn, ceph_entity_type_name(conn.get_peer_type()), + ceph_entity_type_name(_peer_type)); + abort_in_close(*this, true); + } + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} during banner_exchange(), abort", + conn, get_state_name(state)); + abort_protocol(); + } + socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port()); + if (unlikely(_my_addr_from_peer.is_legacy())) { + logger().warn("{} peer sent a legacy address for me: {}", + conn, _my_addr_from_peer); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2); + return messenger.learned_addr(_my_addr_from_peer, conn); + }).then([this] { + return client_auth(); + }).then([this] { + if (server_cookie == 0) { + ceph_assert(connect_seq == 0); + return client_connect(); + } else { + ceph_assert(connect_seq > 0); + return client_reconnect(); + } + }).then([this] (next_step_t next) { + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} at the end of execute_connecting()", + conn, 
get_state_name(state)); + abort_protocol(); + } + switch (next) { + case next_step_t::ready: { + logger().info("{} connected:" + " gs={}, pgs={}, cs={}, client_cookie={}," + " server_cookie={}, in_seq={}, out_seq={}, out_q={}", + conn, global_seq, peer_global_seq, connect_seq, + client_cookie, server_cookie, conn.in_seq, + conn.out_seq, conn.out_q.size()); + execute_ready(true); + break; + } + case next_step_t::wait: { + logger().info("{} execute_connecting(): going to WAIT", conn); + execute_wait(true); + break; + } + default: { + ceph_abort("impossible next step"); + } + } + }).handle_exception([this] (std::exception_ptr eptr) { + if (state != state_t::CONNECTING) { + logger().info("{} execute_connecting(): protocol aborted at {} -- {}", + conn, get_state_name(state), eptr); + assert(state == state_t::CLOSING || + state == state_t::REPLACING); + return; + } + + if (conn.policy.server || + (conn.policy.standby && + (!is_queued() && conn.sent.empty()))) { + logger().info("{} execute_connecting(): fault at {} with nothing to send," + " going to STANDBY -- {}", + conn, get_state_name(state), eptr); + execute_standby(); + } else { + logger().info("{} execute_connecting(): fault at {}, going to WAIT -- {}", + conn, get_state_name(state), eptr); + execute_wait(false); + } + }); + }); +} + +// ACCEPTING state + +seastar::future<> ProtocolV2::_auth_bad_method(int r) +{ + // _auth_bad_method() logic + ceph_assert(r < 0); + auto [allowed_methods, allowed_modes] = + messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type()); + auto bad_method = AuthBadMethodFrame::Encode( + auth_meta->auth_method, r, allowed_methods, allowed_modes); + logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, " + "allowed_methods={}, allowed_modes={})", + conn, auth_meta->auth_method, cpp_strerror(r), + allowed_methods, allowed_modes); + return write_frame(bad_method).then([this] { + return server_auth(); + }); +} + +seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more) +{ + // _handle_auth_request() logic + ceph_assert(messenger.get_auth_server()); + bufferlist reply; + int r = messenger.get_auth_server()->handle_auth_request( + conn.shared_from_this(), auth_meta, + more, auth_meta->auth_method, auth_payload, + &reply); + switch (r) { + // successful + case 1: { + auto auth_done = AuthDoneFrame::Encode( + conn.peer_global_id, auth_meta->con_mode, reply); + logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}", + conn, conn.peer_global_id, + ceph_con_mode_name(auth_meta->con_mode), reply.length()); + return write_frame(auth_done).then([this] { + ceph_assert(auth_meta); + session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair( + nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), true); + return finish_auth(); + }); + } + // auth more + case 0: { + auto more = AuthReplyMoreFrame::Encode(reply); + logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}", + conn, reply.length()); + return write_frame(more).then([this] { + return read_main_preamble(); + }).then([this] (Tag tag) { + expect_tag(Tag::AUTH_REQUEST_MORE, tag, conn, __func__); + return read_frame_payload(); + }).then([this] { + auto auth_more = AuthRequestMoreFrame::Decode(rx_segments_data.back()); + logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}", + conn, auth_more.auth_payload().length()); + return _handle_auth_request(auth_more.auth_payload(), true); + }); + } + case -EBUSY: { + logger().warn("{} auth_server handle_auth_request returned 
-EBUSY", conn); + abort_in_fault(); + return seastar::now(); + } + default: { + logger().warn("{} auth_server handle_auth_request returned {}", conn, r); + return _auth_bad_method(r); + } + } +} + +seastar::future<> ProtocolV2::server_auth() +{ + return read_main_preamble() + .then([this] (Tag tag) { + expect_tag(Tag::AUTH_REQUEST, tag, conn, __func__); + return read_frame_payload(); + }).then([this] { + // handle_auth_request() logic + auto request = AuthRequestFrame::Decode(rx_segments_data.back()); + logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={}," + " payload_len={}", + conn, request.method(), request.preferred_modes(), + request.auth_payload().length()); + auth_meta->auth_method = request.method(); + auth_meta->con_mode = messenger.get_auth_server()->pick_con_mode( + conn.get_peer_type(), auth_meta->auth_method, + request.preferred_modes()); + if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) { + logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn); + return _auth_bad_method(-EOPNOTSUPP); + } + return _handle_auth_request(request.auth_payload(), false); + }); +} + +bool ProtocolV2::validate_peer_name(const entity_name_t& peer_name) const +{ + auto my_peer_name = conn.get_peer_name(); + if (my_peer_name.type() != peer_name.type()) { + return false; + } + if (my_peer_name.num() != entity_name_t::NEW && + peer_name.num() != entity_name_t::NEW && + my_peer_name.num() != peer_name.num()) { + return false; + } + return true; +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::send_wait() +{ + auto wait = WaitFrame::Encode(); + logger().debug("{} WRITE WaitFrame", conn); + return write_frame(wait).then([] { + return next_step_t::wait; + }); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::reuse_connection( + ProtocolV2* existing_proto, bool do_reset, + bool reconnect, uint64_t conn_seq, uint64_t msg_seq) +{ + existing_proto->trigger_replacing(reconnect, + do_reset, + std::move(socket), + std::move(auth_meta), + std::move(session_stream_handlers), + peer_global_seq, + client_cookie, + conn.get_peer_name(), + connection_features, + tx_frame_asm.get_is_rev1(), + rx_frame_asm.get_is_rev1(), + conn_seq, + msg_seq); +#ifdef UNIT_TESTS_BUILT + if (conn.interceptor) { + conn.interceptor->register_conn_replaced(conn); + } +#endif + // close this connection because all the necessary information is delivered + // to the exisiting connection, and jump to error handling code to abort the + // current state. 
+ abort_in_close(*this, false); + return seastar::make_ready_future<next_step_t>(next_step_t::none); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn) +{ + // handle_existing_connection() logic + ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>( + existing_conn->protocol.get()); + ceph_assert(existing_proto); + logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting," + " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})", + conn, global_seq, peer_global_seq, connect_seq, + client_cookie, server_cookie, + existing_conn, get_state_name(existing_proto->state), + existing_proto->global_seq, + existing_proto->peer_global_seq, + existing_proto->connect_seq, + existing_proto->client_cookie, + existing_proto->server_cookie); + + if (!validate_peer_name(existing_conn->get_peer_name())) { + logger().error("{} server_connect: my peer_name doesn't match" + " the existing connection {}, abort", conn, existing_conn); + abort_in_fault(); + } + + if (existing_proto->state == state_t::REPLACING) { + logger().warn("{} server_connect: racing replace happened while" + " replacing existing connection {}, send wait.", + conn, *existing_conn); + return send_wait(); + } + + if (existing_proto->peer_global_seq > peer_global_seq) { + logger().warn("{} server_connect:" + " this is a stale connection, because peer_global_seq({})" + " < existing->peer_global_seq({}), close this connection" + " in favor of existing connection {}", + conn, peer_global_seq, + existing_proto->peer_global_seq, *existing_conn); + abort_in_fault(); + } + + if (existing_conn->policy.lossy) { + // existing connection can be thrown out in favor of this one + logger().warn("{} server_connect:" + " existing connection {} is a lossy channel. Close existing in favor of" + " this connection", conn, *existing_conn); + execute_establishing(existing_conn, true); + return seastar::make_ready_future<next_step_t>(next_step_t::ready); + } + + if (existing_proto->server_cookie != 0) { + if (existing_proto->client_cookie != client_cookie) { + // Found previous session + // peer has reset and we're going to reuse the existing connection + // by replacing the socket + logger().warn("{} server_connect:" + " found new session (cs={})" + " when existing {} is with stale session (cs={}, ss={})," + " peer must have reset", + conn, client_cookie, + *existing_conn, existing_proto->client_cookie, + existing_proto->server_cookie); + return reuse_connection(existing_proto, conn.policy.resetcheck); + } else { + // session establishment interrupted between client_ident and server_ident, + // continuing... + logger().warn("{} server_connect: found client session with existing {}" + " matched (cs={}, ss={}), continuing session establishment", + conn, *existing_conn, client_cookie, existing_proto->server_cookie); + return reuse_connection(existing_proto); + } + } else { + // Looks like a connection race: server and client are both connecting to + // each other at the same time. 
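+ // differing client cookies mean two independent racing attempts; peer_wins()
+ // is a symmetric tie-break, so both sides should agree on which one survives.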
+ if (existing_proto->client_cookie != client_cookie) { + if (existing_conn->peer_wins()) { + logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)" + " and win, reusing existing {}", + conn, client_cookie, existing_proto->client_cookie, *existing_conn); + return reuse_connection(existing_proto); + } else { + logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)" + " and lose to existing {}, ask client to wait", + conn, client_cookie, existing_proto->client_cookie, *existing_conn); + return existing_conn->keepalive().then([this] { + return send_wait(); + }); + } + } else { + logger().warn("{} server_connect: found client session with existing {}" + " matched (cs={}, ss={}), continuing session establishment", + conn, *existing_conn, client_cookie, existing_proto->server_cookie); + return reuse_connection(existing_proto); + } + } +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::server_connect() +{ + return read_frame_payload().then([this] { + // handle_client_ident() logic + auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back()); + logger().debug("{} GOT ClientIdentFrame: addrs={}, target={}," + " gid={}, gs={}, features_supported={}," + " features_required={}, flags={}, cookie={}", + conn, client_ident.addrs(), client_ident.target_addr(), + client_ident.gid(), client_ident.global_seq(), + client_ident.supported_features(), + client_ident.required_features(), + client_ident.flags(), client_ident.cookie()); + + if (client_ident.addrs().empty() || + client_ident.addrs().front() == entity_addr_t()) { + logger().warn("{} oops, client_ident.addrs() is empty", conn); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + if (!messenger.get_myaddrs().contains(client_ident.target_addr())) { + logger().warn("{} peer is trying to reach {} which is not us ({})", + conn, client_ident.target_addr(), messenger.get_myaddrs()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + // TODO: change peer_addr to entity_addrvec_t + entity_addr_t paddr = client_ident.addrs().front(); + if ((paddr.is_msgr2() || paddr.is_any()) && + paddr.is_same_host(conn.target_addr)) { + // good + } else { + logger().warn("{} peer's address {} is not v2 or not the same host with {}", + conn, paddr, conn.target_addr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + conn.peer_addr = paddr; + logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr); + conn.target_addr = conn.peer_addr; + if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) { + logger().warn("{} we don't know how to reconnect to peer {}", + conn, conn.target_addr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + + if (conn.get_peer_id() != entity_name_t::NEW && + conn.get_peer_id() != client_ident.gid()) { + logger().error("{} client_ident peer_id ({}) does not match" + " what it should be ({}) during accepting, abort", + conn, client_ident.gid(), conn.get_peer_id()); + abort_in_fault(); + } + conn.set_peer_id(client_ident.gid()); + client_cookie = client_ident.cookie(); + + uint64_t feat_missing = + (conn.policy.features_required | msgr2_required) & + ~(uint64_t)client_ident.supported_features(); + if (feat_missing) { + auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing); + logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)", + conn, 
feat_missing); + return write_frame(ident_missing_features).then([] { + return next_step_t::wait; + }); + } + connection_features = + client_ident.supported_features() & conn.policy.features_supported; + logger().debug("{} UPDATE: connection_features={}", conn, connection_features); + + peer_global_seq = client_ident.global_seq(); + + // Looks good so far, let's check if there is already an existing connection + // to this peer. + + SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr); + + if (existing_conn) { + if (existing_conn->protocol->proto_type != proto_t::v2) { + logger().warn("{} existing connection {} proto version is {}, close existing", + conn, *existing_conn, + static_cast<int>(existing_conn->protocol->proto_type)); + // should unregister the existing from msgr atomically + // NOTE: this is following async messenger logic, but we may miss the reset event. + execute_establishing(existing_conn, false); + return seastar::make_ready_future<next_step_t>(next_step_t::ready); + } else { + return handle_existing_connection(existing_conn); + } + } else { + execute_establishing(nullptr, true); + return seastar::make_ready_future<next_step_t>(next_step_t::ready); + } + }); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::read_reconnect() +{ + return read_main_preamble() + .then([this] (Tag tag) { + expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect"); + return server_reconnect(); + }); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::send_retry(uint64_t connect_seq) +{ + auto retry = RetryFrame::Encode(connect_seq); + logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq); + return write_frame(retry).then([this] { + return read_reconnect(); + }); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::send_retry_global(uint64_t global_seq) +{ + auto retry = RetryGlobalFrame::Encode(global_seq); + logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq); + return write_frame(retry).then([this] { + return read_reconnect(); + }); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::send_reset(bool full) +{ + auto reset = ResetFrame::Encode(full); + logger().warn("{} WRITE ResetFrame: full={}", conn, full); + return write_frame(reset).then([this] { + return read_main_preamble(); + }).then([this] (Tag tag) { + expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset"); + return server_connect(); + }); +} + +seastar::future<ProtocolV2::next_step_t> +ProtocolV2::server_reconnect() +{ + return read_frame_payload().then([this] { + // handle_reconnect() logic + auto reconnect = ReconnectFrame::Decode(rx_segments_data.back()); + + logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={}," + " server_cookie={}, gs={}, cs={}, msg_seq={}", + conn, reconnect.addrs(), + reconnect.client_cookie(), reconnect.server_cookie(), + reconnect.global_seq(), reconnect.connect_seq(), + reconnect.msg_seq()); + + // can peer_addrs be changed on-the-fly? 
+    // TODO: change peer_addr to entity_addrvec_t
+    entity_addr_t paddr = reconnect.addrs().front();
+    if (paddr.is_msgr2() || paddr.is_any()) {
+      // good
+    } else {
+      logger().warn("{} peer's address {} is not v2", conn, paddr);
+      throw std::system_error(
+          make_error_code(crimson::net::error::bad_peer_address));
+    }
+    if (conn.peer_addr == entity_addr_t()) {
+      conn.peer_addr = paddr;
+    } else if (conn.peer_addr != paddr) {
+      logger().error("{} peer identifies as {}, while conn.peer_addr={},"
+                     " reconnect failed",
+                     conn, paddr, conn.peer_addr);
+      throw std::system_error(
+          make_error_code(crimson::net::error::bad_peer_address));
+    }
+    peer_global_seq = reconnect.global_seq();
+
+    SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
+
+    if (!existing_conn) {
+      // there is no existing connection therefore cannot reconnect to previous
+      // session
+      logger().warn("{} server_reconnect: no existing connection from address {},"
+                    " resetting client", conn, conn.peer_addr);
+      return send_reset(true);
+    }
+
+    if (existing_conn->protocol->proto_type != proto_t::v2) {
+      logger().warn("{} server_reconnect: existing connection {} proto version is {},"
+                    " close existing and reset client.",
+                    conn, *existing_conn,
+                    static_cast<int>(existing_conn->protocol->proto_type));
+      // NOTE: this is following async messenger logic, but we may miss the reset event.
+      existing_conn->mark_down();
+      return send_reset(true);
+    }
+
+    ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+        existing_conn->protocol.get());
+    ceph_assert(existing_proto);
+    logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
+                   " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
+                   conn, global_seq, peer_global_seq, reconnect.connect_seq(),
+                   reconnect.client_cookie(), reconnect.server_cookie(),
+                   existing_conn,
+                   get_state_name(existing_proto->state),
+                   existing_proto->global_seq,
+                   existing_proto->peer_global_seq,
+                   existing_proto->connect_seq,
+                   existing_proto->client_cookie,
+                   existing_proto->server_cookie);
+
+    if (!validate_peer_name(existing_conn->get_peer_name())) {
+      logger().error("{} server_reconnect: my peer_name doesn't match"
+                     " the existing connection {}, abort", conn, existing_conn);
+      abort_in_fault();
+    }
+
+    if (existing_proto->state == state_t::REPLACING) {
+      logger().warn("{} server_reconnect: racing replace happened while"
+                    " replacing existing connection {}, retry global.",
+                    conn, *existing_conn);
+      return send_retry_global(existing_proto->peer_global_seq);
+    }
+
+    if (existing_proto->client_cookie != reconnect.client_cookie()) {
+      logger().warn("{} server_reconnect:"
+                    " client_cookie mismatch with existing connection {},"
+                    " cc={} rcc={}. I must have reset, resetting client.",
+                    conn, *existing_conn,
+                    existing_proto->client_cookie, reconnect.client_cookie());
+      return send_reset(conn.policy.resetcheck);
+    } else if (existing_proto->server_cookie == 0) {
+      // this happens when:
+      //   - a connects to b
+      //   - a sends client_ident
+      //   - b gets client_ident, sends server_ident and sets cookie X
+      //   - connection fault
+      //   - b reconnects to a with cookie X, connect_seq=1
+      //   - a has cookie==0
+      logger().warn("{} server_reconnect: I was a client (cc={}) and didn't receive the"
+                    " server_ident with existing connection {}."
+ " Asking peer to resume session establishment", + conn, existing_proto->client_cookie, *existing_conn); + return send_reset(false); + } + + if (existing_proto->peer_global_seq > reconnect.global_seq()) { + logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({})," + " with existing connection {}," + " ask client to retry global", + conn, existing_proto->peer_global_seq, + reconnect.global_seq(), *existing_conn); + return send_retry_global(existing_proto->peer_global_seq); + } + + if (existing_proto->connect_seq > reconnect.connect_seq()) { + logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({})," + " with existing connection {}, ask client to retry", + conn, reconnect.connect_seq(), + existing_proto->connect_seq, *existing_conn); + return send_retry(existing_proto->connect_seq); + } else if (existing_proto->connect_seq == reconnect.connect_seq()) { + // reconnect race: both peers are sending reconnect messages + if (existing_conn->peer_wins()) { + logger().warn("{} server_reconnect: reconnect race detected (cs={})" + " and win, reusing existing {}", + conn, reconnect.connect_seq(), *existing_conn); + return reuse_connection( + existing_proto, false, + true, reconnect.connect_seq(), reconnect.msg_seq()); + } else { + logger().warn("{} server_reconnect: reconnect race detected (cs={})" + " and lose to existing {}, ask client to wait", + conn, reconnect.connect_seq(), *existing_conn); + return send_wait(); + } + } else { // existing_proto->connect_seq < reconnect.connect_seq() + logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({})," + " reusing existing {}", + conn, existing_proto->connect_seq, + reconnect.connect_seq(), *existing_conn); + return reuse_connection( + existing_proto, false, + true, reconnect.connect_seq(), reconnect.msg_seq()); + } + }); +} + +void ProtocolV2::execute_accepting() +{ + trigger_state(state_t::ACCEPTING, write_state_t::none, false); + gate.dispatch_in_background("execute_accepting", *this, [this] { + return seastar::futurize_invoke([this] { + INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED); + auth_meta = seastar::make_lw_shared<AuthConnectionMeta>(); + session_stream_handlers = { nullptr, nullptr }; + enable_recording(); + return banner_exchange(false); + }).then([this] (auto&& ret) { + auto [_peer_type, _my_addr_from_peer] = std::move(ret); + ceph_assert(conn.get_peer_type() == 0); + conn.set_peer_type(_peer_type); + + conn.policy = messenger.get_policy(_peer_type); + logger().info("{} UPDATE: peer_type={}," + " policy(lossy={} server={} standby={} resetcheck={})", + conn, ceph_entity_type_name(_peer_type), + conn.policy.lossy, conn.policy.server, + conn.policy.standby, conn.policy.resetcheck); + if (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() || + messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce()) { + logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}", + conn, _my_addr_from_peer, messenger.get_myaddr()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + return messenger.learned_addr(_my_addr_from_peer, conn); + }).then([this] { + return server_auth(); + }).then([this] { + return read_main_preamble(); + }).then([this] (Tag tag) { + switch (tag) { + case Tag::CLIENT_IDENT: + return server_connect(); + case Tag::SESSION_RECONNECT: + return server_reconnect(); + default: { + unexpected_tag(tag, conn, "post_server_auth"); + return 
seastar::make_ready_future<next_step_t>(next_step_t::none); + } + } + }).then([this] (next_step_t next) { + switch (next) { + case next_step_t::ready: + assert(state != state_t::ACCEPTING); + break; + case next_step_t::wait: + if (unlikely(state != state_t::ACCEPTING)) { + logger().debug("{} triggered {} at the end of execute_accepting()", + conn, get_state_name(state)); + abort_protocol(); + } + logger().info("{} execute_accepting(): going to SERVER_WAIT", conn); + execute_server_wait(); + break; + default: + ceph_abort("impossible next step"); + } + }).handle_exception([this] (std::exception_ptr eptr) { + logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}", + conn, get_state_name(state), eptr); + close(false); + }); + }); +} + +// CONNECTING or ACCEPTING state + +seastar::future<> ProtocolV2::finish_auth() +{ + ceph_assert(auth_meta); + + const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() : + auth_meta->session_key.hmac_sha256(nullptr, rxbuf); + auto sig_frame = AuthSignatureFrame::Encode(sig); + ceph_assert(record_io); + record_io = false; + rxbuf.clear(); + logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig); + return write_frame(sig_frame).then([this] { + return read_main_preamble(); + }).then([this] (Tag tag) { + expect_tag(Tag::AUTH_SIGNATURE, tag, conn, "post_finish_auth"); + return read_frame_payload(); + }).then([this] { + // handle_auth_signature() logic + auto sig_frame = AuthSignatureFrame::Decode(rx_segments_data.back()); + logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature()); + + const auto actual_tx_sig = auth_meta->session_key.empty() ? + sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf); + if (sig_frame.signature() != actual_tx_sig) { + logger().warn("{} pre-auth signature mismatch actual_tx_sig={}" + " sig_frame.signature()={}", + conn, actual_tx_sig, sig_frame.signature()); + abort_in_fault(); + } + txbuf.clear(); + }); +} + +// ESTABLISHING + +void ProtocolV2::execute_establishing( + SocketConnectionRef existing_conn, bool dispatch_reset) { + if (unlikely(state != state_t::ACCEPTING)) { + logger().debug("{} triggered {} before execute_establishing()", + conn, get_state_name(state)); + abort_protocol(); + } + + auto accept_me = [this] { + messenger.register_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + messenger.unaccept_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + }; + + trigger_state(state_t::ESTABLISHING, write_state_t::delay, false); + if (existing_conn) { + existing_conn->protocol->close(dispatch_reset, std::move(accept_me)); + if (unlikely(state != state_t::ESTABLISHING)) { + logger().warn("{} triggered {} during execute_establishing(), " + "the accept event will not be delivered!", + conn, get_state_name(state)); + abort_protocol(); + } + } else { + accept_me(); + } + + dispatchers.ms_handle_accept( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + + gated_execute("execute_establishing", [this] { + return seastar::futurize_invoke([this] { + return send_server_ident(); + }).then([this] { + if (unlikely(state != state_t::ESTABLISHING)) { + logger().debug("{} triggered {} at the end of execute_establishing()", + conn, get_state_name(state)); + abort_protocol(); + } + logger().info("{} established: gs={}, pgs={}, cs={}, client_cookie={}," + " server_cookie={}, in_seq={}, out_seq={}, out_q={}", + conn, global_seq, peer_global_seq, connect_seq, + 
client_cookie, server_cookie, conn.in_seq, + conn.out_seq, conn.out_q.size()); + execute_ready(false); + }).handle_exception([this] (std::exception_ptr eptr) { + if (state != state_t::ESTABLISHING) { + logger().info("{} execute_establishing() protocol aborted at {} -- {}", + conn, get_state_name(state), eptr); + assert(state == state_t::CLOSING || + state == state_t::REPLACING); + return; + } + fault(false, "execute_establishing()", eptr); + }); + }); +} + +// ESTABLISHING or REPLACING state + +seastar::future<> +ProtocolV2::send_server_ident() +{ + // send_server_ident() logic + + // refered to async-conn v2: not assign gs to global_seq + return messenger.get_global_seq().then([this] (auto gs) { + logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq); + + // this is required for the case when this connection is being replaced + requeue_up_to(0); + conn.in_seq = 0; + + if (!conn.policy.lossy) { + server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll); + } + + uint64_t flags = 0; + if (conn.policy.lossy) { + flags = flags | CEPH_MSG_CONNECT_LOSSY; + } + + auto server_ident = ServerIdentFrame::Encode( + messenger.get_myaddrs(), + messenger.get_myname().num(), + gs, + conn.policy.features_supported, + conn.policy.features_required | msgr2_required, + flags, + server_cookie); + + logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={}," + " gs={}, features_supported={}, features_required={}," + " flags={}, cookie={}", + conn, messenger.get_myaddrs(), messenger.get_myname().num(), + gs, conn.policy.features_supported, + conn.policy.features_required | msgr2_required, + flags, server_cookie); + + conn.set_features(connection_features); + + return write_frame(server_ident); + }); +} + +// REPLACING state + +void ProtocolV2::trigger_replacing(bool reconnect, + bool do_reset, + SocketRef&& new_socket, + AuthConnectionMetaRef&& new_auth_meta, + ceph::crypto::onwire::rxtx_t new_rxtx, + uint64_t new_peer_global_seq, + uint64_t new_client_cookie, + entity_name_t new_peer_name, + uint64_t new_conn_features, + bool tx_is_rev1, + bool rx_is_rev1, + uint64_t new_connect_seq, + uint64_t new_msg_seq) +{ + trigger_state(state_t::REPLACING, write_state_t::delay, false); + if (socket) { + socket->shutdown(); + } + dispatchers.ms_handle_accept( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + gate.dispatch_in_background("trigger_replacing", *this, + [this, + reconnect, + do_reset, + new_socket = std::move(new_socket), + new_auth_meta = std::move(new_auth_meta), + new_rxtx = std::move(new_rxtx), + tx_is_rev1, rx_is_rev1, + new_client_cookie, new_peer_name, + new_conn_features, new_peer_global_seq, + new_connect_seq, new_msg_seq] () mutable { + return wait_write_exit().then([this, do_reset] { + if (do_reset) { + reset_session(true); + } + protocol_timer.cancel(); + return execution_done.get_future(); + }).then([this, + reconnect, + new_socket = std::move(new_socket), + new_auth_meta = std::move(new_auth_meta), + new_rxtx = std::move(new_rxtx), + tx_is_rev1, rx_is_rev1, + new_client_cookie, new_peer_name, + new_conn_features, new_peer_global_seq, + new_connect_seq, new_msg_seq] () mutable { + if (unlikely(state != state_t::REPLACING)) { + return new_socket->close().then([sock = std::move(new_socket)] { + abort_protocol(); + }); + } + + if (socket) { + gate.dispatch_in_background("close_socket_replacing", *this, + [sock = std::move(socket)] () mutable { + return sock->close().then([sock = std::move(sock)] {}); + }); + } + socket = std::move(new_socket); 
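+      // From this point the replaced connection runs on the state handed over
+      // from the newly accepted connection: the socket above plus auth_meta,
+      // the onwire crypto handlers and peer_global_seq below.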
+ auth_meta = std::move(new_auth_meta); + session_stream_handlers = std::move(new_rxtx); + record_io = false; + peer_global_seq = new_peer_global_seq; + + if (reconnect) { + connect_seq = new_connect_seq; + // send_reconnect_ok() logic + requeue_up_to(new_msg_seq); + auto reconnect_ok = ReconnectOkFrame::Encode(conn.in_seq); + logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, conn.in_seq); + return write_frame(reconnect_ok); + } else { + client_cookie = new_client_cookie; + assert(conn.get_peer_type() == new_peer_name.type()); + if (conn.get_peer_id() == entity_name_t::NEW) { + conn.set_peer_id(new_peer_name.num()); + } + connection_features = new_conn_features; + tx_frame_asm.set_is_rev1(tx_is_rev1); + rx_frame_asm.set_is_rev1(rx_is_rev1); + return send_server_ident(); + } + }).then([this, reconnect] { + if (unlikely(state != state_t::REPLACING)) { + logger().debug("{} triggered {} at the end of trigger_replacing()", + conn, get_state_name(state)); + abort_protocol(); + } + logger().info("{} replaced ({}):" + " gs={}, pgs={}, cs={}, client_cookie={}, server_cookie={}," + " in_seq={}, out_seq={}, out_q={}", + conn, reconnect ? "reconnected" : "connected", + global_seq, peer_global_seq, connect_seq, client_cookie, + server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size()); + execute_ready(false); + }).handle_exception([this] (std::exception_ptr eptr) { + if (state != state_t::REPLACING) { + logger().info("{} trigger_replacing(): protocol aborted at {} -- {}", + conn, get_state_name(state), eptr); + assert(state == state_t::CLOSING); + return; + } + fault(true, "trigger_replacing()", eptr); + }); + }); +} + +// READY state + +ceph::bufferlist ProtocolV2::do_sweep_messages( + const std::deque<MessageRef>& msgs, + size_t num_msgs, + bool require_keepalive, + std::optional<utime_t> _keepalive_ack, + bool require_ack) +{ + ceph::bufferlist bl; + + if (unlikely(require_keepalive)) { + auto keepalive_frame = KeepAliveFrame::Encode(); + bl.append(keepalive_frame.get_buffer(tx_frame_asm)); + INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE); + } + + if (unlikely(_keepalive_ack.has_value())) { + auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack); + bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm)); + INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE); + } + + if (require_ack && !num_msgs) { + auto ack_frame = AckFrame::Encode(conn.in_seq); + bl.append(ack_frame.get_buffer(tx_frame_asm)); + INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE); + } + + std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) { + // TODO: move to common code + // set priority + msg->get_header().src = messenger.get_myname(); + + msg->encode(conn.features, 0); + + ceph_assert(!msg->get_seq() && "message already has seq"); + msg->set_seq(++conn.out_seq); + + ceph_msg_header &header = msg->get_header(); + ceph_msg_footer &footer = msg->get_footer(); + + ceph_msg_header2 header2{header.seq, header.tid, + header.type, header.priority, + header.version, + init_le32(0), header.data_off, + init_le64(conn.in_seq), + footer.flags, header.compat_version, + header.reserved}; + + auto message = MessageFrame::Encode(header2, + msg->get_payload(), msg->get_middle(), msg->get_data()); + logger().debug("{} --> #{} === {} ({})", + conn, msg->get_seq(), *msg, msg->get_type()); + bl.append(message.get_buffer(tx_frame_asm)); + INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE); + }); + + return bl; +} + 
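+// A minimal usage sketch (illustrative only, not part of this file) of how a
+// message reaches do_sweep_messages() above: Connection::send() hands the
+// MessageRef to the protocol, which queues it on conn.out_q and wakes the
+// shared write loop; the loop then batches the pending queue -- plus any
+// piggy-backed ack of conn.in_seq -- into one bufferlist via
+// do_sweep_messages() before a single socket write.  A hypothetical caller
+// therefore only needs:
+//
+//   seastar::future<> send_one(crimson::net::ConnectionRef conn,
+//                              MessageRef msg) {
+//     return conn->send(std::move(msg));  // queued, swept, then written
+//   }
+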
+seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) +{ + return read_frame_payload() + .then([this, throttle_stamp] { + utime_t recv_stamp{seastar::lowres_system_clock::now()}; + + // we need to get the size before std::moving segments data + const size_t cur_msg_size = get_current_msg_size(); + auto msg_frame = MessageFrame::Decode(rx_segments_data); + // XXX: paranoid copy just to avoid oops + ceph_msg_header2 current_header = msg_frame.header(); + + logger().trace("{} got {} + {} + {} byte message," + " envelope type={} src={} off={} seq={}", + conn, msg_frame.front_len(), msg_frame.middle_len(), + msg_frame.data_len(), current_header.type, conn.get_peer_name(), + current_header.data_off, current_header.seq); + + ceph_msg_header header{current_header.seq, + current_header.tid, + current_header.type, + current_header.priority, + current_header.version, + init_le32(msg_frame.front_len()), + init_le32(msg_frame.middle_len()), + init_le32(msg_frame.data_len()), + current_header.data_off, + conn.get_peer_name(), + current_header.compat_version, + current_header.reserved, + init_le32(0)}; + ceph_msg_footer footer{init_le32(0), init_le32(0), + init_le32(0), init_le64(0), current_header.flags}; + + auto conn_ref = seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this()); + Message *message = decode_message(nullptr, 0, header, footer, + msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref); + if (!message) { + logger().warn("{} decode message failed", conn); + abort_in_fault(); + } + + // store reservation size in message, so we don't get confused + // by messages entering the dispatch queue through other paths. + message->set_dispatch_throttle_size(cur_msg_size); + + message->set_throttle_stamp(throttle_stamp); + message->set_recv_stamp(recv_stamp); + message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()}); + + // check received seq#. if it is old, drop the message. + // note that incoming messages may skip ahead. this is convenient for the + // client side queueing because messages can't be renumbered, but the (kernel) + // client will occasionally pull a message out of the sent queue to send + // elsewhere. in that case it doesn't matter if we "got" it or not. + uint64_t cur_seq = conn.in_seq; + if (message->get_seq() <= cur_seq) { + logger().error("{} got old message {} <= {} {}, discarding", + conn, message->get_seq(), cur_seq, *message); + if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) && + local_conf()->ms_die_on_old_message) { + ceph_assert(0 == "old msgs despite reconnect_seq feature"); + } + return seastar::now(); + } else if (message->get_seq() > cur_seq + 1) { + logger().error("{} missed message? skipped from seq {} to {}", + conn, cur_seq, message->get_seq()); + if (local_conf()->ms_die_on_skipped_message) { + ceph_assert(0 == "skipped incoming seq"); + } + } + + // note last received message. 
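+    // (in_seq is advanced before dispatch so that the ack for this message can
+    //  be piggy-backed on the next outgoing frame -- see notify_ack() below and
+    //  the ack_seq / AckFrame handling in do_sweep_messages().)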
+ conn.in_seq = message->get_seq(); + logger().debug("{} <== #{} === {} ({})", + conn, message->get_seq(), *message, message->get_type()); + notify_ack(); + ack_writes(current_header.ack_seq); + + // TODO: change MessageRef with seastar::shared_ptr + auto msg_ref = MessageRef{message, false}; + // throttle the reading process by the returned future + return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref)); + }); +} + +void ProtocolV2::execute_ready(bool dispatch_connect) +{ + assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0)); + trigger_state(state_t::READY, write_state_t::open, false); + if (dispatch_connect) { + dispatchers.ms_handle_connect( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + } +#ifdef UNIT_TESTS_BUILT + if (conn.interceptor) { + conn.interceptor->register_conn_ready(conn); + } +#endif + gated_execute("execute_ready", [this] { + protocol_timer.cancel(); + return seastar::keep_doing([this] { + return read_main_preamble() + .then([this] (Tag tag) { + switch (tag) { + case Tag::MESSAGE: { + return seastar::futurize_invoke([this] { + // throttle_message() logic + if (!conn.policy.throttler_messages) { + return seastar::now(); + } + // TODO: message throttler + ceph_assert(false); + return seastar::now(); + }).then([this] { + // throttle_bytes() logic + if (!conn.policy.throttler_bytes) { + return seastar::now(); + } + size_t cur_msg_size = get_current_msg_size(); + if (!cur_msg_size) { + return seastar::now(); + } + logger().trace("{} wants {} bytes from policy throttler {}/{}", + conn, cur_msg_size, + conn.policy.throttler_bytes->get_current(), + conn.policy.throttler_bytes->get_max()); + return conn.policy.throttler_bytes->get(cur_msg_size); + }).then([this] { + // TODO: throttle_dispatch_queue() logic + utime_t throttle_stamp{seastar::lowres_system_clock::now()}; + return read_message(throttle_stamp); + }); + } + case Tag::ACK: + return read_frame_payload().then([this] { + // handle_message_ack() logic + auto ack = AckFrame::Decode(rx_segments_data.back()); + logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq()); + ack_writes(ack.seq()); + }); + case Tag::KEEPALIVE2: + return read_frame_payload().then([this] { + // handle_keepalive2() logic + auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back()); + logger().debug("{} GOT KeepAliveFrame: timestamp={}", + conn, keepalive_frame.timestamp()); + notify_keepalive_ack(keepalive_frame.timestamp()); + conn.set_last_keepalive(seastar::lowres_system_clock::now()); + }); + case Tag::KEEPALIVE2_ACK: + return read_frame_payload().then([this] { + // handle_keepalive2_ack() logic + auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back()); + conn.set_last_keepalive_ack( + seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()}); + logger().debug("{} GOT KeepAliveFrameAck: timestamp={}", + conn, conn.last_keepalive_ack); + }); + default: { + unexpected_tag(tag, conn, "execute_ready"); + return seastar::now(); + } + } + }); + }).handle_exception([this] (std::exception_ptr eptr) { + if (state != state_t::READY) { + logger().info("{} execute_ready(): protocol aborted at {} -- {}", + conn, get_state_name(state), eptr); + assert(state == state_t::REPLACING || + state == state_t::CLOSING); + return; + } + fault(false, "execute_ready()", eptr); + }); + }); +} + +// STANDBY state + +void ProtocolV2::execute_standby() +{ + trigger_state(state_t::STANDBY, write_state_t::delay, true); + if (socket) { + socket->shutdown(); + } +} + +void 
ProtocolV2::notify_write() +{ + if (unlikely(state == state_t::STANDBY && !conn.policy.server)) { + logger().info("{} notify_write(): at {}, going to CONNECTING", + conn, get_state_name(state)); + execute_connecting(); + } +} + +// WAIT state + +void ProtocolV2::execute_wait(bool max_backoff) +{ + trigger_state(state_t::WAIT, write_state_t::delay, true); + if (socket) { + socket->shutdown(); + } + gated_execute("execute_wait", [this, max_backoff] { + double backoff = protocol_timer.last_dur(); + if (max_backoff) { + backoff = local_conf().get_val<double>("ms_max_backoff"); + } else if (backoff > 0) { + backoff = std::min(local_conf().get_val<double>("ms_max_backoff"), 2 * backoff); + } else { + backoff = local_conf().get_val<double>("ms_initial_backoff"); + } + return protocol_timer.backoff(backoff).then([this] { + if (unlikely(state != state_t::WAIT)) { + logger().debug("{} triggered {} at the end of execute_wait()", + conn, get_state_name(state)); + abort_protocol(); + } + logger().info("{} execute_wait(): going to CONNECTING", conn); + execute_connecting(); + }).handle_exception([this] (std::exception_ptr eptr) { + logger().info("{} execute_wait(): protocol aborted at {} -- {}", + conn, get_state_name(state), eptr); + assert(state == state_t::REPLACING || + state == state_t::CLOSING); + }); + }); +} + +// SERVER_WAIT state + +void ProtocolV2::execute_server_wait() +{ + trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false); + gated_execute("execute_server_wait", [this] { + return read_exactly(1).then([this] (auto bl) { + logger().warn("{} SERVER_WAIT got read, abort", conn); + abort_in_fault(); + }).handle_exception([this] (std::exception_ptr eptr) { + logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}", + conn, get_state_name(state), eptr); + close(false); + }); + }); +} + +// CLOSING state + +void ProtocolV2::trigger_close() +{ + messenger.closing_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + + if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) { + messenger.unaccept_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) { + messenger.unregister_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + } else { + // cannot happen + ceph_assert(false); + } + + protocol_timer.cancel(); + trigger_state(state_t::CLOSING, write_state_t::drop, false); +} + +void ProtocolV2::on_closed() +{ + messenger.closed_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); +} + +void ProtocolV2::print(std::ostream& out) const +{ + out << conn; +} + +} // namespace crimson::net diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h new file mode 100644 index 000000000..be9a22816 --- /dev/null +++ b/src/crimson/net/ProtocolV2.h @@ -0,0 +1,225 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/sleep.hh> + +#include "Protocol.h" +#include "msg/async/frames_v2.h" +#include "msg/async/crypto_onwire.h" + +namespace crimson::net { + +class ProtocolV2 final : public Protocol { + public: + ProtocolV2(ChainedDispatchers& dispatchers, + SocketConnection& conn, + SocketMessenger& messenger); + ~ProtocolV2() override; + void print(std::ostream&) const final; + private: + void on_closed() override; + bool is_connected() const override; + + void 
start_connect(const entity_addr_t& peer_addr, + const entity_name_t& peer_name) override; + + void start_accept(SocketRef&& socket, + const entity_addr_t& peer_addr) override; + + void trigger_close() override; + + ceph::bufferlist do_sweep_messages( + const std::deque<MessageRef>& msgs, + size_t num_msgs, + bool require_keepalive, + std::optional<utime_t> keepalive_ack, + bool require_ack) override; + + void notify_write() override; + + private: + SocketMessenger &messenger; + + enum class state_t { + NONE = 0, + ACCEPTING, + SERVER_WAIT, + ESTABLISHING, + CONNECTING, + READY, + STANDBY, + WAIT, + REPLACING, + CLOSING + }; + state_t state = state_t::NONE; + + static const char *get_state_name(state_t state) { + const char *const statenames[] = {"NONE", + "ACCEPTING", + "SERVER_WAIT", + "ESTABLISHING", + "CONNECTING", + "READY", + "STANDBY", + "WAIT", + "REPLACING", + "CLOSING"}; + return statenames[static_cast<int>(state)]; + } + + void trigger_state(state_t state, write_state_t write_state, bool reentrant); + + uint64_t connection_features = 0; + uint64_t peer_required_features = 0; + + uint64_t client_cookie = 0; + uint64_t server_cookie = 0; + uint64_t global_seq = 0; + uint64_t peer_global_seq = 0; + uint64_t connect_seq = 0; + + seastar::shared_future<> execution_done = seastar::now(); + + template <typename Func> + void gated_execute(const char* what, Func&& func) { + gate.dispatch_in_background(what, *this, [this, &func] { + execution_done = seastar::futurize_invoke(std::forward<Func>(func)); + return execution_done.get_future(); + }); + } + + class Timer { + double last_dur_ = 0.0; + const SocketConnection& conn; + std::optional<seastar::abort_source> as; + public: + Timer(SocketConnection& conn) : conn(conn) {} + double last_dur() const { return last_dur_; } + seastar::future<> backoff(double seconds); + void cancel() { + last_dur_ = 0.0; + if (as) { + as->request_abort(); + as = std::nullopt; + } + } + }; + Timer protocol_timer; + + // TODO: Frame related implementations, probably to a separate class. 
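+  //
+  // While recording is enabled, record_io/rxbuf/txbuf below capture the
+  // pre-auth frame bytes in both directions; finish_auth() HMACs them with
+  // the negotiated session key for the AuthSignatureFrame exchange and then
+  // clears them.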
+ private: + bool record_io = false; + ceph::bufferlist rxbuf; + ceph::bufferlist txbuf; + + void enable_recording(); + seastar::future<Socket::tmp_buf> read_exactly(size_t bytes); + seastar::future<bufferlist> read(size_t bytes); + seastar::future<> write(bufferlist&& buf); + seastar::future<> write_flush(bufferlist&& buf); + + ceph::crypto::onwire::rxtx_t session_stream_handlers; + ceph::msgr::v2::FrameAssembler tx_frame_asm{&session_stream_handlers, false}; + ceph::msgr::v2::FrameAssembler rx_frame_asm{&session_stream_handlers, false}; + ceph::bufferlist rx_preamble; + ceph::msgr::v2::segment_bls_t rx_segments_data; + + size_t get_current_msg_size() const; + seastar::future<ceph::msgr::v2::Tag> read_main_preamble(); + seastar::future<> read_frame_payload(); + template <class F> + seastar::future<> write_frame(F &frame, bool flush=true); + + private: + void fault(bool backoff, const char* func_name, std::exception_ptr eptr); + void reset_session(bool full); + seastar::future<std::tuple<entity_type_t, entity_addr_t>> + banner_exchange(bool is_connect); + + enum class next_step_t { + ready, + wait, + none, // protocol should have been aborted or failed + }; + + // CONNECTING (client) + seastar::future<> handle_auth_reply(); + inline seastar::future<> client_auth() { + std::vector<uint32_t> empty; + return client_auth(empty); + } + seastar::future<> client_auth(std::vector<uint32_t> &allowed_methods); + + seastar::future<next_step_t> process_wait(); + seastar::future<next_step_t> client_connect(); + seastar::future<next_step_t> client_reconnect(); + void execute_connecting(); + + // ACCEPTING (server) + seastar::future<> _auth_bad_method(int r); + seastar::future<> _handle_auth_request(bufferlist& auth_payload, bool more); + seastar::future<> server_auth(); + + bool validate_peer_name(const entity_name_t& peer_name) const; + seastar::future<next_step_t> send_wait(); + seastar::future<next_step_t> reuse_connection(ProtocolV2* existing_proto, + bool do_reset=false, + bool reconnect=false, + uint64_t conn_seq=0, + uint64_t msg_seq=0); + + seastar::future<next_step_t> handle_existing_connection(SocketConnectionRef existing_conn); + seastar::future<next_step_t> server_connect(); + + seastar::future<next_step_t> read_reconnect(); + seastar::future<next_step_t> send_retry(uint64_t connect_seq); + seastar::future<next_step_t> send_retry_global(uint64_t global_seq); + seastar::future<next_step_t> send_reset(bool full); + seastar::future<next_step_t> server_reconnect(); + + void execute_accepting(); + + // CONNECTING/ACCEPTING + seastar::future<> finish_auth(); + + // ESTABLISHING + void execute_establishing(SocketConnectionRef existing_conn, bool dispatch_reset); + + // ESTABLISHING/REPLACING (server) + seastar::future<> send_server_ident(); + + // REPLACING (server) + void trigger_replacing(bool reconnect, + bool do_reset, + SocketRef&& new_socket, + AuthConnectionMetaRef&& new_auth_meta, + ceph::crypto::onwire::rxtx_t new_rxtx, + uint64_t new_peer_global_seq, + // !reconnect + uint64_t new_client_cookie, + entity_name_t new_peer_name, + uint64_t new_conn_features, + bool tx_is_rev1, + bool rx_is_rev1, + // reconnect + uint64_t new_connect_seq, + uint64_t new_msg_seq); + + // READY + seastar::future<> read_message(utime_t throttle_stamp); + void execute_ready(bool dispatch_connect); + + // STANDBY + void execute_standby(); + + // WAIT + void execute_wait(bool max_backoff); + + // SERVER_WAIT + void execute_server_wait(); +}; + +} // namespace crimson::net diff --git a/src/crimson/net/Socket.cc 
b/src/crimson/net/Socket.cc new file mode 100644 index 000000000..8ad106dbd --- /dev/null +++ b/src/crimson/net/Socket.cc @@ -0,0 +1,276 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Socket.h" + +#include <seastar/core/when_all.hh> + +#include "crimson/common/log.h" +#include "Errors.h" + +namespace crimson::net { + +namespace { + +seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); +} + +// an input_stream consumer that reads buffer segments into a bufferlist up to +// the given number of remaining bytes +struct bufferlist_consumer { + bufferlist& bl; + size_t& remaining; + + bufferlist_consumer(bufferlist& bl, size_t& remaining) + : bl(bl), remaining(remaining) {} + + using tmp_buf = seastar::temporary_buffer<char>; + using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type; + + // consume some or all of a buffer segment + seastar::future<consumption_result_type> operator()(tmp_buf&& data) { + if (remaining >= data.size()) { + // consume the whole buffer + remaining -= data.size(); + bl.append(buffer::create_foreign(std::move(data))); + if (remaining > 0) { + // return none to request more segments + return seastar::make_ready_future<consumption_result_type>( + seastar::continue_consuming{}); + } else { + // return an empty buffer to singal that we're done + return seastar::make_ready_future<consumption_result_type>( + consumption_result_type::stop_consuming_type({})); + } + } + if (remaining > 0) { + // consume the front + bl.append(buffer::create_foreign(data.share(0, remaining))); + data.trim_front(remaining); + remaining = 0; + } + // give the rest back to signal that we're done + return seastar::make_ready_future<consumption_result_type>( + consumption_result_type::stop_consuming_type{std::move(data)}); + }; +}; + +} // anonymous namespace + +seastar::future<bufferlist> Socket::read(size_t bytes) +{ +#ifdef UNIT_TESTS_BUILT + return try_trap_pre(next_trap_read).then([bytes, this] { +#endif + if (bytes == 0) { + return seastar::make_ready_future<bufferlist>(); + } + r.buffer.clear(); + r.remaining = bytes; + return in.consume(bufferlist_consumer{r.buffer, r.remaining}).then([this] { + if (r.remaining) { // throw on short reads + throw std::system_error(make_error_code(error::read_eof)); + } + return seastar::make_ready_future<bufferlist>(std::move(r.buffer)); + }); +#ifdef UNIT_TESTS_BUILT + }).then([this] (auto buf) { + return try_trap_post(next_trap_read + ).then([buf = std::move(buf)] () mutable { + return std::move(buf); + }); + }); +#endif +} + +seastar::future<seastar::temporary_buffer<char>> +Socket::read_exactly(size_t bytes) { +#ifdef UNIT_TESTS_BUILT + return try_trap_pre(next_trap_read).then([bytes, this] { +#endif + if (bytes == 0) { + return seastar::make_ready_future<seastar::temporary_buffer<char>>(); + } + return in.read_exactly(bytes).then([](auto buf) { + if (buf.empty()) { + throw std::system_error(make_error_code(error::read_eof)); + } + return seastar::make_ready_future<tmp_buf>(std::move(buf)); + }); +#ifdef UNIT_TESTS_BUILT + }).then([this] (auto buf) { + return try_trap_post(next_trap_read + ).then([buf = std::move(buf)] () mutable { + return std::move(buf); + }); + }); +#endif +} + +void Socket::shutdown() { + socket.shutdown_input(); + socket.shutdown_output(); +} + +static inline seastar::future<> +close_and_handle_errors(seastar::output_stream<char>& out) +{ + return out.close().handle_exception_type([] (const std::system_error& 
e) { + if (e.code() != std::errc::broken_pipe && + e.code() != std::errc::connection_reset) { + logger().error("Socket::close(): unexpected error {}", e); + ceph_abort(); + } + // can happen when out is already shutdown, ignore + }); +} + +seastar::future<> Socket::close() { +#ifndef NDEBUG + ceph_assert(!closed); + closed = true; +#endif + return seastar::when_all_succeed( + in.close(), + close_and_handle_errors(out) + ).then_unpack([] { + return seastar::make_ready_future<>(); + }).handle_exception([] (auto eptr) { + logger().error("Socket::close(): unexpected exception {}", eptr); + ceph_abort(); + }); +} + +#ifdef UNIT_TESTS_BUILT +seastar::future<> Socket::try_trap_pre(bp_action_t& trap) { + auto action = trap; + trap = bp_action_t::CONTINUE; + switch (action) { + case bp_action_t::CONTINUE: + break; + case bp_action_t::FAULT: + logger().info("[Test] got FAULT"); + throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); + case bp_action_t::BLOCK: + logger().info("[Test] got BLOCK"); + return blocker->block(); + case bp_action_t::STALL: + trap = action; + break; + default: + ceph_abort("unexpected action from trap"); + } + return seastar::make_ready_future<>(); +} + +seastar::future<> Socket::try_trap_post(bp_action_t& trap) { + auto action = trap; + trap = bp_action_t::CONTINUE; + switch (action) { + case bp_action_t::CONTINUE: + break; + case bp_action_t::STALL: + logger().info("[Test] got STALL and block"); + shutdown(); + return blocker->block(); + default: + ceph_abort("unexpected action from trap"); + } + return seastar::make_ready_future<>(); +} + +void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) { + blocker = blocker_; + if (type == bp_type_t::READ) { + ceph_assert(next_trap_read == bp_action_t::CONTINUE); + next_trap_read = action; + } else { // type == bp_type_t::WRITE + if (next_trap_write == bp_action_t::CONTINUE) { + next_trap_write = action; + } else if (next_trap_write == bp_action_t::FAULT) { + // do_sweep_messages() may combine multiple write events into one socket write + ceph_assert(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE); + } else { + ceph_abort(); + } + } +} +#endif + +FixedCPUServerSocket::listen_ertr::future<> +FixedCPUServerSocket::listen(entity_addr_t addr) +{ + assert(seastar::this_shard_id() == cpu); + logger().trace("FixedCPUServerSocket::listen({})...", addr); + return container().invoke_on_all([addr] (auto& ss) { + ss.addr = addr; + seastar::socket_address s_addr(addr.in4_addr()); + seastar::listen_options lo; + lo.reuse_address = true; + lo.set_fixed_cpu(ss.cpu); + ss.listener = seastar::listen(s_addr, lo); + }).then([] { + return true; + }).handle_exception_type([addr] (const std::system_error& e) { + if (e.code() == std::errc::address_in_use) { + logger().trace("FixedCPUServerSocket::listen({}): address in use", addr); + } else { + logger().error("FixedCPUServerSocket::listen({}): " + "got unexpeted error {}", addr, e); + ceph_abort(); + } + return false; + }).then([] (bool success) -> listen_ertr::future<> { + if (success) { + return listen_ertr::now(); + } else { + return crimson::ct_error::address_in_use::make(); + } + }); +} + +seastar::future<> FixedCPUServerSocket::shutdown() +{ + assert(seastar::this_shard_id() == cpu); + logger().trace("FixedCPUServerSocket({})::shutdown()...", addr); + return container().invoke_on_all([] (auto& ss) { + if (ss.listener) { + ss.listener->abort_accept(); + } + return ss.shutdown_gate.close(); + }).then([this] { + return 
reset(); + }); +} + +seastar::future<> FixedCPUServerSocket::destroy() +{ + assert(seastar::this_shard_id() == cpu); + return shutdown().then([this] { + // we should only construct/stop shards on #0 + return container().invoke_on(0, [] (auto& ss) { + assert(ss.service); + return ss.service->stop().finally([cleanup = std::move(ss.service)] {}); + }); + }); +} + +seastar::future<FixedCPUServerSocket*> FixedCPUServerSocket::create() +{ + auto cpu = seastar::this_shard_id(); + // we should only construct/stop shards on #0 + return seastar::smp::submit_to(0, [cpu] { + auto service = std::make_unique<sharded_service_t>(); + return service->start(cpu, construct_tag{} + ).then([service = std::move(service)] () mutable { + auto p_shard = service.get(); + p_shard->local().service = std::move(service); + return p_shard; + }); + }).then([] (auto p_shard) { + return &p_shard->local(); + }); +} + +} // namespace crimson::net diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h new file mode 100644 index 000000000..d39a2517f --- /dev/null +++ b/src/crimson/net/Socket.h @@ -0,0 +1,268 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/gate.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/sharded.hh> +#include <seastar/net/packet.hh> + +#include "include/buffer.h" + +#include "crimson/common/log.h" +#include "Errors.h" +#include "Fwd.h" + +#ifdef UNIT_TESTS_BUILT +#include "Interceptor.h" +#endif + +namespace crimson::net { + +class Socket; +using SocketRef = std::unique_ptr<Socket>; + +class Socket +{ + struct construct_tag {}; + + public: + // if acceptor side, peer is using a different port (ephemeral_port) + // if connector side, I'm using a different port (ephemeral_port) + enum class side_t { + acceptor, + connector + }; + + Socket(seastar::connected_socket&& _socket, side_t _side, uint16_t e_port, construct_tag) + : sid{seastar::this_shard_id()}, + socket(std::move(_socket)), + in(socket.input()), + // the default buffer size 8192 is too small that may impact our write + // performance. 
see seastar::net::connected_socket::output() + out(socket.output(65536)), + side(_side), + ephemeral_port(e_port) {} + + ~Socket() { +#ifndef NDEBUG + assert(closed); +#endif + } + + Socket(Socket&& o) = delete; + + static seastar::future<SocketRef> + connect(const entity_addr_t& peer_addr) { + return seastar::connect(peer_addr.in4_addr() + ).then([] (seastar::connected_socket socket) { + return std::make_unique<Socket>( + std::move(socket), side_t::connector, 0, construct_tag{}); + }); + } + + /// read the requested number of bytes into a bufferlist + seastar::future<bufferlist> read(size_t bytes); + using tmp_buf = seastar::temporary_buffer<char>; + using packet = seastar::net::packet; + seastar::future<tmp_buf> read_exactly(size_t bytes); + + seastar::future<> write(packet&& buf) { +#ifdef UNIT_TESTS_BUILT + return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable { +#endif + return out.write(std::move(buf)); +#ifdef UNIT_TESTS_BUILT + }).then([this] { + return try_trap_post(next_trap_write); + }); +#endif + } + seastar::future<> flush() { + return out.flush(); + } + seastar::future<> write_flush(packet&& buf) { +#ifdef UNIT_TESTS_BUILT + return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable { +#endif + return out.write(std::move(buf)).then([this] { return out.flush(); }); +#ifdef UNIT_TESTS_BUILT + }).then([this] { + return try_trap_post(next_trap_write); + }); +#endif + } + + // preemptively disable further reads or writes, can only be shutdown once. + void shutdown(); + + /// Socket can only be closed once. + seastar::future<> close(); + + // shutdown input_stream only, for tests + void force_shutdown_in() { + socket.shutdown_input(); + } + + // shutdown output_stream only, for tests + void force_shutdown_out() { + socket.shutdown_output(); + } + + side_t get_side() const { + return side; + } + + uint16_t get_ephemeral_port() const { + return ephemeral_port; + } + + // learn my ephemeral_port as connector. + // unfortunately, there's no way to identify which port I'm using as + // connector with current seastar interface. 
+ void learn_ephemeral_port_as_connector(uint16_t port) { + assert(side == side_t::connector && + (ephemeral_port == 0 || ephemeral_port == port)); + ephemeral_port = port; + } + + private: + const seastar::shard_id sid; + seastar::connected_socket socket; + seastar::input_stream<char> in; + seastar::output_stream<char> out; + side_t side; + uint16_t ephemeral_port; + +#ifndef NDEBUG + bool closed = false; +#endif + + /// buffer state for read() + struct { + bufferlist buffer; + size_t remaining; + } r; + +#ifdef UNIT_TESTS_BUILT + public: + void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_); + + private: + bp_action_t next_trap_read = bp_action_t::CONTINUE; + bp_action_t next_trap_write = bp_action_t::CONTINUE; + socket_blocker* blocker = nullptr; + seastar::future<> try_trap_pre(bp_action_t& trap); + seastar::future<> try_trap_post(bp_action_t& trap); + +#endif + friend class FixedCPUServerSocket; +}; + +class FixedCPUServerSocket + : public seastar::peering_sharded_service<FixedCPUServerSocket> { + const seastar::shard_id cpu; + entity_addr_t addr; + std::optional<seastar::server_socket> listener; + seastar::gate shutdown_gate; + + using sharded_service_t = seastar::sharded<FixedCPUServerSocket>; + std::unique_ptr<sharded_service_t> service; + + struct construct_tag {}; + + static seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); + } + + seastar::future<> reset() { + return container().invoke_on_all([] (auto& ss) { + assert(ss.shutdown_gate.is_closed()); + ss.shutdown_gate = seastar::gate(); + ss.addr = entity_addr_t(); + ss.listener.reset(); + }); + } + +public: + FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {} + ~FixedCPUServerSocket() { + assert(!listener); + // detect whether user have called destroy() properly + ceph_assert(!service); + } + + FixedCPUServerSocket(FixedCPUServerSocket&&) = delete; + FixedCPUServerSocket(const FixedCPUServerSocket&) = delete; + FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete; + + using listen_ertr = crimson::errorator< + crimson::ct_error::address_in_use // The address is already bound + >; + listen_ertr::future<> listen(entity_addr_t addr); + + // fn_accept should be a nothrow function of type + // seastar::future<>(SocketRef, entity_addr_t) + template <typename Func> + seastar::future<> accept(Func&& fn_accept) { + assert(seastar::this_shard_id() == cpu); + logger().trace("FixedCPUServerSocket({})::accept()...", addr); + return container().invoke_on_all( + [fn_accept = std::move(fn_accept)] (auto& ss) mutable { + assert(ss.listener); + // gate accepting + // FixedCPUServerSocket::shutdown() will drain the continuations in the gate + // so ignore the returned future + std::ignore = seastar::with_gate(ss.shutdown_gate, + [&ss, fn_accept = std::move(fn_accept)] () mutable { + return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable { + return ss.listener->accept().then( + [&ss, fn_accept = std::move(fn_accept)] + (seastar::accept_result accept_result) mutable { + // assert seastar::listen_options::set_fixed_cpu() works + assert(seastar::this_shard_id() == ss.cpu); + auto [socket, paddr] = std::move(accept_result); + entity_addr_t peer_addr; + peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); + peer_addr.set_type(entity_addr_t::TYPE_ANY); + SocketRef _socket = std::make_unique<Socket>( + std::move(socket), Socket::side_t::acceptor, + peer_addr.get_port(), Socket::construct_tag{}); + std::ignore = seastar::with_gate(ss.shutdown_gate, + 
[socket = std::move(_socket), peer_addr, + &ss, fn_accept = std::move(fn_accept)] () mutable { + logger().trace("FixedCPUServerSocket({})::accept(): " + "accepted peer {}", ss.addr, peer_addr); + return fn_accept(std::move(socket), peer_addr + ).handle_exception([&ss, peer_addr] (auto eptr) { + logger().error("FixedCPUServerSocket({})::accept(): " + "fn_accept(s, {}) got unexpected exception {}", + ss.addr, peer_addr, eptr); + ceph_abort(); + }); + }); + }); + }).handle_exception_type([&ss] (const std::system_error& e) { + if (e.code() == std::errc::connection_aborted || + e.code() == std::errc::invalid_argument) { + logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})", + ss.addr, e); + } else { + throw; + } + }).handle_exception([&ss] (auto eptr) { + logger().error("FixedCPUServerSocket({})::accept(): " + "got unexpected exception {}", ss.addr, eptr); + ceph_abort(); + }); + }); + }); + } + + seastar::future<> shutdown(); + seastar::future<> destroy(); + static seastar::future<FixedCPUServerSocket*> create(); +}; + +} // namespace crimson::net diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc new file mode 100644 index 000000000..623dca32f --- /dev/null +++ b/src/crimson/net/SocketConnection.cc @@ -0,0 +1,150 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "SocketConnection.h" + +#include "ProtocolV1.h" +#include "ProtocolV2.h" +#include "SocketMessenger.h" + +#ifdef UNIT_TESTS_BUILT +#include "Interceptor.h" +#endif + +using namespace crimson::net; +using crimson::common::local_conf; + +SocketConnection::SocketConnection(SocketMessenger& messenger, + ChainedDispatchers& dispatchers, + bool is_msgr2) + : messenger(messenger) +{ + if (is_msgr2) { + protocol = std::make_unique<ProtocolV2>(dispatchers, *this, messenger); + } else { + protocol = std::make_unique<ProtocolV1>(dispatchers, *this, messenger); + } +#ifdef UNIT_TESTS_BUILT + if (messenger.interceptor) { + interceptor = messenger.interceptor; + interceptor->register_conn(*this); + } +#endif +} + +SocketConnection::~SocketConnection() {} + +crimson::net::Messenger* +SocketConnection::get_messenger() const { + return &messenger; +} + +bool SocketConnection::is_connected() const +{ + assert(seastar::this_shard_id() == shard_id()); + return protocol->is_connected(); +} + +#ifdef UNIT_TESTS_BUILT +bool SocketConnection::is_closed() const +{ + assert(seastar::this_shard_id() == shard_id()); + return protocol->is_closed(); +} + +bool SocketConnection::is_closed_clean() const +{ + assert(seastar::this_shard_id() == shard_id()); + return protocol->is_closed_clean; +} + +#endif +bool SocketConnection::peer_wins() const +{ + return (messenger.get_myaddr() > peer_addr || policy.server); +} + +seastar::future<> SocketConnection::send(MessageRef msg) +{ + assert(seastar::this_shard_id() == shard_id()); + return protocol->send(std::move(msg)); +} + +seastar::future<> SocketConnection::keepalive() +{ + assert(seastar::this_shard_id() == shard_id()); + return protocol->keepalive(); +} + +void SocketConnection::mark_down() +{ + assert(seastar::this_shard_id() == shard_id()); + protocol->close(false); +} + +bool 
SocketConnection::update_rx_seq(seq_num_t seq) +{ + if (seq <= in_seq) { + if (HAVE_FEATURE(features, RECONNECT_SEQ) && + local_conf()->ms_die_on_old_message) { + ceph_abort_msg("old msgs despite reconnect_seq feature"); + } + return false; + } else if (seq > in_seq + 1) { + if (local_conf()->ms_die_on_skipped_message) { + ceph_abort_msg("skipped incoming seq"); + } + return false; + } else { + in_seq = seq; + return true; + } +} + +void +SocketConnection::start_connect(const entity_addr_t& _peer_addr, + const entity_name_t& _peer_name) +{ + protocol->start_connect(_peer_addr, _peer_name); +} + +void +SocketConnection::start_accept(SocketRef&& sock, + const entity_addr_t& _peer_addr) +{ + protocol->start_accept(std::move(sock), _peer_addr); +} + +seastar::future<> +SocketConnection::close_clean(bool dispatch_reset) +{ + return protocol->close_clean(dispatch_reset); +} + +seastar::shard_id SocketConnection::shard_id() const { + return messenger.shard_id(); +} + +void SocketConnection::print(ostream& out) const { + messenger.print(out); + if (!protocol->socket) { + out << " >> " << get_peer_name() << " " << peer_addr; + } else if (protocol->socket->get_side() == Socket::side_t::acceptor) { + out << " >> " << get_peer_name() << " " << peer_addr + << "@" << protocol->socket->get_ephemeral_port(); + } else { // protocol->socket->get_side() == Socket::side_t::connector + out << "@" << protocol->socket->get_ephemeral_port() + << " >> " << get_peer_name() << " " << peer_addr; + } +} diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h new file mode 100644 index 000000000..9c977c7cf --- /dev/null +++ b/src/crimson/net/SocketConnection.h @@ -0,0 +1,106 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <seastar/core/sharded.hh> + +#include "msg/Policy.h" +#include "crimson/common/throttle.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Socket.h" + +namespace crimson::net { + +class Protocol; +class SocketMessenger; +class SocketConnection; +using SocketConnectionRef = seastar::shared_ptr<SocketConnection>; + +class SocketConnection : public Connection { + SocketMessenger& messenger; + std::unique_ptr<Protocol> protocol; + + ceph::net::Policy<crimson::common::Throttle> policy; + + /// the seq num of the last transmitted message + seq_num_t out_seq = 0; + /// the seq num of the last received message + seq_num_t in_seq = 0; + /// update the seq num of last received message + /// @returns true if the @c seq is valid, and @c in_seq is updated, + /// false otherwise. 
+ bool update_rx_seq(seq_num_t seq); + + // messages to be resent after connection gets reset + std::deque<MessageRef> out_q; + std::deque<MessageRef> pending_q; + // messages sent, but not yet acked by peer + std::deque<MessageRef> sent; + + seastar::shard_id shard_id() const; + + public: + SocketConnection(SocketMessenger& messenger, + ChainedDispatchers& dispatchers, + bool is_msgr2); + ~SocketConnection() override; + + Messenger* get_messenger() const override; + + bool is_connected() const override; + +#ifdef UNIT_TESTS_BUILT + bool is_closed_clean() const override; + + bool is_closed() const override; + + bool peer_wins() const override; +#else + bool peer_wins() const; +#endif + + seastar::future<> send(MessageRef msg) override; + + seastar::future<> keepalive() override; + + void mark_down() override; + + void print(ostream& out) const override; + + /// start a handshake from the client's perspective, + /// only call when SocketConnection first construct + void start_connect(const entity_addr_t& peer_addr, + const entity_name_t& peer_name); + /// start a handshake from the server's perspective, + /// only call when SocketConnection first construct + void start_accept(SocketRef&& socket, + const entity_addr_t& peer_addr); + + seastar::future<> close_clean(bool dispatch_reset); + + bool is_server_side() const { + return policy.server; + } + + bool is_lossy() const { + return policy.lossy; + } + + friend class Protocol; + friend class ProtocolV1; + friend class ProtocolV2; +}; + +} // namespace crimson::net diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc new file mode 100644 index 000000000..db9421e79 --- /dev/null +++ b/src/crimson/net/SocketMessenger.cc @@ -0,0 +1,351 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include "SocketMessenger.h" + +#include <tuple> +#include <boost/functional/hash.hpp> + +#include "auth/Auth.h" +#include "Errors.h" +#include "Socket.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); + } +} + +namespace crimson::net { + +SocketMessenger::SocketMessenger(const entity_name_t& myname, + const std::string& logic_name, + uint32_t nonce) + : Messenger{myname}, + master_sid{seastar::this_shard_id()}, + logic_name{logic_name}, + nonce{nonce} +{} + +seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs) +{ + assert(seastar::this_shard_id() == master_sid); + auto my_addrs = addrs; + for (auto& addr : my_addrs.v) { + addr.nonce = nonce; + } + return Messenger::set_myaddrs(my_addrs); +} + +SocketMessenger::bind_ertr::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs) +{ + assert(seastar::this_shard_id() == master_sid); + ceph_assert(addrs.front().get_family() == AF_INET); + return set_myaddrs(addrs).then([this] { + if (!listener) { + return FixedCPUServerSocket::create().then([this] (auto _listener) { + listener = _listener; + }); + } else { + return seastar::now(); + } + }).then([this] () -> bind_ertr::future<> { + const entity_addr_t listen_addr = get_myaddr(); + logger().debug("{} do_bind: try listen {}...", *this, listen_addr); + if (!listener) { + logger().warn("{} do_bind: listener doesn't exist", *this); + return bind_ertr::now(); + } + return listener->listen(listen_addr); + }); +} + +SocketMessenger::bind_ertr::future<> +SocketMessenger::bind(const entity_addrvec_t& addrs) +{ + return do_bind(addrs).safe_then([this] { + logger().info("{} bind: done", *this); + }); +} + +SocketMessenger::bind_ertr::future<> +SocketMessenger::try_bind(const entity_addrvec_t& addrs, + uint32_t min_port, uint32_t max_port) +{ + auto addr = addrs.front(); + if (addr.get_port() != 0) { + return do_bind(addrs).safe_then([this] { + logger().info("{} try_bind: done", *this); + }); + } + ceph_assert(min_port <= max_port); + return seastar::do_with(uint32_t(min_port), + [this, max_port, addr] (auto& port) { + return seastar::repeat_until_value([this, max_port, addr, &port] { + auto to_bind = addr; + to_bind.set_port(port); + return do_bind(entity_addrvec_t{to_bind} + ).safe_then([this] () -> seastar::future<std::optional<bool>> { + logger().info("{} try_bind: done", *this); + return seastar::make_ready_future<std::optional<bool>>( + std::make_optional<bool>(true)); + }, bind_ertr::all_same_way([this, max_port, &port] + (const std::error_code& e) mutable + -> seastar::future<std::optional<bool>> { + assert(e == std::errc::address_in_use); + logger().trace("{} try_bind: {} already used", *this, port); + if (port == max_port) { + return seastar::make_ready_future<std::optional<bool>>( + std::make_optional<bool>(false)); + } + ++port; + return seastar::make_ready_future<std::optional<bool>>(); + })); + }).then([] (bool success) -> bind_ertr::future<> { + if (success) { + return bind_ertr::now(); + } else { + return crimson::ct_error::address_in_use::make(); + } + }); + }); +} + +seastar::future<> SocketMessenger::start( + const dispatchers_t& _dispatchers) { + assert(seastar::this_shard_id() == master_sid); + + dispatchers.assign(_dispatchers); + if (listener) { + // make sure we have already bound to a valid address + ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2()); + ceph_assert(get_myaddr().get_port() > 0); + + return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) { + 
assert(seastar::this_shard_id() == master_sid); + SocketConnectionRef conn = seastar::make_shared<SocketConnection>( + *this, dispatchers, get_myaddr().is_msgr2()); + conn->start_accept(std::move(socket), peer_addr); + return seastar::now(); + }); + } + return seastar::now(); +} + +crimson::net::ConnectionRef +SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name) +{ + assert(seastar::this_shard_id() == master_sid); + + // make sure we connect to a valid peer_addr + ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2()); + ceph_assert(peer_addr.get_port() > 0); + + if (auto found = lookup_conn(peer_addr); found) { + logger().debug("{} connect to existing", *found); + return found->shared_from_this(); + } + SocketConnectionRef conn = seastar::make_shared<SocketConnection>( + *this, dispatchers, peer_addr.is_msgr2()); + conn->start_connect(peer_addr, peer_name); + return conn->shared_from_this(); +} + +seastar::future<> SocketMessenger::shutdown() +{ + assert(seastar::this_shard_id() == master_sid); + return seastar::futurize_invoke([this] { + assert(dispatchers.empty()); + if (listener) { + auto d_listener = listener; + listener = nullptr; + return d_listener->destroy(); + } else { + return seastar::now(); + } + // close all connections + }).then([this] { + return seastar::parallel_for_each(accepting_conns, [] (auto conn) { + return conn->close_clean(false); + }); + }).then([this] { + ceph_assert(accepting_conns.empty()); + return seastar::parallel_for_each(connections, [] (auto conn) { + return conn.second->close_clean(false); + }); + }).then([this] { + return seastar::parallel_for_each(closing_conns, [] (auto conn) { + return conn->close_clean(false); + }); + }).then([this] { + ceph_assert(connections.empty()); + shutdown_promise.set_value(); + }); +} + +seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me, const SocketConnection& conn) +{ + assert(seastar::this_shard_id() == master_sid); + if (!need_addr) { + if ((!get_myaddr().is_any() && + get_myaddr().get_type() != peer_addr_for_me.get_type()) || + get_myaddr().get_family() != peer_addr_for_me.get_family() || + !get_myaddr().is_same_host(peer_addr_for_me)) { + logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}", + conn, peer_addr_for_me, get_myaddr()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + return seastar::now(); + } + + if (get_myaddr().get_type() == entity_addr_t::TYPE_NONE) { + // Not bound + entity_addr_t addr = peer_addr_for_me; + addr.set_type(entity_addr_t::TYPE_ANY); + addr.set_port(0); + need_addr = false; + return set_myaddrs(entity_addrvec_t{addr} + ).then([this, &conn, peer_addr_for_me] { + logger().info("{} learned myaddr={} (unbound) from {}", + conn, get_myaddr(), peer_addr_for_me); + }); + } else { + // Already bound + if (!get_myaddr().is_any() && + get_myaddr().get_type() != peer_addr_for_me.get_type()) { + logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}", + conn, peer_addr_for_me, get_myaddr()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + if (get_myaddr().get_family() != peer_addr_for_me.get_family()) { + logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}", + conn, peer_addr_for_me, get_myaddr()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + if (get_myaddr().is_blank_ip()) { + entity_addr_t addr = peer_addr_for_me; + 
addr.set_type(get_myaddr().get_type()); + addr.set_port(get_myaddr().get_port()); + need_addr = false; + return set_myaddrs(entity_addrvec_t{addr} + ).then([this, &conn, peer_addr_for_me] { + logger().info("{} learned myaddr={} (blank IP) from {}", + conn, get_myaddr(), peer_addr_for_me); + }); + } else if (!get_myaddr().is_same_host(peer_addr_for_me)) { + logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}", + conn, peer_addr_for_me, get_myaddr()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } else { + need_addr = false; + return seastar::now(); + } + } +} + +SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const +{ + return policy_set.get(peer_type); +} + +SocketPolicy SocketMessenger::get_default_policy() const +{ + return policy_set.get_default(); +} + +void SocketMessenger::set_default_policy(const SocketPolicy& p) +{ + policy_set.set_default(p); +} + +void SocketMessenger::set_policy(entity_type_t peer_type, + const SocketPolicy& p) +{ + policy_set.set(peer_type, p); +} + +void SocketMessenger::set_policy_throttler(entity_type_t peer_type, + Throttle* throttle) +{ + // only byte throttler is used in OSD + policy_set.set_throttlers(peer_type, throttle, nullptr); +} + +crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) +{ + if (auto found = connections.find(addr); + found != connections.end()) { + return found->second; + } else { + return nullptr; + } +} + +void SocketMessenger::accept_conn(SocketConnectionRef conn) +{ + accepting_conns.insert(conn); +} + +void SocketMessenger::unaccept_conn(SocketConnectionRef conn) +{ + accepting_conns.erase(conn); +} + +void SocketMessenger::register_conn(SocketConnectionRef conn) +{ + auto [i, added] = connections.emplace(conn->get_peer_addr(), conn); + std::ignore = i; + ceph_assert(added); +} + +void SocketMessenger::unregister_conn(SocketConnectionRef conn) +{ + ceph_assert(conn); + auto found = connections.find(conn->get_peer_addr()); + ceph_assert(found != connections.end()); + ceph_assert(found->second == conn); + connections.erase(found); +} + +void SocketMessenger::closing_conn(SocketConnectionRef conn) +{ + closing_conns.push_back(conn); +} + +void SocketMessenger::closed_conn(SocketConnectionRef conn) +{ + for (auto it = closing_conns.begin(); + it != closing_conns.end();) { + if (*it == conn) { + it = closing_conns.erase(it); + } else { + it++; + } + } +} + +seastar::future<uint32_t> +SocketMessenger::get_global_seq(uint32_t old) +{ + if (old > global_seq) { + global_seq = old; + } + return seastar::make_ready_future<uint32_t>(++global_seq); +} + +} // namespace crimson::net diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h new file mode 100644 index 000000000..44c1d3c21 --- /dev/null +++ b/src/crimson/net/SocketMessenger.h @@ -0,0 +1,122 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#pragma once + +#include <map> +#include <set> +#include <vector> +#include <seastar/core/gate.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/sharded.hh> +#include <seastar/core/shared_future.hh> + +#include "crimson/net/chained_dispatchers.h" +#include "Messenger.h" +#include "SocketConnection.h" + +namespace crimson::net { + +class FixedCPUServerSocket; + +class SocketMessenger final : public Messenger { + const seastar::shard_id master_sid; + seastar::promise<> shutdown_promise; + + FixedCPUServerSocket* listener = nullptr; + ChainedDispatchers dispatchers; + std::map<entity_addr_t, SocketConnectionRef> connections; + std::set<SocketConnectionRef> accepting_conns; + std::vector<SocketConnectionRef> closing_conns; + ceph::net::PolicySet<Throttle> policy_set; + // Distinguish messengers with meaningful names for debugging + const std::string logic_name; + const uint32_t nonce; + // specifying we haven't learned our addr; set false when we find it. + bool need_addr = true; + uint32_t global_seq = 0; + bool started = false; + + bind_ertr::future<> do_bind(const entity_addrvec_t& addr); + + public: + SocketMessenger(const entity_name_t& myname, + const std::string& logic_name, + uint32_t nonce); + ~SocketMessenger() override { ceph_assert(!listener); } + + seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override; + + // Messenger interfaces are assumed to be called from its own shard, but its + // behavior should be symmetric when called from any shard. + bind_ertr::future<> bind(const entity_addrvec_t& addr) override; + + bind_ertr::future<> try_bind(const entity_addrvec_t& addr, + uint32_t min_port, uint32_t max_port) override; + + seastar::future<> start(const dispatchers_t& dispatchers) override; + + ConnectionRef connect(const entity_addr_t& peer_addr, + const entity_name_t& peer_name) override; + // can only wait once + seastar::future<> wait() override { + assert(seastar::this_shard_id() == master_sid); + return shutdown_promise.get_future(); + } + + void stop() override { + dispatchers.clear(); + } + + bool is_started() const override { + return !dispatchers.empty(); + } + + seastar::future<> shutdown() override; + + void print(ostream& out) const override { + out << get_myname() + << "(" << logic_name + << ") " << get_myaddr(); + } + + SocketPolicy get_policy(entity_type_t peer_type) const override; + + SocketPolicy get_default_policy() const override; + + void set_default_policy(const SocketPolicy& p) override; + + void set_policy(entity_type_t peer_type, const SocketPolicy& p) override; + + void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override; + + public: + seastar::future<uint32_t> get_global_seq(uint32_t old=0); + seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me, + const SocketConnection& conn); + + SocketConnectionRef lookup_conn(const entity_addr_t& addr); + void accept_conn(SocketConnectionRef); + void unaccept_conn(SocketConnectionRef); + void register_conn(SocketConnectionRef); + void unregister_conn(SocketConnectionRef); + void closing_conn(SocketConnectionRef); + void closed_conn(SocketConnectionRef); + seastar::shard_id shard_id() const { + assert(seastar::this_shard_id() == master_sid); + return master_sid; + } +}; + +} // namespace crimson::net diff --git a/src/crimson/net/chained_dispatchers.cc b/src/crimson/net/chained_dispatchers.cc new file mode 100644 index 000000000..b13d40c8f --- /dev/null +++ b/src/crimson/net/chained_dispatchers.cc @@ -0,0 +1,93 @@ +#include 
"crimson/common/log.h" +#include "crimson/net/chained_dispatchers.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Dispatcher.h" +#include "msg/Message.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); + } +} + +namespace crimson::net { + +seastar::future<> +ChainedDispatchers::ms_dispatch(crimson::net::ConnectionRef conn, + MessageRef m) { + try { + for (auto& dispatcher : dispatchers) { + auto dispatched = dispatcher->ms_dispatch(conn, m); + if (dispatched.has_value()) { + return std::move(*dispatched + ).handle_exception([conn] (std::exception_ptr eptr) { + logger().error("{} got unexpected exception in ms_dispatch() throttling {}", + *conn, eptr); + ceph_abort(); + }); + } + } + } catch (...) { + logger().error("{} got unexpected exception in ms_dispatch() {}", + *conn, std::current_exception()); + ceph_abort(); + } + if (!dispatchers.empty()) { + logger().error("ms_dispatch unhandled message {}", *m); + } + return seastar::now(); +} + +void +ChainedDispatchers::ms_handle_accept(crimson::net::ConnectionRef conn) { + try { + for (auto& dispatcher : dispatchers) { + dispatcher->ms_handle_accept(conn); + } + } catch (...) { + logger().error("{} got unexpected exception in ms_handle_accept() {}", + *conn, std::current_exception()); + ceph_abort(); + } +} + +void +ChainedDispatchers::ms_handle_connect(crimson::net::ConnectionRef conn) { + try { + for(auto& dispatcher : dispatchers) { + dispatcher->ms_handle_connect(conn); + } + } catch (...) { + logger().error("{} got unexpected exception in ms_handle_connect() {}", + *conn, std::current_exception()); + ceph_abort(); + } +} + +void +ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) { + try { + for (auto& dispatcher : dispatchers) { + dispatcher->ms_handle_reset(conn, is_replace); + } + } catch (...) { + logger().error("{} got unexpected exception in ms_handle_reset() {}", + *conn, std::current_exception()); + ceph_abort(); + } +} + +void +ChainedDispatchers::ms_handle_remote_reset(crimson::net::ConnectionRef conn) { + try { + for (auto& dispatcher : dispatchers) { + dispatcher->ms_handle_remote_reset(conn); + } + } catch (...) { + logger().error("{} got unexpected exception in ms_handle_remote_reset() {}", + *conn, std::current_exception()); + ceph_abort(); + } +} + +} diff --git a/src/crimson/net/chained_dispatchers.h b/src/crimson/net/chained_dispatchers.h new file mode 100644 index 000000000..712b0894b --- /dev/null +++ b/src/crimson/net/chained_dispatchers.h @@ -0,0 +1,36 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "Fwd.h" +#include "crimson/common/log.h" + +namespace crimson::net { + +class Dispatcher; + +class ChainedDispatchers { +public: + void assign(const dispatchers_t& _dispatchers) { + assert(empty()); + assert(!_dispatchers.empty()); + dispatchers = _dispatchers; + } + void clear() { + dispatchers.clear(); + } + bool empty() const { + return dispatchers.empty(); + } + seastar::future<> ms_dispatch(crimson::net::ConnectionRef, MessageRef); + void ms_handle_accept(crimson::net::ConnectionRef conn); + void ms_handle_connect(crimson::net::ConnectionRef conn); + void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace); + void ms_handle_remote_reset(crimson::net::ConnectionRef conn); + + private: + dispatchers_t dispatchers; +}; + +} |