diff options
Diffstat (limited to 'src/librbd/io')
26 files changed, 6776 insertions, 0 deletions
diff --git a/src/librbd/io/AioCompletion.cc b/src/librbd/io/AioCompletion.cc new file mode 100644 index 00000000..73c58167 --- /dev/null +++ b/src/librbd/io/AioCompletion.cc @@ -0,0 +1,216 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/io/AioCompletion.h" +#include <errno.h> + +#include "common/ceph_context.h" +#include "common/dout.h" +#include "common/errno.h" +#include "common/perf_counters.h" +#include "common/WorkQueue.h" + +#include "librbd/ImageCtx.h" +#include "librbd/internal.h" +#include "librbd/Journal.h" +#include "librbd/Types.h" + +#ifdef WITH_LTTNG +#include "tracing/librbd.h" +#else +#define tracepoint(...) +#endif + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::io::AioCompletion: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace io { + +int AioCompletion::wait_for_complete() { + tracepoint(librbd, aio_wait_for_complete_enter, this); + lock.Lock(); + while (state != AIO_STATE_COMPLETE) + cond.Wait(lock); + lock.Unlock(); + tracepoint(librbd, aio_wait_for_complete_exit, 0); + return 0; +} + +void AioCompletion::finalize(ssize_t rval) +{ + ceph_assert(lock.is_locked()); + ceph_assert(ictx != nullptr); + CephContext *cct = ictx->cct; + + ldout(cct, 20) << "r=" << rval << dendl; + if (rval >= 0 && aio_type == AIO_TYPE_READ) { + read_result.assemble_result(cct); + } +} + +void AioCompletion::complete() { + ceph_assert(lock.is_locked()); + ceph_assert(ictx != nullptr); + CephContext *cct = ictx->cct; + + tracepoint(librbd, aio_complete_enter, this, rval); + if (ictx->perfcounter != nullptr) { + ceph::timespan elapsed = coarse_mono_clock::now() - start_time; + switch (aio_type) { + case AIO_TYPE_GENERIC: + case AIO_TYPE_OPEN: + case AIO_TYPE_CLOSE: + break; + case AIO_TYPE_READ: + ictx->perfcounter->tinc(l_librbd_rd_latency, elapsed); break; + case AIO_TYPE_WRITE: + ictx->perfcounter->tinc(l_librbd_wr_latency, elapsed); break; + case AIO_TYPE_DISCARD: + ictx->perfcounter->tinc(l_librbd_discard_latency, elapsed); break; + case AIO_TYPE_FLUSH: + ictx->perfcounter->tinc(l_librbd_flush_latency, elapsed); break; + case AIO_TYPE_WRITESAME: + ictx->perfcounter->tinc(l_librbd_ws_latency, elapsed); break; + case AIO_TYPE_COMPARE_AND_WRITE: + ictx->perfcounter->tinc(l_librbd_cmp_latency, elapsed); break; + default: + lderr(cct) << "completed invalid aio_type: " << aio_type << dendl; + break; + } + } + + if ((aio_type == AIO_TYPE_CLOSE) || + (aio_type == AIO_TYPE_OPEN && rval < 0)) { + // must destroy ImageCtx prior to invoking callback + delete ictx; + ictx = nullptr; + } + + state = AIO_STATE_CALLBACK; + if (complete_cb) { + lock.Unlock(); + complete_cb(rbd_comp, complete_arg); + lock.Lock(); + } + + if (ictx != nullptr && event_notify && ictx->event_socket.is_valid()) { + ictx->completed_reqs_lock.Lock(); + ictx->completed_reqs.push_back(&m_xlist_item); + ictx->completed_reqs_lock.Unlock(); + ictx->event_socket.notify(); + } + + state = AIO_STATE_COMPLETE; + cond.Signal(); + + // note: possible for image to be closed after op marked finished + if (async_op.started()) { + async_op.finish_op(); + } + tracepoint(librbd, aio_complete_exit); +} + +void AioCompletion::init_time(ImageCtx *i, aio_type_t t) { + Mutex::Locker locker(lock); + if (ictx == nullptr) { + ictx = i; + aio_type = t; + start_time = coarse_mono_clock::now(); + } +} + +void AioCompletion::start_op() { + Mutex::Locker locker(lock); + ceph_assert(ictx != nullptr); + + if (aio_type == AIO_TYPE_OPEN || aio_type == AIO_TYPE_CLOSE) { + // no need to track async open/close operations + return; + } + + ceph_assert(!async_op.started()); + async_op.start_op(*ictx); +} + +void AioCompletion::fail(int r) +{ + lock.Lock(); + ceph_assert(ictx != nullptr); + CephContext *cct = ictx->cct; + + lderr(cct) << cpp_strerror(r) << dendl; + ceph_assert(pending_count == 0); + rval = r; + complete(); + put_unlock(); +} + +void AioCompletion::set_request_count(uint32_t count) { + lock.Lock(); + ceph_assert(ictx != nullptr); + CephContext *cct = ictx->cct; + + ldout(cct, 20) << "pending=" << count << dendl; + ceph_assert(pending_count == 0); + + if (count > 0) { + pending_count = count; + lock.Unlock(); + } else { + pending_count = 1; + lock.Unlock(); + + // ensure completion fires in clean lock context + ictx->op_work_queue->queue(new C_AioRequest(this), 0); + } +} + +void AioCompletion::complete_request(ssize_t r) +{ + lock.Lock(); + ceph_assert(ictx != nullptr); + CephContext *cct = ictx->cct; + + if (rval >= 0) { + if (r < 0 && r != -EEXIST) + rval = r; + else if (r > 0) + rval += r; + } + ceph_assert(pending_count); + int count = --pending_count; + + ldout(cct, 20) << "cb=" << complete_cb << ", " + << "pending=" << pending_count << dendl; + if (!count) { + finalize(rval); + complete(); + } + put_unlock(); +} + +bool AioCompletion::is_complete() { + tracepoint(librbd, aio_is_complete_enter, this); + bool done; + { + Mutex::Locker l(lock); + done = this->state == AIO_STATE_COMPLETE; + } + tracepoint(librbd, aio_is_complete_exit, done); + return done; +} + +ssize_t AioCompletion::get_return_value() { + tracepoint(librbd, aio_get_return_value_enter, this); + lock.Lock(); + ssize_t r = rval; + lock.Unlock(); + tracepoint(librbd, aio_get_return_value_exit, r); + return r; +} + +} // namespace io +} // namespace librbd diff --git a/src/librbd/io/AioCompletion.h b/src/librbd/io/AioCompletion.h new file mode 100644 index 00000000..f3551a02 --- /dev/null +++ b/src/librbd/io/AioCompletion.h @@ -0,0 +1,203 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_IO_AIO_COMPLETION_H +#define CEPH_LIBRBD_IO_AIO_COMPLETION_H + +#include "common/Cond.h" +#include "common/Mutex.h" +#include "common/ceph_time.h" +#include "include/Context.h" +#include "include/utime.h" +#include "include/rbd/librbd.hpp" + +#include "librbd/ImageCtx.h" +#include "librbd/io/AsyncOperation.h" +#include "librbd/io/ReadResult.h" +#include "librbd/io/Types.h" + +class CephContext; + +namespace librbd { +namespace io { + + +/** + * AioCompletion is the overall completion for a single + * rbd I/O request. It may be composed of many AioObjectRequests, + * which each go to a single object. + * + * The retrying of individual requests is handled at a lower level, + * so all AioCompletion cares about is the count of outstanding + * requests. The number of expected individual requests should be + * set initially using set_request_count() prior to issuing the + * requests. This ensures that the completion will not be completed + * within the caller's thread of execution (instead via a librados + * context or via a thread pool context for cache read hits). + */ +struct AioCompletion { + typedef enum { + AIO_STATE_PENDING = 0, + AIO_STATE_CALLBACK, + AIO_STATE_COMPLETE, + } aio_state_t; + + mutable Mutex lock; + Cond cond; + aio_state_t state; + ssize_t rval; + callback_t complete_cb; + void *complete_arg; + rbd_completion_t rbd_comp; + uint32_t pending_count; ///< number of requests + int ref; + bool released; + ImageCtx *ictx; + coarse_mono_time start_time; + aio_type_t aio_type; + + ReadResult read_result; + + AsyncOperation async_op; + + xlist<AioCompletion*>::item m_xlist_item; + bool event_notify; + + template <typename T, void (T::*MF)(int)> + static void callback_adapter(completion_t cb, void *arg) { + AioCompletion *comp = reinterpret_cast<AioCompletion *>(cb); + T *t = reinterpret_cast<T *>(arg); + (t->*MF)(comp->get_return_value()); + comp->release(); + } + + static AioCompletion *create(void *cb_arg, callback_t cb_complete, + rbd_completion_t rbd_comp) { + AioCompletion *comp = new AioCompletion(); + comp->set_complete_cb(cb_arg, cb_complete); + comp->rbd_comp = (rbd_comp != nullptr ? rbd_comp : comp); + return comp; + } + + template <typename T, void (T::*MF)(int) = &T::complete> + static AioCompletion *create(T *obj) { + AioCompletion *comp = new AioCompletion(); + comp->set_complete_cb(obj, &callback_adapter<T, MF>); + comp->rbd_comp = comp; + return comp; + } + + template <typename T, void (T::*MF)(int) = &T::complete> + static AioCompletion *create_and_start(T *obj, ImageCtx *image_ctx, + aio_type_t type) { + AioCompletion *comp = create<T, MF>(obj); + comp->init_time(image_ctx, type); + comp->start_op(); + return comp; + } + + AioCompletion() : lock("AioCompletion::lock", true, false), + state(AIO_STATE_PENDING), rval(0), complete_cb(NULL), + complete_arg(NULL), rbd_comp(NULL), + pending_count(0), ref(1), released(false), ictx(NULL), + aio_type(AIO_TYPE_NONE), m_xlist_item(this), + event_notify(false) { + } + + ~AioCompletion() { + } + + int wait_for_complete(); + + void finalize(ssize_t rval); + + inline bool is_initialized(aio_type_t type) const { + Mutex::Locker locker(lock); + return ((ictx != nullptr) && (aio_type == type)); + } + inline bool is_started() const { + Mutex::Locker locker(lock); + return async_op.started(); + } + + void init_time(ImageCtx *i, aio_type_t t); + void start_op(); + void fail(int r); + + void complete(); + + void set_complete_cb(void *cb_arg, callback_t cb) { + complete_cb = cb; + complete_arg = cb_arg; + } + + void set_request_count(uint32_t num); + void add_request() { + lock.Lock(); + ceph_assert(pending_count > 0); + lock.Unlock(); + get(); + } + void complete_request(ssize_t r); + + bool is_complete(); + + ssize_t get_return_value(); + + void get() { + lock.Lock(); + ceph_assert(ref > 0); + ref++; + lock.Unlock(); + } + void release() { + lock.Lock(); + ceph_assert(!released); + released = true; + put_unlock(); + } + void put() { + lock.Lock(); + put_unlock(); + } + void put_unlock() { + ceph_assert(ref > 0); + int n = --ref; + lock.Unlock(); + if (!n) { + if (ictx != nullptr && event_notify) { + ictx->completed_reqs_lock.Lock(); + m_xlist_item.remove_myself(); + ictx->completed_reqs_lock.Unlock(); + } + delete this; + } + } + + void set_event_notify(bool s) { + Mutex::Locker l(lock); + event_notify = s; + } + + void *get_arg() { + return complete_arg; + } +}; + +class C_AioRequest : public Context { +public: + C_AioRequest(AioCompletion *completion) : m_completion(completion) { + m_completion->add_request(); + } + ~C_AioRequest() override {} + void finish(int r) override { + m_completion->complete_request(r); + } +protected: + AioCompletion *m_completion; +}; + +} // namespace io +} // namespace librbd + +#endif // CEPH_LIBRBD_IO_AIO_COMPLETION_H diff --git a/src/librbd/io/AsyncOperation.cc b/src/librbd/io/AsyncOperation.cc new file mode 100644 index 00000000..c5a3bc93 --- /dev/null +++ b/src/librbd/io/AsyncOperation.cc @@ -0,0 +1,94 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/io/AsyncOperation.h" +#include "librbd/ImageCtx.h" +#include "common/dout.h" +#include "common/WorkQueue.h" +#include "include/ceph_assert.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::io::AsyncOperation: " + +namespace librbd { +namespace io { + +namespace { + +struct C_CompleteFlushes : public Context { + ImageCtx *image_ctx; + std::list<Context *> flush_contexts; + + explicit C_CompleteFlushes(ImageCtx *image_ctx, + std::list<Context *> &&flush_contexts) + : image_ctx(image_ctx), flush_contexts(std::move(flush_contexts)) { + } + void finish(int r) override { + RWLock::RLocker owner_locker(image_ctx->owner_lock); + while (!flush_contexts.empty()) { + Context *flush_ctx = flush_contexts.front(); + flush_contexts.pop_front(); + + ldout(image_ctx->cct, 20) << "completed flush: " << flush_ctx << dendl; + flush_ctx->complete(0); + } + } +}; + +} // anonymous namespace + +void AsyncOperation::start_op(ImageCtx &image_ctx) { + ceph_assert(m_image_ctx == NULL); + m_image_ctx = &image_ctx; + + ldout(m_image_ctx->cct, 20) << this << " " << __func__ << dendl; + Mutex::Locker l(m_image_ctx->async_ops_lock); + m_image_ctx->async_ops.push_front(&m_xlist_item); +} + +void AsyncOperation::finish_op() { + ldout(m_image_ctx->cct, 20) << this << " " << __func__ << dendl; + + { + Mutex::Locker l(m_image_ctx->async_ops_lock); + xlist<AsyncOperation *>::iterator iter(&m_xlist_item); + ++iter; + ceph_assert(m_xlist_item.remove_myself()); + + // linked list stored newest -> oldest ops + if (!iter.end() && !m_flush_contexts.empty()) { + ldout(m_image_ctx->cct, 20) << "moving flush contexts to previous op: " + << *iter << dendl; + (*iter)->m_flush_contexts.insert((*iter)->m_flush_contexts.end(), + m_flush_contexts.begin(), + m_flush_contexts.end()); + return; + } + } + + if (!m_flush_contexts.empty()) { + C_CompleteFlushes *ctx = new C_CompleteFlushes(m_image_ctx, + std::move(m_flush_contexts)); + m_image_ctx->op_work_queue->queue(ctx); + } +} + +void AsyncOperation::flush(Context* on_finish) { + { + Mutex::Locker locker(m_image_ctx->async_ops_lock); + xlist<AsyncOperation *>::iterator iter(&m_xlist_item); + ++iter; + + // linked list stored newest -> oldest ops + if (!iter.end()) { + (*iter)->m_flush_contexts.push_back(on_finish); + return; + } + } + + m_image_ctx->op_work_queue->queue(on_finish); +} + +} // namespace io +} // namespace librbd diff --git a/src/librbd/io/AsyncOperation.h b/src/librbd/io/AsyncOperation.h new file mode 100644 index 00000000..b0a37c4b --- /dev/null +++ b/src/librbd/io/AsyncOperation.h @@ -0,0 +1,52 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef LIBRBD_IO_ASYNC_OPERATION_H +#define LIBRBD_IO_ASYNC_OPERATION_H + +#include "include/ceph_assert.h" +#include "include/xlist.h" +#include <list> + +class Context; + +namespace librbd { + +class ImageCtx; + +namespace io { + +class AsyncOperation { +public: + + AsyncOperation() + : m_image_ctx(NULL), m_xlist_item(this) + { + } + + ~AsyncOperation() + { + ceph_assert(!m_xlist_item.is_on_list()); + } + + inline bool started() const { + return m_xlist_item.is_on_list(); + } + + void start_op(ImageCtx &image_ctx); + void finish_op(); + + void flush(Context *on_finish); + +private: + + ImageCtx *m_image_ctx; + xlist<AsyncOperation *>::item m_xlist_item; + std::list<Context *> m_flush_contexts; + +}; + +} // namespace io +} // namespace librbd + +#endif // LIBRBD_IO_ASYNC_OPERATION_H diff --git a/src/librbd/io/CopyupRequest.cc b/src/librbd/io/CopyupRequest.cc new file mode 100644 index 00000000..c2ebb10f --- /dev/null +++ b/src/librbd/io/CopyupRequest.cc @@ -0,0 +1,625 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/io/CopyupRequest.h" +#include "common/ceph_context.h" +#include "common/dout.h" +#include "common/errno.h" +#include "common/Mutex.h" +#include "common/WorkQueue.h" +#include "librbd/AsyncObjectThrottle.h" +#include "librbd/ExclusiveLock.h" +#include "librbd/ImageCtx.h" +#include "librbd/ObjectMap.h" +#include "librbd/Utils.h" +#include "librbd/deep_copy/ObjectCopyRequest.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/io/ImageRequest.h" +#include "librbd/io/ObjectRequest.h" +#include "librbd/io/ReadResult.h" + +#include <boost/bind.hpp> +#include <boost/lambda/bind.hpp> +#include <boost/lambda/construct.hpp> + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::io::CopyupRequest: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace io { + +namespace { + +template <typename I> +class C_UpdateObjectMap : public C_AsyncObjectThrottle<I> { +public: + C_UpdateObjectMap(AsyncObjectThrottle<I> &throttle, I *image_ctx, + uint64_t object_no, uint8_t head_object_map_state, + const std::vector<uint64_t> *snap_ids, + bool first_snap_is_clean, const ZTracer::Trace &trace, + size_t snap_id_idx) + : C_AsyncObjectThrottle<I>(throttle, *image_ctx), m_object_no(object_no), + m_head_object_map_state(head_object_map_state), m_snap_ids(*snap_ids), + m_first_snap_is_clean(first_snap_is_clean), m_trace(trace), + m_snap_id_idx(snap_id_idx) + { + } + + int send() override { + auto& image_ctx = this->m_image_ctx; + ceph_assert(image_ctx.owner_lock.is_locked()); + if (image_ctx.exclusive_lock == nullptr) { + return 1; + } + ceph_assert(image_ctx.exclusive_lock->is_lock_owner()); + + RWLock::RLocker snap_locker(image_ctx.snap_lock); + if (image_ctx.object_map == nullptr) { + return 1; + } + + uint64_t snap_id = m_snap_ids[m_snap_id_idx]; + if (snap_id == CEPH_NOSNAP) { + return update_head(); + } else { + return update_snapshot(snap_id); + } + } + + int update_head() { + auto& image_ctx = this->m_image_ctx; + RWLock::WLocker object_map_locker(image_ctx.object_map_lock); + bool sent = image_ctx.object_map->template aio_update<Context>( + CEPH_NOSNAP, m_object_no, m_head_object_map_state, {}, m_trace, false, + this); + return (sent ? 0 : 1); + } + + int update_snapshot(uint64_t snap_id) { + auto& image_ctx = this->m_image_ctx; + uint8_t state = OBJECT_EXISTS; + if (image_ctx.test_features(RBD_FEATURE_FAST_DIFF, image_ctx.snap_lock) && + (m_snap_id_idx > 0 || m_first_snap_is_clean)) { + // first snapshot should be exists+dirty since it contains + // the copyup data -- later snapshots inherit the data. + state = OBJECT_EXISTS_CLEAN; + } + + RWLock::RLocker object_map_locker(image_ctx.object_map_lock); + bool sent = image_ctx.object_map->template aio_update<Context>( + snap_id, m_object_no, state, {}, m_trace, true, this); + ceph_assert(sent); + return 0; + } + +private: + uint64_t m_object_no; + uint8_t m_head_object_map_state; + const std::vector<uint64_t> &m_snap_ids; + bool m_first_snap_is_clean; + const ZTracer::Trace &m_trace; + size_t m_snap_id_idx; +}; + +} // anonymous namespace + +template <typename I> +CopyupRequest<I>::CopyupRequest(I *ictx, const std::string &oid, + uint64_t objectno, Extents &&image_extents, + const ZTracer::Trace &parent_trace) + : m_image_ctx(ictx), m_oid(oid), m_object_no(objectno), + m_image_extents(image_extents), + m_trace(util::create_trace(*m_image_ctx, "copy-up", parent_trace)), + m_lock("CopyupRequest", false, false) +{ + ceph_assert(m_image_ctx->data_ctx.is_valid()); + m_async_op.start_op(*util::get_image_ctx(m_image_ctx)); +} + +template <typename I> +CopyupRequest<I>::~CopyupRequest() { + ceph_assert(m_pending_requests.empty()); + m_async_op.finish_op(); +} + +template <typename I> +void CopyupRequest<I>::append_request(AbstractObjectWriteRequest<I> *req) { + Mutex::Locker locker(m_lock); + + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "oid=" << m_oid << ", " + << "object_request=" << req << ", " + << "append=" << m_append_request_permitted << dendl; + if (m_append_request_permitted) { + m_pending_requests.push_back(req); + } else { + m_restart_requests.push_back(req); + } +} + +template <typename I> +void CopyupRequest<I>::send() { + read_from_parent(); +} + +template <typename I> +void CopyupRequest<I>::read_from_parent() { + auto cct = m_image_ctx->cct; + RWLock::RLocker snap_locker(m_image_ctx->snap_lock); + RWLock::RLocker parent_locker(m_image_ctx->parent_lock); + + if (m_image_ctx->parent == nullptr) { + ldout(cct, 5) << "parent detached" << dendl; + + m_image_ctx->op_work_queue->queue( + util::create_context_callback< + CopyupRequest<I>, &CopyupRequest<I>::handle_read_from_parent>(this), + -ENOENT); + return; + } else if (is_deep_copy()) { + deep_copy(); + return; + } + + auto comp = AioCompletion::create_and_start< + CopyupRequest<I>, + &CopyupRequest<I>::handle_read_from_parent>( + this, util::get_image_ctx(m_image_ctx->parent), AIO_TYPE_READ); + + ldout(cct, 20) << "oid=" << m_oid << ", " + << "completion=" << comp << ", " + << "extents=" << m_image_extents + << dendl; + ImageRequest<I>::aio_read(m_image_ctx->parent, comp, + std::move(m_image_extents), + ReadResult{&m_copyup_data}, 0, m_trace); +} + +template <typename I> +void CopyupRequest<I>::handle_read_from_parent(int r) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "oid=" << m_oid << ", r=" << r << dendl; + + m_image_ctx->snap_lock.get_read(); + m_lock.Lock(); + m_copyup_is_zero = m_copyup_data.is_zero(); + m_copyup_required = is_copyup_required(); + disable_append_requests(); + + if (r < 0 && r != -ENOENT) { + m_lock.Unlock(); + m_image_ctx->snap_lock.put_read(); + + lderr(cct) << "error reading from parent: " << cpp_strerror(r) << dendl; + finish(r); + return; + } + + if (!m_copyup_required) { + m_lock.Unlock(); + m_image_ctx->snap_lock.put_read(); + + ldout(cct, 20) << "no-op, skipping" << dendl; + finish(0); + return; + } + + // copyup() will affect snapshots only if parent data is not all + // zeros. + if (!m_copyup_is_zero) { + m_snap_ids.insert(m_snap_ids.end(), m_image_ctx->snaps.rbegin(), + m_image_ctx->snaps.rend()); + } + + m_lock.Unlock(); + m_image_ctx->snap_lock.put_read(); + + update_object_maps(); +} + +template <typename I> +void CopyupRequest<I>::deep_copy() { + auto cct = m_image_ctx->cct; + ceph_assert(m_image_ctx->snap_lock.is_locked()); + ceph_assert(m_image_ctx->parent_lock.is_locked()); + ceph_assert(m_image_ctx->parent != nullptr); + + m_lock.Lock(); + m_flatten = is_copyup_required() ? true : m_image_ctx->migration_info.flatten; + m_lock.Unlock(); + + ldout(cct, 20) << "oid=" << m_oid << ", flatten=" << m_flatten << dendl; + + auto ctx = util::create_context_callback< + CopyupRequest<I>, &CopyupRequest<I>::handle_deep_copy>(this); + auto req = deep_copy::ObjectCopyRequest<I>::create( + m_image_ctx->parent, m_image_ctx, 0, 0, + m_image_ctx->migration_info.snap_map, m_object_no, m_flatten, ctx); + + req->send(); +} + +template <typename I> +void CopyupRequest<I>::handle_deep_copy(int r) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "oid=" << m_oid << ", r=" << r << dendl; + + m_image_ctx->snap_lock.get_read(); + m_lock.Lock(); + m_copyup_required = is_copyup_required(); + if (r == -ENOENT && !m_flatten && m_copyup_required) { + m_lock.Unlock(); + m_image_ctx->snap_lock.put_read(); + + ldout(cct, 10) << "restart deep-copy with flatten" << dendl; + send(); + return; + } + + disable_append_requests(); + + if (r < 0 && r != -ENOENT) { + m_lock.Unlock(); + m_image_ctx->snap_lock.put_read(); + + lderr(cct) << "error encountered during deep-copy: " << cpp_strerror(r) + << dendl; + finish(r); + return; + } + + if (!m_copyup_required && !is_update_object_map_required(r)) { + m_lock.Unlock(); + m_image_ctx->snap_lock.put_read(); + + if (r == -ENOENT) { + r = 0; + } + + ldout(cct, 20) << "skipping" << dendl; + finish(r); + return; + } + + // For deep-copy, copyup() will never affect snapshots. However, + // this state machine is responsible for updating object maps for + // snapshots that have been created on destination image after + // migration started. + if (r != -ENOENT) { + compute_deep_copy_snap_ids(); + } + + m_lock.Unlock(); + m_image_ctx->snap_lock.put_read(); + + update_object_maps(); +} + +template <typename I> +void CopyupRequest<I>::update_object_maps() { + RWLock::RLocker owner_locker(m_image_ctx->owner_lock); + RWLock::RLocker snap_locker(m_image_ctx->snap_lock); + if (m_image_ctx->object_map == nullptr) { + snap_locker.unlock(); + owner_locker.unlock(); + + copyup(); + return; + } + + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "oid=" << m_oid << dendl; + + bool copy_on_read = m_pending_requests.empty(); + uint8_t head_object_map_state = OBJECT_EXISTS; + if (copy_on_read && !m_snap_ids.empty() && + m_image_ctx->test_features(RBD_FEATURE_FAST_DIFF, + m_image_ctx->snap_lock)) { + // HEAD is non-dirty since data is tied to first snapshot + head_object_map_state = OBJECT_EXISTS_CLEAN; + } + + auto r_it = m_pending_requests.rbegin(); + if (r_it != m_pending_requests.rend()) { + // last write-op determines the final object map state + head_object_map_state = (*r_it)->get_pre_write_object_map_state(); + } + + RWLock::WLocker object_map_locker(m_image_ctx->object_map_lock); + if ((*m_image_ctx->object_map)[m_object_no] != head_object_map_state) { + // (maybe) need to update the HEAD object map state + m_snap_ids.push_back(CEPH_NOSNAP); + } + object_map_locker.unlock(); + snap_locker.unlock(); + + ceph_assert(m_image_ctx->exclusive_lock->is_lock_owner()); + typename AsyncObjectThrottle<I>::ContextFactory context_factory( + boost::lambda::bind(boost::lambda::new_ptr<C_UpdateObjectMap<I>>(), + boost::lambda::_1, m_image_ctx, m_object_no, head_object_map_state, + &m_snap_ids, m_first_snap_is_clean, m_trace, boost::lambda::_2)); + auto ctx = util::create_context_callback< + CopyupRequest<I>, &CopyupRequest<I>::handle_update_object_maps>(this); + auto throttle = new AsyncObjectThrottle<I>( + nullptr, *m_image_ctx, context_factory, ctx, nullptr, 0, m_snap_ids.size()); + throttle->start_ops( + m_image_ctx->config.template get_val<uint64_t>("rbd_concurrent_management_ops")); +} + +template <typename I> +void CopyupRequest<I>::handle_update_object_maps(int r) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "oid=" << m_oid << ", r=" << r << dendl; + + if (r < 0) { + lderr(m_image_ctx->cct) << "failed to update object map: " + << cpp_strerror(r) << dendl; + + finish(r); + return; + } + + copyup(); +} + +template <typename I> +void CopyupRequest<I>::copyup() { + auto cct = m_image_ctx->cct; + m_image_ctx->snap_lock.get_read(); + auto snapc = m_image_ctx->snapc; + m_image_ctx->snap_lock.put_read(); + + m_lock.Lock(); + if (!m_copyup_required) { + m_lock.Unlock(); + + ldout(cct, 20) << "skipping copyup" << dendl; + finish(0); + return; + } + + ldout(cct, 20) << "oid=" << m_oid << dendl; + + bool copy_on_read = m_pending_requests.empty(); + bool deep_copyup = !snapc.snaps.empty() && !m_copyup_is_zero; + if (m_copyup_is_zero) { + m_copyup_data.clear(); + } + + int r; + librados::ObjectWriteOperation copyup_op; + if (copy_on_read || deep_copyup) { + copyup_op.exec("rbd", "copyup", m_copyup_data); + ObjectRequest<I>::add_write_hint(*m_image_ctx, ©up_op); + ++m_pending_copyups; + } + + librados::ObjectWriteOperation write_op; + if (!copy_on_read) { + if (!deep_copyup) { + write_op.exec("rbd", "copyup", m_copyup_data); + ObjectRequest<I>::add_write_hint(*m_image_ctx, &write_op); + } + + // merge all pending write ops into this single RADOS op + for (auto req : m_pending_requests) { + ldout(cct, 20) << "add_copyup_ops " << req << dendl; + req->add_copyup_ops(&write_op); + } + + if (write_op.size() > 0) { + ++m_pending_copyups; + } + } + m_lock.Unlock(); + + // issue librados ops at the end to simplify test cases + std::vector<librados::snap_t> snaps; + if (copyup_op.size() > 0) { + // send only the copyup request with a blank snapshot context so that + // all snapshots are detected from the parent for this object. If + // this is a CoW request, a second request will be created for the + // actual modification. + ldout(cct, 20) << "copyup with empty snapshot context" << dendl; + + auto comp = util::create_rados_callback< + CopyupRequest<I>, &CopyupRequest<I>::handle_copyup>(this); + r = m_image_ctx->data_ctx.aio_operate( + m_oid, comp, ©up_op, 0, snaps, + (m_trace.valid() ? m_trace.get_info() : nullptr)); + ceph_assert(r == 0); + comp->release(); + } + + if (write_op.size() > 0) { + // compare-and-write doesn't add any write ops (copyup+cmpext+write + // can't be executed in the same RADOS op because, unless the object + // was already present in the clone, cmpext wouldn't see it) + ldout(cct, 20) << (!deep_copyup && write_op.size() > 2 ? + "copyup + ops" : !deep_copyup ? "copyup" : "ops") + << " with current snapshot context" << dendl; + + snaps.insert(snaps.end(), snapc.snaps.begin(), snapc.snaps.end()); + auto comp = util::create_rados_callback< + CopyupRequest<I>, &CopyupRequest<I>::handle_copyup>(this); + r = m_image_ctx->data_ctx.aio_operate( + m_oid, comp, &write_op, snapc.seq, snaps, + (m_trace.valid() ? m_trace.get_info() : nullptr)); + ceph_assert(r == 0); + comp->release(); + } +} + +template <typename I> +void CopyupRequest<I>::handle_copyup(int r) { + auto cct = m_image_ctx->cct; + unsigned pending_copyups; + { + Mutex::Locker locker(m_lock); + ceph_assert(m_pending_copyups > 0); + pending_copyups = --m_pending_copyups; + } + + ldout(cct, 20) << "oid=" << m_oid << ", " << "r=" << r << ", " + << "pending=" << pending_copyups << dendl; + + if (r < 0 && r != -ENOENT) { + lderr(cct) << "failed to copyup object: " << cpp_strerror(r) << dendl; + complete_requests(false, r); + } + + if (pending_copyups == 0) { + finish(0); + } +} + +template <typename I> +void CopyupRequest<I>::finish(int r) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "oid=" << m_oid << ", r=" << r << dendl; + + complete_requests(true, r); + delete this; +} + +template <typename I> +void CopyupRequest<I>::complete_requests(bool override_restart_retval, int r) { + auto cct = m_image_ctx->cct; + remove_from_list(); + + while (!m_pending_requests.empty()) { + auto it = m_pending_requests.begin(); + auto req = *it; + ldout(cct, 20) << "completing request " << req << dendl; + req->handle_copyup(r); + m_pending_requests.erase(it); + } + + if (override_restart_retval) { + r = -ERESTART; + } + + while (!m_restart_requests.empty()) { + auto it = m_restart_requests.begin(); + auto req = *it; + ldout(cct, 20) << "restarting request " << req << dendl; + req->handle_copyup(r); + m_restart_requests.erase(it); + } +} + +template <typename I> +void CopyupRequest<I>::disable_append_requests() { + ceph_assert(m_lock.is_locked()); + m_append_request_permitted = false; +} + +template <typename I> +void CopyupRequest<I>::remove_from_list() { + Mutex::Locker copyup_list_locker(m_image_ctx->copyup_list_lock); + + auto it = m_image_ctx->copyup_list.find(m_object_no); + if (it != m_image_ctx->copyup_list.end()) { + m_image_ctx->copyup_list.erase(it); + } +} + +template <typename I> +bool CopyupRequest<I>::is_copyup_required() { + ceph_assert(m_lock.is_locked()); + + bool copy_on_read = m_pending_requests.empty(); + if (copy_on_read) { + // always force a copyup if CoR enabled + return true; + } + + if (!m_copyup_is_zero) { + return true; + } + + for (auto req : m_pending_requests) { + if (!req->is_empty_write_op()) { + return true; + } + } + return false; +} + +template <typename I> +bool CopyupRequest<I>::is_deep_copy() const { + ceph_assert(m_image_ctx->snap_lock.is_locked()); + return !m_image_ctx->migration_info.empty(); +} + +template <typename I> +bool CopyupRequest<I>::is_update_object_map_required(int r) { + ceph_assert(m_image_ctx->snap_lock.is_locked()); + + if (r < 0) { + return false; + } + + if (m_image_ctx->object_map == nullptr) { + return false; + } + + if (m_image_ctx->migration_info.empty()) { + // migration might have completed while IO was in-flight, + // assume worst-case and perform an object map update + return true; + } + + auto it = m_image_ctx->migration_info.snap_map.find(CEPH_NOSNAP); + ceph_assert(it != m_image_ctx->migration_info.snap_map.end()); + return it->second[0] != CEPH_NOSNAP; +} + +template <typename I> +void CopyupRequest<I>::compute_deep_copy_snap_ids() { + ceph_assert(m_image_ctx->snap_lock.is_locked()); + + // don't copy ids for the snaps updated by object deep copy or + // that don't overlap + std::set<uint64_t> deep_copied; + for (auto &it : m_image_ctx->migration_info.snap_map) { + if (it.first != CEPH_NOSNAP) { + deep_copied.insert(it.second.front()); + } + } + + RWLock::RLocker parent_locker(m_image_ctx->parent_lock); + std::copy_if(m_image_ctx->snaps.rbegin(), m_image_ctx->snaps.rend(), + std::back_inserter(m_snap_ids), + [this, cct=m_image_ctx->cct, &deep_copied](uint64_t snap_id) { + if (deep_copied.count(snap_id)) { + m_first_snap_is_clean = true; + return false; + } + + uint64_t parent_overlap = 0; + int r = m_image_ctx->get_parent_overlap(snap_id, &parent_overlap); + if (r < 0) { + ldout(cct, 5) << "failed getting parent overlap for snap_id: " + << snap_id << ": " << cpp_strerror(r) << dendl; + } + if (parent_overlap == 0) { + return false; + } + std::vector<std::pair<uint64_t, uint64_t>> extents; + Striper::extent_to_file(cct, &m_image_ctx->layout, + m_object_no, 0, + m_image_ctx->layout.object_size, + extents); + auto overlap = m_image_ctx->prune_parent_extents( + extents, parent_overlap); + return overlap > 0; + }); +} + +} // namespace io +} // namespace librbd + +template class librbd::io::CopyupRequest<librbd::ImageCtx>; diff --git a/src/librbd/io/CopyupRequest.h b/src/librbd/io/CopyupRequest.h new file mode 100644 index 00000000..e4b3a2e7 --- /dev/null +++ b/src/librbd/io/CopyupRequest.h @@ -0,0 +1,135 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_IO_COPYUP_REQUEST_H +#define CEPH_LIBRBD_IO_COPYUP_REQUEST_H + +#include "include/int_types.h" +#include "include/rados/librados.hpp" +#include "include/buffer.h" +#include "common/Mutex.h" +#include "common/zipkin_trace.h" +#include "librbd/io/AsyncOperation.h" +#include "librbd/io/Types.h" + +#include <string> +#include <vector> + +namespace ZTracer { struct Trace; } + +namespace librbd { + +struct ImageCtx; + +namespace io { + +template <typename I> class AbstractObjectWriteRequest; + +template <typename ImageCtxT = librbd::ImageCtx> +class CopyupRequest { +public: + static CopyupRequest* create(ImageCtxT *ictx, const std::string &oid, + uint64_t objectno, Extents &&image_extents, + const ZTracer::Trace &parent_trace) { + return new CopyupRequest(ictx, oid, objectno, std::move(image_extents), + parent_trace); + } + + CopyupRequest(ImageCtxT *ictx, const std::string &oid, uint64_t objectno, + Extents &&image_extents, const ZTracer::Trace &parent_trace); + ~CopyupRequest(); + + void append_request(AbstractObjectWriteRequest<ImageCtxT> *req); + + void send(); + +private: + /** + * Copyup requests go through the following state machine to read from the + * parent image, update the object map, and copyup the object: + * + * + * @verbatim + * + * <start> + * | + * /---------/ \---------\ + * | | + * v v + * READ_FROM_PARENT DEEP_COPY + * | | + * \---------\ /---------/ + * | + * v (skip if not needed) + * UPDATE_OBJECT_MAPS + * | + * v (skip if not needed) + * COPYUP + * | + * v + * <finish> + * + * @endverbatim + * + * The OBJECT_MAP state is skipped if the object map isn't enabled or if + * an object map update isn't required. The COPYUP state is skipped if + * no data was read from the parent *and* there are no additional ops. + */ + + typedef std::vector<AbstractObjectWriteRequest<ImageCtxT> *> WriteRequests; + + ImageCtxT *m_image_ctx; + std::string m_oid; + uint64_t m_object_no; + Extents m_image_extents; + ZTracer::Trace m_trace; + + bool m_flatten = false; + bool m_copyup_required = true; + bool m_copyup_is_zero = true; + + ceph::bufferlist m_copyup_data; + + AsyncOperation m_async_op; + + std::vector<uint64_t> m_snap_ids; + bool m_first_snap_is_clean = false; + + Mutex m_lock; + WriteRequests m_pending_requests; + unsigned m_pending_copyups = 0; + + WriteRequests m_restart_requests; + bool m_append_request_permitted = true; + + void read_from_parent(); + void handle_read_from_parent(int r); + + void deep_copy(); + void handle_deep_copy(int r); + + void update_object_maps(); + void handle_update_object_maps(int r); + + void copyup(); + void handle_copyup(int r); + + void finish(int r); + void complete_requests(bool override_restart_retval, int r); + + void disable_append_requests(); + void remove_from_list(); + + bool is_copyup_required(); + bool is_update_object_map_required(int r); + bool is_deep_copy() const; + + void compute_deep_copy_snap_ids(); +}; + +} // namespace io +} // namespace librbd + +extern template class librbd::io::CopyupRequest<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_IO_COPYUP_REQUEST_H diff --git a/src/librbd/io/ImageDispatchSpec.cc b/src/librbd/io/ImageDispatchSpec.cc new file mode 100644 index 00000000..2c405d74 --- /dev/null +++ b/src/librbd/io/ImageDispatchSpec.cc @@ -0,0 +1,154 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/io/ImageDispatchSpec.h" +#include "librbd/ImageCtx.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/io/ImageRequest.h" +#include <boost/variant.hpp> + +namespace librbd { +namespace io { + +template <typename I> +struct ImageDispatchSpec<I>::SendVisitor + : public boost::static_visitor<void> { + ImageDispatchSpec* spec; + + explicit SendVisitor(ImageDispatchSpec* spec) + : spec(spec) { + } + + void operator()(Read& read) const { + ImageRequest<I>::aio_read( + &spec->m_image_ctx, spec->m_aio_comp, std::move(spec->m_image_extents), + std::move(read.read_result), spec->m_op_flags, spec->m_parent_trace); + } + + void operator()(Discard& discard) const { + ImageRequest<I>::aio_discard( + &spec->m_image_ctx, spec->m_aio_comp, std::move(spec->m_image_extents), + discard.discard_granularity_bytes, spec->m_parent_trace); + } + + void operator()(Write& write) const { + ImageRequest<I>::aio_write( + &spec->m_image_ctx, spec->m_aio_comp, std::move(spec->m_image_extents), + std::move(write.bl), spec->m_op_flags, spec->m_parent_trace); + } + + void operator()(WriteSame& write_same) const { + ImageRequest<I>::aio_writesame( + &spec->m_image_ctx, spec->m_aio_comp, std::move(spec->m_image_extents), + std::move(write_same.bl), spec->m_op_flags, spec->m_parent_trace); + } + + void operator()(CompareAndWrite& compare_and_write) const { + ImageRequest<I>::aio_compare_and_write( + &spec->m_image_ctx, spec->m_aio_comp, std::move(spec->m_image_extents), + std::move(compare_and_write.cmp_bl), std::move(compare_and_write.bl), + compare_and_write.mismatch_offset, spec->m_op_flags, + spec->m_parent_trace); + } + + void operator()(Flush& flush) const { + ImageRequest<I>::aio_flush( + &spec->m_image_ctx, spec->m_aio_comp, flush.flush_source, + spec->m_parent_trace); + } +}; + +template <typename I> +struct ImageDispatchSpec<I>::IsWriteOpVisitor + : public boost::static_visitor<bool> { + bool operator()(const Read&) const { + return false; + } + + template <typename T> + bool operator()(const T&) const { + return true; + } +}; + +template <typename I> +struct ImageDispatchSpec<I>::TokenRequestedVisitor + : public boost::static_visitor<uint64_t> { + ImageDispatchSpec* spec; + uint64_t flag; + uint64_t *tokens; + + TokenRequestedVisitor(ImageDispatchSpec* spec, uint64_t _flag, + uint64_t *tokens) + : spec(spec), flag(_flag), tokens(tokens) { + } + + uint64_t operator()(const Read&) const { + if (flag & RBD_QOS_WRITE_MASK) { + *tokens = 0; + return false; + } + + *tokens = (flag & RBD_QOS_BPS_MASK) ? spec->extents_length() : 1; + return true; + } + + uint64_t operator()(const Flush&) const { + *tokens = 0; + return true; + } + + template <typename T> + uint64_t operator()(const T&) const { + if (flag & RBD_QOS_READ_MASK) { + *tokens = 0; + return false; + } + + *tokens = (flag & RBD_QOS_BPS_MASK) ? spec->extents_length() : 1; + return true; + } +}; + +template <typename I> +void ImageDispatchSpec<I>::send() { + boost::apply_visitor(SendVisitor{this}, m_request); +} + +template <typename I> +void ImageDispatchSpec<I>::fail(int r) { + m_aio_comp->get(); + m_aio_comp->fail(r); +} + +template <typename I> +uint64_t ImageDispatchSpec<I>::extents_length() { + uint64_t length = 0; + auto &extents = this->m_image_extents; + + for (auto &extent : extents) { + length += extent.second; + } + return length; +} + +template <typename I> +bool ImageDispatchSpec<I>::is_write_op() const { + return boost::apply_visitor(IsWriteOpVisitor(), m_request); +} + +template <typename I> +bool ImageDispatchSpec<I>::tokens_requested(uint64_t flag, uint64_t *tokens) { + return boost::apply_visitor(TokenRequestedVisitor{this, flag, tokens}, + m_request); +} + +template <typename I> +void ImageDispatchSpec<I>::start_op() { + m_aio_comp->start_op(); +} + +} // namespace io +} // namespace librbd + +template class librbd::io::ImageDispatchSpec<librbd::ImageCtx>; diff --git a/src/librbd/io/ImageDispatchSpec.h b/src/librbd/io/ImageDispatchSpec.h new file mode 100644 index 00000000..93c53a0f --- /dev/null +++ b/src/librbd/io/ImageDispatchSpec.h @@ -0,0 +1,182 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_IO_IMAGE_DISPATCH_SPEC_H +#define CEPH_LIBRBD_IO_IMAGE_DISPATCH_SPEC_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include "common/zipkin_trace.h" +#include "librbd/io/Types.h" +#include "librbd/io/ReadResult.h" +#include <boost/variant/variant.hpp> + +namespace librbd { + +class ImageCtx; + +namespace io { + +class AioCompletion; + +template <typename ImageCtxT = ImageCtx> +class ImageDispatchSpec { +public: + struct Read { + ReadResult read_result; + + Read(ReadResult &&read_result) : read_result(std::move(read_result)) { + } + }; + + struct Discard { + uint32_t discard_granularity_bytes; + + Discard(uint32_t discard_granularity_bytes) + : discard_granularity_bytes(discard_granularity_bytes) { + } + }; + + struct Write { + bufferlist bl; + + Write(bufferlist&& bl) : bl(std::move(bl)) { + } + }; + + struct WriteSame { + bufferlist bl; + + WriteSame(bufferlist&& bl) : bl(std::move(bl)) { + } + }; + + struct CompareAndWrite { + bufferlist cmp_bl; + bufferlist bl; + uint64_t *mismatch_offset; + + CompareAndWrite(bufferlist&& cmp_bl, bufferlist&& bl, + uint64_t *mismatch_offset) + : cmp_bl(std::move(cmp_bl)), bl(std::move(bl)), + mismatch_offset(mismatch_offset) { + } + }; + + struct Flush { + FlushSource flush_source; + + Flush(FlushSource flush_source) : flush_source(flush_source) { + } + }; + + static ImageDispatchSpec* create_read_request( + ImageCtxT &image_ctx, AioCompletion *aio_comp, Extents &&image_extents, + ReadResult &&read_result, int op_flags, + const ZTracer::Trace &parent_trace) { + return new ImageDispatchSpec(image_ctx, aio_comp, + std::move(image_extents), + Read{std::move(read_result)}, + op_flags, parent_trace); + } + + static ImageDispatchSpec* create_discard_request( + ImageCtxT &image_ctx, AioCompletion *aio_comp, uint64_t off, uint64_t len, + uint32_t discard_granularity_bytes, const ZTracer::Trace &parent_trace) { + return new ImageDispatchSpec(image_ctx, aio_comp, {{off, len}}, + Discard{discard_granularity_bytes}, + 0, parent_trace); + } + + static ImageDispatchSpec* create_write_request( + ImageCtxT &image_ctx, AioCompletion *aio_comp, Extents &&image_extents, + bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace) { + return new ImageDispatchSpec(image_ctx, aio_comp, std::move(image_extents), + Write{std::move(bl)}, op_flags, parent_trace); + } + + static ImageDispatchSpec* create_write_same_request( + ImageCtxT &image_ctx, AioCompletion *aio_comp, uint64_t off, uint64_t len, + bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace) { + return new ImageDispatchSpec(image_ctx, aio_comp, {{off, len}}, + WriteSame{std::move(bl)}, op_flags, + parent_trace); + } + + static ImageDispatchSpec* create_compare_and_write_request( + ImageCtxT &image_ctx, AioCompletion *aio_comp, Extents &&image_extents, + bufferlist &&cmp_bl, bufferlist &&bl, uint64_t *mismatch_offset, + int op_flags, const ZTracer::Trace &parent_trace) { + return new ImageDispatchSpec(image_ctx, aio_comp, + std::move(image_extents), + CompareAndWrite{std::move(cmp_bl), + std::move(bl), + mismatch_offset}, + op_flags, parent_trace); + } + + static ImageDispatchSpec* create_flush_request( + ImageCtxT &image_ctx, AioCompletion *aio_comp, + FlushSource flush_source, const ZTracer::Trace &parent_trace) { + return new ImageDispatchSpec(image_ctx, aio_comp, {}, Flush{flush_source}, + 0, parent_trace); + } + + void send(); + void fail(int r); + + bool is_write_op() const; + + void start_op(); + + bool tokens_requested(uint64_t flag, uint64_t *tokens); + + bool was_throttled(uint64_t flag) { + return m_throttled_flag & flag; + } + + void set_throttled(uint64_t flag) { + m_throttled_flag |= flag; + } + + bool were_all_throttled() { + return (m_throttled_flag & RBD_QOS_MASK) == RBD_QOS_MASK; + } + +private: + typedef boost::variant<Read, + Discard, + Write, + WriteSame, + CompareAndWrite, + Flush> Request; + + struct SendVisitor; + struct IsWriteOpVisitor; + struct TokenRequestedVisitor; + + ImageDispatchSpec(ImageCtxT& image_ctx, AioCompletion* aio_comp, + Extents&& image_extents, Request&& request, + int op_flags, const ZTracer::Trace& parent_trace) + : m_image_ctx(image_ctx), m_aio_comp(aio_comp), + m_image_extents(std::move(image_extents)), m_request(std::move(request)), + m_op_flags(op_flags), m_parent_trace(parent_trace) { + } + + ImageCtxT& m_image_ctx; + AioCompletion* m_aio_comp; + Extents m_image_extents; + Request m_request; + int m_op_flags; + ZTracer::Trace m_parent_trace; + std::atomic<uint64_t> m_throttled_flag = 0; + + uint64_t extents_length(); +}; + +} // namespace io +} // namespace librbd + +extern template class librbd::io::ImageDispatchSpec<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_IO_IMAGE_DISPATCH_SPEC_H diff --git a/src/librbd/io/ImageRequest.cc b/src/librbd/io/ImageRequest.cc new file mode 100644 index 00000000..d6eb29fe --- /dev/null +++ b/src/librbd/io/ImageRequest.cc @@ -0,0 +1,824 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/io/ImageRequest.h" +#include "librbd/ImageCtx.h" +#include "librbd/internal.h" +#include "librbd/Journal.h" +#include "librbd/Types.h" +#include "librbd/Utils.h" +#include "librbd/cache/ImageCache.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/io/AsyncOperation.h" +#include "librbd/io/ObjectDispatchInterface.h" +#include "librbd/io/ObjectDispatchSpec.h" +#include "librbd/io/ObjectDispatcher.h" +#include "librbd/io/Utils.h" +#include "librbd/journal/Types.h" +#include "include/rados/librados.hpp" +#include "common/perf_counters.h" +#include "common/WorkQueue.h" +#include "osdc/Striper.h" +#include <algorithm> +#include <functional> + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::io::ImageRequest: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace io { + +using librbd::util::get_image_ctx; + +namespace { + +template <typename I> +struct C_UpdateTimestamp : public Context { +public: + I& m_image_ctx; + bool m_modify; // if modify set to 'true', modify timestamp is updated, + // access timestamp otherwise + AsyncOperation m_async_op; + + C_UpdateTimestamp(I& ictx, bool m) : m_image_ctx(ictx), m_modify(m) { + m_async_op.start_op(*get_image_ctx(&m_image_ctx)); + } + ~C_UpdateTimestamp() override { + m_async_op.finish_op(); + } + + void send() { + librados::ObjectWriteOperation op; + if (m_modify) { + cls_client::set_modify_timestamp(&op); + } else { + cls_client::set_access_timestamp(&op); + } + + auto comp = librbd::util::create_rados_callback(this); + int r = m_image_ctx.md_ctx.aio_operate(m_image_ctx.header_oid, comp, &op); + ceph_assert(r == 0); + comp->release(); + } + + void finish(int r) override { + // ignore errors updating timestamp + } +}; + +bool should_update_timestamp(const utime_t& now, const utime_t& timestamp, + uint64_t interval) { + return (interval && + (static_cast<uint64_t>(now.sec()) >= interval + timestamp)); +} + +} // anonymous namespace + +template <typename I> +void ImageRequest<I>::aio_read(I *ictx, AioCompletion *c, + Extents &&image_extents, + ReadResult &&read_result, int op_flags, + const ZTracer::Trace &parent_trace) { + ImageReadRequest<I> req(*ictx, c, std::move(image_extents), + std::move(read_result), op_flags, parent_trace); + req.send(); +} + +template <typename I> +void ImageRequest<I>::aio_write(I *ictx, AioCompletion *c, + Extents &&image_extents, bufferlist &&bl, + int op_flags, + const ZTracer::Trace &parent_trace) { + ImageWriteRequest<I> req(*ictx, c, std::move(image_extents), std::move(bl), + op_flags, parent_trace); + req.send(); +} + +template <typename I> +void ImageRequest<I>::aio_discard(I *ictx, AioCompletion *c, + Extents &&image_extents, + uint32_t discard_granularity_bytes, + const ZTracer::Trace &parent_trace) { + ImageDiscardRequest<I> req(*ictx, c, std::move(image_extents), + discard_granularity_bytes, parent_trace); + req.send(); +} + +template <typename I> +void ImageRequest<I>::aio_flush(I *ictx, AioCompletion *c, + FlushSource flush_source, + const ZTracer::Trace &parent_trace) { + ImageFlushRequest<I> req(*ictx, c, flush_source, parent_trace); + req.send(); +} + +template <typename I> +void ImageRequest<I>::aio_writesame(I *ictx, AioCompletion *c, + Extents &&image_extents, + bufferlist &&bl, int op_flags, + const ZTracer::Trace &parent_trace) { + ImageWriteSameRequest<I> req(*ictx, c, std::move(image_extents), + std::move(bl), op_flags, parent_trace); + req.send(); +} + +template <typename I> +void ImageRequest<I>::aio_compare_and_write(I *ictx, AioCompletion *c, + Extents &&image_extents, + bufferlist &&cmp_bl, + bufferlist &&bl, + uint64_t *mismatch_offset, + int op_flags, + const ZTracer::Trace &parent_trace) { + ImageCompareAndWriteRequest<I> req(*ictx, c, std::move(image_extents), + std::move(cmp_bl), std::move(bl), + mismatch_offset, op_flags, parent_trace); + req.send(); +} + + +template <typename I> +void ImageRequest<I>::send() { + I &image_ctx = this->m_image_ctx; + ceph_assert(m_aio_comp->is_initialized(get_aio_type())); + ceph_assert(m_aio_comp->is_started()); + + CephContext *cct = image_ctx.cct; + AioCompletion *aio_comp = this->m_aio_comp; + ldout(cct, 20) << get_request_type() << ": ictx=" << &image_ctx << ", " + << "completion=" << aio_comp << dendl; + + aio_comp->get(); + int r = clip_request(); + if (r < 0) { + m_aio_comp->fail(r); + return; + } + + if (m_bypass_image_cache || m_image_ctx.image_cache == nullptr) { + update_timestamp(); + send_request(); + } else { + send_image_cache_request(); + } +} + +template <typename I> +int ImageRequest<I>::clip_request() { + RWLock::RLocker snap_locker(m_image_ctx.snap_lock); + for (auto &image_extent : m_image_extents) { + auto clip_len = image_extent.second; + int r = clip_io(get_image_ctx(&m_image_ctx), image_extent.first, &clip_len); + if (r < 0) { + return r; + } + + image_extent.second = clip_len; + } + return 0; +} + +template <typename I> +void ImageRequest<I>::update_timestamp() { + bool modify = (get_aio_type() != AIO_TYPE_READ); + uint64_t update_interval; + if (modify) { + update_interval = m_image_ctx.mtime_update_interval; + } else { + update_interval = m_image_ctx.atime_update_interval; + } + + if (update_interval == 0) { + return; + } + + utime_t (I::*get_timestamp_fn)() const; + void (I::*set_timestamp_fn)(utime_t); + if (modify) { + get_timestamp_fn = &I::get_modify_timestamp; + set_timestamp_fn = &I::set_modify_timestamp; + } else { + get_timestamp_fn = &I::get_access_timestamp; + set_timestamp_fn = &I::set_access_timestamp; + } + + utime_t ts = ceph_clock_now(); + { + RWLock::RLocker timestamp_locker(m_image_ctx.timestamp_lock); + if(!should_update_timestamp(ts, std::invoke(get_timestamp_fn, m_image_ctx), + update_interval)) { + return; + } + } + + { + RWLock::WLocker timestamp_locker(m_image_ctx.timestamp_lock); + bool update = should_update_timestamp( + ts, std::invoke(get_timestamp_fn, m_image_ctx), update_interval); + if (!update) { + return; + } + + std::invoke(set_timestamp_fn, m_image_ctx, ts); + } + + // TODO we fire and forget this outside the IO path to prevent + // potential race conditions with librbd client IO callbacks + // between different threads (e.g. librados and object cacher) + ldout(m_image_ctx.cct, 10) << get_request_type() << dendl; + auto req = new C_UpdateTimestamp<I>(m_image_ctx, modify); + req->send(); +} + +template <typename I> +ImageReadRequest<I>::ImageReadRequest(I &image_ctx, AioCompletion *aio_comp, + Extents &&image_extents, + ReadResult &&read_result, int op_flags, + const ZTracer::Trace &parent_trace) + : ImageRequest<I>(image_ctx, aio_comp, std::move(image_extents), "read", + parent_trace), + m_op_flags(op_flags) { + aio_comp->read_result = std::move(read_result); +} + +template <typename I> +int ImageReadRequest<I>::clip_request() { + int r = ImageRequest<I>::clip_request(); + if (r < 0) { + return r; + } + + uint64_t buffer_length = 0; + auto &image_extents = this->m_image_extents; + for (auto &image_extent : image_extents) { + buffer_length += image_extent.second; + } + this->m_aio_comp->read_result.set_clip_length(buffer_length); + return 0; +} + +template <typename I> +void ImageReadRequest<I>::send_request() { + I &image_ctx = this->m_image_ctx; + CephContext *cct = image_ctx.cct; + + auto &image_extents = this->m_image_extents; + if (image_ctx.cache && image_ctx.readahead_max_bytes > 0 && + !(m_op_flags & LIBRADOS_OP_FLAG_FADVISE_RANDOM)) { + readahead(get_image_ctx(&image_ctx), image_extents); + } + + AioCompletion *aio_comp = this->m_aio_comp; + librados::snap_t snap_id; + map<object_t,vector<ObjectExtent> > object_extents; + uint64_t buffer_ofs = 0; + { + // prevent image size from changing between computing clip and recording + // pending async operation + RWLock::RLocker snap_locker(image_ctx.snap_lock); + snap_id = image_ctx.snap_id; + + // map image extents to object extents + for (auto &extent : image_extents) { + if (extent.second == 0) { + continue; + } + + Striper::file_to_extents(cct, image_ctx.format_string, &image_ctx.layout, + extent.first, extent.second, 0, object_extents, + buffer_ofs); + buffer_ofs += extent.second; + } + } + + // pre-calculate the expected number of read requests + uint32_t request_count = 0; + for (auto &object_extent : object_extents) { + request_count += object_extent.second.size(); + } + aio_comp->set_request_count(request_count); + + // issue the requests + for (auto &object_extent : object_extents) { + for (auto &extent : object_extent.second) { + ldout(cct, 20) << "oid " << extent.oid << " " << extent.offset << "~" + << extent.length << " from " << extent.buffer_extents + << dendl; + + auto req_comp = new io::ReadResult::C_ObjectReadRequest( + aio_comp, extent.offset, extent.length, + std::move(extent.buffer_extents)); + auto req = ObjectDispatchSpec::create_read( + &image_ctx, OBJECT_DISPATCH_LAYER_NONE, extent.oid.name, + extent.objectno, extent.offset, extent.length, snap_id, m_op_flags, + this->m_trace, &req_comp->bl, &req_comp->extent_map, req_comp); + req->send(); + } + } + + aio_comp->put(); + + image_ctx.perfcounter->inc(l_librbd_rd); + image_ctx.perfcounter->inc(l_librbd_rd_bytes, buffer_ofs); +} + +template <typename I> +void ImageReadRequest<I>::send_image_cache_request() { + I &image_ctx = this->m_image_ctx; + ceph_assert(image_ctx.image_cache != nullptr); + + AioCompletion *aio_comp = this->m_aio_comp; + aio_comp->set_request_count(1); + + auto *req_comp = new io::ReadResult::C_ImageReadRequest( + aio_comp, this->m_image_extents); + image_ctx.image_cache->aio_read(std::move(this->m_image_extents), + &req_comp->bl, m_op_flags, + req_comp); +} + +template <typename I> +void AbstractImageWriteRequest<I>::send_request() { + I &image_ctx = this->m_image_ctx; + CephContext *cct = image_ctx.cct; + + RWLock::RLocker md_locker(image_ctx.md_lock); + + bool journaling = false; + + AioCompletion *aio_comp = this->m_aio_comp; + uint64_t clip_len = 0; + ObjectExtents object_extents; + ::SnapContext snapc; + { + // prevent image size from changing between computing clip and recording + // pending async operation + RWLock::RLocker snap_locker(image_ctx.snap_lock); + if (image_ctx.snap_id != CEPH_NOSNAP || image_ctx.read_only) { + aio_comp->fail(-EROFS); + return; + } + + for (auto &extent : this->m_image_extents) { + if (extent.second == 0) { + continue; + } + + // map to object extents + Striper::file_to_extents(cct, image_ctx.format_string, &image_ctx.layout, + extent.first, extent.second, 0, object_extents); + clip_len += extent.second; + } + + snapc = image_ctx.snapc; + journaling = (image_ctx.journal != nullptr && + image_ctx.journal->is_journal_appending()); + } + + int ret = prune_object_extents(&object_extents); + if (ret < 0) { + aio_comp->fail(ret); + return; + } + + if (!object_extents.empty()) { + uint64_t journal_tid = 0; + if (journaling) { + // in-flight ops are flushed prior to closing the journal + ceph_assert(image_ctx.journal != NULL); + journal_tid = append_journal_event(m_synchronous); + } + + aio_comp->set_request_count(object_extents.size()); + send_object_requests(object_extents, snapc, journal_tid); + } else { + // no IO to perform -- fire completion + aio_comp->set_request_count(0); + } + + update_stats(clip_len); + aio_comp->put(); +} + +template <typename I> +void AbstractImageWriteRequest<I>::send_object_requests( + const ObjectExtents &object_extents, const ::SnapContext &snapc, + uint64_t journal_tid) { + I &image_ctx = this->m_image_ctx; + CephContext *cct = image_ctx.cct; + + AioCompletion *aio_comp = this->m_aio_comp; + for (ObjectExtents::const_iterator p = object_extents.begin(); + p != object_extents.end(); ++p) { + ldout(cct, 20) << "oid " << p->oid << " " << p->offset << "~" << p->length + << " from " << p->buffer_extents << dendl; + C_AioRequest *req_comp = new C_AioRequest(aio_comp); + auto request = create_object_request(*p, snapc, journal_tid, req_comp); + + // if journaling, stash the request for later; otherwise send + if (request != NULL) { + request->send(); + } + } +} + +template <typename I> +void ImageWriteRequest<I>::assemble_extent(const ObjectExtent &object_extent, + bufferlist *bl) { + for (auto q = object_extent.buffer_extents.begin(); + q != object_extent.buffer_extents.end(); ++q) { + bufferlist sub_bl; + sub_bl.substr_of(m_bl, q->first, q->second); + bl->claim_append(sub_bl); + } +} + +template <typename I> +uint64_t ImageWriteRequest<I>::append_journal_event(bool synchronous) { + I &image_ctx = this->m_image_ctx; + + uint64_t tid = 0; + uint64_t buffer_offset = 0; + ceph_assert(!this->m_image_extents.empty()); + for (auto &extent : this->m_image_extents) { + bufferlist sub_bl; + sub_bl.substr_of(m_bl, buffer_offset, extent.second); + buffer_offset += extent.second; + + tid = image_ctx.journal->append_write_event(extent.first, extent.second, + sub_bl, synchronous); + } + + return tid; +} + +template <typename I> +void ImageWriteRequest<I>::send_image_cache_request() { + I &image_ctx = this->m_image_ctx; + ceph_assert(image_ctx.image_cache != nullptr); + + AioCompletion *aio_comp = this->m_aio_comp; + aio_comp->set_request_count(1); + C_AioRequest *req_comp = new C_AioRequest(aio_comp); + image_ctx.image_cache->aio_write(std::move(this->m_image_extents), + std::move(m_bl), m_op_flags, req_comp); +} + +template <typename I> +ObjectDispatchSpec *ImageWriteRequest<I>::create_object_request( + const ObjectExtent &object_extent, const ::SnapContext &snapc, + uint64_t journal_tid, Context *on_finish) { + I &image_ctx = this->m_image_ctx; + + bufferlist bl; + assemble_extent(object_extent, &bl); + auto req = ObjectDispatchSpec::create_write( + &image_ctx, OBJECT_DISPATCH_LAYER_NONE, object_extent.oid.name, + object_extent.objectno, object_extent.offset, std::move(bl), snapc, + m_op_flags, journal_tid, this->m_trace, on_finish); + return req; +} + +template <typename I> +void ImageWriteRequest<I>::update_stats(size_t length) { + I &image_ctx = this->m_image_ctx; + image_ctx.perfcounter->inc(l_librbd_wr); + image_ctx.perfcounter->inc(l_librbd_wr_bytes, length); +} + +template <typename I> +uint64_t ImageDiscardRequest<I>::append_journal_event(bool synchronous) { + I &image_ctx = this->m_image_ctx; + + uint64_t tid = 0; + ceph_assert(!this->m_image_extents.empty()); + for (auto &extent : this->m_image_extents) { + journal::EventEntry event_entry( + journal::AioDiscardEvent(extent.first, + extent.second, + this->m_discard_granularity_bytes)); + tid = image_ctx.journal->append_io_event(std::move(event_entry), + extent.first, extent.second, + synchronous, 0); + } + + return tid; +} + +template <typename I> +void ImageDiscardRequest<I>::send_image_cache_request() { + I &image_ctx = this->m_image_ctx; + ceph_assert(image_ctx.image_cache != nullptr); + + AioCompletion *aio_comp = this->m_aio_comp; + aio_comp->set_request_count(this->m_image_extents.size()); + for (auto &extent : this->m_image_extents) { + C_AioRequest *req_comp = new C_AioRequest(aio_comp); + image_ctx.image_cache->aio_discard(extent.first, extent.second, + this->m_discard_granularity_bytes, + req_comp); + } +} + +template <typename I> +ObjectDispatchSpec *ImageDiscardRequest<I>::create_object_request( + const ObjectExtent &object_extent, const ::SnapContext &snapc, + uint64_t journal_tid, Context *on_finish) { + I &image_ctx = this->m_image_ctx; + auto req = ObjectDispatchSpec::create_discard( + &image_ctx, OBJECT_DISPATCH_LAYER_NONE, object_extent.oid.name, + object_extent.objectno, object_extent.offset, object_extent.length, snapc, + OBJECT_DISCARD_FLAG_DISABLE_CLONE_REMOVE, journal_tid, this->m_trace, + on_finish); + return req; +} + +template <typename I> +void ImageDiscardRequest<I>::update_stats(size_t length) { + I &image_ctx = this->m_image_ctx; + image_ctx.perfcounter->inc(l_librbd_discard); + image_ctx.perfcounter->inc(l_librbd_discard_bytes, length); +} + +template <typename I> +int ImageDiscardRequest<I>::prune_object_extents( + ObjectExtents* object_extents) const { + if (m_discard_granularity_bytes == 0) { + return 0; + } + + // Align the range to discard_granularity_bytes boundary and skip + // and discards that are too small to free up any space. + // + // discard_granularity_bytes >= object_size && tail truncation + // is a special case for filestore + bool prune_required = false; + auto object_size = this->m_image_ctx.layout.object_size; + auto discard_granularity_bytes = std::min(m_discard_granularity_bytes, + object_size); + auto xform_lambda = + [discard_granularity_bytes, object_size, &prune_required] + (ObjectExtent& object_extent) { + auto& offset = object_extent.offset; + auto& length = object_extent.length; + auto next_offset = offset + length; + + if ((discard_granularity_bytes < object_size) || + (next_offset < object_size)) { + offset = p2roundup<uint64_t>(offset, discard_granularity_bytes); + next_offset = p2align<uint64_t>(next_offset, discard_granularity_bytes); + if (offset >= next_offset) { + prune_required = true; + length = 0; + } else { + length = next_offset - offset; + } + } + }; + std::for_each(object_extents->begin(), object_extents->end(), + xform_lambda); + + if (prune_required) { + // one or more object extents were skipped + auto remove_lambda = + [](const ObjectExtent& object_extent) { + return (object_extent.length == 0); + }; + object_extents->erase( + std::remove_if(object_extents->begin(), object_extents->end(), + remove_lambda), + object_extents->end()); + } + return 0; +} + +template <typename I> +void ImageFlushRequest<I>::send_request() { + I &image_ctx = this->m_image_ctx; + + bool journaling = false; + { + RWLock::RLocker snap_locker(image_ctx.snap_lock); + journaling = (m_flush_source == FLUSH_SOURCE_USER && + image_ctx.journal != nullptr && + image_ctx.journal->is_journal_appending()); + } + + AioCompletion *aio_comp = this->m_aio_comp; + aio_comp->set_request_count(1); + + Context *ctx = new C_AioRequest(aio_comp); + + // ensure no locks are held when flush is complete + ctx = librbd::util::create_async_context_callback(image_ctx, ctx); + + if (journaling) { + // in-flight ops are flushed prior to closing the journal + uint64_t journal_tid = image_ctx.journal->append_io_event( + journal::EventEntry(journal::AioFlushEvent()), 0, 0, false, 0); + image_ctx.journal->user_flushed(); + + ctx = new FunctionContext( + [&image_ctx, journal_tid, ctx](int r) { + image_ctx.journal->commit_io_event(journal_tid, r); + ctx->complete(r); + }); + ctx = new FunctionContext( + [&image_ctx, journal_tid, ctx](int r) { + image_ctx.journal->flush_event(journal_tid, ctx); + }); + } else { + // flush rbd cache only when journaling is not enabled + auto object_dispatch_spec = ObjectDispatchSpec::create_flush( + &image_ctx, OBJECT_DISPATCH_LAYER_NONE, m_flush_source, this->m_trace, + ctx); + ctx = new FunctionContext([object_dispatch_spec](int r) { + object_dispatch_spec->send(); + }); + } + + // ensure all in-flight IOs are settled if non-user flush request + aio_comp->async_op.flush(ctx); + aio_comp->put(); + + // might be flushing during image shutdown + if (image_ctx.perfcounter != nullptr) { + image_ctx.perfcounter->inc(l_librbd_flush); + } +} + +template <typename I> +void ImageFlushRequest<I>::send_image_cache_request() { + I &image_ctx = this->m_image_ctx; + ceph_assert(image_ctx.image_cache != nullptr); + + AioCompletion *aio_comp = this->m_aio_comp; + aio_comp->set_request_count(1); + C_AioRequest *req_comp = new C_AioRequest(aio_comp); + image_ctx.image_cache->aio_flush(req_comp); +} + +template <typename I> +uint64_t ImageWriteSameRequest<I>::append_journal_event(bool synchronous) { + I &image_ctx = this->m_image_ctx; + + uint64_t tid = 0; + ceph_assert(!this->m_image_extents.empty()); + for (auto &extent : this->m_image_extents) { + journal::EventEntry event_entry(journal::AioWriteSameEvent(extent.first, + extent.second, + m_data_bl)); + tid = image_ctx.journal->append_io_event(std::move(event_entry), + extent.first, extent.second, + synchronous, 0); + } + + return tid; +} + +template <typename I> +void ImageWriteSameRequest<I>::send_image_cache_request() { + I &image_ctx = this->m_image_ctx; + ceph_assert(image_ctx.image_cache != nullptr); + + AioCompletion *aio_comp = this->m_aio_comp; + aio_comp->set_request_count(this->m_image_extents.size()); + for (auto &extent : this->m_image_extents) { + C_AioRequest *req_comp = new C_AioRequest(aio_comp); + image_ctx.image_cache->aio_writesame(extent.first, extent.second, + std::move(m_data_bl), m_op_flags, + req_comp); + } +} + +template <typename I> +ObjectDispatchSpec *ImageWriteSameRequest<I>::create_object_request( + const ObjectExtent &object_extent, const ::SnapContext &snapc, + uint64_t journal_tid, Context *on_finish) { + I &image_ctx = this->m_image_ctx; + + bufferlist bl; + ObjectDispatchSpec *req; + + if (util::assemble_write_same_extent(object_extent, m_data_bl, &bl, false)) { + Extents buffer_extents{object_extent.buffer_extents}; + + req = ObjectDispatchSpec::create_write_same( + &image_ctx, OBJECT_DISPATCH_LAYER_NONE, object_extent.oid.name, + object_extent.objectno, object_extent.offset, object_extent.length, + std::move(buffer_extents), std::move(bl), snapc, m_op_flags, journal_tid, + this->m_trace, on_finish); + return req; + } + req = ObjectDispatchSpec::create_write( + &image_ctx, OBJECT_DISPATCH_LAYER_NONE, object_extent.oid.name, + object_extent.objectno, object_extent.offset, std::move(bl), snapc, + m_op_flags, journal_tid, this->m_trace, on_finish); + return req; +} + +template <typename I> +void ImageWriteSameRequest<I>::update_stats(size_t length) { + I &image_ctx = this->m_image_ctx; + image_ctx.perfcounter->inc(l_librbd_ws); + image_ctx.perfcounter->inc(l_librbd_ws_bytes, length); +} + +template <typename I> +uint64_t ImageCompareAndWriteRequest<I>::append_journal_event( + bool synchronous) { + I &image_ctx = this->m_image_ctx; + + uint64_t tid = 0; + ceph_assert(this->m_image_extents.size() == 1); + auto &extent = this->m_image_extents.front(); + journal::EventEntry event_entry( + journal::AioCompareAndWriteEvent(extent.first, extent.second, m_cmp_bl, + m_bl)); + tid = image_ctx.journal->append_io_event(std::move(event_entry), + extent.first, extent.second, + synchronous, -EILSEQ); + + return tid; +} + +template <typename I> +void ImageCompareAndWriteRequest<I>::assemble_extent( + const ObjectExtent &object_extent, bufferlist *bl) { + for (auto q = object_extent.buffer_extents.begin(); + q != object_extent.buffer_extents.end(); ++q) { + bufferlist sub_bl; + sub_bl.substr_of(m_bl, q->first, q->second); + bl->claim_append(sub_bl); + } +} + +template <typename I> +void ImageCompareAndWriteRequest<I>::send_image_cache_request() { + I &image_ctx = this->m_image_ctx; + ceph_assert(image_ctx.image_cache != nullptr); + + AioCompletion *aio_comp = this->m_aio_comp; + aio_comp->set_request_count(1); + C_AioRequest *req_comp = new C_AioRequest(aio_comp); + image_ctx.image_cache->aio_compare_and_write( + std::move(this->m_image_extents), std::move(m_cmp_bl), std::move(m_bl), + m_mismatch_offset, m_op_flags, req_comp); +} + +template <typename I> +ObjectDispatchSpec *ImageCompareAndWriteRequest<I>::create_object_request( + const ObjectExtent &object_extent, + const ::SnapContext &snapc, + uint64_t journal_tid, Context *on_finish) { + I &image_ctx = this->m_image_ctx; + + // NOTE: safe to move m_cmp_bl since we only support this op against + // a single object + bufferlist bl; + assemble_extent(object_extent, &bl); + auto req = ObjectDispatchSpec::create_compare_and_write( + &image_ctx, OBJECT_DISPATCH_LAYER_NONE, object_extent.oid.name, + object_extent.objectno, object_extent.offset, std::move(m_cmp_bl), + std::move(bl), snapc, m_mismatch_offset, m_op_flags, journal_tid, + this->m_trace, on_finish); + return req; +} + +template <typename I> +void ImageCompareAndWriteRequest<I>::update_stats(size_t length) { + I &image_ctx = this->m_image_ctx; + image_ctx.perfcounter->inc(l_librbd_cmp); + image_ctx.perfcounter->inc(l_librbd_cmp_bytes, length); +} + +template <typename I> +int ImageCompareAndWriteRequest<I>::prune_object_extents( + ObjectExtents* object_extents) const { + if (object_extents->size() > 1) + return -EINVAL; + + I &image_ctx = this->m_image_ctx; + uint64_t sector_size = 512ULL; + uint64_t su = image_ctx.layout.stripe_unit; + ObjectExtent object_extent = object_extents->front(); + if (object_extent.offset % sector_size + object_extent.length > sector_size || + (su != 0 && (object_extent.offset % su + object_extent.length > su))) + return -EINVAL; + + return 0; +} + +} // namespace io +} // namespace librbd + +template class librbd::io::ImageRequest<librbd::ImageCtx>; +template class librbd::io::ImageReadRequest<librbd::ImageCtx>; +template class librbd::io::AbstractImageWriteRequest<librbd::ImageCtx>; +template class librbd::io::ImageWriteRequest<librbd::ImageCtx>; +template class librbd::io::ImageDiscardRequest<librbd::ImageCtx>; +template class librbd::io::ImageFlushRequest<librbd::ImageCtx>; +template class librbd::io::ImageWriteSameRequest<librbd::ImageCtx>; +template class librbd::io::ImageCompareAndWriteRequest<librbd::ImageCtx>; diff --git a/src/librbd/io/ImageRequest.h b/src/librbd/io/ImageRequest.h new file mode 100644 index 00000000..d7d10019 --- /dev/null +++ b/src/librbd/io/ImageRequest.h @@ -0,0 +1,365 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_IO_IMAGE_REQUEST_H +#define CEPH_LIBRBD_IO_IMAGE_REQUEST_H + +#include "include/int_types.h" +#include "include/buffer_fwd.h" +#include "common/snap_types.h" +#include "common/zipkin_trace.h" +#include "osd/osd_types.h" +#include "librbd/Utils.h" +#include "librbd/io/Types.h" +#include <list> +#include <utility> +#include <vector> + +namespace librbd { +class ImageCtx; + +namespace io { + +class AioCompletion; +class ObjectDispatchSpec; +class ReadResult; + +template <typename ImageCtxT = ImageCtx> +class ImageRequest { +public: + typedef std::vector<std::pair<uint64_t,uint64_t> > Extents; + + virtual ~ImageRequest() { + m_trace.event("finish"); + } + + static void aio_read(ImageCtxT *ictx, AioCompletion *c, + Extents &&image_extents, ReadResult &&read_result, + int op_flags, const ZTracer::Trace &parent_trace); + static void aio_write(ImageCtxT *ictx, AioCompletion *c, + Extents &&image_extents, bufferlist &&bl, int op_flags, + const ZTracer::Trace &parent_trace); + static void aio_discard(ImageCtxT *ictx, AioCompletion *c, + Extents &&image_extents, + uint32_t discard_granularity_bytes, + const ZTracer::Trace &parent_trace); + static void aio_flush(ImageCtxT *ictx, AioCompletion *c, + FlushSource flush_source, + const ZTracer::Trace &parent_trace); + static void aio_writesame(ImageCtxT *ictx, AioCompletion *c, + Extents &&image_extents, bufferlist &&bl, + int op_flags, const ZTracer::Trace &parent_trace); + + static void aio_compare_and_write(ImageCtxT *ictx, AioCompletion *c, + Extents &&image_extents, bufferlist &&cmp_bl, + bufferlist &&bl, uint64_t *mismatch_offset, + int op_flags, const ZTracer::Trace &parent_trace); + + void send(); + + void set_bypass_image_cache() { + m_bypass_image_cache = true; + } + + inline const ZTracer::Trace &get_trace() const { + return m_trace; + } + +protected: + typedef std::list<ObjectDispatchSpec*> ObjectRequests; + + ImageCtxT &m_image_ctx; + AioCompletion *m_aio_comp; + Extents m_image_extents; + ZTracer::Trace m_trace; + bool m_bypass_image_cache = false; + + ImageRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp, + Extents &&image_extents, const char *trace_name, + const ZTracer::Trace &parent_trace) + : m_image_ctx(image_ctx), m_aio_comp(aio_comp), + m_image_extents(std::move(image_extents)), + m_trace(util::create_trace(image_ctx, trace_name, parent_trace)) { + m_trace.event("start"); + } + + + virtual int clip_request(); + virtual void update_timestamp(); + virtual void send_request() = 0; + virtual void send_image_cache_request() = 0; + + virtual aio_type_t get_aio_type() const = 0; + virtual const char *get_request_type() const = 0; +}; + +template <typename ImageCtxT = ImageCtx> +class ImageReadRequest : public ImageRequest<ImageCtxT> { +public: + using typename ImageRequest<ImageCtxT>::Extents; + + ImageReadRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp, + Extents &&image_extents, ReadResult &&read_result, + int op_flags, const ZTracer::Trace &parent_trace); + +protected: + int clip_request() override; + + void send_request() override; + void send_image_cache_request() override; + + aio_type_t get_aio_type() const override { + return AIO_TYPE_READ; + } + const char *get_request_type() const override { + return "aio_read"; + } +private: + int m_op_flags; +}; + +template <typename ImageCtxT = ImageCtx> +class AbstractImageWriteRequest : public ImageRequest<ImageCtxT> { +public: + inline void flag_synchronous() { + m_synchronous = true; + } + +protected: + using typename ImageRequest<ImageCtxT>::ObjectRequests; + using typename ImageRequest<ImageCtxT>::Extents; + + typedef std::vector<ObjectExtent> ObjectExtents; + + AbstractImageWriteRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp, + Extents &&image_extents, const char *trace_name, + const ZTracer::Trace &parent_trace) + : ImageRequest<ImageCtxT>(image_ctx, aio_comp, std::move(image_extents), + trace_name, parent_trace), + m_synchronous(false) { + } + + void send_request() override; + + virtual int prune_object_extents(ObjectExtents* object_extents) const { + return 0; + } + + void send_object_requests(const ObjectExtents &object_extents, + const ::SnapContext &snapc, uint64_t journal_tid); + virtual ObjectDispatchSpec *create_object_request( + const ObjectExtent &object_extent, const ::SnapContext &snapc, + uint64_t journal_tid, Context *on_finish) = 0; + + virtual uint64_t append_journal_event(bool synchronous) = 0; + virtual void update_stats(size_t length) = 0; + +private: + bool m_synchronous; +}; + +template <typename ImageCtxT = ImageCtx> +class ImageWriteRequest : public AbstractImageWriteRequest<ImageCtxT> { +public: + using typename ImageRequest<ImageCtxT>::Extents; + + ImageWriteRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp, + Extents &&image_extents, bufferlist &&bl, int op_flags, + const ZTracer::Trace &parent_trace) + : AbstractImageWriteRequest<ImageCtxT>( + image_ctx, aio_comp, std::move(image_extents), "write", parent_trace), + m_bl(std::move(bl)), m_op_flags(op_flags) { + } + +protected: + using typename ImageRequest<ImageCtxT>::ObjectRequests; + using typename AbstractImageWriteRequest<ImageCtxT>::ObjectExtents; + + aio_type_t get_aio_type() const override { + return AIO_TYPE_WRITE; + } + const char *get_request_type() const override { + return "aio_write"; + } + + void assemble_extent(const ObjectExtent &object_extent, bufferlist *bl); + + void send_image_cache_request() override; + + + ObjectDispatchSpec *create_object_request( + const ObjectExtent &object_extent, const ::SnapContext &snapc, + uint64_t journal_tid, Context *on_finish) override; + + uint64_t append_journal_event(bool synchronous) override; + void update_stats(size_t length) override; + +private: + bufferlist m_bl; + int m_op_flags; +}; + +template <typename ImageCtxT = ImageCtx> +class ImageDiscardRequest : public AbstractImageWriteRequest<ImageCtxT> { +public: + ImageDiscardRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp, + Extents&& image_extents, + uint32_t discard_granularity_bytes, + const ZTracer::Trace &parent_trace) + : AbstractImageWriteRequest<ImageCtxT>( + image_ctx, aio_comp, std::move(image_extents), "discard", parent_trace), + m_discard_granularity_bytes(discard_granularity_bytes) { + } + +protected: + using typename ImageRequest<ImageCtxT>::ObjectRequests; + using typename AbstractImageWriteRequest<ImageCtxT>::ObjectExtents; + + aio_type_t get_aio_type() const override { + return AIO_TYPE_DISCARD; + } + const char *get_request_type() const override { + return "aio_discard"; + } + + void send_image_cache_request() override; + + ObjectDispatchSpec *create_object_request( + const ObjectExtent &object_extent, const ::SnapContext &snapc, + uint64_t journal_tid, Context *on_finish) override; + + uint64_t append_journal_event(bool synchronous) override; + void update_stats(size_t length) override; + + int prune_object_extents(ObjectExtents* object_extents) const override; + +private: + uint32_t m_discard_granularity_bytes; +}; + +template <typename ImageCtxT = ImageCtx> +class ImageFlushRequest : public ImageRequest<ImageCtxT> { +public: + ImageFlushRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp, + FlushSource flush_source, + const ZTracer::Trace &parent_trace) + : ImageRequest<ImageCtxT>(image_ctx, aio_comp, {}, "flush", parent_trace), + m_flush_source(flush_source) { + } + +protected: + using typename ImageRequest<ImageCtxT>::ObjectRequests; + + int clip_request() override { + return 0; + } + void update_timestamp() override { + } + void send_request() override; + void send_image_cache_request() override; + + aio_type_t get_aio_type() const override { + return AIO_TYPE_FLUSH; + } + const char *get_request_type() const override { + return "aio_flush"; + } + +private: + FlushSource m_flush_source; + +}; + +template <typename ImageCtxT = ImageCtx> +class ImageWriteSameRequest : public AbstractImageWriteRequest<ImageCtxT> { +public: + ImageWriteSameRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp, + Extents&& image_extents, bufferlist &&bl, + int op_flags, const ZTracer::Trace &parent_trace) + : AbstractImageWriteRequest<ImageCtxT>( + image_ctx, aio_comp, std::move(image_extents), "writesame", + parent_trace), + m_data_bl(std::move(bl)), m_op_flags(op_flags) { + } + +protected: + using typename ImageRequest<ImageCtxT>::ObjectRequests; + using typename AbstractImageWriteRequest<ImageCtxT>::ObjectExtents; + + aio_type_t get_aio_type() const override { + return AIO_TYPE_WRITESAME; + } + const char *get_request_type() const override { + return "aio_writesame"; + } + + void send_image_cache_request() override; + + ObjectDispatchSpec *create_object_request( + const ObjectExtent &object_extent, const ::SnapContext &snapc, + uint64_t journal_tid, Context *on_finish) override; + + uint64_t append_journal_event(bool synchronous) override; + void update_stats(size_t length) override; +private: + bufferlist m_data_bl; + int m_op_flags; +}; + +template <typename ImageCtxT = ImageCtx> +class ImageCompareAndWriteRequest : public AbstractImageWriteRequest<ImageCtxT> { +public: + using typename ImageRequest<ImageCtxT>::ObjectRequests; + using typename AbstractImageWriteRequest<ImageCtxT>::ObjectExtents; + + ImageCompareAndWriteRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp, + Extents &&image_extents, bufferlist &&cmp_bl, + bufferlist &&bl, uint64_t *mismatch_offset, + int op_flags, const ZTracer::Trace &parent_trace) + : AbstractImageWriteRequest<ImageCtxT>( + image_ctx, aio_comp, std::move(image_extents), "compare_and_write", parent_trace), + m_cmp_bl(std::move(cmp_bl)), m_bl(std::move(bl)), + m_mismatch_offset(mismatch_offset), m_op_flags(op_flags) { + } + +protected: + void send_image_cache_request() override; + + void assemble_extent(const ObjectExtent &object_extent, bufferlist *bl); + + ObjectDispatchSpec *create_object_request( + const ObjectExtent &object_extent, const ::SnapContext &snapc, + uint64_t journal_tid, Context *on_finish) override; + + uint64_t append_journal_event(bool synchronous) override; + void update_stats(size_t length) override; + + aio_type_t get_aio_type() const override { + return AIO_TYPE_COMPARE_AND_WRITE; + } + const char *get_request_type() const override { + return "aio_compare_and_write"; + } + + int prune_object_extents(ObjectExtents* object_extents) const override; + +private: + bufferlist m_cmp_bl; + bufferlist m_bl; + uint64_t *m_mismatch_offset; + int m_op_flags; +}; + +} // namespace io +} // namespace librbd + +extern template class librbd::io::ImageRequest<librbd::ImageCtx>; +extern template class librbd::io::ImageReadRequest<librbd::ImageCtx>; +extern template class librbd::io::AbstractImageWriteRequest<librbd::ImageCtx>; +extern template class librbd::io::ImageWriteRequest<librbd::ImageCtx>; +extern template class librbd::io::ImageDiscardRequest<librbd::ImageCtx>; +extern template class librbd::io::ImageFlushRequest<librbd::ImageCtx>; +extern template class librbd::io::ImageWriteSameRequest<librbd::ImageCtx>; +extern template class librbd::io::ImageCompareAndWriteRequest<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_IO_IMAGE_REQUEST_H diff --git a/src/librbd/io/ImageRequestWQ.cc b/src/librbd/io/ImageRequestWQ.cc new file mode 100644 index 00000000..7bdaf2f2 --- /dev/null +++ b/src/librbd/io/ImageRequestWQ.cc @@ -0,0 +1,1043 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/io/ImageRequestWQ.h" +#include "common/errno.h" +#include "common/zipkin_trace.h" +#include "common/Cond.h" +#include "librbd/ExclusiveLock.h" +#include "librbd/ImageCtx.h" +#include "librbd/ImageState.h" +#include "librbd/ImageWatcher.h" +#include "librbd/internal.h" +#include "librbd/Utils.h" +#include "librbd/exclusive_lock/Policy.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/io/ImageRequest.h" +#include "librbd/io/ImageDispatchSpec.h" +#include "common/EventTrace.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace io { + +namespace { + +template <typename I> +void flush_image(I& image_ctx, Context* on_finish) { + auto aio_comp = librbd::io::AioCompletion::create_and_start( + on_finish, util::get_image_ctx(&image_ctx), librbd::io::AIO_TYPE_FLUSH); + auto req = librbd::io::ImageDispatchSpec<I>::create_flush_request( + image_ctx, aio_comp, librbd::io::FLUSH_SOURCE_INTERNAL, {}); + req->send(); + delete req; +} + +} // anonymous namespace + +template <typename I> +struct ImageRequestWQ<I>::C_AcquireLock : public Context { + ImageRequestWQ *work_queue; + ImageDispatchSpec<I> *image_request; + + C_AcquireLock(ImageRequestWQ *work_queue, ImageDispatchSpec<I> *image_request) + : work_queue(work_queue), image_request(image_request) { + } + + void finish(int r) override { + work_queue->handle_acquire_lock(r, image_request); + } +}; + +template <typename I> +struct ImageRequestWQ<I>::C_BlockedWrites : public Context { + ImageRequestWQ *work_queue; + explicit C_BlockedWrites(ImageRequestWQ *_work_queue) + : work_queue(_work_queue) { + } + + void finish(int r) override { + work_queue->handle_blocked_writes(r); + } +}; + +template <typename I> +struct ImageRequestWQ<I>::C_RefreshFinish : public Context { + ImageRequestWQ *work_queue; + ImageDispatchSpec<I> *image_request; + + C_RefreshFinish(ImageRequestWQ *work_queue, + ImageDispatchSpec<I> *image_request) + : work_queue(work_queue), image_request(image_request) { + } + void finish(int r) override { + work_queue->handle_refreshed(r, image_request); + } +}; + +static std::map<uint64_t, std::string> throttle_flags = { + { RBD_QOS_IOPS_THROTTLE, "rbd_qos_iops_throttle" }, + { RBD_QOS_BPS_THROTTLE, "rbd_qos_bps_throttle" }, + { RBD_QOS_READ_IOPS_THROTTLE, "rbd_qos_read_iops_throttle" }, + { RBD_QOS_WRITE_IOPS_THROTTLE, "rbd_qos_write_iops_throttle" }, + { RBD_QOS_READ_BPS_THROTTLE, "rbd_qos_read_bps_throttle" }, + { RBD_QOS_WRITE_BPS_THROTTLE, "rbd_qos_write_bps_throttle" } +}; + +template <typename I> +ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name, + time_t ti, ThreadPool *tp) + : ThreadPool::PointerWQ<ImageDispatchSpec<I> >(name, ti, 0, tp), + m_image_ctx(*image_ctx), + m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 5) << "ictx=" << image_ctx << dendl; + + SafeTimer *timer; + Mutex *timer_lock; + ImageCtx::get_timer_instance(cct, &timer, &timer_lock); + + for (auto flag : throttle_flags) { + m_throttles.push_back(make_pair( + flag.first, + new TokenBucketThrottle(cct, flag.second, 0, 0, timer, timer_lock))); + } + + this->register_work_queue(); +} + +template <typename I> +ImageRequestWQ<I>::~ImageRequestWQ() { + for (auto t : m_throttles) { + delete t.second; + } +} + +template <typename I> +ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len, + ReadResult &&read_result, int op_flags) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " + << "len = " << len << dendl; + + C_SaferCond cond; + AioCompletion *c = AioCompletion::create(&cond); + aio_read(c, off, len, std::move(read_result), op_flags, false); + return cond.wait(); +} + +template <typename I> +ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len, + bufferlist &&bl, int op_flags) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " + << "len = " << len << dendl; + + m_image_ctx.snap_lock.get_read(); + int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); + m_image_ctx.snap_lock.put_read(); + if (r < 0) { + lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; + return r; + } + + C_SaferCond cond; + AioCompletion *c = AioCompletion::create(&cond); + aio_write(c, off, len, std::move(bl), op_flags, false); + + r = cond.wait(); + if (r < 0) { + return r; + } + return len; +} + +template <typename I> +ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len, + uint32_t discard_granularity_bytes) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " + << "len = " << len << dendl; + + m_image_ctx.snap_lock.get_read(); + int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); + m_image_ctx.snap_lock.put_read(); + if (r < 0) { + lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; + return r; + } + + C_SaferCond cond; + AioCompletion *c = AioCompletion::create(&cond); + aio_discard(c, off, len, discard_granularity_bytes, false); + + r = cond.wait(); + if (r < 0) { + return r; + } + return len; +} + +template <typename I> +ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len, + bufferlist &&bl, int op_flags) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " + << "len = " << len << ", data_len " << bl.length() << dendl; + + m_image_ctx.snap_lock.get_read(); + int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); + m_image_ctx.snap_lock.put_read(); + if (r < 0) { + lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; + return r; + } + + C_SaferCond cond; + AioCompletion *c = AioCompletion::create(&cond); + aio_writesame(c, off, len, std::move(bl), op_flags, false); + + r = cond.wait(); + if (r < 0) { + return r; + } + return len; +} + +template <typename I> +ssize_t ImageRequestWQ<I>::write_zeroes(uint64_t off, uint64_t len, + int zero_flags, int op_flags) { + auto cct = m_image_ctx.cct; + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " + << "len = " << len << dendl; + + m_image_ctx.snap_lock.get_read(); + int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); + m_image_ctx.snap_lock.put_read(); + if (r < 0) { + lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; + return r; + } + + C_SaferCond ctx; + auto aio_comp = io::AioCompletion::create(&ctx); + aio_write_zeroes(aio_comp, off, len, zero_flags, op_flags, false); + + r = ctx.wait(); + if (r < 0) { + return r; + } + return len; +} + +template <typename I> +ssize_t ImageRequestWQ<I>::compare_and_write(uint64_t off, uint64_t len, + bufferlist &&cmp_bl, + bufferlist &&bl, + uint64_t *mismatch_off, + int op_flags){ + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "compare_and_write ictx=" << &m_image_ctx << ", off=" + << off << ", " << "len = " << len << dendl; + + m_image_ctx.snap_lock.get_read(); + int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); + m_image_ctx.snap_lock.put_read(); + if (r < 0) { + lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; + return r; + } + + C_SaferCond cond; + AioCompletion *c = AioCompletion::create(&cond); + aio_compare_and_write(c, off, len, std::move(cmp_bl), std::move(bl), + mismatch_off, op_flags, false); + + r = cond.wait(); + if (r < 0) { + return r; + } + + return len; +} + +template <typename I> +int ImageRequestWQ<I>::flush() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl; + + C_SaferCond cond; + AioCompletion *c = AioCompletion::create(&cond); + aio_flush(c, false); + + int r = cond.wait(); + if (r < 0) { + return r; + } + + return 0; +} + +template <typename I> +void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len, + ReadResult &&read_result, int op_flags, + bool native_async) { + CephContext *cct = m_image_ctx.cct; + FUNCTRACE(cct); + ZTracer::Trace trace; + if (m_image_ctx.blkin_trace_all) { + trace.init("wq: read", &m_image_ctx.trace_endpoint); + trace.event("start"); + } + + c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ); + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "completion=" << c << ", off=" << off << ", " + << "len=" << len << ", " << "flags=" << op_flags << dendl; + + if (native_async && m_image_ctx.event_socket.is_valid()) { + c->set_event_notify(true); + } + + if (!start_in_flight_io(c)) { + return; + } + + // if journaling is enabled -- we need to replay the journal because + // it might contain an uncommitted write + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() || + require_lock_on_read()) { + queue(ImageDispatchSpec<I>::create_read_request( + m_image_ctx, c, {{off, len}}, std::move(read_result), op_flags, + trace)); + } else { + c->start_op(); + ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}}, + std::move(read_result), op_flags, trace); + finish_in_flight_io(); + } + trace.event("finish"); +} + +template <typename I> +void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len, + bufferlist &&bl, int op_flags, + bool native_async) { + CephContext *cct = m_image_ctx.cct; + FUNCTRACE(cct); + ZTracer::Trace trace; + if (m_image_ctx.blkin_trace_all) { + trace.init("wq: write", &m_image_ctx.trace_endpoint); + trace.event("init"); + } + + c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE); + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "completion=" << c << ", off=" << off << ", " + << "len=" << len << ", flags=" << op_flags << dendl; + + if (native_async && m_image_ctx.event_socket.is_valid()) { + c->set_event_notify(true); + } + + if (!start_in_flight_io(c)) { + return; + } + + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + if (m_image_ctx.non_blocking_aio || writes_blocked()) { + queue(ImageDispatchSpec<I>::create_write_request( + m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace)); + } else { + c->start_op(); + ImageRequest<I>::aio_write(&m_image_ctx, c, {{off, len}}, + std::move(bl), op_flags, trace); + finish_in_flight_io(); + } + trace.event("finish"); +} + +template <typename I> +void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off, + uint64_t len, + uint32_t discard_granularity_bytes, + bool native_async) { + CephContext *cct = m_image_ctx.cct; + FUNCTRACE(cct); + ZTracer::Trace trace; + if (m_image_ctx.blkin_trace_all) { + trace.init("wq: discard", &m_image_ctx.trace_endpoint); + trace.event("init"); + } + + c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD); + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "completion=" << c << ", off=" << off << ", len=" << len + << dendl; + + if (native_async && m_image_ctx.event_socket.is_valid()) { + c->set_event_notify(true); + } + + if (!start_in_flight_io(c)) { + return; + } + + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + if (m_image_ctx.non_blocking_aio || writes_blocked()) { + queue(ImageDispatchSpec<I>::create_discard_request( + m_image_ctx, c, off, len, discard_granularity_bytes, trace)); + } else { + c->start_op(); + ImageRequest<I>::aio_discard(&m_image_ctx, c, {{off, len}}, + discard_granularity_bytes, trace); + finish_in_flight_io(); + } + trace.event("finish"); +} + +template <typename I> +void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) { + CephContext *cct = m_image_ctx.cct; + FUNCTRACE(cct); + ZTracer::Trace trace; + if (m_image_ctx.blkin_trace_all) { + trace.init("wq: flush", &m_image_ctx.trace_endpoint); + trace.event("init"); + } + + c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH); + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "completion=" << c << dendl; + + if (native_async && m_image_ctx.event_socket.is_valid()) { + c->set_event_notify(true); + } + + if (!start_in_flight_io(c)) { + return; + } + + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) { + queue(ImageDispatchSpec<I>::create_flush_request( + m_image_ctx, c, FLUSH_SOURCE_USER, trace)); + } else { + c->start_op(); + ImageRequest<I>::aio_flush(&m_image_ctx, c, FLUSH_SOURCE_USER, trace); + finish_in_flight_io(); + } + trace.event("finish"); +} + +template <typename I> +void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off, + uint64_t len, bufferlist &&bl, + int op_flags, bool native_async) { + CephContext *cct = m_image_ctx.cct; + FUNCTRACE(cct); + ZTracer::Trace trace; + if (m_image_ctx.blkin_trace_all) { + trace.init("wq: writesame", &m_image_ctx.trace_endpoint); + trace.event("init"); + } + + c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME); + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "completion=" << c << ", off=" << off << ", " + << "len=" << len << ", data_len = " << bl.length() << ", " + << "flags=" << op_flags << dendl; + + if (native_async && m_image_ctx.event_socket.is_valid()) { + c->set_event_notify(true); + } + + if (!start_in_flight_io(c)) { + return; + } + + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + if (m_image_ctx.non_blocking_aio || writes_blocked()) { + queue(ImageDispatchSpec<I>::create_write_same_request( + m_image_ctx, c, off, len, std::move(bl), op_flags, trace)); + } else { + c->start_op(); + ImageRequest<I>::aio_writesame(&m_image_ctx, c, {{off, len}}, std::move(bl), + op_flags, trace); + finish_in_flight_io(); + } + trace.event("finish"); +} + + +template <typename I> +void ImageRequestWQ<I>::aio_write_zeroes(io::AioCompletion *aio_comp, + uint64_t off, uint64_t len, + int zero_flags, int op_flags, + bool native_async) { + auto cct = m_image_ctx.cct; + FUNCTRACE(cct); + ZTracer::Trace trace; + if (m_image_ctx.blkin_trace_all) { + trace.init("io: write_zeroes", &m_image_ctx.trace_endpoint); + trace.event("init"); + } + + aio_comp->init_time(util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_DISCARD); + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "completion=" << aio_comp << ", off=" << off << ", " + << "len=" << len << dendl; + + if (native_async && m_image_ctx.event_socket.is_valid()) { + aio_comp->set_event_notify(true); + } + + // validate the supported flags + if (zero_flags != 0U) { + aio_comp->fail(-EINVAL); + return; + } + + if (!start_in_flight_io(aio_comp)) { + return; + } + + // enable partial discard (zeroing) of objects + uint32_t discard_granularity_bytes = 0; + + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + if (m_image_ctx.non_blocking_aio || writes_blocked()) { + queue(ImageDispatchSpec<I>::create_discard_request( + m_image_ctx, aio_comp, off, len, discard_granularity_bytes, trace)); + } else { + aio_comp->start_op(); + ImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, {{off, len}}, + discard_granularity_bytes, trace); + finish_in_flight_io(); + } + trace.event("finish"); +} + +template <typename I> +void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c, + uint64_t off, uint64_t len, + bufferlist &&cmp_bl, + bufferlist &&bl, + uint64_t *mismatch_off, + int op_flags, bool native_async) { + CephContext *cct = m_image_ctx.cct; + FUNCTRACE(cct); + ZTracer::Trace trace; + if (m_image_ctx.blkin_trace_all) { + trace.init("wq: compare_and_write", &m_image_ctx.trace_endpoint); + trace.event("init"); + } + + c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_COMPARE_AND_WRITE); + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "completion=" << c << ", off=" << off << ", " + << "len=" << len << dendl; + + if (native_async && m_image_ctx.event_socket.is_valid()) { + c->set_event_notify(true); + } + + if (!start_in_flight_io(c)) { + return; + } + + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + if (m_image_ctx.non_blocking_aio || writes_blocked()) { + queue(ImageDispatchSpec<I>::create_compare_and_write_request( + m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl), + mismatch_off, op_flags, trace)); + } else { + c->start_op(); + ImageRequest<I>::aio_compare_and_write(&m_image_ctx, c, {{off, len}}, + std::move(cmp_bl), std::move(bl), + mismatch_off, op_flags, trace); + finish_in_flight_io(); + } + trace.event("finish"); +} + +template <typename I> +void ImageRequestWQ<I>::shut_down(Context *on_shutdown) { + ceph_assert(m_image_ctx.owner_lock.is_locked()); + + { + RWLock::WLocker locker(m_lock); + ceph_assert(!m_shutdown); + m_shutdown = true; + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load() + << dendl; + if (m_in_flight_ios > 0) { + m_on_shutdown = on_shutdown; + return; + } + } + + // ensure that all in-flight IO is flushed + flush_image(m_image_ctx, on_shutdown); +} + +template <typename I> +int ImageRequestWQ<I>::block_writes() { + C_SaferCond cond_ctx; + block_writes(&cond_ctx); + return cond_ctx.wait(); +} + +template <typename I> +void ImageRequestWQ<I>::block_writes(Context *on_blocked) { + ceph_assert(m_image_ctx.owner_lock.is_locked()); + CephContext *cct = m_image_ctx.cct; + + { + RWLock::WLocker locker(m_lock); + ++m_write_blockers; + ldout(cct, 5) << &m_image_ctx << ", " << "num=" + << m_write_blockers << dendl; + if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) { + m_write_blocker_contexts.push_back(on_blocked); + return; + } + } + + // ensure that all in-flight IO is flushed + flush_image(m_image_ctx, on_blocked); +} + +template <typename I> +void ImageRequestWQ<I>::unblock_writes() { + CephContext *cct = m_image_ctx.cct; + + bool wake_up = false; + Contexts waiter_contexts; + { + RWLock::WLocker locker(m_lock); + ceph_assert(m_write_blockers > 0); + --m_write_blockers; + + ldout(cct, 5) << &m_image_ctx << ", " << "num=" + << m_write_blockers << dendl; + if (m_write_blockers == 0) { + wake_up = true; + std::swap(waiter_contexts, m_unblocked_write_waiter_contexts); + } + } + + if (wake_up) { + for (auto ctx : waiter_contexts) { + ctx->complete(0); + } + this->signal(); + } +} + +template <typename I> +void ImageRequestWQ<I>::wait_on_writes_unblocked(Context *on_unblocked) { + ceph_assert(m_image_ctx.owner_lock.is_locked()); + CephContext *cct = m_image_ctx.cct; + + { + RWLock::WLocker locker(m_lock); + ldout(cct, 20) << &m_image_ctx << ", " << "write_blockers=" + << m_write_blockers << dendl; + if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) { + m_unblocked_write_waiter_contexts.push_back(on_unblocked); + return; + } + } + + on_unblocked->complete(0); +} + +template <typename I> +void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << dendl; + + bool wake_up = false; + { + RWLock::WLocker locker(m_lock); + switch (direction) { + case DIRECTION_READ: + wake_up = (enabled != m_require_lock_on_read); + m_require_lock_on_read = enabled; + break; + case DIRECTION_WRITE: + wake_up = (enabled != m_require_lock_on_write); + m_require_lock_on_write = enabled; + break; + case DIRECTION_BOTH: + wake_up = (enabled != m_require_lock_on_read || + enabled != m_require_lock_on_write); + m_require_lock_on_read = enabled; + m_require_lock_on_write = enabled; + break; + } + } + + // wake up the thread pool whenever the state changes so that + // we can re-request the lock if required + if (wake_up) { + this->signal(); + } +} + +template <typename I> +void ImageRequestWQ<I>::apply_qos_schedule_tick_min(uint64_t tick){ + for (auto pair : m_throttles) { + pair.second->set_schedule_tick_min(tick); + } +} + +template <typename I> +void ImageRequestWQ<I>::apply_qos_limit(const uint64_t flag, + uint64_t limit, + uint64_t burst) { + CephContext *cct = m_image_ctx.cct; + TokenBucketThrottle *throttle = nullptr; + for (auto pair : m_throttles) { + if (flag == pair.first) { + throttle = pair.second; + break; + } + } + ceph_assert(throttle != nullptr); + + int r = throttle->set_limit(limit, burst); + if (r < 0) { + lderr(cct) << throttle->get_name() << ": invalid qos parameter: " + << "burst(" << burst << ") is less than " + << "limit(" << limit << ")" << dendl; + // if apply failed, we should at least make sure the limit works. + throttle->set_limit(limit, 0); + } + + if (limit) + m_qos_enabled_flag |= flag; + else + m_qos_enabled_flag &= ~flag; +} + +template <typename I> +void ImageRequestWQ<I>::handle_throttle_ready(int r, ImageDispatchSpec<I> *item, + uint64_t flag) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 15) << "r=" << r << ", " << "req=" << item << dendl; + + std::lock_guard pool_locker{this->get_pool_lock()}; + ceph_assert(m_io_throttled.load() > 0); + item->set_throttled(flag); + if (item->were_all_throttled()) { + this->requeue_back(pool_locker, item); + --m_io_throttled; + this->signal(pool_locker); + } +} + +template <typename I> +bool ImageRequestWQ<I>::needs_throttle(ImageDispatchSpec<I> *item) { + uint64_t tokens = 0; + uint64_t flag = 0; + bool blocked = false; + TokenBucketThrottle* throttle = nullptr; + + for (auto t : m_throttles) { + flag = t.first; + if (item->was_throttled(flag)) + continue; + + if (!(m_qos_enabled_flag & flag)) { + item->set_throttled(flag); + continue; + } + + throttle = t.second; + if (item->tokens_requested(flag, &tokens) && + throttle->get<ImageRequestWQ<I>, ImageDispatchSpec<I>, + &ImageRequestWQ<I>::handle_throttle_ready>( + tokens, this, item, flag)) { + blocked = true; + } else { + item->set_throttled(flag); + } + } + return blocked; +} + +template <typename I> +void *ImageRequestWQ<I>::_void_dequeue() { + CephContext *cct = m_image_ctx.cct; + ImageDispatchSpec<I> *peek_item = this->front(); + + // no queued IO requests or all IO is blocked/stalled + if (peek_item == nullptr || m_io_blockers.load() > 0) { + return nullptr; + } + + if (needs_throttle(peek_item)) { + ldout(cct, 15) << "throttling IO " << peek_item << dendl; + + ++m_io_throttled; + // dequeue the throttled item + ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue(); + return nullptr; + } + + bool lock_required; + bool refresh_required = m_image_ctx.state->is_refresh_required(); + { + RWLock::RLocker locker(m_lock); + bool write_op = peek_item->is_write_op(); + lock_required = is_lock_required(write_op); + if (write_op) { + if (!lock_required && m_write_blockers > 0) { + // missing lock is not the write blocker + return nullptr; + } + + if (!lock_required && !refresh_required) { + // completed ops will requeue the IO -- don't count it as in-progress + m_in_flight_writes++; + } + } + } + + auto item = reinterpret_cast<ImageDispatchSpec<I> *>( + ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue()); + ceph_assert(peek_item == item); + + if (lock_required) { + this->get_pool_lock().unlock(); + m_image_ctx.owner_lock.get_read(); + if (m_image_ctx.exclusive_lock != nullptr) { + ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl; + if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) { + lderr(cct) << "op requires exclusive lock" << dendl; + fail_in_flight_io(m_image_ctx.exclusive_lock->get_unlocked_op_error(), + item); + + // wake up the IO since we won't be returning a request to process + this->signal(); + } else { + // stall IO until the acquire completes + ++m_io_blockers; + m_image_ctx.exclusive_lock->acquire_lock(new C_AcquireLock(this, item)); + } + } else { + // raced with the exclusive lock being disabled + lock_required = false; + } + m_image_ctx.owner_lock.put_read(); + this->get_pool_lock().lock(); + + if (lock_required) { + return nullptr; + } + } + + if (refresh_required) { + ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl; + + // stall IO until the refresh completes + ++m_io_blockers; + + this->get_pool_lock().unlock(); + m_image_ctx.state->refresh(new C_RefreshFinish(this, item)); + this->get_pool_lock().lock(); + return nullptr; + } + + item->start_op(); + return item; +} + +template <typename I> +void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "req=" << req << dendl; + + req->send(); + + finish_queued_io(req); + if (req->is_write_op()) { + finish_in_flight_write(); + } + delete req; + + finish_in_flight_io(); +} + +template <typename I> +void ImageRequestWQ<I>::finish_queued_io(ImageDispatchSpec<I> *req) { + RWLock::RLocker locker(m_lock); + if (req->is_write_op()) { + ceph_assert(m_queued_writes > 0); + m_queued_writes--; + } else { + ceph_assert(m_queued_reads > 0); + m_queued_reads--; + } +} + +template <typename I> +void ImageRequestWQ<I>::finish_in_flight_write() { + bool writes_blocked = false; + { + RWLock::RLocker locker(m_lock); + ceph_assert(m_in_flight_writes > 0); + if (--m_in_flight_writes == 0 && + !m_write_blocker_contexts.empty()) { + writes_blocked = true; + } + } + + if (writes_blocked) { + flush_image(m_image_ctx, new C_BlockedWrites(this)); + } +} + +template <typename I> +int ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) { + RWLock::RLocker locker(m_lock); + + if (m_shutdown) { + CephContext *cct = m_image_ctx.cct; + lderr(cct) << "IO received on closed image" << dendl; + + c->get(); + c->fail(-ESHUTDOWN); + return false; + } + + if (!m_image_ctx.data_ctx.is_valid()) { + CephContext *cct = m_image_ctx.cct; + lderr(cct) << "missing data pool" << dendl; + + c->get(); + c->fail(-ENODEV); + return false; + } + + m_in_flight_ios++; + return true; +} + +template <typename I> +void ImageRequestWQ<I>::finish_in_flight_io() { + Context *on_shutdown; + { + RWLock::RLocker locker(m_lock); + if (--m_in_flight_ios > 0 || !m_shutdown) { + return; + } + on_shutdown = m_on_shutdown; + } + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 5) << "completing shut down" << dendl; + + ceph_assert(on_shutdown != nullptr); + flush_image(m_image_ctx, on_shutdown); +} + +template <typename I> +void ImageRequestWQ<I>::fail_in_flight_io( + int r, ImageDispatchSpec<I> *req) { + this->process_finish(); + req->fail(r); + finish_queued_io(req); + delete req; + finish_in_flight_io(); +} + +template <typename I> +bool ImageRequestWQ<I>::is_lock_required(bool write_op) const { + ceph_assert(m_lock.is_locked()); + return ((write_op && m_require_lock_on_write) || + (!write_op && m_require_lock_on_read)); +} + +template <typename I> +void ImageRequestWQ<I>::queue(ImageDispatchSpec<I> *req) { + ceph_assert(m_image_ctx.owner_lock.is_locked()); + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "req=" << req << dendl; + + if (req->is_write_op()) { + m_queued_writes++; + } else { + m_queued_reads++; + } + + ThreadPool::PointerWQ<ImageDispatchSpec<I> >::queue(req); +} + +template <typename I> +void ImageRequestWQ<I>::handle_acquire_lock( + int r, ImageDispatchSpec<I> *req) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl; + + if (r < 0) { + fail_in_flight_io(r, req); + } else { + // since IO was stalled for acquire -- original IO order is preserved + // if we requeue this op for work queue processing + this->requeue_front(req); + } + + ceph_assert(m_io_blockers.load() > 0); + --m_io_blockers; + this->signal(); +} + +template <typename I> +void ImageRequestWQ<I>::handle_refreshed( + int r, ImageDispatchSpec<I> *req) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", " + << "req=" << req << dendl; + if (r < 0) { + fail_in_flight_io(r, req); + } else { + // since IO was stalled for refresh -- original IO order is preserved + // if we requeue this op for work queue processing + this->requeue_front(req); + } + + ceph_assert(m_io_blockers.load() > 0); + --m_io_blockers; + this->signal(); +} + +template <typename I> +void ImageRequestWQ<I>::handle_blocked_writes(int r) { + Contexts contexts; + { + RWLock::WLocker locker(m_lock); + contexts.swap(m_write_blocker_contexts); + } + + for (auto ctx : contexts) { + ctx->complete(0); + } +} + +template class librbd::io::ImageRequestWQ<librbd::ImageCtx>; + +} // namespace io +} // namespace librbd diff --git a/src/librbd/io/ImageRequestWQ.h b/src/librbd/io/ImageRequestWQ.h new file mode 100644 index 00000000..23dcd48d --- /dev/null +++ b/src/librbd/io/ImageRequestWQ.h @@ -0,0 +1,154 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_IO_IMAGE_REQUEST_WQ_H +#define CEPH_LIBRBD_IO_IMAGE_REQUEST_WQ_H + +#include "include/Context.h" +#include "common/RWLock.h" +#include "common/Throttle.h" +#include "common/WorkQueue.h" +#include "librbd/io/Types.h" + +#include <list> +#include <atomic> + +namespace librbd { + +class ImageCtx; + +namespace io { + +class AioCompletion; +template <typename> class ImageDispatchSpec; +class ReadResult; + +template <typename ImageCtxT = librbd::ImageCtx> +class ImageRequestWQ + : public ThreadPool::PointerWQ<ImageDispatchSpec<ImageCtxT> > { +public: + ImageRequestWQ(ImageCtxT *image_ctx, const string &name, time_t ti, + ThreadPool *tp); + ~ImageRequestWQ(); + + ssize_t read(uint64_t off, uint64_t len, ReadResult &&read_result, + int op_flags); + ssize_t write(uint64_t off, uint64_t len, bufferlist &&bl, int op_flags); + ssize_t discard(uint64_t off, uint64_t len, + uint32_t discard_granularity_bytes); + ssize_t writesame(uint64_t off, uint64_t len, bufferlist &&bl, int op_flags); + ssize_t write_zeroes(uint64_t off, uint64_t len, int zero_flags, + int op_flags); + ssize_t compare_and_write(uint64_t off, uint64_t len, + bufferlist &&cmp_bl, bufferlist &&bl, + uint64_t *mismatch_off, int op_flags); + int flush(); + + void aio_read(AioCompletion *c, uint64_t off, uint64_t len, + ReadResult &&read_result, int op_flags, bool native_async=true); + void aio_write(AioCompletion *c, uint64_t off, uint64_t len, + bufferlist &&bl, int op_flags, bool native_async=true); + void aio_discard(AioCompletion *c, uint64_t off, uint64_t len, + uint32_t discard_granularity_bytes, bool native_async=true); + void aio_flush(AioCompletion *c, bool native_async=true); + void aio_writesame(AioCompletion *c, uint64_t off, uint64_t len, + bufferlist &&bl, int op_flags, bool native_async=true); + void aio_write_zeroes(AioCompletion *c, uint64_t off, uint64_t len, + int zero_flags, int op_flags, bool native_async); + void aio_compare_and_write(AioCompletion *c, uint64_t off, + uint64_t len, bufferlist &&cmp_bl, + bufferlist &&bl, uint64_t *mismatch_off, + int op_flags, bool native_async=true); + + using ThreadPool::PointerWQ<ImageDispatchSpec<ImageCtxT> >::drain; + using ThreadPool::PointerWQ<ImageDispatchSpec<ImageCtxT> >::empty; + + void shut_down(Context *on_shutdown); + + inline bool writes_blocked() const { + RWLock::RLocker locker(m_lock); + return (m_write_blockers > 0); + } + + int block_writes(); + void block_writes(Context *on_blocked); + void unblock_writes(); + + void wait_on_writes_unblocked(Context *on_unblocked); + + void set_require_lock(Direction direction, bool enabled); + + void apply_qos_schedule_tick_min(uint64_t tick); + + void apply_qos_limit(const uint64_t flag, uint64_t limit, uint64_t burst); +protected: + void *_void_dequeue() override; + void process(ImageDispatchSpec<ImageCtxT> *req) override; + bool _empty() override { + return (ThreadPool::PointerWQ<ImageDispatchSpec<ImageCtxT>>::_empty() && + m_io_throttled.load() == 0); + } + + +private: + typedef std::list<Context *> Contexts; + + struct C_AcquireLock; + struct C_BlockedWrites; + struct C_RefreshFinish; + + ImageCtxT &m_image_ctx; + mutable RWLock m_lock; + Contexts m_write_blocker_contexts; + uint32_t m_write_blockers = 0; + Contexts m_unblocked_write_waiter_contexts; + bool m_require_lock_on_read = false; + bool m_require_lock_on_write = false; + std::atomic<unsigned> m_queued_reads { 0 }; + std::atomic<unsigned> m_queued_writes { 0 }; + std::atomic<unsigned> m_in_flight_ios { 0 }; + std::atomic<unsigned> m_in_flight_writes { 0 }; + std::atomic<unsigned> m_io_blockers { 0 }; + std::atomic<unsigned> m_io_throttled { 0 }; + + std::list<std::pair<uint64_t, TokenBucketThrottle*> > m_throttles; + uint64_t m_qos_enabled_flag = 0; + + bool m_shutdown = false; + Context *m_on_shutdown = nullptr; + + bool is_lock_required(bool write_op) const; + + inline bool require_lock_on_read() const { + RWLock::RLocker locker(m_lock); + return m_require_lock_on_read; + } + inline bool writes_empty() const { + RWLock::RLocker locker(m_lock); + return (m_queued_writes == 0); + } + + bool needs_throttle(ImageDispatchSpec<ImageCtxT> *item); + + void finish_queued_io(ImageDispatchSpec<ImageCtxT> *req); + void finish_in_flight_write(); + + int start_in_flight_io(AioCompletion *c); + void finish_in_flight_io(); + void fail_in_flight_io(int r, ImageDispatchSpec<ImageCtxT> *req); + + void queue(ImageDispatchSpec<ImageCtxT> *req); + + void handle_acquire_lock(int r, ImageDispatchSpec<ImageCtxT> *req); + void handle_refreshed(int r, ImageDispatchSpec<ImageCtxT> *req); + void handle_blocked_writes(int r); + + void handle_throttle_ready(int r, ImageDispatchSpec<ImageCtxT> *item, uint64_t flag); +}; + +} // namespace io +} // namespace librbd + +extern template class librbd::io::ImageRequestWQ<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_IO_IMAGE_REQUEST_WQ_H diff --git a/src/librbd/io/ObjectDispatch.cc b/src/librbd/io/ObjectDispatch.cc new file mode 100644 index 00000000..85c8b034 --- /dev/null +++ b/src/librbd/io/ObjectDispatch.cc @@ -0,0 +1,135 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/io/ObjectDispatch.h" +#include "common/dout.h" +#include "common/WorkQueue.h" +#include "librbd/ImageCtx.h" +#include "librbd/io/ObjectRequest.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::io::ObjectDispatch: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace io { + +template <typename I> +ObjectDispatch<I>::ObjectDispatch(I* image_ctx) + : m_image_ctx(image_ctx) { +} + +template <typename I> +void ObjectDispatch<I>::shut_down(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 5) << dendl; + + m_image_ctx->op_work_queue->queue(on_finish, 0); +} + +template <typename I> +bool ObjectDispatch<I>::read( + const std::string &oid, uint64_t object_no, uint64_t object_off, + uint64_t object_len, librados::snap_t snap_id, int op_flags, + const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, + ExtentMap* extent_map, int* object_dispatch_flags, + DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << oid << " " << object_off << "~" << object_len << dendl; + + *dispatch_result = DISPATCH_RESULT_COMPLETE; + auto req = new ObjectReadRequest<I>(m_image_ctx, oid, object_no, object_off, + object_len, snap_id, op_flags, + parent_trace, read_data, extent_map, + on_dispatched); + req->send(); + return true; +} + +template <typename I> +bool ObjectDispatch<I>::discard( + const std::string &oid, uint64_t object_no, uint64_t object_off, + uint64_t object_len, const ::SnapContext &snapc, int discard_flags, + const ZTracer::Trace &parent_trace, int* object_dispatch_flags, + uint64_t* journal_tid, DispatchResult* dispatch_result, + Context** on_finish, Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << oid << " " << object_off << "~" << object_len << dendl; + + *dispatch_result = DISPATCH_RESULT_COMPLETE; + auto req = new ObjectDiscardRequest<I>(m_image_ctx, oid, object_no, + object_off, object_len, snapc, + discard_flags, parent_trace, + on_dispatched); + req->send(); + return true; +} + +template <typename I> +bool ObjectDispatch<I>::write( + const std::string &oid, uint64_t object_no, uint64_t object_off, + ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, + const ZTracer::Trace &parent_trace, int* object_dispatch_flags, + uint64_t* journal_tid, DispatchResult* dispatch_result, + Context** on_finish, Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << oid << " " << object_off << "~" << data.length() << dendl; + + *dispatch_result = DISPATCH_RESULT_COMPLETE; + auto req = new ObjectWriteRequest<I>(m_image_ctx, oid, object_no, object_off, + std::move(data), snapc, op_flags, + parent_trace, on_dispatched); + req->send(); + return true; +} + +template <typename I> +bool ObjectDispatch<I>::write_same( + const std::string &oid, uint64_t object_no, uint64_t object_off, + uint64_t object_len, Extents&& buffer_extents, ceph::bufferlist&& data, + const ::SnapContext &snapc, int op_flags, + const ZTracer::Trace &parent_trace, int* object_dispatch_flags, + uint64_t* journal_tid, DispatchResult* dispatch_result, + Context** on_finish, Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << oid << " " << object_off << "~" << object_len << dendl; + + *dispatch_result = DISPATCH_RESULT_COMPLETE; + auto req = new ObjectWriteSameRequest<I>(m_image_ctx, oid, object_no, + object_off, object_len, + std::move(data), snapc, op_flags, + parent_trace, on_dispatched); + req->send(); + return true; +} + +template <typename I> +bool ObjectDispatch<I>::compare_and_write( + const std::string &oid, uint64_t object_no, uint64_t object_off, + ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data, + const ::SnapContext &snapc, int op_flags, + const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, + int* object_dispatch_flags, uint64_t* journal_tid, + DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << oid << " " << object_off << "~" << write_data.length() + << dendl; + + *dispatch_result = DISPATCH_RESULT_COMPLETE; + auto req = new ObjectCompareAndWriteRequest<I>(m_image_ctx, oid, object_no, + object_off, + std::move(cmp_data), + std::move(write_data), snapc, + mismatch_offset, op_flags, + parent_trace, on_dispatched); + req->send(); + return true; +} + +} // namespace io +} // namespace librbd + +template class librbd::io::ObjectDispatch<librbd::ImageCtx>; diff --git a/src/librbd/io/ObjectDispatch.h b/src/librbd/io/ObjectDispatch.h new file mode 100644 index 00000000..32e8272d --- /dev/null +++ b/src/librbd/io/ObjectDispatch.h @@ -0,0 +1,104 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_IO_OBJECT_DISPATCH_H +#define CEPH_LIBRBD_IO_OBJECT_DISPATCH_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include "include/rados/librados.hpp" +#include "common/snap_types.h" +#include "common/zipkin_trace.h" +#include "librbd/io/Types.h" +#include "librbd/io/ObjectDispatchInterface.h" + +struct Context; + +namespace librbd { + +struct ImageCtx; + +namespace io { + +struct AioCompletion; + +template <typename ImageCtxT = librbd::ImageCtx> +class ObjectDispatch : public ObjectDispatchInterface { +public: + ObjectDispatch(ImageCtxT* image_ctx); + + ObjectDispatchLayer get_object_dispatch_layer() const override { + return OBJECT_DISPATCH_LAYER_CORE; + } + + void shut_down(Context* on_finish) override; + + bool read( + const std::string &oid, uint64_t object_no, uint64_t object_off, + uint64_t object_len, librados::snap_t snap_id, int op_flags, + const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, + ExtentMap* extent_map, int* object_dispatch_flags, + DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) override; + + bool discard( + const std::string &oid, uint64_t object_no, uint64_t object_off, + uint64_t object_len, const ::SnapContext &snapc, int discard_flags, + const ZTracer::Trace &parent_trace, int* object_dispatch_flags, + uint64_t* journal_tid, DispatchResult* dispatch_result, + Context** on_finish, Context* on_dispatched) override; + + bool write( + const std::string &oid, uint64_t object_no, uint64_t object_off, + ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, + const ZTracer::Trace &parent_trace, int* object_dispatch_flags, + uint64_t* journal_tid, DispatchResult* dispatch_result, + Context** on_finish, Context* on_dispatched) override; + + bool write_same( + const std::string &oid, uint64_t object_no, uint64_t object_off, + uint64_t object_len, Extents&& buffer_extents, ceph::bufferlist&& data, + const ::SnapContext &snapc, int op_flags, + const ZTracer::Trace &parent_trace, int* object_dispatch_flags, + uint64_t* journal_tid, DispatchResult* dispatch_result, + Context** on_finish, Context* on_dispatched) override; + + bool compare_and_write( + const std::string &oid, uint64_t object_no, uint64_t object_off, + ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data, + const ::SnapContext &snapc, int op_flags, + const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, + int* object_dispatch_flags, uint64_t* journal_tid, + DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) override; + + bool flush( + FlushSource flush_source, const ZTracer::Trace &parent_trace, + DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) override { + return false; + } + + bool invalidate_cache(Context* on_finish) override { + return false; + } + bool reset_existence_cache(Context* on_finish) override { + return false; + } + + void extent_overwritten( + uint64_t object_no, uint64_t object_off, uint64_t object_len, + uint64_t journal_tid, uint64_t new_journal_tid) override { + } + +private: + ImageCtxT* m_image_ctx; + +}; + +} // namespace io +} // namespace librbd + +extern template class librbd::io::ObjectDispatch<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_IO_OBJECT_DISPATCH_H diff --git a/src/librbd/io/ObjectDispatchInterface.h b/src/librbd/io/ObjectDispatchInterface.h new file mode 100644 index 00000000..fddf0db1 --- /dev/null +++ b/src/librbd/io/ObjectDispatchInterface.h @@ -0,0 +1,86 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_IO_OBJECT_DISPATCH_INTERFACE_H +#define CEPH_LIBRBD_IO_OBJECT_DISPATCH_INTERFACE_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include "include/rados/librados.hpp" +#include "common/snap_types.h" +#include "common/zipkin_trace.h" +#include "librbd/io/Types.h" + +struct Context; +struct RWLock; + +namespace librbd { +namespace io { + +struct AioCompletion; + +struct ObjectDispatchInterface { + virtual ~ObjectDispatchInterface() { + } + + virtual ObjectDispatchLayer get_object_dispatch_layer() const = 0; + + virtual void shut_down(Context* on_finish) = 0; + + virtual bool read( + const std::string &oid, uint64_t object_no, uint64_t object_off, + uint64_t object_len, librados::snap_t snap_id, int op_flags, + const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, + ExtentMap* extent_map, int* object_dispatch_flags, + DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) = 0; + + virtual bool discard( + const std::string &oid, uint64_t object_no, uint64_t object_off, + uint64_t object_len, const ::SnapContext &snapc, int discard_flags, + const ZTracer::Trace &parent_trace, int* object_dispatch_flags, + uint64_t* journal_tid, DispatchResult* dispatch_result, + Context**on_finish, Context* on_dispatched) = 0; + + virtual bool write( + const std::string &oid, uint64_t object_no, uint64_t object_off, + ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, + const ZTracer::Trace &parent_trace, int* object_dispatch_flags, + uint64_t* journal_tid, DispatchResult* dispatch_result, + Context**on_finish, Context* on_dispatched) = 0; + + virtual bool write_same( + const std::string &oid, uint64_t object_no, uint64_t object_off, + uint64_t object_len, Extents&& buffer_extents, ceph::bufferlist&& data, + const ::SnapContext &snapc, int op_flags, + const ZTracer::Trace &parent_trace, int* object_dispatch_flags, + uint64_t* journal_tid, DispatchResult* dispatch_result, + Context**on_finish, Context* on_dispatched) = 0; + + virtual bool compare_and_write( + const std::string &oid, uint64_t object_no, uint64_t object_off, + ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data, + const ::SnapContext &snapc, int op_flags, + const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, + int* object_dispatch_flags, uint64_t* journal_tid, + DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) = 0; + + virtual bool flush( + FlushSource flush_source, const ZTracer::Trace &parent_trace, + DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) = 0; + + virtual bool invalidate_cache(Context* on_finish) = 0; + virtual bool reset_existence_cache(Context* on_finish) = 0; + + virtual void extent_overwritten( + uint64_t object_no, uint64_t object_off, uint64_t object_len, + uint64_t journal_tid, uint64_t new_journal_tid) = 0; + +}; + +} // namespace io +} // namespace librbd + +#endif // CEPH_LIBRBD_IO_OBJECT_DISPATCH_INTERFACE_H diff --git a/src/librbd/io/ObjectDispatchSpec.cc b/src/librbd/io/ObjectDispatchSpec.cc new file mode 100644 index 00000000..2b6dccc5 --- /dev/null +++ b/src/librbd/io/ObjectDispatchSpec.cc @@ -0,0 +1,46 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/io/ObjectDispatchSpec.h" +#include "include/Context.h" +#include "librbd/io/ObjectDispatcher.h" +#include <boost/variant.hpp> + +namespace librbd { +namespace io { + +void ObjectDispatchSpec::C_Dispatcher::complete(int r) { + if (r < 0) { + finish(r); + return; + } + + switch (object_dispatch_spec->dispatch_result) { + case DISPATCH_RESULT_CONTINUE: + object_dispatch_spec->send(); + break; + case DISPATCH_RESULT_COMPLETE: + finish(r); + break; + case DISPATCH_RESULT_INVALID: + ceph_abort(); + break; + } +} + +void ObjectDispatchSpec::C_Dispatcher::finish(int r) { + on_finish->complete(r); + delete object_dispatch_spec; +} + +void ObjectDispatchSpec::send() { + object_dispatcher->send(this); +} + +void ObjectDispatchSpec::fail(int r) { + ceph_assert(r < 0); + dispatcher_ctx.complete(r); +} + +} // namespace io +} // namespace librbd diff --git a/src/librbd/io/ObjectDispatchSpec.h b/src/librbd/io/ObjectDispatchSpec.h new file mode 100644 index 00000000..a26d89fe --- /dev/null +++ b/src/librbd/io/ObjectDispatchSpec.h @@ -0,0 +1,266 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_IO_OBJECT_DISPATCH_SPEC_H +#define CEPH_LIBRBD_IO_OBJECT_DISPATCH_SPEC_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include "include/Context.h" +#include "include/rados/librados.hpp" +#include "common/snap_types.h" +#include "common/zipkin_trace.h" +#include "librbd/io/Types.h" +#include <boost/variant/variant.hpp> + +namespace librbd { +namespace io { + +struct ObjectDispatcherInterface; + +struct ObjectDispatchSpec { +private: + // helper to avoid extra heap allocation per object IO + struct C_Dispatcher : public Context { + ObjectDispatchSpec* object_dispatch_spec; + Context* on_finish; + + C_Dispatcher(ObjectDispatchSpec* object_dispatch_spec, Context* on_finish) + : object_dispatch_spec(object_dispatch_spec), on_finish(on_finish) { + } + + void complete(int r) override; + void finish(int r) override; + }; + +public: + struct RequestBase { + std::string oid; + uint64_t object_no; + uint64_t object_off; + + RequestBase(const std::string& oid, uint64_t object_no, uint64_t object_off) + : oid(oid), object_no(object_no), object_off(object_off) { + } + }; + + struct ReadRequest : public RequestBase { + uint64_t object_len; + librados::snap_t snap_id; + ceph::bufferlist* read_data; + ExtentMap* extent_map; + + ReadRequest(const std::string& oid, uint64_t object_no, uint64_t object_off, + uint64_t object_len, librados::snap_t snap_id, + ceph::bufferlist* read_data, ExtentMap* extent_map) + : RequestBase(oid, object_no, object_off), + object_len(object_len), snap_id(snap_id), read_data(read_data), + extent_map(extent_map) { + } + }; + + struct WriteRequestBase : public RequestBase { + ::SnapContext snapc; + uint64_t journal_tid; + + WriteRequestBase(const std::string& oid, uint64_t object_no, + uint64_t object_off, const ::SnapContext& snapc, + uint64_t journal_tid) + : RequestBase(oid, object_no, object_off), snapc(snapc), + journal_tid(journal_tid) { + } + }; + + struct DiscardRequest : public WriteRequestBase { + uint64_t object_len; + int discard_flags; + + DiscardRequest(const std::string& oid, uint64_t object_no, + uint64_t object_off, uint64_t object_len, + int discard_flags, const ::SnapContext& snapc, + uint64_t journal_tid) + : WriteRequestBase(oid, object_no, object_off, snapc, journal_tid), + object_len(object_len), discard_flags(discard_flags) { + } + }; + + struct WriteRequest : public WriteRequestBase { + ceph::bufferlist data; + + WriteRequest(const std::string& oid, uint64_t object_no, + uint64_t object_off, ceph::bufferlist&& data, + const ::SnapContext& snapc, uint64_t journal_tid) + : WriteRequestBase(oid, object_no, object_off, snapc, journal_tid), + data(std::move(data)) { + } + }; + + struct WriteSameRequest : public WriteRequestBase { + uint64_t object_len; + Extents buffer_extents; + ceph::bufferlist data; + + WriteSameRequest(const std::string& oid, uint64_t object_no, + uint64_t object_off, uint64_t object_len, + Extents&& buffer_extents, ceph::bufferlist&& data, + const ::SnapContext& snapc, uint64_t journal_tid) + : WriteRequestBase(oid, object_no, object_off, snapc, journal_tid), + object_len(object_len), buffer_extents(std::move(buffer_extents)), + data(std::move(data)) { + } + }; + + struct CompareAndWriteRequest : public WriteRequestBase { + ceph::bufferlist cmp_data; + ceph::bufferlist data; + uint64_t* mismatch_offset; + + CompareAndWriteRequest(const std::string& oid, uint64_t object_no, + uint64_t object_off, ceph::bufferlist&& cmp_data, + ceph::bufferlist&& data, uint64_t* mismatch_offset, + const ::SnapContext& snapc, uint64_t journal_tid) + : WriteRequestBase(oid, object_no, object_off, snapc, journal_tid), + cmp_data(std::move(cmp_data)), data(std::move(data)), + mismatch_offset(mismatch_offset) { + } + }; + + struct FlushRequest { + FlushSource flush_source; + + FlushRequest(FlushSource flush_source) : flush_source(flush_source) { + } + }; + + typedef boost::variant<ReadRequest, + DiscardRequest, + WriteRequest, + WriteSameRequest, + CompareAndWriteRequest, + FlushRequest> Request; + + C_Dispatcher dispatcher_ctx; + + ObjectDispatcherInterface* object_dispatcher; + ObjectDispatchLayer object_dispatch_layer; + int object_dispatch_flags = 0; + DispatchResult dispatch_result = DISPATCH_RESULT_INVALID; + + Request request; + int op_flags; + ZTracer::Trace parent_trace; + + template <typename ImageCtxT> + static ObjectDispatchSpec* create_read( + ImageCtxT* image_ctx, ObjectDispatchLayer object_dispatch_layer, + const std::string &oid, uint64_t object_no, uint64_t object_off, + uint64_t object_len, librados::snap_t snap_id, int op_flags, + const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, + ExtentMap* extent_map, Context* on_finish) { + return new ObjectDispatchSpec(image_ctx->io_object_dispatcher, + object_dispatch_layer, + ReadRequest{oid, object_no, object_off, + object_len, snap_id, read_data, + extent_map}, + op_flags, parent_trace, on_finish); + } + + template <typename ImageCtxT> + static ObjectDispatchSpec* create_discard( + ImageCtxT* image_ctx, ObjectDispatchLayer object_dispatch_layer, + const std::string &oid, uint64_t object_no, uint64_t object_off, + uint64_t object_len, const ::SnapContext &snapc, int discard_flags, + uint64_t journal_tid, const ZTracer::Trace &parent_trace, + Context *on_finish) { + return new ObjectDispatchSpec(image_ctx->io_object_dispatcher, + object_dispatch_layer, + DiscardRequest{oid, object_no, object_off, + object_len, discard_flags, + snapc, journal_tid}, + 0, parent_trace, on_finish); + } + + template <typename ImageCtxT> + static ObjectDispatchSpec* create_write( + ImageCtxT* image_ctx, ObjectDispatchLayer object_dispatch_layer, + const std::string &oid, uint64_t object_no, uint64_t object_off, + ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, + uint64_t journal_tid, const ZTracer::Trace &parent_trace, + Context *on_finish) { + return new ObjectDispatchSpec(image_ctx->io_object_dispatcher, + object_dispatch_layer, + WriteRequest{oid, object_no, object_off, + std::move(data), snapc, + journal_tid}, + op_flags, parent_trace, on_finish); + } + + template <typename ImageCtxT> + static ObjectDispatchSpec* create_write_same( + ImageCtxT* image_ctx, ObjectDispatchLayer object_dispatch_layer, + const std::string &oid, uint64_t object_no, uint64_t object_off, + uint64_t object_len, Extents&& buffer_extents, + ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, + uint64_t journal_tid, const ZTracer::Trace &parent_trace, + Context *on_finish) { + return new ObjectDispatchSpec(image_ctx->io_object_dispatcher, + object_dispatch_layer, + WriteSameRequest{oid, object_no, object_off, + object_len, + std::move(buffer_extents), + std::move(data), snapc, + journal_tid}, + op_flags, parent_trace, on_finish); + } + + template <typename ImageCtxT> + static ObjectDispatchSpec* create_compare_and_write( + ImageCtxT* image_ctx, ObjectDispatchLayer object_dispatch_layer, + const std::string &oid, uint64_t object_no, uint64_t object_off, + ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data, + const ::SnapContext &snapc, uint64_t *mismatch_offset, int op_flags, + uint64_t journal_tid, const ZTracer::Trace &parent_trace, + Context *on_finish) { + return new ObjectDispatchSpec(image_ctx->io_object_dispatcher, + object_dispatch_layer, + CompareAndWriteRequest{oid, object_no, + object_off, + std::move(cmp_data), + std::move(write_data), + mismatch_offset, + snapc, journal_tid}, + op_flags, parent_trace, on_finish); + } + + template <typename ImageCtxT> + static ObjectDispatchSpec* create_flush( + ImageCtxT* image_ctx, ObjectDispatchLayer object_dispatch_layer, + FlushSource flush_source, const ZTracer::Trace &parent_trace, + Context *on_finish) { + return new ObjectDispatchSpec(image_ctx->io_object_dispatcher, + object_dispatch_layer, + FlushRequest{flush_source}, 0, + parent_trace, on_finish); + } + + void send(); + void fail(int r); + +private: + template <typename> friend class ObjectDispatcher; + + ObjectDispatchSpec(ObjectDispatcherInterface* object_dispatcher, + ObjectDispatchLayer object_dispatch_layer, + Request&& request, int op_flags, + const ZTracer::Trace& parent_trace, Context* on_finish) + : dispatcher_ctx(this, on_finish), object_dispatcher(object_dispatcher), + object_dispatch_layer(object_dispatch_layer), request(std::move(request)), + op_flags(op_flags), parent_trace(parent_trace) { + } + +}; + +} // namespace io +} // namespace librbd + +#endif // CEPH_LIBRBD_IO_OBJECT_DISPATCH_SPEC_H diff --git a/src/librbd/io/ObjectDispatcher.cc b/src/librbd/io/ObjectDispatcher.cc new file mode 100644 index 00000000..befc0750 --- /dev/null +++ b/src/librbd/io/ObjectDispatcher.cc @@ -0,0 +1,354 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/io/ObjectDispatcher.h" +#include "include/Context.h" +#include "common/AsyncOpTracker.h" +#include "common/dout.h" +#include "common/WorkQueue.h" +#include "librbd/ImageCtx.h" +#include "librbd/Utils.h" +#include "librbd/io/ObjectDispatch.h" +#include "librbd/io/ObjectDispatchSpec.h" +#include <boost/variant.hpp> + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::io::ObjectDispatcher: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace io { + +template <typename I> +struct ObjectDispatcher<I>::C_LayerIterator : public Context { + ObjectDispatcher* object_dispatcher; + Context* on_finish; + + ObjectDispatchLayer object_dispatch_layer = OBJECT_DISPATCH_LAYER_NONE; + + C_LayerIterator(ObjectDispatcher* object_dispatcher, + Context* on_finish) + : object_dispatcher(object_dispatcher), on_finish(on_finish) { + } + + void complete(int r) override { + while (true) { + object_dispatcher->m_lock.get_read(); + auto it = object_dispatcher->m_object_dispatches.upper_bound( + object_dispatch_layer); + if (it == object_dispatcher->m_object_dispatches.end()) { + object_dispatcher->m_lock.put_read(); + Context::complete(r); + return; + } + + auto& object_dispatch_meta = it->second; + auto object_dispatch = object_dispatch_meta.object_dispatch; + + // prevent recursive locking back into the dispatcher while handling IO + object_dispatch_meta.async_op_tracker->start_op(); + object_dispatcher->m_lock.put_read(); + + // next loop should start after current layer + object_dispatch_layer = object_dispatch->get_object_dispatch_layer(); + + auto handled = execute(object_dispatch, this); + object_dispatch_meta.async_op_tracker->finish_op(); + + if (handled) { + break; + } + } + } + + void finish(int r) override { + on_finish->complete(0); + } + + virtual bool execute(ObjectDispatchInterface* object_dispatch, + Context* on_finish) = 0; +}; + +template <typename I> +struct ObjectDispatcher<I>::C_InvalidateCache : public C_LayerIterator { + C_InvalidateCache(ObjectDispatcher* object_dispatcher, Context* on_finish) + : C_LayerIterator(object_dispatcher, on_finish) { + } + + bool execute(ObjectDispatchInterface* object_dispatch, + Context* on_finish) override { + return object_dispatch->invalidate_cache(on_finish); + } +}; + +template <typename I> +struct ObjectDispatcher<I>::C_ResetExistenceCache : public C_LayerIterator { + C_ResetExistenceCache(ObjectDispatcher* object_dispatcher, Context* on_finish) + : C_LayerIterator(object_dispatcher, on_finish) { + } + + bool execute(ObjectDispatchInterface* object_dispatch, + Context* on_finish) override { + return object_dispatch->reset_existence_cache(on_finish); + } +}; + +template <typename I> +struct ObjectDispatcher<I>::SendVisitor : public boost::static_visitor<bool> { + ObjectDispatchInterface* object_dispatch; + ObjectDispatchSpec* object_dispatch_spec; + + SendVisitor(ObjectDispatchInterface* object_dispatch, + ObjectDispatchSpec* object_dispatch_spec) + : object_dispatch(object_dispatch), + object_dispatch_spec(object_dispatch_spec) { + } + + bool operator()(ObjectDispatchSpec::ReadRequest& read) const { + return object_dispatch->read( + read.oid, read.object_no, read.object_off, read.object_len, read.snap_id, + object_dispatch_spec->op_flags, object_dispatch_spec->parent_trace, + read.read_data, read.extent_map, + &object_dispatch_spec->object_dispatch_flags, + &object_dispatch_spec->dispatch_result, + &object_dispatch_spec->dispatcher_ctx.on_finish, + &object_dispatch_spec->dispatcher_ctx); + } + + bool operator()(ObjectDispatchSpec::DiscardRequest& discard) const { + return object_dispatch->discard( + discard.oid, discard.object_no, discard.object_off, discard.object_len, + discard.snapc, discard.discard_flags, object_dispatch_spec->parent_trace, + &object_dispatch_spec->object_dispatch_flags, &discard.journal_tid, + &object_dispatch_spec->dispatch_result, + &object_dispatch_spec->dispatcher_ctx.on_finish, + &object_dispatch_spec->dispatcher_ctx); + } + + bool operator()(ObjectDispatchSpec::WriteRequest& write) const { + return object_dispatch->write( + write.oid, write.object_no, write.object_off, std::move(write.data), + write.snapc, object_dispatch_spec->op_flags, + object_dispatch_spec->parent_trace, + &object_dispatch_spec->object_dispatch_flags, &write.journal_tid, + &object_dispatch_spec->dispatch_result, + &object_dispatch_spec->dispatcher_ctx.on_finish, + &object_dispatch_spec->dispatcher_ctx); + } + + bool operator()(ObjectDispatchSpec::WriteSameRequest& write_same) const { + return object_dispatch->write_same( + write_same.oid, write_same.object_no, write_same.object_off, + write_same.object_len, std::move(write_same.buffer_extents), + std::move(write_same.data), write_same.snapc, + object_dispatch_spec->op_flags, object_dispatch_spec->parent_trace, + &object_dispatch_spec->object_dispatch_flags, &write_same.journal_tid, + &object_dispatch_spec->dispatch_result, + &object_dispatch_spec->dispatcher_ctx.on_finish, + &object_dispatch_spec->dispatcher_ctx); + } + + bool operator()( + ObjectDispatchSpec::CompareAndWriteRequest& compare_and_write) const { + return object_dispatch->compare_and_write( + compare_and_write.oid, compare_and_write.object_no, + compare_and_write.object_off, std::move(compare_and_write.cmp_data), + std::move(compare_and_write.data), compare_and_write.snapc, + object_dispatch_spec->op_flags, object_dispatch_spec->parent_trace, + compare_and_write.mismatch_offset, + &object_dispatch_spec->object_dispatch_flags, + &compare_and_write.journal_tid, + &object_dispatch_spec->dispatch_result, + &object_dispatch_spec->dispatcher_ctx.on_finish, + &object_dispatch_spec->dispatcher_ctx); + } + + bool operator()(ObjectDispatchSpec::FlushRequest& flush) const { + return object_dispatch->flush( + flush.flush_source, object_dispatch_spec->parent_trace, + &object_dispatch_spec->dispatch_result, + &object_dispatch_spec->dispatcher_ctx.on_finish, + &object_dispatch_spec->dispatcher_ctx); + } +}; + +template <typename I> +ObjectDispatcher<I>::ObjectDispatcher(I* image_ctx) + : m_image_ctx(image_ctx), + m_lock(librbd::util::unique_lock_name("librbd::io::ObjectDispatcher::lock", + this)) { + // configure the core object dispatch handler on startup + auto object_dispatch = new ObjectDispatch(image_ctx); + m_object_dispatches[object_dispatch->get_object_dispatch_layer()] = + {object_dispatch, new AsyncOpTracker()}; +} + +template <typename I> +ObjectDispatcher<I>::~ObjectDispatcher() { + ceph_assert(m_object_dispatches.empty()); +} + +template <typename I> +void ObjectDispatcher<I>::shut_down(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 5) << dendl; + + std::map<ObjectDispatchLayer, ObjectDispatchMeta> object_dispatches; + { + RWLock::WLocker locker(m_lock); + std::swap(object_dispatches, m_object_dispatches); + } + + for (auto it : object_dispatches) { + shut_down_object_dispatch(it.second, &on_finish); + } + on_finish->complete(0); +} + +template <typename I> +void ObjectDispatcher<I>::register_object_dispatch( + ObjectDispatchInterface* object_dispatch) { + auto cct = m_image_ctx->cct; + auto type = object_dispatch->get_object_dispatch_layer(); + ldout(cct, 5) << "object_dispatch_layer=" << type << dendl; + + RWLock::WLocker locker(m_lock); + ceph_assert(type < OBJECT_DISPATCH_LAYER_LAST); + + auto result = m_object_dispatches.insert( + {type, {object_dispatch, new AsyncOpTracker()}}); + ceph_assert(result.second); +} + +template <typename I> +void ObjectDispatcher<I>::shut_down_object_dispatch( + ObjectDispatchLayer object_dispatch_layer, Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 5) << "object_dispatch_layer=" << object_dispatch_layer << dendl; + ceph_assert(object_dispatch_layer + 1 < OBJECT_DISPATCH_LAYER_LAST); + + ObjectDispatchMeta object_dispatch_meta; + { + RWLock::WLocker locker(m_lock); + auto it = m_object_dispatches.find(object_dispatch_layer); + ceph_assert(it != m_object_dispatches.end()); + + object_dispatch_meta = it->second; + m_object_dispatches.erase(it); + } + + shut_down_object_dispatch(object_dispatch_meta, &on_finish); + on_finish->complete(0); +} + +template <typename I> +void ObjectDispatcher<I>::shut_down_object_dispatch( + ObjectDispatchMeta& object_dispatch_meta, Context** on_finish) { + auto object_dispatch = object_dispatch_meta.object_dispatch; + auto async_op_tracker = object_dispatch_meta.async_op_tracker; + + Context* ctx = *on_finish; + ctx = new FunctionContext( + [object_dispatch, async_op_tracker, ctx](int r) { + delete object_dispatch; + delete async_op_tracker; + + ctx->complete(r); + }); + ctx = new FunctionContext([object_dispatch, ctx](int r) { + object_dispatch->shut_down(ctx); + }); + *on_finish = new FunctionContext([async_op_tracker, ctx](int r) { + async_op_tracker->wait_for_ops(ctx); + }); +} + +template <typename I> +void ObjectDispatcher<I>::invalidate_cache(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 5) << dendl; + + on_finish = util::create_async_context_callback(*m_image_ctx, on_finish); + auto ctx = new C_InvalidateCache(this, on_finish); + ctx->complete(0); +} + +template <typename I> +void ObjectDispatcher<I>::reset_existence_cache(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 5) << dendl; + + on_finish = util::create_async_context_callback(*m_image_ctx, on_finish); + auto ctx = new C_ResetExistenceCache(this, on_finish); + ctx->complete(0); +} + +template <typename I> +void ObjectDispatcher<I>::extent_overwritten( + uint64_t object_no, uint64_t object_off, uint64_t object_len, + uint64_t journal_tid, uint64_t new_journal_tid) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << object_no << " " << object_off << "~" << object_len + << dendl; + + for (auto it : m_object_dispatches) { + auto& object_dispatch_meta = it.second; + auto object_dispatch = object_dispatch_meta.object_dispatch; + object_dispatch->extent_overwritten(object_no, object_off, object_len, + journal_tid, new_journal_tid); + } +} + +template <typename I> +void ObjectDispatcher<I>::send(ObjectDispatchSpec* object_dispatch_spec) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "object_dispatch_spec=" << object_dispatch_spec << dendl; + + auto object_dispatch_layer = object_dispatch_spec->object_dispatch_layer; + ceph_assert(object_dispatch_layer + 1 < OBJECT_DISPATCH_LAYER_LAST); + + // apply the IO request to all layers -- this method will be re-invoked + // by the dispatch layer if continuing / restarting the IO + while (true) { + m_lock.get_read(); + object_dispatch_layer = object_dispatch_spec->object_dispatch_layer; + auto it = m_object_dispatches.upper_bound(object_dispatch_layer); + if (it == m_object_dispatches.end()) { + // the request is complete if handled by all layers + object_dispatch_spec->dispatch_result = DISPATCH_RESULT_COMPLETE; + m_lock.put_read(); + break; + } + + auto& object_dispatch_meta = it->second; + auto object_dispatch = object_dispatch_meta.object_dispatch; + object_dispatch_spec->dispatch_result = DISPATCH_RESULT_INVALID; + + // prevent recursive locking back into the dispatcher while handling IO + object_dispatch_meta.async_op_tracker->start_op(); + m_lock.put_read(); + + // advance to next layer in case we skip or continue + object_dispatch_spec->object_dispatch_layer = + object_dispatch->get_object_dispatch_layer(); + + bool handled = boost::apply_visitor( + SendVisitor{object_dispatch, object_dispatch_spec}, + object_dispatch_spec->request); + object_dispatch_meta.async_op_tracker->finish_op(); + + // handled ops will resume when the dispatch ctx is invoked + if (handled) { + return; + } + } + + // skipped through to the last layer + object_dispatch_spec->dispatcher_ctx.complete(0); +} + +} // namespace io +} // namespace librbd + +template class librbd::io::ObjectDispatcher<librbd::ImageCtx>; diff --git a/src/librbd/io/ObjectDispatcher.h b/src/librbd/io/ObjectDispatcher.h new file mode 100644 index 00000000..0370d268 --- /dev/null +++ b/src/librbd/io/ObjectDispatcher.h @@ -0,0 +1,89 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_IO_OBJECT_DISPATCHER_H +#define CEPH_LIBRBD_IO_OBJECT_DISPATCHER_H + +#include "include/int_types.h" +#include "common/RWLock.h" +#include "librbd/io/Types.h" +#include <map> + +struct AsyncOpTracker; +struct Context; + +namespace librbd { + +struct ImageCtx; + +namespace io { + +struct ObjectDispatchInterface; +struct ObjectDispatchSpec; + +struct ObjectDispatcherInterface { +public: + virtual ~ObjectDispatcherInterface() { + } + +private: + friend class ObjectDispatchSpec; + + virtual void send(ObjectDispatchSpec* object_dispatch_spec) = 0; +}; + +template <typename ImageCtxT = ImageCtx> +class ObjectDispatcher : public ObjectDispatcherInterface { +public: + ObjectDispatcher(ImageCtxT* image_ctx); + ~ObjectDispatcher(); + + void shut_down(Context* on_finish); + + void register_object_dispatch(ObjectDispatchInterface* object_dispatch); + void shut_down_object_dispatch(ObjectDispatchLayer object_dispatch_layer, + Context* on_finish); + + void invalidate_cache(Context* on_finish); + void reset_existence_cache(Context* on_finish); + + void extent_overwritten( + uint64_t object_no, uint64_t object_off, uint64_t object_len, + uint64_t journal_tid, uint64_t new_journal_tid); + +private: + struct ObjectDispatchMeta { + ObjectDispatchInterface* object_dispatch = nullptr; + AsyncOpTracker* async_op_tracker = nullptr; + + ObjectDispatchMeta() { + } + ObjectDispatchMeta(ObjectDispatchInterface* object_dispatch, + AsyncOpTracker* async_op_tracker) + : object_dispatch(object_dispatch), async_op_tracker(async_op_tracker) { + } + }; + + struct C_LayerIterator; + struct C_InvalidateCache; + struct C_ResetExistenceCache; + struct SendVisitor; + + ImageCtxT* m_image_ctx; + + RWLock m_lock; + std::map<ObjectDispatchLayer, ObjectDispatchMeta> m_object_dispatches; + + void send(ObjectDispatchSpec* object_dispatch_spec); + + void shut_down_object_dispatch(ObjectDispatchMeta& object_dispatch_meta, + Context** on_finish); + +}; + +} // namespace io +} // namespace librbd + +extern template class librbd::io::ObjectDispatcher<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_IO_OBJECT_DISPATCHER_H diff --git a/src/librbd/io/ObjectRequest.cc b/src/librbd/io/ObjectRequest.cc new file mode 100644 index 00000000..60f53df1 --- /dev/null +++ b/src/librbd/io/ObjectRequest.cc @@ -0,0 +1,729 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/io/ObjectRequest.h" +#include "common/ceph_context.h" +#include "common/dout.h" +#include "common/errno.h" +#include "common/Mutex.h" +#include "common/RWLock.h" +#include "common/WorkQueue.h" +#include "include/Context.h" +#include "include/err.h" +#include "osd/osd_types.h" + +#include "librbd/ExclusiveLock.h" +#include "librbd/ImageCtx.h" +#include "librbd/ObjectMap.h" +#include "librbd/Utils.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/io/CopyupRequest.h" +#include "librbd/io/ImageRequest.h" +#include "librbd/io/ReadResult.h" + +#include <boost/bind.hpp> +#include <boost/optional.hpp> + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::io::ObjectRequest: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace io { + +namespace { + +template <typename I> +inline bool is_copy_on_read(I *ictx, librados::snap_t snap_id) { + RWLock::RLocker snap_locker(ictx->snap_lock); + return (ictx->clone_copy_on_read && + !ictx->read_only && snap_id == CEPH_NOSNAP && + (ictx->exclusive_lock == nullptr || + ictx->exclusive_lock->is_lock_owner())); +} + +} // anonymous namespace + +template <typename I> +ObjectRequest<I>* +ObjectRequest<I>::create_write(I *ictx, const std::string &oid, + uint64_t object_no, uint64_t object_off, + ceph::bufferlist&& data, + const ::SnapContext &snapc, int op_flags, + const ZTracer::Trace &parent_trace, + Context *completion) { + return new ObjectWriteRequest<I>(ictx, oid, object_no, object_off, + std::move(data), snapc, op_flags, + parent_trace, completion); +} + +template <typename I> +ObjectRequest<I>* +ObjectRequest<I>::create_discard(I *ictx, const std::string &oid, + uint64_t object_no, uint64_t object_off, + uint64_t object_len, + const ::SnapContext &snapc, + int discard_flags, + const ZTracer::Trace &parent_trace, + Context *completion) { + return new ObjectDiscardRequest<I>(ictx, oid, object_no, object_off, + object_len, snapc, discard_flags, + parent_trace, completion); +} + +template <typename I> +ObjectRequest<I>* +ObjectRequest<I>::create_write_same(I *ictx, const std::string &oid, + uint64_t object_no, uint64_t object_off, + uint64_t object_len, + ceph::bufferlist&& data, + const ::SnapContext &snapc, int op_flags, + const ZTracer::Trace &parent_trace, + Context *completion) { + return new ObjectWriteSameRequest<I>(ictx, oid, object_no, object_off, + object_len, std::move(data), snapc, + op_flags, parent_trace, completion); +} + +template <typename I> +ObjectRequest<I>* +ObjectRequest<I>::create_compare_and_write(I *ictx, const std::string &oid, + uint64_t object_no, + uint64_t object_off, + ceph::bufferlist&& cmp_data, + ceph::bufferlist&& write_data, + const ::SnapContext &snapc, + uint64_t *mismatch_offset, + int op_flags, + const ZTracer::Trace &parent_trace, + Context *completion) { + return new ObjectCompareAndWriteRequest<I>(ictx, oid, object_no, object_off, + std::move(cmp_data), + std::move(write_data), snapc, + mismatch_offset, op_flags, + parent_trace, completion); +} + +template <typename I> +ObjectRequest<I>::ObjectRequest(I *ictx, const std::string &oid, + uint64_t objectno, uint64_t off, + uint64_t len, librados::snap_t snap_id, + const char *trace_name, + const ZTracer::Trace &trace, + Context *completion) + : m_ictx(ictx), m_oid(oid), m_object_no(objectno), m_object_off(off), + m_object_len(len), m_snap_id(snap_id), m_completion(completion), + m_trace(util::create_trace(*ictx, "", trace)) { + ceph_assert(m_ictx->data_ctx.is_valid()); + if (m_trace.valid()) { + m_trace.copy_name(trace_name + std::string(" ") + oid); + m_trace.event("start"); + } +} + +template <typename I> +void ObjectRequest<I>::add_write_hint(I& image_ctx, + librados::ObjectWriteOperation *wr) { + if (image_ctx.enable_alloc_hint) { + wr->set_alloc_hint2(image_ctx.get_object_size(), + image_ctx.get_object_size(), + image_ctx.alloc_hint_flags); + } else if (image_ctx.alloc_hint_flags != 0U) { + wr->set_alloc_hint2(0, 0, image_ctx.alloc_hint_flags); + } +} + +template <typename I> +bool ObjectRequest<I>::compute_parent_extents(Extents *parent_extents, + bool read_request) { + ceph_assert(m_ictx->snap_lock.is_locked()); + ceph_assert(m_ictx->parent_lock.is_locked()); + + m_has_parent = false; + parent_extents->clear(); + + uint64_t parent_overlap; + int r = m_ictx->get_parent_overlap(m_snap_id, &parent_overlap); + if (r < 0) { + // NOTE: it's possible for a snapshot to be deleted while we are + // still reading from it + lderr(m_ictx->cct) << "failed to retrieve parent overlap: " + << cpp_strerror(r) << dendl; + return false; + } + + if (!read_request && !m_ictx->migration_info.empty()) { + parent_overlap = m_ictx->migration_info.overlap; + } + + if (parent_overlap == 0) { + return false; + } + + Striper::extent_to_file(m_ictx->cct, &m_ictx->layout, m_object_no, 0, + m_ictx->layout.object_size, *parent_extents); + uint64_t object_overlap = m_ictx->prune_parent_extents(*parent_extents, + parent_overlap); + if (object_overlap > 0) { + ldout(m_ictx->cct, 20) << "overlap " << parent_overlap << " " + << "extents " << *parent_extents << dendl; + m_has_parent = !parent_extents->empty(); + return true; + } + return false; +} + +template <typename I> +void ObjectRequest<I>::async_finish(int r) { + ldout(m_ictx->cct, 20) << "r=" << r << dendl; + m_ictx->op_work_queue->queue(util::create_context_callback< + ObjectRequest<I>, &ObjectRequest<I>::finish>(this), r); +} + +template <typename I> +void ObjectRequest<I>::finish(int r) { + ldout(m_ictx->cct, 20) << "r=" << r << dendl; + m_completion->complete(r); + delete this; +} + +/** read **/ + +template <typename I> +ObjectReadRequest<I>::ObjectReadRequest(I *ictx, const std::string &oid, + uint64_t objectno, uint64_t offset, + uint64_t len, librados::snap_t snap_id, + int op_flags, + const ZTracer::Trace &parent_trace, + bufferlist* read_data, + ExtentMap* extent_map, + Context *completion) + : ObjectRequest<I>(ictx, oid, objectno, offset, len, snap_id, "read", + parent_trace, completion), + m_op_flags(op_flags), m_read_data(read_data), m_extent_map(extent_map) { +} + +template <typename I> +void ObjectReadRequest<I>::send() { + I *image_ctx = this->m_ictx; + ldout(image_ctx->cct, 20) << dendl; + + read_object(); +} + +template <typename I> +void ObjectReadRequest<I>::read_object() { + I *image_ctx = this->m_ictx; + { + RWLock::RLocker snap_locker(image_ctx->snap_lock); + if (image_ctx->object_map != nullptr && + !image_ctx->object_map->object_may_exist(this->m_object_no)) { + image_ctx->op_work_queue->queue(new FunctionContext([this](int r) { + read_parent(); + }), 0); + return; + } + } + + ldout(image_ctx->cct, 20) << dendl; + + librados::ObjectReadOperation op; + if (this->m_object_len >= image_ctx->sparse_read_threshold_bytes) { + op.sparse_read(this->m_object_off, this->m_object_len, m_extent_map, + m_read_data, nullptr); + } else { + op.read(this->m_object_off, this->m_object_len, m_read_data, nullptr); + } + op.set_op_flags2(m_op_flags); + + librados::AioCompletion *rados_completion = util::create_rados_callback< + ObjectReadRequest<I>, &ObjectReadRequest<I>::handle_read_object>(this); + int flags = image_ctx->get_read_flags(this->m_snap_id); + int r = image_ctx->data_ctx.aio_operate( + this->m_oid, rados_completion, &op, flags, nullptr, + (this->m_trace.valid() ? this->m_trace.get_info() : nullptr)); + ceph_assert(r == 0); + + rados_completion->release(); +} + +template <typename I> +void ObjectReadRequest<I>::handle_read_object(int r) { + I *image_ctx = this->m_ictx; + ldout(image_ctx->cct, 20) << "r=" << r << dendl; + + if (r == -ENOENT) { + read_parent(); + return; + } else if (r < 0) { + lderr(image_ctx->cct) << "failed to read from object: " + << cpp_strerror(r) << dendl; + this->finish(r); + return; + } + + this->finish(0); +} + +template <typename I> +void ObjectReadRequest<I>::read_parent() { + I *image_ctx = this->m_ictx; + + RWLock::RLocker snap_locker(image_ctx->snap_lock); + RWLock::RLocker parent_locker(image_ctx->parent_lock); + + // calculate reverse mapping onto the image + Extents parent_extents; + Striper::extent_to_file(image_ctx->cct, &image_ctx->layout, + this->m_object_no, this->m_object_off, + this->m_object_len, parent_extents); + + uint64_t parent_overlap = 0; + uint64_t object_overlap = 0; + int r = image_ctx->get_parent_overlap(this->m_snap_id, &parent_overlap); + if (r == 0) { + object_overlap = image_ctx->prune_parent_extents(parent_extents, + parent_overlap); + } + + if (object_overlap == 0) { + parent_locker.unlock(); + snap_locker.unlock(); + + this->finish(-ENOENT); + return; + } + + ldout(image_ctx->cct, 20) << dendl; + + auto parent_completion = AioCompletion::create_and_start< + ObjectReadRequest<I>, &ObjectReadRequest<I>::handle_read_parent>( + this, util::get_image_ctx(image_ctx->parent), AIO_TYPE_READ); + ImageRequest<I>::aio_read(image_ctx->parent, parent_completion, + std::move(parent_extents), ReadResult{m_read_data}, + 0, this->m_trace); +} + +template <typename I> +void ObjectReadRequest<I>::handle_read_parent(int r) { + I *image_ctx = this->m_ictx; + ldout(image_ctx->cct, 20) << "r=" << r << dendl; + + if (r == -ENOENT) { + this->finish(r); + return; + } else if (r < 0) { + lderr(image_ctx->cct) << "failed to read parent extents: " + << cpp_strerror(r) << dendl; + this->finish(r); + return; + } + + copyup(); +} + +template <typename I> +void ObjectReadRequest<I>::copyup() { + I *image_ctx = this->m_ictx; + if (!is_copy_on_read(image_ctx, this->m_snap_id)) { + this->finish(0); + return; + } + + image_ctx->owner_lock.get_read(); + image_ctx->snap_lock.get_read(); + image_ctx->parent_lock.get_read(); + Extents parent_extents; + if (!this->compute_parent_extents(&parent_extents, true) || + (image_ctx->exclusive_lock != nullptr && + !image_ctx->exclusive_lock->is_lock_owner())) { + image_ctx->parent_lock.put_read(); + image_ctx->snap_lock.put_read(); + image_ctx->owner_lock.put_read(); + this->finish(0); + return; + } + + ldout(image_ctx->cct, 20) << dendl; + + image_ctx->copyup_list_lock.Lock(); + auto it = image_ctx->copyup_list.find(this->m_object_no); + if (it == image_ctx->copyup_list.end()) { + // create and kick off a CopyupRequest + auto new_req = CopyupRequest<I>::create( + image_ctx, this->m_oid, this->m_object_no, std::move(parent_extents), + this->m_trace); + + image_ctx->copyup_list[this->m_object_no] = new_req; + image_ctx->copyup_list_lock.Unlock(); + image_ctx->parent_lock.put_read(); + image_ctx->snap_lock.put_read(); + new_req->send(); + } else { + image_ctx->copyup_list_lock.Unlock(); + image_ctx->parent_lock.put_read(); + image_ctx->snap_lock.put_read(); + } + + image_ctx->owner_lock.put_read(); + this->finish(0); +} + +/** write **/ + +template <typename I> +AbstractObjectWriteRequest<I>::AbstractObjectWriteRequest( + I *ictx, const std::string &oid, uint64_t object_no, uint64_t object_off, + uint64_t len, const ::SnapContext &snapc, const char *trace_name, + const ZTracer::Trace &parent_trace, Context *completion) + : ObjectRequest<I>(ictx, oid, object_no, object_off, len, CEPH_NOSNAP, + trace_name, parent_trace, completion), + m_snap_seq(snapc.seq.val) +{ + m_snaps.insert(m_snaps.end(), snapc.snaps.begin(), snapc.snaps.end()); + + if (this->m_object_off == 0 && + this->m_object_len == ictx->get_object_size()) { + m_full_object = true; + } + + compute_parent_info(); + + ictx->snap_lock.get_read(); + if (!ictx->migration_info.empty()) { + m_guarding_migration_write = true; + } + ictx->snap_lock.put_read(); +} + +template <typename I> +void AbstractObjectWriteRequest<I>::compute_parent_info() { + I *image_ctx = this->m_ictx; + RWLock::RLocker snap_locker(image_ctx->snap_lock); + RWLock::RLocker parent_locker(image_ctx->parent_lock); + + this->compute_parent_extents(&m_parent_extents, false); + + if (!this->has_parent() || + (m_full_object && m_snaps.empty() && !is_post_copyup_write_required())) { + m_copyup_enabled = false; + } +} + +template <typename I> +void AbstractObjectWriteRequest<I>::add_write_hint( + librados::ObjectWriteOperation *wr) { + I *image_ctx = this->m_ictx; + RWLock::RLocker snap_locker(image_ctx->snap_lock); + if (image_ctx->object_map == nullptr || !this->m_object_may_exist) { + ObjectRequest<I>::add_write_hint(*image_ctx, wr); + } +} + +template <typename I> +void AbstractObjectWriteRequest<I>::send() { + I *image_ctx = this->m_ictx; + ldout(image_ctx->cct, 20) << this->get_op_type() << " " << this->m_oid << " " + << this->m_object_off << "~" << this->m_object_len + << dendl; + { + RWLock::RLocker snap_lock(image_ctx->snap_lock); + if (image_ctx->object_map == nullptr) { + m_object_may_exist = true; + } else { + // should have been flushed prior to releasing lock + ceph_assert(image_ctx->exclusive_lock->is_lock_owner()); + m_object_may_exist = image_ctx->object_map->object_may_exist( + this->m_object_no); + } + } + + if (!m_object_may_exist && is_no_op_for_nonexistent_object()) { + ldout(image_ctx->cct, 20) << "skipping no-op on nonexistent object" + << dendl; + this->async_finish(0); + return; + } + + pre_write_object_map_update(); +} + +template <typename I> +void AbstractObjectWriteRequest<I>::pre_write_object_map_update() { + I *image_ctx = this->m_ictx; + + image_ctx->snap_lock.get_read(); + if (image_ctx->object_map == nullptr || !is_object_map_update_enabled()) { + image_ctx->snap_lock.put_read(); + write_object(); + return; + } + + if (!m_object_may_exist && m_copyup_enabled) { + // optimization: copyup required + image_ctx->snap_lock.put_read(); + copyup(); + return; + } + + uint8_t new_state = this->get_pre_write_object_map_state(); + ldout(image_ctx->cct, 20) << this->m_oid << " " << this->m_object_off + << "~" << this->m_object_len << dendl; + + image_ctx->object_map_lock.get_write(); + if (image_ctx->object_map->template aio_update< + AbstractObjectWriteRequest<I>, + &AbstractObjectWriteRequest<I>::handle_pre_write_object_map_update>( + CEPH_NOSNAP, this->m_object_no, new_state, {}, this->m_trace, false, + this)) { + image_ctx->object_map_lock.put_write(); + image_ctx->snap_lock.put_read(); + return; + } + + image_ctx->object_map_lock.put_write(); + image_ctx->snap_lock.put_read(); + write_object(); +} + +template <typename I> +void AbstractObjectWriteRequest<I>::handle_pre_write_object_map_update(int r) { + I *image_ctx = this->m_ictx; + ldout(image_ctx->cct, 20) << "r=" << r << dendl; + if (r < 0) { + lderr(image_ctx->cct) << "failed to update object map: " + << cpp_strerror(r) << dendl; + this->finish(r); + return; + } + + write_object(); +} + +template <typename I> +void AbstractObjectWriteRequest<I>::write_object() { + I *image_ctx = this->m_ictx; + ldout(image_ctx->cct, 20) << dendl; + + librados::ObjectWriteOperation write; + if (m_copyup_enabled) { + ldout(image_ctx->cct, 20) << "guarding write" << dendl; + if (m_guarding_migration_write) { + cls_client::assert_snapc_seq( + &write, m_snap_seq, cls::rbd::ASSERT_SNAPC_SEQ_LE_SNAPSET_SEQ); + } else { + write.assert_exists(); + } + } + + add_write_hint(&write); + add_write_ops(&write); + ceph_assert(write.size() != 0); + + librados::AioCompletion *rados_completion = util::create_rados_callback< + AbstractObjectWriteRequest<I>, + &AbstractObjectWriteRequest<I>::handle_write_object>(this); + int r = image_ctx->data_ctx.aio_operate( + this->m_oid, rados_completion, &write, m_snap_seq, m_snaps, + (this->m_trace.valid() ? this->m_trace.get_info() : nullptr)); + ceph_assert(r == 0); + rados_completion->release(); +} + +template <typename I> +void AbstractObjectWriteRequest<I>::handle_write_object(int r) { + I *image_ctx = this->m_ictx; + ldout(image_ctx->cct, 20) << "r=" << r << dendl; + + r = filter_write_result(r); + if (r == -ENOENT) { + if (m_copyup_enabled) { + copyup(); + return; + } + } else if (r == -ERANGE && m_guarding_migration_write) { + image_ctx->snap_lock.get_read(); + m_guarding_migration_write = !image_ctx->migration_info.empty(); + image_ctx->snap_lock.put_read(); + + if (m_guarding_migration_write) { + copyup(); + } else { + ldout(image_ctx->cct, 10) << "migration parent gone, restart io" << dendl; + compute_parent_info(); + write_object(); + } + return; + } else if (r == -EILSEQ) { + ldout(image_ctx->cct, 10) << "failed to write object" << dendl; + this->finish(r); + return; + } else if (r < 0) { + lderr(image_ctx->cct) << "failed to write object: " << cpp_strerror(r) + << dendl; + this->finish(r); + return; + } + + post_write_object_map_update(); +} + +template <typename I> +void AbstractObjectWriteRequest<I>::copyup() { + I *image_ctx = this->m_ictx; + ldout(image_ctx->cct, 20) << dendl; + + ceph_assert(!m_copyup_in_progress); + m_copyup_in_progress = true; + + image_ctx->copyup_list_lock.Lock(); + auto it = image_ctx->copyup_list.find(this->m_object_no); + if (it == image_ctx->copyup_list.end()) { + auto new_req = CopyupRequest<I>::create( + image_ctx, this->m_oid, this->m_object_no, + std::move(this->m_parent_extents), this->m_trace); + this->m_parent_extents.clear(); + + // make sure to wait on this CopyupRequest + new_req->append_request(this); + image_ctx->copyup_list[this->m_object_no] = new_req; + + image_ctx->copyup_list_lock.Unlock(); + new_req->send(); + } else { + it->second->append_request(this); + image_ctx->copyup_list_lock.Unlock(); + } +} + +template <typename I> +void AbstractObjectWriteRequest<I>::handle_copyup(int r) { + I *image_ctx = this->m_ictx; + ldout(image_ctx->cct, 20) << "r=" << r << dendl; + + ceph_assert(m_copyup_in_progress); + m_copyup_in_progress = false; + + if (r < 0 && r != -ERESTART) { + lderr(image_ctx->cct) << "failed to copyup object: " << cpp_strerror(r) + << dendl; + this->finish(r); + return; + } + + if (r == -ERESTART || is_post_copyup_write_required()) { + write_object(); + return; + } + + post_write_object_map_update(); +} + +template <typename I> +void AbstractObjectWriteRequest<I>::post_write_object_map_update() { + I *image_ctx = this->m_ictx; + + image_ctx->snap_lock.get_read(); + if (image_ctx->object_map == nullptr || !is_object_map_update_enabled() || + !is_non_existent_post_write_object_map_state()) { + image_ctx->snap_lock.put_read(); + this->finish(0); + return; + } + + ldout(image_ctx->cct, 20) << dendl; + + // should have been flushed prior to releasing lock + ceph_assert(image_ctx->exclusive_lock->is_lock_owner()); + image_ctx->object_map_lock.get_write(); + if (image_ctx->object_map->template aio_update< + AbstractObjectWriteRequest<I>, + &AbstractObjectWriteRequest<I>::handle_post_write_object_map_update>( + CEPH_NOSNAP, this->m_object_no, OBJECT_NONEXISTENT, OBJECT_PENDING, + this->m_trace, false, this)) { + image_ctx->object_map_lock.put_write(); + image_ctx->snap_lock.put_read(); + return; + } + + image_ctx->object_map_lock.put_write(); + image_ctx->snap_lock.put_read(); + this->finish(0); +} + +template <typename I> +void AbstractObjectWriteRequest<I>::handle_post_write_object_map_update(int r) { + I *image_ctx = this->m_ictx; + ldout(image_ctx->cct, 20) << "r=" << r << dendl; + if (r < 0) { + lderr(image_ctx->cct) << "failed to update object map: " + << cpp_strerror(r) << dendl; + this->finish(r); + return; + } + + this->finish(0); +} + +template <typename I> +void ObjectWriteRequest<I>::add_write_ops(librados::ObjectWriteOperation *wr) { + if (this->m_full_object) { + wr->write_full(m_write_data); + } else { + wr->write(this->m_object_off, m_write_data); + } + wr->set_op_flags2(m_op_flags); +} + +template <typename I> +void ObjectWriteSameRequest<I>::add_write_ops( + librados::ObjectWriteOperation *wr) { + wr->writesame(this->m_object_off, this->m_object_len, m_write_data); + wr->set_op_flags2(m_op_flags); +} + +template <typename I> +void ObjectCompareAndWriteRequest<I>::add_write_ops( + librados::ObjectWriteOperation *wr) { + wr->cmpext(this->m_object_off, m_cmp_bl, nullptr); + + if (this->m_full_object) { + wr->write_full(m_write_bl); + } else { + wr->write(this->m_object_off, m_write_bl); + } + wr->set_op_flags2(m_op_flags); +} + +template <typename I> +int ObjectCompareAndWriteRequest<I>::filter_write_result(int r) const { + if (r <= -MAX_ERRNO) { + I *image_ctx = this->m_ictx; + Extents image_extents; + + // object extent compare mismatch + uint64_t offset = -MAX_ERRNO - r; + Striper::extent_to_file(image_ctx->cct, &image_ctx->layout, + this->m_object_no, offset, this->m_object_len, + image_extents); + ceph_assert(image_extents.size() == 1); + + if (m_mismatch_offset) { + *m_mismatch_offset = image_extents[0].first; + } + r = -EILSEQ; + } + return r; +} + +} // namespace io +} // namespace librbd + +template class librbd::io::ObjectRequest<librbd::ImageCtx>; +template class librbd::io::ObjectReadRequest<librbd::ImageCtx>; +template class librbd::io::AbstractObjectWriteRequest<librbd::ImageCtx>; +template class librbd::io::ObjectWriteRequest<librbd::ImageCtx>; +template class librbd::io::ObjectDiscardRequest<librbd::ImageCtx>; +template class librbd::io::ObjectWriteSameRequest<librbd::ImageCtx>; +template class librbd::io::ObjectCompareAndWriteRequest<librbd::ImageCtx>; diff --git a/src/librbd/io/ObjectRequest.h b/src/librbd/io/ObjectRequest.h new file mode 100644 index 00000000..9452ec43 --- /dev/null +++ b/src/librbd/io/ObjectRequest.h @@ -0,0 +1,478 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_IO_OBJECT_REQUEST_H +#define CEPH_LIBRBD_IO_OBJECT_REQUEST_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include "include/rados/librados.hpp" +#include "common/snap_types.h" +#include "common/zipkin_trace.h" +#include "librbd/ObjectMap.h" +#include "librbd/io/Types.h" +#include <map> + +class Context; +class ObjectExtent; + +namespace librbd { + +struct ImageCtx; + +namespace io { + +struct AioCompletion; +template <typename> class CopyupRequest; + +/** + * This class represents an I/O operation to a single RBD data object. + * Its subclasses encapsulate logic for dealing with special cases + * for I/O due to layering. + */ +template <typename ImageCtxT = ImageCtx> +class ObjectRequest { +public: + static ObjectRequest* create_write( + ImageCtxT *ictx, const std::string &oid, uint64_t object_no, + uint64_t object_off, ceph::bufferlist&& data, const ::SnapContext &snapc, + int op_flags, const ZTracer::Trace &parent_trace, Context *completion); + static ObjectRequest* create_discard( + ImageCtxT *ictx, const std::string &oid, uint64_t object_no, + uint64_t object_off, uint64_t object_len, const ::SnapContext &snapc, + int discard_flags, const ZTracer::Trace &parent_trace, + Context *completion); + static ObjectRequest* create_write_same( + ImageCtxT *ictx, const std::string &oid, uint64_t object_no, + uint64_t object_off, uint64_t object_len, ceph::bufferlist&& data, + const ::SnapContext &snapc, int op_flags, + const ZTracer::Trace &parent_trace, Context *completion); + static ObjectRequest* create_compare_and_write( + ImageCtxT *ictx, const std::string &oid, uint64_t object_no, + uint64_t object_off, ceph::bufferlist&& cmp_data, + ceph::bufferlist&& write_data, const ::SnapContext &snapc, + uint64_t *mismatch_offset, int op_flags, + const ZTracer::Trace &parent_trace, Context *completion); + + ObjectRequest(ImageCtxT *ictx, const std::string &oid, + uint64_t objectno, uint64_t off, uint64_t len, + librados::snap_t snap_id, const char *trace_name, + const ZTracer::Trace &parent_trace, Context *completion); + virtual ~ObjectRequest() { + m_trace.event("finish"); + } + + static void add_write_hint(ImageCtxT& image_ctx, + librados::ObjectWriteOperation *wr); + + virtual void send() = 0; + + bool has_parent() const { + return m_has_parent; + } + + virtual const char *get_op_type() const = 0; + +protected: + bool compute_parent_extents(Extents *parent_extents, bool read_request); + + ImageCtxT *m_ictx; + std::string m_oid; + uint64_t m_object_no, m_object_off, m_object_len; + librados::snap_t m_snap_id; + Context *m_completion; + ZTracer::Trace m_trace; + + void async_finish(int r); + void finish(int r); + +private: + bool m_has_parent = false; +}; + +template <typename ImageCtxT = ImageCtx> +class ObjectReadRequest : public ObjectRequest<ImageCtxT> { +public: + typedef std::map<uint64_t, uint64_t> ExtentMap; + + static ObjectReadRequest* create(ImageCtxT *ictx, const std::string &oid, + uint64_t objectno, uint64_t offset, + uint64_t len, librados::snap_t snap_id, + int op_flags, + const ZTracer::Trace &parent_trace, + ceph::bufferlist* read_data, + ExtentMap* extent_map, Context *completion) { + return new ObjectReadRequest(ictx, oid, objectno, offset, len, + snap_id, op_flags, parent_trace, read_data, + extent_map, completion); + } + + ObjectReadRequest(ImageCtxT *ictx, const std::string &oid, + uint64_t objectno, uint64_t offset, uint64_t len, + librados::snap_t snap_id, int op_flags, + const ZTracer::Trace &parent_trace, + ceph::bufferlist* read_data, ExtentMap* extent_map, + Context *completion); + + void send() override; + + const char *get_op_type() const override { + return "read"; + } + +private: + /** + * @verbatim + * + * <start> + * | + * | + * v + * READ_OBJECT + * | + * v (skip if not needed) + * READ_PARENT + * | + * v (skip if not needed) + * COPYUP + * | + * v + * <finish> + * + * @endverbatim + */ + + int m_op_flags; + + ceph::bufferlist* m_read_data; + ExtentMap* m_extent_map; + + void read_object(); + void handle_read_object(int r); + + void read_parent(); + void handle_read_parent(int r); + + void copyup(); +}; + +template <typename ImageCtxT = ImageCtx> +class AbstractObjectWriteRequest : public ObjectRequest<ImageCtxT> { +public: + AbstractObjectWriteRequest(ImageCtxT *ictx, const std::string &oid, + uint64_t object_no, uint64_t object_off, + uint64_t len, const ::SnapContext &snapc, + const char *trace_name, + const ZTracer::Trace &parent_trace, + Context *completion); + + virtual bool is_empty_write_op() const { + return false; + } + + virtual uint8_t get_pre_write_object_map_state() const { + return OBJECT_EXISTS; + } + + virtual void add_copyup_ops(librados::ObjectWriteOperation *wr) { + add_write_ops(wr); + } + + void handle_copyup(int r); + + void send() override; + +protected: + bool m_full_object = false; + bool m_copyup_enabled = true; + + virtual bool is_no_op_for_nonexistent_object() const { + return false; + } + virtual bool is_object_map_update_enabled() const { + return true; + } + virtual bool is_post_copyup_write_required() const { + return false; + } + virtual bool is_non_existent_post_write_object_map_state() const { + return false; + } + + virtual void add_write_hint(librados::ObjectWriteOperation *wr); + virtual void add_write_ops(librados::ObjectWriteOperation *wr) = 0; + + virtual int filter_write_result(int r) const { + return r; + } + +private: + /** + * @verbatim + * + * <start> + * | + * v (no-op write request) + * DETECT_NO_OP . . . . . . . . . . . . . . . . . . . + * | . + * v (skip if not required/disabled) . + * PRE_UPDATE_OBJECT_MAP . + * | . . + * | . (child dne) . + * | . . . . . . . . . . + * | . . + * | (post-copyup write) . . + * | . . . . . . . . . . . . . . + * | . . . . + * v v . v . + * WRITE . . . . . . . . > COPYUP (if required) . + * | | . + * |/----------------------/ . + * | . + * v (skip if not required/disabled) . + * POST_UPDATE_OBJECT_MAP . + * | . + * v . + * <finish> < . . . . . . . . . . . . . . . . . . . . + * + * @endverbatim + */ + + uint64_t m_snap_seq; + std::vector<librados::snap_t> m_snaps; + + Extents m_parent_extents; + bool m_object_may_exist = false; + bool m_copyup_in_progress = false; + bool m_guarding_migration_write = false; + + void compute_parent_info(); + + void pre_write_object_map_update(); + void handle_pre_write_object_map_update(int r); + + void write_object(); + void handle_write_object(int r); + + void copyup(); + + void post_write_object_map_update(); + void handle_post_write_object_map_update(int r); + +}; + +template <typename ImageCtxT = ImageCtx> +class ObjectWriteRequest : public AbstractObjectWriteRequest<ImageCtxT> { +public: + ObjectWriteRequest(ImageCtxT *ictx, const std::string &oid, + uint64_t object_no, uint64_t object_off, + ceph::bufferlist&& data, const ::SnapContext &snapc, + int op_flags, const ZTracer::Trace &parent_trace, + Context *completion) + : AbstractObjectWriteRequest<ImageCtxT>(ictx, oid, object_no, object_off, + data.length(), snapc, "write", + parent_trace, completion), + m_write_data(std::move(data)), m_op_flags(op_flags) { + } + + bool is_empty_write_op() const override { + return (m_write_data.length() == 0); + } + + const char *get_op_type() const override { + return "write"; + } + +protected: + void add_write_ops(librados::ObjectWriteOperation *wr) override; + +private: + ceph::bufferlist m_write_data; + int m_op_flags; +}; + +template <typename ImageCtxT = ImageCtx> +class ObjectDiscardRequest : public AbstractObjectWriteRequest<ImageCtxT> { +public: + ObjectDiscardRequest(ImageCtxT *ictx, const std::string &oid, + uint64_t object_no, uint64_t object_off, + uint64_t object_len, const ::SnapContext &snapc, + int discard_flags, const ZTracer::Trace &parent_trace, + Context *completion) + : AbstractObjectWriteRequest<ImageCtxT>(ictx, oid, object_no, object_off, + object_len, snapc, "discard", + parent_trace, completion), + m_discard_flags(discard_flags) { + if (this->m_full_object) { + if ((m_discard_flags & OBJECT_DISCARD_FLAG_DISABLE_CLONE_REMOVE) != 0 && + this->has_parent()) { + if (!this->m_copyup_enabled) { + // need to hide the parent object instead of child object + m_discard_action = DISCARD_ACTION_REMOVE_TRUNCATE; + } else { + m_discard_action = DISCARD_ACTION_TRUNCATE; + } + this->m_object_len = 0; + } else { + m_discard_action = DISCARD_ACTION_REMOVE; + } + } else if (object_off + object_len == ictx->layout.object_size) { + m_discard_action = DISCARD_ACTION_TRUNCATE; + } else { + m_discard_action = DISCARD_ACTION_ZERO; + } + } + + const char* get_op_type() const override { + switch (m_discard_action) { + case DISCARD_ACTION_REMOVE: + return "remove"; + case DISCARD_ACTION_REMOVE_TRUNCATE: + return "remove (create+truncate)"; + case DISCARD_ACTION_TRUNCATE: + return "truncate"; + case DISCARD_ACTION_ZERO: + return "zero"; + } + ceph_abort(); + return nullptr; + } + + uint8_t get_pre_write_object_map_state() const override { + if (m_discard_action == DISCARD_ACTION_REMOVE) { + return OBJECT_PENDING; + } + return OBJECT_EXISTS; + } + +protected: + bool is_no_op_for_nonexistent_object() const override { + return (!this->has_parent()); + } + bool is_object_map_update_enabled() const override { + return ( + (m_discard_flags & OBJECT_DISCARD_FLAG_DISABLE_OBJECT_MAP_UPDATE) == 0); + } + bool is_non_existent_post_write_object_map_state() const override { + return (m_discard_action == DISCARD_ACTION_REMOVE); + } + + void add_write_hint(librados::ObjectWriteOperation *wr) override { + // no hint for discard + } + + void add_write_ops(librados::ObjectWriteOperation *wr) override { + switch (m_discard_action) { + case DISCARD_ACTION_REMOVE: + wr->remove(); + break; + case DISCARD_ACTION_REMOVE_TRUNCATE: + wr->create(false); + // fall through + case DISCARD_ACTION_TRUNCATE: + wr->truncate(this->m_object_off); + break; + case DISCARD_ACTION_ZERO: + wr->zero(this->m_object_off, this->m_object_len); + break; + default: + ceph_abort(); + break; + } + } + +private: + enum DiscardAction { + DISCARD_ACTION_REMOVE, + DISCARD_ACTION_REMOVE_TRUNCATE, + DISCARD_ACTION_TRUNCATE, + DISCARD_ACTION_ZERO + }; + + DiscardAction m_discard_action; + int m_discard_flags; + +}; + +template <typename ImageCtxT = ImageCtx> +class ObjectWriteSameRequest : public AbstractObjectWriteRequest<ImageCtxT> { +public: + ObjectWriteSameRequest(ImageCtxT *ictx, const std::string &oid, + uint64_t object_no, uint64_t object_off, + uint64_t object_len, ceph::bufferlist&& data, + const ::SnapContext &snapc, int op_flags, + const ZTracer::Trace &parent_trace, + Context *completion) + : AbstractObjectWriteRequest<ImageCtxT>(ictx, oid, object_no, object_off, + object_len, snapc, "writesame", + parent_trace, completion), + m_write_data(std::move(data)), m_op_flags(op_flags) { + } + + const char *get_op_type() const override { + return "writesame"; + } + +protected: + void add_write_ops(librados::ObjectWriteOperation *wr) override; + +private: + ceph::bufferlist m_write_data; + int m_op_flags; +}; + +template <typename ImageCtxT = ImageCtx> +class ObjectCompareAndWriteRequest : public AbstractObjectWriteRequest<ImageCtxT> { +public: + ObjectCompareAndWriteRequest(ImageCtxT *ictx, const std::string &oid, + uint64_t object_no, uint64_t object_off, + ceph::bufferlist&& cmp_bl, + ceph::bufferlist&& write_bl, + const ::SnapContext &snapc, + uint64_t *mismatch_offset, int op_flags, + const ZTracer::Trace &parent_trace, + Context *completion) + : AbstractObjectWriteRequest<ImageCtxT>(ictx, oid, object_no, object_off, + cmp_bl.length(), snapc, + "compare_and_write", parent_trace, + completion), + m_cmp_bl(std::move(cmp_bl)), m_write_bl(std::move(write_bl)), + m_mismatch_offset(mismatch_offset), m_op_flags(op_flags) { + } + + const char *get_op_type() const override { + return "compare_and_write"; + } + + void add_copyup_ops(librados::ObjectWriteOperation *wr) override { + // no-op on copyup + } + +protected: + virtual bool is_post_copyup_write_required() const { + return true; + } + + void add_write_ops(librados::ObjectWriteOperation *wr) override; + + int filter_write_result(int r) const override; + +private: + ceph::bufferlist m_cmp_bl; + ceph::bufferlist m_write_bl; + uint64_t *m_mismatch_offset; + int m_op_flags; +}; + +} // namespace io +} // namespace librbd + +extern template class librbd::io::ObjectRequest<librbd::ImageCtx>; +extern template class librbd::io::ObjectReadRequest<librbd::ImageCtx>; +extern template class librbd::io::AbstractObjectWriteRequest<librbd::ImageCtx>; +extern template class librbd::io::ObjectWriteRequest<librbd::ImageCtx>; +extern template class librbd::io::ObjectDiscardRequest<librbd::ImageCtx>; +extern template class librbd::io::ObjectWriteSameRequest<librbd::ImageCtx>; +extern template class librbd::io::ObjectCompareAndWriteRequest<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_IO_OBJECT_REQUEST_H diff --git a/src/librbd/io/ReadResult.cc b/src/librbd/io/ReadResult.cc new file mode 100644 index 00000000..c24d8b4a --- /dev/null +++ b/src/librbd/io/ReadResult.cc @@ -0,0 +1,170 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/io/ReadResult.h" +#include "include/buffer.h" +#include "common/dout.h" +#include "librbd/io/AioCompletion.h" +#include <boost/variant/apply_visitor.hpp> +#include <boost/variant/static_visitor.hpp> + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::io::ReadResult: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace io { + +struct ReadResult::SetClipLengthVisitor : public boost::static_visitor<void> { + size_t length; + + explicit SetClipLengthVisitor(size_t length) : length(length) { + } + + void operator()(Linear &linear) const { + ceph_assert(length <= linear.buf_len); + linear.buf_len = length; + } + + template <typename T> + void operator()(T &t) const { + } +}; + +struct ReadResult::AssembleResultVisitor : public boost::static_visitor<void> { + CephContext *cct; + Striper::StripedReadResult &destriper; + + AssembleResultVisitor(CephContext *cct, Striper::StripedReadResult &destriper) + : cct(cct), destriper(destriper) { + } + + void operator()(Empty &empty) const { + ldout(cct, 20) << "dropping read result" << dendl; + } + + void operator()(Linear &linear) const { + ldout(cct, 20) << "copying resulting bytes to " + << reinterpret_cast<void*>(linear.buf) << dendl; + destriper.assemble_result(cct, linear.buf, linear.buf_len); + } + + void operator()(Vector &vector) const { + bufferlist bl; + destriper.assemble_result(cct, bl, true); + + ldout(cct, 20) << "copying resulting " << bl.length() << " bytes to iovec " + << reinterpret_cast<const void*>(vector.iov) << dendl; + + bufferlist::iterator it = bl.begin(); + size_t length = bl.length(); + size_t offset = 0; + int idx = 0; + for (; offset < length && idx < vector.iov_count; idx++) { + size_t len = std::min(vector.iov[idx].iov_len, length - offset); + it.copy(len, static_cast<char *>(vector.iov[idx].iov_base)); + offset += len; + } + ceph_assert(offset == bl.length()); + } + + void operator()(Bufferlist &bufferlist) const { + bufferlist.bl->clear(); + destriper.assemble_result(cct, *bufferlist.bl, true); + + ldout(cct, 20) << "moved resulting " << bufferlist.bl->length() << " " + << "bytes to bl " << reinterpret_cast<void*>(bufferlist.bl) + << dendl; + } +}; + +ReadResult::C_ImageReadRequest::C_ImageReadRequest( + AioCompletion *aio_completion, const Extents image_extents) + : aio_completion(aio_completion), image_extents(image_extents) { + aio_completion->add_request(); +} + +void ReadResult::C_ImageReadRequest::finish(int r) { + CephContext *cct = aio_completion->ictx->cct; + ldout(cct, 10) << "C_ImageReadRequest: r=" << r + << dendl; + if (r >= 0) { + size_t length = 0; + for (auto &image_extent : image_extents) { + length += image_extent.second; + } + ceph_assert(length == bl.length()); + + aio_completion->lock.Lock(); + aio_completion->read_result.m_destriper.add_partial_result( + cct, bl, image_extents); + aio_completion->lock.Unlock(); + r = length; + } + + aio_completion->complete_request(r); +} + +ReadResult::C_ObjectReadRequest::C_ObjectReadRequest( + AioCompletion *aio_completion, uint64_t object_off, uint64_t object_len, + Extents&& buffer_extents) + : aio_completion(aio_completion), object_off(object_off), + object_len(object_len), buffer_extents(std::move(buffer_extents)) { + aio_completion->add_request(); +} + +void ReadResult::C_ObjectReadRequest::finish(int r) { + CephContext *cct = aio_completion->ictx->cct; + ldout(cct, 10) << "C_ObjectReadRequest: r=" << r + << dendl; + + if (r == -ENOENT) { + r = 0; + } + if (r >= 0) { + ldout(cct, 10) << " got " << extent_map + << " for " << buffer_extents + << " bl " << bl.length() << dendl; + // handle the case where a sparse-read wasn't issued + if (extent_map.empty()) { + extent_map[object_off] = bl.length(); + } + + aio_completion->lock.Lock(); + aio_completion->read_result.m_destriper.add_partial_sparse_result( + cct, bl, extent_map, object_off, buffer_extents); + aio_completion->lock.Unlock(); + + r = object_len; + } + + aio_completion->complete_request(r); +} + +ReadResult::ReadResult() : m_buffer(Empty()) { +} + +ReadResult::ReadResult(char *buf, size_t buf_len) + : m_buffer(Linear(buf, buf_len)) { +} + +ReadResult::ReadResult(const struct iovec *iov, int iov_count) + : m_buffer(Vector(iov, iov_count)) { +} + +ReadResult::ReadResult(ceph::bufferlist *bl) + : m_buffer(Bufferlist(bl)) { +} + +void ReadResult::set_clip_length(size_t length) { + boost::apply_visitor(SetClipLengthVisitor(length), m_buffer); +} + +void ReadResult::assemble_result(CephContext *cct) { + boost::apply_visitor(AssembleResultVisitor(cct, m_destriper), m_buffer); +} + +} // namespace io +} // namespace librbd + diff --git a/src/librbd/io/ReadResult.h b/src/librbd/io/ReadResult.h new file mode 100644 index 00000000..6fb5e4a4 --- /dev/null +++ b/src/librbd/io/ReadResult.h @@ -0,0 +1,106 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_IO_READ_RESULT_H +#define CEPH_LIBRBD_IO_READ_RESULT_H + +#include "include/int_types.h" +#include "include/buffer_fwd.h" +#include "include/Context.h" +#include "librbd/io/Types.h" +#include "osdc/Striper.h" +#include <sys/uio.h> +#include <boost/variant/variant.hpp> + +struct CephContext; + +namespace librbd { + +struct ImageCtx; + +namespace io { + +struct AioCompletion; +template <typename> struct ObjectReadRequest; + +class ReadResult { +public: + struct C_ImageReadRequest : public Context { + AioCompletion *aio_completion; + Extents image_extents; + bufferlist bl; + + C_ImageReadRequest(AioCompletion *aio_completion, + const Extents image_extents); + + void finish(int r) override; + }; + + struct C_ObjectReadRequest : public Context { + AioCompletion *aio_completion; + uint64_t object_off; + uint64_t object_len; + Extents buffer_extents; + + bufferlist bl; + ExtentMap extent_map; + + C_ObjectReadRequest(AioCompletion *aio_completion, uint64_t object_off, + uint64_t object_len, Extents&& buffer_extents); + + void finish(int r) override; + }; + + ReadResult(); + ReadResult(char *buf, size_t buf_len); + ReadResult(const struct iovec *iov, int iov_count); + ReadResult(ceph::bufferlist *bl); + + void set_clip_length(size_t length); + void assemble_result(CephContext *cct); + +private: + struct Empty { + }; + + struct Linear { + char *buf; + size_t buf_len; + + Linear(char *buf, size_t buf_len) : buf(buf), buf_len(buf_len) { + } + }; + + struct Vector { + const struct iovec *iov; + int iov_count; + + Vector(const struct iovec *iov, int iov_count) + : iov(iov), iov_count(iov_count) { + } + }; + + struct Bufferlist { + ceph::bufferlist *bl; + + Bufferlist(ceph::bufferlist *bl) : bl(bl) { + } + }; + + typedef boost::variant<Empty, + Linear, + Vector, + Bufferlist> Buffer; + struct SetClipLengthVisitor; + struct AssembleResultVisitor; + + Buffer m_buffer; + Striper::StripedReadResult m_destriper; + +}; + +} // namespace io +} // namespace librbd + +#endif // CEPH_LIBRBD_IO_READ_RESULT_H + diff --git a/src/librbd/io/Types.h b/src/librbd/io/Types.h new file mode 100644 index 00000000..6bd42eac --- /dev/null +++ b/src/librbd/io/Types.h @@ -0,0 +1,83 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_IO_TYPES_H +#define CEPH_LIBRBD_IO_TYPES_H + +#include "include/int_types.h" +#include <map> +#include <vector> + +namespace librbd { +namespace io { + +#define RBD_QOS_IOPS_THROTTLE 1 << 0 +#define RBD_QOS_BPS_THROTTLE 1 << 1 +#define RBD_QOS_READ_IOPS_THROTTLE 1 << 2 +#define RBD_QOS_WRITE_IOPS_THROTTLE 1 << 3 +#define RBD_QOS_READ_BPS_THROTTLE 1 << 4 +#define RBD_QOS_WRITE_BPS_THROTTLE 1 << 5 + +#define RBD_QOS_BPS_MASK (RBD_QOS_BPS_THROTTLE | RBD_QOS_READ_BPS_THROTTLE | RBD_QOS_WRITE_BPS_THROTTLE) +#define RBD_QOS_IOPS_MASK (RBD_QOS_IOPS_THROTTLE | RBD_QOS_READ_IOPS_THROTTLE | RBD_QOS_WRITE_IOPS_THROTTLE) +#define RBD_QOS_READ_MASK (RBD_QOS_READ_BPS_THROTTLE | RBD_QOS_READ_IOPS_THROTTLE) +#define RBD_QOS_WRITE_MASK (RBD_QOS_WRITE_BPS_THROTTLE | RBD_QOS_WRITE_IOPS_THROTTLE) + +#define RBD_QOS_MASK (RBD_QOS_BPS_MASK | RBD_QOS_IOPS_MASK) + +typedef enum { + AIO_TYPE_NONE = 0, + AIO_TYPE_GENERIC, + AIO_TYPE_OPEN, + AIO_TYPE_CLOSE, + AIO_TYPE_READ, + AIO_TYPE_WRITE, + AIO_TYPE_DISCARD, + AIO_TYPE_FLUSH, + AIO_TYPE_WRITESAME, + AIO_TYPE_COMPARE_AND_WRITE, +} aio_type_t; + +enum FlushSource { + FLUSH_SOURCE_USER, + FLUSH_SOURCE_INTERNAL, + FLUSH_SOURCE_SHUTDOWN +}; + +enum Direction { + DIRECTION_READ, + DIRECTION_WRITE, + DIRECTION_BOTH +}; + +enum DispatchResult { + DISPATCH_RESULT_INVALID, + DISPATCH_RESULT_CONTINUE, + DISPATCH_RESULT_COMPLETE +}; + +enum ObjectDispatchLayer { + OBJECT_DISPATCH_LAYER_NONE = 0, + OBJECT_DISPATCH_LAYER_CACHE, + OBJECT_DISPATCH_LAYER_JOURNAL, + OBJECT_DISPATCH_LAYER_CORE, + OBJECT_DISPATCH_LAYER_LAST +}; + +enum { + OBJECT_DISCARD_FLAG_DISABLE_CLONE_REMOVE = 1UL << 0, + OBJECT_DISCARD_FLAG_DISABLE_OBJECT_MAP_UPDATE = 1UL << 1 +}; + +enum { + OBJECT_DISPATCH_FLAG_FLUSH = 1UL << 0, + OBJECT_DISPATCH_FLAG_WILL_RETRY_ON_ERROR = 1UL << 1 +}; + +typedef std::vector<std::pair<uint64_t, uint64_t> > Extents; +typedef std::map<uint64_t, uint64_t> ExtentMap; + +} // namespace io +} // namespace librbd + +#endif // CEPH_LIBRBD_IO_TYPES_H diff --git a/src/librbd/io/Utils.cc b/src/librbd/io/Utils.cc new file mode 100644 index 00000000..1b50561a --- /dev/null +++ b/src/librbd/io/Utils.cc @@ -0,0 +1,57 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/io/Utils.h" +#include "include/buffer.h" +#include "osd/osd_types.h" + +namespace librbd { +namespace io { +namespace util { + +bool assemble_write_same_extent( + const ObjectExtent &object_extent, const ceph::bufferlist& data, + ceph::bufferlist *ws_data, bool force_write) { + size_t data_len = data.length(); + + if (!force_write) { + bool may_writesame = true; + for (auto& q : object_extent.buffer_extents) { + if (!(q.first % data_len == 0 && q.second % data_len == 0)) { + may_writesame = false; + break; + } + } + + if (may_writesame) { + ws_data->append(data); + return true; + } + } + + for (auto& q : object_extent.buffer_extents) { + bufferlist sub_bl; + uint64_t sub_off = q.first % data_len; + uint64_t sub_len = data_len - sub_off; + uint64_t extent_left = q.second; + while (extent_left >= sub_len) { + sub_bl.substr_of(data, sub_off, sub_len); + ws_data->claim_append(sub_bl); + extent_left -= sub_len; + if (sub_off) { + sub_off = 0; + sub_len = data_len; + } + } + if (extent_left) { + sub_bl.substr_of(data, sub_off, extent_left); + ws_data->claim_append(sub_bl); + } + } + return false; +} + +} // namespace util +} // namespace io +} // namespace librbd + diff --git a/src/librbd/io/Utils.h b/src/librbd/io/Utils.h new file mode 100644 index 00000000..c1f373d4 --- /dev/null +++ b/src/librbd/io/Utils.h @@ -0,0 +1,26 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_IO_UTILS_H +#define CEPH_LIBRBD_IO_UTILS_H + +#include "include/int_types.h" +#include "include/buffer_fwd.h" +#include <map> + +class ObjectExtent; + +namespace librbd { +namespace io { +namespace util { + +bool assemble_write_same_extent(const ObjectExtent &object_extent, + const ceph::bufferlist& data, + ceph::bufferlist *ws_data, + bool force_write); + +} // namespace util +} // namespace io +} // namespace librbd + +#endif // CEPH_LIBRBD_IO_UTILS_H |