diff options
Diffstat (limited to 'src/rocksdb/db/db_log_iter_test.cc')
-rw-r--r-- | src/rocksdb/db/db_log_iter_test.cc | 294 |
1 files changed, 294 insertions, 0 deletions
diff --git a/src/rocksdb/db/db_log_iter_test.cc b/src/rocksdb/db/db_log_iter_test.cc new file mode 100644 index 00000000..45642bc7 --- /dev/null +++ b/src/rocksdb/db/db_log_iter_test.cc @@ -0,0 +1,294 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +// Introduction of SyncPoint effectively disabled building and running this test +// in Release build. +// which is a pity, it is a good test +#if !defined(ROCKSDB_LITE) + +#include "db/db_test_util.h" +#include "port/stack_trace.h" + +namespace rocksdb { + +class DBTestXactLogIterator : public DBTestBase { + public: + DBTestXactLogIterator() : DBTestBase("/db_log_iter_test") {} + + std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter( + const SequenceNumber seq) { + std::unique_ptr<TransactionLogIterator> iter; + Status status = dbfull()->GetUpdatesSince(seq, &iter); + EXPECT_OK(status); + EXPECT_TRUE(iter->Valid()); + return iter; + } +}; + +namespace { +SequenceNumber ReadRecords( + std::unique_ptr<TransactionLogIterator>& iter, + int& count) { + count = 0; + SequenceNumber lastSequence = 0; + BatchResult res; + while (iter->Valid()) { + res = iter->GetBatch(); + EXPECT_TRUE(res.sequence > lastSequence); + ++count; + lastSequence = res.sequence; + EXPECT_OK(iter->status()); + iter->Next(); + } + return res.sequence; +} + +void ExpectRecords( + const int expected_no_records, + std::unique_ptr<TransactionLogIterator>& iter) { + int num_records; + ReadRecords(iter, num_records); + ASSERT_EQ(num_records, expected_no_records); +} +} // namespace + +TEST_F(DBTestXactLogIterator, TransactionLogIterator) { + do { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + Put(0, "key1", DummyString(1024)); + Put(1, "key2", DummyString(1024)); + Put(1, "key2", DummyString(1024)); + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U); + { + auto iter = OpenTransactionLogIter(0); + ExpectRecords(3, iter); + } + ReopenWithColumnFamilies({"default", "pikachu"}, options); + env_->SleepForMicroseconds(2 * 1000 * 1000); + { + Put(0, "key4", DummyString(1024)); + Put(1, "key5", DummyString(1024)); + Put(0, "key6", DummyString(1024)); + } + { + auto iter = OpenTransactionLogIter(0); + ExpectRecords(6, iter); + } + } while (ChangeCompactOptions()); +} + +#ifndef NDEBUG // sync point is not included with DNDEBUG build +TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) { + static const int LOG_ITERATOR_RACE_TEST_COUNT = 2; + static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = { + {"WalManager::GetSortedWalFiles:1", "WalManager::PurgeObsoleteFiles:1", + "WalManager::PurgeObsoleteFiles:2", "WalManager::GetSortedWalFiles:2"}, + {"WalManager::GetSortedWalsOfType:1", + "WalManager::PurgeObsoleteFiles:1", + "WalManager::PurgeObsoleteFiles:2", + "WalManager::GetSortedWalsOfType:2"}}; + for (int test = 0; test < LOG_ITERATOR_RACE_TEST_COUNT; ++test) { + // Setup sync point dependency to reproduce the race condition of + // a log file moved to archived dir, in the middle of GetSortedWalFiles + rocksdb::SyncPoint::GetInstance()->LoadDependency( + { { sync_points[test][0], sync_points[test][1] }, + { sync_points[test][2], sync_points[test][3] }, + }); + + do { + rocksdb::SyncPoint::GetInstance()->ClearTrace(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + Put("key1", DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key2", DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key3", DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key4", DummyString(1024)); + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U); + dbfull()->FlushWAL(false); + + { + auto iter = OpenTransactionLogIter(0); + ExpectRecords(4, iter); + } + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + // trigger async flush, and log move. Well, log move will + // wait until the GetSortedWalFiles:1 to reproduce the race + // condition + FlushOptions flush_options; + flush_options.wait = false; + dbfull()->Flush(flush_options); + + // "key5" would be written in a new memtable and log + Put("key5", DummyString(1024)); + dbfull()->FlushWAL(false); + { + // this iter would miss "key4" if not fixed + auto iter = OpenTransactionLogIter(0); + ExpectRecords(5, iter); + } + } while (ChangeCompactOptions()); + } +} +#endif + +TEST_F(DBTestXactLogIterator, TransactionLogIteratorStallAtLastRecord) { + do { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + Put("key1", DummyString(1024)); + auto iter = OpenTransactionLogIter(0); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(!iter->Valid()); + ASSERT_OK(iter->status()); + Put("key2", DummyString(1024)); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + } while (ChangeCompactOptions()); +} + +TEST_F(DBTestXactLogIterator, TransactionLogIteratorCheckAfterRestart) { + do { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + Put("key1", DummyString(1024)); + Put("key2", DummyString(1023)); + dbfull()->Flush(FlushOptions()); + Reopen(options); + auto iter = OpenTransactionLogIter(0); + ExpectRecords(2, iter); + } while (ChangeCompactOptions()); +} + +TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) { + do { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + for (int i = 0; i < 1024; i++) { + Put("key"+ToString(i), DummyString(10)); + } + dbfull()->Flush(FlushOptions()); + dbfull()->FlushWAL(false); + // Corrupt this log to create a gap + rocksdb::VectorLogPtr wal_files; + ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files)); + const auto logfile_path = dbname_ + "/" + wal_files.front()->PathName(); + if (mem_env_) { + mem_env_->Truncate(logfile_path, wal_files.front()->SizeFileBytes() / 2); + } else { + ASSERT_EQ(0, truncate(logfile_path.c_str(), + wal_files.front()->SizeFileBytes() / 2)); + } + + // Insert a new entry to a new log file + Put("key1025", DummyString(10)); + dbfull()->FlushWAL(false); + // Try to read from the beginning. Should stop before the gap and read less + // than 1025 entries + auto iter = OpenTransactionLogIter(0); + int count; + SequenceNumber last_sequence_read = ReadRecords(iter, count); + ASSERT_LT(last_sequence_read, 1025U); + // Try to read past the gap, should be able to seek to key1025 + auto iter2 = OpenTransactionLogIter(last_sequence_read + 1); + ExpectRecords(1, iter2); + } while (ChangeCompactOptions()); +} + +TEST_F(DBTestXactLogIterator, TransactionLogIteratorBatchOperations) { + do { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + WriteBatch batch; + batch.Put(handles_[1], "key1", DummyString(1024)); + batch.Put(handles_[0], "key2", DummyString(1024)); + batch.Put(handles_[1], "key3", DummyString(1024)); + batch.Delete(handles_[0], "key2"); + dbfull()->Write(WriteOptions(), &batch); + Flush(1); + Flush(0); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + Put(1, "key4", DummyString(1024)); + auto iter = OpenTransactionLogIter(3); + ExpectRecords(2, iter); + } while (ChangeCompactOptions()); +} + +TEST_F(DBTestXactLogIterator, TransactionLogIteratorBlobs) { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + { + WriteBatch batch; + batch.Put(handles_[1], "key1", DummyString(1024)); + batch.Put(handles_[0], "key2", DummyString(1024)); + batch.PutLogData(Slice("blob1")); + batch.Put(handles_[1], "key3", DummyString(1024)); + batch.PutLogData(Slice("blob2")); + batch.Delete(handles_[0], "key2"); + dbfull()->Write(WriteOptions(), &batch); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + } + + auto res = OpenTransactionLogIter(0)->GetBatch(); + struct Handler : public WriteBatch::Handler { + std::string seen; + Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override { + seen += "Put(" + ToString(cf) + ", " + key.ToString() + ", " + + ToString(value.size()) + ")"; + return Status::OK(); + } + Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override { + seen += "Merge(" + ToString(cf) + ", " + key.ToString() + ", " + + ToString(value.size()) + ")"; + return Status::OK(); + } + void LogData(const Slice& blob) override { + seen += "LogData(" + blob.ToString() + ")"; + } + Status DeleteCF(uint32_t cf, const Slice& key) override { + seen += "Delete(" + ToString(cf) + ", " + key.ToString() + ")"; + return Status::OK(); + } + } handler; + res.writeBatchPtr->Iterate(&handler); + ASSERT_EQ( + "Put(1, key1, 1024)" + "Put(0, key2, 1024)" + "LogData(blob1)" + "Put(1, key3, 1024)" + "LogData(blob2)" + "Delete(0, key2)", + handler.seen); +} +} // namespace rocksdb + +#endif // !defined(ROCKSDB_LITE) + +int main(int argc, char** argv) { +#if !defined(ROCKSDB_LITE) + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +#else + (void) argc; + (void) argv; + return 0; +#endif +} |