summaryrefslogtreecommitdiffstats
path: root/src/journal/JournalMetadata.h
blob: 071456f56ecc02a4b64d4e129eb378785c11f1fa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#ifndef CEPH_JOURNAL_JOURNAL_METADATA_H
#define CEPH_JOURNAL_JOURNAL_METADATA_H

#include "include/int_types.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "common/AsyncOpTracker.h"
#include "common/Cond.h"
#include "common/Timer.h"
#include "common/ceph_mutex.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "cls/journal/cls_journal_types.h"
#include "journal/JournalMetadataListener.h"
#include "journal/Settings.h"
#include <boost/noncopyable.hpp>
#include <boost/optional.hpp>
#include <functional>
#include <iosfwd>
#include <list>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include "include/ceph_assert.h"

namespace journal {

/**
 * In-memory view of a single journal's metadata.
 *
 * The object keeps the journal's immutable parameters (order, splay width,
 * pool id), its mutable state (minimum/active object set, registered clients,
 * this client's commit position) and the bookkeeping needed to hand out entry
 * and commit tids. State that is guarded by m_lock is read through accessors
 * that take the lock themselves.
 *
 * The class is reference counted (RefCountedObject) and non-copyable. Its
 * constructor and destructor are private; FRIEND_MAKE_REF grants access to
 * the project's ref-creating helper.
 * NOTE(review): presumably instances are created via ceph::make_ref -- confirm.
 *
 * Most operations are only declared here; their exact semantics live in the
 * implementation file and are described below only as far as the signatures
 * show.
 */
class JournalMetadata : public RefCountedObject, boost::noncopyable {
public:
  /// Factory callback returning a freshly created Context (see committed()).
  typedef std::function<Context*()> CreateContext;
  typedef cls::journal::ObjectPosition ObjectPosition;
  typedef cls::journal::ObjectPositions ObjectPositions;
  typedef cls::journal::ObjectSetPosition ObjectSetPosition;
  typedef cls::journal::Client Client;
  typedef cls::journal::Tag Tag;

  typedef std::set<Client> RegisteredClients;
  typedef std::list<Tag> Tags;

  /// Start initialization; on_init is the completion callback.
  void init(Context *on_init);
  /// Start shut down; on_finish is the completion callback.
  void shut_down(Context *on_finish);

  /// Returns the m_initialized flag (read without taking m_lock).
  bool is_initialized() const { return m_initialized; }

  /// Fetch order, splay width and pool id into the supplied out-pointers;
  /// on_finish is the completion callback.
  void get_immutable_metadata(uint8_t *order, uint8_t *splay_width,
                              int64_t *pool_id, Context *on_finish);

  /// Fetch minimum set, active set and registered clients into the supplied
  /// out-pointers; on_finish is the completion callback.
  void get_mutable_metadata(uint64_t *minimum_set, uint64_t *active_set,
                            RegisteredClients *clients, Context *on_finish);

  void add_listener(JournalMetadataListener *listener);
  void remove_listener(JournalMetadataListener *listener);

  // Client registration management. No client id is passed to
  // register/update/unregister.
  // NOTE(review): presumably they act on m_client_id -- confirm in the .cpp.
  void register_client(const bufferlist &data, Context *on_finish);
  void update_client(const bufferlist &data, Context *on_finish);
  void unregister_client(Context *on_finish);
  void get_client(const std::string &client_id, cls::journal::Client *client,
                  Context *on_finish);

  // Tag management; results are written through the Tag / Tags out-pointers.
  void allocate_tag(uint64_t tag_class, const bufferlist &data,
                    Tag *tag, Context *on_finish);
  void get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish);
  void get_tags(uint64_t start_after_tag_tid,
                const boost::optional<uint64_t> &tag_class, Tags *tags,
                Context *on_finish);

  /// Settings supplied at construction time.
  inline const Settings &get_settings() const {
    return m_settings;
  }
  /// Id of the client this metadata instance acts as.
  inline const std::string &get_client_id() const {
    return m_client_id;
  }
  /// Journal order, i.e. log2 of the object size (see get_object_size()).
  inline uint8_t get_order() const {
    return m_order;
  }
  /// Object size derived from the order: 2^order.
  inline uint64_t get_object_size() const {
    // Widen before shifting. A bare `1 << m_order` is evaluated as int, which
    // overflows for order >= 31 and never yields more than 32 significant
    // bits even though the result type is uint64_t.
    // Orders >= 64 remain meaningless (the shift is still out of range).
    return static_cast<uint64_t>(1) << m_order;
  }
  inline uint8_t get_splay_width() const {
    return m_splay_width;
  }
  inline int64_t get_pool_id() const {
    return m_pool_id;
  }

  /// Hand on_finish and result code r to the work queue.
  inline void queue(Context *on_finish, int r) {
    m_work_queue->queue(on_finish, r);
  }

  inline ContextWQ *get_work_queue() {
    return m_work_queue;
  }

  /// Shared timer; m_timer is dereferenced unconditionally.
  inline SafeTimer &get_timer() {
    return *m_timer;
  }
  /// Lock associated with the shared timer; dereferenced unconditionally.
  inline ceph::mutex &get_timer_lock() {
    return *m_timer_lock;
  }

  void set_minimum_set(uint64_t object_set);
  /// Snapshot of the minimum object set, read under m_lock.
  inline uint64_t get_minimum_set() const {
    std::lock_guard locker{m_lock};
    return m_minimum_set;
  }

  // Two overloads: one returning an int result, one reporting through
  // on_finish.
  int set_active_set(uint64_t object_set);
  void set_active_set(uint64_t object_set, Context *on_finish);
  /// Snapshot of the active object set, read under m_lock.
  inline uint64_t get_active_set() const {
    std::lock_guard locker{m_lock};
    return m_active_set;
  }

  void assert_active_tag(uint64_t tag_tid, Context *on_finish);

  void flush_commit_position();
  void flush_commit_position(Context *on_safe);
  /// Copy this client's commit position (m_client.commit_position) into
  /// *commit_position while holding m_lock.
  void get_commit_position(ObjectSetPosition *commit_position) const {
    std::lock_guard locker{m_lock};
    *commit_position = m_client.commit_position;
  }

  /// Copy the set of registered clients into *registered_clients while
  /// holding m_lock.
  void get_registered_clients(RegisteredClients *registered_clients) const {
    std::lock_guard locker{m_lock};
    *registered_clients = m_registered_clients;
  }

  /// Return the next entry tid for tag_tid and advance the per-tag counter,
  /// under m_lock. A tag that has not been seen yet is inserted with a
  /// value-initialized counter, so its first allocation returns 0.
  inline uint64_t allocate_entry_tid(uint64_t tag_tid) {
    std::lock_guard locker{m_lock};
    return m_allocated_entry_tids[tag_tid]++;
  }
  void reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid);
  bool get_last_allocated_entry_tid(uint64_t tag_tid, uint64_t *entry_tid) const;

  // Commit tid tracking: a commit tid identifies an
  // (object_num, tag_tid, entry_tid) triple, matching CommitEntry below.
  uint64_t allocate_commit_tid(uint64_t object_num, uint64_t tag_tid,
                               uint64_t entry_tid);
  void overflow_commit_tid(uint64_t commit_tid, uint64_t object_num);
  void get_commit_entry(uint64_t commit_tid, uint64_t *object_num,
                        uint64_t *tag_tid, uint64_t *entry_tid);
  void committed(uint64_t commit_tid, const CreateContext &create_context);

  void notify_update();
  void async_notify_update(Context *on_safe);

  /// NOTE(review): presumably blocks until the ops tracked by
  /// m_async_op_tracker have drained -- confirm in the .cpp.
  void wait_for_ops();

private:
  FRIEND_MAKE_REF(JournalMetadata);
  JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock,
                  librados::IoCtx &ioctx, const std::string &oid,
                  const std::string &client_id, const Settings &settings);
  ~JournalMetadata() override;

  /// tag tid -> next entry tid to hand out (see allocate_entry_tid()).
  typedef std::map<uint64_t, uint64_t> AllocatedEntryTids;
  typedef std::list<JournalMetadataListener*> Listeners;
  typedef std::list<Context*> Contexts;

  /// Bookkeeping record for one allocated commit tid.
  struct CommitEntry {
    uint64_t object_num;
    uint64_t tag_tid;
    uint64_t entry_tid;
    bool committed;

    CommitEntry() : object_num(0), tag_tid(0), entry_tid(0), committed(false) {
    }
    /// New entries always start out uncommitted.
    CommitEntry(uint64_t _object_num, uint64_t _tag_tid, uint64_t _entry_tid)
      : object_num(_object_num), tag_tid(_tag_tid), entry_tid(_entry_tid),
        committed(false) {
    }
  };
  /// commit tid -> CommitEntry.
  typedef std::map<uint64_t, CommitEntry> CommitTids;

  /// librados watch callback that forwards notifications and errors to the
  /// owning JournalMetadata. Holds a raw, non-owning back-pointer.
  struct C_WatchCtx : public librados::WatchCtx2 {
    JournalMetadata *journal_metadata;

    C_WatchCtx(JournalMetadata *_journal_metadata)
      : journal_metadata(_journal_metadata) {}

    // notifier_id and the payload are not forwarded.
    void handle_notify(uint64_t notify_id, uint64_t cookie,
                       uint64_t notifier_id, bufferlist& bl) override {
      journal_metadata->handle_watch_notify(notify_id, cookie);
    }
    // Only the error code is forwarded; the cookie is dropped.
    void handle_error(uint64_t cookie, int err) override {
      journal_metadata->handle_watch_error(err);
    }
  };

  // The Context helpers below all follow the same pattern: the constructor
  // registers an op with m_async_op_tracker and the destructor finishes it,
  // so the op stays outstanding for as long as the context object exists.

  /// Invokes handle_watch_reset() on completion; the result code is ignored.
  struct C_WatchReset : public Context {
    JournalMetadata *journal_metadata;

    C_WatchReset(JournalMetadata *_journal_metadata)
      : journal_metadata(_journal_metadata) {
      journal_metadata->m_async_op_tracker.start_op();
    }
    ~C_WatchReset() override {
      journal_metadata->m_async_op_tracker.finish_op();
    }
    void finish(int r) override {
      journal_metadata->handle_watch_reset();
    }
  };

  /// Invokes handle_commit_position_task() on completion; the result code is
  /// ignored.
  struct C_CommitPositionTask : public Context {
    JournalMetadata *journal_metadata;

    C_CommitPositionTask(JournalMetadata *_journal_metadata)
      : journal_metadata(_journal_metadata) {
      journal_metadata->m_async_op_tracker.start_op();
    }
    ~C_CommitPositionTask() override {
      journal_metadata->m_async_op_tracker.finish_op();
    }
    void finish(int r) override {
      // The handler is called with m_lock already held.
      std::lock_guard locker{journal_metadata->m_lock};
      journal_metadata->handle_commit_position_task();
    }
  };

  /// Completion for an asynchronous notify: reports the result to
  /// handle_notified() and then completes the optional on_safe callback.
  struct C_AioNotify : public Context {
    JournalMetadata* journal_metadata;
    Context *on_safe;

    C_AioNotify(JournalMetadata *_journal_metadata, Context *_on_safe)
      : journal_metadata(_journal_metadata), on_safe(_on_safe) {
      journal_metadata->m_async_op_tracker.start_op();
    }
    ~C_AioNotify() override {
      journal_metadata->m_async_op_tracker.finish_op();
    }
    void finish(int r) override {
      journal_metadata->handle_notified(r);
      if (on_safe != nullptr) {
        // on_safe always receives 0; the notify result r only reaches
        // handle_notified().
        on_safe->complete(0);
      }
    }
  };

  /// Chains an update notification after a preceding operation: on success
  /// the notification is sent, on failure the error goes straight to on_safe.
  struct C_NotifyUpdate : public Context {
    JournalMetadata* journal_metadata;
    Context *on_safe;

    C_NotifyUpdate(JournalMetadata *_journal_metadata,
                   Context *_on_safe = nullptr)
      : journal_metadata(_journal_metadata), on_safe(_on_safe) {
      journal_metadata->m_async_op_tracker.start_op();
    }
    ~C_NotifyUpdate() override {
      journal_metadata->m_async_op_tracker.finish_op();
    }
    void finish(int r) override {
      if (r == 0) {
        // Success: on_safe is handed over to async_notify_update().
        journal_metadata->async_notify_update(on_safe);
        return;
      }
      // Failure: skip the notification and report r directly.
      if (on_safe != nullptr) {
        on_safe->complete(r);
      }
    }
  };

  /// Forwards the result and the stored on_finish callback to
  /// handle_immutable_metadata().
  struct C_ImmutableMetadata : public Context {
    JournalMetadata* journal_metadata;
    Context *on_finish;

    C_ImmutableMetadata(JournalMetadata *_journal_metadata, Context *_on_finish)
      : journal_metadata(_journal_metadata), on_finish(_on_finish) {
      // Unlike the helpers above, the op is started while holding m_lock.
      std::lock_guard locker{journal_metadata->m_lock};
      journal_metadata->m_async_op_tracker.start_op();
    }
    ~C_ImmutableMetadata() override {
      journal_metadata->m_async_op_tracker.finish_op();
    }
    void finish(int r) override {
      journal_metadata->handle_immutable_metadata(r, on_finish);
    }
  };

  /// Carries the results of a refresh (minimum set, active set, registered
  /// clients) and passes itself to handle_refresh_complete().
  struct C_Refresh : public Context {
    JournalMetadata* journal_metadata;
    uint64_t minimum_set;
    uint64_t active_set;
    RegisteredClients registered_clients;

    C_Refresh(JournalMetadata *_journal_metadata)
      : journal_metadata(_journal_metadata), minimum_set(0), active_set(0) {
      // The op is started while holding m_lock (as in C_ImmutableMetadata).
      std::lock_guard locker{journal_metadata->m_lock};
      journal_metadata->m_async_op_tracker.start_op();
    }
    ~C_Refresh() override {
      journal_metadata->m_async_op_tracker.finish_op();
    }
    void finish(int r) override {
      journal_metadata->handle_refresh_complete(this, r);
    }
  };

  // NOTE: the declaration order of the data members below is also their
  // initialization order -- do not reorder.

  librados::IoCtx m_ioctx;
  CephContext *m_cct = nullptr;
  std::string m_oid;
  std::string m_client_id;
  Settings m_settings;

  uint8_t m_order = 0;
  uint8_t m_splay_width = 0;
  int64_t m_pool_id = -1;
  bool m_initialized = false;

  // Raw pointers received through the constructor.
  // NOTE(review): presumably not owned by this object -- confirm.
  ContextWQ *m_work_queue;
  SafeTimer *m_timer;
  ceph::mutex *m_timer_lock;

  // mutable so that const accessors can take the lock.
  mutable ceph::mutex m_lock = ceph::make_mutex("JournalMetadata::m_lock");

  uint64_t m_commit_tid = 0;
  CommitTids m_pending_commit_tids;

  Listeners m_listeners;

  C_WatchCtx m_watch_ctx;
  uint64_t m_watch_handle = 0;

  uint64_t m_minimum_set = 0;
  uint64_t m_active_set = 0;
  RegisteredClients m_registered_clients;
  Client m_client;

  AllocatedEntryTids m_allocated_entry_tids;

  size_t m_update_notifications = 0;
  ceph::condition_variable m_update_cond;

  size_t m_ignore_watch_notifies = 0;
  size_t m_refreshes_in_progress = 0;
  Contexts m_refresh_ctxs;

  uint64_t m_commit_position_tid = 0;
  ObjectSetPosition m_commit_position;
  Context *m_commit_position_ctx = nullptr;
  Context *m_commit_position_task_ctx = nullptr;

  size_t m_flush_commits_in_progress = 0;
  Contexts m_flush_commit_position_ctxs;

  AsyncOpTracker m_async_op_tracker;

  void handle_immutable_metadata(int r, Context *on_init);

  void refresh(Context *on_finish);
  void handle_refresh_complete(C_Refresh *refresh, int r);

  void cancel_commit_task();
  void schedule_commit_task();
  /// Called with m_lock held (see C_CommitPositionTask::finish).
  void handle_commit_position_task();

  void schedule_watch_reset();
  void handle_watch_reset();
  void handle_watch_notify(uint64_t notify_id, uint64_t cookie);
  void handle_watch_error(int err);
  void handle_notified(int r);

  void schedule_laggy_clients_disconnect(Context *on_finish);

  friend std::ostream &operator<<(std::ostream &os,
                                  const JournalMetadata &journal_metadata);
};

/// Stream-insertion overload for a set of registered journal clients.
/// Declaration only: the definition, and therefore the exact output format,
/// is not part of this header.
std::ostream &operator<<(std::ostream &os,
			 const JournalMetadata::RegisteredClients &clients);

/// Stream-insertion overload for a JournalMetadata instance. This is the
/// overload declared as a friend inside the class, so it may read private
/// members. Declaration only: the definition is not part of this header.
std::ostream &operator<<(std::ostream &os,
			 const JournalMetadata &journal_metadata);

} // namespace journal

#endif // CEPH_JOURNAL_JOURNAL_METADATA_H