summaryrefslogtreecommitdiffstats
path: root/src/msg/simple/SimpleMessenger.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/msg/simple/SimpleMessenger.h
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/msg/simple/SimpleMessenger.h414
1 files changed, 414 insertions, 0 deletions
diff --git a/src/msg/simple/SimpleMessenger.h b/src/msg/simple/SimpleMessenger.h
new file mode 100644
index 00000000..b1aad539
--- /dev/null
+++ b/src/msg/simple/SimpleMessenger.h
@@ -0,0 +1,414 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_SIMPLEMESSENGER_H
+#define CEPH_SIMPLEMESSENGER_H
+
+#include <list>
+#include <map>
+
+#include "include/types.h"
+#include "include/xlist.h"
+
+#include "include/unordered_map.h"
+#include "include/unordered_set.h"
+
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "common/Thread.h"
+#include "common/Throttle.h"
+
+#include "include/spinlock.h"
+
+#include "msg/SimplePolicyMessenger.h"
+#include "msg/Message.h"
+#include "include/ceph_assert.h"
+
+#include "msg/DispatchQueue.h"
+#include "Pipe.h"
+#include "Accepter.h"
+
+/*
+ * This class handles transmission and reception of messages. Generally
+ * speaking, there are several major components:
+ *
+ * - Connection
+ * Each logical session is associated with a Connection.
+ * - Pipe
+ * Each network connection is handled through a pipe, which handles
+ * the input and output of each message. There is normally a 1:1
+ * relationship between Pipe and Connection, but logical sessions may
+ * get handed off between Pipes when sockets reconnect or during
+ * connection races.
+ * - IncomingQueue
+ * Incoming messages are associated with an IncomingQueue, and there
+ * is one such queue associated with each Pipe.
+ * - DispatchQueue
+ * IncomingQueues get queued in the DIspatchQueue, which is responsible
+ * for doing a round-robin sweep and processing them via a worker thread.
+ * - SimpleMessenger
+ * It's the exterior class passed to the external message handler and
+ * most of the API details.
+ *
+ * Lock ordering:
+ *
+ * SimpleMessenger::lock
+ * Pipe::pipe_lock
+ * DispatchQueue::lock
+ * IncomingQueue::lock
+ */
+
+class SimpleMessenger : public SimplePolicyMessenger {
+ // First we have the public Messenger interface implementation...
+public:
+ /**
+ * Initialize the SimpleMessenger!
+ *
+ * @param cct The CephContext to use
+ * @param name The name to assign ourselves
+ * _nonce A unique ID to use for this SimpleMessenger. It should not
+ * be a value that will be repeated if the daemon restarts.
+ * features The local features bits for the local_connection
+ */
+ SimpleMessenger(CephContext *cct, entity_name_t name,
+ string mname, uint64_t _nonce);
+
+ /**
+ * Destroy the SimpleMessenger. Pretty simple since all the work is done
+ * elsewhere.
+ */
+ ~SimpleMessenger() override;
+
+ /** @defgroup Accessors
+ * @{
+ */
+ bool set_addr_unknowns(const entity_addrvec_t& addr) override;
+ void set_addrs(const entity_addrvec_t &addr) override;
+ void set_myaddrs(const entity_addrvec_t& a) override;
+
+ int get_dispatch_queue_len() override {
+ return dispatch_queue.get_queue_len();
+ }
+
+ double get_dispatch_queue_max_age(utime_t now) override {
+ return dispatch_queue.get_max_age(now);
+ }
+ /** @} Accessors */
+
+ /**
+ * @defgroup Configuration functions
+ * @{
+ */
+ void set_cluster_protocol(int p) override {
+ ceph_assert(!started && !did_bind);
+ cluster_protocol = p;
+ }
+
+ int bind(const entity_addr_t& bind_addr) override;
+ int rebind(const set<int>& avoid_ports) override;
+ int client_bind(const entity_addr_t& bind_addr) override;
+
+ /** @} Configuration functions */
+
+ /**
+ * @defgroup Startup/Shutdown
+ * @{
+ */
+ int start() override;
+ void wait() override;
+ int shutdown() override;
+
+ /** @} // Startup/Shutdown */
+
+ /**
+ * @defgroup Messaging
+ * @{
+ */
+ int send_to(
+ Message *m,
+ int type,
+ const entity_addrvec_t& addr) override {
+ // temporary
+ return _send_message(m, entity_inst_t(entity_name_t(type, -1),
+ addr.legacy_addr()));
+ }
+
+ int send_message(Message *m, Connection *con) {
+ return _send_message(m, con);
+ }
+
+ /** @} // Messaging */
+
+ /**
+ * @defgroup Connection Management
+ * @{
+ */
+ ConnectionRef connect_to(int type, const entity_addrvec_t& addrs) override;
+ ConnectionRef get_loopback_connection() override;
+ int send_keepalive(Connection *con);
+ void mark_down(const entity_addr_t& addr) override;
+ void mark_down(Connection *con);
+ void mark_disposable(Connection *con);
+ void mark_down_all() override;
+ /** @} // Connection Management */
+protected:
+ /**
+ * @defgroup Messenger Interfaces
+ * @{
+ */
+ /**
+ * Start up the DispatchQueue thread once we have somebody to dispatch to.
+ */
+ void ready() override;
+ /** @} // Messenger Interfaces */
+private:
+ /**
+ * @defgroup Inner classes
+ * @{
+ */
+
+public:
+ Accepter accepter;
+ DispatchQueue dispatch_queue;
+
+ friend class Accepter;
+
+ /**
+ * Register a new pipe for accept
+ *
+ * @param sd socket
+ */
+ Pipe *add_accept_pipe(int sd);
+
+private:
+
+ /**
+ * A thread used to tear down Pipes when they're complete.
+ */
+ class ReaperThread : public Thread {
+ SimpleMessenger *msgr;
+ public:
+ explicit ReaperThread(SimpleMessenger *m) : msgr(m) {}
+ void *entry() override {
+ msgr->reaper_entry();
+ return 0;
+ }
+ } reaper_thread;
+
+ /**
+ * @} // Inner classes
+ */
+
+ /**
+ * @defgroup Utility functions
+ * @{
+ */
+
+ /**
+ * Create a Pipe associated with the given entity (of the given type).
+ * Initiate the connection. (This function returning does not guarantee
+ * connection success.)
+ *
+ * @param addr The address of the entity to connect to.
+ * @param type The peer type of the entity at the address.
+ * @param con An existing Connection to associate with the new Pipe. If
+ * NULL, it creates a new Connection.
+ * @param first an initial message to queue on the new pipe
+ *
+ * @return a pointer to the newly-created Pipe. Caller does not own a
+ * reference; take one if you need it.
+ */
+ Pipe *connect_rank(const entity_addr_t& addr, int type, PipeConnection *con,
+ Message *first);
+ /**
+ * Send a message, lazily or not.
+ * This just glues send_message together and passes
+ * the input on to submit_message.
+ */
+ int _send_message(Message *m, const entity_inst_t& dest);
+ /**
+ * Same as above, but for the Connection-based variants.
+ */
+ int _send_message(Message *m, Connection *con);
+ /**
+ * Queue up a Message for delivery to the entity specified
+ * by addr and dest_type.
+ * submit_message() is responsible for creating
+ * new Pipes (and closing old ones) as necessary.
+ *
+ * @param m The Message to queue up. This function eats a reference.
+ * @param con The existing Connection to use, or NULL if you don't know of one.
+ * @param addr The address to send the Message to.
+ * @param dest_type The peer type of the address we're sending to
+ * just drop silently under failure.
+ * @param already_locked If false, submit_message() will acquire the
+ * SimpleMessenger lock before accessing shared data structures; otherwise
+ * it will assume the lock is held. NOTE: if you are making a request
+ * without locking, you MUST have filled in the con with a valid pointer.
+ */
+ void submit_message(Message *m, PipeConnection *con,
+ const entity_addr_t& addr, int dest_type,
+ bool already_locked);
+ /**
+ * Look through the pipes in the pipe_reap_queue and tear them down.
+ */
+ void reaper();
+ /**
+ * @} // Utility functions
+ */
+
+ // SimpleMessenger stuff
+ /// approximately unique ID set by the Constructor for use in entity_addr_t
+ uint64_t nonce;
+ /// overall lock used for SimpleMessenger data structures
+ Mutex lock;
+ /// true, specifying we haven't learned our addr; set false when we find it.
+ // maybe this should be protected by the lock?
+ bool need_addr;
+
+public:
+ bool get_need_addr() const { return need_addr; }
+
+private:
+ /**
+ * false; set to true if the SimpleMessenger bound to a specific address;
+ * and set false again by Accepter::stop(). This isn't lock-protected
+ * since you shouldn't be able to race the only writers.
+ */
+ bool did_bind;
+ /// counter for the global seq our connection protocol uses
+ __u32 global_seq;
+ /// lock to protect the global_seq
+ ceph::spinlock global_seq_lock;
+
+ entity_addr_t my_addr;
+
+ /**
+ * hash map of addresses to Pipes
+ *
+ * NOTE: a Pipe* with state CLOSED may still be in the map but is considered
+ * invalid and can be replaced by anyone holding the msgr lock
+ */
+ ceph::unordered_map<entity_addr_t, Pipe*> rank_pipe;
+ /**
+ * list of pipes are in the process of accepting
+ *
+ * These are not yet in the rank_pipe map.
+ */
+ set<Pipe*> accepting_pipes;
+ /// a set of all the Pipes we have which are somehow active
+ set<Pipe*> pipes;
+ /// a list of Pipes we want to tear down
+ list<Pipe*> pipe_reap_queue;
+
+ /// internal cluster protocol version, if any, for talking to entities of the same type.
+ int cluster_protocol;
+
+ Cond stop_cond;
+ bool stopped = true;
+
+ bool reaper_started, reaper_stop;
+ Cond reaper_cond;
+
+ /// This Cond is slept on by wait() and signaled by dispatch_entry()
+ Cond wait_cond;
+
+ friend class Pipe;
+
+ Pipe *_lookup_pipe(const entity_addr_t& k) {
+ ceph::unordered_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.find(k);
+ if (p == rank_pipe.end())
+ return NULL;
+ // see lock cribbing in Pipe::fault()
+ if (p->second->state_closed)
+ return NULL;
+ return p->second;
+ }
+
+public:
+
+ int timeout;
+
+ /// con used for sending messages to ourselves
+ ConnectionRef local_connection;
+
+ /**
+ * @defgroup SimpleMessenger internals
+ * @{
+ */
+
+ /**
+ * Increment the global sequence for this SimpleMessenger and return it.
+ * This is for the connect protocol, although it doesn't hurt if somebody
+ * else calls it.
+ *
+ * @return a global sequence ID that nobody else has seen.
+ */
+ __u32 get_global_seq(__u32 old=0) {
+ std::lock_guard<decltype(global_seq_lock)> lg(global_seq_lock);
+
+ if (old > global_seq)
+ global_seq = old;
+ __u32 ret = ++global_seq;
+
+ return ret;
+ }
+ /**
+ * Get the protocol version we support for the given peer type: either
+ * a peer protocol (if it matches our own), the protocol version for the
+ * peer (if we're connecting), or our protocol version (if we're accepting).
+ */
+ int get_proto_version(int peer_type, bool connect);
+
+ /**
+ * Fill in the features, address and peer type for the local connection, which
+ * is used for delivering messages back to ourself.
+ */
+ void init_local_connection();
+ /**
+ * Tell the SimpleMessenger its full IP address.
+ *
+ * This is used by Pipes when connecting to other endpoints, and
+ * probably shouldn't be called by anybody else.
+ */
+ void learned_addr(const entity_addr_t& peer_addr_for_me);
+
+ /**
+ * This function is used by the reaper thread. As long as nobody
+ * has set reaper_stop, it calls the reaper function, then
+ * waits to be signaled when it needs to reap again (or when it needs
+ * to stop).
+ */
+ void reaper_entry();
+ /**
+ * Add a pipe to the pipe_reap_queue, to be torn down on
+ * the next call to reaper().
+ * It should really only be the Pipe calling this, in our current
+ * implementation.
+ *
+ * @param pipe A Pipe which has stopped its threads and is
+ * ready to be torn down.
+ */
+ void queue_reap(Pipe *pipe);
+
+ /**
+ * Used to get whether this connection ready to send
+ */
+ bool is_connected(Connection *con);
+ /**
+ * @} // SimpleMessenger Internals
+ */
+} ;
+
+#endif /* CEPH_SIMPLEMESSENGER_H */