diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/utilities/transactions/lock/point | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/utilities/transactions/lock/point')
6 files changed, 1806 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/transactions/lock/point/point_lock_manager.cc b/src/rocksdb/utilities/transactions/lock/point/point_lock_manager.cc new file mode 100644 index 000000000..b362a164d --- /dev/null +++ b/src/rocksdb/utilities/transactions/lock/point/point_lock_manager.cc @@ -0,0 +1,721 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/transactions/lock/point/point_lock_manager.h" + +#include <algorithm> +#include <cinttypes> +#include <mutex> + +#include "monitoring/perf_context_imp.h" +#include "rocksdb/slice.h" +#include "rocksdb/utilities/transaction_db_mutex.h" +#include "test_util/sync_point.h" +#include "util/cast_util.h" +#include "util/hash.h" +#include "util/thread_local.h" +#include "utilities/transactions/pessimistic_transaction_db.h" +#include "utilities/transactions/transaction_db_mutex_impl.h" + +namespace ROCKSDB_NAMESPACE { + +struct LockInfo { + bool exclusive; + autovector<TransactionID> txn_ids; + + // Transaction locks are not valid after this time in us + uint64_t expiration_time; + + LockInfo(TransactionID id, uint64_t time, bool ex) + : exclusive(ex), expiration_time(time) { + txn_ids.push_back(id); + } + LockInfo(const LockInfo& lock_info) + : exclusive(lock_info.exclusive), + txn_ids(lock_info.txn_ids), + expiration_time(lock_info.expiration_time) {} + void operator=(const LockInfo& lock_info) { + exclusive = lock_info.exclusive; + txn_ids = lock_info.txn_ids; + expiration_time = lock_info.expiration_time; + } + DECLARE_DEFAULT_MOVES(LockInfo); +}; + +struct LockMapStripe { + explicit LockMapStripe(std::shared_ptr<TransactionDBMutexFactory> factory) { + stripe_mutex = factory->AllocateMutex(); + stripe_cv = factory->AllocateCondVar(); + assert(stripe_mutex); + assert(stripe_cv); + } + + // Mutex must be held before modifying keys map + std::shared_ptr<TransactionDBMutex> stripe_mutex; + + // Condition Variable per stripe for waiting on a lock + std::shared_ptr<TransactionDBCondVar> stripe_cv; + + // Locked keys mapped to the info about the transactions that locked them. + // TODO(agiardullo): Explore performance of other data structures. + UnorderedMap<std::string, LockInfo> keys; +}; + +// Map of #num_stripes LockMapStripes +struct LockMap { + explicit LockMap(size_t num_stripes, + std::shared_ptr<TransactionDBMutexFactory> factory) + : num_stripes_(num_stripes) { + lock_map_stripes_.reserve(num_stripes); + for (size_t i = 0; i < num_stripes; i++) { + LockMapStripe* stripe = new LockMapStripe(factory); + lock_map_stripes_.push_back(stripe); + } + } + + ~LockMap() { + for (auto stripe : lock_map_stripes_) { + delete stripe; + } + } + + // Number of sepearate LockMapStripes to create, each with their own Mutex + const size_t num_stripes_; + + // Count of keys that are currently locked in this column family. + // (Only maintained if PointLockManager::max_num_locks_ is positive.) + std::atomic<int64_t> lock_cnt{0}; + + std::vector<LockMapStripe*> lock_map_stripes_; + + size_t GetStripe(const std::string& key) const; +}; + +namespace { +void UnrefLockMapsCache(void* ptr) { + // Called when a thread exits or a ThreadLocalPtr gets destroyed. + auto lock_maps_cache = + static_cast<UnorderedMap<uint32_t, std::shared_ptr<LockMap>>*>(ptr); + delete lock_maps_cache; +} +} // anonymous namespace + +PointLockManager::PointLockManager(PessimisticTransactionDB* txn_db, + const TransactionDBOptions& opt) + : txn_db_impl_(txn_db), + default_num_stripes_(opt.num_stripes), + max_num_locks_(opt.max_num_locks), + lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)), + dlock_buffer_(opt.max_num_deadlocks), + mutex_factory_(opt.custom_mutex_factory + ? opt.custom_mutex_factory + : std::make_shared<TransactionDBMutexFactoryImpl>()) {} + +size_t LockMap::GetStripe(const std::string& key) const { + assert(num_stripes_ > 0); + return FastRange64(GetSliceNPHash64(key), num_stripes_); +} + +void PointLockManager::AddColumnFamily(const ColumnFamilyHandle* cf) { + InstrumentedMutexLock l(&lock_map_mutex_); + + if (lock_maps_.find(cf->GetID()) == lock_maps_.end()) { + lock_maps_.emplace(cf->GetID(), std::make_shared<LockMap>( + default_num_stripes_, mutex_factory_)); + } else { + // column_family already exists in lock map + assert(false); + } +} + +void PointLockManager::RemoveColumnFamily(const ColumnFamilyHandle* cf) { + // Remove lock_map for this column family. Since the lock map is stored + // as a shared ptr, concurrent transactions can still keep using it + // until they release their references to it. + { + InstrumentedMutexLock l(&lock_map_mutex_); + + auto lock_maps_iter = lock_maps_.find(cf->GetID()); + if (lock_maps_iter == lock_maps_.end()) { + return; + } + + lock_maps_.erase(lock_maps_iter); + } // lock_map_mutex_ + + // Clear all thread-local caches + autovector<void*> local_caches; + lock_maps_cache_->Scrape(&local_caches, nullptr); + for (auto cache : local_caches) { + delete static_cast<LockMaps*>(cache); + } +} + +// Look up the LockMap std::shared_ptr for a given column_family_id. +// Note: The LockMap is only valid as long as the caller is still holding on +// to the returned std::shared_ptr. +std::shared_ptr<LockMap> PointLockManager::GetLockMap( + ColumnFamilyId column_family_id) { + // First check thread-local cache + if (lock_maps_cache_->Get() == nullptr) { + lock_maps_cache_->Reset(new LockMaps()); + } + + auto lock_maps_cache = static_cast<LockMaps*>(lock_maps_cache_->Get()); + + auto lock_map_iter = lock_maps_cache->find(column_family_id); + if (lock_map_iter != lock_maps_cache->end()) { + // Found lock map for this column family. + return lock_map_iter->second; + } + + // Not found in local cache, grab mutex and check shared LockMaps + InstrumentedMutexLock l(&lock_map_mutex_); + + lock_map_iter = lock_maps_.find(column_family_id); + if (lock_map_iter == lock_maps_.end()) { + return std::shared_ptr<LockMap>(nullptr); + } else { + // Found lock map. Store in thread-local cache and return. + std::shared_ptr<LockMap>& lock_map = lock_map_iter->second; + lock_maps_cache->insert({column_family_id, lock_map}); + + return lock_map; + } +} + +// Returns true if this lock has expired and can be acquired by another +// transaction. +// If false, sets *expire_time to the expiration time of the lock according +// to Env->GetMicros() or 0 if no expiration. +bool PointLockManager::IsLockExpired(TransactionID txn_id, + const LockInfo& lock_info, Env* env, + uint64_t* expire_time) { + if (lock_info.expiration_time == 0) { + *expire_time = 0; + return false; + } + + auto now = env->NowMicros(); + bool expired = lock_info.expiration_time <= now; + if (!expired) { + // return how many microseconds until lock will be expired + *expire_time = lock_info.expiration_time; + } else { + for (auto id : lock_info.txn_ids) { + if (txn_id == id) { + continue; + } + + bool success = txn_db_impl_->TryStealingExpiredTransactionLocks(id); + if (!success) { + expired = false; + *expire_time = 0; + break; + } + } + } + + return expired; +} + +Status PointLockManager::TryLock(PessimisticTransaction* txn, + ColumnFamilyId column_family_id, + const std::string& key, Env* env, + bool exclusive) { + // Lookup lock map for this column family id + std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id); + LockMap* lock_map = lock_map_ptr.get(); + if (lock_map == nullptr) { + char msg[255]; + snprintf(msg, sizeof(msg), "Column family id not found: %" PRIu32, + column_family_id); + + return Status::InvalidArgument(msg); + } + + // Need to lock the mutex for the stripe that this key hashes to + size_t stripe_num = lock_map->GetStripe(key); + assert(lock_map->lock_map_stripes_.size() > stripe_num); + LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); + + LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive); + int64_t timeout = txn->GetLockTimeout(); + + return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env, + timeout, lock_info); +} + +// Helper function for TryLock(). +Status PointLockManager::AcquireWithTimeout( + PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe, + ColumnFamilyId column_family_id, const std::string& key, Env* env, + int64_t timeout, const LockInfo& lock_info) { + Status result; + uint64_t end_time = 0; + + if (timeout > 0) { + uint64_t start_time = env->NowMicros(); + end_time = start_time + timeout; + } + + if (timeout < 0) { + // If timeout is negative, we wait indefinitely to acquire the lock + result = stripe->stripe_mutex->Lock(); + } else { + result = stripe->stripe_mutex->TryLockFor(timeout); + } + + if (!result.ok()) { + // failed to acquire mutex + return result; + } + + // Acquire lock if we are able to + uint64_t expire_time_hint = 0; + autovector<TransactionID> wait_ids; + result = AcquireLocked(lock_map, stripe, key, env, lock_info, + &expire_time_hint, &wait_ids); + + if (!result.ok() && timeout != 0) { + PERF_TIMER_GUARD(key_lock_wait_time); + PERF_COUNTER_ADD(key_lock_wait_count, 1); + // If we weren't able to acquire the lock, we will keep retrying as long + // as the timeout allows. + bool timed_out = false; + do { + // Decide how long to wait + int64_t cv_end_time = -1; + if (expire_time_hint > 0 && end_time > 0) { + cv_end_time = std::min(expire_time_hint, end_time); + } else if (expire_time_hint > 0) { + cv_end_time = expire_time_hint; + } else if (end_time > 0) { + cv_end_time = end_time; + } + + assert(result.IsBusy() || wait_ids.size() != 0); + + // We are dependent on a transaction to finish, so perform deadlock + // detection. + if (wait_ids.size() != 0) { + if (txn->IsDeadlockDetect()) { + if (IncrementWaiters(txn, wait_ids, key, column_family_id, + lock_info.exclusive, env)) { + result = Status::Busy(Status::SubCode::kDeadlock); + stripe->stripe_mutex->UnLock(); + return result; + } + } + txn->SetWaitingTxn(wait_ids, column_family_id, &key); + } + + TEST_SYNC_POINT("PointLockManager::AcquireWithTimeout:WaitingTxn"); + if (cv_end_time < 0) { + // Wait indefinitely + result = stripe->stripe_cv->Wait(stripe->stripe_mutex); + } else { + uint64_t now = env->NowMicros(); + if (static_cast<uint64_t>(cv_end_time) > now) { + result = stripe->stripe_cv->WaitFor(stripe->stripe_mutex, + cv_end_time - now); + } + } + + if (wait_ids.size() != 0) { + txn->ClearWaitingTxn(); + if (txn->IsDeadlockDetect()) { + DecrementWaiters(txn, wait_ids); + } + } + + if (result.IsTimedOut()) { + timed_out = true; + // Even though we timed out, we will still make one more attempt to + // acquire lock below (it is possible the lock expired and we + // were never signaled). + } + + if (result.ok() || result.IsTimedOut()) { + result = AcquireLocked(lock_map, stripe, key, env, lock_info, + &expire_time_hint, &wait_ids); + } + } while (!result.ok() && !timed_out); + } + + stripe->stripe_mutex->UnLock(); + + return result; +} + +void PointLockManager::DecrementWaiters( + const PessimisticTransaction* txn, + const autovector<TransactionID>& wait_ids) { + std::lock_guard<std::mutex> lock(wait_txn_map_mutex_); + DecrementWaitersImpl(txn, wait_ids); +} + +void PointLockManager::DecrementWaitersImpl( + const PessimisticTransaction* txn, + const autovector<TransactionID>& wait_ids) { + auto id = txn->GetID(); + assert(wait_txn_map_.Contains(id)); + wait_txn_map_.Delete(id); + + for (auto wait_id : wait_ids) { + rev_wait_txn_map_.Get(wait_id)--; + if (rev_wait_txn_map_.Get(wait_id) == 0) { + rev_wait_txn_map_.Delete(wait_id); + } + } +} + +bool PointLockManager::IncrementWaiters( + const PessimisticTransaction* txn, + const autovector<TransactionID>& wait_ids, const std::string& key, + const uint32_t& cf_id, const bool& exclusive, Env* const env) { + auto id = txn->GetID(); + std::vector<int> queue_parents( + static_cast<size_t>(txn->GetDeadlockDetectDepth())); + std::vector<TransactionID> queue_values( + static_cast<size_t>(txn->GetDeadlockDetectDepth())); + std::lock_guard<std::mutex> lock(wait_txn_map_mutex_); + assert(!wait_txn_map_.Contains(id)); + + wait_txn_map_.Insert(id, {wait_ids, cf_id, exclusive, key}); + + for (auto wait_id : wait_ids) { + if (rev_wait_txn_map_.Contains(wait_id)) { + rev_wait_txn_map_.Get(wait_id)++; + } else { + rev_wait_txn_map_.Insert(wait_id, 1); + } + } + + // No deadlock if nobody is waiting on self. + if (!rev_wait_txn_map_.Contains(id)) { + return false; + } + + const auto* next_ids = &wait_ids; + int parent = -1; + int64_t deadlock_time = 0; + for (int tail = 0, head = 0; head < txn->GetDeadlockDetectDepth(); head++) { + int i = 0; + if (next_ids) { + for (; i < static_cast<int>(next_ids->size()) && + tail + i < txn->GetDeadlockDetectDepth(); + i++) { + queue_values[tail + i] = (*next_ids)[i]; + queue_parents[tail + i] = parent; + } + tail += i; + } + + // No more items in the list, meaning no deadlock. + if (tail == head) { + return false; + } + + auto next = queue_values[head]; + if (next == id) { + std::vector<DeadlockInfo> path; + while (head != -1) { + assert(wait_txn_map_.Contains(queue_values[head])); + + auto extracted_info = wait_txn_map_.Get(queue_values[head]); + path.push_back({queue_values[head], extracted_info.m_cf_id, + extracted_info.m_exclusive, + extracted_info.m_waiting_key}); + head = queue_parents[head]; + } + if (!env->GetCurrentTime(&deadlock_time).ok()) { + /* + TODO(AR) this preserves the current behaviour whilst checking the + status of env->GetCurrentTime to ensure that ASSERT_STATUS_CHECKED + passes. Should we instead raise an error if !ok() ? + */ + deadlock_time = 0; + } + std::reverse(path.begin(), path.end()); + dlock_buffer_.AddNewPath(DeadlockPath(path, deadlock_time)); + deadlock_time = 0; + DecrementWaitersImpl(txn, wait_ids); + return true; + } else if (!wait_txn_map_.Contains(next)) { + next_ids = nullptr; + continue; + } else { + parent = head; + next_ids = &(wait_txn_map_.Get(next).m_neighbors); + } + } + + // Wait cycle too big, just assume deadlock. + if (!env->GetCurrentTime(&deadlock_time).ok()) { + /* + TODO(AR) this preserves the current behaviour whilst checking the status + of env->GetCurrentTime to ensure that ASSERT_STATUS_CHECKED passes. + Should we instead raise an error if !ok() ? + */ + deadlock_time = 0; + } + dlock_buffer_.AddNewPath(DeadlockPath(deadlock_time, true)); + DecrementWaitersImpl(txn, wait_ids); + return true; +} + +// Try to lock this key after we have acquired the mutex. +// Sets *expire_time to the expiration time in microseconds +// or 0 if no expiration. +// REQUIRED: Stripe mutex must be held. +Status PointLockManager::AcquireLocked(LockMap* lock_map, LockMapStripe* stripe, + const std::string& key, Env* env, + const LockInfo& txn_lock_info, + uint64_t* expire_time, + autovector<TransactionID>* txn_ids) { + assert(txn_lock_info.txn_ids.size() == 1); + + Status result; + // Check if this key is already locked + auto stripe_iter = stripe->keys.find(key); + if (stripe_iter != stripe->keys.end()) { + // Lock already held + LockInfo& lock_info = stripe_iter->second; + assert(lock_info.txn_ids.size() == 1 || !lock_info.exclusive); + + if (lock_info.exclusive || txn_lock_info.exclusive) { + if (lock_info.txn_ids.size() == 1 && + lock_info.txn_ids[0] == txn_lock_info.txn_ids[0]) { + // The list contains one txn and we're it, so just take it. + lock_info.exclusive = txn_lock_info.exclusive; + lock_info.expiration_time = txn_lock_info.expiration_time; + } else { + // Check if it's expired. Skips over txn_lock_info.txn_ids[0] in case + // it's there for a shared lock with multiple holders which was not + // caught in the first case. + if (IsLockExpired(txn_lock_info.txn_ids[0], lock_info, env, + expire_time)) { + // lock is expired, can steal it + lock_info.txn_ids = txn_lock_info.txn_ids; + lock_info.exclusive = txn_lock_info.exclusive; + lock_info.expiration_time = txn_lock_info.expiration_time; + // lock_cnt does not change + } else { + result = Status::TimedOut(Status::SubCode::kLockTimeout); + *txn_ids = lock_info.txn_ids; + } + } + } else { + // We are requesting shared access to a shared lock, so just grant it. + lock_info.txn_ids.push_back(txn_lock_info.txn_ids[0]); + // Using std::max means that expiration time never goes down even when + // a transaction is removed from the list. The correct solution would be + // to track expiry for every transaction, but this would also work for + // now. + lock_info.expiration_time = + std::max(lock_info.expiration_time, txn_lock_info.expiration_time); + } + } else { // Lock not held. + // Check lock limit + if (max_num_locks_ > 0 && + lock_map->lock_cnt.load(std::memory_order_acquire) >= max_num_locks_) { + result = Status::Busy(Status::SubCode::kLockLimit); + } else { + // acquire lock + stripe->keys.emplace(key, txn_lock_info); + + // Maintain lock count if there is a limit on the number of locks + if (max_num_locks_) { + lock_map->lock_cnt++; + } + } + } + + return result; +} + +void PointLockManager::UnLockKey(PessimisticTransaction* txn, + const std::string& key, LockMapStripe* stripe, + LockMap* lock_map, Env* env) { +#ifdef NDEBUG + (void)env; +#endif + TransactionID txn_id = txn->GetID(); + + auto stripe_iter = stripe->keys.find(key); + if (stripe_iter != stripe->keys.end()) { + auto& txns = stripe_iter->second.txn_ids; + auto txn_it = std::find(txns.begin(), txns.end(), txn_id); + // Found the key we locked. unlock it. + if (txn_it != txns.end()) { + if (txns.size() == 1) { + stripe->keys.erase(stripe_iter); + } else { + auto last_it = txns.end() - 1; + if (txn_it != last_it) { + *txn_it = *last_it; + } + txns.pop_back(); + } + + if (max_num_locks_ > 0) { + // Maintain lock count if there is a limit on the number of locks. + assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0); + lock_map->lock_cnt--; + } + } + } else { + // This key is either not locked or locked by someone else. This should + // only happen if the unlocking transaction has expired. + assert(txn->GetExpirationTime() > 0 && + txn->GetExpirationTime() < env->NowMicros()); + } +} + +void PointLockManager::UnLock(PessimisticTransaction* txn, + ColumnFamilyId column_family_id, + const std::string& key, Env* env) { + std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id); + LockMap* lock_map = lock_map_ptr.get(); + if (lock_map == nullptr) { + // Column Family must have been dropped. + return; + } + + // Lock the mutex for the stripe that this key hashes to + size_t stripe_num = lock_map->GetStripe(key); + assert(lock_map->lock_map_stripes_.size() > stripe_num); + LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); + + stripe->stripe_mutex->Lock().PermitUncheckedError(); + UnLockKey(txn, key, stripe, lock_map, env); + stripe->stripe_mutex->UnLock(); + + // Signal waiting threads to retry locking + stripe->stripe_cv->NotifyAll(); +} + +void PointLockManager::UnLock(PessimisticTransaction* txn, + const LockTracker& tracker, Env* env) { + std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it( + tracker.GetColumnFamilyIterator()); + assert(cf_it != nullptr); + while (cf_it->HasNext()) { + ColumnFamilyId cf = cf_it->Next(); + std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(cf); + LockMap* lock_map = lock_map_ptr.get(); + if (!lock_map) { + // Column Family must have been dropped. + return; + } + + // Bucket keys by lock_map_ stripe + UnorderedMap<size_t, std::vector<const std::string*>> keys_by_stripe( + lock_map->num_stripes_); + std::unique_ptr<LockTracker::KeyIterator> key_it( + tracker.GetKeyIterator(cf)); + assert(key_it != nullptr); + while (key_it->HasNext()) { + const std::string& key = key_it->Next(); + size_t stripe_num = lock_map->GetStripe(key); + keys_by_stripe[stripe_num].push_back(&key); + } + + // For each stripe, grab the stripe mutex and unlock all keys in this stripe + for (auto& stripe_iter : keys_by_stripe) { + size_t stripe_num = stripe_iter.first; + auto& stripe_keys = stripe_iter.second; + + assert(lock_map->lock_map_stripes_.size() > stripe_num); + LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); + + stripe->stripe_mutex->Lock().PermitUncheckedError(); + + for (const std::string* key : stripe_keys) { + UnLockKey(txn, *key, stripe, lock_map, env); + } + + stripe->stripe_mutex->UnLock(); + + // Signal waiting threads to retry locking + stripe->stripe_cv->NotifyAll(); + } + } +} + +PointLockManager::PointLockStatus PointLockManager::GetPointLockStatus() { + PointLockStatus data; + // Lock order here is important. The correct order is lock_map_mutex_, then + // for every column family ID in ascending order lock every stripe in + // ascending order. + InstrumentedMutexLock l(&lock_map_mutex_); + + std::vector<uint32_t> cf_ids; + for (const auto& map : lock_maps_) { + cf_ids.push_back(map.first); + } + std::sort(cf_ids.begin(), cf_ids.end()); + + for (auto i : cf_ids) { + const auto& stripes = lock_maps_[i]->lock_map_stripes_; + // Iterate and lock all stripes in ascending order. + for (const auto& j : stripes) { + j->stripe_mutex->Lock().PermitUncheckedError(); + for (const auto& it : j->keys) { + struct KeyLockInfo info; + info.exclusive = it.second.exclusive; + info.key = it.first; + for (const auto& id : it.second.txn_ids) { + info.ids.push_back(id); + } + data.insert({i, info}); + } + } + } + + // Unlock everything. Unlocking order is not important. + for (auto i : cf_ids) { + const auto& stripes = lock_maps_[i]->lock_map_stripes_; + for (const auto& j : stripes) { + j->stripe_mutex->UnLock(); + } + } + + return data; +} + +std::vector<DeadlockPath> PointLockManager::GetDeadlockInfoBuffer() { + return dlock_buffer_.PrepareBuffer(); +} + +void PointLockManager::Resize(uint32_t target_size) { + dlock_buffer_.Resize(target_size); +} + +PointLockManager::RangeLockStatus PointLockManager::GetRangeLockStatus() { + return {}; +} + +Status PointLockManager::TryLock(PessimisticTransaction* /* txn */, + ColumnFamilyId /* cf_id */, + const Endpoint& /* start */, + const Endpoint& /* end */, Env* /* env */, + bool /* exclusive */) { + return Status::NotSupported( + "PointLockManager does not support range locking"); +} + +void PointLockManager::UnLock(PessimisticTransaction* /* txn */, + ColumnFamilyId /* cf_id */, + const Endpoint& /* start */, + const Endpoint& /* end */, Env* /* env */) { + // no-op +} + +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/transactions/lock/point/point_lock_manager.h b/src/rocksdb/utilities/transactions/lock/point/point_lock_manager.h new file mode 100644 index 000000000..eeb34f3be --- /dev/null +++ b/src/rocksdb/utilities/transactions/lock/point/point_lock_manager.h @@ -0,0 +1,224 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#ifndef ROCKSDB_LITE + +#include <memory> +#include <string> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "monitoring/instrumented_mutex.h" +#include "rocksdb/utilities/transaction.h" +#include "util/autovector.h" +#include "util/hash_containers.h" +#include "util/hash_map.h" +#include "util/thread_local.h" +#include "utilities/transactions/lock/lock_manager.h" +#include "utilities/transactions/lock/point/point_lock_tracker.h" + +namespace ROCKSDB_NAMESPACE { + +class ColumnFamilyHandle; +struct LockInfo; +struct LockMap; +struct LockMapStripe; + +template <class Path> +class DeadlockInfoBufferTempl { + private: + std::vector<Path> paths_buffer_; + uint32_t buffer_idx_; + std::mutex paths_buffer_mutex_; + + std::vector<Path> Normalize() { + auto working = paths_buffer_; + + if (working.empty()) { + return working; + } + + // Next write occurs at a nonexistent path's slot + if (paths_buffer_[buffer_idx_].empty()) { + working.resize(buffer_idx_); + } else { + std::rotate(working.begin(), working.begin() + buffer_idx_, + working.end()); + } + + return working; + } + + public: + explicit DeadlockInfoBufferTempl(uint32_t n_latest_dlocks) + : paths_buffer_(n_latest_dlocks), buffer_idx_(0) {} + + void AddNewPath(Path path) { + std::lock_guard<std::mutex> lock(paths_buffer_mutex_); + + if (paths_buffer_.empty()) { + return; + } + + paths_buffer_[buffer_idx_] = std::move(path); + buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size(); + } + + void Resize(uint32_t target_size) { + std::lock_guard<std::mutex> lock(paths_buffer_mutex_); + + paths_buffer_ = Normalize(); + + // Drop the deadlocks that will no longer be needed ater the normalize + if (target_size < paths_buffer_.size()) { + paths_buffer_.erase( + paths_buffer_.begin(), + paths_buffer_.begin() + (paths_buffer_.size() - target_size)); + buffer_idx_ = 0; + } + // Resize the buffer to the target size and restore the buffer's idx + else { + auto prev_size = paths_buffer_.size(); + paths_buffer_.resize(target_size); + buffer_idx_ = (uint32_t)prev_size; + } + } + + std::vector<Path> PrepareBuffer() { + std::lock_guard<std::mutex> lock(paths_buffer_mutex_); + + // Reversing the normalized vector returns the latest deadlocks first + auto working = Normalize(); + std::reverse(working.begin(), working.end()); + + return working; + } +}; + +using DeadlockInfoBuffer = DeadlockInfoBufferTempl<DeadlockPath>; + +struct TrackedTrxInfo { + autovector<TransactionID> m_neighbors; + uint32_t m_cf_id; + bool m_exclusive; + std::string m_waiting_key; +}; + +class PointLockManager : public LockManager { + public: + PointLockManager(PessimisticTransactionDB* db, + const TransactionDBOptions& opt); + // No copying allowed + PointLockManager(const PointLockManager&) = delete; + PointLockManager& operator=(const PointLockManager&) = delete; + + ~PointLockManager() override {} + + bool IsPointLockSupported() const override { return true; } + + bool IsRangeLockSupported() const override { return false; } + + const LockTrackerFactory& GetLockTrackerFactory() const override { + return PointLockTrackerFactory::Get(); + } + + // Creates a new LockMap for this column family. Caller should guarantee + // that this column family does not already exist. + void AddColumnFamily(const ColumnFamilyHandle* cf) override; + // Deletes the LockMap for this column family. Caller should guarantee that + // this column family is no longer in use. + void RemoveColumnFamily(const ColumnFamilyHandle* cf) override; + + Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id, + const std::string& key, Env* env, bool exclusive) override; + Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id, + const Endpoint& start, const Endpoint& end, Env* env, + bool exclusive) override; + + void UnLock(PessimisticTransaction* txn, const LockTracker& tracker, + Env* env) override; + void UnLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id, + const std::string& key, Env* env) override; + void UnLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id, + const Endpoint& start, const Endpoint& end, Env* env) override; + + PointLockStatus GetPointLockStatus() override; + + RangeLockStatus GetRangeLockStatus() override; + + std::vector<DeadlockPath> GetDeadlockInfoBuffer() override; + + void Resize(uint32_t new_size) override; + + private: + PessimisticTransactionDB* txn_db_impl_; + + // Default number of lock map stripes per column family + const size_t default_num_stripes_; + + // Limit on number of keys locked per column family + const int64_t max_num_locks_; + + // The following lock order must be satisfied in order to avoid deadlocking + // ourselves. + // - lock_map_mutex_ + // - stripe mutexes in ascending cf id, ascending stripe order + // - wait_txn_map_mutex_ + // + // Must be held when accessing/modifying lock_maps_. + InstrumentedMutex lock_map_mutex_; + + // Map of ColumnFamilyId to locked key info + using LockMaps = UnorderedMap<uint32_t, std::shared_ptr<LockMap>>; + LockMaps lock_maps_; + + // Thread-local cache of entries in lock_maps_. This is an optimization + // to avoid acquiring a mutex in order to look up a LockMap + std::unique_ptr<ThreadLocalPtr> lock_maps_cache_; + + // Must be held when modifying wait_txn_map_ and rev_wait_txn_map_. + std::mutex wait_txn_map_mutex_; + + // Maps from waitee -> number of waiters. + HashMap<TransactionID, int> rev_wait_txn_map_; + // Maps from waiter -> waitee. + HashMap<TransactionID, TrackedTrxInfo> wait_txn_map_; + DeadlockInfoBuffer dlock_buffer_; + + // Used to allocate mutexes/condvars to use when locking keys + std::shared_ptr<TransactionDBMutexFactory> mutex_factory_; + + bool IsLockExpired(TransactionID txn_id, const LockInfo& lock_info, Env* env, + uint64_t* wait_time); + + std::shared_ptr<LockMap> GetLockMap(uint32_t column_family_id); + + Status AcquireWithTimeout(PessimisticTransaction* txn, LockMap* lock_map, + LockMapStripe* stripe, uint32_t column_family_id, + const std::string& key, Env* env, int64_t timeout, + const LockInfo& lock_info); + + Status AcquireLocked(LockMap* lock_map, LockMapStripe* stripe, + const std::string& key, Env* env, + const LockInfo& lock_info, uint64_t* wait_time, + autovector<TransactionID>* txn_ids); + + void UnLockKey(PessimisticTransaction* txn, const std::string& key, + LockMapStripe* stripe, LockMap* lock_map, Env* env); + + bool IncrementWaiters(const PessimisticTransaction* txn, + const autovector<TransactionID>& wait_ids, + const std::string& key, const uint32_t& cf_id, + const bool& exclusive, Env* const env); + void DecrementWaiters(const PessimisticTransaction* txn, + const autovector<TransactionID>& wait_ids); + void DecrementWaitersImpl(const PessimisticTransaction* txn, + const autovector<TransactionID>& wait_ids); +}; + +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/transactions/lock/point/point_lock_manager_test.cc b/src/rocksdb/utilities/transactions/lock/point/point_lock_manager_test.cc new file mode 100644 index 000000000..525fdea71 --- /dev/null +++ b/src/rocksdb/utilities/transactions/lock/point/point_lock_manager_test.cc @@ -0,0 +1,181 @@ +// Copyright (c) 2020-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/transactions/lock/point/point_lock_manager_test.h" + +namespace ROCKSDB_NAMESPACE { + +// This test is not applicable for Range Lock manager as Range Lock Manager +// operates on Column Families, not their ids. +TEST_F(PointLockManagerTest, LockNonExistingColumnFamily) { + MockColumnFamilyHandle cf(1024); + locker_->RemoveColumnFamily(&cf); + auto txn = NewTxn(); + auto s = locker_->TryLock(txn, 1024, "k", env_, true); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_STREQ(s.getState(), "Column family id not found: 1024"); + delete txn; +} + +TEST_F(PointLockManagerTest, LockStatus) { + MockColumnFamilyHandle cf1(1024), cf2(2048); + locker_->AddColumnFamily(&cf1); + locker_->AddColumnFamily(&cf2); + + auto txn1 = NewTxn(); + ASSERT_OK(locker_->TryLock(txn1, 1024, "k1", env_, true)); + ASSERT_OK(locker_->TryLock(txn1, 2048, "k1", env_, true)); + + auto txn2 = NewTxn(); + ASSERT_OK(locker_->TryLock(txn2, 1024, "k2", env_, false)); + ASSERT_OK(locker_->TryLock(txn2, 2048, "k2", env_, false)); + + auto s = locker_->GetPointLockStatus(); + ASSERT_EQ(s.size(), 4u); + for (uint32_t cf_id : {1024, 2048}) { + ASSERT_EQ(s.count(cf_id), 2u); + auto range = s.equal_range(cf_id); + for (auto it = range.first; it != range.second; it++) { + ASSERT_TRUE(it->second.key == "k1" || it->second.key == "k2"); + if (it->second.key == "k1") { + ASSERT_EQ(it->second.exclusive, true); + ASSERT_EQ(it->second.ids.size(), 1u); + ASSERT_EQ(it->second.ids[0], txn1->GetID()); + } else if (it->second.key == "k2") { + ASSERT_EQ(it->second.exclusive, false); + ASSERT_EQ(it->second.ids.size(), 1u); + ASSERT_EQ(it->second.ids[0], txn2->GetID()); + } + } + } + + // Cleanup + locker_->UnLock(txn1, 1024, "k1", env_); + locker_->UnLock(txn1, 2048, "k1", env_); + locker_->UnLock(txn2, 1024, "k2", env_); + locker_->UnLock(txn2, 2048, "k2", env_); + + delete txn1; + delete txn2; +} + +TEST_F(PointLockManagerTest, UnlockExclusive) { + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + + auto txn1 = NewTxn(); + ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, true)); + locker_->UnLock(txn1, 1, "k", env_); + + auto txn2 = NewTxn(); + ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, true)); + + // Cleanup + locker_->UnLock(txn2, 1, "k", env_); + + delete txn1; + delete txn2; +} + +TEST_F(PointLockManagerTest, UnlockShared) { + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + + auto txn1 = NewTxn(); + ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false)); + locker_->UnLock(txn1, 1, "k", env_); + + auto txn2 = NewTxn(); + ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, true)); + + // Cleanup + locker_->UnLock(txn2, 1, "k", env_); + + delete txn1; + delete txn2; +} + +// This test doesn't work with Range Lock Manager, because Range Lock Manager +// doesn't support deadlock_detect_depth. + +TEST_F(PointLockManagerTest, DeadlockDepthExceeded) { + // Tests that when detecting deadlock, if the detection depth is exceeded, + // it's also viewed as deadlock. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + TransactionOptions txn_opt; + txn_opt.deadlock_detect = true; + txn_opt.deadlock_detect_depth = 1; + txn_opt.lock_timeout = 1000000; + auto txn1 = NewTxn(txn_opt); + auto txn2 = NewTxn(txn_opt); + auto txn3 = NewTxn(txn_opt); + auto txn4 = NewTxn(txn_opt); + // "a ->(k) b" means transaction a is waiting for transaction b to release + // the held lock on key k. + // txn4 ->(k3) -> txn3 ->(k2) txn2 ->(k1) txn1 + // txn3's deadlock detection will exceed the detection depth 1, + // which will be viewed as a deadlock. + // NOTE: + // txn4 ->(k3) -> txn3 must be set up before + // txn3 ->(k2) -> txn2, because to trigger deadlock detection for txn3, + // it must have another txn waiting on it, which is txn4 in this case. + ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true)); + + port::Thread t1 = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() { + ASSERT_OK(locker_->TryLock(txn2, 1, "k2", env_, true)); + // block because txn1 is holding a lock on k1. + locker_->TryLock(txn2, 1, "k1", env_, true); + }); + + ASSERT_OK(locker_->TryLock(txn3, 1, "k3", env_, true)); + + port::Thread t2 = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() { + // block because txn3 is holding a lock on k1. + locker_->TryLock(txn4, 1, "k3", env_, true); + }); + + auto s = locker_->TryLock(txn3, 1, "k2", env_, true); + ASSERT_TRUE(s.IsBusy()); + ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock); + + std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer(); + ASSERT_EQ(deadlock_paths.size(), 1u); + ASSERT_TRUE(deadlock_paths[0].limit_exceeded); + + locker_->UnLock(txn1, 1, "k1", env_); + locker_->UnLock(txn3, 1, "k3", env_); + t1.join(); + t2.join(); + + delete txn4; + delete txn3; + delete txn2; + delete txn1; +} + +INSTANTIATE_TEST_CASE_P(PointLockManager, AnyLockManagerTest, + ::testing::Values(nullptr)); + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include <stdio.h> + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, + "SKIPPED because Transactions are not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/transactions/lock/point/point_lock_manager_test.h b/src/rocksdb/utilities/transactions/lock/point/point_lock_manager_test.h new file mode 100644 index 000000000..ca9f46bf9 --- /dev/null +++ b/src/rocksdb/utilities/transactions/lock/point/point_lock_manager_test.h @@ -0,0 +1,324 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "file/file_util.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/utilities/transaction_db.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" +#include "utilities/transactions/lock/point/point_lock_manager.h" +#include "utilities/transactions/pessimistic_transaction_db.h" +#include "utilities/transactions/transaction_db_mutex_impl.h" + +namespace ROCKSDB_NAMESPACE { + +class MockColumnFamilyHandle : public ColumnFamilyHandle { + public: + explicit MockColumnFamilyHandle(ColumnFamilyId cf_id) : cf_id_(cf_id) {} + + ~MockColumnFamilyHandle() override {} + + const std::string& GetName() const override { return name_; } + + ColumnFamilyId GetID() const override { return cf_id_; } + + Status GetDescriptor(ColumnFamilyDescriptor*) override { + return Status::OK(); + } + + const Comparator* GetComparator() const override { + return BytewiseComparator(); + } + + private: + ColumnFamilyId cf_id_; + std::string name_ = "MockCF"; +}; + +class PointLockManagerTest : public testing::Test { + public: + void SetUp() override { + env_ = Env::Default(); + db_dir_ = test::PerThreadDBPath("point_lock_manager_test"); + ASSERT_OK(env_->CreateDir(db_dir_)); + + Options opt; + opt.create_if_missing = true; + TransactionDBOptions txn_opt; + txn_opt.transaction_lock_timeout = 0; + + ASSERT_OK(TransactionDB::Open(opt, txn_opt, db_dir_, &db_)); + + // CAUTION: This test creates a separate lock manager object (right, NOT + // the one that the TransactionDB is using!), and runs tests on it. + locker_.reset(new PointLockManager( + static_cast<PessimisticTransactionDB*>(db_), txn_opt)); + + wait_sync_point_name_ = "PointLockManager::AcquireWithTimeout:WaitingTxn"; + } + + void TearDown() override { + delete db_; + EXPECT_OK(DestroyDir(env_, db_dir_)); + } + + PessimisticTransaction* NewTxn( + TransactionOptions txn_opt = TransactionOptions()) { + Transaction* txn = db_->BeginTransaction(WriteOptions(), txn_opt); + return reinterpret_cast<PessimisticTransaction*>(txn); + } + + protected: + Env* env_; + std::shared_ptr<LockManager> locker_; + const char* wait_sync_point_name_; + friend void PointLockManagerTestExternalSetup(PointLockManagerTest*); + + private: + std::string db_dir_; + TransactionDB* db_; +}; + +using init_func_t = void (*)(PointLockManagerTest*); + +class AnyLockManagerTest : public PointLockManagerTest, + public testing::WithParamInterface<init_func_t> { + public: + void SetUp() override { + // If a custom setup function was provided, use it. Otherwise, use what we + // have inherited. + auto init_func = GetParam(); + if (init_func) + (*init_func)(this); + else + PointLockManagerTest::SetUp(); + } +}; + +TEST_P(AnyLockManagerTest, ReentrantExclusiveLock) { + // Tests that a txn can acquire exclusive lock on the same key repeatedly. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn = NewTxn(); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); + + // Cleanup + locker_->UnLock(txn, 1, "k", env_); + + delete txn; +} + +TEST_P(AnyLockManagerTest, ReentrantSharedLock) { + // Tests that a txn can acquire shared lock on the same key repeatedly. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn = NewTxn(); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); + + // Cleanup + locker_->UnLock(txn, 1, "k", env_); + + delete txn; +} + +TEST_P(AnyLockManagerTest, LockUpgrade) { + // Tests that a txn can upgrade from a shared lock to an exclusive lock. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn = NewTxn(); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); + + // Cleanup + locker_->UnLock(txn, 1, "k", env_); + delete txn; +} + +TEST_P(AnyLockManagerTest, LockDowngrade) { + // Tests that a txn can acquire a shared lock after acquiring an exclusive + // lock on the same key. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn = NewTxn(); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); + + // Cleanup + locker_->UnLock(txn, 1, "k", env_); + delete txn; +} + +TEST_P(AnyLockManagerTest, LockConflict) { + // Tests that lock conflicts lead to lock timeout. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn1 = NewTxn(); + auto txn2 = NewTxn(); + + { + // exclusive-exclusive conflict. + ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true)); + auto s = locker_->TryLock(txn2, 1, "k1", env_, true); + ASSERT_TRUE(s.IsTimedOut()); + } + + { + // exclusive-shared conflict. + ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, true)); + auto s = locker_->TryLock(txn2, 1, "k2", env_, false); + ASSERT_TRUE(s.IsTimedOut()); + } + + { + // shared-exclusive conflict. + ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, false)); + auto s = locker_->TryLock(txn2, 1, "k2", env_, true); + ASSERT_TRUE(s.IsTimedOut()); + } + + // Cleanup + locker_->UnLock(txn1, 1, "k1", env_); + locker_->UnLock(txn1, 1, "k2", env_); + + delete txn1; + delete txn2; +} + +port::Thread BlockUntilWaitingTxn(const char* sync_point_name, + std::function<void()> f) { + std::atomic<bool> reached(false); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + sync_point_name, [&](void* /*arg*/) { reached.store(true); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + port::Thread t(f); + + while (!reached.load()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + + return t; +} + +TEST_P(AnyLockManagerTest, SharedLocks) { + // Tests that shared locks can be concurrently held by multiple transactions. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn1 = NewTxn(); + auto txn2 = NewTxn(); + ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false)); + ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, false)); + + // Cleanup + locker_->UnLock(txn1, 1, "k", env_); + locker_->UnLock(txn2, 1, "k", env_); + + delete txn1; + delete txn2; +} + +TEST_P(AnyLockManagerTest, Deadlock) { + // Tests that deadlock can be detected. + // Deadlock scenario: + // txn1 exclusively locks k1, and wants to lock k2; + // txn2 exclusively locks k2, and wants to lock k1. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + TransactionOptions txn_opt; + txn_opt.deadlock_detect = true; + txn_opt.lock_timeout = 1000000; + auto txn1 = NewTxn(txn_opt); + auto txn2 = NewTxn(txn_opt); + + ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true)); + ASSERT_OK(locker_->TryLock(txn2, 1, "k2", env_, true)); + + // txn1 tries to lock k2, will block forever. + port::Thread t = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() { + // block because txn2 is holding a lock on k2. + locker_->TryLock(txn1, 1, "k2", env_, true); + }); + + auto s = locker_->TryLock(txn2, 1, "k1", env_, true); + ASSERT_TRUE(s.IsBusy()); + ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock); + + std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer(); + ASSERT_EQ(deadlock_paths.size(), 1u); + ASSERT_FALSE(deadlock_paths[0].limit_exceeded); + + std::vector<DeadlockInfo> deadlocks = deadlock_paths[0].path; + ASSERT_EQ(deadlocks.size(), 2u); + + ASSERT_EQ(deadlocks[0].m_txn_id, txn1->GetID()); + ASSERT_EQ(deadlocks[0].m_cf_id, 1u); + ASSERT_TRUE(deadlocks[0].m_exclusive); + ASSERT_EQ(deadlocks[0].m_waiting_key, "k2"); + + ASSERT_EQ(deadlocks[1].m_txn_id, txn2->GetID()); + ASSERT_EQ(deadlocks[1].m_cf_id, 1u); + ASSERT_TRUE(deadlocks[1].m_exclusive); + ASSERT_EQ(deadlocks[1].m_waiting_key, "k1"); + + locker_->UnLock(txn2, 1, "k2", env_); + t.join(); + + // Cleanup + locker_->UnLock(txn1, 1, "k1", env_); + locker_->UnLock(txn1, 1, "k2", env_); + delete txn2; + delete txn1; +} + +TEST_P(AnyLockManagerTest, GetWaitingTxns_MultipleTxns) { + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + + auto txn1 = NewTxn(); + ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false)); + + auto txn2 = NewTxn(); + ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, false)); + + auto txn3 = NewTxn(); + txn3->SetLockTimeout(10000); + port::Thread t1 = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() { + ASSERT_OK(locker_->TryLock(txn3, 1, "k", env_, true)); + locker_->UnLock(txn3, 1, "k", env_); + }); + + // Ok, now txn3 is waiting for lock on "k", which is owned by two + // transactions. Check that GetWaitingTxns reports this correctly + uint32_t wait_cf_id; + std::string wait_key; + auto waiters = txn3->GetWaitingTxns(&wait_cf_id, &wait_key); + + ASSERT_EQ(wait_cf_id, 1u); + ASSERT_EQ(wait_key, "k"); + ASSERT_EQ(waiters.size(), 2); + bool waits_correct = + (waiters[0] == txn1->GetID() && waiters[1] == txn2->GetID()) || + (waiters[1] == txn1->GetID() && waiters[0] == txn2->GetID()); + ASSERT_EQ(waits_correct, true); + + // Release locks so txn3 can proceed with execution + locker_->UnLock(txn1, 1, "k", env_); + locker_->UnLock(txn2, 1, "k", env_); + + // Wait until txn3 finishes + t1.join(); + + delete txn1; + delete txn2; + delete txn3; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/transactions/lock/point/point_lock_tracker.cc b/src/rocksdb/utilities/transactions/lock/point/point_lock_tracker.cc new file mode 100644 index 000000000..6204a8f02 --- /dev/null +++ b/src/rocksdb/utilities/transactions/lock/point/point_lock_tracker.cc @@ -0,0 +1,257 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/transactions/lock/point/point_lock_tracker.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { + +class TrackedKeysColumnFamilyIterator + : public LockTracker::ColumnFamilyIterator { + public: + explicit TrackedKeysColumnFamilyIterator(const TrackedKeys& keys) + : tracked_keys_(keys), it_(keys.begin()) {} + + bool HasNext() const override { return it_ != tracked_keys_.end(); } + + ColumnFamilyId Next() override { return (it_++)->first; } + + private: + const TrackedKeys& tracked_keys_; + TrackedKeys::const_iterator it_; +}; + +class TrackedKeysIterator : public LockTracker::KeyIterator { + public: + TrackedKeysIterator(const TrackedKeys& keys, ColumnFamilyId id) + : key_infos_(keys.at(id)), it_(key_infos_.begin()) {} + + bool HasNext() const override { return it_ != key_infos_.end(); } + + const std::string& Next() override { return (it_++)->first; } + + private: + const TrackedKeyInfos& key_infos_; + TrackedKeyInfos::const_iterator it_; +}; + +} // namespace + +void PointLockTracker::Track(const PointLockRequest& r) { + auto& keys = tracked_keys_[r.column_family_id]; + auto result = keys.try_emplace(r.key, r.seq); + auto it = result.first; + if (!result.second && r.seq < it->second.seq) { + // Now tracking this key with an earlier sequence number + it->second.seq = r.seq; + } + // else we do not update the seq. The smaller the tracked seq, the stronger it + // the guarantee since it implies from the seq onward there has not been a + // concurrent update to the key. So we update the seq if it implies stronger + // guarantees, i.e., if it is smaller than the existing tracked seq. + + if (r.read_only) { + it->second.num_reads++; + } else { + it->second.num_writes++; + } + + it->second.exclusive = it->second.exclusive || r.exclusive; +} + +UntrackStatus PointLockTracker::Untrack(const PointLockRequest& r) { + auto cf_keys = tracked_keys_.find(r.column_family_id); + if (cf_keys == tracked_keys_.end()) { + return UntrackStatus::NOT_TRACKED; + } + + auto& keys = cf_keys->second; + auto it = keys.find(r.key); + if (it == keys.end()) { + return UntrackStatus::NOT_TRACKED; + } + + bool untracked = false; + auto& info = it->second; + if (r.read_only) { + if (info.num_reads > 0) { + info.num_reads--; + untracked = true; + } + } else { + if (info.num_writes > 0) { + info.num_writes--; + untracked = true; + } + } + + bool removed = false; + if (info.num_reads == 0 && info.num_writes == 0) { + keys.erase(it); + if (keys.empty()) { + tracked_keys_.erase(cf_keys); + } + removed = true; + } + + if (removed) { + return UntrackStatus::REMOVED; + } + if (untracked) { + return UntrackStatus::UNTRACKED; + } + return UntrackStatus::NOT_TRACKED; +} + +void PointLockTracker::Merge(const LockTracker& tracker) { + const PointLockTracker& t = static_cast<const PointLockTracker&>(tracker); + for (const auto& cf_keys : t.tracked_keys_) { + ColumnFamilyId cf = cf_keys.first; + const auto& keys = cf_keys.second; + + auto current_cf_keys = tracked_keys_.find(cf); + if (current_cf_keys == tracked_keys_.end()) { + tracked_keys_.emplace(cf_keys); + } else { + auto& current_keys = current_cf_keys->second; + for (const auto& key_info : keys) { + const std::string& key = key_info.first; + const TrackedKeyInfo& info = key_info.second; + // If key was not previously tracked, just copy the whole struct over. + // Otherwise, some merging needs to occur. + auto current_info = current_keys.find(key); + if (current_info == current_keys.end()) { + current_keys.emplace(key_info); + } else { + current_info->second.Merge(info); + } + } + } + } +} + +void PointLockTracker::Subtract(const LockTracker& tracker) { + const PointLockTracker& t = static_cast<const PointLockTracker&>(tracker); + for (const auto& cf_keys : t.tracked_keys_) { + ColumnFamilyId cf = cf_keys.first; + const auto& keys = cf_keys.second; + + auto& current_keys = tracked_keys_.at(cf); + for (const auto& key_info : keys) { + const std::string& key = key_info.first; + const TrackedKeyInfo& info = key_info.second; + uint32_t num_reads = info.num_reads; + uint32_t num_writes = info.num_writes; + + auto current_key_info = current_keys.find(key); + assert(current_key_info != current_keys.end()); + + // Decrement the total reads/writes of this key by the number of + // reads/writes done since the last SavePoint. + if (num_reads > 0) { + assert(current_key_info->second.num_reads >= num_reads); + current_key_info->second.num_reads -= num_reads; + } + if (num_writes > 0) { + assert(current_key_info->second.num_writes >= num_writes); + current_key_info->second.num_writes -= num_writes; + } + if (current_key_info->second.num_reads == 0 && + current_key_info->second.num_writes == 0) { + current_keys.erase(current_key_info); + } + } + } +} + +LockTracker* PointLockTracker::GetTrackedLocksSinceSavePoint( + const LockTracker& save_point_tracker) const { + // Examine the number of reads/writes performed on all keys written + // since the last SavePoint and compare to the total number of reads/writes + // for each key. + LockTracker* t = new PointLockTracker(); + const PointLockTracker& save_point_t = + static_cast<const PointLockTracker&>(save_point_tracker); + for (const auto& cf_keys : save_point_t.tracked_keys_) { + ColumnFamilyId cf = cf_keys.first; + const auto& keys = cf_keys.second; + + auto& current_keys = tracked_keys_.at(cf); + for (const auto& key_info : keys) { + const std::string& key = key_info.first; + const TrackedKeyInfo& info = key_info.second; + uint32_t num_reads = info.num_reads; + uint32_t num_writes = info.num_writes; + + auto current_key_info = current_keys.find(key); + assert(current_key_info != current_keys.end()); + assert(current_key_info->second.num_reads >= num_reads); + assert(current_key_info->second.num_writes >= num_writes); + + if (current_key_info->second.num_reads == num_reads && + current_key_info->second.num_writes == num_writes) { + // All the reads/writes to this key were done in the last savepoint. + PointLockRequest r; + r.column_family_id = cf; + r.key = key; + r.seq = info.seq; + r.read_only = (num_writes == 0); + r.exclusive = info.exclusive; + t->Track(r); + } + } + } + return t; +} + +PointLockStatus PointLockTracker::GetPointLockStatus( + ColumnFamilyId column_family_id, const std::string& key) const { + assert(IsPointLockSupported()); + PointLockStatus status; + auto it = tracked_keys_.find(column_family_id); + if (it == tracked_keys_.end()) { + return status; + } + + const auto& keys = it->second; + auto key_it = keys.find(key); + if (key_it == keys.end()) { + return status; + } + + const TrackedKeyInfo& key_info = key_it->second; + status.locked = true; + status.exclusive = key_info.exclusive; + status.seq = key_info.seq; + return status; +} + +uint64_t PointLockTracker::GetNumPointLocks() const { + uint64_t num_keys = 0; + for (const auto& cf_keys : tracked_keys_) { + num_keys += cf_keys.second.size(); + } + return num_keys; +} + +LockTracker::ColumnFamilyIterator* PointLockTracker::GetColumnFamilyIterator() + const { + return new TrackedKeysColumnFamilyIterator(tracked_keys_); +} + +LockTracker::KeyIterator* PointLockTracker::GetKeyIterator( + ColumnFamilyId column_family_id) const { + assert(tracked_keys_.find(column_family_id) != tracked_keys_.end()); + return new TrackedKeysIterator(tracked_keys_, column_family_id); +} + +void PointLockTracker::Clear() { tracked_keys_.clear(); } + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/transactions/lock/point/point_lock_tracker.h b/src/rocksdb/utilities/transactions/lock/point/point_lock_tracker.h new file mode 100644 index 000000000..daf6f9aa2 --- /dev/null +++ b/src/rocksdb/utilities/transactions/lock/point/point_lock_tracker.h @@ -0,0 +1,99 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#ifndef ROCKSDB_LITE + +#include <memory> +#include <string> +#include <unordered_map> + +#include "utilities/transactions/lock/lock_tracker.h" + +namespace ROCKSDB_NAMESPACE { + +struct TrackedKeyInfo { + // Earliest sequence number that is relevant to this transaction for this key + SequenceNumber seq; + + uint32_t num_writes; + uint32_t num_reads; + + bool exclusive; + + explicit TrackedKeyInfo(SequenceNumber seq_no) + : seq(seq_no), num_writes(0), num_reads(0), exclusive(false) {} + + void Merge(const TrackedKeyInfo& info) { + assert(seq <= info.seq); + num_reads += info.num_reads; + num_writes += info.num_writes; + exclusive = exclusive || info.exclusive; + } +}; + +using TrackedKeyInfos = std::unordered_map<std::string, TrackedKeyInfo>; + +using TrackedKeys = std::unordered_map<ColumnFamilyId, TrackedKeyInfos>; + +// Tracks point locks on single keys. +class PointLockTracker : public LockTracker { + public: + PointLockTracker() = default; + + PointLockTracker(const PointLockTracker&) = delete; + PointLockTracker& operator=(const PointLockTracker&) = delete; + + bool IsPointLockSupported() const override { return true; } + + bool IsRangeLockSupported() const override { return false; } + + void Track(const PointLockRequest& lock_request) override; + + UntrackStatus Untrack(const PointLockRequest& lock_request) override; + + void Track(const RangeLockRequest& /*lock_request*/) override {} + + UntrackStatus Untrack(const RangeLockRequest& /*lock_request*/) override { + return UntrackStatus::NOT_TRACKED; + } + + void Merge(const LockTracker& tracker) override; + + void Subtract(const LockTracker& tracker) override; + + void Clear() override; + + virtual LockTracker* GetTrackedLocksSinceSavePoint( + const LockTracker& save_point_tracker) const override; + + PointLockStatus GetPointLockStatus(ColumnFamilyId column_family_id, + const std::string& key) const override; + + uint64_t GetNumPointLocks() const override; + + ColumnFamilyIterator* GetColumnFamilyIterator() const override; + + KeyIterator* GetKeyIterator(ColumnFamilyId column_family_id) const override; + + private: + TrackedKeys tracked_keys_; +}; + +class PointLockTrackerFactory : public LockTrackerFactory { + public: + static const PointLockTrackerFactory& Get() { + static const PointLockTrackerFactory instance; + return instance; + } + + LockTracker* Create() const override { return new PointLockTracker(); } + + private: + PointLockTrackerFactory() {} +}; + +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE |