| field | value | date |
|---|---|---|
| author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
| committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
| commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
| tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/merge_test.cc | |
| parent | Initial commit. (diff) | |
| download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip | |
Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/db/merge_test.cc')
-rw-r--r-- | src/rocksdb/db/merge_test.cc | 629 |
1 file changed, 629 insertions, 0 deletions
diff --git a/src/rocksdb/db/merge_test.cc b/src/rocksdb/db/merge_test.cc
new file mode 100644
index 000000000..0d373d41e
--- /dev/null
+++ b/src/rocksdb/db/merge_test.cc
@@ -0,0 +1,629 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#include <assert.h>
+
+#include <iostream>
+#include <memory>
+
+#include "db/db_impl/db_impl.h"
+#include "db/dbformat.h"
+#include "db/write_batch_internal.h"
+#include "port/stack_trace.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "test_util/testharness.h"
+#include "util/coding.h"
+#include "utilities/merge_operators.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+bool use_compression;
+
+class MergeTest : public testing::Test {};
+
+size_t num_merge_operator_calls;
+void resetNumMergeOperatorCalls() { num_merge_operator_calls = 0; }
+
+size_t num_partial_merge_calls;
+void resetNumPartialMergeCalls() { num_partial_merge_calls = 0; }
+
+class CountMergeOperator : public AssociativeMergeOperator {
+ public:
+  CountMergeOperator() {
+    mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
+  }
+
+  bool Merge(const Slice& key, const Slice* existing_value, const Slice& value,
+             std::string* new_value, Logger* logger) const override {
+    assert(new_value->empty());
+    ++num_merge_operator_calls;
+    if (existing_value == nullptr) {
+      new_value->assign(value.data(), value.size());
+      return true;
+    }
+
+    return mergeOperator_->PartialMerge(key, *existing_value, value, new_value,
+                                        logger);
+  }
+
+  bool PartialMergeMulti(const Slice& key,
+                         const std::deque<Slice>& operand_list,
+                         std::string* new_value,
+                         Logger* logger) const override {
+    assert(new_value->empty());
+    ++num_partial_merge_calls;
+    return mergeOperator_->PartialMergeMulti(key, operand_list, new_value,
+                                             logger);
+  }
+
+  const char* Name() const override { return "UInt64AddOperator"; }
+
+ private:
+  std::shared_ptr<MergeOperator> mergeOperator_;
+};
+
+class EnvMergeTest : public EnvWrapper {
+ public:
+  EnvMergeTest() : EnvWrapper(Env::Default()) {}
+  static const char* kClassName() { return "MergeEnv"; }
+  const char* Name() const override { return kClassName(); }
+  // ~EnvMergeTest() override {}
+
+  uint64_t NowNanos() override {
+    ++now_nanos_count_;
+    return target()->NowNanos();
+  }
+
+  static uint64_t now_nanos_count_;
+
+  static std::unique_ptr<EnvMergeTest> singleton_;
+
+  static EnvMergeTest* GetInstance() {
+    if (nullptr == singleton_) singleton_.reset(new EnvMergeTest);
+    return singleton_.get();
+  }
+};
+
+uint64_t EnvMergeTest::now_nanos_count_{0};
+std::unique_ptr<EnvMergeTest> EnvMergeTest::singleton_;
+
+std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
+                           const size_t max_successive_merges = 0) {
+  DB* db;
+  Options options;
+  options.create_if_missing = true;
+  options.merge_operator = std::make_shared<CountMergeOperator>();
+  options.max_successive_merges = max_successive_merges;
+  options.env = EnvMergeTest::GetInstance();
+  EXPECT_OK(DestroyDB(dbname, Options()));
+  Status s;
+// DBWithTTL is not supported in ROCKSDB_LITE
+#ifndef ROCKSDB_LITE
+  if (ttl) {
+    DBWithTTL* db_with_ttl;
+    s = DBWithTTL::Open(options, dbname, &db_with_ttl);
+    db = db_with_ttl;
+  } else {
+    s = DB::Open(options, dbname, &db);
+  }
+#else
+  assert(!ttl);
+  s = DB::Open(options, dbname, &db);
+#endif  // !ROCKSDB_LITE
+  EXPECT_OK(s);
+  assert(s.ok());
+  // Allowed to call NowNanos during DB creation (in GenerateRawUniqueId() for
+  // session ID)
+  EnvMergeTest::now_nanos_count_ = 0;
+  return std::shared_ptr<DB>(db);
+}
+
+// Imagine we are maintaining a set of uint64 counters.
+// Each counter has a distinct name. And we would like
+// to support four high level operations:
+// set, add, get and remove
+// This is a quick implementation without a Merge operation.
+class Counters {
+ protected:
+  std::shared_ptr<DB> db_;
+
+  WriteOptions put_option_;
+  ReadOptions get_option_;
+  WriteOptions delete_option_;
+
+  uint64_t default_;
+
+ public:
+  explicit Counters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
+      : db_(db),
+        put_option_(),
+        get_option_(),
+        delete_option_(),
+        default_(defaultCount) {
+    assert(db_);
+  }
+
+  virtual ~Counters() {}
+
+  // public interface of Counters.
+  // All four functions return false
+  // if the underlying level db operation failed.
+
+  // mapped to a levedb Put
+  bool set(const std::string& key, uint64_t value) {
+    // just treat the internal rep of int64 as the string
+    char buf[sizeof(value)];
+    EncodeFixed64(buf, value);
+    Slice slice(buf, sizeof(value));
+    auto s = db_->Put(put_option_, key, slice);
+
+    if (s.ok()) {
+      return true;
+    } else {
+      std::cerr << s.ToString() << std::endl;
+      return false;
+    }
+  }
+
+  // mapped to a rocksdb Delete
+  bool remove(const std::string& key) {
+    auto s = db_->Delete(delete_option_, key);
+
+    if (s.ok()) {
+      return true;
+    } else {
+      std::cerr << s.ToString() << std::endl;
+      return false;
+    }
+  }
+
+  // mapped to a rocksdb Get
+  bool get(const std::string& key, uint64_t* value) {
+    std::string str;
+    auto s = db_->Get(get_option_, key, &str);
+
+    if (s.IsNotFound()) {
+      // return default value if not found;
+      *value = default_;
+      return true;
+    } else if (s.ok()) {
+      // deserialization
+      if (str.size() != sizeof(uint64_t)) {
+        std::cerr << "value corruption\n";
+        return false;
+      }
+      *value = DecodeFixed64(&str[0]);
+      return true;
+    } else {
+      std::cerr << s.ToString() << std::endl;
+      return false;
+    }
+  }
+
+  // 'add' is implemented as get -> modify -> set
+  // An alternative is a single merge operation, see MergeBasedCounters
+  virtual bool add(const std::string& key, uint64_t value) {
+    uint64_t base = default_;
+    return get(key, &base) && set(key, base + value);
+  }
+
+  // convenience functions for testing
+  void assert_set(const std::string& key, uint64_t value) {
+    assert(set(key, value));
+  }
+
+  void assert_remove(const std::string& key) { assert(remove(key)); }
+
+  uint64_t assert_get(const std::string& key) {
+    uint64_t value = default_;
+    int result = get(key, &value);
+    assert(result);
+    if (result == 0) exit(1);  // Disable unused variable warning.
+    return value;
+  }
+
+  void assert_add(const std::string& key, uint64_t value) {
+    int result = add(key, value);
+    assert(result);
+    if (result == 0) exit(1);  // Disable unused variable warning.
+  }
+};
+
+// Implement 'add' directly with the new Merge operation
+class MergeBasedCounters : public Counters {
+ private:
+  WriteOptions merge_option_;  // for merge
+
+ public:
+  explicit MergeBasedCounters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
+      : Counters(db, defaultCount), merge_option_() {}
+
+  // mapped to a rocksdb Merge operation
+  bool add(const std::string& key, uint64_t value) override {
+    char encoded[sizeof(uint64_t)];
+    EncodeFixed64(encoded, value);
+    Slice slice(encoded, sizeof(uint64_t));
+    auto s = db_->Merge(merge_option_, key, slice);
+
+    if (s.ok()) {
+      return true;
+    } else {
+      std::cerr << s.ToString() << std::endl;
+      return false;
+    }
+  }
+};
+
+void dumpDb(DB* db) {
+  auto it = std::unique_ptr<Iterator>(db->NewIterator(ReadOptions()));
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    // uint64_t value = DecodeFixed64(it->value().data());
+    // std::cout << it->key().ToString() << ": " << value << std::endl;
+  }
+  assert(it->status().ok());  // Check for any errors found during the scan
+}
+
+void testCounters(Counters& counters, DB* db, bool test_compaction) {
+  FlushOptions o;
+  o.wait = true;
+
+  counters.assert_set("a", 1);
+
+  if (test_compaction) {
+    ASSERT_OK(db->Flush(o));
+  }
+
+  ASSERT_EQ(counters.assert_get("a"), 1);
+
+  counters.assert_remove("b");
+
+  // defaut value is 0 if non-existent
+  ASSERT_EQ(counters.assert_get("b"), 0);
+
+  counters.assert_add("a", 2);
+
+  if (test_compaction) {
+    ASSERT_OK(db->Flush(o));
+  }
+
+  // 1+2 = 3
+  ASSERT_EQ(counters.assert_get("a"), 3);
+
+  dumpDb(db);
+
+  // 1+...+49 = ?
+  uint64_t sum = 0;
+  for (int i = 1; i < 50; i++) {
+    counters.assert_add("b", i);
+    sum += i;
+  }
+  ASSERT_EQ(counters.assert_get("b"), sum);
+
+  dumpDb(db);
+
+  if (test_compaction) {
+    ASSERT_OK(db->Flush(o));
+
+    ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+    dumpDb(db);
+
+    ASSERT_EQ(counters.assert_get("a"), 3);
+    ASSERT_EQ(counters.assert_get("b"), sum);
+  }
+}
+
+void testCountersWithFlushAndCompaction(Counters& counters, DB* db) {
+  ASSERT_OK(db->Put({}, "1", "1"));
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  std::atomic<int> cnt{0};
+  const auto get_thread_id = [&cnt]() {
+    thread_local int thread_id{cnt++};
+    return thread_id;
+  };
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:BeforeWriterWaiting", [&](void* /*arg*/) {
+        int thread_id = get_thread_id();
+        if (1 == thread_id) {
+          TEST_SYNC_POINT(
+              "testCountersWithFlushAndCompaction::bg_compact_thread:0");
+        } else if (2 == thread_id) {
+          TEST_SYNC_POINT(
+              "testCountersWithFlushAndCompaction::bg_flush_thread:0");
+        }
+      });
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WriteManifest", [&](void* /*arg*/) {
+        int thread_id = get_thread_id();
+        if (0 == thread_id) {
+          TEST_SYNC_POINT(
+              "testCountersWithFlushAndCompaction::set_options_thread:0");
+          TEST_SYNC_POINT(
+              "testCountersWithFlushAndCompaction::set_options_thread:1");
+        }
+      });
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WakeUpAndDone", [&](void* arg) {
+        auto* mutex = reinterpret_cast<InstrumentedMutex*>(arg);
+        mutex->AssertHeld();
+        int thread_id = get_thread_id();
+        ASSERT_EQ(2, thread_id);
+        mutex->Unlock();
+        TEST_SYNC_POINT(
+            "testCountersWithFlushAndCompaction::bg_flush_thread:1");
+        TEST_SYNC_POINT(
+            "testCountersWithFlushAndCompaction::bg_flush_thread:2");
+        mutex->Lock();
+      });
+  SyncPoint::GetInstance()->LoadDependency({
+      {"testCountersWithFlushAndCompaction::set_options_thread:0",
+       "testCountersWithCompactionAndFlush:BeforeCompact"},
+      {"testCountersWithFlushAndCompaction::bg_compact_thread:0",
+       "testCountersWithFlushAndCompaction:BeforeIncCounters"},
+      {"testCountersWithFlushAndCompaction::bg_flush_thread:0",
+       "testCountersWithFlushAndCompaction::set_options_thread:1"},
+      {"testCountersWithFlushAndCompaction::bg_flush_thread:1",
+       "testCountersWithFlushAndCompaction:BeforeVerification"},
+      {"testCountersWithFlushAndCompaction:AfterGet",
+       "testCountersWithFlushAndCompaction::bg_flush_thread:2"},
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  port::Thread set_options_thread([&]() {
+    ASSERT_OK(reinterpret_cast<DBImpl*>(db)->SetOptions(
+        {{"disable_auto_compactions", "false"}}));
+  });
+  TEST_SYNC_POINT("testCountersWithCompactionAndFlush:BeforeCompact");
+  port::Thread compact_thread([&]() {
+    ASSERT_OK(reinterpret_cast<DBImpl*>(db)->CompactRange(
+        CompactRangeOptions(), db->DefaultColumnFamily(), nullptr, nullptr));
+  });
+
+  TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeIncCounters");
+  counters.add("test-key", 1);
+
+  FlushOptions flush_opts;
+  flush_opts.wait = false;
+  ASSERT_OK(db->Flush(flush_opts));
+
+  TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeVerification");
+  std::string expected;
+  PutFixed64(&expected, 1);
+  std::string actual;
+  Status s = db->Get(ReadOptions(), "test-key", &actual);
+  TEST_SYNC_POINT("testCountersWithFlushAndCompaction:AfterGet");
+  set_options_thread.join();
+  compact_thread.join();
+  ASSERT_OK(s);
+  ASSERT_EQ(expected, actual);
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+void testSuccessiveMerge(Counters& counters, size_t max_num_merges,
+                         size_t num_merges) {
+  counters.assert_remove("z");
+  uint64_t sum = 0;
+
+  for (size_t i = 1; i <= num_merges; ++i) {
+    resetNumMergeOperatorCalls();
+    counters.assert_add("z", i);
+    sum += i;
+
+    if (i % (max_num_merges + 1) == 0) {
+      ASSERT_EQ(num_merge_operator_calls, max_num_merges + 1);
+    } else {
+      ASSERT_EQ(num_merge_operator_calls, 0);
+    }
+
+    resetNumMergeOperatorCalls();
+    ASSERT_EQ(counters.assert_get("z"), sum);
+    ASSERT_EQ(num_merge_operator_calls, i % (max_num_merges + 1));
+  }
+}
+
+void testPartialMerge(Counters* counters, DB* db, size_t max_merge,
+                      size_t min_merge, size_t count) {
+  FlushOptions o;
+  o.wait = true;
+
+  // Test case 1: partial merge should be called when the number of merge
+  // operands exceeds the threshold.
+  uint64_t tmp_sum = 0;
+  resetNumPartialMergeCalls();
+  for (size_t i = 1; i <= count; i++) {
+    counters->assert_add("b", i);
+    tmp_sum += i;
+  }
+  ASSERT_OK(db->Flush(o));
+  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_EQ(tmp_sum, counters->assert_get("b"));
+  if (count > max_merge) {
+    // in this case, FullMerge should be called instead.
+    ASSERT_EQ(num_partial_merge_calls, 0U);
+  } else {
+    // if count >= min_merge, then partial merge should be called once.
+    ASSERT_EQ((count >= min_merge), (num_partial_merge_calls == 1));
+  }
+
+  // Test case 2: partial merge should not be called when a put is found.
+  resetNumPartialMergeCalls();
+  tmp_sum = 0;
+  ASSERT_OK(db->Put(ROCKSDB_NAMESPACE::WriteOptions(), "c", "10"));
+  for (size_t i = 1; i <= count; i++) {
+    counters->assert_add("c", i);
+    tmp_sum += i;
+  }
+  ASSERT_OK(db->Flush(o));
+  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_EQ(tmp_sum, counters->assert_get("c"));
+  ASSERT_EQ(num_partial_merge_calls, 0U);
+  // NowNanos was previously called in MergeHelper::FilterMerge(), which
+  // harmed performance.
+  ASSERT_EQ(EnvMergeTest::now_nanos_count_, 0U);
+}
+
+void testSingleBatchSuccessiveMerge(DB* db, size_t max_num_merges,
+                                    size_t num_merges) {
+  ASSERT_GT(num_merges, max_num_merges);
+
+  Slice key("BatchSuccessiveMerge");
+  uint64_t merge_value = 1;
+  char buf[sizeof(merge_value)];
+  EncodeFixed64(buf, merge_value);
+  Slice merge_value_slice(buf, sizeof(merge_value));
+
+  // Create the batch
+  WriteBatch batch;
+  for (size_t i = 0; i < num_merges; ++i) {
+    ASSERT_OK(batch.Merge(key, merge_value_slice));
+  }
+
+  // Apply to memtable and count the number of merges
+  resetNumMergeOperatorCalls();
+  ASSERT_OK(db->Write(WriteOptions(), &batch));
+  ASSERT_EQ(
+      num_merge_operator_calls,
+      static_cast<size_t>(num_merges - (num_merges % (max_num_merges + 1))));
+
+  // Get the value
+  resetNumMergeOperatorCalls();
+  std::string get_value_str;
+  ASSERT_OK(db->Get(ReadOptions(), key, &get_value_str));
+  assert(get_value_str.size() == sizeof(uint64_t));
+  uint64_t get_value = DecodeFixed64(&get_value_str[0]);
+  ASSERT_EQ(get_value, num_merges * merge_value);
+  ASSERT_EQ(num_merge_operator_calls,
+            static_cast<size_t>((num_merges % (max_num_merges + 1))));
+}
+
+void runTest(const std::string& dbname, const bool use_ttl = false) {
+  {
+    auto db = OpenDb(dbname, use_ttl);
+
+    {
+      Counters counters(db, 0);
+      testCounters(counters, db.get(), true);
+    }
+
+    {
+      MergeBasedCounters counters(db, 0);
+      testCounters(counters, db.get(), use_compression);
+    }
+  }
+
+  ASSERT_OK(DestroyDB(dbname, Options()));
+
+  {
+    size_t max_merge = 5;
+    auto db = OpenDb(dbname, use_ttl, max_merge);
+    MergeBasedCounters counters(db, 0);
+    testCounters(counters, db.get(), use_compression);
+    testSuccessiveMerge(counters, max_merge, max_merge * 2);
+    testSingleBatchSuccessiveMerge(db.get(), 5, 7);
+    ASSERT_OK(db->Close());
+    ASSERT_OK(DestroyDB(dbname, Options()));
+  }
+
+  {
+    size_t max_merge = 100;
+    // Min merge is hard-coded to 2.
+    uint32_t min_merge = 2;
+    for (uint32_t count = min_merge - 1; count <= min_merge + 1; count++) {
+      auto db = OpenDb(dbname, use_ttl, max_merge);
+      MergeBasedCounters counters(db, 0);
+      testPartialMerge(&counters, db.get(), max_merge, min_merge, count);
+      ASSERT_OK(db->Close());
+      ASSERT_OK(DestroyDB(dbname, Options()));
+    }
+    {
+      auto db = OpenDb(dbname, use_ttl, max_merge);
+      MergeBasedCounters counters(db, 0);
+      testPartialMerge(&counters, db.get(), max_merge, min_merge,
+                       min_merge * 10);
+      ASSERT_OK(db->Close());
+      ASSERT_OK(DestroyDB(dbname, Options()));
+    }
+  }
+
+  {
+    {
+      auto db = OpenDb(dbname);
+      MergeBasedCounters counters(db, 0);
+      counters.add("test-key", 1);
+      counters.add("test-key", 1);
+      counters.add("test-key", 1);
+      ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+    }
+
+    DB* reopen_db;
+    ASSERT_OK(DB::Open(Options(), dbname, &reopen_db));
+    std::string value;
+    ASSERT_NOK(reopen_db->Get(ReadOptions(), "test-key", &value));
+    delete reopen_db;
+    ASSERT_OK(DestroyDB(dbname, Options()));
+  }
+
+  /* Temporary remove this test
+  {
+    std::cout << "Test merge-operator not set after reopen (recovery case)\n";
+    {
+      auto db = OpenDb(dbname);
+      MergeBasedCounters counters(db, 0);
+      counters.add("test-key", 1);
+      counters.add("test-key", 1);
+      counters.add("test-key", 1);
+    }
+
+    DB* reopen_db;
+    ASSERT_TRUE(DB::Open(Options(), dbname, &reopen_db).IsInvalidArgument());
+  }
+  */
+}
+
+TEST_F(MergeTest, MergeDbTest) {
+  runTest(test::PerThreadDBPath("merge_testdb"));
+}
+
+#ifndef ROCKSDB_LITE
+TEST_F(MergeTest, MergeDbTtlTest) {
+  runTest(test::PerThreadDBPath("merge_testdbttl"),
+          true);  // Run test on TTL database
+}
+
+TEST_F(MergeTest, MergeWithCompactionAndFlush) {
+  const std::string dbname =
+      test::PerThreadDBPath("merge_with_compaction_and_flush");
+  {
+    auto db = OpenDb(dbname);
+    {
+      MergeBasedCounters counters(db, 0);
+      testCountersWithFlushAndCompaction(counters, db.get());
+    }
+  }
+  ASSERT_OK(DestroyDB(dbname, Options()));
+}
+#endif  // !ROCKSDB_LITE
+
+}  // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::use_compression = false;
+  if (argc > 1) {
+    ROCKSDB_NAMESPACE::use_compression = true;
+  }
+
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
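For readers skimming the diff, the core pattern the test exercises is: register a uint64-add merge operator on the DB, issue Merge() calls whose operands are fixed64-encoded increments, and read the folded sum back with Get(). The following is a minimal sketch, not part of the commit above; it restates that flow using the same helpers the test itself includes (MergeOperators::CreateUInt64AddOperator, EncodeFixed64, DecodeFixed64), so, like the test, it is assumed to build inside a RocksDB source tree, and the DB path is a hypothetical example.

// Minimal sketch (not from the commit): a uint64 counter driven through the
// merge operator, mirroring what MergeBasedCounters::add() does in merge_test.cc.
#include <cassert>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "util/coding.h"                // EncodeFixed64 / DecodeFixed64
#include "utilities/merge_operators.h"  // MergeOperators::CreateUInt64AddOperator

int main() {
  using namespace ROCKSDB_NAMESPACE;

  Options options;
  options.create_if_missing = true;
  // Fold "+n" operands into a single uint64 sum at read/flush/compaction time.
  options.merge_operator = MergeOperators::CreateUInt64AddOperator();

  DB* db = nullptr;
  Status s = DB::Open(options, "/tmp/merge_sketch_db", &db);  // hypothetical path
  assert(s.ok());

  // Each increment is a Merge of an 8-byte fixed64 operand, as in the test.
  for (uint64_t v = 1; v <= 3; ++v) {
    char buf[sizeof(uint64_t)];
    EncodeFixed64(buf, v);
    s = db->Merge(WriteOptions(), "counter", Slice(buf, sizeof(buf)));
    assert(s.ok());
  }

  // Get() returns the already-merged value; decode it back into a uint64.
  std::string raw;
  s = db->Get(ReadOptions(), "counter", &raw);
  assert(s.ok() && raw.size() == sizeof(uint64_t));
  uint64_t total = DecodeFixed64(raw.data());
  assert(total == 6);  // 1 + 2 + 3

  delete db;
  return 0;
}

Driving the counter through Merge avoids the get-modify-set round trip that the plain Counters::add() path performs, which is exactly the contrast the Counters and MergeBasedCounters classes in this test are built to measure.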