author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit     e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree       64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/compaction/tiered_compaction_test.cc
parent     Initial commit. (diff)
download   ceph-upstream/18.2.2.tar.xz, ceph-upstream/18.2.2.zip
Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/db/compaction/tiered_compaction_test.cc')
-rw-r--r-- | src/rocksdb/db/compaction/tiered_compaction_test.cc | 2028
1 file changed, 2028 insertions, 0 deletions
diff --git a/src/rocksdb/db/compaction/tiered_compaction_test.cc b/src/rocksdb/db/compaction/tiered_compaction_test.cc new file mode 100644 index 000000000..aaebcfd94 --- /dev/null +++ b/src/rocksdb/db/compaction/tiered_compaction_test.cc @@ -0,0 +1,2028 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "rocksdb/iostats_context.h" +#include "rocksdb/listener.h" +#include "rocksdb/utilities/debug.h" +#include "test_util/mock_time_env.h" + +namespace ROCKSDB_NAMESPACE { + +#if !defined(ROCKSDB_LITE) + +class TieredCompactionTest : public DBTestBase, + public testing::WithParamInterface<bool> { + public: + TieredCompactionTest() + : DBTestBase("tiered_compaction_test", /*env_do_fsync=*/true), + kBasicCompStats(CompactionReason::kUniversalSizeAmplification, 1), + kBasicPerKeyPlacementCompStats( + CompactionReason::kUniversalSizeAmplification, 1), + kBasicFlushStats(CompactionReason::kFlush, 1) { + kBasicCompStats.micros = kHasValue; + kBasicCompStats.cpu_micros = kHasValue; + kBasicCompStats.bytes_read_non_output_levels = kHasValue; + kBasicCompStats.num_input_files_in_non_output_levels = kHasValue; + kBasicCompStats.num_input_records = kHasValue; + kBasicCompStats.num_dropped_records = kHasValue; + + kBasicPerLevelStats.num_output_records = kHasValue; + kBasicPerLevelStats.bytes_written = kHasValue; + kBasicPerLevelStats.num_output_files = kHasValue; + + kBasicPerKeyPlacementCompStats.micros = kHasValue; + kBasicPerKeyPlacementCompStats.cpu_micros = kHasValue; + kBasicPerKeyPlacementCompStats.Add(kBasicPerLevelStats); + + kBasicFlushStats.micros = kHasValue; + kBasicFlushStats.cpu_micros = kHasValue; + kBasicFlushStats.bytes_written = kHasValue; + kBasicFlushStats.num_output_files = kHasValue; + } + + protected: + static constexpr uint8_t kHasValue = 1; + + InternalStats::CompactionStats kBasicCompStats; + InternalStats::CompactionStats kBasicPerKeyPlacementCompStats; + InternalStats::CompactionOutputsStats kBasicPerLevelStats; + InternalStats::CompactionStats kBasicFlushStats; + + std::atomic_bool enable_per_key_placement = true; + + void SetUp() override { + SyncPoint::GetInstance()->SetCallBack( + "Compaction::SupportsPerKeyPlacement:Enabled", [&](void* arg) { + auto supports_per_key_placement = static_cast<bool*>(arg); + *supports_per_key_placement = enable_per_key_placement; + }); + SyncPoint::GetInstance()->EnableProcessing(); + } + + const std::vector<InternalStats::CompactionStats>& GetCompactionStats() { + VersionSet* const versions = dbfull()->GetVersionSet(); + assert(versions); + assert(versions->GetColumnFamilySet()); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + const InternalStats* const internal_stats = cfd->internal_stats(); + assert(internal_stats); + + return internal_stats->TEST_GetCompactionStats(); + } + + const InternalStats::CompactionStats& GetPerKeyPlacementCompactionStats() { + VersionSet* const versions = dbfull()->GetVersionSet(); + assert(versions); + assert(versions->GetColumnFamilySet()); + + ColumnFamilyData* const 
cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + const InternalStats* const internal_stats = cfd->internal_stats(); + assert(internal_stats); + + return internal_stats->TEST_GetPerKeyPlacementCompactionStats(); + } + + // Verify the compaction stats, the stats are roughly compared + void VerifyCompactionStats( + const std::vector<InternalStats::CompactionStats>& expect_stats, + const InternalStats::CompactionStats& expect_pl_stats) { + const std::vector<InternalStats::CompactionStats>& stats = + GetCompactionStats(); + const size_t kLevels = expect_stats.size(); + ASSERT_EQ(kLevels, stats.size()); + + for (auto it = stats.begin(), expect = expect_stats.begin(); + it != stats.end(); it++, expect++) { + VerifyCompactionStats(*it, *expect); + } + + const InternalStats::CompactionStats& pl_stats = + GetPerKeyPlacementCompactionStats(); + VerifyCompactionStats(pl_stats, expect_pl_stats); + } + + void ResetAllStats(std::vector<InternalStats::CompactionStats>& stats, + InternalStats::CompactionStats& pl_stats) { + ASSERT_OK(dbfull()->ResetStats()); + for (auto& level_stats : stats) { + level_stats.Clear(); + } + pl_stats.Clear(); + } + + // bottommost_temperature is renaming to last_level_temperature, set either + // of them should have the same effect. + void SetColdTemperature(Options& options) { + if (GetParam()) { + options.bottommost_temperature = Temperature::kCold; + } else { + options.last_level_temperature = Temperature::kCold; + } + } + + private: + void CompareStats(uint64_t val, uint64_t expect) { + if (expect > 0) { + ASSERT_TRUE(val > 0); + } else { + ASSERT_EQ(val, 0); + } + } + + void VerifyCompactionStats( + const InternalStats::CompactionStats& stats, + const InternalStats::CompactionStats& expect_stats) { + CompareStats(stats.micros, expect_stats.micros); + CompareStats(stats.cpu_micros, expect_stats.cpu_micros); + CompareStats(stats.bytes_read_non_output_levels, + expect_stats.bytes_read_non_output_levels); + CompareStats(stats.bytes_read_output_level, + expect_stats.bytes_read_output_level); + CompareStats(stats.bytes_read_blob, expect_stats.bytes_read_blob); + CompareStats(stats.bytes_written, expect_stats.bytes_written); + CompareStats(stats.bytes_moved, expect_stats.bytes_moved); + CompareStats(stats.num_input_files_in_non_output_levels, + expect_stats.num_input_files_in_non_output_levels); + CompareStats(stats.num_input_files_in_output_level, + expect_stats.num_input_files_in_output_level); + CompareStats(stats.num_output_files, expect_stats.num_output_files); + CompareStats(stats.num_output_files_blob, + expect_stats.num_output_files_blob); + CompareStats(stats.num_input_records, expect_stats.num_input_records); + CompareStats(stats.num_dropped_records, expect_stats.num_dropped_records); + CompareStats(stats.num_output_records, expect_stats.num_output_records); + ASSERT_EQ(stats.count, expect_stats.count); + for (int i = 0; i < static_cast<int>(CompactionReason::kNumOfReasons); + i++) { + ASSERT_EQ(stats.counts[i], expect_stats.counts[i]); + } + } +}; + +TEST_P(TieredCompactionTest, SequenceBasedTieredStorageUniversal) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + const int kLastLevel = kNumLevels - 1; + + auto options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + SetColdTemperature(options); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.statistics = CreateDBStatistics(); + options.max_subcompactions = 10; + DestroyAndReopen(options); + + 
std::atomic_uint64_t latest_cold_seq = 0; + std::vector<SequenceNumber> seq_history; + + SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator::PrepareOutput.context", [&](void* arg) { + auto context = static_cast<PerKeyPlacementContext*>(arg); + context->output_to_penultimate_level = + context->seq_num > latest_cold_seq; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + std::vector<InternalStats::CompactionStats> expect_stats(kNumLevels); + InternalStats::CompactionStats& last_stats = expect_stats[kLastLevel]; + InternalStats::CompactionStats expect_pl_stats; + + for (int i = 0; i < kNumTrigger; i++) { + for (int j = 0; j < kNumKeys; j++) { + ASSERT_OK(Put(Key(i * 10 + j), "value" + std::to_string(i))); + } + ASSERT_OK(Flush()); + seq_history.emplace_back(dbfull()->GetLatestSequenceNumber()); + expect_stats[0].Add(kBasicFlushStats); + } + ASSERT_OK(dbfull()->WaitForCompact(true)); + + // the penultimate level file temperature is not cold, all data are output to + // the penultimate level. + ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + + // basic compaction stats are still counted to the last level + expect_stats[kLastLevel].Add(kBasicCompStats); + expect_pl_stats.Add(kBasicPerKeyPlacementCompStats); + + VerifyCompactionStats(expect_stats, expect_pl_stats); + + ResetAllStats(expect_stats, expect_pl_stats); + + // move forward the cold_seq to split the file into 2 levels, so should have + // both the last level stats and the output_to_penultimate_level stats + latest_cold_seq = seq_history[0]; + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + last_stats.Add(kBasicCompStats); + last_stats.ResetCompactionReason(CompactionReason::kManualCompaction); + last_stats.Add(kBasicPerLevelStats); + last_stats.num_dropped_records = 0; + expect_pl_stats.Add(kBasicPerKeyPlacementCompStats); + expect_pl_stats.ResetCompactionReason(CompactionReason::kManualCompaction); + VerifyCompactionStats(expect_stats, expect_pl_stats); + + // delete all cold data, so all data will be on penultimate level + for (int i = 0; i < 10; i++) { + ASSERT_OK(Delete(Key(i))); + } + ASSERT_OK(Flush()); + + ResetAllStats(expect_stats, expect_pl_stats); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + + last_stats.Add(kBasicCompStats); + last_stats.ResetCompactionReason(CompactionReason::kManualCompaction); + last_stats.bytes_read_output_level = kHasValue; + last_stats.num_input_files_in_output_level = kHasValue; + expect_pl_stats.Add(kBasicPerKeyPlacementCompStats); + expect_pl_stats.ResetCompactionReason(CompactionReason::kManualCompaction); + VerifyCompactionStats(expect_stats, expect_pl_stats); + + // move forward the cold_seq again with range delete, take a snapshot to keep + // the range dels in both cold and hot SSTs + auto snap = db_->GetSnapshot(); + latest_cold_seq = seq_history[2]; + std::string start = Key(25), end = Key(35); + ASSERT_OK( + db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end)); + ASSERT_OK(Flush()); + + ResetAllStats(expect_stats, expect_pl_stats); + + 
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + last_stats.Add(kBasicCompStats); + last_stats.Add(kBasicPerLevelStats); + last_stats.ResetCompactionReason(CompactionReason::kManualCompaction); + expect_pl_stats.Add(kBasicPerKeyPlacementCompStats); + expect_pl_stats.ResetCompactionReason(CompactionReason::kManualCompaction); + VerifyCompactionStats(expect_stats, expect_pl_stats); + + // verify data + std::string value; + for (int i = 0; i < kNumKeys; i++) { + if (i < 10 || (i >= 25 && i < 35)) { + ASSERT_TRUE(db_->Get(ReadOptions(), Key(i), &value).IsNotFound()); + } else { + ASSERT_OK(db_->Get(ReadOptions(), Key(i), &value)); + } + } + + // range delete all hot data + start = Key(30); + end = Key(130); + ASSERT_OK( + db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end)); + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // no range del is dropped because of snapshot + ASSERT_EQ( + options.statistics->getTickerCount(COMPACTION_RANGE_DEL_DROP_OBSOLETE), + 0); + + // release the snapshot and do compaction again should remove all hot data + db_->ReleaseSnapshot(snap); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // 2 range dels are dropped + ASSERT_EQ( + options.statistics->getTickerCount(COMPACTION_RANGE_DEL_DROP_OBSOLETE), + 3); + + // move backward the cold_seq, for example the user may change the setting of + // hot/cold data, but it won't impact the existing cold data, as the sequence + // number is zeroed out. 
+ latest_cold_seq = seq_history[1]; + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); +} + +TEST_P(TieredCompactionTest, RangeBasedTieredStorageUniversal) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + const int kLastLevel = kNumLevels - 1; + + auto options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + SetColdTemperature(options); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.statistics = CreateDBStatistics(); + options.max_subcompactions = 10; + DestroyAndReopen(options); + auto cmp = options.comparator; + + port::Mutex mutex; + std::string hot_start = Key(10); + std::string hot_end = Key(50); + + SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator::PrepareOutput.context", [&](void* arg) { + auto context = static_cast<PerKeyPlacementContext*>(arg); + MutexLock l(&mutex); + context->output_to_penultimate_level = + cmp->Compare(context->key, hot_start) >= 0 && + cmp->Compare(context->key, hot_end) < 0; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + std::vector<InternalStats::CompactionStats> expect_stats(kNumLevels); + InternalStats::CompactionStats& last_stats = expect_stats[kLastLevel]; + InternalStats::CompactionStats expect_pl_stats; + + for (int i = 0; i < kNumTrigger; i++) { + for (int j = 0; j < kNumKeys; j++) { + ASSERT_OK(Put(Key(j), "value" + std::to_string(j))); + } + ASSERT_OK(Flush()); + expect_stats[0].Add(kBasicFlushStats); + } + ASSERT_OK(dbfull()->WaitForCompact(true)); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + last_stats.Add(kBasicCompStats); + last_stats.Add(kBasicPerLevelStats); + expect_pl_stats.Add(kBasicPerKeyPlacementCompStats); + VerifyCompactionStats(expect_stats, expect_pl_stats); + + ResetAllStats(expect_stats, expect_pl_stats); + + // change to all cold, no output_to_penultimate_level output + { + MutexLock l(&mutex); + hot_start = Key(100); + hot_end = Key(200); + } + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + last_stats.Add(kBasicCompStats); + last_stats.ResetCompactionReason(CompactionReason::kManualCompaction); + last_stats.Add(kBasicPerLevelStats); + last_stats.num_dropped_records = 0; + last_stats.bytes_read_output_level = kHasValue; + last_stats.num_input_files_in_output_level = kHasValue; + VerifyCompactionStats(expect_stats, expect_pl_stats); + + // change to all hot, universal compaction support moving data to up level if + // it's within compaction level range. + { + MutexLock l(&mutex); + hot_start = Key(0); + hot_end = Key(100); + } + + // No data is moved from cold tier to hot tier because no input files from L5 + // or higher, it's not safe to move data to output_to_penultimate_level level. 
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel()); + + // Add 2 keys in higher level, but in separated files, all keys can be moved + // up if it's hot + ASSERT_OK(Put(Key(0), "value" + std::to_string(0))); + ASSERT_OK(Flush()); + ASSERT_OK(Put(Key(50), "value" + std::to_string(0))); + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel()); + + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + + // change to only 1 key cold, to test compaction could stop even it matches + // size amp compaction threshold + { + MutexLock l(&mutex); + hot_start = Key(1); + hot_end = Key(1000); + } + + // generate files just enough to trigger compaction + for (int i = 0; i < kNumTrigger - 1; i++) { + for (int j = 0; j < 1000; j++) { + ASSERT_OK(Put(Key(j), "value" + std::to_string(j))); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->WaitForCompact( + true)); // make sure the compaction is able to finish + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + auto opts = db_->GetOptions(); + auto max_size_amp = + opts.compaction_options_universal.max_size_amplification_percent / 100; + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), + GetSstSizeHelper(Temperature::kCold) * max_size_amp); + + // delete all cold data + ASSERT_OK(Delete(Key(0))); + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + + // range delete overlap with both hot/cold data, with a snapshot to make sure + // the range del is saved + auto snap = db_->GetSnapshot(); + { + MutexLock l(&mutex); + hot_start = Key(50); + hot_end = Key(100); + } + std::string start = Key(1), end = Key(70); + ASSERT_OK( + db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end)); + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // no range del is dropped until snapshot is released + ASSERT_EQ( + options.statistics->getTickerCount(COMPACTION_RANGE_DEL_DROP_OBSOLETE), + 0); + + // verify data + std::string value; + for (int i = 0; i < kNumKeys; i++) { + if (i < 70) { + ASSERT_TRUE(db_->Get(ReadOptions(), Key(i), &value).IsNotFound()); + } else { + ASSERT_OK(db_->Get(ReadOptions(), Key(i), &value)); + } + } + + db_->ReleaseSnapshot(snap); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // range del is dropped + ASSERT_EQ( + options.statistics->getTickerCount(COMPACTION_RANGE_DEL_DROP_OBSOLETE), + 1); +} + +TEST_P(TieredCompactionTest, LevelColdRangeDelete) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + const int kLastLevel = kNumLevels - 1; + + auto options = CurrentOptions(); + SetColdTemperature(options); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.num_levels = kNumLevels; + 
options.statistics = CreateDBStatistics(); + options.max_subcompactions = 10; + DestroyAndReopen(options); + + std::atomic_uint64_t latest_cold_seq = 0; + + SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator::PrepareOutput.context", [&](void* arg) { + auto context = static_cast<PerKeyPlacementContext*>(arg); + context->output_to_penultimate_level = + context->seq_num > latest_cold_seq; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(i), "value" + std::to_string(i))); + } + ASSERT_OK(Flush()); + + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,1", + FilesPerLevel()); // bottommost but not last level file is hot + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + + // explicitly move the data to the last level + MoveFilesToLevel(kLastLevel); + + ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); + + auto snap = db_->GetSnapshot(); + + std::string start = Key(10); + std::string end = Key(50); + ASSERT_OK( + db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end)); + + // 20->30 will be marked as cold data, but it cannot be placed to cold tier + // (bottommost) otherwise, it will be "deleted" by the range del in + // output_to_penultimate_level level verify that these data will be able to + // queried + for (int i = 20; i < 30; i++) { + ASSERT_OK(Put(Key(i), "value" + std::to_string(i))); + } + // make the range tombstone and data after that cold + latest_cold_seq = dbfull()->GetLatestSequenceNumber(); + + // add home hot data, just for test + for (int i = 30; i < 40; i++) { + ASSERT_OK(Put(Key(i), "value" + std::to_string(i))); + } + + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + std::string value; + for (int i = 0; i < kNumKeys; i++) { + auto s = db_->Get(ReadOptions(), Key(i), &value); + if ((i >= 10 && i < 20) || (i >= 40 && i < 50)) { + ASSERT_TRUE(s.IsNotFound()); + } else { + ASSERT_OK(s); + } + } + + db_->ReleaseSnapshot(snap); +} + +// Test SST partitioner cut after every single key +class SingleKeySstPartitioner : public SstPartitioner { + public: + const char* Name() const override { return "SingleKeySstPartitioner"; } + + PartitionerResult ShouldPartition( + const PartitionerRequest& /*request*/) override { + return kRequired; + } + + bool CanDoTrivialMove(const Slice& /*smallest_user_key*/, + const Slice& /*largest_user_key*/) override { + return false; + } +}; + +class SingleKeySstPartitionerFactory : public SstPartitionerFactory { + public: + static const char* kClassName() { return "SingleKeySstPartitionerFactory"; } + const char* Name() const override { return kClassName(); } + + std::unique_ptr<SstPartitioner> CreatePartitioner( + const SstPartitioner::Context& /* context */) const override { + return std::unique_ptr<SstPartitioner>(new SingleKeySstPartitioner()); + } +}; + +TEST_P(TieredCompactionTest, LevelOutofBoundaryRangeDelete) { + const int kNumTrigger = 4; + const int kNumLevels = 3; + const int kNumKeys = 10; + + auto factory = std::make_shared<SingleKeySstPartitionerFactory>(); + auto options = CurrentOptions(); + SetColdTemperature(options); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.num_levels = kNumLevels; + options.statistics = CreateDBStatistics(); + options.sst_partitioner_factory = factory; + options.max_subcompactions = 10; + DestroyAndReopen(options); + + 
std::atomic_uint64_t latest_cold_seq = 0; + + SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator::PrepareOutput.context", [&](void* arg) { + auto context = static_cast<PerKeyPlacementContext*>(arg); + context->output_to_penultimate_level = + context->seq_num > latest_cold_seq; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(i), "value" + std::to_string(i))); + } + ASSERT_OK(Flush()); + + MoveFilesToLevel(kNumLevels - 1); + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + ASSERT_EQ("0,0,10", FilesPerLevel()); + + auto snap = db_->GetSnapshot(); + + // only range delete + std::string start = Key(3); + std::string end = Key(5); + ASSERT_OK( + db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end)); + ASSERT_OK(Flush()); + + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), + 0); // tombstone has no size, even it's in hot tier + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + ASSERT_EQ("0,1,10", + FilesPerLevel()); // one file is at the penultimate level which + // only contains a range delete + + // Add 2 hot keys, each is a new SST, they will be placed in the same level as + // range del, but they don't have overlap with range del, make sure the range + // del will still be placed there + latest_cold_seq = dbfull()->GetLatestSequenceNumber(); + ASSERT_OK(Put(Key(0), "new value" + std::to_string(0))); + auto snap2 = db_->GetSnapshot(); + ASSERT_OK(Put(Key(6), "new value" + std::to_string(6))); + ASSERT_OK(Flush()); + + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,2,10", + FilesPerLevel()); // one file is at the penultimate level + // which only contains a range delete + std::vector<LiveFileMetaData> live_file_meta; + db_->GetLiveFilesMetaData(&live_file_meta); + bool found_sst_with_del = false; + uint64_t sst_with_del_num = 0; + for (const auto& meta : live_file_meta) { + if (meta.num_deletions > 0) { + // found SST with del, which has 2 entries, one for data one for range del + ASSERT_EQ(meta.level, + kNumLevels - 2); // output to penultimate level + ASSERT_EQ(meta.num_entries, 2); + ASSERT_EQ(meta.num_deletions, 1); + found_sst_with_del = true; + sst_with_del_num = meta.file_number; + } + } + ASSERT_TRUE(found_sst_with_del); + + // release the first snapshot and compact, which should compact the range del + // but new inserted key `0` and `6` are still hot data which will be placed on + // the penultimate level + db_->ReleaseSnapshot(snap); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,2,7", FilesPerLevel()); + db_->GetLiveFilesMetaData(&live_file_meta); + found_sst_with_del = false; + for (const auto& meta : live_file_meta) { + // check new SST with del (the old one may not yet be deleted after + // compaction) + if (meta.num_deletions > 0 && meta.file_number != sst_with_del_num) { + found_sst_with_del = true; + } + } + ASSERT_FALSE(found_sst_with_del); + + // Now make all data cold, key 0 will be moved to the last level, but key 6 is + // still in snap2, so it will be kept at the penultimate level + latest_cold_seq = dbfull()->GetLatestSequenceNumber(); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,1,8", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + 
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + db_->ReleaseSnapshot(snap2); + + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,8", FilesPerLevel()); + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); +} + +TEST_P(TieredCompactionTest, UniversalRangeDelete) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 10; + + auto factory = std::make_shared<SingleKeySstPartitionerFactory>(); + + auto options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + SetColdTemperature(options); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.statistics = CreateDBStatistics(); + options.sst_partitioner_factory = factory; + options.max_subcompactions = 10; + DestroyAndReopen(options); + + std::atomic_uint64_t latest_cold_seq = 0; + + SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator::PrepareOutput.context", [&](void* arg) { + auto context = static_cast<PerKeyPlacementContext*>(arg); + context->output_to_penultimate_level = + context->seq_num > latest_cold_seq; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(i), "value" + std::to_string(i))); + } + ASSERT_OK(Flush()); + + // compact to the penultimate level with 10 files + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + ASSERT_EQ("0,0,0,0,0,10", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + + // make all data cold + latest_cold_seq = dbfull()->GetLatestSequenceNumber(); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,0,10", FilesPerLevel()); + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // range del which considered as hot data, but it will be merged and deleted + // with the last level data + std::string start = Key(3); + std::string end = Key(5); + ASSERT_OK( + db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end)); + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + ASSERT_EQ("0,0,0,0,0,0,8", FilesPerLevel()); + + // range del with snapshot should be preserved in the penultimate level + auto snap = db_->GetSnapshot(); + + start = Key(6); + end = Key(8); + ASSERT_OK( + db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end)); + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1,8", FilesPerLevel()); + + // Add 2 hot keys, each is a new SST, they will be placed in the same level as + // range del, but no overlap with range del. 
+ latest_cold_seq = dbfull()->GetLatestSequenceNumber(); + ASSERT_OK(Put(Key(4), "new value" + std::to_string(0))); + auto snap2 = db_->GetSnapshot(); + ASSERT_OK(Put(Key(9), "new value" + std::to_string(6))); + + ASSERT_OK(Flush()); + + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,2,8", FilesPerLevel()); + // find the SST with range del + std::vector<LiveFileMetaData> live_file_meta; + db_->GetLiveFilesMetaData(&live_file_meta); + bool found_sst_with_del = false; + uint64_t sst_with_del_num = 0; + for (const auto& meta : live_file_meta) { + if (meta.num_deletions > 0) { + // found SST with del, which has 2 entries, one for data one for range del + ASSERT_EQ(meta.level, + kNumLevels - 2); // output_to_penultimate_level level + ASSERT_EQ(meta.num_entries, 2); + ASSERT_EQ(meta.num_deletions, 1); + found_sst_with_del = true; + sst_with_del_num = meta.file_number; + } + } + ASSERT_TRUE(found_sst_with_del); + + // release the first snapshot which should compact the range del, but data on + // the same level is still hot + db_->ReleaseSnapshot(snap); + + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,2,6", FilesPerLevel()); + db_->GetLiveFilesMetaData(&live_file_meta); + // no range del should be found in SST + found_sst_with_del = false; + for (const auto& meta : live_file_meta) { + // check new SST with del (the old one may not yet be deleted after + // compaction) + if (meta.num_deletions > 0 && meta.file_number != sst_with_del_num) { + found_sst_with_del = true; + } + } + ASSERT_FALSE(found_sst_with_del); + + // make all data to cold, but key 6 is still protected by snap2 + latest_cold_seq = dbfull()->GetLatestSequenceNumber(); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1,7", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + db_->ReleaseSnapshot(snap2); + + // release snapshot, everything go to bottommost + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,0,7", FilesPerLevel()); + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); +} + +TEST_P(TieredCompactionTest, SequenceBasedTieredStorageLevel) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + const int kLastLevel = kNumLevels - 1; + + auto options = CurrentOptions(); + SetColdTemperature(options); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.num_levels = kNumLevels; + options.statistics = CreateDBStatistics(); + options.max_subcompactions = 10; + DestroyAndReopen(options); + + std::atomic_uint64_t latest_cold_seq = 0; + std::vector<SequenceNumber> seq_history; + + SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator::PrepareOutput.context", [&](void* arg) { + auto context = static_cast<PerKeyPlacementContext*>(arg); + context->output_to_penultimate_level = + context->seq_num > latest_cold_seq; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + std::vector<InternalStats::CompactionStats> expect_stats(kNumLevels); + InternalStats::CompactionStats& last_stats = expect_stats[kLastLevel]; + InternalStats::CompactionStats expect_pl_stats; + + for (int i = 0; i < kNumTrigger; i++) { + for (int j = 0; j < kNumKeys; j++) { + ASSERT_OK(Put(Key(i * 10 + j), "value" + std::to_string(i))); + } + ASSERT_OK(Flush()); + expect_stats[0].Add(kBasicFlushStats); + } + ASSERT_OK(dbfull()->WaitForCompact(true)); + + // non 
last level is hot + ASSERT_EQ("0,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + + expect_stats[1].Add(kBasicCompStats); + expect_stats[1].Add(kBasicPerLevelStats); + expect_stats[1].ResetCompactionReason(CompactionReason::kLevelL0FilesNum); + VerifyCompactionStats(expect_stats, expect_pl_stats); + + // move all data to the last level + MoveFilesToLevel(kLastLevel); + + ResetAllStats(expect_stats, expect_pl_stats); + + // The compaction won't move the data up + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + last_stats.Add(kBasicCompStats); + last_stats.Add(kBasicPerLevelStats); + last_stats.num_dropped_records = 0; + last_stats.bytes_read_non_output_levels = 0; + last_stats.num_input_files_in_non_output_levels = 0; + last_stats.bytes_read_output_level = kHasValue; + last_stats.num_input_files_in_output_level = kHasValue; + last_stats.ResetCompactionReason(CompactionReason::kManualCompaction); + VerifyCompactionStats(expect_stats, expect_pl_stats); + + // Add new data, which is all hot and overriding all existing data + for (int i = 0; i < kNumTrigger; i++) { + for (int j = 0; j < kNumKeys; j++) { + ASSERT_OK(Put(Key(i * 10 + j), "value" + std::to_string(i))); + } + ASSERT_OK(Flush()); + seq_history.emplace_back(dbfull()->GetLatestSequenceNumber()); + } + ASSERT_OK(dbfull()->WaitForCompact(true)); + ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + ResetAllStats(expect_stats, expect_pl_stats); + + // after compaction, all data are hot + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + + for (int level = 2; level < kNumLevels - 1; level++) { + expect_stats[level].bytes_moved = kHasValue; + } + + last_stats.Add(kBasicCompStats); + last_stats.bytes_read_output_level = kHasValue; + last_stats.num_input_files_in_output_level = kHasValue; + last_stats.ResetCompactionReason(CompactionReason::kManualCompaction); + expect_pl_stats.Add(kBasicPerKeyPlacementCompStats); + expect_pl_stats.ResetCompactionReason(CompactionReason::kManualCompaction); + VerifyCompactionStats(expect_stats, expect_pl_stats); + + // move forward the cold_seq, try to split the data into cold and hot, but in + // this case it's unsafe to split the data + // because it's non-last-level but bottommost file, the sequence number will + // be zeroed out and lost the time information (with + // `level_compaction_dynamic_level_bytes` or Universal Compaction, it should + // be rare.) 
+ // TODO(zjay): ideally we should avoid zero out non-last-level bottommost file + latest_cold_seq = seq_history[1]; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + + seq_history.clear(); + + // manually move all data (cold) to last level + MoveFilesToLevel(kLastLevel); + seq_history.clear(); + // Add new data once again + for (int i = 0; i < kNumTrigger; i++) { + for (int j = 0; j < kNumKeys; j++) { + ASSERT_OK(Put(Key(i * 10 + j), "value" + std::to_string(i))); + } + ASSERT_OK(Flush()); + seq_history.emplace_back(dbfull()->GetLatestSequenceNumber()); + } + ASSERT_OK(dbfull()->WaitForCompact(true)); + + latest_cold_seq = seq_history[0]; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // delete all cold data + for (int i = 0; i < 10; i++) { + ASSERT_OK(Delete(Key(i))); + } + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + + latest_cold_seq = seq_history[2]; + + MoveFilesToLevel(kLastLevel); + + // move forward the cold_seq again with range delete, take a snapshot to keep + // the range dels in bottommost + auto snap = db_->GetSnapshot(); + + std::string start = Key(25), end = Key(35); + ASSERT_OK( + db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end)); + // add one small key and large key in the input level, to make sure it's able + // to move hot data to input level within that range + ASSERT_OK(Put(Key(0), "value" + std::to_string(0))); + ASSERT_OK(Put(Key(100), "value" + std::to_string(0))); + + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // verify data + std::string value; + for (int i = 1; i < 130; i++) { + if (i < 10 || (i >= 25 && i < 35)) { + ASSERT_TRUE(db_->Get(ReadOptions(), Key(i), &value).IsNotFound()); + } else { + ASSERT_OK(db_->Get(ReadOptions(), Key(i), &value)); + } + } + + // delete all hot data + ASSERT_OK(Delete(Key(0))); + start = Key(30); + end = Key(101); // range [101, 130] is cold, because it's not in input range + // in previous compaction + ASSERT_OK( + db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end)); + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // no range del is dropped because of snapshot + ASSERT_EQ( + options.statistics->getTickerCount(COMPACTION_RANGE_DEL_DROP_OBSOLETE), + 0); + + db_->ReleaseSnapshot(snap); + + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // 3 range dels dropped, the first one is double counted as expected, which is + // spread into 2 SST files + ASSERT_EQ( + options.statistics->getTickerCount(COMPACTION_RANGE_DEL_DROP_OBSOLETE), + 3); + 
+ // move backward of cold_seq, which might happen when the user change the + // setting. the hot data won't move up, just to make sure it still runs + // fine, which is because: + // 1. sequence number is zeroed out, so no time information + // 2. leveled compaction only support move data up within the higher level + // input range + latest_cold_seq = seq_history[1]; + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); +} + +TEST_P(TieredCompactionTest, RangeBasedTieredStorageLevel) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + + auto options = CurrentOptions(); + SetColdTemperature(options); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.level_compaction_dynamic_level_bytes = true; + options.num_levels = kNumLevels; + options.statistics = CreateDBStatistics(); + options.max_subcompactions = 10; + DestroyAndReopen(options); + auto cmp = options.comparator; + + port::Mutex mutex; + std::string hot_start = Key(10); + std::string hot_end = Key(50); + + SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator::PrepareOutput.context", [&](void* arg) { + auto context = static_cast<PerKeyPlacementContext*>(arg); + MutexLock l(&mutex); + context->output_to_penultimate_level = + cmp->Compare(context->key, hot_start) >= 0 && + cmp->Compare(context->key, hot_end) < 0; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + for (int i = 0; i < kNumTrigger; i++) { + for (int j = 0; j < kNumKeys; j++) { + ASSERT_OK(Put(Key(j), "value" + std::to_string(j))); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->WaitForCompact(true)); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // change to all cold + { + MutexLock l(&mutex); + hot_start = Key(100); + hot_end = Key(200); + } + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // change to all hot, but level compaction only support move cold to hot + // within it's higher level input range. 
+ { + MutexLock l(&mutex); + hot_start = Key(0); + hot_end = Key(100); + } + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // with mixed hot/cold data + { + MutexLock l(&mutex); + hot_start = Key(50); + hot_end = Key(100); + } + ASSERT_OK(Put(Key(0), "value" + std::to_string(0))); + ASSERT_OK(Put(Key(100), "value" + std::to_string(100))); + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // delete all hot data, but with snapshot to keep the range del + auto snap = db_->GetSnapshot(); + std::string start = Key(50); + std::string end = Key(100); + ASSERT_OK( + db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end)); + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // no range del is dropped because of snapshot + ASSERT_EQ( + options.statistics->getTickerCount(COMPACTION_RANGE_DEL_DROP_OBSOLETE), + 0); + + // release the snapshot and do compaction again should remove all hot data + db_->ReleaseSnapshot(snap); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + ASSERT_EQ( + options.statistics->getTickerCount(COMPACTION_RANGE_DEL_DROP_OBSOLETE), + 1); +} + +INSTANTIATE_TEST_CASE_P(TieredCompactionTest, TieredCompactionTest, + testing::Bool()); + +class PrecludeLastLevelTest : public DBTestBase { + public: + PrecludeLastLevelTest() + : DBTestBase("preclude_last_level_test", /*env_do_fsync=*/false) { + mock_clock_ = std::make_shared<MockSystemClock>(env_->GetSystemClock()); + mock_env_ = std::make_unique<CompositeEnvWrapper>(env_, mock_clock_); + } + + protected: + std::unique_ptr<Env> mock_env_; + std::shared_ptr<MockSystemClock> mock_clock_; + + void SetUp() override { + mock_clock_->InstallTimedWaitFixCallback(); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::StartPeriodicTaskScheduler:Init", [&](void* arg) { + auto periodic_task_scheduler_ptr = + reinterpret_cast<PeriodicTaskScheduler*>(arg); + periodic_task_scheduler_ptr->TEST_OverrideTimer(mock_clock_.get()); + }); + mock_clock_->SetCurrentTime(0); + } +}; + +TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimeManualCompaction) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + const int kKeyPerSec = 10; + + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.preserve_internal_time_seconds = 10000; + options.env = mock_env_.get(); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.num_levels = kNumLevels; + DestroyAndReopen(options); + + // pass some time first, otherwise the first a few keys write time are going + // to be zero, and internally zero has special meaning: kUnknownSeqnoTime + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); }); + + int sst_num = 0; + // Write files that are overlap and enough to trigger compaction + for (; sst_num < 
kNumTrigger; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun([&] { + mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); + }); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->WaitForCompact(true)); + + // all data is pushed to the last level + ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); + + // enable preclude feature + options.preclude_last_level_data_seconds = 10000; + options.last_level_temperature = Temperature::kCold; + Reopen(options); + + // all data is hot, even they're in the last level + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + + // Generate a sstable and trigger manual compaction + ASSERT_OK(Put(Key(10), "value")); + ASSERT_OK(Flush()); + + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + // all data is moved up to the penultimate level + ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel()); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + + // close explicitly, because the env is local variable which will be released + // first. + Close(); +} + +TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimeAutoCompaction) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + const int kKeyPerSec = 10; + + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.preserve_internal_time_seconds = 10000; + options.env = mock_env_.get(); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.num_levels = kNumLevels; + DestroyAndReopen(options); + + // pass some time first, otherwise the first a few keys write time are going + // to be zero, and internally zero has special meaning: kUnknownSeqnoTime + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); }); + + int sst_num = 0; + // Write files that are overlap and enough to trigger compaction + for (; sst_num < kNumTrigger; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun([&] { + mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); + }); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->WaitForCompact(true)); + + // all data is pushed to the last level + ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); + + // enable preclude feature + options.preclude_last_level_data_seconds = 10000; + options.last_level_temperature = Temperature::kCold; + // make sure it won't trigger Size Amp compaction, unlike normal Size Amp + // compaction which is typically a last level compaction, when tiered Storage + // ("preclude_last_level") is enabled, size amp won't include the last level. + // As the last level would be in cold tier and the size would not be a + // problem, which also avoid frequent hot to cold storage compaction. 
+ options.compaction_options_universal.max_size_amplification_percent = 400; + Reopen(options); + + // all data is hot, even they're in the last level + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + + // Write more data, but still all hot until the 10th SST, as: + // write a key every 10 seconds, 100 keys per SST, each SST takes 1000 seconds + // The preclude_last_level_data_seconds is 10k + Random rnd(301); + for (; sst_num < kNumTrigger * 2 - 1; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + // the value needs to be big enough to trigger full compaction + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), rnd.RandomString(100))); + dbfull()->TEST_WaitForPeridicTaskRun([&] { + mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); + }); + } + ASSERT_OK(Flush()); + ASSERT_OK(dbfull()->WaitForCompact(true)); + } + + // all data is moved up to the penultimate level + ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel()); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + + // close explicitly, because the env is local variable which will be released + // first. + Close(); +} + +TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimePartial) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + const int kKeyPerSec = 10; + + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.preserve_internal_time_seconds = 2000; + options.env = mock_env_.get(); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.num_levels = kNumLevels; + DestroyAndReopen(options); + + // pass some time first, otherwise the first a few keys write time are going + // to be zero, and internally zero has special meaning: kUnknownSeqnoTime + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); }); + + int sst_num = 0; + // Write files that are overlap and enough to trigger compaction + for (; sst_num < kNumTrigger; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun([&] { + mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); + }); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->WaitForCompact(true)); + + // all data is pushed to the last level + ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); + + std::vector<KeyVersion> key_versions; + ASSERT_OK(GetAllKeyVersions(db_, Slice(), Slice(), + std::numeric_limits<size_t>::max(), + &key_versions)); + + // make sure there're more than 300 keys and first 100 keys are having seqno + // zeroed out, the last 100 key seqno not zeroed out + ASSERT_GT(key_versions.size(), 300); + for (int i = 0; i < 100; i++) { + ASSERT_EQ(key_versions[i].sequence, 0); + } + auto rit = key_versions.rbegin(); + for (int i = 0; i < 100; i++) { + ASSERT_GT(rit->sequence, 0); + rit++; + } + + // enable preclude feature + options.preclude_last_level_data_seconds = 2000; + options.last_level_temperature = Temperature::kCold; + Reopen(options); + + // Generate a sstable and trigger manual compaction + ASSERT_OK(Put(Key(10), "value")); + ASSERT_OK(Flush()); + + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + // some data are moved up, some are not + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + 
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + + Close(); +} + +TEST_F(PrecludeLastLevelTest, SmallPrecludeTime) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.preclude_last_level_data_seconds = 60; + options.preserve_internal_time_seconds = 0; + options.env = mock_env_.get(); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.num_levels = kNumLevels; + options.last_level_temperature = Temperature::kCold; + DestroyAndReopen(options); + + Random rnd(301); + + dbfull()->TEST_WaitForPeridicTaskRun([&] { + mock_clock_->MockSleepForSeconds(static_cast<int>(rnd.Uniform(10) + 1)); + }); + + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(i), rnd.RandomString(100))); + dbfull()->TEST_WaitForPeridicTaskRun([&] { + mock_clock_->MockSleepForSeconds(static_cast<int>(rnd.Uniform(2))); + }); + } + ASSERT_OK(Flush()); + + TablePropertiesCollection tables_props; + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_EQ(tables_props.size(), 1); + ASSERT_FALSE(tables_props.begin()->second->seqno_to_time_mapping.empty()); + SeqnoToTimeMapping tp_mapping; + ASSERT_OK( + tp_mapping.Add(tables_props.begin()->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + ASSERT_FALSE(tp_mapping.Empty()); + auto seqs = tp_mapping.TEST_GetInternalMapping(); + ASSERT_FALSE(seqs.empty()); + + // Wait more than preclude_last_level time, then make sure all the data is + // compacted to the last level even there's no write (no seqno -> time + // information was flushed to any SST). + mock_clock_->MockSleepForSeconds(100); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + Close(); +} + +TEST_F(PrecludeLastLevelTest, LastLevelOnlyCompactionPartial) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + const int kKeyPerSec = 10; + + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.preserve_internal_time_seconds = 2000; + options.env = mock_env_.get(); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.num_levels = kNumLevels; + DestroyAndReopen(options); + + // pass some time first, otherwise the first a few keys write time are going + // to be zero, and internally zero has special meaning: kUnknownSeqnoTime + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); }); + + int sst_num = 0; + // Write files that are overlap and enough to trigger compaction + for (; sst_num < kNumTrigger; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun([&] { + mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); + }); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->WaitForCompact(true)); + + // all data is pushed to the last level + ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); + + // enable preclude feature + options.preclude_last_level_data_seconds = 2000; + options.last_level_temperature = Temperature::kCold; + Reopen(options); + + CompactRangeOptions cro; + cro.bottommost_level_compaction = 
BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + // some data are moved up, some are not + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + + std::vector<KeyVersion> key_versions; + ASSERT_OK(GetAllKeyVersions(db_, Slice(), Slice(), + std::numeric_limits<size_t>::max(), + &key_versions)); + + // make sure there're more than 300 keys and first 100 keys are having seqno + // zeroed out, the last 100 key seqno not zeroed out + ASSERT_GT(key_versions.size(), 300); + for (int i = 0; i < 100; i++) { + ASSERT_EQ(key_versions[i].sequence, 0); + } + auto rit = key_versions.rbegin(); + for (int i = 0; i < 100; i++) { + ASSERT_GT(rit->sequence, 0); + rit++; + } + + Close(); +} + +class PrecludeLastLevelTestWithParms + : public PrecludeLastLevelTest, + public testing::WithParamInterface<bool> { + public: + PrecludeLastLevelTestWithParms() : PrecludeLastLevelTest() {} +}; + +TEST_P(PrecludeLastLevelTestWithParms, LastLevelOnlyCompactionNoPreclude) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + const int kKeyPerSec = 10; + + bool enable_preclude_last_level = GetParam(); + + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.preserve_internal_time_seconds = 2000; + options.env = mock_env_.get(); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.num_levels = kNumLevels; + DestroyAndReopen(options); + + // pass some time first, otherwise the first a few keys write time are going + // to be zero, and internally zero has special meaning: kUnknownSeqnoTime + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); }); + + Random rnd(301); + int sst_num = 0; + // Write files that are overlap and enough to trigger compaction + for (; sst_num < kNumTrigger; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), rnd.RandomString(100))); + dbfull()->TEST_WaitForPeridicTaskRun([&] { + mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); + }); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->WaitForCompact(true)); + + // all data is pushed to the last level + ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); + + std::atomic_bool is_manual_compaction_running = false; + std::atomic_bool verified_compaction_order = false; + + // Make sure the manual compaction is in progress and try to trigger a + // SizeRatio compaction by flushing 4 files to L0. The compaction will try to + // compact 4 files at L0 to L5 (the last empty level). + // If the preclude_last_feature is enabled, the auto triggered compaction + // cannot be picked. Otherwise, the auto triggered compaction can run in + // parallel with the last level compaction. + // L0: [a] [b] [c] [d] + // L5: (locked if preclude_last_level is enabled) + // L6: [z] (locked: manual compaction in progress) + // TODO: in this case, L0 files should just be compacted to L4, so the 2 + // compactions won't be overlapped. 
+  SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::ProcessKeyValueCompaction()::Processing", [&](void* arg) {
+        auto compaction = static_cast<Compaction*>(arg);
+        if (compaction->is_manual_compaction()) {
+          is_manual_compaction_running = true;
+          TEST_SYNC_POINT(
+              "PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
+              "ManualCompaction1");
+          TEST_SYNC_POINT(
+              "PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
+              "ManualCompaction2");
+          is_manual_compaction_running = false;
+        }
+      });
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
+        auto compaction = static_cast<Compaction*>(arg);
+        if (enable_preclude_last_level && is_manual_compaction_running) {
+          ASSERT_TRUE(compaction == nullptr);
+          verified_compaction_order = true;
+        } else {
+          ASSERT_TRUE(compaction != nullptr);
+          verified_compaction_order = true;
+        }
+        if (!compaction || !compaction->is_manual_compaction()) {
+          TEST_SYNC_POINT(
+              "PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
+              "AutoCompactionPicked");
+        }
+      });
+
+  SyncPoint::GetInstance()->LoadDependency({
+      {"PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
+       "ManualCompaction1",
+       "PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:StartWrite"},
+      {"PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
+       "AutoCompactionPicked",
+       "PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
+       "ManualCompaction2"},
+  });
+
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  // only enable the preclude feature if the parameter is true
+  if (enable_preclude_last_level) {
+    options.preclude_last_level_data_seconds = 2000;
+  }
+  options.max_background_jobs = 8;
+  options.last_level_temperature = Temperature::kCold;
+  Reopen(options);
+
+  auto manual_compaction_thread = port::Thread([this]() {
+    CompactRangeOptions cro;
+    cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+    cro.exclusive_manual_compaction = false;
+    ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+  });
+
+  TEST_SYNC_POINT(
+      "PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:StartWrite");
+  auto stop_token =
+      dbfull()->TEST_write_controler().GetCompactionPressureToken();
+
+  for (; sst_num < kNumTrigger * 2; sst_num++) {
+    for (int i = 0; i < kNumKeys; i++) {
+      // the value needs to be big enough to trigger full compaction
+      ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
+      dbfull()->TEST_WaitForPeridicTaskRun([&] {
+        mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
+      });
+    }
+    ASSERT_OK(Flush());
+  }
+
+  manual_compaction_thread.join();
+
+  ASSERT_OK(dbfull()->WaitForCompact(true));
+
+  if (enable_preclude_last_level) {
+    ASSERT_NE("0,0,0,0,0,1,1", FilesPerLevel());
+  } else {
+    ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
+  }
+  ASSERT_TRUE(verified_compaction_order);
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  stop_token.reset();
+
+  Close();
+}
+
+INSTANTIATE_TEST_CASE_P(PrecludeLastLevelTestWithParms,
+                        PrecludeLastLevelTestWithParms, testing::Bool());
+
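The parameterized test above coordinates a manual CompactRange with the automatically picked universal compaction purely through RocksDB's internal SyncPoint facility (SetCallBack, LoadDependency, TEST_SYNC_POINT). A minimal sketch of that pattern follows, assuming a debug build where sync points are compiled in; the "MyTest::PointA"/"MyTest::PointB" names are made-up markers for illustration, not real instrumentation points.

#include "test_util/sync_point.h"

namespace ROCKSDB_NAMESPACE {

void SyncPointCoordinationSketch() {
  // Observe every compaction the universal picker returns; arg may be null
  // when no compaction could be picked.
  SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionBuilder::PickCompaction:Return",
      [](void* arg) { (void)arg; });
  // Any thread reaching "MyTest::PointB" waits until some thread has already
  // passed "MyTest::PointA" (both names are placeholders in this sketch).
  SyncPoint::GetInstance()->LoadDependency(
      {{"MyTest::PointA", "MyTest::PointB"}});
  SyncPoint::GetInstance()->EnableProcessing();

  // ... run the workload under test here ...

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}

}  // namespace ROCKSDB_NAMESPACE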
+// partition the SST into 3 ranges: [0, 19] [20, 39] [40, ...]
+class ThreeRangesPartitioner : public SstPartitioner {
+ public:
+  const char* Name() const override { return "SingleKeySstPartitioner"; }
+
+  PartitionerResult ShouldPartition(
+      const PartitionerRequest& request) override {
+    if ((cmp->CompareWithoutTimestamp(*request.current_user_key,
+                                      DBTestBase::Key(20)) >= 0 &&
+         cmp->CompareWithoutTimestamp(*request.prev_user_key,
+                                      DBTestBase::Key(20)) < 0) ||
+        (cmp->CompareWithoutTimestamp(*request.current_user_key,
+                                      DBTestBase::Key(40)) >= 0 &&
+         cmp->CompareWithoutTimestamp(*request.prev_user_key,
+                                      DBTestBase::Key(40)) < 0)) {
+      return kRequired;
+    } else {
+      return kNotRequired;
+    }
+  }
+
+  bool CanDoTrivialMove(const Slice& /*smallest_user_key*/,
+                        const Slice& /*largest_user_key*/) override {
+    return false;
+  }
+
+  const Comparator* cmp = BytewiseComparator();
+};
+
+class ThreeRangesPartitionerFactory : public SstPartitionerFactory {
+ public:
+  static const char* kClassName() {
+    return "TombstoneTestSstPartitionerFactory";
+  }
+  const char* Name() const override { return kClassName(); }
+
+  std::unique_ptr<SstPartitioner> CreatePartitioner(
+      const SstPartitioner::Context& /* context */) const override {
+    return std::unique_ptr<SstPartitioner>(new ThreeRangesPartitioner());
+  }
+};
+
+TEST_F(PrecludeLastLevelTest, PartialPenultimateLevelCompaction) {
+  const int kNumTrigger = 4;
+  const int kNumLevels = 7;
+  const int kKeyPerSec = 10;
+
+  Options options = CurrentOptions();
+  options.compaction_style = kCompactionStyleUniversal;
+  options.env = mock_env_.get();
+  options.level0_file_num_compaction_trigger = kNumTrigger;
+  options.preserve_internal_time_seconds = 10000;
+  options.num_levels = kNumLevels;
+  DestroyAndReopen(options);
+
+  // Pass some time first, otherwise the write times of the first few keys are
+  // going to be zero, and internally zero has a special meaning:
+  // kUnknownSeqnoTime
+  dbfull()->TEST_WaitForPeridicTaskRun(
+      [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); });
+
+  Random rnd(301);
+
+  for (int i = 0; i < 300; i++) {
+    ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
+    dbfull()->TEST_WaitForPeridicTaskRun(
+        [&] { mock_clock_->MockSleepForSeconds(kKeyPerSec); });
+  }
+  ASSERT_OK(Flush());
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+
+  // make sure all data is compacted to the last level
+  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
+
+  // Create 3 L5 files
+  auto factory = std::make_shared<ThreeRangesPartitionerFactory>();
+  options.sst_partitioner_factory = factory;
+
+  Reopen(options);
+
+  for (int i = 0; i < kNumTrigger - 1; i++) {
+    for (int j = 0; j < 100; j++) {
+      ASSERT_OK(Put(Key(i * 100 + j), rnd.RandomString(10)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  ASSERT_OK(dbfull()->WaitForCompact(true));
+
+  // L5: [0,19] [20,39] [40,299]
+  // L6: [0, 299]
+  ASSERT_EQ("0,0,0,0,0,3,1", FilesPerLevel());
+
+  // enable tiered storage feature
+  options.preclude_last_level_data_seconds = 10000;
+  options.last_level_temperature = Temperature::kCold;
+  options.statistics = CreateDBStatistics();
+  Reopen(options);
+
+  ColumnFamilyMetaData meta;
+  db_->GetColumnFamilyMetaData(&meta);
+  ASSERT_EQ(meta.levels[5].files.size(), 3);
+  ASSERT_EQ(meta.levels[6].files.size(), 1);
+  ASSERT_EQ(meta.levels[6].files[0].smallestkey, Key(0));
+  ASSERT_EQ(meta.levels[6].files[0].largestkey, Key(299));
+
+  std::string file_path = meta.levels[5].files[1].db_path;
+  std::vector<std::string> files;
+  // pick the 3rd file @L5 + the file @L6 for compaction
+  files.push_back(file_path + "/" + meta.levels[5].files[2].name);
+  files.push_back(file_path + "/" + meta.levels[6].files[0].name);
+  ASSERT_OK(db_->CompactFiles(CompactionOptions(), files, 6));
+
+  // The compaction only moved part of the hot data to the hot tier; the range
+  // [0,39] is unsafe to move up, otherwise it would overlap with the existing
+  // files @L5.
+  // The output should be:
+  // L5: [0,19] [20,39] [40,299] <-- Temperature::kUnknown
+  // L6: [0,19] [20,39]          <-- Temperature::kCold
+  // The L6 file is split because of the customized partitioner
+  ASSERT_EQ("0,0,0,0,0,3,2", FilesPerLevel());
+
+  // Even though all the data is hot, not all of it is moved to the hot tier
+  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
+  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
+
+  db_->GetColumnFamilyMetaData(&meta);
+  ASSERT_EQ(meta.levels[5].files.size(), 3);
+  ASSERT_EQ(meta.levels[6].files.size(), 2);
+  for (const auto& file : meta.levels[5].files) {
+    ASSERT_EQ(file.temperature, Temperature::kUnknown);
+  }
+  for (const auto& file : meta.levels[6].files) {
+    ASSERT_EQ(file.temperature, Temperature::kCold);
+  }
+  ASSERT_EQ(meta.levels[6].files[0].smallestkey, Key(0));
+  ASSERT_EQ(meta.levels[6].files[0].largestkey, Key(19));
+  ASSERT_EQ(meta.levels[6].files[1].smallestkey, Key(20));
+  ASSERT_EQ(meta.levels[6].files[1].largestkey, Key(39));
+
+  Close();
+}
+
+struct TestPropertiesCollector : public TablePropertiesCollector {
+  Status AddUserKey(const Slice& key, const Slice& /*value*/,
+                    EntryType /*type*/, SequenceNumber /*seq*/,
+                    uint64_t /*file_size*/) override {
+    if (cmp->Compare(key, DBTestBase::Key(100)) == 0) {
+      has_key_100 = true;
+    }
+    if (cmp->Compare(key, DBTestBase::Key(200)) == 0) {
+      has_key_200 = true;
+    }
+
+    return Status::OK();
+  }
+
+  const char* Name() const override { return "TestTablePropertiesCollector"; }
+
+  UserCollectedProperties GetReadableProperties() const override {
+    UserCollectedProperties ret;
+    return ret;
+  }
+
+  Status Finish(UserCollectedProperties* /*properties*/) override {
+    // The LSM tree would be like:
+    // L5: [0,19] [20,39] [40,299]
+    // L6: [0, 299]
+    // The 3rd file @L5 has both key 100 and key 200, so it will be marked for
+    // compaction.
+    // Also avoid marking a freshly flushed SST for compaction: it won't have
+    // both 100 and 200.
+    if (has_key_100 && has_key_200) {
+      need_compact_ = true;
+    } else {
+      need_compact_ = false;
+    }
+    has_key_100 = false;
+    has_key_200 = false;
+    return Status::OK();
+  }
+
+  bool NeedCompact() const override { return need_compact_; }
+
+  const Comparator* cmp = BytewiseComparator();
+
+ private:
+  bool has_key_100 = false;
+  bool has_key_200 = false;
+
+  bool need_compact_ = false;
+};
+
+class TestPropertiesCollectorFactory : public TablePropertiesCollectorFactory {
+ public:
+  TablePropertiesCollector* CreateTablePropertiesCollector(
+      TablePropertiesCollectorFactory::Context /*context*/) override {
+    return new TestPropertiesCollector;
+  }
+  const char* Name() const override { return "TestTablePropertiesCollector"; }
+};
+
+TEST_F(PrecludeLastLevelTest, PartialPenultimateLevelCompactionWithRangeDel) {
+  const int kNumTrigger = 4;
+  const int kNumLevels = 7;
+  const int kKeyPerSec = 10;
+
+  Options options = CurrentOptions();
+  options.compaction_style = kCompactionStyleUniversal;
+  options.env = mock_env_.get();
+  options.level0_file_num_compaction_trigger = kNumTrigger;
+  options.preserve_internal_time_seconds = 10000;
+  options.num_levels = kNumLevels;
+  // set a small max_compaction_bytes to avoid input level expansion
+  options.max_compaction_bytes = 30000;
+  options.ignore_max_compaction_bytes_for_input = false;
+  DestroyAndReopen(options);
+
+  // Pass some time first, otherwise the write times of the first few keys are
+  // going to be zero, and internally zero has a special meaning:
+  // kUnknownSeqnoTime
+  dbfull()->TEST_WaitForPeridicTaskRun(
+      [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); });
+
+  Random rnd(301);
+
+  for (int i = 0; i < 300; i++) {
+    ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
+    dbfull()->TEST_WaitForPeridicTaskRun(
+        [&] { mock_clock_->MockSleepForSeconds(kKeyPerSec); });
+  }
+  ASSERT_OK(Flush());
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+
+  // make sure all data is compacted to the last level
+  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
+
+  // Create 3 L5 files
+  auto factory = std::make_shared<ThreeRangesPartitionerFactory>();
+  options.sst_partitioner_factory = factory;
+
+  // the user-defined properties collector will mark the 3rd file for
+  // compaction
+  auto collector_factory = std::make_shared<TestPropertiesCollectorFactory>();
+  options.table_properties_collector_factories.resize(1);
+  options.table_properties_collector_factories[0] = collector_factory;
+  // enable tiered storage feature
+  options.preclude_last_level_data_seconds = 10000;
+  options.last_level_temperature = Temperature::kCold;
+  Reopen(options);
+
+  for (int i = 0; i < kNumTrigger - 2; i++) {
+    for (int j = 0; j < 100; j++) {
+      ASSERT_OK(Put(Key(i * 100 + j), rnd.RandomString(10)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  // Make sure there is one and only one compaction that supports per-key
+  // placement but has the penultimate level output disabled.
+  std::atomic_int per_key_comp_num = 0;
+  SyncPoint::GetInstance()->SetCallBack(
+      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
+        auto compaction = static_cast<Compaction*>(arg);
+        if (compaction->SupportsPerKeyPlacement()) {
+          ASSERT_EQ(compaction->GetPenultimateOutputRangeType(),
+                    Compaction::PenultimateOutputRangeType::kDisabled);
+          per_key_comp_num++;
+        }
+      });
+
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  for (int j = 0; j < 100; j++) {
+    ASSERT_OK(Put(Key(200 + j), rnd.RandomString(10)));
+  }
+  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                             Key(32), Key(40)));
+  ASSERT_OK(Flush());
+
+  // Before the per-key placement compaction, the LSM tree should be like:
+  // L5: [0,19] [20,40] [40,299]
+  // L6: [0, 299]
+  // The 2nd file @L5 has the largest key 40 because of the range deletion
+
+  ASSERT_OK(dbfull()->WaitForCompact(true));
+
+  ASSERT_EQ(per_key_comp_num, 1);
+
+  // the compaction won't move any data to the penultimate level
+  ASSERT_EQ("0,0,0,0,0,2,3", FilesPerLevel());
+
+  Close();
+}
+
+#endif  // !defined(ROCKSDB_LITE)
+
+}  // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+#if !defined(ROCKSDB_LITE)
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+#else
+  (void)argc;
+  (void)argv;
+  return 0;
+#endif
+}
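The tests in this file exercise user-facing tiered-storage options rather than any test-only API, so a minimal application-side configuration mirroring what they enable would look roughly like the sketch below. The database path and the option values are illustrative assumptions, not recommendations.

#include <cstdio>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Universal compaction with a multi-level LSM, as in the tests above.
  options.compaction_style = rocksdb::kCompactionStyleUniversal;
  options.num_levels = 7;
  // Record seqno -> time information so the age of data can be estimated.
  options.preserve_internal_time_seconds = 10000;
  // Keep data written within the last 10000 seconds out of the last level.
  options.preclude_last_level_data_seconds = 10000;
  // Data that does reach the last level is written with the "cold" hint.
  options.last_level_temperature = rocksdb::Temperature::kCold;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/tiered_storage_example", &db);
  if (!s.ok()) {
    std::fprintf(stderr, "open failed: %s\n", s.ToString().c_str());
    return 1;
  }
  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  delete db;
  return s.ok() ? 0 : 1;
}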