From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/rocksdb/cache/compressed_secondary_cache.cc | 325 ++++++++++++++++++++++++ 1 file changed, 325 insertions(+) create mode 100644 src/rocksdb/cache/compressed_secondary_cache.cc (limited to 'src/rocksdb/cache/compressed_secondary_cache.cc') diff --git a/src/rocksdb/cache/compressed_secondary_cache.cc b/src/rocksdb/cache/compressed_secondary_cache.cc new file mode 100644 index 000000000..7d1bdc789 --- /dev/null +++ b/src/rocksdb/cache/compressed_secondary_cache.cc @@ -0,0 +1,325 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "cache/compressed_secondary_cache.h" + +#include +#include +#include + +#include "memory/memory_allocator.h" +#include "monitoring/perf_context_imp.h" +#include "util/compression.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +CompressedSecondaryCache::CompressedSecondaryCache( + size_t capacity, int num_shard_bits, bool strict_capacity_limit, + double high_pri_pool_ratio, double low_pri_pool_ratio, + std::shared_ptr memory_allocator, bool use_adaptive_mutex, + CacheMetadataChargePolicy metadata_charge_policy, + CompressionType compression_type, uint32_t compress_format_version, + bool enable_custom_split_merge) + : cache_options_(capacity, num_shard_bits, strict_capacity_limit, + high_pri_pool_ratio, low_pri_pool_ratio, memory_allocator, + use_adaptive_mutex, metadata_charge_policy, + compression_type, compress_format_version, + enable_custom_split_merge) { + cache_ = + NewLRUCache(capacity, num_shard_bits, strict_capacity_limit, + high_pri_pool_ratio, memory_allocator, use_adaptive_mutex, + metadata_charge_policy, low_pri_pool_ratio); +} + +CompressedSecondaryCache::~CompressedSecondaryCache() { cache_.reset(); } + +std::unique_ptr CompressedSecondaryCache::Lookup( + const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/, + bool advise_erase, bool& is_in_sec_cache) { + std::unique_ptr handle; + is_in_sec_cache = false; + Cache::Handle* lru_handle = cache_->Lookup(key); + if (lru_handle == nullptr) { + return nullptr; + } + + void* handle_value = cache_->Value(lru_handle); + if (handle_value == nullptr) { + cache_->Release(lru_handle, /*erase_if_last_ref=*/false); + return nullptr; + } + + CacheAllocationPtr* ptr{nullptr}; + CacheAllocationPtr merged_value; + size_t handle_value_charge{0}; + if (cache_options_.enable_custom_split_merge) { + CacheValueChunk* value_chunk_ptr = + reinterpret_cast(handle_value); + merged_value = MergeChunksIntoValue(value_chunk_ptr, handle_value_charge); + ptr = &merged_value; + } else { + ptr = reinterpret_cast(handle_value); + handle_value_charge = cache_->GetCharge(lru_handle); + } + + Status s; + void* value{nullptr}; + size_t charge{0}; + if (cache_options_.compression_type == kNoCompression) { + s = create_cb(ptr->get(), handle_value_charge, &value, &charge); + } else { + UncompressionContext uncompression_context(cache_options_.compression_type); + UncompressionInfo uncompression_info(uncompression_context, + UncompressionDict::GetEmptyDict(), + cache_options_.compression_type); + + size_t uncompressed_size{0}; + CacheAllocationPtr uncompressed = UncompressData( + uncompression_info, (char*)ptr->get(), handle_value_charge, + &uncompressed_size, cache_options_.compress_format_version, + cache_options_.memory_allocator.get()); + + if (!uncompressed) { + cache_->Release(lru_handle, /*erase_if_last_ref=*/true); + return nullptr; + } + s = create_cb(uncompressed.get(), uncompressed_size, &value, &charge); + } + + if (!s.ok()) { + cache_->Release(lru_handle, /*erase_if_last_ref=*/true); + return nullptr; + } + + if (advise_erase) { + cache_->Release(lru_handle, /*erase_if_last_ref=*/true); + // Insert a dummy handle. + cache_ + ->Insert(key, /*value=*/nullptr, /*charge=*/0, + GetDeletionCallback(cache_options_.enable_custom_split_merge)) + .PermitUncheckedError(); + } else { + is_in_sec_cache = true; + cache_->Release(lru_handle, /*erase_if_last_ref=*/false); + } + handle.reset(new CompressedSecondaryCacheResultHandle(value, charge)); + return handle; +} + +Status CompressedSecondaryCache::Insert(const Slice& key, void* value, + const Cache::CacheItemHelper* helper) { + if (value == nullptr) { + return Status::InvalidArgument(); + } + + Cache::Handle* lru_handle = cache_->Lookup(key); + Cache::DeleterFn del_cb = + GetDeletionCallback(cache_options_.enable_custom_split_merge); + if (lru_handle == nullptr) { + PERF_COUNTER_ADD(compressed_sec_cache_insert_dummy_count, 1); + // Insert a dummy handle if the handle is evicted for the first time. + return cache_->Insert(key, /*value=*/nullptr, /*charge=*/0, del_cb); + } else { + cache_->Release(lru_handle, /*erase_if_last_ref=*/false); + } + + size_t size = (*helper->size_cb)(value); + CacheAllocationPtr ptr = + AllocateBlock(size, cache_options_.memory_allocator.get()); + + Status s = (*helper->saveto_cb)(value, 0, size, ptr.get()); + if (!s.ok()) { + return s; + } + Slice val(ptr.get(), size); + + std::string compressed_val; + if (cache_options_.compression_type != kNoCompression) { + PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, size); + CompressionOptions compression_opts; + CompressionContext compression_context(cache_options_.compression_type); + uint64_t sample_for_compression{0}; + CompressionInfo compression_info( + compression_opts, compression_context, CompressionDict::GetEmptyDict(), + cache_options_.compression_type, sample_for_compression); + + bool success = + CompressData(val, compression_info, + cache_options_.compress_format_version, &compressed_val); + + if (!success) { + return Status::Corruption("Error compressing value."); + } + + val = Slice(compressed_val); + size = compressed_val.size(); + PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes, size); + + if (!cache_options_.enable_custom_split_merge) { + ptr = AllocateBlock(size, cache_options_.memory_allocator.get()); + memcpy(ptr.get(), compressed_val.data(), size); + } + } + + PERF_COUNTER_ADD(compressed_sec_cache_insert_real_count, 1); + if (cache_options_.enable_custom_split_merge) { + size_t charge{0}; + CacheValueChunk* value_chunks_head = + SplitValueIntoChunks(val, cache_options_.compression_type, charge); + return cache_->Insert(key, value_chunks_head, charge, del_cb); + } else { + CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr)); + return cache_->Insert(key, buf, size, del_cb); + } +} + +void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); } + +Status CompressedSecondaryCache::SetCapacity(size_t capacity) { + MutexLock l(&capacity_mutex_); + cache_options_.capacity = capacity; + cache_->SetCapacity(capacity); + return Status::OK(); +} + +Status CompressedSecondaryCache::GetCapacity(size_t& capacity) { + MutexLock l(&capacity_mutex_); + capacity = cache_options_.capacity; + return Status::OK(); +} + +std::string CompressedSecondaryCache::GetPrintableOptions() const { + std::string ret; + ret.reserve(20000); + const int kBufferSize{200}; + char buffer[kBufferSize]; + ret.append(cache_->GetPrintableOptions()); + snprintf(buffer, kBufferSize, " compression_type : %s\n", + CompressionTypeToString(cache_options_.compression_type).c_str()); + ret.append(buffer); + snprintf(buffer, kBufferSize, " compress_format_version : %d\n", + cache_options_.compress_format_version); + ret.append(buffer); + return ret; +} + +CompressedSecondaryCache::CacheValueChunk* +CompressedSecondaryCache::SplitValueIntoChunks(const Slice& value, + CompressionType compression_type, + size_t& charge) { + assert(!value.empty()); + const char* src_ptr = value.data(); + size_t src_size{value.size()}; + + CacheValueChunk dummy_head = CacheValueChunk(); + CacheValueChunk* current_chunk = &dummy_head; + // Do not split when value size is large or there is no compression. + size_t predicted_chunk_size{0}; + size_t actual_chunk_size{0}; + size_t tmp_size{0}; + while (src_size > 0) { + predicted_chunk_size = sizeof(CacheValueChunk) - 1 + src_size; + auto upper = + std::upper_bound(malloc_bin_sizes_.begin(), malloc_bin_sizes_.end(), + predicted_chunk_size); + // Do not split when value size is too small, too large, close to a bin + // size, or there is no compression. + if (upper == malloc_bin_sizes_.begin() || + upper == malloc_bin_sizes_.end() || + *upper - predicted_chunk_size < malloc_bin_sizes_.front() || + compression_type == kNoCompression) { + tmp_size = predicted_chunk_size; + } else { + tmp_size = *(--upper); + } + + CacheValueChunk* new_chunk = + reinterpret_cast(new char[tmp_size]); + current_chunk->next = new_chunk; + current_chunk = current_chunk->next; + actual_chunk_size = tmp_size - sizeof(CacheValueChunk) + 1; + memcpy(current_chunk->data, src_ptr, actual_chunk_size); + current_chunk->size = actual_chunk_size; + src_ptr += actual_chunk_size; + src_size -= actual_chunk_size; + charge += tmp_size; + } + current_chunk->next = nullptr; + + return dummy_head.next; +} + +CacheAllocationPtr CompressedSecondaryCache::MergeChunksIntoValue( + const void* chunks_head, size_t& charge) { + const CacheValueChunk* head = + reinterpret_cast(chunks_head); + const CacheValueChunk* current_chunk = head; + charge = 0; + while (current_chunk != nullptr) { + charge += current_chunk->size; + current_chunk = current_chunk->next; + } + + CacheAllocationPtr ptr = + AllocateBlock(charge, cache_options_.memory_allocator.get()); + current_chunk = head; + size_t pos{0}; + while (current_chunk != nullptr) { + memcpy(ptr.get() + pos, current_chunk->data, current_chunk->size); + pos += current_chunk->size; + current_chunk = current_chunk->next; + } + + return ptr; +} + +Cache::DeleterFn CompressedSecondaryCache::GetDeletionCallback( + bool enable_custom_split_merge) { + if (enable_custom_split_merge) { + return [](const Slice& /*key*/, void* obj) { + CacheValueChunk* chunks_head = reinterpret_cast(obj); + while (chunks_head != nullptr) { + CacheValueChunk* tmp_chunk = chunks_head; + chunks_head = chunks_head->next; + tmp_chunk->Free(); + obj = nullptr; + }; + }; + } else { + return [](const Slice& /*key*/, void* obj) { + delete reinterpret_cast(obj); + obj = nullptr; + }; + } +} + +std::shared_ptr NewCompressedSecondaryCache( + size_t capacity, int num_shard_bits, bool strict_capacity_limit, + double high_pri_pool_ratio, double low_pri_pool_ratio, + std::shared_ptr memory_allocator, bool use_adaptive_mutex, + CacheMetadataChargePolicy metadata_charge_policy, + CompressionType compression_type, uint32_t compress_format_version, + bool enable_custom_split_merge) { + return std::make_shared( + capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio, + low_pri_pool_ratio, memory_allocator, use_adaptive_mutex, + metadata_charge_policy, compression_type, compress_format_version, + enable_custom_split_merge); +} + +std::shared_ptr NewCompressedSecondaryCache( + const CompressedSecondaryCacheOptions& opts) { + // The secondary_cache is disabled for this LRUCache instance. + assert(opts.secondary_cache == nullptr); + return NewCompressedSecondaryCache( + opts.capacity, opts.num_shard_bits, opts.strict_capacity_limit, + opts.high_pri_pool_ratio, opts.low_pri_pool_ratio, opts.memory_allocator, + opts.use_adaptive_mutex, opts.metadata_charge_policy, + opts.compression_type, opts.compress_format_version, + opts.enable_custom_split_merge); +} + +} // namespace ROCKSDB_NAMESPACE -- cgit v1.2.3