diff options
Diffstat (limited to 'src/tools/cephfs_mirror/Watcher.h')
-rw-r--r-- | src/tools/cephfs_mirror/Watcher.h | 102 |
1 files changed, 102 insertions, 0 deletions
diff --git a/src/tools/cephfs_mirror/Watcher.h b/src/tools/cephfs_mirror/Watcher.h new file mode 100644 index 000000000..9e7c54eeb --- /dev/null +++ b/src/tools/cephfs_mirror/Watcher.h @@ -0,0 +1,102 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_WATCHER_H +#define CEPHFS_MIRROR_WATCHER_H + +#include <string_view> + +#include "common/ceph_mutex.h" +#include "include/Context.h" +#include "include/rados/librados.hpp" + +class ContextWQ; + +namespace cephfs { +namespace mirror { + +// generic watcher class -- establish watch on a given rados object +// and invoke handle_notify() when notified. On notify error, try +// to re-establish the watch. Errors during rewatch are notified via +// handle_rewatch_complete(). + +class Watcher { +public: + Watcher(librados::IoCtx &ioctx, std::string_view oid, ContextWQ *work_queue); + virtual ~Watcher(); + + void register_watch(Context *on_finish); + void unregister_watch(Context *on_finish); + +protected: + std::string m_oid; + + void acknowledge_notify(uint64_t notify_if, uint64_t handle, bufferlist &bl); + + bool is_registered() const { + return m_state == STATE_IDLE && m_watch_handle != 0; + } + bool is_unregistered() const { + return m_state == STATE_IDLE && m_watch_handle == 0; + } + + virtual void handle_rewatch_complete(int r) { } + +private: + enum State { + STATE_IDLE, + STATE_REGISTERING, + STATE_REWATCHING + }; + + struct WatchCtx : public librados::WatchCtx2 { + Watcher &watcher; + + WatchCtx(Watcher &parent) : watcher(parent) {} + + void handle_notify(uint64_t notify_id, + uint64_t handle, + uint64_t notifier_id, + bufferlist& bl) override; + void handle_error(uint64_t handle, int err) override; + }; + + struct C_RegisterWatch : public Context { + Watcher *watcher; + Context *on_finish; + + C_RegisterWatch(Watcher *watcher, Context *on_finish) + : watcher(watcher), + on_finish(on_finish) { + } + + void finish(int r) override { + watcher->handle_register_watch(r, on_finish); + } + }; + + librados::IoCtx &m_ioctx; + ContextWQ *m_work_queue; + + mutable ceph::shared_mutex m_lock; + State m_state; + bool m_watch_error = false; + bool m_watch_blocklisted = false; + uint64_t m_watch_handle; + WatchCtx m_watch_ctx; + Context *m_unregister_watch_ctx = nullptr; + + virtual void handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, bufferlist& bl) = 0; + void handle_error(uint64_t handle, int err); + + void rewatch(); + void handle_rewatch(int r); + void handle_rewatch_callback(int r); + void handle_register_watch(int r, Context *on_finish); +}; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_WATCHER_H |