// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once

#include "db/blob/blob_garbage_meter.h"
#include "db/compaction/compaction.h"
#include "db/compaction/compaction_iterator.h"
#include "db/internal_stats.h"
#include "db/output_validator.h"

namespace ROCKSDB_NAMESPACE {

class CompactionOutputs;
using CompactionFileOpenFunc = std::function<Status(CompactionOutputs&)>;
using CompactionFileCloseFunc =
    std::function<Status(CompactionOutputs&, const Status&, const Slice&)>;

// Files produced by a subcompaction. Most of the functions are used by the
// compaction_job Open/Close compaction file functions.
class CompactionOutputs {
 public:
  // compaction output file
  struct Output {
    Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
           bool _enable_order_check, bool _enable_hash, bool _finished,
           uint64_t precalculated_hash)
        : meta(std::move(_meta)),
          validator(_icmp, _enable_order_check, _enable_hash,
                    precalculated_hash),
          finished(_finished) {}
    FileMetaData meta;
    OutputValidator validator;
    bool finished;
    std::shared_ptr<const TableProperties> table_properties;
  };

  CompactionOutputs() = delete;

  explicit CompactionOutputs(const Compaction* compaction,
                             const bool is_penultimate_level);

  // Add generated output to the list
  void AddOutput(FileMetaData&& meta, const InternalKeyComparator& icmp,
                 bool enable_order_check, bool enable_hash,
                 bool finished = false, uint64_t precalculated_hash = 0) {
    outputs_.emplace_back(std::move(meta), icmp, enable_order_check,
                          enable_hash, finished, precalculated_hash);
  }

  // Set new table builder for the current output
  void NewBuilder(const TableBuilderOptions& tboptions);

  // Assign a new WritableFileWriter to the current output
  void AssignFileWriter(WritableFileWriter* writer) {
    file_writer_.reset(writer);
  }

  // TODO: Remove it when remote compaction supports tiered compaction
  void SetTotalBytes(uint64_t bytes) { stats_.bytes_written += bytes; }
  void SetNumOutputRecords(uint64_t num) { stats_.num_output_records = num; }

  // TODO: Move the BlobDB builder into CompactionOutputs
  const std::vector<BlobFileAddition>& GetBlobFileAdditions() const {
    if (is_penultimate_level_) {
      assert(blob_file_additions_.empty());
    }
    return blob_file_additions_;
  }

  std::vector<BlobFileAddition>* GetBlobFileAdditionsPtr() {
    assert(!is_penultimate_level_);
    return &blob_file_additions_;
  }

  bool HasBlobFileAdditions() const { return !blob_file_additions_.empty(); }

  BlobGarbageMeter* CreateBlobGarbageMeter() {
    assert(!is_penultimate_level_);
    blob_garbage_meter_ = std::make_unique<BlobGarbageMeter>();
    return blob_garbage_meter_.get();
  }

  BlobGarbageMeter* GetBlobGarbageMeter() const {
    if (is_penultimate_level_) {
      // BlobDB doesn't support per_key_placement yet
      assert(blob_garbage_meter_ == nullptr);
      return nullptr;
    }
    return blob_garbage_meter_.get();
  }

  void UpdateBlobStats() {
    assert(!is_penultimate_level_);
    stats_.num_output_files_blob = blob_file_additions_.size();
    for (const auto& blob : blob_file_additions_) {
      stats_.bytes_written_blob += blob.GetTotalBlobBytes();
    }
  }

  // Finish the current output file
  Status Finish(const Status& input_status,
                const SeqnoToTimeMapping& seqno_time_mapping);

  // Update output table properties from table builder
  void UpdateTableProperties() {
    current_output().table_properties =
        std::make_shared<TableProperties>(GetTableProperties());
  }
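
  // Illustrative sketch (an assumption, not code from this file): the
  // close-file callback wired in by compaction_job would typically drive the
  // finalization members in roughly this order when closing the current
  // output; `outputs`, `input_status`, `seqno_time_mapping`, `clock`,
  // `statistics` and `use_fsync` are placeholders here.
  //
  //   Status s = outputs.Finish(input_status, seqno_time_mapping);
  //   IOStatus io_s =
  //       outputs.WriterSyncClose(s, clock, statistics, use_fsync);
  //   if (s.ok() && io_s.ok()) {
  //     outputs.UpdateTableProperties();
  //   }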
  IOStatus WriterSyncClose(const Status& input_status, SystemClock* clock,
                           Statistics* statistics, bool use_fsync);

  TableProperties GetTableProperties() {
    return builder_->GetTableProperties();
  }

  Slice SmallestUserKey() const {
    if (!outputs_.empty() && outputs_[0].finished) {
      return outputs_[0].meta.smallest.user_key();
    } else {
      return Slice{nullptr, 0};
    }
  }

  Slice LargestUserKey() const {
    if (!outputs_.empty() && outputs_.back().finished) {
      return outputs_.back().meta.largest.user_key();
    } else {
      return Slice{nullptr, 0};
    }
  }

  // Remove the last output file if it is empty, as it does not need to be
  // kept.
  void RemoveLastEmptyOutput() {
    if (!outputs_.empty() && !outputs_.back().meta.fd.file_size) {
      // An error occurred, so ignore the last output.
      outputs_.pop_back();
    }
  }

  // Remove the last output unconditionally, for example when it has no data
  // (no entries and no range-dels) even though its file_size may not be 0,
  // because the file still contains SST metadata.
  void RemoveLastOutput() {
    assert(!outputs_.empty());
    outputs_.pop_back();
  }

  bool HasBuilder() const { return builder_ != nullptr; }

  FileMetaData* GetMetaData() { return &current_output().meta; }

  bool HasOutput() const { return !outputs_.empty(); }

  uint64_t NumEntries() const { return builder_->NumEntries(); }

  void ResetBuilder() {
    builder_.reset();
    current_output_file_size_ = 0;
  }

  // Add range-dels from the aggregator to the current output file
  // @param comp_start_user_key and comp_end_user_key include timestamp if
  // user-defined timestamp is enabled.
  // @param full_history_ts_low used for range tombstone garbage collection.
  Status AddRangeDels(const Slice* comp_start_user_key,
                      const Slice* comp_end_user_key,
                      CompactionIterationStats& range_del_out_stats,
                      bool bottommost_level, const InternalKeyComparator& icmp,
                      SequenceNumber earliest_snapshot,
                      const Slice& next_table_min_key,
                      const std::string& full_history_ts_low);

  // If the outputs have range deletions, the range deletions also count as
  // data.
  bool HasRangeDel() const {
    return range_del_agg_ && !range_del_agg_->IsEmpty();
  }

 private:
  friend class SubcompactionState;

  void FillFilesToCutForTtl();

  void SetOutputSlitKey(const std::optional<Slice> start,
                        const std::optional<Slice> end) {
    const InternalKeyComparator* icmp =
        &compaction_->column_family_data()->internal_comparator();

    const InternalKey* output_split_key = compaction_->GetOutputSplitKey();
    // Invalid output_split_key indicates that we do not need to split
    if (output_split_key != nullptr) {
      // We may only split the output when the cursor is in the range.
      if ((!end.has_value() ||
           icmp->user_comparator()->Compare(
               ExtractUserKey(output_split_key->Encode()), end.value()) < 0) &&
          (!start.has_value() ||
           icmp->user_comparator()->Compare(
               ExtractUserKey(output_split_key->Encode()), start.value()) >
               0)) {
        local_output_split_key_ = output_split_key;
      }
    }
  }

  // Returns true iff we should stop building the current output
  // before processing the current key in the compaction iterator.
  bool ShouldStopBefore(const CompactionIterator& c_iter);

  void Cleanup() {
    if (builder_ != nullptr) {
      // May happen if we get a shutdown call in the middle of compaction
      builder_->Abandon();
      builder_.reset();
    }
  }
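
  // Worked example (illustrative; the grandparent key ranges below are made
  // up) for the grandparent tracking helpers and members that follow: suppose
  // the grandparent files are
  //   G0[a..e]  G1[h..m]  G2[q..t]
  // While the current output emits keys f and g, the cursor is in the gap
  // between G0 and G1 (being_grandparent_gap_ == true). Emitting key h
  // crosses one boundary (into G1) and emitting key n crosses another (out of
  // G1), so an output that fully covers G1 sees two boundary switches per
  // overlapped grandparent file, which is what
  // grandparent_boundary_switched_num_ counts.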
  // Update tracked grandparent information, such as the grandparent index,
  // whether the key is in the gap between two grandparent files, the
  // accumulated grandparent file size, etc.
  // It returns how many boundaries it crosses by including the current key.
  size_t UpdateGrandparentBoundaryInfo(const Slice& internal_key);

  // Helper function to get the overlapped grandparent files size; it is only
  // used for calculating the first key's overlap.
  uint64_t GetCurrentKeyGrandparentOverlappedBytes(
      const Slice& internal_key) const;

  // Add the current key from the compaction_iterator to the output file. If
  // needed, close and open a new compaction output with the functions
  // provided.
  Status AddToOutput(const CompactionIterator& c_iter,
                     const CompactionFileOpenFunc& open_file_func,
                     const CompactionFileCloseFunc& close_file_func);

  // Close the current output. `open_file_func` is needed for creating a new
  // file for a range-dels-only output file.
  Status CloseOutput(const Status& curr_status,
                     const CompactionFileOpenFunc& open_file_func,
                     const CompactionFileCloseFunc& close_file_func) {
    Status status = curr_status;
    // handle subcompaction containing only range deletions
    if (status.ok() && !HasBuilder() && !HasOutput() && HasRangeDel()) {
      status = open_file_func(*this);
    }
    if (HasBuilder()) {
      const Slice empty_key{};
      Status s = close_file_func(*this, status, empty_key);
      if (!s.ok() && status.ok()) {
        status = s;
      }
    }
    return status;
  }

  // This subcompaction's output could be empty if compaction was aborted
  // before this subcompaction had a chance to generate any output files. When
  // subcompactions are executed sequentially this is more likely, and the
  // later subcompactions are particularly likely to be empty. Once they are
  // run in parallel, however, it should be much rarer.
  // It's the caller's responsibility to make sure it's not empty.
  Output& current_output() {
    assert(!outputs_.empty());
    return outputs_.back();
  }

  // Assign the range_del_agg to the target output level. There is only one
  // range-del-aggregator per compaction outputs; for an
  // output_to_penultimate_level compaction it is only assigned to the
  // penultimate level.
  void AssignRangeDelAggregator(
      std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) {
    assert(range_del_agg_ == nullptr);
    range_del_agg_ = std::move(range_del_agg);
  }

  const Compaction* compaction_;

  // current output builder and writer
  std::unique_ptr<TableBuilder> builder_;
  std::unique_ptr<WritableFileWriter> file_writer_;
  uint64_t current_output_file_size_ = 0;

  // all the compaction outputs so far
  std::vector<Output> outputs_;

  // BlobDB info
  std::vector<BlobFileAddition> blob_file_additions_;
  std::unique_ptr<BlobGarbageMeter> blob_garbage_meter_;

  // Basic compaction output stats for this level's outputs
  InternalStats::CompactionOutputsStats stats_;

  // Indicates whether this CompactionOutputs object is for the penultimate
  // level; should always be false if the per_key_placement feature is not
  // enabled.
  const bool is_penultimate_level_;
  std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_ = nullptr;

  // partitioner information
  std::string last_key_for_partitioner_;
  std::unique_ptr<SstPartitioner> partitioner_;

  // A flag that indicates whether this subcompaction has been split by the
  // cursor
  bool is_split_ = false;

  // We also maintain the output split key for each subcompaction to avoid
  // repetitive comparisons in ShouldStopBefore()
  const InternalKey* local_output_split_key_ = nullptr;

  // Some identified files with an old oldest ancestor time; their range
  // should be isolated out so that the output file(s) in that range can be
  // merged down for TTL and the timestamps cleared for the range.
  std::vector<FileMetaData*> files_to_cut_for_ttl_;
  int cur_files_to_cut_for_ttl_ = -1;
  int next_files_to_cut_for_ttl_ = 0;

  // An index used to speed up ShouldStopBefore().
  size_t grandparent_index_ = 0;

  // True if the current output key is in the gap between grandparent files,
  // i.e.:
  //   key > grandparents[grandparent_index_ - 1].largest &&
  //   key < grandparents[grandparent_index_].smallest
  bool being_grandparent_gap_ = true;

  // The number of bytes overlapping between the current output and
  // grandparent files used in ShouldStopBefore().
  uint64_t grandparent_overlapped_bytes_ = 0;

  // A flag that indicates whether a key has been seen in ShouldStopBefore()
  bool seen_key_ = false;

  // For the current output file, the number of file boundaries it has
  // crossed, which is basically the number of overlapped grandparent files
  // times 2.
  size_t grandparent_boundary_switched_num_ = 0;
};

// Helper struct to concatenate the last level and penultimate level outputs,
// which could be replaced by std::ranges::join_view() in C++20.
struct OutputIterator {
 public:
  explicit OutputIterator(const std::vector<CompactionOutputs::Output>& a,
                          const std::vector<CompactionOutputs::Output>& b)
      : a_(a), b_(b) {
    within_a = !a_.empty();
    idx_ = 0;
  }

  OutputIterator begin() { return *this; }

  OutputIterator end() { return *this; }

  size_t size() { return a_.size() + b_.size(); }

  const CompactionOutputs::Output& operator*() const {
    return within_a ? a_[idx_] : b_[idx_];
  }

  OutputIterator& operator++() {
    idx_++;
    if (within_a && idx_ >= a_.size()) {
      within_a = false;
      idx_ = 0;
    }
    assert(within_a || idx_ <= b_.size());
    return *this;
  }

  bool operator!=(const OutputIterator& /*rhs*/) const {
    return within_a || idx_ < b_.size();
  }

 private:
  const std::vector<CompactionOutputs::Output>& a_;
  const std::vector<CompactionOutputs::Output>& b_;
  bool within_a;
  size_t idx_;
};

}  // namespace ROCKSDB_NAMESPACE
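
// Illustrative sketch (an assumption, not code from this file): given two
// output lists, e.g. the penultimate-level and last-level outputs of a
// subcompaction, OutputIterator lets them be walked as a single range-based
// loop. The two vectors below are placeholders.
//
//   const std::vector<CompactionOutputs::Output>& penultimate_outputs = ...;
//   const std::vector<CompactionOutputs::Output>& last_outputs = ...;
//   uint64_t total_size = 0;
//   for (const CompactionOutputs::Output& out :
//        OutputIterator(penultimate_outputs, last_outputs)) {
//     total_size += out.meta.fd.file_size;
//   }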