diff options
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/db/db_with_timestamp_compaction_test.cc | 334 |
1 files changed, 334 insertions, 0 deletions
diff --git a/src/rocksdb/db/db_with_timestamp_compaction_test.cc b/src/rocksdb/db/db_with_timestamp_compaction_test.cc new file mode 100644 index 000000000..d28f67e05 --- /dev/null +++ b/src/rocksdb/db/db_with_timestamp_compaction_test.cc @@ -0,0 +1,334 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/compaction/compaction.h" +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "test_util/testutil.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { +std::string Key1(uint64_t key) { + std::string ret; + PutFixed64(&ret, key); + std::reverse(ret.begin(), ret.end()); + return ret; +} + +std::string Timestamp(uint64_t ts) { + std::string ret; + PutFixed64(&ret, ts); + return ret; +} +} // anonymous namespace + +class TimestampCompatibleCompactionTest : public DBTestBase { + public: + TimestampCompatibleCompactionTest() + : DBTestBase("ts_compatible_compaction_test", /*env_do_fsync=*/true) {} + + std::string Get(const std::string& key, uint64_t ts) { + ReadOptions read_opts; + std::string ts_str = Timestamp(ts); + Slice ts_slice = ts_str; + read_opts.timestamp = &ts_slice; + std::string value; + Status s = db_->Get(read_opts, key, &value); + if (s.IsNotFound()) { + value.assign("NOT_FOUND"); + } else if (!s.ok()) { + value.assign(s.ToString()); + } + return value; + } +}; + +TEST_F(TimestampCompatibleCompactionTest, UserKeyCrossFileBoundary) { + Options options = CurrentOptions(); + options.env = env_; + options.compaction_style = kCompactionStyleLevel; + options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + options.level0_file_num_compaction_trigger = 3; + constexpr size_t kNumKeysPerFile = 101; + options.memtable_factory.reset( + test::NewSpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { + const auto* compaction = reinterpret_cast<Compaction*>(arg); + ASSERT_NE(nullptr, compaction); + ASSERT_EQ(0, compaction->start_level()); + ASSERT_EQ(1, compaction->num_input_levels()); + // Check that all 3 L0 ssts are picked for level compaction. + ASSERT_EQ(3, compaction->num_input_files(0)); + }); + SyncPoint::GetInstance()->EnableProcessing(); + // Write a L0 with keys 0, 1, ..., 99 with ts from 100 to 199. + uint64_t ts = 100; + uint64_t key = 0; + WriteOptions write_opts; + for (; key < kNumKeysPerFile - 1; ++key, ++ts) { + std::string ts_str = Timestamp(ts); + ASSERT_OK( + db_->Put(write_opts, Key1(key), ts_str, "foo_" + std::to_string(key))); + } + // Write another L0 with keys 99 with newer ts. + ASSERT_OK(Flush()); + uint64_t saved_read_ts1 = ts++; + key = 99; + for (int i = 0; i < 4; ++i, ++ts) { + std::string ts_str = Timestamp(ts); + ASSERT_OK( + db_->Put(write_opts, Key1(key), ts_str, "bar_" + std::to_string(key))); + } + ASSERT_OK(Flush()); + uint64_t saved_read_ts2 = ts++; + // Write another L0 with keys 99, 100, 101, ..., 150 + for (; key <= 150; ++key, ++ts) { + std::string ts_str = Timestamp(ts); + ASSERT_OK( + db_->Put(write_opts, Key1(key), ts_str, "foo1_" + std::to_string(key))); + } + ASSERT_OK(Flush()); + // Wait for compaction to finish + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + uint64_t read_ts = ts; + ASSERT_EQ("foo_99", Get(Key1(99), saved_read_ts1)); + ASSERT_EQ("bar_99", Get(Key1(99), saved_read_ts2)); + ASSERT_EQ("foo1_99", Get(Key1(99), read_ts)); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(TimestampCompatibleCompactionTest, MultipleSubCompactions) { + Options options = CurrentOptions(); + options.env = env_; + options.compaction_style = kCompactionStyleUniversal; + options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + options.level0_file_num_compaction_trigger = 3; + options.max_subcompactions = 3; + options.target_file_size_base = 1024; + options.statistics = CreateDBStatistics(); + DestroyAndReopen(options); + + uint64_t ts = 100; + uint64_t key = 0; + WriteOptions write_opts; + + // Write keys 0, 1, ..., 499 with ts from 100 to 599. + { + for (; key <= 499; ++key, ++ts) { + std::string ts_str = Timestamp(ts); + ASSERT_OK(db_->Put(write_opts, Key1(key), ts_str, + "foo_" + std::to_string(key))); + } + } + + // Write keys 500, ..., 999 with ts from 600 to 1099. + { + for (; key <= 999; ++key, ++ts) { + std::string ts_str = Timestamp(ts); + ASSERT_OK(db_->Put(write_opts, Key1(key), ts_str, + "foo_" + std::to_string(key))); + } + ASSERT_OK(Flush()); + } + + // Wait for compaction to finish + { + ASSERT_OK(dbfull()->RunManualCompaction( + static_cast_with_check<ColumnFamilyHandleImpl>( + db_->DefaultColumnFamily()) + ->cfd(), + 0 /* input_level */, 1 /* output_level */, CompactRangeOptions(), + nullptr /* begin */, nullptr /* end */, true /* exclusive */, + true /* disallow_trivial_move */, + std::numeric_limits<uint64_t>::max() /* max_file_num_to_ignore */, + "" /*trim_ts*/)); + } + + // Check stats to make sure multiple subcompactions were scheduled for + // boundaries not to be nullptr. + { + HistogramData num_sub_compactions; + options.statistics->histogramData(NUM_SUBCOMPACTIONS_SCHEDULED, + &num_sub_compactions); + ASSERT_GT(num_sub_compactions.sum, 1); + } + + for (key = 0; key <= 999; ++key) { + ASSERT_EQ("foo_" + std::to_string(key), Get(Key1(key), ts)); + } +} + +class TestFilePartitioner : public SstPartitioner { + public: + explicit TestFilePartitioner() {} + ~TestFilePartitioner() override {} + + const char* Name() const override { return "TestFilePartitioner"; } + PartitionerResult ShouldPartition( + const PartitionerRequest& /*request*/) override { + return PartitionerResult::kRequired; + } + bool CanDoTrivialMove(const Slice& /*smallest_user_key*/, + const Slice& /*largest_user_key*/) override { + return false; + } +}; + +class TestFilePartitionerFactory : public SstPartitionerFactory { + public: + explicit TestFilePartitionerFactory() {} + std::unique_ptr<SstPartitioner> CreatePartitioner( + const SstPartitioner::Context& /*context*/) const override { + std::unique_ptr<SstPartitioner> ret = + std::make_unique<TestFilePartitioner>(); + return ret; + } + const char* Name() const override { return "TestFilePartitionerFactory"; } +}; + +#ifndef ROCKSDB_LITE +TEST_F(TimestampCompatibleCompactionTest, CompactFilesRangeCheckL0) { + Options options = CurrentOptions(); + options.env = env_; + options.sst_partitioner_factory = + std::make_shared<TestFilePartitionerFactory>(); + options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + constexpr int kNumFiles = 10; + constexpr int kKeysPerFile = 2; + const std::string user_key = "foo"; + constexpr uint64_t start_ts = 10000; + + uint64_t cur_ts = start_ts; + for (int k = 0; k < kNumFiles; ++k) { + for (int i = 0; i < kKeysPerFile; ++i) { + ASSERT_OK(db_->Put(WriteOptions(), user_key, Timestamp(cur_ts), + "v" + std::to_string(i))); + ++cur_ts; + } + ASSERT_OK(db_->Flush(FlushOptions())); + } + + std::vector<std::string> input_files{}; + { + std::vector<std::string> files; + ASSERT_OK(env_->GetChildren(dbname_, &files)); + for (const auto& f : files) { + uint64_t file_num = 0; + FileType file_type = FileType::kWalFile; + if (!ParseFileName(f, &file_num, &file_type) || + file_type != FileType::kTableFile) { + continue; + } + input_files.emplace_back(f); + } + // sorting here by name, which also happens to sort by generation date. + std::sort(input_files.begin(), input_files.end()); + assert(kNumFiles == input_files.size()); + std::vector<std::string> tmp; + tmp.emplace_back(input_files[input_files.size() / 2]); + input_files.swap(tmp); + } + + { + std::vector<std::string> output_file_names; + CompactionJobInfo compaction_job_info; + ASSERT_OK(db_->CompactFiles(CompactionOptions(), input_files, + /*output_level=*/1, /*output_path_id=*/-1, + &output_file_names, &compaction_job_info)); + // We expect the L0 files older than the original provided input were all + // included in the compaction. + ASSERT_EQ(static_cast<size_t>(kNumFiles / 2 + 1), + compaction_job_info.input_files.size()); + } +} + +TEST_F(TimestampCompatibleCompactionTest, CompactFilesRangeCheckL1) { + Options options = CurrentOptions(); + options.env = env_; + options.sst_partitioner_factory = + std::make_shared<TestFilePartitionerFactory>(); + options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + + constexpr int kNumFiles = 4; + options.level0_file_num_compaction_trigger = kNumFiles; + + DestroyAndReopen(options); + + constexpr int kKeysPerFile = 2; + const std::string user_key = "foo"; + constexpr uint64_t start_ts = 10000; + + uint64_t cur_ts = start_ts; + // Generate some initial files in both L0 and L1. + for (int k = 0; k < kNumFiles; ++k) { + for (int i = 0; i < kKeysPerFile; ++i) { + ASSERT_OK(db_->Put(WriteOptions(), user_key, Timestamp(cur_ts), + "v" + std::to_string(i))); + ++cur_ts; + } + ASSERT_OK(db_->Flush(FlushOptions())); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/0, /*cf=*/0)); + ASSERT_EQ(kNumFiles * kKeysPerFile, + NumTableFilesAtLevel(/*level=*/1, /*cf=*/0)); + + constexpr int additional_l0s = 2; + for (int i = 0; i < additional_l0s; ++i, ++cur_ts) { + ASSERT_OK(db_->Put(WriteOptions(), user_key, Timestamp(cur_ts), "v")); + ASSERT_OK(db_->Flush(FlushOptions())); + } + ASSERT_EQ(additional_l0s, NumTableFilesAtLevel(/*level=*/0, /*cf=*/0)); + + std::vector<std::string> inputs; + { + std::vector<LiveFileMetaData> fmetas; + db_->GetLiveFilesMetaData(&fmetas); + bool included_one_l1 = false; + for (const auto& meta : fmetas) { + if (meta.level == 0) { + inputs.emplace_back(meta.relative_filename); + } else if (!included_one_l1) { + inputs.emplace_back(meta.relative_filename); + included_one_l1 = true; + } + } + } + ASSERT_EQ(static_cast<size_t>(3), inputs.size()); + { + std::vector<std::string> output_file_names; + CompactionJobInfo compaction_job_info; + + ASSERT_OK(db_->CompactFiles(CompactionOptions(), inputs, /*output_level=*/1, + /*output_path_id=*/-1, &output_file_names, + &compaction_job_info)); + ASSERT_EQ(kNumFiles * kKeysPerFile + 2, output_file_names.size()); + ASSERT_EQ(kNumFiles * kKeysPerFile + 2, + static_cast<int>(compaction_job_info.input_files.size())); + } +} +#endif // !ROCKSDB_LITE + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |