summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_aio.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/rgw_aio.cc')
-rw-r--r--src/rgw/rgw_aio.cc119
1 files changed, 119 insertions, 0 deletions
diff --git a/src/rgw/rgw_aio.cc b/src/rgw/rgw_aio.cc
new file mode 100644
index 000000000..1d5a021d3
--- /dev/null
+++ b/src/rgw/rgw_aio.cc
@@ -0,0 +1,119 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <type_traits>
+#include "include/rados/librados.hpp"
+#include "librados/librados_asio.h"
+
+#include "rgw_aio.h"
+
+namespace rgw {
+
+namespace {
+
+void cb(librados::completion_t, void* arg);
+
+struct state {
+ Aio* aio;
+ librados::AioCompletion* c;
+
+ state(Aio* aio, AioResult& r)
+ : aio(aio),
+ c(librados::Rados::aio_create_completion(&r, &cb)) {}
+};
+
+void cb(librados::completion_t, void* arg) {
+ static_assert(sizeof(AioResult::user_data) >= sizeof(state));
+ static_assert(std::is_trivially_destructible_v<state>);
+ auto& r = *(static_cast<AioResult*>(arg));
+ auto s = reinterpret_cast<state*>(&r.user_data);
+ r.result = s->c->get_return_value();
+ s->c->release();
+ s->aio->put(r);
+}
+
+template <typename Op>
+Aio::OpFunc aio_abstract(Op&& op) {
+ return [op = std::move(op)] (Aio* aio, AioResult& r) mutable {
+ constexpr bool read = std::is_same_v<std::decay_t<Op>, librados::ObjectReadOperation>;
+ auto s = new (&r.user_data) state(aio, r);
+ if constexpr (read) {
+ r.result = r.obj.aio_operate(s->c, &op, &r.data);
+ } else {
+ r.result = r.obj.aio_operate(s->c, &op);
+ }
+ if (r.result < 0) {
+ s->c->release();
+ aio->put(r);
+ }
+ };
+}
+
+struct Handler {
+ Aio* throttle = nullptr;
+ AioResult& r;
+ // write callback
+ void operator()(boost::system::error_code ec) const {
+ r.result = -ec.value();
+ throttle->put(r);
+ }
+ // read callback
+ void operator()(boost::system::error_code ec, bufferlist bl) const {
+ r.result = -ec.value();
+ r.data = std::move(bl);
+ throttle->put(r);
+ }
+};
+
+template <typename Op>
+Aio::OpFunc aio_abstract(Op&& op, boost::asio::io_context& context,
+ yield_context yield) {
+ return [op = std::move(op), &context, yield] (Aio* aio, AioResult& r) mutable {
+ // arrange for the completion Handler to run on the yield_context's strand
+ // executor so it can safely call back into Aio without locking
+ using namespace boost::asio;
+ async_completion<yield_context, void()> init(yield);
+ auto ex = get_associated_executor(init.completion_handler);
+
+ auto& ref = r.obj.get_ref();
+ librados::async_operate(context, ref.pool.ioctx(), ref.obj.oid, &op, 0,
+ bind_executor(ex, Handler{aio, r}));
+ };
+}
+
+template <typename Op>
+Aio::OpFunc aio_abstract(Op&& op, optional_yield y) {
+ static_assert(std::is_base_of_v<librados::ObjectOperation, std::decay_t<Op>>);
+ static_assert(!std::is_lvalue_reference_v<Op>);
+ static_assert(!std::is_const_v<Op>);
+ if (y) {
+ return aio_abstract(std::forward<Op>(op), y.get_io_context(),
+ y.get_yield_context());
+ }
+ return aio_abstract(std::forward<Op>(op));
+}
+
+} // anonymous namespace
+
+Aio::OpFunc Aio::librados_op(librados::ObjectReadOperation&& op,
+ optional_yield y) {
+ return aio_abstract(std::move(op), y);
+}
+Aio::OpFunc Aio::librados_op(librados::ObjectWriteOperation&& op,
+ optional_yield y) {
+ return aio_abstract(std::move(op), y);
+}
+
+} // namespace rgw