summaryrefslogtreecommitdiffstats
path: root/src/librbd/io/QosImageDispatch.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/librbd/io/QosImageDispatch.cc305
1 files changed, 305 insertions, 0 deletions
diff --git a/src/librbd/io/QosImageDispatch.cc b/src/librbd/io/QosImageDispatch.cc
new file mode 100644
index 000000000..9ca88ac19
--- /dev/null
+++ b/src/librbd/io/QosImageDispatch.cc
@@ -0,0 +1,305 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/QosImageDispatch.h"
+#include "common/dout.h"
+#include "librbd/AsioEngine.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/io/FlushTracker.h"
+#include <map>
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::io::QosImageDispatch: " << this << " " \
+ << __func__ << ": "
+
+namespace librbd {
+namespace io {
+
+namespace {
+
+uint64_t get_extent_length(const Extents& extents) {
+ uint64_t length = 0;
+ for (auto& extent : extents) {
+ length += extent.second;
+ }
+ return length;
+}
+
+uint64_t calculate_tokens(bool read_op, uint64_t extent_length, uint64_t flag) {
+ if (read_op && ((flag & IMAGE_DISPATCH_FLAG_QOS_WRITE_MASK) != 0)) {
+ return 0;
+ } else if (!read_op && ((flag & IMAGE_DISPATCH_FLAG_QOS_READ_MASK) != 0)) {
+ return 0;
+ }
+
+ return (((flag & IMAGE_DISPATCH_FLAG_QOS_BPS_MASK) != 0) ? extent_length : 1);
+}
+
+static std::map<uint64_t, std::string> throttle_flags = {
+ {IMAGE_DISPATCH_FLAG_QOS_IOPS_THROTTLE, "rbd_qos_iops_throttle" },
+ {IMAGE_DISPATCH_FLAG_QOS_BPS_THROTTLE, "rbd_qos_bps_throttle" },
+ {IMAGE_DISPATCH_FLAG_QOS_READ_IOPS_THROTTLE, "rbd_qos_read_iops_throttle" },
+ {IMAGE_DISPATCH_FLAG_QOS_WRITE_IOPS_THROTTLE, "rbd_qos_write_iops_throttle" },
+ {IMAGE_DISPATCH_FLAG_QOS_READ_BPS_THROTTLE, "rbd_qos_read_bps_throttle" },
+ {IMAGE_DISPATCH_FLAG_QOS_WRITE_BPS_THROTTLE, "rbd_qos_write_bps_throttle" }
+};
+
+} // anonymous namespace
+
+template <typename I>
+QosImageDispatch<I>::QosImageDispatch(I* image_ctx)
+ : m_image_ctx(image_ctx), m_flush_tracker(new FlushTracker<I>(image_ctx)) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 5) << "ictx=" << image_ctx << dendl;
+
+ SafeTimer *timer;
+ ceph::mutex *timer_lock;
+ ImageCtx::get_timer_instance(cct, &timer, &timer_lock);
+ for (auto flag : throttle_flags) {
+ m_throttles.push_back(make_pair(
+ flag.first,
+ new TokenBucketThrottle(cct, flag.second, 0, 0, timer, timer_lock)));
+ }
+}
+
+template <typename I>
+QosImageDispatch<I>::~QosImageDispatch() {
+ for (auto t : m_throttles) {
+ delete t.second;
+ }
+ delete m_flush_tracker;
+}
+
+template <typename I>
+void QosImageDispatch<I>::shut_down(Context* on_finish) {
+ m_flush_tracker->shut_down();
+ on_finish->complete(0);
+}
+
+template <typename I>
+void QosImageDispatch<I>::apply_qos_schedule_tick_min(uint64_t tick) {
+ for (auto pair : m_throttles) {
+ pair.second->set_schedule_tick_min(tick);
+ }
+}
+
+template <typename I>
+void QosImageDispatch<I>::apply_qos_limit(uint64_t flag, uint64_t limit,
+ uint64_t burst, uint64_t burst_seconds) {
+ auto cct = m_image_ctx->cct;
+ TokenBucketThrottle *throttle = nullptr;
+ for (auto pair : m_throttles) {
+ if (flag == pair.first) {
+ throttle = pair.second;
+ break;
+ }
+ }
+ ceph_assert(throttle != nullptr);
+
+ int r = throttle->set_limit(limit, burst, burst_seconds);
+ if (r < 0) {
+ lderr(cct) << throttle->get_name() << ": invalid qos parameter: "
+ << "burst(" << burst << ") is less than "
+ << "limit(" << limit << ")" << dendl;
+ // if apply failed, we should at least make sure the limit works.
+ throttle->set_limit(limit, 0, 1);
+ }
+
+ if (limit) {
+ m_qos_enabled_flag |= flag;
+ } else {
+ m_qos_enabled_flag &= ~flag;
+ }
+}
+
+template <typename I>
+bool QosImageDispatch<I>::read(
+ AioCompletion* aio_comp, Extents &&image_extents, ReadResult &&read_result,
+ IOContext io_context, int op_flags, int read_flags,
+ const ZTracer::Trace &parent_trace, uint64_t tid,
+ std::atomic<uint32_t>* image_dispatch_flags,
+ DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "tid=" << tid << ", image_extents=" << image_extents
+ << dendl;
+
+ if (needs_throttle(true, image_extents, tid, image_dispatch_flags,
+ dispatch_result, on_finish, on_dispatched)) {
+ return true;
+ }
+
+ return false;
+}
+
+template <typename I>
+bool QosImageDispatch<I>::write(
+ AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl,
+ IOContext io_context, int op_flags, const ZTracer::Trace &parent_trace,
+ uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags,
+ DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "tid=" << tid << ", image_extents=" << image_extents
+ << dendl;
+
+ if (needs_throttle(false, image_extents, tid, image_dispatch_flags,
+ dispatch_result, on_finish, on_dispatched)) {
+ return true;
+ }
+
+ return false;
+}
+
+template <typename I>
+bool QosImageDispatch<I>::discard(
+ AioCompletion* aio_comp, Extents &&image_extents,
+ uint32_t discard_granularity_bytes, IOContext io_context,
+ const ZTracer::Trace &parent_trace, uint64_t tid,
+ std::atomic<uint32_t>* image_dispatch_flags,
+ DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "tid=" << tid << ", image_extents=" << image_extents
+ << dendl;
+
+ if (needs_throttle(false, image_extents, tid, image_dispatch_flags,
+ dispatch_result, on_finish, on_dispatched)) {
+ return true;
+ }
+
+ return false;
+}
+
+template <typename I>
+bool QosImageDispatch<I>::write_same(
+ AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl,
+ IOContext io_context, int op_flags, const ZTracer::Trace &parent_trace,
+ uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags,
+ DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "tid=" << tid << ", image_extents=" << image_extents
+ << dendl;
+
+ if (needs_throttle(false, image_extents, tid, image_dispatch_flags,
+ dispatch_result, on_finish, on_dispatched)) {
+ return true;
+ }
+
+ return false;
+}
+
+template <typename I>
+bool QosImageDispatch<I>::compare_and_write(
+ AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&cmp_bl,
+ bufferlist &&bl, uint64_t *mismatch_offset, IOContext io_context,
+ int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+ std::atomic<uint32_t>* image_dispatch_flags,
+ DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "tid=" << tid << ", image_extents=" << image_extents
+ << dendl;
+
+ if (needs_throttle(false, image_extents, tid, image_dispatch_flags,
+ dispatch_result, on_finish, on_dispatched)) {
+ return true;
+ }
+
+ return false;
+}
+
+template <typename I>
+bool QosImageDispatch<I>::flush(
+ AioCompletion* aio_comp, FlushSource flush_source,
+ const ZTracer::Trace &parent_trace, uint64_t tid,
+ std::atomic<uint32_t>* image_dispatch_flags,
+ DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "tid=" << tid << dendl;
+
+ *dispatch_result = DISPATCH_RESULT_CONTINUE;
+ m_flush_tracker->flush(on_dispatched);
+ return true;
+}
+
+template <typename I>
+void QosImageDispatch<I>::handle_finished(int r, uint64_t tid) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "tid=" << tid << dendl;
+
+ m_flush_tracker->finish_io(tid);
+}
+
+template <typename I>
+bool QosImageDispatch<I>::set_throttle_flag(
+ std::atomic<uint32_t>* image_dispatch_flags, uint32_t flag) {
+ uint32_t expected = image_dispatch_flags->load();
+ uint32_t desired;
+ do {
+ desired = expected | flag;
+ } while (!image_dispatch_flags->compare_exchange_weak(expected, desired));
+
+ return ((desired & IMAGE_DISPATCH_FLAG_QOS_MASK) ==
+ IMAGE_DISPATCH_FLAG_QOS_MASK);
+}
+
+template <typename I>
+bool QosImageDispatch<I>::needs_throttle(
+ bool read_op, const Extents& image_extents, uint64_t tid,
+ std::atomic<uint32_t>* image_dispatch_flags,
+ DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ auto extent_length = get_extent_length(image_extents);
+ bool all_qos_flags_set = false;
+
+ if (!read_op) {
+ m_flush_tracker->start_io(tid);
+ *on_finish = new LambdaContext([this, tid, on_finish=*on_finish](int r) {
+ handle_finished(r, tid);
+ on_finish->complete(r);
+ });
+ }
+ *dispatch_result = DISPATCH_RESULT_CONTINUE;
+
+ auto qos_enabled_flag = m_qos_enabled_flag;
+ for (auto [flag, throttle] : m_throttles) {
+ if ((qos_enabled_flag & flag) == 0) {
+ all_qos_flags_set = set_throttle_flag(image_dispatch_flags, flag);
+ continue;
+ }
+
+ auto tokens = calculate_tokens(read_op, extent_length, flag);
+ if (tokens > 0 &&
+ throttle->get(tokens, this, &QosImageDispatch<I>::handle_throttle_ready,
+ Tag{image_dispatch_flags, on_dispatched}, flag)) {
+ ldout(cct, 15) << "on_dispatched=" << on_dispatched << ", "
+ << "flag=" << flag << dendl;
+ all_qos_flags_set = false;
+ } else {
+ all_qos_flags_set = set_throttle_flag(image_dispatch_flags, flag);
+ }
+ }
+ return !all_qos_flags_set;
+}
+
+template <typename I>
+void QosImageDispatch<I>::handle_throttle_ready(Tag&& tag, uint64_t flag) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 15) << "on_dispatched=" << tag.on_dispatched << ", "
+ << "flag=" << flag << dendl;
+
+ if (set_throttle_flag(tag.image_dispatch_flags, flag)) {
+ // timer_lock is held -- so dispatch from outside the timer thread
+ m_image_ctx->asio_engine->post(tag.on_dispatched, 0);
+ }
+}
+
+} // namespace io
+} // namespace librbd
+
+template class librbd::io::QosImageDispatch<librbd::ImageCtx>;