diff options
Diffstat (limited to '')
-rw-r--r-- | src/msg/async/AsyncMessenger.h | 426 |
1 files changed, 426 insertions, 0 deletions
diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h new file mode 100644 index 00000000..98bf9d52 --- /dev/null +++ b/src/msg/async/AsyncMessenger.h @@ -0,0 +1,426 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com> + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_ASYNCMESSENGER_H +#define CEPH_ASYNCMESSENGER_H + +#include <map> +#include <mutex> + +#include "include/types.h" +#include "include/xlist.h" +#include "include/spinlock.h" +#include "include/unordered_map.h" +#include "include/unordered_set.h" + +#include "common/Mutex.h" +#include "common/Cond.h" +#include "common/Thread.h" + +#include "msg/SimplePolicyMessenger.h" +#include "msg/DispatchQueue.h" +#include "AsyncConnection.h" +#include "Event.h" + +#include "include/ceph_assert.h" + +class AsyncMessenger; + +/** + * If the Messenger binds to a specific address, the Processor runs + * and listens for incoming connections. + */ +class Processor { + AsyncMessenger *msgr; + NetHandler net; + Worker *worker; + vector<ServerSocket> listen_sockets; + EventCallbackRef listen_handler; + + class C_processor_accept; + + public: + Processor(AsyncMessenger *r, Worker *w, CephContext *c); + ~Processor() { delete listen_handler; }; + + void stop(); + int bind(const entity_addrvec_t &bind_addrs, + const set<int>& avoid_ports, + entity_addrvec_t* bound_addrs); + void start(); + void accept(); +}; + +/* + * AsyncMessenger is represented for maintaining a set of asynchronous connections, + * it may own a bind address and the accepted connections will be managed by + * AsyncMessenger. + * + */ + +class AsyncMessenger : public SimplePolicyMessenger { + // First we have the public Messenger interface implementation... +public: + /** + * Initialize the AsyncMessenger! + * + * @param cct The CephContext to use + * @param name The name to assign ourselves + * _nonce A unique ID to use for this AsyncMessenger. It should not + * be a value that will be repeated if the daemon restarts. + */ + AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type, + string mname, uint64_t _nonce); + + /** + * Destroy the AsyncMessenger. Pretty simple since all the work is done + * elsewhere. + */ + ~AsyncMessenger() override; + + /** @defgroup Accessors + * @{ + */ + bool set_addr_unknowns(const entity_addrvec_t &addr) override; + void set_addrs(const entity_addrvec_t &addrs) override; + + int get_dispatch_queue_len() override { + return dispatch_queue.get_queue_len(); + } + + double get_dispatch_queue_max_age(utime_t now) override { + return dispatch_queue.get_max_age(now); + } + /** @} Accessors */ + + /** + * @defgroup Configuration functions + * @{ + */ + void set_cluster_protocol(int p) override { + ceph_assert(!started && !did_bind); + cluster_protocol = p; + } + + int bind(const entity_addr_t& bind_addr) override; + int rebind(const set<int>& avoid_ports) override; + int client_bind(const entity_addr_t& bind_addr) override; + + int bindv(const entity_addrvec_t& bind_addrs) override; + + bool should_use_msgr2() override; + + /** @} Configuration functions */ + + /** + * @defgroup Startup/Shutdown + * @{ + */ + int start() override; + void wait() override; + int shutdown() override; + + /** @} // Startup/Shutdown */ + + /** + * @defgroup Messaging + * @{ + */ + int send_to(Message *m, int type, const entity_addrvec_t& addrs) override; + + /** @} // Messaging */ + + /** + * @defgroup Connection Management + * @{ + */ + ConnectionRef connect_to(int type, + const entity_addrvec_t& addrs) override; + ConnectionRef get_loopback_connection() override; + void mark_down(const entity_addr_t& addr) override { + mark_down_addrs(entity_addrvec_t(addr)); + } + void mark_down_addrs(const entity_addrvec_t& addrs) override; + void mark_down_all() override { + shutdown_connections(true); + } + /** @} // Connection Management */ + + /** + * @defgroup Inner classes + * @{ + */ + + /** + * @} // Inner classes + */ + +protected: + /** + * @defgroup Messenger Interfaces + * @{ + */ + /** + * Start up the DispatchQueue thread once we have somebody to dispatch to. + */ + void ready() override; + /** @} // Messenger Interfaces */ + +private: + + /** + * @defgroup Utility functions + * @{ + */ + + /** + * Create a connection associated with the given entity (of the given type). + * Initiate the connection. (This function returning does not guarantee + * connection success.) + * + * @param addrs The address(es) of the entity to connect to. + * @param type The peer type of the entity at the address. + * + * @return a pointer to the newly-created connection. Caller does not own a + * reference; take one if you need it. + */ + AsyncConnectionRef create_connect(const entity_addrvec_t& addrs, int type); + + /** + * Queue up a Message for delivery to the entity specified + * by addr and dest_type. + * submit_message() is responsible for creating + * new AsyncConnection (and closing old ones) as necessary. + * + * @param m The Message to queue up. This function eats a reference. + * @param con The existing Connection to use, or NULL if you don't know of one. + * @param dest_addr The address to send the Message to. + * @param dest_type The peer type of the address we're sending to + * just drop silently under failure. + */ + void submit_message(Message *m, AsyncConnectionRef con, + const entity_addrvec_t& dest_addrs, int dest_type); + + void _finish_bind(const entity_addrvec_t& bind_addrs, + const entity_addrvec_t& listen_addrs); + + entity_addrvec_t _filter_addrs(int type, + const entity_addrvec_t& addrs); + + private: + static const uint64_t ReapDeadConnectionThreshold = 5; + + NetworkStack *stack; + std::vector<Processor*> processors; + friend class Processor; + DispatchQueue dispatch_queue; + + // the worker run messenger's cron jobs + Worker *local_worker; + + std::string ms_type; + + /// overall lock used for AsyncMessenger data structures + Mutex lock; + // AsyncMessenger stuff + /// approximately unique ID set by the Constructor for use in entity_addr_t + uint64_t nonce; + + /// true, specifying we haven't learned our addr; set false when we find it. + // maybe this should be protected by the lock? + bool need_addr; + + /** + * set to bind addresses if bind was called before NetworkStack was ready to + * bind + */ + entity_addrvec_t pending_bind_addrs; + + /** + * false; set to true if a pending bind exists + */ + bool pending_bind = false; + + /** + * The following aren't lock-protected since you shouldn't be able to race + * the only writers. + */ + + /** + * false; set to true if the AsyncMessenger bound to a specific address; + * and set false again by Accepter::stop(). + */ + bool did_bind; + /// counter for the global seq our connection protocol uses + __u32 global_seq; + /// lock to protect the global_seq + ceph::spinlock global_seq_lock; + + /** + * hash map of addresses to Asyncconnection + * + * NOTE: a Asyncconnection* with state CLOSED may still be in the map but is considered + * invalid and can be replaced by anyone holding the msgr lock + */ + ceph::unordered_map<entity_addrvec_t, AsyncConnectionRef> conns; + + /** + * list of connection are in the process of accepting + * + * These are not yet in the conns map. + */ + set<AsyncConnectionRef> accepting_conns; + + /** + * list of connection are closed which need to be clean up + * + * Because AsyncMessenger and AsyncConnection follow a lock rule that + * we can lock AsyncMesenger::lock firstly then lock AsyncConnection::lock + * but can't reversed. This rule is aimed to avoid dead lock. + * So if AsyncConnection want to unregister itself from AsyncMessenger, + * we pick up this idea that just queue itself to this set and do lazy + * deleted for AsyncConnection. "_lookup_conn" must ensure not return a + * AsyncConnection in this set. + */ + Mutex deleted_lock; + set<AsyncConnectionRef> deleted_conns; + + EventCallbackRef reap_handler; + + /// internal cluster protocol version, if any, for talking to entities of the same type. + int cluster_protocol; + + Cond stop_cond; + bool stopped; + + AsyncConnectionRef _lookup_conn(const entity_addrvec_t& k) { + ceph_assert(lock.is_locked()); + auto p = conns.find(k); + if (p == conns.end()) + return NULL; + + // lazy delete, see "deleted_conns" + Mutex::Locker l(deleted_lock); + if (deleted_conns.erase(p->second)) { + conns.erase(p); + return NULL; + } + + return p->second; + } + + void _init_local_connection() { + ceph_assert(lock.is_locked()); + local_connection->peer_addrs = *my_addrs; + local_connection->peer_type = my_name.type(); + local_connection->set_features(CEPH_FEATURES_ALL); + ms_deliver_handle_fast_connect(local_connection.get()); + } + + void shutdown_connections(bool queue_reset); + +public: + + /// con used for sending messages to ourselves + AsyncConnectionRef local_connection; + + /** + * @defgroup AsyncMessenger internals + * @{ + */ + /** + * This wraps _lookup_conn. + */ + AsyncConnectionRef lookup_conn(const entity_addrvec_t& k) { + Mutex::Locker l(lock); + return _lookup_conn(k); + } + + int accept_conn(AsyncConnectionRef conn); + bool learned_addr(const entity_addr_t &peer_addr_for_me); + void add_accept(Worker *w, ConnectedSocket cli_socket, + const entity_addr_t &listen_addr, + const entity_addr_t &peer_addr); + NetworkStack *get_stack() { + return stack; + } + + uint64_t get_nonce() const { + return nonce; + } + + /** + * Increment the global sequence for this AsyncMessenger and return it. + * This is for the connect protocol, although it doesn't hurt if somebody + * else calls it. + * + * @return a global sequence ID that nobody else has seen. + */ + __u32 get_global_seq(__u32 old=0) { + std::lock_guard<ceph::spinlock> lg(global_seq_lock); + + if (old > global_seq) + global_seq = old; + __u32 ret = ++global_seq; + + return ret; + } + /** + * Get the protocol version we support for the given peer type: either + * a peer protocol (if it matches our own), the protocol version for the + * peer (if we're connecting), or our protocol version (if we're accepting). + */ + int get_proto_version(int peer_type, bool connect) const; + + /** + * Fill in the address and peer type for the local connection, which + * is used for delivering messages back to ourself. + */ + void init_local_connection() { + Mutex::Locker l(lock); + _init_local_connection(); + } + + /** + * Unregister connection from `conns` + * + * See "deleted_conns" + */ + void unregister_conn(AsyncConnectionRef conn) { + Mutex::Locker l(deleted_lock); + conn->get_perf_counter()->dec(l_msgr_active_connections); + deleted_conns.emplace(std::move(conn)); + + if (deleted_conns.size() >= ReapDeadConnectionThreshold) { + local_worker->center.dispatch_event_external(reap_handler); + } + } + + /** + * Reap dead connection from `deleted_conns` + * + * @return the number of dead connections + * + * See "deleted_conns" + */ + int reap_dead(); + + /** + * @} // AsyncMessenger Internals + */ +} ; + +#endif /* CEPH_ASYNCMESSENGER_H */ |