// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once
#ifndef ROCKSDB_LITE

#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "monitoring/instrumented_mutex.h"
#include "rocksdb/utilities/transaction.h"
#include "util/autovector.h"
#include "util/hash_containers.h"
#include "util/hash_map.h"
#include "util/thread_local.h"
#include "utilities/transactions/lock/lock_manager.h"
#include "utilities/transactions/lock/point/point_lock_tracker.h"

namespace ROCKSDB_NAMESPACE {

class ColumnFamilyHandle;
struct LockInfo;
struct LockMap;
struct LockMapStripe;

template <class Path>
class DeadlockInfoBufferTempl {
 private:
  std::vector<Path> paths_buffer_;
  uint32_t buffer_idx_;
  std::mutex paths_buffer_mutex_;

  std::vector<Path> Normalize() {
    auto working = paths_buffer_;

    if (working.empty()) {
      return working;
    }

    // Next write occurs at a nonexistent path's slot
    if (paths_buffer_[buffer_idx_].empty()) {
      working.resize(buffer_idx_);
    } else {
      std::rotate(working.begin(), working.begin() + buffer_idx_,
                  working.end());
    }

    return working;
  }

 public:
  explicit DeadlockInfoBufferTempl(uint32_t n_latest_dlocks)
      : paths_buffer_(n_latest_dlocks), buffer_idx_(0) {}

  void AddNewPath(Path path) {
    std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

    if (paths_buffer_.empty()) {
      return;
    }

    paths_buffer_[buffer_idx_] = std::move(path);
    buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size();
  }

  void Resize(uint32_t target_size) {
    std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

    paths_buffer_ = Normalize();

    // Drop the deadlocks that will no longer be needed after the normalize
    if (target_size < paths_buffer_.size()) {
      paths_buffer_.erase(
          paths_buffer_.begin(),
          paths_buffer_.begin() + (paths_buffer_.size() - target_size));
      buffer_idx_ = 0;
    }
    // Resize the buffer to the target size and restore the buffer's idx
    else {
      auto prev_size = paths_buffer_.size();
      paths_buffer_.resize(target_size);
      buffer_idx_ = (uint32_t)prev_size;
    }
  }

  std::vector<Path> PrepareBuffer() {
    std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

    // Reversing the normalized vector returns the latest deadlocks first
    auto working = Normalize();
    std::reverse(working.begin(), working.end());

    return working;
  }
};

using DeadlockInfoBuffer = DeadlockInfoBufferTempl<DeadlockPath>;

struct TrackedTrxInfo {
  autovector<TransactionID> m_neighbors;
  uint32_t m_cf_id;
  bool m_exclusive;
  std::string m_waiting_key;
};

class PointLockManager : public LockManager {
 public:
  PointLockManager(PessimisticTransactionDB* db,
                   const TransactionDBOptions& opt);
  // No copying allowed
  PointLockManager(const PointLockManager&) = delete;
  PointLockManager& operator=(const PointLockManager&) = delete;

  ~PointLockManager() override {}

  bool IsPointLockSupported() const override { return true; }

  bool IsRangeLockSupported() const override { return false; }

  const LockTrackerFactory& GetLockTrackerFactory() const override {
    return PointLockTrackerFactory::Get();
  }

  // Creates a new LockMap for this column family. Caller should guarantee
  // that this column family does not already exist.
  void AddColumnFamily(const ColumnFamilyHandle* cf) override;
  // Deletes the LockMap for this column family. Caller should guarantee that
  // this column family is no longer in use.
  void RemoveColumnFamily(const ColumnFamilyHandle* cf) override;

  Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,
                 const std::string& key, Env* env, bool exclusive) override;
  Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,
                 const Endpoint& start, const Endpoint& end, Env* env,
                 bool exclusive) override;

  void UnLock(PessimisticTransaction* txn, const LockTracker& tracker,
              Env* env) override;
  void UnLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,
              const std::string& key, Env* env) override;
  void UnLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,
              const Endpoint& start, const Endpoint& end, Env* env) override;

  PointLockStatus GetPointLockStatus() override;

  RangeLockStatus GetRangeLockStatus() override;

  std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;

  void Resize(uint32_t new_size) override;

 private:
  PessimisticTransactionDB* txn_db_impl_;

  // Default number of lock map stripes per column family
  const size_t default_num_stripes_;

  // Limit on number of keys locked per column family
  const int64_t max_num_locks_;

  // The following lock order must be satisfied in order to avoid deadlocking
  // ourselves.
  //   - lock_map_mutex_
  //   - stripe mutexes in ascending cf id, ascending stripe order
  //   - wait_txn_map_mutex_
  //
  // Must be held when accessing/modifying lock_maps_.
  InstrumentedMutex lock_map_mutex_;

  // Map of ColumnFamilyId to locked key info
  using LockMaps = UnorderedMap<uint32_t, std::shared_ptr<LockMap>>;
  LockMaps lock_maps_;

  // Thread-local cache of entries in lock_maps_. This is an optimization
  // to avoid acquiring a mutex in order to look up a LockMap
  std::unique_ptr<ThreadLocalPtr> lock_maps_cache_;

  // Must be held when modifying wait_txn_map_ and rev_wait_txn_map_.
  std::mutex wait_txn_map_mutex_;

  // Maps from waitee -> number of waiters.
  HashMap<TransactionID, int> rev_wait_txn_map_;
  // Maps from waiter -> waitee.
  HashMap<TransactionID, TrackedTrxInfo> wait_txn_map_;
  DeadlockInfoBuffer dlock_buffer_;

  // Used to allocate mutexes/condvars to use when locking keys
  std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;

  bool IsLockExpired(TransactionID txn_id, const LockInfo& lock_info, Env* env,
                     uint64_t* wait_time);

  std::shared_ptr<LockMap> GetLockMap(uint32_t column_family_id);

  Status AcquireWithTimeout(PessimisticTransaction* txn, LockMap* lock_map,
                            LockMapStripe* stripe, uint32_t column_family_id,
                            const std::string& key, Env* env, int64_t timeout,
                            const LockInfo& lock_info);

  Status AcquireLocked(LockMap* lock_map, LockMapStripe* stripe,
                       const std::string& key, Env* env,
                       const LockInfo& lock_info, uint64_t* wait_time,
                       autovector<TransactionID>* txn_ids);

  void UnLockKey(PessimisticTransaction* txn, const std::string& key,
                 LockMapStripe* stripe, LockMap* lock_map, Env* env);

  bool IncrementWaiters(const PessimisticTransaction* txn,
                        const autovector<TransactionID>& wait_ids,
                        const std::string& key, const uint32_t& cf_id,
                        const bool& exclusive, Env* const env);
  void DecrementWaiters(const PessimisticTransaction* txn,
                        const autovector<TransactionID>& wait_ids);
  void DecrementWaitersImpl(const PessimisticTransaction* txn,
                            const autovector<TransactionID>& wait_ids);
};

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE
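
// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the interface above): the ring-buffer
// behavior of DeadlockInfoBufferTempl can be exercised with any Path type
// that is default-constructible and provides empty(), e.g. std::vector<int>
// in a test. The capacity and values below are hypothetical.
//
//   DeadlockInfoBufferTempl<std::vector<int>> buf(/*n_latest_dlocks=*/3);
//   buf.AddNewPath({1});  // oldest path
//   buf.AddNewPath({2});
//   buf.AddNewPath({3});
//   buf.AddNewPath({4});  // buffer is full; wraps around and overwrites {1}
//
//   // PrepareBuffer() normalizes the ring and returns the newest paths
//   // first: {4}, {3}, {2}.
//   auto latest = buf.PrepareBuffer();
//
//   // Resize(2) keeps only the two most recent paths: {3}, {4}.
//   buf.Resize(2);
// ---------------------------------------------------------------------------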