summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/db_with_timestamp_compaction_test.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/db/db_with_timestamp_compaction_test.cc334
1 files changed, 334 insertions, 0 deletions
diff --git a/src/rocksdb/db/db_with_timestamp_compaction_test.cc b/src/rocksdb/db/db_with_timestamp_compaction_test.cc
new file mode 100644
index 000000000..d28f67e05
--- /dev/null
+++ b/src/rocksdb/db/db_with_timestamp_compaction_test.cc
@@ -0,0 +1,334 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/compaction/compaction.h"
+#include "db/db_test_util.h"
+#include "port/stack_trace.h"
+#include "test_util/testutil.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+std::string Key1(uint64_t key) {
+ std::string ret;
+ PutFixed64(&ret, key);
+ std::reverse(ret.begin(), ret.end());
+ return ret;
+}
+
+std::string Timestamp(uint64_t ts) {
+ std::string ret;
+ PutFixed64(&ret, ts);
+ return ret;
+}
+} // anonymous namespace
+
+class TimestampCompatibleCompactionTest : public DBTestBase {
+ public:
+ TimestampCompatibleCompactionTest()
+ : DBTestBase("ts_compatible_compaction_test", /*env_do_fsync=*/true) {}
+
+ std::string Get(const std::string& key, uint64_t ts) {
+ ReadOptions read_opts;
+ std::string ts_str = Timestamp(ts);
+ Slice ts_slice = ts_str;
+ read_opts.timestamp = &ts_slice;
+ std::string value;
+ Status s = db_->Get(read_opts, key, &value);
+ if (s.IsNotFound()) {
+ value.assign("NOT_FOUND");
+ } else if (!s.ok()) {
+ value.assign(s.ToString());
+ }
+ return value;
+ }
+};
+
+TEST_F(TimestampCompatibleCompactionTest, UserKeyCrossFileBoundary) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ options.compaction_style = kCompactionStyleLevel;
+ options.comparator = test::BytewiseComparatorWithU64TsWrapper();
+ options.level0_file_num_compaction_trigger = 3;
+ constexpr size_t kNumKeysPerFile = 101;
+ options.memtable_factory.reset(
+ test::NewSpecialSkipListFactory(kNumKeysPerFile));
+ DestroyAndReopen(options);
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->SetCallBack(
+ "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
+ const auto* compaction = reinterpret_cast<Compaction*>(arg);
+ ASSERT_NE(nullptr, compaction);
+ ASSERT_EQ(0, compaction->start_level());
+ ASSERT_EQ(1, compaction->num_input_levels());
+ // Check that all 3 L0 ssts are picked for level compaction.
+ ASSERT_EQ(3, compaction->num_input_files(0));
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+ // Write a L0 with keys 0, 1, ..., 99 with ts from 100 to 199.
+ uint64_t ts = 100;
+ uint64_t key = 0;
+ WriteOptions write_opts;
+ for (; key < kNumKeysPerFile - 1; ++key, ++ts) {
+ std::string ts_str = Timestamp(ts);
+ ASSERT_OK(
+ db_->Put(write_opts, Key1(key), ts_str, "foo_" + std::to_string(key)));
+ }
+ // Write another L0 with keys 99 with newer ts.
+ ASSERT_OK(Flush());
+ uint64_t saved_read_ts1 = ts++;
+ key = 99;
+ for (int i = 0; i < 4; ++i, ++ts) {
+ std::string ts_str = Timestamp(ts);
+ ASSERT_OK(
+ db_->Put(write_opts, Key1(key), ts_str, "bar_" + std::to_string(key)));
+ }
+ ASSERT_OK(Flush());
+ uint64_t saved_read_ts2 = ts++;
+ // Write another L0 with keys 99, 100, 101, ..., 150
+ for (; key <= 150; ++key, ++ts) {
+ std::string ts_str = Timestamp(ts);
+ ASSERT_OK(
+ db_->Put(write_opts, Key1(key), ts_str, "foo1_" + std::to_string(key)));
+ }
+ ASSERT_OK(Flush());
+ // Wait for compaction to finish
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ uint64_t read_ts = ts;
+ ASSERT_EQ("foo_99", Get(Key1(99), saved_read_ts1));
+ ASSERT_EQ("bar_99", Get(Key1(99), saved_read_ts2));
+ ASSERT_EQ("foo1_99", Get(Key1(99), read_ts));
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(TimestampCompatibleCompactionTest, MultipleSubCompactions) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ options.compaction_style = kCompactionStyleUniversal;
+ options.comparator = test::BytewiseComparatorWithU64TsWrapper();
+ options.level0_file_num_compaction_trigger = 3;
+ options.max_subcompactions = 3;
+ options.target_file_size_base = 1024;
+ options.statistics = CreateDBStatistics();
+ DestroyAndReopen(options);
+
+ uint64_t ts = 100;
+ uint64_t key = 0;
+ WriteOptions write_opts;
+
+ // Write keys 0, 1, ..., 499 with ts from 100 to 599.
+ {
+ for (; key <= 499; ++key, ++ts) {
+ std::string ts_str = Timestamp(ts);
+ ASSERT_OK(db_->Put(write_opts, Key1(key), ts_str,
+ "foo_" + std::to_string(key)));
+ }
+ }
+
+ // Write keys 500, ..., 999 with ts from 600 to 1099.
+ {
+ for (; key <= 999; ++key, ++ts) {
+ std::string ts_str = Timestamp(ts);
+ ASSERT_OK(db_->Put(write_opts, Key1(key), ts_str,
+ "foo_" + std::to_string(key)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ // Wait for compaction to finish
+ {
+ ASSERT_OK(dbfull()->RunManualCompaction(
+ static_cast_with_check<ColumnFamilyHandleImpl>(
+ db_->DefaultColumnFamily())
+ ->cfd(),
+ 0 /* input_level */, 1 /* output_level */, CompactRangeOptions(),
+ nullptr /* begin */, nullptr /* end */, true /* exclusive */,
+ true /* disallow_trivial_move */,
+ std::numeric_limits<uint64_t>::max() /* max_file_num_to_ignore */,
+ "" /*trim_ts*/));
+ }
+
+ // Check stats to make sure multiple subcompactions were scheduled for
+ // boundaries not to be nullptr.
+ {
+ HistogramData num_sub_compactions;
+ options.statistics->histogramData(NUM_SUBCOMPACTIONS_SCHEDULED,
+ &num_sub_compactions);
+ ASSERT_GT(num_sub_compactions.sum, 1);
+ }
+
+ for (key = 0; key <= 999; ++key) {
+ ASSERT_EQ("foo_" + std::to_string(key), Get(Key1(key), ts));
+ }
+}
+
+class TestFilePartitioner : public SstPartitioner {
+ public:
+ explicit TestFilePartitioner() {}
+ ~TestFilePartitioner() override {}
+
+ const char* Name() const override { return "TestFilePartitioner"; }
+ PartitionerResult ShouldPartition(
+ const PartitionerRequest& /*request*/) override {
+ return PartitionerResult::kRequired;
+ }
+ bool CanDoTrivialMove(const Slice& /*smallest_user_key*/,
+ const Slice& /*largest_user_key*/) override {
+ return false;
+ }
+};
+
+class TestFilePartitionerFactory : public SstPartitionerFactory {
+ public:
+ explicit TestFilePartitionerFactory() {}
+ std::unique_ptr<SstPartitioner> CreatePartitioner(
+ const SstPartitioner::Context& /*context*/) const override {
+ std::unique_ptr<SstPartitioner> ret =
+ std::make_unique<TestFilePartitioner>();
+ return ret;
+ }
+ const char* Name() const override { return "TestFilePartitionerFactory"; }
+};
+
+#ifndef ROCKSDB_LITE
+TEST_F(TimestampCompatibleCompactionTest, CompactFilesRangeCheckL0) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ options.sst_partitioner_factory =
+ std::make_shared<TestFilePartitionerFactory>();
+ options.comparator = test::BytewiseComparatorWithU64TsWrapper();
+ options.disable_auto_compactions = true;
+ DestroyAndReopen(options);
+
+ constexpr int kNumFiles = 10;
+ constexpr int kKeysPerFile = 2;
+ const std::string user_key = "foo";
+ constexpr uint64_t start_ts = 10000;
+
+ uint64_t cur_ts = start_ts;
+ for (int k = 0; k < kNumFiles; ++k) {
+ for (int i = 0; i < kKeysPerFile; ++i) {
+ ASSERT_OK(db_->Put(WriteOptions(), user_key, Timestamp(cur_ts),
+ "v" + std::to_string(i)));
+ ++cur_ts;
+ }
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ }
+
+ std::vector<std::string> input_files{};
+ {
+ std::vector<std::string> files;
+ ASSERT_OK(env_->GetChildren(dbname_, &files));
+ for (const auto& f : files) {
+ uint64_t file_num = 0;
+ FileType file_type = FileType::kWalFile;
+ if (!ParseFileName(f, &file_num, &file_type) ||
+ file_type != FileType::kTableFile) {
+ continue;
+ }
+ input_files.emplace_back(f);
+ }
+ // sorting here by name, which also happens to sort by generation date.
+ std::sort(input_files.begin(), input_files.end());
+ assert(kNumFiles == input_files.size());
+ std::vector<std::string> tmp;
+ tmp.emplace_back(input_files[input_files.size() / 2]);
+ input_files.swap(tmp);
+ }
+
+ {
+ std::vector<std::string> output_file_names;
+ CompactionJobInfo compaction_job_info;
+ ASSERT_OK(db_->CompactFiles(CompactionOptions(), input_files,
+ /*output_level=*/1, /*output_path_id=*/-1,
+ &output_file_names, &compaction_job_info));
+ // We expect the L0 files older than the original provided input were all
+ // included in the compaction.
+ ASSERT_EQ(static_cast<size_t>(kNumFiles / 2 + 1),
+ compaction_job_info.input_files.size());
+ }
+}
+
+TEST_F(TimestampCompatibleCompactionTest, CompactFilesRangeCheckL1) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ options.sst_partitioner_factory =
+ std::make_shared<TestFilePartitionerFactory>();
+ options.comparator = test::BytewiseComparatorWithU64TsWrapper();
+
+ constexpr int kNumFiles = 4;
+ options.level0_file_num_compaction_trigger = kNumFiles;
+
+ DestroyAndReopen(options);
+
+ constexpr int kKeysPerFile = 2;
+ const std::string user_key = "foo";
+ constexpr uint64_t start_ts = 10000;
+
+ uint64_t cur_ts = start_ts;
+ // Generate some initial files in both L0 and L1.
+ for (int k = 0; k < kNumFiles; ++k) {
+ for (int i = 0; i < kKeysPerFile; ++i) {
+ ASSERT_OK(db_->Put(WriteOptions(), user_key, Timestamp(cur_ts),
+ "v" + std::to_string(i)));
+ ++cur_ts;
+ }
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/0, /*cf=*/0));
+ ASSERT_EQ(kNumFiles * kKeysPerFile,
+ NumTableFilesAtLevel(/*level=*/1, /*cf=*/0));
+
+ constexpr int additional_l0s = 2;
+ for (int i = 0; i < additional_l0s; ++i, ++cur_ts) {
+ ASSERT_OK(db_->Put(WriteOptions(), user_key, Timestamp(cur_ts), "v"));
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ }
+ ASSERT_EQ(additional_l0s, NumTableFilesAtLevel(/*level=*/0, /*cf=*/0));
+
+ std::vector<std::string> inputs;
+ {
+ std::vector<LiveFileMetaData> fmetas;
+ db_->GetLiveFilesMetaData(&fmetas);
+ bool included_one_l1 = false;
+ for (const auto& meta : fmetas) {
+ if (meta.level == 0) {
+ inputs.emplace_back(meta.relative_filename);
+ } else if (!included_one_l1) {
+ inputs.emplace_back(meta.relative_filename);
+ included_one_l1 = true;
+ }
+ }
+ }
+ ASSERT_EQ(static_cast<size_t>(3), inputs.size());
+ {
+ std::vector<std::string> output_file_names;
+ CompactionJobInfo compaction_job_info;
+
+ ASSERT_OK(db_->CompactFiles(CompactionOptions(), inputs, /*output_level=*/1,
+ /*output_path_id=*/-1, &output_file_names,
+ &compaction_job_info));
+ ASSERT_EQ(kNumFiles * kKeysPerFile + 2, output_file_names.size());
+ ASSERT_EQ(kNumFiles * kKeysPerFile + 2,
+ static_cast<int>(compaction_job_info.input_files.size()));
+ }
+}
+#endif // !ROCKSDB_LITE
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}