diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/crimson/net | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/net')
-rw-r--r-- | src/crimson/net/Config.h | 26 | ||||
-rw-r--r-- | src/crimson/net/Connection.h | 66 | ||||
-rw-r--r-- | src/crimson/net/Dispatcher.cc | 11 | ||||
-rw-r--r-- | src/crimson/net/Dispatcher.h | 65 | ||||
-rw-r--r-- | src/crimson/net/Errors.cc | 107 | ||||
-rw-r--r-- | src/crimson/net/Errors.h | 54 | ||||
-rw-r--r-- | src/crimson/net/Fwd.h | 52 | ||||
-rw-r--r-- | src/crimson/net/Messenger.cc | 21 | ||||
-rw-r--r-- | src/crimson/net/Messenger.h | 117 | ||||
-rw-r--r-- | src/crimson/net/Socket.cc | 81 | ||||
-rw-r--r-- | src/crimson/net/Socket.h | 59 | ||||
-rw-r--r-- | src/crimson/net/SocketConnection.cc | 972 | ||||
-rw-r--r-- | src/crimson/net/SocketConnection.h | 235 | ||||
-rw-r--r-- | src/crimson/net/SocketMessenger.cc | 283 | ||||
-rw-r--r-- | src/crimson/net/SocketMessenger.h | 119 |
15 files changed, 2268 insertions, 0 deletions
diff --git a/src/crimson/net/Config.h b/src/crimson/net/Config.h new file mode 100644 index 00000000..90929bde --- /dev/null +++ b/src/crimson/net/Config.h @@ -0,0 +1,26 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +// XXX: a poor man's md_config_t +#pragma once + +#include "include/msgr.h" +#include <chrono> + +namespace ceph::net { + +using namespace std::literals::chrono_literals; + +constexpr struct simple_md_config_t { + uint32_t host_type = CEPH_ENTITY_TYPE_OSD; + bool cephx_require_signatures = false; + bool cephx_cluster_require_signatures = false; + bool cephx_service_require_signatures = false; + bool ms_die_on_old_message = true; + bool ms_die_on_skipped_message = true; + std::chrono::milliseconds ms_initial_backoff = 200ms; + std::chrono::milliseconds ms_max_backoff = 15000ms; + std::chrono::milliseconds threadpool_empty_queue_max_wait = 100ms; + size_t osd_client_message_size_cap = 500ULL << 20; +} conf; +} diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h new file mode 100644 index 00000000..b1b72c74 --- /dev/null +++ b/src/crimson/net/Connection.h @@ -0,0 +1,66 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <queue> +#include <seastar/core/future.hh> +#include <seastar/core/shared_ptr.hh> + +#include "Fwd.h" + +namespace ceph::net { + +using seq_num_t = uint64_t; + +class Connection : public seastar::enable_shared_from_this<Connection> { + protected: + entity_addr_t peer_addr; + peer_type_t peer_type = -1; + + public: + Connection() {} + virtual ~Connection() {} + + virtual Messenger* get_messenger() const = 0; + const entity_addr_t& get_peer_addr() const { return peer_addr; } + virtual int get_peer_type() const = 0; + + /// true if the handshake has completed and no errors have been encountered + virtual seastar::future<bool> is_connected() = 0; + + /// send a message over a connection that has completed its handshake + virtual seastar::future<> send(MessageRef msg) = 0; + + /// send a keepalive message over a connection that has completed its + /// handshake + virtual seastar::future<> keepalive() = 0; + + /// close the connection and cancel any any pending futures from read/send + virtual seastar::future<> close() = 0; + + /// which shard id the connection lives + virtual seastar::shard_id shard_id() const = 0; + + virtual void print(ostream& out) const = 0; +}; + +inline ostream& operator<<(ostream& out, const Connection& conn) { + out << "["; + conn.print(out); + out << "]"; + return out; +} + +} // namespace ceph::net diff --git a/src/crimson/net/Dispatcher.cc b/src/crimson/net/Dispatcher.cc new file mode 100644 index 00000000..336ded38 --- /dev/null +++ b/src/crimson/net/Dispatcher.cc @@ -0,0 +1,11 @@ +#include "auth/Auth.h" +#include "Dispatcher.h" + +namespace ceph::net +{ +seastar::future<std::unique_ptr<AuthAuthorizer>> +Dispatcher::ms_get_authorizer(peer_type_t) +{ + return seastar::make_ready_future<std::unique_ptr<AuthAuthorizer>>(nullptr); +} +} diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h new file mode 100644 index 00000000..cbde1549 --- /dev/null +++ b/src/crimson/net/Dispatcher.h @@ -0,0 +1,65 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <seastar/core/future.hh> +#include <seastar/core/sharded.hh> + +#include "Fwd.h" + +class AuthAuthorizer; + +namespace ceph::net { + +class Dispatcher { + public: + virtual ~Dispatcher() {} + + virtual seastar::future<> ms_dispatch(ConnectionRef conn, MessageRef m) { + return seastar::make_ready_future<>(); + } + + virtual seastar::future<> ms_handle_accept(ConnectionRef conn) { + return seastar::make_ready_future<>(); + } + + virtual seastar::future<> ms_handle_connect(ConnectionRef conn) { + return seastar::make_ready_future<>(); + } + + virtual seastar::future<> ms_handle_reset(ConnectionRef conn) { + return seastar::make_ready_future<>(); + } + + virtual seastar::future<> ms_handle_remote_reset(ConnectionRef conn) { + return seastar::make_ready_future<>(); + } + + virtual seastar::future<msgr_tag_t, bufferlist> + ms_verify_authorizer(peer_type_t, + auth_proto_t, + bufferlist&) { + return seastar::make_ready_future<msgr_tag_t, bufferlist>(0, bufferlist{}); + } + virtual seastar::future<std::unique_ptr<AuthAuthorizer>> + ms_get_authorizer(peer_type_t); + + // get the local dispatcher shard if it is accessed by another core + virtual Dispatcher* get_local_shard() { + return this; + } +}; + +} // namespace ceph::net diff --git a/src/crimson/net/Errors.cc b/src/crimson/net/Errors.cc new file mode 100644 index 00000000..62d60ce1 --- /dev/null +++ b/src/crimson/net/Errors.cc @@ -0,0 +1,107 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "Errors.h" + +namespace ceph::net { + +const std::error_category& net_category() +{ + struct category : public std::error_category { + const char* name() const noexcept override { + return "ceph::net"; + } + + std::string message(int ev) const override { + switch (static_cast<error>(ev)) { + case error::success: + return "success"; + case error::bad_connect_banner: + return "bad connect banner"; + case error::bad_peer_address: + return "bad peer address"; + case error::negotiation_failure: + return "negotiation failure"; + case error::read_eof: + return "read eof"; + case error::connection_aborted: + return "connection aborted"; + case error::connection_refused: + return "connection refused"; + case error::connection_reset: + return "connection reset"; + default: + return "unknown"; + } + } + + // unfortunately, seastar throws connection errors with the system category, + // rather than the generic category that would match their counterparts + // in std::errc. we add our own errors for them, so we can match either + std::error_condition default_error_condition(int ev) const noexcept override { + switch (static_cast<error>(ev)) { + case error::connection_aborted: + return std::errc::connection_aborted; + case error::connection_refused: + return std::errc::connection_refused; + case error::connection_reset: + return std::errc::connection_reset; + default: + return std::error_condition(ev, *this); + } + } + + bool equivalent(int code, const std::error_condition& cond) const noexcept override { + if (error_category::equivalent(code, cond)) { + return true; + } + switch (static_cast<error>(code)) { + case error::connection_aborted: + return cond == std::errc::connection_aborted + || cond == std::error_condition(ECONNABORTED, std::system_category()); + case error::connection_refused: + return cond == std::errc::connection_refused + || cond == std::error_condition(ECONNREFUSED, std::system_category()); + case error::connection_reset: + return cond == std::errc::connection_reset + || cond == std::error_condition(ECONNRESET, std::system_category()); + default: + return false; + } + } + + bool equivalent(const std::error_code& code, int cond) const noexcept override { + if (error_category::equivalent(code, cond)) { + return true; + } + switch (static_cast<error>(cond)) { + case error::connection_aborted: + return code == std::errc::connection_aborted + || code == std::error_code(ECONNABORTED, std::system_category()); + case error::connection_refused: + return code == std::errc::connection_refused + || code == std::error_code(ECONNREFUSED, std::system_category()); + case error::connection_reset: + return code == std::errc::connection_reset + || code == std::error_code(ECONNRESET, std::system_category()); + default: + return false; + } + } + }; + static category instance; + return instance; +} + +} // namespace ceph::net diff --git a/src/crimson/net/Errors.h b/src/crimson/net/Errors.h new file mode 100644 index 00000000..d75082fd --- /dev/null +++ b/src/crimson/net/Errors.h @@ -0,0 +1,54 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <system_error> + +namespace ceph::net { + +/// net error codes +enum class error { + success = 0, + bad_connect_banner, + bad_peer_address, + negotiation_failure, + read_eof, + connection_aborted, + connection_refused, + connection_reset, +}; + +/// net error category +const std::error_category& net_category(); + +inline std::error_code make_error_code(error e) +{ + return {static_cast<int>(e), net_category()}; +} + +inline std::error_condition make_error_condition(error e) +{ + return {static_cast<int>(e), net_category()}; +} + +} // namespace ceph::net + +namespace std { + +/// enables implicit conversion to std::error_condition +template <> +struct is_error_condition_enum<ceph::net::error> : public true_type {}; + +} // namespace std diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h new file mode 100644 index 00000000..8a0a1c96 --- /dev/null +++ b/src/crimson/net/Fwd.h @@ -0,0 +1,52 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/sharded.hh> + +#include "msg/msg_types.h" +#include "msg/Message.h" + +using peer_type_t = int; +using auth_proto_t = int; + +namespace ceph::net { + +using msgr_tag_t = uint8_t; + +class Connection; +using ConnectionRef = seastar::shared_ptr<Connection>; +// NOTE: ConnectionXRef should only be used in seastar world, because +// lw_shared_ptr<> is not safe to be accessed by unpinned alien threads. +using ConnectionXRef = seastar::lw_shared_ptr<seastar::foreign_ptr<ConnectionRef>>; + +class Dispatcher; + +class Messenger; + +template <typename T, typename... Args> +seastar::future<T*> create_sharded(Args... args) { + auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>(); + return sharded_obj->start(args...).then([sharded_obj]() { + auto ret = &sharded_obj->local(); + seastar::engine().at_exit([sharded_obj]() { + return sharded_obj->stop().finally([sharded_obj] {}); + }); + return ret; + }); +} + +} // namespace ceph::net diff --git a/src/crimson/net/Messenger.cc b/src/crimson/net/Messenger.cc new file mode 100644 index 00000000..7f8665ef --- /dev/null +++ b/src/crimson/net/Messenger.cc @@ -0,0 +1,21 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Messenger.h" +#include "SocketMessenger.h" + +namespace ceph::net { + +seastar::future<Messenger*> +Messenger::create(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce, + const int master_sid) +{ + return create_sharded<SocketMessenger>(name, lname, nonce, master_sid) + .then([](Messenger *msgr) { + return msgr; + }); +} + +} // namespace ceph::net diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h new file mode 100644 index 00000000..9d766cb0 --- /dev/null +++ b/src/crimson/net/Messenger.h @@ -0,0 +1,117 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <seastar/core/future.hh> + +#include "Fwd.h" +#include "crimson/thread/Throttle.h" +#include "msg/Policy.h" + +class AuthAuthorizer; + +namespace ceph::net { + +using Throttle = ceph::thread::Throttle; +using SocketPolicy = ceph::net::Policy<Throttle>; + +class Messenger { + entity_name_t my_name; + entity_addrvec_t my_addrs; + uint32_t global_seq = 0; + uint32_t crc_flags = 0; + + public: + Messenger(const entity_name_t& name) + : my_name(name) + {} + virtual ~Messenger() {} + + const entity_name_t& get_myname() const { return my_name; } + const entity_addrvec_t& get_myaddrs() const { return my_addrs; } + entity_addr_t get_myaddr() const { return my_addrs.front(); } + virtual seastar::future<> set_myaddrs(const entity_addrvec_t& addrs) { + my_addrs = addrs; + return seastar::now(); + } + + /// bind to the given address + virtual seastar::future<> bind(const entity_addrvec_t& addr) = 0; + + /// try to bind to the first unused port of given address + virtual seastar::future<> try_bind(const entity_addrvec_t& addr, + uint32_t min_port, uint32_t max_port) = 0; + + /// start the messenger + virtual seastar::future<> start(Dispatcher *dispatcher) = 0; + + /// either return an existing connection to the peer, + /// or a new pending connection + virtual seastar::future<ConnectionXRef> + connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type) = 0; + + // wait for messenger shutdown + virtual seastar::future<> wait() = 0; + + /// stop listenening and wait for all connections to close. safe to destruct + /// after this future becomes available + virtual seastar::future<> shutdown() = 0; + + uint32_t get_global_seq(uint32_t old=0) { + if (old > global_seq) { + global_seq = old; + } + return ++global_seq; + } + + uint32_t get_crc_flags() const { + return crc_flags; + } + void set_crc_data() { + crc_flags |= MSG_CRC_DATA; + } + void set_crc_header() { + crc_flags |= MSG_CRC_HEADER; + } + + // get the local messenger shard if it is accessed by another core + virtual Messenger* get_local_shard() { + return this; + } + + virtual void print(ostream& out) const = 0; + + virtual void set_default_policy(const SocketPolicy& p) = 0; + + virtual void set_policy(entity_type_t peer_type, const SocketPolicy& p) = 0; + + virtual void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) = 0; + + static seastar::future<Messenger*> + create(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce, + const int master_sid=-1); +}; + +inline ostream& operator<<(ostream& out, const Messenger& msgr) { + out << "["; + msgr.print(out); + out << "]"; + return out; +} + +} // namespace ceph::net diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc new file mode 100644 index 00000000..a22e9b2e --- /dev/null +++ b/src/crimson/net/Socket.cc @@ -0,0 +1,81 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Socket.h" + +#include "Errors.h" + +namespace ceph::net { + +namespace { + +// an input_stream consumer that reads buffer segments into a bufferlist up to +// the given number of remaining bytes +struct bufferlist_consumer { + bufferlist& bl; + size_t& remaining; + + bufferlist_consumer(bufferlist& bl, size_t& remaining) + : bl(bl), remaining(remaining) {} + + using tmp_buf = seastar::temporary_buffer<char>; + using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type; + + // consume some or all of a buffer segment + seastar::future<consumption_result_type> operator()(tmp_buf&& data) { + if (remaining >= data.size()) { + // consume the whole buffer + remaining -= data.size(); + bl.append(buffer::create_foreign(std::move(data))); + if (remaining > 0) { + // return none to request more segments + return seastar::make_ready_future<consumption_result_type>( + seastar::continue_consuming{}); + } else { + // return an empty buffer to singal that we're done + return seastar::make_ready_future<consumption_result_type>( + consumption_result_type::stop_consuming_type({})); + } + } + if (remaining > 0) { + // consume the front + bl.append(buffer::create_foreign(data.share(0, remaining))); + data.trim_front(remaining); + remaining = 0; + } + // give the rest back to signal that we're done + return seastar::make_ready_future<consumption_result_type>( + consumption_result_type::stop_consuming_type{std::move(data)}); + }; +}; + +} // anonymous namespace + +seastar::future<bufferlist> Socket::read(size_t bytes) +{ + if (bytes == 0) { + return seastar::make_ready_future<bufferlist>(); + } + r.buffer.clear(); + r.remaining = bytes; + return in.consume(bufferlist_consumer{r.buffer, r.remaining}) + .then([this] { + if (r.remaining) { // throw on short reads + throw std::system_error(make_error_code(error::read_eof)); + } + return seastar::make_ready_future<bufferlist>(std::move(r.buffer)); + }); +} + +seastar::future<seastar::temporary_buffer<char>> +Socket::read_exactly(size_t bytes) { + return in.read_exactly(bytes) + .then([this](auto buf) { + if (buf.empty()) { + throw std::system_error(make_error_code(error::read_eof)); + } + return seastar::make_ready_future<tmp_buf>(std::move(buf)); + }); +} + +} // namespace ceph::net diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h new file mode 100644 index 00000000..c1a2ed59 --- /dev/null +++ b/src/crimson/net/Socket.h @@ -0,0 +1,59 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/reactor.hh> +#include <seastar/net/packet.hh> + +#include "include/buffer.h" + +namespace ceph::net { + +class Socket +{ + const seastar::shard_id sid; + seastar::connected_socket socket; + seastar::input_stream<char> in; + seastar::output_stream<char> out; + + /// buffer state for read() + struct { + bufferlist buffer; + size_t remaining; + } r; + + public: + explicit Socket(seastar::connected_socket&& _socket) + : sid{seastar::engine().cpu_id()}, + socket(std::move(_socket)), + in(socket.input()), + out(socket.output()) {} + Socket(Socket&& o) = delete; + + /// read the requested number of bytes into a bufferlist + seastar::future<bufferlist> read(size_t bytes); + using tmp_buf = seastar::temporary_buffer<char>; + using packet = seastar::net::packet; + seastar::future<tmp_buf> read_exactly(size_t bytes); + + seastar::future<> write(packet&& buf) { + return out.write(std::move(buf)); + } + seastar::future<> flush() { + return out.flush(); + } + seastar::future<> write_flush(packet&& buf) { + return out.write(std::move(buf)).then([this] { return out.flush(); }); + } + + /// Socket can only be closed once. + seastar::future<> close() { + return seastar::smp::submit_to(sid, [this] { + return seastar::when_all( + in.close(), out.close()).discard_result(); + }); + } +}; + +} // namespace ceph::net diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc new file mode 100644 index 00000000..2907c486 --- /dev/null +++ b/src/crimson/net/SocketConnection.cc @@ -0,0 +1,972 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "SocketConnection.h" + +#include <algorithm> +#include <seastar/core/shared_future.hh> +#include <seastar/core/sleep.hh> +#include <seastar/net/packet.hh> + +#include "include/msgr.h" +#include "include/random.h" +#include "auth/Auth.h" +#include "auth/AuthSessionHandler.h" + +#include "crimson/common/log.h" +#include "Config.h" +#include "Dispatcher.h" +#include "Errors.h" +#include "SocketMessenger.h" + +using namespace ceph::net; + +template <typename T> +seastar::net::packet make_static_packet(const T& value) { + return { reinterpret_cast<const char*>(&value), sizeof(value) }; +} + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_ms); + } +} + +SocketConnection::SocketConnection(SocketMessenger& messenger, + Dispatcher& dispatcher) + : messenger(messenger), + dispatcher(dispatcher), + send_ready(h.promise.get_future()) +{ + ceph_assert(&messenger.container().local() == &messenger); +} + +SocketConnection::~SocketConnection() +{ + ceph_assert(pending_dispatch.is_closed()); +} + +ceph::net::Messenger* +SocketConnection::get_messenger() const { + return &messenger; +} + +seastar::future<bool> SocketConnection::is_connected() +{ + return seastar::smp::submit_to(shard_id(), [this] { + return !send_ready.failed(); + }); +} + +seastar::future<> SocketConnection::send(MessageRef msg) +{ + return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] { + if (state == state_t::closing) + return seastar::now(); + return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] { + return do_send(std::move(msg)) + .handle_exception([this] (std::exception_ptr eptr) { + logger().warn("{} send fault: {}", *this, eptr); + close(); + }); + }); + }); +} + +seastar::future<> SocketConnection::keepalive() +{ + return seastar::smp::submit_to(shard_id(), [this] { + if (state == state_t::closing) + return seastar::now(); + return seastar::with_gate(pending_dispatch, [this] { + return do_keepalive() + .handle_exception([this] (std::exception_ptr eptr) { + logger().warn("{} keepalive fault: {}", *this, eptr); + close(); + }); + }); + }); +} + +seastar::future<> SocketConnection::close() +{ + return seastar::smp::submit_to(shard_id(), [this] { + return do_close(); + }); +} + +seastar::future<> SocketConnection::handle_tags() +{ + return seastar::keep_doing([this] { + // read the next tag + return socket->read_exactly(1) + .then([this] (auto buf) { + switch (buf[0]) { + case CEPH_MSGR_TAG_MSG: + return read_message(); + case CEPH_MSGR_TAG_ACK: + return handle_ack(); + case CEPH_MSGR_TAG_KEEPALIVE: + return seastar::now(); + case CEPH_MSGR_TAG_KEEPALIVE2: + return handle_keepalive2(); + case CEPH_MSGR_TAG_KEEPALIVE2_ACK: + return handle_keepalive2_ack(); + case CEPH_MSGR_TAG_CLOSE: + logger().info("{} got tag close", *this); + throw std::system_error(make_error_code(error::connection_aborted)); + default: + logger().error("{} got unknown msgr tag {}", *this, static_cast<int>(buf[0])); + throw std::system_error(make_error_code(error::read_eof)); + } + }); + }); +} + +seastar::future<> SocketConnection::handle_ack() +{ + return socket->read_exactly(sizeof(ceph_le64)) + .then([this] (auto buf) { + auto seq = reinterpret_cast<const ceph_le64*>(buf.get()); + discard_up_to(&sent, *seq); + }); +} + +void SocketConnection::discard_up_to(std::queue<MessageRef>* queue, + seq_num_t seq) +{ + while (!queue->empty() && + queue->front()->get_seq() < seq) { + queue->pop(); + } +} + +void SocketConnection::requeue_sent() +{ + out_seq -= sent.size(); + while (!sent.empty()) { + auto m = sent.front(); + sent.pop(); + out_q.push(std::move(m)); + } +} + +seastar::future<> SocketConnection::maybe_throttle() +{ + if (!policy.throttler_bytes) { + return seastar::now(); + } + const auto to_read = (m.header.front_len + + m.header.middle_len + + m.header.data_len); + return policy.throttler_bytes->get(to_read); +} + +seastar::future<> SocketConnection::read_message() +{ + return socket->read(sizeof(m.header)) + .then([this] (bufferlist bl) { + // throttle the traffic, maybe + auto p = bl.cbegin(); + ::decode(m.header, p); + return maybe_throttle(); + }).then([this] { + // read front + return socket->read(m.header.front_len); + }).then([this] (bufferlist bl) { + m.front = std::move(bl); + // read middle + return socket->read(m.header.middle_len); + }).then([this] (bufferlist bl) { + m.middle = std::move(bl); + // read data + return socket->read(m.header.data_len); + }).then([this] (bufferlist bl) { + m.data = std::move(bl); + // read footer + return socket->read(sizeof(m.footer)); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + ::decode(m.footer, p); + auto msg = ::decode_message(nullptr, 0, m.header, m.footer, + m.front, m.middle, m.data, nullptr); + // TODO: set time stamps + msg->set_byte_throttler(policy.throttler_bytes); + + if (!update_rx_seq(msg->get_seq())) { + // skip this message + return; + } + + constexpr bool add_ref = false; // Message starts with 1 ref + // TODO: change MessageRef with foreign_ptr + auto msg_ref = MessageRef{msg, add_ref}; + // start dispatch, ignoring exceptions from the application layer + seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] { + return dispatcher.ms_dispatch( + seastar::static_pointer_cast<SocketConnection>(shared_from_this()), + std::move(msg)) + .handle_exception([this] (std::exception_ptr eptr) { + logger().error("{} ms_dispatch caught exception: {}", *this, eptr); + ceph_assert(false); + }); + }); + }); +} + +bool SocketConnection::update_rx_seq(seq_num_t seq) +{ + if (seq <= in_seq) { + if (HAVE_FEATURE(features, RECONNECT_SEQ) && + conf.ms_die_on_old_message) { + ceph_abort_msg("old msgs despite reconnect_seq feature"); + } + return false; + } else if (seq > in_seq + 1) { + if (conf.ms_die_on_skipped_message) { + ceph_abort_msg("skipped incoming seq"); + } + return false; + } else { + in_seq = seq; + return true; + } +} + +seastar::future<> SocketConnection::write_message(MessageRef msg) +{ + msg->set_seq(++out_seq); + auto& header = msg->get_header(); + header.src = messenger.get_myname(); + msg->encode(features, messenger.get_crc_flags()); + bufferlist bl; + bl.append(CEPH_MSGR_TAG_MSG); + bl.append((const char*)&header, sizeof(header)); + bl.append(msg->get_payload()); + bl.append(msg->get_middle()); + bl.append(msg->get_data()); + auto& footer = msg->get_footer(); + if (HAVE_FEATURE(features, MSG_AUTH)) { + bl.append((const char*)&footer, sizeof(footer)); + } else { + ceph_msg_footer_old old_footer; + if (messenger.get_crc_flags() & MSG_CRC_HEADER) { + old_footer.front_crc = footer.front_crc; + old_footer.middle_crc = footer.middle_crc; + } else { + old_footer.front_crc = old_footer.middle_crc = 0; + } + if (messenger.get_crc_flags() & MSG_CRC_DATA) { + old_footer.data_crc = footer.data_crc; + } else { + old_footer.data_crc = 0; + } + old_footer.flags = footer.flags; + bl.append((const char*)&old_footer, sizeof(old_footer)); + } + // write as a seastar::net::packet + return socket->write_flush(std::move(bl)); + // TODO: lossless policy + // .then([this, msg = std::move(msg)] { + // if (!policy.lossy) { + // sent.push(std::move(msg)); + // } + // }); +} + +seastar::future<> SocketConnection::do_send(MessageRef msg) +{ + // chain the message after the last message is sent + // TODO: retry send for lossless connection + seastar::shared_future<> f = send_ready.then( + [this, msg = std::move(msg)] { + if (state == state_t::closing) + return seastar::now(); + return write_message(std::move(msg)); + }); + + // chain any later messages after this one completes + send_ready = f.get_future(); + // allow the caller to wait on the same future + return f.get_future(); +} + +seastar::future<> SocketConnection::do_keepalive() +{ + // TODO: retry keepalive for lossless connection + seastar::shared_future<> f = send_ready.then([this] { + if (state == state_t::closing) + return seastar::now(); + k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec( + ceph::coarse_real_clock::now()); + return socket->write_flush(make_static_packet(k.req)); + }); + send_ready = f.get_future(); + return f.get_future(); +} + +seastar::future<> SocketConnection::do_close() +{ + if (state == state_t::closing) { + // already closing + assert(close_ready.valid()); + return close_ready.get_future(); + } + + // unregister_conn() drops a reference, so hold another until completion + auto cleanup = [conn_ref = shared_from_this(), this] { + logger().debug("{} closed!", *this); + }; + + if (state == state_t::accepting) { + messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + } else if (state >= state_t::connecting && state < state_t::closing) { + messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + } else { + // cannot happen + ceph_assert(false); + } + + // close_ready become valid only after state is state_t::closing + assert(!close_ready.valid()); + + if (socket) { + close_ready = socket->close() + .then([this] { + return pending_dispatch.close(); + }).finally(std::move(cleanup)); + } else { + ceph_assert(state == state_t::connecting); + close_ready = pending_dispatch.close().finally(std::move(cleanup)); + } + logger().debug("{} trigger closing, was {}", *this, static_cast<int>(state)); + state = state_t::closing; + return close_ready.get_future(); +} + +// handshake + +/// store the banner in a non-const string for buffer::create_static() +static char banner[] = CEPH_BANNER; +constexpr size_t banner_size = sizeof(CEPH_BANNER)-1; + +constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr); +constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr); + +WRITE_RAW_ENCODER(ceph_msg_connect); +WRITE_RAW_ENCODER(ceph_msg_connect_reply); + +std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c) +{ + return out << "connect{features=" << std::hex << c.features << std::dec + << " host_type=" << c.host_type + << " global_seq=" << c.global_seq + << " connect_seq=" << c.connect_seq + << " protocol_version=" << c.protocol_version + << " authorizer_protocol=" << c.authorizer_protocol + << " authorizer_len=" << c.authorizer_len + << " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}'; +} + +std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r) +{ + return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag) + << " features=" << std::hex << r.features << std::dec + << " global_seq=" << r.global_seq + << " connect_seq=" << r.connect_seq + << " protocol_version=" << r.protocol_version + << " authorizer_len=" << r.authorizer_len + << " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}'; +} + +// check that the buffer starts with a valid banner without requiring it to +// be contiguous in memory +static void validate_banner(bufferlist::const_iterator& p) +{ + auto b = std::cbegin(banner); + auto end = b + banner_size; + while (b != end) { + const char *buf{nullptr}; + auto remaining = std::distance(b, end); + auto len = p.get_ptr_and_advance(remaining, &buf); + if (!std::equal(buf, buf + len, b)) { + throw std::system_error(make_error_code(error::bad_connect_banner)); + } + b += len; + } +} + +// make sure that we agree with the peer about its address +static void validate_peer_addr(const entity_addr_t& addr, + const entity_addr_t& expected) +{ + if (addr == expected) { + return; + } + // ok if server bound anonymously, as long as port/nonce match + if (addr.is_blank_ip() && + addr.get_port() == expected.get_port() && + addr.get_nonce() == expected.get_nonce()) { + return; + } else { + throw std::system_error(make_error_code(error::bad_peer_address)); + } +} + +/// return a static bufferptr to the given object +template <typename T> +bufferptr create_static(T& obj) +{ + return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj)); +} + +bool SocketConnection::require_auth_feature() const +{ + if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) { + return false; + } + if (conf.cephx_require_signatures) { + return true; + } + if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD || + h.connect.host_type == CEPH_ENTITY_TYPE_MDS) { + return conf.cephx_cluster_require_signatures; + } else { + return conf.cephx_service_require_signatures; + } +} + +uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool connect) const +{ + constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD; + // see also OSD.h, unlike other connection of simple/async messenger, + // crimson msgr is only used by osd + constexpr uint32_t CEPH_OSD_PROTOCOL = 10; + if (peer_type == my_type) { + // internal + return CEPH_OSD_PROTOCOL; + } else { + // public + switch (connect ? peer_type : my_type) { + case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; + case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; + case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; + default: return 0; + } + } +} + +seastar::future<seastar::stop_iteration> +SocketConnection::repeat_handle_connect() +{ + return socket->read(sizeof(h.connect)) + .then([this](bufferlist bl) { + auto p = bl.cbegin(); + ::decode(h.connect, p); + peer_type = h.connect.host_type; + return socket->read(h.connect.authorizer_len); + }).then([this] (bufferlist authorizer) { + if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) { + return seastar::make_ready_future<msgr_tag_t, bufferlist>( + CEPH_MSGR_TAG_BADPROTOVER, bufferlist{}); + } + if (require_auth_feature()) { + policy.features_required |= CEPH_FEATURE_MSG_AUTH; + } + if (auto feat_missing = policy.features_required & ~(uint64_t)h.connect.features; + feat_missing != 0) { + return seastar::make_ready_future<msgr_tag_t, bufferlist>( + CEPH_MSGR_TAG_FEATURES, bufferlist{}); + } + return dispatcher.ms_verify_authorizer(peer_type, + h.connect.authorizer_protocol, + authorizer); + }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) { + memset(&h.reply, 0, sizeof(h.reply)); + if (tag) { + return send_connect_reply(tag, std::move(authorizer_reply)); + } + if (auto existing = messenger.lookup_conn(peer_addr); existing) { + return handle_connect_with_existing(existing, std::move(authorizer_reply)); + } else if (h.connect.connect_seq > 0) { + return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION, + std::move(authorizer_reply)); + } + h.connect_seq = h.connect.connect_seq + 1; + h.peer_global_seq = h.connect.global_seq; + set_features((uint64_t)policy.features_supported & (uint64_t)h.connect.features); + // TODO: cct + return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply)); + }); +} + +seastar::future<seastar::stop_iteration> +SocketConnection::send_connect_reply(msgr_tag_t tag, + bufferlist&& authorizer_reply) +{ + h.reply.tag = tag; + h.reply.features = static_cast<uint64_t>((h.connect.features & + policy.features_supported) | + policy.features_required); + h.reply.authorizer_len = authorizer_reply.length(); + return socket->write(make_static_packet(h.reply)) + .then([this, reply=std::move(authorizer_reply)]() mutable { + return socket->write_flush(std::move(reply)); + }).then([] { + return stop_t::no; + }); +} + +seastar::future<seastar::stop_iteration> +SocketConnection::send_connect_reply_ready(msgr_tag_t tag, + bufferlist&& authorizer_reply) +{ + h.global_seq = messenger.get_global_seq(); + h.reply.tag = tag; + h.reply.features = policy.features_supported; + h.reply.global_seq = h.global_seq; + h.reply.connect_seq = h.connect_seq; + h.reply.flags = 0; + if (policy.lossy) { + h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY; + } + h.reply.authorizer_len = authorizer_reply.length(); + return socket->write(make_static_packet(h.reply)) + .then([this, reply=std::move(authorizer_reply)]() mutable { + if (reply.length()) { + return socket->write(std::move(reply)); + } else { + return seastar::now(); + } + }).then([this] { + if (h.reply.tag == CEPH_MSGR_TAG_SEQ) { + return socket->write_flush(make_static_packet(in_seq)) + .then([this] { + return socket->read_exactly(sizeof(seq_num_t)); + }).then([this] (auto buf) { + auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get()); + discard_up_to(&out_q, *acked_seq); + }); + } else { + return socket->flush(); + } + }).then([this] { + return stop_t::yes; + }); +} + +seastar::future<> +SocketConnection::handle_keepalive2() +{ + return socket->read_exactly(sizeof(ceph_timespec)) + .then([this] (auto buf) { + k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get()); + seastar::shared_future<> f = send_ready.then([this] { + logger().debug("{} keepalive2 {}", *this, k.ack.stamp.tv_sec); + return socket->write_flush(make_static_packet(k.ack)); + }); + send_ready = f.get_future(); + return f.get_future(); + }); +} + +seastar::future<> +SocketConnection::handle_keepalive2_ack() +{ + return socket->read_exactly(sizeof(ceph_timespec)) + .then([this] (auto buf) { + auto t = reinterpret_cast<const ceph_timespec*>(buf.get()); + k.ack_stamp = *t; + logger().debug("{} keepalive2 ack {}", *this, t->tv_sec); + }); +} + +seastar::future<seastar::stop_iteration> +SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply) +{ + if (h.connect.global_seq < existing->peer_global_seq()) { + h.reply.global_seq = existing->peer_global_seq(); + return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL); + } else if (existing->is_lossy()) { + return replace_existing(existing, std::move(authorizer_reply)); + } else if (h.connect.connect_seq == 0 && existing->connect_seq() > 0) { + return replace_existing(existing, std::move(authorizer_reply), true); + } else if (h.connect.connect_seq < existing->connect_seq()) { + // old attempt, or we sent READY but they didn't get it. + h.reply.connect_seq = existing->connect_seq() + 1; + return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); + } else if (h.connect.connect_seq == existing->connect_seq()) { + // if the existing connection successfully opened, and/or + // subsequently went to standby, then the peer should bump + // their connect_seq and retry: this is not a connection race + // we need to resolve here. + if (existing->get_state() == state_t::open || + existing->get_state() == state_t::standby) { + if (policy.resetcheck && existing->connect_seq() == 0) { + return replace_existing(existing, std::move(authorizer_reply)); + } else { + h.reply.connect_seq = existing->connect_seq() + 1; + return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); + } + } else if (peer_addr < messenger.get_myaddr() || + existing->is_server_side()) { + // incoming wins + return replace_existing(existing, std::move(authorizer_reply)); + } else { + return send_connect_reply(CEPH_MSGR_TAG_WAIT); + } + } else if (policy.resetcheck && + existing->connect_seq() == 0) { + return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION); + } else { + return replace_existing(existing, std::move(authorizer_reply)); + } +} + +seastar::future<seastar::stop_iteration> +SocketConnection::replace_existing(SocketConnectionRef existing, + bufferlist&& authorizer_reply, + bool is_reset_from_peer) +{ + msgr_tag_t reply_tag; + if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) && + !is_reset_from_peer) { + reply_tag = CEPH_MSGR_TAG_SEQ; + } else { + reply_tag = CEPH_MSGR_TAG_READY; + } + messenger.unregister_conn(existing); + if (!existing->is_lossy()) { + // reset the in_seq if this is a hard reset from peer, + // otherwise we respect our original connection's value + in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num(); + // steal outgoing queue and out_seq + existing->requeue_sent(); + std::tie(out_seq, out_q) = existing->get_out_queue(); + } + return send_connect_reply_ready(reply_tag, std::move(authorizer_reply)); +} + +seastar::future<seastar::stop_iteration> +SocketConnection::handle_connect_reply(msgr_tag_t tag) +{ + switch (tag) { + case CEPH_MSGR_TAG_FEATURES: + logger().error("{} connect protocol feature mispatch", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_BADPROTOVER: + logger().error("{} connect protocol version mispatch", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_BADAUTHORIZER: + logger().error("{} got bad authorizer", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_RESETSESSION: + reset_session(); + return seastar::make_ready_future<stop_t>(stop_t::no); + case CEPH_MSGR_TAG_RETRY_GLOBAL: + h.global_seq = messenger.get_global_seq(h.reply.global_seq); + return seastar::make_ready_future<stop_t>(stop_t::no); + case CEPH_MSGR_TAG_RETRY_SESSION: + ceph_assert(h.reply.connect_seq > h.connect_seq); + h.connect_seq = h.reply.connect_seq; + return seastar::make_ready_future<stop_t>(stop_t::no); + case CEPH_MSGR_TAG_WAIT: + // TODO: state wait + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_SEQ: + case CEPH_MSGR_TAG_READY: + if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features); + missing) { + logger().error("{} missing required features", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + } + return seastar::futurize_apply([this, tag] { + if (tag == CEPH_MSGR_TAG_SEQ) { + return socket->read_exactly(sizeof(seq_num_t)) + .then([this] (auto buf) { + auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get()); + discard_up_to(&out_q, *acked_seq); + return socket->write_flush(make_static_packet(in_seq)); + }); + } + // tag CEPH_MSGR_TAG_READY + return seastar::now(); + }).then([this] { + // hooray! + h.peer_global_seq = h.reply.global_seq; + policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY; + h.connect_seq++; + h.backoff = 0ms; + set_features(h.reply.features & h.connect.features); + if (h.authorizer) { + session_security.reset( + get_auth_session_handler(nullptr, + h.authorizer->protocol, + h.authorizer->session_key, + features)); + } + h.authorizer.reset(); + return seastar::make_ready_future<stop_t>(stop_t::yes); + }); + break; + default: + // unknown tag + logger().error("{} got unknown tag", __func__, int(tag)); + throw std::system_error(make_error_code(error::negotiation_failure)); + } +} + +void SocketConnection::reset_session() +{ + decltype(out_q){}.swap(out_q); + decltype(sent){}.swap(sent); + in_seq = 0; + h.connect_seq = 0; + if (HAVE_FEATURE(features, MSG_AUTH)) { + // Set out_seq to a random value, so CRC won't be predictable. + // Constant to limit starting sequence number to 2^31. Nothing special + // about it, just a big number. + constexpr uint64_t SEQ_MASK = 0x7fffffff; + out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK); + } else { + // previously, seq #'s always started at 0. + out_seq = 0; + } +} + +seastar::future<seastar::stop_iteration> +SocketConnection::repeat_connect() +{ + // encode ceph_msg_connect + memset(&h.connect, 0, sizeof(h.connect)); + h.connect.features = policy.features_supported; + h.connect.host_type = messenger.get_myname().type(); + h.connect.global_seq = h.global_seq; + h.connect.connect_seq = h.connect_seq; + h.connect.protocol_version = get_proto_version(peer_type, true); + // this is fyi, actually, server decides! + h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0; + + return dispatcher.ms_get_authorizer(peer_type) + .then([this](auto&& auth) { + h.authorizer = std::move(auth); + bufferlist bl; + if (h.authorizer) { + h.connect.authorizer_protocol = h.authorizer->protocol; + h.connect.authorizer_len = h.authorizer->bl.length(); + bl.append(create_static(h.connect)); + bl.append(h.authorizer->bl); + } else { + h.connect.authorizer_protocol = 0; + h.connect.authorizer_len = 0; + bl.append(create_static(h.connect)); + }; + return socket->write_flush(std::move(bl)); + }).then([this] { + // read the reply + return socket->read(sizeof(h.reply)); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + ::decode(h.reply, p); + ceph_assert(p.end()); + return socket->read(h.reply.authorizer_len); + }).then([this] (bufferlist bl) { + if (h.authorizer) { + auto reply = bl.cbegin(); + if (!h.authorizer->verify_reply(reply, nullptr)) { + logger().error("{} authorizer failed to verify reply", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + } + } + return handle_connect_reply(h.reply.tag); + }); +} + +void +SocketConnection::start_connect(const entity_addr_t& _peer_addr, + const entity_type_t& _peer_type) +{ + ceph_assert(state == state_t::none); + ceph_assert(!socket); + peer_addr = _peer_addr; + peer_type = _peer_type; + messenger.register_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + logger().debug("{} trigger connecting, was {}", *this, static_cast<int>(state)); + state = state_t::connecting; + seastar::with_gate(pending_dispatch, [this] { + return seastar::connect(peer_addr.in4_addr()) + .then([this](seastar::connected_socket fd) { + if (state == state_t::closing) { + fd.shutdown_input(); + fd.shutdown_output(); + throw std::system_error(make_error_code(error::connection_aborted)); + } + socket = seastar::make_foreign(std::make_unique<Socket>(std::move(fd))); + // read server's handshake header + return socket->read(server_header_size); + }).then([this] (bufferlist headerbl) { + auto p = headerbl.cbegin(); + validate_banner(p); + entity_addr_t saddr, caddr; + ::decode(saddr, p); + ::decode(caddr, p); + ceph_assert(p.end()); + validate_peer_addr(saddr, peer_addr); + + side = side_t::connector; + socket_port = caddr.get_port(); + return messenger.learned_addr(caddr); + }).then([this] { + // encode/send client's handshake header + bufferlist bl; + bl.append(buffer::create_static(banner_size, banner)); + ::encode(messenger.get_myaddr(), bl, 0); + h.global_seq = messenger.get_global_seq(); + return socket->write_flush(std::move(bl)); + }).then([=] { + return seastar::repeat([this] { + return repeat_connect(); + }); + }).then([this] { + // notify the dispatcher and allow them to reject the connection + return dispatcher.ms_handle_connect(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + }).then([this] { + execute_open(); + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the connecting state + logger().warn("{} connecting fault: {}", *this, eptr); + h.promise.set_value(); + close(); + }); + }); +} + +void +SocketConnection::start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& sock, + const entity_addr_t& _peer_addr) +{ + ceph_assert(state == state_t::none); + ceph_assert(!socket); + peer_addr.u = _peer_addr.u; + peer_addr.set_port(0); + side = side_t::acceptor; + socket_port = _peer_addr.get_port(); + socket = std::move(sock); + messenger.accept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + logger().debug("{} trigger accepting, was {}", *this, static_cast<int>(state)); + state = state_t::accepting; + seastar::with_gate(pending_dispatch, [this, _peer_addr] { + // encode/send server's handshake header + bufferlist bl; + bl.append(buffer::create_static(banner_size, banner)); + ::encode(messenger.get_myaddr(), bl, 0); + ::encode(_peer_addr, bl, 0); + return socket->write_flush(std::move(bl)) + .then([this] { + // read client's handshake header and connect request + return socket->read(client_header_size); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + validate_banner(p); + entity_addr_t addr; + ::decode(addr, p); + ceph_assert(p.end()); + peer_addr.set_type(addr.get_type()); + peer_addr.set_port(addr.get_port()); + peer_addr.set_nonce(addr.get_nonce()); + return seastar::repeat([this] { + return repeat_handle_connect(); + }); + }).then([this] { + // notify the dispatcher and allow them to reject the connection + return dispatcher.ms_handle_accept(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + }).then([this] { + messenger.register_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + execute_open(); + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the accepting state + logger().warn("{} accepting fault: {}", *this, eptr); + h.promise.set_value(); + close(); + }); + }); +} + +void +SocketConnection::execute_open() +{ + logger().debug("{} trigger open, was {}", *this, static_cast<int>(state)); + state = state_t::open; + // satisfy the handshake's promise + h.promise.set_value(); + seastar::with_gate(pending_dispatch, [this] { + // start background processing of tags + return handle_tags() + .handle_exception_type([this] (const std::system_error& e) { + logger().warn("{} open fault: {}", *this, e); + if (e.code() == error::connection_aborted || + e.code() == error::connection_reset) { + return dispatcher.ms_handle_reset(seastar::static_pointer_cast<SocketConnection>(shared_from_this())) + .then([this] { + close(); + }); + } else if (e.code() == error::read_eof) { + return dispatcher.ms_handle_remote_reset(seastar::static_pointer_cast<SocketConnection>(shared_from_this())) + .then([this] { + close(); + }); + } else { + throw e; + } + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the open state + logger().warn("{} open fault: {}", *this, eptr); + close(); + }); + }); +} + +seastar::future<> SocketConnection::fault() +{ + if (policy.lossy) { + messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + } + if (h.backoff.count()) { + h.backoff += h.backoff; + } else { + h.backoff = conf.ms_initial_backoff; + } + if (h.backoff > conf.ms_max_backoff) { + h.backoff = conf.ms_max_backoff; + } + return seastar::sleep(h.backoff); +} + +seastar::shard_id SocketConnection::shard_id() const { + return messenger.shard_id(); +} + +void SocketConnection::print(ostream& out) const { + messenger.print(out); + if (side == side_t::none) { + out << " >> " << peer_addr; + } else if (side == side_t::acceptor) { + out << " >> " << peer_addr + << "@" << socket_port; + } else { // side == side_t::connector + out << "@" << socket_port + << " >> " << peer_addr; + } +} diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h new file mode 100644 index 00000000..62cc77d5 --- /dev/null +++ b/src/crimson/net/SocketConnection.h @@ -0,0 +1,235 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <seastar/core/gate.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/shared_future.hh> +#include <seastar/core/sharded.hh> + +#include "msg/Policy.h" +#include "Connection.h" +#include "Socket.h" +#include "crimson/thread/Throttle.h" + +class AuthAuthorizer; +class AuthSessionHandler; + +namespace ceph::net { + +using stop_t = seastar::stop_iteration; + +class SocketMessenger; +class SocketConnection; +using SocketConnectionRef = seastar::shared_ptr<SocketConnection>; + +class SocketConnection : public Connection { + SocketMessenger& messenger; + seastar::foreign_ptr<std::unique_ptr<Socket>> socket; + Dispatcher& dispatcher; + seastar::gate pending_dispatch; + + // if acceptor side, socket_port is different from peer_addr.get_port(); + // if connector side, socket_port is different from my_addr.get_port(). + enum class side_t { + none, + acceptor, + connector + }; + side_t side = side_t::none; + uint16_t socket_port = 0; + + enum class state_t { + none, + accepting, + connecting, + open, + standby, + wait, + closing + }; + state_t state = state_t::none; + + /// become valid only when state is state_t::closing + seastar::shared_future<> close_ready; + + /// state for handshake + struct Handshake { + ceph_msg_connect connect; + ceph_msg_connect_reply reply; + std::unique_ptr<AuthAuthorizer> authorizer; + std::chrono::milliseconds backoff; + uint32_t connect_seq = 0; + uint32_t peer_global_seq = 0; + uint32_t global_seq; + seastar::promise<> promise; + } h; + + /// server side of handshake negotiation + seastar::future<stop_t> repeat_handle_connect(); + seastar::future<stop_t> handle_connect_with_existing(SocketConnectionRef existing, + bufferlist&& authorizer_reply); + seastar::future<stop_t> replace_existing(SocketConnectionRef existing, + bufferlist&& authorizer_reply, + bool is_reset_from_peer = false); + seastar::future<stop_t> send_connect_reply(ceph::net::msgr_tag_t tag, + bufferlist&& authorizer_reply = {}); + seastar::future<stop_t> send_connect_reply_ready(ceph::net::msgr_tag_t tag, + bufferlist&& authorizer_reply); + + seastar::future<> handle_keepalive2(); + seastar::future<> handle_keepalive2_ack(); + + bool require_auth_feature() const; + uint32_t get_proto_version(entity_type_t peer_type, bool connec) const; + /// client side of handshake negotiation + seastar::future<stop_t> repeat_connect(); + seastar::future<stop_t> handle_connect_reply(ceph::net::msgr_tag_t tag); + void reset_session(); + + /// state for an incoming message + struct MessageReader { + ceph_msg_header header; + ceph_msg_footer footer; + bufferlist front; + bufferlist middle; + bufferlist data; + } m; + + seastar::future<> maybe_throttle(); + seastar::future<> handle_tags(); + seastar::future<> handle_ack(); + + /// becomes available when handshake completes, and when all previous messages + /// have been sent to the output stream. send() chains new messages as + /// continuations to this future to act as a queue + seastar::future<> send_ready; + + /// encode/write a message + seastar::future<> write_message(MessageRef msg); + + ceph::net::Policy<ceph::thread::Throttle> policy; + uint64_t features; + void set_features(uint64_t new_features) { + features = new_features; + } + + /// the seq num of the last transmitted message + seq_num_t out_seq = 0; + /// the seq num of the last received message + seq_num_t in_seq = 0; + /// update the seq num of last received message + /// @returns true if the @c seq is valid, and @c in_seq is updated, + /// false otherwise. + bool update_rx_seq(seq_num_t seq); + + seastar::future<> read_message(); + + std::unique_ptr<AuthSessionHandler> session_security; + + // messages to be resent after connection gets reset + std::queue<MessageRef> out_q; + // messages sent, but not yet acked by peer + std::queue<MessageRef> sent; + static void discard_up_to(std::queue<MessageRef>*, seq_num_t); + + struct Keepalive { + struct { + const char tag = CEPH_MSGR_TAG_KEEPALIVE2; + ceph_timespec stamp; + } __attribute__((packed)) req; + struct { + const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK; + ceph_timespec stamp; + } __attribute__((packed)) ack; + ceph_timespec ack_stamp; + } k; + + seastar::future<> fault(); + + void execute_open(); + + seastar::future<> do_send(MessageRef msg); + seastar::future<> do_keepalive(); + seastar::future<> do_close(); + + public: + SocketConnection(SocketMessenger& messenger, + Dispatcher& dispatcher); + ~SocketConnection(); + + Messenger* get_messenger() const override; + + int get_peer_type() const override { + return peer_type; + } + + seastar::future<bool> is_connected() override; + + seastar::future<> send(MessageRef msg) override; + + seastar::future<> keepalive() override; + + seastar::future<> close() override; + + seastar::shard_id shard_id() const override; + + void print(ostream& out) const override; + + public: + /// start a handshake from the client's perspective, + /// only call when SocketConnection first construct + void start_connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type); + /// start a handshake from the server's perspective, + /// only call when SocketConnection first construct + void start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& socket, + const entity_addr_t& peer_addr); + + /// the number of connections initiated in this session, increment when a + /// new connection is established + uint32_t connect_seq() const { + return h.connect_seq; + } + + /// the client side should connect us with a gseq. it will be reset with + /// the one of exsting connection if it's greater. + uint32_t peer_global_seq() const { + return h.peer_global_seq; + } + seq_num_t rx_seq_num() const { + return in_seq; + } + + /// current state of connection + state_t get_state() const { + return state; + } + bool is_server_side() const { + return policy.server; + } + bool is_lossy() const { + return policy.lossy; + } + + /// move all messages in the sent list back into the queue + void requeue_sent(); + + std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() { + return {out_seq, std::move(out_q)}; + } +}; + +} // namespace ceph::net diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc new file mode 100644 index 00000000..46a38ff7 --- /dev/null +++ b/src/crimson/net/SocketMessenger.cc @@ -0,0 +1,283 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "SocketMessenger.h" + +#include <tuple> +#include <boost/functional/hash.hpp> + +#include "auth/Auth.h" +#include "Errors.h" +#include "Dispatcher.h" +#include "Socket.h" + +using namespace ceph::net; + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_ms); + } +} + +SocketMessenger::SocketMessenger(const entity_name_t& myname, + const std::string& logic_name, + uint32_t nonce, + int master_sid) + : Messenger{myname}, + master_sid{master_sid}, + sid{seastar::engine().cpu_id()}, + logic_name{logic_name}, + nonce{nonce} +{} + +seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs) +{ + auto my_addrs = addrs; + for (auto& addr : my_addrs.v) { + addr.nonce = nonce; + } + return container().invoke_on_all([my_addrs](auto& msgr) { + return msgr.Messenger::set_myaddrs(my_addrs); + }); +} + +seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs) +{ + ceph_assert(addrs.legacy_addr().get_family() == AF_INET); + auto my_addrs = addrs; + for (auto& addr : my_addrs.v) { + addr.nonce = nonce; + } + logger().info("listening on {}", my_addrs.legacy_addr().in4_addr()); + return container().invoke_on_all([my_addrs](auto& msgr) { + msgr.do_bind(my_addrs); + }); +} + +seastar::future<> +SocketMessenger::try_bind(const entity_addrvec_t& addrs, + uint32_t min_port, uint32_t max_port) +{ + auto addr = addrs.legacy_or_front_addr(); + if (addr.get_port() != 0) { + return bind(addrs); + } + ceph_assert(min_port <= max_port); + return seastar::do_with(uint32_t(min_port), + [this, max_port, addr] (auto& port) { + return seastar::repeat([this, max_port, addr, &port] { + auto to_bind = addr; + to_bind.set_port(port); + return bind(entity_addrvec_t{to_bind}) + .then([this] { + logger().info("{}: try_bind: done", *this); + return stop_t::yes; + }).handle_exception_type([this, max_port, &port] (const std::system_error& e) { + logger().debug("{}: try_bind: {} already used", *this, port); + if (port == max_port) { + throw e; + } + ++port; + return stop_t::no; + }); + }); + }); +} + +seastar::future<> SocketMessenger::start(Dispatcher *disp) { + return container().invoke_on_all([disp](auto& msgr) { + return msgr.do_start(disp->get_local_shard()); + }); +} + +seastar::future<ceph::net::ConnectionXRef> +SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) +{ + auto shard = locate_shard(peer_addr); + return container().invoke_on(shard, [peer_addr, peer_type](auto& msgr) { + return msgr.do_connect(peer_addr, peer_type); + }).then([](seastar::foreign_ptr<ConnectionRef>&& conn) { + return seastar::make_lw_shared<seastar::foreign_ptr<ConnectionRef>>(std::move(conn)); + }); +} + +seastar::future<> SocketMessenger::shutdown() +{ + return container().invoke_on_all([](auto& msgr) { + return msgr.do_shutdown(); + }).finally([this] { + return container().invoke_on_all([](auto& msgr) { + msgr.shutdown_promise.set_value(); + }); + }); +} + +void SocketMessenger::do_bind(const entity_addrvec_t& addrs) +{ + Messenger::set_myaddrs(addrs); + + // TODO: v2: listen on multiple addresses + seastar::socket_address address(addrs.legacy_addr().in4_addr()); + seastar::listen_options lo; + lo.reuse_address = true; + listener = seastar::listen(address, lo); +} + +seastar::future<> SocketMessenger::do_start(Dispatcher *disp) +{ + dispatcher = disp; + + // start listening if bind() was called + if (listener) { + seastar::keep_doing([this] { + return listener->accept() + .then([this] (seastar::connected_socket socket, + seastar::socket_address paddr) { + // allocate the connection + entity_addr_t peer_addr; + peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); + auto shard = locate_shard(peer_addr); +#warning fixme + // we currently do dangerous i/o from a Connection core, different from the Socket core. + auto sock = seastar::make_foreign(std::make_unique<Socket>(std::move(socket))); + // don't wait before accepting another + container().invoke_on(shard, [sock = std::move(sock), peer_addr, this](auto& msgr) mutable { + SocketConnectionRef conn = seastar::make_shared<SocketConnection>(msgr, *msgr.dispatcher); + conn->start_accept(std::move(sock), peer_addr); + }); + }); + }).handle_exception_type([this] (const std::system_error& e) { + // stop gracefully on connection_aborted + if (e.code() != error::connection_aborted) { + logger().error("{} unexpected error during accept: {}", *this, e); + } + }); + } + + return seastar::now(); +} + +seastar::foreign_ptr<ceph::net::ConnectionRef> +SocketMessenger::do_connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) +{ + if (auto found = lookup_conn(peer_addr); found) { + return seastar::make_foreign(found->shared_from_this()); + } + SocketConnectionRef conn = seastar::make_shared<SocketConnection>(*this, *dispatcher); + conn->start_connect(peer_addr, peer_type); + return seastar::make_foreign(conn->shared_from_this()); +} + +seastar::future<> SocketMessenger::do_shutdown() +{ + if (listener) { + listener->abort_accept(); + } + // close all connections + return seastar::parallel_for_each(accepting_conns, [] (auto conn) { + return conn->close(); + }).then([this] { + ceph_assert(accepting_conns.empty()); + return seastar::parallel_for_each(connections, [] (auto conn) { + return conn.second->close(); + }); + }).finally([this] { + ceph_assert(connections.empty()); + }); +} + +seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) +{ + if (!get_myaddr().is_blank_ip()) { + // already learned or binded + return seastar::now(); + } + + // Only learn IP address if blank. + entity_addr_t addr = get_myaddr(); + addr.u = peer_addr_for_me.u; + addr.set_type(peer_addr_for_me.get_type()); + addr.set_port(get_myaddr().get_port()); + return set_myaddrs(entity_addrvec_t{addr}); +} + +void SocketMessenger::set_default_policy(const SocketPolicy& p) +{ + policy_set.set_default(p); +} + +void SocketMessenger::set_policy(entity_type_t peer_type, + const SocketPolicy& p) +{ + policy_set.set(peer_type, p); +} + +void SocketMessenger::set_policy_throttler(entity_type_t peer_type, + Throttle* throttle) +{ + // only byte throttler is used in OSD + policy_set.set_throttlers(peer_type, throttle, nullptr); +} + +seastar::shard_id SocketMessenger::locate_shard(const entity_addr_t& addr) +{ + ceph_assert(addr.get_family() == AF_INET); + if (master_sid >= 0) { + return master_sid; + } + std::size_t seed = 0; + boost::hash_combine(seed, addr.u.sin.sin_addr.s_addr); + //boost::hash_combine(seed, addr.u.sin.sin_port); + //boost::hash_combine(seed, addr.nonce); + return seed % seastar::smp::count; +} + +ceph::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) +{ + if (auto found = connections.find(addr); + found != connections.end()) { + return found->second; + } else { + return nullptr; + } +} + +void SocketMessenger::accept_conn(SocketConnectionRef conn) +{ + accepting_conns.insert(conn); +} + +void SocketMessenger::unaccept_conn(SocketConnectionRef conn) +{ + accepting_conns.erase(conn); +} + +void SocketMessenger::register_conn(SocketConnectionRef conn) +{ + if (master_sid >= 0) { + ceph_assert(static_cast<int>(sid) == master_sid); + } + auto [i, added] = connections.emplace(conn->get_peer_addr(), conn); + std::ignore = i; + ceph_assert(added); +} + +void SocketMessenger::unregister_conn(SocketConnectionRef conn) +{ + ceph_assert(conn); + auto found = connections.find(conn->get_peer_addr()); + ceph_assert(found != connections.end()); + ceph_assert(found->second == conn); + connections.erase(found); +} diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h new file mode 100644 index 00000000..535dea3a --- /dev/null +++ b/src/crimson/net/SocketMessenger.h @@ -0,0 +1,119 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <map> +#include <optional> +#include <set> +#include <seastar/core/gate.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/sharded.hh> + +#include "Messenger.h" +#include "SocketConnection.h" + +namespace ceph::net { + +class SocketMessenger final : public Messenger, public seastar::peering_sharded_service<SocketMessenger> { + const int master_sid; + const seastar::shard_id sid; + seastar::promise<> shutdown_promise; + + std::optional<seastar::server_socket> listener; + Dispatcher *dispatcher = nullptr; + std::map<entity_addr_t, SocketConnectionRef> connections; + std::set<SocketConnectionRef> accepting_conns; + ceph::net::PolicySet<Throttle> policy_set; + // Distinguish messengers with meaningful names for debugging + const std::string logic_name; + const uint32_t nonce; + + seastar::future<> accept(seastar::connected_socket socket, + seastar::socket_address paddr); + + void do_bind(const entity_addrvec_t& addr); + seastar::future<> do_start(Dispatcher *disp); + seastar::foreign_ptr<ConnectionRef> do_connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type); + seastar::future<> do_shutdown(); + // conn sharding options: + // 0. Compatible (master_sid >= 0): place all connections to one master shard + // 1. Simplest (master_sid < 0): sharded by ip only + // 2. Balanced (not implemented): sharded by ip + port + nonce, + // but, need to move SocketConnection between cores. + seastar::shard_id locate_shard(const entity_addr_t& addr); + + public: + SocketMessenger(const entity_name_t& myname, + const std::string& logic_name, + uint32_t nonce, + int master_sid); + + seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override; + + // Messenger interfaces are assumed to be called from its own shard, but its + // behavior should be symmetric when called from any shard. + seastar::future<> bind(const entity_addrvec_t& addr) override; + + seastar::future<> try_bind(const entity_addrvec_t& addr, + uint32_t min_port, uint32_t max_port) override; + + seastar::future<> start(Dispatcher *dispatcher) override; + + seastar::future<ConnectionXRef> connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type) override; + // can only wait once + seastar::future<> wait() override { + return shutdown_promise.get_future(); + } + + seastar::future<> shutdown() override; + + Messenger* get_local_shard() override { + return &container().local(); + } + + void print(ostream& out) const override { + out << get_myname() + << "(" << logic_name + << ") " << get_myaddr(); + } + + void set_default_policy(const SocketPolicy& p) override; + + void set_policy(entity_type_t peer_type, const SocketPolicy& p) override; + + void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override; + + public: + seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me); + + SocketConnectionRef lookup_conn(const entity_addr_t& addr); + void accept_conn(SocketConnectionRef); + void unaccept_conn(SocketConnectionRef); + void register_conn(SocketConnectionRef); + void unregister_conn(SocketConnectionRef); + + // required by sharded<> + seastar::future<> stop() { + return seastar::make_ready_future<>(); + } + + seastar::shard_id shard_id() const { + return sid; + } +}; + +} // namespace ceph::net |