summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/transactions/pessimistic_transaction_db.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/utilities/transactions/pessimistic_transaction_db.h318
1 files changed, 318 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/transactions/pessimistic_transaction_db.h b/src/rocksdb/utilities/transactions/pessimistic_transaction_db.h
new file mode 100644
index 000000000..25cd11054
--- /dev/null
+++ b/src/rocksdb/utilities/transactions/pessimistic_transaction_db.h
@@ -0,0 +1,318 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <mutex>
+#include <queue>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "db/db_iter.h"
+#include "db/read_callback.h"
+#include "db/snapshot_checker.h"
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/utilities/transaction_db.h"
+#include "util/cast_util.h"
+#include "utilities/transactions/lock/lock_manager.h"
+#include "utilities/transactions/lock/range/range_lock_manager.h"
+#include "utilities/transactions/pessimistic_transaction.h"
+#include "utilities/transactions/write_prepared_txn.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class PessimisticTransactionDB : public TransactionDB {
+ public:
+ explicit PessimisticTransactionDB(DB* db,
+ const TransactionDBOptions& txn_db_options);
+
+ explicit PessimisticTransactionDB(StackableDB* db,
+ const TransactionDBOptions& txn_db_options);
+
+ virtual ~PessimisticTransactionDB();
+
+ virtual const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); }
+
+ virtual Status Initialize(
+ const std::vector<size_t>& compaction_enabled_cf_indices,
+ const std::vector<ColumnFamilyHandle*>& handles);
+
+ Transaction* BeginTransaction(const WriteOptions& write_options,
+ const TransactionOptions& txn_options,
+ Transaction* old_txn) override = 0;
+
+ using StackableDB::Put;
+ virtual Status Put(const WriteOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ const Slice& val) override;
+
+ using StackableDB::Delete;
+ virtual Status Delete(const WriteOptions& wopts,
+ ColumnFamilyHandle* column_family,
+ const Slice& key) override;
+
+ using StackableDB::SingleDelete;
+ virtual Status SingleDelete(const WriteOptions& wopts,
+ ColumnFamilyHandle* column_family,
+ const Slice& key) override;
+
+ using StackableDB::Merge;
+ virtual Status Merge(const WriteOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ const Slice& value) override;
+
+ using TransactionDB::Write;
+ virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
+ inline Status WriteWithConcurrencyControl(const WriteOptions& opts,
+ WriteBatch* updates) {
+ Status s;
+ if (opts.protection_bytes_per_key > 0) {
+ s = WriteBatchInternal::UpdateProtectionInfo(
+ updates, opts.protection_bytes_per_key);
+ }
+ if (s.ok()) {
+ // Need to lock all keys in this batch to prevent write conflicts with
+ // concurrent transactions.
+ Transaction* txn = BeginInternalTransaction(opts);
+ txn->DisableIndexing();
+
+ auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
+
+ // Since commitBatch sorts the keys before locking, concurrent Write()
+ // operations will not cause a deadlock.
+ // In order to avoid a deadlock with a concurrent Transaction,
+ // Transactions should use a lock timeout.
+ s = txn_impl->CommitBatch(updates);
+
+ delete txn;
+ }
+
+ return s;
+ }
+
+ using StackableDB::CreateColumnFamily;
+ virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
+ const std::string& column_family_name,
+ ColumnFamilyHandle** handle) override;
+
+ Status CreateColumnFamilies(
+ const ColumnFamilyOptions& options,
+ const std::vector<std::string>& column_family_names,
+ std::vector<ColumnFamilyHandle*>* handles) override;
+
+ Status CreateColumnFamilies(
+ const std::vector<ColumnFamilyDescriptor>& column_families,
+ std::vector<ColumnFamilyHandle*>* handles) override;
+
+ using StackableDB::DropColumnFamily;
+ virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
+
+ Status DropColumnFamilies(
+ const std::vector<ColumnFamilyHandle*>& column_families) override;
+
+ Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id,
+ const std::string& key, bool exclusive);
+ Status TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id,
+ const Endpoint& start_endp, const Endpoint& end_endp);
+
+ void UnLock(PessimisticTransaction* txn, const LockTracker& keys);
+ void UnLock(PessimisticTransaction* txn, uint32_t cfh_id,
+ const std::string& key);
+
+ void AddColumnFamily(const ColumnFamilyHandle* handle);
+
+ static TransactionDBOptions ValidateTxnDBOptions(
+ const TransactionDBOptions& txn_db_options);
+
+ const TransactionDBOptions& GetTxnDBOptions() const {
+ return txn_db_options_;
+ }
+
+ void InsertExpirableTransaction(TransactionID tx_id,
+ PessimisticTransaction* tx);
+ void RemoveExpirableTransaction(TransactionID tx_id);
+
+ // If transaction is no longer available, locks can be stolen
+ // If transaction is available, try stealing locks directly from transaction
+ // It is the caller's responsibility to ensure that the referred transaction
+ // is expirable (GetExpirationTime() > 0) and that it is expired.
+ bool TryStealingExpiredTransactionLocks(TransactionID tx_id);
+
+ Transaction* GetTransactionByName(const TransactionName& name) override;
+
+ void RegisterTransaction(Transaction* txn);
+ void UnregisterTransaction(Transaction* txn);
+
+ // not thread safe. current use case is during recovery (single thread)
+ void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override;
+
+ LockManager::PointLockStatus GetLockStatusData() override;
+
+ std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
+ void SetDeadlockInfoBufferSize(uint32_t target_size) override;
+
+ // The default implementation does nothing. The actual implementation is moved
+ // to the child classes that actually need this information. This was due to
+ // an odd performance drop we observed when the added std::atomic member to
+ // the base class even when the subclass do not read it in the fast path.
+ virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {}
+ virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {}
+
+ // Use the returned factory to create LockTrackers in transactions.
+ const LockTrackerFactory& GetLockTrackerFactory() const {
+ return lock_manager_->GetLockTrackerFactory();
+ }
+
+ std::pair<Status, std::shared_ptr<const Snapshot>> CreateTimestampedSnapshot(
+ TxnTimestamp ts) override;
+
+ std::shared_ptr<const Snapshot> GetTimestampedSnapshot(
+ TxnTimestamp ts) const override;
+
+ void ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts) override;
+
+ Status GetTimestampedSnapshots(TxnTimestamp ts_lb, TxnTimestamp ts_ub,
+ std::vector<std::shared_ptr<const Snapshot>>&
+ timestamped_snapshots) const override;
+
+ protected:
+ DBImpl* db_impl_;
+ std::shared_ptr<Logger> info_log_;
+ const TransactionDBOptions txn_db_options_;
+
+ static Status FailIfBatchHasTs(const WriteBatch* wb);
+
+ static Status FailIfCfEnablesTs(const DB* db,
+ const ColumnFamilyHandle* column_family);
+
+ void ReinitializeTransaction(
+ Transaction* txn, const WriteOptions& write_options,
+ const TransactionOptions& txn_options = TransactionOptions());
+
+ virtual Status VerifyCFOptions(const ColumnFamilyOptions& cf_options);
+
+ private:
+ friend class WritePreparedTxnDB;
+ friend class WritePreparedTxnDBMock;
+ friend class WriteUnpreparedTxn;
+ friend class TransactionTest_DoubleCrashInRecovery_Test;
+ friend class TransactionTest_DoubleEmptyWrite_Test;
+ friend class TransactionTest_DuplicateKeys_Test;
+ friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test;
+ friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test;
+ friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test;
+ friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test;
+ friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
+ friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test;
+
+ Transaction* BeginInternalTransaction(const WriteOptions& options);
+
+ std::shared_ptr<LockManager> lock_manager_;
+
+ // Must be held when adding/dropping column families.
+ InstrumentedMutex column_family_mutex_;
+
+ // Used to ensure that no locks are stolen from an expirable transaction
+ // that has started a commit. Only transactions with an expiration time
+ // should be in this map.
+ std::mutex map_mutex_;
+ std::unordered_map<TransactionID, PessimisticTransaction*>
+ expirable_transactions_map_;
+
+ // map from name to two phase transaction instance
+ std::mutex name_map_mutex_;
+ std::unordered_map<TransactionName, Transaction*> transactions_;
+
+ // Signal that we are testing a crash scenario. Some asserts could be relaxed
+ // in such cases.
+ virtual void TEST_Crash() {}
+};
+
+// A PessimisticTransactionDB that writes the data to the DB after the commit.
+// In this way the DB only contains the committed data.
+class WriteCommittedTxnDB : public PessimisticTransactionDB {
+ public:
+ explicit WriteCommittedTxnDB(DB* db,
+ const TransactionDBOptions& txn_db_options)
+ : PessimisticTransactionDB(db, txn_db_options) {}
+
+ explicit WriteCommittedTxnDB(StackableDB* db,
+ const TransactionDBOptions& txn_db_options)
+ : PessimisticTransactionDB(db, txn_db_options) {}
+
+ virtual ~WriteCommittedTxnDB() {}
+
+ Transaction* BeginTransaction(const WriteOptions& write_options,
+ const TransactionOptions& txn_options,
+ Transaction* old_txn) override;
+
+ // Optimized version of ::Write that makes use of skip_concurrency_control
+ // hint
+ using TransactionDB::Write;
+ virtual Status Write(const WriteOptions& opts,
+ const TransactionDBWriteOptimizations& optimizations,
+ WriteBatch* updates) override;
+ virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
+};
+
+inline Status PessimisticTransactionDB::FailIfBatchHasTs(
+ const WriteBatch* batch) {
+ if (batch != nullptr && WriteBatchInternal::HasKeyWithTimestamp(*batch)) {
+ return Status::NotSupported(
+ "Writes with timestamp must go through transaction API instead of "
+ "TransactionDB.");
+ }
+ return Status::OK();
+}
+
+inline Status PessimisticTransactionDB::FailIfCfEnablesTs(
+ const DB* db, const ColumnFamilyHandle* column_family) {
+ assert(db);
+ column_family = column_family ? column_family : db->DefaultColumnFamily();
+ assert(column_family);
+ const Comparator* const ucmp = column_family->GetComparator();
+ assert(ucmp);
+ if (ucmp->timestamp_size() > 0) {
+ return Status::NotSupported(
+ "Write operation with user timestamp must go through the transaction "
+ "API instead of TransactionDB.");
+ }
+ return Status::OK();
+}
+
+class SnapshotCreationCallback : public PostMemTableCallback {
+ public:
+ explicit SnapshotCreationCallback(
+ DBImpl* dbi, TxnTimestamp commit_ts,
+ const std::shared_ptr<TransactionNotifier>& notifier,
+ std::shared_ptr<const Snapshot>& snapshot)
+ : db_impl_(dbi),
+ commit_ts_(commit_ts),
+ snapshot_notifier_(notifier),
+ snapshot_(snapshot) {
+ assert(db_impl_);
+ }
+
+ ~SnapshotCreationCallback() override {
+ snapshot_creation_status_.PermitUncheckedError();
+ }
+
+ Status operator()(SequenceNumber seq, bool disable_memtable) override;
+
+ private:
+ DBImpl* const db_impl_;
+ const TxnTimestamp commit_ts_;
+ std::shared_ptr<TransactionNotifier> snapshot_notifier_;
+ std::shared_ptr<const Snapshot>& snapshot_;
+
+ Status snapshot_creation_status_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE