summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/util/rate_limiter.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/util/rate_limiter.cc')
-rw-r--r--src/rocksdb/util/rate_limiter.cc339
1 files changed, 339 insertions, 0 deletions
diff --git a/src/rocksdb/util/rate_limiter.cc b/src/rocksdb/util/rate_limiter.cc
new file mode 100644
index 000000000..b1eefe620
--- /dev/null
+++ b/src/rocksdb/util/rate_limiter.cc
@@ -0,0 +1,339 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "util/rate_limiter.h"
+#include "monitoring/statistics.h"
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "test_util/sync_point.h"
+#include "util/aligned_buffer.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+size_t RateLimiter::RequestToken(size_t bytes, size_t alignment,
+ Env::IOPriority io_priority, Statistics* stats,
+ RateLimiter::OpType op_type) {
+ if (io_priority < Env::IO_TOTAL && IsRateLimited(op_type)) {
+ bytes = std::min(bytes, static_cast<size_t>(GetSingleBurstBytes()));
+
+ if (alignment > 0) {
+ // Here we may actually require more than burst and block
+ // but we can not write less than one page at a time on direct I/O
+ // thus we may want not to use ratelimiter
+ bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
+ }
+ Request(bytes, io_priority, stats, op_type);
+ }
+ return bytes;
+}
+
+// Pending request
+struct GenericRateLimiter::Req {
+ explicit Req(int64_t _bytes, port::Mutex* _mu)
+ : request_bytes(_bytes), bytes(_bytes), cv(_mu), granted(false) {}
+ int64_t request_bytes;
+ int64_t bytes;
+ port::CondVar cv;
+ bool granted;
+};
+
+GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec,
+ int64_t refill_period_us,
+ int32_t fairness, RateLimiter::Mode mode,
+ Env* env, bool auto_tuned)
+ : RateLimiter(mode),
+ refill_period_us_(refill_period_us),
+ rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2
+ : rate_bytes_per_sec),
+ refill_bytes_per_period_(
+ CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)),
+ env_(env),
+ stop_(false),
+ exit_cv_(&request_mutex_),
+ requests_to_wait_(0),
+ available_bytes_(0),
+ next_refill_us_(NowMicrosMonotonic(env_)),
+ fairness_(fairness > 100 ? 100 : fairness),
+ rnd_((uint32_t)time(nullptr)),
+ leader_(nullptr),
+ auto_tuned_(auto_tuned),
+ num_drains_(0),
+ prev_num_drains_(0),
+ max_bytes_per_sec_(rate_bytes_per_sec),
+ tuned_time_(NowMicrosMonotonic(env_)) {
+ total_requests_[0] = 0;
+ total_requests_[1] = 0;
+ total_bytes_through_[0] = 0;
+ total_bytes_through_[1] = 0;
+}
+
+GenericRateLimiter::~GenericRateLimiter() {
+ MutexLock g(&request_mutex_);
+ stop_ = true;
+ requests_to_wait_ = static_cast<int32_t>(queue_[Env::IO_LOW].size() +
+ queue_[Env::IO_HIGH].size());
+ for (auto& r : queue_[Env::IO_HIGH]) {
+ r->cv.Signal();
+ }
+ for (auto& r : queue_[Env::IO_LOW]) {
+ r->cv.Signal();
+ }
+ while (requests_to_wait_ > 0) {
+ exit_cv_.Wait();
+ }
+}
+
+// This API allows user to dynamically change rate limiter's bytes per second.
+void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
+ assert(bytes_per_second > 0);
+ rate_bytes_per_sec_ = bytes_per_second;
+ refill_bytes_per_period_.store(
+ CalculateRefillBytesPerPeriod(bytes_per_second),
+ std::memory_order_relaxed);
+}
+
+void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
+ Statistics* stats) {
+ assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed));
+ TEST_SYNC_POINT("GenericRateLimiter::Request");
+ TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1",
+ &rate_bytes_per_sec_);
+ MutexLock g(&request_mutex_);
+
+ if (auto_tuned_) {
+ static const int kRefillsPerTune = 100;
+ std::chrono::microseconds now(NowMicrosMonotonic(env_));
+ if (now - tuned_time_ >=
+ kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) {
+ Tune();
+ }
+ }
+
+ if (stop_) {
+ return;
+ }
+
+ ++total_requests_[pri];
+
+ if (available_bytes_ >= bytes) {
+ // Refill thread assigns quota and notifies requests waiting on
+ // the queue under mutex. So if we get here, that means nobody
+ // is waiting?
+ available_bytes_ -= bytes;
+ total_bytes_through_[pri] += bytes;
+ return;
+ }
+
+ // Request cannot be satisfied at this moment, enqueue
+ Req r(bytes, &request_mutex_);
+ queue_[pri].push_back(&r);
+
+ do {
+ bool timedout = false;
+ // Leader election, candidates can be:
+ // (1) a new incoming request,
+ // (2) a previous leader, whose quota has not been not assigned yet due
+ // to lower priority
+ // (3) a previous waiter at the front of queue, who got notified by
+ // previous leader
+ if (leader_ == nullptr &&
+ ((!queue_[Env::IO_HIGH].empty() &&
+ &r == queue_[Env::IO_HIGH].front()) ||
+ (!queue_[Env::IO_LOW].empty() &&
+ &r == queue_[Env::IO_LOW].front()))) {
+ leader_ = &r;
+ int64_t delta = next_refill_us_ - NowMicrosMonotonic(env_);
+ delta = delta > 0 ? delta : 0;
+ if (delta == 0) {
+ timedout = true;
+ } else {
+ int64_t wait_until = env_->NowMicros() + delta;
+ RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
+ ++num_drains_;
+ timedout = r.cv.TimedWait(wait_until);
+ }
+ } else {
+ // Not at the front of queue or an leader has already been elected
+ r.cv.Wait();
+ }
+
+ // request_mutex_ is held from now on
+ if (stop_) {
+ --requests_to_wait_;
+ exit_cv_.Signal();
+ return;
+ }
+
+ // Make sure the waken up request is always the header of its queue
+ assert(r.granted ||
+ (!queue_[Env::IO_HIGH].empty() &&
+ &r == queue_[Env::IO_HIGH].front()) ||
+ (!queue_[Env::IO_LOW].empty() &&
+ &r == queue_[Env::IO_LOW].front()));
+ assert(leader_ == nullptr ||
+ (!queue_[Env::IO_HIGH].empty() &&
+ leader_ == queue_[Env::IO_HIGH].front()) ||
+ (!queue_[Env::IO_LOW].empty() &&
+ leader_ == queue_[Env::IO_LOW].front()));
+
+ if (leader_ == &r) {
+ // Waken up from TimedWait()
+ if (timedout) {
+ // Time to do refill!
+ Refill();
+
+ // Re-elect a new leader regardless. This is to simplify the
+ // election handling.
+ leader_ = nullptr;
+
+ // Notify the header of queue if current leader is going away
+ if (r.granted) {
+ // Current leader already got granted with quota. Notify header
+ // of waiting queue to participate next round of election.
+ assert((queue_[Env::IO_HIGH].empty() ||
+ &r != queue_[Env::IO_HIGH].front()) &&
+ (queue_[Env::IO_LOW].empty() ||
+ &r != queue_[Env::IO_LOW].front()));
+ if (!queue_[Env::IO_HIGH].empty()) {
+ queue_[Env::IO_HIGH].front()->cv.Signal();
+ } else if (!queue_[Env::IO_LOW].empty()) {
+ queue_[Env::IO_LOW].front()->cv.Signal();
+ }
+ // Done
+ break;
+ }
+ } else {
+ // Spontaneous wake up, need to continue to wait
+ assert(!r.granted);
+ leader_ = nullptr;
+ }
+ } else {
+ // Waken up by previous leader:
+ // (1) if requested quota is granted, it is done.
+ // (2) if requested quota is not granted, this means current thread
+ // was picked as a new leader candidate (previous leader got quota).
+ // It needs to participate leader election because a new request may
+ // come in before this thread gets waken up. So it may actually need
+ // to do Wait() again.
+ assert(!timedout);
+ }
+ } while (!r.granted);
+}
+
+void GenericRateLimiter::Refill() {
+ TEST_SYNC_POINT("GenericRateLimiter::Refill");
+ next_refill_us_ = NowMicrosMonotonic(env_) + refill_period_us_;
+ // Carry over the left over quota from the last period
+ auto refill_bytes_per_period =
+ refill_bytes_per_period_.load(std::memory_order_relaxed);
+ if (available_bytes_ < refill_bytes_per_period) {
+ available_bytes_ += refill_bytes_per_period;
+ }
+
+ int use_low_pri_first = rnd_.OneIn(fairness_) ? 0 : 1;
+ for (int q = 0; q < 2; ++q) {
+ auto use_pri = (use_low_pri_first == q) ? Env::IO_LOW : Env::IO_HIGH;
+ auto* queue = &queue_[use_pri];
+ while (!queue->empty()) {
+ auto* next_req = queue->front();
+ if (available_bytes_ < next_req->request_bytes) {
+ // avoid starvation
+ next_req->request_bytes -= available_bytes_;
+ available_bytes_ = 0;
+ break;
+ }
+ available_bytes_ -= next_req->request_bytes;
+ next_req->request_bytes = 0;
+ total_bytes_through_[use_pri] += next_req->bytes;
+ queue->pop_front();
+
+ next_req->granted = true;
+ if (next_req != leader_) {
+ // Quota granted, signal the thread
+ next_req->cv.Signal();
+ }
+ }
+ }
+}
+
+int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
+ int64_t rate_bytes_per_sec) {
+ if (port::kMaxInt64 / rate_bytes_per_sec < refill_period_us_) {
+ // Avoid unexpected result in the overflow case. The result now is still
+ // inaccurate but is a number that is large enough.
+ return port::kMaxInt64 / 1000000;
+ } else {
+ return std::max(kMinRefillBytesPerPeriod,
+ rate_bytes_per_sec * refill_period_us_ / 1000000);
+ }
+}
+
+Status GenericRateLimiter::Tune() {
+ const int kLowWatermarkPct = 50;
+ const int kHighWatermarkPct = 90;
+ const int kAdjustFactorPct = 5;
+ // computed rate limit will be in
+ // `[max_bytes_per_sec_ / kAllowedRangeFactor, max_bytes_per_sec_]`.
+ const int kAllowedRangeFactor = 20;
+
+ std::chrono::microseconds prev_tuned_time = tuned_time_;
+ tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic(env_));
+
+ int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time +
+ std::chrono::microseconds(refill_period_us_) -
+ std::chrono::microseconds(1)) /
+ std::chrono::microseconds(refill_period_us_);
+ // We tune every kRefillsPerTune intervals, so the overflow and division-by-
+ // zero conditions should never happen.
+ assert(num_drains_ - prev_num_drains_ <= port::kMaxInt64 / 100);
+ assert(elapsed_intervals > 0);
+ int64_t drained_pct =
+ (num_drains_ - prev_num_drains_) * 100 / elapsed_intervals;
+
+ int64_t prev_bytes_per_sec = GetBytesPerSecond();
+ int64_t new_bytes_per_sec;
+ if (drained_pct == 0) {
+ new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor;
+ } else if (drained_pct < kLowWatermarkPct) {
+ // sanitize to prevent overflow
+ int64_t sanitized_prev_bytes_per_sec =
+ std::min(prev_bytes_per_sec, port::kMaxInt64 / 100);
+ new_bytes_per_sec =
+ std::max(max_bytes_per_sec_ / kAllowedRangeFactor,
+ sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct));
+ } else if (drained_pct > kHighWatermarkPct) {
+ // sanitize to prevent overflow
+ int64_t sanitized_prev_bytes_per_sec = std::min(
+ prev_bytes_per_sec, port::kMaxInt64 / (100 + kAdjustFactorPct));
+ new_bytes_per_sec =
+ std::min(max_bytes_per_sec_,
+ sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100);
+ } else {
+ new_bytes_per_sec = prev_bytes_per_sec;
+ }
+ if (new_bytes_per_sec != prev_bytes_per_sec) {
+ SetBytesPerSecond(new_bytes_per_sec);
+ }
+ num_drains_ = prev_num_drains_;
+ return Status::OK();
+}
+
+RateLimiter* NewGenericRateLimiter(
+ int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
+ int32_t fairness /* = 10 */,
+ RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */,
+ bool auto_tuned /* = false */) {
+ assert(rate_bytes_per_sec > 0);
+ assert(refill_period_us > 0);
+ assert(fairness > 0);
+ return new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness,
+ mode, Env::Default(), auto_tuned);
+}
+
+} // namespace ROCKSDB_NAMESPACE