// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "ProtocolV1.h" #include #include #include #include "include/msgr.h" #include "include/random.h" #include "auth/Auth.h" #include "auth/AuthSessionHandler.h" #include "crimson/auth/AuthClient.h" #include "crimson/auth/AuthServer.h" #include "crimson/common/log.h" #include "chained_dispatchers.h" #include "Errors.h" #include "Socket.h" #include "SocketConnection.h" #include "SocketMessenger.h" WRITE_RAW_ENCODER(ceph_msg_connect); WRITE_RAW_ENCODER(ceph_msg_connect_reply); using crimson::common::local_conf; std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c) { return out << "connect{features=" << std::hex << c.features << std::dec << " host_type=" << c.host_type << " global_seq=" << c.global_seq << " connect_seq=" << c.connect_seq << " protocol_version=" << c.protocol_version << " authorizer_protocol=" << c.authorizer_protocol << " authorizer_len=" << c.authorizer_len << " flags=" << std::hex << static_cast(c.flags) << std::dec << '}'; } std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r) { return out << "connect_reply{tag=" << static_cast(r.tag) << " features=" << std::hex << r.features << std::dec << " global_seq=" << r.global_seq << " connect_seq=" << r.connect_seq << " protocol_version=" << r.protocol_version << " authorizer_len=" << r.authorizer_len << " flags=" << std::hex << static_cast(r.flags) << std::dec << '}'; } namespace { seastar::logger& logger() { return crimson::get_logger(ceph_subsys_ms); } template seastar::net::packet make_static_packet(const T& value) { return { reinterpret_cast(&value), sizeof(value) }; } // store the banner in a non-const string for buffer::create_static() char banner[] = CEPH_BANNER; constexpr size_t banner_size = sizeof(CEPH_BANNER)-1; constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr); constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr); // check that the buffer starts with a valid banner without requiring it to // be contiguous in memory void validate_banner(bufferlist::const_iterator& p) { auto b = std::cbegin(banner); auto end = b + banner_size; while (b != end) { const char *buf{nullptr}; auto remaining = std::distance(b, end); auto len = p.get_ptr_and_advance(remaining, &buf); if (!std::equal(buf, buf + len, b)) { throw std::system_error( make_error_code(crimson::net::error::bad_connect_banner)); } b += len; } } // return a static bufferptr to the given object template bufferptr create_static(T& obj) { return buffer::create_static(sizeof(obj), reinterpret_cast(&obj)); } uint32_t get_proto_version(entity_type_t peer_type, bool connect) { constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD; // see also OSD.h, unlike other connection of simple/async messenger, // crimson msgr is only used by osd constexpr uint32_t CEPH_OSD_PROTOCOL = 10; if (peer_type == my_type) { // internal return CEPH_OSD_PROTOCOL; } else { // public switch (connect ? peer_type : my_type) { case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; default: return 0; } } } void discard_up_to(std::deque* queue, crimson::net::seq_num_t seq) { while (!queue->empty() && queue->front()->get_seq() < seq) { queue->pop_front(); } } } // namespace anonymous namespace crimson::net { ProtocolV1::ProtocolV1(ChainedDispatchers& dispatchers, SocketConnection& conn, SocketMessenger& messenger) : Protocol(proto_t::v1, dispatchers, conn), messenger{messenger} {} ProtocolV1::~ProtocolV1() {} bool ProtocolV1::is_connected() const { return state == state_t::open; } // connecting state void ProtocolV1::reset_session() { conn.out_q = {}; conn.sent = {}; conn.in_seq = 0; h.connect_seq = 0; if (HAVE_FEATURE(conn.features, MSG_AUTH)) { // Set out_seq to a random value, so CRC won't be predictable. // Constant to limit starting sequence number to 2^31. Nothing special // about it, just a big number. constexpr uint64_t SEQ_MASK = 0x7fffffff; conn.out_seq = ceph::util::generate_random_number(0, SEQ_MASK); } else { // previously, seq #'s always started at 0. conn.out_seq = 0; } } seastar::future ProtocolV1::handle_connect_reply(msgr_tag_t tag) { if (h.auth_payload.length() && !conn.peer_is_mon()) { if (tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { // more h.auth_more = messenger.get_auth_client()->handle_auth_reply_more( conn.shared_from_this(), auth_meta, h.auth_payload); return seastar::make_ready_future(stop_t::no); } else { int ret = messenger.get_auth_client()->handle_auth_done( conn.shared_from_this(), auth_meta, 0, 0, h.auth_payload); if (ret < 0) { // fault logger().warn("{} AuthClient::handle_auth_done() return {}", conn, ret); throw std::system_error(make_error_code(error::negotiation_failure)); } } } switch (tag) { case CEPH_MSGR_TAG_FEATURES: logger().error("{} connect protocol feature mispatch", __func__); throw std::system_error(make_error_code(error::negotiation_failure)); case CEPH_MSGR_TAG_BADPROTOVER: logger().error("{} connect protocol version mispatch", __func__); throw std::system_error(make_error_code(error::negotiation_failure)); case CEPH_MSGR_TAG_BADAUTHORIZER: logger().error("{} got bad authorizer", __func__); throw std::system_error(make_error_code(error::negotiation_failure)); case CEPH_MSGR_TAG_RESETSESSION: reset_session(); return seastar::make_ready_future(stop_t::no); case CEPH_MSGR_TAG_RETRY_GLOBAL: return messenger.get_global_seq(h.reply.global_seq).then([this] (auto gs) { h.global_seq = gs; return seastar::make_ready_future(stop_t::no); }); case CEPH_MSGR_TAG_RETRY_SESSION: ceph_assert(h.reply.connect_seq > h.connect_seq); h.connect_seq = h.reply.connect_seq; return seastar::make_ready_future(stop_t::no); case CEPH_MSGR_TAG_WAIT: // TODO: state wait throw std::system_error(make_error_code(error::negotiation_failure)); case CEPH_MSGR_TAG_SEQ: case CEPH_MSGR_TAG_READY: if (auto missing = (conn.policy.features_required & ~(uint64_t)h.reply.features); missing) { logger().error("{} missing required features", __func__); throw std::system_error(make_error_code(error::negotiation_failure)); } return seastar::futurize_invoke([this, tag] { if (tag == CEPH_MSGR_TAG_SEQ) { return socket->read_exactly(sizeof(seq_num_t)) .then([this] (auto buf) { auto acked_seq = reinterpret_cast(buf.get()); discard_up_to(&conn.out_q, *acked_seq); return socket->write_flush(make_static_packet(conn.in_seq)); }); } // tag CEPH_MSGR_TAG_READY return seastar::now(); }).then([this] { // hooray! h.peer_global_seq = h.reply.global_seq; conn.policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY; h.connect_seq++; h.backoff = 0ms; conn.set_features(h.reply.features & h.connect.features); if (auth_meta->authorizer) { session_security.reset( get_auth_session_handler(nullptr, auth_meta->authorizer->protocol, auth_meta->session_key, conn.features)); } else { session_security.reset(); } return seastar::make_ready_future(stop_t::yes); }); break; default: // unknown tag logger().error("{} got unknown tag", __func__, int(tag)); throw std::system_error(make_error_code(error::negotiation_failure)); } } ceph::bufferlist ProtocolV1::get_auth_payload() { // only non-mons connectings to mons use MAuth messages if (conn.peer_is_mon() && messenger.get_mytype() != CEPH_ENTITY_TYPE_MON) { return {}; } else { if (h.auth_more.length()) { logger().info("using augmented (challenge) auth payload"); return std::move(h.auth_more); } else { auto [auth_method, preferred_modes, auth_bl] = messenger.get_auth_client()->get_auth_request( conn.shared_from_this(), auth_meta); auth_meta->auth_method = auth_method; return auth_bl; } } } seastar::future ProtocolV1::repeat_connect() { // encode ceph_msg_connect memset(&h.connect, 0, sizeof(h.connect)); h.connect.features = conn.policy.features_supported; h.connect.host_type = messenger.get_myname().type(); h.connect.global_seq = h.global_seq; h.connect.connect_seq = h.connect_seq; h.connect.protocol_version = get_proto_version(conn.get_peer_type(), true); // this is fyi, actually, server decides! h.connect.flags = conn.policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0; ceph_assert(messenger.get_auth_client()); bufferlist bl; bufferlist auth_bl = get_auth_payload(); if (auth_bl.length()) { h.connect.authorizer_protocol = auth_meta->auth_method; h.connect.authorizer_len = auth_bl.length(); bl.append(create_static(h.connect)); bl.claim_append(auth_bl); } else { h.connect.authorizer_protocol = 0; h.connect.authorizer_len = 0; bl.append(create_static(h.connect)); }; return socket->write_flush(std::move(bl)) .then([this] { // read the reply return socket->read(sizeof(h.reply)); }).then([this] (bufferlist bl) { auto p = bl.cbegin(); ::decode(h.reply, p); ceph_assert(p.end()); return socket->read(h.reply.authorizer_len); }).then([this] (bufferlist bl) { h.auth_payload = std::move(bl); return handle_connect_reply(h.reply.tag); }); } void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, const entity_name_t& _peer_name) { ceph_assert(state == state_t::none); logger().trace("{} trigger connecting, was {}", conn, static_cast(state)); state = state_t::connecting; set_write_state(write_state_t::delay); ceph_assert(!socket); ceph_assert(!gate.is_closed()); conn.peer_addr = _peer_addr; conn.target_addr = _peer_addr; conn.set_peer_name(_peer_name); conn.policy = messenger.get_policy(_peer_name.type()); messenger.register_conn( seastar::static_pointer_cast(conn.shared_from_this())); gate.dispatch_in_background("start_connect", *this, [this] { return Socket::connect(conn.peer_addr) .then([this](SocketRef sock) { socket = std::move(sock); if (state != state_t::connecting) { assert(state == state_t::closing); return socket->close().then([] { throw std::system_error(make_error_code(error::protocol_aborted)); }); } return seastar::now(); }).then([this] { return messenger.get_global_seq(); }).then([this] (auto gs) { h.global_seq = gs; // read server's handshake header return socket->read(server_header_size); }).then([this] (bufferlist headerbl) { auto p = headerbl.cbegin(); validate_banner(p); entity_addr_t saddr, caddr; ::decode(saddr, p); ::decode(caddr, p); ceph_assert(p.end()); if (saddr != conn.peer_addr) { logger().error("{} my peer_addr {} doesn't match what peer advertized {}", conn, conn.peer_addr, saddr); throw std::system_error( make_error_code(crimson::net::error::bad_peer_address)); } if (state != state_t::connecting) { assert(state == state_t::closing); throw std::system_error(make_error_code(error::protocol_aborted)); } socket->learn_ephemeral_port_as_connector(caddr.get_port()); if (unlikely(caddr.is_msgr2())) { logger().warn("{} peer sent a v2 address for me: {}", conn, caddr); throw std::system_error( make_error_code(crimson::net::error::bad_peer_address)); } caddr.set_type(entity_addr_t::TYPE_LEGACY); return messenger.learned_addr(caddr, conn); }).then([this] { // encode/send client's handshake header bufferlist bl; bl.append(buffer::create_static(banner_size, banner)); ::encode(messenger.get_myaddr(), bl, 0); return socket->write_flush(std::move(bl)); }).then([=] { return seastar::repeat([this] { return repeat_connect(); }); }).then([this] { if (state != state_t::connecting) { assert(state == state_t::closing); throw std::system_error(make_error_code(error::protocol_aborted)); } execute_open(open_t::connected); }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the connecting state logger().warn("{} connecting fault: {}", conn, eptr); close(true); }); }); } // accepting state seastar::future ProtocolV1::send_connect_reply( msgr_tag_t tag, bufferlist&& authorizer_reply) { h.reply.tag = tag; h.reply.features = static_cast((h.connect.features & conn.policy.features_supported) | conn.policy.features_required); h.reply.authorizer_len = authorizer_reply.length(); return socket->write(make_static_packet(h.reply)) .then([this, reply=std::move(authorizer_reply)]() mutable { return socket->write_flush(std::move(reply)); }).then([] { return stop_t::no; }); } seastar::future ProtocolV1::send_connect_reply_ready( msgr_tag_t tag, bufferlist&& authorizer_reply) { return messenger.get_global_seq( ).then([this, tag, auth_len = authorizer_reply.length()] (auto gs) { h.global_seq = gs; h.reply.tag = tag; h.reply.features = conn.policy.features_supported; h.reply.global_seq = h.global_seq; h.reply.connect_seq = h.connect_seq; h.reply.flags = 0; if (conn.policy.lossy) { h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY; } h.reply.authorizer_len = auth_len; session_security.reset( get_auth_session_handler(nullptr, auth_meta->auth_method, auth_meta->session_key, conn.features)); return socket->write(make_static_packet(h.reply)); }).then([this, reply=std::move(authorizer_reply)]() mutable { if (reply.length()) { return socket->write(std::move(reply)); } else { return seastar::now(); } }).then([this] { if (h.reply.tag == CEPH_MSGR_TAG_SEQ) { return socket->write_flush(make_static_packet(conn.in_seq)) .then([this] { return socket->read_exactly(sizeof(seq_num_t)); }).then([this] (auto buf) { auto acked_seq = reinterpret_cast(buf.get()); discard_up_to(&conn.out_q, *acked_seq); }); } else { return socket->flush(); } }).then([] { return stop_t::yes; }); } seastar::future ProtocolV1::replace_existing( SocketConnectionRef existing, bufferlist&& authorizer_reply, bool is_reset_from_peer) { msgr_tag_t reply_tag; if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) && !is_reset_from_peer) { reply_tag = CEPH_MSGR_TAG_SEQ; } else { reply_tag = CEPH_MSGR_TAG_READY; } if (!existing->is_lossy()) { // XXX: we decided not to support lossless connection in v1. as the // client's default policy is // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is // lossy. And by the time // will all be performed using v2 protocol. ceph_abort("lossless policy not supported for v1"); } existing->protocol->close(true); return send_connect_reply_ready(reply_tag, std::move(authorizer_reply)); } seastar::future ProtocolV1::handle_connect_with_existing( SocketConnectionRef existing, bufferlist&& authorizer_reply) { ProtocolV1 *exproto = dynamic_cast(existing->protocol.get()); if (h.connect.global_seq < exproto->peer_global_seq()) { h.reply.global_seq = exproto->peer_global_seq(); return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL); } else if (existing->is_lossy()) { return replace_existing(existing, std::move(authorizer_reply)); } else if (h.connect.connect_seq == 0 && exproto->connect_seq() > 0) { return replace_existing(existing, std::move(authorizer_reply), true); } else if (h.connect.connect_seq < exproto->connect_seq()) { // old attempt, or we sent READY but they didn't get it. h.reply.connect_seq = exproto->connect_seq() + 1; return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); } else if (h.connect.connect_seq == exproto->connect_seq()) { // if the existing connection successfully opened, and/or // subsequently went to standby, then the peer should bump // their connect_seq and retry: this is not a connection race // we need to resolve here. if (exproto->get_state() == state_t::open || exproto->get_state() == state_t::standby) { if (conn.policy.resetcheck && exproto->connect_seq() == 0) { return replace_existing(existing, std::move(authorizer_reply)); } else { h.reply.connect_seq = exproto->connect_seq() + 1; return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); } } else if (existing->peer_wins()) { return replace_existing(existing, std::move(authorizer_reply)); } else { return send_connect_reply(CEPH_MSGR_TAG_WAIT); } } else if (conn.policy.resetcheck && exproto->connect_seq() == 0) { return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION); } else { return replace_existing(existing, std::move(authorizer_reply)); } } bool ProtocolV1::require_auth_feature() const { if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) { return false; } if (local_conf()->cephx_require_signatures) { return true; } if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD || h.connect.host_type == CEPH_ENTITY_TYPE_MDS || h.connect.host_type == CEPH_ENTITY_TYPE_MGR) { return local_conf()->cephx_cluster_require_signatures; } else { return local_conf()->cephx_service_require_signatures; } } bool ProtocolV1::require_cephx_v2_feature() const { if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) { return false; } if (local_conf()->cephx_require_version >= 2) { return true; } if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD || h.connect.host_type == CEPH_ENTITY_TYPE_MDS || h.connect.host_type == CEPH_ENTITY_TYPE_MGR) { return local_conf()->cephx_cluster_require_version >= 2; } else { return local_conf()->cephx_service_require_version >= 2; } } seastar::future ProtocolV1::repeat_handle_connect() { return socket->read(sizeof(h.connect)) .then([this](bufferlist bl) { auto p = bl.cbegin(); ::decode(h.connect, p); if (conn.get_peer_type() != 0 && conn.get_peer_type() != h.connect.host_type) { logger().error("{} repeat_handle_connect(): my peer type does not match" " what peer advertises {} != {}", conn, conn.get_peer_type(), h.connect.host_type); throw std::system_error(make_error_code(error::protocol_aborted)); } conn.set_peer_type(h.connect.host_type); conn.policy = messenger.get_policy(h.connect.host_type); if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) { logger().error("{} we don't know how to reconnect to peer {}", conn, conn.target_addr); throw std::system_error( make_error_code(crimson::net::error::bad_peer_address)); } return socket->read(h.connect.authorizer_len); }).then([this] (bufferlist authorizer) { memset(&h.reply, 0, sizeof(h.reply)); // TODO: set reply.protocol_version if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) { return send_connect_reply( CEPH_MSGR_TAG_BADPROTOVER, bufferlist{}); } if (require_auth_feature()) { conn.policy.features_required |= CEPH_FEATURE_MSG_AUTH; } if (require_cephx_v2_feature()) { conn.policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2; } if (auto feat_missing = conn.policy.features_required & ~(uint64_t)h.connect.features; feat_missing != 0) { return send_connect_reply( CEPH_MSGR_TAG_FEATURES, bufferlist{}); } bufferlist authorizer_reply; auth_meta->auth_method = h.connect.authorizer_protocol; if (!HAVE_FEATURE((uint64_t)h.connect.features, CEPHX_V2)) { // peer doesn't support it and we won't get here if we require it auth_meta->skip_authorizer_challenge = true; } auto more = static_cast(auth_meta->authorizer_challenge); ceph_assert(messenger.get_auth_server()); int r = messenger.get_auth_server()->handle_auth_request( conn.shared_from_this(), auth_meta, more, auth_meta->auth_method, authorizer, &authorizer_reply); if (r < 0) { session_security.reset(); return send_connect_reply( CEPH_MSGR_TAG_BADAUTHORIZER, std::move(authorizer_reply)); } else if (r == 0) { ceph_assert(authorizer_reply.length()); return send_connect_reply( CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER, std::move(authorizer_reply)); } // r > 0 if (auto existing = messenger.lookup_conn(conn.peer_addr); existing) { if (existing->protocol->proto_type != proto_t::v1) { logger().warn("{} existing {} proto version is {} not 1, close existing", conn, *existing, static_cast(existing->protocol->proto_type)); // NOTE: this is following async messenger logic, but we may miss the reset event. existing->mark_down(); } else { return handle_connect_with_existing(existing, std::move(authorizer_reply)); } } if (h.connect.connect_seq > 0) { return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION, std::move(authorizer_reply)); } h.connect_seq = h.connect.connect_seq + 1; h.peer_global_seq = h.connect.global_seq; conn.set_features((uint64_t)conn.policy.features_supported & (uint64_t)h.connect.features); // TODO: cct return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply)); }); } void ProtocolV1::start_accept(SocketRef&& sock, const entity_addr_t& _peer_addr) { ceph_assert(state == state_t::none); logger().trace("{} trigger accepting, was {}", conn, static_cast(state)); state = state_t::accepting; set_write_state(write_state_t::delay); ceph_assert(!socket); // until we know better conn.target_addr = _peer_addr; socket = std::move(sock); messenger.accept_conn( seastar::static_pointer_cast(conn.shared_from_this())); gate.dispatch_in_background("start_accept", *this, [this] { // stop learning my_addr before sending it out, so it won't change return messenger.learned_addr(messenger.get_myaddr(), conn).then([this] { // encode/send server's handshake header bufferlist bl; bl.append(buffer::create_static(banner_size, banner)); ::encode(messenger.get_myaddr(), bl, 0); ::encode(conn.target_addr, bl, 0); return socket->write_flush(std::move(bl)); }).then([this] { // read client's handshake header and connect request return socket->read(client_header_size); }).then([this] (bufferlist bl) { auto p = bl.cbegin(); validate_banner(p); entity_addr_t addr; ::decode(addr, p); ceph_assert(p.end()); if ((addr.is_legacy() || addr.is_any()) && addr.is_same_host(conn.target_addr)) { // good } else { logger().error("{} peer advertized an invalid peer_addr: {}," " which should be v1 and the same host with {}.", conn, addr, conn.peer_addr); throw std::system_error( make_error_code(crimson::net::error::bad_peer_address)); } conn.peer_addr = addr; conn.target_addr = conn.peer_addr; return seastar::repeat([this] { return repeat_handle_connect(); }); }).then([this] { if (state != state_t::accepting) { assert(state == state_t::closing); throw std::system_error(make_error_code(error::protocol_aborted)); } messenger.register_conn( seastar::static_pointer_cast(conn.shared_from_this())); messenger.unaccept_conn( seastar::static_pointer_cast(conn.shared_from_this())); execute_open(open_t::accepted); }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the accepting state logger().warn("{} accepting fault: {}", conn, eptr); close(false); }); }); } // open state ceph::bufferlist ProtocolV1::do_sweep_messages( const std::deque& msgs, size_t num_msgs, bool require_keepalive, std::optional _keepalive_ack, bool require_ack) { static const size_t RESERVE_MSG_SIZE = sizeof(CEPH_MSGR_TAG_MSG) + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer); static const size_t RESERVE_MSG_SIZE_OLD = sizeof(CEPH_MSGR_TAG_MSG) + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer_old); ceph::bufferlist bl; if (likely(num_msgs)) { if (HAVE_FEATURE(conn.features, MSG_AUTH)) { bl.reserve(num_msgs * RESERVE_MSG_SIZE); } else { bl.reserve(num_msgs * RESERVE_MSG_SIZE_OLD); } } if (unlikely(require_keepalive)) { k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec( ceph::coarse_real_clock::now()); logger().trace("{} write keepalive2 {}", conn, k.req.stamp.tv_sec); bl.append(create_static(k.req)); } if (unlikely(_keepalive_ack.has_value())) { logger().trace("{} write keepalive2 ack {}", conn, *_keepalive_ack); k.ack.stamp = ceph_timespec(*_keepalive_ack); bl.append(create_static(k.ack)); } if (require_ack) { // XXX: we decided not to support lossless connection in v1. as the // client's default policy is // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is // lossy. And by the time of crimson-osd's GA, the in-cluster communication // will all be performed using v2 protocol. ceph_abort("lossless policy not supported for v1"); } std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) { ceph_assert(!msg->get_seq() && "message already has seq"); msg->set_seq(++conn.out_seq); auto& header = msg->get_header(); header.src = messenger.get_myname(); msg->encode(conn.features, messenger.get_crc_flags()); if (session_security) { session_security->sign_message(msg.get()); } logger().debug("{} --> #{} === {} ({})", conn, msg->get_seq(), *msg, msg->get_type()); bl.append(CEPH_MSGR_TAG_MSG); bl.append((const char*)&header, sizeof(header)); bl.append(msg->get_payload()); bl.append(msg->get_middle()); bl.append(msg->get_data()); auto& footer = msg->get_footer(); if (HAVE_FEATURE(conn.features, MSG_AUTH)) { bl.append((const char*)&footer, sizeof(footer)); } else { ceph_msg_footer_old old_footer; if (messenger.get_crc_flags() & MSG_CRC_HEADER) { old_footer.front_crc = footer.front_crc; old_footer.middle_crc = footer.middle_crc; } else { old_footer.front_crc = old_footer.middle_crc = 0; } if (messenger.get_crc_flags() & MSG_CRC_DATA) { old_footer.data_crc = footer.data_crc; } else { old_footer.data_crc = 0; } old_footer.flags = footer.flags; bl.append((const char*)&old_footer, sizeof(old_footer)); } }); return bl; } seastar::future<> ProtocolV1::handle_keepalive2_ack() { return socket->read_exactly(sizeof(ceph_timespec)) .then([this] (auto buf) { auto t = reinterpret_cast(buf.get()); k.ack_stamp = *t; logger().trace("{} got keepalive2 ack {}", conn, t->tv_sec); }); } seastar::future<> ProtocolV1::handle_keepalive2() { return socket->read_exactly(sizeof(ceph_timespec)) .then([this] (auto buf) { utime_t ack{*reinterpret_cast(buf.get())}; notify_keepalive_ack(ack); }); } seastar::future<> ProtocolV1::handle_ack() { return socket->read_exactly(sizeof(ceph_le64)) .then([this] (auto buf) { auto seq = reinterpret_cast(buf.get()); discard_up_to(&conn.sent, *seq); }); } seastar::future<> ProtocolV1::maybe_throttle() { if (!conn.policy.throttler_bytes) { return seastar::now(); } const auto to_read = (m.header.front_len + m.header.middle_len + m.header.data_len); return conn.policy.throttler_bytes->get(to_read); } seastar::future<> ProtocolV1::read_message() { return socket->read(sizeof(m.header)) .then([this] (bufferlist bl) { // throttle the traffic, maybe auto p = bl.cbegin(); ::decode(m.header, p); return maybe_throttle(); }).then([this] { // read front return socket->read(m.header.front_len); }).then([this] (bufferlist bl) { m.front = std::move(bl); // read middle return socket->read(m.header.middle_len); }).then([this] (bufferlist bl) { m.middle = std::move(bl); // read data return socket->read(m.header.data_len); }).then([this] (bufferlist bl) { m.data = std::move(bl); // read footer return socket->read(sizeof(m.footer)); }).then([this] (bufferlist bl) { auto p = bl.cbegin(); ::decode(m.footer, p); auto conn_ref = seastar::static_pointer_cast( conn.shared_from_this()); auto msg = ::decode_message(nullptr, 0, m.header, m.footer, m.front, m.middle, m.data, conn_ref); if (unlikely(!msg)) { logger().warn("{} decode message failed", conn); throw std::system_error{make_error_code(error::corrupted_message)}; } constexpr bool add_ref = false; // Message starts with 1 ref // TODO: change MessageRef with foreign_ptr auto msg_ref = MessageRef{msg, add_ref}; if (session_security) { if (unlikely(session_security->check_message_signature(msg))) { logger().warn("{} message signature check failed", conn); throw std::system_error{make_error_code(error::corrupted_message)}; } } // TODO: set time stamps msg->set_byte_throttler(conn.policy.throttler_bytes); if (unlikely(!conn.update_rx_seq(msg->get_seq()))) { // skip this message return seastar::now(); } logger().debug("{} <== #{} === {} ({})", conn, msg_ref->get_seq(), *msg_ref, msg_ref->get_type()); // throttle the reading process by the returned future return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref)); }); } seastar::future<> ProtocolV1::handle_tags() { return seastar::keep_doing([this] { // read the next tag return socket->read_exactly(1) .then([this] (auto buf) { switch (buf[0]) { case CEPH_MSGR_TAG_MSG: return read_message(); case CEPH_MSGR_TAG_ACK: return handle_ack(); case CEPH_MSGR_TAG_KEEPALIVE: return seastar::now(); case CEPH_MSGR_TAG_KEEPALIVE2: return handle_keepalive2(); case CEPH_MSGR_TAG_KEEPALIVE2_ACK: return handle_keepalive2_ack(); case CEPH_MSGR_TAG_CLOSE: logger().info("{} got tag close", conn); throw std::system_error(make_error_code(error::protocol_aborted)); default: logger().error("{} got unknown msgr tag {}", conn, static_cast(buf[0])); throw std::system_error(make_error_code(error::read_eof)); } }); }); } void ProtocolV1::execute_open(open_t type) { logger().trace("{} trigger open, was {}", conn, static_cast(state)); state = state_t::open; set_write_state(write_state_t::open); if (type == open_t::connected) { dispatchers.ms_handle_connect( seastar::static_pointer_cast(conn.shared_from_this())); } else { // type == open_t::accepted dispatchers.ms_handle_accept( seastar::static_pointer_cast(conn.shared_from_this())); } gate.dispatch_in_background("execute_open", *this, [this] { // start background processing of tags return handle_tags() .handle_exception_type([this] (const std::system_error& e) { logger().warn("{} open fault: {}", conn, e); if (e.code() == error::protocol_aborted || e.code() == std::errc::connection_reset || e.code() == error::read_eof) { close(true); return seastar::now(); } else { throw e; } }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the open state logger().warn("{} open fault: {}", conn, eptr); close(true); }); }); } // closing state void ProtocolV1::trigger_close() { logger().trace("{} trigger closing, was {}", conn, static_cast(state)); messenger.closing_conn( seastar::static_pointer_cast( conn.shared_from_this())); if (state == state_t::accepting) { messenger.unaccept_conn(seastar::static_pointer_cast( conn.shared_from_this())); } else if (state >= state_t::connecting && state < state_t::closing) { messenger.unregister_conn(seastar::static_pointer_cast( conn.shared_from_this())); } else { // cannot happen ceph_assert(false); } if (!socket) { ceph_assert(state == state_t::connecting); } state = state_t::closing; } void ProtocolV1::on_closed() { messenger.closed_conn( seastar::static_pointer_cast( conn.shared_from_this())); } seastar::future<> ProtocolV1::fault() { if (conn.policy.lossy) { messenger.unregister_conn(seastar::static_pointer_cast( conn.shared_from_this())); } // XXX: we decided not to support lossless connection in v1. as the // client's default policy is // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is // lossy. And by the time of crimson-osd's GA, the in-cluster communication // will all be performed using v2 protocol. ceph_abort("lossless policy not supported for v1"); return seastar::now(); } void ProtocolV1::print(std::ostream& out) const { out << conn; } } // namespace crimson::net