summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/SocketMessenger.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/crimson/net/SocketMessenger.h
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/net/SocketMessenger.h')
-rw-r--r--src/crimson/net/SocketMessenger.h192
1 files changed, 192 insertions, 0 deletions
diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h
new file mode 100644
index 000000000..e4ac63184
--- /dev/null
+++ b/src/crimson/net/SocketMessenger.h
@@ -0,0 +1,192 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <map>
+#include <set>
+#include <vector>
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sharded.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "crimson/net/chained_dispatchers.h"
+#include "Messenger.h"
+#include "Socket.h"
+#include "SocketConnection.h"
+
+namespace crimson::net {
+
+class ShardedServerSocket;
+
+class SocketMessenger final : public Messenger {
+// Messenger public interfaces
+public:
+ SocketMessenger(const entity_name_t& myname,
+ const std::string& logic_name,
+ uint32_t nonce,
+ bool dispatch_only_on_this_shard);
+
+ ~SocketMessenger() override;
+
+ const entity_name_t &get_myname() const override {
+ return my_name;
+ }
+
+ const entity_addrvec_t &get_myaddrs() const override {
+ return my_addrs;
+ }
+
+ void set_myaddrs(const entity_addrvec_t& addr) override;
+
+ bool set_addr_unknowns(const entity_addrvec_t &addr) override;
+
+ void set_auth_client(crimson::auth::AuthClient *ac) override {
+ assert(seastar::this_shard_id() == sid);
+ auth_client = ac;
+ }
+
+ void set_auth_server(crimson::auth::AuthServer *as) override {
+ assert(seastar::this_shard_id() == sid);
+ auth_server = as;
+ }
+
+ bind_ertr::future<> bind(const entity_addrvec_t& addr) override;
+
+ seastar::future<> start(const dispatchers_t& dispatchers) override;
+
+ ConnectionRef connect(const entity_addr_t& peer_addr,
+ const entity_name_t& peer_name) override;
+
+ bool owns_connection(Connection &conn) const override {
+ assert(seastar::this_shard_id() == sid);
+ return this == &static_cast<SocketConnection&>(conn).get_messenger();
+ }
+
+ // can only wait once
+ seastar::future<> wait() override {
+ assert(seastar::this_shard_id() == sid);
+ return shutdown_promise.get_future();
+ }
+
+ void stop() override {
+ assert(seastar::this_shard_id() == sid);
+ dispatchers.clear();
+ }
+
+ bool is_started() const override {
+ assert(seastar::this_shard_id() == sid);
+ return !dispatchers.empty();
+ }
+
+ seastar::future<> shutdown() override;
+
+ void print(std::ostream& out) const override {
+ out << get_myname()
+ << "(" << logic_name
+ << ") " << get_myaddr();
+ }
+
+ SocketPolicy get_policy(entity_type_t peer_type) const override;
+
+ SocketPolicy get_default_policy() const override;
+
+ void set_default_policy(const SocketPolicy& p) override;
+
+ void set_policy(entity_type_t peer_type, const SocketPolicy& p) override;
+
+ void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;
+
+// SocketMessenger public interfaces
+public:
+ crimson::auth::AuthClient* get_auth_client() const {
+ assert(seastar::this_shard_id() == sid);
+ return auth_client;
+ }
+
+ crimson::auth::AuthServer* get_auth_server() const {
+ assert(seastar::this_shard_id() == sid);
+ return auth_server;
+ }
+
+ uint32_t get_global_seq(uint32_t old=0);
+
+ void learned_addr(const entity_addr_t &peer_addr_for_me,
+ const SocketConnection& conn);
+
+ SocketConnectionRef lookup_conn(const entity_addr_t& addr);
+
+ void accept_conn(SocketConnectionRef);
+
+ void unaccept_conn(SocketConnectionRef);
+
+ void register_conn(SocketConnectionRef);
+
+ void unregister_conn(SocketConnectionRef);
+
+ void closing_conn(SocketConnectionRef);
+
+ void closed_conn(SocketConnectionRef);
+
+ seastar::shard_id get_shard_id() const {
+ return sid;
+ }
+
+#ifdef UNIT_TESTS_BUILT
+ void set_interceptor(Interceptor *i) override {
+ interceptor = i;
+ }
+
+ Interceptor *interceptor = nullptr;
+#endif
+
+private:
+ seastar::future<> accept(SocketFRef &&, const entity_addr_t &);
+
+ listen_ertr::future<> do_listen(const entity_addrvec_t& addr);
+
+ /// try to bind to the first unused port of given address
+ bind_ertr::future<> try_bind(const entity_addrvec_t& addr,
+ uint32_t min_port, uint32_t max_port);
+
+ const seastar::shard_id sid;
+ // Distinguish messengers with meaningful names for debugging
+ const std::string logic_name;
+ const uint32_t nonce;
+ const bool dispatch_only_on_sid;
+
+ entity_name_t my_name;
+ entity_addrvec_t my_addrs;
+ crimson::auth::AuthClient* auth_client = nullptr;
+ crimson::auth::AuthServer* auth_server = nullptr;
+
+ ShardedServerSocket *listener = nullptr;
+ ChainedDispatchers dispatchers;
+ std::map<entity_addr_t, SocketConnectionRef> connections;
+ std::set<SocketConnectionRef> accepting_conns;
+ std::vector<SocketConnectionRef> closing_conns;
+ ceph::net::PolicySet<Throttle> policy_set;
+ // specifying we haven't learned our addr; set false when we find it.
+ bool need_addr = true;
+ uint32_t global_seq = 0;
+ bool started = false;
+ seastar::promise<> shutdown_promise;
+};
+
+} // namespace crimson::net
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::net::SocketMessenger> : fmt::ostream_formatter {};
+#endif