From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 21 Apr 2024 13:54:28 +0200
Subject: Adding upstream version 18.2.2.

Signed-off-by: Daniel Baumann
---
 .../range/range_tree/range_tree_lock_manager.cc    | 503 +++++++++++++++++++++
 1 file changed, 503 insertions(+)
 create mode 100644 src/rocksdb/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc

(limited to 'src/rocksdb/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc')

diff --git a/src/rocksdb/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc b/src/rocksdb/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc
new file mode 100644
index 000000000..531165dea
--- /dev/null
+++ b/src/rocksdb/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc
@@ -0,0 +1,503 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+#ifndef OS_WIN
+
+#include "utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h"
+
+#include <algorithm>
+#include <cstring>
+#include <vector>
+
+#include "monitoring/perf_context_imp.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/utilities/transaction_db_mutex.h"
+#include "test_util/sync_point.h"
+#include "util/cast_util.h"
+#include "util/hash.h"
+#include "util/thread_local.h"
+#include "utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h"
+#include "utilities/transactions/pessimistic_transaction_db.h"
+#include "utilities/transactions/transaction_db_mutex_impl.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+RangeLockManagerHandle* NewRangeLockManager(
+    std::shared_ptr<TransactionDBMutexFactory> mutex_factory) {
+  std::shared_ptr<TransactionDBMutexFactory> use_factory;
+
+  if (mutex_factory) {
+    use_factory = mutex_factory;
+  } else {
+    use_factory.reset(new TransactionDBMutexFactoryImpl());
+  }
+  return new RangeTreeLockManager(use_factory);
+}
+
+static const char SUFFIX_INFIMUM = 0x0;
+static const char SUFFIX_SUPREMUM = 0x1;
+
+// Convert an Endpoint into the internal format used for storing it in the
+// locktree (the DBT structure is used for passing endpoints to the locktree
+// and for getting them back).
+void serialize_endpoint(const Endpoint& endp, std::string* buf) {
+  buf->push_back(endp.inf_suffix ? SUFFIX_SUPREMUM : SUFFIX_INFIMUM);
+  buf->append(endp.slice.data(), endp.slice.size());
+}
+
+// Decode an endpoint from the format it is stored in inside the locktree (DBT)
+// to the one used outside: either Endpoint or EndpointWithString.
+template <typename EndpointStruct>
+void deserialize_endpoint(const DBT* dbt, EndpointStruct* endp) {
+  assert(dbt->size >= 1);
+  const char* dbt_data = (const char*)dbt->data;
+  char suffix = dbt_data[0];
+  assert(suffix == SUFFIX_INFIMUM || suffix == SUFFIX_SUPREMUM);
+  endp->inf_suffix = (suffix == SUFFIX_SUPREMUM);
+  endp->slice = decltype(EndpointStruct::slice)(dbt_data + 1, dbt->size - 1);
+}
+
+// Get a range lock on the [start_key; end_key] range.
+Status RangeTreeLockManager::TryLock(PessimisticTransaction* txn,
+                                     uint32_t column_family_id,
+                                     const Endpoint& start_endp,
+                                     const Endpoint& end_endp, Env*,
+                                     bool exclusive) {
+  toku::lock_request request;
+  request.create(mutex_factory_);
+  DBT start_key_dbt, end_key_dbt;
+
+  TEST_SYNC_POINT("RangeTreeLockManager::TryRangeLock:enter");
+  std::string start_key;
+  std::string end_key;
+  serialize_endpoint(start_endp, &start_key);
+  serialize_endpoint(end_endp, &end_key);
+
+  toku_fill_dbt(&start_key_dbt, start_key.data(), start_key.size());
+  toku_fill_dbt(&end_key_dbt, end_key.data(), end_key.size());
+
+  auto lt = GetLockTreeForCF(column_family_id);
+
+  // Put the key that is being waited on into the request's m_extra. See
+  // wait_callback_for_locktree for details.
+  std::string wait_key(start_endp.slice.data(), start_endp.slice.size());
+
+  request.set(lt.get(), (TXNID)txn, &start_key_dbt, &end_key_dbt,
+              exclusive ? toku::lock_request::WRITE : toku::lock_request::READ,
+              false /* not a big txn */, &wait_key);
+
+  // This is for the "periodically wake up and check if the wait is killed"
+  // feature, which we are not using.
+  uint64_t killed_time_msec = 0;
+  uint64_t wait_time_msec = txn->GetLockTimeout();
+
+  if (wait_time_msec == static_cast<uint64_t>(-1)) {
+    // The transaction has no wait timeout. lock_request::wait doesn't support
+    // this; it needs a number of milliseconds to wait. Pass it one year to
+    // be safe.
+    wait_time_msec = uint64_t(1000) * 60 * 60 * 24 * 365;
+  } else {
+    // convert microseconds to milliseconds
+    wait_time_msec = (wait_time_msec + 500) / 1000;
+  }
+
+  std::vector<RangeDeadlockInfo> di_path;
+  request.m_deadlock_cb = [&](TXNID txnid, bool is_exclusive,
+                              const DBT* start_dbt, const DBT* end_dbt) {
+    EndpointWithString start;
+    EndpointWithString end;
+    deserialize_endpoint(start_dbt, &start);
+    deserialize_endpoint(end_dbt, &end);
+
+    di_path.push_back({txnid, column_family_id, is_exclusive, std::move(start),
+                       std::move(end)});
+  };
+
+  request.start();
+
+  const int r = request.wait(wait_time_msec, killed_time_msec,
+                             nullptr,  // killed_callback
+                             wait_callback_for_locktree, nullptr);
+
+  // Inform the txn that we are no longer waiting:
+  txn->ClearWaitingTxn();
+
+  request.destroy();
+  switch (r) {
+    case 0:
+      break;  // fall through
+    case DB_LOCK_NOTGRANTED:
+      return Status::TimedOut(Status::SubCode::kLockTimeout);
+    case TOKUDB_OUT_OF_LOCKS:
+      return Status::Busy(Status::SubCode::kLockLimit);
+    case DB_LOCK_DEADLOCK: {
+      std::reverse(di_path.begin(), di_path.end());
+      dlock_buffer_.AddNewPath(
+          RangeDeadlockPath(di_path, request.get_start_time()));
+      return Status::Busy(Status::SubCode::kDeadlock);
+    }
+    default:
+      assert(0);
+      return Status::Busy(Status::SubCode::kLockLimit);
+  }
+
+  return Status::OK();
+}
+
+// Wait callback that the locktree library will call to inform us about
+// the lock waits that are in progress.
+void wait_callback_for_locktree(void*, toku::lock_wait_infos* infos) {
+  TEST_SYNC_POINT("RangeTreeLockManager::TryRangeLock:EnterWaitingTxn");
+  for (auto wait_info : *infos) {
+    // As long as we hold the lock on the locktree's pending request queue
+    // this should be safe.
+    auto txn = (PessimisticTransaction*)wait_info.waiter;
+    auto cf_id = (ColumnFamilyId)wait_info.ltree->get_dict_id().dictid;
+
+    autovector<TransactionID> waitee_ids;
+    for (auto waitee : wait_info.waitees) {
+      waitee_ids.push_back(waitee);
+    }
+    txn->SetWaitingTxn(waitee_ids, cf_id, (std::string*)wait_info.m_extra);
+  }
+
+  // Here we can assume that the locktree code will now wait for some lock.
+  TEST_SYNC_POINT("RangeTreeLockManager::TryRangeLock:WaitingTxn");
+}
+
+void RangeTreeLockManager::UnLock(PessimisticTransaction* txn,
+                                  ColumnFamilyId column_family_id,
+                                  const std::string& key, Env*) {
+  auto locktree = GetLockTreeForCF(column_family_id);
+  std::string endp_image;
+  serialize_endpoint({key.data(), key.size(), false}, &endp_image);
+
+  DBT key_dbt;
+  toku_fill_dbt(&key_dbt, endp_image.data(), endp_image.size());
+
+  toku::range_buffer range_buf;
+  range_buf.create();
+  range_buf.append(&key_dbt, &key_dbt);
+
+  locktree->release_locks((TXNID)txn, &range_buf);
+  range_buf.destroy();
+
+  toku::lock_request::retry_all_lock_requests(
+      locktree.get(), wait_callback_for_locktree, nullptr);
+}
+
+void RangeTreeLockManager::UnLock(PessimisticTransaction* txn,
+                                  const LockTracker& tracker, Env*) {
+  const RangeTreeLockTracker* range_tracker =
+      static_cast<const RangeTreeLockTracker*>(&tracker);
+
+  RangeTreeLockTracker* range_trx_tracker =
+      static_cast<RangeTreeLockTracker*>(&txn->GetTrackedLocks());
+  bool all_keys = (range_trx_tracker == range_tracker);
+
+  // tracked_locks_->range_list may hold nullptr if the transaction has never
+  // acquired any locks.
+  ((RangeTreeLockTracker*)range_tracker)->ReleaseLocks(this, txn, all_keys);
+}
+
+int RangeTreeLockManager::CompareDbtEndpoints(void* arg, const DBT* a_key,
+                                              const DBT* b_key) {
+  const char* a = (const char*)a_key->data;
+  const char* b = (const char*)b_key->data;
+
+  size_t a_len = a_key->size;
+  size_t b_len = b_key->size;
+
+  size_t min_len = std::min(a_len, b_len);
+
+  // Compare the values. The first byte encodes the endpoint type; its value
+  // is either SUFFIX_INFIMUM or SUFFIX_SUPREMUM.
+  Comparator* cmp = (Comparator*)arg;
+  int res = cmp->Compare(Slice(a + 1, min_len - 1), Slice(b + 1, min_len - 1));
+  if (!res) {
+    if (b_len > min_len) {
+      // a is shorter;
+      if (a[0] == SUFFIX_INFIMUM) {
+        return -1;  // "a" is smaller
+      } else {
+        // a is considered padded with 0xFF:FF:FF:FF...
+        return 1;  // "a" is bigger
+      }
+    } else if (a_len > min_len) {
+      // The opposite of the above: b is shorter.
+      if (b[0] == SUFFIX_INFIMUM) {
+        return 1;  // "b" is smaller
+      } else {
+        // b is considered padded with 0xFF:FF:FF:FF...
+        return -1;  // "b" is bigger
+      }
+    } else {
+      // The lengths are equal (and the key values, too).
+      if (a[0] < b[0]) {
+        return -1;
+      } else if (a[0] > b[0]) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+  } else {
+    return res;
+  }
+}
+
+namespace {
+void UnrefLockTreeMapsCache(void* ptr) {
+  // Called when a thread exits or a ThreadLocalPtr gets destroyed.
+  auto lock_tree_map_cache = static_cast<
+      std::unordered_map<ColumnFamilyId, std::shared_ptr<toku::locktree>>*>(
+      ptr);
+  delete lock_tree_map_cache;
+}
+}  // anonymous namespace
+
+RangeTreeLockManager::RangeTreeLockManager(
+    std::shared_ptr<TransactionDBMutexFactory> mutex_factory)
+    : mutex_factory_(mutex_factory),
+      ltree_lookup_cache_(new ThreadLocalPtr(&UnrefLockTreeMapsCache)),
+      dlock_buffer_(10) {
+  ltm_.create(on_create, on_destroy, on_escalate, nullptr, mutex_factory_);
+}
+
+int RangeTreeLockManager::on_create(toku::locktree* lt, void* arg) {
+  // arg is a pointer to RangeTreeLockManager
+  lt->set_escalation_barrier_func(&OnEscalationBarrierCheck, arg);
+  return 0;
+}
+
+bool RangeTreeLockManager::OnEscalationBarrierCheck(const DBT* a, const DBT* b,
+                                                    void* extra) {
+  Endpoint a_endp, b_endp;
+  deserialize_endpoint(a, &a_endp);
+  deserialize_endpoint(b, &b_endp);
+  auto self = static_cast<RangeTreeLockManager*>(extra);
+  return self->barrier_func_(a_endp, b_endp);
+}
+
+void RangeTreeLockManager::SetRangeDeadlockInfoBufferSize(
+    uint32_t target_size) {
+  dlock_buffer_.Resize(target_size);
+}
+
+void RangeTreeLockManager::Resize(uint32_t target_size) {
+  SetRangeDeadlockInfoBufferSize(target_size);
+}
+
+std::vector<RangeDeadlockPath>
+RangeTreeLockManager::GetRangeDeadlockInfoBuffer() {
+  return dlock_buffer_.PrepareBuffer();
+}
+
+std::vector<DeadlockPath> RangeTreeLockManager::GetDeadlockInfoBuffer() {
+  std::vector<DeadlockPath> res;
+  std::vector<RangeDeadlockPath> data = GetRangeDeadlockInfoBuffer();
+  // Report left endpoints.
+  for (auto it = data.begin(); it != data.end(); ++it) {
+    std::vector<DeadlockInfo> path;
+
+    for (auto it2 = it->path.begin(); it2 != it->path.end(); ++it2) {
+      path.push_back(
+          {it2->m_txn_id, it2->m_cf_id, it2->m_exclusive, it2->m_start.slice});
+    }
+    res.push_back(DeadlockPath(path, it->deadlock_time));
+  }
+  return res;
+}
+
+// @brief  Lock Escalation Callback function
+//
+// @param txnid   Transaction whose locks got escalated
+// @param lt      Lock Tree where escalation is happening
+// @param buffer  Escalation result: list of locks that this transaction now
+//                owns in this lock tree.
+// @param void*   Callback context
+void RangeTreeLockManager::on_escalate(TXNID txnid, const toku::locktree* lt,
+                                       const toku::range_buffer& buffer,
+                                       void*) {
+  auto txn = (PessimisticTransaction*)txnid;
+  ((RangeTreeLockTracker*)&txn->GetTrackedLocks())->ReplaceLocks(lt, buffer);
+}
+
+RangeTreeLockManager::~RangeTreeLockManager() {
+  autovector<void*> local_caches;
+  ltree_lookup_cache_->Scrape(&local_caches, nullptr);
+  for (auto cache : local_caches) {
+    delete static_cast<LockTreeMap*>(cache);
+  }
+  ltree_map_.clear();  // this will call release_lt() for all locktrees
+  ltm_.destroy();
+}
+
+RangeLockManagerHandle::Counters RangeTreeLockManager::GetStatus() {
+  LTM_STATUS_S ltm_status_test;
+  ltm_.get_status(&ltm_status_test);
+  Counters res;
+
+  // Searching for a status variable by its string name is how Toku's unit
+  // tests do it (why didn't they make LTM_ESCALATION_COUNT constant visible?).
+  // Look up the keyname in the status.
+  for (int i = 0; i < LTM_STATUS_S::LTM_STATUS_NUM_ROWS; i++) {
+    TOKU_ENGINE_STATUS_ROW status = &ltm_status_test.status[i];
+    if (strcmp(status->keyname, "LTM_ESCALATION_COUNT") == 0) {
+      res.escalation_count = status->value.num;
+      continue;
+    }
+    if (strcmp(status->keyname, "LTM_WAIT_COUNT") == 0) {
+      res.lock_wait_count = status->value.num;
+      continue;
+    }
+    if (strcmp(status->keyname, "LTM_SIZE_CURRENT") == 0) {
+      res.current_lock_memory = status->value.num;
+    }
+  }
+  return res;
+}
+
+std::shared_ptr<toku::locktree> RangeTreeLockManager::MakeLockTreePtr(
+    toku::locktree* lt) {
+  toku::locktree_manager* ltm = &ltm_;
+  return std::shared_ptr<toku::locktree>(
+      lt, [ltm](toku::locktree* p) { ltm->release_lt(p); });
+}
+
+void RangeTreeLockManager::AddColumnFamily(const ColumnFamilyHandle* cfh) {
+  uint32_t column_family_id = cfh->GetID();
+
+  InstrumentedMutexLock l(&ltree_map_mutex_);
+  if (ltree_map_.find(column_family_id) == ltree_map_.end()) {
+    DICTIONARY_ID dict_id = {.dictid = column_family_id};
+    toku::comparator cmp;
+    cmp.create(CompareDbtEndpoints, (void*)cfh->GetComparator());
+    toku::locktree* ltree =
+        ltm_.get_lt(dict_id, cmp,
+                    /* on_create_extra*/ static_cast<void*>(this));
+    // This is ok to do because get_lt has copied the comparator:
+    cmp.destroy();
+
+    ltree_map_.insert({column_family_id, MakeLockTreePtr(ltree)});
+  }
+}
+
+void RangeTreeLockManager::RemoveColumnFamily(const ColumnFamilyHandle* cfh) {
+  uint32_t column_family_id = cfh->GetID();
+  // Remove the lock map for this column family. Since the lock map is stored
+  // as a shared ptr, concurrent transactions can still keep using it
+  // until they release their references to it.
+
+  // TODO: what if one drops a column family while transaction(s) still have
+  // locks in it?
+  // The locktree uses the column family's Comparator* as the criteria for
+  // tree ordering. If the comparator is gone, we won't even be able to remove
+  // the elements from the locktree.
+  // A possible solution might be to remove everything right now:
+  //  - wait until everyone traversing the locktree is gone
+  //  - remove everything from the locktree
+  //  - some transactions may have acquired locks in their LockTracker objects.
+  //    Arrange something so we don't blow up when they try to release them.
+  //  - ...
+  // This use case (dropping a column family while somebody is using it)
+  // doesn't seem to be a priority, though.
+
+  {
+    InstrumentedMutexLock l(&ltree_map_mutex_);
+
+    auto lock_maps_iter = ltree_map_.find(column_family_id);
+    assert(lock_maps_iter != ltree_map_.end());
+    ltree_map_.erase(lock_maps_iter);
+  }  // lock_map_mutex_
+
+  autovector<void*> local_caches;
+  ltree_lookup_cache_->Scrape(&local_caches, nullptr);
+  for (auto cache : local_caches) {
+    delete static_cast<LockTreeMap*>(cache);
+  }
+}
+
+std::shared_ptr<toku::locktree> RangeTreeLockManager::GetLockTreeForCF(
+    ColumnFamilyId column_family_id) {
+  // First check the thread-local cache.
+  if (ltree_lookup_cache_->Get() == nullptr) {
+    ltree_lookup_cache_->Reset(new LockTreeMap());
+  }
+
+  auto ltree_map_cache = static_cast<LockTreeMap*>(ltree_lookup_cache_->Get());
+
+  auto it = ltree_map_cache->find(column_family_id);
+  if (it != ltree_map_cache->end()) {
+    // Found the lock map for this column family.
+    return it->second;
+  }
+
+  // Not found in the local cache; grab the mutex and check the shared LockMaps.
+  InstrumentedMutexLock l(&ltree_map_mutex_);
+
+  it = ltree_map_.find(column_family_id);
+  if (it == ltree_map_.end()) {
+    return nullptr;
+  } else {
+    // Found the lock map. Store it in the thread-local cache and return it.
+    ltree_map_cache->insert({column_family_id, it->second});
+    return it->second;
+  }
+}
+
+struct LOCK_PRINT_CONTEXT {
+  RangeLockManagerHandle::RangeLockStatus* data;  // Save locks here
+  uint32_t cfh_id;  // Column Family whose tree we are traversing
+};
+
+// Report left endpoints of the acquired locks
+LockManager::PointLockStatus RangeTreeLockManager::GetPointLockStatus() {
+  PointLockStatus res;
+  LockManager::RangeLockStatus data = GetRangeLockStatus();
+  // report left endpoints
+  for (auto it = data.begin(); it != data.end(); ++it) {
+    auto& val = it->second;
+    res.insert({it->first, {val.start.slice, val.ids, val.exclusive}});
+  }
+  return res;
+}
+
+static void push_into_lock_status_data(void* param, const DBT* left,
+                                       const DBT* right, TXNID txnid_arg,
+                                       bool is_shared, TxnidVector* owners) {
+  struct LOCK_PRINT_CONTEXT* ctx = (LOCK_PRINT_CONTEXT*)param;
+  struct RangeLockInfo info;
+
+  info.exclusive = !is_shared;
+
+  deserialize_endpoint(left, &info.start);
+  deserialize_endpoint(right, &info.end);
+
+  if (txnid_arg != TXNID_SHARED) {
+    info.ids.push_back(txnid_arg);
+  } else {
+    for (auto it : *owners) {
+      info.ids.push_back(it);
+    }
+  }
+  ctx->data->insert({ctx->cfh_id, info});
+}
+
+LockManager::RangeLockStatus RangeTreeLockManager::GetRangeLockStatus() {
+  LockManager::RangeLockStatus data;
+  {
+    InstrumentedMutexLock l(&ltree_map_mutex_);
+    for (auto it : ltree_map_) {
+      LOCK_PRINT_CONTEXT ctx = {&data, it.first};
+      it.second->dump_locks((void*)&ctx, push_into_lock_status_data);
+    }
+  }
+  return data;
+}
+
+}  // namespace ROCKSDB_NAMESPACE
+#endif  // OS_WIN
+#endif  // ROCKSDB_LITE
-- 
cgit v1.2.3
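
To make the ordering established by serialize_endpoint() and CompareDbtEndpoints() concrete: each stored key is a one-byte marker (SUFFIX_INFIMUM or SUFFIX_SUPREMUM) followed by the user key, so for equal user keys the infimum endpoint sorts first, and an endpoint whose key is a strict prefix of another key sorts before that key when it carries the infimum marker and after it when it carries the supremum marker. The standalone sketch below is not part of the patch; Encode and CompareEncoded are illustrative names that mirror the same rule with a plain bytewise user-key comparator.

// endpoint_order_sketch.cc -- illustrative only, not part of the patch above.
#include <algorithm>
#include <cassert>
#include <iostream>
#include <string>

static const char kSuffixInfimum = 0x0;
static const char kSuffixSupremum = 0x1;

// Same idea as serialize_endpoint(): marker byte first, then the key bytes.
static std::string Encode(const std::string& key, bool inf_suffix) {
  std::string buf;
  buf.push_back(inf_suffix ? kSuffixSupremum : kSuffixInfimum);
  buf.append(key);
  return buf;
}

// Same ordering rule as CompareDbtEndpoints(), specialized to a bytewise
// user-key comparator: compare the key parts first; when one encoded endpoint
// is a prefix of the other, the shorter one sorts before iff it is an infimum
// endpoint; for equal keys, infimum < supremum.
static int CompareEncoded(const std::string& a, const std::string& b) {
  size_t min_len = std::min(a.size(), b.size());
  int res = a.compare(1, min_len - 1, b, 1, min_len - 1);
  if (res != 0) return res < 0 ? -1 : 1;
  if (b.size() > min_len) return a[0] == kSuffixInfimum ? -1 : 1;
  if (a.size() > min_len) return b[0] == kSuffixInfimum ? 1 : -1;
  return (a[0] < b[0]) ? -1 : (a[0] > b[0] ? 1 : 0);
}

int main() {
  // [infimum("bar"), supremum("bar")] brackets every key that starts with
  // "bar", e.g. "bar" itself and "barbaz".
  assert(CompareEncoded(Encode("bar", false), Encode("bar", true)) < 0);
  assert(CompareEncoded(Encode("bar", false), Encode("barbaz", false)) < 0);
  assert(CompareEncoded(Encode("barbaz", false), Encode("bar", true)) < 0);
  assert(CompareEncoded(Encode("bar", true), Encode("baz", false)) < 0);
  std::cout << "endpoint ordering checks passed\n";
  return 0;
}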
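
For orientation, here is a minimal sketch of how an application might plug this lock manager into a TransactionDB. Only RangeLockManagerHandle and NewRangeLockManager appear in the file above; everything else (TransactionDBOptions::lock_mgr_handle, Transaction::GetRangeLock, the Endpoint constructors, the header paths, and the demo DB path) is assumed from the public RocksDB transaction API and may differ in detail.

// range_lock_usage_sketch.cc -- illustrative only; assumes the public
// range-locking API that this lock manager backs.
#include <memory>

#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"

using namespace ROCKSDB_NAMESPACE;

int main() {
  // Passing nullptr makes NewRangeLockManager fall back to
  // TransactionDBMutexFactoryImpl, as shown at the top of the file above.
  std::shared_ptr<RangeLockManagerHandle> range_lock_mgr(
      NewRangeLockManager(nullptr));

  Options options;
  options.create_if_missing = true;
  TransactionDBOptions txn_db_options;
  txn_db_options.lock_mgr_handle = range_lock_mgr;  // assumed option name

  TransactionDB* db = nullptr;
  Status s = TransactionDB::Open(options, txn_db_options,
                                 "/tmp/range_lock_demo", &db);
  if (!s.ok()) {
    return 1;
  }

  Transaction* txn = db->BeginTransaction(WriteOptions());
  // Lock the whole ["a"; "z"] range in the default column family; internally
  // this ends up in RangeTreeLockManager::TryLock with serialized endpoints.
  s = txn->GetRangeLock(db->DefaultColumnFamily(), Endpoint("a"),
                        Endpoint("z"));
  if (s.ok()) {
    s = txn->Put("k", "v");
  }
  if (s.ok()) {
    s = txn->Commit();
  }

  delete txn;
  delete db;
  return s.ok() ? 0 : 1;
}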