summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/util/rate_limiter.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/util/rate_limiter.cc378
1 files changed, 378 insertions, 0 deletions
diff --git a/src/rocksdb/util/rate_limiter.cc b/src/rocksdb/util/rate_limiter.cc
new file mode 100644
index 000000000..6bbcabfae
--- /dev/null
+++ b/src/rocksdb/util/rate_limiter.cc
@@ -0,0 +1,378 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "util/rate_limiter.h"
+
+#include <algorithm>
+
+#include "monitoring/statistics.h"
+#include "port/port.h"
+#include "rocksdb/system_clock.h"
+#include "test_util/sync_point.h"
+#include "util/aligned_buffer.h"
+
+namespace ROCKSDB_NAMESPACE {
+size_t RateLimiter::RequestToken(size_t bytes, size_t alignment,
+ Env::IOPriority io_priority, Statistics* stats,
+ RateLimiter::OpType op_type) {
+ if (io_priority < Env::IO_TOTAL && IsRateLimited(op_type)) {
+ bytes = std::min(bytes, static_cast<size_t>(GetSingleBurstBytes()));
+
+ if (alignment > 0) {
+ // Here we may actually require more than burst and block
+ // as we can not write/read less than one page at a time on direct I/O
+ // thus we do not want to be strictly constrained by burst
+ bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
+ }
+ Request(bytes, io_priority, stats, op_type);
+ }
+ return bytes;
+}
+
+// Pending request
+struct GenericRateLimiter::Req {
+ explicit Req(int64_t _bytes, port::Mutex* _mu)
+ : request_bytes(_bytes), bytes(_bytes), cv(_mu), granted(false) {}
+ int64_t request_bytes;
+ int64_t bytes;
+ port::CondVar cv;
+ bool granted;
+};
+
+GenericRateLimiter::GenericRateLimiter(
+ int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness,
+ RateLimiter::Mode mode, const std::shared_ptr<SystemClock>& clock,
+ bool auto_tuned)
+ : RateLimiter(mode),
+ refill_period_us_(refill_period_us),
+ rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2
+ : rate_bytes_per_sec),
+ refill_bytes_per_period_(
+ CalculateRefillBytesPerPeriodLocked(rate_bytes_per_sec_)),
+ clock_(clock),
+ stop_(false),
+ exit_cv_(&request_mutex_),
+ requests_to_wait_(0),
+ available_bytes_(0),
+ next_refill_us_(NowMicrosMonotonicLocked()),
+ fairness_(fairness > 100 ? 100 : fairness),
+ rnd_((uint32_t)time(nullptr)),
+ wait_until_refill_pending_(false),
+ auto_tuned_(auto_tuned),
+ num_drains_(0),
+ max_bytes_per_sec_(rate_bytes_per_sec),
+ tuned_time_(NowMicrosMonotonicLocked()) {
+ for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+ total_requests_[i] = 0;
+ total_bytes_through_[i] = 0;
+ }
+}
+
+GenericRateLimiter::~GenericRateLimiter() {
+ MutexLock g(&request_mutex_);
+ stop_ = true;
+ std::deque<Req*>::size_type queues_size_sum = 0;
+ for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+ queues_size_sum += queue_[i].size();
+ }
+ requests_to_wait_ = static_cast<int32_t>(queues_size_sum);
+
+ for (int i = Env::IO_TOTAL - 1; i >= Env::IO_LOW; --i) {
+ std::deque<Req*> queue = queue_[i];
+ for (auto& r : queue) {
+ r->cv.Signal();
+ }
+ }
+
+ while (requests_to_wait_ > 0) {
+ exit_cv_.Wait();
+ }
+}
+
+// This API allows user to dynamically change rate limiter's bytes per second.
+void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
+ MutexLock g(&request_mutex_);
+ SetBytesPerSecondLocked(bytes_per_second);
+}
+
+void GenericRateLimiter::SetBytesPerSecondLocked(int64_t bytes_per_second) {
+ assert(bytes_per_second > 0);
+ rate_bytes_per_sec_.store(bytes_per_second, std::memory_order_relaxed);
+ refill_bytes_per_period_.store(
+ CalculateRefillBytesPerPeriodLocked(bytes_per_second),
+ std::memory_order_relaxed);
+}
+
+void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
+ Statistics* stats) {
+ assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed));
+ bytes = std::max(static_cast<int64_t>(0), bytes);
+ TEST_SYNC_POINT("GenericRateLimiter::Request");
+ TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1",
+ &rate_bytes_per_sec_);
+ MutexLock g(&request_mutex_);
+
+ if (auto_tuned_) {
+ static const int kRefillsPerTune = 100;
+ std::chrono::microseconds now(NowMicrosMonotonicLocked());
+ if (now - tuned_time_ >=
+ kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) {
+ Status s = TuneLocked();
+ s.PermitUncheckedError(); //**TODO: What to do on error?
+ }
+ }
+
+ if (stop_) {
+ // It is now in the clean-up of ~GenericRateLimiter().
+ // Therefore any new incoming request will exit from here
+ // and not get satiesfied.
+ return;
+ }
+
+ ++total_requests_[pri];
+
+ if (available_bytes_ >= bytes) {
+ // Refill thread assigns quota and notifies requests waiting on
+ // the queue under mutex. So if we get here, that means nobody
+ // is waiting?
+ available_bytes_ -= bytes;
+ total_bytes_through_[pri] += bytes;
+ return;
+ }
+
+ // Request cannot be satisfied at this moment, enqueue
+ Req r(bytes, &request_mutex_);
+ queue_[pri].push_back(&r);
+ TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostEnqueueRequest",
+ &request_mutex_);
+ // A thread representing a queued request coordinates with other such threads.
+ // There are two main duties.
+ //
+ // (1) Waiting for the next refill time.
+ // (2) Refilling the bytes and granting requests.
+ do {
+ int64_t time_until_refill_us = next_refill_us_ - NowMicrosMonotonicLocked();
+ if (time_until_refill_us > 0) {
+ if (wait_until_refill_pending_) {
+ // Somebody is performing (1). Trust we'll be woken up when our request
+ // is granted or we are needed for future duties.
+ r.cv.Wait();
+ } else {
+ // Whichever thread reaches here first performs duty (1) as described
+ // above.
+ int64_t wait_until = clock_->NowMicros() + time_until_refill_us;
+ RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
+ ++num_drains_;
+ wait_until_refill_pending_ = true;
+ r.cv.TimedWait(wait_until);
+ TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostTimedWait",
+ &time_until_refill_us);
+ wait_until_refill_pending_ = false;
+ }
+ } else {
+ // Whichever thread reaches here first performs duty (2) as described
+ // above.
+ RefillBytesAndGrantRequestsLocked();
+ if (r.granted) {
+ // If there is any remaining requests, make sure there exists at least
+ // one candidate is awake for future duties by signaling a front request
+ // of a queue.
+ for (int i = Env::IO_TOTAL - 1; i >= Env::IO_LOW; --i) {
+ std::deque<Req*> queue = queue_[i];
+ if (!queue.empty()) {
+ queue.front()->cv.Signal();
+ break;
+ }
+ }
+ }
+ }
+ // Invariant: non-granted request is always in one queue, and granted
+ // request is always in zero queues.
+#ifndef NDEBUG
+ int num_found = 0;
+ for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+ if (std::find(queue_[i].begin(), queue_[i].end(), &r) !=
+ queue_[i].end()) {
+ ++num_found;
+ }
+ }
+ if (r.granted) {
+ assert(num_found == 0);
+ } else {
+ assert(num_found == 1);
+ }
+#endif // NDEBUG
+ } while (!stop_ && !r.granted);
+
+ if (stop_) {
+ // It is now in the clean-up of ~GenericRateLimiter().
+ // Therefore any woken-up request will have come out of the loop and then
+ // exit here. It might or might not have been satisfied.
+ --requests_to_wait_;
+ exit_cv_.Signal();
+ }
+}
+
+std::vector<Env::IOPriority>
+GenericRateLimiter::GeneratePriorityIterationOrderLocked() {
+ std::vector<Env::IOPriority> pri_iteration_order(Env::IO_TOTAL /* 4 */);
+ // We make Env::IO_USER a superior priority by always iterating its queue
+ // first
+ pri_iteration_order[0] = Env::IO_USER;
+
+ bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(fairness_);
+ TEST_SYNC_POINT_CALLBACK(
+ "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+ "PostRandomOneInFairnessForHighPri",
+ &high_pri_iterated_after_mid_low_pri);
+ bool mid_pri_itereated_after_low_pri = rnd_.OneIn(fairness_);
+ TEST_SYNC_POINT_CALLBACK(
+ "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+ "PostRandomOneInFairnessForMidPri",
+ &mid_pri_itereated_after_low_pri);
+
+ if (high_pri_iterated_after_mid_low_pri) {
+ pri_iteration_order[3] = Env::IO_HIGH;
+ pri_iteration_order[2] =
+ mid_pri_itereated_after_low_pri ? Env::IO_MID : Env::IO_LOW;
+ pri_iteration_order[1] =
+ (pri_iteration_order[2] == Env::IO_MID) ? Env::IO_LOW : Env::IO_MID;
+ } else {
+ pri_iteration_order[1] = Env::IO_HIGH;
+ pri_iteration_order[3] =
+ mid_pri_itereated_after_low_pri ? Env::IO_MID : Env::IO_LOW;
+ pri_iteration_order[2] =
+ (pri_iteration_order[3] == Env::IO_MID) ? Env::IO_LOW : Env::IO_MID;
+ }
+
+ TEST_SYNC_POINT_CALLBACK(
+ "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+ "PreReturnPriIterationOrder",
+ &pri_iteration_order);
+ return pri_iteration_order;
+}
+
+void GenericRateLimiter::RefillBytesAndGrantRequestsLocked() {
+ TEST_SYNC_POINT_CALLBACK(
+ "GenericRateLimiter::RefillBytesAndGrantRequestsLocked", &request_mutex_);
+ next_refill_us_ = NowMicrosMonotonicLocked() + refill_period_us_;
+ // Carry over the left over quota from the last period
+ auto refill_bytes_per_period =
+ refill_bytes_per_period_.load(std::memory_order_relaxed);
+ if (available_bytes_ < refill_bytes_per_period) {
+ available_bytes_ += refill_bytes_per_period;
+ }
+
+ std::vector<Env::IOPriority> pri_iteration_order =
+ GeneratePriorityIterationOrderLocked();
+
+ for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+ assert(!pri_iteration_order.empty());
+ Env::IOPriority current_pri = pri_iteration_order[i];
+ auto* queue = &queue_[current_pri];
+ while (!queue->empty()) {
+ auto* next_req = queue->front();
+ if (available_bytes_ < next_req->request_bytes) {
+ // Grant partial request_bytes to avoid starvation of requests
+ // that become asking for more bytes than available_bytes_
+ // due to dynamically reduced rate limiter's bytes_per_second that
+ // leads to reduced refill_bytes_per_period hence available_bytes_
+ next_req->request_bytes -= available_bytes_;
+ available_bytes_ = 0;
+ break;
+ }
+ available_bytes_ -= next_req->request_bytes;
+ next_req->request_bytes = 0;
+ total_bytes_through_[current_pri] += next_req->bytes;
+ queue->pop_front();
+
+ next_req->granted = true;
+ // Quota granted, signal the thread to exit
+ next_req->cv.Signal();
+ }
+ }
+}
+
+int64_t GenericRateLimiter::CalculateRefillBytesPerPeriodLocked(
+ int64_t rate_bytes_per_sec) {
+ if (std::numeric_limits<int64_t>::max() / rate_bytes_per_sec <
+ refill_period_us_) {
+ // Avoid unexpected result in the overflow case. The result now is still
+ // inaccurate but is a number that is large enough.
+ return std::numeric_limits<int64_t>::max() / 1000000;
+ } else {
+ return rate_bytes_per_sec * refill_period_us_ / 1000000;
+ }
+}
+
+Status GenericRateLimiter::TuneLocked() {
+ const int kLowWatermarkPct = 50;
+ const int kHighWatermarkPct = 90;
+ const int kAdjustFactorPct = 5;
+ // computed rate limit will be in
+ // `[max_bytes_per_sec_ / kAllowedRangeFactor, max_bytes_per_sec_]`.
+ const int kAllowedRangeFactor = 20;
+
+ std::chrono::microseconds prev_tuned_time = tuned_time_;
+ tuned_time_ = std::chrono::microseconds(NowMicrosMonotonicLocked());
+
+ int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time +
+ std::chrono::microseconds(refill_period_us_) -
+ std::chrono::microseconds(1)) /
+ std::chrono::microseconds(refill_period_us_);
+ // We tune every kRefillsPerTune intervals, so the overflow and division-by-
+ // zero conditions should never happen.
+ assert(num_drains_ <= std::numeric_limits<int64_t>::max() / 100);
+ assert(elapsed_intervals > 0);
+ int64_t drained_pct = num_drains_ * 100 / elapsed_intervals;
+
+ int64_t prev_bytes_per_sec = GetBytesPerSecond();
+ int64_t new_bytes_per_sec;
+ if (drained_pct == 0) {
+ new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor;
+ } else if (drained_pct < kLowWatermarkPct) {
+ // sanitize to prevent overflow
+ int64_t sanitized_prev_bytes_per_sec =
+ std::min(prev_bytes_per_sec, std::numeric_limits<int64_t>::max() / 100);
+ new_bytes_per_sec =
+ std::max(max_bytes_per_sec_ / kAllowedRangeFactor,
+ sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct));
+ } else if (drained_pct > kHighWatermarkPct) {
+ // sanitize to prevent overflow
+ int64_t sanitized_prev_bytes_per_sec =
+ std::min(prev_bytes_per_sec, std::numeric_limits<int64_t>::max() /
+ (100 + kAdjustFactorPct));
+ new_bytes_per_sec =
+ std::min(max_bytes_per_sec_,
+ sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100);
+ } else {
+ new_bytes_per_sec = prev_bytes_per_sec;
+ }
+ if (new_bytes_per_sec != prev_bytes_per_sec) {
+ SetBytesPerSecondLocked(new_bytes_per_sec);
+ }
+ num_drains_ = 0;
+ return Status::OK();
+}
+
+RateLimiter* NewGenericRateLimiter(
+ int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
+ int32_t fairness /* = 10 */,
+ RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */,
+ bool auto_tuned /* = false */) {
+ assert(rate_bytes_per_sec > 0);
+ assert(refill_period_us > 0);
+ assert(fairness > 0);
+ std::unique_ptr<RateLimiter> limiter(
+ new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness,
+ mode, SystemClock::Default(), auto_tuned));
+ return limiter.release();
+}
+
+} // namespace ROCKSDB_NAMESPACE