diff options
Diffstat (limited to 'src/rocksdb/utilities/transactions/lock/point/point_lock_tracker.cc')
-rw-r--r-- | src/rocksdb/utilities/transactions/lock/point/point_lock_tracker.cc | 257 |
1 files changed, 257 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/transactions/lock/point/point_lock_tracker.cc b/src/rocksdb/utilities/transactions/lock/point/point_lock_tracker.cc new file mode 100644 index 000000000..6204a8f02 --- /dev/null +++ b/src/rocksdb/utilities/transactions/lock/point/point_lock_tracker.cc @@ -0,0 +1,257 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/transactions/lock/point/point_lock_tracker.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { + +class TrackedKeysColumnFamilyIterator + : public LockTracker::ColumnFamilyIterator { + public: + explicit TrackedKeysColumnFamilyIterator(const TrackedKeys& keys) + : tracked_keys_(keys), it_(keys.begin()) {} + + bool HasNext() const override { return it_ != tracked_keys_.end(); } + + ColumnFamilyId Next() override { return (it_++)->first; } + + private: + const TrackedKeys& tracked_keys_; + TrackedKeys::const_iterator it_; +}; + +class TrackedKeysIterator : public LockTracker::KeyIterator { + public: + TrackedKeysIterator(const TrackedKeys& keys, ColumnFamilyId id) + : key_infos_(keys.at(id)), it_(key_infos_.begin()) {} + + bool HasNext() const override { return it_ != key_infos_.end(); } + + const std::string& Next() override { return (it_++)->first; } + + private: + const TrackedKeyInfos& key_infos_; + TrackedKeyInfos::const_iterator it_; +}; + +} // namespace + +void PointLockTracker::Track(const PointLockRequest& r) { + auto& keys = tracked_keys_[r.column_family_id]; + auto result = keys.try_emplace(r.key, r.seq); + auto it = result.first; + if (!result.second && r.seq < it->second.seq) { + // Now tracking this key with an earlier sequence number + it->second.seq = r.seq; + } + // else we do not update the seq. The smaller the tracked seq, the stronger it + // the guarantee since it implies from the seq onward there has not been a + // concurrent update to the key. So we update the seq if it implies stronger + // guarantees, i.e., if it is smaller than the existing tracked seq. + + if (r.read_only) { + it->second.num_reads++; + } else { + it->second.num_writes++; + } + + it->second.exclusive = it->second.exclusive || r.exclusive; +} + +UntrackStatus PointLockTracker::Untrack(const PointLockRequest& r) { + auto cf_keys = tracked_keys_.find(r.column_family_id); + if (cf_keys == tracked_keys_.end()) { + return UntrackStatus::NOT_TRACKED; + } + + auto& keys = cf_keys->second; + auto it = keys.find(r.key); + if (it == keys.end()) { + return UntrackStatus::NOT_TRACKED; + } + + bool untracked = false; + auto& info = it->second; + if (r.read_only) { + if (info.num_reads > 0) { + info.num_reads--; + untracked = true; + } + } else { + if (info.num_writes > 0) { + info.num_writes--; + untracked = true; + } + } + + bool removed = false; + if (info.num_reads == 0 && info.num_writes == 0) { + keys.erase(it); + if (keys.empty()) { + tracked_keys_.erase(cf_keys); + } + removed = true; + } + + if (removed) { + return UntrackStatus::REMOVED; + } + if (untracked) { + return UntrackStatus::UNTRACKED; + } + return UntrackStatus::NOT_TRACKED; +} + +void PointLockTracker::Merge(const LockTracker& tracker) { + const PointLockTracker& t = static_cast<const PointLockTracker&>(tracker); + for (const auto& cf_keys : t.tracked_keys_) { + ColumnFamilyId cf = cf_keys.first; + const auto& keys = cf_keys.second; + + auto current_cf_keys = tracked_keys_.find(cf); + if (current_cf_keys == tracked_keys_.end()) { + tracked_keys_.emplace(cf_keys); + } else { + auto& current_keys = current_cf_keys->second; + for (const auto& key_info : keys) { + const std::string& key = key_info.first; + const TrackedKeyInfo& info = key_info.second; + // If key was not previously tracked, just copy the whole struct over. + // Otherwise, some merging needs to occur. + auto current_info = current_keys.find(key); + if (current_info == current_keys.end()) { + current_keys.emplace(key_info); + } else { + current_info->second.Merge(info); + } + } + } + } +} + +void PointLockTracker::Subtract(const LockTracker& tracker) { + const PointLockTracker& t = static_cast<const PointLockTracker&>(tracker); + for (const auto& cf_keys : t.tracked_keys_) { + ColumnFamilyId cf = cf_keys.first; + const auto& keys = cf_keys.second; + + auto& current_keys = tracked_keys_.at(cf); + for (const auto& key_info : keys) { + const std::string& key = key_info.first; + const TrackedKeyInfo& info = key_info.second; + uint32_t num_reads = info.num_reads; + uint32_t num_writes = info.num_writes; + + auto current_key_info = current_keys.find(key); + assert(current_key_info != current_keys.end()); + + // Decrement the total reads/writes of this key by the number of + // reads/writes done since the last SavePoint. + if (num_reads > 0) { + assert(current_key_info->second.num_reads >= num_reads); + current_key_info->second.num_reads -= num_reads; + } + if (num_writes > 0) { + assert(current_key_info->second.num_writes >= num_writes); + current_key_info->second.num_writes -= num_writes; + } + if (current_key_info->second.num_reads == 0 && + current_key_info->second.num_writes == 0) { + current_keys.erase(current_key_info); + } + } + } +} + +LockTracker* PointLockTracker::GetTrackedLocksSinceSavePoint( + const LockTracker& save_point_tracker) const { + // Examine the number of reads/writes performed on all keys written + // since the last SavePoint and compare to the total number of reads/writes + // for each key. + LockTracker* t = new PointLockTracker(); + const PointLockTracker& save_point_t = + static_cast<const PointLockTracker&>(save_point_tracker); + for (const auto& cf_keys : save_point_t.tracked_keys_) { + ColumnFamilyId cf = cf_keys.first; + const auto& keys = cf_keys.second; + + auto& current_keys = tracked_keys_.at(cf); + for (const auto& key_info : keys) { + const std::string& key = key_info.first; + const TrackedKeyInfo& info = key_info.second; + uint32_t num_reads = info.num_reads; + uint32_t num_writes = info.num_writes; + + auto current_key_info = current_keys.find(key); + assert(current_key_info != current_keys.end()); + assert(current_key_info->second.num_reads >= num_reads); + assert(current_key_info->second.num_writes >= num_writes); + + if (current_key_info->second.num_reads == num_reads && + current_key_info->second.num_writes == num_writes) { + // All the reads/writes to this key were done in the last savepoint. + PointLockRequest r; + r.column_family_id = cf; + r.key = key; + r.seq = info.seq; + r.read_only = (num_writes == 0); + r.exclusive = info.exclusive; + t->Track(r); + } + } + } + return t; +} + +PointLockStatus PointLockTracker::GetPointLockStatus( + ColumnFamilyId column_family_id, const std::string& key) const { + assert(IsPointLockSupported()); + PointLockStatus status; + auto it = tracked_keys_.find(column_family_id); + if (it == tracked_keys_.end()) { + return status; + } + + const auto& keys = it->second; + auto key_it = keys.find(key); + if (key_it == keys.end()) { + return status; + } + + const TrackedKeyInfo& key_info = key_it->second; + status.locked = true; + status.exclusive = key_info.exclusive; + status.seq = key_info.seq; + return status; +} + +uint64_t PointLockTracker::GetNumPointLocks() const { + uint64_t num_keys = 0; + for (const auto& cf_keys : tracked_keys_) { + num_keys += cf_keys.second.size(); + } + return num_keys; +} + +LockTracker::ColumnFamilyIterator* PointLockTracker::GetColumnFamilyIterator() + const { + return new TrackedKeysColumnFamilyIterator(tracked_keys_); +} + +LockTracker::KeyIterator* PointLockTracker::GetKeyIterator( + ColumnFamilyId column_family_id) const { + assert(tracked_keys_.find(column_family_id) != tracked_keys_.end()); + return new TrackedKeysIterator(tracked_keys_, column_family_id); +} + +void PointLockTracker::Clear() { tracked_keys_.clear(); } + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE |