diff options
Diffstat (limited to '')
-rw-r--r-- | src/librbd/io/ImageRequestWQ.cc | 1043 |
1 files changed, 1043 insertions, 0 deletions
diff --git a/src/librbd/io/ImageRequestWQ.cc b/src/librbd/io/ImageRequestWQ.cc new file mode 100644 index 00000000..7bdaf2f2 --- /dev/null +++ b/src/librbd/io/ImageRequestWQ.cc @@ -0,0 +1,1043 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/io/ImageRequestWQ.h" +#include "common/errno.h" +#include "common/zipkin_trace.h" +#include "common/Cond.h" +#include "librbd/ExclusiveLock.h" +#include "librbd/ImageCtx.h" +#include "librbd/ImageState.h" +#include "librbd/ImageWatcher.h" +#include "librbd/internal.h" +#include "librbd/Utils.h" +#include "librbd/exclusive_lock/Policy.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/io/ImageRequest.h" +#include "librbd/io/ImageDispatchSpec.h" +#include "common/EventTrace.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace io { + +namespace { + +template <typename I> +void flush_image(I& image_ctx, Context* on_finish) { + auto aio_comp = librbd::io::AioCompletion::create_and_start( + on_finish, util::get_image_ctx(&image_ctx), librbd::io::AIO_TYPE_FLUSH); + auto req = librbd::io::ImageDispatchSpec<I>::create_flush_request( + image_ctx, aio_comp, librbd::io::FLUSH_SOURCE_INTERNAL, {}); + req->send(); + delete req; +} + +} // anonymous namespace + +template <typename I> +struct ImageRequestWQ<I>::C_AcquireLock : public Context { + ImageRequestWQ *work_queue; + ImageDispatchSpec<I> *image_request; + + C_AcquireLock(ImageRequestWQ *work_queue, ImageDispatchSpec<I> *image_request) + : work_queue(work_queue), image_request(image_request) { + } + + void finish(int r) override { + work_queue->handle_acquire_lock(r, image_request); + } +}; + +template <typename I> +struct ImageRequestWQ<I>::C_BlockedWrites : public Context { + ImageRequestWQ *work_queue; + explicit C_BlockedWrites(ImageRequestWQ *_work_queue) + : work_queue(_work_queue) { + } + + void finish(int r) override { + work_queue->handle_blocked_writes(r); + } +}; + +template <typename I> +struct ImageRequestWQ<I>::C_RefreshFinish : public Context { + ImageRequestWQ *work_queue; + ImageDispatchSpec<I> *image_request; + + C_RefreshFinish(ImageRequestWQ *work_queue, + ImageDispatchSpec<I> *image_request) + : work_queue(work_queue), image_request(image_request) { + } + void finish(int r) override { + work_queue->handle_refreshed(r, image_request); + } +}; + +static std::map<uint64_t, std::string> throttle_flags = { + { RBD_QOS_IOPS_THROTTLE, "rbd_qos_iops_throttle" }, + { RBD_QOS_BPS_THROTTLE, "rbd_qos_bps_throttle" }, + { RBD_QOS_READ_IOPS_THROTTLE, "rbd_qos_read_iops_throttle" }, + { RBD_QOS_WRITE_IOPS_THROTTLE, "rbd_qos_write_iops_throttle" }, + { RBD_QOS_READ_BPS_THROTTLE, "rbd_qos_read_bps_throttle" }, + { RBD_QOS_WRITE_BPS_THROTTLE, "rbd_qos_write_bps_throttle" } +}; + +template <typename I> +ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name, + time_t ti, ThreadPool *tp) + : ThreadPool::PointerWQ<ImageDispatchSpec<I> >(name, ti, 0, tp), + m_image_ctx(*image_ctx), + m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 5) << "ictx=" << image_ctx << dendl; + + SafeTimer *timer; + Mutex *timer_lock; + ImageCtx::get_timer_instance(cct, &timer, &timer_lock); + + for (auto flag : throttle_flags) { + m_throttles.push_back(make_pair( + flag.first, + new TokenBucketThrottle(cct, flag.second, 0, 0, timer, timer_lock))); + } + + this->register_work_queue(); +} + +template <typename I> +ImageRequestWQ<I>::~ImageRequestWQ() { + for (auto t : m_throttles) { + delete t.second; + } +} + +template <typename I> +ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len, + ReadResult &&read_result, int op_flags) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " + << "len = " << len << dendl; + + C_SaferCond cond; + AioCompletion *c = AioCompletion::create(&cond); + aio_read(c, off, len, std::move(read_result), op_flags, false); + return cond.wait(); +} + +template <typename I> +ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len, + bufferlist &&bl, int op_flags) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " + << "len = " << len << dendl; + + m_image_ctx.snap_lock.get_read(); + int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); + m_image_ctx.snap_lock.put_read(); + if (r < 0) { + lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; + return r; + } + + C_SaferCond cond; + AioCompletion *c = AioCompletion::create(&cond); + aio_write(c, off, len, std::move(bl), op_flags, false); + + r = cond.wait(); + if (r < 0) { + return r; + } + return len; +} + +template <typename I> +ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len, + uint32_t discard_granularity_bytes) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " + << "len = " << len << dendl; + + m_image_ctx.snap_lock.get_read(); + int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); + m_image_ctx.snap_lock.put_read(); + if (r < 0) { + lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; + return r; + } + + C_SaferCond cond; + AioCompletion *c = AioCompletion::create(&cond); + aio_discard(c, off, len, discard_granularity_bytes, false); + + r = cond.wait(); + if (r < 0) { + return r; + } + return len; +} + +template <typename I> +ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len, + bufferlist &&bl, int op_flags) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " + << "len = " << len << ", data_len " << bl.length() << dendl; + + m_image_ctx.snap_lock.get_read(); + int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); + m_image_ctx.snap_lock.put_read(); + if (r < 0) { + lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; + return r; + } + + C_SaferCond cond; + AioCompletion *c = AioCompletion::create(&cond); + aio_writesame(c, off, len, std::move(bl), op_flags, false); + + r = cond.wait(); + if (r < 0) { + return r; + } + return len; +} + +template <typename I> +ssize_t ImageRequestWQ<I>::write_zeroes(uint64_t off, uint64_t len, + int zero_flags, int op_flags) { + auto cct = m_image_ctx.cct; + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " + << "len = " << len << dendl; + + m_image_ctx.snap_lock.get_read(); + int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); + m_image_ctx.snap_lock.put_read(); + if (r < 0) { + lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; + return r; + } + + C_SaferCond ctx; + auto aio_comp = io::AioCompletion::create(&ctx); + aio_write_zeroes(aio_comp, off, len, zero_flags, op_flags, false); + + r = ctx.wait(); + if (r < 0) { + return r; + } + return len; +} + +template <typename I> +ssize_t ImageRequestWQ<I>::compare_and_write(uint64_t off, uint64_t len, + bufferlist &&cmp_bl, + bufferlist &&bl, + uint64_t *mismatch_off, + int op_flags){ + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "compare_and_write ictx=" << &m_image_ctx << ", off=" + << off << ", " << "len = " << len << dendl; + + m_image_ctx.snap_lock.get_read(); + int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); + m_image_ctx.snap_lock.put_read(); + if (r < 0) { + lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; + return r; + } + + C_SaferCond cond; + AioCompletion *c = AioCompletion::create(&cond); + aio_compare_and_write(c, off, len, std::move(cmp_bl), std::move(bl), + mismatch_off, op_flags, false); + + r = cond.wait(); + if (r < 0) { + return r; + } + + return len; +} + +template <typename I> +int ImageRequestWQ<I>::flush() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl; + + C_SaferCond cond; + AioCompletion *c = AioCompletion::create(&cond); + aio_flush(c, false); + + int r = cond.wait(); + if (r < 0) { + return r; + } + + return 0; +} + +template <typename I> +void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len, + ReadResult &&read_result, int op_flags, + bool native_async) { + CephContext *cct = m_image_ctx.cct; + FUNCTRACE(cct); + ZTracer::Trace trace; + if (m_image_ctx.blkin_trace_all) { + trace.init("wq: read", &m_image_ctx.trace_endpoint); + trace.event("start"); + } + + c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ); + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "completion=" << c << ", off=" << off << ", " + << "len=" << len << ", " << "flags=" << op_flags << dendl; + + if (native_async && m_image_ctx.event_socket.is_valid()) { + c->set_event_notify(true); + } + + if (!start_in_flight_io(c)) { + return; + } + + // if journaling is enabled -- we need to replay the journal because + // it might contain an uncommitted write + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() || + require_lock_on_read()) { + queue(ImageDispatchSpec<I>::create_read_request( + m_image_ctx, c, {{off, len}}, std::move(read_result), op_flags, + trace)); + } else { + c->start_op(); + ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}}, + std::move(read_result), op_flags, trace); + finish_in_flight_io(); + } + trace.event("finish"); +} + +template <typename I> +void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len, + bufferlist &&bl, int op_flags, + bool native_async) { + CephContext *cct = m_image_ctx.cct; + FUNCTRACE(cct); + ZTracer::Trace trace; + if (m_image_ctx.blkin_trace_all) { + trace.init("wq: write", &m_image_ctx.trace_endpoint); + trace.event("init"); + } + + c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE); + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "completion=" << c << ", off=" << off << ", " + << "len=" << len << ", flags=" << op_flags << dendl; + + if (native_async && m_image_ctx.event_socket.is_valid()) { + c->set_event_notify(true); + } + + if (!start_in_flight_io(c)) { + return; + } + + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + if (m_image_ctx.non_blocking_aio || writes_blocked()) { + queue(ImageDispatchSpec<I>::create_write_request( + m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace)); + } else { + c->start_op(); + ImageRequest<I>::aio_write(&m_image_ctx, c, {{off, len}}, + std::move(bl), op_flags, trace); + finish_in_flight_io(); + } + trace.event("finish"); +} + +template <typename I> +void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off, + uint64_t len, + uint32_t discard_granularity_bytes, + bool native_async) { + CephContext *cct = m_image_ctx.cct; + FUNCTRACE(cct); + ZTracer::Trace trace; + if (m_image_ctx.blkin_trace_all) { + trace.init("wq: discard", &m_image_ctx.trace_endpoint); + trace.event("init"); + } + + c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD); + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "completion=" << c << ", off=" << off << ", len=" << len + << dendl; + + if (native_async && m_image_ctx.event_socket.is_valid()) { + c->set_event_notify(true); + } + + if (!start_in_flight_io(c)) { + return; + } + + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + if (m_image_ctx.non_blocking_aio || writes_blocked()) { + queue(ImageDispatchSpec<I>::create_discard_request( + m_image_ctx, c, off, len, discard_granularity_bytes, trace)); + } else { + c->start_op(); + ImageRequest<I>::aio_discard(&m_image_ctx, c, {{off, len}}, + discard_granularity_bytes, trace); + finish_in_flight_io(); + } + trace.event("finish"); +} + +template <typename I> +void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) { + CephContext *cct = m_image_ctx.cct; + FUNCTRACE(cct); + ZTracer::Trace trace; + if (m_image_ctx.blkin_trace_all) { + trace.init("wq: flush", &m_image_ctx.trace_endpoint); + trace.event("init"); + } + + c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH); + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "completion=" << c << dendl; + + if (native_async && m_image_ctx.event_socket.is_valid()) { + c->set_event_notify(true); + } + + if (!start_in_flight_io(c)) { + return; + } + + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) { + queue(ImageDispatchSpec<I>::create_flush_request( + m_image_ctx, c, FLUSH_SOURCE_USER, trace)); + } else { + c->start_op(); + ImageRequest<I>::aio_flush(&m_image_ctx, c, FLUSH_SOURCE_USER, trace); + finish_in_flight_io(); + } + trace.event("finish"); +} + +template <typename I> +void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off, + uint64_t len, bufferlist &&bl, + int op_flags, bool native_async) { + CephContext *cct = m_image_ctx.cct; + FUNCTRACE(cct); + ZTracer::Trace trace; + if (m_image_ctx.blkin_trace_all) { + trace.init("wq: writesame", &m_image_ctx.trace_endpoint); + trace.event("init"); + } + + c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME); + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "completion=" << c << ", off=" << off << ", " + << "len=" << len << ", data_len = " << bl.length() << ", " + << "flags=" << op_flags << dendl; + + if (native_async && m_image_ctx.event_socket.is_valid()) { + c->set_event_notify(true); + } + + if (!start_in_flight_io(c)) { + return; + } + + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + if (m_image_ctx.non_blocking_aio || writes_blocked()) { + queue(ImageDispatchSpec<I>::create_write_same_request( + m_image_ctx, c, off, len, std::move(bl), op_flags, trace)); + } else { + c->start_op(); + ImageRequest<I>::aio_writesame(&m_image_ctx, c, {{off, len}}, std::move(bl), + op_flags, trace); + finish_in_flight_io(); + } + trace.event("finish"); +} + + +template <typename I> +void ImageRequestWQ<I>::aio_write_zeroes(io::AioCompletion *aio_comp, + uint64_t off, uint64_t len, + int zero_flags, int op_flags, + bool native_async) { + auto cct = m_image_ctx.cct; + FUNCTRACE(cct); + ZTracer::Trace trace; + if (m_image_ctx.blkin_trace_all) { + trace.init("io: write_zeroes", &m_image_ctx.trace_endpoint); + trace.event("init"); + } + + aio_comp->init_time(util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_DISCARD); + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "completion=" << aio_comp << ", off=" << off << ", " + << "len=" << len << dendl; + + if (native_async && m_image_ctx.event_socket.is_valid()) { + aio_comp->set_event_notify(true); + } + + // validate the supported flags + if (zero_flags != 0U) { + aio_comp->fail(-EINVAL); + return; + } + + if (!start_in_flight_io(aio_comp)) { + return; + } + + // enable partial discard (zeroing) of objects + uint32_t discard_granularity_bytes = 0; + + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + if (m_image_ctx.non_blocking_aio || writes_blocked()) { + queue(ImageDispatchSpec<I>::create_discard_request( + m_image_ctx, aio_comp, off, len, discard_granularity_bytes, trace)); + } else { + aio_comp->start_op(); + ImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, {{off, len}}, + discard_granularity_bytes, trace); + finish_in_flight_io(); + } + trace.event("finish"); +} + +template <typename I> +void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c, + uint64_t off, uint64_t len, + bufferlist &&cmp_bl, + bufferlist &&bl, + uint64_t *mismatch_off, + int op_flags, bool native_async) { + CephContext *cct = m_image_ctx.cct; + FUNCTRACE(cct); + ZTracer::Trace trace; + if (m_image_ctx.blkin_trace_all) { + trace.init("wq: compare_and_write", &m_image_ctx.trace_endpoint); + trace.event("init"); + } + + c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_COMPARE_AND_WRITE); + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "completion=" << c << ", off=" << off << ", " + << "len=" << len << dendl; + + if (native_async && m_image_ctx.event_socket.is_valid()) { + c->set_event_notify(true); + } + + if (!start_in_flight_io(c)) { + return; + } + + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + if (m_image_ctx.non_blocking_aio || writes_blocked()) { + queue(ImageDispatchSpec<I>::create_compare_and_write_request( + m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl), + mismatch_off, op_flags, trace)); + } else { + c->start_op(); + ImageRequest<I>::aio_compare_and_write(&m_image_ctx, c, {{off, len}}, + std::move(cmp_bl), std::move(bl), + mismatch_off, op_flags, trace); + finish_in_flight_io(); + } + trace.event("finish"); +} + +template <typename I> +void ImageRequestWQ<I>::shut_down(Context *on_shutdown) { + ceph_assert(m_image_ctx.owner_lock.is_locked()); + + { + RWLock::WLocker locker(m_lock); + ceph_assert(!m_shutdown); + m_shutdown = true; + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load() + << dendl; + if (m_in_flight_ios > 0) { + m_on_shutdown = on_shutdown; + return; + } + } + + // ensure that all in-flight IO is flushed + flush_image(m_image_ctx, on_shutdown); +} + +template <typename I> +int ImageRequestWQ<I>::block_writes() { + C_SaferCond cond_ctx; + block_writes(&cond_ctx); + return cond_ctx.wait(); +} + +template <typename I> +void ImageRequestWQ<I>::block_writes(Context *on_blocked) { + ceph_assert(m_image_ctx.owner_lock.is_locked()); + CephContext *cct = m_image_ctx.cct; + + { + RWLock::WLocker locker(m_lock); + ++m_write_blockers; + ldout(cct, 5) << &m_image_ctx << ", " << "num=" + << m_write_blockers << dendl; + if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) { + m_write_blocker_contexts.push_back(on_blocked); + return; + } + } + + // ensure that all in-flight IO is flushed + flush_image(m_image_ctx, on_blocked); +} + +template <typename I> +void ImageRequestWQ<I>::unblock_writes() { + CephContext *cct = m_image_ctx.cct; + + bool wake_up = false; + Contexts waiter_contexts; + { + RWLock::WLocker locker(m_lock); + ceph_assert(m_write_blockers > 0); + --m_write_blockers; + + ldout(cct, 5) << &m_image_ctx << ", " << "num=" + << m_write_blockers << dendl; + if (m_write_blockers == 0) { + wake_up = true; + std::swap(waiter_contexts, m_unblocked_write_waiter_contexts); + } + } + + if (wake_up) { + for (auto ctx : waiter_contexts) { + ctx->complete(0); + } + this->signal(); + } +} + +template <typename I> +void ImageRequestWQ<I>::wait_on_writes_unblocked(Context *on_unblocked) { + ceph_assert(m_image_ctx.owner_lock.is_locked()); + CephContext *cct = m_image_ctx.cct; + + { + RWLock::WLocker locker(m_lock); + ldout(cct, 20) << &m_image_ctx << ", " << "write_blockers=" + << m_write_blockers << dendl; + if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) { + m_unblocked_write_waiter_contexts.push_back(on_unblocked); + return; + } + } + + on_unblocked->complete(0); +} + +template <typename I> +void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << dendl; + + bool wake_up = false; + { + RWLock::WLocker locker(m_lock); + switch (direction) { + case DIRECTION_READ: + wake_up = (enabled != m_require_lock_on_read); + m_require_lock_on_read = enabled; + break; + case DIRECTION_WRITE: + wake_up = (enabled != m_require_lock_on_write); + m_require_lock_on_write = enabled; + break; + case DIRECTION_BOTH: + wake_up = (enabled != m_require_lock_on_read || + enabled != m_require_lock_on_write); + m_require_lock_on_read = enabled; + m_require_lock_on_write = enabled; + break; + } + } + + // wake up the thread pool whenever the state changes so that + // we can re-request the lock if required + if (wake_up) { + this->signal(); + } +} + +template <typename I> +void ImageRequestWQ<I>::apply_qos_schedule_tick_min(uint64_t tick){ + for (auto pair : m_throttles) { + pair.second->set_schedule_tick_min(tick); + } +} + +template <typename I> +void ImageRequestWQ<I>::apply_qos_limit(const uint64_t flag, + uint64_t limit, + uint64_t burst) { + CephContext *cct = m_image_ctx.cct; + TokenBucketThrottle *throttle = nullptr; + for (auto pair : m_throttles) { + if (flag == pair.first) { + throttle = pair.second; + break; + } + } + ceph_assert(throttle != nullptr); + + int r = throttle->set_limit(limit, burst); + if (r < 0) { + lderr(cct) << throttle->get_name() << ": invalid qos parameter: " + << "burst(" << burst << ") is less than " + << "limit(" << limit << ")" << dendl; + // if apply failed, we should at least make sure the limit works. + throttle->set_limit(limit, 0); + } + + if (limit) + m_qos_enabled_flag |= flag; + else + m_qos_enabled_flag &= ~flag; +} + +template <typename I> +void ImageRequestWQ<I>::handle_throttle_ready(int r, ImageDispatchSpec<I> *item, + uint64_t flag) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 15) << "r=" << r << ", " << "req=" << item << dendl; + + std::lock_guard pool_locker{this->get_pool_lock()}; + ceph_assert(m_io_throttled.load() > 0); + item->set_throttled(flag); + if (item->were_all_throttled()) { + this->requeue_back(pool_locker, item); + --m_io_throttled; + this->signal(pool_locker); + } +} + +template <typename I> +bool ImageRequestWQ<I>::needs_throttle(ImageDispatchSpec<I> *item) { + uint64_t tokens = 0; + uint64_t flag = 0; + bool blocked = false; + TokenBucketThrottle* throttle = nullptr; + + for (auto t : m_throttles) { + flag = t.first; + if (item->was_throttled(flag)) + continue; + + if (!(m_qos_enabled_flag & flag)) { + item->set_throttled(flag); + continue; + } + + throttle = t.second; + if (item->tokens_requested(flag, &tokens) && + throttle->get<ImageRequestWQ<I>, ImageDispatchSpec<I>, + &ImageRequestWQ<I>::handle_throttle_ready>( + tokens, this, item, flag)) { + blocked = true; + } else { + item->set_throttled(flag); + } + } + return blocked; +} + +template <typename I> +void *ImageRequestWQ<I>::_void_dequeue() { + CephContext *cct = m_image_ctx.cct; + ImageDispatchSpec<I> *peek_item = this->front(); + + // no queued IO requests or all IO is blocked/stalled + if (peek_item == nullptr || m_io_blockers.load() > 0) { + return nullptr; + } + + if (needs_throttle(peek_item)) { + ldout(cct, 15) << "throttling IO " << peek_item << dendl; + + ++m_io_throttled; + // dequeue the throttled item + ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue(); + return nullptr; + } + + bool lock_required; + bool refresh_required = m_image_ctx.state->is_refresh_required(); + { + RWLock::RLocker locker(m_lock); + bool write_op = peek_item->is_write_op(); + lock_required = is_lock_required(write_op); + if (write_op) { + if (!lock_required && m_write_blockers > 0) { + // missing lock is not the write blocker + return nullptr; + } + + if (!lock_required && !refresh_required) { + // completed ops will requeue the IO -- don't count it as in-progress + m_in_flight_writes++; + } + } + } + + auto item = reinterpret_cast<ImageDispatchSpec<I> *>( + ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue()); + ceph_assert(peek_item == item); + + if (lock_required) { + this->get_pool_lock().unlock(); + m_image_ctx.owner_lock.get_read(); + if (m_image_ctx.exclusive_lock != nullptr) { + ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl; + if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) { + lderr(cct) << "op requires exclusive lock" << dendl; + fail_in_flight_io(m_image_ctx.exclusive_lock->get_unlocked_op_error(), + item); + + // wake up the IO since we won't be returning a request to process + this->signal(); + } else { + // stall IO until the acquire completes + ++m_io_blockers; + m_image_ctx.exclusive_lock->acquire_lock(new C_AcquireLock(this, item)); + } + } else { + // raced with the exclusive lock being disabled + lock_required = false; + } + m_image_ctx.owner_lock.put_read(); + this->get_pool_lock().lock(); + + if (lock_required) { + return nullptr; + } + } + + if (refresh_required) { + ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl; + + // stall IO until the refresh completes + ++m_io_blockers; + + this->get_pool_lock().unlock(); + m_image_ctx.state->refresh(new C_RefreshFinish(this, item)); + this->get_pool_lock().lock(); + return nullptr; + } + + item->start_op(); + return item; +} + +template <typename I> +void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "req=" << req << dendl; + + req->send(); + + finish_queued_io(req); + if (req->is_write_op()) { + finish_in_flight_write(); + } + delete req; + + finish_in_flight_io(); +} + +template <typename I> +void ImageRequestWQ<I>::finish_queued_io(ImageDispatchSpec<I> *req) { + RWLock::RLocker locker(m_lock); + if (req->is_write_op()) { + ceph_assert(m_queued_writes > 0); + m_queued_writes--; + } else { + ceph_assert(m_queued_reads > 0); + m_queued_reads--; + } +} + +template <typename I> +void ImageRequestWQ<I>::finish_in_flight_write() { + bool writes_blocked = false; + { + RWLock::RLocker locker(m_lock); + ceph_assert(m_in_flight_writes > 0); + if (--m_in_flight_writes == 0 && + !m_write_blocker_contexts.empty()) { + writes_blocked = true; + } + } + + if (writes_blocked) { + flush_image(m_image_ctx, new C_BlockedWrites(this)); + } +} + +template <typename I> +int ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) { + RWLock::RLocker locker(m_lock); + + if (m_shutdown) { + CephContext *cct = m_image_ctx.cct; + lderr(cct) << "IO received on closed image" << dendl; + + c->get(); + c->fail(-ESHUTDOWN); + return false; + } + + if (!m_image_ctx.data_ctx.is_valid()) { + CephContext *cct = m_image_ctx.cct; + lderr(cct) << "missing data pool" << dendl; + + c->get(); + c->fail(-ENODEV); + return false; + } + + m_in_flight_ios++; + return true; +} + +template <typename I> +void ImageRequestWQ<I>::finish_in_flight_io() { + Context *on_shutdown; + { + RWLock::RLocker locker(m_lock); + if (--m_in_flight_ios > 0 || !m_shutdown) { + return; + } + on_shutdown = m_on_shutdown; + } + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 5) << "completing shut down" << dendl; + + ceph_assert(on_shutdown != nullptr); + flush_image(m_image_ctx, on_shutdown); +} + +template <typename I> +void ImageRequestWQ<I>::fail_in_flight_io( + int r, ImageDispatchSpec<I> *req) { + this->process_finish(); + req->fail(r); + finish_queued_io(req); + delete req; + finish_in_flight_io(); +} + +template <typename I> +bool ImageRequestWQ<I>::is_lock_required(bool write_op) const { + ceph_assert(m_lock.is_locked()); + return ((write_op && m_require_lock_on_write) || + (!write_op && m_require_lock_on_read)); +} + +template <typename I> +void ImageRequestWQ<I>::queue(ImageDispatchSpec<I> *req) { + ceph_assert(m_image_ctx.owner_lock.is_locked()); + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " + << "req=" << req << dendl; + + if (req->is_write_op()) { + m_queued_writes++; + } else { + m_queued_reads++; + } + + ThreadPool::PointerWQ<ImageDispatchSpec<I> >::queue(req); +} + +template <typename I> +void ImageRequestWQ<I>::handle_acquire_lock( + int r, ImageDispatchSpec<I> *req) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl; + + if (r < 0) { + fail_in_flight_io(r, req); + } else { + // since IO was stalled for acquire -- original IO order is preserved + // if we requeue this op for work queue processing + this->requeue_front(req); + } + + ceph_assert(m_io_blockers.load() > 0); + --m_io_blockers; + this->signal(); +} + +template <typename I> +void ImageRequestWQ<I>::handle_refreshed( + int r, ImageDispatchSpec<I> *req) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", " + << "req=" << req << dendl; + if (r < 0) { + fail_in_flight_io(r, req); + } else { + // since IO was stalled for refresh -- original IO order is preserved + // if we requeue this op for work queue processing + this->requeue_front(req); + } + + ceph_assert(m_io_blockers.load() > 0); + --m_io_blockers; + this->signal(); +} + +template <typename I> +void ImageRequestWQ<I>::handle_blocked_writes(int r) { + Contexts contexts; + { + RWLock::WLocker locker(m_lock); + contexts.swap(m_write_blocker_contexts); + } + + for (auto ctx : contexts) { + ctx->complete(0); + } +} + +template class librbd::io::ImageRequestWQ<librbd::ImageCtx>; + +} // namespace io +} // namespace librbd |