summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/transactions/write_unprepared_txn_db.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/rocksdb/utilities/transactions/write_unprepared_txn_db.cc
parentInitial commit. (diff)
downloadceph-upstream/16.2.11+ds.tar.xz
ceph-upstream/16.2.11+ds.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/utilities/transactions/write_unprepared_txn_db.cc')
-rw-r--r--src/rocksdb/utilities/transactions/write_unprepared_txn_db.cc468
1 files changed, 468 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/transactions/write_unprepared_txn_db.cc b/src/rocksdb/utilities/transactions/write_unprepared_txn_db.cc
new file mode 100644
index 000000000..ca365d044
--- /dev/null
+++ b/src/rocksdb/utilities/transactions/write_unprepared_txn_db.cc
@@ -0,0 +1,468 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "utilities/transactions/write_unprepared_txn_db.h"
+#include "db/arena_wrapped_db_iter.h"
+#include "rocksdb/utilities/transaction_db.h"
+#include "util/cast_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Instead of reconstructing a Transaction object, and calling rollback on it,
+// we can be more efficient with RollbackRecoveredTransaction by skipping
+// unnecessary steps (eg. updating CommitMap, reconstructing keyset)
+Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
+ const DBImpl::RecoveredTransaction* rtxn) {
+ // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
+ assert(rtxn->unprepared_);
+ auto cf_map_shared_ptr = WritePreparedTxnDB::GetCFHandleMap();
+ auto cf_comp_map_shared_ptr = WritePreparedTxnDB::GetCFComparatorMap();
+ // In theory we could write with disableWAL = true during recovery, and
+ // assume that if we crash again during recovery, we can just replay from
+ // the very beginning. Unfortunately, the XIDs from the application may not
+ // necessarily be unique across restarts, potentially leading to situations
+ // like this:
+ //
+ // BEGIN_PREPARE(unprepared) Put(a) END_PREPARE(xid = 1)
+ // -- crash and recover with Put(a) rolled back as it was not prepared
+ // BEGIN_PREPARE(prepared) Put(b) END_PREPARE(xid = 1)
+ // COMMIT(xid = 1)
+ // -- crash and recover with both a, b
+ //
+ // We could just write the rollback marker, but then we would have to extend
+ // MemTableInserter during recovery to actually do writes into the DB
+ // instead of just dropping the in-memory write batch.
+ //
+ WriteOptions w_options;
+
+ class InvalidSnapshotReadCallback : public ReadCallback {
+ public:
+ InvalidSnapshotReadCallback(SequenceNumber snapshot)
+ : ReadCallback(snapshot) {}
+
+ inline bool IsVisibleFullCheck(SequenceNumber) override {
+ // The seq provided as snapshot is the seq right before we have locked and
+ // wrote to it, so whatever is there, it is committed.
+ return true;
+ }
+
+ // Ignore the refresh request since we are confident that our snapshot seq
+ // is not going to be affected by concurrent compactions (not enabled yet.)
+ void Refresh(SequenceNumber) override {}
+ };
+
+ // Iterate starting with largest sequence number.
+ for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); ++it) {
+ auto last_visible_txn = it->first - 1;
+ const auto& batch = it->second.batch_;
+ WriteBatch rollback_batch;
+
+ struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
+ DBImpl* db_;
+ ReadOptions roptions;
+ InvalidSnapshotReadCallback callback;
+ WriteBatch* rollback_batch_;
+ std::map<uint32_t, const Comparator*>& comparators_;
+ std::map<uint32_t, ColumnFamilyHandle*>& handles_;
+ using CFKeys = std::set<Slice, SetComparator>;
+ std::map<uint32_t, CFKeys> keys_;
+ bool rollback_merge_operands_;
+ RollbackWriteBatchBuilder(
+ DBImpl* db, SequenceNumber snap_seq, WriteBatch* dst_batch,
+ std::map<uint32_t, const Comparator*>& comparators,
+ std::map<uint32_t, ColumnFamilyHandle*>& handles,
+ bool rollback_merge_operands)
+ : db_(db),
+ callback(snap_seq),
+ // disable min_uncommitted optimization
+ rollback_batch_(dst_batch),
+ comparators_(comparators),
+ handles_(handles),
+ rollback_merge_operands_(rollback_merge_operands) {}
+
+ Status Rollback(uint32_t cf, const Slice& key) {
+ Status s;
+ CFKeys& cf_keys = keys_[cf];
+ if (cf_keys.size() == 0) { // just inserted
+ auto cmp = comparators_[cf];
+ keys_[cf] = CFKeys(SetComparator(cmp));
+ }
+ auto res = cf_keys.insert(key);
+ if (res.second ==
+ false) { // second is false if a element already existed.
+ return s;
+ }
+
+ PinnableSlice pinnable_val;
+ bool not_used;
+ auto cf_handle = handles_[cf];
+ DBImpl::GetImplOptions get_impl_options;
+ get_impl_options.column_family = cf_handle;
+ get_impl_options.value = &pinnable_val;
+ get_impl_options.value_found = &not_used;
+ get_impl_options.callback = &callback;
+ s = db_->GetImpl(roptions, key, get_impl_options);
+ assert(s.ok() || s.IsNotFound());
+ if (s.ok()) {
+ s = rollback_batch_->Put(cf_handle, key, pinnable_val);
+ assert(s.ok());
+ } else if (s.IsNotFound()) {
+ // There has been no readable value before txn. By adding a delete we
+ // make sure that there will be none afterwards either.
+ s = rollback_batch_->Delete(cf_handle, key);
+ assert(s.ok());
+ } else {
+ // Unexpected status. Return it to the user.
+ }
+ return s;
+ }
+
+ Status PutCF(uint32_t cf, const Slice& key,
+ const Slice& /*val*/) override {
+ return Rollback(cf, key);
+ }
+
+ Status DeleteCF(uint32_t cf, const Slice& key) override {
+ return Rollback(cf, key);
+ }
+
+ Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
+ return Rollback(cf, key);
+ }
+
+ Status MergeCF(uint32_t cf, const Slice& key,
+ const Slice& /*val*/) override {
+ if (rollback_merge_operands_) {
+ return Rollback(cf, key);
+ } else {
+ return Status::OK();
+ }
+ }
+
+ // Recovered batches do not contain 2PC markers.
+ Status MarkNoop(bool) override { return Status::InvalidArgument(); }
+ Status MarkBeginPrepare(bool) override {
+ return Status::InvalidArgument();
+ }
+ Status MarkEndPrepare(const Slice&) override {
+ return Status::InvalidArgument();
+ }
+ Status MarkCommit(const Slice&) override {
+ return Status::InvalidArgument();
+ }
+ Status MarkRollback(const Slice&) override {
+ return Status::InvalidArgument();
+ }
+ } rollback_handler(db_impl_, last_visible_txn, &rollback_batch,
+ *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
+ txn_db_options_.rollback_merge_operands);
+
+ auto s = batch->Iterate(&rollback_handler);
+ if (!s.ok()) {
+ return s;
+ }
+
+ // The Rollback marker will be used as a batch separator
+ WriteBatchInternal::MarkRollback(&rollback_batch, rtxn->name_);
+
+ const uint64_t kNoLogRef = 0;
+ const bool kDisableMemtable = true;
+ const size_t kOneBatch = 1;
+ uint64_t seq_used = kMaxSequenceNumber;
+ s = db_impl_->WriteImpl(w_options, &rollback_batch, nullptr, nullptr,
+ kNoLogRef, !kDisableMemtable, &seq_used, kOneBatch);
+ if (!s.ok()) {
+ return s;
+ }
+
+ // If two_write_queues, we must manually release the sequence number to
+ // readers.
+ if (db_impl_->immutable_db_options().two_write_queues) {
+ db_impl_->SetLastPublishedSequence(seq_used);
+ }
+ }
+
+ return Status::OK();
+}
+
+Status WriteUnpreparedTxnDB::Initialize(
+ const std::vector<size_t>& compaction_enabled_cf_indices,
+ const std::vector<ColumnFamilyHandle*>& handles) {
+ // TODO(lth): Reduce code duplication in this function.
+ auto dbimpl = static_cast_with_check<DBImpl, DB>(GetRootDB());
+ assert(dbimpl != nullptr);
+
+ db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
+ // A callback to commit a single sub-batch
+ class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
+ public:
+ explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
+ : db_(db) {}
+ Status Callback(SequenceNumber commit_seq,
+ bool is_mem_disabled __attribute__((__unused__)), uint64_t,
+ size_t /*index*/, size_t /*total*/) override {
+ assert(!is_mem_disabled);
+ db_->AddCommitted(commit_seq, commit_seq);
+ return Status::OK();
+ }
+
+ private:
+ WritePreparedTxnDB* db_;
+ };
+ db_impl_->SetRecoverableStatePreReleaseCallback(
+ new CommitSubBatchPreReleaseCallback(this));
+
+ // PessimisticTransactionDB::Initialize
+ for (auto cf_ptr : handles) {
+ AddColumnFamily(cf_ptr);
+ }
+ // Verify cf options
+ for (auto handle : handles) {
+ ColumnFamilyDescriptor cfd;
+ Status s = handle->GetDescriptor(&cfd);
+ if (!s.ok()) {
+ return s;
+ }
+ s = VerifyCFOptions(cfd.options);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ // Re-enable compaction for the column families that initially had
+ // compaction enabled.
+ std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
+ compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
+ for (auto index : compaction_enabled_cf_indices) {
+ compaction_enabled_cf_handles.push_back(handles[index]);
+ }
+
+ // create 'real' transactions from recovered shell transactions
+ auto rtxns = dbimpl->recovered_transactions();
+ std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
+ for (auto rtxn : rtxns) {
+ auto recovered_trx = rtxn.second;
+ assert(recovered_trx);
+ assert(recovered_trx->batches_.size() >= 1);
+ assert(recovered_trx->name_.length());
+
+ // We can only rollback transactions after AdvanceMaxEvictedSeq is called,
+ // but AddPrepared must occur before AdvanceMaxEvictedSeq, which is why
+ // two iterations is required.
+ if (recovered_trx->unprepared_) {
+ continue;
+ }
+
+ WriteOptions w_options;
+ w_options.sync = true;
+ TransactionOptions t_options;
+
+ auto first_log_number = recovered_trx->batches_.begin()->second.log_number_;
+ auto first_seq = recovered_trx->batches_.begin()->first;
+ auto last_prepare_batch_cnt =
+ recovered_trx->batches_.begin()->second.batch_cnt_;
+
+ Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
+ assert(real_trx);
+ auto wupt =
+ static_cast_with_check<WriteUnpreparedTxn, Transaction>(real_trx);
+ wupt->recovered_txn_ = true;
+
+ real_trx->SetLogNumber(first_log_number);
+ real_trx->SetId(first_seq);
+ Status s = real_trx->SetName(recovered_trx->name_);
+ if (!s.ok()) {
+ return s;
+ }
+ wupt->prepare_batch_cnt_ = last_prepare_batch_cnt;
+
+ for (auto batch : recovered_trx->batches_) {
+ const auto& seq = batch.first;
+ const auto& batch_info = batch.second;
+ auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
+ assert(batch_info.log_number_);
+
+ ordered_seq_cnt[seq] = cnt;
+ assert(wupt->unprep_seqs_.count(seq) == 0);
+ wupt->unprep_seqs_[seq] = cnt;
+
+ s = wupt->RebuildFromWriteBatch(batch_info.batch_);
+ assert(s.ok());
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ const bool kClear = true;
+ wupt->InitWriteBatch(kClear);
+
+ real_trx->SetState(Transaction::PREPARED);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+ // AddPrepared must be called in order
+ for (auto seq_cnt : ordered_seq_cnt) {
+ auto seq = seq_cnt.first;
+ auto cnt = seq_cnt.second;
+ for (size_t i = 0; i < cnt; i++) {
+ AddPrepared(seq + i);
+ }
+ }
+
+ SequenceNumber prev_max = max_evicted_seq_;
+ SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
+ AdvanceMaxEvictedSeq(prev_max, last_seq);
+ // Create a gap between max and the next snapshot. This simplifies the logic
+ // in IsInSnapshot by not having to consider the special case of max ==
+ // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
+ if (last_seq) {
+ db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
+ db_impl_->versions_->SetLastSequence(last_seq + 1);
+ db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
+ }
+
+ Status s;
+ // Rollback unprepared transactions.
+ for (auto rtxn : rtxns) {
+ auto recovered_trx = rtxn.second;
+ if (recovered_trx->unprepared_) {
+ s = RollbackRecoveredTransaction(recovered_trx);
+ if (!s.ok()) {
+ return s;
+ }
+ continue;
+ }
+ }
+
+ if (s.ok()) {
+ dbimpl->DeleteAllRecoveredTransactions();
+
+ // Compaction should start only after max_evicted_seq_ is set AND recovered
+ // transactions are either added to PrepareHeap or rolled back.
+ s = EnableAutoCompaction(compaction_enabled_cf_handles);
+ }
+
+ return s;
+}
+
+Transaction* WriteUnpreparedTxnDB::BeginTransaction(
+ const WriteOptions& write_options, const TransactionOptions& txn_options,
+ Transaction* old_txn) {
+ if (old_txn != nullptr) {
+ ReinitializeTransaction(old_txn, write_options, txn_options);
+ return old_txn;
+ } else {
+ return new WriteUnpreparedTxn(this, write_options, txn_options);
+ }
+}
+
+// Struct to hold ownership of snapshot and read callback for iterator cleanup.
+struct WriteUnpreparedTxnDB::IteratorState {
+ IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
+ std::shared_ptr<ManagedSnapshot> s,
+ SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn)
+ : callback(txn_db, sequence, min_uncommitted, txn->unprep_seqs_,
+ kBackedByDBSnapshot),
+ snapshot(s) {}
+ SequenceNumber MaxVisibleSeq() { return callback.max_visible_seq(); }
+
+ WriteUnpreparedTxnReadCallback callback;
+ std::shared_ptr<ManagedSnapshot> snapshot;
+};
+
+namespace {
+static void CleanupWriteUnpreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
+ delete reinterpret_cast<WriteUnpreparedTxnDB::IteratorState*>(arg1);
+}
+} // anonymous namespace
+
+Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
+ ColumnFamilyHandle* column_family,
+ WriteUnpreparedTxn* txn) {
+ // TODO(lth): Refactor so that this logic is shared with WritePrepared.
+ constexpr bool ALLOW_BLOB = true;
+ constexpr bool ALLOW_REFRESH = true;
+ std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
+ SequenceNumber snapshot_seq = kMaxSequenceNumber;
+ SequenceNumber min_uncommitted = 0;
+
+ // Currently, the Prev() iterator logic does not work well without snapshot
+ // validation. The logic simply iterates through values of a key in
+ // ascending seqno order, stopping at the first non-visible value and
+ // returning the last visible value.
+ //
+ // For example, if snapshot sequence is 3, and we have the following keys:
+ // foo: v1 1
+ // foo: v2 2
+ // foo: v3 3
+ // foo: v4 4
+ // foo: v5 5
+ //
+ // Then 1, 2, 3 will be visible, but 4 will be non-visible, so we return v3,
+ // which is the last visible value.
+ //
+ // For unprepared transactions, if we have snap_seq = 3, but the current
+ // transaction has unprep_seq 5, then returning the first non-visible value
+ // would be incorrect, as we should return v5, and not v3. The problem is that
+ // there are committed values at snapshot_seq < commit_seq < unprep_seq.
+ //
+ // Snapshot validation can prevent this problem by ensuring that no committed
+ // values exist at snapshot_seq < commit_seq, and thus any value with a
+ // sequence number greater than snapshot_seq must be unprepared values. For
+ // example, if the transaction had a snapshot at 3, then snapshot validation
+ // would be performed during the Put(v5) call. It would find v4, and the Put
+ // would fail with snapshot validation failure.
+ //
+ // TODO(lth): Improve Prev() logic to continue iterating until
+ // max_visible_seq, and then return the last visible value, so that this
+ // restriction can be lifted.
+ const Snapshot* snapshot = nullptr;
+ if (options.snapshot == nullptr) {
+ snapshot = GetSnapshot();
+ own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
+ } else {
+ snapshot = options.snapshot;
+ }
+
+ snapshot_seq = snapshot->GetSequenceNumber();
+ assert(snapshot_seq != kMaxSequenceNumber);
+ // Iteration is safe as long as largest_validated_seq <= snapshot_seq. We are
+ // guaranteed that for keys that were modified by this transaction (and thus
+ // might have unprepared values), no committed values exist at
+ // largest_validated_seq < commit_seq (or the contrapositive: any committed
+ // value must exist at commit_seq <= largest_validated_seq). This implies
+ // that commit_seq <= largest_validated_seq <= snapshot_seq or commit_seq <=
+ // snapshot_seq. As explained above, the problem with Prev() only happens when
+ // snapshot_seq < commit_seq.
+ //
+ // For keys that were not modified by this transaction, largest_validated_seq_
+ // is meaningless, and Prev() should just work with the existing visibility
+ // logic.
+ if (txn->largest_validated_seq_ > snapshot->GetSequenceNumber() &&
+ !txn->unprep_seqs_.empty()) {
+ ROCKS_LOG_ERROR(info_log_,
+ "WriteUnprepared iterator creation failed since the "
+ "transaction has performed unvalidated writes");
+ return nullptr;
+ }
+ min_uncommitted =
+ static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
+ ->min_uncommitted_;
+
+ auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
+ auto* state =
+ new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
+ auto* db_iter =
+ db_impl_->NewIteratorImpl(options, cfd, state->MaxVisibleSeq(),
+ &state->callback, !ALLOW_BLOB, !ALLOW_REFRESH);
+ db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
+ return db_iter;
+}
+
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE