summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/write_controller.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/db/write_controller.h')
-rw-r--r--src/rocksdb/db/write_controller.h144
1 files changed, 144 insertions, 0 deletions
diff --git a/src/rocksdb/db/write_controller.h b/src/rocksdb/db/write_controller.h
new file mode 100644
index 00000000..7c301ce7
--- /dev/null
+++ b/src/rocksdb/db/write_controller.h
@@ -0,0 +1,144 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include "rocksdb/rate_limiter.h"
+
+namespace rocksdb {
+
+class Env;
+class WriteControllerToken;
+
+// WriteController is controlling write stalls in our write code-path. Write
+// stalls happen when compaction can't keep up with write rate.
+// All of the methods here (including WriteControllerToken's destructors) need
+// to be called while holding DB mutex
+class WriteController {
+ public:
+ explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u,
+ int64_t low_pri_rate_bytes_per_sec = 1024 * 1024)
+ : total_stopped_(0),
+ total_delayed_(0),
+ total_compaction_pressure_(0),
+ bytes_left_(0),
+ last_refill_time_(0),
+ low_pri_rate_limiter_(
+ NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) {
+ set_max_delayed_write_rate(_delayed_write_rate);
+ }
+ ~WriteController() = default;
+
+ // When an actor (column family) requests a stop token, all writes will be
+ // stopped until the stop token is released (deleted)
+ std::unique_ptr<WriteControllerToken> GetStopToken();
+ // When an actor (column family) requests a delay token, total delay for all
+ // writes to the DB will be controlled under the delayed write rate. Every
+ // write needs to call GetDelay() with number of bytes writing to the DB,
+ // which returns number of microseconds to sleep.
+ std::unique_ptr<WriteControllerToken> GetDelayToken(
+ uint64_t delayed_write_rate);
+ // When an actor (column family) requests a moderate token, compaction
+ // threads will be increased
+ std::unique_ptr<WriteControllerToken> GetCompactionPressureToken();
+
+ // these three metods are querying the state of the WriteController
+ bool IsStopped() const;
+ bool NeedsDelay() const { return total_delayed_.load() > 0; }
+ bool NeedSpeedupCompaction() const {
+ return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0;
+ }
+ // return how many microseconds the caller needs to sleep after the call
+ // num_bytes: how many number of bytes to put into the DB.
+ // Prerequisite: DB mutex held.
+ uint64_t GetDelay(Env* env, uint64_t num_bytes);
+ void set_delayed_write_rate(uint64_t write_rate) {
+ // avoid divide 0
+ if (write_rate == 0) {
+ write_rate = 1u;
+ } else if (write_rate > max_delayed_write_rate()) {
+ write_rate = max_delayed_write_rate();
+ }
+ delayed_write_rate_ = write_rate;
+ }
+
+ void set_max_delayed_write_rate(uint64_t write_rate) {
+ // avoid divide 0
+ if (write_rate == 0) {
+ write_rate = 1u;
+ }
+ max_delayed_write_rate_ = write_rate;
+ // update delayed_write_rate_ as well
+ delayed_write_rate_ = write_rate;
+ }
+
+ uint64_t delayed_write_rate() const { return delayed_write_rate_; }
+
+ uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; }
+
+ RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); }
+
+ private:
+ uint64_t NowMicrosMonotonic(Env* env);
+
+ friend class WriteControllerToken;
+ friend class StopWriteToken;
+ friend class DelayWriteToken;
+ friend class CompactionPressureToken;
+
+ std::atomic<int> total_stopped_;
+ std::atomic<int> total_delayed_;
+ std::atomic<int> total_compaction_pressure_;
+ uint64_t bytes_left_;
+ uint64_t last_refill_time_;
+ // write rate set when initialization or by `DBImpl::SetDBOptions`
+ uint64_t max_delayed_write_rate_;
+ // current write rate
+ uint64_t delayed_write_rate_;
+
+ std::unique_ptr<RateLimiter> low_pri_rate_limiter_;
+};
+
+class WriteControllerToken {
+ public:
+ explicit WriteControllerToken(WriteController* controller)
+ : controller_(controller) {}
+ virtual ~WriteControllerToken() {}
+
+ protected:
+ WriteController* controller_;
+
+ private:
+ // no copying allowed
+ WriteControllerToken(const WriteControllerToken&) = delete;
+ void operator=(const WriteControllerToken&) = delete;
+};
+
+class StopWriteToken : public WriteControllerToken {
+ public:
+ explicit StopWriteToken(WriteController* controller)
+ : WriteControllerToken(controller) {}
+ virtual ~StopWriteToken();
+};
+
+class DelayWriteToken : public WriteControllerToken {
+ public:
+ explicit DelayWriteToken(WriteController* controller)
+ : WriteControllerToken(controller) {}
+ virtual ~DelayWriteToken();
+};
+
+class CompactionPressureToken : public WriteControllerToken {
+ public:
+ explicit CompactionPressureToken(WriteController* controller)
+ : WriteControllerToken(controller) {}
+ virtual ~CompactionPressureToken();
+};
+
+} // namespace rocksdb