diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/msg/xio/XioMessenger.h | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/msg/xio/XioMessenger.h')
-rw-r--r-- | src/msg/xio/XioMessenger.h | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/src/msg/xio/XioMessenger.h b/src/msg/xio/XioMessenger.h new file mode 100644 index 00000000..6f8a67ba --- /dev/null +++ b/src/msg/xio/XioMessenger.h @@ -0,0 +1,176 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * Portions Copyright (C) 2013 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef XIO_MESSENGER_H +#define XIO_MESSENGER_H + +#include "msg/SimplePolicyMessenger.h" + +#include <atomic> + +extern "C" { +#include "libxio.h" +} + +#include "XioConnection.h" +#include "XioPortal.h" +#include "QueueStrategy.h" +#include "common/Thread.h" +#include "common/Mutex.h" +#include "include/spinlock.h" + +class XioInit { + /* safe to be called multiple times */ + void package_init(CephContext *cct); + +protected: + explicit XioInit(CephContext *cct) { + this->package_init(cct); + } +}; + +class XioMessenger : public SimplePolicyMessenger, XioInit +{ +private: + static std::atomic<uint64_t> nInstances = { 0 }; + std::atomic<uint64_t> nsessions = { 0 }; + std::atomic<bool> shutdown_called = { false }; + ceph::spinlock conns_sp; + XioConnection::ConnList conns_list; + XioConnection::EntitySet conns_entity_map; + XioPortals portals; + DispatchStrategy* dispatch_strategy; + XioLoopbackConnectionRef loop_con; + uint32_t special_handling; + Mutex sh_mtx; + Cond sh_cond; + bool need_addr; + bool did_bind; + + /// approximately unique ID set by the Constructor for use in entity_addr_t + uint64_t nonce; + + friend class XioConnection; + +public: + XioMessenger(CephContext *cct, entity_name_t name, + string mname, uint64_t nonce, + uint64_t cflags = 0, + DispatchStrategy* ds = new QueueStrategy(1)); + + virtual ~XioMessenger(); + + XioPortal* get_portal() { return portals.get_next_portal(); } + + virtual void set_myaddr(const entity_addr_t& a) { + Messenger::set_myaddr(a); + loop_con->set_peer_addr(a); + } + + int _send_message(Message *m, const entity_inst_t &dest); + int _send_message(Message *m, Connection *con); + int _send_message_impl(Message *m, XioConnection *xcon); + + uint32_t get_special_handling() { return special_handling; } + void set_special_handling(int n) { special_handling = n; } + int pool_hint(uint32_t size); + void try_insert(XioConnection *xcon); + + /* xio hooks */ + int new_session(struct xio_session *session, + struct xio_new_session_req *req, + void *cb_user_context); + + int session_event(struct xio_session *session, + struct xio_session_event_data *event_data, + void *cb_user_context); + + /* Messenger interface */ + virtual bool set_addr_unknowns(const entity_addrvec_t &addr) override + { } /* XXX applicable? */ + virtual void set_addr(const entity_addr_t &addr) override + { } /* XXX applicable? */ + + virtual int get_dispatch_queue_len() + { return 0; } /* XXX bogus? */ + + virtual double get_dispatch_queue_max_age(utime_t now) + { return 0; } /* XXX bogus? */ + + virtual void set_cluster_protocol(int p) + { } + + virtual int bind(const entity_addr_t& addr); + + virtual int rebind(const set<int>& avoid_ports); + + virtual int start(); + + virtual void wait(); + + virtual int shutdown(); + + virtual int send_message(Message *m, const entity_inst_t &dest) { + return _send_message(m, dest); + } + + virtual int lazy_send_message(Message *m, const entity_inst_t& dest) + { return EINVAL; } + + virtual int lazy_send_message(Message *m, Connection *con) + { return EINVAL; } + + virtual ConnectionRef get_connection(const entity_inst_t& dest); + + // compat hack + ConnectionRef connect_to( + int type, const entity_addrvec_t& dest) override { + return get_connection(entity_inst_t(entity_name_t(type, -1), + dest.legacy_addr())); + } + + virtual ConnectionRef get_loopback_connection(); + + void unregister_xcon(XioConnection *xcon); + virtual void mark_down(const entity_addr_t& a); + virtual void mark_down(Connection *con); + virtual void mark_down_all(); + virtual void mark_down_on_empty(Connection *con); + virtual void mark_disposable(Connection *con); + + void ds_dispatch(Message *m) + { dispatch_strategy->ds_dispatch(m); } + + /** + * Tell the XioMessenger its full IP address. + * + * This is used by clients when connecting to other endpoints, and + * probably shouldn't be called by anybody else. + */ + void learned_addr(const entity_addr_t& peer_addr_for_me); + +private: + int get_nconns_per_portal(uint64_t cflags); + int get_nportals(uint64_t cflags); + +protected: + virtual void ready() + { } +}; + +XioCommand* pool_alloc_xio_command(XioConnection *xcon); + + +#endif /* XIO_MESSENGER_H */ |