1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_RBD_MIRROR_IMAGE_MAP_H
#define CEPH_RBD_MIRROR_IMAGE_MAP_H
#include <vector>
#include "common/ceph_mutex.h"
#include "include/Context.h"
#include "common/AsyncOpTracker.h"
#include "cls/rbd/cls_rbd_types.h"
#include "include/rados/librados.hpp"
#include "image_map/Policy.h"
#include "image_map/Types.h"
namespace librbd { class ImageCtx; }
namespace rbd {
namespace mirror {
template <typename> struct Threads;
template <typename ImageCtxT = librbd::ImageCtx>
class ImageMap {
public:
static ImageMap *create(librados::IoCtx &ioctx, Threads<ImageCtxT> *threads,
const std::string& instance_id,
image_map::Listener &listener) {
return new ImageMap(ioctx, threads, instance_id, listener);
}
~ImageMap();
// init (load) the instance map from disk
void init(Context *on_finish);
// shut down map operations
void shut_down(Context *on_finish);
// update (add/remove) images
void update_images(const std::string &peer_uuid,
std::set<std::string> &&added_global_image_ids,
std::set<std::string> &&removed_global_image_ids);
// add/remove instances
void update_instances_added(const std::vector<std::string> &instances);
void update_instances_removed(const std::vector<std::string> &instances);
private:
struct C_NotifyInstance;
ImageMap(librados::IoCtx &ioctx, Threads<ImageCtxT> *threads,
const std::string& instance_id, image_map::Listener &listener);
struct Update {
std::string global_image_id;
std::string instance_id;
utime_t mapped_time;
Update(const std::string &global_image_id, const std::string &instance_id,
utime_t mapped_time)
: global_image_id(global_image_id),
instance_id(instance_id),
mapped_time(mapped_time) {
}
Update(const std::string &global_image_id, const std::string &instance_id)
: Update(global_image_id, instance_id, ceph_clock_now()) {
}
friend std::ostream& operator<<(std::ostream& os,
const Update& update) {
os << "{global_image_id=" << update.global_image_id << ", "
<< "instance_id=" << update.instance_id << "}";
return os;
}
};
typedef std::list<Update> Updates;
// Lock ordering: m_threads->timer_lock, m_lock
librados::IoCtx &m_ioctx;
Threads<ImageCtxT> *m_threads;
std::string m_instance_id;
image_map::Listener &m_listener;
std::unique_ptr<image_map::Policy> m_policy; // our mapping policy
Context *m_timer_task = nullptr;
ceph::mutex m_lock;
bool m_shutting_down = false;
AsyncOpTracker m_async_op_tracker;
// global_image_id -> registered peers ("" == local, remote otherwise)
std::map<std::string, std::set<std::string> > m_peer_map;
std::set<std::string> m_global_image_ids;
Context *m_rebalance_task = nullptr;
struct C_LoadMap : Context {
ImageMap *image_map;
Context *on_finish;
std::map<std::string, cls::rbd::MirrorImageMap> image_mapping;
C_LoadMap(ImageMap *image_map, Context *on_finish)
: image_map(image_map),
on_finish(on_finish) {
}
void finish(int r) override {
if (r == 0) {
image_map->handle_load(image_mapping);
}
image_map->finish_async_op();
on_finish->complete(r);
}
};
// async op-tracker helper routines
void start_async_op() {
m_async_op_tracker.start_op();
}
void finish_async_op() {
m_async_op_tracker.finish_op();
}
void wait_for_async_ops(Context *on_finish) {
m_async_op_tracker.wait_for_ops(on_finish);
}
void handle_peer_ack(const std::string &global_image_id, int r);
void handle_peer_ack_remove(const std::string &global_image_id, int r);
void handle_load(const std::map<std::string, cls::rbd::MirrorImageMap> &image_mapping);
void handle_update_request(const Updates &updates,
const std::set<std::string> &remove_global_image_ids, int r);
// continue (retry or resume depending on state machine) processing
// current action.
void continue_action(const std::set<std::string> &global_image_ids, int r);
// schedule an image for update
void schedule_action(const std::string &global_image_id);
void schedule_update_task();
void schedule_update_task(const ceph::mutex &timer_lock);
void process_updates();
void update_image_mapping(Updates&& map_updates,
std::set<std::string>&& map_removals);
void rebalance();
void schedule_rebalance_task();
void notify_listener_acquire_release_images(const Updates &acquire, const Updates &release);
void notify_listener_remove_images(const std::string &peer_uuid, const Updates &remove);
void update_images_added(const std::string &peer_uuid,
const std::set<std::string> &global_image_ids);
void update_images_removed(const std::string &peer_uuid,
const std::set<std::string> &global_image_ids);
void filter_instance_ids(const std::vector<std::string> &instance_ids,
std::vector<std::string> *filtered_instance_ids,
bool removal) const;
};
} // namespace mirror
} // namespace rbd
#endif // CEPH_RBD_MIRROR_IMAGE_MAP_H
|