author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 18:45:59 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 18:45:59 +0000
commit    19fcec84d8d7d21e796c7624e521b60d28ee21ed
tree      42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc  /src/tools/rbd_ggate/Server.cc
parent    Initial commit.
Adding upstream version 16.2.11+ds.

Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tools/rbd_ggate/Server.cc')
 src/tools/rbd_ggate/Server.cc | 262 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 262 insertions(+), 0 deletions(-)
diff --git a/src/tools/rbd_ggate/Server.cc b/src/tools/rbd_ggate/Server.cc
new file mode 100644
index 000000000..2565ba10f
--- /dev/null
+++ b/src/tools/rbd_ggate/Server.cc
@@ -0,0 +1,262 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/debug.h"
+#include "common/errno.h"
+#include "Driver.h"
+#include "Server.h"
+#include "Request.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::ggate::Server: " << this \
+ << " " << __func__ << ": "
+
+namespace rbd {
+namespace ggate {
+
+Server::Server(Driver *drv, librbd::Image& image)
+ : m_drv(drv), m_image(image),
+ m_reader_thread(this, &Server::reader_entry),
+ m_writer_thread(this, &Server::writer_entry) {
+}
+
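+// Drives the server: spawns the I/O threads, blocks on m_cond until a worker
+// thread (or an error path) sets m_stopping, then tears everything down.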
+void Server::run() {
+ dout(10) << dendl;
+
+ int r = start();
+ ceph_assert(r == 0);
+
+ dout(20) << "entering run loop" << dendl;
+
+ {
+ std::unique_lock locker{m_lock};
+ m_cond.wait(locker, [this] { return m_stopping;});
+ }
+
+ dout(20) << "exiting run loop" << dendl;
+
+ stop();
+}
+
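+// Spawns the reader thread (receives ggate requests) and the writer thread
+// (sends back completed replies).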
+int Server::start() {
+ dout(10) << dendl;
+
+ m_reader_thread.create("rbd_reader");
+ m_writer_thread.create("rbd_writer");
+ return 0;
+}
+
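+// Joins both worker threads (m_stopping must already be set) and then drains
+// any I/O still in flight.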
+void Server::stop() {
+ dout(10) << dendl;
+
+ {
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_stopping);
+ }
+
+ m_reader_thread.join();
+ m_writer_thread.join();
+
+ wait_clean();
+}
+
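+// Registers a request on m_io_pending so wait_clean() can account for it
+// until its AIO completes.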
+void Server::io_start(IOContext *ctx) {
+ dout(20) << ctx << dendl;
+
+ std::lock_guard locker{m_lock};
+ m_io_pending.push_back(&ctx->item);
+}
+
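+// Moves a completed request from m_io_pending to m_io_finished and wakes the
+// writer thread.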
+void Server::io_finish(IOContext *ctx) {
+ dout(20) << ctx << dendl;
+
+ std::lock_guard locker{m_lock};
+ ceph_assert(ctx->item.is_on_list());
+
+ ctx->item.remove_myself();
+ m_io_finished.push_back(&ctx->item);
+ m_cond.notify_all();
+}
+
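+// Blocks until a finished request is available or shutdown begins; returns
+// nullptr when stopping with nothing left to send.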
+Server::IOContext *Server::wait_io_finish() {
+ dout(20) << dendl;
+
+ std::unique_lock locker{m_lock};
+ m_cond.wait(locker, [this] { return !m_io_finished.empty() || m_stopping;});
+
+ if (m_io_finished.empty()) {
+ return nullptr;
+ }
+
+ IOContext *ret = m_io_finished.front();
+ m_io_finished.pop_front();
+
+ return ret;
+}
+
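+// Runs after the worker threads have joined: waits for outstanding AIO to
+// drain from m_io_pending, then frees any contexts left on m_io_finished.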
+void Server::wait_clean() {
+ dout(20) << dendl;
+
+ ceph_assert(!m_reader_thread.is_started());
+
+ std::unique_lock locker{m_lock};
+ m_cond.wait(locker, [this] { return m_io_pending.empty();});
+
+ while (!m_io_finished.empty()) {
+ std::unique_ptr<IOContext> free_ctx(m_io_finished.front());
+ m_io_finished.pop_front();
+ }
+}
+
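+// librbd completion trampoline: extracts the return value, hands the context
+// back to its owning Server, and releases the completion.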
+void Server::aio_callback(librbd::completion_t cb, void *arg) {
+ librbd::RBD::AioCompletion *aio_completion =
+ reinterpret_cast<librbd::RBD::AioCompletion*>(cb);
+
+ IOContext *ctx = reinterpret_cast<IOContext *>(arg);
+ int r = aio_completion->get_return_value();
+
+ ctx->server->handle_aio(ctx, r);
+ aio_completion->release();
+}
+
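+// Translates an AIO result into the request's reply state: masks the
+// out-of-bounds EINVAL case, zero-pads short reads, records the error code,
+// and queues the reply for the writer thread.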
+void Server::handle_aio(IOContext *ctx, int r) {
+ dout(20) << ctx << ": r=" << r << dendl;
+
+ if (r == -EINVAL) {
+ // if shrinking an image, a pagecache writeback might reference
+ // extents outside of the range of the new image extents
+ dout(5) << "masking IO out-of-bounds error" << dendl;
+ ctx->req->bl.clear();
+ r = 0;
+ }
+
+ if (r < 0) {
+ ctx->req->set_error(-r);
+ } else if ((ctx->req->get_cmd() == Request::Read) &&
+ r != static_cast<int>(ctx->req->get_length())) {
+    int pad_byte_count = static_cast<int>(ctx->req->get_length()) - r;
+ ctx->req->bl.append_zero(pad_byte_count);
+ dout(20) << ctx << ": pad byte count: " << pad_byte_count << dendl;
+ ctx->req->set_error(0);
+ } else {
+ ctx->req->set_error(0);
+ }
+ io_finish(ctx);
+}
+
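+// Reader thread: receives requests from the ggate device via the driver and
+// dispatches the matching librbd asynchronous operation.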
+void Server::reader_entry() {
+ dout(20) << dendl;
+
+ while (!m_stopping) {
+ std::unique_ptr<IOContext> ctx(new IOContext(this));
+
+ dout(20) << "waiting for ggate request" << dendl;
+
+ int r = m_drv->recv(&ctx->req);
+ if (r < 0) {
+ if (r != -ECANCELED) {
+ derr << "recv: " << cpp_strerror(r) << dendl;
+ }
+ std::lock_guard locker{m_lock};
+ m_stopping = true;
+ m_cond.notify_all();
+ return;
+ }
+
+ IOContext *pctx = ctx.release();
+
+ dout(20) << pctx << ": start: " << *pctx << dendl;
+
+ io_start(pctx);
+ librbd::RBD::AioCompletion *c =
+ new librbd::RBD::AioCompletion(pctx, aio_callback);
+    switch (pctx->req->get_cmd()) {
+ case rbd::ggate::Request::Write:
+ m_image.aio_write(pctx->req->get_offset(), pctx->req->get_length(),
+ pctx->req->bl, c);
+ break;
+ case rbd::ggate::Request::Read:
+ m_image.aio_read(pctx->req->get_offset(), pctx->req->get_length(),
+ pctx->req->bl, c);
+ break;
+ case rbd::ggate::Request::Flush:
+ m_image.aio_flush(c);
+ break;
+ case rbd::ggate::Request::Discard:
+ m_image.aio_discard(pctx->req->get_offset(), pctx->req->get_length(), c);
+ break;
+    default:
+      derr << pctx << ": invalid request command: " << pctx->req->get_cmd()
+           << dendl;
+      c->release();
+      std::lock_guard locker{m_lock};
+      // drop the context from m_io_pending before bailing out; otherwise
+      // wait_clean() would block forever on a non-empty pending list
+      pctx->item.remove_myself();
+      delete pctx;
+      m_stopping = true;
+      m_cond.notify_all();
+      return;
+ }
+ }
+ dout(20) << "terminated" << dendl;
+}
+
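+// Writer thread: pops completed requests off m_io_finished and sends the
+// replies back to the ggate device via the driver.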
+void Server::writer_entry() {
+ dout(20) << dendl;
+
+ while (!m_stopping) {
+ dout(20) << "waiting for io request" << dendl;
+
+ std::unique_ptr<IOContext> ctx(wait_io_finish());
+ if (!ctx) {
+ dout(20) << "no io requests, terminating" << dendl;
+ return;
+ }
+
+ dout(20) << ctx.get() << ": got: " << *ctx << dendl;
+
+ int r = m_drv->send(ctx->req);
+ if (r < 0) {
+ derr << ctx.get() << ": send: " << cpp_strerror(r) << dendl;
+ std::lock_guard locker{m_lock};
+ m_stopping = true;
+ m_cond.notify_all();
+ return;
+ }
+ dout(20) << ctx.get() << " finish" << dendl;
+ }
+ dout(20) << "terminated" << dendl;
+}
+
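+// Debug formatter for IOContext, used by the dout() statements above.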
+std::ostream &operator<<(std::ostream &os, const Server::IOContext &ctx) {
+  os << "[" << ctx.req->get_id();
+
+  switch (ctx.req->get_cmd()) {
+ case rbd::ggate::Request::Write:
+ os << " Write ";
+ break;
+ case rbd::ggate::Request::Read:
+ os << " Read ";
+ break;
+ case rbd::ggate::Request::Flush:
+ os << " Flush ";
+ break;
+ case rbd::ggate::Request::Discard:
+ os << " Discard ";
+ break;
+ default:
+ os << " Unknow(" << ctx.req->get_cmd() << ") ";
+ break;
+ }
+
+ os << ctx.req->get_offset() << "~" << ctx.req->get_length() << " "
+ << ctx.req->get_error() << "]";
+
+ return os;
+}
+
+} // namespace ggate
+} // namespace rbd
+
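For context, this class is wired up by the rbd-ggate entry point (src/tools/rbd_ggate/main.cc, outside this diff). Below is a minimal sketch of that wiring, assuming a rbd::ggate::Driver has already been constructed elsewhere (its interface lives in Driver.h, not shown here); serve_image is a hypothetical helper, while the librados/librbd calls are standard public API:

#include <rados/librados.hpp>
#include <rbd/librbd.hpp>

#include "Driver.h"
#include "Server.h"

// Illustrative only: opens an RBD image and runs the server loop on it.
int serve_image(rbd::ggate::Driver *drv, const char *pool, const char *name) {
  librados::Rados rados;
  int r = rados.init("admin");        // connect as client.admin
  if (r < 0) {
    return r;
  }
  rados.conf_read_file(nullptr);      // default ceph.conf search path
  r = rados.connect();
  if (r < 0) {
    return r;
  }

  librados::IoCtx io_ctx;
  r = rados.ioctx_create(pool, io_ctx);
  if (r < 0) {
    return r;
  }

  librbd::RBD rbd;
  librbd::Image image;
  r = rbd.open(io_ctx, image, name);  // open the backing image
  if (r < 0) {
    return r;
  }

  rbd::ggate::Server server(drv, image);
  server.run();                       // blocks until the ggate device stops
  return 0;
}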