Diffstat (limited to 'src/test/librados_test_stub/TestWatchNotify.cc')
-rw-r--r--  src/test/librados_test_stub/TestWatchNotify.cc | 450
1 file changed, 450 insertions(+), 0 deletions(-)
diff --git a/src/test/librados_test_stub/TestWatchNotify.cc b/src/test/librados_test_stub/TestWatchNotify.cc
new file mode 100644
index 000000000..3b18026bb
--- /dev/null
+++ b/src/test/librados_test_stub/TestWatchNotify.cc
@@ -0,0 +1,450 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librados_test_stub/TestWatchNotify.h"
+#include "include/Context.h"
+#include "common/Cond.h"
+#include "include/stringify.h"
+#include "common/Finisher.h"
+#include "test/librados_test_stub/TestCluster.h"
+#include "test/librados_test_stub/TestRadosClient.h"
+#include <boost/bind/bind.hpp>
+#include <boost/function.hpp>
+#include "include/ceph_assert.h"
+
+#define dout_subsys ceph_subsys_rados
+#undef dout_prefix
+#define dout_prefix *_dout << "TestWatchNotify::" << __func__ << ": "
+
+namespace librados {
+
+std::ostream& operator<<(std::ostream& out,
+ const TestWatchNotify::WatcherID &watcher_id) {
+ out << "(" << watcher_id.first << "," << watcher_id.second << ")";
+ return out;
+}
+
+struct TestWatchNotify::ObjectHandler : public TestCluster::ObjectHandler {
+ TestWatchNotify* test_watch_notify;
+ int64_t pool_id;
+ std::string nspace;
+ std::string oid;
+
+ ObjectHandler(TestWatchNotify* test_watch_notify, int64_t pool_id,
+ const std::string& nspace, const std::string& oid)
+ : test_watch_notify(test_watch_notify), pool_id(pool_id),
+ nspace(nspace), oid(oid) {
+ }
+
+ void handle_removed(TestRadosClient* test_rados_client) override {
+ // copy member variables since this object might be deleted
+ auto _test_watch_notify = test_watch_notify;
+ auto _pool_id = pool_id;
+ auto _nspace = nspace;
+ auto _oid = oid;
+ auto ctx = new LambdaContext([_test_watch_notify, _pool_id, _nspace, _oid](int r) {
+ _test_watch_notify->handle_object_removed(_pool_id, _nspace, _oid);
+ });
+ test_rados_client->get_aio_finisher()->queue(ctx);
+ }
+};
+
+TestWatchNotify::TestWatchNotify(TestCluster* test_cluster)
+ : m_test_cluster(test_cluster) {
+}
+
+void TestWatchNotify::flush(TestRadosClient *rados_client) {
+ CephContext *cct = rados_client->cct();
+
+ ldout(cct, 20) << "enter" << dendl;
+ // block until we know no additional async notify callbacks will occur
+ C_SaferCond ctx;
+ m_async_op_tracker.wait_for_ops(&ctx);
+ ctx.wait();
+}
+
+int TestWatchNotify::list_watchers(int64_t pool_id, const std::string& nspace,
+ const std::string& o,
+ std::list<obj_watch_t> *out_watchers) {
+ std::lock_guard lock{m_lock};
+ SharedWatcher watcher = get_watcher(pool_id, nspace, o);
+ if (!watcher) {
+ return -ENOENT;
+ }
+
+ out_watchers->clear();
+ for (TestWatchNotify::WatchHandles::iterator it =
+ watcher->watch_handles.begin();
+ it != watcher->watch_handles.end(); ++it) {
+ obj_watch_t obj;
+ strncpy(obj.addr, it->second.addr.c_str(), sizeof(obj.addr) - 1);
+ obj.addr[sizeof(obj.addr) - 1] = '\0';
+ obj.watcher_id = static_cast<int64_t>(it->second.gid);
+ obj.cookie = it->second.handle;
+ obj.timeout_seconds = 30;
+ out_watchers->push_back(obj);
+ }
+ return 0;
+}
+
+void TestWatchNotify::aio_flush(TestRadosClient *rados_client,
+ Context *on_finish) {
+ rados_client->get_aio_finisher()->queue(on_finish);
+}
+
+int TestWatchNotify::watch(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string& nspace, const std::string& o,
+ uint64_t gid, uint64_t *handle,
+ librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2) {
+ C_SaferCond cond;
+ aio_watch(rados_client, pool_id, nspace, o, gid, handle, ctx, ctx2, &cond);
+ return cond.wait();
+}
+
+void TestWatchNotify::aio_watch(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string& nspace, const std::string& o,
+ uint64_t gid, uint64_t *handle,
+ librados::WatchCtx *watch_ctx,
+ librados::WatchCtx2 *watch_ctx2,
+ Context *on_finish) {
+ auto ctx = new LambdaContext([=](int) {
+ execute_watch(rados_client, pool_id, nspace, o, gid, handle, watch_ctx,
+ watch_ctx2, on_finish);
+ });
+ rados_client->get_aio_finisher()->queue(ctx);
+}
+
+int TestWatchNotify::unwatch(TestRadosClient *rados_client,
+ uint64_t handle) {
+ C_SaferCond ctx;
+ aio_unwatch(rados_client, handle, &ctx);
+ return ctx.wait();
+}
+
+void TestWatchNotify::aio_unwatch(TestRadosClient *rados_client,
+ uint64_t handle, Context *on_finish) {
+ auto ctx = new LambdaContext([this, rados_client, handle, on_finish](int) {
+ execute_unwatch(rados_client, handle, on_finish);
+ });
+ rados_client->get_aio_finisher()->queue(ctx);
+}
+
+void TestWatchNotify::aio_notify(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string& nspace,
+ const std::string& oid, const bufferlist& bl,
+ uint64_t timeout_ms, bufferlist *pbl,
+ Context *on_notify) {
+ auto ctx = new LambdaContext([=](int) {
+ execute_notify(rados_client, pool_id, nspace, oid, bl, pbl, on_notify);
+ });
+ rados_client->get_aio_finisher()->queue(ctx);
+}
+
+int TestWatchNotify::notify(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string& nspace, const std::string& oid,
+ bufferlist& bl, uint64_t timeout_ms,
+ bufferlist *pbl) {
+ C_SaferCond cond;
+ aio_notify(rados_client, pool_id, nspace, oid, bl, timeout_ms, pbl, &cond);
+ return cond.wait();
+}
+
+void TestWatchNotify::notify_ack(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string& nspace,
+ const std::string& o, uint64_t notify_id,
+ uint64_t handle, uint64_t gid,
+ bufferlist& bl) {
+ CephContext *cct = rados_client->cct();
+ ldout(cct, 20) << "notify_id=" << notify_id << ", handle=" << handle
+ << ", gid=" << gid << dendl;
+ std::lock_guard lock{m_lock};
+ WatcherID watcher_id = std::make_pair(gid, handle);
+ ack_notify(rados_client, pool_id, nspace, o, notify_id, watcher_id, bl);
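+ // if that was the final outstanding ack, the notify completes here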
+ finish_notify(rados_client, pool_id, nspace, o, notify_id);
+}
+
+void TestWatchNotify::execute_watch(TestRadosClient *rados_client,
+ int64_t pool_id, const std::string& nspace,
+ const std::string& o, uint64_t gid,
+ uint64_t *handle, librados::WatchCtx *ctx,
+ librados::WatchCtx2 *ctx2,
+ Context* on_finish) {
+ CephContext *cct = rados_client->cct();
+
+ m_lock.lock();
+ SharedWatcher watcher = get_watcher(pool_id, nspace, o);
+ if (!watcher) {
+ m_lock.unlock();
+ on_finish->complete(-ENOENT);
+ return;
+ }
+
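+ // record the watch under a fresh process-wide, monotonically increasing cookie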
+ WatchHandle watch_handle;
+ watch_handle.rados_client = rados_client;
+ watch_handle.addr = "127.0.0.1:0/" + stringify(rados_client->get_nonce());
+ watch_handle.nonce = rados_client->get_nonce();
+ watch_handle.gid = gid;
+ watch_handle.handle = ++m_handle;
+ watch_handle.watch_ctx = ctx;
+ watch_handle.watch_ctx2 = ctx2;
+ watcher->watch_handles[watch_handle.handle] = watch_handle;
+
+ *handle = watch_handle.handle;
+
+ ldout(cct, 20) << "oid=" << o << ", gid=" << gid << ": handle=" << *handle
+ << dendl;
+ m_lock.unlock();
+
+ on_finish->complete(0);
+}
+
+void TestWatchNotify::execute_unwatch(TestRadosClient *rados_client,
+ uint64_t handle, Context* on_finish) {
+ CephContext *cct = rados_client->cct();
+
+ ldout(cct, 20) << "handle=" << handle << dendl;
+ {
+ std::lock_guard locker{m_lock};
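+ // cookies are unique across objects, so scan each watcher until the handle is found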
+ for (FileWatchers::iterator it = m_file_watchers.begin();
+ it != m_file_watchers.end(); ++it) {
+ SharedWatcher watcher = it->second;
+
+ WatchHandles::iterator w_it = watcher->watch_handles.find(handle);
+ if (w_it != watcher->watch_handles.end()) {
+ watcher->watch_handles.erase(w_it);
+ maybe_remove_watcher(watcher);
+ break;
+ }
+ }
+ }
+ on_finish->complete(0);
+}
+
+TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher(
+ int64_t pool_id, const std::string& nspace, const std::string& oid) {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+
+ auto it = m_file_watchers.find({pool_id, nspace, oid});
+ if (it == m_file_watchers.end()) {
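+ // lazily create the watcher and subscribe to object-removal events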
+ SharedWatcher watcher(new Watcher(pool_id, nspace, oid));
+ watcher->object_handler.reset(new ObjectHandler(
+ this, pool_id, nspace, oid));
+ int r = m_test_cluster->register_object_handler(
+ pool_id, {nspace, oid}, watcher->object_handler.get());
+ if (r < 0) {
+ // object doesn't exist
+ return SharedWatcher();
+ }
+ m_file_watchers[{pool_id, nspace, oid}] = watcher;
+ return watcher;
+ }
+
+ return it->second;
+}
+
+void TestWatchNotify::maybe_remove_watcher(SharedWatcher watcher) {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+
+ // TODO
+ if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
+ auto pool_id = watcher->pool_id;
+ auto& nspace = watcher->nspace;
+ auto& oid = watcher->oid;
+ if (watcher->object_handler) {
+ m_test_cluster->unregister_object_handler(pool_id, {nspace, oid},
+ watcher->object_handler.get());
+ watcher->object_handler.reset();
+ }
+
+ m_file_watchers.erase({pool_id, nspace, oid});
+ }
+}
+
+void TestWatchNotify::execute_notify(TestRadosClient *rados_client,
+ int64_t pool_id, const std::string& nspace,
+ const std::string &oid,
+ const bufferlist &bl, bufferlist *pbl,
+ Context *on_notify) {
+ CephContext *cct = rados_client->cct();
+
+ m_lock.lock();
+ uint64_t notify_id = ++m_notify_id;
+
+ SharedWatcher watcher = get_watcher(pool_id, nspace, oid);
+ if (!watcher) {
+ ldout(cct, 1) << "oid=" << oid << ": not found" << dendl;
+ m_lock.unlock();
+ on_notify->complete(-ENOENT);
+ return;
+ }
+
+ ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl;
+
+ SharedNotifyHandle notify_handle(new NotifyHandle());
+ notify_handle->rados_client = rados_client;
+ notify_handle->pbl = pbl;
+ notify_handle->on_notify = on_notify;
+
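+ // fan the payload out to every watcher on its own client's finisher thread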
+ WatchHandles &watch_handles = watcher->watch_handles;
+ for (auto &watch_handle_pair : watch_handles) {
+ WatchHandle &watch_handle = watch_handle_pair.second;
+ notify_handle->pending_watcher_ids.insert(std::make_pair(
+ watch_handle.gid, watch_handle.handle));
+
+ m_async_op_tracker.start_op();
+ uint64_t notifier_id = rados_client->get_instance_id();
+ watch_handle.rados_client->get_aio_finisher()->queue(new LambdaContext(
+ [this, pool_id, nspace, oid, bl, notify_id, watch_handle, notifier_id](int r) {
+ bufferlist notify_bl;
+ notify_bl.append(bl);
+
+ if (watch_handle.watch_ctx2 != NULL) {
+ watch_handle.watch_ctx2->handle_notify(notify_id,
+ watch_handle.handle,
+ notifier_id, notify_bl);
+ } else if (watch_handle.watch_ctx != NULL) {
+ watch_handle.watch_ctx->notify(0, 0, notify_bl);
+
+ // auto ack old-style watch/notify clients
+ ack_notify(watch_handle.rados_client, pool_id, nspace, oid, notify_id,
+ {watch_handle.gid, watch_handle.handle}, bufferlist());
+ }
+
+ m_async_op_tracker.finish_op();
+ }));
+ }
+ watcher->notify_handles[notify_id] = notify_handle;
+
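+ // completes the notify immediately when the object has no watchers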
+ finish_notify(rados_client, pool_id, nspace, oid, notify_id);
+ m_lock.unlock();
+}
+
+void TestWatchNotify::ack_notify(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string& nspace,
+ const std::string &oid, uint64_t notify_id,
+ const WatcherID &watcher_id,
+ const bufferlist &bl) {
+ CephContext *cct = rados_client->cct();
+
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ SharedWatcher watcher = get_watcher(pool_id, nspace, oid);
+ if (!watcher) {
+ ldout(cct, 1) << "oid=" << oid << ": not found" << dendl;
+ return;
+ }
+
+ NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
+ if (it == watcher->notify_handles.end()) {
+ ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
+ << ", WatcherID=" << watcher_id << ": not found" << dendl;
+ return;
+ }
+
+ ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
+ << ", WatcherID=" << watcher_id << dendl;
+
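+ // store this watcher's response and remove it from the pending set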
+ bufferlist response;
+ response.append(bl);
+
+ SharedNotifyHandle notify_handle = it->second;
+ notify_handle->notify_responses[watcher_id] = response;
+ notify_handle->pending_watcher_ids.erase(watcher_id);
+}
+
+void TestWatchNotify::finish_notify(TestRadosClient *rados_client,
+ int64_t pool_id, const std::string& nspace,
+ const std::string &oid,
+ uint64_t notify_id) {
+ CephContext *cct = rados_client->cct();
+
+ ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl;
+
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ SharedWatcher watcher = get_watcher(pool_id, nspace, oid);
+ if (!watcher) {
+ ldout(cct, 1) << "oid=" << oid << ": not found" << dendl;
+ return;
+ }
+
+ NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
+ if (it == watcher->notify_handles.end()) {
+ ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
+ << ": not found" << dendl;
+ return;
+ }
+
+ SharedNotifyHandle notify_handle = it->second;
+ if (!notify_handle->pending_watcher_ids.empty()) {
+ ldout(cct, 10) << "oid=" << oid << ", notify_id=" << notify_id
+ << ": pending watchers, returning" << dendl;
+ return;
+ }
+
+ ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
+ << ": completing" << dendl;
+
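+ // encode the acked responses followed by the (now empty) set of
+ // timed-out watchers, mirroring the librados notify reply layout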
+ if (notify_handle->pbl != NULL) {
+ encode(notify_handle->notify_responses, *notify_handle->pbl);
+ encode(notify_handle->pending_watcher_ids, *notify_handle->pbl);
+ }
+
+ notify_handle->rados_client->get_aio_finisher()->queue(
+ notify_handle->on_notify, 0);
+ watcher->notify_handles.erase(notify_id);
+ maybe_remove_watcher(watcher);
+}
+
+void TestWatchNotify::blocklist(uint32_t nonce) {
+ std::lock_guard locker{m_lock};
+
+ for (auto file_it = m_file_watchers.begin();
+ file_it != m_file_watchers.end(); ) {
+ auto &watcher = file_it->second;
+ for (auto w_it = watcher->watch_handles.begin();
+ w_it != watcher->watch_handles.end();) {
+ if (w_it->second.nonce == nonce) {
+ w_it = watcher->watch_handles.erase(w_it);
+ } else {
+ ++w_it;
+ }
+ }
+
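+ // advance first: maybe_remove_watcher() may erase the map entry the
+ // iterator currently references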
+ ++file_it;
+ maybe_remove_watcher(watcher);
+ }
+}
+
+void TestWatchNotify::handle_object_removed(int64_t pool_id,
+ const std::string& nspace,
+ const std::string& oid) {
+ std::lock_guard locker{m_lock};
+ auto it = m_file_watchers.find({pool_id, nspace, oid});
+ if (it == m_file_watchers.end()) {
+ return;
+ }
+
+ auto watcher = it->second;
+
+ // cancel all in-flight notifications
+ for (auto& notify_handle_pair : watcher->notify_handles) {
+ auto notify_handle = notify_handle_pair.second;
+ notify_handle->rados_client->get_aio_finisher()->queue(
+ notify_handle->on_notify, -ENOENT);
+ }
+
+ // alert all watchers of the loss of connection
+ for (auto& watch_handle_pair : watcher->watch_handles) {
+ auto& watch_handle = watch_handle_pair.second;
+ auto handle = watch_handle.handle;
+ auto watch_ctx2 = watch_handle.watch_ctx2;
+ if (watch_ctx2 != nullptr) {
+ auto ctx = new LambdaContext([handle, watch_ctx2](int) {
+ watch_ctx2->handle_error(handle, -ENOTCONN);
+ });
+ watch_handle.rados_client->get_aio_finisher()->queue(ctx);
+ }
+ }
+ m_file_watchers.erase(it);
+}
+
+} // namespace librados