Diffstat (limited to 'src/rocksdb/utilities/transactions/write_unprepared_txn.cc')
 src/rocksdb/utilities/transactions/write_unprepared_txn.cc | 502
 1 file changed, 502 insertions(+), 0 deletions(-)
diff --git a/src/rocksdb/utilities/transactions/write_unprepared_txn.cc b/src/rocksdb/utilities/transactions/write_unprepared_txn.cc
new file mode 100644
index 00000000..731460ed
--- /dev/null
+++ b/src/rocksdb/utilities/transactions/write_unprepared_txn.cc
@@ -0,0 +1,502 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "utilities/transactions/write_unprepared_txn.h"
+#include "db/db_impl.h"
+#include "util/cast_util.h"
+#include "utilities/transactions/write_unprepared_txn_db.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+namespace rocksdb {
+
+bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) {
+  auto unprep_seqs = txn_->GetUnpreparedSequenceNumbers();
+
+  // Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is
+  // in unprep_seqs, we have to check if seq is equal to prep_seq or any of
+  // the prepare_batch_cnt seq nums after it.
+  //
+  // TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is
+  // large.
+  for (const auto& it : unprep_seqs) {
+    if (it.first <= seq && seq < it.first + it.second) {
+      return true;
+    }
+  }
+
+  return db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_);
+}
+
+SequenceNumber WriteUnpreparedTxnReadCallback::CalcMaxUnpreparedSequenceNumber(
+    WriteUnpreparedTxn* txn) {
+  auto unprep_seqs = txn->GetUnpreparedSequenceNumbers();
+  if (unprep_seqs.size()) {
+    return unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
+  }
+  return 0;
+}
+
+WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
+                                       const WriteOptions& write_options,
+                                       const TransactionOptions& txn_options)
+    : WritePreparedTxn(txn_db, write_options, txn_options), wupt_db_(txn_db) {
+  max_write_batch_size_ = txn_options.max_write_batch_size;
+  // We set max bytes to zero so that we don't get a memory limit error.
+  // Instead of trying to keep the write batch strictly under the size limit,
+  // write unprepared just flushes to the DB when the limit is exceeded, which
+  // avoids retry logic. This also allows very big key-value pairs that exceed
+  // max bytes to succeed.
+  write_batch_.SetMaxBytes(0);
+}
+
+WriteUnpreparedTxn::~WriteUnpreparedTxn() {
+  if (!unprep_seqs_.empty()) {
+    assert(log_number_ > 0);
+    assert(GetId() > 0);
+    assert(!name_.empty());
+
+    // We should rollback regardless of GetState, but some unit tests that
+    // test crash recovery run the destructor assuming that rollback does not
+    // happen, so that rollback during recovery can be exercised.
+    if (GetState() == STARTED) {
+      auto s __attribute__((__unused__)) = RollbackInternal();
+      // TODO(lth): Better error handling.
+      assert(s.ok());
+      dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
+          log_number_);
+    }
+  }
+}
+
+void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
+  PessimisticTransaction::Initialize(txn_options);
+  max_write_batch_size_ = txn_options.max_write_batch_size;
+  write_batch_.SetMaxBytes(0);
+  unprep_seqs_.clear();
+  write_set_keys_.clear();
+}
+
+Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
+                               const Slice& key, const Slice& value,
+                               const bool assume_tracked) {
+  Status s = MaybeFlushWriteBatchToDB();
+  if (!s.ok()) {
+    return s;
+  }
+  return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
+}
+
+Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
+                               const SliceParts& key, const SliceParts& value,
+                               const bool assume_tracked) {
+  Status s = MaybeFlushWriteBatchToDB();
+  if (!s.ok()) {
+    return s;
+  }
+  return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
+}
+
+Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family,
+                                 const Slice& key, const Slice& value,
+                                 const bool assume_tracked) {
+  Status s = MaybeFlushWriteBatchToDB();
+  if (!s.ok()) {
+    return s;
+  }
+  return TransactionBaseImpl::Merge(column_family, key, value, assume_tracked);
+}
+
+Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
+                                  const Slice& key,
+                                  const bool assume_tracked) {
+  Status s = MaybeFlushWriteBatchToDB();
+  if (!s.ok()) {
+    return s;
+  }
+  return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
+}
+
+Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
+                                  const SliceParts& key,
+                                  const bool assume_tracked) {
+  Status s = MaybeFlushWriteBatchToDB();
+  if (!s.ok()) {
+    return s;
+  }
+  return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
+}
+
+Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
+                                        const Slice& key,
+                                        const bool assume_tracked) {
+  Status s = MaybeFlushWriteBatchToDB();
+  if (!s.ok()) {
+    return s;
+  }
+  return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked);
+}
+
+Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
+                                        const SliceParts& key,
+                                        const bool assume_tracked) {
+  Status s = MaybeFlushWriteBatchToDB();
+  if (!s.ok()) {
+    return s;
+  }
+  return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked);
+}
+
+Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
+  const bool kPrepared = true;
+  Status s;
+  if (max_write_batch_size_ != 0 &&
+      write_batch_.GetDataSize() > max_write_batch_size_) {
+    assert(GetState() != PREPARED);
+    s = FlushWriteBatchToDB(!kPrepared);
+  }
+  return s;
+}
+
+void WriteUnpreparedTxn::UpdateWriteKeySet(uint32_t cfid, const Slice& key) {
+  // TODO(lth): write_set_keys_ can just be a std::string instead of a vector.
+  write_set_keys_[cfid].push_back(key.ToString());
+}
+
+Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
+  if (name_.empty()) {
+    return Status::InvalidArgument("Cannot write to DB without SetName.");
+  }
+
+  // Update write_set_keys_ for rollback purposes.
+  KeySetBuilder keyset_handler(
+      this, wupt_db_->txn_db_options_.rollback_merge_operands);
+  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&keyset_handler);
+  assert(s.ok());
+  if (!s.ok()) {
+    return s;
+  }
+
+  // TODO(lth): Reduce duplicate code with WritePrepared prepare logic.
+  WriteOptions write_options = write_options_;
+  write_options.disableWAL = false;
+  const bool WRITE_AFTER_COMMIT = true;
+  const bool first_prepare_batch = log_number_ == 0;
+  // MarkEndPrepare will change the Noop marker to the appropriate marker.
+  WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
+                                     !WRITE_AFTER_COMMIT, !prepared);
+  // For each duplicate key we account for a new sub-batch.
+  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
+  // AddPrepared is better called in the pre-release callback; otherwise there
+  // is a non-zero chance of max advancing past prepare_seq and readers
+  // assuming the data is committed.
+  // Also having it in the PreReleaseCallback allows in-order addition of
+  // prepared entries to PreparedHeap and hence enables an optimization. Refer
+  // to SmallestUnCommittedSeq for more details.
+  AddPreparedCallback add_prepared_callback(
+      wpt_db_, db_impl_, prepare_batch_cnt_,
+      db_impl_->immutable_db_options().two_write_queues, first_prepare_batch);
+  const bool DISABLE_MEMTABLE = true;
+  uint64_t seq_used = kMaxSequenceNumber;
+  // log_number_ should refer to the oldest log containing uncommitted data
+  // from the current transaction. This means that if log_number_ is set,
+  // WriteImpl should not overwrite that value, so set log_used to nullptr if
+  // log_number_ is already set.
+  uint64_t* log_used = log_number_ ? nullptr : &log_number_;
+  s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
+                          /*callback*/ nullptr, log_used, /*log ref*/
+                          0, !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_,
+                          &add_prepared_callback);
+  assert(!s.ok() || seq_used != kMaxSequenceNumber);
+  auto prepare_seq = seq_used;
+
+  // Only call SetId if it hasn't been set yet.
+  if (GetId() == 0) {
+    SetId(prepare_seq);
+  }
+  // unprep_seqs_ will also contain prepared seqnos since they are treated in
+  // the same way in the prepare/commit callbacks. See the comment on the
+  // definition of unprep_seqs_.
+  unprep_seqs_[prepare_seq] = prepare_batch_cnt_;
+
+  // Reset transaction state.
+  if (!prepared) {
+    prepare_batch_cnt_ = 0;
+    write_batch_.Clear();
+    WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
+  }
+
+  return s;
+}
+
+Status WriteUnpreparedTxn::PrepareInternal() {
+  const bool kPrepared = true;
+  return FlushWriteBatchToDB(kPrepared);
+}
+
+Status WriteUnpreparedTxn::CommitWithoutPrepareInternal() {
+  if (unprep_seqs_.empty()) {
+    assert(log_number_ == 0);
+    assert(GetId() == 0);
+    return WritePreparedTxn::CommitWithoutPrepareInternal();
+  }
+
+  // TODO(lth): We should optimize commit without prepare to not perform
+  // a prepare under the hood.
+  auto s = PrepareInternal();
+  if (!s.ok()) {
+    return s;
+  }
+  return CommitInternal();
+}
+
+Status WriteUnpreparedTxn::CommitInternal() {
+  // TODO(lth): Reduce duplicate code with WritePrepared commit logic.
+
+  // We take the commit-time batch and append the Commit marker. The memtable
+  // will ignore the Commit marker in non-recovery mode.
+  WriteBatch* working_batch = GetCommitTimeWriteBatch();
+  const bool empty = working_batch->Count() == 0;
+  WriteBatchInternal::MarkCommit(working_batch, name_);
+
+  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
+  if (!empty && for_recovery) {
+    // When not writing to memtable, we can still cache the latest write
+    // batch. The cached batch will be written to memtable in
+    // WriteRecoverableState during FlushMemTable.
+    WriteBatchInternal::SetAsLastestPersistentState(working_batch);
+  }
+
+  const bool includes_data = !empty && !for_recovery;
+  size_t commit_batch_cnt = 0;
+  if (UNLIKELY(includes_data)) {
+    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
+                   "Duplicate key overhead");
+    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
+    auto s = working_batch->Iterate(&counter);
+    assert(s.ok());
+    commit_batch_cnt = counter.BatchCount();
+  }
+  const bool disable_memtable = !includes_data;
+  const bool do_one_write =
+      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
+  const bool publish_seq = do_one_write;
+  // Note: CommitTimeWriteBatch does not need AddPrepared since it is written
+  // to DB in one shot. min_uncommitted still works since it requires
+  // capturing data that is written to DB but not yet committed, while
+  // CommitTimeWriteBatch commits with PreReleaseCallback.
+  WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
+      wpt_db_, db_impl_, unprep_seqs_, commit_batch_cnt, publish_seq);
+  uint64_t seq_used = kMaxSequenceNumber;
+  // Since the prepared batch is directly written to memtable, there is
+  // already a connection between the memtable and its WAL, so there is no
+  // need to redundantly reference the log that contains the prepared data.
+  const uint64_t zero_log_number = 0ull;
+  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
+  auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
+                               zero_log_number, disable_memtable, &seq_used,
+                               batch_cnt, &update_commit_map);
+  assert(!s.ok() || seq_used != kMaxSequenceNumber);
+  if (LIKELY(do_one_write || !s.ok())) {
+    if (LIKELY(s.ok())) {
+      // Note RemovePrepared should be called after the WriteImpl that
+      // published the seq. Otherwise the SmallestUnCommittedSeq optimization
+      // breaks.
+      for (const auto& seq : unprep_seqs_) {
+        wpt_db_->RemovePrepared(seq.first, seq.second);
+      }
+    }
+    unprep_seqs_.clear();
+    write_set_keys_.clear();
+    return s;
+  }  // else do the 2nd write to publish seq
+  // Note: the 2nd write comes with a performance penalty. So if we have too
+  // many commits accompanied with CommitTimeWriteBatch and yet we cannot
+  // enable the use_only_the_last_commit_time_batch_for_recovery_
+  // optimization, two_write_queues should be disabled to avoid many
+  // additional writes here.
+  class PublishSeqPreReleaseCallback : public PreReleaseCallback {
+   public:
+    explicit PublishSeqPreReleaseCallback(DBImpl* db_impl)
+        : db_impl_(db_impl) {}
+    Status Callback(SequenceNumber seq,
+                    bool is_mem_disabled __attribute__((__unused__)),
+                    uint64_t) override {
+      assert(is_mem_disabled);
+      assert(db_impl_->immutable_db_options().two_write_queues);
+      db_impl_->SetLastPublishedSequence(seq);
+      return Status::OK();
+    }
+
+   private:
+    DBImpl* db_impl_;
+  } publish_seq_callback(db_impl_);
+  WriteBatch empty_batch;
+  empty_batch.PutLogData(Slice());
+  // In the absence of Prepare markers, use Noop as a batch separator.
+  WriteBatchInternal::InsertNoop(&empty_batch);
+  const bool DISABLE_MEMTABLE = true;
+  const size_t ONE_BATCH = 1;
+  const uint64_t NO_REF_LOG = 0;
+  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
+                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
+                          &publish_seq_callback);
+  assert(!s.ok() || seq_used != kMaxSequenceNumber);
+  // Note RemovePrepared should be called after the WriteImpl that published
+  // the seq. Otherwise the SmallestUnCommittedSeq optimization breaks.
+  for (const auto& seq : unprep_seqs_) {
+    wpt_db_->RemovePrepared(seq.first, seq.second);
+  }
+  unprep_seqs_.clear();
+  write_set_keys_.clear();
+  return s;
+}
+
+Status WriteUnpreparedTxn::RollbackInternal() {
+  // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
+  WriteBatchWithIndex rollback_batch(
+      wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0);
+  assert(GetId() != kMaxSequenceNumber);
+  assert(GetId() > 0);
+  const auto& cf_map = *wupt_db_->GetCFHandleMap();
+  auto read_at_seq = kMaxSequenceNumber;
+  Status s;
+
+  ReadOptions roptions;
+  // Note that we do not use WriteUnpreparedTxnReadCallback because we do not
+  // need to read our own writes when reading prior versions of the key for
+  // rollback.
+  WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq);
+  for (const auto& cfkey : write_set_keys_) {
+    const auto cfid = cfkey.first;
+    const auto& keys = cfkey.second;
+    for (const auto& key : keys) {
+      const auto& cf_handle = cf_map.at(cfid);
+      PinnableSlice pinnable_val;
+      bool not_used;
+      s = db_impl_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
+                            &callback);
+
+      if (s.ok()) {
+        s = rollback_batch.Put(cf_handle, key, pinnable_val);
+        assert(s.ok());
+      } else if (s.IsNotFound()) {
+        s = rollback_batch.Delete(cf_handle, key);
+        assert(s.ok());
+      } else {
+        return s;
+      }
+    }
+  }
+
+  // The Rollback marker will be used as a batch separator.
+  WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_);
+  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
+  const bool DISABLE_MEMTABLE = true;
+  const uint64_t NO_REF_LOG = 0;
+  uint64_t seq_used = kMaxSequenceNumber;
+  // TODO(lth): We write the rollback batch all in a single batch here, but
+  // this should be subdivided into multiple batches as well. In phase 2, when
+  // key sets are read from WAL, this will happen naturally.
+  const size_t ONE_BATCH = 1;
+  // We commit the rolled back prepared batches. Although this is
+  // counter-intuitive, i) it is safe to do so, since the prepared batches are
+  // already canceled out by the rollback batch, ii) adding the commit entry
+  // to CommitCache will allow us to benefit from the existing mechanism in
+  // CommitCache that keeps an entry evicted due to max advance and yet
+  // overlapping with a live snapshot around, so that the live snapshot
+  // properly skips the entry even if its prepare seq is lower than
+  // max_evicted_seq_.
+  WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
+      wpt_db_, db_impl_, unprep_seqs_, ONE_BATCH);
+  // Note: the rollback batch does not need AddPrepared since it is written to
+  // DB in one shot. min_uncommitted still works since it requires capturing
+  // data that is written to DB but not yet committed, while the rollback
+  // batch commits with PreReleaseCallback.
+  s = db_impl_->WriteImpl(write_options_, rollback_batch.GetWriteBatch(),
+                          nullptr, nullptr, NO_REF_LOG, !DISABLE_MEMTABLE,
+                          &seq_used, rollback_batch.SubBatchCnt(),
+                          do_one_write ? &update_commit_map : nullptr);
+  assert(!s.ok() || seq_used != kMaxSequenceNumber);
+  if (!s.ok()) {
+    return s;
+  }
+  if (do_one_write) {
+    for (const auto& seq : unprep_seqs_) {
+      wpt_db_->RemovePrepared(seq.first, seq.second);
+    }
+    unprep_seqs_.clear();
+    write_set_keys_.clear();
+    return s;
+  }  // else do the 2nd write for commit
+  uint64_t& prepare_seq = seq_used;
+  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
+                    "RollbackInternal 2nd write prepare_seq: %" PRIu64,
+                    prepare_seq);
+  // Commit the batch by writing an empty batch to the queue that will release
+  // the commit sequence number to readers.
+  WriteUnpreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
+      wpt_db_, db_impl_, unprep_seqs_, prepare_seq);
+  WriteBatch empty_batch;
+  empty_batch.PutLogData(Slice());
+  // In the absence of Prepare markers, use Noop as a batch separator.
+  WriteBatchInternal::InsertNoop(&empty_batch);
+  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
+                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
+                          &update_commit_map_with_prepare);
+  assert(!s.ok() || seq_used != kMaxSequenceNumber);
+  // Mark the txn as rolled back.
+  if (s.ok()) {
+    for (const auto& seq : unprep_seqs_) {
+      wpt_db_->RemovePrepared(seq.first, seq.second);
+    }
+  }
+
+  unprep_seqs_.clear();
+  write_set_keys_.clear();
+  return s;
+}
+
+Status WriteUnpreparedTxn::Get(const ReadOptions& options,
+                               ColumnFamilyHandle* column_family,
+                               const Slice& key, PinnableSlice* value) {
+  auto snapshot = options.snapshot;
+  auto snap_seq =
+      snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
+  SequenceNumber min_uncommitted =
+      kMinUnCommittedSeq;  // by default disable the optimization
+  if (snapshot != nullptr) {
+    min_uncommitted =
+        static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
+            ->min_uncommitted_;
+  }
+
+  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
+                                          this);
+  return write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
+                                        value, &callback);
+}
+
+Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) {
+  return GetIterator(options, wupt_db_->DefaultColumnFamily());
+}
+
+Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
+                                          ColumnFamilyHandle* column_family) {
+  // Make sure to get the iterator from WriteUnpreparedTxnDB, not the root db.
+  Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
+  assert(db_iter);
+
+  return write_batch_.NewIteratorWithBase(column_family, db_iter);
+}
+
+const std::map<SequenceNumber, size_t>&
+WriteUnpreparedTxn::GetUnpreparedSequenceNumbers() {
+  return unprep_seqs_;
+}
+
+}  // namespace rocksdb
+
+#endif  // ROCKSDB_LITE
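
A note for readers of this change: the visibility rule in IsVisibleFullCheck treats unprep_seqs as a set of half-open intervals [prep_seq, prep_seq + prepare_batch_cnt). A sequence number is visible to the transaction's own reads if it falls inside any such interval; only then does the code fall back to the snapshot check. A minimal standalone sketch of that interval test (the helper name is hypothetical and not part of this commit):

    #include <cstddef>
    #include <cstdint>
    #include <map>

    using SequenceNumber = uint64_t;

    // Returns true if seq falls inside any [prep_seq, prep_seq + batch_cnt)
    // interval, i.e. it is one of this transaction's own unprepared writes.
    bool SeqIsOwnUnpreparedWrite(
        const std::map<SequenceNumber, size_t>& unprep_seqs,
        SequenceNumber seq) {
      for (const auto& it : unprep_seqs) {
        if (it.first <= seq && seq < it.first + it.second) {
          return true;
        }
      }
      return false;  // the real code then consults IsInSnapshot
    }

Because std::map iterates in key order, the same ordered layout is what would let the TODO's std::lower_bound optimization find the candidate interval without a full scan.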
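To exercise this code path through the public API, the transaction DB must be opened with the write-unprepared policy, and TransactionOptions::max_write_batch_size governs when MaybeFlushWriteBatchToDB spills unprepared batches to the DB. A rough usage sketch under those assumptions (the path and size below are illustrative, not from this commit):

    #include <cassert>
    #include "rocksdb/utilities/transaction_db.h"

    using namespace rocksdb;

    int main() {
      Options options;
      options.create_if_missing = true;
      TransactionDBOptions txn_db_options;
      txn_db_options.write_policy = TxnDBWritePolicy::WRITE_UNPREPARED;
      TransactionDB* db = nullptr;
      Status s = TransactionDB::Open(options, txn_db_options,
                                     "/tmp/write_unprepared_example", &db);
      assert(s.ok());

      TransactionOptions txn_options;
      // Once the batch grows past this size, MaybeFlushWriteBatchToDB()
      // flushes it to the DB as an unprepared batch instead of failing
      // with a memory limit error.
      txn_options.max_write_batch_size = 1 << 20;
      Transaction* txn = db->BeginTransaction(WriteOptions(), txn_options);
      s = txn->SetName("big_txn");  // required before any unprepared flush
      assert(s.ok());
      s = txn->Put("key", "value");  // repeated Puts may trigger flushes
      assert(s.ok());
      s = txn->Commit();
      assert(s.ok());
      delete txn;
      delete db;
      return 0;
    }

Note that SetName must come before the first flush: FlushWriteBatchToDB returns InvalidArgument for unnamed transactions, as seen in the diff above.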