// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #ifndef ROCKSDB_LITE #include "utilities/transactions/pessimistic_transaction.h" #include #include #include #include #include "db/column_family.h" #include "db/db_impl/db_impl.h" #include "logging/logging.h" #include "rocksdb/comparator.h" #include "rocksdb/db.h" #include "rocksdb/snapshot.h" #include "rocksdb/status.h" #include "rocksdb/utilities/transaction_db.h" #include "test_util/sync_point.h" #include "util/cast_util.h" #include "util/string_util.h" #include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/transaction_util.h" #include "utilities/write_batch_with_index/write_batch_with_index_internal.h" namespace ROCKSDB_NAMESPACE { struct WriteOptions; std::atomic PessimisticTransaction::txn_id_counter_(1); TransactionID PessimisticTransaction::GenTxnID() { return txn_id_counter_.fetch_add(1); } PessimisticTransaction::PessimisticTransaction( TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options, const bool init) : TransactionBaseImpl( txn_db->GetRootDB(), write_options, static_cast_with_check(txn_db) ->GetLockTrackerFactory()), txn_db_impl_(nullptr), expiration_time_(0), txn_id_(0), waiting_cf_id_(0), waiting_key_(nullptr), lock_timeout_(0), deadlock_detect_(false), deadlock_detect_depth_(0), skip_concurrency_control_(false) { txn_db_impl_ = static_cast_with_check(txn_db); db_impl_ = static_cast_with_check(db_); if (init) { Initialize(txn_options); } } void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) { // Range lock manager uses address of transaction object as TXNID const TransactionDBOptions& db_options = txn_db_impl_->GetTxnDBOptions(); if (db_options.lock_mgr_handle && db_options.lock_mgr_handle->getLockManager()->IsRangeLockSupported()) { txn_id_ = reinterpret_cast(this); } else { txn_id_ = GenTxnID(); } txn_state_ = STARTED; deadlock_detect_ = txn_options.deadlock_detect; deadlock_detect_depth_ = txn_options.deadlock_detect_depth; write_batch_.SetMaxBytes(txn_options.max_write_batch_size); skip_concurrency_control_ = txn_options.skip_concurrency_control; lock_timeout_ = txn_options.lock_timeout * 1000; if (lock_timeout_ < 0) { // Lock timeout not set, use default lock_timeout_ = txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000; } if (txn_options.expiration >= 0) { expiration_time_ = start_time_ + txn_options.expiration * 1000; } else { expiration_time_ = 0; } if (txn_options.set_snapshot) { SetSnapshot(); } if (expiration_time_ > 0) { txn_db_impl_->InsertExpirableTransaction(txn_id_, this); } use_only_the_last_commit_time_batch_for_recovery_ = txn_options.use_only_the_last_commit_time_batch_for_recovery; skip_prepare_ = txn_options.skip_prepare; read_timestamp_ = kMaxTxnTimestamp; commit_timestamp_ = kMaxTxnTimestamp; } PessimisticTransaction::~PessimisticTransaction() { txn_db_impl_->UnLock(this, *tracked_locks_); if (expiration_time_ > 0) { txn_db_impl_->RemoveExpirableTransaction(txn_id_); } if (!name_.empty() && txn_state_ != COMMITTED) { txn_db_impl_->UnregisterTransaction(this); } } void PessimisticTransaction::Clear() { txn_db_impl_->UnLock(this, *tracked_locks_); TransactionBaseImpl::Clear(); } void PessimisticTransaction::Reinitialize( TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options) { if (!name_.empty() && txn_state_ != COMMITTED) { txn_db_impl_->UnregisterTransaction(this); } TransactionBaseImpl::Reinitialize(txn_db->GetRootDB(), write_options); Initialize(txn_options); } bool PessimisticTransaction::IsExpired() const { if (expiration_time_ > 0) { if (dbimpl_->GetSystemClock()->NowMicros() >= expiration_time_) { // Transaction is expired. return true; } } return false; } WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options) : PessimisticTransaction(txn_db, write_options, txn_options) {} Status WriteCommittedTxn::GetForUpdate(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value, bool exclusive, const bool do_validate) { return GetForUpdateImpl(read_options, column_family, key, value, exclusive, do_validate); } Status WriteCommittedTxn::GetForUpdate(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* pinnable_val, bool exclusive, const bool do_validate) { return GetForUpdateImpl(read_options, column_family, key, pinnable_val, exclusive, do_validate); } template inline Status WriteCommittedTxn::GetForUpdateImpl( const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, TValue* value, bool exclusive, const bool do_validate) { column_family = column_family ? column_family : db_impl_->DefaultColumnFamily(); assert(column_family); if (!read_options.timestamp) { const Comparator* const ucmp = column_family->GetComparator(); assert(ucmp); size_t ts_sz = ucmp->timestamp_size(); if (0 == ts_sz) { return TransactionBaseImpl::GetForUpdate(read_options, column_family, key, value, exclusive, do_validate); } } else { Status s = db_impl_->FailIfTsMismatchCf( column_family, *(read_options.timestamp), /*ts_for_read=*/true); if (!s.ok()) { return s; } } if (!do_validate) { return Status::InvalidArgument( "If do_validate is false then GetForUpdate with read_timestamp is not " "defined."); } else if (kMaxTxnTimestamp == read_timestamp_) { return Status::InvalidArgument("read_timestamp must be set for validation"); } if (!read_options.timestamp) { ReadOptions read_opts_copy = read_options; char ts_buf[sizeof(kMaxTxnTimestamp)]; EncodeFixed64(ts_buf, read_timestamp_); Slice ts(ts_buf, sizeof(ts_buf)); read_opts_copy.timestamp = &ts; return TransactionBaseImpl::GetForUpdate(read_opts_copy, column_family, key, value, exclusive, do_validate); } assert(read_options.timestamp); const char* const ts_buf = read_options.timestamp->data(); assert(read_options.timestamp->size() == sizeof(kMaxTxnTimestamp)); TxnTimestamp ts = DecodeFixed64(ts_buf); if (ts != read_timestamp_) { return Status::InvalidArgument("Must read from the same read_timestamp"); } return TransactionBaseImpl::GetForUpdate(read_options, column_family, key, value, exclusive, do_validate); } Status WriteCommittedTxn::Put(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value, const bool assume_tracked) { const bool do_validate = !assume_tracked; return Operate(column_family, key, do_validate, assume_tracked, [column_family, &key, &value, this]() { Status s = GetBatchForWrite()->Put(column_family, key, value); if (s.ok()) { ++num_puts_; } return s; }); } Status WriteCommittedTxn::Put(ColumnFamilyHandle* column_family, const SliceParts& key, const SliceParts& value, const bool assume_tracked) { const bool do_validate = !assume_tracked; return Operate(column_family, key, do_validate, assume_tracked, [column_family, &key, &value, this]() { Status s = GetBatchForWrite()->Put(column_family, key, value); if (s.ok()) { ++num_puts_; } return s; }); } Status WriteCommittedTxn::PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { return Operate( column_family, key, /*do_validate=*/false, /*assume_tracked=*/false, [column_family, &key, &value, this]() { Status s = GetBatchForWrite()->Put(column_family, key, value); if (s.ok()) { ++num_puts_; } return s; }); } Status WriteCommittedTxn::PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key, const SliceParts& value) { return Operate( column_family, key, /*do_validate=*/false, /*assume_tracked=*/false, [column_family, &key, &value, this]() { Status s = GetBatchForWrite()->Put(column_family, key, value); if (s.ok()) { ++num_puts_; } return s; }); } Status WriteCommittedTxn::Delete(ColumnFamilyHandle* column_family, const Slice& key, const bool assume_tracked) { const bool do_validate = !assume_tracked; return Operate(column_family, key, do_validate, assume_tracked, [column_family, &key, this]() { Status s = GetBatchForWrite()->Delete(column_family, key); if (s.ok()) { ++num_deletes_; } return s; }); } Status WriteCommittedTxn::Delete(ColumnFamilyHandle* column_family, const SliceParts& key, const bool assume_tracked) { const bool do_validate = !assume_tracked; return Operate(column_family, key, do_validate, assume_tracked, [column_family, &key, this]() { Status s = GetBatchForWrite()->Delete(column_family, key); if (s.ok()) { ++num_deletes_; } return s; }); } Status WriteCommittedTxn::DeleteUntracked(ColumnFamilyHandle* column_family, const Slice& key) { return Operate(column_family, key, /*do_validate=*/false, /*assume_tracked=*/false, [column_family, &key, this]() { Status s = GetBatchForWrite()->Delete(column_family, key); if (s.ok()) { ++num_deletes_; } return s; }); } Status WriteCommittedTxn::DeleteUntracked(ColumnFamilyHandle* column_family, const SliceParts& key) { return Operate(column_family, key, /*do_validate=*/false, /*assume_tracked=*/false, [column_family, &key, this]() { Status s = GetBatchForWrite()->Delete(column_family, key); if (s.ok()) { ++num_deletes_; } return s; }); } Status WriteCommittedTxn::SingleDelete(ColumnFamilyHandle* column_family, const Slice& key, const bool assume_tracked) { const bool do_validate = !assume_tracked; return Operate(column_family, key, do_validate, assume_tracked, [column_family, &key, this]() { Status s = GetBatchForWrite()->SingleDelete(column_family, key); if (s.ok()) { ++num_deletes_; } return s; }); } Status WriteCommittedTxn::SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key, const bool assume_tracked) { const bool do_validate = !assume_tracked; return Operate(column_family, key, do_validate, assume_tracked, [column_family, &key, this]() { Status s = GetBatchForWrite()->SingleDelete(column_family, key); if (s.ok()) { ++num_deletes_; } return s; }); } Status WriteCommittedTxn::SingleDeleteUntracked( ColumnFamilyHandle* column_family, const Slice& key) { return Operate(column_family, key, /*do_validate=*/false, /*assume_tracked=*/false, [column_family, &key, this]() { Status s = GetBatchForWrite()->SingleDelete(column_family, key); if (s.ok()) { ++num_deletes_; } return s; }); } Status WriteCommittedTxn::Merge(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value, const bool assume_tracked) { const bool do_validate = !assume_tracked; return Operate(column_family, key, do_validate, assume_tracked, [column_family, &key, &value, this]() { Status s = GetBatchForWrite()->Merge(column_family, key, value); if (s.ok()) { ++num_merges_; } return s; }); } template Status WriteCommittedTxn::Operate(ColumnFamilyHandle* column_family, const TKey& key, const bool do_validate, const bool assume_tracked, TOperation&& operation) { Status s; if constexpr (std::is_same_v) { s = TryLock(column_family, key, /*read_only=*/false, /*exclusive=*/true, do_validate, assume_tracked); } else if constexpr (std::is_same_v) { std::string key_buf; Slice contiguous_key(key, &key_buf); s = TryLock(column_family, contiguous_key, /*read_only=*/false, /*exclusive=*/true, do_validate, assume_tracked); } if (!s.ok()) { return s; } column_family = column_family ? column_family : db_impl_->DefaultColumnFamily(); assert(column_family); const Comparator* const ucmp = column_family->GetComparator(); assert(ucmp); size_t ts_sz = ucmp->timestamp_size(); if (ts_sz > 0) { assert(ts_sz == sizeof(TxnTimestamp)); if (!IndexingEnabled()) { cfs_with_ts_tracked_when_indexing_disabled_.insert( column_family->GetID()); } } return operation(); } Status WriteCommittedTxn::SetReadTimestampForValidation(TxnTimestamp ts) { if (read_timestamp_ < kMaxTxnTimestamp && ts < read_timestamp_) { return Status::InvalidArgument( "Cannot decrease read timestamp for validation"); } read_timestamp_ = ts; return Status::OK(); } Status WriteCommittedTxn::SetCommitTimestamp(TxnTimestamp ts) { if (read_timestamp_ < kMaxTxnTimestamp && ts <= read_timestamp_) { return Status::InvalidArgument( "Cannot commit at timestamp smaller than or equal to read timestamp"); } commit_timestamp_ = ts; return Status::OK(); } Status PessimisticTransaction::CommitBatch(WriteBatch* batch) { if (batch && WriteBatchInternal::HasKeyWithTimestamp(*batch)) { // CommitBatch() needs to lock the keys in the batch. // However, the application also needs to specify the timestamp for the // keys in batch before calling this API. // This means timestamp order may violate the order of locking, thus // violate the sequence number order for the same user key. // Therefore, we disallow this operation for now. return Status::NotSupported( "Batch to commit includes timestamp assigned before locking"); } std::unique_ptr keys_to_unlock(lock_tracker_factory_.Create()); Status s = LockBatch(batch, keys_to_unlock.get()); if (!s.ok()) { return s; } bool can_commit = false; if (IsExpired()) { s = Status::Expired(); } else if (expiration_time_ > 0) { TransactionState expected = STARTED; can_commit = std::atomic_compare_exchange_strong(&txn_state_, &expected, AWAITING_COMMIT); } else if (txn_state_ == STARTED) { // lock stealing is not a concern can_commit = true; } if (can_commit) { txn_state_.store(AWAITING_COMMIT); s = CommitBatchInternal(batch); if (s.ok()) { txn_state_.store(COMMITTED); } } else if (txn_state_ == LOCKS_STOLEN) { s = Status::Expired(); } else { s = Status::InvalidArgument("Transaction is not in state for commit."); } txn_db_impl_->UnLock(this, *keys_to_unlock); return s; } Status PessimisticTransaction::Prepare() { if (name_.empty()) { return Status::InvalidArgument( "Cannot prepare a transaction that has not been named."); } if (IsExpired()) { return Status::Expired(); } Status s; bool can_prepare = false; if (expiration_time_ > 0) { // must concern ourselves with expiraton and/or lock stealing // need to compare/exchange bc locks could be stolen under us here TransactionState expected = STARTED; can_prepare = std::atomic_compare_exchange_strong(&txn_state_, &expected, AWAITING_PREPARE); } else if (txn_state_ == STARTED) { // expiration and lock stealing is not possible txn_state_.store(AWAITING_PREPARE); can_prepare = true; } if (can_prepare) { // transaction can't expire after preparation expiration_time_ = 0; assert(log_number_ == 0 || txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED); s = PrepareInternal(); if (s.ok()) { txn_state_.store(PREPARED); } } else if (txn_state_ == LOCKS_STOLEN) { s = Status::Expired(); } else if (txn_state_ == PREPARED) { s = Status::InvalidArgument("Transaction has already been prepared."); } else if (txn_state_ == COMMITTED) { s = Status::InvalidArgument("Transaction has already been committed."); } else if (txn_state_ == ROLLEDBACK) { s = Status::InvalidArgument("Transaction has already been rolledback."); } else { s = Status::InvalidArgument("Transaction is not in state for commit."); } return s; } Status WriteCommittedTxn::PrepareInternal() { WriteOptions write_options = write_options_; write_options.disableWAL = false; auto s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_); assert(s.ok()); class MarkLogCallback : public PreReleaseCallback { public: MarkLogCallback(DBImpl* db, bool two_write_queues) : db_(db), two_write_queues_(two_write_queues) { (void)two_write_queues_; // to silence unused private field warning } virtual Status Callback(SequenceNumber, bool is_mem_disabled, uint64_t log_number, size_t /*index*/, size_t /*total*/) override { #ifdef NDEBUG (void)is_mem_disabled; #endif assert(log_number != 0); assert(!two_write_queues_ || is_mem_disabled); // implies the 2nd queue db_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(log_number); return Status::OK(); } private: DBImpl* db_; bool two_write_queues_; } mark_log_callback(db_impl_, db_impl_->immutable_db_options().two_write_queues); WriteCallback* const kNoWriteCallback = nullptr; const uint64_t kRefNoLog = 0; const bool kDisableMemtable = true; SequenceNumber* const KIgnoreSeqUsed = nullptr; const size_t kNoBatchCount = 0; s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), kNoWriteCallback, &log_number_, kRefNoLog, kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount, &mark_log_callback); return s; } Status PessimisticTransaction::Commit() { bool commit_without_prepare = false; bool commit_prepared = false; if (IsExpired()) { return Status::Expired(); } if (expiration_time_ > 0) { // we must atomicaly compare and exchange the state here because at // this state in the transaction it is possible for another thread // to change our state out from under us in the even that we expire and have // our locks stolen. In this case the only valid state is STARTED because // a state of PREPARED would have a cleared expiration_time_. TransactionState expected = STARTED; commit_without_prepare = std::atomic_compare_exchange_strong( &txn_state_, &expected, AWAITING_COMMIT); TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1"); } else if (txn_state_ == PREPARED) { // expiration and lock stealing is not a concern commit_prepared = true; } else if (txn_state_ == STARTED) { // expiration and lock stealing is not a concern if (skip_prepare_) { commit_without_prepare = true; } else { return Status::TxnNotPrepared(); } } Status s; if (commit_without_prepare) { assert(!commit_prepared); if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) { s = Status::InvalidArgument( "Commit-time batch contains values that will not be committed."); } else { txn_state_.store(AWAITING_COMMIT); if (log_number_ > 0) { dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( log_number_); } s = CommitWithoutPrepareInternal(); if (!name_.empty()) { txn_db_impl_->UnregisterTransaction(this); } Clear(); if (s.ok()) { txn_state_.store(COMMITTED); } } } else if (commit_prepared) { txn_state_.store(AWAITING_COMMIT); s = CommitInternal(); if (!s.ok()) { ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, "Commit write failed"); return s; } // FindObsoleteFiles must now look to the memtables // to determine what prep logs must be kept around, // not the prep section heap. assert(log_number_ > 0); dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( log_number_); txn_db_impl_->UnregisterTransaction(this); Clear(); txn_state_.store(COMMITTED); } else if (txn_state_ == LOCKS_STOLEN) { s = Status::Expired(); } else if (txn_state_ == COMMITTED) { s = Status::InvalidArgument("Transaction has already been committed."); } else if (txn_state_ == ROLLEDBACK) { s = Status::InvalidArgument("Transaction has already been rolledback."); } else { s = Status::InvalidArgument("Transaction is not in state for commit."); } return s; } Status WriteCommittedTxn::CommitWithoutPrepareInternal() { WriteBatchWithIndex* wbwi = GetWriteBatch(); assert(wbwi); WriteBatch* wb = wbwi->GetWriteBatch(); assert(wb); const bool needs_ts = WriteBatchInternal::HasKeyWithTimestamp(*wb); if (needs_ts && commit_timestamp_ == kMaxTxnTimestamp) { return Status::InvalidArgument("Must assign a commit timestamp"); } if (needs_ts) { assert(commit_timestamp_ != kMaxTxnTimestamp); char commit_ts_buf[sizeof(kMaxTxnTimestamp)]; EncodeFixed64(commit_ts_buf, commit_timestamp_); Slice commit_ts(commit_ts_buf, sizeof(commit_ts_buf)); Status s = wb->UpdateTimestamps(commit_ts, [wbwi, this](uint32_t cf) -> size_t { auto cf_iter = cfs_with_ts_tracked_when_indexing_disabled_.find(cf); if (cf_iter != cfs_with_ts_tracked_when_indexing_disabled_.end()) { return sizeof(kMaxTxnTimestamp); } const Comparator* ucmp = WriteBatchWithIndexInternal::GetUserComparator(*wbwi, cf); return ucmp ? ucmp->timestamp_size() : std::numeric_limits::max(); }); if (!s.ok()) { return s; } } uint64_t seq_used = kMaxSequenceNumber; SnapshotCreationCallback snapshot_creation_cb(db_impl_, commit_timestamp_, snapshot_notifier_, snapshot_); PostMemTableCallback* post_mem_cb = nullptr; if (snapshot_needed_) { if (commit_timestamp_ == kMaxTxnTimestamp) { return Status::InvalidArgument("Must set transaction commit timestamp"); } else { post_mem_cb = &snapshot_creation_cb; } } auto s = db_impl_->WriteImpl(write_options_, wb, /*callback*/ nullptr, /*log_used*/ nullptr, /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used, /*batch_cnt=*/0, /*pre_release_callback=*/nullptr, post_mem_cb); assert(!s.ok() || seq_used != kMaxSequenceNumber); if (s.ok()) { SetId(seq_used); } return s; } Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) { uint64_t seq_used = kMaxSequenceNumber; auto s = db_impl_->WriteImpl(write_options_, batch, /*callback*/ nullptr, /*log_used*/ nullptr, /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used); assert(!s.ok() || seq_used != kMaxSequenceNumber); if (s.ok()) { SetId(seq_used); } return s; } Status WriteCommittedTxn::CommitInternal() { WriteBatchWithIndex* wbwi = GetWriteBatch(); assert(wbwi); WriteBatch* wb = wbwi->GetWriteBatch(); assert(wb); const bool needs_ts = WriteBatchInternal::HasKeyWithTimestamp(*wb); if (needs_ts && commit_timestamp_ == kMaxTxnTimestamp) { return Status::InvalidArgument("Must assign a commit timestamp"); } // We take the commit-time batch and append the Commit marker. // The Memtable will ignore the Commit marker in non-recovery mode WriteBatch* working_batch = GetCommitTimeWriteBatch(); Status s; if (!needs_ts) { s = WriteBatchInternal::MarkCommit(working_batch, name_); } else { assert(commit_timestamp_ != kMaxTxnTimestamp); char commit_ts_buf[sizeof(kMaxTxnTimestamp)]; EncodeFixed64(commit_ts_buf, commit_timestamp_); Slice commit_ts(commit_ts_buf, sizeof(commit_ts_buf)); s = WriteBatchInternal::MarkCommitWithTimestamp(working_batch, name_, commit_ts); if (s.ok()) { s = wb->UpdateTimestamps(commit_ts, [wbwi, this](uint32_t cf) -> size_t { if (cfs_with_ts_tracked_when_indexing_disabled_.find(cf) != cfs_with_ts_tracked_when_indexing_disabled_.end()) { return sizeof(kMaxTxnTimestamp); } const Comparator* ucmp = WriteBatchWithIndexInternal::GetUserComparator(*wbwi, cf); return ucmp ? ucmp->timestamp_size() : std::numeric_limits::max(); }); } } if (!s.ok()) { return s; } // any operations appended to this working_batch will be ignored from WAL working_batch->MarkWalTerminationPoint(); // insert prepared batch into Memtable only skipping WAL. // Memtable will ignore BeginPrepare/EndPrepare markers // in non recovery mode and simply insert the values s = WriteBatchInternal::Append(working_batch, wb); assert(s.ok()); uint64_t seq_used = kMaxSequenceNumber; SnapshotCreationCallback snapshot_creation_cb(db_impl_, commit_timestamp_, snapshot_notifier_, snapshot_); PostMemTableCallback* post_mem_cb = nullptr; if (snapshot_needed_) { if (commit_timestamp_ == kMaxTxnTimestamp) { s = Status::InvalidArgument("Must set transaction commit timestamp"); return s; } else { post_mem_cb = &snapshot_creation_cb; } } s = db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr, /*log_used*/ nullptr, /*log_ref*/ log_number_, /*disable_memtable*/ false, &seq_used, /*batch_cnt=*/0, /*pre_release_callback=*/nullptr, post_mem_cb); assert(!s.ok() || seq_used != kMaxSequenceNumber); if (s.ok()) { SetId(seq_used); } return s; } Status PessimisticTransaction::Rollback() { Status s; if (txn_state_ == PREPARED) { txn_state_.store(AWAITING_ROLLBACK); s = RollbackInternal(); if (s.ok()) { // we do not need to keep our prepared section around assert(log_number_ > 0); dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( log_number_); Clear(); txn_state_.store(ROLLEDBACK); } } else if (txn_state_ == STARTED) { if (log_number_ > 0) { assert(txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED); assert(GetId() > 0); s = RollbackInternal(); if (s.ok()) { dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( log_number_); } } // prepare couldn't have taken place Clear(); } else if (txn_state_ == COMMITTED) { s = Status::InvalidArgument("This transaction has already been committed."); } else { s = Status::InvalidArgument( "Two phase transaction is not in state for rollback."); } return s; } Status WriteCommittedTxn::RollbackInternal() { WriteBatch rollback_marker; auto s = WriteBatchInternal::MarkRollback(&rollback_marker, name_); assert(s.ok()); s = db_impl_->WriteImpl(write_options_, &rollback_marker); return s; } Status PessimisticTransaction::RollbackToSavePoint() { if (txn_state_ != STARTED) { return Status::InvalidArgument("Transaction is beyond state for rollback."); } if (save_points_ != nullptr && !save_points_->empty()) { // Unlock any keys locked since last transaction auto& save_point_tracker = *save_points_->top().new_locks_; std::unique_ptr t( tracked_locks_->GetTrackedLocksSinceSavePoint(save_point_tracker)); if (t) { txn_db_impl_->UnLock(this, *t); } } return TransactionBaseImpl::RollbackToSavePoint(); } // Lock all keys in this batch. // On success, caller should unlock keys_to_unlock Status PessimisticTransaction::LockBatch(WriteBatch* batch, LockTracker* keys_to_unlock) { if (!batch) { return Status::InvalidArgument("batch is nullptr"); } class Handler : public WriteBatch::Handler { public: // Sorted map of column_family_id to sorted set of keys. // Since LockBatch() always locks keys in sorted order, it cannot deadlock // with itself. We're not using a comparator here since it doesn't matter // what the sorting is as long as it's consistent. std::map> keys_; Handler() {} void RecordKey(uint32_t column_family_id, const Slice& key) { std::string key_str = key.ToString(); auto& cfh_keys = keys_[column_family_id]; auto iter = cfh_keys.find(key_str); if (iter == cfh_keys.end()) { // key not yet seen, store it. cfh_keys.insert({std::move(key_str)}); } } Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& /* unused */) override { RecordKey(column_family_id, key); return Status::OK(); } Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& /* unused */) override { RecordKey(column_family_id, key); return Status::OK(); } Status DeleteCF(uint32_t column_family_id, const Slice& key) override { RecordKey(column_family_id, key); return Status::OK(); } }; // Iterating on this handler will add all keys in this batch into keys Handler handler; Status s = batch->Iterate(&handler); if (!s.ok()) { return s; } // Attempt to lock all keys for (const auto& cf_iter : handler.keys_) { uint32_t cfh_id = cf_iter.first; auto& cfh_keys = cf_iter.second; for (const auto& key_iter : cfh_keys) { const std::string& key = key_iter; s = txn_db_impl_->TryLock(this, cfh_id, key, true /* exclusive */); if (!s.ok()) { break; } PointLockRequest r; r.column_family_id = cfh_id; r.key = key; r.seq = kMaxSequenceNumber; r.read_only = false; r.exclusive = true; keys_to_unlock->Track(r); } if (!s.ok()) { break; } } if (!s.ok()) { txn_db_impl_->UnLock(this, *keys_to_unlock); } return s; } // Attempt to lock this key. // Returns OK if the key has been successfully locked. Non-ok, otherwise. // If check_shapshot is true and this transaction has a snapshot set, // this key will only be locked if there have been no writes to this key since // the snapshot time. Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, const Slice& key, bool read_only, bool exclusive, const bool do_validate, const bool assume_tracked) { assert(!assume_tracked || !do_validate); Status s; if (UNLIKELY(skip_concurrency_control_)) { return s; } uint32_t cfh_id = GetColumnFamilyID(column_family); std::string key_str = key.ToString(); PointLockStatus status; bool lock_upgrade; bool previously_locked; if (tracked_locks_->IsPointLockSupported()) { status = tracked_locks_->GetPointLockStatus(cfh_id, key_str); previously_locked = status.locked; lock_upgrade = previously_locked && exclusive && !status.exclusive; } else { // If the record is tracked, we can assume it was locked, too. previously_locked = assume_tracked; status.locked = false; lock_upgrade = false; } // Lock this key if this transactions hasn't already locked it or we require // an upgrade. if (!previously_locked || lock_upgrade) { s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive); } const ColumnFamilyHandle* const cfh = column_family ? column_family : db_impl_->DefaultColumnFamily(); assert(cfh); const Comparator* const ucmp = cfh->GetComparator(); assert(ucmp); size_t ts_sz = ucmp->timestamp_size(); SetSnapshotIfNeeded(); // Even though we do not care about doing conflict checking for this write, // we still need to take a lock to make sure we do not cause a conflict with // some other write. However, we do not need to check if there have been // any writes since this transaction's snapshot. // TODO(agiardullo): could optimize by supporting shared txn locks in the // future. SequenceNumber tracked_at_seq = status.locked ? status.seq : kMaxSequenceNumber; if (!do_validate || (snapshot_ == nullptr && (0 == ts_sz || kMaxTxnTimestamp == read_timestamp_))) { if (assume_tracked && !previously_locked && tracked_locks_->IsPointLockSupported()) { s = Status::InvalidArgument( "assume_tracked is set but it is not tracked yet"); } // Need to remember the earliest sequence number that we know that this // key has not been modified after. This is useful if this same // transaction later tries to lock this key again. if (tracked_at_seq == kMaxSequenceNumber) { // Since we haven't checked a snapshot, we only know this key has not // been modified since after we locked it. // Note: when last_seq_same_as_publish_seq_==false this is less than the // latest allocated seq but it is ok since i) this is just a heuristic // used only as a hint to avoid actual check for conflicts, ii) this would // cause a false positive only if the snapthot is taken right after the // lock, which would be an unusual sequence. tracked_at_seq = db_->GetLatestSequenceNumber(); } } else if (s.ok()) { // If a snapshot is set, we need to make sure the key hasn't been modified // since the snapshot. This must be done after we locked the key. // If we already have validated an earilier snapshot it must has been // reflected in tracked_at_seq and ValidateSnapshot will return OK. s = ValidateSnapshot(column_family, key, &tracked_at_seq); if (!s.ok()) { // Failed to validate key // Unlock key we just locked if (lock_upgrade) { s = txn_db_impl_->TryLock(this, cfh_id, key_str, false /* exclusive */); assert(s.ok()); } else if (!previously_locked) { txn_db_impl_->UnLock(this, cfh_id, key.ToString()); } } } if (s.ok()) { // We must track all the locked keys so that we can unlock them later. If // the key is already locked, this func will update some stats on the // tracked key. It could also update the tracked_at_seq if it is lower // than the existing tracked key seq. These stats are necessary for // RollbackToSavePoint to determine whether a key can be safely removed // from tracked_keys_. Removal can only be done if a key was only locked // during the current savepoint. // // Recall that if assume_tracked is true, we assume that TrackKey has been // called previously since the last savepoint, with the same exclusive // setting, and at a lower sequence number, so skipping here should be // safe. if (!assume_tracked) { TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive); } else { #ifndef NDEBUG if (tracked_locks_->IsPointLockSupported()) { PointLockStatus lock_status = tracked_locks_->GetPointLockStatus(cfh_id, key_str); assert(lock_status.locked); assert(lock_status.seq <= tracked_at_seq); assert(lock_status.exclusive == exclusive); } #endif } } return s; } Status PessimisticTransaction::GetRangeLock(ColumnFamilyHandle* column_family, const Endpoint& start_endp, const Endpoint& end_endp) { ColumnFamilyHandle* cfh = column_family ? column_family : db_impl_->DefaultColumnFamily(); uint32_t cfh_id = GetColumnFamilyID(cfh); Status s = txn_db_impl_->TryRangeLock(this, cfh_id, start_endp, end_endp); if (s.ok()) { RangeLockRequest req{cfh_id, start_endp, end_endp}; tracked_locks_->Track(req); } return s; } // Return OK() if this key has not been modified more recently than the // transaction snapshot_. // tracked_at_seq is the global seq at which we either locked the key or already // have done ValidateSnapshot. Status PessimisticTransaction::ValidateSnapshot( ColumnFamilyHandle* column_family, const Slice& key, SequenceNumber* tracked_at_seq) { assert(snapshot_ || read_timestamp_ < kMaxTxnTimestamp); SequenceNumber snap_seq = 0; if (snapshot_) { snap_seq = snapshot_->GetSequenceNumber(); if (*tracked_at_seq <= snap_seq) { // If the key has been previous validated (or locked) at a sequence number // earlier than the current snapshot's sequence number, we already know it // has not been modified aftter snap_seq either. return Status::OK(); } } else { snap_seq = db_impl_->GetLatestSequenceNumber(); } // Otherwise we have either // 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key // 2: snap_seq < tracked_at_seq: last time we lock the key was via // do_validate=false which means we had skipped ValidateSnapshot. In both // cases we should do ValidateSnapshot now. *tracked_at_seq = snap_seq; ColumnFamilyHandle* cfh = column_family ? column_family : db_impl_->DefaultColumnFamily(); assert(cfh); const Comparator* const ucmp = cfh->GetComparator(); assert(ucmp); size_t ts_sz = ucmp->timestamp_size(); std::string ts_buf; if (ts_sz > 0 && read_timestamp_ < kMaxTxnTimestamp) { assert(ts_sz == sizeof(read_timestamp_)); PutFixed64(&ts_buf, read_timestamp_); } return TransactionUtil::CheckKeyForConflicts( db_impl_, cfh, key.ToString(), snap_seq, ts_sz == 0 ? nullptr : &ts_buf, false /* cache_only */); } bool PessimisticTransaction::TryStealingLocks() { assert(IsExpired()); TransactionState expected = STARTED; return std::atomic_compare_exchange_strong(&txn_state_, &expected, LOCKS_STOLEN); } void PessimisticTransaction::UnlockGetForUpdate( ColumnFamilyHandle* column_family, const Slice& key) { txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString()); } Status PessimisticTransaction::SetName(const TransactionName& name) { Status s; if (txn_state_ == STARTED) { if (name_.length()) { s = Status::InvalidArgument("Transaction has already been named."); } else if (txn_db_impl_->GetTransactionByName(name) != nullptr) { s = Status::InvalidArgument("Transaction name must be unique."); } else if (name.length() < 1 || name.length() > 512) { s = Status::InvalidArgument( "Transaction name length must be between 1 and 512 chars."); } else { name_ = name; txn_db_impl_->RegisterTransaction(this); } } else { s = Status::InvalidArgument("Transaction is beyond state for naming."); } return s; } } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE