1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
#define CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
#include "common/AsyncOpTracker.h"
#include "common/ceph_mutex.h"
#include "include/rados/librados.hpp"
#include "cls/rbd/cls_rbd_types.h"
#include "ProgressContext.h"
#include "tools/rbd_mirror/Types.h"
#include "tools/rbd_mirror/image_replayer/Types.h"
#include <boost/optional.hpp>
#include <string>
class AdminSocketHook;
namespace journal { struct CacheManagerHandler; }
namespace librbd { class ImageCtx; }
namespace rbd {
namespace mirror {
template <typename> struct InstanceWatcher;
template <typename> struct MirrorStatusUpdater;
struct PoolMetaCache;
template <typename> struct Threads;
namespace image_replayer {
class Replayer;
template <typename> class BootstrapRequest;
template <typename> class StateBuilder;
} // namespace image_replayer
/**
* Replays changes from a remote cluster for a single image.
*/
template <typename ImageCtxT = librbd::ImageCtx>
class ImageReplayer {
public:
static ImageReplayer *create(
librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
const std::string &global_image_id, Threads<ImageCtxT> *threads,
InstanceWatcher<ImageCtxT> *instance_watcher,
MirrorStatusUpdater<ImageCtxT>* local_status_updater,
journal::CacheManagerHandler *cache_manager_handler,
PoolMetaCache* pool_meta_cache) {
return new ImageReplayer(local_io_ctx, local_mirror_uuid, global_image_id,
threads, instance_watcher, local_status_updater,
cache_manager_handler, pool_meta_cache);
}
void destroy() {
delete this;
}
ImageReplayer(librados::IoCtx &local_io_ctx,
const std::string &local_mirror_uuid,
const std::string &global_image_id,
Threads<ImageCtxT> *threads,
InstanceWatcher<ImageCtxT> *instance_watcher,
MirrorStatusUpdater<ImageCtxT>* local_status_updater,
journal::CacheManagerHandler *cache_manager_handler,
PoolMetaCache* pool_meta_cache);
virtual ~ImageReplayer();
ImageReplayer(const ImageReplayer&) = delete;
ImageReplayer& operator=(const ImageReplayer&) = delete;
bool is_stopped() { std::lock_guard l{m_lock}; return is_stopped_(); }
bool is_running() { std::lock_guard l{m_lock}; return is_running_(); }
bool is_replaying() { std::lock_guard l{m_lock}; return is_replaying_(); }
std::string get_name() { std::lock_guard l{m_lock}; return m_image_spec; };
void set_state_description(int r, const std::string &desc);
// TODO temporary until policy handles release of image replayers
inline bool is_finished() const {
std::lock_guard locker{m_lock};
return m_finished;
}
inline void set_finished(bool finished) {
std::lock_guard locker{m_lock};
m_finished = finished;
}
inline bool is_blocklisted() const {
std::lock_guard locker{m_lock};
return (m_last_r == -EBLOCKLISTED);
}
image_replayer::HealthState get_health_state() const;
void add_peer(const Peer<ImageCtxT>& peer);
inline int64_t get_local_pool_id() const {
return m_local_io_ctx.get_id();
}
inline const std::string& get_global_image_id() const {
return m_global_image_id;
}
void start(Context *on_finish, bool manual = false, bool restart = false);
void stop(Context *on_finish, bool manual = false, bool restart = false);
void restart(Context *on_finish = nullptr);
void flush();
void print_status(Formatter *f);
protected:
/**
* @verbatim
* (error)
* <uninitialized> <------------------------------------ FAIL
* | ^
* v *
* <starting> *
* | *
* v (error) *
* BOOTSTRAP_IMAGE * * * * * * * * * * * * * * * * * * * *
* | *
* v (error) *
* START_REPLAY * * * * * * * * * * * * * * * * * * * * * *
* |
* v
* REPLAYING
* |
* v
* JOURNAL_REPLAY_SHUT_DOWN
* |
* v
* LOCAL_IMAGE_CLOSE
* |
* v
* <stopped>
*
* @endverbatim
*/
void on_start_fail(int r, const std::string &desc);
bool on_start_interrupted();
bool on_start_interrupted(ceph::mutex& lock);
void on_stop_journal_replay(int r = 0, const std::string &desc = "");
bool on_replay_interrupted();
private:
typedef std::set<Peer<ImageCtxT>> Peers;
typedef std::list<Context *> Contexts;
enum State {
STATE_UNKNOWN,
STATE_STARTING,
STATE_REPLAYING,
STATE_STOPPING,
STATE_STOPPED,
};
struct ReplayerListener;
typedef boost::optional<State> OptionalState;
typedef boost::optional<cls::rbd::MirrorImageStatusState>
OptionalMirrorImageStatusState;
class BootstrapProgressContext : public ProgressContext {
public:
BootstrapProgressContext(ImageReplayer<ImageCtxT> *replayer) :
replayer(replayer) {
}
void update_progress(const std::string &description,
bool flush = true) override;
private:
ImageReplayer<ImageCtxT> *replayer;
};
librados::IoCtx &m_local_io_ctx;
std::string m_local_mirror_uuid;
std::string m_global_image_id;
Threads<ImageCtxT> *m_threads;
InstanceWatcher<ImageCtxT> *m_instance_watcher;
MirrorStatusUpdater<ImageCtxT>* m_local_status_updater;
journal::CacheManagerHandler *m_cache_manager_handler;
PoolMetaCache* m_pool_meta_cache;
Peers m_peers;
Peer<ImageCtxT> m_remote_image_peer;
std::string m_local_image_name;
std::string m_image_spec;
mutable ceph::mutex m_lock;
State m_state = STATE_STOPPED;
std::string m_state_desc;
OptionalMirrorImageStatusState m_mirror_image_status_state =
boost::make_optional(false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
int m_last_r = 0;
BootstrapProgressContext m_progress_cxt;
bool m_finished = false;
bool m_delete_in_progress = false;
bool m_delete_requested = false;
bool m_resync_requested = false;
bool m_restart_requested = false;
bool m_status_removed = false;
image_replayer::StateBuilder<ImageCtxT>* m_state_builder = nullptr;
image_replayer::Replayer* m_replayer = nullptr;
ReplayerListener* m_replayer_listener = nullptr;
Context *m_on_start_finish = nullptr;
Contexts m_on_stop_contexts;
bool m_stop_requested = false;
bool m_manual_stop = false;
AdminSocketHook *m_asok_hook = nullptr;
image_replayer::BootstrapRequest<ImageCtxT> *m_bootstrap_request = nullptr;
AsyncOpTracker m_in_flight_op_tracker;
Context* m_update_status_task = nullptr;
static std::string to_string(const State state);
bool is_stopped_() const {
return m_state == STATE_STOPPED;
}
bool is_running_() const {
return !is_stopped_() && m_state != STATE_STOPPING && !m_stop_requested;
}
bool is_replaying_() const {
return (m_state == STATE_REPLAYING);
}
void schedule_update_mirror_image_replay_status();
void handle_update_mirror_image_replay_status(int r);
void cancel_update_mirror_image_replay_status();
void update_mirror_image_status(bool force, const OptionalState &state);
void set_mirror_image_status_update(bool force, const OptionalState &state);
void shut_down(int r);
void handle_shut_down(int r);
void bootstrap();
void handle_bootstrap(int r);
void start_replay();
void handle_start_replay(int r);
void handle_replayer_notification();
void register_admin_socket_hook();
void unregister_admin_socket_hook();
void reregister_admin_socket_hook();
void remove_image_status(bool force, Context *on_finish);
void remove_image_status_remote(bool force, Context *on_finish);
};
} // namespace mirror
} // namespace rbd
extern template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;
#endif // CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
|