diff options
Diffstat (limited to 'src/rocksdb/utilities/transactions/pessimistic_transaction_db.h')
-rw-r--r-- | src/rocksdb/utilities/transactions/pessimistic_transaction_db.h | 197 |
1 files changed, 197 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/transactions/pessimistic_transaction_db.h b/src/rocksdb/utilities/transactions/pessimistic_transaction_db.h new file mode 100644 index 00000000..e80b2885 --- /dev/null +++ b/src/rocksdb/utilities/transactions/pessimistic_transaction_db.h @@ -0,0 +1,197 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#ifndef ROCKSDB_LITE + +#include <mutex> +#include <queue> +#include <set> +#include <string> +#include <unordered_map> +#include <vector> + +#include "db/db_iter.h" +#include "db/read_callback.h" +#include "db/snapshot_checker.h" +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/utilities/transaction_db.h" +#include "utilities/transactions/pessimistic_transaction.h" +#include "utilities/transactions/transaction_lock_mgr.h" +#include "utilities/transactions/write_prepared_txn.h" + +namespace rocksdb { + +class PessimisticTransactionDB : public TransactionDB { + public: + explicit PessimisticTransactionDB(DB* db, + const TransactionDBOptions& txn_db_options); + + explicit PessimisticTransactionDB(StackableDB* db, + const TransactionDBOptions& txn_db_options); + + virtual ~PessimisticTransactionDB(); + + virtual const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); } + + virtual Status Initialize( + const std::vector<size_t>& compaction_enabled_cf_indices, + const std::vector<ColumnFamilyHandle*>& handles); + + Transaction* BeginTransaction(const WriteOptions& write_options, + const TransactionOptions& txn_options, + Transaction* old_txn) override = 0; + + using StackableDB::Put; + virtual Status Put(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& val) override; + + using StackableDB::Delete; + virtual Status Delete(const WriteOptions& wopts, + ColumnFamilyHandle* column_family, + const Slice& key) override; + + using StackableDB::SingleDelete; + virtual Status SingleDelete(const WriteOptions& wopts, + ColumnFamilyHandle* column_family, + const Slice& key) override; + + using StackableDB::Merge; + virtual Status Merge(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + + using TransactionDB::Write; + virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; + + using StackableDB::CreateColumnFamily; + virtual Status CreateColumnFamily(const ColumnFamilyOptions& options, + const std::string& column_family_name, + ColumnFamilyHandle** handle) override; + + using StackableDB::DropColumnFamily; + virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override; + + Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id, + const std::string& key, bool exclusive); + + void UnLock(PessimisticTransaction* txn, const TransactionKeyMap* keys); + void UnLock(PessimisticTransaction* txn, uint32_t cfh_id, + const std::string& key); + + void AddColumnFamily(const ColumnFamilyHandle* handle); + + static TransactionDBOptions ValidateTxnDBOptions( + const TransactionDBOptions& txn_db_options); + + const TransactionDBOptions& GetTxnDBOptions() const { + return txn_db_options_; + } + + void InsertExpirableTransaction(TransactionID tx_id, + PessimisticTransaction* tx); + void RemoveExpirableTransaction(TransactionID tx_id); + + // If transaction is no longer available, locks can be stolen + // If transaction is available, try stealing locks directly from transaction + // It is the caller's responsibility to ensure that the referred transaction + // is expirable (GetExpirationTime() > 0) and that it is expired. + bool TryStealingExpiredTransactionLocks(TransactionID tx_id); + + Transaction* GetTransactionByName(const TransactionName& name) override; + + void RegisterTransaction(Transaction* txn); + void UnregisterTransaction(Transaction* txn); + + // not thread safe. current use case is during recovery (single thread) + void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override; + + TransactionLockMgr::LockStatusData GetLockStatusData() override; + + std::vector<DeadlockPath> GetDeadlockInfoBuffer() override; + void SetDeadlockInfoBufferSize(uint32_t target_size) override; + + // The default implementation does nothing. The actual implementation is moved + // to the child classes that actually need this information. This was due to + // an odd performance drop we observed when the added std::atomic member to + // the base class even when the subclass do not read it in the fast path. + virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {} + virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {} + + protected: + DBImpl* db_impl_; + std::shared_ptr<Logger> info_log_; + const TransactionDBOptions txn_db_options_; + + void ReinitializeTransaction( + Transaction* txn, const WriteOptions& write_options, + const TransactionOptions& txn_options = TransactionOptions()); + + virtual Status VerifyCFOptions(const ColumnFamilyOptions& cf_options); + + private: + friend class WritePreparedTxnDB; + friend class WritePreparedTxnDBMock; + friend class WriteUnpreparedTxn; + friend class TransactionTest_DoubleEmptyWrite_Test; + friend class TransactionTest_DuplicateKeys_Test; + friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test; + friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test; + friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test; + friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test; + friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; + friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test; + TransactionLockMgr lock_mgr_; + + // Must be held when adding/dropping column families. + InstrumentedMutex column_family_mutex_; + Transaction* BeginInternalTransaction(const WriteOptions& options); + + // Used to ensure that no locks are stolen from an expirable transaction + // that has started a commit. Only transactions with an expiration time + // should be in this map. + std::mutex map_mutex_; + std::unordered_map<TransactionID, PessimisticTransaction*> + expirable_transactions_map_; + + // map from name to two phase transaction instance + std::mutex name_map_mutex_; + std::unordered_map<TransactionName, Transaction*> transactions_; + + // Signal that we are testing a crash scenario. Some asserts could be relaxed + // in such cases. + virtual void TEST_Crash() {} +}; + +// A PessimisticTransactionDB that writes the data to the DB after the commit. +// In this way the DB only contains the committed data. +class WriteCommittedTxnDB : public PessimisticTransactionDB { + public: + explicit WriteCommittedTxnDB(DB* db, + const TransactionDBOptions& txn_db_options) + : PessimisticTransactionDB(db, txn_db_options) {} + + explicit WriteCommittedTxnDB(StackableDB* db, + const TransactionDBOptions& txn_db_options) + : PessimisticTransactionDB(db, txn_db_options) {} + + virtual ~WriteCommittedTxnDB() {} + + Transaction* BeginTransaction(const WriteOptions& write_options, + const TransactionOptions& txn_options, + Transaction* old_txn) override; + + // Optimized version of ::Write that makes use of skip_concurrency_control + // hint + using TransactionDB::Write; + virtual Status Write(const WriteOptions& opts, + const TransactionDBWriteOptimizations& optimizations, + WriteBatch* updates) override; +}; + +} // namespace rocksdb +#endif // ROCKSDB_LITE |