diff options
Diffstat (limited to 'src/test/librados_test_stub/TestIoCtxImpl.cc')
-rw-r--r-- | src/test/librados_test_stub/TestIoCtxImpl.cc | 386 |
1 files changed, 386 insertions, 0 deletions
diff --git a/src/test/librados_test_stub/TestIoCtxImpl.cc b/src/test/librados_test_stub/TestIoCtxImpl.cc new file mode 100644 index 00000000..7e6fa4ee --- /dev/null +++ b/src/test/librados_test_stub/TestIoCtxImpl.cc @@ -0,0 +1,386 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/librados_test_stub/TestIoCtxImpl.h" +#include "test/librados_test_stub/TestClassHandler.h" +#include "test/librados_test_stub/TestRadosClient.h" +#include "test/librados_test_stub/TestWatchNotify.h" +#include "librados/AioCompletionImpl.h" +#include "include/ceph_assert.h" +#include "common/Finisher.h" +#include "common/valgrind.h" +#include "objclass/objclass.h" +#include <boost/bind.hpp> +#include <errno.h> + +namespace librados { + +TestIoCtxImpl::TestIoCtxImpl() : m_client(NULL) { + get(); +} + +TestIoCtxImpl::TestIoCtxImpl(TestRadosClient *client, int64_t pool_id, + const std::string& pool_name) + : m_client(client), m_pool_id(pool_id), m_pool_name(pool_name), + m_snap_seq(CEPH_NOSNAP) +{ + m_client->get(); + get(); +} + +TestIoCtxImpl::TestIoCtxImpl(const TestIoCtxImpl& rhs) + : m_client(rhs.m_client), + m_pool_id(rhs.m_pool_id), + m_pool_name(rhs.m_pool_name), + m_namespace_name(rhs.m_namespace_name), + m_snap_seq(rhs.m_snap_seq) +{ + m_client->get(); + get(); +} + +TestIoCtxImpl::~TestIoCtxImpl() { + ceph_assert(m_pending_ops == 0); +} + +void TestObjectOperationImpl::get() { + m_refcount++; +} + +void TestObjectOperationImpl::put() { + if (--m_refcount == 0) { + ANNOTATE_HAPPENS_AFTER(&m_refcount); + ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_refcount); + delete this; + } else { + ANNOTATE_HAPPENS_BEFORE(&m_refcount); + } +} + +void TestIoCtxImpl::get() { + m_refcount++; +} + +void TestIoCtxImpl::put() { + if (--m_refcount == 0) { + m_client->put(); + delete this; + } +} + +uint64_t TestIoCtxImpl::get_instance_id() const { + return m_client->get_instance_id(); +} + +int64_t TestIoCtxImpl::get_id() { + return m_pool_id; +} + +uint64_t TestIoCtxImpl::get_last_version() { + return 0; +} + +std::string TestIoCtxImpl::get_pool_name() { + return m_pool_name; +} + +int TestIoCtxImpl::aio_flush() { + m_client->flush_aio_operations(); + return 0; +} + +void TestIoCtxImpl::aio_flush_async(AioCompletionImpl *c) { + m_client->flush_aio_operations(c); +} + +void TestIoCtxImpl::aio_notify(const std::string& oid, AioCompletionImpl *c, + bufferlist& bl, uint64_t timeout_ms, + bufferlist *pbl) { + m_pending_ops++; + c->get(); + C_AioNotify *ctx = new C_AioNotify(this, c); + m_client->get_watch_notify()->aio_notify(m_client, m_pool_id, get_namespace(), + oid, bl, timeout_ms, pbl, ctx); +} + +int TestIoCtxImpl::aio_operate(const std::string& oid, TestObjectOperationImpl &ops, + AioCompletionImpl *c, SnapContext *snap_context, + int flags) { + // TODO flags for now + ops.get(); + m_pending_ops++; + m_client->add_aio_operation(oid, true, boost::bind( + &TestIoCtxImpl::execute_aio_operations, this, oid, &ops, + reinterpret_cast<bufferlist*>(0), + snap_context != NULL ? *snap_context : m_snapc), c); + return 0; +} + +int TestIoCtxImpl::aio_operate_read(const std::string& oid, + TestObjectOperationImpl &ops, + AioCompletionImpl *c, int flags, + bufferlist *pbl) { + // TODO ignoring flags for now + ops.get(); + m_pending_ops++; + m_client->add_aio_operation(oid, true, boost::bind( + &TestIoCtxImpl::execute_aio_operations, this, oid, &ops, pbl, m_snapc), c); + return 0; +} + +int TestIoCtxImpl::aio_watch(const std::string& o, AioCompletionImpl *c, + uint64_t *handle, librados::WatchCtx2 *watch_ctx) { + m_pending_ops++; + c->get(); + C_AioNotify *ctx = new C_AioNotify(this, c); + if (m_client->is_blacklisted()) { + m_client->get_aio_finisher()->queue(ctx, -EBLACKLISTED); + } else { + m_client->get_watch_notify()->aio_watch(m_client, m_pool_id, + get_namespace(), o, + get_instance_id(), handle, nullptr, + watch_ctx, ctx); + } + return 0; +} + +int TestIoCtxImpl::aio_unwatch(uint64_t handle, AioCompletionImpl *c) { + m_pending_ops++; + c->get(); + C_AioNotify *ctx = new C_AioNotify(this, c); + if (m_client->is_blacklisted()) { + m_client->get_aio_finisher()->queue(ctx, -EBLACKLISTED); + } else { + m_client->get_watch_notify()->aio_unwatch(m_client, handle, ctx); + } + return 0; +} + +int TestIoCtxImpl::exec(const std::string& oid, TestClassHandler *handler, + const char *cls, const char *method, + bufferlist& inbl, bufferlist* outbl, + const SnapContext &snapc) { + if (m_client->is_blacklisted()) { + return -EBLACKLISTED; + } + + cls_method_cxx_call_t call = handler->get_method(cls, method); + if (call == NULL) { + return -ENOSYS; + } + + return (*call)(reinterpret_cast<cls_method_context_t>( + handler->get_method_context(this, oid, snapc).get()), &inbl, outbl); +} + +int TestIoCtxImpl::list_watchers(const std::string& o, + std::list<obj_watch_t> *out_watchers) { + if (m_client->is_blacklisted()) { + return -EBLACKLISTED; + } + + return m_client->get_watch_notify()->list_watchers(m_pool_id, get_namespace(), + o, out_watchers); +} + +int TestIoCtxImpl::notify(const std::string& o, bufferlist& bl, + uint64_t timeout_ms, bufferlist *pbl) { + if (m_client->is_blacklisted()) { + return -EBLACKLISTED; + } + + return m_client->get_watch_notify()->notify(m_client, m_pool_id, + get_namespace(), o, bl, + timeout_ms, pbl); +} + +void TestIoCtxImpl::notify_ack(const std::string& o, uint64_t notify_id, + uint64_t handle, bufferlist& bl) { + m_client->get_watch_notify()->notify_ack(m_client, m_pool_id, get_namespace(), + o, notify_id, handle, + m_client->get_instance_id(), bl); +} + +int TestIoCtxImpl::operate(const std::string& oid, TestObjectOperationImpl &ops) { + AioCompletionImpl *comp = new AioCompletionImpl(); + + ops.get(); + m_pending_ops++; + m_client->add_aio_operation(oid, false, boost::bind( + &TestIoCtxImpl::execute_aio_operations, this, oid, &ops, + reinterpret_cast<bufferlist*>(0), m_snapc), comp); + + comp->wait_for_safe(); + int ret = comp->get_return_value(); + comp->put(); + return ret; +} + +int TestIoCtxImpl::operate_read(const std::string& oid, TestObjectOperationImpl &ops, + bufferlist *pbl) { + AioCompletionImpl *comp = new AioCompletionImpl(); + + ops.get(); + m_pending_ops++; + m_client->add_aio_operation(oid, false, boost::bind( + &TestIoCtxImpl::execute_aio_operations, this, oid, &ops, pbl, + m_snapc), comp); + + comp->wait_for_complete(); + int ret = comp->get_return_value(); + comp->put(); + return ret; +} + +void TestIoCtxImpl::aio_selfmanaged_snap_create(uint64_t *snapid, + AioCompletionImpl *c) { + m_client->add_aio_operation( + "", true, + boost::bind(&TestIoCtxImpl::selfmanaged_snap_create, this, snapid), c); +} + +void TestIoCtxImpl::aio_selfmanaged_snap_remove(uint64_t snapid, + AioCompletionImpl *c) { + m_client->add_aio_operation( + "", true, + boost::bind(&TestIoCtxImpl::selfmanaged_snap_remove, this, snapid), c); +} + +int TestIoCtxImpl::selfmanaged_snap_set_write_ctx(snap_t seq, + std::vector<snap_t>& snaps) { + std::vector<snapid_t> snap_ids(snaps.begin(), snaps.end()); + m_snapc = SnapContext(seq, snap_ids); + return 0; +} + +int TestIoCtxImpl::set_alloc_hint(const std::string& oid, + uint64_t expected_object_size, + uint64_t expected_write_size, + uint32_t flags, + const SnapContext &snapc) { + return 0; +} + +void TestIoCtxImpl::set_snap_read(snap_t seq) { + if (seq == 0) { + seq = CEPH_NOSNAP; + } + m_snap_seq = seq; +} + +int TestIoCtxImpl::tmap_update(const std::string& oid, bufferlist& cmdbl) { + if (m_client->is_blacklisted()) { + return -EBLACKLISTED; + } + + // TODO: protect against concurrent tmap updates + bufferlist tmap_header; + std::map<string,bufferlist> tmap; + uint64_t size = 0; + int r = stat(oid, &size, NULL); + if (r == -ENOENT) { + r = create(oid, false); + } + if (r < 0) { + return r; + } + + if (size > 0) { + bufferlist inbl; + r = read(oid, size, 0, &inbl); + if (r < 0) { + return r; + } + auto iter = inbl.cbegin(); + decode(tmap_header, iter); + decode(tmap, iter); + } + + __u8 c; + std::string key; + bufferlist value; + auto iter = cmdbl.cbegin(); + decode(c, iter); + decode(key, iter); + + switch (c) { + case CEPH_OSD_TMAP_SET: + decode(value, iter); + tmap[key] = value; + break; + case CEPH_OSD_TMAP_RM: + r = tmap.erase(key); + if (r == 0) { + return -ENOENT; + } + break; + default: + return -EINVAL; + } + + bufferlist out; + encode(tmap_header, out); + encode(tmap, out); + r = write_full(oid, out, m_snapc); + return r; +} + +int TestIoCtxImpl::unwatch(uint64_t handle) { + if (m_client->is_blacklisted()) { + return -EBLACKLISTED; + } + + return m_client->get_watch_notify()->unwatch(m_client, handle); +} + +int TestIoCtxImpl::watch(const std::string& o, uint64_t *handle, + librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2) { + if (m_client->is_blacklisted()) { + return -EBLACKLISTED; + } + + return m_client->get_watch_notify()->watch(m_client, m_pool_id, + get_namespace(), o, + get_instance_id(), handle, ctx, + ctx2); +} + +int TestIoCtxImpl::execute_operation(const std::string& oid, + const Operation &operation) { + if (m_client->is_blacklisted()) { + return -EBLACKLISTED; + } + + TestRadosClient::Transaction transaction(m_client, get_namespace(), oid); + return operation(this, oid); +} + +int TestIoCtxImpl::execute_aio_operations(const std::string& oid, + TestObjectOperationImpl *ops, + bufferlist *pbl, + const SnapContext &snapc) { + int ret = 0; + if (m_client->is_blacklisted()) { + ret = -EBLACKLISTED; + } else { + TestRadosClient::Transaction transaction(m_client, get_namespace(), oid); + for (ObjectOperations::iterator it = ops->ops.begin(); + it != ops->ops.end(); ++it) { + ret = (*it)(this, oid, pbl, snapc); + if (ret < 0) { + break; + } + } + } + m_pending_ops--; + ops->put(); + return ret; +} + +void TestIoCtxImpl::handle_aio_notify_complete(AioCompletionImpl *c, int r) { + m_pending_ops--; + + m_client->finish_aio_completion(c, r); +} + +} // namespace librados |