summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/util/repeatable_thread.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/rocksdb/util/repeatable_thread.h
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/rocksdb/util/repeatable_thread.h146
1 files changed, 146 insertions, 0 deletions
diff --git a/src/rocksdb/util/repeatable_thread.h b/src/rocksdb/util/repeatable_thread.h
new file mode 100644
index 00000000..967cc499
--- /dev/null
+++ b/src/rocksdb/util/repeatable_thread.h
@@ -0,0 +1,146 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <functional>
+#include <string>
+
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "util/mock_time_env.h"
+#include "util/mutexlock.h"
+
+namespace rocksdb {
+
+class RepeatableThread {
+ public:
+ RepeatableThread(std::function<void()> function,
+ const std::string& thread_name, Env* env, uint64_t delay_us,
+ uint64_t initial_delay_us = 0)
+ : function_(function),
+ thread_name_("rocksdb:" + thread_name),
+ env_(env),
+ delay_us_(delay_us),
+ initial_delay_us_(initial_delay_us),
+ mutex_(env),
+ cond_var_(&mutex_),
+ running_(true),
+#ifndef NDEBUG
+ waiting_(false),
+ run_count_(0),
+#endif
+ thread_([this] { thread(); }) {
+ }
+
+ void cancel() {
+ {
+ InstrumentedMutexLock l(&mutex_);
+ if (!running_) {
+ return;
+ }
+ running_ = false;
+ cond_var_.SignalAll();
+ }
+ thread_.join();
+ }
+
+ bool IsRunning() { return running_; }
+
+ ~RepeatableThread() { cancel(); }
+
+#ifndef NDEBUG
+ // Wait until RepeatableThread starting waiting, call the optional callback,
+ // then wait for one run of RepeatableThread. Tests can use provide a
+ // custom env object to mock time, and use the callback here to bump current
+ // time and trigger RepeatableThread. See repeatable_thread_test for example.
+ //
+ // Note: only support one caller of this method.
+ void TEST_WaitForRun(std::function<void()> callback = nullptr) {
+ InstrumentedMutexLock l(&mutex_);
+ while (!waiting_) {
+ cond_var_.Wait();
+ }
+ uint64_t prev_count = run_count_;
+ if (callback != nullptr) {
+ callback();
+ }
+ cond_var_.SignalAll();
+ while (!(run_count_ > prev_count)) {
+ cond_var_.Wait();
+ }
+ }
+#endif
+
+ private:
+ bool wait(uint64_t delay) {
+ InstrumentedMutexLock l(&mutex_);
+ if (running_ && delay > 0) {
+ uint64_t wait_until = env_->NowMicros() + delay;
+#ifndef NDEBUG
+ waiting_ = true;
+ cond_var_.SignalAll();
+#endif
+ while (running_) {
+ cond_var_.TimedWait(wait_until);
+ if (env_->NowMicros() >= wait_until) {
+ break;
+ }
+ }
+#ifndef NDEBUG
+ waiting_ = false;
+#endif
+ }
+ return running_;
+ }
+
+ void thread() {
+#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
+#if __GLIBC_PREREQ(2, 12)
+ // Set thread name.
+ auto thread_handle = thread_.native_handle();
+ int ret __attribute__((__unused__)) =
+ pthread_setname_np(thread_handle, thread_name_.c_str());
+ assert(ret == 0);
+#endif
+#endif
+
+ assert(delay_us_ > 0);
+ if (!wait(initial_delay_us_)) {
+ return;
+ }
+ do {
+ function_();
+#ifndef NDEBUG
+ {
+ InstrumentedMutexLock l(&mutex_);
+ run_count_++;
+ cond_var_.SignalAll();
+ }
+#endif
+ } while (wait(delay_us_));
+ }
+
+ const std::function<void()> function_;
+ const std::string thread_name_;
+ Env* const env_;
+ const uint64_t delay_us_;
+ const uint64_t initial_delay_us_;
+
+ // Mutex lock should be held when accessing running_, waiting_
+ // and run_count_.
+ InstrumentedMutex mutex_;
+ InstrumentedCondVar cond_var_;
+ bool running_;
+#ifndef NDEBUG
+ // RepeatableThread waiting for timeout.
+ bool waiting_;
+ // Times function_ had run.
+ uint64_t run_count_;
+#endif
+ port::Thread thread_;
+};
+
+} // namespace rocksdb