summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/transactions/write_unprepared_txn.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/utilities/transactions/write_unprepared_txn.h341
1 files changed, 341 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/transactions/write_unprepared_txn.h b/src/rocksdb/utilities/transactions/write_unprepared_txn.h
new file mode 100644
index 000000000..30c8f4c55
--- /dev/null
+++ b/src/rocksdb/utilities/transactions/write_unprepared_txn.h
@@ -0,0 +1,341 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <set>
+
+#include "utilities/transactions/write_prepared_txn.h"
+#include "utilities/transactions/write_unprepared_txn_db.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class WriteUnpreparedTxnDB;
+class WriteUnpreparedTxn;
+
+// WriteUnprepared transactions needs to be able to read their own uncommitted
+// writes, and supporting this requires some careful consideration. Because
+// writes in the current transaction may be flushed to DB already, we cannot
+// rely on the contents of WriteBatchWithIndex to determine whether a key should
+// be visible or not, so we have to remember to check the DB for any uncommitted
+// keys that should be visible to us. First, we will need to change the seek to
+// snapshot logic, to seek to max_visible_seq = max(snap_seq, max_unprep_seq).
+// Any key greater than max_visible_seq should not be visible because they
+// cannot be unprepared by the current transaction and they are not in its
+// snapshot.
+//
+// When we seek to max_visible_seq, one of these cases will happen:
+// 1. We hit a unprepared key from the current transaction.
+// 2. We hit a unprepared key from the another transaction.
+// 3. We hit a committed key with snap_seq < seq < max_unprep_seq.
+// 4. We hit a committed key with seq <= snap_seq.
+//
+// IsVisibleFullCheck handles all cases correctly.
+//
+// Other notes:
+// Note that max_visible_seq is only calculated once at iterator construction
+// time, meaning if the same transaction is adding more unprep seqs through
+// writes during iteration, these newer writes may not be visible. This is not a
+// problem for MySQL though because it avoids modifying the index as it is
+// scanning through it to avoid the Halloween Problem. Instead, it scans the
+// index once up front, and modifies based on a temporary copy.
+//
+// In DBIter, there is a "reseek" optimization if the iterator skips over too
+// many keys. However, this assumes that the reseek seeks exactly to the
+// required key. In write unprepared, even after seeking directly to
+// max_visible_seq, some iteration may be required before hitting a visible key,
+// and special precautions must be taken to avoid performing another reseek,
+// leading to an infinite loop.
+//
+class WriteUnpreparedTxnReadCallback : public ReadCallback {
+ public:
+ WriteUnpreparedTxnReadCallback(
+ WritePreparedTxnDB* db, SequenceNumber snapshot,
+ SequenceNumber min_uncommitted,
+ const std::map<SequenceNumber, size_t>& unprep_seqs,
+ SnapshotBackup backed_by_snapshot)
+ // Pass our last uncommitted seq as the snapshot to the parent class to
+ // ensure that the parent will not prematurely filter out own writes. We
+ // will do the exact comparison against snapshots in IsVisibleFullCheck
+ // override.
+ : ReadCallback(CalcMaxVisibleSeq(unprep_seqs, snapshot), min_uncommitted),
+ db_(db),
+ unprep_seqs_(unprep_seqs),
+ wup_snapshot_(snapshot),
+ backed_by_snapshot_(backed_by_snapshot) {
+ (void)backed_by_snapshot_; // to silence unused private field warning
+ }
+
+ virtual ~WriteUnpreparedTxnReadCallback() {
+ // If it is not backed by snapshot, the caller must check validity
+ assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
+ }
+
+ virtual bool IsVisibleFullCheck(SequenceNumber seq) override;
+
+ inline bool valid() {
+ valid_checked_ = true;
+ return snap_released_ == false;
+ }
+
+ void Refresh(SequenceNumber seq) override {
+ max_visible_seq_ = std::max(max_visible_seq_, seq);
+ wup_snapshot_ = seq;
+ }
+
+ static SequenceNumber CalcMaxVisibleSeq(
+ const std::map<SequenceNumber, size_t>& unprep_seqs,
+ SequenceNumber snapshot_seq) {
+ SequenceNumber max_unprepared = 0;
+ if (unprep_seqs.size()) {
+ max_unprepared =
+ unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
+ }
+ return std::max(max_unprepared, snapshot_seq);
+ }
+
+ private:
+ WritePreparedTxnDB* db_;
+ const std::map<SequenceNumber, size_t>& unprep_seqs_;
+ SequenceNumber wup_snapshot_;
+ // Whether max_visible_seq_ is backed by a snapshot
+ const SnapshotBackup backed_by_snapshot_;
+ bool snap_released_ = false;
+ // Safety check to ensure that the caller has checked invalid statuses
+ bool valid_checked_ = false;
+};
+
+class WriteUnpreparedTxn : public WritePreparedTxn {
+ public:
+ WriteUnpreparedTxn(WriteUnpreparedTxnDB* db,
+ const WriteOptions& write_options,
+ const TransactionOptions& txn_options);
+
+ virtual ~WriteUnpreparedTxn();
+
+ using TransactionBaseImpl::Put;
+ virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
+ const Slice& value,
+ const bool assume_tracked = false) override;
+ virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
+ const SliceParts& value,
+ const bool assume_tracked = false) override;
+
+ using TransactionBaseImpl::Merge;
+ virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
+ const Slice& value,
+ const bool assume_tracked = false) override;
+
+ using TransactionBaseImpl::Delete;
+ virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
+ const bool assume_tracked = false) override;
+ virtual Status Delete(ColumnFamilyHandle* column_family,
+ const SliceParts& key,
+ const bool assume_tracked = false) override;
+
+ using TransactionBaseImpl::SingleDelete;
+ virtual Status SingleDelete(ColumnFamilyHandle* column_family,
+ const Slice& key,
+ const bool assume_tracked = false) override;
+ virtual Status SingleDelete(ColumnFamilyHandle* column_family,
+ const SliceParts& key,
+ const bool assume_tracked = false) override;
+
+ // In WriteUnprepared, untracked writes will break snapshot validation logic.
+ // Snapshot validation will only check the largest sequence number of a key to
+ // see if it was committed or not. However, an untracked unprepared write will
+ // hide smaller committed sequence numbers.
+ //
+ // TODO(lth): Investigate whether it is worth having snapshot validation
+ // validate all values larger than snap_seq. Otherwise, we should return
+ // Status::NotSupported for untracked writes.
+
+ virtual Status RebuildFromWriteBatch(WriteBatch*) override;
+
+ virtual uint64_t GetLastLogNumber() const override {
+ return last_log_number_;
+ }
+
+ void RemoveActiveIterator(Iterator* iter) {
+ active_iterators_.erase(
+ std::remove(active_iterators_.begin(), active_iterators_.end(), iter),
+ active_iterators_.end());
+ }
+
+ protected:
+ void Initialize(const TransactionOptions& txn_options) override;
+
+ Status PrepareInternal() override;
+
+ Status CommitWithoutPrepareInternal() override;
+ Status CommitInternal() override;
+
+ Status RollbackInternal() override;
+
+ void Clear() override;
+
+ void SetSavePoint() override;
+ Status RollbackToSavePoint() override;
+ Status PopSavePoint() override;
+
+ // Get and GetIterator needs to be overridden so that a ReadCallback to
+ // handle read-your-own-write is used.
+ using Transaction::Get;
+ virtual Status Get(const ReadOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value) override;
+
+ using Transaction::MultiGet;
+ virtual void MultiGet(const ReadOptions& options,
+ ColumnFamilyHandle* column_family,
+ const size_t num_keys, const Slice* keys,
+ PinnableSlice* values, Status* statuses,
+ const bool sorted_input = false) override;
+
+ using Transaction::GetIterator;
+ virtual Iterator* GetIterator(const ReadOptions& options) override;
+ virtual Iterator* GetIterator(const ReadOptions& options,
+ ColumnFamilyHandle* column_family) override;
+
+ virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family,
+ const Slice& key,
+ SequenceNumber* tracked_at_seq) override;
+
+ private:
+ friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test;
+ friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
+ friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test;
+ friend class WriteUnpreparedTxnDB;
+
+ const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers();
+ Status WriteRollbackKeys(const TransactionKeyMap& tracked_keys,
+ WriteBatchWithIndex* rollback_batch,
+ ReadCallback* callback, const ReadOptions& roptions);
+
+ Status MaybeFlushWriteBatchToDB();
+ Status FlushWriteBatchToDB(bool prepared);
+ Status FlushWriteBatchToDBInternal(bool prepared);
+ Status FlushWriteBatchWithSavePointToDB();
+ Status RollbackToSavePointInternal();
+ Status HandleWrite(std::function<Status()> do_write);
+
+ // For write unprepared, we check on every writebatch append to see if
+ // write_batch_flush_threshold_ has been exceeded, and then call
+ // FlushWriteBatchToDB if so. This logic is encapsulated in
+ // MaybeFlushWriteBatchToDB.
+ int64_t write_batch_flush_threshold_;
+ WriteUnpreparedTxnDB* wupt_db_;
+
+ // Ordered list of unprep_seq sequence numbers that we have already written
+ // to DB.
+ //
+ // This maps unprep_seq => prepare_batch_cnt for each unprepared batch
+ // written by this transaction.
+ //
+ // Note that this contains both prepared and unprepared batches, since they
+ // are treated similarily in prepare heap/commit map, so it simplifies the
+ // commit callbacks.
+ std::map<SequenceNumber, size_t> unprep_seqs_;
+
+ uint64_t last_log_number_;
+
+ // Recovered transactions have tracked_keys_ populated, but are not actually
+ // locked for efficiency reasons. For recovered transactions, skip unlocking
+ // keys when transaction ends.
+ bool recovered_txn_;
+
+ // Track the largest sequence number at which we performed snapshot
+ // validation. If snapshot validation was skipped because no snapshot was set,
+ // then this is set to GetLastPublishedSequence. This value is useful because
+ // it means that for keys that have unprepared seqnos, we can guarantee that
+ // no committed keys by other transactions can exist between
+ // largest_validated_seq_ and max_unprep_seq. See
+ // WriteUnpreparedTxnDB::NewIterator for an explanation for why this is
+ // necessary for iterator Prev().
+ //
+ // Currently this value only increases during the lifetime of a transaction,
+ // but in some cases, we should be able to restore the previously largest
+ // value when calling RollbackToSavepoint.
+ SequenceNumber largest_validated_seq_;
+
+ using KeySet = std::unordered_map<uint32_t, std::vector<std::string>>;
+ struct SavePoint {
+ // Record of unprep_seqs_ at this savepoint. The set of unprep_seq is
+ // used during RollbackToSavepoint to determine visibility when restoring
+ // old values.
+ //
+ // TODO(lth): Since all unprep_seqs_ sets further down the stack must be
+ // subsets, this can potentially be deduplicated by just storing set
+ // difference. Investigate if this is worth it.
+ std::map<SequenceNumber, size_t> unprep_seqs_;
+
+ // This snapshot will be used to read keys at this savepoint if we call
+ // RollbackToSavePoint.
+ std::unique_ptr<ManagedSnapshot> snapshot_;
+
+ SavePoint(const std::map<SequenceNumber, size_t>& seqs,
+ ManagedSnapshot* snapshot)
+ : unprep_seqs_(seqs), snapshot_(snapshot){};
+ };
+
+ // We have 3 data structures holding savepoint information:
+ // 1. TransactionBaseImpl::save_points_
+ // 2. WriteUnpreparedTxn::flushed_save_points_
+ // 3. WriteUnpreparecTxn::unflushed_save_points_
+ //
+ // TransactionBaseImpl::save_points_ holds information about all write
+ // batches, including the current in-memory write_batch_, or unprepared
+ // batches that have been written out. Its responsibility is just to track
+ // which keys have been modified in every savepoint.
+ //
+ // WriteUnpreparedTxn::flushed_save_points_ holds information about savepoints
+ // set on unprepared batches that have already flushed. It holds the snapshot
+ // and unprep_seqs at that savepoint, so that the rollback process can
+ // determine which keys were visible at that point in time.
+ //
+ // WriteUnpreparecTxn::unflushed_save_points_ holds information about
+ // savepoints on the current in-memory write_batch_. It simply records the
+ // size of the write batch at every savepoint.
+ //
+ // TODO(lth): Remove the redundancy between save_point_boundaries_ and
+ // write_batch_.save_points_.
+ //
+ // Based on this information, here are some invariants:
+ // size(unflushed_save_points_) = size(write_batch_.save_points_)
+ // size(flushed_save_points_) + size(unflushed_save_points_)
+ // = size(save_points_)
+ //
+ std::unique_ptr<autovector<WriteUnpreparedTxn::SavePoint>>
+ flushed_save_points_;
+ std::unique_ptr<autovector<size_t>> unflushed_save_points_;
+
+ // It is currently unsafe to flush a write batch if there are active iterators
+ // created from this transaction. This is because we use WriteBatchWithIndex
+ // to do merging reads from the DB and the write batch. If we flush the write
+ // batch, it is possible that the delta iterator on the iterator will point to
+ // invalid memory.
+ std::vector<Iterator*> active_iterators_;
+
+ // Untracked keys that we have to rollback.
+ //
+ // TODO(lth): Currently we we do not record untracked keys per-savepoint.
+ // This means that when rolling back to savepoints, we have to check all
+ // keys in the current transaction for rollback. Note that this is only
+ // inefficient, but still correct because we take a snapshot at every
+ // savepoint, and we will use that snapshot to construct the rollback batch.
+ // The rollback batch will then contain a reissue of the same marker.
+ //
+ // A more optimal solution would be to only check keys changed since the
+ // last savepoint. Also, it may make sense to merge this into tracked_keys_
+ // and differentiate between tracked but not locked keys to avoid having two
+ // very similar data structures.
+ KeySet untracked_keys_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ROCKSDB_LITE