diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/compact_files_test.cc | |
parent | Initial commit. (diff) | |
download | ceph-b26c4052f3542036551aa9dec9caa4226e456195.tar.xz ceph-b26c4052f3542036551aa9dec9caa4226e456195.zip |
Adding upstream version 18.2.2.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/db/compact_files_test.cc | 502 |
1 file changed, 502 insertions, 0 deletions
diff --git a/src/rocksdb/db/compact_files_test.cc b/src/rocksdb/db/compact_files_test.cc new file mode 100644 index 000000000..ef38946f7 --- /dev/null +++ b/src/rocksdb/db/compact_files_test.cc @@ -0,0 +1,502 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include <mutex> +#include <string> +#include <thread> +#include <vector> + +#include "db/db_impl/db_impl.h" +#include "port/port.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "test_util/sync_point.h" +#include "test_util/testharness.h" +#include "util/cast_util.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +class CompactFilesTest : public testing::Test { + public: + CompactFilesTest() { + env_ = Env::Default(); + db_name_ = test::PerThreadDBPath("compact_files_test"); + } + + std::string db_name_; + Env* env_; +}; + +// A class which remembers the name of each flushed file. 
+class FlushedFileCollector : public EventListener { + public: + FlushedFileCollector() {} + ~FlushedFileCollector() override {} + + void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override { + std::lock_guard<std::mutex> lock(mutex_); + flushed_files_.push_back(info.file_path); + } + + std::vector<std::string> GetFlushedFiles() { + std::lock_guard<std::mutex> lock(mutex_); + std::vector<std::string> result; + for (auto fname : flushed_files_) { + result.push_back(fname); + } + return result; + } + void ClearFlushedFiles() { + std::lock_guard<std::mutex> lock(mutex_); + flushed_files_.clear(); + } + + private: + std::vector<std::string> flushed_files_; + std::mutex mutex_; +}; + +TEST_F(CompactFilesTest, L0ConflictsFiles) { + Options options; + // to trigger compaction more easily + const int kWriteBufferSize = 10000; + const int kLevel0Trigger = 2; + options.create_if_missing = true; + options.compaction_style = kCompactionStyleLevel; + // Small slowdown and stop trigger for experimental purpose. 
+ options.level0_slowdown_writes_trigger = 20; + options.level0_stop_writes_trigger = 20; + options.level0_stop_writes_trigger = 20; + options.write_buffer_size = kWriteBufferSize; + options.level0_file_num_compaction_trigger = kLevel0Trigger; + options.compression = kNoCompression; + + DB* db = nullptr; + ASSERT_OK(DestroyDB(db_name_, options)); + Status s = DB::Open(options, db_name_, &db); + assert(s.ok()); + assert(db); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ + {"CompactFilesImpl:0", "BackgroundCallCompaction:0"}, + {"BackgroundCallCompaction:1", "CompactFilesImpl:1"}, + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // create couple files + // Background compaction starts and waits in BackgroundCallCompaction:0 + for (int i = 0; i < kLevel0Trigger * 4; ++i) { + ASSERT_OK(db->Put(WriteOptions(), std::to_string(i), "")); + ASSERT_OK(db->Put(WriteOptions(), std::to_string(100 - i), "")); + ASSERT_OK(db->Flush(FlushOptions())); + } + + ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta; + db->GetColumnFamilyMetaData(&meta); + std::string file1; + for (auto& file : meta.levels[0].files) { + ASSERT_EQ(0, meta.levels[0].level); + if (file1 == "") { + file1 = file.db_path + "/" + file.name; + } else { + std::string file2 = file.db_path + "/" + file.name; + // Another thread starts a compact files and creates an L0 compaction + // The background compaction then notices that there is an L0 compaction + // already in progress and doesn't do an L0 compaction + // Once the background compaction finishes, the compact files finishes + ASSERT_OK(db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), + {file1, file2}, 0)); + break; + } + } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + delete db; +} + +TEST_F(CompactFilesTest, MultipleLevel) { + Options options; + options.create_if_missing = true; + options.level_compaction_dynamic_level_bytes = true; + options.num_levels = 6; + // Add listener + 
FlushedFileCollector* collector = new FlushedFileCollector(); + options.listeners.emplace_back(collector); + + DB* db = nullptr; + ASSERT_OK(DestroyDB(db_name_, options)); + Status s = DB::Open(options, db_name_, &db); + ASSERT_OK(s); + ASSERT_NE(db, nullptr); + + // create couple files in L0, L3, L4 and L5 + for (int i = 5; i > 2; --i) { + collector->ClearFlushedFiles(); + ASSERT_OK(db->Put(WriteOptions(), std::to_string(i), "")); + ASSERT_OK(db->Flush(FlushOptions())); + // Ensure background work is fully finished including listener callbacks + // before accessing listener state. + ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork()); + auto l0_files = collector->GetFlushedFiles(); + ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, i)); + + std::string prop; + ASSERT_TRUE(db->GetProperty( + "rocksdb.num-files-at-level" + std::to_string(i), &prop)); + ASSERT_EQ("1", prop); + } + ASSERT_OK(db->Put(WriteOptions(), std::to_string(0), "")); + ASSERT_OK(db->Flush(FlushOptions())); + + ColumnFamilyMetaData meta; + db->GetColumnFamilyMetaData(&meta); + // Compact files except the file in L3 + std::vector<std::string> files; + for (int i = 0; i < 6; ++i) { + if (i == 3) continue; + for (auto& file : meta.levels[i].files) { + files.push_back(file.db_path + "/" + file.name); + } + } + + SyncPoint::GetInstance()->LoadDependency({ + {"CompactionJob::Run():Start", "CompactFilesTest.MultipleLevel:0"}, + {"CompactFilesTest.MultipleLevel:1", "CompactFilesImpl:3"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + + std::thread thread([&] { + TEST_SYNC_POINT("CompactFilesTest.MultipleLevel:0"); + ASSERT_OK(db->Put(WriteOptions(), "bar", "v2")); + ASSERT_OK(db->Put(WriteOptions(), "foo", "v2")); + ASSERT_OK(db->Flush(FlushOptions())); + TEST_SYNC_POINT("CompactFilesTest.MultipleLevel:1"); + }); + + // Compaction cannot move up the data to higher level + // here we have input file from level 5, so the output level has to be >= 5 + for (int 
invalid_output_level = 0; invalid_output_level < 5; + invalid_output_level++) { + s = db->CompactFiles(CompactionOptions(), files, invalid_output_level); + std::cout << s.ToString() << std::endl; + ASSERT_TRUE(s.IsInvalidArgument()); + } + + ASSERT_OK(db->CompactFiles(CompactionOptions(), files, 5)); + SyncPoint::GetInstance()->DisableProcessing(); + thread.join(); + + delete db; +} + +TEST_F(CompactFilesTest, ObsoleteFiles) { + Options options; + // to trigger compaction more easily + const int kWriteBufferSize = 65536; + options.create_if_missing = true; + // Disable RocksDB background compaction. + options.compaction_style = kCompactionStyleNone; + options.level0_slowdown_writes_trigger = (1 << 30); + options.level0_stop_writes_trigger = (1 << 30); + options.write_buffer_size = kWriteBufferSize; + options.max_write_buffer_number = 2; + options.compression = kNoCompression; + + // Add listener + FlushedFileCollector* collector = new FlushedFileCollector(); + options.listeners.emplace_back(collector); + + DB* db = nullptr; + ASSERT_OK(DestroyDB(db_name_, options)); + Status s = DB::Open(options, db_name_, &db); + ASSERT_OK(s); + ASSERT_NE(db, nullptr); + + // create couple files + for (int i = 1000; i < 2000; ++i) { + ASSERT_OK(db->Put(WriteOptions(), std::to_string(i), + std::string(kWriteBufferSize / 10, 'a' + (i % 26)))); + } + + auto l0_files = collector->GetFlushedFiles(); + ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); + ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForCompact()); + + // verify all compaction input files are deleted + for (auto fname : l0_files) { + ASSERT_EQ(Status::NotFound(), env_->FileExists(fname)); + } + delete db; +} + +TEST_F(CompactFilesTest, NotCutOutputOnLevel0) { + Options options; + options.create_if_missing = true; + // Disable RocksDB background compaction. 
+ options.compaction_style = kCompactionStyleNone; + options.level0_slowdown_writes_trigger = 1000; + options.level0_stop_writes_trigger = 1000; + options.write_buffer_size = 65536; + options.max_write_buffer_number = 2; + options.compression = kNoCompression; + options.max_compaction_bytes = 5000; + + // Add listener + FlushedFileCollector* collector = new FlushedFileCollector(); + options.listeners.emplace_back(collector); + + DB* db = nullptr; + ASSERT_OK(DestroyDB(db_name_, options)); + Status s = DB::Open(options, db_name_, &db); + assert(s.ok()); + assert(db); + + // create couple files + for (int i = 0; i < 500; ++i) { + ASSERT_OK(db->Put(WriteOptions(), std::to_string(i), + std::string(1000, 'a' + (i % 26)))); + } + ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable()); + auto l0_files_1 = collector->GetFlushedFiles(); + collector->ClearFlushedFiles(); + for (int i = 0; i < 500; ++i) { + ASSERT_OK(db->Put(WriteOptions(), std::to_string(i), + std::string(1000, 'a' + (i % 26)))); + } + ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable()); + auto l0_files_2 = collector->GetFlushedFiles(); + ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_1, 0)); + ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_2, 0)); + // no assertion failure + delete db; +} + +TEST_F(CompactFilesTest, CapturingPendingFiles) { + Options options; + options.create_if_missing = true; + // Disable RocksDB background compaction. + options.compaction_style = kCompactionStyleNone; + // Always do full scans for obsolete files (needed to reproduce the issue). + options.delete_obsolete_files_period_micros = 0; + + // Add listener. + FlushedFileCollector* collector = new FlushedFileCollector(); + options.listeners.emplace_back(collector); + + DB* db = nullptr; + ASSERT_OK(DestroyDB(db_name_, options)); + Status s = DB::Open(options, db_name_, &db); + ASSERT_OK(s); + assert(db); + + // Create 5 files. 
+ for (int i = 0; i < 5; ++i) { + ASSERT_OK(db->Put(WriteOptions(), "key" + std::to_string(i), "value")); + ASSERT_OK(db->Flush(FlushOptions())); + } + + // Ensure background work is fully finished including listener callbacks + // before accessing listener state. + ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork()); + auto l0_files = collector->GetFlushedFiles(); + EXPECT_EQ(5, l0_files.size()); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ + {"CompactFilesImpl:2", "CompactFilesTest.CapturingPendingFiles:0"}, + {"CompactFilesTest.CapturingPendingFiles:1", "CompactFilesImpl:3"}, + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Start compacting files. + ROCKSDB_NAMESPACE::port::Thread compaction_thread( + [&] { EXPECT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); }); + + // In the meantime flush another file. + TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0"); + ASSERT_OK(db->Put(WriteOptions(), "key5", "value")); + ASSERT_OK(db->Flush(FlushOptions())); + TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1"); + + compaction_thread.join(); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + + delete db; + + // Make sure we can reopen the DB. 
+ s = DB::Open(options, db_name_, &db); + ASSERT_OK(s); + assert(db); + delete db; +} + +TEST_F(CompactFilesTest, CompactionFilterWithGetSv) { + class FilterWithGet : public CompactionFilter { + public: + bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/, + std::string* /*new_value*/, + bool* /*value_changed*/) const override { + if (db_ == nullptr) { + return true; + } + std::string res; + db_->Get(ReadOptions(), "", &res); + return true; + } + + void SetDB(DB* db) { db_ = db; } + + const char* Name() const override { return "FilterWithGet"; } + + private: + DB* db_; + }; + + std::shared_ptr<FilterWithGet> cf(new FilterWithGet()); + + Options options; + options.create_if_missing = true; + options.compaction_filter = cf.get(); + + DB* db = nullptr; + ASSERT_OK(DestroyDB(db_name_, options)); + Status s = DB::Open(options, db_name_, &db); + ASSERT_OK(s); + + cf->SetDB(db); + + // Write one L0 file + ASSERT_OK(db->Put(WriteOptions(), "K1", "V1")); + ASSERT_OK(db->Flush(FlushOptions())); + + // Compact all L0 files using CompactFiles + ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta; + db->GetColumnFamilyMetaData(&meta); + for (auto& file : meta.levels[0].files) { + std::string fname = file.db_path + "/" + file.name; + ASSERT_OK( + db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), {fname}, 0)); + } + + delete db; +} + +TEST_F(CompactFilesTest, SentinelCompressionType) { + if (!Zlib_Supported()) { + fprintf(stderr, "zlib compression not supported, skip this test\n"); + return; + } + if (!Snappy_Supported()) { + fprintf(stderr, "snappy compression not supported, skip this test\n"); + return; + } + // Check that passing `CompressionType::kDisableCompressionOption` to + // `CompactFiles` causes it to use the column family compression options. 
+ for (auto compaction_style : {CompactionStyle::kCompactionStyleLevel, + CompactionStyle::kCompactionStyleUniversal, + CompactionStyle::kCompactionStyleNone}) { + ASSERT_OK(DestroyDB(db_name_, Options())); + Options options; + options.compaction_style = compaction_style; + // L0: Snappy, L1: ZSTD, L2: Snappy + options.compression_per_level = {CompressionType::kSnappyCompression, + CompressionType::kZlibCompression, + CompressionType::kSnappyCompression}; + options.create_if_missing = true; + FlushedFileCollector* collector = new FlushedFileCollector(); + options.listeners.emplace_back(collector); + DB* db = nullptr; + ASSERT_OK(DB::Open(options, db_name_, &db)); + + ASSERT_OK(db->Put(WriteOptions(), "key", "val")); + ASSERT_OK(db->Flush(FlushOptions())); + + // Ensure background work is fully finished including listener callbacks + // before accessing listener state. + ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork()); + auto l0_files = collector->GetFlushedFiles(); + ASSERT_EQ(1, l0_files.size()); + + // L0->L1 compaction, so output should be ZSTD-compressed + CompactionOptions compaction_opts; + compaction_opts.compression = CompressionType::kDisableCompressionOption; + ASSERT_OK(db->CompactFiles(compaction_opts, l0_files, 1)); + + ROCKSDB_NAMESPACE::TablePropertiesCollection all_tables_props; + ASSERT_OK(db->GetPropertiesOfAllTables(&all_tables_props)); + for (const auto& name_and_table_props : all_tables_props) { + ASSERT_EQ(CompressionTypeToString(CompressionType::kZlibCompression), + name_and_table_props.second->compression_name); + } + delete db; + } +} + +TEST_F(CompactFilesTest, GetCompactionJobInfo) { + Options options; + options.create_if_missing = true; + // Disable RocksDB background compaction. 
+ options.compaction_style = kCompactionStyleNone; + options.level0_slowdown_writes_trigger = 1000; + options.level0_stop_writes_trigger = 1000; + options.write_buffer_size = 65536; + options.max_write_buffer_number = 2; + options.compression = kNoCompression; + options.max_compaction_bytes = 5000; + + // Add listener + FlushedFileCollector* collector = new FlushedFileCollector(); + options.listeners.emplace_back(collector); + + DB* db = nullptr; + ASSERT_OK(DestroyDB(db_name_, options)); + Status s = DB::Open(options, db_name_, &db); + ASSERT_OK(s); + assert(db); + + // create couple files + for (int i = 0; i < 500; ++i) { + ASSERT_OK(db->Put(WriteOptions(), std::to_string(i), + std::string(1000, 'a' + (i % 26)))); + } + ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable()); + auto l0_files_1 = collector->GetFlushedFiles(); + CompactionOptions co; + co.compression = CompressionType::kLZ4Compression; + CompactionJobInfo compaction_job_info{}; + ASSERT_OK( + db->CompactFiles(co, l0_files_1, 0, -1, nullptr, &compaction_job_info)); + ASSERT_EQ(compaction_job_info.base_input_level, 0); + ASSERT_EQ(compaction_job_info.cf_id, db->DefaultColumnFamily()->GetID()); + ASSERT_EQ(compaction_job_info.cf_name, db->DefaultColumnFamily()->GetName()); + ASSERT_EQ(compaction_job_info.compaction_reason, + CompactionReason::kManualCompaction); + ASSERT_EQ(compaction_job_info.compression, CompressionType::kLZ4Compression); + ASSERT_EQ(compaction_job_info.output_level, 0); + ASSERT_OK(compaction_job_info.status); + // no assertion failure + delete db; +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include <stdio.h> + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, + "SKIPPED as DBImpl::CompactFiles is not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // !ROCKSDB_LITE |