summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/SocketMessenger.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/net/SocketMessenger.h')
-rw-r--r--src/crimson/net/SocketMessenger.h122
1 files changed, 122 insertions, 0 deletions
diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h
new file mode 100644
index 000000000..44c1d3c21
--- /dev/null
+++ b/src/crimson/net/SocketMessenger.h
@@ -0,0 +1,122 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <map>
+#include <set>
+#include <vector>
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sharded.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "crimson/net/chained_dispatchers.h"
+#include "Messenger.h"
+#include "SocketConnection.h"
+
+namespace crimson::net {
+
+class FixedCPUServerSocket;
+
+class SocketMessenger final : public Messenger {
+ const seastar::shard_id master_sid;
+ seastar::promise<> shutdown_promise;
+
+ FixedCPUServerSocket* listener = nullptr;
+ ChainedDispatchers dispatchers;
+ std::map<entity_addr_t, SocketConnectionRef> connections;
+ std::set<SocketConnectionRef> accepting_conns;
+ std::vector<SocketConnectionRef> closing_conns;
+ ceph::net::PolicySet<Throttle> policy_set;
+ // Distinguish messengers with meaningful names for debugging
+ const std::string logic_name;
+ const uint32_t nonce;
+ // specifying we haven't learned our addr; set false when we find it.
+ bool need_addr = true;
+ uint32_t global_seq = 0;
+ bool started = false;
+
+ bind_ertr::future<> do_bind(const entity_addrvec_t& addr);
+
+ public:
+ SocketMessenger(const entity_name_t& myname,
+ const std::string& logic_name,
+ uint32_t nonce);
+ ~SocketMessenger() override { ceph_assert(!listener); }
+
+ seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override;
+
+ // Messenger interfaces are assumed to be called from its own shard, but its
+ // behavior should be symmetric when called from any shard.
+ bind_ertr::future<> bind(const entity_addrvec_t& addr) override;
+
+ bind_ertr::future<> try_bind(const entity_addrvec_t& addr,
+ uint32_t min_port, uint32_t max_port) override;
+
+ seastar::future<> start(const dispatchers_t& dispatchers) override;
+
+ ConnectionRef connect(const entity_addr_t& peer_addr,
+ const entity_name_t& peer_name) override;
+ // can only wait once
+ seastar::future<> wait() override {
+ assert(seastar::this_shard_id() == master_sid);
+ return shutdown_promise.get_future();
+ }
+
+ void stop() override {
+ dispatchers.clear();
+ }
+
+ bool is_started() const override {
+ return !dispatchers.empty();
+ }
+
+ seastar::future<> shutdown() override;
+
+ void print(ostream& out) const override {
+ out << get_myname()
+ << "(" << logic_name
+ << ") " << get_myaddr();
+ }
+
+ SocketPolicy get_policy(entity_type_t peer_type) const override;
+
+ SocketPolicy get_default_policy() const override;
+
+ void set_default_policy(const SocketPolicy& p) override;
+
+ void set_policy(entity_type_t peer_type, const SocketPolicy& p) override;
+
+ void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;
+
+ public:
+ seastar::future<uint32_t> get_global_seq(uint32_t old=0);
+ seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me,
+ const SocketConnection& conn);
+
+ SocketConnectionRef lookup_conn(const entity_addr_t& addr);
+ void accept_conn(SocketConnectionRef);
+ void unaccept_conn(SocketConnectionRef);
+ void register_conn(SocketConnectionRef);
+ void unregister_conn(SocketConnectionRef);
+ void closing_conn(SocketConnectionRef);
+ void closed_conn(SocketConnectionRef);
+ seastar::shard_id shard_id() const {
+ assert(seastar::this_shard_id() == master_sid);
+ return master_sid;
+ }
+};
+
+} // namespace crimson::net