summaryrefslogtreecommitdiffstats
path: root/src/librbd/io
diff options
context:
space:
mode:
Diffstat (limited to 'src/librbd/io')
-rw-r--r--src/librbd/io/AioCompletion.cc216
-rw-r--r--src/librbd/io/AioCompletion.h203
-rw-r--r--src/librbd/io/AsyncOperation.cc94
-rw-r--r--src/librbd/io/AsyncOperation.h52
-rw-r--r--src/librbd/io/CopyupRequest.cc625
-rw-r--r--src/librbd/io/CopyupRequest.h135
-rw-r--r--src/librbd/io/ImageDispatchSpec.cc154
-rw-r--r--src/librbd/io/ImageDispatchSpec.h182
-rw-r--r--src/librbd/io/ImageRequest.cc824
-rw-r--r--src/librbd/io/ImageRequest.h365
-rw-r--r--src/librbd/io/ImageRequestWQ.cc1043
-rw-r--r--src/librbd/io/ImageRequestWQ.h154
-rw-r--r--src/librbd/io/ObjectDispatch.cc135
-rw-r--r--src/librbd/io/ObjectDispatch.h104
-rw-r--r--src/librbd/io/ObjectDispatchInterface.h86
-rw-r--r--src/librbd/io/ObjectDispatchSpec.cc46
-rw-r--r--src/librbd/io/ObjectDispatchSpec.h266
-rw-r--r--src/librbd/io/ObjectDispatcher.cc354
-rw-r--r--src/librbd/io/ObjectDispatcher.h89
-rw-r--r--src/librbd/io/ObjectRequest.cc729
-rw-r--r--src/librbd/io/ObjectRequest.h478
-rw-r--r--src/librbd/io/ReadResult.cc170
-rw-r--r--src/librbd/io/ReadResult.h106
-rw-r--r--src/librbd/io/Types.h83
-rw-r--r--src/librbd/io/Utils.cc57
-rw-r--r--src/librbd/io/Utils.h26
26 files changed, 6776 insertions, 0 deletions
diff --git a/src/librbd/io/AioCompletion.cc b/src/librbd/io/AioCompletion.cc
new file mode 100644
index 00000000..73c58167
--- /dev/null
+++ b/src/librbd/io/AioCompletion.cc
@@ -0,0 +1,216 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/AioCompletion.h"
+#include <errno.h>
+
+#include "common/ceph_context.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "common/perf_counters.h"
+#include "common/WorkQueue.h"
+
+#include "librbd/ImageCtx.h"
+#include "librbd/internal.h"
+#include "librbd/Journal.h"
+#include "librbd/Types.h"
+
+#ifdef WITH_LTTNG
+#include "tracing/librbd.h"
+#else
+#define tracepoint(...)
+#endif
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::io::AioCompletion: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace io {
+
+// Block the calling thread until this completion reaches
+// AIO_STATE_COMPLETE.  Always returns 0.
+int AioCompletion::wait_for_complete() {
+  tracepoint(librbd, aio_wait_for_complete_enter, this);
+  lock.Lock();
+  // cond is signalled by complete() after state flips to COMPLETE
+  while (state != AIO_STATE_COMPLETE)
+    cond.Wait(lock);
+  lock.Unlock();
+  tracepoint(librbd, aio_wait_for_complete_exit, 0);
+  return 0;
+}
+
+// Final fix-up before user callback: for successful reads, assemble the
+// scattered per-object results into the caller's destination buffer.
+// Caller must hold 'lock'.  NOTE(review): the parameter shadows the
+// member 'rval'; only the parameter value is consulted here.
+void AioCompletion::finalize(ssize_t rval)
+{
+  ceph_assert(lock.is_locked());
+  ceph_assert(ictx != nullptr);
+  CephContext *cct = ictx->cct;
+
+  ldout(cct, 20) << "r=" << rval << dendl;
+  if (rval >= 0 && aio_type == AIO_TYPE_READ) {
+    read_result.assemble_result(cct);
+  }
+}
+
+// Deliver the final result: record latency perf counters, invoke the
+// user callback (with 'lock' dropped), notify the event socket if
+// requested, then wake any wait_for_complete() waiters.
+// Caller must hold 'lock'; it is released and re-acquired around the
+// user callback.
+void AioCompletion::complete() {
+  ceph_assert(lock.is_locked());
+  ceph_assert(ictx != nullptr);
+  CephContext *cct = ictx->cct;
+
+  tracepoint(librbd, aio_complete_enter, this, rval);
+  if (ictx->perfcounter != nullptr) {
+    ceph::timespan elapsed = coarse_mono_clock::now() - start_time;
+    switch (aio_type) {
+    case AIO_TYPE_GENERIC:
+    case AIO_TYPE_OPEN:
+    case AIO_TYPE_CLOSE:
+      // lifecycle ops are not tracked by latency counters
+      break;
+    case AIO_TYPE_READ:
+      ictx->perfcounter->tinc(l_librbd_rd_latency, elapsed); break;
+    case AIO_TYPE_WRITE:
+      ictx->perfcounter->tinc(l_librbd_wr_latency, elapsed); break;
+    case AIO_TYPE_DISCARD:
+      ictx->perfcounter->tinc(l_librbd_discard_latency, elapsed); break;
+    case AIO_TYPE_FLUSH:
+      ictx->perfcounter->tinc(l_librbd_flush_latency, elapsed); break;
+    case AIO_TYPE_WRITESAME:
+      ictx->perfcounter->tinc(l_librbd_ws_latency, elapsed); break;
+    case AIO_TYPE_COMPARE_AND_WRITE:
+      ictx->perfcounter->tinc(l_librbd_cmp_latency, elapsed); break;
+    default:
+      lderr(cct) << "completed invalid aio_type: " << aio_type << dendl;
+      break;
+    }
+  }
+
+  if ((aio_type == AIO_TYPE_CLOSE) ||
+      (aio_type == AIO_TYPE_OPEN && rval < 0)) {
+    // must destroy ImageCtx prior to invoking callback
+    delete ictx;
+    ictx = nullptr;
+  }
+
+  // publish CALLBACK state before dropping the lock so observers can
+  // distinguish "in user callback" from "fully complete"
+  state = AIO_STATE_CALLBACK;
+  if (complete_cb) {
+    lock.Unlock();
+    complete_cb(rbd_comp, complete_arg);
+    lock.Lock();
+  }
+
+  // ictx may have been destroyed above for close/failed-open
+  if (ictx != nullptr && event_notify && ictx->event_socket.is_valid()) {
+    ictx->completed_reqs_lock.Lock();
+    ictx->completed_reqs.push_back(&m_xlist_item);
+    ictx->completed_reqs_lock.Unlock();
+    ictx->event_socket.notify();
+  }
+
+  state = AIO_STATE_COMPLETE;
+  cond.Signal();
+
+  // note: possible for image to be closed after op marked finished
+  if (async_op.started()) {
+    async_op.finish_op();
+  }
+  tracepoint(librbd, aio_complete_exit);
+}
+
+// Bind the completion to an image/op type and stamp the start time.
+// Only the first call has any effect (ictx acts as the init guard).
+void AioCompletion::init_time(ImageCtx *i, aio_type_t t) {
+  Mutex::Locker locker(lock);
+  if (ictx == nullptr) {
+    ictx = i;
+    aio_type = t;
+    start_time = coarse_mono_clock::now();
+  }
+}
+
+// Register this request with the image's in-flight async-op tracking
+// (used for flush ordering).  Must follow init_time().
+void AioCompletion::start_op() {
+  Mutex::Locker locker(lock);
+  ceph_assert(ictx != nullptr);
+
+  if (aio_type == AIO_TYPE_OPEN || aio_type == AIO_TYPE_CLOSE) {
+    // no need to track async open/close operations
+    return;
+  }
+
+  ceph_assert(!async_op.started());
+  async_op.start_op(*ictx);
+}
+
+// Fail the completion before any sub-requests were issued
+// (pending_count must still be 0): record the error, run the normal
+// completion path, and drop a reference.  put_unlock() releases 'lock'.
+void AioCompletion::fail(int r)
+{
+  lock.Lock();
+  ceph_assert(ictx != nullptr);
+  CephContext *cct = ictx->cct;
+
+  lderr(cct) << cpp_strerror(r) << dendl;
+  ceph_assert(pending_count == 0);
+  rval = r;
+  complete();
+  put_unlock();
+}
+
+// Declare how many sub-requests must call complete_request() before the
+// completion fires.  Must be called exactly once.  A zero count is
+// converted to a single synthetic request queued on the work queue so
+// the completion never fires inline in the caller's thread.
+void AioCompletion::set_request_count(uint32_t count) {
+  lock.Lock();
+  ceph_assert(ictx != nullptr);
+  CephContext *cct = ictx->cct;
+
+  ldout(cct, 20) << "pending=" << count << dendl;
+  ceph_assert(pending_count == 0);
+
+  if (count > 0) {
+    pending_count = count;
+    lock.Unlock();
+  } else {
+    pending_count = 1;
+    lock.Unlock();
+
+    // ensure completion fires in clean lock context
+    ictx->op_work_queue->queue(new C_AioRequest(this), 0);
+  }
+}
+
+// Record the result of one sub-request.  Positive results accumulate
+// (byte counts); the first error wins, except -EEXIST which is not
+// treated as fatal.  When the last sub-request lands, finalize and
+// complete.  Also drops the reference taken by add_request().
+void AioCompletion::complete_request(ssize_t r)
+{
+  lock.Lock();
+  ceph_assert(ictx != nullptr);
+  CephContext *cct = ictx->cct;
+
+  if (rval >= 0) {
+    if (r < 0 && r != -EEXIST)
+      rval = r;
+    else if (r > 0)
+      rval += r;
+  }
+  ceph_assert(pending_count);
+  int count = --pending_count;
+
+  ldout(cct, 20) << "cb=" << complete_cb << ", "
+                 << "pending=" << pending_count << dendl;
+  if (!count) {
+    finalize(rval);
+    complete();
+  }
+  put_unlock();
+}
+
+// Non-blocking check: true once state has reached AIO_STATE_COMPLETE.
+bool AioCompletion::is_complete() {
+  tracepoint(librbd, aio_is_complete_enter, this);
+  bool done;
+  {
+    Mutex::Locker l(lock);
+    done = this->state == AIO_STATE_COMPLETE;
+  }
+  tracepoint(librbd, aio_is_complete_exit, done);
+  return done;
+}
+
+// Snapshot the current aggregated return value (negative errno on
+// failure, accumulated byte count for reads/writes).
+ssize_t AioCompletion::get_return_value() {
+  tracepoint(librbd, aio_get_return_value_enter, this);
+  lock.Lock();
+  ssize_t r = rval;
+  lock.Unlock();
+  tracepoint(librbd, aio_get_return_value_exit, r);
+  return r;
+}
+
+} // namespace io
+} // namespace librbd
diff --git a/src/librbd/io/AioCompletion.h b/src/librbd/io/AioCompletion.h
new file mode 100644
index 00000000..f3551a02
--- /dev/null
+++ b/src/librbd/io/AioCompletion.h
@@ -0,0 +1,203 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_AIO_COMPLETION_H
+#define CEPH_LIBRBD_IO_AIO_COMPLETION_H
+
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/ceph_time.h"
+#include "include/Context.h"
+#include "include/utime.h"
+#include "include/rbd/librbd.hpp"
+
+#include "librbd/ImageCtx.h"
+#include "librbd/io/AsyncOperation.h"
+#include "librbd/io/ReadResult.h"
+#include "librbd/io/Types.h"
+
+class CephContext;
+
+namespace librbd {
+namespace io {
+
+
+/**
+ * AioCompletion is the overall completion for a single
+ * rbd I/O request. It may be composed of many AioObjectRequests,
+ * which each go to a single object.
+ *
+ * The retrying of individual requests is handled at a lower level,
+ * so all AioCompletion cares about is the count of outstanding
+ * requests. The number of expected individual requests should be
+ * set initially using set_request_count() prior to issuing the
+ * requests. This ensures that the completion will not be completed
+ * within the caller's thread of execution (instead via a librados
+ * context or via a thread pool context for cache read hits).
+ */
+struct AioCompletion {
+  // PENDING -> CALLBACK (user callback running, lock dropped)
+  //         -> COMPLETE (waiters signalled)
+  typedef enum {
+    AIO_STATE_PENDING = 0,
+    AIO_STATE_CALLBACK,
+    AIO_STATE_COMPLETE,
+  } aio_state_t;
+
+  // guards all mutable state below; mutable so const accessors can lock
+  mutable Mutex lock;
+  Cond cond;
+  aio_state_t state;
+  // aggregated result: first fatal error, or accumulated byte count
+  ssize_t rval;
+  callback_t complete_cb;
+  void *complete_arg;
+  rbd_completion_t rbd_comp;
+  uint32_t pending_count;   ///< number of requests
+  // manual refcount; released==true once the owner dropped its ref
+  int ref;
+  bool released;
+  ImageCtx *ictx;
+  coarse_mono_time start_time;
+  aio_type_t aio_type;
+
+  ReadResult read_result;
+
+  // tracks this request in ImageCtx::async_ops for flush ordering
+  AsyncOperation async_op;
+
+  // linkage for ImageCtx::completed_reqs (event-socket polling mode)
+  xlist<AioCompletion*>::item m_xlist_item;
+  bool event_notify;
+
+  // Adapts a member-function callback to the C-style callback_t
+  // signature; releases the completion after invoking it.
+  template <typename T, void (T::*MF)(int)>
+  static void callback_adapter(completion_t cb, void *arg) {
+    AioCompletion *comp = reinterpret_cast<AioCompletion *>(cb);
+    T *t = reinterpret_cast<T *>(arg);
+    (t->*MF)(comp->get_return_value());
+    comp->release();
+  }
+
+  // Factory for C-API style callbacks; rbd_comp defaults to the
+  // completion itself when the caller passes nullptr.
+  static AioCompletion *create(void *cb_arg, callback_t cb_complete,
+                               rbd_completion_t rbd_comp) {
+    AioCompletion *comp = new AioCompletion();
+    comp->set_complete_cb(cb_arg, cb_complete);
+    comp->rbd_comp = (rbd_comp != nullptr ? rbd_comp : comp);
+    return comp;
+  }
+
+  // Factory for member-function callbacks (defaults to T::complete).
+  template <typename T, void (T::*MF)(int) = &T::complete>
+  static AioCompletion *create(T *obj) {
+    AioCompletion *comp = new AioCompletion();
+    comp->set_complete_cb(obj, &callback_adapter<T, MF>);
+    comp->rbd_comp = comp;
+    return comp;
+  }
+
+  // Factory that also binds the image/op type and starts op tracking.
+  template <typename T, void (T::*MF)(int) = &T::complete>
+  static AioCompletion *create_and_start(T *obj, ImageCtx *image_ctx,
+                                         aio_type_t type) {
+    AioCompletion *comp = create<T, MF>(obj);
+    comp->init_time(image_ctx, type);
+    comp->start_op();
+    return comp;
+  }
+
+  AioCompletion() : lock("AioCompletion::lock", true, false),
+                    state(AIO_STATE_PENDING), rval(0), complete_cb(NULL),
+                    complete_arg(NULL), rbd_comp(NULL),
+                    pending_count(0), ref(1), released(false), ictx(NULL),
+                    aio_type(AIO_TYPE_NONE), m_xlist_item(this),
+                    event_notify(false) {
+  }
+
+  ~AioCompletion() {
+  }
+
+  int wait_for_complete();
+
+  void finalize(ssize_t rval);
+
+  inline bool is_initialized(aio_type_t type) const {
+    Mutex::Locker locker(lock);
+    return ((ictx != nullptr) && (aio_type == type));
+  }
+  inline bool is_started() const {
+    Mutex::Locker locker(lock);
+    return async_op.started();
+  }
+
+  void init_time(ImageCtx *i, aio_type_t t);
+  void start_op();
+  void fail(int r);
+
+  void complete();
+
+  void set_complete_cb(void *cb_arg, callback_t cb) {
+    complete_cb = cb;
+    complete_arg = cb_arg;
+  }
+
+  void set_request_count(uint32_t num);
+  // Takes an extra ref for an in-flight sub-request; the matching put
+  // happens in complete_request().
+  void add_request() {
+    lock.Lock();
+    ceph_assert(pending_count > 0);
+    lock.Unlock();
+    get();
+  }
+  void complete_request(ssize_t r);
+
+  bool is_complete();
+
+  ssize_t get_return_value();
+
+  void get() {
+    lock.Lock();
+    ceph_assert(ref > 0);
+    ref++;
+    lock.Unlock();
+  }
+  // Owner's final release (may be called only once); drops one ref.
+  void release() {
+    lock.Lock();
+    ceph_assert(!released);
+    released = true;
+    put_unlock();
+  }
+  void put() {
+    lock.Lock();
+    put_unlock();
+  }
+  // Drop a ref with 'lock' held; unlocks, and destroys the completion
+  // (after unhooking from completed_reqs) when the count hits zero.
+  void put_unlock() {
+    ceph_assert(ref > 0);
+    int n = --ref;
+    lock.Unlock();
+    if (!n) {
+      if (ictx != nullptr && event_notify) {
+        ictx->completed_reqs_lock.Lock();
+        m_xlist_item.remove_myself();
+        ictx->completed_reqs_lock.Unlock();
+      }
+      delete this;
+    }
+  }
+
+  void set_event_notify(bool s) {
+    Mutex::Locker l(lock);
+    event_notify = s;
+  }
+
+  void *get_arg() {
+    return complete_arg;
+  }
+};
+
+// Context adapter representing one sub-request of an AioCompletion:
+// registers itself on construction (add_request) and forwards its
+// result to complete_request() when finished.
+class C_AioRequest : public Context {
+public:
+  C_AioRequest(AioCompletion *completion) : m_completion(completion) {
+    m_completion->add_request();
+  }
+  ~C_AioRequest() override {}
+  void finish(int r) override {
+    m_completion->complete_request(r);
+  }
+protected:
+  AioCompletion *m_completion;
+};
+
+} // namespace io
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_IO_AIO_COMPLETION_H
diff --git a/src/librbd/io/AsyncOperation.cc b/src/librbd/io/AsyncOperation.cc
new file mode 100644
index 00000000..c5a3bc93
--- /dev/null
+++ b/src/librbd/io/AsyncOperation.cc
@@ -0,0 +1,94 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/AsyncOperation.h"
+#include "librbd/ImageCtx.h"
+#include "common/dout.h"
+#include "common/WorkQueue.h"
+#include "include/ceph_assert.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::io::AsyncOperation: "
+
+namespace librbd {
+namespace io {
+
+namespace {
+
+// Work-queue context that fires a batch of deferred flush callbacks
+// with r=0 while holding the image owner_lock for read.
+struct C_CompleteFlushes : public Context {
+  ImageCtx *image_ctx;
+  std::list<Context *> flush_contexts;
+
+  explicit C_CompleteFlushes(ImageCtx *image_ctx,
+                             std::list<Context *> &&flush_contexts)
+    : image_ctx(image_ctx), flush_contexts(std::move(flush_contexts)) {
+  }
+  void finish(int r) override {
+    RWLock::RLocker owner_locker(image_ctx->owner_lock);
+    while (!flush_contexts.empty()) {
+      Context *flush_ctx = flush_contexts.front();
+      flush_contexts.pop_front();
+
+      ldout(image_ctx->cct, 20) << "completed flush: " << flush_ctx << dendl;
+      flush_ctx->complete(0);
+    }
+  }
+};
+
+} // anonymous namespace
+
+// Register this op at the front of the image's async_ops list
+// (list order is newest -> oldest).  May be called only once.
+void AsyncOperation::start_op(ImageCtx &image_ctx) {
+  ceph_assert(m_image_ctx == NULL);
+  m_image_ctx = &image_ctx;
+
+  ldout(m_image_ctx->cct, 20) << this << " " << __func__ << dendl;
+  Mutex::Locker l(m_image_ctx->async_ops_lock);
+  m_image_ctx->async_ops.push_front(&m_xlist_item);
+}
+
+// Unregister this op.  Any flush contexts waiting on it are handed to
+// the next-older in-flight op (so flushes still wait for all earlier
+// I/O); if this was the oldest op, the flushes are fired via the work
+// queue instead.
+void AsyncOperation::finish_op() {
+  ldout(m_image_ctx->cct, 20) << this << " " << __func__ << dendl;
+
+  {
+    Mutex::Locker l(m_image_ctx->async_ops_lock);
+    // advance to the next (older) entry before unlinking ourselves
+    xlist<AsyncOperation *>::iterator iter(&m_xlist_item);
+    ++iter;
+    ceph_assert(m_xlist_item.remove_myself());
+
+    // linked list stored newest -> oldest ops
+    if (!iter.end() && !m_flush_contexts.empty()) {
+      ldout(m_image_ctx->cct, 20) << "moving flush contexts to previous op: "
+                                  << *iter << dendl;
+      (*iter)->m_flush_contexts.insert((*iter)->m_flush_contexts.end(),
+                                       m_flush_contexts.begin(),
+                                       m_flush_contexts.end());
+      return;
+    }
+  }
+
+  if (!m_flush_contexts.empty()) {
+    // no older op remains: complete the pending flushes asynchronously
+    C_CompleteFlushes *ctx = new C_CompleteFlushes(m_image_ctx,
+                                                   std::move(m_flush_contexts));
+    m_image_ctx->op_work_queue->queue(ctx);
+  }
+}
+
+// Queue on_finish to run once every op older than this one completes:
+// attach it to the next-older op, or fire immediately (via the work
+// queue) if this op is the oldest.
+void AsyncOperation::flush(Context* on_finish) {
+  {
+    Mutex::Locker locker(m_image_ctx->async_ops_lock);
+    xlist<AsyncOperation *>::iterator iter(&m_xlist_item);
+    ++iter;
+
+    // linked list stored newest -> oldest ops
+    if (!iter.end()) {
+      (*iter)->m_flush_contexts.push_back(on_finish);
+      return;
+    }
+  }
+
+  m_image_ctx->op_work_queue->queue(on_finish);
+}
+
+} // namespace io
+} // namespace librbd
diff --git a/src/librbd/io/AsyncOperation.h b/src/librbd/io/AsyncOperation.h
new file mode 100644
index 00000000..b0a37c4b
--- /dev/null
+++ b/src/librbd/io/AsyncOperation.h
@@ -0,0 +1,52 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef LIBRBD_IO_ASYNC_OPERATION_H
+#define LIBRBD_IO_ASYNC_OPERATION_H
+
+#include "include/ceph_assert.h"
+#include "include/xlist.h"
+#include <list>
+
+class Context;
+
+namespace librbd {
+
+class ImageCtx;
+
+namespace io {
+
+// Tracks one in-flight asynchronous image operation on the ImageCtx's
+// async_ops list; flush contexts can be parked on an op so they only
+// fire after all older ops finish.
+class AsyncOperation {
+public:
+
+  AsyncOperation()
+    : m_image_ctx(NULL), m_xlist_item(this)
+  {
+  }
+
+  ~AsyncOperation()
+  {
+    // must be finish_op()'d (or never started) before destruction
+    ceph_assert(!m_xlist_item.is_on_list());
+  }
+
+  // started() is true between start_op() and finish_op()
+  inline bool started() const {
+    return m_xlist_item.is_on_list();
+  }
+
+  void start_op(ImageCtx &image_ctx);
+  void finish_op();
+
+  // run on_finish once all older in-flight ops have completed
+  void flush(Context *on_finish);
+
+private:
+
+  ImageCtx *m_image_ctx;
+  xlist<AsyncOperation *>::item m_xlist_item;
+  // flush callbacks parked on this op by flush() / finish_op()
+  std::list<Context *> m_flush_contexts;
+
+};
+
+} // namespace io
+} // namespace librbd
+
+#endif // LIBRBD_IO_ASYNC_OPERATION_H
diff --git a/src/librbd/io/CopyupRequest.cc b/src/librbd/io/CopyupRequest.cc
new file mode 100644
index 00000000..c2ebb10f
--- /dev/null
+++ b/src/librbd/io/CopyupRequest.cc
@@ -0,0 +1,625 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/CopyupRequest.h"
+#include "common/ceph_context.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "common/Mutex.h"
+#include "common/WorkQueue.h"
+#include "librbd/AsyncObjectThrottle.h"
+#include "librbd/ExclusiveLock.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ObjectMap.h"
+#include "librbd/Utils.h"
+#include "librbd/deep_copy/ObjectCopyRequest.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/io/ImageRequest.h"
+#include "librbd/io/ObjectRequest.h"
+#include "librbd/io/ReadResult.h"
+
+#include <boost/bind.hpp>
+#include <boost/lambda/bind.hpp>
+#include <boost/lambda/construct.hpp>
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::io::CopyupRequest: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace io {
+
+namespace {
+
+// Throttled sub-op that updates the object map state of a single
+// object for one snapshot id (CEPH_NOSNAP == HEAD revision).
+// send() returns 1 ("already done") when exclusive lock or object map
+// is absent, 0 when an async update was issued.
+template <typename I>
+class C_UpdateObjectMap : public C_AsyncObjectThrottle<I> {
+public:
+  C_UpdateObjectMap(AsyncObjectThrottle<I> &throttle, I *image_ctx,
+                    uint64_t object_no, uint8_t head_object_map_state,
+                    const std::vector<uint64_t> *snap_ids,
+                    bool first_snap_is_clean, const ZTracer::Trace &trace,
+                    size_t snap_id_idx)
+    : C_AsyncObjectThrottle<I>(throttle, *image_ctx), m_object_no(object_no),
+      m_head_object_map_state(head_object_map_state), m_snap_ids(*snap_ids),
+      m_first_snap_is_clean(first_snap_is_clean), m_trace(trace),
+      m_snap_id_idx(snap_id_idx)
+  {
+  }
+
+  int send() override {
+    auto& image_ctx = this->m_image_ctx;
+    ceph_assert(image_ctx.owner_lock.is_locked());
+    if (image_ctx.exclusive_lock == nullptr) {
+      return 1;
+    }
+    ceph_assert(image_ctx.exclusive_lock->is_lock_owner());
+
+    RWLock::RLocker snap_locker(image_ctx.snap_lock);
+    if (image_ctx.object_map == nullptr) {
+      return 1;
+    }
+
+    uint64_t snap_id = m_snap_ids[m_snap_id_idx];
+    if (snap_id == CEPH_NOSNAP) {
+      return update_head();
+    } else {
+      return update_snapshot(snap_id);
+    }
+  }
+
+  // Update the HEAD object map entry to the precomputed state.
+  int update_head() {
+    auto& image_ctx = this->m_image_ctx;
+    RWLock::WLocker object_map_locker(image_ctx.object_map_lock);
+    bool sent = image_ctx.object_map->template aio_update<Context>(
+      CEPH_NOSNAP, m_object_no, m_head_object_map_state, {}, m_trace, false,
+      this);
+    return (sent ? 0 : 1);
+  }
+
+  // Update a snapshot's object map entry; state depends on fast-diff
+  // and whether this is the snapshot carrying the copyup data.
+  int update_snapshot(uint64_t snap_id) {
+    auto& image_ctx = this->m_image_ctx;
+    uint8_t state = OBJECT_EXISTS;
+    if (image_ctx.test_features(RBD_FEATURE_FAST_DIFF, image_ctx.snap_lock) &&
+        (m_snap_id_idx > 0 || m_first_snap_is_clean)) {
+      // first snapshot should be exists+dirty since it contains
+      // the copyup data -- later snapshots inherit the data.
+      state = OBJECT_EXISTS_CLEAN;
+    }
+
+    RWLock::RLocker object_map_locker(image_ctx.object_map_lock);
+    bool sent = image_ctx.object_map->template aio_update<Context>(
+      snap_id, m_object_no, state, {}, m_trace, true, this);
+    ceph_assert(sent);
+    return 0;
+  }
+
+private:
+  uint64_t m_object_no;
+  uint8_t m_head_object_map_state;
+  const std::vector<uint64_t> &m_snap_ids;
+  bool m_first_snap_is_clean;
+  const ZTracer::Trace &m_trace;
+  size_t m_snap_id_idx;
+};
+
+} // anonymous namespace
+
+// Construct a copyup request for one object and register it as an
+// in-flight async op so flushes wait for it.
+// NOTE(review): image_extents is taken by rvalue-reference but copied
+// (no std::move) in the init list — confirm whether a move was intended.
+template <typename I>
+CopyupRequest<I>::CopyupRequest(I *ictx, const std::string &oid,
+                                uint64_t objectno, Extents &&image_extents,
+                                const ZTracer::Trace &parent_trace)
+  : m_image_ctx(ictx), m_oid(oid), m_object_no(objectno),
+    m_image_extents(image_extents),
+    m_trace(util::create_trace(*m_image_ctx, "copy-up", parent_trace)),
+    m_lock("CopyupRequest", false, false)
+{
+  ceph_assert(m_image_ctx->data_ctx.is_valid());
+  m_async_op.start_op(*util::get_image_ctx(m_image_ctx));
+}
+
+// All pending write requests must have been completed/restarted before
+// destruction; deregisters the async op.
+template <typename I>
+CopyupRequest<I>::~CopyupRequest() {
+  ceph_assert(m_pending_requests.empty());
+  m_async_op.finish_op();
+}
+
+// Attach a write request to this copyup.  Once appends have been
+// disabled (state machine past the read/deep-copy phase), late arrivals
+// go to the restart list and will be re-driven with -ERESTART.
+template <typename I>
+void CopyupRequest<I>::append_request(AbstractObjectWriteRequest<I> *req) {
+  Mutex::Locker locker(m_lock);
+
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "oid=" << m_oid << ", "
+                 << "object_request=" << req << ", "
+                 << "append=" << m_append_request_permitted << dendl;
+  if (m_append_request_permitted) {
+    m_pending_requests.push_back(req);
+  } else {
+    m_restart_requests.push_back(req);
+  }
+}
+
+// Entry point of the state machine: begin by reading parent data
+// (read_from_parent itself branches to deep-copy when migrating).
+template <typename I>
+void CopyupRequest<I>::send() {
+  read_from_parent();
+}
+
+// Issue an async read of the parent image extents backing this object.
+// If the parent has been detached, short-circuit through the work queue
+// with -ENOENT; if the image is being migrated, switch to deep-copy.
+template <typename I>
+void CopyupRequest<I>::read_from_parent() {
+  auto cct = m_image_ctx->cct;
+  RWLock::RLocker snap_locker(m_image_ctx->snap_lock);
+  RWLock::RLocker parent_locker(m_image_ctx->parent_lock);
+
+  if (m_image_ctx->parent == nullptr) {
+    ldout(cct, 5) << "parent detached" << dendl;
+
+    m_image_ctx->op_work_queue->queue(
+      util::create_context_callback<
+        CopyupRequest<I>, &CopyupRequest<I>::handle_read_from_parent>(this),
+      -ENOENT);
+    return;
+  } else if (is_deep_copy()) {
+    deep_copy();
+    return;
+  }
+
+  auto comp = AioCompletion::create_and_start<
+    CopyupRequest<I>,
+    &CopyupRequest<I>::handle_read_from_parent>(
+      this, util::get_image_ctx(m_image_ctx->parent), AIO_TYPE_READ);
+
+  ldout(cct, 20) << "oid=" << m_oid << ", "
+                 << "completion=" << comp << ", "
+                 << "extents=" << m_image_extents
+                 << dendl;
+  ImageRequest<I>::aio_read(m_image_ctx->parent, comp,
+                            std::move(m_image_extents),
+                            ReadResult{&m_copyup_data}, 0, m_trace);
+}
+
+// Parent read completed: freeze the request list, decide whether a
+// copyup is required, capture the snapshot ids affected (only when the
+// parent data is non-zero), then move on to object map updates.
+// -ENOENT (no parent data) is not treated as an error.
+template <typename I>
+void CopyupRequest<I>::handle_read_from_parent(int r) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "oid=" << m_oid << ", r=" << r << dendl;
+
+  // manual lock ordering: snap_lock (read) before m_lock
+  m_image_ctx->snap_lock.get_read();
+  m_lock.Lock();
+  m_copyup_is_zero = m_copyup_data.is_zero();
+  m_copyup_required = is_copyup_required();
+  disable_append_requests();
+
+  if (r < 0 && r != -ENOENT) {
+    m_lock.Unlock();
+    m_image_ctx->snap_lock.put_read();
+
+    lderr(cct) << "error reading from parent: " << cpp_strerror(r) << dendl;
+    finish(r);
+    return;
+  }
+
+  if (!m_copyup_required) {
+    m_lock.Unlock();
+    m_image_ctx->snap_lock.put_read();
+
+    ldout(cct, 20) << "no-op, skipping" << dendl;
+    finish(0);
+    return;
+  }
+
+  // copyup() will affect snapshots only if parent data is not all
+  // zeros.
+  if (!m_copyup_is_zero) {
+    m_snap_ids.insert(m_snap_ids.end(), m_image_ctx->snaps.rbegin(),
+                      m_image_ctx->snaps.rend());
+  }
+
+  m_lock.Unlock();
+  m_image_ctx->snap_lock.put_read();
+
+  update_object_maps();
+}
+
+// Migration path: deep-copy the object from the parent image instead
+// of a plain read.  Flatten when a copyup is required or the migration
+// itself requested flattening.  Caller holds snap_lock + parent_lock.
+template <typename I>
+void CopyupRequest<I>::deep_copy() {
+  auto cct = m_image_ctx->cct;
+  ceph_assert(m_image_ctx->snap_lock.is_locked());
+  ceph_assert(m_image_ctx->parent_lock.is_locked());
+  ceph_assert(m_image_ctx->parent != nullptr);
+
+  m_lock.Lock();
+  m_flatten = is_copyup_required() ? true : m_image_ctx->migration_info.flatten;
+  m_lock.Unlock();
+
+  ldout(cct, 20) << "oid=" << m_oid << ", flatten=" << m_flatten << dendl;
+
+  auto ctx = util::create_context_callback<
+    CopyupRequest<I>, &CopyupRequest<I>::handle_deep_copy>(this);
+  auto req = deep_copy::ObjectCopyRequest<I>::create(
+    m_image_ctx->parent, m_image_ctx, 0, 0,
+    m_image_ctx->migration_info.snap_map, m_object_no, m_flatten, ctx);
+
+  req->send();
+}
+
+// Deep-copy completed.  On -ENOENT with a now-required copyup and no
+// flatten, restart the whole state machine with flattening enabled;
+// otherwise decide whether object-map updates and/or copyup remain.
+template <typename I>
+void CopyupRequest<I>::handle_deep_copy(int r) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "oid=" << m_oid << ", r=" << r << dendl;
+
+  // manual lock ordering: snap_lock (read) before m_lock
+  m_image_ctx->snap_lock.get_read();
+  m_lock.Lock();
+  m_copyup_required = is_copyup_required();
+  if (r == -ENOENT && !m_flatten && m_copyup_required) {
+    m_lock.Unlock();
+    m_image_ctx->snap_lock.put_read();
+
+    ldout(cct, 10) << "restart deep-copy with flatten" << dendl;
+    send();
+    return;
+  }
+
+  disable_append_requests();
+
+  if (r < 0 && r != -ENOENT) {
+    m_lock.Unlock();
+    m_image_ctx->snap_lock.put_read();
+
+    lderr(cct) << "error encountered during deep-copy: " << cpp_strerror(r)
+               << dendl;
+    finish(r);
+    return;
+  }
+
+  if (!m_copyup_required && !is_update_object_map_required(r)) {
+    m_lock.Unlock();
+    m_image_ctx->snap_lock.put_read();
+
+    if (r == -ENOENT) {
+      r = 0;
+    }
+
+    ldout(cct, 20) << "skipping" << dendl;
+    finish(r);
+    return;
+  }
+
+  // For deep-copy, copyup() will never affect snapshots. However,
+  // this state machine is responsible for updating object maps for
+  // snapshots that have been created on destination image after
+  // migration started.
+  if (r != -ENOENT) {
+    compute_deep_copy_snap_ids();
+  }
+
+  m_lock.Unlock();
+  m_image_ctx->snap_lock.put_read();
+
+  update_object_maps();
+}
+
+// Update the object map entries for each snapshot id collected earlier
+// (plus HEAD, if its state would change), driven through an
+// AsyncObjectThrottle.  Skips straight to copyup() when the object map
+// feature is disabled.
+template <typename I>
+void CopyupRequest<I>::update_object_maps() {
+  RWLock::RLocker owner_locker(m_image_ctx->owner_lock);
+  RWLock::RLocker snap_locker(m_image_ctx->snap_lock);
+  if (m_image_ctx->object_map == nullptr) {
+    snap_locker.unlock();
+    owner_locker.unlock();
+
+    copyup();
+    return;
+  }
+
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "oid=" << m_oid << dendl;
+
+  // no pending write requests => this is a copy-on-read
+  bool copy_on_read = m_pending_requests.empty();
+  uint8_t head_object_map_state = OBJECT_EXISTS;
+  if (copy_on_read && !m_snap_ids.empty() &&
+      m_image_ctx->test_features(RBD_FEATURE_FAST_DIFF,
+                                 m_image_ctx->snap_lock)) {
+    // HEAD is non-dirty since data is tied to first snapshot
+    head_object_map_state = OBJECT_EXISTS_CLEAN;
+  }
+
+  auto r_it = m_pending_requests.rbegin();
+  if (r_it != m_pending_requests.rend()) {
+    // last write-op determines the final object map state
+    head_object_map_state = (*r_it)->get_pre_write_object_map_state();
+  }
+
+  RWLock::WLocker object_map_locker(m_image_ctx->object_map_lock);
+  if ((*m_image_ctx->object_map)[m_object_no] != head_object_map_state) {
+    // (maybe) need to update the HEAD object map state
+    m_snap_ids.push_back(CEPH_NOSNAP);
+  }
+  object_map_locker.unlock();
+  snap_locker.unlock();
+
+  ceph_assert(m_image_ctx->exclusive_lock->is_lock_owner());
+  // one C_UpdateObjectMap per collected snap id, run under the throttle
+  typename AsyncObjectThrottle<I>::ContextFactory context_factory(
+    boost::lambda::bind(boost::lambda::new_ptr<C_UpdateObjectMap<I>>(),
+      boost::lambda::_1, m_image_ctx, m_object_no, head_object_map_state,
+      &m_snap_ids, m_first_snap_is_clean, m_trace, boost::lambda::_2));
+  auto ctx = util::create_context_callback<
+    CopyupRequest<I>, &CopyupRequest<I>::handle_update_object_maps>(this);
+  auto throttle = new AsyncObjectThrottle<I>(
+    nullptr, *m_image_ctx, context_factory, ctx, nullptr, 0, m_snap_ids.size());
+  throttle->start_ops(
+    m_image_ctx->config.template get_val<uint64_t>("rbd_concurrent_management_ops"));
+}
+
+// Object map updates finished: abort on error, otherwise proceed to
+// the actual copyup RADOS ops.
+template <typename I>
+void CopyupRequest<I>::handle_update_object_maps(int r) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "oid=" << m_oid << ", r=" << r << dendl;
+
+  if (r < 0) {
+    lderr(m_image_ctx->cct) << "failed to update object map: "
+                            << cpp_strerror(r) << dendl;
+
+    finish(r);
+    return;
+  }
+
+  copyup();
+}
+
+// Issue the copyup RADOS op(s).  Up to two ops may be sent:
+//  * copyup_op: the "copyup" class method with an EMPTY snap context
+//    (used for copy-on-read, or when snapshots exist and parent data is
+//    non-zero, so the OSD clones the parent data into the snapshots);
+//  * write_op: the pending write requests (plus copyup when it wasn't
+//    sent separately) under the CURRENT snap context.
+// m_pending_copyups counts the ops in flight for handle_copyup().
+template <typename I>
+void CopyupRequest<I>::copyup() {
+  auto cct = m_image_ctx->cct;
+  m_image_ctx->snap_lock.get_read();
+  auto snapc = m_image_ctx->snapc;
+  m_image_ctx->snap_lock.put_read();
+
+  m_lock.Lock();
+  if (!m_copyup_required) {
+    m_lock.Unlock();
+
+    ldout(cct, 20) << "skipping copyup" << dendl;
+    finish(0);
+    return;
+  }
+
+  ldout(cct, 20) << "oid=" << m_oid << dendl;
+
+  bool copy_on_read = m_pending_requests.empty();
+  bool deep_copyup = !snapc.snaps.empty() && !m_copyup_is_zero;
+  if (m_copyup_is_zero) {
+    // all-zero parent data: send an empty copyup payload
+    m_copyup_data.clear();
+  }
+
+  int r;
+  librados::ObjectWriteOperation copyup_op;
+  if (copy_on_read || deep_copyup) {
+    copyup_op.exec("rbd", "copyup", m_copyup_data);
+    ObjectRequest<I>::add_write_hint(*m_image_ctx, &copyup_op);
+    ++m_pending_copyups;
+  }
+
+  librados::ObjectWriteOperation write_op;
+  if (!copy_on_read) {
+    if (!deep_copyup) {
+      write_op.exec("rbd", "copyup", m_copyup_data);
+      ObjectRequest<I>::add_write_hint(*m_image_ctx, &write_op);
+    }
+
+    // merge all pending write ops into this single RADOS op
+    for (auto req : m_pending_requests) {
+      ldout(cct, 20) << "add_copyup_ops " << req << dendl;
+      req->add_copyup_ops(&write_op);
+    }
+
+    if (write_op.size() > 0) {
+      ++m_pending_copyups;
+    }
+  }
+  m_lock.Unlock();
+
+  // issue librados ops at the end to simplify test cases
+  std::vector<librados::snap_t> snaps;
+  if (copyup_op.size() > 0) {
+    // send only the copyup request with a blank snapshot context so that
+    // all snapshots are detected from the parent for this object. If
+    // this is a CoW request, a second request will be created for the
+    // actual modification.
+    ldout(cct, 20) << "copyup with empty snapshot context" << dendl;
+
+    auto comp = util::create_rados_callback<
+      CopyupRequest<I>, &CopyupRequest<I>::handle_copyup>(this);
+    r = m_image_ctx->data_ctx.aio_operate(
+      m_oid, comp, &copyup_op, 0, snaps,
+      (m_trace.valid() ? m_trace.get_info() : nullptr));
+    ceph_assert(r == 0);
+    comp->release();
+  }
+
+  if (write_op.size() > 0) {
+    // compare-and-write doesn't add any write ops (copyup+cmpext+write
+    // can't be executed in the same RADOS op because, unless the object
+    // was already present in the clone, cmpext wouldn't see it)
+    ldout(cct, 20) << (!deep_copyup && write_op.size() > 2 ?
+                        "copyup + ops" : !deep_copyup ? "copyup" : "ops")
+                   << " with current snapshot context" << dendl;
+
+    snaps.insert(snaps.end(), snapc.snaps.begin(), snapc.snaps.end());
+    auto comp = util::create_rados_callback<
+      CopyupRequest<I>, &CopyupRequest<I>::handle_copyup>(this);
+    r = m_image_ctx->data_ctx.aio_operate(
+      m_oid, comp, &write_op, snapc.seq, snaps,
+      (m_trace.valid() ? m_trace.get_info() : nullptr));
+    ceph_assert(r == 0);
+    comp->release();
+  }
+}
+
+// One of the (up to two) RADOS ops completed.  -ENOENT is tolerated;
+// other errors immediately fail the pending write requests.  The
+// request finishes once both ops have landed.
+template <typename I>
+void CopyupRequest<I>::handle_copyup(int r) {
+  auto cct = m_image_ctx->cct;
+  unsigned pending_copyups;
+  {
+    Mutex::Locker locker(m_lock);
+    ceph_assert(m_pending_copyups > 0);
+    pending_copyups = --m_pending_copyups;
+  }
+
+  ldout(cct, 20) << "oid=" << m_oid << ", " << "r=" << r << ", "
+                 << "pending=" << pending_copyups << dendl;
+
+  if (r < 0 && r != -ENOENT) {
+    lderr(cct) << "failed to copyup object: " << cpp_strerror(r) << dendl;
+    complete_requests(false, r);
+  }
+
+  if (pending_copyups == 0) {
+    finish(0);
+  }
+}
+
+// Terminal state: complete any remaining pending/restart requests
+// and self-destruct (this object is heap-allocated and self-owned).
+template <typename I>
+void CopyupRequest<I>::finish(int r) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "oid=" << m_oid << ", r=" << r << dendl;
+
+  complete_requests(true, r);
+  delete this;
+}
+
+// Drain both request lists: pending requests get the supplied result;
+// late-arriving "restart" requests get -ERESTART when
+// override_restart_retval is set, so they re-drive the copyup.
+// Also removes this request from the image's copyup registry.
+template <typename I>
+void CopyupRequest<I>::complete_requests(bool override_restart_retval, int r) {
+  auto cct = m_image_ctx->cct;
+  remove_from_list();
+
+  while (!m_pending_requests.empty()) {
+    auto it = m_pending_requests.begin();
+    auto req = *it;
+    ldout(cct, 20) << "completing request " << req << dendl;
+    req->handle_copyup(r);
+    m_pending_requests.erase(it);
+  }
+
+  if (override_restart_retval) {
+    r = -ERESTART;
+  }
+
+  while (!m_restart_requests.empty()) {
+    auto it = m_restart_requests.begin();
+    auto req = *it;
+    ldout(cct, 20) << "restarting request " << req << dendl;
+    req->handle_copyup(r);
+    m_restart_requests.erase(it);
+  }
+}
+
+// Stop accepting new write requests into m_pending_requests; later
+// arrivals are diverted to m_restart_requests.  Caller holds m_lock.
+template <typename I>
+void CopyupRequest<I>::disable_append_requests() {
+  ceph_assert(m_lock.is_locked());
+  m_append_request_permitted = false;
+}
+
+// Deregister this object from the image-wide copyup_list so new writes
+// to the object no longer attach to this (finishing) request.
+template <typename I>
+void CopyupRequest<I>::remove_from_list() {
+  Mutex::Locker copyup_list_locker(m_image_ctx->copyup_list_lock);
+
+  auto it = m_image_ctx->copyup_list.find(m_object_no);
+  if (it != m_image_ctx->copyup_list.end()) {
+    m_image_ctx->copyup_list.erase(it);
+  }
+}
+
+// A copyup is required for copy-on-read, when the parent data is
+// non-zero, or when any pending write is a non-empty write op.
+// Caller holds m_lock.
+template <typename I>
+bool CopyupRequest<I>::is_copyup_required() {
+  ceph_assert(m_lock.is_locked());
+
+  bool copy_on_read = m_pending_requests.empty();
+  if (copy_on_read) {
+    // always force a copyup if CoR enabled
+    return true;
+  }
+
+  if (!m_copyup_is_zero) {
+    return true;
+  }
+
+  for (auto req : m_pending_requests) {
+    if (!req->is_empty_write_op()) {
+      return true;
+    }
+  }
+  return false;
+}
+
+// Deep-copy path applies whenever the image has active migration info.
+// Caller holds snap_lock.
+template <typename I>
+bool CopyupRequest<I>::is_deep_copy() const {
+  ceph_assert(m_image_ctx->snap_lock.is_locked());
+  return !m_image_ctx->migration_info.empty();
+}
+
+// After a deep-copy, decide whether object-map updates are still
+// needed: no on error or with no object map; yes if migration info has
+// vanished mid-flight (worst-case assumption) or the HEAD snap-map
+// entry indicates post-migration snapshots exist.  Caller holds
+// snap_lock.
+template <typename I>
+bool CopyupRequest<I>::is_update_object_map_required(int r) {
+  ceph_assert(m_image_ctx->snap_lock.is_locked());
+
+  if (r < 0) {
+    return false;
+  }
+
+  if (m_image_ctx->object_map == nullptr) {
+    return false;
+  }
+
+  if (m_image_ctx->migration_info.empty()) {
+    // migration might have completed while IO was in-flight,
+    // assume worst-case and perform an object map update
+    return true;
+  }
+
+  auto it = m_image_ctx->migration_info.snap_map.find(CEPH_NOSNAP);
+  ceph_assert(it != m_image_ctx->migration_info.snap_map.end());
+  return it->second[0] != CEPH_NOSNAP;
+}
+
+// Build m_snap_ids (newest -> oldest) with the snapshots whose object
+// maps this request must update after a deep-copy: skip snaps already
+// handled by the object deep-copy itself and snaps whose parent
+// overlap does not cover this object.  Caller holds snap_lock.
+template <typename I>
+void CopyupRequest<I>::compute_deep_copy_snap_ids() {
+  ceph_assert(m_image_ctx->snap_lock.is_locked());
+
+  // don't copy ids for the snaps updated by object deep copy or
+  // that don't overlap
+  std::set<uint64_t> deep_copied;
+  for (auto &it : m_image_ctx->migration_info.snap_map) {
+    if (it.first != CEPH_NOSNAP) {
+      deep_copied.insert(it.second.front());
+    }
+  }
+
+  RWLock::RLocker parent_locker(m_image_ctx->parent_lock);
+  std::copy_if(m_image_ctx->snaps.rbegin(), m_image_ctx->snaps.rend(),
+               std::back_inserter(m_snap_ids),
+               [this, cct=m_image_ctx->cct, &deep_copied](uint64_t snap_id) {
+      if (deep_copied.count(snap_id)) {
+        // deep-copy already wrote this snap's data
+        m_first_snap_is_clean = true;
+        return false;
+      }
+
+      uint64_t parent_overlap = 0;
+      int r = m_image_ctx->get_parent_overlap(snap_id, &parent_overlap);
+      if (r < 0) {
+        ldout(cct, 5) << "failed getting parent overlap for snap_id: "
+                      << snap_id << ": " << cpp_strerror(r) << dendl;
+      }
+      if (parent_overlap == 0) {
+        return false;
+      }
+      // keep the snap only if this object intersects the parent overlap
+      std::vector<std::pair<uint64_t, uint64_t>> extents;
+      Striper::extent_to_file(cct, &m_image_ctx->layout,
+                              m_object_no, 0,
+                              m_image_ctx->layout.object_size,
+                              extents);
+      auto overlap = m_image_ctx->prune_parent_extents(
+        extents, parent_overlap);
+      return overlap > 0;
+    });
+}
+
+} // namespace io
+} // namespace librbd
+
+template class librbd::io::CopyupRequest<librbd::ImageCtx>;
diff --git a/src/librbd/io/CopyupRequest.h b/src/librbd/io/CopyupRequest.h
new file mode 100644
index 00000000..e4b3a2e7
--- /dev/null
+++ b/src/librbd/io/CopyupRequest.h
@@ -0,0 +1,135 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_COPYUP_REQUEST_H
+#define CEPH_LIBRBD_IO_COPYUP_REQUEST_H
+
+#include "include/int_types.h"
+#include "include/rados/librados.hpp"
+#include "include/buffer.h"
+#include "common/Mutex.h"
+#include "common/zipkin_trace.h"
+#include "librbd/io/AsyncOperation.h"
+#include "librbd/io/Types.h"
+
+#include <string>
+#include <vector>
+
+namespace ZTracer { struct Trace; }
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace io {
+
+template <typename I> class AbstractObjectWriteRequest;
+
// Coordinates a copy-on-write "copyup" of a single object: reads the
// backing data from the parent image (or deep-copies it during
// migration), updates the object maps of affected snapshots, and writes
// the data into the child object before the pending write requests are
// completed.
template <typename ImageCtxT = librbd::ImageCtx>
class CopyupRequest {
public:
  // Factory helper; the request deletes itself upon completion.
  static CopyupRequest* create(ImageCtxT *ictx, const std::string &oid,
                               uint64_t objectno, Extents &&image_extents,
                               const ZTracer::Trace &parent_trace) {
    return new CopyupRequest(ictx, oid, objectno, std::move(image_extents),
                             parent_trace);
  }

  CopyupRequest(ImageCtxT *ictx, const std::string &oid, uint64_t objectno,
                Extents &&image_extents, const ZTracer::Trace &parent_trace);
  ~CopyupRequest();

  // Attach an additional write request that is blocked on this copyup.
  void append_request(AbstractObjectWriteRequest<ImageCtxT> *req);

  void send();

private:
  /**
   * Copyup requests go through the following state machine to read from the
   * parent image, update the object map, and copyup the object:
   *
   *
   * @verbatim
   *
   *              <start>
   *                 |
   *      /---------/ \---------\
   *      |                     |
   *      v                     v
   * READ_FROM_PARENT      DEEP_COPY
   *      |                     |
   *      \---------\ /---------/
   *                 |
   *                 v (skip if not needed)
   *         UPDATE_OBJECT_MAPS
   *                 |
   *                 v (skip if not needed)
   *               COPYUP
   *                 |
   *                 v
   *              <finish>
   *
   * @endverbatim
   *
   * The UPDATE_OBJECT_MAPS state is skipped if the object map isn't enabled
   * or if an object map update isn't required. The COPYUP state is skipped
   * if no data was read from the parent *and* there are no additional ops.
   */

  typedef std::vector<AbstractObjectWriteRequest<ImageCtxT> *> WriteRequests;

  ImageCtxT *m_image_ctx;
  std::string m_oid;            // object name being copied up
  uint64_t m_object_no;         // object index within the image
  Extents m_image_extents;      // image-level extents backing this object
  ZTracer::Trace m_trace;

  bool m_flatten = false;
  bool m_copyup_required = true;
  bool m_copyup_is_zero = true; // true while parent data is all zeroes

  ceph::bufferlist m_copyup_data;

  // keeps the image-level in-flight operation counter pinned for the
  // lifetime of this request
  AsyncOperation m_async_op;

  // snapshot ids whose object maps must be updated (newest first)
  std::vector<uint64_t> m_snap_ids;
  bool m_first_snap_is_clean = false;

  Mutex m_lock;
  // write requests blocked until the copyup completes
  WriteRequests m_pending_requests;
  unsigned m_pending_copyups = 0;

  // requests accepted after append is disabled; restarted on completion
  WriteRequests m_restart_requests;
  bool m_append_request_permitted = true;

  void read_from_parent();
  void handle_read_from_parent(int r);

  void deep_copy();
  void handle_deep_copy(int r);

  void update_object_maps();
  void handle_update_object_maps(int r);

  void copyup();
  void handle_copyup(int r);

  void finish(int r);
  void complete_requests(bool override_restart_retval, int r);

  void disable_append_requests();
  void remove_from_list();

  bool is_copyup_required();
  bool is_update_object_map_required(int r);
  bool is_deep_copy() const;

  void compute_deep_copy_snap_ids();
};
+
+} // namespace io
+} // namespace librbd
+
+extern template class librbd::io::CopyupRequest<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_IO_COPYUP_REQUEST_H
diff --git a/src/librbd/io/ImageDispatchSpec.cc b/src/librbd/io/ImageDispatchSpec.cc
new file mode 100644
index 00000000..2c405d74
--- /dev/null
+++ b/src/librbd/io/ImageDispatchSpec.cc
@@ -0,0 +1,154 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/ImageDispatchSpec.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/io/ImageRequest.h"
+#include <boost/variant.hpp>
+
+namespace librbd {
+namespace io {
+
// Dispatches a queued request to the matching static ImageRequest entry
// point, moving the spec's payload (extents, data buffers, read result)
// into the issued request.
template <typename I>
struct ImageDispatchSpec<I>::SendVisitor
  : public boost::static_visitor<void> {
  ImageDispatchSpec* spec;

  explicit SendVisitor(ImageDispatchSpec* spec)
    : spec(spec) {
  }

  void operator()(Read& read) const {
    ImageRequest<I>::aio_read(
      &spec->m_image_ctx, spec->m_aio_comp, std::move(spec->m_image_extents),
      std::move(read.read_result), spec->m_op_flags, spec->m_parent_trace);
  }

  void operator()(Discard& discard) const {
    ImageRequest<I>::aio_discard(
      &spec->m_image_ctx, spec->m_aio_comp, std::move(spec->m_image_extents),
      discard.discard_granularity_bytes, spec->m_parent_trace);
  }

  void operator()(Write& write) const {
    ImageRequest<I>::aio_write(
      &spec->m_image_ctx, spec->m_aio_comp, std::move(spec->m_image_extents),
      std::move(write.bl), spec->m_op_flags, spec->m_parent_trace);
  }

  void operator()(WriteSame& write_same) const {
    ImageRequest<I>::aio_writesame(
      &spec->m_image_ctx, spec->m_aio_comp, std::move(spec->m_image_extents),
      std::move(write_same.bl), spec->m_op_flags, spec->m_parent_trace);
  }

  void operator()(CompareAndWrite& compare_and_write) const {
    ImageRequest<I>::aio_compare_and_write(
      &spec->m_image_ctx, spec->m_aio_comp, std::move(spec->m_image_extents),
      std::move(compare_and_write.cmp_bl), std::move(compare_and_write.bl),
      compare_and_write.mismatch_offset, spec->m_op_flags,
      spec->m_parent_trace);
  }

  void operator()(Flush& flush) const {
    ImageRequest<I>::aio_flush(
      &spec->m_image_ctx, spec->m_aio_comp, flush.flush_source,
      spec->m_parent_trace);
  }
};
+
// Classifies a request as a write op. Read is the only non-write request
// type; every other variant alternative (including Flush) reports true.
template <typename I>
struct ImageDispatchSpec<I>::IsWriteOpVisitor
  : public boost::static_visitor<bool> {
  bool operator()(const Read&) const {
    return false;
  }

  template <typename T>
  bool operator()(const T&) const {
    return true;
  }
};
+
+template <typename I>
+struct ImageDispatchSpec<I>::TokenRequestedVisitor
+ : public boost::static_visitor<uint64_t> {
+ ImageDispatchSpec* spec;
+ uint64_t flag;
+ uint64_t *tokens;
+
+ TokenRequestedVisitor(ImageDispatchSpec* spec, uint64_t _flag,
+ uint64_t *tokens)
+ : spec(spec), flag(_flag), tokens(tokens) {
+ }
+
+ uint64_t operator()(const Read&) const {
+ if (flag & RBD_QOS_WRITE_MASK) {
+ *tokens = 0;
+ return false;
+ }
+
+ *tokens = (flag & RBD_QOS_BPS_MASK) ? spec->extents_length() : 1;
+ return true;
+ }
+
+ uint64_t operator()(const Flush&) const {
+ *tokens = 0;
+ return true;
+ }
+
+ template <typename T>
+ uint64_t operator()(const T&) const {
+ if (flag & RBD_QOS_READ_MASK) {
+ *tokens = 0;
+ return false;
+ }
+
+ *tokens = (flag & RBD_QOS_BPS_MASK) ? spec->extents_length() : 1;
+ return true;
+ }
+};
+
// Dispatches the stored request variant to the appropriate ImageRequest
// entry point (see SendVisitor).
template <typename I>
void ImageDispatchSpec<I>::send() {
  boost::apply_visitor(SendVisitor{this}, m_request);
}
+
// Fails the request without dispatching it: takes a completion reference
// and completes it with error code r.
template <typename I>
void ImageDispatchSpec<I>::fail(int r) {
  m_aio_comp->get();
  m_aio_comp->fail(r);
}
+
+template <typename I>
+uint64_t ImageDispatchSpec<I>::extents_length() {
+ uint64_t length = 0;
+ auto &extents = this->m_image_extents;
+
+ for (auto &extent : extents) {
+ length += extent.second;
+ }
+ return length;
+}
+
// Returns true for every request type except Read (see IsWriteOpVisitor).
template <typename I>
bool ImageDispatchSpec<I>::is_write_op() const {
  return boost::apply_visitor(IsWriteOpVisitor(), m_request);
}
+
// Computes the QoS tokens required for throttle 'flag' into *tokens and
// returns whether this request is subject to that throttle.
template <typename I>
bool ImageDispatchSpec<I>::tokens_requested(uint64_t flag, uint64_t *tokens) {
  return boost::apply_visitor(TokenRequestedVisitor{this, flag, tokens},
                              m_request);
}
+
// Marks the associated completion's operation as started.
template <typename I>
void ImageDispatchSpec<I>::start_op() {
  m_aio_comp->start_op();
}
+
+} // namespace io
+} // namespace librbd
+
+template class librbd::io::ImageDispatchSpec<librbd::ImageCtx>;
diff --git a/src/librbd/io/ImageDispatchSpec.h b/src/librbd/io/ImageDispatchSpec.h
new file mode 100644
index 00000000..93c53a0f
--- /dev/null
+++ b/src/librbd/io/ImageDispatchSpec.h
@@ -0,0 +1,182 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_IMAGE_DISPATCH_SPEC_H
+#define CEPH_LIBRBD_IO_IMAGE_DISPATCH_SPEC_H
+
#include "include/int_types.h"
#include "include/buffer.h"
#include "common/zipkin_trace.h"
#include "librbd/io/Types.h"
#include "librbd/io/ReadResult.h"
#include <boost/variant/variant.hpp>
#include <atomic>
+
+namespace librbd {
+
+class ImageCtx;
+
+namespace io {
+
+class AioCompletion;
+
// A queueable description of a single image-level IO request (read,
// write, discard, write-same, compare-and-write, or flush). Instances
// are created via the static create_*_request() factories, optionally
// held back by the QoS throttles, and eventually either send() or
// fail()ed.
template <typename ImageCtxT = ImageCtx>
class ImageDispatchSpec {
public:
  // Variant payloads: one small struct per request type.
  struct Read {
    ReadResult read_result;

    Read(ReadResult &&read_result) : read_result(std::move(read_result)) {
    }
  };

  struct Discard {
    uint32_t discard_granularity_bytes;

    Discard(uint32_t discard_granularity_bytes)
      : discard_granularity_bytes(discard_granularity_bytes) {
    }
  };

  struct Write {
    bufferlist bl;

    Write(bufferlist&& bl) : bl(std::move(bl)) {
    }
  };

  struct WriteSame {
    bufferlist bl;

    WriteSame(bufferlist&& bl) : bl(std::move(bl)) {
    }
  };

  struct CompareAndWrite {
    bufferlist cmp_bl;
    bufferlist bl;
    uint64_t *mismatch_offset;

    CompareAndWrite(bufferlist&& cmp_bl, bufferlist&& bl,
                    uint64_t *mismatch_offset)
      : cmp_bl(std::move(cmp_bl)), bl(std::move(bl)),
        mismatch_offset(mismatch_offset) {
    }
  };

  struct Flush {
    FlushSource flush_source;

    Flush(FlushSource flush_source) : flush_source(flush_source) {
    }
  };

  // Factory methods -- the caller owns the returned pointer.
  static ImageDispatchSpec* create_read_request(
      ImageCtxT &image_ctx, AioCompletion *aio_comp, Extents &&image_extents,
      ReadResult &&read_result, int op_flags,
      const ZTracer::Trace &parent_trace) {
    return new ImageDispatchSpec(image_ctx, aio_comp,
                                 std::move(image_extents),
                                 Read{std::move(read_result)},
                                 op_flags, parent_trace);
  }

  static ImageDispatchSpec* create_discard_request(
      ImageCtxT &image_ctx, AioCompletion *aio_comp, uint64_t off, uint64_t len,
      uint32_t discard_granularity_bytes, const ZTracer::Trace &parent_trace) {
    return new ImageDispatchSpec(image_ctx, aio_comp, {{off, len}},
                                 Discard{discard_granularity_bytes},
                                 0, parent_trace);
  }

  static ImageDispatchSpec* create_write_request(
      ImageCtxT &image_ctx, AioCompletion *aio_comp, Extents &&image_extents,
      bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace) {
    return new ImageDispatchSpec(image_ctx, aio_comp, std::move(image_extents),
                                 Write{std::move(bl)}, op_flags, parent_trace);
  }

  static ImageDispatchSpec* create_write_same_request(
      ImageCtxT &image_ctx, AioCompletion *aio_comp, uint64_t off, uint64_t len,
      bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace) {
    return new ImageDispatchSpec(image_ctx, aio_comp, {{off, len}},
                                 WriteSame{std::move(bl)}, op_flags,
                                 parent_trace);
  }

  static ImageDispatchSpec* create_compare_and_write_request(
      ImageCtxT &image_ctx, AioCompletion *aio_comp, Extents &&image_extents,
      bufferlist &&cmp_bl, bufferlist &&bl, uint64_t *mismatch_offset,
      int op_flags, const ZTracer::Trace &parent_trace) {
    return new ImageDispatchSpec(image_ctx, aio_comp,
                                 std::move(image_extents),
                                 CompareAndWrite{std::move(cmp_bl),
                                                 std::move(bl),
                                                 mismatch_offset},
                                 op_flags, parent_trace);
  }

  static ImageDispatchSpec* create_flush_request(
      ImageCtxT &image_ctx, AioCompletion *aio_comp,
      FlushSource flush_source, const ZTracer::Trace &parent_trace) {
    return new ImageDispatchSpec(image_ctx, aio_comp, {}, Flush{flush_source},
                                 0, parent_trace);
  }

  void send();
  void fail(int r);

  bool is_write_op() const;

  void start_op();

  // QoS throttle bookkeeping: which throttles have already delayed this
  // request, and how many tokens it requires from a given throttle.
  bool tokens_requested(uint64_t flag, uint64_t *tokens);

  bool was_throttled(uint64_t flag) {
    return m_throttled_flag & flag;
  }

  void set_throttled(uint64_t flag) {
    m_throttled_flag |= flag;
  }

  bool were_all_throttled() {
    return (m_throttled_flag & RBD_QOS_MASK) == RBD_QOS_MASK;
  }

private:
  typedef boost::variant<Read,
                         Discard,
                         Write,
                         WriteSame,
                         CompareAndWrite,
                         Flush> Request;

  struct SendVisitor;
  struct IsWriteOpVisitor;
  struct TokenRequestedVisitor;

  ImageDispatchSpec(ImageCtxT& image_ctx, AioCompletion* aio_comp,
                    Extents&& image_extents, Request&& request,
                    int op_flags, const ZTracer::Trace& parent_trace)
    : m_image_ctx(image_ctx), m_aio_comp(aio_comp),
      m_image_extents(std::move(image_extents)), m_request(std::move(request)),
      m_op_flags(op_flags), m_parent_trace(parent_trace) {
  }

  ImageCtxT& m_image_ctx;
  AioCompletion* m_aio_comp;
  Extents m_image_extents;          // image offset/length pairs
  Request m_request;                // type-specific payload
  int m_op_flags;                   // LIBRADOS_OP_FLAG_* advice flags
  ZTracer::Trace m_parent_trace;
  // bitmask of RBD_QOS_* throttles that have already delayed this request
  std::atomic<uint64_t> m_throttled_flag = 0;

  uint64_t extents_length();
};
+
+} // namespace io
+} // namespace librbd
+
+extern template class librbd::io::ImageDispatchSpec<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_IO_IMAGE_DISPATCH_SPEC_H
diff --git a/src/librbd/io/ImageRequest.cc b/src/librbd/io/ImageRequest.cc
new file mode 100644
index 00000000..d6eb29fe
--- /dev/null
+++ b/src/librbd/io/ImageRequest.cc
@@ -0,0 +1,824 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/ImageRequest.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/internal.h"
+#include "librbd/Journal.h"
+#include "librbd/Types.h"
+#include "librbd/Utils.h"
+#include "librbd/cache/ImageCache.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/io/AsyncOperation.h"
+#include "librbd/io/ObjectDispatchInterface.h"
+#include "librbd/io/ObjectDispatchSpec.h"
+#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/Utils.h"
+#include "librbd/journal/Types.h"
+#include "include/rados/librados.hpp"
+#include "common/perf_counters.h"
+#include "common/WorkQueue.h"
+#include "osdc/Striper.h"
+#include <algorithm>
+#include <functional>
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::io::ImageRequest: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace io {
+
+using librbd::util::get_image_ctx;
+
+namespace {
+
// Fire-and-forget context that asynchronously updates the image header's
// modify (or access) timestamp via a cls call. It pins an AsyncOperation
// for its lifetime so image shutdown waits for the update to complete;
// any error from the update is deliberately ignored.
template <typename I>
struct C_UpdateTimestamp : public Context {
public:
  I& m_image_ctx;
  bool m_modify; // if modify set to 'true', modify timestamp is updated,
                 // access timestamp otherwise
  AsyncOperation m_async_op;

  C_UpdateTimestamp(I& ictx, bool m) : m_image_ctx(ictx), m_modify(m) {
    m_async_op.start_op(*get_image_ctx(&m_image_ctx));
  }
  ~C_UpdateTimestamp() override {
    m_async_op.finish_op();
  }

  void send() {
    librados::ObjectWriteOperation op;
    if (m_modify) {
      cls_client::set_modify_timestamp(&op);
    } else {
      cls_client::set_access_timestamp(&op);
    }

    // completion deletes this context when the rados op finishes
    auto comp = librbd::util::create_rados_callback(this);
    int r = m_image_ctx.md_ctx.aio_operate(m_image_ctx.header_oid, comp, &op);
    ceph_assert(r == 0);
    comp->release();
  }

  void finish(int r) override {
    // ignore errors updating timestamp
  }
};
+
+bool should_update_timestamp(const utime_t& now, const utime_t& timestamp,
+ uint64_t interval) {
+ return (interval &&
+ (static_cast<uint64_t>(now.sec()) >= interval + timestamp));
+}
+
+} // anonymous namespace
+
// Static entry point: builds a stack-allocated read request and sends it
// synchronously (the completion tracks the asynchronous result).
template <typename I>
void ImageRequest<I>::aio_read(I *ictx, AioCompletion *c,
                               Extents &&image_extents,
                               ReadResult &&read_result, int op_flags,
                               const ZTracer::Trace &parent_trace) {
  ImageReadRequest<I> req(*ictx, c, std::move(image_extents),
                          std::move(read_result), op_flags, parent_trace);
  req.send();
}
+
// Static entry point: builds a stack-allocated write request and sends it.
template <typename I>
void ImageRequest<I>::aio_write(I *ictx, AioCompletion *c,
                                Extents &&image_extents, bufferlist &&bl,
                                int op_flags,
                                const ZTracer::Trace &parent_trace) {
  ImageWriteRequest<I> req(*ictx, c, std::move(image_extents), std::move(bl),
                           op_flags, parent_trace);
  req.send();
}
+
// Static entry point: builds a stack-allocated discard request and sends it.
template <typename I>
void ImageRequest<I>::aio_discard(I *ictx, AioCompletion *c,
                                  Extents &&image_extents,
                                  uint32_t discard_granularity_bytes,
                                  const ZTracer::Trace &parent_trace) {
  ImageDiscardRequest<I> req(*ictx, c, std::move(image_extents),
                             discard_granularity_bytes, parent_trace);
  req.send();
}
+
// Static entry point: builds a stack-allocated flush request and sends it.
template <typename I>
void ImageRequest<I>::aio_flush(I *ictx, AioCompletion *c,
                                FlushSource flush_source,
                                const ZTracer::Trace &parent_trace) {
  ImageFlushRequest<I> req(*ictx, c, flush_source, parent_trace);
  req.send();
}
+
// Static entry point: builds a stack-allocated write-same request and
// sends it.
template <typename I>
void ImageRequest<I>::aio_writesame(I *ictx, AioCompletion *c,
                                    Extents &&image_extents,
                                    bufferlist &&bl, int op_flags,
                                    const ZTracer::Trace &parent_trace) {
  ImageWriteSameRequest<I> req(*ictx, c, std::move(image_extents),
                               std::move(bl), op_flags, parent_trace);
  req.send();
}
+
// Static entry point: builds a stack-allocated compare-and-write request
// and sends it. On a miscompare the failing image offset is reported via
// *mismatch_offset.
template <typename I>
void ImageRequest<I>::aio_compare_and_write(I *ictx, AioCompletion *c,
                                            Extents &&image_extents,
                                            bufferlist &&cmp_bl,
                                            bufferlist &&bl,
                                            uint64_t *mismatch_offset,
                                            int op_flags,
                                            const ZTracer::Trace &parent_trace) {
  ImageCompareAndWriteRequest<I> req(*ictx, c, std::move(image_extents),
                                     std::move(cmp_bl), std::move(bl),
                                     mismatch_offset, op_flags, parent_trace);
  req.send();
}
+
+
// Common dispatch path for all image requests: clips the extents to the
// image size, optionally refreshes the access/modify timestamp, and then
// routes the request either to the image cache or directly to the
// derived class's send_request().
template <typename I>
void ImageRequest<I>::send() {
  I &image_ctx = this->m_image_ctx;
  ceph_assert(m_aio_comp->is_initialized(get_aio_type()));
  ceph_assert(m_aio_comp->is_started());

  CephContext *cct = image_ctx.cct;
  AioCompletion *aio_comp = this->m_aio_comp;
  ldout(cct, 20) << get_request_type() << ": ictx=" << &image_ctx << ", "
                 << "completion=" << aio_comp << dendl;

  // reference dropped by the derived send paths via aio_comp->put()
  aio_comp->get();
  int r = clip_request();
  if (r < 0) {
    m_aio_comp->fail(r);
    return;
  }

  if (m_bypass_image_cache || m_image_ctx.image_cache == nullptr) {
    update_timestamp();
    send_request();
  } else {
    send_image_cache_request();
  }
}
+
// Clamps every image extent to the current image size (under snap_lock).
// Returns 0 on success or the negative error from clip_io (e.g. an
// extent starting beyond the image boundary).
template <typename I>
int ImageRequest<I>::clip_request() {
  RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
  for (auto &image_extent : m_image_extents) {
    auto clip_len = image_extent.second;
    int r = clip_io(get_image_ctx(&m_image_ctx), image_extent.first, &clip_len);
    if (r < 0) {
      return r;
    }

    image_extent.second = clip_len;
  }
  return 0;
}
+
// Refreshes the image's modify timestamp (for mutating requests) or
// access timestamp (for reads) at most once per configured interval.
// Uses a cheap read-locked check followed by a write-locked re-check so
// concurrent requests don't issue redundant header updates; the actual
// rados update is fired asynchronously outside the IO path.
template <typename I>
void ImageRequest<I>::update_timestamp() {
  bool modify = (get_aio_type() != AIO_TYPE_READ);
  uint64_t update_interval;
  if (modify) {
    update_interval = m_image_ctx.mtime_update_interval;
  } else {
    update_interval = m_image_ctx.atime_update_interval;
  }

  // interval of zero disables timestamp updates entirely
  if (update_interval == 0) {
    return;
  }

  // select the matching getter/setter pair once, up front
  utime_t (I::*get_timestamp_fn)() const;
  void (I::*set_timestamp_fn)(utime_t);
  if (modify) {
    get_timestamp_fn = &I::get_modify_timestamp;
    set_timestamp_fn = &I::set_modify_timestamp;
  } else {
    get_timestamp_fn = &I::get_access_timestamp;
    set_timestamp_fn = &I::set_access_timestamp;
  }

  utime_t ts = ceph_clock_now();
  {
    // fast path: shared lock only
    RWLock::RLocker timestamp_locker(m_image_ctx.timestamp_lock);
    if(!should_update_timestamp(ts, std::invoke(get_timestamp_fn, m_image_ctx),
                                update_interval)) {
      return;
    }
  }

  {
    // slow path: re-check under the exclusive lock in case another
    // request updated the timestamp in the meantime
    RWLock::WLocker timestamp_locker(m_image_ctx.timestamp_lock);
    bool update = should_update_timestamp(
      ts, std::invoke(get_timestamp_fn, m_image_ctx), update_interval);
    if (!update) {
      return;
    }

    std::invoke(set_timestamp_fn, m_image_ctx, ts);
  }

  // TODO we fire and forget this outside the IO path to prevent
  // potential race conditions with librbd client IO callbacks
  // between different threads (e.g. librados and object cacher)
  ldout(m_image_ctx.cct, 10) << get_request_type() << dendl;
  auto req = new C_UpdateTimestamp<I>(m_image_ctx, modify);
  req->send();
}
+
// Read-request constructor: hands the destination ReadResult off to the
// completion, which assembles the data as object reads finish.
template <typename I>
ImageReadRequest<I>::ImageReadRequest(I &image_ctx, AioCompletion *aio_comp,
                                      Extents &&image_extents,
                                      ReadResult &&read_result, int op_flags,
                                      const ZTracer::Trace &parent_trace)
  : ImageRequest<I>(image_ctx, aio_comp, std::move(image_extents), "read",
                    parent_trace),
    m_op_flags(op_flags) {
  aio_comp->read_result = std::move(read_result);
}
+
+template <typename I>
+int ImageReadRequest<I>::clip_request() {
+ int r = ImageRequest<I>::clip_request();
+ if (r < 0) {
+ return r;
+ }
+
+ uint64_t buffer_length = 0;
+ auto &image_extents = this->m_image_extents;
+ for (auto &image_extent : image_extents) {
+ buffer_length += image_extent.second;
+ }
+ this->m_aio_comp->read_result.set_clip_length(buffer_length);
+ return 0;
+}
+
// Maps the image-level read extents to per-object extents, primes
// readahead when enabled, and issues one object-dispatch read per object
// extent. The completion's request count is set before any read is
// issued so results can complete in any order.
template <typename I>
void ImageReadRequest<I>::send_request() {
  I &image_ctx = this->m_image_ctx;
  CephContext *cct = image_ctx.cct;

  auto &image_extents = this->m_image_extents;
  // readahead only makes sense for cached, non-random access patterns
  if (image_ctx.cache && image_ctx.readahead_max_bytes > 0 &&
      !(m_op_flags & LIBRADOS_OP_FLAG_FADVISE_RANDOM)) {
    readahead(get_image_ctx(&image_ctx), image_extents);
  }

  AioCompletion *aio_comp = this->m_aio_comp;
  librados::snap_t snap_id;
  map<object_t,vector<ObjectExtent> > object_extents;
  uint64_t buffer_ofs = 0;
  {
    // prevent image size from changing between computing clip and recording
    // pending async operation
    RWLock::RLocker snap_locker(image_ctx.snap_lock);
    snap_id = image_ctx.snap_id;

    // map image extents to object extents
    for (auto &extent : image_extents) {
      if (extent.second == 0) {
        continue;
      }

      Striper::file_to_extents(cct, image_ctx.format_string, &image_ctx.layout,
                               extent.first, extent.second, 0, object_extents,
                               buffer_ofs);
      buffer_ofs += extent.second;
    }
  }

  // pre-calculate the expected number of read requests
  uint32_t request_count = 0;
  for (auto &object_extent : object_extents) {
    request_count += object_extent.second.size();
  }
  aio_comp->set_request_count(request_count);

  // issue the requests
  for (auto &object_extent : object_extents) {
    for (auto &extent : object_extent.second) {
      ldout(cct, 20) << "oid " << extent.oid << " " << extent.offset << "~"
                     << extent.length << " from " << extent.buffer_extents
                     << dendl;

      auto req_comp = new io::ReadResult::C_ObjectReadRequest(
        aio_comp, extent.offset, extent.length,
        std::move(extent.buffer_extents));
      auto req = ObjectDispatchSpec::create_read(
        &image_ctx, OBJECT_DISPATCH_LAYER_NONE, extent.oid.name,
        extent.objectno, extent.offset, extent.length, snap_id, m_op_flags,
        this->m_trace, &req_comp->bl, &req_comp->extent_map, req_comp);
      req->send();
    }
  }

  // drop the reference taken in ImageRequest<I>::send()
  aio_comp->put();

  image_ctx.perfcounter->inc(l_librbd_rd);
  image_ctx.perfcounter->inc(l_librbd_rd_bytes, buffer_ofs);
}
+
// Routes the read through the image cache as a single aggregate request.
template <typename I>
void ImageReadRequest<I>::send_image_cache_request() {
  I &image_ctx = this->m_image_ctx;
  ceph_assert(image_ctx.image_cache != nullptr);

  AioCompletion *aio_comp = this->m_aio_comp;
  aio_comp->set_request_count(1);

  auto *req_comp = new io::ReadResult::C_ImageReadRequest(
    aio_comp, this->m_image_extents);
  image_ctx.image_cache->aio_read(std::move(this->m_image_extents),
                                  &req_comp->bl, m_op_flags,
                                  req_comp);
}
+
// Common write-path dispatch: rejects writes to snapshots/read-only
// images, maps the image extents to object extents, optionally appends a
// journal event, prunes extents the derived class deems unnecessary, and
// issues the per-object requests.
template <typename I>
void AbstractImageWriteRequest<I>::send_request() {
  I &image_ctx = this->m_image_ctx;
  CephContext *cct = image_ctx.cct;

  RWLock::RLocker md_locker(image_ctx.md_lock);

  bool journaling = false;

  AioCompletion *aio_comp = this->m_aio_comp;
  uint64_t clip_len = 0;
  ObjectExtents object_extents;
  ::SnapContext snapc;
  {
    // prevent image size from changing between computing clip and recording
    // pending async operation
    RWLock::RLocker snap_locker(image_ctx.snap_lock);
    if (image_ctx.snap_id != CEPH_NOSNAP || image_ctx.read_only) {
      aio_comp->fail(-EROFS);
      return;
    }

    for (auto &extent : this->m_image_extents) {
      if (extent.second == 0) {
        continue;
      }

      // map to object extents
      Striper::file_to_extents(cct, image_ctx.format_string, &image_ctx.layout,
                               extent.first, extent.second, 0, object_extents);
      clip_len += extent.second;
    }

    snapc = image_ctx.snapc;
    journaling = (image_ctx.journal != nullptr &&
                  image_ctx.journal->is_journal_appending());
  }

  // give the derived request type a chance to drop no-op extents
  int ret = prune_object_extents(&object_extents);
  if (ret < 0) {
    aio_comp->fail(ret);
    return;
  }

  if (!object_extents.empty()) {
    uint64_t journal_tid = 0;
    if (journaling) {
      // in-flight ops are flushed prior to closing the journal
      ceph_assert(image_ctx.journal != NULL);
      journal_tid = append_journal_event(m_synchronous);
    }

    aio_comp->set_request_count(object_extents.size());
    send_object_requests(object_extents, snapc, journal_tid);
  } else {
    // no IO to perform -- fire completion
    aio_comp->set_request_count(0);
  }

  update_stats(clip_len);
  // drop the reference taken in ImageRequest<I>::send()
  aio_comp->put();
}
+
// Creates and dispatches one object request per object extent, each
// wired to a C_AioRequest that feeds the shared completion.
template <typename I>
void AbstractImageWriteRequest<I>::send_object_requests(
    const ObjectExtents &object_extents, const ::SnapContext &snapc,
    uint64_t journal_tid) {
  I &image_ctx = this->m_image_ctx;
  CephContext *cct = image_ctx.cct;

  AioCompletion *aio_comp = this->m_aio_comp;
  for (ObjectExtents::const_iterator p = object_extents.begin();
       p != object_extents.end(); ++p) {
    ldout(cct, 20) << "oid " << p->oid << " " << p->offset << "~" << p->length
                   << " from " << p->buffer_extents << dendl;
    C_AioRequest *req_comp = new C_AioRequest(aio_comp);
    auto request = create_object_request(*p, snapc, journal_tid, req_comp);

    // a null request indicates it was deferred (presumably stashed for
    // journaling -- confirm against create_object_request overrides);
    // only non-null requests are sent immediately
    if (request != NULL) {
      request->send();
    }
  }
}
+
+template <typename I>
+void ImageWriteRequest<I>::assemble_extent(const ObjectExtent &object_extent,
+ bufferlist *bl) {
+ for (auto q = object_extent.buffer_extents.begin();
+ q != object_extent.buffer_extents.end(); ++q) {
+ bufferlist sub_bl;
+ sub_bl.substr_of(m_bl, q->first, q->second);
+ bl->claim_append(sub_bl);
+ }
+}
+
// Appends one journal write event per image extent, slicing the payload
// bufferlist to match; returns the tid of the last appended event.
template <typename I>
uint64_t ImageWriteRequest<I>::append_journal_event(bool synchronous) {
  I &image_ctx = this->m_image_ctx;

  uint64_t tid = 0;
  uint64_t buffer_offset = 0;
  ceph_assert(!this->m_image_extents.empty());
  for (auto &extent : this->m_image_extents) {
    bufferlist sub_bl;
    sub_bl.substr_of(m_bl, buffer_offset, extent.second);
    buffer_offset += extent.second;

    tid = image_ctx.journal->append_write_event(extent.first, extent.second,
                                                sub_bl, synchronous);
  }

  return tid;
}
+
// Routes the write through the image cache as a single aggregate request.
template <typename I>
void ImageWriteRequest<I>::send_image_cache_request() {
  I &image_ctx = this->m_image_ctx;
  ceph_assert(image_ctx.image_cache != nullptr);

  AioCompletion *aio_comp = this->m_aio_comp;
  aio_comp->set_request_count(1);
  C_AioRequest *req_comp = new C_AioRequest(aio_comp);
  image_ctx.image_cache->aio_write(std::move(this->m_image_extents),
                                   std::move(m_bl), m_op_flags, req_comp);
}
+
// Builds the object-dispatch write spec for one object extent, assembling
// its payload from the image-level bufferlist.
template <typename I>
ObjectDispatchSpec *ImageWriteRequest<I>::create_object_request(
    const ObjectExtent &object_extent, const ::SnapContext &snapc,
    uint64_t journal_tid, Context *on_finish) {
  I &image_ctx = this->m_image_ctx;

  bufferlist bl;
  assemble_extent(object_extent, &bl);
  auto req = ObjectDispatchSpec::create_write(
    &image_ctx, OBJECT_DISPATCH_LAYER_NONE, object_extent.oid.name,
    object_extent.objectno, object_extent.offset, std::move(bl), snapc,
    m_op_flags, journal_tid, this->m_trace, on_finish);
  return req;
}
+
// Records write op count and byte count in the perf counters.
template <typename I>
void ImageWriteRequest<I>::update_stats(size_t length) {
  I &image_ctx = this->m_image_ctx;
  image_ctx.perfcounter->inc(l_librbd_wr);
  image_ctx.perfcounter->inc(l_librbd_wr_bytes, length);
}
+
// Appends one journal discard event per image extent; returns the tid of
// the last appended event.
template <typename I>
uint64_t ImageDiscardRequest<I>::append_journal_event(bool synchronous) {
  I &image_ctx = this->m_image_ctx;

  uint64_t tid = 0;
  ceph_assert(!this->m_image_extents.empty());
  for (auto &extent : this->m_image_extents) {
    journal::EventEntry event_entry(
      journal::AioDiscardEvent(extent.first,
                               extent.second,
                               this->m_discard_granularity_bytes));
    tid = image_ctx.journal->append_io_event(std::move(event_entry),
                                             extent.first, extent.second,
                                             synchronous, 0);
  }

  return tid;
}
+
// Routes the discard through the image cache, one cache request per
// image extent.
template <typename I>
void ImageDiscardRequest<I>::send_image_cache_request() {
  I &image_ctx = this->m_image_ctx;
  ceph_assert(image_ctx.image_cache != nullptr);

  AioCompletion *aio_comp = this->m_aio_comp;
  aio_comp->set_request_count(this->m_image_extents.size());
  for (auto &extent : this->m_image_extents) {
    C_AioRequest *req_comp = new C_AioRequest(aio_comp);
    image_ctx.image_cache->aio_discard(extent.first, extent.second,
                                       this->m_discard_granularity_bytes,
                                       req_comp);
  }
}
+
// Builds the object-dispatch discard spec for one object extent.
template <typename I>
ObjectDispatchSpec *ImageDiscardRequest<I>::create_object_request(
    const ObjectExtent &object_extent, const ::SnapContext &snapc,
    uint64_t journal_tid, Context *on_finish) {
  I &image_ctx = this->m_image_ctx;
  auto req = ObjectDispatchSpec::create_discard(
    &image_ctx, OBJECT_DISPATCH_LAYER_NONE, object_extent.oid.name,
    object_extent.objectno, object_extent.offset, object_extent.length, snapc,
    OBJECT_DISCARD_FLAG_DISABLE_CLONE_REMOVE, journal_tid, this->m_trace,
    on_finish);
  return req;
}
+
// Records discard op count and byte count in the perf counters.
template <typename I>
void ImageDiscardRequest<I>::update_stats(size_t length) {
  I &image_ctx = this->m_image_ctx;
  image_ctx.perfcounter->inc(l_librbd_discard);
  image_ctx.perfcounter->inc(l_librbd_discard_bytes, length);
}
+
// Aligns each object extent to the discard granularity and removes
// extents that shrink to nothing, so discards too small to free any
// space are never issued. Always returns 0.
template <typename I>
int ImageDiscardRequest<I>::prune_object_extents(
    ObjectExtents* object_extents) const {
  if (m_discard_granularity_bytes == 0) {
    return 0;
  }

  // Align the range to discard_granularity_bytes boundary and skip
  // any discards that are too small to free up any space.
  //
  // discard_granularity_bytes >= object_size && tail truncation
  // is a special case for filestore
  bool prune_required = false;
  auto object_size = this->m_image_ctx.layout.object_size;
  auto discard_granularity_bytes = std::min(m_discard_granularity_bytes,
                                            object_size);
  auto xform_lambda =
    [discard_granularity_bytes, object_size, &prune_required]
    (ObjectExtent& object_extent) {
      auto& offset = object_extent.offset;
      auto& length = object_extent.length;
      auto next_offset = offset + length;

      if ((discard_granularity_bytes < object_size) ||
          (next_offset < object_size)) {
        // round the start up and the end down to the granularity;
        // a zero-length result marks the extent for removal below
        offset = p2roundup<uint64_t>(offset, discard_granularity_bytes);
        next_offset = p2align<uint64_t>(next_offset, discard_granularity_bytes);
        if (offset >= next_offset) {
          prune_required = true;
          length = 0;
        } else {
          length = next_offset - offset;
        }
      }
    };
  std::for_each(object_extents->begin(), object_extents->end(),
                xform_lambda);

  if (prune_required) {
    // one or more object extents were skipped
    auto remove_lambda =
      [](const ObjectExtent& object_extent) {
        return (object_extent.length == 0);
      };
    object_extents->erase(
      std::remove_if(object_extents->begin(), object_extents->end(),
                     remove_lambda),
      object_extents->end());
  }
  return 0;
}
+
// Issues a flush: when journaling is active for a user flush, appends a
// flush event and chains flush/commit of that event; otherwise forwards
// the flush down the object-dispatch layers. In both cases the flush is
// gated on all in-flight async operations settling first.
template <typename I>
void ImageFlushRequest<I>::send_request() {
  I &image_ctx = this->m_image_ctx;

  bool journaling = false;
  {
    RWLock::RLocker snap_locker(image_ctx.snap_lock);
    journaling = (m_flush_source == FLUSH_SOURCE_USER &&
                  image_ctx.journal != nullptr &&
                  image_ctx.journal->is_journal_appending());
  }

  AioCompletion *aio_comp = this->m_aio_comp;
  aio_comp->set_request_count(1);

  Context *ctx = new C_AioRequest(aio_comp);

  // ensure no locks are held when flush is complete
  ctx = librbd::util::create_async_context_callback(image_ctx, ctx);

  if (journaling) {
    // in-flight ops are flushed prior to closing the journal
    uint64_t journal_tid = image_ctx.journal->append_io_event(
      journal::EventEntry(journal::AioFlushEvent()), 0, 0, false, 0);
    image_ctx.journal->user_flushed();

    // chain: flush_event -> commit_io_event -> user callback
    ctx = new FunctionContext(
      [&image_ctx, journal_tid, ctx](int r) {
        image_ctx.journal->commit_io_event(journal_tid, r);
        ctx->complete(r);
      });
    ctx = new FunctionContext(
      [&image_ctx, journal_tid, ctx](int r) {
        image_ctx.journal->flush_event(journal_tid, ctx);
      });
  } else {
    // flush rbd cache only when journaling is not enabled
    auto object_dispatch_spec = ObjectDispatchSpec::create_flush(
      &image_ctx, OBJECT_DISPATCH_LAYER_NONE, m_flush_source, this->m_trace,
      ctx);
    ctx = new FunctionContext([object_dispatch_spec](int r) {
        object_dispatch_spec->send();
      });
  }

  // ensure all in-flight IOs are settled if non-user flush request
  aio_comp->async_op.flush(ctx);
  aio_comp->put();

  // might be flushing during image shutdown
  if (image_ctx.perfcounter != nullptr) {
    image_ctx.perfcounter->inc(l_librbd_flush);
  }
}
+
// Routes the flush through the image cache.
template <typename I>
void ImageFlushRequest<I>::send_image_cache_request() {
  I &image_ctx = this->m_image_ctx;
  ceph_assert(image_ctx.image_cache != nullptr);

  AioCompletion *aio_comp = this->m_aio_comp;
  aio_comp->set_request_count(1);
  C_AioRequest *req_comp = new C_AioRequest(aio_comp);
  image_ctx.image_cache->aio_flush(req_comp);
}
+
+// Append one AioWriteSameEvent per image extent to the journal.  Returns the
+// tid of the last event appended (callers use it to tag object requests).
+template <typename I>
+uint64_t ImageWriteSameRequest<I>::append_journal_event(bool synchronous) {
+  I &image_ctx = this->m_image_ctx;
+
+  uint64_t tid = 0;
+  ceph_assert(!this->m_image_extents.empty());
+  for (auto &extent : this->m_image_extents) {
+    journal::EventEntry event_entry(journal::AioWriteSameEvent(extent.first,
+                                                               extent.second,
+                                                               m_data_bl));
+    tid = image_ctx.journal->append_io_event(std::move(event_entry),
+                                             extent.first, extent.second,
+                                             synchronous, 0);
+  }
+
+  return tid;
+}
+
+// Issue one image-cache writesame per image extent; the shared completion
+// fires once all sub-requests finish.
+template <typename I>
+void ImageWriteSameRequest<I>::send_image_cache_request() {
+  I &image_ctx = this->m_image_ctx;
+  ceph_assert(image_ctx.image_cache != nullptr);
+
+  AioCompletion *aio_comp = this->m_aio_comp;
+  aio_comp->set_request_count(this->m_image_extents.size());
+  for (auto &extent : this->m_image_extents) {
+    C_AioRequest *req_comp = new C_AioRequest(aio_comp);
+    // NOTE(review): m_data_bl is std::move'd on every iteration; with more
+    // than one extent this relies on aio_writesame leaving the source
+    // bufferlist usable -- confirm against the image-cache implementation
+    image_ctx.image_cache->aio_writesame(extent.first, extent.second,
+                                         std::move(m_data_bl), m_op_flags,
+                                         req_comp);
+  }
+}
+
+// Build the object-level request for one object extent.  If the helper can
+// express the extent as a repeated pattern (returns true), dispatch a
+// write-same; otherwise 'bl' holds the assembled expanded data -- presumably
+// filled by the helper on the false path as well (see util::
+// assemble_write_same_extent) -- and a plain write is dispatched.
+template <typename I>
+ObjectDispatchSpec *ImageWriteSameRequest<I>::create_object_request(
+    const ObjectExtent &object_extent, const ::SnapContext &snapc,
+    uint64_t journal_tid, Context *on_finish) {
+  I &image_ctx = this->m_image_ctx;
+
+  bufferlist bl;
+  ObjectDispatchSpec *req;
+
+  if (util::assemble_write_same_extent(object_extent, m_data_bl, &bl, false)) {
+    Extents buffer_extents{object_extent.buffer_extents};
+
+    req = ObjectDispatchSpec::create_write_same(
+      &image_ctx, OBJECT_DISPATCH_LAYER_NONE, object_extent.oid.name,
+      object_extent.objectno, object_extent.offset, object_extent.length,
+      std::move(buffer_extents), std::move(bl), snapc, m_op_flags, journal_tid,
+      this->m_trace, on_finish);
+    return req;
+  }
+  req = ObjectDispatchSpec::create_write(
+    &image_ctx, OBJECT_DISPATCH_LAYER_NONE, object_extent.oid.name,
+    object_extent.objectno, object_extent.offset, std::move(bl), snapc,
+    m_op_flags, journal_tid, this->m_trace, on_finish);
+  return req;
+}
+
+// Record one write-same operation and its byte count in the perf counters.
+template <typename I>
+void ImageWriteSameRequest<I>::update_stats(size_t length) {
+  auto &ictx = this->m_image_ctx;
+  ictx.perfcounter->inc(l_librbd_ws);
+  ictx.perfcounter->inc(l_librbd_ws_bytes, length);
+}
+
+// Append a single AioCompareAndWriteEvent covering the one (asserted) image
+// extent; returns the journal tid of the event.
+template <typename I>
+uint64_t ImageCompareAndWriteRequest<I>::append_journal_event(
+    bool synchronous) {
+  I &image_ctx = this->m_image_ctx;
+
+  uint64_t tid = 0;
+  ceph_assert(this->m_image_extents.size() == 1);
+  auto &extent = this->m_image_extents.front();
+  journal::EventEntry event_entry(
+    journal::AioCompareAndWriteEvent(extent.first, extent.second, m_cmp_bl,
+                                     m_bl));
+  // -EILSEQ passed as the filtered return value -- presumably so a compare
+  // mismatch is not treated as a journal-level failure; confirm against
+  // Journal::append_io_event semantics
+  tid = image_ctx.journal->append_io_event(std::move(event_entry),
+                                           extent.first, extent.second,
+                                           synchronous, -EILSEQ);
+
+  return tid;
+}
+
+// Gather the object extent's buffer slices out of m_bl into *bl, preserving
+// the buffer-extent order.
+template <typename I>
+void ImageCompareAndWriteRequest<I>::assemble_extent(
+    const ObjectExtent &object_extent, bufferlist *bl) {
+  for (const auto &buffer_extent : object_extent.buffer_extents) {
+    bufferlist piece;
+    piece.substr_of(m_bl, buffer_extent.first, buffer_extent.second);
+    bl->claim_append(piece);
+  }
+}
+
+// Route the compare-and-write through the image cache as one sub-request;
+// the compare/write buffers are consumed (moved) by the call.
+template <typename I>
+void ImageCompareAndWriteRequest<I>::send_image_cache_request() {
+  I &image_ctx = this->m_image_ctx;
+  ceph_assert(image_ctx.image_cache != nullptr);
+
+  AioCompletion *aio_comp = this->m_aio_comp;
+  aio_comp->set_request_count(1);
+  C_AioRequest *req_comp = new C_AioRequest(aio_comp);
+  image_ctx.image_cache->aio_compare_and_write(
+    std::move(this->m_image_extents), std::move(m_cmp_bl), std::move(m_bl),
+    m_mismatch_offset, m_op_flags, req_comp);
+}
+
+// Build the single object-level compare-and-write request; the write data is
+// re-assembled per the object's buffer extents.
+template <typename I>
+ObjectDispatchSpec *ImageCompareAndWriteRequest<I>::create_object_request(
+    const ObjectExtent &object_extent,
+    const ::SnapContext &snapc,
+    uint64_t journal_tid, Context *on_finish) {
+  I &image_ctx = this->m_image_ctx;
+
+  // NOTE: safe to move m_cmp_bl since we only support this op against
+  // a single object
+  bufferlist bl;
+  assemble_extent(object_extent, &bl);
+  auto req = ObjectDispatchSpec::create_compare_and_write(
+    &image_ctx, OBJECT_DISPATCH_LAYER_NONE, object_extent.oid.name,
+    object_extent.objectno, object_extent.offset, std::move(m_cmp_bl),
+    std::move(bl), snapc, m_mismatch_offset, m_op_flags, journal_tid,
+    this->m_trace, on_finish);
+  return req;
+}
+
+// Record one compare-and-write operation and its byte count in the perf
+// counters.
+template <typename I>
+void ImageCompareAndWriteRequest<I>::update_stats(size_t length) {
+  auto &ictx = this->m_image_ctx;
+  ictx.perfcounter->inc(l_librbd_cmp);
+  ictx.perfcounter->inc(l_librbd_cmp_bytes, length);
+}
+
+// Validate that the compare-and-write maps to a single object extent that
+// crosses neither a 512-byte sector boundary nor a stripe-unit boundary.
+// Returns 0 on success, -EINVAL otherwise.
+template <typename I>
+int ImageCompareAndWriteRequest<I>::prune_object_extents(
+    ObjectExtents* object_extents) const {
+  if (object_extents->size() > 1)
+    return -EINVAL;
+
+  I &image_ctx = this->m_image_ctx;
+  constexpr uint64_t sector_size = 512ULL;
+  uint64_t su = image_ctx.layout.stripe_unit;
+  // const reference: ObjectExtent owns an oid string and extent vectors, so
+  // the previous by-value copy was pure overhead
+  const ObjectExtent &object_extent = object_extents->front();
+  if (object_extent.offset % sector_size + object_extent.length > sector_size ||
+      (su != 0 && (object_extent.offset % su + object_extent.length > su)))
+    return -EINVAL;
+
+  return 0;
+}
+
+} // namespace io
+} // namespace librbd
+
+template class librbd::io::ImageRequest<librbd::ImageCtx>;
+template class librbd::io::ImageReadRequest<librbd::ImageCtx>;
+template class librbd::io::AbstractImageWriteRequest<librbd::ImageCtx>;
+template class librbd::io::ImageWriteRequest<librbd::ImageCtx>;
+template class librbd::io::ImageDiscardRequest<librbd::ImageCtx>;
+template class librbd::io::ImageFlushRequest<librbd::ImageCtx>;
+template class librbd::io::ImageWriteSameRequest<librbd::ImageCtx>;
+template class librbd::io::ImageCompareAndWriteRequest<librbd::ImageCtx>;
diff --git a/src/librbd/io/ImageRequest.h b/src/librbd/io/ImageRequest.h
new file mode 100644
index 00000000..d7d10019
--- /dev/null
+++ b/src/librbd/io/ImageRequest.h
@@ -0,0 +1,365 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_IMAGE_REQUEST_H
+#define CEPH_LIBRBD_IO_IMAGE_REQUEST_H
+
+#include "include/int_types.h"
+#include "include/buffer_fwd.h"
+#include "common/snap_types.h"
+#include "common/zipkin_trace.h"
+#include "osd/osd_types.h"
+#include "librbd/Utils.h"
+#include "librbd/io/Types.h"
+#include <list>
+#include <utility>
+#include <vector>
+
+namespace librbd {
+class ImageCtx;
+
+namespace io {
+
+class AioCompletion;
+class ObjectDispatchSpec;
+class ReadResult;
+
+// Base class for asynchronous image-extent IO.  The static aio_* helpers
+// construct, send, and destroy the matching concrete request; subclasses
+// implement the per-op dispatch hooks.
+template <typename ImageCtxT = ImageCtx>
+class ImageRequest {
+public:
+  // (image offset, length) pairs
+  typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
+
+  virtual ~ImageRequest() {
+    m_trace.event("finish");
+  }
+
+  static void aio_read(ImageCtxT *ictx, AioCompletion *c,
+                       Extents &&image_extents, ReadResult &&read_result,
+                       int op_flags, const ZTracer::Trace &parent_trace);
+  static void aio_write(ImageCtxT *ictx, AioCompletion *c,
+                        Extents &&image_extents, bufferlist &&bl, int op_flags,
+                        const ZTracer::Trace &parent_trace);
+  static void aio_discard(ImageCtxT *ictx, AioCompletion *c,
+                          Extents &&image_extents,
+                          uint32_t discard_granularity_bytes,
+                          const ZTracer::Trace &parent_trace);
+  static void aio_flush(ImageCtxT *ictx, AioCompletion *c,
+                        FlushSource flush_source,
+                        const ZTracer::Trace &parent_trace);
+  static void aio_writesame(ImageCtxT *ictx, AioCompletion *c,
+                            Extents &&image_extents, bufferlist &&bl,
+                            int op_flags, const ZTracer::Trace &parent_trace);
+
+  static void aio_compare_and_write(ImageCtxT *ictx, AioCompletion *c,
+                                    Extents &&image_extents, bufferlist &&cmp_bl,
+                                    bufferlist &&bl, uint64_t *mismatch_offset,
+                                    int op_flags, const ZTracer::Trace &parent_trace);
+
+  void send();
+
+  // skip the image cache and go straight to the object dispatcher
+  void set_bypass_image_cache() {
+    m_bypass_image_cache = true;
+  }
+
+  inline const ZTracer::Trace &get_trace() const {
+    return m_trace;
+  }
+
+protected:
+  typedef std::list<ObjectDispatchSpec*> ObjectRequests;
+
+  ImageCtxT &m_image_ctx;
+  AioCompletion *m_aio_comp;
+  Extents m_image_extents;
+  ZTracer::Trace m_trace;
+  bool m_bypass_image_cache = false;
+
+  ImageRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp,
+               Extents &&image_extents, const char *trace_name,
+               const ZTracer::Trace &parent_trace)
+    : m_image_ctx(image_ctx), m_aio_comp(aio_comp),
+      m_image_extents(std::move(image_extents)),
+      m_trace(util::create_trace(image_ctx, trace_name, parent_trace)) {
+    m_trace.event("start");
+  }
+
+
+  // clamp m_image_extents to the image size; returns 0 or an error code
+  virtual int clip_request();
+  virtual void update_timestamp();
+  virtual void send_request() = 0;
+  virtual void send_image_cache_request() = 0;
+
+  virtual aio_type_t get_aio_type() const = 0;
+  virtual const char *get_request_type() const = 0;
+};
+
+// Image-extent read request; results are gathered into the ReadResult
+// owned by the completion.
+template <typename ImageCtxT = ImageCtx>
+class ImageReadRequest : public ImageRequest<ImageCtxT> {
+public:
+  using typename ImageRequest<ImageCtxT>::Extents;
+
+  ImageReadRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp,
+                   Extents &&image_extents, ReadResult &&read_result,
+                   int op_flags, const ZTracer::Trace &parent_trace);
+
+protected:
+  int clip_request() override;
+
+  void send_request() override;
+  void send_image_cache_request() override;
+
+  aio_type_t get_aio_type() const override {
+    return AIO_TYPE_READ;
+  }
+  const char *get_request_type() const override {
+    return "aio_read";
+  }
+private:
+  int m_op_flags;
+};
+
+// Common machinery for all modifying requests: journaling, object-extent
+// mapping, and fan-out of per-object dispatch specs.  Subclasses supply the
+// concrete object request and stats/journal hooks.
+template <typename ImageCtxT = ImageCtx>
+class AbstractImageWriteRequest : public ImageRequest<ImageCtxT> {
+public:
+  // force synchronous journal append for this request
+  inline void flag_synchronous() {
+    m_synchronous = true;
+  }
+
+protected:
+  using typename ImageRequest<ImageCtxT>::ObjectRequests;
+  using typename ImageRequest<ImageCtxT>::Extents;
+
+  typedef std::vector<ObjectExtent> ObjectExtents;
+
+  AbstractImageWriteRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp,
+                            Extents &&image_extents, const char *trace_name,
+			    const ZTracer::Trace &parent_trace)
+    : ImageRequest<ImageCtxT>(image_ctx, aio_comp, std::move(image_extents),
+                              trace_name, parent_trace),
+      m_synchronous(false) {
+  }
+
+  void send_request() override;
+
+  // hook for subclasses to drop/reject object extents before dispatch;
+  // returns 0 or an error code
+  virtual int prune_object_extents(ObjectExtents* object_extents) const {
+    return 0;
+  }
+
+  void send_object_requests(const ObjectExtents &object_extents,
+                            const ::SnapContext &snapc, uint64_t journal_tid);
+  virtual ObjectDispatchSpec *create_object_request(
+      const ObjectExtent &object_extent, const ::SnapContext &snapc,
+      uint64_t journal_tid, Context *on_finish) = 0;
+
+  virtual uint64_t append_journal_event(bool synchronous) = 0;
+  virtual void update_stats(size_t length) = 0;
+
+private:
+  bool m_synchronous;
+};
+
+// Plain image write: scatters m_bl across the affected objects.
+template <typename ImageCtxT = ImageCtx>
+class ImageWriteRequest : public AbstractImageWriteRequest<ImageCtxT> {
+public:
+  using typename ImageRequest<ImageCtxT>::Extents;
+
+  ImageWriteRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp,
+                    Extents &&image_extents, bufferlist &&bl, int op_flags,
+		    const ZTracer::Trace &parent_trace)
+    : AbstractImageWriteRequest<ImageCtxT>(
+	image_ctx, aio_comp, std::move(image_extents), "write", parent_trace),
+      m_bl(std::move(bl)), m_op_flags(op_flags) {
+  }
+
+protected:
+  using typename ImageRequest<ImageCtxT>::ObjectRequests;
+  using typename AbstractImageWriteRequest<ImageCtxT>::ObjectExtents;
+
+  aio_type_t get_aio_type() const override {
+    return AIO_TYPE_WRITE;
+  }
+  const char *get_request_type() const override {
+    return "aio_write";
+  }
+
+  // copy the slices of m_bl referenced by the object extent into *bl
+  void assemble_extent(const ObjectExtent &object_extent, bufferlist *bl);
+
+  void send_image_cache_request() override;
+
+
+  ObjectDispatchSpec *create_object_request(
+      const ObjectExtent &object_extent, const ::SnapContext &snapc,
+      uint64_t journal_tid, Context *on_finish) override;
+
+  uint64_t append_journal_event(bool synchronous) override;
+  void update_stats(size_t length) override;
+
+private:
+  bufferlist m_bl;
+  int m_op_flags;
+};
+
+// Discard (trim/zero) request; extents smaller than the configured
+// granularity are pruned before dispatch.
+template <typename ImageCtxT = ImageCtx>
+class ImageDiscardRequest : public AbstractImageWriteRequest<ImageCtxT> {
+public:
+  ImageDiscardRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp,
+                      Extents&& image_extents,
+                      uint32_t discard_granularity_bytes,
+                      const ZTracer::Trace &parent_trace)
+    : AbstractImageWriteRequest<ImageCtxT>(
+	image_ctx, aio_comp, std::move(image_extents), "discard", parent_trace),
+      m_discard_granularity_bytes(discard_granularity_bytes) {
+  }
+
+protected:
+  using typename ImageRequest<ImageCtxT>::ObjectRequests;
+  using typename AbstractImageWriteRequest<ImageCtxT>::ObjectExtents;
+
+  aio_type_t get_aio_type() const override {
+    return AIO_TYPE_DISCARD;
+  }
+  const char *get_request_type() const override {
+    return "aio_discard";
+  }
+
+  void send_image_cache_request() override;
+
+  ObjectDispatchSpec *create_object_request(
+      const ObjectExtent &object_extent, const ::SnapContext &snapc,
+      uint64_t journal_tid, Context *on_finish) override;
+
+  uint64_t append_journal_event(bool synchronous) override;
+  void update_stats(size_t length) override;
+
+  int prune_object_extents(ObjectExtents* object_extents) const override;
+
+private:
+  // 0 disables granularity-based pruning
+  uint32_t m_discard_granularity_bytes;
+};
+
+// Flush request: no extents to clip or timestamp; completion waits for all
+// in-flight async operations (see send_request in ImageRequest.cc).
+template <typename ImageCtxT = ImageCtx>
+class ImageFlushRequest : public ImageRequest<ImageCtxT> {
+public:
+  ImageFlushRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp,
+                    FlushSource flush_source,
+                    const ZTracer::Trace &parent_trace)
+    : ImageRequest<ImageCtxT>(image_ctx, aio_comp, {}, "flush", parent_trace),
+      m_flush_source(flush_source) {
+  }
+
+protected:
+  using typename ImageRequest<ImageCtxT>::ObjectRequests;
+
+  int clip_request() override {
+    return 0;
+  }
+  void update_timestamp() override {
+  }
+  void send_request() override;
+  void send_image_cache_request() override;
+
+  aio_type_t get_aio_type() const override {
+    return AIO_TYPE_FLUSH;
+  }
+  const char *get_request_type() const override {
+    return "aio_flush";
+  }
+
+private:
+  FlushSource m_flush_source;
+
+};
+
+// Write-same request: replicates the m_data_bl pattern across the extents.
+template <typename ImageCtxT = ImageCtx>
+class ImageWriteSameRequest : public AbstractImageWriteRequest<ImageCtxT> {
+public:
+  ImageWriteSameRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp,
+                        Extents&& image_extents, bufferlist &&bl,
+                        int op_flags, const ZTracer::Trace &parent_trace)
+    : AbstractImageWriteRequest<ImageCtxT>(
+	image_ctx, aio_comp, std::move(image_extents), "writesame",
+        parent_trace),
+      m_data_bl(std::move(bl)), m_op_flags(op_flags) {
+  }
+
+protected:
+  using typename ImageRequest<ImageCtxT>::ObjectRequests;
+  using typename AbstractImageWriteRequest<ImageCtxT>::ObjectExtents;
+
+  aio_type_t get_aio_type() const override {
+    return AIO_TYPE_WRITESAME;
+  }
+  const char *get_request_type() const override {
+    return "aio_writesame";
+  }
+
+  void send_image_cache_request() override;
+
+  ObjectDispatchSpec *create_object_request(
+      const ObjectExtent &object_extent, const ::SnapContext &snapc,
+      uint64_t journal_tid, Context *on_finish) override;
+
+  uint64_t append_journal_event(bool synchronous) override;
+  void update_stats(size_t length) override;
+private:
+  // repeated data pattern
+  bufferlist m_data_bl;
+  int m_op_flags;
+};
+
+// Atomic compare-and-write: writes m_bl only if existing data equals
+// m_cmp_bl; on mismatch *m_mismatch_offset receives the first differing
+// offset.  Restricted to a single object extent (see prune_object_extents).
+template <typename ImageCtxT = ImageCtx>
+class ImageCompareAndWriteRequest : public AbstractImageWriteRequest<ImageCtxT> {
+public:
+  using typename ImageRequest<ImageCtxT>::ObjectRequests;
+  using typename AbstractImageWriteRequest<ImageCtxT>::ObjectExtents;
+
+  ImageCompareAndWriteRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp,
+                              Extents &&image_extents, bufferlist &&cmp_bl,
+                              bufferlist &&bl, uint64_t *mismatch_offset,
+                              int op_flags, const ZTracer::Trace &parent_trace)
+    : AbstractImageWriteRequest<ImageCtxT>(
+	image_ctx, aio_comp, std::move(image_extents), "compare_and_write", parent_trace),
+      m_cmp_bl(std::move(cmp_bl)), m_bl(std::move(bl)),
+      m_mismatch_offset(mismatch_offset), m_op_flags(op_flags) {
+  }
+
+protected:
+  void send_image_cache_request() override;
+
+  void assemble_extent(const ObjectExtent &object_extent, bufferlist *bl);
+
+  ObjectDispatchSpec *create_object_request(
+      const ObjectExtent &object_extent, const ::SnapContext &snapc,
+      uint64_t journal_tid, Context *on_finish) override;
+
+  uint64_t append_journal_event(bool synchronous) override;
+  void update_stats(size_t length) override;
+
+  aio_type_t get_aio_type() const override {
+    return AIO_TYPE_COMPARE_AND_WRITE;
+  }
+  const char *get_request_type() const override {
+    return "aio_compare_and_write";
+  }
+
+  int prune_object_extents(ObjectExtents* object_extents) const override;
+
+private:
+  bufferlist m_cmp_bl;
+  bufferlist m_bl;
+  uint64_t *m_mismatch_offset;
+  int m_op_flags;
+};
+
+} // namespace io
+} // namespace librbd
+
+extern template class librbd::io::ImageRequest<librbd::ImageCtx>;
+extern template class librbd::io::ImageReadRequest<librbd::ImageCtx>;
+extern template class librbd::io::AbstractImageWriteRequest<librbd::ImageCtx>;
+extern template class librbd::io::ImageWriteRequest<librbd::ImageCtx>;
+extern template class librbd::io::ImageDiscardRequest<librbd::ImageCtx>;
+extern template class librbd::io::ImageFlushRequest<librbd::ImageCtx>;
+extern template class librbd::io::ImageWriteSameRequest<librbd::ImageCtx>;
+extern template class librbd::io::ImageCompareAndWriteRequest<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_IO_IMAGE_REQUEST_H
diff --git a/src/librbd/io/ImageRequestWQ.cc b/src/librbd/io/ImageRequestWQ.cc
new file mode 100644
index 00000000..7bdaf2f2
--- /dev/null
+++ b/src/librbd/io/ImageRequestWQ.cc
@@ -0,0 +1,1043 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/ImageRequestWQ.h"
+#include "common/errno.h"
+#include "common/zipkin_trace.h"
+#include "common/Cond.h"
+#include "librbd/ExclusiveLock.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ImageState.h"
+#include "librbd/ImageWatcher.h"
+#include "librbd/internal.h"
+#include "librbd/Utils.h"
+#include "librbd/exclusive_lock/Policy.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/io/ImageRequest.h"
+#include "librbd/io/ImageDispatchSpec.h"
+#include "common/EventTrace.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace io {
+
+namespace {
+
+// Issue an internal flush; on_finish fires via the completion once the
+// flush settles.  NOTE(review): req is deleted immediately after send() --
+// presumably ImageDispatchSpec::send() does not retain the spec; confirm.
+template <typename I>
+void flush_image(I& image_ctx, Context* on_finish) {
+  auto aio_comp = librbd::io::AioCompletion::create_and_start(
+    on_finish, util::get_image_ctx(&image_ctx), librbd::io::AIO_TYPE_FLUSH);
+  auto req = librbd::io::ImageDispatchSpec<I>::create_flush_request(
+    image_ctx, aio_comp, librbd::io::FLUSH_SOURCE_INTERNAL, {});
+  req->send();
+  delete req;
+}
+
+} // anonymous namespace
+
+// Completion context: resumes a queued image request once the exclusive
+// lock acquisition attempt finishes.
+template <typename I>
+struct ImageRequestWQ<I>::C_AcquireLock : public Context {
+  ImageRequestWQ *work_queue;
+  ImageDispatchSpec<I> *image_request;
+
+  C_AcquireLock(ImageRequestWQ *work_queue, ImageDispatchSpec<I> *image_request)
+    : work_queue(work_queue), image_request(image_request) {
+  }
+
+  void finish(int r) override {
+    work_queue->handle_acquire_lock(r, image_request);
+  }
+};
+
+// Completion context: notifies the work queue once in-flight writes have
+// drained after writes were blocked.
+template <typename I>
+struct ImageRequestWQ<I>::C_BlockedWrites : public Context {
+  ImageRequestWQ *work_queue;
+  explicit C_BlockedWrites(ImageRequestWQ *_work_queue)
+    : work_queue(_work_queue) {
+  }
+
+  void finish(int r) override {
+    work_queue->handle_blocked_writes(r);
+  }
+};
+
+// Completion context: re-dispatches a queued image request after an image
+// refresh completes.
+template <typename I>
+struct ImageRequestWQ<I>::C_RefreshFinish : public Context {
+  ImageRequestWQ *work_queue;
+  ImageDispatchSpec<I> *image_request;
+
+  C_RefreshFinish(ImageRequestWQ *work_queue,
+                  ImageDispatchSpec<I> *image_request)
+    : work_queue(work_queue), image_request(image_request) {
+  }
+  void finish(int r) override {
+    work_queue->handle_refreshed(r, image_request);
+  }
+};
+
+// Maps each RBD QOS throttle flag bit to its throttle name; consumed by the
+// ImageRequestWQ constructor to instantiate one TokenBucketThrottle per QOS
+// dimension.  const: the table is read-only after static initialization.
+static const std::map<uint64_t, std::string> throttle_flags = {
+  { RBD_QOS_IOPS_THROTTLE,       "rbd_qos_iops_throttle"       },
+  { RBD_QOS_BPS_THROTTLE,        "rbd_qos_bps_throttle"        },
+  { RBD_QOS_READ_IOPS_THROTTLE,  "rbd_qos_read_iops_throttle"  },
+  { RBD_QOS_WRITE_IOPS_THROTTLE, "rbd_qos_write_iops_throttle" },
+  { RBD_QOS_READ_BPS_THROTTLE,   "rbd_qos_read_bps_throttle"   },
+  { RBD_QOS_WRITE_BPS_THROTTLE,  "rbd_qos_write_bps_throttle"  }
+};
+
+// Construct the work queue, create one token-bucket throttle per QOS flag
+// (all initially disabled: limit/burst 0), and register with the thread pool.
+template <typename I>
+ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
+                                  time_t ti, ThreadPool *tp)
+  : ThreadPool::PointerWQ<ImageDispatchSpec<I> >(name, ti, 0, tp),
+    m_image_ctx(*image_ctx),
+    m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 5) << "ictx=" << image_ctx << dendl;
+
+  SafeTimer *timer;
+  Mutex *timer_lock;
+  ImageCtx::get_timer_instance(cct, &timer, &timer_lock);
+
+  // NOTE(review): 'auto flag' copies each map pair; 'const auto&' would avoid it
+  for (auto flag : throttle_flags) {
+    m_throttles.push_back(make_pair(
+      flag.first,
+      new TokenBucketThrottle(cct, flag.second, 0, 0, timer, timer_lock)));
+  }
+
+  this->register_work_queue();
+}
+
+// Release the throttles allocated in the constructor.
+template <typename I>
+ImageRequestWQ<I>::~ImageRequestWQ() {
+  for (auto t : m_throttles) {
+    delete t.second;
+  }
+}
+
+// Synchronous read wrapper: blocks until the async read completes and
+// returns bytes read or a negative error code.
+template <typename I>
+ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len,
+                                ReadResult &&read_result, int op_flags) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
+                 << "len = " << len << dendl;
+
+  C_SaferCond cond;
+  AioCompletion *c = AioCompletion::create(&cond);
+  aio_read(c, off, len, std::move(read_result), op_flags, false);
+  return cond.wait();
+}
+
+// Synchronous write wrapper: clips the request against the image size,
+// blocks on the async write, and returns the (possibly clipped) length
+// written or a negative error code.
+template <typename I>
+ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len,
+                                 bufferlist &&bl, int op_flags) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
+                 << "len = " << len << dendl;
+
+  m_image_ctx.snap_lock.get_read();
+  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
+  m_image_ctx.snap_lock.put_read();
+  if (r < 0) {
+    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  C_SaferCond cond;
+  AioCompletion *c = AioCompletion::create(&cond);
+  aio_write(c, off, len, std::move(bl), op_flags, false);
+
+  r = cond.wait();
+  if (r < 0) {
+    return r;
+  }
+  return len;
+}
+
+// Synchronous discard wrapper: clips against the image size, blocks on the
+// async discard, and returns the clipped length or a negative error code.
+template <typename I>
+ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len,
+                                   uint32_t discard_granularity_bytes) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
+                 << "len = " << len << dendl;
+
+  m_image_ctx.snap_lock.get_read();
+  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
+  m_image_ctx.snap_lock.put_read();
+  if (r < 0) {
+    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  C_SaferCond cond;
+  AioCompletion *c = AioCompletion::create(&cond);
+  aio_discard(c, off, len, discard_granularity_bytes, false);
+
+  r = cond.wait();
+  if (r < 0) {
+    return r;
+  }
+  return len;
+}
+
+// Synchronous write-same wrapper: clips against the image size, blocks on
+// the async op, and returns the clipped length or a negative error code.
+template <typename I>
+ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len,
+                                     bufferlist &&bl, int op_flags) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
+                 << "len = " << len << ", data_len " << bl.length() << dendl;
+
+  m_image_ctx.snap_lock.get_read();
+  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
+  m_image_ctx.snap_lock.put_read();
+  if (r < 0) {
+    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  C_SaferCond cond;
+  AioCompletion *c = AioCompletion::create(&cond);
+  aio_writesame(c, off, len, std::move(bl), op_flags, false);
+
+  r = cond.wait();
+  if (r < 0) {
+    return r;
+  }
+  return len;
+}
+
+// Synchronous write-zeroes wrapper (implemented via aio_write_zeroes, which
+// maps to a discard); returns the clipped length or a negative error code.
+template <typename I>
+ssize_t ImageRequestWQ<I>::write_zeroes(uint64_t off, uint64_t len,
+                                        int zero_flags, int op_flags) {
+  auto cct = m_image_ctx.cct;
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
+                 << "len = " << len << dendl;
+
+  m_image_ctx.snap_lock.get_read();
+  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
+  m_image_ctx.snap_lock.put_read();
+  if (r < 0) {
+    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  C_SaferCond ctx;
+  auto aio_comp = io::AioCompletion::create(&ctx);
+  aio_write_zeroes(aio_comp, off, len, zero_flags, op_flags, false);
+
+  r = ctx.wait();
+  if (r < 0) {
+    return r;
+  }
+  return len;
+}
+
+// Synchronous compare-and-write wrapper: on mismatch the async layer sets
+// *mismatch_off and the wait returns -EILSEQ; on success returns the
+// clipped length.
+template <typename I>
+ssize_t ImageRequestWQ<I>::compare_and_write(uint64_t off, uint64_t len,
+                                             bufferlist &&cmp_bl,
+                                             bufferlist &&bl,
+                                             uint64_t *mismatch_off,
+                                             int op_flags){
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "compare_and_write ictx=" << &m_image_ctx << ", off="
+                 << off << ", " << "len = " << len << dendl;
+
+  m_image_ctx.snap_lock.get_read();
+  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
+  m_image_ctx.snap_lock.put_read();
+  if (r < 0) {
+    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  C_SaferCond cond;
+  AioCompletion *c = AioCompletion::create(&cond);
+  aio_compare_and_write(c, off, len, std::move(cmp_bl), std::move(bl),
+                        mismatch_off, op_flags, false);
+
+  r = cond.wait();
+  if (r < 0) {
+    return r;
+  }
+
+  return len;
+}
+
+// Synchronous flush wrapper: blocks until the async user flush completes.
+template <typename I>
+int ImageRequestWQ<I>::flush() {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
+
+  C_SaferCond cond;
+  AioCompletion *c = AioCompletion::create(&cond);
+  aio_flush(c, false);
+
+  int r = cond.wait();
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+// Async read entry point.  Queues the request when non-blocking AIO is
+// enabled, writes are blocked/pending, or the exclusive lock is required
+// for reads; otherwise dispatches inline on the caller's thread.
+template <typename I>
+void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
+                                 ReadResult &&read_result, int op_flags,
+                                 bool native_async) {
+  CephContext *cct = m_image_ctx.cct;
+  FUNCTRACE(cct);
+  ZTracer::Trace trace;
+  if (m_image_ctx.blkin_trace_all) {
+    trace.init("wq: read", &m_image_ctx.trace_endpoint);
+    trace.event("start");
+  }
+
+  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ);
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
+                 << "completion=" << c << ", off=" << off << ", "
+                 << "len=" << len << ", " << "flags=" << op_flags << dendl;
+
+  if (native_async && m_image_ctx.event_socket.is_valid()) {
+    c->set_event_notify(true);
+  }
+
+  if (!start_in_flight_io(c)) {
+    return;
+  }
+
+  // if journaling is enabled -- we need to replay the journal because
+  // it might contain an uncommitted write
+  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
+      require_lock_on_read()) {
+    queue(ImageDispatchSpec<I>::create_read_request(
+            m_image_ctx, c, {{off, len}}, std::move(read_result), op_flags,
+            trace));
+  } else {
+    c->start_op();
+    ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}},
+                              std::move(read_result), op_flags, trace);
+    finish_in_flight_io();
+  }
+  trace.event("finish");
+}
+
+// Async write entry point: queues when non-blocking AIO is enabled or
+// writes are blocked, else dispatches inline.
+template <typename I>
+void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
+                                  bufferlist &&bl, int op_flags,
+                                  bool native_async) {
+  CephContext *cct = m_image_ctx.cct;
+  FUNCTRACE(cct);
+  ZTracer::Trace trace;
+  if (m_image_ctx.blkin_trace_all) {
+    trace.init("wq: write", &m_image_ctx.trace_endpoint);
+    trace.event("init");
+  }
+
+  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE);
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
+                 << "completion=" << c << ", off=" << off << ", "
+                 << "len=" << len << ", flags=" << op_flags << dendl;
+
+  if (native_async && m_image_ctx.event_socket.is_valid()) {
+    c->set_event_notify(true);
+  }
+
+  if (!start_in_flight_io(c)) {
+    return;
+  }
+
+  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
+    queue(ImageDispatchSpec<I>::create_write_request(
+            m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace));
+  } else {
+    c->start_op();
+    ImageRequest<I>::aio_write(&m_image_ctx, c, {{off, len}},
+                               std::move(bl), op_flags, trace);
+    finish_in_flight_io();
+  }
+  trace.event("finish");
+}
+
+// Async discard entry point: queues when non-blocking AIO is enabled or
+// writes are blocked, else dispatches inline.
+template <typename I>
+void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
+                                    uint64_t len,
+                                    uint32_t discard_granularity_bytes,
+                                    bool native_async) {
+  CephContext *cct = m_image_ctx.cct;
+  FUNCTRACE(cct);
+  ZTracer::Trace trace;
+  if (m_image_ctx.blkin_trace_all) {
+    trace.init("wq: discard", &m_image_ctx.trace_endpoint);
+    trace.event("init");
+  }
+
+  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD);
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
+                 << "completion=" << c << ", off=" << off << ", len=" << len
+                 << dendl;
+
+  if (native_async && m_image_ctx.event_socket.is_valid()) {
+    c->set_event_notify(true);
+  }
+
+  if (!start_in_flight_io(c)) {
+    return;
+  }
+
+  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
+    queue(ImageDispatchSpec<I>::create_discard_request(
+            m_image_ctx, c, off, len, discard_granularity_bytes, trace));
+  } else {
+    c->start_op();
+    ImageRequest<I>::aio_discard(&m_image_ctx, c, {{off, len}},
+                                 discard_granularity_bytes, trace);
+    finish_in_flight_io();
+  }
+  trace.event("finish");
+}
+
+// Async user flush entry point: queued if non-blocking AIO is enabled,
+// writes are blocked, or writes are still pending (so the flush orders
+// after them); else dispatched inline.
+template <typename I>
+void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
+  CephContext *cct = m_image_ctx.cct;
+  FUNCTRACE(cct);
+  ZTracer::Trace trace;
+  if (m_image_ctx.blkin_trace_all) {
+    trace.init("wq: flush", &m_image_ctx.trace_endpoint);
+    trace.event("init");
+  }
+
+  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH);
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
+                 << "completion=" << c << dendl;
+
+  if (native_async && m_image_ctx.event_socket.is_valid()) {
+    c->set_event_notify(true);
+  }
+
+  if (!start_in_flight_io(c)) {
+    return;
+  }
+
+  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
+    queue(ImageDispatchSpec<I>::create_flush_request(
+            m_image_ctx, c, FLUSH_SOURCE_USER, trace));
+  } else {
+    c->start_op();
+    ImageRequest<I>::aio_flush(&m_image_ctx, c, FLUSH_SOURCE_USER, trace);
+    finish_in_flight_io();
+  }
+  trace.event("finish");
+}
+
+// Async write-same entry point: queues when non-blocking AIO is enabled or
+// writes are blocked, else dispatches inline.
+template <typename I>
+void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
+                                      uint64_t len, bufferlist &&bl,
+                                      int op_flags, bool native_async) {
+  CephContext *cct = m_image_ctx.cct;
+  FUNCTRACE(cct);
+  ZTracer::Trace trace;
+  if (m_image_ctx.blkin_trace_all) {
+    trace.init("wq: writesame", &m_image_ctx.trace_endpoint);
+    trace.event("init");
+  }
+
+  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME);
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
+                 << "completion=" << c << ", off=" << off << ", "
+                 << "len=" << len << ", data_len = " << bl.length() << ", "
+                 << "flags=" << op_flags << dendl;
+
+  if (native_async && m_image_ctx.event_socket.is_valid()) {
+    c->set_event_notify(true);
+  }
+
+  if (!start_in_flight_io(c)) {
+    return;
+  }
+
+  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
+    queue(ImageDispatchSpec<I>::create_write_same_request(
+            m_image_ctx, c, off, len, std::move(bl), op_flags, trace));
+  } else {
+    c->start_op();
+    ImageRequest<I>::aio_writesame(&m_image_ctx, c, {{off, len}}, std::move(bl),
+                                   op_flags, trace);
+    finish_in_flight_io();
+  }
+  trace.event("finish");
+}
+
+
+// Async write-zeroes entry point.  No zero_flags are supported yet (any
+// non-zero value fails with -EINVAL); the op is implemented as a discard
+// with granularity 0 so objects are partially zeroed rather than trimmed.
+template <typename I>
+void ImageRequestWQ<I>::aio_write_zeroes(io::AioCompletion *aio_comp,
+                                         uint64_t off, uint64_t len,
+                                         int zero_flags, int op_flags,
+                                         bool native_async) {
+  auto cct = m_image_ctx.cct;
+  FUNCTRACE(cct);
+  ZTracer::Trace trace;
+  if (m_image_ctx.blkin_trace_all) {
+    trace.init("io: write_zeroes", &m_image_ctx.trace_endpoint);
+    trace.event("init");
+  }
+
+  aio_comp->init_time(util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_DISCARD);
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
+                 << "completion=" << aio_comp << ", off=" << off << ", "
+                 << "len=" << len << dendl;
+
+  if (native_async && m_image_ctx.event_socket.is_valid()) {
+    aio_comp->set_event_notify(true);
+  }
+
+  // validate the supported flags
+  if (zero_flags != 0U) {
+    aio_comp->fail(-EINVAL);
+    return;
+  }
+
+  if (!start_in_flight_io(aio_comp)) {
+    return;
+  }
+
+  // enable partial discard (zeroing) of objects
+  uint32_t discard_granularity_bytes = 0;
+
+  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
+    queue(ImageDispatchSpec<I>::create_discard_request(
+            m_image_ctx, aio_comp, off, len, discard_granularity_bytes, trace));
+  } else {
+    aio_comp->start_op();
+    ImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, {{off, len}},
+                                 discard_granularity_bytes, trace);
+    finish_in_flight_io();
+  }
+  trace.event("finish");
+}
+
+// Submit an async compare-and-write: atomically compare [off, off+len) with
+// 'cmp_bl' and, on match, overwrite it with 'bl'. On mismatch the offset of
+// the first difference is stored via 'mismatch_off'.
+template <typename I>
+void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c,
+ uint64_t off, uint64_t len,
+ bufferlist &&cmp_bl,
+ bufferlist &&bl,
+ uint64_t *mismatch_off,
+ int op_flags, bool native_async) {
+ CephContext *cct = m_image_ctx.cct;
+ FUNCTRACE(cct);
+ ZTracer::Trace trace;
+ if (m_image_ctx.blkin_trace_all) {
+ trace.init("wq: compare_and_write", &m_image_ctx.trace_endpoint);
+ trace.event("init");
+ }
+
+ c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_COMPARE_AND_WRITE);
+ ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
+ << "completion=" << c << ", off=" << off << ", "
+ << "len=" << len << dendl;
+
+ if (native_async && m_image_ctx.event_socket.is_valid()) {
+ c->set_event_notify(true);
+ }
+
+ if (!start_in_flight_io(c)) {
+ return;
+ }
+
+ RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+ if (m_image_ctx.non_blocking_aio || writes_blocked()) {
+ queue(ImageDispatchSpec<I>::create_compare_and_write_request(
+ m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
+ mismatch_off, op_flags, trace));
+ } else {
+ c->start_op();
+ ImageRequest<I>::aio_compare_and_write(&m_image_ctx, c, {{off, len}},
+ std::move(cmp_bl), std::move(bl),
+ mismatch_off, op_flags, trace);
+ finish_in_flight_io();
+ }
+ trace.event("finish");
+}
+
+// Begin shutting the queue down. New IO is rejected by start_in_flight_io()
+// once m_shutdown is set; 'on_shutdown' fires after the last in-flight IO
+// drains and the image is flushed. Caller must hold owner_lock.
+template <typename I>
+void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
+ ceph_assert(m_image_ctx.owner_lock.is_locked());
+
+ {
+ RWLock::WLocker locker(m_lock);
+ ceph_assert(!m_shutdown);
+ m_shutdown = true;
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load()
+ << dendl;
+ if (m_in_flight_ios > 0) {
+ // finish_in_flight_io() completes the shutdown when the count hits zero
+ m_on_shutdown = on_shutdown;
+ return;
+ }
+ }
+
+ // ensure that all in-flight IO is flushed
+ flush_image(m_image_ctx, on_shutdown);
+}
+
+// Synchronous wrapper: block writes and wait until the block takes effect.
+template <typename I>
+int ImageRequestWQ<I>::block_writes() {
+ C_SaferCond cond_ctx;
+ block_writes(&cond_ctx);
+ return cond_ctx.wait();
+}
+
+// Increment the write-blocker count; 'on_blocked' fires once no writes are
+// in flight (possibly deferred via C_BlockedWrites after a flush). Blockers
+// stack -- each block_writes() needs a matching unblock_writes().
+// Caller must hold owner_lock.
+template <typename I>
+void ImageRequestWQ<I>::block_writes(Context *on_blocked) {
+ ceph_assert(m_image_ctx.owner_lock.is_locked());
+ CephContext *cct = m_image_ctx.cct;
+
+ {
+ RWLock::WLocker locker(m_lock);
+ ++m_write_blockers;
+ ldout(cct, 5) << &m_image_ctx << ", " << "num="
+ << m_write_blockers << dendl;
+ if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) {
+ // waiters are drained by handle_blocked_writes() once writes settle
+ m_write_blocker_contexts.push_back(on_blocked);
+ return;
+ }
+ }
+
+ // ensure that all in-flight IO is flushed
+ flush_image(m_image_ctx, on_blocked);
+}
+
+// Drop one write-blocker reference. When the count reaches zero, complete any
+// wait_on_writes_unblocked() waiters and wake the work queue so queued writes
+// can resume.
+template <typename I>
+void ImageRequestWQ<I>::unblock_writes() {
+ CephContext *cct = m_image_ctx.cct;
+
+ bool wake_up = false;
+ Contexts waiter_contexts;
+ {
+ RWLock::WLocker locker(m_lock);
+ ceph_assert(m_write_blockers > 0);
+ --m_write_blockers;
+
+ ldout(cct, 5) << &m_image_ctx << ", " << "num="
+ << m_write_blockers << dendl;
+ if (m_write_blockers == 0) {
+ wake_up = true;
+ std::swap(waiter_contexts, m_unblocked_write_waiter_contexts);
+ }
+ }
+
+ // complete waiters outside m_lock to avoid re-entrant locking
+ if (wake_up) {
+ for (auto ctx : waiter_contexts) {
+ ctx->complete(0);
+ }
+ this->signal();
+ }
+}
+
+// Invoke 'on_unblocked' once no write blockers remain (immediately if writes
+// are already unblocked). Caller must hold owner_lock.
+template <typename I>
+void ImageRequestWQ<I>::wait_on_writes_unblocked(Context *on_unblocked) {
+ ceph_assert(m_image_ctx.owner_lock.is_locked());
+ CephContext *cct = m_image_ctx.cct;
+
+ {
+ RWLock::WLocker locker(m_lock);
+ ldout(cct, 20) << &m_image_ctx << ", " << "write_blockers="
+ << m_write_blockers << dendl;
+ if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) {
+ m_unblocked_write_waiter_contexts.push_back(on_unblocked);
+ return;
+ }
+ }
+
+ on_unblocked->complete(0);
+}
+
+// Toggle whether the exclusive lock is required for reads and/or writes.
+// The thread pool is signalled only on an actual state change so that
+// _void_dequeue() can re-evaluate (and possibly re-request) the lock.
+template <typename I>
+void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << dendl;
+
+ bool wake_up = false;
+ {
+ RWLock::WLocker locker(m_lock);
+ switch (direction) {
+ case DIRECTION_READ:
+ wake_up = (enabled != m_require_lock_on_read);
+ m_require_lock_on_read = enabled;
+ break;
+ case DIRECTION_WRITE:
+ wake_up = (enabled != m_require_lock_on_write);
+ m_require_lock_on_write = enabled;
+ break;
+ case DIRECTION_BOTH:
+ wake_up = (enabled != m_require_lock_on_read ||
+ enabled != m_require_lock_on_write);
+ m_require_lock_on_read = enabled;
+ m_require_lock_on_write = enabled;
+ break;
+ }
+ }
+
+ // wake up the thread pool whenever the state changes so that
+ // we can re-request the lock if required
+ if (wake_up) {
+ this->signal();
+ }
+}
+
+// Propagate the minimum scheduler tick to every QoS throttle bucket.
+template <typename I>
+void ImageRequestWQ<I>::apply_qos_schedule_tick_min(uint64_t tick){
+ for (auto pair : m_throttles) {
+ pair.second->set_schedule_tick_min(tick);
+ }
+}
+
+// Apply a QoS limit/burst pair to the throttle identified by 'flag' and
+// record whether that throttle is active in m_qos_enabled_flag (limit == 0
+// disables it). 'flag' must name a throttle present in m_throttles.
+template <typename I>
+void ImageRequestWQ<I>::apply_qos_limit(const uint64_t flag,
+ uint64_t limit,
+ uint64_t burst) {
+ CephContext *cct = m_image_ctx.cct;
+ TokenBucketThrottle *throttle = nullptr;
+ for (auto pair : m_throttles) {
+ if (flag == pair.first) {
+ throttle = pair.second;
+ break;
+ }
+ }
+ ceph_assert(throttle != nullptr);
+
+ int r = throttle->set_limit(limit, burst);
+ if (r < 0) {
+ lderr(cct) << throttle->get_name() << ": invalid qos parameter: "
+ << "burst(" << burst << ") is less than "
+ << "limit(" << limit << ")" << dendl;
+ // if apply failed, we should at least make sure the limit works.
+ throttle->set_limit(limit, 0);
+ }
+
+ if (limit)
+ m_qos_enabled_flag |= flag;
+ else
+ m_qos_enabled_flag &= ~flag;
+}
+
+// Throttle callback: 'item' has been granted tokens for throttle 'flag'.
+// Once every enabled throttle has been satisfied the item is requeued at the
+// back of the work queue and the throttled counter is decremented.
+template <typename I>
+void ImageRequestWQ<I>::handle_throttle_ready(int r, ImageDispatchSpec<I> *item,
+ uint64_t flag) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 15) << "r=" << r << ", " << "req=" << item << dendl;
+
+ std::lock_guard pool_locker{this->get_pool_lock()};
+ ceph_assert(m_io_throttled.load() > 0);
+ item->set_throttled(flag);
+ if (item->were_all_throttled()) {
+ this->requeue_back(pool_locker, item);
+ --m_io_throttled;
+ this->signal(pool_locker);
+ }
+}
+
+// Check 'item' against every enabled QoS throttle. Returns true if at least
+// one throttle deferred the item (handle_throttle_ready will fire later);
+// disabled or already-satisfied throttles are marked as passed immediately.
+template <typename I>
+bool ImageRequestWQ<I>::needs_throttle(ImageDispatchSpec<I> *item) {
+ uint64_t tokens = 0;
+ uint64_t flag = 0;
+ bool blocked = false;
+ TokenBucketThrottle* throttle = nullptr;
+
+ for (auto t : m_throttles) {
+ flag = t.first;
+ if (item->was_throttled(flag))
+ continue;
+
+ if (!(m_qos_enabled_flag & flag)) {
+ // throttle disabled -- mark as passed so were_all_throttled() works
+ item->set_throttled(flag);
+ continue;
+ }
+
+ throttle = t.second;
+ // get<>() returns true when tokens were NOT immediately available and
+ // handle_throttle_ready() was scheduled as the wakeup callback
+ if (item->tokens_requested(flag, &tokens) &&
+ throttle->get<ImageRequestWQ<I>, ImageDispatchSpec<I>,
+ &ImageRequestWQ<I>::handle_throttle_ready>(
+ tokens, this, item, flag)) {
+ blocked = true;
+ } else {
+ item->set_throttled(flag);
+ }
+ }
+ return blocked;
+}
+
+// Work-queue dequeue hook. Returns the next dispatchable request, or nullptr
+// when the queue is empty or the head request must be stalled for QoS
+// throttling, write blocking, exclusive-lock acquisition, or an image
+// refresh. Called with the thread pool lock held; the lock is temporarily
+// dropped around the lock-acquire and refresh paths.
+template <typename I>
+void *ImageRequestWQ<I>::_void_dequeue() {
+ CephContext *cct = m_image_ctx.cct;
+ ImageDispatchSpec<I> *peek_item = this->front();
+
+ // no queued IO requests or all IO is blocked/stalled
+ if (peek_item == nullptr || m_io_blockers.load() > 0) {
+ return nullptr;
+ }
+
+ if (needs_throttle(peek_item)) {
+ ldout(cct, 15) << "throttling IO " << peek_item << dendl;
+
+ ++m_io_throttled;
+ // dequeue the throttled item
+ ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue();
+ return nullptr;
+ }
+
+ bool lock_required;
+ bool refresh_required = m_image_ctx.state->is_refresh_required();
+ {
+ RWLock::RLocker locker(m_lock);
+ bool write_op = peek_item->is_write_op();
+ lock_required = is_lock_required(write_op);
+ if (write_op) {
+ if (!lock_required && m_write_blockers > 0) {
+ // missing lock is not the write blocker
+ return nullptr;
+ }
+
+ if (!lock_required && !refresh_required) {
+ // completed ops will requeue the IO -- don't count it as in-progress
+ m_in_flight_writes++;
+ }
+ }
+ }
+
+ // actually dequeue; must match the item we peeked at above
+ auto item = reinterpret_cast<ImageDispatchSpec<I> *>(
+ ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue());
+ ceph_assert(peek_item == item);
+
+ if (lock_required) {
+ // drop the pool lock before taking owner_lock (lock ordering)
+ this->get_pool_lock().unlock();
+ m_image_ctx.owner_lock.get_read();
+ if (m_image_ctx.exclusive_lock != nullptr) {
+ ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
+ if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
+ lderr(cct) << "op requires exclusive lock" << dendl;
+ fail_in_flight_io(m_image_ctx.exclusive_lock->get_unlocked_op_error(),
+ item);
+
+ // wake up the IO since we won't be returning a request to process
+ this->signal();
+ } else {
+ // stall IO until the acquire completes
+ ++m_io_blockers;
+ m_image_ctx.exclusive_lock->acquire_lock(new C_AcquireLock(this, item));
+ }
+ } else {
+ // raced with the exclusive lock being disabled
+ lock_required = false;
+ }
+ m_image_ctx.owner_lock.put_read();
+ this->get_pool_lock().lock();
+
+ if (lock_required) {
+ return nullptr;
+ }
+ }
+
+ if (refresh_required) {
+ ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl;
+
+ // stall IO until the refresh completes
+ ++m_io_blockers;
+
+ this->get_pool_lock().unlock();
+ m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
+ this->get_pool_lock().lock();
+ return nullptr;
+ }
+
+ item->start_op();
+ return item;
+}
+
+// Worker-thread entry point: dispatch the dequeued request, then release the
+// queued/in-flight accounting taken in queue() and _void_dequeue(), and free
+// the request.
+template <typename I>
+void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
+ << "req=" << req << dendl;
+
+ req->send();
+
+ finish_queued_io(req);
+ if (req->is_write_op()) {
+ finish_in_flight_write();
+ }
+ delete req;
+
+ finish_in_flight_io();
+}
+
+// Decrement the queued read or write counter taken in queue().
+template <typename I>
+void ImageRequestWQ<I>::finish_queued_io(ImageDispatchSpec<I> *req) {
+ RWLock::RLocker locker(m_lock);
+ if (req->is_write_op()) {
+ ceph_assert(m_queued_writes > 0);
+ m_queued_writes--;
+ } else {
+ ceph_assert(m_queued_reads > 0);
+ m_queued_reads--;
+ }
+}
+
+// Decrement the in-flight write counter; when the last write drains while
+// block_writes() waiters are pending, flush the image and then complete them
+// via C_BlockedWrites / handle_blocked_writes().
+template <typename I>
+void ImageRequestWQ<I>::finish_in_flight_write() {
+ bool writes_blocked = false;
+ {
+ RWLock::RLocker locker(m_lock);
+ ceph_assert(m_in_flight_writes > 0);
+ if (--m_in_flight_writes == 0 &&
+ !m_write_blocker_contexts.empty()) {
+ writes_blocked = true;
+ }
+ }
+
+ if (writes_blocked) {
+ flush_image(m_image_ctx, new C_BlockedWrites(this));
+ }
+}
+
+// Account a new in-flight IO. Returns false -- after failing the completion
+// with -ESHUTDOWN or -ENODEV -- if the queue is shut down or the data pool is
+// missing; returns true otherwise.
+// NOTE(review): declared 'int' but used as a bool by all callers.
+template <typename I>
+int ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) {
+ RWLock::RLocker locker(m_lock);
+
+ if (m_shutdown) {
+ CephContext *cct = m_image_ctx.cct;
+ lderr(cct) << "IO received on closed image" << dendl;
+
+ // extra ref balances the completion's release in fail()
+ c->get();
+ c->fail(-ESHUTDOWN);
+ return false;
+ }
+
+ if (!m_image_ctx.data_ctx.is_valid()) {
+ CephContext *cct = m_image_ctx.cct;
+ lderr(cct) << "missing data pool" << dendl;
+
+ c->get();
+ c->fail(-ENODEV);
+ return false;
+ }
+
+ m_in_flight_ios++;
+ return true;
+}
+
+// Release one in-flight IO reference. If this was the last IO and a shutdown
+// is pending, flush the image and fire the deferred shutdown context.
+template <typename I>
+void ImageRequestWQ<I>::finish_in_flight_io() {
+ Context *on_shutdown;
+ {
+ RWLock::RLocker locker(m_lock);
+ if (--m_in_flight_ios > 0 || !m_shutdown) {
+ return;
+ }
+ on_shutdown = m_on_shutdown;
+ }
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 5) << "completing shut down" << dendl;
+
+ ceph_assert(on_shutdown != nullptr);
+ flush_image(m_image_ctx, on_shutdown);
+}
+
+// Fail a dequeued request with error 'r' and unwind all of its accounting
+// (pool processing slot, queued counter, in-flight counter).
+template <typename I>
+void ImageRequestWQ<I>::fail_in_flight_io(
+ int r, ImageDispatchSpec<I> *req) {
+ this->process_finish();
+ req->fail(r);
+ finish_queued_io(req);
+ delete req;
+ finish_in_flight_io();
+}
+
+// True if the exclusive lock is required for this op's direction.
+// Caller must hold m_lock.
+template <typename I>
+bool ImageRequestWQ<I>::is_lock_required(bool write_op) const {
+ ceph_assert(m_lock.is_locked());
+ return ((write_op && m_require_lock_on_write) ||
+ (!write_op && m_require_lock_on_read));
+}
+
+// Enqueue a request on the work queue, bumping the matching queued-read or
+// queued-write counter (released later by finish_queued_io()).
+// Caller must hold owner_lock.
+template <typename I>
+void ImageRequestWQ<I>::queue(ImageDispatchSpec<I> *req) {
+ ceph_assert(m_image_ctx.owner_lock.is_locked());
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
+ << "req=" << req << dendl;
+
+ if (req->is_write_op()) {
+ m_queued_writes++;
+ } else {
+ m_queued_reads++;
+ }
+
+ ThreadPool::PointerWQ<ImageDispatchSpec<I> >::queue(req);
+}
+
+// Exclusive-lock acquisition callback (C_AcquireLock): on success the stalled
+// request is requeued at the front to preserve IO ordering; on failure it is
+// failed outright. Either way the IO-blocker stall is lifted.
+template <typename I>
+void ImageRequestWQ<I>::handle_acquire_lock(
+ int r, ImageDispatchSpec<I> *req) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl;
+
+ if (r < 0) {
+ fail_in_flight_io(r, req);
+ } else {
+ // since IO was stalled for acquire -- original IO order is preserved
+ // if we requeue this op for work queue processing
+ this->requeue_front(req);
+ }
+
+ ceph_assert(m_io_blockers.load() > 0);
+ --m_io_blockers;
+ this->signal();
+}
+
+// Image refresh callback (C_RefreshFinish): same resume/fail logic as
+// handle_acquire_lock(), for requests stalled behind a state refresh.
+template <typename I>
+void ImageRequestWQ<I>::handle_refreshed(
+ int r, ImageDispatchSpec<I> *req) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", "
+ << "req=" << req << dendl;
+ if (r < 0) {
+ fail_in_flight_io(r, req);
+ } else {
+ // since IO was stalled for refresh -- original IO order is preserved
+ // if we requeue this op for work queue processing
+ this->requeue_front(req);
+ }
+
+ ceph_assert(m_io_blockers.load() > 0);
+ --m_io_blockers;
+ this->signal();
+}
+
+// Post-flush callback (C_BlockedWrites): complete every context queued by
+// block_writes() now that all in-flight writes have drained.
+template <typename I>
+void ImageRequestWQ<I>::handle_blocked_writes(int r) {
+ Contexts contexts;
+ {
+ RWLock::WLocker locker(m_lock);
+ contexts.swap(m_write_blocker_contexts);
+ }
+
+ for (auto ctx : contexts) {
+ ctx->complete(0);
+ }
+}
+
+template class librbd::io::ImageRequestWQ<librbd::ImageCtx>;
+
+} // namespace io
+} // namespace librbd
diff --git a/src/librbd/io/ImageRequestWQ.h b/src/librbd/io/ImageRequestWQ.h
new file mode 100644
index 00000000..23dcd48d
--- /dev/null
+++ b/src/librbd/io/ImageRequestWQ.h
@@ -0,0 +1,154 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_IMAGE_REQUEST_WQ_H
+#define CEPH_LIBRBD_IO_IMAGE_REQUEST_WQ_H
+
+#include "include/Context.h"
+#include "common/RWLock.h"
+#include "common/Throttle.h"
+#include "common/WorkQueue.h"
+#include "librbd/io/Types.h"
+
+#include <list>
+#include <atomic>
+
+namespace librbd {
+
+class ImageCtx;
+
+namespace io {
+
+class AioCompletion;
+template <typename> class ImageDispatchSpec;
+class ReadResult;
+
+// Thread-pool work queue fronting all image-level IO: provides sync and
+// async read/write/discard/writesame/flush/compare-and-write entry points,
+// QoS token-bucket throttling, write blocking, exclusive-lock gating and
+// refresh stalling before requests reach ImageRequest processing.
+template <typename ImageCtxT = librbd::ImageCtx>
+class ImageRequestWQ
+ : public ThreadPool::PointerWQ<ImageDispatchSpec<ImageCtxT> > {
+public:
+ ImageRequestWQ(ImageCtxT *image_ctx, const string &name, time_t ti,
+ ThreadPool *tp);
+ ~ImageRequestWQ();
+
+ // synchronous variants -- block the caller until the IO completes
+ ssize_t read(uint64_t off, uint64_t len, ReadResult &&read_result,
+ int op_flags);
+ ssize_t write(uint64_t off, uint64_t len, bufferlist &&bl, int op_flags);
+ ssize_t discard(uint64_t off, uint64_t len,
+ uint32_t discard_granularity_bytes);
+ ssize_t writesame(uint64_t off, uint64_t len, bufferlist &&bl, int op_flags);
+ ssize_t write_zeroes(uint64_t off, uint64_t len, int zero_flags,
+ int op_flags);
+ ssize_t compare_and_write(uint64_t off, uint64_t len,
+ bufferlist &&cmp_bl, bufferlist &&bl,
+ uint64_t *mismatch_off, int op_flags);
+ int flush();
+
+ // asynchronous variants -- completion delivered via AioCompletion
+ void aio_read(AioCompletion *c, uint64_t off, uint64_t len,
+ ReadResult &&read_result, int op_flags, bool native_async=true);
+ void aio_write(AioCompletion *c, uint64_t off, uint64_t len,
+ bufferlist &&bl, int op_flags, bool native_async=true);
+ void aio_discard(AioCompletion *c, uint64_t off, uint64_t len,
+ uint32_t discard_granularity_bytes, bool native_async=true);
+ void aio_flush(AioCompletion *c, bool native_async=true);
+ void aio_writesame(AioCompletion *c, uint64_t off, uint64_t len,
+ bufferlist &&bl, int op_flags, bool native_async=true);
+ void aio_write_zeroes(AioCompletion *c, uint64_t off, uint64_t len,
+ int zero_flags, int op_flags, bool native_async);
+ void aio_compare_and_write(AioCompletion *c, uint64_t off,
+ uint64_t len, bufferlist &&cmp_bl,
+ bufferlist &&bl, uint64_t *mismatch_off,
+ int op_flags, bool native_async=true);
+
+ using ThreadPool::PointerWQ<ImageDispatchSpec<ImageCtxT> >::drain;
+ using ThreadPool::PointerWQ<ImageDispatchSpec<ImageCtxT> >::empty;
+
+ void shut_down(Context *on_shutdown);
+
+ inline bool writes_blocked() const {
+ RWLock::RLocker locker(m_lock);
+ return (m_write_blockers > 0);
+ }
+
+ // write blocking: blockers stack; each block needs a matching unblock
+ int block_writes();
+ void block_writes(Context *on_blocked);
+ void unblock_writes();
+
+ void wait_on_writes_unblocked(Context *on_unblocked);
+
+ void set_require_lock(Direction direction, bool enabled);
+
+ // QoS throttling configuration
+ void apply_qos_schedule_tick_min(uint64_t tick);
+
+ void apply_qos_limit(const uint64_t flag, uint64_t limit, uint64_t burst);
+protected:
+ void *_void_dequeue() override;
+ void process(ImageDispatchSpec<ImageCtxT> *req) override;
+ // queue counts as non-empty while items are parked in the throttles
+ bool _empty() override {
+ return (ThreadPool::PointerWQ<ImageDispatchSpec<ImageCtxT>>::_empty() &&
+ m_io_throttled.load() == 0);
+ }
+
+
+private:
+ typedef std::list<Context *> Contexts;
+
+ struct C_AcquireLock;
+ struct C_BlockedWrites;
+ struct C_RefreshFinish;
+
+ ImageCtxT &m_image_ctx;
+ mutable RWLock m_lock;                       // guards the state below
+ Contexts m_write_blocker_contexts;           // block_writes() waiters
+ uint32_t m_write_blockers = 0;               // nested write-block count
+ Contexts m_unblocked_write_waiter_contexts;  // wait_on_writes_unblocked() waiters
+ bool m_require_lock_on_read = false;
+ bool m_require_lock_on_write = false;
+ std::atomic<unsigned> m_queued_reads { 0 };
+ std::atomic<unsigned> m_queued_writes { 0 };
+ std::atomic<unsigned> m_in_flight_ios { 0 };
+ std::atomic<unsigned> m_in_flight_writes { 0 };
+ std::atomic<unsigned> m_io_blockers { 0 };   // lock-acquire / refresh stalls
+ std::atomic<unsigned> m_io_throttled { 0 };  // items parked in QoS throttles
+
+ std::list<std::pair<uint64_t, TokenBucketThrottle*> > m_throttles;
+ uint64_t m_qos_enabled_flag = 0;             // bitmask of active throttles
+
+ bool m_shutdown = false;
+ Context *m_on_shutdown = nullptr;            // fired once in-flight IO drains
+
+ bool is_lock_required(bool write_op) const;
+
+ inline bool require_lock_on_read() const {
+ RWLock::RLocker locker(m_lock);
+ return m_require_lock_on_read;
+ }
+ inline bool writes_empty() const {
+ RWLock::RLocker locker(m_lock);
+ return (m_queued_writes == 0);
+ }
+
+ bool needs_throttle(ImageDispatchSpec<ImageCtxT> *item);
+
+ void finish_queued_io(ImageDispatchSpec<ImageCtxT> *req);
+ void finish_in_flight_write();
+
+ int start_in_flight_io(AioCompletion *c);
+ void finish_in_flight_io();
+ void fail_in_flight_io(int r, ImageDispatchSpec<ImageCtxT> *req);
+
+ void queue(ImageDispatchSpec<ImageCtxT> *req);
+
+ void handle_acquire_lock(int r, ImageDispatchSpec<ImageCtxT> *req);
+ void handle_refreshed(int r, ImageDispatchSpec<ImageCtxT> *req);
+ void handle_blocked_writes(int r);
+
+ void handle_throttle_ready(int r, ImageDispatchSpec<ImageCtxT> *item, uint64_t flag);
+};
+
+} // namespace io
+} // namespace librbd
+
+extern template class librbd::io::ImageRequestWQ<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_IO_IMAGE_REQUEST_WQ_H
diff --git a/src/librbd/io/ObjectDispatch.cc b/src/librbd/io/ObjectDispatch.cc
new file mode 100644
index 00000000..85c8b034
--- /dev/null
+++ b/src/librbd/io/ObjectDispatch.cc
@@ -0,0 +1,135 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/ObjectDispatch.h"
+#include "common/dout.h"
+#include "common/WorkQueue.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/io/ObjectRequest.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::io::ObjectDispatch: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace io {
+
+// ObjectDispatch: terminal (core) layer of the object dispatch chain. Each
+// hook translates its arguments into the corresponding ObjectRequest and
+// sends it directly to RADOS, marking the dispatch DISPATCH_RESULT_COMPLETE.
+template <typename I>
+ObjectDispatch<I>::ObjectDispatch(I* image_ctx)
+ : m_image_ctx(image_ctx) {
+}
+
+// No per-layer state to tear down -- just defer 'on_finish' to the work queue.
+template <typename I>
+void ObjectDispatch<I>::shut_down(Context* on_finish) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 5) << dendl;
+
+ m_image_ctx->op_work_queue->queue(on_finish, 0);
+}
+
+// Issue an object read; results land in *read_data / *extent_map and
+// 'on_dispatched' is completed by the request.
+template <typename I>
+bool ObjectDispatch<I>::read(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, librados::snap_t snap_id, int op_flags,
+ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
+ ExtentMap* extent_map, int* object_dispatch_flags,
+ DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << oid << " " << object_off << "~" << object_len << dendl;
+
+ *dispatch_result = DISPATCH_RESULT_COMPLETE;
+ auto req = new ObjectReadRequest<I>(m_image_ctx, oid, object_no, object_off,
+ object_len, snap_id, op_flags,
+ parent_trace, read_data, extent_map,
+ on_dispatched);
+ req->send();
+ return true;
+}
+
+// Issue an object discard (trim/zero) request.
+template <typename I>
+bool ObjectDispatch<I>::discard(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << oid << " " << object_off << "~" << object_len << dendl;
+
+ *dispatch_result = DISPATCH_RESULT_COMPLETE;
+ auto req = new ObjectDiscardRequest<I>(m_image_ctx, oid, object_no,
+ object_off, object_len, snapc,
+ discard_flags, parent_trace,
+ on_dispatched);
+ req->send();
+ return true;
+}
+
+// Issue an object write of 'data' at 'object_off'.
+template <typename I>
+bool ObjectDispatch<I>::write(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << oid << " " << object_off << "~" << data.length() << dendl;
+
+ *dispatch_result = DISPATCH_RESULT_COMPLETE;
+ auto req = new ObjectWriteRequest<I>(m_image_ctx, oid, object_no, object_off,
+ std::move(data), snapc, op_flags,
+ parent_trace, on_dispatched);
+ req->send();
+ return true;
+}
+
+// Issue a write-same: replicate 'data' across [object_off, +object_len).
+template <typename I>
+bool ObjectDispatch<I>::write_same(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, Extents&& buffer_extents, ceph::bufferlist&& data,
+ const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << oid << " " << object_off << "~" << object_len << dendl;
+
+ *dispatch_result = DISPATCH_RESULT_COMPLETE;
+ auto req = new ObjectWriteSameRequest<I>(m_image_ctx, oid, object_no,
+ object_off, object_len,
+ std::move(data), snapc, op_flags,
+ parent_trace, on_dispatched);
+ req->send();
+ return true;
+}
+
+// Issue an object compare-and-write; on mismatch the request reports the
+// offset of the first difference via *mismatch_offset.
+template <typename I>
+bool ObjectDispatch<I>::compare_and_write(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
+ const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
+ int* object_dispatch_flags, uint64_t* journal_tid,
+ DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << oid << " " << object_off << "~" << write_data.length()
+ << dendl;
+
+ *dispatch_result = DISPATCH_RESULT_COMPLETE;
+ auto req = new ObjectCompareAndWriteRequest<I>(m_image_ctx, oid, object_no,
+ object_off,
+ std::move(cmp_data),
+ std::move(write_data), snapc,
+ mismatch_offset, op_flags,
+ parent_trace, on_dispatched);
+ req->send();
+ return true;
+}
+
+} // namespace io
+} // namespace librbd
+
+template class librbd::io::ObjectDispatch<librbd::ImageCtx>;
diff --git a/src/librbd/io/ObjectDispatch.h b/src/librbd/io/ObjectDispatch.h
new file mode 100644
index 00000000..32e8272d
--- /dev/null
+++ b/src/librbd/io/ObjectDispatch.h
@@ -0,0 +1,104 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_OBJECT_DISPATCH_H
+#define CEPH_LIBRBD_IO_OBJECT_DISPATCH_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/rados/librados.hpp"
+#include "common/snap_types.h"
+#include "common/zipkin_trace.h"
+#include "librbd/io/Types.h"
+#include "librbd/io/ObjectDispatchInterface.h"
+
+struct Context;
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace io {
+
+struct AioCompletion;
+
+// Core (bottom) object dispatch layer: converts dispatch-chain calls into
+// concrete ObjectRequest operations against RADOS. Flush, cache invalidation
+// and existence-cache hooks are no-ops at this layer (return false =
+// "not handled here").
+template <typename ImageCtxT = librbd::ImageCtx>
+class ObjectDispatch : public ObjectDispatchInterface {
+public:
+ ObjectDispatch(ImageCtxT* image_ctx);
+
+ ObjectDispatchLayer get_object_dispatch_layer() const override {
+ return OBJECT_DISPATCH_LAYER_CORE;
+ }
+
+ void shut_down(Context* on_finish) override;
+
+ bool read(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, librados::snap_t snap_id, int op_flags,
+ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
+ ExtentMap* extent_map, int* object_dispatch_flags,
+ DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) override;
+
+ bool discard(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) override;
+
+ bool write(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) override;
+
+ bool write_same(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, Extents&& buffer_extents, ceph::bufferlist&& data,
+ const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) override;
+
+ bool compare_and_write(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
+ const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
+ int* object_dispatch_flags, uint64_t* journal_tid,
+ DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) override;
+
+ // nothing to flush at the core layer -- not handled
+ bool flush(
+ FlushSource flush_source, const ZTracer::Trace &parent_trace,
+ DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) override {
+ return false;
+ }
+
+ // no caches at this layer -- not handled
+ bool invalidate_cache(Context* on_finish) override {
+ return false;
+ }
+ bool reset_existence_cache(Context* on_finish) override {
+ return false;
+ }
+
+ // overwrite notifications are irrelevant to the core layer
+ void extent_overwritten(
+ uint64_t object_no, uint64_t object_off, uint64_t object_len,
+ uint64_t journal_tid, uint64_t new_journal_tid) override {
+ }
+
+private:
+ ImageCtxT* m_image_ctx;  // non-owning back-pointer to the image context
+
+};
+
+} // namespace io
+} // namespace librbd
+
+extern template class librbd::io::ObjectDispatch<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_IO_OBJECT_DISPATCH_H
diff --git a/src/librbd/io/ObjectDispatchInterface.h b/src/librbd/io/ObjectDispatchInterface.h
new file mode 100644
index 00000000..fddf0db1
--- /dev/null
+++ b/src/librbd/io/ObjectDispatchInterface.h
@@ -0,0 +1,86 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_OBJECT_DISPATCH_INTERFACE_H
+#define CEPH_LIBRBD_IO_OBJECT_DISPATCH_INTERFACE_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/rados/librados.hpp"
+#include "common/snap_types.h"
+#include "common/zipkin_trace.h"
+#include "librbd/io/Types.h"
+
+struct Context;
+struct RWLock;
+
+namespace librbd {
+namespace io {
+
+struct AioCompletion;
+
+// Interface implemented by every layer of the object dispatch chain (cache,
+// journal, core, ...). Each IO hook returns true if the layer handled (or
+// intercepted) the request and false to pass it to the next layer; the
+// layer's position in the chain is reported by get_object_dispatch_layer().
+struct ObjectDispatchInterface {
+ virtual ~ObjectDispatchInterface() {
+ }
+
+ virtual ObjectDispatchLayer get_object_dispatch_layer() const = 0;
+
+ // async teardown of the layer; must complete 'on_finish'
+ virtual void shut_down(Context* on_finish) = 0;
+
+ virtual bool read(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, librados::snap_t snap_id, int op_flags,
+ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
+ ExtentMap* extent_map, int* object_dispatch_flags,
+ DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) = 0;
+
+ virtual bool discard(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, DispatchResult* dispatch_result,
+ Context**on_finish, Context* on_dispatched) = 0;
+
+ virtual bool write(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, DispatchResult* dispatch_result,
+ Context**on_finish, Context* on_dispatched) = 0;
+
+ virtual bool write_same(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, Extents&& buffer_extents, ceph::bufferlist&& data,
+ const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, DispatchResult* dispatch_result,
+ Context**on_finish, Context* on_dispatched) = 0;
+
+ virtual bool compare_and_write(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
+ const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
+ int* object_dispatch_flags, uint64_t* journal_tid,
+ DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) = 0;
+
+ virtual bool flush(
+ FlushSource flush_source, const ZTracer::Trace &parent_trace,
+ DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) = 0;
+
+ virtual bool invalidate_cache(Context* on_finish) = 0;
+ virtual bool reset_existence_cache(Context* on_finish) = 0;
+
+ // notification that a journaled overwrite landed on an object extent
+ virtual void extent_overwritten(
+ uint64_t object_no, uint64_t object_off, uint64_t object_len,
+ uint64_t journal_tid, uint64_t new_journal_tid) = 0;
+
+};
+
+} // namespace io
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_IO_OBJECT_DISPATCH_INTERFACE_H
diff --git a/src/librbd/io/ObjectDispatchSpec.cc b/src/librbd/io/ObjectDispatchSpec.cc
new file mode 100644
index 00000000..2b6dccc5
--- /dev/null
+++ b/src/librbd/io/ObjectDispatchSpec.cc
@@ -0,0 +1,46 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/ObjectDispatchSpec.h"
+#include "include/Context.h"
+#include "librbd/io/ObjectDispatcher.h"
+#include <boost/variant.hpp>
+
+namespace librbd {
+namespace io {
+
+// Completion glue between dispatch layers: re-dispatches the spec to the
+// next layer on DISPATCH_RESULT_CONTINUE, finishes on error or
+// DISPATCH_RESULT_COMPLETE, and aborts on an unset result.
+void ObjectDispatchSpec::C_Dispatcher::complete(int r) {
+ if (r < 0) {
+ finish(r);
+ return;
+ }
+
+ switch (object_dispatch_spec->dispatch_result) {
+ case DISPATCH_RESULT_CONTINUE:
+ object_dispatch_spec->send();
+ break;
+ case DISPATCH_RESULT_COMPLETE:
+ finish(r);
+ break;
+ case DISPATCH_RESULT_INVALID:
+ ceph_abort();
+ break;
+ }
+}
+
+// Terminal completion: notify the caller and free the owning spec.
+void ObjectDispatchSpec::C_Dispatcher::finish(int r) {
+ on_finish->complete(r);
+ delete object_dispatch_spec;
+}
+
+// Hand the spec to the dispatcher, which routes it through the layer chain.
+void ObjectDispatchSpec::send() {
+ object_dispatcher->send(this);
+}
+
+// Fail the spec without dispatching it; 'r' must be a negative errno.
+void ObjectDispatchSpec::fail(int r) {
+ ceph_assert(r < 0);
+ dispatcher_ctx.complete(r);
+}
+
+} // namespace io
+} // namespace librbd
diff --git a/src/librbd/io/ObjectDispatchSpec.h b/src/librbd/io/ObjectDispatchSpec.h
new file mode 100644
index 00000000..a26d89fe
--- /dev/null
+++ b/src/librbd/io/ObjectDispatchSpec.h
@@ -0,0 +1,266 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_OBJECT_DISPATCH_SPEC_H
+#define CEPH_LIBRBD_IO_OBJECT_DISPATCH_SPEC_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "common/snap_types.h"
+#include "common/zipkin_trace.h"
+#include "librbd/io/Types.h"
+#include <boost/variant/variant.hpp>
+
+namespace librbd {
+namespace io {
+
+struct ObjectDispatcherInterface;
+
+// Carries a single object-level IO request (read, discard, write,
+// write-same, compare-and-write, or flush) through the stack of object
+// dispatch layers managed by the ObjectDispatcher.  The spec owns its
+// completion context and deletes itself once the IO chain finishes
+// (see C_Dispatcher::finish).
+struct ObjectDispatchSpec {
+private:
+  // helper to avoid extra heap allocation per object IO; re-invoked by
+  // each layer and either restarts or completes the spec
+  struct C_Dispatcher : public Context {
+    ObjectDispatchSpec* object_dispatch_spec;
+    Context* on_finish;
+
+    C_Dispatcher(ObjectDispatchSpec* object_dispatch_spec, Context* on_finish)
+      : object_dispatch_spec(object_dispatch_spec), on_finish(on_finish) {
+    }
+
+    void complete(int r) override;
+    void finish(int r) override;
+  };
+
+public:
+  // fields common to every request type: target object name/index/offset
+  struct RequestBase {
+    std::string oid;
+    uint64_t object_no;
+    uint64_t object_off;
+
+    RequestBase(const std::string& oid, uint64_t object_no, uint64_t object_off)
+      : oid(oid), object_no(object_no), object_off(object_off) {
+    }
+  };
+
+  // read: results are written into *read_data / *extent_map
+  struct ReadRequest : public RequestBase {
+    uint64_t object_len;
+    librados::snap_t snap_id;
+    ceph::bufferlist* read_data;
+    ExtentMap* extent_map;
+
+    ReadRequest(const std::string& oid, uint64_t object_no, uint64_t object_off,
+                uint64_t object_len, librados::snap_t snap_id,
+                ceph::bufferlist* read_data, ExtentMap* extent_map)
+      : RequestBase(oid, object_no, object_off),
+        object_len(object_len), snap_id(snap_id), read_data(read_data),
+        extent_map(extent_map) {
+    }
+  };
+
+  // base for all mutating requests: snapshot context plus the journal
+  // transaction id (updated by the journaling layer as it dispatches)
+  struct WriteRequestBase : public RequestBase {
+    ::SnapContext snapc;
+    uint64_t journal_tid;
+
+    WriteRequestBase(const std::string& oid, uint64_t object_no,
+                     uint64_t object_off, const ::SnapContext& snapc,
+                     uint64_t journal_tid)
+      : RequestBase(oid, object_no, object_off), snapc(snapc),
+        journal_tid(journal_tid) {
+    }
+  };
+
+  struct DiscardRequest : public WriteRequestBase {
+    uint64_t object_len;
+    int discard_flags;
+
+    DiscardRequest(const std::string& oid, uint64_t object_no,
+                   uint64_t object_off, uint64_t object_len,
+                   int discard_flags, const ::SnapContext& snapc,
+                   uint64_t journal_tid)
+      : WriteRequestBase(oid, object_no, object_off, snapc, journal_tid),
+        object_len(object_len), discard_flags(discard_flags) {
+    }
+  };
+
+  struct WriteRequest : public WriteRequestBase {
+    ceph::bufferlist data;
+
+    WriteRequest(const std::string& oid, uint64_t object_no,
+                 uint64_t object_off, ceph::bufferlist&& data,
+                 const ::SnapContext& snapc, uint64_t journal_tid)
+      : WriteRequestBase(oid, object_no, object_off, snapc, journal_tid),
+        data(std::move(data)) {
+    }
+  };
+
+  struct WriteSameRequest : public WriteRequestBase {
+    uint64_t object_len;
+    Extents buffer_extents;
+    ceph::bufferlist data;
+
+    WriteSameRequest(const std::string& oid, uint64_t object_no,
+                     uint64_t object_off, uint64_t object_len,
+                     Extents&& buffer_extents, ceph::bufferlist&& data,
+                     const ::SnapContext& snapc, uint64_t journal_tid)
+      : WriteRequestBase(oid, object_no, object_off, snapc, journal_tid),
+        object_len(object_len), buffer_extents(std::move(buffer_extents)),
+        data(std::move(data)) {
+    }
+  };
+
+  struct CompareAndWriteRequest : public WriteRequestBase {
+    ceph::bufferlist cmp_data;
+    ceph::bufferlist data;
+    // out-param: offset of the first mismatching byte on compare failure
+    uint64_t* mismatch_offset;
+
+    CompareAndWriteRequest(const std::string& oid, uint64_t object_no,
+                           uint64_t object_off, ceph::bufferlist&& cmp_data,
+                           ceph::bufferlist&& data, uint64_t* mismatch_offset,
+                           const ::SnapContext& snapc, uint64_t journal_tid)
+      : WriteRequestBase(oid, object_no, object_off, snapc, journal_tid),
+        cmp_data(std::move(cmp_data)), data(std::move(data)),
+        mismatch_offset(mismatch_offset) {
+    }
+  };
+
+  struct FlushRequest {
+    FlushSource flush_source;
+
+    FlushRequest(FlushSource flush_source) : flush_source(flush_source) {
+    }
+  };
+
+  typedef boost::variant<ReadRequest,
+                         DiscardRequest,
+                         WriteRequest,
+                         WriteSameRequest,
+                         CompareAndWriteRequest,
+                         FlushRequest> Request;
+
+  // completion chained back through the dispatcher (see .cc)
+  C_Dispatcher dispatcher_ctx;
+
+  ObjectDispatcherInterface* object_dispatcher;
+  // last layer that has seen this request; dispatch resumes after it
+  ObjectDispatchLayer object_dispatch_layer;
+  int object_dispatch_flags = 0;
+  // set by the layer that handles the IO; INVALID until then
+  DispatchResult dispatch_result = DISPATCH_RESULT_INVALID;
+
+  Request request;
+  int op_flags;
+  ZTracer::Trace parent_trace;
+
+  // factory methods: allocate a spec bound to the image's object
+  // dispatcher, starting dispatch after object_dispatch_layer
+  template <typename ImageCtxT>
+  static ObjectDispatchSpec* create_read(
+      ImageCtxT* image_ctx, ObjectDispatchLayer object_dispatch_layer,
+      const std::string &oid, uint64_t object_no, uint64_t object_off,
+      uint64_t object_len, librados::snap_t snap_id, int op_flags,
+      const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
+      ExtentMap* extent_map, Context* on_finish) {
+    return new ObjectDispatchSpec(image_ctx->io_object_dispatcher,
+                                  object_dispatch_layer,
+                                  ReadRequest{oid, object_no, object_off,
+                                              object_len, snap_id, read_data,
+                                              extent_map},
+                                  op_flags, parent_trace, on_finish);
+  }
+
+  template <typename ImageCtxT>
+  static ObjectDispatchSpec* create_discard(
+      ImageCtxT* image_ctx, ObjectDispatchLayer object_dispatch_layer,
+      const std::string &oid, uint64_t object_no, uint64_t object_off,
+      uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
+      uint64_t journal_tid, const ZTracer::Trace &parent_trace,
+      Context *on_finish) {
+    return new ObjectDispatchSpec(image_ctx->io_object_dispatcher,
+                                  object_dispatch_layer,
+                                  DiscardRequest{oid, object_no, object_off,
+                                                 object_len, discard_flags,
+                                                 snapc, journal_tid},
+                                  0, parent_trace, on_finish);
+  }
+
+  template <typename ImageCtxT>
+  static ObjectDispatchSpec* create_write(
+      ImageCtxT* image_ctx, ObjectDispatchLayer object_dispatch_layer,
+      const std::string &oid, uint64_t object_no, uint64_t object_off,
+      ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
+      uint64_t journal_tid, const ZTracer::Trace &parent_trace,
+      Context *on_finish) {
+    return new ObjectDispatchSpec(image_ctx->io_object_dispatcher,
+                                  object_dispatch_layer,
+                                  WriteRequest{oid, object_no, object_off,
+                                               std::move(data), snapc,
+                                               journal_tid},
+                                  op_flags, parent_trace, on_finish);
+  }
+
+  template <typename ImageCtxT>
+  static ObjectDispatchSpec* create_write_same(
+      ImageCtxT* image_ctx, ObjectDispatchLayer object_dispatch_layer,
+      const std::string &oid, uint64_t object_no, uint64_t object_off,
+      uint64_t object_len, Extents&& buffer_extents,
+      ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
+      uint64_t journal_tid, const ZTracer::Trace &parent_trace,
+      Context *on_finish) {
+    return new ObjectDispatchSpec(image_ctx->io_object_dispatcher,
+                                  object_dispatch_layer,
+                                  WriteSameRequest{oid, object_no, object_off,
+                                                   object_len,
+                                                   std::move(buffer_extents),
+                                                   std::move(data), snapc,
+                                                   journal_tid},
+                                  op_flags, parent_trace, on_finish);
+  }
+
+  template <typename ImageCtxT>
+  static ObjectDispatchSpec* create_compare_and_write(
+      ImageCtxT* image_ctx, ObjectDispatchLayer object_dispatch_layer,
+      const std::string &oid, uint64_t object_no, uint64_t object_off,
+      ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
+      const ::SnapContext &snapc, uint64_t *mismatch_offset, int op_flags,
+      uint64_t journal_tid, const ZTracer::Trace &parent_trace,
+      Context *on_finish) {
+    return new ObjectDispatchSpec(image_ctx->io_object_dispatcher,
+                                  object_dispatch_layer,
+                                  CompareAndWriteRequest{oid, object_no,
+                                                         object_off,
+                                                         std::move(cmp_data),
+                                                         std::move(write_data),
+                                                         mismatch_offset,
+                                                         snapc, journal_tid},
+                                  op_flags, parent_trace, on_finish);
+  }
+
+  template <typename ImageCtxT>
+  static ObjectDispatchSpec* create_flush(
+      ImageCtxT* image_ctx, ObjectDispatchLayer object_dispatch_layer,
+      FlushSource flush_source, const ZTracer::Trace &parent_trace,
+      Context *on_finish) {
+    return new ObjectDispatchSpec(image_ctx->io_object_dispatcher,
+                                  object_dispatch_layer,
+                                  FlushRequest{flush_source}, 0,
+                                  parent_trace, on_finish);
+  }
+
+  void send();
+  void fail(int r);
+
+private:
+  template <typename> friend class ObjectDispatcher;
+
+  // private: specs are created via the factory methods above and are
+  // destroyed by C_Dispatcher::finish()
+  ObjectDispatchSpec(ObjectDispatcherInterface* object_dispatcher,
+                     ObjectDispatchLayer object_dispatch_layer,
+                     Request&& request, int op_flags,
+                     const ZTracer::Trace& parent_trace, Context* on_finish)
+    : dispatcher_ctx(this, on_finish), object_dispatcher(object_dispatcher),
+      object_dispatch_layer(object_dispatch_layer), request(std::move(request)),
+      op_flags(op_flags), parent_trace(parent_trace) {
+  }
+
+};
+
+} // namespace io
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_IO_OBJECT_DISPATCH_SPEC_H
diff --git a/src/librbd/io/ObjectDispatcher.cc b/src/librbd/io/ObjectDispatcher.cc
new file mode 100644
index 00000000..befc0750
--- /dev/null
+++ b/src/librbd/io/ObjectDispatcher.cc
@@ -0,0 +1,354 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/ObjectDispatcher.h"
+#include "include/Context.h"
+#include "common/AsyncOpTracker.h"
+#include "common/dout.h"
+#include "common/WorkQueue.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Utils.h"
+#include "librbd/io/ObjectDispatch.h"
+#include "librbd/io/ObjectDispatchSpec.h"
+#include <boost/variant.hpp>
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::io::ObjectDispatcher: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace io {
+
+// Walks every registered dispatch layer in ascending layer order,
+// invoking execute() on each.  A layer that returns true (handled)
+// resumes iteration asynchronously by re-completing this context;
+// unhandled layers are skipped synchronously in the loop.
+template <typename I>
+struct ObjectDispatcher<I>::C_LayerIterator : public Context {
+  ObjectDispatcher* object_dispatcher;
+  Context* on_finish;
+
+  // layer most recently visited; iteration resumes after it
+  ObjectDispatchLayer object_dispatch_layer = OBJECT_DISPATCH_LAYER_NONE;
+
+  C_LayerIterator(ObjectDispatcher* object_dispatcher,
+                  Context* on_finish)
+    : object_dispatcher(object_dispatcher), on_finish(on_finish) {
+  }
+
+  void complete(int r) override {
+    while (true) {
+      object_dispatcher->m_lock.get_read();
+      auto it = object_dispatcher->m_object_dispatches.upper_bound(
+        object_dispatch_layer);
+      if (it == object_dispatcher->m_object_dispatches.end()) {
+        // all layers visited -- fall through to finish()
+        object_dispatcher->m_lock.put_read();
+        Context::complete(r);
+        return;
+      }
+
+      auto& object_dispatch_meta = it->second;
+      auto object_dispatch = object_dispatch_meta.object_dispatch;
+
+      // prevent recursive locking back into the dispatcher while handling IO
+      object_dispatch_meta.async_op_tracker->start_op();
+      object_dispatcher->m_lock.put_read();
+
+      // next loop should start after current layer
+      object_dispatch_layer = object_dispatch->get_object_dispatch_layer();
+
+      auto handled = execute(object_dispatch, this);
+      object_dispatch_meta.async_op_tracker->finish_op();
+
+      if (handled) {
+        break;
+      }
+    }
+  }
+
+  void finish(int r) override {
+    // NOTE(review): per-layer errors are discarded -- on_finish always
+    // observes success (0); confirm callers rely on this
+    on_finish->complete(0);
+  }
+
+  // returns true if the layer took ownership of on_finish (async resume)
+  virtual bool execute(ObjectDispatchInterface* object_dispatch,
+                       Context* on_finish) = 0;
+};
+
+// Layer iterator that invokes invalidate_cache() on every dispatch layer.
+template <typename I>
+struct ObjectDispatcher<I>::C_InvalidateCache : public C_LayerIterator {
+  C_InvalidateCache(ObjectDispatcher* object_dispatcher, Context* on_finish)
+    : C_LayerIterator(object_dispatcher, on_finish) {
+  }
+
+  bool execute(ObjectDispatchInterface* object_dispatch,
+               Context* on_finish) override {
+    return object_dispatch->invalidate_cache(on_finish);
+  }
+};
+
+// Layer iterator that invokes reset_existence_cache() on every layer.
+template <typename I>
+struct ObjectDispatcher<I>::C_ResetExistenceCache : public C_LayerIterator {
+  C_ResetExistenceCache(ObjectDispatcher* object_dispatcher, Context* on_finish)
+    : C_LayerIterator(object_dispatcher, on_finish) {
+  }
+
+  bool execute(ObjectDispatchInterface* object_dispatch,
+               Context* on_finish) override {
+    return object_dispatch->reset_existence_cache(on_finish);
+  }
+};
+
+// boost::variant visitor that translates an ObjectDispatchSpec's request
+// payload into the matching ObjectDispatchInterface method call.  Each
+// operator() returns true if the layer handled (claimed) the IO.
+template <typename I>
+struct ObjectDispatcher<I>::SendVisitor : public boost::static_visitor<bool> {
+  ObjectDispatchInterface* object_dispatch;
+  ObjectDispatchSpec* object_dispatch_spec;
+
+  SendVisitor(ObjectDispatchInterface* object_dispatch,
+              ObjectDispatchSpec* object_dispatch_spec)
+    : object_dispatch(object_dispatch),
+      object_dispatch_spec(object_dispatch_spec) {
+  }
+
+  bool operator()(ObjectDispatchSpec::ReadRequest& read) const {
+    return object_dispatch->read(
+      read.oid, read.object_no, read.object_off, read.object_len, read.snap_id,
+      object_dispatch_spec->op_flags, object_dispatch_spec->parent_trace,
+      read.read_data, read.extent_map,
+      &object_dispatch_spec->object_dispatch_flags,
+      &object_dispatch_spec->dispatch_result,
+      &object_dispatch_spec->dispatcher_ctx.on_finish,
+      &object_dispatch_spec->dispatcher_ctx);
+  }
+
+  bool operator()(ObjectDispatchSpec::DiscardRequest& discard) const {
+    return object_dispatch->discard(
+      discard.oid, discard.object_no, discard.object_off, discard.object_len,
+      discard.snapc, discard.discard_flags, object_dispatch_spec->parent_trace,
+      &object_dispatch_spec->object_dispatch_flags, &discard.journal_tid,
+      &object_dispatch_spec->dispatch_result,
+      &object_dispatch_spec->dispatcher_ctx.on_finish,
+      &object_dispatch_spec->dispatcher_ctx);
+  }
+
+  bool operator()(ObjectDispatchSpec::WriteRequest& write) const {
+    return object_dispatch->write(
+      write.oid, write.object_no, write.object_off, std::move(write.data),
+      write.snapc, object_dispatch_spec->op_flags,
+      object_dispatch_spec->parent_trace,
+      &object_dispatch_spec->object_dispatch_flags, &write.journal_tid,
+      &object_dispatch_spec->dispatch_result,
+      &object_dispatch_spec->dispatcher_ctx.on_finish,
+      &object_dispatch_spec->dispatcher_ctx);
+  }
+
+  bool operator()(ObjectDispatchSpec::WriteSameRequest& write_same) const {
+    return object_dispatch->write_same(
+      write_same.oid, write_same.object_no, write_same.object_off,
+      write_same.object_len, std::move(write_same.buffer_extents),
+      std::move(write_same.data), write_same.snapc,
+      object_dispatch_spec->op_flags, object_dispatch_spec->parent_trace,
+      &object_dispatch_spec->object_dispatch_flags, &write_same.journal_tid,
+      &object_dispatch_spec->dispatch_result,
+      &object_dispatch_spec->dispatcher_ctx.on_finish,
+      &object_dispatch_spec->dispatcher_ctx);
+  }
+
+  bool operator()(
+      ObjectDispatchSpec::CompareAndWriteRequest& compare_and_write) const {
+    return object_dispatch->compare_and_write(
+      compare_and_write.oid, compare_and_write.object_no,
+      compare_and_write.object_off, std::move(compare_and_write.cmp_data),
+      std::move(compare_and_write.data), compare_and_write.snapc,
+      object_dispatch_spec->op_flags, object_dispatch_spec->parent_trace,
+      compare_and_write.mismatch_offset,
+      &object_dispatch_spec->object_dispatch_flags,
+      &compare_and_write.journal_tid,
+      &object_dispatch_spec->dispatch_result,
+      &object_dispatch_spec->dispatcher_ctx.on_finish,
+      &object_dispatch_spec->dispatcher_ctx);
+  }
+
+  bool operator()(ObjectDispatchSpec::FlushRequest& flush) const {
+    return object_dispatch->flush(
+      flush.flush_source, object_dispatch_spec->parent_trace,
+      &object_dispatch_spec->dispatch_result,
+      &object_dispatch_spec->dispatcher_ctx.on_finish,
+      &object_dispatch_spec->dispatcher_ctx);
+  }
+};
+
+// Constructs the dispatcher and pre-registers the core ObjectDispatch
+// layer (the one that talks to RADOS) so the stack is never empty.
+template <typename I>
+ObjectDispatcher<I>::ObjectDispatcher(I* image_ctx)
+  : m_image_ctx(image_ctx),
+    m_lock(librbd::util::unique_lock_name("librbd::io::ObjectDispatcher::lock",
+                                          this)) {
+  // configure the core object dispatch handler on startup
+  auto object_dispatch = new ObjectDispatch(image_ctx);
+  m_object_dispatches[object_dispatch->get_object_dispatch_layer()] =
+    {object_dispatch, new AsyncOpTracker()};
+}
+
+// shut_down() must have emptied the layer map before destruction
+template <typename I>
+ObjectDispatcher<I>::~ObjectDispatcher() {
+  ceph_assert(m_object_dispatches.empty());
+}
+
+// Shut down all registered layers.  The per-layer shut-down steps are
+// chained onto on_finish (each call rewrites the pointer), so the final
+// complete(0) starts the whole asynchronous chain.
+template <typename I>
+void ObjectDispatcher<I>::shut_down(Context* on_finish) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 5) << dendl;
+
+  // detach the layer map under lock so no new IO can reach the layers
+  std::map<ObjectDispatchLayer, ObjectDispatchMeta> object_dispatches;
+  {
+    RWLock::WLocker locker(m_lock);
+    std::swap(object_dispatches, m_object_dispatches);
+  }
+
+  for (auto it : object_dispatches) {
+    shut_down_object_dispatch(it.second, &on_finish);
+  }
+  on_finish->complete(0);
+}
+
+// Register a new dispatch layer.  The dispatcher takes ownership (the
+// layer is deleted during shut_down).  At most one layer per
+// ObjectDispatchLayer slot is allowed.
+template <typename I>
+void ObjectDispatcher<I>::register_object_dispatch(
+    ObjectDispatchInterface* object_dispatch) {
+  auto cct = m_image_ctx->cct;
+  auto type = object_dispatch->get_object_dispatch_layer();
+  ldout(cct, 5) << "object_dispatch_layer=" << type << dendl;
+
+  RWLock::WLocker locker(m_lock);
+  ceph_assert(type < OBJECT_DISPATCH_LAYER_LAST);
+
+  auto result = m_object_dispatches.insert(
+    {type, {object_dispatch, new AsyncOpTracker()}});
+  ceph_assert(result.second);
+}
+
+// Remove and shut down a single dispatch layer.  The shut-down steps
+// are chained onto on_finish; complete(0) starts the chain.
+template <typename I>
+void ObjectDispatcher<I>::shut_down_object_dispatch(
+    ObjectDispatchLayer object_dispatch_layer, Context* on_finish) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 5) << "object_dispatch_layer=" << object_dispatch_layer << dendl;
+  // the core (last) layer can never be individually shut down
+  ceph_assert(object_dispatch_layer + 1 < OBJECT_DISPATCH_LAYER_LAST);
+
+  ObjectDispatchMeta object_dispatch_meta;
+  {
+    RWLock::WLocker locker(m_lock);
+    auto it = m_object_dispatches.find(object_dispatch_layer);
+    ceph_assert(it != m_object_dispatches.end());
+
+    object_dispatch_meta = it->second;
+    m_object_dispatches.erase(it);
+  }
+
+  shut_down_object_dispatch(object_dispatch_meta, &on_finish);
+  on_finish->complete(0);
+}
+
+// Build the asynchronous shut-down chain for one layer.  The contexts
+// are constructed in reverse so execution order is:
+//   1) wait for the layer's in-flight ops to drain,
+//   2) invoke the layer's shut_down(),
+//   3) delete the layer and its op tracker,
+//   4) invoke the caller's original *on_finish.
+// *on_finish is replaced with the head of this chain.
+template <typename I>
+void ObjectDispatcher<I>::shut_down_object_dispatch(
+    ObjectDispatchMeta& object_dispatch_meta, Context** on_finish) {
+  auto object_dispatch = object_dispatch_meta.object_dispatch;
+  auto async_op_tracker = object_dispatch_meta.async_op_tracker;
+
+  Context* ctx = *on_finish;
+  ctx = new FunctionContext(
+    [object_dispatch, async_op_tracker, ctx](int r) {
+      delete object_dispatch;
+      delete async_op_tracker;
+
+      ctx->complete(r);
+    });
+  ctx = new FunctionContext([object_dispatch, ctx](int r) {
+      object_dispatch->shut_down(ctx);
+    });
+  *on_finish = new FunctionContext([async_op_tracker, ctx](int r) {
+      async_op_tracker->wait_for_ops(ctx);
+    });
+}
+
+// Ask every layer to invalidate any cached object data; on_finish fires
+// (asynchronously, via the work queue) after all layers have been visited.
+template <typename I>
+void ObjectDispatcher<I>::invalidate_cache(Context* on_finish) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 5) << dendl;
+
+  on_finish = util::create_async_context_callback(*m_image_ctx, on_finish);
+  auto ctx = new C_InvalidateCache(this, on_finish);
+  ctx->complete(0);
+}
+
+// Ask every layer to drop its object-existence state; completion is
+// delivered asynchronously once all layers have been visited.
+template <typename I>
+void ObjectDispatcher<I>::reset_existence_cache(Context* on_finish) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 5) << dendl;
+
+  on_finish = util::create_async_context_callback(*m_image_ctx, on_finish);
+  auto ctx = new C_ResetExistenceCache(this, on_finish);
+  ctx->complete(0);
+}
+
+// Notify every layer that an object extent was overwritten (used for
+// journal transaction bookkeeping).
+// NOTE(review): m_object_dispatches is iterated without holding m_lock
+// here -- presumably callers guarantee no concurrent register/shutdown;
+// confirm.
+template <typename I>
+void ObjectDispatcher<I>::extent_overwritten(
+    uint64_t object_no, uint64_t object_off, uint64_t object_len,
+    uint64_t journal_tid, uint64_t new_journal_tid) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << object_no << " " << object_off << "~" << object_len
+                 << dendl;
+
+  for (auto it : m_object_dispatches) {
+    auto& object_dispatch_meta = it.second;
+    auto object_dispatch = object_dispatch_meta.object_dispatch;
+    object_dispatch->extent_overwritten(object_no, object_off, object_len,
+                                        journal_tid, new_journal_tid);
+  }
+}
+
+// Dispatch an IO spec through the layer stack, starting after the
+// spec's current object_dispatch_layer.  A layer that claims the IO
+// (visitor returns true) resumes the spec later via its dispatcher_ctx;
+// unhandled layers are skipped synchronously.
+template <typename I>
+void ObjectDispatcher<I>::send(ObjectDispatchSpec* object_dispatch_spec) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "object_dispatch_spec=" << object_dispatch_spec << dendl;
+
+  auto object_dispatch_layer = object_dispatch_spec->object_dispatch_layer;
+  ceph_assert(object_dispatch_layer + 1 < OBJECT_DISPATCH_LAYER_LAST);
+
+  // apply the IO request to all layers -- this method will be re-invoked
+  // by the dispatch layer if continuing / restarting the IO
+  while (true) {
+    m_lock.get_read();
+    object_dispatch_layer = object_dispatch_spec->object_dispatch_layer;
+    auto it = m_object_dispatches.upper_bound(object_dispatch_layer);
+    if (it == m_object_dispatches.end()) {
+      // the request is complete if handled by all layers
+      object_dispatch_spec->dispatch_result = DISPATCH_RESULT_COMPLETE;
+      m_lock.put_read();
+      break;
+    }
+
+    auto& object_dispatch_meta = it->second;
+    auto object_dispatch = object_dispatch_meta.object_dispatch;
+    object_dispatch_spec->dispatch_result = DISPATCH_RESULT_INVALID;
+
+    // prevent recursive locking back into the dispatcher while handling IO
+    object_dispatch_meta.async_op_tracker->start_op();
+    m_lock.put_read();
+
+    // advance to next layer in case we skip or continue
+    object_dispatch_spec->object_dispatch_layer =
+      object_dispatch->get_object_dispatch_layer();
+
+    bool handled = boost::apply_visitor(
+      SendVisitor{object_dispatch, object_dispatch_spec},
+      object_dispatch_spec->request);
+    object_dispatch_meta.async_op_tracker->finish_op();
+
+    // handled ops will resume when the dispatch ctx is invoked
+    if (handled) {
+      return;
+    }
+  }
+
+  // skipped through to the last layer
+  object_dispatch_spec->dispatcher_ctx.complete(0);
+}
+
+} // namespace io
+} // namespace librbd
+
+template class librbd::io::ObjectDispatcher<librbd::ImageCtx>;
diff --git a/src/librbd/io/ObjectDispatcher.h b/src/librbd/io/ObjectDispatcher.h
new file mode 100644
index 00000000..0370d268
--- /dev/null
+++ b/src/librbd/io/ObjectDispatcher.h
@@ -0,0 +1,89 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_OBJECT_DISPATCHER_H
+#define CEPH_LIBRBD_IO_OBJECT_DISPATCHER_H
+
+#include "include/int_types.h"
+#include "common/RWLock.h"
+#include "librbd/io/Types.h"
+#include <map>
+
+struct AsyncOpTracker;
+struct Context;
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace io {
+
+struct ObjectDispatchInterface;
+struct ObjectDispatchSpec;
+
+// Minimal interface the ObjectDispatchSpec needs: only the ability to
+// (re-)send a spec through the dispatcher.  Kept private + friended so
+// that only specs can trigger dispatch.
+struct ObjectDispatcherInterface {
+public:
+  virtual ~ObjectDispatcherInterface() {
+  }
+
+private:
+  friend class ObjectDispatchSpec;
+
+  virtual void send(ObjectDispatchSpec* object_dispatch_spec) = 0;
+};
+
+// Maintains the ordered stack of object dispatch layers for one image
+// and routes ObjectDispatchSpec IO requests through them (see the .cc
+// send() loop).  Owns the registered layers and their op trackers.
+template <typename ImageCtxT = ImageCtx>
+class ObjectDispatcher : public ObjectDispatcherInterface {
+public:
+  ObjectDispatcher(ImageCtxT* image_ctx);
+  ~ObjectDispatcher();
+
+  void shut_down(Context* on_finish);
+
+  void register_object_dispatch(ObjectDispatchInterface* object_dispatch);
+  void shut_down_object_dispatch(ObjectDispatchLayer object_dispatch_layer,
+                                 Context* on_finish);
+
+  void invalidate_cache(Context* on_finish);
+  void reset_existence_cache(Context* on_finish);
+
+  void extent_overwritten(
+      uint64_t object_no, uint64_t object_off, uint64_t object_len,
+      uint64_t journal_tid, uint64_t new_journal_tid);
+
+private:
+  // a registered layer plus the tracker guarding its in-flight ops
+  struct ObjectDispatchMeta {
+    ObjectDispatchInterface* object_dispatch = nullptr;
+    AsyncOpTracker* async_op_tracker = nullptr;
+
+    ObjectDispatchMeta() {
+    }
+    ObjectDispatchMeta(ObjectDispatchInterface* object_dispatch,
+                       AsyncOpTracker* async_op_tracker)
+      : object_dispatch(object_dispatch), async_op_tracker(async_op_tracker) {
+    }
+  };
+
+  struct C_LayerIterator;
+  struct C_InvalidateCache;
+  struct C_ResetExistenceCache;
+  struct SendVisitor;
+
+  ImageCtxT* m_image_ctx;
+
+  // protects m_object_dispatches
+  RWLock m_lock;
+  // layers keyed (and dispatched) in ascending ObjectDispatchLayer order
+  std::map<ObjectDispatchLayer, ObjectDispatchMeta> m_object_dispatches;
+
+  void send(ObjectDispatchSpec* object_dispatch_spec);
+
+  void shut_down_object_dispatch(ObjectDispatchMeta& object_dispatch_meta,
+                                 Context** on_finish);
+
+};
+
+} // namespace io
+} // namespace librbd
+
+extern template class librbd::io::ObjectDispatcher<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_IO_OBJECT_DISPATCHER_H
diff --git a/src/librbd/io/ObjectRequest.cc b/src/librbd/io/ObjectRequest.cc
new file mode 100644
index 00000000..60f53df1
--- /dev/null
+++ b/src/librbd/io/ObjectRequest.cc
@@ -0,0 +1,729 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/ObjectRequest.h"
+#include "common/ceph_context.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "common/Mutex.h"
+#include "common/RWLock.h"
+#include "common/WorkQueue.h"
+#include "include/Context.h"
+#include "include/err.h"
+#include "osd/osd_types.h"
+
+#include "librbd/ExclusiveLock.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ObjectMap.h"
+#include "librbd/Utils.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/io/CopyupRequest.h"
+#include "librbd/io/ImageRequest.h"
+#include "librbd/io/ReadResult.h"
+
+#include <boost/bind.hpp>
+#include <boost/optional.hpp>
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::io::ObjectRequest: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace io {
+
+namespace {
+
+// True when a parent read should also copy the data up into the clone:
+// requires the copy-on-read feature, a writable head image, and (if an
+// exclusive lock exists) local lock ownership.
+template <typename I>
+inline bool is_copy_on_read(I *ictx, librados::snap_t snap_id) {
+  RWLock::RLocker snap_locker(ictx->snap_lock);
+  return (ictx->clone_copy_on_read &&
+          !ictx->read_only && snap_id == CEPH_NOSNAP &&
+          (ictx->exclusive_lock == nullptr ||
+           ictx->exclusive_lock->is_lock_owner()));
+}
+
+} // anonymous namespace
+
+// Factory: allocate an object write request (caller owns until send()).
+template <typename I>
+ObjectRequest<I>*
+ObjectRequest<I>::create_write(I *ictx, const std::string &oid,
+                               uint64_t object_no, uint64_t object_off,
+                               ceph::bufferlist&& data,
+                               const ::SnapContext &snapc, int op_flags,
+                               const ZTracer::Trace &parent_trace,
+                               Context *completion) {
+  return new ObjectWriteRequest<I>(ictx, oid, object_no, object_off,
+                                   std::move(data), snapc, op_flags,
+                                   parent_trace, completion);
+}
+
+// Factory: allocate an object discard request.
+template <typename I>
+ObjectRequest<I>*
+ObjectRequest<I>::create_discard(I *ictx, const std::string &oid,
+                                 uint64_t object_no, uint64_t object_off,
+                                 uint64_t object_len,
+                                 const ::SnapContext &snapc,
+                                 int discard_flags,
+                                 const ZTracer::Trace &parent_trace,
+                                 Context *completion) {
+  return new ObjectDiscardRequest<I>(ictx, oid, object_no, object_off,
+                                     object_len, snapc, discard_flags,
+                                     parent_trace, completion);
+}
+
+// Factory: allocate a write-same request (replicates `data` across
+// object_len bytes).
+template <typename I>
+ObjectRequest<I>*
+ObjectRequest<I>::create_write_same(I *ictx, const std::string &oid,
+                                    uint64_t object_no, uint64_t object_off,
+                                    uint64_t object_len,
+                                    ceph::bufferlist&& data,
+                                    const ::SnapContext &snapc, int op_flags,
+                                    const ZTracer::Trace &parent_trace,
+                                    Context *completion) {
+  return new ObjectWriteSameRequest<I>(ictx, oid, object_no, object_off,
+                                       object_len, std::move(data), snapc,
+                                       op_flags, parent_trace, completion);
+}
+
+// Factory: allocate a compare-and-write request; on compare failure the
+// first mismatching offset is reported through *mismatch_offset.
+template <typename I>
+ObjectRequest<I>*
+ObjectRequest<I>::create_compare_and_write(I *ictx, const std::string &oid,
+                                           uint64_t object_no,
+                                           uint64_t object_off,
+                                           ceph::bufferlist&& cmp_data,
+                                           ceph::bufferlist&& write_data,
+                                           const ::SnapContext &snapc,
+                                           uint64_t *mismatch_offset,
+                                           int op_flags,
+                                           const ZTracer::Trace &parent_trace,
+                                           Context *completion) {
+  return new ObjectCompareAndWriteRequest<I>(ictx, oid, object_no, object_off,
+                                             std::move(cmp_data),
+                                             std::move(write_data), snapc,
+                                             mismatch_offset, op_flags,
+                                             parent_trace, completion);
+}
+
+// Base-class constructor: records the target object coordinates and
+// starts a named trace span when tracing is active.
+template <typename I>
+ObjectRequest<I>::ObjectRequest(I *ictx, const std::string &oid,
+                                uint64_t objectno, uint64_t off,
+                                uint64_t len, librados::snap_t snap_id,
+                                const char *trace_name,
+                                const ZTracer::Trace &trace,
+                                Context *completion)
+  : m_ictx(ictx), m_oid(oid), m_object_no(objectno), m_object_off(off),
+    m_object_len(len), m_snap_id(snap_id), m_completion(completion),
+    m_trace(util::create_trace(*ictx, "", trace)) {
+  ceph_assert(m_ictx->data_ctx.is_valid());
+  if (m_trace.valid()) {
+    m_trace.copy_name(trace_name + std::string(" ") + oid);
+    m_trace.event("start");
+  }
+}
+
+// Attach an allocation hint to the write op: expected object size when
+// the feature is enabled, otherwise just the raw hint flags (if any).
+template <typename I>
+void ObjectRequest<I>::add_write_hint(I& image_ctx,
+                                      librados::ObjectWriteOperation *wr) {
+  if (image_ctx.enable_alloc_hint) {
+    wr->set_alloc_hint2(image_ctx.get_object_size(),
+                        image_ctx.get_object_size(),
+                        image_ctx.alloc_hint_flags);
+  } else if (image_ctx.alloc_hint_flags != 0U) {
+    wr->set_alloc_hint2(0, 0, image_ctx.alloc_hint_flags);
+  }
+}
+
+// Compute the portion of this object that overlaps the parent image.
+// Returns true (and fills *parent_extents, sets m_has_parent) if there
+// is a non-zero overlap; false otherwise.  Caller must hold snap_lock
+// and parent_lock.
+template <typename I>
+bool ObjectRequest<I>::compute_parent_extents(Extents *parent_extents,
+                                              bool read_request) {
+  ceph_assert(m_ictx->snap_lock.is_locked());
+  ceph_assert(m_ictx->parent_lock.is_locked());
+
+  m_has_parent = false;
+  parent_extents->clear();
+
+  uint64_t parent_overlap;
+  int r = m_ictx->get_parent_overlap(m_snap_id, &parent_overlap);
+  if (r < 0) {
+    // NOTE: it's possible for a snapshot to be deleted while we are
+    // still reading from it
+    lderr(m_ictx->cct) << "failed to retrieve parent overlap: "
+                       << cpp_strerror(r) << dendl;
+    return false;
+  }
+
+  // during live migration writes must consider the migration overlap
+  if (!read_request && !m_ictx->migration_info.empty()) {
+    parent_overlap = m_ictx->migration_info.overlap;
+  }
+
+  if (parent_overlap == 0) {
+    return false;
+  }
+
+  // map the whole object back into image extents, then clip to overlap
+  Striper::extent_to_file(m_ictx->cct, &m_ictx->layout, m_object_no, 0,
+                          m_ictx->layout.object_size, *parent_extents);
+  uint64_t object_overlap = m_ictx->prune_parent_extents(*parent_extents,
+                                                         parent_overlap);
+  if (object_overlap > 0) {
+    ldout(m_ictx->cct, 20) << "overlap " << parent_overlap << " "
+                           << "extents " << *parent_extents << dendl;
+    m_has_parent = !parent_extents->empty();
+    return true;
+  }
+  return false;
+}
+
+// Complete the request on the op work queue (avoids invoking the
+// completion from the current call stack); finish() deletes this.
+template <typename I>
+void ObjectRequest<I>::async_finish(int r) {
+  ldout(m_ictx->cct, 20) << "r=" << r << dendl;
+  m_ictx->op_work_queue->queue(util::create_context_callback<
+    ObjectRequest<I>, &ObjectRequest<I>::finish>(this), r);
+}
+
+// Fire the caller's completion and destroy this self-managed request.
+template <typename I>
+void ObjectRequest<I>::finish(int r) {
+  ldout(m_ictx->cct, 20) << "r=" << r << dendl;
+  m_completion->complete(r);
+  delete this;
+}
+
+/** read **/
+
+// Read-request constructor: results are written into the caller-owned
+// *read_data and *extent_map buffers.
+template <typename I>
+ObjectReadRequest<I>::ObjectReadRequest(I *ictx, const std::string &oid,
+                                        uint64_t objectno, uint64_t offset,
+                                        uint64_t len, librados::snap_t snap_id,
+                                        int op_flags,
+                                        const ZTracer::Trace &parent_trace,
+                                        bufferlist* read_data,
+                                        ExtentMap* extent_map,
+                                        Context *completion)
+  : ObjectRequest<I>(ictx, oid, objectno, offset, len, snap_id, "read",
+                     parent_trace, completion),
+    m_op_flags(op_flags), m_read_data(read_data), m_extent_map(extent_map) {
+}
+
+// Entry point: begin by reading the object itself; fallbacks to the
+// parent image are handled in the continuation chain.
+template <typename I>
+void ObjectReadRequest<I>::send() {
+  I *image_ctx = this->m_ictx;
+  ldout(image_ctx->cct, 20) << dendl;
+
+  read_object();
+}
+
+// Issue the RADOS read.  If the object map says the object cannot
+// exist, skip straight to the parent (queued to avoid recursing with
+// snap_lock held).  Uses sparse-read above the configured threshold.
+template <typename I>
+void ObjectReadRequest<I>::read_object() {
+  I *image_ctx = this->m_ictx;
+  {
+    RWLock::RLocker snap_locker(image_ctx->snap_lock);
+    if (image_ctx->object_map != nullptr &&
+        !image_ctx->object_map->object_may_exist(this->m_object_no)) {
+      image_ctx->op_work_queue->queue(new FunctionContext([this](int r) {
+        read_parent();
+      }), 0);
+      return;
+    }
+  }
+
+  ldout(image_ctx->cct, 20) << dendl;
+
+  librados::ObjectReadOperation op;
+  if (this->m_object_len >= image_ctx->sparse_read_threshold_bytes) {
+    op.sparse_read(this->m_object_off, this->m_object_len, m_extent_map,
+                   m_read_data, nullptr);
+  } else {
+    op.read(this->m_object_off, this->m_object_len, m_read_data, nullptr);
+  }
+  op.set_op_flags2(m_op_flags);
+
+  librados::AioCompletion *rados_completion = util::create_rados_callback<
+    ObjectReadRequest<I>, &ObjectReadRequest<I>::handle_read_object>(this);
+  int flags = image_ctx->get_read_flags(this->m_snap_id);
+  int r = image_ctx->data_ctx.aio_operate(
+    this->m_oid, rados_completion, &op, flags, nullptr,
+    (this->m_trace.valid() ? this->m_trace.get_info() : nullptr));
+  ceph_assert(r == 0);
+
+  rados_completion->release();
+}
+
+// RADOS read callback: -ENOENT means the object doesn't exist yet, so
+// fall back to reading from the parent image; other errors finish the
+// request.
+template <typename I>
+void ObjectReadRequest<I>::handle_read_object(int r) {
+  I *image_ctx = this->m_ictx;
+  ldout(image_ctx->cct, 20) << "r=" << r << dendl;
+
+  if (r == -ENOENT) {
+    read_parent();
+    return;
+  } else if (r < 0) {
+    lderr(image_ctx->cct) << "failed to read from object: "
+                          << cpp_strerror(r) << dendl;
+    this->finish(r);
+    return;
+  }
+
+  this->finish(0);
+}
+
+// Read the requested extent from the parent image instead.  If this
+// object has no overlap with the parent, the read resolves to -ENOENT
+// (object genuinely absent).
+template <typename I>
+void ObjectReadRequest<I>::read_parent() {
+  I *image_ctx = this->m_ictx;
+
+  RWLock::RLocker snap_locker(image_ctx->snap_lock);
+  RWLock::RLocker parent_locker(image_ctx->parent_lock);
+
+  // calculate reverse mapping onto the image
+  Extents parent_extents;
+  Striper::extent_to_file(image_ctx->cct, &image_ctx->layout,
+                          this->m_object_no, this->m_object_off,
+                          this->m_object_len, parent_extents);
+
+  uint64_t parent_overlap = 0;
+  uint64_t object_overlap = 0;
+  int r = image_ctx->get_parent_overlap(this->m_snap_id, &parent_overlap);
+  if (r == 0) {
+    object_overlap = image_ctx->prune_parent_extents(parent_extents,
+                                                     parent_overlap);
+  }
+
+  if (object_overlap == 0) {
+    parent_locker.unlock();
+    snap_locker.unlock();
+
+    this->finish(-ENOENT);
+    return;
+  }
+
+  ldout(image_ctx->cct, 20) << dendl;
+
+  // issue an image-level read against the parent image context
+  auto parent_completion = AioCompletion::create_and_start<
+    ObjectReadRequest<I>, &ObjectReadRequest<I>::handle_read_parent>(
+      this, util::get_image_ctx(image_ctx->parent), AIO_TYPE_READ);
+  ImageRequest<I>::aio_read(image_ctx->parent, parent_completion,
+                            std::move(parent_extents), ReadResult{m_read_data},
+                            0, this->m_trace);
+}
+
+// Completion handler for the parent image read: on success the data is in
+// m_read_data and an optional copy-on-read is attempted; -ENOENT and other
+// errors are propagated (only non-ENOENT errors are logged).
+template <typename I>
+void ObjectReadRequest<I>::handle_read_parent(int r) {
+  I *image_ctx = this->m_ictx;
+  ldout(image_ctx->cct, 20) << "r=" << r << dendl;
+
+  if (r >= 0) {
+    copyup();
+    return;
+  }
+
+  if (r != -ENOENT) {
+    lderr(image_ctx->cct) << "failed to read parent extents: "
+                          << cpp_strerror(r) << dendl;
+  }
+  this->finish(r);
+}
+
+// Copy-on-read: after servicing a read from the parent, optionally kick
+// off an asynchronous CopyupRequest so the data becomes local to the
+// child. The read itself always completes successfully here.
+template <typename I>
+void ObjectReadRequest<I>::copyup() {
+  I *image_ctx = this->m_ictx;
+  if (!is_copy_on_read(image_ctx, this->m_snap_id)) {
+    this->finish(0);
+    return;
+  }
+
+  // lock order: owner_lock -> snap_lock -> parent_lock; released in
+  // strict reverse order below
+  image_ctx->owner_lock.get_read();
+  image_ctx->snap_lock.get_read();
+  image_ctx->parent_lock.get_read();
+  Extents parent_extents;
+  if (!this->compute_parent_extents(&parent_extents, true) ||
+      (image_ctx->exclusive_lock != nullptr &&
+       !image_ctx->exclusive_lock->is_lock_owner())) {
+    // no parent overlap, or we don't own the exclusive lock -- skip copyup
+    image_ctx->parent_lock.put_read();
+    image_ctx->snap_lock.put_read();
+    image_ctx->owner_lock.put_read();
+    this->finish(0);
+    return;
+  }
+
+  ldout(image_ctx->cct, 20) << dendl;
+
+  // copyup_list de-duplicates concurrent copyups for the same object
+  image_ctx->copyup_list_lock.Lock();
+  auto it = image_ctx->copyup_list.find(this->m_object_no);
+  if (it == image_ctx->copyup_list.end()) {
+    // create and kick off a CopyupRequest
+    auto new_req = CopyupRequest<I>::create(
+      image_ctx, this->m_oid, this->m_object_no, std::move(parent_extents),
+      this->m_trace);
+
+    image_ctx->copyup_list[this->m_object_no] = new_req;
+    image_ctx->copyup_list_lock.Unlock();
+    image_ctx->parent_lock.put_read();
+    image_ctx->snap_lock.put_read();
+    new_req->send();
+  } else {
+    // a copyup for this object is already in flight -- nothing to do
+    image_ctx->copyup_list_lock.Unlock();
+    image_ctx->parent_lock.put_read();
+    image_ctx->snap_lock.put_read();
+  }
+
+  // the read completes now; the copyup proceeds asynchronously
+  image_ctx->owner_lock.put_read();
+  this->finish(0);
+}
+
+/** write **/
+
+// Base constructor for all object-level write-like requests (write,
+// discard, writesame, compare-and-write). Captures the snapshot context
+// and precomputes whether a copyup guard will be needed.
+template <typename I>
+AbstractObjectWriteRequest<I>::AbstractObjectWriteRequest(
+    I *ictx, const std::string &oid, uint64_t object_no, uint64_t object_off,
+    uint64_t len, const ::SnapContext &snapc, const char *trace_name,
+    const ZTracer::Trace &parent_trace, Context *completion)
+  : ObjectRequest<I>(ictx, oid, object_no, object_off, len, CEPH_NOSNAP,
+                     trace_name, parent_trace, completion),
+    m_snap_seq(snapc.seq.val)
+{
+  // snapshot ids the write will be issued against
+  m_snaps.insert(m_snaps.end(), snapc.snaps.begin(), snapc.snaps.end());
+
+  if (this->m_object_off == 0 &&
+      this->m_object_len == ictx->get_object_size()) {
+    // whole-object write: may allow skipping copyup (see compute_parent_info)
+    m_full_object = true;
+  }
+
+  compute_parent_info();
+
+  ictx->snap_lock.get_read();
+  if (!ictx->migration_info.empty()) {
+    // image is mid-migration: writes must be guarded so unmigrated data
+    // gets copied up first (see write_object / handle_write_object)
+    m_guarding_migration_write = true;
+  }
+  ictx->snap_lock.put_read();
+}
+
+// Compute the parent extents backing this object and decide whether a
+// copyup guard is required for the write.
+template <typename I>
+void AbstractObjectWriteRequest<I>::compute_parent_info() {
+  I *image_ctx = this->m_ictx;
+  RWLock::RLocker snap_locker(image_ctx->snap_lock);
+  RWLock::RLocker parent_locker(image_ctx->parent_lock);
+
+  this->compute_parent_extents(&m_parent_extents, false);
+
+  // copyup is unnecessary when there is no parent, or when a full-object,
+  // snapshot-free write would immediately overwrite the copied-up data
+  if (!this->has_parent() ||
+      (m_full_object && m_snaps.empty() && !is_post_copyup_write_required())) {
+    m_copyup_enabled = false;
+  }
+}
+
+// Attach an allocation hint to the write op unless the object map says the
+// object already exists (the hint only helps for newly created objects).
+template <typename I>
+void AbstractObjectWriteRequest<I>::add_write_hint(
+    librados::ObjectWriteOperation *wr) {
+  I *image_ctx = this->m_ictx;
+  RWLock::RLocker snap_locker(image_ctx->snap_lock);
+  bool may_exist = (image_ctx->object_map != nullptr &&
+                    this->m_object_may_exist);
+  if (!may_exist) {
+    ObjectRequest<I>::add_write_hint(*image_ctx, wr);
+  }
+}
+
+// Entry point for the write state machine (see diagram in the header):
+// consult the object map, short-circuit no-ops on nonexistent objects,
+// then proceed to the pre-write object map update.
+template <typename I>
+void AbstractObjectWriteRequest<I>::send() {
+  I *image_ctx = this->m_ictx;
+  ldout(image_ctx->cct, 20) << this->get_op_type() << " " << this->m_oid << " "
+                            << this->m_object_off << "~" << this->m_object_len
+                            << dendl;
+  {
+    RWLock::RLocker snap_lock(image_ctx->snap_lock);
+    if (image_ctx->object_map == nullptr) {
+      // no object map -- must assume the object might exist
+      m_object_may_exist = true;
+    } else {
+      // should have been flushed prior to releasing lock
+      ceph_assert(image_ctx->exclusive_lock->is_lock_owner());
+      m_object_may_exist = image_ctx->object_map->object_may_exist(
+        this->m_object_no);
+    }
+  }
+
+  // e.g. discarding a nonexistent object with no parent is a no-op
+  if (!m_object_may_exist && is_no_op_for_nonexistent_object()) {
+    ldout(image_ctx->cct, 20) << "skipping no-op on nonexistent object"
+                              << dendl;
+    this->async_finish(0);
+    return;
+  }
+
+  pre_write_object_map_update();
+}
+
+// Flag the object as EXISTS (or PENDING for removals) in the object map
+// before issuing the write; skipped when the object map is disabled, and
+// short-circuited into a copyup when the object is known not to exist.
+template <typename I>
+void AbstractObjectWriteRequest<I>::pre_write_object_map_update() {
+  I *image_ctx = this->m_ictx;
+
+  // snap_lock is held across aio_update and released on every exit path
+  image_ctx->snap_lock.get_read();
+  if (image_ctx->object_map == nullptr || !is_object_map_update_enabled()) {
+    image_ctx->snap_lock.put_read();
+    write_object();
+    return;
+  }
+
+  if (!m_object_may_exist && m_copyup_enabled) {
+    // optimization: copyup required
+    image_ctx->snap_lock.put_read();
+    copyup();
+    return;
+  }
+
+  uint8_t new_state = this->get_pre_write_object_map_state();
+  ldout(image_ctx->cct, 20) << this->m_oid << " " << this->m_object_off
+                            << "~" << this->m_object_len << dendl;
+
+  image_ctx->object_map_lock.get_write();
+  // aio_update returns true if an on-disk update was queued; the state
+  // machine then resumes in handle_pre_write_object_map_update
+  if (image_ctx->object_map->template aio_update<
+        AbstractObjectWriteRequest<I>,
+        &AbstractObjectWriteRequest<I>::handle_pre_write_object_map_update>(
+          CEPH_NOSNAP, this->m_object_no, new_state, {}, this->m_trace, false,
+          this)) {
+    image_ctx->object_map_lock.put_write();
+    image_ctx->snap_lock.put_read();
+    return;
+  }
+
+  // object map already in the desired state -- proceed synchronously
+  image_ctx->object_map_lock.put_write();
+  image_ctx->snap_lock.put_read();
+  write_object();
+}
+
+// Completion handler for the pre-write object map update: proceed to the
+// actual write on success, otherwise fail the request.
+template <typename I>
+void AbstractObjectWriteRequest<I>::handle_pre_write_object_map_update(int r) {
+  I *image_ctx = this->m_ictx;
+  ldout(image_ctx->cct, 20) << "r=" << r << dendl;
+  if (r >= 0) {
+    write_object();
+    return;
+  }
+
+  lderr(image_ctx->cct) << "failed to update object map: "
+                        << cpp_strerror(r) << dendl;
+  this->finish(r);
+}
+
+// Build and dispatch the librados write operation. When copyup is still
+// possible, the op is guarded so a missing child object (or stale
+// migration snapshot context) diverts the flow into a copyup.
+template <typename I>
+void AbstractObjectWriteRequest<I>::write_object() {
+  I *image_ctx = this->m_ictx;
+  ldout(image_ctx->cct, 20) << dendl;
+
+  librados::ObjectWriteOperation write;
+  if (m_copyup_enabled) {
+    ldout(image_ctx->cct, 20) << "guarding write" << dendl;
+    if (m_guarding_migration_write) {
+      // fails with -ERANGE if the object's snapset seq is newer (see
+      // handle_write_object for the retry logic)
+      cls_client::assert_snapc_seq(
+        &write, m_snap_seq, cls::rbd::ASSERT_SNAPC_SEQ_LE_SNAPSET_SEQ);
+    } else {
+      // fails with -ENOENT if the child object does not exist yet
+      write.assert_exists();
+    }
+  }
+
+  add_write_hint(&write);
+  add_write_ops(&write);
+  ceph_assert(write.size() != 0);
+
+  librados::AioCompletion *rados_completion = util::create_rados_callback<
+    AbstractObjectWriteRequest<I>,
+    &AbstractObjectWriteRequest<I>::handle_write_object>(this);
+  int r = image_ctx->data_ctx.aio_operate(
+    this->m_oid, rados_completion, &write, m_snap_seq, m_snaps,
+    (this->m_trace.valid() ? this->m_trace.get_info() : nullptr));
+  ceph_assert(r == 0);
+  rados_completion->release();
+}
+
+// Completion handler for the guarded write. -ENOENT from the guard
+// triggers a copyup; -ERANGE during migration either retries the copyup
+// or restarts the write once the migration parent has disappeared.
+template <typename I>
+void AbstractObjectWriteRequest<I>::handle_write_object(int r) {
+  I *image_ctx = this->m_ictx;
+  ldout(image_ctx->cct, 20) << "r=" << r << dendl;
+
+  // subclasses may translate error codes (e.g. compare-and-write mismatch)
+  r = filter_write_result(r);
+  if (r == -ENOENT) {
+    if (m_copyup_enabled) {
+      copyup();
+      return;
+    }
+  } else if (r == -ERANGE && m_guarding_migration_write) {
+    // re-check whether the migration is still in progress
+    image_ctx->snap_lock.get_read();
+    m_guarding_migration_write = !image_ctx->migration_info.empty();
+    image_ctx->snap_lock.put_read();
+
+    if (m_guarding_migration_write) {
+      copyup();
+    } else {
+      ldout(image_ctx->cct, 10) << "migration parent gone, restart io" << dendl;
+      compute_parent_info();
+      write_object();
+    }
+    return;
+  } else if (r == -EILSEQ) {
+    // compare-and-write data mismatch (mapped from -MAX_ERRNO encoding)
+    ldout(image_ctx->cct, 10) << "failed to write object" << dendl;
+    this->finish(r);
+    return;
+  } else if (r < 0) {
+    lderr(image_ctx->cct) << "failed to write object: " << cpp_strerror(r)
+                          << dendl;
+    this->finish(r);
+    return;
+  }
+
+  post_write_object_map_update();
+}
+
+// Copy the parent-backed data into the child object, then replay this
+// write. Concurrent copyups of the same object are de-duplicated via the
+// image's copyup_list; this request is appended so handle_copyup fires
+// once the shared CopyupRequest completes.
+template <typename I>
+void AbstractObjectWriteRequest<I>::copyup() {
+  I *image_ctx = this->m_ictx;
+  ldout(image_ctx->cct, 20) << dendl;
+
+  ceph_assert(!m_copyup_in_progress);
+  m_copyup_in_progress = true;
+
+  image_ctx->copyup_list_lock.Lock();
+  auto it = image_ctx->copyup_list.find(this->m_object_no);
+  if (it == image_ctx->copyup_list.end()) {
+    auto new_req = CopyupRequest<I>::create(
+      image_ctx, this->m_oid, this->m_object_no,
+      std::move(this->m_parent_extents), this->m_trace);
+    // extents were moved into the CopyupRequest; leave none behind
+    this->m_parent_extents.clear();
+
+    // make sure to wait on this CopyupRequest
+    new_req->append_request(this);
+    image_ctx->copyup_list[this->m_object_no] = new_req;
+
+    image_ctx->copyup_list_lock.Unlock();
+    new_req->send();
+  } else {
+    // piggy-back on the in-flight copyup for this object
+    it->second->append_request(this);
+    image_ctx->copyup_list_lock.Unlock();
+  }
+}
+
+// Completion handler for the shared CopyupRequest. -ERESTART means the
+// copyup raced and the guarded write must simply be re-issued; other
+// errors fail the request.
+template <typename I>
+void AbstractObjectWriteRequest<I>::handle_copyup(int r) {
+  I *image_ctx = this->m_ictx;
+  ldout(image_ctx->cct, 20) << "r=" << r << dendl;
+
+  ceph_assert(m_copyup_in_progress);
+  m_copyup_in_progress = false;
+
+  bool restart = (r == -ERESTART);
+  if (r < 0 && !restart) {
+    lderr(image_ctx->cct) << "failed to copyup object: " << cpp_strerror(r)
+                          << dendl;
+    this->finish(r);
+    return;
+  }
+
+  if (restart || is_post_copyup_write_required()) {
+    write_object();
+  } else {
+    post_write_object_map_update();
+  }
+}
+
+// After a removal-style write, transition the object map entry from
+// PENDING to NONEXISTENT; skipped entirely for writes that leave the
+// object in existence or when object map updates are disabled.
+template <typename I>
+void AbstractObjectWriteRequest<I>::post_write_object_map_update() {
+  I *image_ctx = this->m_ictx;
+
+  // snap_lock is held across aio_update and released on every exit path
+  image_ctx->snap_lock.get_read();
+  if (image_ctx->object_map == nullptr || !is_object_map_update_enabled() ||
+      !is_non_existent_post_write_object_map_state()) {
+    image_ctx->snap_lock.put_read();
+    this->finish(0);
+    return;
+  }
+
+  ldout(image_ctx->cct, 20) << dendl;
+
+  // should have been flushed prior to releasing lock
+  ceph_assert(image_ctx->exclusive_lock->is_lock_owner());
+  image_ctx->object_map_lock.get_write();
+  // aio_update returns true when an on-disk update was queued; the
+  // request then completes in handle_post_write_object_map_update
+  if (image_ctx->object_map->template aio_update<
+        AbstractObjectWriteRequest<I>,
+        &AbstractObjectWriteRequest<I>::handle_post_write_object_map_update>(
+          CEPH_NOSNAP, this->m_object_no, OBJECT_NONEXISTENT, OBJECT_PENDING,
+          this->m_trace, false, this)) {
+    image_ctx->object_map_lock.put_write();
+    image_ctx->snap_lock.put_read();
+    return;
+  }
+
+  image_ctx->object_map_lock.put_write();
+  image_ctx->snap_lock.put_read();
+  this->finish(0);
+}
+
+// Completion handler for the post-write object map update; the request
+// finishes here on both success and failure.
+template <typename I>
+void AbstractObjectWriteRequest<I>::handle_post_write_object_map_update(int r) {
+  I *image_ctx = this->m_ictx;
+  ldout(image_ctx->cct, 20) << "r=" << r << dendl;
+  if (r >= 0) {
+    this->finish(0);
+    return;
+  }
+
+  lderr(image_ctx->cct) << "failed to update object map: "
+                        << cpp_strerror(r) << dendl;
+  this->finish(r);
+}
+
+// Queue either a full-object or a partial write, then tag the op with the
+// caller-provided flags (set_op_flags2 applies to the last queued op).
+template <typename I>
+void ObjectWriteRequest<I>::add_write_ops(librados::ObjectWriteOperation *wr) {
+  if (!this->m_full_object) {
+    wr->write(this->m_object_off, m_write_data);
+  } else {
+    wr->write_full(m_write_data);
+  }
+  wr->set_op_flags2(m_op_flags);
+}
+
+// Queue a writesame op covering the object extent; set_op_flags2 applies
+// the flags to that (last queued) op.
+template <typename I>
+void ObjectWriteSameRequest<I>::add_write_ops(
+    librados::ObjectWriteOperation *wr) {
+  wr->writesame(this->m_object_off, this->m_object_len, m_write_data);
+  wr->set_op_flags2(m_op_flags);
+}
+
+// Queue the compare followed by the write: the cmpext must come first so a
+// data mismatch aborts the compound operation before any bytes change.
+template <typename I>
+void ObjectCompareAndWriteRequest<I>::add_write_ops(
+    librados::ObjectWriteOperation *wr) {
+  wr->cmpext(this->m_object_off, m_cmp_bl, nullptr);
+
+  if (!this->m_full_object) {
+    wr->write(this->m_object_off, m_write_bl);
+  } else {
+    wr->write_full(m_write_bl);
+  }
+  wr->set_op_flags2(m_op_flags);
+}
+
+// Translate the OSD's cmpext failure encoding into -EILSEQ. The OSD
+// reports a compare mismatch as (-MAX_ERRNO - object_offset); recover the
+// object offset, map it back to an image offset, and report it via
+// *m_mismatch_offset when the caller asked for it.
+template <typename I>
+int ObjectCompareAndWriteRequest<I>::filter_write_result(int r) const {
+  if (r <= -MAX_ERRNO) {
+    I *image_ctx = this->m_ictx;
+    Extents image_extents;
+
+    // object extent compare mismatch
+    uint64_t offset = -MAX_ERRNO - r;
+    Striper::extent_to_file(image_ctx->cct, &image_ctx->layout,
+                            this->m_object_no, offset, this->m_object_len,
+                            image_extents);
+    // the object extent maps back to exactly one image extent
+    ceph_assert(image_extents.size() == 1);
+
+    if (m_mismatch_offset) {
+      *m_mismatch_offset = image_extents[0].first;
+    }
+    r = -EILSEQ;
+  }
+  return r;
+}
+
+} // namespace io
+} // namespace librbd
+
+template class librbd::io::ObjectRequest<librbd::ImageCtx>;
+template class librbd::io::ObjectReadRequest<librbd::ImageCtx>;
+template class librbd::io::AbstractObjectWriteRequest<librbd::ImageCtx>;
+template class librbd::io::ObjectWriteRequest<librbd::ImageCtx>;
+template class librbd::io::ObjectDiscardRequest<librbd::ImageCtx>;
+template class librbd::io::ObjectWriteSameRequest<librbd::ImageCtx>;
+template class librbd::io::ObjectCompareAndWriteRequest<librbd::ImageCtx>;
diff --git a/src/librbd/io/ObjectRequest.h b/src/librbd/io/ObjectRequest.h
new file mode 100644
index 00000000..9452ec43
--- /dev/null
+++ b/src/librbd/io/ObjectRequest.h
@@ -0,0 +1,478 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_OBJECT_REQUEST_H
+#define CEPH_LIBRBD_IO_OBJECT_REQUEST_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/rados/librados.hpp"
+#include "common/snap_types.h"
+#include "common/zipkin_trace.h"
+#include "librbd/ObjectMap.h"
+#include "librbd/io/Types.h"
+#include <map>
+
+class Context;
+class ObjectExtent;
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace io {
+
+struct AioCompletion;
+template <typename> class CopyupRequest;
+
+/**
+ * This class represents an I/O operation to a single RBD data object.
+ * Its subclasses encapsulate logic for dealing with special cases
+ * for I/O due to layering.
+ */
+template <typename ImageCtxT = ImageCtx>
+class ObjectRequest {
+public:
+  // factory helpers -- one per write-path op type; each returns a
+  // heap-allocated request that self-deletes via finish()
+  static ObjectRequest* create_write(
+    ImageCtxT *ictx, const std::string &oid, uint64_t object_no,
+    uint64_t object_off, ceph::bufferlist&& data, const ::SnapContext &snapc,
+    int op_flags, const ZTracer::Trace &parent_trace, Context *completion);
+  static ObjectRequest* create_discard(
+    ImageCtxT *ictx, const std::string &oid, uint64_t object_no,
+    uint64_t object_off, uint64_t object_len, const ::SnapContext &snapc,
+    int discard_flags, const ZTracer::Trace &parent_trace,
+    Context *completion);
+  static ObjectRequest* create_write_same(
+    ImageCtxT *ictx, const std::string &oid, uint64_t object_no,
+    uint64_t object_off, uint64_t object_len, ceph::bufferlist&& data,
+    const ::SnapContext &snapc, int op_flags,
+    const ZTracer::Trace &parent_trace, Context *completion);
+  static ObjectRequest* create_compare_and_write(
+    ImageCtxT *ictx, const std::string &oid, uint64_t object_no,
+    uint64_t object_off, ceph::bufferlist&& cmp_data,
+    ceph::bufferlist&& write_data, const ::SnapContext &snapc,
+    uint64_t *mismatch_offset, int op_flags,
+    const ZTracer::Trace &parent_trace, Context *completion);
+
+  ObjectRequest(ImageCtxT *ictx, const std::string &oid,
+                uint64_t objectno, uint64_t off, uint64_t len,
+                librados::snap_t snap_id, const char *trace_name,
+                const ZTracer::Trace &parent_trace, Context *completion);
+  virtual ~ObjectRequest() {
+    m_trace.event("finish");
+  }
+
+  // adds an allocation hint sized to the image's object/stripe layout
+  static void add_write_hint(ImageCtxT& image_ctx,
+                             librados::ObjectWriteOperation *wr);
+
+  virtual void send() = 0;
+
+  bool has_parent() const {
+    return m_has_parent;
+  }
+
+  // human-readable op name used for logging/tracing
+  virtual const char *get_op_type() const = 0;
+
+protected:
+  bool compute_parent_extents(Extents *parent_extents, bool read_request);
+
+  ImageCtxT *m_ictx;
+  std::string m_oid;
+  uint64_t m_object_no, m_object_off, m_object_len;
+  librados::snap_t m_snap_id;
+  Context *m_completion;
+  ZTracer::Trace m_trace;
+
+  // finish() completes inline; async_finish() defers to the work queue
+  void async_finish(int r);
+  void finish(int r);
+
+private:
+  // cached by compute_parent_extents(); true when the parent overlaps
+  bool m_has_parent = false;
+};
+
+// Read of a single data object, transparently falling back to the parent
+// image (and optionally performing copy-on-read) when the object is absent.
+template <typename ImageCtxT = ImageCtx>
+class ObjectReadRequest : public ObjectRequest<ImageCtxT> {
+public:
+  typedef std::map<uint64_t, uint64_t> ExtentMap;
+
+  static ObjectReadRequest* create(ImageCtxT *ictx, const std::string &oid,
+                                   uint64_t objectno, uint64_t offset,
+                                   uint64_t len, librados::snap_t snap_id,
+                                   int op_flags,
+                                   const ZTracer::Trace &parent_trace,
+                                   ceph::bufferlist* read_data,
+                                   ExtentMap* extent_map, Context *completion) {
+    return new ObjectReadRequest(ictx, oid, objectno, offset, len,
+                                 snap_id, op_flags, parent_trace, read_data,
+                                 extent_map, completion);
+  }
+
+  ObjectReadRequest(ImageCtxT *ictx, const std::string &oid,
+                    uint64_t objectno, uint64_t offset, uint64_t len,
+                    librados::snap_t snap_id, int op_flags,
+                    const ZTracer::Trace &parent_trace,
+                    ceph::bufferlist* read_data, ExtentMap* extent_map,
+                    Context *completion);
+
+  void send() override;
+
+  const char *get_op_type() const override {
+    return "read";
+  }
+
+private:
+  /**
+   * @verbatim
+   *
+   * <start>
+   *    |
+   *    |
+   *    v
+   * READ_OBJECT
+   *    |
+   *    v (skip if not needed)
+   * READ_PARENT
+   *    |
+   *    v (skip if not needed)
+   * COPYUP
+   *    |
+   *    v
+   * <finish>
+   *
+   * @endverbatim
+   */
+
+  int m_op_flags;
+
+  // caller-owned sinks: read payload and (for sparse reads) extent map
+  ceph::bufferlist* m_read_data;
+  ExtentMap* m_extent_map;
+
+  void read_object();
+  void handle_read_object(int r);
+
+  void read_parent();
+  void handle_read_parent(int r);
+
+  void copyup();
+};
+
+// Common state machine for all write-like object ops (write, discard,
+// writesame, compare-and-write): object map pre/post updates, guarded
+// writes, and copyup from the parent image.
+template <typename ImageCtxT = ImageCtx>
+class AbstractObjectWriteRequest : public ObjectRequest<ImageCtxT> {
+public:
+  AbstractObjectWriteRequest(ImageCtxT *ictx, const std::string &oid,
+                             uint64_t object_no, uint64_t object_off,
+                             uint64_t len, const ::SnapContext &snapc,
+                             const char *trace_name,
+                             const ZTracer::Trace &parent_trace,
+                             Context *completion);
+
+  virtual bool is_empty_write_op() const {
+    return false;
+  }
+
+  virtual uint8_t get_pre_write_object_map_state() const {
+    return OBJECT_EXISTS;
+  }
+
+  // ops replayed on top of the copied-up parent data
+  virtual void add_copyup_ops(librados::ObjectWriteOperation *wr) {
+    add_write_ops(wr);
+  }
+
+  // invoked by the shared CopyupRequest when the copyup completes
+  void handle_copyup(int r);
+
+  void send() override;
+
+protected:
+  bool m_full_object = false;
+  bool m_copyup_enabled = true;
+
+  // subclass hooks customizing the state machine (see diagram below)
+  virtual bool is_no_op_for_nonexistent_object() const {
+    return false;
+  }
+  virtual bool is_object_map_update_enabled() const {
+    return true;
+  }
+  virtual bool is_post_copyup_write_required() const {
+    return false;
+  }
+  virtual bool is_non_existent_post_write_object_map_state() const {
+    return false;
+  }
+
+  virtual void add_write_hint(librados::ObjectWriteOperation *wr);
+  virtual void add_write_ops(librados::ObjectWriteOperation *wr) = 0;
+
+  // hook for subclasses to translate librados error codes
+  virtual int filter_write_result(int r) const {
+    return r;
+  }
+
+private:
+  /**
+   * @verbatim
+   *
+   * <start>
+   *    |
+   *    v           (no-op write request)
+   * DETECT_NO_OP . . . . . . . . . . . . . . . . . . .
+   *    |                                             .
+   *    v (skip if not required/disabled)             .
+   * PRE_UPDATE_OBJECT_MAP                            .
+   *    |          .                                  .
+   *    |          . (child dne)                      .
+   *    |          . . . . . . . . .                  .
+   *    |                          .                  .
+   *    |   (post-copyup write)    .                  .
+   *    | . . . . . . . . . . . .  .                  .
+   *    | .                     .  .                  .
+   *    v v                     .  v                  .
+   *  WRITE . . . . . . . . > COPYUP (if required)    .
+   *    |                       |                     .
+   *    |/----------------------/                     .
+   *    |                                             .
+   *    v (skip if not required/disabled)             .
+   * POST_UPDATE_OBJECT_MAP                           .
+   *    |                                             .
+   *    v                                             .
+   * <finish> < . . . . . . . . . . . . . . . . . . . .
+   *
+   * @endverbatim
+   */
+
+  // snapshot context the write is issued against
+  uint64_t m_snap_seq;
+  std::vector<librados::snap_t> m_snaps;
+
+  Extents m_parent_extents;
+  bool m_object_may_exist = false;
+  bool m_copyup_in_progress = false;
+  // true while the image is migrating; write guard uses assert_snapc_seq
+  bool m_guarding_migration_write = false;
+
+  void compute_parent_info();
+
+  void pre_write_object_map_update();
+  void handle_pre_write_object_map_update(int r);
+
+  void write_object();
+  void handle_write_object(int r);
+
+  void copyup();
+
+  void post_write_object_map_update();
+  void handle_post_write_object_map_update(int r);
+
+};
+
+// Plain object write; the write length is derived from the data buffer.
+template <typename ImageCtxT = ImageCtx>
+class ObjectWriteRequest : public AbstractObjectWriteRequest<ImageCtxT> {
+public:
+  ObjectWriteRequest(ImageCtxT *ictx, const std::string &oid,
+                     uint64_t object_no, uint64_t object_off,
+                     ceph::bufferlist&& data, const ::SnapContext &snapc,
+                     int op_flags, const ZTracer::Trace &parent_trace,
+                     Context *completion)
+    : AbstractObjectWriteRequest<ImageCtxT>(ictx, oid, object_no, object_off,
+                                            data.length(), snapc, "write",
+                                            parent_trace, completion),
+      m_write_data(std::move(data)), m_op_flags(op_flags) {
+  }
+
+  bool is_empty_write_op() const override {
+    return (m_write_data.length() == 0);
+  }
+
+  const char *get_op_type() const override {
+    return "write";
+  }
+
+protected:
+  void add_write_ops(librados::ObjectWriteOperation *wr) override;
+
+private:
+  ceph::bufferlist m_write_data;
+  int m_op_flags;
+};
+
+// Object discard: depending on the extent and flags this maps to a
+// remove, a truncate, a create+truncate (to hide parent data), or a zero.
+template <typename ImageCtxT = ImageCtx>
+class ObjectDiscardRequest : public AbstractObjectWriteRequest<ImageCtxT> {
+public:
+  ObjectDiscardRequest(ImageCtxT *ictx, const std::string &oid,
+                       uint64_t object_no, uint64_t object_off,
+                       uint64_t object_len, const ::SnapContext &snapc,
+                       int discard_flags, const ZTracer::Trace &parent_trace,
+                       Context *completion)
+    : AbstractObjectWriteRequest<ImageCtxT>(ictx, oid, object_no, object_off,
+                                            object_len, snapc, "discard",
+                                            parent_trace, completion),
+      m_discard_flags(discard_flags) {
+    // pick the concrete rados op for this discard (see DiscardAction)
+    if (this->m_full_object) {
+      if ((m_discard_flags & OBJECT_DISCARD_FLAG_DISABLE_CLONE_REMOVE) != 0 &&
+          this->has_parent()) {
+        if (!this->m_copyup_enabled) {
+          // need to hide the parent object instead of child object
+          m_discard_action = DISCARD_ACTION_REMOVE_TRUNCATE;
+        } else {
+          m_discard_action = DISCARD_ACTION_TRUNCATE;
+        }
+        this->m_object_len = 0;
+      } else {
+        m_discard_action = DISCARD_ACTION_REMOVE;
+      }
+    } else if (object_off + object_len == ictx->layout.object_size) {
+      // discard reaches the end of the object -- truncate instead of zero
+      m_discard_action = DISCARD_ACTION_TRUNCATE;
+    } else {
+      m_discard_action = DISCARD_ACTION_ZERO;
+    }
+  }
+
+  const char* get_op_type() const override {
+    switch (m_discard_action) {
+    case DISCARD_ACTION_REMOVE:
+      return "remove";
+    case DISCARD_ACTION_REMOVE_TRUNCATE:
+      return "remove (create+truncate)";
+    case DISCARD_ACTION_TRUNCATE:
+      return "truncate";
+    case DISCARD_ACTION_ZERO:
+      return "zero";
+    }
+    ceph_abort();
+    return nullptr;
+  }
+
+  uint8_t get_pre_write_object_map_state() const override {
+    if (m_discard_action == DISCARD_ACTION_REMOVE) {
+      // removal transitions through PENDING -> NONEXISTENT
+      return OBJECT_PENDING;
+    }
+    return OBJECT_EXISTS;
+  }
+
+protected:
+  bool is_no_op_for_nonexistent_object() const override {
+    // discarding a nonexistent object only matters when a parent backs it
+    return (!this->has_parent());
+  }
+  bool is_object_map_update_enabled() const override {
+    return (
+      (m_discard_flags & OBJECT_DISCARD_FLAG_DISABLE_OBJECT_MAP_UPDATE) == 0);
+  }
+  bool is_non_existent_post_write_object_map_state() const override {
+    return (m_discard_action == DISCARD_ACTION_REMOVE);
+  }
+
+  void add_write_hint(librados::ObjectWriteOperation *wr) override {
+    // no hint for discard
+  }
+
+  void add_write_ops(librados::ObjectWriteOperation *wr) override {
+    switch (m_discard_action) {
+    case DISCARD_ACTION_REMOVE:
+      wr->remove();
+      break;
+    case DISCARD_ACTION_REMOVE_TRUNCATE:
+      wr->create(false);
+      // fall through
+    case DISCARD_ACTION_TRUNCATE:
+      wr->truncate(this->m_object_off);
+      break;
+    case DISCARD_ACTION_ZERO:
+      wr->zero(this->m_object_off, this->m_object_len);
+      break;
+    default:
+      ceph_abort();
+      break;
+    }
+  }
+
+private:
+  enum DiscardAction {
+    DISCARD_ACTION_REMOVE,
+    DISCARD_ACTION_REMOVE_TRUNCATE,
+    DISCARD_ACTION_TRUNCATE,
+    DISCARD_ACTION_ZERO
+  };
+
+  DiscardAction m_discard_action;
+  int m_discard_flags;
+
+};
+
+// Writes a repeating data pattern (rados writesame) across the extent.
+template <typename ImageCtxT = ImageCtx>
+class ObjectWriteSameRequest : public AbstractObjectWriteRequest<ImageCtxT> {
+public:
+  ObjectWriteSameRequest(ImageCtxT *ictx, const std::string &oid,
+                         uint64_t object_no, uint64_t object_off,
+                         uint64_t object_len, ceph::bufferlist&& data,
+                         const ::SnapContext &snapc, int op_flags,
+                         const ZTracer::Trace &parent_trace,
+                         Context *completion)
+    : AbstractObjectWriteRequest<ImageCtxT>(ictx, oid, object_no, object_off,
+                                            object_len, snapc, "writesame",
+                                            parent_trace, completion),
+      m_write_data(std::move(data)), m_op_flags(op_flags) {
+  }
+
+  const char *get_op_type() const override {
+    return "writesame";
+  }
+
+protected:
+  void add_write_ops(librados::ObjectWriteOperation *wr) override;
+
+private:
+  // the repeated pattern; object_len is the total span to fill
+  ceph::bufferlist m_write_data;
+  int m_op_flags;
+};
+
+// Atomic compare-and-write: the write only proceeds if the existing object
+// data matches cmp_bl; on mismatch the request fails with -EILSEQ and the
+// image-level mismatch offset is reported via *mismatch_offset.
+template <typename ImageCtxT = ImageCtx>
+class ObjectCompareAndWriteRequest : public AbstractObjectWriteRequest<ImageCtxT> {
+public:
+  ObjectCompareAndWriteRequest(ImageCtxT *ictx, const std::string &oid,
+                               uint64_t object_no, uint64_t object_off,
+                               ceph::bufferlist&& cmp_bl,
+                               ceph::bufferlist&& write_bl,
+                               const ::SnapContext &snapc,
+                               uint64_t *mismatch_offset, int op_flags,
+                               const ZTracer::Trace &parent_trace,
+                               Context *completion)
+    : AbstractObjectWriteRequest<ImageCtxT>(ictx, oid, object_no, object_off,
+                                            cmp_bl.length(), snapc,
+                                            "compare_and_write", parent_trace,
+                                            completion),
+      m_cmp_bl(std::move(cmp_bl)), m_write_bl(std::move(write_bl)),
+      m_mismatch_offset(mismatch_offset), m_op_flags(op_flags) {
+  }
+
+  const char *get_op_type() const override {
+    return "compare_and_write";
+  }
+
+  void add_copyup_ops(librados::ObjectWriteOperation *wr) override {
+    // no-op on copyup
+  }
+
+protected:
+  // the compare must run against the copied-up data, so the write is
+  // replayed after any copyup completes
+  // NOTE: marked 'override' (was a bare 'virtual') to match the other
+  // overrides in this file and to catch base-signature drift at compile time
+  bool is_post_copyup_write_required() const override {
+    return true;
+  }
+
+  void add_write_ops(librados::ObjectWriteOperation *wr) override;
+
+  int filter_write_result(int r) const override;
+
+private:
+  ceph::bufferlist m_cmp_bl;
+  ceph::bufferlist m_write_bl;
+  // caller-owned; receives the image offset of the first mismatching byte
+  uint64_t *m_mismatch_offset;
+  int m_op_flags;
+};
+
+} // namespace io
+} // namespace librbd
+
+extern template class librbd::io::ObjectRequest<librbd::ImageCtx>;
+extern template class librbd::io::ObjectReadRequest<librbd::ImageCtx>;
+extern template class librbd::io::AbstractObjectWriteRequest<librbd::ImageCtx>;
+extern template class librbd::io::ObjectWriteRequest<librbd::ImageCtx>;
+extern template class librbd::io::ObjectDiscardRequest<librbd::ImageCtx>;
+extern template class librbd::io::ObjectWriteSameRequest<librbd::ImageCtx>;
+extern template class librbd::io::ObjectCompareAndWriteRequest<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_IO_OBJECT_REQUEST_H
diff --git a/src/librbd/io/ReadResult.cc b/src/librbd/io/ReadResult.cc
new file mode 100644
index 00000000..c24d8b4a
--- /dev/null
+++ b/src/librbd/io/ReadResult.cc
@@ -0,0 +1,170 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/ReadResult.h"
+#include "include/buffer.h"
+#include "common/dout.h"
+#include "librbd/io/AioCompletion.h"
+#include <boost/variant/apply_visitor.hpp>
+#include <boost/variant/static_visitor.hpp>
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::io::ReadResult: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace io {
+
+// Visitor that clips a Linear destination buffer to the supplied length;
+// all other buffer variants are left untouched.
+struct ReadResult::SetClipLengthVisitor : public boost::static_visitor<void> {
+  size_t length;
+
+  explicit SetClipLengthVisitor(size_t length) : length(length) {
+  }
+
+  void operator()(Linear &linear) const {
+    // clipping may only shrink the caller-provided buffer
+    ceph_assert(length <= linear.buf_len);
+    linear.buf_len = length;
+  }
+
+  // no-op for Empty, Vector, and Bufferlist destinations
+  template <typename T>
+  void operator()(T &t) const {
+  }
+};
+
+// Visitor that assembles the destriped read result into whichever
+// destination variant the caller supplied (raw buffer, iovec, bufferlist).
+struct ReadResult::AssembleResultVisitor : public boost::static_visitor<void> {
+  CephContext *cct;
+  Striper::StripedReadResult &destriper;
+
+  AssembleResultVisitor(CephContext *cct, Striper::StripedReadResult &destriper)
+    : cct(cct), destriper(destriper) {
+  }
+
+  void operator()(Empty &empty) const {
+    ldout(cct, 20) << "dropping read result" << dendl;
+  }
+
+  void operator()(Linear &linear) const {
+    ldout(cct, 20) << "copying resulting bytes to "
+                   << reinterpret_cast<void*>(linear.buf) << dendl;
+    destriper.assemble_result(cct, linear.buf, linear.buf_len);
+  }
+
+  void operator()(Vector &vector) const {
+    bufferlist bl;
+    destriper.assemble_result(cct, bl, true);
+
+    ldout(cct, 20) << "copying resulting " << bl.length() << " bytes to iovec "
+                   << reinterpret_cast<const void*>(vector.iov) << dendl;
+
+    // scatter the assembled bytes across the caller's iovec entries
+    bufferlist::iterator it = bl.begin();
+    size_t length = bl.length();
+    size_t offset = 0;
+    int idx = 0;
+    for (; offset < length && idx < vector.iov_count; idx++) {
+      size_t len = std::min(vector.iov[idx].iov_len, length - offset);
+      it.copy(len, static_cast<char *>(vector.iov[idx].iov_base));
+      offset += len;
+    }
+    // the iovec must have been large enough for the whole result
+    ceph_assert(offset == bl.length());
+  }
+
+  void operator()(Bufferlist &bufferlist) const {
+    bufferlist.bl->clear();
+    destriper.assemble_result(cct, *bufferlist.bl, true);
+
+    ldout(cct, 20) << "moved resulting " << bufferlist.bl->length() << " "
+                   << "bytes to bl " << reinterpret_cast<void*>(bufferlist.bl)
+                   << dendl;
+  }
+};
+
+// Registers this sub-request with the parent AioCompletion so the
+// completion waits for it (balanced by complete_request in finish()).
+ReadResult::C_ImageReadRequest::C_ImageReadRequest(
+    AioCompletion *aio_completion, const Extents image_extents)
+  : aio_completion(aio_completion), image_extents(image_extents) {
+  aio_completion->add_request();
+}
+
+// Feeds the sub-request's data into the completion's destriper and
+// signals the parent AioCompletion (with bytes-read on success).
+void ReadResult::C_ImageReadRequest::finish(int r) {
+  CephContext *cct = aio_completion->ictx->cct;
+  ldout(cct, 10) << "C_ImageReadRequest: r=" << r
+                 << dendl;
+  if (r >= 0) {
+    size_t length = 0;
+    for (auto &image_extent : image_extents) {
+      length += image_extent.second;
+    }
+    // the read must have returned exactly the requested extents
+    ceph_assert(length == bl.length());
+
+    // the destriper is shared by all sub-requests -- guard with the lock
+    aio_completion->lock.Lock();
+    aio_completion->read_result.m_destriper.add_partial_result(
+      cct, bl, image_extents);
+    aio_completion->lock.Unlock();
+    r = length;
+  }
+
+  aio_completion->complete_request(r);
+}
+
+// Registers this object-level sub-request with the parent AioCompletion
+// (balanced by complete_request in finish()).
+ReadResult::C_ObjectReadRequest::C_ObjectReadRequest(
+    AioCompletion *aio_completion, uint64_t object_off, uint64_t object_len,
+    Extents&& buffer_extents)
+  : aio_completion(aio_completion), object_off(object_off),
+    object_len(object_len), buffer_extents(std::move(buffer_extents)) {
+  aio_completion->add_request();
+}
+
+// Feeds the object read (possibly sparse) into the completion's
+// destriper; a missing object is treated as a successful zero-length read.
+void ReadResult::C_ObjectReadRequest::finish(int r) {
+  CephContext *cct = aio_completion->ictx->cct;
+  ldout(cct, 10) << "C_ObjectReadRequest: r=" << r
+                 << dendl;
+
+  if (r == -ENOENT) {
+    // nonexistent object reads back as zeroes
+    r = 0;
+  }
+  if (r >= 0) {
+    ldout(cct, 10) << " got " << extent_map
+                   << " for " << buffer_extents
+                   << " bl " << bl.length() << dendl;
+    // handle the case where a sparse-read wasn't issued
+    if (extent_map.empty()) {
+      extent_map[object_off] = bl.length();
+    }
+
+    // the destriper is shared by all sub-requests -- guard with the lock
+    aio_completion->lock.Lock();
+    aio_completion->read_result.m_destriper.add_partial_sparse_result(
+      cct, bl, extent_map, object_off, buffer_extents);
+    aio_completion->lock.Unlock();
+
+    r = object_len;
+  }
+
+  aio_completion->complete_request(r);
+}
+
+// Destination-selecting constructors: each variant stores a non-owning
+// reference to the caller-provided sink for the read data.
+ReadResult::ReadResult() : m_buffer(Empty()) {
+}
+
+ReadResult::ReadResult(char *buf, size_t buf_len)
+  : m_buffer(Linear(buf, buf_len)) {
+}
+
+ReadResult::ReadResult(const struct iovec *iov, int iov_count)
+  : m_buffer(Vector(iov, iov_count)) {
+}
+
+ReadResult::ReadResult(ceph::bufferlist *bl)
+  : m_buffer(Bufferlist(bl)) {
+}
+
+// Shrink a Linear destination to 'length' bytes (no-op for other variants).
+void ReadResult::set_clip_length(size_t length) {
+  boost::apply_visitor(SetClipLengthVisitor(length), m_buffer);
+}
+
+// Copy/move the destriped result into the destination variant.
+void ReadResult::assemble_result(CephContext *cct) {
+  boost::apply_visitor(AssembleResultVisitor(cct, m_destriper), m_buffer);
+}
+
+} // namespace io
+} // namespace librbd
+
diff --git a/src/librbd/io/ReadResult.h b/src/librbd/io/ReadResult.h
new file mode 100644
index 00000000..6fb5e4a4
--- /dev/null
+++ b/src/librbd/io/ReadResult.h
@@ -0,0 +1,106 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_READ_RESULT_H
+#define CEPH_LIBRBD_IO_READ_RESULT_H
+
+#include "include/int_types.h"
+#include "include/buffer_fwd.h"
+#include "include/Context.h"
+#include "librbd/io/Types.h"
+#include "osdc/Striper.h"
+#include <sys/uio.h>
+#include <boost/variant/variant.hpp>
+
+struct CephContext;
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace io {
+
+struct AioCompletion;
+template <typename> struct ObjectReadRequest;
+
+// Destination for the data produced by an image-level read.  Wraps the
+// caller-supplied buffer (flat buffer, iovec array, or bufferlist) plus
+// the Striper state needed to reassemble per-object partial reads.
+class ReadResult {
+public:
+ // Completion context for a read issued as a whole-image-extent request.
+ struct C_ImageReadRequest : public Context {
+ AioCompletion *aio_completion;
+ Extents image_extents;
+ bufferlist bl;
+
+ C_ImageReadRequest(AioCompletion *aio_completion,
+ const Extents image_extents);
+
+ void finish(int r) override;
+ };
+
+ // Completion context for a single object-level (possibly sparse) read.
+ struct C_ObjectReadRequest : public Context {
+ AioCompletion *aio_completion;
+ uint64_t object_off;
+ uint64_t object_len;
+ Extents buffer_extents;
+
+ // filled in by the object read before finish() runs
+ bufferlist bl;
+ ExtentMap extent_map;
+
+ C_ObjectReadRequest(AioCompletion *aio_completion, uint64_t object_off,
+ uint64_t object_len, Extents&& buffer_extents);
+
+ void finish(int r) override;
+ };
+
+ ReadResult();
+ ReadResult(char *buf, size_t buf_len);
+ ReadResult(const struct iovec *iov, int iov_count);
+ ReadResult(ceph::bufferlist *bl);
+
+ // Clamp the byte count returned to the caller.
+ void set_clip_length(size_t length);
+ // Reassemble the destriped partial results into the destination buffer.
+ void assemble_result(CephContext *cct);
+
+private:
+ // No caller-supplied destination.
+ struct Empty {
+ };
+
+ // Flat caller-owned buffer.
+ struct Linear {
+ char *buf;
+ size_t buf_len;
+
+ Linear(char *buf, size_t buf_len) : buf(buf), buf_len(buf_len) {
+ }
+ };
+
+ // Caller-owned scatter/gather iovec array.
+ struct Vector {
+ const struct iovec *iov;
+ int iov_count;
+
+ Vector(const struct iovec *iov, int iov_count)
+ : iov(iov), iov_count(iov_count) {
+ }
+ };
+
+ // Caller-owned bufferlist.
+ struct Bufferlist {
+ ceph::bufferlist *bl;
+
+ Bufferlist(ceph::bufferlist *bl) : bl(bl) {
+ }
+ };
+
+ typedef boost::variant<Empty,
+ Linear,
+ Vector,
+ Bufferlist> Buffer;
+ // visitor implementations live in ReadResult.cc
+ struct SetClipLengthVisitor;
+ struct AssembleResultVisitor;
+
+ Buffer m_buffer;
+ Striper::StripedReadResult m_destriper;
+
+};
+
+} // namespace io
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_IO_READ_RESULT_H
+
diff --git a/src/librbd/io/Types.h b/src/librbd/io/Types.h
new file mode 100644
index 00000000..6bd42eac
--- /dev/null
+++ b/src/librbd/io/Types.h
@@ -0,0 +1,83 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_TYPES_H
+#define CEPH_LIBRBD_IO_TYPES_H
+
+#include "include/int_types.h"
+#include <map>
+#include <vector>
+
+namespace librbd {
+namespace io {
+
+// QoS throttle flag bits.  Each expansion is parenthesized so the macros
+// behave correctly inside larger expressions: without parentheses,
+// e.g. RBD_QOS_IOPS_THROTTLE * 2 would expand to 1 << 0 * 2 == 1 << 0.
+#define RBD_QOS_IOPS_THROTTLE (1 << 0)
+#define RBD_QOS_BPS_THROTTLE (1 << 1)
+#define RBD_QOS_READ_IOPS_THROTTLE (1 << 2)
+#define RBD_QOS_WRITE_IOPS_THROTTLE (1 << 3)
+#define RBD_QOS_READ_BPS_THROTTLE (1 << 4)
+#define RBD_QOS_WRITE_BPS_THROTTLE (1 << 5)
+
+// Convenience masks over the individual throttle bits.
+#define RBD_QOS_BPS_MASK (RBD_QOS_BPS_THROTTLE | RBD_QOS_READ_BPS_THROTTLE | RBD_QOS_WRITE_BPS_THROTTLE)
+#define RBD_QOS_IOPS_MASK (RBD_QOS_IOPS_THROTTLE | RBD_QOS_READ_IOPS_THROTTLE | RBD_QOS_WRITE_IOPS_THROTTLE)
+#define RBD_QOS_READ_MASK (RBD_QOS_READ_BPS_THROTTLE | RBD_QOS_READ_IOPS_THROTTLE)
+#define RBD_QOS_WRITE_MASK (RBD_QOS_WRITE_BPS_THROTTLE | RBD_QOS_WRITE_IOPS_THROTTLE)
+
+#define RBD_QOS_MASK (RBD_QOS_BPS_MASK | RBD_QOS_IOPS_MASK)
+
+// Discriminates the operation type carried by an AioCompletion.
+typedef enum {
+ AIO_TYPE_NONE = 0,
+ AIO_TYPE_GENERIC,
+ AIO_TYPE_OPEN,
+ AIO_TYPE_CLOSE,
+ AIO_TYPE_READ,
+ AIO_TYPE_WRITE,
+ AIO_TYPE_DISCARD,
+ AIO_TYPE_FLUSH,
+ AIO_TYPE_WRITESAME,
+ AIO_TYPE_COMPARE_AND_WRITE,
+} aio_type_t;
+
+// Origin of a flush request.
+enum FlushSource {
+ FLUSH_SOURCE_USER,
+ FLUSH_SOURCE_INTERNAL,
+ FLUSH_SOURCE_SHUTDOWN
+};
+
+// I/O direction selector.
+enum Direction {
+ DIRECTION_READ,
+ DIRECTION_WRITE,
+ DIRECTION_BOTH
+};
+
+// Outcome reported by an object dispatch layer for a request.
+enum DispatchResult {
+ DISPATCH_RESULT_INVALID,
+ DISPATCH_RESULT_CONTINUE,
+ DISPATCH_RESULT_COMPLETE
+};
+
+// Ordering of the object dispatch layers; NOTE(review): presumably
+// requests traverse layers in increasing enum order -- confirm against
+// ObjectDispatcher.cc.
+enum ObjectDispatchLayer {
+ OBJECT_DISPATCH_LAYER_NONE = 0,
+ OBJECT_DISPATCH_LAYER_CACHE,
+ OBJECT_DISPATCH_LAYER_JOURNAL,
+ OBJECT_DISPATCH_LAYER_CORE,
+ OBJECT_DISPATCH_LAYER_LAST
+};
+
+// Flags modifying object discard behavior.
+enum {
+ OBJECT_DISCARD_FLAG_DISABLE_CLONE_REMOVE = 1UL << 0,
+ OBJECT_DISCARD_FLAG_DISABLE_OBJECT_MAP_UPDATE = 1UL << 1
+};
+
+// Flags attached to an object dispatch request.
+enum {
+ OBJECT_DISPATCH_FLAG_FLUSH = 1UL << 0,
+ OBJECT_DISPATCH_FLAG_WILL_RETRY_ON_ERROR = 1UL << 1
+};
+
+// (offset, length) pairs describing byte ranges.
+typedef std::vector<std::pair<uint64_t, uint64_t> > Extents;
+// offset -> length map describing sparse extents.
+typedef std::map<uint64_t, uint64_t> ExtentMap;
+
+} // namespace io
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_IO_TYPES_H
diff --git a/src/librbd/io/Utils.cc b/src/librbd/io/Utils.cc
new file mode 100644
index 00000000..1b50561a
--- /dev/null
+++ b/src/librbd/io/Utils.cc
@@ -0,0 +1,57 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/Utils.h"
+#include "include/buffer.h"
+#include "osd/osd_types.h"
+
+namespace librbd {
+namespace io {
+namespace util {
+
+// Build the data payload for a write-same against one object extent.
+//
+// If force_write is false and every buffer extent starts and ends on a
+// whole multiple of the pattern length, *ws_data receives a single copy
+// of the pattern and the function returns true (the OSD can replicate
+// it natively).  Otherwise the pattern is tiled across every buffer
+// extent into *ws_data and the function returns false (issue a plain
+// write instead).
+//
+// NOTE(review): assumes data is non-empty -- data_len == 0 would divide
+// by zero below; confirm callers guarantee this.
+bool assemble_write_same_extent(
+ const ObjectExtent &object_extent, const ceph::bufferlist& data,
+ ceph::bufferlist *ws_data, bool force_write) {
+ size_t data_len = data.length();
+
+ if (!force_write) {
+ bool may_writesame = true;
+ // native write-same requires each buffer extent to be aligned to a
+ // pattern boundary on both ends
+ for (auto& q : object_extent.buffer_extents) {
+ if (!(q.first % data_len == 0 && q.second % data_len == 0)) {
+ may_writesame = false;
+ break;
+ }
+ }
+
+ if (may_writesame) {
+ ws_data->append(data);
+ return true;
+ }
+ }
+
+ // fall back to expanding the pattern: for each buffer extent, emit
+ // whole copies of the pattern (starting mid-pattern if the extent is
+ // unaligned), followed by any trailing partial copy
+ for (auto& q : object_extent.buffer_extents) {
+ bufferlist sub_bl;
+ uint64_t sub_off = q.first % data_len;
+ uint64_t sub_len = data_len - sub_off;
+ uint64_t extent_left = q.second;
+ while (extent_left >= sub_len) {
+ sub_bl.substr_of(data, sub_off, sub_len);
+ ws_data->claim_append(sub_bl);
+ extent_left -= sub_len;
+ if (sub_off) {
+ // after the first (partial) chunk, continue from the pattern start
+ sub_off = 0;
+ sub_len = data_len;
+ }
+ }
+ if (extent_left) {
+ sub_bl.substr_of(data, sub_off, extent_left);
+ ws_data->claim_append(sub_bl);
+ }
+ }
+ return false;
+}
+
+} // namespace util
+} // namespace io
+} // namespace librbd
+
diff --git a/src/librbd/io/Utils.h b/src/librbd/io/Utils.h
new file mode 100644
index 00000000..c1f373d4
--- /dev/null
+++ b/src/librbd/io/Utils.h
@@ -0,0 +1,26 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_UTILS_H
+#define CEPH_LIBRBD_IO_UTILS_H
+
+#include "include/int_types.h"
+#include "include/buffer_fwd.h"
+#include <map>
+
+class ObjectExtent;
+
+namespace librbd {
+namespace io {
+namespace util {
+
+// Expand a repeating write-same pattern across one object extent.
+// Returns true if a native write-same can be used (*ws_data holds one
+// copy of the pattern); false if the pattern was fully expanded into
+// *ws_data for a regular write.  See Utils.cc for details.
+bool assemble_write_same_extent(const ObjectExtent &object_extent,
+ const ceph::bufferlist& data,
+ ceph::bufferlist *ws_data,
+ bool force_write);
+
+} // namespace util
+} // namespace io
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_IO_UTILS_H