From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 21 Apr 2024 13:54:28 +0200
Subject: Adding upstream version 18.2.2.

Signed-off-by: Daniel Baumann
---
 .../utilities/transactions/write_prepared_txn_db.h | 1125 ++++++++++++++++++++
 1 file changed, 1125 insertions(+)
 create mode 100644 src/rocksdb/utilities/transactions/write_prepared_txn_db.h

diff --git a/src/rocksdb/utilities/transactions/write_prepared_txn_db.h b/src/rocksdb/utilities/transactions/write_prepared_txn_db.h
new file mode 100644
index 000000000..25a382473
--- /dev/null
+++ b/src/rocksdb/utilities/transactions/write_prepared_txn_db.h
@@ -0,0 +1,1125 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <cassert>
+#include <deque>
+#include <map>
+#include <memory>
+#include <queue>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "db/db_iter.h"
+#include "db/pre_release_callback.h"
+#include "db/read_callback.h"
+#include "db/snapshot_checker.h"
+#include "logging/logging.h"
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/utilities/transaction_db.h"
+#include "util/cast_util.h"
+#include "util/set_comparator.h"
+#include "util/string_util.h"
+#include "utilities/transactions/pessimistic_transaction.h"
+#include "utilities/transactions/pessimistic_transaction_db.h"
+#include "utilities/transactions/write_prepared_txn.h"
+
+namespace ROCKSDB_NAMESPACE {
+enum SnapshotBackup : bool { kUnbackedByDBSnapshot, kBackedByDBSnapshot };
+
+// A PessimisticTransactionDB that writes data to the DB after the prepare
+// phase of 2PC. Consequently some data in the DB might not be committed. The
+// DB provides mechanisms to tell such data apart from committed data.
+class WritePreparedTxnDB : public PessimisticTransactionDB {
+ public:
+  explicit WritePreparedTxnDB(DB* db,
+                              const TransactionDBOptions& txn_db_options)
+      : PessimisticTransactionDB(db, txn_db_options),
+        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
+        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
+        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
+        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
+        FORMAT(COMMIT_CACHE_BITS) {
+    Init(txn_db_options);
+  }
+
+  explicit WritePreparedTxnDB(StackableDB* db,
+                              const TransactionDBOptions& txn_db_options)
+      : PessimisticTransactionDB(db, txn_db_options),
+        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
+        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
+        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
+        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
+        FORMAT(COMMIT_CACHE_BITS) {
+    Init(txn_db_options);
+  }
+
+  virtual ~WritePreparedTxnDB();
+
+  virtual Status Initialize(
+      const std::vector<size_t>& compaction_enabled_cf_indices,
+      const std::vector<ColumnFamilyHandle*>& handles) override;
+
+  Transaction* BeginTransaction(const WriteOptions& write_options,
+                                const TransactionOptions& txn_options,
+                                Transaction* old_txn) override;
+
+  using TransactionDB::Write;
+  Status Write(const WriteOptions& opts, WriteBatch* updates) override;
+
+  // Optimized version of ::Write that accepts extra optimization requests,
+  // such as skip_concurrency_control.
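+  //
+  // Illustrative sketch (not part of the original header): a caller that has
+  // already performed its own concurrency control might request the
+  // optimization like this:
+  //   TransactionDBWriteOptimizations optimizations;
+  //   optimizations.skip_concurrency_control = true;
+  //   Status s = txn_db->Write(write_options, optimizations, &batch);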
+  using PessimisticTransactionDB::Write;
+  Status Write(const WriteOptions& opts, const TransactionDBWriteOptimizations&,
+               WriteBatch* updates) override;
+
+  // Write the batch to the underlying DB and mark it as committed. Could be
+  // used both directly from the TxnDB or through a transaction.
+  Status WriteInternal(const WriteOptions& write_options, WriteBatch* batch,
+                       size_t batch_cnt, WritePreparedTxn* txn);
+
+  using DB::Get;
+  virtual Status Get(const ReadOptions& options,
+                     ColumnFamilyHandle* column_family, const Slice& key,
+                     PinnableSlice* value) override;
+
+  using DB::MultiGet;
+  virtual std::vector<Status> MultiGet(
+      const ReadOptions& options,
+      const std::vector<ColumnFamilyHandle*>& column_family,
+      const std::vector<Slice>& keys,
+      std::vector<std::string>* values) override;
+
+  using DB::NewIterator;
+  virtual Iterator* NewIterator(const ReadOptions& options,
+                                ColumnFamilyHandle* column_family) override;
+
+  using DB::NewIterators;
+  virtual Status NewIterators(
+      const ReadOptions& options,
+      const std::vector<ColumnFamilyHandle*>& column_families,
+      std::vector<Iterator*>* iterators) override;
+
+  // Check whether the transaction that wrote the value with sequence number
+  // seq is visible to the snapshot with sequence number snapshot_seq.
+  // Returns true if commit_seq <= snapshot_seq.
+  // If snapshot_seq is already released and snapshot_seq <= max, sets
+  // *snap_released to true and returns true as well.
+  inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
+                           uint64_t min_uncommitted = kMinUnCommittedSeq,
+                           bool* snap_released = nullptr) const {
+    ROCKS_LOG_DETAILS(info_log_,
+                      "IsInSnapshot %" PRIu64 " in %" PRIu64
+                      " min_uncommitted %" PRIu64,
+                      prep_seq, snapshot_seq, min_uncommitted);
+    assert(min_uncommitted >= kMinUnCommittedSeq);
+    // Caller is responsible for initializing snap_released.
+    assert(snap_released == nullptr || *snap_released == false);
+    // Here we try to infer the return value without looking into the prepare
+    // list. This helps avoid synchronization over a shared map.
+    // TODO(myabandeh): optimize this. This sequence of checks must be correct
+    // but not necessarily efficient.
+    if (prep_seq == 0) {
+      // Compaction will output keys to bottom-level with sequence number 0 if
+      // it is visible to the earliest snapshot.
+      ROCKS_LOG_DETAILS(
+          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
+          prep_seq, snapshot_seq, 1);
+      return true;
+    }
+    if (snapshot_seq < prep_seq) {
+      // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
+      ROCKS_LOG_DETAILS(
+          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
+          prep_seq, snapshot_seq, 0);
+      return false;
+    }
+    if (prep_seq < min_uncommitted) {
+      ROCKS_LOG_DETAILS(info_log_,
+                        "IsInSnapshot %" PRIu64 " in %" PRIu64
+                        " returns %" PRId32
+                        " because of min_uncommitted %" PRIu64,
+                        prep_seq, snapshot_seq, 1, min_uncommitted);
+      return true;
+    }
+    // Commit of a delayed prepared entry has two non-atomic steps: add it to
+    // the commit cache, then remove it from delayed_prepared_. Our reads of
+    // these two are also non-atomic. By looking into the commit cache first,
+    // we might thus find prep_seq in neither the commit cache nor
+    // delayed_prepared_. To fix that, i) we check whether there was any
+    // delayed prepared entry BEFORE looking into the commit cache, and ii) if
+    // there was, we complete the search steps to be: commit cache, delayed
+    // prepared, commit cache again. In this way, if the first query to the
+    // commit cache missed the commit, the second will catch it.
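+    //
+    // A minimal sketch of that search order (illustrative pseudo-code, not
+    // part of the original header; helper names are hypothetical):
+    //   bool was_empty = delayed_prepared_empty_;     // read BEFORE the cache
+    //   if (cache_hit(prep_seq)) return commit_seq <= snapshot_seq;
+    //   if (!was_empty && in_delayed_prepared(prep_seq)) { ... }
+    //   if (cache_hit(prep_seq)) return commit_seq <= snapshot_seq;  // 2nd try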
+    bool was_empty;
+    SequenceNumber max_evicted_seq_lb, max_evicted_seq_ub;
+    CommitEntry64b dont_care;
+    auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
+    size_t repeats = 0;
+    do {
+      repeats++;
+      assert(repeats < 100);
+      if (UNLIKELY(repeats >= 100)) {
+        throw std::runtime_error(
+            "The read was interrupted 100 times by updates to "
+            "max_evicted_seq_. This is unexpected in any setup");
+      }
+      max_evicted_seq_lb = max_evicted_seq_.load(std::memory_order_acquire);
+      TEST_SYNC_POINT(
+          "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause");
+      TEST_SYNC_POINT(
+          "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume");
+      was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
+      TEST_SYNC_POINT(
+          "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause");
+      TEST_SYNC_POINT(
+          "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume");
+      CommitEntry cached;
+      bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
+      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
+      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
+      if (exist && prep_seq == cached.prep_seq) {
+        // It is committed and also not evicted from the commit cache.
+        ROCKS_LOG_DETAILS(
+            info_log_,
+            "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
+            prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
+        return cached.commit_seq <= snapshot_seq;
+      }
+      // else it could be committed but not inserted into the map, which could
+      // happen after recovery, or it could be committed and evicted by another
+      // commit, or never committed.
+
+      // At this point we do not know if it was committed or is still prepared.
+      max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
+      if (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)) {
+        continue;
+      }
+      // Note: max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq_ub
+      if (max_evicted_seq_ub < prep_seq) {
+        // Not evicted from the cache and also not present, so it must still
+        // be prepared.
+        ROCKS_LOG_DETAILS(info_log_,
+                          "IsInSnapshot %" PRIu64 " in %" PRIu64
+                          " returns %" PRId32,
+                          prep_seq, snapshot_seq, 0);
+        return false;
+      }
+      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause");
+      TEST_SYNC_POINT(
+          "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume");
+      if (!was_empty) {
+        // We should not normally reach here.
+        WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
+        ReadLock rl(&prepared_mutex_);
+        ROCKS_LOG_WARN(
+            info_log_, "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
+            static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
+        if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
+          // This is the order: 1) delayed_prepared_commits_ update,
+          // 2) publish, 3) delayed_prepared_ cleanup. So check if this is the
+          // case of a late cleanup.
+          auto it = delayed_prepared_commits_.find(prep_seq);
+          if (it == delayed_prepared_commits_.end()) {
+            // Then it is not committed yet.
+            ROCKS_LOG_DETAILS(info_log_,
+                              "IsInSnapshot %" PRIu64 " in %" PRIu64
+                              " returns %" PRId32,
+                              prep_seq, snapshot_seq, 0);
+            return false;
+          } else {
+            ROCKS_LOG_DETAILS(info_log_,
+                              "IsInSnapshot %" PRIu64 " in %" PRIu64
+                              " commit: %" PRIu64 " returns %" PRId32,
+                              prep_seq, snapshot_seq, it->second,
+                              snapshot_seq <= it->second);
+            return it->second <= snapshot_seq;
+          }
+        } else {
+          // 2nd query to the commit cache. Refer to the was_empty comment
+          // above.
+          exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
+          if (exist && prep_seq == cached.prep_seq) {
+            ROCKS_LOG_DETAILS(
+                info_log_,
+                "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
+                prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
+            return cached.commit_seq <= snapshot_seq;
+          }
+          max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
+        }
+      }
+    } while (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub));
+    // When advancing max_evicted_seq_, we move older entries from
+    // prepared_txns_ to delayed_prepared_. Also we move evicted entries from
+    // the commit cache to old_commit_map_ if they overlap with any snapshot.
+    // Since prep_seq <= max_evicted_seq_, we have three cases: i) in
+    // delayed_prepared_, ii) in old_commit_map_, iii) committed with no
+    // conflict with any snapshot. Case (i), delayed_prepared_, is checked
+    // above.
+    if (max_evicted_seq_ub < snapshot_seq) {  // then (ii) cannot be the case
+      // only (iii) is the case: committed
+      // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
+      // snapshot_seq
+      ROCKS_LOG_DETAILS(
+          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
+          prep_seq, snapshot_seq, 1);
+      return true;
+    }
+    // else (ii) might be the case: check the commit data saved for this
+    // snapshot. If there was no overlapping commit entry, then it is committed
+    // with a commit_seq lower than any live snapshot, including snapshot_seq.
+    if (old_commit_map_empty_.load(std::memory_order_acquire)) {
+      ROCKS_LOG_DETAILS(info_log_,
+                        "IsInSnapshot %" PRIu64 " in %" PRIu64
+                        " returns %" PRId32 " released=1",
+                        prep_seq, snapshot_seq, 0);
+      assert(snap_released);
+      // This snapshot is not valid anymore. We cannot tell if prep_seq is
+      // committed before or after the snapshot. Return true but also set
+      // snap_released to true.
+      *snap_released = true;
+      return true;
+    }
+    {
+      // We should not normally reach here unless snapshot_seq is old. This is
+      // a rare case and it is ok to pay the cost of mutex ReadLock for such
+      // old, reading transactions.
+      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
+      ReadLock rl(&old_commit_map_mutex_);
+      auto prep_set_entry = old_commit_map_.find(snapshot_seq);
+      bool found = prep_set_entry != old_commit_map_.end();
+      if (found) {
+        auto& vec = prep_set_entry->second;
+        found = std::binary_search(vec.begin(), vec.end(), prep_seq);
+      } else {
+        // coming from compaction
+        ROCKS_LOG_DETAILS(info_log_,
+                          "IsInSnapshot %" PRIu64 " in %" PRIu64
+                          " returns %" PRId32 " released=1",
+                          prep_seq, snapshot_seq, 0);
+        // This snapshot is not valid anymore. We cannot tell if prep_seq is
+        // committed before or after the snapshot. Return true but also set
+        // snap_released to true.
+        assert(snap_released);
+        *snap_released = true;
+        return true;
+      }
+
+      if (!found) {
+        ROCKS_LOG_DETAILS(info_log_,
+                          "IsInSnapshot %" PRIu64 " in %" PRIu64
+                          " returns %" PRId32,
+                          prep_seq, snapshot_seq, 1);
+        return true;
+      }
+    }
+    // (ii) is the case: it is committed but after snapshot_seq
+    ROCKS_LOG_DETAILS(
+        info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
+        prep_seq, snapshot_seq, 0);
+    return false;
+  }
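+
+  // Worked example (illustrative, not part of the original header): suppose a
+  // transaction prepared at seq 10 and committed at seq 15. For a snapshot at
+  // seq 12, IsInSnapshot(10, 12) finds commit_seq 15 > 12 in the commit cache
+  // and returns false; for a snapshot at seq 20 it returns true, since
+  // commit_seq 15 <= 20.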
+
+  // Add the transaction with prepare sequence seq to the prepared list.
+  // Note: must be called serially with increasing seq on each call.
+  // locked is true if prepared_mutex_ is already locked.
+  void AddPrepared(uint64_t seq, bool locked = false);
+  // Check if any of the prepared txns are less than new max_evicted_seq_.
+  // Must be called with prepared_mutex_ write locked.
+  void CheckPreparedAgainstMax(SequenceNumber new_max, bool locked);
+  // Remove the transaction with prepare sequence seq from the prepared list
+  void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
+  // Add the transaction with prepare sequence prepare_seq and commit sequence
+  // commit_seq to the commit map. loop_cnt is to detect infinite loops.
+  // Note: must be called serially.
+  void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
+                    uint8_t loop_cnt = 0);
+
+  struct CommitEntry {
+    uint64_t prep_seq;
+    uint64_t commit_seq;
+    CommitEntry() : prep_seq(0), commit_seq(0) {}
+    CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
+    bool operator==(const CommitEntry& rhs) const {
+      return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq;
+    }
+  };
+
+  struct CommitEntry64bFormat {
+    explicit CommitEntry64bFormat(size_t index_bits)
+        : INDEX_BITS(index_bits),
+          PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)),
+          COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)),
+          COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)),
+          DELTA_UPPERBOUND(static_cast<uint64_t>((1ull << COMMIT_BITS))) {}
+    // Number of higher bits of a sequence number that are not used. They are
+    // used to encode the value type, ...
+    const size_t PAD_BITS = static_cast<size_t>(8);
+    // Number of lower bits from prepare seq that can be skipped as they are
+    // implied by the index of the entry in the array
+    const size_t INDEX_BITS;
+    // Number of bits we use to encode the prepare seq
+    const size_t PREP_BITS;
+    // Number of bits we use to encode the commit seq.
+    const size_t COMMIT_BITS;
+    // Filter to encode/decode commit seq
+    const uint64_t COMMIT_FILTER;
+    // The value of commit_seq - prepare_seq + 1 must be less than this bound
+    const uint64_t DELTA_UPPERBOUND;
+  };
+
+  // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ... INDEX
+  // Delta Seq   (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ... DELTA DELTA
+  // Encoded Value         = PREP PREP .... PREP PREP DELTA DELTA ... DELTA DELTA
+  // PAD: first bits of a seq that are reserved for tagging and hence ignored
+  // PREP/INDEX: the used bits in a prepare seq number
+  // INDEX: the bits that do not have to be encoded (will be provided
+  // externally)
+  // DELTA: commit seq - prepare seq + 1
+  // Number of DELTA bits should be equal to the number of index bits + PADs
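+  //
+  // Worked example (illustrative, not part of the original header), assuming
+  // the default commit cache of 2^23 entries, i.e. INDEX_BITS = 23 and
+  // PAD_BITS = 8:
+  //   prepare_seq = 100, commit_seq = 105  =>  delta = 105 - 100 + 1 = 6
+  //   indexed_seq = 100 % COMMIT_CACHE_SIZE = 100 (the low INDEX_BITS bits)
+  //   rep_ stores the remaining high bits of prepare_seq plus delta; Parse()
+  //   later recovers prep_seq = high_bits | indexed_seq and
+  //   commit_seq = prep_seq + delta - 1 = 105.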
+  struct CommitEntry64b {
+    constexpr CommitEntry64b() noexcept : rep_(0) {}
+
+    CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
+        : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}
+
+    CommitEntry64b(const uint64_t ps, const uint64_t cs,
+                   const CommitEntry64bFormat& format) {
+      assert(ps < static_cast<uint64_t>(
+                      (1ull << (format.PREP_BITS + format.INDEX_BITS))));
+      assert(ps <= cs);
+      uint64_t delta = cs - ps + 1;  // make initialized delta always >= 1
+      // zero is reserved for uninitialized entries
+      assert(0 < delta);
+      assert(delta < format.DELTA_UPPERBOUND);
+      if (delta >= format.DELTA_UPPERBOUND) {
+        throw std::runtime_error(
+            "commit_seq >> prepare_seq. The allowed distance is " +
+            std::to_string(format.DELTA_UPPERBOUND) + " commit_seq is " +
+            std::to_string(cs) + " prepare_seq is " + std::to_string(ps));
+      }
+      rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
+      rep_ = rep_ | delta;
+    }
+
+    // Return false if the entry is empty
+    bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
+               const CommitEntry64bFormat& format) {
+      uint64_t delta = rep_ & format.COMMIT_FILTER;
+      // zero is reserved for uninitialized entries
+      assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
+      if (delta == 0) {
+        return false;  // an initialized entry would have a non-zero delta
+      }
+
+      assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
+      uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
+      prep_up >>= format.PAD_BITS;
+      const uint64_t& prep_low = indexed_seq;
+      entry->prep_seq = prep_up | prep_low;
+
+      entry->commit_seq = entry->prep_seq + delta - 1;
+      return true;
+    }
+
+   private:
+    uint64_t rep_;
+  };
+
+  // Struct to hold ownership of snapshot and read callback for cleanup.
+  struct IteratorState;
+
+  std::shared_ptr<std::map<uint32_t, const Comparator*>> GetCFComparatorMap() {
+    return cf_map_;
+  }
+  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> GetCFHandleMap() {
+    return handle_map_;
+  }
+  void UpdateCFComparatorMap(
+      const std::vector<ColumnFamilyHandle*>& handles) override;
+  void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override;
+
+  virtual const Snapshot* GetSnapshot() override;
+  SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check);
+
+ protected:
+  virtual Status VerifyCFOptions(
+      const ColumnFamilyOptions& cf_options) override;
+  // Assign the min and max sequence numbers for reading from the db. A seq >
+  // max is not valid, a seq < min is valid, and a min <= seq < max requires
+  // further checking. Normally max is defined by the snapshot and min by the
+  // minimum uncommitted seq.
+  inline SnapshotBackup AssignMinMaxSeqs(const Snapshot* snapshot,
+                                         SequenceNumber* min,
+                                         SequenceNumber* max);
+  // Validate whether a snapshot sequence number is still valid based on the
+  // latest db status. backed_by_snapshot specifies if the number is backed by
+  // an actual snapshot object. order specifies the memory order with which we
+  // load the atomic variables: relaxed is enough for the default since we
+  // care about the last value seen by the same thread.
+  inline bool ValidateSnapshot(
+      const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
+      std::memory_order order = std::memory_order_relaxed);
+  // Get a dummy snapshot that refers to kMaxSequenceNumber
+  Snapshot* GetMaxSnapshot() { return &dummy_max_snapshot_; }
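+
+  // Illustrative read-path sketch (not part of the original header): a read
+  // first assigns the bounds and later re-validates unbacked snapshots:
+  //   SequenceNumber min_uncommitted, snap_seq;
+  //   const SnapshotBackup backed =
+  //       AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
+  //   ... perform the read with a WritePreparedTxnReadCallback ...
+  //   if (!ValidateSnapshot(snap_seq, backed)) { /* retry the read */ }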
+
+  bool ShouldRollbackWithSingleDelete(ColumnFamilyHandle* column_family,
+                                      const Slice& key) {
+    return rollback_deletion_type_callback_
+               ? rollback_deletion_type_callback_(this, column_family, key)
+               : false;
+  }
+
+  std::function<bool(TransactionDB*, ColumnFamilyHandle*, const Slice&)>
+      rollback_deletion_type_callback_;
+
+ private:
+  friend class AddPreparedCallback;
+  friend class PreparedHeap_BasicsTest_Test;
+  friend class PreparedHeap_Concurrent_Test;
+  friend class PreparedHeap_EmptyAtTheEnd_Test;
+  friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccess_Test;
+  friend class WritePreparedCommitEntryPreReleaseCallback;
+  friend class WritePreparedTransactionTestBase;
+  friend class WritePreparedTxn;
+  friend class WritePreparedTxnDBMock;
+  friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test;
+  friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasic_Test;
+  friend class
+      WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicates_Test;
+  friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
+  friend class WritePreparedTransactionTest_BasicRecovery_Test;
+  friend class WritePreparedTransactionTest_CheckAgainstSnapshots_Test;
+  friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test;
+  friend class WritePreparedTransactionTest_ConflictDetectionAfterRecovery_Test;
+  friend class WritePreparedTransactionTest_CommitMap_Test;
+  friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
+  friend class WritePreparedTransactionTest_IsInSnapshotEmptyMap_Test;
+  friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
+  friend class WritePreparedTransactionTest_IsInSnapshot_Test;
+  friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
+  friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
+  friend class WritePreparedTransactionTest_MaxCatchupWithUnbackedSnapshot_Test;
+  friend class
+      WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
+  friend class
+      WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test;
+  friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
+  friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
+  friend class WritePreparedTransactionTest_Rollback_Test;
+  friend class WritePreparedTransactionTest_SmallestUnCommittedSeq_Test;
+  friend class WriteUnpreparedTxn;
+  friend class WriteUnpreparedTxnDB;
+  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
+  friend class MultiOpsTxnsStressTest;
+
+  void Init(const TransactionDBOptions& txn_db_opts);
+
+  void WPRecordTick(uint32_t ticker_type) const {
+    RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type);
+  }
+
+  // A heap with amortized O(1) complexity for erase. It uses one extra heap
+  // to keep track of erased entries that are not yet on top of the main heap.
+  class PreparedHeap {
+    // The mutex is required for push and pop from PreparedHeap. ::erase will
+    // use external synchronization via prepared_mutex_.
+    port::Mutex push_pop_mutex_;
+    std::deque<uint64_t> heap_;
+    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
+        erased_heap_;
+    std::atomic<uint64_t> heap_top_ = {kMaxSequenceNumber};
+    // True when testing crash recovery
+    bool TEST_CRASH_ = false;
+    friend class WritePreparedTxnDB;
+
+   public:
+    ~PreparedHeap() {
+      if (!TEST_CRASH_) {
+        assert(heap_.empty());
+        assert(erased_heap_.empty());
+      }
+    }
+    port::Mutex* push_pop_mutex() { return &push_pop_mutex_; }
+
+    inline bool empty() { return top() == kMaxSequenceNumber; }
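+
+    // Illustrative example (not part of the original header): push 3, 5, 8,
+    // then erase(5). Since 5 is not on top, it is parked in erased_heap_.
+    // A later pop() removes 3 and then discards 5 via erased_heap_, leaving
+    // top() == 8, without ever scanning the middle of the heap.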
+    // Returns kMaxSequenceNumber if empty() and the smallest otherwise.
+    inline uint64_t top() { return heap_top_.load(std::memory_order_acquire); }
+    inline void push(uint64_t v) {
+      push_pop_mutex_.AssertHeld();
+      if (heap_.empty()) {
+        heap_top_.store(v, std::memory_order_release);
+      } else {
+        assert(heap_top_.load() < v);
+      }
+      heap_.push_back(v);
+    }
+    void pop(bool locked = false) {
+      if (!locked) {
+        push_pop_mutex()->Lock();
+      }
+      push_pop_mutex_.AssertHeld();
+      heap_.pop_front();
+      while (!heap_.empty() && !erased_heap_.empty() &&
+             // heap_.top() > erased_heap_.top() could happen if we have erased
+             // a non-existent entry. Ideally the user should not do that but
+             // we should be resilient against it.
+             heap_.front() >= erased_heap_.top()) {
+        if (heap_.front() == erased_heap_.top()) {
+          heap_.pop_front();
+        }
+        uint64_t erased __attribute__((__unused__));
+        erased = erased_heap_.top();
+        erased_heap_.pop();
+        // No duplicate prepare sequence numbers
+        assert(erased_heap_.empty() || erased_heap_.top() != erased);
+      }
+      while (heap_.empty() && !erased_heap_.empty()) {
+        erased_heap_.pop();
+      }
+      heap_top_.store(!heap_.empty() ? heap_.front() : kMaxSequenceNumber,
+                      std::memory_order_release);
+      if (!locked) {
+        push_pop_mutex()->Unlock();
+      }
+    }
+    // Concurrent calls need external synchronization. It is safe to be called
+    // concurrently with push and pop though.
+    void erase(uint64_t seq) {
+      if (!empty()) {
+        auto top_seq = top();
+        if (seq < top_seq) {
+          // Already popped, ignore it.
+        } else if (top_seq == seq) {
+          pop();
+#ifndef NDEBUG
+          MutexLock ml(push_pop_mutex());
+          assert(heap_.empty() || heap_.front() != seq);
+#endif
+        } else {  // top() > seq
+          // Down the heap, remember to pop it later
+          erased_heap_.push(seq);
+        }
+      }
+    }
+  };
+
+  void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; }
+
+  // Get the commit entry with index indexed_seq from the commit table. It
+  // returns true if such an entry exists.
+  bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
+                      CommitEntry* entry) const;
+
+  // Rewrite the entry with the index indexed_seq in the commit table with the
+  // commit entry <prep_seq, commit_seq>. If the rewrite results in eviction,
+  // sets the evicted_entry and returns true.
+  bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
+                      CommitEntry* evicted_entry);
+
+  // Rewrite the entry with the index indexed_seq in the commit table with the
+  // commit entry new_entry only if the existing entry matches the
+  // expected_entry. Returns false otherwise.
+  bool ExchangeCommitEntry(const uint64_t indexed_seq,
+                           CommitEntry64b& expected_entry,
+                           const CommitEntry& new_entry);
+
+  // Increase max_evicted_seq_ from the previous value prev_max to the new
+  // value. This also involves taking care of prepared txns that are not
+  // committed before new_max, as well as updating the list of live snapshots
+  // at the time of updating the max. Thread-safety: this function can be
+  // called concurrently. The concurrent invocations of this function are
+  // equivalent to a serial invocation in which the last invocation is the one
+  // with the largest new_max value.
+  void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
+                            const SequenceNumber& new_max);
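+
+  // Illustrative flow (not part of the original header; based on how the
+  // companion .cc file is expected to use these helpers): AddCommitted(p, c)
+  // computes indexed_seq = p % COMMIT_CACHE_SIZE and calls AddCommitEntry.
+  // If a previous entry <p', c'> is evicted and c' exceeds max_evicted_seq_,
+  // AdvanceMaxEvictedSeq runs first and the evicted entry is checked against
+  // live snapshots (CheckAgainstSnapshots) before being dropped.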
+
+  inline SequenceNumber SmallestUnCommittedSeq() {
+    // Note: We have two lists to look into, but for performance reasons they
+    // are not read atomically. Since CheckPreparedAgainstMax copies the entry
+    // to delayed_prepared_ before removing it from prepared_txns_, to ensure
+    // that a prepared entry will not be missed, we look into them in the
+    // opposite order: first read prepared_txns_ and then delayed_prepared_.
+
+    // This must be called before calling ::top. This is because a concurrent
+    // thread would call ::RemovePrepared before updating
+    // GetLatestSequenceNumber(). Reading them in the opposite order here
+    // guarantees that the ::top we read is no higher than the ::top we would
+    // have read had we updated/read them atomically.
+    auto next_prepare = db_impl_->GetLatestSequenceNumber() + 1;
+    auto min_prepare = prepared_txns_.top();
+    // Since we always update prepared_txns_ from the main write queue via
+    // PreReleaseCallback, prepared_txns_.top() indicates the smallest
+    // prepared data in 2pc transactions. For non-2pc transactions that are
+    // written in two steps, we also update prepared_txns_ at the first step
+    // (via the same mechanism) so that their uncommitted data is reflected in
+    // SmallestUnCommittedSeq.
+    if (!delayed_prepared_empty_.load()) {
+      ReadLock rl(&prepared_mutex_);
+      if (!delayed_prepared_.empty()) {
+        return *delayed_prepared_.begin();
+      }
+    }
+    bool empty = min_prepare == kMaxSequenceNumber;
+    if (empty) {
+      // Since GetLatestSequenceNumber is updated
+      // after prepared_txns_ are, the value of GetLatestSequenceNumber would
+      // reflect any uncommitted data that is not added to prepared_txns_ yet.
+      // Otherwise, if there is no concurrent txn, this value simply reflects
+      // the latest value in the memtable.
+      return next_prepare;
+    } else {
+      return std::min(min_prepare, next_prepare);
+    }
+  }
+
+  // Enhance the snapshot object by recording in it the smallest uncommitted
+  // seq.
+  inline void EnhanceSnapshot(SnapshotImpl* snapshot,
+                              SequenceNumber min_uncommitted) {
+    assert(snapshot);
+    assert(min_uncommitted <= snapshot->number_ + 1);
+    snapshot->min_uncommitted_ = min_uncommitted;
+  }
+
+  virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
+      SequenceNumber max);
+
+  // Will be called by the public ReleaseSnapshot method. Does the maintenance
+  // internal to WritePreparedTxnDB.
+  void ReleaseSnapshotInternal(const SequenceNumber snap_seq);
+
+  // Update the list of snapshots corresponding to the soon-to-be-updated
+  // max_evicted_seq_. Thread-safety: this function can be called concurrently.
+  // The concurrent invocations of this function are equivalent to a serial
+  // invocation in which the last invocation is the one with the largest
+  // version value.
+  void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
+                       const SequenceNumber& version);
+  // Check the new list of snapshots against the old one to see if any of the
+  // snapshots are released and to do the cleanup for the released snapshots.
+  void CleanupReleasedSnapshots(
+      const std::vector<SequenceNumber>& new_snapshots,
+      const std::vector<SequenceNumber>& old_snapshots);
+
+  // Check an evicted entry against live snapshots to see if it should be kept
+  // around or can be safely discarded (and hence assumed committed for all
+  // snapshots). Thread-safety: this function can be called concurrently. If
+  // it is called concurrently with multiple UpdateSnapshots, the result is
+  // the same as checking the intersection of the snapshot list before the
+  // updates with the snapshot list of all the concurrent updates.
+  void CheckAgainstSnapshots(const CommitEntry& evicted);
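+
+  // Illustrative example (not part of the original header): if entry
+  // <prep_seq=10, commit_seq=20> is evicted while a live snapshot sits at
+  // seq 15, then 10 <= 15 < 20, so prep_seq 10 is recorded against snapshot
+  // 15 in old_commit_map_; a reader at snapshot 15 later treats seq 10 as
+  // not-yet-committed even though it is gone from the commit cache.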
+  // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
+  // commit_seq. Return false if checking the next snapshot(s) is not needed.
+  // This is the case if none of the next snapshots could satisfy the
+  // condition. next_is_larger: the next snapshot will be a larger value.
+  bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
+                               const uint64_t& commit_seq,
+                               const uint64_t& snapshot_seq,
+                               const bool next_is_larger);
+
+  // A trick to increase the last visible sequence number by one and also wait
+  // for the in-flight commits to be visible.
+  void AdvanceSeqByOne();
+
+  // The list of live snapshots at the last time that max_evicted_seq_
+  // advanced. The list is stored in two data structures: in snapshot_cache_,
+  // which is efficient for concurrent reads, and in snapshots_ if the data
+  // does not fit into snapshot_cache_. The total number of snapshots in the
+  // two lists:
+  std::atomic<size_t> snapshots_total_ = {};
+  // The list, sorted in ascending order. Thread-safety for writes is provided
+  // with snapshots_mutex_, and concurrent reads are safe due to std::atomic
+  // for each entry. On the x86_64 architecture such reads are compiled to
+  // simple read instructions.
+  const size_t SNAPSHOT_CACHE_BITS;
+  const size_t SNAPSHOT_CACHE_SIZE;
+  std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
+  // 2nd list for storing snapshots. The list is sorted in ascending order.
+  // Thread-safety is provided with snapshots_mutex_.
+  std::vector<SequenceNumber> snapshots_;
+  // The list of all snapshots: snapshots_ + snapshot_cache_. This list,
+  // although redundant, simplifies the CleanupOldSnapshots implementation.
+  // Thread-safety is provided with snapshots_mutex_.
+  std::vector<SequenceNumber> snapshots_all_;
+  // The version of the latest list of snapshots. This can be used to avoid
+  // rewriting a list that is concurrently updated with a more recent version.
+  SequenceNumber snapshots_version_ = 0;
+
+  // A heap of prepared transactions. Thread-safety is provided with
+  // prepared_mutex_.
+  PreparedHeap prepared_txns_;
+  const size_t COMMIT_CACHE_BITS;
+  const size_t COMMIT_CACHE_SIZE;
+  const CommitEntry64bFormat FORMAT;
+  // commit_cache_ must be initialized to zero to tell apart an empty index
+  // from a filled one. Thread-safety is provided with commit_cache_mutex_.
+  std::unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
+  // The largest evicted *commit* sequence number from the commit_cache_. If a
+  // seq is smaller than max_evicted_seq_ it might or might not be present in
+  // commit_cache_. So commit_cache_ must first be checked before consulting
+  // max_evicted_seq_.
+  std::atomic<uint64_t> max_evicted_seq_ = {};
+  // Order: 1) update future_max_evicted_seq_ = new_max,
+  // 2) GetSnapshotListFromDB(new_max), 3) max_evicted_seq_ = new_max. Since
+  // GetSnapshotInternal guarantees that the snapshot seq is larger than
+  // future_max_evicted_seq_, this guarantees that a snapshot that is not
+  // larger than max has already been looked at via
+  // GetSnapshotListFromDB(new_max).
+  std::atomic<uint64_t> future_max_evicted_seq_ = {};
+  // Advance max_evicted_seq_ by this value each time it needs an update. The
+  // larger the value, the less frequent advances we would have. We do not want
+  // it to be too large either, as it would cause stalls by doing too much
+  // maintenance work under the lock.
+  size_t INC_STEP_FOR_MAX_EVICTED = 1;
+  // A map from old snapshots (expected to be used by a few read-only txns) to
+  // prepared sequence numbers of the evicted entries from commit_cache_ that
+  // overlap with that snapshot.
+  // These are the prepared sequence numbers that the snapshot, to which they
+  // are mapped, cannot assume to be committed just because they are no longer
+  // in the commit_cache_. The vector must be sorted after each update.
+  // Thread-safety is provided with old_commit_map_mutex_.
+  std::map<SequenceNumber, std::vector<SequenceNumber>> old_commit_map_;
+  // A set of long-running prepared transactions that are not finished by the
+  // time max_evicted_seq_ advances their sequence number. This is expected to
+  // be empty normally. Thread-safety is provided with prepared_mutex_.
+  std::set<uint64_t> delayed_prepared_;
+  // Commit of a delayed prepared: 1) update the commit cache, 2) update
+  // delayed_prepared_commits_, 3) publish seq, 4) clean up delayed_prepared_.
+  // delayed_prepared_commits_ will help us tell apart the unprepared txns
+  // from the ones that are committed but not cleaned up yet.
+  std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits_;
+  // Updated when delayed_prepared_.empty() changes. Expected to be true
+  // normally.
+  std::atomic<bool> delayed_prepared_empty_ = {true};
+  // Updated when old_commit_map_.empty() changes. Expected to be true
+  // normally.
+  std::atomic<bool> old_commit_map_empty_ = {true};
+  mutable port::RWMutex prepared_mutex_;
+  mutable port::RWMutex old_commit_map_mutex_;
+  mutable port::RWMutex commit_cache_mutex_;
+  mutable port::RWMutex snapshots_mutex_;
+  // A cache of the cf comparators
+  // Thread safety: since it is a const it is safe to read it concurrently
+  std::shared_ptr<std::map<uint32_t, const Comparator*>> cf_map_;
+  // A cache of the cf handles
+  // Thread safety: since the handle is a read-only object it is safe to read
+  // it concurrently
+  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
+  // A dummy snapshot object that refers to kMaxSequenceNumber
+  SnapshotImpl dummy_max_snapshot_;
+};
+
+class WritePreparedTxnReadCallback : public ReadCallback {
+ public:
+  WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
+      : ReadCallback(snapshot),
+        db_(db),
+        backed_by_snapshot_(kBackedByDBSnapshot) {}
+  WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
+                               SequenceNumber min_uncommitted,
+                               SnapshotBackup backed_by_snapshot)
+      : ReadCallback(snapshot, min_uncommitted),
+        db_(db),
+        backed_by_snapshot_(backed_by_snapshot) {
+    (void)backed_by_snapshot_;  // to silence unused private field warning
+  }
+
+  virtual ~WritePreparedTxnReadCallback() {
+    // If it is not backed by a snapshot, the caller must check validity.
+    assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
+  }
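+
+  // Illustrative usage (not part of the original header): with an unbacked
+  // snapshot, the caller must consult valid() after the read, e.g.:
+  //   WritePreparedTxnReadCallback callback(db, snap_seq, min_uncommitted,
+  //                                         kUnbackedByDBSnapshot);
+  //   ... run the read with this callback ...
+  //   if (!callback.valid()) { /* snapshot was released: retry */ }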
+
+  // Will be called to see if the seq number is visible; if not, the read
+  // moves on to the next seq number.
+  inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override {
+    auto snapshot = max_visible_seq_;
+    bool snap_released = false;
+    auto ret =
+        db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &snap_released);
+    assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
+    snap_released_ |= snap_released;
+    return ret;
+  }
+
+  inline bool valid() {
+    valid_checked_ = true;
+    return snap_released_ == false;
+  }
+
+  // TODO(myabandeh): override Refresh when Iterator::Refresh is supported
+ private:
+  WritePreparedTxnDB* db_;
+  // Whether max_visible_seq_ is backed by a snapshot
+  const SnapshotBackup backed_by_snapshot_;
+  bool snap_released_ = false;
+  // Safety check to ensure that the caller has checked invalid statuses
+  bool valid_checked_ = false;
+};
+
+class AddPreparedCallback : public PreReleaseCallback {
+ public:
+  AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl,
+                      size_t sub_batch_cnt, bool two_write_queues,
+                      bool first_prepare_batch)
+      : db_(db),
+        db_impl_(db_impl),
+        sub_batch_cnt_(sub_batch_cnt),
+        two_write_queues_(two_write_queues),
+        first_prepare_batch_(first_prepare_batch) {
+    (void)two_write_queues_;  // to silence unused private field warning
+  }
+  virtual Status Callback(SequenceNumber prepare_seq,
+                          bool is_mem_disabled __attribute__((__unused__)),
+                          uint64_t log_number, size_t index,
+                          size_t total) override {
+    assert(index < total);
+    // To reduce the cost of lock acquisition competing with the concurrent
+    // prepare requests, lock on the first callback and unlock on the last.
+    const bool do_lock = !two_write_queues_ || index == 0;
+    const bool do_unlock = !two_write_queues_ || index + 1 == total;
+    // Always Prepare from the main queue
+    assert(!two_write_queues_ || !is_mem_disabled);  // implies the 1st queue
+    TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:pause");
+    TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:resume");
+    if (do_lock) {
+      db_->prepared_txns_.push_pop_mutex()->Lock();
+    }
+    const bool kLocked = true;
+    for (size_t i = 0; i < sub_batch_cnt_; i++) {
+      db_->AddPrepared(prepare_seq + i, kLocked);
+    }
+    if (do_unlock) {
+      db_->prepared_txns_.push_pop_mutex()->Unlock();
+    }
+    TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::end");
+    if (first_prepare_batch_) {
+      assert(log_number != 0);
+      db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
+          log_number);
+    }
+    return Status::OK();
+  }
+
+ private:
+  WritePreparedTxnDB* db_;
+  DBImpl* db_impl_;
+  size_t sub_batch_cnt_;
+  bool two_write_queues_;
+  // It is 2PC and this is the first prepare batch. Always the case in 2PC
+  // unless it is WriteUnPrepared.
+  bool first_prepare_batch_;
+};
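+
+// Illustrative two-queue flow (not part of the original header): with
+// two_write_queues, a 2PC commit runs roughly as
+//   1) the prepare batch goes to the main queue; AddPreparedCallback
+//      registers prepare_seq in prepared_txns_,
+//   2) the commit marker goes to the 2nd queue;
+//      WritePreparedCommitEntryPreReleaseCallback (below) adds the
+//      <prepare_seq, commit_seq> mapping and publishes the sequence.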
+
+class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
+ public:
+  // includes_data indicates that the commit also writes a non-empty
+  // CommitTimeWriteBatch to memtable, which needs to be committed separately.
+  WritePreparedCommitEntryPreReleaseCallback(
+      WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq,
+      size_t prep_batch_cnt, size_t data_batch_cnt = 0,
+      SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0)
+      : db_(db),
+        db_impl_(db_impl),
+        prep_seq_(prep_seq),
+        prep_batch_cnt_(prep_batch_cnt),
+        data_batch_cnt_(data_batch_cnt),
+        includes_data_(data_batch_cnt_ > 0),
+        aux_seq_(aux_seq),
+        aux_batch_cnt_(aux_batch_cnt),
+        includes_aux_batch_(aux_batch_cnt > 0) {
+    assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber));  // xor
+    assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
+    assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber));  // xor
+  }
+
+  virtual Status Callback(SequenceNumber commit_seq,
+                          bool is_mem_disabled __attribute__((__unused__)),
+                          uint64_t, size_t /*index*/,
+                          size_t /*total*/) override {
+    // Always commit from the 2nd queue
+    assert(!db_impl_->immutable_db_options().two_write_queues ||
+           is_mem_disabled);
+    assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
+    // The data batch is what accompanies the commit marker and affects the
+    // last seq in the commit batch.
+    const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
+                                         ? commit_seq
+                                         : commit_seq + data_batch_cnt_ - 1;
+    if (prep_seq_ != kMaxSequenceNumber) {
+      for (size_t i = 0; i < prep_batch_cnt_; i++) {
+        db_->AddCommitted(prep_seq_ + i, last_commit_seq);
+      }
+    }  // else there was no prepare phase
+    if (includes_aux_batch_) {
+      for (size_t i = 0; i < aux_batch_cnt_; i++) {
+        db_->AddCommitted(aux_seq_ + i, last_commit_seq);
+      }
+    }
+    if (includes_data_) {
+      assert(data_batch_cnt_);
+      // Commit the data that is accompanied with the commit request
+      for (size_t i = 0; i < data_batch_cnt_; i++) {
+        // For the commit seq of each batch use the commit seq of the last
+        // batch. This would make debugging easier by having all the batches
+        // carry the same sequence number.
+        db_->AddCommitted(commit_seq + i, last_commit_seq);
+      }
+    }
+    if (db_impl_->immutable_db_options().two_write_queues) {
+      assert(is_mem_disabled);  // implies the 2nd queue
+      // Publish the sequence number. We can do that here assuming the
+      // callback is invoked only from one write queue, which would guarantee
+      // that the published sequence numbers are in order, i.e., once a seq is
+      // published all the seqs prior to it are also publishable.
+      db_impl_->SetLastPublishedSequence(last_commit_seq);
+      // Note: RemovePrepared should be called after publishing the seq.
+      // Otherwise the SmallestUnCommittedSeq optimization breaks.
+      if (prep_seq_ != kMaxSequenceNumber) {
+        db_->RemovePrepared(prep_seq_, prep_batch_cnt_);
+      }  // else there was no prepare phase
+      if (includes_aux_batch_) {
+        db_->RemovePrepared(aux_seq_, aux_batch_cnt_);
+      }
+    }
+    // else the SequenceNumber that is updated as part of the write already
+    // does the publishing
+    return Status::OK();
+  }
+
+ private:
+  WritePreparedTxnDB* db_;
+  DBImpl* db_impl_;
+  // kMaxSequenceNumber if there was no prepare phase
+  SequenceNumber prep_seq_;
+  size_t prep_batch_cnt_;
+  size_t data_batch_cnt_;
+  // Data here is the batch that is written with the commit marker, either
+  // because it is a commit without prepare or the commit has a
+  // CommitTimeWriteBatch.
+  bool includes_data_;
+  // An auxiliary batch (if there is any) is a batch that is written before,
+  // but gets the same commit seq as the prepare batch or data batch. This is
+  // used in two write queues, where the CommitTimeWriteBatch becomes the aux
+  // batch and we do a separate write to actually commit everything.
+  SequenceNumber aux_seq_;
+  size_t aux_batch_cnt_;
+  bool includes_aux_batch_;
+};
+
+// For two_write_queues, commit both the aborted batch and the cleanup batch
+// and then publish the seq
+class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
+ public:
+  WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db,
+                                          DBImpl* db_impl,
+                                          SequenceNumber prep_seq,
+                                          SequenceNumber rollback_seq,
+                                          size_t prep_batch_cnt)
+      : db_(db),
+        db_impl_(db_impl),
+        prep_seq_(prep_seq),
+        rollback_seq_(rollback_seq),
+        prep_batch_cnt_(prep_batch_cnt) {
+    assert(prep_seq != kMaxSequenceNumber);
+    assert(rollback_seq != kMaxSequenceNumber);
+    assert(prep_batch_cnt_ > 0);
+  }
+
+  Status Callback(SequenceNumber commit_seq, bool is_mem_disabled, uint64_t,
+                  size_t /*index*/, size_t /*total*/) override {
+    // Always commit from the 2nd queue
+    assert(is_mem_disabled);  // implies the 2nd queue
+    assert(db_impl_->immutable_db_options().two_write_queues);
+#ifdef NDEBUG
+    (void)is_mem_disabled;
+#endif
+    const uint64_t last_commit_seq = commit_seq;
+    db_->AddCommitted(rollback_seq_, last_commit_seq);
+    for (size_t i = 0; i < prep_batch_cnt_; i++) {
+      db_->AddCommitted(prep_seq_ + i, last_commit_seq);
+    }
+    db_impl_->SetLastPublishedSequence(last_commit_seq);
+    return Status::OK();
+  }
+
+ private:
+  WritePreparedTxnDB* db_;
+  DBImpl* db_impl_;
+  SequenceNumber prep_seq_;
+  SequenceNumber rollback_seq_;
+  size_t prep_batch_cnt_;
+};
+
+// Count the number of sub-batches inside a batch. A sub-batch does not have
+// duplicate keys.
+struct SubBatchCounter : public WriteBatch::Handler {
+  explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
+      : comparators_(comparators), batches_(1) {}
+  std::map<uint32_t, const Comparator*>& comparators_;
+  using CFKeys = std::set<Slice, SetComparator>;
+  std::map<uint32_t, CFKeys> keys_;
+  size_t batches_;
+  size_t BatchCount() { return batches_; }
+  void AddKey(const uint32_t cf, const Slice& key);
+  void InitWithComp(const uint32_t cf);
+  Status MarkNoop(bool) override { return Status::OK(); }
+  Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
+  Status MarkCommit(const Slice&) override { return Status::OK(); }
+  Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
+    AddKey(cf, key);
+    return Status::OK();
+  }
+  Status DeleteCF(uint32_t cf, const Slice& key) override {
+    AddKey(cf, key);
+    return Status::OK();
+  }
+  Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
+    AddKey(cf, key);
+    return Status::OK();
+  }
+  Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
+    AddKey(cf, key);
+    return Status::OK();
+  }
+  Status MarkBeginPrepare(bool) override { return Status::OK(); }
+  Status MarkRollback(const Slice&) override { return Status::OK(); }
+  Handler::OptionState WriteAfterCommit() const override {
+    return Handler::OptionState::kDisabled;
+  }
+};
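+
+// Illustrative usage (not part of the original header): counting the
+// sub-batches of a WriteBatch that may contain duplicate keys:
+//   SubBatchCounter counter(*db->GetCFComparatorMap());
+//   Status s = batch->Iterate(&counter);
+//   size_t batch_cnt = counter.BatchCount();  // > 1 iff duplicates exist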
+
+SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,
+                                                    SequenceNumber* min,
+                                                    SequenceNumber* max) {
+  if (snapshot != nullptr) {
+    *min =
+        static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
+    *max = static_cast_with_check<const SnapshotImpl>(snapshot)->number_;
+    // A duplicate of the check in EnhanceSnapshot().
+    assert(*min <= *max + 1);
+    return kBackedByDBSnapshot;
+  } else {
+    *min = SmallestUnCommittedSeq();
+    *max = 0;  // to be assigned later after sv is referenced.
+    return kUnbackedByDBSnapshot;
+  }
+}
+
+bool WritePreparedTxnDB::ValidateSnapshot(
+    const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
+    std::memory_order order) {
+  if (backed_by_snapshot == kBackedByDBSnapshot) {
+    return true;
+  } else {
+    SequenceNumber max = max_evicted_seq_.load(order);
+    // Validate that max has not advanced past the snapshot seq that is not
+    // backed by a real snapshot. This is a very rare case that should not
+    // happen in real workloads.
+    if (UNLIKELY(snap_seq <= max && snap_seq != 0)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+}  // namespace ROCKSDB_NAMESPACE
+#endif  // ROCKSDB_LITE
--
cgit v1.2.3