diff options
Diffstat (limited to 'src/rocksdb/utilities/transactions/pessimistic_transaction_db.cc')
-rw-r--r-- | src/rocksdb/utilities/transactions/pessimistic_transaction_db.cc | 782 |
1 files changed, 782 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/transactions/pessimistic_transaction_db.cc b/src/rocksdb/utilities/transactions/pessimistic_transaction_db.cc new file mode 100644 index 000000000..950ef8042 --- /dev/null +++ b/src/rocksdb/utilities/transactions/pessimistic_transaction_db.cc @@ -0,0 +1,782 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/transactions/pessimistic_transaction_db.h" + +#include <cinttypes> +#include <sstream> +#include <string> +#include <unordered_set> +#include <vector> + +#include "db/db_impl/db_impl.h" +#include "logging/logging.h" +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/utilities/transaction_db.h" +#include "test_util/sync_point.h" +#include "util/cast_util.h" +#include "util/mutexlock.h" +#include "utilities/transactions/pessimistic_transaction.h" +#include "utilities/transactions/transaction_db_mutex_impl.h" +#include "utilities/transactions/write_prepared_txn_db.h" +#include "utilities/transactions/write_unprepared_txn_db.h" + +namespace ROCKSDB_NAMESPACE { + +PessimisticTransactionDB::PessimisticTransactionDB( + DB* db, const TransactionDBOptions& txn_db_options) + : TransactionDB(db), + db_impl_(static_cast_with_check<DBImpl>(db)), + txn_db_options_(txn_db_options), + lock_manager_(NewLockManager(this, txn_db_options)) { + assert(db_impl_ != nullptr); + info_log_ = db_impl_->GetDBOptions().info_log; +} + +// Support initiliazing PessimisticTransactionDB from a stackable db +// +// PessimisticTransactionDB +// ^ ^ +// | | +// | + +// | StackableDB +// | ^ +// | | +// + + +// DBImpl +// ^ +// |(inherit) +// + +// DB +// +PessimisticTransactionDB::PessimisticTransactionDB( + StackableDB* db, const TransactionDBOptions& txn_db_options) + : TransactionDB(db), + db_impl_(static_cast_with_check<DBImpl>(db->GetRootDB())), + txn_db_options_(txn_db_options), + lock_manager_(NewLockManager(this, txn_db_options)) { + assert(db_impl_ != nullptr); +} + +PessimisticTransactionDB::~PessimisticTransactionDB() { + while (!transactions_.empty()) { + delete transactions_.begin()->second; + // TODO(myabandeh): this seems to be an unsafe approach as it is not quite + // clear whether delete would also remove the entry from transactions_. + } +} + +Status PessimisticTransactionDB::VerifyCFOptions( + const ColumnFamilyOptions& cf_options) { + const Comparator* const ucmp = cf_options.comparator; + assert(ucmp); + size_t ts_sz = ucmp->timestamp_size(); + if (0 == ts_sz) { + return Status::OK(); + } + if (ts_sz != sizeof(TxnTimestamp)) { + std::ostringstream oss; + oss << "Timestamp of transaction must have " << sizeof(TxnTimestamp) + << " bytes. CF comparator " << std::string(ucmp->Name()) + << " timestamp size is " << ts_sz << " bytes"; + return Status::InvalidArgument(oss.str()); + } + if (txn_db_options_.write_policy != WRITE_COMMITTED) { + return Status::NotSupported("Only WriteCommittedTxn supports timestamp"); + } + return Status::OK(); +} + +Status PessimisticTransactionDB::Initialize( + const std::vector<size_t>& compaction_enabled_cf_indices, + const std::vector<ColumnFamilyHandle*>& handles) { + for (auto cf_ptr : handles) { + AddColumnFamily(cf_ptr); + } + // Verify cf options + for (auto handle : handles) { + ColumnFamilyDescriptor cfd; + Status s = handle->GetDescriptor(&cfd); + if (!s.ok()) { + return s; + } + s = VerifyCFOptions(cfd.options); + if (!s.ok()) { + return s; + } + } + + // Re-enable compaction for the column families that initially had + // compaction enabled. + std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles; + compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size()); + for (auto index : compaction_enabled_cf_indices) { + compaction_enabled_cf_handles.push_back(handles[index]); + } + + Status s = EnableAutoCompaction(compaction_enabled_cf_handles); + + // create 'real' transactions from recovered shell transactions + auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB()); + assert(dbimpl != nullptr); + auto rtrxs = dbimpl->recovered_transactions(); + + for (auto it = rtrxs.begin(); it != rtrxs.end(); ++it) { + auto recovered_trx = it->second; + assert(recovered_trx); + assert(recovered_trx->batches_.size() == 1); + const auto& seq = recovered_trx->batches_.begin()->first; + const auto& batch_info = recovered_trx->batches_.begin()->second; + assert(batch_info.log_number_); + assert(recovered_trx->name_.length()); + + WriteOptions w_options; + w_options.sync = true; + TransactionOptions t_options; + // This would help avoiding deadlock for keys that although exist in the WAL + // did not go through concurrency control. This includes the merge that + // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if + // there is a conflict between the keys of two transactions that must be + // avoided, it is already avoided by the application, MyRocks, before the + // restart (ii) application, MyRocks, guarntees to rollback/commit the + // recovered transactions before new transactions start. + t_options.skip_concurrency_control = true; + + Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr); + assert(real_trx); + real_trx->SetLogNumber(batch_info.log_number_); + assert(seq != kMaxSequenceNumber); + if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) { + real_trx->SetId(seq); + } + + s = real_trx->SetName(recovered_trx->name_); + if (!s.ok()) { + break; + } + + s = real_trx->RebuildFromWriteBatch(batch_info.batch_); + // WriteCommitted set this to to disable this check that is specific to + // WritePrepared txns + assert(batch_info.batch_cnt_ == 0 || + real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_); + real_trx->SetState(Transaction::PREPARED); + if (!s.ok()) { + break; + } + } + if (s.ok()) { + dbimpl->DeleteAllRecoveredTransactions(); + } + return s; +} + +Transaction* WriteCommittedTxnDB::BeginTransaction( + const WriteOptions& write_options, const TransactionOptions& txn_options, + Transaction* old_txn) { + if (old_txn != nullptr) { + ReinitializeTransaction(old_txn, write_options, txn_options); + return old_txn; + } else { + return new WriteCommittedTxn(this, write_options, txn_options); + } +} + +TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions( + const TransactionDBOptions& txn_db_options) { + TransactionDBOptions validated = txn_db_options; + + if (txn_db_options.num_stripes == 0) { + validated.num_stripes = 1; + } + + return validated; +} + +Status TransactionDB::Open(const Options& options, + const TransactionDBOptions& txn_db_options, + const std::string& dbname, TransactionDB** dbptr) { + DBOptions db_options(options); + ColumnFamilyOptions cf_options(options); + std::vector<ColumnFamilyDescriptor> column_families; + column_families.push_back( + ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); + std::vector<ColumnFamilyHandle*> handles; + Status s = TransactionDB::Open(db_options, txn_db_options, dbname, + column_families, &handles, dbptr); + if (s.ok()) { + assert(handles.size() == 1); + // i can delete the handle since DBImpl is always holding a reference to + // default column family + delete handles[0]; + } + + return s; +} + +Status TransactionDB::Open( + const DBOptions& db_options, const TransactionDBOptions& txn_db_options, + const std::string& dbname, + const std::vector<ColumnFamilyDescriptor>& column_families, + std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) { + Status s; + DB* db = nullptr; + if (txn_db_options.write_policy == WRITE_COMMITTED && + db_options.unordered_write) { + return Status::NotSupported( + "WRITE_COMMITTED is incompatible with unordered_writes"); + } + if (txn_db_options.write_policy == WRITE_UNPREPARED && + db_options.unordered_write) { + // TODO(lth): support it + return Status::NotSupported( + "WRITE_UNPREPARED is currently incompatible with unordered_writes"); + } + if (txn_db_options.write_policy == WRITE_PREPARED && + db_options.unordered_write && !db_options.two_write_queues) { + return Status::NotSupported( + "WRITE_PREPARED is incompatible with unordered_writes if " + "two_write_queues is not enabled."); + } + + std::vector<ColumnFamilyDescriptor> column_families_copy = column_families; + std::vector<size_t> compaction_enabled_cf_indices; + DBOptions db_options_2pc = db_options; + PrepareWrap(&db_options_2pc, &column_families_copy, + &compaction_enabled_cf_indices); + const bool use_seq_per_batch = + txn_db_options.write_policy == WRITE_PREPARED || + txn_db_options.write_policy == WRITE_UNPREPARED; + const bool use_batch_per_txn = + txn_db_options.write_policy == WRITE_COMMITTED || + txn_db_options.write_policy == WRITE_PREPARED; + s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db, + use_seq_per_batch, use_batch_per_txn); + if (s.ok()) { + ROCKS_LOG_WARN(db->GetDBOptions().info_log, + "Transaction write_policy is %" PRId32, + static_cast<int>(txn_db_options.write_policy)); + // if WrapDB return non-ok, db will be deleted in WrapDB() via + // ~StackableDB(). + s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles, + dbptr); + } + return s; +} + +void TransactionDB::PrepareWrap( + DBOptions* db_options, std::vector<ColumnFamilyDescriptor>* column_families, + std::vector<size_t>* compaction_enabled_cf_indices) { + compaction_enabled_cf_indices->clear(); + + // Enable MemTable History if not already enabled + for (size_t i = 0; i < column_families->size(); i++) { + ColumnFamilyOptions* cf_options = &(*column_families)[i].options; + + if (cf_options->max_write_buffer_size_to_maintain == 0 && + cf_options->max_write_buffer_number_to_maintain == 0) { + // Setting to -1 will set the History size to + // max_write_buffer_number * write_buffer_size. + cf_options->max_write_buffer_size_to_maintain = -1; + } + if (!cf_options->disable_auto_compactions) { + // Disable compactions momentarily to prevent race with DB::Open + cf_options->disable_auto_compactions = true; + compaction_enabled_cf_indices->push_back(i); + } + } + db_options->allow_2pc = true; +} + +namespace { +template <typename DBType> +Status WrapAnotherDBInternal( + DBType* db, const TransactionDBOptions& txn_db_options, + const std::vector<size_t>& compaction_enabled_cf_indices, + const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) { + assert(db != nullptr); + assert(dbptr != nullptr); + *dbptr = nullptr; + std::unique_ptr<PessimisticTransactionDB> txn_db; + // txn_db owns object pointed to by the raw db pointer. + switch (txn_db_options.write_policy) { + case WRITE_UNPREPARED: + txn_db.reset(new WriteUnpreparedTxnDB( + db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); + break; + case WRITE_PREPARED: + txn_db.reset(new WritePreparedTxnDB( + db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); + break; + case WRITE_COMMITTED: + default: + txn_db.reset(new WriteCommittedTxnDB( + db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); + } + txn_db->UpdateCFComparatorMap(handles); + Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles); + // In case of a failure at this point, db is deleted via the txn_db destructor + // and set to nullptr. + if (s.ok()) { + *dbptr = txn_db.release(); + } else { + for (auto* h : handles) { + delete h; + } + // txn_db still owns db, and ~StackableDB() will be called when txn_db goes + // out of scope, deleting the input db pointer. + ROCKS_LOG_FATAL(db->GetDBOptions().info_log, + "Failed to initialize txn_db: %s", s.ToString().c_str()); + } + return s; +} +} // namespace + +Status TransactionDB::WrapDB( + // make sure this db is already opened with memtable history enabled, + // auto compaction distabled and 2 phase commit enabled + DB* db, const TransactionDBOptions& txn_db_options, + const std::vector<size_t>& compaction_enabled_cf_indices, + const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) { + return WrapAnotherDBInternal(db, txn_db_options, + compaction_enabled_cf_indices, handles, dbptr); +} + +Status TransactionDB::WrapStackableDB( + // make sure this stackable_db is already opened with memtable history + // enabled, auto compaction distabled and 2 phase commit enabled + StackableDB* db, const TransactionDBOptions& txn_db_options, + const std::vector<size_t>& compaction_enabled_cf_indices, + const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) { + return WrapAnotherDBInternal(db, txn_db_options, + compaction_enabled_cf_indices, handles, dbptr); +} + +// Let LockManager know that this column family exists so it can +// allocate a LockMap for it. +void PessimisticTransactionDB::AddColumnFamily( + const ColumnFamilyHandle* handle) { + lock_manager_->AddColumnFamily(handle); +} + +Status PessimisticTransactionDB::CreateColumnFamily( + const ColumnFamilyOptions& options, const std::string& column_family_name, + ColumnFamilyHandle** handle) { + InstrumentedMutexLock l(&column_family_mutex_); + Status s = VerifyCFOptions(options); + if (!s.ok()) { + return s; + } + + s = db_->CreateColumnFamily(options, column_family_name, handle); + if (s.ok()) { + lock_manager_->AddColumnFamily(*handle); + UpdateCFComparatorMap(*handle); + } + + return s; +} + +Status PessimisticTransactionDB::CreateColumnFamilies( + const ColumnFamilyOptions& options, + const std::vector<std::string>& column_family_names, + std::vector<ColumnFamilyHandle*>* handles) { + InstrumentedMutexLock l(&column_family_mutex_); + + Status s = VerifyCFOptions(options); + if (!s.ok()) { + return s; + } + + s = db_->CreateColumnFamilies(options, column_family_names, handles); + if (s.ok()) { + for (auto* handle : *handles) { + lock_manager_->AddColumnFamily(handle); + UpdateCFComparatorMap(handle); + } + } + + return s; +} + +Status PessimisticTransactionDB::CreateColumnFamilies( + const std::vector<ColumnFamilyDescriptor>& column_families, + std::vector<ColumnFamilyHandle*>* handles) { + InstrumentedMutexLock l(&column_family_mutex_); + + for (auto& cf_desc : column_families) { + Status s = VerifyCFOptions(cf_desc.options); + if (!s.ok()) { + return s; + } + } + + Status s = db_->CreateColumnFamilies(column_families, handles); + if (s.ok()) { + for (auto* handle : *handles) { + lock_manager_->AddColumnFamily(handle); + UpdateCFComparatorMap(handle); + } + } + + return s; +} + +// Let LockManager know that it can deallocate the LockMap for this +// column family. +Status PessimisticTransactionDB::DropColumnFamily( + ColumnFamilyHandle* column_family) { + InstrumentedMutexLock l(&column_family_mutex_); + + Status s = db_->DropColumnFamily(column_family); + if (s.ok()) { + lock_manager_->RemoveColumnFamily(column_family); + } + + return s; +} + +Status PessimisticTransactionDB::DropColumnFamilies( + const std::vector<ColumnFamilyHandle*>& column_families) { + InstrumentedMutexLock l(&column_family_mutex_); + + Status s = db_->DropColumnFamilies(column_families); + if (s.ok()) { + for (auto* handle : column_families) { + lock_manager_->RemoveColumnFamily(handle); + } + } + + return s; +} + +Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn, + uint32_t cfh_id, + const std::string& key, + bool exclusive) { + return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive); +} + +Status PessimisticTransactionDB::TryRangeLock(PessimisticTransaction* txn, + uint32_t cfh_id, + const Endpoint& start_endp, + const Endpoint& end_endp) { + return lock_manager_->TryLock(txn, cfh_id, start_endp, end_endp, GetEnv(), + /*exclusive=*/true); +} + +void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, + const LockTracker& keys) { + lock_manager_->UnLock(txn, keys, GetEnv()); +} + +void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, + uint32_t cfh_id, const std::string& key) { + lock_manager_->UnLock(txn, cfh_id, key, GetEnv()); +} + +// Used when wrapping DB write operations in a transaction +Transaction* PessimisticTransactionDB::BeginInternalTransaction( + const WriteOptions& options) { + TransactionOptions txn_options; + Transaction* txn = BeginTransaction(options, txn_options, nullptr); + + // Use default timeout for non-transactional writes + txn->SetLockTimeout(txn_db_options_.default_lock_timeout); + return txn; +} + +// All user Put, Merge, Delete, and Write requests must be intercepted to make +// sure that they lock all keys that they are writing to avoid causing conflicts +// with any concurrent transactions. The easiest way to do this is to wrap all +// write operations in a transaction. +// +// Put(), Merge(), and Delete() only lock a single key per call. Write() will +// sort its keys before locking them. This guarantees that TransactionDB write +// methods cannot deadlock with each other (but still could deadlock with a +// Transaction). +Status PessimisticTransactionDB::Put(const WriteOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key, const Slice& val) { + Status s = FailIfCfEnablesTs(this, column_family); + if (!s.ok()) { + return s; + } + + Transaction* txn = BeginInternalTransaction(options); + txn->DisableIndexing(); + + // Since the client didn't create a transaction, they don't care about + // conflict checking for this write. So we just need to do PutUntracked(). + s = txn->PutUntracked(column_family, key, val); + + if (s.ok()) { + s = txn->Commit(); + } + + delete txn; + + return s; +} + +Status PessimisticTransactionDB::Delete(const WriteOptions& wopts, + ColumnFamilyHandle* column_family, + const Slice& key) { + Status s = FailIfCfEnablesTs(this, column_family); + if (!s.ok()) { + return s; + } + + Transaction* txn = BeginInternalTransaction(wopts); + txn->DisableIndexing(); + + // Since the client didn't create a transaction, they don't care about + // conflict checking for this write. So we just need to do + // DeleteUntracked(). + s = txn->DeleteUntracked(column_family, key); + + if (s.ok()) { + s = txn->Commit(); + } + + delete txn; + + return s; +} + +Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts, + ColumnFamilyHandle* column_family, + const Slice& key) { + Status s = FailIfCfEnablesTs(this, column_family); + if (!s.ok()) { + return s; + } + + Transaction* txn = BeginInternalTransaction(wopts); + txn->DisableIndexing(); + + // Since the client didn't create a transaction, they don't care about + // conflict checking for this write. So we just need to do + // SingleDeleteUntracked(). + s = txn->SingleDeleteUntracked(column_family, key); + + if (s.ok()) { + s = txn->Commit(); + } + + delete txn; + + return s; +} + +Status PessimisticTransactionDB::Merge(const WriteOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { + Status s = FailIfCfEnablesTs(this, column_family); + if (!s.ok()) { + return s; + } + + Transaction* txn = BeginInternalTransaction(options); + txn->DisableIndexing(); + + // Since the client didn't create a transaction, they don't care about + // conflict checking for this write. So we just need to do + // MergeUntracked(). + s = txn->MergeUntracked(column_family, key, value); + + if (s.ok()) { + s = txn->Commit(); + } + + delete txn; + + return s; +} + +Status PessimisticTransactionDB::Write(const WriteOptions& opts, + WriteBatch* updates) { + return WriteWithConcurrencyControl(opts, updates); +} + +Status WriteCommittedTxnDB::Write(const WriteOptions& opts, + WriteBatch* updates) { + Status s = FailIfBatchHasTs(updates); + if (!s.ok()) { + return s; + } + if (txn_db_options_.skip_concurrency_control) { + return db_impl_->Write(opts, updates); + } else { + return WriteWithConcurrencyControl(opts, updates); + } +} + +Status WriteCommittedTxnDB::Write( + const WriteOptions& opts, + const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) { + Status s = FailIfBatchHasTs(updates); + if (!s.ok()) { + return s; + } + if (optimizations.skip_concurrency_control) { + return db_impl_->Write(opts, updates); + } else { + return WriteWithConcurrencyControl(opts, updates); + } +} + +void PessimisticTransactionDB::InsertExpirableTransaction( + TransactionID tx_id, PessimisticTransaction* tx) { + assert(tx->GetExpirationTime() > 0); + std::lock_guard<std::mutex> lock(map_mutex_); + expirable_transactions_map_.insert({tx_id, tx}); +} + +void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) { + std::lock_guard<std::mutex> lock(map_mutex_); + expirable_transactions_map_.erase(tx_id); +} + +bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks( + TransactionID tx_id) { + std::lock_guard<std::mutex> lock(map_mutex_); + + auto tx_it = expirable_transactions_map_.find(tx_id); + if (tx_it == expirable_transactions_map_.end()) { + return true; + } + PessimisticTransaction& tx = *(tx_it->second); + return tx.TryStealingLocks(); +} + +void PessimisticTransactionDB::ReinitializeTransaction( + Transaction* txn, const WriteOptions& write_options, + const TransactionOptions& txn_options) { + auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn); + + txn_impl->Reinitialize(this, write_options, txn_options); +} + +Transaction* PessimisticTransactionDB::GetTransactionByName( + const TransactionName& name) { + std::lock_guard<std::mutex> lock(name_map_mutex_); + auto it = transactions_.find(name); + if (it == transactions_.end()) { + return nullptr; + } else { + return it->second; + } +} + +void PessimisticTransactionDB::GetAllPreparedTransactions( + std::vector<Transaction*>* transv) { + assert(transv); + transv->clear(); + std::lock_guard<std::mutex> lock(name_map_mutex_); + for (auto it = transactions_.begin(); it != transactions_.end(); ++it) { + if (it->second->GetState() == Transaction::PREPARED) { + transv->push_back(it->second); + } + } +} + +LockManager::PointLockStatus PessimisticTransactionDB::GetLockStatusData() { + return lock_manager_->GetPointLockStatus(); +} + +std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() { + return lock_manager_->GetDeadlockInfoBuffer(); +} + +void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) { + lock_manager_->Resize(target_size); +} + +void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) { + assert(txn); + assert(txn->GetName().length() > 0); + assert(GetTransactionByName(txn->GetName()) == nullptr); + assert(txn->GetState() == Transaction::STARTED); + std::lock_guard<std::mutex> lock(name_map_mutex_); + transactions_[txn->GetName()] = txn; +} + +void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) { + assert(txn); + std::lock_guard<std::mutex> lock(name_map_mutex_); + auto it = transactions_.find(txn->GetName()); + assert(it != transactions_.end()); + transactions_.erase(it); +} + +std::pair<Status, std::shared_ptr<const Snapshot>> +PessimisticTransactionDB::CreateTimestampedSnapshot(TxnTimestamp ts) { + if (kMaxTxnTimestamp == ts) { + return std::make_pair(Status::InvalidArgument("invalid ts"), nullptr); + } + assert(db_impl_); + return db_impl_->CreateTimestampedSnapshot(kMaxSequenceNumber, ts); +} + +std::shared_ptr<const Snapshot> +PessimisticTransactionDB::GetTimestampedSnapshot(TxnTimestamp ts) const { + assert(db_impl_); + return db_impl_->GetTimestampedSnapshot(ts); +} + +void PessimisticTransactionDB::ReleaseTimestampedSnapshotsOlderThan( + TxnTimestamp ts) { + assert(db_impl_); + db_impl_->ReleaseTimestampedSnapshotsOlderThan(ts); +} + +Status PessimisticTransactionDB::GetTimestampedSnapshots( + TxnTimestamp ts_lb, TxnTimestamp ts_ub, + std::vector<std::shared_ptr<const Snapshot>>& timestamped_snapshots) const { + assert(db_impl_); + return db_impl_->GetTimestampedSnapshots(ts_lb, ts_ub, timestamped_snapshots); +} + +Status SnapshotCreationCallback::operator()(SequenceNumber seq, + bool disable_memtable) { + assert(db_impl_); + assert(commit_ts_ != kMaxTxnTimestamp); + + const bool two_write_queues = + db_impl_->immutable_db_options().two_write_queues; + assert(!two_write_queues || !disable_memtable); +#ifdef NDEBUG + (void)two_write_queues; + (void)disable_memtable; +#endif + + const bool seq_per_batch = db_impl_->seq_per_batch(); + if (!seq_per_batch) { + assert(db_impl_->GetLastPublishedSequence() <= seq); + } else { + assert(db_impl_->GetLastPublishedSequence() < seq); + } + + // Create a snapshot which can also be used for write conflict checking. + auto ret = db_impl_->CreateTimestampedSnapshot(seq, commit_ts_); + snapshot_creation_status_ = ret.first; + snapshot_ = ret.second; + if (snapshot_creation_status_.ok()) { + assert(snapshot_); + } else { + assert(!snapshot_); + } + if (snapshot_ && snapshot_notifier_) { + snapshot_notifier_->SnapshotCreated(snapshot_.get()); + } + return Status::OK(); +} + +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE |