diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/blob/blob_source.cc | |
parent | Initial commit. (diff) | |
download | ceph-upstream/18.2.2.tar.xz ceph-upstream/18.2.2.zip |
Adding upstream version 18.2.2. (tag: upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/db/blob/blob_source.cc')
-rw-r--r-- | src/rocksdb/db/blob/blob_source.cc | 488 |
1 file changed, 488 insertions, 0 deletions
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/blob/blob_source.h"

#include <cassert>
#include <string>

#include "cache/cache_reservation_manager.h"
#include "cache/charged_cache.h"
#include "db/blob/blob_contents.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_log_format.h"
#include "monitoring/statistics.h"
#include "options/cf_options.h"
#include "table/get_context.h"
#include "table/multiget_context.h"

namespace ROCKSDB_NAMESPACE {

// Constructs a BlobSource that serves blob reads through an optional blob
// cache backed by per-file BlobFileReaders obtained from blob_file_cache.
// If the table options mark CacheEntryRole::kBlobCache as "charged", the
// blob cache is wrapped in a ChargedCache so that its usage is also charged
// against the block cache (non-LITE builds only).
BlobSource::BlobSource(const ImmutableOptions* immutable_options,
                       const std::string& db_id,
                       const std::string& db_session_id,
                       BlobFileCache* blob_file_cache)
    : db_id_(db_id),
      db_session_id_(db_session_id),
      statistics_(immutable_options->statistics.get()),
      blob_file_cache_(blob_file_cache),
      blob_cache_(immutable_options->blob_cache),
      lowest_used_cache_tier_(immutable_options->lowest_used_cache_tier) {
#ifndef ROCKSDB_LITE
  auto bbto =
      immutable_options->table_factory->GetOptions<BlockBasedTableOptions>();
  if (bbto &&
      bbto->cache_usage_options.options_overrides.at(CacheEntryRole::kBlobCache)
              .charged == CacheEntryRoleOptions::Decision::kEnabled) {
    blob_cache_ = std::make_shared<ChargedCache>(immutable_options->blob_cache,
                                                 bbto->block_cache);
  }
#endif  // ROCKSDB_LITE
}

BlobSource::~BlobSource() = default;

// Looks up cache_key in the blob cache. On a hit, wraps the cache handle in
// cached_blob (which keeps the entry pinned), bumps hit statistics, and
// returns OK; on a miss, bumps the miss counter and returns NotFound.
// Precondition: blob_cache_ is configured and cached_blob is empty.
Status BlobSource::GetBlobFromCache(
    const Slice& cache_key, CacheHandleGuard<BlobContents>* cached_blob) const {
  assert(blob_cache_);
  assert(!cache_key.empty());
  assert(cached_blob);
  assert(cached_blob->IsEmpty());

  Cache::Handle* cache_handle = nullptr;
  cache_handle = GetEntryFromCache(cache_key);
  if (cache_handle != nullptr) {
    *cached_blob =
        CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle);

    assert(cached_blob->GetValue());

    PERF_COUNTER_ADD(blob_cache_hit_count, 1);
    RecordTick(statistics_, BLOB_DB_CACHE_HIT);
    RecordTick(statistics_, BLOB_DB_CACHE_BYTES_READ,
               cached_blob->GetValue()->size());

    return Status::OK();
  }

  RecordTick(statistics_, BLOB_DB_CACHE_MISS);

  return Status::NotFound("Blob not found in cache");
}

// Inserts *blob into the blob cache under cache_key. On success, ownership
// of the BlobContents passes to the cache (note the release() below) and
// cached_blob is set to a pinned handle on the inserted entry; on failure,
// *blob is left owned by the caller and the add-failure counter is bumped.
Status BlobSource::PutBlobIntoCache(
    const Slice& cache_key, std::unique_ptr<BlobContents>* blob,
    CacheHandleGuard<BlobContents>* cached_blob) const {
  assert(blob_cache_);
  assert(!cache_key.empty());
  assert(blob);
  assert(*blob);
  assert(cached_blob);
  assert(cached_blob->IsEmpty());

  Cache::Handle* cache_handle = nullptr;
  const Status s = InsertEntryIntoCache(cache_key, blob->get(),
                                        (*blob)->ApproximateMemoryUsage(),
                                        &cache_handle, Cache::Priority::BOTTOM);
  if (s.ok()) {
    // The cache now owns the blob; drop our ownership without deleting.
    blob->release();

    assert(cache_handle != nullptr);
    *cached_blob =
        CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle);

    assert(cached_blob->GetValue());

    RecordTick(statistics_, BLOB_DB_CACHE_ADD);
    RecordTick(statistics_, BLOB_DB_CACHE_BYTES_WRITE,
               cached_blob->GetValue()->size());

  } else {
    RecordTick(statistics_, BLOB_DB_CACHE_ADD_FAILURES);
  }

  return s;
}

// Performs the raw cache lookup for key. When the lowest used cache tier is
// the non-volatile block tier, uses the helper/create-callback overload so
// entries can be promoted from a secondary cache; otherwise does a plain
// lookup. Returns the handle (nullptr on miss); the caller owns the handle.
Cache::Handle* BlobSource::GetEntryFromCache(const Slice& key) const {
  Cache::Handle* cache_handle = nullptr;

  if (lowest_used_cache_tier_ == CacheTier::kNonVolatileBlockTier) {
    Cache::CreateCallback create_cb =
        [allocator = blob_cache_->memory_allocator()](
            const void* buf, size_t size, void** out_obj,
            size_t* charge) -> Status {
      return BlobContents::CreateCallback(AllocateBlock(size, allocator), buf,
                                          size, out_obj, charge);
    };

    cache_handle = blob_cache_->Lookup(key, BlobContents::GetCacheItemHelper(),
                                       create_cb, Cache::Priority::BOTTOM,
                                       true /* wait_for_cache */, statistics_);
  } else {
    cache_handle = blob_cache_->Lookup(key, statistics_);
  }

  return cache_handle;
}

// Exposes a cached blob through value without copying it: the PinnableSlice
// points at the cached buffer and takes over the cache handle, so the entry
// stays pinned for as long as the slice is alive.
void BlobSource::PinCachedBlob(CacheHandleGuard<BlobContents>* cached_blob,
                               PinnableSlice* value) {
  assert(cached_blob);
  assert(cached_blob->GetValue());
  assert(value);

  // To avoid copying the cached blob into the buffer provided by the
  // application, we can simply transfer ownership of the cache handle to
  // the target PinnableSlice. This has the potential to save a lot of
  // CPU, especially with large blob values.

  value->Reset();

  constexpr Cleanable* cleanable = nullptr;
  value->PinSlice(cached_blob->GetValue()->data(), cleanable);

  cached_blob->TransferTo(value);
}

// Exposes an uncached, heap-owned blob through value without copying it:
// ownership of the BlobContents moves to the PinnableSlice, whose cleanup
// function deletes the blob when the slice is reset/destroyed.
void BlobSource::PinOwnedBlob(std::unique_ptr<BlobContents>* owned_blob,
                              PinnableSlice* value) {
  assert(owned_blob);
  assert(*owned_blob);
  assert(value);

  BlobContents* const blob = owned_blob->release();
  assert(blob);

  value->Reset();
  value->PinSlice(
      blob->data(),
      [](void* arg1, void* /* arg2 */) {
        delete static_cast<BlobContents*>(arg1);
      },
      blob, nullptr);
}

// Inserts value into the blob cache with the given charge and priority,
// choosing the helper-based Insert overload when the non-volatile block tier
// is in use (so the entry is eligible for the secondary cache) and the
// deleter-based overload otherwise.
Status BlobSource::InsertEntryIntoCache(const Slice& key, BlobContents* value,
                                        size_t charge,
                                        Cache::Handle** cache_handle,
                                        Cache::Priority priority) const {
  Status s;

  Cache::CacheItemHelper* const cache_item_helper =
      BlobContents::GetCacheItemHelper();
  assert(cache_item_helper);

  if (lowest_used_cache_tier_ == CacheTier::kNonVolatileBlockTier) {
    s = blob_cache_->Insert(key, value, cache_item_helper, charge, cache_handle,
                            priority);
  } else {
    s = blob_cache_->Insert(key, value, charge, cache_item_helper->del_cb,
                            cache_handle, priority);
  }

  return s;
}

// Reads a single blob identified by (file_number, offset, value_size).
// Order of attempts: blob cache (if configured) -> blob file on disk (unless
// read_options.read_tier forbids I/O, in which case Incomplete is returned).
// A blob read from the file is optionally inserted into the cache when
// read_options.fill_cache is set. On success, value pins the blob contents
// and *bytes_read (if non-null) is set to the on-disk record size (the
// value size plus a checksum-dependent header adjustment on a cache hit, or
// the actual bytes read from the file otherwise).
Status BlobSource::GetBlob(const ReadOptions& read_options,
                           const Slice& user_key, uint64_t file_number,
                           uint64_t offset, uint64_t file_size,
                           uint64_t value_size,
                           CompressionType compression_type,
                           FilePrefetchBuffer* prefetch_buffer,
                           PinnableSlice* value, uint64_t* bytes_read) {
  assert(value);

  Status s;

  const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);

  CacheHandleGuard<BlobContents> blob_handle;

  // First, try to get the blob from the cache
  //
  // If blob cache is enabled, we'll try to read from it.
  if (blob_cache_) {
    Slice key = cache_key.AsSlice();
    s = GetBlobFromCache(key, &blob_handle);
    if (s.ok()) {
      PinCachedBlob(&blob_handle, value);

      // For consistency, the size of on-disk (possibly compressed) blob record
      // is assigned to bytes_read.
      uint64_t adjustment =
          read_options.verify_checksums
              ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
                    user_key.size())
              : 0;
      assert(offset >= adjustment);

      uint64_t record_size = value_size + adjustment;
      if (bytes_read) {
        *bytes_read = record_size;
      }
      return s;
    }
  }

  assert(blob_handle.IsEmpty());

  const bool no_io = read_options.read_tier == kBlockCacheTier;
  if (no_io) {
    s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
    return s;
  }

  // Can't find the blob from the cache. Since I/O is allowed, read from the
  // file.
  std::unique_ptr<BlobContents> blob_contents;

  {
    CacheHandleGuard<BlobFileReader> blob_file_reader;
    s = blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader);
    if (!s.ok()) {
      return s;
    }

    assert(blob_file_reader.GetValue());

    // The compression type recorded in the blob index must match the one in
    // the blob file's header/footer.
    if (compression_type != blob_file_reader.GetValue()->GetCompressionType()) {
      return Status::Corruption("Compression type mismatch when reading blob");
    }

    // Use the cache's allocator only when the blob may end up in the cache,
    // so the buffer is compatible with the cache's memory accounting.
    MemoryAllocator* const allocator = (blob_cache_ && read_options.fill_cache)
                                           ? blob_cache_->memory_allocator()
                                           : nullptr;

    uint64_t read_size = 0;
    s = blob_file_reader.GetValue()->GetBlob(
        read_options, user_key, offset, value_size, compression_type,
        prefetch_buffer, allocator, &blob_contents, &read_size);
    if (!s.ok()) {
      return s;
    }
    if (bytes_read) {
      *bytes_read = read_size;
    }
  }

  if (blob_cache_ && read_options.fill_cache) {
    // If filling cache is allowed and a cache is configured, try to put the
    // blob to the cache.
    Slice key = cache_key.AsSlice();
    s = PutBlobIntoCache(key, &blob_contents, &blob_handle);
    if (!s.ok()) {
      return s;
    }

    PinCachedBlob(&blob_handle, value);
  } else {
    PinOwnedBlob(&blob_contents, value);
  }

  assert(s.ok());
  return s;
}

// Reads a batch of blobs that may span multiple blob files. Requests for
// each file are sorted by offset and delegated to MultiGetBlobFromOneFile;
// per-request statuses are reported through each request's status pointer,
// and *bytes_read (if non-null) accumulates the bytes read across all files.
void BlobSource::MultiGetBlob(const ReadOptions& read_options,
                              autovector<BlobFileReadRequests>& blob_reqs,
                              uint64_t* bytes_read) {
  assert(blob_reqs.size() > 0);

  uint64_t total_bytes_read = 0;
  uint64_t bytes_read_in_file = 0;

  for (auto& [file_number, file_size, blob_reqs_in_file] : blob_reqs) {
    // sort blob_reqs_in_file by file offset.
    std::sort(
        blob_reqs_in_file.begin(), blob_reqs_in_file.end(),
        [](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool {
          return lhs.offset < rhs.offset;
        });

    MultiGetBlobFromOneFile(read_options, file_number, file_size,
                            blob_reqs_in_file, &bytes_read_in_file);

    total_bytes_read += bytes_read_in_file;
  }

  if (bytes_read) {
    *bytes_read = total_bytes_read;
  }
}

// Reads a batch of blobs that all reside in one blob file. Cache hits are
// served first and recorded in a bitmask; the remaining requests are either
// failed with Incomplete (when I/O is disallowed) or read from the file in
// one MultiGetBlob call on the file reader, then optionally inserted into
// the blob cache. Precondition: blob_reqs is sorted by offset and its size
// does not exceed MultiGetContext::MAX_BATCH_SIZE (the hit mask is 64 bits).
void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
                                         uint64_t file_number,
                                         uint64_t /*file_size*/,
                                         autovector<BlobReadRequest>& blob_reqs,
                                         uint64_t* bytes_read) {
  const size_t num_blobs = blob_reqs.size();
  assert(num_blobs > 0);
  assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);

#ifndef NDEBUG
  for (size_t i = 0; i < num_blobs - 1; ++i) {
    assert(blob_reqs[i].offset <= blob_reqs[i + 1].offset);
  }
#endif  // !NDEBUG

  using Mask = uint64_t;
  Mask cache_hit_mask = 0;

  uint64_t total_bytes = 0;
  const OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number);

  if (blob_cache_) {
    size_t cached_blob_count = 0;
    for (size_t i = 0; i < num_blobs; ++i) {
      auto& req = blob_reqs[i];

      CacheHandleGuard<BlobContents> blob_handle;
      const CacheKey cache_key = base_cache_key.WithOffset(req.offset);
      const Slice key = cache_key.AsSlice();

      const Status s = GetBlobFromCache(key, &blob_handle);

      if (s.ok()) {
        assert(req.status);
        *req.status = s;

        PinCachedBlob(&blob_handle, req.result);

        // Update the counter for the number of valid blobs read from the cache.
        ++cached_blob_count;

        // For consistency, the size of each on-disk (possibly compressed) blob
        // record is accumulated to total_bytes.
        uint64_t adjustment =
            read_options.verify_checksums
                ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
                      req.user_key->size())
                : 0;
        assert(req.offset >= adjustment);
        total_bytes += req.len + adjustment;
        cache_hit_mask |= (Mask{1} << i);  // cache hit
      }
    }

    // All blobs were read from the cache.
    if (cached_blob_count == num_blobs) {
      if (bytes_read) {
        *bytes_read = total_bytes;
      }
      return;
    }
  }

  const bool no_io = read_options.read_tier == kBlockCacheTier;
  if (no_io) {
    // I/O is disallowed: fail every request that was not a cache hit.
    for (size_t i = 0; i < num_blobs; ++i) {
      if (!(cache_hit_mask & (Mask{1} << i))) {
        BlobReadRequest& req = blob_reqs[i];
        assert(req.status);

        *req.status =
            Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
      }
    }
    return;
  }

  {
    // Find the rest of blobs from the file since I/O is allowed.
    autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
        _blob_reqs;
    uint64_t _bytes_read = 0;

    for (size_t i = 0; i < num_blobs; ++i) {
      if (!(cache_hit_mask & (Mask{1} << i))) {
        _blob_reqs.emplace_back(&blob_reqs[i], std::unique_ptr<BlobContents>());
      }
    }

    CacheHandleGuard<BlobFileReader> blob_file_reader;
    Status s =
        blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader);
    if (!s.ok()) {
      // Could not open/obtain the file reader: propagate the error to every
      // remaining request.
      for (size_t i = 0; i < _blob_reqs.size(); ++i) {
        BlobReadRequest* const req = _blob_reqs[i].first;
        assert(req);
        assert(req->status);

        *req->status = s;
      }
      return;
    }

    assert(blob_file_reader.GetValue());

    // Use the cache's allocator only when the blobs may end up in the cache.
    MemoryAllocator* const allocator = (blob_cache_ && read_options.fill_cache)
                                           ? blob_cache_->memory_allocator()
                                           : nullptr;

    blob_file_reader.GetValue()->MultiGetBlob(read_options, allocator,
                                              _blob_reqs, &_bytes_read);

    if (blob_cache_ && read_options.fill_cache) {
      // If filling cache is allowed and a cache is configured, try to put
      // the blob(s) to the cache.
      for (auto& [req, blob_contents] : _blob_reqs) {
        assert(req);

        if (req->status->ok()) {
          CacheHandleGuard<BlobContents> blob_handle;
          const CacheKey cache_key = base_cache_key.WithOffset(req->offset);
          const Slice key = cache_key.AsSlice();
          s = PutBlobIntoCache(key, &blob_contents, &blob_handle);
          if (!s.ok()) {
            *req->status = s;
          } else {
            PinCachedBlob(&blob_handle, req->result);
          }
        }
      }
    } else {
      for (auto& [req, blob_contents] : _blob_reqs) {
        assert(req);

        if (req->status->ok()) {
          PinOwnedBlob(&blob_contents, req->result);
        }
      }
    }

    total_bytes += _bytes_read;
    if (bytes_read) {
      *bytes_read = total_bytes;
    }
  }
}

// Test-only helper: returns true iff the blob at (file_number, file_size,
// offset) is currently present in the blob cache, optionally reporting the
// cache charge of the entry through *charge.
bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
                                  uint64_t offset, size_t* charge) const {
  const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);
  const Slice key = cache_key.AsSlice();

  CacheHandleGuard<BlobContents> blob_handle;
  const Status s = GetBlobFromCache(key, &blob_handle);

  if (s.ok() && blob_handle.GetValue() != nullptr) {
    if (charge) {
      const Cache* const cache = blob_handle.GetCache();
      assert(cache);

      Cache::Handle* const handle = blob_handle.GetCacheHandle();
      assert(handle);

      *charge = cache->GetUsage(handle);
    }

    return true;
  }

  return false;
}

}  // namespace ROCKSDB_NAMESPACE