summaryrefslogtreecommitdiffstats
path: root/src/librbd/watcher
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/librbd/watcher/Notifier.cc99
-rw-r--r--src/librbd/watcher/Notifier.h64
-rw-r--r--src/librbd/watcher/RewatchRequest.cc108
-rw-r--r--src/librbd/watcher/RewatchRequest.h75
-rw-r--r--src/librbd/watcher/Types.cc45
-rw-r--r--src/librbd/watcher/Types.h71
-rw-r--r--src/librbd/watcher/Utils.h74
7 files changed, 536 insertions, 0 deletions
diff --git a/src/librbd/watcher/Notifier.cc b/src/librbd/watcher/Notifier.cc
new file mode 100644
index 000000000..9a4134402
--- /dev/null
+++ b/src/librbd/watcher/Notifier.cc
@@ -0,0 +1,99 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/watcher/Notifier.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Utils.h"
+#include "librbd/asio/ContextWQ.h"
+#include "librbd/watcher/Types.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::watcher::Notifier: " \
+ << this << " " << __func__ << ": "
+
+namespace librbd {
+namespace watcher {
+
+const uint64_t Notifier::NOTIFY_TIMEOUT = 5000;
+
+Notifier::C_AioNotify::C_AioNotify(Notifier *notifier, NotifyResponse *response,
+ Context *on_finish)
+ : notifier(notifier), response(response), on_finish(on_finish) {
+}
+
+void Notifier::C_AioNotify::finish(int r) {
+ if (response != nullptr) {
+ if (r == 0 || r == -ETIMEDOUT) {
+ try {
+ auto it = out_bl.cbegin();
+ decode(*response, it);
+ } catch (const buffer::error &err) {
+ r = -EBADMSG;
+ }
+ }
+ }
+ notifier->handle_notify(r, on_finish);
+}
+
+Notifier::Notifier(asio::ContextWQ *work_queue, IoCtx &ioctx,
+ const std::string &oid)
+ : m_work_queue(work_queue), m_ioctx(ioctx), m_oid(oid),
+ m_aio_notify_lock(ceph::make_mutex(util::unique_lock_name(
+ "librbd::object_watcher::Notifier::m_aio_notify_lock", this))) {
+ m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+}
+
+Notifier::~Notifier() {
+ std::lock_guard aio_notify_locker{m_aio_notify_lock};
+ ceph_assert(m_pending_aio_notifies == 0);
+}
+
+void Notifier::flush(Context *on_finish) {
+ std::lock_guard aio_notify_locker{m_aio_notify_lock};
+ if (m_pending_aio_notifies == 0) {
+ m_work_queue->queue(on_finish, 0);
+ return;
+ }
+
+ m_aio_notify_flush_ctxs.push_back(on_finish);
+}
+
+void Notifier::notify(bufferlist &bl, NotifyResponse *response,
+ Context *on_finish) {
+ {
+ std::lock_guard aio_notify_locker{m_aio_notify_lock};
+ ++m_pending_aio_notifies;
+
+ ldout(m_cct, 20) << "pending=" << m_pending_aio_notifies << dendl;
+ }
+
+ C_AioNotify *ctx = new C_AioNotify(this, response, on_finish);
+ librados::AioCompletion *comp = util::create_rados_callback(ctx);
+ int r = m_ioctx.aio_notify(m_oid, comp, bl, NOTIFY_TIMEOUT, &ctx->out_bl);
+ ceph_assert(r == 0);
+ comp->release();
+}
+
+void Notifier::handle_notify(int r, Context *on_finish) {
+ ldout(m_cct, 20) << "r=" << r << dendl;
+
+ std::lock_guard aio_notify_locker{m_aio_notify_lock};
+ ceph_assert(m_pending_aio_notifies > 0);
+ --m_pending_aio_notifies;
+
+ ldout(m_cct, 20) << "pending=" << m_pending_aio_notifies << dendl;
+ if (m_pending_aio_notifies == 0) {
+ for (auto ctx : m_aio_notify_flush_ctxs) {
+ m_work_queue->queue(ctx, 0);
+ }
+ m_aio_notify_flush_ctxs.clear();
+ }
+
+ if (on_finish != nullptr) {
+ m_work_queue->queue(on_finish, r);
+ }
+}
+
+} // namespace watcher
+} // namespace librbd
diff --git a/src/librbd/watcher/Notifier.h b/src/librbd/watcher/Notifier.h
new file mode 100644
index 000000000..79546b505
--- /dev/null
+++ b/src/librbd/watcher/Notifier.h
@@ -0,0 +1,64 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_WATCHER_NOTIFIER_H
+#define CEPH_LIBRBD_WATCHER_NOTIFIER_H
+
+#include "include/int_types.h"
+#include "include/buffer_fwd.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "common/ceph_mutex.h"
+#include <list>
+
+namespace librbd {
+
+namespace asio { struct ContextWQ; }
+
+namespace watcher {
+
+struct NotifyResponse;
+
+class Notifier {
+public:
+ static const uint64_t NOTIFY_TIMEOUT;
+
+ Notifier(asio::ContextWQ *work_queue, librados::IoCtx &ioctx,
+ const std::string &oid);
+ ~Notifier();
+
+ void flush(Context *on_finish);
+ void notify(bufferlist &bl, NotifyResponse *response, Context *on_finish);
+
+private:
+ typedef std::list<Context*> Contexts;
+
+ struct C_AioNotify : public Context {
+ Notifier *notifier;
+ NotifyResponse *response;
+ Context *on_finish;
+ bufferlist out_bl;
+
+ C_AioNotify(Notifier *notifier, NotifyResponse *response,
+ Context *on_finish);
+
+ void finish(int r) override;
+ };
+
+ asio::ContextWQ *m_work_queue;
+ librados::IoCtx &m_ioctx;
+ CephContext *m_cct;
+ std::string m_oid;
+
+ ceph::mutex m_aio_notify_lock;
+ size_t m_pending_aio_notifies = 0;
+ Contexts m_aio_notify_flush_ctxs;
+
+ void handle_notify(int r, Context *on_finish);
+
+};
+
+} // namespace watcher
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_WATCHER_NOTIFIER_H
diff --git a/src/librbd/watcher/RewatchRequest.cc b/src/librbd/watcher/RewatchRequest.cc
new file mode 100644
index 000000000..b890cb3c5
--- /dev/null
+++ b/src/librbd/watcher/RewatchRequest.cc
@@ -0,0 +1,108 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/watcher/RewatchRequest.h"
+#include "common/ceph_mutex.h"
+#include "common/errno.h"
+#include "librbd/Utils.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::watcher::RewatchRequest: " \
+ << this << " " << __func__ << " "
+
+namespace librbd {
+
+using util::create_context_callback;
+using util::create_rados_callback;
+
+namespace watcher {
+
+using std::string;
+
+RewatchRequest::RewatchRequest(librados::IoCtx& ioctx, const string& oid,
+ ceph::shared_mutex &watch_lock,
+ librados::WatchCtx2 *watch_ctx,
+ uint64_t *watch_handle, Context *on_finish)
+ : m_ioctx(ioctx), m_oid(oid), m_watch_lock(watch_lock),
+ m_watch_ctx(watch_ctx), m_watch_handle(watch_handle),
+ m_on_finish(on_finish) {
+}
+
+void RewatchRequest::send() {
+ unwatch();
+}
+
+void RewatchRequest::unwatch() {
+ ceph_assert(ceph_mutex_is_wlocked(m_watch_lock));
+ if (*m_watch_handle == 0) {
+ rewatch();
+ return;
+ }
+
+ CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+ ldout(cct, 10) << dendl;
+
+ uint64_t watch_handle = 0;
+ std::swap(*m_watch_handle, watch_handle);
+
+ librados::AioCompletion *aio_comp = create_rados_callback<
+ RewatchRequest, &RewatchRequest::handle_unwatch>(this);
+ int r = m_ioctx.aio_unwatch(watch_handle, aio_comp);
+ ceph_assert(r == 0);
+ aio_comp->release();
+}
+
+void RewatchRequest::handle_unwatch(int r) {
+ CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+ ldout(cct, 10) << "r=" << r << dendl;
+
+ if (r == -EBLOCKLISTED) {
+ lderr(cct) << "client blocklisted" << dendl;
+ finish(r);
+ return;
+ } else if (r < 0) {
+ lderr(cct) << "failed to unwatch: " << cpp_strerror(r) << dendl;
+ }
+ rewatch();
+}
+
+void RewatchRequest::rewatch() {
+ CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+ ldout(cct, 10) << dendl;
+
+ librados::AioCompletion *aio_comp = create_rados_callback<
+ RewatchRequest, &RewatchRequest::handle_rewatch>(this);
+ int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_rewatch_handle, m_watch_ctx);
+ ceph_assert(r == 0);
+ aio_comp->release();
+}
+
+void RewatchRequest::handle_rewatch(int r) {
+ CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+ ldout(cct, 10) << "r=" << r << dendl;
+ if (r < 0) {
+ lderr(cct) << "failed to watch object: " << cpp_strerror(r)
+ << dendl;
+ m_rewatch_handle = 0;
+ }
+
+ {
+ std::unique_lock watch_locker{m_watch_lock};
+ *m_watch_handle = m_rewatch_handle;
+ }
+
+ finish(r);
+}
+
+void RewatchRequest::finish(int r) {
+ CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+ ldout(cct, 10) << "r=" << r << dendl;
+
+ m_on_finish->complete(r);
+ delete this;
+}
+
+} // namespace watcher
+} // namespace librbd
+
diff --git a/src/librbd/watcher/RewatchRequest.h b/src/librbd/watcher/RewatchRequest.h
new file mode 100644
index 000000000..ce5e31539
--- /dev/null
+++ b/src/librbd/watcher/RewatchRequest.h
@@ -0,0 +1,75 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_WATCHER_REWATCH_REQUEST_H
+#define CEPH_LIBRBD_WATCHER_REWATCH_REQUEST_H
+
+#include "common/ceph_mutex.h"
+#include "include/int_types.h"
+#include "include/rados/librados.hpp"
+
+struct Context;
+
+namespace librbd {
+
+namespace watcher {
+
+class RewatchRequest {
+public:
+
+ static RewatchRequest *create(librados::IoCtx& ioctx, const std::string& oid,
+ ceph::shared_mutex &watch_lock,
+ librados::WatchCtx2 *watch_ctx,
+ uint64_t *watch_handle, Context *on_finish) {
+ return new RewatchRequest(ioctx, oid, watch_lock, watch_ctx, watch_handle,
+ on_finish);
+ }
+
+ RewatchRequest(librados::IoCtx& ioctx, const std::string& oid,
+ ceph::shared_mutex &watch_lock, librados::WatchCtx2 *watch_ctx,
+ uint64_t *watch_handle, Context *on_finish);
+
+ void send();
+
+private:
+ /**
+ * @verbatim
+ *
+ * <start>
+ * |
+ * v
+ * UNWATCH
+ * |
+ * | . . . .
+ * | . . (recoverable error)
+ * v v .
+ * REWATCH . . .
+ * |
+ * v
+ * <finish>
+ *
+ * @endverbatim
+ */
+
+ librados::IoCtx& m_ioctx;
+ std::string m_oid;
+ ceph::shared_mutex &m_watch_lock;
+ librados::WatchCtx2 *m_watch_ctx;
+ uint64_t *m_watch_handle;
+ Context *m_on_finish;
+
+ uint64_t m_rewatch_handle = 0;
+
+ void unwatch();
+ void handle_unwatch(int r);
+
+ void rewatch();
+ void handle_rewatch(int r);
+
+ void finish(int r);
+};
+
+} // namespace watcher
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_WATCHER_REWATCH_REQUEST_H
diff --git a/src/librbd/watcher/Types.cc b/src/librbd/watcher/Types.cc
new file mode 100644
index 000000000..8f1991d7b
--- /dev/null
+++ b/src/librbd/watcher/Types.cc
@@ -0,0 +1,45 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/watcher/Types.h"
+#include "common/Formatter.h"
+
+namespace librbd {
+namespace watcher {
+
+void ClientId::encode(bufferlist &bl) const {
+ using ceph::encode;
+ encode(gid, bl);
+ encode(handle, bl);
+}
+
+void ClientId::decode(bufferlist::const_iterator &iter) {
+ using ceph::decode;
+ decode(gid, iter);
+ decode(handle, iter);
+}
+
+void ClientId::dump(Formatter *f) const {
+ f->dump_unsigned("gid", gid);
+ f->dump_unsigned("handle", handle);
+}
+
+void NotifyResponse::encode(bufferlist& bl) const {
+ using ceph::encode;
+ encode(acks, bl);
+ encode(timeouts, bl);
+}
+
+void NotifyResponse::decode(bufferlist::const_iterator& iter) {
+ using ceph::decode;
+ decode(acks, iter);
+ decode(timeouts, iter);
+}
+std::ostream &operator<<(std::ostream &out,
+ const ClientId &client_id) {
+ out << "[" << client_id.gid << "," << client_id.handle << "]";
+ return out;
+}
+
+} // namespace watcher
+} // namespace librbd
diff --git a/src/librbd/watcher/Types.h b/src/librbd/watcher/Types.h
new file mode 100644
index 000000000..d1517fb0f
--- /dev/null
+++ b/src/librbd/watcher/Types.h
@@ -0,0 +1,71 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_WATCHER_TYPES_H
+#define CEPH_LIBRBD_WATCHER_TYPES_H
+
+#include "include/int_types.h"
+#include "include/buffer_fwd.h"
+#include "include/encoding.h"
+
+namespace ceph { class Formatter; }
+
+namespace librbd {
+
+class Watcher;
+
+namespace watcher {
+
+struct ClientId {
+ uint64_t gid;
+ uint64_t handle;
+
+ ClientId() : gid(0), handle(0) {}
+ ClientId(uint64_t gid, uint64_t handle) : gid(gid), handle(handle) {}
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+
+ inline bool is_valid() const {
+ return (*this != ClientId());
+ }
+
+ inline bool operator==(const ClientId &rhs) const {
+ return (gid == rhs.gid && handle == rhs.handle);
+ }
+ inline bool operator!=(const ClientId &rhs) const {
+ return !(*this == rhs);
+ }
+ inline bool operator<(const ClientId &rhs) const {
+ if (gid != rhs.gid) {
+ return gid < rhs.gid;
+ } else {
+ return handle < rhs.handle;
+ }
+ }
+};
+
+struct NotifyResponse {
+ std::map<ClientId, bufferlist> acks;
+ std::vector<ClientId> timeouts;
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::const_iterator& it);
+};
+
+template <typename ImageCtxT>
+struct Traits {
+ typedef librbd::Watcher Watcher;
+};
+
+std::ostream &operator<<(std::ostream &out,
+ const ClientId &client);
+
+WRITE_CLASS_ENCODER(ClientId);
+WRITE_CLASS_ENCODER(NotifyResponse);
+
+} // namespace watcher
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_WATCHER_TYPES_H
diff --git a/src/librbd/watcher/Utils.h b/src/librbd/watcher/Utils.h
new file mode 100644
index 000000000..d2510aaf3
--- /dev/null
+++ b/src/librbd/watcher/Utils.h
@@ -0,0 +1,74 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_WATCHER_UTILS_H
+#define CEPH_LIBRBD_WATCHER_UTILS_H
+
+#include "include/buffer_fwd.h"
+#include "include/encoding.h"
+#include "include/Context.h"
+#include "librbd/Watcher.h"
+
+namespace ceph { class Formatter; }
+
+namespace librbd {
+namespace watcher {
+namespace util {
+
+template <typename Watcher>
+struct HandlePayloadVisitor : public boost::static_visitor<void> {
+ Watcher *watcher;
+ uint64_t notify_id;
+ uint64_t handle;
+
+ HandlePayloadVisitor(Watcher *watcher_, uint64_t notify_id_,
+ uint64_t handle_)
+ : watcher(watcher_), notify_id(notify_id_), handle(handle_)
+ {
+ }
+
+ template <typename P>
+ inline void operator()(const P &payload) const {
+ typename Watcher::C_NotifyAck *ctx =
+ new typename Watcher::C_NotifyAck(watcher, notify_id, handle);
+ if (watcher->handle_payload(payload, ctx)) {
+ ctx->complete(0);
+ }
+ }
+};
+
+class EncodePayloadVisitor : public boost::static_visitor<void> {
+public:
+ explicit EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {}
+
+ template <typename P>
+ inline void operator()(const P &payload) const {
+ using ceph::encode;
+ encode(static_cast<uint32_t>(P::NOTIFY_OP), m_bl);
+ payload.encode(m_bl);
+ }
+
+private:
+ bufferlist &m_bl;
+};
+
+class DecodePayloadVisitor : public boost::static_visitor<void> {
+public:
+ DecodePayloadVisitor(__u8 version, bufferlist::const_iterator &iter)
+ : m_version(version), m_iter(iter) {}
+
+ template <typename P>
+ inline void operator()(P &payload) const {
+ payload.decode(m_version, m_iter);
+ }
+
+private:
+ __u8 m_version;
+ bufferlist::const_iterator &m_iter;
+};
+
+} // namespace util
+} // namespace watcher
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_WATCHER_UTILS_H