diff options
Diffstat (limited to 'src/librbd/asio')
-rw-r--r-- | src/librbd/asio/ContextWQ.cc | 49 | ||||
-rw-r--r-- | src/librbd/asio/ContextWQ.h | 52 | ||||
-rw-r--r-- | src/librbd/asio/Utils.h | 33 |
3 files changed, 134 insertions, 0 deletions
diff --git a/src/librbd/asio/ContextWQ.cc b/src/librbd/asio/ContextWQ.cc new file mode 100644 index 000000000..4f6c72770 --- /dev/null +++ b/src/librbd/asio/ContextWQ.cc @@ -0,0 +1,49 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/asio/ContextWQ.h" +#include "include/Context.h" +#include "common/Cond.h" +#include "common/dout.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::asio::ContextWQ: " \ + << this << " " << __func__ << ": " + +namespace librbd { +namespace asio { + +ContextWQ::ContextWQ(CephContext* cct, boost::asio::io_context& io_context) + : m_cct(cct), m_io_context(io_context), + m_strand(std::make_unique<boost::asio::io_context::strand>(io_context)), + m_queued_ops(0) { + ldout(m_cct, 20) << dendl; +} + +ContextWQ::~ContextWQ() { + ldout(m_cct, 20) << dendl; + drain(); + m_strand.reset(); +} + +void ContextWQ::drain() { + ldout(m_cct, 20) << dendl; + C_SaferCond ctx; + drain_handler(&ctx); + ctx.wait(); +} + +void ContextWQ::drain_handler(Context* ctx) { + if (m_queued_ops == 0) { + ctx->complete(0); + return; + } + + // new items might be queued while we are trying to drain, so we + // might need to post the handler multiple times + boost::asio::post(*m_strand, [this, ctx]() { drain_handler(ctx); }); +} + +} // namespace asio +} // namespace librbd diff --git a/src/librbd/asio/ContextWQ.h b/src/librbd/asio/ContextWQ.h new file mode 100644 index 000000000..85c254161 --- /dev/null +++ b/src/librbd/asio/ContextWQ.h @@ -0,0 +1,52 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_ASIO_CONTEXT_WQ_H +#define CEPH_LIBRBD_ASIO_CONTEXT_WQ_H + +#include "include/common_fwd.h" +#include "include/Context.h" +#include <atomic> +#include <memory> +#include <boost/asio/io_context.hpp> +#include <boost/asio/io_context_strand.hpp> +#include <boost/asio/post.hpp> + +namespace librbd { +namespace asio { + +class ContextWQ { +public: + explicit ContextWQ(CephContext* cct, boost::asio::io_context& io_context); + ~ContextWQ(); + + void drain(); + + void queue(Context *ctx, int r = 0) { + ++m_queued_ops; + + // ensure all legacy ContextWQ users are dispatched sequentially for + // backwards compatibility (i.e. might not be concurrent thread-safe) + boost::asio::post(*m_strand, [this, ctx, r]() { + ctx->complete(r); + + ceph_assert(m_queued_ops > 0); + --m_queued_ops; + }); + } + +private: + CephContext* m_cct; + boost::asio::io_context& m_io_context; + std::unique_ptr<boost::asio::io_context::strand> m_strand; + + std::atomic<uint64_t> m_queued_ops; + + void drain_handler(Context* ctx); + +}; + +} // namespace asio +} // namespace librbd + +#endif // CEPH_LIBRBD_ASIO_CONTEXT_WQ_H diff --git a/src/librbd/asio/Utils.h b/src/librbd/asio/Utils.h new file mode 100644 index 000000000..2fbbb5846 --- /dev/null +++ b/src/librbd/asio/Utils.h @@ -0,0 +1,33 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_ASIO_UTILS_H +#define CEPH_LIBRBD_ASIO_UTILS_H + +#include "include/Context.h" +#include "include/rados/librados_fwd.hpp" +#include <boost/system/error_code.hpp> + +namespace librbd { +namespace asio { +namespace util { + +template <typename T> +auto get_context_adapter(T&& t) { + return [t = std::move(t)](boost::system::error_code ec) { + t->complete(-ec.value()); + }; +} + +template <typename T> +auto get_callback_adapter(T&& t) { + return [t = std::move(t)](boost::system::error_code ec, auto&& ... args) { + t(-ec.value(), std::forward<decltype(args)>(args)...); + }; +} + +} // namespace util +} // namespace asio +} // namespace librbd + +#endif // CEPH_LIBRBD_ASIO_UTILS_H |