diff options
Diffstat (limited to '')
-rw-r--r-- | src/crimson/net/SocketConnection.h | 236 |
1 files changed, 236 insertions, 0 deletions
diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h new file mode 100644 index 000000000..823d6c574 --- /dev/null +++ b/src/crimson/net/SocketConnection.h @@ -0,0 +1,236 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <seastar/core/sharded.hh> + +#include "msg/Policy.h" +#include "crimson/common/throttle.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Socket.h" + +namespace crimson::net { + +class ProtocolV2; +class SocketMessenger; +class SocketConnection; +using SocketConnectionRef = seastar::shared_ptr<SocketConnection>; + +#ifdef UNIT_TESTS_BUILT +class Interceptor; +#endif + +/** + * ConnectionHandler + * + * The interface class to implement Connection, called by SocketConnection. + * + * The operations must be done in get_shard_id(). + */ +class ConnectionHandler { +public: + using clock_t = seastar::lowres_system_clock; + + virtual ~ConnectionHandler() = default; + + ConnectionHandler(const ConnectionHandler &) = delete; + ConnectionHandler(ConnectionHandler &&) = delete; + ConnectionHandler &operator=(const ConnectionHandler &) = delete; + ConnectionHandler &operator=(ConnectionHandler &&) = delete; + + virtual seastar::shard_id get_shard_id() const = 0; + + virtual bool is_connected() const = 0; + + virtual seastar::future<> send(MessageFRef) = 0; + + virtual seastar::future<> send_keepalive() = 0; + + virtual clock_t::time_point get_last_keepalive() const = 0; + + virtual clock_t::time_point get_last_keepalive_ack() const = 0; + + virtual void set_last_keepalive_ack(clock_t::time_point) = 0; + + virtual void mark_down() = 0; + +protected: + ConnectionHandler() = default; +}; + +class SocketConnection : public Connection { + /* + * Connection interfaces, public to users + * Working in ConnectionHandler::get_shard_id() + */ + public: + SocketConnection(SocketMessenger& messenger, + ChainedDispatchers& dispatchers); + + ~SocketConnection() override; + + const seastar::shard_id get_shard_id() const override { + return io_handler->get_shard_id(); + } + + const entity_name_t &get_peer_name() const override { + return peer_name; + } + + const entity_addr_t &get_peer_addr() const override { + return peer_addr; + } + + const entity_addr_t &get_peer_socket_addr() const override { + return target_addr; + } + + uint64_t get_features() const override { + return features; + } + + bool is_connected() const override; + + seastar::future<> send(MessageURef msg) override; + + seastar::future<> send_keepalive() override; + + clock_t::time_point get_last_keepalive() const override; + + clock_t::time_point get_last_keepalive_ack() const override; + + void set_last_keepalive_ack(clock_t::time_point when) override; + + void mark_down() override; + + bool has_user_private() const override { + return user_private != nullptr; + } + + user_private_t &get_user_private() override { + assert(has_user_private()); + return *user_private; + } + + void set_user_private(std::unique_ptr<user_private_t> new_user_private) override { + assert(!has_user_private()); + user_private = std::move(new_user_private); + } + + void print(std::ostream& out) const override; + + /* + * Public to SocketMessenger + * Working in SocketMessenger::get_shard_id(); + */ + public: + /// start a handshake from the client's perspective, + /// only call when SocketConnection first construct + void start_connect(const entity_addr_t& peer_addr, + const entity_name_t& peer_name); + + /// start a handshake from the server's perspective, + /// only call when SocketConnection first construct + void start_accept(SocketFRef&& socket, + const entity_addr_t& peer_addr); + + seastar::future<> close_clean_yielded(); + + seastar::socket_address get_local_address() const; + + seastar::shard_id get_messenger_shard_id() const; + + SocketMessenger &get_messenger() const; + + ConnectionRef get_local_shared_foreign_from_this(); + +private: + void set_peer_type(entity_type_t peer_type); + + void set_peer_id(int64_t peer_id); + + void set_peer_name(entity_name_t name) { + set_peer_type(name.type()); + set_peer_id(name.num()); + } + + void set_features(uint64_t f); + + void set_socket(Socket *s); + +#ifdef UNIT_TESTS_BUILT + bool is_protocol_ready() const override; + + bool is_protocol_standby() const override; + + bool is_protocol_closed_clean() const override; + + bool is_protocol_closed() const override; + + // peer wins if myaddr > peeraddr + bool peer_wins() const override; + + Interceptor *interceptor = nullptr; +#else + // peer wins if myaddr > peeraddr + bool peer_wins() const; +#endif + +private: + const seastar::shard_id msgr_sid; + + /* + * Core owner is messenger core, may allow to access from the I/O core. + */ + SocketMessenger& messenger; + + std::unique_ptr<ProtocolV2> protocol; + + Socket *socket = nullptr; + + entity_name_t peer_name = {0, entity_name_t::NEW}; + + entity_addr_t peer_addr; + + // which of the peer_addrs we're connecting to (as client) + // or should reconnect to (as peer) + entity_addr_t target_addr; + + uint64_t features = 0; + + ceph::net::Policy<crimson::common::Throttle> policy; + + uint64_t peer_global_id = 0; + + /* + * Core owner is I/O core (mutable). + */ + std::unique_ptr<ConnectionHandler> io_handler; + + /* + * Core owner is up to the connection user. + */ + std::unique_ptr<user_private_t> user_private; + + friend class IOHandler; + friend class ProtocolV2; + friend class FrameAssemblerV2; +}; + +} // namespace crimson::net + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::net::SocketConnection> : fmt::ostream_formatter {}; +#endif |