diff options
Diffstat (limited to '')
-rw-r--r-- | src/librbd/io/WriteBlockImageDispatch.cc | 271 |
1 files changed, 271 insertions, 0 deletions
diff --git a/src/librbd/io/WriteBlockImageDispatch.cc b/src/librbd/io/WriteBlockImageDispatch.cc new file mode 100644 index 000000000..4439a15a7 --- /dev/null +++ b/src/librbd/io/WriteBlockImageDispatch.cc @@ -0,0 +1,271 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/io/WriteBlockImageDispatch.h" +#include "common/dout.h" +#include "common/Cond.h" +#include "librbd/ImageCtx.h" +#include "librbd/Utils.h" +#include "librbd/asio/ContextWQ.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/io/ImageDispatchSpec.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::io::WriteBlockImageDispatch: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace io { + +template <typename I> +struct WriteBlockImageDispatch<I>::C_BlockedWrites : public Context { + WriteBlockImageDispatch *dispatch; + explicit C_BlockedWrites(WriteBlockImageDispatch *dispatch) + : dispatch(dispatch) { + } + + void finish(int r) override { + dispatch->handle_blocked_writes(r); + } +}; + +template <typename I> +WriteBlockImageDispatch<I>::WriteBlockImageDispatch(I* image_ctx) + : m_image_ctx(image_ctx), + m_lock(ceph::make_shared_mutex( + util::unique_lock_name("librbd::io::WriteBlockImageDispatch::m_lock", + this))) { + auto cct = m_image_ctx->cct; + ldout(cct, 5) << "ictx=" << image_ctx << dendl; +} + +template <typename I> +void WriteBlockImageDispatch<I>::shut_down(Context* on_finish) { + on_finish->complete(0); +} + +template <typename I> +int WriteBlockImageDispatch<I>::block_writes() { + C_SaferCond cond_ctx; + block_writes(&cond_ctx); + return cond_ctx.wait(); +} + +template <typename I> +void WriteBlockImageDispatch<I>::block_writes(Context *on_blocked) { + ceph_assert(ceph_mutex_is_locked(m_image_ctx->owner_lock)); + auto cct = m_image_ctx->cct; + + // ensure owner lock is not held after block_writes completes + on_blocked = util::create_async_context_callback( + *m_image_ctx, on_blocked); + + { + std::unique_lock locker{m_lock}; + ++m_write_blockers; + ldout(cct, 5) << m_image_ctx << ", " + << "num=" << m_write_blockers << dendl; + if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) { + ldout(cct, 5) << "waiting for in-flight writes to complete: " + << "in_flight_writes=" << m_in_flight_writes << dendl; + m_write_blocker_contexts.push_back(on_blocked); + return; + } + } + + flush_io(on_blocked); +}; + +template <typename I> +void WriteBlockImageDispatch<I>::unblock_writes() { + auto cct = m_image_ctx->cct; + + Contexts waiter_contexts; + Contexts dispatch_contexts; + { + std::unique_lock locker{m_lock}; + ceph_assert(m_write_blockers > 0); + --m_write_blockers; + + ldout(cct, 5) << m_image_ctx << ", " + << "num=" << m_write_blockers << dendl; + if (m_write_blockers == 0) { + std::swap(waiter_contexts, m_unblocked_write_waiter_contexts); + std::swap(dispatch_contexts, m_on_dispatches); + } + } + + for (auto ctx : waiter_contexts) { + ctx->complete(0); + } + + for (auto ctx : dispatch_contexts) { + ctx->complete(0); + } +} + +template <typename I> +void WriteBlockImageDispatch<I>::wait_on_writes_unblocked( + Context *on_unblocked) { + ceph_assert(ceph_mutex_is_locked(m_image_ctx->owner_lock)); + auto cct = m_image_ctx->cct; + + { + std::unique_lock locker{m_lock}; + ldout(cct, 20) << m_image_ctx << ", " + << "write_blockers=" << m_write_blockers << dendl; + if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) { + m_unblocked_write_waiter_contexts.push_back(on_unblocked); + return; + } + } + + on_unblocked->complete(0); +} + +template <typename I> +bool WriteBlockImageDispatch<I>::write( + AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl, + IOContext io_context, int op_flags, const ZTracer::Trace &parent_trace, + uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags, + DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "tid=" << tid << dendl; + + return process_io(tid, dispatch_result, on_finish, on_dispatched); +} + +template <typename I> +bool WriteBlockImageDispatch<I>::discard( + AioCompletion* aio_comp, Extents &&image_extents, + uint32_t discard_granularity_bytes, IOContext io_context, + const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic<uint32_t>* image_dispatch_flags, + DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "tid=" << tid << dendl; + + return process_io(tid, dispatch_result, on_finish, on_dispatched); +} + +template <typename I> +bool WriteBlockImageDispatch<I>::write_same( + AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl, + IOContext io_context, int op_flags, const ZTracer::Trace &parent_trace, + uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags, + DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "tid=" << tid << dendl; + + return process_io(tid, dispatch_result, on_finish, on_dispatched); +} + +template <typename I> +bool WriteBlockImageDispatch<I>::compare_and_write( + AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&cmp_bl, + bufferlist &&bl, uint64_t *mismatch_offset, IOContext io_context, + int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic<uint32_t>* image_dispatch_flags, + DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "tid=" << tid << dendl; + + return process_io(tid, dispatch_result, on_finish, on_dispatched); +} + +template <typename I> +bool WriteBlockImageDispatch<I>::flush( + AioCompletion* aio_comp, FlushSource flush_source, + const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic<uint32_t>* image_dispatch_flags, + DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "tid=" << tid << dendl; + + if (flush_source != FLUSH_SOURCE_USER) { + return false; + } + + return process_io(tid, dispatch_result, on_finish, on_dispatched); +} + +template <typename I> +void WriteBlockImageDispatch<I>::handle_finished(int r, uint64_t tid) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << ", tid=" << tid << dendl; + + std::unique_lock locker{m_lock}; + ceph_assert(m_in_flight_writes > 0); + --m_in_flight_writes; + + bool writes_blocked = false; + if (m_write_blockers > 0 && m_in_flight_writes == 0) { + ldout(cct, 10) << "flushing all in-flight IO for blocked writes" << dendl; + writes_blocked = true; + } + locker.unlock(); + + if (writes_blocked) { + flush_io(new C_BlockedWrites(this)); + } +} + +template <typename I> +bool WriteBlockImageDispatch<I>::process_io( + uint64_t tid, DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + std::unique_lock locker{m_lock}; + if (m_write_blockers > 0 || !m_on_dispatches.empty()) { + *dispatch_result = DISPATCH_RESULT_RESTART; + m_on_dispatches.push_back(on_dispatched); + return true; + } + + ++m_in_flight_writes; + *on_finish = new LambdaContext([this, tid, on_finish=*on_finish](int r) { + handle_finished(r, tid); + on_finish->complete(r); + }); + return false; +} + +template <typename I> +void WriteBlockImageDispatch<I>::flush_io(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + // ensure that all in-flight IO is flushed + auto aio_comp = AioCompletion::create_and_start( + on_finish, util::get_image_ctx(m_image_ctx), librbd::io::AIO_TYPE_FLUSH); + auto req = ImageDispatchSpec::create_flush( + *m_image_ctx, IMAGE_DISPATCH_LAYER_WRITE_BLOCK, aio_comp, + FLUSH_SOURCE_WRITE_BLOCK, {}); + req->send(); +} + +template <typename I> +void WriteBlockImageDispatch<I>::handle_blocked_writes(int r) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + Contexts write_blocker_contexts; + { + std::unique_lock locker{m_lock}; + std::swap(write_blocker_contexts, m_write_blocker_contexts); + } + + for (auto ctx : write_blocker_contexts) { + ctx->complete(0); + } +} + +} // namespace io +} // namespace librbd + +template class librbd::io::WriteBlockImageDispatch<librbd::ImageCtx>; |