summaryrefslogtreecommitdiffstats
path: root/src/crimson/net
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/net')
-rw-r--r--src/crimson/net/Config.h26
-rw-r--r--src/crimson/net/Connection.h66
-rw-r--r--src/crimson/net/Dispatcher.cc11
-rw-r--r--src/crimson/net/Dispatcher.h65
-rw-r--r--src/crimson/net/Errors.cc107
-rw-r--r--src/crimson/net/Errors.h54
-rw-r--r--src/crimson/net/Fwd.h52
-rw-r--r--src/crimson/net/Messenger.cc21
-rw-r--r--src/crimson/net/Messenger.h117
-rw-r--r--src/crimson/net/Socket.cc81
-rw-r--r--src/crimson/net/Socket.h59
-rw-r--r--src/crimson/net/SocketConnection.cc972
-rw-r--r--src/crimson/net/SocketConnection.h235
-rw-r--r--src/crimson/net/SocketMessenger.cc283
-rw-r--r--src/crimson/net/SocketMessenger.h119
15 files changed, 2268 insertions, 0 deletions
diff --git a/src/crimson/net/Config.h b/src/crimson/net/Config.h
new file mode 100644
index 00000000..90929bde
--- /dev/null
+++ b/src/crimson/net/Config.h
@@ -0,0 +1,26 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+// XXX: a poor man's md_config_t
+#pragma once
+
+#include "include/msgr.h"
+#include <chrono>
+
+namespace ceph::net {
+
+using namespace std::literals::chrono_literals;
+
+constexpr struct simple_md_config_t {
+ uint32_t host_type = CEPH_ENTITY_TYPE_OSD;
+ bool cephx_require_signatures = false;
+ bool cephx_cluster_require_signatures = false;
+ bool cephx_service_require_signatures = false;
+ bool ms_die_on_old_message = true;
+ bool ms_die_on_skipped_message = true;
+ std::chrono::milliseconds ms_initial_backoff = 200ms;
+ std::chrono::milliseconds ms_max_backoff = 15000ms;
+ std::chrono::milliseconds threadpool_empty_queue_max_wait = 100ms;
+ size_t osd_client_message_size_cap = 500ULL << 20;
+} conf;
+}
diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h
new file mode 100644
index 00000000..b1b72c74
--- /dev/null
+++ b/src/crimson/net/Connection.h
@@ -0,0 +1,66 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <queue>
+#include <seastar/core/future.hh>
+#include <seastar/core/shared_ptr.hh>
+
+#include "Fwd.h"
+
+namespace ceph::net {
+
+using seq_num_t = uint64_t;
+
+class Connection : public seastar::enable_shared_from_this<Connection> {
+ protected:
+ entity_addr_t peer_addr;
+ peer_type_t peer_type = -1;
+
+ public:
+ Connection() {}
+ virtual ~Connection() {}
+
+ virtual Messenger* get_messenger() const = 0;
+ const entity_addr_t& get_peer_addr() const { return peer_addr; }
+ virtual int get_peer_type() const = 0;
+
+ /// true if the handshake has completed and no errors have been encountered
+ virtual seastar::future<bool> is_connected() = 0;
+
+ /// send a message over a connection that has completed its handshake
+ virtual seastar::future<> send(MessageRef msg) = 0;
+
+ /// send a keepalive message over a connection that has completed its
+ /// handshake
+ virtual seastar::future<> keepalive() = 0;
+
+ /// close the connection and cancel any any pending futures from read/send
+ virtual seastar::future<> close() = 0;
+
+ /// which shard id the connection lives
+ virtual seastar::shard_id shard_id() const = 0;
+
+ virtual void print(ostream& out) const = 0;
+};
+
+inline ostream& operator<<(ostream& out, const Connection& conn) {
+ out << "[";
+ conn.print(out);
+ out << "]";
+ return out;
+}
+
+} // namespace ceph::net
diff --git a/src/crimson/net/Dispatcher.cc b/src/crimson/net/Dispatcher.cc
new file mode 100644
index 00000000..336ded38
--- /dev/null
+++ b/src/crimson/net/Dispatcher.cc
@@ -0,0 +1,11 @@
+#include "auth/Auth.h"
+#include "Dispatcher.h"
+
+namespace ceph::net
+{
+seastar::future<std::unique_ptr<AuthAuthorizer>>
+Dispatcher::ms_get_authorizer(peer_type_t)
+{
+ return seastar::make_ready_future<std::unique_ptr<AuthAuthorizer>>(nullptr);
+}
+}
diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h
new file mode 100644
index 00000000..cbde1549
--- /dev/null
+++ b/src/crimson/net/Dispatcher.h
@@ -0,0 +1,65 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <seastar/core/future.hh>
+#include <seastar/core/sharded.hh>
+
+#include "Fwd.h"
+
+class AuthAuthorizer;
+
+namespace ceph::net {
+
+class Dispatcher {
+ public:
+ virtual ~Dispatcher() {}
+
+ virtual seastar::future<> ms_dispatch(ConnectionRef conn, MessageRef m) {
+ return seastar::make_ready_future<>();
+ }
+
+ virtual seastar::future<> ms_handle_accept(ConnectionRef conn) {
+ return seastar::make_ready_future<>();
+ }
+
+ virtual seastar::future<> ms_handle_connect(ConnectionRef conn) {
+ return seastar::make_ready_future<>();
+ }
+
+ virtual seastar::future<> ms_handle_reset(ConnectionRef conn) {
+ return seastar::make_ready_future<>();
+ }
+
+ virtual seastar::future<> ms_handle_remote_reset(ConnectionRef conn) {
+ return seastar::make_ready_future<>();
+ }
+
+ virtual seastar::future<msgr_tag_t, bufferlist>
+ ms_verify_authorizer(peer_type_t,
+ auth_proto_t,
+ bufferlist&) {
+ return seastar::make_ready_future<msgr_tag_t, bufferlist>(0, bufferlist{});
+ }
+ virtual seastar::future<std::unique_ptr<AuthAuthorizer>>
+ ms_get_authorizer(peer_type_t);
+
+ // get the local dispatcher shard if it is accessed by another core
+ virtual Dispatcher* get_local_shard() {
+ return this;
+ }
+};
+
+} // namespace ceph::net
diff --git a/src/crimson/net/Errors.cc b/src/crimson/net/Errors.cc
new file mode 100644
index 00000000..62d60ce1
--- /dev/null
+++ b/src/crimson/net/Errors.cc
@@ -0,0 +1,107 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "Errors.h"
+
+namespace ceph::net {
+
+const std::error_category& net_category()
+{
+ struct category : public std::error_category {
+ const char* name() const noexcept override {
+ return "ceph::net";
+ }
+
+ std::string message(int ev) const override {
+ switch (static_cast<error>(ev)) {
+ case error::success:
+ return "success";
+ case error::bad_connect_banner:
+ return "bad connect banner";
+ case error::bad_peer_address:
+ return "bad peer address";
+ case error::negotiation_failure:
+ return "negotiation failure";
+ case error::read_eof:
+ return "read eof";
+ case error::connection_aborted:
+ return "connection aborted";
+ case error::connection_refused:
+ return "connection refused";
+ case error::connection_reset:
+ return "connection reset";
+ default:
+ return "unknown";
+ }
+ }
+
+ // unfortunately, seastar throws connection errors with the system category,
+ // rather than the generic category that would match their counterparts
+ // in std::errc. we add our own errors for them, so we can match either
+ std::error_condition default_error_condition(int ev) const noexcept override {
+ switch (static_cast<error>(ev)) {
+ case error::connection_aborted:
+ return std::errc::connection_aborted;
+ case error::connection_refused:
+ return std::errc::connection_refused;
+ case error::connection_reset:
+ return std::errc::connection_reset;
+ default:
+ return std::error_condition(ev, *this);
+ }
+ }
+
+ bool equivalent(int code, const std::error_condition& cond) const noexcept override {
+ if (error_category::equivalent(code, cond)) {
+ return true;
+ }
+ switch (static_cast<error>(code)) {
+ case error::connection_aborted:
+ return cond == std::errc::connection_aborted
+ || cond == std::error_condition(ECONNABORTED, std::system_category());
+ case error::connection_refused:
+ return cond == std::errc::connection_refused
+ || cond == std::error_condition(ECONNREFUSED, std::system_category());
+ case error::connection_reset:
+ return cond == std::errc::connection_reset
+ || cond == std::error_condition(ECONNRESET, std::system_category());
+ default:
+ return false;
+ }
+ }
+
+ bool equivalent(const std::error_code& code, int cond) const noexcept override {
+ if (error_category::equivalent(code, cond)) {
+ return true;
+ }
+ switch (static_cast<error>(cond)) {
+ case error::connection_aborted:
+ return code == std::errc::connection_aborted
+ || code == std::error_code(ECONNABORTED, std::system_category());
+ case error::connection_refused:
+ return code == std::errc::connection_refused
+ || code == std::error_code(ECONNREFUSED, std::system_category());
+ case error::connection_reset:
+ return code == std::errc::connection_reset
+ || code == std::error_code(ECONNRESET, std::system_category());
+ default:
+ return false;
+ }
+ }
+ };
+ static category instance;
+ return instance;
+}
+
+} // namespace ceph::net
diff --git a/src/crimson/net/Errors.h b/src/crimson/net/Errors.h
new file mode 100644
index 00000000..d75082fd
--- /dev/null
+++ b/src/crimson/net/Errors.h
@@ -0,0 +1,54 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <system_error>
+
+namespace ceph::net {
+
+/// net error codes
+enum class error {
+ success = 0,
+ bad_connect_banner,
+ bad_peer_address,
+ negotiation_failure,
+ read_eof,
+ connection_aborted,
+ connection_refused,
+ connection_reset,
+};
+
+/// net error category
+const std::error_category& net_category();
+
+inline std::error_code make_error_code(error e)
+{
+ return {static_cast<int>(e), net_category()};
+}
+
+inline std::error_condition make_error_condition(error e)
+{
+ return {static_cast<int>(e), net_category()};
+}
+
+} // namespace ceph::net
+
+namespace std {
+
+/// enables implicit conversion to std::error_condition
+template <>
+struct is_error_condition_enum<ceph::net::error> : public true_type {};
+
+} // namespace std
diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h
new file mode 100644
index 00000000..8a0a1c96
--- /dev/null
+++ b/src/crimson/net/Fwd.h
@@ -0,0 +1,52 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/sharded.hh>
+
+#include "msg/msg_types.h"
+#include "msg/Message.h"
+
+using peer_type_t = int;
+using auth_proto_t = int;
+
+namespace ceph::net {
+
+using msgr_tag_t = uint8_t;
+
+class Connection;
+using ConnectionRef = seastar::shared_ptr<Connection>;
+// NOTE: ConnectionXRef should only be used in seastar world, because
+// lw_shared_ptr<> is not safe to be accessed by unpinned alien threads.
+using ConnectionXRef = seastar::lw_shared_ptr<seastar::foreign_ptr<ConnectionRef>>;
+
+class Dispatcher;
+
+class Messenger;
+
+template <typename T, typename... Args>
+seastar::future<T*> create_sharded(Args... args) {
+ auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
+ return sharded_obj->start(args...).then([sharded_obj]() {
+ auto ret = &sharded_obj->local();
+ seastar::engine().at_exit([sharded_obj]() {
+ return sharded_obj->stop().finally([sharded_obj] {});
+ });
+ return ret;
+ });
+}
+
+} // namespace ceph::net
diff --git a/src/crimson/net/Messenger.cc b/src/crimson/net/Messenger.cc
new file mode 100644
index 00000000..7f8665ef
--- /dev/null
+++ b/src/crimson/net/Messenger.cc
@@ -0,0 +1,21 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Messenger.h"
+#include "SocketMessenger.h"
+
+namespace ceph::net {
+
+seastar::future<Messenger*>
+Messenger::create(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce,
+ const int master_sid)
+{
+ return create_sharded<SocketMessenger>(name, lname, nonce, master_sid)
+ .then([](Messenger *msgr) {
+ return msgr;
+ });
+}
+
+} // namespace ceph::net
diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h
new file mode 100644
index 00000000..9d766cb0
--- /dev/null
+++ b/src/crimson/net/Messenger.h
@@ -0,0 +1,117 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <seastar/core/future.hh>
+
+#include "Fwd.h"
+#include "crimson/thread/Throttle.h"
+#include "msg/Policy.h"
+
+class AuthAuthorizer;
+
+namespace ceph::net {
+
+using Throttle = ceph::thread::Throttle;
+using SocketPolicy = ceph::net::Policy<Throttle>;
+
+class Messenger {
+ entity_name_t my_name;
+ entity_addrvec_t my_addrs;
+ uint32_t global_seq = 0;
+ uint32_t crc_flags = 0;
+
+ public:
+ Messenger(const entity_name_t& name)
+ : my_name(name)
+ {}
+ virtual ~Messenger() {}
+
+ const entity_name_t& get_myname() const { return my_name; }
+ const entity_addrvec_t& get_myaddrs() const { return my_addrs; }
+ entity_addr_t get_myaddr() const { return my_addrs.front(); }
+ virtual seastar::future<> set_myaddrs(const entity_addrvec_t& addrs) {
+ my_addrs = addrs;
+ return seastar::now();
+ }
+
+ /// bind to the given address
+ virtual seastar::future<> bind(const entity_addrvec_t& addr) = 0;
+
+ /// try to bind to the first unused port of given address
+ virtual seastar::future<> try_bind(const entity_addrvec_t& addr,
+ uint32_t min_port, uint32_t max_port) = 0;
+
+ /// start the messenger
+ virtual seastar::future<> start(Dispatcher *dispatcher) = 0;
+
+ /// either return an existing connection to the peer,
+ /// or a new pending connection
+ virtual seastar::future<ConnectionXRef>
+ connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type) = 0;
+
+ // wait for messenger shutdown
+ virtual seastar::future<> wait() = 0;
+
+ /// stop listenening and wait for all connections to close. safe to destruct
+ /// after this future becomes available
+ virtual seastar::future<> shutdown() = 0;
+
+ uint32_t get_global_seq(uint32_t old=0) {
+ if (old > global_seq) {
+ global_seq = old;
+ }
+ return ++global_seq;
+ }
+
+ uint32_t get_crc_flags() const {
+ return crc_flags;
+ }
+ void set_crc_data() {
+ crc_flags |= MSG_CRC_DATA;
+ }
+ void set_crc_header() {
+ crc_flags |= MSG_CRC_HEADER;
+ }
+
+ // get the local messenger shard if it is accessed by another core
+ virtual Messenger* get_local_shard() {
+ return this;
+ }
+
+ virtual void print(ostream& out) const = 0;
+
+ virtual void set_default_policy(const SocketPolicy& p) = 0;
+
+ virtual void set_policy(entity_type_t peer_type, const SocketPolicy& p) = 0;
+
+ virtual void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) = 0;
+
+ static seastar::future<Messenger*>
+ create(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce,
+ const int master_sid=-1);
+};
+
+inline ostream& operator<<(ostream& out, const Messenger& msgr) {
+ out << "[";
+ msgr.print(out);
+ out << "]";
+ return out;
+}
+
+} // namespace ceph::net
diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc
new file mode 100644
index 00000000..a22e9b2e
--- /dev/null
+++ b/src/crimson/net/Socket.cc
@@ -0,0 +1,81 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Socket.h"
+
+#include "Errors.h"
+
+namespace ceph::net {
+
+namespace {
+
+// an input_stream consumer that reads buffer segments into a bufferlist up to
+// the given number of remaining bytes
+struct bufferlist_consumer {
+ bufferlist& bl;
+ size_t& remaining;
+
+ bufferlist_consumer(bufferlist& bl, size_t& remaining)
+ : bl(bl), remaining(remaining) {}
+
+ using tmp_buf = seastar::temporary_buffer<char>;
+ using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type;
+
+ // consume some or all of a buffer segment
+ seastar::future<consumption_result_type> operator()(tmp_buf&& data) {
+ if (remaining >= data.size()) {
+ // consume the whole buffer
+ remaining -= data.size();
+ bl.append(buffer::create_foreign(std::move(data)));
+ if (remaining > 0) {
+ // return none to request more segments
+ return seastar::make_ready_future<consumption_result_type>(
+ seastar::continue_consuming{});
+ } else {
+ // return an empty buffer to singal that we're done
+ return seastar::make_ready_future<consumption_result_type>(
+ consumption_result_type::stop_consuming_type({}));
+ }
+ }
+ if (remaining > 0) {
+ // consume the front
+ bl.append(buffer::create_foreign(data.share(0, remaining)));
+ data.trim_front(remaining);
+ remaining = 0;
+ }
+ // give the rest back to signal that we're done
+ return seastar::make_ready_future<consumption_result_type>(
+ consumption_result_type::stop_consuming_type{std::move(data)});
+ };
+};
+
+} // anonymous namespace
+
+seastar::future<bufferlist> Socket::read(size_t bytes)
+{
+ if (bytes == 0) {
+ return seastar::make_ready_future<bufferlist>();
+ }
+ r.buffer.clear();
+ r.remaining = bytes;
+ return in.consume(bufferlist_consumer{r.buffer, r.remaining})
+ .then([this] {
+ if (r.remaining) { // throw on short reads
+ throw std::system_error(make_error_code(error::read_eof));
+ }
+ return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
+ });
+}
+
+seastar::future<seastar::temporary_buffer<char>>
+Socket::read_exactly(size_t bytes) {
+ return in.read_exactly(bytes)
+ .then([this](auto buf) {
+ if (buf.empty()) {
+ throw std::system_error(make_error_code(error::read_eof));
+ }
+ return seastar::make_ready_future<tmp_buf>(std::move(buf));
+ });
+}
+
+} // namespace ceph::net
diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h
new file mode 100644
index 00000000..c1a2ed59
--- /dev/null
+++ b/src/crimson/net/Socket.h
@@ -0,0 +1,59 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/reactor.hh>
+#include <seastar/net/packet.hh>
+
+#include "include/buffer.h"
+
+namespace ceph::net {
+
+class Socket
+{
+ const seastar::shard_id sid;
+ seastar::connected_socket socket;
+ seastar::input_stream<char> in;
+ seastar::output_stream<char> out;
+
+ /// buffer state for read()
+ struct {
+ bufferlist buffer;
+ size_t remaining;
+ } r;
+
+ public:
+ explicit Socket(seastar::connected_socket&& _socket)
+ : sid{seastar::engine().cpu_id()},
+ socket(std::move(_socket)),
+ in(socket.input()),
+ out(socket.output()) {}
+ Socket(Socket&& o) = delete;
+
+ /// read the requested number of bytes into a bufferlist
+ seastar::future<bufferlist> read(size_t bytes);
+ using tmp_buf = seastar::temporary_buffer<char>;
+ using packet = seastar::net::packet;
+ seastar::future<tmp_buf> read_exactly(size_t bytes);
+
+ seastar::future<> write(packet&& buf) {
+ return out.write(std::move(buf));
+ }
+ seastar::future<> flush() {
+ return out.flush();
+ }
+ seastar::future<> write_flush(packet&& buf) {
+ return out.write(std::move(buf)).then([this] { return out.flush(); });
+ }
+
+ /// Socket can only be closed once.
+ seastar::future<> close() {
+ return seastar::smp::submit_to(sid, [this] {
+ return seastar::when_all(
+ in.close(), out.close()).discard_result();
+ });
+ }
+};
+
+} // namespace ceph::net
diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc
new file mode 100644
index 00000000..2907c486
--- /dev/null
+++ b/src/crimson/net/SocketConnection.cc
@@ -0,0 +1,972 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "SocketConnection.h"
+
+#include <algorithm>
+#include <seastar/core/shared_future.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/net/packet.hh>
+
+#include "include/msgr.h"
+#include "include/random.h"
+#include "auth/Auth.h"
+#include "auth/AuthSessionHandler.h"
+
+#include "crimson/common/log.h"
+#include "Config.h"
+#include "Dispatcher.h"
+#include "Errors.h"
+#include "SocketMessenger.h"
+
+using namespace ceph::net;
+
+template <typename T>
+seastar::net::packet make_static_packet(const T& value) {
+ return { reinterpret_cast<const char*>(&value), sizeof(value) };
+}
+
+namespace {
+ seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_ms);
+ }
+}
+
+SocketConnection::SocketConnection(SocketMessenger& messenger,
+ Dispatcher& dispatcher)
+ : messenger(messenger),
+ dispatcher(dispatcher),
+ send_ready(h.promise.get_future())
+{
+ ceph_assert(&messenger.container().local() == &messenger);
+}
+
+SocketConnection::~SocketConnection()
+{
+ ceph_assert(pending_dispatch.is_closed());
+}
+
+ceph::net::Messenger*
+SocketConnection::get_messenger() const {
+ return &messenger;
+}
+
+seastar::future<bool> SocketConnection::is_connected()
+{
+ return seastar::smp::submit_to(shard_id(), [this] {
+ return !send_ready.failed();
+ });
+}
+
+seastar::future<> SocketConnection::send(MessageRef msg)
+{
+ return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] {
+ if (state == state_t::closing)
+ return seastar::now();
+ return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] {
+ return do_send(std::move(msg))
+ .handle_exception([this] (std::exception_ptr eptr) {
+ logger().warn("{} send fault: {}", *this, eptr);
+ close();
+ });
+ });
+ });
+}
+
+seastar::future<> SocketConnection::keepalive()
+{
+ return seastar::smp::submit_to(shard_id(), [this] {
+ if (state == state_t::closing)
+ return seastar::now();
+ return seastar::with_gate(pending_dispatch, [this] {
+ return do_keepalive()
+ .handle_exception([this] (std::exception_ptr eptr) {
+ logger().warn("{} keepalive fault: {}", *this, eptr);
+ close();
+ });
+ });
+ });
+}
+
+seastar::future<> SocketConnection::close()
+{
+ return seastar::smp::submit_to(shard_id(), [this] {
+ return do_close();
+ });
+}
+
+seastar::future<> SocketConnection::handle_tags()
+{
+ return seastar::keep_doing([this] {
+ // read the next tag
+ return socket->read_exactly(1)
+ .then([this] (auto buf) {
+ switch (buf[0]) {
+ case CEPH_MSGR_TAG_MSG:
+ return read_message();
+ case CEPH_MSGR_TAG_ACK:
+ return handle_ack();
+ case CEPH_MSGR_TAG_KEEPALIVE:
+ return seastar::now();
+ case CEPH_MSGR_TAG_KEEPALIVE2:
+ return handle_keepalive2();
+ case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
+ return handle_keepalive2_ack();
+ case CEPH_MSGR_TAG_CLOSE:
+ logger().info("{} got tag close", *this);
+ throw std::system_error(make_error_code(error::connection_aborted));
+ default:
+ logger().error("{} got unknown msgr tag {}", *this, static_cast<int>(buf[0]));
+ throw std::system_error(make_error_code(error::read_eof));
+ }
+ });
+ });
+}
+
+seastar::future<> SocketConnection::handle_ack()
+{
+ return socket->read_exactly(sizeof(ceph_le64))
+ .then([this] (auto buf) {
+ auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
+ discard_up_to(&sent, *seq);
+ });
+}
+
+void SocketConnection::discard_up_to(std::queue<MessageRef>* queue,
+ seq_num_t seq)
+{
+ while (!queue->empty() &&
+ queue->front()->get_seq() < seq) {
+ queue->pop();
+ }
+}
+
+void SocketConnection::requeue_sent()
+{
+ out_seq -= sent.size();
+ while (!sent.empty()) {
+ auto m = sent.front();
+ sent.pop();
+ out_q.push(std::move(m));
+ }
+}
+
+seastar::future<> SocketConnection::maybe_throttle()
+{
+ if (!policy.throttler_bytes) {
+ return seastar::now();
+ }
+ const auto to_read = (m.header.front_len +
+ m.header.middle_len +
+ m.header.data_len);
+ return policy.throttler_bytes->get(to_read);
+}
+
+seastar::future<> SocketConnection::read_message()
+{
+ return socket->read(sizeof(m.header))
+ .then([this] (bufferlist bl) {
+ // throttle the traffic, maybe
+ auto p = bl.cbegin();
+ ::decode(m.header, p);
+ return maybe_throttle();
+ }).then([this] {
+ // read front
+ return socket->read(m.header.front_len);
+ }).then([this] (bufferlist bl) {
+ m.front = std::move(bl);
+ // read middle
+ return socket->read(m.header.middle_len);
+ }).then([this] (bufferlist bl) {
+ m.middle = std::move(bl);
+ // read data
+ return socket->read(m.header.data_len);
+ }).then([this] (bufferlist bl) {
+ m.data = std::move(bl);
+ // read footer
+ return socket->read(sizeof(m.footer));
+ }).then([this] (bufferlist bl) {
+ auto p = bl.cbegin();
+ ::decode(m.footer, p);
+ auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
+ m.front, m.middle, m.data, nullptr);
+ // TODO: set time stamps
+ msg->set_byte_throttler(policy.throttler_bytes);
+
+ if (!update_rx_seq(msg->get_seq())) {
+ // skip this message
+ return;
+ }
+
+ constexpr bool add_ref = false; // Message starts with 1 ref
+ // TODO: change MessageRef with foreign_ptr
+ auto msg_ref = MessageRef{msg, add_ref};
+ // start dispatch, ignoring exceptions from the application layer
+ seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
+ return dispatcher.ms_dispatch(
+ seastar::static_pointer_cast<SocketConnection>(shared_from_this()),
+ std::move(msg))
+ .handle_exception([this] (std::exception_ptr eptr) {
+ logger().error("{} ms_dispatch caught exception: {}", *this, eptr);
+ ceph_assert(false);
+ });
+ });
+ });
+}
+
+bool SocketConnection::update_rx_seq(seq_num_t seq)
+{
+ if (seq <= in_seq) {
+ if (HAVE_FEATURE(features, RECONNECT_SEQ) &&
+ conf.ms_die_on_old_message) {
+ ceph_abort_msg("old msgs despite reconnect_seq feature");
+ }
+ return false;
+ } else if (seq > in_seq + 1) {
+ if (conf.ms_die_on_skipped_message) {
+ ceph_abort_msg("skipped incoming seq");
+ }
+ return false;
+ } else {
+ in_seq = seq;
+ return true;
+ }
+}
+
+seastar::future<> SocketConnection::write_message(MessageRef msg)
+{
+ msg->set_seq(++out_seq);
+ auto& header = msg->get_header();
+ header.src = messenger.get_myname();
+ msg->encode(features, messenger.get_crc_flags());
+ bufferlist bl;
+ bl.append(CEPH_MSGR_TAG_MSG);
+ bl.append((const char*)&header, sizeof(header));
+ bl.append(msg->get_payload());
+ bl.append(msg->get_middle());
+ bl.append(msg->get_data());
+ auto& footer = msg->get_footer();
+ if (HAVE_FEATURE(features, MSG_AUTH)) {
+ bl.append((const char*)&footer, sizeof(footer));
+ } else {
+ ceph_msg_footer_old old_footer;
+ if (messenger.get_crc_flags() & MSG_CRC_HEADER) {
+ old_footer.front_crc = footer.front_crc;
+ old_footer.middle_crc = footer.middle_crc;
+ } else {
+ old_footer.front_crc = old_footer.middle_crc = 0;
+ }
+ if (messenger.get_crc_flags() & MSG_CRC_DATA) {
+ old_footer.data_crc = footer.data_crc;
+ } else {
+ old_footer.data_crc = 0;
+ }
+ old_footer.flags = footer.flags;
+ bl.append((const char*)&old_footer, sizeof(old_footer));
+ }
+ // write as a seastar::net::packet
+ return socket->write_flush(std::move(bl));
+ // TODO: lossless policy
+ // .then([this, msg = std::move(msg)] {
+ // if (!policy.lossy) {
+ // sent.push(std::move(msg));
+ // }
+ // });
+}
+
+seastar::future<> SocketConnection::do_send(MessageRef msg)
+{
+ // chain the message after the last message is sent
+ // TODO: retry send for lossless connection
+ seastar::shared_future<> f = send_ready.then(
+ [this, msg = std::move(msg)] {
+ if (state == state_t::closing)
+ return seastar::now();
+ return write_message(std::move(msg));
+ });
+
+ // chain any later messages after this one completes
+ send_ready = f.get_future();
+ // allow the caller to wait on the same future
+ return f.get_future();
+}
+
+seastar::future<> SocketConnection::do_keepalive()
+{
+ // TODO: retry keepalive for lossless connection
+ seastar::shared_future<> f = send_ready.then([this] {
+ if (state == state_t::closing)
+ return seastar::now();
+ k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
+ ceph::coarse_real_clock::now());
+ return socket->write_flush(make_static_packet(k.req));
+ });
+ send_ready = f.get_future();
+ return f.get_future();
+}
+
+seastar::future<> SocketConnection::do_close()
+{
+ if (state == state_t::closing) {
+ // already closing
+ assert(close_ready.valid());
+ return close_ready.get_future();
+ }
+
+ // unregister_conn() drops a reference, so hold another until completion
+ auto cleanup = [conn_ref = shared_from_this(), this] {
+ logger().debug("{} closed!", *this);
+ };
+
+ if (state == state_t::accepting) {
+ messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
+ } else if (state >= state_t::connecting && state < state_t::closing) {
+ messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
+ } else {
+ // cannot happen
+ ceph_assert(false);
+ }
+
+ // close_ready become valid only after state is state_t::closing
+ assert(!close_ready.valid());
+
+ if (socket) {
+ close_ready = socket->close()
+ .then([this] {
+ return pending_dispatch.close();
+ }).finally(std::move(cleanup));
+ } else {
+ ceph_assert(state == state_t::connecting);
+ close_ready = pending_dispatch.close().finally(std::move(cleanup));
+ }
+ logger().debug("{} trigger closing, was {}", *this, static_cast<int>(state));
+ state = state_t::closing;
+ return close_ready.get_future();
+}
+
+// handshake
+
+/// store the banner in a non-const string for buffer::create_static()
+static char banner[] = CEPH_BANNER;
+constexpr size_t banner_size = sizeof(CEPH_BANNER)-1;
+
+constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr);
+constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr);
+
+WRITE_RAW_ENCODER(ceph_msg_connect);
+WRITE_RAW_ENCODER(ceph_msg_connect_reply);
+
+std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c)
+{
+ return out << "connect{features=" << std::hex << c.features << std::dec
+ << " host_type=" << c.host_type
+ << " global_seq=" << c.global_seq
+ << " connect_seq=" << c.connect_seq
+ << " protocol_version=" << c.protocol_version
+ << " authorizer_protocol=" << c.authorizer_protocol
+ << " authorizer_len=" << c.authorizer_len
+ << " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}';
+}
+
+std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r)
+{
+ return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag)
+ << " features=" << std::hex << r.features << std::dec
+ << " global_seq=" << r.global_seq
+ << " connect_seq=" << r.connect_seq
+ << " protocol_version=" << r.protocol_version
+ << " authorizer_len=" << r.authorizer_len
+ << " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}';
+}
+
+// check that the buffer starts with a valid banner without requiring it to
+// be contiguous in memory
+static void validate_banner(bufferlist::const_iterator& p)
+{
+ auto b = std::cbegin(banner);
+ auto end = b + banner_size;
+ while (b != end) {
+ const char *buf{nullptr};
+ auto remaining = std::distance(b, end);
+ auto len = p.get_ptr_and_advance(remaining, &buf);
+ if (!std::equal(buf, buf + len, b)) {
+ throw std::system_error(make_error_code(error::bad_connect_banner));
+ }
+ b += len;
+ }
+}
+
+// make sure that we agree with the peer about its address
+static void validate_peer_addr(const entity_addr_t& addr,
+ const entity_addr_t& expected)
+{
+ if (addr == expected) {
+ return;
+ }
+ // ok if server bound anonymously, as long as port/nonce match
+ if (addr.is_blank_ip() &&
+ addr.get_port() == expected.get_port() &&
+ addr.get_nonce() == expected.get_nonce()) {
+ return;
+ } else {
+ throw std::system_error(make_error_code(error::bad_peer_address));
+ }
+}
+
+/// return a static bufferptr to the given object
+template <typename T>
+bufferptr create_static(T& obj)
+{
+ return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj));
+}
+
+bool SocketConnection::require_auth_feature() const
+{
+ if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) {
+ return false;
+ }
+ if (conf.cephx_require_signatures) {
+ return true;
+ }
+ if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD ||
+ h.connect.host_type == CEPH_ENTITY_TYPE_MDS) {
+ return conf.cephx_cluster_require_signatures;
+ } else {
+ return conf.cephx_service_require_signatures;
+ }
+}
+
+uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool connect) const
+{
+ constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD;
+ // see also OSD.h, unlike other connection of simple/async messenger,
+ // crimson msgr is only used by osd
+ constexpr uint32_t CEPH_OSD_PROTOCOL = 10;
+ if (peer_type == my_type) {
+ // internal
+ return CEPH_OSD_PROTOCOL;
+ } else {
+ // public
+ switch (connect ? peer_type : my_type) {
+ case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
+ case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
+ case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
+ default: return 0;
+ }
+ }
+}
+
+seastar::future<seastar::stop_iteration>
+SocketConnection::repeat_handle_connect()
+{
+ return socket->read(sizeof(h.connect))
+ .then([this](bufferlist bl) {
+ auto p = bl.cbegin();
+ ::decode(h.connect, p);
+ peer_type = h.connect.host_type;
+ return socket->read(h.connect.authorizer_len);
+ }).then([this] (bufferlist authorizer) {
+ if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) {
+ return seastar::make_ready_future<msgr_tag_t, bufferlist>(
+ CEPH_MSGR_TAG_BADPROTOVER, bufferlist{});
+ }
+ if (require_auth_feature()) {
+ policy.features_required |= CEPH_FEATURE_MSG_AUTH;
+ }
+ if (auto feat_missing = policy.features_required & ~(uint64_t)h.connect.features;
+ feat_missing != 0) {
+ return seastar::make_ready_future<msgr_tag_t, bufferlist>(
+ CEPH_MSGR_TAG_FEATURES, bufferlist{});
+ }
+ return dispatcher.ms_verify_authorizer(peer_type,
+ h.connect.authorizer_protocol,
+ authorizer);
+ }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) {
+ memset(&h.reply, 0, sizeof(h.reply));
+ if (tag) {
+ return send_connect_reply(tag, std::move(authorizer_reply));
+ }
+ if (auto existing = messenger.lookup_conn(peer_addr); existing) {
+ return handle_connect_with_existing(existing, std::move(authorizer_reply));
+ } else if (h.connect.connect_seq > 0) {
+ return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION,
+ std::move(authorizer_reply));
+ }
+ h.connect_seq = h.connect.connect_seq + 1;
+ h.peer_global_seq = h.connect.global_seq;
+ set_features((uint64_t)policy.features_supported & (uint64_t)h.connect.features);
+ // TODO: cct
+ return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply));
+ });
+}
+
+seastar::future<seastar::stop_iteration>
+SocketConnection::send_connect_reply(msgr_tag_t tag,
+ bufferlist&& authorizer_reply)
+{
+ h.reply.tag = tag;
+ h.reply.features = static_cast<uint64_t>((h.connect.features &
+ policy.features_supported) |
+ policy.features_required);
+ h.reply.authorizer_len = authorizer_reply.length();
+ return socket->write(make_static_packet(h.reply))
+ .then([this, reply=std::move(authorizer_reply)]() mutable {
+ return socket->write_flush(std::move(reply));
+ }).then([] {
+ return stop_t::no;
+ });
+}
+
+seastar::future<seastar::stop_iteration>
+SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
+ bufferlist&& authorizer_reply)
+{
+ h.global_seq = messenger.get_global_seq();
+ h.reply.tag = tag;
+ h.reply.features = policy.features_supported;
+ h.reply.global_seq = h.global_seq;
+ h.reply.connect_seq = h.connect_seq;
+ h.reply.flags = 0;
+ if (policy.lossy) {
+ h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
+ }
+ h.reply.authorizer_len = authorizer_reply.length();
+ return socket->write(make_static_packet(h.reply))
+ .then([this, reply=std::move(authorizer_reply)]() mutable {
+ if (reply.length()) {
+ return socket->write(std::move(reply));
+ } else {
+ return seastar::now();
+ }
+ }).then([this] {
+ if (h.reply.tag == CEPH_MSGR_TAG_SEQ) {
+ return socket->write_flush(make_static_packet(in_seq))
+ .then([this] {
+ return socket->read_exactly(sizeof(seq_num_t));
+ }).then([this] (auto buf) {
+ auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
+ discard_up_to(&out_q, *acked_seq);
+ });
+ } else {
+ return socket->flush();
+ }
+ }).then([this] {
+ return stop_t::yes;
+ });
+}
+
+seastar::future<>
+SocketConnection::handle_keepalive2()
+{
+ return socket->read_exactly(sizeof(ceph_timespec))
+ .then([this] (auto buf) {
+ k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
+ seastar::shared_future<> f = send_ready.then([this] {
+ logger().debug("{} keepalive2 {}", *this, k.ack.stamp.tv_sec);
+ return socket->write_flush(make_static_packet(k.ack));
+ });
+ send_ready = f.get_future();
+ return f.get_future();
+ });
+}
+
+seastar::future<>
+SocketConnection::handle_keepalive2_ack()
+{
+ return socket->read_exactly(sizeof(ceph_timespec))
+ .then([this] (auto buf) {
+ auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
+ k.ack_stamp = *t;
+ logger().debug("{} keepalive2 ack {}", *this, t->tv_sec);
+ });
+}
+
+seastar::future<seastar::stop_iteration>
+SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply)
+{
+ if (h.connect.global_seq < existing->peer_global_seq()) {
+ h.reply.global_seq = existing->peer_global_seq();
+ return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL);
+ } else if (existing->is_lossy()) {
+ return replace_existing(existing, std::move(authorizer_reply));
+ } else if (h.connect.connect_seq == 0 && existing->connect_seq() > 0) {
+ return replace_existing(existing, std::move(authorizer_reply), true);
+ } else if (h.connect.connect_seq < existing->connect_seq()) {
+ // old attempt, or we sent READY but they didn't get it.
+ h.reply.connect_seq = existing->connect_seq() + 1;
+ return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
+ } else if (h.connect.connect_seq == existing->connect_seq()) {
+ // if the existing connection successfully opened, and/or
+ // subsequently went to standby, then the peer should bump
+ // their connect_seq and retry: this is not a connection race
+ // we need to resolve here.
+ if (existing->get_state() == state_t::open ||
+ existing->get_state() == state_t::standby) {
+ if (policy.resetcheck && existing->connect_seq() == 0) {
+ return replace_existing(existing, std::move(authorizer_reply));
+ } else {
+ h.reply.connect_seq = existing->connect_seq() + 1;
+ return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
+ }
+ } else if (peer_addr < messenger.get_myaddr() ||
+ existing->is_server_side()) {
+ // incoming wins
+ return replace_existing(existing, std::move(authorizer_reply));
+ } else {
+ return send_connect_reply(CEPH_MSGR_TAG_WAIT);
+ }
+ } else if (policy.resetcheck &&
+ existing->connect_seq() == 0) {
+ return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION);
+ } else {
+ return replace_existing(existing, std::move(authorizer_reply));
+ }
+}
+
+seastar::future<seastar::stop_iteration>
+SocketConnection::replace_existing(SocketConnectionRef existing,
+ bufferlist&& authorizer_reply,
+ bool is_reset_from_peer)
+{
+ msgr_tag_t reply_tag;
+ if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) &&
+ !is_reset_from_peer) {
+ reply_tag = CEPH_MSGR_TAG_SEQ;
+ } else {
+ reply_tag = CEPH_MSGR_TAG_READY;
+ }
+ messenger.unregister_conn(existing);
+ if (!existing->is_lossy()) {
+ // reset the in_seq if this is a hard reset from peer,
+ // otherwise we respect our original connection's value
+ in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num();
+ // steal outgoing queue and out_seq
+ existing->requeue_sent();
+ std::tie(out_seq, out_q) = existing->get_out_queue();
+ }
+ return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
+}
+
+seastar::future<seastar::stop_iteration>
+SocketConnection::handle_connect_reply(msgr_tag_t tag)
+{
+ switch (tag) {
+ case CEPH_MSGR_TAG_FEATURES:
+ logger().error("{} connect protocol feature mispatch", __func__);
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ case CEPH_MSGR_TAG_BADPROTOVER:
+ logger().error("{} connect protocol version mispatch", __func__);
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ case CEPH_MSGR_TAG_BADAUTHORIZER:
+ logger().error("{} got bad authorizer", __func__);
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ case CEPH_MSGR_TAG_RESETSESSION:
+ reset_session();
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ case CEPH_MSGR_TAG_RETRY_GLOBAL:
+ h.global_seq = messenger.get_global_seq(h.reply.global_seq);
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ case CEPH_MSGR_TAG_RETRY_SESSION:
+ ceph_assert(h.reply.connect_seq > h.connect_seq);
+ h.connect_seq = h.reply.connect_seq;
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ case CEPH_MSGR_TAG_WAIT:
+ // TODO: state wait
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ case CEPH_MSGR_TAG_SEQ:
+ case CEPH_MSGR_TAG_READY:
+ if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features);
+ missing) {
+ logger().error("{} missing required features", __func__);
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ }
+ return seastar::futurize_apply([this, tag] {
+ if (tag == CEPH_MSGR_TAG_SEQ) {
+ return socket->read_exactly(sizeof(seq_num_t))
+ .then([this] (auto buf) {
+ auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
+ discard_up_to(&out_q, *acked_seq);
+ return socket->write_flush(make_static_packet(in_seq));
+ });
+ }
+ // tag CEPH_MSGR_TAG_READY
+ return seastar::now();
+ }).then([this] {
+ // hooray!
+ h.peer_global_seq = h.reply.global_seq;
+ policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY;
+ h.connect_seq++;
+ h.backoff = 0ms;
+ set_features(h.reply.features & h.connect.features);
+ if (h.authorizer) {
+ session_security.reset(
+ get_auth_session_handler(nullptr,
+ h.authorizer->protocol,
+ h.authorizer->session_key,
+ features));
+ }
+ h.authorizer.reset();
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ });
+ break;
+ default:
+ // unknown tag
+ logger().error("{} got unknown tag", __func__, int(tag));
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ }
+}
+
+void SocketConnection::reset_session()
+{
+ decltype(out_q){}.swap(out_q);
+ decltype(sent){}.swap(sent);
+ in_seq = 0;
+ h.connect_seq = 0;
+ if (HAVE_FEATURE(features, MSG_AUTH)) {
+ // Set out_seq to a random value, so CRC won't be predictable.
+ // Constant to limit starting sequence number to 2^31. Nothing special
+ // about it, just a big number.
+ constexpr uint64_t SEQ_MASK = 0x7fffffff;
+ out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
+ } else {
+ // previously, seq #'s always started at 0.
+ out_seq = 0;
+ }
+}
+
+seastar::future<seastar::stop_iteration>
+SocketConnection::repeat_connect()
+{
+ // encode ceph_msg_connect
+ memset(&h.connect, 0, sizeof(h.connect));
+ h.connect.features = policy.features_supported;
+ h.connect.host_type = messenger.get_myname().type();
+ h.connect.global_seq = h.global_seq;
+ h.connect.connect_seq = h.connect_seq;
+ h.connect.protocol_version = get_proto_version(peer_type, true);
+ // this is fyi, actually, server decides!
+ h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;
+
+ return dispatcher.ms_get_authorizer(peer_type)
+ .then([this](auto&& auth) {
+ h.authorizer = std::move(auth);
+ bufferlist bl;
+ if (h.authorizer) {
+ h.connect.authorizer_protocol = h.authorizer->protocol;
+ h.connect.authorizer_len = h.authorizer->bl.length();
+ bl.append(create_static(h.connect));
+ bl.append(h.authorizer->bl);
+ } else {
+ h.connect.authorizer_protocol = 0;
+ h.connect.authorizer_len = 0;
+ bl.append(create_static(h.connect));
+ };
+ return socket->write_flush(std::move(bl));
+ }).then([this] {
+ // read the reply
+ return socket->read(sizeof(h.reply));
+ }).then([this] (bufferlist bl) {
+ auto p = bl.cbegin();
+ ::decode(h.reply, p);
+ ceph_assert(p.end());
+ return socket->read(h.reply.authorizer_len);
+ }).then([this] (bufferlist bl) {
+ if (h.authorizer) {
+ auto reply = bl.cbegin();
+ if (!h.authorizer->verify_reply(reply, nullptr)) {
+ logger().error("{} authorizer failed to verify reply", __func__);
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ }
+ }
+ return handle_connect_reply(h.reply.tag);
+ });
+}
+
+void
+SocketConnection::start_connect(const entity_addr_t& _peer_addr,
+ const entity_type_t& _peer_type)
+{
+ ceph_assert(state == state_t::none);
+ ceph_assert(!socket);
+ peer_addr = _peer_addr;
+ peer_type = _peer_type;
+ messenger.register_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
+ logger().debug("{} trigger connecting, was {}", *this, static_cast<int>(state));
+ state = state_t::connecting;
+ seastar::with_gate(pending_dispatch, [this] {
+ return seastar::connect(peer_addr.in4_addr())
+ .then([this](seastar::connected_socket fd) {
+ if (state == state_t::closing) {
+ fd.shutdown_input();
+ fd.shutdown_output();
+ throw std::system_error(make_error_code(error::connection_aborted));
+ }
+ socket = seastar::make_foreign(std::make_unique<Socket>(std::move(fd)));
+ // read server's handshake header
+ return socket->read(server_header_size);
+ }).then([this] (bufferlist headerbl) {
+ auto p = headerbl.cbegin();
+ validate_banner(p);
+ entity_addr_t saddr, caddr;
+ ::decode(saddr, p);
+ ::decode(caddr, p);
+ ceph_assert(p.end());
+ validate_peer_addr(saddr, peer_addr);
+
+ side = side_t::connector;
+ socket_port = caddr.get_port();
+ return messenger.learned_addr(caddr);
+ }).then([this] {
+ // encode/send client's handshake header
+ bufferlist bl;
+ bl.append(buffer::create_static(banner_size, banner));
+ ::encode(messenger.get_myaddr(), bl, 0);
+ h.global_seq = messenger.get_global_seq();
+ return socket->write_flush(std::move(bl));
+ }).then([=] {
+ return seastar::repeat([this] {
+ return repeat_connect();
+ });
+ }).then([this] {
+ // notify the dispatcher and allow them to reject the connection
+ return dispatcher.ms_handle_connect(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
+ }).then([this] {
+ execute_open();
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ // TODO: handle fault in the connecting state
+ logger().warn("{} connecting fault: {}", *this, eptr);
+ h.promise.set_value();
+ close();
+ });
+ });
+}
+
+void
+SocketConnection::start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& sock,
+ const entity_addr_t& _peer_addr)
+{
+ ceph_assert(state == state_t::none);
+ ceph_assert(!socket);
+ peer_addr.u = _peer_addr.u;
+ peer_addr.set_port(0);
+ side = side_t::acceptor;
+ socket_port = _peer_addr.get_port();
+ socket = std::move(sock);
+ messenger.accept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
+ logger().debug("{} trigger accepting, was {}", *this, static_cast<int>(state));
+ state = state_t::accepting;
+ seastar::with_gate(pending_dispatch, [this, _peer_addr] {
+ // encode/send server's handshake header
+ bufferlist bl;
+ bl.append(buffer::create_static(banner_size, banner));
+ ::encode(messenger.get_myaddr(), bl, 0);
+ ::encode(_peer_addr, bl, 0);
+ return socket->write_flush(std::move(bl))
+ .then([this] {
+ // read client's handshake header and connect request
+ return socket->read(client_header_size);
+ }).then([this] (bufferlist bl) {
+ auto p = bl.cbegin();
+ validate_banner(p);
+ entity_addr_t addr;
+ ::decode(addr, p);
+ ceph_assert(p.end());
+ peer_addr.set_type(addr.get_type());
+ peer_addr.set_port(addr.get_port());
+ peer_addr.set_nonce(addr.get_nonce());
+ return seastar::repeat([this] {
+ return repeat_handle_connect();
+ });
+ }).then([this] {
+ // notify the dispatcher and allow them to reject the connection
+ return dispatcher.ms_handle_accept(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
+ }).then([this] {
+ messenger.register_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
+ messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
+ execute_open();
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ // TODO: handle fault in the accepting state
+ logger().warn("{} accepting fault: {}", *this, eptr);
+ h.promise.set_value();
+ close();
+ });
+ });
+}
+
+void
+SocketConnection::execute_open()
+{
+ logger().debug("{} trigger open, was {}", *this, static_cast<int>(state));
+ state = state_t::open;
+ // satisfy the handshake's promise
+ h.promise.set_value();
+ seastar::with_gate(pending_dispatch, [this] {
+ // start background processing of tags
+ return handle_tags()
+ .handle_exception_type([this] (const std::system_error& e) {
+ logger().warn("{} open fault: {}", *this, e);
+ if (e.code() == error::connection_aborted ||
+ e.code() == error::connection_reset) {
+ return dispatcher.ms_handle_reset(seastar::static_pointer_cast<SocketConnection>(shared_from_this()))
+ .then([this] {
+ close();
+ });
+ } else if (e.code() == error::read_eof) {
+ return dispatcher.ms_handle_remote_reset(seastar::static_pointer_cast<SocketConnection>(shared_from_this()))
+ .then([this] {
+ close();
+ });
+ } else {
+ throw e;
+ }
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ // TODO: handle fault in the open state
+ logger().warn("{} open fault: {}", *this, eptr);
+ close();
+ });
+ });
+}
+
+seastar::future<> SocketConnection::fault()
+{
+ if (policy.lossy) {
+ messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
+ }
+ if (h.backoff.count()) {
+ h.backoff += h.backoff;
+ } else {
+ h.backoff = conf.ms_initial_backoff;
+ }
+ if (h.backoff > conf.ms_max_backoff) {
+ h.backoff = conf.ms_max_backoff;
+ }
+ return seastar::sleep(h.backoff);
+}
+
+seastar::shard_id SocketConnection::shard_id() const {
+ return messenger.shard_id();
+}
+
+void SocketConnection::print(ostream& out) const {
+ messenger.print(out);
+ if (side == side_t::none) {
+ out << " >> " << peer_addr;
+ } else if (side == side_t::acceptor) {
+ out << " >> " << peer_addr
+ << "@" << socket_port;
+ } else { // side == side_t::connector
+ out << "@" << socket_port
+ << " >> " << peer_addr;
+ }
+}
diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h
new file mode 100644
index 00000000..62cc77d5
--- /dev/null
+++ b/src/crimson/net/SocketConnection.h
@@ -0,0 +1,235 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/shared_future.hh>
+#include <seastar/core/sharded.hh>
+
+#include "msg/Policy.h"
+#include "Connection.h"
+#include "Socket.h"
+#include "crimson/thread/Throttle.h"
+
+class AuthAuthorizer;
+class AuthSessionHandler;
+
+namespace ceph::net {
+
+using stop_t = seastar::stop_iteration;
+
+class SocketMessenger;
+class SocketConnection;
+using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
+
+class SocketConnection : public Connection {
+ SocketMessenger& messenger;
+ seastar::foreign_ptr<std::unique_ptr<Socket>> socket;
+ Dispatcher& dispatcher;
+ seastar::gate pending_dispatch;
+
+ // if acceptor side, socket_port is different from peer_addr.get_port();
+ // if connector side, socket_port is different from my_addr.get_port().
+ enum class side_t {
+ none,
+ acceptor,
+ connector
+ };
+ side_t side = side_t::none;
+ uint16_t socket_port = 0;
+
+ enum class state_t {
+ none,
+ accepting,
+ connecting,
+ open,
+ standby,
+ wait,
+ closing
+ };
+ state_t state = state_t::none;
+
+ /// become valid only when state is state_t::closing
+ seastar::shared_future<> close_ready;
+
+ /// state for handshake
+ struct Handshake {
+ ceph_msg_connect connect;
+ ceph_msg_connect_reply reply;
+ std::unique_ptr<AuthAuthorizer> authorizer;
+ std::chrono::milliseconds backoff;
+ uint32_t connect_seq = 0;
+ uint32_t peer_global_seq = 0;
+ uint32_t global_seq;
+ seastar::promise<> promise;
+ } h;
+
+ /// server side of handshake negotiation
+ seastar::future<stop_t> repeat_handle_connect();
+ seastar::future<stop_t> handle_connect_with_existing(SocketConnectionRef existing,
+ bufferlist&& authorizer_reply);
+ seastar::future<stop_t> replace_existing(SocketConnectionRef existing,
+ bufferlist&& authorizer_reply,
+ bool is_reset_from_peer = false);
+ seastar::future<stop_t> send_connect_reply(ceph::net::msgr_tag_t tag,
+ bufferlist&& authorizer_reply = {});
+ seastar::future<stop_t> send_connect_reply_ready(ceph::net::msgr_tag_t tag,
+ bufferlist&& authorizer_reply);
+
+ seastar::future<> handle_keepalive2();
+ seastar::future<> handle_keepalive2_ack();
+
+ bool require_auth_feature() const;
+ uint32_t get_proto_version(entity_type_t peer_type, bool connec) const;
+ /// client side of handshake negotiation
+ seastar::future<stop_t> repeat_connect();
+ seastar::future<stop_t> handle_connect_reply(ceph::net::msgr_tag_t tag);
+ void reset_session();
+
+ /// state for an incoming message
+ struct MessageReader {
+ ceph_msg_header header;
+ ceph_msg_footer footer;
+ bufferlist front;
+ bufferlist middle;
+ bufferlist data;
+ } m;
+
+ seastar::future<> maybe_throttle();
+ seastar::future<> handle_tags();
+ seastar::future<> handle_ack();
+
+ /// becomes available when handshake completes, and when all previous messages
+ /// have been sent to the output stream. send() chains new messages as
+ /// continuations to this future to act as a queue
+ seastar::future<> send_ready;
+
+ /// encode/write a message
+ seastar::future<> write_message(MessageRef msg);
+
+ ceph::net::Policy<ceph::thread::Throttle> policy;
+ uint64_t features;
+ void set_features(uint64_t new_features) {
+ features = new_features;
+ }
+
+ /// the seq num of the last transmitted message
+ seq_num_t out_seq = 0;
+ /// the seq num of the last received message
+ seq_num_t in_seq = 0;
+ /// update the seq num of last received message
+ /// @returns true if the @c seq is valid, and @c in_seq is updated,
+ /// false otherwise.
+ bool update_rx_seq(seq_num_t seq);
+
+ seastar::future<> read_message();
+
+ std::unique_ptr<AuthSessionHandler> session_security;
+
+ // messages to be resent after connection gets reset
+ std::queue<MessageRef> out_q;
+ // messages sent, but not yet acked by peer
+ std::queue<MessageRef> sent;
+ static void discard_up_to(std::queue<MessageRef>*, seq_num_t);
+
+ struct Keepalive {
+ struct {
+ const char tag = CEPH_MSGR_TAG_KEEPALIVE2;
+ ceph_timespec stamp;
+ } __attribute__((packed)) req;
+ struct {
+ const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
+ ceph_timespec stamp;
+ } __attribute__((packed)) ack;
+ ceph_timespec ack_stamp;
+ } k;
+
+ seastar::future<> fault();
+
+ void execute_open();
+
+ seastar::future<> do_send(MessageRef msg);
+ seastar::future<> do_keepalive();
+ seastar::future<> do_close();
+
+ public:
+ SocketConnection(SocketMessenger& messenger,
+ Dispatcher& dispatcher);
+ ~SocketConnection();
+
+ Messenger* get_messenger() const override;
+
+ int get_peer_type() const override {
+ return peer_type;
+ }
+
+ seastar::future<bool> is_connected() override;
+
+ seastar::future<> send(MessageRef msg) override;
+
+ seastar::future<> keepalive() override;
+
+ seastar::future<> close() override;
+
+ seastar::shard_id shard_id() const override;
+
+ void print(ostream& out) const override;
+
+ public:
+ /// start a handshake from the client's perspective,
+ /// only call when SocketConnection first construct
+ void start_connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type);
+ /// start a handshake from the server's perspective,
+ /// only call when SocketConnection first construct
+ void start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& socket,
+ const entity_addr_t& peer_addr);
+
+ /// the number of connections initiated in this session, increment when a
+ /// new connection is established
+ uint32_t connect_seq() const {
+ return h.connect_seq;
+ }
+
+ /// the client side should connect us with a gseq. it will be reset with
+ /// the one of exsting connection if it's greater.
+ uint32_t peer_global_seq() const {
+ return h.peer_global_seq;
+ }
+ seq_num_t rx_seq_num() const {
+ return in_seq;
+ }
+
+ /// current state of connection
+ state_t get_state() const {
+ return state;
+ }
+ bool is_server_side() const {
+ return policy.server;
+ }
+ bool is_lossy() const {
+ return policy.lossy;
+ }
+
+ /// move all messages in the sent list back into the queue
+ void requeue_sent();
+
+ std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() {
+ return {out_seq, std::move(out_q)};
+ }
+};
+
+} // namespace ceph::net
diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc
new file mode 100644
index 00000000..46a38ff7
--- /dev/null
+++ b/src/crimson/net/SocketMessenger.cc
@@ -0,0 +1,283 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "SocketMessenger.h"
+
+#include <tuple>
+#include <boost/functional/hash.hpp>
+
+#include "auth/Auth.h"
+#include "Errors.h"
+#include "Dispatcher.h"
+#include "Socket.h"
+
+using namespace ceph::net;
+
+namespace {
+ seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_ms);
+ }
+}
+
+SocketMessenger::SocketMessenger(const entity_name_t& myname,
+ const std::string& logic_name,
+ uint32_t nonce,
+ int master_sid)
+ : Messenger{myname},
+ master_sid{master_sid},
+ sid{seastar::engine().cpu_id()},
+ logic_name{logic_name},
+ nonce{nonce}
+{}
+
+seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
+{
+ auto my_addrs = addrs;
+ for (auto& addr : my_addrs.v) {
+ addr.nonce = nonce;
+ }
+ return container().invoke_on_all([my_addrs](auto& msgr) {
+ return msgr.Messenger::set_myaddrs(my_addrs);
+ });
+}
+
+seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs)
+{
+ ceph_assert(addrs.legacy_addr().get_family() == AF_INET);
+ auto my_addrs = addrs;
+ for (auto& addr : my_addrs.v) {
+ addr.nonce = nonce;
+ }
+ logger().info("listening on {}", my_addrs.legacy_addr().in4_addr());
+ return container().invoke_on_all([my_addrs](auto& msgr) {
+ msgr.do_bind(my_addrs);
+ });
+}
+
+seastar::future<>
+SocketMessenger::try_bind(const entity_addrvec_t& addrs,
+ uint32_t min_port, uint32_t max_port)
+{
+ auto addr = addrs.legacy_or_front_addr();
+ if (addr.get_port() != 0) {
+ return bind(addrs);
+ }
+ ceph_assert(min_port <= max_port);
+ return seastar::do_with(uint32_t(min_port),
+ [this, max_port, addr] (auto& port) {
+ return seastar::repeat([this, max_port, addr, &port] {
+ auto to_bind = addr;
+ to_bind.set_port(port);
+ return bind(entity_addrvec_t{to_bind})
+ .then([this] {
+ logger().info("{}: try_bind: done", *this);
+ return stop_t::yes;
+ }).handle_exception_type([this, max_port, &port] (const std::system_error& e) {
+ logger().debug("{}: try_bind: {} already used", *this, port);
+ if (port == max_port) {
+ throw e;
+ }
+ ++port;
+ return stop_t::no;
+ });
+ });
+ });
+}
+
+seastar::future<> SocketMessenger::start(Dispatcher *disp) {
+ return container().invoke_on_all([disp](auto& msgr) {
+ return msgr.do_start(disp->get_local_shard());
+ });
+}
+
+seastar::future<ceph::net::ConnectionXRef>
+SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
+{
+ auto shard = locate_shard(peer_addr);
+ return container().invoke_on(shard, [peer_addr, peer_type](auto& msgr) {
+ return msgr.do_connect(peer_addr, peer_type);
+ }).then([](seastar::foreign_ptr<ConnectionRef>&& conn) {
+ return seastar::make_lw_shared<seastar::foreign_ptr<ConnectionRef>>(std::move(conn));
+ });
+}
+
+seastar::future<> SocketMessenger::shutdown()
+{
+ return container().invoke_on_all([](auto& msgr) {
+ return msgr.do_shutdown();
+ }).finally([this] {
+ return container().invoke_on_all([](auto& msgr) {
+ msgr.shutdown_promise.set_value();
+ });
+ });
+}
+
+void SocketMessenger::do_bind(const entity_addrvec_t& addrs)
+{
+ Messenger::set_myaddrs(addrs);
+
+ // TODO: v2: listen on multiple addresses
+ seastar::socket_address address(addrs.legacy_addr().in4_addr());
+ seastar::listen_options lo;
+ lo.reuse_address = true;
+ listener = seastar::listen(address, lo);
+}
+
+seastar::future<> SocketMessenger::do_start(Dispatcher *disp)
+{
+ dispatcher = disp;
+
+ // start listening if bind() was called
+ if (listener) {
+ seastar::keep_doing([this] {
+ return listener->accept()
+ .then([this] (seastar::connected_socket socket,
+ seastar::socket_address paddr) {
+ // allocate the connection
+ entity_addr_t peer_addr;
+ peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
+ auto shard = locate_shard(peer_addr);
+#warning fixme
+ // we currently do dangerous i/o from a Connection core, different from the Socket core.
+ auto sock = seastar::make_foreign(std::make_unique<Socket>(std::move(socket)));
+ // don't wait before accepting another
+ container().invoke_on(shard, [sock = std::move(sock), peer_addr, this](auto& msgr) mutable {
+ SocketConnectionRef conn = seastar::make_shared<SocketConnection>(msgr, *msgr.dispatcher);
+ conn->start_accept(std::move(sock), peer_addr);
+ });
+ });
+ }).handle_exception_type([this] (const std::system_error& e) {
+ // stop gracefully on connection_aborted
+ if (e.code() != error::connection_aborted) {
+ logger().error("{} unexpected error during accept: {}", *this, e);
+ }
+ });
+ }
+
+ return seastar::now();
+}
+
+seastar::foreign_ptr<ceph::net::ConnectionRef>
+SocketMessenger::do_connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
+{
+ if (auto found = lookup_conn(peer_addr); found) {
+ return seastar::make_foreign(found->shared_from_this());
+ }
+ SocketConnectionRef conn = seastar::make_shared<SocketConnection>(*this, *dispatcher);
+ conn->start_connect(peer_addr, peer_type);
+ return seastar::make_foreign(conn->shared_from_this());
+}
+
+seastar::future<> SocketMessenger::do_shutdown()
+{
+ if (listener) {
+ listener->abort_accept();
+ }
+ // close all connections
+ return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
+ return conn->close();
+ }).then([this] {
+ ceph_assert(accepting_conns.empty());
+ return seastar::parallel_for_each(connections, [] (auto conn) {
+ return conn.second->close();
+ });
+ }).finally([this] {
+ ceph_assert(connections.empty());
+ });
+}
+
+seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
+{
+ if (!get_myaddr().is_blank_ip()) {
+ // already learned or binded
+ return seastar::now();
+ }
+
+ // Only learn IP address if blank.
+ entity_addr_t addr = get_myaddr();
+ addr.u = peer_addr_for_me.u;
+ addr.set_type(peer_addr_for_me.get_type());
+ addr.set_port(get_myaddr().get_port());
+ return set_myaddrs(entity_addrvec_t{addr});
+}
+
+void SocketMessenger::set_default_policy(const SocketPolicy& p)
+{
+ policy_set.set_default(p);
+}
+
+void SocketMessenger::set_policy(entity_type_t peer_type,
+ const SocketPolicy& p)
+{
+ policy_set.set(peer_type, p);
+}
+
+void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
+ Throttle* throttle)
+{
+ // only byte throttler is used in OSD
+ policy_set.set_throttlers(peer_type, throttle, nullptr);
+}
+
+seastar::shard_id SocketMessenger::locate_shard(const entity_addr_t& addr)
+{
+ ceph_assert(addr.get_family() == AF_INET);
+ if (master_sid >= 0) {
+ return master_sid;
+ }
+ std::size_t seed = 0;
+ boost::hash_combine(seed, addr.u.sin.sin_addr.s_addr);
+ //boost::hash_combine(seed, addr.u.sin.sin_port);
+ //boost::hash_combine(seed, addr.nonce);
+ return seed % seastar::smp::count;
+}
+
+ceph::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
+{
+ if (auto found = connections.find(addr);
+ found != connections.end()) {
+ return found->second;
+ } else {
+ return nullptr;
+ }
+}
+
+void SocketMessenger::accept_conn(SocketConnectionRef conn)
+{
+ accepting_conns.insert(conn);
+}
+
+void SocketMessenger::unaccept_conn(SocketConnectionRef conn)
+{
+ accepting_conns.erase(conn);
+}
+
+void SocketMessenger::register_conn(SocketConnectionRef conn)
+{
+ if (master_sid >= 0) {
+ ceph_assert(static_cast<int>(sid) == master_sid);
+ }
+ auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
+ std::ignore = i;
+ ceph_assert(added);
+}
+
+void SocketMessenger::unregister_conn(SocketConnectionRef conn)
+{
+ ceph_assert(conn);
+ auto found = connections.find(conn->get_peer_addr());
+ ceph_assert(found != connections.end());
+ ceph_assert(found->second == conn);
+ connections.erase(found);
+}
diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h
new file mode 100644
index 00000000..535dea3a
--- /dev/null
+++ b/src/crimson/net/SocketMessenger.h
@@ -0,0 +1,119 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <map>
+#include <optional>
+#include <set>
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sharded.hh>
+
+#include "Messenger.h"
+#include "SocketConnection.h"
+
+namespace ceph::net {
+
+class SocketMessenger final : public Messenger, public seastar::peering_sharded_service<SocketMessenger> {
+ const int master_sid;
+ const seastar::shard_id sid;
+ seastar::promise<> shutdown_promise;
+
+ std::optional<seastar::server_socket> listener;
+ Dispatcher *dispatcher = nullptr;
+ std::map<entity_addr_t, SocketConnectionRef> connections;
+ std::set<SocketConnectionRef> accepting_conns;
+ ceph::net::PolicySet<Throttle> policy_set;
+ // Distinguish messengers with meaningful names for debugging
+ const std::string logic_name;
+ const uint32_t nonce;
+
+ seastar::future<> accept(seastar::connected_socket socket,
+ seastar::socket_address paddr);
+
+ void do_bind(const entity_addrvec_t& addr);
+ seastar::future<> do_start(Dispatcher *disp);
+ seastar::foreign_ptr<ConnectionRef> do_connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type);
+ seastar::future<> do_shutdown();
+ // conn sharding options:
+ // 0. Compatible (master_sid >= 0): place all connections to one master shard
+ // 1. Simplest (master_sid < 0): sharded by ip only
+ // 2. Balanced (not implemented): sharded by ip + port + nonce,
+ // but, need to move SocketConnection between cores.
+ seastar::shard_id locate_shard(const entity_addr_t& addr);
+
+ public:
+ SocketMessenger(const entity_name_t& myname,
+ const std::string& logic_name,
+ uint32_t nonce,
+ int master_sid);
+
+ seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override;
+
+ // Messenger interfaces are assumed to be called from its own shard, but its
+ // behavior should be symmetric when called from any shard.
+ seastar::future<> bind(const entity_addrvec_t& addr) override;
+
+ seastar::future<> try_bind(const entity_addrvec_t& addr,
+ uint32_t min_port, uint32_t max_port) override;
+
+ seastar::future<> start(Dispatcher *dispatcher) override;
+
+ seastar::future<ConnectionXRef> connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type) override;
+ // can only wait once
+ seastar::future<> wait() override {
+ return shutdown_promise.get_future();
+ }
+
+ seastar::future<> shutdown() override;
+
+ Messenger* get_local_shard() override {
+ return &container().local();
+ }
+
+ void print(ostream& out) const override {
+ out << get_myname()
+ << "(" << logic_name
+ << ") " << get_myaddr();
+ }
+
+ void set_default_policy(const SocketPolicy& p) override;
+
+ void set_policy(entity_type_t peer_type, const SocketPolicy& p) override;
+
+ void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;
+
+ public:
+ seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me);
+
+ SocketConnectionRef lookup_conn(const entity_addr_t& addr);
+ void accept_conn(SocketConnectionRef);
+ void unaccept_conn(SocketConnectionRef);
+ void register_conn(SocketConnectionRef);
+ void unregister_conn(SocketConnectionRef);
+
+ // required by sharded<>
+ seastar::future<> stop() {
+ return seastar::make_ready_future<>();
+ }
+
+ seastar::shard_id shard_id() const {
+ return sid;
+ }
+};
+
+} // namespace ceph::net