summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/watch.h
blob: b3982141d86e82da02cd51740fad8ee81f275709 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <iterator>
#include <map>
#include <set>

#include <seastar/core/shared_ptr.hh>

#include "crimson/net/Connection.h"
#include "crimson/osd/object_context.h"
#include "crimson/osd/pg.h"
#include "include/denc.h"

namespace crimson::osd {

class Notify;
using NotifyRef = seastar::shared_ptr<Notify>;

// NOTE: really need to have this public. Otherwise `shared_from_this()`
// will abort. According to cppreference.com:
//
//   "The constructors of std::shared_ptr detect the presence
//   of an unambiguous and accessible (ie. public inheritance
//   is mandatory) (since C++17) enable_shared_from_this base".
//
// I expect the `seastar::shared_ptr` shares this behaviour.
class Watch : public seastar::enable_shared_from_this<Watch> {
  // this is a private tag for the public constructor that turns it into
  // de facto private one. The motivation behind the hack is make_shared
  // used by create().
  struct private_ctag_t{};

  std::set<NotifyRef, std::less<>> in_progress_notifies;
  crimson::net::ConnectionRef conn;
  crimson::osd::ObjectContextRef obc;

  watch_info_t winfo;
  entity_name_t entity_name;
  Ref<PG> pg;

  seastar::timer<seastar::lowres_clock> timeout_timer;

  seastar::future<> start_notify(NotifyRef);
  seastar::future<> send_notify_msg(NotifyRef);
  seastar::future<> send_disconnect_msg();

  friend Notify;
  friend class WatchTimeoutRequest;

public:
  Watch(private_ctag_t,
        crimson::osd::ObjectContextRef obc,
        const watch_info_t& winfo,
        const entity_name_t& entity_name,
        Ref<PG> pg)
    : obc(std::move(obc)),
      winfo(winfo),
      entity_name(entity_name),
      pg(std::move(pg)),
      timeout_timer([this] {
        return do_watch_timeout();
      }) {
    assert(this->pg);
  }
  ~Watch();

  seastar::future<> connect(crimson::net::ConnectionRef, bool);
  void disconnect();
  bool is_alive() const {
    return true;
  }
  bool is_connected() const {
    return static_cast<bool>(conn);
  }
  void got_ping(utime_t);

  void discard_state();

  seastar::future<> remove();

  /// Call when notify_ack received on notify_id
  seastar::future<> notify_ack(
    uint64_t notify_id, ///< [in] id of acked notify
    const ceph::bufferlist& reply_bl); ///< [in] notify reply buffer

  template <class... Args>
  static seastar::shared_ptr<Watch> create(Args&&... args) {
    return seastar::make_shared<Watch>(private_ctag_t{},
                                       std::forward<Args>(args)...);
  };

  uint64_t get_watcher_gid() const {
    return entity_name.num();
  }
  auto get_pg() const {
    return pg;
  }
  auto& get_entity() const {
    return entity_name;
  }
  auto& get_cookie() const {
    return winfo.cookie;
  }
  auto& get_peer_addr() const {
    return winfo.addr;
  }
  void cancel_notify(const uint64_t notify_id);
  void do_watch_timeout();
};

using WatchRef = seastar::shared_ptr<Watch>;

struct notify_reply_t {
  uint64_t watcher_gid;
  uint64_t watcher_cookie;
  ceph::bufferlist bl;

  bool operator<(const notify_reply_t& rhs) const;
  DENC(notify_reply_t, v, p) {
    // there is no versioning / preamble
    denc(v.watcher_gid, p);
    denc(v.watcher_cookie, p);
    denc(v.bl, p);
  }
};
std::ostream &operator<<(std::ostream &out, const notify_reply_t &rhs);

class Notify : public seastar::enable_shared_from_this<Notify> {
  std::set<WatchRef> watchers;
  const notify_info_t ninfo;
  crimson::net::ConnectionRef conn;
  const uint64_t client_gid;
  const uint64_t user_version;
  bool complete{false};
  bool discarded{false};
  seastar::timer<seastar::lowres_clock> timeout_timer{
    [this] { do_notify_timeout(); }
  };

  ~Notify();

  /// (gid,cookie) -> reply_bl for everyone who acked the notify
  std::multiset<notify_reply_t> notify_replies;

  uint64_t get_id() const { return ninfo.notify_id; }

  /// Sends notify completion if watchers.empty() or timeout
  seastar::future<> send_completion(
    std::set<WatchRef> timedout_watchers = {});

  /// Called on Notify timeout
  void do_notify_timeout();

  Notify(crimson::net::ConnectionRef conn,
         const notify_info_t& ninfo,
         const uint64_t client_gid,
         const uint64_t user_version);
  template <class WatchIteratorT>
  Notify(WatchIteratorT begin,
         WatchIteratorT end,
         crimson::net::ConnectionRef conn,
         const notify_info_t& ninfo,
         const uint64_t client_gid,
         const uint64_t user_version);
  // this is a private tag for the public constructor that turns it into
  // de facto private one. The motivation behind the hack is make_shared
  // used by create_n_propagate factory.
  struct private_ctag_t{};

  using ptr_t = seastar::shared_ptr<Notify>;
  friend bool operator<(const ptr_t& lhs, const ptr_t& rhs) {
    assert(lhs);
    assert(rhs);
    return lhs->get_id() < rhs->get_id();
  }
  friend bool operator<(const ptr_t& ptr, const uint64_t id) {
    assert(ptr);
    return ptr->get_id() < id;
  }
  friend bool operator<(const uint64_t id, const ptr_t& ptr) {
    assert(ptr);
    return id < ptr->get_id();
  }

  friend Watch;

public:
  template <class... Args>
  Notify(private_ctag_t, Args&&... args) : Notify(std::forward<Args>(args)...) {
  }

  template <class WatchIteratorT, class... Args>
  static seastar::future<> create_n_propagate(
    WatchIteratorT begin,
    WatchIteratorT end,
    Args&&... args);

  seastar::future<> remove_watcher(WatchRef watch);
  seastar::future<> complete_watcher(WatchRef watch,
                                     const ceph::bufferlist& reply_bl);
};


template <class WatchIteratorT>
Notify::Notify(WatchIteratorT begin,
               WatchIteratorT end,
               crimson::net::ConnectionRef conn,
               const notify_info_t& ninfo,
               const uint64_t client_gid,
               const uint64_t user_version)
  : watchers(begin, end),
    ninfo(ninfo),
    conn(std::move(conn)),
    client_gid(client_gid),
    user_version(user_version) {
  assert(!std::empty(watchers));
  if (ninfo.timeout) {
    timeout_timer.arm(std::chrono::seconds{ninfo.timeout});
  }
}

template <class WatchIteratorT, class... Args>
seastar::future<> Notify::create_n_propagate(
  WatchIteratorT begin,
  WatchIteratorT end,
  Args&&... args)
{
  static_assert(
    std::is_same_v<typename std::iterator_traits<WatchIteratorT>::value_type,
                   crimson::osd::WatchRef>);
  if (begin == end) {
    auto notify = seastar::make_shared<Notify>(
      private_ctag_t{},
      std::forward<Args>(args)...);
    return notify->send_completion();
  } else {
    auto notify = seastar::make_shared<Notify>(
      private_ctag_t{},
      begin, end,
      std::forward<Args>(args)...);
    return seastar::do_for_each(begin, end, [=] (auto& watchref) {
      return watchref->start_notify(notify);
    });
  }
}

} // namespace crimson::osd

WRITE_CLASS_DENC(crimson::osd::notify_reply_t)

#if FMT_VERSION >= 90000
template <> struct fmt::formatter<crimson::osd::notify_reply_t> : fmt::ostream_formatter {};
#endif