//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "table/block_based/partitioned_filter_block.h"

#include <utility>

#include "block_type.h"
#include "file/random_access_file_reader.h"
#include "logging/logging.h"
#include "monitoring/perf_context_imp.h"
#include "port/malloc.h"
#include "port/port.h"
#include "rocksdb/filter_policy.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_reader.h"
#include "util/coding.h"

namespace ROCKSDB_NAMESPACE {

PartitionedFilterBlockBuilder::PartitionedFilterBlockBuilder(
    const SliceTransform* _prefix_extractor, bool whole_key_filtering,
    FilterBitsBuilder* filter_bits_builder, int index_block_restart_interval,
    const bool use_value_delta_encoding,
    PartitionedIndexBuilder* const p_index_builder,
    const uint32_t partition_size)
    : FullFilterBlockBuilder(_prefix_extractor, whole_key_filtering,
                             filter_bits_builder),
      index_on_filter_block_builder_(index_block_restart_interval,
                                     true /*use_delta_encoding*/,
                                     use_value_delta_encoding),
      index_on_filter_block_builder_without_seq_(index_block_restart_interval,
                                                 true /*use_delta_encoding*/,
                                                 use_value_delta_encoding),
      p_index_builder_(p_index_builder),
      keys_added_to_partition_(0),
      total_added_in_built_(0) {
  keys_per_partition_ = static_cast<uint32_t>(
      filter_bits_builder_->ApproximateNumEntries(partition_size));
  if (keys_per_partition_ < 1) {
    // partition_size (minus buffer, ~10%) might be smaller than minimum
    // filter size, sometimes based on cache line size. Try to find that
    // minimum size without CalculateSpace (not necessarily available).
    uint32_t larger = std::max(partition_size + 4, uint32_t{16});
    for (;;) {
      keys_per_partition_ = static_cast<uint32_t>(
          filter_bits_builder_->ApproximateNumEntries(larger));
      if (keys_per_partition_ >= 1) {
        break;
      }
      larger += larger / 4;
      if (larger > 100000) {
        // Might be a broken implementation. Substitute something reasonable:
        // 1 key / byte.
        keys_per_partition_ = partition_size;
        break;
      }
    }
  }
}

PartitionedFilterBlockBuilder::~PartitionedFilterBlockBuilder() {
  partitioned_filters_construction_status_.PermitUncheckedError();
}
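// For intuition on keys_per_partition_ (illustrative numbers, not from this
// file): with a Bloom-style FilterPolicy at roughly 10 bits (1.25 bytes) per
// key, ApproximateNumEntries() on a 4096-byte partition target works out to
// about 4096 * 8 / 10 ~= 3200 keys per partition. The actual value depends
// entirely on the configured FilterPolicy.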
void PartitionedFilterBlockBuilder::MaybeCutAFilterBlock(
    const Slice* next_key) {
  // Use == to send the request only once
  if (keys_added_to_partition_ == keys_per_partition_) {
    // Currently only index builder is in charge of cutting a partition. We
    // keep requesting until it is granted.
    p_index_builder_->RequestPartitionCut();
  }
  if (!p_index_builder_->ShouldCutFilterBlock()) {
    return;
  }

  // Add the prefix of the next key before finishing the partition without
  // updating last_prefix_str_. This hack fixes a bug with format_version=3
  // where seeking for the prefix would lead us to the previous partition.
  const bool maybe_add_prefix =
      next_key && prefix_extractor() && prefix_extractor()->InDomain(*next_key);
  if (maybe_add_prefix) {
    const Slice next_key_prefix = prefix_extractor()->Transform(*next_key);
    if (next_key_prefix.compare(last_prefix_str()) != 0) {
      AddKey(next_key_prefix);
    }
  }

  total_added_in_built_ += filter_bits_builder_->EstimateEntriesAdded();
  std::unique_ptr<const char[]> filter_data;
  Status filter_construction_status = Status::OK();
  Slice filter =
      filter_bits_builder_->Finish(&filter_data, &filter_construction_status);
  if (filter_construction_status.ok()) {
    filter_construction_status = filter_bits_builder_->MaybePostVerify(filter);
  }
  std::string& index_key = p_index_builder_->GetPartitionKey();
  filters.push_back({index_key, std::move(filter_data), filter});
  if (!filter_construction_status.ok() &&
      partitioned_filters_construction_status_.ok()) {
    partitioned_filters_construction_status_ = filter_construction_status;
  }
  keys_added_to_partition_ = 0;
  Reset();
}

void PartitionedFilterBlockBuilder::Add(const Slice& key) {
  MaybeCutAFilterBlock(&key);
  FullFilterBlockBuilder::Add(key);
}

void PartitionedFilterBlockBuilder::AddKey(const Slice& key) {
  FullFilterBlockBuilder::AddKey(key);
  keys_added_to_partition_++;
}

size_t PartitionedFilterBlockBuilder::EstimateEntriesAdded() {
  return total_added_in_built_ + filter_bits_builder_->EstimateEntriesAdded();
}

Slice PartitionedFilterBlockBuilder::Finish(
    const BlockHandle& last_partition_block_handle, Status* status,
    std::unique_ptr<const char[]>* filter_data) {
  if (finishing_filters == true) {
    // Record the handle of the last written filter block in the index
    std::string handle_encoding;
    last_partition_block_handle.EncodeTo(&handle_encoding);
    std::string handle_delta_encoding;
    PutVarsignedint64(
        &handle_delta_encoding,
        last_partition_block_handle.size() - last_encoded_handle_.size());
    last_encoded_handle_ = last_partition_block_handle;
    const Slice handle_delta_encoding_slice(handle_delta_encoding);
    index_on_filter_block_builder_.Add(last_filter_entry_key, handle_encoding,
                                       &handle_delta_encoding_slice);
    if (!p_index_builder_->seperator_is_key_plus_seq()) {
      index_on_filter_block_builder_without_seq_.Add(
          ExtractUserKey(last_filter_entry_key), handle_encoding,
          &handle_delta_encoding_slice);
    }
  } else {
    MaybeCutAFilterBlock(nullptr);
  }

  if (!partitioned_filters_construction_status_.ok()) {
    *status = partitioned_filters_construction_status_;
    return Slice();
  }

  // If there is no filter partition left, then return the index on filter
  // partitions
  if (UNLIKELY(filters.empty())) {
    *status = Status::OK();
    last_filter_data.reset();
    if (finishing_filters) {
      // Simplest to just add them all at the end
      total_added_in_built_ = 0;
      if (p_index_builder_->seperator_is_key_plus_seq()) {
        return index_on_filter_block_builder_.Finish();
      } else {
        return index_on_filter_block_builder_without_seq_.Finish();
      }
    } else {
      // This is the rare case where no key was added to the filter
      return Slice();
    }
  } else {
    // Return the next filter partition in line and set Incomplete() status to
    // indicate we expect more calls to Finish
    *status = Status::Incomplete();
    finishing_filters = true;

    last_filter_entry_key = filters.front().key;
    Slice filter = filters.front().filter;
    last_filter_data = std::move(filters.front().filter_data);
    if (filter_data != nullptr) {
      *filter_data = std::move(last_filter_data);
    }
    filters.pop_front();
    return filter;
  }
}
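// Sketch of the calling convention Finish() expects (illustrative, not the
// verbatim table-builder code): the caller writes each returned partition to
// the file, then feeds that partition's BlockHandle back in on the next call,
// repeating while *status is Incomplete(). The final call returns the index
// block over the partitions with an OK status, and that content is written
// as well. WriteBlockToFile below is a hypothetical helper.
//
//   Status s = Status::Incomplete();
//   BlockHandle handle;  // handle of the most recently written partition
//   std::unique_ptr<const char[]> filter_data;
//   while (s.IsIncomplete()) {
//     Slice content = builder.Finish(handle, &s, &filter_data);
//     if (s.ok() || s.IsIncomplete()) {
//       WriteBlockToFile(content, &handle);  // hypothetical helper
//     }
//   }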
PartitionedFilterBlockReader::PartitionedFilterBlockReader(
    const BlockBasedTable* t, CachableEntry<Block>&& filter_block)
    : FilterBlockReaderCommon(t, std::move(filter_block)) {}

std::unique_ptr<FilterBlockReader> PartitionedFilterBlockReader::Create(
    const BlockBasedTable* table, const ReadOptions& ro,
    FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch,
    bool pin, BlockCacheLookupContext* lookup_context) {
  assert(table);
  assert(table->get_rep());
  assert(!pin || prefetch);

  CachableEntry<Block> filter_block;
  if (prefetch || !use_cache) {
    const Status s = ReadFilterBlock(
        table, prefetch_buffer, ro, use_cache, nullptr /* get_context */,
        lookup_context, &filter_block, BlockType::kFilterPartitionIndex);
    if (!s.ok()) {
      IGNORE_STATUS_IF_ERROR(s);
      return std::unique_ptr<FilterBlockReader>();
    }

    if (use_cache && !pin) {
      filter_block.Reset();
    }
  }

  return std::unique_ptr<FilterBlockReader>(
      new PartitionedFilterBlockReader(table, std::move(filter_block)));
}

bool PartitionedFilterBlockReader::KeyMayMatch(
    const Slice& key, const bool no_io, const Slice* const const_ikey_ptr,
    GetContext* get_context, BlockCacheLookupContext* lookup_context,
    Env::IOPriority rate_limiter_priority) {
  assert(const_ikey_ptr != nullptr);
  if (!whole_key_filtering()) {
    return true;
  }

  return MayMatch(key, no_io, const_ikey_ptr, get_context, lookup_context,
                  rate_limiter_priority, &FullFilterBlockReader::KeyMayMatch);
}

void PartitionedFilterBlockReader::KeysMayMatch(
    MultiGetRange* range, const bool no_io,
    BlockCacheLookupContext* lookup_context,
    Env::IOPriority rate_limiter_priority) {
  if (!whole_key_filtering()) {
    return;  // Any/all may match
  }

  MayMatch(range, nullptr, no_io, lookup_context, rate_limiter_priority,
           &FullFilterBlockReader::KeysMayMatch2);
}

bool PartitionedFilterBlockReader::PrefixMayMatch(
    const Slice& prefix, const bool no_io, const Slice* const const_ikey_ptr,
    GetContext* get_context, BlockCacheLookupContext* lookup_context,
    Env::IOPriority rate_limiter_priority) {
  assert(const_ikey_ptr != nullptr);
  return MayMatch(prefix, no_io, const_ikey_ptr, get_context, lookup_context,
                  rate_limiter_priority,
                  &FullFilterBlockReader::PrefixMayMatch);
}

void PartitionedFilterBlockReader::PrefixesMayMatch(
    MultiGetRange* range, const SliceTransform* prefix_extractor,
    const bool no_io, BlockCacheLookupContext* lookup_context,
    Env::IOPriority rate_limiter_priority) {
  assert(prefix_extractor);
  MayMatch(range, prefix_extractor, no_io, lookup_context,
           rate_limiter_priority, &FullFilterBlockReader::PrefixesMayMatch);
}

BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle(
    const CachableEntry<Block>& filter_block, const Slice& entry) const {
  IndexBlockIter iter;
  const InternalKeyComparator* const comparator = internal_comparator();
  Statistics* kNullStats = nullptr;
  filter_block.GetValue()->NewIndexIterator(
      comparator->user_comparator(),
      table()->get_rep()->get_global_seqno(BlockType::kFilterPartitionIndex),
      &iter, kNullStats, true /* total_order_seek */,
      false /* have_first_key */, index_key_includes_seq(),
      index_value_is_full());
  iter.Seek(entry);
  if (UNLIKELY(!iter.Valid())) {
    // entry is larger than all the keys. However its prefix might still be
    // present in the last partition. If this is called by PrefixMayMatch this
    // is necessary for correct behavior. Otherwise it is unnecessary but safe.
    // Assuming this is an unlikely case for full key search, the performance
    // overhead should be negligible.
    iter.SeekToLast();
  }
  assert(iter.Valid());
  BlockHandle fltr_blk_handle = iter.value().handle;
  return fltr_blk_handle;
}
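// Illustrative example of the Seek() above (not from the source): the
// partition index maps each partition's last key to that partition's handle.
// With partitions ending at "k05" and "k09", Seek("k07") lands on the "k09"
// entry, i.e. the partition that would contain "k07". Seek("k99") runs past
// the end, which is why the SeekToLast() fallback exists for prefix lookups.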
Status PartitionedFilterBlockReader::GetFilterPartitionBlock(
    FilePrefetchBuffer* prefetch_buffer, const BlockHandle& fltr_blk_handle,
    bool no_io, GetContext* get_context,
    BlockCacheLookupContext* lookup_context,
    Env::IOPriority rate_limiter_priority,
    CachableEntry<ParsedFullFilterBlock>* filter_block) const {
  assert(table());
  assert(filter_block);
  assert(filter_block->IsEmpty());

  if (!filter_map_.empty()) {
    auto iter = filter_map_.find(fltr_blk_handle.offset());
    // This is a possible scenario since block cache might not have had space
    // for the partition
    if (iter != filter_map_.end()) {
      filter_block->SetUnownedValue(iter->second.GetValue());
      return Status::OK();
    }
  }

  ReadOptions read_options;
  read_options.rate_limiter_priority = rate_limiter_priority;
  if (no_io) {
    read_options.read_tier = kBlockCacheTier;
  }

  const Status s =
      table()->RetrieveBlock(prefetch_buffer, read_options, fltr_blk_handle,
                             UncompressionDict::GetEmptyDict(), filter_block,
                             BlockType::kFilter, get_context, lookup_context,
                             /* for_compaction */ false, /* use_cache */ true,
                             /* wait_for_cache */ true,
                             /* async_read */ false);

  return s;
}

bool PartitionedFilterBlockReader::MayMatch(
    const Slice& slice, bool no_io, const Slice* const_ikey_ptr,
    GetContext* get_context, BlockCacheLookupContext* lookup_context,
    Env::IOPriority rate_limiter_priority,
    FilterFunction filter_function) const {
  CachableEntry<Block> filter_block;
  Status s = GetOrReadFilterBlock(
      no_io, get_context, lookup_context, &filter_block,
      BlockType::kFilterPartitionIndex, rate_limiter_priority);
  if (UNLIKELY(!s.ok())) {
    IGNORE_STATUS_IF_ERROR(s);
    return true;
  }

  if (UNLIKELY(filter_block.GetValue()->size() == 0)) {
    return true;
  }

  auto filter_handle = GetFilterPartitionHandle(filter_block, *const_ikey_ptr);
  if (UNLIKELY(filter_handle.size() == 0)) {  // key is out of range
    return false;
  }

  CachableEntry<ParsedFullFilterBlock> filter_partition_block;
  s = GetFilterPartitionBlock(nullptr /* prefetch_buffer */, filter_handle,
                              no_io, get_context, lookup_context,
                              rate_limiter_priority, &filter_partition_block);
  if (UNLIKELY(!s.ok())) {
    IGNORE_STATUS_IF_ERROR(s);
    return true;
  }

  FullFilterBlockReader filter_partition(table(),
                                         std::move(filter_partition_block));
  return (filter_partition.*filter_function)(slice, no_io, const_ikey_ptr,
                                             get_context, lookup_context,
                                             rate_limiter_priority);
}
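// Sketch of the batching done by the MultiGet overload below (illustrative):
// because the input range is sorted, keys that map to the same filter
// partition are adjacent. E.g. keys {a, b, m, n} against partitions
// P1 [a..c] and P2 [k..z] produce two batches, {a, b} -> P1 and {m, n} -> P2,
// each checked with a single partition block lookup.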
void PartitionedFilterBlockReader::MayMatch(
    MultiGetRange* range, const SliceTransform* prefix_extractor, bool no_io,
    BlockCacheLookupContext* lookup_context,
    Env::IOPriority rate_limiter_priority,
    FilterManyFunction filter_function) const {
  CachableEntry<Block> filter_block;
  Status s = GetOrReadFilterBlock(
      no_io, range->begin()->get_context, lookup_context, &filter_block,
      BlockType::kFilterPartitionIndex, rate_limiter_priority);
  if (UNLIKELY(!s.ok())) {
    IGNORE_STATUS_IF_ERROR(s);
    return;  // Any/all may match
  }

  if (UNLIKELY(filter_block.GetValue()->size() == 0)) {
    return;  // Any/all may match
  }

  auto start_iter_same_handle = range->begin();
  BlockHandle prev_filter_handle = BlockHandle::NullBlockHandle();

  // For all keys mapping to same partition (must be adjacent in sorted order)
  // share block cache lookup and use full filter multiget on the partition
  // filter.
  for (auto iter = start_iter_same_handle; iter != range->end(); ++iter) {
    // TODO: re-use one top-level index iterator
    BlockHandle this_filter_handle =
        GetFilterPartitionHandle(filter_block, iter->ikey);
    if (!prev_filter_handle.IsNull() &&
        this_filter_handle != prev_filter_handle) {
      MultiGetRange subrange(*range, start_iter_same_handle, iter);
      MayMatchPartition(&subrange, prefix_extractor, prev_filter_handle, no_io,
                        lookup_context, rate_limiter_priority,
                        filter_function);
      range->AddSkipsFrom(subrange);
      start_iter_same_handle = iter;
    }
    if (UNLIKELY(this_filter_handle.size() == 0)) {  // key is out of range
      // Not reachable with current behavior of GetFilterPartitionHandle
      assert(false);
      range->SkipKey(iter);
      prev_filter_handle = BlockHandle::NullBlockHandle();
    } else {
      prev_filter_handle = this_filter_handle;
    }
  }
  if (!prev_filter_handle.IsNull()) {
    MultiGetRange subrange(*range, start_iter_same_handle, range->end());
    MayMatchPartition(&subrange, prefix_extractor, prev_filter_handle, no_io,
                      lookup_context, rate_limiter_priority, filter_function);
    range->AddSkipsFrom(subrange);
  }
}

void PartitionedFilterBlockReader::MayMatchPartition(
    MultiGetRange* range, const SliceTransform* prefix_extractor,
    BlockHandle filter_handle, bool no_io,
    BlockCacheLookupContext* lookup_context,
    Env::IOPriority rate_limiter_priority,
    FilterManyFunction filter_function) const {
  CachableEntry<ParsedFullFilterBlock> filter_partition_block;
  Status s = GetFilterPartitionBlock(
      nullptr /* prefetch_buffer */, filter_handle, no_io,
      range->begin()->get_context, lookup_context, rate_limiter_priority,
      &filter_partition_block);
  if (UNLIKELY(!s.ok())) {
    IGNORE_STATUS_IF_ERROR(s);
    return;  // Any/all may match
  }

  FullFilterBlockReader filter_partition(table(),
                                         std::move(filter_partition_block));
  (filter_partition.*filter_function)(range, prefix_extractor, no_io,
                                      lookup_context, rate_limiter_priority);
}

size_t PartitionedFilterBlockReader::ApproximateMemoryUsage() const {
  size_t usage = ApproximateFilterBlockMemoryUsage();
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
  usage += malloc_usable_size(const_cast<PartitionedFilterBlockReader*>(this));
#else
  usage += sizeof(*this);
#endif  // ROCKSDB_MALLOC_USABLE_SIZE
  return usage;
  // TODO(myabandeh): better estimation for filter_map_ size
}

// TODO(myabandeh): merge this with the same function in IndexReader
Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro,
                                                       bool pin) {
  assert(table());

  const BlockBasedTable::Rep* const rep = table()->get_rep();
  assert(rep);

  BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};

  CachableEntry<Block> filter_block;

  Status s = GetOrReadFilterBlock(false /* no_io */, nullptr /* get_context */,
                                  &lookup_context, &filter_block,
                                  BlockType::kFilterPartitionIndex,
                                  ro.rate_limiter_priority);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(rep->ioptions.logger,
                    "Error retrieving top-level filter block while trying to "
                    "cache filter partitions: %s",
                    s.ToString().c_str());
    return s;
  }

  // Before reading the partitions, prefetch them to avoid lots of IOs
  assert(filter_block.GetValue());

  IndexBlockIter biter;
  const InternalKeyComparator* const comparator = internal_comparator();
  Statistics* kNullStats = nullptr;
  filter_block.GetValue()->NewIndexIterator(
      comparator->user_comparator(),
      rep->get_global_seqno(BlockType::kFilterPartitionIndex), &biter,
      kNullStats, true /* total_order_seek */, false /* have_first_key */,
      index_key_includes_seq(), index_value_is_full());
  // Index partitions are assumed to be consecutive. Prefetch them all.
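  // Illustrative arithmetic for the span computed below (example numbers):
  // with the first partition at offset 1000 and the last at offset 9000 with
  // size 500, the prefetch covers [1000, 9000 + 500 + kBlockTrailerSize),
  // i.e. every partition plus the final block's trailer, in a single read.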
  // Read the first block offset
  biter.SeekToFirst();
  BlockHandle handle = biter.value().handle;
  uint64_t prefetch_off = handle.offset();

  // Read the last block's offset
  biter.SeekToLast();
  handle = biter.value().handle;
  uint64_t last_off =
      handle.offset() + handle.size() + BlockBasedTable::kBlockTrailerSize;
  uint64_t prefetch_len = last_off - prefetch_off;
  std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
  rep->CreateFilePrefetchBuffer(
      0, 0, &prefetch_buffer, false /* Implicit autoreadahead */,
      0 /*num_reads_*/, 0 /*num_file_reads_for_auto_readahead*/);

  IOOptions opts;
  s = rep->file->PrepareIOOptions(ro, opts);
  if (s.ok()) {
    s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off,
                                  static_cast<size_t>(prefetch_len),
                                  ro.rate_limiter_priority);
  }
  if (!s.ok()) {
    return s;
  }

  // After prefetch, read the partitions one by one
  for (biter.SeekToFirst(); biter.Valid(); biter.Next()) {
    handle = biter.value().handle;

    CachableEntry<ParsedFullFilterBlock> block;
    // TODO: Support counter batch update for partitioned index and
    // filter blocks
    s = table()->MaybeReadBlockAndLoadToCache(
        prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
        /* wait */ true, /* for_compaction */ false, &block,
        BlockType::kFilter, nullptr /* get_context */, &lookup_context,
        nullptr /* contents */, false);
    if (!s.ok()) {
      return s;
    }
    assert(s.ok() || block.GetValue() == nullptr);

    if (block.GetValue() != nullptr) {
      if (block.IsCached()) {
        if (pin) {
          filter_map_[handle.offset()] = std::move(block);
        }
      }
    }
  }
  return biter.status();
}

const InternalKeyComparator*
PartitionedFilterBlockReader::internal_comparator() const {
  assert(table());
  assert(table()->get_rep());

  return &table()->get_rep()->internal_comparator;
}

bool PartitionedFilterBlockReader::index_key_includes_seq() const {
  assert(table());
  assert(table()->get_rep());

  return table()->get_rep()->index_key_includes_seq;
}

bool PartitionedFilterBlockReader::index_value_is_full() const {
  assert(table());
  assert(table()->get_rep());

  return table()->get_rep()->index_value_is_full;
}

}  // namespace ROCKSDB_NAMESPACE