summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/db_log_iter_test.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/db/db_log_iter_test.cc')
-rw-r--r--src/rocksdb/db/db_log_iter_test.cc294
1 files changed, 294 insertions, 0 deletions
diff --git a/src/rocksdb/db/db_log_iter_test.cc b/src/rocksdb/db/db_log_iter_test.cc
new file mode 100644
index 00000000..45642bc7
--- /dev/null
+++ b/src/rocksdb/db/db_log_iter_test.cc
@@ -0,0 +1,294 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+// Introduction of SyncPoint effectively disabled building and running this test
+// in Release build.
+// which is a pity, it is a good test
+#if !defined(ROCKSDB_LITE)
+
+#include "db/db_test_util.h"
+#include "port/stack_trace.h"
+
+namespace rocksdb {
+
+class DBTestXactLogIterator : public DBTestBase {
+ public:
+ DBTestXactLogIterator() : DBTestBase("/db_log_iter_test") {}
+
+ std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
+ const SequenceNumber seq) {
+ std::unique_ptr<TransactionLogIterator> iter;
+ Status status = dbfull()->GetUpdatesSince(seq, &iter);
+ EXPECT_OK(status);
+ EXPECT_TRUE(iter->Valid());
+ return iter;
+ }
+};
+
+namespace {
+SequenceNumber ReadRecords(
+ std::unique_ptr<TransactionLogIterator>& iter,
+ int& count) {
+ count = 0;
+ SequenceNumber lastSequence = 0;
+ BatchResult res;
+ while (iter->Valid()) {
+ res = iter->GetBatch();
+ EXPECT_TRUE(res.sequence > lastSequence);
+ ++count;
+ lastSequence = res.sequence;
+ EXPECT_OK(iter->status());
+ iter->Next();
+ }
+ return res.sequence;
+}
+
+void ExpectRecords(
+ const int expected_no_records,
+ std::unique_ptr<TransactionLogIterator>& iter) {
+ int num_records;
+ ReadRecords(iter, num_records);
+ ASSERT_EQ(num_records, expected_no_records);
+}
+} // namespace
+
+TEST_F(DBTestXactLogIterator, TransactionLogIterator) {
+ do {
+ Options options = OptionsForLogIterTest();
+ DestroyAndReopen(options);
+ CreateAndReopenWithCF({"pikachu"}, options);
+ Put(0, "key1", DummyString(1024));
+ Put(1, "key2", DummyString(1024));
+ Put(1, "key2", DummyString(1024));
+ ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U);
+ {
+ auto iter = OpenTransactionLogIter(0);
+ ExpectRecords(3, iter);
+ }
+ ReopenWithColumnFamilies({"default", "pikachu"}, options);
+ env_->SleepForMicroseconds(2 * 1000 * 1000);
+ {
+ Put(0, "key4", DummyString(1024));
+ Put(1, "key5", DummyString(1024));
+ Put(0, "key6", DummyString(1024));
+ }
+ {
+ auto iter = OpenTransactionLogIter(0);
+ ExpectRecords(6, iter);
+ }
+ } while (ChangeCompactOptions());
+}
+
+#ifndef NDEBUG // sync point is not included with DNDEBUG build
+TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) {
+ static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;
+ static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = {
+ {"WalManager::GetSortedWalFiles:1", "WalManager::PurgeObsoleteFiles:1",
+ "WalManager::PurgeObsoleteFiles:2", "WalManager::GetSortedWalFiles:2"},
+ {"WalManager::GetSortedWalsOfType:1",
+ "WalManager::PurgeObsoleteFiles:1",
+ "WalManager::PurgeObsoleteFiles:2",
+ "WalManager::GetSortedWalsOfType:2"}};
+ for (int test = 0; test < LOG_ITERATOR_RACE_TEST_COUNT; ++test) {
+ // Setup sync point dependency to reproduce the race condition of
+ // a log file moved to archived dir, in the middle of GetSortedWalFiles
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ { { sync_points[test][0], sync_points[test][1] },
+ { sync_points[test][2], sync_points[test][3] },
+ });
+
+ do {
+ rocksdb::SyncPoint::GetInstance()->ClearTrace();
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ Options options = OptionsForLogIterTest();
+ DestroyAndReopen(options);
+ Put("key1", DummyString(1024));
+ dbfull()->Flush(FlushOptions());
+ Put("key2", DummyString(1024));
+ dbfull()->Flush(FlushOptions());
+ Put("key3", DummyString(1024));
+ dbfull()->Flush(FlushOptions());
+ Put("key4", DummyString(1024));
+ ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U);
+ dbfull()->FlushWAL(false);
+
+ {
+ auto iter = OpenTransactionLogIter(0);
+ ExpectRecords(4, iter);
+ }
+
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+ // trigger async flush, and log move. Well, log move will
+ // wait until the GetSortedWalFiles:1 to reproduce the race
+ // condition
+ FlushOptions flush_options;
+ flush_options.wait = false;
+ dbfull()->Flush(flush_options);
+
+ // "key5" would be written in a new memtable and log
+ Put("key5", DummyString(1024));
+ dbfull()->FlushWAL(false);
+ {
+ // this iter would miss "key4" if not fixed
+ auto iter = OpenTransactionLogIter(0);
+ ExpectRecords(5, iter);
+ }
+ } while (ChangeCompactOptions());
+ }
+}
+#endif
+
+TEST_F(DBTestXactLogIterator, TransactionLogIteratorStallAtLastRecord) {
+ do {
+ Options options = OptionsForLogIterTest();
+ DestroyAndReopen(options);
+ Put("key1", DummyString(1024));
+ auto iter = OpenTransactionLogIter(0);
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ iter->Next();
+ ASSERT_TRUE(!iter->Valid());
+ ASSERT_OK(iter->status());
+ Put("key2", DummyString(1024));
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ } while (ChangeCompactOptions());
+}
+
+TEST_F(DBTestXactLogIterator, TransactionLogIteratorCheckAfterRestart) {
+ do {
+ Options options = OptionsForLogIterTest();
+ DestroyAndReopen(options);
+ Put("key1", DummyString(1024));
+ Put("key2", DummyString(1023));
+ dbfull()->Flush(FlushOptions());
+ Reopen(options);
+ auto iter = OpenTransactionLogIter(0);
+ ExpectRecords(2, iter);
+ } while (ChangeCompactOptions());
+}
+
+TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) {
+ do {
+ Options options = OptionsForLogIterTest();
+ DestroyAndReopen(options);
+ for (int i = 0; i < 1024; i++) {
+ Put("key"+ToString(i), DummyString(10));
+ }
+ dbfull()->Flush(FlushOptions());
+ dbfull()->FlushWAL(false);
+ // Corrupt this log to create a gap
+ rocksdb::VectorLogPtr wal_files;
+ ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));
+ const auto logfile_path = dbname_ + "/" + wal_files.front()->PathName();
+ if (mem_env_) {
+ mem_env_->Truncate(logfile_path, wal_files.front()->SizeFileBytes() / 2);
+ } else {
+ ASSERT_EQ(0, truncate(logfile_path.c_str(),
+ wal_files.front()->SizeFileBytes() / 2));
+ }
+
+ // Insert a new entry to a new log file
+ Put("key1025", DummyString(10));
+ dbfull()->FlushWAL(false);
+ // Try to read from the beginning. Should stop before the gap and read less
+ // than 1025 entries
+ auto iter = OpenTransactionLogIter(0);
+ int count;
+ SequenceNumber last_sequence_read = ReadRecords(iter, count);
+ ASSERT_LT(last_sequence_read, 1025U);
+ // Try to read past the gap, should be able to seek to key1025
+ auto iter2 = OpenTransactionLogIter(last_sequence_read + 1);
+ ExpectRecords(1, iter2);
+ } while (ChangeCompactOptions());
+}
+
+TEST_F(DBTestXactLogIterator, TransactionLogIteratorBatchOperations) {
+ do {
+ Options options = OptionsForLogIterTest();
+ DestroyAndReopen(options);
+ CreateAndReopenWithCF({"pikachu"}, options);
+ WriteBatch batch;
+ batch.Put(handles_[1], "key1", DummyString(1024));
+ batch.Put(handles_[0], "key2", DummyString(1024));
+ batch.Put(handles_[1], "key3", DummyString(1024));
+ batch.Delete(handles_[0], "key2");
+ dbfull()->Write(WriteOptions(), &batch);
+ Flush(1);
+ Flush(0);
+ ReopenWithColumnFamilies({"default", "pikachu"}, options);
+ Put(1, "key4", DummyString(1024));
+ auto iter = OpenTransactionLogIter(3);
+ ExpectRecords(2, iter);
+ } while (ChangeCompactOptions());
+}
+
+TEST_F(DBTestXactLogIterator, TransactionLogIteratorBlobs) {
+ Options options = OptionsForLogIterTest();
+ DestroyAndReopen(options);
+ CreateAndReopenWithCF({"pikachu"}, options);
+ {
+ WriteBatch batch;
+ batch.Put(handles_[1], "key1", DummyString(1024));
+ batch.Put(handles_[0], "key2", DummyString(1024));
+ batch.PutLogData(Slice("blob1"));
+ batch.Put(handles_[1], "key3", DummyString(1024));
+ batch.PutLogData(Slice("blob2"));
+ batch.Delete(handles_[0], "key2");
+ dbfull()->Write(WriteOptions(), &batch);
+ ReopenWithColumnFamilies({"default", "pikachu"}, options);
+ }
+
+ auto res = OpenTransactionLogIter(0)->GetBatch();
+ struct Handler : public WriteBatch::Handler {
+ std::string seen;
+ Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
+ seen += "Put(" + ToString(cf) + ", " + key.ToString() + ", " +
+ ToString(value.size()) + ")";
+ return Status::OK();
+ }
+ Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
+ seen += "Merge(" + ToString(cf) + ", " + key.ToString() + ", " +
+ ToString(value.size()) + ")";
+ return Status::OK();
+ }
+ void LogData(const Slice& blob) override {
+ seen += "LogData(" + blob.ToString() + ")";
+ }
+ Status DeleteCF(uint32_t cf, const Slice& key) override {
+ seen += "Delete(" + ToString(cf) + ", " + key.ToString() + ")";
+ return Status::OK();
+ }
+ } handler;
+ res.writeBatchPtr->Iterate(&handler);
+ ASSERT_EQ(
+ "Put(1, key1, 1024)"
+ "Put(0, key2, 1024)"
+ "LogData(blob1)"
+ "Put(1, key3, 1024)"
+ "LogData(blob2)"
+ "Delete(0, key2)",
+ handler.seen);
+}
+} // namespace rocksdb
+
+#endif // !defined(ROCKSDB_LITE)
+
+int main(int argc, char** argv) {
+#if !defined(ROCKSDB_LITE)
+ rocksdb::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+#else
+ (void) argc;
+ (void) argv;
+ return 0;
+#endif
+}