diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/utilities/transactions/lock/point/point_lock_manager_test.h | |
parent | Initial commit. (diff) | |
download | ceph-upstream/18.2.2.tar.xz ceph-upstream/18.2.2.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/utilities/transactions/lock/point/point_lock_manager_test.h')
-rw-r--r-- | src/rocksdb/utilities/transactions/lock/point/point_lock_manager_test.h | 324 |
1 files changed, 324 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/transactions/lock/point/point_lock_manager_test.h b/src/rocksdb/utilities/transactions/lock/point/point_lock_manager_test.h new file mode 100644 index 000000000..ca9f46bf9 --- /dev/null +++ b/src/rocksdb/utilities/transactions/lock/point/point_lock_manager_test.h @@ -0,0 +1,324 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "file/file_util.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/utilities/transaction_db.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" +#include "utilities/transactions/lock/point/point_lock_manager.h" +#include "utilities/transactions/pessimistic_transaction_db.h" +#include "utilities/transactions/transaction_db_mutex_impl.h" + +namespace ROCKSDB_NAMESPACE { + +class MockColumnFamilyHandle : public ColumnFamilyHandle { + public: + explicit MockColumnFamilyHandle(ColumnFamilyId cf_id) : cf_id_(cf_id) {} + + ~MockColumnFamilyHandle() override {} + + const std::string& GetName() const override { return name_; } + + ColumnFamilyId GetID() const override { return cf_id_; } + + Status GetDescriptor(ColumnFamilyDescriptor*) override { + return Status::OK(); + } + + const Comparator* GetComparator() const override { + return BytewiseComparator(); + } + + private: + ColumnFamilyId cf_id_; + std::string name_ = "MockCF"; +}; + +class PointLockManagerTest : public testing::Test { + public: + void SetUp() override { + env_ = Env::Default(); + db_dir_ = test::PerThreadDBPath("point_lock_manager_test"); + ASSERT_OK(env_->CreateDir(db_dir_)); + + Options opt; + opt.create_if_missing = true; + TransactionDBOptions txn_opt; + txn_opt.transaction_lock_timeout = 0; + + ASSERT_OK(TransactionDB::Open(opt, txn_opt, db_dir_, &db_)); + + // CAUTION: This test creates a separate lock manager object (right, NOT + // the one that the TransactionDB is using!), and runs tests on it. + locker_.reset(new PointLockManager( + static_cast<PessimisticTransactionDB*>(db_), txn_opt)); + + wait_sync_point_name_ = "PointLockManager::AcquireWithTimeout:WaitingTxn"; + } + + void TearDown() override { + delete db_; + EXPECT_OK(DestroyDir(env_, db_dir_)); + } + + PessimisticTransaction* NewTxn( + TransactionOptions txn_opt = TransactionOptions()) { + Transaction* txn = db_->BeginTransaction(WriteOptions(), txn_opt); + return reinterpret_cast<PessimisticTransaction*>(txn); + } + + protected: + Env* env_; + std::shared_ptr<LockManager> locker_; + const char* wait_sync_point_name_; + friend void PointLockManagerTestExternalSetup(PointLockManagerTest*); + + private: + std::string db_dir_; + TransactionDB* db_; +}; + +using init_func_t = void (*)(PointLockManagerTest*); + +class AnyLockManagerTest : public PointLockManagerTest, + public testing::WithParamInterface<init_func_t> { + public: + void SetUp() override { + // If a custom setup function was provided, use it. Otherwise, use what we + // have inherited. + auto init_func = GetParam(); + if (init_func) + (*init_func)(this); + else + PointLockManagerTest::SetUp(); + } +}; + +TEST_P(AnyLockManagerTest, ReentrantExclusiveLock) { + // Tests that a txn can acquire exclusive lock on the same key repeatedly. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn = NewTxn(); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); + + // Cleanup + locker_->UnLock(txn, 1, "k", env_); + + delete txn; +} + +TEST_P(AnyLockManagerTest, ReentrantSharedLock) { + // Tests that a txn can acquire shared lock on the same key repeatedly. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn = NewTxn(); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); + + // Cleanup + locker_->UnLock(txn, 1, "k", env_); + + delete txn; +} + +TEST_P(AnyLockManagerTest, LockUpgrade) { + // Tests that a txn can upgrade from a shared lock to an exclusive lock. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn = NewTxn(); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); + + // Cleanup + locker_->UnLock(txn, 1, "k", env_); + delete txn; +} + +TEST_P(AnyLockManagerTest, LockDowngrade) { + // Tests that a txn can acquire a shared lock after acquiring an exclusive + // lock on the same key. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn = NewTxn(); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); + + // Cleanup + locker_->UnLock(txn, 1, "k", env_); + delete txn; +} + +TEST_P(AnyLockManagerTest, LockConflict) { + // Tests that lock conflicts lead to lock timeout. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn1 = NewTxn(); + auto txn2 = NewTxn(); + + { + // exclusive-exclusive conflict. + ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true)); + auto s = locker_->TryLock(txn2, 1, "k1", env_, true); + ASSERT_TRUE(s.IsTimedOut()); + } + + { + // exclusive-shared conflict. + ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, true)); + auto s = locker_->TryLock(txn2, 1, "k2", env_, false); + ASSERT_TRUE(s.IsTimedOut()); + } + + { + // shared-exclusive conflict. + ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, false)); + auto s = locker_->TryLock(txn2, 1, "k2", env_, true); + ASSERT_TRUE(s.IsTimedOut()); + } + + // Cleanup + locker_->UnLock(txn1, 1, "k1", env_); + locker_->UnLock(txn1, 1, "k2", env_); + + delete txn1; + delete txn2; +} + +port::Thread BlockUntilWaitingTxn(const char* sync_point_name, + std::function<void()> f) { + std::atomic<bool> reached(false); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + sync_point_name, [&](void* /*arg*/) { reached.store(true); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + port::Thread t(f); + + while (!reached.load()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + + return t; +} + +TEST_P(AnyLockManagerTest, SharedLocks) { + // Tests that shared locks can be concurrently held by multiple transactions. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn1 = NewTxn(); + auto txn2 = NewTxn(); + ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false)); + ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, false)); + + // Cleanup + locker_->UnLock(txn1, 1, "k", env_); + locker_->UnLock(txn2, 1, "k", env_); + + delete txn1; + delete txn2; +} + +TEST_P(AnyLockManagerTest, Deadlock) { + // Tests that deadlock can be detected. + // Deadlock scenario: + // txn1 exclusively locks k1, and wants to lock k2; + // txn2 exclusively locks k2, and wants to lock k1. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + TransactionOptions txn_opt; + txn_opt.deadlock_detect = true; + txn_opt.lock_timeout = 1000000; + auto txn1 = NewTxn(txn_opt); + auto txn2 = NewTxn(txn_opt); + + ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true)); + ASSERT_OK(locker_->TryLock(txn2, 1, "k2", env_, true)); + + // txn1 tries to lock k2, will block forever. + port::Thread t = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() { + // block because txn2 is holding a lock on k2. + locker_->TryLock(txn1, 1, "k2", env_, true); + }); + + auto s = locker_->TryLock(txn2, 1, "k1", env_, true); + ASSERT_TRUE(s.IsBusy()); + ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock); + + std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer(); + ASSERT_EQ(deadlock_paths.size(), 1u); + ASSERT_FALSE(deadlock_paths[0].limit_exceeded); + + std::vector<DeadlockInfo> deadlocks = deadlock_paths[0].path; + ASSERT_EQ(deadlocks.size(), 2u); + + ASSERT_EQ(deadlocks[0].m_txn_id, txn1->GetID()); + ASSERT_EQ(deadlocks[0].m_cf_id, 1u); + ASSERT_TRUE(deadlocks[0].m_exclusive); + ASSERT_EQ(deadlocks[0].m_waiting_key, "k2"); + + ASSERT_EQ(deadlocks[1].m_txn_id, txn2->GetID()); + ASSERT_EQ(deadlocks[1].m_cf_id, 1u); + ASSERT_TRUE(deadlocks[1].m_exclusive); + ASSERT_EQ(deadlocks[1].m_waiting_key, "k1"); + + locker_->UnLock(txn2, 1, "k2", env_); + t.join(); + + // Cleanup + locker_->UnLock(txn1, 1, "k1", env_); + locker_->UnLock(txn1, 1, "k2", env_); + delete txn2; + delete txn1; +} + +TEST_P(AnyLockManagerTest, GetWaitingTxns_MultipleTxns) { + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + + auto txn1 = NewTxn(); + ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false)); + + auto txn2 = NewTxn(); + ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, false)); + + auto txn3 = NewTxn(); + txn3->SetLockTimeout(10000); + port::Thread t1 = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() { + ASSERT_OK(locker_->TryLock(txn3, 1, "k", env_, true)); + locker_->UnLock(txn3, 1, "k", env_); + }); + + // Ok, now txn3 is waiting for lock on "k", which is owned by two + // transactions. Check that GetWaitingTxns reports this correctly + uint32_t wait_cf_id; + std::string wait_key; + auto waiters = txn3->GetWaitingTxns(&wait_cf_id, &wait_key); + + ASSERT_EQ(wait_cf_id, 1u); + ASSERT_EQ(wait_key, "k"); + ASSERT_EQ(waiters.size(), 2); + bool waits_correct = + (waiters[0] == txn1->GetID() && waiters[1] == txn2->GetID()) || + (waiters[1] == txn1->GetID() && waiters[0] == txn2->GetID()); + ASSERT_EQ(waits_correct, true); + + // Release locks so txn3 can proceed with execution + locker_->UnLock(txn1, 1, "k", env_); + locker_->UnLock(txn2, 1, "k", env_); + + // Wait until txn3 finishes + t1.join(); + + delete txn1; + delete txn2; + delete txn3; +} + +} // namespace ROCKSDB_NAMESPACE |