diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/rocksdb/utilities/transactions/transaction_test.cc | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/utilities/transactions/transaction_test.cc | 5797 |
1 files changed, 5797 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/transactions/transaction_test.cc b/src/rocksdb/utilities/transactions/transaction_test.cc new file mode 100644 index 00000000..732d4c81 --- /dev/null +++ b/src/rocksdb/utilities/transactions/transaction_test.cc @@ -0,0 +1,5797 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include "utilities/transactions/transaction_test.h" + +#include <algorithm> +#include <functional> +#include <string> +#include <thread> + +#include "db/db_impl.h" +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/perf_context.h" +#include "rocksdb/utilities/transaction.h" +#include "rocksdb/utilities/transaction_db.h" +#include "table/mock_table.h" +#include "util/fault_injection_test_env.h" +#include "util/random.h" +#include "util/string_util.h" +#include "util/sync_point.h" +#include "util/testharness.h" +#include "util/testutil.h" +#include "util/transaction_test_util.h" +#include "utilities/merge_operators.h" +#include "utilities/merge_operators/string_append/stringappend.h" +#include "utilities/transactions/pessimistic_transaction_db.h" + +#include "port/port.h" + +using std::string; + +namespace rocksdb { + +INSTANTIATE_TEST_CASE_P( + DBAsBaseDB, TransactionTest, + ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED), + std::make_tuple(false, true, WRITE_COMMITTED), + std::make_tuple(false, false, WRITE_PREPARED), + std::make_tuple(false, true, WRITE_PREPARED), + std::make_tuple(false, false, WRITE_UNPREPARED), + std::make_tuple(false, true, WRITE_UNPREPARED))); +INSTANTIATE_TEST_CASE_P( + DBAsBaseDB, TransactionStressTest, + ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED), + std::make_tuple(false, true, WRITE_COMMITTED), + std::make_tuple(false, false, WRITE_PREPARED), + std::make_tuple(false, true, WRITE_PREPARED), + std::make_tuple(false, false, WRITE_UNPREPARED), + std::make_tuple(false, true, WRITE_UNPREPARED))); +INSTANTIATE_TEST_CASE_P( + StackableDBAsBaseDB, TransactionTest, + ::testing::Values(std::make_tuple(true, true, WRITE_COMMITTED), + std::make_tuple(true, true, WRITE_PREPARED), + std::make_tuple(true, true, WRITE_UNPREPARED))); + +// MySQLStyleTransactionTest takes far too long for valgrind to run. +#ifndef ROCKSDB_VALGRIND_RUN +INSTANTIATE_TEST_CASE_P( + MySQLStyleTransactionTest, MySQLStyleTransactionTest, + ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED, false), + std::make_tuple(false, true, WRITE_COMMITTED, false), + std::make_tuple(false, false, WRITE_PREPARED, false), + std::make_tuple(false, false, WRITE_PREPARED, true), + std::make_tuple(false, true, WRITE_PREPARED, false), + std::make_tuple(false, true, WRITE_PREPARED, true), + std::make_tuple(false, false, WRITE_UNPREPARED, false), + std::make_tuple(false, false, WRITE_UNPREPARED, true), + std::make_tuple(false, true, WRITE_UNPREPARED, false), + std::make_tuple(false, true, WRITE_UNPREPARED, true))); +#endif // ROCKSDB_VALGRIND_RUN + +TEST_P(TransactionTest, DoubleEmptyWrite) { + WriteOptions write_options; + write_options.sync = true; + write_options.disableWAL = false; + + WriteBatch batch; + + ASSERT_OK(db->Write(write_options, &batch)); + ASSERT_OK(db->Write(write_options, &batch)); + + // Also test committing empty transactions in 2PC + TransactionOptions txn_options; + Transaction* txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Prepare()); + ASSERT_OK(txn0->Commit()); + delete txn0; + + // Also test that it works during recovery + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid2")); + txn0->Put(Slice("foo0"), Slice("bar0a")); + ASSERT_OK(txn0->Prepare()); + delete txn0; + reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete()); + assert(db != nullptr); + txn0 = db->GetTransactionByName("xid2"); + ASSERT_OK(txn0->Commit()); + delete txn0; +} + +TEST_P(TransactionTest, SuccessTest) { + ASSERT_OK(db->ResetStats()); + + WriteOptions write_options; + ReadOptions read_options; + std::string value; + + ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar"))); + ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar"))); + + Transaction* txn = db->BeginTransaction(write_options, TransactionOptions()); + ASSERT_TRUE(txn); + + ASSERT_EQ(0, txn->GetNumPuts()); + ASSERT_LE(0, txn->GetID()); + + ASSERT_OK(txn->GetForUpdate(read_options, "foo", &value)); + ASSERT_EQ(value, "bar"); + + ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2"))); + + ASSERT_EQ(1, txn->GetNumPuts()); + + ASSERT_OK(txn->GetForUpdate(read_options, "foo", &value)); + ASSERT_EQ(value, "bar2"); + + ASSERT_OK(txn->Commit()); + + ASSERT_OK(db->Get(read_options, "foo", &value)); + ASSERT_EQ(value, "bar2"); + + delete txn; +} + +// The test clarifies the contract of do_validate and assume_tracked +// in GetForUpdate and Put/Merge/Delete +TEST_P(TransactionTest, AssumeExclusiveTracked) { + WriteOptions write_options; + ReadOptions read_options; + std::string value; + Status s; + TransactionOptions txn_options; + txn_options.lock_timeout = 1; + const bool EXCLUSIVE = true; + const bool DO_VALIDATE = true; + const bool ASSUME_LOCKED = true; + + Transaction* txn = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn); + txn->SetSnapshot(); + + // commit a value after the snapshot is taken + ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar"))); + + // By default write should fail to the commit after our snapshot + s = txn->GetForUpdate(read_options, "foo", &value, EXCLUSIVE); + ASSERT_TRUE(s.IsBusy()); + // But the user could direct the db to skip validating the snapshot. The read + // value then should be the most recently committed + ASSERT_OK( + txn->GetForUpdate(read_options, "foo", &value, EXCLUSIVE, !DO_VALIDATE)); + ASSERT_EQ(value, "bar"); + + // Although ValidateSnapshot is skipped the key must have still got locked + s = db->Put(write_options, Slice("foo"), Slice("bar")); + ASSERT_TRUE(s.IsTimedOut()); + + // By default the write operations should fail due to the commit after the + // snapshot + s = txn->Put(Slice("foo"), Slice("bar1")); + ASSERT_TRUE(s.IsBusy()); + s = txn->Put(db->DefaultColumnFamily(), Slice("foo"), Slice("bar1"), + !ASSUME_LOCKED); + ASSERT_TRUE(s.IsBusy()); + // But the user could direct the db that it already assumes exclusive lock on + // the key due to the previous GetForUpdate call. + ASSERT_OK(txn->Put(db->DefaultColumnFamily(), Slice("foo"), Slice("bar1"), + ASSUME_LOCKED)); + ASSERT_OK(txn->Merge(db->DefaultColumnFamily(), Slice("foo"), Slice("bar2"), + ASSUME_LOCKED)); + ASSERT_OK( + txn->Delete(db->DefaultColumnFamily(), Slice("foo"), ASSUME_LOCKED)); + ASSERT_OK(txn->SingleDelete(db->DefaultColumnFamily(), Slice("foo"), + ASSUME_LOCKED)); + + txn->Rollback(); + delete txn; +} + +// This test clarifies the contract of ValidateSnapshot +TEST_P(TransactionTest, ValidateSnapshotTest) { + for (bool with_flush : {true}) { + for (bool with_2pc : {true}) { + ASSERT_OK(ReOpen()); + WriteOptions write_options; + ReadOptions read_options; + std::string value; + + assert(db != nullptr); + Transaction* txn1 = + db->BeginTransaction(write_options, TransactionOptions()); + ASSERT_TRUE(txn1); + ASSERT_OK(txn1->Put(Slice("foo"), Slice("bar1"))); + if (with_2pc) { + ASSERT_OK(txn1->SetName("xid1")); + ASSERT_OK(txn1->Prepare()); + } + + if (with_flush) { + auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + db_impl->TEST_FlushMemTable(true); + // Make sure the flushed memtable is not kept in memory + int max_memtable_in_history = + std::max(options.max_write_buffer_number, + options.max_write_buffer_number_to_maintain) + + 1; + for (int i = 0; i < max_memtable_in_history; i++) { + db->Put(write_options, Slice("key"), Slice("value")); + db_impl->TEST_FlushMemTable(true); + } + } + + Transaction* txn2 = + db->BeginTransaction(write_options, TransactionOptions()); + ASSERT_TRUE(txn2); + txn2->SetSnapshot(); + + ASSERT_OK(txn1->Commit()); + delete txn1; + + auto pes_txn2 = dynamic_cast<PessimisticTransaction*>(txn2); + // Test the simple case where the key is not tracked yet + auto trakced_seq = kMaxSequenceNumber; + auto s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo", + &trakced_seq); + ASSERT_TRUE(s.IsBusy()); + delete txn2; + } + } +} + +TEST_P(TransactionTest, WaitingTxn) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + string value; + Status s; + + txn_options.lock_timeout = 1; + s = db->Put(write_options, Slice("foo"), Slice("bar")); + ASSERT_OK(s); + + /* create second cf */ + ColumnFamilyHandle* cfa; + ColumnFamilyOptions cf_options; + s = db->CreateColumnFamily(cf_options, "CFA", &cfa); + ASSERT_OK(s); + s = db->Put(write_options, cfa, Slice("foo"), Slice("bar")); + ASSERT_OK(s); + + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + TransactionID id1 = txn1->GetID(); + ASSERT_TRUE(txn1); + ASSERT_TRUE(txn2); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", [&](void* /*arg*/) { + std::string key; + uint32_t cf_id; + std::vector<TransactionID> wait = txn2->GetWaitingTxns(&cf_id, &key); + ASSERT_EQ(key, "foo"); + ASSERT_EQ(wait.size(), 1); + ASSERT_EQ(wait[0], id1); + ASSERT_EQ(cf_id, 0); + }); + + get_perf_context()->Reset(); + // lock key in default cf + s = txn1->GetForUpdate(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + ASSERT_EQ(get_perf_context()->key_lock_wait_count, 0); + + // lock key in cfa + s = txn1->GetForUpdate(read_options, cfa, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + ASSERT_EQ(get_perf_context()->key_lock_wait_count, 0); + + auto lock_data = db->GetLockStatusData(); + // Locked keys exist in both column family. + ASSERT_EQ(lock_data.size(), 2); + + auto cf_iterator = lock_data.begin(); + + // The iterator points to an unordered_multimap + // thus the test can not assume any particular order. + + // Column family is 1 or 0 (cfa). + if (cf_iterator->first != 1 && cf_iterator->first != 0) { + FAIL(); + } + // The locked key is "foo" and is locked by txn1 + ASSERT_EQ(cf_iterator->second.key, "foo"); + ASSERT_EQ(cf_iterator->second.ids.size(), 1); + ASSERT_EQ(cf_iterator->second.ids[0], txn1->GetID()); + + cf_iterator++; + + // Column family is 0 (default) or 1. + if (cf_iterator->first != 1 && cf_iterator->first != 0) { + FAIL(); + } + // The locked key is "foo" and is locked by txn1 + ASSERT_EQ(cf_iterator->second.key, "foo"); + ASSERT_EQ(cf_iterator->second.ids.size(), 1); + ASSERT_EQ(cf_iterator->second.ids[0], txn1->GetID()); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + s = txn2->GetForUpdate(read_options, "foo", &value); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + ASSERT_EQ(get_perf_context()->key_lock_wait_count, 1); + ASSERT_GE(get_perf_context()->key_lock_wait_time, 0); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + + delete cfa; + delete txn1; + delete txn2; +} + +TEST_P(TransactionTest, SharedLocks) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + Status s; + + txn_options.lock_timeout = 1; + s = db->Put(write_options, Slice("foo"), Slice("bar")); + ASSERT_OK(s); + + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + Transaction* txn3 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn1); + ASSERT_TRUE(txn2); + ASSERT_TRUE(txn3); + + // Test shared access between txns + s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn3->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + auto lock_data = db->GetLockStatusData(); + ASSERT_EQ(lock_data.size(), 1); + + auto cf_iterator = lock_data.begin(); + ASSERT_EQ(cf_iterator->second.key, "foo"); + + // We compare whether the set of txns locking this key is the same. To do + // this, we need to sort both vectors so that the comparison is done + // correctly. + std::vector<TransactionID> expected_txns = {txn1->GetID(), txn2->GetID(), + txn3->GetID()}; + std::vector<TransactionID> lock_txns = cf_iterator->second.ids; + ASSERT_EQ(expected_txns, lock_txns); + ASSERT_FALSE(cf_iterator->second.exclusive); + + txn1->Rollback(); + txn2->Rollback(); + txn3->Rollback(); + + // Test txn1 and txn2 sharing a lock and txn3 trying to obtain it. + s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn3->GetForUpdate(read_options, "foo", nullptr); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + txn1->UndoGetForUpdate("foo"); + s = txn3->GetForUpdate(read_options, "foo", nullptr); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + txn2->UndoGetForUpdate("foo"); + s = txn3->GetForUpdate(read_options, "foo", nullptr); + ASSERT_OK(s); + + txn1->Rollback(); + txn2->Rollback(); + txn3->Rollback(); + + // Test txn1 and txn2 sharing a lock and txn2 trying to upgrade lock. + s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + txn1->UndoGetForUpdate("foo"); + s = txn2->GetForUpdate(read_options, "foo", nullptr); + ASSERT_OK(s); + + ASSERT_OK(txn1->Rollback()); + ASSERT_OK(txn2->Rollback()); + + // Test txn1 trying to downgrade its lock. + s = txn1->GetForUpdate(read_options, "foo", nullptr, true /* exclusive */); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + // Should still fail after "downgrading". + s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + txn1->Rollback(); + txn2->Rollback(); + + // Test txn1 holding an exclusive lock and txn2 trying to obtain shared + // access. + s = txn1->GetForUpdate(read_options, "foo", nullptr); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + txn1->UndoGetForUpdate("foo"); + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + delete txn1; + delete txn2; + delete txn3; +} + +TEST_P(TransactionTest, DeadlockCycleShared) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + + txn_options.lock_timeout = 1000000; + txn_options.deadlock_detect = true; + + // Set up a wait for chain like this: + // + // Tn -> T(n*2) + // Tn -> T(n*2 + 1) + // + // So we have: + // T1 -> T2 -> T4 ... + // | |> T5 ... + // |> T3 -> T6 ... + // |> T7 ... + // up to T31, then T[16 - 31] -> T1. + // Note that Tn holds lock on floor(n / 2). + + std::vector<Transaction*> txns(31); + + for (uint32_t i = 0; i < 31; i++) { + txns[i] = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txns[i]); + auto s = txns[i]->GetForUpdate(read_options, ToString((i + 1) / 2), nullptr, + false /* exclusive */); + ASSERT_OK(s); + } + + std::atomic<uint32_t> checkpoints(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", + [&](void* /*arg*/) { checkpoints.fetch_add(1); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // We want the leaf transactions to block and hold everyone back. + std::vector<port::Thread> threads; + for (uint32_t i = 0; i < 15; i++) { + std::function<void()> blocking_thread = [&, i] { + auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr, + true /* exclusive */); + ASSERT_OK(s); + txns[i]->Rollback(); + delete txns[i]; + }; + threads.emplace_back(blocking_thread); + } + + // Wait until all threads are waiting on each other. + while (checkpoints.load() != 15) { + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + + // Complete the cycle T[16 - 31] -> T1 + for (uint32_t i = 15; i < 31; i++) { + auto s = + txns[i]->GetForUpdate(read_options, "0", nullptr, true /* exclusive */); + ASSERT_TRUE(s.IsDeadlock()); + + // Calculate next buffer len, plateau at 5 when 5 records are inserted. + const uint32_t curr_dlock_buffer_len_ = + (i - 14 > kInitialMaxDeadlocks) ? kInitialMaxDeadlocks : (i - 14); + + auto dlock_buffer = db->GetDeadlockInfoBuffer(); + ASSERT_EQ(dlock_buffer.size(), curr_dlock_buffer_len_); + auto dlock_entry = dlock_buffer[0].path; + ASSERT_EQ(dlock_entry.size(), kInitialMaxDeadlocks); + int64_t pre_deadlock_time = dlock_buffer[0].deadlock_time; + int64_t cur_deadlock_time = 0; + for (auto const& dl_path_rec : dlock_buffer) { + cur_deadlock_time = dl_path_rec.deadlock_time; + ASSERT_NE(cur_deadlock_time, 0); + ASSERT_TRUE(cur_deadlock_time <= pre_deadlock_time); + pre_deadlock_time = cur_deadlock_time; + } + + int64_t curr_waiting_key = 0; + + // Offset of each txn id from the root of the shared dlock tree's txn id. + int64_t offset_root = dlock_entry[0].m_txn_id - 1; + // Offset of the final entry in the dlock path from the root's txn id. + TransactionID leaf_id = + dlock_entry[dlock_entry.size() - 1].m_txn_id - offset_root; + + for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); it++) { + auto dl_node = *it; + ASSERT_EQ(dl_node.m_txn_id, offset_root + leaf_id); + ASSERT_EQ(dl_node.m_cf_id, 0); + ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key)); + ASSERT_EQ(dl_node.m_exclusive, true); + + if (curr_waiting_key == 0) { + curr_waiting_key = leaf_id; + } + curr_waiting_key /= 2; + leaf_id /= 2; + } + } + + // Rollback the leaf transaction. + for (uint32_t i = 15; i < 31; i++) { + txns[i]->Rollback(); + delete txns[i]; + } + + for (auto& t : threads) { + t.join(); + } + + // Downsize the buffer and verify the 3 latest deadlocks are preserved. + auto dlock_buffer_before_resize = db->GetDeadlockInfoBuffer(); + db->SetDeadlockInfoBufferSize(3); + auto dlock_buffer_after_resize = db->GetDeadlockInfoBuffer(); + ASSERT_EQ(dlock_buffer_after_resize.size(), 3); + + for (uint32_t i = 0; i < dlock_buffer_after_resize.size(); i++) { + for (uint32_t j = 0; j < dlock_buffer_after_resize[i].path.size(); j++) { + ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id, + dlock_buffer_before_resize[i].path[j].m_txn_id); + } + } + + // Upsize the buffer and verify the 3 latest dealocks are preserved. + dlock_buffer_before_resize = db->GetDeadlockInfoBuffer(); + db->SetDeadlockInfoBufferSize(5); + dlock_buffer_after_resize = db->GetDeadlockInfoBuffer(); + ASSERT_EQ(dlock_buffer_after_resize.size(), 3); + + for (uint32_t i = 0; i < dlock_buffer_before_resize.size(); i++) { + for (uint32_t j = 0; j < dlock_buffer_before_resize[i].path.size(); j++) { + ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id, + dlock_buffer_before_resize[i].path[j].m_txn_id); + } + } + + // Downsize to 0 and verify the size is consistent. + dlock_buffer_before_resize = db->GetDeadlockInfoBuffer(); + db->SetDeadlockInfoBufferSize(0); + dlock_buffer_after_resize = db->GetDeadlockInfoBuffer(); + ASSERT_EQ(dlock_buffer_after_resize.size(), 0); + + // Upsize from 0 to verify the size is persistent. + dlock_buffer_before_resize = db->GetDeadlockInfoBuffer(); + db->SetDeadlockInfoBufferSize(3); + dlock_buffer_after_resize = db->GetDeadlockInfoBuffer(); + ASSERT_EQ(dlock_buffer_after_resize.size(), 0); + + // Contrived case of shared lock of cycle size 2 to verify that a shared + // lock causing a deadlock is correctly reported as "shared" in the buffer. + std::vector<Transaction*> txns_shared(2); + + // Create a cycle of size 2. + for (uint32_t i = 0; i < 2; i++) { + txns_shared[i] = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txns_shared[i]); + auto s = txns_shared[i]->GetForUpdate(read_options, ToString(i), nullptr); + ASSERT_OK(s); + } + + std::atomic<uint32_t> checkpoints_shared(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", + [&](void* /*arg*/) { checkpoints_shared.fetch_add(1); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector<port::Thread> threads_shared; + for (uint32_t i = 0; i < 1; i++) { + std::function<void()> blocking_thread = [&, i] { + auto s = + txns_shared[i]->GetForUpdate(read_options, ToString(i + 1), nullptr); + ASSERT_OK(s); + txns_shared[i]->Rollback(); + delete txns_shared[i]; + }; + threads_shared.emplace_back(blocking_thread); + } + + // Wait until all threads are waiting on each other. + while (checkpoints_shared.load() != 1) { + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + + // Complete the cycle T2 -> T1 with a shared lock. + auto s = txns_shared[1]->GetForUpdate(read_options, "0", nullptr, false); + ASSERT_TRUE(s.IsDeadlock()); + + auto dlock_buffer = db->GetDeadlockInfoBuffer(); + + // Verify the size of the buffer and the single path. + ASSERT_EQ(dlock_buffer.size(), 1); + ASSERT_EQ(dlock_buffer[0].path.size(), 2); + + // Verify the exclusivity field of the transactions in the deadlock path. + ASSERT_TRUE(dlock_buffer[0].path[0].m_exclusive); + ASSERT_FALSE(dlock_buffer[0].path[1].m_exclusive); + txns_shared[1]->Rollback(); + delete txns_shared[1]; + + for (auto& t : threads_shared) { + t.join(); + } +} + +#ifndef ROCKSDB_VALGRIND_RUN +TEST_P(TransactionStressTest, DeadlockCycle) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + + // offset by 2 from the max depth to test edge case + const uint32_t kMaxCycleLength = 52; + + txn_options.lock_timeout = 1000000; + txn_options.deadlock_detect = true; + + for (uint32_t len = 2; len < kMaxCycleLength; len++) { + // Set up a long wait for chain like this: + // + // T1 -> T2 -> T3 -> ... -> Tlen + + std::vector<Transaction*> txns(len); + + for (uint32_t i = 0; i < len; i++) { + txns[i] = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txns[i]); + auto s = txns[i]->GetForUpdate(read_options, ToString(i), nullptr); + ASSERT_OK(s); + } + + std::atomic<uint32_t> checkpoints(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", + [&](void* /*arg*/) { checkpoints.fetch_add(1); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // We want the last transaction in the chain to block and hold everyone + // back. + std::vector<port::Thread> threads; + for (uint32_t i = 0; i < len - 1; i++) { + std::function<void()> blocking_thread = [&, i] { + auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr); + ASSERT_OK(s); + txns[i]->Rollback(); + delete txns[i]; + }; + threads.emplace_back(blocking_thread); + } + + // Wait until all threads are waiting on each other. + while (checkpoints.load() != len - 1) { + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + + // Complete the cycle Tlen -> T1 + auto s = txns[len - 1]->GetForUpdate(read_options, "0", nullptr); + ASSERT_TRUE(s.IsDeadlock()); + + const uint32_t dlock_buffer_size_ = (len - 1 > 5) ? 5 : (len - 1); + uint32_t curr_waiting_key = 0; + TransactionID curr_txn_id = txns[0]->GetID(); + + auto dlock_buffer = db->GetDeadlockInfoBuffer(); + ASSERT_EQ(dlock_buffer.size(), dlock_buffer_size_); + uint32_t check_len = len; + bool check_limit_flag = false; + + // Special case for a deadlock path that exceeds the maximum depth. + if (len > 50) { + check_len = 0; + check_limit_flag = true; + } + auto dlock_entry = dlock_buffer[0].path; + ASSERT_EQ(dlock_entry.size(), check_len); + ASSERT_EQ(dlock_buffer[0].limit_exceeded, check_limit_flag); + + int64_t pre_deadlock_time = dlock_buffer[0].deadlock_time; + int64_t cur_deadlock_time = 0; + for (auto const& dl_path_rec : dlock_buffer) { + cur_deadlock_time = dl_path_rec.deadlock_time; + ASSERT_NE(cur_deadlock_time, 0); + ASSERT_TRUE(cur_deadlock_time <= pre_deadlock_time); + pre_deadlock_time = cur_deadlock_time; + } + + // Iterates backwards over path verifying decreasing txn_ids. + for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); it++) { + auto dl_node = *it; + ASSERT_EQ(dl_node.m_txn_id, len + curr_txn_id - 1); + ASSERT_EQ(dl_node.m_cf_id, 0); + ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key)); + ASSERT_EQ(dl_node.m_exclusive, true); + + curr_txn_id--; + if (curr_waiting_key == 0) { + curr_waiting_key = len; + } + curr_waiting_key--; + } + + // Rollback the last transaction. + txns[len - 1]->Rollback(); + delete txns[len - 1]; + + for (auto& t : threads) { + t.join(); + } + } +} + +TEST_P(TransactionStressTest, DeadlockStress) { + const uint32_t NUM_TXN_THREADS = 10; + const uint32_t NUM_KEYS = 100; + const uint32_t NUM_ITERS = 10000; + + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + + txn_options.lock_timeout = 1000000; + txn_options.deadlock_detect = true; + std::vector<std::string> keys; + + for (uint32_t i = 0; i < NUM_KEYS; i++) { + db->Put(write_options, Slice(ToString(i)), Slice("")); + keys.push_back(ToString(i)); + } + + size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id()); + Random rnd(static_cast<uint32_t>(tid)); + std::function<void(uint32_t)> stress_thread = [&](uint32_t seed) { + std::default_random_engine g(seed); + + Transaction* txn; + for (uint32_t i = 0; i < NUM_ITERS; i++) { + txn = db->BeginTransaction(write_options, txn_options); + auto random_keys = keys; + std::shuffle(random_keys.begin(), random_keys.end(), g); + + // Lock keys in random order. + for (const auto& k : random_keys) { + // Lock mostly for shared access, but exclusive 1/4 of the time. + auto s = + txn->GetForUpdate(read_options, k, nullptr, txn->GetID() % 4 == 0); + if (!s.ok()) { + ASSERT_TRUE(s.IsDeadlock()); + txn->Rollback(); + break; + } + } + + delete txn; + } + }; + + std::vector<port::Thread> threads; + for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) { + threads.emplace_back(stress_thread, rnd.Next()); + } + + for (auto& t : threads) { + t.join(); + } +} +#endif // ROCKSDB_VALGRIND_RUN + +TEST_P(TransactionTest, CommitTimeBatchFailTest) { + WriteOptions write_options; + TransactionOptions txn_options; + + std::string value; + Status s; + + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn1); + + ASSERT_OK(txn1->GetCommitTimeWriteBatch()->Put("cat", "dog")); + + s = txn1->Put("foo", "bar"); + ASSERT_OK(s); + + // fails due to non-empty commit-time batch + s = txn1->Commit(); + ASSERT_EQ(s, Status::InvalidArgument()); + + delete txn1; +} + +TEST_P(TransactionTest, LogMarkLeakTest) { + TransactionOptions txn_options; + WriteOptions write_options; + options.write_buffer_size = 1024; + ASSERT_OK(ReOpenNoDelete()); + assert(db != nullptr); + Random rnd(47); + std::vector<Transaction*> txns; + DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + // At the beginning there should be no log containing prepare data + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + for (size_t i = 0; i < 100; i++) { + Transaction* txn = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn->SetName("xid" + ToString(i))); + ASSERT_OK(txn->Put(Slice("foo" + ToString(i)), Slice("bar"))); + ASSERT_OK(txn->Prepare()); + ASSERT_GT(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + if (rnd.OneIn(5)) { + txns.push_back(txn); + } else { + ASSERT_OK(txn->Commit()); + delete txn; + } + db_impl->TEST_FlushMemTable(true); + } + for (auto txn : txns) { + ASSERT_OK(txn->Commit()); + delete txn; + } + // At the end there should be no log left containing prepare data + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + // Make sure that the underlying data structures are properly truncated and + // cause not leak + ASSERT_EQ(db_impl->TEST_PreparedSectionCompletedSize(), 0); + ASSERT_EQ(db_impl->TEST_LogsWithPrepSize(), 0); +} + +TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { + for (bool cwb4recovery : {true, false}) { + ASSERT_OK(ReOpen()); + WriteOptions write_options; + ReadOptions read_options; + + TransactionOptions txn_options; + txn_options.use_only_the_last_commit_time_batch_for_recovery = cwb4recovery; + + string value; + Status s; + + DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid"); + ASSERT_OK(s); + + ASSERT_EQ(db->GetTransactionByName("xid"), txn); + + // transaction put + s = txn->Put(Slice("foo"), Slice("bar")); + ASSERT_OK(s); + ASSERT_EQ(1, txn->GetNumPuts()); + + // regular db put + s = db->Put(write_options, Slice("foo2"), Slice("bar2")); + ASSERT_OK(s); + ASSERT_EQ(1, txn->GetNumPuts()); + + // regular db read + db->Get(read_options, "foo2", &value); + ASSERT_EQ(value, "bar2"); + + // commit time put + txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs")); + txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats")); + + // nothing has been prepped yet + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + + s = txn->Prepare(); + ASSERT_OK(s); + + // data not im mem yet + s = db->Get(read_options, Slice("foo"), &value); + ASSERT_TRUE(s.IsNotFound()); + s = db->Get(read_options, Slice("gtid"), &value); + ASSERT_TRUE(s.IsNotFound()); + + // find trans in list of prepared transactions + std::vector<Transaction*> prepared_trans; + db->GetAllPreparedTransactions(&prepared_trans); + ASSERT_EQ(prepared_trans.size(), 1); + ASSERT_EQ(prepared_trans.front()->GetName(), "xid"); + + auto log_containing_prep = + db_impl->TEST_FindMinLogContainingOutstandingPrep(); + ASSERT_GT(log_containing_prep, 0); + + // make commit + s = txn->Commit(); + ASSERT_OK(s); + + // value is now available + s = db->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + + if (!cwb4recovery) { + s = db->Get(read_options, "gtid", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "dogs"); + + s = db->Get(read_options, "gtid2", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "cats"); + } + + // we already committed + s = txn->Commit(); + ASSERT_EQ(s, Status::InvalidArgument()); + + // no longer is prepared results + db->GetAllPreparedTransactions(&prepared_trans); + ASSERT_EQ(prepared_trans.size(), 0); + ASSERT_EQ(db->GetTransactionByName("xid"), nullptr); + + // heap should not care about prepared section anymore + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + // but now our memtable should be referencing the prep section + ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep()); + ASSERT_EQ(log_containing_prep, + db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // In these modes memtable do not ref the prep sections + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + default: + assert(false); + } + + db_impl->TEST_FlushMemTable(true); + // After flush the recoverable state must be visible + if (cwb4recovery) { + s = db->Get(read_options, "gtid", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "dogs"); + + s = db->Get(read_options, "gtid2", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "cats"); + } + + // after memtable flush we can now relese the log + ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep); + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + + delete txn; + + if (cwb4recovery) { + // kill and reopen to trigger recovery + s = ReOpenNoDelete(); + ASSERT_OK(s); + assert(db != nullptr); + s = db->Get(read_options, "gtid", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "dogs"); + + s = db->Get(read_options, "gtid2", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "cats"); + } + } +} + +TEST_P(TransactionTest, TwoPhaseNameTest) { + Status s; + + WriteOptions write_options; + TransactionOptions txn_options; + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + Transaction* txn3 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn3); + delete txn3; + + // cant prepare txn without name + s = txn1->Prepare(); + ASSERT_EQ(s, Status::InvalidArgument()); + + // name too short + s = txn1->SetName(""); + ASSERT_EQ(s, Status::InvalidArgument()); + + // name too long + s = txn1->SetName(std::string(513, 'x')); + ASSERT_EQ(s, Status::InvalidArgument()); + + // valid set name + s = txn1->SetName("name1"); + ASSERT_OK(s); + + // cant have duplicate name + s = txn2->SetName("name1"); + ASSERT_EQ(s, Status::InvalidArgument()); + + // shouldn't be able to prepare + s = txn2->Prepare(); + ASSERT_EQ(s, Status::InvalidArgument()); + + // valid name set + s = txn2->SetName("name2"); + ASSERT_OK(s); + + // cant reset name + s = txn2->SetName("name3"); + ASSERT_EQ(s, Status::InvalidArgument()); + + ASSERT_EQ(txn1->GetName(), "name1"); + ASSERT_EQ(txn2->GetName(), "name2"); + + s = txn1->Prepare(); + ASSERT_OK(s); + + // can't rename after prepare + s = txn1->SetName("name4"); + ASSERT_EQ(s, Status::InvalidArgument()); + + txn1->Rollback(); + txn2->Rollback(); + delete txn1; + delete txn2; +} + +TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) { + for (bool cwb4recovery : {true, false}) { + for (bool test_with_empty_wal : {true, false}) { + if (!cwb4recovery && test_with_empty_wal) { + continue; + } + ASSERT_OK(ReOpen()); + Status s; + std::string value; + + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + txn_options.use_only_the_last_commit_time_batch_for_recovery = + cwb4recovery; + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn1); + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn2); + + s = txn1->SetName("joe"); + ASSERT_OK(s); + + s = txn2->SetName("bob"); + ASSERT_OK(s); + + s = txn1->Prepare(); + ASSERT_OK(s); + + s = txn1->Commit(); + ASSERT_OK(s); + + delete txn1; + + txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar")); + + s = txn2->Prepare(); + ASSERT_OK(s); + + s = txn2->Commit(); + ASSERT_OK(s); + + delete txn2; + if (!cwb4recovery) { + s = db->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + } else { + if (test_with_empty_wal) { + DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + db_impl->TEST_FlushMemTable(true); + // After flush the state must be visible + s = db->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + } + db->FlushWAL(true); + // kill and reopen to trigger recovery + s = ReOpenNoDelete(); + ASSERT_OK(s); + assert(db != nullptr); + s = db->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + } + } + } +} + +#ifndef ROCKSDB_VALGRIND_RUN +TEST_P(TransactionStressTest, TwoPhaseExpirationTest) { + Status s; + + WriteOptions write_options; + TransactionOptions txn_options; + txn_options.expiration = 500; // 500ms + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn1); + ASSERT_TRUE(txn1); + + s = txn1->SetName("joe"); + ASSERT_OK(s); + s = txn2->SetName("bob"); + ASSERT_OK(s); + + s = txn1->Prepare(); + ASSERT_OK(s); + + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + s = txn1->Commit(); + ASSERT_OK(s); + + s = txn2->Prepare(); + ASSERT_EQ(s, Status::Expired()); + + delete txn1; + delete txn2; +} + +TEST_P(TransactionTest, TwoPhaseRollbackTest) { + WriteOptions write_options; + ReadOptions read_options; + + TransactionOptions txn_options; + + std::string value; + Status s; + + DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid"); + ASSERT_OK(s); + + // transaction put + s = txn->Put(Slice("tfoo"), Slice("tbar")); + ASSERT_OK(s); + + // value is readable form txn + s = txn->Get(read_options, Slice("tfoo"), &value); + ASSERT_OK(s); + ASSERT_EQ(value, "tbar"); + + // issue rollback + s = txn->Rollback(); + ASSERT_OK(s); + + // value is nolonger readable + s = txn->Get(read_options, Slice("tfoo"), &value); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ(txn->GetNumPuts(), 0); + + // put new txn values + s = txn->Put(Slice("tfoo2"), Slice("tbar2")); + ASSERT_OK(s); + + // new value is readable from txn + s = txn->Get(read_options, Slice("tfoo2"), &value); + ASSERT_OK(s); + ASSERT_EQ(value, "tbar2"); + + s = txn->Prepare(); + ASSERT_OK(s); + + // flush to next wal + s = db->Put(write_options, Slice("foo"), Slice("bar")); + ASSERT_OK(s); + db_impl->TEST_FlushMemTable(true); + + // issue rollback (marker written to WAL) + s = txn->Rollback(); + ASSERT_OK(s); + + // value is nolonger readable + s = txn->Get(read_options, Slice("tfoo2"), &value); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ(txn->GetNumPuts(), 0); + + // make commit + s = txn->Commit(); + ASSERT_EQ(s, Status::InvalidArgument()); + + // try rollback again + s = txn->Rollback(); + ASSERT_EQ(s, Status::InvalidArgument()); + + delete txn; +} + +TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) { + WriteOptions write_options; + write_options.sync = true; + write_options.disableWAL = false; + ReadOptions read_options; + + TransactionOptions txn_options; + + std::string value; + Status s; + + DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid"); + ASSERT_OK(s); + + ASSERT_EQ(db->GetTransactionByName("xid"), txn); + + // transaction put + s = txn->Put(Slice("foo"), Slice("bar")); + ASSERT_OK(s); + ASSERT_EQ(1, txn->GetNumPuts()); + + // txn read + s = txn->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + + // regular db put + s = db->Put(write_options, Slice("foo2"), Slice("bar2")); + ASSERT_OK(s); + ASSERT_EQ(1, txn->GetNumPuts()); + + db_impl->TEST_FlushMemTable(true); + + // regular db read + db->Get(read_options, "foo2", &value); + ASSERT_EQ(value, "bar2"); + + // nothing has been prepped yet + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + + // prepare + s = txn->Prepare(); + ASSERT_OK(s); + + // still not available to db + s = db->Get(read_options, Slice("foo"), &value); + ASSERT_TRUE(s.IsNotFound()); + + db->FlushWAL(false); + delete txn; + // kill and reopen + reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); + s = ReOpenNoDelete(); + ASSERT_OK(s); + assert(db != nullptr); + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + + // find trans in list of prepared transactions + std::vector<Transaction*> prepared_trans; + db->GetAllPreparedTransactions(&prepared_trans); + ASSERT_EQ(prepared_trans.size(), 1); + + txn = prepared_trans.front(); + ASSERT_TRUE(txn); + ASSERT_EQ(txn->GetName(), "xid"); + ASSERT_EQ(db->GetTransactionByName("xid"), txn); + + // log has been marked + auto log_containing_prep = + db_impl->TEST_FindMinLogContainingOutstandingPrep(); + ASSERT_GT(log_containing_prep, 0); + + // value is readable from txn + s = txn->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + + // make commit + s = txn->Commit(); + ASSERT_OK(s); + + // value is now available + db->Get(read_options, "foo", &value); + ASSERT_EQ(value, "bar"); + + // we already committed + s = txn->Commit(); + ASSERT_EQ(s, Status::InvalidArgument()); + + // no longer is prepared results + prepared_trans.clear(); + db->GetAllPreparedTransactions(&prepared_trans); + ASSERT_EQ(prepared_trans.size(), 0); + + // transaction should no longer be visible + ASSERT_EQ(db->GetTransactionByName("xid"), nullptr); + + // heap should not care about prepared section anymore + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + // but now our memtable should be referencing the prep section + ASSERT_EQ(log_containing_prep, + db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep()); + + break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // In these modes memtable do not ref the prep sections + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + default: + assert(false); + } + + // Add a dummy record to memtable before a flush. Otherwise, the + // memtable will be empty and flush will be skipped. + s = db->Put(write_options, Slice("foo3"), Slice("bar3")); + ASSERT_OK(s); + + db_impl->TEST_FlushMemTable(true); + + // after memtable flush we can now release the log + ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep); + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + + delete txn; + + // deleting transaction should unregister transaction + ASSERT_EQ(db->GetTransactionByName("xid"), nullptr); +} +#endif // ROCKSDB_VALGRIND_RUN + +// TODO this test needs to be updated with serial commits +TEST_P(TransactionTest, DISABLED_TwoPhaseMultiThreadTest) { + // mix transaction writes and regular writes + const uint32_t NUM_TXN_THREADS = 50; + std::atomic<uint32_t> txn_thread_num(0); + + std::function<void()> txn_write_thread = [&]() { + uint32_t id = txn_thread_num.fetch_add(1); + + WriteOptions write_options; + write_options.sync = true; + write_options.disableWAL = false; + TransactionOptions txn_options; + txn_options.lock_timeout = 1000000; + if (id % 2 == 0) { + txn_options.expiration = 1000000; + } + TransactionName name("xid_" + std::string(1, 'A' + static_cast<char>(id))); + Transaction* txn = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn->SetName(name)); + for (int i = 0; i < 10; i++) { + std::string key(name + "_" + std::string(1, static_cast<char>('A' + i))); + ASSERT_OK(txn->Put(key, "val")); + } + ASSERT_OK(txn->Prepare()); + ASSERT_OK(txn->Commit()); + delete txn; + }; + + // assure that all thread are in the same write group + std::atomic<uint32_t> t_wait_on_prepare(0); + std::atomic<uint32_t> t_wait_on_commit(0); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { + auto* writer = reinterpret_cast<WriteThread::Writer*>(arg); + + if (writer->ShouldWriteToWAL()) { + t_wait_on_prepare.fetch_add(1); + // wait for friends + while (t_wait_on_prepare.load() < NUM_TXN_THREADS) { + env->SleepForMicroseconds(10); + } + } else if (writer->ShouldWriteToMemtable()) { + t_wait_on_commit.fetch_add(1); + // wait for friends + while (t_wait_on_commit.load() < NUM_TXN_THREADS) { + env->SleepForMicroseconds(10); + } + } else { + FAIL(); + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // do all the writes + std::vector<port::Thread> threads; + for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) { + threads.emplace_back(txn_write_thread); + } + for (auto& t : threads) { + t.join(); + } + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + + ReadOptions read_options; + std::string value; + Status s; + for (uint32_t t = 0; t < NUM_TXN_THREADS; t++) { + TransactionName name("xid_" + std::string(1, 'A' + static_cast<char>(t))); + for (int i = 0; i < 10; i++) { + std::string key(name + "_" + std::string(1, static_cast<char>('A' + i))); + s = db->Get(read_options, key, &value); + ASSERT_OK(s); + ASSERT_EQ(value, "val"); + } + } +} + +TEST_P(TransactionStressTest, TwoPhaseLongPrepareTest) { + WriteOptions write_options; + write_options.sync = true; + write_options.disableWAL = false; + ReadOptions read_options; + TransactionOptions txn_options; + + std::string value; + Status s; + + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("bob"); + ASSERT_OK(s); + + // transaction put + s = txn->Put(Slice("foo"), Slice("bar")); + ASSERT_OK(s); + + // prepare + s = txn->Prepare(); + ASSERT_OK(s); + + delete txn; + + for (int i = 0; i < 1000; i++) { + std::string key(i, 'k'); + std::string val(1000, 'v'); + assert(db != nullptr); + s = db->Put(write_options, key, val); + ASSERT_OK(s); + + if (i % 29 == 0) { + // crash + env->SetFilesystemActive(false); + reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); + ReOpenNoDelete(); + } else if (i % 37 == 0) { + // close + ReOpenNoDelete(); + } + } + + // commit old txn + txn = db->GetTransactionByName("bob"); + ASSERT_TRUE(txn); + s = txn->Commit(); + ASSERT_OK(s); + + // verify data txn data + s = db->Get(read_options, "foo", &value); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(value, "bar"); + + // verify non txn data + for (int i = 0; i < 1000; i++) { + std::string key(i, 'k'); + std::string val(1000, 'v'); + s = db->Get(read_options, key, &value); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(value, val); + } + + delete txn; +} + +TEST_P(TransactionTest, TwoPhaseSequenceTest) { + WriteOptions write_options; + write_options.sync = true; + write_options.disableWAL = false; + ReadOptions read_options; + + TransactionOptions txn_options; + + std::string value; + Status s; + + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid"); + ASSERT_OK(s); + + // transaction put + s = txn->Put(Slice("foo"), Slice("bar")); + ASSERT_OK(s); + s = txn->Put(Slice("foo2"), Slice("bar2")); + ASSERT_OK(s); + s = txn->Put(Slice("foo3"), Slice("bar3")); + ASSERT_OK(s); + s = txn->Put(Slice("foo4"), Slice("bar4")); + ASSERT_OK(s); + + // prepare + s = txn->Prepare(); + ASSERT_OK(s); + + // make commit + s = txn->Commit(); + ASSERT_OK(s); + + delete txn; + + // kill and reopen + env->SetFilesystemActive(false); + ReOpenNoDelete(); + assert(db != nullptr); + + // value is now available + s = db->Get(read_options, "foo4", &value); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(value, "bar4"); +} + +TEST_P(TransactionTest, TwoPhaseDoubleRecoveryTest) { + WriteOptions write_options; + write_options.sync = true; + write_options.disableWAL = false; + ReadOptions read_options; + + TransactionOptions txn_options; + + std::string value; + Status s; + + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("a"); + ASSERT_OK(s); + + // transaction put + s = txn->Put(Slice("foo"), Slice("bar")); + ASSERT_OK(s); + + // prepare + s = txn->Prepare(); + ASSERT_OK(s); + + delete txn; + + // kill and reopen + env->SetFilesystemActive(false); + reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); + ReOpenNoDelete(); + + // commit old txn + txn = db->GetTransactionByName("a"); + s = txn->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "foo", &value); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(value, "bar"); + + delete txn; + + txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("b"); + ASSERT_OK(s); + + s = txn->Put(Slice("foo2"), Slice("bar2")); + ASSERT_OK(s); + + s = txn->Prepare(); + ASSERT_OK(s); + + s = txn->Commit(); + ASSERT_OK(s); + + delete txn; + + // kill and reopen + env->SetFilesystemActive(false); + ReOpenNoDelete(); + assert(db != nullptr); + + // value is now available + s = db->Get(read_options, "foo", &value); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(value, "bar"); + + s = db->Get(read_options, "foo2", &value); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(value, "bar2"); +} + +TEST_P(TransactionTest, TwoPhaseLogRollingTest) { + DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + + Status s; + std::string v; + ColumnFamilyHandle *cfa, *cfb; + + // Create 2 new column families + ColumnFamilyOptions cf_options; + s = db->CreateColumnFamily(cf_options, "CFA", &cfa); + ASSERT_OK(s); + s = db->CreateColumnFamily(cf_options, "CFB", &cfb); + ASSERT_OK(s); + + WriteOptions wopts; + wopts.disableWAL = false; + wopts.sync = true; + + TransactionOptions topts1; + Transaction* txn1 = db->BeginTransaction(wopts, topts1); + s = txn1->SetName("xid1"); + ASSERT_OK(s); + + TransactionOptions topts2; + Transaction* txn2 = db->BeginTransaction(wopts, topts2); + s = txn2->SetName("xid2"); + ASSERT_OK(s); + + // transaction put in two column families + s = txn1->Put(cfa, "ka1", "va1"); + ASSERT_OK(s); + + // transaction put in two column families + s = txn2->Put(cfa, "ka2", "va2"); + ASSERT_OK(s); + s = txn2->Put(cfb, "kb2", "vb2"); + ASSERT_OK(s); + + // write prep section to wal + s = txn1->Prepare(); + ASSERT_OK(s); + + // our log should be in the heap + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), + txn1->GetLogNumber()); + ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber()); + + // flush default cf to crate new log + s = db->Put(wopts, "foo", "bar"); + ASSERT_OK(s); + s = db_impl->TEST_FlushMemTable(true); + ASSERT_OK(s); + + // make sure we are on a new log + ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber()); + + // put txn2 prep section in this log + s = txn2->Prepare(); + ASSERT_OK(s); + ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber()); + + // heap should still see first log + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), + txn1->GetLogNumber()); + + // commit txn1 + s = txn1->Commit(); + ASSERT_OK(s); + + // heap should now show txn2s log + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), + txn2->GetLogNumber()); + + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + // we should see txn1s log refernced by the memtables + ASSERT_EQ(txn1->GetLogNumber(), + db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // In these modes memtable do not ref the prep sections + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + default: + assert(false); + } + + // flush default cf to crate new log + s = db->Put(wopts, "foo", "bar2"); + ASSERT_OK(s); + s = db_impl->TEST_FlushMemTable(true); + ASSERT_OK(s); + + // make sure we are on a new log + ASSERT_GT(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber()); + + // commit txn2 + s = txn2->Commit(); + ASSERT_OK(s); + + // heap should not show any logs + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + // should show the first txn log + ASSERT_EQ(txn1->GetLogNumber(), + db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // In these modes memtable do not ref the prep sections + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + default: + assert(false); + } + + // flush only cfa memtable + s = db_impl->TEST_FlushMemTable(true, false, cfa); + ASSERT_OK(s); + + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + // should show the first txn log + ASSERT_EQ(txn2->GetLogNumber(), + db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // In these modes memtable do not ref the prep sections + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + default: + assert(false); + } + + // flush only cfb memtable + s = db_impl->TEST_FlushMemTable(true, false, cfb); + ASSERT_OK(s); + + // should show not dependency on logs + ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0); + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + + delete txn1; + delete txn2; + delete cfa; + delete cfb; +} + +TEST_P(TransactionTest, TwoPhaseLogRollingTest2) { + DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + + Status s; + ColumnFamilyHandle *cfa, *cfb; + + ColumnFamilyOptions cf_options; + s = db->CreateColumnFamily(cf_options, "CFA", &cfa); + ASSERT_OK(s); + s = db->CreateColumnFamily(cf_options, "CFB", &cfb); + ASSERT_OK(s); + + WriteOptions wopts; + wopts.disableWAL = false; + wopts.sync = true; + + auto cfh_a = reinterpret_cast<ColumnFamilyHandleImpl*>(cfa); + auto cfh_b = reinterpret_cast<ColumnFamilyHandleImpl*>(cfb); + + TransactionOptions topts1; + Transaction* txn1 = db->BeginTransaction(wopts, topts1); + s = txn1->SetName("xid1"); + ASSERT_OK(s); + s = txn1->Put(cfa, "boys", "girls1"); + ASSERT_OK(s); + + Transaction* txn2 = db->BeginTransaction(wopts, topts1); + s = txn2->SetName("xid2"); + ASSERT_OK(s); + s = txn2->Put(cfb, "up", "down1"); + ASSERT_OK(s); + + // prepre transaction in LOG A + s = txn1->Prepare(); + ASSERT_OK(s); + + // prepre transaction in LOG A + s = txn2->Prepare(); + ASSERT_OK(s); + + // regular put so that mem table can actually be flushed for log rolling + s = db->Put(wopts, "cats", "dogs1"); + ASSERT_OK(s); + + auto prepare_log_no = txn1->GetLogNumber(); + + // roll to LOG B + s = db_impl->TEST_FlushMemTable(true); + ASSERT_OK(s); + + // now we pause background work so that + // imm()s are not flushed before we can check their status + s = db_impl->PauseBackgroundWork(); + ASSERT_OK(s); + + ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no); + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + // This cf is empty and should ref the latest log + ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no); + ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber()); + break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // This cf is not flushed yet and should ref the log that has its data + ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), prepare_log_no); + break; + default: + assert(false); + } + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), + prepare_log_no); + ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0); + + // commit in LOG B + s = txn1->Commit(); + ASSERT_OK(s); + + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), + prepare_log_no); + break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // In these modes memtable do not ref the prep sections + ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0); + break; + default: + assert(false); + } + + ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog()); + + // request a flush for all column families such that the earliest + // alive log file can be killed + db_impl->TEST_SwitchWAL(); + // log cannot be flushed because txn2 has not been commited + ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed()); + ASSERT_TRUE(db_impl->TEST_UnableToReleaseOldestLog()); + + // assert that cfa has a flush requested + ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested()); + + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + // cfb should not be flushed becuse it has no data from LOG A + ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested()); + break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // cfb should be flushed becuse it has prepared data from LOG A + ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested()); + break; + default: + assert(false); + } + + // cfb now has data from LOG A + s = txn2->Commit(); + ASSERT_OK(s); + + db_impl->TEST_SwitchWAL(); + ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog()); + + // we should see that cfb now has a flush requested + ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested()); + + // all data in LOG A resides in a memtable that has been + // requested for a flush + ASSERT_TRUE(db_impl->TEST_IsLogGettingFlushed()); + + delete txn1; + delete txn2; + delete cfa; + delete cfb; +} +/* + * 1) use prepare to keep first log around to determine starting sequence + * during recovery. + * 2) insert many values, skipping wal, to increase seqid. + * 3) insert final value into wal + * 4) recover and see that final value was properly recovered - not + * hidden behind improperly summed sequence ids + */ +TEST_P(TransactionTest, TwoPhaseOutOfOrderDelete) { + DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + WriteOptions wal_on, wal_off; + wal_on.sync = true; + wal_on.disableWAL = false; + wal_off.disableWAL = true; + ReadOptions read_options; + TransactionOptions txn_options; + + std::string value; + Status s; + + Transaction* txn1 = db->BeginTransaction(wal_on, txn_options); + + s = txn1->SetName("1"); + ASSERT_OK(s); + + s = db->Put(wal_on, "first", "first"); + ASSERT_OK(s); + + s = txn1->Put(Slice("dummy"), Slice("dummy")); + ASSERT_OK(s); + s = txn1->Prepare(); + ASSERT_OK(s); + + s = db->Put(wal_off, "cats", "dogs1"); + ASSERT_OK(s); + s = db->Put(wal_off, "cats", "dogs2"); + ASSERT_OK(s); + s = db->Put(wal_off, "cats", "dogs3"); + ASSERT_OK(s); + + s = db_impl->TEST_FlushMemTable(true); + ASSERT_OK(s); + + s = db->Put(wal_on, "cats", "dogs4"); + ASSERT_OK(s); + + db->FlushWAL(false); + + // kill and reopen + env->SetFilesystemActive(false); + reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); + ReOpenNoDelete(); + assert(db != nullptr); + + s = db->Get(read_options, "first", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "first"); + + s = db->Get(read_options, "cats", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "dogs4"); +} + +TEST_P(TransactionTest, FirstWriteTest) { + WriteOptions write_options; + + // Test conflict checking against the very first write to a db. + // The transaction's snapshot will have seq 1 and the following write + // will have sequence 1. + Status s = db->Put(write_options, "A", "a"); + + Transaction* txn = db->BeginTransaction(write_options); + txn->SetSnapshot(); + + ASSERT_OK(s); + + s = txn->Put("A", "b"); + ASSERT_OK(s); + + delete txn; +} + +TEST_P(TransactionTest, FirstWriteTest2) { + WriteOptions write_options; + + Transaction* txn = db->BeginTransaction(write_options); + txn->SetSnapshot(); + + // Test conflict checking against the very first write to a db. + // The transaction's snapshot is a seq 0 while the following write + // will have sequence 1. + Status s = db->Put(write_options, "A", "a"); + ASSERT_OK(s); + + s = txn->Put("A", "b"); + ASSERT_TRUE(s.IsBusy()); + + delete txn; +} + +TEST_P(TransactionTest, WriteOptionsTest) { + WriteOptions write_options; + write_options.sync = true; + write_options.disableWAL = true; + + Transaction* txn = db->BeginTransaction(write_options); + ASSERT_TRUE(txn); + + ASSERT_TRUE(txn->GetWriteOptions()->sync); + + write_options.sync = false; + txn->SetWriteOptions(write_options); + ASSERT_FALSE(txn->GetWriteOptions()->sync); + ASSERT_TRUE(txn->GetWriteOptions()->disableWAL); + + delete txn; +} + +TEST_P(TransactionTest, WriteConflictTest) { + WriteOptions write_options; + ReadOptions read_options; + string value; + Status s; + + db->Put(write_options, "foo", "A"); + db->Put(write_options, "foo2", "B"); + + Transaction* txn = db->BeginTransaction(write_options); + ASSERT_TRUE(txn); + + s = txn->Put("foo", "A2"); + ASSERT_OK(s); + + s = txn->Put("foo2", "B2"); + ASSERT_OK(s); + + // This Put outside of a transaction will conflict with the previous write + s = db->Put(write_options, "foo", "xxx"); + ASSERT_TRUE(s.IsTimedOut()); + + s = db->Get(read_options, "foo", &value); + ASSERT_EQ(value, "A"); + + s = txn->Commit(); + ASSERT_OK(s); + + db->Get(read_options, "foo", &value); + ASSERT_EQ(value, "A2"); + db->Get(read_options, "foo2", &value); + ASSERT_EQ(value, "B2"); + + delete txn; +} + +TEST_P(TransactionTest, WriteConflictTest2) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + std::string value; + Status s; + + db->Put(write_options, "foo", "bar"); + + txn_options.set_snapshot = true; + Transaction* txn = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn); + + // This Put outside of a transaction will conflict with a later write + s = db->Put(write_options, "foo", "barz"); + ASSERT_OK(s); + + s = txn->Put("foo2", "X"); + ASSERT_OK(s); + + s = txn->Put("foo", + "bar2"); // Conflicts with write done after snapshot taken + ASSERT_TRUE(s.IsBusy()); + + s = txn->Put("foo3", "Y"); + ASSERT_OK(s); + + s = db->Get(read_options, "foo", &value); + ASSERT_EQ(value, "barz"); + + ASSERT_EQ(2, txn->GetNumKeys()); + + s = txn->Commit(); + ASSERT_OK(s); // Txn should commit, but only write foo2 and foo3 + + // Verify that transaction wrote foo2 and foo3 but not foo + db->Get(read_options, "foo", &value); + ASSERT_EQ(value, "barz"); + + db->Get(read_options, "foo2", &value); + ASSERT_EQ(value, "X"); + + db->Get(read_options, "foo3", &value); + ASSERT_EQ(value, "Y"); + + delete txn; +} + +TEST_P(TransactionTest, ReadConflictTest) { + WriteOptions write_options; + ReadOptions read_options, snapshot_read_options; + TransactionOptions txn_options; + std::string value; + Status s; + + db->Put(write_options, "foo", "bar"); + db->Put(write_options, "foo2", "bar"); + + txn_options.set_snapshot = true; + Transaction* txn = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn); + + txn->SetSnapshot(); + snapshot_read_options.snapshot = txn->GetSnapshot(); + + txn->GetForUpdate(snapshot_read_options, "foo", &value); + ASSERT_EQ(value, "bar"); + + // This Put outside of a transaction will conflict with the previous read + s = db->Put(write_options, "foo", "barz"); + ASSERT_TRUE(s.IsTimedOut()); + + s = db->Get(read_options, "foo", &value); + ASSERT_EQ(value, "bar"); + + s = txn->Get(read_options, "foo", &value); + ASSERT_EQ(value, "bar"); + + s = txn->Commit(); + ASSERT_OK(s); + + delete txn; +} + +TEST_P(TransactionTest, TxnOnlyTest) { + // Test to make sure transactions work when there are no other writes in an + // empty db. + + WriteOptions write_options; + ReadOptions read_options; + std::string value; + Status s; + + Transaction* txn = db->BeginTransaction(write_options); + ASSERT_TRUE(txn); + + s = txn->Put("x", "y"); + ASSERT_OK(s); + + s = txn->Commit(); + ASSERT_OK(s); + + delete txn; +} + +TEST_P(TransactionTest, FlushTest) { + WriteOptions write_options; + ReadOptions read_options, snapshot_read_options; + std::string value; + Status s; + + db->Put(write_options, Slice("foo"), Slice("bar")); + db->Put(write_options, Slice("foo2"), Slice("bar")); + + Transaction* txn = db->BeginTransaction(write_options); + ASSERT_TRUE(txn); + + snapshot_read_options.snapshot = txn->GetSnapshot(); + + txn->GetForUpdate(snapshot_read_options, "foo", &value); + ASSERT_EQ(value, "bar"); + + s = txn->Put(Slice("foo"), Slice("bar2")); + ASSERT_OK(s); + + txn->GetForUpdate(snapshot_read_options, "foo", &value); + ASSERT_EQ(value, "bar2"); + + // Put a random key so we have a memtable to flush + s = db->Put(write_options, "dummy", "dummy"); + ASSERT_OK(s); + + // force a memtable flush + FlushOptions flush_ops; + db->Flush(flush_ops); + + s = txn->Commit(); + // txn should commit since the flushed table is still in MemtableList History + ASSERT_OK(s); + + db->Get(read_options, "foo", &value); + ASSERT_EQ(value, "bar2"); + + delete txn; +} + +TEST_P(TransactionTest, FlushTest2) { + const size_t num_tests = 3; + + for (size_t n = 0; n < num_tests; n++) { + // Test different table factories + switch (n) { + case 0: + break; + case 1: + options.table_factory.reset(new mock::MockTableFactory()); + break; + case 2: { + PlainTableOptions pt_opts; + pt_opts.hash_table_ratio = 0; + options.table_factory.reset(NewPlainTableFactory(pt_opts)); + break; + } + } + + Status s = ReOpen(); + ASSERT_OK(s); + assert(db != nullptr); + + WriteOptions write_options; + ReadOptions read_options, snapshot_read_options; + TransactionOptions txn_options; + string value; + + DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + + db->Put(write_options, Slice("foo"), Slice("bar")); + db->Put(write_options, Slice("foo2"), Slice("bar2")); + db->Put(write_options, Slice("foo3"), Slice("bar3")); + + txn_options.set_snapshot = true; + Transaction* txn = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn); + + snapshot_read_options.snapshot = txn->GetSnapshot(); + + txn->GetForUpdate(snapshot_read_options, "foo", &value); + ASSERT_EQ(value, "bar"); + + s = txn->Put(Slice("foo"), Slice("bar2")); + ASSERT_OK(s); + + txn->GetForUpdate(snapshot_read_options, "foo", &value); + ASSERT_EQ(value, "bar2"); + // verify foo is locked by txn + s = db->Delete(write_options, "foo"); + ASSERT_TRUE(s.IsTimedOut()); + + s = db->Put(write_options, "Z", "z"); + ASSERT_OK(s); + s = db->Put(write_options, "dummy", "dummy"); + ASSERT_OK(s); + + s = db->Put(write_options, "S", "s"); + ASSERT_OK(s); + s = db->SingleDelete(write_options, "S"); + ASSERT_OK(s); + + s = txn->Delete("S"); + // Should fail after encountering a write to S in memtable + ASSERT_TRUE(s.IsBusy()); + + // force a memtable flush + s = db_impl->TEST_FlushMemTable(true); + ASSERT_OK(s); + + // Put a random key so we have a MemTable to flush + s = db->Put(write_options, "dummy", "dummy2"); + ASSERT_OK(s); + + // force a memtable flush + ASSERT_OK(db_impl->TEST_FlushMemTable(true)); + + s = db->Put(write_options, "dummy", "dummy3"); + ASSERT_OK(s); + + // force a memtable flush + // Since our test db has max_write_buffer_number=2, this flush will cause + // the first memtable to get purged from the MemtableList history. + ASSERT_OK(db_impl->TEST_FlushMemTable(true)); + + s = txn->Put("X", "Y"); + // Should succeed after verifying there is no write to X in SST file + ASSERT_OK(s); + + s = txn->Put("Z", "zz"); + // Should fail after encountering a write to Z in SST file + ASSERT_TRUE(s.IsBusy()); + + s = txn->GetForUpdate(read_options, "foo2", &value); + // should succeed since key was written before txn started + ASSERT_OK(s); + // verify foo2 is locked by txn + s = db->Delete(write_options, "foo2"); + ASSERT_TRUE(s.IsTimedOut()); + + s = txn->Delete("S"); + // Should fail after encountering a write to S in SST file + ASSERT_TRUE(s.IsBusy()); + + // Write a bunch of keys to db to force a compaction + Random rnd(47); + for (int i = 0; i < 1000; i++) { + s = db->Put(write_options, std::to_string(i), + test::CompressibleString(&rnd, 0.8, 100, &value)); + ASSERT_OK(s); + } + + s = txn->Put("X", "yy"); + // Should succeed after verifying there is no write to X in SST file + ASSERT_OK(s); + + s = txn->Put("Z", "zzz"); + // Should fail after encountering a write to Z in SST file + ASSERT_TRUE(s.IsBusy()); + + s = txn->Delete("S"); + // Should fail after encountering a write to S in SST file + ASSERT_TRUE(s.IsBusy()); + + s = txn->GetForUpdate(read_options, "foo3", &value); + // should succeed since key was written before txn started + ASSERT_OK(s); + // verify foo3 is locked by txn + s = db->Delete(write_options, "foo3"); + ASSERT_TRUE(s.IsTimedOut()); + + db_impl->TEST_WaitForCompact(); + + s = txn->Commit(); + ASSERT_OK(s); + + // Transaction should only write the keys that succeeded. + s = db->Get(read_options, "foo", &value); + ASSERT_EQ(value, "bar2"); + + s = db->Get(read_options, "X", &value); + ASSERT_OK(s); + ASSERT_EQ("yy", value); + + s = db->Get(read_options, "Z", &value); + ASSERT_OK(s); + ASSERT_EQ("z", value); + + delete txn; + } +} + +TEST_P(TransactionTest, NoSnapshotTest) { + WriteOptions write_options; + ReadOptions read_options; + std::string value; + Status s; + + db->Put(write_options, "AAA", "bar"); + + Transaction* txn = db->BeginTransaction(write_options); + ASSERT_TRUE(txn); + + // Modify key after transaction start + db->Put(write_options, "AAA", "bar1"); + + // Read and write without a snap + txn->GetForUpdate(read_options, "AAA", &value); + ASSERT_EQ(value, "bar1"); + s = txn->Put("AAA", "bar2"); + ASSERT_OK(s); + + // Should commit since read/write was done after data changed + s = txn->Commit(); + ASSERT_OK(s); + + txn->GetForUpdate(read_options, "AAA", &value); + ASSERT_EQ(value, "bar2"); + + delete txn; +} + +TEST_P(TransactionTest, MultipleSnapshotTest) { + WriteOptions write_options; + ReadOptions read_options, snapshot_read_options; + std::string value; + Status s; + + ASSERT_OK(db->Put(write_options, "AAA", "bar")); + ASSERT_OK(db->Put(write_options, "BBB", "bar")); + ASSERT_OK(db->Put(write_options, "CCC", "bar")); + + Transaction* txn = db->BeginTransaction(write_options); + ASSERT_TRUE(txn); + + db->Put(write_options, "AAA", "bar1"); + + // Read and write without a snapshot + ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value)); + ASSERT_EQ(value, "bar1"); + s = txn->Put("AAA", "bar2"); + ASSERT_OK(s); + + // Modify BBB before snapshot is taken + ASSERT_OK(db->Put(write_options, "BBB", "bar1")); + + txn->SetSnapshot(); + snapshot_read_options.snapshot = txn->GetSnapshot(); + + // Read and write with snapshot + ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "BBB", &value)); + ASSERT_EQ(value, "bar1"); + s = txn->Put("BBB", "bar2"); + ASSERT_OK(s); + + ASSERT_OK(db->Put(write_options, "CCC", "bar1")); + + // Set a new snapshot + txn->SetSnapshot(); + snapshot_read_options.snapshot = txn->GetSnapshot(); + + // Read and write with snapshot + txn->GetForUpdate(snapshot_read_options, "CCC", &value); + ASSERT_EQ(value, "bar1"); + s = txn->Put("CCC", "bar2"); + ASSERT_OK(s); + + s = txn->GetForUpdate(read_options, "AAA", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar2"); + s = txn->GetForUpdate(read_options, "BBB", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar2"); + s = txn->GetForUpdate(read_options, "CCC", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar2"); + + s = db->Get(read_options, "AAA", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar1"); + s = db->Get(read_options, "BBB", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar1"); + s = db->Get(read_options, "CCC", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar1"); + + s = txn->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "AAA", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar2"); + s = db->Get(read_options, "BBB", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar2"); + s = db->Get(read_options, "CCC", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar2"); + + // verify that we track multiple writes to the same key at different snapshots + delete txn; + txn = db->BeginTransaction(write_options); + + // Potentially conflicting writes + db->Put(write_options, "ZZZ", "zzz"); + db->Put(write_options, "XXX", "xxx"); + + txn->SetSnapshot(); + + TransactionOptions txn_options; + txn_options.set_snapshot = true; + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + txn2->SetSnapshot(); + + // This should not conflict in txn since the snapshot is later than the + // previous write (spoiler alert: it will later conflict with txn2). + s = txn->Put("ZZZ", "zzzz"); + ASSERT_OK(s); + + s = txn->Commit(); + ASSERT_OK(s); + + delete txn; + + // This will conflict since the snapshot is earlier than another write to ZZZ + s = txn2->Put("ZZZ", "xxxxx"); + ASSERT_TRUE(s.IsBusy()); + + s = txn2->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "ZZZ", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "zzzz"); + + delete txn2; +} + +TEST_P(TransactionTest, ColumnFamiliesTest) { + WriteOptions write_options; + ReadOptions read_options, snapshot_read_options; + TransactionOptions txn_options; + string value; + Status s; + + ColumnFamilyHandle *cfa, *cfb; + ColumnFamilyOptions cf_options; + + // Create 2 new column families + s = db->CreateColumnFamily(cf_options, "CFA", &cfa); + ASSERT_OK(s); + s = db->CreateColumnFamily(cf_options, "CFB", &cfb); + ASSERT_OK(s); + + delete cfa; + delete cfb; + delete db; + db = nullptr; + + // open DB with three column families + std::vector<ColumnFamilyDescriptor> column_families; + // have to open default column family + column_families.push_back( + ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions())); + // open the new column families + column_families.push_back( + ColumnFamilyDescriptor("CFA", ColumnFamilyOptions())); + column_families.push_back( + ColumnFamilyDescriptor("CFB", ColumnFamilyOptions())); + + std::vector<ColumnFamilyHandle*> handles; + + s = TransactionDB::Open(options, txn_db_options, dbname, column_families, + &handles, &db); + assert(db != nullptr); + ASSERT_OK(s); + + Transaction* txn = db->BeginTransaction(write_options); + ASSERT_TRUE(txn); + + txn->SetSnapshot(); + snapshot_read_options.snapshot = txn->GetSnapshot(); + + txn_options.set_snapshot = true; + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn2); + + // Write some data to the db + WriteBatch batch; + batch.Put("foo", "foo"); + batch.Put(handles[1], "AAA", "bar"); + batch.Put(handles[1], "AAAZZZ", "bar"); + s = db->Write(write_options, &batch); + ASSERT_OK(s); + db->Delete(write_options, handles[1], "AAAZZZ"); + + // These keys do not conflict with existing writes since they're in + // different column families + s = txn->Delete("AAA"); + ASSERT_OK(s); + s = txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value); + ASSERT_TRUE(s.IsNotFound()); + Slice key_slice("AAAZZZ"); + Slice value_slices[2] = {Slice("bar"), Slice("bar")}; + s = txn->Put(handles[2], SliceParts(&key_slice, 1), + SliceParts(value_slices, 2)); + ASSERT_OK(s); + ASSERT_EQ(3, txn->GetNumKeys()); + + s = txn->Commit(); + ASSERT_OK(s); + s = db->Get(read_options, "AAA", &value); + ASSERT_TRUE(s.IsNotFound()); + s = db->Get(read_options, handles[2], "AAAZZZ", &value); + ASSERT_EQ(value, "barbar"); + + Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")}; + Slice value_slice("barbarbar"); + + s = txn2->Delete(handles[2], "XXX"); + ASSERT_OK(s); + s = txn2->Delete(handles[1], "XXX"); + ASSERT_OK(s); + + // This write will cause a conflict with the earlier batch write + s = txn2->Put(handles[1], SliceParts(key_slices, 3), + SliceParts(&value_slice, 1)); + ASSERT_TRUE(s.IsBusy()); + + s = txn2->Commit(); + ASSERT_OK(s); + // In the above the latest change to AAAZZZ in handles[1] is delete. + s = db->Get(read_options, handles[1], "AAAZZZ", &value); + ASSERT_TRUE(s.IsNotFound()); + + delete txn; + delete txn2; + + txn = db->BeginTransaction(write_options, txn_options); + snapshot_read_options.snapshot = txn->GetSnapshot(); + + txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn); + + std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2], + handles[0], handles[2]}; + std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"}; + std::vector<std::string> values(4); + + std::vector<Status> results = txn->MultiGetForUpdate( + snapshot_read_options, multiget_cfh, multiget_keys, &values); + ASSERT_OK(results[0]); + ASSERT_OK(results[1]); + ASSERT_OK(results[2]); + ASSERT_TRUE(results[3].IsNotFound()); + ASSERT_EQ(values[0], "bar"); + ASSERT_EQ(values[1], "barbar"); + ASSERT_EQ(values[2], "foo"); + + s = txn->SingleDelete(handles[2], "ZZZ"); + ASSERT_OK(s); + s = txn->Put(handles[2], "ZZZ", "YYY"); + ASSERT_OK(s); + s = txn->Put(handles[2], "ZZZ", "YYYY"); + ASSERT_OK(s); + s = txn->Delete(handles[2], "ZZZ"); + ASSERT_OK(s); + s = txn->Put(handles[2], "AAAZZZ", "barbarbar"); + ASSERT_OK(s); + + ASSERT_EQ(5, txn->GetNumKeys()); + + // Txn should commit + s = txn->Commit(); + ASSERT_OK(s); + s = db->Get(read_options, handles[2], "ZZZ", &value); + ASSERT_TRUE(s.IsNotFound()); + + // Put a key which will conflict with the next txn using the previous snapshot + db->Put(write_options, handles[2], "foo", "000"); + + results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh, + multiget_keys, &values); + // All results should fail since there was a conflict + ASSERT_TRUE(results[0].IsBusy()); + ASSERT_TRUE(results[1].IsBusy()); + ASSERT_TRUE(results[2].IsBusy()); + ASSERT_TRUE(results[3].IsBusy()); + + s = db->Get(read_options, handles[2], "foo", &value); + ASSERT_EQ(value, "000"); + + s = txn2->Commit(); + ASSERT_OK(s); + + s = db->DropColumnFamily(handles[1]); + ASSERT_OK(s); + s = db->DropColumnFamily(handles[2]); + ASSERT_OK(s); + + delete txn; + delete txn2; + + for (auto handle : handles) { + delete handle; + } +} + +TEST_P(TransactionTest, ColumnFamiliesTest2) { + WriteOptions write_options; + ReadOptions read_options, snapshot_read_options; + string value; + Status s; + + ColumnFamilyHandle *one, *two; + ColumnFamilyOptions cf_options; + + // Create 2 new column families + s = db->CreateColumnFamily(cf_options, "ONE", &one); + ASSERT_OK(s); + s = db->CreateColumnFamily(cf_options, "TWO", &two); + ASSERT_OK(s); + + Transaction* txn1 = db->BeginTransaction(write_options); + ASSERT_TRUE(txn1); + Transaction* txn2 = db->BeginTransaction(write_options); + ASSERT_TRUE(txn2); + + s = txn1->Put(one, "X", "1"); + ASSERT_OK(s); + s = txn1->Put(two, "X", "2"); + ASSERT_OK(s); + s = txn1->Put("X", "0"); + ASSERT_OK(s); + + s = txn2->Put(one, "X", "11"); + ASSERT_TRUE(s.IsTimedOut()); + + s = txn1->Commit(); + ASSERT_OK(s); + + // Drop first column family + s = db->DropColumnFamily(one); + ASSERT_OK(s); + + // Should fail since column family was dropped. + s = txn2->Commit(); + ASSERT_OK(s); + + delete txn1; + txn1 = db->BeginTransaction(write_options); + ASSERT_TRUE(txn1); + + // Should fail since column family was dropped + s = txn1->Put(one, "X", "111"); + ASSERT_TRUE(s.IsInvalidArgument()); + + s = txn1->Put(two, "X", "222"); + ASSERT_OK(s); + + s = txn1->Put("X", "000"); + ASSERT_OK(s); + + s = txn1->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, two, "X", &value); + ASSERT_OK(s); + ASSERT_EQ("222", value); + + s = db->Get(read_options, "X", &value); + ASSERT_OK(s); + ASSERT_EQ("000", value); + + s = db->DropColumnFamily(two); + ASSERT_OK(s); + + delete txn1; + delete txn2; + + delete one; + delete two; +} + +TEST_P(TransactionTest, EmptyTest) { + WriteOptions write_options; + ReadOptions read_options; + string value; + Status s; + + s = db->Put(write_options, "aaa", "aaa"); + ASSERT_OK(s); + + Transaction* txn = db->BeginTransaction(write_options); + s = txn->Commit(); + ASSERT_OK(s); + delete txn; + + txn = db->BeginTransaction(write_options); + txn->Rollback(); + delete txn; + + txn = db->BeginTransaction(write_options); + s = txn->GetForUpdate(read_options, "aaa", &value); + ASSERT_EQ(value, "aaa"); + + s = txn->Commit(); + ASSERT_OK(s); + delete txn; + + txn = db->BeginTransaction(write_options); + txn->SetSnapshot(); + + s = txn->GetForUpdate(read_options, "aaa", &value); + ASSERT_EQ(value, "aaa"); + + // Conflicts with previous GetForUpdate + s = db->Put(write_options, "aaa", "xxx"); + ASSERT_TRUE(s.IsTimedOut()); + + // transaction expired! + s = txn->Commit(); + ASSERT_OK(s); + delete txn; +} + +TEST_P(TransactionTest, PredicateManyPreceders) { + WriteOptions write_options; + ReadOptions read_options1, read_options2; + TransactionOptions txn_options; + string value; + Status s; + + txn_options.set_snapshot = true; + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + read_options1.snapshot = txn1->GetSnapshot(); + + Transaction* txn2 = db->BeginTransaction(write_options); + txn2->SetSnapshot(); + read_options2.snapshot = txn2->GetSnapshot(); + + std::vector<Slice> multiget_keys = {"1", "2", "3"}; + std::vector<std::string> multiget_values; + + std::vector<Status> results = + txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values); + ASSERT_TRUE(results[1].IsNotFound()); + + s = txn2->Put("2", "x"); // Conflict's with txn1's MultiGetForUpdate + ASSERT_TRUE(s.IsTimedOut()); + + txn2->Rollback(); + + multiget_values.clear(); + results = + txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values); + ASSERT_TRUE(results[1].IsNotFound()); + + s = txn1->Commit(); + ASSERT_OK(s); + + delete txn1; + delete txn2; + + txn1 = db->BeginTransaction(write_options, txn_options); + read_options1.snapshot = txn1->GetSnapshot(); + + txn2 = db->BeginTransaction(write_options, txn_options); + read_options2.snapshot = txn2->GetSnapshot(); + + s = txn1->Put("4", "x"); + ASSERT_OK(s); + + s = txn2->Delete("4"); // conflict + ASSERT_TRUE(s.IsTimedOut()); + + s = txn1->Commit(); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options2, "4", &value); + ASSERT_TRUE(s.IsBusy()); + + txn2->Rollback(); + + delete txn1; + delete txn2; +} + +TEST_P(TransactionTest, LostUpdate) { + WriteOptions write_options; + ReadOptions read_options, read_options1, read_options2; + TransactionOptions txn_options; + std::string value; + Status s; + + // Test 2 transactions writing to the same key in multiple orders and + // with/without snapshots + + Transaction* txn1 = db->BeginTransaction(write_options); + Transaction* txn2 = db->BeginTransaction(write_options); + + s = txn1->Put("1", "1"); + ASSERT_OK(s); + + s = txn2->Put("1", "2"); // conflict + ASSERT_TRUE(s.IsTimedOut()); + + s = txn2->Commit(); + ASSERT_OK(s); + + s = txn1->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "1", &value); + ASSERT_OK(s); + ASSERT_EQ("1", value); + + delete txn1; + delete txn2; + + txn_options.set_snapshot = true; + txn1 = db->BeginTransaction(write_options, txn_options); + read_options1.snapshot = txn1->GetSnapshot(); + + txn2 = db->BeginTransaction(write_options, txn_options); + read_options2.snapshot = txn2->GetSnapshot(); + + s = txn1->Put("1", "3"); + ASSERT_OK(s); + s = txn2->Put("1", "4"); // conflict + ASSERT_TRUE(s.IsTimedOut()); + + s = txn1->Commit(); + ASSERT_OK(s); + + s = txn2->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "1", &value); + ASSERT_OK(s); + ASSERT_EQ("3", value); + + delete txn1; + delete txn2; + + txn1 = db->BeginTransaction(write_options, txn_options); + read_options1.snapshot = txn1->GetSnapshot(); + + txn2 = db->BeginTransaction(write_options, txn_options); + read_options2.snapshot = txn2->GetSnapshot(); + + s = txn1->Put("1", "5"); + ASSERT_OK(s); + + s = txn1->Commit(); + ASSERT_OK(s); + + s = txn2->Put("1", "6"); + ASSERT_TRUE(s.IsBusy()); + s = txn2->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "1", &value); + ASSERT_OK(s); + ASSERT_EQ("5", value); + + delete txn1; + delete txn2; + + txn1 = db->BeginTransaction(write_options, txn_options); + read_options1.snapshot = txn1->GetSnapshot(); + + txn2 = db->BeginTransaction(write_options, txn_options); + read_options2.snapshot = txn2->GetSnapshot(); + + s = txn1->Put("1", "7"); + ASSERT_OK(s); + s = txn1->Commit(); + ASSERT_OK(s); + + txn2->SetSnapshot(); + s = txn2->Put("1", "8"); + ASSERT_OK(s); + s = txn2->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "1", &value); + ASSERT_OK(s); + ASSERT_EQ("8", value); + + delete txn1; + delete txn2; + + txn1 = db->BeginTransaction(write_options); + txn2 = db->BeginTransaction(write_options); + + s = txn1->Put("1", "9"); + ASSERT_OK(s); + s = txn1->Commit(); + ASSERT_OK(s); + + s = txn2->Put("1", "10"); + ASSERT_OK(s); + s = txn2->Commit(); + ASSERT_OK(s); + + delete txn1; + delete txn2; + + s = db->Get(read_options, "1", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "10"); +} + +TEST_P(TransactionTest, UntrackedWrites) { + WriteOptions write_options; + ReadOptions read_options; + std::string value; + Status s; + + // Verify transaction rollback works for untracked keys. + Transaction* txn = db->BeginTransaction(write_options); + txn->SetSnapshot(); + + s = txn->PutUntracked("untracked", "0"); + ASSERT_OK(s); + txn->Rollback(); + s = db->Get(read_options, "untracked", &value); + ASSERT_TRUE(s.IsNotFound()); + + delete txn; + txn = db->BeginTransaction(write_options); + txn->SetSnapshot(); + + s = db->Put(write_options, "untracked", "x"); + ASSERT_OK(s); + + // Untracked writes should succeed even though key was written after snapshot + s = txn->PutUntracked("untracked", "1"); + ASSERT_OK(s); + s = txn->MergeUntracked("untracked", "2"); + ASSERT_OK(s); + s = txn->DeleteUntracked("untracked"); + ASSERT_OK(s); + + // Conflict + s = txn->Put("untracked", "3"); + ASSERT_TRUE(s.IsBusy()); + + s = txn->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "untracked", &value); + ASSERT_TRUE(s.IsNotFound()); + + delete txn; +} + +TEST_P(TransactionTest, ExpiredTransaction) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + string value; + Status s; + + // Set txn expiration timeout to 0 microseconds (expires instantly) + txn_options.expiration = 0; + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + + s = txn1->Put("X", "1"); + ASSERT_OK(s); + + s = txn1->Put("Y", "1"); + ASSERT_OK(s); + + Transaction* txn2 = db->BeginTransaction(write_options); + + // txn2 should be able to write to X since txn1 has expired + s = txn2->Put("X", "2"); + ASSERT_OK(s); + + s = txn2->Commit(); + ASSERT_OK(s); + s = db->Get(read_options, "X", &value); + ASSERT_OK(s); + ASSERT_EQ("2", value); + + s = txn1->Put("Z", "1"); + ASSERT_OK(s); + + // txn1 should fail to commit since it is expired + s = txn1->Commit(); + ASSERT_TRUE(s.IsExpired()); + + s = db->Get(read_options, "Y", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = db->Get(read_options, "Z", &value); + ASSERT_TRUE(s.IsNotFound()); + + delete txn1; + delete txn2; +} + +TEST_P(TransactionTest, ReinitializeTest) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + std::string value; + Status s; + + // Set txn expiration timeout to 0 microseconds (expires instantly) + txn_options.expiration = 0; + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + + // Reinitialize transaction to no long expire + txn_options.expiration = -1; + txn1 = db->BeginTransaction(write_options, txn_options, txn1); + + s = txn1->Put("Z", "z"); + ASSERT_OK(s); + + // Should commit since not expired + s = txn1->Commit(); + ASSERT_OK(s); + + txn1 = db->BeginTransaction(write_options, txn_options, txn1); + + s = txn1->Put("Z", "zz"); + ASSERT_OK(s); + + // Reinitilize txn1 and verify that Z gets unlocked + txn1 = db->BeginTransaction(write_options, txn_options, txn1); + + Transaction* txn2 = db->BeginTransaction(write_options, txn_options, nullptr); + s = txn2->Put("Z", "zzz"); + ASSERT_OK(s); + s = txn2->Commit(); + ASSERT_OK(s); + delete txn2; + + s = db->Get(read_options, "Z", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "zzz"); + + // Verify snapshots get reinitialized correctly + txn1->SetSnapshot(); + s = txn1->Put("Z", "zzzz"); + ASSERT_OK(s); + + s = txn1->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "Z", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "zzzz"); + + txn1 = db->BeginTransaction(write_options, txn_options, txn1); + const Snapshot* snapshot = txn1->GetSnapshot(); + ASSERT_FALSE(snapshot); + + txn_options.set_snapshot = true; + txn1 = db->BeginTransaction(write_options, txn_options, txn1); + snapshot = txn1->GetSnapshot(); + ASSERT_TRUE(snapshot); + + s = txn1->Put("Z", "a"); + ASSERT_OK(s); + + txn1->Rollback(); + + s = txn1->Put("Y", "y"); + ASSERT_OK(s); + + txn_options.set_snapshot = false; + txn1 = db->BeginTransaction(write_options, txn_options, txn1); + snapshot = txn1->GetSnapshot(); + ASSERT_FALSE(snapshot); + + s = txn1->Put("X", "x"); + ASSERT_OK(s); + + s = txn1->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "Z", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "zzzz"); + + s = db->Get(read_options, "Y", &value); + ASSERT_TRUE(s.IsNotFound()); + + txn1 = db->BeginTransaction(write_options, txn_options, txn1); + + s = txn1->SetName("name"); + ASSERT_OK(s); + + s = txn1->Prepare(); + ASSERT_OK(s); + s = txn1->Commit(); + ASSERT_OK(s); + + txn1 = db->BeginTransaction(write_options, txn_options, txn1); + + s = txn1->SetName("name"); + ASSERT_OK(s); + + delete txn1; +} + +TEST_P(TransactionTest, Rollback) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + std::string value; + Status s; + + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + + ASSERT_OK(s); + + s = txn1->Put("X", "1"); + ASSERT_OK(s); + + Transaction* txn2 = db->BeginTransaction(write_options); + + // txn2 should not be able to write to X since txn1 has it locked + s = txn2->Put("X", "2"); + ASSERT_TRUE(s.IsTimedOut()); + + txn1->Rollback(); + delete txn1; + + // txn2 should now be able to write to X + s = txn2->Put("X", "3"); + ASSERT_OK(s); + + s = txn2->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "X", &value); + ASSERT_OK(s); + ASSERT_EQ("3", value); + + delete txn2; +} + +TEST_P(TransactionTest, LockLimitTest) { + WriteOptions write_options; + ReadOptions read_options, snapshot_read_options; + TransactionOptions txn_options; + string value; + Status s; + + delete db; + db = nullptr; + + // Open DB with a lock limit of 3 + txn_db_options.max_num_locks = 3; + s = TransactionDB::Open(options, txn_db_options, dbname, &db); + assert(db != nullptr); + ASSERT_OK(s); + + // Create a txn and verify we can only lock up to 3 keys + Transaction* txn = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn); + + s = txn->Put("X", "x"); + ASSERT_OK(s); + + s = txn->Put("Y", "y"); + ASSERT_OK(s); + + s = txn->Put("Z", "z"); + ASSERT_OK(s); + + // lock limit reached + s = txn->Put("W", "w"); + ASSERT_TRUE(s.IsBusy()); + + // re-locking same key shouldn't put us over the limit + s = txn->Put("X", "xx"); + ASSERT_OK(s); + + s = txn->GetForUpdate(read_options, "W", &value); + ASSERT_TRUE(s.IsBusy()); + s = txn->GetForUpdate(read_options, "V", &value); + ASSERT_TRUE(s.IsBusy()); + + // re-locking same key shouldn't put us over the limit + s = txn->GetForUpdate(read_options, "Y", &value); + ASSERT_OK(s); + ASSERT_EQ("y", value); + + s = txn->Get(read_options, "W", &value); + ASSERT_TRUE(s.IsNotFound()); + + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn2); + + // "X" currently locked + s = txn2->Put("X", "x"); + ASSERT_TRUE(s.IsTimedOut()); + + // lock limit reached + s = txn2->Put("M", "m"); + ASSERT_TRUE(s.IsBusy()); + + s = txn->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "X", &value); + ASSERT_OK(s); + ASSERT_EQ("xx", value); + + s = db->Get(read_options, "W", &value); + ASSERT_TRUE(s.IsNotFound()); + + // Committing txn should release its locks and allow txn2 to proceed + s = txn2->Put("X", "x2"); + ASSERT_OK(s); + + s = txn2->Delete("X"); + ASSERT_OK(s); + + s = txn2->Put("M", "m"); + ASSERT_OK(s); + + s = txn2->Put("Z", "z2"); + ASSERT_OK(s); + + // lock limit reached + s = txn2->Delete("Y"); + ASSERT_TRUE(s.IsBusy()); + + s = txn2->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "Z", &value); + ASSERT_OK(s); + ASSERT_EQ("z2", value); + + s = db->Get(read_options, "Y", &value); + ASSERT_OK(s); + ASSERT_EQ("y", value); + + s = db->Get(read_options, "X", &value); + ASSERT_TRUE(s.IsNotFound()); + + delete txn; + delete txn2; +} + +TEST_P(TransactionTest, IteratorTest) { + WriteOptions write_options; + ReadOptions read_options, snapshot_read_options; + std::string value; + Status s; + + // Write some keys to the db + s = db->Put(write_options, "A", "a"); + ASSERT_OK(s); + + s = db->Put(write_options, "G", "g"); + ASSERT_OK(s); + + s = db->Put(write_options, "F", "f"); + ASSERT_OK(s); + + s = db->Put(write_options, "C", "c"); + ASSERT_OK(s); + + s = db->Put(write_options, "D", "d"); + ASSERT_OK(s); + + Transaction* txn = db->BeginTransaction(write_options); + ASSERT_TRUE(txn); + + // Write some keys in a txn + s = txn->Put("B", "b"); + ASSERT_OK(s); + + s = txn->Put("H", "h"); + ASSERT_OK(s); + + s = txn->Delete("D"); + ASSERT_OK(s); + + s = txn->Put("E", "e"); + ASSERT_OK(s); + + txn->SetSnapshot(); + const Snapshot* snapshot = txn->GetSnapshot(); + + // Write some keys to the db after the snapshot + s = db->Put(write_options, "BB", "xx"); + ASSERT_OK(s); + + s = db->Put(write_options, "C", "xx"); + ASSERT_OK(s); + + read_options.snapshot = snapshot; + Iterator* iter = txn->GetIterator(read_options); + ASSERT_OK(iter->status()); + iter->SeekToFirst(); + + // Read all keys via iter and lock them all + std::string results[] = {"a", "b", "c", "e", "f", "g", "h"}; + for (int i = 0; i < 7; i++) { + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(results[i], iter->value().ToString()); + + s = txn->GetForUpdate(read_options, iter->key(), nullptr); + if (i == 2) { + // "C" was modified after txn's snapshot + ASSERT_TRUE(s.IsBusy()); + } else { + ASSERT_OK(s); + } + + iter->Next(); + } + ASSERT_FALSE(iter->Valid()); + + iter->Seek("G"); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("g", iter->value().ToString()); + + iter->Prev(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("f", iter->value().ToString()); + + iter->Seek("D"); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("e", iter->value().ToString()); + + iter->Seek("C"); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("c", iter->value().ToString()); + + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("e", iter->value().ToString()); + + iter->Seek(""); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a", iter->value().ToString()); + + iter->Seek("X"); + ASSERT_OK(iter->status()); + ASSERT_FALSE(iter->Valid()); + + iter->SeekToLast(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("h", iter->value().ToString()); + + s = txn->Commit(); + ASSERT_OK(s); + + delete iter; + delete txn; +} + +TEST_P(TransactionTest, DisableIndexingTest) { + WriteOptions write_options; + ReadOptions read_options; + std::string value; + Status s; + + Transaction* txn = db->BeginTransaction(write_options); + ASSERT_TRUE(txn); + + s = txn->Put("A", "a"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a", value); + + txn->DisableIndexing(); + + s = txn->Put("B", "b"); + ASSERT_OK(s); + + s = txn->Get(read_options, "B", &value); + ASSERT_TRUE(s.IsNotFound()); + + Iterator* iter = txn->GetIterator(read_options); + ASSERT_OK(iter->status()); + + iter->Seek("B"); + ASSERT_OK(iter->status()); + ASSERT_FALSE(iter->Valid()); + + s = txn->Delete("A"); + + s = txn->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a", value); + + txn->EnableIndexing(); + + s = txn->Put("B", "bb"); + ASSERT_OK(s); + + iter->Seek("B"); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bb", iter->value().ToString()); + + s = txn->Get(read_options, "B", &value); + ASSERT_OK(s); + ASSERT_EQ("bb", value); + + s = txn->Put("A", "aa"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("aa", value); + + delete iter; + delete txn; +} + +TEST_P(TransactionTest, SavepointTest) { + WriteOptions write_options; + ReadOptions read_options, snapshot_read_options; + std::string value; + Status s; + + Transaction* txn = db->BeginTransaction(write_options); + ASSERT_TRUE(txn); + + ASSERT_EQ(0, txn->GetNumPuts()); + + s = txn->RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); + + txn->SetSavePoint(); // 1 + + ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to beginning of txn + s = txn->RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); + + s = txn->Put("B", "b"); + ASSERT_OK(s); + + ASSERT_EQ(1, txn->GetNumPuts()); + ASSERT_EQ(0, txn->GetNumDeletes()); + + s = txn->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "B", &value); + ASSERT_OK(s); + ASSERT_EQ("b", value); + + delete txn; + txn = db->BeginTransaction(write_options); + ASSERT_TRUE(txn); + + s = txn->Put("A", "a"); + ASSERT_OK(s); + + s = txn->Put("B", "bb"); + ASSERT_OK(s); + + s = txn->Put("C", "c"); + ASSERT_OK(s); + + txn->SetSavePoint(); // 2 + + s = txn->Delete("B"); + ASSERT_OK(s); + + s = txn->Put("C", "cc"); + ASSERT_OK(s); + + s = txn->Put("D", "d"); + ASSERT_OK(s); + + ASSERT_EQ(5, txn->GetNumPuts()); + ASSERT_EQ(1, txn->GetNumDeletes()); + + ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 2 + + ASSERT_EQ(3, txn->GetNumPuts()); + ASSERT_EQ(0, txn->GetNumDeletes()); + + s = txn->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a", value); + + s = txn->Get(read_options, "B", &value); + ASSERT_OK(s); + ASSERT_EQ("bb", value); + + s = txn->Get(read_options, "C", &value); + ASSERT_OK(s); + ASSERT_EQ("c", value); + + s = txn->Get(read_options, "D", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = txn->Put("A", "a"); + ASSERT_OK(s); + + s = txn->Put("E", "e"); + ASSERT_OK(s); + + ASSERT_EQ(5, txn->GetNumPuts()); + ASSERT_EQ(0, txn->GetNumDeletes()); + + // Rollback to beginning of txn + s = txn->RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); + txn->Rollback(); + + ASSERT_EQ(0, txn->GetNumPuts()); + ASSERT_EQ(0, txn->GetNumDeletes()); + + s = txn->Get(read_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = txn->Get(read_options, "B", &value); + ASSERT_OK(s); + ASSERT_EQ("b", value); + + s = txn->Get(read_options, "D", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = txn->Get(read_options, "D", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = txn->Get(read_options, "E", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = txn->Put("A", "aa"); + ASSERT_OK(s); + + s = txn->Put("F", "f"); + ASSERT_OK(s); + + ASSERT_EQ(2, txn->GetNumPuts()); + ASSERT_EQ(0, txn->GetNumDeletes()); + + txn->SetSavePoint(); // 3 + txn->SetSavePoint(); // 4 + + s = txn->Put("G", "g"); + ASSERT_OK(s); + + s = txn->SingleDelete("F"); + ASSERT_OK(s); + + s = txn->Delete("B"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("aa", value); + + s = txn->Get(read_options, "F", &value); + // According to db.h, doing a SingleDelete on a key that has been + // overwritten will have undefinied behavior. So it is unclear what the + // result of fetching "F" should be. The current implementation will + // return NotFound in this case. + ASSERT_TRUE(s.IsNotFound()); + + s = txn->Get(read_options, "B", &value); + ASSERT_TRUE(s.IsNotFound()); + + ASSERT_EQ(3, txn->GetNumPuts()); + ASSERT_EQ(2, txn->GetNumDeletes()); + + ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 3 + + ASSERT_EQ(2, txn->GetNumPuts()); + ASSERT_EQ(0, txn->GetNumDeletes()); + + s = txn->Get(read_options, "F", &value); + ASSERT_OK(s); + ASSERT_EQ("f", value); + + s = txn->Get(read_options, "G", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = txn->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "F", &value); + ASSERT_OK(s); + ASSERT_EQ("f", value); + + s = db->Get(read_options, "G", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = db->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("aa", value); + + s = db->Get(read_options, "B", &value); + ASSERT_OK(s); + ASSERT_EQ("b", value); + + s = db->Get(read_options, "C", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = db->Get(read_options, "D", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = db->Get(read_options, "E", &value); + ASSERT_TRUE(s.IsNotFound()); + + delete txn; +} + +TEST_P(TransactionTest, SavepointTest2) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + Status s; + + txn_options.lock_timeout = 1; // 1 ms + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn1); + + s = txn1->Put("A", ""); + ASSERT_OK(s); + + txn1->SetSavePoint(); // 1 + + s = txn1->Put("A", "a"); + ASSERT_OK(s); + + s = txn1->Put("C", "c"); + ASSERT_OK(s); + + txn1->SetSavePoint(); // 2 + + s = txn1->Put("A", "a"); + ASSERT_OK(s); + s = txn1->Put("B", "b"); + ASSERT_OK(s); + + ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 2 + + // Verify that "A" and "C" is still locked while "B" is not + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn2); + + s = txn2->Put("A", "a2"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("C", "c2"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("B", "b2"); + ASSERT_OK(s); + + s = txn1->Put("A", "aa"); + ASSERT_OK(s); + s = txn1->Put("B", "bb"); + ASSERT_TRUE(s.IsTimedOut()); + + s = txn2->Commit(); + ASSERT_OK(s); + delete txn2; + + s = txn1->Put("A", "aaa"); + ASSERT_OK(s); + s = txn1->Put("B", "bbb"); + ASSERT_OK(s); + s = txn1->Put("C", "ccc"); + ASSERT_OK(s); + + txn1->SetSavePoint(); // 3 + ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 3 + + // Verify that "A", "B", "C" are still locked + txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn2); + + s = txn2->Put("A", "a2"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("B", "b2"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("C", "c2"); + ASSERT_TRUE(s.IsTimedOut()); + + ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 1 + + // Verify that only "A" is locked + s = txn2->Put("A", "a3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("B", "b3"); + ASSERT_OK(s); + s = txn2->Put("C", "c3po"); + ASSERT_OK(s); + + s = txn1->Commit(); + ASSERT_OK(s); + delete txn1; + + // Verify "A" "C" "B" are no longer locked + s = txn2->Put("A", "a4"); + ASSERT_OK(s); + s = txn2->Put("B", "b4"); + ASSERT_OK(s); + s = txn2->Put("C", "c4"); + ASSERT_OK(s); + + s = txn2->Commit(); + ASSERT_OK(s); + delete txn2; +} + +TEST_P(TransactionTest, SavepointTest3) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + Status s; + + txn_options.lock_timeout = 1; // 1 ms + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn1); + + s = txn1->PopSavePoint(); // No SavePoint present + ASSERT_TRUE(s.IsNotFound()); + + s = txn1->Put("A", ""); + ASSERT_OK(s); + + s = txn1->PopSavePoint(); // Still no SavePoint present + ASSERT_TRUE(s.IsNotFound()); + + txn1->SetSavePoint(); // 1 + + s = txn1->Put("A", "a"); + ASSERT_OK(s); + + s = txn1->PopSavePoint(); // Remove 1 + ASSERT_TRUE(txn1->RollbackToSavePoint().IsNotFound()); + + // Verify that "A" is still locked + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn2); + + s = txn2->Put("A", "a2"); + ASSERT_TRUE(s.IsTimedOut()); + delete txn2; + + txn1->SetSavePoint(); // 2 + + s = txn1->Put("B", "b"); + ASSERT_OK(s); + + txn1->SetSavePoint(); // 3 + + s = txn1->Put("B", "b2"); + ASSERT_OK(s); + + ASSERT_OK(txn1->RollbackToSavePoint()); // Roll back to 2 + + s = txn1->PopSavePoint(); + ASSERT_OK(s); + + s = txn1->PopSavePoint(); + ASSERT_TRUE(s.IsNotFound()); + + s = txn1->Commit(); + ASSERT_OK(s); + delete txn1; + + std::string value; + + // tnx1 should have modified "A" to "a" + s = db->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a", value); + + // tnx1 should have set "B" to just "b" + s = db->Get(read_options, "B", &value); + ASSERT_OK(s); + ASSERT_EQ("b", value); + + s = db->Get(read_options, "C", &value); + ASSERT_TRUE(s.IsNotFound()); +} + +TEST_P(TransactionTest, UndoGetForUpdateTest) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + std::string value; + Status s; + + txn_options.lock_timeout = 1; // 1 ms + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn1); + + txn1->UndoGetForUpdate("A"); + + s = txn1->Commit(); + ASSERT_OK(s); + delete txn1; + + txn1 = db->BeginTransaction(write_options, txn_options); + + txn1->UndoGetForUpdate("A"); + s = txn1->GetForUpdate(read_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + + // Verify that A is locked + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + s = txn2->Put("A", "a"); + ASSERT_TRUE(s.IsTimedOut()); + + txn1->UndoGetForUpdate("A"); + + // Verify that A is now unlocked + s = txn2->Put("A", "a2"); + ASSERT_OK(s); + txn2->Commit(); + delete txn2; + s = db->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a2", value); + + s = txn1->Delete("A"); + ASSERT_OK(s); + s = txn1->GetForUpdate(read_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = txn1->Put("B", "b3"); + ASSERT_OK(s); + s = txn1->GetForUpdate(read_options, "B", &value); + ASSERT_OK(s); + + txn1->UndoGetForUpdate("A"); + txn1->UndoGetForUpdate("B"); + + // Verify that A and B are still locked + txn2 = db->BeginTransaction(write_options, txn_options); + s = txn2->Put("A", "a4"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("B", "b4"); + ASSERT_TRUE(s.IsTimedOut()); + + txn1->Rollback(); + delete txn1; + + // Verify that A and B are no longer locked + s = txn2->Put("A", "a5"); + ASSERT_OK(s); + s = txn2->Put("B", "b5"); + ASSERT_OK(s); + s = txn2->Commit(); + delete txn2; + ASSERT_OK(s); + + txn1 = db->BeginTransaction(write_options, txn_options); + + s = txn1->GetForUpdate(read_options, "A", &value); + ASSERT_OK(s); + s = txn1->GetForUpdate(read_options, "A", &value); + ASSERT_OK(s); + s = txn1->GetForUpdate(read_options, "C", &value); + ASSERT_TRUE(s.IsNotFound()); + s = txn1->GetForUpdate(read_options, "A", &value); + ASSERT_OK(s); + s = txn1->GetForUpdate(read_options, "C", &value); + ASSERT_TRUE(s.IsNotFound()); + s = txn1->GetForUpdate(read_options, "B", &value); + ASSERT_OK(s); + s = txn1->Put("B", "b5"); + s = txn1->GetForUpdate(read_options, "B", &value); + ASSERT_OK(s); + + txn1->UndoGetForUpdate("A"); + txn1->UndoGetForUpdate("B"); + txn1->UndoGetForUpdate("C"); + txn1->UndoGetForUpdate("X"); + + // Verify A,B,C are locked + txn2 = db->BeginTransaction(write_options, txn_options); + s = txn2->Put("A", "a6"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Delete("B"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("C", "c6"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("X", "x6"); + ASSERT_OK(s); + + txn1->UndoGetForUpdate("A"); + txn1->UndoGetForUpdate("B"); + txn1->UndoGetForUpdate("C"); + txn1->UndoGetForUpdate("X"); + + // Verify A,B are locked and C is not + s = txn2->Put("A", "a6"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Delete("B"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("C", "c6"); + ASSERT_OK(s); + s = txn2->Put("X", "x6"); + ASSERT_OK(s); + + txn1->UndoGetForUpdate("A"); + txn1->UndoGetForUpdate("B"); + txn1->UndoGetForUpdate("C"); + txn1->UndoGetForUpdate("X"); + + // Verify B is locked and A and C are not + s = txn2->Put("A", "a7"); + ASSERT_OK(s); + s = txn2->Delete("B"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("C", "c7"); + ASSERT_OK(s); + s = txn2->Put("X", "x7"); + ASSERT_OK(s); + + s = txn2->Commit(); + ASSERT_OK(s); + delete txn2; + + s = txn1->Commit(); + ASSERT_OK(s); + delete txn1; +} + +TEST_P(TransactionTest, UndoGetForUpdateTest2) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + std::string value; + Status s; + + s = db->Put(write_options, "A", ""); + ASSERT_OK(s); + + txn_options.lock_timeout = 1; // 1 ms + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn1); + + s = txn1->GetForUpdate(read_options, "A", &value); + ASSERT_OK(s); + s = txn1->GetForUpdate(read_options, "B", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = txn1->Put("F", "f"); + ASSERT_OK(s); + + txn1->SetSavePoint(); // 1 + + txn1->UndoGetForUpdate("A"); + + s = txn1->GetForUpdate(read_options, "C", &value); + ASSERT_TRUE(s.IsNotFound()); + s = txn1->GetForUpdate(read_options, "D", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = txn1->Put("E", "e"); + ASSERT_OK(s); + s = txn1->GetForUpdate(read_options, "E", &value); + ASSERT_OK(s); + + s = txn1->GetForUpdate(read_options, "F", &value); + ASSERT_OK(s); + + // Verify A,B,C,D,E,F are still locked + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + s = txn2->Put("A", "a1"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("B", "b1"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("C", "c1"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("D", "d1"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("E", "e1"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("F", "f1"); + ASSERT_TRUE(s.IsTimedOut()); + + txn1->UndoGetForUpdate("C"); + txn1->UndoGetForUpdate("E"); + + // Verify A,B,D,E,F are still locked and C is not. + s = txn2->Put("A", "a2"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("B", "b2"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("D", "d2"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("E", "e2"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("F", "f2"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("C", "c2"); + ASSERT_OK(s); + + txn1->SetSavePoint(); // 2 + + s = txn1->Put("H", "h"); + ASSERT_OK(s); + + txn1->UndoGetForUpdate("A"); + txn1->UndoGetForUpdate("B"); + txn1->UndoGetForUpdate("C"); + txn1->UndoGetForUpdate("D"); + txn1->UndoGetForUpdate("E"); + txn1->UndoGetForUpdate("F"); + txn1->UndoGetForUpdate("G"); + txn1->UndoGetForUpdate("H"); + + // Verify A,B,D,E,F,H are still locked and C,G are not. + s = txn2->Put("A", "a3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("B", "b3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("D", "d3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("E", "e3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("F", "f3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("H", "h3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("C", "c3"); + ASSERT_OK(s); + s = txn2->Put("G", "g3"); + ASSERT_OK(s); + + txn1->RollbackToSavePoint(); // rollback to 2 + + // Verify A,B,D,E,F are still locked and C,G,H are not. + s = txn2->Put("A", "a3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("B", "b3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("D", "d3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("E", "e3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("F", "f3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("C", "c3"); + ASSERT_OK(s); + s = txn2->Put("G", "g3"); + ASSERT_OK(s); + s = txn2->Put("H", "h3"); + ASSERT_OK(s); + + txn1->UndoGetForUpdate("A"); + txn1->UndoGetForUpdate("B"); + txn1->UndoGetForUpdate("C"); + txn1->UndoGetForUpdate("D"); + txn1->UndoGetForUpdate("E"); + txn1->UndoGetForUpdate("F"); + txn1->UndoGetForUpdate("G"); + txn1->UndoGetForUpdate("H"); + + // Verify A,B,E,F are still locked and C,D,G,H are not. + s = txn2->Put("A", "a3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("B", "b3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("E", "e3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("F", "f3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("C", "c3"); + ASSERT_OK(s); + s = txn2->Put("D", "d3"); + ASSERT_OK(s); + s = txn2->Put("G", "g3"); + ASSERT_OK(s); + s = txn2->Put("H", "h3"); + ASSERT_OK(s); + + txn1->RollbackToSavePoint(); // rollback to 1 + + // Verify A,B,F are still locked and C,D,E,G,H are not. + s = txn2->Put("A", "a3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("B", "b3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("F", "f3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("C", "c3"); + ASSERT_OK(s); + s = txn2->Put("D", "d3"); + ASSERT_OK(s); + s = txn2->Put("E", "e3"); + ASSERT_OK(s); + s = txn2->Put("G", "g3"); + ASSERT_OK(s); + s = txn2->Put("H", "h3"); + ASSERT_OK(s); + + txn1->UndoGetForUpdate("A"); + txn1->UndoGetForUpdate("B"); + txn1->UndoGetForUpdate("C"); + txn1->UndoGetForUpdate("D"); + txn1->UndoGetForUpdate("E"); + txn1->UndoGetForUpdate("F"); + txn1->UndoGetForUpdate("G"); + txn1->UndoGetForUpdate("H"); + + // Verify F is still locked and A,B,C,D,E,G,H are not. + s = txn2->Put("F", "f3"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Put("A", "a3"); + ASSERT_OK(s); + s = txn2->Put("B", "b3"); + ASSERT_OK(s); + s = txn2->Put("C", "c3"); + ASSERT_OK(s); + s = txn2->Put("D", "d3"); + ASSERT_OK(s); + s = txn2->Put("E", "e3"); + ASSERT_OK(s); + s = txn2->Put("G", "g3"); + ASSERT_OK(s); + s = txn2->Put("H", "h3"); + ASSERT_OK(s); + + s = txn1->Commit(); + ASSERT_OK(s); + s = txn2->Commit(); + ASSERT_OK(s); + + delete txn1; + delete txn2; +} + +TEST_P(TransactionTest, TimeoutTest) { + WriteOptions write_options; + ReadOptions read_options; + std::string value; + Status s; + + delete db; + db = nullptr; + + // transaction writes have an infinite timeout, + // but we will override this when we start a txn + // db writes have infinite timeout + txn_db_options.transaction_lock_timeout = -1; + txn_db_options.default_lock_timeout = -1; + + s = TransactionDB::Open(options, txn_db_options, dbname, &db); + assert(db != nullptr); + ASSERT_OK(s); + + s = db->Put(write_options, "aaa", "aaa"); + ASSERT_OK(s); + + TransactionOptions txn_options0; + txn_options0.expiration = 100; // 100ms + txn_options0.lock_timeout = 50; // txn timeout no longer infinite + Transaction* txn1 = db->BeginTransaction(write_options, txn_options0); + + s = txn1->GetForUpdate(read_options, "aaa", nullptr); + ASSERT_OK(s); + + // Conflicts with previous GetForUpdate. + // Since db writes do not have a timeout, this should eventually succeed when + // the transaction expires. + s = db->Put(write_options, "aaa", "xxx"); + ASSERT_OK(s); + + ASSERT_GE(txn1->GetElapsedTime(), + static_cast<uint64_t>(txn_options0.expiration)); + + s = txn1->Commit(); + ASSERT_TRUE(s.IsExpired()); // expired! + + s = db->Get(read_options, "aaa", &value); + ASSERT_OK(s); + ASSERT_EQ("xxx", value); + + delete txn1; + delete db; + + // transaction writes have 10ms timeout, + // db writes have infinite timeout + txn_db_options.transaction_lock_timeout = 50; + txn_db_options.default_lock_timeout = -1; + + s = TransactionDB::Open(options, txn_db_options, dbname, &db); + ASSERT_OK(s); + + s = db->Put(write_options, "aaa", "aaa"); + ASSERT_OK(s); + + TransactionOptions txn_options; + txn_options.expiration = 100; // 100ms + txn1 = db->BeginTransaction(write_options, txn_options); + + s = txn1->GetForUpdate(read_options, "aaa", nullptr); + ASSERT_OK(s); + + // Conflicts with previous GetForUpdate. + // Since db writes do not have a timeout, this should eventually succeed when + // the transaction expires. + s = db->Put(write_options, "aaa", "xxx"); + ASSERT_OK(s); + + s = txn1->Commit(); + ASSERT_NOK(s); // expired! + + s = db->Get(read_options, "aaa", &value); + ASSERT_OK(s); + ASSERT_EQ("xxx", value); + + delete txn1; + txn_options.expiration = 6000000; // 100 minutes + txn_options.lock_timeout = 1; // 1ms + txn1 = db->BeginTransaction(write_options, txn_options); + txn1->SetLockTimeout(100); + + TransactionOptions txn_options2; + txn_options2.expiration = 10; // 10ms + Transaction* txn2 = db->BeginTransaction(write_options, txn_options2); + ASSERT_OK(s); + + s = txn2->Put("a", "2"); + ASSERT_OK(s); + + // txn1 has a lock timeout longer than txn2's expiration, so it will win + s = txn1->Delete("a"); + ASSERT_OK(s); + + s = txn1->Commit(); + ASSERT_OK(s); + + // txn2 should be expired out since txn1 waiting until its timeout expired. + s = txn2->Commit(); + ASSERT_TRUE(s.IsExpired()); + + delete txn1; + delete txn2; + txn_options.expiration = 6000000; // 100 minutes + txn1 = db->BeginTransaction(write_options, txn_options); + txn_options2.expiration = 100000000; + txn2 = db->BeginTransaction(write_options, txn_options2); + + s = txn1->Delete("asdf"); + ASSERT_OK(s); + + // txn2 has a smaller lock timeout than txn1's expiration, so it will time out + s = txn2->Delete("asdf"); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + s = txn1->Commit(); + ASSERT_OK(s); + + s = txn2->Put("asdf", "asdf"); + ASSERT_OK(s); + + s = txn2->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "asdf", &value); + ASSERT_OK(s); + ASSERT_EQ("asdf", value); + + delete txn1; + delete txn2; +} + +TEST_P(TransactionTest, SingleDeleteTest) { + WriteOptions write_options; + ReadOptions read_options; + std::string value; + Status s; + + Transaction* txn = db->BeginTransaction(write_options); + ASSERT_TRUE(txn); + + s = txn->SingleDelete("A"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = txn->Commit(); + ASSERT_OK(s); + delete txn; + + txn = db->BeginTransaction(write_options); + + s = txn->SingleDelete("A"); + ASSERT_OK(s); + + s = txn->Put("A", "a"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a", value); + + s = txn->Commit(); + ASSERT_OK(s); + delete txn; + + s = db->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a", value); + + txn = db->BeginTransaction(write_options); + + s = txn->SingleDelete("A"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = txn->Commit(); + ASSERT_OK(s); + delete txn; + + s = db->Get(read_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + + txn = db->BeginTransaction(write_options); + Transaction* txn2 = db->BeginTransaction(write_options); + txn2->SetSnapshot(); + + s = txn->Put("A", "a"); + ASSERT_OK(s); + + s = txn->Put("A", "a2"); + ASSERT_OK(s); + + s = txn->SingleDelete("A"); + ASSERT_OK(s); + + s = txn->SingleDelete("B"); + ASSERT_OK(s); + + // According to db.h, doing a SingleDelete on a key that has been + // overwritten will have undefinied behavior. So it is unclear what the + // result of fetching "A" should be. The current implementation will + // return NotFound in this case. + s = txn->Get(read_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = txn2->Put("B", "b"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Commit(); + ASSERT_OK(s); + delete txn2; + + s = txn->Commit(); + ASSERT_OK(s); + delete txn; + + // According to db.h, doing a SingleDelete on a key that has been + // overwritten will have undefinied behavior. So it is unclear what the + // result of fetching "A" should be. The current implementation will + // return NotFound in this case. + s = db->Get(read_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = db->Get(read_options, "B", &value); + ASSERT_TRUE(s.IsNotFound()); +} + +TEST_P(TransactionTest, MergeTest) { + WriteOptions write_options; + ReadOptions read_options; + std::string value; + Status s; + + Transaction* txn = db->BeginTransaction(write_options, TransactionOptions()); + ASSERT_TRUE(txn); + + s = db->Put(write_options, "A", "a0"); + ASSERT_OK(s); + + s = txn->Merge("A", "1"); + ASSERT_OK(s); + + s = txn->Merge("A", "2"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + s = txn->Put("A", "a"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a", value); + + s = txn->Merge("A", "3"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + TransactionOptions txn_options; + txn_options.lock_timeout = 1; // 1 ms + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn2); + + // verify that txn has "A" locked + s = txn2->Merge("A", "4"); + ASSERT_TRUE(s.IsTimedOut()); + + s = txn2->Commit(); + ASSERT_OK(s); + delete txn2; + + s = txn->Commit(); + ASSERT_OK(s); + delete txn; + + s = db->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a,3", value); +} + +TEST_P(TransactionTest, DeferSnapshotTest) { + WriteOptions write_options; + ReadOptions read_options; + std::string value; + Status s; + + s = db->Put(write_options, "A", "a0"); + ASSERT_OK(s); + + Transaction* txn1 = db->BeginTransaction(write_options); + Transaction* txn2 = db->BeginTransaction(write_options); + + txn1->SetSnapshotOnNextOperation(); + auto snapshot = txn1->GetSnapshot(); + ASSERT_FALSE(snapshot); + + s = txn2->Put("A", "a2"); + ASSERT_OK(s); + s = txn2->Commit(); + ASSERT_OK(s); + delete txn2; + + s = txn1->GetForUpdate(read_options, "A", &value); + // Should not conflict with txn2 since snapshot wasn't set until + // GetForUpdate was called. + ASSERT_OK(s); + ASSERT_EQ("a2", value); + + s = txn1->Put("A", "a1"); + ASSERT_OK(s); + + s = db->Put(write_options, "B", "b0"); + ASSERT_OK(s); + + // Cannot lock B since it was written after the snapshot was set + s = txn1->Put("B", "b1"); + ASSERT_TRUE(s.IsBusy()); + + s = txn1->Commit(); + ASSERT_OK(s); + delete txn1; + + s = db->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a1", value); + + s = db->Get(read_options, "B", &value); + ASSERT_OK(s); + ASSERT_EQ("b0", value); +} + +TEST_P(TransactionTest, DeferSnapshotTest2) { + WriteOptions write_options; + ReadOptions read_options, snapshot_read_options; + std::string value; + Status s; + + Transaction* txn1 = db->BeginTransaction(write_options); + + txn1->SetSnapshot(); + + s = txn1->Put("A", "a1"); + ASSERT_OK(s); + + s = db->Put(write_options, "C", "c0"); + ASSERT_OK(s); + s = db->Put(write_options, "D", "d0"); + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn1->GetSnapshot(); + + txn1->SetSnapshotOnNextOperation(); + + s = txn1->Get(snapshot_read_options, "C", &value); + // Snapshot was set before C was written + ASSERT_TRUE(s.IsNotFound()); + s = txn1->Get(snapshot_read_options, "D", &value); + // Snapshot was set before D was written + ASSERT_TRUE(s.IsNotFound()); + + // Snapshot should not have changed yet. + snapshot_read_options.snapshot = txn1->GetSnapshot(); + + s = txn1->Get(snapshot_read_options, "C", &value); + // Snapshot was set before C was written + ASSERT_TRUE(s.IsNotFound()); + s = txn1->Get(snapshot_read_options, "D", &value); + // Snapshot was set before D was written + ASSERT_TRUE(s.IsNotFound()); + + s = txn1->GetForUpdate(read_options, "C", &value); + ASSERT_OK(s); + ASSERT_EQ("c0", value); + + s = db->Put(write_options, "D", "d00"); + ASSERT_OK(s); + + // Snapshot is now set + snapshot_read_options.snapshot = txn1->GetSnapshot(); + s = txn1->Get(snapshot_read_options, "D", &value); + ASSERT_OK(s); + ASSERT_EQ("d0", value); + + s = txn1->Commit(); + ASSERT_OK(s); + delete txn1; +} + +TEST_P(TransactionTest, DeferSnapshotSavePointTest) { + WriteOptions write_options; + ReadOptions read_options, snapshot_read_options; + std::string value; + Status s; + + Transaction* txn1 = db->BeginTransaction(write_options); + + txn1->SetSavePoint(); // 1 + + s = db->Put(write_options, "T", "1"); + ASSERT_OK(s); + + txn1->SetSnapshotOnNextOperation(); + + s = db->Put(write_options, "T", "2"); + ASSERT_OK(s); + + txn1->SetSavePoint(); // 2 + + s = db->Put(write_options, "T", "3"); + ASSERT_OK(s); + + s = txn1->Put("A", "a"); + ASSERT_OK(s); + + txn1->SetSavePoint(); // 3 + + s = db->Put(write_options, "T", "4"); + ASSERT_OK(s); + + txn1->SetSnapshot(); + txn1->SetSnapshotOnNextOperation(); + + txn1->SetSavePoint(); // 4 + + s = db->Put(write_options, "T", "5"); + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn1->GetSnapshot(); + s = txn1->Get(snapshot_read_options, "T", &value); + ASSERT_OK(s); + ASSERT_EQ("4", value); + + s = txn1->Put("A", "a1"); + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn1->GetSnapshot(); + s = txn1->Get(snapshot_read_options, "T", &value); + ASSERT_OK(s); + ASSERT_EQ("5", value); + + s = txn1->RollbackToSavePoint(); // Rollback to 4 + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn1->GetSnapshot(); + s = txn1->Get(snapshot_read_options, "T", &value); + ASSERT_OK(s); + ASSERT_EQ("4", value); + + s = txn1->RollbackToSavePoint(); // Rollback to 3 + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn1->GetSnapshot(); + s = txn1->Get(snapshot_read_options, "T", &value); + ASSERT_OK(s); + ASSERT_EQ("3", value); + + s = txn1->Get(read_options, "T", &value); + ASSERT_OK(s); + ASSERT_EQ("5", value); + + s = txn1->RollbackToSavePoint(); // Rollback to 2 + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn1->GetSnapshot(); + ASSERT_FALSE(snapshot_read_options.snapshot); + s = txn1->Get(snapshot_read_options, "T", &value); + ASSERT_OK(s); + ASSERT_EQ("5", value); + + s = txn1->Delete("A"); + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn1->GetSnapshot(); + ASSERT_TRUE(snapshot_read_options.snapshot); + s = txn1->Get(snapshot_read_options, "T", &value); + ASSERT_OK(s); + ASSERT_EQ("5", value); + + s = txn1->RollbackToSavePoint(); // Rollback to 1 + ASSERT_OK(s); + + s = txn1->Delete("A"); + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn1->GetSnapshot(); + ASSERT_FALSE(snapshot_read_options.snapshot); + s = txn1->Get(snapshot_read_options, "T", &value); + ASSERT_OK(s); + ASSERT_EQ("5", value); + + s = txn1->Commit(); + ASSERT_OK(s); + + delete txn1; +} + +TEST_P(TransactionTest, SetSnapshotOnNextOperationWithNotification) { + WriteOptions write_options; + ReadOptions read_options; + std::string value; + + class Notifier : public TransactionNotifier { + private: + const Snapshot** snapshot_ptr_; + + public: + explicit Notifier(const Snapshot** snapshot_ptr) + : snapshot_ptr_(snapshot_ptr) {} + + void SnapshotCreated(const Snapshot* newSnapshot) override { + *snapshot_ptr_ = newSnapshot; + } + }; + + std::shared_ptr<Notifier> notifier = + std::make_shared<Notifier>(&read_options.snapshot); + Status s; + + s = db->Put(write_options, "B", "0"); + ASSERT_OK(s); + + Transaction* txn1 = db->BeginTransaction(write_options); + + txn1->SetSnapshotOnNextOperation(notifier); + ASSERT_FALSE(read_options.snapshot); + + s = db->Put(write_options, "B", "1"); + ASSERT_OK(s); + + // A Get does not generate the snapshot + s = txn1->Get(read_options, "B", &value); + ASSERT_OK(s); + ASSERT_FALSE(read_options.snapshot); + ASSERT_EQ(value, "1"); + + // Any other operation does + s = txn1->Put("A", "0"); + ASSERT_OK(s); + + // Now change "B". + s = db->Put(write_options, "B", "2"); + ASSERT_OK(s); + + // The original value should still be read + s = txn1->Get(read_options, "B", &value); + ASSERT_OK(s); + ASSERT_TRUE(read_options.snapshot); + ASSERT_EQ(value, "1"); + + s = txn1->Commit(); + ASSERT_OK(s); + + delete txn1; +} + +TEST_P(TransactionTest, ClearSnapshotTest) { + WriteOptions write_options; + ReadOptions read_options, snapshot_read_options; + std::string value; + Status s; + + s = db->Put(write_options, "foo", "0"); + ASSERT_OK(s); + + Transaction* txn = db->BeginTransaction(write_options); + ASSERT_TRUE(txn); + + s = db->Put(write_options, "foo", "1"); + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn->GetSnapshot(); + ASSERT_FALSE(snapshot_read_options.snapshot); + + // No snapshot created yet + s = txn->Get(snapshot_read_options, "foo", &value); + ASSERT_EQ(value, "1"); + + txn->SetSnapshot(); + snapshot_read_options.snapshot = txn->GetSnapshot(); + ASSERT_TRUE(snapshot_read_options.snapshot); + + s = db->Put(write_options, "foo", "2"); + ASSERT_OK(s); + + // Snapshot was created before change to '2' + s = txn->Get(snapshot_read_options, "foo", &value); + ASSERT_EQ(value, "1"); + + txn->ClearSnapshot(); + snapshot_read_options.snapshot = txn->GetSnapshot(); + ASSERT_FALSE(snapshot_read_options.snapshot); + + // Snapshot has now been cleared + s = txn->Get(snapshot_read_options, "foo", &value); + ASSERT_EQ(value, "2"); + + s = txn->Commit(); + ASSERT_OK(s); + + delete txn; +} + +TEST_P(TransactionTest, ToggleAutoCompactionTest) { + Status s; + + ColumnFamilyHandle *cfa, *cfb; + ColumnFamilyOptions cf_options; + + // Create 2 new column families + s = db->CreateColumnFamily(cf_options, "CFA", &cfa); + ASSERT_OK(s); + s = db->CreateColumnFamily(cf_options, "CFB", &cfb); + ASSERT_OK(s); + + delete cfa; + delete cfb; + delete db; + + // open DB with three column families + std::vector<ColumnFamilyDescriptor> column_families; + // have to open default column family + column_families.push_back( + ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions())); + // open the new column families + column_families.push_back( + ColumnFamilyDescriptor("CFA", ColumnFamilyOptions())); + column_families.push_back( + ColumnFamilyDescriptor("CFB", ColumnFamilyOptions())); + + ColumnFamilyOptions* cf_opt_default = &column_families[0].options; + ColumnFamilyOptions* cf_opt_cfa = &column_families[1].options; + ColumnFamilyOptions* cf_opt_cfb = &column_families[2].options; + cf_opt_default->disable_auto_compactions = false; + cf_opt_cfa->disable_auto_compactions = true; + cf_opt_cfb->disable_auto_compactions = false; + + std::vector<ColumnFamilyHandle*> handles; + + s = TransactionDB::Open(options, txn_db_options, dbname, column_families, + &handles, &db); + ASSERT_OK(s); + + auto cfh_default = reinterpret_cast<ColumnFamilyHandleImpl*>(handles[0]); + auto opt_default = *cfh_default->cfd()->GetLatestMutableCFOptions(); + + auto cfh_a = reinterpret_cast<ColumnFamilyHandleImpl*>(handles[1]); + auto opt_a = *cfh_a->cfd()->GetLatestMutableCFOptions(); + + auto cfh_b = reinterpret_cast<ColumnFamilyHandleImpl*>(handles[2]); + auto opt_b = *cfh_b->cfd()->GetLatestMutableCFOptions(); + + ASSERT_EQ(opt_default.disable_auto_compactions, false); + ASSERT_EQ(opt_a.disable_auto_compactions, true); + ASSERT_EQ(opt_b.disable_auto_compactions, false); + + for (auto handle : handles) { + delete handle; + } +} + +TEST_P(TransactionStressTest, ExpiredTransactionDataRace1) { + // In this test, txn1 should succeed committing, + // as the callback is called after txn1 starts committing. + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"TransactionTest::ExpirableTransactionDataRace:1"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "TransactionTest::ExpirableTransactionDataRace:1", [&](void* /*arg*/) { + WriteOptions write_options; + TransactionOptions txn_options; + + // Force txn1 to expire + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + Status s; + s = txn2->Put("X", "2"); + ASSERT_TRUE(s.IsTimedOut()); + s = txn2->Commit(); + ASSERT_OK(s); + delete txn2; + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + WriteOptions write_options; + TransactionOptions txn_options; + + txn_options.expiration = 100; + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + + Status s; + s = txn1->Put("X", "1"); + ASSERT_OK(s); + s = txn1->Commit(); + ASSERT_OK(s); + + ReadOptions read_options; + string value; + s = db->Get(read_options, "X", &value); + ASSERT_EQ("1", value); + + delete txn1; + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +#ifndef ROCKSDB_VALGRIND_RUN +namespace { +// cmt_delay_ms is the delay between prepare and commit +// first_id is the id of the first transaction +Status TransactionStressTestInserter( + TransactionDB* db, const size_t num_transactions, const size_t num_sets, + const size_t num_keys_per_set, Random64* rand, + const uint64_t cmt_delay_ms = 0, const uint64_t first_id = 0) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + // Inside the inserter we might also retake the snapshot. We do both since two + // separte functions are engaged for each. + txn_options.set_snapshot = rand->OneIn(2); + + RandomTransactionInserter inserter( + rand, write_options, read_options, num_keys_per_set, + static_cast<uint16_t>(num_sets), cmt_delay_ms, first_id); + + for (size_t t = 0; t < num_transactions; t++) { + bool success = inserter.TransactionDBInsert(db, txn_options); + if (!success) { + // unexpected failure + return inserter.GetLastStatus(); + } + } + + // Make sure at least some of the transactions succeeded. It's ok if + // some failed due to write-conflicts. + if (num_transactions != 1 && + inserter.GetFailureCount() > num_transactions / 2) { + return Status::TryAgain("Too many transactions failed! " + + std::to_string(inserter.GetFailureCount()) + " / " + + std::to_string(num_transactions)); + } + + return Status::OK(); +} +} // namespace + +// Worker threads add a number to a key from each set of keys. The checker +// threads verify that the sum of all keys in each set are equal. +TEST_P(MySQLStyleTransactionTest, TransactionStressTest) { + // Small write buffer to trigger more compactions + options.write_buffer_size = 1024; + ReOpenNoDelete(); + const size_t num_workers = 4; // worker threads count + const size_t num_checkers = 2; // checker threads count + const size_t num_slow_checkers = 2; // checker threads emulating backups + const size_t num_slow_workers = 1; // slow worker threads count + const size_t num_transactions_per_thread = 10000; + const uint16_t num_sets = 3; + const size_t num_keys_per_set = 100; + // Setting the key-space to be 100 keys should cause enough write-conflicts + // to make this test interesting. + + std::vector<port::Thread> threads; + std::atomic<uint32_t> finished = {0}; + bool TAKE_SNAPSHOT = true; + uint64_t time_seed = env->NowMicros(); + printf("time_seed is %" PRIu64 "\n", time_seed); // would help to reproduce + + std::function<void()> call_inserter = [&] { + size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id()); + Random64 rand(time_seed * thd_seed); + ASSERT_OK(TransactionStressTestInserter(db, num_transactions_per_thread, + num_sets, num_keys_per_set, &rand)); + finished++; + }; + std::function<void()> call_checker = [&] { + size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id()); + Random64 rand(time_seed * thd_seed); + // Verify that data is consistent + while (finished < num_workers) { + Status s = RandomTransactionInserter::Verify( + db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand); + ASSERT_OK(s); + } + }; + std::function<void()> call_slow_checker = [&] { + size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id()); + Random64 rand(time_seed * thd_seed); + // Verify that data is consistent + while (finished < num_workers) { + uint64_t delay_ms = rand.Uniform(100) + 1; + Status s = RandomTransactionInserter::Verify( + db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand, delay_ms); + ASSERT_OK(s); + } + }; + std::function<void()> call_slow_inserter = [&] { + size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id()); + Random64 rand(time_seed * thd_seed); + uint64_t id = 0; + // Verify that data is consistent + while (finished < num_workers) { + uint64_t delay_ms = rand.Uniform(500) + 1; + ASSERT_OK(TransactionStressTestInserter(db, 1, num_sets, num_keys_per_set, + &rand, delay_ms, id++)); + } + }; + + for (uint32_t i = 0; i < num_workers; i++) { + threads.emplace_back(call_inserter); + } + for (uint32_t i = 0; i < num_checkers; i++) { + threads.emplace_back(call_checker); + } + if (with_slow_threads_) { + for (uint32_t i = 0; i < num_slow_checkers; i++) { + threads.emplace_back(call_slow_checker); + } + for (uint32_t i = 0; i < num_slow_workers; i++) { + threads.emplace_back(call_slow_inserter); + } + } + + // Wait for all threads to finish + for (auto& t : threads) { + t.join(); + } + + // Verify that data is consistent + Status s = RandomTransactionInserter::Verify(db, num_sets, num_keys_per_set, + !TAKE_SNAPSHOT); + ASSERT_OK(s); +} +#endif // ROCKSDB_VALGRIND_RUN + +TEST_P(TransactionTest, MemoryLimitTest) { + TransactionOptions txn_options; + // Header (12 bytes) + NOOP (1 byte) + 2 * 8 bytes for data. + txn_options.max_write_batch_size = 29; + std::string value; + Status s; + + Transaction* txn = db->BeginTransaction(WriteOptions(), txn_options); + ASSERT_TRUE(txn); + + ASSERT_EQ(0, txn->GetNumPuts()); + ASSERT_LE(0, txn->GetID()); + + s = txn->Put(Slice("a"), Slice("....")); + ASSERT_OK(s); + ASSERT_EQ(1, txn->GetNumPuts()); + + s = txn->Put(Slice("b"), Slice("....")); + ASSERT_OK(s); + ASSERT_EQ(2, txn->GetNumPuts()); + + s = txn->Put(Slice("b"), Slice("....")); + auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db); + // For write unprepared, write batches exceeding max_write_batch_size will + // just flush to DB instead of returning a memory limit error. + if (pdb->GetTxnDBOptions().write_policy != WRITE_UNPREPARED) { + ASSERT_TRUE(s.IsMemoryLimit()); + ASSERT_EQ(2, txn->GetNumPuts()); + } else { + ASSERT_OK(s); + ASSERT_EQ(3, txn->GetNumPuts()); + } + + txn->Rollback(); + delete txn; +} + +// This test clarifies the existing expectation from the sequence number +// algorithm. It could detect mistakes in updating the code but it is not +// necessarily the one acceptable way. If the algorithm is legitimately changed, +// this unit test should be updated as well. +TEST_P(TransactionStressTest, SeqAdvanceTest) { + // TODO(myabandeh): must be test with false before new releases + const bool short_test = true; + WriteOptions wopts; + FlushOptions fopt; + + options.disable_auto_compactions = true; + ASSERT_OK(ReOpen()); + + // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some + // of the branches. This is the same as counting a binary number where i-th + // bit represents whether we take branch i in the represented by the number. + const size_t NUM_BRANCHES = short_test ? 6 : 10; + // Helper function that shows if the branch is to be taken in the run + // represented by the number n. + auto branch_do = [&](size_t n, size_t* branch) { + assert(*branch < NUM_BRANCHES); + const size_t filter = static_cast<size_t>(1) << *branch; + return n & filter; + }; + const size_t max_n = static_cast<size_t>(1) << NUM_BRANCHES; + for (size_t n = 0; n < max_n; n++) { + DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + size_t branch = 0; + auto seq = db_impl->GetLatestSequenceNumber(); + exp_seq = seq; + txn_t0(0); + seq = db_impl->TEST_GetLastVisibleSequence(); + ASSERT_EQ(exp_seq, seq); + + if (branch_do(n, &branch)) { + ASSERT_OK(db_impl->Flush(fopt)); + seq = db_impl->TEST_GetLastVisibleSequence(); + ASSERT_EQ(exp_seq, seq); + } + if (!short_test && branch_do(n, &branch)) { + ASSERT_OK(db_impl->FlushWAL(true)); + ASSERT_OK(ReOpenNoDelete()); + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + + // Doing it twice might detect some bugs + txn_t0(1); + seq = db_impl->TEST_GetLastVisibleSequence(); + ASSERT_EQ(exp_seq, seq); + + txn_t1(0); + seq = db_impl->TEST_GetLastVisibleSequence(); + ASSERT_EQ(exp_seq, seq); + + if (branch_do(n, &branch)) { + ASSERT_OK(db_impl->Flush(fopt)); + seq = db_impl->TEST_GetLastVisibleSequence(); + ASSERT_EQ(exp_seq, seq); + } + if (!short_test && branch_do(n, &branch)) { + ASSERT_OK(db_impl->FlushWAL(true)); + ASSERT_OK(ReOpenNoDelete()); + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + + txn_t3(0); + seq = db_impl->TEST_GetLastVisibleSequence(); + ASSERT_EQ(exp_seq, seq); + + if (branch_do(n, &branch)) { + ASSERT_OK(db_impl->Flush(fopt)); + seq = db_impl->TEST_GetLastVisibleSequence(); + ASSERT_EQ(exp_seq, seq); + } + if (!short_test && branch_do(n, &branch)) { + ASSERT_OK(db_impl->FlushWAL(true)); + ASSERT_OK(ReOpenNoDelete()); + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + + txn_t4(0); + seq = db_impl->TEST_GetLastVisibleSequence(); + + ASSERT_EQ(exp_seq, seq); + + if (branch_do(n, &branch)) { + ASSERT_OK(db_impl->Flush(fopt)); + seq = db_impl->TEST_GetLastVisibleSequence(); + ASSERT_EQ(exp_seq, seq); + } + if (!short_test && branch_do(n, &branch)) { + ASSERT_OK(db_impl->FlushWAL(true)); + ASSERT_OK(ReOpenNoDelete()); + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + + txn_t2(0); + seq = db_impl->TEST_GetLastVisibleSequence(); + ASSERT_EQ(exp_seq, seq); + + if (branch_do(n, &branch)) { + ASSERT_OK(db_impl->Flush(fopt)); + seq = db_impl->TEST_GetLastVisibleSequence(); + ASSERT_EQ(exp_seq, seq); + } + if (!short_test && branch_do(n, &branch)) { + ASSERT_OK(db_impl->FlushWAL(true)); + ASSERT_OK(ReOpenNoDelete()); + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + ASSERT_OK(ReOpen()); + } +} + +// Verify that the optimization would not compromize the correctness +TEST_P(TransactionTest, Optimizations) { + size_t comb_cnt = size_t(1) << 2; // 2 is number of optimization vars + for (size_t new_comb = 0; new_comb < comb_cnt; new_comb++) { + TransactionDBWriteOptimizations optimizations; + optimizations.skip_concurrency_control = IsInCombination(0, new_comb); + optimizations.skip_duplicate_key_check = IsInCombination(1, new_comb); + + ASSERT_OK(ReOpen()); + WriteOptions write_options; + WriteBatch batch; + batch.Put(Slice("k"), Slice("v1")); + ASSERT_OK(db->Write(write_options, &batch)); + + ReadOptions ropt; + PinnableSlice pinnable_val; + ASSERT_OK(db->Get(ropt, db->DefaultColumnFamily(), "k", &pinnable_val)); + ASSERT_TRUE(pinnable_val == ("v1")); + } +} + +// A comparator that uses only the first three bytes +class ThreeBytewiseComparator : public Comparator { + public: + ThreeBytewiseComparator() {} + const char* Name() const override { return "test.ThreeBytewiseComparator"; } + int Compare(const Slice& a, const Slice& b) const override { + Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3); + Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3); + return na.compare(nb); + } + bool Equal(const Slice& a, const Slice& b) const override { + Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3); + Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3); + return na == nb; + } + // This methods below dont seem relevant to this test. Implement them if + // proven othersize. + void FindShortestSeparator(std::string* start, + const Slice& limit) const override { + const Comparator* bytewise_comp = BytewiseComparator(); + bytewise_comp->FindShortestSeparator(start, limit); + } + void FindShortSuccessor(std::string* key) const override { + const Comparator* bytewise_comp = BytewiseComparator(); + bytewise_comp->FindShortSuccessor(key); + } +}; + +// Test that the transactional db can handle duplicate keys in the write batch +TEST_P(TransactionTest, DuplicateKeys) { + ColumnFamilyOptions cf_options; + std::string cf_name = "two"; + ColumnFamilyHandle* cf_handle = nullptr; + { + ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); + WriteOptions write_options; + WriteBatch batch; + batch.Put(Slice("key"), Slice("value")); + batch.Put(Slice("key2"), Slice("value2")); + // duplicate the keys + batch.Put(Slice("key"), Slice("value3")); + // duplicate the 2nd key. It should not be counted duplicate since a + // sub-patch is cut after the last duplicate. + batch.Put(Slice("key2"), Slice("value4")); + // duplicate the keys but in a different cf. It should not be counted as + // duplicate keys + batch.Put(cf_handle, Slice("key"), Slice("value5")); + + ASSERT_OK(db->Write(write_options, &batch)); + + ReadOptions ropt; + PinnableSlice pinnable_val; + auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("value3")); + s = db->Get(ropt, db->DefaultColumnFamily(), "key2", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("value4")); + s = db->Get(ropt, cf_handle, "key", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("value5")); + + delete cf_handle; + } + + // Test with non-bytewise comparator + { + ASSERT_OK(ReOpen()); + std::unique_ptr<const Comparator> comp_gc(new ThreeBytewiseComparator()); + cf_options.comparator = comp_gc.get(); + ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); + WriteOptions write_options; + WriteBatch batch; + batch.Put(cf_handle, Slice("key"), Slice("value")); + // The first three bytes are the same, do it must be counted as duplicate + batch.Put(cf_handle, Slice("key2"), Slice("value2")); + // check for 2nd duplicate key in cf with non-default comparator + batch.Put(cf_handle, Slice("key2b"), Slice("value2b")); + ASSERT_OK(db->Write(write_options, &batch)); + + // The value must be the most recent value for all the keys equal to "key", + // including "key2" + ReadOptions ropt; + PinnableSlice pinnable_val; + ASSERT_OK(db->Get(ropt, cf_handle, "key", &pinnable_val)); + ASSERT_TRUE(pinnable_val == ("value2b")); + + // Test duplicate keys with rollback + TransactionOptions txn_options; + Transaction* txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(cf_handle, Slice("key3"), Slice("value3"))); + ASSERT_OK(txn0->Merge(cf_handle, Slice("key4"), Slice("value4"))); + ASSERT_OK(txn0->Rollback()); + ASSERT_OK(db->Get(ropt, cf_handle, "key5", &pinnable_val)); + ASSERT_TRUE(pinnable_val == ("value2b")); + delete txn0; + + delete cf_handle; + cf_options.comparator = BytewiseComparator(); + } + + for (bool do_prepare : {true, false}) { + for (bool do_rollback : {true, false}) { + for (bool with_commit_batch : {true, false}) { + if (with_commit_batch && !do_prepare) { + continue; + } + if (with_commit_batch && do_rollback) { + continue; + } + ASSERT_OK(ReOpen()); + ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); + TransactionOptions txn_options; + txn_options.use_only_the_last_commit_time_batch_for_recovery = false; + WriteOptions write_options; + Transaction* txn0 = db->BeginTransaction(write_options, txn_options); + auto s = txn0->SetName("xid"); + ASSERT_OK(s); + s = txn0->Put(Slice("foo0"), Slice("bar0a")); + ASSERT_OK(s); + s = txn0->Put(Slice("foo0"), Slice("bar0b")); + ASSERT_OK(s); + s = txn0->Put(Slice("foo1"), Slice("bar1")); + ASSERT_OK(s); + s = txn0->Merge(Slice("foo2"), Slice("bar2a")); + ASSERT_OK(s); + // Repeat a key after the start of a sub-patch. This should not cause a + // duplicate in the most recent sub-patch and hence not creating a new + // sub-patch. + s = txn0->Put(Slice("foo0"), Slice("bar0c")); + ASSERT_OK(s); + s = txn0->Merge(Slice("foo2"), Slice("bar2b")); + ASSERT_OK(s); + // duplicate the keys but in a different cf. It should not be counted as + // duplicate. + s = txn0->Put(cf_handle, Slice("foo0"), Slice("bar0-cf1")); + ASSERT_OK(s); + s = txn0->Put(Slice("foo3"), Slice("bar3")); + ASSERT_OK(s); + s = txn0->Merge(Slice("foo3"), Slice("bar3")); + ASSERT_OK(s); + s = txn0->Put(Slice("foo4"), Slice("bar4")); + ASSERT_OK(s); + s = txn0->Delete(Slice("foo4")); + ASSERT_OK(s); + s = txn0->SingleDelete(Slice("foo4")); + ASSERT_OK(s); + if (do_prepare) { + s = txn0->Prepare(); + ASSERT_OK(s); + } + if (do_rollback) { + // Test rolling back the batch with duplicates + s = txn0->Rollback(); + ASSERT_OK(s); + } else { + if (with_commit_batch) { + assert(do_prepare); + auto cb = txn0->GetCommitTimeWriteBatch(); + // duplicate a key in the original batch + // TODO(myabandeh): the behavior of GetCommitTimeWriteBatch + // conflicting with the prepared batch is currently undefined and + // gives different results in different implementations. + + // s = cb->Put(Slice("foo0"), Slice("bar0d")); + // ASSERT_OK(s); + // add a new duplicate key + s = cb->Put(Slice("foo6"), Slice("bar6a")); + ASSERT_OK(s); + s = cb->Put(Slice("foo6"), Slice("bar6b")); + ASSERT_OK(s); + // add a duplicate key that is removed in the same batch + s = cb->Put(Slice("foo7"), Slice("bar7a")); + ASSERT_OK(s); + s = cb->Delete(Slice("foo7")); + ASSERT_OK(s); + } + s = txn0->Commit(); + ASSERT_OK(s); + } + delete txn0; + ReadOptions ropt; + PinnableSlice pinnable_val; + + if (do_rollback) { + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + s = db->Get(ropt, cf_handle, "foo0", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + } else { + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0c")); + s = db->Get(ropt, cf_handle, "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0-cf1")); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1")); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar2a,bar2b")); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar3,bar3")); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + if (with_commit_batch) { + s = db->Get(ropt, db->DefaultColumnFamily(), "foo6", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar6b")); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo7", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + } + } + delete cf_handle; + } // with_commit_batch + } // do_rollback + } // do_prepare + + { + // Also test with max_successive_merges > 0. max_successive_merges will not + // affect our algorithm for duplicate key insertion but we add the test to + // verify that. + cf_options.max_successive_merges = 2; + cf_options.merge_operator = MergeOperators::CreateStringAppendOperator(); + ASSERT_OK(ReOpen()); + db->CreateColumnFamily(cf_options, cf_name, &cf_handle); + WriteOptions write_options; + // Ensure one value for the key + ASSERT_OK(db->Put(write_options, cf_handle, Slice("key"), Slice("value"))); + WriteBatch batch; + // Merge more than max_successive_merges times + batch.Merge(cf_handle, Slice("key"), Slice("1")); + batch.Merge(cf_handle, Slice("key"), Slice("2")); + batch.Merge(cf_handle, Slice("key"), Slice("3")); + batch.Merge(cf_handle, Slice("key"), Slice("4")); + ASSERT_OK(db->Write(write_options, &batch)); + ReadOptions read_options; + string value; + ASSERT_OK(db->Get(read_options, cf_handle, "key", &value)); + ASSERT_EQ(value, "value,1,2,3,4"); + delete cf_handle; + } + + { + // Test that the duplicate detection is not compromised after rolling back + // to a save point + TransactionOptions txn_options; + WriteOptions write_options; + Transaction* txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b"))); + txn0->SetSavePoint(); + ASSERT_OK(txn0->RollbackToSavePoint()); + ASSERT_OK(txn0->Commit()); + delete txn0; + } + + // Test sucessfull recovery after a crash + { + ASSERT_OK(ReOpen()); + TransactionOptions txn_options; + WriteOptions write_options; + ReadOptions ropt; + Transaction* txn0; + PinnableSlice pinnable_val; + Status s; + + std::unique_ptr<const Comparator> comp_gc(new ThreeBytewiseComparator()); + cf_options.comparator = comp_gc.get(); + ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); + delete cf_handle; + std::vector<ColumnFamilyDescriptor> cfds{ + ColumnFamilyDescriptor(kDefaultColumnFamilyName, + ColumnFamilyOptions(options)), + ColumnFamilyDescriptor(cf_name, cf_options), + }; + std::vector<ColumnFamilyHandle*> handles; + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + + ASSERT_OK(db->Put(write_options, "foo0", "init")); + ASSERT_OK(db->Put(write_options, "foo1", "init")); + ASSERT_OK(db->Put(write_options, handles[1], "foo0", "init")); + ASSERT_OK(db->Put(write_options, handles[1], "foo1", "init")); + + // one entry + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + ASSERT_OK(db->FlushWAL(true)); + reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0a")); + + // two entries, no duplicate + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(handles[1], Slice("foo0"), Slice("bar0b"))); + ASSERT_OK(txn0->Put(handles[1], Slice("fol1"), Slice("bar1b"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b"))); + ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar1b"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + db->FlushWAL(true); + // Flush only cf 1 + reinterpret_cast<DBImpl*>(db->GetRootDB()) + ->TEST_FlushMemTable(true, false, handles[1]); + reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0b")); + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1b")); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0b")); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "fol1", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1b")); + + // one duplicate with ::Put + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0c"))); + ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey1"), Slice("bar1d"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0c"))); + ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar1c"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0d"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + ASSERT_OK(db->FlushWAL(true)); + // Flush only cf 1 + reinterpret_cast<DBImpl*>(db->GetRootDB()) + ->TEST_FlushMemTable(true, false, handles[1]); + reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0d")); + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1c")); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1d")); + + // Duplicate with ::Put, ::Delete + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0e"))); + ASSERT_OK(txn0->Delete(handles[1], Slice("key-nonkey1"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0e"))); + ASSERT_OK(txn0->Delete(Slice("foo0"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + ASSERT_OK(db->FlushWAL(true)); + // Flush only cf 1 + reinterpret_cast<DBImpl*>(db->GetRootDB()) + ->TEST_FlushMemTable(true, false, handles[1]); + reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + + // Duplicate with ::Put, ::SingleDelete + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0g"))); + ASSERT_OK(txn0->SingleDelete(handles[1], Slice("key-nonkey1"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0e"))); + ASSERT_OK(txn0->SingleDelete(Slice("foo0"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + ASSERT_OK(db->FlushWAL(true)); + // Flush only cf 1 + reinterpret_cast<DBImpl*>(db->GetRootDB()) + ->TEST_FlushMemTable(true, false, handles[1]); + reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + + // Duplicate with ::Put, ::Merge + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar1i"))); + ASSERT_OK(txn0->Merge(handles[1], Slice("key-nonkey1"), Slice("bar1j"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0f"))); + ASSERT_OK(txn0->Merge(Slice("foo0"), Slice("bar0g"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + ASSERT_OK(db->FlushWAL(true)); + // Flush only cf 1 + reinterpret_cast<DBImpl*>(db->GetRootDB()) + ->TEST_FlushMemTable(true, false, handles[1]); + reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0f,bar0g")); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1i,bar1j")); + + for (auto h : handles) { + delete h; + } + delete db; + db = nullptr; + } +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include <stdio.h> + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, + "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // ROCKSDB_LITE |