summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/transactions/lock/range/range_locking_test.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/utilities/transactions/lock/range/range_locking_test.cc459
1 files changed, 459 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/transactions/lock/range/range_locking_test.cc b/src/rocksdb/utilities/transactions/lock/range/range_locking_test.cc
new file mode 100644
index 000000000..bce66c1f3
--- /dev/null
+++ b/src/rocksdb/utilities/transactions/lock/range/range_locking_test.cc
@@ -0,0 +1,459 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+#ifndef OS_WIN
+
+#include <algorithm>
+#include <functional>
+#include <string>
+#include <thread>
+
+#include "db/db_impl/db_impl.h"
+#include "port/port.h"
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/perf_context.h"
+#include "rocksdb/utilities/transaction.h"
+#include "rocksdb/utilities/transaction_db.h"
+#include "utilities/transactions/lock/point/point_lock_manager_test.h"
+#include "utilities/transactions/pessimistic_transaction_db.h"
+#include "utilities/transactions/transaction_test.h"
+
+using std::string;
+
+namespace ROCKSDB_NAMESPACE {
+
+class RangeLockingTest : public ::testing::Test {
+ public:
+ TransactionDB* db;
+ std::string dbname;
+ Options options;
+
+ std::shared_ptr<RangeLockManagerHandle> range_lock_mgr;
+ TransactionDBOptions txn_db_options;
+
+ RangeLockingTest() : db(nullptr) {
+ options.create_if_missing = true;
+ dbname = test::PerThreadDBPath("range_locking_testdb");
+
+ EXPECT_OK(DestroyDB(dbname, options));
+
+ range_lock_mgr.reset(NewRangeLockManager(nullptr));
+ txn_db_options.lock_mgr_handle = range_lock_mgr;
+
+ auto s = TransactionDB::Open(options, txn_db_options, dbname, &db);
+ assert(s.ok());
+ }
+
+ ~RangeLockingTest() {
+ delete db;
+ db = nullptr;
+ // This is to skip the assert statement in FaultInjectionTestEnv. There
+ // seems to be a bug in btrfs that the makes readdir return recently
+ // unlink-ed files. By using the default fs we simply ignore errors resulted
+ // from attempting to delete such files in DestroyDB.
+ EXPECT_OK(DestroyDB(dbname, options));
+ }
+
+ PessimisticTransaction* NewTxn(
+ TransactionOptions txn_opt = TransactionOptions()) {
+ Transaction* txn = db->BeginTransaction(WriteOptions(), txn_opt);
+ return reinterpret_cast<PessimisticTransaction*>(txn);
+ }
+};
+
+// TODO: set a smaller lock wait timeout so that the test runs faster.
+TEST_F(RangeLockingTest, BasicRangeLocking) {
+ WriteOptions write_options;
+ TransactionOptions txn_options;
+ std::string value;
+ ReadOptions read_options;
+ auto cf = db->DefaultColumnFamily();
+
+ Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+
+ // Get a range lock
+ ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
+
+ // Check that range Lock inhibits an overlapping range lock
+ {
+ auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z"));
+ ASSERT_TRUE(s.IsTimedOut());
+ }
+
+ // Check that range Lock inhibits an overlapping point lock
+ {
+ auto s = txn1->GetForUpdate(read_options, cf, Slice("b"), &value);
+ ASSERT_TRUE(s.IsTimedOut());
+ }
+
+ // Get a point lock, check that it inhibits range locks
+ ASSERT_OK(txn0->Put(cf, Slice("n"), Slice("value")));
+ {
+ auto s = txn1->GetRangeLock(cf, Endpoint("m"), Endpoint("p"));
+ ASSERT_TRUE(s.IsTimedOut());
+ }
+
+ ASSERT_OK(txn0->Commit());
+ txn1->Rollback();
+
+ delete txn0;
+ delete txn1;
+}
+
+TEST_F(RangeLockingTest, MyRocksLikeUpdate) {
+ WriteOptions write_options;
+ TransactionOptions txn_options;
+ Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
+ auto cf = db->DefaultColumnFamily();
+ Status s;
+
+ // Get a range lock for the range we are about to update
+ ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
+
+ bool try_range_lock_called = false;
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "RangeTreeLockManager::TryRangeLock:enter",
+ [&](void* /*arg*/) { try_range_lock_called = true; });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ // For performance reasons, the following must NOT call lock_mgr->TryLock():
+ // We verify that by checking the value of try_range_lock_called.
+ ASSERT_OK(txn0->Put(cf, Slice("b"), Slice("value"),
+ /*assume_tracked=*/true));
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+ ASSERT_FALSE(try_range_lock_called);
+
+ txn0->Rollback();
+
+ delete txn0;
+}
+
+TEST_F(RangeLockingTest, UpgradeLockAndGetConflict) {
+ WriteOptions write_options;
+ TransactionOptions txn_options;
+ auto cf = db->DefaultColumnFamily();
+ Status s;
+ std::string value;
+ txn_options.lock_timeout = 10;
+
+ Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+
+ // Get the shared lock in txn0
+ s = txn0->GetForUpdate(ReadOptions(), cf, Slice("a"), &value,
+ false /*exclusive*/);
+ ASSERT_TRUE(s.IsNotFound());
+
+ // Get the shared lock on the same key in txn1
+ s = txn1->GetForUpdate(ReadOptions(), cf, Slice("a"), &value,
+ false /*exclusive*/);
+ ASSERT_TRUE(s.IsNotFound());
+
+ // Now, try getting an exclusive lock that overlaps with the above
+ s = txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("b"));
+ ASSERT_TRUE(s.IsTimedOut());
+
+ txn0->Rollback();
+ txn1->Rollback();
+
+ delete txn0;
+ delete txn1;
+}
+
+TEST_F(RangeLockingTest, SnapshotValidation) {
+ Status s;
+ Slice key_slice = Slice("k");
+ ColumnFamilyHandle* cfh = db->DefaultColumnFamily();
+
+ auto txn0 = NewTxn();
+ txn0->Put(key_slice, Slice("initial"));
+ txn0->Commit();
+
+ // txn1
+ auto txn1 = NewTxn();
+ txn1->SetSnapshot();
+ std::string val1;
+ ASSERT_OK(txn1->Get(ReadOptions(), cfh, key_slice, &val1));
+ ASSERT_EQ(val1, "initial");
+ val1 = val1 + std::string("-txn1");
+
+ ASSERT_OK(txn1->Put(cfh, key_slice, Slice(val1)));
+
+ // txn2
+ auto txn2 = NewTxn();
+ txn2->SetSnapshot();
+ std::string val2;
+ // This will see the original value as nothing is committed
+ // This is also Get, so it is doesn't acquire any locks.
+ ASSERT_OK(txn2->Get(ReadOptions(), cfh, key_slice, &val2));
+ ASSERT_EQ(val2, "initial");
+
+ // txn1
+ ASSERT_OK(txn1->Commit());
+
+ // txn2
+ val2 = val2 + std::string("-txn2");
+ // Now, this call should do Snapshot Validation and fail:
+ s = txn2->Put(cfh, key_slice, Slice(val2));
+ ASSERT_TRUE(s.IsBusy());
+
+ ASSERT_OK(txn2->Commit());
+
+ delete txn0;
+ delete txn1;
+ delete txn2;
+}
+
+TEST_F(RangeLockingTest, MultipleTrxLockStatusData) {
+ WriteOptions write_options;
+ TransactionOptions txn_options;
+ auto cf = db->DefaultColumnFamily();
+
+ Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+
+ // Get a range lock
+ ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("z"), Endpoint("z")));
+ ASSERT_OK(txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("e")));
+
+ auto s = range_lock_mgr->GetRangeLockStatusData();
+ ASSERT_EQ(s.size(), 2);
+ for (auto it = s.begin(); it != s.end(); ++it) {
+ ASSERT_EQ(it->first, cf->GetID());
+ auto val = it->second;
+ ASSERT_FALSE(val.start.inf_suffix);
+ ASSERT_FALSE(val.end.inf_suffix);
+ ASSERT_TRUE(val.exclusive);
+ ASSERT_EQ(val.ids.size(), 1);
+ if (val.ids[0] == txn0->GetID()) {
+ ASSERT_EQ(val.start.slice, "z");
+ ASSERT_EQ(val.end.slice, "z");
+ } else if (val.ids[0] == txn1->GetID()) {
+ ASSERT_EQ(val.start.slice, "b");
+ ASSERT_EQ(val.end.slice, "e");
+ } else {
+ FAIL(); // Unknown transaction ID.
+ }
+ }
+
+ delete txn0;
+ delete txn1;
+}
+
+#if defined(__has_feature)
+#if __has_feature(thread_sanitizer)
+#define SKIP_LOCK_ESCALATION_TEST 1
+#endif
+#else
+#define SKIP_LOCK_ESCALATION_TEST 1
+#endif
+
+#ifndef SKIP_LOCK_ESCALATION_TEST
+TEST_F(RangeLockingTest, BasicLockEscalation) {
+ auto cf = db->DefaultColumnFamily();
+
+ auto counters = range_lock_mgr->GetStatus();
+
+ // Initially not using any lock memory
+ ASSERT_EQ(counters.current_lock_memory, 0);
+ ASSERT_EQ(counters.escalation_count, 0);
+
+ ASSERT_EQ(0, range_lock_mgr->SetMaxLockMemory(2000));
+
+ // Insert until we see lock escalations
+ auto txn = NewTxn();
+
+ // Get the locks until we hit an escalation
+ for (int i = 0; i < 2020; i++) {
+ std::ostringstream buf;
+ buf << std::setw(8) << std::setfill('0') << i;
+ std::string buf_str = buf.str();
+ ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
+ }
+ counters = range_lock_mgr->GetStatus();
+ ASSERT_GT(counters.escalation_count, 0);
+ ASSERT_LE(counters.current_lock_memory, 2000);
+
+ delete txn;
+}
+
+// An escalation barrier function. Allow escalation iff the first two bytes are
+// identical.
+static bool escalation_barrier(const Endpoint& a, const Endpoint& b) {
+ assert(a.slice.size() > 2);
+ assert(b.slice.size() > 2);
+ if (memcmp(a.slice.data(), b.slice.data(), 2)) {
+ return true; // This is a barrier
+ } else {
+ return false; // No barrier
+ }
+}
+
+TEST_F(RangeLockingTest, LockEscalationBarrier) {
+ auto cf = db->DefaultColumnFamily();
+
+ auto counters = range_lock_mgr->GetStatus();
+
+ // Initially not using any lock memory
+ ASSERT_EQ(counters.escalation_count, 0);
+
+ range_lock_mgr->SetMaxLockMemory(8000);
+ range_lock_mgr->SetEscalationBarrierFunc(escalation_barrier);
+
+ // Insert enough locks to cause lock escalations to happen
+ auto txn = NewTxn();
+ const int N = 2000;
+ for (int i = 0; i < N; i++) {
+ std::ostringstream buf;
+ buf << std::setw(4) << std::setfill('0') << i;
+ std::string buf_str = buf.str();
+ ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
+ }
+ counters = range_lock_mgr->GetStatus();
+ ASSERT_GT(counters.escalation_count, 0);
+
+ // Check that lock escalation was not performed across escalation barriers:
+ // Use another txn to acquire locks near the barriers.
+ auto txn2 = NewTxn();
+ range_lock_mgr->SetMaxLockMemory(500000);
+ for (int i = 100; i < N; i += 100) {
+ std::ostringstream buf;
+ buf << std::setw(4) << std::setfill('0') << i - 1 << "-a";
+ std::string buf_str = buf.str();
+ // Check that we CAN get a lock near the escalation barrier
+ ASSERT_OK(txn2->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
+ }
+
+ txn->Rollback();
+ txn2->Rollback();
+ delete txn;
+ delete txn2;
+}
+
+#endif
+
+TEST_F(RangeLockingTest, LockWaitCount) {
+ TransactionOptions txn_options;
+ auto cf = db->DefaultColumnFamily();
+ txn_options.lock_timeout = 50;
+ Transaction* txn0 = db->BeginTransaction(WriteOptions(), txn_options);
+ Transaction* txn1 = db->BeginTransaction(WriteOptions(), txn_options);
+
+ // Get a range lock
+ ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
+
+ uint64_t lock_waits1 = range_lock_mgr->GetStatus().lock_wait_count;
+ // Attempt to get a conflicting lock
+ auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z"));
+ ASSERT_TRUE(s.IsTimedOut());
+
+ // Check that the counter was incremented
+ uint64_t lock_waits2 = range_lock_mgr->GetStatus().lock_wait_count;
+ ASSERT_EQ(lock_waits1 + 1, lock_waits2);
+
+ txn0->Rollback();
+ txn1->Rollback();
+
+ delete txn0;
+ delete txn1;
+}
+
+TEST_F(RangeLockingTest, LockWaiteeAccess) {
+ TransactionOptions txn_options;
+ auto cf = db->DefaultColumnFamily();
+ txn_options.lock_timeout = 60;
+ Transaction* txn0 = db->BeginTransaction(WriteOptions(), txn_options);
+ Transaction* txn1 = db->BeginTransaction(WriteOptions(), txn_options);
+
+ // Get a range lock
+ ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
+
+ std::atomic<bool> reached(false);
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "RangeTreeLockManager::TryRangeLock:EnterWaitingTxn", [&](void* /*arg*/) {
+ reached.store(true);
+ std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ port::Thread t([&]() {
+ // Attempt to get a conflicting lock
+ auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z"));
+ ASSERT_TRUE(s.ok());
+ txn1->Rollback();
+ });
+
+ while (!reached.load()) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+
+ // Release locks and free the transaction
+ txn0->Rollback();
+ delete txn0;
+
+ t.join();
+
+ delete txn1;
+}
+
+void PointLockManagerTestExternalSetup(PointLockManagerTest* self) {
+ self->env_ = Env::Default();
+ self->db_dir_ = test::PerThreadDBPath("point_lock_manager_test");
+ ASSERT_OK(self->env_->CreateDir(self->db_dir_));
+
+ Options opt;
+ opt.create_if_missing = true;
+ TransactionDBOptions txn_opt;
+ txn_opt.transaction_lock_timeout = 0;
+
+ auto mutex_factory = std::make_shared<TransactionDBMutexFactoryImpl>();
+ self->locker_.reset(NewRangeLockManager(mutex_factory)->getLockManager());
+ std::shared_ptr<RangeLockManagerHandle> range_lock_mgr =
+ std::dynamic_pointer_cast<RangeLockManagerHandle>(self->locker_);
+ txn_opt.lock_mgr_handle = range_lock_mgr;
+
+ ASSERT_OK(TransactionDB::Open(opt, txn_opt, self->db_dir_, &self->db_));
+ self->wait_sync_point_name_ = "RangeTreeLockManager::TryRangeLock:WaitingTxn";
+}
+
+INSTANTIATE_TEST_CASE_P(RangeLockManager, AnyLockManagerTest,
+ ::testing::Values(PointLockManagerTestExternalSetup));
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+#else // OS_WIN
+
+#include <stdio.h>
+int main(int /*argc*/, char** /*argv*/) {
+ fprintf(stderr, "skipped as Range Locking is not supported on Windows\n");
+ return 0;
+}
+
+#endif // OS_WIN
+
+#else
+#include <stdio.h>
+
+int main(int /*argc*/, char** /*argv*/) {
+ fprintf(stderr,
+ "skipped as transactions are not supported in rocksdb_lite\n");
+ return 0;
+}
+
+#endif // ROCKSDB_LITE