diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/utilities/transactions/pessimistic_transaction_db.h | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/utilities/transactions/pessimistic_transaction_db.h')
-rw-r--r-- | src/rocksdb/utilities/transactions/pessimistic_transaction_db.h | 318 |
1 files changed, 318 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/transactions/pessimistic_transaction_db.h b/src/rocksdb/utilities/transactions/pessimistic_transaction_db.h new file mode 100644 index 000000000..25cd11054 --- /dev/null +++ b/src/rocksdb/utilities/transactions/pessimistic_transaction_db.h @@ -0,0 +1,318 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#ifndef ROCKSDB_LITE + +#include <mutex> +#include <queue> +#include <set> +#include <string> +#include <unordered_map> +#include <vector> + +#include "db/db_iter.h" +#include "db/read_callback.h" +#include "db/snapshot_checker.h" +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/utilities/transaction_db.h" +#include "util/cast_util.h" +#include "utilities/transactions/lock/lock_manager.h" +#include "utilities/transactions/lock/range/range_lock_manager.h" +#include "utilities/transactions/pessimistic_transaction.h" +#include "utilities/transactions/write_prepared_txn.h" + +namespace ROCKSDB_NAMESPACE { + +class PessimisticTransactionDB : public TransactionDB { + public: + explicit PessimisticTransactionDB(DB* db, + const TransactionDBOptions& txn_db_options); + + explicit PessimisticTransactionDB(StackableDB* db, + const TransactionDBOptions& txn_db_options); + + virtual ~PessimisticTransactionDB(); + + virtual const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); } + + virtual Status Initialize( + const std::vector<size_t>& compaction_enabled_cf_indices, + const std::vector<ColumnFamilyHandle*>& handles); + + Transaction* BeginTransaction(const WriteOptions& write_options, + const TransactionOptions& txn_options, + Transaction* old_txn) override = 0; + + using StackableDB::Put; + virtual Status Put(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& val) override; + + using StackableDB::Delete; + virtual Status Delete(const WriteOptions& wopts, + ColumnFamilyHandle* column_family, + const Slice& key) override; + + using StackableDB::SingleDelete; + virtual Status SingleDelete(const WriteOptions& wopts, + ColumnFamilyHandle* column_family, + const Slice& key) override; + + using StackableDB::Merge; + virtual Status Merge(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + + using TransactionDB::Write; + virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; + inline Status WriteWithConcurrencyControl(const WriteOptions& opts, + WriteBatch* updates) { + Status s; + if (opts.protection_bytes_per_key > 0) { + s = WriteBatchInternal::UpdateProtectionInfo( + updates, opts.protection_bytes_per_key); + } + if (s.ok()) { + // Need to lock all keys in this batch to prevent write conflicts with + // concurrent transactions. + Transaction* txn = BeginInternalTransaction(opts); + txn->DisableIndexing(); + + auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn); + + // Since commitBatch sorts the keys before locking, concurrent Write() + // operations will not cause a deadlock. + // In order to avoid a deadlock with a concurrent Transaction, + // Transactions should use a lock timeout. + s = txn_impl->CommitBatch(updates); + + delete txn; + } + + return s; + } + + using StackableDB::CreateColumnFamily; + virtual Status CreateColumnFamily(const ColumnFamilyOptions& options, + const std::string& column_family_name, + ColumnFamilyHandle** handle) override; + + Status CreateColumnFamilies( + const ColumnFamilyOptions& options, + const std::vector<std::string>& column_family_names, + std::vector<ColumnFamilyHandle*>* handles) override; + + Status CreateColumnFamilies( + const std::vector<ColumnFamilyDescriptor>& column_families, + std::vector<ColumnFamilyHandle*>* handles) override; + + using StackableDB::DropColumnFamily; + virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override; + + Status DropColumnFamilies( + const std::vector<ColumnFamilyHandle*>& column_families) override; + + Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id, + const std::string& key, bool exclusive); + Status TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id, + const Endpoint& start_endp, const Endpoint& end_endp); + + void UnLock(PessimisticTransaction* txn, const LockTracker& keys); + void UnLock(PessimisticTransaction* txn, uint32_t cfh_id, + const std::string& key); + + void AddColumnFamily(const ColumnFamilyHandle* handle); + + static TransactionDBOptions ValidateTxnDBOptions( + const TransactionDBOptions& txn_db_options); + + const TransactionDBOptions& GetTxnDBOptions() const { + return txn_db_options_; + } + + void InsertExpirableTransaction(TransactionID tx_id, + PessimisticTransaction* tx); + void RemoveExpirableTransaction(TransactionID tx_id); + + // If transaction is no longer available, locks can be stolen + // If transaction is available, try stealing locks directly from transaction + // It is the caller's responsibility to ensure that the referred transaction + // is expirable (GetExpirationTime() > 0) and that it is expired. + bool TryStealingExpiredTransactionLocks(TransactionID tx_id); + + Transaction* GetTransactionByName(const TransactionName& name) override; + + void RegisterTransaction(Transaction* txn); + void UnregisterTransaction(Transaction* txn); + + // not thread safe. current use case is during recovery (single thread) + void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override; + + LockManager::PointLockStatus GetLockStatusData() override; + + std::vector<DeadlockPath> GetDeadlockInfoBuffer() override; + void SetDeadlockInfoBufferSize(uint32_t target_size) override; + + // The default implementation does nothing. The actual implementation is moved + // to the child classes that actually need this information. This was due to + // an odd performance drop we observed when the added std::atomic member to + // the base class even when the subclass do not read it in the fast path. + virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {} + virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {} + + // Use the returned factory to create LockTrackers in transactions. + const LockTrackerFactory& GetLockTrackerFactory() const { + return lock_manager_->GetLockTrackerFactory(); + } + + std::pair<Status, std::shared_ptr<const Snapshot>> CreateTimestampedSnapshot( + TxnTimestamp ts) override; + + std::shared_ptr<const Snapshot> GetTimestampedSnapshot( + TxnTimestamp ts) const override; + + void ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts) override; + + Status GetTimestampedSnapshots(TxnTimestamp ts_lb, TxnTimestamp ts_ub, + std::vector<std::shared_ptr<const Snapshot>>& + timestamped_snapshots) const override; + + protected: + DBImpl* db_impl_; + std::shared_ptr<Logger> info_log_; + const TransactionDBOptions txn_db_options_; + + static Status FailIfBatchHasTs(const WriteBatch* wb); + + static Status FailIfCfEnablesTs(const DB* db, + const ColumnFamilyHandle* column_family); + + void ReinitializeTransaction( + Transaction* txn, const WriteOptions& write_options, + const TransactionOptions& txn_options = TransactionOptions()); + + virtual Status VerifyCFOptions(const ColumnFamilyOptions& cf_options); + + private: + friend class WritePreparedTxnDB; + friend class WritePreparedTxnDBMock; + friend class WriteUnpreparedTxn; + friend class TransactionTest_DoubleCrashInRecovery_Test; + friend class TransactionTest_DoubleEmptyWrite_Test; + friend class TransactionTest_DuplicateKeys_Test; + friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test; + friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test; + friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test; + friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test; + friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; + friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test; + + Transaction* BeginInternalTransaction(const WriteOptions& options); + + std::shared_ptr<LockManager> lock_manager_; + + // Must be held when adding/dropping column families. + InstrumentedMutex column_family_mutex_; + + // Used to ensure that no locks are stolen from an expirable transaction + // that has started a commit. Only transactions with an expiration time + // should be in this map. + std::mutex map_mutex_; + std::unordered_map<TransactionID, PessimisticTransaction*> + expirable_transactions_map_; + + // map from name to two phase transaction instance + std::mutex name_map_mutex_; + std::unordered_map<TransactionName, Transaction*> transactions_; + + // Signal that we are testing a crash scenario. Some asserts could be relaxed + // in such cases. + virtual void TEST_Crash() {} +}; + +// A PessimisticTransactionDB that writes the data to the DB after the commit. +// In this way the DB only contains the committed data. +class WriteCommittedTxnDB : public PessimisticTransactionDB { + public: + explicit WriteCommittedTxnDB(DB* db, + const TransactionDBOptions& txn_db_options) + : PessimisticTransactionDB(db, txn_db_options) {} + + explicit WriteCommittedTxnDB(StackableDB* db, + const TransactionDBOptions& txn_db_options) + : PessimisticTransactionDB(db, txn_db_options) {} + + virtual ~WriteCommittedTxnDB() {} + + Transaction* BeginTransaction(const WriteOptions& write_options, + const TransactionOptions& txn_options, + Transaction* old_txn) override; + + // Optimized version of ::Write that makes use of skip_concurrency_control + // hint + using TransactionDB::Write; + virtual Status Write(const WriteOptions& opts, + const TransactionDBWriteOptimizations& optimizations, + WriteBatch* updates) override; + virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; +}; + +inline Status PessimisticTransactionDB::FailIfBatchHasTs( + const WriteBatch* batch) { + if (batch != nullptr && WriteBatchInternal::HasKeyWithTimestamp(*batch)) { + return Status::NotSupported( + "Writes with timestamp must go through transaction API instead of " + "TransactionDB."); + } + return Status::OK(); +} + +inline Status PessimisticTransactionDB::FailIfCfEnablesTs( + const DB* db, const ColumnFamilyHandle* column_family) { + assert(db); + column_family = column_family ? column_family : db->DefaultColumnFamily(); + assert(column_family); + const Comparator* const ucmp = column_family->GetComparator(); + assert(ucmp); + if (ucmp->timestamp_size() > 0) { + return Status::NotSupported( + "Write operation with user timestamp must go through the transaction " + "API instead of TransactionDB."); + } + return Status::OK(); +} + +class SnapshotCreationCallback : public PostMemTableCallback { + public: + explicit SnapshotCreationCallback( + DBImpl* dbi, TxnTimestamp commit_ts, + const std::shared_ptr<TransactionNotifier>& notifier, + std::shared_ptr<const Snapshot>& snapshot) + : db_impl_(dbi), + commit_ts_(commit_ts), + snapshot_notifier_(notifier), + snapshot_(snapshot) { + assert(db_impl_); + } + + ~SnapshotCreationCallback() override { + snapshot_creation_status_.PermitUncheckedError(); + } + + Status operator()(SequenceNumber seq, bool disable_memtable) override; + + private: + DBImpl* const db_impl_; + const TxnTimestamp commit_ts_; + std::shared_ptr<TransactionNotifier> snapshot_notifier_; + std::shared_ptr<const Snapshot>& snapshot_; + + Status snapshot_creation_status_; +}; + +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE |