From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 21 Apr 2024 13:54:28 +0200
Subject: Adding upstream version 18.2.2.

Signed-off-by: Daniel Baumann
---
 .../table/block_based/partitioned_index_reader.cc  | 215 +++++++++++++++++++++
 1 file changed, 215 insertions(+)
 create mode 100644 src/rocksdb/table/block_based/partitioned_index_reader.cc

diff --git a/src/rocksdb/table/block_based/partitioned_index_reader.cc b/src/rocksdb/table/block_based/partitioned_index_reader.cc
new file mode 100644
index 000000000..017ea4a3a
--- /dev/null
+++ b/src/rocksdb/table/block_based/partitioned_index_reader.cc
@@ -0,0 +1,215 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include "table/block_based/partitioned_index_reader.h"
+
+#include "file/random_access_file_reader.h"
+#include "table/block_based/block_based_table_reader.h"
+#include "table/block_based/partitioned_index_iterator.h"
+
+namespace ROCKSDB_NAMESPACE {
+Status PartitionIndexReader::Create(
+    const BlockBasedTable* table, const ReadOptions& ro,
+    FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch,
+    bool pin, BlockCacheLookupContext* lookup_context,
+    std::unique_ptr<IndexReader>* index_reader) {
+  assert(table != nullptr);
+  assert(table->get_rep());
+  assert(!pin || prefetch);
+  assert(index_reader != nullptr);
+
+  CachableEntry<Block> index_block;
+  if (prefetch || !use_cache) {
+    const Status s =
+        ReadIndexBlock(table, prefetch_buffer, ro, use_cache,
+                       /*get_context=*/nullptr, lookup_context, &index_block);
+    if (!s.ok()) {
+      return s;
+    }
+
+    if (use_cache && !pin) {
+      index_block.Reset();
+    }
+  }
+
+  index_reader->reset(new PartitionIndexReader(table, std::move(index_block)));
+
+  return Status::OK();
+}
+
+InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
+    const ReadOptions& read_options, bool /* disable_prefix_seek */,
+    IndexBlockIter* iter, GetContext* get_context,
+    BlockCacheLookupContext* lookup_context) {
+  const bool no_io = (read_options.read_tier == kBlockCacheTier);
+  CachableEntry<Block> index_block;
+  const Status s =
+      GetOrReadIndexBlock(no_io, read_options.rate_limiter_priority,
+                          get_context, lookup_context, &index_block);
+  if (!s.ok()) {
+    if (iter != nullptr) {
+      iter->Invalidate(s);
+      return iter;
+    }
+
+    return NewErrorInternalIterator<IndexValue>(s);
+  }
+
+  const BlockBasedTable::Rep* rep = table()->rep_;
+  InternalIteratorBase<IndexValue>* it = nullptr;
+
+  Statistics* kNullStats = nullptr;
+  // Filters are already checked before seeking the index
+  if (!partition_map_.empty()) {
+    // We don't return pinned data from index blocks, so no need
+    // to set `block_contents_pinned`.
+    it = NewTwoLevelIterator(
+        new BlockBasedTable::PartitionedIndexIteratorState(table(),
+                                                           &partition_map_),
+        index_block.GetValue()->NewIndexIterator(
+            internal_comparator()->user_comparator(),
+            rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats,
+            true, index_has_first_key(), index_key_includes_seq(),
+            index_value_is_full()));
+  } else {
+    ReadOptions ro;
+    ro.fill_cache = read_options.fill_cache;
+    ro.deadline = read_options.deadline;
+    ro.io_timeout = read_options.io_timeout;
+    ro.adaptive_readahead = read_options.adaptive_readahead;
+    ro.async_io = read_options.async_io;
+    ro.rate_limiter_priority = read_options.rate_limiter_priority;
+
+    // We don't return pinned data from index blocks, so no need
+    // to set `block_contents_pinned`.
+    std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(
+        index_block.GetValue()->NewIndexIterator(
+            internal_comparator()->user_comparator(),
+            rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats,
+            true, index_has_first_key(), index_key_includes_seq(),
+            index_value_is_full()));
+
+    it = new PartitionedIndexIterator(
+        table(), ro, *internal_comparator(), std::move(index_iter),
+        lookup_context ? lookup_context->caller
+                       : TableReaderCaller::kUncategorized);
+  }
+
+  assert(it != nullptr);
+  index_block.TransferTo(it);
+
+  return it;
+
+  // TODO(myabandeh): Update TwoLevelIterator to be able to make use of
+  // on-stack BlockIter while the state is on heap. Currently it assumes
+  // the first level iter is always on heap and will attempt to delete it
+  // in its destructor.
+}
+Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
+                                               bool pin) {
+  // Before reading partitions, prefetch them to avoid lots of IOs
+  BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
+  const BlockBasedTable::Rep* rep = table()->rep_;
+  IndexBlockIter biter;
+  BlockHandle handle;
+  Statistics* kNullStats = nullptr;
+
+  CachableEntry<Block> index_block;
+  {
+    Status s = GetOrReadIndexBlock(false /* no_io */, ro.rate_limiter_priority,
+                                   nullptr /* get_context */, &lookup_context,
+                                   &index_block);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  // We don't return pinned data from index blocks, so no need
+  // to set `block_contents_pinned`.
+  index_block.GetValue()->NewIndexIterator(
+      internal_comparator()->user_comparator(),
+      rep->get_global_seqno(BlockType::kIndex), &biter, kNullStats, true,
+      index_has_first_key(), index_key_includes_seq(), index_value_is_full());
+  // Index partitions are assumed to be consecutive. Prefetch them all.
+  // Read the first block offset
+  biter.SeekToFirst();
+  if (!biter.Valid()) {
+    // Empty index.
+    return biter.status();
+  }
+  handle = biter.value().handle;
+  uint64_t prefetch_off = handle.offset();
+
+  // Read the last block's offset
+  biter.SeekToLast();
+  if (!biter.Valid()) {
+    // Empty index.
+    return biter.status();
+  }
+  handle = biter.value().handle;
+  uint64_t last_off =
+      handle.offset() + BlockBasedTable::BlockSizeWithTrailer(handle);
+  uint64_t prefetch_len = last_off - prefetch_off;
+  std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
+  rep->CreateFilePrefetchBuffer(
+      0, 0, &prefetch_buffer, false /*Implicit auto readahead*/,
+      0 /*num_reads_*/, 0 /*num_file_reads_for_auto_readahead*/);
+  IOOptions opts;
+  {
+    Status s = rep->file->PrepareIOOptions(ro, opts);
+    if (s.ok()) {
+      s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off,
+                                    static_cast<size_t>(prefetch_len),
+                                    ro.rate_limiter_priority);
+    }
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  // For saving "all or nothing" to partition_map_
+  UnorderedMap<uint64_t, CachableEntry<Block>> map_in_progress;
+
+  // After prefetch, read the partitions one by one
+  biter.SeekToFirst();
+  size_t partition_count = 0;
+  for (; biter.Valid(); biter.Next()) {
+    handle = biter.value().handle;
+    CachableEntry<Block> block;
+    ++partition_count;
+    // TODO: Support counter batch update for partitioned index and
+    // filter blocks
+    Status s = table()->MaybeReadBlockAndLoadToCache(
+        prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
+        /*wait=*/true, /*for_compaction=*/false, &block, BlockType::kIndex,
+        /*get_context=*/nullptr, &lookup_context, /*contents=*/nullptr,
+        /*async_read=*/false);
+
+    if (!s.ok()) {
+      return s;
+    }
+    if (block.GetValue() != nullptr) {
+      // Might need to "pin" some mmap-read blocks (GetOwnValue) if some
+      // partitions are successfully compressed (cached) and some are not
+      // compressed (mmap eligible)
+      if (block.IsCached() || block.GetOwnValue()) {
+        if (pin) {
+          map_in_progress[handle.offset()] = std::move(block);
+        }
+      }
+    }
+  }
+  Status s = biter.status();
+  // Save (pin) them only if everything checks out
+  if (map_in_progress.size() == partition_count && s.ok()) {
+    std::swap(partition_map_, map_in_progress);
+  }
+  return s;
+}
+
+}  // namespace ROCKSDB_NAMESPACE
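
Note (not part of the upstream patch): the prefetch in CacheDependencies()
works because index partitions are laid out consecutively in the SST file,
so a single read spanning [first partition offset, last partition offset +
block size + trailer) covers every partition at once. Below is a minimal,
self-contained sketch of just that range computation. It is illustrative
only: Handle, kBlockTrailerSize, and PrefetchRange are hypothetical
stand-ins, not RocksDB API; the 5-byte trailer (one compression-type byte
plus a 32-bit checksum) matches the block-based table format that
BlockSizeWithTrailer() accounts for.

#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical stand-in for a RocksDB BlockHandle: where a block starts
// and how large its payload is (excluding the trailer).
struct Handle {
  uint64_t offset;
  uint64_t size;
};

// Block trailer: 1 byte compression type + 4 bytes checksum.
constexpr uint64_t kBlockTrailerSize = 5;

// Returns {offset, length} of one contiguous range covering all
// partitions, or {0, 0} for an empty index.
std::pair<uint64_t, uint64_t> PrefetchRange(
    const std::vector<Handle>& partitions) {
  if (partitions.empty()) {
    return {0, 0};
  }
  const uint64_t first_off = partitions.front().offset;
  const uint64_t last_end = partitions.back().offset +
                            partitions.back().size + kBlockTrailerSize;
  return {first_off, last_end - first_off};
}

One contiguous read is the point of the design: it replaces one I/O per
partition with a single larger I/O before the per-partition cache loads.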