summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/util/repeatable_thread.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/rocksdb/util/repeatable_thread.h
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/util/repeatable_thread.h')
-rw-r--r--src/rocksdb/util/repeatable_thread.h149
1 files changed, 149 insertions, 0 deletions
diff --git a/src/rocksdb/util/repeatable_thread.h b/src/rocksdb/util/repeatable_thread.h
new file mode 100644
index 000000000..1ac8edee6
--- /dev/null
+++ b/src/rocksdb/util/repeatable_thread.h
@@ -0,0 +1,149 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <functional>
+#include <string>
+
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "test_util/mock_time_env.h"
+#include "util/mutexlock.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Simple wrapper around port::Thread that supports calling a callback every
+// X seconds. If you pass in 0, then it will call your callback repeatedly
+// without delay.
+class RepeatableThread {
+ public:
+ RepeatableThread(std::function<void()> function,
+ const std::string& thread_name, Env* env, uint64_t delay_us,
+ uint64_t initial_delay_us = 0)
+ : function_(function),
+ thread_name_("rocksdb:" + thread_name),
+ env_(env),
+ delay_us_(delay_us),
+ initial_delay_us_(initial_delay_us),
+ mutex_(env),
+ cond_var_(&mutex_),
+ running_(true),
+#ifndef NDEBUG
+ waiting_(false),
+ run_count_(0),
+#endif
+ thread_([this] { thread(); }) {
+ }
+
+ void cancel() {
+ {
+ InstrumentedMutexLock l(&mutex_);
+ if (!running_) {
+ return;
+ }
+ running_ = false;
+ cond_var_.SignalAll();
+ }
+ thread_.join();
+ }
+
+ bool IsRunning() { return running_; }
+
+ ~RepeatableThread() { cancel(); }
+
+#ifndef NDEBUG
+ // Wait until RepeatableThread starting waiting, call the optional callback,
+ // then wait for one run of RepeatableThread. Tests can use provide a
+ // custom env object to mock time, and use the callback here to bump current
+ // time and trigger RepeatableThread. See repeatable_thread_test for example.
+ //
+ // Note: only support one caller of this method.
+ void TEST_WaitForRun(std::function<void()> callback = nullptr) {
+ InstrumentedMutexLock l(&mutex_);
+ while (!waiting_) {
+ cond_var_.Wait();
+ }
+ uint64_t prev_count = run_count_;
+ if (callback != nullptr) {
+ callback();
+ }
+ cond_var_.SignalAll();
+ while (!(run_count_ > prev_count)) {
+ cond_var_.Wait();
+ }
+ }
+#endif
+
+ private:
+ bool wait(uint64_t delay) {
+ InstrumentedMutexLock l(&mutex_);
+ if (running_ && delay > 0) {
+ uint64_t wait_until = env_->NowMicros() + delay;
+#ifndef NDEBUG
+ waiting_ = true;
+ cond_var_.SignalAll();
+#endif
+ while (running_) {
+ cond_var_.TimedWait(wait_until);
+ if (env_->NowMicros() >= wait_until) {
+ break;
+ }
+ }
+#ifndef NDEBUG
+ waiting_ = false;
+#endif
+ }
+ return running_;
+ }
+
+ void thread() {
+#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
+#if __GLIBC_PREREQ(2, 12)
+ // Set thread name.
+ auto thread_handle = thread_.native_handle();
+ int ret __attribute__((__unused__)) =
+ pthread_setname_np(thread_handle, thread_name_.c_str());
+ assert(ret == 0);
+#endif
+#endif
+
+ assert(delay_us_ > 0);
+ if (!wait(initial_delay_us_)) {
+ return;
+ }
+ do {
+ function_();
+#ifndef NDEBUG
+ {
+ InstrumentedMutexLock l(&mutex_);
+ run_count_++;
+ cond_var_.SignalAll();
+ }
+#endif
+ } while (wait(delay_us_));
+ }
+
+ const std::function<void()> function_;
+ const std::string thread_name_;
+ Env* const env_;
+ const uint64_t delay_us_;
+ const uint64_t initial_delay_us_;
+
+ // Mutex lock should be held when accessing running_, waiting_
+ // and run_count_.
+ InstrumentedMutex mutex_;
+ InstrumentedCondVar cond_var_;
+ bool running_;
+#ifndef NDEBUG
+ // RepeatableThread waiting for timeout.
+ bool waiting_;
+ // Times function_ had run.
+ uint64_t run_count_;
+#endif
+ port::Thread thread_;
+};
+
+} // namespace ROCKSDB_NAMESPACE