diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/seqno_time_test.cc | |
parent | Initial commit. (diff) | |
download | ceph-upstream/18.2.2.tar.xz ceph-upstream/18.2.2.zip |
Adding upstream version 18.2.2 (tag: upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/db/seqno_time_test.cc')
-rw-r--r-- | src/rocksdb/db/seqno_time_test.cc | 996 |
1 file changed, 996 insertions(+), 0 deletions(-)
diff --git a/src/rocksdb/db/seqno_time_test.cc b/src/rocksdb/db/seqno_time_test.cc new file mode 100644 index 000000000..12394a368 --- /dev/null +++ b/src/rocksdb/db/seqno_time_test.cc @@ -0,0 +1,996 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_test_util.h" +#include "db/periodic_task_scheduler.h" +#include "db/seqno_to_time_mapping.h" +#include "port/stack_trace.h" +#include "rocksdb/iostats_context.h" +#include "rocksdb/utilities/debug.h" +#include "test_util/mock_time_env.h" + +#ifndef ROCKSDB_LITE + +namespace ROCKSDB_NAMESPACE { + +class SeqnoTimeTest : public DBTestBase { + public: + SeqnoTimeTest() : DBTestBase("seqno_time_test", /*env_do_fsync=*/false) { + mock_clock_ = std::make_shared<MockSystemClock>(env_->GetSystemClock()); + mock_env_ = std::make_unique<CompositeEnvWrapper>(env_, mock_clock_); + } + + protected: + std::unique_ptr<Env> mock_env_; + std::shared_ptr<MockSystemClock> mock_clock_; + + void SetUp() override { + mock_clock_->InstallTimedWaitFixCallback(); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::StartPeriodicTaskScheduler:Init", [&](void* arg) { + auto periodic_task_scheduler_ptr = + reinterpret_cast<PeriodicTaskScheduler*>(arg); + periodic_task_scheduler_ptr->TEST_OverrideTimer(mock_clock_.get()); + }); + } + + // make sure the file is not in cache, otherwise it won't have IO info + void AssertKeyTemperature(int key_id, Temperature expected_temperature) { + get_iostats_context()->Reset(); + IOStatsContext* iostats = get_iostats_context(); + std::string result = Get(Key(key_id)); + ASSERT_FALSE(result.empty()); + ASSERT_GT(iostats->bytes_read, 0); + switch (expected_temperature) { + case Temperature::kUnknown: + ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_read_count, + 0); + 
ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_bytes_read, + 0); + break; + case Temperature::kCold: + ASSERT_GT(iostats->file_io_stats_by_temperature.cold_file_read_count, + 0); + ASSERT_GT(iostats->file_io_stats_by_temperature.cold_file_bytes_read, + 0); + break; + default: + // the test only support kCold now for the bottommost temperature + FAIL(); + } + } +}; + +TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + const int kKeyPerSec = 10; + + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.preclude_last_level_data_seconds = 10000; + options.env = mock_env_.get(); + options.bottommost_temperature = Temperature::kCold; + options.num_levels = kNumLevels; + DestroyAndReopen(options); + + // pass some time first, otherwise the first a few keys write time are going + // to be zero, and internally zero has special meaning: kUnknownSeqnoTime + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); }); + + int sst_num = 0; + // Write files that are overlap and enough to trigger compaction + for (; sst_num < kNumTrigger; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun([&] { + mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); + }); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->WaitForCompact(true)); + + // All data is hot, only output to penultimate level + ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + + // read a random key, which should be hot (kUnknown) + AssertKeyTemperature(20, Temperature::kUnknown); + + // Write more data, but still all hot until the 10th SST, as: + // write a key every 10 seconds, 100 keys per SST, each SST takes 
1000 seconds + // The preclude_last_level_data_seconds is 10k + for (; sst_num < kNumTrigger * 2; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun([&] { + mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); + }); + } + ASSERT_OK(Flush()); + ASSERT_OK(dbfull()->WaitForCompact(true)); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + } + + // Now we have both hot data and cold data + for (; sst_num < kNumTrigger * 3; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun([&] { + mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); + }); + } + ASSERT_OK(Flush()); + ASSERT_OK(dbfull()->WaitForCompact(true)); + } + + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + uint64_t hot_data_size = GetSstSizeHelper(Temperature::kUnknown); + uint64_t cold_data_size = GetSstSizeHelper(Temperature::kCold); + ASSERT_GT(hot_data_size, 0); + ASSERT_GT(cold_data_size, 0); + // the first a few key should be cold + AssertKeyTemperature(20, Temperature::kCold); + + for (int i = 0; i < 30; i++) { + dbfull()->TEST_WaitForPeridicTaskRun([&] { + mock_clock_->MockSleepForSeconds(static_cast<int>(20 * kKeyPerSec)); + }); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + // the hot/cold data cut off range should be between i * 20 + 200 -> 250 + AssertKeyTemperature(i * 20 + 250, Temperature::kUnknown); + AssertKeyTemperature(i * 20 + 200, Temperature::kCold); + } + + ASSERT_LT(GetSstSizeHelper(Temperature::kUnknown), hot_data_size); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), cold_data_size); + + // Wait again, the most of the data should be cold after that + // but it may not be all cold, 
because if there's no new data write to SST, + // the compaction will not get the new seqno->time sampling to decide the last + // a few data's time. + for (int i = 0; i < 5; i++) { + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(1000)); }); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + } + + // any random data close to the end should be cold + AssertKeyTemperature(1000, Temperature::kCold); + + // close explicitly, because the env is local variable which will be released + // first. + Close(); +} + +TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { + const int kNumLevels = 7; + const int kNumKeys = 100; + + Options options = CurrentOptions(); + options.preclude_last_level_data_seconds = 10000; + options.env = mock_env_.get(); + options.bottommost_temperature = Temperature::kCold; + options.num_levels = kNumLevels; + options.level_compaction_dynamic_level_bytes = true; + // TODO(zjay): for level compaction, auto-compaction may stuck in deadloop, if + // the penultimate level score > 1, but the hot is not cold enough to compact + // to last level, which will keep triggering compaction. 
+ options.disable_auto_compactions = true; + DestroyAndReopen(options); + + // pass some time first, otherwise the first a few keys write time are going + // to be zero, and internally zero has special meaning: kUnknownSeqnoTime + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); }); + + int sst_num = 0; + // Write files that are overlap + for (; sst_num < 4; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); }); + } + ASSERT_OK(Flush()); + } + + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + // All data is hot, only output to penultimate level + ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + + // read a random key, which should be hot (kUnknown) + AssertKeyTemperature(20, Temperature::kUnknown); + + // Adding more data to have mixed hot and cold data + for (; sst_num < 14; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); }); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + + // Compact the files to the last level which should split the hot/cold data + MoveFilesToLevel(6); + uint64_t hot_data_size = GetSstSizeHelper(Temperature::kUnknown); + uint64_t cold_data_size = GetSstSizeHelper(Temperature::kCold); + ASSERT_GT(hot_data_size, 0); + ASSERT_GT(cold_data_size, 0); + // the first a few key should be cold + 
AssertKeyTemperature(20, Temperature::kCold); + + // Wait some time, with each wait, the cold data is increasing and hot data is + // decreasing + for (int i = 0; i < 30; i++) { + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(200)); }); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + uint64_t pre_hot = hot_data_size; + uint64_t pre_cold = cold_data_size; + hot_data_size = GetSstSizeHelper(Temperature::kUnknown); + cold_data_size = GetSstSizeHelper(Temperature::kCold); + ASSERT_LT(hot_data_size, pre_hot); + ASSERT_GT(cold_data_size, pre_cold); + + // the hot/cold cut_off key should be around i * 20 + 400 -> 450 + AssertKeyTemperature(i * 20 + 450, Temperature::kUnknown); + AssertKeyTemperature(i * 20 + 400, Temperature::kCold); + } + + // Wait again, the most of the data should be cold after that + // hot data might not be empty, because if we don't write new data, there's + // no seqno->time sampling available to the compaction + for (int i = 0; i < 5; i++) { + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(1000)); }); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + } + + // any random data close to the end should be cold + AssertKeyTemperature(1000, Temperature::kCold); + + Close(); +} + +enum class SeqnoTimeTestType : char { + kTrackInternalTimeSeconds = 0, + kPrecludeLastLevel = 1, + kBothSetTrackSmaller = 2, +}; + +class SeqnoTimeTablePropTest + : public SeqnoTimeTest, + public ::testing::WithParamInterface<SeqnoTimeTestType> { + public: + SeqnoTimeTablePropTest() : SeqnoTimeTest() {} + + void SetTrackTimeDurationOptions(uint64_t track_time_duration, + Options& options) const { + // either option set will enable the time tracking feature + switch (GetParam()) { + case SeqnoTimeTestType::kTrackInternalTimeSeconds: + options.preclude_last_level_data_seconds = 0; + options.preserve_internal_time_seconds = track_time_duration; + break; + case 
SeqnoTimeTestType::kPrecludeLastLevel: + options.preclude_last_level_data_seconds = track_time_duration; + options.preserve_internal_time_seconds = 0; + break; + case SeqnoTimeTestType::kBothSetTrackSmaller: + options.preclude_last_level_data_seconds = track_time_duration; + options.preserve_internal_time_seconds = track_time_duration / 10; + break; + } + } +}; + +INSTANTIATE_TEST_CASE_P( + SeqnoTimeTablePropTest, SeqnoTimeTablePropTest, + ::testing::Values(SeqnoTimeTestType::kTrackInternalTimeSeconds, + SeqnoTimeTestType::kPrecludeLastLevel, + SeqnoTimeTestType::kBothSetTrackSmaller)); + +TEST_P(SeqnoTimeTablePropTest, BasicSeqnoToTimeMapping) { + Options options = CurrentOptions(); + SetTrackTimeDurationOptions(10000, options); + + options.env = mock_env_.get(); + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + std::set<uint64_t> checked_file_nums; + SequenceNumber start_seq = dbfull()->GetLatestSequenceNumber(); + // Write a key every 10 seconds + for (int i = 0; i < 200; i++) { + ASSERT_OK(Put(Key(i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); }); + } + ASSERT_OK(Flush()); + TablePropertiesCollection tables_props; + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_EQ(tables_props.size(), 1); + auto it = tables_props.begin(); + SeqnoToTimeMapping tp_mapping; + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + ASSERT_FALSE(tp_mapping.Empty()); + auto seqs = tp_mapping.TEST_GetInternalMapping(); + // about ~20 seqs->time entries, because the sample rate is 10000/100, and it + // passes 2k time. 
+ ASSERT_GE(seqs.size(), 19); + ASSERT_LE(seqs.size(), 21); + SequenceNumber seq_end = dbfull()->GetLatestSequenceNumber(); + for (auto i = start_seq; i < start_seq + 10; i++) { + ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), (i + 1) * 10); + } + start_seq += 10; + for (auto i = start_seq; i < seq_end; i++) { + // The result is within the range + ASSERT_GE(tp_mapping.GetOldestApproximateTime(i), (i - 10) * 10); + ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), (i + 10) * 10); + } + checked_file_nums.insert(it->second->orig_file_number); + start_seq = seq_end; + + // Write a key every 1 seconds + for (int i = 0; i < 200; i++) { + ASSERT_OK(Put(Key(i + 190), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(1)); }); + } + seq_end = dbfull()->GetLatestSequenceNumber(); + ASSERT_OK(Flush()); + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_EQ(tables_props.size(), 2); + it = tables_props.begin(); + while (it != tables_props.end()) { + if (!checked_file_nums.count(it->second->orig_file_number)) { + break; + } + it++; + } + ASSERT_TRUE(it != tables_props.end()); + + tp_mapping.Clear(); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + seqs = tp_mapping.TEST_GetInternalMapping(); + // There only a few time sample + ASSERT_GE(seqs.size(), 1); + ASSERT_LE(seqs.size(), 3); + for (auto i = start_seq; i < seq_end; i++) { + // The result is not very accurate, as there is more data write within small + // range of time + ASSERT_GE(tp_mapping.GetOldestApproximateTime(i), (i - start_seq) + 1000); + ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), (i - start_seq) + 3000); + } + checked_file_nums.insert(it->second->orig_file_number); + start_seq = seq_end; + + // Write a key every 200 seconds + for (int i = 0; i < 200; i++) { + ASSERT_OK(Put(Key(i + 380), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { 
mock_clock_->MockSleepForSeconds(static_cast<int>(200)); }); + } + seq_end = dbfull()->GetLatestSequenceNumber(); + ASSERT_OK(Flush()); + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_EQ(tables_props.size(), 3); + it = tables_props.begin(); + while (it != tables_props.end()) { + if (!checked_file_nums.count(it->second->orig_file_number)) { + break; + } + it++; + } + ASSERT_TRUE(it != tables_props.end()); + + tp_mapping.Clear(); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + seqs = tp_mapping.TEST_GetInternalMapping(); + // The sequence number -> time entries should be maxed + ASSERT_GE(seqs.size(), 99); + ASSERT_LE(seqs.size(), 101); + for (auto i = start_seq; i < seq_end - 99; i++) { + // likely the first 100 entries reports 0 + ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), (i - start_seq) + 3000); + } + start_seq += 101; + + for (auto i = start_seq; i < seq_end; i++) { + ASSERT_GE(tp_mapping.GetOldestApproximateTime(i), + (i - start_seq) * 200 + 22200); + ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), + (i - start_seq) * 200 + 22600); + } + checked_file_nums.insert(it->second->orig_file_number); + start_seq = seq_end; + + // Write a key every 100 seconds + for (int i = 0; i < 200; i++) { + ASSERT_OK(Put(Key(i + 570), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); }); + } + seq_end = dbfull()->GetLatestSequenceNumber(); + ASSERT_OK(Flush()); + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_EQ(tables_props.size(), 4); + it = tables_props.begin(); + while (it != tables_props.end()) { + if (!checked_file_nums.count(it->second->orig_file_number)) { + break; + } + it++; + } + ASSERT_TRUE(it != tables_props.end()); + tp_mapping.Clear(); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + seqs = 
tp_mapping.TEST_GetInternalMapping(); + ASSERT_GE(seqs.size(), 99); + ASSERT_LE(seqs.size(), 101); + + checked_file_nums.insert(it->second->orig_file_number); + + // re-enable compaction + ASSERT_OK(dbfull()->SetOptions({ + {"disable_auto_compactions", "false"}, + })); + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_GE(tables_props.size(), 1); + it = tables_props.begin(); + while (it != tables_props.end()) { + if (!checked_file_nums.count(it->second->orig_file_number)) { + break; + } + it++; + } + ASSERT_TRUE(it != tables_props.end()); + tp_mapping.Clear(); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + seqs = tp_mapping.TEST_GetInternalMapping(); + ASSERT_GE(seqs.size(), 99); + ASSERT_LE(seqs.size(), 101); + for (auto i = start_seq; i < seq_end - 99; i++) { + // likely the first 100 entries reports 0 + ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), + (i - start_seq) * 100 + 50000); + } + start_seq += 101; + + for (auto i = start_seq; i < seq_end; i++) { + ASSERT_GE(tp_mapping.GetOldestApproximateTime(i), + (i - start_seq) * 100 + 52200); + ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), + (i - start_seq) * 100 + 52400); + } + ASSERT_OK(db_->Close()); +} + +TEST_P(SeqnoTimeTablePropTest, MultiCFs) { + Options options = CurrentOptions(); + options.preclude_last_level_data_seconds = 0; + options.preserve_internal_time_seconds = 0; + options.env = mock_env_.get(); + options.stats_dump_period_sec = 0; + options.stats_persist_period_sec = 0; + ReopenWithColumnFamilies({"default"}, options); + + const PeriodicTaskScheduler& scheduler = + dbfull()->TEST_GetPeriodicTaskScheduler(); + ASSERT_FALSE(scheduler.TEST_HasTask(PeriodicTaskType::kRecordSeqnoTime)); + + // Write some data and increase the current time + for (int i = 0; i < 200; i++) { + ASSERT_OK(Put(Key(i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] 
{ mock_clock_->MockSleepForSeconds(static_cast<int>(100)); }); + } + ASSERT_OK(Flush()); + TablePropertiesCollection tables_props; + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_EQ(tables_props.size(), 1); + auto it = tables_props.begin(); + ASSERT_TRUE(it->second->seqno_to_time_mapping.empty()); + + ASSERT_TRUE(dbfull()->TEST_GetSeqnoToTimeMapping().Empty()); + + Options options_1 = options; + SetTrackTimeDurationOptions(10000, options_1); + CreateColumnFamilies({"one"}, options_1); + ASSERT_TRUE(scheduler.TEST_HasTask(PeriodicTaskType::kRecordSeqnoTime)); + + // Write some data to the default CF (without preclude_last_level feature) + for (int i = 0; i < 200; i++) { + ASSERT_OK(Put(Key(i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); }); + } + ASSERT_OK(Flush()); + + // Write some data to the CF one + for (int i = 0; i < 20; i++) { + ASSERT_OK(Put(1, Key(i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); }); + } + ASSERT_OK(Flush(1)); + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(handles_[1], &tables_props)); + ASSERT_EQ(tables_props.size(), 1); + it = tables_props.begin(); + SeqnoToTimeMapping tp_mapping; + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + ASSERT_FALSE(tp_mapping.Empty()); + auto seqs = tp_mapping.TEST_GetInternalMapping(); + ASSERT_GE(seqs.size(), 1); + ASSERT_LE(seqs.size(), 4); + + // Create one more CF with larger preclude_last_level time + Options options_2 = options; + SetTrackTimeDurationOptions(1000000, options_2); // 1m + CreateColumnFamilies({"two"}, options_2); + + // Add more data to CF "two" to fill the in memory mapping + for (int i = 0; i < 2000; i++) { + ASSERT_OK(Put(2, Key(i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); }); + 
} + seqs = dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping(); + ASSERT_GE(seqs.size(), 1000 - 1); + ASSERT_LE(seqs.size(), 1000 + 1); + + ASSERT_OK(Flush(2)); + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(handles_[2], &tables_props)); + ASSERT_EQ(tables_props.size(), 1); + it = tables_props.begin(); + tp_mapping.Clear(); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + seqs = tp_mapping.TEST_GetInternalMapping(); + // the max encoded entries is 100 + ASSERT_GE(seqs.size(), 100 - 1); + ASSERT_LE(seqs.size(), 100 + 1); + + // Write some data to default CF, as all memtable with preclude_last_level + // enabled have flushed, the in-memory seqno->time mapping should be cleared + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(0, Key(i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); }); + } + seqs = dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping(); + ASSERT_OK(Flush(0)); + + // trigger compaction for CF "two" and make sure the compaction output has + // seqno_to_time_mapping + for (int j = 0; j < 3; j++) { + for (int i = 0; i < 200; i++) { + ASSERT_OK(Put(2, Key(i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); }); + } + ASSERT_OK(Flush(2)); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(handles_[2], &tables_props)); + ASSERT_EQ(tables_props.size(), 1); + it = tables_props.begin(); + tp_mapping.Clear(); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + seqs = tp_mapping.TEST_GetInternalMapping(); + ASSERT_GE(seqs.size(), 99); + ASSERT_LE(seqs.size(), 101); + + for (int j = 0; j < 2; j++) { + for (int i = 0; i < 200; i++) { + ASSERT_OK(Put(0, Key(i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { 
mock_clock_->MockSleepForSeconds(static_cast<int>(100)); }); + } + ASSERT_OK(Flush(0)); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(handles_[0], &tables_props)); + ASSERT_EQ(tables_props.size(), 1); + it = tables_props.begin(); + ASSERT_TRUE(it->second->seqno_to_time_mapping.empty()); + + // Write some data to CF "two", but don't flush to accumulate + for (int i = 0; i < 1000; i++) { + ASSERT_OK(Put(2, Key(i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); }); + } + ASSERT_GE( + dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping().size(), + 500); + // After dropping CF "one", the in-memory mapping will be change to only + // follow CF "two" options. + ASSERT_OK(db_->DropColumnFamily(handles_[1])); + ASSERT_LE( + dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping().size(), + 100 + 5); + + // After dropping CF "two", the in-memory mapping is also clear. 
+ ASSERT_OK(db_->DropColumnFamily(handles_[2])); + ASSERT_EQ( + dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping().size(), + 0); + + // And the timer worker is stopped + ASSERT_FALSE(scheduler.TEST_HasTask(PeriodicTaskType::kRecordSeqnoTime)); + Close(); +} + +TEST_P(SeqnoTimeTablePropTest, MultiInstancesBasic) { + const int kInstanceNum = 2; + + Options options = CurrentOptions(); + SetTrackTimeDurationOptions(10000, options); + options.env = mock_env_.get(); + options.stats_dump_period_sec = 0; + options.stats_persist_period_sec = 0; + + auto dbs = std::vector<DB*>(kInstanceNum); + for (int i = 0; i < kInstanceNum; i++) { + ASSERT_OK( + DB::Open(options, test::PerThreadDBPath(std::to_string(i)), &(dbs[i]))); + } + + // Make sure the second instance has the worker enabled + auto dbi = static_cast_with_check<DBImpl>(dbs[1]); + WriteOptions wo; + for (int i = 0; i < 200; i++) { + ASSERT_OK(dbi->Put(wo, Key(i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); }); + } + SeqnoToTimeMapping seqno_to_time_mapping = dbi->TEST_GetSeqnoToTimeMapping(); + ASSERT_GT(seqno_to_time_mapping.Size(), 10); + + for (int i = 0; i < kInstanceNum; i++) { + ASSERT_OK(dbs[i]->Close()); + delete dbs[i]; + } +} + +TEST_P(SeqnoTimeTablePropTest, SeqnoToTimeMappingUniversal) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + + Options options = CurrentOptions(); + SetTrackTimeDurationOptions(10000, options); + options.compaction_style = kCompactionStyleUniversal; + options.num_levels = kNumLevels; + options.env = mock_env_.get(); + + DestroyAndReopen(options); + + std::atomic_uint64_t num_seqno_zeroing{0}; + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator::PrepareOutput:ZeroingSeq", + [&](void* /*arg*/) { num_seqno_zeroing++; }); + 
SyncPoint::GetInstance()->EnableProcessing(); + + int sst_num = 0; + for (; sst_num < kNumTrigger - 1; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); }); + } + ASSERT_OK(Flush()); + } + TablePropertiesCollection tables_props; + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_EQ(tables_props.size(), 3); + for (const auto& props : tables_props) { + ASSERT_FALSE(props.second->seqno_to_time_mapping.empty()); + SeqnoToTimeMapping tp_mapping; + ASSERT_OK(tp_mapping.Add(props.second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + ASSERT_FALSE(tp_mapping.Empty()); + auto seqs = tp_mapping.TEST_GetInternalMapping(); + ASSERT_GE(seqs.size(), 10 - 1); + ASSERT_LE(seqs.size(), 10 + 1); + } + + // Trigger a compaction + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); + dbfull()->TEST_WaitForPeridicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); }); + } + sst_num++; + ASSERT_OK(Flush()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_EQ(tables_props.size(), 1); + + auto it = tables_props.begin(); + SeqnoToTimeMapping tp_mapping; + ASSERT_FALSE(it->second->seqno_to_time_mapping.empty()); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + + // compact to the last level + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + // make sure the data is all compacted to penultimate level if the feature is + // on, otherwise, compacted to the last level. 
+ if (options.preclude_last_level_data_seconds > 0) { + ASSERT_GT(NumTableFilesAtLevel(5), 0); + ASSERT_EQ(NumTableFilesAtLevel(6), 0); + } else { + ASSERT_EQ(NumTableFilesAtLevel(5), 0); + ASSERT_GT(NumTableFilesAtLevel(6), 0); + } + + // regardless the file is on the last level or not, it should keep the time + // information and sequence number are not set + tables_props.clear(); + tp_mapping.Clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + + ASSERT_EQ(tables_props.size(), 1); + ASSERT_EQ(num_seqno_zeroing, 0); + + it = tables_props.begin(); + ASSERT_FALSE(it->second->seqno_to_time_mapping.empty()); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + + // make half of the data expired + mock_clock_->MockSleepForSeconds(static_cast<int>(8000)); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + tables_props.clear(); + tp_mapping.Clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + + if (options.preclude_last_level_data_seconds > 0) { + ASSERT_EQ(tables_props.size(), 2); + } else { + ASSERT_EQ(tables_props.size(), 1); + } + ASSERT_GT(num_seqno_zeroing, 0); + std::vector<KeyVersion> key_versions; + ASSERT_OK(GetAllKeyVersions(db_, Slice(), Slice(), + std::numeric_limits<size_t>::max(), + &key_versions)); + // make sure there're more than 300 keys and first 100 keys are having seqno + // zeroed out, the last 100 key seqno not zeroed out + ASSERT_GT(key_versions.size(), 300); + for (int i = 0; i < 100; i++) { + ASSERT_EQ(key_versions[i].sequence, 0); + } + auto rit = key_versions.rbegin(); + for (int i = 0; i < 100; i++) { + ASSERT_GT(rit->sequence, 0); + rit++; + } + + // make all data expired and compact again to push it to the last level + // regardless if the tiering feature is enabled or not + mock_clock_->MockSleepForSeconds(static_cast<int>(20000)); + + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + ASSERT_GT(num_seqno_zeroing, 0); + ASSERT_GT(NumTableFilesAtLevel(6), 0); + + 
Close(); +} + +TEST_F(SeqnoTimeTest, MappingAppend) { + SeqnoToTimeMapping test(/*max_time_duration=*/100, /*max_capacity=*/10); + + // ignore seqno == 0, as it may mean the seqno is zeroed out + ASSERT_FALSE(test.Append(0, 9)); + + ASSERT_TRUE(test.Append(3, 10)); + auto size = test.Size(); + // normal add + ASSERT_TRUE(test.Append(10, 11)); + size++; + ASSERT_EQ(size, test.Size()); + + // Append unsorted + ASSERT_FALSE(test.Append(8, 12)); + ASSERT_EQ(size, test.Size()); + + // Append with the same seqno, newer time will be accepted + ASSERT_TRUE(test.Append(10, 12)); + ASSERT_EQ(size, test.Size()); + // older time will be ignored + ASSERT_FALSE(test.Append(10, 9)); + ASSERT_EQ(size, test.Size()); + + // new seqno with old time will be ignored + ASSERT_FALSE(test.Append(12, 8)); + ASSERT_EQ(size, test.Size()); +} + +TEST_F(SeqnoTimeTest, GetOldestApproximateTime) { + SeqnoToTimeMapping test(/*max_time_duration=*/100, /*max_capacity=*/10); + + ASSERT_EQ(test.GetOldestApproximateTime(10), kUnknownSeqnoTime); + + test.Append(3, 10); + + ASSERT_EQ(test.GetOldestApproximateTime(2), kUnknownSeqnoTime); + ASSERT_EQ(test.GetOldestApproximateTime(3), 10); + ASSERT_EQ(test.GetOldestApproximateTime(10), 10); + + test.Append(10, 100); + + test.Append(100, 1000); + ASSERT_EQ(test.GetOldestApproximateTime(10), 100); + ASSERT_EQ(test.GetOldestApproximateTime(40), 100); + ASSERT_EQ(test.GetOldestApproximateTime(111), 1000); +} + +TEST_F(SeqnoTimeTest, Sort) { + SeqnoToTimeMapping test; + + // single entry + test.Add(10, 11); + ASSERT_OK(test.Sort()); + ASSERT_EQ(test.Size(), 1); + + // duplicate, should be removed by sort + test.Add(10, 11); + // same seqno, but older time, should be removed + test.Add(10, 9); + + // unuseful ones, should be removed by sort + test.Add(11, 9); + test.Add(9, 8); + + // Good ones + test.Add(1, 10); + test.Add(100, 100); + + ASSERT_OK(test.Sort()); + + auto seqs = test.TEST_GetInternalMapping(); + + std::deque<SeqnoToTimeMapping::SeqnoTimePair> 
expected; + expected.emplace_back(1, 10); + expected.emplace_back(10, 11); + expected.emplace_back(100, 100); + + ASSERT_EQ(expected, seqs); +} + +TEST_F(SeqnoTimeTest, EncodeDecodeBasic) { + SeqnoToTimeMapping test(0, 1000); + + std::string output; + test.Encode(output, 0, 1000, 100); + ASSERT_TRUE(output.empty()); + + for (int i = 1; i <= 1000; i++) { + ASSERT_TRUE(test.Append(i, i * 10)); + } + test.Encode(output, 0, 1000, 100); + + ASSERT_FALSE(output.empty()); + + SeqnoToTimeMapping decoded; + ASSERT_OK(decoded.Add(output)); + ASSERT_OK(decoded.Sort()); + ASSERT_EQ(decoded.Size(), SeqnoToTimeMapping::kMaxSeqnoTimePairsPerSST); + ASSERT_EQ(test.Size(), 1000); + + for (SequenceNumber seq = 0; seq <= 1000; seq++) { + // test has the more accurate time mapping, encode only pick + // kMaxSeqnoTimePairsPerSST number of entries, which is less accurate + uint64_t target_time = test.GetOldestApproximateTime(seq); + ASSERT_GE(decoded.GetOldestApproximateTime(seq), + target_time < 200 ? 0 : target_time - 200); + ASSERT_LE(decoded.GetOldestApproximateTime(seq), target_time); + } +} + +TEST_F(SeqnoTimeTest, EncodeDecodePerferNewTime) { + SeqnoToTimeMapping test(0, 10); + + test.Append(1, 10); + test.Append(5, 17); + test.Append(6, 25); + test.Append(8, 30); + + std::string output; + test.Encode(output, 1, 10, 0, 3); + + SeqnoToTimeMapping decoded; + ASSERT_OK(decoded.Add(output)); + ASSERT_OK(decoded.Sort()); + + ASSERT_EQ(decoded.Size(), 3); + + auto seqs = decoded.TEST_GetInternalMapping(); + std::deque<SeqnoToTimeMapping::SeqnoTimePair> expected; + expected.emplace_back(1, 10); + expected.emplace_back(6, 25); + expected.emplace_back(8, 30); + ASSERT_EQ(expected, seqs); + + // Add a few large time number + test.Append(10, 100); + test.Append(13, 200); + test.Append(16, 300); + + output.clear(); + test.Encode(output, 1, 20, 0, 4); + decoded.Clear(); + ASSERT_OK(decoded.Add(output)); + ASSERT_OK(decoded.Sort()); + ASSERT_EQ(decoded.Size(), 4); + + expected.clear(); + 
expected.emplace_back(1, 10); + // entry #6, #8 are skipped as they are too close to #1. + // entry #100 is also within skip range, but if it's skipped, there not enough + // number to fill 4 entries, so select it. + expected.emplace_back(10, 100); + expected.emplace_back(13, 200); + expected.emplace_back(16, 300); + seqs = decoded.TEST_GetInternalMapping(); + ASSERT_EQ(expected, seqs); +} + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |