diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/cache/compressed_secondary_cache_test.cc | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/cache/compressed_secondary_cache_test.cc')
-rw-r--r-- | src/rocksdb/cache/compressed_secondary_cache_test.cc | 1005 |
1 files changed, 1005 insertions, 0 deletions
diff --git a/src/rocksdb/cache/compressed_secondary_cache_test.cc b/src/rocksdb/cache/compressed_secondary_cache_test.cc new file mode 100644 index 000000000..574c257a7 --- /dev/null +++ b/src/rocksdb/cache/compressed_secondary_cache_test.cc @@ -0,0 +1,1005 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "cache/compressed_secondary_cache.h" + +#include <iterator> +#include <memory> +#include <tuple> + +#include "memory/jemalloc_nodump_allocator.h" +#include "rocksdb/convenience.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" + +namespace ROCKSDB_NAMESPACE { + +class CompressedSecondaryCacheTest : public testing::Test { + public: + CompressedSecondaryCacheTest() : fail_create_(false) {} + ~CompressedSecondaryCacheTest() override = default; + + protected: + class TestItem { + public: + TestItem(const char* buf, size_t size) : buf_(new char[size]), size_(size) { + memcpy(buf_.get(), buf, size); + } + ~TestItem() = default; + + char* Buf() { return buf_.get(); } + [[nodiscard]] size_t Size() const { return size_; } + + private: + std::unique_ptr<char[]> buf_; + size_t size_; + }; + + static size_t SizeCallback(void* obj) { + return reinterpret_cast<TestItem*>(obj)->Size(); + } + + static Status SaveToCallback(void* from_obj, size_t from_offset, + size_t length, void* out) { + auto item = reinterpret_cast<TestItem*>(from_obj); + const char* buf = item->Buf(); + EXPECT_EQ(length, item->Size()); + EXPECT_EQ(from_offset, 0); + memcpy(out, buf, length); + return Status::OK(); + } + + static void DeletionCallback(const Slice& /*key*/, void* obj) { + delete reinterpret_cast<TestItem*>(obj); + obj = nullptr; + } + + static Cache::CacheItemHelper helper_; + + static Status SaveToCallbackFail(void* /*obj*/, size_t /*offset*/, + size_t /*size*/, void* /*out*/) { + return Status::NotSupported(); + } + + static Cache::CacheItemHelper helper_fail_; + + Cache::CreateCallback test_item_creator = [&](const void* buf, size_t size, + void** out_obj, + size_t* charge) -> Status { + if (fail_create_) { + return Status::NotSupported(); + } + *out_obj = reinterpret_cast<void*>(new TestItem((char*)buf, size)); + *charge = size; + return Status::OK(); + }; + + void SetFailCreate(bool fail) { fail_create_ = fail; } + + void BasicTestHelper(std::shared_ptr<SecondaryCache> sec_cache, + bool sec_cache_is_compressed) { + get_perf_context()->Reset(); + bool is_in_sec_cache{true}; + // Lookup an non-existent key. + std::unique_ptr<SecondaryCacheResultHandle> handle0 = sec_cache->Lookup( + "k0", test_item_creator, true, /*advise_erase=*/true, is_in_sec_cache); + ASSERT_EQ(handle0, nullptr); + + Random rnd(301); + // Insert and Lookup the item k1 for the first time. + std::string str1(rnd.RandomString(1000)); + TestItem item1(str1.data(), str1.length()); + // A dummy handle is inserted if the item is inserted for the first time. + ASSERT_OK(sec_cache->Insert("k1", &item1, + &CompressedSecondaryCacheTest::helper_)); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_dummy_count, 1); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_uncompressed_bytes, 0); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_compressed_bytes, 0); + + std::unique_ptr<SecondaryCacheResultHandle> handle1_1 = sec_cache->Lookup( + "k1", test_item_creator, true, /*advise_erase=*/false, is_in_sec_cache); + ASSERT_EQ(handle1_1, nullptr); + + // Insert and Lookup the item k1 for the second time and advise erasing it. + ASSERT_OK(sec_cache->Insert("k1", &item1, + &CompressedSecondaryCacheTest::helper_)); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 1); + + std::unique_ptr<SecondaryCacheResultHandle> handle1_2 = sec_cache->Lookup( + "k1", test_item_creator, true, /*advise_erase=*/true, is_in_sec_cache); + ASSERT_NE(handle1_2, nullptr); + ASSERT_FALSE(is_in_sec_cache); + if (sec_cache_is_compressed) { + ASSERT_EQ(get_perf_context()->compressed_sec_cache_uncompressed_bytes, + 1000); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_compressed_bytes, + 1007); + } else { + ASSERT_EQ(get_perf_context()->compressed_sec_cache_uncompressed_bytes, 0); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_compressed_bytes, 0); + } + + std::unique_ptr<TestItem> val1 = + std::unique_ptr<TestItem>(static_cast<TestItem*>(handle1_2->Value())); + ASSERT_NE(val1, nullptr); + ASSERT_EQ(memcmp(val1->Buf(), item1.Buf(), item1.Size()), 0); + + // Lookup the item k1 again. + std::unique_ptr<SecondaryCacheResultHandle> handle1_3 = sec_cache->Lookup( + "k1", test_item_creator, true, /*advise_erase=*/true, is_in_sec_cache); + ASSERT_EQ(handle1_3, nullptr); + + // Insert and Lookup the item k2. + std::string str2(rnd.RandomString(1000)); + TestItem item2(str2.data(), str2.length()); + ASSERT_OK(sec_cache->Insert("k2", &item2, + &CompressedSecondaryCacheTest::helper_)); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_dummy_count, 2); + std::unique_ptr<SecondaryCacheResultHandle> handle2_1 = sec_cache->Lookup( + "k2", test_item_creator, true, /*advise_erase=*/false, is_in_sec_cache); + ASSERT_EQ(handle2_1, nullptr); + + ASSERT_OK(sec_cache->Insert("k2", &item2, + &CompressedSecondaryCacheTest::helper_)); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 2); + if (sec_cache_is_compressed) { + ASSERT_EQ(get_perf_context()->compressed_sec_cache_uncompressed_bytes, + 2000); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_compressed_bytes, + 2014); + } else { + ASSERT_EQ(get_perf_context()->compressed_sec_cache_uncompressed_bytes, 0); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_compressed_bytes, 0); + } + std::unique_ptr<SecondaryCacheResultHandle> handle2_2 = sec_cache->Lookup( + "k2", test_item_creator, true, /*advise_erase=*/false, is_in_sec_cache); + ASSERT_NE(handle2_2, nullptr); + std::unique_ptr<TestItem> val2 = + std::unique_ptr<TestItem>(static_cast<TestItem*>(handle2_2->Value())); + ASSERT_NE(val2, nullptr); + ASSERT_EQ(memcmp(val2->Buf(), item2.Buf(), item2.Size()), 0); + + std::vector<SecondaryCacheResultHandle*> handles = {handle1_2.get(), + handle2_2.get()}; + sec_cache->WaitAll(handles); + + sec_cache.reset(); + } + + void BasicTest(bool sec_cache_is_compressed, bool use_jemalloc) { + CompressedSecondaryCacheOptions opts; + opts.capacity = 2048; + opts.num_shard_bits = 0; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + opts.compression_type = CompressionType::kNoCompression; + sec_cache_is_compressed = false; + } + } else { + opts.compression_type = CompressionType::kNoCompression; + } + + if (use_jemalloc) { + JemallocAllocatorOptions jopts; + std::shared_ptr<MemoryAllocator> allocator; + std::string msg; + if (JemallocNodumpAllocator::IsSupported(&msg)) { + Status s = NewJemallocNodumpAllocator(jopts, &allocator); + if (s.ok()) { + opts.memory_allocator = allocator; + } + } else { + ROCKSDB_GTEST_BYPASS("JEMALLOC not supported"); + } + } + std::shared_ptr<SecondaryCache> sec_cache = + NewCompressedSecondaryCache(opts); + + BasicTestHelper(sec_cache, sec_cache_is_compressed); + } + + void FailsTest(bool sec_cache_is_compressed) { + CompressedSecondaryCacheOptions secondary_cache_opts; + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 1100; + secondary_cache_opts.num_shard_bits = 0; + std::shared_ptr<SecondaryCache> sec_cache = + NewCompressedSecondaryCache(secondary_cache_opts); + + // Insert and Lookup the first item. + Random rnd(301); + std::string str1(rnd.RandomString(1000)); + TestItem item1(str1.data(), str1.length()); + // Insert a dummy handle. + ASSERT_OK(sec_cache->Insert("k1", &item1, + &CompressedSecondaryCacheTest::helper_)); + // Insert k1. + ASSERT_OK(sec_cache->Insert("k1", &item1, + &CompressedSecondaryCacheTest::helper_)); + + // Insert and Lookup the second item. + std::string str2(rnd.RandomString(200)); + TestItem item2(str2.data(), str2.length()); + // Insert a dummy handle, k1 is not evicted. + ASSERT_OK(sec_cache->Insert("k2", &item2, + &CompressedSecondaryCacheTest::helper_)); + bool is_in_sec_cache{false}; + std::unique_ptr<SecondaryCacheResultHandle> handle1 = sec_cache->Lookup( + "k1", test_item_creator, true, /*advise_erase=*/false, is_in_sec_cache); + ASSERT_EQ(handle1, nullptr); + + // Insert k2 and k1 is evicted. + ASSERT_OK(sec_cache->Insert("k2", &item2, + &CompressedSecondaryCacheTest::helper_)); + std::unique_ptr<SecondaryCacheResultHandle> handle2 = sec_cache->Lookup( + "k2", test_item_creator, true, /*advise_erase=*/false, is_in_sec_cache); + ASSERT_NE(handle2, nullptr); + std::unique_ptr<TestItem> val2 = + std::unique_ptr<TestItem>(static_cast<TestItem*>(handle2->Value())); + ASSERT_NE(val2, nullptr); + ASSERT_EQ(memcmp(val2->Buf(), item2.Buf(), item2.Size()), 0); + + // Insert k1 again and a dummy handle is inserted. + ASSERT_OK(sec_cache->Insert("k1", &item1, + &CompressedSecondaryCacheTest::helper_)); + + std::unique_ptr<SecondaryCacheResultHandle> handle1_1 = sec_cache->Lookup( + "k1", test_item_creator, true, /*advise_erase=*/false, is_in_sec_cache); + ASSERT_EQ(handle1_1, nullptr); + + // Create Fails. + SetFailCreate(true); + std::unique_ptr<SecondaryCacheResultHandle> handle2_1 = sec_cache->Lookup( + "k2", test_item_creator, true, /*advise_erase=*/true, is_in_sec_cache); + ASSERT_EQ(handle2_1, nullptr); + + // Save Fails. + std::string str3 = rnd.RandomString(10); + TestItem item3(str3.data(), str3.length()); + // The Status is OK because a dummy handle is inserted. + ASSERT_OK(sec_cache->Insert("k3", &item3, + &CompressedSecondaryCacheTest::helper_fail_)); + ASSERT_NOK(sec_cache->Insert("k3", &item3, + &CompressedSecondaryCacheTest::helper_fail_)); + + sec_cache.reset(); + } + + void BasicIntegrationTest(bool sec_cache_is_compressed, + bool enable_custom_split_merge) { + CompressedSecondaryCacheOptions secondary_cache_opts; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + sec_cache_is_compressed = false; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 6000; + secondary_cache_opts.num_shard_bits = 0; + secondary_cache_opts.enable_custom_split_merge = enable_custom_split_merge; + std::shared_ptr<SecondaryCache> secondary_cache = + NewCompressedSecondaryCache(secondary_cache_opts); + LRUCacheOptions lru_cache_opts( + /*_capacity =*/1300, /*_num_shard_bits =*/0, + /*_strict_capacity_limit =*/false, /*_high_pri_pool_ratio =*/0.5, + /*_memory_allocator =*/nullptr, kDefaultToAdaptiveMutex, + kDefaultCacheMetadataChargePolicy, /*_low_pri_pool_ratio =*/0.0); + lru_cache_opts.secondary_cache = secondary_cache; + std::shared_ptr<Cache> cache = NewLRUCache(lru_cache_opts); + std::shared_ptr<Statistics> stats = CreateDBStatistics(); + + get_perf_context()->Reset(); + Random rnd(301); + std::string str1 = rnd.RandomString(1001); + auto item1_1 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert( + "k1", item1_1, &CompressedSecondaryCacheTest::helper_, str1.length())); + + std::string str2 = rnd.RandomString(1012); + auto item2_1 = new TestItem(str2.data(), str2.length()); + // After this Insert, primary cache contains k2 and secondary cache contains + // k1's dummy item. + ASSERT_OK(cache->Insert( + "k2", item2_1, &CompressedSecondaryCacheTest::helper_, str2.length())); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_dummy_count, 1); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_uncompressed_bytes, 0); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_compressed_bytes, 0); + + std::string str3 = rnd.RandomString(1024); + auto item3_1 = new TestItem(str3.data(), str3.length()); + // After this Insert, primary cache contains k3 and secondary cache contains + // k1's dummy item and k2's dummy item. + ASSERT_OK(cache->Insert( + "k3", item3_1, &CompressedSecondaryCacheTest::helper_, str3.length())); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_dummy_count, 2); + + // After this Insert, primary cache contains k1 and secondary cache contains + // k1's dummy item, k2's dummy item, and k3's dummy item. + auto item1_2 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert( + "k1", item1_2, &CompressedSecondaryCacheTest::helper_, str1.length())); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_dummy_count, 3); + + // After this Insert, primary cache contains k2 and secondary cache contains + // k1's item, k2's dummy item, and k3's dummy item. + auto item2_2 = new TestItem(str2.data(), str2.length()); + ASSERT_OK(cache->Insert( + "k2", item2_2, &CompressedSecondaryCacheTest::helper_, str2.length())); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 1); + if (sec_cache_is_compressed) { + ASSERT_EQ(get_perf_context()->compressed_sec_cache_uncompressed_bytes, + str1.length()); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_compressed_bytes, + 1008); + } else { + ASSERT_EQ(get_perf_context()->compressed_sec_cache_uncompressed_bytes, 0); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_compressed_bytes, 0); + } + + // After this Insert, primary cache contains k3 and secondary cache contains + // k1's item and k2's item. + auto item3_2 = new TestItem(str3.data(), str3.length()); + ASSERT_OK(cache->Insert( + "k3", item3_2, &CompressedSecondaryCacheTest::helper_, str3.length())); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 2); + if (sec_cache_is_compressed) { + ASSERT_EQ(get_perf_context()->compressed_sec_cache_uncompressed_bytes, + str1.length() + str2.length()); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_compressed_bytes, + 2027); + } else { + ASSERT_EQ(get_perf_context()->compressed_sec_cache_uncompressed_bytes, 0); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_compressed_bytes, 0); + } + + Cache::Handle* handle; + handle = cache->Lookup("k3", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true, + stats.get()); + ASSERT_NE(handle, nullptr); + auto val3 = static_cast<TestItem*>(cache->Value(handle)); + ASSERT_NE(val3, nullptr); + ASSERT_EQ(memcmp(val3->Buf(), item3_2->Buf(), item3_2->Size()), 0); + cache->Release(handle); + + // Lookup an non-existent key. + handle = cache->Lookup("k0", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true, + stats.get()); + ASSERT_EQ(handle, nullptr); + + // This Lookup should just insert a dummy handle in the primary cache + // and the k1 is still in the secondary cache. + handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true, + stats.get()); + ASSERT_NE(handle, nullptr); + ASSERT_EQ(get_perf_context()->block_cache_standalone_handle_count, 1); + auto val1_1 = static_cast<TestItem*>(cache->Value(handle)); + ASSERT_NE(val1_1, nullptr); + ASSERT_EQ(memcmp(val1_1->Buf(), str1.data(), str1.size()), 0); + cache->Release(handle); + + // This Lookup should erase k1 from the secondary cache and insert + // it into primary cache; then k3 is demoted. + // k2 and k3 are in secondary cache. + handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true, + stats.get()); + ASSERT_NE(handle, nullptr); + ASSERT_EQ(get_perf_context()->block_cache_standalone_handle_count, 1); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 3); + cache->Release(handle); + + // k2 is still in secondary cache. + handle = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true, + stats.get()); + ASSERT_NE(handle, nullptr); + ASSERT_EQ(get_perf_context()->block_cache_standalone_handle_count, 2); + cache->Release(handle); + + // Testing SetCapacity(). + ASSERT_OK(secondary_cache->SetCapacity(0)); + handle = cache->Lookup("k3", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true, + stats.get()); + ASSERT_EQ(handle, nullptr); + + ASSERT_OK(secondary_cache->SetCapacity(7000)); + size_t capacity; + ASSERT_OK(secondary_cache->GetCapacity(capacity)); + ASSERT_EQ(capacity, 7000); + auto item1_3 = new TestItem(str1.data(), str1.length()); + // After this Insert, primary cache contains k1. + ASSERT_OK(cache->Insert( + "k1", item1_3, &CompressedSecondaryCacheTest::helper_, str2.length())); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_dummy_count, 3); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 4); + + auto item2_3 = new TestItem(str2.data(), str2.length()); + // After this Insert, primary cache contains k2 and secondary cache contains + // k1's dummy item. + ASSERT_OK(cache->Insert( + "k2", item2_3, &CompressedSecondaryCacheTest::helper_, str1.length())); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_dummy_count, 4); + + auto item1_4 = new TestItem(str1.data(), str1.length()); + // After this Insert, primary cache contains k1 and secondary cache contains + // k1's dummy item and k2's dummy item. + ASSERT_OK(cache->Insert( + "k1", item1_4, &CompressedSecondaryCacheTest::helper_, str2.length())); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_dummy_count, 5); + + auto item2_4 = new TestItem(str2.data(), str2.length()); + // After this Insert, primary cache contains k2 and secondary cache contains + // k1's real item and k2's dummy item. + ASSERT_OK(cache->Insert( + "k2", item2_4, &CompressedSecondaryCacheTest::helper_, str2.length())); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 5); + // This Lookup should just insert a dummy handle in the primary cache + // and the k1 is still in the secondary cache. + handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true, + stats.get()); + + ASSERT_NE(handle, nullptr); + cache->Release(handle); + ASSERT_EQ(get_perf_context()->block_cache_standalone_handle_count, 3); + + cache.reset(); + secondary_cache.reset(); + } + + void BasicIntegrationFailTest(bool sec_cache_is_compressed) { + CompressedSecondaryCacheOptions secondary_cache_opts; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 6000; + secondary_cache_opts.num_shard_bits = 0; + std::shared_ptr<SecondaryCache> secondary_cache = + NewCompressedSecondaryCache(secondary_cache_opts); + + LRUCacheOptions opts( + /*_capacity=*/1300, /*_num_shard_bits=*/0, + /*_strict_capacity_limit=*/false, /*_high_pri_pool_ratio=*/0.5, + /*_memory_allocator=*/nullptr, kDefaultToAdaptiveMutex, + kDefaultCacheMetadataChargePolicy, /*_low_pri_pool_ratio=*/0.0); + opts.secondary_cache = secondary_cache; + std::shared_ptr<Cache> cache = NewLRUCache(opts); + + Random rnd(301); + std::string str1 = rnd.RandomString(1001); + auto item1 = std::make_unique<TestItem>(str1.data(), str1.length()); + ASSERT_NOK(cache->Insert("k1", item1.get(), nullptr, str1.length())); + ASSERT_OK(cache->Insert("k1", item1.get(), + &CompressedSecondaryCacheTest::helper_, + str1.length())); + item1.release(); // Appease clang-analyze "potential memory leak" + + Cache::Handle* handle; + handle = cache->Lookup("k2", nullptr, test_item_creator, + Cache::Priority::LOW, true); + ASSERT_EQ(handle, nullptr); + handle = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, false); + ASSERT_EQ(handle, nullptr); + + cache.reset(); + secondary_cache.reset(); + } + + void IntegrationSaveFailTest(bool sec_cache_is_compressed) { + CompressedSecondaryCacheOptions secondary_cache_opts; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 6000; + secondary_cache_opts.num_shard_bits = 0; + + std::shared_ptr<SecondaryCache> secondary_cache = + NewCompressedSecondaryCache(secondary_cache_opts); + + LRUCacheOptions opts( + /*_capacity=*/1300, /*_num_shard_bits=*/0, + /*_strict_capacity_limit=*/false, /*_high_pri_pool_ratio=*/0.5, + /*_memory_allocator=*/nullptr, kDefaultToAdaptiveMutex, + kDefaultCacheMetadataChargePolicy, /*_low_pri_pool_ratio=*/0.0); + opts.secondary_cache = secondary_cache; + std::shared_ptr<Cache> cache = NewLRUCache(opts); + + Random rnd(301); + std::string str1 = rnd.RandomString(1001); + auto item1 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert("k1", item1, + &CompressedSecondaryCacheTest::helper_fail_, + str1.length())); + + std::string str2 = rnd.RandomString(1002); + auto item2 = new TestItem(str2.data(), str2.length()); + // k1 should be demoted to the secondary cache. + ASSERT_OK(cache->Insert("k2", item2, + &CompressedSecondaryCacheTest::helper_fail_, + str2.length())); + + Cache::Handle* handle; + handle = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_fail_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + cache->Release(handle); + // This lookup should fail, since k1 demotion would have failed. + handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_fail_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_EQ(handle, nullptr); + // Since k1 was not promoted, k2 should still be in cache. + handle = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_fail_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + cache->Release(handle); + + cache.reset(); + secondary_cache.reset(); + } + + void IntegrationCreateFailTest(bool sec_cache_is_compressed) { + CompressedSecondaryCacheOptions secondary_cache_opts; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 6000; + secondary_cache_opts.num_shard_bits = 0; + + std::shared_ptr<SecondaryCache> secondary_cache = + NewCompressedSecondaryCache(secondary_cache_opts); + + LRUCacheOptions opts( + /*_capacity=*/1300, /*_num_shard_bits=*/0, + /*_strict_capacity_limit=*/false, /*_high_pri_pool_ratio=*/0.5, + /*_memory_allocator=*/nullptr, kDefaultToAdaptiveMutex, + kDefaultCacheMetadataChargePolicy, /*_low_pri_pool_ratio=*/0.0); + opts.secondary_cache = secondary_cache; + std::shared_ptr<Cache> cache = NewLRUCache(opts); + + Random rnd(301); + std::string str1 = rnd.RandomString(1001); + auto item1 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_, + str1.length())); + + std::string str2 = rnd.RandomString(1002); + auto item2 = new TestItem(str2.data(), str2.length()); + // k1 should be demoted to the secondary cache. + ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_, + str2.length())); + + Cache::Handle* handle; + SetFailCreate(true); + handle = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + cache->Release(handle); + // This lookup should fail, since k1 creation would have failed + handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_EQ(handle, nullptr); + // Since k1 didn't get promoted, k2 should still be in cache + handle = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + cache->Release(handle); + + cache.reset(); + secondary_cache.reset(); + } + + void IntegrationFullCapacityTest(bool sec_cache_is_compressed) { + CompressedSecondaryCacheOptions secondary_cache_opts; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 6000; + secondary_cache_opts.num_shard_bits = 0; + + std::shared_ptr<SecondaryCache> secondary_cache = + NewCompressedSecondaryCache(secondary_cache_opts); + + LRUCacheOptions opts( + /*_capacity=*/1300, /*_num_shard_bits=*/0, + /*_strict_capacity_limit=*/false, /*_high_pri_pool_ratio=*/0.5, + /*_memory_allocator=*/nullptr, kDefaultToAdaptiveMutex, + kDefaultCacheMetadataChargePolicy, /*_low_pri_pool_ratio=*/0.0); + opts.secondary_cache = secondary_cache; + std::shared_ptr<Cache> cache = NewLRUCache(opts); + + Random rnd(301); + std::string str1 = rnd.RandomString(1001); + auto item1_1 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert( + "k1", item1_1, &CompressedSecondaryCacheTest::helper_, str1.length())); + + std::string str2 = rnd.RandomString(1002); + std::string str2_clone{str2}; + auto item2 = new TestItem(str2.data(), str2.length()); + // After this Insert, primary cache contains k2 and secondary cache contains + // k1's dummy item. + ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_, + str2.length())); + + // After this Insert, primary cache contains k1 and secondary cache contains + // k1's dummy item and k2's dummy item. + auto item1_2 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert( + "k1", item1_2, &CompressedSecondaryCacheTest::helper_, str1.length())); + + auto item2_2 = new TestItem(str2.data(), str2.length()); + // After this Insert, primary cache contains k2 and secondary cache contains + // k1's item and k2's dummy item. + ASSERT_OK(cache->Insert( + "k2", item2_2, &CompressedSecondaryCacheTest::helper_, str2.length())); + + Cache::Handle* handle2; + handle2 = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle2, nullptr); + cache->Release(handle2); + + // k1 promotion should fail because cache is at capacity and + // strict_capacity_limit is true, but the lookup should still succeed. + // A k1's dummy item is inserted into primary cache. + Cache::Handle* handle1; + handle1 = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle1, nullptr); + cache->Release(handle1); + + // Since k1 didn't get inserted, k2 should still be in cache + handle2 = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle2, nullptr); + cache->Release(handle2); + + cache.reset(); + secondary_cache.reset(); + } + + void SplitValueIntoChunksTest() { + JemallocAllocatorOptions jopts; + std::shared_ptr<MemoryAllocator> allocator; + std::string msg; + if (JemallocNodumpAllocator::IsSupported(&msg)) { + Status s = NewJemallocNodumpAllocator(jopts, &allocator); + if (!s.ok()) { + ROCKSDB_GTEST_BYPASS("JEMALLOC not supported"); + } + } else { + ROCKSDB_GTEST_BYPASS("JEMALLOC not supported"); + } + + using CacheValueChunk = CompressedSecondaryCache::CacheValueChunk; + std::unique_ptr<CompressedSecondaryCache> sec_cache = + std::make_unique<CompressedSecondaryCache>(1000, 0, true, 0.5, 0.0, + allocator); + Random rnd(301); + // 8500 = 8169 + 233 + 98, so there should be 3 chunks after split. + size_t str_size{8500}; + std::string str = rnd.RandomString(static_cast<int>(str_size)); + size_t charge{0}; + CacheValueChunk* chunks_head = + sec_cache->SplitValueIntoChunks(str, kLZ4Compression, charge); + ASSERT_EQ(charge, str_size + 3 * (sizeof(CacheValueChunk) - 1)); + + CacheValueChunk* current_chunk = chunks_head; + ASSERT_EQ(current_chunk->size, 8192 - sizeof(CacheValueChunk) + 1); + current_chunk = current_chunk->next; + ASSERT_EQ(current_chunk->size, 256 - sizeof(CacheValueChunk) + 1); + current_chunk = current_chunk->next; + ASSERT_EQ(current_chunk->size, 98); + + sec_cache->GetDeletionCallback(true)("dummy", chunks_head); + } + + void MergeChunksIntoValueTest() { + using CacheValueChunk = CompressedSecondaryCache::CacheValueChunk; + Random rnd(301); + size_t size1{2048}; + std::string str1 = rnd.RandomString(static_cast<int>(size1)); + CacheValueChunk* current_chunk = reinterpret_cast<CacheValueChunk*>( + new char[sizeof(CacheValueChunk) - 1 + size1]); + CacheValueChunk* chunks_head = current_chunk; + memcpy(current_chunk->data, str1.data(), size1); + current_chunk->size = size1; + + size_t size2{256}; + std::string str2 = rnd.RandomString(static_cast<int>(size2)); + current_chunk->next = reinterpret_cast<CacheValueChunk*>( + new char[sizeof(CacheValueChunk) - 1 + size2]); + current_chunk = current_chunk->next; + memcpy(current_chunk->data, str2.data(), size2); + current_chunk->size = size2; + + size_t size3{31}; + std::string str3 = rnd.RandomString(static_cast<int>(size3)); + current_chunk->next = reinterpret_cast<CacheValueChunk*>( + new char[sizeof(CacheValueChunk) - 1 + size3]); + current_chunk = current_chunk->next; + memcpy(current_chunk->data, str3.data(), size3); + current_chunk->size = size3; + current_chunk->next = nullptr; + + std::string str = str1 + str2 + str3; + + std::unique_ptr<CompressedSecondaryCache> sec_cache = + std::make_unique<CompressedSecondaryCache>(1000, 0, true, 0.5, 0.0); + size_t charge{0}; + CacheAllocationPtr value = + sec_cache->MergeChunksIntoValue(chunks_head, charge); + ASSERT_EQ(charge, size1 + size2 + size3); + std::string value_str{value.get(), charge}; + ASSERT_EQ(strcmp(value_str.data(), str.data()), 0); + + while (chunks_head != nullptr) { + CacheValueChunk* tmp_chunk = chunks_head; + chunks_head = chunks_head->next; + tmp_chunk->Free(); + } + } + + void SplictValueAndMergeChunksTest() { + JemallocAllocatorOptions jopts; + std::shared_ptr<MemoryAllocator> allocator; + std::string msg; + if (JemallocNodumpAllocator::IsSupported(&msg)) { + Status s = NewJemallocNodumpAllocator(jopts, &allocator); + if (!s.ok()) { + ROCKSDB_GTEST_BYPASS("JEMALLOC not supported"); + } + } else { + ROCKSDB_GTEST_BYPASS("JEMALLOC not supported"); + } + + using CacheValueChunk = CompressedSecondaryCache::CacheValueChunk; + std::unique_ptr<CompressedSecondaryCache> sec_cache = + std::make_unique<CompressedSecondaryCache>(1000, 0, true, 0.5, 0.0, + allocator); + Random rnd(301); + // 8500 = 8169 + 233 + 98, so there should be 3 chunks after split. + size_t str_size{8500}; + std::string str = rnd.RandomString(static_cast<int>(str_size)); + size_t charge{0}; + CacheValueChunk* chunks_head = + sec_cache->SplitValueIntoChunks(str, kLZ4Compression, charge); + ASSERT_EQ(charge, str_size + 3 * (sizeof(CacheValueChunk) - 1)); + + CacheAllocationPtr value = + sec_cache->MergeChunksIntoValue(chunks_head, charge); + ASSERT_EQ(charge, str_size); + std::string value_str{value.get(), charge}; + ASSERT_EQ(strcmp(value_str.data(), str.data()), 0); + + sec_cache->GetDeletionCallback(true)("dummy", chunks_head); + } + + private: + bool fail_create_; +}; + +Cache::CacheItemHelper CompressedSecondaryCacheTest::helper_( + CompressedSecondaryCacheTest::SizeCallback, + CompressedSecondaryCacheTest::SaveToCallback, + CompressedSecondaryCacheTest::DeletionCallback); + +Cache::CacheItemHelper CompressedSecondaryCacheTest::helper_fail_( + CompressedSecondaryCacheTest::SizeCallback, + CompressedSecondaryCacheTest::SaveToCallbackFail, + CompressedSecondaryCacheTest::DeletionCallback); + +class CompressedSecCacheTestWithCompressAndAllocatorParam + : public CompressedSecondaryCacheTest, + public ::testing::WithParamInterface<std::tuple<bool, bool>> { + public: + CompressedSecCacheTestWithCompressAndAllocatorParam() { + sec_cache_is_compressed_ = std::get<0>(GetParam()); + use_jemalloc_ = std::get<1>(GetParam()); + } + bool sec_cache_is_compressed_; + bool use_jemalloc_; +}; + +TEST_P(CompressedSecCacheTestWithCompressAndAllocatorParam, BasicTes) { + BasicTest(sec_cache_is_compressed_, use_jemalloc_); +} + +INSTANTIATE_TEST_CASE_P(CompressedSecCacheTests, + CompressedSecCacheTestWithCompressAndAllocatorParam, + ::testing::Combine(testing::Bool(), testing::Bool())); + +class CompressedSecondaryCacheTestWithCompressionParam + : public CompressedSecondaryCacheTest, + public ::testing::WithParamInterface<bool> { + public: + CompressedSecondaryCacheTestWithCompressionParam() { + sec_cache_is_compressed_ = GetParam(); + } + bool sec_cache_is_compressed_; +}; + +#ifndef ROCKSDB_LITE + +TEST_P(CompressedSecondaryCacheTestWithCompressionParam, BasicTestFromString) { + std::shared_ptr<SecondaryCache> sec_cache{nullptr}; + std::string sec_cache_uri; + if (sec_cache_is_compressed_) { + if (LZ4_Supported()) { + sec_cache_uri = + "compressed_secondary_cache://" + "capacity=2048;num_shard_bits=0;compression_type=kLZ4Compression;" + "compress_format_version=2"; + } else { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + sec_cache_uri = + "compressed_secondary_cache://" + "capacity=2048;num_shard_bits=0;compression_type=kNoCompression"; + sec_cache_is_compressed_ = false; + } + Status s = SecondaryCache::CreateFromString(ConfigOptions(), sec_cache_uri, + &sec_cache); + EXPECT_OK(s); + } else { + sec_cache_uri = + "compressed_secondary_cache://" + "capacity=2048;num_shard_bits=0;compression_type=kNoCompression"; + Status s = SecondaryCache::CreateFromString(ConfigOptions(), sec_cache_uri, + &sec_cache); + EXPECT_OK(s); + } + BasicTestHelper(sec_cache, sec_cache_is_compressed_); +} + +TEST_P(CompressedSecondaryCacheTestWithCompressionParam, + BasicTestFromStringWithSplit) { + std::shared_ptr<SecondaryCache> sec_cache{nullptr}; + std::string sec_cache_uri; + if (sec_cache_is_compressed_) { + if (LZ4_Supported()) { + sec_cache_uri = + "compressed_secondary_cache://" + "capacity=2048;num_shard_bits=0;compression_type=kLZ4Compression;" + "compress_format_version=2;enable_custom_split_merge=true"; + } else { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + sec_cache_uri = + "compressed_secondary_cache://" + "capacity=2048;num_shard_bits=0;compression_type=kNoCompression;" + "enable_custom_split_merge=true"; + sec_cache_is_compressed_ = false; + } + Status s = SecondaryCache::CreateFromString(ConfigOptions(), sec_cache_uri, + &sec_cache); + EXPECT_OK(s); + } else { + sec_cache_uri = + "compressed_secondary_cache://" + "capacity=2048;num_shard_bits=0;compression_type=kNoCompression;" + "enable_custom_split_merge=true"; + Status s = SecondaryCache::CreateFromString(ConfigOptions(), sec_cache_uri, + &sec_cache); + EXPECT_OK(s); + } + BasicTestHelper(sec_cache, sec_cache_is_compressed_); +} + +#endif // ROCKSDB_LITE + +TEST_P(CompressedSecondaryCacheTestWithCompressionParam, FailsTest) { + FailsTest(sec_cache_is_compressed_); +} + +TEST_P(CompressedSecondaryCacheTestWithCompressionParam, + BasicIntegrationFailTest) { + BasicIntegrationFailTest(sec_cache_is_compressed_); +} + +TEST_P(CompressedSecondaryCacheTestWithCompressionParam, + IntegrationSaveFailTest) { + IntegrationSaveFailTest(sec_cache_is_compressed_); +} + +TEST_P(CompressedSecondaryCacheTestWithCompressionParam, + IntegrationCreateFailTest) { + IntegrationCreateFailTest(sec_cache_is_compressed_); +} + +TEST_P(CompressedSecondaryCacheTestWithCompressionParam, + IntegrationFullCapacityTest) { + IntegrationFullCapacityTest(sec_cache_is_compressed_); +} + +INSTANTIATE_TEST_CASE_P(CompressedSecCacheTests, + CompressedSecondaryCacheTestWithCompressionParam, + testing::Bool()); + +class CompressedSecCacheTestWithCompressAndSplitParam + : public CompressedSecondaryCacheTest, + public ::testing::WithParamInterface<std::tuple<bool, bool>> { + public: + CompressedSecCacheTestWithCompressAndSplitParam() { + sec_cache_is_compressed_ = std::get<0>(GetParam()); + enable_custom_split_merge_ = std::get<1>(GetParam()); + } + bool sec_cache_is_compressed_; + bool enable_custom_split_merge_; +}; + +TEST_P(CompressedSecCacheTestWithCompressAndSplitParam, BasicIntegrationTest) { + BasicIntegrationTest(sec_cache_is_compressed_, enable_custom_split_merge_); +} + +INSTANTIATE_TEST_CASE_P(CompressedSecCacheTests, + CompressedSecCacheTestWithCompressAndSplitParam, + ::testing::Combine(testing::Bool(), testing::Bool())); + +TEST_F(CompressedSecondaryCacheTest, SplitValueIntoChunksTest) { + SplitValueIntoChunksTest(); +} + +TEST_F(CompressedSecondaryCacheTest, MergeChunksIntoValueTest) { + MergeChunksIntoValueTest(); +} + +TEST_F(CompressedSecondaryCacheTest, SplictValueAndMergeChunksTest) { + SplictValueAndMergeChunksTest(); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |