diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/compaction/compaction_picker_universal.cc | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/db/compaction/compaction_picker_universal.cc')
-rw-r--r-- | src/rocksdb/db/compaction/compaction_picker_universal.cc | 1450 |
1 files changed, 1450 insertions, 0 deletions
diff --git a/src/rocksdb/db/compaction/compaction_picker_universal.cc b/src/rocksdb/db/compaction/compaction_picker_universal.cc new file mode 100644 index 000000000..376e4f60f --- /dev/null +++ b/src/rocksdb/db/compaction/compaction_picker_universal.cc @@ -0,0 +1,1450 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/compaction/compaction_picker_universal.h" +#ifndef ROCKSDB_LITE + +#include <cinttypes> +#include <limits> +#include <queue> +#include <string> +#include <utility> + +#include "db/column_family.h" +#include "file/filename.h" +#include "logging/log_buffer.h" +#include "logging/logging.h" +#include "monitoring/statistics.h" +#include "test_util/sync_point.h" +#include "util/random.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { +namespace { +// A helper class that form universal compactions. The class is used by +// UniversalCompactionPicker::PickCompaction(). +// The usage is to create the class, and get the compaction object by calling +// PickCompaction(). +class UniversalCompactionBuilder { + public: + UniversalCompactionBuilder( + const ImmutableOptions& ioptions, const InternalKeyComparator* icmp, + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, + UniversalCompactionPicker* picker, LogBuffer* log_buffer) + : ioptions_(ioptions), + icmp_(icmp), + cf_name_(cf_name), + mutable_cf_options_(mutable_cf_options), + mutable_db_options_(mutable_db_options), + vstorage_(vstorage), + picker_(picker), + log_buffer_(log_buffer) {} + + // Form and return the compaction object. The caller owns return object. + Compaction* PickCompaction(); + + private: + struct SortedRun { + SortedRun(int _level, FileMetaData* _file, uint64_t _size, + uint64_t _compensated_file_size, bool _being_compacted) + : level(_level), + file(_file), + size(_size), + compensated_file_size(_compensated_file_size), + being_compacted(_being_compacted) { + assert(compensated_file_size > 0); + assert(level != 0 || file != nullptr); + } + + void Dump(char* out_buf, size_t out_buf_size, + bool print_path = false) const; + + // sorted_run_count is added into the string to print + void DumpSizeInfo(char* out_buf, size_t out_buf_size, + size_t sorted_run_count) const; + + int level; + // `file` Will be null for level > 0. For level = 0, the sorted run is + // for this file. + FileMetaData* file; + // For level > 0, `size` and `compensated_file_size` are sum of sizes all + // files in the level. `being_compacted` should be the same for all files + // in a non-zero level. Use the value here. + uint64_t size; + uint64_t compensated_file_size; + bool being_compacted; + }; + + // Pick Universal compaction to limit read amplification + Compaction* PickCompactionToReduceSortedRuns( + unsigned int ratio, unsigned int max_number_of_files_to_compact); + + // Pick Universal compaction to limit space amplification. + Compaction* PickCompactionToReduceSizeAmp(); + + // Try to pick incremental compaction to reduce space amplification. + // It will return null if it cannot find a fanout within the threshold. + // Fanout is defined as + // total size of files to compact at output level + // -------------------------------------------------- + // total size of files to compact at other levels + Compaction* PickIncrementalForReduceSizeAmp(double fanout_threshold); + + Compaction* PickDeleteTriggeredCompaction(); + + // Form a compaction from the sorted run indicated by start_index to the + // oldest sorted run. + // The caller is responsible for making sure that those files are not in + // compaction. + Compaction* PickCompactionToOldest(size_t start_index, + CompactionReason compaction_reason); + + Compaction* PickCompactionWithSortedRunRange( + size_t start_index, size_t end_index, CompactionReason compaction_reason); + + // Try to pick periodic compaction. The caller should only call it + // if there is at least one file marked for periodic compaction. + // null will be returned if no such a compaction can be formed + // because some files are being compacted. + Compaction* PickPeriodicCompaction(); + + // Used in universal compaction when the allow_trivial_move + // option is set. Checks whether there are any overlapping files + // in the input. Returns true if the input files are non + // overlapping. + bool IsInputFilesNonOverlapping(Compaction* c); + + uint64_t GetMaxOverlappingBytes() const; + + const ImmutableOptions& ioptions_; + const InternalKeyComparator* icmp_; + double score_; + std::vector<SortedRun> sorted_runs_; + const std::string& cf_name_; + const MutableCFOptions& mutable_cf_options_; + const MutableDBOptions& mutable_db_options_; + VersionStorageInfo* vstorage_; + UniversalCompactionPicker* picker_; + LogBuffer* log_buffer_; + + static std::vector<SortedRun> CalculateSortedRuns( + const VersionStorageInfo& vstorage); + + // Pick a path ID to place a newly generated file, with its estimated file + // size. + static uint32_t GetPathId(const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options, + uint64_t file_size); +}; + +// Used in universal compaction when trivial move is enabled. +// This structure is used for the construction of min heap +// that contains the file meta data, the level of the file +// and the index of the file in that level + +struct InputFileInfo { + InputFileInfo() : f(nullptr), level(0), index(0) {} + + FileMetaData* f; + size_t level; + size_t index; +}; + +// Used in universal compaction when trivial move is enabled. +// This comparator is used for the construction of min heap +// based on the smallest key of the file. +struct SmallestKeyHeapComparator { + explicit SmallestKeyHeapComparator(const Comparator* ucmp) { ucmp_ = ucmp; } + + bool operator()(InputFileInfo i1, InputFileInfo i2) const { + return (ucmp_->CompareWithoutTimestamp(i1.f->smallest.user_key(), + i2.f->smallest.user_key()) > 0); + } + + private: + const Comparator* ucmp_; +}; + +using SmallestKeyHeap = + std::priority_queue<InputFileInfo, std::vector<InputFileInfo>, + SmallestKeyHeapComparator>; + +// This function creates the heap that is used to find if the files are +// overlapping during universal compaction when the allow_trivial_move +// is set. +SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) { + SmallestKeyHeap smallest_key_priority_q = + SmallestKeyHeap(SmallestKeyHeapComparator(ucmp)); + + InputFileInfo input_file; + + for (size_t l = 0; l < c->num_input_levels(); l++) { + if (c->num_input_files(l) != 0) { + if (l == 0 && c->start_level() == 0) { + for (size_t i = 0; i < c->num_input_files(0); i++) { + input_file.f = c->input(0, i); + input_file.level = 0; + input_file.index = i; + smallest_key_priority_q.push(std::move(input_file)); + } + } else { + input_file.f = c->input(l, 0); + input_file.level = l; + input_file.index = 0; + smallest_key_priority_q.push(std::move(input_file)); + } + } + } + return smallest_key_priority_q; +} + +#ifndef NDEBUG +// smallest_seqno and largest_seqno are set iff. `files` is not empty. +void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files, + SequenceNumber* smallest_seqno, + SequenceNumber* largest_seqno) { + bool is_first = true; + for (FileMetaData* f : files) { + assert(f->fd.smallest_seqno <= f->fd.largest_seqno); + if (is_first) { + is_first = false; + *smallest_seqno = f->fd.smallest_seqno; + *largest_seqno = f->fd.largest_seqno; + } else { + if (f->fd.smallest_seqno < *smallest_seqno) { + *smallest_seqno = f->fd.smallest_seqno; + } + if (f->fd.largest_seqno > *largest_seqno) { + *largest_seqno = f->fd.largest_seqno; + } + } + } +} +#endif +} // namespace + +// Algorithm that checks to see if there are any overlapping +// files in the input +bool UniversalCompactionBuilder::IsInputFilesNonOverlapping(Compaction* c) { + auto comparator = icmp_->user_comparator(); + int first_iter = 1; + + InputFileInfo prev, curr, next; + + SmallestKeyHeap smallest_key_priority_q = + create_level_heap(c, icmp_->user_comparator()); + + while (!smallest_key_priority_q.empty()) { + curr = smallest_key_priority_q.top(); + smallest_key_priority_q.pop(); + + if (first_iter) { + prev = curr; + first_iter = 0; + } else { + if (comparator->CompareWithoutTimestamp( + prev.f->largest.user_key(), curr.f->smallest.user_key()) >= 0) { + // found overlapping files, return false + return false; + } + assert(comparator->CompareWithoutTimestamp( + curr.f->largest.user_key(), prev.f->largest.user_key()) > 0); + prev = curr; + } + + next.f = nullptr; + + if (c->level(curr.level) != 0 && + curr.index < c->num_input_files(curr.level) - 1) { + next.f = c->input(curr.level, curr.index + 1); + next.level = curr.level; + next.index = curr.index + 1; + } + + if (next.f) { + smallest_key_priority_q.push(std::move(next)); + } + } + return true; +} + +bool UniversalCompactionPicker::NeedsCompaction( + const VersionStorageInfo* vstorage) const { + const int kLevel0 = 0; + if (vstorage->CompactionScore(kLevel0) >= 1) { + return true; + } + if (!vstorage->FilesMarkedForPeriodicCompaction().empty()) { + return true; + } + if (!vstorage->FilesMarkedForCompaction().empty()) { + return true; + } + return false; +} + +Compaction* UniversalCompactionPicker::PickCompaction( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, + LogBuffer* log_buffer, SequenceNumber /* earliest_memtable_seqno */) { + UniversalCompactionBuilder builder(ioptions_, icmp_, cf_name, + mutable_cf_options, mutable_db_options, + vstorage, this, log_buffer); + return builder.PickCompaction(); +} + +void UniversalCompactionBuilder::SortedRun::Dump(char* out_buf, + size_t out_buf_size, + bool print_path) const { + if (level == 0) { + assert(file != nullptr); + if (file->fd.GetPathId() == 0 || !print_path) { + snprintf(out_buf, out_buf_size, "file %" PRIu64, file->fd.GetNumber()); + } else { + snprintf(out_buf, out_buf_size, + "file %" PRIu64 + "(path " + "%" PRIu32 ")", + file->fd.GetNumber(), file->fd.GetPathId()); + } + } else { + snprintf(out_buf, out_buf_size, "level %d", level); + } +} + +void UniversalCompactionBuilder::SortedRun::DumpSizeInfo( + char* out_buf, size_t out_buf_size, size_t sorted_run_count) const { + if (level == 0) { + assert(file != nullptr); + snprintf(out_buf, out_buf_size, + "file %" PRIu64 "[%" ROCKSDB_PRIszt + "] " + "with size %" PRIu64 " (compensated size %" PRIu64 ")", + file->fd.GetNumber(), sorted_run_count, file->fd.GetFileSize(), + file->compensated_file_size); + } else { + snprintf(out_buf, out_buf_size, + "level %d[%" ROCKSDB_PRIszt + "] " + "with size %" PRIu64 " (compensated size %" PRIu64 ")", + level, sorted_run_count, size, compensated_file_size); + } +} + +std::vector<UniversalCompactionBuilder::SortedRun> +UniversalCompactionBuilder::CalculateSortedRuns( + const VersionStorageInfo& vstorage) { + std::vector<UniversalCompactionBuilder::SortedRun> ret; + for (FileMetaData* f : vstorage.LevelFiles(0)) { + ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size, + f->being_compacted); + } + for (int level = 1; level < vstorage.num_levels(); level++) { + uint64_t total_compensated_size = 0U; + uint64_t total_size = 0U; + bool being_compacted = false; + for (FileMetaData* f : vstorage.LevelFiles(level)) { + total_compensated_size += f->compensated_file_size; + total_size += f->fd.GetFileSize(); + // Size amp, read amp and periodic compactions always include all files + // for a non-zero level. However, a delete triggered compaction and + // a trivial move might pick a subset of files in a sorted run. So + // always check all files in a sorted run and mark the entire run as + // being compacted if one or more files are being compacted + if (f->being_compacted) { + being_compacted = f->being_compacted; + } + } + if (total_compensated_size > 0) { + ret.emplace_back(level, nullptr, total_size, total_compensated_size, + being_compacted); + } + } + return ret; +} + +// Universal style of compaction. Pick files that are contiguous in +// time-range to compact. +Compaction* UniversalCompactionBuilder::PickCompaction() { + const int kLevel0 = 0; + score_ = vstorage_->CompactionScore(kLevel0); + sorted_runs_ = CalculateSortedRuns(*vstorage_); + + if (sorted_runs_.size() == 0 || + (vstorage_->FilesMarkedForPeriodicCompaction().empty() && + vstorage_->FilesMarkedForCompaction().empty() && + sorted_runs_.size() < (unsigned int)mutable_cf_options_ + .level0_file_num_compaction_trigger)) { + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: nothing to do\n", + cf_name_.c_str()); + TEST_SYNC_POINT_CALLBACK( + "UniversalCompactionBuilder::PickCompaction:Return", nullptr); + return nullptr; + } + VersionStorageInfo::LevelSummaryStorage tmp; + ROCKS_LOG_BUFFER_MAX_SZ( + log_buffer_, 3072, + "[%s] Universal: sorted runs: %" ROCKSDB_PRIszt " files: %s\n", + cf_name_.c_str(), sorted_runs_.size(), vstorage_->LevelSummary(&tmp)); + + Compaction* c = nullptr; + // Periodic compaction has higher priority than other type of compaction + // because it's a hard requirement. + if (!vstorage_->FilesMarkedForPeriodicCompaction().empty()) { + // Always need to do a full compaction for periodic compaction. + c = PickPeriodicCompaction(); + } + + // Check for size amplification. + if (c == nullptr && + sorted_runs_.size() >= + static_cast<size_t>( + mutable_cf_options_.level0_file_num_compaction_trigger)) { + if ((c = PickCompactionToReduceSizeAmp()) != nullptr) { + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: compacting for size amp\n", + cf_name_.c_str()); + } else { + // Size amplification is within limits. Try reducing read + // amplification while maintaining file size ratios. + unsigned int ratio = + mutable_cf_options_.compaction_options_universal.size_ratio; + + if ((c = PickCompactionToReduceSortedRuns(ratio, UINT_MAX)) != nullptr) { + ROCKS_LOG_BUFFER(log_buffer_, + "[%s] Universal: compacting for size ratio\n", + cf_name_.c_str()); + } else { + // Size amplification and file size ratios are within configured limits. + // If max read amplification is exceeding configured limits, then force + // compaction without looking at filesize ratios and try to reduce + // the number of files to fewer than level0_file_num_compaction_trigger. + // This is guaranteed by NeedsCompaction() + assert(sorted_runs_.size() >= + static_cast<size_t>( + mutable_cf_options_.level0_file_num_compaction_trigger)); + // Get the total number of sorted runs that are not being compacted + int num_sr_not_compacted = 0; + for (size_t i = 0; i < sorted_runs_.size(); i++) { + if (sorted_runs_[i].being_compacted == false) { + num_sr_not_compacted++; + } + } + + // The number of sorted runs that are not being compacted is greater + // than the maximum allowed number of sorted runs + if (num_sr_not_compacted > + mutable_cf_options_.level0_file_num_compaction_trigger) { + unsigned int num_files = + num_sr_not_compacted - + mutable_cf_options_.level0_file_num_compaction_trigger + 1; + if ((c = PickCompactionToReduceSortedRuns(UINT_MAX, num_files)) != + nullptr) { + ROCKS_LOG_BUFFER(log_buffer_, + "[%s] Universal: compacting for file num -- %u\n", + cf_name_.c_str(), num_files); + } + } + } + } + } + + if (c == nullptr) { + if ((c = PickDeleteTriggeredCompaction()) != nullptr) { + ROCKS_LOG_BUFFER(log_buffer_, + "[%s] Universal: delete triggered compaction\n", + cf_name_.c_str()); + } + } + + if (c == nullptr) { + TEST_SYNC_POINT_CALLBACK( + "UniversalCompactionBuilder::PickCompaction:Return", nullptr); + return nullptr; + } + + if (mutable_cf_options_.compaction_options_universal.allow_trivial_move == + true && + c->compaction_reason() != CompactionReason::kPeriodicCompaction) { + c->set_is_trivial_move(IsInputFilesNonOverlapping(c)); + } + +// validate that all the chosen files of L0 are non overlapping in time +#ifndef NDEBUG + bool is_first = true; + + size_t level_index = 0U; + if (c->start_level() == 0) { + for (auto f : *c->inputs(0)) { + assert(f->fd.smallest_seqno <= f->fd.largest_seqno); + if (is_first) { + is_first = false; + } + } + level_index = 1U; + } + for (; level_index < c->num_input_levels(); level_index++) { + if (c->num_input_files(level_index) != 0) { + SequenceNumber smallest_seqno = 0U; + SequenceNumber largest_seqno = 0U; + GetSmallestLargestSeqno(*(c->inputs(level_index)), &smallest_seqno, + &largest_seqno); + if (is_first) { + is_first = false; + } + } + } +#endif + // update statistics + size_t num_files = 0; + for (auto& each_level : *c->inputs()) { + num_files += each_level.files.size(); + } + RecordInHistogram(ioptions_.stats, NUM_FILES_IN_SINGLE_COMPACTION, num_files); + + picker_->RegisterCompaction(c); + vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_); + + TEST_SYNC_POINT_CALLBACK("UniversalCompactionBuilder::PickCompaction:Return", + c); + return c; +} + +uint32_t UniversalCompactionBuilder::GetPathId( + const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options, uint64_t file_size) { + // Two conditions need to be satisfied: + // (1) the target path needs to be able to hold the file's size + // (2) Total size left in this and previous paths need to be not + // smaller than expected future file size before this new file is + // compacted, which is estimated based on size_ratio. + // For example, if now we are compacting files of size (1, 1, 2, 4, 8), + // we will make sure the target file, probably with size of 16, will be + // placed in a path so that eventually when new files are generated and + // compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or + // before the path we chose. + // + // TODO(sdong): now the case of multiple column families is not + // considered in this algorithm. So the target size can be violated in + // that case. We need to improve it. + uint64_t accumulated_size = 0; + uint64_t future_size = + file_size * + (100 - mutable_cf_options.compaction_options_universal.size_ratio) / 100; + uint32_t p = 0; + assert(!ioptions.cf_paths.empty()); + for (; p < ioptions.cf_paths.size() - 1; p++) { + uint64_t target_size = ioptions.cf_paths[p].target_size; + if (target_size > file_size && + accumulated_size + (target_size - file_size) > future_size) { + return p; + } + accumulated_size += target_size; + } + return p; +} + +// +// Consider compaction files based on their size differences with +// the next file in time order. +// +Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns( + unsigned int ratio, unsigned int max_number_of_files_to_compact) { + unsigned int min_merge_width = + mutable_cf_options_.compaction_options_universal.min_merge_width; + unsigned int max_merge_width = + mutable_cf_options_.compaction_options_universal.max_merge_width; + + const SortedRun* sr = nullptr; + bool done = false; + size_t start_index = 0; + unsigned int candidate_count = 0; + + unsigned int max_files_to_compact = + std::min(max_merge_width, max_number_of_files_to_compact); + min_merge_width = std::max(min_merge_width, 2U); + + // Caller checks the size before executing this function. This invariant is + // important because otherwise we may have a possible integer underflow when + // dealing with unsigned types. + assert(sorted_runs_.size() > 0); + + // Considers a candidate file only if it is smaller than the + // total size accumulated so far. + for (size_t loop = 0; loop < sorted_runs_.size(); loop++) { + candidate_count = 0; + + // Skip files that are already being compacted + for (sr = nullptr; loop < sorted_runs_.size(); loop++) { + sr = &sorted_runs_[loop]; + + if (!sr->being_compacted) { + candidate_count = 1; + break; + } + char file_num_buf[kFormatFileNumberBufSize]; + sr->Dump(file_num_buf, sizeof(file_num_buf)); + ROCKS_LOG_BUFFER(log_buffer_, + "[%s] Universal: %s" + "[%d] being compacted, skipping", + cf_name_.c_str(), file_num_buf, loop); + + sr = nullptr; + } + + // This file is not being compacted. Consider it as the + // first candidate to be compacted. + uint64_t candidate_size = sr != nullptr ? sr->compensated_file_size : 0; + if (sr != nullptr) { + char file_num_buf[kFormatFileNumberBufSize]; + sr->Dump(file_num_buf, sizeof(file_num_buf), true); + ROCKS_LOG_BUFFER(log_buffer_, + "[%s] Universal: Possible candidate %s[%d].", + cf_name_.c_str(), file_num_buf, loop); + } + + // Check if the succeeding files need compaction. + for (size_t i = loop + 1; + candidate_count < max_files_to_compact && i < sorted_runs_.size(); + i++) { + const SortedRun* succeeding_sr = &sorted_runs_[i]; + if (succeeding_sr->being_compacted) { + break; + } + // Pick files if the total/last candidate file size (increased by the + // specified ratio) is still larger than the next candidate file. + // candidate_size is the total size of files picked so far with the + // default kCompactionStopStyleTotalSize; with + // kCompactionStopStyleSimilarSize, it's simply the size of the last + // picked file. + double sz = candidate_size * (100.0 + ratio) / 100.0; + if (sz < static_cast<double>(succeeding_sr->size)) { + break; + } + if (mutable_cf_options_.compaction_options_universal.stop_style == + kCompactionStopStyleSimilarSize) { + // Similar-size stopping rule: also check the last picked file isn't + // far larger than the next candidate file. + sz = (succeeding_sr->size * (100.0 + ratio)) / 100.0; + if (sz < static_cast<double>(candidate_size)) { + // If the small file we've encountered begins a run of similar-size + // files, we'll pick them up on a future iteration of the outer + // loop. If it's some lonely straggler, it'll eventually get picked + // by the last-resort read amp strategy which disregards size ratios. + break; + } + candidate_size = succeeding_sr->compensated_file_size; + } else { // default kCompactionStopStyleTotalSize + candidate_size += succeeding_sr->compensated_file_size; + } + candidate_count++; + } + + // Found a series of consecutive files that need compaction. + if (candidate_count >= (unsigned int)min_merge_width) { + start_index = loop; + done = true; + break; + } else { + for (size_t i = loop; + i < loop + candidate_count && i < sorted_runs_.size(); i++) { + const SortedRun* skipping_sr = &sorted_runs_[i]; + char file_num_buf[256]; + skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop); + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Skipping %s", + cf_name_.c_str(), file_num_buf); + } + } + } + if (!done || candidate_count <= 1) { + return nullptr; + } + size_t first_index_after = start_index + candidate_count; + // Compression is enabled if files compacted earlier already reached + // size ratio of compression. + bool enable_compression = true; + int ratio_to_compress = + mutable_cf_options_.compaction_options_universal.compression_size_percent; + if (ratio_to_compress >= 0) { + uint64_t total_size = 0; + for (auto& sorted_run : sorted_runs_) { + total_size += sorted_run.compensated_file_size; + } + + uint64_t older_file_size = 0; + for (size_t i = sorted_runs_.size() - 1; i >= first_index_after; i--) { + older_file_size += sorted_runs_[i].size; + if (older_file_size * 100L >= total_size * (long)ratio_to_compress) { + enable_compression = false; + break; + } + } + } + + uint64_t estimated_total_size = 0; + for (unsigned int i = 0; i < first_index_after; i++) { + estimated_total_size += sorted_runs_[i].size; + } + uint32_t path_id = + GetPathId(ioptions_, mutable_cf_options_, estimated_total_size); + int start_level = sorted_runs_[start_index].level; + int output_level; + if (first_index_after == sorted_runs_.size()) { + output_level = vstorage_->num_levels() - 1; + } else if (sorted_runs_[first_index_after].level == 0) { + output_level = 0; + } else { + output_level = sorted_runs_[first_index_after].level - 1; + } + + // last level is reserved for the files ingested behind + if (ioptions_.allow_ingest_behind && + (output_level == vstorage_->num_levels() - 1)) { + assert(output_level > 1); + output_level--; + } + + std::vector<CompactionInputFiles> inputs(vstorage_->num_levels()); + for (size_t i = 0; i < inputs.size(); ++i) { + inputs[i].level = start_level + static_cast<int>(i); + } + for (size_t i = start_index; i < first_index_after; i++) { + auto& picking_sr = sorted_runs_[i]; + if (picking_sr.level == 0) { + FileMetaData* picking_file = picking_sr.file; + inputs[0].files.push_back(picking_file); + } else { + auto& files = inputs[picking_sr.level - start_level].files; + for (auto* f : vstorage_->LevelFiles(picking_sr.level)) { + files.push_back(f); + } + } + char file_num_buf[256]; + picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i); + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Picking %s", + cf_name_.c_str(), file_num_buf); + } + + std::vector<FileMetaData*> grandparents; + // Include grandparents for potential file cutting in incremental + // mode. It is for aligning file cutting boundaries across levels, + // so that subsequent compactions can pick files with aligned + // buffer. + // Single files are only picked up in incremental mode, so that + // there is no need for full range. + if (mutable_cf_options_.compaction_options_universal.incremental && + first_index_after < sorted_runs_.size() && + sorted_runs_[first_index_after].level > 1) { + grandparents = vstorage_->LevelFiles(sorted_runs_[first_index_after].level); + } + + if (output_level != 0 && + picker_->FilesRangeOverlapWithCompaction( + inputs, output_level, + Compaction::EvaluatePenultimateLevel(vstorage_, ioptions_, + start_level, output_level))) { + return nullptr; + } + CompactionReason compaction_reason; + if (max_number_of_files_to_compact == UINT_MAX) { + compaction_reason = CompactionReason::kUniversalSizeRatio; + } else { + compaction_reason = CompactionReason::kUniversalSortedRunNum; + } + return new Compaction(vstorage_, ioptions_, mutable_cf_options_, + mutable_db_options_, std::move(inputs), output_level, + MaxFileSizeForLevel(mutable_cf_options_, output_level, + kCompactionStyleUniversal), + GetMaxOverlappingBytes(), path_id, + GetCompressionType(vstorage_, mutable_cf_options_, + output_level, 1, enable_compression), + GetCompressionOptions(mutable_cf_options_, vstorage_, + output_level, enable_compression), + Temperature::kUnknown, + /* max_subcompactions */ 0, grandparents, + /* is manual */ false, /* trim_ts */ "", score_, + false /* deletion_compaction */, + /* l0_files_might_overlap */ true, compaction_reason); +} + +// Look at overall size amplification. If size amplification +// exceeds the configured value, then do a compaction +// of the candidate files all the way upto the earliest +// base file (overrides configured values of file-size ratios, +// min_merge_width and max_merge_width). +// +Compaction* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() { + // percentage flexibility while reducing size amplification + uint64_t ratio = mutable_cf_options_.compaction_options_universal + .max_size_amplification_percent; + + unsigned int candidate_count = 0; + uint64_t candidate_size = 0; + size_t start_index = 0; + const SortedRun* sr = nullptr; + + assert(!sorted_runs_.empty()); + if (sorted_runs_.back().being_compacted) { + return nullptr; + } + + // Skip files that are already being compacted + for (size_t loop = 0; loop + 1 < sorted_runs_.size(); loop++) { + sr = &sorted_runs_[loop]; + if (!sr->being_compacted) { + start_index = loop; // Consider this as the first candidate. + break; + } + char file_num_buf[kFormatFileNumberBufSize]; + sr->Dump(file_num_buf, sizeof(file_num_buf), true); + ROCKS_LOG_BUFFER(log_buffer_, + "[%s] Universal: skipping %s[%d] compacted %s", + cf_name_.c_str(), file_num_buf, loop, + " cannot be a candidate to reduce size amp.\n"); + sr = nullptr; + } + + if (sr == nullptr) { + return nullptr; // no candidate files + } + { + char file_num_buf[kFormatFileNumberBufSize]; + sr->Dump(file_num_buf, sizeof(file_num_buf), true); + ROCKS_LOG_BUFFER( + log_buffer_, + "[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt "] %s", + cf_name_.c_str(), file_num_buf, start_index, " to reduce size amp.\n"); + } + + // size of the base sorted run for size amp calculation + uint64_t base_sr_size = sorted_runs_.back().size; + size_t sr_end_idx = sorted_runs_.size() - 1; + // If tiered compaction is enabled and the last sorted run is the last level + if (ioptions_.preclude_last_level_data_seconds > 0 && + ioptions_.num_levels > 2 && + sorted_runs_.back().level == ioptions_.num_levels - 1 && + sorted_runs_.size() > 1) { + sr_end_idx = sorted_runs_.size() - 2; + base_sr_size = sorted_runs_[sr_end_idx].size; + } + + // keep adding up all the remaining files + for (size_t loop = start_index; loop < sr_end_idx; loop++) { + sr = &sorted_runs_[loop]; + if (sr->being_compacted) { + // TODO with incremental compaction is supported, we might want to + // schedule some incremental compactions in parallel if needed. + char file_num_buf[kFormatFileNumberBufSize]; + sr->Dump(file_num_buf, sizeof(file_num_buf), true); + ROCKS_LOG_BUFFER( + log_buffer_, "[%s] Universal: Possible candidate %s[%d] %s", + cf_name_.c_str(), file_num_buf, start_index, + " is already being compacted. No size amp reduction possible.\n"); + return nullptr; + } + candidate_size += sr->compensated_file_size; + candidate_count++; + } + if (candidate_count == 0) { + return nullptr; + } + + // size amplification = percentage of additional size + if (candidate_size * 100 < ratio * base_sr_size) { + ROCKS_LOG_BUFFER( + log_buffer_, + "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64 + " earliest-file-size %" PRIu64, + cf_name_.c_str(), candidate_size, base_sr_size); + return nullptr; + } else { + ROCKS_LOG_BUFFER( + log_buffer_, + "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64 + " earliest-file-size %" PRIu64, + cf_name_.c_str(), candidate_size, base_sr_size); + } + // Since incremental compaction can't include more than second last + // level, it can introduce penalty, compared to full compaction. We + // hard code the pentalty to be 80%. If we end up with a compaction + // fanout higher than 80% of full level compactions, we fall back + // to full level compaction. + // The 80% threshold is arbitrary and can be adjusted or made + // configurable in the future. + // This also prevent the case when compaction falls behind and we + // need to compact more levels for compactions to catch up. + if (mutable_cf_options_.compaction_options_universal.incremental) { + double fanout_threshold = static_cast<double>(base_sr_size) / + static_cast<double>(candidate_size) * 1.8; + Compaction* picked = PickIncrementalForReduceSizeAmp(fanout_threshold); + if (picked != nullptr) { + // As the feature is still incremental, picking incremental compaction + // might fail and we will fall bck to compacting full level. + return picked; + } + } + return PickCompactionWithSortedRunRange( + start_index, sr_end_idx, CompactionReason::kUniversalSizeAmplification); +} + +Compaction* UniversalCompactionBuilder::PickIncrementalForReduceSizeAmp( + double fanout_threshold) { + // Try find all potential compactions with total size just over + // options.max_compaction_size / 2, and take the one with the lowest + // fanout (defined in declaration of the function). + // This is done by having a sliding window of the files at the second + // lowest level, and keep expanding while finding overlapping in the + // last level. Once total size exceeds the size threshold, calculate + // the fanout value. And then shrinking from the small side of the + // window. Keep doing it until the end. + // Finally, we try to include upper level files if they fall into + // the range. + // + // Note that it is a similar problem as leveled compaction's + // kMinOverlappingRatio priority, but instead of picking single files + // we expand to a target compaction size. The reason is that in + // leveled compaction, actual fanout value tends to high, e.g. 10, so + // even with single file in down merging level, the extra size + // compacted in boundary files is at a lower ratio. But here users + // often have size of second last level size to be 1/4, 1/3 or even + // 1/2 of the bottommost level, so picking single file in second most + // level will cause significant waste, which is not desirable. + // + // This algorithm has lots of room to improve to pick more efficient + // compactions. + assert(sorted_runs_.size() >= 2); + int second_last_level = sorted_runs_[sorted_runs_.size() - 2].level; + if (second_last_level == 0) { + // Can't split Level 0. + return nullptr; + } + int output_level = sorted_runs_.back().level; + const std::vector<FileMetaData*>& bottom_files = + vstorage_->LevelFiles(output_level); + const std::vector<FileMetaData*>& files = + vstorage_->LevelFiles(second_last_level); + assert(!bottom_files.empty()); + assert(!files.empty()); + + // std::unordered_map<uint64_t, uint64_t> file_to_order; + + int picked_start_idx = 0; + int picked_end_idx = 0; + double picked_fanout = fanout_threshold; + + // Use half target compaction bytes as anchor to stop growing second most + // level files, and reserve growing space for more overlapping bottom level, + // clean cut, files from other levels, etc. + uint64_t comp_thres_size = mutable_cf_options_.max_compaction_bytes / 2; + int start_idx = 0; + int bottom_end_idx = 0; + int bottom_start_idx = 0; + uint64_t non_bottom_size = 0; + uint64_t bottom_size = 0; + bool end_bottom_size_counted = false; + for (int end_idx = 0; end_idx < static_cast<int>(files.size()); end_idx++) { + FileMetaData* end_file = files[end_idx]; + + // Include bottom most level files smaller than the current second + // last level file. + int num_skipped = 0; + while (bottom_end_idx < static_cast<int>(bottom_files.size()) && + icmp_->Compare(bottom_files[bottom_end_idx]->largest, + end_file->smallest) < 0) { + if (!end_bottom_size_counted) { + bottom_size += bottom_files[bottom_end_idx]->fd.file_size; + } + bottom_end_idx++; + end_bottom_size_counted = false; + num_skipped++; + } + + if (num_skipped > 1) { + // At least a file in the bottom most level falls into the file gap. No + // reason to include the file. We cut the range and start a new sliding + // window. + start_idx = end_idx; + } + + if (start_idx == end_idx) { + // new sliding window. + non_bottom_size = 0; + bottom_size = 0; + bottom_start_idx = bottom_end_idx; + end_bottom_size_counted = false; + } + + non_bottom_size += end_file->fd.file_size; + + // Include all overlapping files in bottom level. + while (bottom_end_idx < static_cast<int>(bottom_files.size()) && + icmp_->Compare(bottom_files[bottom_end_idx]->smallest, + end_file->largest) < 0) { + if (!end_bottom_size_counted) { + bottom_size += bottom_files[bottom_end_idx]->fd.file_size; + end_bottom_size_counted = true; + } + if (icmp_->Compare(bottom_files[bottom_end_idx]->largest, + end_file->largest) > 0) { + // next level file cross large boundary of current file. + break; + } + bottom_end_idx++; + end_bottom_size_counted = false; + } + + if ((non_bottom_size + bottom_size > comp_thres_size || + end_idx == static_cast<int>(files.size()) - 1) && + non_bottom_size > 0) { // Do we alow 0 size file at all? + // If it is a better compaction, remember it in picked* variables. + double fanout = static_cast<double>(bottom_size) / + static_cast<double>(non_bottom_size); + if (fanout < picked_fanout) { + picked_start_idx = start_idx; + picked_end_idx = end_idx; + picked_fanout = fanout; + } + // Shrink from the start end to under comp_thres_size + while (non_bottom_size + bottom_size > comp_thres_size && + start_idx <= end_idx) { + non_bottom_size -= files[start_idx]->fd.file_size; + start_idx++; + if (start_idx < static_cast<int>(files.size())) { + while (bottom_start_idx <= bottom_end_idx && + icmp_->Compare(bottom_files[bottom_start_idx]->largest, + files[start_idx]->smallest) < 0) { + bottom_size -= bottom_files[bottom_start_idx]->fd.file_size; + bottom_start_idx++; + } + } + } + } + } + + if (picked_fanout >= fanout_threshold) { + assert(picked_fanout == fanout_threshold); + return nullptr; + } + + std::vector<CompactionInputFiles> inputs; + CompactionInputFiles bottom_level_inputs; + CompactionInputFiles second_last_level_inputs; + second_last_level_inputs.level = second_last_level; + bottom_level_inputs.level = output_level; + for (int i = picked_start_idx; i <= picked_end_idx; i++) { + if (files[i]->being_compacted) { + return nullptr; + } + second_last_level_inputs.files.push_back(files[i]); + } + assert(!second_last_level_inputs.empty()); + if (!picker_->ExpandInputsToCleanCut(cf_name_, vstorage_, + &second_last_level_inputs, + /*next_smallest=*/nullptr)) { + return nullptr; + } + // We might be able to avoid this binary search if we save and expand + // from bottom_start_idx and bottom_end_idx, but for now, we use + // SetupOtherInputs() for simplicity. + int parent_index = -1; // Create and use bottom_start_idx? + if (!picker_->SetupOtherInputs(cf_name_, mutable_cf_options_, vstorage_, + &second_last_level_inputs, + &bottom_level_inputs, &parent_index, + /*base_index=*/-1)) { + return nullptr; + } + + // Try to include files in upper levels if they fall into the range. + // Since we need to go from lower level up and this is in the reverse + // order, compared to level order, we first write to an reversed + // data structure and finally copy them to compaction inputs. + InternalKey smallest, largest; + picker_->GetRange(second_last_level_inputs, &smallest, &largest); + std::vector<CompactionInputFiles> inputs_reverse; + for (auto it = ++(++sorted_runs_.rbegin()); it != sorted_runs_.rend(); it++) { + SortedRun& sr = *it; + if (sr.level == 0) { + break; + } + std::vector<FileMetaData*> level_inputs; + vstorage_->GetCleanInputsWithinInterval(sr.level, &smallest, &largest, + &level_inputs); + if (!level_inputs.empty()) { + inputs_reverse.push_back({}); + inputs_reverse.back().level = sr.level; + inputs_reverse.back().files = level_inputs; + picker_->GetRange(inputs_reverse.back(), &smallest, &largest); + } + } + for (auto it = inputs_reverse.rbegin(); it != inputs_reverse.rend(); it++) { + inputs.push_back(*it); + } + + inputs.push_back(second_last_level_inputs); + inputs.push_back(bottom_level_inputs); + + int start_level = Compaction::kInvalidLevel; + for (const auto& in : inputs) { + if (!in.empty()) { + // inputs should already be sorted by level + start_level = in.level; + break; + } + } + + // intra L0 compactions outputs could have overlap + if (output_level != 0 && + picker_->FilesRangeOverlapWithCompaction( + inputs, output_level, + Compaction::EvaluatePenultimateLevel(vstorage_, ioptions_, + start_level, output_level))) { + return nullptr; + } + + // TODO support multi paths? + uint32_t path_id = 0; + return new Compaction( + vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_, + std::move(inputs), output_level, + MaxFileSizeForLevel(mutable_cf_options_, output_level, + kCompactionStyleUniversal), + GetMaxOverlappingBytes(), path_id, + GetCompressionType(vstorage_, mutable_cf_options_, output_level, 1, + true /* enable_compression */), + GetCompressionOptions(mutable_cf_options_, vstorage_, output_level, + true /* enable_compression */), + Temperature::kUnknown, + /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false, + /* trim_ts */ "", score_, false /* deletion_compaction */, + /* l0_files_might_overlap */ true, + CompactionReason::kUniversalSizeAmplification); +} + +// Pick files marked for compaction. Typically, files are marked by +// CompactOnDeleteCollector due to the presence of tombstones. +Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() { + CompactionInputFiles start_level_inputs; + int output_level; + std::vector<CompactionInputFiles> inputs; + std::vector<FileMetaData*> grandparents; + + if (vstorage_->num_levels() == 1) { + // This is single level universal. Since we're basically trying to reclaim + // space by processing files marked for compaction due to high tombstone + // density, let's do the same thing as compaction to reduce size amp which + // has the same goals. + int start_index = -1; + + start_level_inputs.level = 0; + start_level_inputs.files.clear(); + output_level = 0; + // Find the first file marked for compaction. Ignore the last file + for (size_t loop = 0; loop + 1 < sorted_runs_.size(); loop++) { + SortedRun* sr = &sorted_runs_[loop]; + if (sr->being_compacted) { + continue; + } + FileMetaData* f = vstorage_->LevelFiles(0)[loop]; + if (f->marked_for_compaction) { + start_level_inputs.files.push_back(f); + start_index = + static_cast<int>(loop); // Consider this as the first candidate. + break; + } + } + if (start_index < 0) { + // Either no file marked, or they're already being compacted + return nullptr; + } + + for (size_t loop = start_index + 1; loop < sorted_runs_.size(); loop++) { + SortedRun* sr = &sorted_runs_[loop]; + if (sr->being_compacted) { + break; + } + + FileMetaData* f = vstorage_->LevelFiles(0)[loop]; + start_level_inputs.files.push_back(f); + } + if (start_level_inputs.size() <= 1) { + // If only the last file in L0 is marked for compaction, ignore it + return nullptr; + } + inputs.push_back(start_level_inputs); + } else { + int start_level; + + // For multi-level universal, the strategy is to make this look more like + // leveled. We pick one of the files marked for compaction and compact with + // overlapping files in the adjacent level. + picker_->PickFilesMarkedForCompaction(cf_name_, vstorage_, &start_level, + &output_level, &start_level_inputs); + if (start_level_inputs.empty()) { + return nullptr; + } + + // Pick the first non-empty level after the start_level + for (output_level = start_level + 1; output_level < vstorage_->num_levels(); + output_level++) { + if (vstorage_->NumLevelFiles(output_level) != 0) { + break; + } + } + + // If all higher levels are empty, pick the highest level as output level + if (output_level == vstorage_->num_levels()) { + if (start_level == 0) { + output_level = vstorage_->num_levels() - 1; + } else { + // If start level is non-zero and all higher levels are empty, this + // compaction will translate into a trivial move. Since the idea is + // to reclaim space and trivial move doesn't help with that, we + // skip compaction in this case and return nullptr + return nullptr; + } + } + if (ioptions_.allow_ingest_behind && + output_level == vstorage_->num_levels() - 1) { + assert(output_level > 1); + output_level--; + } + + if (output_level != 0) { + if (start_level == 0) { + if (!picker_->GetOverlappingL0Files(vstorage_, &start_level_inputs, + output_level, nullptr)) { + return nullptr; + } + } + + CompactionInputFiles output_level_inputs; + int parent_index = -1; + + output_level_inputs.level = output_level; + if (!picker_->SetupOtherInputs(cf_name_, mutable_cf_options_, vstorage_, + &start_level_inputs, &output_level_inputs, + &parent_index, -1)) { + return nullptr; + } + inputs.push_back(start_level_inputs); + if (!output_level_inputs.empty()) { + inputs.push_back(output_level_inputs); + } + if (picker_->FilesRangeOverlapWithCompaction( + inputs, output_level, + Compaction::EvaluatePenultimateLevel( + vstorage_, ioptions_, start_level, output_level))) { + return nullptr; + } + + picker_->GetGrandparents(vstorage_, start_level_inputs, + output_level_inputs, &grandparents); + } else { + inputs.push_back(start_level_inputs); + } + } + + uint64_t estimated_total_size = 0; + // Use size of the output level as estimated file size + for (FileMetaData* f : vstorage_->LevelFiles(output_level)) { + estimated_total_size += f->fd.GetFileSize(); + } + uint32_t path_id = + GetPathId(ioptions_, mutable_cf_options_, estimated_total_size); + return new Compaction( + vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_, + std::move(inputs), output_level, + MaxFileSizeForLevel(mutable_cf_options_, output_level, + kCompactionStyleUniversal), + /* max_grandparent_overlap_bytes */ GetMaxOverlappingBytes(), path_id, + GetCompressionType(vstorage_, mutable_cf_options_, output_level, 1), + GetCompressionOptions(mutable_cf_options_, vstorage_, output_level), + Temperature::kUnknown, + /* max_subcompactions */ 0, grandparents, /* is manual */ false, + /* trim_ts */ "", score_, false /* deletion_compaction */, + /* l0_files_might_overlap */ true, + CompactionReason::kFilesMarkedForCompaction); +} + +Compaction* UniversalCompactionBuilder::PickCompactionToOldest( + size_t start_index, CompactionReason compaction_reason) { + return PickCompactionWithSortedRunRange(start_index, sorted_runs_.size() - 1, + compaction_reason); +} + +Compaction* UniversalCompactionBuilder::PickCompactionWithSortedRunRange( + size_t start_index, size_t end_index, CompactionReason compaction_reason) { + assert(start_index < sorted_runs_.size()); + + // Estimate total file size + uint64_t estimated_total_size = 0; + for (size_t loop = start_index; loop <= end_index; loop++) { + estimated_total_size += sorted_runs_[loop].size; + } + uint32_t path_id = + GetPathId(ioptions_, mutable_cf_options_, estimated_total_size); + int start_level = sorted_runs_[start_index].level; + + std::vector<CompactionInputFiles> inputs(vstorage_->num_levels()); + for (size_t i = 0; i < inputs.size(); ++i) { + inputs[i].level = start_level + static_cast<int>(i); + } + for (size_t loop = start_index; loop <= end_index; loop++) { + auto& picking_sr = sorted_runs_[loop]; + if (picking_sr.level == 0) { + FileMetaData* f = picking_sr.file; + inputs[0].files.push_back(f); + } else { + auto& files = inputs[picking_sr.level - start_level].files; + for (auto* f : vstorage_->LevelFiles(picking_sr.level)) { + files.push_back(f); + } + } + std::string comp_reason_print_string; + if (compaction_reason == CompactionReason::kPeriodicCompaction) { + comp_reason_print_string = "periodic compaction"; + } else if (compaction_reason == + CompactionReason::kUniversalSizeAmplification) { + comp_reason_print_string = "size amp"; + } else { + assert(false); + comp_reason_print_string = "unknown: "; + comp_reason_print_string.append( + std::to_string(static_cast<int>(compaction_reason))); + } + + char file_num_buf[256]; + picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop); + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: %s picking %s", + cf_name_.c_str(), comp_reason_print_string.c_str(), + file_num_buf); + } + + int output_level; + if (end_index == sorted_runs_.size() - 1) { + // output files at the last level, unless it's reserved + output_level = vstorage_->num_levels() - 1; + // last level is reserved for the files ingested behind + if (ioptions_.allow_ingest_behind) { + assert(output_level > 1); + output_level--; + } + } else { + // if it's not including all sorted_runs, it can only output to the level + // above the `end_index + 1` sorted_run. + output_level = sorted_runs_[end_index + 1].level - 1; + } + + // intra L0 compactions outputs could have overlap + if (output_level != 0 && + picker_->FilesRangeOverlapWithCompaction( + inputs, output_level, + Compaction::EvaluatePenultimateLevel(vstorage_, ioptions_, + start_level, output_level))) { + return nullptr; + } + + // We never check size for + // compaction_options_universal.compression_size_percent, + // because we always compact all the files, so always compress. + return new Compaction( + vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_, + std::move(inputs), output_level, + MaxFileSizeForLevel(mutable_cf_options_, output_level, + kCompactionStyleUniversal), + GetMaxOverlappingBytes(), path_id, + GetCompressionType(vstorage_, mutable_cf_options_, output_level, 1, + true /* enable_compression */), + GetCompressionOptions(mutable_cf_options_, vstorage_, output_level, + true /* enable_compression */), + Temperature::kUnknown, + /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false, + /* trim_ts */ "", score_, false /* deletion_compaction */, + /* l0_files_might_overlap */ true, compaction_reason); +} + +Compaction* UniversalCompactionBuilder::PickPeriodicCompaction() { + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Periodic Compaction", + cf_name_.c_str()); + + // In universal compaction, sorted runs contain older data are almost always + // generated earlier too. To simplify the problem, we just try to trigger + // a full compaction. We start from the oldest sorted run and include + // all sorted runs, until we hit a sorted already being compacted. + // Since usually the largest (which is usually the oldest) sorted run is + // included anyway, doing a full compaction won't increase write + // amplification much. + + // Get some information from marked files to check whether a file is + // included in the compaction. + + size_t start_index = sorted_runs_.size(); + while (start_index > 0 && !sorted_runs_[start_index - 1].being_compacted) { + start_index--; + } + if (start_index == sorted_runs_.size()) { + return nullptr; + } + + // There is a rare corner case where we can't pick up all the files + // because some files are being compacted and we end up with picking files + // but none of them need periodic compaction. Unless we simply recompact + // the last sorted run (either the last level or last L0 file), we would just + // execute the compaction, in order to simplify the logic. + if (start_index == sorted_runs_.size() - 1) { + bool included_file_marked = false; + int start_level = sorted_runs_[start_index].level; + FileMetaData* start_file = sorted_runs_[start_index].file; + for (const std::pair<int, FileMetaData*>& level_file_pair : + vstorage_->FilesMarkedForPeriodicCompaction()) { + if (start_level != 0) { + // Last sorted run is a level + if (start_level == level_file_pair.first) { + included_file_marked = true; + break; + } + } else { + // Last sorted run is a L0 file. + if (start_file == level_file_pair.second) { + included_file_marked = true; + break; + } + } + } + if (!included_file_marked) { + ROCKS_LOG_BUFFER(log_buffer_, + "[%s] Universal: Cannot form a compaction covering file " + "marked for periodic compaction", + cf_name_.c_str()); + return nullptr; + } + } + + Compaction* c = PickCompactionToOldest(start_index, + CompactionReason::kPeriodicCompaction); + + TEST_SYNC_POINT_CALLBACK( + "UniversalCompactionPicker::PickPeriodicCompaction:Return", c); + + return c; +} + +uint64_t UniversalCompactionBuilder::GetMaxOverlappingBytes() const { + if (!mutable_cf_options_.compaction_options_universal.incremental) { + return std::numeric_limits<uint64_t>::max(); + } else { + // Try to align cutting boundary with files at the next level if the + // file isn't end up with 1/2 of target size, or it would overlap + // with two full size files at the next level. + return mutable_cf_options_.target_file_size_base / 2 * 3; + } +} +} // namespace ROCKSDB_NAMESPACE + +#endif // !ROCKSDB_LITE |