diff options
Diffstat (limited to 'src/rocksdb/db/write_controller.cc')
-rw-r--r-- | src/rocksdb/db/write_controller.cc | 128 |
1 files changed, 128 insertions, 0 deletions
diff --git a/src/rocksdb/db/write_controller.cc b/src/rocksdb/db/write_controller.cc new file mode 100644 index 00000000..558aa721 --- /dev/null +++ b/src/rocksdb/db/write_controller.cc @@ -0,0 +1,128 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/write_controller.h" + +#include <atomic> +#include <cassert> +#include <ratio> +#include "rocksdb/env.h" + +namespace rocksdb { + +std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() { + ++total_stopped_; + return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this)); +} + +std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken( + uint64_t write_rate) { + total_delayed_++; + // Reset counters. + last_refill_time_ = 0; + bytes_left_ = 0; + set_delayed_write_rate(write_rate); + return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this)); +} + +std::unique_ptr<WriteControllerToken> +WriteController::GetCompactionPressureToken() { + ++total_compaction_pressure_; + return std::unique_ptr<WriteControllerToken>( + new CompactionPressureToken(this)); +} + +bool WriteController::IsStopped() const { + return total_stopped_.load(std::memory_order_relaxed) > 0; +} +// This is inside DB mutex, so we can't sleep and need to minimize +// frequency to get time. +// If it turns out to be a performance issue, we can redesign the thread +// synchronization model here. +// The function trust caller will sleep micros returned. +uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) { + if (total_stopped_.load(std::memory_order_relaxed) > 0) { + return 0; + } + if (total_delayed_.load(std::memory_order_relaxed) == 0) { + return 0; + } + + const uint64_t kMicrosPerSecond = 1000000; + const uint64_t kRefillInterval = 1024U; + + if (bytes_left_ >= num_bytes) { + bytes_left_ -= num_bytes; + return 0; + } + // The frequency to get time inside DB mutex is less than one per refill + // interval. + auto time_now = NowMicrosMonotonic(env); + + uint64_t sleep_debt = 0; + uint64_t time_since_last_refill = 0; + if (last_refill_time_ != 0) { + if (last_refill_time_ > time_now) { + sleep_debt = last_refill_time_ - time_now; + } else { + time_since_last_refill = time_now - last_refill_time_; + bytes_left_ += + static_cast<uint64_t>(static_cast<double>(time_since_last_refill) / + kMicrosPerSecond * delayed_write_rate_); + if (time_since_last_refill >= kRefillInterval && + bytes_left_ > num_bytes) { + // If refill interval already passed and we have enough bytes + // return without extra sleeping. + last_refill_time_ = time_now; + bytes_left_ -= num_bytes; + return 0; + } + } + } + + uint64_t single_refill_amount = + delayed_write_rate_ * kRefillInterval / kMicrosPerSecond; + if (bytes_left_ + single_refill_amount >= num_bytes) { + // Wait until a refill interval + // Never trigger expire for less than one refill interval to avoid to get + // time. + bytes_left_ = bytes_left_ + single_refill_amount - num_bytes; + last_refill_time_ = time_now + kRefillInterval; + return kRefillInterval + sleep_debt; + } + + // Need to refill more than one interval. Need to sleep longer. Check + // whether expiration will hit + + // Sleep just until `num_bytes` is allowed. + uint64_t sleep_amount = + static_cast<uint64_t>(num_bytes / + static_cast<long double>(delayed_write_rate_) * + kMicrosPerSecond) + + sleep_debt; + last_refill_time_ = time_now + sleep_amount; + return sleep_amount; +} + +uint64_t WriteController::NowMicrosMonotonic(Env* env) { + return env->NowNanos() / std::milli::den; +} + +StopWriteToken::~StopWriteToken() { + assert(controller_->total_stopped_ >= 1); + --controller_->total_stopped_; +} + +DelayWriteToken::~DelayWriteToken() { + controller_->total_delayed_--; + assert(controller_->total_delayed_.load() >= 0); +} + +CompactionPressureToken::~CompactionPressureToken() { + controller_->total_compaction_pressure_--; + assert(controller_->total_compaction_pressure_ >= 0); +} + +} // namespace rocksdb |