Diffstat (limited to 'src/kv/rocksdb_cache')
-rw-r--r--  src/kv/rocksdb_cache/BinnedLRUCache.cc | 624
-rw-r--r--  src/kv/rocksdb_cache/BinnedLRUCache.h  | 336
-rw-r--r--  src/kv/rocksdb_cache/ShardedCache.cc   | 159
-rw-r--r--  src/kv/rocksdb_cache/ShardedCache.h    | 141
4 files changed, 1260 insertions, 0 deletions
diff --git a/src/kv/rocksdb_cache/BinnedLRUCache.cc b/src/kv/rocksdb_cache/BinnedLRUCache.cc new file mode 100644 index 000000000..0d657883e --- /dev/null +++ b/src/kv/rocksdb_cache/BinnedLRUCache.cc @@ -0,0 +1,624 @@ +// Copyright (c) 2018-Present Red Hat Inc. All rights reserved. +// +// Copyright (c) 2011-2018, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 and Apache 2.0 License +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include "BinnedLRUCache.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string> + +#define dout_context cct +#define dout_subsys ceph_subsys_rocksdb +#undef dout_prefix +#define dout_prefix *_dout << "rocksdb: " + +namespace rocksdb_cache { + +BinnedLRUHandleTable::BinnedLRUHandleTable() : list_(nullptr), length_(0), elems_(0) { + Resize(); +} + +BinnedLRUHandleTable::~BinnedLRUHandleTable() { + ApplyToAllCacheEntries([](BinnedLRUHandle* h) { + if (h->refs == 1) { + h->Free(); + } + }); + delete[] list_; +} + +BinnedLRUHandle* BinnedLRUHandleTable::Lookup(const rocksdb::Slice& key, uint32_t hash) { + return *FindPointer(key, hash); +} + +BinnedLRUHandle* BinnedLRUHandleTable::Insert(BinnedLRUHandle* h) { + BinnedLRUHandle** ptr = FindPointer(h->key(), h->hash); + BinnedLRUHandle* old = *ptr; + h->next_hash = (old == nullptr ? nullptr : old->next_hash); + *ptr = h; + if (old == nullptr) { + ++elems_; + if (elems_ > length_) { + // Since each cache entry is fairly large, we aim for a small + // average linked list length (<= 1). + Resize(); + } + } + return old; +} + +BinnedLRUHandle* BinnedLRUHandleTable::Remove(const rocksdb::Slice& key, uint32_t hash) { + BinnedLRUHandle** ptr = FindPointer(key, hash); + BinnedLRUHandle* result = *ptr; + if (result != nullptr) { + *ptr = result->next_hash; + --elems_; + } + return result; +} + +BinnedLRUHandle** BinnedLRUHandleTable::FindPointer(const rocksdb::Slice& key, uint32_t hash) { + BinnedLRUHandle** ptr = &list_[hash & (length_ - 1)]; + while (*ptr != nullptr && ((*ptr)->hash != hash || key != (*ptr)->key())) { + ptr = &(*ptr)->next_hash; + } + return ptr; +} + +void BinnedLRUHandleTable::Resize() { + uint32_t new_length = 16; + while (new_length < elems_ * 1.5) { + new_length *= 2; + } + BinnedLRUHandle** new_list = new BinnedLRUHandle*[new_length]; + memset(new_list, 0, sizeof(new_list[0]) * new_length); + uint32_t count = 0; + for (uint32_t i = 0; i < length_; i++) { + BinnedLRUHandle* h = list_[i]; + while (h != nullptr) { + BinnedLRUHandle* next = h->next_hash; + uint32_t hash = h->hash; + BinnedLRUHandle** ptr = &new_list[hash & (new_length - 1)]; + h->next_hash = *ptr; + *ptr = h; + h = next; + count++; + } + } + ceph_assert(elems_ == count); + delete[] list_; + list_ = new_list; + length_ = new_length; +} + +BinnedLRUCacheShard::BinnedLRUCacheShard(CephContext *c, size_t capacity, bool strict_capacity_limit, + double high_pri_pool_ratio) + : cct(c), + capacity_(0), + high_pri_pool_usage_(0), + strict_capacity_limit_(strict_capacity_limit), + high_pri_pool_ratio_(high_pri_pool_ratio), + high_pri_pool_capacity_(0), + usage_(0), + lru_usage_(0) { + // Make empty circular linked list + lru_.next = &lru_; + lru_.prev = &lru_; + lru_low_pri_ = &lru_; + SetCapacity(capacity); +} + +BinnedLRUCacheShard::~BinnedLRUCacheShard() 
{} + +bool BinnedLRUCacheShard::Unref(BinnedLRUHandle* e) { + ceph_assert(e->refs > 0); + e->refs--; + return e->refs == 0; +} + +// Call deleter and free + +void BinnedLRUCacheShard::EraseUnRefEntries() { + ceph::autovector<BinnedLRUHandle*> last_reference_list; + { + std::lock_guard<std::mutex> l(mutex_); + while (lru_.next != &lru_) { + BinnedLRUHandle* old = lru_.next; + ceph_assert(old->InCache()); + ceph_assert(old->refs == + 1); // LRU list contains elements which may be evicted + LRU_Remove(old); + table_.Remove(old->key(), old->hash); + old->SetInCache(false); + Unref(old); + usage_ -= old->charge; + last_reference_list.push_back(old); + } + } + + for (auto entry : last_reference_list) { + entry->Free(); + } +} + +void BinnedLRUCacheShard::ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) { + if (thread_safe) { + mutex_.lock(); + } + table_.ApplyToAllCacheEntries( + [callback](BinnedLRUHandle* h) { callback(h->value, h->charge); }); + if (thread_safe) { + mutex_.unlock(); + } +} + +void BinnedLRUCacheShard::TEST_GetLRUList(BinnedLRUHandle** lru, BinnedLRUHandle** lru_low_pri) { + *lru = &lru_; + *lru_low_pri = lru_low_pri_; +} + +size_t BinnedLRUCacheShard::TEST_GetLRUSize() { + BinnedLRUHandle* lru_handle = lru_.next; + size_t lru_size = 0; + while (lru_handle != &lru_) { + lru_size++; + lru_handle = lru_handle->next; + } + return lru_size; +} + +double BinnedLRUCacheShard::GetHighPriPoolRatio() const { + std::lock_guard<std::mutex> l(mutex_); + return high_pri_pool_ratio_; +} + +size_t BinnedLRUCacheShard::GetHighPriPoolUsage() const { + std::lock_guard<std::mutex> l(mutex_); + return high_pri_pool_usage_; +} + +void BinnedLRUCacheShard::LRU_Remove(BinnedLRUHandle* e) { + ceph_assert(e->next != nullptr); + ceph_assert(e->prev != nullptr); + if (lru_low_pri_ == e) { + lru_low_pri_ = e->prev; + } + e->next->prev = e->prev; + e->prev->next = e->next; + e->prev = e->next = nullptr; + lru_usage_ -= e->charge; + if (e->InHighPriPool()) { + ceph_assert(high_pri_pool_usage_ >= e->charge); + high_pri_pool_usage_ -= e->charge; + } +} + +void BinnedLRUCacheShard::LRU_Insert(BinnedLRUHandle* e) { + ceph_assert(e->next == nullptr); + ceph_assert(e->prev == nullptr); + if (high_pri_pool_ratio_ > 0 && e->IsHighPri()) { + // Inset "e" to head of LRU list. + e->next = &lru_; + e->prev = lru_.prev; + e->prev->next = e; + e->next->prev = e; + e->SetInHighPriPool(true); + high_pri_pool_usage_ += e->charge; + MaintainPoolSize(); + } else { + // Insert "e" to the head of low-pri pool. Note that when + // high_pri_pool_ratio is 0, head of low-pri pool is also head of LRU list. + e->next = lru_low_pri_->next; + e->prev = lru_low_pri_; + e->prev->next = e; + e->next->prev = e; + e->SetInHighPriPool(false); + lru_low_pri_ = e; + } + lru_usage_ += e->charge; +} + +void BinnedLRUCacheShard::MaintainPoolSize() { + while (high_pri_pool_usage_ > high_pri_pool_capacity_) { + // Overflow last entry in high-pri pool to low-pri pool. 
+ lru_low_pri_ = lru_low_pri_->next; + ceph_assert(lru_low_pri_ != &lru_); + lru_low_pri_->SetInHighPriPool(false); + high_pri_pool_usage_ -= lru_low_pri_->charge; + } +} + +void BinnedLRUCacheShard::EvictFromLRU(size_t charge, + ceph::autovector<BinnedLRUHandle*>* deleted) { + while (usage_ + charge > capacity_ && lru_.next != &lru_) { + BinnedLRUHandle* old = lru_.next; + ceph_assert(old->InCache()); + ceph_assert(old->refs == 1); // LRU list contains elements which may be evicted + LRU_Remove(old); + table_.Remove(old->key(), old->hash); + old->SetInCache(false); + Unref(old); + usage_ -= old->charge; + deleted->push_back(old); + } +} + +void BinnedLRUCacheShard::SetCapacity(size_t capacity) { + ceph::autovector<BinnedLRUHandle*> last_reference_list; + { + std::lock_guard<std::mutex> l(mutex_); + capacity_ = capacity; + high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_; + EvictFromLRU(0, &last_reference_list); + } + // we free the entries here outside of mutex for + // performance reasons + for (auto entry : last_reference_list) { + entry->Free(); + } +} + +void BinnedLRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) { + std::lock_guard<std::mutex> l(mutex_); + strict_capacity_limit_ = strict_capacity_limit; +} + +rocksdb::Cache::Handle* BinnedLRUCacheShard::Lookup(const rocksdb::Slice& key, uint32_t hash) { + std::lock_guard<std::mutex> l(mutex_); + BinnedLRUHandle* e = table_.Lookup(key, hash); + if (e != nullptr) { + ceph_assert(e->InCache()); + if (e->refs == 1) { + LRU_Remove(e); + } + e->refs++; + e->SetHit(); + } + return reinterpret_cast<rocksdb::Cache::Handle*>(e); +} + +bool BinnedLRUCacheShard::Ref(rocksdb::Cache::Handle* h) { + BinnedLRUHandle* handle = reinterpret_cast<BinnedLRUHandle*>(h); + std::lock_guard<std::mutex> l(mutex_); + if (handle->InCache() && handle->refs == 1) { + LRU_Remove(handle); + } + handle->refs++; + return true; +} + +void BinnedLRUCacheShard::SetHighPriPoolRatio(double high_pri_pool_ratio) { + std::lock_guard<std::mutex> l(mutex_); + high_pri_pool_ratio_ = high_pri_pool_ratio; + high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_; + MaintainPoolSize(); +} + +bool BinnedLRUCacheShard::Release(rocksdb::Cache::Handle* handle, bool force_erase) { + if (handle == nullptr) { + return false; + } + BinnedLRUHandle* e = reinterpret_cast<BinnedLRUHandle*>(handle); + bool last_reference = false; + { + std::lock_guard<std::mutex> l(mutex_); + last_reference = Unref(e); + if (last_reference) { + usage_ -= e->charge; + } + if (e->refs == 1 && e->InCache()) { + // The item is still in cache, and nobody else holds a reference to it + if (usage_ > capacity_ || force_erase) { + // the cache is full + // The LRU list must be empty since the cache is full + ceph_assert(!(usage_ > capacity_) || lru_.next == &lru_); + // take this opportunity and remove the item + table_.Remove(e->key(), e->hash); + e->SetInCache(false); + Unref(e); + usage_ -= e->charge; + last_reference = true; + } else { + // put the item on the list to be potentially freed + LRU_Insert(e); + } + } + } + + // free outside of mutex + if (last_reference) { + e->Free(); + } + return last_reference; +} + +rocksdb::Status BinnedLRUCacheShard::Insert(const rocksdb::Slice& key, uint32_t hash, void* value, + size_t charge, + void (*deleter)(const rocksdb::Slice& key, void* value), + rocksdb::Cache::Handle** handle, rocksdb::Cache::Priority priority) { + auto e = new BinnedLRUHandle(); + rocksdb::Status s; + ceph::autovector<BinnedLRUHandle*> last_reference_list; + + e->value = 
value; + e->deleter = deleter; + e->charge = charge; + e->key_length = key.size(); + e->key_data = new char[e->key_length]; + e->flags = 0; + e->hash = hash; + e->refs = (handle == nullptr + ? 1 + : 2); // One from BinnedLRUCache, one for the returned handle + e->next = e->prev = nullptr; + e->SetInCache(true); + e->SetPriority(priority); + std::copy_n(key.data(), e->key_length, e->key_data); + + { + std::lock_guard<std::mutex> l(mutex_); + // Free the space following strict LRU policy until enough space + // is freed or the lru list is empty + EvictFromLRU(charge, &last_reference_list); + + if (usage_ - lru_usage_ + charge > capacity_ && + (strict_capacity_limit_ || handle == nullptr)) { + if (handle == nullptr) { + // Don't insert the entry but still return ok, as if the entry inserted + // into cache and get evicted immediately. + last_reference_list.push_back(e); + } else { + delete e; + *handle = nullptr; + s = rocksdb::Status::Incomplete("Insert failed due to LRU cache being full."); + } + } else { + // insert into the cache + // note that the cache might get larger than its capacity if not enough + // space was freed + BinnedLRUHandle* old = table_.Insert(e); + usage_ += e->charge; + if (old != nullptr) { + old->SetInCache(false); + if (Unref(old)) { + usage_ -= old->charge; + // old is on LRU because it's in cache and its reference count + // was just 1 (Unref returned 0) + LRU_Remove(old); + last_reference_list.push_back(old); + } + } + if (handle == nullptr) { + LRU_Insert(e); + } else { + *handle = reinterpret_cast<rocksdb::Cache::Handle*>(e); + } + s = rocksdb::Status::OK(); + } + } + + // we free the entries here outside of mutex for + // performance reasons + for (auto entry : last_reference_list) { + entry->Free(); + } + + return s; +} + +void BinnedLRUCacheShard::Erase(const rocksdb::Slice& key, uint32_t hash) { + BinnedLRUHandle* e; + bool last_reference = false; + { + std::lock_guard<std::mutex> l(mutex_); + e = table_.Remove(key, hash); + if (e != nullptr) { + last_reference = Unref(e); + if (last_reference) { + usage_ -= e->charge; + } + if (last_reference && e->InCache()) { + LRU_Remove(e); + } + e->SetInCache(false); + } + } + + // mutex not held here + // last_reference will only be true if e != nullptr + if (last_reference) { + e->Free(); + } +} + +size_t BinnedLRUCacheShard::GetUsage() const { + std::lock_guard<std::mutex> l(mutex_); + return usage_; +} + +size_t BinnedLRUCacheShard::GetPinnedUsage() const { + std::lock_guard<std::mutex> l(mutex_); + ceph_assert(usage_ >= lru_usage_); + return usage_ - lru_usage_; +} + +std::string BinnedLRUCacheShard::GetPrintableOptions() const { + const int kBufferSize = 200; + char buffer[kBufferSize]; + { + std::lock_guard<std::mutex> l(mutex_); + snprintf(buffer, kBufferSize, " high_pri_pool_ratio: %.3lf\n", + high_pri_pool_ratio_); + } + return std::string(buffer); +} + +BinnedLRUCache::BinnedLRUCache(CephContext *c, + size_t capacity, + int num_shard_bits, + bool strict_capacity_limit, + double high_pri_pool_ratio) + : ShardedCache(capacity, num_shard_bits, strict_capacity_limit), cct(c) { + num_shards_ = 1 << num_shard_bits; + // TODO: Switch over to use mempool + int rc = posix_memalign((void**) &shards_, + CACHE_LINE_SIZE, + sizeof(BinnedLRUCacheShard) * num_shards_); + if (rc != 0) { + throw std::bad_alloc(); + } + size_t per_shard = (capacity + (num_shards_ - 1)) / num_shards_; + for (int i = 0; i < num_shards_; i++) { + new (&shards_[i]) + BinnedLRUCacheShard(c, per_shard, strict_capacity_limit, high_pri_pool_ratio); 
+ } +} + +BinnedLRUCache::~BinnedLRUCache() { + for (int i = 0; i < num_shards_; i++) { + shards_[i].~BinnedLRUCacheShard(); + } + aligned_free(shards_); +} + +CacheShard* BinnedLRUCache::GetShard(int shard) { + return reinterpret_cast<CacheShard*>(&shards_[shard]); +} + +const CacheShard* BinnedLRUCache::GetShard(int shard) const { + return reinterpret_cast<CacheShard*>(&shards_[shard]); +} + +void* BinnedLRUCache::Value(Handle* handle) { + return reinterpret_cast<const BinnedLRUHandle*>(handle)->value; +} + +size_t BinnedLRUCache::GetCharge(Handle* handle) const { + return reinterpret_cast<const BinnedLRUHandle*>(handle)->charge; +} + +uint32_t BinnedLRUCache::GetHash(Handle* handle) const { + return reinterpret_cast<const BinnedLRUHandle*>(handle)->hash; +} + +void BinnedLRUCache::DisownData() { +// Do not drop data if compile with ASAN to suppress leak warning. +#ifndef __SANITIZE_ADDRESS__ + shards_ = nullptr; +#endif // !__SANITIZE_ADDRESS__ +} + +size_t BinnedLRUCache::TEST_GetLRUSize() { + size_t lru_size_of_all_shards = 0; + for (int i = 0; i < num_shards_; i++) { + lru_size_of_all_shards += shards_[i].TEST_GetLRUSize(); + } + return lru_size_of_all_shards; +} + +void BinnedLRUCache::SetHighPriPoolRatio(double high_pri_pool_ratio) { + for (int i = 0; i < num_shards_; i++) { + shards_[i].SetHighPriPoolRatio(high_pri_pool_ratio); + } +} + +double BinnedLRUCache::GetHighPriPoolRatio() const { + double result = 0.0; + if (num_shards_ > 0) { + result = shards_[0].GetHighPriPoolRatio(); + } + return result; +} + +size_t BinnedLRUCache::GetHighPriPoolUsage() const { + // We will not lock the cache when getting the usage from shards. + size_t usage = 0; + for (int s = 0; s < num_shards_; s++) { + usage += shards_[s].GetHighPriPoolUsage(); + } + return usage; +} + +// PriCache + +int64_t BinnedLRUCache::request_cache_bytes(PriorityCache::Priority pri, uint64_t total_cache) const +{ + int64_t assigned = get_cache_bytes(pri); + int64_t request = 0; + + switch (pri) { + // PRI0 is for rocksdb's high priority items (indexes/filters) + case PriorityCache::Priority::PRI0: + { + request = GetHighPriPoolUsage(); + break; + } + // All other cache items are currently shoved into the PRI1 priority. + case PriorityCache::Priority::PRI1: + { + request = GetUsage(); + request -= GetHighPriPoolUsage(); + break; + } + default: + break; + } + request = (request > assigned) ? 
request - assigned : 0; + ldout(cct, 10) << __func__ << " Priority: " << static_cast<uint32_t>(pri) + << " Request: " << request << dendl; + return request; +} + +int64_t BinnedLRUCache::commit_cache_size(uint64_t total_bytes) +{ + size_t old_bytes = GetCapacity(); + int64_t new_bytes = PriorityCache::get_chunk( + get_cache_bytes(), total_bytes); + ldout(cct, 10) << __func__ << " old: " << old_bytes + << " new: " << new_bytes << dendl; + SetCapacity((size_t) new_bytes); + + double ratio = 0; + if (new_bytes > 0) { + int64_t pri0_bytes = get_cache_bytes(PriorityCache::Priority::PRI0); + // Add 10% of the "reserved" bytes so the ratio can't get stuck at 0 + pri0_bytes += (new_bytes - get_cache_bytes()) / 10; + ratio = (double) pri0_bytes / new_bytes; + } + ldout(cct, 10) << __func__ << " High Pri Pool Ratio set to " << ratio << dendl; + SetHighPriPoolRatio(ratio); + return new_bytes; +} + +std::shared_ptr<rocksdb::Cache> NewBinnedLRUCache( + CephContext *c, + size_t capacity, + int num_shard_bits, + bool strict_capacity_limit, + double high_pri_pool_ratio) { + if (num_shard_bits >= 20) { + return nullptr; // the cache cannot be sharded into too many fine pieces + } + if (high_pri_pool_ratio < 0.0 || high_pri_pool_ratio > 1.0) { + // invalid high_pri_pool_ratio + return nullptr; + } + if (num_shard_bits < 0) { + num_shard_bits = GetDefaultCacheShardBits(capacity); + } + return std::make_shared<BinnedLRUCache>( + c, capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio); +} + +} // namespace rocksdb_cache diff --git a/src/kv/rocksdb_cache/BinnedLRUCache.h b/src/kv/rocksdb_cache/BinnedLRUCache.h new file mode 100644 index 000000000..85608be0e --- /dev/null +++ b/src/kv/rocksdb_cache/BinnedLRUCache.h @@ -0,0 +1,336 @@ +// Copyright (c) 2018-Present Red Hat Inc. All rights reserved. +// +// Copyright (c) 2011-2018, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 and Apache 2.0 License +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef ROCKSDB_BINNED_LRU_CACHE +#define ROCKSDB_BINNED_LRU_CACHE + +#include <string> +#include <mutex> + +#include "ShardedCache.h" +#include "common/autovector.h" +#include "common/dout.h" +#include "include/ceph_assert.h" +#include "common/ceph_context.h" + +namespace rocksdb_cache { + +// LRU cache implementation + +// An entry is a variable length heap-allocated structure. +// Entries are referenced by cache and/or by any external entity. +// The cache keeps all its entries in table. Some elements +// are also stored on LRU list. +// +// BinnedLRUHandle can be in these states: +// 1. Referenced externally AND in hash table. +// In that case the entry is *not* in the LRU. (refs > 1 && in_cache == true) +// 2. Not referenced externally and in hash table. In that case the entry is +// in the LRU and can be freed. (refs == 1 && in_cache == true) +// 3. Referenced externally and not in hash table. In that case the entry is +// in not on LRU and not in table. (refs >= 1 && in_cache == false) +// +// All newly created BinnedLRUHandles are in state 1. If you call +// BinnedLRUCacheShard::Release +// on entry in state 1, it will go into state 2. To move from state 1 to +// state 3, either call BinnedLRUCacheShard::Erase or BinnedLRUCacheShard::Insert with the +// same key. 
+// To move from state 2 to state 1, use BinnedLRUCacheShard::Lookup. +// Before destruction, make sure that no handles are in state 1. This means +// that any successful BinnedLRUCacheShard::Lookup/BinnedLRUCacheShard::Insert have a +// matching +// RUCache::Release (to move into state 2) or BinnedLRUCacheShard::Erase (for state 3) + +std::shared_ptr<rocksdb::Cache> NewBinnedLRUCache( + CephContext *c, + size_t capacity, + int num_shard_bits = -1, + bool strict_capacity_limit = false, + double high_pri_pool_ratio = 0.0); + +struct BinnedLRUHandle { + void* value; + void (*deleter)(const rocksdb::Slice&, void* value); + BinnedLRUHandle* next_hash; + BinnedLRUHandle* next; + BinnedLRUHandle* prev; + size_t charge; // TODO(opt): Only allow uint32_t? + size_t key_length; + uint32_t refs; // a number of refs to this entry + // cache itself is counted as 1 + + // Include the following flags: + // in_cache: whether this entry is referenced by the hash table. + // is_high_pri: whether this entry is high priority entry. + // in_high_pri_pool: whether this entry is in high-pri pool. + char flags; + + uint32_t hash; // Hash of key(); used for fast sharding and comparisons + + char* key_data = nullptr; // Beginning of key + + rocksdb::Slice key() const { + // For cheaper lookups, we allow a temporary Handle object + // to store a pointer to a key in "value". + if (next == this) { + return *(reinterpret_cast<rocksdb::Slice*>(value)); + } else { + return rocksdb::Slice(key_data, key_length); + } + } + + bool InCache() { return flags & 1; } + bool IsHighPri() { return flags & 2; } + bool InHighPriPool() { return flags & 4; } + bool HasHit() { return flags & 8; } + + void SetInCache(bool in_cache) { + if (in_cache) { + flags |= 1; + } else { + flags &= ~1; + } + } + + void SetPriority(rocksdb::Cache::Priority priority) { + if (priority == rocksdb::Cache::Priority::HIGH) { + flags |= 2; + } else { + flags &= ~2; + } + } + + void SetInHighPriPool(bool in_high_pri_pool) { + if (in_high_pri_pool) { + flags |= 4; + } else { + flags &= ~4; + } + } + + void SetHit() { flags |= 8; } + + void Free() { + ceph_assert((refs == 1 && InCache()) || (refs == 0 && !InCache())); + if (deleter) { + (*deleter)(key(), value); + } + delete[] key_data; + delete this; + } +}; + +// We provide our own simple hash table since it removes a whole bunch +// of porting hacks and is also faster than some of the built-in hash +// table implementations in some of the compiler/runtime combinations +// we have tested. E.g., readrandom speeds up by ~5% over the g++ +// 4.4.3's builtin hashtable. +class BinnedLRUHandleTable { + public: + BinnedLRUHandleTable(); + ~BinnedLRUHandleTable(); + + BinnedLRUHandle* Lookup(const rocksdb::Slice& key, uint32_t hash); + BinnedLRUHandle* Insert(BinnedLRUHandle* h); + BinnedLRUHandle* Remove(const rocksdb::Slice& key, uint32_t hash); + + template <typename T> + void ApplyToAllCacheEntries(T func) { + for (uint32_t i = 0; i < length_; i++) { + BinnedLRUHandle* h = list_[i]; + while (h != nullptr) { + auto n = h->next_hash; + ceph_assert(h->InCache()); + func(h); + h = n; + } + } + } + + private: + // Return a pointer to slot that points to a cache entry that + // matches key/hash. If there is no such cache entry, return a + // pointer to the trailing slot in the corresponding linked list. 
+ BinnedLRUHandle** FindPointer(const rocksdb::Slice& key, uint32_t hash); + + void Resize(); + + // The table consists of an array of buckets where each bucket is + // a linked list of cache entries that hash into the bucket. + BinnedLRUHandle** list_; + uint32_t length_; + uint32_t elems_; +}; + +// A single shard of sharded cache. +class alignas(CACHE_LINE_SIZE) BinnedLRUCacheShard : public CacheShard { + public: + BinnedLRUCacheShard(CephContext *c, size_t capacity, bool strict_capacity_limit, + double high_pri_pool_ratio); + virtual ~BinnedLRUCacheShard(); + + // Separate from constructor so caller can easily make an array of BinnedLRUCache + // if current usage is more than new capacity, the function will attempt to + // free the needed space + virtual void SetCapacity(size_t capacity) override; + + // Set the flag to reject insertion if cache if full. + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override; + + // Set percentage of capacity reserved for high-pri cache entries. + void SetHighPriPoolRatio(double high_pri_pool_ratio); + + // Like Cache methods, but with an extra "hash" parameter. + virtual rocksdb::Status Insert(const rocksdb::Slice& key, uint32_t hash, void* value, + size_t charge, + void (*deleter)(const rocksdb::Slice& key, void* value), + rocksdb::Cache::Handle** handle, + rocksdb::Cache::Priority priority) override; + virtual rocksdb::Cache::Handle* Lookup(const rocksdb::Slice& key, uint32_t hash) override; + virtual bool Ref(rocksdb::Cache::Handle* handle) override; + virtual bool Release(rocksdb::Cache::Handle* handle, + bool force_erase = false) override; + virtual void Erase(const rocksdb::Slice& key, uint32_t hash) override; + + // Although in some platforms the update of size_t is atomic, to make sure + // GetUsage() and GetPinnedUsage() work correctly under any platform, we'll + // protect them with mutex_. + + virtual size_t GetUsage() const override; + virtual size_t GetPinnedUsage() const override; + + virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) override; + + virtual void EraseUnRefEntries() override; + + virtual std::string GetPrintableOptions() const override; + + void TEST_GetLRUList(BinnedLRUHandle** lru, BinnedLRUHandle** lru_low_pri); + + // Retrieves number of elements in LRU, for unit test purpose only + // not threadsafe + size_t TEST_GetLRUSize(); + + // Retrieves high pri pool ratio + double GetHighPriPoolRatio() const; + + // Retrieves high pri pool usage + size_t GetHighPriPoolUsage() const; + + private: + CephContext *cct; + void LRU_Remove(BinnedLRUHandle* e); + void LRU_Insert(BinnedLRUHandle* e); + + // Overflow the last entry in high-pri pool to low-pri pool until size of + // high-pri pool is no larger than the size specify by high_pri_pool_pct. + void MaintainPoolSize(); + + // Just reduce the reference count by 1. + // Return true if last reference + bool Unref(BinnedLRUHandle* e); + + // Free some space following strict LRU policy until enough space + // to hold (usage_ + charge) is freed or the lru list is empty + // This function is not thread safe - it needs to be executed while + // holding the mutex_ + void EvictFromLRU(size_t charge, ceph::autovector<BinnedLRUHandle*>* deleted); + + // Initialized before use. + size_t capacity_; + + // Memory size for entries in high-pri pool. + size_t high_pri_pool_usage_; + + // Whether to reject insertion if cache reaches its full capacity. 
+ bool strict_capacity_limit_; + + // Ratio of capacity reserved for high priority cache entries. + double high_pri_pool_ratio_; + + // High-pri pool size, equals to capacity * high_pri_pool_ratio. + // Remember the value to avoid recomputing each time. + double high_pri_pool_capacity_; + + // Dummy head of LRU list. + // lru.prev is newest entry, lru.next is oldest entry. + // LRU contains items which can be evicted, ie reference only by cache + BinnedLRUHandle lru_; + + // Pointer to head of low-pri pool in LRU list. + BinnedLRUHandle* lru_low_pri_; + + // ------------^^^^^^^^^^^^^----------- + // Not frequently modified data members + // ------------------------------------ + // + // We separate data members that are updated frequently from the ones that + // are not frequently updated so that they don't share the same cache line + // which will lead into false cache sharing + // + // ------------------------------------ + // Frequently modified data members + // ------------vvvvvvvvvvvvv----------- + BinnedLRUHandleTable table_; + + // Memory size for entries residing in the cache + size_t usage_; + + // Memory size for entries residing only in the LRU list + size_t lru_usage_; + + // mutex_ protects the following state. + // We don't count mutex_ as the cache's internal state so semantically we + // don't mind mutex_ invoking the non-const actions. + mutable std::mutex mutex_; +}; + +class BinnedLRUCache : public ShardedCache { + public: + BinnedLRUCache(CephContext *c, size_t capacity, int num_shard_bits, + bool strict_capacity_limit, double high_pri_pool_ratio); + virtual ~BinnedLRUCache(); + virtual const char* Name() const override { return "BinnedLRUCache"; } + virtual CacheShard* GetShard(int shard) override; + virtual const CacheShard* GetShard(int shard) const override; + virtual void* Value(Handle* handle) override; + virtual size_t GetCharge(Handle* handle) const override; + virtual uint32_t GetHash(Handle* handle) const override; + virtual void DisownData() override; + + // Retrieves number of elements in LRU, for unit test purpose only + size_t TEST_GetLRUSize(); + // Sets the high pri pool ratio + void SetHighPriPoolRatio(double high_pri_pool_ratio); + // Retrieves high pri pool ratio + double GetHighPriPoolRatio() const; + // Retrieves high pri pool usage + size_t GetHighPriPoolUsage() const; + + // PriorityCache + virtual int64_t request_cache_bytes( + PriorityCache::Priority pri, uint64_t total_cache) const; + virtual int64_t commit_cache_size(uint64_t total_cache); + virtual int64_t get_committed_size() const { + return GetCapacity(); + } + virtual std::string get_cache_name() const { + return "RocksDB Binned LRU Cache"; + } + + private: + CephContext *cct; + BinnedLRUCacheShard* shards_; + int num_shards_ = 0; +}; + +} // namespace rocksdb_cache + +#endif // ROCKSDB_BINNED_LRU_CACHE diff --git a/src/kv/rocksdb_cache/ShardedCache.cc b/src/kv/rocksdb_cache/ShardedCache.cc new file mode 100644 index 000000000..367140a94 --- /dev/null +++ b/src/kv/rocksdb_cache/ShardedCache.cc @@ -0,0 +1,159 @@ +// Copyright (c) 2018-Present Red Hat Inc. All rights reserved. +// +// Copyright (c) 2011-2018, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 and Apache 2.0 License +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
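To make the PriorityCache arithmetic in BinnedLRUCache::commit_cache_size() above concrete, here is a minimal standalone sketch of the same ratio computation. high_pri_ratio() is a hypothetical helper written only for this illustration, and the byte counts in main() are made up; only the formula itself comes from the patch.

#include <cstdint>
#include <iostream>

// Stand-in for the ratio math in BinnedLRUCache::commit_cache_size():
// given the bytes granted to the cache (new_bytes), the bytes assigned to
// PRI0 (index/filter blocks), and the total bytes assigned across all
// priorities, compute the high-pri pool ratio the same way the patch does.
static double high_pri_ratio(int64_t new_bytes, int64_t pri0_bytes,
                             int64_t total_assigned_bytes) {
  if (new_bytes <= 0) {
    return 0.0;
  }
  // Add 10% of the "reserved" (granted but unassigned) bytes so the ratio
  // cannot get stuck at 0 while PRI0 usage is still tiny.
  pri0_bytes += (new_bytes - total_assigned_bytes) / 10;
  return static_cast<double>(pri0_bytes) / new_bytes;
}

int main() {
  // Illustrative numbers: 1 GiB granted, 100 MiB of PRI0 usage, 800 MiB
  // assigned in total -> 224 MiB reserved, ratio comes out around 0.12.
  const int64_t MiB = 1 << 20;
  std::cout << high_pri_ratio(1024 * MiB, 100 * MiB, 800 * MiB) << "\n";
  return 0;
}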
+ +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include "ShardedCache.h" + +#include <string> + +namespace rocksdb_cache { + +ShardedCache::ShardedCache(size_t capacity, int num_shard_bits, + bool strict_capacity_limit) + : num_shard_bits_(num_shard_bits), + capacity_(capacity), + strict_capacity_limit_(strict_capacity_limit), + last_id_(1) {} + +void ShardedCache::SetCapacity(size_t capacity) { + int num_shards = 1 << num_shard_bits_; + const size_t per_shard = (capacity + (num_shards - 1)) / num_shards; + std::lock_guard<std::mutex> l(capacity_mutex_); + for (int s = 0; s < num_shards; s++) { + GetShard(s)->SetCapacity(per_shard); + } + capacity_ = capacity; +} + +void ShardedCache::SetStrictCapacityLimit(bool strict_capacity_limit) { + int num_shards = 1 << num_shard_bits_; + std::lock_guard<std::mutex> l(capacity_mutex_); + for (int s = 0; s < num_shards; s++) { + GetShard(s)->SetStrictCapacityLimit(strict_capacity_limit); + } + strict_capacity_limit_ = strict_capacity_limit; +} + +rocksdb::Status ShardedCache::Insert(const rocksdb::Slice& key, void* value, size_t charge, + void (*deleter)(const rocksdb::Slice& key, void* value), + rocksdb::Cache::Handle** handle, Priority priority) { + uint32_t hash = HashSlice(key); + return GetShard(Shard(hash)) + ->Insert(key, hash, value, charge, deleter, handle, priority); +} + +rocksdb::Cache::Handle* ShardedCache::Lookup(const rocksdb::Slice& key, rocksdb::Statistics* /*stats*/) { + uint32_t hash = HashSlice(key); + return GetShard(Shard(hash))->Lookup(key, hash); +} + +bool ShardedCache::Ref(rocksdb::Cache::Handle* handle) { + uint32_t hash = GetHash(handle); + return GetShard(Shard(hash))->Ref(handle); +} + +bool ShardedCache::Release(rocksdb::Cache::Handle* handle, bool force_erase) { + uint32_t hash = GetHash(handle); + return GetShard(Shard(hash))->Release(handle, force_erase); +} + +void ShardedCache::Erase(const rocksdb::Slice& key) { + uint32_t hash = HashSlice(key); + GetShard(Shard(hash))->Erase(key, hash); +} + +uint64_t ShardedCache::NewId() { + return last_id_.fetch_add(1, std::memory_order_relaxed); +} + +size_t ShardedCache::GetCapacity() const { + std::lock_guard<std::mutex> l(capacity_mutex_); + return capacity_; +} + +bool ShardedCache::HasStrictCapacityLimit() const { + std::lock_guard<std::mutex> l(capacity_mutex_); + return strict_capacity_limit_; +} + +size_t ShardedCache::GetUsage() const { + // We will not lock the cache when getting the usage from shards. + int num_shards = 1 << num_shard_bits_; + size_t usage = 0; + for (int s = 0; s < num_shards; s++) { + usage += GetShard(s)->GetUsage(); + } + return usage; +} + +size_t ShardedCache::GetUsage(rocksdb::Cache::Handle* handle) const { + return GetCharge(handle); +} + +size_t ShardedCache::GetPinnedUsage() const { + // We will not lock the cache when getting the usage from shards. 
+ int num_shards = 1 << num_shard_bits_; + size_t usage = 0; + for (int s = 0; s < num_shards; s++) { + usage += GetShard(s)->GetPinnedUsage(); + } + return usage; +} + +void ShardedCache::ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) { + int num_shards = 1 << num_shard_bits_; + for (int s = 0; s < num_shards; s++) { + GetShard(s)->ApplyToAllCacheEntries(callback, thread_safe); + } +} + +void ShardedCache::EraseUnRefEntries() { + int num_shards = 1 << num_shard_bits_; + for (int s = 0; s < num_shards; s++) { + GetShard(s)->EraseUnRefEntries(); + } +} + +std::string ShardedCache::GetPrintableOptions() const { + std::string ret; + ret.reserve(20000); + const int kBufferSize = 200; + char buffer[kBufferSize]; + { + std::lock_guard<std::mutex> l(capacity_mutex_); + snprintf(buffer, kBufferSize, " capacity : %" ROCKSDB_PRIszt "\n", + capacity_); + ret.append(buffer); + snprintf(buffer, kBufferSize, " num_shard_bits : %d\n", num_shard_bits_); + ret.append(buffer); + snprintf(buffer, kBufferSize, " strict_capacity_limit : %d\n", + strict_capacity_limit_); + ret.append(buffer); + } + ret.append(GetShard(0)->GetPrintableOptions()); + return ret; +} +int GetDefaultCacheShardBits(size_t capacity) { + int num_shard_bits = 0; + size_t min_shard_size = 512L * 1024L; // Every shard is at least 512KB. + size_t num_shards = capacity / min_shard_size; + while (num_shards >>= 1) { + if (++num_shard_bits >= 6) { + // No more than 6. + return num_shard_bits; + } + } + return num_shard_bits; +} + +} // namespace rocksdb_cache diff --git a/src/kv/rocksdb_cache/ShardedCache.h b/src/kv/rocksdb_cache/ShardedCache.h new file mode 100644 index 000000000..4d64893ab --- /dev/null +++ b/src/kv/rocksdb_cache/ShardedCache.h @@ -0,0 +1,141 @@ +// Copyright (c) 2018-Present Red Hat Inc. All rights reserved. +// +// Copyright (c) 2011-2018, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 and Apache 2.0 License +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef ROCKSDB_SHARDED_CACHE +#define ROCKSDB_SHARDED_CACHE + +#include <atomic> +#include <string> +#include <mutex> + +#include "rocksdb/cache.h" +#include "include/ceph_hash.h" +#include "common/PriorityCache.h" +//#include "hash.h" + +#ifndef CACHE_LINE_SIZE +#define CACHE_LINE_SIZE 64 // XXX arch-specific define +#endif +#define ROCKSDB_PRIszt "zu" + +namespace rocksdb_cache { + +// Single cache shard interface. 
+class CacheShard { + public: + CacheShard() = default; + virtual ~CacheShard() = default; + + virtual rocksdb::Status Insert(const rocksdb::Slice& key, uint32_t hash, void* value, + size_t charge, + void (*deleter)(const rocksdb::Slice& key, void* value), + rocksdb::Cache::Handle** handle, rocksdb::Cache::Priority priority) = 0; + virtual rocksdb::Cache::Handle* Lookup(const rocksdb::Slice& key, uint32_t hash) = 0; + virtual bool Ref(rocksdb::Cache::Handle* handle) = 0; + virtual bool Release(rocksdb::Cache::Handle* handle, bool force_erase = false) = 0; + virtual void Erase(const rocksdb::Slice& key, uint32_t hash) = 0; + virtual void SetCapacity(size_t capacity) = 0; + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0; + virtual size_t GetUsage() const = 0; + virtual size_t GetPinnedUsage() const = 0; + virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) = 0; + virtual void EraseUnRefEntries() = 0; + virtual std::string GetPrintableOptions() const { return ""; } +}; + +// Generic cache interface which shards cache by hash of keys. 2^num_shard_bits +// shards will be created, with capacity split evenly to each of the shards. +// Keys are sharded by the highest num_shard_bits bits of hash value. +class ShardedCache : public rocksdb::Cache, public PriorityCache::PriCache { + public: + ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit); + virtual ~ShardedCache() = default; + virtual const char* Name() const override = 0; + virtual CacheShard* GetShard(int shard) = 0; + virtual const CacheShard* GetShard(int shard) const = 0; + virtual void* Value(Handle* handle) override = 0; + virtual size_t GetCharge(Handle* handle) const = 0; + virtual uint32_t GetHash(Handle* handle) const = 0; + virtual void DisownData() override = 0; + + virtual void SetCapacity(size_t capacity) override; + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override; + + virtual rocksdb::Status Insert(const rocksdb::Slice& key, void* value, size_t charge, + void (*deleter)(const rocksdb::Slice& key, void* value), + rocksdb::Cache::Handle** handle, Priority priority) override; + virtual rocksdb::Cache::Handle* Lookup(const rocksdb::Slice& key, rocksdb::Statistics* stats) override; + virtual bool Ref(rocksdb::Cache::Handle* handle) override; + virtual bool Release(rocksdb::Cache::Handle* handle, bool force_erase = false) override; + virtual void Erase(const rocksdb::Slice& key) override; + virtual uint64_t NewId() override; + virtual size_t GetCapacity() const override; + virtual bool HasStrictCapacityLimit() const override; + virtual size_t GetUsage() const override; + virtual size_t GetUsage(rocksdb::Cache::Handle* handle) const override; + virtual size_t GetPinnedUsage() const override; + virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) override; + virtual void EraseUnRefEntries() override; + virtual std::string GetPrintableOptions() const override; + + int GetNumShardBits() const { return num_shard_bits_; } + + // PriCache + virtual int64_t get_cache_bytes(PriorityCache::Priority pri) const { + return cache_bytes[pri]; + } + virtual int64_t get_cache_bytes() const { + int64_t total = 0; + for (int i = 0; i < PriorityCache::Priority::LAST + 1; i++) { + PriorityCache::Priority pri = static_cast<PriorityCache::Priority>(i); + total += get_cache_bytes(pri); + } + return total; + } + virtual void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) { + cache_bytes[pri] = bytes; + } 
+ virtual void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) { + cache_bytes[pri] += bytes; + } + virtual double get_cache_ratio() const { + return cache_ratio; + } + virtual void set_cache_ratio(double ratio) { + cache_ratio = ratio; + } + virtual std::string get_cache_name() const = 0; + + private: + static inline uint32_t HashSlice(const rocksdb::Slice& s) { + return ceph_str_hash(CEPH_STR_HASH_RJENKINS, s.data(), s.size()); +// return Hash(s.data(), s.size(), 0); + } + + uint32_t Shard(uint32_t hash) { + // Note, hash >> 32 yields hash in gcc, not the zero we expect! + return (num_shard_bits_ > 0) ? (hash >> (32 - num_shard_bits_)) : 0; + } + + int64_t cache_bytes[PriorityCache::Priority::LAST+1] = {0}; + double cache_ratio = 0; + + int num_shard_bits_; + mutable std::mutex capacity_mutex_; + size_t capacity_; + bool strict_capacity_limit_; + std::atomic<uint64_t> last_id_; +}; + +extern int GetDefaultCacheShardBits(size_t capacity); + +} // namespace rocksdb_cache +#endif // ROCKSDB_SHARDED_CACHE |
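For reference, a minimal caller-side sketch of the factory and the rocksdb::Cache surface added by this patch, assuming a CephContext is supplied by the caller. The include path, delete_string helper, key, and byte counts are illustrative only; the closing comment about BlockBasedTableOptions describes the typical use of a RocksDB block cache, not something shown in this diff.

#include <cassert>
#include <string>
// Header from this patch; include path assumed relative to src/kv.
#include "kv/rocksdb_cache/BinnedLRUCache.h"

// Deleter matching the signature the cache expects; it owns the value once
// the entry has been inserted.
static void delete_string(const rocksdb::Slice& /*key*/, void* value) {
  delete static_cast<std::string*>(value);
}

void example(CephContext* cct) {
  // 256 MiB cache, shard count chosen automatically (num_shard_bits = -1),
  // no strict capacity limit, 10% of capacity reserved for high-pri entries.
  auto cache = rocksdb_cache::NewBinnedLRUCache(cct, 256 << 20, -1, false, 0.1);

  auto* value = new std::string("cached block bytes");
  rocksdb::Cache::Handle* handle = nullptr;
  rocksdb::Status s = cache->Insert("block-key", value, value->size(),
                                    delete_string, &handle,
                                    rocksdb::Cache::Priority::HIGH);
  if (s.ok() && handle) {
    assert(cache->Value(handle) == value);
    cache->Release(handle);  // drop our ref; the entry moves onto the LRU list
  }

  if (rocksdb::Cache::Handle* h = cache->Lookup("block-key", nullptr)) {
    // ... use cache->Value(h) ...
    cache->Release(h);
  }
  // In practice this cache would typically be plugged into
  // rocksdb::BlockBasedTableOptions::block_cache (not shown here).
}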