diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/db_write_buffer_manager_test.cc | |
parent | Initial commit. (diff) | |
download | ceph-b26c4052f3542036551aa9dec9caa4226e456195.tar.xz ceph-b26c4052f3542036551aa9dec9caa4226e456195.zip |
Adding upstream version 18.2.2.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/db/db_write_buffer_manager_test.cc | 862 |
1 files changed, 862 insertions, 0 deletions
diff --git a/src/rocksdb/db/db_write_buffer_manager_test.cc b/src/rocksdb/db/db_write_buffer_manager_test.cc new file mode 100644 index 000000000..4c31a7824 --- /dev/null +++ b/src/rocksdb/db/db_write_buffer_manager_test.cc @@ -0,0 +1,862 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/db_test_util.h" +#include "db/write_thread.h" +#include "port/stack_trace.h" + +namespace ROCKSDB_NAMESPACE { + +class DBWriteBufferManagerTest : public DBTestBase, + public testing::WithParamInterface<bool> { + public: + DBWriteBufferManagerTest() + : DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {} + bool cost_cache_; +}; + +TEST_P(DBWriteBufferManagerTest, SharedBufferAcrossCFs1) { + Options options = CurrentOptions(); + options.arena_block_size = 4096; + options.write_buffer_size = 500000; // this is never hit + std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 256 * 1024); + cost_cache_ = GetParam(); + + if (cost_cache_) { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, cache, true)); + } else { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, nullptr, true)); + } + + WriteOptions wo; + wo.disableWAL = true; + + CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options); + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + Flush(3); + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + Flush(0); + + // Write to "Default", "cf2" and "cf3". 
+ ASSERT_OK(Put(3, Key(1), DummyString(30000), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(40000), wo)); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); + + ASSERT_OK(Put(3, Key(2), DummyString(40000), wo)); + // WriteBufferManager::buffer_size_ has exceeded after the previous write is + // completed. + + // This make sures write will go through and if stall was in effect, it will + // end. + ASSERT_OK(Put(0, Key(2), DummyString(1), wo)); +} + +// Test Single DB with multiple writer threads get blocked when +// WriteBufferManager execeeds buffer_size_ and flush is waiting to be +// finished. +TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs2) { + Options options = CurrentOptions(); + options.arena_block_size = 4096; + options.write_buffer_size = 500000; // this is never hit + std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 256 * 1024); + cost_cache_ = GetParam(); + + if (cost_cache_) { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, cache, true)); + } else { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, nullptr, true)); + } + WriteOptions wo; + wo.disableWAL = true; + + CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options); + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + Flush(3); + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + Flush(0); + + // Write to "Default", "cf2" and "cf3". No flush will be triggered. + ASSERT_OK(Put(3, Key(1), DummyString(30000), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(40000), wo)); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); + + ASSERT_OK(Put(3, Key(2), DummyString(40000), wo)); + // WriteBufferManager::buffer_size_ has exceeded after the previous write is + // completed. 
+ + std::unordered_set<WriteThread::Writer*> w_set; + std::vector<port::Thread> threads; + int wait_count_db = 0; + int num_writers = 4; + InstrumentedMutex mutex; + InstrumentedCondVar cv(&mutex); + std::atomic<int> thread_num(0); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0", + "DBImpl::BackgroundCallFlush:start"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WBMStallInterface::BlockDB", [&](void*) { + InstrumentedMutexLock lock(&mutex); + wait_count_db++; + cv.SignalAll(); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::WriteStall::Wait", [&](void* arg) { + InstrumentedMutexLock lock(&mutex); + WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg); + w_set.insert(w); + // Allow the flush to continue if all writer threads are blocked. + if (w_set.size() == (unsigned long)num_writers) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + bool s = true; + + std::function<void(int)> writer = [&](int cf) { + int a = thread_num.fetch_add(1); + std::string key = "foo" + std::to_string(a); + Status tmp = Put(cf, Slice(key), DummyString(1), wo); + InstrumentedMutexLock lock(&mutex); + s = s && tmp.ok(); + }; + + // Flow: + // main_writer thread will write but will be blocked (as Flush will on hold, + // buffer_size_ has exceeded, thus will create stall in effect). + // | + // | + // multiple writer threads will be created to write across multiple columns + // and they will be blocked. + // | + // | + // Last writer thread will write and when its blocked it will signal Flush to + // continue to clear the stall. 
+ + threads.emplace_back(writer, 1); + // Wait untill first thread (main_writer) writing to DB is blocked and then + // create the multiple writers which will be blocked from getting added to the + // queue because stall is in effect. + { + InstrumentedMutexLock lock(&mutex); + while (wait_count_db != 1) { + cv.Wait(); + } + } + for (int i = 0; i < num_writers; i++) { + threads.emplace_back(writer, i % 4); + } + for (auto& t : threads) { + t.join(); + } + + ASSERT_TRUE(s); + + // Number of DBs blocked. + ASSERT_EQ(wait_count_db, 1); + // Number of Writer threads blocked. + ASSERT_EQ(w_set.size(), num_writers); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +// Test multiple DBs get blocked when WriteBufferManager limit exceeds and flush +// is waiting to be finished but DBs tries to write meanwhile. +TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB) { + std::vector<std::string> dbnames; + std::vector<DB*> dbs; + int num_dbs = 3; + + for (int i = 0; i < num_dbs; i++) { + dbs.push_back(nullptr); + dbnames.push_back( + test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i))); + } + + Options options = CurrentOptions(); + options.arena_block_size = 4096; + options.write_buffer_size = 500000; // this is never hit + std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 256 * 1024); + cost_cache_ = GetParam(); + + if (cost_cache_) { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, cache, true)); + } else { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, nullptr, true)); + } + CreateAndReopenWithCF({"cf1", "cf2"}, options); + + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(DestroyDB(dbnames[i], options)); + ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i]))); + } + WriteOptions wo; + wo.disableWAL = true; + + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(dbs[i]->Put(wo, 
Key(1), DummyString(20000))); + } + // Insert to db_. + ASSERT_OK(Put(0, Key(1), DummyString(30000), wo)); + + // WriteBufferManager Limit exceeded. + std::vector<port::Thread> threads; + int wait_count_db = 0; + InstrumentedMutex mutex; + InstrumentedCondVar cv(&mutex); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0", + "DBImpl::BackgroundCallFlush:start"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WBMStallInterface::BlockDB", [&](void*) { + { + InstrumentedMutexLock lock(&mutex); + wait_count_db++; + cv.Signal(); + // Since this is the last DB, signal Flush to continue. + if (wait_count_db == num_dbs + 1) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + bool s = true; + + // Write to DB. + std::function<void(DB*)> write_db = [&](DB* db) { + Status tmp = db->Put(wo, Key(3), DummyString(1)); + InstrumentedMutexLock lock(&mutex); + s = s && tmp.ok(); + }; + + // Flow: + // db_ will write and will be blocked (as Flush will on hold and will create + // stall in effect). + // | + // multiple dbs writers will be created to write to that db and they will be + // blocked. + // | + // | + // Last writer will write and when its blocked it will signal Flush to + // continue to clear the stall. + + threads.emplace_back(write_db, db_); + // Wait untill first DB is blocked and then create the multiple writers for + // different DBs which will be blocked from getting added to the queue because + // stall is in effect. + { + InstrumentedMutexLock lock(&mutex); + while (wait_count_db != 1) { + cv.Wait(); + } + } + for (int i = 0; i < num_dbs; i++) { + threads.emplace_back(write_db, dbs[i]); + } + for (auto& t : threads) { + t.join(); + } + + ASSERT_TRUE(s); + ASSERT_EQ(num_dbs + 1, wait_count_db); + // Clean up DBs. 
+ for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(dbs[i]->Close()); + ASSERT_OK(DestroyDB(dbnames[i], options)); + delete dbs[i]; + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +// Test multiple threads writing across multiple DBs and multiple columns get +// blocked when stall by WriteBufferManager is in effect. +TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB1) { + std::vector<std::string> dbnames; + std::vector<DB*> dbs; + int num_dbs = 3; + + for (int i = 0; i < num_dbs; i++) { + dbs.push_back(nullptr); + dbnames.push_back( + test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i))); + } + + Options options = CurrentOptions(); + options.arena_block_size = 4096; + options.write_buffer_size = 500000; // this is never hit + std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 256 * 1024); + cost_cache_ = GetParam(); + + if (cost_cache_) { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, cache, true)); + } else { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, nullptr, true)); + } + CreateAndReopenWithCF({"cf1", "cf2"}, options); + + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(DestroyDB(dbnames[i], options)); + ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i]))); + } + WriteOptions wo; + wo.disableWAL = true; + + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(20000))); + } + // Insert to db_. + ASSERT_OK(Put(0, Key(1), DummyString(30000), wo)); + + // WriteBufferManager::buffer_size_ has exceeded after the previous write to + // dbs[0] is completed. 
+ std::vector<port::Thread> threads; + int wait_count_db = 0; + InstrumentedMutex mutex; + InstrumentedCondVar cv(&mutex); + std::unordered_set<WriteThread::Writer*> w_set; + std::vector<port::Thread> writer_threads; + std::atomic<int> thread_num(0); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0", + "DBImpl::BackgroundCallFlush:start"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WBMStallInterface::BlockDB", [&](void*) { + { + InstrumentedMutexLock lock(&mutex); + wait_count_db++; + thread_num.fetch_add(1); + cv.Signal(); + // Allow the flush to continue if all writer threads are blocked. + if (thread_num.load(std::memory_order_relaxed) == 2 * num_dbs + 1) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::WriteStall::Wait", [&](void* arg) { + WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg); + { + InstrumentedMutexLock lock(&mutex); + w_set.insert(w); + thread_num.fetch_add(1); + // Allow the flush continue if all writer threads are blocked. + if (thread_num.load(std::memory_order_relaxed) == 2 * num_dbs + 1) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + bool s1 = true, s2 = true; + // Write to multiple columns of db_. + std::function<void(int)> write_cf = [&](int cf) { + Status tmp = Put(cf, Key(3), DummyString(1), wo); + InstrumentedMutexLock lock(&mutex); + s1 = s1 && tmp.ok(); + }; + // Write to multiple DBs. 
+ std::function<void(DB*)> write_db = [&](DB* db) { + Status tmp = db->Put(wo, Key(3), DummyString(1)); + InstrumentedMutexLock lock(&mutex); + s2 = s2 && tmp.ok(); + }; + + // Flow: + // thread will write to db_ will be blocked (as Flush will on hold, + // buffer_size_ has exceeded and will create stall in effect). + // | + // | + // multiple writers threads writing to different DBs and to db_ across + // multiple columns will be created and they will be blocked due to stall. + // | + // | + // Last writer thread will write and when its blocked it will signal Flush to + // continue to clear the stall. + threads.emplace_back(write_db, db_); + // Wait untill first thread is blocked and then create the multiple writer + // threads. + { + InstrumentedMutexLock lock(&mutex); + while (wait_count_db != 1) { + cv.Wait(); + } + } + + for (int i = 0; i < num_dbs; i++) { + // Write to multiple columns of db_. + writer_threads.emplace_back(write_cf, i % 3); + // Write to different dbs. + threads.emplace_back(write_db, dbs[i]); + } + for (auto& t : threads) { + t.join(); + } + for (auto& t : writer_threads) { + t.join(); + } + + ASSERT_TRUE(s1); + ASSERT_TRUE(s2); + + // Number of DBs blocked. + ASSERT_EQ(num_dbs + 1, wait_count_db); + // Number of Writer threads blocked. + ASSERT_EQ(w_set.size(), num_dbs); + // Clean up DBs. + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(dbs[i]->Close()); + ASSERT_OK(DestroyDB(dbnames[i], options)); + delete dbs[i]; + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +// Test multiple threads writing across multiple columns of db_ by passing +// different values to WriteOption.no_slown_down. 
+TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsSingleDB) { + Options options = CurrentOptions(); + options.arena_block_size = 4096; + options.write_buffer_size = 500000; // this is never hit + std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 256 * 1024); + cost_cache_ = GetParam(); + + if (cost_cache_) { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, cache, true)); + } else { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, nullptr, true)); + } + WriteOptions wo; + wo.disableWAL = true; + + CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options); + + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + Flush(3); + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + Flush(0); + + // Write to "Default", "cf2" and "cf3". No flush will be triggered. + ASSERT_OK(Put(3, Key(1), DummyString(30000), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(40000), wo)); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(3, Key(2), DummyString(40000), wo)); + + // WriteBufferManager::buffer_size_ has exceeded after the previous write to + // db_ is completed. 
+ + std::unordered_set<WriteThread::Writer*> w_slowdown_set; + std::vector<port::Thread> threads; + int wait_count_db = 0; + int num_writers = 4; + InstrumentedMutex mutex; + InstrumentedCondVar cv(&mutex); + std::atomic<int> thread_num(0); + std::atomic<int> w_no_slowdown(0); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0", + "DBImpl::BackgroundCallFlush:start"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WBMStallInterface::BlockDB", [&](void*) { + { + InstrumentedMutexLock lock(&mutex); + wait_count_db++; + cv.SignalAll(); + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::WriteStall::Wait", [&](void* arg) { + { + InstrumentedMutexLock lock(&mutex); + WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg); + w_slowdown_set.insert(w); + // Allow the flush continue if all writer threads are blocked. + if (w_slowdown_set.size() + (unsigned long)w_no_slowdown.load( + std::memory_order_relaxed) == + (unsigned long)num_writers) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + bool s1 = true, s2 = true; + + std::function<void(int)> write_slow_down = [&](int cf) { + int a = thread_num.fetch_add(1); + std::string key = "foo" + std::to_string(a); + WriteOptions write_op; + write_op.no_slowdown = false; + Status tmp = Put(cf, Slice(key), DummyString(1), write_op); + InstrumentedMutexLock lock(&mutex); + s1 = s1 && tmp.ok(); + }; + + std::function<void(int)> write_no_slow_down = [&](int cf) { + int a = thread_num.fetch_add(1); + std::string key = "foo" + std::to_string(a); + WriteOptions write_op; + write_op.no_slowdown = true; + Status tmp = Put(cf, Slice(key), DummyString(1), write_op); + { + InstrumentedMutexLock lock(&mutex); + s2 = s2 && !tmp.ok(); + w_no_slowdown.fetch_add(1); + // Allow the flush 
continue if all writer threads are blocked. + if (w_slowdown_set.size() + + (unsigned long)w_no_slowdown.load(std::memory_order_relaxed) == + (unsigned long)num_writers) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + } + }; + + // Flow: + // main_writer thread will write but will be blocked (as Flush will on hold, + // buffer_size_ has exceeded, thus will create stall in effect). + // | + // | + // multiple writer threads will be created to write across multiple columns + // with different values of WriteOptions.no_slowdown. Some of them will + // be blocked and some of them will return with Incomplete status. + // | + // | + // Last writer thread will write and when its blocked/return it will signal + // Flush to continue to clear the stall. + threads.emplace_back(write_slow_down, 1); + // Wait untill first thread (main_writer) writing to DB is blocked and then + // create the multiple writers which will be blocked from getting added to the + // queue because stall is in effect. + { + InstrumentedMutexLock lock(&mutex); + while (wait_count_db != 1) { + cv.Wait(); + } + } + + for (int i = 0; i < num_writers; i += 2) { + threads.emplace_back(write_no_slow_down, (i) % 4); + threads.emplace_back(write_slow_down, (i + 1) % 4); + } + for (auto& t : threads) { + t.join(); + } + + ASSERT_TRUE(s1); + ASSERT_TRUE(s2); + // Number of DBs blocked. + ASSERT_EQ(wait_count_db, 1); + // Number of Writer threads blocked. + ASSERT_EQ(w_slowdown_set.size(), num_writers / 2); + // Number of Writer threads with WriteOptions.no_slowdown = true. + ASSERT_EQ(w_no_slowdown.load(std::memory_order_relaxed), num_writers / 2); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +// Test multiple threads writing across multiple columns of db_ and different +// dbs by passing different values to WriteOption.no_slown_down. 
+TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) { + std::vector<std::string> dbnames; + std::vector<DB*> dbs; + int num_dbs = 4; + + for (int i = 0; i < num_dbs; i++) { + dbs.push_back(nullptr); + dbnames.push_back( + test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i))); + } + + Options options = CurrentOptions(); + options.arena_block_size = 4096; + options.write_buffer_size = 500000; // this is never hit + std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 256 * 1024); + cost_cache_ = GetParam(); + + if (cost_cache_) { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, cache, true)); + } else { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, nullptr, true)); + } + CreateAndReopenWithCF({"cf1", "cf2"}, options); + + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(DestroyDB(dbnames[i], options)); + ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i]))); + } + WriteOptions wo; + wo.disableWAL = true; + + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(20000))); + } + // Insert to db_. + ASSERT_OK(Put(0, Key(1), DummyString(30000), wo)); + + // WriteBufferManager::buffer_size_ has exceeded after the previous write to + // dbs[0] is completed. 
+ std::vector<port::Thread> threads; + int wait_count_db = 0; + InstrumentedMutex mutex; + InstrumentedCondVar cv(&mutex); + std::unordered_set<WriteThread::Writer*> w_slowdown_set; + std::vector<port::Thread> writer_threads; + std::atomic<int> thread_num(0); + std::atomic<int> w_no_slowdown(0); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0", + "DBImpl::BackgroundCallFlush:start"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WBMStallInterface::BlockDB", [&](void*) { + InstrumentedMutexLock lock(&mutex); + wait_count_db++; + cv.Signal(); + // Allow the flush continue if all writer threads are blocked. + if (w_slowdown_set.size() + + (unsigned long)(w_no_slowdown.load(std::memory_order_relaxed) + + wait_count_db) == + (unsigned long)(2 * num_dbs + 1)) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::WriteStall::Wait", [&](void* arg) { + WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg); + InstrumentedMutexLock lock(&mutex); + w_slowdown_set.insert(w); + // Allow the flush continue if all writer threads are blocked. 
+ if (w_slowdown_set.size() + + (unsigned long)(w_no_slowdown.load(std::memory_order_relaxed) + + wait_count_db) == + (unsigned long)(2 * num_dbs + 1)) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + bool s1 = true, s2 = true; + std::function<void(DB*)> write_slow_down = [&](DB* db) { + int a = thread_num.fetch_add(1); + std::string key = "foo" + std::to_string(a); + WriteOptions write_op; + write_op.no_slowdown = false; + Status tmp = db->Put(write_op, Slice(key), DummyString(1)); + InstrumentedMutexLock lock(&mutex); + s1 = s1 && tmp.ok(); + }; + + std::function<void(DB*)> write_no_slow_down = [&](DB* db) { + int a = thread_num.fetch_add(1); + std::string key = "foo" + std::to_string(a); + WriteOptions write_op; + write_op.no_slowdown = true; + Status tmp = db->Put(write_op, Slice(key), DummyString(1)); + { + InstrumentedMutexLock lock(&mutex); + s2 = s2 && !tmp.ok(); + w_no_slowdown.fetch_add(1); + if (w_slowdown_set.size() + + (unsigned long)(w_no_slowdown.load(std::memory_order_relaxed) + + wait_count_db) == + (unsigned long)(2 * num_dbs + 1)) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + } + }; + + // Flow: + // first thread will write but will be blocked (as Flush will on hold, + // buffer_size_ has exceeded, thus will create stall in effect). + // | + // | + // multiple writer threads will be created to write across multiple columns + // of db_ and different DBs with different values of + // WriteOptions.no_slowdown. Some of them will be blocked and some of them + // will return with Incomplete status. + // | + // | + // Last writer thread will write and when its blocked/return it will signal + // Flush to continue to clear the stall. + threads.emplace_back(write_slow_down, db_); + // Wait untill first thread writing to DB is blocked and then + // create the multiple writers. 
+ { + InstrumentedMutexLock lock(&mutex); + while (wait_count_db != 1) { + cv.Wait(); + } + } + + for (int i = 0; i < num_dbs; i += 2) { + // Write to multiple columns of db_. + writer_threads.emplace_back(write_slow_down, db_); + writer_threads.emplace_back(write_no_slow_down, db_); + // Write to different DBs. + threads.emplace_back(write_slow_down, dbs[i]); + threads.emplace_back(write_no_slow_down, dbs[i + 1]); + } + + for (auto& t : threads) { + t.join(); + } + + for (auto& t : writer_threads) { + t.join(); + } + + ASSERT_TRUE(s1); + ASSERT_TRUE(s2); + // Number of DBs blocked. + ASSERT_EQ((num_dbs / 2) + 1, wait_count_db); + // Number of writer threads writing to db_ blocked from getting added to the + // queue. + ASSERT_EQ(w_slowdown_set.size(), num_dbs / 2); + // Number of threads with WriteOptions.no_slowdown = true. + ASSERT_EQ(w_no_slowdown.load(std::memory_order_relaxed), num_dbs); + + // Clean up DBs. + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(dbs[i]->Close()); + ASSERT_OK(DestroyDB(dbnames[i], options)); + delete dbs[i]; + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +#ifndef ROCKSDB_LITE + +// Tests a `WriteBufferManager` constructed with `allow_stall == false` does not +// thrash memtable switching when full and a CF receives multiple writes. +// Instead, we expect to switch a CF's memtable for flush only when that CF does +// not have any pending or running flush. +// +// This test uses multiple DBs each with a single CF instead of a single DB +// with multiple CFs. That way we can control which CF is considered for switch +// by writing to that CF's DB. +// +// Not supported in LITE mode due to `GetProperty()` unavailable. 
+TEST_P(DBWriteBufferManagerTest, StopSwitchingMemTablesOnceFlushing) { + Options options = CurrentOptions(); + options.arena_block_size = 4 << 10; // 4KB + options.write_buffer_size = 1 << 20; // 1MB + std::shared_ptr<Cache> cache = + NewLRUCache(4 << 20 /* capacity (4MB) */, 2 /* num_shard_bits */); + ASSERT_LT(cache->GetUsage(), 256 << 10 /* 256KB */); + cost_cache_ = GetParam(); + if (cost_cache_) { + options.write_buffer_manager.reset(new WriteBufferManager( + 512 << 10 /* buffer_size (512KB) */, cache, false /* allow_stall */)); + } else { + options.write_buffer_manager.reset( + new WriteBufferManager(512 << 10 /* buffer_size (512KB) */, + nullptr /* cache */, false /* allow_stall */)); + } + + Reopen(options); + std::string dbname = test::PerThreadDBPath("db_shared_wbm_db"); + DB* shared_wbm_db = nullptr; + + ASSERT_OK(DestroyDB(dbname, options)); + ASSERT_OK(DB::Open(options, dbname, &shared_wbm_db)); + + // The last write will make WBM need flush, but it won't flush yet. + ASSERT_OK(Put(Key(1), DummyString(256 << 10 /* 256KB */), WriteOptions())); + ASSERT_FALSE(options.write_buffer_manager->ShouldFlush()); + ASSERT_OK(Put(Key(1), DummyString(256 << 10 /* 256KB */), WriteOptions())); + ASSERT_TRUE(options.write_buffer_manager->ShouldFlush()); + + // Flushes will be pending, not running because flush threads are blocked. + test::SleepingBackgroundTask sleeping_task_high; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task_high, Env::Priority::HIGH); + + for (int i = 0; i < 3; ++i) { + ASSERT_OK( + shared_wbm_db->Put(WriteOptions(), Key(1), DummyString(1 /* len */))); + std::string prop; + ASSERT_TRUE( + shared_wbm_db->GetProperty("rocksdb.num-immutable-mem-table", &prop)); + ASSERT_EQ(std::to_string(i > 0 ? 1 : 0), prop); + ASSERT_TRUE( + shared_wbm_db->GetProperty("rocksdb.mem-table-flush-pending", &prop)); + ASSERT_EQ(std::to_string(i > 0 ? 1 : 0), prop); + } + + // Clean up DBs. 
+ sleeping_task_high.WakeUp(); + sleeping_task_high.WaitUntilDone(); + ASSERT_OK(shared_wbm_db->Close()); + ASSERT_OK(DestroyDB(dbname, options)); + delete shared_wbm_db; +} + +#endif // ROCKSDB_LITE + +INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest, + testing::Bool()); + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + RegisterCustomObjects(argc, argv); + return RUN_ALL_TESTS(); +} |