summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/Socket.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/net/Socket.h')
-rw-r--r--src/crimson/net/Socket.h268
1 files changed, 268 insertions, 0 deletions
diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h
new file mode 100644
index 000000000..d39a2517f
--- /dev/null
+++ b/src/crimson/net/Socket.h
@@ -0,0 +1,268 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sharded.hh>
+#include <seastar/net/packet.hh>
+
+#include "include/buffer.h"
+
+#include "crimson/common/log.h"
+#include "Errors.h"
+#include "Fwd.h"
+
+#ifdef UNIT_TESTS_BUILT
+#include "Interceptor.h"
+#endif
+
+namespace crimson::net {
+
+class Socket;
+using SocketRef = std::unique_ptr<Socket>;
+
+class Socket
+{
+ struct construct_tag {};
+
+ public:
+ // if acceptor side, peer is using a different port (ephemeral_port)
+ // if connector side, I'm using a different port (ephemeral_port)
+ enum class side_t {
+ acceptor,
+ connector
+ };
+
+ Socket(seastar::connected_socket&& _socket, side_t _side, uint16_t e_port, construct_tag)
+ : sid{seastar::this_shard_id()},
+ socket(std::move(_socket)),
+ in(socket.input()),
+ // the default buffer size 8192 is too small that may impact our write
+ // performance. see seastar::net::connected_socket::output()
+ out(socket.output(65536)),
+ side(_side),
+ ephemeral_port(e_port) {}
+
+ ~Socket() {
+#ifndef NDEBUG
+ assert(closed);
+#endif
+ }
+
+ Socket(Socket&& o) = delete;
+
+ static seastar::future<SocketRef>
+ connect(const entity_addr_t& peer_addr) {
+ return seastar::connect(peer_addr.in4_addr()
+ ).then([] (seastar::connected_socket socket) {
+ return std::make_unique<Socket>(
+ std::move(socket), side_t::connector, 0, construct_tag{});
+ });
+ }
+
+ /// read the requested number of bytes into a bufferlist
+ seastar::future<bufferlist> read(size_t bytes);
+ using tmp_buf = seastar::temporary_buffer<char>;
+ using packet = seastar::net::packet;
+ seastar::future<tmp_buf> read_exactly(size_t bytes);
+
+ seastar::future<> write(packet&& buf) {
+#ifdef UNIT_TESTS_BUILT
+ return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
+#endif
+ return out.write(std::move(buf));
+#ifdef UNIT_TESTS_BUILT
+ }).then([this] {
+ return try_trap_post(next_trap_write);
+ });
+#endif
+ }
+ seastar::future<> flush() {
+ return out.flush();
+ }
+ seastar::future<> write_flush(packet&& buf) {
+#ifdef UNIT_TESTS_BUILT
+ return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
+#endif
+ return out.write(std::move(buf)).then([this] { return out.flush(); });
+#ifdef UNIT_TESTS_BUILT
+ }).then([this] {
+ return try_trap_post(next_trap_write);
+ });
+#endif
+ }
+
+ // preemptively disable further reads or writes, can only be shutdown once.
+ void shutdown();
+
+ /// Socket can only be closed once.
+ seastar::future<> close();
+
+ // shutdown input_stream only, for tests
+ void force_shutdown_in() {
+ socket.shutdown_input();
+ }
+
+ // shutdown output_stream only, for tests
+ void force_shutdown_out() {
+ socket.shutdown_output();
+ }
+
+ side_t get_side() const {
+ return side;
+ }
+
+ uint16_t get_ephemeral_port() const {
+ return ephemeral_port;
+ }
+
+ // learn my ephemeral_port as connector.
+ // unfortunately, there's no way to identify which port I'm using as
+ // connector with current seastar interface.
+ void learn_ephemeral_port_as_connector(uint16_t port) {
+ assert(side == side_t::connector &&
+ (ephemeral_port == 0 || ephemeral_port == port));
+ ephemeral_port = port;
+ }
+
+ private:
+ const seastar::shard_id sid;
+ seastar::connected_socket socket;
+ seastar::input_stream<char> in;
+ seastar::output_stream<char> out;
+ side_t side;
+ uint16_t ephemeral_port;
+
+#ifndef NDEBUG
+ bool closed = false;
+#endif
+
+ /// buffer state for read()
+ struct {
+ bufferlist buffer;
+ size_t remaining;
+ } r;
+
+#ifdef UNIT_TESTS_BUILT
+ public:
+ void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
+
+ private:
+ bp_action_t next_trap_read = bp_action_t::CONTINUE;
+ bp_action_t next_trap_write = bp_action_t::CONTINUE;
+ socket_blocker* blocker = nullptr;
+ seastar::future<> try_trap_pre(bp_action_t& trap);
+ seastar::future<> try_trap_post(bp_action_t& trap);
+
+#endif
+ friend class FixedCPUServerSocket;
+};
+
+class FixedCPUServerSocket
+ : public seastar::peering_sharded_service<FixedCPUServerSocket> {
+ const seastar::shard_id cpu;
+ entity_addr_t addr;
+ std::optional<seastar::server_socket> listener;
+ seastar::gate shutdown_gate;
+
+ using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
+ std::unique_ptr<sharded_service_t> service;
+
+ struct construct_tag {};
+
+ static seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+ }
+
+ seastar::future<> reset() {
+ return container().invoke_on_all([] (auto& ss) {
+ assert(ss.shutdown_gate.is_closed());
+ ss.shutdown_gate = seastar::gate();
+ ss.addr = entity_addr_t();
+ ss.listener.reset();
+ });
+ }
+
+public:
+ FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {}
+ ~FixedCPUServerSocket() {
+ assert(!listener);
+ // detect whether user have called destroy() properly
+ ceph_assert(!service);
+ }
+
+ FixedCPUServerSocket(FixedCPUServerSocket&&) = delete;
+ FixedCPUServerSocket(const FixedCPUServerSocket&) = delete;
+ FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete;
+
+ using listen_ertr = crimson::errorator<
+ crimson::ct_error::address_in_use // The address is already bound
+ >;
+ listen_ertr::future<> listen(entity_addr_t addr);
+
+ // fn_accept should be a nothrow function of type
+ // seastar::future<>(SocketRef, entity_addr_t)
+ template <typename Func>
+ seastar::future<> accept(Func&& fn_accept) {
+ assert(seastar::this_shard_id() == cpu);
+ logger().trace("FixedCPUServerSocket({})::accept()...", addr);
+ return container().invoke_on_all(
+ [fn_accept = std::move(fn_accept)] (auto& ss) mutable {
+ assert(ss.listener);
+ // gate accepting
+ // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
+ // so ignore the returned future
+ std::ignore = seastar::with_gate(ss.shutdown_gate,
+ [&ss, fn_accept = std::move(fn_accept)] () mutable {
+ return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable {
+ return ss.listener->accept().then(
+ [&ss, fn_accept = std::move(fn_accept)]
+ (seastar::accept_result accept_result) mutable {
+ // assert seastar::listen_options::set_fixed_cpu() works
+ assert(seastar::this_shard_id() == ss.cpu);
+ auto [socket, paddr] = std::move(accept_result);
+ entity_addr_t peer_addr;
+ peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
+ peer_addr.set_type(entity_addr_t::TYPE_ANY);
+ SocketRef _socket = std::make_unique<Socket>(
+ std::move(socket), Socket::side_t::acceptor,
+ peer_addr.get_port(), Socket::construct_tag{});
+ std::ignore = seastar::with_gate(ss.shutdown_gate,
+ [socket = std::move(_socket), peer_addr,
+ &ss, fn_accept = std::move(fn_accept)] () mutable {
+ logger().trace("FixedCPUServerSocket({})::accept(): "
+ "accepted peer {}", ss.addr, peer_addr);
+ return fn_accept(std::move(socket), peer_addr
+ ).handle_exception([&ss, peer_addr] (auto eptr) {
+ logger().error("FixedCPUServerSocket({})::accept(): "
+ "fn_accept(s, {}) got unexpected exception {}",
+ ss.addr, peer_addr, eptr);
+ ceph_abort();
+ });
+ });
+ });
+ }).handle_exception_type([&ss] (const std::system_error& e) {
+ if (e.code() == std::errc::connection_aborted ||
+ e.code() == std::errc::invalid_argument) {
+ logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})",
+ ss.addr, e);
+ } else {
+ throw;
+ }
+ }).handle_exception([&ss] (auto eptr) {
+ logger().error("FixedCPUServerSocket({})::accept(): "
+ "got unexpected exception {}", ss.addr, eptr);
+ ceph_abort();
+ });
+ });
+ });
+ }
+
+ seastar::future<> shutdown();
+ seastar::future<> destroy();
+ static seastar::future<FixedCPUServerSocket*> create();
+};
+
+} // namespace crimson::net