From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- .../utilities/transactions/transaction_base.h | 384 +++++++++++++++++++++ 1 file changed, 384 insertions(+) create mode 100644 src/rocksdb/utilities/transactions/transaction_base.h (limited to 'src/rocksdb/utilities/transactions/transaction_base.h') diff --git a/src/rocksdb/utilities/transactions/transaction_base.h b/src/rocksdb/utilities/transactions/transaction_base.h new file mode 100644 index 000000000..1bcb20ca9 --- /dev/null +++ b/src/rocksdb/utilities/transactions/transaction_base.h @@ -0,0 +1,384 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef ROCKSDB_LITE + +#include +#include +#include + +#include "db/write_batch_internal.h" +#include "rocksdb/db.h" +#include "rocksdb/slice.h" +#include "rocksdb/snapshot.h" +#include "rocksdb/status.h" +#include "rocksdb/types.h" +#include "rocksdb/utilities/transaction.h" +#include "rocksdb/utilities/transaction_db.h" +#include "rocksdb/utilities/write_batch_with_index.h" +#include "util/autovector.h" +#include "utilities/transactions/lock/lock_tracker.h" +#include "utilities/transactions/transaction_util.h" + +namespace ROCKSDB_NAMESPACE { + +class TransactionBaseImpl : public Transaction { + public: + TransactionBaseImpl(DB* db, const WriteOptions& write_options, + const LockTrackerFactory& lock_tracker_factory); + + ~TransactionBaseImpl() override; + + // Remove pending operations queued in this transaction. + virtual void Clear(); + + void Reinitialize(DB* db, const WriteOptions& write_options); + + // Called before executing Put, Merge, Delete, and GetForUpdate. If TryLock + // returns non-OK, the Put/Merge/Delete/GetForUpdate will be failed. + // do_validate will be false if called from PutUntracked, DeleteUntracked, + // MergeUntracked, or GetForUpdate(do_validate=false) + virtual Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, + bool read_only, bool exclusive, + const bool do_validate = true, + const bool assume_tracked = false) = 0; + + void SetSavePoint() override; + + Status RollbackToSavePoint() override; + + Status PopSavePoint() override; + + using Transaction::Get; + Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, std::string* value) override; + + Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* value) override; + + Status Get(const ReadOptions& options, const Slice& key, + std::string* value) override { + return Get(options, db_->DefaultColumnFamily(), key, value); + } + + using Transaction::GetForUpdate; + Status GetForUpdate(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value, bool exclusive, + const bool do_validate) override; + + Status GetForUpdate(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* pinnable_val, bool exclusive, + const bool do_validate) override; + + Status GetForUpdate(const ReadOptions& options, const Slice& key, + std::string* value, bool exclusive, + const bool do_validate) override { + return GetForUpdate(options, db_->DefaultColumnFamily(), key, value, + exclusive, do_validate); + } + + using Transaction::MultiGet; + std::vector MultiGet( + const ReadOptions& options, + const std::vector& column_family, + const std::vector& keys, + std::vector* values) override; + + std::vector MultiGet(const ReadOptions& options, + const std::vector& keys, + std::vector* values) override { + return MultiGet(options, + std::vector( + keys.size(), db_->DefaultColumnFamily()), + keys, values); + } + + void MultiGet(const ReadOptions& options, ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, PinnableSlice* values, + Status* statuses, const bool sorted_input = false) override; + + using Transaction::MultiGetForUpdate; + std::vector MultiGetForUpdate( + const ReadOptions& options, + const std::vector& column_family, + const std::vector& keys, + std::vector* values) override; + + std::vector MultiGetForUpdate( + const ReadOptions& options, const std::vector& keys, + std::vector* values) override { + return MultiGetForUpdate(options, + std::vector( + keys.size(), db_->DefaultColumnFamily()), + keys, values); + } + + Iterator* GetIterator(const ReadOptions& read_options) override; + Iterator* GetIterator(const ReadOptions& read_options, + ColumnFamilyHandle* column_family) override; + + Status Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value, const bool assume_tracked = false) override; + Status Put(const Slice& key, const Slice& value) override { + return Put(nullptr, key, value); + } + + Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value, + const bool assume_tracked = false) override; + Status Put(const SliceParts& key, const SliceParts& value) override { + return Put(nullptr, key, value); + } + + Status Merge(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value, const bool assume_tracked = false) override; + Status Merge(const Slice& key, const Slice& value) override { + return Merge(nullptr, key, value); + } + + Status Delete(ColumnFamilyHandle* column_family, const Slice& key, + const bool assume_tracked = false) override; + Status Delete(const Slice& key) override { return Delete(nullptr, key); } + Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key, + const bool assume_tracked = false) override; + Status Delete(const SliceParts& key) override { return Delete(nullptr, key); } + + Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key, + const bool assume_tracked = false) override; + Status SingleDelete(const Slice& key) override { + return SingleDelete(nullptr, key); + } + Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key, + const bool assume_tracked = false) override; + Status SingleDelete(const SliceParts& key) override { + return SingleDelete(nullptr, key); + } + + Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + Status PutUntracked(const Slice& key, const Slice& value) override { + return PutUntracked(nullptr, key, value); + } + + Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value) override; + Status PutUntracked(const SliceParts& key, const SliceParts& value) override { + return PutUntracked(nullptr, key, value); + } + + Status MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + Status MergeUntracked(const Slice& key, const Slice& value) override { + return MergeUntracked(nullptr, key, value); + } + + Status DeleteUntracked(ColumnFamilyHandle* column_family, + const Slice& key) override; + Status DeleteUntracked(const Slice& key) override { + return DeleteUntracked(nullptr, key); + } + Status DeleteUntracked(ColumnFamilyHandle* column_family, + const SliceParts& key) override; + Status DeleteUntracked(const SliceParts& key) override { + return DeleteUntracked(nullptr, key); + } + + Status SingleDeleteUntracked(ColumnFamilyHandle* column_family, + const Slice& key) override; + Status SingleDeleteUntracked(const Slice& key) override { + return SingleDeleteUntracked(nullptr, key); + } + + void PutLogData(const Slice& blob) override; + + WriteBatchWithIndex* GetWriteBatch() override; + + virtual void SetLockTimeout(int64_t /*timeout*/) override { /* Do nothing */ + } + + const Snapshot* GetSnapshot() const override { + // will return nullptr when there is no snapshot + return snapshot_.get(); + } + + std::shared_ptr GetTimestampedSnapshot() const override { + return snapshot_; + } + + virtual void SetSnapshot() override; + void SetSnapshotOnNextOperation( + std::shared_ptr notifier = nullptr) override; + + void ClearSnapshot() override { + snapshot_.reset(); + snapshot_needed_ = false; + snapshot_notifier_ = nullptr; + } + + void DisableIndexing() override { indexing_enabled_ = false; } + + void EnableIndexing() override { indexing_enabled_ = true; } + + bool IndexingEnabled() const { return indexing_enabled_; } + + uint64_t GetElapsedTime() const override; + + uint64_t GetNumPuts() const override; + + uint64_t GetNumDeletes() const override; + + uint64_t GetNumMerges() const override; + + uint64_t GetNumKeys() const override; + + void UndoGetForUpdate(ColumnFamilyHandle* column_family, + const Slice& key) override; + void UndoGetForUpdate(const Slice& key) override { + return UndoGetForUpdate(nullptr, key); + }; + + WriteOptions* GetWriteOptions() override { return &write_options_; } + + void SetWriteOptions(const WriteOptions& write_options) override { + write_options_ = write_options; + } + + // Used for memory management for snapshot_ + void ReleaseSnapshot(const Snapshot* snapshot, DB* db); + + // iterates over the given batch and makes the appropriate inserts. + // used for rebuilding prepared transactions after recovery. + virtual Status RebuildFromWriteBatch(WriteBatch* src_batch) override; + + WriteBatch* GetCommitTimeWriteBatch() override; + + LockTracker& GetTrackedLocks() { return *tracked_locks_; } + + protected: + // Add a key to the list of tracked keys. + // + // seqno is the earliest seqno this key was involved with this transaction. + // readonly should be set to true if no data was written for this key + void TrackKey(uint32_t cfh_id, const std::string& key, SequenceNumber seqno, + bool readonly, bool exclusive); + + // Called when UndoGetForUpdate determines that this key can be unlocked. + virtual void UnlockGetForUpdate(ColumnFamilyHandle* column_family, + const Slice& key) = 0; + + // Sets a snapshot if SetSnapshotOnNextOperation() has been called. + void SetSnapshotIfNeeded(); + + // Initialize write_batch_ for 2PC by inserting Noop. + inline void InitWriteBatch(bool clear = false) { + if (clear) { + write_batch_.Clear(); + } + assert(write_batch_.GetDataSize() == WriteBatchInternal::kHeader); + auto s = WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch()); + assert(s.ok()); + } + + WriteBatchBase* GetBatchForWrite(); + + DB* db_; + DBImpl* dbimpl_; + + WriteOptions write_options_; + + const Comparator* cmp_; + + const LockTrackerFactory& lock_tracker_factory_; + + // Stores that time the txn was constructed, in microseconds. + uint64_t start_time_; + + // Stores the current snapshot that was set by SetSnapshot or null if + // no snapshot is currently set. + std::shared_ptr snapshot_; + + // Count of various operations pending in this transaction + uint64_t num_puts_ = 0; + uint64_t num_deletes_ = 0; + uint64_t num_merges_ = 0; + + struct SavePoint { + std::shared_ptr snapshot_; + bool snapshot_needed_ = false; + std::shared_ptr snapshot_notifier_; + uint64_t num_puts_ = 0; + uint64_t num_deletes_ = 0; + uint64_t num_merges_ = 0; + + // Record all locks tracked since the last savepoint + std::shared_ptr new_locks_; + + SavePoint(std::shared_ptr snapshot, bool snapshot_needed, + std::shared_ptr snapshot_notifier, + uint64_t num_puts, uint64_t num_deletes, uint64_t num_merges, + const LockTrackerFactory& lock_tracker_factory) + : snapshot_(snapshot), + snapshot_needed_(snapshot_needed), + snapshot_notifier_(snapshot_notifier), + num_puts_(num_puts), + num_deletes_(num_deletes), + num_merges_(num_merges), + new_locks_(lock_tracker_factory.Create()) {} + + explicit SavePoint(const LockTrackerFactory& lock_tracker_factory) + : new_locks_(lock_tracker_factory.Create()) {} + }; + + // Records writes pending in this transaction + WriteBatchWithIndex write_batch_; + + // For Pessimistic Transactions this is the set of acquired locks. + // Optimistic Transactions will keep note the requested locks (not actually + // locked), and do conflict checking until commit time based on the tracked + // lock requests. + std::unique_ptr tracked_locks_; + + // Stack of the Snapshot saved at each save point. Saved snapshots may be + // nullptr if there was no snapshot at the time SetSavePoint() was called. + std::unique_ptr>> + save_points_; + + private: + friend class WriteCommittedTxn; + friend class WritePreparedTxn; + + // Extra data to be persisted with the commit. Note this is only used when + // prepare phase is not skipped. + WriteBatch commit_time_batch_; + + // If true, future Put/Merge/Deletes will be indexed in the + // WriteBatchWithIndex. + // If false, future Put/Merge/Deletes will be inserted directly into the + // underlying WriteBatch and not indexed in the WriteBatchWithIndex. + bool indexing_enabled_; + + // SetSnapshotOnNextOperation() has been called and the snapshot has not yet + // been reset. + bool snapshot_needed_ = false; + + // SetSnapshotOnNextOperation() has been called and the caller would like + // a notification through the TransactionNotifier interface + std::shared_ptr snapshot_notifier_ = nullptr; + + Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, + bool read_only, bool exclusive, const bool do_validate = true, + const bool assume_tracked = false); + + void SetSnapshotInternal(const Snapshot* snapshot); +}; + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE -- cgit v1.2.3