Diffstat (limited to '')
-rw-r--r--  src/librbd/cache/WriteAroundObjectDispatch.cc  525
1 file changed, 525 insertions, 0 deletions
diff --git a/src/librbd/cache/WriteAroundObjectDispatch.cc b/src/librbd/cache/WriteAroundObjectDispatch.cc
new file mode 100644
index 000000000..fafb73f40
--- /dev/null
+++ b/src/librbd/cache/WriteAroundObjectDispatch.cc
@@ -0,0 +1,525 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/cache/WriteAroundObjectDispatch.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Utils.h"
+#include "librbd/asio/ContextWQ.h"
+#include "librbd/io/ObjectDispatchSpec.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::cache::WriteAroundObjectDispatch: " \
+ << this << " " << __func__ << ": "
+
+namespace librbd {
+namespace cache {
+
+using librbd::util::data_object_name;
+
+template <typename I>
+WriteAroundObjectDispatch<I>::WriteAroundObjectDispatch(
+ I* image_ctx, size_t max_dirty, bool writethrough_until_flush)
+ : m_image_ctx(image_ctx), m_init_max_dirty(max_dirty), m_max_dirty(max_dirty),
+ m_lock(ceph::make_mutex(util::unique_lock_name(
+ "librbd::cache::WriteAroundObjectDispatch::lock", this))) {
+ if (writethrough_until_flush) {
+ m_max_dirty = 0;
+ }
+}
+
+template <typename I>
+WriteAroundObjectDispatch<I>::~WriteAroundObjectDispatch() {
+}
+
+template <typename I>
+void WriteAroundObjectDispatch<I>::init() {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 5) << dendl;
+
+  // add ourselves to the IO object dispatcher chain
+ if (m_init_max_dirty > 0) {
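+    // writes are acked back to the caller before the data reaches the
+    // OSDs, so buffers may be reused while an IO is still in flight --
+    // zero-copy paths are therefore unsafe here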
+ m_image_ctx->disable_zero_copy = true;
+ }
+ m_image_ctx->io_object_dispatcher->register_dispatch(this);
+}
+
+template <typename I>
+void WriteAroundObjectDispatch<I>::shut_down(Context* on_finish) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 5) << dendl;
+
+ on_finish->complete(0);
+}
+
+template <typename I>
+bool WriteAroundObjectDispatch<I>::read(
+ uint64_t object_no, io::ReadExtents* extents, IOContext io_context,
+ int op_flags, int read_flags, const ZTracer::Trace &parent_trace,
+ uint64_t* version, int* object_dispatch_flags,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ bool handled = false;
+ for (auto& extent: *extents) {
+ handled |= dispatch_unoptimized_io(object_no, extent.offset, extent.length,
+ dispatch_result, on_dispatched);
+ }
+ return handled;
+}
+
+template <typename I>
+bool WriteAroundObjectDispatch<I>::discard(
+ uint64_t object_no, uint64_t object_off, uint64_t object_len,
+ IOContext io_context, int discard_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " "
+ << object_off << "~" << object_len << dendl;
+
+ return dispatch_io(object_no, object_off, object_len, 0, dispatch_result,
+ on_finish, on_dispatched);
+}
+
+template <typename I>
+bool WriteAroundObjectDispatch<I>::write(
+ uint64_t object_no, uint64_t object_off, ceph::bufferlist&& data,
+ IOContext io_context, int op_flags, int write_flags,
+ std::optional<uint64_t> assert_version,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+    Context** on_finish, Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " "
+ << object_off << "~" << data.length() << dendl;
+
+ return dispatch_io(object_no, object_off, data.length(), op_flags,
+ dispatch_result, on_finish, on_dispatched);
+}
+
+template <typename I>
+bool WriteAroundObjectDispatch<I>::write_same(
+ uint64_t object_no, uint64_t object_off, uint64_t object_len,
+ io::LightweightBufferExtents&& buffer_extents, ceph::bufferlist&& data,
+ IOContext io_context, int op_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+    Context** on_finish, Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " "
+ << object_off << "~" << object_len << dendl;
+
+ return dispatch_io(object_no, object_off, object_len, op_flags,
+ dispatch_result, on_finish, on_dispatched);
+}
+
+template <typename I>
+bool WriteAroundObjectDispatch<I>::compare_and_write(
+ uint64_t object_no, uint64_t object_off, ceph::bufferlist&& cmp_data,
+ ceph::bufferlist&& write_data, IOContext io_context, int op_flags,
+ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
+ int* object_dispatch_flags, uint64_t* journal_tid,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ return dispatch_unoptimized_io(object_no, object_off, cmp_data.length(),
+ dispatch_result, on_dispatched);
+}
+
+template <typename I>
+bool WriteAroundObjectDispatch<I>::flush(
+ io::FlushSource flush_source, const ZTracer::Trace &parent_trace,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+
+ std::lock_guard locker{m_lock};
+ if (flush_source == io::FLUSH_SOURCE_USER && !m_user_flushed) {
+ m_user_flushed = true;
+ if (m_max_dirty == 0 && m_init_max_dirty > 0) {
+ ldout(cct, 5) << "first user flush: enabling write-around" << dendl;
+ m_max_dirty = m_init_max_dirty;
+ }
+ }
+
+ if (m_in_flight_io_tids.empty()) {
+ // no in-flight IO (also implies no queued/blocked IO)
+ return false;
+ }
+
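+  // tids impose a total order across writes and flushes: a flush can only
+  // be dispatched once every preceding (lower-tid) IO has been dispatched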
+ auto tid = ++m_last_tid;
+ auto ctx = util::create_async_context_callback(*m_image_ctx, *on_finish);
+
+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+ *on_finish = new LambdaContext([this, tid](int r) {
+ handle_in_flight_flush_complete(r, tid);
+ });
+
+ if (m_queued_ios.empty() && m_blocked_ios.empty()) {
+ // immediately allow the flush to be dispatched
+ ldout(cct, 20) << "dispatching: tid=" << tid << dendl;
+ m_in_flight_flushes.emplace(tid, ctx);
+ return false;
+ }
+
+  // cannot dispatch the flush until after preceding IO is dispatched
+ ldout(cct, 20) << "queueing: tid=" << tid << dendl;
+ m_queued_flushes.emplace(tid, QueuedFlush{ctx, on_dispatched});
+ return true;
+}
+
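+// "unoptimized" IOs (reads and compare-and-write) are passed down the
+// dispatcher chain unmodified, but must wait for any overlapping in-flight
+// write to complete so that they observe its data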
+template <typename I>
+bool WriteAroundObjectDispatch<I>::dispatch_unoptimized_io(
+ uint64_t object_no, uint64_t object_off, uint64_t object_len,
+ io::DispatchResult* dispatch_result, Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+
+ m_lock.lock();
+ auto in_flight_extents_it = m_in_flight_extents.find(object_no);
+ if (in_flight_extents_it == m_in_flight_extents.end() ||
+ !in_flight_extents_it->second.intersects(object_off, object_len)) {
+ // no IO in-flight to the specified extent
+ m_lock.unlock();
+ return false;
+ }
+
+ // write IO is in-flight -- it needs to complete before the unoptimized
+ // IO can be dispatched
+ auto tid = ++m_last_tid;
+ ldout(cct, 20) << "blocked by in-flight IO: tid=" << tid << dendl;
+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+ m_blocked_unoptimized_ios[object_no].emplace(
+ tid, BlockedIO{object_off, object_len, nullptr, on_dispatched});
+ m_lock.unlock();
+
+ return true;
+}
+
+template <typename I>
+bool WriteAroundObjectDispatch<I>::dispatch_io(
+ uint64_t object_no, uint64_t object_off, uint64_t object_len,
+ int op_flags, io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+
+ m_lock.lock();
+ if (m_max_dirty == 0) {
+ // write-through mode is active -- no-op the cache
+ m_lock.unlock();
+ return false;
+ }
+
+ if ((op_flags & LIBRADOS_OP_FLAG_FADVISE_FUA) != 0) {
+ // force unit access flag is set -- disable write-around
+ m_lock.unlock();
+ return dispatch_unoptimized_io(object_no, object_off, object_len,
+ dispatch_result, on_dispatched);
+ }
+
+ auto tid = ++m_last_tid;
+ auto ctx = util::create_async_context_callback(*m_image_ctx, *on_finish);
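+  // write-around semantics: the caller's completion (ctx) fires as soon as
+  // the IO is dispatched, while the replacement on_finish below tracks the
+  // real completion for dirty-byte accounting and flush ordering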
+
+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+ *on_finish = new LambdaContext(
+ [this, tid, object_no, object_off, object_len](int r) {
+ handle_in_flight_io_complete(r, tid, object_no, object_off, object_len);
+ });
+
+ bool blocked = block_overlapping_io(&m_in_flight_extents[object_no],
+ object_off, object_len);
+ if (blocked) {
+ ldout(cct, 20) << "blocked on overlap: tid=" << tid << dendl;
+ m_queued_or_blocked_io_tids.insert(tid);
+ m_blocked_ios[object_no].emplace(tid, BlockedIO{object_off, object_len, ctx,
+ on_dispatched});
+ m_lock.unlock();
+ } else if (can_dispatch_io(tid, object_len)) {
+ m_lock.unlock();
+
+ ldout(cct, 20) << "dispatching: tid=" << tid << dendl;
+ on_dispatched->complete(0);
+ ctx->complete(0);
+ } else {
+ ldout(cct, 20) << "queueing: tid=" << tid << dendl;
+ m_queued_or_blocked_io_tids.insert(tid);
+ m_queued_ios.emplace(tid, QueuedIO{object_len, ctx, on_dispatched});
+ m_lock.unlock();
+ }
+ return true;
+}
+
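+// returns true if the extent overlaps an in-flight write (leaving it for
+// the caller to queue); otherwise records the extent as in-flight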
+template <typename I>
+bool WriteAroundObjectDispatch<I>::block_overlapping_io(
+ InFlightObjectExtents* in_flight_object_extents, uint64_t object_off,
+ uint64_t object_len) {
+ if (in_flight_object_extents->intersects(object_off, object_len)) {
+ return true;
+ }
+
+ in_flight_object_extents->insert(object_off, object_len);
+ return false;
+}
+
+template <typename I>
+void WriteAroundObjectDispatch<I>::unblock_overlapping_ios(
+ uint64_t object_no, uint64_t object_off, uint64_t object_len,
+ Contexts* unoptimized_io_dispatches) {
+ auto cct = m_image_ctx->cct;
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+
+ auto in_flight_extents_it = m_in_flight_extents.find(object_no);
+ ceph_assert(in_flight_extents_it != m_in_flight_extents.end());
+
+ auto& in_flight_object_extents = in_flight_extents_it->second;
+ in_flight_object_extents.erase(object_off, object_len);
+
+ // handle unoptimized IOs that were blocked by in-flight IO
+ InFlightObjectExtents blocked_unoptimized_ios;
+ auto blocked_unoptimized_ios_it = m_blocked_unoptimized_ios.find(object_no);
+ if (blocked_unoptimized_ios_it != m_blocked_unoptimized_ios.end()) {
+ auto& blocked_unoptimized_object_ios = blocked_unoptimized_ios_it->second;
+ for (auto it = blocked_unoptimized_object_ios.begin();
+ it != blocked_unoptimized_object_ios.end();) {
+ auto& blocked_io = it->second;
+ if (!in_flight_object_extents.intersects(blocked_io.offset,
+ blocked_io.length)) {
+ unoptimized_io_dispatches->emplace(it->first, blocked_io.on_dispatched);
+ it = blocked_unoptimized_object_ios.erase(it);
+ } else {
+ blocked_unoptimized_ios.union_insert(blocked_io.offset,
+ blocked_io.length);
+ ++it;
+ }
+ }
+
+ if (blocked_unoptimized_object_ios.empty()) {
+ m_blocked_unoptimized_ios.erase(blocked_unoptimized_ios_it);
+ }
+ }
+
+ // handle optimized IOs that were blocked
+ auto blocked_io_it = m_blocked_ios.find(object_no);
+ if (blocked_io_it != m_blocked_ios.end()) {
+ auto& blocked_object_ios = blocked_io_it->second;
+
+ auto blocked_object_ios_it = blocked_object_ios.begin();
+ while (blocked_object_ios_it != blocked_object_ios.end()) {
+ auto next_blocked_object_ios_it = blocked_object_ios_it;
+ ++next_blocked_object_ios_it;
+
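+      // blocked IOs must be released in tid order -- stop (rather than
+      // skip) at the first IO that still conflicts with a blocked
+      // unoptimized IO or with the remaining in-flight extents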
+ auto& blocked_io = blocked_object_ios_it->second;
+ if (blocked_unoptimized_ios.intersects(blocked_io.offset,
+ blocked_io.length) ||
+ block_overlapping_io(&in_flight_object_extents, blocked_io.offset,
+ blocked_io.length)) {
+ break;
+ }
+
+ // move unblocked IO to the queued list, which will get processed when
+ // there is capacity
+ auto tid = blocked_object_ios_it->first;
+ ldout(cct, 20) << "queueing unblocked: tid=" << tid << dendl;
+ m_queued_ios.emplace(tid, blocked_io);
+
+ blocked_object_ios.erase(blocked_object_ios_it);
+ blocked_object_ios_it = next_blocked_object_ios_it;
+ }
+
+ if (blocked_object_ios.empty()) {
+ m_blocked_ios.erase(blocked_io_it);
+ }
+ }
+
+ if (in_flight_object_extents.empty()) {
+ m_in_flight_extents.erase(in_flight_extents_it);
+ }
+}
+
+template <typename I>
+bool WriteAroundObjectDispatch<I>::can_dispatch_io(
+ uint64_t tid, uint64_t length) {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+
+ if (m_in_flight_bytes == 0 || m_in_flight_bytes + length <= m_max_dirty) {
+    // no in-flight IO (so a single IO larger than the limit can still
+    // proceed rather than stall forever) or still under the max
+    // write-around in-flight limit. allow the dispatcher to proceed to
+    // send the IO but complete it back to the invoker.
+ m_in_flight_bytes += length;
+ m_in_flight_io_tids.insert(tid);
+ return true;
+ }
+
+ return false;
+}
+
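+// drains queued IOs in tid order for as long as can_dispatch_io() grants
+// capacity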
+template <typename I>
+void WriteAroundObjectDispatch<I>::handle_in_flight_io_complete(
+ int r, uint64_t tid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << ", tid=" << tid << dendl;
+
+ m_lock.lock();
+ m_in_flight_io_tids.erase(tid);
+ ceph_assert(m_in_flight_bytes >= object_len);
+ m_in_flight_bytes -= object_len;
+
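+  // latch the first IO error encountered; it will be delivered through the
+  // next flush completion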
+ if (r < 0) {
+ lderr(cct) << "IO error encountered: tid=" << tid << ": "
+ << cpp_strerror(r) << dendl;
+ if (m_pending_flush_error == 0) {
+ m_pending_flush_error = r;
+ }
+ }
+
+ // any overlapping blocked IOs can be queued now
+ Contexts unoptimized_io_dispatches;
+ unblock_overlapping_ios(object_no, object_off, object_len,
+ &unoptimized_io_dispatches);
+
+ // collect any flushes that are ready for completion
+ int pending_flush_error = 0;
+ auto finished_flushes = collect_finished_flushes();
+ if (!finished_flushes.empty()) {
+ std::swap(pending_flush_error, m_pending_flush_error);
+ }
+
+ // collect any queued IOs that are ready for dispatch
+ auto ready_ios = collect_ready_ios();
+
+ // collect any queued flushes that were tied to queued IOs
+ auto ready_flushes = collect_ready_flushes();
+ m_lock.unlock();
+
+ // dispatch any ready unoptimized IOs
+ for (auto& it : unoptimized_io_dispatches) {
+ ldout(cct, 20) << "dispatching unoptimized IO: tid=" << it.first << dendl;
+ it.second->complete(0);
+ }
+
+ // complete flushes that were waiting on in-flight IO
+  // (and propagate any IO error to the first flush)
+ for (auto& it : finished_flushes) {
+ ldout(cct, 20) << "completing flush: tid=" << it.first << ", "
+ << "r=" << pending_flush_error << dendl;
+ it.second->complete(pending_flush_error);
+ }
+
+ // dispatch any ready queued IOs
+ for (auto& it : ready_ios) {
+ ldout(cct, 20) << "dispatching IO: tid=" << it.first << dendl;
+ it.second.on_dispatched->complete(0);
+ it.second.on_finish->complete(0);
+ }
+
+ // dispatch any ready flushes
+ for (auto& it : ready_flushes) {
+ ldout(cct, 20) << "dispatching flush: tid=" << it.first << dendl;
+ it.second->complete(0);
+ }
+}
+
+template <typename I>
+void WriteAroundObjectDispatch<I>::handle_in_flight_flush_complete(
+ int r, uint64_t tid) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << ", tid=" << tid << dendl;
+
+ m_lock.lock();
+
+ // move the in-flight flush to the pending completion list
+ auto it = m_in_flight_flushes.find(tid);
+ ceph_assert(it != m_in_flight_flushes.end());
+
+ m_pending_flushes.emplace(it->first, it->second);
+ m_in_flight_flushes.erase(it);
+
+ // collect any flushes that are ready for completion
+ int pending_flush_error = 0;
+ auto finished_flushes = collect_finished_flushes();
+ if (!finished_flushes.empty()) {
+ std::swap(pending_flush_error, m_pending_flush_error);
+ }
+ m_lock.unlock();
+
+ // complete flushes that were waiting on in-flight IO
+  // (and propagate any IO error -- only the first flush observes it)
+ for (auto& it : finished_flushes) {
+ ldout(cct, 20) << "completing flush: tid=" << it.first << dendl;
+ it.second->complete(pending_flush_error);
+ pending_flush_error = 0;
+ }
+}
+
+template <typename I>
+typename WriteAroundObjectDispatch<I>::QueuedIOs
+WriteAroundObjectDispatch<I>::collect_ready_ios() {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+
+ QueuedIOs queued_ios;
+
+ while (true) {
+ auto it = m_queued_ios.begin();
+ if (it == m_queued_ios.end() ||
+ !can_dispatch_io(it->first, it->second.length)) {
+ break;
+ }
+
+ queued_ios.emplace(it->first, it->second);
+ m_queued_or_blocked_io_tids.erase(it->first);
+ m_queued_ios.erase(it);
+ }
+ return queued_ios;
+}
+
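+// a queued flush becomes ready once no queued or blocked IO carries a
+// smaller tid, i.e. every write preceding it has been dispatched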
+template <typename I>
+typename WriteAroundObjectDispatch<I>::Contexts
+WriteAroundObjectDispatch<I>::collect_ready_flushes() {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+
+ Contexts ready_flushes;
+ auto io_tid_it = m_queued_or_blocked_io_tids.begin();
+ while (true) {
+ auto it = m_queued_flushes.begin();
+ if (it == m_queued_flushes.end() ||
+ (io_tid_it != m_queued_or_blocked_io_tids.end() &&
+ *io_tid_it < it->first)) {
+ break;
+ }
+
+ m_in_flight_flushes.emplace(it->first, it->second.on_finish);
+ ready_flushes.emplace(it->first, it->second.on_dispatched);
+ m_queued_flushes.erase(it);
+ }
+
+ return ready_flushes;
+}
+
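+// a dispatched flush can complete once no in-flight IO carries a smaller
+// tid, i.e. every write preceding it has completed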
+template <typename I>
+typename WriteAroundObjectDispatch<I>::Contexts
+WriteAroundObjectDispatch<I>::collect_finished_flushes() {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+
+ Contexts finished_flushes;
+ auto io_tid_it = m_in_flight_io_tids.begin();
+ while (true) {
+ auto it = m_pending_flushes.begin();
+ if (it == m_pending_flushes.end() ||
+ (io_tid_it != m_in_flight_io_tids.end() && *io_tid_it < it->first)) {
+ break;
+ }
+
+ finished_flushes.emplace(it->first, it->second);
+ m_pending_flushes.erase(it);
+ }
+ return finished_flushes;
+}
+
+} // namespace cache
+} // namespace librbd
+
+template class librbd::cache::WriteAroundObjectDispatch<librbd::ImageCtx>;