summaryrefslogtreecommitdiffstats
path: root/src/librbd/journal/ObjectDispatch.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/librbd/journal/ObjectDispatch.cc')
-rw-r--r--src/librbd/journal/ObjectDispatch.cc257
1 files changed, 257 insertions, 0 deletions
diff --git a/src/librbd/journal/ObjectDispatch.cc b/src/librbd/journal/ObjectDispatch.cc
new file mode 100644
index 000000000..e3659c221
--- /dev/null
+++ b/src/librbd/journal/ObjectDispatch.cc
@@ -0,0 +1,257 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/journal/ObjectDispatch.h"
+#include "common/dout.h"
+#include "osdc/Striper.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Journal.h"
+#include "librbd/Utils.h"
+#include "librbd/asio/ContextWQ.h"
+#include "librbd/io/ObjectDispatchSpec.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
+#include "librbd/io/Utils.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::journal::ObjectDispatch: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace journal {
+
+using librbd::util::data_object_name;
+using util::create_context_callback;
+
+namespace {
+
+template <typename I>
+struct C_CommitIOEvent : public Context {
+ I* image_ctx;
+ Journal<I>* journal;
+ uint64_t object_no;
+ uint64_t object_off;
+ uint64_t object_len;
+ uint64_t journal_tid;
+ int object_dispatch_flags;
+ Context* on_finish;
+
+ C_CommitIOEvent(I* image_ctx, Journal<I>* journal, uint64_t object_no,
+ uint64_t object_off, uint64_t object_len,
+ uint64_t journal_tid, int object_dispatch_flags,
+ Context* on_finish)
+ : image_ctx(image_ctx), journal(journal), object_no(object_no),
+ object_off(object_off), object_len(object_len), journal_tid(journal_tid),
+ object_dispatch_flags(object_dispatch_flags), on_finish(on_finish) {
+ }
+
+ void finish(int r) override {
+ // don't commit the IO extent if a previous dispatch handler will just
+ // retry the failed IO
+ if (r >= 0 ||
+ (object_dispatch_flags &
+ io::OBJECT_DISPATCH_FLAG_WILL_RETRY_ON_ERROR) == 0) {
+ auto [image_extents, _] = io::util::object_to_area_extents(
+ image_ctx, object_no, {{object_off, object_len}});
+ for (const auto& extent : image_extents) {
+ journal->commit_io_event_extent(journal_tid, extent.first,
+ extent.second, r);
+ }
+ }
+
+ if (on_finish != nullptr) {
+ on_finish->complete(r);
+ }
+ }
+};
+
+} // anonymous namespace
+
+template <typename I>
+ObjectDispatch<I>::ObjectDispatch(I* image_ctx, Journal<I>* journal)
+ : m_image_ctx(image_ctx), m_journal(journal) {
+}
+
+template <typename I>
+void ObjectDispatch<I>::shut_down(Context* on_finish) {
+ m_image_ctx->op_work_queue->queue(on_finish, 0);
+}
+
+template <typename I>
+bool ObjectDispatch<I>::discard(
+ uint64_t object_no, uint64_t object_off, uint64_t object_len,
+ IOContext io_context, int discard_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) {
+ if (*journal_tid == 0) {
+ // non-journaled IO
+ return false;
+ }
+
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " "
+ << object_off << "~" << object_len << dendl;
+
+ *on_finish = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no,
+ object_off, object_len, *journal_tid,
+ *object_dispatch_flags, *on_finish);
+ *on_finish = create_context_callback<
+ Context, &Context::complete>(*on_finish, m_journal);
+
+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+ wait_or_flush_event(*journal_tid, *object_dispatch_flags, on_dispatched);
+ return true;
+}
+
+template <typename I>
+bool ObjectDispatch<I>::write(
+ uint64_t object_no, uint64_t object_off, ceph::bufferlist&& data,
+ IOContext io_context, int op_flags, int write_flags,
+ std::optional<uint64_t> assert_version,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) {
+ if (*journal_tid == 0) {
+ // non-journaled IO
+ return false;
+ }
+
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " "
+ << object_off << "~" << data.length() << dendl;
+
+ *on_finish = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no,
+ object_off, data.length(), *journal_tid,
+ *object_dispatch_flags, *on_finish);
+ *on_finish = create_context_callback<
+ Context, &Context::complete>(*on_finish, m_journal);
+
+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+ wait_or_flush_event(*journal_tid, *object_dispatch_flags, on_dispatched);
+ return true;
+}
+
+template <typename I>
+bool ObjectDispatch<I>::write_same(
+ uint64_t object_no, uint64_t object_off, uint64_t object_len,
+ io::LightweightBufferExtents&& buffer_extents, ceph::bufferlist&& data,
+ IOContext io_context, int op_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) {
+ if (*journal_tid == 0) {
+ // non-journaled IO
+ return false;
+ }
+
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " "
+ << object_off << "~" << object_len << dendl;
+
+ *on_finish = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no,
+ object_off, object_len, *journal_tid,
+ *object_dispatch_flags, *on_finish);
+ *on_finish = create_context_callback<
+ Context, &Context::complete>(*on_finish, m_journal);
+
+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+ wait_or_flush_event(*journal_tid, *object_dispatch_flags, on_dispatched);
+ return true;
+}
+
+template <typename I>
+bool ObjectDispatch<I>::compare_and_write(
+ uint64_t object_no, uint64_t object_off, ceph::bufferlist&& cmp_data,
+ ceph::bufferlist&& write_data, IOContext io_context, int op_flags,
+ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
+ int* object_dispatch_flags, uint64_t* journal_tid,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ if (*journal_tid == 0) {
+ // non-journaled IO
+ return false;
+ }
+
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " "
+ << object_off << "~" << write_data.length()
+ << dendl;
+
+ *on_finish = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no,
+ object_off, write_data.length(),
+ *journal_tid, *object_dispatch_flags,
+ *on_finish);
+ *on_finish = create_context_callback<
+ Context, &Context::complete>(*on_finish, m_journal);
+
+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+ wait_or_flush_event(*journal_tid, *object_dispatch_flags, on_dispatched);
+ return true;
+}
+
+template <typename I>
+bool ObjectDispatch<I>::flush(
+ io::FlushSource flush_source, const ZTracer::Trace &parent_trace,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) {
+ if (*journal_tid == 0) {
+ // non-journaled IO
+ return false;
+ }
+
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+
+ auto ctx = *on_finish;
+ *on_finish = new LambdaContext(
+ [image_ctx=m_image_ctx, ctx, journal_tid=*journal_tid](int r) {
+ image_ctx->journal->commit_io_event(journal_tid, r);
+ ctx->complete(r);
+ });
+
+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+ wait_or_flush_event(*journal_tid, io::OBJECT_DISPATCH_FLAG_FLUSH,
+ on_dispatched);
+ return true;
+}
+
+template <typename I>
+void ObjectDispatch<I>::extent_overwritten(
+ uint64_t object_no, uint64_t object_off, uint64_t object_len,
+ uint64_t journal_tid, uint64_t new_journal_tid) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << object_no << " " << object_off << "~" << object_len
+ << dendl;
+
+ Context *ctx = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no,
+ object_off, object_len, journal_tid, false,
+ nullptr);
+ if (new_journal_tid != 0) {
+ // ensure new journal event is safely committed to disk before
+ // committing old event
+ m_journal->flush_event(new_journal_tid, ctx);
+ } else {
+ ctx = create_context_callback<
+ Context, &Context::complete>(ctx, m_journal);
+ ctx->complete(0);
+ }
+}
+
+template <typename I>
+void ObjectDispatch<I>::wait_or_flush_event(
+ uint64_t journal_tid, int object_dispatch_flags, Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "journal_tid=" << journal_tid << dendl;
+
+ if ((object_dispatch_flags & io::OBJECT_DISPATCH_FLAG_FLUSH) != 0) {
+ m_journal->flush_event(journal_tid, on_dispatched);
+ } else {
+ m_journal->wait_event(journal_tid, on_dispatched);
+ }
+}
+
+} // namespace journal
+} // namespace librbd
+
+template class librbd::journal::ObjectDispatch<librbd::ImageCtx>;