summaryrefslogtreecommitdiffstats
path: root/src/msg/Message.h
blob: b211474c3affbf98af496bef3e9a2e0b14082a97 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software 
 * Foundation.  See file COPYING.
 * 
 */

#ifndef CEPH_MESSAGE_H
#define CEPH_MESSAGE_H

#include <cstdlib>
#include <ostream>
#include <string_view>

#include <boost/intrusive/list.hpp>

#include "include/Context.h"
#include "common/RefCountedObj.h"
#include "common/ThrottleInterface.h"
#include "common/config.h"
#include "common/ref.h"
#include "common/debug.h"
#include "common/zipkin_trace.h"
#include "include/ceph_assert.h" // Because intrusive_ptr clobbers our assert...
#include "include/buffer.h"
#include "include/types.h"
#include "msg/Connection.h"
#include "msg/MessageRef.h"
#include "msg_types.h"

#ifdef WITH_SEASTAR
#  include "crimson/net/SocketConnection.h"
#endif // WITH_SEASTAR

// ------------------------------------------------------
// Message type IDs.
//
// Each MSG_* id below is what a Message subclass passes as `t` to
// Message::Message(int t, ...); it is stored in ceph_msg_header::type and
// read back with Message::get_type().  NOTE(review): presumably peers use this
// value to choose the subclass to decode into, so ids must stay unique and
// must not be renumbered -- confirm against decode_message() before editing.
// ------------------------------------------------------

// monitor internal
#define MSG_MON_SCRUB              64
#define MSG_MON_ELECTION           65
#define MSG_MON_PAXOS              66
#define MSG_MON_PROBE              67
#define MSG_MON_JOIN               68
#define MSG_MON_SYNC		   69
#define MSG_MON_PING               140

/* monitor <-> mon admin tool */
#define MSG_MON_COMMAND            50
#define MSG_MON_COMMAND_ACK        51
#define MSG_LOG                    52
#define MSG_LOGACK                 53

#define MSG_GETPOOLSTATS           58
#define MSG_GETPOOLSTATSREPLY      59

#define MSG_MON_GLOBAL_ID          60

#define MSG_ROUTE                  47
#define MSG_FORWARD                46

#define MSG_PAXOS                  40

#define MSG_CONFIG           62
#define MSG_GET_CONFIG       63

#define MSG_KV_DATA          54

// 76/77 previously belonged to MSG_OSD_SUBOP / MSG_OSD_SUBOPREPLY (now
// commented out in the OSD section) and are reused here.
#define MSG_MON_GET_PURGED_SNAPS 76
#define MSG_MON_GET_PURGED_SNAPS_REPLY 77

// osd internal
#define MSG_OSD_PING         70
#define MSG_OSD_BOOT         71
#define MSG_OSD_FAILURE      72
#define MSG_OSD_ALIVE        73
#define MSG_OSD_MARK_ME_DOWN 74
#define MSG_OSD_FULL         75
#define MSG_OSD_MARK_ME_DEAD 123

// removed right after luminous; ids 76/77 have since been reused by
// MSG_MON_GET_PURGED_SNAPS / MSG_MON_GET_PURGED_SNAPS_REPLY.
//#define MSG_OSD_SUBOP        76
//#define MSG_OSD_SUBOPREPLY   77

#define MSG_OSD_PGTEMP       78

#define MSG_OSD_BEACON       79

// The *2 variants were allocated separate ids (130-132) rather than reusing
// the originals.  Id 82 is not assigned in this header.
#define MSG_OSD_PG_NOTIFY      80
#define MSG_OSD_PG_NOTIFY2    130
#define MSG_OSD_PG_QUERY       81
#define MSG_OSD_PG_QUERY2     131
#define MSG_OSD_PG_LOG         83
#define MSG_OSD_PG_REMOVE      84
#define MSG_OSD_PG_INFO        85
#define MSG_OSD_PG_INFO2      132
#define MSG_OSD_PG_TRIM        86

#define MSG_PGSTATS            87
#define MSG_PGSTATSACK         88

#define MSG_OSD_PG_CREATE      89
#define MSG_REMOVE_SNAPS       90

#define MSG_OSD_SCRUB          91
#define MSG_OSD_SCRUB_RESERVE  92  // id 92 was previously PG_MISSING
#define MSG_OSD_REP_SCRUB      93

#define MSG_OSD_PG_SCAN        94
#define MSG_OSD_PG_BACKFILL    95
#define MSG_OSD_PG_BACKFILL_REMOVE 96

#define MSG_COMMAND            97
#define MSG_COMMAND_REPLY      98

#define MSG_OSD_BACKFILL_RESERVE 99
#define MSG_OSD_RECOVERY_RESERVE 150
#define MSG_OSD_FORCE_RECOVERY 151

#define MSG_OSD_PG_PUSH        105
#define MSG_OSD_PG_PULL        106
#define MSG_OSD_PG_PUSH_REPLY  107

#define MSG_OSD_EC_WRITE       108
#define MSG_OSD_EC_WRITE_REPLY 109
#define MSG_OSD_EC_READ        110
#define MSG_OSD_EC_READ_REPLY  111

#define MSG_OSD_REPOP         112
#define MSG_OSD_REPOPREPLY    113
#define MSG_OSD_PG_UPDATE_LOG_MISSING  114
#define MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY  115

#define MSG_OSD_PG_CREATED      116
#define MSG_OSD_REP_SCRUBMAP    117
#define MSG_OSD_PG_RECOVERY_DELETE 118
#define MSG_OSD_PG_RECOVERY_DELETE_REPLY 119
#define MSG_OSD_PG_CREATE2      120
#define MSG_OSD_SCRUB2          121

#define MSG_OSD_PG_READY_TO_MERGE 122

#define MSG_OSD_PG_LEASE        133
#define MSG_OSD_PG_LEASE_ACK    134

// *** MDS ***

#define MSG_MDS_BEACON             100  // to monitor
#define MSG_MDS_PEER_REQUEST       101
#define MSG_MDS_TABLE_REQUEST      102
#define MSG_MDS_SCRUB              135

                                // NOTE: id 150 is already taken by MSG_OSD_RECOVERY_RESERVE;
                                // do not allocate it for MDS messages.

// 0x2xx ids are for the mds cache (mdcache).  0x20b is not assigned in this
// header.
#define MSG_MDS_RESOLVE            0x200 // 0x2xx are for mdcache of mds
#define MSG_MDS_RESOLVEACK         0x201
#define MSG_MDS_CACHEREJOIN        0x202
#define MSG_MDS_DISCOVER           0x203
#define MSG_MDS_DISCOVERREPLY      0x204
#define MSG_MDS_INODEUPDATE        0x205
#define MSG_MDS_DIRUPDATE          0x206
#define MSG_MDS_CACHEEXPIRE        0x207
#define MSG_MDS_DENTRYUNLINK       0x208
#define MSG_MDS_FRAGMENTNOTIFY     0x209
#define MSG_MDS_OFFLOAD_TARGETS    0x20a
#define MSG_MDS_DENTRYLINK         0x20c
#define MSG_MDS_FINDINO            0x20d
#define MSG_MDS_FINDINOREPLY       0x20e
#define MSG_MDS_OPENINO            0x20f
#define MSG_MDS_OPENINOREPLY       0x210
#define MSG_MDS_SNAPUPDATE         0x211
#define MSG_MDS_FRAGMENTNOTIFYACK  0x212
#define MSG_MDS_DENTRYUNLINK_ACK   0x213
#define MSG_MDS_LOCK               0x300 // 0x3xx are for locker of mds
#define MSG_MDS_INODEFILECAPS      0x301

// 0x4xx ids are for the mds migrator.  NOTE: these are hex values, so the
// step from 0x449 to 0x450 leaves 0x44a-0x44f unassigned.
#define MSG_MDS_EXPORTDIRDISCOVER     0x449 // 0x4xx are for migrator of mds
#define MSG_MDS_EXPORTDIRDISCOVERACK  0x450
#define MSG_MDS_EXPORTDIRCANCEL       0x451
#define MSG_MDS_EXPORTDIRPREP         0x452
#define MSG_MDS_EXPORTDIRPREPACK      0x453
#define MSG_MDS_EXPORTDIRWARNING      0x454
#define MSG_MDS_EXPORTDIRWARNINGACK   0x455
#define MSG_MDS_EXPORTDIR             0x456
#define MSG_MDS_EXPORTDIRACK          0x457
#define MSG_MDS_EXPORTDIRNOTIFY       0x458
#define MSG_MDS_EXPORTDIRNOTIFYACK    0x459
#define MSG_MDS_EXPORTDIRFINISH       0x460

#define MSG_MDS_EXPORTCAPS            0x470
#define MSG_MDS_EXPORTCAPSACK         0x471
#define MSG_MDS_GATHERCAPS            0x472

#define MSG_MDS_HEARTBEAT          0x500  // for mds load balancer
#define MSG_MDS_METRICS            0x501  // for mds metric aggregator
#define MSG_MDS_PING               0x502  // for mds pinger
#define MSG_MDS_SCRUB_STATS        0x503  // for mds scrub stack

// *** generic ***
#define MSG_TIMECHECK             0x600
#define MSG_MON_HEALTH            0x601

// *** Message::encode() crcflags bits ***
// These are NOT message type ids: they are bit flags for the `crcflags`
// argument of Message::encode() and decode_message(), selecting which crcs
// (data segment and/or header) are in effect.
#define MSG_CRC_DATA           (1 << 0)
#define MSG_CRC_HEADER         (1 << 1)
#define MSG_CRC_ALL            (MSG_CRC_DATA | MSG_CRC_HEADER)


// Special
#define MSG_NOP                   0x607

#define MSG_MON_HEALTH_CHECKS     0x608
#define MSG_TIMECHECK2            0x609

// *** ceph-mgr <-> OSD/MDS daemons ***
#define MSG_MGR_OPEN              0x700
#define MSG_MGR_CONFIGURE         0x701
#define MSG_MGR_REPORT            0x702

// *** ceph-mgr <-> ceph-mon ***
#define MSG_MGR_BEACON            0x703

// *** ceph-mon(MgrMonitor) -> OSD/MDS daemons ***
#define MSG_MGR_MAP               0x704

// *** ceph-mon(MgrMonitor) -> ceph-mgr
#define MSG_MGR_DIGEST               0x705
// *** ceph-mgr -> ceph-mon
#define MSG_MON_MGR_REPORT        0x706
#define MSG_SERVICE_MAP           0x707

#define MSG_MGR_CLOSE             0x708
#define MSG_MGR_COMMAND           0x709
#define MSG_MGR_COMMAND_REPLY     0x70a

// *** ceph-mgr <-> MON daemons ***
#define MSG_MGR_UPDATE     0x70b

// ======================================================

// abstract Message class

/// Abstract base class of every message sent or received through a Messenger.
///
/// A Message carries a ceph_msg_header, a ceph_msg_footer and three payload
/// segments: the "front" (payload), "middle" and "data" buffer lists.
/// Subclasses implement encode_payload(), decode_payload() and get_type_name().
///
/// Lifetime: Message is a RefCountedObject and its destructor is protected,
/// so outside code cannot `delete` one directly; references are released
/// through the reference count (e.g. the ceph::ref_t returned by
/// ceph::make_message()).
///
/// Throttling: when byte_throttler is set, set_payload()/set_middle()/
/// set_data() put() the old segment length and take() the new one;
/// clear_payload()/clear_data()/claim_data() put() the length they drop; and
/// the destructor put()s whatever is still held.  msg_throttler receives a
/// single put() via release_message_throttle(), which then clears the pointer.
class Message : public RefCountedObject {
public:
#ifdef WITH_SEASTAR
  using ConnectionRef = crimson::net::ConnectionRef;
#else
  using ConnectionRef = ::ConnectionRef;
#endif // WITH_SEASTAR

protected:
  // NOTE: member declaration order defines object layout and initialization
  // order; do not reorder these members.
  ceph_msg_header  header;      // message header (type, version, compat_version, seq, tid, priority, src, crc, ...)
  ceph_msg_footer  footer;      // trailer; holds front_crc / middle_crc / data_crc among others
  ceph::buffer::list       payload;  // "front" unaligned blob
  ceph::buffer::list       middle;   // "middle" unaligned blob
  ceph::buffer::list       data;     // data payload (page-alignment will be preserved where possible)

  /* recv_stamp is set when the Messenger starts reading the
   * Message off the wire */
  utime_t recv_stamp;
  /* dispatch_stamp is set when the Messenger starts calling dispatch() on
   * its endpoints */
  utime_t dispatch_stamp;
  /* throttle_stamp is the time at which this message acquired the throttle */
  utime_t throttle_stamp;
  /* time at which message was fully read */
  utime_t recv_complete_stamp;

  // Connection this message is bound to; may be null (get_source_addr() and
  // get_source_addrs() check for that).
  ConnectionRef connection;

  // Opaque value stored by set_magic() and printed by print().
  uint32_t magic = 0;

  // Intrusive hook used by Message::Queue below.  It must be declared before
  // the Queue typedef, which names &Message::dispatch_q.
  boost::intrusive::list_member_hook<> dispatch_q;

public:
  // zipkin tracing
  ZTracer::Trace trace;
  void encode_trace(ceph::buffer::list &bl, uint64_t features) const;
  void decode_trace(ceph::buffer::list::const_iterator &p, bool create = false);

  // Context installed with set_completion_hook(); ~Message() calls
  // complete(0) on it.  `m` is the message the hook is associated with.
  class CompletionHook : public Context {
  protected:
    Message *m;
    friend class Message;
  public:
    explicit CompletionHook(Message *_m) : m(_m) {}
    virtual void set_message(Message *_m) { m = _m; }
  };

  // Intrusive list of Messages linked through their dispatch_q hook: no
  // per-node allocation, and a Message can be on at most one Queue at a time.
  typedef boost::intrusive::list<Message,
				 boost::intrusive::member_hook<
				   Message,
				   boost::intrusive::list_member_hook<>,
				   &Message::dispatch_q>> Queue;

  // NOTE(review): not referenced inside this header; presumably stamped when
  // the message is placed on a dispatch queue -- confirm at the call sites.
  ceph::mono_time queue_start;
protected:
  CompletionHook* completion_hook = nullptr; // owned by Messenger; completed from ~Message()

  // release our size in bytes back to this throttler when our payload
  // is adjusted or when we are destroyed.
  ThrottleInterface *byte_throttler = nullptr;

  // release a count back to this throttler when we are destroyed
  ThrottleInterface *msg_throttler = nullptr;

  // keep track of how big this message was when we reserved space in
  // the msgr dispatch_throttler, so that we can properly release it
  // later.  this is necessary because messages can enter the dispatch
  // queue locally (not via read_message()), and those are not
  // currently throttled.
  uint64_t dispatch_throttle_size = 0;

  friend class Messenger;

public:
  // Zero-filled header and footer (so header.type == 0).
  Message() {
    memset(&header, 0, sizeof(header));
    memset(&footer, 0, sizeof(footer));
  }
  // Zero-filled header/footer, then record the message type `t` (one of the
  // MSG_* ids) and the encoding version / compat_version in the header.
  Message(int t, int version=1, int compat_version=0) {
    memset(&header, 0, sizeof(header));
    header.type = t;
    header.version = version;
    header.compat_version = compat_version;
    memset(&footer, 0, sizeof(footer));
  }

  // Forwards to RefCountedObject::get() and returns the result as Message*.
  Message *get() {
    return static_cast<Message *>(RefCountedObject::get());
  }

protected:
  // Teardown order: return the remaining byte reservation, return the
  // message-count reservation, record a trace event, then fire the
  // completion hook (if any).
  ~Message() override {
    if (byte_throttler)
      byte_throttler->put(payload.length() + middle.length() + data.length());
    release_message_throttle();
    trace.event("message destructed");
    /* call completion hooks (if any) */
    if (completion_hook)
      completion_hook->complete(0);
  }
public:
  const ConnectionRef& get_connection() const { return connection; }
  void set_connection(ConnectionRef c) {
    connection = std::move(c);
  }
  CompletionHook* get_completion_hook() { return completion_hook; }
  void set_completion_hook(CompletionHook *hook) { completion_hook = hook; }
  // These setters only record the throttler; they do not take() anything.
  void set_byte_throttler(ThrottleInterface *t) {
    byte_throttler = t;
  }
  void set_message_throttler(ThrottleInterface *t) {
    msg_throttler = t;
  }

  void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; }
  uint64_t get_dispatch_throttle_size() const { return dispatch_throttle_size; }

  const ceph_msg_header &get_header() const { return header; }
  ceph_msg_header &get_header() { return header; }
  void set_header(const ceph_msg_header &e) { header = e; }
  void set_footer(const ceph_msg_footer &e) { footer = e; }
  const ceph_msg_footer &get_footer() const { return footer; }
  ceph_msg_footer &get_footer() { return footer; }
  void set_src(const entity_name_t& src) { header.src = src; }

  uint32_t get_magic() const { return magic; }
  // NOTE: takes int but stores into the uint32_t `magic` member.
  void set_magic(int _magic) { magic = _magic; }

  /*
   * If you use get_[data, middle, payload] you shouldn't
   * use it to change those ceph::buffer::lists unless you KNOW
   * there is no throttle being used. The other
   * functions are throttling-aware as appropriate.
   */

  // Drop the front and middle segments, returning their bytes to
  // byte_throttler.
  void clear_payload() {
    if (byte_throttler) {
      byte_throttler->put(payload.length() + middle.length());
    }
    payload.clear();
    middle.clear();
  }

  // Hook for subclasses to drop buffers they hold themselves; called by
  // clear_data().  The default does nothing.
  virtual void clear_buffers() {}
  // Drop the data segment (returning its bytes to byte_throttler), then let
  // the subclass drop its own buffers.
  void clear_data() {
    if (byte_throttler)
      byte_throttler->put(data.length());
    data.clear();
    clear_buffers(); // let subclass drop buffers as well
  }
  // Return this message's slot to msg_throttler.  Calling it again is
  // harmless because the pointer is cleared after the first put().
  void release_message_throttle() {
    if (msg_throttler)
      msg_throttler->put();
    msg_throttler = nullptr;
  }

  bool empty_payload() const { return payload.length() == 0; }
  ceph::buffer::list& get_payload() { return payload; }
  const ceph::buffer::list& get_payload() const { return payload; }
  // Replace the front segment with bl's contents (bl is moved from),
  // adjusting byte_throttler from the old length to the new one.
  void set_payload(ceph::buffer::list& bl) {
    if (byte_throttler)
      byte_throttler->put(payload.length());
    payload = std::move(bl);
    if (byte_throttler)
      byte_throttler->take(payload.length());
  }

  // Replace the middle segment with bl's contents (bl is moved from),
  // adjusting byte_throttler from the old length to the new one.
  void set_middle(ceph::buffer::list& bl) {
    if (byte_throttler)
      byte_throttler->put(middle.length());
    middle = std::move(bl);
    if (byte_throttler)
      byte_throttler->take(middle.length());
  }
  ceph::buffer::list& get_middle() { return middle; }

  // Point the data segment at bl's contents via buffer::list::share() (bl is
  // taken by const reference and not modified), adjusting byte_throttler
  // from the old length to the new one.
  void set_data(const ceph::buffer::list &bl) {
    if (byte_throttler)
      byte_throttler->put(data.length());
    data.share(bl);
    if (byte_throttler)
      byte_throttler->take(data.length());
  }

  const ceph::buffer::list& get_data() const { return data; }
  ceph::buffer::list& get_data() { return data; }
  // Move the data segment into bl, overwriting bl's previous contents.  The
  // bytes are put() back to byte_throttler since this message no longer
  // holds them.
  void claim_data(ceph::buffer::list& bl) {
    if (byte_throttler)
      byte_throttler->put(data.length());
    bl = std::move(data);
  }
  off_t get_data_len() const { return data.length(); }

  void set_recv_stamp(utime_t t) { recv_stamp = t; }
  const utime_t& get_recv_stamp() const { return recv_stamp; }
  void set_dispatch_stamp(utime_t t) { dispatch_stamp = t; }
  const utime_t& get_dispatch_stamp() const { return dispatch_stamp; }
  void set_throttle_stamp(utime_t t) { throttle_stamp = t; }
  const utime_t& get_throttle_stamp() const { return throttle_stamp; }
  void set_recv_complete_stamp(utime_t t) { recv_complete_stamp = t; }
  const utime_t& get_recv_complete_stamp() const { return recv_complete_stamp; }

  // crc32c over the first sizeof(header) - sizeof(header.crc) bytes of the
  // header.  This excludes header.crc only if crc is the trailing member of
  // ceph_msg_header (layout assumption of this code).
  void calc_header_crc() {
    header.crc = ceph_crc32c(0, (unsigned char*)&header,
			     sizeof(header) - sizeof(header.crc));
  }
  // Store the crc32c of the front and middle segments in the footer.
  void calc_front_crc() {
    footer.front_crc = payload.crc32c(0);
    footer.middle_crc = middle.crc32c(0);
  }
  // Store the crc32c of the data segment in the footer.
  void calc_data_crc() {
    footer.data_crc = data.crc32c(0);
  }

  // Default cost is the data segment length; subclasses may override.
  virtual int get_cost() const {
    return data.length();
  }

  // type
  int get_type() const { return header.type; }
  void set_type(int t) { header.type = t; }

  uint64_t get_tid() const { return header.tid; }
  void set_tid(uint64_t t) { header.tid = t; }

  uint64_t get_seq() const { return header.seq; }
  void set_seq(uint64_t s) { header.seq = s; }

  // NOTE: the getter returns unsigned while the setter takes __s16.
  unsigned get_priority() const { return header.priority; }
  void set_priority(__s16 p) { header.priority = p; }

  // source/dest
  entity_inst_t get_source_inst() const {
    return entity_inst_t(get_source(), get_source_addr());
  }
  entity_name_t get_source() const {
    return entity_name_t(header.src);
  }
  // Peer address of the attached connection, or a default-constructed
  // entity_addr_t when no connection is attached.
  entity_addr_t get_source_addr() const {
    if (connection)
      return connection->get_peer_addr();
    return entity_addr_t();
  }
  // Peer addresses of the attached connection, or an empty
  // entity_addrvec_t when no connection is attached.
  entity_addrvec_t get_source_addrs() const {
    if (connection)
      return connection->get_peer_addrs();
    return entity_addrvec_t();
  }

  // forwarded?
  // In this base class the "original source" is simply the direct source.
  entity_inst_t get_orig_source_inst() const {
    return get_source_inst();
  }
  entity_name_t get_orig_source() const {
    return get_source();
  }
  entity_addr_t get_orig_source_addr() const {
    return get_source_addr();
  }
  entity_addrvec_t get_orig_source_addrs() const {
    return get_source_addrs();
  }

  // virtual bits
  virtual void decode_payload() = 0;
  virtual void encode_payload(uint64_t features) = 0;
  virtual std::string_view get_type_name() const = 0;
  virtual void print(std::ostream& out) const {
    out << get_type_name() << " magic: " << magic;
  }

  virtual void dump(ceph::Formatter *f) const;

  // Defined out of line.  crcflags is a mask of MSG_CRC_* bits.
  void encode(uint64_t features, int crcflags, bool skip_header_crc = false);
};

// Build a Message from pieces read off the wire: the header, the footer and
// the front/middle/data segments.  crcflags is a mask of MSG_CRC_* bits.
// NOTE(review): presumably picks the subclass from header.type, attaches conn
// to the result, returns nullptr on crc mismatch / unknown type, and may take
// the buffer lists passed by non-const reference -- confirm in the definition.
extern Message *decode_message(CephContext *cct,
                               int crcflags,
                               ceph_msg_header& header,
                               ceph_msg_footer& footer,
                               ceph::buffer::list& front,
                               ceph::buffer::list& middle,
                               ceph::buffer::list& data,
                               Message::ConnectionRef conn);
inline std::ostream& operator<<(std::ostream& out, const Message& m) {
  m.print(out);
  if (m.get_header().version)
    out << " v" << m.get_header().version;
  return out;
}

// Serialize m, encoded for the given feature bits, into bl.
// NOTE(review): any ownership/refcount effect on m is not visible from this
// declaration -- confirm in the definition.
extern void encode_message(Message *m, uint64_t features, ceph::buffer::list& bl);
// Reconstruct a Message from the bytes at bl; crcflags is a mask of
// MSG_CRC_* bits.  NOTE(review): presumably the inverse of encode_message()
// and advances bl past the consumed bytes -- confirm in the definition.
extern Message *decode_message(CephContext *cct, int crcflags,
                               ceph::buffer::list::const_iterator& bl);

/// this is a "safe" version of Message. it does not allow calling get/put
/// methods on its derived classes. This is intended to prevent some accidental
/// reference leaks. Instead, you must either cast the derived class to a
/// RefCountedObject to do the get/put or detach a temporary reference.
class SafeMessage : public Message {
public:
  using Message::Message;

  /// True if the attached connection's peer is a client
  /// (CEPH_ENTITY_TYPE_CLIENT).  Returns false when no connection has been
  /// set via set_connection(), instead of dereferencing a null ConnectionRef.
  /// This matches the null handling in get_source_addr()/get_source_addrs().
  bool is_a_client() const {
    const auto& con = get_connection();
    return con && con->get_peer_type() == CEPH_ENTITY_TYPE_CLIENT;
  }

private:
  // Hide the refcounting entry points on derived classes (see class comment).
  using RefCountedObject::get;
  using RefCountedObject::put;
};

namespace ceph {
/// Heap-allocate a T constructed from `args` and wrap it in a ceph::ref_t
/// without adding a reference (add_ref == false), so the handle adopts the
/// object's initial reference.  NOTE(review): this presumes RefCountedObject
/// starts with a count of one -- confirm in common/RefCountedObject.h.
template<class T, typename... Args>
ceph::ref_t<T> make_message(Args&&... args) {
  T* const fresh = new T(std::forward<Args>(args)...);
  return ceph::ref_t<T>(fresh, /*add_ref=*/false);
}
}

#endif