diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/rocksdb/util/thread_list_test.cc | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/util/thread_list_test.cc')
-rw-r--r-- | src/rocksdb/util/thread_list_test.cc | 352 |
1 files changed, 352 insertions, 0 deletions
diff --git a/src/rocksdb/util/thread_list_test.cc b/src/rocksdb/util/thread_list_test.cc new file mode 100644 index 00000000..a4a343a9 --- /dev/null +++ b/src/rocksdb/util/thread_list_test.cc @@ -0,0 +1,352 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <mutex> +#include <condition_variable> + +#include "monitoring/thread_status_updater.h" +#include "rocksdb/db.h" +#include "util/testharness.h" + +#ifdef ROCKSDB_USING_THREAD_STATUS + +namespace rocksdb { + +class SimulatedBackgroundTask { + public: + SimulatedBackgroundTask( + const void* db_key, const std::string& db_name, + const void* cf_key, const std::string& cf_name, + const ThreadStatus::OperationType operation_type = + ThreadStatus::OP_UNKNOWN, + const ThreadStatus::StateType state_type = + ThreadStatus::STATE_UNKNOWN) + : db_key_(db_key), db_name_(db_name), + cf_key_(cf_key), cf_name_(cf_name), + operation_type_(operation_type), state_type_(state_type), + should_run_(true), running_count_(0) { + Env::Default()->GetThreadStatusUpdater()->NewColumnFamilyInfo( + db_key_, db_name_, cf_key_, cf_name_); + } + + ~SimulatedBackgroundTask() { + Env::Default()->GetThreadStatusUpdater()->EraseDatabaseInfo(db_key_); + } + + void Run() { + std::unique_lock<std::mutex> l(mutex_); + running_count_++; + Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(cf_key_); + Env::Default()->GetThreadStatusUpdater()->SetThreadOperation( + operation_type_); + Env::Default()->GetThreadStatusUpdater()->SetThreadState(state_type_); + while (should_run_) { + bg_cv_.wait(l); + } + Env::Default()->GetThreadStatusUpdater()->ClearThreadState(); + Env::Default()->GetThreadStatusUpdater()->ClearThreadOperation(); + Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(nullptr); + running_count_--; + bg_cv_.notify_all(); + } + + void FinishAllTasks() { + std::unique_lock<std::mutex> l(mutex_); + should_run_ = false; + bg_cv_.notify_all(); + } + + void WaitUntilScheduled(int job_count, Env* env) { + while (running_count_ < job_count) { + env->SleepForMicroseconds(1000); + } + } + + void WaitUntilDone() { + std::unique_lock<std::mutex> l(mutex_); + while (running_count_ > 0) { + bg_cv_.wait(l); + } + } + + static void DoSimulatedTask(void* arg) { + reinterpret_cast<SimulatedBackgroundTask*>(arg)->Run(); + } + + private: + const void* db_key_; + const std::string db_name_; + const void* cf_key_; + const std::string cf_name_; + const ThreadStatus::OperationType operation_type_; + const ThreadStatus::StateType state_type_; + std::mutex mutex_; + std::condition_variable bg_cv_; + bool should_run_; + std::atomic<int> running_count_; +}; + +class ThreadListTest : public testing::Test { + public: + ThreadListTest() { + } +}; + +TEST_F(ThreadListTest, GlobalTables) { + // verify the global tables for operations and states are properly indexed. + for (int type = 0; type != ThreadStatus::NUM_OP_TYPES; ++type) { + ASSERT_EQ(global_operation_table[type].type, type); + ASSERT_EQ(global_operation_table[type].name, + ThreadStatus::GetOperationName( + ThreadStatus::OperationType(type))); + } + + for (int type = 0; type != ThreadStatus::NUM_STATE_TYPES; ++type) { + ASSERT_EQ(global_state_table[type].type, type); + ASSERT_EQ(global_state_table[type].name, + ThreadStatus::GetStateName( + ThreadStatus::StateType(type))); + } + + for (int stage = 0; stage != ThreadStatus::NUM_OP_STAGES; ++stage) { + ASSERT_EQ(global_op_stage_table[stage].stage, stage); + ASSERT_EQ(global_op_stage_table[stage].name, + ThreadStatus::GetOperationStageName( + ThreadStatus::OperationStage(stage))); + } +} + +TEST_F(ThreadListTest, SimpleColumnFamilyInfoTest) { + Env* env = Env::Default(); + const int kHighPriorityThreads = 3; + const int kLowPriorityThreads = 5; + const int kSimulatedHighPriThreads = kHighPriorityThreads - 1; + const int kSimulatedLowPriThreads = kLowPriorityThreads / 3; + env->SetBackgroundThreads(kHighPriorityThreads, Env::HIGH); + env->SetBackgroundThreads(kLowPriorityThreads, Env::LOW); + + SimulatedBackgroundTask running_task( + reinterpret_cast<void*>(1234), "running", + reinterpret_cast<void*>(5678), "pikachu"); + + for (int test = 0; test < kSimulatedHighPriThreads; ++test) { + env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, + &running_task, Env::Priority::HIGH); + } + for (int test = 0; test < kSimulatedLowPriThreads; ++test) { + env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, + &running_task, Env::Priority::LOW); + } + running_task.WaitUntilScheduled( + kSimulatedHighPriThreads + kSimulatedLowPriThreads, env); + + std::vector<ThreadStatus> thread_list; + + // Verify the number of running threads in each pool. + env->GetThreadList(&thread_list); + int running_count[ThreadStatus::NUM_THREAD_TYPES] = {0}; + for (auto thread_status : thread_list) { + if (thread_status.cf_name == "pikachu" && + thread_status.db_name == "running") { + running_count[thread_status.thread_type]++; + } + } + ASSERT_EQ( + running_count[ThreadStatus::HIGH_PRIORITY], + kSimulatedHighPriThreads); + ASSERT_EQ( + running_count[ThreadStatus::LOW_PRIORITY], + kSimulatedLowPriThreads); + ASSERT_EQ( + running_count[ThreadStatus::USER], 0); + + running_task.FinishAllTasks(); + running_task.WaitUntilDone(); + + // Verify none of the threads are running + env->GetThreadList(&thread_list); + + for (int i = 0; i < ThreadStatus::NUM_THREAD_TYPES; ++i) { + running_count[i] = 0; + } + for (auto thread_status : thread_list) { + if (thread_status.cf_name == "pikachu" && + thread_status.db_name == "running") { + running_count[thread_status.thread_type]++; + } + } + + ASSERT_EQ( + running_count[ThreadStatus::HIGH_PRIORITY], 0); + ASSERT_EQ( + running_count[ThreadStatus::LOW_PRIORITY], 0); + ASSERT_EQ( + running_count[ThreadStatus::USER], 0); +} + +namespace { + void UpdateStatusCounts( + const std::vector<ThreadStatus>& thread_list, + int operation_counts[], int state_counts[]) { + for (auto thread_status : thread_list) { + operation_counts[thread_status.operation_type]++; + state_counts[thread_status.state_type]++; + } + } + + void VerifyAndResetCounts( + const int correct_counts[], int collected_counts[], int size) { + for (int i = 0; i < size; ++i) { + ASSERT_EQ(collected_counts[i], correct_counts[i]); + collected_counts[i] = 0; + } + } + + void UpdateCount( + int operation_counts[], int from_event, int to_event, int amount) { + operation_counts[from_event] -= amount; + operation_counts[to_event] += amount; + } +} // namespace + +TEST_F(ThreadListTest, SimpleEventTest) { + Env* env = Env::Default(); + + // simulated tasks + const int kFlushWriteTasks = 3; + SimulatedBackgroundTask flush_write_task( + reinterpret_cast<void*>(1234), "running", + reinterpret_cast<void*>(5678), "pikachu", + ThreadStatus::OP_FLUSH); + + const int kCompactionWriteTasks = 4; + SimulatedBackgroundTask compaction_write_task( + reinterpret_cast<void*>(1234), "running", + reinterpret_cast<void*>(5678), "pikachu", + ThreadStatus::OP_COMPACTION); + + const int kCompactionReadTasks = 5; + SimulatedBackgroundTask compaction_read_task( + reinterpret_cast<void*>(1234), "running", + reinterpret_cast<void*>(5678), "pikachu", + ThreadStatus::OP_COMPACTION); + + const int kCompactionWaitTasks = 6; + SimulatedBackgroundTask compaction_wait_task( + reinterpret_cast<void*>(1234), "running", + reinterpret_cast<void*>(5678), "pikachu", + ThreadStatus::OP_COMPACTION); + + // setup right answers + int correct_operation_counts[ThreadStatus::NUM_OP_TYPES] = {0}; + correct_operation_counts[ThreadStatus::OP_FLUSH] = + kFlushWriteTasks; + correct_operation_counts[ThreadStatus::OP_COMPACTION] = + kCompactionWriteTasks + kCompactionReadTasks + kCompactionWaitTasks; + + env->SetBackgroundThreads( + correct_operation_counts[ThreadStatus::OP_FLUSH], Env::HIGH); + env->SetBackgroundThreads( + correct_operation_counts[ThreadStatus::OP_COMPACTION], Env::LOW); + + // schedule the simulated tasks + for (int t = 0; t < kFlushWriteTasks; ++t) { + env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, + &flush_write_task, Env::Priority::HIGH); + } + flush_write_task.WaitUntilScheduled(kFlushWriteTasks, env); + + for (int t = 0; t < kCompactionWriteTasks; ++t) { + env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, + &compaction_write_task, Env::Priority::LOW); + } + compaction_write_task.WaitUntilScheduled(kCompactionWriteTasks, env); + + for (int t = 0; t < kCompactionReadTasks; ++t) { + env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, + &compaction_read_task, Env::Priority::LOW); + } + compaction_read_task.WaitUntilScheduled(kCompactionReadTasks, env); + + for (int t = 0; t < kCompactionWaitTasks; ++t) { + env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, + &compaction_wait_task, Env::Priority::LOW); + } + compaction_wait_task.WaitUntilScheduled(kCompactionWaitTasks, env); + + // verify the thread-status + int operation_counts[ThreadStatus::NUM_OP_TYPES] = {0}; + int state_counts[ThreadStatus::NUM_STATE_TYPES] = {0}; + + std::vector<ThreadStatus> thread_list; + env->GetThreadList(&thread_list); + UpdateStatusCounts(thread_list, operation_counts, state_counts); + VerifyAndResetCounts(correct_operation_counts, operation_counts, + ThreadStatus::NUM_OP_TYPES); + + // terminate compaction-wait tasks and see if the thread-status + // reflects this update + compaction_wait_task.FinishAllTasks(); + compaction_wait_task.WaitUntilDone(); + UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION, + ThreadStatus::OP_UNKNOWN, kCompactionWaitTasks); + + env->GetThreadList(&thread_list); + UpdateStatusCounts(thread_list, operation_counts, state_counts); + VerifyAndResetCounts(correct_operation_counts, operation_counts, + ThreadStatus::NUM_OP_TYPES); + + // terminate flush-write tasks and see if the thread-status + // reflects this update + flush_write_task.FinishAllTasks(); + flush_write_task.WaitUntilDone(); + UpdateCount(correct_operation_counts, ThreadStatus::OP_FLUSH, + ThreadStatus::OP_UNKNOWN, kFlushWriteTasks); + + env->GetThreadList(&thread_list); + UpdateStatusCounts(thread_list, operation_counts, state_counts); + VerifyAndResetCounts(correct_operation_counts, operation_counts, + ThreadStatus::NUM_OP_TYPES); + + // terminate compaction-write tasks and see if the thread-status + // reflects this update + compaction_write_task.FinishAllTasks(); + compaction_write_task.WaitUntilDone(); + UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION, + ThreadStatus::OP_UNKNOWN, kCompactionWriteTasks); + + env->GetThreadList(&thread_list); + UpdateStatusCounts(thread_list, operation_counts, state_counts); + VerifyAndResetCounts(correct_operation_counts, operation_counts, + ThreadStatus::NUM_OP_TYPES); + + // terminate compaction-write tasks and see if the thread-status + // reflects this update + compaction_read_task.FinishAllTasks(); + compaction_read_task.WaitUntilDone(); + UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION, + ThreadStatus::OP_UNKNOWN, kCompactionReadTasks); + + env->GetThreadList(&thread_list); + UpdateStatusCounts(thread_list, operation_counts, state_counts); + VerifyAndResetCounts(correct_operation_counts, operation_counts, + ThreadStatus::NUM_OP_TYPES); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return 0; +} + +#endif // ROCKSDB_USING_THREAD_STATUS |