diff options
Diffstat (limited to 'src/crimson/net/ProtocolV1.cc')
-rw-r--r-- | src/crimson/net/ProtocolV1.cc | 1014 |
1 files changed, 1014 insertions, 0 deletions
diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc new file mode 100644 index 000000000..3c604240d --- /dev/null +++ b/src/crimson/net/ProtocolV1.cc @@ -0,0 +1,1014 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "ProtocolV1.h" + +#include <seastar/core/shared_future.hh> +#include <seastar/core/sleep.hh> +#include <seastar/net/packet.hh> + +#include "include/msgr.h" +#include "include/random.h" +#include "auth/Auth.h" +#include "auth/AuthSessionHandler.h" + +#include "crimson/auth/AuthClient.h" +#include "crimson/auth/AuthServer.h" +#include "crimson/common/log.h" +#include "chained_dispatchers.h" +#include "Errors.h" +#include "Socket.h" +#include "SocketConnection.h" +#include "SocketMessenger.h" + +WRITE_RAW_ENCODER(ceph_msg_connect); +WRITE_RAW_ENCODER(ceph_msg_connect_reply); + +using crimson::common::local_conf; + +std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c) +{ + return out << "connect{features=" << std::hex << c.features << std::dec + << " host_type=" << c.host_type + << " global_seq=" << c.global_seq + << " connect_seq=" << c.connect_seq + << " protocol_version=" << c.protocol_version + << " authorizer_protocol=" << c.authorizer_protocol + << " authorizer_len=" << c.authorizer_len + << " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}'; +} + +std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r) +{ + return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag) + << " features=" << std::hex << r.features << std::dec + << " global_seq=" << r.global_seq + << " connect_seq=" << r.connect_seq + << " protocol_version=" << r.protocol_version + << " authorizer_len=" << r.authorizer_len + << " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}'; +} + +namespace { + +seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); +} + +template <typename T> +seastar::net::packet make_static_packet(const T& value) { + return { reinterpret_cast<const char*>(&value), sizeof(value) }; +} + +// store the banner in a non-const string for buffer::create_static() +char banner[] = CEPH_BANNER; +constexpr size_t banner_size = sizeof(CEPH_BANNER)-1; + +constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr); +constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr); + +// check that the buffer starts with a valid banner without requiring it to +// be contiguous in memory +void validate_banner(bufferlist::const_iterator& p) +{ + auto b = std::cbegin(banner); + auto end = b + banner_size; + while (b != end) { + const char *buf{nullptr}; + auto remaining = std::distance(b, end); + auto len = p.get_ptr_and_advance(remaining, &buf); + if (!std::equal(buf, buf + len, b)) { + throw std::system_error( + make_error_code(crimson::net::error::bad_connect_banner)); + } + b += len; + } +} + +// return a static bufferptr to the given object +template <typename T> +bufferptr create_static(T& obj) +{ + return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj)); +} + +uint32_t get_proto_version(entity_type_t peer_type, bool connect) +{ + constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD; + // see also OSD.h, unlike other connection of simple/async messenger, + // crimson msgr is only used by osd + constexpr uint32_t CEPH_OSD_PROTOCOL = 10; + if (peer_type == my_type) { + // internal + return CEPH_OSD_PROTOCOL; + } else { + // public + switch (connect ? peer_type : my_type) { + case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; + case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; + case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; + default: return 0; + } + } +} + +void discard_up_to(std::deque<MessageRef>* queue, + crimson::net::seq_num_t seq) +{ + while (!queue->empty() && + queue->front()->get_seq() < seq) { + queue->pop_front(); + } +} + +} // namespace anonymous + +namespace crimson::net { + +ProtocolV1::ProtocolV1(ChainedDispatchers& dispatchers, + SocketConnection& conn, + SocketMessenger& messenger) + : Protocol(proto_t::v1, dispatchers, conn), messenger{messenger} {} + +ProtocolV1::~ProtocolV1() {} + +bool ProtocolV1::is_connected() const +{ + return state == state_t::open; +} + +// connecting state + +void ProtocolV1::reset_session() +{ + conn.out_q = {}; + conn.sent = {}; + conn.in_seq = 0; + h.connect_seq = 0; + if (HAVE_FEATURE(conn.features, MSG_AUTH)) { + // Set out_seq to a random value, so CRC won't be predictable. + // Constant to limit starting sequence number to 2^31. Nothing special + // about it, just a big number. + constexpr uint64_t SEQ_MASK = 0x7fffffff; + conn.out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK); + } else { + // previously, seq #'s always started at 0. + conn.out_seq = 0; + } +} + +seastar::future<stop_t> +ProtocolV1::handle_connect_reply(msgr_tag_t tag) +{ + if (h.auth_payload.length() && !conn.peer_is_mon()) { + if (tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { // more + h.auth_more = messenger.get_auth_client()->handle_auth_reply_more( + conn.shared_from_this(), auth_meta, h.auth_payload); + return seastar::make_ready_future<stop_t>(stop_t::no); + } else { + int ret = messenger.get_auth_client()->handle_auth_done( + conn.shared_from_this(), auth_meta, 0, 0, h.auth_payload); + if (ret < 0) { + // fault + logger().warn("{} AuthClient::handle_auth_done() return {}", conn, ret); + throw std::system_error(make_error_code(error::negotiation_failure)); + } + } + } + + switch (tag) { + case CEPH_MSGR_TAG_FEATURES: + logger().error("{} connect protocol feature mispatch", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_BADPROTOVER: + logger().error("{} connect protocol version mispatch", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_BADAUTHORIZER: + logger().error("{} got bad authorizer", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_RESETSESSION: + reset_session(); + return seastar::make_ready_future<stop_t>(stop_t::no); + case CEPH_MSGR_TAG_RETRY_GLOBAL: + return messenger.get_global_seq(h.reply.global_seq).then([this] (auto gs) { + h.global_seq = gs; + return seastar::make_ready_future<stop_t>(stop_t::no); + }); + case CEPH_MSGR_TAG_RETRY_SESSION: + ceph_assert(h.reply.connect_seq > h.connect_seq); + h.connect_seq = h.reply.connect_seq; + return seastar::make_ready_future<stop_t>(stop_t::no); + case CEPH_MSGR_TAG_WAIT: + // TODO: state wait + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_SEQ: + case CEPH_MSGR_TAG_READY: + if (auto missing = (conn.policy.features_required & ~(uint64_t)h.reply.features); + missing) { + logger().error("{} missing required features", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + } + return seastar::futurize_invoke([this, tag] { + if (tag == CEPH_MSGR_TAG_SEQ) { + return socket->read_exactly(sizeof(seq_num_t)) + .then([this] (auto buf) { + auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get()); + discard_up_to(&conn.out_q, *acked_seq); + return socket->write_flush(make_static_packet(conn.in_seq)); + }); + } + // tag CEPH_MSGR_TAG_READY + return seastar::now(); + }).then([this] { + // hooray! + h.peer_global_seq = h.reply.global_seq; + conn.policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY; + h.connect_seq++; + h.backoff = 0ms; + conn.set_features(h.reply.features & h.connect.features); + if (auth_meta->authorizer) { + session_security.reset( + get_auth_session_handler(nullptr, + auth_meta->authorizer->protocol, + auth_meta->session_key, + conn.features)); + } else { + session_security.reset(); + } + return seastar::make_ready_future<stop_t>(stop_t::yes); + }); + break; + default: + // unknown tag + logger().error("{} got unknown tag", __func__, int(tag)); + throw std::system_error(make_error_code(error::negotiation_failure)); + } +} + +ceph::bufferlist ProtocolV1::get_auth_payload() +{ + // only non-mons connectings to mons use MAuth messages + if (conn.peer_is_mon() && + messenger.get_mytype() != CEPH_ENTITY_TYPE_MON) { + return {}; + } else { + if (h.auth_more.length()) { + logger().info("using augmented (challenge) auth payload"); + return std::move(h.auth_more); + } else { + auto [auth_method, preferred_modes, auth_bl] = + messenger.get_auth_client()->get_auth_request( + conn.shared_from_this(), auth_meta); + auth_meta->auth_method = auth_method; + return auth_bl; + } + } +} + +seastar::future<stop_t> +ProtocolV1::repeat_connect() +{ + // encode ceph_msg_connect + memset(&h.connect, 0, sizeof(h.connect)); + h.connect.features = conn.policy.features_supported; + h.connect.host_type = messenger.get_myname().type(); + h.connect.global_seq = h.global_seq; + h.connect.connect_seq = h.connect_seq; + h.connect.protocol_version = get_proto_version(conn.get_peer_type(), true); + // this is fyi, actually, server decides! + h.connect.flags = conn.policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0; + + ceph_assert(messenger.get_auth_client()); + + bufferlist bl; + bufferlist auth_bl = get_auth_payload(); + if (auth_bl.length()) { + h.connect.authorizer_protocol = auth_meta->auth_method; + h.connect.authorizer_len = auth_bl.length(); + bl.append(create_static(h.connect)); + bl.claim_append(auth_bl); + } else { + h.connect.authorizer_protocol = 0; + h.connect.authorizer_len = 0; + bl.append(create_static(h.connect)); + }; + return socket->write_flush(std::move(bl)) + .then([this] { + // read the reply + return socket->read(sizeof(h.reply)); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + ::decode(h.reply, p); + ceph_assert(p.end()); + return socket->read(h.reply.authorizer_len); + }).then([this] (bufferlist bl) { + h.auth_payload = std::move(bl); + return handle_connect_reply(h.reply.tag); + }); +} + +void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, + const entity_name_t& _peer_name) +{ + ceph_assert(state == state_t::none); + logger().trace("{} trigger connecting, was {}", conn, static_cast<int>(state)); + state = state_t::connecting; + set_write_state(write_state_t::delay); + + ceph_assert(!socket); + ceph_assert(!gate.is_closed()); + conn.peer_addr = _peer_addr; + conn.target_addr = _peer_addr; + conn.set_peer_name(_peer_name); + conn.policy = messenger.get_policy(_peer_name.type()); + messenger.register_conn( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + gate.dispatch_in_background("start_connect", *this, [this] { + return Socket::connect(conn.peer_addr) + .then([this](SocketRef sock) { + socket = std::move(sock); + if (state != state_t::connecting) { + assert(state == state_t::closing); + return socket->close().then([] { + throw std::system_error(make_error_code(error::protocol_aborted)); + }); + } + return seastar::now(); + }).then([this] { + return messenger.get_global_seq(); + }).then([this] (auto gs) { + h.global_seq = gs; + // read server's handshake header + return socket->read(server_header_size); + }).then([this] (bufferlist headerbl) { + auto p = headerbl.cbegin(); + validate_banner(p); + entity_addr_t saddr, caddr; + ::decode(saddr, p); + ::decode(caddr, p); + ceph_assert(p.end()); + if (saddr != conn.peer_addr) { + logger().error("{} my peer_addr {} doesn't match what peer advertized {}", + conn, conn.peer_addr, saddr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + if (state != state_t::connecting) { + assert(state == state_t::closing); + throw std::system_error(make_error_code(error::protocol_aborted)); + } + socket->learn_ephemeral_port_as_connector(caddr.get_port()); + if (unlikely(caddr.is_msgr2())) { + logger().warn("{} peer sent a v2 address for me: {}", + conn, caddr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + caddr.set_type(entity_addr_t::TYPE_LEGACY); + return messenger.learned_addr(caddr, conn); + }).then([this] { + // encode/send client's handshake header + bufferlist bl; + bl.append(buffer::create_static(banner_size, banner)); + ::encode(messenger.get_myaddr(), bl, 0); + return socket->write_flush(std::move(bl)); + }).then([=] { + return seastar::repeat([this] { + return repeat_connect(); + }); + }).then([this] { + if (state != state_t::connecting) { + assert(state == state_t::closing); + throw std::system_error(make_error_code(error::protocol_aborted)); + } + execute_open(open_t::connected); + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the connecting state + logger().warn("{} connecting fault: {}", conn, eptr); + close(true); + }); + }); +} + +// accepting state + +seastar::future<stop_t> ProtocolV1::send_connect_reply( + msgr_tag_t tag, bufferlist&& authorizer_reply) +{ + h.reply.tag = tag; + h.reply.features = static_cast<uint64_t>((h.connect.features & + conn.policy.features_supported) | + conn.policy.features_required); + h.reply.authorizer_len = authorizer_reply.length(); + return socket->write(make_static_packet(h.reply)) + .then([this, reply=std::move(authorizer_reply)]() mutable { + return socket->write_flush(std::move(reply)); + }).then([] { + return stop_t::no; + }); +} + +seastar::future<stop_t> ProtocolV1::send_connect_reply_ready( + msgr_tag_t tag, bufferlist&& authorizer_reply) +{ + return messenger.get_global_seq( + ).then([this, tag, auth_len = authorizer_reply.length()] (auto gs) { + h.global_seq = gs; + h.reply.tag = tag; + h.reply.features = conn.policy.features_supported; + h.reply.global_seq = h.global_seq; + h.reply.connect_seq = h.connect_seq; + h.reply.flags = 0; + if (conn.policy.lossy) { + h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY; + } + h.reply.authorizer_len = auth_len; + + session_security.reset( + get_auth_session_handler(nullptr, + auth_meta->auth_method, + auth_meta->session_key, + conn.features)); + + return socket->write(make_static_packet(h.reply)); + }).then([this, reply=std::move(authorizer_reply)]() mutable { + if (reply.length()) { + return socket->write(std::move(reply)); + } else { + return seastar::now(); + } + }).then([this] { + if (h.reply.tag == CEPH_MSGR_TAG_SEQ) { + return socket->write_flush(make_static_packet(conn.in_seq)) + .then([this] { + return socket->read_exactly(sizeof(seq_num_t)); + }).then([this] (auto buf) { + auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get()); + discard_up_to(&conn.out_q, *acked_seq); + }); + } else { + return socket->flush(); + } + }).then([] { + return stop_t::yes; + }); +} + +seastar::future<stop_t> ProtocolV1::replace_existing( + SocketConnectionRef existing, + bufferlist&& authorizer_reply, + bool is_reset_from_peer) +{ + msgr_tag_t reply_tag; + if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) && + !is_reset_from_peer) { + reply_tag = CEPH_MSGR_TAG_SEQ; + } else { + reply_tag = CEPH_MSGR_TAG_READY; + } + if (!existing->is_lossy()) { + // XXX: we decided not to support lossless connection in v1. as the + // client's default policy is + // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is + // lossy. And by the time + // will all be performed using v2 protocol. + ceph_abort("lossless policy not supported for v1"); + } + existing->protocol->close(true); + return send_connect_reply_ready(reply_tag, std::move(authorizer_reply)); +} + +seastar::future<stop_t> ProtocolV1::handle_connect_with_existing( + SocketConnectionRef existing, bufferlist&& authorizer_reply) +{ + ProtocolV1 *exproto = dynamic_cast<ProtocolV1*>(existing->protocol.get()); + + if (h.connect.global_seq < exproto->peer_global_seq()) { + h.reply.global_seq = exproto->peer_global_seq(); + return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL); + } else if (existing->is_lossy()) { + return replace_existing(existing, std::move(authorizer_reply)); + } else if (h.connect.connect_seq == 0 && exproto->connect_seq() > 0) { + return replace_existing(existing, std::move(authorizer_reply), true); + } else if (h.connect.connect_seq < exproto->connect_seq()) { + // old attempt, or we sent READY but they didn't get it. + h.reply.connect_seq = exproto->connect_seq() + 1; + return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); + } else if (h.connect.connect_seq == exproto->connect_seq()) { + // if the existing connection successfully opened, and/or + // subsequently went to standby, then the peer should bump + // their connect_seq and retry: this is not a connection race + // we need to resolve here. + if (exproto->get_state() == state_t::open || + exproto->get_state() == state_t::standby) { + if (conn.policy.resetcheck && exproto->connect_seq() == 0) { + return replace_existing(existing, std::move(authorizer_reply)); + } else { + h.reply.connect_seq = exproto->connect_seq() + 1; + return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); + } + } else if (existing->peer_wins()) { + return replace_existing(existing, std::move(authorizer_reply)); + } else { + return send_connect_reply(CEPH_MSGR_TAG_WAIT); + } + } else if (conn.policy.resetcheck && + exproto->connect_seq() == 0) { + return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION); + } else { + return replace_existing(existing, std::move(authorizer_reply)); + } +} + +bool ProtocolV1::require_auth_feature() const +{ + if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) { + return false; + } + if (local_conf()->cephx_require_signatures) { + return true; + } + if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD || + h.connect.host_type == CEPH_ENTITY_TYPE_MDS || + h.connect.host_type == CEPH_ENTITY_TYPE_MGR) { + return local_conf()->cephx_cluster_require_signatures; + } else { + return local_conf()->cephx_service_require_signatures; + } +} + +bool ProtocolV1::require_cephx_v2_feature() const +{ + if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) { + return false; + } + if (local_conf()->cephx_require_version >= 2) { + return true; + } + if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD || + h.connect.host_type == CEPH_ENTITY_TYPE_MDS || + h.connect.host_type == CEPH_ENTITY_TYPE_MGR) { + return local_conf()->cephx_cluster_require_version >= 2; + } else { + return local_conf()->cephx_service_require_version >= 2; + } +} + +seastar::future<stop_t> ProtocolV1::repeat_handle_connect() +{ + return socket->read(sizeof(h.connect)) + .then([this](bufferlist bl) { + auto p = bl.cbegin(); + ::decode(h.connect, p); + if (conn.get_peer_type() != 0 && + conn.get_peer_type() != h.connect.host_type) { + logger().error("{} repeat_handle_connect(): my peer type does not match" + " what peer advertises {} != {}", + conn, conn.get_peer_type(), h.connect.host_type); + throw std::system_error(make_error_code(error::protocol_aborted)); + } + conn.set_peer_type(h.connect.host_type); + conn.policy = messenger.get_policy(h.connect.host_type); + if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) { + logger().error("{} we don't know how to reconnect to peer {}", + conn, conn.target_addr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + return socket->read(h.connect.authorizer_len); + }).then([this] (bufferlist authorizer) { + memset(&h.reply, 0, sizeof(h.reply)); + // TODO: set reply.protocol_version + if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) { + return send_connect_reply( + CEPH_MSGR_TAG_BADPROTOVER, bufferlist{}); + } + if (require_auth_feature()) { + conn.policy.features_required |= CEPH_FEATURE_MSG_AUTH; + } + if (require_cephx_v2_feature()) { + conn.policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2; + } + if (auto feat_missing = conn.policy.features_required & ~(uint64_t)h.connect.features; + feat_missing != 0) { + return send_connect_reply( + CEPH_MSGR_TAG_FEATURES, bufferlist{}); + } + + bufferlist authorizer_reply; + auth_meta->auth_method = h.connect.authorizer_protocol; + if (!HAVE_FEATURE((uint64_t)h.connect.features, CEPHX_V2)) { + // peer doesn't support it and we won't get here if we require it + auth_meta->skip_authorizer_challenge = true; + } + auto more = static_cast<bool>(auth_meta->authorizer_challenge); + ceph_assert(messenger.get_auth_server()); + int r = messenger.get_auth_server()->handle_auth_request( + conn.shared_from_this(), auth_meta, more, auth_meta->auth_method, authorizer, + &authorizer_reply); + + if (r < 0) { + session_security.reset(); + return send_connect_reply( + CEPH_MSGR_TAG_BADAUTHORIZER, std::move(authorizer_reply)); + } else if (r == 0) { + ceph_assert(authorizer_reply.length()); + return send_connect_reply( + CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER, std::move(authorizer_reply)); + } + + // r > 0 + if (auto existing = messenger.lookup_conn(conn.peer_addr); existing) { + if (existing->protocol->proto_type != proto_t::v1) { + logger().warn("{} existing {} proto version is {} not 1, close existing", + conn, *existing, + static_cast<int>(existing->protocol->proto_type)); + // NOTE: this is following async messenger logic, but we may miss the reset event. + existing->mark_down(); + } else { + return handle_connect_with_existing(existing, std::move(authorizer_reply)); + } + } + if (h.connect.connect_seq > 0) { + return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION, + std::move(authorizer_reply)); + } + h.connect_seq = h.connect.connect_seq + 1; + h.peer_global_seq = h.connect.global_seq; + conn.set_features((uint64_t)conn.policy.features_supported & (uint64_t)h.connect.features); + // TODO: cct + return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply)); + }); +} + +void ProtocolV1::start_accept(SocketRef&& sock, + const entity_addr_t& _peer_addr) +{ + ceph_assert(state == state_t::none); + logger().trace("{} trigger accepting, was {}", + conn, static_cast<int>(state)); + state = state_t::accepting; + set_write_state(write_state_t::delay); + + ceph_assert(!socket); + // until we know better + conn.target_addr = _peer_addr; + socket = std::move(sock); + messenger.accept_conn( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + gate.dispatch_in_background("start_accept", *this, [this] { + // stop learning my_addr before sending it out, so it won't change + return messenger.learned_addr(messenger.get_myaddr(), conn).then([this] { + // encode/send server's handshake header + bufferlist bl; + bl.append(buffer::create_static(banner_size, banner)); + ::encode(messenger.get_myaddr(), bl, 0); + ::encode(conn.target_addr, bl, 0); + return socket->write_flush(std::move(bl)); + }).then([this] { + // read client's handshake header and connect request + return socket->read(client_header_size); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + validate_banner(p); + entity_addr_t addr; + ::decode(addr, p); + ceph_assert(p.end()); + if ((addr.is_legacy() || addr.is_any()) && + addr.is_same_host(conn.target_addr)) { + // good + } else { + logger().error("{} peer advertized an invalid peer_addr: {}," + " which should be v1 and the same host with {}.", + conn, addr, conn.peer_addr); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + conn.peer_addr = addr; + conn.target_addr = conn.peer_addr; + return seastar::repeat([this] { + return repeat_handle_connect(); + }); + }).then([this] { + if (state != state_t::accepting) { + assert(state == state_t::closing); + throw std::system_error(make_error_code(error::protocol_aborted)); + } + messenger.register_conn( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + messenger.unaccept_conn( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + execute_open(open_t::accepted); + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the accepting state + logger().warn("{} accepting fault: {}", conn, eptr); + close(false); + }); + }); +} + +// open state + +ceph::bufferlist ProtocolV1::do_sweep_messages( + const std::deque<MessageRef>& msgs, + size_t num_msgs, + bool require_keepalive, + std::optional<utime_t> _keepalive_ack, + bool require_ack) +{ + static const size_t RESERVE_MSG_SIZE = sizeof(CEPH_MSGR_TAG_MSG) + + sizeof(ceph_msg_header) + + sizeof(ceph_msg_footer); + static const size_t RESERVE_MSG_SIZE_OLD = sizeof(CEPH_MSGR_TAG_MSG) + + sizeof(ceph_msg_header) + + sizeof(ceph_msg_footer_old); + + ceph::bufferlist bl; + if (likely(num_msgs)) { + if (HAVE_FEATURE(conn.features, MSG_AUTH)) { + bl.reserve(num_msgs * RESERVE_MSG_SIZE); + } else { + bl.reserve(num_msgs * RESERVE_MSG_SIZE_OLD); + } + } + + if (unlikely(require_keepalive)) { + k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec( + ceph::coarse_real_clock::now()); + logger().trace("{} write keepalive2 {}", conn, k.req.stamp.tv_sec); + bl.append(create_static(k.req)); + } + + if (unlikely(_keepalive_ack.has_value())) { + logger().trace("{} write keepalive2 ack {}", conn, *_keepalive_ack); + k.ack.stamp = ceph_timespec(*_keepalive_ack); + bl.append(create_static(k.ack)); + } + + if (require_ack) { + // XXX: we decided not to support lossless connection in v1. as the + // client's default policy is + // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is + // lossy. And by the time of crimson-osd's GA, the in-cluster communication + // will all be performed using v2 protocol. + ceph_abort("lossless policy not supported for v1"); + } + + std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) { + ceph_assert(!msg->get_seq() && "message already has seq"); + msg->set_seq(++conn.out_seq); + auto& header = msg->get_header(); + header.src = messenger.get_myname(); + msg->encode(conn.features, messenger.get_crc_flags()); + if (session_security) { + session_security->sign_message(msg.get()); + } + logger().debug("{} --> #{} === {} ({})", + conn, msg->get_seq(), *msg, msg->get_type()); + bl.append(CEPH_MSGR_TAG_MSG); + bl.append((const char*)&header, sizeof(header)); + bl.append(msg->get_payload()); + bl.append(msg->get_middle()); + bl.append(msg->get_data()); + auto& footer = msg->get_footer(); + if (HAVE_FEATURE(conn.features, MSG_AUTH)) { + bl.append((const char*)&footer, sizeof(footer)); + } else { + ceph_msg_footer_old old_footer; + if (messenger.get_crc_flags() & MSG_CRC_HEADER) { + old_footer.front_crc = footer.front_crc; + old_footer.middle_crc = footer.middle_crc; + } else { + old_footer.front_crc = old_footer.middle_crc = 0; + } + if (messenger.get_crc_flags() & MSG_CRC_DATA) { + old_footer.data_crc = footer.data_crc; + } else { + old_footer.data_crc = 0; + } + old_footer.flags = footer.flags; + bl.append((const char*)&old_footer, sizeof(old_footer)); + } + }); + + return bl; +} + +seastar::future<> ProtocolV1::handle_keepalive2_ack() +{ + return socket->read_exactly(sizeof(ceph_timespec)) + .then([this] (auto buf) { + auto t = reinterpret_cast<const ceph_timespec*>(buf.get()); + k.ack_stamp = *t; + logger().trace("{} got keepalive2 ack {}", conn, t->tv_sec); + }); +} + +seastar::future<> ProtocolV1::handle_keepalive2() +{ + return socket->read_exactly(sizeof(ceph_timespec)) + .then([this] (auto buf) { + utime_t ack{*reinterpret_cast<const ceph_timespec*>(buf.get())}; + notify_keepalive_ack(ack); + }); +} + +seastar::future<> ProtocolV1::handle_ack() +{ + return socket->read_exactly(sizeof(ceph_le64)) + .then([this] (auto buf) { + auto seq = reinterpret_cast<const ceph_le64*>(buf.get()); + discard_up_to(&conn.sent, *seq); + }); +} + +seastar::future<> ProtocolV1::maybe_throttle() +{ + if (!conn.policy.throttler_bytes) { + return seastar::now(); + } + const auto to_read = (m.header.front_len + + m.header.middle_len + + m.header.data_len); + return conn.policy.throttler_bytes->get(to_read); +} + +seastar::future<> ProtocolV1::read_message() +{ + return socket->read(sizeof(m.header)) + .then([this] (bufferlist bl) { + // throttle the traffic, maybe + auto p = bl.cbegin(); + ::decode(m.header, p); + return maybe_throttle(); + }).then([this] { + // read front + return socket->read(m.header.front_len); + }).then([this] (bufferlist bl) { + m.front = std::move(bl); + // read middle + return socket->read(m.header.middle_len); + }).then([this] (bufferlist bl) { + m.middle = std::move(bl); + // read data + return socket->read(m.header.data_len); + }).then([this] (bufferlist bl) { + m.data = std::move(bl); + // read footer + return socket->read(sizeof(m.footer)); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + ::decode(m.footer, p); + auto conn_ref = seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this()); + auto msg = ::decode_message(nullptr, 0, m.header, m.footer, + m.front, m.middle, m.data, conn_ref); + if (unlikely(!msg)) { + logger().warn("{} decode message failed", conn); + throw std::system_error{make_error_code(error::corrupted_message)}; + } + constexpr bool add_ref = false; // Message starts with 1 ref + // TODO: change MessageRef with foreign_ptr + auto msg_ref = MessageRef{msg, add_ref}; + + if (session_security) { + if (unlikely(session_security->check_message_signature(msg))) { + logger().warn("{} message signature check failed", conn); + throw std::system_error{make_error_code(error::corrupted_message)}; + } + } + // TODO: set time stamps + msg->set_byte_throttler(conn.policy.throttler_bytes); + + if (unlikely(!conn.update_rx_seq(msg->get_seq()))) { + // skip this message + return seastar::now(); + } + + logger().debug("{} <== #{} === {} ({})", + conn, msg_ref->get_seq(), *msg_ref, msg_ref->get_type()); + // throttle the reading process by the returned future + return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref)); + }); +} + +seastar::future<> ProtocolV1::handle_tags() +{ + return seastar::keep_doing([this] { + // read the next tag + return socket->read_exactly(1) + .then([this] (auto buf) { + switch (buf[0]) { + case CEPH_MSGR_TAG_MSG: + return read_message(); + case CEPH_MSGR_TAG_ACK: + return handle_ack(); + case CEPH_MSGR_TAG_KEEPALIVE: + return seastar::now(); + case CEPH_MSGR_TAG_KEEPALIVE2: + return handle_keepalive2(); + case CEPH_MSGR_TAG_KEEPALIVE2_ACK: + return handle_keepalive2_ack(); + case CEPH_MSGR_TAG_CLOSE: + logger().info("{} got tag close", conn); + throw std::system_error(make_error_code(error::protocol_aborted)); + default: + logger().error("{} got unknown msgr tag {}", + conn, static_cast<int>(buf[0])); + throw std::system_error(make_error_code(error::read_eof)); + } + }); + }); +} + +void ProtocolV1::execute_open(open_t type) +{ + logger().trace("{} trigger open, was {}", conn, static_cast<int>(state)); + state = state_t::open; + set_write_state(write_state_t::open); + + if (type == open_t::connected) { + dispatchers.ms_handle_connect( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + } else { // type == open_t::accepted + dispatchers.ms_handle_accept( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + } + + gate.dispatch_in_background("execute_open", *this, [this] { + // start background processing of tags + return handle_tags() + .handle_exception_type([this] (const std::system_error& e) { + logger().warn("{} open fault: {}", conn, e); + if (e.code() == error::protocol_aborted || + e.code() == std::errc::connection_reset || + e.code() == error::read_eof) { + close(true); + return seastar::now(); + } else { + throw e; + } + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the open state + logger().warn("{} open fault: {}", conn, eptr); + close(true); + }); + }); +} + +// closing state + +void ProtocolV1::trigger_close() +{ + logger().trace("{} trigger closing, was {}", + conn, static_cast<int>(state)); + messenger.closing_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + + if (state == state_t::accepting) { + messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + } else if (state >= state_t::connecting && state < state_t::closing) { + messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + } else { + // cannot happen + ceph_assert(false); + } + + if (!socket) { + ceph_assert(state == state_t::connecting); + } + + state = state_t::closing; +} + +void ProtocolV1::on_closed() +{ + messenger.closed_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); +} + +seastar::future<> ProtocolV1::fault() +{ + if (conn.policy.lossy) { + messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); + } + // XXX: we decided not to support lossless connection in v1. as the + // client's default policy is + // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is + // lossy. And by the time of crimson-osd's GA, the in-cluster communication + // will all be performed using v2 protocol. + ceph_abort("lossless policy not supported for v1"); + return seastar::now(); +} + +void ProtocolV1::print(std::ostream& out) const +{ + out << conn; +} + +} // namespace crimson::net |