diff options
Diffstat (limited to 'src/rocksdb/utilities/transactions/timestamped_snapshot_test.cc')
-rw-r--r-- | src/rocksdb/utilities/transactions/timestamped_snapshot_test.cc | 466 |
1 files changed, 466 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/transactions/timestamped_snapshot_test.cc b/src/rocksdb/utilities/transactions/timestamped_snapshot_test.cc new file mode 100644 index 000000000..e9b474415 --- /dev/null +++ b/src/rocksdb/utilities/transactions/timestamped_snapshot_test.cc @@ -0,0 +1,466 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifdef ROCKSDB_LITE +#include <cstdio> + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, "SKIPPED as Transactions are not supported in LITE mode\n"); + return 0; +} +#else // ROCKSDB_LITE +#include <cassert> + +#include "util/cast_util.h" +#include "utilities/transactions/transaction_test.h" + +namespace ROCKSDB_NAMESPACE { +INSTANTIATE_TEST_CASE_P( + Unsupported, TimestampedSnapshotWithTsSanityCheck, + ::testing::Values( + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite), + std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite))); + +INSTANTIATE_TEST_CASE_P(WriteCommitted, TransactionTest, + ::testing::Combine(::testing::Bool(), ::testing::Bool(), + ::testing::Values(WRITE_COMMITTED), + ::testing::Values(kOrderedWrite))); + +namespace { +// Not thread-safe. Caller needs to provide external synchronization. +class TsCheckingTxnNotifier : public TransactionNotifier { + public: + explicit TsCheckingTxnNotifier() = default; + + ~TsCheckingTxnNotifier() override {} + + void SnapshotCreated(const Snapshot* new_snapshot) override { + assert(new_snapshot); + if (prev_snapshot_seq_ != kMaxSequenceNumber) { + assert(prev_snapshot_seq_ <= new_snapshot->GetSequenceNumber()); + } + prev_snapshot_seq_ = new_snapshot->GetSequenceNumber(); + if (prev_snapshot_ts_ != kMaxTxnTimestamp) { + assert(prev_snapshot_ts_ <= new_snapshot->GetTimestamp()); + } + prev_snapshot_ts_ = new_snapshot->GetTimestamp(); + } + + TxnTimestamp prev_snapshot_ts() const { return prev_snapshot_ts_; } + + private: + SequenceNumber prev_snapshot_seq_ = kMaxSequenceNumber; + TxnTimestamp prev_snapshot_ts_ = kMaxTxnTimestamp; +}; +} // anonymous namespace + +TEST_P(TimestampedSnapshotWithTsSanityCheck, WithoutCommitTs) { + std::unique_ptr<Transaction> txn( + db->BeginTransaction(WriteOptions(), TransactionOptions())); + assert(txn); + ASSERT_OK(txn->SetName("txn0")); + ASSERT_OK(txn->Put("a", "v")); + ASSERT_OK(txn->Prepare()); + Status s = txn->CommitAndTryCreateSnapshot(); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_OK(txn->Rollback()); + + txn.reset(db->BeginTransaction(WriteOptions(), TransactionOptions())); + assert(txn); + ASSERT_OK(txn->SetName("txn0")); + ASSERT_OK(txn->Put("a", "v")); + s = txn->CommitAndTryCreateSnapshot(); + ASSERT_TRUE(s.IsInvalidArgument()); +} + +TEST_P(TimestampedSnapshotWithTsSanityCheck, SetCommitTs) { + std::unique_ptr<Transaction> txn( + db->BeginTransaction(WriteOptions(), TransactionOptions())); + assert(txn); + ASSERT_OK(txn->SetName("txn0")); + ASSERT_OK(txn->Put("a", "v")); + ASSERT_OK(txn->Prepare()); + std::shared_ptr<const Snapshot> snapshot; + Status s = txn->CommitAndTryCreateSnapshot(nullptr, 10, &snapshot); + ASSERT_TRUE(s.IsNotSupported()); + ASSERT_OK(txn->Rollback()); + + txn.reset(db->BeginTransaction(WriteOptions(), TransactionOptions())); + assert(txn); + ASSERT_OK(txn->SetName("txn0")); + ASSERT_OK(txn->Put("a", "v")); + s = txn->CommitAndTryCreateSnapshot(nullptr, 10, &snapshot); + ASSERT_TRUE(s.IsNotSupported()); +} + +TEST_P(TransactionTest, WithoutCommitTs) { + std::unique_ptr<Transaction> txn( + db->BeginTransaction(WriteOptions(), TransactionOptions())); + assert(txn); + ASSERT_OK(txn->SetName("txn0")); + ASSERT_OK(txn->Put("a", "v")); + ASSERT_OK(txn->Prepare()); + Status s = txn->CommitAndTryCreateSnapshot(); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_OK(txn->Rollback()); + + txn.reset(db->BeginTransaction(WriteOptions(), TransactionOptions())); + assert(txn); + ASSERT_OK(txn->SetName("txn0")); + ASSERT_OK(txn->Put("a", "v")); + s = txn->CommitAndTryCreateSnapshot(); + ASSERT_TRUE(s.IsInvalidArgument()); +} + +TEST_P(TransactionTest, ReuseExistingTxn) { + Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions()); + assert(txn); + ASSERT_OK(txn->SetName("txn0")); + ASSERT_OK(txn->Put("a", "v1")); + ASSERT_OK(txn->Prepare()); + + auto notifier = std::make_shared<TsCheckingTxnNotifier>(); + std::shared_ptr<const Snapshot> snapshot1; + Status s = + txn->CommitAndTryCreateSnapshot(notifier, /*commit_ts=*/100, &snapshot1); + ASSERT_OK(s); + ASSERT_EQ(100, snapshot1->GetTimestamp()); + + Transaction* txn1 = + db->BeginTransaction(WriteOptions(), TransactionOptions(), txn); + assert(txn1 == txn); + ASSERT_OK(txn1->SetName("txn1")); + ASSERT_OK(txn->Put("a", "v2")); + ASSERT_OK(txn->Prepare()); + std::shared_ptr<const Snapshot> snapshot2; + s = txn->CommitAndTryCreateSnapshot(notifier, /*commit_ts=*/110, &snapshot2); + ASSERT_OK(s); + ASSERT_EQ(110, snapshot2->GetTimestamp()); + delete txn; + + { + std::string value; + ReadOptions read_opts; + read_opts.snapshot = snapshot1.get(); + ASSERT_OK(db->Get(read_opts, "a", &value)); + ASSERT_EQ("v1", value); + + read_opts.snapshot = snapshot2.get(); + ASSERT_OK(db->Get(read_opts, "a", &value)); + ASSERT_EQ("v2", value); + } +} + +TEST_P(TransactionTest, CreateSnapshotWhenCommit) { + std::unique_ptr<Transaction> txn( + db->BeginTransaction(WriteOptions(), TransactionOptions())); + assert(txn); + + constexpr int batch_size = 10; + for (int i = 0; i < batch_size; ++i) { + ASSERT_OK(db->Put(WriteOptions(), "k" + std::to_string(i), "v0")); + } + const SequenceNumber seq0 = db->GetLatestSequenceNumber(); + ASSERT_EQ(static_cast<SequenceNumber>(batch_size), seq0); + + txn->SetSnapshot(); + { + const Snapshot* const snapshot = txn->GetSnapshot(); + assert(snapshot); + ASSERT_EQ(seq0, snapshot->GetSequenceNumber()); + } + + for (int i = 0; i < batch_size; ++i) { + ASSERT_OK(txn->Put("k" + std::to_string(i), "v1")); + } + ASSERT_OK(txn->SetName("txn0")); + ASSERT_OK(txn->Prepare()); + + std::shared_ptr<const Snapshot> snapshot; + constexpr TxnTimestamp timestamp = 1; + auto notifier = std::make_shared<TsCheckingTxnNotifier>(); + Status s = txn->CommitAndTryCreateSnapshot(notifier, timestamp, &snapshot); + ASSERT_OK(s); + ASSERT_LT(notifier->prev_snapshot_ts(), kMaxTxnTimestamp); + assert(snapshot); + ASSERT_EQ(timestamp, snapshot->GetTimestamp()); + ASSERT_EQ(seq0 + batch_size, snapshot->GetSequenceNumber()); + const Snapshot* const raw_snapshot_ptr = txn->GetSnapshot(); + ASSERT_EQ(raw_snapshot_ptr, snapshot.get()); + ASSERT_EQ(snapshot, txn->GetTimestampedSnapshot()); + + { + std::shared_ptr<const Snapshot> snapshot1 = + db->GetLatestTimestampedSnapshot(); + ASSERT_EQ(snapshot, snapshot1); + } + { + std::shared_ptr<const Snapshot> snapshot1 = + db->GetTimestampedSnapshot(timestamp); + ASSERT_EQ(snapshot, snapshot1); + } + { + std::vector<std::shared_ptr<const Snapshot> > snapshots; + s = db->GetAllTimestampedSnapshots(snapshots); + ASSERT_OK(s); + ASSERT_EQ(std::vector<std::shared_ptr<const Snapshot> >{snapshot}, + snapshots); + } +} + +TEST_P(TransactionTest, CreateSnapshot) { + // First create a non-timestamped snapshot + ManagedSnapshot snapshot_guard(db); + for (int i = 0; i < 10; ++i) { + ASSERT_OK(db->Put(WriteOptions(), "k" + std::to_string(i), + "v0_" + std::to_string(i))); + } + { + auto ret = db->CreateTimestampedSnapshot(kMaxTxnTimestamp); + ASSERT_TRUE(ret.first.IsInvalidArgument()); + auto snapshot = ret.second; + ASSERT_EQ(nullptr, snapshot.get()); + } + constexpr TxnTimestamp timestamp = 100; + Status s; + std::shared_ptr<const Snapshot> ts_snap0; + std::tie(s, ts_snap0) = db->CreateTimestampedSnapshot(timestamp); + ASSERT_OK(s); + assert(ts_snap0); + ASSERT_EQ(timestamp, ts_snap0->GetTimestamp()); + for (int i = 0; i < 10; ++i) { + ASSERT_OK(db->Delete(WriteOptions(), "k" + std::to_string(i))); + } + { + ReadOptions read_opts; + read_opts.snapshot = ts_snap0.get(); + for (int i = 0; i < 10; ++i) { + std::string value; + s = db->Get(read_opts, "k" + std::to_string(i), &value); + ASSERT_OK(s); + ASSERT_EQ("v0_" + std::to_string(i), value); + } + } + { + std::shared_ptr<const Snapshot> snapshot = + db->GetLatestTimestampedSnapshot(); + ASSERT_EQ(ts_snap0, snapshot); + } + { + std::shared_ptr<const Snapshot> snapshot = + db->GetTimestampedSnapshot(timestamp); + ASSERT_OK(s); + ASSERT_EQ(ts_snap0, snapshot); + } + { + std::vector<std::shared_ptr<const Snapshot> > snapshots; + s = db->GetAllTimestampedSnapshots(snapshots); + ASSERT_OK(s); + ASSERT_EQ(std::vector<std::shared_ptr<const Snapshot> >{ts_snap0}, + snapshots); + } +} + +TEST_P(TransactionTest, SequenceAndTsOrder) { + Status s; + std::shared_ptr<const Snapshot> snapshot; + std::tie(s, snapshot) = db->CreateTimestampedSnapshot(100); + ASSERT_OK(s); + assert(snapshot); + { + // Cannot request smaller timestamp for the new timestamped snapshot. + std::shared_ptr<const Snapshot> tmp_snapshot; + std::tie(s, tmp_snapshot) = db->CreateTimestampedSnapshot(50); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_EQ(nullptr, tmp_snapshot.get()); + } + + // If requesting a new timestamped snapshot with the same timestamp and + // sequence number, we avoid creating new snapshot object but reuse + // exisisting one. + std::shared_ptr<const Snapshot> snapshot1; + std::tie(s, snapshot1) = db->CreateTimestampedSnapshot(100); + ASSERT_OK(s); + ASSERT_EQ(snapshot.get(), snapshot1.get()); + + // If there is no write, but we request a larger timestamp, we still create + // a new snapshot object. + std::shared_ptr<const Snapshot> snapshot2; + std::tie(s, snapshot2) = db->CreateTimestampedSnapshot(200); + ASSERT_OK(s); + assert(snapshot2); + ASSERT_NE(snapshot.get(), snapshot2.get()); + ASSERT_EQ(snapshot2->GetSequenceNumber(), snapshot->GetSequenceNumber()); + ASSERT_EQ(200, snapshot2->GetTimestamp()); + + // Increase sequence number. + ASSERT_OK(db->Put(WriteOptions(), "foo", "v0")); + { + // We are requesting the same timestamp for a larger sequence number, thus + // we cannot create timestamped snapshot. + std::shared_ptr<const Snapshot> tmp_snapshot; + std::tie(s, tmp_snapshot) = db->CreateTimestampedSnapshot(200); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_EQ(nullptr, tmp_snapshot.get()); + } + { + std::unique_ptr<Transaction> txn1( + db->BeginTransaction(WriteOptions(), TransactionOptions())); + ASSERT_OK(txn1->Put("bar", "v0")); + std::shared_ptr<const Snapshot> ss; + ASSERT_OK(txn1->CommitAndTryCreateSnapshot(nullptr, 200, &ss)); + // Cannot create snapshot because requested timestamp is the same as the + // latest timestamped snapshot while sequence number is strictly higher. + ASSERT_EQ(nullptr, ss); + } + { + std::unique_ptr<Transaction> txn2( + db->BeginTransaction(WriteOptions(), TransactionOptions())); + ASSERT_OK(txn2->Put("bar", "v0")); + std::shared_ptr<const Snapshot> ss; + // Application should never do this. This is just to demonstrate error + // handling. + ASSERT_OK(txn2->CommitAndTryCreateSnapshot(nullptr, 100, &ss)); + // Cannot create snapshot because requested timestamp is smaller than + // latest timestamped snapshot. + ASSERT_EQ(nullptr, ss); + } +} + +TEST_P(TransactionTest, CloseDbWithSnapshots) { + std::unique_ptr<Transaction> txn( + db->BeginTransaction(WriteOptions(), TransactionOptions())); + ASSERT_OK(txn->SetName("txn0")); + ASSERT_OK(txn->Put("foo", "v")); + ASSERT_OK(txn->Prepare()); + std::shared_ptr<const Snapshot> snapshot; + constexpr TxnTimestamp timestamp = 121; + auto notifier = std::make_shared<TsCheckingTxnNotifier>(); + ASSERT_OK(txn->CommitAndTryCreateSnapshot(notifier, timestamp, &snapshot)); + assert(snapshot); + ASSERT_LT(notifier->prev_snapshot_ts(), kMaxTxnTimestamp); + ASSERT_EQ(timestamp, snapshot->GetTimestamp()); + ASSERT_TRUE(db->Close().IsAborted()); +} + +TEST_P(TransactionTest, MultipleTimestampedSnapshots) { + auto* dbimpl = static_cast_with_check<DBImpl>(db->GetRootDB()); + assert(dbimpl); + const bool seq_per_batch = dbimpl->seq_per_batch(); + // TODO: remove the following assert(!seq_per_batch) once timestamped snapshot + // is supported in write-prepared/write-unprepared transactions. + assert(!seq_per_batch); + constexpr size_t txn_size = 10; + constexpr TxnTimestamp ts_delta = 10; + constexpr size_t num_txns = 100; + std::vector<std::shared_ptr<const Snapshot> > snapshots(num_txns); + constexpr TxnTimestamp start_ts = 10000; + auto notifier = std::make_shared<TsCheckingTxnNotifier>(); + for (size_t i = 0; i < num_txns; ++i) { + std::unique_ptr<Transaction> txn( + db->BeginTransaction(WriteOptions(), TransactionOptions())); + ASSERT_OK(txn->SetName("txn" + std::to_string(i))); + for (size_t j = 0; j < txn_size; ++j) { + ASSERT_OK(txn->Put("k" + std::to_string(j), + "v" + std::to_string(j) + "_" + std::to_string(i))); + } + if (0 == (i % 2)) { + ASSERT_OK(txn->Prepare()); + } + ASSERT_OK(txn->CommitAndTryCreateSnapshot(notifier, start_ts + i * ts_delta, + &snapshots[i])); + assert(snapshots[i]); + ASSERT_LT(notifier->prev_snapshot_ts(), kMaxTxnTimestamp); + ASSERT_EQ(start_ts + i * ts_delta, snapshots[i]->GetTimestamp()); + } + + { + auto snapshot = db->GetTimestampedSnapshot(start_ts + 1); + ASSERT_EQ(nullptr, snapshot); + } + + constexpr TxnTimestamp max_ts = start_ts + num_txns * ts_delta; + for (size_t i = 0; i < num_txns; ++i) { + auto snapshot = db->GetTimestampedSnapshot(start_ts + i * ts_delta); + ASSERT_EQ(snapshots[i], snapshot); + + std::vector<std::shared_ptr<const Snapshot> > tmp_snapshots; + Status s = db->GetTimestampedSnapshots(max_ts, start_ts + i * ts_delta, + tmp_snapshots); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_TRUE(tmp_snapshots.empty()); + + for (size_t j = i; j < num_txns; ++j) { + std::vector<std::shared_ptr<const Snapshot> > expected_snapshots( + snapshots.begin() + i, snapshots.begin() + j); + tmp_snapshots.clear(); + s = db->GetTimestampedSnapshots(start_ts + i * ts_delta, + start_ts + j * ts_delta, tmp_snapshots); + if (i < j) { + ASSERT_OK(s); + } else { + ASSERT_TRUE(s.IsInvalidArgument()); + } + ASSERT_EQ(expected_snapshots, tmp_snapshots); + } + } + + { + std::vector<std::shared_ptr<const Snapshot> > tmp_snapshots; + const Status s = db->GetAllTimestampedSnapshots(tmp_snapshots); + ASSERT_OK(s); + ASSERT_EQ(snapshots, tmp_snapshots); + + const std::shared_ptr<const Snapshot> latest_snapshot = + db->GetLatestTimestampedSnapshot(); + ASSERT_EQ(snapshots.back(), latest_snapshot); + } + + for (size_t i = 0; i <= num_txns; ++i) { + std::vector<std::shared_ptr<const Snapshot> > snapshots1( + snapshots.begin() + i, snapshots.end()); + if (i > 0) { + auto snapshot1 = + db->GetTimestampedSnapshot(start_ts + (i - 1) * ts_delta); + assert(snapshot1); + ASSERT_EQ(start_ts + (i - 1) * ts_delta, snapshot1->GetTimestamp()); + } + + db->ReleaseTimestampedSnapshotsOlderThan(start_ts + i * ts_delta); + + if (i > 0) { + auto snapshot1 = + db->GetTimestampedSnapshot(start_ts + (i - 1) * ts_delta); + ASSERT_EQ(nullptr, snapshot1); + } + + std::vector<std::shared_ptr<const Snapshot> > tmp_snapshots; + const Status s = db->GetAllTimestampedSnapshots(tmp_snapshots); + ASSERT_OK(s); + ASSERT_EQ(snapshots1, tmp_snapshots); + } + + // Even after released by db, the applications still hold reference to shared + // snapshots. + for (size_t i = 0; i < num_txns; ++i) { + assert(snapshots[i]); + ASSERT_EQ(start_ts + i * ts_delta, snapshots[i]->GetTimestamp()); + } + + snapshots.clear(); + ASSERT_OK(db->Close()); + delete db; + db = nullptr; +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} +#endif // !ROCKSDB_LITE |