From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 21 Apr 2024 13:54:28 +0200
Subject: Adding upstream version 18.2.2.

Signed-off-by: Daniel Baumann
---
 src/rocksdb/db/db_block_cache_test.cc | 2313 +++++++++++++++++++++++++++++++++
 1 file changed, 2313 insertions(+)
 create mode 100644 src/rocksdb/db/db_block_cache_test.cc

diff --git a/src/rocksdb/db/db_block_cache_test.cc b/src/rocksdb/db/db_block_cache_test.cc
new file mode 100644
index 000000000..db80b82cb
--- /dev/null
+++ b/src/rocksdb/db/db_block_cache_test.cc
@@ -0,0 +1,2313 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include <cstdlib>
+#include <functional>
+#include <memory>
+#include <unordered_set>
+
+#include "cache/cache_entry_roles.h"
+#include "cache/cache_key.h"
+#include "cache/lru_cache.h"
+#include "db/column_family.h"
+#include "db/db_impl/db_impl.h"
+#include "db/db_test_util.h"
+#include "env/unique_id_gen.h"
+#include "port/stack_trace.h"
+#include "rocksdb/persistent_cache.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/table.h"
+#include "rocksdb/table_properties.h"
+#include "table/block_based/block_based_table_reader.h"
+#include "table/unique_id_impl.h"
+#include "util/compression.h"
+#include "util/defer.h"
+#include "util/hash.h"
+#include "util/math.h"
+#include "util/random.h"
+#include "utilities/fault_injection_fs.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class DBBlockCacheTest : public DBTestBase {
+ private:
+  size_t miss_count_ = 0;
+  size_t hit_count_ = 0;
+  size_t insert_count_ = 0;
+  size_t failure_count_ = 0;
+  size_t compression_dict_miss_count_ = 0;
+  size_t compression_dict_hit_count_ = 0;
+  size_t compression_dict_insert_count_ = 0;
+  size_t compressed_miss_count_ = 0;
+  size_t compressed_hit_count_ = 0;
+  size_t compressed_insert_count_ = 0;
+  size_t compressed_failure_count_ = 0;
+
+ public:
+  const size_t kNumBlocks = 10;
+  const size_t kValueSize = 100;
+
+  DBBlockCacheTest()
+      : DBTestBase("db_block_cache_test", /*env_do_fsync=*/true) {}
+
+  BlockBasedTableOptions GetTableOptions() {
+    BlockBasedTableOptions table_options;
+    // Set a small enough block size so that each key-value gets its own
+    // block.
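+    // (block_size is a soft target: a block is cut as soon as it reaches the
+    // configured size, so with 100-byte values every block ends up holding
+    // exactly one entry.)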
+ table_options.block_size = 1; + return table_options; + } + + Options GetOptions(const BlockBasedTableOptions& table_options) { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.avoid_flush_during_recovery = false; + // options.compression = kNoCompression; + options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + return options; + } + + void InitTable(const Options& /*options*/) { + std::string value(kValueSize, 'a'); + for (size_t i = 0; i < kNumBlocks; i++) { + ASSERT_OK(Put(std::to_string(i), value.c_str())); + } + } + + void RecordCacheCounters(const Options& options) { + miss_count_ = TestGetTickerCount(options, BLOCK_CACHE_MISS); + hit_count_ = TestGetTickerCount(options, BLOCK_CACHE_HIT); + insert_count_ = TestGetTickerCount(options, BLOCK_CACHE_ADD); + failure_count_ = TestGetTickerCount(options, BLOCK_CACHE_ADD_FAILURES); + compressed_miss_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS); + compressed_hit_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT); + compressed_insert_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD); + compressed_failure_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD_FAILURES); + } + + void RecordCacheCountersForCompressionDict(const Options& options) { + compression_dict_miss_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS); + compression_dict_hit_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_HIT); + compression_dict_insert_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD); + } + + void CheckCacheCounters(const Options& options, size_t expected_misses, + size_t expected_hits, size_t expected_inserts, + size_t expected_failures) { + size_t new_miss_count = TestGetTickerCount(options, BLOCK_CACHE_MISS); + size_t new_hit_count = TestGetTickerCount(options, BLOCK_CACHE_HIT); + size_t new_insert_count = TestGetTickerCount(options, BLOCK_CACHE_ADD); + size_t new_failure_count = + TestGetTickerCount(options, BLOCK_CACHE_ADD_FAILURES); + ASSERT_EQ(miss_count_ + expected_misses, new_miss_count); + ASSERT_EQ(hit_count_ + expected_hits, new_hit_count); + ASSERT_EQ(insert_count_ + expected_inserts, new_insert_count); + ASSERT_EQ(failure_count_ + expected_failures, new_failure_count); + miss_count_ = new_miss_count; + hit_count_ = new_hit_count; + insert_count_ = new_insert_count; + failure_count_ = new_failure_count; + } + + void CheckCacheCountersForCompressionDict( + const Options& options, size_t expected_compression_dict_misses, + size_t expected_compression_dict_hits, + size_t expected_compression_dict_inserts) { + size_t new_compression_dict_miss_count = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS); + size_t new_compression_dict_hit_count = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_HIT); + size_t new_compression_dict_insert_count = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD); + ASSERT_EQ(compression_dict_miss_count_ + expected_compression_dict_misses, + new_compression_dict_miss_count); + ASSERT_EQ(compression_dict_hit_count_ + expected_compression_dict_hits, + new_compression_dict_hit_count); + ASSERT_EQ( + compression_dict_insert_count_ + expected_compression_dict_inserts, + new_compression_dict_insert_count); + compression_dict_miss_count_ = new_compression_dict_miss_count; + compression_dict_hit_count_ = new_compression_dict_hit_count; + 
compression_dict_insert_count_ = new_compression_dict_insert_count;
+  }
+
+  void CheckCompressedCacheCounters(const Options& options,
+                                    size_t expected_misses,
+                                    size_t expected_hits,
+                                    size_t expected_inserts,
+                                    size_t expected_failures) {
+    size_t new_miss_count =
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS);
+    size_t new_hit_count =
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT);
+    size_t new_insert_count =
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD);
+    size_t new_failure_count =
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
+    ASSERT_EQ(compressed_miss_count_ + expected_misses, new_miss_count);
+    ASSERT_EQ(compressed_hit_count_ + expected_hits, new_hit_count);
+    ASSERT_EQ(compressed_insert_count_ + expected_inserts, new_insert_count);
+    ASSERT_EQ(compressed_failure_count_ + expected_failures,
+              new_failure_count);
+    compressed_miss_count_ = new_miss_count;
+    compressed_hit_count_ = new_hit_count;
+    compressed_insert_count_ = new_insert_count;
+    compressed_failure_count_ = new_failure_count;
+  }
+
+#ifndef ROCKSDB_LITE
+  const std::array<size_t, kNumCacheEntryRoles> GetCacheEntryRoleCountsBg() {
+    // Verify in cache entry role stats
+    std::array<size_t, kNumCacheEntryRoles> cache_entry_role_counts;
+    std::map<std::string, std::string> values;
+    EXPECT_TRUE(db_->GetMapProperty(DB::Properties::kFastBlockCacheEntryStats,
+                                    &values));
+    for (size_t i = 0; i < kNumCacheEntryRoles; ++i) {
+      auto role = static_cast<CacheEntryRole>(i);
+      cache_entry_role_counts[i] =
+          ParseSizeT(values[BlockCacheEntryStatsMapKeys::EntryCount(role)]);
+    }
+    return cache_entry_role_counts;
+  }
+#endif  // ROCKSDB_LITE
+};
+
+TEST_F(DBBlockCacheTest, IteratorBlockCacheUsage) {
+  ReadOptions read_options;
+  read_options.fill_cache = false;
+  auto table_options = GetTableOptions();
+  auto options = GetOptions(table_options);
+  InitTable(options);
+
+  LRUCacheOptions co;
+  co.capacity = 0;
+  co.num_shard_bits = 0;
+  co.strict_capacity_limit = false;
+  // Needed so that the entry stats collector is not counted
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  std::shared_ptr<Cache> cache = NewLRUCache(co);
+  table_options.block_cache = cache;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  Reopen(options);
+  RecordCacheCounters(options);
+
+  std::vector<std::unique_ptr<Iterator>> iterators(kNumBlocks - 1);
+  Iterator* iter = nullptr;
+
+  ASSERT_EQ(0, cache->GetUsage());
+  iter = db_->NewIterator(read_options);
+  iter->Seek(std::to_string(0));
+  ASSERT_LT(0, cache->GetUsage());
+  delete iter;
+  iter = nullptr;
+  ASSERT_EQ(0, cache->GetUsage());
+}
+
+TEST_F(DBBlockCacheTest, TestWithoutCompressedBlockCache) {
+  ReadOptions read_options;
+  auto table_options = GetTableOptions();
+  auto options = GetOptions(table_options);
+  InitTable(options);
+
+  LRUCacheOptions co;
+  co.capacity = 0;
+  co.num_shard_bits = 0;
+  co.strict_capacity_limit = false;
+  // Needed so that the entry stats collector is not counted
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  std::shared_ptr<Cache> cache = NewLRUCache(co);
+  table_options.block_cache = cache;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  Reopen(options);
+  RecordCacheCounters(options);
+
+  std::vector<std::unique_ptr<Iterator>> iterators(kNumBlocks - 1);
+  Iterator* iter = nullptr;
+
+  // Load blocks into cache.
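+  // (Each Seek below reads exactly one new data block: one cache miss and one
+  // insert per iteration, with the block pinned by its open iterator.)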
+  for (size_t i = 0; i + 1 < kNumBlocks; i++) {
+    iter = db_->NewIterator(read_options);
+    iter->Seek(std::to_string(i));
+    ASSERT_OK(iter->status());
+    CheckCacheCounters(options, 1, 0, 1, 0);
+    iterators[i].reset(iter);
+  }
+  size_t usage = cache->GetUsage();
+  ASSERT_LT(0, usage);
+  cache->SetCapacity(usage);
+  ASSERT_EQ(usage, cache->GetPinnedUsage());
+
+  // Test with strict capacity limit.
+  cache->SetStrictCapacityLimit(true);
+  iter = db_->NewIterator(read_options);
+  iter->Seek(std::to_string(kNumBlocks - 1));
+  ASSERT_TRUE(iter->status().IsMemoryLimit());
+  CheckCacheCounters(options, 1, 0, 0, 1);
+  delete iter;
+  iter = nullptr;
+
+  // Release iterators and access cache again.
+  for (size_t i = 0; i + 1 < kNumBlocks; i++) {
+    iterators[i].reset();
+    CheckCacheCounters(options, 0, 0, 0, 0);
+  }
+  ASSERT_EQ(0, cache->GetPinnedUsage());
+  for (size_t i = 0; i + 1 < kNumBlocks; i++) {
+    iter = db_->NewIterator(read_options);
+    iter->Seek(std::to_string(i));
+    ASSERT_OK(iter->status());
+    CheckCacheCounters(options, 0, 1, 0, 0);
+    iterators[i].reset(iter);
+  }
+}
+
+#ifdef SNAPPY
+TEST_F(DBBlockCacheTest, TestWithCompressedBlockCache) {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+
+  BlockBasedTableOptions table_options;
+  table_options.no_block_cache = true;
+  table_options.block_cache_compressed = nullptr;
+  table_options.block_size = 1;
+  table_options.filter_policy.reset(NewBloomFilterPolicy(20));
+  table_options.cache_index_and_filter_blocks = false;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  options.compression = CompressionType::kSnappyCompression;
+
+  DestroyAndReopen(options);
+
+  std::string value(kValueSize, 'a');
+  for (size_t i = 0; i < kNumBlocks; i++) {
+    ASSERT_OK(Put(std::to_string(i), value));
+    ASSERT_OK(Flush());
+  }
+
+  ReadOptions read_options;
+  std::shared_ptr<Cache> compressed_cache = NewLRUCache(1 << 25, 0, false);
+  LRUCacheOptions co;
+  co.capacity = 0;
+  co.num_shard_bits = 0;
+  co.strict_capacity_limit = false;
+  // Needed so that the entry stats collector is not counted
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  std::shared_ptr<Cache> cache = NewLRUCache(co);
+  table_options.block_cache = cache;
+  table_options.no_block_cache = false;
+  table_options.block_cache_compressed = compressed_cache;
+  table_options.max_auto_readahead_size = 0;
+  table_options.cache_index_and_filter_blocks = false;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  Reopen(options);
+  RecordCacheCounters(options);
+
+  // Load blocks into cache.
+  for (size_t i = 0; i < kNumBlocks - 1; i++) {
+    ASSERT_EQ(value, Get(std::to_string(i)));
+    CheckCacheCounters(options, 1, 0, 1, 0);
+    CheckCompressedCacheCounters(options, 1, 0, 1, 0);
+  }
+
+  size_t usage = cache->GetUsage();
+  ASSERT_EQ(0, usage);
+  ASSERT_EQ(usage, cache->GetPinnedUsage());
+  size_t compressed_usage = compressed_cache->GetUsage();
+  ASSERT_LT(0, compressed_usage);
+  // Compressed block cache cannot be pinned.
+  ASSERT_EQ(0, compressed_cache->GetPinnedUsage());
+
+  // Set strict capacity limit flag. Now blocks will only load into the
+  // compressed block cache.
+  cache->SetCapacity(usage);
+  cache->SetStrictCapacityLimit(true);
+  ASSERT_EQ(usage, cache->GetPinnedUsage());
+
+  // Load last key block.
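+  // (With the strict limit in place, inserting the uncompressed block fails,
+  // and the failed Insert's status is surfaced as the result of Get().)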
+  ASSERT_EQ(
+      "Operation aborted: Memory limit reached: Insert failed due to LRU cache "
+      "being full.",
+      Get(std::to_string(kNumBlocks - 1)));
+  // Failure will also record the miss counter.
+  CheckCacheCounters(options, 1, 0, 0, 1);
+  CheckCompressedCacheCounters(options, 1, 0, 1, 0);
+
+  // Clear strict capacity limit flag. This time we shall hit the compressed
+  // block cache and load into the block cache.
+  cache->SetStrictCapacityLimit(false);
+  // Load last key block.
+  ASSERT_EQ(value, Get(std::to_string(kNumBlocks - 1)));
+  CheckCacheCounters(options, 1, 0, 1, 0);
+  CheckCompressedCacheCounters(options, 0, 1, 0, 0);
+}
+
+namespace {
+class PersistentCacheFromCache : public PersistentCache {
+ public:
+  PersistentCacheFromCache(std::shared_ptr<Cache> cache, bool read_only)
+      : cache_(cache), read_only_(read_only) {}
+
+  Status Insert(const Slice& key, const char* data,
+                const size_t size) override {
+    if (read_only_) {
+      return Status::NotSupported();
+    }
+    std::unique_ptr<char[]> copy{new char[size]};
+    std::copy_n(data, size, copy.get());
+    Status s = cache_->Insert(
+        key, copy.get(), size,
+        GetCacheEntryDeleterForRole<char[], CacheEntryRole::kMisc>());
+    if (s.ok()) {
+      copy.release();
+    }
+    return s;
+  }
+
+  Status Lookup(const Slice& key, std::unique_ptr<char[]>* data,
+                size_t* size) override {
+    auto handle = cache_->Lookup(key);
+    if (handle) {
+      char* ptr = static_cast<char*>(cache_->Value(handle));
+      *size = cache_->GetCharge(handle);
+      data->reset(new char[*size]);
+      std::copy_n(ptr, *size, data->get());
+      cache_->Release(handle);
+      return Status::OK();
+    } else {
+      return Status::NotFound();
+    }
+  }
+
+  bool IsCompressed() override { return false; }
+
+  StatsType Stats() override { return StatsType(); }
+
+  std::string GetPrintableOptions() const override { return ""; }
+
+  uint64_t NewId() override { return cache_->NewId(); }
+
+ private:
+  std::shared_ptr<Cache> cache_;
+  bool read_only_;
+};
+
+class ReadOnlyCacheWrapper : public CacheWrapper {
+  using CacheWrapper::CacheWrapper;
+
+  using Cache::Insert;
+  Status Insert(const Slice& /*key*/, void* /*value*/, size_t /*charge*/,
+                void (*)(const Slice& key, void* value) /*deleter*/,
+                Handle** /*handle*/, Priority /*priority*/) override {
+    return Status::NotSupported();
+  }
+};
+
+}  // anonymous namespace
+
+TEST_F(DBBlockCacheTest, TestWithSameCompressed) {
+  auto table_options = GetTableOptions();
+  auto options = GetOptions(table_options);
+  InitTable(options);
+
+  std::shared_ptr<Cache> rw_cache{NewLRUCache(1000000)};
+  std::shared_ptr<PersistentCacheFromCache> rw_pcache{
+      new PersistentCacheFromCache(rw_cache, /*read_only*/ false)};
+  // Exercise some obscure behavior with read-only wrappers
+  std::shared_ptr<Cache> ro_cache{new ReadOnlyCacheWrapper(rw_cache)};
+  std::shared_ptr<PersistentCacheFromCache> ro_pcache{
+      new PersistentCacheFromCache(rw_cache, /*read_only*/ true)};
+
+  // Simple same pointer
+  table_options.block_cache = rw_cache;
+  table_options.block_cache_compressed = rw_cache;
+  table_options.persistent_cache.reset();
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  ASSERT_EQ(TryReopen(options).ToString(),
+            "Invalid argument: block_cache same as block_cache_compressed not "
+            "currently supported, and would be bad for performance anyway");
+
+  // Other cases
+  table_options.block_cache = ro_cache;
+  table_options.block_cache_compressed = rw_cache;
+  table_options.persistent_cache.reset();
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  ASSERT_EQ(TryReopen(options).ToString(),
+            "Invalid argument: block_cache and block_cache_compressed share "
+            "the same key space, which is not supported");
+
+  table_options.block_cache = rw_cache;
+  table_options.block_cache_compressed = ro_cache;
+  table_options.persistent_cache.reset();
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  ASSERT_EQ(TryReopen(options).ToString(),
+            "Invalid argument: block_cache_compressed and block_cache share "
+            "the same key space, which is not supported");
+
+  table_options.block_cache = ro_cache;
+  table_options.block_cache_compressed.reset();
+  table_options.persistent_cache = rw_pcache;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  ASSERT_EQ(TryReopen(options).ToString(),
+            "Invalid argument: block_cache and persistent_cache share the same "
+            "key space, which is not supported");
+
+  table_options.block_cache = rw_cache;
+  table_options.block_cache_compressed.reset();
+  table_options.persistent_cache = ro_pcache;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  ASSERT_EQ(TryReopen(options).ToString(),
+            "Invalid argument: persistent_cache and block_cache share the same "
+            "key space, which is not supported");
+
+  table_options.block_cache.reset();
+  table_options.no_block_cache = true;
+  table_options.block_cache_compressed = ro_cache;
+  table_options.persistent_cache = rw_pcache;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  ASSERT_EQ(TryReopen(options).ToString(),
+            "Invalid argument: block_cache_compressed and persistent_cache "
+            "share the same key space, which is not supported");
+
+  table_options.block_cache.reset();
+  table_options.no_block_cache = true;
+  table_options.block_cache_compressed = rw_cache;
+  table_options.persistent_cache = ro_pcache;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  ASSERT_EQ(TryReopen(options).ToString(),
+            "Invalid argument: persistent_cache and block_cache_compressed "
+            "share the same key space, which is not supported");
+}
+#endif  // SNAPPY
+
+#ifndef ROCKSDB_LITE
+
+// Make sure that when options.block_cache is set, after a new table is
+// created its index/filter blocks are added to block cache.
+TEST_F(DBBlockCacheTest, IndexAndFilterBlocksOfNewTableAddedToCache) {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+  BlockBasedTableOptions table_options;
+  table_options.cache_index_and_filter_blocks = true;
+  table_options.filter_policy.reset(NewBloomFilterPolicy(20));
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  ASSERT_OK(Put(1, "key", "val"));
+  // Create a new table.
+  ASSERT_OK(Flush(1));
+
+  // index/filter blocks added to block cache right after table creation.
+  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+  ASSERT_EQ(2, /* only index/filter were added */
+            TestGetTickerCount(options, BLOCK_CACHE_ADD));
+  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS));
+  uint64_t int_num;
+  ASSERT_TRUE(
+      dbfull()->GetIntProperty("rocksdb.estimate-table-readers-mem", &int_num));
+  ASSERT_EQ(int_num, 0U);
+
+  // Make sure filter block is in cache.
+  std::string value;
+  ReadOptions ropt;
+  db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value);
+
+  // Miss count should remain the same.
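+  // (KeyMayExist consults only the filter block, which is already cached, so
+  // repeated calls bump the filter hit counter without adding misses.)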
+  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
+
+  db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value);
+  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+  ASSERT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
+
+  // Make sure index block is in cache.
+  auto index_block_hit = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);
+  value = Get(1, "key");
+  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+  ASSERT_EQ(index_block_hit + 1,
+            TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+
+  value = Get(1, "key");
+  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+  ASSERT_EQ(index_block_hit + 2,
+            TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+}
+
+// With fill_cache = false, fill up the cache, then iterate over the entire
+// db, verifying that the dummy entries inserted in
+// `BlockBasedTable::NewDataBlockIterator` do not cause heap-use-after-free
+// errors in COMPILE_WITH_ASAN=1 runs.
+TEST_F(DBBlockCacheTest, FillCacheAndIterateDB) {
+  ReadOptions read_options;
+  read_options.fill_cache = false;
+  auto table_options = GetTableOptions();
+  auto options = GetOptions(table_options);
+  InitTable(options);
+
+  std::shared_ptr<Cache> cache = NewLRUCache(10, 0, true);
+  table_options.block_cache = cache;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  Reopen(options);
+  ASSERT_OK(Put("key1", "val1"));
+  ASSERT_OK(Put("key2", "val2"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("key3", "val3"));
+  ASSERT_OK(Put("key4", "val4"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("key5", "val5"));
+  ASSERT_OK(Put("key6", "val6"));
+  ASSERT_OK(Flush());
+
+  Iterator* iter = nullptr;
+
+  iter = db_->NewIterator(read_options);
+  iter->Seek(std::to_string(0));
+  while (iter->Valid()) {
+    iter->Next();
+  }
+  delete iter;
+  iter = nullptr;
+}
+
+TEST_F(DBBlockCacheTest, IndexAndFilterBlocksStats) {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+  BlockBasedTableOptions table_options;
+  table_options.cache_index_and_filter_blocks = true;
+  LRUCacheOptions co;
+  // 500 bytes are enough to hold the first two blocks
+  co.capacity = 500;
+  co.num_shard_bits = 0;
+  co.strict_capacity_limit = false;
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  std::shared_ptr<Cache> cache = NewLRUCache(co);
+  table_options.block_cache = cache;
+  table_options.filter_policy.reset(NewBloomFilterPolicy(20, true));
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  ASSERT_OK(Put(1, "longer_key", "val"));
+  // Create a new table
+  ASSERT_OK(Flush(1));
+  size_t index_bytes_insert =
+      TestGetTickerCount(options, BLOCK_CACHE_INDEX_BYTES_INSERT);
+  size_t filter_bytes_insert =
+      TestGetTickerCount(options, BLOCK_CACHE_FILTER_BYTES_INSERT);
+  ASSERT_GT(index_bytes_insert, 0);
+  ASSERT_GT(filter_bytes_insert, 0);
+  ASSERT_EQ(cache->GetUsage(), index_bytes_insert + filter_bytes_insert);
+  // set the cache capacity to the current usage
+  cache->SetCapacity(index_bytes_insert + filter_bytes_insert);
+  // The index and filter eviction statistics were broken by the refactoring
+  // that moved the readers out of the block cache. Disabling these until we
+  // can bring the stats back.
+  // ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_INDEX_BYTES_EVICT), 0);
+  // ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_FILTER_BYTES_EVICT), 0);
+  // Note that the second key needs to be no longer than the first one.
+  // Otherwise the second index block may not fit in cache.
+  ASSERT_OK(Put(1, "key", "val"));
+  // Create a new table
+  ASSERT_OK(Flush(1));
+  // cache evicted old index and block entries
+  ASSERT_GT(TestGetTickerCount(options, BLOCK_CACHE_INDEX_BYTES_INSERT),
+            index_bytes_insert);
+  ASSERT_GT(TestGetTickerCount(options, BLOCK_CACHE_FILTER_BYTES_INSERT),
+            filter_bytes_insert);
+  // The index and filter eviction statistics were broken by the refactoring
+  // that moved the readers out of the block cache. Disabling these until we
+  // can bring the stats back.
+  // ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_INDEX_BYTES_EVICT),
+  //           index_bytes_insert);
+  // ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_FILTER_BYTES_EVICT),
+  //           filter_bytes_insert);
+}
+
+#if (defined OS_LINUX || defined OS_WIN)
+TEST_F(DBBlockCacheTest, WarmCacheWithDataBlocksDuringFlush) {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+
+  BlockBasedTableOptions table_options;
+  table_options.block_cache = NewLRUCache(1 << 25, 0, false);
+  table_options.cache_index_and_filter_blocks = false;
+  table_options.prepopulate_block_cache =
+      BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  DestroyAndReopen(options);
+
+  std::string value(kValueSize, 'a');
+  for (size_t i = 1; i <= kNumBlocks; i++) {
+    ASSERT_OK(Put(std::to_string(i), value));
+    ASSERT_OK(Flush());
+    ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_DATA_ADD));
+    ASSERT_EQ(value, Get(std::to_string(i)));
+    ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_DATA_MISS));
+    ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_DATA_HIT));
+  }
+  // Verify compaction not counted
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
+                              /*end=*/nullptr));
+  EXPECT_EQ(kNumBlocks,
+            options.statistics->getTickerCount(BLOCK_CACHE_DATA_ADD));
+}
+
+// This test caches data, index and filter blocks during flush.
+class DBBlockCacheTest1 : public DBTestBase,
+                          public ::testing::WithParamInterface<uint32_t> {
+ public:
+  const size_t kNumBlocks = 10;
+  const size_t kValueSize = 100;
+  DBBlockCacheTest1() : DBTestBase("db_block_cache_test1", true) {}
+};
+
+INSTANTIATE_TEST_CASE_P(DBBlockCacheTest1, DBBlockCacheTest1,
+                        ::testing::Values(1, 2));
+
+TEST_P(DBBlockCacheTest1, WarmCacheWithBlocksDuringFlush) {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+
+  BlockBasedTableOptions table_options;
+  table_options.block_cache = NewLRUCache(1 << 25, 0, false);
+
+  uint32_t filter_type = GetParam();
+  switch (filter_type) {
+    case 1:  // partition_filter
+      table_options.partition_filters = true;
+      table_options.index_type =
+          BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+      table_options.filter_policy.reset(NewBloomFilterPolicy(10));
+      break;
+    case 2:  // full filter
+      table_options.filter_policy.reset(NewBloomFilterPolicy(10));
+      break;
+    default:
+      assert(false);
+  }
+
+  table_options.cache_index_and_filter_blocks = true;
+  table_options.prepopulate_block_cache =
+      BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  DestroyAndReopen(options);
+
+  std::string value(kValueSize, 'a');
+  for (size_t i = 1; i <= kNumBlocks; i++) {
+    ASSERT_OK(Put(std::to_string(i), value));
+    ASSERT_OK(Flush());
+    ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_DATA_ADD));
+    if (filter_type == 1) {
+      ASSERT_EQ(2 * i,
+                options.statistics->getTickerCount(BLOCK_CACHE_INDEX_ADD));
+      ASSERT_EQ(2 * i,
+                options.statistics->getTickerCount(BLOCK_CACHE_FILTER_ADD));
+    } else {
+      ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_INDEX_ADD));
+      ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_FILTER_ADD));
+    }
+    ASSERT_EQ(value, Get(std::to_string(i)));
+
+    ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_DATA_MISS));
+    ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_INDEX_MISS));
+    ASSERT_EQ(i * 3, options.statistics->getTickerCount(BLOCK_CACHE_INDEX_HIT));
+    if (filter_type == 1) {
+      ASSERT_EQ(i * 3,
+                options.statistics->getTickerCount(BLOCK_CACHE_FILTER_HIT));
+    } else {
+      ASSERT_EQ(i * 2,
+                options.statistics->getTickerCount(BLOCK_CACHE_FILTER_HIT));
+    }
+    ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_FILTER_MISS));
+  }
+
+  // Verify compaction not counted
+  CompactRangeOptions cro;
+  // Ensure files are rewritten, not just trivially moved.
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
+  ASSERT_OK(db_->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
+  EXPECT_EQ(kNumBlocks,
+            options.statistics->getTickerCount(BLOCK_CACHE_DATA_ADD));
+  // Index and filter blocks are automatically warmed when the new table file
+  // is automatically opened at the end of compaction. This is not easily
+  // disabled so results in the new index and filter blocks being warmed.
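+  // (In the partitioned case each table file appears to contribute a filter/
+  // index partition plus its top-level block, hence the doubled ADD counts
+  // below: kNumBlocks flush files plus one compaction output file.)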
+  if (filter_type == 1) {
+    EXPECT_EQ(2 * (1 + kNumBlocks),
+              options.statistics->getTickerCount(BLOCK_CACHE_INDEX_ADD));
+    EXPECT_EQ(2 * (1 + kNumBlocks),
+              options.statistics->getTickerCount(BLOCK_CACHE_FILTER_ADD));
+  } else {
+    EXPECT_EQ(1 + kNumBlocks,
+              options.statistics->getTickerCount(BLOCK_CACHE_INDEX_ADD));
+    EXPECT_EQ(1 + kNumBlocks,
+              options.statistics->getTickerCount(BLOCK_CACHE_FILTER_ADD));
+  }
+}
+
+TEST_F(DBBlockCacheTest, DynamicallyWarmCacheDuringFlush) {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+
+  BlockBasedTableOptions table_options;
+  table_options.block_cache = NewLRUCache(1 << 25, 0, false);
+  table_options.cache_index_and_filter_blocks = false;
+  table_options.prepopulate_block_cache =
+      BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly;
+
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  DestroyAndReopen(options);
+
+  std::string value(kValueSize, 'a');
+
+  for (size_t i = 1; i <= 5; i++) {
+    ASSERT_OK(Put(std::to_string(i), value));
+    ASSERT_OK(Flush());
+    ASSERT_EQ(1,
+              options.statistics->getAndResetTickerCount(BLOCK_CACHE_DATA_ADD));
+
+    ASSERT_EQ(value, Get(std::to_string(i)));
+    ASSERT_EQ(0,
+              options.statistics->getAndResetTickerCount(BLOCK_CACHE_DATA_ADD));
+    ASSERT_EQ(
+        0, options.statistics->getAndResetTickerCount(BLOCK_CACHE_DATA_MISS));
+    ASSERT_EQ(1,
+              options.statistics->getAndResetTickerCount(BLOCK_CACHE_DATA_HIT));
+  }
+
+  ASSERT_OK(dbfull()->SetOptions(
+      {{"block_based_table_factory", "{prepopulate_block_cache=kDisable;}"}}));
+
+  for (size_t i = 6; i <= kNumBlocks; i++) {
+    ASSERT_OK(Put(std::to_string(i), value));
+    ASSERT_OK(Flush());
+    ASSERT_EQ(0,
+              options.statistics->getAndResetTickerCount(BLOCK_CACHE_DATA_ADD));
+
+    ASSERT_EQ(value, Get(std::to_string(i)));
+    ASSERT_EQ(1,
+              options.statistics->getAndResetTickerCount(BLOCK_CACHE_DATA_ADD));
+    ASSERT_EQ(
+        1, options.statistics->getAndResetTickerCount(BLOCK_CACHE_DATA_MISS));
+    ASSERT_EQ(0,
+              options.statistics->getAndResetTickerCount(BLOCK_CACHE_DATA_HIT));
+  }
+}
+#endif
+
+namespace {
+
+// A mock cache that wraps LRUCache and records how many entries have been
+// inserted for each priority.
+class MockCache : public LRUCache { + public: + static uint32_t high_pri_insert_count; + static uint32_t low_pri_insert_count; + + MockCache() + : LRUCache((size_t)1 << 25 /*capacity*/, 0 /*num_shard_bits*/, + false /*strict_capacity_limit*/, 0.0 /*high_pri_pool_ratio*/, + 0.0 /*low_pri_pool_ratio*/) {} + + using ShardedCache::Insert; + + Status Insert(const Slice& key, void* value, + const Cache::CacheItemHelper* helper_cb, size_t charge, + Handle** handle, Priority priority) override { + DeleterFn delete_cb = helper_cb->del_cb; + if (priority == Priority::LOW) { + low_pri_insert_count++; + } else { + high_pri_insert_count++; + } + return LRUCache::Insert(key, value, charge, delete_cb, handle, priority); + } +}; + +uint32_t MockCache::high_pri_insert_count = 0; +uint32_t MockCache::low_pri_insert_count = 0; + +} // anonymous namespace + +TEST_F(DBBlockCacheTest, IndexAndFilterBlocksCachePriority) { + for (auto priority : {Cache::Priority::LOW, Cache::Priority::HIGH}) { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); + BlockBasedTableOptions table_options; + table_options.cache_index_and_filter_blocks = true; + table_options.block_cache.reset(new MockCache()); + table_options.filter_policy.reset(NewBloomFilterPolicy(20)); + table_options.cache_index_and_filter_blocks_with_high_priority = + priority == Cache::Priority::HIGH ? true : false; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + DestroyAndReopen(options); + + MockCache::high_pri_insert_count = 0; + MockCache::low_pri_insert_count = 0; + + // Create a new table. + ASSERT_OK(Put("foo", "value")); + ASSERT_OK(Put("bar", "value")); + ASSERT_OK(Flush()); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + + // index/filter blocks added to block cache right after table creation. + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(2, /* only index/filter were added */ + TestGetTickerCount(options, BLOCK_CACHE_ADD)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS)); + if (priority == Cache::Priority::LOW) { + ASSERT_EQ(0u, MockCache::high_pri_insert_count); + ASSERT_EQ(2u, MockCache::low_pri_insert_count); + } else { + ASSERT_EQ(2u, MockCache::high_pri_insert_count); + ASSERT_EQ(0u, MockCache::low_pri_insert_count); + } + + // Access data block. + ASSERT_EQ("value", Get("foo")); + + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(3, /*adding data block*/ + TestGetTickerCount(options, BLOCK_CACHE_ADD)); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS)); + + // Data block should be inserted with low priority. + if (priority == Cache::Priority::LOW) { + ASSERT_EQ(0u, MockCache::high_pri_insert_count); + ASSERT_EQ(3u, MockCache::low_pri_insert_count); + } else { + ASSERT_EQ(2u, MockCache::high_pri_insert_count); + ASSERT_EQ(1u, MockCache::low_pri_insert_count); + } + } +} + +namespace { + +// An LRUCache wrapper that can falsely report "not found" on Lookup. +// This allows us to manipulate BlockBasedTableReader into thinking +// another thread inserted the data in between Lookup and Insert, +// while mostly preserving the LRUCache interface/behavior. 
+class LookupLiarCache : public CacheWrapper {
+  int nth_lookup_not_found_ = 0;
+
+ public:
+  explicit LookupLiarCache(std::shared_ptr<Cache> target)
+      : CacheWrapper(std::move(target)) {}
+
+  using Cache::Lookup;
+  Handle* Lookup(const Slice& key, Statistics* stats) override {
+    if (nth_lookup_not_found_ == 1) {
+      nth_lookup_not_found_ = 0;
+      return nullptr;
+    }
+    if (nth_lookup_not_found_ > 1) {
+      --nth_lookup_not_found_;
+    }
+    return CacheWrapper::Lookup(key, stats);
+  }
+
+  // 1 == next lookup, 2 == after next, etc.
+  void SetNthLookupNotFound(int n) { nth_lookup_not_found_ = n; }
+};
+
+}  // anonymous namespace
+
+TEST_F(DBBlockCacheTest, AddRedundantStats) {
+  const size_t capacity = size_t{1} << 25;
+  const int num_shard_bits = 0;  // 1 shard
+  int iterations_tested = 0;
+  for (std::shared_ptr<Cache> base_cache :
+       {NewLRUCache(capacity, num_shard_bits),
+        HyperClockCacheOptions(
+            capacity,
+            BlockBasedTableOptions().block_size /*estimated_value_size*/,
+            num_shard_bits)
+            .MakeSharedCache()}) {
+    if (!base_cache) {
+      // Skip clock cache when not supported
+      continue;
+    }
+    ++iterations_tested;
+    Options options = CurrentOptions();
+    options.create_if_missing = true;
+    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+
+    std::shared_ptr<LookupLiarCache> cache =
+        std::make_shared<LookupLiarCache>(base_cache);
+
+    BlockBasedTableOptions table_options;
+    table_options.cache_index_and_filter_blocks = true;
+    table_options.block_cache = cache;
+    table_options.filter_policy.reset(NewBloomFilterPolicy(50));
+    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+    DestroyAndReopen(options);
+
+    // Create a new table.
+    ASSERT_OK(Put("foo", "value"));
+    ASSERT_OK(Put("bar", "value"));
+    ASSERT_OK(Flush());
+    ASSERT_EQ(1, NumTableFilesAtLevel(0));
+
+    // Normal access filter+index+data.
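+    // (One miss and one insert per block type -- filter, index, and data --
+    // so BLOCK_CACHE_ADD totals three with no redundant adds yet.)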
+    ASSERT_EQ("value", Get("foo"));
+
+    ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD));
+    ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD));
+    ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_DATA_ADD));
+    // --------
+    ASSERT_EQ(3, TestGetTickerCount(options, BLOCK_CACHE_ADD));
+
+    ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD_REDUNDANT));
+    ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD_REDUNDANT));
+    ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_ADD_REDUNDANT));
+    // --------
+    ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_ADD_REDUNDANT));
+
+    // Again access filter+index+data, but force redundant load+insert on index
+    cache->SetNthLookupNotFound(2);
+    ASSERT_EQ("value", Get("bar"));
+
+    ASSERT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD));
+    ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD));
+    ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_DATA_ADD));
+    // --------
+    ASSERT_EQ(4, TestGetTickerCount(options, BLOCK_CACHE_ADD));
+
+    ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD_REDUNDANT));
+    ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD_REDUNDANT));
+    ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_ADD_REDUNDANT));
+    // --------
+    ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_ADD_REDUNDANT));
+
+    // Access just filter (with high probability), and force redundant
+    // load+insert
+    cache->SetNthLookupNotFound(1);
+    ASSERT_EQ("NOT_FOUND", Get("this key was not added"));
+
+    EXPECT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD));
+    EXPECT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD));
+    EXPECT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_DATA_ADD));
+    // --------
+    EXPECT_EQ(5, TestGetTickerCount(options, BLOCK_CACHE_ADD));
+
+    EXPECT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD_REDUNDANT));
+    EXPECT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD_REDUNDANT));
+    EXPECT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_ADD_REDUNDANT));
+    // --------
+    EXPECT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_ADD_REDUNDANT));
+
+    // Access just data, forcing redundant load+insert
+    ReadOptions read_options;
+    std::unique_ptr<Iterator> iter{db_->NewIterator(read_options)};
+    cache->SetNthLookupNotFound(1);
+    iter->SeekToFirst();
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_EQ(iter->key(), "bar");
+
+    EXPECT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD));
+    EXPECT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD));
+    EXPECT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_DATA_ADD));
+    // --------
+    EXPECT_EQ(6, TestGetTickerCount(options, BLOCK_CACHE_ADD));
+
+    EXPECT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD_REDUNDANT));
+    EXPECT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD_REDUNDANT));
+    EXPECT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_DATA_ADD_REDUNDANT));
+    // --------
+    EXPECT_EQ(3, TestGetTickerCount(options, BLOCK_CACHE_ADD_REDUNDANT));
+  }
+  EXPECT_GE(iterations_tested, 1);
+}
+
+TEST_F(DBBlockCacheTest, ParanoidFileChecks) {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+  options.level0_file_num_compaction_trigger = 2;
+  options.paranoid_file_checks = true;
+  BlockBasedTableOptions table_options;
+  table_options.cache_index_and_filter_blocks = false;
+  table_options.filter_policy.reset(NewBloomFilterPolicy(20));
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  ASSERT_OK(Put(1, "1_key", "val"));
+  ASSERT_OK(Put(1, "9_key", "val"));
+  // Create a new table.
+  ASSERT_OK(Flush(1));
+  ASSERT_EQ(1, /* read and cache data block */
+            TestGetTickerCount(options, BLOCK_CACHE_ADD));
+
+  ASSERT_OK(Put(1, "1_key2", "val2"));
+  ASSERT_OK(Put(1, "9_key2", "val2"));
+  // Create a new SST file. This will further trigger a compaction
+  // and generate another file.
+  ASSERT_OK(Flush(1));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_EQ(3, /* Totally 3 files created up to now */
+            TestGetTickerCount(options, BLOCK_CACHE_ADD));
+
+  // After disabling options.paranoid_file_checks, NO further block
+  // is added after generating a new file.
+  ASSERT_OK(
+      dbfull()->SetOptions(handles_[1], {{"paranoid_file_checks", "false"}}));
+
+  ASSERT_OK(Put(1, "1_key3", "val3"));
+  ASSERT_OK(Put(1, "9_key3", "val3"));
+  ASSERT_OK(Flush(1));
+  ASSERT_OK(Put(1, "1_key4", "val4"));
+  ASSERT_OK(Put(1, "9_key4", "val4"));
+  ASSERT_OK(Flush(1));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_EQ(3, /* Totally 3 files created up to now */
+            TestGetTickerCount(options, BLOCK_CACHE_ADD));
+}
+
+TEST_F(DBBlockCacheTest, CompressedCache) {
+  if (!Snappy_Supported()) {
+    return;
+  }
+  int num_iter = 80;
+
+  // Run this test four iterations.
+  // Iteration 1: only an uncompressed block cache
+  // Iteration 2: only a compressed block cache
+  // Iteration 3: both block cache and compressed cache
+  // Iteration 4: both block cache and compressed cache, but DB is not
+  // compressed
+  for (int iter = 0; iter < 4; iter++) {
+    Options options = CurrentOptions();
+    options.write_buffer_size = 64 * 1024;  // small write buffer
+    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+
+    BlockBasedTableOptions table_options;
+    switch (iter) {
+      case 0:
+        // only uncompressed block cache
+        table_options.block_cache = NewLRUCache(8 * 1024);
+        table_options.block_cache_compressed = nullptr;
+        options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+        break;
+      case 1:
+        // no block cache, only compressed cache
+        table_options.no_block_cache = true;
+        table_options.block_cache = nullptr;
+        table_options.block_cache_compressed = NewLRUCache(8 * 1024);
+        options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+        break;
+      case 2:
+        // both compressed and uncompressed block cache
+        table_options.block_cache = NewLRUCache(1024);
+        table_options.block_cache_compressed = NewLRUCache(8 * 1024);
+        options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+        break;
+      case 3:
+        // both block cache and compressed cache, but DB is not compressed
+        // also, make block cache sizes bigger, to trigger block cache hits
+        table_options.block_cache = NewLRUCache(1024 * 1024);
+        table_options.block_cache_compressed = NewLRUCache(8 * 1024 * 1024);
+        options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+        options.compression = kNoCompression;
+        break;
+      default:
+        FAIL();
+    }
+    CreateAndReopenWithCF({"pikachu"}, options);
+    // The default column family doesn't have a block cache.
+    Options no_block_cache_opts;
+    no_block_cache_opts.statistics = options.statistics;
+    no_block_cache_opts = CurrentOptions(no_block_cache_opts);
+    BlockBasedTableOptions table_options_no_bc;
+    table_options_no_bc.no_block_cache = true;
+    no_block_cache_opts.table_factory.reset(
+        NewBlockBasedTableFactory(table_options_no_bc));
+    ReopenWithColumnFamilies(
+        {"default", "pikachu"},
+        std::vector<Options>({no_block_cache_opts, options}));
+
+    Random rnd(301);
+
+    // Write 80 values, each 1000 bytes; only every fourth value is
+    // regenerated, so runs of identical values compress well.
+    ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
+    std::vector<std::string> values;
+    std::string str;
+    for (int i = 0; i < num_iter; i++) {
+      if (i % 4 == 0) {  // high compression ratio
+        str = rnd.RandomString(1000);
+      }
+      values.push_back(str);
+      ASSERT_OK(Put(1, Key(i), values[i]));
+    }
+
+    // flush all data from memtable so that reads are from block cache
+    ASSERT_OK(Flush(1));
+
+    for (int i = 0; i < num_iter; i++) {
+      ASSERT_EQ(Get(1, Key(i)), values[i]);
+    }
+
+    // check that we triggered the appropriate code paths in the cache
+    switch (iter) {
+      case 0:
+        // only uncompressed block cache
+        ASSERT_GT(TestGetTickerCount(options, BLOCK_CACHE_MISS), 0);
+        ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS), 0);
+        break;
+      case 1:
+        // no block cache, only compressed cache
+        ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_MISS), 0);
+        ASSERT_GT(TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS), 0);
+        break;
+      case 2:
+        // both compressed and uncompressed block cache
+        ASSERT_GT(TestGetTickerCount(options, BLOCK_CACHE_MISS), 0);
+        ASSERT_GT(TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS), 0);
+        break;
+      case 3:
+        // both compressed and uncompressed block cache
+        ASSERT_GT(TestGetTickerCount(options, BLOCK_CACHE_MISS), 0);
+        ASSERT_GT(TestGetTickerCount(options, BLOCK_CACHE_HIT), 0);
+        ASSERT_GT(TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS), 0);
+        // the compressed cache doesn't have any hits since blocks are not
+        // compressed on storage
+        ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT), 0);
+        break;
+      default:
+        FAIL();
+    }
+
+    options.create_if_missing = true;
+    DestroyAndReopen(options);
+  }
+}
+
+TEST_F(DBBlockCacheTest, CacheCompressionDict) {
+  const int kNumFiles = 4;
+  const int kNumEntriesPerFile = 128;
+  const int kNumBytesPerEntry = 1024;
+
+  // Try all the available libraries that support dictionary compression
+  std::vector<CompressionType> compression_types;
+  if (Zlib_Supported()) {
+    compression_types.push_back(kZlibCompression);
+  }
+  if (LZ4_Supported()) {
+    compression_types.push_back(kLZ4Compression);
+    compression_types.push_back(kLZ4HCCompression);
+  }
+  if (ZSTD_Supported()) {
+    compression_types.push_back(kZSTD);
+  } else if (ZSTDNotFinal_Supported()) {
+    compression_types.push_back(kZSTDNotFinalCompression);
+  }
+  Random rnd(301);
+  for (auto compression_type : compression_types) {
+    Options options = CurrentOptions();
+    options.bottommost_compression = compression_type;
+    options.bottommost_compression_opts.max_dict_bytes = 4096;
+    options.bottommost_compression_opts.enabled = true;
+    options.create_if_missing = true;
+    options.num_levels = 2;
+    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+    options.target_file_size_base = kNumEntriesPerFile * kNumBytesPerEntry;
+    BlockBasedTableOptions table_options;
+    table_options.cache_index_and_filter_blocks = true;
+    table_options.block_cache.reset(new MockCache());
+    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+    DestroyAndReopen(options);
+
+    RecordCacheCountersForCompressionDict(options);
+
+    for (int i = 0; i < kNumFiles; ++i) {
+      ASSERT_EQ(i, NumTableFilesAtLevel(0, 0));
+      for (int j = 0; j < kNumEntriesPerFile; ++j) {
+        std::string value = rnd.RandomString(kNumBytesPerEntry);
+        ASSERT_OK(Put(Key(j * kNumFiles + i), value.c_str()));
+      }
+      ASSERT_OK(Flush());
+    }
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    ASSERT_EQ(0, NumTableFilesAtLevel(0));
+    ASSERT_EQ(kNumFiles, NumTableFilesAtLevel(1));
+
+    // Compression dictionary blocks are preloaded.
+    CheckCacheCountersForCompressionDict(
+        options, kNumFiles /* expected_compression_dict_misses */,
+        0 /* expected_compression_dict_hits */,
+        kNumFiles /* expected_compression_dict_inserts */);
+
+    // Seek to a key in a file. It should cause the SST's dictionary meta-block
+    // to be read.
+    RecordCacheCounters(options);
+    RecordCacheCountersForCompressionDict(options);
+    ReadOptions read_options;
+    ASSERT_NE("NOT_FOUND", Get(Key(kNumFiles * kNumEntriesPerFile - 1)));
+    // Two block hits: index and dictionary since they are prefetched
+    // One block missed/added: data block
+    CheckCacheCounters(options, 1 /* expected_misses */, 2 /* expected_hits */,
+                       1 /* expected_inserts */, 0 /* expected_failures */);
+    CheckCacheCountersForCompressionDict(
+        options, 0 /* expected_compression_dict_misses */,
+        1 /* expected_compression_dict_hits */,
+        0 /* expected_compression_dict_inserts */);
+  }
+}
+
+static void ClearCache(Cache* cache) {
+  auto roles = CopyCacheDeleterRoleMap();
+  std::deque<std::string> keys;
+  Cache::ApplyToAllEntriesOptions opts;
+  auto callback = [&](const Slice& key, void* /*value*/, size_t /*charge*/,
+                      Cache::DeleterFn deleter) {
+    if (roles.find(deleter) == roles.end()) {
+      // Keep the stats collector
+      return;
+    }
+    keys.push_back(key.ToString());
+  };
+  cache->ApplyToAllEntries(callback, opts);
+  for (auto& k : keys) {
+    cache->Erase(k);
+  }
+}
+
+TEST_F(DBBlockCacheTest, CacheEntryRoleStats) {
+  const size_t capacity = size_t{1} << 25;
+  int iterations_tested = 0;
+  for (bool partition : {false, true}) {
+    for (std::shared_ptr<Cache> cache :
+         {NewLRUCache(capacity),
+          HyperClockCacheOptions(
+              capacity,
+              BlockBasedTableOptions().block_size /*estimated_value_size*/)
+              .MakeSharedCache()}) {
+      ++iterations_tested;
+
+      Options options = CurrentOptions();
+      SetTimeElapseOnlySleepOnReopen(&options);
+      options.create_if_missing = true;
+      options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+      options.max_open_files = 13;
+      options.table_cache_numshardbits = 0;
+      // If this wakes up, it could interfere with test
+      options.stats_dump_period_sec = 0;
+
+      BlockBasedTableOptions table_options;
+      table_options.block_cache = cache;
+      table_options.cache_index_and_filter_blocks = true;
+      table_options.filter_policy.reset(NewBloomFilterPolicy(50));
+      if (partition) {
+        table_options.index_type =
+            BlockBasedTableOptions::kTwoLevelIndexSearch;
+        table_options.partition_filters = true;
+      }
+      table_options.metadata_cache_options.top_level_index_pinning =
+          PinningTier::kNone;
+      table_options.metadata_cache_options.partition_pinning =
+          PinningTier::kNone;
+      table_options.metadata_cache_options.unpartitioned_pinning =
+          PinningTier::kNone;
+      options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+      DestroyAndReopen(options);
+
+      // Create a new table.
+      ASSERT_OK(Put("foo", "value"));
+      ASSERT_OK(Put("bar", "value"));
+      ASSERT_OK(Flush());
+
+      ASSERT_OK(Put("zfoo", "value"));
+      ASSERT_OK(Put("zbar", "value"));
+      ASSERT_OK(Flush());
+
+      ASSERT_EQ(2, NumTableFilesAtLevel(0));
+
+      // Fresh cache
+      ClearCache(cache.get());
+
+      std::array<size_t, kNumCacheEntryRoles> expected{};
+      // For CacheEntryStatsCollector
+      expected[static_cast<size_t>(CacheEntryRole::kMisc)] = 1;
+      EXPECT_EQ(expected, GetCacheEntryRoleCountsBg());
+
+      std::array<size_t, kNumCacheEntryRoles> prev_expected = expected;
+
+      // First access only filters
+      ASSERT_EQ("NOT_FOUND", Get("different from any key added"));
+      expected[static_cast<size_t>(CacheEntryRole::kFilterBlock)] += 2;
+      if (partition) {
+        expected[static_cast<size_t>(CacheEntryRole::kFilterMetaBlock)] += 2;
+      }
+      // Within some time window, we will get cached entry stats
+      EXPECT_EQ(prev_expected, GetCacheEntryRoleCountsBg());
+      // Not enough to force a miss
+      env_->MockSleepForSeconds(45);
+      EXPECT_EQ(prev_expected, GetCacheEntryRoleCountsBg());
+      // Enough to force a miss
+      env_->MockSleepForSeconds(601);
+      EXPECT_EQ(expected, GetCacheEntryRoleCountsBg());
+
+      // Now access index and data block
+      ASSERT_EQ("value", Get("foo"));
+      expected[static_cast<size_t>(CacheEntryRole::kIndexBlock)]++;
+      if (partition) {
+        // top-level
+        expected[static_cast<size_t>(CacheEntryRole::kIndexBlock)]++;
+      }
+      expected[static_cast<size_t>(CacheEntryRole::kDataBlock)]++;
+      // Enough to force a miss
+      env_->MockSleepForSeconds(601);
+      // But inject a simulated long scan so that we need a longer
+      // interval to force a miss next time.
+      SyncPoint::GetInstance()->SetCallBack(
+          "CacheEntryStatsCollector::GetStats:AfterApplyToAllEntries",
+          [this](void*) {
+            // To spend no more than 0.2% of time scanning, we would need
+            // interval of at least 10000s
+            env_->MockSleepForSeconds(20);
+          });
+      SyncPoint::GetInstance()->EnableProcessing();
+      EXPECT_EQ(expected, GetCacheEntryRoleCountsBg());
+      prev_expected = expected;
+      SyncPoint::GetInstance()->DisableProcessing();
+      SyncPoint::GetInstance()->ClearAllCallBacks();
+
+      // The same for other file
+      ASSERT_EQ("value", Get("zfoo"));
+      expected[static_cast<size_t>(CacheEntryRole::kIndexBlock)]++;
+      if (partition) {
+        // top-level
+        expected[static_cast<size_t>(CacheEntryRole::kIndexBlock)]++;
+      }
+      expected[static_cast<size_t>(CacheEntryRole::kDataBlock)]++;
+      // Because of the simulated long scan, this is not enough to force
+      // a miss
+      env_->MockSleepForSeconds(601);
+      EXPECT_EQ(prev_expected, GetCacheEntryRoleCountsBg());
+      // But this is enough
+      env_->MockSleepForSeconds(10000);
+      EXPECT_EQ(expected, GetCacheEntryRoleCountsBg());
+      prev_expected = expected;
+
+      // Also check the GetProperty interface
+      std::map<std::string, std::string> values;
+      ASSERT_TRUE(
+          db_->GetMapProperty(DB::Properties::kBlockCacheEntryStats, &values));
+
+      for (size_t i = 0; i < kNumCacheEntryRoles; ++i) {
+        auto role = static_cast<CacheEntryRole>(i);
+        EXPECT_EQ(std::to_string(expected[i]),
+                  values[BlockCacheEntryStatsMapKeys::EntryCount(role)]);
+      }
+
+      // Add one for kWriteBuffer
+      {
+        WriteBufferManager wbm(size_t{1} << 20, cache);
+        wbm.ReserveMem(1024);
+        expected[static_cast<size_t>(CacheEntryRole::kWriteBuffer)]++;
+        // Now we check that the GetProperty interface is more aggressive
+        // about re-scanning stats, but not totally aggressive.
+        // Within some time window, we will get cached entry stats
+        env_->MockSleepForSeconds(1);
+        EXPECT_EQ(std::to_string(prev_expected[static_cast<size_t>(
+                      CacheEntryRole::kWriteBuffer)]),
+                  values[BlockCacheEntryStatsMapKeys::EntryCount(
+                      CacheEntryRole::kWriteBuffer)]);
+        // Not enough for a "background" miss but enough for a "foreground"
+        // miss
+        env_->MockSleepForSeconds(45);
+
+        ASSERT_TRUE(db_->GetMapProperty(DB::Properties::kBlockCacheEntryStats,
+                                        &values));
+        EXPECT_EQ(
+            std::to_string(
+                expected[static_cast<size_t>(CacheEntryRole::kWriteBuffer)]),
+            values[BlockCacheEntryStatsMapKeys::EntryCount(
+                CacheEntryRole::kWriteBuffer)]);
+      }
+      prev_expected = expected;
+
+      // With collector pinned in cache, we should be able to hit
+      // even if the cache is full
+      ClearCache(cache.get());
+      Cache::Handle* h = nullptr;
+      if (strcmp(cache->Name(), "LRUCache") == 0) {
+        ASSERT_OK(cache->Insert("Fill-it-up", nullptr, capacity + 1,
+                                GetNoopDeleterForRole<CacheEntryRole::kMisc>(),
+                                &h, Cache::Priority::HIGH));
+      } else {
+        // For ClockCache we use a 16-byte key.
+        ASSERT_OK(cache->Insert("Fill-it-up-xxxxx", nullptr, capacity + 1,
+                                GetNoopDeleterForRole<CacheEntryRole::kMisc>(),
+                                &h, Cache::Priority::HIGH));
+      }
+      ASSERT_GT(cache->GetUsage(), cache->GetCapacity());
+      expected = {};
+      // For CacheEntryStatsCollector
+      expected[static_cast<size_t>(CacheEntryRole::kMisc)] = 1;
+      // For Fill-it-up
+      expected[static_cast<size_t>(CacheEntryRole::kMisc)]++;
+      // Still able to hit on saved stats
+      EXPECT_EQ(prev_expected, GetCacheEntryRoleCountsBg());
+      // Enough to force a miss
+      env_->MockSleepForSeconds(1000);
+      EXPECT_EQ(expected, GetCacheEntryRoleCountsBg());
+
+      cache->Release(h);
+
+      // Now we test that the DB mutex is not held during scans, for the ways
+      // we know how to (possibly) trigger them. Without a better way to
+      // check this, we simply inject an acquire & release of the DB mutex
+      // deep in the stat collection code. If we were already holding the
+      // mutex, that is UB that would at least be found by TSAN.
+      int scan_count = 0;
+      SyncPoint::GetInstance()->SetCallBack(
+          "CacheEntryStatsCollector::GetStats:AfterApplyToAllEntries",
+          [this, &scan_count](void*) {
+            dbfull()->TEST_LockMutex();
+            dbfull()->TEST_UnlockMutex();
+            ++scan_count;
+          });
+      SyncPoint::GetInstance()->EnableProcessing();
+
+      // Different things that might trigger a scan, with mock sleeps to
+      // force a miss.
+      env_->MockSleepForSeconds(10000);
+      dbfull()->DumpStats();
+      ASSERT_EQ(scan_count, 1);
+
+      env_->MockSleepForSeconds(60);
+      ASSERT_TRUE(db_->GetMapProperty(DB::Properties::kFastBlockCacheEntryStats,
+                                      &values));
+      ASSERT_EQ(scan_count, 1);
+      ASSERT_TRUE(
+          db_->GetMapProperty(DB::Properties::kBlockCacheEntryStats, &values));
+      ASSERT_EQ(scan_count, 2);
+
+      env_->MockSleepForSeconds(10000);
+      ASSERT_TRUE(db_->GetMapProperty(DB::Properties::kFastBlockCacheEntryStats,
+                                      &values));
+      ASSERT_EQ(scan_count, 3);
+
+      env_->MockSleepForSeconds(60);
+      std::string value_str;
+      ASSERT_TRUE(db_->GetProperty(DB::Properties::kFastBlockCacheEntryStats,
+                                   &value_str));
+      ASSERT_EQ(scan_count, 3);
+      ASSERT_TRUE(
+          db_->GetProperty(DB::Properties::kBlockCacheEntryStats, &value_str));
+      ASSERT_EQ(scan_count, 4);
+
+      env_->MockSleepForSeconds(10000);
+      ASSERT_TRUE(db_->GetProperty(DB::Properties::kFastBlockCacheEntryStats,
+                                   &value_str));
+      ASSERT_EQ(scan_count, 5);
+
+      ASSERT_TRUE(db_->GetProperty(DB::Properties::kCFStats, &value_str));
+      // To match historical speed, querying this property no longer triggers
+      // a scan, even if results are old. But periodic dump stats should keep
+      // things reasonably updated.
+      ASSERT_EQ(scan_count, /*unchanged*/ 5);
+
+      SyncPoint::GetInstance()->DisableProcessing();
+      SyncPoint::GetInstance()->ClearAllCallBacks();
+    }
+    EXPECT_GE(iterations_tested, 1);
+  }
+}
+
+namespace {
+
+void DummyFillCache(Cache& cache, size_t entry_size,
+                    std::vector<CacheHandleGuard<void>>& handles) {
+  // fprintf(stderr, "Entry size: %zu\n", entry_size);
+  handles.clear();
+  cache.EraseUnRefEntries();
+  void* fake_value = &cache;
+  size_t capacity = cache.GetCapacity();
+  OffsetableCacheKey ck{"abc", "abc", 42};
+  for (size_t my_usage = 0; my_usage < capacity;) {
+    size_t charge = std::min(entry_size, capacity - my_usage);
+    Cache::Handle* handle;
+    Status st = cache.Insert(ck.WithOffset(my_usage).AsSlice(), fake_value,
+                             charge, /*deleter*/ nullptr, &handle);
+    ASSERT_OK(st);
+    handles.emplace_back(&cache, handle);
+    my_usage += charge;
+  }
+}
+
+class CountingLogger : public Logger {
+ public:
+  ~CountingLogger() override {}
+  using Logger::Logv;
+  void Logv(const InfoLogLevel log_level, const char* format,
+            va_list /*ap*/) override {
+    if (std::strstr(format, "HyperClockCache") == nullptr) {
+      // Not a match
+      return;
+    }
+    // static StderrLogger debug;
+    // debug.Logv(log_level, format, ap);
+    if (log_level == InfoLogLevel::INFO_LEVEL) {
+      ++info_count_;
+    } else if (log_level == InfoLogLevel::WARN_LEVEL) {
+      ++warn_count_;
+    } else if (log_level == InfoLogLevel::ERROR_LEVEL) {
+      ++error_count_;
+    }
+  }
+
+  std::array<int, 3> PopCounts() {
+    std::array<int, 3> rv{{info_count_, warn_count_, error_count_}};
+    info_count_ = warn_count_ = error_count_ = 0;
+    return rv;
+  }
+
+ private:
+  int info_count_{};
+  int warn_count_{};
+  int error_count_{};
+};
+
+}  // namespace
+
+TEST_F(DBBlockCacheTest, HyperClockCacheReportProblems) {
+  size_t capacity = 1024 * 1024;
+  size_t value_size_est = 8 * 1024;
+  HyperClockCacheOptions hcc_opts{capacity, value_size_est};
+  hcc_opts.num_shard_bits = 2;  // 4 shards
+  hcc_opts.metadata_charge_policy = kDontChargeCacheMetadata;
+  std::shared_ptr<Cache> cache = hcc_opts.MakeSharedCache();
+  std::shared_ptr<CountingLogger> logger = std::make_shared<CountingLogger>();
+
+  auto table_options = GetTableOptions();
+  auto options = GetOptions(table_options);
+  table_options.block_cache = cache;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  options.info_log = logger;
+  // Going to sample more directly
+  options.stats_dump_period_sec = 0;
+  Reopen(options);
+
+  std::vector<CacheHandleGuard<void>> handles;
+
+  // Clear anything from DB startup
+  logger->PopCounts();
+
+  // Fill the cache based on the expected entry size and check that we don't
+  // report anything relevant in the periodic stats dump.
+  DummyFillCache(*cache, value_size_est, handles);
+  dbfull()->DumpStats();
+  EXPECT_EQ(logger->PopCounts(), (std::array<int, 3>{{0, 0, 0}}));
+
+  // Same, within reasonable bounds
+  DummyFillCache(*cache, value_size_est - value_size_est / 4, handles);
+  dbfull()->DumpStats();
+  EXPECT_EQ(logger->PopCounts(), (std::array<int, 3>{{0, 0, 0}}));
+
+  DummyFillCache(*cache, value_size_est + value_size_est / 3, handles);
+  dbfull()->DumpStats();
+  EXPECT_EQ(logger->PopCounts(), (std::array<int, 3>{{0, 0, 0}}));
+
+  // Estimate too high (value size too low) eventually reports ERROR
+  DummyFillCache(*cache, value_size_est / 2, handles);
+  dbfull()->DumpStats();
+  EXPECT_EQ(logger->PopCounts(), (std::array<int, 3>{{0, 1, 0}}));
+
+  DummyFillCache(*cache, value_size_est / 3, handles);
+  dbfull()->DumpStats();
+  EXPECT_EQ(logger->PopCounts(), (std::array<int, 3>{{0, 0, 1}}));
+
+  // Estimate too low (value size too high) starts with INFO
+  // and is only WARNING in the worst case
+  DummyFillCache(*cache, value_size_est * 2, handles);
+  dbfull()->DumpStats();
+  EXPECT_EQ(logger->PopCounts(), (std::array<int, 3>{{1, 0, 0}}));
+
+  DummyFillCache(*cache, value_size_est * 3, handles);
+  dbfull()->DumpStats();
+  EXPECT_EQ(logger->PopCounts(), (std::array<int, 3>{{0, 1, 0}}));
+
+  DummyFillCache(*cache, value_size_est * 20, handles);
+  dbfull()->DumpStats();
+  EXPECT_EQ(logger->PopCounts(), (std::array<int, 3>{{0, 1, 0}}));
+}
+
+#endif  // ROCKSDB_LITE
+
+class DBBlockCacheKeyTest
+    : public DBTestBase,
+      public testing::WithParamInterface<std::tuple<bool, bool>> {
+ public:
+  DBBlockCacheKeyTest()
+      : DBTestBase("db_block_cache_test", /*env_do_fsync=*/false) {}
+
+  void SetUp() override {
+    use_compressed_cache_ = std::get<0>(GetParam());
+    exclude_file_numbers_ = std::get<1>(GetParam());
+  }
+
+  bool use_compressed_cache_;
+  bool exclude_file_numbers_;
+};
+
+// Disable LinkFile so that we can physically copy a DB using Checkpoint.
+// Disable file GetUniqueId to enable stable cache keys.
+class StableCacheKeyTestFS : public FaultInjectionTestFS {
+ public:
+  explicit StableCacheKeyTestFS(const std::shared_ptr<FileSystem>& base)
+      : FaultInjectionTestFS(base) {
+    SetFailGetUniqueId(true);
+  }
+
+  virtual ~StableCacheKeyTestFS() override {}
+
+  IOStatus LinkFile(const std::string&, const std::string&, const IOOptions&,
+                    IODebugContext*) override {
+    return IOStatus::NotSupported("Disabled");
+  }
+};
+
+TEST_P(DBBlockCacheKeyTest, StableCacheKeys) {
+  std::shared_ptr<StableCacheKeyTestFS> test_fs{
+      new StableCacheKeyTestFS(env_->GetFileSystem())};
+  std::unique_ptr<CompositeEnvWrapper> test_env{
+      new CompositeEnvWrapper(env_, test_fs)};
+
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+  options.env = test_env.get();
+
+  // Corrupting the table properties corrupts the unique id.
+  // Ignore the unique id recorded in the manifest.
+  options.verify_sst_unique_id_in_manifest = false;
+
+  BlockBasedTableOptions table_options;
+
+  int key_count = 0;
+  uint64_t expected_stat = 0;
+
+  std::function<void()> verify_stats;
+  if (use_compressed_cache_) {
+    if (!Snappy_Supported()) {
+      ROCKSDB_GTEST_SKIP("Compressed cache test requires snappy support");
+      return;
+    }
+    options.compression = CompressionType::kSnappyCompression;
+    table_options.no_block_cache = true;
+    table_options.block_cache_compressed = NewLRUCache(1 << 25, 0, false);
+    verify_stats = [&options, &expected_stat] {
+      // One for ordinary SST file and one for external SST file
+      ASSERT_EQ(
+          expected_stat,
+          options.statistics->getTickerCount(BLOCK_CACHE_COMPRESSED_ADD));
+    };
+  } else {
+    table_options.cache_index_and_filter_blocks = true;
+    table_options.block_cache = NewLRUCache(1 << 25, 0, false);
+    verify_stats = [&options, &expected_stat] {
+      ASSERT_EQ(expected_stat,
+                options.statistics->getTickerCount(BLOCK_CACHE_DATA_ADD));
+      ASSERT_EQ(expected_stat,
+                options.statistics->getTickerCount(BLOCK_CACHE_INDEX_ADD));
+      ASSERT_EQ(expected_stat,
+                options.statistics->getTickerCount(BLOCK_CACHE_FILTER_ADD));
+    };
+  }
+
+  table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  CreateAndReopenWithCF({"koko"}, options);
+
+  if (exclude_file_numbers_) {
+    // Simulate something like old behavior without file numbers in properties.
+    // This is a "control" side of the test that also ensures safely degraded
+    // behavior on old files.
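+    // The sync point callback below zeroes orig_file_number in the table
+    // properties as each table is built, so SetupBaseCacheKey cannot rely on
+    // a stable file number and perform_gets (below) expects no cache key
+    // reuse across re-opens.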
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "BlockBasedTableBuilder::BlockBasedTableBuilder:PreSetupBaseCacheKey", + [&](void* arg) { + TableProperties* props = reinterpret_cast(arg); + props->orig_file_number = 0; + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + } + + std::function perform_gets = [&key_count, &expected_stat, this]() { + if (exclude_file_numbers_) { + // No cache key reuse should happen, because we can't rely on current + // file number being stable + expected_stat += key_count; + } else { + // Cache keys should be stable + expected_stat = key_count; + } + for (int i = 0; i < key_count; ++i) { + ASSERT_EQ(Get(1, Key(i)), "abc"); + } + }; + + // Ordinary SST files with same session id + const std::string something_compressible(500U, 'x'); + for (int i = 0; i < 2; ++i) { + ASSERT_OK(Put(1, Key(key_count), "abc")); + ASSERT_OK(Put(1, Key(key_count) + "a", something_compressible)); + ASSERT_OK(Flush(1)); + ++key_count; + } + +#ifndef ROCKSDB_LITE + // Save an export of those ordinary SST files for later + std::string export_files_dir = dbname_ + "/exported"; + ExportImportFilesMetaData* metadata_ptr_ = nullptr; + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], export_files_dir, + &metadata_ptr_)); + ASSERT_NE(metadata_ptr_, nullptr); + delete checkpoint; + checkpoint = nullptr; + + // External SST files with same session id + SstFileWriter sst_file_writer(EnvOptions(), options); + std::vector external; + for (int i = 0; i < 2; ++i) { + std::string f = dbname_ + "/external" + std::to_string(i) + ".sst"; + external.push_back(f); + ASSERT_OK(sst_file_writer.Open(f)); + ASSERT_OK(sst_file_writer.Put(Key(key_count), "abc")); + ASSERT_OK( + sst_file_writer.Put(Key(key_count) + "a", something_compressible)); + ++key_count; + ExternalSstFileInfo external_info; + ASSERT_OK(sst_file_writer.Finish(&external_info)); + IngestExternalFileOptions ingest_opts; + ASSERT_OK(db_->IngestExternalFile(handles_[1], {f}, ingest_opts)); + } + + if (exclude_file_numbers_) { + // FIXME(peterd): figure out where these extra ADDs are coming from + options.statistics->recordTick(BLOCK_CACHE_COMPRESSED_ADD, + uint64_t{0} - uint64_t{2}); + } +#endif + + perform_gets(); + verify_stats(); + + // Make sure we can cache hit after re-open + ReopenWithColumnFamilies({"default", "koko"}, options); + + perform_gets(); + verify_stats(); + + // Make sure we can cache hit even on a full copy of the DB. Using + // StableCacheKeyTestFS, Checkpoint will resort to full copy not hard link. + // (Checkpoint not available in LITE mode to test this.) 
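+  // What follows: make a physical copy of the DB via Checkpoint and reopen
+  // from the copy, then re-import the exported column family and re-ingest
+  // the external files into a fresh DB, running perform_gets/verify_stats
+  // each time to confirm cache keys are reused when stable (and not reused
+  // when file numbers are excluded).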
+#ifndef ROCKSDB_LITE + auto db_copy_name = dbname_ + "-copy"; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(db_copy_name)); + delete checkpoint; + + Close(); + Destroy(options); + + // Switch to the DB copy + SaveAndRestore save_dbname(&dbname_, db_copy_name); + ReopenWithColumnFamilies({"default", "koko"}, options); + + perform_gets(); + verify_stats(); + + // And ensure that re-importing + ingesting the same files into a + // different DB uses same cache keys + DestroyAndReopen(options); + + ColumnFamilyHandle* cfh = nullptr; + ASSERT_OK(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo", + ImportColumnFamilyOptions(), + *metadata_ptr_, &cfh)); + ASSERT_NE(cfh, nullptr); + delete cfh; + cfh = nullptr; + delete metadata_ptr_; + metadata_ptr_ = nullptr; + + ASSERT_OK(DestroyDB(export_files_dir, options)); + + ReopenWithColumnFamilies({"default", "yoyo"}, options); + + IngestExternalFileOptions ingest_opts; + ASSERT_OK(db_->IngestExternalFile(handles_[1], {external}, ingest_opts)); + + perform_gets(); + verify_stats(); +#endif // !ROCKSDB_LITE + + Close(); + Destroy(options); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +class CacheKeyTest : public testing::Test { + public: + CacheKey GetBaseCacheKey() { + CacheKey rv = GetOffsetableCacheKey(0, /*min file_number*/ 1).WithOffset(0); + // Correct for file_number_ == 1 + *reinterpret_cast(&rv) ^= ReverseBits(uint64_t{1}); + return rv; + } + CacheKey GetCacheKey(uint64_t session_counter, uint64_t file_number, + uint64_t offset) { + OffsetableCacheKey offsetable = + GetOffsetableCacheKey(session_counter, file_number); + // * 4 to counteract optimization that strips lower 2 bits in encoding + // the offset in BlockBasedTable::GetCacheKey (which we prefer to include + // in unit tests to maximize functional coverage). + EXPECT_GE(offset * 4, offset); // no overflow + return BlockBasedTable::GetCacheKey(offsetable, + BlockHandle(offset * 4, /*size*/ 5)); + } + + protected: + OffsetableCacheKey GetOffsetableCacheKey(uint64_t session_counter, + uint64_t file_number) { + // Like SemiStructuredUniqueIdGen::GenerateNext + tp_.db_session_id = EncodeSessionId(base_session_upper_, + base_session_lower_ ^ session_counter); + tp_.db_id = std::to_string(db_id_); + tp_.orig_file_number = file_number; + bool is_stable; + std::string cur_session_id = ""; // ignored + uint64_t cur_file_number = 42; // ignored + OffsetableCacheKey rv; + BlockBasedTable::SetupBaseCacheKey(&tp_, cur_session_id, cur_file_number, + &rv, &is_stable); + EXPECT_TRUE(is_stable); + EXPECT_TRUE(!rv.IsEmpty()); + // BEGIN some assertions in relation to SST unique IDs + std::string external_unique_id_str; + EXPECT_OK(GetUniqueIdFromTableProperties(tp_, &external_unique_id_str)); + UniqueId64x2 sst_unique_id = {}; + EXPECT_OK(DecodeUniqueIdBytes(external_unique_id_str, &sst_unique_id)); + ExternalUniqueIdToInternal(&sst_unique_id); + OffsetableCacheKey ock = + OffsetableCacheKey::FromInternalUniqueId(&sst_unique_id); + EXPECT_EQ(rv.WithOffset(0).AsSlice(), ock.WithOffset(0).AsSlice()); + EXPECT_EQ(ock.ToInternalUniqueId(), sst_unique_id); + // END some assertions in relation to SST unique IDs + return rv; + } + + TableProperties tp_; + uint64_t base_session_upper_ = 0; + uint64_t base_session_lower_ = 0; + uint64_t db_id_ = 0; +}; + +TEST_F(CacheKeyTest, DBImplSessionIdStructure) { + // We have to generate our own session IDs for simulation purposes in other + // tests. 
Here we verify that the DBImpl implementation seems to match + // our construction here, by using lowest XORed-in bits for "session + // counter." + std::string session_id1 = DBImpl::GenerateDbSessionId(/*env*/ nullptr); + std::string session_id2 = DBImpl::GenerateDbSessionId(/*env*/ nullptr); + uint64_t upper1, upper2, lower1, lower2; + ASSERT_OK(DecodeSessionId(session_id1, &upper1, &lower1)); + ASSERT_OK(DecodeSessionId(session_id2, &upper2, &lower2)); + // Because generated in same process + ASSERT_EQ(upper1, upper2); + // Unless we generate > 4 billion session IDs in this process... + ASSERT_EQ(Upper32of64(lower1), Upper32of64(lower2)); + // But they must be different somewhere + ASSERT_NE(Lower32of64(lower1), Lower32of64(lower2)); +} + +namespace { +// Deconstruct cache key, based on knowledge of implementation details. +void DeconstructNonemptyCacheKey(const CacheKey& key, uint64_t* file_num_etc64, + uint64_t* offset_etc64) { + *file_num_etc64 = *reinterpret_cast(key.AsSlice().data()); + *offset_etc64 = *reinterpret_cast(key.AsSlice().data() + 8); + assert(*file_num_etc64 != 0); + if (*offset_etc64 == 0) { + std::swap(*file_num_etc64, *offset_etc64); + } + assert(*offset_etc64 != 0); +} + +// Make a bit mask of 0 to 64 bits +uint64_t MakeMask64(int bits) { + if (bits >= 64) { + return uint64_t{0} - 1; + } else { + return (uint64_t{1} << bits) - 1; + } +} + +// See CacheKeyTest::Encodings +struct CacheKeyDecoder { + // Inputs + uint64_t base_file_num_etc64, base_offset_etc64; + int session_counter_bits, file_number_bits, offset_bits; + + // Derived + uint64_t session_counter_mask, file_number_mask, offset_mask; + + // Outputs + uint64_t decoded_session_counter, decoded_file_num, decoded_offset; + + void SetBaseCacheKey(const CacheKey& base) { + DeconstructNonemptyCacheKey(base, &base_file_num_etc64, &base_offset_etc64); + } + + void SetRanges(int _session_counter_bits, int _file_number_bits, + int _offset_bits) { + session_counter_bits = _session_counter_bits; + session_counter_mask = MakeMask64(session_counter_bits); + file_number_bits = _file_number_bits; + file_number_mask = MakeMask64(file_number_bits); + offset_bits = _offset_bits; + offset_mask = MakeMask64(offset_bits); + } + + void Decode(const CacheKey& key) { + uint64_t file_num_etc64, offset_etc64; + DeconstructNonemptyCacheKey(key, &file_num_etc64, &offset_etc64); + + // First decode session counter + if (offset_bits + session_counter_bits <= 64) { + // fully recoverable from offset_etc64 + decoded_session_counter = + ReverseBits((offset_etc64 ^ base_offset_etc64)) & + session_counter_mask; + } else if (file_number_bits + session_counter_bits <= 64) { + // fully recoverable from file_num_etc64 + decoded_session_counter = DownwardInvolution( + (file_num_etc64 ^ base_file_num_etc64) & session_counter_mask); + } else { + // Need to combine parts from each word. + // Piece1 will contain some correct prefix of the bottom bits of + // session counter. 
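+      // (As implied by the final decoding steps below, offset_etc64 xors the
+      // offset with ReverseBits(session_counter), while file_num_etc64 xors
+      // ReverseBits(file_number) with DownwardInvolution(session_counter).
+      // Because the offset occupies only the low offset_bits, the remaining
+      // high bits of the offset word, once reversed back, expose the low
+      // 64 - offset_bits bits of the session counter.)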
+ uint64_t piece1 = + ReverseBits((offset_etc64 ^ base_offset_etc64) & ~offset_mask); + int piece1_bits = 64 - offset_bits; + // Piece2 will contain involuded bits that we can combine with piece1 + // to infer rest of session counter + int piece2_bits = std::min(64 - file_number_bits, 64 - piece1_bits); + ASSERT_LT(piece2_bits, 64); + uint64_t piece2_mask = MakeMask64(piece2_bits); + uint64_t piece2 = (file_num_etc64 ^ base_file_num_etc64) & piece2_mask; + + // Cancel out the part of piece2 that we can infer from piece1 + // (DownwardInvolution distributes over xor) + piece2 ^= DownwardInvolution(piece1) & piece2_mask; + + // Now we need to solve for the unknown original bits in higher + // positions than piece1 provides. We use Gaussian elimination + // because we know that a piece2_bits X piece2_bits submatrix of + // the matrix underlying DownwardInvolution times the vector of + // unknown original bits equals piece2. + // + // Build an augmented row matrix for that submatrix, built column by + // column. + std::array aug_rows{}; + for (int i = 0; i < piece2_bits; ++i) { // over columns + uint64_t col_i = DownwardInvolution(uint64_t{1} << piece1_bits << i); + ASSERT_NE(col_i & 1U, 0); + for (int j = 0; j < piece2_bits; ++j) { // over rows + aug_rows[j] |= (col_i & 1U) << i; + col_i >>= 1; + } + } + // Augment with right hand side + for (int j = 0; j < piece2_bits; ++j) { // over rows + aug_rows[j] |= (piece2 & 1U) << piece2_bits; + piece2 >>= 1; + } + // Run Gaussian elimination + for (int i = 0; i < piece2_bits; ++i) { // over columns + // Find a row that can be used to cancel others + uint64_t canceller = 0; + // Note: Rows 0 through i-1 contain 1s in columns already eliminated + for (int j = i; j < piece2_bits; ++j) { // over rows + if (aug_rows[j] & (uint64_t{1} << i)) { + // Swap into appropriate row + std::swap(aug_rows[i], aug_rows[j]); + // Keep a handy copy for row reductions + canceller = aug_rows[i]; + break; + } + } + ASSERT_NE(canceller, 0); + for (int j = 0; j < piece2_bits; ++j) { // over rows + if (i != j && ((aug_rows[j] >> i) & 1) != 0) { + // Row reduction + aug_rows[j] ^= canceller; + } + } + } + // Extract result + decoded_session_counter = piece1; + for (int j = 0; j < piece2_bits; ++j) { // over rows + ASSERT_EQ(aug_rows[j] & piece2_mask, uint64_t{1} << j); + decoded_session_counter |= aug_rows[j] >> piece2_bits << piece1_bits + << j; + } + } + + decoded_offset = + offset_etc64 ^ base_offset_etc64 ^ ReverseBits(decoded_session_counter); + + decoded_file_num = ReverseBits(file_num_etc64 ^ base_file_num_etc64 ^ + DownwardInvolution(decoded_session_counter)); + } +}; +} // anonymous namespace + +TEST_F(CacheKeyTest, Encodings) { + // This test primarily verifies this claim from cache_key.cc: + // // In fact, if DB ids were not involved, we would be guaranteed unique + // // cache keys for files generated in a single process until total bits for + // // biggest session_id_counter, orig_file_number, and offset_in_file + // // reach 128 bits. + // + // To demonstrate this, CacheKeyDecoder can reconstruct the structured inputs + // to the cache key when provided an output cache key, the unstructured + // inputs, and bounds on the structured inputs. + // + // See OffsetableCacheKey comments in cache_key.cc. 
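+  // The nested loops below sweep bit widths for session counter, file number,
+  // and offset, restricted to a combined total of at most 128 bits, and check
+  // that CacheKeyDecoder recovers each randomly drawn input exactly.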
+ + // We are going to randomly initialize some values that *should* not affect + // result + Random64 r{std::random_device{}()}; + + CacheKeyDecoder decoder; + db_id_ = r.Next(); + base_session_upper_ = r.Next(); + base_session_lower_ = r.Next(); + if (base_session_lower_ == 0) { + base_session_lower_ = 1; + } + + decoder.SetBaseCacheKey(GetBaseCacheKey()); + + // Loop over configurations and test those + for (int session_counter_bits = 0; session_counter_bits <= 64; + ++session_counter_bits) { + for (int file_number_bits = 1; file_number_bits <= 64; ++file_number_bits) { + // 62 bits max because unoptimized offset will be 64 bits in that case + for (int offset_bits = 0; offset_bits <= 62; ++offset_bits) { + if (session_counter_bits + file_number_bits + offset_bits > 128) { + break; + } + + decoder.SetRanges(session_counter_bits, file_number_bits, offset_bits); + + uint64_t session_counter = r.Next() & decoder.session_counter_mask; + uint64_t file_number = r.Next() & decoder.file_number_mask; + if (file_number == 0) { + // Minimum + file_number = 1; + } + uint64_t offset = r.Next() & decoder.offset_mask; + decoder.Decode(GetCacheKey(session_counter, file_number, offset)); + + EXPECT_EQ(decoder.decoded_session_counter, session_counter); + EXPECT_EQ(decoder.decoded_file_num, file_number); + EXPECT_EQ(decoder.decoded_offset, offset); + } + } + } +} + +INSTANTIATE_TEST_CASE_P(DBBlockCacheKeyTest, DBBlockCacheKeyTest, + ::testing::Combine(::testing::Bool(), + ::testing::Bool())); + +class DBBlockCachePinningTest + : public DBTestBase, + public testing::WithParamInterface< + std::tuple> { + public: + DBBlockCachePinningTest() + : DBTestBase("db_block_cache_test", /*env_do_fsync=*/false) {} + + void SetUp() override { + partition_index_and_filters_ = std::get<0>(GetParam()); + top_level_index_pinning_ = std::get<1>(GetParam()); + partition_pinning_ = std::get<2>(GetParam()); + unpartitioned_pinning_ = std::get<3>(GetParam()); + } + + bool partition_index_and_filters_; + PinningTier top_level_index_pinning_; + PinningTier partition_pinning_; + PinningTier unpartitioned_pinning_; +}; + +TEST_P(DBBlockCachePinningTest, TwoLevelDB) { + // Creates one file in L0 and one file in L1. Both files have enough data that + // their index and filter blocks are partitioned. The L1 file will also have + // a compression dictionary (those are trained only during compaction), which + // must be unpartitioned. + const int kKeySize = 32; + const int kBlockSize = 128; + const int kNumBlocksPerFile = 128; + const int kNumKeysPerFile = kBlockSize * kNumBlocksPerFile / kKeySize; + + Options options = CurrentOptions(); + // `kNoCompression` makes the unit test more portable. But it relies on the + // current behavior of persisting/accessing dictionary even when there's no + // (de)compression happening, which seems fairly likely to change over time. 
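+  // Note: max_dict_bytes > 0 (set just below) is what causes a compression
+  // dictionary block to be written for the L1 file during compaction, even
+  // though kNoCompression is used.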
+ options.compression = kNoCompression; + options.compression_opts.max_dict_bytes = 4 << 10; + options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); + BlockBasedTableOptions table_options; + table_options.block_cache = NewLRUCache(1 << 20 /* capacity */); + table_options.block_size = kBlockSize; + table_options.metadata_block_size = kBlockSize; + table_options.cache_index_and_filter_blocks = true; + table_options.metadata_cache_options.top_level_index_pinning = + top_level_index_pinning_; + table_options.metadata_cache_options.partition_pinning = partition_pinning_; + table_options.metadata_cache_options.unpartitioned_pinning = + unpartitioned_pinning_; + table_options.filter_policy.reset( + NewBloomFilterPolicy(10 /* bits_per_key */)); + if (partition_index_and_filters_) { + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + table_options.partition_filters = true; + } + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + Reopen(options); + + Random rnd(301); + for (int i = 0; i < 2; ++i) { + for (int j = 0; j < kNumKeysPerFile; ++j) { + ASSERT_OK(Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(kKeySize))); + } + ASSERT_OK(Flush()); + if (i == 0) { + // Prevent trivial move so file will be rewritten with dictionary and + // reopened with L1's pinning settings. + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + } + } + + // Clear all unpinned blocks so unpinned blocks will show up as cache misses + // when reading a key from a file. + table_options.block_cache->EraseUnRefEntries(); + + // Get base cache values + uint64_t filter_misses = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS); + uint64_t index_misses = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS); + uint64_t compression_dict_misses = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS); + + // Read a key from the L0 file + Get(Key(kNumKeysPerFile)); + uint64_t expected_filter_misses = filter_misses; + uint64_t expected_index_misses = index_misses; + uint64_t expected_compression_dict_misses = compression_dict_misses; + if (partition_index_and_filters_) { + if (top_level_index_pinning_ == PinningTier::kNone) { + ++expected_filter_misses; + ++expected_index_misses; + } + if (partition_pinning_ == PinningTier::kNone) { + ++expected_filter_misses; + ++expected_index_misses; + } + } else { + if (unpartitioned_pinning_ == PinningTier::kNone) { + ++expected_filter_misses; + ++expected_index_misses; + } + } + if (unpartitioned_pinning_ == PinningTier::kNone) { + ++expected_compression_dict_misses; + } + ASSERT_EQ(expected_filter_misses, + TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(expected_index_misses, + TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(expected_compression_dict_misses, + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS)); + + // Clear all unpinned blocks so unpinned blocks will show up as cache misses + // when reading a key from a file. 
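+  // For the upcoming read from the L1 file, blocks pinned only at the
+  // kFlushedAndSimilar tier are not pinned, so both kNone and
+  // kFlushedAndSimilar add to the expected miss counts below.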
+ table_options.block_cache->EraseUnRefEntries(); + + // Read a key from the L1 file + Get(Key(0)); + if (partition_index_and_filters_) { + if (top_level_index_pinning_ == PinningTier::kNone || + top_level_index_pinning_ == PinningTier::kFlushedAndSimilar) { + ++expected_filter_misses; + ++expected_index_misses; + } + if (partition_pinning_ == PinningTier::kNone || + partition_pinning_ == PinningTier::kFlushedAndSimilar) { + ++expected_filter_misses; + ++expected_index_misses; + } + } else { + if (unpartitioned_pinning_ == PinningTier::kNone || + unpartitioned_pinning_ == PinningTier::kFlushedAndSimilar) { + ++expected_filter_misses; + ++expected_index_misses; + } + } + if (unpartitioned_pinning_ == PinningTier::kNone || + unpartitioned_pinning_ == PinningTier::kFlushedAndSimilar) { + ++expected_compression_dict_misses; + } + ASSERT_EQ(expected_filter_misses, + TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(expected_index_misses, + TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(expected_compression_dict_misses, + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS)); +} + +INSTANTIATE_TEST_CASE_P( + DBBlockCachePinningTest, DBBlockCachePinningTest, + ::testing::Combine( + ::testing::Bool(), + ::testing::Values(PinningTier::kNone, PinningTier::kFlushedAndSimilar, + PinningTier::kAll), + ::testing::Values(PinningTier::kNone, PinningTier::kFlushedAndSimilar, + PinningTier::kAll), + ::testing::Values(PinningTier::kNone, PinningTier::kFlushedAndSimilar, + PinningTier::kAll))); + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} -- cgit v1.2.3