diff options
Diffstat (limited to '')
-rw-r--r-- | src/tools/rbd_ggate/Server.cc | 262 |
1 files changed, 262 insertions, 0 deletions
diff --git a/src/tools/rbd_ggate/Server.cc b/src/tools/rbd_ggate/Server.cc new file mode 100644 index 000000000..2565ba10f --- /dev/null +++ b/src/tools/rbd_ggate/Server.cc @@ -0,0 +1,262 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/debug.h" +#include "common/errno.h" +#include "Driver.h" +#include "Server.h" +#include "Request.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "rbd::ggate::Server: " << this \ + << " " << __func__ << ": " + +namespace rbd { +namespace ggate { + +Server::Server(Driver *drv, librbd::Image& image) + : m_drv(drv), m_image(image), + m_reader_thread(this, &Server::reader_entry), + m_writer_thread(this, &Server::writer_entry) { +} + +void Server::run() { + dout(10) << dendl; + + int r = start(); + ceph_assert(r == 0); + + dout(20) << "entering run loop" << dendl; + + { + std::unique_lock locker{m_lock}; + m_cond.wait(locker, [this] { return m_stopping;}); + } + + dout(20) << "exiting run loop" << dendl; + + stop(); +} + +int Server::start() { + dout(10) << dendl; + + m_reader_thread.create("rbd_reader"); + m_writer_thread.create("rbd_writer"); + return 0; +} + +void Server::stop() { + dout(10) << dendl; + + { + std::lock_guard locker{m_lock}; + ceph_assert(m_stopping); + } + + m_reader_thread.join(); + m_writer_thread.join(); + + wait_clean(); +} + +void Server::io_start(IOContext *ctx) { + dout(20) << ctx << dendl; + + std::lock_guard locker{m_lock}; + m_io_pending.push_back(&ctx->item); +} + +void Server::io_finish(IOContext *ctx) { + dout(20) << ctx << dendl; + + std::lock_guard locker{m_lock}; + ceph_assert(ctx->item.is_on_list()); + + ctx->item.remove_myself(); + m_io_finished.push_back(&ctx->item); + m_cond.notify_all(); +} + +Server::IOContext *Server::wait_io_finish() { + dout(20) << dendl; + + std::unique_lock locker{m_lock}; + m_cond.wait(locker, [this] { return !m_io_finished.empty() || m_stopping;}); + + if (m_io_finished.empty()) { + return nullptr; + } + + IOContext *ret = m_io_finished.front(); + m_io_finished.pop_front(); + + return ret; +} + +void Server::wait_clean() { + dout(20) << dendl; + + ceph_assert(!m_reader_thread.is_started()); + + std::unique_lock locker{m_lock}; + m_cond.wait(locker, [this] { return m_io_pending.empty();}); + + while (!m_io_finished.empty()) { + std::unique_ptr<IOContext> free_ctx(m_io_finished.front()); + m_io_finished.pop_front(); + } +} + +void Server::aio_callback(librbd::completion_t cb, void *arg) { + librbd::RBD::AioCompletion *aio_completion = + reinterpret_cast<librbd::RBD::AioCompletion*>(cb); + + IOContext *ctx = reinterpret_cast<IOContext *>(arg); + int r = aio_completion->get_return_value(); + + ctx->server->handle_aio(ctx, r); + aio_completion->release(); +} + +void Server::handle_aio(IOContext *ctx, int r) { + dout(20) << ctx << ": r=" << r << dendl; + + if (r == -EINVAL) { + // if shrinking an image, a pagecache writeback might reference + // extents outside of the range of the new image extents + dout(5) << "masking IO out-of-bounds error" << dendl; + ctx->req->bl.clear(); + r = 0; + } + + if (r < 0) { + ctx->req->set_error(-r); + } else if ((ctx->req->get_cmd() == Request::Read) && + r != static_cast<int>(ctx->req->get_length())) { + int pad_byte_count = static_cast<int> (ctx->req->get_length()) - r; + ctx->req->bl.append_zero(pad_byte_count); + dout(20) << ctx << ": pad byte count: " << pad_byte_count << dendl; + ctx->req->set_error(0); + } else { + ctx->req->set_error(0); + } + io_finish(ctx); +} + +void Server::reader_entry() { + dout(20) << dendl; + + while (!m_stopping) { + std::unique_ptr<IOContext> ctx(new IOContext(this)); + + dout(20) << "waiting for ggate request" << dendl; + + int r = m_drv->recv(&ctx->req); + if (r < 0) { + if (r != -ECANCELED) { + derr << "recv: " << cpp_strerror(r) << dendl; + } + std::lock_guard locker{m_lock}; + m_stopping = true; + m_cond.notify_all(); + return; + } + + IOContext *pctx = ctx.release(); + + dout(20) << pctx << ": start: " << *pctx << dendl; + + io_start(pctx); + librbd::RBD::AioCompletion *c = + new librbd::RBD::AioCompletion(pctx, aio_callback); + switch (pctx->req->get_cmd()) + { + case rbd::ggate::Request::Write: + m_image.aio_write(pctx->req->get_offset(), pctx->req->get_length(), + pctx->req->bl, c); + break; + case rbd::ggate::Request::Read: + m_image.aio_read(pctx->req->get_offset(), pctx->req->get_length(), + pctx->req->bl, c); + break; + case rbd::ggate::Request::Flush: + m_image.aio_flush(c); + break; + case rbd::ggate::Request::Discard: + m_image.aio_discard(pctx->req->get_offset(), pctx->req->get_length(), c); + break; + default: + derr << pctx << ": invalid request command: " << pctx->req->get_cmd() + << dendl; + c->release(); + std::lock_guard locker{m_lock}; + m_stopping = true; + m_cond.notify_all(); + return; + } + } + dout(20) << "terminated" << dendl; +} + +void Server::writer_entry() { + dout(20) << dendl; + + while (!m_stopping) { + dout(20) << "waiting for io request" << dendl; + + std::unique_ptr<IOContext> ctx(wait_io_finish()); + if (!ctx) { + dout(20) << "no io requests, terminating" << dendl; + return; + } + + dout(20) << ctx.get() << ": got: " << *ctx << dendl; + + int r = m_drv->send(ctx->req); + if (r < 0) { + derr << ctx.get() << ": send: " << cpp_strerror(r) << dendl; + std::lock_guard locker{m_lock}; + m_stopping = true; + m_cond.notify_all(); + return; + } + dout(20) << ctx.get() << " finish" << dendl; + } + dout(20) << "terminated" << dendl; +} + +std::ostream &operator<<(std::ostream &os, const Server::IOContext &ctx) { + + os << "[" << ctx.req->get_id(); + + switch (ctx.req->get_cmd()) + { + case rbd::ggate::Request::Write: + os << " Write "; + break; + case rbd::ggate::Request::Read: + os << " Read "; + break; + case rbd::ggate::Request::Flush: + os << " Flush "; + break; + case rbd::ggate::Request::Discard: + os << " Discard "; + break; + default: + os << " Unknow(" << ctx.req->get_cmd() << ") "; + break; + } + + os << ctx.req->get_offset() << "~" << ctx.req->get_length() << " " + << ctx.req->get_error() << "]"; + + return os; +} + +} // namespace ggate +} // namespace rbd + |