summaryrefslogtreecommitdiffstats
path: root/src/msg/Message.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/msg/Message.h
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/msg/Message.h577
1 files changed, 577 insertions, 0 deletions
diff --git a/src/msg/Message.h b/src/msg/Message.h
new file mode 100644
index 00000000..42405ed3
--- /dev/null
+++ b/src/msg/Message.h
@@ -0,0 +1,577 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MESSAGE_H
+#define CEPH_MESSAGE_H
+
+#include <stdlib.h>
+#include <ostream>
+#include <string_view>
+
+#include <boost/intrusive/list.hpp>
+
+#include "include/Context.h"
+#include "common/RefCountedObj.h"
+#include "common/ThrottleInterface.h"
+#include "common/config.h"
+#include "common/debug.h"
+#include "common/zipkin_trace.h"
+#include "include/ceph_assert.h" // Because intrusive_ptr clobbers our assert...
+#include "include/buffer.h"
+#include "include/types.h"
+#include "msg/Connection.h"
+#include "msg/MessageRef.h"
+#include "msg_types.h"
+
+// monitor internal
+#define MSG_MON_SCRUB 64
+#define MSG_MON_ELECTION 65
+#define MSG_MON_PAXOS 66
+#define MSG_MON_PROBE 67
+#define MSG_MON_JOIN 68
+#define MSG_MON_SYNC 69
+
+/* monitor <-> mon admin tool */
+#define MSG_MON_COMMAND 50
+#define MSG_MON_COMMAND_ACK 51
+#define MSG_LOG 52
+#define MSG_LOGACK 53
+
+#define MSG_GETPOOLSTATS 58
+#define MSG_GETPOOLSTATSREPLY 59
+
+#define MSG_MON_GLOBAL_ID 60
+
+#define MSG_ROUTE 47
+#define MSG_FORWARD 46
+
+#define MSG_PAXOS 40
+
+#define MSG_CONFIG 62
+#define MSG_GET_CONFIG 63
+
+
+// osd internal
+#define MSG_OSD_PING 70
+#define MSG_OSD_BOOT 71
+#define MSG_OSD_FAILURE 72
+#define MSG_OSD_ALIVE 73
+#define MSG_OSD_MARK_ME_DOWN 74
+#define MSG_OSD_FULL 75
+
+// removed right after luminous
+//#define MSG_OSD_SUBOP 76
+//#define MSG_OSD_SUBOPREPLY 77
+
+#define MSG_OSD_PGTEMP 78
+
+#define MSG_OSD_BEACON 79
+
+#define MSG_OSD_PG_NOTIFY 80
+#define MSG_OSD_PG_QUERY 81
+#define MSG_OSD_PG_LOG 83
+#define MSG_OSD_PG_REMOVE 84
+#define MSG_OSD_PG_INFO 85
+#define MSG_OSD_PG_TRIM 86
+
+#define MSG_PGSTATS 87
+#define MSG_PGSTATSACK 88
+
+#define MSG_OSD_PG_CREATE 89
+#define MSG_REMOVE_SNAPS 90
+
+#define MSG_OSD_SCRUB 91
+#define MSG_OSD_SCRUB_RESERVE 92 // previous PG_MISSING
+#define MSG_OSD_REP_SCRUB 93
+
+#define MSG_OSD_PG_SCAN 94
+#define MSG_OSD_PG_BACKFILL 95
+#define MSG_OSD_PG_BACKFILL_REMOVE 96
+
+#define MSG_COMMAND 97
+#define MSG_COMMAND_REPLY 98
+
+#define MSG_OSD_BACKFILL_RESERVE 99
+#define MSG_OSD_RECOVERY_RESERVE 150
+#define MSG_OSD_FORCE_RECOVERY 151
+
+#define MSG_OSD_PG_PUSH 105
+#define MSG_OSD_PG_PULL 106
+#define MSG_OSD_PG_PUSH_REPLY 107
+
+#define MSG_OSD_EC_WRITE 108
+#define MSG_OSD_EC_WRITE_REPLY 109
+#define MSG_OSD_EC_READ 110
+#define MSG_OSD_EC_READ_REPLY 111
+
+#define MSG_OSD_REPOP 112
+#define MSG_OSD_REPOPREPLY 113
+#define MSG_OSD_PG_UPDATE_LOG_MISSING 114
+#define MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY 115
+
+#define MSG_OSD_PG_CREATED 116
+#define MSG_OSD_REP_SCRUBMAP 117
+#define MSG_OSD_PG_RECOVERY_DELETE 118
+#define MSG_OSD_PG_RECOVERY_DELETE_REPLY 119
+#define MSG_OSD_PG_CREATE2 120
+#define MSG_OSD_SCRUB2 121
+
+#define MSG_OSD_PG_READY_TO_MERGE 122
+
+// *** MDS ***
+
+#define MSG_MDS_BEACON 100 // to monitor
+#define MSG_MDS_SLAVE_REQUEST 101
+#define MSG_MDS_TABLE_REQUEST 102
+
+ // 150 already in use (MSG_OSD_RECOVERY_RESERVE)
+
+#define MSG_MDS_RESOLVE 0x200
+#define MSG_MDS_RESOLVEACK 0x201
+#define MSG_MDS_CACHEREJOIN 0x202
+#define MSG_MDS_DISCOVER 0x203
+#define MSG_MDS_DISCOVERREPLY 0x204
+#define MSG_MDS_INODEUPDATE 0x205
+#define MSG_MDS_DIRUPDATE 0x206
+#define MSG_MDS_CACHEEXPIRE 0x207
+#define MSG_MDS_DENTRYUNLINK 0x208
+#define MSG_MDS_FRAGMENTNOTIFY 0x209
+#define MSG_MDS_OFFLOAD_TARGETS 0x20a
+#define MSG_MDS_DENTRYLINK 0x20c
+#define MSG_MDS_FINDINO 0x20d
+#define MSG_MDS_FINDINOREPLY 0x20e
+#define MSG_MDS_OPENINO 0x20f
+#define MSG_MDS_OPENINOREPLY 0x210
+#define MSG_MDS_SNAPUPDATE 0x211
+#define MSG_MDS_FRAGMENTNOTIFYACK 0x212
+#define MSG_MDS_LOCK 0x300
+#define MSG_MDS_INODEFILECAPS 0x301
+
+#define MSG_MDS_EXPORTDIRDISCOVER 0x449
+#define MSG_MDS_EXPORTDIRDISCOVERACK 0x450
+#define MSG_MDS_EXPORTDIRCANCEL 0x451
+#define MSG_MDS_EXPORTDIRPREP 0x452
+#define MSG_MDS_EXPORTDIRPREPACK 0x453
+#define MSG_MDS_EXPORTDIRWARNING 0x454
+#define MSG_MDS_EXPORTDIRWARNINGACK 0x455
+#define MSG_MDS_EXPORTDIR 0x456
+#define MSG_MDS_EXPORTDIRACK 0x457
+#define MSG_MDS_EXPORTDIRNOTIFY 0x458
+#define MSG_MDS_EXPORTDIRNOTIFYACK 0x459
+#define MSG_MDS_EXPORTDIRFINISH 0x460
+
+#define MSG_MDS_EXPORTCAPS 0x470
+#define MSG_MDS_EXPORTCAPSACK 0x471
+#define MSG_MDS_GATHERCAPS 0x472
+
+#define MSG_MDS_HEARTBEAT 0x500 // for mds load balancer
+
+// *** generic ***
+#define MSG_TIMECHECK 0x600
+#define MSG_MON_HEALTH 0x601
+
+// *** Message::encode() crcflags bits ***
+#define MSG_CRC_DATA (1 << 0)
+#define MSG_CRC_HEADER (1 << 1)
+#define MSG_CRC_ALL (MSG_CRC_DATA | MSG_CRC_HEADER)
+
+// Xio Testing
+#define MSG_DATA_PING 0x602
+
+// Xio intends to define messages 0x603..0x606
+
+// Special
+#define MSG_NOP 0x607
+
+#define MSG_MON_HEALTH_CHECKS 0x608
+#define MSG_TIMECHECK2 0x609
+
+// *** ceph-mgr <-> OSD/MDS daemons ***
+#define MSG_MGR_OPEN 0x700
+#define MSG_MGR_CONFIGURE 0x701
+#define MSG_MGR_REPORT 0x702
+
+// *** ceph-mgr <-> ceph-mon ***
+#define MSG_MGR_BEACON 0x703
+
+// *** ceph-mon(MgrMonitor) -> OSD/MDS daemons ***
+#define MSG_MGR_MAP 0x704
+
+// *** ceph-mon(MgrMonitor) -> ceph-mgr
+#define MSG_MGR_DIGEST 0x705
+// *** cephmgr -> ceph-mon
+#define MSG_MON_MGR_REPORT 0x706
+#define MSG_SERVICE_MAP 0x707
+
+#define MSG_MGR_CLOSE 0x708
+
+// ======================================================
+
+// abstract Message class
+
+namespace bi = boost::intrusive;
+
+// XioMessenger conditional trace flags
+#define MSG_MAGIC_XIO 0x0002
+#define MSG_MAGIC_TRACE_XCON 0x0004
+#define MSG_MAGIC_TRACE_DTOR 0x0008
+#define MSG_MAGIC_TRACE_HDR 0x0010
+#define MSG_MAGIC_TRACE_XIO 0x0020
+#define MSG_MAGIC_TRACE_XMSGR 0x0040
+#define MSG_MAGIC_TRACE_CTR 0x0080
+
+// XioMessenger diagnostic "ping pong" flag (resend msg when send completes)
+#define MSG_MAGIC_REDUPE 0x0100
+
+class Message : public RefCountedObject {
+protected:
+ ceph_msg_header header; // headerelope
+ ceph_msg_footer footer;
+ bufferlist payload; // "front" unaligned blob
+ bufferlist middle; // "middle" unaligned blob
+ bufferlist data; // data payload (page-alignment will be preserved where possible)
+
+ /* recv_stamp is set when the Messenger starts reading the
+ * Message off the wire */
+ utime_t recv_stamp;
+ /* dispatch_stamp is set when the Messenger starts calling dispatch() on
+ * its endpoints */
+ utime_t dispatch_stamp;
+ /* throttle_stamp is the point at which we got throttle */
+ utime_t throttle_stamp;
+ /* time at which message was fully read */
+ utime_t recv_complete_stamp;
+
+ ConnectionRef connection;
+
+ uint32_t magic = 0;
+
+ bi::list_member_hook<> dispatch_q;
+
+public:
+ using ref = MessageRef;
+ using const_ref = MessageConstRef;
+
+ // zipkin tracing
+ ZTracer::Trace trace;
+ void encode_trace(bufferlist &bl, uint64_t features) const;
+ void decode_trace(bufferlist::const_iterator &p, bool create = false);
+
+ class CompletionHook : public Context {
+ protected:
+ Message *m;
+ friend class Message;
+ public:
+ explicit CompletionHook(Message *_m) : m(_m) {}
+ virtual void set_message(Message *_m) { m = _m; }
+ };
+
+ typedef bi::list< Message,
+ bi::member_hook< Message,
+ bi::list_member_hook<>,
+ &Message::dispatch_q > > Queue;
+
+protected:
+ CompletionHook* completion_hook = nullptr; // owned by Messenger
+
+ // release our size in bytes back to this throttler when our payload
+ // is adjusted or when we are destroyed.
+ ThrottleInterface *byte_throttler = nullptr;
+
+ // release a count back to this throttler when we are destroyed
+ ThrottleInterface *msg_throttler = nullptr;
+
+ // keep track of how big this message was when we reserved space in
+ // the msgr dispatch_throttler, so that we can properly release it
+ // later. this is necessary because messages can enter the dispatch
+ // queue locally (not via read_message()), and those are not
+ // currently throttled.
+ uint64_t dispatch_throttle_size = 0;
+
+ friend class Messenger;
+
+public:
+ Message() {
+ memset(&header, 0, sizeof(header));
+ memset(&footer, 0, sizeof(footer));
+ }
+ Message(int t, int version=1, int compat_version=0) {
+ memset(&header, 0, sizeof(header));
+ header.type = t;
+ header.version = version;
+ header.compat_version = compat_version;
+ header.priority = 0; // undef
+ header.data_off = 0;
+ memset(&footer, 0, sizeof(footer));
+ }
+
+ Message *get() {
+ return static_cast<Message *>(RefCountedObject::get());
+ }
+
+protected:
+ ~Message() override {
+ if (byte_throttler)
+ byte_throttler->put(payload.length() + middle.length() + data.length());
+ release_message_throttle();
+ trace.event("message destructed");
+ /* call completion hooks (if any) */
+ if (completion_hook)
+ completion_hook->complete(0);
+ }
+public:
+ const ConnectionRef& get_connection() const { return connection; }
+ void set_connection(const ConnectionRef& c) {
+ connection = c;
+ }
+ CompletionHook* get_completion_hook() { return completion_hook; }
+ void set_completion_hook(CompletionHook *hook) { completion_hook = hook; }
+ void set_byte_throttler(ThrottleInterface *t) {
+ byte_throttler = t;
+ }
+ void set_message_throttler(ThrottleInterface *t) {
+ msg_throttler = t;
+ }
+
+ void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; }
+ uint64_t get_dispatch_throttle_size() const { return dispatch_throttle_size; }
+
+ const ceph_msg_header &get_header() const { return header; }
+ ceph_msg_header &get_header() { return header; }
+ void set_header(const ceph_msg_header &e) { header = e; }
+ void set_footer(const ceph_msg_footer &e) { footer = e; }
+ const ceph_msg_footer &get_footer() const { return footer; }
+ ceph_msg_footer &get_footer() { return footer; }
+ void set_src(const entity_name_t& src) { header.src = src; }
+
+ uint32_t get_magic() const { return magic; }
+ void set_magic(int _magic) { magic = _magic; }
+
+ /*
+ * If you use get_[data, middle, payload] you shouldn't
+ * use it to change those bufferlists unless you KNOW
+ * there is no throttle being used. The other
+ * functions are throttling-aware as appropriate.
+ */
+
+ void clear_payload() {
+ if (byte_throttler) {
+ byte_throttler->put(payload.length() + middle.length());
+ }
+ payload.clear();
+ middle.clear();
+ }
+
+ virtual void clear_buffers() {}
+ void clear_data() {
+ if (byte_throttler)
+ byte_throttler->put(data.length());
+ data.clear();
+ clear_buffers(); // let subclass drop buffers as well
+ }
+ void release_message_throttle() {
+ if (msg_throttler)
+ msg_throttler->put();
+ msg_throttler = nullptr;
+ }
+
+ bool empty_payload() const { return payload.length() == 0; }
+ bufferlist& get_payload() { return payload; }
+ const bufferlist& get_payload() const { return payload; }
+ void set_payload(bufferlist& bl) {
+ if (byte_throttler)
+ byte_throttler->put(payload.length());
+ payload.claim(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE);
+ if (byte_throttler)
+ byte_throttler->take(payload.length());
+ }
+
+ void set_middle(bufferlist& bl) {
+ if (byte_throttler)
+ byte_throttler->put(middle.length());
+ middle.claim(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE);
+ if (byte_throttler)
+ byte_throttler->take(middle.length());
+ }
+ bufferlist& get_middle() { return middle; }
+
+ void set_data(const bufferlist &bl) {
+ if (byte_throttler)
+ byte_throttler->put(data.length());
+ data.share(bl);
+ if (byte_throttler)
+ byte_throttler->take(data.length());
+ }
+
+ const bufferlist& get_data() const { return data; }
+ bufferlist& get_data() { return data; }
+ void claim_data(bufferlist& bl,
+ unsigned int flags = buffer::list::CLAIM_DEFAULT) {
+ if (byte_throttler)
+ byte_throttler->put(data.length());
+ bl.claim(data, flags);
+ }
+ off_t get_data_len() const { return data.length(); }
+
+ void set_recv_stamp(utime_t t) { recv_stamp = t; }
+ const utime_t& get_recv_stamp() const { return recv_stamp; }
+ void set_dispatch_stamp(utime_t t) { dispatch_stamp = t; }
+ const utime_t& get_dispatch_stamp() const { return dispatch_stamp; }
+ void set_throttle_stamp(utime_t t) { throttle_stamp = t; }
+ const utime_t& get_throttle_stamp() const { return throttle_stamp; }
+ void set_recv_complete_stamp(utime_t t) { recv_complete_stamp = t; }
+ const utime_t& get_recv_complete_stamp() const { return recv_complete_stamp; }
+
+ void calc_header_crc() {
+ header.crc = ceph_crc32c(0, (unsigned char*)&header,
+ sizeof(header) - sizeof(header.crc));
+ }
+ void calc_front_crc() {
+ footer.front_crc = payload.crc32c(0);
+ footer.middle_crc = middle.crc32c(0);
+ }
+ void calc_data_crc() {
+ footer.data_crc = data.crc32c(0);
+ }
+
+ virtual int get_cost() const {
+ return data.length();
+ }
+
+ // type
+ int get_type() const { return header.type; }
+ void set_type(int t) { header.type = t; }
+
+ uint64_t get_tid() const { return header.tid; }
+ void set_tid(uint64_t t) { header.tid = t; }
+
+ uint64_t get_seq() const { return header.seq; }
+ void set_seq(uint64_t s) { header.seq = s; }
+
+ unsigned get_priority() const { return header.priority; }
+ void set_priority(__s16 p) { header.priority = p; }
+
+ // source/dest
+ entity_inst_t get_source_inst() const {
+ return entity_inst_t(get_source(), get_source_addr());
+ }
+ entity_name_t get_source() const {
+ return entity_name_t(header.src);
+ }
+ entity_addr_t get_source_addr() const {
+ if (connection)
+ return connection->get_peer_addr();
+ return entity_addr_t();
+ }
+ entity_addrvec_t get_source_addrs() const {
+ if (connection)
+ return connection->get_peer_addrs();
+ return entity_addrvec_t();
+ }
+
+ // forwarded?
+ entity_inst_t get_orig_source_inst() const {
+ return get_source_inst();
+ }
+ entity_name_t get_orig_source() const {
+ return get_source();
+ }
+ entity_addr_t get_orig_source_addr() const {
+ return get_source_addr();
+ }
+ entity_addrvec_t get_orig_source_addrs() const {
+ return get_source_addrs();
+ }
+
+ // virtual bits
+ virtual void decode_payload() = 0;
+ virtual void encode_payload(uint64_t features) = 0;
+ virtual std::string_view get_type_name() const = 0;
+ virtual void print(ostream& out) const {
+ out << get_type_name() << " magic: " << magic;
+ }
+
+ virtual void dump(Formatter *f) const;
+
+ void encode(uint64_t features, int crcflags);
+};
+
+extern Message *decode_message(CephContext *cct, int crcflags,
+ ceph_msg_header &header,
+ ceph_msg_footer& footer, bufferlist& front,
+ bufferlist& middle, bufferlist& data,
+ Connection* conn);
+inline ostream& operator<<(ostream& out, const Message& m) {
+ m.print(out);
+ if (m.get_header().version)
+ out << " v" << m.get_header().version;
+ return out;
+}
+
+extern void encode_message(Message *m, uint64_t features, bufferlist& bl);
+extern Message *decode_message(CephContext *cct, int crcflags,
+ bufferlist::const_iterator& bl);
+
+template <class MessageType>
+class MessageFactory {
+public:
+template<typename... Args>
+ static typename MessageType::ref build(Args&&... args) {
+ return typename MessageType::ref(new MessageType(std::forward<Args>(args)...), false);
+ }
+};
+
+template<class T, class M = Message>
+class MessageSubType : public M {
+public:
+ typedef boost::intrusive_ptr<T> ref;
+ typedef boost::intrusive_ptr<T const> const_ref;
+
+ static auto msgref_cast(typename M::ref const& m) {
+ return boost::static_pointer_cast<typename T::const_ref::element_type, typename std::remove_reference<decltype(m)>::type::element_type>(m);
+ }
+ static auto msgref_cast(typename M::const_ref const& m) {
+ return boost::static_pointer_cast<typename T::ref::element_type, typename std::remove_reference<decltype(m)>::type::element_type>(m);
+ }
+
+protected:
+template<typename... Args>
+ MessageSubType(Args&&... args) : M(std::forward<Args>(args)...) {}
+ virtual ~MessageSubType() override {}
+};
+
+
+template<class T, class M = Message>
+class MessageInstance : public MessageSubType<T, M> {
+public:
+ using factory = MessageFactory<T>;
+
+ template<typename... Args>
+ static auto create(Args&&... args) {
+ return MessageFactory<T>::build(std::forward<Args>(args)...);
+ }
+ static auto msgref_cast(typename Message::ref const& m) {
+ return boost::static_pointer_cast<typename T::ref::element_type, typename std::remove_reference<decltype(m)>::type::element_type>(m);
+ }
+ static auto msgref_cast(typename Message::const_ref const& m) {
+ return boost::static_pointer_cast<typename T::const_ref::element_type, typename std::remove_reference<decltype(m)>::type::element_type>(m);
+ }
+
+protected:
+template<typename... Args>
+ MessageInstance(Args&&... args) : MessageSubType<T,M>(std::forward<Args>(args)...) {}
+ virtual ~MessageInstance() override {}
+};
+
+#endif