summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/util/timer.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/util/timer.h
parentInitial commit. (diff)
downloadceph-b26c4052f3542036551aa9dec9caa4226e456195.tar.xz
ceph-b26c4052f3542036551aa9dec9caa4226e456195.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/util/timer.h')
-rw-r--r--src/rocksdb/util/timer.h340
1 files changed, 340 insertions, 0 deletions
diff --git a/src/rocksdb/util/timer.h b/src/rocksdb/util/timer.h
new file mode 100644
index 000000000..db71cefaf
--- /dev/null
+++ b/src/rocksdb/util/timer.h
@@ -0,0 +1,340 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <queue>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "monitoring/instrumented_mutex.h"
+#include "rocksdb/system_clock.h"
+#include "test_util/sync_point.h"
+#include "util/mutexlock.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// A Timer class to handle repeated work.
+//
+// `Start()` and `Shutdown()` are currently not thread-safe. The client must
+// serialize calls to these two member functions.
+//
+// A single timer instance can handle multiple functions via a single thread.
+// It is better to leave long running work to a dedicated thread pool.
+//
+// Timer can be started by calling `Start()`, and ended by calling `Shutdown()`.
+// Work (in terms of a `void function`) can be scheduled by calling `Add` with
+// a unique function name and de-scheduled by calling `Cancel`.
+// Many functions can be added.
+//
+// Impl Details:
+// A heap is used to keep track of when the next timer goes off.
+// A map from a function name to the function keeps track of all the functions.
+class Timer {
+ public:
+ explicit Timer(SystemClock* clock)
+ : clock_(clock),
+ mutex_(clock),
+ cond_var_(&mutex_),
+ running_(false),
+ executing_task_(false) {}
+
+ ~Timer() { Shutdown(); }
+
+ // Add a new function to run.
+ // fn_name has to be identical, otherwise it will fail to add and return false
+ // start_after_us is the initial delay.
+ // repeat_every_us is the interval between ending time of the last call and
+ // starting time of the next call. For example, repeat_every_us = 2000 and
+ // the function takes 1000us to run. If it starts at time [now]us, then it
+ // finishes at [now]+1000us, 2nd run starting time will be at [now]+3000us.
+ // repeat_every_us == 0 means do not repeat.
+ bool Add(std::function<void()> fn, const std::string& fn_name,
+ uint64_t start_after_us, uint64_t repeat_every_us) {
+ auto fn_info = std::make_unique<FunctionInfo>(std::move(fn), fn_name, 0,
+ repeat_every_us);
+ InstrumentedMutexLock l(&mutex_);
+ // Assign time within mutex to make sure the next_run_time is larger than
+ // the current running one
+ fn_info->next_run_time_us = clock_->NowMicros() + start_after_us;
+ // the new task start time should never before the current task executing
+ // time, as the executing task can only be running if it's next_run_time_us
+ // is due (<= clock_->NowMicros()).
+ if (executing_task_ &&
+ fn_info->next_run_time_us < heap_.top()->next_run_time_us) {
+ return false;
+ }
+ auto it = map_.find(fn_name);
+ if (it == map_.end()) {
+ heap_.push(fn_info.get());
+ map_.try_emplace(fn_name, std::move(fn_info));
+ } else {
+ // timer doesn't support duplicated function name
+ return false;
+ }
+ cond_var_.SignalAll();
+ return true;
+ }
+
+ void Cancel(const std::string& fn_name) {
+ InstrumentedMutexLock l(&mutex_);
+
+ // Mark the function with fn_name as invalid so that it will not be
+ // requeued.
+ auto it = map_.find(fn_name);
+ if (it != map_.end() && it->second) {
+ it->second->Cancel();
+ }
+
+ // If the currently running function is fn_name, then we need to wait
+ // until it finishes before returning to caller.
+ while (!heap_.empty() && executing_task_) {
+ FunctionInfo* func_info = heap_.top();
+ assert(func_info);
+ if (func_info->name == fn_name) {
+ WaitForTaskCompleteIfNecessary();
+ } else {
+ break;
+ }
+ }
+ }
+
+ void CancelAll() {
+ InstrumentedMutexLock l(&mutex_);
+ CancelAllWithLock();
+ }
+
+ // Start the Timer
+ bool Start() {
+ InstrumentedMutexLock l(&mutex_);
+ if (running_) {
+ return false;
+ }
+
+ running_ = true;
+ thread_ = std::make_unique<port::Thread>(&Timer::Run, this);
+ return true;
+ }
+
+ // Shutdown the Timer
+ bool Shutdown() {
+ {
+ InstrumentedMutexLock l(&mutex_);
+ if (!running_) {
+ return false;
+ }
+ running_ = false;
+ CancelAllWithLock();
+ cond_var_.SignalAll();
+ }
+
+ if (thread_) {
+ thread_->join();
+ }
+ return true;
+ }
+
+ bool HasPendingTask() const {
+ InstrumentedMutexLock l(&mutex_);
+ for (const auto& fn_info : map_) {
+ if (fn_info.second->IsValid()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+#ifndef NDEBUG
+ // Wait until Timer starting waiting, call the optional callback, then wait
+ // for Timer waiting again.
+ // Tests can provide a custom Clock object to mock time, and use the callback
+ // here to bump current time and trigger Timer. See timer_test for example.
+ //
+ // Note: only support one caller of this method.
+ void TEST_WaitForRun(const std::function<void()>& callback = nullptr) {
+ InstrumentedMutexLock l(&mutex_);
+ // It act as a spin lock
+ while (executing_task_ ||
+ (!heap_.empty() &&
+ heap_.top()->next_run_time_us <= clock_->NowMicros())) {
+ cond_var_.TimedWait(clock_->NowMicros() + 1000);
+ }
+ if (callback != nullptr) {
+ callback();
+ }
+ cond_var_.SignalAll();
+ do {
+ cond_var_.TimedWait(clock_->NowMicros() + 1000);
+ } while (executing_task_ ||
+ (!heap_.empty() &&
+ heap_.top()->next_run_time_us <= clock_->NowMicros()));
+ }
+
+ size_t TEST_GetPendingTaskNum() const {
+ InstrumentedMutexLock l(&mutex_);
+ size_t ret = 0;
+ for (const auto& fn_info : map_) {
+ if (fn_info.second->IsValid()) {
+ ret++;
+ }
+ }
+ return ret;
+ }
+
+ void TEST_OverrideTimer(SystemClock* clock) {
+ InstrumentedMutexLock l(&mutex_);
+ clock_ = clock;
+ }
+#endif // NDEBUG
+
+ private:
+ void Run() {
+ InstrumentedMutexLock l(&mutex_);
+
+ while (running_) {
+ if (heap_.empty()) {
+ // wait
+ TEST_SYNC_POINT("Timer::Run::Waiting");
+ cond_var_.Wait();
+ continue;
+ }
+
+ FunctionInfo* current_fn = heap_.top();
+ assert(current_fn);
+
+ if (!current_fn->IsValid()) {
+ heap_.pop();
+ map_.erase(current_fn->name);
+ continue;
+ }
+
+ if (current_fn->next_run_time_us <= clock_->NowMicros()) {
+ // make a copy of the function so it won't be changed after
+ // mutex_.unlock.
+ std::function<void()> fn = current_fn->fn;
+ executing_task_ = true;
+ mutex_.Unlock();
+ // Execute the work
+ fn();
+ mutex_.Lock();
+ executing_task_ = false;
+ cond_var_.SignalAll();
+
+ // Remove the work from the heap once it is done executing, make sure
+ // it's the same function after executing the work while mutex is
+ // released.
+ // Note that we are just removing the pointer from the heap. Its
+ // memory is still managed in the map (as it holds a unique ptr).
+ // So current_fn is still a valid ptr.
+ assert(heap_.top() == current_fn);
+ heap_.pop();
+
+ // current_fn may be cancelled already.
+ if (current_fn->IsValid() && current_fn->repeat_every_us > 0) {
+ assert(running_);
+ current_fn->next_run_time_us =
+ clock_->NowMicros() + current_fn->repeat_every_us;
+
+ // Schedule new work into the heap with new time.
+ heap_.push(current_fn);
+ } else {
+ // if current_fn is cancelled or no need to repeat, remove it from the
+ // map to avoid leak.
+ map_.erase(current_fn->name);
+ }
+ } else {
+ cond_var_.TimedWait(current_fn->next_run_time_us);
+ }
+ }
+ }
+
+ void CancelAllWithLock() {
+ mutex_.AssertHeld();
+ if (map_.empty() && heap_.empty()) {
+ return;
+ }
+
+ // With mutex_ held, set all tasks to invalid so that they will not be
+ // re-queued.
+ for (auto& elem : map_) {
+ auto& func_info = elem.second;
+ assert(func_info);
+ func_info->Cancel();
+ }
+
+ // WaitForTaskCompleteIfNecessary() may release mutex_
+ WaitForTaskCompleteIfNecessary();
+
+ while (!heap_.empty()) {
+ heap_.pop();
+ }
+ map_.clear();
+ }
+
+ // A wrapper around std::function to keep track when it should run next
+ // and at what frequency.
+ struct FunctionInfo {
+ // the actual work
+ std::function<void()> fn;
+ // name of the function
+ std::string name;
+ // when the function should run next
+ uint64_t next_run_time_us;
+ // repeat interval
+ uint64_t repeat_every_us;
+ // controls whether this function is valid.
+ // A function is valid upon construction and until someone explicitly
+ // calls `Cancel()`.
+ bool valid;
+
+ FunctionInfo(std::function<void()>&& _fn, std::string _name,
+ const uint64_t _next_run_time_us, uint64_t _repeat_every_us)
+ : fn(std::move(_fn)),
+ name(std::move(_name)),
+ next_run_time_us(_next_run_time_us),
+ repeat_every_us(_repeat_every_us),
+ valid(true) {}
+
+ void Cancel() { valid = false; }
+
+ bool IsValid() const { return valid; }
+ };
+
+ void WaitForTaskCompleteIfNecessary() {
+ mutex_.AssertHeld();
+ while (executing_task_) {
+ TEST_SYNC_POINT("Timer::WaitForTaskCompleteIfNecessary:TaskExecuting");
+ cond_var_.Wait();
+ }
+ }
+
+ struct RunTimeOrder {
+ bool operator()(const FunctionInfo* f1, const FunctionInfo* f2) {
+ return f1->next_run_time_us > f2->next_run_time_us;
+ }
+ };
+
+ SystemClock* clock_;
+ // This mutex controls both the heap_ and the map_. It needs to be held for
+ // making any changes in them.
+ mutable InstrumentedMutex mutex_;
+ InstrumentedCondVar cond_var_;
+ std::unique_ptr<port::Thread> thread_;
+ bool running_;
+ bool executing_task_;
+
+ std::priority_queue<FunctionInfo*, std::vector<FunctionInfo*>, RunTimeOrder>
+ heap_;
+
+ // In addition to providing a mapping from a function name to a function,
+ // it is also responsible for memory management.
+ std::unordered_map<std::string, std::unique_ptr<FunctionInfo>> map_;
+};
+
+} // namespace ROCKSDB_NAMESPACE