summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/db_merge_operator_test.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/db/db_merge_operator_test.cc')
-rw-r--r--src/rocksdb/db/db_merge_operator_test.cc669
1 files changed, 669 insertions, 0 deletions
diff --git a/src/rocksdb/db/db_merge_operator_test.cc b/src/rocksdb/db/db_merge_operator_test.cc
new file mode 100644
index 000000000..7c5505bd1
--- /dev/null
+++ b/src/rocksdb/db/db_merge_operator_test.cc
@@ -0,0 +1,669 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#include <string>
+#include <vector>
+
+#include "db/db_test_util.h"
+#include "db/forward_iterator.h"
+#include "port/stack_trace.h"
+#include "rocksdb/merge_operator.h"
+#include "util/random.h"
+#include "utilities/merge_operators.h"
+#include "utilities/merge_operators/string_append/stringappend2.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class TestReadCallback : public ReadCallback {
+ public:
+ TestReadCallback(SnapshotChecker* snapshot_checker,
+ SequenceNumber snapshot_seq)
+ : ReadCallback(snapshot_seq),
+ snapshot_checker_(snapshot_checker),
+ snapshot_seq_(snapshot_seq) {}
+
+ bool IsVisibleFullCheck(SequenceNumber seq) override {
+ return snapshot_checker_->CheckInSnapshot(seq, snapshot_seq_) ==
+ SnapshotCheckerResult::kInSnapshot;
+ }
+
+ private:
+ SnapshotChecker* snapshot_checker_;
+ SequenceNumber snapshot_seq_;
+};
+
+// Test merge operator functionality.
+class DBMergeOperatorTest : public DBTestBase {
+ public:
+ DBMergeOperatorTest()
+ : DBTestBase("db_merge_operator_test", /*env_do_fsync=*/false) {}
+
+ std::string GetWithReadCallback(SnapshotChecker* snapshot_checker,
+ const Slice& key,
+ const Snapshot* snapshot = nullptr) {
+ SequenceNumber seq = snapshot == nullptr ? db_->GetLatestSequenceNumber()
+ : snapshot->GetSequenceNumber();
+ TestReadCallback read_callback(snapshot_checker, seq);
+ ReadOptions read_opt;
+ read_opt.snapshot = snapshot;
+ PinnableSlice value;
+ DBImpl::GetImplOptions get_impl_options;
+ get_impl_options.column_family = db_->DefaultColumnFamily();
+ get_impl_options.value = &value;
+ get_impl_options.callback = &read_callback;
+ Status s = dbfull()->GetImpl(read_opt, key, get_impl_options);
+ if (!s.ok()) {
+ return s.ToString();
+ }
+ return value.ToString();
+ }
+};
+
+TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
+ class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
+ public:
+ LimitedStringAppendMergeOp(int limit, char delim)
+ : StringAppendTESTOperator(delim), limit_(limit) {}
+
+ const char* Name() const override {
+ return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
+ }
+
+ bool ShouldMerge(const std::vector<Slice>& operands) const override {
+ if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
+ return true;
+ }
+ return false;
+ }
+
+ private:
+ size_t limit_ = 0;
+ };
+
+ Options options;
+ options.create_if_missing = true;
+ // Use only the latest two merge operands.
+ options.merge_operator = std::make_shared<LimitedStringAppendMergeOp>(2, ',');
+ options.env = env_;
+ Reopen(options);
+ // All K1 values are in memtable.
+ ASSERT_OK(Merge("k1", "a"));
+ ASSERT_OK(Merge("k1", "b"));
+ ASSERT_OK(Merge("k1", "c"));
+ ASSERT_OK(Merge("k1", "d"));
+ std::string value;
+ ASSERT_OK(db_->Get(ReadOptions(), "k1", &value));
+ // Make sure that only the latest two merge operands are used. If this was
+ // not the case the value would be "a,b,c,d".
+ ASSERT_EQ(value, "c,d");
+
+ // All K2 values are flushed to L0 into a single file.
+ ASSERT_OK(Merge("k2", "a"));
+ ASSERT_OK(Merge("k2", "b"));
+ ASSERT_OK(Merge("k2", "c"));
+ ASSERT_OK(Merge("k2", "d"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(db_->Get(ReadOptions(), "k2", &value));
+ ASSERT_EQ(value, "c,d");
+
+ // All K3 values are flushed and are in different files.
+ ASSERT_OK(Merge("k3", "ab"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(Merge("k3", "bc"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(Merge("k3", "cd"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(Merge("k3", "de"));
+ ASSERT_OK(db_->Get(ReadOptions(), "k3", &value));
+ ASSERT_EQ(value, "cd,de");
+
+ // All K4 values are in different levels
+ ASSERT_OK(Merge("k4", "ab"));
+ ASSERT_OK(Flush());
+ MoveFilesToLevel(4);
+ ASSERT_OK(Merge("k4", "bc"));
+ ASSERT_OK(Flush());
+ MoveFilesToLevel(3);
+ ASSERT_OK(Merge("k4", "cd"));
+ ASSERT_OK(Flush());
+ MoveFilesToLevel(1);
+ ASSERT_OK(Merge("k4", "de"));
+ ASSERT_OK(db_->Get(ReadOptions(), "k4", &value));
+ ASSERT_EQ(value, "cd,de");
+}
+
+TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
+ Options options;
+ options.create_if_missing = true;
+ options.merge_operator.reset(new TestPutOperator());
+ options.env = env_;
+ Reopen(options);
+ ASSERT_OK(Merge("k1", "v1"));
+ ASSERT_OK(Merge("k1", "corrupted"));
+ std::string value;
+ ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsCorruption());
+ VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}});
+}
+
+TEST_F(DBMergeOperatorTest, MergeErrorOnWrite) {
+ Options options;
+ options.create_if_missing = true;
+ options.merge_operator.reset(new TestPutOperator());
+ options.max_successive_merges = 3;
+ options.env = env_;
+ Reopen(options);
+ ASSERT_OK(Merge("k1", "v1"));
+ ASSERT_OK(Merge("k1", "v2"));
+ // Will trigger a merge when hitting max_successive_merges and the merge
+ // will fail. The delta will be inserted nevertheless.
+ ASSERT_OK(Merge("k1", "corrupted"));
+ // Data should stay unmerged after the error.
+ VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v2"}, {"k1", "v1"}});
+}
+
+TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
+ Options options;
+ options.create_if_missing = true;
+ options.merge_operator.reset(new TestPutOperator());
+ options.env = env_;
+
+ DestroyAndReopen(options);
+ ASSERT_OK(Merge("k1", "v1"));
+ ASSERT_OK(Merge("k1", "corrupted"));
+ ASSERT_OK(Put("k2", "v2"));
+ auto* iter = db_->NewIterator(ReadOptions());
+ iter->Seek("k1");
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_TRUE(iter->status().IsCorruption());
+ delete iter;
+ iter = db_->NewIterator(ReadOptions());
+ iter->Seek("k2");
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ iter->Prev();
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_TRUE(iter->status().IsCorruption());
+ delete iter;
+ VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}});
+
+ DestroyAndReopen(options);
+ ASSERT_OK(Merge("k1", "v1"));
+ ASSERT_OK(Put("k2", "v2"));
+ ASSERT_OK(Merge("k2", "corrupted"));
+ iter = db_->NewIterator(ReadOptions());
+ iter->Seek("k1");
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ iter->Next();
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_TRUE(iter->status().IsCorruption());
+ delete iter;
+ VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
+}
+
+class MergeOperatorPinningTest : public DBMergeOperatorTest,
+ public testing::WithParamInterface<bool> {
+ public:
+ MergeOperatorPinningTest() { disable_block_cache_ = GetParam(); }
+
+ bool disable_block_cache_;
+};
+
+INSTANTIATE_TEST_CASE_P(MergeOperatorPinningTest, MergeOperatorPinningTest,
+ ::testing::Bool());
+
+#ifndef ROCKSDB_LITE
+TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) {
+ Options options = CurrentOptions();
+ BlockBasedTableOptions table_options;
+ table_options.block_size = 1; // every block will contain one entry
+ table_options.no_block_cache = disable_block_cache_;
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
+ options.level0_slowdown_writes_trigger = (1 << 30);
+ options.level0_stop_writes_trigger = (1 << 30);
+ options.disable_auto_compactions = true;
+ DestroyAndReopen(options);
+
+ const int kKeysPerFile = 10;
+ const int kOperandsPerKeyPerFile = 7;
+ const int kOperandSize = 100;
+ // Filse to write in L0 before compacting to lower level
+ const int kFilesPerLevel = 3;
+
+ Random rnd(301);
+ std::map<std::string, std::string> true_data;
+ int batch_num = 1;
+ int lvl_to_fill = 4;
+ int key_id = 0;
+ while (true) {
+ for (int j = 0; j < kKeysPerFile; j++) {
+ std::string key = Key(key_id % 35);
+ key_id++;
+ for (int k = 0; k < kOperandsPerKeyPerFile; k++) {
+ std::string val = rnd.RandomString(kOperandSize);
+ ASSERT_OK(db_->Merge(WriteOptions(), key, val));
+ if (true_data[key].size() == 0) {
+ true_data[key] = val;
+ } else {
+ true_data[key] += "," + val;
+ }
+ }
+ }
+
+ if (lvl_to_fill == -1) {
+ // Keep last batch in memtable and stop
+ break;
+ }
+
+ ASSERT_OK(Flush());
+ if (batch_num % kFilesPerLevel == 0) {
+ if (lvl_to_fill != 0) {
+ MoveFilesToLevel(lvl_to_fill);
+ }
+ lvl_to_fill--;
+ }
+ batch_num++;
+ }
+
+ // 3 L0 files
+ // 1 L1 file
+ // 3 L2 files
+ // 1 L3 file
+ // 3 L4 Files
+ ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3");
+
+ VerifyDBFromMap(true_data);
+}
+
+class MergeOperatorHook : public MergeOperator {
+ public:
+ explicit MergeOperatorHook(std::shared_ptr<MergeOperator> _merge_op)
+ : merge_op_(_merge_op) {}
+
+ bool FullMergeV2(const MergeOperationInput& merge_in,
+ MergeOperationOutput* merge_out) const override {
+ before_merge_();
+ bool res = merge_op_->FullMergeV2(merge_in, merge_out);
+ after_merge_();
+ return res;
+ }
+
+ const char* Name() const override { return merge_op_->Name(); }
+
+ std::shared_ptr<MergeOperator> merge_op_;
+ std::function<void()> before_merge_ = []() {};
+ std::function<void()> after_merge_ = []() {};
+};
+
+TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) {
+ Options options = CurrentOptions();
+
+ auto merge_hook =
+ std::make_shared<MergeOperatorHook>(MergeOperators::CreateMaxOperator());
+ options.merge_operator = merge_hook;
+ options.disable_auto_compactions = true;
+ options.level0_slowdown_writes_trigger = (1 << 30);
+ options.level0_stop_writes_trigger = (1 << 30);
+ options.max_open_files = 20;
+ BlockBasedTableOptions bbto;
+ bbto.no_block_cache = disable_block_cache_;
+ if (bbto.no_block_cache == false) {
+ bbto.block_cache = NewLRUCache(64 * 1024 * 1024);
+ } else {
+ bbto.block_cache = nullptr;
+ }
+ options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+ DestroyAndReopen(options);
+
+ const int kNumOperands = 30;
+ const int kNumKeys = 1000;
+ const int kOperandSize = 100;
+ Random rnd(301);
+
+ // 1000 keys every key have 30 operands, every operand is in a different file
+ std::map<std::string, std::string> true_data;
+ for (int i = 0; i < kNumOperands; i++) {
+ for (int j = 0; j < kNumKeys; j++) {
+ std::string k = Key(j);
+ std::string v = rnd.RandomString(kOperandSize);
+ ASSERT_OK(db_->Merge(WriteOptions(), k, v));
+
+ true_data[k] = std::max(true_data[k], v);
+ }
+ ASSERT_OK(Flush());
+ }
+
+ std::vector<uint64_t> file_numbers = ListTableFiles(env_, dbname_);
+ ASSERT_EQ(file_numbers.size(), kNumOperands);
+ int merge_cnt = 0;
+
+ // Code executed before merge operation
+ merge_hook->before_merge_ = [&]() {
+ // Evict all tables from cache before every merge operation
+ auto* table_cache = dbfull()->TEST_table_cache();
+ for (uint64_t num : file_numbers) {
+ TableCache::Evict(table_cache, num);
+ }
+ // Decrease cache capacity to force all unrefed blocks to be evicted
+ if (bbto.block_cache) {
+ bbto.block_cache->SetCapacity(1);
+ }
+ merge_cnt++;
+ };
+
+ // Code executed after merge operation
+ merge_hook->after_merge_ = [&]() {
+ // Increase capacity again after doing the merge
+ if (bbto.block_cache) {
+ bbto.block_cache->SetCapacity(64 * 1024 * 1024);
+ }
+ };
+
+ size_t total_reads;
+ VerifyDBFromMap(true_data, &total_reads);
+ ASSERT_EQ(merge_cnt, total_reads);
+
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+ VerifyDBFromMap(true_data, &total_reads);
+}
+
+TEST_P(MergeOperatorPinningTest, TailingIterator) {
+ Options options = CurrentOptions();
+ options.merge_operator = MergeOperators::CreateMaxOperator();
+ BlockBasedTableOptions bbto;
+ bbto.no_block_cache = disable_block_cache_;
+ options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+ DestroyAndReopen(options);
+
+ const int kNumOperands = 100;
+ const int kNumWrites = 100000;
+
+ std::function<void()> writer_func = [&]() {
+ int k = 0;
+ for (int i = 0; i < kNumWrites; i++) {
+ ASSERT_OK(db_->Merge(WriteOptions(), Key(k), Key(k)));
+
+ if (i && i % kNumOperands == 0) {
+ k++;
+ }
+ if (i && i % 127 == 0) {
+ ASSERT_OK(Flush());
+ }
+ if (i && i % 317 == 0) {
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ }
+ }
+ };
+
+ std::function<void()> reader_func = [&]() {
+ ReadOptions ro;
+ ro.tailing = true;
+ Iterator* iter = db_->NewIterator(ro);
+ ASSERT_OK(iter->status());
+ iter->SeekToFirst();
+ for (int i = 0; i < (kNumWrites / kNumOperands); i++) {
+ while (!iter->Valid()) {
+ // wait for the key to be written
+ env_->SleepForMicroseconds(100);
+ iter->Seek(Key(i));
+ }
+ ASSERT_EQ(iter->key(), Key(i));
+ ASSERT_EQ(iter->value(), Key(i));
+
+ iter->Next();
+ }
+ ASSERT_OK(iter->status());
+
+ delete iter;
+ };
+
+ ROCKSDB_NAMESPACE::port::Thread writer_thread(writer_func);
+ ROCKSDB_NAMESPACE::port::Thread reader_thread(reader_func);
+
+ writer_thread.join();
+ reader_thread.join();
+}
+
+TEST_F(DBMergeOperatorTest, TailingIteratorMemtableUnrefedBySomeoneElse) {
+ Options options = CurrentOptions();
+ options.merge_operator = MergeOperators::CreateStringAppendOperator();
+ DestroyAndReopen(options);
+
+ // Overview of the test:
+ // * There are two merge operands for the same key: one in an sst file,
+ // another in a memtable.
+ // * Seek a tailing iterator to this key.
+ // * As part of the seek, the iterator will:
+ // (a) first visit the operand in the memtable and tell ForwardIterator
+ // to pin this operand, then
+ // (b) move on to the operand in the sst file, then pass both operands
+ // to merge operator.
+ // * The memtable may get flushed and unreferenced by another thread between
+ // (a) and (b). The test simulates it by flushing the memtable inside a
+ // SyncPoint callback located between (a) and (b).
+ // * In this case it's ForwardIterator's responsibility to keep the memtable
+ // pinned until (b) is complete. There used to be a bug causing
+ // ForwardIterator to not pin it in some circumstances. This test
+ // reproduces it.
+
+ ASSERT_OK(db_->Merge(WriteOptions(), "key", "sst"));
+ ASSERT_OK(db_->Flush(FlushOptions())); // Switch to SuperVersion A
+ ASSERT_OK(db_->Merge(WriteOptions(), "key", "memtable"));
+
+ // Pin SuperVersion A
+ std::unique_ptr<Iterator> someone_else(db_->NewIterator(ReadOptions()));
+ ASSERT_OK(someone_else->status());
+
+ bool pushed_first_operand = false;
+ bool stepped_to_next_operand = false;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBIter::MergeValuesNewToOld:PushedFirstOperand", [&](void*) {
+ EXPECT_FALSE(pushed_first_operand);
+ pushed_first_operand = true;
+ EXPECT_OK(db_->Flush(FlushOptions())); // Switch to SuperVersion B
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBIter::MergeValuesNewToOld:SteppedToNextOperand", [&](void*) {
+ EXPECT_FALSE(stepped_to_next_operand);
+ stepped_to_next_operand = true;
+ someone_else.reset(); // Unpin SuperVersion A
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ ReadOptions ro;
+ ro.tailing = true;
+ std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
+ iter->Seek("key");
+
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ EXPECT_EQ(std::string("sst,memtable"), iter->value().ToString());
+ EXPECT_TRUE(pushed_first_operand);
+ EXPECT_TRUE(stepped_to_next_operand);
+}
+#endif // ROCKSDB_LITE
+
+TEST_F(DBMergeOperatorTest, SnapshotCheckerAndReadCallback) {
+ Options options = CurrentOptions();
+ options.merge_operator = MergeOperators::CreateStringAppendOperator();
+ DestroyAndReopen(options);
+
+ class TestSnapshotChecker : public SnapshotChecker {
+ public:
+ SnapshotCheckerResult CheckInSnapshot(
+ SequenceNumber seq, SequenceNumber snapshot_seq) const override {
+ return IsInSnapshot(seq, snapshot_seq)
+ ? SnapshotCheckerResult::kInSnapshot
+ : SnapshotCheckerResult::kNotInSnapshot;
+ }
+
+ bool IsInSnapshot(SequenceNumber seq, SequenceNumber snapshot_seq) const {
+ switch (snapshot_seq) {
+ case 0:
+ return seq == 0;
+ case 1:
+ return seq <= 1;
+ case 2:
+ // seq = 2 not visible to snapshot with seq = 2
+ return seq <= 1;
+ case 3:
+ return seq <= 3;
+ case 4:
+ // seq = 4 not visible to snpahost with seq = 4
+ return seq <= 3;
+ default:
+ // seq >=4 is uncommitted
+ return seq <= 4;
+ };
+ }
+ };
+ TestSnapshotChecker* snapshot_checker = new TestSnapshotChecker();
+ dbfull()->SetSnapshotChecker(snapshot_checker);
+
+ std::string value;
+ ASSERT_OK(Merge("foo", "v1"));
+ ASSERT_EQ(1, db_->GetLatestSequenceNumber());
+ ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
+ ASSERT_OK(Merge("foo", "v2"));
+ ASSERT_EQ(2, db_->GetLatestSequenceNumber());
+ // v2 is not visible to latest snapshot, which has seq = 2.
+ ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
+ // Take a snapshot with seq = 2.
+ const Snapshot* snapshot1 = db_->GetSnapshot();
+ ASSERT_EQ(2, snapshot1->GetSequenceNumber());
+ // v2 is not visible to snapshot1, which has seq = 2
+ ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
+
+ // Verify flush doesn't alter the result.
+ ASSERT_OK(Flush());
+ ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
+ ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
+
+ ASSERT_OK(Merge("foo", "v3"));
+ ASSERT_EQ(3, db_->GetLatestSequenceNumber());
+ ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
+ ASSERT_OK(Merge("foo", "v4"));
+ ASSERT_EQ(4, db_->GetLatestSequenceNumber());
+ // v4 is not visible to latest snapshot, which has seq = 4.
+ ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
+ const Snapshot* snapshot2 = db_->GetSnapshot();
+ ASSERT_EQ(4, snapshot2->GetSequenceNumber());
+ // v4 is not visible to snapshot2, which has seq = 4.
+ ASSERT_EQ("v1,v2,v3",
+ GetWithReadCallback(snapshot_checker, "foo", snapshot2));
+
+ // Verify flush doesn't alter the result.
+ ASSERT_OK(Flush());
+ ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
+ ASSERT_EQ("v1,v2,v3",
+ GetWithReadCallback(snapshot_checker, "foo", snapshot2));
+ ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
+
+ ASSERT_OK(Merge("foo", "v5"));
+ ASSERT_EQ(5, db_->GetLatestSequenceNumber());
+ // v5 is uncommitted
+ ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
+
+ // full manual compaction.
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+ // Verify compaction doesn't alter the result.
+ ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
+ ASSERT_EQ("v1,v2,v3",
+ GetWithReadCallback(snapshot_checker, "foo", snapshot2));
+ ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
+
+ db_->ReleaseSnapshot(snapshot1);
+ db_->ReleaseSnapshot(snapshot2);
+}
+
+class PerConfigMergeOperatorPinningTest
+ : public DBMergeOperatorTest,
+ public testing::WithParamInterface<std::tuple<bool, int>> {
+ public:
+ PerConfigMergeOperatorPinningTest() {
+ std::tie(disable_block_cache_, option_config_) = GetParam();
+ }
+
+ bool disable_block_cache_;
+};
+
+INSTANTIATE_TEST_CASE_P(
+ MergeOperatorPinningTest, PerConfigMergeOperatorPinningTest,
+ ::testing::Combine(::testing::Bool(),
+ ::testing::Range(static_cast<int>(DBTestBase::kDefault),
+ static_cast<int>(DBTestBase::kEnd))));
+
+TEST_P(PerConfigMergeOperatorPinningTest, Randomized) {
+ if (ShouldSkipOptions(option_config_, kSkipMergePut)) {
+ return;
+ }
+
+ Options options = CurrentOptions();
+ options.merge_operator = MergeOperators::CreateMaxOperator();
+ BlockBasedTableOptions table_options;
+ table_options.no_block_cache = disable_block_cache_;
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ DestroyAndReopen(options);
+
+ Random rnd(301);
+ std::map<std::string, std::string> true_data;
+
+ const int kTotalMerges = 5000;
+ // Every key gets ~10 operands
+ const int kKeyRange = kTotalMerges / 10;
+ const int kOperandSize = 20;
+ const int kNumPutBefore = kKeyRange / 10; // 10% value
+ const int kNumPutAfter = kKeyRange / 10; // 10% overwrite
+ const int kNumDelete = kKeyRange / 10; // 10% delete
+
+ // kNumPutBefore keys will have base values
+ for (int i = 0; i < kNumPutBefore; i++) {
+ std::string key = Key(rnd.Next() % kKeyRange);
+ std::string value = rnd.RandomString(kOperandSize);
+ ASSERT_OK(db_->Put(WriteOptions(), key, value));
+
+ true_data[key] = value;
+ }
+
+ // Do kTotalMerges merges
+ for (int i = 0; i < kTotalMerges; i++) {
+ std::string key = Key(rnd.Next() % kKeyRange);
+ std::string value = rnd.RandomString(kOperandSize);
+ ASSERT_OK(db_->Merge(WriteOptions(), key, value));
+
+ if (true_data[key] < value) {
+ true_data[key] = value;
+ }
+ }
+
+ // Overwrite random kNumPutAfter keys
+ for (int i = 0; i < kNumPutAfter; i++) {
+ std::string key = Key(rnd.Next() % kKeyRange);
+ std::string value = rnd.RandomString(kOperandSize);
+ ASSERT_OK(db_->Put(WriteOptions(), key, value));
+
+ true_data[key] = value;
+ }
+
+ // Delete random kNumDelete keys
+ for (int i = 0; i < kNumDelete; i++) {
+ std::string key = Key(rnd.Next() % kKeyRange);
+ ASSERT_OK(db_->Delete(WriteOptions(), key));
+
+ true_data.erase(key);
+ }
+
+ VerifyDBFromMap(true_data);
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}