summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db_stress_tool/multi_ops_txns_stress.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/db_stress_tool/multi_ops_txns_stress.h444
1 files changed, 444 insertions, 0 deletions
diff --git a/src/rocksdb/db_stress_tool/multi_ops_txns_stress.h b/src/rocksdb/db_stress_tool/multi_ops_txns_stress.h
new file mode 100644
index 000000000..7463d05d7
--- /dev/null
+++ b/src/rocksdb/db_stress_tool/multi_ops_txns_stress.h
@@ -0,0 +1,444 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifdef GFLAGS
+#include "db_stress_tool/db_stress_common.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// This file defines MultiOpsTxnsStress so that we can stress test RocksDB
+// transactions on a simple, emulated relational table.
+//
+// The record format is similar to the example found at
+// https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format.
+//
+// The table is created by
+// ```
+// create table t1 (
+// a int primary key,
+// b int,
+// c int,
+// key(c),
+// )
+// ```
+//
+// (For simplicity, we use uint32_t for int here.)
+//
+// For this table, there is a primary index using `a`, as well as a secondary
+// index using `c` and `a`.
+//
+// Primary key format:
+// | index id | M(a) |
+// Primary index value:
+// | b | c |
+// M(a) represents the big-endian format of a.
+//
+// Secondary key format:
+// | index id | M(c) | M(a) |
+// Secondary index value:
+// | crc32 |
+// Similarly to M(a), M(c) is the big-endian format of c.
+//
+// The in-memory representation of a record is defined in class
+// MultiOpsTxnsStress:Record that includes a number of helper methods to
+// encode/decode primary index keys, primary index values, secondary index keys,
+// secondary index values, etc.
+//
+// Sometimes primary index and secondary index reside on different column
+// families, but sometimes they colocate in the same column family. Current
+// implementation puts them in the same (default) column family, and this is
+// subject to future change if we find it interesting to test the other case.
+//
+// Class MultiOpsTxnsStressTest has the following transactions for testing.
+//
+// 1. Primary key update
+// UPDATE t1 SET a = 3 WHERE a = 2;
+// ```
+// tx->GetForUpdate(primary key a=2)
+// tx->GetForUpdate(primary key a=3)
+// tx->Delete(primary key a=2)
+// tx->Put(primary key a=3, value)
+// tx->batch->SingleDelete(secondary key a=2)
+// tx->batch->Put(secondary key a=3, value)
+// tx->Prepare()
+// Tx->Commit()
+// ```
+//
+// 2. Secondary key update
+// UPDATE t1 SET c = 3 WHERE c = 2;
+// ```
+// iter->Seek(secondary key)
+// // Get corresponding primary key value(s) from iterator
+// tx->GetForUpdate(primary key)
+// tx->Put(primary key, value c=3)
+// tx->batch->SingleDelete(secondary key c=2)
+// tx->batch->Put(secondary key c=3)
+// tx->Prepare()
+// tx->Commit()
+// ```
+//
+// 3. Primary index value update
+// UPDATE t1 SET b = b + 1 WHERE a = 2;
+// ```
+// tx->GetForUpdate(primary key a=2)
+// tx->Put(primary key a=2, value b=b+1)
+// tx->Prepare()
+// tx->Commit()
+// ```
+//
+// 4. Point lookup
+// SELECT * FROM t1 WHERE a = 3;
+// ```
+// tx->Get(primary key a=3)
+// tx->Commit()
+// ```
+//
+// 5. Range scan
+// SELECT * FROM t1 WHERE c = 2;
+// ```
+// it = tx->GetIterator()
+// it->Seek(secondary key c=2)
+// tx->Commit()
+// ```
+
+class MultiOpsTxnsStressTest : public StressTest {
+ public:
+ class Record {
+ public:
+ static constexpr uint32_t kMetadataPrefix = 0;
+ static constexpr uint32_t kPrimaryIndexId = 1;
+ static constexpr uint32_t kSecondaryIndexId = 2;
+
+ static constexpr size_t kPrimaryIndexEntrySize = 8 + 8;
+ static constexpr size_t kSecondaryIndexEntrySize = 12 + 4;
+
+ static_assert(kPrimaryIndexId < kSecondaryIndexId,
+ "kPrimaryIndexId must be smaller than kSecondaryIndexId");
+
+ static_assert(sizeof(kPrimaryIndexId) == sizeof(uint32_t),
+ "kPrimaryIndexId must be 4 bytes");
+ static_assert(sizeof(kSecondaryIndexId) == sizeof(uint32_t),
+ "kSecondaryIndexId must be 4 bytes");
+
+ // Used for generating search key to probe primary index.
+ static std::string EncodePrimaryKey(uint32_t a);
+ // Used for generating search prefix to probe secondary index.
+ static std::string EncodeSecondaryKey(uint32_t c);
+ // Used for generating search key to probe secondary index.
+ static std::string EncodeSecondaryKey(uint32_t c, uint32_t a);
+
+ static std::tuple<Status, uint32_t, uint32_t> DecodePrimaryIndexValue(
+ Slice primary_index_value);
+
+ static std::pair<Status, uint32_t> DecodeSecondaryIndexValue(
+ Slice secondary_index_value);
+
+ Record() = default;
+ Record(uint32_t _a, uint32_t _b, uint32_t _c) : a_(_a), b_(_b), c_(_c) {}
+
+ bool operator==(const Record& other) const {
+ return a_ == other.a_ && b_ == other.b_ && c_ == other.c_;
+ }
+
+ bool operator!=(const Record& other) const { return !(*this == other); }
+
+ std::pair<std::string, std::string> EncodePrimaryIndexEntry() const;
+
+ std::string EncodePrimaryKey() const;
+
+ std::string EncodePrimaryIndexValue() const;
+
+ std::pair<std::string, std::string> EncodeSecondaryIndexEntry() const;
+
+ std::string EncodeSecondaryKey() const;
+
+ Status DecodePrimaryIndexEntry(Slice primary_index_key,
+ Slice primary_index_value);
+
+ Status DecodeSecondaryIndexEntry(Slice secondary_index_key,
+ Slice secondary_index_value);
+
+ uint32_t a_value() const { return a_; }
+ uint32_t b_value() const { return b_; }
+ uint32_t c_value() const { return c_; }
+
+ void SetA(uint32_t _a) { a_ = _a; }
+ void SetB(uint32_t _b) { b_ = _b; }
+ void SetC(uint32_t _c) { c_ = _c; }
+
+ std::string ToString() const {
+ std::string ret("(");
+ ret.append(std::to_string(a_));
+ ret.append(",");
+ ret.append(std::to_string(b_));
+ ret.append(",");
+ ret.append(std::to_string(c_));
+ ret.append(")");
+ return ret;
+ }
+
+ private:
+ friend class InvariantChecker;
+
+ uint32_t a_{0};
+ uint32_t b_{0};
+ uint32_t c_{0};
+ };
+
+ MultiOpsTxnsStressTest() {}
+
+ ~MultiOpsTxnsStressTest() override {}
+
+ void FinishInitDb(SharedState*) override;
+
+ void ReopenAndPreloadDbIfNeeded(SharedState* shared);
+
+ bool IsStateTracked() const override { return false; }
+
+ Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) override;
+
+ std::vector<Status> TestMultiGet(
+ ThreadState* thread, const ReadOptions& read_opts,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) override;
+
+ Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) override;
+
+ // Given a key K, this creates an iterator which scans to K and then
+ // does a random sequence of Next/Prev operations.
+ Status TestIterate(ThreadState* thread, const ReadOptions& read_opts,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) override;
+
+ Status TestPut(ThreadState* thread, WriteOptions& write_opts,
+ const ReadOptions& read_opts, const std::vector<int>& cf_ids,
+ const std::vector<int64_t>& keys, char (&value)[100]) override;
+
+ Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) override;
+
+ Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) override;
+
+ void TestIngestExternalFile(ThreadState* thread,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) override;
+
+ void TestCompactRange(ThreadState* thread, int64_t rand_key,
+ const Slice& start_key,
+ ColumnFamilyHandle* column_family) override;
+
+ Status TestBackupRestore(ThreadState* thread,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) override;
+
+ Status TestCheckpoint(ThreadState* thread,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) override;
+
+#ifndef ROCKSDB_LITE
+ Status TestApproximateSize(ThreadState* thread, uint64_t iteration,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) override;
+#endif // !ROCKSDB_LITE
+
+ Status TestCustomOperations(
+ ThreadState* thread,
+ const std::vector<int>& rand_column_families) override;
+
+ void RegisterAdditionalListeners() override;
+
+#ifndef ROCKSDB_LITE
+ void PrepareTxnDbOptions(SharedState* /*shared*/,
+ TransactionDBOptions& txn_db_opts) override;
+#endif // !ROCKSDB_LITE
+
+ Status PrimaryKeyUpdateTxn(ThreadState* thread, uint32_t old_a,
+ uint32_t old_a_pos, uint32_t new_a);
+
+ Status SecondaryKeyUpdateTxn(ThreadState* thread, uint32_t old_c,
+ uint32_t old_c_pos, uint32_t new_c);
+
+ Status UpdatePrimaryIndexValueTxn(ThreadState* thread, uint32_t a,
+ uint32_t b_delta);
+
+ Status PointLookupTxn(ThreadState* thread, ReadOptions ropts, uint32_t a);
+
+ Status RangeScanTxn(ThreadState* thread, ReadOptions ropts, uint32_t c);
+
+ void VerifyDb(ThreadState* thread) const override;
+
+ void ContinuouslyVerifyDb(ThreadState* thread) const override {
+ VerifyDb(thread);
+ }
+
+ void VerifyPkSkFast(int job_id);
+
+ protected:
+ class Counter {
+ public:
+ uint64_t Next() { return value_.fetch_add(1); }
+
+ private:
+ std::atomic<uint64_t> value_ = Env::Default()->NowNanos();
+ };
+
+ using KeySet = std::set<uint32_t>;
+ class KeyGenerator {
+ public:
+ explicit KeyGenerator(uint32_t s, uint32_t low, uint32_t high,
+ KeySet&& existing_uniq, KeySet&& non_existing_uniq)
+ : rand_(s),
+ low_(low),
+ high_(high),
+ existing_uniq_(std::move(existing_uniq)),
+ non_existing_uniq_(std::move(non_existing_uniq)) {}
+ ~KeyGenerator() {
+ assert(!existing_uniq_.empty());
+ assert(!non_existing_uniq_.empty());
+ }
+ void FinishInit();
+
+ std::pair<uint32_t, uint32_t> ChooseExisting();
+ void Replace(uint32_t old_val, uint32_t old_pos, uint32_t new_val);
+ uint32_t Allocate();
+ void UndoAllocation(uint32_t new_val);
+
+ std::string ToString() const {
+ std::ostringstream oss;
+ oss << "[" << low_ << ", " << high_ << "): " << existing_.size()
+ << " elements, " << existing_uniq_.size() << " unique values, "
+ << non_existing_uniq_.size() << " unique non-existing values";
+ return oss.str();
+ }
+
+ private:
+ Random rand_;
+ uint32_t low_ = 0;
+ uint32_t high_ = 0;
+ std::vector<uint32_t> existing_{};
+ KeySet existing_uniq_{};
+ KeySet non_existing_uniq_{};
+ bool initialized_ = false;
+ };
+
+ // Return <a, pos>
+ std::pair<uint32_t, uint32_t> ChooseExistingA(ThreadState* thread);
+
+ uint32_t GenerateNextA(ThreadState* thread);
+
+ // Return <c, pos>
+ std::pair<uint32_t, uint32_t> ChooseExistingC(ThreadState* thread);
+
+ uint32_t GenerateNextC(ThreadState* thread);
+
+#ifndef ROCKSDB_LITE
+ // Randomly commit or rollback `txn`
+ void ProcessRecoveredPreparedTxnsHelper(Transaction* txn,
+ SharedState*) override;
+
+ // Some applications, e.g. MyRocks writes a KV pair to the database via
+ // commit-time-write-batch (ctwb) in additional to the transaction's regular
+ // write batch. The key is usually constant representing some system
+ // metadata, while the value is monoticailly increasing which represents the
+ // actual value of the metadata. Method WriteToCommitTimeWriteBatch()
+ // emulates this scenario.
+ Status WriteToCommitTimeWriteBatch(Transaction& txn);
+
+ Status CommitAndCreateTimestampedSnapshotIfNeeded(ThreadState* thread,
+ Transaction& txn);
+
+ void SetupSnapshot(ThreadState* thread, ReadOptions& read_opts,
+ Transaction& txn,
+ std::shared_ptr<const Snapshot>& snapshot);
+#endif //! ROCKSDB_LITE
+
+ std::vector<std::unique_ptr<KeyGenerator>> key_gen_for_a_;
+ std::vector<std::unique_ptr<KeyGenerator>> key_gen_for_c_;
+
+ Counter counter_{};
+
+ private:
+ struct KeySpaces {
+ uint32_t lb_a = 0;
+ uint32_t ub_a = 0;
+ uint32_t lb_c = 0;
+ uint32_t ub_c = 0;
+
+ explicit KeySpaces() = default;
+ explicit KeySpaces(uint32_t _lb_a, uint32_t _ub_a, uint32_t _lb_c,
+ uint32_t _ub_c)
+ : lb_a(_lb_a), ub_a(_ub_a), lb_c(_lb_c), ub_c(_ub_c) {}
+
+ std::string EncodeTo() const;
+ bool DecodeFrom(Slice data);
+ };
+
+ void PersistKeySpacesDesc(const std::string& key_spaces_path, uint32_t lb_a,
+ uint32_t ub_a, uint32_t lb_c, uint32_t ub_c);
+
+ KeySpaces ReadKeySpacesDesc(const std::string& key_spaces_path);
+
+ void PreloadDb(SharedState* shared, int threads, uint32_t lb_a, uint32_t ub_a,
+ uint32_t lb_c, uint32_t ub_c);
+
+ void ScanExistingDb(SharedState* shared, int threads);
+};
+
+class InvariantChecker {
+ public:
+ static_assert(sizeof(MultiOpsTxnsStressTest::Record().a_) == sizeof(uint32_t),
+ "MultiOpsTxnsStressTest::Record::a_ must be 4 bytes");
+ static_assert(sizeof(MultiOpsTxnsStressTest::Record().b_) == sizeof(uint32_t),
+ "MultiOpsTxnsStressTest::Record::b_ must be 4 bytes");
+ static_assert(sizeof(MultiOpsTxnsStressTest::Record().c_) == sizeof(uint32_t),
+ "MultiOpsTxnsStressTest::Record::c_ must be 4 bytes");
+};
+
+class MultiOpsTxnsStressListener : public EventListener {
+ public:
+ explicit MultiOpsTxnsStressListener(MultiOpsTxnsStressTest* stress_test)
+ : stress_test_(stress_test) {
+ assert(stress_test_);
+ }
+
+#ifndef ROCKSDB_LITE
+ ~MultiOpsTxnsStressListener() override {}
+
+ void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
+ assert(db);
+#ifdef NDEBUG
+ (void)db;
+#endif
+ assert(info.cf_id == 0);
+ stress_test_->VerifyPkSkFast(info.job_id);
+ }
+
+ void OnCompactionCompleted(DB* db, const CompactionJobInfo& info) override {
+ assert(db);
+#ifdef NDEBUG
+ (void)db;
+#endif
+ assert(info.cf_id == 0);
+ stress_test_->VerifyPkSkFast(info.job_id);
+ }
+#endif //! ROCKSDB_LITE
+
+ private:
+ MultiOpsTxnsStressTest* const stress_test_ = nullptr;
+};
+
+} // namespace ROCKSDB_NAMESPACE
+#endif // GFLAGS