author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit     e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree       64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/db_write_test.cc
parent     Initial commit. (diff)
download   ceph-b26c4052f3542036551aa9dec9caa4226e456195.tar.xz
           ceph-b26c4052f3542036551aa9dec9caa4226e456195.zip
Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--  src/rocksdb/db/db_write_test.cc  679
1 file changed, 679 insertions(+), 0 deletions(-)
diff --git a/src/rocksdb/db/db_write_test.cc b/src/rocksdb/db/db_write_test.cc
new file mode 100644
index 000000000..1011d5c9e
--- /dev/null
+++ b/src/rocksdb/db/db_write_test.cc
@@ -0,0 +1,679 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <atomic>
+#include <fstream>
+#include <memory>
+#include <thread>
+#include <vector>
+
+#include "db/db_test_util.h"
+#include "db/write_batch_internal.h"
+#include "db/write_thread.h"
+#include "port/port.h"
+#include "port/stack_trace.h"
+#include "test_util/sync_point.h"
+#include "util/random.h"
+#include "util/string_util.h"
+#include "utilities/fault_injection_env.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Test variations of WriteImpl.
+class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
+ public:
+  DBWriteTest() : DBTestBase("db_write_test", /*env_do_fsync=*/true) {}
+
+  Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }
+
+  void Open() { DBTestBase::Reopen(GetOptions()); }
+};
+
+class DBWriteTestUnparameterized : public DBTestBase {
+ public:
+  explicit DBWriteTestUnparameterized()
+      : DBTestBase("pipelined_write_test", /*env_do_fsync=*/false) {}
+};
+
+// It is invalid to do sync write while disabling WAL.
+TEST_P(DBWriteTest, SyncAndDisableWAL) {
+  WriteOptions write_options;
+  write_options.sync = true;
+  write_options.disableWAL = true;
+  ASSERT_TRUE(dbfull()->Put(write_options, "foo", "bar").IsInvalidArgument());
+  WriteBatch batch;
+  ASSERT_OK(batch.Put("foo", "bar"));
+  ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
+}
+
+TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
+  Options options = GetOptions();
+  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
+      4;
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+  port::Mutex mutex;
+  port::CondVar cv(&mutex);
+  // Guarded by mutex
+  int writers = 0;
+
+  Reopen(options);
+
+  std::function<void()> write_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = false;
+    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+  };
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    Status s = dbfull()->Put(wo, key, "bar");
+    ASSERT_TRUE(s.ok() || s.IsIncomplete());
+  };
+  std::function<void(void*)> unblock_main_thread_func = [&](void*) {
+    mutex.Lock();
+    ++writers;
+    cv.SignalAll();
+    mutex.Unlock();
+  };
+
+  // Create 3 L0 files and schedule 4th without waiting
+  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:1",
+        "DBImpl::BackgroundCallFlush:start"},
{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:2", + "DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"}, + // Make compaction start wait for the write stall to be detected and + // implemented by a write group leader + {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:3", + "BackgroundCallCompaction:0"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Schedule creation of 4th L0 file without waiting. This will seal the + // memtable and then wait for a sync point before writing the file. We need + // to do it this way because SwitchMemtable() needs to enter the + // write_thread + FlushOptions fopt; + fopt.wait = false; + ASSERT_OK(dbfull()->Flush(fopt)); + + // Create a mix of slowdown/no_slowdown write threads + mutex.Lock(); + // First leader + threads.emplace_back(write_slowdown_func); + while (writers != 1) { + cv.Wait(); + } + + // Second leader. Will stall writes + // Build a writers list with no slowdown in the middle: + // +-------------+ + // | slowdown +<----+ newest + // +--+----------+ + // | + // v + // +--+----------+ + // | no slowdown | + // +--+----------+ + // | + // v + // +--+----------+ + // | slowdown + + // +-------------+ + threads.emplace_back(write_slowdown_func); + while (writers != 2) { + cv.Wait(); + } + threads.emplace_back(write_no_slowdown_func); + while (writers != 3) { + cv.Wait(); + } + threads.emplace_back(write_slowdown_func); + while (writers != 4) { + cv.Wait(); + } + + mutex.Unlock(); + + TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:1"); + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr)); + // This would have triggered a write stall. Unblock the write group leader + TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:2"); + // The leader is going to create missing newer links. 
+  // finishes, the next leader is going to delay writes and fail writers with
+  // no_slowdown
+
+  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:3");
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
+  Options options = GetOptions();
+  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
+      4;
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+  port::Mutex mutex;
+  port::CondVar cv(&mutex);
+  // Guarded by mutex
+  int writers = 0;
+
+  Reopen(options);
+
+  std::function<void()> write_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = false;
+    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+  };
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    Status s = dbfull()->Put(wo, key, "bar");
+    ASSERT_TRUE(s.ok() || s.IsIncomplete());
+  };
+  std::function<void(void*)> unblock_main_thread_func = [&](void*) {
+    mutex.Lock();
+    ++writers;
+    cv.SignalAll();
+    mutex.Unlock();
+  };
+
+  // Create 3 L0 files and schedule 4th without waiting
+  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
+        "DBImpl::BackgroundCallFlush:start"},
+       {"DBWriteTest::WriteThreadHangOnWriteStall:2",
+        "DBImpl::WriteImpl:BeforeLeaderEnters"},
+       // Make compaction start wait for the write stall to be detected and
+       // implemented by a write group leader
+       {"DBWriteTest::WriteThreadHangOnWriteStall:3",
+        "BackgroundCallCompaction:0"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Schedule creation of 4th L0 file without waiting. This will seal the
+  // memtable and then wait for a sync point before writing the file. We need
+  // to do it this way because SwitchMemtable() needs to enter the
+  // write_thread
+  FlushOptions fopt;
+  fopt.wait = false;
+  ASSERT_OK(dbfull()->Flush(fopt));
+
+  // Create a mix of slowdown/no_slowdown write threads
+  mutex.Lock();
+  // First leader
+  threads.emplace_back(write_slowdown_func);
+  while (writers != 1) {
+    cv.Wait();
+  }
+  // Second leader. Will stall writes
+  threads.emplace_back(write_slowdown_func);
+  threads.emplace_back(write_no_slowdown_func);
+  threads.emplace_back(write_slowdown_func);
+  threads.emplace_back(write_no_slowdown_func);
+  threads.emplace_back(write_slowdown_func);
+  while (writers != 6) {
+    cv.Wait();
+  }
+  mutex.Unlock();
+
+  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr));
+  // This would have triggered a write stall. Unblock the write group leader
+  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
+  // The leader is going to create missing newer links. When the leader
+  // finishes, the next leader is going to delay writes and fail writers with
+  // no_slowdown
+
+  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
+  for (auto& t : threads) {
+    t.join();
+  }
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
+  constexpr int kNumThreads = 5;
+  std::unique_ptr<FaultInjectionTestEnv> mock_env(
+      new FaultInjectionTestEnv(env_));
+  Options options = GetOptions();
+  options.env = mock_env.get();
+  Reopen(options);
+  std::atomic<int> ready_count{0};
+  std::atomic<int> leader_count{0};
+  std::vector<port::Thread> threads;
+  mock_env->SetFilesystemActive(false);
+
+  // Wait until all threads linked to write threads, to make sure
+  // all threads join the same batch group.
+  SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
+        ready_count++;
+        auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
+        if (w->state == WriteThread::STATE_GROUP_LEADER) {
+          leader_count++;
+          while (ready_count < kNumThreads) {
+            // busy waiting
+          }
+        }
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.push_back(port::Thread(
+        [&](int index) {
+          // All threads should fail.
+          auto res = Put("key" + std::to_string(index), "value");
+          if (options.manual_wal_flush) {
+            ASSERT_TRUE(res.ok());
+            // we should see fs error when we do the flush
+
+            // TSAN reports a false alarm for lock-order-inversion but Open and
+            // FlushWAL are not run concurrently. Disabling this until TSAN is
+            // fixed.
+            // res = dbfull()->FlushWAL(false);
+            // ASSERT_FALSE(res.ok());
+          } else {
+            ASSERT_FALSE(res.ok());
+          }
+        },
+        i));
+  }
+  for (int i = 0; i < kNumThreads; i++) {
+    threads[i].join();
+  }
+  ASSERT_EQ(1, leader_count);
+
+  // The Failed PUT operations can cause a BG error to be set.
+  // Mark it as Checked for the ASSERT_STATUS_CHECKED
+  dbfull()->Resume().PermitUncheckedError();
+
+  // Close before mock_env destruct.
+  Close();
+}
+
+TEST_F(DBWriteTestUnparameterized, PipelinedWriteRace) {
+  // This test was written to trigger a race in ExitAsBatchGroupLeader in case
+  // enable_pipelined_write_ was true.
+  // Writers for which ShouldWriteToMemtable() evaluates to false are removed
+  // from the write_group via CompleteFollower/CompleteLeader. Writers in the
+  // middle of the group are fully unlinked, but if that writer is the
+  // last_writer, then we did not update the predecessor's link_older, i.e.,
+  // this writer was still reachable via newest_writer_.
+  //
+  // But the problem was that CompleteFollower already wakes up the thread
+  // owning that writer before the writer has been removed. This resulted in a
+  // race - if the leader thread was fast enough, then everything was fine.
+  // However, if the woken up thread finished the current write operation and
+  // then performed yet another write, then a new writer instance was added
+  // to newest_writer_. It is possible that the new writer is located on the
+  // same address on stack, and if this happened, then we had a problem,
+  // because the old code tried to find the last_writer in the list to unlink
+  // it, which in this case produced a cycle in the list.
+  // Whether two invocations of PipelinedWriteImpl() by the same thread actually
+  // allocate the writer on the same address depends on the OS and/or compiler,
+  // so it is rather hard to create a deterministic test for this.
+
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  options.enable_pipelined_write = true;
+  std::vector<port::Thread> threads;
+
+  std::atomic<int> write_counter{0};
+  std::atomic<int> active_writers{0};
+  std::atomic<bool> second_write_starting{false};
+  std::atomic<bool> second_write_in_progress{false};
+  std::atomic<WriteThread::Writer*> leader{nullptr};
+  std::atomic<bool> finished_WAL_write{false};
+
+  DestroyAndReopen(options);
+
+  auto write_one_doc = [&]() {
+    int a = write_counter.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+    --active_writers;
+  };
+
+  auto write_two_docs = [&]() {
+    write_one_doc();
+    second_write_starting = true;
+    write_one_doc();
+  };
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
+        if (second_write_starting.load()) {
+          second_write_in_progress = true;
+          return;
+        }
+        auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
+        if (w->state == WriteThread::STATE_GROUP_LEADER) {
+          active_writers++;
+          if (leader.load() == nullptr) {
+            leader.store(w);
+            while (active_writers.load() < 2) {
+              // wait for another thread to join the write_group
+            }
+          }
+        } else {
+          // we disable the memtable for all followers so that they are
+          // removed from the write_group before enqueuing it for the memtable
+          // write
+          w->disable_memtable = true;
+          active_writers++;
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::ExitAsBatchGroupLeader:Start", [&](void* arg) {
+        auto* wg = reinterpret_cast<WriteThread::WriteGroup*>(arg);
+        if (wg->leader == leader && !finished_WAL_write) {
+          finished_WAL_write = true;
+          while (active_writers.load() < 3) {
+            // wait for the new writer to be enqueued
+          }
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::ExitAsBatchGroupLeader:AfterCompleteWriters",
+      [&](void* arg) {
+        auto* wg = reinterpret_cast<WriteThread::WriteGroup*>(arg);
+        if (wg->leader == leader) {
+          while (!second_write_in_progress.load()) {
+            // wait for the old follower thread to start the next write
+          }
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // start leader + one follower
+  threads.emplace_back(write_one_doc);
+  while (leader.load() == nullptr) {
+    // wait for leader
+  }
+
+  // we perform two writes in the follower, so that for the second write
+  // the thread reinserts a Writer with the same address
+  threads.emplace_back(write_two_docs);
+
+  // wait for the leader to enter ExitAsBatchGroupLeader
+  while (!finished_WAL_write.load()) {
+    // wait for write_group to have finished the WAL writes
+  }
+
+  // start another writer thread to be enqueued before the leader can
+  // complete the writers from its write_group
+  threads.emplace_back(write_one_doc);
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(DBWriteTest, ManualWalFlushInEffect) {
+  Options options = GetOptions();
+  Reopen(options);
+  // try the 1st WAL created during open
+  ASSERT_TRUE(Put("key" + std::to_string(0), "value").ok());
+  ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
+  ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
+  ASSERT_TRUE(dbfull()->WALBufferIsEmpty());
+  // try the 2nd wal created during SwitchWAL
+  ASSERT_OK(dbfull()->TEST_SwitchWAL());
+  ASSERT_TRUE(Put("key" + std::to_string(0), "value").ok());
+  ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
+  ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
+  ASSERT_TRUE(dbfull()->WALBufferIsEmpty());
+}
+
+TEST_P(DBWriteTest, UnflushedPutRaceWithTrackedWalSync) {
+  // Repro race condition bug where unflushed WAL data extended the synced size
+  // recorded to MANIFEST despite being unrecoverable.
+  Options options = GetOptions();
+  std::unique_ptr<FaultInjectionTestEnv> fault_env(
+      new FaultInjectionTestEnv(env_));
+  options.env = fault_env.get();
+  options.manual_wal_flush = true;
+  options.track_and_verify_wals_in_manifest = true;
+  Reopen(options);
+
+  ASSERT_OK(Put("key1", "val1"));
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::SyncWAL:Begin",
+      [this](void* /* arg */) { ASSERT_OK(Put("key2", "val2")); });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db_->FlushWAL(true /* sync */));
+
+  // Ensure callback ran.
+  ASSERT_EQ("val2", Get("key2"));
+
+  Close();
+
+  // Simulate full loss of unsynced data. This drops "key2" -> "val2" from the
+  // DB WAL.
+  fault_env->DropUnsyncedFileData();
+
+  Reopen(options);
+
+  // Need to close before `fault_env` goes out of scope.
+  Close();
+}
+
+TEST_P(DBWriteTest, InactiveWalFullySyncedBeforeUntracked) {
+  // Repro bug where a WAL is appended and switched after
+  // `FlushWAL(true /* sync */)`'s sync finishes and before it untracks fully
+  // synced inactive logs. Previously such a WAL would be wrongly untracked
+  // so the final append would never be synced.
+  Options options = GetOptions();
+  std::unique_ptr<FaultInjectionTestEnv> fault_env(
+      new FaultInjectionTestEnv(env_));
+  options.env = fault_env.get();
+  Reopen(options);
+
+  ASSERT_OK(Put("key1", "val1"));
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::SyncWAL:BeforeMarkLogsSynced:1", [this](void* /* arg */) {
+        ASSERT_OK(Put("key2", "val2"));
+        ASSERT_OK(dbfull()->TEST_SwitchMemtable());
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db_->FlushWAL(true /* sync */));
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  ASSERT_OK(Put("key3", "val3"));
+
+  ASSERT_OK(db_->FlushWAL(true /* sync */));
+
+  Close();
+
+  // Simulate full loss of unsynced data. This should drop nothing since we did
+  // `FlushWAL(true /* sync */)` before `Close()`.
+  fault_env->DropUnsyncedFileData();
+
+  Reopen(options);
+
+  ASSERT_EQ("val1", Get("key1"));
+  ASSERT_EQ("val2", Get("key2"));
+  ASSERT_EQ("val3", Get("key3"));
+
+  // Need to close before `fault_env` goes out of scope.
+  Close();
+}
+
+TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
+  std::unique_ptr<FaultInjectionTestEnv> mock_env(
+      new FaultInjectionTestEnv(env_));
+  Options options = GetOptions();
+  options.env = mock_env.get();
+  Reopen(options);
+  for (int i = 0; i < 2; i++) {
+    // Forcibly fail WAL write for the first Put only. Subsequent Puts should
+    // fail due to read-only mode
+    mock_env->SetFilesystemActive(i != 0);
+    auto res = Put("key" + std::to_string(i), "value");
+    // TSAN reports a false alarm for lock-order-inversion but Open and
+    // FlushWAL are not run concurrently. Disabling this until TSAN is
+    // fixed.
+    /*
+    if (options.manual_wal_flush && i == 0) {
+      // even with manual_wal_flush the 2nd Put should return error because of
+      // the read-only mode
+      ASSERT_TRUE(res.ok());
+      // we should see fs error when we do the flush
+      res = dbfull()->FlushWAL(false);
+    }
+    */
+    if (!options.manual_wal_flush) {
+      ASSERT_NOK(res);
+    } else {
+      ASSERT_OK(res);
+    }
+  }
+  // Close before mock_env destruct.
+  Close();
+}
+
+TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
+  Random rnd(301);
+  std::unique_ptr<FaultInjectionTestEnv> mock_env(
+      new FaultInjectionTestEnv(env_));
+  Options options = GetOptions();
+  options.env = mock_env.get();
+  options.writable_file_max_buffer_size = 4 * 1024 * 1024;
+  options.write_buffer_size = 3 * 512 * 1024;
+  options.wal_bytes_per_sync = 256 * 1024;
+  options.manual_wal_flush = true;
+  Reopen(options);
+  mock_env->SetFilesystemActive(false, Status::IOError("Not active"));
+  Status s;
+  for (int i = 0; i < 4 * 512; ++i) {
+    s = Put(Key(i), rnd.RandomString(1024));
+    if (!s.ok()) {
+      break;
+    }
+  }
+  ASSERT_EQ(s.severity(), Status::Severity::kFatalError);
+
+  mock_env->SetFilesystemActive(true);
+  // Close before mock_env destruct.
+  Close();
+}
+
+// Test that db->LockWAL() flushes the WAL after locking.
+TEST_P(DBWriteTest, LockWalInEffect) {
+  Options options = GetOptions();
+  Reopen(options);
+  // try the 1st WAL created during open
+  ASSERT_OK(Put("key" + std::to_string(0), "value"));
+  ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
+  ASSERT_OK(dbfull()->LockWAL());
+  ASSERT_TRUE(dbfull()->WALBufferIsEmpty(false));
+  ASSERT_OK(dbfull()->UnlockWAL());
+  // try the 2nd wal created during SwitchWAL
+  ASSERT_OK(dbfull()->TEST_SwitchWAL());
+  ASSERT_OK(Put("key" + std::to_string(0), "value"));
+  ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
+  ASSERT_OK(dbfull()->LockWAL());
+  ASSERT_TRUE(dbfull()->WALBufferIsEmpty(false));
+  ASSERT_OK(dbfull()->UnlockWAL());
+}
+
+TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
+  Options options = GetOptions();
+  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+  options.statistics->set_stats_level(StatsLevel::kAll);
+  Reopen(options);
+  std::string wal_key_prefix = "WAL_KEY_";
+  std::string no_wal_key_prefix = "K_";
+  // 100 KB value each for NO-WAL operation
+  std::string no_wal_value(1024 * 100, 'X');
+  // 1B value each for WAL operation
+  std::string wal_value = "0";
+  std::thread threads[10];
+  for (int t = 0; t < 10; t++) {
+    threads[t] = std::thread([t, wal_key_prefix, wal_value, no_wal_key_prefix,
+                              no_wal_value, this] {
+      for (int i = 0; i < 10; i++) {
+        ROCKSDB_NAMESPACE::WriteOptions write_option_disable;
+        write_option_disable.disableWAL = true;
+        ROCKSDB_NAMESPACE::WriteOptions write_option_default;
+        std::string no_wal_key =
+            no_wal_key_prefix + std::to_string(t) + "_" + std::to_string(i);
+        ASSERT_OK(this->Put(no_wal_key, no_wal_value, write_option_disable));
+        std::string wal_key =
+            wal_key_prefix + std::to_string(i) + "_" + std::to_string(i);
+        ASSERT_OK(this->Put(wal_key, wal_value, write_option_default));
+        ASSERT_OK(dbfull()->SyncWAL());
+      }
+      return;
+    });
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+  uint64_t bytes_num = options.statistics->getTickerCount(
+      ROCKSDB_NAMESPACE::Tickers::WAL_FILE_BYTES);
+  // written WAL size should be less than 100KB (even including HEADER & FOOTER
+  // overhead)
+  ASSERT_LE(bytes_num, 1024 * 100);
+}
+
+INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
+                        testing::Values(DBTestBase::kDefault,
+                                        DBTestBase::kConcurrentWALWrites,
+                                        DBTestBase::kPipelinedWrite));
+
+}  // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  RegisterCustomObjects(argc, argv);
+  return RUN_ALL_TESTS();
+}