diff options
Diffstat (limited to 'src/rocksdb/table/meta_blocks.cc')
-rw-r--r-- | src/rocksdb/table/meta_blocks.cc | 514 |
1 files changed, 514 insertions, 0 deletions
diff --git a/src/rocksdb/table/meta_blocks.cc b/src/rocksdb/table/meta_blocks.cc new file mode 100644 index 00000000..57111cfe --- /dev/null +++ b/src/rocksdb/table/meta_blocks.cc @@ -0,0 +1,514 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#include "table/meta_blocks.h" + +#include <map> +#include <string> + +#include "db/table_properties_collector.h" +#include "rocksdb/table.h" +#include "rocksdb/table_properties.h" +#include "table/block.h" +#include "table/block_fetcher.h" +#include "table/format.h" +#include "table/internal_iterator.h" +#include "table/persistent_cache_helper.h" +#include "table/table_properties_internal.h" +#include "util/coding.h" +#include "util/file_reader_writer.h" + +namespace rocksdb { + +MetaIndexBuilder::MetaIndexBuilder() + : meta_index_block_(new BlockBuilder(1 /* restart interval */)) {} + +void MetaIndexBuilder::Add(const std::string& key, + const BlockHandle& handle) { + std::string handle_encoding; + handle.EncodeTo(&handle_encoding); + meta_block_handles_.insert({key, handle_encoding}); +} + +Slice MetaIndexBuilder::Finish() { + for (const auto& metablock : meta_block_handles_) { + meta_index_block_->Add(metablock.first, metablock.second); + } + return meta_index_block_->Finish(); +} + +// Property block will be read sequentially and cached in a heap located +// object, so there's no need for restart points. Thus we set the restart +// interval to infinity to save space. +PropertyBlockBuilder::PropertyBlockBuilder() + : properties_block_( + new BlockBuilder(port::kMaxInt32 /* restart interval */)) {} + +void PropertyBlockBuilder::Add(const std::string& name, + const std::string& val) { + props_.insert({name, val}); +} + +void PropertyBlockBuilder::Add(const std::string& name, uint64_t val) { + assert(props_.find(name) == props_.end()); + + std::string dst; + PutVarint64(&dst, val); + + Add(name, dst); +} + +void PropertyBlockBuilder::Add( + const UserCollectedProperties& user_collected_properties) { + for (const auto& prop : user_collected_properties) { + Add(prop.first, prop.second); + } +} + +void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) { + Add(TablePropertiesNames::kRawKeySize, props.raw_key_size); + Add(TablePropertiesNames::kRawValueSize, props.raw_value_size); + Add(TablePropertiesNames::kDataSize, props.data_size); + Add(TablePropertiesNames::kIndexSize, props.index_size); + if (props.index_partitions != 0) { + Add(TablePropertiesNames::kIndexPartitions, props.index_partitions); + Add(TablePropertiesNames::kTopLevelIndexSize, props.top_level_index_size); + } + Add(TablePropertiesNames::kIndexKeyIsUserKey, props.index_key_is_user_key); + Add(TablePropertiesNames::kIndexValueIsDeltaEncoded, + props.index_value_is_delta_encoded); + Add(TablePropertiesNames::kNumEntries, props.num_entries); + Add(TablePropertiesNames::kDeletedKeys, props.num_deletions); + Add(TablePropertiesNames::kMergeOperands, props.num_merge_operands); + Add(TablePropertiesNames::kNumRangeDeletions, props.num_range_deletions); + Add(TablePropertiesNames::kNumDataBlocks, props.num_data_blocks); + Add(TablePropertiesNames::kFilterSize, props.filter_size); + Add(TablePropertiesNames::kFormatVersion, props.format_version); + Add(TablePropertiesNames::kFixedKeyLen, props.fixed_key_len); + Add(TablePropertiesNames::kColumnFamilyId, props.column_family_id); + Add(TablePropertiesNames::kCreationTime, props.creation_time); + Add(TablePropertiesNames::kOldestKeyTime, props.oldest_key_time); + + if (!props.filter_policy_name.empty()) { + Add(TablePropertiesNames::kFilterPolicy, props.filter_policy_name); + } + if (!props.comparator_name.empty()) { + Add(TablePropertiesNames::kComparator, props.comparator_name); + } + + if (!props.merge_operator_name.empty()) { + Add(TablePropertiesNames::kMergeOperator, props.merge_operator_name); + } + if (!props.prefix_extractor_name.empty()) { + Add(TablePropertiesNames::kPrefixExtractorName, + props.prefix_extractor_name); + } + if (!props.property_collectors_names.empty()) { + Add(TablePropertiesNames::kPropertyCollectors, + props.property_collectors_names); + } + if (!props.column_family_name.empty()) { + Add(TablePropertiesNames::kColumnFamilyName, props.column_family_name); + } + + if (!props.compression_name.empty()) { + Add(TablePropertiesNames::kCompression, props.compression_name); + } + if (!props.compression_options.empty()) { + Add(TablePropertiesNames::kCompressionOptions, props.compression_options); + } +} + +Slice PropertyBlockBuilder::Finish() { + for (const auto& prop : props_) { + properties_block_->Add(prop.first, prop.second); + } + + return properties_block_->Finish(); +} + +void LogPropertiesCollectionError( + Logger* info_log, const std::string& method, const std::string& name) { + assert(method == "Add" || method == "Finish"); + + std::string msg = + "Encountered error when calling TablePropertiesCollector::" + + method + "() with collector name: " + name; + ROCKS_LOG_ERROR(info_log, "%s", msg.c_str()); +} + +bool NotifyCollectTableCollectorsOnAdd( + const Slice& key, const Slice& value, uint64_t file_size, + const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors, + Logger* info_log) { + bool all_succeeded = true; + for (auto& collector : collectors) { + Status s = collector->InternalAdd(key, value, file_size); + all_succeeded = all_succeeded && s.ok(); + if (!s.ok()) { + LogPropertiesCollectionError(info_log, "Add" /* method */, + collector->Name()); + } + } + return all_succeeded; +} + +void NotifyCollectTableCollectorsOnBlockAdd( + const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors, + const uint64_t blockRawBytes, const uint64_t blockCompressedBytesFast, + const uint64_t blockCompressedBytesSlow) { + for (auto& collector : collectors) { + collector->BlockAdd(blockRawBytes, blockCompressedBytesFast, + blockCompressedBytesSlow); + } +} + +bool NotifyCollectTableCollectorsOnFinish( + const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors, + Logger* info_log, PropertyBlockBuilder* builder) { + bool all_succeeded = true; + for (auto& collector : collectors) { + UserCollectedProperties user_collected_properties; + Status s = collector->Finish(&user_collected_properties); + + all_succeeded = all_succeeded && s.ok(); + if (!s.ok()) { + LogPropertiesCollectionError(info_log, "Finish" /* method */, + collector->Name()); + } else { + builder->Add(user_collected_properties); + } + } + + return all_succeeded; +} + +Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, + FilePrefetchBuffer* prefetch_buffer, const Footer& footer, + const ImmutableCFOptions& ioptions, + TableProperties** table_properties, bool verify_checksum, + BlockHandle* ret_block_handle, + CacheAllocationPtr* verification_buf, + bool /*compression_type_missing*/, + MemoryAllocator* memory_allocator) { + assert(table_properties); + + Slice v = handle_value; + BlockHandle handle; + if (!handle.DecodeFrom(&v).ok()) { + return Status::InvalidArgument("Failed to decode properties block handle"); + } + + BlockContents block_contents; + ReadOptions read_options; + read_options.verify_checksums = verify_checksum; + Status s; + PersistentCacheOptions cache_options; + + BlockFetcher block_fetcher( + file, prefetch_buffer, footer, read_options, handle, &block_contents, + ioptions, false /* decompress */, false /*maybe_compressed*/, + UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); + s = block_fetcher.ReadBlockContents(); + // property block is never compressed. Need to add uncompress logic if we are + // to compress it.. + + if (!s.ok()) { + return s; + } + + Block properties_block(std::move(block_contents), + kDisableGlobalSequenceNumber); + DataBlockIter iter; + properties_block.NewIterator<DataBlockIter>(BytewiseComparator(), + BytewiseComparator(), &iter); + + auto new_table_properties = new TableProperties(); + // All pre-defined properties of type uint64_t + std::unordered_map<std::string, uint64_t*> predefined_uint64_properties = { + {TablePropertiesNames::kDataSize, &new_table_properties->data_size}, + {TablePropertiesNames::kIndexSize, &new_table_properties->index_size}, + {TablePropertiesNames::kIndexPartitions, + &new_table_properties->index_partitions}, + {TablePropertiesNames::kTopLevelIndexSize, + &new_table_properties->top_level_index_size}, + {TablePropertiesNames::kIndexKeyIsUserKey, + &new_table_properties->index_key_is_user_key}, + {TablePropertiesNames::kIndexValueIsDeltaEncoded, + &new_table_properties->index_value_is_delta_encoded}, + {TablePropertiesNames::kFilterSize, &new_table_properties->filter_size}, + {TablePropertiesNames::kRawKeySize, &new_table_properties->raw_key_size}, + {TablePropertiesNames::kRawValueSize, + &new_table_properties->raw_value_size}, + {TablePropertiesNames::kNumDataBlocks, + &new_table_properties->num_data_blocks}, + {TablePropertiesNames::kNumEntries, &new_table_properties->num_entries}, + {TablePropertiesNames::kDeletedKeys, + &new_table_properties->num_deletions}, + {TablePropertiesNames::kMergeOperands, + &new_table_properties->num_merge_operands}, + {TablePropertiesNames::kNumRangeDeletions, + &new_table_properties->num_range_deletions}, + {TablePropertiesNames::kFormatVersion, + &new_table_properties->format_version}, + {TablePropertiesNames::kFixedKeyLen, + &new_table_properties->fixed_key_len}, + {TablePropertiesNames::kColumnFamilyId, + &new_table_properties->column_family_id}, + {TablePropertiesNames::kCreationTime, + &new_table_properties->creation_time}, + {TablePropertiesNames::kOldestKeyTime, + &new_table_properties->oldest_key_time}, + }; + + std::string last_key; + for (iter.SeekToFirstOrReport(); iter.Valid(); iter.NextOrReport()) { + s = iter.status(); + if (!s.ok()) { + break; + } + + auto key = iter.key().ToString(); + // properties block should be strictly sorted with no duplicate key. + if (!last_key.empty() && + BytewiseComparator()->Compare(key, last_key) <= 0) { + s = Status::Corruption("properties unsorted"); + break; + } + last_key = key; + + auto raw_val = iter.value(); + auto pos = predefined_uint64_properties.find(key); + + new_table_properties->properties_offsets.insert( + {key, handle.offset() + iter.ValueOffset()}); + + if (pos != predefined_uint64_properties.end()) { + if (key == TablePropertiesNames::kDeletedKeys || + key == TablePropertiesNames::kMergeOperands) { + // Insert in user-collected properties for API backwards compatibility + new_table_properties->user_collected_properties.insert( + {key, raw_val.ToString()}); + } + // handle predefined rocksdb properties + uint64_t val; + if (!GetVarint64(&raw_val, &val)) { + // skip malformed value + auto error_msg = + "Detect malformed value in properties meta-block:" + "\tkey: " + key + "\tval: " + raw_val.ToString(); + ROCKS_LOG_ERROR(ioptions.info_log, "%s", error_msg.c_str()); + continue; + } + *(pos->second) = val; + } else if (key == TablePropertiesNames::kFilterPolicy) { + new_table_properties->filter_policy_name = raw_val.ToString(); + } else if (key == TablePropertiesNames::kColumnFamilyName) { + new_table_properties->column_family_name = raw_val.ToString(); + } else if (key == TablePropertiesNames::kComparator) { + new_table_properties->comparator_name = raw_val.ToString(); + } else if (key == TablePropertiesNames::kMergeOperator) { + new_table_properties->merge_operator_name = raw_val.ToString(); + } else if (key == TablePropertiesNames::kPrefixExtractorName) { + new_table_properties->prefix_extractor_name = raw_val.ToString(); + } else if (key == TablePropertiesNames::kPropertyCollectors) { + new_table_properties->property_collectors_names = raw_val.ToString(); + } else if (key == TablePropertiesNames::kCompression) { + new_table_properties->compression_name = raw_val.ToString(); + } else if (key == TablePropertiesNames::kCompressionOptions) { + new_table_properties->compression_options = raw_val.ToString(); + } else { + // handle user-collected properties + new_table_properties->user_collected_properties.insert( + {key, raw_val.ToString()}); + } + } + if (s.ok()) { + *table_properties = new_table_properties; + if (ret_block_handle != nullptr) { + *ret_block_handle = handle; + } + if (verification_buf != nullptr) { + size_t len = handle.size() + kBlockTrailerSize; + *verification_buf = rocksdb::AllocateBlock(len, memory_allocator); + if (verification_buf->get() != nullptr) { + memcpy(verification_buf->get(), block_contents.data.data(), len); + } + } + } else { + delete new_table_properties; + } + + return s; +} + +Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size, + uint64_t table_magic_number, + const ImmutableCFOptions& ioptions, + TableProperties** properties, + bool compression_type_missing, + MemoryAllocator* memory_allocator) { + // -- Read metaindex block + Footer footer; + auto s = ReadFooterFromFile(file, nullptr /* prefetch_buffer */, file_size, + &footer, table_magic_number); + if (!s.ok()) { + return s; + } + + auto metaindex_handle = footer.metaindex_handle(); + BlockContents metaindex_contents; + ReadOptions read_options; + read_options.verify_checksums = false; + PersistentCacheOptions cache_options; + + BlockFetcher block_fetcher( + file, nullptr /* prefetch_buffer */, footer, read_options, + metaindex_handle, &metaindex_contents, ioptions, false /* decompress */, + false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), + cache_options, memory_allocator); + s = block_fetcher.ReadBlockContents(); + if (!s.ok()) { + return s; + } + // property blocks are never compressed. Need to add uncompress logic if we + // are to compress it. + Block metaindex_block(std::move(metaindex_contents), + kDisableGlobalSequenceNumber); + std::unique_ptr<InternalIterator> meta_iter( + metaindex_block.NewIterator<DataBlockIter>(BytewiseComparator(), + BytewiseComparator())); + + // -- Read property block + bool found_properties_block = true; + s = SeekToPropertiesBlock(meta_iter.get(), &found_properties_block); + if (!s.ok()) { + return s; + } + + TableProperties table_properties; + if (found_properties_block == true) { + s = ReadProperties( + meta_iter->value(), file, nullptr /* prefetch_buffer */, footer, + ioptions, properties, false /* verify_checksum */, + nullptr /* ret_block_hanel */, nullptr /* ret_block_contents */, + compression_type_missing, memory_allocator); + } else { + s = Status::NotFound(); + } + + return s; +} + +Status FindMetaBlock(InternalIterator* meta_index_iter, + const std::string& meta_block_name, + BlockHandle* block_handle) { + meta_index_iter->Seek(meta_block_name); + if (meta_index_iter->status().ok() && meta_index_iter->Valid() && + meta_index_iter->key() == meta_block_name) { + Slice v = meta_index_iter->value(); + return block_handle->DecodeFrom(&v); + } else { + return Status::Corruption("Cannot find the meta block", meta_block_name); + } +} + +Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size, + uint64_t table_magic_number, + const ImmutableCFOptions& ioptions, + const std::string& meta_block_name, + BlockHandle* block_handle, + bool /*compression_type_missing*/, + MemoryAllocator* memory_allocator) { + Footer footer; + auto s = ReadFooterFromFile(file, nullptr /* prefetch_buffer */, file_size, + &footer, table_magic_number); + if (!s.ok()) { + return s; + } + + auto metaindex_handle = footer.metaindex_handle(); + BlockContents metaindex_contents; + ReadOptions read_options; + read_options.verify_checksums = false; + PersistentCacheOptions cache_options; + BlockFetcher block_fetcher( + file, nullptr /* prefetch_buffer */, footer, read_options, + metaindex_handle, &metaindex_contents, ioptions, + false /* do decompression */, false /*maybe_compressed*/, + UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); + s = block_fetcher.ReadBlockContents(); + if (!s.ok()) { + return s; + } + // meta blocks are never compressed. Need to add uncompress logic if we are to + // compress it. + Block metaindex_block(std::move(metaindex_contents), + kDisableGlobalSequenceNumber); + + std::unique_ptr<InternalIterator> meta_iter; + meta_iter.reset(metaindex_block.NewIterator<DataBlockIter>( + BytewiseComparator(), BytewiseComparator())); + + return FindMetaBlock(meta_iter.get(), meta_block_name, block_handle); +} + +Status ReadMetaBlock(RandomAccessFileReader* file, + FilePrefetchBuffer* prefetch_buffer, uint64_t file_size, + uint64_t table_magic_number, + const ImmutableCFOptions& ioptions, + const std::string& meta_block_name, + BlockContents* contents, bool /*compression_type_missing*/, + MemoryAllocator* memory_allocator) { + Status status; + Footer footer; + status = ReadFooterFromFile(file, prefetch_buffer, file_size, &footer, + table_magic_number); + if (!status.ok()) { + return status; + } + + // Reading metaindex block + auto metaindex_handle = footer.metaindex_handle(); + BlockContents metaindex_contents; + ReadOptions read_options; + read_options.verify_checksums = false; + PersistentCacheOptions cache_options; + + BlockFetcher block_fetcher(file, prefetch_buffer, footer, read_options, + metaindex_handle, &metaindex_contents, ioptions, + false /* decompress */, false /*maybe_compressed*/, + UncompressionDict::GetEmptyDict(), cache_options, + memory_allocator); + status = block_fetcher.ReadBlockContents(); + if (!status.ok()) { + return status; + } + // meta block is never compressed. Need to add uncompress logic if we are to + // compress it. + + // Finding metablock + Block metaindex_block(std::move(metaindex_contents), + kDisableGlobalSequenceNumber); + + std::unique_ptr<InternalIterator> meta_iter; + meta_iter.reset(metaindex_block.NewIterator<DataBlockIter>( + BytewiseComparator(), BytewiseComparator())); + + BlockHandle block_handle; + status = FindMetaBlock(meta_iter.get(), meta_block_name, &block_handle); + + if (!status.ok()) { + return status; + } + + // Reading metablock + BlockFetcher block_fetcher2( + file, prefetch_buffer, footer, read_options, block_handle, contents, + ioptions, false /* decompress */, false /*maybe_compressed*/, + UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); + return block_fetcher2.ReadBlockContents(); +} + +} // namespace rocksdb |