// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
// path: src/tools/rbd_mirror/image_replayer/journal/Replayer.h

#ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H
#define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H

#include "tools/rbd_mirror/image_replayer/Replayer.h"
#include "include/utime.h"
#include "common/AsyncOpTracker.h"
#include "common/ceph_mutex.h"
#include "common/RefCountedObj.h"
#include "cls/journal/cls_journal_types.h"
#include "journal/ReplayEntry.h"
#include "librbd/ImageCtx.h"
#include "librbd/journal/Types.h"
#include "librbd/journal/TypeTraits.h"
#include <string>
#include <type_traits>

// Forward declarations for types that live outside the rbd::mirror namespaces.
// NOTE(review): ::journal::Journaler is not referenced by name in this
// header's visible code (Replayer uses TypeTraits<ImageCtxT>::Journaler
// instead) -- presumably that is what the trait resolves to; confirm.
namespace journal { class Journaler; }
namespace librbd {

// NOTE(review): "librbd/ImageCtx.h" is also included above, so this forward
// declaration is presumably redundant -- confirm before removing.
struct ImageCtx;
// Only held by pointer in this header (Replayer::m_local_journal_replay).
namespace journal { template <typename I> class Replay; }

} // namespace librbd

namespace rbd {
namespace mirror {

// Forward declaration: this header only passes/stores Threads<ImageCtxT>*
// pointers, so the full definition is not needed here.
template <typename> struct Threads;

namespace image_replayer {

// Forward declaration: journal::Replayer only passes/stores a
// ReplayerListener* pointer.
struct ReplayerListener;

namespace journal {

// Forward declarations of class templates that Replayer below refers to
// only through pointers (m_event_preprocessor, m_replay_status_formatter,
// m_state_builder).
template <typename> class EventPreprocessor;
template <typename> class ReplayStatusFormatter;
template <typename> class StateBuilder;

/**
 * Journal-based implementation of the image_replayer::Replayer interface.
 *
 * Instances are heap-allocated through create() and released through
 * destroy(), which performs `delete this`. Each inline status accessor
 * below acquires m_lock while it reads the member it returns.
 *
 * @tparam ImageCtxT image context type. It must provide a `journal` member
 *                   (see m_local_journal) and be usable with
 *                   librbd::journal::TypeTraits.
 */
template <typename ImageCtxT>
class Replayer : public image_replayer::Replayer {
public:
  /// Journaler type selected by the type traits of ImageCtxT.
  using Journaler = typename librbd::journal::TypeTraits<ImageCtxT>::Journaler;

  /**
   * Allocate a new Replayer with `new`, forwarding all arguments to the
   * constructor. Release the returned object with destroy().
   */
  static Replayer* create(
      Threads<ImageCtxT>* threads,
      const std::string& local_mirror_uuid,
      StateBuilder<ImageCtxT>* state_builder,
      ReplayerListener* replayer_listener) {
    return new Replayer(threads, local_mirror_uuid, state_builder,
                        replayer_listener);
  }

  /// Defined out of line. All pointer arguments are taken as raw pointers.
  Replayer(
      Threads<ImageCtxT>* threads,
      const std::string& local_mirror_uuid,
      StateBuilder<ImageCtxT>* state_builder,
      ReplayerListener* replayer_listener);
  ~Replayer();

  /// Deletes this object; it must not be used after this call returns.
  void destroy() override {
    delete this;
  }

  // Lifecycle entry points; defined out of line.
  // NOTE(review): on_finish is presumably completed once the requested
  // operation has finished -- confirm against the implementation.
  void init(Context* on_finish) override;
  void shut_down(Context* on_finish) override;

  void flush(Context* on_finish) override;

  bool get_replay_status(std::string* description, Context* on_finish) override;

  /// @return true while the state is STATE_REPLAYING (read under m_lock).
  bool is_replaying() const override {
    std::unique_lock locker{m_lock};
    return m_state == STATE_REPLAYING;
  }

  /// @return the resync-requested flag (read under m_lock).
  bool is_resync_requested() const override {
    std::unique_lock locker{m_lock};
    return m_resync_requested;
  }

  /// @return the stored error code (read under m_lock); initially 0.
  int get_error_code() const override {
    std::unique_lock locker{m_lock};
    return m_error_code;
  }

  /// @return a copy of the stored error description (read under m_lock).
  std::string get_error_description() const override {
    std::unique_lock locker{m_lock};
    return m_error_description;
  }

  /// @return a copy of the stored image spec string (read under m_lock).
  std::string get_image_spec() const {
    std::unique_lock locker{m_lock};
    return m_image_spec;
  }

private:
  /**
   * @verbatim
   *
   *  <init>
   *    |
   *    v                     (error)
   * INIT_REMOTE_JOURNALER  * * * * * * * * * * * * * * * * * * *
   *    |                                                       *
   *    v                     (error)                           *
   * START_EXTERNAL_REPLAY  * * * * * * * * * * * * * * * * * * *
   *    |                                                       *
   *    |  /--------------------------------------------\       *
   *    |  |                                            |       *
   *    v  v   (asok flush)                             |       *
   * REPLAYING -------------> LOCAL_REPLAY_FLUSH        |       *
   *    |       \                 |                     |       *
   *    |       |                 v                     |       *
   *    |       |             FLUSH_COMMIT_POSITION     |       *
   *    |       |                 |                     |       *
   *    |       |                 \--------------------/|       *
   *    |       |                                       |       *
   *    |       | (entries available)                   |       *
   *    |       \-----------> REPLAY_READY              |       *
   *    |                         |                     |       *
   *    |                         | (skip if not        |       *
   *    |                         v  needed)        (error)     *
   *    |                     REPLAY_FLUSH  * * * * * * * * *   *
   *    |                         |                     |   *   *
   *    |                         | (skip if not        |   *   *
   *    |                         v  needed)        (error) *   *
   *    |                     GET_REMOTE_TAG  * * * * * * * *   *
   *    |                         |                     |   *   *
   *    |                         | (skip if not        |   *   *
   *    |                         v  needed)        (error) *   *
   *    |                     ALLOCATE_LOCAL_TAG  * * * * * *   *
   *    |                         |                     |   *   *
   *    |                         v                 (error) *   *
   *    |                     PREPROCESS_ENTRY  * * * * * * *   *
   *    |                         |                     |   *   *
   *    |                         v                 (error) *   *
   *    |                     PROCESS_ENTRY * * * * * * * * *   *
   *    |                         |                     |   *   *
   *    |                         \---------------------/   *   *
   *    v (shutdown)                                        *   *
   * REPLAY_COMPLETE  < * * * * * * * * * * * * * * * * * * *   *
   *    |                                                       *
   *    v                                                       *
   * WAIT_FOR_FLUSH                                             *
   *    |                                                       *
   *    v                                                       *
   * SHUT_DOWN_LOCAL_JOURNAL_REPLAY                             *
   *    |                                                       *
   *    v                                                       *
   * WAIT_FOR_EVENT_REPLAY                                      *
   *    |                                                       *
   *    v                                                       *
   * CLOSE_LOCAL_IMAGE  < * * * * * * * * * * * * * * * * * * * *
   *    |
   *    v (skip if not started)
   * STOP_REMOTE_JOURNALER_REPLAY
   *    |
   *    v
   * WAIT_FOR_IN_FLIGHT_OPS
   *    |
   *    v
   * <shutdown>
   *
   * @endverbatim
   */

  /// Replay entry type selected by the type traits of ImageCtxT.
  using ReplayEntry =
    typename librbd::journal::TypeTraits<ImageCtxT>::ReplayEntry;

  /// Coarse state stored in m_state; is_replaying() tests STATE_REPLAYING.
  enum State {
    STATE_INIT,
    STATE_REPLAYING,
    STATE_COMPLETE
  };

  // Nested helper types; declared here, defined elsewhere.
  struct C_ReplayCommitted;
  struct RemoteJournalerListener;
  struct RemoteReplayHandler;
  struct LocalJournalListener;

  // Raw pointers; this header does not show who owns the pointees.
  Threads<ImageCtxT>* m_threads;
  std::string m_local_mirror_uuid;
  StateBuilder<ImageCtxT>* m_state_builder;
  ReplayerListener* m_replayer_listener;

  // mutable so that the const accessors above can lock it.
  mutable ceph::mutex m_lock;

  std::string m_image_spec;
  Context* m_on_init_shutdown = nullptr;

  State m_state = STATE_INIT;
  int m_error_code = 0;
  std::string m_error_description;
  bool m_resync_requested = false;

  // Ref-counted handle to the type that ImageCtxT::journal points at.
  ceph::ref_t<typename std::remove_pointer<decltype(ImageCtxT::journal)>::type>
    m_local_journal;
  RemoteJournalerListener* m_remote_listener = nullptr;

  librbd::journal::Replay<ImageCtxT>* m_local_journal_replay = nullptr;
  EventPreprocessor<ImageCtxT>* m_event_preprocessor = nullptr;
  ReplayStatusFormatter<ImageCtxT>* m_replay_status_formatter = nullptr;
  RemoteReplayHandler* m_remote_replay_handler = nullptr;
  LocalJournalListener* m_local_journal_listener = nullptr;

  PerfCounters *m_perf_counters = nullptr;

  // Bookkeeping for the replay entry / tag currently being handled.
  ReplayEntry m_replay_entry;
  uint64_t m_replay_bytes = 0;
  utime_t m_replay_start_time;
  bool m_replay_tag_valid = false;
  uint64_t m_replay_tag_tid = 0;
  cls::journal::Tag m_replay_tag;
  librbd::journal::TagData m_replay_tag_data;
  librbd::journal::EventEntry m_event_entry;

  AsyncOpTracker m_flush_tracker;

  AsyncOpTracker m_event_replay_tracker;
  Context *m_delayed_preprocess_task = nullptr;

  AsyncOpTracker m_in_flight_op_tracker;
  Context *m_flush_local_replay_task = nullptr;

  // The remaining private methods are defined out of line. Most are declared
  // as a step plus a handle_<step>(int r) companion; the diagram above gives
  // the intended ordering of the steps.

  void handle_remote_journal_metadata_updated();

  void schedule_flush_local_replay_task();
  void cancel_flush_local_replay_task();
  void handle_flush_local_replay_task(int r);

  void flush_local_replay(Context* on_flush);
  void handle_flush_local_replay(Context* on_flush, int r);

  void flush_commit_position(Context* on_flush);
  void handle_flush_commit_position(Context* on_flush, int r);

  void init_remote_journaler();
  void handle_init_remote_journaler(int r);

  // Overloads taking a std::unique_lock<ceph::mutex>& receive the caller's
  // lock object.
  // NOTE(review): presumably a lock on m_lock that the callee may release --
  // confirm against the implementation.
  void start_external_replay(std::unique_lock<ceph::mutex>& locker);
  void handle_start_external_replay(int r);

  bool add_local_journal_listener(std::unique_lock<ceph::mutex>& locker);

  bool notify_init_complete(std::unique_lock<ceph::mutex>& locker);

  void wait_for_flush();
  void handle_wait_for_flush(int r);

  void shut_down_local_journal_replay();
  void handle_shut_down_local_journal_replay(int r);

  void wait_for_event_replay();
  void handle_wait_for_event_replay(int r);

  void close_local_image();
  void handle_close_local_image(int r);

  void stop_remote_journaler_replay();
  void handle_stop_remote_journaler_replay(int r);

  void wait_for_in_flight_ops();
  void handle_wait_for_in_flight_ops(int r);

  void replay_flush();
  void handle_replay_flush_shut_down(int r);
  void handle_replay_flush(int r);

  void get_remote_tag();
  void handle_get_remote_tag(int r);

  void allocate_local_tag();
  void handle_allocate_local_tag(int r);

  void handle_replay_error(int r, const std::string &error);

  bool is_replay_complete() const;
  bool is_replay_complete(const std::unique_lock<ceph::mutex>& locker) const;

  void handle_replay_complete(int r, const std::string &error_desc);
  void handle_replay_complete(const std::unique_lock<ceph::mutex>&,
                              int r, const std::string &error_desc);
  void handle_replay_ready();
  void handle_replay_ready(std::unique_lock<ceph::mutex>& locker);

  void preprocess_entry();
  void handle_delayed_preprocess_task(int r);
  void handle_preprocess_entry_ready(int r);
  void handle_preprocess_entry_safe(int r);

  void process_entry();
  void handle_process_entry_ready(int r);
  void handle_process_entry_safe(const ReplayEntry& replay_entry,
                                 uint64_t replay_bytes,
                                 const utime_t &replay_start_time, int r);

  void handle_resync_image();

  void notify_status_updated();

  void cancel_delayed_preprocess_task();

  // Results are reported through the pointer out-parameters and the int
  // return value.
  int validate_remote_client_state(
      const cls::journal::Client& remote_client,
      librbd::journal::MirrorPeerClientMeta* remote_client_meta,
      bool* resync_requested, std::string* error);

  void register_perf_counters();
  void unregister_perf_counters();

};

} // namespace journal
} // namespace image_replayer
} // namespace mirror
} // namespace rbd

// Explicit instantiation declaration: translation units including this
// header do not implicitly instantiate Replayer<librbd::ImageCtx>; a matching
// explicit instantiation definition must exist elsewhere.
// NOTE(review): presumably in the corresponding Replayer.cc -- confirm.
extern template class rbd::mirror::image_replayer::journal::Replayer<librbd::ImageCtx>;

#endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H