summaryrefslogtreecommitdiffstats
path: root/src/librbd/Watcher.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/librbd/Watcher.cc
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/librbd/Watcher.cc370
1 files changed, 370 insertions, 0 deletions
diff --git a/src/librbd/Watcher.cc b/src/librbd/Watcher.cc
new file mode 100644
index 000000000..c215d6df7
--- /dev/null
+++ b/src/librbd/Watcher.cc
@@ -0,0 +1,370 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/Watcher.h"
+#include "librbd/watcher/RewatchRequest.h"
+#include "librbd/Utils.h"
+#include "librbd/TaskFinisher.h"
+#include "librbd/asio/ContextWQ.h"
+#include "include/encoding.h"
+#include "common/errno.h"
+#include <boost/bind/bind.hpp>
+
+// re-include our assert to clobber the system one; fix dout:
+#include "include/ceph_assert.h"
+
+#define dout_subsys ceph_subsys_rbd
+
+namespace librbd {
+
+using namespace boost::placeholders;
+
+using namespace watcher;
+
+using util::create_context_callback;
+using util::create_rados_callback;
+using std::string;
+
+namespace {
+
+struct C_UnwatchAndFlush : public Context {
+ librados::Rados rados;
+ Context *on_finish;
+ bool flushing = false;
+ int ret_val = 0;
+
+ C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish)
+ : rados(io_ctx), on_finish(on_finish) {
+ }
+
+ void complete(int r) override {
+ if (ret_val == 0 && r < 0) {
+ ret_val = r;
+ }
+
+ if (!flushing) {
+ flushing = true;
+
+ librados::AioCompletion *aio_comp = create_rados_callback(this);
+ r = rados.aio_watch_flush(aio_comp);
+ ceph_assert(r == 0);
+ aio_comp->release();
+ return;
+ }
+
+ // ensure our reference to the RadosClient is released prior
+ // to completing the callback to avoid racing an explicit
+ // librados shutdown
+ Context *ctx = on_finish;
+ r = ret_val;
+ delete this;
+
+ ctx->complete(r);
+ }
+
+ void finish(int r) override {
+ }
+};
+
+} // anonymous namespace
+
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::Watcher::C_NotifyAck " << this << " " \
+ << __func__ << ": "
+
+Watcher::C_NotifyAck::C_NotifyAck(Watcher *watcher, uint64_t notify_id,
+ uint64_t handle)
+ : watcher(watcher), cct(watcher->m_cct), notify_id(notify_id),
+ handle(handle) {
+ ldout(cct, 10) << "id=" << notify_id << ", " << "handle=" << handle << dendl;
+}
+
+void Watcher::C_NotifyAck::finish(int r) {
+ ldout(cct, 10) << "r=" << r << dendl;
+ ceph_assert(r == 0);
+ watcher->acknowledge_notify(notify_id, handle, out);
+}
+
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::Watcher: " << this << " " << __func__ \
+ << ": "
+
+Watcher::Watcher(librados::IoCtx& ioctx, asio::ContextWQ *work_queue,
+ const string& oid)
+ : m_ioctx(ioctx), m_work_queue(work_queue), m_oid(oid),
+ m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
+ m_watch_lock(ceph::make_shared_mutex(
+ util::unique_lock_name("librbd::Watcher::m_watch_lock", this))),
+ m_watch_handle(0), m_notifier(work_queue, ioctx, oid),
+ m_watch_state(WATCH_STATE_IDLE), m_watch_ctx(*this) {
+}
+
+Watcher::~Watcher() {
+ std::shared_lock l{m_watch_lock};
+ ceph_assert(is_unregistered(m_watch_lock));
+}
+
+void Watcher::register_watch(Context *on_finish) {
+ ldout(m_cct, 10) << dendl;
+
+ std::unique_lock watch_locker{m_watch_lock};
+ ceph_assert(is_unregistered(m_watch_lock));
+ m_watch_state = WATCH_STATE_REGISTERING;
+ m_watch_blocklisted = false;
+
+ librados::AioCompletion *aio_comp = create_rados_callback(
+ new C_RegisterWatch(this, on_finish));
+ int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
+ ceph_assert(r == 0);
+ aio_comp->release();
+}
+
+void Watcher::handle_register_watch(int r, Context *on_finish) {
+ ldout(m_cct, 10) << "r=" << r << dendl;
+
+ bool watch_error = false;
+ Context *unregister_watch_ctx = nullptr;
+ {
+ std::unique_lock watch_locker{m_watch_lock};
+ ceph_assert(m_watch_state == WATCH_STATE_REGISTERING);
+
+ m_watch_state = WATCH_STATE_IDLE;
+ if (r < 0) {
+ lderr(m_cct) << "failed to register watch: " << cpp_strerror(r)
+ << dendl;
+ m_watch_handle = 0;
+ }
+
+ if (m_unregister_watch_ctx != nullptr) {
+ std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+ } else if (r == 0 && m_watch_error) {
+ lderr(m_cct) << "re-registering watch after error" << dendl;
+ m_watch_state = WATCH_STATE_REWATCHING;
+ watch_error = true;
+ } else {
+ m_watch_blocklisted = (r == -EBLOCKLISTED);
+ }
+ }
+
+ on_finish->complete(r);
+
+ if (unregister_watch_ctx != nullptr) {
+ unregister_watch_ctx->complete(0);
+ } else if (watch_error) {
+ rewatch();
+ }
+}
+
+void Watcher::unregister_watch(Context *on_finish) {
+ ldout(m_cct, 10) << dendl;
+
+ {
+ std::unique_lock watch_locker{m_watch_lock};
+ if (m_watch_state != WATCH_STATE_IDLE) {
+ ldout(m_cct, 10) << "delaying unregister until register completed"
+ << dendl;
+
+ ceph_assert(m_unregister_watch_ctx == nullptr);
+ m_unregister_watch_ctx = new LambdaContext([this, on_finish](int r) {
+ unregister_watch(on_finish);
+ });
+ return;
+ } else if (is_registered(m_watch_lock)) {
+ librados::AioCompletion *aio_comp = create_rados_callback(
+ new C_UnwatchAndFlush(m_ioctx, on_finish));
+ int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
+ ceph_assert(r == 0);
+ aio_comp->release();
+
+ m_watch_handle = 0;
+ m_watch_blocklisted = false;
+ return;
+ }
+ }
+
+ on_finish->complete(0);
+}
+
+bool Watcher::notifications_blocked() const {
+ std::shared_lock locker{m_watch_lock};
+
+ bool blocked = (m_blocked_count > 0);
+ ldout(m_cct, 5) << "blocked=" << blocked << dendl;
+ return blocked;
+}
+
+void Watcher::block_notifies(Context *on_finish) {
+ {
+ std::unique_lock locker{m_watch_lock};
+ ++m_blocked_count;
+ ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
+ }
+ m_async_op_tracker.wait_for_ops(on_finish);
+}
+
+void Watcher::unblock_notifies() {
+ std::unique_lock locker{m_watch_lock};
+ ceph_assert(m_blocked_count > 0);
+ --m_blocked_count;
+ ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
+}
+
+void Watcher::flush(Context *on_finish) {
+ m_notifier.flush(on_finish);
+}
+
+std::string Watcher::get_oid() const {
+ std::shared_lock locker{m_watch_lock};
+ return m_oid;
+}
+
+void Watcher::set_oid(const string& oid) {
+ std::unique_lock watch_locker{m_watch_lock};
+ ceph_assert(is_unregistered(m_watch_lock));
+
+ m_oid = oid;
+}
+
+void Watcher::handle_error(uint64_t handle, int err) {
+ lderr(m_cct) << "handle=" << handle << ": " << cpp_strerror(err) << dendl;
+
+ std::unique_lock watch_locker{m_watch_lock};
+ m_watch_error = true;
+
+ if (is_registered(m_watch_lock)) {
+ m_watch_state = WATCH_STATE_REWATCHING;
+ if (err == -EBLOCKLISTED) {
+ m_watch_blocklisted = true;
+ }
+
+ auto ctx = new LambdaContext(
+ boost::bind(&Watcher::rewatch, this));
+ m_work_queue->queue(ctx);
+ }
+}
+
+void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle,
+ bufferlist &out) {
+ m_ioctx.notify_ack(m_oid, notify_id, handle, out);
+}
+
+void Watcher::rewatch() {
+ ldout(m_cct, 10) << dendl;
+
+ Context *unregister_watch_ctx = nullptr;
+ {
+ std::unique_lock watch_locker{m_watch_lock};
+ ceph_assert(m_watch_state == WATCH_STATE_REWATCHING);
+
+ if (m_unregister_watch_ctx != nullptr) {
+ m_watch_state = WATCH_STATE_IDLE;
+ std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+ } else {
+ m_watch_error = false;
+ auto ctx = create_context_callback<
+ Watcher, &Watcher::handle_rewatch>(this);
+ auto req = RewatchRequest::create(m_ioctx, m_oid, m_watch_lock,
+ &m_watch_ctx, &m_watch_handle, ctx);
+ req->send();
+ return;
+ }
+ }
+
+ unregister_watch_ctx->complete(0);
+}
+
+void Watcher::handle_rewatch(int r) {
+ ldout(m_cct, 10) << "r=" << r << dendl;
+
+ bool watch_error = false;
+ Context *unregister_watch_ctx = nullptr;
+ {
+ std::unique_lock watch_locker{m_watch_lock};
+ ceph_assert(m_watch_state == WATCH_STATE_REWATCHING);
+
+ m_watch_blocklisted = false;
+ if (m_unregister_watch_ctx != nullptr) {
+ ldout(m_cct, 10) << "image is closing, skip rewatch" << dendl;
+ m_watch_state = WATCH_STATE_IDLE;
+ std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+ } else if (r == -EBLOCKLISTED) {
+ lderr(m_cct) << "client blocklisted" << dendl;
+ m_watch_blocklisted = true;
+ } else if (r == -ENOENT) {
+ ldout(m_cct, 5) << "object does not exist" << dendl;
+ } else if (r < 0) {
+ lderr(m_cct) << "failed to rewatch: " << cpp_strerror(r) << dendl;
+ watch_error = true;
+ } else if (m_watch_error) {
+ lderr(m_cct) << "re-registering watch after error" << dendl;
+ watch_error = true;
+ }
+ }
+
+ if (unregister_watch_ctx != nullptr) {
+ unregister_watch_ctx->complete(0);
+ return;
+ } else if (watch_error) {
+ rewatch();
+ return;
+ }
+
+ auto ctx = create_context_callback<
+ Watcher, &Watcher::handle_rewatch_callback>(this);
+ m_work_queue->queue(ctx, r);
+}
+
+void Watcher::handle_rewatch_callback(int r) {
+ ldout(m_cct, 10) << "r=" << r << dendl;
+ handle_rewatch_complete(r);
+
+ bool watch_error = false;
+ Context *unregister_watch_ctx = nullptr;
+ {
+ std::unique_lock watch_locker{m_watch_lock};
+ ceph_assert(m_watch_state == WATCH_STATE_REWATCHING);
+
+ if (m_unregister_watch_ctx != nullptr) {
+ m_watch_state = WATCH_STATE_IDLE;
+ std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+ } else if (r == -EBLOCKLISTED || r == -ENOENT) {
+ m_watch_state = WATCH_STATE_IDLE;
+ } else if (r < 0 || m_watch_error) {
+ watch_error = true;
+ } else {
+ m_watch_state = WATCH_STATE_IDLE;
+ }
+ }
+
+ if (unregister_watch_ctx != nullptr) {
+ unregister_watch_ctx->complete(0);
+ } else if (watch_error) {
+ rewatch();
+ }
+}
+
+void Watcher::send_notify(bufferlist& payload,
+ watcher::NotifyResponse *response,
+ Context *on_finish) {
+ m_notifier.notify(payload, response, on_finish);
+}
+
+void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle,
+ uint64_t notifier_id, bufferlist& bl) {
+ // if notifications are blocked, finish the notification w/o
+ // bubbling the notification up to the derived class
+ watcher.m_async_op_tracker.start_op();
+ if (watcher.notifications_blocked()) {
+ bufferlist bl;
+ watcher.acknowledge_notify(notify_id, handle, bl);
+ } else {
+ watcher.handle_notify(notify_id, handle, notifier_id, bl);
+ }
+ watcher.m_async_op_tracker.finish_op();
+}
+
+void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
+ watcher.handle_error(handle, err);
+}
+
+} // namespace librbd