diff options
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/cache/cache_bench.cc | 284 | ||||
-rw-r--r-- | src/rocksdb/cache/cache_test.cc | 703 | ||||
-rw-r--r-- | src/rocksdb/cache/clock_cache.cc | 728 | ||||
-rw-r--r-- | src/rocksdb/cache/clock_cache.h | 16 | ||||
-rw-r--r-- | src/rocksdb/cache/lru_cache.cc | 572 | ||||
-rw-r--r-- | src/rocksdb/cache/lru_cache.h | 316 | ||||
-rw-r--r-- | src/rocksdb/cache/lru_cache_test.cc | 197 | ||||
-rw-r--r-- | src/rocksdb/cache/sharded_cache.cc | 166 | ||||
-rw-r--r-- | src/rocksdb/cache/sharded_cache.h | 103 |
9 files changed, 3085 insertions, 0 deletions
diff --git a/src/rocksdb/cache/cache_bench.cc b/src/rocksdb/cache/cache_bench.cc new file mode 100644 index 00000000..098813d9 --- /dev/null +++ b/src/rocksdb/cache/cache_bench.cc @@ -0,0 +1,284 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif +#ifndef GFLAGS +#include <cstdio> +int main() { + fprintf(stderr, "Please install gflags to run rocksdb tools\n"); + return 1; +} +#else + +#include <inttypes.h> +#include <sys/types.h> +#include <stdio.h> + +#include "port/port.h" +#include "rocksdb/cache.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "util/gflags_compat.h" +#include "util/mutexlock.h" +#include "util/random.h" + +using GFLAGS_NAMESPACE::ParseCommandLineFlags; + +static const uint32_t KB = 1024; + +DEFINE_int32(threads, 16, "Number of concurrent threads to run."); +DEFINE_int64(cache_size, 8 * KB * KB, + "Number of bytes to use as a cache of uncompressed data."); +DEFINE_int32(num_shard_bits, 4, "shard_bits."); + +DEFINE_int64(max_key, 1 * KB * KB * KB, "Max number of key to place in cache"); +DEFINE_uint64(ops_per_thread, 1200000, "Number of operations per thread."); + +DEFINE_bool(populate_cache, false, "Populate cache before operations"); +DEFINE_int32(insert_percent, 40, + "Ratio of insert to total workload (expressed as a percentage)"); +DEFINE_int32(lookup_percent, 50, + "Ratio of lookup to total workload (expressed as a percentage)"); +DEFINE_int32(erase_percent, 10, + "Ratio of erase to total workload (expressed as a percentage)"); + +DEFINE_bool(use_clock_cache, false, ""); + +namespace rocksdb { + +class CacheBench; +namespace { +void deleter(const Slice& /*key*/, void* value) { + delete reinterpret_cast<char *>(value); +} + +// State shared by all concurrent executions of the same benchmark. +class SharedState { + public: + explicit SharedState(CacheBench* cache_bench) + : cv_(&mu_), + num_threads_(FLAGS_threads), + num_initialized_(0), + start_(false), + num_done_(0), + cache_bench_(cache_bench) { + } + + ~SharedState() {} + + port::Mutex* GetMutex() { + return &mu_; + } + + port::CondVar* GetCondVar() { + return &cv_; + } + + CacheBench* GetCacheBench() const { + return cache_bench_; + } + + void IncInitialized() { + num_initialized_++; + } + + void IncDone() { + num_done_++; + } + + bool AllInitialized() const { + return num_initialized_ >= num_threads_; + } + + bool AllDone() const { + return num_done_ >= num_threads_; + } + + void SetStart() { + start_ = true; + } + + bool Started() const { + return start_; + } + + private: + port::Mutex mu_; + port::CondVar cv_; + + const uint64_t num_threads_; + uint64_t num_initialized_; + bool start_; + uint64_t num_done_; + + CacheBench* cache_bench_; +}; + +// Per-thread state for concurrent executions of the same benchmark. +struct ThreadState { + uint32_t tid; + Random rnd; + SharedState* shared; + + ThreadState(uint32_t index, SharedState* _shared) + : tid(index), rnd(1000 + index), shared(_shared) {} +}; +} // namespace + +class CacheBench { + public: + CacheBench() : num_threads_(FLAGS_threads) { + if (FLAGS_use_clock_cache) { + cache_ = NewClockCache(FLAGS_cache_size, FLAGS_num_shard_bits); + if (!cache_) { + fprintf(stderr, "Clock cache not supported.\n"); + exit(1); + } + } else { + cache_ = NewLRUCache(FLAGS_cache_size, FLAGS_num_shard_bits); + } + } + + ~CacheBench() {} + + void PopulateCache() { + Random rnd(1); + for (int64_t i = 0; i < FLAGS_cache_size; i++) { + uint64_t rand_key = rnd.Next() % FLAGS_max_key; + // Cast uint64* to be char*, data would be copied to cache + Slice key(reinterpret_cast<char*>(&rand_key), 8); + // do insert + cache_->Insert(key, new char[10], 1, &deleter); + } + } + + bool Run() { + rocksdb::Env* env = rocksdb::Env::Default(); + + PrintEnv(); + SharedState shared(this); + std::vector<ThreadState*> threads(num_threads_); + for (uint32_t i = 0; i < num_threads_; i++) { + threads[i] = new ThreadState(i, &shared); + env->StartThread(ThreadBody, threads[i]); + } + { + MutexLock l(shared.GetMutex()); + while (!shared.AllInitialized()) { + shared.GetCondVar()->Wait(); + } + // Record start time + uint64_t start_time = env->NowMicros(); + + // Start all threads + shared.SetStart(); + shared.GetCondVar()->SignalAll(); + + // Wait threads to complete + while (!shared.AllDone()) { + shared.GetCondVar()->Wait(); + } + + // Record end time + uint64_t end_time = env->NowMicros(); + double elapsed = static_cast<double>(end_time - start_time) * 1e-6; + uint32_t qps = static_cast<uint32_t>( + static_cast<double>(FLAGS_threads * FLAGS_ops_per_thread) / elapsed); + fprintf(stdout, "Complete in %.3f s; QPS = %u\n", elapsed, qps); + } + return true; + } + + private: + std::shared_ptr<Cache> cache_; + uint32_t num_threads_; + + static void ThreadBody(void* v) { + ThreadState* thread = reinterpret_cast<ThreadState*>(v); + SharedState* shared = thread->shared; + + { + MutexLock l(shared->GetMutex()); + shared->IncInitialized(); + if (shared->AllInitialized()) { + shared->GetCondVar()->SignalAll(); + } + while (!shared->Started()) { + shared->GetCondVar()->Wait(); + } + } + thread->shared->GetCacheBench()->OperateCache(thread); + + { + MutexLock l(shared->GetMutex()); + shared->IncDone(); + if (shared->AllDone()) { + shared->GetCondVar()->SignalAll(); + } + } + } + + void OperateCache(ThreadState* thread) { + for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) { + uint64_t rand_key = thread->rnd.Next() % FLAGS_max_key; + // Cast uint64* to be char*, data would be copied to cache + Slice key(reinterpret_cast<char*>(&rand_key), 8); + int32_t prob_op = thread->rnd.Uniform(100); + if (prob_op >= 0 && prob_op < FLAGS_insert_percent) { + // do insert + cache_->Insert(key, new char[10], 1, &deleter); + } else if (prob_op -= FLAGS_insert_percent && + prob_op < FLAGS_lookup_percent) { + // do lookup + auto handle = cache_->Lookup(key); + if (handle) { + cache_->Release(handle); + } + } else if (prob_op -= FLAGS_lookup_percent && + prob_op < FLAGS_erase_percent) { + // do erase + cache_->Erase(key); + } + } + } + + void PrintEnv() const { + printf("RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion); + printf("Number of threads : %d\n", FLAGS_threads); + printf("Ops per thread : %" PRIu64 "\n", FLAGS_ops_per_thread); + printf("Cache size : %" PRIu64 "\n", FLAGS_cache_size); + printf("Num shard bits : %d\n", FLAGS_num_shard_bits); + printf("Max key : %" PRIu64 "\n", FLAGS_max_key); + printf("Populate cache : %d\n", FLAGS_populate_cache); + printf("Insert percentage : %d%%\n", FLAGS_insert_percent); + printf("Lookup percentage : %d%%\n", FLAGS_lookup_percent); + printf("Erase percentage : %d%%\n", FLAGS_erase_percent); + printf("----------------------------\n"); + } +}; +} // namespace rocksdb + +int main(int argc, char** argv) { + ParseCommandLineFlags(&argc, &argv, true); + + if (FLAGS_threads <= 0) { + fprintf(stderr, "threads number <= 0\n"); + exit(1); + } + + rocksdb::CacheBench bench; + if (FLAGS_populate_cache) { + bench.PopulateCache(); + } + if (bench.Run()) { + return 0; + } else { + return 1; + } +} + +#endif // GFLAGS diff --git a/src/rocksdb/cache/cache_test.cc b/src/rocksdb/cache/cache_test.cc new file mode 100644 index 00000000..f9f77234 --- /dev/null +++ b/src/rocksdb/cache/cache_test.cc @@ -0,0 +1,703 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "rocksdb/cache.h" + +#include <forward_list> +#include <functional> +#include <iostream> +#include <string> +#include <vector> +#include "cache/clock_cache.h" +#include "cache/lru_cache.h" +#include "util/coding.h" +#include "util/string_util.h" +#include "util/testharness.h" + +namespace rocksdb { + +// Conversions between numeric keys/values and the types expected by Cache. +static std::string EncodeKey(int k) { + std::string result; + PutFixed32(&result, k); + return result; +} +static int DecodeKey(const Slice& k) { + assert(k.size() == 4); + return DecodeFixed32(k.data()); +} +static void* EncodeValue(uintptr_t v) { return reinterpret_cast<void*>(v); } +static int DecodeValue(void* v) { + return static_cast<int>(reinterpret_cast<uintptr_t>(v)); +} + +const std::string kLRU = "lru"; +const std::string kClock = "clock"; + +void dumbDeleter(const Slice& /*key*/, void* /*value*/) {} + +void eraseDeleter(const Slice& /*key*/, void* value) { + Cache* cache = reinterpret_cast<Cache*>(value); + cache->Erase("foo"); +} + +class CacheTest : public testing::TestWithParam<std::string> { + public: + static CacheTest* current_; + + static void Deleter(const Slice& key, void* v) { + current_->deleted_keys_.push_back(DecodeKey(key)); + current_->deleted_values_.push_back(DecodeValue(v)); + } + + static const int kCacheSize = 1000; + static const int kNumShardBits = 4; + + static const int kCacheSize2 = 100; + static const int kNumShardBits2 = 2; + + std::vector<int> deleted_keys_; + std::vector<int> deleted_values_; + std::shared_ptr<Cache> cache_; + std::shared_ptr<Cache> cache2_; + + CacheTest() + : cache_(NewCache(kCacheSize, kNumShardBits, false)), + cache2_(NewCache(kCacheSize2, kNumShardBits2, false)) { + current_ = this; + } + + ~CacheTest() override {} + + std::shared_ptr<Cache> NewCache(size_t capacity) { + auto type = GetParam(); + if (type == kLRU) { + return NewLRUCache(capacity); + } + if (type == kClock) { + return NewClockCache(capacity); + } + return nullptr; + } + + std::shared_ptr<Cache> NewCache(size_t capacity, int num_shard_bits, + bool strict_capacity_limit) { + auto type = GetParam(); + if (type == kLRU) { + return NewLRUCache(capacity, num_shard_bits, strict_capacity_limit); + } + if (type == kClock) { + return NewClockCache(capacity, num_shard_bits, strict_capacity_limit); + } + return nullptr; + } + + int Lookup(std::shared_ptr<Cache> cache, int key) { + Cache::Handle* handle = cache->Lookup(EncodeKey(key)); + const int r = (handle == nullptr) ? -1 : DecodeValue(cache->Value(handle)); + if (handle != nullptr) { + cache->Release(handle); + } + return r; + } + + void Insert(std::shared_ptr<Cache> cache, int key, int value, + int charge = 1) { + cache->Insert(EncodeKey(key), EncodeValue(value), charge, + &CacheTest::Deleter); + } + + void Erase(std::shared_ptr<Cache> cache, int key) { + cache->Erase(EncodeKey(key)); + } + + int Lookup(int key) { + return Lookup(cache_, key); + } + + void Insert(int key, int value, int charge = 1) { + Insert(cache_, key, value, charge); + } + + void Erase(int key) { + Erase(cache_, key); + } + + int Lookup2(int key) { + return Lookup(cache2_, key); + } + + void Insert2(int key, int value, int charge = 1) { + Insert(cache2_, key, value, charge); + } + + void Erase2(int key) { + Erase(cache2_, key); + } +}; +CacheTest* CacheTest::current_; + +TEST_P(CacheTest, UsageTest) { + // cache is std::shared_ptr and will be automatically cleaned up. + const uint64_t kCapacity = 100000; + auto cache = NewCache(kCapacity, 8, false); + + size_t usage = 0; + char value[10] = "abcdef"; + // make sure everything will be cached + for (int i = 1; i < 100; ++i) { + std::string key(i, 'a'); + auto kv_size = key.size() + 5; + cache->Insert(key, reinterpret_cast<void*>(value), kv_size, dumbDeleter); + usage += kv_size; + ASSERT_EQ(usage, cache->GetUsage()); + } + + // make sure the cache will be overloaded + for (uint64_t i = 1; i < kCapacity; ++i) { + auto key = ToString(i); + cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5, + dumbDeleter); + } + + // the usage should be close to the capacity + ASSERT_GT(kCapacity, cache->GetUsage()); + ASSERT_LT(kCapacity * 0.95, cache->GetUsage()); +} + +TEST_P(CacheTest, PinnedUsageTest) { + // cache is std::shared_ptr and will be automatically cleaned up. + const uint64_t kCapacity = 100000; + auto cache = NewCache(kCapacity, 8, false); + + size_t pinned_usage = 0; + char value[10] = "abcdef"; + + std::forward_list<Cache::Handle*> unreleased_handles; + + // Add entries. Unpin some of them after insertion. Then, pin some of them + // again. Check GetPinnedUsage(). + for (int i = 1; i < 100; ++i) { + std::string key(i, 'a'); + auto kv_size = key.size() + 5; + Cache::Handle* handle; + cache->Insert(key, reinterpret_cast<void*>(value), kv_size, dumbDeleter, + &handle); + pinned_usage += kv_size; + ASSERT_EQ(pinned_usage, cache->GetPinnedUsage()); + if (i % 2 == 0) { + cache->Release(handle); + pinned_usage -= kv_size; + ASSERT_EQ(pinned_usage, cache->GetPinnedUsage()); + } else { + unreleased_handles.push_front(handle); + } + if (i % 3 == 0) { + unreleased_handles.push_front(cache->Lookup(key)); + // If i % 2 == 0, then the entry was unpinned before Lookup, so pinned + // usage increased + if (i % 2 == 0) { + pinned_usage += kv_size; + } + ASSERT_EQ(pinned_usage, cache->GetPinnedUsage()); + } + } + + // check that overloading the cache does not change the pinned usage + for (uint64_t i = 1; i < 2 * kCapacity; ++i) { + auto key = ToString(i); + cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5, + dumbDeleter); + } + ASSERT_EQ(pinned_usage, cache->GetPinnedUsage()); + + // release handles for pinned entries to prevent memory leaks + for (auto handle : unreleased_handles) { + cache->Release(handle); + } +} + +TEST_P(CacheTest, HitAndMiss) { + ASSERT_EQ(-1, Lookup(100)); + + Insert(100, 101); + ASSERT_EQ(101, Lookup(100)); + ASSERT_EQ(-1, Lookup(200)); + ASSERT_EQ(-1, Lookup(300)); + + Insert(200, 201); + ASSERT_EQ(101, Lookup(100)); + ASSERT_EQ(201, Lookup(200)); + ASSERT_EQ(-1, Lookup(300)); + + Insert(100, 102); + ASSERT_EQ(102, Lookup(100)); + ASSERT_EQ(201, Lookup(200)); + ASSERT_EQ(-1, Lookup(300)); + + ASSERT_EQ(1U, deleted_keys_.size()); + ASSERT_EQ(100, deleted_keys_[0]); + ASSERT_EQ(101, deleted_values_[0]); +} + +TEST_P(CacheTest, InsertSameKey) { + Insert(1, 1); + Insert(1, 2); + ASSERT_EQ(2, Lookup(1)); +} + +TEST_P(CacheTest, Erase) { + Erase(200); + ASSERT_EQ(0U, deleted_keys_.size()); + + Insert(100, 101); + Insert(200, 201); + Erase(100); + ASSERT_EQ(-1, Lookup(100)); + ASSERT_EQ(201, Lookup(200)); + ASSERT_EQ(1U, deleted_keys_.size()); + ASSERT_EQ(100, deleted_keys_[0]); + ASSERT_EQ(101, deleted_values_[0]); + + Erase(100); + ASSERT_EQ(-1, Lookup(100)); + ASSERT_EQ(201, Lookup(200)); + ASSERT_EQ(1U, deleted_keys_.size()); +} + +TEST_P(CacheTest, EntriesArePinned) { + Insert(100, 101); + Cache::Handle* h1 = cache_->Lookup(EncodeKey(100)); + ASSERT_EQ(101, DecodeValue(cache_->Value(h1))); + ASSERT_EQ(1U, cache_->GetUsage()); + + Insert(100, 102); + Cache::Handle* h2 = cache_->Lookup(EncodeKey(100)); + ASSERT_EQ(102, DecodeValue(cache_->Value(h2))); + ASSERT_EQ(0U, deleted_keys_.size()); + ASSERT_EQ(2U, cache_->GetUsage()); + + cache_->Release(h1); + ASSERT_EQ(1U, deleted_keys_.size()); + ASSERT_EQ(100, deleted_keys_[0]); + ASSERT_EQ(101, deleted_values_[0]); + ASSERT_EQ(1U, cache_->GetUsage()); + + Erase(100); + ASSERT_EQ(-1, Lookup(100)); + ASSERT_EQ(1U, deleted_keys_.size()); + ASSERT_EQ(1U, cache_->GetUsage()); + + cache_->Release(h2); + ASSERT_EQ(2U, deleted_keys_.size()); + ASSERT_EQ(100, deleted_keys_[1]); + ASSERT_EQ(102, deleted_values_[1]); + ASSERT_EQ(0U, cache_->GetUsage()); +} + +TEST_P(CacheTest, EvictionPolicy) { + Insert(100, 101); + Insert(200, 201); + + // Frequently used entry must be kept around + for (int i = 0; i < kCacheSize + 200; i++) { + Insert(1000+i, 2000+i); + ASSERT_EQ(101, Lookup(100)); + } + ASSERT_EQ(101, Lookup(100)); + ASSERT_EQ(-1, Lookup(200)); +} + +TEST_P(CacheTest, ExternalRefPinsEntries) { + Insert(100, 101); + Cache::Handle* h = cache_->Lookup(EncodeKey(100)); + ASSERT_TRUE(cache_->Ref(h)); + ASSERT_EQ(101, DecodeValue(cache_->Value(h))); + ASSERT_EQ(1U, cache_->GetUsage()); + + for (int i = 0; i < 3; ++i) { + if (i > 0) { + // First release (i == 1) corresponds to Ref(), second release (i == 2) + // corresponds to Lookup(). Then, since all external refs are released, + // the below insertions should push out the cache entry. + cache_->Release(h); + } + // double cache size because the usage bit in block cache prevents 100 from + // being evicted in the first kCacheSize iterations + for (int j = 0; j < 2 * kCacheSize + 100; j++) { + Insert(1000 + j, 2000 + j); + } + if (i < 2) { + ASSERT_EQ(101, Lookup(100)); + } + } + ASSERT_EQ(-1, Lookup(100)); +} + +TEST_P(CacheTest, EvictionPolicyRef) { + Insert(100, 101); + Insert(101, 102); + Insert(102, 103); + Insert(103, 104); + Insert(200, 101); + Insert(201, 102); + Insert(202, 103); + Insert(203, 104); + Cache::Handle* h201 = cache_->Lookup(EncodeKey(200)); + Cache::Handle* h202 = cache_->Lookup(EncodeKey(201)); + Cache::Handle* h203 = cache_->Lookup(EncodeKey(202)); + Cache::Handle* h204 = cache_->Lookup(EncodeKey(203)); + Insert(300, 101); + Insert(301, 102); + Insert(302, 103); + Insert(303, 104); + + // Insert entries much more than Cache capacity + for (int i = 0; i < kCacheSize + 200; i++) { + Insert(1000 + i, 2000 + i); + } + + // Check whether the entries inserted in the beginning + // are evicted. Ones without extra ref are evicted and + // those with are not. + ASSERT_EQ(-1, Lookup(100)); + ASSERT_EQ(-1, Lookup(101)); + ASSERT_EQ(-1, Lookup(102)); + ASSERT_EQ(-1, Lookup(103)); + + ASSERT_EQ(-1, Lookup(300)); + ASSERT_EQ(-1, Lookup(301)); + ASSERT_EQ(-1, Lookup(302)); + ASSERT_EQ(-1, Lookup(303)); + + ASSERT_EQ(101, Lookup(200)); + ASSERT_EQ(102, Lookup(201)); + ASSERT_EQ(103, Lookup(202)); + ASSERT_EQ(104, Lookup(203)); + + // Cleaning up all the handles + cache_->Release(h201); + cache_->Release(h202); + cache_->Release(h203); + cache_->Release(h204); +} + +TEST_P(CacheTest, EvictEmptyCache) { + // Insert item large than capacity to trigger eviction on empty cache. + auto cache = NewCache(1, 0, false); + ASSERT_OK(cache->Insert("foo", nullptr, 10, dumbDeleter)); +} + +TEST_P(CacheTest, EraseFromDeleter) { + // Have deleter which will erase item from cache, which will re-enter + // the cache at that point. + std::shared_ptr<Cache> cache = NewCache(10, 0, false); + ASSERT_OK(cache->Insert("foo", nullptr, 1, dumbDeleter)); + ASSERT_OK(cache->Insert("bar", cache.get(), 1, eraseDeleter)); + cache->Erase("bar"); + ASSERT_EQ(nullptr, cache->Lookup("foo")); + ASSERT_EQ(nullptr, cache->Lookup("bar")); +} + +TEST_P(CacheTest, ErasedHandleState) { + // insert a key and get two handles + Insert(100, 1000); + Cache::Handle* h1 = cache_->Lookup(EncodeKey(100)); + Cache::Handle* h2 = cache_->Lookup(EncodeKey(100)); + ASSERT_EQ(h1, h2); + ASSERT_EQ(DecodeValue(cache_->Value(h1)), 1000); + ASSERT_EQ(DecodeValue(cache_->Value(h2)), 1000); + + // delete the key from the cache + Erase(100); + // can no longer find in the cache + ASSERT_EQ(-1, Lookup(100)); + + // release one handle + cache_->Release(h1); + // still can't find in cache + ASSERT_EQ(-1, Lookup(100)); + + cache_->Release(h2); +} + +TEST_P(CacheTest, HeavyEntries) { + // Add a bunch of light and heavy entries and then count the combined + // size of items still in the cache, which must be approximately the + // same as the total capacity. + const int kLight = 1; + const int kHeavy = 10; + int added = 0; + int index = 0; + while (added < 2*kCacheSize) { + const int weight = (index & 1) ? kLight : kHeavy; + Insert(index, 1000+index, weight); + added += weight; + index++; + } + + int cached_weight = 0; + for (int i = 0; i < index; i++) { + const int weight = (i & 1 ? kLight : kHeavy); + int r = Lookup(i); + if (r >= 0) { + cached_weight += weight; + ASSERT_EQ(1000+i, r); + } + } + ASSERT_LE(cached_weight, kCacheSize + kCacheSize/10); +} + +TEST_P(CacheTest, NewId) { + uint64_t a = cache_->NewId(); + uint64_t b = cache_->NewId(); + ASSERT_NE(a, b); +} + + +class Value { + public: + explicit Value(size_t v) : v_(v) { } + + size_t v_; +}; + +namespace { +void deleter(const Slice& /*key*/, void* value) { + delete static_cast<Value *>(value); +} +} // namespace + +TEST_P(CacheTest, ReleaseAndErase) { + std::shared_ptr<Cache> cache = NewCache(5, 0, false); + Cache::Handle* handle; + Status s = cache->Insert(EncodeKey(100), EncodeValue(100), 1, + &CacheTest::Deleter, &handle); + ASSERT_TRUE(s.ok()); + ASSERT_EQ(5U, cache->GetCapacity()); + ASSERT_EQ(1U, cache->GetUsage()); + ASSERT_EQ(0U, deleted_keys_.size()); + auto erased = cache->Release(handle, true); + ASSERT_TRUE(erased); + // This tests that deleter has been called + ASSERT_EQ(1U, deleted_keys_.size()); +} + +TEST_P(CacheTest, ReleaseWithoutErase) { + std::shared_ptr<Cache> cache = NewCache(5, 0, false); + Cache::Handle* handle; + Status s = cache->Insert(EncodeKey(100), EncodeValue(100), 1, + &CacheTest::Deleter, &handle); + ASSERT_TRUE(s.ok()); + ASSERT_EQ(5U, cache->GetCapacity()); + ASSERT_EQ(1U, cache->GetUsage()); + ASSERT_EQ(0U, deleted_keys_.size()); + auto erased = cache->Release(handle); + ASSERT_FALSE(erased); + // This tests that deleter is not called. When cache has free capacity it is + // not expected to immediately erase the released items. + ASSERT_EQ(0U, deleted_keys_.size()); +} + +TEST_P(CacheTest, SetCapacity) { + // test1: increase capacity + // lets create a cache with capacity 5, + // then, insert 5 elements, then increase capacity + // to 10, returned capacity should be 10, usage=5 + std::shared_ptr<Cache> cache = NewCache(5, 0, false); + std::vector<Cache::Handle*> handles(10); + // Insert 5 entries, but not releasing. + for (size_t i = 0; i < 5; i++) { + std::string key = ToString(i+1); + Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]); + ASSERT_TRUE(s.ok()); + } + ASSERT_EQ(5U, cache->GetCapacity()); + ASSERT_EQ(5U, cache->GetUsage()); + cache->SetCapacity(10); + ASSERT_EQ(10U, cache->GetCapacity()); + ASSERT_EQ(5U, cache->GetUsage()); + + // test2: decrease capacity + // insert 5 more elements to cache, then release 5, + // then decrease capacity to 7, final capacity should be 7 + // and usage should be 7 + for (size_t i = 5; i < 10; i++) { + std::string key = ToString(i+1); + Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]); + ASSERT_TRUE(s.ok()); + } + ASSERT_EQ(10U, cache->GetCapacity()); + ASSERT_EQ(10U, cache->GetUsage()); + for (size_t i = 0; i < 5; i++) { + cache->Release(handles[i]); + } + ASSERT_EQ(10U, cache->GetCapacity()); + ASSERT_EQ(10U, cache->GetUsage()); + cache->SetCapacity(7); + ASSERT_EQ(7, cache->GetCapacity()); + ASSERT_EQ(7, cache->GetUsage()); + + // release remaining 5 to keep valgrind happy + for (size_t i = 5; i < 10; i++) { + cache->Release(handles[i]); + } +} + +TEST_P(CacheTest, SetStrictCapacityLimit) { + // test1: set the flag to false. Insert more keys than capacity. See if they + // all go through. + std::shared_ptr<Cache> cache = NewLRUCache(5, 0, false); + std::vector<Cache::Handle*> handles(10); + Status s; + for (size_t i = 0; i < 10; i++) { + std::string key = ToString(i + 1); + s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]); + ASSERT_OK(s); + ASSERT_NE(nullptr, handles[i]); + } + + // test2: set the flag to true. Insert and check if it fails. + std::string extra_key = "extra"; + Value* extra_value = new Value(0); + cache->SetStrictCapacityLimit(true); + Cache::Handle* handle; + s = cache->Insert(extra_key, extra_value, 1, &deleter, &handle); + ASSERT_TRUE(s.IsIncomplete()); + ASSERT_EQ(nullptr, handle); + + for (size_t i = 0; i < 10; i++) { + cache->Release(handles[i]); + } + + // test3: init with flag being true. + std::shared_ptr<Cache> cache2 = NewLRUCache(5, 0, true); + for (size_t i = 0; i < 5; i++) { + std::string key = ToString(i + 1); + s = cache2->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]); + ASSERT_OK(s); + ASSERT_NE(nullptr, handles[i]); + } + s = cache2->Insert(extra_key, extra_value, 1, &deleter, &handle); + ASSERT_TRUE(s.IsIncomplete()); + ASSERT_EQ(nullptr, handle); + // test insert without handle + s = cache2->Insert(extra_key, extra_value, 1, &deleter); + // AS if the key have been inserted into cache but get evicted immediately. + ASSERT_OK(s); + ASSERT_EQ(5, cache->GetUsage()); + ASSERT_EQ(nullptr, cache2->Lookup(extra_key)); + + for (size_t i = 0; i < 5; i++) { + cache2->Release(handles[i]); + } +} + +TEST_P(CacheTest, OverCapacity) { + size_t n = 10; + + // a LRUCache with n entries and one shard only + std::shared_ptr<Cache> cache = NewCache(n, 0, false); + + std::vector<Cache::Handle*> handles(n+1); + + // Insert n+1 entries, but not releasing. + for (size_t i = 0; i < n + 1; i++) { + std::string key = ToString(i+1); + Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]); + ASSERT_TRUE(s.ok()); + } + + // Guess what's in the cache now? + for (size_t i = 0; i < n + 1; i++) { + std::string key = ToString(i+1); + auto h = cache->Lookup(key); + ASSERT_TRUE(h != nullptr); + if (h) cache->Release(h); + } + + // the cache is over capacity since nothing could be evicted + ASSERT_EQ(n + 1U, cache->GetUsage()); + for (size_t i = 0; i < n + 1; i++) { + cache->Release(handles[i]); + } + // Make sure eviction is triggered. + cache->SetCapacity(n); + + // cache is under capacity now since elements were released + ASSERT_EQ(n, cache->GetUsage()); + + // element 0 is evicted and the rest is there + // This is consistent with the LRU policy since the element 0 + // was released first + for (size_t i = 0; i < n + 1; i++) { + std::string key = ToString(i+1); + auto h = cache->Lookup(key); + if (h) { + ASSERT_NE(i, 0U); + cache->Release(h); + } else { + ASSERT_EQ(i, 0U); + } + } +} + +namespace { +std::vector<std::pair<int, int>> callback_state; +void callback(void* entry, size_t charge) { + callback_state.push_back({DecodeValue(entry), static_cast<int>(charge)}); +} +}; + +TEST_P(CacheTest, ApplyToAllCacheEntiresTest) { + std::vector<std::pair<int, int>> inserted; + callback_state.clear(); + + for (int i = 0; i < 10; ++i) { + Insert(i, i * 2, i + 1); + inserted.push_back({i * 2, i + 1}); + } + cache_->ApplyToAllCacheEntries(callback, true); + + std::sort(inserted.begin(), inserted.end()); + std::sort(callback_state.begin(), callback_state.end()); + ASSERT_TRUE(inserted == callback_state); +} + +TEST_P(CacheTest, DefaultShardBits) { + // test1: set the flag to false. Insert more keys than capacity. See if they + // all go through. + std::shared_ptr<Cache> cache = NewCache(16 * 1024L * 1024L); + ShardedCache* sc = dynamic_cast<ShardedCache*>(cache.get()); + ASSERT_EQ(5, sc->GetNumShardBits()); + + cache = NewLRUCache(511 * 1024L, -1, true); + sc = dynamic_cast<ShardedCache*>(cache.get()); + ASSERT_EQ(0, sc->GetNumShardBits()); + + cache = NewLRUCache(1024L * 1024L * 1024L, -1, true); + sc = dynamic_cast<ShardedCache*>(cache.get()); + ASSERT_EQ(6, sc->GetNumShardBits()); +} + +#ifdef SUPPORT_CLOCK_CACHE +std::shared_ptr<Cache> (*new_clock_cache_func)(size_t, int, + bool) = NewClockCache; +INSTANTIATE_TEST_CASE_P(CacheTestInstance, CacheTest, + testing::Values(kLRU, kClock)); +#else +INSTANTIATE_TEST_CASE_P(CacheTestInstance, CacheTest, testing::Values(kLRU)); +#endif // SUPPORT_CLOCK_CACHE + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/cache/clock_cache.cc b/src/rocksdb/cache/clock_cache.cc new file mode 100644 index 00000000..89173834 --- /dev/null +++ b/src/rocksdb/cache/clock_cache.cc @@ -0,0 +1,728 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "cache/clock_cache.h" + +#ifndef SUPPORT_CLOCK_CACHE + +namespace rocksdb { + +std::shared_ptr<Cache> NewClockCache(size_t /*capacity*/, int /*num_shard_bits*/, + bool /*strict_capacity_limit*/) { + // Clock cache not supported. + return nullptr; +} + +} // namespace rocksdb + +#else + +#include <assert.h> +#include <atomic> +#include <deque> + +// "tbb/concurrent_hash_map.h" requires RTTI if exception is enabled. +// Disable it so users can chooose to disable RTTI. +#ifndef ROCKSDB_USE_RTTI +#define TBB_USE_EXCEPTIONS 0 +#endif +#include "tbb/concurrent_hash_map.h" + +#include "cache/sharded_cache.h" +#include "port/port.h" +#include "util/autovector.h" +#include "util/mutexlock.h" + +namespace rocksdb { + +namespace { + +// An implementation of the Cache interface based on CLOCK algorithm, with +// better concurrent performance than LRUCache. The idea of CLOCK algorithm +// is to maintain all cache entries in a circular list, and an iterator +// (the "head") pointing to the last examined entry. Eviction starts from the +// current head. Each entry is given a second chance before eviction, if it +// has been access since last examine. In contrast to LRU, no modification +// to the internal data-structure (except for flipping the usage bit) needs +// to be done upon lookup. This gives us oppertunity to implement a cache +// with better concurrency. +// +// Each cache entry is represented by a cache handle, and all the handles +// are arranged in a circular list, as describe above. Upon erase of an entry, +// we never remove the handle. Instead, the handle is put into a recycle bin +// to be re-use. This is to avoid memory dealocation, which is hard to deal +// with in concurrent environment. +// +// The cache also maintains a concurrent hash map for lookup. Any concurrent +// hash map implementation should do the work. We currently use +// tbb::concurrent_hash_map because it supports concurrent erase. +// +// Each cache handle has the following flags and counters, which are squeeze +// in an atomic interger, to make sure the handle always be in a consistent +// state: +// +// * In-cache bit: whether the entry is reference by the cache itself. If +// an entry is in cache, its key would also be available in the hash map. +// * Usage bit: whether the entry has been access by user since last +// examine for eviction. Can be reset by eviction. +// * Reference count: reference count by user. +// +// An entry can be reference only when it's in cache. An entry can be evicted +// only when it is in cache, has no usage since last examine, and reference +// count is zero. +// +// The follow figure shows a possible layout of the cache. Boxes represents +// cache handles and numbers in each box being in-cache bit, usage bit and +// reference count respectively. +// +// hash map: +// +-------+--------+ +// | key | handle | +// +-------+--------+ +// | "foo" | 5 |-------------------------------------+ +// +-------+--------+ | +// | "bar" | 2 |--+ | +// +-------+--------+ | | +// | | +// head | | +// | | | +// circular list: | | | +// +-------+ +-------+ +-------+ +-------+ +-------+ +------- +// |(0,0,0)|---|(1,1,0)|---|(0,0,0)|---|(0,1,3)|---|(1,0,0)|---| ... +// +-------+ +-------+ +-------+ +-------+ +-------+ +------- +// | | +// +-------+ +-----------+ +// | | +// +---+---+ +// recycle bin: | 1 | 3 | +// +---+---+ +// +// Suppose we try to insert "baz" into the cache at this point and the cache is +// full. The cache will first look for entries to evict, starting from where +// head points to (the second entry). It resets usage bit of the second entry, +// skips the third and fourth entry since they are not in cache, and finally +// evict the fifth entry ("foo"). It looks at recycle bin for available handle, +// grabs handle 3, and insert the key into the handle. The following figure +// shows the resulting layout. +// +// hash map: +// +-------+--------+ +// | key | handle | +// +-------+--------+ +// | "baz" | 3 |-------------+ +// +-------+--------+ | +// | "bar" | 2 |--+ | +// +-------+--------+ | | +// | | +// | | head +// | | | +// circular list: | | | +// +-------+ +-------+ +-------+ +-------+ +-------+ +------- +// |(0,0,0)|---|(1,0,0)|---|(1,0,0)|---|(0,1,3)|---|(0,0,0)|---| ... +// +-------+ +-------+ +-------+ +-------+ +-------+ +------- +// | | +// +-------+ +-----------------------------------+ +// | | +// +---+---+ +// recycle bin: | 1 | 5 | +// +---+---+ +// +// A global mutex guards the circular list, the head, and the recycle bin. +// We additionally require that modifying the hash map needs to hold the mutex. +// As such, Modifying the cache (such as Insert() and Erase()) require to +// hold the mutex. Lookup() only access the hash map and the flags associated +// with each handle, and don't require explicit locking. Release() has to +// acquire the mutex only when it releases the last reference to the entry and +// the entry has been erased from cache explicitly. A future improvement could +// be to remove the mutex completely. +// +// Benchmark: +// We run readrandom db_bench on a test DB of size 13GB, with size of each +// level: +// +// Level Files Size(MB) +// ------------------------- +// L0 1 0.01 +// L1 18 17.32 +// L2 230 182.94 +// L3 1186 1833.63 +// L4 4602 8140.30 +// +// We test with both 32 and 16 read threads, with 2GB cache size (the whole DB +// doesn't fits in) and 64GB cache size (the whole DB can fit in cache), and +// whether to put index and filter blocks in block cache. The benchmark runs +// with +// with RocksDB 4.10. We got the following result: +// +// Threads Cache Cache ClockCache LRUCache +// Size Index/Filter Throughput(MB/s) Hit Throughput(MB/s) Hit +// 32 2GB yes 466.7 85.9% 433.7 86.5% +// 32 2GB no 529.9 72.7% 532.7 73.9% +// 32 64GB yes 649.9 99.9% 507.9 99.9% +// 32 64GB no 740.4 99.9% 662.8 99.9% +// 16 2GB yes 278.4 85.9% 283.4 86.5% +// 16 2GB no 318.6 72.7% 335.8 73.9% +// 16 64GB yes 391.9 99.9% 353.3 99.9% +// 16 64GB no 433.8 99.8% 419.4 99.8% + +// Cache entry meta data. +struct CacheHandle { + Slice key; + uint32_t hash; + void* value; + size_t charge; + void (*deleter)(const Slice&, void* value); + + // Flags and counters associated with the cache handle: + // lowest bit: n-cache bit + // second lowest bit: usage bit + // the rest bits: reference count + // The handle is unused when flags equals to 0. The thread decreases the count + // to 0 is responsible to put the handle back to recycle_ and cleanup memory. + std::atomic<uint32_t> flags; + + CacheHandle() = default; + + CacheHandle(const CacheHandle& a) { *this = a; } + + CacheHandle(const Slice& k, void* v, + void (*del)(const Slice& key, void* value)) + : key(k), value(v), deleter(del) {} + + CacheHandle& operator=(const CacheHandle& a) { + // Only copy members needed for deletion. + key = a.key; + value = a.value; + deleter = a.deleter; + return *this; + } +}; + +// Key of hash map. We store hash value with the key for convenience. +struct CacheKey { + Slice key; + uint32_t hash_value; + + CacheKey() = default; + + CacheKey(const Slice& k, uint32_t h) { + key = k; + hash_value = h; + } + + static bool equal(const CacheKey& a, const CacheKey& b) { + return a.hash_value == b.hash_value && a.key == b.key; + } + + static size_t hash(const CacheKey& a) { + return static_cast<size_t>(a.hash_value); + } +}; + +struct CleanupContext { + // List of values to be deleted, along with the key and deleter. + autovector<CacheHandle> to_delete_value; + + // List of keys to be deleted. + autovector<const char*> to_delete_key; +}; + +// A cache shard which maintains its own CLOCK cache. +class ClockCacheShard final : public CacheShard { + public: + // Hash map type. + typedef tbb::concurrent_hash_map<CacheKey, CacheHandle*, CacheKey> HashTable; + + ClockCacheShard(); + ~ClockCacheShard() override; + + // Interfaces + void SetCapacity(size_t capacity) override; + void SetStrictCapacityLimit(bool strict_capacity_limit) override; + Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value), + Cache::Handle** handle, Cache::Priority priority) override; + Cache::Handle* Lookup(const Slice& key, uint32_t hash) override; + // If the entry in in cache, increase reference count and return true. + // Return false otherwise. + // + // Not necessary to hold mutex_ before being called. + bool Ref(Cache::Handle* handle) override; + bool Release(Cache::Handle* handle, bool force_erase = false) override; + void Erase(const Slice& key, uint32_t hash) override; + bool EraseAndConfirm(const Slice& key, uint32_t hash, + CleanupContext* context); + size_t GetUsage() const override; + size_t GetPinnedUsage() const override; + void EraseUnRefEntries() override; + void ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) override; + + private: + static const uint32_t kInCacheBit = 1; + static const uint32_t kUsageBit = 2; + static const uint32_t kRefsOffset = 2; + static const uint32_t kOneRef = 1 << kRefsOffset; + + // Helper functions to extract cache handle flags and counters. + static bool InCache(uint32_t flags) { return flags & kInCacheBit; } + static bool HasUsage(uint32_t flags) { return flags & kUsageBit; } + static uint32_t CountRefs(uint32_t flags) { return flags >> kRefsOffset; } + + // Decrease reference count of the entry. If this decreases the count to 0, + // recycle the entry. If set_usage is true, also set the usage bit. + // + // returns true if a value is erased. + // + // Not necessary to hold mutex_ before being called. + bool Unref(CacheHandle* handle, bool set_usage, CleanupContext* context); + + // Unset in-cache bit of the entry. Recycle the handle if necessary. + // + // returns true if a value is erased. + // + // Has to hold mutex_ before being called. + bool UnsetInCache(CacheHandle* handle, CleanupContext* context); + + // Put the handle back to recycle_ list, and put the value associated with + // it into to-be-deleted list. It doesn't cleanup the key as it might be + // reused by another handle. + // + // Has to hold mutex_ before being called. + void RecycleHandle(CacheHandle* handle, CleanupContext* context); + + // Delete keys and values in to-be-deleted list. Call the method without + // holding mutex, as destructors can be expensive. + void Cleanup(const CleanupContext& context); + + // Examine the handle for eviction. If the handle is in cache, usage bit is + // not set, and referece count is 0, evict it from cache. Otherwise unset + // the usage bit. + // + // Has to hold mutex_ before being called. + bool TryEvict(CacheHandle* value, CleanupContext* context); + + // Scan through the circular list, evict entries until we get enough capacity + // for new cache entry of specific size. Return true if success, false + // otherwise. + // + // Has to hold mutex_ before being called. + bool EvictFromCache(size_t charge, CleanupContext* context); + + CacheHandle* Insert(const Slice& key, uint32_t hash, void* value, + size_t change, + void (*deleter)(const Slice& key, void* value), + bool hold_reference, CleanupContext* context); + + // Guards list_, head_, and recycle_. In addition, updating table_ also has + // to hold the mutex, to avoid the cache being in inconsistent state. + mutable port::Mutex mutex_; + + // The circular list of cache handles. Initially the list is empty. Once a + // handle is needed by insertion, and no more handles are available in + // recycle bin, one more handle is appended to the end. + // + // We use std::deque for the circular list because we want to make sure + // pointers to handles are valid through out the life-cycle of the cache + // (in contrast to std::vector), and be able to grow the list (in contrast + // to statically allocated arrays). + std::deque<CacheHandle> list_; + + // Pointer to the next handle in the circular list to be examine for + // eviction. + size_t head_; + + // Recycle bin of cache handles. + autovector<CacheHandle*> recycle_; + + // Maximum cache size. + std::atomic<size_t> capacity_; + + // Current total size of the cache. + std::atomic<size_t> usage_; + + // Total un-released cache size. + std::atomic<size_t> pinned_usage_; + + // Whether allow insert into cache if cache is full. + std::atomic<bool> strict_capacity_limit_; + + // Hash table (tbb::concurrent_hash_map) for lookup. + HashTable table_; +}; + +ClockCacheShard::ClockCacheShard() + : head_(0), usage_(0), pinned_usage_(0), strict_capacity_limit_(false) {} + +ClockCacheShard::~ClockCacheShard() { + for (auto& handle : list_) { + uint32_t flags = handle.flags.load(std::memory_order_relaxed); + if (InCache(flags) || CountRefs(flags) > 0) { + if (handle.deleter != nullptr) { + (*handle.deleter)(handle.key, handle.value); + } + delete[] handle.key.data(); + } + } +} + +size_t ClockCacheShard::GetUsage() const { + return usage_.load(std::memory_order_relaxed); +} + +size_t ClockCacheShard::GetPinnedUsage() const { + return pinned_usage_.load(std::memory_order_relaxed); +} + +void ClockCacheShard::ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) { + if (thread_safe) { + mutex_.Lock(); + } + for (auto& handle : list_) { + // Use relaxed semantics instead of acquire semantics since we are either + // holding mutex, or don't have thread safe requirement. + uint32_t flags = handle.flags.load(std::memory_order_relaxed); + if (InCache(flags)) { + callback(handle.value, handle.charge); + } + } + if (thread_safe) { + mutex_.Unlock(); + } +} + +void ClockCacheShard::RecycleHandle(CacheHandle* handle, + CleanupContext* context) { + mutex_.AssertHeld(); + assert(!InCache(handle->flags) && CountRefs(handle->flags) == 0); + context->to_delete_key.push_back(handle->key.data()); + context->to_delete_value.emplace_back(*handle); + handle->key.clear(); + handle->value = nullptr; + handle->deleter = nullptr; + recycle_.push_back(handle); + usage_.fetch_sub(handle->charge, std::memory_order_relaxed); +} + +void ClockCacheShard::Cleanup(const CleanupContext& context) { + for (const CacheHandle& handle : context.to_delete_value) { + if (handle.deleter) { + (*handle.deleter)(handle.key, handle.value); + } + } + for (const char* key : context.to_delete_key) { + delete[] key; + } +} + +bool ClockCacheShard::Ref(Cache::Handle* h) { + auto handle = reinterpret_cast<CacheHandle*>(h); + // CAS loop to increase reference count. + uint32_t flags = handle->flags.load(std::memory_order_relaxed); + while (InCache(flags)) { + // Use acquire semantics on success, as further operations on the cache + // entry has to be order after reference count is increased. + if (handle->flags.compare_exchange_weak(flags, flags + kOneRef, + std::memory_order_acquire, + std::memory_order_relaxed)) { + if (CountRefs(flags) == 0) { + // No reference count before the operation. + pinned_usage_.fetch_add(handle->charge, std::memory_order_relaxed); + } + return true; + } + } + return false; +} + +bool ClockCacheShard::Unref(CacheHandle* handle, bool set_usage, + CleanupContext* context) { + if (set_usage) { + handle->flags.fetch_or(kUsageBit, std::memory_order_relaxed); + } + // Use acquire-release semantics as previous operations on the cache entry + // has to be order before reference count is decreased, and potential cleanup + // of the entry has to be order after. + uint32_t flags = handle->flags.fetch_sub(kOneRef, std::memory_order_acq_rel); + assert(CountRefs(flags) > 0); + if (CountRefs(flags) == 1) { + // this is the last reference. + pinned_usage_.fetch_sub(handle->charge, std::memory_order_relaxed); + // Cleanup if it is the last reference. + if (!InCache(flags)) { + MutexLock l(&mutex_); + RecycleHandle(handle, context); + } + } + return context->to_delete_value.size(); +} + +bool ClockCacheShard::UnsetInCache(CacheHandle* handle, + CleanupContext* context) { + mutex_.AssertHeld(); + // Use acquire-release semantics as previous operations on the cache entry + // has to be order before reference count is decreased, and potential cleanup + // of the entry has to be order after. + uint32_t flags = + handle->flags.fetch_and(~kInCacheBit, std::memory_order_acq_rel); + // Cleanup if it is the last reference. + if (InCache(flags) && CountRefs(flags) == 0) { + RecycleHandle(handle, context); + } + return context->to_delete_value.size(); +} + +bool ClockCacheShard::TryEvict(CacheHandle* handle, CleanupContext* context) { + mutex_.AssertHeld(); + uint32_t flags = kInCacheBit; + if (handle->flags.compare_exchange_strong(flags, 0, std::memory_order_acquire, + std::memory_order_relaxed)) { + bool erased __attribute__((__unused__)) = + table_.erase(CacheKey(handle->key, handle->hash)); + assert(erased); + RecycleHandle(handle, context); + return true; + } + handle->flags.fetch_and(~kUsageBit, std::memory_order_relaxed); + return false; +} + +bool ClockCacheShard::EvictFromCache(size_t charge, CleanupContext* context) { + size_t usage = usage_.load(std::memory_order_relaxed); + size_t capacity = capacity_.load(std::memory_order_relaxed); + if (usage == 0) { + return charge <= capacity; + } + size_t new_head = head_; + bool second_iteration = false; + while (usage + charge > capacity) { + assert(new_head < list_.size()); + if (TryEvict(&list_[new_head], context)) { + usage = usage_.load(std::memory_order_relaxed); + } + new_head = (new_head + 1 >= list_.size()) ? 0 : new_head + 1; + if (new_head == head_) { + if (second_iteration) { + return false; + } else { + second_iteration = true; + } + } + } + head_ = new_head; + return true; +} + +void ClockCacheShard::SetCapacity(size_t capacity) { + CleanupContext context; + { + MutexLock l(&mutex_); + capacity_.store(capacity, std::memory_order_relaxed); + EvictFromCache(0, &context); + } + Cleanup(context); +} + +void ClockCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) { + strict_capacity_limit_.store(strict_capacity_limit, + std::memory_order_relaxed); +} + +CacheHandle* ClockCacheShard::Insert( + const Slice& key, uint32_t hash, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value), bool hold_reference, + CleanupContext* context) { + MutexLock l(&mutex_); + bool success = EvictFromCache(charge, context); + bool strict = strict_capacity_limit_.load(std::memory_order_relaxed); + if (!success && (strict || !hold_reference)) { + context->to_delete_key.push_back(key.data()); + if (!hold_reference) { + context->to_delete_value.emplace_back(key, value, deleter); + } + return nullptr; + } + // Grab available handle from recycle bin. If recycle bin is empty, create + // and append new handle to end of circular list. + CacheHandle* handle = nullptr; + if (!recycle_.empty()) { + handle = recycle_.back(); + recycle_.pop_back(); + } else { + list_.emplace_back(); + handle = &list_.back(); + } + // Fill handle. + handle->key = key; + handle->hash = hash; + handle->value = value; + handle->charge = charge; + handle->deleter = deleter; + uint32_t flags = hold_reference ? kInCacheBit + kOneRef : kInCacheBit; + handle->flags.store(flags, std::memory_order_relaxed); + HashTable::accessor accessor; + if (table_.find(accessor, CacheKey(key, hash))) { + CacheHandle* existing_handle = accessor->second; + table_.erase(accessor); + UnsetInCache(existing_handle, context); + } + table_.insert(HashTable::value_type(CacheKey(key, hash), handle)); + if (hold_reference) { + pinned_usage_.fetch_add(charge, std::memory_order_relaxed); + } + usage_.fetch_add(charge, std::memory_order_relaxed); + return handle; +} + +Status ClockCacheShard::Insert(const Slice& key, uint32_t hash, void* value, + size_t charge, + void (*deleter)(const Slice& key, void* value), + Cache::Handle** out_handle, + Cache::Priority /*priority*/) { + CleanupContext context; + HashTable::accessor accessor; + char* key_data = new char[key.size()]; + memcpy(key_data, key.data(), key.size()); + Slice key_copy(key_data, key.size()); + CacheHandle* handle = Insert(key_copy, hash, value, charge, deleter, + out_handle != nullptr, &context); + Status s; + if (out_handle != nullptr) { + if (handle == nullptr) { + s = Status::Incomplete("Insert failed due to LRU cache being full."); + } else { + *out_handle = reinterpret_cast<Cache::Handle*>(handle); + } + } + Cleanup(context); + return s; +} + +Cache::Handle* ClockCacheShard::Lookup(const Slice& key, uint32_t hash) { + HashTable::const_accessor accessor; + if (!table_.find(accessor, CacheKey(key, hash))) { + return nullptr; + } + CacheHandle* handle = accessor->second; + accessor.release(); + // Ref() could fail if another thread sneak in and evict/erase the cache + // entry before we are able to hold reference. + if (!Ref(reinterpret_cast<Cache::Handle*>(handle))) { + return nullptr; + } + // Double check the key since the handle may now representing another key + // if other threads sneak in, evict/erase the entry and re-used the handle + // for another cache entry. + if (hash != handle->hash || key != handle->key) { + CleanupContext context; + Unref(handle, false, &context); + // It is possible Unref() delete the entry, so we need to cleanup. + Cleanup(context); + return nullptr; + } + return reinterpret_cast<Cache::Handle*>(handle); +} + +bool ClockCacheShard::Release(Cache::Handle* h, bool force_erase) { + CleanupContext context; + CacheHandle* handle = reinterpret_cast<CacheHandle*>(h); + bool erased = Unref(handle, true, &context); + if (force_erase && !erased) { + erased = EraseAndConfirm(handle->key, handle->hash, &context); + } + Cleanup(context); + return erased; +} + +void ClockCacheShard::Erase(const Slice& key, uint32_t hash) { + CleanupContext context; + EraseAndConfirm(key, hash, &context); + Cleanup(context); +} + +bool ClockCacheShard::EraseAndConfirm(const Slice& key, uint32_t hash, + CleanupContext* context) { + MutexLock l(&mutex_); + HashTable::accessor accessor; + bool erased = false; + if (table_.find(accessor, CacheKey(key, hash))) { + CacheHandle* handle = accessor->second; + table_.erase(accessor); + erased = UnsetInCache(handle, context); + } + return erased; +} + +void ClockCacheShard::EraseUnRefEntries() { + CleanupContext context; + { + MutexLock l(&mutex_); + table_.clear(); + for (auto& handle : list_) { + UnsetInCache(&handle, &context); + } + } + Cleanup(context); +} + +class ClockCache final : public ShardedCache { + public: + ClockCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit) + : ShardedCache(capacity, num_shard_bits, strict_capacity_limit) { + int num_shards = 1 << num_shard_bits; + shards_ = new ClockCacheShard[num_shards]; + SetCapacity(capacity); + SetStrictCapacityLimit(strict_capacity_limit); + } + + ~ClockCache() override { delete[] shards_; } + + const char* Name() const override { return "ClockCache"; } + + CacheShard* GetShard(int shard) override { + return reinterpret_cast<CacheShard*>(&shards_[shard]); + } + + const CacheShard* GetShard(int shard) const override { + return reinterpret_cast<CacheShard*>(&shards_[shard]); + } + + void* Value(Handle* handle) override { + return reinterpret_cast<const CacheHandle*>(handle)->value; + } + + size_t GetCharge(Handle* handle) const override { + return reinterpret_cast<const CacheHandle*>(handle)->charge; + } + + uint32_t GetHash(Handle* handle) const override { + return reinterpret_cast<const CacheHandle*>(handle)->hash; + } + + void DisownData() override { shards_ = nullptr; } + + private: + ClockCacheShard* shards_; +}; + +} // end anonymous namespace + +std::shared_ptr<Cache> NewClockCache(size_t capacity, int num_shard_bits, + bool strict_capacity_limit) { + if (num_shard_bits < 0) { + num_shard_bits = GetDefaultCacheShardBits(capacity); + } + return std::make_shared<ClockCache>(capacity, num_shard_bits, + strict_capacity_limit); +} + +} // namespace rocksdb + +#endif // SUPPORT_CLOCK_CACHE diff --git a/src/rocksdb/cache/clock_cache.h b/src/rocksdb/cache/clock_cache.h new file mode 100644 index 00000000..1614c0ed --- /dev/null +++ b/src/rocksdb/cache/clock_cache.h @@ -0,0 +1,16 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include "rocksdb/cache.h" + +#if defined(TBB) && !defined(ROCKSDB_LITE) +#define SUPPORT_CLOCK_CACHE +#endif diff --git a/src/rocksdb/cache/lru_cache.cc b/src/rocksdb/cache/lru_cache.cc new file mode 100644 index 00000000..fdcbb4e8 --- /dev/null +++ b/src/rocksdb/cache/lru_cache.cc @@ -0,0 +1,572 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include "cache/lru_cache.h" + +#include <assert.h> +#include <stdio.h> +#include <stdlib.h> +#include <string> + +#include "util/mutexlock.h" + +namespace rocksdb { + +LRUHandleTable::LRUHandleTable() : list_(nullptr), length_(0), elems_(0) { + Resize(); +} + +LRUHandleTable::~LRUHandleTable() { + ApplyToAllCacheEntries([](LRUHandle* h) { + if (h->refs == 1) { + h->Free(); + } + }); + delete[] list_; +} + +LRUHandle* LRUHandleTable::Lookup(const Slice& key, uint32_t hash) { + return *FindPointer(key, hash); +} + +LRUHandle* LRUHandleTable::Insert(LRUHandle* h) { + LRUHandle** ptr = FindPointer(h->key(), h->hash); + LRUHandle* old = *ptr; + h->next_hash = (old == nullptr ? nullptr : old->next_hash); + *ptr = h; + if (old == nullptr) { + ++elems_; + if (elems_ > length_) { + // Since each cache entry is fairly large, we aim for a small + // average linked list length (<= 1). + Resize(); + } + } + return old; +} + +LRUHandle* LRUHandleTable::Remove(const Slice& key, uint32_t hash) { + LRUHandle** ptr = FindPointer(key, hash); + LRUHandle* result = *ptr; + if (result != nullptr) { + *ptr = result->next_hash; + --elems_; + } + return result; +} + +LRUHandle** LRUHandleTable::FindPointer(const Slice& key, uint32_t hash) { + LRUHandle** ptr = &list_[hash & (length_ - 1)]; + while (*ptr != nullptr && ((*ptr)->hash != hash || key != (*ptr)->key())) { + ptr = &(*ptr)->next_hash; + } + return ptr; +} + +void LRUHandleTable::Resize() { + uint32_t new_length = 16; + while (new_length < elems_ * 1.5) { + new_length *= 2; + } + LRUHandle** new_list = new LRUHandle*[new_length]; + memset(new_list, 0, sizeof(new_list[0]) * new_length); + uint32_t count = 0; + for (uint32_t i = 0; i < length_; i++) { + LRUHandle* h = list_[i]; + while (h != nullptr) { + LRUHandle* next = h->next_hash; + uint32_t hash = h->hash; + LRUHandle** ptr = &new_list[hash & (new_length - 1)]; + h->next_hash = *ptr; + *ptr = h; + h = next; + count++; + } + } + assert(elems_ == count); + delete[] list_; + list_ = new_list; + length_ = new_length; +} + +LRUCacheShard::LRUCacheShard(size_t capacity, bool strict_capacity_limit, + double high_pri_pool_ratio, + bool use_adaptive_mutex) + : capacity_(0), + high_pri_pool_usage_(0), + strict_capacity_limit_(strict_capacity_limit), + high_pri_pool_ratio_(high_pri_pool_ratio), + high_pri_pool_capacity_(0), + usage_(0), + lru_usage_(0), + mutex_(use_adaptive_mutex) { + // Make empty circular linked list + lru_.next = &lru_; + lru_.prev = &lru_; + lru_low_pri_ = &lru_; + SetCapacity(capacity); +} + +LRUCacheShard::~LRUCacheShard() {} + +bool LRUCacheShard::Unref(LRUHandle* e) { + assert(e->refs > 0); + e->refs--; + return e->refs == 0; +} + +// Call deleter and free + +void LRUCacheShard::EraseUnRefEntries() { + autovector<LRUHandle*> last_reference_list; + { + MutexLock l(&mutex_); + while (lru_.next != &lru_) { + LRUHandle* old = lru_.next; + assert(old->InCache()); + assert(old->refs == + 1); // LRU list contains elements which may be evicted + LRU_Remove(old); + table_.Remove(old->key(), old->hash); + old->SetInCache(false); + Unref(old); + usage_ -= old->charge; + last_reference_list.push_back(old); + } + } + + for (auto entry : last_reference_list) { + entry->Free(); + } +} + +void LRUCacheShard::ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) { + if (thread_safe) { + mutex_.Lock(); + } + table_.ApplyToAllCacheEntries( + [callback](LRUHandle* h) { callback(h->value, h->charge); }); + if (thread_safe) { + mutex_.Unlock(); + } +} + +void LRUCacheShard::TEST_GetLRUList(LRUHandle** lru, LRUHandle** lru_low_pri) { + *lru = &lru_; + *lru_low_pri = lru_low_pri_; +} + +size_t LRUCacheShard::TEST_GetLRUSize() { + LRUHandle* lru_handle = lru_.next; + size_t lru_size = 0; + while (lru_handle != &lru_) { + lru_size++; + lru_handle = lru_handle->next; + } + return lru_size; +} + +double LRUCacheShard::GetHighPriPoolRatio() { + MutexLock l(&mutex_); + return high_pri_pool_ratio_; +} + +void LRUCacheShard::LRU_Remove(LRUHandle* e) { + assert(e->next != nullptr); + assert(e->prev != nullptr); + if (lru_low_pri_ == e) { + lru_low_pri_ = e->prev; + } + e->next->prev = e->prev; + e->prev->next = e->next; + e->prev = e->next = nullptr; + lru_usage_ -= e->charge; + if (e->InHighPriPool()) { + assert(high_pri_pool_usage_ >= e->charge); + high_pri_pool_usage_ -= e->charge; + } +} + +void LRUCacheShard::LRU_Insert(LRUHandle* e) { + assert(e->next == nullptr); + assert(e->prev == nullptr); + if (high_pri_pool_ratio_ > 0 && (e->IsHighPri() || e->HasHit())) { + // Inset "e" to head of LRU list. + e->next = &lru_; + e->prev = lru_.prev; + e->prev->next = e; + e->next->prev = e; + e->SetInHighPriPool(true); + high_pri_pool_usage_ += e->charge; + MaintainPoolSize(); + } else { + // Insert "e" to the head of low-pri pool. Note that when + // high_pri_pool_ratio is 0, head of low-pri pool is also head of LRU list. + e->next = lru_low_pri_->next; + e->prev = lru_low_pri_; + e->prev->next = e; + e->next->prev = e; + e->SetInHighPriPool(false); + lru_low_pri_ = e; + } + lru_usage_ += e->charge; +} + +void LRUCacheShard::MaintainPoolSize() { + while (high_pri_pool_usage_ > high_pri_pool_capacity_) { + // Overflow last entry in high-pri pool to low-pri pool. + lru_low_pri_ = lru_low_pri_->next; + assert(lru_low_pri_ != &lru_); + lru_low_pri_->SetInHighPriPool(false); + high_pri_pool_usage_ -= lru_low_pri_->charge; + } +} + +void LRUCacheShard::EvictFromLRU(size_t charge, + autovector<LRUHandle*>* deleted) { + while (usage_ + charge > capacity_ && lru_.next != &lru_) { + LRUHandle* old = lru_.next; + assert(old->InCache()); + assert(old->refs == 1); // LRU list contains elements which may be evicted + LRU_Remove(old); + table_.Remove(old->key(), old->hash); + old->SetInCache(false); + Unref(old); + usage_ -= old->charge; + deleted->push_back(old); + } +} + +void LRUCacheShard::SetCapacity(size_t capacity) { + autovector<LRUHandle*> last_reference_list; + { + MutexLock l(&mutex_); + capacity_ = capacity; + high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_; + EvictFromLRU(0, &last_reference_list); + } + // we free the entries here outside of mutex for + // performance reasons + for (auto entry : last_reference_list) { + entry->Free(); + } +} + +void LRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) { + MutexLock l(&mutex_); + strict_capacity_limit_ = strict_capacity_limit; +} + +Cache::Handle* LRUCacheShard::Lookup(const Slice& key, uint32_t hash) { + MutexLock l(&mutex_); + LRUHandle* e = table_.Lookup(key, hash); + if (e != nullptr) { + assert(e->InCache()); + if (e->refs == 1) { + LRU_Remove(e); + } + e->refs++; + e->SetHit(); + } + return reinterpret_cast<Cache::Handle*>(e); +} + +bool LRUCacheShard::Ref(Cache::Handle* h) { + LRUHandle* handle = reinterpret_cast<LRUHandle*>(h); + MutexLock l(&mutex_); + if (handle->InCache() && handle->refs == 1) { + LRU_Remove(handle); + } + handle->refs++; + return true; +} + +void LRUCacheShard::SetHighPriorityPoolRatio(double high_pri_pool_ratio) { + MutexLock l(&mutex_); + high_pri_pool_ratio_ = high_pri_pool_ratio; + high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_; + MaintainPoolSize(); +} + +bool LRUCacheShard::Release(Cache::Handle* handle, bool force_erase) { + if (handle == nullptr) { + return false; + } + LRUHandle* e = reinterpret_cast<LRUHandle*>(handle); + bool last_reference = false; + { + MutexLock l(&mutex_); + last_reference = Unref(e); + if (last_reference) { + usage_ -= e->charge; + } + if (e->refs == 1 && e->InCache()) { + // The item is still in cache, and nobody else holds a reference to it + if (usage_ > capacity_ || force_erase) { + // the cache is full + // The LRU list must be empty since the cache is full + assert(!(usage_ > capacity_) || lru_.next == &lru_); + // take this opportunity and remove the item + table_.Remove(e->key(), e->hash); + e->SetInCache(false); + Unref(e); + usage_ -= e->charge; + last_reference = true; + } else { + // put the item on the list to be potentially freed + LRU_Insert(e); + } + } + } + + // free outside of mutex + if (last_reference) { + e->Free(); + } + return last_reference; +} + +Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value, + size_t charge, + void (*deleter)(const Slice& key, void* value), + Cache::Handle** handle, Cache::Priority priority) { + // Allocate the memory here outside of the mutex + // If the cache is full, we'll have to release it + // It shouldn't happen very often though. + LRUHandle* e = reinterpret_cast<LRUHandle*>( + new char[sizeof(LRUHandle) - 1 + key.size()]); + Status s; + autovector<LRUHandle*> last_reference_list; + + e->value = value; + e->deleter = deleter; + e->charge = charge; + e->key_length = key.size(); + e->flags = 0; + e->hash = hash; + e->refs = (handle == nullptr + ? 1 + : 2); // One from LRUCache, one for the returned handle + e->next = e->prev = nullptr; + e->SetInCache(true); + e->SetPriority(priority); + memcpy(e->key_data, key.data(), key.size()); + + { + MutexLock l(&mutex_); + + // Free the space following strict LRU policy until enough space + // is freed or the lru list is empty + EvictFromLRU(charge, &last_reference_list); + + if (usage_ - lru_usage_ + charge > capacity_ && + (strict_capacity_limit_ || handle == nullptr)) { + if (handle == nullptr) { + // Don't insert the entry but still return ok, as if the entry inserted + // into cache and get evicted immediately. + last_reference_list.push_back(e); + } else { + delete[] reinterpret_cast<char*>(e); + *handle = nullptr; + s = Status::Incomplete("Insert failed due to LRU cache being full."); + } + } else { + // insert into the cache + // note that the cache might get larger than its capacity if not enough + // space was freed + LRUHandle* old = table_.Insert(e); + usage_ += e->charge; + if (old != nullptr) { + old->SetInCache(false); + if (Unref(old)) { + usage_ -= old->charge; + // old is on LRU because it's in cache and its reference count + // was just 1 (Unref returned 0) + LRU_Remove(old); + last_reference_list.push_back(old); + } + } + if (handle == nullptr) { + LRU_Insert(e); + } else { + *handle = reinterpret_cast<Cache::Handle*>(e); + } + s = Status::OK(); + } + } + + // we free the entries here outside of mutex for + // performance reasons + for (auto entry : last_reference_list) { + entry->Free(); + } + + return s; +} + +void LRUCacheShard::Erase(const Slice& key, uint32_t hash) { + LRUHandle* e; + bool last_reference = false; + { + MutexLock l(&mutex_); + e = table_.Remove(key, hash); + if (e != nullptr) { + last_reference = Unref(e); + if (last_reference) { + usage_ -= e->charge; + } + if (last_reference && e->InCache()) { + LRU_Remove(e); + } + e->SetInCache(false); + } + } + + // mutex not held here + // last_reference will only be true if e != nullptr + if (last_reference) { + e->Free(); + } +} + +size_t LRUCacheShard::GetUsage() const { + MutexLock l(&mutex_); + return usage_; +} + +size_t LRUCacheShard::GetPinnedUsage() const { + MutexLock l(&mutex_); + assert(usage_ >= lru_usage_); + return usage_ - lru_usage_; +} + +std::string LRUCacheShard::GetPrintableOptions() const { + const int kBufferSize = 200; + char buffer[kBufferSize]; + { + MutexLock l(&mutex_); + snprintf(buffer, kBufferSize, " high_pri_pool_ratio: %.3lf\n", + high_pri_pool_ratio_); + } + return std::string(buffer); +} + +LRUCache::LRUCache(size_t capacity, int num_shard_bits, + bool strict_capacity_limit, double high_pri_pool_ratio, + std::shared_ptr<MemoryAllocator> allocator, + bool use_adaptive_mutex) + : ShardedCache(capacity, num_shard_bits, strict_capacity_limit, + std::move(allocator)) { + num_shards_ = 1 << num_shard_bits; + shards_ = reinterpret_cast<LRUCacheShard*>( + port::cacheline_aligned_alloc(sizeof(LRUCacheShard) * num_shards_)); + size_t per_shard = (capacity + (num_shards_ - 1)) / num_shards_; + for (int i = 0; i < num_shards_; i++) { + new (&shards_[i]) + LRUCacheShard(per_shard, strict_capacity_limit, high_pri_pool_ratio, + use_adaptive_mutex); + } +} + +LRUCache::~LRUCache() { + if (shards_ != nullptr) { + assert(num_shards_ > 0); + for (int i = 0; i < num_shards_; i++) { + shards_[i].~LRUCacheShard(); + } + port::cacheline_aligned_free(shards_); + } +} + +CacheShard* LRUCache::GetShard(int shard) { + return reinterpret_cast<CacheShard*>(&shards_[shard]); +} + +const CacheShard* LRUCache::GetShard(int shard) const { + return reinterpret_cast<CacheShard*>(&shards_[shard]); +} + +void* LRUCache::Value(Handle* handle) { + return reinterpret_cast<const LRUHandle*>(handle)->value; +} + +size_t LRUCache::GetCharge(Handle* handle) const { + return reinterpret_cast<const LRUHandle*>(handle)->charge; +} + +uint32_t LRUCache::GetHash(Handle* handle) const { + return reinterpret_cast<const LRUHandle*>(handle)->hash; +} + +void LRUCache::DisownData() { +// Do not drop data if compile with ASAN to suppress leak warning. +#if defined(__clang__) +#if !defined(__has_feature) || !__has_feature(address_sanitizer) + shards_ = nullptr; + num_shards_ = 0; +#endif +#else // __clang__ +#ifndef __SANITIZE_ADDRESS__ + shards_ = nullptr; + num_shards_ = 0; +#endif // !__SANITIZE_ADDRESS__ +#endif // __clang__ +} + +size_t LRUCache::TEST_GetLRUSize() { + size_t lru_size_of_all_shards = 0; + for (int i = 0; i < num_shards_; i++) { + lru_size_of_all_shards += shards_[i].TEST_GetLRUSize(); + } + return lru_size_of_all_shards; +} + +double LRUCache::GetHighPriPoolRatio() { + double result = 0.0; + if (num_shards_ > 0) { + result = shards_[0].GetHighPriPoolRatio(); + } + return result; +} + +std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) { + return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits, + cache_opts.strict_capacity_limit, + cache_opts.high_pri_pool_ratio, + cache_opts.memory_allocator, + cache_opts.use_adaptive_mutex); +} + +std::shared_ptr<Cache> NewLRUCache( + size_t capacity, int num_shard_bits, bool strict_capacity_limit, + double high_pri_pool_ratio, + std::shared_ptr<MemoryAllocator> memory_allocator, + bool use_adaptive_mutex) { + if (num_shard_bits >= 20) { + return nullptr; // the cache cannot be sharded into too many fine pieces + } + if (high_pri_pool_ratio < 0.0 || high_pri_pool_ratio > 1.0) { + // invalid high_pri_pool_ratio + return nullptr; + } + if (num_shard_bits < 0) { + num_shard_bits = GetDefaultCacheShardBits(capacity); + } + return std::make_shared<LRUCache>(capacity, num_shard_bits, + strict_capacity_limit, high_pri_pool_ratio, + std::move(memory_allocator), + use_adaptive_mutex); +} + +} // namespace rocksdb diff --git a/src/rocksdb/cache/lru_cache.h b/src/rocksdb/cache/lru_cache.h new file mode 100644 index 00000000..0d9a3174 --- /dev/null +++ b/src/rocksdb/cache/lru_cache.h @@ -0,0 +1,316 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include <string> + +#include "cache/sharded_cache.h" + +#include "port/port.h" +#include "util/autovector.h" + +namespace rocksdb { + +// LRU cache implementation + +// An entry is a variable length heap-allocated structure. +// Entries are referenced by cache and/or by any external entity. +// The cache keeps all its entries in table. Some elements +// are also stored on LRU list. +// +// LRUHandle can be in these states: +// 1. Referenced externally AND in hash table. +// In that case the entry is *not* in the LRU. (refs > 1 && in_cache == true) +// 2. Not referenced externally and in hash table. In that case the entry is +// in the LRU and can be freed. (refs == 1 && in_cache == true) +// 3. Referenced externally and not in hash table. In that case the entry is +// in not on LRU and not in table. (refs >= 1 && in_cache == false) +// +// All newly created LRUHandles are in state 1. If you call +// LRUCacheShard::Release +// on entry in state 1, it will go into state 2. To move from state 1 to +// state 3, either call LRUCacheShard::Erase or LRUCacheShard::Insert with the +// same key. +// To move from state 2 to state 1, use LRUCacheShard::Lookup. +// Before destruction, make sure that no handles are in state 1. This means +// that any successful LRUCacheShard::Lookup/LRUCacheShard::Insert have a +// matching +// RUCache::Release (to move into state 2) or LRUCacheShard::Erase (for state 3) + +struct LRUHandle { + void* value; + void (*deleter)(const Slice&, void* value); + LRUHandle* next_hash; + LRUHandle* next; + LRUHandle* prev; + size_t charge; // TODO(opt): Only allow uint32_t? + size_t key_length; + uint32_t refs; // a number of refs to this entry + // cache itself is counted as 1 + + // Include the following flags: + // IN_CACHE: whether this entry is referenced by the hash table. + // IS_HIGH_PRI: whether this entry is high priority entry. + // IN_HIGH_PRI_POOL: whether this entry is in high-pri pool. + // HAS_HIT: whether this entry has had any lookups (hits). + enum Flags : uint8_t { + IN_CACHE = (1 << 0), + IS_HIGH_PRI = (1 << 1), + IN_HIGH_PRI_POOL = (1 << 2), + HAS_HIT = (1 << 3), + }; + + uint8_t flags; + + uint32_t hash; // Hash of key(); used for fast sharding and comparisons + + char key_data[1]; // Beginning of key + + Slice key() const { + // For cheaper lookups, we allow a temporary Handle object + // to store a pointer to a key in "value". + if (next == this) { + return *(reinterpret_cast<Slice*>(value)); + } else { + return Slice(key_data, key_length); + } + } + + bool InCache() const { return flags & IN_CACHE; } + bool IsHighPri() const { return flags & IS_HIGH_PRI; } + bool InHighPriPool() const { return flags & IN_HIGH_PRI_POOL; } + bool HasHit() const { return flags & HAS_HIT; } + + void SetInCache(bool in_cache) { + if (in_cache) { + flags |= IN_CACHE; + } else { + flags &= ~IN_CACHE; + } + } + + void SetPriority(Cache::Priority priority) { + if (priority == Cache::Priority::HIGH) { + flags |= IS_HIGH_PRI; + } else { + flags &= ~IS_HIGH_PRI; + } + } + + void SetInHighPriPool(bool in_high_pri_pool) { + if (in_high_pri_pool) { + flags |= IN_HIGH_PRI_POOL; + } else { + flags &= ~IN_HIGH_PRI_POOL; + } + } + + void SetHit() { flags |= HAS_HIT; } + + void Free() { + assert((refs == 1 && InCache()) || (refs == 0 && !InCache())); + if (deleter) { + (*deleter)(key(), value); + } + delete[] reinterpret_cast<char*>(this); + } +}; + +// We provide our own simple hash table since it removes a whole bunch +// of porting hacks and is also faster than some of the built-in hash +// table implementations in some of the compiler/runtime combinations +// we have tested. E.g., readrandom speeds up by ~5% over the g++ +// 4.4.3's builtin hashtable. +class LRUHandleTable { + public: + LRUHandleTable(); + ~LRUHandleTable(); + + LRUHandle* Lookup(const Slice& key, uint32_t hash); + LRUHandle* Insert(LRUHandle* h); + LRUHandle* Remove(const Slice& key, uint32_t hash); + + template <typename T> + void ApplyToAllCacheEntries(T func) { + for (uint32_t i = 0; i < length_; i++) { + LRUHandle* h = list_[i]; + while (h != nullptr) { + auto n = h->next_hash; + assert(h->InCache()); + func(h); + h = n; + } + } + } + + private: + // Return a pointer to slot that points to a cache entry that + // matches key/hash. If there is no such cache entry, return a + // pointer to the trailing slot in the corresponding linked list. + LRUHandle** FindPointer(const Slice& key, uint32_t hash); + + void Resize(); + + // The table consists of an array of buckets where each bucket is + // a linked list of cache entries that hash into the bucket. + LRUHandle** list_; + uint32_t length_; + uint32_t elems_; +}; + +// A single shard of sharded cache. +class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard { + public: + LRUCacheShard(size_t capacity, bool strict_capacity_limit, + double high_pri_pool_ratio, bool use_adaptive_mutex); + virtual ~LRUCacheShard(); + + // Separate from constructor so caller can easily make an array of LRUCache + // if current usage is more than new capacity, the function will attempt to + // free the needed space + virtual void SetCapacity(size_t capacity) override; + + // Set the flag to reject insertion if cache if full. + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override; + + // Set percentage of capacity reserved for high-pri cache entries. + void SetHighPriorityPoolRatio(double high_pri_pool_ratio); + + // Like Cache methods, but with an extra "hash" parameter. + virtual Status Insert(const Slice& key, uint32_t hash, void* value, + size_t charge, + void (*deleter)(const Slice& key, void* value), + Cache::Handle** handle, + Cache::Priority priority) override; + virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) override; + virtual bool Ref(Cache::Handle* handle) override; + virtual bool Release(Cache::Handle* handle, + bool force_erase = false) override; + virtual void Erase(const Slice& key, uint32_t hash) override; + + // Although in some platforms the update of size_t is atomic, to make sure + // GetUsage() and GetPinnedUsage() work correctly under any platform, we'll + // protect them with mutex_. + + virtual size_t GetUsage() const override; + virtual size_t GetPinnedUsage() const override; + + virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) override; + + virtual void EraseUnRefEntries() override; + + virtual std::string GetPrintableOptions() const override; + + void TEST_GetLRUList(LRUHandle** lru, LRUHandle** lru_low_pri); + + // Retrieves number of elements in LRU, for unit test purpose only + // not threadsafe + size_t TEST_GetLRUSize(); + + // Retrives high pri pool ratio + double GetHighPriPoolRatio(); + + private: + void LRU_Remove(LRUHandle* e); + void LRU_Insert(LRUHandle* e); + + // Overflow the last entry in high-pri pool to low-pri pool until size of + // high-pri pool is no larger than the size specify by high_pri_pool_pct. + void MaintainPoolSize(); + + // Just reduce the reference count by 1. + // Return true if last reference + bool Unref(LRUHandle* e); + + // Free some space following strict LRU policy until enough space + // to hold (usage_ + charge) is freed or the lru list is empty + // This function is not thread safe - it needs to be executed while + // holding the mutex_ + void EvictFromLRU(size_t charge, autovector<LRUHandle*>* deleted); + + // Initialized before use. + size_t capacity_; + + // Memory size for entries in high-pri pool. + size_t high_pri_pool_usage_; + + // Whether to reject insertion if cache reaches its full capacity. + bool strict_capacity_limit_; + + // Ratio of capacity reserved for high priority cache entries. + double high_pri_pool_ratio_; + + // High-pri pool size, equals to capacity * high_pri_pool_ratio. + // Remember the value to avoid recomputing each time. + double high_pri_pool_capacity_; + + // Dummy head of LRU list. + // lru.prev is newest entry, lru.next is oldest entry. + // LRU contains items which can be evicted, ie reference only by cache + LRUHandle lru_; + + // Pointer to head of low-pri pool in LRU list. + LRUHandle* lru_low_pri_; + + // ------------^^^^^^^^^^^^^----------- + // Not frequently modified data members + // ------------------------------------ + // + // We separate data members that are updated frequently from the ones that + // are not frequently updated so that they don't share the same cache line + // which will lead into false cache sharing + // + // ------------------------------------ + // Frequently modified data members + // ------------vvvvvvvvvvvvv----------- + LRUHandleTable table_; + + // Memory size for entries residing in the cache + size_t usage_; + + // Memory size for entries residing only in the LRU list + size_t lru_usage_; + + // mutex_ protects the following state. + // We don't count mutex_ as the cache's internal state so semantically we + // don't mind mutex_ invoking the non-const actions. + mutable port::Mutex mutex_; +}; + +class LRUCache +#ifdef NDEBUG + final +#endif + : public ShardedCache { + public: + LRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit, + double high_pri_pool_ratio, + std::shared_ptr<MemoryAllocator> memory_allocator = nullptr, + bool use_adaptive_mutex = kDefaultToAdaptiveMutex); + virtual ~LRUCache(); + virtual const char* Name() const override { return "LRUCache"; } + virtual CacheShard* GetShard(int shard) override; + virtual const CacheShard* GetShard(int shard) const override; + virtual void* Value(Handle* handle) override; + virtual size_t GetCharge(Handle* handle) const override; + virtual uint32_t GetHash(Handle* handle) const override; + virtual void DisownData() override; + + // Retrieves number of elements in LRU, for unit test purpose only + size_t TEST_GetLRUSize(); + // Retrives high pri pool ratio + double GetHighPriPoolRatio(); + + private: + LRUCacheShard* shards_ = nullptr; + int num_shards_ = 0; +}; + +} // namespace rocksdb diff --git a/src/rocksdb/cache/lru_cache_test.cc b/src/rocksdb/cache/lru_cache_test.cc new file mode 100644 index 00000000..9980dd72 --- /dev/null +++ b/src/rocksdb/cache/lru_cache_test.cc @@ -0,0 +1,197 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "cache/lru_cache.h" + +#include <string> +#include <vector> +#include "port/port.h" +#include "util/testharness.h" + +namespace rocksdb { + +class LRUCacheTest : public testing::Test { + public: + LRUCacheTest() {} + ~LRUCacheTest() override { DeleteCache(); } + + void DeleteCache() { + if (cache_ != nullptr) { + cache_->~LRUCacheShard(); + port::cacheline_aligned_free(cache_); + cache_ = nullptr; + } + } + + void NewCache(size_t capacity, double high_pri_pool_ratio = 0.0, + bool use_adaptive_mutex = kDefaultToAdaptiveMutex) { + DeleteCache(); + cache_ = reinterpret_cast<LRUCacheShard*>( + port::cacheline_aligned_alloc(sizeof(LRUCacheShard))); + new (cache_) LRUCacheShard(capacity, false /*strict_capcity_limit*/, + high_pri_pool_ratio, use_adaptive_mutex); + } + + void Insert(const std::string& key, + Cache::Priority priority = Cache::Priority::LOW) { + cache_->Insert(key, 0 /*hash*/, nullptr /*value*/, 1 /*charge*/, + nullptr /*deleter*/, nullptr /*handle*/, priority); + } + + void Insert(char key, Cache::Priority priority = Cache::Priority::LOW) { + Insert(std::string(1, key), priority); + } + + bool Lookup(const std::string& key) { + auto handle = cache_->Lookup(key, 0 /*hash*/); + if (handle) { + cache_->Release(handle); + return true; + } + return false; + } + + bool Lookup(char key) { return Lookup(std::string(1, key)); } + + void Erase(const std::string& key) { cache_->Erase(key, 0 /*hash*/); } + + void ValidateLRUList(std::vector<std::string> keys, + size_t num_high_pri_pool_keys = 0) { + LRUHandle* lru; + LRUHandle* lru_low_pri; + cache_->TEST_GetLRUList(&lru, &lru_low_pri); + LRUHandle* iter = lru; + bool in_high_pri_pool = false; + size_t high_pri_pool_keys = 0; + if (iter == lru_low_pri) { + in_high_pri_pool = true; + } + for (const auto& key : keys) { + iter = iter->next; + ASSERT_NE(lru, iter); + ASSERT_EQ(key, iter->key().ToString()); + ASSERT_EQ(in_high_pri_pool, iter->InHighPriPool()); + if (in_high_pri_pool) { + high_pri_pool_keys++; + } + if (iter == lru_low_pri) { + ASSERT_FALSE(in_high_pri_pool); + in_high_pri_pool = true; + } + } + ASSERT_EQ(lru, iter->next); + ASSERT_TRUE(in_high_pri_pool); + ASSERT_EQ(num_high_pri_pool_keys, high_pri_pool_keys); + } + + private: + LRUCacheShard* cache_ = nullptr; +}; + +TEST_F(LRUCacheTest, BasicLRU) { + NewCache(5); + for (char ch = 'a'; ch <= 'e'; ch++) { + Insert(ch); + } + ValidateLRUList({"a", "b", "c", "d", "e"}); + for (char ch = 'x'; ch <= 'z'; ch++) { + Insert(ch); + } + ValidateLRUList({"d", "e", "x", "y", "z"}); + ASSERT_FALSE(Lookup("b")); + ValidateLRUList({"d", "e", "x", "y", "z"}); + ASSERT_TRUE(Lookup("e")); + ValidateLRUList({"d", "x", "y", "z", "e"}); + ASSERT_TRUE(Lookup("z")); + ValidateLRUList({"d", "x", "y", "e", "z"}); + Erase("x"); + ValidateLRUList({"d", "y", "e", "z"}); + ASSERT_TRUE(Lookup("d")); + ValidateLRUList({"y", "e", "z", "d"}); + Insert("u"); + ValidateLRUList({"y", "e", "z", "d", "u"}); + Insert("v"); + ValidateLRUList({"e", "z", "d", "u", "v"}); +} + +TEST_F(LRUCacheTest, MidpointInsertion) { + // Allocate 2 cache entries to high-pri pool. + NewCache(5, 0.45); + + Insert("a", Cache::Priority::LOW); + Insert("b", Cache::Priority::LOW); + Insert("c", Cache::Priority::LOW); + Insert("x", Cache::Priority::HIGH); + Insert("y", Cache::Priority::HIGH); + ValidateLRUList({"a", "b", "c", "x", "y"}, 2); + + // Low-pri entries inserted to the tail of low-pri list (the midpoint). + // After lookup, it will move to the tail of the full list. + Insert("d", Cache::Priority::LOW); + ValidateLRUList({"b", "c", "d", "x", "y"}, 2); + ASSERT_TRUE(Lookup("d")); + ValidateLRUList({"b", "c", "x", "y", "d"}, 2); + + // High-pri entries will be inserted to the tail of full list. + Insert("z", Cache::Priority::HIGH); + ValidateLRUList({"c", "x", "y", "d", "z"}, 2); +} + +TEST_F(LRUCacheTest, EntriesWithPriority) { + // Allocate 2 cache entries to high-pri pool. + NewCache(5, 0.45); + + Insert("a", Cache::Priority::LOW); + Insert("b", Cache::Priority::LOW); + Insert("c", Cache::Priority::LOW); + ValidateLRUList({"a", "b", "c"}, 0); + + // Low-pri entries can take high-pri pool capacity if available + Insert("u", Cache::Priority::LOW); + Insert("v", Cache::Priority::LOW); + ValidateLRUList({"a", "b", "c", "u", "v"}, 0); + + Insert("X", Cache::Priority::HIGH); + Insert("Y", Cache::Priority::HIGH); + ValidateLRUList({"c", "u", "v", "X", "Y"}, 2); + + // High-pri entries can overflow to low-pri pool. + Insert("Z", Cache::Priority::HIGH); + ValidateLRUList({"u", "v", "X", "Y", "Z"}, 2); + + // Low-pri entries will be inserted to head of low-pri pool. + Insert("a", Cache::Priority::LOW); + ValidateLRUList({"v", "X", "a", "Y", "Z"}, 2); + + // Low-pri entries will be inserted to head of high-pri pool after lookup. + ASSERT_TRUE(Lookup("v")); + ValidateLRUList({"X", "a", "Y", "Z", "v"}, 2); + + // High-pri entries will be inserted to the head of the list after lookup. + ASSERT_TRUE(Lookup("X")); + ValidateLRUList({"a", "Y", "Z", "v", "X"}, 2); + ASSERT_TRUE(Lookup("Z")); + ValidateLRUList({"a", "Y", "v", "X", "Z"}, 2); + + Erase("Y"); + ValidateLRUList({"a", "v", "X", "Z"}, 2); + Erase("X"); + ValidateLRUList({"a", "v", "Z"}, 1); + Insert("d", Cache::Priority::LOW); + Insert("e", Cache::Priority::LOW); + ValidateLRUList({"a", "v", "d", "e", "Z"}, 1); + Insert("f", Cache::Priority::LOW); + Insert("g", Cache::Priority::LOW); + ValidateLRUList({"d", "e", "f", "g", "Z"}, 1); + ASSERT_TRUE(Lookup("d")); + ValidateLRUList({"e", "f", "g", "Z", "d"}, 2); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/cache/sharded_cache.cc b/src/rocksdb/cache/sharded_cache.cc new file mode 100644 index 00000000..a48a3218 --- /dev/null +++ b/src/rocksdb/cache/sharded_cache.cc @@ -0,0 +1,166 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include "cache/sharded_cache.h" + +#include <string> + +#include "util/mutexlock.h" + +namespace rocksdb { + +ShardedCache::ShardedCache(size_t capacity, int num_shard_bits, + bool strict_capacity_limit, + std::shared_ptr<MemoryAllocator> allocator) + : Cache(std::move(allocator)), + num_shard_bits_(num_shard_bits), + capacity_(capacity), + strict_capacity_limit_(strict_capacity_limit), + last_id_(1) {} + +void ShardedCache::SetCapacity(size_t capacity) { + int num_shards = 1 << num_shard_bits_; + const size_t per_shard = (capacity + (num_shards - 1)) / num_shards; + MutexLock l(&capacity_mutex_); + for (int s = 0; s < num_shards; s++) { + GetShard(s)->SetCapacity(per_shard); + } + capacity_ = capacity; +} + +void ShardedCache::SetStrictCapacityLimit(bool strict_capacity_limit) { + int num_shards = 1 << num_shard_bits_; + MutexLock l(&capacity_mutex_); + for (int s = 0; s < num_shards; s++) { + GetShard(s)->SetStrictCapacityLimit(strict_capacity_limit); + } + strict_capacity_limit_ = strict_capacity_limit; +} + +Status ShardedCache::Insert(const Slice& key, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value), + Handle** handle, Priority priority) { + uint32_t hash = HashSlice(key); + return GetShard(Shard(hash)) + ->Insert(key, hash, value, charge, deleter, handle, priority); +} + +Cache::Handle* ShardedCache::Lookup(const Slice& key, Statistics* /*stats*/) { + uint32_t hash = HashSlice(key); + return GetShard(Shard(hash))->Lookup(key, hash); +} + +bool ShardedCache::Ref(Handle* handle) { + uint32_t hash = GetHash(handle); + return GetShard(Shard(hash))->Ref(handle); +} + +bool ShardedCache::Release(Handle* handle, bool force_erase) { + uint32_t hash = GetHash(handle); + return GetShard(Shard(hash))->Release(handle, force_erase); +} + +void ShardedCache::Erase(const Slice& key) { + uint32_t hash = HashSlice(key); + GetShard(Shard(hash))->Erase(key, hash); +} + +uint64_t ShardedCache::NewId() { + return last_id_.fetch_add(1, std::memory_order_relaxed); +} + +size_t ShardedCache::GetCapacity() const { + MutexLock l(&capacity_mutex_); + return capacity_; +} + +bool ShardedCache::HasStrictCapacityLimit() const { + MutexLock l(&capacity_mutex_); + return strict_capacity_limit_; +} + +size_t ShardedCache::GetUsage() const { + // We will not lock the cache when getting the usage from shards. + int num_shards = 1 << num_shard_bits_; + size_t usage = 0; + for (int s = 0; s < num_shards; s++) { + usage += GetShard(s)->GetUsage(); + } + return usage; +} + +size_t ShardedCache::GetUsage(Handle* handle) const { + return GetCharge(handle); +} + +size_t ShardedCache::GetPinnedUsage() const { + // We will not lock the cache when getting the usage from shards. + int num_shards = 1 << num_shard_bits_; + size_t usage = 0; + for (int s = 0; s < num_shards; s++) { + usage += GetShard(s)->GetPinnedUsage(); + } + return usage; +} + +void ShardedCache::ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) { + int num_shards = 1 << num_shard_bits_; + for (int s = 0; s < num_shards; s++) { + GetShard(s)->ApplyToAllCacheEntries(callback, thread_safe); + } +} + +void ShardedCache::EraseUnRefEntries() { + int num_shards = 1 << num_shard_bits_; + for (int s = 0; s < num_shards; s++) { + GetShard(s)->EraseUnRefEntries(); + } +} + +std::string ShardedCache::GetPrintableOptions() const { + std::string ret; + ret.reserve(20000); + const int kBufferSize = 200; + char buffer[kBufferSize]; + { + MutexLock l(&capacity_mutex_); + snprintf(buffer, kBufferSize, " capacity : %" ROCKSDB_PRIszt "\n", + capacity_); + ret.append(buffer); + snprintf(buffer, kBufferSize, " num_shard_bits : %d\n", num_shard_bits_); + ret.append(buffer); + snprintf(buffer, kBufferSize, " strict_capacity_limit : %d\n", + strict_capacity_limit_); + ret.append(buffer); + } + snprintf(buffer, kBufferSize, " memory_allocator : %s\n", + memory_allocator() ? memory_allocator()->Name() : "None"); + ret.append(buffer); + ret.append(GetShard(0)->GetPrintableOptions()); + return ret; +} +int GetDefaultCacheShardBits(size_t capacity) { + int num_shard_bits = 0; + size_t min_shard_size = 512L * 1024L; // Every shard is at least 512KB. + size_t num_shards = capacity / min_shard_size; + while (num_shards >>= 1) { + if (++num_shard_bits >= 6) { + // No more than 6. + return num_shard_bits; + } + } + return num_shard_bits; +} + +} // namespace rocksdb diff --git a/src/rocksdb/cache/sharded_cache.h b/src/rocksdb/cache/sharded_cache.h new file mode 100644 index 00000000..920898b8 --- /dev/null +++ b/src/rocksdb/cache/sharded_cache.h @@ -0,0 +1,103 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include <atomic> +#include <string> + +#include "port/port.h" +#include "rocksdb/cache.h" +#include "util/hash.h" + +namespace rocksdb { + +// Single cache shard interface. +class CacheShard { + public: + CacheShard() = default; + virtual ~CacheShard() = default; + + virtual Status Insert(const Slice& key, uint32_t hash, void* value, + size_t charge, + void (*deleter)(const Slice& key, void* value), + Cache::Handle** handle, Cache::Priority priority) = 0; + virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) = 0; + virtual bool Ref(Cache::Handle* handle) = 0; + virtual bool Release(Cache::Handle* handle, bool force_erase = false) = 0; + virtual void Erase(const Slice& key, uint32_t hash) = 0; + virtual void SetCapacity(size_t capacity) = 0; + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0; + virtual size_t GetUsage() const = 0; + virtual size_t GetPinnedUsage() const = 0; + virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) = 0; + virtual void EraseUnRefEntries() = 0; + virtual std::string GetPrintableOptions() const { return ""; } +}; + +// Generic cache interface which shards cache by hash of keys. 2^num_shard_bits +// shards will be created, with capacity split evenly to each of the shards. +// Keys are sharded by the highest num_shard_bits bits of hash value. +class ShardedCache : public Cache { + public: + ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit, + std::shared_ptr<MemoryAllocator> memory_allocator = nullptr); + virtual ~ShardedCache() = default; + virtual const char* Name() const override = 0; + virtual CacheShard* GetShard(int shard) = 0; + virtual const CacheShard* GetShard(int shard) const = 0; + virtual void* Value(Handle* handle) override = 0; + virtual size_t GetCharge(Handle* handle) const = 0; + virtual uint32_t GetHash(Handle* handle) const = 0; + virtual void DisownData() override = 0; + + virtual void SetCapacity(size_t capacity) override; + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override; + + virtual Status Insert(const Slice& key, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value), + Handle** handle, Priority priority) override; + virtual Handle* Lookup(const Slice& key, Statistics* stats) override; + virtual bool Ref(Handle* handle) override; + virtual bool Release(Handle* handle, bool force_erase = false) override; + virtual void Erase(const Slice& key) override; + virtual uint64_t NewId() override; + virtual size_t GetCapacity() const override; + virtual bool HasStrictCapacityLimit() const override; + virtual size_t GetUsage() const override; + virtual size_t GetUsage(Handle* handle) const override; + virtual size_t GetPinnedUsage() const override; + virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) override; + virtual void EraseUnRefEntries() override; + virtual std::string GetPrintableOptions() const override; + + int GetNumShardBits() const { return num_shard_bits_; } + + private: + static inline uint32_t HashSlice(const Slice& s) { + return static_cast<uint32_t>(GetSliceNPHash64(s)); + } + + uint32_t Shard(uint32_t hash) { + // Note, hash >> 32 yields hash in gcc, not the zero we expect! + return (num_shard_bits_ > 0) ? (hash >> (32 - num_shard_bits_)) : 0; + } + + int num_shard_bits_; + mutable port::Mutex capacity_mutex_; + size_t capacity_; + bool strict_capacity_limit_; + std::atomic<uint64_t> last_id_; +}; + +extern int GetDefaultCacheShardBits(size_t capacity); + +} // namespace rocksdb |