Diffstat
-rw-r--r--  src/rocksdb/table/block_based/block_based_table_reader_sync_and_async.h  760
1 file changed, 760 insertions, 0 deletions
diff --git a/src/rocksdb/table/block_based/block_based_table_reader_sync_and_async.h b/src/rocksdb/table/block_based/block_based_table_reader_sync_and_async.h
new file mode 100644
index 000000000..8c7547a2a
--- /dev/null
+++ b/src/rocksdb/table/block_based/block_based_table_reader_sync_and_async.h
@@ -0,0 +1,760 @@
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+//
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "util/async_file_reader.h"
+#include "util/coro_utils.h"
+
+#if defined(WITHOUT_COROUTINES) || \
+ (defined(USE_COROUTINES) && defined(WITH_COROUTINES))
+
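+// This file is meant to be compiled in two flavors: with WITHOUT_COROUTINES
+// defined it provides the synchronous variants of the functions below, and,
+// when coroutine support is built (USE_COROUTINES), with WITH_COROUTINES
+// defined it provides the async variants. As an illustrative sketch (the
+// actual macro definitions live in util/coro_utils.h),
+// DEFINE_SYNC_AND_ASYNC(void, Foo) declares a plain synchronous `void Foo`
+// in the sync pass and a coroutine variant in the async pass, while
+// CO_AWAIT()/CO_RETURN expand to a direct call and `return` in the sync pass
+// and to `co_await`/`co_return` in the async pass.
+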
+namespace ROCKSDB_NAMESPACE {
+
+// This function reads multiple data blocks from disk using
+// RandomAccessFileReader::MultiRead() and optionally inserts them into the
+// block cache. It uses the scratch buffer provided by the caller, which is
+// contiguous. If scratch is nullptr, it allocates a separate buffer for each
+// block. Typically, if the blocks
+// need to be uncompressed and there is no compressed block cache, callers
+// can allocate a temporary scratch buffer in order to minimize memory
+// allocations.
+// If options.fill_cache is true, it inserts the blocks into cache. If it's
+// false and scratch is non-null and the blocks are uncompressed, it copies
+// the buffers to heap. In any case, the CachableEntry<Block> returned will
+// own the data bytes.
+// If compression is enabled and there is no compressed block cache, adjacent
+// blocks are read in one IO (combined read).
+// batch - A MultiGetRange with only those keys with unique data blocks not
+// found in cache
+// handles - A vector of block handles. Some of them may be NULL handles
+// scratch - An optional contiguous buffer to read compressed blocks into
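+// statuses - Output vector of per-block Status, indexed like handles
+// results - Output vector of CachableEntry<Block> for the retrieved blocks,
+// indexed like handles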
+DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
+(const ReadOptions& options, const MultiGetRange* batch,
+ const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
+ autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
+ autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
+ char* scratch, const UncompressionDict& uncompression_dict) const {
+ RandomAccessFileReader* file = rep_->file.get();
+ const Footer& footer = rep_->footer;
+ const ImmutableOptions& ioptions = rep_->ioptions;
+ size_t read_amp_bytes_per_bit = rep_->table_options.read_amp_bytes_per_bit;
+ MemoryAllocator* memory_allocator = GetMemoryAllocator(rep_->table_options);
+
+ if (ioptions.allow_mmap_reads) {
+ size_t idx_in_batch = 0;
+ for (auto mget_iter = batch->begin(); mget_iter != batch->end();
+ ++mget_iter, ++idx_in_batch) {
+ BlockCacheLookupContext lookup_data_block_context(
+ TableReaderCaller::kUserMultiGet);
+ const BlockHandle& handle = (*handles)[idx_in_batch];
+ if (handle.IsNull()) {
+ continue;
+ }
+
+ (*statuses)[idx_in_batch] =
+ RetrieveBlock(nullptr, options, handle, uncompression_dict,
+ &(*results)[idx_in_batch], BlockType::kData,
+ mget_iter->get_context, &lookup_data_block_context,
+ /* for_compaction */ false, /* use_cache */ true,
+ /* wait_for_cache */ true, /* async_read */ false);
+ }
+ CO_RETURN;
+ }
+
+ // In direct IO mode, blocks share the direct io buffer.
+ // Otherwise, blocks share the scratch buffer if one was provided.
+ const bool use_shared_buffer = file->use_direct_io() || scratch != nullptr;
+
+ autovector<FSReadRequest, MultiGetContext::MAX_BATCH_SIZE> read_reqs;
+ size_t buf_offset = 0;
+ size_t idx_in_batch = 0;
+
+ uint64_t prev_offset = 0;
+ size_t prev_len = 0;
+ autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_idx_for_block;
+ autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_offset_for_block;
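+ // For the i-th non-null block handle in the batch, req_idx_for_block[i]
+ // is the index of the FSReadRequest that covers it, and
+ // req_offset_for_block[i] is that block's byte offset within the
+ // (possibly combined) request.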
+ for (auto mget_iter = batch->begin(); mget_iter != batch->end();
+ ++mget_iter, ++idx_in_batch) {
+ const BlockHandle& handle = (*handles)[idx_in_batch];
+ if (handle.IsNull()) {
+ continue;
+ }
+
+ size_t prev_end = static_cast<size_t>(prev_offset) + prev_len;
+
+ // If the current block is adjacent to the previous one, compression is
+ // enabled, and there is no compressed block cache, we combine the two
+ // block reads into one.
+ // We don't combine block reads in direct IO mode, because direct IO read
+ // requests are realigned and merged when necessary anyway.
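+ // Illustrative example (hypothetical sizes): if block A occupies file
+ // bytes [0, 4096) and block B bytes [4096, 8192), and the shared scratch
+ // buffer is in use on a buffered (non-direct-IO) file, both blocks are
+ // served by a single FSReadRequest with offset 0 and len 8192;
+ // req_offset_for_block then records 0 for A and 4096 for B.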
+ if (use_shared_buffer && !file->use_direct_io() &&
+ prev_end == handle.offset()) {
+ req_offset_for_block.emplace_back(prev_len);
+ prev_len += BlockSizeWithTrailer(handle);
+ } else {
+ // No compression, or the current block and the previous one are not
+ // adjacent:
+ // Step 1, create a new request for previous blocks
+ if (prev_len != 0) {
+ FSReadRequest req;
+ req.offset = prev_offset;
+ req.len = prev_len;
+ if (file->use_direct_io()) {
+ req.scratch = nullptr;
+ } else if (use_shared_buffer) {
+ req.scratch = scratch + buf_offset;
+ buf_offset += req.len;
+ } else {
+ req.scratch = new char[req.len];
+ }
+ read_reqs.emplace_back(req);
+ }
+
+ // Step 2, remember the previous block info
+ prev_offset = handle.offset();
+ prev_len = BlockSizeWithTrailer(handle);
+ req_offset_for_block.emplace_back(0);
+ }
+ req_idx_for_block.emplace_back(read_reqs.size());
+
+ PERF_COUNTER_ADD(block_read_count, 1);
+ PERF_COUNTER_ADD(block_read_byte, BlockSizeWithTrailer(handle));
+ }
+ // Handle the last block and process the pending last request
+ if (prev_len != 0) {
+ FSReadRequest req;
+ req.offset = prev_offset;
+ req.len = prev_len;
+ if (file->use_direct_io()) {
+ req.scratch = nullptr;
+ } else if (use_shared_buffer) {
+ req.scratch = scratch + buf_offset;
+ } else {
+ req.scratch = new char[req.len];
+ }
+ read_reqs.emplace_back(req);
+ }
+
+ AlignedBuf direct_io_buf;
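+ // In the async (coroutine) variant, buffered reads are issued through the
+ // batch's AsyncFileReader so the co_await below can overlap this read with
+ // reads from other files in the same MultiGet; direct IO reads still use
+ // the synchronous MultiRead path and fill direct_io_buf.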
+ {
+ IOOptions opts;
+ IOStatus s = file->PrepareIOOptions(options, opts);
+ if (s.ok()) {
+#if defined(WITH_COROUTINES)
+ if (file->use_direct_io()) {
+#endif // WITH_COROUTINES
+ s = file->MultiRead(opts, &read_reqs[0], read_reqs.size(),
+ &direct_io_buf, options.rate_limiter_priority);
+#if defined(WITH_COROUTINES)
+ } else {
+ co_await batch->context()->reader().MultiReadAsync(
+ file, opts, &read_reqs[0], read_reqs.size(), &direct_io_buf);
+ }
+#endif // WITH_COROUTINES
+ }
+ if (!s.ok()) {
+ // Discard all the results in this batch if there is a timeout or an
+ // overall MultiRead error
+ for (FSReadRequest& req : read_reqs) {
+ req.status = s;
+ }
+ }
+ }
+
+ idx_in_batch = 0;
+ size_t valid_batch_idx = 0;
+ for (auto mget_iter = batch->begin(); mget_iter != batch->end();
+ ++mget_iter, ++idx_in_batch) {
+ const BlockHandle& handle = (*handles)[idx_in_batch];
+
+ if (handle.IsNull()) {
+ continue;
+ }
+
+ assert(valid_batch_idx < req_idx_for_block.size());
+ assert(valid_batch_idx < req_offset_for_block.size());
+ assert(req_idx_for_block[valid_batch_idx] < read_reqs.size());
+ size_t& req_idx = req_idx_for_block[valid_batch_idx];
+ size_t& req_offset = req_offset_for_block[valid_batch_idx];
+ valid_batch_idx++;
+ FSReadRequest& req = read_reqs[req_idx];
+ Status s = req.status;
+ if (s.ok()) {
+ if ((req.result.size() != req.len) ||
+ (req_offset + BlockSizeWithTrailer(handle) > req.result.size())) {
+ s = Status::Corruption("truncated block read from " +
+ rep_->file->file_name() + " offset " +
+ std::to_string(handle.offset()) + ", expected " +
+ std::to_string(req.len) + " bytes, got " +
+ std::to_string(req.result.size()));
+ }
+ }
+
+ BlockContents serialized_block;
+ if (s.ok()) {
+ if (!use_shared_buffer) {
+ // We allocated a buffer for this block. Give ownership of it to
+ // BlockContents so it can free the memory
+ assert(req.result.data() == req.scratch);
+ assert(req.result.size() == BlockSizeWithTrailer(handle));
+ assert(req_offset == 0);
+ serialized_block =
+ BlockContents(std::unique_ptr<char[]>(req.scratch), handle.size());
+ } else {
+ // We used the scratch buffer or direct io buffer
+ // which are shared by the blocks.
+ // serialized_block does not have the ownership.
+ serialized_block =
+ BlockContents(Slice(req.result.data() + req_offset, handle.size()));
+ }
+#ifndef NDEBUG
+ serialized_block.has_trailer = true;
+#endif
+
+ if (options.verify_checksums) {
+ PERF_TIMER_GUARD(block_checksum_time);
+ const char* data = req.result.data();
+ // Since the scratch might be shared, the offset of the data block in
+ // the buffer might not be 0. req.result.data() only points to the
+ // beginning of each read request, so we need to add the block's offset
+ // within that request. The checksum is stored in the block trailer,
+ // beyond the payload size.
+ s = VerifyBlockChecksum(footer.checksum_type(), data + req_offset,
+ handle.size(), rep_->file->file_name(),
+ handle.offset());
+ TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
+ }
+ } else if (!use_shared_buffer) {
+ // Free the allocated scratch buffer.
+ delete[] req.scratch;
+ }
+
+ if (s.ok()) {
+ // When the blocks share the same underlying buffer (scratch or direct io
+ // buffer), we may need to manually copy the block into heap if the
+ // serialized block has to be inserted into a cache. That falls into the
+ // following cases -
+ // 1. serialized block is not compressed, it needs to be inserted into
+ // the uncompressed block cache if there is one
+ // 2. If the serialized block is compressed, it needs to be inserted
+ // into the compressed block cache if there is one
+ //
+ // In all other cases, the serialized block is either uncompressed into a
+ // heap buffer or there is no cache at all.
+ CompressionType compression_type =
+ GetBlockCompressionType(serialized_block);
+ if (use_shared_buffer && (compression_type == kNoCompression ||
+ (compression_type != kNoCompression &&
+ rep_->table_options.block_cache_compressed))) {
+ Slice serialized =
+ Slice(req.result.data() + req_offset, BlockSizeWithTrailer(handle));
+ serialized_block = BlockContents(
+ CopyBufferToHeap(GetMemoryAllocator(rep_->table_options),
+ serialized),
+ handle.size());
+#ifndef NDEBUG
+ serialized_block.has_trailer = true;
+#endif
+ }
+ }
+
+ if (s.ok()) {
+ if (options.fill_cache) {
+ BlockCacheLookupContext lookup_data_block_context(
+ TableReaderCaller::kUserMultiGet);
+ CachableEntry<Block>* block_entry = &(*results)[idx_in_batch];
+ // MaybeReadBlockAndLoadToCache will insert into the block caches if
+ // necessary. Since we're passing the serialized block contents, it
+ // will avoid looking up the block cache
+ s = MaybeReadBlockAndLoadToCache(
+ nullptr, options, handle, uncompression_dict, /*wait=*/true,
+ /*for_compaction=*/false, block_entry, BlockType::kData,
+ mget_iter->get_context, &lookup_data_block_context,
+ &serialized_block, /*async_read=*/false);
+
+ // block_entry value could be null if no block cache is present, i.e.
+ // BlockBasedTableOptions::no_block_cache is true and no compressed
+ // block cache is configured. In that case, fall through and set up
+ // the block explicitly
+ if (block_entry->GetValue() != nullptr) {
+ s.PermitUncheckedError();
+ continue;
+ }
+ }
+
+ CompressionType compression_type =
+ GetBlockCompressionType(serialized_block);
+ BlockContents contents;
+ if (compression_type != kNoCompression) {
+ UncompressionContext context(compression_type);
+ UncompressionInfo info(context, uncompression_dict, compression_type);
+ s = UncompressSerializedBlock(
+ info, req.result.data() + req_offset, handle.size(), &contents,
+ footer.format_version(), rep_->ioptions, memory_allocator);
+ } else {
+ // There are two cases here:
+ // 1) caller uses the shared buffer (scratch or direct io buffer);
+ // 2) we use the request buffer.
+ // If the scratch buffer or direct io buffer is used, we ensure that
+ // all serialized blocks are copied to the heap as single blocks. If the
+ // scratch buffer is not used, we also have no combined read, so the
+ // serialized block can be used directly.
+ contents = std::move(serialized_block);
+ }
+ if (s.ok()) {
+ (*results)[idx_in_batch].SetOwnedValue(std::make_unique<Block>(
+ std::move(contents), read_amp_bytes_per_bit, ioptions.stats));
+ }
+ }
+ (*statuses)[idx_in_batch] = s;
+ }
+}
+
+using MultiGetRange = MultiGetContext::Range;
+DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet)
+(const ReadOptions& read_options, const MultiGetRange* mget_range,
+ const SliceTransform* prefix_extractor, bool skip_filters) {
+ if (mget_range->empty()) {
+ // Caller should ensure non-empty (performance bug)
+ assert(false);
+ CO_RETURN; // Nothing to do
+ }
+
+ FilterBlockReader* const filter =
+ !skip_filters ? rep_->filter.get() : nullptr;
+ MultiGetRange sst_file_range(*mget_range, mget_range->begin(),
+ mget_range->end());
+
+ // First check the full filter.
+ // If the full filter is not useful, then go into each block.
+ const bool no_io = read_options.read_tier == kBlockCacheTier;
+ uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
+ if (sst_file_range.begin()->get_context) {
+ tracing_mget_id = sst_file_range.begin()->get_context->get_tracing_get_id();
+ }
+ BlockCacheLookupContext lookup_context{
+ TableReaderCaller::kUserMultiGet, tracing_mget_id,
+ /*_get_from_user_specified_snapshot=*/read_options.snapshot != nullptr};
+ FullFilterKeysMayMatch(filter, &sst_file_range, no_io, prefix_extractor,
+ &lookup_context, read_options.rate_limiter_priority);
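+ // Keys that the filter rules out are skipped within sst_file_range, so
+ // only the remaining candidates proceed to the index and data block
+ // lookups below.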
+
+ if (!sst_file_range.empty()) {
+ IndexBlockIter iiter_on_stack;
+ // If the prefix_extractor found in the block differs from the one in the
+ // options, disable BlockPrefixIndex. Only do this check when index_type
+ // is kHashSearch.
+ bool need_upper_bound_check = false;
+ if (rep_->index_type == BlockBasedTableOptions::kHashSearch) {
+ need_upper_bound_check = PrefixExtractorChanged(prefix_extractor);
+ }
+ auto iiter =
+ NewIndexIterator(read_options, need_upper_bound_check, &iiter_on_stack,
+ sst_file_range.begin()->get_context, &lookup_context);
+ std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
+ if (iiter != &iiter_on_stack) {
+ iiter_unique_ptr.reset(iiter);
+ }
+
+ uint64_t prev_offset = std::numeric_limits<uint64_t>::max();
+ autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE> block_handles;
+ autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE> results;
+ autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
+ MultiGetContext::Mask reused_mask = 0;
+ char stack_buf[kMultiGetReadStackBufSize];
+ std::unique_ptr<char[]> block_buf;
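+ // reused_mask marks blocks whose contents will be reused by a later key
+ // in the batch (i.e. the later key maps to the same data block).
+ // stack_buf (or block_buf, if the total read size exceeds
+ // kMultiGetReadStackBufSize) provides the shared scratch space for the
+ // combined reads when compressed blocks do not need to be kept around.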
+ {
+ MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(),
+ sst_file_range.end());
+ std::vector<Cache::Handle*> cache_handles;
+ bool wait_for_cache_results = false;
+
+ CachableEntry<UncompressionDict> uncompression_dict;
+ Status uncompression_dict_status;
+ uncompression_dict_status.PermitUncheckedError();
+ bool uncompression_dict_inited = false;
+ size_t total_len = 0;
+ ReadOptions ro = read_options;
+ ro.read_tier = kBlockCacheTier;
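+ // This first pass over the keys only probes the block cache (note the
+ // kBlockCacheTier read tier above); blocks not found in the cache are
+ // recorded in block_handles so they can be fetched from the file in one
+ // batched read below.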
+
+ for (auto miter = data_block_range.begin();
+ miter != data_block_range.end(); ++miter) {
+ const Slice& key = miter->ikey;
+ iiter->Seek(miter->ikey);
+
+ IndexValue v;
+ if (iiter->Valid()) {
+ v = iiter->value();
+ }
+ if (!iiter->Valid() ||
+ (!v.first_internal_key.empty() && !skip_filters &&
+ UserComparatorWrapper(rep_->internal_comparator.user_comparator())
+ .CompareWithoutTimestamp(
+ ExtractUserKey(key),
+ ExtractUserKey(v.first_internal_key)) < 0)) {
+ // The requested key falls between the highest key in the previous
+ // block and the lowest key in the current block.
+ if (!iiter->status().IsNotFound()) {
+ *(miter->s) = iiter->status();
+ }
+ data_block_range.SkipKey(miter);
+ sst_file_range.SkipKey(miter);
+ continue;
+ }
+
+ if (!uncompression_dict_inited && rep_->uncompression_dict_reader) {
+ uncompression_dict_status =
+ rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
+ nullptr /* prefetch_buffer */, no_io,
+ read_options.verify_checksums,
+ sst_file_range.begin()->get_context, &lookup_context,
+ &uncompression_dict);
+ uncompression_dict_inited = true;
+ }
+
+ if (!uncompression_dict_status.ok()) {
+ assert(!uncompression_dict_status.IsNotFound());
+ *(miter->s) = uncompression_dict_status;
+ data_block_range.SkipKey(miter);
+ sst_file_range.SkipKey(miter);
+ continue;
+ }
+
+ statuses.emplace_back();
+ results.emplace_back();
+ if (v.handle.offset() == prev_offset) {
+ // This key can reuse the previous block (later on).
+ // Mark previous as "reused"
+ reused_mask |= MultiGetContext::Mask{1} << (block_handles.size() - 1);
+ // Use null handle to indicate this one reuses same block as
+ // previous.
+ block_handles.emplace_back(BlockHandle::NullBlockHandle());
+ continue;
+ }
+ // Look up the cache for the given data block referenced by an index
+ // iterator value (i.e. BlockHandle). If it exists in the cache,
+ // initialize block to the contents of the data block.
+ prev_offset = v.handle.offset();
+ BlockHandle handle = v.handle;
+ BlockCacheLookupContext lookup_data_block_context(
+ TableReaderCaller::kUserMultiGet);
+ const UncompressionDict& dict = uncompression_dict.GetValue()
+ ? *uncompression_dict.GetValue()
+ : UncompressionDict::GetEmptyDict();
+ Status s = RetrieveBlock(
+ nullptr, ro, handle, dict, &(results.back()), BlockType::kData,
+ miter->get_context, &lookup_data_block_context,
+ /* for_compaction */ false, /* use_cache */ true,
+ /* wait_for_cache */ false, /* async_read */ false);
+ if (s.IsIncomplete()) {
+ s = Status::OK();
+ }
+ if (s.ok() && !results.back().IsEmpty()) {
+ // Since we have a valid handle, check the value. If it's nullptr,
+ // it means the cache is waiting for the final result and we're
+ // supposed to call WaitAll() to wait for the result.
+ if (results.back().GetValue() != nullptr) {
+ // Found it in the cache. Add NULL handle to indicate there is
+ // nothing to read from disk.
+ if (results.back().GetCacheHandle()) {
+ results.back().UpdateCachedValue();
+ }
+ block_handles.emplace_back(BlockHandle::NullBlockHandle());
+ } else {
+ // We have to wait for the cache lookup to finish in the
+ // background, and then we may have to read the block from disk
+ // anyway
+ assert(results.back().GetCacheHandle());
+ wait_for_cache_results = true;
+ block_handles.emplace_back(handle);
+ cache_handles.emplace_back(results.back().GetCacheHandle());
+ }
+ } else {
+ block_handles.emplace_back(handle);
+ total_len += BlockSizeWithTrailer(handle);
+ }
+ }
+
+ if (wait_for_cache_results) {
+ Cache* block_cache = rep_->table_options.block_cache.get();
+ block_cache->WaitAll(cache_handles);
+ for (size_t i = 0; i < block_handles.size(); ++i) {
+ // Skip this block if it was already a success or a failure, or if it
+ // is not needed because the corresponding key is in the same block as
+ // a prior key
+ if (block_handles[i] == BlockHandle::NullBlockHandle() ||
+ results[i].IsEmpty()) {
+ continue;
+ }
+ results[i].UpdateCachedValue();
+ void* val = results[i].GetValue();
+ Cache::Handle* handle = results[i].GetCacheHandle();
+ // GetContext for any key will do, as the stats will be aggregated
+ // anyway
+ GetContext* get_context = sst_file_range.begin()->get_context;
+ if (!val) {
+ // The async cache lookup failed - could be due to an error
+ // or a false positive. We need to read the data block from
+ // the SST file
+ results[i].Reset();
+ total_len += BlockSizeWithTrailer(block_handles[i]);
+ UpdateCacheMissMetrics(BlockType::kData, get_context);
+ } else {
+ block_handles[i] = BlockHandle::NullBlockHandle();
+ UpdateCacheHitMetrics(BlockType::kData, get_context,
+ block_cache->GetUsage(handle));
+ }
+ }
+ }
+
+ if (total_len) {
+ char* scratch = nullptr;
+ const UncompressionDict& dict = uncompression_dict.GetValue()
+ ? *uncompression_dict.GetValue()
+ : UncompressionDict::GetEmptyDict();
+ assert(uncompression_dict_inited || !rep_->uncompression_dict_reader);
+ assert(uncompression_dict_status.ok());
+ // If using direct IO, then scratch is not used, so keep it nullptr.
+ // If the blocks need to be uncompressed and we don't need the
+ // compressed blocks, then we can use a contiguous block of
+ // memory to read in all the blocks as it will be temporary
+ // storage
+ // 1. If blocks are compressed and compressed block cache is there,
+ // alloc heap bufs
+ // 2. If blocks are uncompressed, alloc heap bufs
+ // 3. If blocks are compressed and no compressed block cache, use
+ // stack buf
+ if (!rep_->file->use_direct_io() &&
+ rep_->table_options.block_cache_compressed == nullptr &&
+ rep_->blocks_maybe_compressed) {
+ if (total_len <= kMultiGetReadStackBufSize) {
+ scratch = stack_buf;
+ } else {
+ scratch = new char[total_len];
+ block_buf.reset(scratch);
+ }
+ }
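+ // In the async variant, the CO_AWAIT below is the point where this
+ // coroutine may suspend while the batched read is in flight.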
+ CO_AWAIT(RetrieveMultipleBlocks)
+ (read_options, &data_block_range, &block_handles, &statuses, &results,
+ scratch, dict);
+ if (sst_file_range.begin()->get_context) {
+ ++(sst_file_range.begin()
+ ->get_context->get_context_stats_.num_sst_read);
+ }
+ }
+ }
+
+ DataBlockIter first_biter;
+ DataBlockIter next_biter;
+ size_t idx_in_batch = 0;
+ SharedCleanablePtr shared_cleanable;
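+ // Second pass over the keys: iterate the data block(s) that may contain
+ // each remaining key. first_biter is set up from the block obtained by
+ // the cache lookup / batched read above (or reuses the previous key's
+ // block); next_biter is used if the search continues into subsequent
+ // blocks. shared_cleanable allows multiple keys that pin the same cached
+ // block to share a single release of that cache handle.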
+ for (auto miter = sst_file_range.begin(); miter != sst_file_range.end();
+ ++miter) {
+ Status s;
+ GetContext* get_context = miter->get_context;
+ const Slice& key = miter->ikey;
+ bool matched = false; // if such user key matched a key in SST
+ bool done = false;
+ bool first_block = true;
+ do {
+ DataBlockIter* biter = nullptr;
+ bool reusing_prev_block;
+ bool later_reused;
+ uint64_t referenced_data_size = 0;
+ bool does_referenced_key_exist = false;
+ BlockCacheLookupContext lookup_data_block_context(
+ TableReaderCaller::kUserMultiGet, tracing_mget_id,
+ /*_get_from_user_specified_snapshot=*/read_options.snapshot !=
+ nullptr);
+ if (first_block) {
+ if (!block_handles[idx_in_batch].IsNull() ||
+ !results[idx_in_batch].IsEmpty()) {
+ first_biter.Invalidate(Status::OK());
+ NewDataBlockIterator<DataBlockIter>(
+ read_options, results[idx_in_batch], &first_biter,
+ statuses[idx_in_batch]);
+ reusing_prev_block = false;
+ } else {
+ // If the handle is null and the result is empty, then the status was
+ // never set, so it should still be the initial value: ok().
+ assert(statuses[idx_in_batch].ok());
+ reusing_prev_block = true;
+ }
+ biter = &first_biter;
+ later_reused =
+ (reused_mask & (MultiGetContext::Mask{1} << idx_in_batch)) != 0;
+ idx_in_batch++;
+ } else {
+ IndexValue v = iiter->value();
+ if (!v.first_internal_key.empty() && !skip_filters &&
+ UserComparatorWrapper(rep_->internal_comparator.user_comparator())
+ .CompareWithoutTimestamp(
+ ExtractUserKey(key),
+ ExtractUserKey(v.first_internal_key)) < 0) {
+ // The requested key falls between the highest key in the previous
+ // block and the lowest key in the current block.
+ break;
+ }
+
+ next_biter.Invalidate(Status::OK());
+ Status tmp_s;
+ NewDataBlockIterator<DataBlockIter>(
+ read_options, iiter->value().handle, &next_biter,
+ BlockType::kData, get_context, &lookup_data_block_context,
+ /* prefetch_buffer= */ nullptr, /* for_compaction = */ false,
+ /*async_read = */ false, tmp_s);
+ biter = &next_biter;
+ reusing_prev_block = false;
+ later_reused = false;
+ }
+
+ if (read_options.read_tier == kBlockCacheTier &&
+ biter->status().IsIncomplete()) {
+ // couldn't get block from block_cache
+ // Update Saver.state to Found because we are only looking for
+ // whether we can guarantee the key is not there when "no_io" is set
+ get_context->MarkKeyMayExist();
+ break;
+ }
+ if (!biter->status().ok()) {
+ s = biter->status();
+ break;
+ }
+
+ // Reusing blocks complicates pinning/Cleanable, because the cache
+ // entry referenced by biter can only be released once all returned
+ // pinned values are released. This code previously did an extra
+ // block_cache Ref for each reuse, but that unnecessarily increases
+ // block cache contention. Instead we can use a variant of shared_ptr
+ // to release in block cache only once.
+ //
+ // Although the biter loop below might SaveValue multiple times for
+ // merges, just one value_pinner suffices, as MultiGet will merge
+ // the operands before returning to the API user.
+ Cleanable* value_pinner;
+ if (biter->IsValuePinned()) {
+ if (reusing_prev_block) {
+ // Note that we don't yet know if the MultiGet results will need
+ // to pin this block, so we might wrap a block for sharing and
+ // still end up with 1 (or 0) pinning ref. Not ideal but OK.
+ //
+ // Here we avoid adding redundant cleanups if we didn't end up
+ // delegating the cleanup from last time around.
+ if (!biter->HasCleanups()) {
+ assert(shared_cleanable.get());
+ if (later_reused) {
+ shared_cleanable.RegisterCopyWith(biter);
+ } else {
+ shared_cleanable.MoveAsCleanupTo(biter);
+ }
+ }
+ } else if (later_reused) {
+ assert(biter->HasCleanups());
+ // Make the existing cleanups on `biter` sharable:
+ shared_cleanable.Allocate();
+ // Move existing `biter` cleanup(s) to `shared_cleanable`
+ biter->DelegateCleanupsTo(&*shared_cleanable);
+ // Reference `shared_cleanable` as new cleanup for `biter`
+ shared_cleanable.RegisterCopyWith(biter);
+ }
+ assert(biter->HasCleanups());
+ value_pinner = biter;
+ } else {
+ value_pinner = nullptr;
+ }
+
+ bool may_exist = biter->SeekForGet(key);
+ if (!may_exist) {
+ // HashSeek cannot find the key in this block and the iter is not at
+ // the end of the block, i.e. the key cannot be in the following
+ // blocks either. In this case, the seek_key cannot be found, so we
+ // break from the top level for-loop.
+ break;
+ }
+
+ // Call the *saver function on each entry/block until it returns false
+ for (; biter->Valid(); biter->Next()) {
+ ParsedInternalKey parsed_key;
+ Status pik_status = ParseInternalKey(
+ biter->key(), &parsed_key, false /* log_err_key */); // TODO
+ if (!pik_status.ok()) {
+ s = pik_status;
+ }
+ if (!get_context->SaveValue(parsed_key, biter->value(), &matched,
+ value_pinner)) {
+ if (get_context->State() == GetContext::GetState::kFound) {
+ does_referenced_key_exist = true;
+ referenced_data_size =
+ biter->key().size() + biter->value().size();
+ }
+ done = true;
+ break;
+ }
+ s = biter->status();
+ }
+ // Write the block cache access.
+ // XXX: There appear to be 'break' statements above that bypass this
+ // writing of the block cache trace record
+ if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled() &&
+ !reusing_prev_block) {
+ // Avoid making copy of block_key, cf_name, and referenced_key when
+ // constructing the access record.
+ Slice referenced_key;
+ if (does_referenced_key_exist) {
+ referenced_key = biter->key();
+ } else {
+ referenced_key = key;
+ }
+ BlockCacheTraceRecord access_record(
+ rep_->ioptions.clock->NowMicros(),
+ /*_block_key=*/"", lookup_data_block_context.block_type,
+ lookup_data_block_context.block_size, rep_->cf_id_for_tracing(),
+ /*_cf_name=*/"", rep_->level_for_tracing(),
+ rep_->sst_number_for_tracing(), lookup_data_block_context.caller,
+ lookup_data_block_context.is_cache_hit,
+ lookup_data_block_context.no_insert,
+ lookup_data_block_context.get_id,
+ lookup_data_block_context.get_from_user_specified_snapshot,
+ /*_referenced_key=*/"", referenced_data_size,
+ lookup_data_block_context.num_keys_in_block,
+ does_referenced_key_exist);
+ // TODO: Should handle status here?
+ block_cache_tracer_
+ ->WriteBlockAccess(access_record,
+ lookup_data_block_context.block_key,
+ rep_->cf_name_for_tracing(), referenced_key)
+ .PermitUncheckedError();
+ }
+ s = biter->status();
+ if (done) {
+ // Avoid the extra Next which is expensive in two-level indexes
+ break;
+ }
+ if (first_block) {
+ iiter->Seek(key);
+ if (!iiter->Valid()) {
+ break;
+ }
+ }
+ first_block = false;
+ iiter->Next();
+ } while (iiter->Valid());
+
+ if (matched && filter != nullptr) {
+ RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_TRUE_POSITIVE);
+ PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1,
+ rep_->level);
+ }
+ if (s.ok() && !iiter->status().IsNotFound()) {
+ s = iiter->status();
+ }
+ *(miter->s) = s;
+ }
+#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
+ // Not sure why we need to do it. Should investigate more.
+ for (auto& st : statuses) {
+ st.PermitUncheckedError();
+ }
+#endif // ROCKSDB_ASSERT_STATUS_CHECKED
+ }
+}
+} // namespace ROCKSDB_NAMESPACE
+#endif