1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_LIBRBD_JOURNAL_H
#define CEPH_LIBRBD_JOURNAL_H
#include "include/int_types.h"
#include "include/Context.h"
#include "include/interval_set.h"
#include "include/rados/librados_fwd.hpp"
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/Cond.h"
#include "common/WorkQueue.h"
#include "journal/Future.h"
#include "journal/JournalMetadataListener.h"
#include "journal/ReplayEntry.h"
#include "journal/ReplayHandler.h"
#include "librbd/Utils.h"
#include "librbd/journal/Types.h"
#include "librbd/journal/TypeTraits.h"
#include <algorithm>
#include <list>
#include <set>
#include <string>
#include <atomic>
#include <unordered_map>
#include <utility>
// Forward declarations; no definitions are pulled in by this header.
// SafeTimer is only referenced through a pointer member (m_timer).
class SafeTimer;
namespace journal {
class Journaler;
}
namespace librbd {
// Forward declarations inside librbd. ImageCtx is the default ImageCtxT
// argument of Journal; journal::Replay is only referenced through pointers.
class ImageCtx;
namespace journal { template <typename> class Replay; }
template <typename ImageCtxT = ImageCtx>
class Journal {
public:
/**
 * State transition diagram for the State enum declared below.
 *
 * @verbatim
 *
 * <start>
 *    |
 *    v
 * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> FLUSHING ---> READY
 *    |                 *  .  ^             *  .              *           |
 *    |                 *  .  |             *  .              *           |
 *    |                 *  .  |    (error)  *  . . . . . . .  *           |
 *    |                 *  .  |             *              .  *           |
 *    |                 *  .  |             v              .  *           |
 *    |                 *  .  |     FLUSHING_RESTART       .  *           |
 *    |                 *  .  |             |              .  *           |
 *    |                 *  .  |             |              .  *           |
 *    |                 *  .  |             v              .  *           v
 *    |                 *  .  |        RESTARTING   < * * * * *       STOPPING
 *    |                 *  .  |             |              .              |
 *    |                 *  .  |             |              .              |
 *    |       * * * * * *  .  \-------------/              .              |
 *    |       * (error)    .                               .              |
 *    |       *            .   . . . . . . . . . . . . . . .              |
 *    |       *            .   .                                          |
 *    |       v            v   v                                          |
 *    |     CLOSED <----- CLOSING <---------------------------------------/
 *    |       |
 *    |       v
 *    \---> <finish>
 *
 * @endverbatim
 */
// States of the journal state machine; see the transition diagram above.
// NOTE(review): the diagram nodes FLUSHING and RESTARTING presumably
// correspond to STATE_FLUSHING_REPLAY and STATE_RESTARTING_REPLAY -- confirm
// against the implementation.
enum State {
STATE_UNINITIALIZED,
STATE_INITIALIZING,
STATE_REPLAYING,
STATE_FLUSHING_RESTART,
STATE_RESTARTING_REPLAY,
STATE_FLUSHING_REPLAY,
STATE_READY,
STATE_STOPPING,
STATE_CLOSING,
STATE_CLOSED
};
// Well-known identifier strings; only declared here, defined out of line.
static const std::string IMAGE_CLIENT_ID;
static const std::string LOCAL_MIRROR_UUID;
static const std::string ORPHAN_MIRROR_UUID;
// NOTE(review): this single-argument constructor is not marked explicit, so
// an ImageCtxT lvalue is implicitly convertible to a Journal -- confirm
// whether that is intended.
Journal(ImageCtxT &image_ctx);
~Journal();
// Static helpers: these take an ImageCtxT or an IoCtx plus image id instead
// of operating on an existing Journal instance.
static bool is_journal_supported(ImageCtxT &image_ctx);
static int create(librados::IoCtx &io_ctx, const std::string &image_id,
uint8_t order, uint8_t splay_width,
const std::string &object_pool);
static int remove(librados::IoCtx &io_ctx, const std::string &image_id);
static int reset(librados::IoCtx &io_ctx, const std::string &image_id);
// The next three helpers hand their result back through an out pointer
// (is_tag_owner / mirror_uuid) and also take a Context *on_finish.
// Presumably the out value is only valid once on_finish has been completed
// -- confirm in the implementation.
static void is_tag_owner(ImageCtxT *image_ctx, bool *is_tag_owner,
Context *on_finish);
static void is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
bool *is_tag_owner, ContextWQ *op_work_queue,
Context *on_finish);
static void get_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
std::string *mirror_uuid,
ContextWQ *op_work_queue, Context *on_finish);
static int request_resync(ImageCtxT *image_ctx);
static void promote(ImageCtxT *image_ctx, Context *on_finish);
static void demote(ImageCtxT *image_ctx, Context *on_finish);
// State queries (const member functions).
bool is_journal_ready() const;
bool is_journal_replaying() const;
bool is_journal_appending() const;
// Each of these takes a Context; presumably it is completed once the
// requested state/operation has been reached -- TODO confirm.
void wait_for_journal_ready(Context *on_ready);
void open(Context *on_finish);
void close(Context *on_finish);
// Tag accessors and tag allocation.
bool is_tag_owner() const;
uint64_t get_tag_tid() const;
journal::TagData get_tag_data() const;
void allocate_local_tag(Context *on_finish);
void allocate_tag(const std::string &mirror_uuid,
const journal::TagPredecessor &predecessor,
Context *on_finish);
void flush_commit_position(Context *on_finish);
void user_flushed();
// IO event recording. NOTE(review): the uint64_t returned by the append_*
// calls is presumably the event tid that commit_io_event(),
// commit_io_event_extent(), flush_event() and wait_event() accept -- confirm.
uint64_t append_write_event(uint64_t offset, size_t length,
const bufferlist &bl,
bool flush_entry);
uint64_t append_io_event(journal::EventEntry &&event_entry,
uint64_t offset, size_t length,
bool flush_entry, int filter_ret_val);
void commit_io_event(uint64_t tid, int r);
void commit_io_event_extent(uint64_t tid, uint64_t offset, uint64_t length,
int r);
// Op event recording; op_tid values can be obtained from allocate_op_tid().
void append_op_event(uint64_t op_tid, journal::EventEntry &&event_entry,
Context *on_safe);
void commit_op_event(uint64_t tid, int r, Context *on_safe);
void replay_op_ready(uint64_t op_tid, Context *on_resume);
void flush_event(uint64_t tid, Context *on_safe);
void wait_event(uint64_t tid, Context *on_safe);
uint64_t allocate_op_tid() {
uint64_t op_tid = ++m_op_tid;
ceph_assert(op_tid != 0);
return op_tid;
}
// External replay: start_external_replay() takes an out pointer for a
// journal::Replay instance plus a Context. Presumably *journal_replay is set
// before on_start is completed -- confirm in the implementation.
void start_external_replay(journal::Replay<ImageCtxT> **journal_replay,
Context *on_start);
void stop_external_replay();
// Listener registration; listeners are passed by raw pointer (the class keeps
// a std::set<journal::Listener *> in m_listeners).
void add_listener(journal::Listener *listener);
void remove_listener(journal::Listener *listener);
// Returns an int status and reports the flag through *do_resync.
int is_resync_requested(bool *do_resync);
// Accessor for the work queue pointer held in m_work_queue (may be null: the
// member's in-class default is nullptr).
ContextWQ *get_work_queue() {
  return m_work_queue;
}
private:
ImageCtxT &m_image_ctx;
// mock unit testing support
typedef journal::TypeTraits<ImageCtxT> TypeTraits;
typedef typename TypeTraits::Journaler Journaler;
typedef typename TypeTraits::Future Future;
typedef typename TypeTraits::ReplayEntry ReplayEntry;
typedef std::list<bufferlist> Bufferlists;
typedef std::list<Context *> Contexts;
typedef std::list<Future> Futures;
typedef interval_set<uint64_t> ExtentInterval;
// Per-event bookkeeping record (stored by tid in the Events map).
struct Event {
  Futures futures;
  Contexts on_safe_contexts;
  ExtentInterval pending_extents;
  int filter_ret_val = 0;
  bool committed_io = false;
  bool safe = false;
  int ret_val = 0;

  // All members fall back to their in-class defaults.
  Event() = default;

  // Copies the supplied futures and filter value. pending_extents is seeded
  // with the extent (offset, length) unless length is zero.
  Event(const Futures &_futures, uint64_t offset, size_t length,
        int filter_ret_val)
    : futures(_futures), filter_ret_val(filter_ret_val) {
    if (length == 0) {
      return;
    }
    pending_extents.insert(offset, length);
  }
};
// Lookup tables keyed by a 64-bit tid.
using Events = std::unordered_map<uint64_t, Event>;
using TidToFutures = std::unordered_map<uint64_t, Future>;
// Context bound to a single IO event tid; finish() forwards the result code
// and the tid to Journal::handle_io_event_safe().
struct C_IOEventSafe : public Context {
  Journal *journal;
  uint64_t tid;

  C_IOEventSafe(Journal *owner, uint64_t event_tid)
    : journal(owner), tid(event_tid) {
  }

  void finish(int r) override {
    journal->handle_io_event_safe(r, tid);
  }
};
// Context bound to a single op event; finish() forwards the result code
// together with the captured tid, both futures and the caller's on_safe
// context to Journal::handle_op_event_safe().
struct C_OpEventSafe : public Context {
  Journal *journal;
  uint64_t tid;
  Future op_start_future;
  Future op_finish_future;
  Context *on_safe;

  C_OpEventSafe(Journal *owner, uint64_t op_tid, const Future &start_future,
                const Future &finish_future, Context *on_safe_ctx)
    : journal(owner),
      tid(op_tid),
      op_start_future(start_future),
      op_finish_future(finish_future),
      on_safe(on_safe_ctx) {
  }

  void finish(int r) override {
    journal->handle_op_event_safe(r, tid, op_start_future, op_finish_future,
                                  on_safe);
  }
};
// Context that takes ownership of a replay entry (moved in on construction)
// and passes it, with the result code, to
// Journal::handle_replay_process_safe() from finish().
struct C_ReplayProcessSafe : public Context {
  Journal *journal;
  ReplayEntry replay_entry;

  C_ReplayProcessSafe(Journal *owner, ReplayEntry &&entry)
    : journal(owner), replay_entry(std::move(entry)) {
  }

  void finish(int r) override {
    // handle_replay_process_safe() takes the entry by value, so a copy of the
    // stored entry is handed over here
    journal->handle_replay_process_safe(replay_entry, r);
  }
};
// Adapter implementing ::journal::ReplayHandler: the two handle_* callbacks
// are forwarded to the owning Journal, while get()/put() are still empty
// placeholders.
struct ReplayHandler : public ::journal::ReplayHandler {
  Journal *journal;

  ReplayHandler(Journal *owner) : journal(owner) {
  }

  void get() override {
    // TODO
  }

  void put() override {
    // TODO
  }

  void handle_entries_available() override {
    journal->handle_replay_ready();
  }

  void handle_complete(int r) override {
    journal->handle_replay_complete(r);
  }
};
// Infrastructure handles; all start out null.
ContextWQ *m_work_queue = nullptr;
SafeTimer *m_timer = nullptr;
Mutex *m_timer_lock = nullptr;

// Journaler instance (see create_journaler() / destroy_journaler()).
// Defaults to null so the pointer is never indeterminate, whatever the
// out-of-line constructor initialises.
Journaler *m_journaler = nullptr;

// mutable: usable from const member functions.
mutable Mutex m_lock;
// Current state; UNINITIALIZED is the <start> state of the state machine.
State m_state = STATE_UNINITIALIZED;
uint64_t m_max_append_size = 0;
uint64_t m_tag_class = 0;
uint64_t m_tag_tid = 0;
journal::ImageClientMeta m_client_meta;
journal::TagData m_tag_data;
int m_error_result = 0;
Contexts m_wait_for_state_contexts;
ReplayHandler m_replay_handler;
bool m_close_pending = false;

// Event bookkeeping.
Mutex m_event_lock;
uint64_t m_event_tid = 0;
Events m_events;

// Atomics use the same list-initialisation form for consistency.
std::atomic<bool> m_user_flushed = { false };
std::atomic<uint64_t> m_op_tid = { 0 };
TidToFutures m_op_futures;

bool m_processing_entry = false;
bool m_blocking_writes = false;
journal::Replay<ImageCtxT> *m_journal_replay = nullptr;
util::AsyncOpTracker m_async_journal_op_tracker;
// Metadata listener that defers each update notification to the journal's
// work queue, where Journal::handle_metadata_updated() is invoked.
struct MetadataListener : public ::journal::JournalMetadataListener {
  Journal<ImageCtxT> *journal;

  MetadataListener(Journal<ImageCtxT> *owner) : journal(owner) { }

  void handle_update(::journal::JournalMetadata *) override {
    // the result code passed to the queued callback is not used
    auto *on_update = new FunctionContext([this](int) {
        journal->handle_metadata_updated();
      });
    journal->m_work_queue->queue(on_update, 0);
  }
} m_metadata_listener;
// Registered listeners and related notification state.
typedef std::set<journal::Listener *> Listeners;
Listeners m_listeners;
Cond m_listener_cond;
bool m_listener_notify = false;
uint64_t m_refresh_sequence = 0;
// Overloads taking a Mutex reference. NOTE(review): presumably the caller
// passes the lock it already holds -- confirm in the implementation.
bool is_journal_replaying(const Mutex &) const;
bool is_tag_owner(const Mutex &) const;
uint64_t append_io_events(journal::EventType event_type,
const Bufferlists &bufferlists,
uint64_t offset, size_t length, bool flush_entry,
int filter_ret_val);
Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe);
// Journaler lifetime management.
void create_journaler();
void destroy_journaler(int r);
void recreate_journaler(int r);
void complete_event(typename Events::iterator it, int r);
void start_append();
// Handlers. handle_replay_ready(), handle_replay_complete(),
// handle_io_event_safe(), handle_op_event_safe() and
// handle_replay_process_safe() are invoked by the nested helper
// contexts/handlers declared earlier in this class.
void handle_open(int r);
void handle_replay_ready();
void handle_replay_complete(int r);
void handle_replay_process_ready(int r);
void handle_replay_process_safe(ReplayEntry replay_entry, int r);
void handle_start_external_replay(int r,
journal::Replay<ImageCtxT> **journal_replay,
Context *on_finish);
void handle_flushing_restart(int r);
void handle_flushing_replay();
void handle_recording_stopped(int r);
void handle_journal_destroyed(int r);
void handle_io_event_safe(int r, uint64_t tid);
void handle_op_event_safe(int r, uint64_t tid, const Future &op_start_future,
const Future &op_finish_future, Context *on_safe);
void stop_recording();
// State machine helpers.
void transition_state(State state, int r);
bool is_steady_state() const;
void wait_for_steady_state(Context *on_state);
int check_resync_requested(bool *do_resync);
// handle_metadata_updated() is queued by MetadataListener::handle_update().
void handle_metadata_updated();
void handle_refresh_metadata(uint64_t refresh_sequence, uint64_t tag_tid,
journal::TagData tag_data, int r);
};
} // namespace librbd
// Explicit instantiation declaration: translation units including this header
// do not implicitly instantiate Journal<ImageCtx>; the matching explicit
// instantiation definition has to be provided elsewhere.
extern template class librbd::Journal<librbd::ImageCtx>;
#endif // CEPH_LIBRBD_JOURNAL_H
|