diff options
Diffstat (limited to 'src/rocksdb/utilities/transactions/write_unprepared_transaction_test.cc')
-rw-r--r-- | src/rocksdb/utilities/transactions/write_unprepared_transaction_test.cc | 450 |
1 files changed, 450 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/transactions/write_unprepared_transaction_test.cc b/src/rocksdb/utilities/transactions/write_unprepared_transaction_test.cc new file mode 100644 index 00000000..9aee33b0 --- /dev/null +++ b/src/rocksdb/utilities/transactions/write_unprepared_transaction_test.cc @@ -0,0 +1,450 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include "utilities/transactions/transaction_test.h" +#include "utilities/transactions/write_unprepared_txn.h" +#include "utilities/transactions/write_unprepared_txn_db.h" + +namespace rocksdb { + +class WriteUnpreparedTransactionTestBase : public TransactionTestBase { + public: + WriteUnpreparedTransactionTestBase(bool use_stackable_db, + bool two_write_queue, + TxnDBWritePolicy write_policy) + : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){} +}; + +class WriteUnpreparedTransactionTest + : public WriteUnpreparedTransactionTestBase, + virtual public ::testing::WithParamInterface< + std::tuple<bool, bool, TxnDBWritePolicy>> { + public: + WriteUnpreparedTransactionTest() + : WriteUnpreparedTransactionTestBase(std::get<0>(GetParam()), + std::get<1>(GetParam()), + std::get<2>(GetParam())){} +}; + +INSTANTIATE_TEST_CASE_P( + WriteUnpreparedTransactionTest, WriteUnpreparedTransactionTest, + ::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED), + std::make_tuple(false, true, WRITE_UNPREPARED))); + +TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) { + auto verify_state = [](Iterator* iter, const std::string& key, + const std::string& value) { + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(key, iter->key().ToString()); + ASSERT_EQ(value, iter->value().ToString()); + }; + + options.disable_auto_compactions = true; + ReOpen(); + + // The following tests checks whether reading your own write for + // a transaction works for write unprepared, when there are uncommitted + // values written into DB. + // + // Although the values written by DB::Put are technically committed, we add + // their seq num to unprep_seqs_ to pretend that they were written into DB + // as part of an unprepared batch, and then check if they are visible to the + // transaction. + auto snapshot0 = db->GetSnapshot(); + ASSERT_OK(db->Put(WriteOptions(), "a", "v1")); + ASSERT_OK(db->Put(WriteOptions(), "b", "v2")); + auto snapshot2 = db->GetSnapshot(); + ASSERT_OK(db->Put(WriteOptions(), "a", "v3")); + ASSERT_OK(db->Put(WriteOptions(), "b", "v4")); + auto snapshot4 = db->GetSnapshot(); + ASSERT_OK(db->Put(WriteOptions(), "a", "v5")); + ASSERT_OK(db->Put(WriteOptions(), "b", "v6")); + auto snapshot6 = db->GetSnapshot(); + ASSERT_OK(db->Put(WriteOptions(), "a", "v7")); + ASSERT_OK(db->Put(WriteOptions(), "b", "v8")); + auto snapshot8 = db->GetSnapshot(); + + TransactionOptions txn_options; + WriteOptions write_options; + Transaction* txn = db->BeginTransaction(write_options, txn_options); + WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn); + + ReadOptions roptions; + roptions.snapshot = snapshot0; + + wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = + snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); + auto iter = txn->GetIterator(roptions); + + // Test Get(). + std::string value; + + ASSERT_OK(txn->Get(roptions, Slice("a"), &value)); + ASSERT_EQ(value, "v3"); + + ASSERT_OK(txn->Get(roptions, Slice("b"), &value)); + ASSERT_EQ(value, "v4"); + + wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = + snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); + delete iter; + iter = txn->GetIterator(roptions); + + ASSERT_OK(txn->Get(roptions, Slice("a"), &value)); + ASSERT_EQ(value, "v7"); + + ASSERT_OK(txn->Get(roptions, Slice("b"), &value)); + ASSERT_EQ(value, "v8"); + + wup_txn->unprep_seqs_.clear(); + + // Test Next(). + wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = + snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); + delete iter; + iter = txn->GetIterator(roptions); + + iter->Seek("a"); + verify_state(iter, "a", "v3"); + + iter->Next(); + verify_state(iter, "b", "v4"); + + iter->SeekToFirst(); + verify_state(iter, "a", "v3"); + + iter->Next(); + verify_state(iter, "b", "v4"); + + wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = + snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); + delete iter; + iter = txn->GetIterator(roptions); + + iter->Seek("a"); + verify_state(iter, "a", "v7"); + + iter->Next(); + verify_state(iter, "b", "v8"); + + iter->SeekToFirst(); + verify_state(iter, "a", "v7"); + + iter->Next(); + verify_state(iter, "b", "v8"); + + wup_txn->unprep_seqs_.clear(); + + // Test Prev(). For Prev(), we need to adjust the snapshot to match what is + // possible in WriteUnpreparedTxn. + // + // Because of row locks and ValidateSnapshot, there cannot be any committed + // entries after snapshot, but before the first prepared key. + roptions.snapshot = snapshot2; + wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = + snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); + delete iter; + iter = txn->GetIterator(roptions); + + iter->SeekForPrev("b"); + verify_state(iter, "b", "v4"); + + iter->Prev(); + verify_state(iter, "a", "v3"); + + iter->SeekToLast(); + verify_state(iter, "b", "v4"); + + iter->Prev(); + verify_state(iter, "a", "v3"); + + roptions.snapshot = snapshot6; + wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = + snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); + delete iter; + iter = txn->GetIterator(roptions); + + iter->SeekForPrev("b"); + verify_state(iter, "b", "v8"); + + iter->Prev(); + verify_state(iter, "a", "v7"); + + iter->SeekToLast(); + verify_state(iter, "b", "v8"); + + iter->Prev(); + verify_state(iter, "a", "v7"); + + // Since the unprep_seqs_ data were faked for testing, we do not want the + // destructor for the transaction to be rolling back data that did not + // exist. + wup_txn->unprep_seqs_.clear(); + + db->ReleaseSnapshot(snapshot0); + db->ReleaseSnapshot(snapshot2); + db->ReleaseSnapshot(snapshot4); + db->ReleaseSnapshot(snapshot6); + db->ReleaseSnapshot(snapshot8); + delete iter; + delete txn; +} + +// This tests how write unprepared behaves during recovery when the DB crashes +// after a transaction has either been unprepared or prepared, and tests if +// the changes are correctly applied for prepared transactions if we decide to +// rollback/commit. +TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) { + WriteOptions write_options; + write_options.disableWAL = false; + TransactionOptions txn_options; + std::vector<Transaction*> prepared_trans; + WriteUnpreparedTxnDB* wup_db; + options.disable_auto_compactions = true; + + enum Action { UNPREPARED, ROLLBACK, COMMIT }; + + // batch_size of 1 causes writes to DB for every marker. + for (size_t batch_size : {1, 1000000}) { + txn_options.max_write_batch_size = batch_size; + for (bool empty : {true, false}) { + for (Action a : {UNPREPARED, ROLLBACK, COMMIT}) { + for (int num_batches = 1; num_batches < 10; num_batches++) { + // Reset database. + prepared_trans.clear(); + ReOpen(); + wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db); + if (!empty) { + for (int i = 0; i < num_batches; i++) { + ASSERT_OK(db->Put(WriteOptions(), "k" + ToString(i), + "before value" + ToString(i))); + } + } + + // Write num_batches unprepared batches. + Transaction* txn = db->BeginTransaction(write_options, txn_options); + WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn); + txn->SetName("xid"); + for (int i = 0; i < num_batches; i++) { + ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i))); + if (txn_options.max_write_batch_size == 1) { + ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1); + } else { + ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0); + } + } + if (a == UNPREPARED) { + // This is done to prevent the destructor from rolling back the + // transaction for us, since we want to pretend we crashed and + // test that recovery does the rollback. + wup_txn->unprep_seqs_.clear(); + } else { + txn->Prepare(); + } + delete txn; + + // Crash and run recovery code paths. + wup_db->db_impl_->FlushWAL(true); + wup_db->TEST_Crash(); + ReOpenNoDelete(); + assert(db != nullptr); + + db->GetAllPreparedTransactions(&prepared_trans); + ASSERT_EQ(prepared_trans.size(), a == UNPREPARED ? 0 : 1); + if (a == ROLLBACK) { + ASSERT_OK(prepared_trans[0]->Rollback()); + delete prepared_trans[0]; + } else if (a == COMMIT) { + ASSERT_OK(prepared_trans[0]->Commit()); + delete prepared_trans[0]; + } + + Iterator* iter = db->NewIterator(ReadOptions()); + iter->SeekToFirst(); + // Check that DB has before values. + if (!empty || a == COMMIT) { + for (int i = 0; i < num_batches; i++) { + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "k" + ToString(i)); + if (a == COMMIT) { + ASSERT_EQ(iter->value().ToString(), "value" + ToString(i)); + } else { + ASSERT_EQ(iter->value().ToString(), + "before value" + ToString(i)); + } + iter->Next(); + } + } + ASSERT_FALSE(iter->Valid()); + delete iter; + } + } + } + } +} + +// Basic test to see that unprepared batch gets written to DB when batch size +// is exceeded. It also does some basic checks to see if commit/rollback works +// as expected for write unprepared. +TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) { + WriteOptions write_options; + TransactionOptions txn_options; + const int kNumKeys = 10; + + // batch_size of 1 causes writes to DB for every marker. + for (size_t batch_size : {1, 1000000}) { + txn_options.max_write_batch_size = batch_size; + for (bool prepare : {false, true}) { + for (bool commit : {false, true}) { + ReOpen(); + Transaction* txn = db->BeginTransaction(write_options, txn_options); + WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn); + txn->SetName("xid"); + + for (int i = 0; i < kNumKeys; i++) { + txn->Put("k" + ToString(i), "v" + ToString(i)); + if (txn_options.max_write_batch_size == 1) { + ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1); + } else { + ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0); + } + } + + if (prepare) { + ASSERT_OK(txn->Prepare()); + } + + Iterator* iter = db->NewIterator(ReadOptions()); + iter->SeekToFirst(); + assert(!iter->Valid()); + ASSERT_FALSE(iter->Valid()); + delete iter; + + if (commit) { + ASSERT_OK(txn->Commit()); + } else { + ASSERT_OK(txn->Rollback()); + } + delete txn; + + iter = db->NewIterator(ReadOptions()); + iter->SeekToFirst(); + + for (int i = 0; i < (commit ? kNumKeys : 0); i++) { + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "k" + ToString(i)); + ASSERT_EQ(iter->value().ToString(), "v" + ToString(i)); + iter->Next(); + } + ASSERT_FALSE(iter->Valid()); + delete iter; + } + } + } +} + +// Test whether logs containing unprepared/prepared batches are kept even +// after memtable finishes flushing, and whether they are removed when +// transaction commits/aborts. +// +// TODO(lth): Merge with TransactionTest/TwoPhaseLogRollingTest tests. +TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) { + WriteOptions write_options; + TransactionOptions txn_options; + // batch_size of 1 causes writes to DB for every marker. + txn_options.max_write_batch_size = 1; + const int kNumKeys = 10; + + WriteOptions wopts; + wopts.sync = true; + + for (bool prepare : {false, true}) { + for (bool commit : {false, true}) { + ReOpen(); + auto wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db); + auto db_impl = wup_db->db_impl_; + + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn1->SetName("xid1")); + + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn2->SetName("xid2")); + + // Spread this transaction across multiple log files. + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(txn1->Put("k1" + ToString(i), "v" + ToString(i))); + if (i >= kNumKeys / 2) { + ASSERT_OK(txn2->Put("k2" + ToString(i), "v" + ToString(i))); + } + + if (i > 0) { + db_impl->TEST_SwitchWAL(); + } + } + + ASSERT_GT(txn1->GetLogNumber(), 0); + ASSERT_GT(txn2->GetLogNumber(), 0); + + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), + txn1->GetLogNumber()); + ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber()); + + if (prepare) { + ASSERT_OK(txn1->Prepare()); + ASSERT_OK(txn2->Prepare()); + } + + ASSERT_GE(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber()); + ASSERT_GE(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber()); + + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), + txn1->GetLogNumber()); + if (commit) { + ASSERT_OK(txn1->Commit()); + } else { + ASSERT_OK(txn1->Rollback()); + } + + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), + txn2->GetLogNumber()); + + if (commit) { + ASSERT_OK(txn2->Commit()); + } else { + ASSERT_OK(txn2->Rollback()); + } + + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + + delete txn1; + delete txn2; + } + } +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include <stdio.h> + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, + "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // ROCKSDB_LITE |