1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_LIBRBD_WATCHER_H
#define CEPH_LIBRBD_WATCHER_H
#include "common/AsyncOpTracker.h"
#include "common/ceph_mutex.h"
#include "common/RWLock.h"
#include "include/rados/librados.hpp"
#include "librbd/watcher/Notifier.h"
#include "librbd/watcher/Types.h"
#include <string>
#include <utility>
namespace librbd {
namespace asio { struct ContextWQ; }
namespace watcher { struct NotifyResponse; }
class Watcher {
public:
struct C_NotifyAck : public Context {
Watcher *watcher;
CephContext *cct;
uint64_t notify_id;
uint64_t handle;
bufferlist out;
C_NotifyAck(Watcher *watcher, uint64_t notify_id, uint64_t handle);
void finish(int r) override;
};
Watcher(librados::IoCtx& ioctx, asio::ContextWQ *work_queue,
const std::string& oid);
virtual ~Watcher();
void register_watch(Context *on_finish);
virtual void unregister_watch(Context *on_finish);
void flush(Context *on_finish);
bool notifications_blocked() const;
virtual void block_notifies(Context *on_finish);
void unblock_notifies();
std::string get_oid() const;
void set_oid(const string& oid);
uint64_t get_watch_handle() const {
std::shared_lock watch_locker{m_watch_lock};
return m_watch_handle;
}
bool is_registered() const {
std::shared_lock locker{m_watch_lock};
return is_registered(m_watch_lock);
}
bool is_unregistered() const {
std::shared_lock locker{m_watch_lock};
return is_unregistered(m_watch_lock);
}
bool is_blocklisted() const {
std::shared_lock locker{m_watch_lock};
return m_watch_blocklisted;
}
protected:
enum WatchState {
WATCH_STATE_IDLE,
WATCH_STATE_REGISTERING,
WATCH_STATE_REWATCHING
};
librados::IoCtx& m_ioctx;
asio::ContextWQ *m_work_queue;
std::string m_oid;
CephContext *m_cct;
mutable ceph::shared_mutex m_watch_lock;
uint64_t m_watch_handle;
watcher::Notifier m_notifier;
WatchState m_watch_state;
bool m_watch_blocklisted = false;
AsyncOpTracker m_async_op_tracker;
bool is_registered(const ceph::shared_mutex&) const {
return (m_watch_state == WATCH_STATE_IDLE && m_watch_handle != 0);
}
bool is_unregistered(const ceph::shared_mutex&) const {
return (m_watch_state == WATCH_STATE_IDLE && m_watch_handle == 0);
}
void send_notify(bufferlist &payload,
watcher::NotifyResponse *response = nullptr,
Context *on_finish = nullptr);
virtual void handle_notify(uint64_t notify_id, uint64_t handle,
uint64_t notifier_id, bufferlist &bl) = 0;
virtual void handle_error(uint64_t cookie, int err);
void acknowledge_notify(uint64_t notify_id, uint64_t handle,
bufferlist &out);
virtual void handle_rewatch_complete(int r) { }
private:
/**
* @verbatim
*
* <start>
* |
* v
* UNREGISTERED
* |
* | (register_watch)
* |
* REGISTERING
* |
* v (watch error)
* REGISTERED * * * * * * * > ERROR
* | ^ |
* | | | (rewatch)
* | | v
* | | REWATCHING
* | | |
* | | |
* | \---------------------/
* |
* | (unregister_watch)
* |
* v
* UNREGISTERED
* |
* v
* <finish>
*
* @endverbatim
*/
struct WatchCtx : public librados::WatchCtx2 {
Watcher &watcher;
WatchCtx(Watcher &parent) : watcher(parent) {}
void handle_notify(uint64_t notify_id,
uint64_t handle,
uint64_t notifier_id,
bufferlist& bl) override;
void handle_error(uint64_t handle, int err) override;
};
struct C_RegisterWatch : public Context {
Watcher *watcher;
Context *on_finish;
C_RegisterWatch(Watcher *watcher, Context *on_finish)
: watcher(watcher), on_finish(on_finish) {
}
void finish(int r) override {
watcher->handle_register_watch(r, on_finish);
}
};
WatchCtx m_watch_ctx;
Context *m_unregister_watch_ctx = nullptr;
bool m_watch_error = false;
uint32_t m_blocked_count = 0;
void handle_register_watch(int r, Context *on_finish);
void rewatch();
void handle_rewatch(int r);
void handle_rewatch_callback(int r);
};
} // namespace librbd
#endif // CEPH_LIBRBD_WATCHER_H
|