From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 21 Apr 2024 13:54:28 +0200
Subject: Adding upstream version 18.2.2.

Signed-off-by: Daniel Baumann
---
 .../table/block_based/block_based_table_builder.cc | 2096 ++++++++++++++++++++
 1 file changed, 2096 insertions(+)
 create mode 100644 src/rocksdb/table/block_based/block_based_table_builder.cc

diff --git a/src/rocksdb/table/block_based/block_based_table_builder.cc b/src/rocksdb/table/block_based/block_based_table_builder.cc
new file mode 100644
index 000000000..fed69af07
--- /dev/null
+++ b/src/rocksdb/table/block_based/block_based_table_builder.cc
@@ -0,0 +1,2096 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "table/block_based/block_based_table_builder.h"
+
+#include <assert.h>
+#include <stdio.h>
+
+#include <atomic>
+#include <list>
+#include <map>
+#include <memory>
+#include <numeric>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include "cache/cache_entry_roles.h"
+#include "cache/cache_helpers.h"
+#include "cache/cache_key.h"
+#include "cache/cache_reservation_manager.h"
+#include "db/dbformat.h"
+#include "index_builder.h"
+#include "logging/logging.h"
+#include "memory/memory_allocator.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/env.h"
+#include "rocksdb/filter_policy.h"
+#include "rocksdb/flush_block_policy.h"
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/table.h"
+#include "rocksdb/types.h"
+#include "table/block_based/block.h"
+#include "table/block_based/block_based_table_factory.h"
+#include "table/block_based/block_based_table_reader.h"
+#include "table/block_based/block_builder.h"
+#include "table/block_based/block_like_traits.h"
+#include "table/block_based/filter_block.h"
+#include "table/block_based/filter_policy_internal.h"
+#include "table/block_based/full_filter_block.h"
+#include "table/block_based/partitioned_filter_block.h"
+#include "table/format.h"
+#include "table/meta_blocks.h"
+#include "table/table_builder.h"
+#include "util/coding.h"
+#include "util/compression.h"
+#include "util/stop_watch.h"
+#include "util/string_util.h"
+#include "util/work_queue.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+extern const std::string kHashIndexPrefixesBlock;
+extern const std::string kHashIndexPrefixesMetadataBlock;
+
+// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
+namespace {
+
+constexpr size_t kBlockTrailerSize = BlockBasedTable::kBlockTrailerSize;
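+// Illustrative sketch (editor's note, not upstream code): every block this
+// builder writes is followed by a 5-byte trailer, which is why
+// kBlockTrailerSize shows up in size estimates throughout this file.
+// Assuming the layout documented in CompressAndVerifyBlock() below:
+//
+//   +---------------+------------------+--------------------+
+//   | block payload | compression type | checksum           |
+//   | n bytes       | 1 byte           | 4 bytes (fixed32)  |
+//   +---------------+------------------+--------------------+
+//
+// e.g. a 4096-byte payload occupies 4101 bytes in the file.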
+// Create a filter block builder based on its type.
+FilterBlockBuilder* CreateFilterBlockBuilder(
+    const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
+    const FilterBuildingContext& context,
+    const bool use_delta_encoding_for_index_values,
+    PartitionedIndexBuilder* const p_index_builder) {
+  const BlockBasedTableOptions& table_opt = context.table_options;
+  assert(table_opt.filter_policy);  // precondition
+
+  FilterBitsBuilder* filter_bits_builder =
+      BloomFilterPolicy::GetBuilderFromContext(context);
+  if (filter_bits_builder == nullptr) {
+    return nullptr;
+  } else {
+    if (table_opt.partition_filters) {
+      assert(p_index_builder != nullptr);
+      // Since it takes time after a partition cut request from the filter
+      // builder until the index builder actually cuts the partition
+      // (potentially at the end of a data block with many keys), we take the
+      // lower bound as partition size.
+      assert(table_opt.block_size_deviation <= 100);
+      auto partition_size =
+          static_cast<uint32_t>(((table_opt.metadata_block_size *
+                                  (100 - table_opt.block_size_deviation)) +
+                                 99) /
+                                100);
+      partition_size = std::max(partition_size, static_cast<uint32_t>(1));
+      return new PartitionedFilterBlockBuilder(
+          mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
+          filter_bits_builder, table_opt.index_block_restart_interval,
+          use_delta_encoding_for_index_values, p_index_builder, partition_size);
+    } else {
+      return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
+                                        table_opt.whole_key_filtering,
+                                        filter_bits_builder);
+    }
+  }
+}
+
+bool GoodCompressionRatio(size_t compressed_size, size_t uncomp_size) {
+  // Check that the compressed output is at least 12.5% smaller than the
+  // uncompressed input, i.e. compressed_size < 87.5% of uncomp_size.
+  return compressed_size < uncomp_size - (uncomp_size / 8u);
+}
+
+}  // namespace
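+// Worked example (illustrative, not upstream code): for a 4096-byte
+// uncompressed block, GoodCompressionRatio() requires
+//
+//   compressed_size < 4096 - (4096 / 8) = 3584 bytes,
+//
+// so a 3600-byte result is discarded (the block is stored uncompressed)
+// while a 3500-byte result is kept. This guards against paying the
+// decompression cost at read time for marginal space savings.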
+// format_version is the block format as defined in include/rocksdb/table.h
+Slice CompressBlock(const Slice& uncompressed_data, const CompressionInfo& info,
+                    CompressionType* type, uint32_t format_version,
+                    bool do_sample, std::string* compressed_output,
+                    std::string* sampled_output_fast,
+                    std::string* sampled_output_slow) {
+  assert(type);
+  assert(compressed_output);
+  assert(compressed_output->empty());
+
+  // If requested, we sample one in every N block with a
+  // fast and slow compression algorithm and report the stats.
+  // The users can use these stats to decide if it is worthwhile
+  // enabling compression and they also get a hint about which
+  // compression algorithm will be beneficial.
+  if (do_sample && info.SampleForCompression() &&
+      Random::GetTLSInstance()->OneIn(
+          static_cast<int>(info.SampleForCompression()))) {
+    // Sampling with a fast compression algorithm
+    if (sampled_output_fast && (LZ4_Supported() || Snappy_Supported())) {
+      CompressionType c =
+          LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
+      CompressionContext context(c);
+      CompressionOptions options;
+      CompressionInfo info_tmp(options, context,
+                               CompressionDict::GetEmptyDict(), c,
+                               info.SampleForCompression());
+
+      CompressData(uncompressed_data, info_tmp,
+                   GetCompressFormatForVersion(format_version),
+                   sampled_output_fast);
+    }
+
+    // Sampling with a slow but high-compression algorithm
+    if (sampled_output_slow && (ZSTD_Supported() || Zlib_Supported())) {
+      CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
+      CompressionContext context(c);
+      CompressionOptions options;
+      CompressionInfo info_tmp(options, context,
+                               CompressionDict::GetEmptyDict(), c,
+                               info.SampleForCompression());
+
+      CompressData(uncompressed_data, info_tmp,
+                   GetCompressFormatForVersion(format_version),
+                   sampled_output_slow);
+    }
+  }
+
+  if (info.type() == kNoCompression) {
+    *type = kNoCompression;
+    return uncompressed_data;
+  }
+
+  // Actually compress the data; if the compression method is not supported,
+  // or the compression fails etc., just fall back to uncompressed
+  if (!CompressData(uncompressed_data, info,
+                    GetCompressFormatForVersion(format_version),
+                    compressed_output)) {
+    *type = kNoCompression;
+    return uncompressed_data;
+  }
+
+  // Check the compression ratio; if it's not good enough, just fall back to
+  // uncompressed
+  if (!GoodCompressionRatio(compressed_output->size(),
+                            uncompressed_data.size())) {
+    *type = kNoCompression;
+    return uncompressed_data;
+  }
+
+  *type = info.type();
+  return *compressed_output;
+}
+
+// kBlockBasedTableMagicNumber was picked by running
+//    echo rocksdb.table.block_based | sha1sum
+// and taking the leading 64 bits.
+// Please note that kBlockBasedTableMagicNumber may also be accessed by other
+// .cc files; for that reason we declare it extern in the header, but to get
+// the space allocated it must be non-extern in one place.
+const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
+// We also support reading and writing legacy block based table format (for
+// backwards compatibility)
+const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
+
+// A collector that collects properties of interest to block-based table.
+// For now this class looks heavy-weight since we only write one additional
+// property.
+// But in the foreseeable future, we will add more and more properties that are
+// specific to block-based table.
+class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
+    : public IntTblPropCollector {
+ public:
+  explicit BlockBasedTablePropertiesCollector(
+      BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
+      bool prefix_filtering)
+      : index_type_(index_type),
+        whole_key_filtering_(whole_key_filtering),
+        prefix_filtering_(prefix_filtering) {}
+
+  Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
+                     uint64_t /*file_size*/) override {
+    // Intentionally left blank. Have no interest in collecting stats for
+    // individual key/value pairs.
+    return Status::OK();
+  }
+
+  virtual void BlockAdd(uint64_t /* block_uncomp_bytes */,
+                        uint64_t /* block_compressed_bytes_fast */,
+                        uint64_t /* block_compressed_bytes_slow */) override {
+    // Intentionally left blank. No interest in collecting stats for
+    // blocks.
+    return;
+  }
+
+  Status Finish(UserCollectedProperties* properties) override {
+    std::string val;
+    PutFixed32(&val, static_cast<uint32_t>(index_type_));
+    properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
+    properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
+                        whole_key_filtering_ ? kPropTrue : kPropFalse});
+    properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
+                        prefix_filtering_ ? kPropTrue : kPropFalse});
+    return Status::OK();
+  }
+
+  // The name of the properties collector can be used for debugging purpose.
+  const char* Name() const override {
+    return "BlockBasedTablePropertiesCollector";
+  }
+
+  UserCollectedProperties GetReadableProperties() const override {
+    // Intentionally left blank.
+    return UserCollectedProperties();
+  }
+
+ private:
+  BlockBasedTableOptions::IndexType index_type_;
+  bool whole_key_filtering_;
+  bool prefix_filtering_;
+};
+
+struct BlockBasedTableBuilder::Rep {
+  const ImmutableOptions ioptions;
+  const MutableCFOptions moptions;
+  const BlockBasedTableOptions table_options;
+  const InternalKeyComparator& internal_comparator;
+  WritableFileWriter* file;
+  std::atomic<uint64_t> offset;
+  size_t alignment;
+  BlockBuilder data_block;
+  // Buffers uncompressed data blocks to replay later. Needed when
+  // compression dictionary is enabled so we can finalize the dictionary before
+  // compressing any data blocks.
+  std::vector<std::string> data_block_buffers;
+  BlockBuilder range_del_block;
+
+  InternalKeySliceTransform internal_prefix_transform;
+  std::unique_ptr<IndexBuilder> index_builder;
+  PartitionedIndexBuilder* p_index_builder_ = nullptr;
+
+  std::string last_key;
+  const Slice* first_key_in_next_block = nullptr;
+  CompressionType compression_type;
+  uint64_t sample_for_compression;
+  std::atomic<uint64_t> compressible_input_data_bytes;
+  std::atomic<uint64_t> uncompressible_input_data_bytes;
+  std::atomic<uint64_t> sampled_input_data_bytes;
+  std::atomic<uint64_t> sampled_output_slow_data_bytes;
+  std::atomic<uint64_t> sampled_output_fast_data_bytes;
+  CompressionOptions compression_opts;
+  std::unique_ptr<CompressionDict> compression_dict;
+  std::vector<std::unique_ptr<CompressionContext>> compression_ctxs;
+  std::vector<std::unique_ptr<UncompressionContext>> verify_ctxs;
+  std::unique_ptr<UncompressionDict> verify_dict;
+
+  size_t data_begin_offset = 0;
+
+  TableProperties props;
+
+  // States of the builder.
+  //
+  // - `kBuffered`: This is the initial state where zero or more data blocks are
+  //   accumulated uncompressed in-memory. From this state, call
+  //   `EnterUnbuffered()` to finalize the compression dictionary if enabled,
+  //   compress/write out any buffered blocks, and proceed to the `kUnbuffered`
+  //   state.
+  //
+  // - `kUnbuffered`: This is the state when compression dictionary is finalized
+  //   either because it wasn't enabled in the first place or it's been created
+  //   from sampling previously buffered data. In this state, blocks are simply
+  //   compressed/written out as they fill up. From this state, call `Finish()`
+  //   to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
+  //   the partially created file.
+  //
+  // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
+  //   called, so the table builder is no longer usable. We must be in this
+  //   state by the time the destructor runs.
+  enum class State {
+    kBuffered,
+    kUnbuffered,
+    kClosed,
+  };
+  State state;
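+  // Illustrative lifecycle sketch (editor's note, not upstream code). With
+  // compression_opts.max_dict_bytes > 0 the builder starts in kBuffered:
+  //
+  //   builder.Add(k1, v1); ... builder.Add(kn, vn);  // blocks buffered
+  //   // EnterUnbuffered(): train dictionary, compress + flush buffers
+  //   builder.Add(...); ... builder.Finish();        // kUnbuffered -> kClosed
+  //
+  // Without a dictionary, state starts at kUnbuffered and each block is
+  // compressed as soon as the flush block policy cuts it.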
+  // `kBuffered` state is allowed only as long as the buffering of uncompressed
+  // data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
+  uint64_t buffer_limit;
+  std::shared_ptr<CacheReservationManager>
+      compression_dict_buffer_cache_res_mgr;
+  const bool use_delta_encoding_for_index_values;
+  std::unique_ptr<FilterBlockBuilder> filter_builder;
+  OffsetableCacheKey base_cache_key;
+  const TableFileCreationReason reason;
+
+  BlockHandle pending_handle;  // Handle to add to index block
+
+  std::string compressed_output;
+  std::unique_ptr<FlushBlockPolicy> flush_block_policy;
+
+  std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
+
+  std::unique_ptr<ParallelCompressionRep> pc_rep;
+
+  uint64_t get_offset() { return offset.load(std::memory_order_relaxed); }
+  void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); }
+
+  bool IsParallelCompressionEnabled() const {
+    return compression_opts.parallel_threads > 1;
+  }
+
+  Status GetStatus() {
+    // We need to make modifications of status visible when status_ok is set
+    // to false, and this is ensured by status_mutex, so no special memory
+    // order for status_ok is required.
+    if (status_ok.load(std::memory_order_relaxed)) {
+      return Status::OK();
+    } else {
+      return CopyStatus();
+    }
+  }
+
+  Status CopyStatus() {
+    std::lock_guard<std::mutex> lock(status_mutex);
+    return status;
+  }
+
+  IOStatus GetIOStatus() {
+    // We need to make modifications of io_status visible when status_ok is set
+    // to false, and this is ensured by io_status_mutex, so no special memory
+    // order for io_status_ok is required.
+    if (io_status_ok.load(std::memory_order_relaxed)) {
+      return IOStatus::OK();
+    } else {
+      return CopyIOStatus();
+    }
+  }
+
+  IOStatus CopyIOStatus() {
+    std::lock_guard<std::mutex> lock(io_status_mutex);
+    return io_status;
+  }
+
+  // Never erase an existing status that is not OK.
+  void SetStatus(Status s) {
+    if (!s.ok() && status_ok.load(std::memory_order_relaxed)) {
+      // Locking is overkill for the non-parallel
+      // (compression_opts.parallel_threads == 1) case, but since it's
+      // unlikely that s is not OK, we take this cost for simplicity.
+      std::lock_guard<std::mutex> lock(status_mutex);
+      status = s;
+      status_ok.store(false, std::memory_order_relaxed);
+    }
+  }
+
+  // Never erase an existing I/O status that is not OK.
+  // Calling this will also SetStatus(ios)
+  void SetIOStatus(IOStatus ios) {
+    if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) {
+      // Locking is overkill for the non-parallel
+      // (compression_opts.parallel_threads == 1) case, but since it's
+      // unlikely that ios is not OK, we take this cost for simplicity.
+      std::lock_guard<std::mutex> lock(io_status_mutex);
+      io_status = ios;
+      io_status_ok.store(false, std::memory_order_relaxed);
+    }
+    SetStatus(ios);
+  }
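+  // Illustrative sketch (editor's note, not upstream code): SetStatus()
+  // implements "first error wins" so a later call can never mask the root
+  // cause of a failure:
+  //
+  //   rep->SetStatus(Status::Corruption("bad block"));  // recorded
+  //   rep->SetStatus(Status::OK());                     // ignored: s is OK
+  //   rep->SetStatus(Status::Incomplete("later"));      // ignored: too late
+  //   assert(rep->GetStatus().IsCorruption());
+  //
+  // status_ok is only a relaxed fast-path flag; the mutex is what makes the
+  // Status payload itself visible across builder/compressor/writer threads.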
+  Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo,
+      WritableFileWriter* f)
+      : ioptions(tbo.ioptions),
+        moptions(tbo.moptions),
+        table_options(table_opt),
+        internal_comparator(tbo.internal_comparator),
+        file(f),
+        offset(0),
+        alignment(table_options.block_align
+                      ? std::min(static_cast<size_t>(table_options.block_size),
+                                 kDefaultPageSize)
+                      : 0),
+        data_block(table_options.block_restart_interval,
+                   table_options.use_delta_encoding,
+                   false /* use_value_delta_encoding */,
+                   tbo.internal_comparator.user_comparator()
+                           ->CanKeysWithDifferentByteContentsBeEqual()
+                       ? BlockBasedTableOptions::kDataBlockBinarySearch
+                       : table_options.data_block_index_type,
+                   table_options.data_block_hash_table_util_ratio),
+        range_del_block(1 /* block_restart_interval */),
+        internal_prefix_transform(tbo.moptions.prefix_extractor.get()),
+        compression_type(tbo.compression_type),
+        sample_for_compression(tbo.moptions.sample_for_compression),
+        compressible_input_data_bytes(0),
+        uncompressible_input_data_bytes(0),
+        sampled_input_data_bytes(0),
+        sampled_output_slow_data_bytes(0),
+        sampled_output_fast_data_bytes(0),
+        compression_opts(tbo.compression_opts),
+        compression_dict(),
+        compression_ctxs(tbo.compression_opts.parallel_threads),
+        verify_ctxs(tbo.compression_opts.parallel_threads),
+        verify_dict(),
+        state((tbo.compression_opts.max_dict_bytes > 0) ? State::kBuffered
+                                                        : State::kUnbuffered),
+        use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
+                                            !table_opt.block_align),
+        reason(tbo.reason),
+        flush_block_policy(
+            table_options.flush_block_policy_factory->NewFlushBlockPolicy(
+                table_options, data_block)),
+        status_ok(true),
+        io_status_ok(true) {
+    if (tbo.target_file_size == 0) {
+      buffer_limit = compression_opts.max_dict_buffer_bytes;
+    } else if (compression_opts.max_dict_buffer_bytes == 0) {
+      buffer_limit = tbo.target_file_size;
+    } else {
+      buffer_limit = std::min(tbo.target_file_size,
+                              compression_opts.max_dict_buffer_bytes);
+    }
+
+    const auto compress_dict_build_buffer_charged =
+        table_options.cache_usage_options.options_overrides
+            .at(CacheEntryRole::kCompressionDictionaryBuildingBuffer)
+            .charged;
+    if (table_options.block_cache &&
+        (compress_dict_build_buffer_charged ==
+             CacheEntryRoleOptions::Decision::kEnabled ||
+         compress_dict_build_buffer_charged ==
+             CacheEntryRoleOptions::Decision::kFallback)) {
+      compression_dict_buffer_cache_res_mgr =
+          std::make_shared<CacheReservationManagerImpl<
+              CacheEntryRole::kCompressionDictionaryBuildingBuffer>>(
+              table_options.block_cache);
+    } else {
+      compression_dict_buffer_cache_res_mgr = nullptr;
+    }
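+    // Illustrative configuration sketch (editor's note, not upstream code):
+    // the branch above only engages when the user has opted in, e.g.
+    // something like
+    //
+    //   BlockBasedTableOptions bbto;
+    //   bbto.block_cache = NewLRUCache(64 << 20);
+    //   bbto.cache_usage_options.options_overrides.insert(
+    //       {CacheEntryRole::kCompressionDictionaryBuildingBuffer,
+    //        {/*charged=*/CacheEntryRoleOptions::Decision::kEnabled}});
+    //
+    // With that, bytes buffered for dictionary training are reserved against
+    // the block cache and can force an early EnterUnbuffered() (see Add()).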
+    for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
+      compression_ctxs[i].reset(new CompressionContext(compression_type));
+    }
+    if (table_options.index_type ==
+        BlockBasedTableOptions::kTwoLevelIndexSearch) {
+      p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
+          &internal_comparator, use_delta_encoding_for_index_values,
+          table_options);
+      index_builder.reset(p_index_builder_);
+    } else {
+      index_builder.reset(IndexBuilder::CreateIndexBuilder(
+          table_options.index_type, &internal_comparator,
+          &this->internal_prefix_transform, use_delta_encoding_for_index_values,
+          table_options));
+    }
+    if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) {
+      // Apply optimize_filters_for_hits setting here when applicable by
+      // skipping filter generation
+      filter_builder.reset();
+    } else if (tbo.skip_filters) {
+      // For SstFileWriter skip_filters
+      filter_builder.reset();
+    } else if (!table_options.filter_policy) {
+      // Null filter_policy -> no filter
+      filter_builder.reset();
+    } else {
+      FilterBuildingContext filter_context(table_options);
+
+      filter_context.info_log = ioptions.logger;
+      filter_context.column_family_name = tbo.column_family_name;
+      filter_context.reason = reason;
+
+      // Only populate other fields if known to be in LSM rather than
+      // generating external SST file
+      if (reason != TableFileCreationReason::kMisc) {
+        filter_context.compaction_style = ioptions.compaction_style;
+        filter_context.num_levels = ioptions.num_levels;
+        filter_context.level_at_creation = tbo.level_at_creation;
+        filter_context.is_bottommost = tbo.is_bottommost;
+        assert(filter_context.level_at_creation < filter_context.num_levels);
+      }
+
+      filter_builder.reset(CreateFilterBlockBuilder(
+          ioptions, moptions, filter_context,
+          use_delta_encoding_for_index_values, p_index_builder_));
+    }
+
+    assert(tbo.int_tbl_prop_collector_factories);
+    for (auto& factory : *tbo.int_tbl_prop_collector_factories) {
+      assert(factory);
+
+      table_properties_collectors.emplace_back(
+          factory->CreateIntTblPropCollector(tbo.column_family_id,
+                                             tbo.level_at_creation));
+    }
+    table_properties_collectors.emplace_back(
+        new BlockBasedTablePropertiesCollector(
+            table_options.index_type, table_options.whole_key_filtering,
+            moptions.prefix_extractor != nullptr));
+    const Comparator* ucmp = tbo.internal_comparator.user_comparator();
+    assert(ucmp);
+    if (ucmp->timestamp_size() > 0) {
+      table_properties_collectors.emplace_back(
+          new TimestampTablePropertiesCollector(ucmp));
+    }
+    if (table_options.verify_compression) {
+      for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
+        verify_ctxs[i].reset(new UncompressionContext(compression_type));
+      }
+    }
+
+    // These are only needed for populating table properties
+    props.column_family_id = tbo.column_family_id;
+    props.column_family_name = tbo.column_family_name;
+    props.oldest_key_time = tbo.oldest_key_time;
+    props.file_creation_time = tbo.file_creation_time;
+    props.orig_file_number = tbo.cur_file_num;
+    props.db_id = tbo.db_id;
+    props.db_session_id = tbo.db_session_id;
+    props.db_host_id = ioptions.db_host_id;
+    if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) {
+      ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set");
+    }
+  }
+
+  Rep(const Rep&) = delete;
+  Rep& operator=(const Rep&) = delete;
+
+ private:
+  // Synchronize status & io_status accesses across threads from main thread,
+  // compression thread and write thread in parallel compression.
+  std::mutex status_mutex;
+  std::atomic<bool> status_ok;
+  Status status;
+  std::mutex io_status_mutex;
+  std::atomic<bool> io_status_ok;
+  IOStatus io_status;
+};
+
+struct BlockBasedTableBuilder::ParallelCompressionRep {
+  // Keys is a wrapper of vector of strings avoiding
+  // releasing string memories during vector clear()
+  // in order to save memory allocation overhead
+  class Keys {
+   public:
+    Keys() : keys_(kKeysInitSize), size_(0) {}
+    void PushBack(const Slice& key) {
+      if (size_ == keys_.size()) {
+        keys_.emplace_back(key.data(), key.size());
+      } else {
+        keys_[size_].assign(key.data(), key.size());
+      }
+      size_++;
+    }
+    void SwapAssign(std::vector<std::string>& keys) {
+      size_ = keys.size();
+      std::swap(keys_, keys);
+    }
+    void Clear() { size_ = 0; }
+    size_t Size() { return size_; }
+    std::string& Back() { return keys_[size_ - 1]; }
+    std::string& operator[](size_t idx) {
+      assert(idx < size_);
+      return keys_[idx];
+    }
+
+   private:
+    const size_t kKeysInitSize = 32;
+    std::vector<std::string> keys_;
+    size_t size_;
+  };
+  std::unique_ptr<Keys> curr_block_keys;
+
+  class BlockRepSlot;
+
+  // BlockRep instances are fetched from and recycled to
+  // block_rep_pool during parallel compression.
+  struct BlockRep {
+    Slice contents;
+    Slice compressed_contents;
+    std::unique_ptr<std::string> data;
+    std::unique_ptr<std::string> compressed_data;
+    CompressionType compression_type;
+    std::unique_ptr<std::string> first_key_in_next_block;
+    std::unique_ptr<Keys> keys;
+    std::unique_ptr<BlockRepSlot> slot;
+    Status status;
+  };
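+  // Illustrative sketch (editor's note, not upstream code): Keys trades a
+  // little memory for fewer allocations. Clear() only resets size_, so the
+  // std::string buffers keep their capacity and the next PushBack() reuses
+  // them via assign():
+  //
+  //   Keys keys;
+  //   keys.PushBack(Slice("user_key_1"));  // allocates
+  //   keys.Clear();                        // size_ = 0, capacity kept
+  //   keys.PushBack(Slice("user_key_2"));  // reuses the existing buffer
+  //
+  // This matters because every data block replays its keys through here on
+  // the write thread when parallel compression is enabled.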
+  // Use a vector of BlockRep as a buffer for a determined number
+  // of BlockRep structures. All data referenced by pointers in
+  // BlockRep will be freed when this vector is destructed.
+  using BlockRepBuffer = std::vector<BlockRep>;
+  BlockRepBuffer block_rep_buf;
+  // Use a thread-safe queue for concurrent access from block
+  // building thread and writer thread.
+  using BlockRepPool = WorkQueue<BlockRep*>;
+  BlockRepPool block_rep_pool;
+
+  // Use BlockRepSlot to keep block order in write thread.
+  // slot_ will pass references to BlockRep
+  class BlockRepSlot {
+   public:
+    BlockRepSlot() : slot_(1) {}
+    template <typename T>
+    void Fill(T&& rep) {
+      slot_.push(std::forward<T>(rep));
+    };
+    void Take(BlockRep*& rep) { slot_.pop(rep); }
+
+   private:
+    // slot_ will pass references to BlockRep in block_rep_buf,
+    // and those references are always valid before the destruction of
+    // block_rep_buf.
+    WorkQueue<BlockRep*> slot_;
+  };
+
+  // Compression queue will pass references to BlockRep in block_rep_buf,
+  // and those references are always valid before the destruction of
+  // block_rep_buf.
+  using CompressQueue = WorkQueue<BlockRep*>;
+  CompressQueue compress_queue;
+  std::vector<port::Thread> compress_thread_pool;
+
+  // Write queue will pass references to BlockRep::slot in block_rep_buf,
+  // and those references are always valid before the corresponding
+  // BlockRep::slot is destructed, which is before the destruction of
+  // block_rep_buf.
+  using WriteQueue = WorkQueue<BlockRepSlot*>;
+  WriteQueue write_queue;
+  std::unique_ptr<port::Thread> write_thread;
+
+  // Estimate output file size when parallel compression is enabled. This is
+  // necessary because compression & flush are no longer synchronized,
+  // and BlockBasedTableBuilder::FileSize() is no longer accurate.
+  // memory_order_relaxed suffices because accurate statistics is not required.
+  class FileSizeEstimator {
+   public:
+    explicit FileSizeEstimator()
+        : uncomp_bytes_compressed(0),
+          uncomp_bytes_curr_block(0),
+          uncomp_bytes_curr_block_set(false),
+          uncomp_bytes_inflight(0),
+          blocks_inflight(0),
+          curr_compression_ratio(0),
+          estimated_file_size(0) {}
+
+    // Estimate file size when a block is about to be emitted to
+    // compression thread
+    void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) {
+      uint64_t new_uncomp_bytes_inflight =
+          uncomp_bytes_inflight.fetch_add(uncomp_block_size,
+                                          std::memory_order_relaxed) +
+          uncomp_block_size;
+
+      uint64_t new_blocks_inflight =
+          blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;
+
+      estimated_file_size.store(
+          curr_file_size +
+              static_cast<uint64_t>(
+                  static_cast<double>(new_uncomp_bytes_inflight) *
+                  curr_compression_ratio.load(std::memory_order_relaxed)) +
+              new_blocks_inflight * kBlockTrailerSize,
+          std::memory_order_relaxed);
+    }
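+    // Worked example (illustrative, not upstream code): with 1 MiB already
+    // on disk, 2 MiB of uncompressed bytes in flight, a running compression
+    // ratio of 0.5, and 4 blocks in flight, the estimate becomes
+    //
+    //   1 MiB + 2 MiB * 0.5 + 4 * kBlockTrailerSize  ~=  2 MiB + 20 bytes
+    //
+    // i.e. in-flight data is assumed to compress as well as past data did.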
+    // Estimate file size when a block is already reaped from
+    // compression thread
+    void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
+      assert(uncomp_bytes_curr_block_set);
+
+      uint64_t new_uncomp_bytes_compressed =
+          uncomp_bytes_compressed + uncomp_bytes_curr_block;
+      assert(new_uncomp_bytes_compressed > 0);
+
+      curr_compression_ratio.store(
+          (curr_compression_ratio.load(std::memory_order_relaxed) *
+               uncomp_bytes_compressed +
+           compressed_block_size) /
+              static_cast<double>(new_uncomp_bytes_compressed),
+          std::memory_order_relaxed);
+      uncomp_bytes_compressed = new_uncomp_bytes_compressed;
+
+      uint64_t new_uncomp_bytes_inflight =
+          uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block,
+                                          std::memory_order_relaxed) -
+          uncomp_bytes_curr_block;
+
+      uint64_t new_blocks_inflight =
+          blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;
+
+      estimated_file_size.store(
+          curr_file_size +
+              static_cast<uint64_t>(
+                  static_cast<double>(new_uncomp_bytes_inflight) *
+                  curr_compression_ratio.load(std::memory_order_relaxed)) +
+              new_blocks_inflight * kBlockTrailerSize,
+          std::memory_order_relaxed);
+
+      uncomp_bytes_curr_block_set = false;
+    }
+
+    void SetEstimatedFileSize(uint64_t size) {
+      estimated_file_size.store(size, std::memory_order_relaxed);
+    }
+
+    uint64_t GetEstimatedFileSize() {
+      return estimated_file_size.load(std::memory_order_relaxed);
+    }
+
+    void SetCurrBlockUncompSize(uint64_t size) {
+      uncomp_bytes_curr_block = size;
+      uncomp_bytes_curr_block_set = true;
+    }
+
+   private:
+    // Input bytes compressed so far.
+    uint64_t uncomp_bytes_compressed;
+    // Size of current block being appended.
+    uint64_t uncomp_bytes_curr_block;
+    // Whether uncomp_bytes_curr_block has been set for next
+    // ReapBlock call.
+    bool uncomp_bytes_curr_block_set;
+    // Input bytes under compression and not appended yet.
+    std::atomic<uint64_t> uncomp_bytes_inflight;
+    // Number of blocks under compression and not appended yet.
+    std::atomic<uint64_t> blocks_inflight;
+    // Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock.
+    std::atomic<double> curr_compression_ratio;
+    // Estimated SST file size.
+    std::atomic<uint64_t> estimated_file_size;
+  };
+  FileSizeEstimator file_size_estimator;
+
+  // Facilities used for waiting first block completion. Need to Wait for
+  // the completion of first block compression and flush to get a non-zero
+  // compression ratio.
+  std::atomic<bool> first_block_processed;
+  std::condition_variable first_block_cond;
+  std::mutex first_block_mutex;
+
+  explicit ParallelCompressionRep(uint32_t parallel_threads)
+      : curr_block_keys(new Keys()),
+        block_rep_buf(parallel_threads),
+        block_rep_pool(parallel_threads),
+        compress_queue(parallel_threads),
+        write_queue(parallel_threads),
+        first_block_processed(false) {
+    for (uint32_t i = 0; i < parallel_threads; i++) {
+      block_rep_buf[i].contents = Slice();
+      block_rep_buf[i].compressed_contents = Slice();
+      block_rep_buf[i].data.reset(new std::string());
+      block_rep_buf[i].compressed_data.reset(new std::string());
+      block_rep_buf[i].compression_type = CompressionType();
+      block_rep_buf[i].first_key_in_next_block.reset(new std::string());
+      block_rep_buf[i].keys.reset(new Keys());
+      block_rep_buf[i].slot.reset(new BlockRepSlot());
+      block_rep_buf[i].status = Status::OK();
+      block_rep_pool.push(&block_rep_buf[i]);
+    }
+  }
+
+  ~ParallelCompressionRep() { block_rep_pool.finish(); }
+
+  // Make a block prepared to be emitted to compression thread
+  // Used in non-buffered mode
+  BlockRep* PrepareBlock(CompressionType compression_type,
+                         const Slice* first_key_in_next_block,
+                         BlockBuilder* data_block) {
+    BlockRep* block_rep =
+        PrepareBlockInternal(compression_type, first_key_in_next_block);
+    assert(block_rep != nullptr);
+    data_block->SwapAndReset(*(block_rep->data));
+    block_rep->contents = *(block_rep->data);
+    std::swap(block_rep->keys, curr_block_keys);
+    curr_block_keys->Clear();
+    return block_rep;
+  }
+
+  // Used in EnterUnbuffered
+  BlockRep* PrepareBlock(CompressionType compression_type,
+                         const Slice* first_key_in_next_block,
+                         std::string* data_block,
+                         std::vector<std::string>* keys) {
+    BlockRep* block_rep =
+        PrepareBlockInternal(compression_type, first_key_in_next_block);
+    assert(block_rep != nullptr);
+    std::swap(*(block_rep->data), *data_block);
+    block_rep->contents = *(block_rep->data);
+    block_rep->keys->SwapAssign(*keys);
+    return block_rep;
+  }
+  // Emit a block to compression thread
+  void EmitBlock(BlockRep* block_rep) {
+    assert(block_rep != nullptr);
+    assert(block_rep->status.ok());
+    if (!write_queue.push(block_rep->slot.get())) {
+      return;
+    }
+    if (!compress_queue.push(block_rep)) {
+      return;
+    }
+
+    if (!first_block_processed.load(std::memory_order_relaxed)) {
+      std::unique_lock<std::mutex> lock(first_block_mutex);
+      first_block_cond.wait(lock, [this] {
+        return first_block_processed.load(std::memory_order_relaxed);
+      });
+    }
+  }
+
+  // Reap a block from compression thread
+  void ReapBlock(BlockRep* block_rep) {
+    assert(block_rep != nullptr);
+    block_rep->compressed_data->clear();
+    block_rep_pool.push(block_rep);
+
+    if (!first_block_processed.load(std::memory_order_relaxed)) {
+      std::lock_guard<std::mutex> lock(first_block_mutex);
+      first_block_processed.store(true, std::memory_order_relaxed);
+      first_block_cond.notify_one();
+    }
+  }
+
+ private:
+  BlockRep* PrepareBlockInternal(CompressionType compression_type,
+                                 const Slice* first_key_in_next_block) {
+    BlockRep* block_rep = nullptr;
+    block_rep_pool.pop(block_rep);
+    assert(block_rep != nullptr);
+
+    assert(block_rep->data);
+
+    block_rep->compression_type = compression_type;
+
+    if (first_key_in_next_block == nullptr) {
+      block_rep->first_key_in_next_block.reset(nullptr);
+    } else {
+      block_rep->first_key_in_next_block->assign(
+          first_key_in_next_block->data(), first_key_in_next_block->size());
+    }
+
+    return block_rep;
+  }
+};
+
+BlockBasedTableBuilder::BlockBasedTableBuilder(
+    const BlockBasedTableOptions& table_options, const TableBuilderOptions& tbo,
+    WritableFileWriter* file) {
+  BlockBasedTableOptions sanitized_table_options(table_options);
+  if (sanitized_table_options.format_version == 0 &&
+      sanitized_table_options.checksum != kCRC32c) {
+    ROCKS_LOG_WARN(
+        tbo.ioptions.logger,
+        "Silently converting format_version to 1 because checksum is "
+        "non-default");
+    // silently convert format_version to 1 to keep consistent with current
+    // behavior
+    sanitized_table_options.format_version = 1;
+  }
+
+  rep_ = new Rep(sanitized_table_options, tbo, file);
+
+  TEST_SYNC_POINT_CALLBACK(
+      "BlockBasedTableBuilder::BlockBasedTableBuilder:PreSetupBaseCacheKey",
+      const_cast<TableProperties*>(&rep_->props));
+
+  BlockBasedTable::SetupBaseCacheKey(&rep_->props, tbo.db_session_id,
+                                     tbo.cur_file_num, &rep_->base_cache_key);
+
+  if (rep_->IsParallelCompressionEnabled()) {
+    StartParallelCompression();
+  }
+}
+
+BlockBasedTableBuilder::~BlockBasedTableBuilder() {
+  // Catch errors where caller forgot to call Finish()
+  assert(rep_->state == Rep::State::kClosed);
+  delete rep_;
+}
+
+void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
+  Rep* r = rep_;
+  assert(rep_->state != Rep::State::kClosed);
+  if (!ok()) return;
+  ValueType value_type = ExtractValueType(key);
+  if (IsValueType(value_type)) {
+#ifndef NDEBUG
+    if (r->props.num_entries > r->props.num_range_deletions) {
+      assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
+    }
+#endif  // !NDEBUG
+
+    auto should_flush = r->flush_block_policy->Update(key, value);
+    if (should_flush) {
+      assert(!r->data_block.empty());
+      r->first_key_in_next_block = &key;
+      Flush();
+      if (r->state == Rep::State::kBuffered) {
+        bool exceeds_buffer_limit =
+            (r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit);
+        bool exceeds_global_block_cache_limit = false;
+
+        // Increase cache charging for the last buffered data block
+        // only if the block is not going to be unbuffered immediately
+        // and there exists a cache reservation manager
+        if (!exceeds_buffer_limit &&
+            r->compression_dict_buffer_cache_res_mgr != nullptr) {
+          Status s =
+              r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
+                  r->data_begin_offset);
+          exceeds_global_block_cache_limit = s.IsMemoryLimit();
+        }
+
+        if (exceeds_buffer_limit || exceeds_global_block_cache_limit) {
+          EnterUnbuffered();
+        }
+      }
+
+      // Add item to index block.
+      // We do not emit the index entry for a block until we have seen the
+      // first key for the next data block. This allows us to use shorter
+      // keys in the index block. For example, consider a block boundary
+      // between the keys "the quick brown fox" and "the who". We can use
+      // "the r" as the key for the index block entry since it is >= all
+      // entries in the first block and < all entries in subsequent
+      // blocks.
+      if (ok() && r->state == Rep::State::kUnbuffered) {
+        if (r->IsParallelCompressionEnabled()) {
+          r->pc_rep->curr_block_keys->Clear();
+        } else {
+          r->index_builder->AddIndexEntry(&r->last_key, &key,
+                                          r->pending_handle);
+        }
+      }
+    }
+
+    // Note: PartitionedFilterBlockBuilder requires key being added to filter
+    // builder after being added to index builder.
+    if (r->state == Rep::State::kUnbuffered) {
+      if (r->IsParallelCompressionEnabled()) {
+        r->pc_rep->curr_block_keys->PushBack(key);
+      } else {
+        if (r->filter_builder != nullptr) {
+          size_t ts_sz =
+              r->internal_comparator.user_comparator()->timestamp_size();
+          r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
+        }
+      }
+    }
+
+    r->data_block.AddWithLastKey(key, value, r->last_key);
+    r->last_key.assign(key.data(), key.size());
+    if (r->state == Rep::State::kBuffered) {
+      // Buffered keys will be replayed from data_block_buffers during
+      // `Finish()` once compression dictionary has been finalized.
+    } else {
+      if (!r->IsParallelCompressionEnabled()) {
+        r->index_builder->OnKeyAdded(key);
+      }
+    }
+    // TODO offset passed in is not accurate for parallel compression case
+    NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
+                                      r->table_properties_collectors,
+                                      r->ioptions.logger);
+
+  } else if (value_type == kTypeRangeDeletion) {
+    r->range_del_block.Add(key, value);
+    // TODO offset passed in is not accurate for parallel compression case
+    NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
+                                      r->table_properties_collectors,
+                                      r->ioptions.logger);
+  } else {
+    assert(false);
+  }
+
+  r->props.num_entries++;
+  r->props.raw_key_size += key.size();
+  r->props.raw_value_size += value.size();
+  if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion ||
+      value_type == kTypeDeletionWithTimestamp) {
+    r->props.num_deletions++;
+  } else if (value_type == kTypeRangeDeletion) {
+    r->props.num_deletions++;
+    r->props.num_range_deletions++;
+  } else if (value_type == kTypeMerge) {
+    r->props.num_merge_operands++;
+  }
+}
+
+void BlockBasedTableBuilder::Flush() {
+  Rep* r = rep_;
+  assert(rep_->state != Rep::State::kClosed);
+  if (!ok()) return;
+  if (r->data_block.empty()) return;
+  if (r->IsParallelCompressionEnabled() &&
+      r->state == Rep::State::kUnbuffered) {
+    r->data_block.Finish();
+    ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
+        r->compression_type, r->first_key_in_next_block, &(r->data_block));
+    assert(block_rep != nullptr);
+    r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
+                                             r->get_offset());
+    r->pc_rep->EmitBlock(block_rep);
+  } else {
+    WriteBlock(&r->data_block, &r->pending_handle, BlockType::kData);
+  }
+}
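+// Illustrative sketch (editor's note, not upstream code) of the index-key
+// shortening described in Add() above, using the public Comparator API:
+//
+//   const Comparator* cmp = BytewiseComparator();
+//   std::string sep = "the quick brown fox";          // last key of block N
+//   cmp->FindShortestSeparator(&sep, "the who");      // first key of N+1
+//   // sep is now "the r": >= every key in block N, < every key after it,
+//   // and only 5 bytes long in the index block.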
+void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
+                                        BlockHandle* handle,
+                                        BlockType block_type) {
+  block->Finish();
+  std::string uncompressed_block_data;
+  uncompressed_block_data.reserve(rep_->table_options.block_size);
+  block->SwapAndReset(uncompressed_block_data);
+  if (rep_->state == Rep::State::kBuffered) {
+    assert(block_type == BlockType::kData);
+    rep_->data_block_buffers.emplace_back(std::move(uncompressed_block_data));
+    rep_->data_begin_offset += rep_->data_block_buffers.back().size();
+    return;
+  }
+  WriteBlock(uncompressed_block_data, handle, block_type);
+}
+
+void BlockBasedTableBuilder::WriteBlock(const Slice& uncompressed_block_data,
+                                        BlockHandle* handle,
+                                        BlockType block_type) {
+  Rep* r = rep_;
+  assert(r->state == Rep::State::kUnbuffered);
+  Slice block_contents;
+  CompressionType type;
+  Status compress_status;
+  bool is_data_block = block_type == BlockType::kData;
+  CompressAndVerifyBlock(uncompressed_block_data, is_data_block,
+                         *(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
+                         &(r->compressed_output), &(block_contents), &type,
+                         &compress_status);
+  r->SetStatus(compress_status);
+  if (!ok()) {
+    return;
+  }
+
+  WriteMaybeCompressedBlock(block_contents, type, handle, block_type,
+                            &uncompressed_block_data);
+  r->compressed_output.clear();
+  if (is_data_block) {
+    r->props.data_size = r->get_offset();
+    ++r->props.num_data_blocks;
+  }
+}
+
+void BlockBasedTableBuilder::BGWorkCompression(
+    const CompressionContext& compression_ctx,
+    UncompressionContext* verify_ctx) {
+  ParallelCompressionRep::BlockRep* block_rep = nullptr;
+  while (rep_->pc_rep->compress_queue.pop(block_rep)) {
+    assert(block_rep != nullptr);
+    CompressAndVerifyBlock(block_rep->contents, true /* is_data_block */,
+                           compression_ctx, verify_ctx,
+                           block_rep->compressed_data.get(),
+                           &block_rep->compressed_contents,
+                           &(block_rep->compression_type), &block_rep->status);
+    block_rep->slot->Fill(block_rep);
+  }
+}
+
+void BlockBasedTableBuilder::CompressAndVerifyBlock(
+    const Slice& uncompressed_block_data, bool is_data_block,
+    const CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
+    std::string* compressed_output, Slice* block_contents,
+    CompressionType* type, Status* out_status) {
+  // File format contains a sequence of blocks where each block has:
+  //    block_data: uint8[n]
+  //    type: uint8
+  //    crc: uint32
+  Rep* r = rep_;
+  bool is_status_ok = ok();
+  if (!r->IsParallelCompressionEnabled()) {
+    assert(is_status_ok);
+  }
+
+  *type = r->compression_type;
+  uint64_t sample_for_compression = r->sample_for_compression;
+  bool abort_compression = false;
+
+  StopWatchNano timer(
+      r->ioptions.clock,
+      ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats));
+
+  if (is_status_ok && uncompressed_block_data.size() < kCompressionSizeLimit) {
+    if (is_data_block) {
+      r->compressible_input_data_bytes.fetch_add(
+          uncompressed_block_data.size(), std::memory_order_relaxed);
+    }
+    const CompressionDict* compression_dict;
+    if (!is_data_block || r->compression_dict == nullptr) {
+      compression_dict = &CompressionDict::GetEmptyDict();
+    } else {
+      compression_dict = r->compression_dict.get();
+    }
+    assert(compression_dict != nullptr);
+    CompressionInfo compression_info(r->compression_opts, compression_ctx,
+                                     *compression_dict, *type,
+                                     sample_for_compression);
+
+    std::string sampled_output_fast;
+    std::string sampled_output_slow;
+    *block_contents = CompressBlock(
+        uncompressed_block_data, compression_info, type,
+        r->table_options.format_version, is_data_block /* do_sample */,
+        compressed_output, &sampled_output_fast, &sampled_output_slow);
+    if (sampled_output_slow.size() > 0 || sampled_output_fast.size() > 0) {
+      // Currently compression sampling is only enabled for data block.
+      assert(is_data_block);
+      r->sampled_input_data_bytes.fetch_add(uncompressed_block_data.size(),
+                                            std::memory_order_relaxed);
+      r->sampled_output_slow_data_bytes.fetch_add(sampled_output_slow.size(),
+                                                  std::memory_order_relaxed);
+      r->sampled_output_fast_data_bytes.fetch_add(sampled_output_fast.size(),
+                                                  std::memory_order_relaxed);
+    }
+    // notify collectors on block add
+    NotifyCollectTableCollectorsOnBlockAdd(
+        r->table_properties_collectors, uncompressed_block_data.size(),
+        sampled_output_fast.size(), sampled_output_slow.size());
+
+    // Some of the compression algorithms are known to be unreliable. If
+    // the verify_compression flag is set then try to de-compress the
+    // compressed data and compare to the input.
+    if (*type != kNoCompression && r->table_options.verify_compression) {
+      // Retrieve the uncompressed contents into a new buffer
+      const UncompressionDict* verify_dict;
+      if (!is_data_block || r->verify_dict == nullptr) {
+        verify_dict = &UncompressionDict::GetEmptyDict();
+      } else {
+        verify_dict = r->verify_dict.get();
+      }
+      assert(verify_dict != nullptr);
+      BlockContents contents;
+      UncompressionInfo uncompression_info(*verify_ctx, *verify_dict,
+                                           r->compression_type);
+      Status stat = UncompressBlockData(
+          uncompression_info, block_contents->data(), block_contents->size(),
+          &contents, r->table_options.format_version, r->ioptions);
+
+      if (stat.ok()) {
+        bool compressed_ok =
+            contents.data.compare(uncompressed_block_data) == 0;
+        if (!compressed_ok) {
+          // The result of the compression was invalid. abort.
+          abort_compression = true;
+          const char* const msg =
+              "Decompressed block did not match pre-compression block";
+          ROCKS_LOG_ERROR(r->ioptions.logger, "%s", msg);
+          *out_status = Status::Corruption(msg);
+        }
+      } else {
+        // Decompression reported an error. abort.
+        *out_status = Status::Corruption(std::string("Could not decompress: ") +
+                                         stat.getState());
+        abort_compression = true;
+      }
+    }
+  } else {
+    // Block is too big to be compressed.
+    if (is_data_block) {
+      r->uncompressible_input_data_bytes.fetch_add(
+          uncompressed_block_data.size(), std::memory_order_relaxed);
+    }
+    abort_compression = true;
+  }
+  if (is_data_block) {
+    r->uncompressible_input_data_bytes.fetch_add(kBlockTrailerSize,
+                                                 std::memory_order_relaxed);
+  }
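+  // Illustrative configuration sketch (editor's note, not upstream code):
+  // the round-trip check above is opt-in and costs one extra decompression
+  // per block at build time, e.g.
+  //
+  //   BlockBasedTableOptions bbto;
+  //   bbto.verify_compression = true;  // decompress + compare each block
+  //
+  // A mismatch surfaces as Status::Corruption and fails the table build,
+  // rather than persisting a block that cannot be read back.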
+  // Abort compression if the block is too big, or did not pass
+  // verification.
+  if (abort_compression) {
+    RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED);
+    *type = kNoCompression;
+    *block_contents = uncompressed_block_data;
+  } else if (*type != kNoCompression) {
+    if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats)) {
+      RecordTimeToHistogram(r->ioptions.stats, COMPRESSION_TIMES_NANOS,
+                            timer.ElapsedNanos());
+    }
+    RecordInHistogram(r->ioptions.stats, BYTES_COMPRESSED,
+                      uncompressed_block_data.size());
+    RecordTick(r->ioptions.stats, NUMBER_BLOCK_COMPRESSED);
+  } else if (*type != r->compression_type) {
+    RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED);
+  }
+}
+
+void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
+    const Slice& block_contents, CompressionType type, BlockHandle* handle,
+    BlockType block_type, const Slice* uncompressed_block_data) {
+  Rep* r = rep_;
+  bool is_data_block = block_type == BlockType::kData;
+  // Old, misleading name of this function: WriteRawBlock
+  StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
+  handle->set_offset(r->get_offset());
+  handle->set_size(block_contents.size());
+  assert(status().ok());
+  assert(io_status().ok());
+
+  {
+    IOStatus io_s = r->file->Append(block_contents);
+    if (!io_s.ok()) {
+      r->SetIOStatus(io_s);
+      return;
+    }
+  }
+
+  std::array<char, kBlockTrailerSize> trailer;
+  trailer[0] = type;
+  uint32_t checksum = ComputeBuiltinChecksumWithLastByte(
+      r->table_options.checksum, block_contents.data(), block_contents.size(),
+      /*last_byte*/ type);
+
+  if (block_type == BlockType::kFilter) {
+    Status s = r->filter_builder->MaybePostVerifyFilter(block_contents);
+    if (!s.ok()) {
+      r->SetStatus(s);
+      return;
+    }
+  }
+
+  EncodeFixed32(trailer.data() + 1, checksum);
+  TEST_SYNC_POINT_CALLBACK(
+      "BlockBasedTableBuilder::WriteMaybeCompressedBlock:TamperWithChecksum",
+      trailer.data());
+  {
+    IOStatus io_s = r->file->Append(Slice(trailer.data(), trailer.size()));
+    if (!io_s.ok()) {
+      r->SetIOStatus(io_s);
+      return;
+    }
+  }
+
+  {
+    Status s = Status::OK();
+    bool warm_cache;
+    switch (r->table_options.prepopulate_block_cache) {
+      case BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly:
+        warm_cache = (r->reason == TableFileCreationReason::kFlush);
+        break;
+      case BlockBasedTableOptions::PrepopulateBlockCache::kDisable:
+        warm_cache = false;
+        break;
+      default:
+        // missing case
+        assert(false);
+        warm_cache = false;
+    }
+    if (warm_cache) {
+      if (type == kNoCompression) {
+        s = InsertBlockInCacheHelper(block_contents, handle, block_type);
+      } else if (uncompressed_block_data != nullptr) {
+        s = InsertBlockInCacheHelper(*uncompressed_block_data, handle,
+                                     block_type);
+      }
+      if (!s.ok()) {
+        r->SetStatus(s);
+        return;
+      }
+    }
+    s = InsertBlockInCompressedCache(block_contents, type, handle);
+    if (!s.ok()) {
+      r->SetStatus(s);
+      return;
+    }
+  }
+
+  r->set_offset(r->get_offset() + block_contents.size() + kBlockTrailerSize);
+  if (r->table_options.block_align && is_data_block) {
+    size_t pad_bytes =
+        (r->alignment -
+         ((block_contents.size() + kBlockTrailerSize) & (r->alignment - 1))) &
+        (r->alignment - 1);
+    IOStatus io_s = r->file->Pad(pad_bytes);
+    if (io_s.ok()) {
+      r->set_offset(r->get_offset() + pad_bytes);
+    } else {
+      r->SetIOStatus(io_s);
+      return;
+    }
+  }
+
+  if (r->IsParallelCompressionEnabled()) {
+    if (is_data_block) {
+      r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(),
+                                               r->get_offset());
+    } else {
+      r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset());
+    }
+  }
+}
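+// Worked example (illustrative, not upstream code) for the block_align
+// padding above, assuming alignment = 4096. Since the alignment is a power
+// of two, `x & (alignment - 1)` is equivalent to `x % alignment`. A
+// 4096-byte block plus the 5-byte trailer ends at offset 4101, so
+//
+//   pad_bytes = (4096 - (4101 & 4095)) & 4095 = (4096 - 5) & 4095 = 4091
+//
+// and the next data block starts exactly at the 8192-byte boundary.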
+void BlockBasedTableBuilder::BGWorkWriteMaybeCompressedBlock() {
+  Rep* r = rep_;
+  ParallelCompressionRep::BlockRepSlot* slot = nullptr;
+  ParallelCompressionRep::BlockRep* block_rep = nullptr;
+  while (r->pc_rep->write_queue.pop(slot)) {
+    assert(slot != nullptr);
+    slot->Take(block_rep);
+    assert(block_rep != nullptr);
+    if (!block_rep->status.ok()) {
+      r->SetStatus(block_rep->status);
+      // Reap block so that blocked Flush() can finish
+      // if there is one, and Flush() will notice !ok() next time.
+      block_rep->status = Status::OK();
+      r->pc_rep->ReapBlock(block_rep);
+      continue;
+    }
+
+    for (size_t i = 0; i < block_rep->keys->Size(); i++) {
+      auto& key = (*block_rep->keys)[i];
+      if (r->filter_builder != nullptr) {
+        size_t ts_sz =
+            r->internal_comparator.user_comparator()->timestamp_size();
+        r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
+      }
+      r->index_builder->OnKeyAdded(key);
+    }
+
+    r->pc_rep->file_size_estimator.SetCurrBlockUncompSize(
+        block_rep->data->size());
+    WriteMaybeCompressedBlock(block_rep->compressed_contents,
+                              block_rep->compression_type, &r->pending_handle,
+                              BlockType::kData, &block_rep->contents);
+    if (!ok()) {
+      break;
+    }
+
+    r->props.data_size = r->get_offset();
+    ++r->props.num_data_blocks;
+
+    if (block_rep->first_key_in_next_block == nullptr) {
+      r->index_builder->AddIndexEntry(&(block_rep->keys->Back()), nullptr,
+                                      r->pending_handle);
+    } else {
+      Slice first_key_in_next_block =
+          Slice(*block_rep->first_key_in_next_block);
+      r->index_builder->AddIndexEntry(&(block_rep->keys->Back()),
+                                      &first_key_in_next_block,
+                                      r->pending_handle);
+    }
+
+    r->pc_rep->ReapBlock(block_rep);
+  }
+}
+
+void BlockBasedTableBuilder::StartParallelCompression() {
+  rep_->pc_rep.reset(
+      new ParallelCompressionRep(rep_->compression_opts.parallel_threads));
+  rep_->pc_rep->compress_thread_pool.reserve(
+      rep_->compression_opts.parallel_threads);
+  for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) {
+    rep_->pc_rep->compress_thread_pool.emplace_back([this, i] {
+      BGWorkCompression(*(rep_->compression_ctxs[i]),
+                        rep_->verify_ctxs[i].get());
+    });
+  }
+  rep_->pc_rep->write_thread.reset(
+      new port::Thread([this] { BGWorkWriteMaybeCompressedBlock(); }));
+}
+
+void BlockBasedTableBuilder::StopParallelCompression() {
+  rep_->pc_rep->compress_queue.finish();
+  for (auto& thread : rep_->pc_rep->compress_thread_pool) {
+    thread.join();
+  }
+  rep_->pc_rep->write_queue.finish();
+  rep_->pc_rep->write_thread->join();
+}
+
+Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); }
+
+IOStatus BlockBasedTableBuilder::io_status() const {
+  return rep_->GetIOStatus();
+}
+
+//
+// Make a copy of the block contents and insert into compressed block cache
+//
+Status BlockBasedTableBuilder::InsertBlockInCompressedCache(
+    const Slice& block_contents, const CompressionType type,
+    const BlockHandle* handle) {
+  Rep* r = rep_;
+  Cache* block_cache_compressed =
+      r->table_options.block_cache_compressed.get();
+  Status s;
+  if (type != kNoCompression && block_cache_compressed != nullptr) {
+    size_t size = block_contents.size();
+
+    auto ubuf =
+        AllocateBlock(size + 1, block_cache_compressed->memory_allocator());
+    memcpy(ubuf.get(), block_contents.data(), size);
+    ubuf[size] = type;
+
+    BlockContents* block_contents_to_cache =
+        new BlockContents(std::move(ubuf), size);
+#ifndef NDEBUG
+    block_contents_to_cache->has_trailer = true;
+#endif  // NDEBUG
+
+    CacheKey key = BlockBasedTable::GetCacheKey(rep_->base_cache_key, *handle);
+
+    s = block_cache_compressed->Insert(
+        key.AsSlice(), block_contents_to_cache,
+        block_contents_to_cache->ApproximateMemoryUsage(),
+        &DeleteCacheEntry<BlockContents>);
+    if (s.ok()) {
+      RecordTick(rep_->ioptions.stats, BLOCK_CACHE_COMPRESSED_ADD);
+    } else {
+      RecordTick(rep_->ioptions.stats, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
+    }
+    // Invalidate OS cache.
+    r->file->InvalidateCache(static_cast<size_t>(r->get_offset()), size)
+        .PermitUncheckedError();
+  }
+  return s;
+}
+
+Status BlockBasedTableBuilder::InsertBlockInCacheHelper(
+    const Slice& block_contents, const BlockHandle* handle,
+    BlockType block_type) {
+  Status s;
+  switch (block_type) {
+    case BlockType::kData:
+    case BlockType::kIndex:
+    case BlockType::kFilterPartitionIndex:
+      s = InsertBlockInCache<Block>(block_contents, handle, block_type);
+      break;
+    case BlockType::kFilter:
+      s = InsertBlockInCache<ParsedFullFilterBlock>(block_contents, handle,
+                                                    block_type);
+      break;
+    case BlockType::kCompressionDictionary:
+      s = InsertBlockInCache<UncompressionDict>(block_contents, handle,
+                                                block_type);
+      break;
+    default:
+      // no-op / not cached
+      break;
+  }
+  return s;
+}
+
+template <typename TBlocklike>
+Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
+                                                  const BlockHandle* handle,
+                                                  BlockType block_type) {
+  // Uncompressed regular block cache
+  Cache* block_cache = rep_->table_options.block_cache.get();
+  Status s;
+  if (block_cache != nullptr) {
+    size_t size = block_contents.size();
+    auto buf = AllocateBlock(size, block_cache->memory_allocator());
+    memcpy(buf.get(), block_contents.data(), size);
+    BlockContents results(std::move(buf), size);
+
+    CacheKey key = BlockBasedTable::GetCacheKey(rep_->base_cache_key, *handle);
+
+    const size_t read_amp_bytes_per_bit =
+        rep_->table_options.read_amp_bytes_per_bit;
+
+    // TODO akanksha:: Dedup below code by calling
+    // BlockBasedTable::PutDataBlockToCache.
+    std::unique_ptr<TBlocklike> block_holder(
+        BlocklikeTraits<TBlocklike>::Create(
+            std::move(results), read_amp_bytes_per_bit,
+            rep_->ioptions.statistics.get(),
+            false /*rep_->blocks_definitely_zstd_compressed*/,
+            rep_->table_options.filter_policy.get()));
+
+    assert(block_holder->own_bytes());
+    size_t charge = block_holder->ApproximateMemoryUsage();
+    s = block_cache->Insert(
+        key.AsSlice(), block_holder.get(),
+        BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), charge,
+        nullptr, Cache::Priority::LOW);
+
+    if (s.ok()) {
+      // Release ownership of block_holder.
+      block_holder.release();
+      BlockBasedTable::UpdateCacheInsertionMetrics(
+          block_type, nullptr /*get_context*/, charge, s.IsOkOverwritten(),
+          rep_->ioptions.stats);
+    } else {
+      RecordTick(rep_->ioptions.stats, BLOCK_CACHE_ADD_FAILURES);
+    }
+  }
+  return s;
+}
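+// Illustrative configuration sketch (editor's note, not upstream code): the
+// cache-warming path that calls InsertBlockInCacheHelper() above is driven
+// by something like
+//
+//   BlockBasedTableOptions bbto;
+//   bbto.block_cache = NewLRUCache(128 << 20);
+//   bbto.prepopulate_block_cache =
+//       BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly;
+//
+// so blocks written by a memtable flush land in the block cache immediately
+// and the first read after a flush avoids going back to the file.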
+void BlockBasedTableBuilder::WriteFilterBlock(
+    MetaIndexBuilder* meta_index_builder) {
+  if (rep_->filter_builder == nullptr || rep_->filter_builder->IsEmpty()) {
+    // No filter block needed
+    return;
+  }
+  BlockHandle filter_block_handle;
+  bool is_partitioned_filter = rep_->table_options.partition_filters;
+  if (ok()) {
+    rep_->props.num_filter_entries +=
+        rep_->filter_builder->EstimateEntriesAdded();
+    Status s = Status::Incomplete();
+    while (ok() && s.IsIncomplete()) {
+      // filter_data is used to store the transferred filter data payload from
+      // FilterBlockBuilder and deallocate the payload by going out of scope.
+      // Otherwise, the payload will unnecessarily remain until
+      // BlockBasedTableBuilder is deallocated.
+      //
+      // See FilterBlockBuilder::Finish() for more on the difference in
+      // transferred filter data payload among different FilterBlockBuilder
+      // subtypes.
+      std::unique_ptr<const char[]> filter_data;
+      Slice filter_content =
+          rep_->filter_builder->Finish(filter_block_handle, &s, &filter_data);
+
+      assert(s.ok() || s.IsIncomplete() || s.IsCorruption());
+      if (s.IsCorruption()) {
+        rep_->SetStatus(s);
+        break;
+      }
+
+      rep_->props.filter_size += filter_content.size();
+
+      BlockType btype = is_partitioned_filter && /* last */ s.ok()
+                            ? BlockType::kFilterPartitionIndex
+                            : BlockType::kFilter;
+      WriteMaybeCompressedBlock(filter_content, kNoCompression,
+                                &filter_block_handle, btype);
+    }
+    rep_->filter_builder->ResetFilterBitsBuilder();
+  }
+  if (ok()) {
+    // Add mapping from "<filter_block_prefix>.Name" to location
+    // of filter data.
+    std::string key;
+    key = is_partitioned_filter ? BlockBasedTable::kPartitionedFilterBlockPrefix
+                                : BlockBasedTable::kFullFilterBlockPrefix;
+    key.append(rep_->table_options.filter_policy->CompatibilityName());
+    meta_index_builder->Add(key, filter_block_handle);
+  }
+}
+
+void BlockBasedTableBuilder::WriteIndexBlock(
+    MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
+  if (!ok()) {
+    return;
+  }
+  IndexBuilder::IndexBlocks index_blocks;
+  auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
+  if (index_builder_status.IsIncomplete()) {
+    // If we have more than one index partition then meta_blocks are not
+    // supported for the index. Currently meta_blocks are used only by
+    // HashIndexBuilder which is not multi-partition.
+    assert(index_blocks.meta_blocks.empty());
+  } else if (ok() && !index_builder_status.ok()) {
+    rep_->SetStatus(index_builder_status);
+  }
+  if (ok()) {
+    for (const auto& item : index_blocks.meta_blocks) {
+      BlockHandle block_handle;
+      WriteBlock(item.second, &block_handle, BlockType::kIndex);
+      if (!ok()) {
+        break;
+      }
+      meta_index_builder->Add(item.first, block_handle);
+    }
+  }
+  if (ok()) {
+    if (rep_->table_options.enable_index_compression) {
+      WriteBlock(index_blocks.index_block_contents, index_block_handle,
+                 BlockType::kIndex);
+    } else {
+      WriteMaybeCompressedBlock(index_blocks.index_block_contents,
+                                kNoCompression, index_block_handle,
+                                BlockType::kIndex);
+    }
+  }
+  // If there are more index partitions, finish them and write them out
+  if (index_builder_status.IsIncomplete()) {
+    bool index_building_finished = false;
+    while (ok() && !index_building_finished) {
+      Status s =
+          rep_->index_builder->Finish(&index_blocks, *index_block_handle);
+      if (s.ok()) {
+        index_building_finished = true;
+      } else if (s.IsIncomplete()) {
+        // More partitioned index after this one
+        assert(!index_building_finished);
+      } else {
+        // Error
+        rep_->SetStatus(s);
+        return;
+      }
+
+      if (rep_->table_options.enable_index_compression) {
+        WriteBlock(index_blocks.index_block_contents, index_block_handle,
+                   BlockType::kIndex);
+      } else {
+        WriteMaybeCompressedBlock(index_blocks.index_block_contents,
+                                  kNoCompression, index_block_handle,
+                                  BlockType::kIndex);
+      }
+      // The last index_block_handle will be for the partition index block
+    }
+  }
+}
+
+void BlockBasedTableBuilder::WritePropertiesBlock(
+    MetaIndexBuilder* meta_index_builder) {
+  BlockHandle properties_block_handle;
+  if (ok()) {
+    PropertyBlockBuilder property_block_builder;
+    rep_->props.filter_policy_name =
+        rep_->table_options.filter_policy != nullptr
+            ? rep_->table_options.filter_policy->Name()
+            : "";
+    rep_->props.index_size =
+        rep_->index_builder->IndexSize() + kBlockTrailerSize;
+    rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
+                                      ? rep_->ioptions.user_comparator->Name()
+                                      : "nullptr";
+    rep_->props.merge_operator_name =
+        rep_->ioptions.merge_operator != nullptr
+            ? rep_->ioptions.merge_operator->Name()
+            : "nullptr";
+    rep_->props.compression_name =
+        CompressionTypeToString(rep_->compression_type);
+    rep_->props.compression_options =
+        CompressionOptionsToString(rep_->compression_opts);
+    rep_->props.prefix_extractor_name =
+        rep_->moptions.prefix_extractor != nullptr
+            ? rep_->moptions.prefix_extractor->AsString()
+            : "nullptr";
+    std::string property_collectors_names = "[";
+    for (size_t i = 0;
+         i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
+      if (i != 0) {
+        property_collectors_names += ",";
+      }
+      property_collectors_names +=
+          rep_->ioptions.table_properties_collector_factories[i]->Name();
+    }
+    property_collectors_names += "]";
+    rep_->props.property_collectors_names = property_collectors_names;
+    if (rep_->table_options.index_type ==
+        BlockBasedTableOptions::kTwoLevelIndexSearch) {
+      assert(rep_->p_index_builder_ != nullptr);
+      rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
+      rep_->props.top_level_index_size =
+          rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
+    }
+    rep_->props.index_key_is_user_key =
+        !rep_->index_builder->seperator_is_key_plus_seq();
+    rep_->props.index_value_is_delta_encoded =
+        rep_->use_delta_encoding_for_index_values;
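+    // Worked example (illustrative, not upstream code) for the estimates
+    // computed below: if sampling saw 1,000,000 input bytes produce 400,000
+    // output bytes under the slow algorithm, and the file had 50,000,000
+    // compressible plus 1,000,000 uncompressible input bytes, then
+    //
+    //   slow_compression_estimated_data_size
+    //       ~= 0.4 * 50,000,000 + 1,000,000 = 21,000,000 bytes
+    //
+    // (the +0.5 below just rounds to nearest when truncating to uint64_t).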
+      rep_->props.slow_compression_estimated_data_size =
+          rep_->compressible_input_data_bytes +
+          rep_->uncompressible_input_data_bytes;
+      rep_->props.fast_compression_estimated_data_size =
+          rep_->compressible_input_data_bytes +
+          rep_->uncompressible_input_data_bytes;
+    }
+
+    // Add basic properties
+    property_block_builder.AddTableProperty(rep_->props);
+
+    // Add user collected properties
+    NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors,
+                                         rep_->ioptions.logger,
+                                         &property_block_builder);
+
+    Slice block_data = property_block_builder.Finish();
+    TEST_SYNC_POINT_CALLBACK(
+        "BlockBasedTableBuilder::WritePropertiesBlock:BlockData", &block_data);
+    WriteMaybeCompressedBlock(block_data, kNoCompression,
+                              &properties_block_handle, BlockType::kProperties);
+  }
+  if (ok()) {
+#ifndef NDEBUG
+    {
+      uint64_t props_block_offset = properties_block_handle.offset();
+      uint64_t props_block_size = properties_block_handle.size();
+      TEST_SYNC_POINT_CALLBACK(
+          "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
+          &props_block_offset);
+      TEST_SYNC_POINT_CALLBACK(
+          "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
+          &props_block_size);
+    }
+#endif  // !NDEBUG
+
+    const std::string* properties_block_meta = &kPropertiesBlockName;
+    TEST_SYNC_POINT_CALLBACK(
+        "BlockBasedTableBuilder::WritePropertiesBlock:Meta",
+        &properties_block_meta);
+    meta_index_builder->Add(*properties_block_meta, properties_block_handle);
+  }
+}
+
+void BlockBasedTableBuilder::WriteCompressionDictBlock(
+    MetaIndexBuilder* meta_index_builder) {
+  if (rep_->compression_dict != nullptr &&
+      rep_->compression_dict->GetRawDict().size()) {
+    BlockHandle compression_dict_block_handle;
+    if (ok()) {
+      WriteMaybeCompressedBlock(rep_->compression_dict->GetRawDict(),
+                                kNoCompression, &compression_dict_block_handle,
+                                BlockType::kCompressionDictionary);
+#ifndef NDEBUG
+      Slice compression_dict = rep_->compression_dict->GetRawDict();
+      TEST_SYNC_POINT_CALLBACK(
+          "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
+          &compression_dict);
+#endif  // NDEBUG
+    }
+    if (ok()) {
+      meta_index_builder->Add(kCompressionDictBlockName,
+                              compression_dict_block_handle);
+    }
+  }
+}
+
+void BlockBasedTableBuilder::WriteRangeDelBlock(
+    MetaIndexBuilder* meta_index_builder) {
+  if (ok() && !rep_->range_del_block.empty()) {
+    BlockHandle range_del_block_handle;
+    WriteMaybeCompressedBlock(rep_->range_del_block.Finish(), kNoCompression,
+                              &range_del_block_handle,
+                              BlockType::kRangeDeletion);
+    meta_index_builder->Add(kRangeDelBlockName, range_del_block_handle);
+  }
+}
+
+void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
+                                         BlockHandle& index_block_handle) {
+  Rep* r = rep_;
+  // this is guaranteed by BlockBasedTableBuilder's constructor
+  assert(r->table_options.checksum == kCRC32c ||
+         r->table_options.format_version != 0);
+  assert(ok());
+
+  FooterBuilder footer;
+  footer.Build(kBlockBasedTableMagicNumber, r->table_options.format_version,
+               r->get_offset(), r->table_options.checksum,
+               metaindex_block_handle, index_block_handle);
+  IOStatus ios = r->file->Append(footer.GetSlice());
+  if (ios.ok()) {
+    r->set_offset(r->get_offset() + footer.GetSlice().size());
+  } else {
+    r->SetIOStatus(ios);
+  }
+}
+
+void BlockBasedTableBuilder::EnterUnbuffered() {
+  Rep* r = rep_;
+  assert(r->state == Rep::State::kBuffered);
+  r->state = Rep::State::kUnbuffered;
+  const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
+                                  ? r->compression_opts.zstd_max_train_bytes
+                                  : r->compression_opts.max_dict_bytes;
+  const size_t kNumBlocksBuffered = r->data_block_buffers.size();
+  if (kNumBlocksBuffered == 0) {
+    // The below code is neither safe nor necessary for handling zero data
+    // blocks.
+    return;
+  }
+
+  // Abstract algebra teaches us that a finite cyclic group (such as the
+  // additive group of integers modulo N) can be generated by a number that is
+  // coprime with N. Since N is variable (number of buffered data blocks), we
+  // must then pick a prime number in order to guarantee coprimeness with any
+  // N.
+  //
+  // One downside of this approach is the spread will be poor when
+  // `kPrimeGeneratorRemainder` is close to zero or close to
+  // `kNumBlocksBuffered`.
+  //
+  // Picked a random number between one and one trillion and then chose the
+  // next prime number greater than or equal to it.
+  const uint64_t kPrimeGenerator = 545055921143ull;
+  // Can avoid repeated division by just adding the remainder repeatedly.
+  const size_t kPrimeGeneratorRemainder = static_cast<size_t>(
+      kPrimeGenerator % static_cast<uint64_t>(kNumBlocksBuffered));
+  const size_t kInitSampleIdx = kNumBlocksBuffered / 2;
+
+  std::string compression_dict_samples;
+  std::vector<size_t> compression_dict_sample_lens;
+  size_t buffer_idx = kInitSampleIdx;
+  for (size_t i = 0;
+       i < kNumBlocksBuffered && compression_dict_samples.size() < kSampleBytes;
+       ++i) {
+    size_t copy_len = std::min(kSampleBytes - compression_dict_samples.size(),
+                               r->data_block_buffers[buffer_idx].size());
+    compression_dict_samples.append(r->data_block_buffers[buffer_idx], 0,
+                                    copy_len);
+    compression_dict_sample_lens.emplace_back(copy_len);
+
+    buffer_idx += kPrimeGeneratorRemainder;
+    if (buffer_idx >= kNumBlocksBuffered) {
+      buffer_idx -= kNumBlocksBuffered;
+    }
+  }
+
+  // final data block flushed, now we can generate dictionary from the
+  // samples. OK if compression_dict_samples is empty, we'll just get
+  // empty dictionary.
+  std::string dict;
+  if (r->compression_opts.zstd_max_train_bytes > 0) {
+    if (r->compression_opts.use_zstd_dict_trainer) {
+      dict = ZSTD_TrainDictionary(compression_dict_samples,
+                                  compression_dict_sample_lens,
+                                  r->compression_opts.max_dict_bytes);
+    } else {
+      dict = ZSTD_FinalizeDictionary(
+          compression_dict_samples, compression_dict_sample_lens,
+          r->compression_opts.max_dict_bytes, r->compression_opts.level);
+    }
+  } else {
+    dict = std::move(compression_dict_samples);
+  }
+  r->compression_dict.reset(new CompressionDict(dict, r->compression_type,
+                                                r->compression_opts.level));
+  r->verify_dict.reset(new UncompressionDict(
+      dict, r->compression_type == kZSTD ||
+                r->compression_type == kZSTDNotFinalCompression));
+
+  auto get_iterator_for_block = [&r](size_t i) {
+    auto& data_block = r->data_block_buffers[i];
+    assert(!data_block.empty());
+
+    Block reader{BlockContents{data_block}};
+    DataBlockIter* iter = reader.NewDataIterator(
+        r->internal_comparator.user_comparator(), kDisableGlobalSequenceNumber);
+
+    iter->SeekToFirst();
+    assert(iter->Valid());
+    return std::unique_ptr<DataBlockIter>(iter);
+  };
+
+  std::unique_ptr<DataBlockIter> iter = nullptr, next_block_iter = nullptr;
+
+  for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
+    if (iter == nullptr) {
+      iter = get_iterator_for_block(i);
+      assert(iter != nullptr);
+    }
+
+    if (i + 1 < r->data_block_buffers.size()) {
+      next_block_iter = get_iterator_for_block(i + 1);
+    }
+
+    auto& data_block = r->data_block_buffers[i];
+    if (r->IsParallelCompressionEnabled()) {
+      Slice first_key_in_next_block;
+      const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
+      if (i + 1 < r->data_block_buffers.size()) {
+        assert(next_block_iter != nullptr);
+        first_key_in_next_block = next_block_iter->key();
+      } else {
+        first_key_in_next_block_ptr = r->first_key_in_next_block;
+      }
+
+      std::vector<std::string> keys;
+      for (; iter->Valid(); iter->Next()) {
+        keys.emplace_back(iter->key().ToString());
+      }
+
+      ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
+          r->compression_type, first_key_in_next_block_ptr, &data_block, &keys);
+
+      assert(block_rep != nullptr);
+      r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
+                                               r->get_offset());
+      r->pc_rep->EmitBlock(block_rep);
+    } else {
+      for (; iter->Valid(); iter->Next()) {
+        Slice key = iter->key();
+        if (r->filter_builder != nullptr) {
+          size_t ts_sz =
+              r->internal_comparator.user_comparator()->timestamp_size();
+          r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
+        }
+        r->index_builder->OnKeyAdded(key);
+      }
+      WriteBlock(Slice(data_block), &r->pending_handle, BlockType::kData);
+      if (ok() && i + 1 < r->data_block_buffers.size()) {
+        assert(next_block_iter != nullptr);
+        Slice first_key_in_next_block = next_block_iter->key();
+
+        Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
+
+        iter->SeekToLast();
+        std::string last_key = iter->key().ToString();
+        r->index_builder->AddIndexEntry(&last_key, first_key_in_next_block_ptr,
+                                        r->pending_handle);
+      }
+    }
+    std::swap(iter, next_block_iter);
+  }
+  r->data_block_buffers.clear();
+  r->data_begin_offset = 0;
+  // Release all reserved cache for data block buffers
+  if (r->compression_dict_buffer_cache_res_mgr != nullptr) {
+    Status s = r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
+        r->data_begin_offset);
+    s.PermitUncheckedError();
+  }
+}
+
+Status BlockBasedTableBuilder::Finish() {
+  Rep* r = rep_;
+  assert(r->state != Rep::State::kClosed);
+  bool empty_data_block = r->data_block.empty();
+  r->first_key_in_next_block = nullptr;
+  Flush();
+  if (r->state == Rep::State::kBuffered) {
+    EnterUnbuffered();
+  }
+  if (r->IsParallelCompressionEnabled()) {
+    StopParallelCompression();
+#ifndef NDEBUG
+    for (const auto& br : r->pc_rep->block_rep_buf) {
+      assert(br.status.ok());
+    }
+#endif  // !NDEBUG
+  } else {
+    // To make sure properties block is able to keep the accurate size of index
+    // block, we will finish writing all index entries first.
+    if (ok() && !empty_data_block) {
+      r->index_builder->AddIndexEntry(
+          &r->last_key, nullptr /* no next data block */, r->pending_handle);
+    }
+  }
+
+  // Write meta blocks, metaindex block and footer in the following order.
+  //    1. [meta block: filter]
+  //    2. [meta block: index]
+  //    3. [meta block: compression dictionary]
+  //    4. [meta block: range deletion tombstone]
+  //    5. [meta block: properties]
+  //    6. [metaindex block]
+  //    7. Footer
+  BlockHandle metaindex_block_handle, index_block_handle;
+  MetaIndexBuilder meta_index_builder;
+  WriteFilterBlock(&meta_index_builder);
+  WriteIndexBlock(&meta_index_builder, &index_block_handle);
+  WriteCompressionDictBlock(&meta_index_builder);
+  WriteRangeDelBlock(&meta_index_builder);
+  WritePropertiesBlock(&meta_index_builder);
+  if (ok()) {
+    // flush the meta index block
+    WriteMaybeCompressedBlock(meta_index_builder.Finish(), kNoCompression,
+                              &metaindex_block_handle, BlockType::kMetaIndex);
+  }
+  if (ok()) {
+    WriteFooter(metaindex_block_handle, index_block_handle);
+  }
+  r->state = Rep::State::kClosed;
+  r->SetStatus(r->CopyIOStatus());
+  Status ret_status = r->CopyStatus();
+  assert(!ret_status.ok() || io_status().ok());
+  return ret_status;
+}
+
+void BlockBasedTableBuilder::Abandon() {
+  assert(rep_->state != Rep::State::kClosed);
+  if (rep_->IsParallelCompressionEnabled()) {
+    StopParallelCompression();
+  }
+  rep_->state = Rep::State::kClosed;
+  rep_->CopyStatus().PermitUncheckedError();
+  rep_->CopyIOStatus().PermitUncheckedError();
+}
+
+uint64_t BlockBasedTableBuilder::NumEntries() const {
+  return rep_->props.num_entries;
+}
+
+bool BlockBasedTableBuilder::IsEmpty() const {
+  return rep_->props.num_entries == 0 && rep_->props.num_range_deletions == 0;
+}
+
+uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
+
+uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
+  if (rep_->IsParallelCompressionEnabled()) {
+    // Use compression ratio so far and inflight uncompressed bytes to estimate
+    // final SST size.
+    return rep_->pc_rep->file_size_estimator.GetEstimatedFileSize();
+  } else {
+    return FileSize();
+  }
+}
+
+bool BlockBasedTableBuilder::NeedCompact() const {
+  for (const auto& collector : rep_->table_properties_collectors) {
+    if (collector->NeedCompact()) {
+      return true;
+    }
+  }
+  return false;
+}
+
+TableProperties BlockBasedTableBuilder::GetTableProperties() const {
+  TableProperties ret = rep_->props;
+  for (const auto& collector : rep_->table_properties_collectors) {
+    for (const auto& prop : collector->GetReadableProperties()) {
+      ret.readable_properties.insert(prop);
+    }
+    collector->Finish(&ret.user_collected_properties).PermitUncheckedError();
+  }
+  return ret;
+}
+
+std::string BlockBasedTableBuilder::GetFileChecksum() const {
+  if (rep_->file != nullptr) {
+    return rep_->file->GetFileChecksum();
+  } else {
+    return kUnknownFileChecksum;
+  }
+}
+
+const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
+  if (rep_->file != nullptr) {
+    return rep_->file->GetFileChecksumFuncName();
+  } else {
+    return kUnknownFileChecksumFuncName;
+  }
+}
+
+void BlockBasedTableBuilder::SetSeqnoTimeTableProperties(
+    const std::string& encoded_seqno_to_time_mapping,
+    uint64_t oldest_ancestor_time) {
+  rep_->props.seqno_to_time_mapping = encoded_seqno_to_time_mapping;
+  rep_->props.creation_time = oldest_ancestor_time;
+}
+
+const std::string BlockBasedTable::kObsoleteFilterBlockPrefix = "filter.";
+const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
+const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
+    "partitionedfilter.";
+}  // namespace ROCKSDB_NAMESPACE
-- 
cgit v1.2.3
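
The stride-sampling trick in EnterUnbuffered() above is worth seeing in isolation. The standalone sketch below (not part of the patch; all names are local stand-ins) demonstrates the property the builder's comment relies on: because 545055921143 is prime and larger than any realistic buffer count N, its remainder modulo N is coprime with N, so repeatedly adding that remainder visits every buffer index exactly once before any repeat.

#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
  const uint64_t kPrimeGenerator = 545055921143ull;  // same prime as above
  const size_t kNumBlocksBuffered = 10;              // hypothetical N
  const size_t kRemainder =
      static_cast<size_t>(kPrimeGenerator % kNumBlocksBuffered);
  size_t idx = kNumBlocksBuffered / 2;  // mirrors kInitSampleIdx
  std::vector<bool> visited(kNumBlocksBuffered, false);
  for (size_t i = 0; i < kNumBlocksBuffered; ++i) {
    std::printf("sample buffer %zu\n", idx);
    visited[idx] = true;
    // Add the precomputed remainder instead of dividing each step; one
    // conditional subtraction is enough because idx and kRemainder are < N.
    idx += kRemainder;
    if (idx >= kNumBlocksBuffered) {
      idx -= kNumBlocksBuffered;
    }
  }
  for (bool v : visited) {
    if (!v) std::printf("missed a buffer (should not happen)\n");
  }
  return 0;
}

Subtracting N rather than taking a modulus each iteration is the same micro-optimization the builder's "avoid repeated division" comment describes.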
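
The compression estimates in WritePropertiesBlock() reduce to one formula: scale the compressible input bytes by the output/input ratio observed on the sampled blocks, add the bytes that were never compression candidates, and round to nearest via the +0.5 term. A minimal sketch of that arithmetic, with hypothetical parameter names standing in for the Rep fields:

#include <cstdint>
#include <cstdio>

uint64_t EstimateCompressedSize(uint64_t sampled_output_bytes,
                                uint64_t sampled_input_bytes,
                                uint64_t compressible_input_bytes,
                                uint64_t uncompressible_input_bytes) {
  if (sampled_input_bytes == 0) {
    // No samples taken: assume worst-case ratio 1.0, as the builder does,
    // so the estimate stays complete and aggregatable.
    return compressible_input_bytes + uncompressible_input_bytes;
  }
  return static_cast<uint64_t>(
      static_cast<double>(sampled_output_bytes) / sampled_input_bytes *
          compressible_input_bytes +
      uncompressible_input_bytes + 0.5);  // +0.5 rounds to nearest
}

int main() {
  // e.g. samples compressed 4 MiB down to 1 MiB => ratio 0.25, so 100 MiB of
  // compressible data plus 10 MiB of incompressible data estimates to 35 MiB.
  std::printf("%llu\n",
              static_cast<unsigned long long>(EstimateCompressedSize(
                  1 << 20, 4 << 20, 100u << 20, 10u << 20)));
  return 0;
}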
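
EnterUnbuffered() routes the concatenated samples either through a ZSTD dictionary trainer (when zstd_max_train_bytes > 0) or uses them verbatim as the dictionary. ZSTD_TrainDictionary and ZSTD_FinalizeDictionary above are RocksDB-internal wrappers; as a hedged sketch under the assumption that the trainer path ultimately rests on zstd's public ZDICT API (zdict.h, link against libzstd), the training step looks roughly like this. TrainDict and its toy inputs are illustrative names, not RocksDB symbols:

#include <cstdio>
#include <string>
#include <vector>
#include <zdict.h>  // zstd's dictionary-builder API

// Train a dictionary from concatenated samples; sample_lens[i] gives the
// length of the i-th sample inside `samples`, matching the layout the
// builder accumulates in compression_dict_samples.
std::string TrainDict(const std::string& samples,
                      const std::vector<size_t>& sample_lens,
                      size_t max_dict_bytes) {
  std::string dict(max_dict_bytes, '\0');
  size_t n = ZDICT_trainFromBuffer(dict.data(), dict.size(), samples.data(),
                                   sample_lens.data(),
                                   static_cast<unsigned>(sample_lens.size()));
  if (ZDICT_isError(n)) {
    return "";  // training can fail, e.g. with too few or too-small samples
  }
  dict.resize(n);
  return dict;
}

int main() {
  std::string samples;
  std::vector<size_t> lens;
  for (int i = 0; i < 16; ++i) {
    // Toy, repetitive samples; real usage feeds many real data blocks.
    std::string s(1024, static_cast<char>('a' + (i % 4)));
    samples += s;
    lens.push_back(s.size());
  }
  std::string dict = TrainDict(samples, lens, 16 * 1024);
  std::printf("dictionary bytes: %zu\n", dict.size());
  return 0;
}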