summaryrefslogtreecommitdiffstats
path: root/src/librbd/asio
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/librbd/asio/ContextWQ.cc49
-rw-r--r--src/librbd/asio/ContextWQ.h52
-rw-r--r--src/librbd/asio/Utils.h33
3 files changed, 134 insertions, 0 deletions
diff --git a/src/librbd/asio/ContextWQ.cc b/src/librbd/asio/ContextWQ.cc
new file mode 100644
index 000000000..4f6c72770
--- /dev/null
+++ b/src/librbd/asio/ContextWQ.cc
@@ -0,0 +1,49 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/asio/ContextWQ.h"
+#include "include/Context.h"
+#include "common/Cond.h"
+#include "common/dout.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::asio::ContextWQ: " \
+ << this << " " << __func__ << ": "
+
+namespace librbd {
+namespace asio {
+
+ContextWQ::ContextWQ(CephContext* cct, boost::asio::io_context& io_context)
+ : m_cct(cct), m_io_context(io_context),
+ m_strand(std::make_unique<boost::asio::io_context::strand>(io_context)),
+ m_queued_ops(0) {
+ ldout(m_cct, 20) << dendl;
+}
+
+ContextWQ::~ContextWQ() {
+ ldout(m_cct, 20) << dendl;
+ drain();
+ m_strand.reset();
+}
+
+void ContextWQ::drain() {
+ ldout(m_cct, 20) << dendl;
+ C_SaferCond ctx;
+ drain_handler(&ctx);
+ ctx.wait();
+}
+
+void ContextWQ::drain_handler(Context* ctx) {
+ if (m_queued_ops == 0) {
+ ctx->complete(0);
+ return;
+ }
+
+ // new items might be queued while we are trying to drain, so we
+ // might need to post the handler multiple times
+ boost::asio::post(*m_strand, [this, ctx]() { drain_handler(ctx); });
+}
+
+} // namespace asio
+} // namespace librbd
diff --git a/src/librbd/asio/ContextWQ.h b/src/librbd/asio/ContextWQ.h
new file mode 100644
index 000000000..85c254161
--- /dev/null
+++ b/src/librbd/asio/ContextWQ.h
@@ -0,0 +1,52 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_ASIO_CONTEXT_WQ_H
+#define CEPH_LIBRBD_ASIO_CONTEXT_WQ_H
+
+#include "include/common_fwd.h"
+#include "include/Context.h"
+#include <atomic>
+#include <memory>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/io_context_strand.hpp>
+#include <boost/asio/post.hpp>
+
+namespace librbd {
+namespace asio {
+
+class ContextWQ {
+public:
+ explicit ContextWQ(CephContext* cct, boost::asio::io_context& io_context);
+ ~ContextWQ();
+
+ void drain();
+
+ void queue(Context *ctx, int r = 0) {
+ ++m_queued_ops;
+
+ // ensure all legacy ContextWQ users are dispatched sequentially for
+ // backwards compatibility (i.e. might not be concurrent thread-safe)
+ boost::asio::post(*m_strand, [this, ctx, r]() {
+ ctx->complete(r);
+
+ ceph_assert(m_queued_ops > 0);
+ --m_queued_ops;
+ });
+ }
+
+private:
+ CephContext* m_cct;
+ boost::asio::io_context& m_io_context;
+ std::unique_ptr<boost::asio::io_context::strand> m_strand;
+
+ std::atomic<uint64_t> m_queued_ops;
+
+ void drain_handler(Context* ctx);
+
+};
+
+} // namespace asio
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_ASIO_CONTEXT_WQ_H
diff --git a/src/librbd/asio/Utils.h b/src/librbd/asio/Utils.h
new file mode 100644
index 000000000..2fbbb5846
--- /dev/null
+++ b/src/librbd/asio/Utils.h
@@ -0,0 +1,33 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_ASIO_UTILS_H
+#define CEPH_LIBRBD_ASIO_UTILS_H
+
+#include "include/Context.h"
+#include "include/rados/librados_fwd.hpp"
+#include <boost/system/error_code.hpp>
+
+namespace librbd {
+namespace asio {
+namespace util {
+
+template <typename T>
+auto get_context_adapter(T&& t) {
+ return [t = std::move(t)](boost::system::error_code ec) {
+ t->complete(-ec.value());
+ };
+}
+
+template <typename T>
+auto get_callback_adapter(T&& t) {
+ return [t = std::move(t)](boost::system::error_code ec, auto&& ... args) {
+ t(-ec.value(), std::forward<decltype(args)>(args)...);
+ };
+}
+
+} // namespace util
+} // namespace asio
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_ASIO_UTILS_H